diff --git a/src/Interpreters/AsynchronousInsertQueue.cpp b/src/Interpreters/AsynchronousInsertQueue.cpp index 293849b88b7..992705f1c3c 100644 --- a/src/Interpreters/AsynchronousInsertQueue.cpp +++ b/src/Interpreters/AsynchronousInsertQueue.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -181,7 +182,8 @@ void AsynchronousInsertQueue::scheduleDataProcessingJob(const InsertQuery & key, }); } -std::future AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_context) +AsynchronousInsertQueue::PushResult +AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_context) { query = query->clone(); const auto & settings = query_context->getSettingsRef(); @@ -201,9 +203,32 @@ std::future AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_c String bytes; { + /// Read at most 'async_insert_max_data_size' bytes of data. + /// If limit is exceeded we will fallback to synchronous insert + /// to avoid buffering of huge amount of data in memory. + auto read_buf = getReadBufferFromASTInsertQuery(query); + LimitReadBuffer limit_buf(*read_buf, settings.async_insert_max_data_size, false); + WriteBufferFromString write_buf(bytes); - copyData(*read_buf, write_buf); + copyData(limit_buf, write_buf); + + if (!read_buf->eof()) + { + write_buf.finalize(); + + /// Concat read buffer with already extracted from insert + /// query data and with the rest data from insert query. + std::vector> buffers; + buffers.emplace_back(std::make_unique(bytes)); + buffers.emplace_back(std::move(read_buf)); + + return PushResult + { + .status = PushResult::TOO_MUCH_DATA, + .insert_data_buffer = std::make_unique(std::move(buffers)), + }; + } } if (auto quota = query_context->getQuota()) @@ -263,7 +288,11 @@ std::future AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_c else shard.are_tasks_available.notify_one(); - return insert_future; + return PushResult + { + .status = PushResult::OK, + .future = std::move(insert_future), + }; } void AsynchronousInsertQueue::processBatchDeadlines(size_t shard_num) diff --git a/src/Interpreters/AsynchronousInsertQueue.h b/src/Interpreters/AsynchronousInsertQueue.h index ee1265673a6..23a2860364d 100644 --- a/src/Interpreters/AsynchronousInsertQueue.h +++ b/src/Interpreters/AsynchronousInsertQueue.h @@ -19,7 +19,25 @@ public: AsynchronousInsertQueue(ContextPtr context_, size_t pool_size_); ~AsynchronousInsertQueue(); - std::future push(ASTPtr query, ContextPtr query_context); + struct PushResult + { + enum Status + { + OK, + TOO_MUCH_DATA, + }; + + Status status; + + /// Future that allows to wait until the query is flushed. + std::future future; + + /// Read buffer that contains extracted + /// from query data in case of too much data. + std::unique_ptr insert_data_buffer; + }; + + PushResult push(ASTPtr query, ContextPtr query_context); size_t getPoolSize() const { return pool_size; } private: diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp index 2569e6ddc33..b4a19ea7403 100644 --- a/src/Interpreters/InterpreterInsertQuery.cpp +++ b/src/Interpreters/InterpreterInsertQuery.cpp @@ -540,7 +540,11 @@ BlockIO InterpreterInsertQuery::execute() if (query.hasInlinedData() && !async_insert) { /// can execute without additional data - auto pipe = getSourceFromASTInsertQuery(query_ptr, true, query_sample_block, getContext(), nullptr); + auto format = getInputFormatFromASTInsertQuery(query_ptr, true, query_sample_block, getContext(), nullptr); + for (auto && buffer : owned_buffers) + format->addBuffer(std::move(buffer)); + + auto pipe = getSourceFromInputFormat(query_ptr, std::move(format), getContext(), nullptr); res.pipeline.complete(std::move(pipe)); } } diff --git a/src/Interpreters/InterpreterInsertQuery.h b/src/Interpreters/InterpreterInsertQuery.h index 9b3f617e4b3..bb6509a9102 100644 --- a/src/Interpreters/InterpreterInsertQuery.h +++ b/src/Interpreters/InterpreterInsertQuery.h @@ -52,6 +52,8 @@ public: bool supportsTransactions() const override { return true; } + void addBuffer(std::unique_ptr buffer) { owned_buffers.push_back(std::move(buffer)); } + private: Block getSampleBlock(const Names & names, const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot) const; @@ -61,6 +63,8 @@ private: const bool no_destination; const bool async_insert; + std::vector> owned_buffers; + Chain buildChainImpl( const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot, diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index ca7544df4b9..c233060e646 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -590,6 +590,7 @@ static std::tuple executeQueryImpl( bool async_insert = false; auto * queue = context->getAsynchronousInsertQueue(); + auto * logger = &Poco::Logger::get("executeQuery"); if (insert_query && settings.async_insert) { @@ -605,41 +606,62 @@ static std::tuple executeQueryImpl( async_insert = true; if (!async_insert) - { - LOG_DEBUG(&Poco::Logger::get("executeQuery"), - "Setting async_insert=1, but INSERT query will be executed synchronously (reason: {})", reason); - } + LOG_DEBUG(logger, "Setting async_insert=1, but INSERT query will be executed synchronously (reason: {})", reason); } + bool quota_checked = false; + std::unique_ptr insert_data_buffer_holder; + if (async_insert) { + if (context->getCurrentTransaction() && settings.throw_on_unsupported_query_inside_transaction) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Async inserts inside transactions are not supported"); + if (settings.implicit_transaction && settings.throw_on_unsupported_query_inside_transaction) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Async inserts with 'implicit_transaction' are not supported"); + quota = context->getQuota(); if (quota) { + quota_checked = true; quota->used(QuotaType::QUERY_INSERTS, 1); quota->used(QuotaType::QUERIES, 1); quota->checkExceeded(QuotaType::ERRORS); } - auto insert_future = queue->push(ast, context); + auto result = queue->push(ast, context); - if (settings.wait_for_async_insert) + if (result.status == AsynchronousInsertQueue::PushResult::OK) { - auto timeout = settings.wait_for_async_insert_timeout.totalMilliseconds(); - auto source = std::make_shared(std::move(insert_future), timeout); - res.pipeline = QueryPipeline(Pipe(std::move(source))); + if (settings.wait_for_async_insert) + { + auto timeout = settings.wait_for_async_insert_timeout.totalMilliseconds(); + auto source = std::make_shared(std::move(result.future), timeout); + res.pipeline = QueryPipeline(Pipe(std::move(source))); + } + + const auto & table_id = insert_query->table_id; + if (!table_id.empty()) + context->setInsertionTable(table_id); } + else if (result.status == AsynchronousInsertQueue::PushResult::TOO_MUCH_DATA) + { + async_insert = false; + insert_data_buffer_holder = std::move(result.insert_data_buffer); - const auto & table_id = insert_query->table_id; - if (!table_id.empty()) - context->setInsertionTable(table_id); + if (insert_query->data) + { + /// Reset inlined data because it will be + /// available from tail read buffer. + insert_query->end = insert_query->data; + insert_query->data = nullptr; + } - if (context->getCurrentTransaction() && settings.throw_on_unsupported_query_inside_transaction) - throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Async inserts inside transactions are not supported"); - if (settings.implicit_transaction && settings.throw_on_unsupported_query_inside_transaction) - throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Async inserts with 'implicit_transaction' are not supported"); + insert_query->tail = insert_data_buffer_holder.get(); + LOG_DEBUG(logger, "Setting async_insert=1, but INSERT query will be executed synchronously because it has too much data"); + } } - else + + if (!async_insert) { /// We need to start the (implicit) transaction before getting the interpreter as this will get links to the latest snapshots if (!context->getCurrentTransaction() && settings.implicit_transaction && !ast->as()) @@ -671,7 +693,7 @@ static std::tuple executeQueryImpl( context->getSettingsRef().throw_on_unsupported_query_inside_transaction) throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Transactions are not supported for this type of query ({})", ast->getID()); - if (!interpreter->ignoreQuota()) + if (!interpreter->ignoreQuota() && !quota_checked) { quota = context->getQuota(); if (quota) @@ -695,12 +717,15 @@ static std::tuple executeQueryImpl( limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode); } - if (const auto * insert_interpreter = typeid_cast(&*interpreter)) + if (auto * insert_interpreter = typeid_cast(&*interpreter)) { /// Save insertion table (not table function). TODO: support remote() table function. auto table_id = insert_interpreter->getDatabaseTable(); if (!table_id.empty()) context->setInsertionTable(std::move(table_id)); + + if (insert_data_buffer_holder) + insert_interpreter->addBuffer(std::move(insert_data_buffer_holder)); } { diff --git a/src/Processors/Transforms/getSourceFromASTInsertQuery.cpp b/src/Processors/Transforms/getSourceFromASTInsertQuery.cpp index ab7cfca3de2..6c7c7447070 100644 --- a/src/Processors/Transforms/getSourceFromASTInsertQuery.cpp +++ b/src/Processors/Transforms/getSourceFromASTInsertQuery.cpp @@ -64,15 +64,13 @@ InputFormatPtr getInputFormatFromASTInsertQuery( return source; } -Pipe getSourceFromASTInsertQuery( +Pipe getSourceFromInputFormat( const ASTPtr & ast, - bool with_buffers, - const Block & header, + InputFormatPtr format, ContextPtr context, const ASTPtr & input_function) { - auto source = getInputFormatFromASTInsertQuery(ast, with_buffers, header, context, input_function); - Pipe pipe(source); + Pipe pipe(format); const auto * ast_insert_query = ast->as(); if (context->getSettingsRef().input_format_defaults_for_omitted_fields && ast_insert_query->table_id && !input_function) @@ -84,7 +82,7 @@ Pipe getSourceFromASTInsertQuery( { pipe.addSimpleTransform([&](const Block & cur_header) { - return std::make_shared(cur_header, columns, *source, context); + return std::make_shared(cur_header, columns, *format, context); }); } } @@ -92,6 +90,17 @@ Pipe getSourceFromASTInsertQuery( return pipe; } +Pipe getSourceFromASTInsertQuery( + const ASTPtr & ast, + bool with_buffers, + const Block & header, + ContextPtr context, + const ASTPtr & input_function) +{ + auto format = getInputFormatFromASTInsertQuery(ast, with_buffers, header, context, input_function); + return getSourceFromInputFormat(ast, std::move(format), std::move(context), input_function); +} + std::unique_ptr getReadBufferFromASTInsertQuery(const ASTPtr & ast) { const auto * insert_query = ast->as(); diff --git a/src/Processors/Transforms/getSourceFromASTInsertQuery.h b/src/Processors/Transforms/getSourceFromASTInsertQuery.h index 4a5ed952efc..dc541873972 100644 --- a/src/Processors/Transforms/getSourceFromASTInsertQuery.h +++ b/src/Processors/Transforms/getSourceFromASTInsertQuery.h @@ -14,19 +14,27 @@ class Pipe; /// Prepares a input format, which produce data containing in INSERT query. InputFormatPtr getInputFormatFromASTInsertQuery( - const ASTPtr & ast, - bool with_buffers, - const Block & header, - ContextPtr context, - const ASTPtr & input_function); + const ASTPtr & ast, + bool with_buffers, + const Block & header, + ContextPtr context, + const ASTPtr & input_function); + +/// Prepares a pipe from input format got from ASTInsertQuery, +/// which produce data containing in INSERT query. +Pipe getSourceFromInputFormat( + const ASTPtr & ast, + InputFormatPtr format, + ContextPtr context, + const ASTPtr & input_function); /// Prepares a pipe which produce data containing in INSERT query. Pipe getSourceFromASTInsertQuery( - const ASTPtr & ast, - bool with_buffers, - const Block & header, - ContextPtr context, - const ASTPtr & input_function); + const ASTPtr & ast, + bool with_buffers, + const Block & header, + ContextPtr context, + const ASTPtr & input_function); class ReadBuffer; diff --git a/tests/queries/0_stateless/02668_fallback_from_async_insert.reference b/tests/queries/0_stateless/02668_fallback_from_async_insert.reference new file mode 100644 index 00000000000..7aa58724b9e --- /dev/null +++ b/tests/queries/0_stateless/02668_fallback_from_async_insert.reference @@ -0,0 +1,23 @@ +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +11 +12 +13 +14 +15 +16 +17 +18 +id_0 +id_1 +id_2 +id_3 +id_4 diff --git a/tests/queries/0_stateless/02668_fallback_from_async_insert.sh b/tests/queries/0_stateless/02668_fallback_from_async_insert.sh new file mode 100755 index 00000000000..9c158d6241b --- /dev/null +++ b/tests/queries/0_stateless/02668_fallback_from_async_insert.sh @@ -0,0 +1,52 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +message="INSERT query will be executed synchronously because it has too much data" + +$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS t_async_insert_fallback" +$CLICKHOUSE_CLIENT --query "CREATE TABLE t_async_insert_fallback (a UInt64) ENGINE = Memory" + +query_id_suffix="${CLICKHOUSE_DATABASE}_${RANDOM}" + +# inlined data via native protocol +$CLICKHOUSE_CLIENT \ + --query_id "0_$query_id_suffix" \ + --async_insert 1 \ + --async_insert_max_data_size 5 \ + --query "INSERT INTO t_async_insert_fallback VALUES (1) (2) (3)" + +# inlined data via http +${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&query_id=1_$query_id_suffix&async_insert=1&async_insert_max_data_size=3" \ + -d "INSERT INTO t_async_insert_fallback VALUES (4) (5) (6)" + +# partially inlined partially sent via post data +${CLICKHOUSE_CURL} -sS -X POST \ + "${CLICKHOUSE_URL}&query_id=2_$query_id_suffix&async_insert=1&async_insert_max_data_size=5&query=INSERT+INTO+t_async_insert_fallback+VALUES+(7)" \ + --data-binary @- <<< "(8) (9)" + +# partially inlined partially sent via post data +${CLICKHOUSE_CURL} -sS -X POST \ + "${CLICKHOUSE_URL}&query_id=3_$query_id_suffix&async_insert=1&async_insert_max_data_size=5&query=INSERT+INTO+t_async_insert_fallback+VALUES+(10)+(11)" \ + --data-binary @- <<< "(12)" + +# sent via post data +${CLICKHOUSE_CURL} -sS -X POST \ + "${CLICKHOUSE_URL}&query_id=4_$query_id_suffix&async_insert=1&async_insert_max_data_size=5&query=INSERT+INTO+t_async_insert_fallback+FORMAT+Values" \ + --data-binary @- <<< "(13) (14) (15)" + +# no limit for async insert size +${CLICKHOUSE_CURL} -sS -X POST \ + "${CLICKHOUSE_URL}&query_id=5_$query_id_suffix&async_insert=1&query=INSERT+INTO+t_async_insert_fallback+FORMAT+Values" \ + --data-binary @- <<< "(16) (17) (18)" + +$CLICKHOUSE_CLIENT --query "SELECT * FROM t_async_insert_fallback ORDER BY a" +$CLICKHOUSE_CLIENT --query "SYSTEM FLUSH LOGS" +$CLICKHOUSE_CLIENT --query " + SELECT 'id_' || splitByChar('_', query_id)[1] AS id FROM system.text_log + WHERE query_id LIKE '%$query_id_suffix' AND message LIKE '%$message%' +" + +$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS t_async_insert_fallback"