Adaptive async timeouts: consider queue flush history

In addition to the time since the most recent insert,
consider the elapsed time between the two recent queue
flushes when decreasing the timeout or processing an
entry synchronously.
This commit is contained in:
Julia Kartseva 2024-01-20 04:11:10 +00:00
parent 17d2455448
commit 689c368b76
5 changed files with 72 additions and 27 deletions

View File

@ -33,7 +33,6 @@
#include <Common/SipHash.h>
#include <Common/logger_useful.h>
namespace CurrentMetrics
{
extern const Metric PendingAsyncInsert;
@ -174,12 +173,31 @@ void AsynchronousInsertQueue::InsertData::Entry::finish(std::exception_ptr excep
}
}
AsynchronousInsertQueue::QueueShardFlushTimeHistory::TimePoints
AsynchronousInsertQueue::QueueShardFlushTimeHistory::getRecentTimePoints() const
{
std::shared_lock lock(mutex);
return time_points;
}
void AsynchronousInsertQueue::QueueShardFlushTimeHistory::updateWithCurrentTime()
{
std::unique_lock lock(mutex);
time_points.first = time_points.second;
time_points.second = std::chrono::steady_clock::now();
}
AsynchronousInsertQueue::AsynchronousInsertQueue(ContextPtr context_, size_t pool_size_, bool flush_on_shutdown_)
: WithContext(context_)
, pool_size(pool_size_)
, flush_on_shutdown(flush_on_shutdown_)
, queue_shards(pool_size)
, pool(CurrentMetrics::AsynchronousInsertThreads, CurrentMetrics::AsynchronousInsertThreadsActive, CurrentMetrics::AsynchronousInsertThreadsScheduled, pool_size)
, flush_time_history_per_queue_shard(pool_size)
, pool(
CurrentMetrics::AsynchronousInsertThreads,
CurrentMetrics::AsynchronousInsertThreadsActive,
CurrentMetrics::AsynchronousInsertThreadsScheduled,
pool_size)
{
if (!pool_size)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "pool_size cannot be zero");
@ -210,7 +228,7 @@ AsynchronousInsertQueue::~AsynchronousInsertQueue()
if (flush_on_shutdown)
{
for (auto & [_, elem] : shard.queue)
scheduleDataProcessingJob(elem.key, std::move(elem.data), getContext());
scheduleDataProcessingJob(elem.key, std::move(elem.data), getContext(), i);
}
else
{
@ -226,12 +244,14 @@ AsynchronousInsertQueue::~AsynchronousInsertQueue()
LOG_TRACE(log, "Asynchronous insertion queue finished");
}
void AsynchronousInsertQueue::scheduleDataProcessingJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context)
void AsynchronousInsertQueue::scheduleDataProcessingJob(
const InsertQuery & key, InsertDataPtr data, ContextPtr global_context, size_t shard_num)
{
/// Wrap 'unique_ptr' with 'shared_ptr' to make this
/// lambda copyable and allow to save it to the thread pool.
pool.scheduleOrThrowOnError([key, global_context, my_data = std::make_shared<InsertDataPtr>(std::move(data))]() mutable
{ processData(key, std::move(*my_data), std::move(global_context)); });
pool.scheduleOrThrowOnError(
[this, key, global_context, shard_num, my_data = std::make_shared<InsertDataPtr>(std::move(data))]() mutable
{ processData(key, std::move(*my_data), std::move(global_context), flush_time_history_per_queue_shard[shard_num]); });
}
void AsynchronousInsertQueue::preprocessInsertQuery(const ASTPtr & query, const ContextPtr & query_context)
@ -327,12 +347,13 @@ AsynchronousInsertQueue::pushDataChunk(ASTPtr query, DataChunk chunk, ContextPtr
auto shard_num = key.hash % pool_size;
auto & shard = queue_shards[shard_num];
const auto flush_time_points = flush_time_history_per_queue_shard[shard_num].getRecentTimePoints();
{
std::lock_guard lock(shard.mutex);
auto [it, inserted] = shard.iterators.try_emplace(key.hash);
auto now = std::chrono::steady_clock::now();
auto timeout_ms = getBusyWaitTimeoutMs(settings, shard, shard_num, now);
auto timeout_ms = getBusyWaitTimeoutMs(settings, shard, shard_num, flush_time_points, now);
if (inserted)
it->second = shard.queue.emplace(now + timeout_ms, Container{key, std::make_unique<InsertData>(timeout_ms)}).first;
@ -352,13 +373,13 @@ AsynchronousInsertQueue::pushDataChunk(ASTPtr query, DataChunk chunk, ContextPtr
bool has_enough_bytes = data->size_in_bytes >= key.settings.async_insert_max_data_size;
bool has_enough_queries = data->entries.size() >= key.settings.async_insert_max_query_number && key.settings.async_insert_deduplicate;
auto max_busy_timeout_exceeded = [&shard, &settings, &now]() -> bool
auto max_busy_timeout_exceeded = [&shard, &settings, &now, &flush_time_points]() -> bool
{
if (!settings.async_insert_use_adaptive_busy_timeout || !shard.last_insert_time)
if (!settings.async_insert_use_adaptive_busy_timeout || !shard.last_insert_time || !flush_time_points.first)
return false;
auto max_ms = Milliseconds(settings.async_insert_busy_timeout_max_ms);
return *shard.last_insert_time + max_ms < now;
return *shard.last_insert_time + max_ms < now && *flush_time_points.first + max_ms < *flush_time_points.second;
};
/// Here we check whether we have hit the limit on the maximum data size in the buffer or
@ -395,7 +416,7 @@ AsynchronousInsertQueue::pushDataChunk(ASTPtr query, DataChunk chunk, ContextPtr
}
if (data_to_process)
scheduleDataProcessingJob(key, std::move(data_to_process), getContext());
scheduleDataProcessingJob(key, std::move(data_to_process), getContext(), shard_num);
else
shard.are_tasks_available.notify_one();
@ -409,8 +430,9 @@ AsynchronousInsertQueue::pushDataChunk(ASTPtr query, DataChunk chunk, ContextPtr
AsynchronousInsertQueue::Milliseconds AsynchronousInsertQueue::getBusyWaitTimeoutMs(
const Settings & settings,
const AsynchronousInsertQueue::QueueShard & shard,
const QueueShard & shard,
size_t shard_num,
const QueueShardFlushTimeHistory::TimePoints & flush_time_points,
std::chrono::steady_clock::time_point now) const
{
if (!settings.async_insert_use_adaptive_busy_timeout)
@ -421,10 +443,11 @@ AsynchronousInsertQueue::Milliseconds AsynchronousInsertQueue::getBusyWaitTimeou
auto normalize = [&min_ms, &max_ms](const auto & t_ms) { return std::min(std::max(t_ms, min_ms), max_ms); };
if (!shard.last_insert_time)
if (!shard.last_insert_time || !flush_time_points.first)
return normalize(shard.busy_timeout_ms);
const auto & last_insert_time = *shard.last_insert_time;
const auto & [t1, t2] = std::tie(*flush_time_points.first, *flush_time_points.second);
const double increase_rate = settings.async_insert_busy_timeout_increase_rate;
const double decrease_rate = settings.async_insert_busy_timeout_decrease_rate;
@ -448,9 +471,10 @@ AsynchronousInsertQueue::Milliseconds AsynchronousInsertQueue::getBusyWaitTimeou
return normalize(timeout_ms);
}
/// Decrease the timeout if inserts are not frequent,
/// that is, if the time since the last insert long enough (exceeding the adjusted timeout).
/// that is, if the time since the last insert and the difference between the last two queue flushes were both
/// long enough (exceeding the adjusted timeout).
/// This ensures the timeout value converges to the minimum over time for non-frequent inserts.
else if (last_insert_time + decreased_timeout_ms < now)
else if (last_insert_time + decreased_timeout_ms < now && t1 + decreased_timeout_ms < t2)
{
auto timeout_ms = decreased_timeout_ms;
if (timeout_ms != shard.busy_timeout_ms)
@ -525,7 +549,7 @@ void AsynchronousInsertQueue::flushAll()
{
total_bytes += entry.data->size_in_bytes;
total_entries += entry.data->entries.size();
scheduleDataProcessingJob(entry.key, std::move(entry.data), getContext());
scheduleDataProcessingJob(entry.key, std::move(entry.data), getContext(), i);
}
}
@ -599,7 +623,7 @@ void AsynchronousInsertQueue::processBatchDeadlines(size_t shard_num)
}
for (auto & entry : entries_to_flush)
scheduleDataProcessingJob(entry.key, std::move(entry.data), getContext());
scheduleDataProcessingJob(entry.key, std::move(entry.data), getContext(), shard_num);
}
}
@ -643,7 +667,8 @@ String serializeQuery(const IAST & query, size_t max_length)
}
// static
void AsynchronousInsertQueue::processData(InsertQuery key, InsertDataPtr data, ContextPtr global_context)
void AsynchronousInsertQueue::processData(
InsertQuery key, InsertDataPtr data, ContextPtr global_context, QueueShardFlushTimeHistory & queue_shard_flush_time_history)
try
{
if (!data)
@ -831,6 +856,8 @@ try
LOG_INFO(log, "Flushed {} rows, {} bytes for query '{}'", num_rows, num_bytes, key.query_str);
queue_shard_flush_time_history.updateWithCurrentTime();
bool pulling_pipeline = false;
logQueryFinish(query_log_elem, insert_context, key.query, pipeline, pulling_pipeline, query_span, QueryCache::Usage::None, internal);
}

View File

@ -10,6 +10,7 @@
#include <Processors/Chunk.h>
#include <future>
#include <shared_mutex>
#include <variant>
namespace DB
@ -186,6 +187,8 @@ private:
using QueueIterator = Queue::iterator;
using QueueIteratorByKey = std::unordered_map<UInt128, QueueIterator>;
using OptionalTimePoint = std::optional<std::chrono::steady_clock::time_point>;
struct QueueShard
{
mutable std::mutex mutex;
@ -194,16 +197,29 @@ private:
Queue queue;
QueueIteratorByKey iterators;
using OptionalTimePoint = std::optional<std::chrono::steady_clock::time_point>;
OptionalTimePoint last_insert_time;
std::chrono::milliseconds busy_timeout_ms;
};
/// Times of the two most recent queue flushes.
/// Used to calculate adaptive timeout.
struct QueueShardFlushTimeHistory
{
public:
using TimePoints = std::pair<OptionalTimePoint, OptionalTimePoint>;
TimePoints getRecentTimePoints() const;
void updateWithCurrentTime();
private:
mutable std::shared_mutex mutex;
TimePoints time_points;
};
const size_t pool_size;
const bool flush_on_shutdown;
std::vector<QueueShard> queue_shards;
std::vector<QueueShardFlushTimeHistory> flush_time_history_per_queue_shard;
/// Logic and events behind queue are as follows:
/// - async_insert_busy_timeout_ms:
@ -231,16 +247,18 @@ private:
Milliseconds getBusyWaitTimeoutMs(
const Settings & settings,
const AsynchronousInsertQueue::QueueShard & shard,
const QueueShard & shard,
size_t shard_num,
const QueueShardFlushTimeHistory::TimePoints & flush_time_points,
std::chrono::steady_clock::time_point now) const;
void preprocessInsertQuery(const ASTPtr & query, const ContextPtr & query_context);
void processBatchDeadlines(size_t shard_num);
void scheduleDataProcessingJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context);
void scheduleDataProcessingJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context, size_t shard_num);
static void processData(InsertQuery key, InsertDataPtr data, ContextPtr global_context);
static void processData(
InsertQuery key, InsertDataPtr data, ContextPtr global_context, QueueShardFlushTimeHistory & queue_shard_flush_time_history);
template <typename LogFunc>
static Chunk processEntriesWithParsing(

View File

@ -1,7 +1,7 @@
<clickhouse>
<profiles>
<default>
<allow_experimental_async_insert_adaptive_busy_timeout>1</allow_experimental_async_insert_adaptive_busy_timeout>
<async_insert_use_adaptive_busy_timeout>1</async_insert_use_adaptive_busy_timeout>
</default>
</profiles>

View File

@ -199,7 +199,7 @@ def test_compare_sequential_inserts_durations_for_adaptive_and_fixed_async_timeo
)
fixed_tm_settings = copy.copy(_query_settings)
fixed_tm_settings["allow_experimental_async_insert_adaptive_busy_timeout"] = 0
fixed_tm_settings["async_insert_use_adaptive_busy_timeout"] = 0
fixed_tm_settings["async_insert_busy_timeout_ms"] = 200
fixed_tm_run_duration = timeit.timeit(
@ -267,7 +267,7 @@ def test_compare_parallel_inserts_durations_for_adaptive_and_fixed_async_timeout
)
fixed_tm_settings = copy.copy(_query_settings)
fixed_tm_settings["allow_experimental_async_insert_adaptive_busy_timeout"] = 0
fixed_tm_settings["async_insert_use_adaptive_busy_timeout"] = 0
fixed_tm_settings["async_insert_busy_timeout_ms"] = 200
fixed_tm_run_duration = timeit.timeit(

View File

@ -1,7 +1,7 @@
DROP TABLE IF EXISTS async_insert_mt_test;
CREATE TABLE async_insert_mt_test (a UInt64, b Array(UInt64)) ENGINE=MergeTree() ORDER BY a;
SET allow_experimental_async_insert_adaptive_busy_timeout = 1;
SET async_insert_use_adaptive_busy_timeout = 1;
INSERT INTO async_insert_mt_test
SETTINGS