Introduce polling timeout for asynchronous queue

The current timeout for checking updates in the asynchronous queue is
equal to the timeout used for queue entry
(async_insert_busy_timeout_ms).
That means that, in the worst case, an entry spends twice the time of the
asynchronous timeout in the queue.
This commit is contained in:
Julia Kartseva 2024-01-21 23:32:40 +00:00
parent bd5529dda1
commit bbaa08199a
3 changed files with 5 additions and 1 deletions

View File

@ -751,6 +751,7 @@ class IColumn;
M(UInt64, async_insert_max_data_size, 1000000, "Maximum size in bytes of unparsed data collected per query before being inserted", 0) \ M(UInt64, async_insert_max_data_size, 1000000, "Maximum size in bytes of unparsed data collected per query before being inserted", 0) \
M(UInt64, async_insert_max_query_number, 450, "Maximum number of insert queries before being inserted", 0) \ M(UInt64, async_insert_max_query_number, 450, "Maximum number of insert queries before being inserted", 0) \
M(Milliseconds, async_insert_busy_timeout_ms, 200, "Maximum time to wait before dumping collected data per query since the first data appeared", 0) \ M(Milliseconds, async_insert_busy_timeout_ms, 200, "Maximum time to wait before dumping collected data per query since the first data appeared", 0) \
M(Milliseconds, async_insert_poll_timeout_ms, 10, "Timeout for polling data from asynchronous insert queue", 0) \
\ \
M(UInt64, remote_fs_read_max_backoff_ms, 10000, "Max wait time when trying to read data for remote disk", 0) \ M(UInt64, remote_fs_read_max_backoff_ms, 10000, "Max wait time when trying to read data for remote disk", 0) \
M(UInt64, remote_fs_read_backoff_max_tries, 5, "Max attempts to read with backoff", 0) \ M(UInt64, remote_fs_read_backoff_max_tries, 5, "Max attempts to read with backoff", 0) \

View File

@ -430,7 +430,7 @@ void AsynchronousInsertQueue::processBatchDeadlines(size_t shard_num)
std::unique_lock lock(shard.mutex); std::unique_lock lock(shard.mutex);
shard.are_tasks_available.wait_for(lock, shard.are_tasks_available.wait_for(lock,
Milliseconds(getContext()->getSettingsRef().async_insert_busy_timeout_ms), [&shard, this] Milliseconds(getContext()->getSettingsRef().async_insert_poll_timeout_ms), [&shard, this]
{ {
if (shutdown) if (shutdown)
return true; return true;

View File

@ -4867,6 +4867,9 @@ void Context::setAsynchronousInsertQueue(const std::shared_ptr<AsynchronousInser
if (std::chrono::milliseconds(settings.async_insert_busy_timeout_ms) == 0ms) if (std::chrono::milliseconds(settings.async_insert_busy_timeout_ms) == 0ms)
throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "Setting async_insert_busy_timeout_ms can't be zero"); throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "Setting async_insert_busy_timeout_ms can't be zero");
if (std::chrono::milliseconds(settings.async_insert_poll_timeout_ms) == 0ms)
throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "Setting async_insert_poll_timeout_ms can't be zero");
shared->async_insert_queue = ptr; shared->async_insert_queue = ptr;
} }