Remove ProtobufRowOutputStream.

This commit is contained in:
Nikolai Kochetov 2019-08-02 19:31:17 +03:00
parent 8c66d106e4
commit 5fb548c994
5 changed files with 17 additions and 126 deletions

View File

@ -207,8 +207,6 @@ void registerInputFormatCSV(FormatFactory & factory);
void registerOutputFormatCSV(FormatFactory & factory);
void registerInputFormatTSKV(FormatFactory & factory);
void registerOutputFormatTSKV(FormatFactory & factory);
void registerOutputFormatTSKV(FormatFactory & factory);
void registerOutputFormatProtobuf(FormatFactory & factory);
void registerInputFormatProcessorNative(FormatFactory & factory);
void registerOutputFormatProcessorNative(FormatFactory & factory);
@ -262,7 +260,6 @@ FormatFactory::FormatFactory()
registerOutputFormatCSV(*this);
registerInputFormatTSKV(*this);
registerOutputFormatTSKV(*this);
registerOutputFormatProtobuf(*this);
registerInputFormatProcessorNative(*this);
registerOutputFormatProcessorNative(*this);

View File

@ -1,55 +0,0 @@
#include <Formats/FormatFactory.h>
#include "config_formats.h"
#if USE_PROTOBUF
#include "ProtobufRowOutputStream.h"
#include <Core/Block.h>
#include <Formats/BlockOutputStreamFromRowOutputStream.h>
#include <Formats/FormatSchemaInfo.h>
#include <Formats/ProtobufSchemas.h>
#include <google/protobuf/descriptor.h>
namespace DB
{
/// Builds the row stream from the header: keeps the column data types, binds the protobuf
/// writer to the message type resolved from the format schema, and allocates one
/// zero-initialized value index per header column.
ProtobufRowOutputStream::ProtobufRowOutputStream(WriteBuffer & out, const Block & header, const FormatSchemaInfo & format_schema)
    : data_types(header.getDataTypes())
    , writer(out, ProtobufSchemas::instance().getMessageTypeForFormatSchema(format_schema), header.getNames())
    , value_indices(header.columns())
{
}
/// Serializes row `row_num` of `block` as one protobuf message.
void ProtobufRowOutputStream::write(const Block & block, size_t row_num)
{
    writer.startMessage();

    /// Each message starts with every per-column value index back at zero.
    value_indices.assign(value_indices.size(), 0);

    /// The writer reports which column to serialize next until the message has no more fields.
    size_t field_column;
    while (writer.writeField(field_column))
    {
        const auto & source_column = *block.getByPosition(field_column).column;
        size_t & next_value_index = value_indices[field_column];
        data_types[field_column]->serializeProtobuf(source_column, row_num, writer, next_value_index);
    }

    writer.endMessage();
}
/// Registers the "Protobuf" output format: a ProtobufRowOutputStream wrapped into
/// a BlockOutputStreamFromRowOutputStream.
void registerOutputFormatProtobuf(FormatFactory & factory)
{
    auto create_protobuf_output = [](WriteBuffer & buf, const Block & header, const Context & context, const FormatSettings &)
    {
        /// The schema description is built from the context for the "Protobuf" format name.
        auto row_stream = std::make_shared<ProtobufRowOutputStream>(buf, header, FormatSchemaInfo(context, "Protobuf"));
        return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::move(row_stream), header);
    };

    factory.registerOutputFormat("Protobuf", create_protobuf_output);
}
}
#else
namespace DB
{
class FormatFactory;
/// Stand-in compiled when USE_PROTOBUF is off: the function stays defined
/// but does nothing with the factory.
void registerOutputFormatProtobuf(FormatFactory &) {}
}
#endif

View File

@ -1,44 +0,0 @@
#pragma once
#include <DataTypes/IDataType.h>
#include <Formats/IRowOutputStream.h>
#include <Formats/ProtobufWriter.h>
namespace google
{
namespace protobuf
{
class Message;
}
}
namespace DB
{
class Block;
class FormatSchemaInfo;
/** Stream designed to serialize data in the Google Protobuf format.
* Each row is written as a separate message.
* These messages are delimited according to the documentation:
* https://github.com/protocolbuffers/protobuf/blob/master/src/google/protobuf/util/delimited_message_util.h
* Serializing in the Protobuf format requires the 'format_schema' setting to be set, e.g.
* SELECT * from table FORMAT Protobuf SETTINGS format_schema = 'schema:Message'
* where schema is the name of the "schema.proto" file specifying the protobuf schema.
*/
class ProtobufRowOutputStream : public IRowOutputStream
{
public:
/// `header` supplies the column names and data types; `format_schema` selects the message type.
ProtobufRowOutputStream(WriteBuffer & out, const Block & header, const FormatSchemaInfo & format_schema);
/// Writes row `row_num` of `block` as one message.
void write(const Block & block, size_t row_num) override;
std::string getContentType() const override { return "application/octet-stream"; }
private:
/// Data types of the header columns, indexed by column position.
/// NOTE: the constructor initializes members in this declaration order.
DataTypes data_types;
/// Writer bound to the output buffer and the message type taken from the format schema.
ProtobufWriter writer;
/// One entry per header column; reset to zero at the start of every row and
/// handed to serializeProtobuf for the column being written.
std::vector<size_t> value_indices;
};
}

View File

@ -3,7 +3,7 @@
#include "config_formats.h"
#if USE_PROTOBUF
#include <Processors/Formats/Impl/ProtobufBlockOutputFormat.h>
#include <Processors/Formats/Impl/ProtobufRowOutputFormat.h>
#include <Core/Block.h>
#include <Formats/FormatSchemaInfo.h>
@ -22,32 +22,26 @@ namespace ErrorCodes
}
ProtobufBlockOutputFormat::ProtobufBlockOutputFormat(
ProtobufRowOutputFormat::ProtobufRowOutputFormat(
WriteBuffer & out_,
const Block & header,
const FormatSchemaInfo & format_schema)
: IOutputFormat(header, out_)
: IRowOutputFormat(header, out_)
, data_types(header.getDataTypes())
, writer(out, ProtobufSchemas::instance().getMessageTypeForFormatSchema(format_schema), header.getNames())
{
value_indices.resize(header.columns());
}
void ProtobufBlockOutputFormat::consume(Chunk chunk)
void ProtobufRowOutputFormat::write(const Columns & columns, size_t row_num)
{
auto & columns = chunk.getColumns();
auto num_rows = chunk.getNumRows();
for (UInt64 row_num = 0; row_num < num_rows; ++row_num)
{
writer.startMessage();
std::fill(value_indices.begin(), value_indices.end(), 0);
size_t column_index;
while (writer.writeField(column_index))
data_types[column_index]->serializeProtobuf(
*columns[column_index], row_num, writer, value_indices[column_index]);
writer.endMessage();
}
writer.startMessage();
std::fill(value_indices.begin(), value_indices.end(), 0);
size_t column_index;
while (writer.writeField(column_index))
data_types[column_index]->serializeProtobuf(
*columns[column_index], row_num, writer, value_indices[column_index]);
writer.endMessage();
}
@ -56,7 +50,7 @@ void registerOutputFormatProcessorProtobuf(FormatFactory & factory)
factory.registerOutputFormatProcessor(
"Protobuf", [](WriteBuffer & buf, const Block & header, const Context & context, const FormatSettings &)
{
return std::make_shared<ProtobufBlockOutputFormat>(buf, header, FormatSchemaInfo(context, "Protobuf"));
return std::make_shared<ProtobufRowOutputFormat>(buf, header, FormatSchemaInfo(context, "Protobuf"));
});
}

View File

@ -7,7 +7,7 @@
#include <Formats/FormatSettings.h>
#include <Formats/ProtobufWriter.h>
#include <Formats/FormatSchemaInfo.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Formats/IRowOutputFormat.h>
namespace google
@ -29,18 +29,17 @@ namespace DB
* SELECT * from table FORMAT Protobuf SETTINGS format_schema = 'schema:Message'
* where schema is the name of "schema.proto" file specifying protobuf schema.
*/
class ProtobufBlockOutputFormat : public IOutputFormat
class ProtobufRowOutputFormat : public IRowOutputFormat
{
public:
ProtobufBlockOutputFormat(
ProtobufRowOutputFormat(
WriteBuffer & out_,
const Block & header,
const FormatSchemaInfo & format_schema);
String getName() const override { return "ProtobufBlockOutputFormat"; }
void consume(Chunk) override;
String getName() const override { return "ProtobufRowOutputFormat"; }
void write(const Columns & columns, size_t row_num) override;
std::string getContentType() const override { return "application/octet-stream"; }
private: