diff --git a/src/Interpreters/AsynchronousInsertionQueue.cpp b/src/Interpreters/AsynchronousInsertionQueue.cpp index 5b983bc2bca..9ee109d713b 100644 --- a/src/Interpreters/AsynchronousInsertionQueue.cpp +++ b/src/Interpreters/AsynchronousInsertionQueue.cpp @@ -27,24 +27,6 @@ namespace ErrorCodes extern const int TIMEOUT_EXCEEDED; } -AsynchronousInsertQueue::InsertQuery::InsertQuery(const ASTPtr & query_, const Settings & settings_) - : query(query_->clone()), settings(settings_) -{ -} - -AsynchronousInsertQueue::InsertQuery::InsertQuery(const InsertQuery & other) - : query(other.query->clone()), settings(other.settings) -{ -} - -AsynchronousInsertQueue::InsertQuery & -AsynchronousInsertQueue::InsertQuery::operator==(const InsertQuery & other) -{ - query = other.query->clone(); - settings = other.settings; - return *this; -} - UInt64 AsynchronousInsertQueue::InsertQuery::Hash::operator()(const InsertQuery & insert_query) const { SipHash hash; @@ -124,7 +106,7 @@ AsynchronousInsertQueue::~AsynchronousInsertQueue() } } -void AsynchronousInsertQueue::scheduleProcessJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context) +void AsynchronousInsertQueue::scheduleProcessDataJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context) { /// Wrap 'unique_ptr' with 'shared_ptr' to make this /// lambda copyable and allow to save it to the thread pool. @@ -178,10 +160,11 @@ void AsynchronousInsertQueue::push(const ASTPtr & query, const Settings & settin currently_processing_queries.emplace(query_id, entry); } - LOG_INFO(log, "Queue size {} for query '{}'", data->size, queryToString(*query)); + LOG_INFO(log, "Have {} pending inserts with total {} bytes of data for query '{}'", + data->entries.size(), data->size, queryToString(*query)); if (data->size > max_data_size) - scheduleProcessJob(key, std::move(data), getContext()); + scheduleProcessDataJob(key, std::move(data), getContext()); } void AsynchronousInsertQueue::waitForProcessingQuery(const String & query_id, const Milliseconds & timeout) @@ -226,7 +209,7 @@ void AsynchronousInsertQueue::busyCheck() auto lag = std::chrono::steady_clock::now() - elem->data->first_update; if (lag >= busy_timeout) - scheduleProcessJob(key, std::move(elem->data), getContext()); + scheduleProcessDataJob(key, std::move(elem->data), getContext()); else timeout = std::min(timeout, std::chrono::ceil(busy_timeout - lag)); } @@ -248,7 +231,7 @@ void AsynchronousInsertQueue::staleCheck() auto lag = std::chrono::steady_clock::now() - elem->data->last_update; if (lag >= stale_timeout) - scheduleProcessJob(key, std::move(elem->data), getContext()); + scheduleProcessDataJob(key, std::move(elem->data), getContext()); } } } @@ -354,7 +337,7 @@ try auto out_executor = out_pipeline.execute(); out_executor->execute(out_pipeline.getNumThreads()); - LOG_DEBUG(log, "Flushed {} rows, {} bytes for query '{}'", + LOG_INFO(log, "Flushed {} rows, {} bytes for query '{}'", total_rows, total_bytes, queryToString(key.query)); for (const auto & entry : data->entries) diff --git a/src/Interpreters/AsynchronousInsertionQueue.h b/src/Interpreters/AsynchronousInsertionQueue.h index a64545ed2dc..db382bbbbf2 100644 --- a/src/Interpreters/AsynchronousInsertionQueue.h +++ b/src/Interpreters/AsynchronousInsertionQueue.h @@ -41,9 +41,6 @@ private: ASTPtr query; Settings settings; - InsertQuery(const ASTPtr & query_, const Settings & settings_); - InsertQuery(const InsertQuery & other); - InsertQuery & operator==(const InsertQuery & other); bool operator==(const InsertQuery & other) const; struct Hash { UInt64 operator()(const InsertQuery & insert_query) const; }; }; @@ -124,7 +121,7 @@ private: void staleCheck(); void cleanup(); - void scheduleProcessJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context); + void scheduleProcessDataJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context); static void processData(InsertQuery key, InsertDataPtr data, ContextPtr global_context); }; diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index b353dddf58b..064472b79aa 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -563,7 +563,7 @@ static std::tuple executeQueryImpl( if (settings.wait_for_async_insert) { auto timeout = settings.wait_for_async_insert_timeout.totalMilliseconds(); - auto source = std::make_shared(Block(), query_id, timeout, context->getGlobalContext()); + auto source = std::make_shared(query_id, timeout, context->getGlobalContext()); io.pipeline.init(Pipe(source)); } diff --git a/src/Processors/Sources/WaitForAsyncInsertSource.h b/src/Processors/Sources/WaitForAsyncInsertSource.h index 5b33ec45f96..3636a08f4a5 100644 --- a/src/Processors/Sources/WaitForAsyncInsertSource.h +++ b/src/Processors/Sources/WaitForAsyncInsertSource.h @@ -10,9 +10,8 @@ class WaitForAsyncInsertSource : public ISource, WithContext { public: WaitForAsyncInsertSource( - const Block & header, const String & query_id_, - size_t timeout_ms_, ContextPtr context_) - : ISource(std::move(header)) + const String & query_id_, size_t timeout_ms_, ContextPtr context_) + : ISource(Block()) , WithContext(context_) , query_id(query_id_) , timeout_ms(timeout_ms_)