diff --git a/src/Dictionaries/HashedArrayDictionary.cpp b/src/Dictionaries/HashedArrayDictionary.cpp index 56d42a38c8e..d09f402143e 100644 --- a/src/Dictionaries/HashedArrayDictionary.cpp +++ b/src/Dictionaries/HashedArrayDictionary.cpp @@ -1078,7 +1078,7 @@ void HashedArrayDictionary::calculateBytesAllocate bytes_allocated += container.allocated_bytes(); } - bucket_count = container.capacity(); + bucket_count += container.capacity(); } }; @@ -1089,6 +1089,13 @@ void HashedArrayDictionary::calculateBytesAllocate bytes_allocated += container.size(); } + /// `bucket_count` should be a sum over all shards, + /// but it should not be a sum over all attributes, since it is used to + /// calculate load_factor like this: `element_count / bucket_count` + /// While element_count is a sum over all shards, not over all attributes. + if (attributes.size()) + bucket_count /= attributes.size(); + if (update_field_loaded_block) bytes_allocated += update_field_loaded_block->allocatedBytes(); @@ -1167,17 +1174,24 @@ void registerDictionaryArrayHashed(DictionaryFactory & factory) if (shards <= 0 || 128 < shards) throw Exception(ErrorCodes::BAD_ARGUMENTS,"{}: SHARDS parameter should be within [1, 128]", full_name); - HashedArrayDictionaryStorageConfiguration configuration{require_nonempty, dict_lifetime, static_cast(shards)}; + Int64 shard_load_queue_backlog = config.getInt(config_prefix + dictionary_layout_prefix + ".shard_load_queue_backlog", 10000); + if (shard_load_queue_backlog <= 0) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "{}: SHARD_LOAD_QUEUE_BACKLOG parameter should be greater then zero", full_name); if (source_ptr->hasUpdateField() && shards > 1) throw Exception(ErrorCodes::BAD_ARGUMENTS, "{}: SHARDS parameter does not supports for updatable source (UPDATE_FIELD)", full_name); + HashedArrayDictionaryStorageConfiguration configuration{require_nonempty, dict_lifetime, static_cast(shards), static_cast(shard_load_queue_backlog)}; + ContextMutablePtr context = copyContextAndApplySettingsFromDictionaryConfig(global_context, config, config_prefix); const auto & settings = context->getSettingsRef(); const auto * clickhouse_source = dynamic_cast(source_ptr.get()); configuration.use_async_executor = clickhouse_source && clickhouse_source->isLocal() && settings.dictionary_use_async_executor; + if (settings.max_execution_time.totalSeconds() > 0) + configuration.load_timeout = std::chrono::seconds(settings.max_execution_time.totalSeconds()); + if (dictionary_key_type == DictionaryKeyType::Simple) { if (shards > 1) diff --git a/src/Dictionaries/HashedArrayDictionary.h b/src/Dictionaries/HashedArrayDictionary.h index 4b2570ad928..9877d92d457 100644 --- a/src/Dictionaries/HashedArrayDictionary.h +++ b/src/Dictionaries/HashedArrayDictionary.h @@ -29,6 +29,7 @@ struct HashedArrayDictionaryStorageConfiguration size_t shards = 1; size_t shard_load_queue_backlog = 10000; bool use_async_executor = false; + std::chrono::seconds load_timeout{0}; }; template diff --git a/src/Dictionaries/HashedDictionary.h b/src/Dictionaries/HashedDictionary.h index 3a5e4ff6306..b3b8cc56868 100644 --- a/src/Dictionaries/HashedDictionary.h +++ b/src/Dictionaries/HashedDictionary.h @@ -67,6 +67,7 @@ struct HashedDictionaryConfiguration const bool require_nonempty; const DictionaryLifetime lifetime; bool use_async_executor = false; + const std::chrono::seconds load_timeout{0}; }; template diff --git a/src/Dictionaries/HashedDictionaryParallelLoader.h b/src/Dictionaries/HashedDictionaryParallelLoader.h index a256f6de0e0..d88ee88f9a9 100644 --- a/src/Dictionaries/HashedDictionaryParallelLoader.h +++ b/src/Dictionaries/HashedDictionaryParallelLoader.h @@ -31,6 +31,7 @@ template clas namespace ErrorCodes { extern const int LOGICAL_ERROR; + extern const int TIMEOUT_EXCEEDED; } } @@ -50,9 +51,10 @@ public: , shards(dictionary.configuration.shards) , pool(CurrentMetrics::HashedDictionaryThreads, CurrentMetrics::HashedDictionaryThreadsActive, CurrentMetrics::HashedDictionaryThreadsScheduled, shards) , shards_queues(shards) + , loading_timeout(dictionary.configuration.load_timeout) { UInt64 backlog = dictionary.configuration.shard_load_queue_backlog; - LOG_TRACE(dictionary.log, "Will load the {} dictionary using {} threads (with {} backlog)", dictionary_name, shards, backlog); + LOG_TRACE(dictionary.log, "Will load the {} dictionary using {} threads (with {} backlog and timeout {} sec)", dictionary_name, shards, backlog, loading_timeout.count()); shards_slots.resize(shards); iota(shards_slots.data(), shards_slots.size(), UInt64(0)); @@ -62,7 +64,11 @@ public: shards_queues[shard].emplace(backlog); pool.scheduleOrThrowOnError([this, shard, thread_group = CurrentThread::getGroup()] { + WorkerStatistic statistic; SCOPE_EXIT_SAFE( + LOG_TRACE(dictionary.log, "Finished worker for dictionary {} shard {}, processed {} blocks, {} rows, total time {}ms", + dictionary_name, shard, statistic.total_blocks, statistic.total_rows, statistic.total_elapsed_ms); + if (thread_group) CurrentThread::detachFromGroupIfNotDetached(); ); @@ -74,7 +80,9 @@ public: CurrentThread::attachToGroupIfDetached(thread_group); setThreadName("HashedDictLoad"); - threadWorker(shard); + LOG_TRACE(dictionary.log, "Starting worker for dictionary {}, shard {}", dictionary_name, shard); + + threadWorker(shard, statistic); }); } } @@ -87,8 +95,28 @@ public: for (size_t shard = 0; shard < shards; ++shard) { - if (!shards_queues[shard]->push(std::move(shards_blocks[shard]))) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to shards queue #{}", shard); + const auto & current_block = shards_blocks[shard]; + while (!shards_queues[shard]->tryPush(current_block, /* milliseconds= */ 100)) + { + if (shards_queues[shard]->isFinished()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to finished shards queue #{}, dictionary {}", shard, dictionary_name); + + /// We need to check if some workers failed + if (pool.active() != shards) + { + LOG_DEBUG(dictionary.log, "Some workers for dictionary {} failed, stopping all workers", dictionary_name); + stop_all_workers = true; + pool.wait(); /// We expect exception to be thrown from the failed worker thread + throw Exception(ErrorCodes::LOGICAL_ERROR, "Worker threads for dictionary {} are not active", dictionary_name); + } + + if (loading_timeout.count() && std::chrono::milliseconds(total_loading_time.elapsedMilliseconds()) > loading_timeout) + { + stop_all_workers = true; + pool.wait(); + throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Timeout {} sec for dictionary {} loading is expired", loading_timeout.count(), dictionary_name); + } + } } } @@ -124,27 +152,49 @@ private: String dictionary_name; const size_t shards; ThreadPool pool; + std::atomic_bool stop_all_workers{false}; std::vector>> shards_queues; + std::chrono::seconds loading_timeout; + Stopwatch total_loading_time; + std::vector shards_slots; DictionaryKeysArenaHolder arena_holder; - void threadWorker(size_t shard) + struct WorkerStatistic + { + UInt64 total_elapsed_ms = 0; + UInt64 total_blocks = 0; + UInt64 total_rows = 0; + }; + + void threadWorker(size_t shard, WorkerStatistic & statistic) { Block block; DictionaryKeysArenaHolder arena_holder_; auto & shard_queue = *shards_queues[shard]; - while (shard_queue.pop(block)) + while (true) { + if (!shard_queue.tryPop(block, /* milliseconds= */ 100)) + { + /// Check if we need to stop + if (stop_all_workers || shard_queue.isFinished()) + break; + /// Timeout expired, but the queue is not finished yet, try again + continue; + } + Stopwatch watch; dictionary.blockToAttributes(block, arena_holder_, shard); UInt64 elapsed_ms = watch.elapsedMilliseconds(); - if (elapsed_ms > 1'000) - LOG_TRACE(dictionary.log, "Block processing for shard #{} is slow {}ms (rows {}).", shard, elapsed_ms, block.rows()); - } - if (!shard_queue.isFinished()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not pull non finished shards queue #{}", shard); + statistic.total_elapsed_ms += elapsed_ms; + statistic.total_blocks += 1; + statistic.total_rows += block.rows(); + + if (elapsed_ms > 1'000) + LOG_TRACE(dictionary.log, "Block processing for shard #{} is slow {}ms (rows {})", shard, elapsed_ms, block.rows()); + } } /// Split block to shards smaller block, using 'selector'. diff --git a/src/Dictionaries/registerHashedDictionary.cpp b/src/Dictionaries/registerHashedDictionary.cpp index 6b980e2d534..5fc4f5d5cb6 100644 --- a/src/Dictionaries/registerHashedDictionary.cpp +++ b/src/Dictionaries/registerHashedDictionary.cpp @@ -77,6 +77,7 @@ void registerDictionaryHashed(DictionaryFactory & factory) require_nonempty, dict_lifetime, use_async_executor, + std::chrono::seconds(settings.max_execution_time.totalSeconds()), }; if (source_ptr->hasUpdateField() && shards > 1)