From 9e679438783c1c6221b402b18e5c2d36bba7d63d Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Sat, 28 Aug 2021 00:29:10 +0300 Subject: [PATCH] add waiting for async inserts --- src/Core/Settings.h | 2 + .../AsynchronousInsertionQueue.cpp | 30 +++++++++++--- src/Interpreters/AsynchronousInsertionQueue.h | 8 ++-- src/Interpreters/Context.cpp | 30 ++++++++++++++ src/Interpreters/Context.h | 17 ++++++++ src/Interpreters/InterpreterInsertQuery.cpp | 39 ++++++++++++------- src/Interpreters/InterpreterInsertQuery.h | 2 +- src/Interpreters/executeQuery.cpp | 16 ++++++-- .../Sources/WaitForAsyncInsertSource.h | 37 ++++++++++++++++++ 9 files changed, 154 insertions(+), 27 deletions(-) create mode 100644 src/Processors/Sources/WaitForAsyncInsertSource.h diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 9ccfbdd7ef1..18f1459d331 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -594,6 +594,8 @@ class IColumn; \ M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \ M(Bool, async_insert_mode, false, "Insert query is processed almost instantly, but an actual data queued for later asynchronous insertion", 0) \ + M(Bool, wait_for_async_insert, true, "If true wait for processing of asynchronous insertion", 0) \ + M(Seconds, wait_for_async_insert_timeout, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, "Timeout for waiting for processing asynchronous insertion", 0) \ M(UInt64, async_insert_max_data_size, 1000000, "Maximum size in bytes of unparsed data collected per query before being inserted", 0) \ M(Seconds, async_insert_busy_timeout, 1, "Maximum time to wait before dumping collected data per query since the first data appeared", 0) \ M(Seconds, async_insert_stale_timeout, 0, "Maximum time to wait before dumping collected data per query since the last data appeared. Zero means no timeout at all", 0) \ diff --git a/src/Interpreters/AsynchronousInsertionQueue.cpp b/src/Interpreters/AsynchronousInsertionQueue.cpp index b14d3ac0e62..5e6927d5db8 100644 --- a/src/Interpreters/AsynchronousInsertionQueue.cpp +++ b/src/Interpreters/AsynchronousInsertionQueue.cpp @@ -34,6 +34,7 @@ struct AsynchronousInsertQueue::InsertData { String bytes; String query_id; + Context::AsyncInsertInfoPtr info; }; std::mutex mutex; @@ -88,8 +89,8 @@ bool AsynchronousInsertQueue::InsertQueryEquality::operator() (const InsertQuery return true; } -AsynchronousInsertQueue::AsynchronousInsertQueue(ContextPtr context_, size_t pool_size, size_t max_data_size_, const Timeout & timeouts) - : WithContext(context_) +AsynchronousInsertQueue::AsynchronousInsertQueue(ContextMutablePtr context_, size_t pool_size, size_t max_data_size_, const Timeout & timeouts) + : WithMutableContext(context_) , max_data_size(max_data_size_) , busy_timeout(timeouts.busy) , stale_timeout(timeouts.stale) @@ -139,6 +140,8 @@ void AsynchronousInsertQueue::push(const ASTPtr & query, const Settings & settin new_data.query_id = query_id; new_data.bytes.reserve(read_buf->totalSize()); + new_data.info = getContext()->addAsyncInsertQueryId(query_id); + WriteBufferFromString write_buf(new_data.bytes); copyData(*read_buf, write_buf); @@ -226,6 +229,7 @@ try size_t total_rows = 0; std::string_view current_query_id; + Context::AsyncInsertInfoPtr current_info; auto on_error = [&](const MutableColumns & result_columns, Exception & e) { @@ -237,21 +241,30 @@ try if (column->size() > total_rows) column->popBack(column->size() - total_rows); + std::lock_guard info_lock(current_info->mutex); + + current_info->finished = true; + current_info->exception = std::current_exception(); + current_info->cv.notify_all(); + return 0; }; StreamingFormatExecutor executor(header, format, std::move(on_error)); - std::vector>> prepared_data; + std::vector, + std::string_view, Context::AsyncInsertInfoPtr>> prepared_data; + prepared_data.reserve(data->data.size()); for (const auto & datum : data->data) - prepared_data.emplace_back(datum.query_id, std::make_unique(datum.bytes)); + prepared_data.emplace_back(std::make_unique(datum.bytes), datum.query_id, datum.info); - for (const auto & [query_id, buffer] : prepared_data) + for (const auto & [buffer, query_id, info] : prepared_data) { format->resetParser(); format->setReadBuffer(*buffer); current_query_id = query_id; + current_info = info; total_rows += executor.execute(); } @@ -272,6 +285,13 @@ try LOG_DEBUG(log, "Flushed {} rows, {} bytes for query '{}'", total_rows, total_bytes, queryToString(data->query)); + for (const auto & datum : data->data) + { + std::lock_guard info_lock(datum.info->mutex); + datum.info->finished = true; + datum.info->cv.notify_all(); + } + data->reset(); } catch (...) diff --git a/src/Interpreters/AsynchronousInsertionQueue.h b/src/Interpreters/AsynchronousInsertionQueue.h index 1018c96458c..d89006e0684 100644 --- a/src/Interpreters/AsynchronousInsertionQueue.h +++ b/src/Interpreters/AsynchronousInsertionQueue.h @@ -14,16 +14,17 @@ namespace DB class ASTInsertQuery; struct BlockIO; -class AsynchronousInsertQueue : public WithContext +class AsynchronousInsertQueue : public WithMutableContext { public: /// Using structure to allow and benefit from designated initialization and not mess with a positional arguments in ctor. struct Timeout { - std::chrono::seconds busy, stale; + std::chrono::seconds busy; + std::chrono::seconds stale; }; - AsynchronousInsertQueue(ContextPtr context_, size_t pool_size, size_t max_data_size, const Timeout & timeouts); + AsynchronousInsertQueue(ContextMutablePtr context_, size_t pool_size, size_t max_data_size, const Timeout & timeouts); ~AsynchronousInsertQueue(); void push(const ASTPtr & query, const Settings & settings, const String & query_id); @@ -35,6 +36,7 @@ class AsynchronousInsertQueue : public WithContext ASTPtr query; Settings settings; }; + struct InsertData; struct InsertQueryHash diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 25d1315799c..bdcb754c642 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -120,6 +120,7 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; extern const int NOT_IMPLEMENTED; extern const int INVALID_SETTING_VALUE; + extern const int TIMEOUT_EXCEEDED; } @@ -926,6 +927,35 @@ void Context::addQueryAccessInfo( query_access_info.views.emplace(view_name); } +Context::AsyncInsertInfoPtr Context::addAsyncInsertQueryId(const String & query_id) +{ + auto lock = getLock(); + auto it = processing_async_inserts.emplace(query_id, std::make_shared()).first; + return it->second; +} + +void Context::waitForProcessingAsyncInsert(const String & query_id, const std::chrono::milliseconds & timeout) const +{ + AsyncInsertInfoPtr wait_data; + + { + auto lock = getLock(); + auto it = processing_async_inserts.find(query_id); + if (it == processing_async_inserts.end()) + return; + + wait_data = it->second; + } + + std::unique_lock lock(wait_data->mutex); + auto finished = wait_data->cv.wait_for(lock, timeout, [&] { return wait_data->finished; }); + + if (!finished) + throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Wait for async insert timeout ({} ms) exceeded)", timeout.count()); + + if (wait_data->exception) + std::rethrow_exception(wait_data->exception); +} void Context::addQueryFactoriesInfo(QueryLogFactories factory_type, const String & created_object) const { diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 8cfb871083f..6fd46962a69 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -24,6 +24,7 @@ #include #include #include +#include namespace Poco::Net { class IPAddress; } @@ -285,6 +286,19 @@ public: // Top-level OpenTelemetry trace context for the query. Makes sense only for a query context. OpenTelemetryTraceContext query_trace_context; + struct AsyncInsertInfo + { + std::mutex mutex; + std::condition_variable cv; + bool finished = false; + std::exception_ptr exception; + }; + + using AsyncInsertInfoPtr = std::shared_ptr; + + AsyncInsertInfoPtr addAsyncInsertQueryId(const String & query_id); + void waitForProcessingAsyncInsert(const String & query_id, const std::chrono::milliseconds & timeout) const; + private: using SampleBlockCache = std::unordered_map; mutable SampleBlockCache sample_block_cache; @@ -307,6 +321,8 @@ private: /// thousands of signatures. /// And I hope it will be replaced with more common Transaction sometime. + std::unordered_map processing_async_inserts; + Context(); Context(const Context &); Context & operator=(const Context &); @@ -468,6 +484,7 @@ public: const String & projection_name = {}, const String & view_name = {}); + /// Supported factories for records in query_log enum class QueryLogFactories { diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp index 8b93bde5abe..ecc869449b9 100644 --- a/src/Interpreters/InterpreterInsertQuery.cpp +++ b/src/Interpreters/InterpreterInsertQuery.cpp @@ -147,7 +147,7 @@ static bool isTrivialSelect(const ASTPtr & select) }; -std::pair InterpreterInsertQuery::executeImpl( +std::pair InterpreterInsertQuery::executeImpl( const StoragePtr & table, Block & sample_block) { const auto & settings = getContext()->getSettingsRef(); @@ -158,7 +158,7 @@ std::pair InterpreterInsertQuery::executeImpl( throw Exception(ErrorCodes::NOT_IMPLEMENTED, "PARTITION BY clause is not supported by storage"); BlockIO res; - Processors sinks; + BlockOutputStreams out_streams; bool is_distributed_insert_select = false; if (query.select && table->isRemote() && settings.parallel_distributed_insert_select) @@ -303,11 +303,11 @@ std::pair InterpreterInsertQuery::executeImpl( auto out_wrapper = std::make_shared(out); out_wrapper->setProcessListElement(getContext()->getProcessListElement()); - sinks.emplace_back(std::make_shared(std::move(out_wrapper))); + out_streams.emplace_back(std::move(out_wrapper)); } } - return {std::move(res), std::move(sinks)}; + return {std::move(res), std::move(out_streams)}; } BlockIO InterpreterInsertQuery::execute() @@ -324,11 +324,11 @@ BlockIO InterpreterInsertQuery::execute() getContext()->checkAccess(AccessType::INSERT, query.table_id, sample_block.getNames()); BlockIO res; - Processors sinks; - std::tie(res, sinks) = executeImpl(table, sample_block); + BlockOutputStreams out_streams; + std::tie(res, out_streams) = executeImpl(table, sample_block); /// What type of query: INSERT or INSERT SELECT or INSERT WATCH? - if (sinks.empty()) + if (out_streams.empty()) { /// Pipeline was already built. } @@ -336,7 +336,7 @@ BlockIO InterpreterInsertQuery::execute() { /// XXX: is this branch also triggered for select+input() case? - const auto & header = sinks.at(0)->getInputs().front().getHeader(); + const auto & header = out_streams.at(0)->getHeader(); auto actions_dag = ActionsDAG::makeConvertingActions( res.pipeline.getHeader().getColumnsWithTypeAndName(), header.getColumnsWithTypeAndName(), @@ -348,13 +348,15 @@ BlockIO InterpreterInsertQuery::execute() return std::make_shared(in_header, actions); }); - auto it = sinks.rbegin(); - res.pipeline.setSinks([&it](const Block &, QueryPipeline::StreamType type) -> ProcessorPtr + res.pipeline.setSinks([&](const Block &, QueryPipeline::StreamType type) -> ProcessorPtr { if (type != QueryPipeline::StreamType::Main) return nullptr; - return *it++; + auto stream = std::move(out_streams.back()); + out_streams.pop_back(); + + return std::make_shared(std::move(stream)); }); if (!allow_materialized) @@ -364,16 +366,18 @@ BlockIO InterpreterInsertQuery::execute() throw Exception("Cannot insert column " + column.name + ", because it is MATERIALIZED column.", ErrorCodes::ILLEGAL_COLUMN); } } - else + else if (!query.expectNativeData()) { auto pipe = getSourceFromASTInsertQuery(query_ptr, true, sample_block, getContext(), nullptr); res.pipeline.init(std::move(pipe)); res.pipeline.resize(1); res.pipeline.setSinks([&](const Block &, Pipe::StreamType) { - return sinks.at(0); + return std::make_shared(out_streams.at(0)); }); } + else + res.out = std::move(out_streams.at(0)); res.pipeline.addStorageHolder(table); if (const auto * mv = dynamic_cast(table.get())) @@ -398,7 +402,14 @@ Processors InterpreterInsertQuery::getSinks() if (!query.table_function) getContext()->checkAccess(AccessType::INSERT, query.table_id, sample_block.getNames()); - return executeImpl(table, sample_block).second; + auto out_streams = executeImpl(table, sample_block).second; + + Processors sinks; + sinks.reserve(out_streams.size()); + for (const auto & out : out_streams) + sinks.emplace_back(std::make_shared(out)); + + return sinks; } StorageID InterpreterInsertQuery::getDatabaseTable() const diff --git a/src/Interpreters/InterpreterInsertQuery.h b/src/Interpreters/InterpreterInsertQuery.h index 959d5a0a11b..72eec2b4792 100644 --- a/src/Interpreters/InterpreterInsertQuery.h +++ b/src/Interpreters/InterpreterInsertQuery.h @@ -37,7 +37,7 @@ public: private: StoragePtr getTable(ASTInsertQuery & query); Block getSampleBlock(const ASTInsertQuery & query, const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot) const; - std::pair executeImpl(const StoragePtr & table, Block & sample_block); + std::pair executeImpl(const StoragePtr & table, Block & sample_block); ASTPtr query_ptr; const bool allow_materialized; diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index eeaede25f9f..b353dddf58b 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -56,6 +56,7 @@ #include #include #include +#include namespace ProfileEvents @@ -555,11 +556,18 @@ static std::tuple executeQueryImpl( if (async_insert) { - queue->push(ast, settings, context->getCurrentQueryId()); + auto query_id = context->getCurrentQueryId(); + queue->push(ast, settings, query_id); - /// Shortcut for already processed similar insert-queries. - /// Similarity is defined by hashing query text and some settings. - return std::make_tuple(ast, BlockIO()); + BlockIO io; + if (settings.wait_for_async_insert) + { + auto timeout = settings.wait_for_async_insert_timeout.totalMilliseconds(); + auto source = std::make_shared(Block(), query_id, timeout, context->getGlobalContext()); + io.pipeline.init(Pipe(source)); + } + + return std::make_tuple(ast, std::move(io)); } auto interpreter = InterpreterFactory::get(ast, context, SelectQueryOptions(stage).setInternal(internal)); diff --git a/src/Processors/Sources/WaitForAsyncInsertSource.h b/src/Processors/Sources/WaitForAsyncInsertSource.h new file mode 100644 index 00000000000..8caeaaa4586 --- /dev/null +++ b/src/Processors/Sources/WaitForAsyncInsertSource.h @@ -0,0 +1,37 @@ +#pragma once + +#include +#include + +namespace DB +{ + +class WaitForAsyncInsertSource : public ISource, WithContext +{ +public: + WaitForAsyncInsertSource( + const Block & header, const String & query_id_, + size_t timeout_ms_, ContextPtr context_) + : ISource(std::move(header)) + , WithContext(context_) + , query_id(query_id_) + , timeout_ms(timeout_ms_) + { + } + + String getName() const override { return "WaitForAsyncInsert"; } + +protected: + Chunk generate() override + { + auto context = getContext(); + context->waitForProcessingAsyncInsert(query_id, std::chrono::milliseconds(timeout_ms)); + return Chunk(); + } + +private: + String query_id; + size_t timeout_ms; +}; + +}