mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-09 17:14:47 +00:00
support query paramters in async inserts
This commit is contained in:
parent
00aa60ca03
commit
ad60876777
@ -389,6 +389,10 @@ AsynchronousInsertQueue::pushDataChunk(ASTPtr query, DataChunk chunk, ContextPtr
|
||||
if (data_kind == DataKind::Preprocessed)
|
||||
insert_query.format = "Native";
|
||||
|
||||
/// Query parameters make sense only for format Values.
|
||||
if (insert_query.format == "Values")
|
||||
entry->query_parameters = query_context->getQueryParameters();
|
||||
|
||||
InsertQuery key{query, query_context->getUserID(), query_context->getCurrentRoles(), settings, data_kind};
|
||||
InsertDataPtr data_to_process;
|
||||
std::future<void> insert_future;
|
||||
@ -999,6 +1003,7 @@ Chunk AsynchronousInsertQueue::processEntriesWithParsing(
|
||||
"Expected entry with data kind Parsed. Got: {}", entry->chunk.getDataKind());
|
||||
|
||||
auto buffer = std::make_unique<ReadBufferFromString>(*bytes);
|
||||
executor.setQueryParameters(entry->query_parameters);
|
||||
|
||||
size_t num_bytes = bytes->size();
|
||||
size_t num_rows = executor.execute(*buffer);
|
||||
|
@ -147,6 +147,7 @@ private:
|
||||
const String format;
|
||||
MemoryTracker * const user_memory_tracker;
|
||||
const std::chrono::time_point<std::chrono::system_clock> create_time;
|
||||
NameToNameMap query_parameters;
|
||||
|
||||
Entry(
|
||||
DataChunk && chunk_,
|
||||
|
@ -1,5 +1,6 @@
|
||||
#include <Processors/Executors/StreamingFormatExecutor.h>
|
||||
#include <Processors/Transforms/AddingDefaultsTransform.h>
|
||||
#include <Processors/Formats/Impl/ValuesBlockInputFormat.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -32,6 +33,19 @@ MutableColumns StreamingFormatExecutor::getResultColumns()
|
||||
return ret_columns;
|
||||
}
|
||||
|
||||
void StreamingFormatExecutor::setQueryParameters(const NameToNameMap & parameters)
|
||||
{
|
||||
if (parameters.empty())
|
||||
return;
|
||||
|
||||
/// Query parameters make sense only for format Values.
|
||||
auto * values_format = typeid_cast<ValuesBlockInputFormat *>(format.get());
|
||||
if (!values_format)
|
||||
return;
|
||||
|
||||
values_format->setQueryParameters(parameters);
|
||||
}
|
||||
|
||||
size_t StreamingFormatExecutor::execute(ReadBuffer & buffer)
|
||||
{
|
||||
format->setReadBuffer(buffer);
|
||||
|
@ -39,6 +39,9 @@ public:
|
||||
/// Releases currently accumulated columns.
|
||||
MutableColumns getResultColumns();
|
||||
|
||||
/// Sets query parameters for input format if applicable.
|
||||
void setQueryParameters(const NameToNameMap & parameters);
|
||||
|
||||
private:
|
||||
const Block header;
|
||||
const InputFormatPtr format;
|
||||
|
@ -663,6 +663,16 @@ void ValuesBlockInputFormat::resetReadBuffer()
|
||||
IInputFormat::resetReadBuffer();
|
||||
}
|
||||
|
||||
void ValuesBlockInputFormat::setQueryParameters(const NameToNameMap & parameters)
|
||||
{
|
||||
if (parameters.empty())
|
||||
return;
|
||||
|
||||
auto context_copy = Context::createCopy(context);
|
||||
context_copy->setQueryParameters(parameters);
|
||||
context = std::move(context_copy);
|
||||
}
|
||||
|
||||
ValuesSchemaReader::ValuesSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_)
|
||||
: IRowSchemaReader(buf, format_settings_), buf(in_)
|
||||
{
|
||||
|
@ -38,6 +38,7 @@ public:
|
||||
|
||||
/// TODO: remove context somehow.
|
||||
void setContext(const ContextPtr & context_) { context = Context::createCopy(context_); }
|
||||
void setQueryParameters(const NameToNameMap & parameters);
|
||||
|
||||
const BlockMissingValues & getMissingValues() const override { return block_missing_values; }
|
||||
|
||||
|
@ -0,0 +1,13 @@
|
||||
11
|
||||
12
|
||||
13
|
||||
14
|
||||
15
|
||||
16
|
||||
17
|
||||
18
|
||||
19
|
||||
20
|
||||
21
|
||||
22
|
||||
23
|
36
tests/queries/0_stateless/03228_async_insert_query_params.sh
Executable file
36
tests/queries/0_stateless/03228_async_insert_query_params.sh
Executable file
@ -0,0 +1,36 @@
|
||||
#!/usr/bin/env bash
|
||||
# Tags: no-parallel
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CURDIR"/../shell_config.sh
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "
|
||||
DROP TABLE IF EXISTS t_async_insert_params;
|
||||
CREATE TABLE t_async_insert_params (id UInt64) ENGINE = Memory;
|
||||
"
|
||||
cmd_params="--async_insert 1 --async_insert_busy_timeout_max_ms 300000 --async_insert_busy_timeout_min_ms 300000 --wait_for_async_insert 0 --async_insert_use_adaptive_busy_timeout 0"
|
||||
|
||||
$CLICKHOUSE_CLIENT $cmd_params -q "SET param_p1 = 11; INSERT INTO t_async_insert_params VALUES ({p1:UInt64});"
|
||||
$CLICKHOUSE_CLIENT $cmd_params -q "SET param_p2 = 12; INSERT INTO t_async_insert_params VALUES ({p2:UInt64});"
|
||||
$CLICKHOUSE_CLIENT $cmd_params -q "SET param_p2 = 1000; INSERT INTO t_async_insert_params VALUES (13);"
|
||||
$CLICKHOUSE_CLIENT $cmd_params -q 'SET param_p2 = 1000; INSERT INTO t_async_insert_params FORMAT JSONEachRow {"id": 14};'
|
||||
|
||||
$CLICKHOUSE_CLIENT $cmd_params --param_p1 15 -q "INSERT INTO t_async_insert_params VALUES ({p1:UInt64});"
|
||||
$CLICKHOUSE_CLIENT $cmd_params --param_p2 16 -q "INSERT INTO t_async_insert_params VALUES ({p2:UInt64});"
|
||||
$CLICKHOUSE_CLIENT $cmd_params --param_p2 1000 -q "INSERT INTO t_async_insert_params VALUES (17);"
|
||||
$CLICKHOUSE_CLIENT $cmd_params --param_p2 1000 -q 'INSERT INTO t_async_insert_params FORMAT JSONEachRow {"id": 18};'
|
||||
|
||||
url="${CLICKHOUSE_URL}&async_insert=1&async_insert_busy_timeout_max_ms=300000&async_insert_busy_timeout_min_ms=300000&wait_for_async_insert=0&async_insert_use_adaptive_busy_timeout=0"
|
||||
|
||||
${CLICKHOUSE_CURL} -sS "$url¶m_p1=19" -d "INSERT INTO t_async_insert_params VALUES ({p1:UInt64})"
|
||||
${CLICKHOUSE_CURL} -sS "$url¶m_p2=20" -d "INSERT INTO t_async_insert_params VALUES ({p2:UInt64})"
|
||||
${CLICKHOUSE_CURL} -sS "$url¶m_p3=21" -d "INSERT INTO t_async_insert_params VALUES ({p3:UInt64})"
|
||||
${CLICKHOUSE_CURL} -sS "$url¶m_p2=1000" -d "INSERT INTO t_async_insert_params VALUES (22)"
|
||||
${CLICKHOUSE_CURL} -sS "$url¶m_p2=1000" -d 'INSERT INTO t_async_insert_params FORMAT JSONEachRow {"id": 23}'
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "
|
||||
SYSTEM FLUSH ASYNC INSERT QUEUE;
|
||||
SELECT id FROM t_async_insert_params ORDER BY id;
|
||||
DROP TABLE IF EXISTS t_async_insert_params;
|
||||
"
|
Loading…
Reference in New Issue
Block a user