Respect max_execution_time for dictionary reloading time

This commit is contained in:
vdimir 2024-03-06 12:54:39 +00:00
parent 70750cb108
commit 7e908711d0
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862
5 changed files with 19 additions and 1 deletions

View File

@ -1189,6 +1189,9 @@ void registerDictionaryArrayHashed(DictionaryFactory & factory)
const auto * clickhouse_source = dynamic_cast<const ClickHouseDictionarySource *>(source_ptr.get());
configuration.use_async_executor = clickhouse_source && clickhouse_source->isLocal() && settings.dictionary_use_async_executor;
if (settings.max_execution_time.totalSeconds() > 0)
configuration.load_timeout = std::chrono::seconds(settings.max_execution_time.totalSeconds());
if (dictionary_key_type == DictionaryKeyType::Simple)
{
if (shards > 1)

View File

@ -29,6 +29,7 @@ struct HashedArrayDictionaryStorageConfiguration
size_t shards = 1;
size_t shard_load_queue_backlog = 10000;
bool use_async_executor = false;
std::chrono::seconds load_timeout{0};
};
template <DictionaryKeyType dictionary_key_type, bool sharded>

View File

@ -67,6 +67,7 @@ struct HashedDictionaryConfiguration
const bool require_nonempty;
const DictionaryLifetime lifetime;
bool use_async_executor = false;
const std::chrono::seconds load_timeout{0};
};
template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded>

View File

@ -31,6 +31,7 @@ template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded> clas
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int TIMEOUT_EXCEEDED;
}
}
@ -50,9 +51,10 @@ public:
, shards(dictionary.configuration.shards)
, pool(CurrentMetrics::HashedDictionaryThreads, CurrentMetrics::HashedDictionaryThreadsActive, CurrentMetrics::HashedDictionaryThreadsScheduled, shards)
, shards_queues(shards)
, loading_timeout(dictionary.configuration.load_timeout)
{
UInt64 backlog = dictionary.configuration.shard_load_queue_backlog;
LOG_TRACE(dictionary.log, "Will load the {} dictionary using {} threads (with {} backlog)", dictionary_name, shards, backlog);
LOG_TRACE(dictionary.log, "Will load the {} dictionary using {} threads (with {} backlog and timeout {} sec)", dictionary_name, shards, backlog, loading_timeout.count());
shards_slots.resize(shards);
iota(shards_slots.data(), shards_slots.size(), UInt64(0));
@ -107,6 +109,13 @@ public:
pool.wait(); /// We expect exception to be thrown from the failed worker thread
throw Exception(ErrorCodes::LOGICAL_ERROR, "Worker threads for dictionary {} are not active", dictionary_name);
}
if (loading_timeout.count() && std::chrono::milliseconds(total_loading_time.elapsedMilliseconds()) > loading_timeout)
{
stop_all_workers = true;
pool.wait();
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Timeout {} sec for dictionary {} loading is expired", loading_timeout.count(), dictionary_name);
}
}
}
}
@ -145,6 +154,9 @@ private:
ThreadPool pool;
std::atomic_bool stop_all_workers{false};
std::vector<std::optional<ConcurrentBoundedQueue<Block>>> shards_queues;
std::chrono::seconds loading_timeout;
Stopwatch total_loading_time;
std::vector<UInt64> shards_slots;
DictionaryKeysArenaHolder<dictionary_key_type> arena_holder;

View File

@ -77,6 +77,7 @@ void registerDictionaryHashed(DictionaryFactory & factory)
require_nonempty,
dict_lifetime,
use_async_executor,
std::chrono::seconds(settings.max_execution_time.totalSeconds()),
};
if (source_ptr->hasUpdateField() && shards > 1)