Merge pull request #47986 from ClickHouse/vdimir/direct-dict-async-read

This commit is contained in:
Vladimir C 2023-03-29 12:01:24 +02:00 committed by GitHub
commit 46c0c80bff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 66 additions and 12 deletions

View File

@ -935,6 +935,9 @@ class IColumn;
\
M(Bool, regexp_dict_allow_other_sources, false, "Allow regexp_tree dictionary to use sources other than yaml source.", 0) \
M(Bool, regexp_dict_allow_hyperscan, true, "Allow regexp_tree dictionary using Hyperscan library.", 0) \
\
M(Bool, dictionary_use_async_executor, false, "Execute a pipeline for reading from a dictionary with several threads. It's supported only by DIRECT dictionary with CLICKHOUSE source.", 0) \
// End of FORMAT_FACTORY_SETTINGS
// Please add settings non-related to formats into the COMMON_SETTINGS above.

View File

@ -4,16 +4,22 @@
#include <Common/HashTable/HashMap.h>
#include <Functions/FunctionHelpers.h>
#include <Dictionaries/ClickHouseDictionarySource.h>
#include <Dictionaries/DictionaryFactory.h>
#include <Dictionaries/DictionarySourceHelpers.h>
#include <Dictionaries/HierarchyDictionariesUtils.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <QueryPipeline/QueryPipeline.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/ISource.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <QueryPipeline/QueryPipeline.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNSUPPORTED_METHOD;
@ -73,9 +79,17 @@ Columns DirectDictionary<dictionary_key_type>::getColumns(
PullingPipelineExecutor executor(pipeline);
Stopwatch watch;
Block block;
size_t block_num = 0;
size_t rows_num = 0;
while (executor.pull(block))
{
if (!block)
continue;
++block_num;
rows_num += block.rows();
convertToFullIfSparse(block);
/// Split into keys columns and attribute columns
@ -104,6 +118,9 @@ Columns DirectDictionary<dictionary_key_type>::getColumns(
block_key_columns.clear();
}
LOG_DEBUG(&Poco::Logger::get("DirectDictionary"), "read {} blocks with {} rows from pipeline in {} ms",
block_num, rows_num, watch.elapsedMilliseconds());
Field value_to_insert;
size_t requested_keys_size = requested_keys.size();
@ -263,6 +280,7 @@ ColumnUInt8::Ptr DirectDictionary<dictionary_key_type>::isInHierarchy(
return nullptr;
}
template <typename TExecutor = PullingPipelineExecutor>
class SourceFromQueryPipeline : public ISource
{
public:
@ -272,7 +290,10 @@ public:
, executor(pipeline)
{}
std::string getName() const override { return "SourceFromQueryPipeline"; }
std::string getName() const override
{
return std::is_same_v<PullingAsyncPipelineExecutor, TExecutor> ? "SourceFromQueryPipelineAsync" : "SourceFromQueryPipeline";
}
Chunk generate() override
{
@ -286,10 +307,9 @@ public:
return {};
}
private:
QueryPipeline pipeline;
PullingPipelineExecutor executor;
TExecutor executor;
};
template <DictionaryKeyType dictionary_key_type>
@ -297,6 +317,8 @@ Pipe DirectDictionary<dictionary_key_type>::getSourcePipe(
const Columns & key_columns [[maybe_unused]],
const PaddedPODArray<KeyType> & requested_keys [[maybe_unused]]) const
{
Stopwatch watch;
size_t requested_keys_size = requested_keys.size();
Pipe pipe;
@ -309,7 +331,12 @@ Pipe DirectDictionary<dictionary_key_type>::getSourcePipe(
for (auto key : requested_keys)
ids.emplace_back(key);
pipe = Pipe(std::make_shared<SourceFromQueryPipeline>(source_ptr->loadIds(ids)));
auto pipeline = source_ptr->loadIds(ids);
if (use_async_executor)
pipe = Pipe(std::make_shared<SourceFromQueryPipeline<PullingAsyncPipelineExecutor>>(std::move(pipeline)));
else
pipe = Pipe(std::make_shared<SourceFromQueryPipeline<PullingPipelineExecutor>>(std::move(pipeline)));
}
else
{
@ -318,16 +345,31 @@ Pipe DirectDictionary<dictionary_key_type>::getSourcePipe(
for (size_t i = 0; i < requested_keys_size; ++i)
requested_rows.emplace_back(i);
pipe = Pipe(std::make_shared<SourceFromQueryPipeline>(source_ptr->loadKeys(key_columns, requested_rows)));
auto pipeline = source_ptr->loadKeys(key_columns, requested_rows);
if (use_async_executor)
pipe = Pipe(std::make_shared<SourceFromQueryPipeline<PullingAsyncPipelineExecutor>>(std::move(pipeline)));
else
pipe = Pipe(std::make_shared<SourceFromQueryPipeline<PullingPipelineExecutor>>(std::move(pipeline)));
}
LOG_DEBUG(&Poco::Logger::get("DirectDictionary"), "building pipeline for loading keys done in {} ms", watch.elapsedMilliseconds());
return pipe;
}
template <DictionaryKeyType dictionary_key_type>
Pipe DirectDictionary<dictionary_key_type>::read(const Names & /* column_names */, size_t /* max_block_size */, size_t /* num_streams */) const
{
return Pipe(std::make_shared<SourceFromQueryPipeline>(source_ptr->loadAll()));
return Pipe(std::make_shared<SourceFromQueryPipeline<>>(source_ptr->loadAll()));
}
template <DictionaryKeyType dictionary_key_type>
void DirectDictionary<dictionary_key_type>::applySettings(const Settings & settings)
{
if (dynamic_cast<const ClickHouseDictionarySource *>(source_ptr.get()))
{
/// Only applicable for CLICKHOUSE dictionary source.
use_async_executor = settings.dictionary_use_async_executor;
}
}
namespace
@ -339,7 +381,7 @@ namespace
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DictionarySourcePtr source_ptr,
ContextPtr /* global_context */,
ContextPtr global_context,
bool /* created_from_ddl */)
{
const auto * layout_name = dictionary_key_type == DictionaryKeyType::Simple ? "direct" : "complex_key_direct";
@ -372,7 +414,12 @@ namespace
"'lifetime' parameter is redundant for the dictionary' of layout '{}'",
layout_name);
return std::make_unique<DirectDictionary<dictionary_key_type>>(dict_id, dict_struct, std::move(source_ptr));
auto dictionary = std::make_unique<DirectDictionary<dictionary_key_type>>(dict_id, dict_struct, std::move(source_ptr));
auto context = copyContextAndApplySettingsFromDictionaryConfig(global_context, config, config_prefix);
dictionary->applySettings(context->getSettingsRef());
return dictionary;
}
}

View File

@ -95,6 +95,8 @@ public:
Pipe read(const Names & column_names, size_t max_block_size, size_t num_streams) const override;
void applySettings(const Settings & settings);
private:
Pipe getSourcePipe(const Columns & key_columns, const PaddedPODArray<KeyType> & requested_keys) const;
@ -102,6 +104,8 @@ private:
const DictionarySourcePtr source_ptr;
const DictionaryLifetime dict_lifetime;
bool use_async_executor = false;
mutable std::atomic<size_t> query_count{0};
mutable std::atomic<size_t> found_count{0};
};

View File

@ -75,7 +75,7 @@ CREATE DICTIONARY db_01268.dict2
)
PRIMARY KEY region_id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() USER 'default' TABLE 'table_for_dict2' PASSWORD '' DB 'database_for_dict_01268'))
LAYOUT(DIRECT());
LAYOUT(DIRECT()) SETTINGS(dictionary_use_async_executor=1, max_threads=8);
CREATE DICTIONARY db_01268.dict3
(