diff --git a/dbms/src/Formats/FormatFactory.cpp b/dbms/src/Formats/FormatFactory.cpp index 30a08b37153..5ef86844b67 100644 --- a/dbms/src/Formats/FormatFactory.cpp +++ b/dbms/src/Formats/FormatFactory.cpp @@ -207,8 +207,6 @@ void registerInputFormatCSV(FormatFactory & factory); void registerOutputFormatCSV(FormatFactory & factory); void registerInputFormatTSKV(FormatFactory & factory); void registerOutputFormatTSKV(FormatFactory & factory); -void registerOutputFormatTSKV(FormatFactory & factory); -void registerOutputFormatProtobuf(FormatFactory & factory); void registerInputFormatProcessorNative(FormatFactory & factory); void registerOutputFormatProcessorNative(FormatFactory & factory); @@ -262,7 +260,6 @@ FormatFactory::FormatFactory() registerOutputFormatCSV(*this); registerInputFormatTSKV(*this); registerOutputFormatTSKV(*this); - registerOutputFormatProtobuf(*this); registerInputFormatProcessorNative(*this); registerOutputFormatProcessorNative(*this); diff --git a/dbms/src/Formats/ProtobufRowOutputStream.cpp b/dbms/src/Formats/ProtobufRowOutputStream.cpp deleted file mode 100644 index dfa43a1c7e8..00000000000 --- a/dbms/src/Formats/ProtobufRowOutputStream.cpp +++ /dev/null @@ -1,55 +0,0 @@ -#include - -#include "config_formats.h" -#if USE_PROTOBUF - -#include "ProtobufRowOutputStream.h" - -#include -#include -#include -#include -#include - - -namespace DB -{ -ProtobufRowOutputStream::ProtobufRowOutputStream(WriteBuffer & out, const Block & header, const FormatSchemaInfo & format_schema) - : data_types(header.getDataTypes()), writer(out, ProtobufSchemas::instance().getMessageTypeForFormatSchema(format_schema), header.getNames()) -{ - value_indices.resize(header.columns()); -} - -void ProtobufRowOutputStream::write(const Block & block, size_t row_num) -{ - writer.startMessage(); - std::fill(value_indices.begin(), value_indices.end(), 0); - size_t column_index; - while (writer.writeField(column_index)) - data_types[column_index]->serializeProtobuf( - *block.getByPosition(column_index).column, row_num, writer, value_indices[column_index]); - writer.endMessage(); -} - - -void registerOutputFormatProtobuf(FormatFactory & factory) -{ - factory.registerOutputFormat( - "Protobuf", [](WriteBuffer & buf, const Block & header, const Context & context, const FormatSettings &) - { - return std::make_shared( - std::make_shared(buf, header, FormatSchemaInfo(context, "Protobuf")), header); - }); -} - -} - -#else - -namespace DB -{ - class FormatFactory; - void registerOutputFormatProtobuf(FormatFactory &) {} -} - -#endif diff --git a/dbms/src/Formats/ProtobufRowOutputStream.h b/dbms/src/Formats/ProtobufRowOutputStream.h deleted file mode 100644 index 2bc1ebb32c9..00000000000 --- a/dbms/src/Formats/ProtobufRowOutputStream.h +++ /dev/null @@ -1,44 +0,0 @@ -#pragma once - -#include -#include -#include - - -namespace google -{ -namespace protobuf -{ - class Message; -} -} - - -namespace DB -{ -class Block; -class FormatSchemaInfo; - -/** Stream designed to serialize data in the google protobuf format. - * Each row is written as a separated message. - * These messages are delimited according to documentation - * https://github.com/protocolbuffers/protobuf/blob/master/src/google/protobuf/util/delimited_message_util.h - * Serializing in the protobuf format requires the 'format_schema' setting to be set, e.g. - * SELECT * from table FORMAT Protobuf SETTINGS format_schema = 'schema:Message' - * where schema is the name of "schema.proto" file specifying protobuf schema. - */ -class ProtobufRowOutputStream : public IRowOutputStream -{ -public: - ProtobufRowOutputStream(WriteBuffer & out, const Block & header, const FormatSchemaInfo & format_schema); - - void write(const Block & block, size_t row_num) override; - std::string getContentType() const override { return "application/octet-stream"; } - -private: - DataTypes data_types; - ProtobufWriter writer; - std::vector value_indices; -}; - -} diff --git a/dbms/src/Processors/Formats/Impl/ProtobufBlockOutputFormat.cpp b/dbms/src/Processors/Formats/Impl/ProtobufRowOutputFormat.cpp similarity index 56% rename from dbms/src/Processors/Formats/Impl/ProtobufBlockOutputFormat.cpp rename to dbms/src/Processors/Formats/Impl/ProtobufRowOutputFormat.cpp index 03c8ef1b2ac..50f79dda993 100644 --- a/dbms/src/Processors/Formats/Impl/ProtobufBlockOutputFormat.cpp +++ b/dbms/src/Processors/Formats/Impl/ProtobufRowOutputFormat.cpp @@ -3,7 +3,7 @@ #include "config_formats.h" #if USE_PROTOBUF -#include +#include #include #include @@ -22,32 +22,26 @@ namespace ErrorCodes } -ProtobufBlockOutputFormat::ProtobufBlockOutputFormat( +ProtobufRowOutputFormat::ProtobufRowOutputFormat( WriteBuffer & out_, const Block & header, const FormatSchemaInfo & format_schema) - : IOutputFormat(header, out_) + : IRowOutputFormat(header, out_) , data_types(header.getDataTypes()) , writer(out, ProtobufSchemas::instance().getMessageTypeForFormatSchema(format_schema), header.getNames()) { value_indices.resize(header.columns()); } -void ProtobufBlockOutputFormat::consume(Chunk chunk) +void ProtobufRowOutputFormat::write(const Columns & columns, size_t row_num) { - auto & columns = chunk.getColumns(); - auto num_rows = chunk.getNumRows(); - - for (UInt64 row_num = 0; row_num < num_rows; ++row_num) - { - writer.startMessage(); - std::fill(value_indices.begin(), value_indices.end(), 0); - size_t column_index; - while (writer.writeField(column_index)) - data_types[column_index]->serializeProtobuf( - *columns[column_index], row_num, writer, value_indices[column_index]); - writer.endMessage(); - } + writer.startMessage(); + std::fill(value_indices.begin(), value_indices.end(), 0); + size_t column_index; + while (writer.writeField(column_index)) + data_types[column_index]->serializeProtobuf( + *columns[column_index], row_num, writer, value_indices[column_index]); + writer.endMessage(); } @@ -56,7 +50,7 @@ void registerOutputFormatProcessorProtobuf(FormatFactory & factory) factory.registerOutputFormatProcessor( "Protobuf", [](WriteBuffer & buf, const Block & header, const Context & context, const FormatSettings &) { - return std::make_shared(buf, header, FormatSchemaInfo(context, "Protobuf")); + return std::make_shared(buf, header, FormatSchemaInfo(context, "Protobuf")); }); } diff --git a/dbms/src/Processors/Formats/Impl/ProtobufBlockOutputFormat.h b/dbms/src/Processors/Formats/Impl/ProtobufRowOutputFormat.h similarity index 80% rename from dbms/src/Processors/Formats/Impl/ProtobufBlockOutputFormat.h rename to dbms/src/Processors/Formats/Impl/ProtobufRowOutputFormat.h index 9cc5c7af68e..bdc6f5e2492 100644 --- a/dbms/src/Processors/Formats/Impl/ProtobufBlockOutputFormat.h +++ b/dbms/src/Processors/Formats/Impl/ProtobufRowOutputFormat.h @@ -7,7 +7,7 @@ #include #include #include -#include +#include namespace google @@ -29,18 +29,17 @@ namespace DB * SELECT * from table FORMAT Protobuf SETTINGS format_schema = 'schema:Message' * where schema is the name of "schema.proto" file specifying protobuf schema. */ -class ProtobufBlockOutputFormat : public IOutputFormat +class ProtobufRowOutputFormat : public IRowOutputFormat { public: - ProtobufBlockOutputFormat( + ProtobufRowOutputFormat( WriteBuffer & out_, const Block & header, const FormatSchemaInfo & format_schema); - String getName() const override { return "ProtobufBlockOutputFormat"; } - - void consume(Chunk) override; + String getName() const override { return "ProtobufRowOutputFormat"; } + void write(const Columns & columns, size_t row_num) override; std::string getContentType() const override { return "application/octet-stream"; } private: