From 7e908711d0ff0895a5fbe13c28077f0a3042d49c Mon Sep 17 00:00:00 2001 From: vdimir Date: Wed, 6 Mar 2024 12:54:39 +0000 Subject: [PATCH] Respect max_execution_time for dictionary reloading time --- src/Dictionaries/HashedArrayDictionary.cpp | 3 +++ src/Dictionaries/HashedArrayDictionary.h | 1 + src/Dictionaries/HashedDictionary.h | 1 + src/Dictionaries/HashedDictionaryParallelLoader.h | 14 +++++++++++++- src/Dictionaries/registerHashedDictionary.cpp | 1 + 5 files changed, 19 insertions(+), 1 deletion(-) diff --git a/src/Dictionaries/HashedArrayDictionary.cpp b/src/Dictionaries/HashedArrayDictionary.cpp index eae00c297b1..d09f402143e 100644 --- a/src/Dictionaries/HashedArrayDictionary.cpp +++ b/src/Dictionaries/HashedArrayDictionary.cpp @@ -1189,6 +1189,9 @@ void registerDictionaryArrayHashed(DictionaryFactory & factory) const auto * clickhouse_source = dynamic_cast(source_ptr.get()); configuration.use_async_executor = clickhouse_source && clickhouse_source->isLocal() && settings.dictionary_use_async_executor; + if (settings.max_execution_time.totalSeconds() > 0) + configuration.load_timeout = std::chrono::seconds(settings.max_execution_time.totalSeconds()); + if (dictionary_key_type == DictionaryKeyType::Simple) { if (shards > 1) diff --git a/src/Dictionaries/HashedArrayDictionary.h b/src/Dictionaries/HashedArrayDictionary.h index 4b2570ad928..9877d92d457 100644 --- a/src/Dictionaries/HashedArrayDictionary.h +++ b/src/Dictionaries/HashedArrayDictionary.h @@ -29,6 +29,7 @@ struct HashedArrayDictionaryStorageConfiguration size_t shards = 1; size_t shard_load_queue_backlog = 10000; bool use_async_executor = false; + std::chrono::seconds load_timeout{0}; }; template diff --git a/src/Dictionaries/HashedDictionary.h b/src/Dictionaries/HashedDictionary.h index 3a5e4ff6306..b3b8cc56868 100644 --- a/src/Dictionaries/HashedDictionary.h +++ b/src/Dictionaries/HashedDictionary.h @@ -67,6 +67,7 @@ struct HashedDictionaryConfiguration const bool require_nonempty; const DictionaryLifetime lifetime; bool use_async_executor = false; + const std::chrono::seconds load_timeout{0}; }; template diff --git a/src/Dictionaries/HashedDictionaryParallelLoader.h b/src/Dictionaries/HashedDictionaryParallelLoader.h index c0b4aa73adb..d88ee88f9a9 100644 --- a/src/Dictionaries/HashedDictionaryParallelLoader.h +++ b/src/Dictionaries/HashedDictionaryParallelLoader.h @@ -31,6 +31,7 @@ template clas namespace ErrorCodes { extern const int LOGICAL_ERROR; + extern const int TIMEOUT_EXCEEDED; } } @@ -50,9 +51,10 @@ public: , shards(dictionary.configuration.shards) , pool(CurrentMetrics::HashedDictionaryThreads, CurrentMetrics::HashedDictionaryThreadsActive, CurrentMetrics::HashedDictionaryThreadsScheduled, shards) , shards_queues(shards) + , loading_timeout(dictionary.configuration.load_timeout) { UInt64 backlog = dictionary.configuration.shard_load_queue_backlog; - LOG_TRACE(dictionary.log, "Will load the {} dictionary using {} threads (with {} backlog)", dictionary_name, shards, backlog); + LOG_TRACE(dictionary.log, "Will load the {} dictionary using {} threads (with {} backlog and timeout {} sec)", dictionary_name, shards, backlog, loading_timeout.count()); shards_slots.resize(shards); iota(shards_slots.data(), shards_slots.size(), UInt64(0)); @@ -107,6 +109,13 @@ public: pool.wait(); /// We expect exception to be thrown from the failed worker thread throw Exception(ErrorCodes::LOGICAL_ERROR, "Worker threads for dictionary {} are not active", dictionary_name); } + + if (loading_timeout.count() && std::chrono::milliseconds(total_loading_time.elapsedMilliseconds()) > loading_timeout) + { + stop_all_workers = true; + pool.wait(); + throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Timeout {} sec for dictionary {} loading is expired", loading_timeout.count(), dictionary_name); + } } } } @@ -145,6 +154,9 @@ private: ThreadPool pool; std::atomic_bool stop_all_workers{false}; std::vector>> shards_queues; + std::chrono::seconds loading_timeout; + Stopwatch total_loading_time; + std::vector shards_slots; DictionaryKeysArenaHolder arena_holder; diff --git a/src/Dictionaries/registerHashedDictionary.cpp b/src/Dictionaries/registerHashedDictionary.cpp index 6b980e2d534..5fc4f5d5cb6 100644 --- a/src/Dictionaries/registerHashedDictionary.cpp +++ b/src/Dictionaries/registerHashedDictionary.cpp @@ -77,6 +77,7 @@ void registerDictionaryHashed(DictionaryFactory & factory) require_nonempty, dict_lifetime, use_async_executor, + std::chrono::seconds(settings.max_execution_time.totalSeconds()), }; if (source_ptr->hasUpdateField() && shards > 1)