support adding defaults in async inserts

This commit is contained in:
Anton Popov 2021-09-08 17:08:57 +03:00
parent 66173d2cc0
commit f864c4252a
6 changed files with 71 additions and 13 deletions

View File

@ -342,8 +342,9 @@ try
return;
const auto * log = &Poco::Logger::get("AsynchronousInsertQueue");
const auto & insert_query = assert_cast<const ASTInsertQuery &>(*key.query);
auto insert_context = Context::createCopy(global_context);
/// 'resetParser' doesn't work for parallel parsing.
key.settings.set("input_format_parallel_parsing", false);
insert_context->makeQueryContext();
@ -372,8 +373,17 @@ try
return 0;
};
StreamingFormatExecutor executor(header, format, std::move(on_error));
std::shared_ptr<ISimpleTransform> adding_defaults_transform;
if (insert_context->getSettingsRef().input_format_defaults_for_omitted_fields)
{
StoragePtr storage = DatabaseCatalog::instance().getTable(insert_query.table_id, insert_context);
auto metadata_snapshot = storage->getInMemoryMetadataPtr();
const auto & columns = metadata_snapshot->getColumns();
if (columns.hasDefaults())
adding_defaults_transform = std::make_shared<AddingDefaultsTransform>(header, columns, *format, insert_context);
}
StreamingFormatExecutor executor(header, format, std::move(on_error), std::move(adding_defaults_transform));
std::unique_ptr<ReadBuffer> buffer;
for (const auto & entry : data->entries)
{

View File

@ -1,4 +1,5 @@
#include <Processors/Executors/StreamingFormatExecutor.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <iostream>
namespace DB
@ -10,10 +11,14 @@ namespace ErrorCodes
}
StreamingFormatExecutor::StreamingFormatExecutor(
const Block & header_, InputFormatPtr format_, ErrorCallback on_error_)
const Block & header_,
InputFormatPtr format_,
ErrorCallback on_error_,
SimpleTransformPtr adding_defaults_transform_)
: header(header_)
, format(std::move(format_))
, on_error(std::move(on_error_))
, adding_defaults_transform(std::move(adding_defaults_transform_))
, port(format->getPort().getHeader(), format.get())
{
connect(format->getPort(), port);
@ -51,6 +56,8 @@ size_t StreamingFormatExecutor::execute()
case IProcessor::Status::PortFull:
{
auto chunk = port.pull();
if (adding_defaults_transform)
adding_defaults_transform->transform(chunk);
auto chunk_rows = chunk.getNumRows();
new_rows += chunk_rows;

View File

@ -1,10 +1,13 @@
#pragma once
#include <Processors/Formats/IInputFormat.h>
#include <Processors/ISimpleTransform.h>
namespace DB
{
using SimpleTransformPtr = std::shared_ptr<ISimpleTransform>;
class StreamingFormatExecutor
{
public:
@ -13,17 +16,19 @@ public:
StreamingFormatExecutor(
const Block & header_,
InputFormatPtr format_,
ErrorCallback on_error_ = [](const MutableColumns &, Exception &) -> size_t { throw; });
ErrorCallback on_error_ = [](const MutableColumns &, Exception &) -> size_t { throw; },
SimpleTransformPtr adding_defaults_transform_ = nullptr);
size_t execute();
MutableColumns getResultColumns();
private:
Block header;
InputFormatPtr format;
ErrorCallback on_error;
InputPort port;
const Block header;
const InputFormatPtr format;
const ErrorCallback on_error;
const SimpleTransformPtr adding_defaults_transform;
InputPort port;
MutableColumns result_columns;
};

View File

@ -32,11 +32,6 @@ protected:
/// This allows to escape caching chunks in input port, which can lead to uneven data distribution.
bool set_input_not_needed_after_read = true;
virtual void transform(Chunk &)
{
throw Exception("Method transform is not implemented for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
virtual void transform(Chunk & input_chunk, Chunk & output_chunk)
{
transform(input_chunk);
@ -49,6 +44,11 @@ protected:
public:
ISimpleTransform(Block input_header_, Block output_header_, bool skip_empty_chunks_);
virtual void transform(Chunk &)
{
throw Exception("Method transform is not implemented for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
Status prepare() override;
void work() override;

View File

@ -0,0 +1,8 @@
1 1
2 4
3 9
4 16
5 25
6 36
7 49
8 64

View File

@ -0,0 +1,28 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
url="${CLICKHOUSE_URL}&async_insert_mode=1&wait_for_async_insert=1"
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS async_inserts"
${CLICKHOUSE_CLIENT} -q "CREATE TABLE async_inserts (id UInt32, v UInt32 DEFAULT id * id) ENGINE = Memory"
${CLICKHOUSE_CURL} -sS "$url" -d 'INSERT INTO async_inserts FORMAT CSV
1,
2,' &
${CLICKHOUSE_CURL} -sS "$url" -d 'INSERT INTO async_inserts FORMAT CSV
3,
4,
' &
${CLICKHOUSE_CURL} -sS "$url" -d 'INSERT INTO async_inserts FORMAT JSONEachRow {"id": 5} {"id": 6}' &
${CLICKHOUSE_CURL} -sS "$url" -d 'INSERT INTO async_inserts FORMAT JSONEachRow {"id": 7} {"id": 8}' &
wait
${CLICKHOUSE_CLIENT} -q "SELECT * FROM async_inserts ORDER BY id"
${CLICKHOUSE_CLIENT} -q "DROP TABLE async_inserts"