allow falling back from async insert in case of a large amount of data

Anton Popov 2023-02-22 21:52:49 +00:00
parent 49330b373c
commit d5864fa88e
9 changed files with 212 additions and 40 deletions

View File

@@ -13,6 +13,7 @@
#include <IO/ConcatReadBuffer.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/ReadBufferFromString.h>
#include <IO/LimitReadBuffer.h>
#include <IO/copyData.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/queryToString.h>
@@ -181,7 +182,8 @@ void AsynchronousInsertQueue::scheduleDataProcessingJob(const InsertQuery & key,
});
}
std::future<void> AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_context)
AsynchronousInsertQueue::PushResult
AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_context)
{
query = query->clone();
const auto & settings = query_context->getSettingsRef();
@@ -201,9 +203,32 @@ std::future<void> AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_c
String bytes;
{
/// Read at most 'async_insert_max_data_size' bytes of data.
/// If the limit is exceeded, we will fall back to synchronous insert
/// to avoid buffering a huge amount of data in memory.
auto read_buf = getReadBufferFromASTInsertQuery(query);
LimitReadBuffer limit_buf(*read_buf, settings.async_insert_max_data_size, false);
WriteBufferFromString write_buf(bytes);
copyData(*read_buf, write_buf);
copyData(limit_buf, write_buf);
if (!read_buf->eof())
{
write_buf.finalize();
/// Concatenate a buffer over the data already extracted from the
/// insert query with the rest of the data from the insert query.
std::vector<std::unique_ptr<ReadBuffer>> buffers;
buffers.emplace_back(std::make_unique<ReadBufferFromOwnString>(bytes));
buffers.emplace_back(std::move(read_buf));
return PushResult
{
.status = PushResult::TOO_MUCH_DATA,
.insert_data_buffer = std::make_unique<ConcatReadBuffer>(std::move(buffers)),
};
}
}
if (auto quota = query_context->getQuota())
@@ -263,7 +288,11 @@ std::future<void> AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_c
else
shard.are_tasks_available.notify_one();
return insert_future;
return PushResult
{
.status = PushResult::OK,
.future = std::move(insert_future),
};
}
void AsynchronousInsertQueue::processBatchDeadlines(size_t shard_num)
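
This is the core of the fallback: data is copied through a LimitReadBuffer, and if the source still has bytes afterwards, the already-copied prefix is stitched back in front of the unread tail with ConcatReadBuffer and handed back to the caller as TOO_MUCH_DATA. A minimal standalone sketch of the same pattern with plain C++ streams (pushWithLimit and this trimmed-down PushResult are illustrative names, not the ClickHouse API):

```cpp
// Sketch of the limit-then-fallback pattern using plain C++ streams
// instead of ClickHouse's ReadBuffer classes.
#include <cassert>
#include <istream>
#include <iterator>
#include <sstream>
#include <string>
#include <utility>

struct PushResult
{
    enum Status { OK, TOO_MUCH_DATA } status;
    std::string inlined_data; // complete payload when status == OK
    std::string prefix;       // bytes consumed before hitting the limit otherwise
};

// Read at most `limit` bytes. If the stream still has data, report
// TOO_MUCH_DATA and hand back the consumed prefix so the caller can
// replay it in front of the unread tail (the role ConcatReadBuffer plays).
PushResult pushWithLimit(std::istream & input, size_t limit)
{
    std::string bytes(limit, '\0');
    input.read(bytes.data(), static_cast<std::streamsize>(limit));
    bytes.resize(static_cast<size_t>(input.gcount()));

    if (input.peek() == std::char_traits<char>::eof())
        return {PushResult::OK, std::move(bytes), {}};
    return {PushResult::TOO_MUCH_DATA, {}, std::move(bytes)};
}

int main()
{
    std::istringstream small("(1) (2)");
    assert(pushWithLimit(small, 1024).status == PushResult::OK);

    std::istringstream big("(1) (2) (3) (4) (5)");
    auto res = pushWithLimit(big, 5);
    assert(res.status == PushResult::TOO_MUCH_DATA);

    // The synchronous fallback sees prefix + tail, i.e. the full payload.
    std::string full = std::move(res.prefix);
    full.append(std::istreambuf_iterator<char>(big), std::istreambuf_iterator<char>());
    assert(full == "(1) (2) (3) (4) (5)");
}
```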

View File

@@ -19,7 +19,25 @@ public:
AsynchronousInsertQueue(ContextPtr context_, size_t pool_size_);
~AsynchronousInsertQueue();
std::future<void> push(ASTPtr query, ContextPtr query_context);
struct PushResult
{
enum Status
{
OK,
TOO_MUCH_DATA,
};
Status status;
/// Future that allows waiting until the query is flushed.
std::future<void> future;
/// Read buffer that contains the data extracted
/// from the query, set in case of too much data.
std::unique_ptr<ReadBuffer> insert_data_buffer;
};
PushResult push(ASTPtr query, ContextPtr query_context);
size_t getPoolSize() const { return pool_size; }
private:

View File

@@ -540,7 +540,11 @@ BlockIO InterpreterInsertQuery::execute()
if (query.hasInlinedData() && !async_insert)
{
/// can execute without additional data
auto pipe = getSourceFromASTInsertQuery(query_ptr, true, query_sample_block, getContext(), nullptr);
auto format = getInputFormatFromASTInsertQuery(query_ptr, true, query_sample_block, getContext(), nullptr);
for (auto && buffer : owned_buffers)
format->addBuffer(std::move(buffer));
auto pipe = getSourceFromInputFormat(query_ptr, std::move(format), getContext(), nullptr);
res.pipeline.complete(std::move(pipe));
}
}

View File

@@ -52,6 +52,8 @@ public:
bool supportsTransactions() const override { return true; }
void addBuffer(std::unique_ptr<ReadBuffer> buffer) { owned_buffers.push_back(std::move(buffer)); }
private:
Block getSampleBlock(const Names & names, const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot) const;
@@ -61,6 +63,8 @@ private:
const bool no_destination;
const bool async_insert;
std::vector<std::unique_ptr<ReadBuffer>> owned_buffers;
Chain buildChainImpl(
const StoragePtr & table,
const StorageMetadataPtr & metadata_snapshot,

View File

@@ -590,6 +590,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
bool async_insert = false;
auto * queue = context->getAsynchronousInsertQueue();
auto * logger = &Poco::Logger::get("executeQuery");
if (insert_query && settings.async_insert)
{
@@ -605,41 +606,62 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
async_insert = true;
if (!async_insert)
{
LOG_DEBUG(&Poco::Logger::get("executeQuery"),
"Setting async_insert=1, but INSERT query will be executed synchronously (reason: {})", reason);
}
LOG_DEBUG(logger, "Setting async_insert=1, but INSERT query will be executed synchronously (reason: {})", reason);
}
bool quota_checked = false;
std::unique_ptr<ReadBuffer> insert_data_buffer_holder;
if (async_insert)
{
if (context->getCurrentTransaction() && settings.throw_on_unsupported_query_inside_transaction)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Async inserts inside transactions are not supported");
if (settings.implicit_transaction && settings.throw_on_unsupported_query_inside_transaction)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Async inserts with 'implicit_transaction' are not supported");
quota = context->getQuota();
if (quota)
{
quota_checked = true;
quota->used(QuotaType::QUERY_INSERTS, 1);
quota->used(QuotaType::QUERIES, 1);
quota->checkExceeded(QuotaType::ERRORS);
}
auto insert_future = queue->push(ast, context);
auto result = queue->push(ast, context);
if (settings.wait_for_async_insert)
if (result.status == AsynchronousInsertQueue::PushResult::OK)
{
auto timeout = settings.wait_for_async_insert_timeout.totalMilliseconds();
auto source = std::make_shared<WaitForAsyncInsertSource>(std::move(insert_future), timeout);
res.pipeline = QueryPipeline(Pipe(std::move(source)));
if (settings.wait_for_async_insert)
{
auto timeout = settings.wait_for_async_insert_timeout.totalMilliseconds();
auto source = std::make_shared<WaitForAsyncInsertSource>(std::move(result.future), timeout);
res.pipeline = QueryPipeline(Pipe(std::move(source)));
}
const auto & table_id = insert_query->table_id;
if (!table_id.empty())
context->setInsertionTable(table_id);
}
else if (result.status == AsynchronousInsertQueue::PushResult::TOO_MUCH_DATA)
{
async_insert = false;
insert_data_buffer_holder = std::move(result.insert_data_buffer);
const auto & table_id = insert_query->table_id;
if (!table_id.empty())
context->setInsertionTable(table_id);
if (insert_query->data)
{
/// Reset the inlined data because it will be
/// available from the tail read buffer.
insert_query->end = insert_query->data;
insert_query->data = nullptr;
}
if (context->getCurrentTransaction() && settings.throw_on_unsupported_query_inside_transaction)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Async inserts inside transactions are not supported");
if (settings.implicit_transaction && settings.throw_on_unsupported_query_inside_transaction)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Async inserts with 'implicit_transaction' are not supported");
insert_query->tail = insert_data_buffer_holder.get();
LOG_DEBUG(logger, "Setting async_insert=1, but INSERT query will be executed synchronously because it has too much data");
}
}
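
In the OK branch above, wait_for_async_insert makes the query pipeline block on the returned future until the buffered data is actually flushed, bounded by wait_for_async_insert_timeout. The underlying pattern is a timed wait on std::future<void>; a minimal sketch (waitForFlush is an illustrative helper, not the actual WaitForAsyncInsertSource code):

```cpp
#include <chrono>
#include <future>
#include <stdexcept>

void waitForFlush(std::future<void> flushed, std::chrono::milliseconds timeout)
{
    if (flushed.wait_for(timeout) != std::future_status::ready)
        throw std::runtime_error("Timeout exceeded while waiting for async insert flush");
    flushed.get(); // rethrows the exception if the background flush failed
}
```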
else
if (!async_insert)
{
/// We need to start the (implicit) transaction before getting the interpreter as this will get links to the latest snapshots
if (!context->getCurrentTransaction() && settings.implicit_transaction && !ast->as<ASTTransactionControl>())
@@ -671,7 +693,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
context->getSettingsRef().throw_on_unsupported_query_inside_transaction)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Transactions are not supported for this type of query ({})", ast->getID());
if (!interpreter->ignoreQuota())
if (!interpreter->ignoreQuota() && !quota_checked)
{
quota = context->getQuota();
if (quota)
@@ -695,12 +717,15 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode);
}
if (const auto * insert_interpreter = typeid_cast<const InterpreterInsertQuery *>(&*interpreter))
if (auto * insert_interpreter = typeid_cast<InterpreterInsertQuery *>(&*interpreter))
{
/// Save insertion table (not table function). TODO: support remote() table function.
auto table_id = insert_interpreter->getDatabaseTable();
if (!table_id.empty())
context->setInsertionTable(std::move(table_id));
if (insert_data_buffer_holder)
insert_interpreter->addBuffer(std::move(insert_data_buffer_holder));
}
{

View File

@@ -64,15 +64,13 @@ InputFormatPtr getInputFormatFromASTInsertQuery(
return source;
}
Pipe getSourceFromASTInsertQuery(
Pipe getSourceFromInputFormat(
const ASTPtr & ast,
bool with_buffers,
const Block & header,
InputFormatPtr format,
ContextPtr context,
const ASTPtr & input_function)
{
auto source = getInputFormatFromASTInsertQuery(ast, with_buffers, header, context, input_function);
Pipe pipe(source);
Pipe pipe(format);
const auto * ast_insert_query = ast->as<ASTInsertQuery>();
if (context->getSettingsRef().input_format_defaults_for_omitted_fields && ast_insert_query->table_id && !input_function)
@@ -84,7 +82,7 @@ Pipe getSourceFromASTInsertQuery(
{
pipe.addSimpleTransform([&](const Block & cur_header)
{
return std::make_shared<AddingDefaultsTransform>(cur_header, columns, *source, context);
return std::make_shared<AddingDefaultsTransform>(cur_header, columns, *format, context);
});
}
}
@@ -92,6 +90,17 @@ Pipe getSourceFromASTInsertQuery(
return pipe;
}
Pipe getSourceFromASTInsertQuery(
const ASTPtr & ast,
bool with_buffers,
const Block & header,
ContextPtr context,
const ASTPtr & input_function)
{
auto format = getInputFormatFromASTInsertQuery(ast, with_buffers, header, context, input_function);
return getSourceFromInputFormat(ast, std::move(format), std::move(context), input_function);
}
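
The refactoring above splits the old getSourceFromASTInsertQuery() into getInputFormatFromASTInsertQuery() plus getSourceFromInputFormat(). The point of the extra seam is lifetime: InterpreterInsertQuery can now call addBuffer() on the format before the pipe is assembled, so the fallback read buffer stays alive for as long as the format reads from it. A simplified sketch of the pattern, with stand-in types rather than the real ClickHouse classes:

```cpp
// Split-creation pattern: build the input format first, let the caller
// attach owned buffers, then wrap it in a pipe.
#include <memory>
#include <utility>
#include <vector>

struct ReadBuffer { /* raw insert data stream */ };

struct InputFormat
{
    // Buffers handed over here stay alive as long as the format reading from them.
    void addBuffer(std::unique_ptr<ReadBuffer> buf) { owned.push_back(std::move(buf)); }
    std::vector<std::unique_ptr<ReadBuffer>> owned;
};

struct Pipe
{
    explicit Pipe(std::shared_ptr<InputFormat> f) : format(std::move(f)) {}
    std::shared_ptr<InputFormat> format;
};

// The old single-step helper offered no hook between creating the format and
// assembling the pipe; with the split, a fallback buffer can be attached in between.
Pipe makeSource(std::unique_ptr<ReadBuffer> fallback_tail)
{
    auto format = std::make_shared<InputFormat>();
    if (fallback_tail)
        format->addBuffer(std::move(fallback_tail));
    return Pipe(std::move(format));
}
```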
std::unique_ptr<ReadBuffer> getReadBufferFromASTInsertQuery(const ASTPtr & ast)
{
const auto * insert_query = ast->as<ASTInsertQuery>();

View File

@@ -14,19 +14,27 @@ class Pipe;
/// Prepares an input format that produces the data contained in the INSERT query.
InputFormatPtr getInputFormatFromASTInsertQuery(
const ASTPtr & ast,
bool with_buffers,
const Block & header,
ContextPtr context,
const ASTPtr & input_function);
/// Prepares a pipe from an input format obtained from an ASTInsertQuery,
/// which produces the data contained in the INSERT query.
Pipe getSourceFromInputFormat(
const ASTPtr & ast,
InputFormatPtr format,
ContextPtr context,
const ASTPtr & input_function);
/// Prepares a pipe that produces the data contained in the INSERT query.
Pipe getSourceFromASTInsertQuery(
const ASTPtr & ast,
bool with_buffers,
const Block & header,
ContextPtr context,
const ASTPtr & input_function);
class ReadBuffer;

View File

@@ -0,0 +1,23 @@
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
id_0
id_1
id_2
id_3
id_4

View File

@@ -0,0 +1,52 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
message="INSERT query will be executed synchronously because it has too much data"
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS t_async_insert_fallback"
$CLICKHOUSE_CLIENT --query "CREATE TABLE t_async_insert_fallback (a UInt64) ENGINE = Memory"
query_id_suffix="${CLICKHOUSE_DATABASE}_${RANDOM}"
# inlined data via native protocol
$CLICKHOUSE_CLIENT \
--query_id "0_$query_id_suffix" \
--async_insert 1 \
--async_insert_max_data_size 5 \
--query "INSERT INTO t_async_insert_fallback VALUES (1) (2) (3)"
# inlined data via http
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&query_id=1_$query_id_suffix&async_insert=1&async_insert_max_data_size=3" \
-d "INSERT INTO t_async_insert_fallback VALUES (4) (5) (6)"
# partially inlined, partially sent via post data
${CLICKHOUSE_CURL} -sS -X POST \
"${CLICKHOUSE_URL}&query_id=2_$query_id_suffix&async_insert=1&async_insert_max_data_size=5&query=INSERT+INTO+t_async_insert_fallback+VALUES+(7)" \
--data-binary @- <<< "(8) (9)"
# partially inlined, partially sent via post data
${CLICKHOUSE_CURL} -sS -X POST \
"${CLICKHOUSE_URL}&query_id=3_$query_id_suffix&async_insert=1&async_insert_max_data_size=5&query=INSERT+INTO+t_async_insert_fallback+VALUES+(10)+(11)" \
--data-binary @- <<< "(12)"
# sent via post data
${CLICKHOUSE_CURL} -sS -X POST \
"${CLICKHOUSE_URL}&query_id=4_$query_id_suffix&async_insert=1&async_insert_max_data_size=5&query=INSERT+INTO+t_async_insert_fallback+FORMAT+Values" \
--data-binary @- <<< "(13) (14) (15)"
# no limit for async insert size
${CLICKHOUSE_CURL} -sS -X POST \
"${CLICKHOUSE_URL}&query_id=5_$query_id_suffix&async_insert=1&query=INSERT+INTO+t_async_insert_fallback+FORMAT+Values" \
--data-binary @- <<< "(16) (17) (18)"
$CLICKHOUSE_CLIENT --query "SELECT * FROM t_async_insert_fallback ORDER BY a"
$CLICKHOUSE_CLIENT --query "SYSTEM FLUSH LOGS"
$CLICKHOUSE_CLIENT --query "
SELECT 'id_' || splitByChar('_', query_id)[1] AS id FROM system.text_log
WHERE query_id LIKE '%$query_id_suffix' AND message LIKE '%$message%'
"
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS t_async_insert_fallback"