diff --git a/src/Interpreters/AsynchronousInsertQueue.cpp b/src/Interpreters/AsynchronousInsertQueue.cpp
index 261bafbde10..e062d87f9f6 100644
--- a/src/Interpreters/AsynchronousInsertQueue.cpp
+++ b/src/Interpreters/AsynchronousInsertQueue.cpp
@@ -342,8 +342,9 @@ try
        return;

    const auto * log = &Poco::Logger::get("AsynchronousInsertQueue");
-
+    const auto & insert_query = assert_cast<const ASTInsertQuery &>(*key.query);
    auto insert_context = Context::createCopy(global_context);
+    /// 'resetParser' doesn't work for parallel parsing.
    key.settings.set("input_format_parallel_parsing", false);
    insert_context->makeQueryContext();

@@ -372,8 +373,17 @@ try
        return 0;
    };

-    StreamingFormatExecutor executor(header, format, std::move(on_error));
+    std::shared_ptr<ISimpleTransform> adding_defaults_transform;
+    if (insert_context->getSettingsRef().input_format_defaults_for_omitted_fields)
+    {
+        StoragePtr storage = DatabaseCatalog::instance().getTable(insert_query.table_id, insert_context);
+        auto metadata_snapshot = storage->getInMemoryMetadataPtr();
+        const auto & columns = metadata_snapshot->getColumns();
+        if (columns.hasDefaults())
+            adding_defaults_transform = std::make_shared<AddingDefaultsTransform>(header, columns, *format, insert_context);
+    }
+
+    StreamingFormatExecutor executor(header, format, std::move(on_error), std::move(adding_defaults_transform));

    std::unique_ptr<ReadBuffer> buffer;
    for (const auto & entry : data->entries)
    {
diff --git a/src/Processors/Executors/StreamingFormatExecutor.cpp b/src/Processors/Executors/StreamingFormatExecutor.cpp
index 011b8b79fc5..5c460ed49c7 100644
--- a/src/Processors/Executors/StreamingFormatExecutor.cpp
+++ b/src/Processors/Executors/StreamingFormatExecutor.cpp
@@ -1,4 +1,5 @@
 #include <Processors/Executors/StreamingFormatExecutor.h>
+#include <Processors/Transforms/AddingDefaultsTransform.h>
 #include <Common/setThreadName.h>

 namespace DB
@@ -10,10 +11,14 @@ namespace ErrorCodes
 }

 StreamingFormatExecutor::StreamingFormatExecutor(
-    const Block & header_, InputFormatPtr format_, ErrorCallback on_error_)
+    const Block & header_,
+    InputFormatPtr format_,
+    ErrorCallback on_error_,
+    SimpleTransformPtr adding_defaults_transform_)
    : header(header_)
    , format(std::move(format_))
    , on_error(std::move(on_error_))
+    , adding_defaults_transform(std::move(adding_defaults_transform_))
    , port(format->getPort().getHeader(), format.get())
 {
    connect(format->getPort(), port);
@@ -51,6 +56,8 @@
            case IProcessor::Status::PortFull:
            {
                auto chunk = port.pull();
+                if (adding_defaults_transform)
+                    adding_defaults_transform->transform(chunk);

                auto chunk_rows = chunk.getNumRows();
                new_rows += chunk_rows;
diff --git a/src/Processors/Executors/StreamingFormatExecutor.h b/src/Processors/Executors/StreamingFormatExecutor.h
index aa937ed1b92..97ad1693f6f 100644
--- a/src/Processors/Executors/StreamingFormatExecutor.h
+++ b/src/Processors/Executors/StreamingFormatExecutor.h
@@ -1,10 +1,13 @@
 #pragma once

 #include <Processors/Formats/IInputFormat.h>
+#include <Processors/ISimpleTransform.h>

 namespace DB
 {

+using SimpleTransformPtr = std::shared_ptr<ISimpleTransform>;
+
 class StreamingFormatExecutor
 {
 public:
@@ -13,17 +16,19 @@ public:
    StreamingFormatExecutor(
        const Block & header_,
        InputFormatPtr format_,
-        ErrorCallback on_error_ = [](const MutableColumns &, Exception &) -> size_t { throw; });
+        ErrorCallback on_error_ = [](const MutableColumns &, Exception &) -> size_t { throw; },
+        SimpleTransformPtr adding_defaults_transform_ = nullptr);

    size_t execute();

    MutableColumns getResultColumns();

 private:
-    Block header;
-    InputFormatPtr format;
-    ErrorCallback on_error;
-    InputPort port;
+    const Block header;
+    const InputFormatPtr format;
+    const ErrorCallback on_error;
+    const SimpleTransformPtr adding_defaults_transform;
+    InputPort port;

    MutableColumns result_columns;
 };
diff --git a/src/Processors/ISimpleTransform.h b/src/Processors/ISimpleTransform.h
index ee92b574d7c..7c08a4c707e 100644
--- a/src/Processors/ISimpleTransform.h
+++ b/src/Processors/ISimpleTransform.h
@@ -32,11 +32,6 @@ protected:
    /// This allows to escape caching chunks in input port, which can lead to uneven data distribution.
    bool set_input_not_needed_after_read = true;

-    virtual void transform(Chunk &)
-    {
-        throw Exception("Method transform is not implemented for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
-    }
-
    virtual void transform(Chunk & input_chunk, Chunk & output_chunk)
    {
        transform(input_chunk);
@@ -49,6 +44,11 @@
 public:
    ISimpleTransform(Block input_header_, Block output_header_, bool skip_empty_chunks_);

+    virtual void transform(Chunk &)
+    {
+        throw Exception("Method transform is not implemented for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
+    }
+
    Status prepare() override;
    void work() override;

diff --git a/tests/queries/0_stateless/02015_async_inserts_3.reference b/tests/queries/0_stateless/02015_async_inserts_3.reference
new file mode 100644
index 00000000000..4c5b75aff3e
--- /dev/null
+++ b/tests/queries/0_stateless/02015_async_inserts_3.reference
@@ -0,0 +1,8 @@
+1 1
+2 4
+3 9
+4 16
+5 25
+6 36
+7 49
+8 64
diff --git a/tests/queries/0_stateless/02015_async_inserts_3.sh b/tests/queries/0_stateless/02015_async_inserts_3.sh
new file mode 100755
index 00000000000..fe97354d3ac
--- /dev/null
+++ b/tests/queries/0_stateless/02015_async_inserts_3.sh
@@ -0,0 +1,28 @@
+#!/usr/bin/env bash
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CURDIR"/../shell_config.sh
+
+url="${CLICKHOUSE_URL}&async_insert_mode=1&wait_for_async_insert=1"
+
+${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS async_inserts"
+${CLICKHOUSE_CLIENT} -q "CREATE TABLE async_inserts (id UInt32, v UInt32 DEFAULT id * id) ENGINE = Memory"
+
+${CLICKHOUSE_CURL} -sS "$url" -d 'INSERT INTO async_inserts FORMAT CSV
+1,
+2,' &
+
+${CLICKHOUSE_CURL} -sS "$url" -d 'INSERT INTO async_inserts FORMAT CSV
+3,
+4,
+' &
+
+${CLICKHOUSE_CURL} -sS "$url" -d 'INSERT INTO async_inserts FORMAT JSONEachRow {"id": 5} {"id": 6}' &
+${CLICKHOUSE_CURL} -sS "$url" -d 'INSERT INTO async_inserts FORMAT JSONEachRow {"id": 7} {"id": 8}' &
+
+wait
+
+${CLICKHOUSE_CLIENT} -q "SELECT * FROM async_inserts ORDER BY id"
+
+${CLICKHOUSE_CLIENT} -q "DROP TABLE async_inserts"