From 36ac20681e57a34ddf65e4329051d93154ff7cc8 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Tue, 31 Aug 2021 05:16:02 +0300 Subject: [PATCH] refactor AsynchronousInsertQueue --- .../AsynchronousInsertionQueue.cpp | 376 +++++++++++------- src/Interpreters/AsynchronousInsertionQueue.h | 142 ++++--- src/Interpreters/Context.cpp | 38 -- src/Interpreters/Context.h | 17 - src/Parsers/ASTInsertQuery.cpp | 7 + src/Parsers/ASTInsertQuery.h | 1 + .../Sources/WaitForAsyncInsertSource.h | 4 +- 7 files changed, 327 insertions(+), 258 deletions(-) diff --git a/src/Interpreters/AsynchronousInsertionQueue.cpp b/src/Interpreters/AsynchronousInsertionQueue.cpp index 56fdea8f6b4..3962193e6a7 100644 --- a/src/Interpreters/AsynchronousInsertionQueue.cpp +++ b/src/Interpreters/AsynchronousInsertionQueue.cpp @@ -15,88 +15,79 @@ #include #include #include +#include +#include namespace DB { -struct AsynchronousInsertQueue::InsertData +namespace ErrorCodes { - InsertData(ASTPtr query_, const Settings & settings_) - : query(std::move(query_)), settings(settings_) - { - } - - ASTPtr query; - Settings settings; - - struct Data - { - String bytes; - String query_id; - Context::AsyncInsertInfoPtr info; - }; - - std::mutex mutex; - std::list data; - size_t size = 0; - - /// Timestamp of the first insert into queue, or after the last queue dump. - /// Used to detect for how long the queue is active, so we can dump it by timer. - std::chrono::time_point first_update = std::chrono::steady_clock::now(); - - /// Timestamp of the last insert into queue. - /// Used to detect for how long the queue is stale, so we can dump it by another timer. - std::chrono::time_point last_update; - - /// Indicates that the BlockIO should be updated, because we can't read/write prefix and suffix more than once. - bool is_reset = false; - - void reset() - { - data.clear(); - is_reset = true; - } -}; - -std::size_t AsynchronousInsertQueue::InsertQueryHash::operator() (const InsertQuery & query) const -{ - const auto * insert_query = query.query->as(); - std::size_t hash = 0; - - hash ^= std::hash()(insert_query->table_id.getFullTableName()); - hash ^= std::hash()(insert_query->format); - // TODO: insert_query->columns - // TODO: insert_query->table_function - // TODO: insert_query->settings_ast - - // TODO: some of query.settings - - return hash; + extern const int TIMEOUT_EXCEEDED; } -bool AsynchronousInsertQueue::InsertQueryEquality::operator() (const InsertQuery & query1, const InsertQuery & query2) const +AsynchronousInsertQueue::InsertQuery::InsertQuery(const ASTPtr & query_, const Settings & settings_) + : query(query_->clone()), settings(settings_) { - const auto * insert_query1 = query1.query->as(); - const auto * insert_query2 = query2.query->as(); - - if (insert_query1->table_id != insert_query2->table_id) - return false; - if (insert_query1->format != insert_query2->format) - return false; - // TODO: same fields as in InsertQueryHash. - - return true; } -AsynchronousInsertQueue::AsynchronousInsertQueue(ContextMutablePtr context_, size_t pool_size, size_t max_data_size_, const Timeout & timeouts) - : WithMutableContext(context_) +AsynchronousInsertQueue::InsertQuery::InsertQuery(const InsertQuery & other) + : query(other.query->clone()), settings(other.settings) +{ +} + +AsynchronousInsertQueue::InsertQuery & +AsynchronousInsertQueue::InsertQuery::operator==(const InsertQuery & other) +{ + query = other.query->clone(); + settings = other.settings; + return *this; +} + +UInt64 AsynchronousInsertQueue::InsertQuery::Hash::operator()(const InsertQuery & insert_query) const +{ + SipHash hash; + insert_query.query->updateTreeHash(hash); + + for (const auto & setting : insert_query.settings.allChanged()) + { + hash.update(setting.getName()); + applyVisitor(FieldVisitorHash(hash), setting.getValue()); + } + + return hash.get64(); +} + +bool AsynchronousInsertQueue::InsertQuery::operator==(const InsertQuery & other) const +{ + return queryToString(query) == queryToString(other.query) && settings == other.settings; +} + + +void AsynchronousInsertQueue::InsertData::Entry::finish(std::exception_ptr exception_) +{ + std::lock_guard lock(mutex); + finished = true; + exception = exception_; + cv.notify_all(); +} + +bool AsynchronousInsertQueue::InsertData::Entry::wait(const Milliseconds & timeout) +{ + std::unique_lock lock(mutex); + return cv.wait_for(lock, timeout, [&] { return finished; }); +} + + +AsynchronousInsertQueue::AsynchronousInsertQueue(ContextPtr context_, size_t pool_size, size_t max_data_size_, const Timeout & timeouts) + : WithContext(context_) , max_data_size(max_data_size_) , busy_timeout(timeouts.busy) , stale_timeout(timeouts.stale) - , queue(new Queue) , pool(pool_size) , dump_by_first_update_thread(&AsynchronousInsertQueue::busyCheck, this) + , cleanup_thread(&AsynchronousInsertQueue::cleanup, this) { using namespace std::chrono; @@ -115,45 +106,101 @@ AsynchronousInsertQueue::~AsynchronousInsertQueue() assert(dump_by_first_update_thread.joinable()); dump_by_first_update_thread.join(); + assert(cleanup_thread.joinable()); + cleanup_thread.join(); + if (dump_by_last_update_thread.joinable()) dump_by_last_update_thread.join(); pool.wait(); + + std::lock_guard lock(currently_processing_mutex); + for (const auto & [_, entry] : currently_processing_queries) + { + if (!entry->finished) + entry->finish(std::make_exception_ptr(Exception( + ErrorCodes::TIMEOUT_EXCEEDED, + "Wait for async insert timeout exceeded)"))); + } } void AsynchronousInsertQueue::push(const ASTPtr & query, const Settings & settings, const String & query_id) { - std::unique_lock lock(rwlock); - InsertQuery key{query, settings}; - - auto it = queue->find(key); - if (it == queue->end()) - it = queue->emplace(key, std::make_shared(query, settings)).first; - else if (it->second->is_reset) - it->second = std::make_shared(query, settings); - auto read_buf = getReadBufferFromASTInsertQuery(query); /// It's important to read the whole data per query as a single chunk, so we can safely drop it in case of parsing failure. - auto & new_data = it->second->data.emplace_back(); - - new_data.query_id = query_id; - new_data.bytes.reserve(read_buf->totalSize()); - new_data.info = getContext()->addAsyncInsertQueryId(query_id); - - WriteBufferFromString write_buf(new_data.bytes); + auto entry = std::make_shared(); + entry->query_id = query_id; + entry->bytes.reserve(read_buf->totalSize()); + WriteBufferFromString write_buf(entry->bytes); copyData(*read_buf, write_buf); - it->second->size += read_buf->count(); - it->second->last_update = std::chrono::steady_clock::now(); - LOG_INFO(&Poco::Logger::get("AsynchronousInsertQueue"), - "Queue size {} for query '{}'", it->second->size, queryToString(*query)); + InsertQuery key{query, settings}; + Queue::iterator it; + bool found = false; - if (it->second->size > max_data_size) - /// Since we're under lock here, it's safe to pass-by-copy the shared_ptr - /// without a race with the cleanup thread, which may reset last shared_ptr instance. - pool.scheduleOrThrowOnError([data = it->second, global_context = getContext()] { processData(data, global_context); }); + { + std::shared_lock read_lock(rwlock); + it = queue.find(key); + if (it != queue.end()) + found = true; + } + + if (!found) + { + std::unique_lock write_lock(rwlock); + it = queue.emplace(key, std::make_shared()).first; + } + + auto & [data_mutex, data] = *it->second; + std::lock_guard data_lock(data_mutex); + + if (!data) + data = std::make_unique(); + + data->size += read_buf->count(); + data->last_update = std::chrono::steady_clock::now(); + data->entries.emplace_back(entry); + + { + std::lock_guard currently_processing_lock(currently_processing_mutex); + currently_processing_queries.emplace(query_id, entry); + } + + LOG_INFO(log, "Queue size {} for query '{}'", data->size, queryToString(*query)); + + if (data->size > max_data_size) + { + pool.scheduleOrThrowOnError([key, + data = std::make_shared(std::move(data)), + global_context = getContext()] + { + processData(std::move(key), std::move(*data), global_context); + }); + } +} + +void AsynchronousInsertQueue::waitForProcessingQuery(const String & query_id, const Milliseconds & timeout) +{ + InsertData::EntryPtr entry; + + { + std::lock_guard lock(currently_processing_mutex); + auto it = currently_processing_queries.find(query_id); + if (it == currently_processing_queries.end()) + return; + + entry = it->second; + } + + bool finished = entry->wait(timeout); + + if (!finished) + throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Wait for async insert timeout ({} ms) exceeded)", timeout.count()); + + if (entry->exception) + std::rethrow_exception(entry->exception); } void AsynchronousInsertQueue::busyCheck() @@ -166,36 +213,26 @@ void AsynchronousInsertQueue::busyCheck() /// TODO: use priority queue instead of raw unsorted queue. timeout = busy_timeout; - std::vector keys_to_remove; + std::shared_lock read_lock(rwlock); + for (auto & [key, elem] : queue) { - std::shared_lock read_lock(rwlock); + std::lock_guard data_lock(elem->mutex); + if (!elem->data) + continue; - for (auto & [key, data] : *queue) + auto lag = std::chrono::steady_clock::now() - elem->data->first_update; + if (lag >= busy_timeout) { - std::unique_lock data_lock(data->mutex); - - auto lag = std::chrono::steady_clock::now() - data->first_update; - - if (data->is_reset) - keys_to_remove.push_back(key); - else if (lag >= busy_timeout) - pool.scheduleOrThrowOnError([data = data, global_context = getContext()] { processData(data, global_context); }); - else - timeout = std::min(timeout, std::chrono::ceil(busy_timeout - lag)); - } - } - - if (!keys_to_remove.empty()) - { - std::unique_lock write_lock(rwlock); - - for (const auto & key : keys_to_remove) - { - auto it = queue->find(key); - if (it != queue->end() && it->second->is_reset) - queue->erase(it); + pool.scheduleOrThrowOnError([key = key, + data = std::make_shared(std::move(elem->data)), + global_context = getContext()] + { + processData(std::move(key), std::move(*data), global_context); + }); } + else + timeout = std::min(timeout, std::chrono::ceil(busy_timeout - lag)); } } } @@ -205,78 +242,112 @@ void AsynchronousInsertQueue::staleCheck() while (!shutdown) { std::this_thread::sleep_for(stale_timeout); - std::shared_lock read_lock(rwlock); - for (auto & [_, data] : *queue) + for (auto & [key, elem] : queue) { - std::unique_lock data_lock(data->mutex); - - auto lag = std::chrono::steady_clock::now() - data->last_update; + std::lock_guard data_lock(elem->mutex); + if (!elem->data) + continue; + auto lag = std::chrono::steady_clock::now() - elem->data->last_update; if (lag >= stale_timeout) - pool.scheduleOrThrowOnError([data = data, global_context = getContext()] { processData(data, global_context); }); + { + pool.scheduleOrThrowOnError([key = key, + data = std::make_shared(std::move(elem->data)), + global_context = getContext()] + { + processData(std::move(key), std::move(*data), global_context); + }); + } + } + } +} + +void AsynchronousInsertQueue::cleanup() +{ + auto timeout = busy_timeout * 3; + + while (!shutdown) + { + std::this_thread::sleep_for(timeout); + std::vector keys_to_remove; + + { + std::shared_lock read_lock(rwlock); + + for (auto & [key, elem] : queue) + { + std::lock_guard data_lock(elem->mutex); + if (!elem->data) + keys_to_remove.push_back(key); + } + } + + if (!keys_to_remove.empty()) + { + std::unique_lock write_lock(rwlock); + + for (const auto & key : keys_to_remove) + { + auto it = queue.find(key); + if (it != queue.end() && !it->second) + queue.erase(it); + } + + LOG_TRACE(log, "Removed stale entries for {} queries from asynchronous insertion queue", keys_to_remove.size()); } } } // static -void AsynchronousInsertQueue::processData(std::shared_ptr data, ContextPtr global_context) +void AsynchronousInsertQueue::processData(InsertQuery key, InsertDataPtr data, ContextPtr global_context) try { - std::unique_lock data_lock(data->mutex); - - if (data->is_reset) + if (!data) return; const auto * log = &Poco::Logger::get("AsynchronousInsertQueue"); auto insert_context = Context::createCopy(global_context); /// 'resetParser' doesn't work for parallel parsing. - data->settings.set("input_format_parallel_parsing", false); + key.settings.set("input_format_parallel_parsing", false); insert_context->makeQueryContext(); - insert_context->setSettings(data->settings); + insert_context->setSettings(key.settings); - InterpreterInsertQuery interpreter(data->query, insert_context, data->settings.insert_allow_materialized_columns); + InterpreterInsertQuery interpreter(key.query, insert_context, key.settings.insert_allow_materialized_columns); auto sinks = interpreter.getSinks(); assert(sinks.size() == 1); auto header = sinks.at(0)->getInputs().front().getHeader(); - auto format = getInputFormatFromASTInsertQuery(data->query, false, header, insert_context, nullptr); + auto format = getInputFormatFromASTInsertQuery(key.query, false, header, insert_context, nullptr); size_t total_rows = 0; - std::string_view current_query_id; - Context::AsyncInsertInfoPtr current_info; + InsertData::EntryPtr current_entry; auto on_error = [&](const MutableColumns & result_columns, Exception & e) { - LOG_ERROR(&Poco::Logger::get("AsynchronousInsertQueue"), - "Failed parsing for query '{}' with query id {}. {}", - queryToString(data->query), current_query_id, e.displayText()); + LOG_ERROR(log, "Failed parsing for query '{}' with query id {}. {}", + queryToString(key.query), current_entry->query_id, e.displayText()); for (const auto & column : result_columns) if (column->size() > total_rows) column->popBack(column->size() - total_rows); - current_info->complete(std::current_exception()); + current_entry->finish(std::current_exception()); return 0; }; StreamingFormatExecutor executor(header, format, std::move(on_error)); - std::vector, - std::string_view, Context::AsyncInsertInfoPtr>> prepared_data; - - prepared_data.reserve(data->data.size()); - for (const auto & datum : data->data) - prepared_data.emplace_back(std::make_unique(datum.bytes), datum.query_id, datum.info); - - for (const auto & [buffer, query_id, info] : prepared_data) + std::unique_ptr buffer; + for (const auto & entry : data->entries) { + buffer = std::make_unique(entry->bytes); + format->resetParser(); format->setReadBuffer(*buffer); - current_query_id = query_id; - current_info = info; + current_entry = entry; total_rows += executor.execute(); } @@ -295,21 +366,18 @@ try out_executor->execute(out_pipeline.getNumThreads()); LOG_DEBUG(log, "Flushed {} rows, {} bytes for query '{}'", - total_rows, total_bytes, queryToString(data->query)); + total_rows, total_bytes, queryToString(key.query)); - for (const auto & datum : data->data) - datum.info->complete(); - - data->reset(); + for (const auto & entry : data->entries) + if (!entry->finished) + entry->finish(); } catch (...) { tryLogCurrentException("AsynchronousInsertQueue", __PRETTY_FUNCTION__); - for (const auto & datum : data->data) - datum.info->complete(std::current_exception()); - - data->reset(); + for (const auto & entry : data->entries) + entry->finish(std::current_exception()); } } diff --git a/src/Interpreters/AsynchronousInsertionQueue.h b/src/Interpreters/AsynchronousInsertionQueue.h index 5d1c449eae5..0ca44f63a7a 100644 --- a/src/Interpreters/AsynchronousInsertionQueue.h +++ b/src/Interpreters/AsynchronousInsertionQueue.h @@ -4,6 +4,7 @@ #include #include #include +#include #include @@ -14,71 +15,116 @@ namespace DB class ASTInsertQuery; struct BlockIO; -class AsynchronousInsertQueue : public WithMutableContext +class AsynchronousInsertQueue : public WithContext { - public: - /// Using structure to allow and benefit from designated initialization and not mess with a positional arguments in ctor. - struct Timeout +public: + using Milliseconds = std::chrono::milliseconds; + using Seconds = std::chrono::seconds; + + /// Using structure to allow and benefit from designated initialization and not mess with a positional arguments in ctor. + struct Timeout + { + Seconds busy; + Seconds stale; + }; + + AsynchronousInsertQueue(ContextPtr context_, size_t pool_size, size_t max_data_size, const Timeout & timeouts); + ~AsynchronousInsertQueue(); + + void push(const ASTPtr & query, const Settings & settings, const String & query_id); + void waitForProcessingQuery(const String & query_id, const Milliseconds & timeout); + +private: + + struct InsertQuery + { + ASTPtr query; + Settings settings; + + InsertQuery(const ASTPtr & query_, const Settings & settings_); + InsertQuery(const InsertQuery & other); + InsertQuery & operator==(const InsertQuery & other); + bool operator==(const InsertQuery & other) const; + struct Hash { UInt64 operator()(const InsertQuery & insert_query) const; }; + }; + + struct InsertData + { + struct Entry { - std::chrono::seconds busy; - std::chrono::seconds stale; + public: + String bytes; + String query_id; + + bool finished = false; + std::exception_ptr exception; + + void finish(std::exception_ptr exception_ = nullptr); + bool wait(const Milliseconds & timeout); + + private: + std::mutex mutex; + std::condition_variable cv; }; - AsynchronousInsertQueue(ContextMutablePtr context_, size_t pool_size, size_t max_data_size, const Timeout & timeouts); - ~AsynchronousInsertQueue(); + using EntryPtr = std::shared_ptr; - void push(const ASTPtr & query, const Settings & settings, const String & query_id); + std::list entries; + size_t size = 0; - private: + /// Timestamp of the first insert into queue, or after the last queue dump. + /// Used to detect for how long the queue is active, so we can dump it by timer. + std::chrono::time_point first_update = std::chrono::steady_clock::now(); - struct InsertQuery - { - ASTPtr query; - Settings settings; - }; + /// Timestamp of the last insert into queue. + /// Used to detect for how long the queue is stale, so we can dump it by another timer. + std::chrono::time_point last_update; + }; - struct InsertData; + using InsertDataPtr = std::unique_ptr; - struct InsertQueryHash - { - std::size_t operator () (const InsertQuery &) const; - }; + struct Container + { + std::mutex mutex; + InsertDataPtr data; + }; - struct InsertQueryEquality - { - bool operator () (const InsertQuery &, const InsertQuery &) const; - }; + using Queue = std::unordered_map, InsertQuery::Hash>; + using QueueIterator = Queue::iterator; - /// Logic and events behind queue are as follows: - /// - reset_timeout: if queue is empty for some time, then we delete the queue and free all associated resources, e.g. tables. - /// - busy_timeout: if queue is active for too long and there are a lot of rapid inserts, then we dump the data, so it doesn't - /// grow for a long period of time and users will be able to select new data in deterministic manner. - /// - stale_timeout: if queue is stale for too long, then we dump the data too, so that users will be able to select the last - /// piece of inserted data. - /// - access_timeout: also we have to check if user still has access to the tables periodically, and if the access is lost, then - /// we dump pending data and delete queue immediately. - /// - max_data_size: if the maximum size of data is reached, then again we dump the data. + std::shared_mutex rwlock; + Queue queue; - using Queue = std::unordered_map, InsertQueryHash, InsertQueryEquality>; - using QueueIterator = Queue::iterator; + std::mutex currently_processing_mutex; + std::unordered_map currently_processing_queries; - const size_t max_data_size; /// in bytes - const std::chrono::seconds busy_timeout, stale_timeout; + /// Logic and events behind queue are as follows: + /// - reset_timeout: if queue is empty for some time, then we delete the queue and free all associated resources, e.g. tables. + /// - busy_timeout: if queue is active for too long and there are a lot of rapid inserts, then we dump the data, so it doesn't + /// grow for a long period of time and users will be able to select new data in deterministic manner. + /// - stale_timeout: if queue is stale for too long, then we dump the data too, so that users will be able to select the last + /// piece of inserted data. + /// - access_timeout: also we have to check if user still has access to the tables periodically, and if the access is lost, then + /// we dump pending data and delete queue immediately. + /// - max_data_size: if the maximum size of data is reached, then again we dump the data. - std::shared_mutex rwlock; - std::unique_ptr queue; + const size_t max_data_size; /// in bytes + const Seconds busy_timeout; + const Seconds stale_timeout; - std::atomic shutdown{false}; - ThreadPool pool; /// dump the data only inside this pool. - ThreadFromGlobalPool dump_by_first_update_thread; /// uses busy_timeout and busyCheck() - ThreadFromGlobalPool dump_by_last_update_thread; /// uses stale_timeout and staleCheck() - /// TODO: ThreadFromGlobalPool check_access_thread; + std::atomic shutdown{false}; + ThreadPool pool; /// dump the data only inside this pool. + ThreadFromGlobalPool dump_by_first_update_thread; /// uses busy_timeout and busyCheck() + ThreadFromGlobalPool dump_by_last_update_thread; /// uses stale_timeout and staleCheck() + ThreadFromGlobalPool cleanup_thread; - void busyCheck(); - void staleCheck(); + Poco::Logger * log = &Poco::Logger::get("AsynchronousInsertQueue"); - void pushImpl(const ASTPtr & query, const String & query_id, QueueIterator it); - static void processData(std::shared_ptr data, ContextPtr global_context); + void busyCheck(); + void staleCheck(); + void cleanup(); + + static void processData(InsertQuery key, InsertDataPtr data, ContextPtr global_context); }; } diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 3c1049fcf68..fa32bf74afa 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -927,44 +927,6 @@ void Context::addQueryAccessInfo( query_access_info.views.emplace(view_name); } -void Context::AsyncInsertInfo::complete(std::exception_ptr exception_) -{ - std::lock_guard lock(mutex); - finished = true; - exception = exception_; - cv.notify_all(); -} - -Context::AsyncInsertInfoPtr Context::addAsyncInsertQueryId(const String & query_id) -{ - auto lock = getLock(); - auto it = processing_async_inserts.emplace(query_id, std::make_shared()).first; - return it->second; -} - -void Context::waitForProcessingAsyncInsert(const String & query_id, const std::chrono::milliseconds & timeout) const -{ - AsyncInsertInfoPtr async_info; - - { - auto lock = getLock(); - auto it = processing_async_inserts.find(query_id); - if (it == processing_async_inserts.end()) - return; - - async_info = it->second; - } - - std::unique_lock lock(async_info->mutex); - auto finished = async_info->cv.wait_for(lock, timeout, [&] { return async_info->finished; }); - - if (!finished) - throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Wait for async insert timeout ({} ms) exceeded)", timeout.count()); - - if (async_info->exception) - std::rethrow_exception(async_info->exception); -} - void Context::addQueryFactoriesInfo(QueryLogFactories factory_type, const String & created_object) const { assert(!isGlobalContext() || getApplicationType() == ApplicationType::LOCAL); diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 8b8e359b460..c954d590237 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -286,21 +286,6 @@ public: // Top-level OpenTelemetry trace context for the query. Makes sense only for a query context. OpenTelemetryTraceContext query_trace_context; - struct AsyncInsertInfo - { - std::mutex mutex; - std::condition_variable cv; - bool finished = false; - std::exception_ptr exception; - - void complete(std::exception_ptr exception_ = nullptr); - }; - - using AsyncInsertInfoPtr = std::shared_ptr; - - AsyncInsertInfoPtr addAsyncInsertQueryId(const String & query_id); - void waitForProcessingAsyncInsert(const String & query_id, const std::chrono::milliseconds & timeout) const; - private: using SampleBlockCache = std::unordered_map; mutable SampleBlockCache sample_block_cache; @@ -323,8 +308,6 @@ private: /// thousands of signatures. /// And I hope it will be replaced with more common Transaction sometime. - std::unordered_map processing_async_inserts; - Context(); Context(const Context &); Context & operator=(const Context &); diff --git a/src/Parsers/ASTInsertQuery.cpp b/src/Parsers/ASTInsertQuery.cpp index 745585ae175..6d33df7a94e 100644 --- a/src/Parsers/ASTInsertQuery.cpp +++ b/src/Parsers/ASTInsertQuery.cpp @@ -75,6 +75,13 @@ void ASTInsertQuery::formatImpl(const FormatSettings & settings, FormatState & s } } +void ASTInsertQuery::updateTreeHashImpl(SipHash & hash_state) const +{ + hash_state.update(table_id.getFullTableName()); + hash_state.update(format); + IAST::updateTreeHashImpl(hash_state); +} + static void tryFindInputFunctionImpl(const ASTPtr & ast, ASTPtr & input_function) { diff --git a/src/Parsers/ASTInsertQuery.h b/src/Parsers/ASTInsertQuery.h index bd7c6753cc9..0f9827475a6 100644 --- a/src/Parsers/ASTInsertQuery.h +++ b/src/Parsers/ASTInsertQuery.h @@ -58,6 +58,7 @@ public: protected: void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override; + void updateTreeHashImpl(SipHash & hash_state) const override; }; } diff --git a/src/Processors/Sources/WaitForAsyncInsertSource.h b/src/Processors/Sources/WaitForAsyncInsertSource.h index 8caeaaa4586..5b33ec45f96 100644 --- a/src/Processors/Sources/WaitForAsyncInsertSource.h +++ b/src/Processors/Sources/WaitForAsyncInsertSource.h @@ -25,7 +25,9 @@ protected: Chunk generate() override { auto context = getContext(); - context->waitForProcessingAsyncInsert(query_id, std::chrono::milliseconds(timeout_ms)); + auto * queue = context->getAsynchronousInsertQueue(); + assert(queue); + queue->waitForProcessingQuery(query_id, std::chrono::milliseconds(timeout_ms)); return Chunk(); }