Allow configuring the queue backlog of the parallel hashed dictionary loader

v2: Decrease default parallel_queue_backlog to 10000 (same speed)
v3: Rename parallel_queue_backlog to per_shard_load_backlog
v3: Rename per_shard_load_backlog to shard_load_queue_backlog
v4: Fix documentation
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
Azat Khuzhin 2022-11-25 11:14:51 +01:00
parent 79ad81dfdf
commit 99063b152f
3 changed files with 25 additions and 12 deletions

View File

@ -164,6 +164,15 @@ Configuration example:
<layout>
<hashed>
<shards>10</shards>
<!-- Size of the backlog for blocks in the parallel queue.
The bottleneck in parallel loading is rehash, so to avoid
stalling while a thread is doing a rehash, you need to have some
backlog.
10000 is a good balance between memory and speed.
Even for 10e10 elements it can handle all the load without starvation. -->
<shard_load_queue_backlog>10000</shard_load_queue_backlog>
</hashed>
</layout>
```
@ -171,7 +180,7 @@ Configuration example:
or
``` sql
LAYOUT(HASHED(SHARDS 10))
LAYOUT(HASHED(SHARDS 10 [SHARD_LOAD_QUEUE_BACKLOG 10000]))
```
### sparse_hashed
@ -209,6 +218,7 @@ Configuration example:
<complex_key_hashed>
<preallocate>0</preallocate>
<shards>1</shards>
<!-- <shard_load_queue_backlog>10000</shard_load_queue_backlog> -->
</complex_key_hashed>
</layout>
```
@ -216,7 +226,7 @@ Configuration example:
or
``` sql
LAYOUT(COMPLEX_KEY_HASHED([PREALLOCATE 0] [SHARDS 1]))
LAYOUT(COMPLEX_KEY_HASHED([PREALLOCATE 0] [SHARDS 1] [SHARD_LOAD_QUEUE_BACKLOG 10000]))
```
### complex_key_sparse_hashed
@ -237,7 +247,7 @@ Configuration example:
or
``` sql
LAYOUT(COMPLEX_KEY_SPARSE_HASHED([PREALLOCATE 0] [SHARDS 1]))
LAYOUT(COMPLEX_KEY_SPARSE_HASHED([PREALLOCATE 0] [SHARDS 1] [SHARD_LOAD_QUEUE_BACKLOG 10000]))
```
### hashed_array

View File

@ -55,22 +55,22 @@ class ParallelDictionaryLoader : public boost::noncopyable
using Queue = ConcurrentBoundedQueue<Block>;
public:
explicit ParallelDictionaryLoader(HashedDictionary & dictionary_, size_t max_fill_ = 100'000)
explicit ParallelDictionaryLoader(HashedDictionary & dictionary_)
: dictionary(dictionary_)
, shards(dictionary.configuration.shards)
, max_fill(max_fill_)
, simple_key(dictionary.dict_struct.getKeysSize() == 1)
, pool(shards)
, shards_queues(shards)
{
LOG_TRACE(dictionary.log, "Will load the dictionary using {} threads", shards);
UInt64 backlog = dictionary.configuration.shard_load_queue_backlog;
LOG_TRACE(dictionary.log, "Will load the dictionary using {} threads (with {} backlog)", shards, backlog);
shards_slots.resize(shards);
std::generate(shards_slots.begin(), shards_slots.end(), [n = 0]() mutable { return n++; });
for (size_t shard = 0; shard < shards; ++shard)
{
shards_queues[shard].emplace(max_fill);
shards_queues[shard].emplace(backlog);
pool.scheduleOrThrowOnError([this, shard, thread_group = CurrentThread::getGroup()]
{
if (thread_group)
@ -124,7 +124,6 @@ public:
private:
HashedDictionary & dictionary;
const size_t shards;
const size_t max_fill;
bool simple_key;
ThreadPool pool;
std::vector<std::optional<Queue>> shards_queues;
@ -1116,14 +1115,17 @@ void registerDictionaryHashed(DictionaryFactory & factory)
const bool preallocate = config.getBool(config_prefix + dictionary_layout_prefix + ".preallocate", false);
Int64 shards = config.getInt(config_prefix + dictionary_layout_prefix + ".shards", 1);
if (!shards)
shards = 1;
if (shards < 0 || shards > 128)
throw Exception(ErrorCodes::BAD_ARGUMENTS,"{}: SHARDS parameter should be within [0, 128]", full_name);
if (shards <= 0 || shards > 128)
throw Exception(ErrorCodes::BAD_ARGUMENTS,"{}: SHARDS parameter should be within [1, 128]", full_name);
/// Read the backlog from the layout configuration; 10000 is used when the key is absent.
Int64 shard_load_queue_backlog = config.getInt(config_prefix + dictionary_layout_prefix + ".shard_load_queue_backlog", 10000);
/// Reject zero and negative values before the value is cast to UInt64 for the storage configuration.
if (shard_load_queue_backlog <= 0)
    throw Exception(ErrorCodes::BAD_ARGUMENTS,"{}: SHARD_LOAD_QUEUE_BACKLOG parameter should be greater than zero", full_name);
HashedDictionaryStorageConfiguration configuration{
preallocate,
static_cast<UInt64>(shards),
static_cast<UInt64>(shard_load_queue_backlog),
require_nonempty,
dict_lifetime,
};

View File

@ -28,6 +28,7 @@ struct HashedDictionaryStorageConfiguration
{
/// Settings of a hashed dictionary layout. Instances are built with positional
/// aggregate initialization, so the order of the fields below matters.

/// Value of the layout's `preallocate` setting.
const bool preallocate;
/// Number of shards; the parallel loader sizes its thread pool and its set of queues by this value.
const UInt64 shards;
/// Capacity given to each per-shard block queue of the parallel loader.
const UInt64 shard_load_queue_backlog;
const bool require_nonempty;
const DictionaryLifetime lifetime;
};