From fa50fe80a01fcfc96ecb0fee83ab4cde00d66a0c Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 19 Feb 2018 03:45:32 +0300 Subject: [PATCH] Added method "getHeader" in IBlockOutputStream: development [#CLICKHOUSE-2] --- dbms/src/Client/Connection.cpp | 2 +- .../AddingDefaultBlockOutputStream.cpp | 2 +- .../AddingDefaultBlockOutputStream.h | 5 +- .../AggregatingBlockInputStream.cpp | 4 +- dbms/src/DataStreams/BlockIO.h | 3 - .../BlockOutputStreamFromRowOutputStream.cpp | 4 +- .../BlockOutputStreamFromRowOutputStream.h | 7 ++- .../DataStreams/CountingBlockOutputStream.h | 3 +- dbms/src/DataStreams/FormatFactory.cpp | 50 +++++++-------- dbms/src/DataStreams/IBlockOutputStream.h | 8 ++- .../InputStreamFromASTInsertQuery.cpp | 2 +- .../MaterializingBlockOutputStream.h | 6 +- .../MergeSortingBlockInputStream.cpp | 30 ++++----- .../MergeSortingBlockInputStream.h | 2 +- .../DataStreams/NativeBlockOutputStream.cpp | 4 +- .../src/DataStreams/NativeBlockOutputStream.h | 5 +- dbms/src/DataStreams/NullBlockOutputStream.h | 5 ++ .../NullableAdapterBlockInputStream.cpp | 18 +++--- .../NullableAdapterBlockInputStream.h | 4 +- .../ODBCDriverBlockOutputStream.cpp | 9 ++- .../DataStreams/ODBCDriverBlockOutputStream.h | 5 +- .../ParallelAggregatingBlockInputStream.cpp | 3 +- .../DataStreams/PrettyBlockOutputStream.cpp | 5 +- .../src/DataStreams/PrettyBlockOutputStream.h | 4 +- .../PrettyCompactBlockOutputStream.h | 4 +- .../PrettySpaceBlockOutputStream.h | 4 +- .../ProhibitColumnsBlockOutputStream.cpp | 23 ------- .../ProhibitColumnsBlockOutputStream.h | 31 ---------- .../PushingToViewsBlockOutputStream.cpp | 4 +- .../PushingToViewsBlockOutputStream.h | 4 +- .../DataStreams/RemoteBlockOutputStream.cpp | 29 +++------ .../src/DataStreams/RemoteBlockOutputStream.h | 10 +-- .../DataStreams/SquashingBlockOutputStream.h | 1 + .../tests/block_row_transforms.cpp | 2 +- .../DataStreams/tests/expression_stream.cpp | 2 +- dbms/src/DataStreams/tests/filter_stream.cpp | 2 +- .../tests/filter_stream_hitlog.cpp | 2 +- dbms/src/DataStreams/tests/native_streams.cpp | 2 +- dbms/src/DataStreams/tests/sorting_stream.cpp | 2 +- .../tests/tab_separated_streams.cpp | 2 +- dbms/src/Interpreters/Aggregator.cpp | 2 +- .../Interpreters/InterpreterCreateQuery.cpp | 41 +++---------- .../Interpreters/InterpreterInsertQuery.cpp | 61 +++++++++---------- .../src/Interpreters/InterpreterInsertQuery.h | 10 +-- dbms/src/Interpreters/tests/expression.cpp | 2 +- dbms/src/Server/ClusterCopier.cpp | 2 +- dbms/src/Server/TCPHandler.cpp | 13 ++-- dbms/src/Server/TCPHandler.h | 2 +- .../DistributedBlockOutputStream.cpp | 17 ++++-- .../DistributedBlockOutputStream.h | 2 +- .../MergeTree/MergeTreeBlockOutputStream.cpp | 6 ++ .../MergeTree/MergeTreeBlockOutputStream.h | 1 + dbms/src/Storages/MergeTree/MergeTreeData.cpp | 2 +- .../MergeTree/MergeTreeDataMerger.cpp | 2 +- .../MergeTree/MergedBlockOutputStream.cpp | 4 +- .../MergeTree/MergedBlockOutputStream.h | 6 +- .../ReplicatedMergeTreeBlockOutputStream.cpp | 6 ++ .../ReplicatedMergeTreeBlockOutputStream.h | 1 + dbms/src/Storages/StorageBuffer.cpp | 2 + dbms/src/Storages/StorageFile.cpp | 3 +- dbms/src/Storages/StorageLog.cpp | 1 + dbms/src/Storages/StorageMemory.cpp | 2 + dbms/src/Storages/StorageNull.h | 2 +- .../Storages/StorageReplicatedMergeTree.cpp | 2 +- dbms/src/Storages/StorageSet.cpp | 3 +- dbms/src/Storages/StorageStripeLog.cpp | 4 +- dbms/src/Storages/StorageTinyLog.cpp | 2 + dbms/src/Storages/tests/hit_log.cpp | 2 +- dbms/src/Storages/tests/storage_log.cpp | 2 +- dbms/src/Storages/tests/system_numbers.cpp | 2 +- 70 files changed, 239 insertions(+), 282 deletions(-) delete mode 100644 dbms/src/DataStreams/ProhibitColumnsBlockOutputStream.cpp delete mode 100644 dbms/src/DataStreams/ProhibitColumnsBlockOutputStream.h diff --git a/dbms/src/Client/Connection.cpp b/dbms/src/Client/Connection.cpp index 4d0b20f0168..3c6c8d75d0e 100644 --- a/dbms/src/Client/Connection.cpp +++ b/dbms/src/Client/Connection.cpp @@ -393,7 +393,7 @@ void Connection::sendData(const Block & block, const String & name) else maybe_compressed_out = out; - block_out = std::make_shared(*maybe_compressed_out, server_revision); + block_out = std::make_shared(*maybe_compressed_out, server_revision, block.cloneEmpty()); } writeVarUInt(Protocol::Client::Data, *out); diff --git a/dbms/src/DataStreams/AddingDefaultBlockOutputStream.cpp b/dbms/src/DataStreams/AddingDefaultBlockOutputStream.cpp index a52c35da8ad..a2f46fccf8a 100644 --- a/dbms/src/DataStreams/AddingDefaultBlockOutputStream.cpp +++ b/dbms/src/DataStreams/AddingDefaultBlockOutputStream.cpp @@ -10,7 +10,7 @@ namespace DB { -void AddingDefaultBlockOutputStream::write(const DB::Block & block) +void AddingDefaultBlockOutputStream::write(const Block & block) { Block res = block; diff --git a/dbms/src/DataStreams/AddingDefaultBlockOutputStream.h b/dbms/src/DataStreams/AddingDefaultBlockOutputStream.h index 9ac92e74ac9..b54fb475a4b 100644 --- a/dbms/src/DataStreams/AddingDefaultBlockOutputStream.h +++ b/dbms/src/DataStreams/AddingDefaultBlockOutputStream.h @@ -19,16 +19,18 @@ class AddingDefaultBlockOutputStream : public IBlockOutputStream public: AddingDefaultBlockOutputStream( const BlockOutputStreamPtr & output_, + const Block & header_, NamesAndTypesList required_columns_, const ColumnDefaults & column_defaults_, const Context & context_, bool only_explicit_column_defaults_) - : output(output_), required_columns(required_columns_), + : output(output_), header(header_), required_columns(required_columns_), column_defaults(column_defaults_), context(context_), only_explicit_column_defaults(only_explicit_column_defaults_) { } + Block getHeader() const override { return header; } void write(const Block & block) override; void flush() override; @@ -38,6 +40,7 @@ public: private: BlockOutputStreamPtr output; + Block header; NamesAndTypesList required_columns; const ColumnDefaults column_defaults; const Context & context; diff --git a/dbms/src/DataStreams/AggregatingBlockInputStream.cpp b/dbms/src/DataStreams/AggregatingBlockInputStream.cpp index 8896c40e511..ce91333bfe0 100644 --- a/dbms/src/DataStreams/AggregatingBlockInputStream.cpp +++ b/dbms/src/DataStreams/AggregatingBlockInputStream.cpp @@ -76,7 +76,7 @@ Block AggregatingBlockInputStream::readImpl() AggregatingBlockInputStream::TemporaryFileStream::TemporaryFileStream(const std::string & path) - : file_in(path), compressed_in(file_in), block_in(std::make_shared(compressed_in, ClickHouseRevision::get())) {} - + : file_in(path), compressed_in(file_in), + block_in(std::make_shared(compressed_in, ClickHouseRevision::get())) {} } diff --git a/dbms/src/DataStreams/BlockIO.h b/dbms/src/DataStreams/BlockIO.h index 8cd19db9154..6d97e30e510 100644 --- a/dbms/src/DataStreams/BlockIO.h +++ b/dbms/src/DataStreams/BlockIO.h @@ -21,8 +21,6 @@ struct BlockIO BlockInputStreamPtr in; BlockOutputStreamPtr out; - Block out_sample; /// Example of a block to be written to `out`. - /// Callbacks for query logging could be set here. std::function finish_callback; std::function exception_callback; @@ -50,7 +48,6 @@ struct BlockIO process_list_entry = rhs.process_list_entry; in = rhs.in; out = rhs.out; - out_sample = rhs.out_sample; finish_callback = rhs.finish_callback; exception_callback = rhs.exception_callback; diff --git a/dbms/src/DataStreams/BlockOutputStreamFromRowOutputStream.cpp b/dbms/src/DataStreams/BlockOutputStreamFromRowOutputStream.cpp index 8c466bc3c00..3206e918232 100644 --- a/dbms/src/DataStreams/BlockOutputStreamFromRowOutputStream.cpp +++ b/dbms/src/DataStreams/BlockOutputStreamFromRowOutputStream.cpp @@ -5,8 +5,8 @@ namespace DB { -BlockOutputStreamFromRowOutputStream::BlockOutputStreamFromRowOutputStream(RowOutputStreamPtr row_output_) - : row_output(row_output_), first_row(true) {} +BlockOutputStreamFromRowOutputStream::BlockOutputStreamFromRowOutputStream(RowOutputStreamPtr row_output_, const Block & header_) + : row_output(row_output_), header(header_) {} void BlockOutputStreamFromRowOutputStream::write(const Block & block) diff --git a/dbms/src/DataStreams/BlockOutputStreamFromRowOutputStream.h b/dbms/src/DataStreams/BlockOutputStreamFromRowOutputStream.h index 63743f7827a..dfb6f49ecec 100644 --- a/dbms/src/DataStreams/BlockOutputStreamFromRowOutputStream.h +++ b/dbms/src/DataStreams/BlockOutputStreamFromRowOutputStream.h @@ -13,7 +13,9 @@ namespace DB class BlockOutputStreamFromRowOutputStream : public IBlockOutputStream { public: - BlockOutputStreamFromRowOutputStream(RowOutputStreamPtr row_output_); + BlockOutputStreamFromRowOutputStream(RowOutputStreamPtr row_output_, const Block & header_); + + Block getHeader() const override { return header; } void write(const Block & block) override; void writePrefix() override { row_output->writePrefix(); } void writeSuffix() override { row_output->writeSuffix(); } @@ -29,7 +31,8 @@ public: private: RowOutputStreamPtr row_output; - bool first_row; + Block header; + bool first_row = true; }; } diff --git a/dbms/src/DataStreams/CountingBlockOutputStream.h b/dbms/src/DataStreams/CountingBlockOutputStream.h index 63ece36c2b0..0918d33f113 100644 --- a/dbms/src/DataStreams/CountingBlockOutputStream.h +++ b/dbms/src/DataStreams/CountingBlockOutputStream.h @@ -12,7 +12,6 @@ namespace DB class CountingBlockOutputStream : public IBlockOutputStream { public: - CountingBlockOutputStream(const BlockOutputStreamPtr & stream_) : stream(stream_) {} @@ -31,6 +30,7 @@ public: return progress; } + Block getHeader() const override { return stream->getHeader(); } void write(const Block & block) override; void writePrefix() override { stream->writePrefix(); } @@ -40,7 +40,6 @@ public: String getContentType() const override { return stream->getContentType(); } protected: - BlockOutputStreamPtr stream; Progress progress; ProgressCallback progress_callback; diff --git a/dbms/src/DataStreams/FormatFactory.cpp b/dbms/src/DataStreams/FormatFactory.cpp index d871a4b23a5..a985c9f3dc0 100644 --- a/dbms/src/DataStreams/FormatFactory.cpp +++ b/dbms/src/DataStreams/FormatFactory.cpp @@ -141,66 +141,66 @@ static BlockOutputStreamPtr getOutputImpl(const String & name, WriteBuffer & buf FormatSettingsJSON json_settings(settings.output_format_json_quote_64bit_integers, settings.output_format_json_quote_denormals); if (name == "Native") - return std::make_shared(buf); + return std::make_shared(buf, 0, sample); else if (name == "RowBinary") - return std::make_shared(std::make_shared(buf)); + return std::make_shared(std::make_shared(buf), sample); else if (name == "TabSeparated" || name == "TSV") - return std::make_shared(std::make_shared(buf, sample)); + return std::make_shared(std::make_shared(buf, sample), sample); else if (name == "TabSeparatedWithNames" || name == "TSVWithNames") - return std::make_shared(std::make_shared(buf, sample, true)); + return std::make_shared(std::make_shared(buf, sample, true), sample); else if (name == "TabSeparatedWithNamesAndTypes" || name == "TSVWithNamesAndTypes") - return std::make_shared(std::make_shared(buf, sample, true, true)); + return std::make_shared(std::make_shared(buf, sample, true, true), sample); else if (name == "TabSeparatedRaw" || name == "TSVRaw") - return std::make_shared(std::make_shared(buf, sample)); + return std::make_shared(std::make_shared(buf, sample), sample); else if (name == "CSV") - return std::make_shared(std::make_shared(buf, sample)); + return std::make_shared(std::make_shared(buf, sample), sample); else if (name == "CSVWithNames") - return std::make_shared(std::make_shared(buf, sample, true)); + return std::make_shared(std::make_shared(buf, sample, true), sample); else if (name == "Pretty") - return std::make_shared(buf, false, settings.output_format_pretty_max_rows, context); + return std::make_shared(buf, sample, false, settings.output_format_pretty_max_rows, context); else if (name == "PrettyCompact") - return std::make_shared(buf, false, settings.output_format_pretty_max_rows, context); + return std::make_shared(buf, sample, false, settings.output_format_pretty_max_rows, context); else if (name == "PrettyCompactMonoBlock") { - BlockOutputStreamPtr dst = std::make_shared(buf, false, settings.output_format_pretty_max_rows, context); + BlockOutputStreamPtr dst = std::make_shared(buf, sample, false, settings.output_format_pretty_max_rows, context); auto res = std::make_shared(dst, settings.output_format_pretty_max_rows, 0); res->disableFlush(); return res; } else if (name == "PrettySpace") - return std::make_shared(buf, false, settings.output_format_pretty_max_rows, context); + return std::make_shared(buf, sample, false, settings.output_format_pretty_max_rows, context); else if (name == "PrettyNoEscapes") - return std::make_shared(buf, true, settings.output_format_pretty_max_rows, context); + return std::make_shared(buf, sample, true, settings.output_format_pretty_max_rows, context); else if (name == "PrettyCompactNoEscapes") - return std::make_shared(buf, true, settings.output_format_pretty_max_rows, context); + return std::make_shared(buf, sample, true, settings.output_format_pretty_max_rows, context); else if (name == "PrettySpaceNoEscapes") - return std::make_shared(buf, true, settings.output_format_pretty_max_rows, context); + return std::make_shared(buf, sample, true, settings.output_format_pretty_max_rows, context); else if (name == "Vertical") return std::make_shared(std::make_shared( - buf, sample, settings.output_format_pretty_max_rows)); + buf, sample, settings.output_format_pretty_max_rows), sample); else if (name == "VerticalRaw") return std::make_shared(std::make_shared( - buf, sample, settings.output_format_pretty_max_rows)); + buf, sample, settings.output_format_pretty_max_rows), sample); else if (name == "Values") - return std::make_shared(std::make_shared(buf)); + return std::make_shared(std::make_shared(buf), sample); else if (name == "JSON") return std::make_shared(std::make_shared( - buf, sample, settings.output_format_write_statistics, json_settings)); + buf, sample, settings.output_format_write_statistics, json_settings), sample); else if (name == "JSONCompact") return std::make_shared(std::make_shared( - buf, sample, settings.output_format_write_statistics, json_settings)); + buf, sample, settings.output_format_write_statistics, json_settings), sample); else if (name == "JSONEachRow") return std::make_shared(std::make_shared( - buf, sample, json_settings)); + buf, sample, json_settings), sample); else if (name == "XML") return std::make_shared(std::make_shared(buf, sample, - settings.output_format_write_statistics)); + settings.output_format_write_statistics), sample); else if (name == "TSKV") - return std::make_shared(std::make_shared(buf, sample)); + return std::make_shared(std::make_shared(buf, sample), sample); else if (name == "ODBCDriver") return std::make_shared(buf, sample); else if (name == "Null") - return std::make_shared(); + return std::make_shared(sample); else throw Exception("Unknown format " + name, ErrorCodes::UNKNOWN_FORMAT); } @@ -211,7 +211,7 @@ BlockOutputStreamPtr FormatFactory::getOutput(const String & name, WriteBuffer & /** Materialization is needed, because formats can use the functions `IDataType`, * which only work with full columns. */ - return std::make_shared(getOutputImpl(name, buf, sample, context)); + return std::make_shared(getOutputImpl(name, buf, materializeBlock(sample), context), sample); } } diff --git a/dbms/src/DataStreams/IBlockOutputStream.h b/dbms/src/DataStreams/IBlockOutputStream.h index 58e6607f383..e33fced86a3 100644 --- a/dbms/src/DataStreams/IBlockOutputStream.h +++ b/dbms/src/DataStreams/IBlockOutputStream.h @@ -4,12 +4,12 @@ #include #include #include +#include namespace DB { -class Block; struct Progress; class TableStructureReadLock; @@ -26,6 +26,12 @@ class IBlockOutputStream : private boost::noncopyable public: IBlockOutputStream() {} + /** Get data structure of the stream in a form of "header" block (it is also called "sample block"). + * Header block contains column names, data types, columns of size 0. Constant columns must have corresponding values. + * You must pass blocks of exactly this structure to the 'write' method. + */ + virtual Block getHeader() const = 0; + /** Write block. */ virtual void write(const Block & block) = 0; diff --git a/dbms/src/DataStreams/InputStreamFromASTInsertQuery.cpp b/dbms/src/DataStreams/InputStreamFromASTInsertQuery.cpp index 9f5f10d19bf..0e4f876925d 100644 --- a/dbms/src/DataStreams/InputStreamFromASTInsertQuery.cpp +++ b/dbms/src/DataStreams/InputStreamFromASTInsertQuery.cpp @@ -43,7 +43,7 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery( input_buffer_contacenated = std::make_unique(buffers); - res_stream = context.getInputFormat(format, *input_buffer_contacenated, streams.out_sample, context.getSettings().max_insert_block_size); + res_stream = context.getInputFormat(format, *input_buffer_contacenated, streams.out->getHeader(), context.getSettings().max_insert_block_size); } } diff --git a/dbms/src/DataStreams/MaterializingBlockOutputStream.h b/dbms/src/DataStreams/MaterializingBlockOutputStream.h index 2d8489156f4..9e1efeb29d3 100644 --- a/dbms/src/DataStreams/MaterializingBlockOutputStream.h +++ b/dbms/src/DataStreams/MaterializingBlockOutputStream.h @@ -12,9 +12,10 @@ namespace DB class MaterializingBlockOutputStream : public IBlockOutputStream { public: - MaterializingBlockOutputStream(const BlockOutputStreamPtr & output) - : output{output} {} + MaterializingBlockOutputStream(const BlockOutputStreamPtr & output, const Block & header) + : output{output}, header(header) {} + Block getHeader() const override { return header; } void write(const Block & block) override { output->write(materializeBlock(block)); } void flush() override { output->flush(); } void writePrefix() override { output->writePrefix(); } @@ -27,6 +28,7 @@ public: private: BlockOutputStreamPtr output; + Block header; }; } diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp index 6a5c5aba0f6..288d5d9e6ef 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp @@ -34,29 +34,29 @@ static void removeConstantsFromBlock(Block & block) } } -static void removeConstantsFromSortDescription(const Block & sample_block, SortDescription & description) +static void removeConstantsFromSortDescription(const Block & header, SortDescription & description) { description.erase(std::remove_if(description.begin(), description.end(), [&](const SortColumnDescription & elem) { if (!elem.column_name.empty()) - return sample_block.getByName(elem.column_name).column->isColumnConst(); + return header.getByName(elem.column_name).column->isColumnConst(); else - return sample_block.safeGetByPosition(elem.column_number).column->isColumnConst(); + return header.safeGetByPosition(elem.column_number).column->isColumnConst(); }), description.end()); } /** Add into block, whose constant columns was removed by previous function, - * constant columns from sample_block (which must have structure as before removal of constants from block). + * constant columns from header (which must have structure as before removal of constants from block). */ -static void enrichBlockWithConstants(Block & block, const Block & sample_block) +static void enrichBlockWithConstants(Block & block, const Block & header) { size_t rows = block.rows(); - size_t columns = sample_block.columns(); + size_t columns = header.columns(); for (size_t i = 0; i < columns; ++i) { - const auto & col_type_name = sample_block.getByPosition(i); + const auto & col_type_name = header.getByPosition(i); if (col_type_name.column->isColumnConst()) block.insert(i, {col_type_name.column->cloneResized(rows), col_type_name.type, col_type_name.name}); } @@ -65,6 +65,12 @@ static void enrichBlockWithConstants(Block & block, const Block & sample_block) Block MergeSortingBlockInputStream::readImpl() { + if (!header) + { + header = getHeader(); + removeConstantsFromSortDescription(header, description); + } + /** Algorithm: * - read to memory blocks from source stream; * - if too much of them and if external sorting is enabled, @@ -77,12 +83,6 @@ Block MergeSortingBlockInputStream::readImpl() { while (Block block = children.back()->read()) { - if (!sample_block) - { - sample_block = block.cloneEmpty(); - removeConstantsFromSortDescription(sample_block, description); - } - /// If there were only const columns in sort description, then there is no need to sort. /// Return the blocks as is. if (description.empty()) @@ -103,7 +103,7 @@ Block MergeSortingBlockInputStream::readImpl() const std::string & path = temporary_files.back()->path(); WriteBufferFromFile file_buf(path); CompressedWriteBuffer compressed_buf(file_buf); - NativeBlockOutputStream block_out(compressed_buf); + NativeBlockOutputStream block_out(compressed_buf, 0, block.cloneEmpty()); MergeSortingBlocksBlockInputStream block_in(blocks, description, max_merged_block_size, limit); LOG_INFO(log, "Sorting and writing part of data into temporary file " + path); @@ -148,7 +148,7 @@ Block MergeSortingBlockInputStream::readImpl() Block res = impl->read(); if (res) - enrichBlockWithConstants(res, sample_block); + enrichBlockWithConstants(res, header); return res; } diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.h b/dbms/src/DataStreams/MergeSortingBlockInputStream.h index 613169899d8..4b203182d19 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.h +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.h @@ -107,7 +107,7 @@ private: /// Before operation, will remove constant columns from blocks. And after, place constant columns back. /// (to avoid excessive virtual function calls and because constants cannot be serialized in Native format for temporary files) /// Save original block structure here. - Block sample_block; + Block header; /// Everything below is for external sorting. std::vector> temporary_files; diff --git a/dbms/src/DataStreams/NativeBlockOutputStream.cpp b/dbms/src/DataStreams/NativeBlockOutputStream.cpp index 0e38a3e5bd7..b02d435b39f 100644 --- a/dbms/src/DataStreams/NativeBlockOutputStream.cpp +++ b/dbms/src/DataStreams/NativeBlockOutputStream.cpp @@ -20,9 +20,9 @@ namespace ErrorCodes NativeBlockOutputStream::NativeBlockOutputStream( - WriteBuffer & ostr_, UInt64 client_revision_, + WriteBuffer & ostr_, UInt64 client_revision_, const Block & header_, WriteBuffer * index_ostr_, size_t initial_size_of_file_) - : ostr(ostr_), client_revision(client_revision_), + : ostr(ostr_), client_revision(client_revision_), header(header_), index_ostr(index_ostr_), initial_size_of_file(initial_size_of_file_) { if (index_ostr) diff --git a/dbms/src/DataStreams/NativeBlockOutputStream.h b/dbms/src/DataStreams/NativeBlockOutputStream.h index d76cb827863..7e3f14e06ea 100644 --- a/dbms/src/DataStreams/NativeBlockOutputStream.h +++ b/dbms/src/DataStreams/NativeBlockOutputStream.h @@ -23,9 +23,10 @@ public: /** If non-zero client_revision is specified, additional block information can be written. */ NativeBlockOutputStream( - WriteBuffer & ostr_, UInt64 client_revision_ = 0, + WriteBuffer & ostr_, UInt64 client_revision_, const Block & header_, WriteBuffer * index_ostr_ = nullptr, size_t initial_size_of_file_ = 0); + Block getHeader() const override { return header; } void write(const Block & block) override; void flush() override; @@ -36,7 +37,7 @@ public: private: WriteBuffer & ostr; UInt64 client_revision; - + Block header; WriteBuffer * index_ostr; size_t initial_size_of_file; /// The initial size of the data file, if `append` done. Used for the index. /// If you need to write index, then `ostr` must be a CompressedWriteBuffer. diff --git a/dbms/src/DataStreams/NullBlockOutputStream.h b/dbms/src/DataStreams/NullBlockOutputStream.h index ad0c398629a..3d437527960 100644 --- a/dbms/src/DataStreams/NullBlockOutputStream.h +++ b/dbms/src/DataStreams/NullBlockOutputStream.h @@ -11,7 +11,12 @@ namespace DB class NullBlockOutputStream : public IBlockOutputStream { public: + NullBlockOutputStream(const Block & header) : header(header) {} + Block getHeader() const override { return header; } void write(const Block &) override {} + +private: + Block header; }; } diff --git a/dbms/src/DataStreams/NullableAdapterBlockInputStream.cpp b/dbms/src/DataStreams/NullableAdapterBlockInputStream.cpp index 60fe4013595..f10ff9b876b 100644 --- a/dbms/src/DataStreams/NullableAdapterBlockInputStream.cpp +++ b/dbms/src/DataStreams/NullableAdapterBlockInputStream.cpp @@ -16,10 +16,10 @@ namespace ErrorCodes NullableAdapterBlockInputStream::NullableAdapterBlockInputStream( const BlockInputStreamPtr & input, - const Block & in_sample_, const Block & out_sample_) - : header(out_sample_) + const Block & src_header_, const Block & res_header_) + : header(res_header_) { - buildActions(in_sample_, out_sample_); + buildActions(src_header_, res_header_); children.push_back(input); } @@ -83,12 +83,12 @@ Block NullableAdapterBlockInputStream::readImpl() } void NullableAdapterBlockInputStream::buildActions( - const Block & in_sample, - const Block & out_sample) + const Block & src_header, + const Block & res_header) { - size_t in_size = in_sample.columns(); + size_t in_size = src_header.columns(); - if (out_sample.columns() != in_size) + if (res_header.columns() != in_size) throw Exception("Number of columns in INSERT SELECT doesn't match", ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH); actions.reserve(in_size); @@ -96,8 +96,8 @@ void NullableAdapterBlockInputStream::buildActions( for (size_t i = 0; i < in_size; ++i) { - const auto & in_elem = in_sample.getByPosition(i); - const auto & out_elem = out_sample.getByPosition(i); + const auto & in_elem = src_header.getByPosition(i); + const auto & out_elem = res_header.getByPosition(i); bool is_in_nullable = in_elem.type->isNullable(); bool is_out_nullable = out_elem.type->isNullable(); diff --git a/dbms/src/DataStreams/NullableAdapterBlockInputStream.h b/dbms/src/DataStreams/NullableAdapterBlockInputStream.h index 47e064ecdf2..ac21f852e3e 100644 --- a/dbms/src/DataStreams/NullableAdapterBlockInputStream.h +++ b/dbms/src/DataStreams/NullableAdapterBlockInputStream.h @@ -18,7 +18,7 @@ namespace DB class NullableAdapterBlockInputStream : public IProfilingBlockInputStream { public: - NullableAdapterBlockInputStream(const BlockInputStreamPtr & input, const Block & in_sample_, const Block & out_sample_); + NullableAdapterBlockInputStream(const BlockInputStreamPtr & input, const Block & src_header_, const Block & res_header_); String getName() const override { return "NullableAdapterBlockInputStream"; } @@ -48,7 +48,7 @@ private: /// which describes the columns from which we fetch data inside an INSERT /// query, and the target sample block which contains the columns /// we insert data into. - void buildActions(const Block & in_sample, const Block & out_sample); + void buildActions(const Block & src_header, const Block & res_header); private: Block header; diff --git a/dbms/src/DataStreams/ODBCDriverBlockOutputStream.cpp b/dbms/src/DataStreams/ODBCDriverBlockOutputStream.cpp index 71161eeb117..da961948907 100644 --- a/dbms/src/DataStreams/ODBCDriverBlockOutputStream.cpp +++ b/dbms/src/DataStreams/ODBCDriverBlockOutputStream.cpp @@ -7,9 +7,8 @@ namespace DB { -ODBCDriverBlockOutputStream::ODBCDriverBlockOutputStream(WriteBuffer & out_, const Block & sample_) - : out(out_) - , sample(sample_) +ODBCDriverBlockOutputStream::ODBCDriverBlockOutputStream(WriteBuffer & out_, const Block & header_) + : out(out_), header(header_) { } @@ -43,7 +42,7 @@ void ODBCDriverBlockOutputStream::write(const Block & block) void ODBCDriverBlockOutputStream::writePrefix() { - const size_t columns = sample.columns(); + const size_t columns = header.columns(); /// Number of columns. writeVarUInt(columns, out); @@ -51,7 +50,7 @@ void ODBCDriverBlockOutputStream::writePrefix() /// Names and types of columns. for (size_t i = 0; i < columns; ++i) { - const ColumnWithTypeAndName & col = sample.getByPosition(i); + const ColumnWithTypeAndName & col = header.getByPosition(i); writeStringBinary(col.name, out); writeStringBinary(col.type->getName(), out); diff --git a/dbms/src/DataStreams/ODBCDriverBlockOutputStream.h b/dbms/src/DataStreams/ODBCDriverBlockOutputStream.h index 09795b72a3a..a40603c356e 100644 --- a/dbms/src/DataStreams/ODBCDriverBlockOutputStream.h +++ b/dbms/src/DataStreams/ODBCDriverBlockOutputStream.h @@ -19,8 +19,9 @@ class WriteBuffer; class ODBCDriverBlockOutputStream : public IBlockOutputStream { public: - ODBCDriverBlockOutputStream(WriteBuffer & out_, const Block & sample_); + ODBCDriverBlockOutputStream(WriteBuffer & out_, const Block & header_); + Block getHeader() const override { return header; } void write(const Block & block) override; void writePrefix() override; @@ -29,7 +30,7 @@ public: private: WriteBuffer & out; - const Block sample; + const Block header; }; } diff --git a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp index 8476f3020af..9405cbfd389 100644 --- a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp +++ b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp @@ -100,7 +100,8 @@ Block ParallelAggregatingBlockInputStream::readImpl() ParallelAggregatingBlockInputStream::TemporaryFileStream::TemporaryFileStream(const std::string & path) - : file_in(path), compressed_in(file_in), block_in(std::make_shared(compressed_in, ClickHouseRevision::get())) {} + : file_in(path), compressed_in(file_in), + block_in(std::make_shared(compressed_in, ClickHouseRevision::get())) {} diff --git a/dbms/src/DataStreams/PrettyBlockOutputStream.cpp b/dbms/src/DataStreams/PrettyBlockOutputStream.cpp index 9c556ed22c5..58b61664b7c 100644 --- a/dbms/src/DataStreams/PrettyBlockOutputStream.cpp +++ b/dbms/src/DataStreams/PrettyBlockOutputStream.cpp @@ -17,8 +17,9 @@ namespace ErrorCodes } -PrettyBlockOutputStream::PrettyBlockOutputStream(WriteBuffer & ostr_, bool no_escapes_, size_t max_rows_, const Context & context_) - : ostr(ostr_), max_rows(max_rows_), no_escapes(no_escapes_), context(context_) +PrettyBlockOutputStream::PrettyBlockOutputStream( + WriteBuffer & ostr_, const Block & header_, bool no_escapes_, size_t max_rows_, const Context & context_) + : ostr(ostr_), header(header_), max_rows(max_rows_), no_escapes(no_escapes_), context(context_) { struct winsize w; if (0 == ioctl(STDOUT_FILENO, TIOCGWINSZ, &w)) diff --git a/dbms/src/DataStreams/PrettyBlockOutputStream.h b/dbms/src/DataStreams/PrettyBlockOutputStream.h index 9c6eef51705..7702cd46435 100644 --- a/dbms/src/DataStreams/PrettyBlockOutputStream.h +++ b/dbms/src/DataStreams/PrettyBlockOutputStream.h @@ -17,8 +17,9 @@ class PrettyBlockOutputStream : public IBlockOutputStream { public: /// no_escapes - do not use ANSI escape sequences - to display in the browser, not in the console. - PrettyBlockOutputStream(WriteBuffer & ostr_, bool no_escapes_, size_t max_rows_, const Context & context_); + PrettyBlockOutputStream(WriteBuffer & ostr_, const Block & header_, bool no_escapes_, size_t max_rows_, const Context & context_); + Block getHeader() const override { return header; } void write(const Block & block) override; void writeSuffix() override; @@ -32,6 +33,7 @@ protected: void writeExtremes(); WriteBuffer & ostr; + const Block header; size_t max_rows; size_t total_rows = 0; size_t terminal_width = 0; diff --git a/dbms/src/DataStreams/PrettyCompactBlockOutputStream.h b/dbms/src/DataStreams/PrettyCompactBlockOutputStream.h index 38e6fae71cf..82a3a44f720 100644 --- a/dbms/src/DataStreams/PrettyCompactBlockOutputStream.h +++ b/dbms/src/DataStreams/PrettyCompactBlockOutputStream.h @@ -11,8 +11,8 @@ namespace DB class PrettyCompactBlockOutputStream : public PrettyBlockOutputStream { public: - PrettyCompactBlockOutputStream(WriteBuffer & ostr_, bool no_escapes_, size_t max_rows_, const Context & context_) - : PrettyBlockOutputStream(ostr_, no_escapes_, max_rows_, context_) {} + PrettyCompactBlockOutputStream(WriteBuffer & ostr_, const Block & header_, bool no_escapes_, size_t max_rows_, const Context & context_) + : PrettyBlockOutputStream(ostr_, header_, no_escapes_, max_rows_, context_) {} void write(const Block & block) override; diff --git a/dbms/src/DataStreams/PrettySpaceBlockOutputStream.h b/dbms/src/DataStreams/PrettySpaceBlockOutputStream.h index 2fd78fa883f..6dbd2c748c4 100644 --- a/dbms/src/DataStreams/PrettySpaceBlockOutputStream.h +++ b/dbms/src/DataStreams/PrettySpaceBlockOutputStream.h @@ -11,8 +11,8 @@ namespace DB class PrettySpaceBlockOutputStream : public PrettyBlockOutputStream { public: - PrettySpaceBlockOutputStream(WriteBuffer & ostr_, bool no_escapes_, size_t max_rows_, const Context & context_) - : PrettyBlockOutputStream(ostr_, no_escapes_, max_rows_, context_) {} + PrettySpaceBlockOutputStream(WriteBuffer & ostr_, const Block & header_, bool no_escapes_, size_t max_rows_, const Context & context_) + : PrettyBlockOutputStream(ostr_, header_, no_escapes_, max_rows_, context_) {} void write(const Block & block) override; void writeSuffix() override; diff --git a/dbms/src/DataStreams/ProhibitColumnsBlockOutputStream.cpp b/dbms/src/DataStreams/ProhibitColumnsBlockOutputStream.cpp deleted file mode 100644 index 0ef5d981da1..00000000000 --- a/dbms/src/DataStreams/ProhibitColumnsBlockOutputStream.cpp +++ /dev/null @@ -1,23 +0,0 @@ -#include -#include - - -namespace DB -{ - -namespace ErrorCodes -{ - extern const int ILLEGAL_COLUMN; -} - - -void ProhibitColumnsBlockOutputStream::write(const Block & block) -{ - for (const auto & column : columns) - if (block.has(column.name)) - throw Exception{"Cannot insert column " + column.name, ErrorCodes::ILLEGAL_COLUMN}; - - output->write(block); -} - -} diff --git a/dbms/src/DataStreams/ProhibitColumnsBlockOutputStream.h b/dbms/src/DataStreams/ProhibitColumnsBlockOutputStream.h deleted file mode 100644 index 732eece7904..00000000000 --- a/dbms/src/DataStreams/ProhibitColumnsBlockOutputStream.h +++ /dev/null @@ -1,31 +0,0 @@ -#pragma once - -#include -#include - - -namespace DB -{ - -/// Throws exception on encountering prohibited column in block -class ProhibitColumnsBlockOutputStream : public IBlockOutputStream -{ -public: - ProhibitColumnsBlockOutputStream(const BlockOutputStreamPtr & output, const NamesAndTypesList & columns) - : output{output}, columns{columns} - { - } - -private: - void write(const Block & block) override; - - void flush() override { output->flush(); } - - void writePrefix() override { output->writePrefix(); } - void writeSuffix() override { output->writeSuffix(); } - - BlockOutputStreamPtr output; - NamesAndTypesList columns; -}; - -} diff --git a/dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp b/dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp index 99711fc31e3..621423044ae 100644 --- a/dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp +++ b/dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp @@ -6,8 +6,8 @@ namespace DB { PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream( - String database, String table, StoragePtr storage, - const Context & context_, const ASTPtr & query_ptr_, bool no_destination) + const String & database, const String & table, const StoragePtr & storage, + const Context & context_, const ASTPtr & query_ptr_, bool no_destination) : context(context_), query_ptr(query_ptr_) { /** TODO This is a very important line. At any insertion into the table one of streams should own lock. diff --git a/dbms/src/DataStreams/PushingToViewsBlockOutputStream.h b/dbms/src/DataStreams/PushingToViewsBlockOutputStream.h index 0fe47f677fd..237c4ef73a1 100644 --- a/dbms/src/DataStreams/PushingToViewsBlockOutputStream.h +++ b/dbms/src/DataStreams/PushingToViewsBlockOutputStream.h @@ -19,9 +19,11 @@ class ReplicatedMergeTreeBlockOutputStream; class PushingToViewsBlockOutputStream : public IBlockOutputStream { public: - PushingToViewsBlockOutputStream(String database, String table, StoragePtr storage, + PushingToViewsBlockOutputStream( + const String & database, const String & table, const StoragePtr & storage, const Context & context_, const ASTPtr & query_ptr_, bool no_destination = false); + Block getHeader() const override { return storage->getSampleBlock(); } void write(const Block & block) override; void flush() override diff --git a/dbms/src/DataStreams/RemoteBlockOutputStream.cpp b/dbms/src/DataStreams/RemoteBlockOutputStream.cpp index 659f30c465b..e3a861d9dab 100644 --- a/dbms/src/DataStreams/RemoteBlockOutputStream.cpp +++ b/dbms/src/DataStreams/RemoteBlockOutputStream.cpp @@ -19,24 +19,18 @@ namespace ErrorCodes RemoteBlockOutputStream::RemoteBlockOutputStream(Connection & connection_, const String & query_, const Settings * settings_) : connection(connection_), query(query_), settings(settings_) { -} - - -void RemoteBlockOutputStream::writePrefix() -{ - /** Send query and receive "sample block", that describe table structure. - * Sample block is needed to know, what structure is required for blocks to be passed to 'write' method. + /** Send query and receive "header", that describe table structure. + * Header is needed to know, what structure is required for blocks to be passed to 'write' method. */ - connection.sendQuery(query, "", QueryProcessingStage::Complete, settings, nullptr); Connection::Packet packet = connection.receivePacket(); if (Protocol::Server::Data == packet.type) { - sample_block = packet.block; + header = packet.block; - if (!sample_block) + if (!header) throw Exception("Logical error: empty block received as table structure", ErrorCodes::LOGICAL_ERROR); } else if (Protocol::Server::Exception == packet.type) @@ -46,23 +40,18 @@ void RemoteBlockOutputStream::writePrefix() } else throw NetException("Unexpected packet from server (expected Data or Exception, got " - + String(Protocol::Server::toString(packet.type)) + ")", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER); + + String(Protocol::Server::toString(packet.type)) + ")", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER); } void RemoteBlockOutputStream::write(const Block & block) { - if (!sample_block) - throw Exception("You must call IBlockOutputStream::writePrefix before IBlockOutputStream::write", ErrorCodes::LOGICAL_ERROR); - - if (!blocksHaveEqualStructure(block, sample_block)) + if (!blocksHaveEqualStructure(block, header)) { std::stringstream message; message << "Block structure is different from table structure.\n" - << "\nTable structure:\n(" << sample_block.dumpStructure() << ")\nBlock structure:\n(" << block.dumpStructure() << ")\n"; - - LOG_ERROR(&Logger::get("RemoteBlockOutputStream"), message.str()); - throw DB::Exception(message.str()); + << "\nTable structure:\n(" << header.dumpStructure() << ")\nBlock structure:\n(" << block.dumpStructure() << ")\n"; + throw Exception(message.str()); } connection.sendData(block); @@ -71,7 +60,7 @@ void RemoteBlockOutputStream::write(const Block & block) void RemoteBlockOutputStream::writePrepared(ReadBuffer & input, size_t size) { - /// We cannot use 'sample_block'. Input must contain block with proper structure. + /// We cannot use 'header'. Input must contain block with proper structure. connection.sendPreparedData(input, size); } diff --git a/dbms/src/DataStreams/RemoteBlockOutputStream.h b/dbms/src/DataStreams/RemoteBlockOutputStream.h index 33b3af86754..6f21fcf138f 100644 --- a/dbms/src/DataStreams/RemoteBlockOutputStream.h +++ b/dbms/src/DataStreams/RemoteBlockOutputStream.h @@ -19,14 +19,8 @@ class RemoteBlockOutputStream : public IBlockOutputStream public: RemoteBlockOutputStream(Connection & connection_, const String & query_, const Settings * settings_ = nullptr); + Block getHeader() const override { return header; } - /// You can call this method after 'writePrefix', to get table required structure. (You must send data with that structure). - Block getSampleBlock() const - { - return sample_block; - } - - void writePrefix() override; void write(const Block & block) override; void writeSuffix() override; @@ -37,7 +31,7 @@ private: Connection & connection; String query; const Settings * settings; - Block sample_block; + Block header; }; } diff --git a/dbms/src/DataStreams/SquashingBlockOutputStream.h b/dbms/src/DataStreams/SquashingBlockOutputStream.h index df3cf262fa6..9e660de59f1 100644 --- a/dbms/src/DataStreams/SquashingBlockOutputStream.h +++ b/dbms/src/DataStreams/SquashingBlockOutputStream.h @@ -14,6 +14,7 @@ class SquashingBlockOutputStream : public IBlockOutputStream public: SquashingBlockOutputStream(BlockOutputStreamPtr & dst, size_t min_block_size_rows, size_t min_block_size_bytes); + Block getHeader() const override { return output->getHeader(); } void write(const Block & block) override; void flush() override; diff --git a/dbms/src/DataStreams/tests/block_row_transforms.cpp b/dbms/src/DataStreams/tests/block_row_transforms.cpp index 6e216c59158..5f826542271 100644 --- a/dbms/src/DataStreams/tests/block_row_transforms.cpp +++ b/dbms/src/DataStreams/tests/block_row_transforms.cpp @@ -44,7 +44,7 @@ try RowInputStreamPtr row_input = std::make_shared(in_buf, sample); BlockInputStreamFromRowInputStream block_input(row_input, sample, DEFAULT_INSERT_BLOCK_SIZE, 0, 0); RowOutputStreamPtr row_output = std::make_shared(out_buf, sample); - BlockOutputStreamFromRowOutputStream block_output(row_output); + BlockOutputStreamFromRowOutputStream block_output(row_output, sample); copyData(block_input, block_output); } diff --git a/dbms/src/DataStreams/tests/expression_stream.cpp b/dbms/src/DataStreams/tests/expression_stream.cpp index f52a8f9c105..53d79634c80 100644 --- a/dbms/src/DataStreams/tests/expression_stream.cpp +++ b/dbms/src/DataStreams/tests/expression_stream.cpp @@ -56,7 +56,7 @@ try WriteBufferFromOStream out1(std::cout); RowOutputStreamPtr out2 = std::make_shared(out1, expression->getSampleBlock()); - BlockOutputStreamFromRowOutputStream out(out2); + BlockOutputStreamFromRowOutputStream out(out2, expression->getSampleBlock()); { Stopwatch stopwatch; diff --git a/dbms/src/DataStreams/tests/filter_stream.cpp b/dbms/src/DataStreams/tests/filter_stream.cpp index 1f5549204b0..da3b2f7d5ad 100644 --- a/dbms/src/DataStreams/tests/filter_stream.cpp +++ b/dbms/src/DataStreams/tests/filter_stream.cpp @@ -61,7 +61,7 @@ try WriteBufferFromOStream ob(std::cout); RowOutputStreamPtr out_ = std::make_shared(ob, expression->getSampleBlock()); - BlockOutputStreamFromRowOutputStream out(out_); + BlockOutputStreamFromRowOutputStream out(out_, expression->getSampleBlock()); { diff --git a/dbms/src/DataStreams/tests/filter_stream_hitlog.cpp b/dbms/src/DataStreams/tests/filter_stream_hitlog.cpp index 4561ffde4be..bbfd65b1f93 100644 --- a/dbms/src/DataStreams/tests/filter_stream_hitlog.cpp +++ b/dbms/src/DataStreams/tests/filter_stream_hitlog.cpp @@ -134,7 +134,7 @@ int main(int, char **) WriteBufferFromOStream ob(std::cout); RowOutputStreamPtr out_ = std::make_shared(ob, expression->getSampleBlock()); - BlockOutputStreamFromRowOutputStream out(out_); + BlockOutputStreamFromRowOutputStream out(out_, in->getHeader()); copyData(*in, out); } diff --git a/dbms/src/DataStreams/tests/native_streams.cpp b/dbms/src/DataStreams/tests/native_streams.cpp index d06d8aa3714..bd0a998f88e 100644 --- a/dbms/src/DataStreams/tests/native_streams.cpp +++ b/dbms/src/DataStreams/tests/native_streams.cpp @@ -106,7 +106,7 @@ try BlockInputStreamPtr in = table->read(column_names, {}, Context::createGlobal(), stage, 8192, 1)[0]; WriteBufferFromFileDescriptor out1(STDOUT_FILENO); CompressedWriteBuffer out2(out1); - NativeBlockOutputStream out3(out2, ClickHouseRevision::get()); + NativeBlockOutputStream out3(out2, ClickHouseRevision::get(), in->getHeader()); copyData(*in, out3); } diff --git a/dbms/src/DataStreams/tests/sorting_stream.cpp b/dbms/src/DataStreams/tests/sorting_stream.cpp index d3a32f4adf1..cd6fe515a53 100644 --- a/dbms/src/DataStreams/tests/sorting_stream.cpp +++ b/dbms/src/DataStreams/tests/sorting_stream.cpp @@ -152,7 +152,7 @@ try WriteBufferFromOStream ob(std::cout); RowOutputStreamPtr out_ = std::make_shared(ob, sample); - BlockOutputStreamFromRowOutputStream out(out_); + BlockOutputStreamFromRowOutputStream out(out_, sample); copyData(*in, out); diff --git a/dbms/src/DataStreams/tests/tab_separated_streams.cpp b/dbms/src/DataStreams/tests/tab_separated_streams.cpp index 2cc6abf9835..c765135484b 100644 --- a/dbms/src/DataStreams/tests/tab_separated_streams.cpp +++ b/dbms/src/DataStreams/tests/tab_separated_streams.cpp @@ -38,7 +38,7 @@ try RowOutputStreamPtr row_output = std::make_shared(out_buf, sample); BlockInputStreamFromRowInputStream block_input(row_input, sample, DEFAULT_INSERT_BLOCK_SIZE, 0, 0); - BlockOutputStreamFromRowOutputStream block_output(row_output); + BlockOutputStreamFromRowOutputStream block_output(row_output, sample); copyData(block_input, block_output); return 0; diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index df1fb72584c..7bc3f7df644 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -840,7 +840,7 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants) const std::string & path = file->path(); WriteBufferFromFile file_buf(path); CompressedWriteBuffer compressed_buf(file_buf); - NativeBlockOutputStream block_out(compressed_buf, ClickHouseRevision::get()); + NativeBlockOutputStream block_out(compressed_buf, ClickHouseRevision::get(), getHeader(false)); LOG_DEBUG(log, "Writing part of aggregation data into temporary file " << path << "."); ProfileEvents::increment(ProfileEvents::ExternalAggregationWritePart); diff --git a/dbms/src/Interpreters/InterpreterCreateQuery.cpp b/dbms/src/Interpreters/InterpreterCreateQuery.cpp index aa4bf23fc20..88c12f17364 100644 --- a/dbms/src/Interpreters/InterpreterCreateQuery.cpp +++ b/dbms/src/Interpreters/InterpreterCreateQuery.cpp @@ -10,17 +10,12 @@ #include #include -#include -#include -#include -#include -#include - #include #include #include #include #include +#include #include #include #include @@ -33,6 +28,7 @@ #include #include #include +#include #include #include @@ -43,8 +39,10 @@ #include + namespace DB { + namespace ErrorCodes { extern const int DIRECTORY_DOESNT_EXIST; @@ -474,13 +472,9 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create) if (create.select && (create.is_view || create.is_materialized_view)) create.select->setDatabaseIfNeeded(current_database); - std::unique_ptr interpreter_select; Block as_select_sample; if (create.select && (!create.attach || !create.columns)) - { - interpreter_select = std::make_unique(create.select->clone(), context); - as_select_sample = interpreter_select->getSampleBlock(); - } + as_select_sample = InterpreterSelectQuery::getSampleBlock(create.select->clone(), context); String as_database_name = create.as_database.empty() ? current_database : create.as_database; String as_table_name = create.as_table; @@ -554,28 +548,13 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create) if (create.select && !create.attach && !create.is_view && (!create.is_materialized_view || create.is_populate)) { - auto table_lock = res->lockStructure(true, __PRETTY_FUNCTION__); + auto insert = std::make_shared(); - /// Also see InterpreterInsertQuery. - BlockOutputStreamPtr out; + insert->database = database_name; + insert->table = table_name; + insert->select = create.select->clone(); - out = std::make_shared( - create.database, create.table, res, create.is_temporary ? context.getSessionContext() : context, query_ptr); - - out = std::make_shared(out); - - /// @note shouldn't these two contexts be session contexts in case of temporary table? - bool strict_insert_defaults = static_cast(context.getSettingsRef().strict_insert_defaults); - out = std::make_shared( - out, columns.columns, columns.column_defaults, context, strict_insert_defaults); - - if (!context.getSettingsRef().insert_allow_materialized_columns) - out = std::make_shared(out, columns.materialized_columns); - - BlockIO io; - io.in = std::make_shared(interpreter_select->execute().in, out); - - return io; + return InterpreterInsertQuery(insert, context, context.getSettingsRef().insert_allow_materialized_columns).execute(); } return {}; diff --git a/dbms/src/Interpreters/InterpreterInsertQuery.cpp b/dbms/src/Interpreters/InterpreterInsertQuery.cpp index 63eea9542b8..3091457d384 100644 --- a/dbms/src/Interpreters/InterpreterInsertQuery.cpp +++ b/dbms/src/Interpreters/InterpreterInsertQuery.cpp @@ -8,7 +8,6 @@ #include #include #include -#include #include #include #include @@ -25,7 +24,7 @@ namespace ProfileEvents { -extern const Event InsertQuery; + extern const Event InsertQuery; } namespace DB @@ -34,6 +33,7 @@ namespace ErrorCodes { extern const int NO_SUCH_COLUMN_IN_TABLE; extern const int READONLY; + extern const int ILLEGAL_COLUMN; } @@ -45,10 +45,8 @@ InterpreterInsertQuery::InterpreterInsertQuery( } -StoragePtr InterpreterInsertQuery::loadTable() +StoragePtr InterpreterInsertQuery::getTable(const ASTInsertQuery & query) { - ASTInsertQuery & query = typeid_cast(*query_ptr); - if (query.table_function) { auto table_function = typeid_cast(query.table_function.get()); @@ -60,23 +58,15 @@ StoragePtr InterpreterInsertQuery::loadTable() return context.getTable(query.database, query.table); } -StoragePtr InterpreterInsertQuery::getTable() +Block InterpreterInsertQuery::getSampleBlock(const ASTInsertQuery & query, const StoragePtr & table) { - if (!cached_table) - cached_table = loadTable(); - - return cached_table; -} - -Block InterpreterInsertQuery::getSampleBlock() -{ - ASTInsertQuery & query = typeid_cast(*query_ptr); + Block table_sample_non_materialized = table->getSampleBlockNonMaterialized(); /// If the query does not include information about columns if (!query.columns) - return getTable()->getSampleBlockNonMaterialized(); + return table_sample_non_materialized; - Block table_sample = getTable()->getSampleBlock(); + Block table_sample = table->getSampleBlock(); /// Form the block based on the column names from the query Block res; @@ -88,13 +78,11 @@ Block InterpreterInsertQuery::getSampleBlock() if (!table_sample.has(current_name)) throw Exception("No such column " + current_name + " in table " + query.table, ErrorCodes::NO_SUCH_COLUMN_IN_TABLE); - ColumnWithTypeAndName col; - col.name = current_name; - col.type = table_sample.getByName(current_name).type; - col.column = col.type->createColumn(); - res.insert(std::move(col)); - } + if (!allow_materialized && !table_sample_non_materialized.has(current_name)) + throw Exception("Cannot insert column " + current_name + ", because it is MATERIALIZED column.", ErrorCodes::ILLEGAL_COLUMN); + res.insert(ColumnWithTypeAndName(table_sample.getByName(current_name).type, current_name)); + } return res; } @@ -103,7 +91,7 @@ BlockIO InterpreterInsertQuery::execute() { ASTInsertQuery & query = typeid_cast(*query_ptr); checkAccess(query); - StoragePtr table = getTable(); + StoragePtr table = getTable(query); auto table_lock = table->lockStructure(true, __PRETTY_FUNCTION__); @@ -114,13 +102,11 @@ BlockIO InterpreterInsertQuery::execute() out = std::make_shared(query.database, query.table, table, context, query_ptr, query.no_destination); - out = std::make_shared(out); + out = std::make_shared(out, table->getSampleBlock()); out = std::make_shared( - out, required_columns, table->column_defaults, context, static_cast(context.getSettingsRef().strict_insert_defaults)); - - if (!allow_materialized) - out = std::make_shared(out, table->materialized_columns); + out, getSampleBlock(query, table), required_columns, table->column_defaults, context, + static_cast(context.getSettingsRef().strict_insert_defaults)); out = std::make_shared( out, context.getSettingsRef().min_insert_block_size_rows, context.getSettingsRef().min_insert_block_size_bytes); @@ -130,12 +116,11 @@ BlockIO InterpreterInsertQuery::execute() out = std::move(out_wrapper); BlockIO res; - res.out_sample = getSampleBlock(); /// What type of query: INSERT or INSERT SELECT? if (!query.select) { - res.out = out; + res.out = std::move(out); } else { @@ -143,14 +128,23 @@ BlockIO InterpreterInsertQuery::execute() res.in = interpreter_select.execute().in; - res.in = std::make_shared(res.in, res.in->getHeader(), res.out_sample); - res.in = std::make_shared(context, res.in, res.out_sample); + res.in = std::make_shared(res.in, res.in->getHeader(), res.out->getHeader()); + res.in = std::make_shared(context, res.in, res.out->getHeader()); res.in = std::make_shared(res.in, out); + + if (!allow_materialized) + { + Block in_header = res.in->getHeader(); + for (const auto & name_type : table->materialized_columns) + if (in_header.has(name_type.name)) + throw Exception("Cannot insert column " + name_type.name + ", because it is MATERIALIZED column.", ErrorCodes::ILLEGAL_COLUMN); + } } return res; } + void InterpreterInsertQuery::checkAccess(const ASTInsertQuery & query) { const Settings & settings = context.getSettingsRef(); @@ -163,4 +157,5 @@ void InterpreterInsertQuery::checkAccess(const ASTInsertQuery & query) throw Exception("Cannot insert into table in readonly mode", ErrorCodes::READONLY); } + } diff --git a/dbms/src/Interpreters/InterpreterInsertQuery.h b/dbms/src/Interpreters/InterpreterInsertQuery.h index 9bdc5cfcaba..2180ebe0550 100644 --- a/dbms/src/Interpreters/InterpreterInsertQuery.h +++ b/dbms/src/Interpreters/InterpreterInsertQuery.h @@ -25,14 +25,8 @@ public: BlockIO execute() override; private: - /// Cache storage to avoid double table function call. - StoragePtr cached_table; - StoragePtr loadTable(); - - StoragePtr getTable(); - - Block getSampleBlock(); - + StoragePtr getTable(const ASTInsertQuery & query); + Block getSampleBlock(const ASTInsertQuery & query, const StoragePtr & table); void checkAccess(const ASTInsertQuery & query); ASTPtr query_ptr; diff --git a/dbms/src/Interpreters/tests/expression.cpp b/dbms/src/Interpreters/tests/expression.cpp index 0b2ead8bd52..734f89623ea 100644 --- a/dbms/src/Interpreters/tests/expression.cpp +++ b/dbms/src/Interpreters/tests/expression.cpp @@ -125,7 +125,7 @@ int main(int argc, char ** argv) LimitBlockInputStream lis(is, 20, std::max(0, static_cast(n) - 20)); WriteBufferFromOStream out_buf(std::cout); RowOutputStreamPtr os_ = std::make_shared(out_buf, block); - BlockOutputStreamFromRowOutputStream os(os_); + BlockOutputStreamFromRowOutputStream os(os_, is->getHeader()); copyData(lis, os); } diff --git a/dbms/src/Server/ClusterCopier.cpp b/dbms/src/Server/ClusterCopier.cpp index 02dbf7367c2..96b13012c18 100644 --- a/dbms/src/Server/ClusterCopier.cpp +++ b/dbms/src/Server/ClusterCopier.cpp @@ -1520,7 +1520,7 @@ protected: try { RemoteBlockInputStream stream(*connection, query, {}, context, ¤t_settings); - NullBlockOutputStream output; + NullBlockOutputStream output({}); copyData(stream, output); if (increment_and_check_exit()) diff --git a/dbms/src/Server/TCPHandler.cpp b/dbms/src/Server/TCPHandler.cpp index e27d4f088e0..345cf118a5b 100644 --- a/dbms/src/Server/TCPHandler.cpp +++ b/dbms/src/Server/TCPHandler.cpp @@ -288,7 +288,7 @@ void TCPHandler::processInsertQuery(const Settings & global_settings) state.io.out->writePrefix(); /// Send block to the client - table structure. - Block block = state.io.out_sample; + Block block = state.io.out->getHeader(); sendData(block); readData(global_settings); @@ -417,7 +417,7 @@ void TCPHandler::sendTotals() if (totals) { - initBlockOutput(); + initBlockOutput(totals); writeVarUInt(Protocol::Server::Totals, *out); writeStringBinary("", *out); @@ -438,7 +438,7 @@ void TCPHandler::sendExtremes() if (extremes) { - initBlockOutput(); + initBlockOutput(extremes); writeVarUInt(Protocol::Server::Extremes, *out); writeStringBinary("", *out); @@ -662,7 +662,7 @@ void TCPHandler::initBlockInput() } -void TCPHandler::initBlockOutput() +void TCPHandler::initBlockOutput(const Block & block) { if (!state.block_out) { @@ -674,7 +674,8 @@ void TCPHandler::initBlockOutput() state.block_out = std::make_shared( *state.maybe_compressed_out, - client_revision); + client_revision, + block.cloneEmpty()); } } @@ -715,7 +716,7 @@ bool TCPHandler::isQueryCancelled() void TCPHandler::sendData(const Block & block) { - initBlockOutput(); + initBlockOutput(block); writeVarUInt(Protocol::Server::Data, *out); writeStringBinary("", *out); diff --git a/dbms/src/Server/TCPHandler.h b/dbms/src/Server/TCPHandler.h index 93b82acd7ea..444d8eb4990 100644 --- a/dbms/src/Server/TCPHandler.h +++ b/dbms/src/Server/TCPHandler.h @@ -140,7 +140,7 @@ private: /// Creates state.block_in/block_out for blocks read/write, depending on whether compression is enabled. void initBlockInput(); - void initBlockOutput(); + void initBlockOutput(const Block & block); bool isQueryCancelled(); diff --git a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp index 2d2a5c45434..5534ee39eed 100644 --- a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp +++ b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp @@ -33,6 +33,7 @@ #include #include + namespace CurrentMetrics { extern const Metric DistributedSend; @@ -53,14 +54,20 @@ namespace ErrorCodes } -DistributedBlockOutputStream::DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast, const ClusterPtr & cluster_, - const Settings & settings_, bool insert_sync_, UInt64 insert_timeout_) - : storage(storage), query_ast(query_ast), cluster(cluster_), settings(settings_), insert_sync(insert_sync_), - insert_timeout(insert_timeout_) +DistributedBlockOutputStream::DistributedBlockOutputStream( + StorageDistributed & storage, const ASTPtr & query_ast, const ClusterPtr & cluster_, + const Settings & settings_, bool insert_sync_, UInt64 insert_timeout_) + : storage(storage), query_ast(query_ast), cluster(cluster_), settings(settings_), insert_sync(insert_sync_), insert_timeout(insert_timeout_) { } +Block DistributedBlockOutputStream::getHeader() const +{ + return storage.getSampleBlock(); +} + + void DistributedBlockOutputStream::writePrefix() { deadline = std::chrono::steady_clock::now() + std::chrono::seconds(insert_timeout); @@ -469,7 +476,7 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std:: WriteBufferFromFile out{block_file_tmp_path}; CompressedWriteBuffer compress{out}; - NativeBlockOutputStream stream{compress, ClickHouseRevision::get()}; + NativeBlockOutputStream stream{compress, ClickHouseRevision::get(), block.cloneEmpty()}; writeStringBinary(query_string, out); diff --git a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.h b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.h index de8dc12649b..5cddd1ec92e 100644 --- a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.h +++ b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.h @@ -35,8 +35,8 @@ public: DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast, const ClusterPtr & cluster_, const Settings & settings_, bool insert_sync_, UInt64 insert_timeout_); + Block getHeader() const override; void write(const Block & block) override; - void writePrefix() override; private: diff --git a/dbms/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp b/dbms/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp index 46b5f470439..af0a207bafc 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp @@ -6,6 +6,12 @@ namespace DB { +Block MergeTreeBlockOutputStream::getHeader() const +{ + return storage.getSampleBlock(); +} + + void MergeTreeBlockOutputStream::write(const Block & block) { storage.data.delayInsertIfNeeded(); diff --git a/dbms/src/Storages/MergeTree/MergeTreeBlockOutputStream.h b/dbms/src/Storages/MergeTree/MergeTreeBlockOutputStream.h index 1ecf621f0f9..64243b6e7bf 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeBlockOutputStream.h +++ b/dbms/src/Storages/MergeTree/MergeTreeBlockOutputStream.h @@ -16,6 +16,7 @@ public: MergeTreeBlockOutputStream(StorageMergeTree & storage_) : storage(storage_) {} + Block getHeader() const override; void write(const Block & block) override; private: diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index 3d95b9408a0..2e259d2467a 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -1221,7 +1221,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart( * temporary column name ('converting_column_name') created in 'createConvertExpression' method * will have old name of shared offsets for arrays. */ - MergedColumnOnlyOutputStream out(*this, full_path + part->name + '/', true /* sync */, compression_settings, true /* skip_offsets */); + MergedColumnOnlyOutputStream out(*this, in.getHeader(), full_path + part->name + '/', true /* sync */, compression_settings, true /* skip_offsets */); in.readPrefix(); out.writePrefix(); diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp index 0bc73afc690..e2cbdc10980 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp @@ -736,7 +736,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart rows_sources_read_buf.seek(0, 0); ColumnGathererStream column_gathered_stream(column_name, column_part_streams, rows_sources_read_buf); - MergedColumnOnlyOutputStream column_to(data, new_part_tmp_path, false, compression_settings, offset_written); + MergedColumnOnlyOutputStream column_to(data, column_gathered_stream.getHeader(), new_part_tmp_path, false, compression_settings, offset_written); size_t column_elems_written = 0; column_to.writePrefix(); diff --git a/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp b/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp index 9c680b814d5..f2acaa787b4 100644 --- a/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp +++ b/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp @@ -465,12 +465,12 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm /// Implementation of MergedColumnOnlyOutputStream. MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream( - MergeTreeData & storage_, String part_path_, bool sync_, CompressionSettings compression_settings, bool skip_offsets_) + MergeTreeData & storage_, const Block & header_, String part_path_, bool sync_, CompressionSettings compression_settings, bool skip_offsets_) : IMergedBlockOutputStream( storage_, storage_.context.getSettings().min_compress_block_size, storage_.context.getSettings().max_compress_block_size, compression_settings, storage_.context.getSettings().min_bytes_to_use_direct_io), - part_path(part_path_), sync(sync_), skip_offsets(skip_offsets_) + header(header_), part_path(part_path_), sync(sync_), skip_offsets(skip_offsets_) { } diff --git a/dbms/src/Storages/MergeTree/MergedBlockOutputStream.h b/dbms/src/Storages/MergeTree/MergedBlockOutputStream.h index 60196c3ecdd..4b83f959991 100644 --- a/dbms/src/Storages/MergeTree/MergedBlockOutputStream.h +++ b/dbms/src/Storages/MergeTree/MergedBlockOutputStream.h @@ -105,6 +105,8 @@ public: std::string getPartPath() const; + Block getHeader() const override { return storage.getSampleBlock(); } + /// If the data is pre-sorted. void write(const Block & block) override; @@ -149,13 +151,15 @@ class MergedColumnOnlyOutputStream final : public IMergedBlockOutputStream { public: MergedColumnOnlyOutputStream( - MergeTreeData & storage_, String part_path_, bool sync_, CompressionSettings compression_settings, bool skip_offsets_); + MergeTreeData & storage_, const Block & header_, String part_path_, bool sync_, CompressionSettings compression_settings, bool skip_offsets_); + Block getHeader() const override { return header; } void write(const Block & block) override; void writeSuffix() override; MergeTreeData::DataPart::Checksums writeSuffixAndGetChecksums(); private: + Block header; String part_path; bool initialized = false; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp index a36a400aea5..e34f0f68b6d 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp @@ -39,6 +39,12 @@ ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream( } +Block ReplicatedMergeTreeBlockOutputStream::getHeader() const +{ + return storage.getSampleBlock(); +} + + /// Allow to verify that the session in ZooKeeper is still alive. static void assertSessionIsNotExpired(zkutil::ZooKeeperPtr & zookeeper) { diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h index 12d8f97a1d7..29ca8657038 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h @@ -25,6 +25,7 @@ public: ReplicatedMergeTreeBlockOutputStream(StorageReplicatedMergeTree & storage_, size_t quorum_, size_t quorum_timeout_ms_, bool deduplicate_); + Block getHeader() const override; void write(const Block & block) override; /// For ATTACHing existing data on filesystem. diff --git a/dbms/src/Storages/StorageBuffer.cpp b/dbms/src/Storages/StorageBuffer.cpp index ec4f7b498da..be6922997d1 100644 --- a/dbms/src/Storages/StorageBuffer.cpp +++ b/dbms/src/Storages/StorageBuffer.cpp @@ -208,6 +208,8 @@ class BufferBlockOutputStream : public IBlockOutputStream public: explicit BufferBlockOutputStream(StorageBuffer & storage_) : storage(storage_) {} + Block getHeader() const override { return storage.getSampleBlock(); } + void write(const Block & block) override { if (!block) diff --git a/dbms/src/Storages/StorageFile.cpp b/dbms/src/Storages/StorageFile.cpp index 1bbe9d6d00c..78fb3990978 100644 --- a/dbms/src/Storages/StorageFile.cpp +++ b/dbms/src/Storages/StorageFile.cpp @@ -184,7 +184,6 @@ BlockInputStreams StorageFile::read( class StorageFileBlockOutputStream : public IBlockOutputStream { public: - explicit StorageFileBlockOutputStream(StorageFile & storage_) : storage(storage_), lock(storage.rwlock) { @@ -205,6 +204,8 @@ public: writer = FormatFactory().getOutput(storage.format_name, *write_buf, storage.getSampleBlock(), storage.context_global); } + Block getHeader() const override { return storage.getSampleBlock(); } + void write(const Block & block) override { writer->write(block); diff --git a/dbms/src/Storages/StorageLog.cpp b/dbms/src/Storages/StorageLog.cpp index 443510fea42..711d48621f0 100644 --- a/dbms/src/Storages/StorageLog.cpp +++ b/dbms/src/Storages/StorageLog.cpp @@ -127,6 +127,7 @@ public: } } + Block getHeader() const override { return storage.getSampleBlock(); } void write(const Block & block) override; void writeSuffix() override; diff --git a/dbms/src/Storages/StorageMemory.cpp b/dbms/src/Storages/StorageMemory.cpp index 89ce474b065..96af6a9a138 100644 --- a/dbms/src/Storages/StorageMemory.cpp +++ b/dbms/src/Storages/StorageMemory.cpp @@ -61,6 +61,8 @@ class MemoryBlockOutputStream : public IBlockOutputStream public: explicit MemoryBlockOutputStream(StorageMemory & storage_) : storage(storage_) {} + Block getHeader() const override { return storage.getSampleBlock(); } + void write(const Block & block) override { storage.check(block, true); diff --git a/dbms/src/Storages/StorageNull.h b/dbms/src/Storages/StorageNull.h index e413ff2d930..ef3f0f6fcd1 100644 --- a/dbms/src/Storages/StorageNull.h +++ b/dbms/src/Storages/StorageNull.h @@ -33,7 +33,7 @@ public: BlockOutputStreamPtr write(const ASTPtr &, const Settings &) override { - return std::make_shared(); + return std::make_shared(getSampleBlock()); } void rename(const String & /*new_path_to_db*/, const String & /*new_database_name*/, const String & new_table_name) override diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 14cc102f609..de3e1f5886c 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -3214,7 +3214,7 @@ void StorageReplicatedMergeTree::sendRequestToLeaderReplica(const ASTPtr & query "", "", timeouts, "ClickHouse replica"); RemoteBlockInputStream stream(connection, formattedAST(new_query), {}, context, &settings); - NullBlockOutputStream output; + NullBlockOutputStream output({}); copyData(stream, output); return; diff --git a/dbms/src/Storages/StorageSet.cpp b/dbms/src/Storages/StorageSet.cpp index 498e475a465..bdbfca46d57 100644 --- a/dbms/src/Storages/StorageSet.cpp +++ b/dbms/src/Storages/StorageSet.cpp @@ -33,6 +33,7 @@ public: SetOrJoinBlockOutputStream(StorageSetOrJoinBase & table_, const String & backup_path_, const String & backup_tmp_path_, const String & backup_file_name_); + Block getHeader() const override { return table.getSampleBlock(); } void write(const Block & block) override; void writeSuffix() override; @@ -54,7 +55,7 @@ SetOrJoinBlockOutputStream::SetOrJoinBlockOutputStream(StorageSetOrJoinBase & ta backup_file_name(backup_file_name_), backup_buf(backup_tmp_path + backup_file_name), compressed_backup_buf(backup_buf), - backup_stream(compressed_backup_buf) + backup_stream(compressed_backup_buf, 0, table.getSampleBlock()) { } diff --git a/dbms/src/Storages/StorageStripeLog.cpp b/dbms/src/Storages/StorageStripeLog.cpp index 1d2d31e27bb..570dd3f9ea4 100644 --- a/dbms/src/Storages/StorageStripeLog.cpp +++ b/dbms/src/Storages/StorageStripeLog.cpp @@ -136,7 +136,7 @@ public: data_out(data_out_compressed, CompressionSettings(CompressionMethod::LZ4), storage.max_compress_block_size), index_out_compressed(storage.full_path() + "index.mrk", INDEX_BUFFER_SIZE, O_WRONLY | O_APPEND | O_CREAT), index_out(index_out_compressed), - block_out(data_out, 0, &index_out, Poco::File(storage.full_path() + "data.bin").getSize()) + block_out(data_out, 0, storage.getSampleBlock(), &index_out, Poco::File(storage.full_path() + "data.bin").getSize()) { } @@ -152,6 +152,8 @@ public: } } + Block getHeader() const override { return storage.getSampleBlock(); } + void write(const Block & block) override { block_out.write(block); diff --git a/dbms/src/Storages/StorageTinyLog.cpp b/dbms/src/Storages/StorageTinyLog.cpp index 957d1ca9aff..4ed7d6dfff2 100644 --- a/dbms/src/Storages/StorageTinyLog.cpp +++ b/dbms/src/Storages/StorageTinyLog.cpp @@ -118,6 +118,8 @@ public: } } + Block getHeader() const override { return storage.getSampleBlock(); } + void write(const Block & block) override; void writeSuffix() override; diff --git a/dbms/src/Storages/tests/hit_log.cpp b/dbms/src/Storages/tests/hit_log.cpp index bd1777db18f..3dd75206a2b 100644 --- a/dbms/src/Storages/tests/hit_log.cpp +++ b/dbms/src/Storages/tests/hit_log.cpp @@ -134,7 +134,7 @@ try BlockInputStreamPtr in = table->read(column_names, {}, Context::createGlobal(), stage, 8192, 1)[0]; RowOutputStreamPtr out_ = std::make_shared(out_buf, sample); - BlockOutputStreamFromRowOutputStream out(out_); + BlockOutputStreamFromRowOutputStream out(out_, sample); copyData(*in, out); } diff --git a/dbms/src/Storages/tests/storage_log.cpp b/dbms/src/Storages/tests/storage_log.cpp index 70c73d8c0b5..6d9cb5d0def 100644 --- a/dbms/src/Storages/tests/storage_log.cpp +++ b/dbms/src/Storages/tests/storage_log.cpp @@ -93,7 +93,7 @@ try LimitBlockInputStream in_limit(in, 10, 0); RowOutputStreamPtr output_ = std::make_shared(out_buf, sample); - BlockOutputStreamFromRowOutputStream output(output_); + BlockOutputStreamFromRowOutputStream output(output_, sample); copyData(in_limit, output); } diff --git a/dbms/src/Storages/tests/system_numbers.cpp b/dbms/src/Storages/tests/system_numbers.cpp index d2d0f9785b2..93e31939555 100644 --- a/dbms/src/Storages/tests/system_numbers.cpp +++ b/dbms/src/Storages/tests/system_numbers.cpp @@ -31,7 +31,7 @@ try LimitBlockInputStream input(table->read(column_names, {}, Context::createGlobal(), stage, 10, 1)[0], 10, 96); RowOutputStreamPtr output_ = std::make_shared(out_buf, sample); - BlockOutputStreamFromRowOutputStream output(output_); + BlockOutputStreamFromRowOutputStream output(output_, sample); copyData(input, output);