From 78dbcaea54be1ed158bb3edb940071250132d50e Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Sun, 8 Aug 2021 06:30:14 +0300 Subject: [PATCH] implement async inserts with processors --- .../AsynchronousInsertionQueue.cpp | 162 +++++++++--------- src/Interpreters/AsynchronousInsertionQueue.h | 11 +- src/Interpreters/InterpreterInsertQuery.cpp | 14 +- src/Interpreters/InterpreterInsertQuery.h | 12 ++ src/Interpreters/executeQuery.cpp | 40 ++--- src/Parsers/ASTInsertQuery.h | 5 +- ...ry.cpp => getSourceFromASTInsertQuery.cpp} | 59 ++++--- ...tQuery.h => getSourceFromASTInsertQuery.h} | 16 +- src/Processors/ya.make | 2 +- 9 files changed, 170 insertions(+), 151 deletions(-) rename src/Processors/Transforms/{getSourceFromFromASTInsertQuery.cpp => getSourceFromASTInsertQuery.cpp} (53%) rename src/Processors/Transforms/{getSourceFromFromASTInsertQuery.h => getSourceFromASTInsertQuery.h} (59%) diff --git a/src/Interpreters/AsynchronousInsertionQueue.cpp b/src/Interpreters/AsynchronousInsertionQueue.cpp index e83e12253f8..9c4f0507847 100644 --- a/src/Interpreters/AsynchronousInsertionQueue.cpp +++ b/src/Interpreters/AsynchronousInsertionQueue.cpp @@ -2,8 +2,9 @@ #include #include -#include -#include +#include +#include +#include #include #include #include @@ -18,10 +19,17 @@ namespace DB struct AsynchronousInsertQueue::InsertData { + InsertData(ASTPtr query_, const Settings & settings_) + : query(std::move(query_)), settings(settings_) + { + } + + ASTPtr query; + Settings settings; + std::mutex mutex; std::list data; size_t size = 0; - BlockIO io; /// Timestamp of the first insert into queue, or after the last queue dump. /// Used to detect for how long the queue is active, so we can dump it by timer. @@ -32,7 +40,13 @@ struct AsynchronousInsertQueue::InsertData std::chrono::time_point last_update; /// Indicates that the BlockIO should be updated, because we can't read/write prefix and suffix more than once. - bool reset = false; + bool is_reset = false; + + void reset() + { + data.clear(); + is_reset = true; + } }; std::size_t AsynchronousInsertQueue::InsertQueryHash::operator() (const InsertQuery & query) const @@ -65,8 +79,9 @@ bool AsynchronousInsertQueue::InsertQueryEquality::operator() (const InsertQuery return true; } -AsynchronousInsertQueue::AsynchronousInsertQueue(size_t pool_size, size_t max_data_size_, const Timeout & timeouts) - : max_data_size(max_data_size_) +AsynchronousInsertQueue::AsynchronousInsertQueue(ContextPtr context_, size_t pool_size, size_t max_data_size_, const Timeout & timeouts) + : WithContext(context_) + , max_data_size(max_data_size_) , busy_timeout(timeouts.busy) , stale_timeout(timeouts.stale) , lock(RWLockImpl::create()) @@ -97,45 +112,41 @@ AsynchronousInsertQueue::~AsynchronousInsertQueue() pool.wait(); } -bool AsynchronousInsertQueue::push(ASTInsertQuery * query, const Settings & settings) -{ - auto read_lock = lock->getLock(RWLockImpl::Read, String()); - - auto it = queue->find(InsertQuery{query->shared_from_this(), settings}); - - if (it != queue->end()) - { - std::unique_lock data_lock(it->second->mutex); - - if (it->second->reset) - return false; - - pushImpl(query, it); - return true; - } - - return false; -} - -void AsynchronousInsertQueue::push(ASTInsertQuery * query, BlockIO && io, const Settings & settings) +void AsynchronousInsertQueue::push(const ASTPtr & query, const Settings & settings) { auto write_lock = lock->getLock(RWLockImpl::Write, String()); - InsertQuery key{query->shared_from_this(), settings}; + InsertQuery key{query, settings}; + auto it = queue->find(key); if (it == queue->end()) - { - it = queue->insert({key, std::make_shared()}).first; - it->second->io = std::move(io); - } - else if (it->second->reset) - { - it->second = std::make_shared(); - it->second->io = std::move(io); - } + it = queue->insert({key, std::make_shared(query, settings)}).first; + else if (it->second->is_reset) + it->second = std::make_shared(query, settings); std::unique_lock data_lock(it->second->mutex); - pushImpl(query, it); + + auto read_buffers = getReadBuffersFromASTInsertQuery(query); + ConcatReadBuffer concat_buf(std::move(read_buffers)); + + /// NOTE: must not read from |query->tail| before read all between |query->data| and |query->end|. + + /// It's important to read the whole data per query as a single chunk, so we can safely drop it in case of parsing failure. + auto & new_data = it->second->data.emplace_back(); + new_data.reserve(concat_buf.totalSize()); + WriteBufferFromString write_buf(new_data); + + copyData(concat_buf, write_buf); + it->second->size += concat_buf.count(); + it->second->last_update = std::chrono::steady_clock::now(); + + LOG_INFO(&Poco::Logger::get("AsynchronousInsertQueue"), + "Queue size {} for query '{}'", it->second->size, queryToString(*query)); + + if (it->second->size > max_data_size) + /// Since we're under lock here, it's safe to pass-by-copy the shared_ptr + /// without a race with the cleanup thread, which may reset last shared_ptr instance. + pool.scheduleOrThrowOnError([data = it->second, global_context = getContext()] { processData(data, global_context); }); } void AsynchronousInsertQueue::busyCheck() @@ -157,7 +168,7 @@ void AsynchronousInsertQueue::busyCheck() auto lag = std::chrono::steady_clock::now() - data->first_update; if (lag >= busy_timeout) - pool.scheduleOrThrowOnError([data = data] { processData(data); }); + pool.scheduleOrThrowOnError([data = data, global_context = getContext()] { processData(data, global_context); }); else timeout = std::min(timeout, std::chrono::ceil(busy_timeout - lag)); } @@ -179,64 +190,47 @@ void AsynchronousInsertQueue::staleCheck() auto lag = std::chrono::steady_clock::now() - data->last_update; if (lag >= stale_timeout) - pool.scheduleOrThrowOnError([data = data] { processData(data); }); + pool.scheduleOrThrowOnError([data = data, global_context = getContext()] { processData(data, global_context); }); } } } -void AsynchronousInsertQueue::pushImpl(ASTInsertQuery * query, QueueIterator & it) -{ - ConcatReadBuffer concat_buf; - - auto ast_buf = std::make_unique(query->data, query->data ? query->end - query->data : 0); - - if (query->data) - concat_buf.appendBuffer(std::move(ast_buf)); - - if (query->tail) - concat_buf.appendBuffer(wrapReadBufferReference(*query->tail)); - - /// NOTE: must not read from |query->tail| before read all between |query->data| and |query->end|. - - /// It's important to read the whole data per query as a single chunk, so we can safely drop it in case of parsing failure. - auto & new_data = it->second->data.emplace_back(); - new_data.reserve(concat_buf.totalSize()); - WriteBufferFromString write_buf(new_data); - - copyData(concat_buf, write_buf); - it->second->size += concat_buf.count(); - it->second->last_update = std::chrono::steady_clock::now(); - - LOG_INFO(&Poco::Logger::get("AsynchronousInsertQueue"), "Queue size {} for query '{}'", it->second->size, queryToString(*query)); - - if (it->second->size > max_data_size) - /// Since we're under lock here, it's safe to pass-by-copy the shared_ptr - /// without a race with the cleanup thread, which may reset last shared_ptr instance. - pool.scheduleOrThrowOnError([data = it->second] { processData(data); }); -} - // static -void AsynchronousInsertQueue::processData(std::shared_ptr data) +void AsynchronousInsertQueue::processData(std::shared_ptr data, ContextPtr global_context) +try { std::unique_lock data_lock(data->mutex); - if (data->reset) + if (data->is_reset) return; - // auto in = std::dynamic_pointer_cast(data->io.in); - // assert(in); + ReadBuffers read_buffers; + for (const auto & datum : data->data) + read_buffers.emplace_back(std::make_unique(datum)); - // auto log_progress = [](const Block & block) - // { - // LOG_INFO(&Poco::Logger::get("AsynchronousInsertQueue"), "Flushed {} rows", block.rows()); - // }; + auto insert_context = Context::createCopy(global_context); + insert_context->makeQueryContext(); + insert_context->setSettings(data->settings); - // for (const auto & datum : data->data) - // in->appendBuffer(std::make_unique(datum)); - // copyData(*in, *data->io.out, [] {return false;}, log_progress); + InterpreterInsertQuery interpreter(data->query, std::move(read_buffers), insert_context); + auto io = interpreter.execute(); + assert(io.pipeline.initialized()); - data->io = BlockIO(); /// Release all potential table locks - data->reset = true; + auto log_progress = [&](const Progress & progress) + { + LOG_INFO(&Poco::Logger::get("AsynchronousInsertQueue"), + "Flushed {} rows, {} bytes", progress.written_rows, progress.written_bytes); + }; + + io.pipeline.setProgressCallback(log_progress); + auto executor = io.pipeline.execute(); + executor->execute(io.pipeline.getNumThreads()); + + data->reset(); +} +catch (...) +{ + tryLogCurrentException("AsynchronousInsertQueue", __PRETTY_FUNCTION__); } } diff --git a/src/Interpreters/AsynchronousInsertionQueue.h b/src/Interpreters/AsynchronousInsertionQueue.h index 1fe2e8a720f..21061fc1eec 100644 --- a/src/Interpreters/AsynchronousInsertionQueue.h +++ b/src/Interpreters/AsynchronousInsertionQueue.h @@ -14,7 +14,7 @@ namespace DB class ASTInsertQuery; struct BlockIO; -class AsynchronousInsertQueue +class AsynchronousInsertQueue : public WithContext { public: /// Using structure to allow and benefit from designated initialization and not mess with a positional arguments in ctor. @@ -23,11 +23,10 @@ class AsynchronousInsertQueue std::chrono::seconds busy, stale; }; - AsynchronousInsertQueue(size_t pool_size, size_t max_data_size, const Timeout & timeouts); + AsynchronousInsertQueue(ContextPtr context_, size_t pool_size, size_t max_data_size, const Timeout & timeouts); ~AsynchronousInsertQueue(); - bool push(ASTInsertQuery * query, const Settings & settings); - void push(ASTInsertQuery * query, BlockIO && io, const Settings & settings); + void push(const ASTPtr & query, const Settings & settings); private: struct InsertQuery @@ -75,9 +74,7 @@ class AsynchronousInsertQueue void busyCheck(); void staleCheck(); - void pushImpl(ASTInsertQuery * query, QueueIterator & it); /// use only under lock - - static void processData(std::shared_ptr data); + static void processData(std::shared_ptr data, ContextPtr global_context); }; } diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp index f1c45bd678b..c24247945b3 100644 --- a/src/Interpreters/InterpreterInsertQuery.cpp +++ b/src/Interpreters/InterpreterInsertQuery.cpp @@ -4,7 +4,7 @@ #include #include #include -#include +#include #include #include #include @@ -44,8 +44,18 @@ namespace ErrorCodes InterpreterInsertQuery::InterpreterInsertQuery( const ASTPtr & query_ptr_, ContextPtr context_, bool allow_materialized_, bool no_squash_, bool no_destination_) + : InterpreterInsertQuery(query_ptr_, getReadBuffersFromASTInsertQuery(query_ptr_), + context_, allow_materialized_, no_squash_, no_destination_) +{ +} + +InterpreterInsertQuery::InterpreterInsertQuery( + const ASTPtr & query_ptr_, ReadBuffers read_buffers_, + ContextPtr context_, bool allow_materialized_, + bool no_squash_, bool no_destination_) : WithContext(context_) , query_ptr(query_ptr_) + , read_buffers(std::move(read_buffers_)) , allow_materialized(allow_materialized_) , no_squash(no_squash_) , no_destination(no_destination_) @@ -353,7 +363,7 @@ BlockIO InterpreterInsertQuery::execute() } else if (!query.expectNativeData()) { - auto pipe = getSourceFromFromASTInsertQuery(query_ptr, nullptr, query_sample_block, getContext(), nullptr); + auto pipe = getSourceFromASTInsertQuery(query_ptr, query_sample_block, std::move(read_buffers), getContext()); res.pipeline.init(std::move(pipe)); res.pipeline.resize(1); res.pipeline.setSinks([&](const Block &, Pipe::StreamType) diff --git a/src/Interpreters/InterpreterInsertQuery.h b/src/Interpreters/InterpreterInsertQuery.h index 71b8c827702..79e1d51a9cb 100644 --- a/src/Interpreters/InterpreterInsertQuery.h +++ b/src/Interpreters/InterpreterInsertQuery.h @@ -5,6 +5,7 @@ #include #include #include +#include namespace DB { @@ -14,6 +15,8 @@ namespace DB class InterpreterInsertQuery : public IInterpreter, WithContext { public: + using ReadBuffers = std::vector>; + InterpreterInsertQuery( const ASTPtr & query_ptr_, ContextPtr context_, @@ -21,6 +24,14 @@ public: bool no_squash_ = false, bool no_destination_ = false); + InterpreterInsertQuery( + const ASTPtr & query_ptr_, + ReadBuffers read_buffers_, + ContextPtr context_, + bool allow_materialized_ = false, + bool no_squash_ = false, + bool no_destination_ = false); + /** Prepare a request for execution. Return block streams * - the stream into which you can write data to execute the query, if INSERT; * - the stream from which you can read the result of the query, if SELECT and similar; @@ -37,6 +48,7 @@ private: Block getSampleBlock(const ASTInsertQuery & query, const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot) const; ASTPtr query_ptr; + ReadBuffers read_buffers; const bool allow_materialized; const bool no_squash; const bool no_destination; diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index fbc27ecba75..f92fdb4eb05 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -12,7 +12,7 @@ #include #include #include -#include +#include #include #include @@ -431,6 +431,7 @@ static std::tuple executeQueryImpl( query_end = insert_query->data; else query_end = end; + insert_query->tail = istr; } else @@ -521,8 +522,9 @@ static std::tuple executeQueryImpl( StoragePtr storage = context->executeTableFunction(input_function); auto & input_storage = dynamic_cast(*storage); auto input_metadata_snapshot = input_storage.getInMemoryMetadataPtr(); - auto pipe = getSourceFromFromASTInsertQuery( - ast, istr, input_metadata_snapshot->getSampleBlock(), context, input_function); + auto read_buffers = getReadBuffersFromASTInsertQuery(ast); + auto pipe = getSourceFromASTInsertQuery( + ast, input_metadata_snapshot->getSampleBlock(), std::move(read_buffers), context, input_function); input_storage.setPipe(std::move(pipe)); } } @@ -531,6 +533,18 @@ static std::tuple executeQueryImpl( /// reset Input callbacks if query is not INSERT SELECT context->resetInputCallbacks(); + auto * queue = context->getAsynchronousInsertQueue(); + const bool async_insert + = queue && insert_query && !insert_query->select && !insert_query->expectNativeData() && settings.async_insert_mode; + + if (async_insert) + { + /// Shortcut for already processed similar insert-queries. + /// Similarity is defined by hashing query text and some settings. + queue->push(ast, settings); + return std::make_tuple(ast, BlockIO()); + } + auto interpreter = InterpreterFactory::get(ast, context, SelectQueryOptions(stage).setInternal(internal)); std::shared_ptr quota; @@ -559,17 +573,6 @@ static std::tuple executeQueryImpl( limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode); } - auto * queue = context->getAsynchronousInsertQueue(); - const bool async_insert - = queue && insert_query && !insert_query->select && !insert_query->expectNativeData() && settings.async_insert_mode; - - if (async_insert && queue->push(insert_query, settings)) - { - /// Shortcut for already processed similar insert-queries. - /// Similarity is defined by hashing query text and some settings. - return std::make_tuple(ast, BlockIO()); - } - { OpenTelemetrySpanHolder span("IInterpreter::execute()"); res = interpreter->execute(); @@ -897,12 +900,6 @@ static std::tuple executeQueryImpl( res.finish_callback = std::move(finish_callback); res.exception_callback = std::move(exception_callback); } - - if (async_insert) - { - queue->push(insert_query, std::move(res), settings); - return std::make_tuple(ast, BlockIO()); - } } catch (...) { @@ -1008,7 +1005,8 @@ void executeQuery( { if (streams.out) { - auto pipe = getSourceFromFromASTInsertQuery(ast, &istr, streams.out->getHeader(), context, nullptr); + auto read_buffers = getReadBuffersFromASTInsertQuery(ast); + auto pipe = getSourceFromASTInsertQuery(ast, streams.out->getHeader(), std::move(read_buffers), context); pipeline.init(std::move(pipe)); pipeline.resize(1); diff --git a/src/Parsers/ASTInsertQuery.h b/src/Parsers/ASTInsertQuery.h index f52e60618e6..804dbe16d1a 100644 --- a/src/Parsers/ASTInsertQuery.h +++ b/src/Parsers/ASTInsertQuery.h @@ -29,10 +29,7 @@ public: /// Data from buffer to insert after inlined one - may be nullptr. ReadBuffer * tail = nullptr; - bool expectNativeData() const - { - return !data && !tail; - } + bool expectNativeData() const { return !data && !tail; } /// Try to find table function input() in SELECT part void tryFindInputFunction(ASTPtr & input_function) const; diff --git a/src/Processors/Transforms/getSourceFromFromASTInsertQuery.cpp b/src/Processors/Transforms/getSourceFromASTInsertQuery.cpp similarity index 53% rename from src/Processors/Transforms/getSourceFromFromASTInsertQuery.cpp rename to src/Processors/Transforms/getSourceFromASTInsertQuery.cpp index 262ba3df2a6..8b16da4994d 100644 --- a/src/Processors/Transforms/getSourceFromFromASTInsertQuery.cpp +++ b/src/Processors/Transforms/getSourceFromASTInsertQuery.cpp @@ -5,7 +5,7 @@ #include #include #include -#include +#include #include #include #include @@ -22,18 +22,19 @@ namespace ErrorCodes extern const int INVALID_USAGE_OF_INPUT; } - -Pipe getSourceFromFromASTInsertQuery( +Pipe getSourceFromASTInsertQuery( const ASTPtr & ast, - ReadBuffer * input_buffer_tail_part, const Block & header, + ReadBuffers read_buffers, ContextPtr context, const ASTPtr & input_function) { const auto * ast_insert_query = ast->as(); - if (!ast_insert_query) - throw Exception("Logical error: query requires data to insert, but it is not INSERT query", ErrorCodes::LOGICAL_ERROR); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: query requires data to insert, but it is not INSERT query"); + + if (read_buffers.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Required at least one read buffer to create source from ASTInsertQuery"); String format = ast_insert_query->format; if (format.empty()) @@ -43,25 +44,14 @@ Pipe getSourceFromFromASTInsertQuery( format = "Values"; } - /// Data could be in parsed (ast_insert_query.data) and in not parsed yet (input_buffer_tail_part) part of query. + auto input_buffer = std::make_unique(std::move(read_buffers)); + auto source = FormatFactory::instance().getInput( + format, *input_buffer, header, + context, context->getSettings().max_insert_block_size); - auto input_buffer_ast_part = std::make_unique( - ast_insert_query->data, ast_insert_query->data ? ast_insert_query->end - ast_insert_query->data : 0); + source->addBuffer(std::move(input_buffer)); - auto input_buffer_contacenated = std::make_unique(); - if (ast_insert_query->data) - input_buffer_contacenated->appendBuffer(std::move(input_buffer_ast_part)); - - if (input_buffer_tail_part) - input_buffer_contacenated->appendBuffer(wrapReadBufferReference(*input_buffer_tail_part)); - - /** NOTE: Must not read from 'input_buffer_tail_part' before read all between 'ast_insert_query.data' and 'ast_insert_query.end'. - * - because 'query.data' could refer to memory piece, used as buffer for 'input_buffer_tail_part'. - */ - - auto source = FormatFactory::instance().getInput(format, *input_buffer_contacenated, header, context, context->getSettings().max_insert_block_size); Pipe pipe(source); - if (context->getSettingsRef().input_format_defaults_for_omitted_fields && ast_insert_query->table_id && !input_function) { StoragePtr storage = DatabaseCatalog::instance().getTable(ast_insert_query->table_id, context); @@ -76,10 +66,29 @@ Pipe getSourceFromFromASTInsertQuery( } } - source->addBuffer(std::move(input_buffer_ast_part)); - source->addBuffer(std::move(input_buffer_contacenated)); - return pipe; } +ReadBuffers getReadBuffersFromASTInsertQuery(const ASTPtr & ast) +{ + const auto * insert_query = ast->as(); + if (!insert_query) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: query requires data to insert, but it is not INSERT query"); + + ReadBuffers buffers; + if (insert_query->data) + { + /// Data could be in parsed (ast_insert_query.data) and in not parsed yet (input_buffer_tail_part) part of query. + auto ast_buffer = std::make_unique( + insert_query->data, insert_query->end - insert_query->data); + + buffers.emplace_back(std::move(ast_buffer)); + } + + if (insert_query->tail) + buffers.emplace_back(wrapReadBufferReference(*insert_query->tail)); + + return buffers; +} + } diff --git a/src/Processors/Transforms/getSourceFromFromASTInsertQuery.h b/src/Processors/Transforms/getSourceFromASTInsertQuery.h similarity index 59% rename from src/Processors/Transforms/getSourceFromFromASTInsertQuery.h rename to src/Processors/Transforms/getSourceFromASTInsertQuery.h index 3c00bd47ea0..d48bd57cf50 100644 --- a/src/Processors/Transforms/getSourceFromFromASTInsertQuery.h +++ b/src/Processors/Transforms/getSourceFromASTInsertQuery.h @@ -5,22 +5,24 @@ #include #include - namespace DB { +class ReadBuffer; +class ASTInsertQuery; +using ReadBuffers = std::vector>; + /** Prepares a pipe which produce data containing in INSERT query * Head of inserting data could be stored in INSERT ast directly * Remaining (tail) data could be stored in input_buffer_tail_part */ - -class Pipe; - -Pipe getSourceFromFromASTInsertQuery( +Pipe getSourceFromASTInsertQuery( const ASTPtr & ast, - ReadBuffer * input_buffer_tail_part, const Block & header, + ReadBuffers read_buffers, ContextPtr context, - const ASTPtr & input_function); + const ASTPtr & input_function = nullptr); + +ReadBuffers getReadBuffersFromASTInsertQuery(const ASTPtr & ast); } diff --git a/src/Processors/ya.make b/src/Processors/ya.make index 4b95484a828..38ec0adaaba 100644 --- a/src/Processors/ya.make +++ b/src/Processors/ya.make @@ -178,7 +178,7 @@ SRCS( Transforms/SortingTransform.cpp Transforms/TotalsHavingTransform.cpp Transforms/WindowTransform.cpp - Transforms/getSourceFromFromASTInsertQuery.cpp + Transforms/getSourceFromASTInsertQuery.cpp printPipeline.cpp )