From 800854119e059b7213d2a3fb561458651593e4af Mon Sep 17 00:00:00 2001 From: Ivan Lezhankin Date: Thu, 23 May 2019 14:15:18 +0300 Subject: [PATCH] Add buffer callback to fill in virtual columns --- dbms/src/DataStreams/OneBlockInputStream.h | 2 +- dbms/src/Formats/BinaryRowInputStream.cpp | 2 ++ dbms/src/Formats/CSVRowInputStream.cpp | 1 + dbms/src/Formats/CapnProtoRowInputStream.cpp | 1 + dbms/src/Formats/FormatFactory.cpp | 12 ++++++++++-- dbms/src/Formats/FormatFactory.h | 14 ++++++++++++-- dbms/src/Formats/JSONEachRowRowInputStream.cpp | 1 + dbms/src/Formats/NativeFormat.cpp | 1 + dbms/src/Formats/ParquetBlockInputStream.cpp | 1 + dbms/src/Formats/ProtobufRowInputStream.cpp | 1 + dbms/src/Formats/TSKVRowInputStream.cpp | 1 + dbms/src/Formats/TabSeparatedRowInputStream.cpp | 3 +++ dbms/src/Formats/ValuesRowInputStream.cpp | 1 + dbms/src/Interpreters/SyntaxAnalyzer.cpp | 4 +++- dbms/src/Storages/ColumnsDescription.cpp | 13 +++++++++++-- dbms/src/Storages/ColumnsDescription.h | 3 ++- dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp | 12 +++++++++++- dbms/src/Storages/Kafka/KafkaBlockInputStream.h | 1 + .../Storages/Kafka/ReadBufferFromKafkaConsumer.h | 6 +++--- 19 files changed, 67 insertions(+), 13 deletions(-) diff --git a/dbms/src/DataStreams/OneBlockInputStream.h b/dbms/src/DataStreams/OneBlockInputStream.h index 3f1da34fcd8..168053b4fb3 100644 --- a/dbms/src/DataStreams/OneBlockInputStream.h +++ b/dbms/src/DataStreams/OneBlockInputStream.h @@ -12,7 +12,7 @@ namespace DB class OneBlockInputStream : public IBlockInputStream { public: - OneBlockInputStream(const Block & block_) : block(block_) {} + explicit OneBlockInputStream(const Block & block_) : block(block_) {} String getName() const override { return "One"; } diff --git a/dbms/src/Formats/BinaryRowInputStream.cpp b/dbms/src/Formats/BinaryRowInputStream.cpp index c710b17ee9e..37b405c18df 100644 --- a/dbms/src/Formats/BinaryRowInputStream.cpp +++ b/dbms/src/Formats/BinaryRowInputStream.cpp @@ -65,6 +65,7 @@ void registerInputFormatRowBinary(FormatFactory & factory) const Context &, UInt64 max_block_size, UInt64 rows_portion_size, + FormatFactory::BufferCallback /* callback */, const FormatSettings & settings) { return std::make_shared( @@ -78,6 +79,7 @@ void registerInputFormatRowBinary(FormatFactory & factory) const Context &, UInt64 max_block_size, UInt64 rows_portion_size, + FormatFactory::BufferCallback /* callback */, const FormatSettings & settings) { return std::make_shared( diff --git a/dbms/src/Formats/CSVRowInputStream.cpp b/dbms/src/Formats/CSVRowInputStream.cpp index bb348faa96d..6c118f73f01 100644 --- a/dbms/src/Formats/CSVRowInputStream.cpp +++ b/dbms/src/Formats/CSVRowInputStream.cpp @@ -479,6 +479,7 @@ void registerInputFormatCSV(FormatFactory & factory) const Context &, UInt64 max_block_size, UInt64 rows_portion_size, + FormatFactory::BufferCallback /* callback */, const FormatSettings & settings) { return std::make_shared( diff --git a/dbms/src/Formats/CapnProtoRowInputStream.cpp b/dbms/src/Formats/CapnProtoRowInputStream.cpp index 414a25cf39c..e83de3f676e 100644 --- a/dbms/src/Formats/CapnProtoRowInputStream.cpp +++ b/dbms/src/Formats/CapnProtoRowInputStream.cpp @@ -307,6 +307,7 @@ void registerInputFormatCapnProto(FormatFactory & factory) const Context & context, UInt64 max_block_size, UInt64 rows_portion_size, + FormatFactory::BufferCallback /* callback */, const FormatSettings & settings) { return std::make_shared( diff --git a/dbms/src/Formats/FormatFactory.cpp b/dbms/src/Formats/FormatFactory.cpp index 08f0355064b..f9454ab7f65 100644 --- a/dbms/src/Formats/FormatFactory.cpp +++ b/dbms/src/Formats/FormatFactory.cpp @@ -27,7 +27,14 @@ const FormatFactory::Creators & FormatFactory::getCreators(const String & name) } -BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & buf, const Block & sample, const Context & context, UInt64 max_block_size, UInt64 rows_portion_size) const +BlockInputStreamPtr FormatFactory::getInput( + const String & name, + ReadBuffer & buf, + const Block & sample, + const Context & context, + UInt64 max_block_size, + UInt64 rows_portion_size, + BufferCallback callback) const { const auto & input_getter = getCreators(name).first; if (!input_getter) @@ -47,7 +54,8 @@ BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & bu format_settings.input_allow_errors_num = settings.input_format_allow_errors_num; format_settings.input_allow_errors_ratio = settings.input_format_allow_errors_ratio; - return input_getter(buf, sample, context, max_block_size, rows_portion_size, format_settings); + return input_getter( + buf, sample, context, max_block_size, rows_portion_size, callback ? callback : [] {}, format_settings); } diff --git a/dbms/src/Formats/FormatFactory.h b/dbms/src/Formats/FormatFactory.h index 843d866301d..79e3d98659d 100644 --- a/dbms/src/Formats/FormatFactory.h +++ b/dbms/src/Formats/FormatFactory.h @@ -24,6 +24,9 @@ class WriteBuffer; */ class FormatFactory final : public ext::singleton { +public: + using BufferCallback = std::function; + private: using InputCreator = std::function; using OutputCreator = std::function; public: - BlockInputStreamPtr getInput(const String & name, ReadBuffer & buf, - const Block & sample, const Context & context, UInt64 max_block_size, UInt64 rows_portion_size = 0) const; + BlockInputStreamPtr getInput( + const String & name, + ReadBuffer & buf, + const Block & sample, + const Context & context, + UInt64 max_block_size, + UInt64 rows_portion_size = 0, + BufferCallback callback = {}) const; BlockOutputStreamPtr getOutput(const String & name, WriteBuffer & buf, const Block & sample, const Context & context) const; diff --git a/dbms/src/Formats/JSONEachRowRowInputStream.cpp b/dbms/src/Formats/JSONEachRowRowInputStream.cpp index 5055d6c0c7d..30a140edace 100644 --- a/dbms/src/Formats/JSONEachRowRowInputStream.cpp +++ b/dbms/src/Formats/JSONEachRowRowInputStream.cpp @@ -260,6 +260,7 @@ void registerInputFormatJSONEachRow(FormatFactory & factory) const Context &, UInt64 max_block_size, UInt64 rows_portion_size, + FormatFactory::BufferCallback /* callback */, const FormatSettings & settings) { return std::make_shared( diff --git a/dbms/src/Formats/NativeFormat.cpp b/dbms/src/Formats/NativeFormat.cpp index 88e727fdd3f..06cce134e57 100644 --- a/dbms/src/Formats/NativeFormat.cpp +++ b/dbms/src/Formats/NativeFormat.cpp @@ -14,6 +14,7 @@ void registerInputFormatNative(FormatFactory & factory) const Context &, UInt64 /* max_block_size */, UInt64 /* min_read_rows */, + FormatFactory::BufferCallback /* callback */, const FormatSettings &) { return std::make_shared(buf, sample, 0); diff --git a/dbms/src/Formats/ParquetBlockInputStream.cpp b/dbms/src/Formats/ParquetBlockInputStream.cpp index a573969b65f..1cd1ca4ae40 100644 --- a/dbms/src/Formats/ParquetBlockInputStream.cpp +++ b/dbms/src/Formats/ParquetBlockInputStream.cpp @@ -477,6 +477,7 @@ void registerInputFormatParquet(FormatFactory & factory) const Context & context, UInt64 /* max_block_size */, UInt64 /* rows_portion_size */, + FormatFactory::BufferCallback /* callback */, const FormatSettings & /* settings */) { return std::make_shared(buf, sample, context); }); } diff --git a/dbms/src/Formats/ProtobufRowInputStream.cpp b/dbms/src/Formats/ProtobufRowInputStream.cpp index 44d830f56ea..dc658401de4 100644 --- a/dbms/src/Formats/ProtobufRowInputStream.cpp +++ b/dbms/src/Formats/ProtobufRowInputStream.cpp @@ -73,6 +73,7 @@ void registerInputFormatProtobuf(FormatFactory & factory) const Context & context, UInt64 max_block_size, UInt64 rows_portion_size, + FormatFactory::BufferCallback /* callback */, const FormatSettings & settings) { return std::make_shared( diff --git a/dbms/src/Formats/TSKVRowInputStream.cpp b/dbms/src/Formats/TSKVRowInputStream.cpp index ac89d5ec1c5..17038dc36ad 100644 --- a/dbms/src/Formats/TSKVRowInputStream.cpp +++ b/dbms/src/Formats/TSKVRowInputStream.cpp @@ -199,6 +199,7 @@ void registerInputFormatTSKV(FormatFactory & factory) const Context &, UInt64 max_block_size, UInt64 rows_portion_size, + FormatFactory::BufferCallback /* callback */, const FormatSettings & settings) { return std::make_shared( diff --git a/dbms/src/Formats/TabSeparatedRowInputStream.cpp b/dbms/src/Formats/TabSeparatedRowInputStream.cpp index 884bc49454f..f7fd7783725 100644 --- a/dbms/src/Formats/TabSeparatedRowInputStream.cpp +++ b/dbms/src/Formats/TabSeparatedRowInputStream.cpp @@ -457,6 +457,7 @@ void registerInputFormatTabSeparated(FormatFactory & factory) const Context &, UInt64 max_block_size, UInt64 rows_portion_size, + FormatFactory::BufferCallback /* callback */, const FormatSettings & settings) { return std::make_shared( @@ -473,6 +474,7 @@ void registerInputFormatTabSeparated(FormatFactory & factory) const Context &, UInt64 max_block_size, UInt64 rows_portion_size, + FormatFactory::BufferCallback /* callback */, const FormatSettings & settings) { return std::make_shared( @@ -489,6 +491,7 @@ void registerInputFormatTabSeparated(FormatFactory & factory) const Context &, UInt64 max_block_size, UInt64 rows_portion_size, + FormatFactory::BufferCallback /* callback */, const FormatSettings & settings) { return std::make_shared( diff --git a/dbms/src/Formats/ValuesRowInputStream.cpp b/dbms/src/Formats/ValuesRowInputStream.cpp index b2d972d678b..ba2a34a84ef 100644 --- a/dbms/src/Formats/ValuesRowInputStream.cpp +++ b/dbms/src/Formats/ValuesRowInputStream.cpp @@ -156,6 +156,7 @@ void registerInputFormatValues(FormatFactory & factory) const Context & context, UInt64 max_block_size, UInt64 rows_portion_size, + FormatFactory::BufferCallback /* callback */, const FormatSettings & settings) { return std::make_shared( diff --git a/dbms/src/Interpreters/SyntaxAnalyzer.cpp b/dbms/src/Interpreters/SyntaxAnalyzer.cpp index a6f91356dbe..1fa874f3be5 100644 --- a/dbms/src/Interpreters/SyntaxAnalyzer.cpp +++ b/dbms/src/Interpreters/SyntaxAnalyzer.cpp @@ -75,8 +75,10 @@ void collectSourceColumns(const ASTSelectQuery * select_query, StoragePtr storag if (select_query) { - const auto & storage_aliases = storage->getColumns().getAliasesAndVirtuals(); + const auto & storage_aliases = storage->getColumns().getAliases(); + const auto & storage_virtuals = storage->getColumns().getVirtuals(); source_columns.insert(source_columns.end(), storage_aliases.begin(), storage_aliases.end()); + source_columns.insert(source_columns.end(), storage_virtuals.begin(), storage_virtuals.end()); } } } diff --git a/dbms/src/Storages/ColumnsDescription.cpp b/dbms/src/Storages/ColumnsDescription.cpp index c51807c2679..2dbe308ea57 100644 --- a/dbms/src/Storages/ColumnsDescription.cpp +++ b/dbms/src/Storages/ColumnsDescription.cpp @@ -246,15 +246,24 @@ NamesAndTypesList ColumnsDescription::getMaterialized() const return ret; } -NamesAndTypesList ColumnsDescription::getAliasesAndVirtuals() const +NamesAndTypesList ColumnsDescription::getAliases() const { NamesAndTypesList ret; for (const auto & col : columns) - if (col.default_desc.kind == ColumnDefaultKind::Alias || col.is_virtual) + if (col.default_desc.kind == ColumnDefaultKind::Alias) ret.emplace_back(col.name, col.type); return ret; } +NamesAndTypesList ColumnsDescription::getVirtuals() const +{ + NamesAndTypesList result; + for (const auto & column : columns) + if (column.is_virtual) + result.emplace_back(column.name, column.type); + return result; +} + NamesAndTypesList ColumnsDescription::getAll() const { NamesAndTypesList ret; diff --git a/dbms/src/Storages/ColumnsDescription.h b/dbms/src/Storages/ColumnsDescription.h index e7f2919c3bd..d0d042498fa 100644 --- a/dbms/src/Storages/ColumnsDescription.h +++ b/dbms/src/Storages/ColumnsDescription.h @@ -67,7 +67,8 @@ public: NamesAndTypesList getOrdinary() const; NamesAndTypesList getMaterialized() const; - NamesAndTypesList getAliasesAndVirtuals() const; + NamesAndTypesList getAliases() const; + NamesAndTypesList getVirtuals() const; NamesAndTypesList getAllPhysical() const; /// ordinary + materialized. NamesAndTypesList getAll() const; /// ordinary + materialized + aliases + virtuals. diff --git a/dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp b/dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp index abc4e702a6e..c5bff5f2b1a 100644 --- a/dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp +++ b/dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp @@ -16,6 +16,8 @@ KafkaBlockInputStream::KafkaBlockInputStream( if (!storage.getSchemaName().empty()) context.setSetting("format_schema", storage.getSchemaName()); + + virtual_columns = storage.getSampleBlockForColumns({"_topic", "_key", "_offset"}).cloneEmptyColumns(); } KafkaBlockInputStream::~KafkaBlockInputStream() @@ -50,8 +52,16 @@ void KafkaBlockInputStream::readPrefixImpl() rows_portion_size = std::max(rows_portion_size, 1ul); auto non_virtual_header = storage.getSampleBlockNonMaterialized(); /// FIXME: add materialized columns support + auto buffer_callback = [this] + { + const auto * sub_buffer = buffer->subBufferAs(); + virtual_columns[0]->insert(sub_buffer->currentTopic()); // "topic" + virtual_columns[1]->insert(sub_buffer->currentKey()); // "key" + virtual_columns[2]->insert(sub_buffer->currentOffset()); // "offset" + }; + auto child = FormatFactory::instance().getInput( - storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size, rows_portion_size); + storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size, rows_portion_size, buffer_callback); child->setLimits(limits); addChild(child); diff --git a/dbms/src/Storages/Kafka/KafkaBlockInputStream.h b/dbms/src/Storages/Kafka/KafkaBlockInputStream.h index dcaec1f5066..d51100ce938 100644 --- a/dbms/src/Storages/Kafka/KafkaBlockInputStream.h +++ b/dbms/src/Storages/Kafka/KafkaBlockInputStream.h @@ -28,6 +28,7 @@ private: UInt64 max_block_size; BufferPtr buffer; + MutableColumns virtual_columns; bool broken = true, claimed = false; }; diff --git a/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h b/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h index 9bb3fd473ab..acfb88d3160 100644 --- a/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h +++ b/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h @@ -35,9 +35,9 @@ public: auto pollTimeout() { return poll_timeout; } // Return values for the message that's being read. - String currentTopic() { return current[-1].get_topic(); } - String currentKey() { return current[-1].get_key(); } - auto currentOffset() { return current[-1].get_offset(); } + String currentTopic() const { return current[-1].get_topic(); } + String currentKey() const { return current[-1].get_key(); } + auto currentOffset() const { return current[-1].get_offset(); } private: using Messages = std::vector;