Merge pull request #55839 from ClickHouse/vdimir/async_executor_for_dictionary_load

Use AsyncPipelineExecutor all dictionaries
This commit is contained in:
vdimir 2023-10-24 11:31:36 +02:00 committed by GitHub
commit da6f3346fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 253 additions and 76 deletions

View File

@ -4768,3 +4768,18 @@ a Tuple(
l Nullable(String)
)
```
## dictionary_use_async_executor {#dictionary_use_async_executor}
Execute the pipeline for reading a dictionary source in several threads. It is supported only by dictionaries with a local CLICKHOUSE source.
You may specify it in the `SETTINGS` section of the dictionary definition:
```sql
CREATE DICTIONARY t1_dict ( key String, attr UInt64 )
PRIMARY KEY key
SOURCE(CLICKHOUSE(QUERY `SELECT key, attr FROM t1 GROUP BY key`))
LIFETIME(MIN 0 MAX 3600)
LAYOUT(COMPLEX_KEY_HASHED_ARRAY())
SETTINGS(dictionary_use_async_executor=1, max_threads=8);
```

View File

@ -1073,7 +1073,7 @@ class IColumn;
M(Bool, regexp_dict_flag_case_insensitive, false, "Use case-insensitive matching for a regexp_tree dictionary. Can be overridden in individual expressions with (?i) and (?-i).", 0) \
M(Bool, regexp_dict_flag_dotall, false, "Allow '.' to match newline characters for a regexp_tree dictionary.", 0) \
\
M(Bool, dictionary_use_async_executor, false, "Execute a pipeline for reading from a dictionary with several threads. It's supported only by DIRECT dictionary with CLICKHOUSE source.", 0) \
M(Bool, dictionary_use_async_executor, false, "Execute a pipeline for reading dictionary source in several threads. It's supported only by dictionaries with local CLICKHOUSE source.", 0) \
M(Bool, precise_float_parsing, false, "Prefer more precise (but slower) float parsing algorithm", 0) \
// End of FORMAT_FACTORY_SETTINGS

View File

@ -10,10 +10,10 @@
#include <Common/ProfileEvents.h>
#include <Common/ProfilingScopedRWLock.h>
#include <Dictionaries//DictionarySource.h>
#include <Dictionaries/DictionarySource.h>
#include <Dictionaries/DictionarySourceHelpers.h>
#include <Dictionaries/HierarchyDictionariesUtils.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
namespace ProfileEvents
@ -50,8 +50,7 @@ CacheDictionary<dictionary_key_type>::CacheDictionary(
DictionarySourcePtr source_ptr_,
CacheDictionaryStoragePtr cache_storage_ptr_,
CacheDictionaryUpdateQueueConfiguration update_queue_configuration_,
DictionaryLifetime dict_lifetime_,
bool allow_read_expired_keys_)
CacheDictionaryConfiguration configuration_)
: IDictionary(dict_id_)
, dict_struct(dict_struct_)
, source_ptr{std::move(source_ptr_)}
@ -63,9 +62,8 @@ CacheDictionary<dictionary_key_type>::CacheDictionary(
{
update(unit_to_update);
})
, dict_lifetime(dict_lifetime_)
, configuration(configuration_)
, log(&Poco::Logger::get("ExternalDictionaries"))
, allow_read_expired_keys(allow_read_expired_keys_)
, rnd_engine(randomSeed())
{
if (!source_ptr->supportsSelectiveLoad())
@ -209,7 +207,7 @@ Columns CacheDictionary<dictionary_key_type>::getColumns(
HashMap<KeyType, size_t> requested_keys_to_fetched_columns_during_update_index;
MutableColumns fetched_columns_during_update = request.makeAttributesResultColumns();
if (not_found_keys_size == 0 && expired_keys_size > 0 && allow_read_expired_keys)
if (not_found_keys_size == 0 && expired_keys_size > 0 && configuration.allow_read_expired_keys)
{
/// Start an async update only if reading expired keys is allowed and all keys are found
update_queue.tryPushToUpdateQueueOrThrow(update_unit);
@ -314,7 +312,7 @@ ColumnUInt8::Ptr CacheDictionary<dictionary_key_type>::hasKeys(const Columns & k
allow_expired_keys_during_aggregation = true;
}
else if (not_found_keys_size == 0 && expired_keys_size > 0 && allow_read_expired_keys)
else if (not_found_keys_size == 0 && expired_keys_size > 0 && configuration.allow_read_expired_keys)
{
/// Start an async update only if reading expired keys is allowed and all keys are found
update_queue.tryPushToUpdateQueueOrThrow(update_unit);
@ -589,7 +587,7 @@ void CacheDictionary<dictionary_key_type>::update(CacheDictionaryUpdateUnitPtr<d
Columns fetched_columns_during_update = fetch_request.makeAttributesResultColumnsNonMutable();
PullingPipelineExecutor executor(pipeline);
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
Block block;
while (executor.pull(block))
{

View File

@ -24,6 +24,14 @@
namespace DB
{
/// Immutable settings bundle passed to CacheDictionary at construction and stored by it.
/// Field order matters: the struct is filled positionally by the layout factory.
struct CacheDictionaryConfiguration
{
/// When true, a request in which every key was found but some are expired
/// pushes its update unit to the update queue (asynchronous update).
const bool allow_read_expired_keys;
/// Lifetime reported by CacheDictionary::getLifetime().
const DictionaryLifetime lifetime;
/// Forwarded to DictionaryPipelineExecutor when the dictionary pulls blocks from its source pipeline.
const bool use_async_executor = false;
};
/** CacheDictionary stores keys in cache storage and can perform asynchronous and synchronous updates during key fetches.
If keys are not found in storage during a fetch, the dictionary starts an update operation using the update queue.
@ -58,8 +66,7 @@ public:
DictionarySourcePtr source_ptr_,
CacheDictionaryStoragePtr cache_storage_ptr_,
CacheDictionaryUpdateQueueConfiguration update_queue_configuration_,
DictionaryLifetime dict_lifetime_,
bool allow_read_expired_keys_);
CacheDictionaryConfiguration configuration_);
~CacheDictionary() override;
@ -99,13 +106,12 @@ public:
getSourceAndUpdateIfNeeded()->clone(),
cache_storage_ptr,
update_queue.getConfiguration(),
dict_lifetime,
allow_read_expired_keys);
configuration);
}
DictionarySourcePtr getSource() const override;
const DictionaryLifetime & getLifetime() const override { return dict_lifetime; }
const DictionaryLifetime & getLifetime() const override { return configuration.lifetime; }
const DictionaryStructure & getStructure() const override { return dict_struct; }
@ -194,12 +200,10 @@ private:
CacheDictionaryStoragePtr cache_storage_ptr;
mutable CacheDictionaryUpdateQueue<dictionary_key_type> update_queue;
const DictionaryLifetime dict_lifetime;
const CacheDictionaryConfiguration configuration;
Poco::Logger * log;
const bool allow_read_expired_keys;
mutable pcg64 rnd_engine;
/// This lock is used for the inner cache state update function lock it for

View File

@ -59,6 +59,8 @@ public:
bool hasUpdateField() const override;
bool isLocal() const { return configuration.is_local; }
DictionarySourcePtr clone() const override { return std::make_shared<ClickHouseDictionarySource>(*this); }
std::string toString() const override;

View File

@ -9,11 +9,15 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/SettingsChanges.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
}
@ -130,4 +134,30 @@ String TransformWithAdditionalColumns::getName() const
{
return "TransformWithAdditionalColumns";
}
/// Creates exactly one of the two underlying executors for `pipeline_`:
/// the asynchronous one when `async` is true, the synchronous one otherwise.
/// The other pointer is left null.
DictionaryPipelineExecutor::DictionaryPipelineExecutor(QueryPipeline & pipeline_, bool async)
{
    if (async)
        async_executor = std::make_unique<PullingAsyncPipelineExecutor>(pipeline_);
    else
        executor = std::make_unique<PullingPipelineExecutor>(pipeline_);
}
/// Pulls the next block from whichever executor was created in the constructor.
/// Returns the value reported by the underlying executor's pull().
/// Throws LOGICAL_ERROR if neither executor exists.
bool DictionaryPipelineExecutor::pull(Block & block)
{
    if (!async_executor)
    {
        if (!executor)
            throw Exception(ErrorCodes::LOGICAL_ERROR, "DictionaryPipelineExecutor is not initialized");

        return executor->pull(block);
    }

    while (true)
    {
        const bool has_data = async_executor->pull(block);

        /// The async executor may report data while handing back an empty block;
        /// keep pulling until it is finished or a non-empty block arrives.
        if (!has_data || block)
            return has_data;
    }
}
/// Defaulted here, in the translation unit that includes both executor headers.
/// NOTE(review): presumably needed because the class declaration only sees forward
/// declarations of the executor types held in std::unique_ptr — confirm against the header.
DictionaryPipelineExecutor::~DictionaryPipelineExecutor() = default;
}

View File

@ -16,6 +16,10 @@ namespace DB
struct DictionaryStructure;
class SettingsChanges;
class PullingPipelineExecutor;
class PullingAsyncPipelineExecutor;
class QueryPipeline;
/// For simple key
Block blockForIds(
@ -51,4 +55,17 @@ private:
size_t current_range_index = 0;
};
/// Thin facade over `PullingPipelineExecutor` / `PullingAsyncPipelineExecutor`.
/// The `async` constructor flag decides which executor is used;
/// callers only see a single `pull` entry point.
class DictionaryPipelineExecutor
{
public:
    /// `async == true` selects the asynchronous executor, otherwise the synchronous one.
    DictionaryPipelineExecutor(QueryPipeline & pipeline_, bool async);
    ~DictionaryPipelineExecutor();

    /// Fetches the next block into `block`; the return value comes from the underlying executor.
    bool pull(Block & block);

private:
    /// One pointer per executor kind.
    std::unique_ptr<PullingAsyncPipelineExecutor> async_executor;
    std::unique_ptr<PullingPipelineExecutor> executor;
};
}

View File

@ -366,10 +366,10 @@ Pipe DirectDictionary<dictionary_key_type>::read(const Names & /* column_names *
template <DictionaryKeyType dictionary_key_type>
void DirectDictionary<dictionary_key_type>::applySettings(const Settings & settings)
{
if (dynamic_cast<const ClickHouseDictionarySource *>(source_ptr.get()))
if (const auto * clickhouse_source = dynamic_cast<const ClickHouseDictionarySource *>(source_ptr.get()))
{
/// Only applicable for CLICKHOUSE dictionary source.
use_async_executor = settings.dictionary_use_async_executor;
use_async_executor = settings.dictionary_use_async_executor && clickhouse_source->isLocal();
}
}

View File

@ -12,9 +12,9 @@
#include <Functions/FunctionHelpers.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Dictionaries/DictionarySource.h>
#include <Dictionaries/DictionarySourceHelpers.h>
#include <Dictionaries/DictionaryFactory.h>
#include <Dictionaries/HierarchyDictionariesUtils.h>
@ -395,7 +395,7 @@ void FlatDictionary::updateData()
if (!update_field_loaded_block || update_field_loaded_block->rows() == 0)
{
QueryPipeline pipeline(source_ptr->loadUpdatedAll());
PullingPipelineExecutor executor(pipeline);
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
update_field_loaded_block.reset();
Block block;
@ -436,7 +436,7 @@ void FlatDictionary::loadData()
if (!source_ptr->hasUpdateField())
{
QueryPipeline pipeline(source_ptr->loadAll());
PullingPipelineExecutor executor(pipeline);
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
Block block;
while (executor.pull(block))

View File

@ -27,6 +27,7 @@ public:
size_t max_array_size;
bool require_nonempty;
DictionaryLifetime dict_lifetime;
bool use_async_executor = false;
};
FlatDictionary(

View File

@ -7,11 +7,12 @@
#include <Columns/ColumnNullable.h>
#include <Functions/FunctionHelpers.h>
#include <Dictionaries/ClickHouseDictionarySource.h>
#include <Dictionaries/DictionarySource.h>
#include <Dictionaries/DictionarySourceHelpers.h>
#include <Dictionaries/DictionaryFactory.h>
#include <Dictionaries/HierarchyDictionariesUtils.h>
namespace DB
{
@ -409,7 +410,7 @@ void HashedArrayDictionary<dictionary_key_type>::updateData()
if (!update_field_loaded_block || update_field_loaded_block->rows() == 0)
{
QueryPipeline pipeline(source_ptr->loadUpdatedAll());
PullingPipelineExecutor executor(pipeline);
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
update_field_loaded_block.reset();
Block block;
@ -533,12 +534,12 @@ void HashedArrayDictionary<dictionary_key_type>::blockToAttributes(const Block &
}
template <DictionaryKeyType dictionary_key_type>
void HashedArrayDictionary<dictionary_key_type>::resize(size_t added_rows)
void HashedArrayDictionary<dictionary_key_type>::resize(size_t total_rows)
{
if (unlikely(!added_rows))
if (unlikely(!total_rows))
return;
key_attribute.container.reserve(added_rows);
key_attribute.container.reserve(total_rows);
}
template <DictionaryKeyType dictionary_key_type>
@ -727,14 +728,37 @@ void HashedArrayDictionary<dictionary_key_type>::loadData()
{
QueryPipeline pipeline;
pipeline = QueryPipeline(source_ptr->loadAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
UInt64 pull_time_microseconds = 0;
UInt64 process_time_microseconds = 0;
size_t total_rows = 0;
size_t total_blocks = 0;
PullingPipelineExecutor executor(pipeline);
Block block;
while (executor.pull(block))
while (true)
{
resize(block.rows());
Stopwatch watch_pull;
bool has_data = executor.pull(block);
pull_time_microseconds += watch_pull.elapsedMicroseconds();
if (!has_data)
break;
++total_blocks;
total_rows += block.rows();
Stopwatch watch_process;
resize(total_rows);
blockToAttributes(block);
process_time_microseconds += watch_process.elapsedMicroseconds();
}
LOG_DEBUG(&Poco::Logger::get("HashedArrayDictionary"),
"Finished {}reading {} blocks with {} rows from pipeline in {:.2f} sec and inserted into hashtable in {:.2f} sec",
configuration.use_async_executor ? "asynchronous " : "",
total_blocks, total_rows, pull_time_microseconds / 1000000.0, process_time_microseconds / 1000000.0);
}
else
{
@ -843,6 +867,7 @@ void registerDictionaryArrayHashed(DictionaryFactory & factory)
const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
ContextPtr global_context,
DictionarySourcePtr source_ptr,
DictionaryKeyType dictionary_key_type) -> DictionaryPtr
{
@ -863,6 +888,12 @@ void registerDictionaryArrayHashed(DictionaryFactory & factory)
HashedArrayDictionaryStorageConfiguration configuration{require_nonempty, dict_lifetime};
ContextMutablePtr context = copyContextAndApplySettingsFromDictionaryConfig(global_context, config, config_prefix);
const auto & settings = context->getSettingsRef();
const auto * clickhouse_source = dynamic_cast<const ClickHouseDictionarySource *>(source_ptr.get());
configuration.use_async_executor = clickhouse_source && clickhouse_source->isLocal() && settings.dictionary_use_async_executor;
if (dictionary_key_type == DictionaryKeyType::Simple)
return std::make_unique<HashedArrayDictionary<DictionaryKeyType::Simple>>(dict_id, dict_struct, std::move(source_ptr), configuration);
else
@ -872,9 +903,15 @@ void registerDictionaryArrayHashed(DictionaryFactory & factory)
using namespace std::placeholders;
factory.registerLayout("hashed_array",
[=](auto && a, auto && b, auto && c, auto && d, DictionarySourcePtr e, ContextPtr /* global_context */, bool /*created_from_ddl*/){ return create_layout(a, b, c, d, std::move(e), DictionaryKeyType::Simple); }, false);
[=](auto && a, auto && b, auto && c, auto && d, DictionarySourcePtr e, ContextPtr global_context, bool /*created_from_ddl*/)
{
return create_layout(a, b, c, d, global_context, std::move(e), DictionaryKeyType::Simple);
}, false);
factory.registerLayout("complex_key_hashed_array",
[=](auto && a, auto && b, auto && c, auto && d, DictionarySourcePtr e, ContextPtr /* global_context */, bool /*created_from_ddl*/){ return create_layout(a, b, c, d, std::move(e), DictionaryKeyType::Complex); }, true);
[=](auto && a, auto && b, auto && c, auto && d, DictionarySourcePtr e, ContextPtr global_context, bool /*created_from_ddl*/)
{
return create_layout(a, b, c, d, global_context, std::move(e), DictionaryKeyType::Complex);
}, true);
}
}

View File

@ -25,6 +25,7 @@ struct HashedArrayDictionaryStorageConfiguration
{
const bool require_nonempty;
const DictionaryLifetime lifetime;
bool use_async_executor = false;
};
template <DictionaryKeyType dictionary_key_type>
@ -212,7 +213,7 @@ private:
template <typename GetContainerFunc>
void getAttributeContainer(size_t attribute_index, GetContainerFunc && get_container_func) const;
void resize(size_t added_rows);
void resize(size_t total_rows);
const DictionaryStructure dict_struct;
const DictionarySourcePtr source_ptr;

View File

@ -20,7 +20,9 @@
#include <Columns/ColumnNullable.h>
#include <Functions/FunctionHelpers.h>
#include <Dictionaries/ClickHouseDictionarySource.h>
#include <Dictionaries/DictionarySource.h>
#include <Dictionaries/DictionarySourceHelpers.h>
#include <Dictionaries/DictionaryFactory.h>
#include <Dictionaries/HierarchyDictionariesUtils.h>
#include <Dictionaries/HashedDictionaryCollectionTraits.h>
@ -709,7 +711,7 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::updateData()
if (!update_field_loaded_block || update_field_loaded_block->rows() == 0)
{
QueryPipeline pipeline(source_ptr->loadUpdatedAll());
PullingPipelineExecutor executor(pipeline);
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
update_field_loaded_block.reset();
Block block;
@ -938,7 +940,7 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::loadData()
QueryPipeline pipeline = QueryPipeline(source_ptr->loadAll());
PullingPipelineExecutor executor(pipeline);
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
Block block;
DictionaryKeysArenaHolder<dictionary_key_type> arena_holder;
@ -1147,6 +1149,7 @@ void registerDictionaryHashed(DictionaryFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DictionarySourcePtr source_ptr,
ContextPtr global_context,
DictionaryKeyType dictionary_key_type,
bool sparse) -> DictionaryPtr
{
@ -1189,12 +1192,19 @@ void registerDictionaryHashed(DictionaryFactory & factory)
if (max_load_factor < 0.5f || max_load_factor > 0.99f)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "{}: max_load_factor parameter should be within [0.5, 0.99], got {}", full_name, max_load_factor);
ContextMutablePtr context = copyContextAndApplySettingsFromDictionaryConfig(global_context, config, config_prefix);
const auto & settings = context->getSettingsRef();
const auto * clickhouse_source = dynamic_cast<const ClickHouseDictionarySource *>(source_ptr.get());
bool use_async_executor = clickhouse_source && clickhouse_source->isLocal() && settings.dictionary_use_async_executor;
HashedDictionaryConfiguration configuration{
static_cast<UInt64>(shards),
static_cast<UInt64>(shard_load_queue_backlog),
max_load_factor,
require_nonempty,
dict_lifetime,
use_async_executor,
};
if (source_ptr->hasUpdateField() && shards > 1)
@ -1239,13 +1249,13 @@ void registerDictionaryHashed(DictionaryFactory & factory)
using namespace std::placeholders;
factory.registerLayout("hashed",
[=](auto && a, auto && b, auto && c, auto && d, DictionarySourcePtr e, ContextPtr /* global_context */, bool /*created_from_ddl*/){ return create_layout(a, b, c, d, std::move(e), DictionaryKeyType::Simple, /* sparse = */ false); }, false);
[=](auto && a, auto && b, auto && c, auto && d, DictionarySourcePtr e, ContextPtr global_context, bool /*created_from_ddl*/){ return create_layout(a, b, c, d, std::move(e), global_context, DictionaryKeyType::Simple, /* sparse = */ false); }, false);
factory.registerLayout("sparse_hashed",
[=](auto && a, auto && b, auto && c, auto && d, DictionarySourcePtr e, ContextPtr /* global_context */, bool /*created_from_ddl*/){ return create_layout(a, b, c, d, std::move(e), DictionaryKeyType::Simple, /* sparse = */ true); }, false);
[=](auto && a, auto && b, auto && c, auto && d, DictionarySourcePtr e, ContextPtr global_context, bool /*created_from_ddl*/){ return create_layout(a, b, c, d, std::move(e), global_context, DictionaryKeyType::Simple, /* sparse = */ true); }, false);
factory.registerLayout("complex_key_hashed",
[=](auto && a, auto && b, auto && c, auto && d, DictionarySourcePtr e, ContextPtr /* global_context */, bool /*created_from_ddl*/){ return create_layout(a, b, c, d, std::move(e), DictionaryKeyType::Complex, /* sparse = */ false); }, true);
[=](auto && a, auto && b, auto && c, auto && d, DictionarySourcePtr e, ContextPtr global_context, bool /*created_from_ddl*/){ return create_layout(a, b, c, d, std::move(e), global_context, DictionaryKeyType::Complex, /* sparse = */ false); }, true);
factory.registerLayout("complex_key_sparse_hashed",
[=](auto && a, auto && b, auto && c, auto && d, DictionarySourcePtr e, ContextPtr /* global_context */, bool /*created_from_ddl*/){ return create_layout(a, b, c, d, std::move(e), DictionaryKeyType::Complex, /* sparse = */ true); }, true);
[=](auto && a, auto && b, auto && c, auto && d, DictionarySourcePtr e, ContextPtr global_context, bool /*created_from_ddl*/){ return create_layout(a, b, c, d, std::move(e), global_context, DictionaryKeyType::Complex, /* sparse = */ true); }, true);
}

View File

@ -28,6 +28,7 @@ struct HashedDictionaryConfiguration
const float max_load_factor;
const bool require_nonempty;
const DictionaryLifetime lifetime;
bool use_async_executor = false;
};
template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded>

View File

@ -16,7 +16,9 @@
#include <base/map.h>
#include <base/range.h>
#include <base/sort.h>
#include <Dictionaries/ClickHouseDictionarySource.h>
#include <Dictionaries/DictionarySource.h>
#include <Dictionaries/DictionarySourceHelpers.h>
#include <Dictionaries/DictionaryFactory.h>
#include <Functions/FunctionHelpers.h>
@ -197,13 +199,11 @@ IPAddressDictionary::IPAddressDictionary(
const StorageID & dict_id_,
const DictionaryStructure & dict_struct_,
DictionarySourcePtr source_ptr_,
const DictionaryLifetime dict_lifetime_,
bool require_nonempty_)
IPAddressDictionary::Configuration configuration_)
: IDictionary(dict_id_)
, dict_struct(dict_struct_)
, source_ptr{std::move(source_ptr_)}
, dict_lifetime(dict_lifetime_)
, require_nonempty(require_nonempty_)
, configuration(configuration_)
, access_to_key_from_attributes(dict_struct_.access_to_key_from_attributes)
, logger(&Poco::Logger::get("IPAddressDictionary"))
{
@ -369,7 +369,7 @@ void IPAddressDictionary::loadData()
bool has_ipv6 = false;
PullingPipelineExecutor executor(pipeline);
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
Block block;
while (executor.pull(block))
{
@ -525,7 +525,7 @@ void IPAddressDictionary::loadData()
LOG_TRACE(logger, "{} ip records are read", ip_records.size());
if (require_nonempty && 0 == element_count)
if (configuration.require_nonempty && 0 == element_count)
throw Exception(ErrorCodes::DICTIONARY_IS_EMPTY, "{}: dictionary source is empty and 'require_nonempty' property is set.", getFullName());
}
@ -971,7 +971,7 @@ void registerDictionaryTrie(DictionaryFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DictionarySourcePtr source_ptr,
ContextPtr /* global_context */,
ContextPtr global_context,
bool /*created_from_ddl*/) -> DictionaryPtr
{
if (!dict_struct.key || dict_struct.key->size() != 1)
@ -981,8 +981,17 @@ void registerDictionaryTrie(DictionaryFactory & factory)
const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime"};
const bool require_nonempty = config.getBool(config_prefix + ".require_nonempty", false);
auto context = copyContextAndApplySettingsFromDictionaryConfig(global_context, config, config_prefix);
const auto * clickhouse_source = dynamic_cast<const ClickHouseDictionarySource *>(source_ptr.get());
bool use_async_executor = clickhouse_source && clickhouse_source->isLocal() && context->getSettingsRef().dictionary_use_async_executor;
IPAddressDictionary::Configuration configuration{
.dict_lifetime = dict_lifetime,
.require_nonempty = require_nonempty,
.use_async_executor = use_async_executor,
};
// This is a specialised dictionary for storing IPv4 and IPv6 prefixes.
return std::make_unique<IPAddressDictionary>(dict_id, dict_struct, std::move(source_ptr), dict_lifetime, require_nonempty);
return std::make_unique<IPAddressDictionary>(dict_id, dict_struct, std::move(source_ptr), configuration);
};
factory.registerLayout("ip_trie", create_layout, true);
}

View File

@ -22,12 +22,18 @@ class Arena;
class IPAddressDictionary final : public IDictionary
{
public:
/// Construction-time settings of IPAddressDictionary; stored as a const member.
struct Configuration
{
/// Lifetime reported by getLifetime().
DictionaryLifetime dict_lifetime;
/// When true, loadData() throws DICTIONARY_IS_EMPTY if the source produced no elements.
bool require_nonempty;
/// Forwarded to DictionaryPipelineExecutor when loadData() reads the source pipeline.
bool use_async_executor = false;
};
IPAddressDictionary(
const StorageID & dict_id_,
const DictionaryStructure & dict_struct_,
DictionarySourcePtr source_ptr_,
const DictionaryLifetime dict_lifetime_, /// NOLINT
bool require_nonempty_);
Configuration configuration_);
std::string getKeyDescription() const { return key_description; }
@ -53,12 +59,12 @@ public:
std::shared_ptr<const IExternalLoadable> clone() const override
{
return std::make_shared<IPAddressDictionary>(getDictionaryID(), dict_struct, source_ptr->clone(), dict_lifetime, require_nonempty);
return std::make_shared<IPAddressDictionary>(getDictionaryID(), dict_struct, source_ptr->clone(), configuration);
}
DictionarySourcePtr getSource() const override { return source_ptr; }
const DictionaryLifetime & getLifetime() const override { return dict_lifetime; }
const DictionaryLifetime & getLifetime() const override { return configuration.dict_lifetime; }
const DictionaryStructure & getStructure() const override { return dict_struct; }
@ -199,8 +205,7 @@ private:
DictionaryStructure dict_struct;
const DictionarySourcePtr source_ptr;
const DictionaryLifetime dict_lifetime;
const bool require_nonempty;
const Configuration configuration;
const bool access_to_key_from_attributes;
const std::string key_description{dict_struct.getKeyDescription()};

View File

@ -14,6 +14,7 @@
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Dictionaries/DictionaryFactory.h>
#include <Dictionaries/DictionarySource.h>
#include <Dictionaries/DictionarySourceHelpers.h>
namespace DB
@ -231,7 +232,7 @@ void IPolygonDictionary::loadData()
{
QueryPipeline pipeline(source_ptr->loadAll());
PullingPipelineExecutor executor(pipeline);
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
Block block;
while (executor.pull(block))
blockToAttributes(block);

View File

@ -56,6 +56,8 @@ public:
/// Store the polygon key column. This allows reading columns from the polygon dictionary.
bool store_polygon_key_column = false;
bool use_async_executor = false;
};
IPolygonDictionary(

View File

@ -4,6 +4,8 @@
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypesNumber.h>
#include <Dictionaries/ClickHouseDictionarySource.h>
#include <Dictionaries/DictionarySourceHelpers.h>
#include <Common/logger_useful.h>
@ -161,7 +163,7 @@ DictionaryPtr createLayout(const std::string & ,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DictionarySourcePtr source_ptr,
ContextPtr /* global_context */,
ContextPtr global_context,
bool /*created_from_ddl*/)
{
const String database = config.getString(config_prefix + ".database", "");
@ -219,11 +221,16 @@ DictionaryPtr createLayout(const std::string & ,
config.keys(layout_prefix, keys);
const auto & dict_prefix = layout_prefix + "." + keys.front();
ContextMutablePtr context = copyContextAndApplySettingsFromDictionaryConfig(global_context, config, config_prefix);
const auto * clickhouse_source = dynamic_cast<const ClickHouseDictionarySource *>(source_ptr.get());
bool use_async_executor = clickhouse_source && clickhouse_source->isLocal() && context->getSettingsRef().dictionary_use_async_executor;
IPolygonDictionary::Configuration configuration
{
.input_type = input_type,
.point_type = point_type,
.store_polygon_key_column = config.getBool(dict_prefix + ".store_polygon_key_column", false)
.store_polygon_key_column = config.getBool(dict_prefix + ".store_polygon_key_column", false),
.use_async_executor = use_async_executor,
};
if (dict_struct.range_min || dict_struct.range_max)

View File

@ -29,7 +29,9 @@
#include <Functions/FunctionHelpers.h>
#include <Interpreters/castColumn.h>
#include <Dictionaries/ClickHouseDictionarySource.h>
#include <Dictionaries/DictionarySource.h>
#include <Dictionaries/DictionarySourceHelpers.h>
namespace DB
@ -56,6 +58,7 @@ struct RangeHashedDictionaryConfiguration
bool convert_null_range_bound_to_open;
RangeHashedDictionaryLookupStrategy lookup_strategy;
bool require_nonempty;
bool use_async_executor = false;
};
template <DictionaryKeyType dictionary_key_type>
@ -655,7 +658,7 @@ void RangeHashedDictionary<dictionary_key_type>::loadData()
if (!source_ptr->hasUpdateField())
{
QueryPipeline pipeline(source_ptr->loadAll());
PullingPipelineExecutor executor(pipeline);
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
Block block;
while (executor.pull(block))
@ -919,7 +922,7 @@ void RangeHashedDictionary<dictionary_key_type>::updateData()
if (!update_field_loaded_block || update_field_loaded_block->rows() == 0)
{
QueryPipeline pipeline(source_ptr->loadUpdatedAll());
PullingPipelineExecutor executor(pipeline);
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
update_field_loaded_block.reset();
Block block;

View File

@ -310,7 +310,7 @@ void RegExpTreeDictionary::loadData()
if (!source_ptr->hasUpdateField())
{
QueryPipeline pipeline(source_ptr->loadAll());
PullingPipelineExecutor executor(pipeline);
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
Block block;
while (executor.pull(block))
@ -867,12 +867,17 @@ void registerDictionaryRegExpTree(DictionaryFactory & factory)
String dictionary_layout_prefix = config_prefix + ".layout" + ".regexp_tree";
const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime"};
RegExpTreeDictionary::Configuration configuration{
.require_nonempty = config.getBool(config_prefix + ".require_nonempty", false), .lifetime = dict_lifetime};
const auto dict_id = StorageID::fromDictionaryConfig(config, config_prefix);
auto context = copyContextAndApplySettingsFromDictionaryConfig(global_context, config, config_prefix);
const auto * clickhouse_source = typeid_cast<const ClickHouseDictionarySource *>(source_ptr.get());
bool use_async_executor = clickhouse_source && clickhouse_source->isLocal() && context->getSettingsRef().dictionary_use_async_executor;
RegExpTreeDictionary::Configuration configuration{
.require_nonempty = config.getBool(config_prefix + ".require_nonempty", false),
.lifetime = dict_lifetime,
.use_async_executor = use_async_executor,
};
return std::make_unique<RegExpTreeDictionary>(
dict_id,

View File

@ -40,6 +40,7 @@ public:
{
bool require_nonempty;
DictionaryLifetime lifetime;
bool use_async_executor = false;
};
const std::string name = "RegExpTree";

View File

@ -2,7 +2,10 @@
#include "CacheDictionaryStorage.h"
#include "SSDCacheDictionaryStorage.h"
#include <Common/filesystemHelpers.h>
#include <Dictionaries/ClickHouseDictionarySource.h>
#include <Dictionaries/DictionaryFactory.h>
#include <Dictionaries/DictionarySourceHelpers.h>
#include <Interpreters/Context.h>
namespace DB
@ -222,6 +225,16 @@ DictionaryPtr createCacheDictionaryLayout(
storage = std::make_shared<SSDCacheDictionaryStorage<dictionary_key_type>>(storage_configuration);
}
#endif
ContextMutablePtr context = copyContextAndApplySettingsFromDictionaryConfig(global_context, config, config_prefix);
const auto & settings = context->getSettingsRef();
const auto * clickhouse_source = dynamic_cast<const ClickHouseDictionarySource *>(source_ptr.get());
bool use_async_executor = clickhouse_source && clickhouse_source->isLocal() && settings.dictionary_use_async_executor;
CacheDictionaryConfiguration configuration{
allow_read_expired_keys,
dict_lifetime,
use_async_executor,
};
auto dictionary = std::make_unique<CacheDictionary<dictionary_key_type>>(
dictionary_identifier,
@ -229,8 +242,7 @@ DictionaryPtr createCacheDictionaryLayout(
std::move(source_ptr),
std::move(storage),
update_queue_configuration,
dict_lifetime,
allow_read_expired_keys);
configuration);
return dictionary;
}

View File

@ -16,6 +16,7 @@ static DictionaryPtr createRangeHashedDictionary(const std::string & full_name,
const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
ContextPtr global_context,
DictionarySourcePtr source_ptr)
{
static constexpr auto layout_name = dictionary_key_type == DictionaryKeyType::Simple ? "range_hashed" : "complex_key_range_hashed";
@ -52,11 +53,16 @@ static DictionaryPtr createRangeHashedDictionary(const std::string & full_name,
else if (range_lookup_strategy == "max")
lookup_strategy = RangeHashedDictionaryLookupStrategy::max;
auto context = copyContextAndApplySettingsFromDictionaryConfig(global_context, config, config_prefix);
const auto * clickhouse_source = dynamic_cast<const ClickHouseDictionarySource *>(source_ptr.get());
bool use_async_executor = clickhouse_source && clickhouse_source->isLocal() && context->getSettingsRef().dictionary_use_async_executor;
RangeHashedDictionaryConfiguration configuration
{
.convert_null_range_bound_to_open = convert_null_range_bound_to_open,
.lookup_strategy = lookup_strategy,
.require_nonempty = require_nonempty
.require_nonempty = require_nonempty,
.use_async_executor = use_async_executor,
};
DictionaryPtr result = std::make_unique<RangeHashedDictionary<dictionary_key_type>>(
@ -76,10 +82,10 @@ void registerDictionaryRangeHashed(DictionaryFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DictionarySourcePtr source_ptr,
ContextPtr /* global_context */,
ContextPtr global_context,
bool /*created_from_ddl*/) -> DictionaryPtr
{
return createRangeHashedDictionary<DictionaryKeyType::Simple>(full_name, dict_struct, config, config_prefix, std::move(source_ptr));
return createRangeHashedDictionary<DictionaryKeyType::Simple>(full_name, dict_struct, config, config_prefix, global_context, std::move(source_ptr));
};
factory.registerLayout("range_hashed", create_layout_simple, false);
@ -89,10 +95,10 @@ void registerDictionaryRangeHashed(DictionaryFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DictionarySourcePtr source_ptr,
ContextPtr /* context */,
ContextPtr global_context,
bool /*created_from_ddl*/) -> DictionaryPtr
{
return createRangeHashedDictionary<DictionaryKeyType::Complex>(full_name, dict_struct, config, config_prefix, std::move(source_ptr));
return createRangeHashedDictionary<DictionaryKeyType::Complex>(full_name, dict_struct, config, config_prefix, global_context, std::move(source_ptr));
};
factory.registerLayout("complex_key_range_hashed", create_layout_complex, true);

View File

@ -37,7 +37,9 @@ CREATE DICTIONARY {CLICKHOUSE_DATABASE:Identifier}.dict_ipv4_trie
PRIMARY KEY prefix
SOURCE(CLICKHOUSE(host 'localhost' port 9000 user 'default' db currentDatabase() table 'table_ipv4_trie'))
LAYOUT(IP_TRIE())
LIFETIME(MIN 10 MAX 100);
LIFETIME(MIN 10 MAX 100)
SETTINGS(dictionary_use_async_executor=1, max_threads=8)
;
-- fuzzer
SELECT '127.0.0.0/24' = dictGetString({CLICKHOUSE_DATABASE:String} || '.dict_ipv4_trie', 'prefixprefixprefixprefix', tuple(IPv4StringToNumOrDefault('127.0.0.0127.0.0.0'))); -- { serverError 36 }

View File

@ -29,7 +29,9 @@ PRIMARY KEY CountryID
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() USER 'default' TABLE 'date_table' DB 'database_for_range_dict'))
LIFETIME(MIN 1 MAX 1000)
LAYOUT(RANGE_HASHED())
RANGE(MIN StartDate MAX EndDate);
RANGE(MIN StartDate MAX EndDate)
SETTINGS(dictionary_use_async_executor=1, max_threads=8)
;
SELECT 'Dictionary not nullable';
SELECT 'dictGet';

View File

@ -24,7 +24,9 @@ CREATE DICTIONARY 01681_database_for_cache_dictionary.cache_dictionary_simple_ke
PRIMARY KEY id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() USER 'default' TABLE 'simple_key_simple_attributes_source_table'))
LIFETIME(MIN 1 MAX 1000)
LAYOUT(CACHE(SIZE_IN_CELLS 10));
LAYOUT(CACHE(SIZE_IN_CELLS 10))
SETTINGS(dictionary_use_async_executor=1, max_threads=8)
;
SELECT 'Dictionary cache_dictionary_simple_key_simple_attributes';
SELECT 'dictGet existing value';

View File

@ -29,7 +29,9 @@ CREATE DICTIONARY 01760_db.dict_array
PRIMARY KEY key
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() USER 'default' TABLE 'polygons' DB '01760_db'))
LIFETIME(0)
LAYOUT(POLYGON());
LAYOUT(POLYGON())
SETTINGS(dictionary_use_async_executor=1, max_threads=8)
;
SELECT 'dictGet';

View File

@ -24,7 +24,8 @@ CREATE DICTIONARY 01765_db.hashed_dictionary_simple_key_simple_attributes
PRIMARY KEY id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() USER 'default' TABLE 'simple_key_simple_attributes_source_table'))
LIFETIME(MIN 1 MAX 1000)
LAYOUT(HASHED());
LAYOUT(HASHED())
SETTINGS(dictionary_use_async_executor=1, max_threads=8);
SELECT 'Dictionary hashed_dictionary_simple_key_simple_attributes';
SELECT 'dictGet existing value';

View File

@ -21,7 +21,8 @@ CREATE DICTIONARY hashed_array_dictionary_simple_key_simple_attributes
PRIMARY KEY id
SOURCE(CLICKHOUSE(TABLE 'simple_key_simple_attributes_source_table'))
LAYOUT(HASHED_ARRAY())
LIFETIME(MIN 1 MAX 1000);
LIFETIME(MIN 1 MAX 1000)
SETTINGS(dictionary_use_async_executor=1, max_threads=8);
SELECT 'Dictionary hashed_array_dictionary_simple_key_simple_attributes';
SELECT 'dictGet existing value';