Merge pull request #68741 from CurtizJ/fix-async-insert-query-params

Support query parameters in async inserts
This commit is contained in:
Anton Popov 2024-08-27 16:10:59 +00:00 committed by GitHub
commit f7ed0912db
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 98 additions and 0 deletions

View File

@ -389,6 +389,10 @@ AsynchronousInsertQueue::pushDataChunk(ASTPtr query, DataChunk chunk, ContextPtr
if (data_kind == DataKind::Preprocessed)
insert_query.format = "Native";
/// Query parameters make sense only for format Values.
if (insert_query.format == "Values")
entry->query_parameters = query_context->getQueryParameters();
InsertQuery key{query, query_context->getUserID(), query_context->getCurrentRoles(), settings, data_kind};
InsertDataPtr data_to_process;
std::future<void> insert_future;
@ -999,6 +1003,7 @@ Chunk AsynchronousInsertQueue::processEntriesWithParsing(
"Expected entry with data kind Parsed. Got: {}", entry->chunk.getDataKind());
auto buffer = std::make_unique<ReadBufferFromString>(*bytes);
executor.setQueryParameters(entry->query_parameters);
size_t num_bytes = bytes->size();
size_t num_rows = executor.execute(*buffer);

View File

@ -147,6 +147,7 @@ private:
const String format;
MemoryTracker * const user_memory_tracker;
const std::chrono::time_point<std::chrono::system_clock> create_time;
NameToNameMap query_parameters;
Entry(
DataChunk && chunk_,

View File

@ -1,5 +1,6 @@
#include <Processors/Executors/StreamingFormatExecutor.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Processors/Formats/Impl/ValuesBlockInputFormat.h>
namespace DB
{
@ -32,6 +33,13 @@ MutableColumns StreamingFormatExecutor::getResultColumns()
return ret_columns;
}
void StreamingFormatExecutor::setQueryParameters(const NameToNameMap & parameters)
{
    /// Forwards the parameters to the underlying input format.
    /// Only the Values format makes use of query parameters, so every other format is left as is.
    auto * values_input = typeid_cast<ValuesBlockInputFormat *>(format.get());
    if (!values_input)
        return;

    values_input->setQueryParameters(parameters);
}
size_t StreamingFormatExecutor::execute(ReadBuffer & buffer)
{
format->setReadBuffer(buffer);

View File

@ -39,6 +39,9 @@ public:
/// Releases currently accumulated columns.
MutableColumns getResultColumns();
/// Sets query parameters for input format if applicable.
void setQueryParameters(const NameToNameMap & parameters);
private:
const Block header;
const InputFormatPtr format;

View File

@ -663,6 +663,16 @@ void ValuesBlockInputFormat::resetReadBuffer()
IInputFormat::resetReadBuffer();
}
void ValuesBlockInputFormat::setQueryParameters(const NameToNameMap & parameters)
{
    /// Nothing to do when the current context already carries exactly these parameters.
    const bool already_applied = (parameters == context->getQueryParameters());

    if (!already_applied)
    {
        /// The parameters are set on a fresh copy of the context, which then replaces the stored one.
        auto updated_context = Context::createCopy(context);
        updated_context->setQueryParameters(parameters);
        context = std::move(updated_context);
    }
}
ValuesSchemaReader::ValuesSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_)
: IRowSchemaReader(buf, format_settings_), buf(in_)
{

View File

@ -38,6 +38,7 @@ public:
/// TODO: remove context somehow.
void setContext(const ContextPtr & context_) { context = Context::createCopy(context_); }
void setQueryParameters(const NameToNameMap & parameters);
const BlockMissingValues & getMissingValues() const override { return block_missing_values; }

View File

@ -0,0 +1,13 @@
11
12
13
14
15
16
17
18
19
20
21
22
23

View File

@ -0,0 +1,36 @@
#!/usr/bin/env bash
# Tags: no-parallel
# Functional test: query parameters ({name:Type} placeholders) used in INSERT ... VALUES
# together with asynchronous inserts. Parameters are supplied in three ways: SET param_*,
# --param_* client options and param_* URL arguments. Each statement inserts one id, 11 through 23.
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# Start from a clean table.
$CLICKHOUSE_CLIENT -q "
DROP TABLE IF EXISTS t_async_insert_params;
CREATE TABLE t_async_insert_params (id UInt64) ENGINE = Memory;
"
# Settings shared by all client inserts: async inserts on, no waiting for the insert to complete,
# fixed (non-adaptive) busy timeout of 300000 ms -- presumably so that entries stay queued until
# the explicit flush at the end of the script.
cmd_params="--async_insert 1 --async_insert_busy_timeout_max_ms 300000 --async_insert_busy_timeout_min_ms 300000 --wait_for_async_insert 0 --async_insert_use_adaptive_busy_timeout 0"
# Parameters defined with SET in the same multi-statement query.
# The last two inserts define a parameter but do not reference it (plain Values and JSONEachRow).
$CLICKHOUSE_CLIENT $cmd_params -q "SET param_p1 = 11; INSERT INTO t_async_insert_params VALUES ({p1:UInt64});"
$CLICKHOUSE_CLIENT $cmd_params -q "SET param_p2 = 12; INSERT INTO t_async_insert_params VALUES ({p2:UInt64});"
$CLICKHOUSE_CLIENT $cmd_params -q "SET param_p2 = 1000; INSERT INTO t_async_insert_params VALUES (13);"
$CLICKHOUSE_CLIENT $cmd_params -q 'SET param_p2 = 1000; INSERT INTO t_async_insert_params FORMAT JSONEachRow {"id": 14};'
# Same four cases with parameters passed as --param_* command line options.
$CLICKHOUSE_CLIENT $cmd_params --param_p1 15 -q "INSERT INTO t_async_insert_params VALUES ({p1:UInt64});"
$CLICKHOUSE_CLIENT $cmd_params --param_p2 16 -q "INSERT INTO t_async_insert_params VALUES ({p2:UInt64});"
$CLICKHOUSE_CLIENT $cmd_params --param_p2 1000 -q "INSERT INTO t_async_insert_params VALUES (17);"
$CLICKHOUSE_CLIENT $cmd_params --param_p2 1000 -q 'INSERT INTO t_async_insert_params FORMAT JSONEachRow {"id": 18};'
# Same settings over HTTP, with parameters passed as param_* URL arguments.
url="${CLICKHOUSE_URL}&async_insert=1&async_insert_busy_timeout_max_ms=300000&async_insert_busy_timeout_min_ms=300000&wait_for_async_insert=0&async_insert_use_adaptive_busy_timeout=0"
${CLICKHOUSE_CURL} -sS "$url&param_p1=19" -d "INSERT INTO t_async_insert_params VALUES ({p1:UInt64})"
${CLICKHOUSE_CURL} -sS "$url&param_p2=20" -d "INSERT INTO t_async_insert_params VALUES ({p2:UInt64})"
${CLICKHOUSE_CURL} -sS "$url&param_p3=21" -d "INSERT INTO t_async_insert_params VALUES ({p3:UInt64})"
${CLICKHOUSE_CURL} -sS "$url&param_p2=1000" -d "INSERT INTO t_async_insert_params VALUES (22)"
${CLICKHOUSE_CURL} -sS "$url&param_p2=1000" -d 'INSERT INTO t_async_insert_params FORMAT JSONEachRow {"id": 23}'
# Flush the async insert queue, print the inserted ids in order, and clean up.
$CLICKHOUSE_CLIENT -q "
SYSTEM FLUSH ASYNC INSERT QUEUE;
SELECT id FROM t_async_insert_params ORDER BY id;
DROP TABLE IF EXISTS t_async_insert_params;
"

View File

@ -0,0 +1,20 @@
-- Functional test: errors and type handling for query parameters in async inserts
-- when the client waits for the insert to complete (wait_for_async_insert = 1).
DROP TABLE IF EXISTS t_async_insert_params;
CREATE TABLE t_async_insert_params (id UInt64) ENGINE = MergeTree ORDER BY tuple();
-- p1 is a non-numeric string; it is reused by every insert below.
SET param_p1 = 'Hello';
SET async_insert = 1;
SET wait_for_async_insert = 1;
-- Column `id` is UInt64: 'Hello' is not a valid UInt64 parameter value,
-- and a String parameter does not match the column type.
INSERT INTO t_async_insert_params VALUES ({p1:UInt64}); -- { serverError BAD_QUERY_PARAMETER }
INSERT INTO t_async_insert_params VALUES ({p1:String}); -- { serverError TYPE_MISMATCH }
-- After changing the column to String, the UInt64 placeholder still fails,
-- while the String placeholder is expected to succeed.
ALTER TABLE t_async_insert_params MODIFY COLUMN id String;
INSERT INTO t_async_insert_params VALUES ({p1:UInt64}); -- { serverError BAD_QUERY_PARAMETER }
INSERT INTO t_async_insert_params VALUES ({p1:String});
-- Print whatever was inserted, then clean up.
SELECT * FROM t_async_insert_params ORDER BY id;
DROP TABLE t_async_insert_params;