From f6de2160419b2f0111a59deb2dc535cee90dceff Mon Sep 17 00:00:00 2001 From: vdimir Date: Fri, 24 Mar 2023 16:57:32 +0000 Subject: [PATCH] PullingAsyncPipelineExecutor for Direct dictionary with ClickHouse source --- src/Core/Settings.h | 2 + src/Dictionaries/DirectDictionary.cpp | 69 ++++++++++++++++--- src/Dictionaries/DirectDictionary.h | 4 ++ .../01268_dictionary_direct_layout.sql | 2 +- 4 files changed, 65 insertions(+), 12 deletions(-) diff --git a/src/Core/Settings.h b/src/Core/Settings.h index ca89106dc08..d80f26e86da 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -935,6 +935,8 @@ class IColumn; \ M(Bool, regexp_dict_allow_other_sources, false, "Allow regexp_tree dictionary to use sources other than yaml source.", 0) \ M(Bool, regexp_dict_allow_hyperscan, false, "Allow regexp_tree dictionary using Hyperscan library.", 0) \ + \ + M(Bool, dictionary_use_async_executor, false, "Execute a pipeline for reading from a dictionary with several threads. It's supported only by DIRECT dictionary with CLICKHOUSE source.", 0) \ // End of FORMAT_FACTORY_SETTINGS // Please add settings non-related to formats into the COMMON_SETTINGS above. diff --git a/src/Dictionaries/DirectDictionary.cpp b/src/Dictionaries/DirectDictionary.cpp index 189ea2a7bca..d84967fbae6 100644 --- a/src/Dictionaries/DirectDictionary.cpp +++ b/src/Dictionaries/DirectDictionary.cpp @@ -4,16 +4,22 @@ #include #include +#include #include +#include #include -#include -#include -#include #include +#include +#include + +#include +#include + namespace DB { + namespace ErrorCodes { extern const int UNSUPPORTED_METHOD; @@ -73,9 +79,17 @@ Columns DirectDictionary::getColumns( PullingPipelineExecutor executor(pipeline); + Stopwatch watch; Block block; + size_t block_num = 0; + size_t rows_num = 0; while (executor.pull(block)) { + if (!block) + continue; + + ++block_num; + rows_num += block.rows(); convertToFullIfSparse(block); /// Split into keys columns and attribute columns @@ -104,6 +118,9 @@ Columns DirectDictionary::getColumns( block_key_columns.clear(); } + LOG_DEBUG(&Poco::Logger::get("DirectDictionary"), "read {} blocks with {} rows from pipeline in {} ms", + block_num, rows_num, watch.elapsedMilliseconds()); + Field value_to_insert; size_t requested_keys_size = requested_keys.size(); @@ -263,6 +280,7 @@ ColumnUInt8::Ptr DirectDictionary::isInHierarchy( return nullptr; } +template class SourceFromQueryPipeline : public ISource { public: @@ -272,7 +290,10 @@ public: , executor(pipeline) {} - std::string getName() const override { return "SourceFromQueryPipeline"; } + std::string getName() const override + { + return std::is_same_v ? "SourceFromQueryPipelineAsync" : "SourceFromQueryPipeline"; + } Chunk generate() override { @@ -286,10 +307,9 @@ public: return {}; } - private: QueryPipeline pipeline; - PullingPipelineExecutor executor; + TExecutor executor; }; template @@ -297,6 +317,8 @@ Pipe DirectDictionary::getSourcePipe( const Columns & key_columns [[maybe_unused]], const PaddedPODArray & requested_keys [[maybe_unused]]) const { + Stopwatch watch; + size_t requested_keys_size = requested_keys.size(); Pipe pipe; @@ -309,7 +331,12 @@ Pipe DirectDictionary::getSourcePipe( for (auto key : requested_keys) ids.emplace_back(key); - pipe = Pipe(std::make_shared(source_ptr->loadIds(ids))); + auto pipeline = source_ptr->loadIds(ids); + + if (use_async_executor) + pipe = Pipe(std::make_shared>(std::move(pipeline))); + else + pipe = Pipe(std::make_shared>(std::move(pipeline))); } else { @@ -318,16 +345,31 @@ Pipe DirectDictionary::getSourcePipe( for (size_t i = 0; i < requested_keys_size; ++i) requested_rows.emplace_back(i); - pipe = Pipe(std::make_shared(source_ptr->loadKeys(key_columns, requested_rows))); + auto pipeline = source_ptr->loadKeys(key_columns, requested_rows); + if (use_async_executor) + pipe = Pipe(std::make_shared>(std::move(pipeline))); + else + pipe = Pipe(std::make_shared>(std::move(pipeline))); } + LOG_DEBUG(&Poco::Logger::get("DirectDictionary"), "building pipeline for loading keys done in {} ms", watch.elapsedMilliseconds()); return pipe; } template Pipe DirectDictionary::read(const Names & /* column_names */, size_t /* max_block_size */, size_t /* num_streams */) const { - return Pipe(std::make_shared(source_ptr->loadAll())); + return Pipe(std::make_shared>(source_ptr->loadAll())); +} + +template +void DirectDictionary::applySettings(const Settings & settings) +{ + if (dynamic_cast(source_ptr.get())) + { + /// Only applicable for CLICKHOUSE dictionary source. + use_async_executor = settings.dictionary_use_async_executor; + } } namespace @@ -339,7 +381,7 @@ namespace const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, DictionarySourcePtr source_ptr, - ContextPtr /* global_context */, + ContextPtr global_context, bool /* created_from_ddl */) { const auto * layout_name = dictionary_key_type == DictionaryKeyType::Simple ? "direct" : "complex_key_direct"; @@ -372,7 +414,12 @@ namespace "'lifetime' parameter is redundant for the dictionary' of layout '{}'", layout_name); - return std::make_unique>(dict_id, dict_struct, std::move(source_ptr)); + auto dictionary = std::make_unique>(dict_id, dict_struct, std::move(source_ptr)); + + auto context = copyContextAndApplySettingsFromDictionaryConfig(global_context, config, config_prefix); + dictionary->applySettings(context->getSettingsRef()); + + return dictionary; } } diff --git a/src/Dictionaries/DirectDictionary.h b/src/Dictionaries/DirectDictionary.h index 2b5662b8c1a..214c8ef8a13 100644 --- a/src/Dictionaries/DirectDictionary.h +++ b/src/Dictionaries/DirectDictionary.h @@ -95,6 +95,8 @@ public: Pipe read(const Names & column_names, size_t max_block_size, size_t num_streams) const override; + void applySettings(const Settings & settings); + private: Pipe getSourcePipe(const Columns & key_columns, const PaddedPODArray & requested_keys) const; @@ -102,6 +104,8 @@ private: const DictionarySourcePtr source_ptr; const DictionaryLifetime dict_lifetime; + bool use_async_executor = false; + mutable std::atomic query_count{0}; mutable std::atomic found_count{0}; }; diff --git a/tests/queries/0_stateless/01268_dictionary_direct_layout.sql b/tests/queries/0_stateless/01268_dictionary_direct_layout.sql index 914d24a740a..45b5c580561 100644 --- a/tests/queries/0_stateless/01268_dictionary_direct_layout.sql +++ b/tests/queries/0_stateless/01268_dictionary_direct_layout.sql @@ -75,7 +75,7 @@ CREATE DICTIONARY db_01268.dict2 ) PRIMARY KEY region_id SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() USER 'default' TABLE 'table_for_dict2' PASSWORD '' DB 'database_for_dict_01268')) -LAYOUT(DIRECT()); +LAYOUT(DIRECT()) SETTINGS(dictionary_use_async_executor=1, max_threads=8); CREATE DICTIONARY db_01268.dict3 (