CLICKHOUSE-3800: Add expression cache

This commit is contained in:
alesapin 2018-08-28 20:06:42 +03:00
parent 1f0f1ecad3
commit 1a85271702
5 changed files with 109 additions and 6 deletions

View File

@ -762,7 +762,7 @@ void ExpressionActions::finalize(const Names & output_columns)
/// This has to be done before removing redundant actions and inserting REMOVE_COLUMNs
/// because inlining may change dependency sets.
if (settings.compile_expressions)
compileFunctions(actions, output_columns, sample_block);
compileFunctions(actions, output_columns, sample_block, settings);
#endif
/// Which columns are needed to perform actions from the current to the last.
@ -1063,6 +1063,81 @@ BlockInputStreamPtr ExpressionActions::createStreamWithNonJoinedDataIfFullOrRigh
return {};
}
/// Element-wise equality of two action sequences: they are equal when they have the same
/// length and every pair of actions at the same position compares equal.
bool operator==(const ExpressionActions::Actions & f, const ExpressionActions::Actions & s)
{
    const size_t count = f.size();
    if (count != s.size())
        return false;

    /// Advance while the actions match; stopping before the end means a mismatch was found.
    size_t pos = 0;
    while (pos < count && f[pos] == s[pos])
        ++pos;

    return pos == count;
}
/** Computes a hash of an action, suitable for hash-based containers keyed by actions.
  *
  * The action type is always mixed in first; after that only a type-dependent subset of the
  * fields contributes (see the switch below). Fields that are compared by operator== but are not
  * hashed only make collisions more likely; equal actions still produce equal hashes.
  *
  * result_type and function are hashed with std::hash of their pointer types, i.e. presumably
  * by address rather than by content - NOTE(review): confirm against the Ptr typedefs.
  */
size_t ExpressionAction::ActionHash::operator()(const ExpressionAction & action) const
{
    size_t seed = 0;
    boost::hash_combine(seed, std::hash<size_t>{}(action.type));
    auto str_hash_fn = std::hash<std::string>{};

    switch (action.type)
    {
        case ADD_COLUMN:
            boost::hash_combine(seed, str_hash_fn(action.source_name));
            boost::hash_combine(seed, str_hash_fn(action.result_name));
            boost::hash_combine(seed, std::hash<DataTypePtr>{}(action.result_type));
            break;
        case REMOVE_COLUMN:
            boost::hash_combine(seed, str_hash_fn(action.source_name));
            break;
        case COPY_COLUMN:
            boost::hash_combine(seed, str_hash_fn(action.result_name));
            boost::hash_combine(seed, str_hash_fn(action.source_name));
            break;
        case APPLY_FUNCTION:
            boost::hash_combine(seed, str_hash_fn(action.result_name));
            boost::hash_combine(seed, std::hash<DataTypePtr>{}(action.result_type));
            boost::hash_combine(seed, std::hash<FunctionBasePtr>{}(action.function));
            for (const auto & arg_name : action.argument_names)
                boost::hash_combine(seed, str_hash_fn(arg_name));
            break;
        case ARRAY_JOIN:
        {
            boost::hash_combine(seed, std::hash<bool>{}(action.array_join_is_left));

            /// boost::hash_combine is order-sensitive, while operator== compares array_joined_columns
            /// as a whole container. The per-column hashes are therefore summed (commutative, unsigned
            /// wrap-around is well defined) before being mixed in, so the result does not depend on the
            /// iteration order. NOTE(review): this matters if the container is unordered, where equal
            /// sets may iterate in different orders - confirm the declared type.
            size_t columns_hash = 0;
            for (const auto & col : action.array_joined_columns)
                columns_hash += str_hash_fn(col);
            boost::hash_combine(seed, columns_hash);
            break;
        }
        case JOIN:
            /// Only the names of the added columns are hashed here.
            for (const auto & col : action.columns_added_by_join)
                boost::hash_combine(seed, str_hash_fn(col.name));
            break;
        case PROJECT:
            for (const auto & pair_of_strs : action.projection)
            {
                boost::hash_combine(seed, str_hash_fn(pair_of_strs.first));
                boost::hash_combine(seed, str_hash_fn(pair_of_strs.second));
            }
            break;
    }

    return seed;
}
/// Member-by-member equality of two actions. Each listed member is compared with its own `==`,
/// always in the same fixed order, and the comparison stops at the first mismatch.
/// NOTE(review): result_type, function_builder, function, added_column and join are presumably
/// pointer-like, in which case `==` on them checks identity rather than content - confirm.
bool ExpressionAction::operator==(const ExpressionAction & other) const
{
    /// Kind of the action and the names / type of the columns involved.
    if (!(type == other.type))
        return false;
    if (!(source_name == other.source_name && result_name == other.result_name))
        return false;
    if (!(result_type == other.result_type))
        return false;

    /// Row projection members.
    if (!(row_projection_column == other.row_projection_column
          && is_row_projection_complementary == other.is_row_projection_complementary))
        return false;

    if (!(added_column == other.added_column))
        return false;

    /// Function members.
    if (!(function_builder == other.function_builder
          && function == other.function
          && argument_names == other.argument_names))
        return false;

    /// Array join members.
    if (!(array_joined_columns == other.array_joined_columns
          && array_join_is_left == other.array_join_is_left))
        return false;

    /// Join members.
    if (!(join == other.join
          && join_key_names_left == other.join_key_names_left
          && columns_added_by_join == other.columns_added_by_join))
        return false;

    return projection == other.projection;
}
void ExpressionActionsChain::addStep()
{

View File

@ -8,6 +8,8 @@
#include <unordered_set>
#include <unordered_map>
#include <boost/functional/hash/hash.hpp>
namespace DB
{
@ -115,6 +117,13 @@ public:
std::string toString() const;
/// Equality with another action; the definition compares the members one by one with `==`.
bool operator==(const ExpressionAction & other) const;
/// Hash functor for a single action (defined out of line).
/// ExpressionActions::ActionsHash applies it to every action of a sequence.
struct ActionHash
{
/// Returns the hash of `action`.
size_t operator()(const ExpressionAction & action) const;
};
private:
friend class ExpressionActions;
@ -132,6 +141,17 @@ class ExpressionActions
public:
using Actions = std::vector<ExpressionAction>;
struct ActionsHash
{
size_t operator()(const Actions & actions) const
{
size_t seed = 0;
for (const ExpressionAction & act : actions)
boost::hash_combine(seed, ExpressionAction::ActionHash{}(act));
return seed;
}
};
ExpressionActions(const NamesAndTypesList & input_columns_, const Settings & settings_)
: input_columns(input_columns_), settings(settings_)
{
@ -210,6 +230,8 @@ public:
BlockInputStreamPtr createStreamWithNonJoinedDataIfFullOrRightJoin(const Block & source_header, size_t max_block_size) const;
const Settings & getSettings() const { return settings; }
private:
NamesAndTypesList input_columns;
Actions actions;
@ -226,6 +248,8 @@ private:
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
bool operator==(const ExpressionActions::Actions & f, const ExpressionActions::Actions & s);
/** The sequence of transformations over the block.
* It is assumed that the result of each step is fed to the input of the next step.

View File

@ -7,6 +7,7 @@
#include <Columns/ColumnConst.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnVector.h>
#include <Common/LRUCache.h>
#include <Common/typeid_cast.h>
#include <Common/ProfileEvents.h>
#include <DataTypes/DataTypeNullable.h>
@ -555,7 +556,7 @@ static bool isCompilable(llvm::IRBuilderBase & builder, const IFunctionBase & fu
return function.isCompilable();
}
void compileFunctions(ExpressionActions::Actions & actions, const Names & output_columns, const Block & sample_block)
void compileFunctions(ExpressionActions::Actions & actions, const Names & output_columns, const Block & sample_block, const Settings & settings)
{
struct LLVMTargetInitializer
{
@ -621,6 +622,8 @@ void compileFunctions(ExpressionActions::Actions & actions, const Names & output
}
}
static LRUCache<ExpressionActions::Actions, LLVMFunction, ExpressionActions::ActionsHash> compilation_cache(settings.compiled_expressions_cache_size);
std::vector<ExpressionActions::Actions> fused(actions.size());
for (size_t i = 0; i < actions.size(); ++i)
{
@ -633,9 +636,9 @@ void compileFunctions(ExpressionActions::Actions & actions, const Names & output
/// the result of compiling one function in isolation is pretty much the same as its `execute` method.
if (fused[i].size() == 1)
continue;
auto fn = std::make_shared<LLVMFunction>(std::move(fused[i]), context, sample_block);
actions[i].function = fn;
actions[i].argument_names = fn->getArgumentNames();
auto fn = compilation_cache.getOrSet(fused[i], [&]() { return std::make_shared<LLVMFunction>(fused[i], context, sample_block); });
actions[i].function = fn.first;
actions[i].argument_names = fn.first->getArgumentNames();
continue;
}

View File

@ -11,7 +11,7 @@ namespace DB
/// For each APPLY_FUNCTION action, try to compile the function to native code; if the only uses of a compilable
/// function's result are as arguments to other compilable functions, inline it and leave the now-redundant action as-is.
void compileFunctions(ExpressionActions::Actions & actions, const Names & output_columns, const Block & sample_block);
void compileFunctions(ExpressionActions::Actions & actions, const Names & output_columns, const Block & sample_block, const Settings & settings);
}

View File

@ -75,6 +75,7 @@ struct Settings
\
M(SettingBool, compile, false, "Whether query compilation is enabled.") \
M(SettingBool, compile_expressions, false, "Compile some scalar functions and operators to native code.") \
M(SettingUInt64, compiled_expressions_cache_size, std::numeric_limits<UInt64>::max(), "Cache size for compiled expressions") \
M(SettingUInt64, min_count_to_compile, 3, "The number of structurally identical queries before they are compiled.") \
M(SettingUInt64, group_by_two_level_threshold, 100000, "From what number of keys, a two-level aggregation starts. 0 - the threshold is not set.") \
M(SettingUInt64, group_by_two_level_threshold_bytes, 100000000, "From what size of the aggregation state in bytes, a two-level aggregation begins to be used. 0 - the threshold is not set. Two-level aggregation is used when at least one of the thresholds is triggered.") \