diff --git a/src/Interpreters/AsynchronousInsertionQueue.cpp b/src/Interpreters/AsynchronousInsertionQueue.cpp index 3962193e6a7..5b983bc2bca 100644 --- a/src/Interpreters/AsynchronousInsertionQueue.cpp +++ b/src/Interpreters/AsynchronousInsertionQueue.cpp @@ -124,6 +124,16 @@ AsynchronousInsertQueue::~AsynchronousInsertQueue() } } +void AsynchronousInsertQueue::scheduleProcessJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context) +{ + /// Wrap 'unique_ptr' with 'shared_ptr' to make this + /// lambda copyable and allow to save it to the thread pool. + pool.scheduleOrThrowOnError([=, data = std::make_shared(std::move(data))] + { + processData(std::move(key), std::move(*data), std::move(global_context)); + }); +} + void AsynchronousInsertQueue::push(const ASTPtr & query, const Settings & settings, const String & query_id) { auto read_buf = getReadBufferFromASTInsertQuery(query); @@ -171,14 +181,7 @@ void AsynchronousInsertQueue::push(const ASTPtr & query, const Settings & settin LOG_INFO(log, "Queue size {} for query '{}'", data->size, queryToString(*query)); if (data->size > max_data_size) - { - pool.scheduleOrThrowOnError([key, - data = std::make_shared(std::move(data)), - global_context = getContext()] - { - processData(std::move(key), std::move(*data), global_context); - }); - } + scheduleProcessJob(key, std::move(data), getContext()); } void AsynchronousInsertQueue::waitForProcessingQuery(const String & query_id, const Milliseconds & timeout) @@ -223,14 +226,7 @@ void AsynchronousInsertQueue::busyCheck() auto lag = std::chrono::steady_clock::now() - elem->data->first_update; if (lag >= busy_timeout) - { - pool.scheduleOrThrowOnError([key = key, - data = std::make_shared(std::move(elem->data)), - global_context = getContext()] - { - processData(std::move(key), std::move(*data), global_context); - }); - } + scheduleProcessJob(key, std::move(elem->data), getContext()); else timeout = std::min(timeout, std::chrono::ceil(busy_timeout - lag)); } @@ -252,14 +248,7 @@ void AsynchronousInsertQueue::staleCheck() auto lag = std::chrono::steady_clock::now() - elem->data->last_update; if (lag >= stale_timeout) - { - pool.scheduleOrThrowOnError([key = key, - data = std::make_shared(std::move(elem->data)), - global_context = getContext()] - { - processData(std::move(key), std::move(*data), global_context); - }); - } + scheduleProcessJob(key, std::move(elem->data), getContext()); } } } diff --git a/src/Interpreters/AsynchronousInsertionQueue.h b/src/Interpreters/AsynchronousInsertionQueue.h index 0ca44f63a7a..a64545ed2dc 100644 --- a/src/Interpreters/AsynchronousInsertionQueue.h +++ b/src/Interpreters/AsynchronousInsertionQueue.h @@ -124,6 +124,7 @@ private: void staleCheck(); void cleanup(); + void scheduleProcessJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context); static void processData(InsertQuery key, InsertDataPtr data, ContextPtr global_context); };