mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 08:40:50 +00:00
Merge pull request #60926 from ClickHouse/vdimir/hashed_dictinary_load_fix
Fix possible stuck on error in HashedDictionaryParallelLoader
This commit is contained in:
commit
7983f883e5
@ -1078,7 +1078,7 @@ void HashedArrayDictionary<dictionary_key_type, sharded>::calculateBytesAllocate
|
|||||||
bytes_allocated += container.allocated_bytes();
|
bytes_allocated += container.allocated_bytes();
|
||||||
}
|
}
|
||||||
|
|
||||||
bucket_count = container.capacity();
|
bucket_count += container.capacity();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -1089,6 +1089,13 @@ void HashedArrayDictionary<dictionary_key_type, sharded>::calculateBytesAllocate
|
|||||||
bytes_allocated += container.size();
|
bytes_allocated += container.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// `bucket_count` should be a sum over all shards,
|
||||||
|
/// but it should not be a sum over all attributes, since it is used to
|
||||||
|
/// calculate load_factor like this: `element_count / bucket_count`
|
||||||
|
/// While element_count is a sum over all shards, not over all attributes.
|
||||||
|
if (attributes.size())
|
||||||
|
bucket_count /= attributes.size();
|
||||||
|
|
||||||
if (update_field_loaded_block)
|
if (update_field_loaded_block)
|
||||||
bytes_allocated += update_field_loaded_block->allocatedBytes();
|
bytes_allocated += update_field_loaded_block->allocatedBytes();
|
||||||
|
|
||||||
@ -1167,17 +1174,24 @@ void registerDictionaryArrayHashed(DictionaryFactory & factory)
|
|||||||
if (shards <= 0 || 128 < shards)
|
if (shards <= 0 || 128 < shards)
|
||||||
throw Exception(ErrorCodes::BAD_ARGUMENTS,"{}: SHARDS parameter should be within [1, 128]", full_name);
|
throw Exception(ErrorCodes::BAD_ARGUMENTS,"{}: SHARDS parameter should be within [1, 128]", full_name);
|
||||||
|
|
||||||
HashedArrayDictionaryStorageConfiguration configuration{require_nonempty, dict_lifetime, static_cast<size_t>(shards)};
|
Int64 shard_load_queue_backlog = config.getInt(config_prefix + dictionary_layout_prefix + ".shard_load_queue_backlog", 10000);
|
||||||
|
if (shard_load_queue_backlog <= 0)
|
||||||
|
throw Exception(ErrorCodes::BAD_ARGUMENTS, "{}: SHARD_LOAD_QUEUE_BACKLOG parameter should be greater then zero", full_name);
|
||||||
|
|
||||||
if (source_ptr->hasUpdateField() && shards > 1)
|
if (source_ptr->hasUpdateField() && shards > 1)
|
||||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "{}: SHARDS parameter does not supports for updatable source (UPDATE_FIELD)", full_name);
|
throw Exception(ErrorCodes::BAD_ARGUMENTS, "{}: SHARDS parameter does not supports for updatable source (UPDATE_FIELD)", full_name);
|
||||||
|
|
||||||
|
HashedArrayDictionaryStorageConfiguration configuration{require_nonempty, dict_lifetime, static_cast<size_t>(shards), static_cast<UInt64>(shard_load_queue_backlog)};
|
||||||
|
|
||||||
ContextMutablePtr context = copyContextAndApplySettingsFromDictionaryConfig(global_context, config, config_prefix);
|
ContextMutablePtr context = copyContextAndApplySettingsFromDictionaryConfig(global_context, config, config_prefix);
|
||||||
const auto & settings = context->getSettingsRef();
|
const auto & settings = context->getSettingsRef();
|
||||||
|
|
||||||
const auto * clickhouse_source = dynamic_cast<const ClickHouseDictionarySource *>(source_ptr.get());
|
const auto * clickhouse_source = dynamic_cast<const ClickHouseDictionarySource *>(source_ptr.get());
|
||||||
configuration.use_async_executor = clickhouse_source && clickhouse_source->isLocal() && settings.dictionary_use_async_executor;
|
configuration.use_async_executor = clickhouse_source && clickhouse_source->isLocal() && settings.dictionary_use_async_executor;
|
||||||
|
|
||||||
|
if (settings.max_execution_time.totalSeconds() > 0)
|
||||||
|
configuration.load_timeout = std::chrono::seconds(settings.max_execution_time.totalSeconds());
|
||||||
|
|
||||||
if (dictionary_key_type == DictionaryKeyType::Simple)
|
if (dictionary_key_type == DictionaryKeyType::Simple)
|
||||||
{
|
{
|
||||||
if (shards > 1)
|
if (shards > 1)
|
||||||
|
@ -29,6 +29,7 @@ struct HashedArrayDictionaryStorageConfiguration
|
|||||||
size_t shards = 1;
|
size_t shards = 1;
|
||||||
size_t shard_load_queue_backlog = 10000;
|
size_t shard_load_queue_backlog = 10000;
|
||||||
bool use_async_executor = false;
|
bool use_async_executor = false;
|
||||||
|
std::chrono::seconds load_timeout{0};
|
||||||
};
|
};
|
||||||
|
|
||||||
template <DictionaryKeyType dictionary_key_type, bool sharded>
|
template <DictionaryKeyType dictionary_key_type, bool sharded>
|
||||||
|
@ -67,6 +67,7 @@ struct HashedDictionaryConfiguration
|
|||||||
const bool require_nonempty;
|
const bool require_nonempty;
|
||||||
const DictionaryLifetime lifetime;
|
const DictionaryLifetime lifetime;
|
||||||
bool use_async_executor = false;
|
bool use_async_executor = false;
|
||||||
|
const std::chrono::seconds load_timeout{0};
|
||||||
};
|
};
|
||||||
|
|
||||||
template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded>
|
template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded>
|
||||||
|
@ -31,6 +31,7 @@ template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded> clas
|
|||||||
namespace ErrorCodes
|
namespace ErrorCodes
|
||||||
{
|
{
|
||||||
extern const int LOGICAL_ERROR;
|
extern const int LOGICAL_ERROR;
|
||||||
|
extern const int TIMEOUT_EXCEEDED;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -50,9 +51,10 @@ public:
|
|||||||
, shards(dictionary.configuration.shards)
|
, shards(dictionary.configuration.shards)
|
||||||
, pool(CurrentMetrics::HashedDictionaryThreads, CurrentMetrics::HashedDictionaryThreadsActive, CurrentMetrics::HashedDictionaryThreadsScheduled, shards)
|
, pool(CurrentMetrics::HashedDictionaryThreads, CurrentMetrics::HashedDictionaryThreadsActive, CurrentMetrics::HashedDictionaryThreadsScheduled, shards)
|
||||||
, shards_queues(shards)
|
, shards_queues(shards)
|
||||||
|
, loading_timeout(dictionary.configuration.load_timeout)
|
||||||
{
|
{
|
||||||
UInt64 backlog = dictionary.configuration.shard_load_queue_backlog;
|
UInt64 backlog = dictionary.configuration.shard_load_queue_backlog;
|
||||||
LOG_TRACE(dictionary.log, "Will load the {} dictionary using {} threads (with {} backlog)", dictionary_name, shards, backlog);
|
LOG_TRACE(dictionary.log, "Will load the {} dictionary using {} threads (with {} backlog and timeout {} sec)", dictionary_name, shards, backlog, loading_timeout.count());
|
||||||
|
|
||||||
shards_slots.resize(shards);
|
shards_slots.resize(shards);
|
||||||
iota(shards_slots.data(), shards_slots.size(), UInt64(0));
|
iota(shards_slots.data(), shards_slots.size(), UInt64(0));
|
||||||
@ -62,7 +64,11 @@ public:
|
|||||||
shards_queues[shard].emplace(backlog);
|
shards_queues[shard].emplace(backlog);
|
||||||
pool.scheduleOrThrowOnError([this, shard, thread_group = CurrentThread::getGroup()]
|
pool.scheduleOrThrowOnError([this, shard, thread_group = CurrentThread::getGroup()]
|
||||||
{
|
{
|
||||||
|
WorkerStatistic statistic;
|
||||||
SCOPE_EXIT_SAFE(
|
SCOPE_EXIT_SAFE(
|
||||||
|
LOG_TRACE(dictionary.log, "Finished worker for dictionary {} shard {}, processed {} blocks, {} rows, total time {}ms",
|
||||||
|
dictionary_name, shard, statistic.total_blocks, statistic.total_rows, statistic.total_elapsed_ms);
|
||||||
|
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::detachFromGroupIfNotDetached();
|
CurrentThread::detachFromGroupIfNotDetached();
|
||||||
);
|
);
|
||||||
@ -74,7 +80,9 @@ public:
|
|||||||
CurrentThread::attachToGroupIfDetached(thread_group);
|
CurrentThread::attachToGroupIfDetached(thread_group);
|
||||||
setThreadName("HashedDictLoad");
|
setThreadName("HashedDictLoad");
|
||||||
|
|
||||||
threadWorker(shard);
|
LOG_TRACE(dictionary.log, "Starting worker for dictionary {}, shard {}", dictionary_name, shard);
|
||||||
|
|
||||||
|
threadWorker(shard, statistic);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -87,8 +95,28 @@ public:
|
|||||||
|
|
||||||
for (size_t shard = 0; shard < shards; ++shard)
|
for (size_t shard = 0; shard < shards; ++shard)
|
||||||
{
|
{
|
||||||
if (!shards_queues[shard]->push(std::move(shards_blocks[shard])))
|
const auto & current_block = shards_blocks[shard];
|
||||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to shards queue #{}", shard);
|
while (!shards_queues[shard]->tryPush(current_block, /* milliseconds= */ 100))
|
||||||
|
{
|
||||||
|
if (shards_queues[shard]->isFinished())
|
||||||
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to finished shards queue #{}, dictionary {}", shard, dictionary_name);
|
||||||
|
|
||||||
|
/// We need to check if some workers failed
|
||||||
|
if (pool.active() != shards)
|
||||||
|
{
|
||||||
|
LOG_DEBUG(dictionary.log, "Some workers for dictionary {} failed, stopping all workers", dictionary_name);
|
||||||
|
stop_all_workers = true;
|
||||||
|
pool.wait(); /// We expect exception to be thrown from the failed worker thread
|
||||||
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Worker threads for dictionary {} are not active", dictionary_name);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (loading_timeout.count() && std::chrono::milliseconds(total_loading_time.elapsedMilliseconds()) > loading_timeout)
|
||||||
|
{
|
||||||
|
stop_all_workers = true;
|
||||||
|
pool.wait();
|
||||||
|
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Timeout {} sec for dictionary {} loading is expired", loading_timeout.count(), dictionary_name);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -124,27 +152,49 @@ private:
|
|||||||
String dictionary_name;
|
String dictionary_name;
|
||||||
const size_t shards;
|
const size_t shards;
|
||||||
ThreadPool pool;
|
ThreadPool pool;
|
||||||
|
std::atomic_bool stop_all_workers{false};
|
||||||
std::vector<std::optional<ConcurrentBoundedQueue<Block>>> shards_queues;
|
std::vector<std::optional<ConcurrentBoundedQueue<Block>>> shards_queues;
|
||||||
|
std::chrono::seconds loading_timeout;
|
||||||
|
Stopwatch total_loading_time;
|
||||||
|
|
||||||
std::vector<UInt64> shards_slots;
|
std::vector<UInt64> shards_slots;
|
||||||
DictionaryKeysArenaHolder<dictionary_key_type> arena_holder;
|
DictionaryKeysArenaHolder<dictionary_key_type> arena_holder;
|
||||||
|
|
||||||
void threadWorker(size_t shard)
|
struct WorkerStatistic
|
||||||
|
{
|
||||||
|
UInt64 total_elapsed_ms = 0;
|
||||||
|
UInt64 total_blocks = 0;
|
||||||
|
UInt64 total_rows = 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
void threadWorker(size_t shard, WorkerStatistic & statistic)
|
||||||
{
|
{
|
||||||
Block block;
|
Block block;
|
||||||
DictionaryKeysArenaHolder<dictionary_key_type> arena_holder_;
|
DictionaryKeysArenaHolder<dictionary_key_type> arena_holder_;
|
||||||
auto & shard_queue = *shards_queues[shard];
|
auto & shard_queue = *shards_queues[shard];
|
||||||
|
|
||||||
while (shard_queue.pop(block))
|
while (true)
|
||||||
{
|
{
|
||||||
|
if (!shard_queue.tryPop(block, /* milliseconds= */ 100))
|
||||||
|
{
|
||||||
|
/// Check if we need to stop
|
||||||
|
if (stop_all_workers || shard_queue.isFinished())
|
||||||
|
break;
|
||||||
|
/// Timeout expired, but the queue is not finished yet, try again
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
Stopwatch watch;
|
Stopwatch watch;
|
||||||
dictionary.blockToAttributes(block, arena_holder_, shard);
|
dictionary.blockToAttributes(block, arena_holder_, shard);
|
||||||
UInt64 elapsed_ms = watch.elapsedMilliseconds();
|
UInt64 elapsed_ms = watch.elapsedMilliseconds();
|
||||||
if (elapsed_ms > 1'000)
|
|
||||||
LOG_TRACE(dictionary.log, "Block processing for shard #{} is slow {}ms (rows {}).", shard, elapsed_ms, block.rows());
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!shard_queue.isFinished())
|
statistic.total_elapsed_ms += elapsed_ms;
|
||||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not pull non finished shards queue #{}", shard);
|
statistic.total_blocks += 1;
|
||||||
|
statistic.total_rows += block.rows();
|
||||||
|
|
||||||
|
if (elapsed_ms > 1'000)
|
||||||
|
LOG_TRACE(dictionary.log, "Block processing for shard #{} is slow {}ms (rows {})", shard, elapsed_ms, block.rows());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Split block to shards smaller block, using 'selector'.
|
/// Split block to shards smaller block, using 'selector'.
|
||||||
|
@ -77,6 +77,7 @@ void registerDictionaryHashed(DictionaryFactory & factory)
|
|||||||
require_nonempty,
|
require_nonempty,
|
||||||
dict_lifetime,
|
dict_lifetime,
|
||||||
use_async_executor,
|
use_async_executor,
|
||||||
|
std::chrono::seconds(settings.max_execution_time.totalSeconds()),
|
||||||
};
|
};
|
||||||
|
|
||||||
if (source_ptr->hasUpdateField() && shards > 1)
|
if (source_ptr->hasUpdateField() && shards > 1)
|
||||||
|
Loading…
Reference in New Issue
Block a user