Merge pull request #40003 from azat/dict-shards

Add ability to load hashed dictionaries using multiple threads
This commit is contained in:
Maksim Kita 2023-01-18 13:37:10 +03:00 committed by GitHub
commit 8225d2814c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 770 additions and 197 deletions

View File

@ -156,6 +156,33 @@ or
LAYOUT(HASHED(PREALLOCATE 0))
```
If `shards` is greater than 1 (the default is `1`), the dictionary will load data in parallel, which is useful if you have a huge number of elements in one dictionary.
Configuration example:
``` xml
<layout>
<hashed>
<shards>10</shards>
<!-- Size of the backlog for blocks in the parallel queue.
Since the bottleneck in parallel loading is rehash, you need some
backlog to avoid stalling while a thread is doing rehash.
10000 is a good balance between memory and speed.
Even for 10e10 elements it can handle all the load without starvation. -->
<shard_load_queue_backlog>10000</shard_load_queue_backlog>
</hashed>
</layout>
```
or
``` sql
LAYOUT(HASHED(SHARDS 10 [SHARD_LOAD_QUEUE_BACKLOG 10000]))
```
### sparse_hashed
Similar to `hashed`, but uses less memory in favor of more CPU usage.
@ -178,6 +205,8 @@ or
LAYOUT(SPARSE_HASHED([PREALLOCATE 0]))
```
It is also possible to use `shards` for this type of dictionary, and again it is more important for `sparse_hashed` than for `hashed`, since `sparse_hashed` is slower.
### complex_key_hashed
This type of storage is for use with composite [keys](../../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-structure.md). Similar to `hashed`.
@ -186,14 +215,18 @@ Configuration example:
``` xml
<layout>
<complex_key_hashed />
<complex_key_hashed>
<preallocate>0</preallocate>
<shards>1</shards>
<!-- <shard_load_queue_backlog>10000</shard_load_queue_backlog> -->
</complex_key_hashed>
</layout>
```
or
``` sql
LAYOUT(COMPLEX_KEY_HASHED())
LAYOUT(COMPLEX_KEY_HASHED([PREALLOCATE 0] [SHARDS 1] [SHARD_LOAD_QUEUE_BACKLOG 10000]))
```
### complex_key_sparse_hashed
@ -204,14 +237,17 @@ Configuration example:
``` xml
<layout>
<complex_key_sparse_hashed />
<complex_key_sparse_hashed>
<preallocate>0</preallocate>
<shards>1</shards>
</complex_key_sparse_hashed>
</layout>
```
or
``` sql
LAYOUT(COMPLEX_KEY_SPARSE_HASHED())
LAYOUT(COMPLEX_KEY_SPARSE_HASHED([PREALLOCATE 0] [SHARDS 1] [SHARD_LOAD_QUEUE_BACKLOG 10000]))
```
### hashed_array

View File

@ -1,16 +1,25 @@
#include "HashedDictionary.h"
#include <numeric>
#include <boost/noncopyable.hpp>
#include <Common/ArenaUtils.h>
#include <Common/ThreadPool.h>
#include <Common/setThreadName.h>
#include <Common/logger_useful.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Core/Defines.h>
#include <DataTypes/DataTypesDecimal.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnNullable.h>
#include <DataTypes/DataTypesDecimal.h>
#include <Functions/FunctionHelpers.h>
#include <Dictionaries//DictionarySource.h>
#include <Dictionaries/DictionaryFactory.h>
#include <Dictionaries/HierarchyDictionariesUtils.h>
#include <Common/logger_useful.h>
#include "HashedDictionary.h"
namespace
{
@ -35,16 +44,154 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
extern const int DICTIONARY_IS_EMPTY;
extern const int UNSUPPORTED_METHOD;
extern const int LOGICAL_ERROR;
}
template <DictionaryKeyType dictionary_key_type, bool sparse>
HashedDictionary<dictionary_key_type, sparse>::HashedDictionary(
template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded> class HashedDictionary;
/// Parallel loader for dictionaries configured with SHARDS.
///
/// The constructor creates one bounded queue and schedules one worker per
/// shard. addBlock() splits every incoming block by shard and pushes
/// the parts into the corresponding queues; each worker pops blocks from its
/// own queue and inserts them into the dictionary via blockToAttributes().
///
/// Usage: construct, call addBlock() for every source block, then finish().
template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded>
class ParallelDictionaryLoader : public boost::noncopyable
{
    using HashedDictionary = HashedDictionary<dictionary_key_type, sparse, sharded>;

public:
    /// Creates the per-shard queues (sized by shard_load_queue_backlog) and schedules the workers.
    explicit ParallelDictionaryLoader(HashedDictionary & dictionary_)
        : dictionary(dictionary_)
        , shards(dictionary.configuration.shards)
        , pool(shards)
        , shards_queues(shards)
    {
        UInt64 backlog = dictionary.configuration.shard_load_queue_backlog;
        LOG_TRACE(dictionary.log, "Will load the dictionary using {} threads (with {} backlog)", shards, backlog);

        /// Identity mapping "shard number -> slot", used when building the selector.
        shards_slots.resize(shards);
        std::iota(shards_slots.begin(), shards_slots.end(), 0);

        for (size_t shard = 0; shard < shards; ++shard)
        {
            shards_queues[shard].emplace(backlog);
            pool.scheduleOrThrowOnError([this, shard, thread_group = CurrentThread::getGroup()]
            {
                if (thread_group)
                    CurrentThread::attachToIfDetached(thread_group);
                setThreadName("HashedDictLoad");

                threadWorker(shard);
            });
        }
    }

    /// Splits the block by shard and pushes every part into the queue of its shard.
    /// Throws LOGICAL_ERROR if a queue refuses the push.
    void addBlock(Block block)
    {
        IColumn::Selector selector = createShardSelector(block, shards_slots);
        Blocks shards_blocks = splitBlock(selector, block);

        for (size_t shard = 0; shard < shards; ++shard)
        {
            if (!shards_queues[shard]->push(std::move(shards_blocks[shard])))
                throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to shards queue #{}", shard);
        }
    }

    /// Marks all queues as finished and waits until the workers have processed the remaining blocks.
    void finish()
    {
        for (auto & queue : shards_queues)
            queue->finish();

        Stopwatch watch;
        pool.wait();
        UInt64 elapsed_ms = watch.elapsedMilliseconds();
        LOG_TRACE(dictionary.log, "Processing the tail took {}ms", elapsed_ms);
    }

    /// Drops everything that is still queued and waits for the workers.
    ///
    /// A destructor is implicitly noexcept, so an exception escaping from here
    /// (for example one rethrown by pool.wait()) would terminate the process.
    /// It is logged and swallowed instead.
    ~ParallelDictionaryLoader()
    {
        try
        {
            for (auto & queue : shards_queues)
                queue->clearAndFinish();
            pool.wait();
        }
        catch (...)
        {
            tryLogCurrentException("HashedDictionary", "Exception had been thrown during parallel load of the dictionary");
        }
    }

private:
    HashedDictionary & dictionary;
    const size_t shards;
    ThreadPool pool;
    /// One queue per shard; std::optional because the queues are constructed in place in the constructor.
    std::vector<std::optional<ConcurrentBoundedQueue<Block>>> shards_queues;
    std::vector<UInt64> shards_slots;
    /// Arena used by createShardSelector() only; every worker has its own holder.
    DictionaryKeysArenaHolder<dictionary_key_type> arena_holder;

    /// Body of a worker: pops blocks of the given shard until the queue stops returning them.
    void threadWorker(size_t shard)
    {
        Block block;
        /// Worker-local holder (named differently so that it does not shadow the member).
        DictionaryKeysArenaHolder<dictionary_key_type> worker_arena_holder;
        auto & shard_queue = *shards_queues[shard];

        while (shard_queue.pop(block))
        {
            Stopwatch watch;
            dictionary.blockToAttributes(block, worker_arena_holder, shard);
            UInt64 elapsed_ms = watch.elapsedMilliseconds();
            if (elapsed_ms > 1'000)
                LOG_TRACE(dictionary.log, "Block processing for shard #{} is slow {}ms (rows {}).", shard, elapsed_ms, block.rows());
        }

        /// pop() returned false: this is expected only for a finished queue.
        if (!shard_queue.isFinished())
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not pull non finished shards queue #{}", shard);
    }

    /// Split block to shards smaller block, using 'selector'.
    Blocks splitBlock(const IColumn::Selector & selector, const Block & block)
    {
        Blocks out_blocks(shards);
        for (size_t shard = 0; shard < shards; ++shard)
            out_blocks[shard] = block.cloneEmpty();

        size_t columns = block.columns();
        for (size_t col = 0; col < columns; ++col)
        {
            MutableColumns splitted_columns = block.getByPosition(col).column->scatter(shards, selector);
            for (size_t shard = 0; shard < shards; ++shard)
                out_blocks[shard].getByPosition(col).column = std::move(splitted_columns[shard]);
        }

        return out_blocks;
    }

    /// Builds a selector that holds, for every row of the block, the slot of the shard chosen by dictionary.getShard(key).
    IColumn::Selector createShardSelector(const Block & block, const std::vector<UInt64> & slots)
    {
        size_t num_rows = block.rows();
        IColumn::Selector selector(num_rows);

        /// The first getKeysSize() columns of the block are treated as the key columns.
        size_t skip_keys_size_offset = dictionary.dict_struct.getKeysSize();
        Columns key_columns;
        key_columns.reserve(skip_keys_size_offset);
        for (size_t i = 0; i < skip_keys_size_offset; ++i)
            key_columns.emplace_back(block.safeGetByPosition(i).column);

        DictionaryKeysExtractor<dictionary_key_type> keys_extractor(key_columns, arena_holder.getComplexKeyArena());
        for (size_t i = 0; i < num_rows; ++i)
        {
            auto key = keys_extractor.extractCurrentKey();
            size_t shard = dictionary.getShard(key);
            selector[i] = slots[shard];
            keys_extractor.rollbackCurrentKey();
        }

        return selector;
    }
};
template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded>
HashedDictionary<dictionary_key_type, sparse, sharded>::HashedDictionary(
const StorageID & dict_id_,
const DictionaryStructure & dict_struct_,
DictionarySourcePtr source_ptr_,
const HashedDictionaryStorageConfiguration & configuration_,
BlockPtr update_field_loaded_block_)
: IDictionary(dict_id_)
, log(&Poco::Logger::get("HashedDictionary"))
, dict_struct(dict_struct_)
, source_ptr(std::move(source_ptr_))
, configuration(configuration_)
@ -56,8 +203,74 @@ HashedDictionary<dictionary_key_type, sparse>::HashedDictionary(
calculateBytesAllocated();
}
template <DictionaryKeyType dictionary_key_type, bool sparse>
ColumnPtr HashedDictionary<dictionary_key_type, sparse>::getColumn(
/// Destroys the hash tables of a sharded dictionary in parallel, one task per
/// non-empty (attribute, shard) hash table.
///
/// If anything throws while doing so, the error is logged and the remaining
/// tables are left to the regular (sequential) destruction of the members.
template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded>
HashedDictionary<dictionary_key_type, sparse, sharded>::~HashedDictionary()
{
    /// Do a regular sequential destroy in case of non sharded dictionary
    ///
    /// Note, that even in non-sharded dictionaries you can have multiple hash
    /// tables, since each attribute is stored in a separate hash table.
    if constexpr (!sharded)
        return;

    /// NOTE: a regular try/catch is used instead of a function-try-block on
    /// purpose: reaching the end of a handler of a destructor's
    /// function-try-block rethrows the exception implicitly, and since
    /// destructors are noexcept this would terminate the process instead of
    /// falling back to the sequential destroy.
    try
    {
        size_t shards = std::max<size_t>(configuration.shards, 1);
        size_t attributes_tables = std::max<size_t>(attributes.size(), 1 /* no_attributes_containers */);
        ThreadPool pool(shards * attributes_tables);

        size_t hash_tables_count = 0;
        auto schedule_destroy = [&hash_tables_count, &pool](auto & container)
        {
            /// Nothing to free, do not spend a task on it.
            if (container.empty())
                return;

            pool.scheduleOrThrowOnError([&container, thread_group = CurrentThread::getGroup()]
            {
                if (thread_group)
                    CurrentThread::attachToIfDetached(thread_group);
                setThreadName("HashedDictDtor");

                if constexpr (sparse)
                    container.clear();
                else
                    container.clearAndShrink();
            });

            ++hash_tables_count;
        };

        if (attributes.empty())
        {
            for (size_t shard = 0; shard < shards; ++shard)
            {
                schedule_destroy(no_attributes_containers[shard]);
            }
        }
        else
        {
            for (size_t attribute_index = 0; attribute_index < attributes.size(); ++attribute_index)
            {
                getAttributeContainer(attribute_index, [&](auto & containers)
                {
                    for (size_t shard = 0; shard < shards; ++shard)
                    {
                        schedule_destroy(containers[shard]);
                    }
                });
            }
        }

        LOG_TRACE(log, "Destroying {} non empty hash tables (using {} threads)", hash_tables_count, pool.getMaxThreads());
        pool.wait();
        LOG_TRACE(log, "Hash tables destroyed");
    }
    catch (...)
    {
        /// The pool is a local of the try block, so it is already destroyed here.
        tryLogCurrentException("HashedDictionary", "Error while destroying dictionary in parallel, will do a sequential destroy.");
    }
}
template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded>
ColumnPtr HashedDictionary<dictionary_key_type, sparse, sharded>::getColumn(
const std::string & attribute_name,
const DataTypePtr & result_type,
const Columns & key_columns,
@ -163,8 +376,8 @@ ColumnPtr HashedDictionary<dictionary_key_type, sparse>::getColumn(
return result;
}
template <DictionaryKeyType dictionary_key_type, bool sparse>
ColumnUInt8::Ptr HashedDictionary<dictionary_key_type, sparse>::hasKeys(const Columns & key_columns, const DataTypes & key_types) const
template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded>
ColumnUInt8::Ptr HashedDictionary<dictionary_key_type, sparse, sharded>::hasKeys(const Columns & key_columns, const DataTypes & key_types) const
{
if (dictionary_key_type == DictionaryKeyType::Complex)
dict_struct.validateKeyTypes(key_types);
@ -183,8 +396,9 @@ ColumnUInt8::Ptr HashedDictionary<dictionary_key_type, sparse>::hasKeys(const Co
{
for (size_t requested_key_index = 0; requested_key_index < keys_size; ++requested_key_index)
{
auto requested_key = extractor.extractCurrentKey();
out[requested_key_index] = no_attributes_container.find(requested_key) != no_attributes_container.end();
auto key = extractor.extractCurrentKey();
const auto & container = no_attributes_containers[getShard(key)];
out[requested_key_index] = container.find(key) != container.end();
keys_found += out[requested_key_index];
extractor.rollbackCurrentKey();
}
@ -197,18 +411,19 @@ ColumnUInt8::Ptr HashedDictionary<dictionary_key_type, sparse>::hasKeys(const Co
const auto & attribute = attributes.front();
bool is_attribute_nullable = attribute.is_nullable_set.has_value();
getAttributeContainer(0, [&](const auto & container)
getAttributeContainer(0, [&](const auto & containers)
{
for (size_t requested_key_index = 0; requested_key_index < keys_size; ++requested_key_index)
{
auto requested_key = extractor.extractCurrentKey();
auto key = extractor.extractCurrentKey();
const auto & container = containers[getShard(key)];
out[requested_key_index] = container.find(requested_key) != container.end();
out[requested_key_index] = container.find(key) != container.end();
keys_found += out[requested_key_index];
if (is_attribute_nullable && !out[requested_key_index])
out[requested_key_index] = attribute.is_nullable_set->find(requested_key) != nullptr;
out[requested_key_index] = attribute.is_nullable_set->find(key) != nullptr;
extractor.rollbackCurrentKey();
}
@ -220,8 +435,8 @@ ColumnUInt8::Ptr HashedDictionary<dictionary_key_type, sparse>::hasKeys(const Co
return result;
}
template <DictionaryKeyType dictionary_key_type, bool sparse>
ColumnPtr HashedDictionary<dictionary_key_type, sparse>::getHierarchy(ColumnPtr key_column [[maybe_unused]], const DataTypePtr &) const
template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded>
ColumnPtr HashedDictionary<dictionary_key_type, sparse, sharded>::getHierarchy(ColumnPtr key_column [[maybe_unused]], const DataTypePtr &) const
{
if constexpr (dictionary_key_type == DictionaryKeyType::Simple)
{
@ -238,14 +453,15 @@ ColumnPtr HashedDictionary<dictionary_key_type, sparse>::getHierarchy(ColumnPtr
if (!dictionary_attribute.null_value.isNull())
null_value = dictionary_attribute.null_value.get<UInt64>();
const CollectionType<UInt64> & child_key_to_parent_key_map = std::get<CollectionType<UInt64>>(hierarchical_attribute.container);
const CollectionsHolder<UInt64> & child_key_to_parent_key_maps = std::get<CollectionsHolder<UInt64>>(hierarchical_attribute.containers);
auto is_key_valid_func = [&](auto & hierarchy_key)
{
if (unlikely(hierarchical_attribute.is_nullable_set) && hierarchical_attribute.is_nullable_set->find(hierarchy_key))
return true;
return child_key_to_parent_key_map.find(hierarchy_key) != child_key_to_parent_key_map.end();
const auto & map = child_key_to_parent_key_maps[getShard(hierarchy_key)];
return map.find(hierarchy_key) != map.end();
};
size_t keys_found = 0;
@ -254,9 +470,9 @@ ColumnPtr HashedDictionary<dictionary_key_type, sparse>::getHierarchy(ColumnPtr
{
std::optional<UInt64> result;
auto it = child_key_to_parent_key_map.find(hierarchy_key);
if (it == child_key_to_parent_key_map.end())
const auto & map = child_key_to_parent_key_maps[getShard(hierarchy_key)];
auto it = map.find(hierarchy_key);
if (it == map.end())
return result;
UInt64 parent_key = getValueFromCell(it);
@ -282,8 +498,8 @@ ColumnPtr HashedDictionary<dictionary_key_type, sparse>::getHierarchy(ColumnPtr
}
}
template <DictionaryKeyType dictionary_key_type, bool sparse>
ColumnUInt8::Ptr HashedDictionary<dictionary_key_type, sparse>::isInHierarchy(
template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded>
ColumnUInt8::Ptr HashedDictionary<dictionary_key_type, sparse, sharded>::isInHierarchy(
ColumnPtr key_column [[maybe_unused]],
ColumnPtr in_key_column [[maybe_unused]],
const DataTypePtr &) const
@ -309,14 +525,15 @@ ColumnUInt8::Ptr HashedDictionary<dictionary_key_type, sparse>::isInHierarchy(
if (!dictionary_attribute.null_value.isNull())
null_value = dictionary_attribute.null_value.get<UInt64>();
const CollectionType<UInt64> & child_key_to_parent_key_map = std::get<CollectionType<UInt64>>(hierarchical_attribute.container);
const CollectionsHolder<UInt64> & child_key_to_parent_key_maps = std::get<CollectionsHolder<UInt64>>(hierarchical_attribute.containers);
auto is_key_valid_func = [&](auto & hierarchy_key)
{
if (unlikely(hierarchical_attribute.is_nullable_set) && hierarchical_attribute.is_nullable_set->find(hierarchy_key))
return true;
return child_key_to_parent_key_map.find(hierarchy_key) != child_key_to_parent_key_map.end();
const auto & map = child_key_to_parent_key_maps[getShard(hierarchy_key)];
return map.find(hierarchy_key) != map.end();
};
size_t keys_found = 0;
@ -325,9 +542,9 @@ ColumnUInt8::Ptr HashedDictionary<dictionary_key_type, sparse>::isInHierarchy(
{
std::optional<UInt64> result;
auto it = child_key_to_parent_key_map.find(hierarchy_key);
if (it == child_key_to_parent_key_map.end())
const auto & map = child_key_to_parent_key_maps[getShard(hierarchy_key)];
auto it = map.find(hierarchy_key);
if (it == map.end())
return result;
UInt64 parent_key = getValueFromCell(it);
@ -351,8 +568,8 @@ ColumnUInt8::Ptr HashedDictionary<dictionary_key_type, sparse>::isInHierarchy(
return nullptr;
}
template <DictionaryKeyType dictionary_key_type, bool sparse>
DictionaryHierarchyParentToChildIndexPtr HashedDictionary<dictionary_key_type, sparse>::getHierarchicalIndex() const
template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded>
DictionaryHierarchyParentToChildIndexPtr HashedDictionary<dictionary_key_type, sparse, sharded>::getHierarchicalIndex() const
{
if constexpr (dictionary_key_type == DictionaryKeyType::Simple)
{
@ -361,13 +578,22 @@ DictionaryHierarchyParentToChildIndexPtr HashedDictionary<dictionary_key_type, s
size_t hierarchical_attribute_index = *dict_struct.hierarchical_attribute_index;
const auto & hierarchical_attribute = attributes[hierarchical_attribute_index];
const CollectionType<UInt64> & child_key_to_parent_key_map = std::get<CollectionType<UInt64>>(hierarchical_attribute.container);
const CollectionsHolder<UInt64> & child_key_to_parent_key_maps = std::get<CollectionsHolder<UInt64>>(hierarchical_attribute.containers);
size_t size = 0;
for (const auto & map : child_key_to_parent_key_maps)
size += map.size();
HashMap<UInt64, PaddedPODArray<UInt64>> parent_to_child;
parent_to_child.reserve(child_key_to_parent_key_map.size());
parent_to_child.reserve(size);
for (const auto & [child_key, parent_key] : child_key_to_parent_key_map)
parent_to_child[parent_key].emplace_back(child_key);
for (const auto & map : child_key_to_parent_key_maps)
{
for (const auto & [child_key, parent_key] : map)
{
parent_to_child[parent_key].emplace_back(child_key);
}
}
return std::make_shared<DictionaryHierarchicalParentToChildIndex>(parent_to_child);
}
@ -377,8 +603,8 @@ DictionaryHierarchyParentToChildIndexPtr HashedDictionary<dictionary_key_type, s
}
}
template <DictionaryKeyType dictionary_key_type, bool sparse>
ColumnPtr HashedDictionary<dictionary_key_type, sparse>::getDescendants(
template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded>
ColumnPtr HashedDictionary<dictionary_key_type, sparse, sharded>::getDescendants(
ColumnPtr key_column [[maybe_unused]],
const DataTypePtr &,
size_t level [[maybe_unused]],
@ -403,8 +629,8 @@ ColumnPtr HashedDictionary<dictionary_key_type, sparse>::getDescendants(
}
}
template <DictionaryKeyType dictionary_key_type, bool sparse>
void HashedDictionary<dictionary_key_type, sparse>::createAttributes()
template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded>
void HashedDictionary<dictionary_key_type, sparse, sharded>::createAttributes()
{
const auto size = dict_struct.attributes.size();
attributes.reserve(size);
@ -418,16 +644,25 @@ void HashedDictionary<dictionary_key_type, sparse>::createAttributes()
using ValueType = DictionaryValueType<AttributeType>;
auto is_nullable_set = dictionary_attribute.is_nullable ? std::make_optional<NullableSet>() : std::optional<NullableSet>{};
Attribute attribute{dictionary_attribute.underlying_type, std::move(is_nullable_set), CollectionType<ValueType>()};
Attribute attribute{dictionary_attribute.underlying_type, std::move(is_nullable_set), CollectionsHolder<ValueType>(configuration.shards)};
attributes.emplace_back(std::move(attribute));
};
callOnDictionaryAttributeType(dictionary_attribute.underlying_type, type_call);
}
if (unlikely(attributes.size()) == 0)
{
no_attributes_containers.resize(configuration.shards);
}
string_arenas.resize(configuration.shards);
for (auto & arena : string_arenas)
arena = std::make_unique<Arena>();
}
template <DictionaryKeyType dictionary_key_type, bool sparse>
void HashedDictionary<dictionary_key_type, sparse>::updateData()
template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded>
void HashedDictionary<dictionary_key_type, sparse, sharded>::updateData()
{
/// NOTE: updateData() does not preallocation since it may increase memory usage.
@ -465,14 +700,16 @@ void HashedDictionary<dictionary_key_type, sparse>::updateData()
if (update_field_loaded_block)
{
resize(update_field_loaded_block->rows());
blockToAttributes(*update_field_loaded_block.get());
DictionaryKeysArenaHolder<dictionary_key_type> arena_holder;
blockToAttributes(*update_field_loaded_block.get(), arena_holder, /* shard= */ 0);
}
}
template <DictionaryKeyType dictionary_key_type, bool sparse>
void HashedDictionary<dictionary_key_type, sparse>::blockToAttributes(const Block & block [[maybe_unused]])
template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded>
void HashedDictionary<dictionary_key_type, sparse, sharded>::blockToAttributes(const Block & block, DictionaryKeysArenaHolder<dictionary_key_type> & arena_holder, UInt64 shard)
{
size_t skip_keys_size_offset = dict_struct.getKeysSize();
size_t new_element_count = 0;
Columns key_columns;
key_columns.reserve(skip_keys_size_offset);
@ -481,7 +718,6 @@ void HashedDictionary<dictionary_key_type, sparse>::blockToAttributes(const Bloc
for (size_t i = 0; i < skip_keys_size_offset; ++i)
key_columns.emplace_back(block.safeGetByPosition(i).column);
DictionaryKeysArenaHolder<dictionary_key_type> arena_holder;
DictionaryKeysExtractor<dictionary_key_type> keys_extractor(key_columns, arena_holder.getComplexKeyArena());
const size_t keys_size = keys_extractor.getKeysSize();
@ -496,12 +732,14 @@ void HashedDictionary<dictionary_key_type, sparse>::blockToAttributes(const Bloc
auto key = keys_extractor.extractCurrentKey();
if constexpr (std::is_same_v<KeyType, StringRef>)
key = copyStringInArena(string_arena, key);
key = copyStringInArena(*string_arenas[shard], key);
no_attributes_container.insert(key);
no_attributes_containers[shard].insert(key);
keys_extractor.rollbackCurrentKey();
++new_element_count;
}
element_count += new_element_count;
return;
}
@ -511,14 +749,15 @@ void HashedDictionary<dictionary_key_type, sparse>::blockToAttributes(const Bloc
auto & attribute = attributes[attribute_index];
bool attribute_is_nullable = attribute.is_nullable_set.has_value();
getAttributeContainer(attribute_index, [&](auto & container)
getAttributeContainer(attribute_index, [&](auto & containers)
{
using ContainerType = std::decay_t<decltype(container)>;
using ContainerType = std::decay_t<decltype(containers.front())>;
using AttributeValueType = typename ContainerType::mapped_type;
for (size_t key_index = 0; key_index < keys_size; ++key_index)
{
auto key = keys_extractor.extractCurrentKey();
auto & container = containers[shard];
auto it = container.find(key);
bool key_is_nullable_and_already_exists = attribute_is_nullable && attribute.is_nullable_set->find(key) != nullptr;
@ -530,7 +769,7 @@ void HashedDictionary<dictionary_key_type, sparse>::blockToAttributes(const Bloc
}
if constexpr (std::is_same_v<KeyType, StringRef>)
key = copyStringInArena(string_arena, key);
key = copyStringInArena(*string_arenas[shard], key);
attribute_column.get(key_index, column_value_to_insert);
@ -544,7 +783,7 @@ void HashedDictionary<dictionary_key_type, sparse>::blockToAttributes(const Bloc
if constexpr (std::is_same_v<AttributeValueType, StringRef>)
{
String & value_to_insert = column_value_to_insert.get<String>();
StringRef arena_value = copyStringInArena(string_arena, value_to_insert);
StringRef arena_value = copyStringInArena(*string_arenas[shard], value_to_insert);
container.insert({key, arena_value});
}
else
@ -553,7 +792,7 @@ void HashedDictionary<dictionary_key_type, sparse>::blockToAttributes(const Bloc
container.insert({key, value_to_insert});
}
++element_count;
++new_element_count;
keys_extractor.rollbackCurrentKey();
}
@ -561,51 +800,58 @@ void HashedDictionary<dictionary_key_type, sparse>::blockToAttributes(const Bloc
keys_extractor.reset();
});
}
element_count += new_element_count;
}
template <DictionaryKeyType dictionary_key_type, bool sparse>
void HashedDictionary<dictionary_key_type, sparse>::resize(size_t added_rows)
/// Reserves space for `added_rows` more elements in the hash tables.
/// Does nothing when `added_rows` is zero or when the dictionary is sharded;
/// otherwise only the first container (`front()`) of every holder is resized.
template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded>
void HashedDictionary<dictionary_key_type, sparse, sharded>::resize(size_t added_rows)
{
if (unlikely(!added_rows))
return;
/// In multi shards configuration it is pointless.
if constexpr (sharded)
return;
size_t attributes_size = attributes.size();
/// Without attributes the keys are kept in the dedicated no-attributes container.
if (unlikely(attributes_size == 0))
{
size_t reserve_size = added_rows + no_attributes_container.size();
size_t reserve_size = added_rows + no_attributes_containers.front().size();
/// The sparse variant is grown with resize(), the regular one with reserve().
if constexpr (sparse)
no_attributes_container.resize(reserve_size);
no_attributes_containers.front().resize(reserve_size);
else
no_attributes_container.reserve(reserve_size);
no_attributes_containers.front().reserve(reserve_size);
return;
}
/// Each attribute has its own hash table(s), reserve space in every one of them.
for (size_t attribute_index = 0; attribute_index < attributes_size; ++attribute_index)
{
getAttributeContainer(attribute_index, [added_rows](auto & attribute_map)
getAttributeContainer(attribute_index, [added_rows](auto & containers)
{
size_t reserve_size = added_rows + attribute_map.size();
auto & container = containers.front();
size_t reserve_size = added_rows + container.size();
if constexpr (sparse)
attribute_map.resize(reserve_size);
container.resize(reserve_size);
else
attribute_map.reserve(reserve_size);
container.reserve(reserve_size);
});
}
}
template <DictionaryKeyType dictionary_key_type, bool sparse>
template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded>
template <typename AttributeType, bool is_nullable, typename ValueSetter, typename DefaultValueExtractor>
void HashedDictionary<dictionary_key_type, sparse>::getItemsImpl(
void HashedDictionary<dictionary_key_type, sparse, sharded>::getItemsImpl(
const Attribute & attribute,
DictionaryKeysExtractor<dictionary_key_type> & keys_extractor,
ValueSetter && set_value [[maybe_unused]],
DefaultValueExtractor & default_value_extractor) const
{
const auto & attribute_container = std::get<CollectionType<AttributeType>>(attribute.container);
const auto & attribute_containers = std::get<CollectionsHolder<AttributeType>>(attribute.containers);
const size_t keys_size = keys_extractor.getKeysSize();
size_t keys_found = 0;
@ -614,9 +860,10 @@ void HashedDictionary<dictionary_key_type, sparse>::getItemsImpl(
{
auto key = keys_extractor.extractCurrentKey();
const auto it = attribute_container.find(key);
const auto & container = attribute_containers[getShard(key)];
const auto it = container.find(key);
if (it != attribute_container.end())
if (it != container.end())
{
set_value(key_index, getValueFromCell(it), false);
++keys_found;
@ -639,11 +886,15 @@ void HashedDictionary<dictionary_key_type, sparse>::getItemsImpl(
found_count.fetch_add(keys_found, std::memory_order_relaxed);
}
template <DictionaryKeyType dictionary_key_type, bool sparse>
void HashedDictionary<dictionary_key_type, sparse>::loadData()
template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded>
void HashedDictionary<dictionary_key_type, sparse, sharded>::loadData()
{
if (!source_ptr->hasUpdateField())
{
std::optional<ParallelDictionaryLoader<dictionary_key_type, sparse, sharded>> parallel_loader;
if constexpr (sharded)
parallel_loader.emplace(*this);
std::atomic<size_t> new_size = 0;
QueryPipeline pipeline;
@ -654,6 +905,8 @@ void HashedDictionary<dictionary_key_type, sparse>::loadData()
PullingPipelineExecutor executor(pipeline);
Block block;
DictionaryKeysArenaHolder<dictionary_key_type> arena_holder;
while (executor.pull(block))
{
if (configuration.preallocate && new_size)
@ -661,7 +914,7 @@ void HashedDictionary<dictionary_key_type, sparse>::loadData()
size_t current_new_size = new_size.exchange(0);
if (current_new_size)
{
LOG_TRACE(&Poco::Logger::get("HashedDictionary"), "Preallocated {} elements", current_new_size);
LOG_TRACE(log, "Preallocated {} elements", current_new_size);
resize(current_new_size);
}
}
@ -670,8 +923,14 @@ void HashedDictionary<dictionary_key_type, sparse>::loadData()
resize(block.rows());
}
blockToAttributes(block);
if (parallel_loader)
parallel_loader->addBlock(block);
else
blockToAttributes(block, arena_holder, /* shard= */ 0);
}
if (parallel_loader)
parallel_loader->finish();
}
else
{
@ -684,8 +943,8 @@ void HashedDictionary<dictionary_key_type, sparse>::loadData()
getFullName());
}
template <DictionaryKeyType dictionary_key_type, bool sparse>
void HashedDictionary<dictionary_key_type, sparse>::buildHierarchyParentToChildIndexIfNeeded()
template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded>
void HashedDictionary<dictionary_key_type, sparse, sharded>::buildHierarchyParentToChildIndexIfNeeded()
{
if (!dict_struct.hierarchical_attribute_index)
return;
@ -694,34 +953,37 @@ void HashedDictionary<dictionary_key_type, sparse>::buildHierarchyParentToChildI
hierarchical_index = getHierarchicalIndex();
}
template <DictionaryKeyType dictionary_key_type, bool sparse>
void HashedDictionary<dictionary_key_type, sparse>::calculateBytesAllocated()
template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded>
void HashedDictionary<dictionary_key_type, sparse, sharded>::calculateBytesAllocated()
{
size_t attributes_size = attributes.size();
bytes_allocated += attributes_size * sizeof(attributes.front());
for (size_t i = 0; i < attributes_size; ++i)
{
getAttributeContainer(i, [&](const auto & container)
getAttributeContainer(i, [&](const auto & containers)
{
using ContainerType = std::decay_t<decltype(container)>;
using AttributeValueType = typename ContainerType::mapped_type;
bytes_allocated += sizeof(container);
if constexpr (sparse || std::is_same_v<AttributeValueType, Field>)
for (const auto & container : containers)
{
/// bucket_count() - Returns table size, that includes empty and deleted
/// size() - Returns table size, without empty and deleted
/// and since this is sparsehash, empty cells should not be significant,
/// and since items cannot be removed from the dictionary, deleted is also not important.
bytes_allocated += container.size() * (sizeof(KeyType) + sizeof(AttributeValueType));
bucket_count = container.bucket_count();
}
else
{
bytes_allocated += container.getBufferSizeInBytes();
bucket_count = container.getBufferSizeInCells();
using ContainerType = std::decay_t<decltype(container)>;
using AttributeValueType = typename ContainerType::mapped_type;
bytes_allocated += sizeof(container);
if constexpr (sparse || std::is_same_v<AttributeValueType, Field>)
{
/// bucket_count() - Returns table size, that includes empty and deleted
/// size() - Returns table size, without empty and deleted
/// and since this is sparsehash, empty cells should not be significant,
/// and since items cannot be removed from the dictionary, deleted is also not important.
bytes_allocated += container.size() * (sizeof(KeyType) + sizeof(AttributeValueType));
bucket_count = container.bucket_count();
}
else
{
bytes_allocated += container.getBufferSizeInBytes();
bucket_count = container.getBufferSizeInCells();
}
}
});
@ -733,17 +995,20 @@ void HashedDictionary<dictionary_key_type, sparse>::calculateBytesAllocated()
if (unlikely(attributes_size == 0))
{
bytes_allocated += sizeof(no_attributes_container);
for (const auto & container : no_attributes_containers)
{
bytes_allocated += sizeof(container);
if constexpr (sparse)
{
bytes_allocated += no_attributes_container.size() * (sizeof(KeyType));
bucket_count = no_attributes_container.bucket_count();
}
else
{
bytes_allocated += no_attributes_container.getBufferSizeInBytes();
bucket_count = no_attributes_container.getBufferSizeInCells();
if constexpr (sparse)
{
bytes_allocated += container.size() * (sizeof(KeyType));
bucket_count = container.bucket_count();
}
else
{
bytes_allocated += container.getBufferSizeInBytes();
bucket_count = container.getBufferSizeInCells();
}
}
}
@ -756,48 +1021,55 @@ void HashedDictionary<dictionary_key_type, sparse>::calculateBytesAllocated()
bytes_allocated += hierarchical_index_bytes_allocated;
}
bytes_allocated += string_arena.size();
for (const auto & arena : string_arenas)
bytes_allocated += arena->size();
}
template <DictionaryKeyType dictionary_key_type, bool sparse>
Pipe HashedDictionary<dictionary_key_type, sparse>::read(const Names & column_names, size_t max_block_size, size_t num_streams) const
template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded>
Pipe HashedDictionary<dictionary_key_type, sparse, sharded>::read(const Names & column_names, size_t max_block_size, size_t num_streams) const
{
PaddedPODArray<HashedDictionary::KeyType> keys;
/// NOTE: could read multiple shards in parallel
/// Collect every key stored in the dictionary into `keys`.
if (!attributes.empty())
{
    /// The first attribute is used as the source of keys.
    const auto & attribute = attributes.front();

    getAttributeContainer(0, [&](auto & containers)
    {
        for (const auto & container : containers)
        {
            /// Reserve on top of what previous shards already contributed.
            keys.reserve(keys.size() + container.size());

            for (const auto & [key, _] : container)
                keys.emplace_back(key);
        }
    });

    /// is_nullable_set is a single set per attribute (it is not split by shard),
    /// so its keys must be appended exactly once. Doing it inside the per-shard loop
    /// would emit every such key once per shard.
    if (attribute.is_nullable_set)
    {
        const auto & is_nullable_set = *attribute.is_nullable_set;
        keys.reserve(keys.size() + is_nullable_set.size());

        for (auto & node : is_nullable_set)
            keys.emplace_back(node.getKey());
    }
}
else
{
    /// No attributes: keys are stored in the per-shard key sets.
    for (const auto & container : no_attributes_containers)
    {
        keys.reserve(keys.size() + container.size());

        for (const auto & key : container)
        {
            /// The sparse set iterates over keys directly, the non-sparse one over cells.
            if constexpr (sparse)
                keys.emplace_back(key);
            else
                keys.emplace_back(key.getKey());
        }
    }
}
@ -820,9 +1092,9 @@ Pipe HashedDictionary<dictionary_key_type, sparse>::read(const Names & column_na
return result;
}
template <DictionaryKeyType dictionary_key_type, bool sparse>
template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded>
template <typename GetContainerFunc>
void HashedDictionary<dictionary_key_type, sparse>::getAttributeContainer(size_t attribute_index, GetContainerFunc && get_container_func)
void HashedDictionary<dictionary_key_type, sparse, sharded>::getAttributeContainer(size_t attribute_index, GetContainerFunc && get_container_func)
{
assert(attribute_index < attributes.size());
@ -834,16 +1106,16 @@ void HashedDictionary<dictionary_key_type, sparse>::getAttributeContainer(size_t
using AttributeType = typename Type::AttributeType;
using ValueType = DictionaryValueType<AttributeType>;
auto & attribute_container = std::get<CollectionType<ValueType>>(attribute.container);
std::forward<GetContainerFunc>(get_container_func)(attribute_container);
auto & attribute_containers = std::get<CollectionsHolder<ValueType>>(attribute.containers);
std::forward<GetContainerFunc>(get_container_func)(attribute_containers);
};
callOnDictionaryAttributeType(attribute.type, type_call);
}
template <DictionaryKeyType dictionary_key_type, bool sparse>
template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded>
template <typename GetContainerFunc>
void HashedDictionary<dictionary_key_type, sparse>::getAttributeContainer(size_t attribute_index, GetContainerFunc && get_container_func) const
void HashedDictionary<dictionary_key_type, sparse, sharded>::getAttributeContainer(size_t attribute_index, GetContainerFunc && get_container_func) const
{
const_cast<std::decay_t<decltype(*this)> *>(this)->getAttributeContainer(attribute_index, [&](auto & attribute_container)
{
@ -851,10 +1123,14 @@ void HashedDictionary<dictionary_key_type, sparse>::getAttributeContainer(size_t
});
}
template class HashedDictionary<DictionaryKeyType::Simple, true>;
template class HashedDictionary<DictionaryKeyType::Simple, false>;
template class HashedDictionary<DictionaryKeyType::Complex, true>;
template class HashedDictionary<DictionaryKeyType::Complex, false>;
template class HashedDictionary<DictionaryKeyType::Simple, false, false>;
template class HashedDictionary<DictionaryKeyType::Simple, false, true>;
template class HashedDictionary<DictionaryKeyType::Simple, true, false>;
template class HashedDictionary<DictionaryKeyType::Simple, true, true>;
template class HashedDictionary<DictionaryKeyType::Complex, false, false>;
template class HashedDictionary<DictionaryKeyType::Complex, false, true>;
template class HashedDictionary<DictionaryKeyType::Complex, true, false>;
template class HashedDictionary<DictionaryKeyType::Complex, true, true>;
void registerDictionaryHashed(DictionaryFactory & factory)
{
@ -883,32 +1159,76 @@ void registerDictionaryHashed(DictionaryFactory & factory)
std::string dictionary_layout_name;
if (dictionary_key_type == DictionaryKeyType::Simple)
dictionary_layout_name = "hashed";
else
dictionary_layout_name = "complex_key_hashed";
if (sparse)
dictionary_layout_name = "sparse_" + dictionary_layout_name;
const std::string dictionary_layout_prefix = ".layout." + dictionary_layout_name;
const bool preallocate = config.getBool(config_prefix + dictionary_layout_prefix + ".preallocate", false);
HashedDictionaryStorageConfiguration configuration{preallocate, require_nonempty, dict_lifetime};
if (dictionary_key_type == DictionaryKeyType::Simple)
{
if (sparse)
return std::make_unique<HashedDictionary<DictionaryKeyType::Simple, true>>(dict_id, dict_struct, std::move(source_ptr), configuration);
dictionary_layout_name = "sparse_hashed";
else
return std::make_unique<HashedDictionary<DictionaryKeyType::Simple, false>>(dict_id, dict_struct, std::move(source_ptr), configuration);
dictionary_layout_name = "hashed";
}
else
{
if (sparse)
return std::make_unique<HashedDictionary<DictionaryKeyType::Complex, true>>(dict_id, dict_struct, std::move(source_ptr), configuration);
dictionary_layout_name = "complex_key_sparse_hashed";
else
return std::make_unique<HashedDictionary<DictionaryKeyType::Complex, false>>(dict_id, dict_struct, std::move(source_ptr), configuration);
dictionary_layout_name = "complex_key_hashed";
}
const std::string dictionary_layout_prefix = ".layout." + dictionary_layout_name;
const bool preallocate = config.getBool(config_prefix + dictionary_layout_prefix + ".preallocate", false);
/// Number of shards requested in the layout (1 when not specified); only [1, 128] is accepted.
Int64 shards = config.getInt(config_prefix + dictionary_layout_prefix + ".shards", 1);
if (shards <= 0 || shards > 128)
    throw Exception(ErrorCodes::BAD_ARGUMENTS, "{}: SHARDS parameter should be within [1, 128]", full_name);

/// Backlog size for the shard load queue (10000 when not specified); must be positive.
Int64 shard_load_queue_backlog = config.getInt(config_prefix + dictionary_layout_prefix + ".shard_load_queue_backlog", 10000);
if (shard_load_queue_backlog <= 0)
    throw Exception(ErrorCodes::BAD_ARGUMENTS, "{}: SHARD_LOAD_QUEUE_BACKLOG parameter should be greater than zero", full_name);

/// Both values were validated as positive above, so the casts to unsigned cannot wrap.
HashedDictionaryStorageConfiguration configuration{
    preallocate,
    static_cast<UInt64>(shards),
    static_cast<UInt64>(shard_load_queue_backlog),
    require_nonempty,
    dict_lifetime,
};

/// Sources with an update field are rejected when more than one shard is requested.
if (source_ptr->hasUpdateField() && shards > 1)
    throw Exception(ErrorCodes::BAD_ARGUMENTS, "{}: SHARDS parameter is not supported for updatable source (UPDATE_FIELD)", full_name);
if (dictionary_key_type == DictionaryKeyType::Simple)
{
if (sparse)
{
if (shards > 1)
return std::make_unique<HashedDictionary<DictionaryKeyType::Simple, true, true>>(dict_id, dict_struct, std::move(source_ptr), configuration);
else
return std::make_unique<HashedDictionary<DictionaryKeyType::Simple, true, false>>(dict_id, dict_struct, std::move(source_ptr), configuration);
}
else
{
if (shards > 1)
return std::make_unique<HashedDictionary<DictionaryKeyType::Simple, false, true>>(dict_id, dict_struct, std::move(source_ptr), configuration);
else
return std::make_unique<HashedDictionary<DictionaryKeyType::Simple, false, false>>(dict_id, dict_struct, std::move(source_ptr), configuration);
}
}
else
{
if (sparse)
{
if (shards > 1)
return std::make_unique<HashedDictionary<DictionaryKeyType::Complex, true, true>>(dict_id, dict_struct, std::move(source_ptr), configuration);
else
return std::make_unique<HashedDictionary<DictionaryKeyType::Complex, true, false>>(dict_id, dict_struct, std::move(source_ptr), configuration);
}
else
{
if (shards > 1)
return std::make_unique<HashedDictionary<DictionaryKeyType::Complex, false, true>>(dict_id, dict_struct, std::move(source_ptr), configuration);
else
return std::make_unique<HashedDictionary<DictionaryKeyType::Complex, false, false>>(dict_id, dict_struct, std::move(source_ptr), configuration);
}
}
};

View File

@ -27,13 +27,20 @@ namespace DB
/// Storage settings of a HashedDictionary, built in registerDictionaryHashed from the layout configuration.
/// The struct is brace-initialized positionally there, so the order of the members must not be changed.
struct HashedDictionaryStorageConfiguration
{
/// Value of the layout's `preallocate` option (false when it is not specified).
const bool preallocate;
/// Value of the layout's `shards` option (1 when not specified); registration accepts only [1, 128].
/// Keys are assigned to a shard as hash(key) % shards.
const UInt64 shards;
/// Value of the layout's `shard_load_queue_backlog` option (10000 when not specified); registration requires it to be > 0.
const UInt64 shard_load_queue_backlog;
/// NOTE(review): presumably makes loading fail when the source yields no data - confirm against the loading code.
const bool require_nonempty;
/// Dictionary lifetime as passed to the layout factory.
const DictionaryLifetime lifetime;
};
template <DictionaryKeyType dictionary_key_type, bool sparse>
template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded>
class ParallelDictionaryLoader;
template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded>
class HashedDictionary final : public IDictionary
{
friend class ParallelDictionaryLoader<dictionary_key_type, sparse, sharded>;
public:
using KeyType = std::conditional_t<dictionary_key_type == DictionaryKeyType::Simple, UInt64, StringRef>;
@ -43,6 +50,7 @@ public:
DictionarySourcePtr source_ptr_,
const HashedDictionaryStorageConfiguration & configuration_,
BlockPtr update_field_loaded_block_ = nullptr);
~HashedDictionary() override;
std::string getTypeName() const override
{
@ -76,7 +84,12 @@ public:
std::shared_ptr<const IExternalLoadable> clone() const override
{
return std::make_shared<HashedDictionary<dictionary_key_type, sparse>>(getDictionaryID(), dict_struct, source_ptr->clone(), configuration, update_field_loaded_block);
return std::make_shared<HashedDictionary<dictionary_key_type, sparse, sharded>>(
getDictionaryID(),
dict_struct,
source_ptr->clone(),
configuration,
update_field_loaded_block);
}
DictionarySourcePtr getSource() const override { return source_ptr; }
@ -156,6 +169,9 @@ private:
template <typename Value>
using CollectionType = std::conditional_t<sparse, CollectionTypeSparse<Value>, CollectionTypeNonSparse<Value>>;
template <typename Value>
using CollectionsHolder = std::vector<CollectionType<Value>>;
using NoAttributesCollectionType = std::conditional_t<sparse, NoAttributesCollectionTypeSparse, NoAttributesCollectionTypeNonSparse>;
using NullableSet = HashSet<KeyType, DefaultHash<KeyType>>;
@ -166,36 +182,36 @@ private:
std::optional<NullableSet> is_nullable_set;
std::variant<
CollectionType<UInt8>,
CollectionType<UInt16>,
CollectionType<UInt32>,
CollectionType<UInt64>,
CollectionType<UInt128>,
CollectionType<UInt256>,
CollectionType<Int8>,
CollectionType<Int16>,
CollectionType<Int32>,
CollectionType<Int64>,
CollectionType<Int128>,
CollectionType<Int256>,
CollectionType<Decimal32>,
CollectionType<Decimal64>,
CollectionType<Decimal128>,
CollectionType<Decimal256>,
CollectionType<DateTime64>,
CollectionType<Float32>,
CollectionType<Float64>,
CollectionType<UUID>,
CollectionType<IPv4>,
CollectionType<IPv6>,
CollectionType<StringRef>,
CollectionType<Array>>
container;
CollectionsHolder<UInt8>,
CollectionsHolder<UInt16>,
CollectionsHolder<UInt32>,
CollectionsHolder<UInt64>,
CollectionsHolder<UInt128>,
CollectionsHolder<UInt256>,
CollectionsHolder<Int8>,
CollectionsHolder<Int16>,
CollectionsHolder<Int32>,
CollectionsHolder<Int64>,
CollectionsHolder<Int128>,
CollectionsHolder<Int256>,
CollectionsHolder<Decimal32>,
CollectionsHolder<Decimal64>,
CollectionsHolder<Decimal128>,
CollectionsHolder<Decimal256>,
CollectionsHolder<DateTime64>,
CollectionsHolder<Float32>,
CollectionsHolder<Float64>,
CollectionsHolder<UUID>,
CollectionsHolder<IPv4>,
CollectionsHolder<IPv6>,
CollectionsHolder<StringRef>,
CollectionsHolder<Array>>
containers;
};
void createAttributes();
void blockToAttributes(const Block & block);
void blockToAttributes(const Block & block, DictionaryKeysArenaHolder<dictionary_key_type> & arena_holder, UInt64 shard);
void updateData();
@ -205,6 +221,22 @@ private:
void calculateBytesAllocated();
/// Returns the index of the shard for a simple (UInt64) key.
/// For a non-sharded dictionary the result is always 0.
UInt64 getShard(UInt64 key) const
{
    if constexpr (sharded)
    {
        /// NOTE: the hash function here must not be the same as DefaultHash<>,
        /// because DefaultHash<> is already used by the HashMap/sparse_hash_map.
        return intHashCRC32(key) % configuration.shards;
    }
    else
    {
        return 0;
    }
}
/// Returns the index of the shard for a complex (StringRef) key.
/// For a non-sharded dictionary the result is always 0.
UInt64 getShard(StringRef key) const
{
    if constexpr (sharded)
    {
        const auto key_hash = StringRefHash()(key);
        return key_hash % configuration.shards;
    }
    else
    {
        return 0;
    }
}
template <typename AttributeType, bool is_nullable, typename ValueSetter, typename DefaultValueExtractor>
void getItemsImpl(
const Attribute & attribute,
@ -220,6 +252,8 @@ private:
void resize(size_t added_rows);
Poco::Logger * log;
const DictionaryStructure dict_struct;
const DictionarySourcePtr source_ptr;
const HashedDictionaryStorageConfiguration configuration;
@ -228,21 +262,25 @@ private:
size_t bytes_allocated = 0;
size_t hierarchical_index_bytes_allocated = 0;
size_t element_count = 0;
std::atomic<size_t> element_count = 0;
size_t bucket_count = 0;
mutable std::atomic<size_t> query_count{0};
mutable std::atomic<size_t> found_count{0};
BlockPtr update_field_loaded_block;
Arena string_arena;
NoAttributesCollectionType no_attributes_container;
std::vector<std::unique_ptr<Arena>> string_arenas;
std::vector<NoAttributesCollectionType> no_attributes_containers;
DictionaryHierarchicalParentToChildIndexPtr hierarchical_index;
};
extern template class HashedDictionary<DictionaryKeyType::Simple, false>;
extern template class HashedDictionary<DictionaryKeyType::Simple, true>;
extern template class HashedDictionary<DictionaryKeyType::Simple, /* sparse= */ false, /* sharded= */ false>;
extern template class HashedDictionary<DictionaryKeyType::Simple, /* sparse= */ false, /* sharded= */ true>;
extern template class HashedDictionary<DictionaryKeyType::Simple, /* sparse= */ true, /* sharded= */ false>;
extern template class HashedDictionary<DictionaryKeyType::Simple, /* sparse= */ true, /* sharded= */ true>;
extern template class HashedDictionary<DictionaryKeyType::Complex, false>;
extern template class HashedDictionary<DictionaryKeyType::Complex, true>;
extern template class HashedDictionary<DictionaryKeyType::Complex, /* sparse= */ false, /* sharded= */ false>;
extern template class HashedDictionary<DictionaryKeyType::Complex, /* sparse= */ false, /* sharded= */ true>;
extern template class HashedDictionary<DictionaryKeyType::Complex, /* sparse= */ true, /* sharded= */ false>;
extern template class HashedDictionary<DictionaryKeyType::Complex, /* sparse= */ true, /* sharded= */ true>;
}

View File

@ -0,0 +1,91 @@
<test>
<substitutions>
<substitution>
<name>layout_suffix</name>
<values>
<value>HASHED</value>
<value>SPARSE_HASHED</value>
</values>
</substitution>
<substitution>
<name>shards</name>
<values>
<value>1</value>
<value>8</value>
<value>16</value>
</values>
</substitution>
</substitutions>
<create_query>
CREATE TABLE simple_key_dictionary_source_table
(
id UInt64,
value_int UInt64
) ENGINE = Memory
</create_query>
<create_query>
CREATE TABLE complex_key_dictionary_source_table
(
id UInt64,
id_key String,
value_int UInt64
) ENGINE = Memory
</create_query>
<create_query>
CREATE DICTIONARY IF NOT EXISTS simple_key_{layout_suffix}_dictionary_s{shards}
(
id UInt64,
value_int UInt64
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(TABLE 'simple_key_dictionary_source_table'))
LAYOUT({layout_suffix}(SHARDS {shards}))
LIFETIME(0)
</create_query>
<create_query>
CREATE DICTIONARY IF NOT EXISTS complex_key_{layout_suffix}_dictionary_s{shards}
(
id UInt64,
id_key String,
value_int UInt64
)
PRIMARY KEY id, id_key
SOURCE(CLICKHOUSE(TABLE 'complex_key_dictionary_source_table'))
LAYOUT(COMPLEX_KEY_{layout_suffix}(SHARDS {shards}))
LIFETIME(0)
</create_query>
<fill_query>INSERT INTO simple_key_dictionary_source_table SELECT number, number FROM numbers(3_000_000)</fill_query>
<fill_query>INSERT INTO complex_key_dictionary_source_table SELECT number, toString(number), number FROM numbers(2_000_000)</fill_query>
<fill_query>SYSTEM RELOAD DICTIONARY simple_key_{layout_suffix}_dictionary_s{shards}</fill_query>
<fill_query>SYSTEM RELOAD DICTIONARY complex_key_{layout_suffix}_dictionary_s{shards}</fill_query>
<query>SYSTEM RELOAD DICTIONARY simple_key_{layout_suffix}_dictionary_s{shards}</query>
<query>SYSTEM RELOAD DICTIONARY complex_key_{layout_suffix}_dictionary_s{shards}</query>
<query>
WITH rand64() % 3_000_000 as key
SELECT dictHas('default.simple_key_{layout_suffix}_dictionary_s{shards}', key)
FROM numbers(3_000_000)
FORMAT Null
</query>
<query>
WITH (rand64() % 2_000_000, toString(rand64() % 2_000_000)) as key
SELECT dictHas('default.complex_key_{layout_suffix}_dictionary_s{shards}', key)
FROM numbers(2_000_000)
FORMAT Null
</query>
<drop_query>DROP DICTIONARY IF EXISTS simple_key_{layout_suffix}_dictionary_s{shards}</drop_query>
<drop_query>DROP DICTIONARY IF EXISTS complex_key_{layout_suffix}_dictionary_s{shards}</drop_query>
<drop_query>DROP TABLE IF EXISTS simple_key_dictionary_source_table</drop_query>
<drop_query>DROP TABLE IF EXISTS complex_key_dictionary_source_table</drop_query>
</test>

View File

@ -8,6 +8,14 @@
</values>
</substitution>
<substitution>
<name>dictionary_shards</name>
<values>
<value>1</value>
<value>16</value>
</values>
</substitution>
<substitution>
<name>func</name>
<values>
@ -26,14 +34,14 @@
</create_query>
<create_query>
CREATE DICTIONARY hierarchical_{dictionary_layout}_dictionary
CREATE DICTIONARY hierarchical_{dictionary_layout}_shards{dictionary_shards}_dictionary
(
id UInt64,
parent_id UInt64 HIERARCHICAL
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(DB 'default' TABLE 'hierarchical_dictionary_source_table'))
LAYOUT({dictionary_layout})
LAYOUT({dictionary_layout}(SHARDS {dictionary_shards}))
LIFETIME(0);
</create_query>
@ -65,10 +73,10 @@
SELECT {func}('hierarchical_flat_dictionary', number + 1) FROM numbers(1000000) FORMAT Null;
</query>
<query>
SELECT {func}('hierarchical_{dictionary_layout}_dictionary', number + 1) FROM numbers(1000000) FORMAT Null;
SELECT {func}('hierarchical_{dictionary_layout}_shards{dictionary_shards}_dictionary', number + 1) FROM numbers(1000000) FORMAT Null;
</query>
<drop_query>DROP DICTIONARY IF EXISTS hierarchical_{dictionary_layout}_dictionary;</drop_query>
<drop_query>DROP DICTIONARY IF EXISTS hierarchical_{dictionary_layout}_shards{dictionary_shards}_dictionary;</drop_query>
<drop_query>DROP DICTIONARY IF EXISTS hierarchical_flat_dictionary;</drop_query>
<drop_query>DROP TABLE IF EXISTS hierarchical_dictionary_source_table;</drop_query>
</test>

View File

@ -54,7 +54,6 @@ $CLICKHOUSE_CLIENT -nm -q "
LAYOUT(SPARSE_HASHED(PREALLOCATE 1))
LIFETIME(0);
SHOW CREATE DICTIONARY dict_01509_preallocate;
SYSTEM RELOAD DICTIONARY dict_01509_preallocate;
"
(
# start new shell to avoid overriding variables for other client invocation

View File

@ -0,0 +1,37 @@
-- { echoOn }
create dictionary dict (key UInt64, value UInt16) primary key key source(clickhouse(table data)) layout(sparse_hashed()) lifetime(0);
show create dict;
CREATE DICTIONARY default.dict\n(\n `key` UInt64,\n `value` UInt16\n)\nPRIMARY KEY key\nSOURCE(CLICKHOUSE(TABLE data))\nLIFETIME(MIN 0 MAX 0)\nLAYOUT(SPARSE_HASHED())
system reload dictionary dict;
select element_count from system.dictionaries where database = currentDatabase() and name = 'dict';
100000
select count() from data where dictGetUInt16('dict', 'value', key) != value;
0
create dictionary dict_10 (key UInt64, value UInt16) primary key key source(clickhouse(table data)) layout(sparse_hashed(shards 10)) lifetime(0);
show create dict_10;
CREATE DICTIONARY default.dict_10\n(\n `key` UInt64,\n `value` UInt16\n)\nPRIMARY KEY key\nSOURCE(CLICKHOUSE(TABLE data))\nLIFETIME(MIN 0 MAX 0)\nLAYOUT(SPARSE_HASHED(SHARDS 10))
system reload dictionary dict_10;
select element_count from system.dictionaries where database = currentDatabase() and name = 'dict_10';
100000
select count() from data where dictGetUInt16('dict_10', 'value', key) != value;
0
create dictionary dict_10_uint8 (key UInt8, value UInt16) primary key key source(clickhouse(table data)) layout(sparse_hashed(shards 10)) lifetime(0);
show create dict_10_uint8;
CREATE DICTIONARY default.dict_10_uint8\n(\n `key` UInt8,\n `value` UInt16\n)\nPRIMARY KEY key\nSOURCE(CLICKHOUSE(TABLE data))\nLIFETIME(MIN 0 MAX 0)\nLAYOUT(SPARSE_HASHED(SHARDS 10))
system reload dictionary dict_10_uint8;
select element_count from system.dictionaries where database = currentDatabase() and name = 'dict_10_uint8';
100000
select count() from data where dictGetUInt16('dict_10_uint8', 'value', key) != value;
0
create dictionary dict_10_string (key String, value UInt16) primary key key source(clickhouse(table data_string)) layout(sparse_hashed(shards 10)) lifetime(0);
show create dict_10_string;
CREATE DICTIONARY default.dict_10_string\n(\n `key` String,\n `value` UInt16\n)\nPRIMARY KEY key\nSOURCE(CLICKHOUSE(TABLE data_string))\nLIFETIME(MIN 0 MAX 0)\nLAYOUT(SPARSE_HASHED(SHARDS 10))
system reload dictionary dict_10_string; -- { serverError CANNOT_PARSE_TEXT }
create dictionary dict_10_incremental (key UInt64, value UInt16) primary key key source(clickhouse(table data_last_access update_field last_access)) layout(sparse_hashed(shards 10)) lifetime(0);
system reload dictionary dict_10_incremental; -- { serverError BAD_ARGUMENTS }
create dictionary complex_dict_10 (k1 UInt64, k2 UInt64, value UInt16) primary key k1, k2 source(clickhouse(table complex_data)) layout(complex_key_sparse_hashed(shards 10)) lifetime(0);
system reload dictionary complex_dict_10;
select element_count from system.dictionaries where database = currentDatabase() and name = 'complex_dict_10';
100000
select count() from complex_data where dictGetUInt16('complex_dict_10', 'value', (k1, k2)) != value;
0

View File

@ -0,0 +1,44 @@
drop dictionary if exists dict;
drop dictionary if exists dict_10;
drop dictionary if exists dict_10_uint8;
drop dictionary if exists dict_10_string;
drop dictionary if exists dict_10_incremental;
drop dictionary if exists complex_dict_10;
drop table if exists data;
drop table if exists data_string;
drop table if exists complex_data;
create table data (key UInt64, value UInt16) engine=Memory() as select number, number from numbers(1e5);
create table data_string (key String, value UInt16) engine=Memory() as select 'foo' || number::String, number from numbers(1e5);
create table complex_data (k1 UInt64, k2 UInt64, value UInt16) engine=Memory() as select number, number, number from numbers(1e5);
-- { echoOn }
create dictionary dict (key UInt64, value UInt16) primary key key source(clickhouse(table data)) layout(sparse_hashed()) lifetime(0);
show create dict;
system reload dictionary dict;
select element_count from system.dictionaries where database = currentDatabase() and name = 'dict';
select count() from data where dictGetUInt16('dict', 'value', key) != value;
create dictionary dict_10 (key UInt64, value UInt16) primary key key source(clickhouse(table data)) layout(sparse_hashed(shards 10)) lifetime(0);
show create dict_10;
system reload dictionary dict_10;
select element_count from system.dictionaries where database = currentDatabase() and name = 'dict_10';
select count() from data where dictGetUInt16('dict_10', 'value', key) != value;
create dictionary dict_10_uint8 (key UInt8, value UInt16) primary key key source(clickhouse(table data)) layout(sparse_hashed(shards 10)) lifetime(0);
show create dict_10_uint8;
system reload dictionary dict_10_uint8;
select element_count from system.dictionaries where database = currentDatabase() and name = 'dict_10_uint8';
select count() from data where dictGetUInt16('dict_10_uint8', 'value', key) != value;
create dictionary dict_10_string (key String, value UInt16) primary key key source(clickhouse(table data_string)) layout(sparse_hashed(shards 10)) lifetime(0);
show create dict_10_string;
system reload dictionary dict_10_string; -- { serverError CANNOT_PARSE_TEXT }
create dictionary dict_10_incremental (key UInt64, value UInt16) primary key key source(clickhouse(table data_last_access update_field last_access)) layout(sparse_hashed(shards 10)) lifetime(0);
system reload dictionary dict_10_incremental; -- { serverError BAD_ARGUMENTS }
create dictionary complex_dict_10 (k1 UInt64, k2 UInt64, value UInt16) primary key k1, k2 source(clickhouse(table complex_data)) layout(complex_key_sparse_hashed(shards 10)) lifetime(0);
system reload dictionary complex_dict_10;
select element_count from system.dictionaries where database = currentDatabase() and name = 'complex_dict_10';
select count() from complex_data where dictGetUInt16('complex_dict_10', 'value', (k1, k2)) != value;