diff --git a/dbms/programs/server/Server.cpp b/dbms/programs/server/Server.cpp index 87d66aa2d41..79572f3b178 100644 --- a/dbms/programs/server/Server.cpp +++ b/dbms/programs/server/Server.cpp @@ -322,6 +322,13 @@ int Server::main(const std::vector & /*args*/) if (mark_cache_size) global_context->setMarkCache(mark_cache_size); +#if USE_EMBEDDED_COMPILER + /// Context declares the compiled expression cache only when the embedded compiler is built in. A configured size of 0 leaves the cache uncreated. + size_t compiled_expression_cache_size = config().getUInt64("compiled_expression_cache_size", std::numeric_limits::max()); + if (compiled_expression_cache_size) + global_context->setCompiledExpressionCache(compiled_expression_cache_size); +#endif + /// Set path for format schema files auto format_schema_path = Poco::File(config().getString("format_schema_path", path + "format_schemas/")); global_context->setFormatSchemaPath(format_schema_path.path() + "/"); diff --git a/dbms/src/Common/LRUCache.h b/dbms/src/Common/LRUCache.h index 2a8aa1c0aaa..61ae3d63dd5 100644 --- a/dbms/src/Common/LRUCache.h +++ b/dbms/src/Common/LRUCache.h @@ -115,14 +115,18 @@ public: /// Insert the new value only if the token is still in present in insert_tokens. /// (The token may be absent because of a concurrent reset() call). 
+ bool result = false; auto token_it = insert_tokens.find(key); if (token_it != insert_tokens.end() && token_it->second.get() == token) + { setImpl(key, token->value, cache_lock); + result = true; + } if (!token->cleaned_up) token_holder.cleanup(token_lock, cache_lock); - return std::make_pair(token->value, true); + return std::make_pair(token->value, result); } void getStats(size_t & out_hits, size_t & out_misses) const @@ -157,6 +161,29 @@ public: virtual ~LRUCache() {} +protected: + using LRUQueue = std::list; + using LRUQueueIterator = typename LRUQueue::iterator; + + struct Cell + { + bool expired(const Timestamp & last_timestamp, const Delay & delay) const + { + return (delay == Delay::zero()) || + ((last_timestamp > timestamp) && ((last_timestamp - timestamp) > delay)); + } + + MappedPtr value; + size_t size; + LRUQueueIterator queue_iterator; + Timestamp timestamp; + }; + + using Cells = std::unordered_map; + + Cells cells; + + mutable std::mutex mutex; private: /// Represents pending insertion attempt. @@ -222,36 +249,16 @@ private: friend struct InsertTokenHolder; - using LRUQueue = std::list; - using LRUQueueIterator = typename LRUQueue::iterator; - - struct Cell - { - bool expired(const Timestamp & last_timestamp, const Delay & delay) const - { - return (delay == Delay::zero()) || - ((last_timestamp > timestamp) && ((last_timestamp - timestamp) > delay)); - } - - MappedPtr value; - size_t size; - LRUQueueIterator queue_iterator; - Timestamp timestamp; - }; - - using Cells = std::unordered_map; InsertTokenById insert_tokens; LRUQueue queue; - Cells cells; /// Total weight of values. 
size_t current_size = 0; const size_t max_size; const Delay expiration_delay; - mutable std::mutex mutex; std::atomic hits {0}; std::atomic misses {0}; diff --git a/dbms/src/Common/ProfileEvents.cpp b/dbms/src/Common/ProfileEvents.cpp index 69ff4f84bc1..8708dd34dcc 100644 --- a/dbms/src/Common/ProfileEvents.cpp +++ b/dbms/src/Common/ProfileEvents.cpp @@ -89,6 +89,8 @@ M(CompileSuccess, "Number of times a compilation of generated C++ code was successful.") \ \ M(CompileFunction, "Number of times a compilation of generated LLVM code (to create fused function for complex expressions) was initiated.") \ + M(CompiledFunctionExecute, "Number of times a compiled function was executed.") \ + M(CompileExpressionsMicroseconds, "Total time spent for compilation of expressions to LLVM code.") \ \ M(ExternalSortWritePart, "") \ M(ExternalSortMerge, "") \ @@ -168,7 +170,6 @@ M(OSReadChars, "Number of bytes read from filesystem, including page cache.") \ M(OSWriteChars, "Number of bytes written to filesystem, including page cache.") \ - namespace ProfileEvents { diff --git a/dbms/src/Common/SipHash.h b/dbms/src/Common/SipHash.h index c30f8597753..a800bceedf0 100644 --- a/dbms/src/Common/SipHash.h +++ b/dbms/src/Common/SipHash.h @@ -15,6 +15,7 @@ #include #include +#include #include #define ROTL(x, b) static_cast(((x) << (b)) | ((x) >> (64 - (b)))) @@ -139,6 +140,11 @@ public: update(reinterpret_cast(&x), sizeof(x)); } + void update(const std::string & x) + { + update(x.data(), x.length()); + } + /// Get the result in some form. This can only be done once! 
void get128(char * out) diff --git a/dbms/src/DataStreams/tests/expression_stream.cpp b/dbms/src/DataStreams/tests/expression_stream.cpp index 3556c06b026..91028638e08 100644 --- a/dbms/src/DataStreams/tests/expression_stream.cpp +++ b/dbms/src/DataStreams/tests/expression_stream.cpp @@ -36,7 +36,7 @@ try Context context = Context::createGlobal(); ExpressionAnalyzer analyzer(ast, context, {}, {NameAndTypePair("number", std::make_shared())}); - ExpressionActionsChain chain; + ExpressionActionsChain chain(context); analyzer.appendSelect(chain, false); analyzer.appendProjectResult(chain); chain.finalize(); diff --git a/dbms/src/DataStreams/tests/filter_stream.cpp b/dbms/src/DataStreams/tests/filter_stream.cpp index f1e9494e874..015c976c00b 100644 --- a/dbms/src/DataStreams/tests/filter_stream.cpp +++ b/dbms/src/DataStreams/tests/filter_stream.cpp @@ -41,7 +41,7 @@ try Context context = Context::createGlobal(); ExpressionAnalyzer analyzer(ast, context, {}, {NameAndTypePair("number", std::make_shared())}); - ExpressionActionsChain chain; + ExpressionActionsChain chain(context); analyzer.appendSelect(chain, false); analyzer.appendProjectResult(chain); chain.finalize(); diff --git a/dbms/src/Interpreters/AsynchronousMetrics.cpp b/dbms/src/Interpreters/AsynchronousMetrics.cpp index 9fa890d6850..38b3142e94f 100644 --- a/dbms/src/Interpreters/AsynchronousMetrics.cpp +++ b/dbms/src/Interpreters/AsynchronousMetrics.cpp @@ -1,8 +1,10 @@ #include +#include #include #include #include #include +#include #include #include #include @@ -132,6 +134,16 @@ void AsynchronousMetrics::update() } } +#if USE_EMBEDDED_COMPILER + { + if (auto compiled_expression_cache = context.getCompiledExpressionCache()) + { + set("CompiledExpressionCacheBytes", compiled_expression_cache->weight()); + set("CompiledExpressionCacheCount", compiled_expression_cache->count()); + } + } +#endif + set("Uptime", context.getUptimeSeconds()); { diff --git a/dbms/src/Interpreters/Context.cpp 
b/dbms/src/Interpreters/Context.cpp index 4270e0dd3f4..2d65bd2c2d8 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -24,12 +24,14 @@ #include #include #include +#include #include #include #include #include #include #include +#include #include #include #include @@ -54,6 +56,7 @@ namespace ProfileEvents { extern const Event ContextLock; + extern const Event CompiledCacheSizeBytes; } namespace CurrentMetrics @@ -174,6 +177,10 @@ struct ContextShared ConfigurationPtr clusters_config; /// Soteres updated configs mutable std::mutex clusters_mutex; /// Guards clusters and clusters_config +#if USE_EMBEDDED_COMPILER + std::shared_ptr compiled_expression_cache; +#endif + bool shutdown_called = false; /// Do not allow simultaneous execution of DDL requests on the same table. @@ -1805,6 +1812,35 @@ Context::SampleBlockCache & Context::getSampleBlockCache() const return getQueryContext().sample_block_cache; } + +#if USE_EMBEDDED_COMPILER + +std::shared_ptr Context::getCompiledExpressionCache() const +{ + auto lock = getLock(); + return shared->compiled_expression_cache; +} + +void Context::setCompiledExpressionCache(size_t cache_size) +{ + + auto lock = getLock(); + + if (shared->compiled_expression_cache) + throw Exception("Compiled expressions cache has been already created.", ErrorCodes::LOGICAL_ERROR); + + shared->compiled_expression_cache = std::make_shared(cache_size); +} + +void Context::dropCompiledExpressionCache() const +{ + auto lock = getLock(); + if (shared->compiled_expression_cache) + shared->compiled_expression_cache->reset(); +} + +#endif + std::shared_ptr Context::getActionLocksManager() { auto lock = getLock(); diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index 99fa7bf565b..d5df89422fa 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -8,7 +8,9 @@ #include #include +#include #include +#include #include #include #include @@ -77,6 +79,11 @@ using 
SystemLogsPtr = std::shared_ptr; class ActionLocksManager; using ActionLocksManagerPtr = std::shared_ptr; +#if USE_EMBEDDED_COMPILER + +class CompiledExpressionCache; + +#endif /// (database name, table name) using DatabaseAndTableName = std::pair; @@ -432,6 +439,12 @@ public: SampleBlockCache & getSampleBlockCache() const; +#if USE_EMBEDDED_COMPILER + std::shared_ptr getCompiledExpressionCache() const; + void setCompiledExpressionCache(size_t cache_size); + void dropCompiledExpressionCache() const; +#endif + private: /** Check if the current client has access to the specified database. * If access is denied, throw an exception. diff --git a/dbms/src/Interpreters/ExpressionActions.cpp b/dbms/src/Interpreters/ExpressionActions.cpp index 48d703a69da..4c4b0859c9d 100644 --- a/dbms/src/Interpreters/ExpressionActions.cpp +++ b/dbms/src/Interpreters/ExpressionActions.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -17,6 +18,7 @@ namespace ProfileEvents { extern const Event FunctionExecute; + extern const Event CompiledFunctionExecute; } namespace DB @@ -374,6 +376,8 @@ void ExpressionAction::execute(Block & block, std::unordered_mapexecute(block, arguments, num_columns_without_result, input_rows_count); break; @@ -535,7 +539,7 @@ std::string ExpressionAction::toString() const break; case APPLY_FUNCTION: - ss << "FUNCTION " << result_name << " " + ss << "FUNCTION " << result_name << " " << (is_function_compiled ? "[compiled] " : "") << (result_type ? result_type->getName() : "(no type)") << " = " << (function ? function->getName() : "(no function)") << "("; for (size_t i = 0; i < argument_names.size(); ++i) @@ -799,7 +803,7 @@ void ExpressionActions::finalize(const Names & output_columns) /// This has to be done before removing redundant actions and inserting REMOVE_COLUMNs /// because inlining may change dependency sets. 
if (settings.compile_expressions) - compileFunctions(actions, output_columns, sample_block); + compileFunctions(actions, output_columns, sample_block, compilation_cache); #endif /// Which columns are needed to perform actions from the current to the last. @@ -1111,13 +1115,119 @@ BlockInputStreamPtr ExpressionActions::createStreamWithNonJoinedDataIfFullOrRigh } +/// It is not important to calculate the hash of individual strings or their concatenation +size_t ExpressionAction::ActionHash::operator()(const ExpressionAction & action) const +{ + SipHash hash; + hash.update(action.type); + hash.update(action.is_function_compiled); + switch(action.type) + { + case ADD_COLUMN: + hash.update(action.result_name); + if (action.result_type) + hash.update(action.result_type->getName()); + if (action.added_column) + hash.update(action.added_column->getName()); + break; + case REMOVE_COLUMN: + hash.update(action.source_name); + break; + case COPY_COLUMN: + hash.update(action.result_name); + hash.update(action.source_name); + break; + case APPLY_FUNCTION: + hash.update(action.result_name); + if (action.result_type) + hash.update(action.result_type->getName()); + if (action.function) + { + hash.update(action.function->getName()); + for (const auto & arg_type : action.function->getArgumentTypes()) + hash.update(arg_type->getName()); + } + for (const auto & arg_name : action.argument_names) + hash.update(arg_name); + break; + case ARRAY_JOIN: + hash.update(action.array_join_is_left); + for (const auto & col : action.array_joined_columns) + hash.update(col); + break; + case JOIN: + for (const auto & col : action.columns_added_by_join) + hash.update(col.name); + break; + case PROJECT: + for (const auto & pair_of_strs : action.projection) + { + hash.update(pair_of_strs.first); + hash.update(pair_of_strs.second); + } + break; + case ADD_ALIASES: + break; + } + return hash.get64(); +} + +bool ExpressionAction::operator==(const ExpressionAction & other) const +{ + if (result_type != 
other.result_type) + { + if (result_type == nullptr || other.result_type == nullptr) + return false; + else if (!result_type->equals(*other.result_type)) + return false; + } + + if (function != other.function) + { + if (function == nullptr || other.function == nullptr) + return false; + else if (function->getName() != other.function->getName()) + return false; + + const auto & my_arg_types = function->getArgumentTypes(); + const auto & other_arg_types = other.function->getArgumentTypes(); + if (my_arg_types.size() != other_arg_types.size()) + return false; + + for (size_t i = 0; i < my_arg_types.size(); ++i) + if (!my_arg_types[i]->equals(*other_arg_types[i])) + return false; + } + + if (added_column != other.added_column) + { + if (added_column == nullptr || other.added_column == nullptr) + return false; + else if (added_column->getName() != other.added_column->getName()) + return false; + } + + return type == other.type /// ActionHash mixes the action type into the hash, so equal actions must have equal types. + && source_name == other.source_name + && result_name == other.result_name + && row_projection_column == other.row_projection_column + && is_row_projection_complementary == other.is_row_projection_complementary + && argument_names == other.argument_names + && array_joined_columns == other.array_joined_columns + && array_join_is_left == other.array_join_is_left + && join == other.join + && join_key_names_left == other.join_key_names_left + && columns_added_by_join == other.columns_added_by_join + && projection == other.projection + && is_function_compiled == other.is_function_compiled; +} + void ExpressionActionsChain::addStep() { if (steps.empty()) throw Exception("Cannot add action to empty ExpressionActionsChain", ErrorCodes::LOGICAL_ERROR); ColumnsWithTypeAndName columns = steps.back().actions->getSampleBlock().getColumnsWithTypeAndName(); - steps.push_back(Step(std::make_shared(columns, settings))); + steps.push_back(Step(std::make_shared(columns, context))); } void ExpressionActionsChain::finalize() diff --git a/dbms/src/Interpreters/ExpressionActions.h 
b/dbms/src/Interpreters/ExpressionActions.h index 2a5449f7ec9..6bea7105ef0 100644 --- a/dbms/src/Interpreters/ExpressionActions.h +++ b/dbms/src/Interpreters/ExpressionActions.h @@ -1,5 +1,8 @@ #pragma once +#include +#include +#include #include #include #include @@ -83,6 +86,7 @@ public: FunctionBuilderPtr function_builder; FunctionBasePtr function; Names argument_names; + bool is_function_compiled = false; /// For ARRAY_JOIN NameSet array_joined_columns; @@ -118,6 +122,13 @@ public: std::string toString() const; + bool operator==(const ExpressionAction & other) const; + + struct ActionHash + { + size_t operator()(const ExpressionAction & action) const; + }; + private: friend class ExpressionActions; @@ -135,16 +146,20 @@ class ExpressionActions public: using Actions = std::vector; - ExpressionActions(const NamesAndTypesList & input_columns_, const Settings & settings_) - : input_columns(input_columns_), settings(settings_) + ExpressionActions(const NamesAndTypesList & input_columns_, const Context & context_) + : input_columns(input_columns_), settings(context_.getSettingsRef()) { for (const auto & input_elem : input_columns) sample_block.insert(ColumnWithTypeAndName(nullptr, input_elem.type, input_elem.name)); + +#if USE_EMBEDDED_COMPILER + compilation_cache = context_.getCompiledExpressionCache(); +#endif } /// For constant columns the columns themselves can be contained in `input_columns_`. 
- ExpressionActions(const ColumnsWithTypeAndName & input_columns_, const Settings & settings_) - : settings(settings_) + ExpressionActions(const ColumnsWithTypeAndName & input_columns_, const Context & context_) + : settings(context_.getSettingsRef()) { for (const auto & input_elem : input_columns_) { @@ -213,11 +228,16 @@ public: BlockInputStreamPtr createStreamWithNonJoinedDataIfFullOrRightJoin(const Block & source_header, size_t max_block_size) const; + const Settings & getSettings() const { return settings; } + private: NamesAndTypesList input_columns; Actions actions; Block sample_block; Settings settings; +#if USE_EMBEDDED_COMPILER + std::shared_ptr compilation_cache; +#endif void checkLimits(Block & block) const; @@ -229,6 +249,18 @@ private: using ExpressionActionsPtr = std::shared_ptr; +struct ActionsHash +{ + size_t operator()(const ExpressionActions::Actions & actions) const + { + SipHash hash; + for (const ExpressionAction & act : actions) + hash.update(ExpressionAction::ActionHash{}(act)); + return hash.get64(); + } +}; + + /** The sequence of transformations over the block. * It is assumed that the result of each step is fed to the input of the next step. 
@@ -241,6 +273,8 @@ using ExpressionActionsPtr = std::shared_ptr; */ struct ExpressionActionsChain { + ExpressionActionsChain(const Context & context_) + : context(context_) {} struct Step { ExpressionActionsPtr actions; @@ -259,7 +293,7 @@ struct ExpressionActionsChain using Steps = std::vector; - Settings settings; + const Context & context; Steps steps; void addStep(); diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.cpp b/dbms/src/Interpreters/ExpressionAnalyzer.cpp index ee15c1ee7d9..67481affbcc 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.cpp +++ b/dbms/src/Interpreters/ExpressionAnalyzer.cpp @@ -491,7 +491,7 @@ void ExpressionAnalyzer::analyzeAggregation() if (select_query && (select_query->group_expression_list || select_query->having_expression)) has_aggregation = true; - ExpressionActionsPtr temp_actions = std::make_shared(source_columns, settings); + ExpressionActionsPtr temp_actions = std::make_shared(source_columns, context); if (select_query && select_query->array_join_expression_list()) { @@ -1539,7 +1539,7 @@ void ExpressionAnalyzer::makeSetsForIndexImpl(const ASTPtr & node, const Block & temp_columns.insert(temp_columns.end(), array_join_columns.begin(), array_join_columns.end()); for (const auto & joined_column : analyzed_join.columns_added_by_join) temp_columns.push_back(joined_column.name_and_type); - ExpressionActionsPtr temp_actions = std::make_shared(temp_columns, settings); + ExpressionActionsPtr temp_actions = std::make_shared(temp_columns, context); getRootActions(func->arguments->children.at(0), true, false, temp_actions); Block sample_block_with_calculated_columns = temp_actions->getSampleBlock(); @@ -1742,8 +1742,8 @@ static String getUniqueName(const Block & block, const String & prefix) * For example, in the expression "select arrayMap(x -> x + column1 * column2, array1)" * calculation of the product must be done outside the lambda expression (it does not depend on x), and the calculation of the sum is inside (depends on 
x). */ -ScopeStack::ScopeStack(const ExpressionActionsPtr & actions, const Settings & settings_) - : settings(settings_) +ScopeStack::ScopeStack(const ExpressionActionsPtr & actions, const Context & context_) + : context(context_) { stack.emplace_back(); stack.back().actions = actions; @@ -1776,7 +1776,7 @@ void ScopeStack::pushLevel(const NamesAndTypesList & input_columns) all_columns.push_back(col); } - stack.back().actions = std::make_shared(all_columns, settings); + stack.back().actions = std::make_shared(all_columns, context); } size_t ScopeStack::getColumnLevel(const std::string & name) @@ -1822,7 +1822,7 @@ const Block & ScopeStack::getSampleBlock() const void ExpressionAnalyzer::getRootActions(const ASTPtr & ast, bool no_subqueries, bool only_consts, ExpressionActionsPtr & actions) { - ScopeStack scopes(actions, settings); + ScopeStack scopes(actions, context); ProjectionManipulatorPtr projection_manipulator; if (!isThereArrayJoin(ast) && settings.enable_conditional_computation && !only_consts) @@ -1993,7 +1993,7 @@ bool ExpressionAnalyzer::isThereArrayJoin(const ASTPtr & ast) void ExpressionAnalyzer::getActionsFromJoinKeys(const ASTTableJoin & table_join, bool no_subqueries, bool only_consts, ExpressionActionsPtr & actions) { - ScopeStack scopes(actions, settings); + ScopeStack scopes(actions, context); ProjectionManipulatorPtr projection_manipulator; if (!isThereArrayJoin(query) && settings.enable_conditional_computation && !only_consts) @@ -2405,8 +2405,7 @@ void ExpressionAnalyzer::initChain(ExpressionActionsChain & chain, const NamesAn { if (chain.steps.empty()) { - chain.settings = settings; - chain.steps.emplace_back(std::make_shared(columns, settings)); + chain.steps.emplace_back(std::make_shared(columns, context)); } } @@ -2676,7 +2675,7 @@ bool ExpressionAnalyzer::appendPrewhere(ExpressionActionsChain & chain, bool onl { /// Remove unused source_columns from prewhere actions. 
- auto tmp_actions = std::make_shared(source_columns, settings); + auto tmp_actions = std::make_shared(source_columns, context); getRootActions(select_query->prewhere_expression, only_types, false, tmp_actions); tmp_actions->finalize({prewhere_column_name}); auto required_columns = tmp_actions->getRequiredColumns(); @@ -2715,7 +2714,7 @@ bool ExpressionAnalyzer::appendPrewhere(ExpressionActionsChain & chain, bool onl } } - chain.steps.emplace_back(std::make_shared(std::move(columns), settings)); + chain.steps.emplace_back(std::make_shared(std::move(columns), context)); chain.steps.back().additional_input = std::move(unused_source_columns); } @@ -2896,7 +2895,7 @@ void ExpressionAnalyzer::getActionsBeforeAggregation(const ASTPtr & ast, Express ExpressionActionsPtr ExpressionAnalyzer::getActions(bool add_aliases, bool project_result) { - ExpressionActionsPtr actions = std::make_shared(source_columns, settings); + ExpressionActionsPtr actions = std::make_shared(source_columns, context); NamesWithAliases result_columns; Names result_names; @@ -2943,7 +2942,7 @@ ExpressionActionsPtr ExpressionAnalyzer::getActions(bool add_aliases, bool proje ExpressionActionsPtr ExpressionAnalyzer::getConstActions() { - ExpressionActionsPtr actions = std::make_shared(NamesAndTypesList(), settings); + ExpressionActionsPtr actions = std::make_shared(NamesAndTypesList(), context); getRootActions(query, true, true, actions); diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.h b/dbms/src/Interpreters/ExpressionAnalyzer.h index 4b68d647040..5e01f049c5c 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.h +++ b/dbms/src/Interpreters/ExpressionAnalyzer.h @@ -77,9 +77,10 @@ struct ScopeStack using Levels = std::vector; Levels stack; - const Settings & settings; - ScopeStack(const ExpressionActionsPtr & actions, const Settings & settings_); + const Context & context; + + ScopeStack(const ExpressionActionsPtr & actions, const Context & context_); void pushLevel(const NamesAndTypesList & 
input_columns); @@ -186,7 +187,7 @@ private: ASTPtr query; ASTSelectQuery * select_query; const Context & context; - Settings settings; + const Settings settings; size_t subquery_depth; /** Original columns. diff --git a/dbms/src/Interpreters/ExpressionJIT.cpp b/dbms/src/Interpreters/ExpressionJIT.cpp index edeaca34a9a..ef8b0e410ef 100644 --- a/dbms/src/Interpreters/ExpressionJIT.cpp +++ b/dbms/src/Interpreters/ExpressionJIT.cpp @@ -7,12 +7,14 @@ #include #include #include +#include +#include #include #include +#include #include #include #include -#include #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wunused-parameter" @@ -49,6 +51,7 @@ namespace ProfileEvents { extern const Event CompileFunction; + extern const Event CompileExpressionsMicroseconds; } namespace DB @@ -160,8 +163,40 @@ auto wrapJITSymbolResolver(llvm::JITSymbolResolver & jsr) } #endif +#if LLVM_VERSION_MAJOR >= 6 +struct CountingMMapper final : public llvm::SectionMemoryManager::MemoryMapper +{ + MemoryTracker memory_tracker{VariableContext::Global}; + + llvm::sys::MemoryBlock allocateMappedMemory(llvm::SectionMemoryManager::AllocationPurpose /*purpose*/, + size_t num_bytes, + const llvm::sys::MemoryBlock * const near_block, + unsigned flags, + std::error_code & error_code) override + { + /// Charge the size of the block that was really mapped, and only on success, so that it matches the block.size() credited back in releaseMappedMemory. + llvm::sys::MemoryBlock block = llvm::sys::Memory::allocateMappedMemory(num_bytes, near_block, flags, error_code); + if (!error_code) + memory_tracker.alloc(block.size()); + return block; + } + + std::error_code protectMappedMemory(const llvm::sys::MemoryBlock & block, unsigned flags) override + { + return llvm::sys::Memory::protectMappedMemory(block, flags); + } + + std::error_code releaseMappedMemory(llvm::sys::MemoryBlock & block) override + { + memory_tracker.free(block.size()); + return llvm::sys::Memory::releaseMappedMemory(block); + } +}; +#endif + struct LLVMContext { + static inline std::atomic id_counter{0}; llvm::LLVMContext context; #if LLVM_VERSION_MAJOR >= 7 llvm::orc::ExecutionSession execution_session; @@ -170,12 +202,16 @@ struct LLVMContext 
std::shared_ptr module; #endif std::unique_ptr machine; +#if LLVM_VERSION_MAJOR >= 6 + std::unique_ptr memory_mapper; +#endif std::shared_ptr memory_manager; llvm::orc::RTDyldObjectLinkingLayer object_layer; llvm::orc::IRCompileLayer compile_layer; llvm::DataLayout layout; llvm::IRBuilder<> builder; std::unordered_map symbols; + size_t id; LLVMContext() #if LLVM_VERSION_MAJOR >= 7 @@ -184,7 +220,13 @@ struct LLVMContext : module(std::make_shared("jit", context)) #endif , machine(getNativeMachine()) + +#if LLVM_VERSION_MAJOR >= 6 + , memory_mapper(std::make_unique()) + , memory_manager(std::make_shared(memory_mapper.get())) +#else , memory_manager(std::make_shared()) +#endif #if LLVM_VERSION_MAJOR >= 7 , object_layer(execution_session, [this](llvm::orc::VModuleKey) { @@ -196,6 +238,7 @@ struct LLVMContext , compile_layer(object_layer, llvm::orc::SimpleCompiler(*machine)) , layout(machine->createDataLayout()) , builder(context) + , id(id_counter++) { module->setDataLayout(layout); module->setTargetTriple(machine->getTargetTriple().getTriple()); @@ -290,7 +333,7 @@ public: reinterpret_cast(function)(block_size, columns.data()); } block.getByPosition(result).column = std::move(col_res); - }; + } }; static void compileFunction(std::shared_ptr & context, const IFunctionBase & f) @@ -424,126 +467,104 @@ static CompilableExpression subexpression(const IFunctionBase & f, std::vector context; - std::vector originals; - std::unordered_map subexpressions; - -public: - LLVMFunction(const ExpressionActions::Actions & actions, std::shared_ptr context, const Block & sample_block) +LLVMFunction::LLVMFunction(const ExpressionActions::Actions & actions, std::shared_ptr context, const Block & sample_block) : name(actions.back().result_name), context(context) +{ + for (const auto & c : sample_block) + /// TODO: implement `getNativeValue` for all types & replace the check with `c.column && toNativeType(...)` + if (c.column && getNativeValue(toNativeType(context->builder, c.type), 
*c.column, 0)) + subexpressions[c.name] = subexpression(c.column, c.type); + for (const auto & action : actions) { - for (const auto & c : sample_block) - /// TODO: implement `getNativeValue` for all types & replace the check with `c.column && toNativeType(...)` - if (c.column && getNativeValue(toNativeType(context->builder, c.type), *c.column, 0)) - subexpressions[c.name] = subexpression(c.column, c.type); - for (const auto & action : actions) + const auto & names = action.argument_names; + const auto & types = action.function->getArgumentTypes(); + std::vector args; + for (size_t i = 0; i < names.size(); ++i) { - const auto & names = action.argument_names; - const auto & types = action.function->getArgumentTypes(); - std::vector args; - for (size_t i = 0; i < names.size(); ++i) + auto inserted = subexpressions.emplace(names[i], subexpression(arg_names.size())); + if (inserted.second) { - auto inserted = subexpressions.emplace(names[i], subexpression(arg_names.size())); - if (inserted.second) - { - arg_names.push_back(names[i]); - arg_types.push_back(types[i]); - } - args.push_back(inserted.first->second); + arg_names.push_back(names[i]); + arg_types.push_back(types[i]); } - subexpressions[action.result_name] = subexpression(*action.function, std::move(args)); - originals.push_back(action.function); + args.push_back(inserted.first->second); } - compileFunction(context, *this); + subexpressions[action.result_name] = subexpression(*action.function, std::move(args)); + originals.push_back(action.function); } + compileFunction(context, *this); +} - bool isCompilable() const override { return true; } +PreparedFunctionPtr LLVMFunction::prepare(const Block &) const { return std::make_shared(name, context); } - llvm::Value * compile(llvm::IRBuilderBase & builder, ValuePlaceholders values) const override { return subexpressions.at(name)(builder, values); } +bool LLVMFunction::isDeterministic() const +{ + for (const auto & f : originals) + if (!f->isDeterministic()) + 
return false; + return true; +} - String getName() const override { return name; } +bool LLVMFunction::isDeterministicInScopeOfQuery() const +{ + for (const auto & f : originals) + if (!f->isDeterministicInScopeOfQuery()) + return false; + return true; +} - const Names & getArgumentNames() const { return arg_names; } +bool LLVMFunction::isSuitableForConstantFolding() const +{ + for (const auto & f : originals) + if (!f->isSuitableForConstantFolding()) + return false; + return true; +} - const DataTypes & getArgumentTypes() const override { return arg_types; } +bool LLVMFunction::isInjective(const Block & sample_block) +{ + for (const auto & f : originals) + if (!f->isInjective(sample_block)) + return false; + return true; +} - const DataTypePtr & getReturnType() const override { return originals.back()->getReturnType(); } +bool LLVMFunction::hasInformationAboutMonotonicity() const +{ + for (const auto & f : originals) + if (!f->hasInformationAboutMonotonicity()) + return false; + return true; +} - PreparedFunctionPtr prepare(const Block &) const override { return std::make_shared(name, context); } - - bool isDeterministic() const override +LLVMFunction::Monotonicity LLVMFunction::getMonotonicityForRange(const IDataType & type, const Field & left, const Field & right) const +{ + const IDataType * type_ = &type; + Field left_ = left; + Field right_ = right; + Monotonicity result(true, true, true); + /// monotonicity is only defined for unary functions, so the chain must describe a sequence of nested calls + for (size_t i = 0; i < originals.size(); ++i) { - for (const auto & f : originals) - if (!f->isDeterministic()) - return false; - return true; - } - - bool isDeterministicInScopeOfQuery() const override - { - for (const auto & f : originals) - if (!f->isDeterministicInScopeOfQuery()) - return false; - return true; - } - - bool isSuitableForConstantFolding() const override - { - for (const auto & f : originals) - if (!f->isSuitableForConstantFolding()) - return 
false; - return true; - } - - bool isInjective(const Block & sample_block) override - { - for (const auto & f : originals) - if (!f->isInjective(sample_block)) - return false; - return true; - } - - bool hasInformationAboutMonotonicity() const override - { - for (const auto & f : originals) - if (!f->hasInformationAboutMonotonicity()) - return false; - return true; - } - - Monotonicity getMonotonicityForRange(const IDataType & type, const Field & left, const Field & right) const override - { - const IDataType * type_ = &type; - Field left_ = left; - Field right_ = right; - Monotonicity result(true, true, true); - /// monotonicity is only defined for unary functions, so the chain must describe a sequence of nested calls - for (size_t i = 0; i < originals.size(); ++i) + Monotonicity m = originals[i]->getMonotonicityForRange(*type_, left_, right_); + if (!m.is_monotonic) + return m; + result.is_positive ^= !m.is_positive; + result.is_always_monotonic &= m.is_always_monotonic; + if (i + 1 < originals.size()) { - Monotonicity m = originals[i]->getMonotonicityForRange(*type_, left_, right_); - if (!m.is_monotonic) - return m; - result.is_positive ^= !m.is_positive; - result.is_always_monotonic &= m.is_always_monotonic; - if (i + 1 < originals.size()) - { - if (left_ != Field()) - applyFunction(*originals[i], left_); - if (right_ != Field()) - applyFunction(*originals[i], right_); - if (!m.is_positive) - std::swap(left_, right_); - type_ = originals[i]->getReturnType().get(); - } + if (left_ != Field()) + applyFunction(*originals[i], left_); + if (right_ != Field()) + applyFunction(*originals[i], right_); + if (!m.is_positive) + std::swap(left_, right_); + type_ = originals[i]->getReturnType().get(); } - return result; } -}; + return result; +} + static bool isCompilable(llvm::IRBuilderBase & builder, const IFunctionBase & function) { @@ -555,7 +576,29 @@ static bool isCompilable(llvm::IRBuilderBase & builder, const IFunctionBase & fu return function.isCompilable(); } 
-void compileFunctions(ExpressionActions::Actions & actions, const Names & output_columns, const Block & sample_block) +size_t CompiledExpressionCache::weight() const +{ + +#if LLVM_VERSION_MAJOR >= 6 + std::lock_guard lock(mutex); + size_t result{0}; + std::unordered_set seen; + for (const auto & cell : cells) + { + auto function_context = cell.second.value->getContext(); + if (!seen.count(function_context->id)) + { + result += function_context->memory_mapper->memory_tracker.get(); + seen.insert(function_context->id); + } + } + return result; +#else + return Base::weight(); +#endif +} + +void compileFunctions(ExpressionActions::Actions & actions, const Names & output_columns, const Block & sample_block, std::shared_ptr compilation_cache) { struct LLVMTargetInitializer { @@ -638,9 +681,28 @@ void compileFunctions(ExpressionActions::Actions & actions, const Names & output /// the result of compiling one function in isolation is pretty much the same as its `execute` method. if (fused[i].size() == 1) continue; - auto fn = std::make_shared(std::move(fused[i]), context, sample_block); + + std::shared_ptr fn; + if (compilation_cache) + { + bool success; + auto set_func = [&fused, i, context, &sample_block] () { return std::make_shared(fused[i], context, sample_block); }; + Stopwatch watch; + std::tie(fn, success) = compilation_cache->getOrSet(fused[i], set_func); + if (success) + ProfileEvents::increment(ProfileEvents::CompileExpressionsMicroseconds, watch.elapsedMicroseconds()); + } + else + { + Stopwatch watch; + fn = std::make_shared(fused[i], context, sample_block); + ProfileEvents::increment(ProfileEvents::CompileExpressionsMicroseconds, watch.elapsedMicroseconds()); + } + actions[i].function = fn; actions[i].argument_names = fn->getArgumentNames(); + actions[i].is_function_compiled = true; + continue; } diff --git a/dbms/src/Interpreters/ExpressionJIT.h b/dbms/src/Interpreters/ExpressionJIT.h index 799a80171b5..fdcb0d04b93 100644 --- 
a/dbms/src/Interpreters/ExpressionJIT.h +++ b/dbms/src/Interpreters/ExpressionJIT.h @@ -4,14 +4,76 @@ #if USE_EMBEDDED_COMPILER +#include +#include #include +#include +#include + namespace DB { +struct LLVMContext; +using CompilableExpression = std::function; + +class LLVMFunction : public IFunctionBase +{ + std::string name; + Names arg_names; + DataTypes arg_types; + std::shared_ptr context; + std::vector originals; + std::unordered_map subexpressions; +public: + LLVMFunction(const ExpressionActions::Actions & actions, std::shared_ptr context, const Block & sample_block); + + bool isCompilable() const override { return true; } + + llvm::Value * compile(llvm::IRBuilderBase & builder, ValuePlaceholders values) const override { return subexpressions.at(name)(builder, values); } + + String getName() const override { return name; } + + const Names & getArgumentNames() const { return arg_names; } + + const DataTypes & getArgumentTypes() const override { return arg_types; } + + const DataTypePtr & getReturnType() const override { return originals.back()->getReturnType(); } + + PreparedFunctionPtr prepare(const Block &) const override; + + bool isDeterministic() const override; + + bool isDeterministicInScopeOfQuery() const override; + + bool isSuitableForConstantFolding() const override; + + bool isInjective(const Block & sample_block) override; + + bool hasInformationAboutMonotonicity() const override; + + Monotonicity getMonotonicityForRange(const IDataType & type, const Field & left, const Field & right) const override; + + std::shared_ptr getContext() const { return context; } + +}; + +/** This child of LRUCache breaks one of it's invariants: total weight may be changed after insertion. + * We have to do so, because we don't known real memory consumption of generated LLVM code for every function. 
+ */ +class CompiledExpressionCache : public LRUCache, LLVMFunction, ActionsHash> +{ +private: + using Base = LRUCache, LLVMFunction, ActionsHash>; +public: + using Base::Base; + + size_t weight() const; +}; + /// For each APPLY_FUNCTION action, try to compile the function to native code; if the only uses of a compilable /// function's result are as arguments to other compilable functions, inline it and leave the now-redundant action as-is. -void compileFunctions(ExpressionActions::Actions & actions, const Names & output_columns, const Block & sample_block); +void compileFunctions(ExpressionActions::Actions & actions, const Names & output_columns, const Block & sample_block, std::shared_ptr compilation_cache); } diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index e70d4322584..7e40d7d2175 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -315,7 +315,7 @@ InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpression }; { - ExpressionActionsChain chain; + ExpressionActionsChain chain(context); if (query_analyzer->appendPrewhere(chain, !res.first_stage)) { @@ -740,7 +740,7 @@ void InterpreterSelectQuery::executeFetchColumns( if (prewhere_info) { /// Don't remove columns which are needed to be aliased. 
- auto new_actions = std::make_shared(prewhere_info->prewhere_actions->getRequiredColumnsWithTypes(), settings); + auto new_actions = std::make_shared(prewhere_info->prewhere_actions->getRequiredColumnsWithTypes(), context); for (const auto & action : prewhere_info->prewhere_actions->getActions()) { if (action.type != ExpressionAction::REMOVE_COLUMN diff --git a/dbms/src/Interpreters/InterpreterSystemQuery.cpp b/dbms/src/Interpreters/InterpreterSystemQuery.cpp index ee0d6f8e6b8..85c9a786bfb 100644 --- a/dbms/src/Interpreters/InterpreterSystemQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSystemQuery.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -148,6 +149,11 @@ BlockIO InterpreterSystemQuery::execute() case Type::DROP_UNCOMPRESSED_CACHE: system_context.dropUncompressedCache(); break; +#if USE_EMBEDDED_COMPILER + case Type::DROP_COMPILED_EXPRESSION_CACHE: + system_context.dropCompiledExpressionCache(); + break; +#endif case Type::RELOAD_DICTIONARY: system_context.getExternalDictionaries().reloadDictionary(query.target_dictionary); break; diff --git a/dbms/src/Interpreters/tests/expression.cpp b/dbms/src/Interpreters/tests/expression.cpp index 56af8d04b7e..aea1ccce48c 100644 --- a/dbms/src/Interpreters/tests/expression.cpp +++ b/dbms/src/Interpreters/tests/expression.cpp @@ -54,7 +54,7 @@ int main(int argc, char ** argv) }; ExpressionAnalyzer analyzer(ast, context, {}, columns); - ExpressionActionsChain chain; + ExpressionActionsChain chain(context); analyzer.appendSelect(chain, false); analyzer.appendProjectResult(chain); chain.finalize(); diff --git a/dbms/src/Parsers/ASTSystemQuery.cpp b/dbms/src/Parsers/ASTSystemQuery.cpp index 51bd2c8c26a..27f3629b47e 100644 --- a/dbms/src/Parsers/ASTSystemQuery.cpp +++ b/dbms/src/Parsers/ASTSystemQuery.cpp @@ -27,6 +27,10 @@ const char * ASTSystemQuery::typeToString(Type type) return "DROP MARK CACHE"; case Type::DROP_UNCOMPRESSED_CACHE: return "DROP UNCOMPRESSED CACHE"; +#if 
USE_EMBEDDED_COMPILER + case Type::DROP_COMPILED_EXPRESSION_CACHE: + return "DROP COMPILED EXPRESSION CACHE"; +#endif case Type::STOP_LISTEN_QUERIES: return "STOP LISTEN QUERIES"; case Type::START_LISTEN_QUERIES: diff --git a/dbms/src/Parsers/ASTSystemQuery.h b/dbms/src/Parsers/ASTSystemQuery.h index 98d8975d545..4ee925e8c66 100644 --- a/dbms/src/Parsers/ASTSystemQuery.h +++ b/dbms/src/Parsers/ASTSystemQuery.h @@ -1,5 +1,6 @@ #pragma once +#include #include @@ -18,6 +19,9 @@ public: DROP_DNS_CACHE, DROP_MARK_CACHE, DROP_UNCOMPRESSED_CACHE, +#if USE_EMBEDDED_COMPILER + DROP_COMPILED_EXPRESSION_CACHE, +#endif STOP_LISTEN_QUERIES, START_LISTEN_QUERIES, RESTART_REPLICAS, diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index b945ff6dfad..38340905e45 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -272,7 +272,7 @@ void MergeTreeData::initPartitionKey() /// Add all columns used in the partition key to the min-max index. const NamesAndTypesList & minmax_idx_columns_with_types = partition_expr->getRequiredColumnsWithTypes(); - minmax_idx_expr = std::make_shared(minmax_idx_columns_with_types, context.getSettingsRef()); + minmax_idx_expr = std::make_shared(minmax_idx_columns_with_types, context); for (const NameAndTypePair & column : minmax_idx_columns_with_types) { minmax_idx_columns.emplace_back(column.name); @@ -996,7 +996,7 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name /// Need to modify column type. 
if (!out_expression) - out_expression = std::make_shared(NamesAndTypesList(), context.getSettingsRef()); + out_expression = std::make_shared(NamesAndTypesList(), context); out_expression->addInput(ColumnWithTypeAndName(nullptr, column.type, column.name)); diff --git a/dbms/tests/performance/jit_query_compilation/small_requests.xml b/dbms/tests/performance/jit_query_compilation/small_requests.xml new file mode 100644 index 00000000000..0013c03a1af --- /dev/null +++ b/dbms/tests/performance/jit_query_compilation/small_requests.xml @@ -0,0 +1,47 @@ + + small_requests + loop + + + + 5 + 10000 + + + 5000 + 60000 + + + + + + + + + + + + + WITH + bitXor(number, 0x4CF2D2BAAE6DA887) AS x0, + bitXor(x0, bitShiftRight(x0, 33)) AS x1, + x1 * 0xff51afd7ed558ccd AS x2, + bitXor(x2, bitShiftRight(x2, 33)) AS x3, + x3 * 0xc4ceb9fe1a85ec53 AS x4, + bitXor(x4, bitShiftRight(x4, 33)) AS x5 + SELECT x5, intHash64(number) FROM system.numbers LIMIT 10 + + + WITH + bitXor(number, 0x4CF2D2BAAE6DA887) AS x0, + bitXor(x0, bitShiftRight(x0, 33)) AS x1, + x1 * 0xff51afd7ed558ccd AS x2, + bitXor(x2, bitShiftRight(x2, 33)) AS x3, + x3 * 0xc4ceb9fe1a85ec53 AS x4, + bitXor(x4, bitShiftRight(x4, 33)) AS x5 + SELECT x5, intHash64(number) FROM system.numbers LIMIT 10 + SETTINGS + compile = 1, + compile_expressions = 1 + +