Implement separate queue for parallel loader of hashed dictionaries

Previous patches in this series has a bottleneck in rehash(). This is
the most slowest operation when insert lots of rows into the hashtable
and eventually all that thread pool sometimes work as the most slowest
thread since we did not have any queue of blocks.

This patch adds such queue and now it scales linearly, so initialy with
1 thread I had ~4 hours for 10e9 elements (UInt64 key, UInt16 value),
after this patch it works in 16 minutes with 16 threads (well actually I
have to use 32 threads because of distribution of data in the source
table).

And now with 16 threads it works 16 times faster.

Also this patch adds more optimal block splitting for the non-complex
dictionaries, and usual block splitting for complex dictionaries.
But anyway this moves the overhead from the loading into the hashtable
threads out to the reader thread, and this is better, since reader does
not uses that much CPU.

v2: fix use-after-free on failed load (add missing wait in dtor)
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
Azat Khuzhin 2022-11-23 13:35:35 +01:00
parent 5d0fd3cdc4
commit 79ad81dfdf
2 changed files with 186 additions and 42 deletions

View File

@ -1,17 +1,23 @@
#include "HashedDictionary.h"
#include <Common/ArenaUtils.h>
#include <Common/ThreadPool.h>
#include <Core/Defines.h>
#include <DataTypes/DataTypesDecimal.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnNullable.h>
#include <Functions/FunctionHelpers.h>
#include <Interpreters/createBlockSelector.h>
#include <Dictionaries//DictionarySource.h>
#include <Dictionaries/DictionaryFactory.h>
#include <Dictionaries/HierarchyDictionariesUtils.h>
#include <Common/ArenaUtils.h>
#include <Common/ThreadPool.h>
#include <Common/setThreadName.h>
#include <Common/logger_useful.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Core/Defines.h>
#include <boost/noncopyable.hpp>
namespace
{
@ -36,8 +42,148 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
extern const int DICTIONARY_IS_EMPTY;
extern const int UNSUPPORTED_METHOD;
extern const int LOGICAL_ERROR;
}
template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded> class HashedDictionary;
/// Implementation parallel dictionary load for SHARDS
template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded>
class ParallelDictionaryLoader : public boost::noncopyable
{
using HashedDictionary = HashedDictionary<dictionary_key_type, sparse, sharded>;
using Queue = ConcurrentBoundedQueue<Block>;
public:
explicit ParallelDictionaryLoader(HashedDictionary & dictionary_, size_t max_fill_ = 100'000)
: dictionary(dictionary_)
, shards(dictionary.configuration.shards)
, max_fill(max_fill_)
, simple_key(dictionary.dict_struct.getKeysSize() == 1)
, pool(shards)
, shards_queues(shards)
{
LOG_TRACE(dictionary.log, "Will load the dictionary using {} threads", shards);
shards_slots.resize(shards);
std::generate(shards_slots.begin(), shards_slots.end(), [n = 0]() mutable { return n++; });
for (size_t shard = 0; shard < shards; ++shard)
{
shards_queues[shard].emplace(max_fill);
pool.scheduleOrThrowOnError([this, shard, thread_group = CurrentThread::getGroup()]
{
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
setThreadName("HashedDictLoad");
threadWorker(shard);
});
}
}
void addBlock(Block block)
{
Blocks shards_blocks(shards);
for (size_t shard = 0; shard < shards; ++shard)
shards_blocks[shard] = block.cloneEmpty();
IColumn::Selector selector;
if (simple_key)
selector = createBlockSelector<UInt64>(*block.getByPosition(0).column, shards_slots);
else
selector = createComplexBlockSelector(block, shards_slots);
splitBlock(selector, block, shards_blocks);
for (size_t shard = 0; shard < shards; ++shard)
{
if (!shards_queues[shard]->push(std::move(shards_blocks[shard])))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to shards queue #{}", shard);
}
}
void finish()
{
for (auto & queue : shards_queues)
queue->finish();
Stopwatch watch;
pool.wait();
UInt64 elapsed_us = watch.elapsedMicroseconds();
LOG_TRACE(dictionary.log, "Processing the tail took {}ms", elapsed_us);
}
~ParallelDictionaryLoader()
{
for (auto & queue : shards_queues)
queue->clearAndFinish();
pool.wait();
}
private:
HashedDictionary & dictionary;
const size_t shards;
const size_t max_fill;
bool simple_key;
ThreadPool pool;
std::vector<std::optional<Queue>> shards_queues;
std::vector<UInt64> shards_slots;
void threadWorker(size_t shard)
{
Block block;
auto & shard_queue = *shards_queues[shard];
while (shard_queue.pop(block))
{
Stopwatch watch;
dictionary.blockToAttributes(block);
UInt64 elapsed_us = watch.elapsedMicroseconds();
if (elapsed_us > 1'000'000)
LOG_TRACE(dictionary.log, "Block processing for shard #{} is slow {}ms (rows {}).", shard, elapsed_us, block.rows());
}
if (!shard_queue.isFinished())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not pull non finished shards queue #{}", shard);
}
/// Split block to shards smaller block, using 'selector'.
void splitBlock(const IColumn::Selector & selector, const Block & block, Blocks & splitted_blocks)
{
size_t columns = block.columns();
for (size_t col = 0; col < columns; ++col)
{
MutableColumns splitted_columns = block.getByPosition(col).column->scatter(shards, selector);
for (size_t shard = 0; shard < shards; ++shard)
splitted_blocks[shard].getByPosition(col).column = std::move(splitted_columns[shard]);
}
}
IColumn::Selector createComplexBlockSelector(const Block & block, const std::vector<UInt64> & slots)
{
size_t num_rows = block.rows();
IColumn::Selector selector(num_rows);
size_t skip_keys_size_offset = dictionary.dict_struct.getKeysSize();
Columns key_columns;
key_columns.reserve(skip_keys_size_offset);
for (size_t i = 0; i < skip_keys_size_offset; ++i)
key_columns.emplace_back(block.safeGetByPosition(i).column);
DictionaryKeysArenaHolder<dictionary_key_type> arena_holder;
DictionaryKeysExtractor<dictionary_key_type> keys_extractor(key_columns, arena_holder.getComplexKeyArena());
for (size_t i = 0; i < num_rows; ++i)
{
auto key = keys_extractor.extractCurrentKey();
size_t shard = dictionary.getShard(key);
selector[i] = slots[shard];
}
return selector;
}
};
template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded>
HashedDictionary<dictionary_key_type, sparse, sharded>::HashedDictionary(
const StorageID & dict_id_,
@ -46,6 +192,7 @@ HashedDictionary<dictionary_key_type, sparse, sharded>::HashedDictionary(
const HashedDictionaryStorageConfiguration & configuration_,
BlockPtr update_field_loaded_block_)
: IDictionary(dict_id_)
, log(&Poco::Logger::get("HashedDictionary"))
, dict_struct(dict_struct_)
, source_ptr(std::move(source_ptr_))
, configuration(configuration_)
@ -488,12 +635,12 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::updateData()
if (update_field_loaded_block)
{
resize(update_field_loaded_block->rows());
blockToAttributes(*update_field_loaded_block.get(), /* current_shard= */{});
blockToAttributes(*update_field_loaded_block.get());
}
}
template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded>
void HashedDictionary<dictionary_key_type, sparse, sharded>::blockToAttributes(const Block & block, std::optional<UInt64> current_shard)
size_t HashedDictionary<dictionary_key_type, sparse, sharded>::blockToAttributes(const Block & block)
{
size_t skip_keys_size_offset = dict_struct.getKeysSize();
size_t new_element_count = 0;
@ -518,10 +665,6 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::blockToAttributes(c
for (size_t key_index = 0; key_index < keys_size; ++key_index)
{
auto key = keys_extractor.extractCurrentKey();
if (current_shard && getShard(key) != *current_shard)
continue;
size_t shard = getShard(key);
if constexpr (std::is_same_v<KeyType, StringRef>)
@ -529,9 +672,11 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::blockToAttributes(c
no_attributes_containers[shard].insert(key);
keys_extractor.rollbackCurrentKey();
++new_element_count;
}
return;
element_count += new_element_count;
return new_element_count;
}
for (size_t attribute_index = 0; attribute_index < attributes_size; ++attribute_index)
@ -548,9 +693,6 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::blockToAttributes(c
for (size_t key_index = 0; key_index < keys_size; ++key_index)
{
auto key = keys_extractor.extractCurrentKey();
if (current_shard && getShard(key) != *current_shard)
continue;
size_t shard = getShard(key);
auto & container = containers[shard];
@ -597,6 +739,7 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::blockToAttributes(c
}
element_count += new_element_count;
return new_element_count;
}
template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded>
@ -686,12 +829,9 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::loadData()
{
if (!source_ptr->hasUpdateField())
{
std::optional<ThreadPool> pool;
std::optional<ParallelDictionaryLoader<dictionary_key_type, sparse, sharded>> parallel_loader;
if (configuration.shards > 1)
{
pool.emplace(configuration.shards);
LOG_TRACE(&Poco::Logger::get("HashedDictionary"), "Will load the dictionary with {} threads", configuration.shards);
}
parallel_loader.emplace(*this);
std::atomic<size_t> new_size = 0;
@ -703,6 +843,7 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::loadData()
PullingPipelineExecutor executor(pipeline);
Block block;
while (executor.pull(block))
{
if (configuration.preallocate && new_size)
@ -710,7 +851,7 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::loadData()
size_t current_new_size = new_size.exchange(0);
if (current_new_size)
{
LOG_TRACE(&Poco::Logger::get("HashedDictionary"), "Preallocated {} elements", current_new_size);
LOG_TRACE(log, "Preallocated {} elements", current_new_size);
resize(current_new_size);
}
}
@ -719,25 +860,14 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::loadData()
resize(block.rows());
}
if (pool)
{
for (size_t shard = 0; shard < configuration.shards; ++shard)
{
pool->scheduleOrThrowOnError([this, &block, shard, thread_group = CurrentThread::getGroup()]
{
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
blockToAttributes(block, shard);
});
}
pool->wait();
}
if (parallel_loader)
parallel_loader->addBlock(block);
else
{
blockToAttributes(block, /* current_shard= */ {});
}
blockToAttributes(block);
}
if (parallel_loader)
parallel_loader->finish();
}
else
{
@ -968,12 +1098,19 @@ void registerDictionaryHashed(DictionaryFactory & factory)
std::string dictionary_layout_name;
if (dictionary_key_type == DictionaryKeyType::Simple)
dictionary_layout_name = "hashed";
{
if (sparse)
dictionary_layout_name = "sparse_hashed";
else
dictionary_layout_name = "hashed";
}
else
dictionary_layout_name = "complex_key_hashed";
if (sparse)
dictionary_layout_name = "sparse_" + dictionary_layout_name;
{
if (sparse)
dictionary_layout_name = "complex_key_sparse_hashed";
else
dictionary_layout_name = "complex_key_hashed";
}
const std::string dictionary_layout_prefix = ".layout." + dictionary_layout_name;
const bool preallocate = config.getBool(config_prefix + dictionary_layout_prefix + ".preallocate", false);

View File

@ -32,9 +32,14 @@ struct HashedDictionaryStorageConfiguration
const DictionaryLifetime lifetime;
};
template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded>
class ParallelDictionaryLoader;
template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded>
class HashedDictionary final : public IDictionary
{
friend class ParallelDictionaryLoader<dictionary_key_type, sparse, sharded>;
public:
using KeyType = std::conditional_t<dictionary_key_type == DictionaryKeyType::Simple, UInt64, StringRef>;
@ -204,7 +209,7 @@ private:
void createAttributes();
void blockToAttributes(const Block & block, std::optional<UInt64> current_shard);
size_t blockToAttributes(const Block & block);
void updateData();
@ -242,6 +247,8 @@ private:
void resize(size_t added_rows);
Poco::Logger * log;
const DictionaryStructure dict_struct;
const DictionarySourcePtr source_ptr;
const HashedDictionaryStorageConfiguration configuration;