ProtobufSingle output format

This commit is contained in:
Mikhail Filimonov 2020-09-23 19:16:04 +02:00
parent b602d18dd8
commit 6df42c580b
No known key found for this signature in database
GPG Key ID: 6E49C2E9AF1220BE
9 changed files with 60 additions and 23 deletions

View File

@ -58,7 +58,7 @@ ProtobufReader::SimpleReader::SimpleReader(ReadBuffer & in_, const bool single_m
{
}
[[noreturn]] void ProtobufReader::SimpleReader::throwUnknownFormat()
/// Reports that the incoming Protobuf data could not be parsed; never returns.
/// Outside of single-message mode the error text additionally reminds the user
/// that every message has to be prefixed by its length encoded as a varint.
[[noreturn]] void ProtobufReader::SimpleReader::throwUnknownFormat() const
{
    std::string message = "Protobuf messages are corrupted or don't match the provided schema.";
    if (!single_message_mode)
        message += " Please note that Protobuf stream is length-delimited: every message is prefixed by its length in varint.";

    throw Exception(message, ErrorCodes::UNKNOWN_PROTOBUF_FORMAT);
}

View File

@ -126,7 +126,7 @@ private:
UInt64 continueReadingVarint(UInt64 first_byte);
void ignoreVarint();
void ignoreGroup();
[[noreturn]] void throwUnknownFormat();
[[noreturn]] void throwUnknownFormat() const;
ReadBuffer & in;
Int64 cursor;

View File

@ -21,6 +21,7 @@ namespace ErrorCodes
extern const int NO_DATA_FOR_REQUIRED_PROTOBUF_FIELD;
extern const int PROTOBUF_BAD_CAST;
extern const int PROTOBUF_FIELD_NOT_REPEATED;
extern const int TOO_MANY_ROWS;
}
@ -123,7 +124,16 @@ namespace
// SimpleWriter is a utility class to serialize protobufs.
// Knows nothing about protobuf schemas, just provides useful functions to serialize data.
ProtobufWriter::SimpleWriter::SimpleWriter(WriteBuffer & out_) : out(out_), current_piece_start(0), num_bytes_skipped(0)
/// Creates a low-level protobuf writer on top of `out_`.
/// `single_message_mode_` disables the varint length prefix that is otherwise written
/// before every message, and (for ordinary buffers) limits the output to a single row.
ProtobufWriter::SimpleWriter::SimpleWriter(WriteBuffer & out_, const bool single_message_mode_)
    : out(out_)
    , current_piece_start(0)
    , num_bytes_skipped(0)
    , produce_length_delimiters(!single_message_mode_)
    /// Normally in single-message mode we forbid outputting more than one row
    /// (it would lead to a malformed protobuf message), but Kafka/RabbitMQ push every row
    /// in a separate message, so it is a valid case there and should be allowed.
    /// The constructor argument `out_` is queried rather than the member `out`,
    /// so this initializer does not depend on the declaration order of the members.
    , allow_several_messages(!single_message_mode_ || out_.producesIsolatedRows())
    , row_was_send(false)
{
}
@ -131,13 +141,21 @@ ProtobufWriter::SimpleWriter::~SimpleWriter() = default;
/// Marks the beginning of a new message (row).
/// Throws TOO_MANY_ROWS when a message has already been started before
/// and this writer is not allowed to emit several messages.
void ProtobufWriter::SimpleWriter::startMessage()
{
    const bool extra_row_forbidden = row_was_send && !allow_several_messages;
    if (extra_row_forbidden)
        throw Exception("ProtobufSingle can output only single row at a time.", ErrorCodes::TOO_MANY_ROWS);

    row_was_send = true;
}
void ProtobufWriter::SimpleWriter::endMessage()
{
pieces.emplace_back(current_piece_start, buffer.size());
size_t size_of_message = buffer.size() - num_bytes_skipped;
writeVarint(size_of_message, out);
if (produce_length_delimiters)
{
size_t size_of_message = buffer.size() - num_bytes_skipped;
writeVarint(size_of_message, out);
}
for (const auto & piece : pieces)
if (piece.end > piece.start)
out.write(reinterpret_cast<char *>(&buffer[piece.start]), piece.end - piece.start);
@ -827,8 +845,8 @@ std::unique_ptr<ProtobufWriter::IConverter> ProtobufWriter::createConverter<goog
ProtobufWriter::ProtobufWriter(
WriteBuffer & out, const google::protobuf::Descriptor * message_type, const std::vector<String> & column_names)
: simple_writer(out)
WriteBuffer & out, const google::protobuf::Descriptor * message_type, const std::vector<String> & column_names, const bool single_message_mode_)
: simple_writer(out, single_message_mode_)
{
std::vector<const google::protobuf::FieldDescriptor *> field_descriptors_without_match;
root_message = ProtobufColumnMatcher::matchColumns<ColumnMatcherTraits>(column_names, message_type, field_descriptors_without_match);

View File

@ -37,7 +37,7 @@ using ConstAggregateDataPtr = const char *;
class ProtobufWriter : private boost::noncopyable
{
public:
ProtobufWriter(WriteBuffer & out, const google::protobuf::Descriptor * message_type, const std::vector<String> & column_names);
ProtobufWriter(WriteBuffer & out, const google::protobuf::Descriptor * message_type, const std::vector<String> & column_names, const bool single_message_mode_);
~ProtobufWriter();
/// Should be called at the beginning of writing a message.
@ -89,7 +89,7 @@ private:
class SimpleWriter
{
public:
SimpleWriter(WriteBuffer & out_);
SimpleWriter(WriteBuffer & out_, const bool single_message_mode_);
~SimpleWriter();
void startMessage();
@ -138,6 +138,10 @@ private:
size_t current_piece_start;
size_t num_bytes_skipped;
std::vector<NestedInfo> nested_infos;
const bool produce_length_delimiters;
const bool allow_several_messages;
bool row_was_send;
};
class IConverter

View File

@ -60,6 +60,11 @@ public:
*/
virtual ~WriteBuffer() {}
/**
 * Tells whether rows written to this buffer end up isolated from each other.
 * Some buffers (Kafka / RabbitMQ) split the rows internally, so formats without
 * framing / delimiters (like ProtobufSingle) can be pushed into them.
 * The base implementation returns false; such buffers override it to return true.
 */
virtual bool producesIsolatedRows() { return false; }
inline void nextIfAtEnd()
{

View File

@ -21,10 +21,11 @@ ProtobufRowOutputFormat::ProtobufRowOutputFormat(
WriteBuffer & out_,
const Block & header,
FormatFactory::WriteCallback callback,
const FormatSchemaInfo & format_schema)
const FormatSchemaInfo & format_schema,
const bool single_message_mode_)
: IRowOutputFormat(header, out_, callback)
, data_types(header.getDataTypes())
, writer(out, ProtobufSchemas::instance().getMessageTypeForFormatSchema(format_schema), header.getNames())
, writer(out, ProtobufSchemas::instance().getMessageTypeForFormatSchema(format_schema), header.getNames(), single_message_mode_)
{
value_indices.resize(header.columns());
}
@ -43,17 +44,22 @@ void ProtobufRowOutputFormat::write(const Columns & columns, size_t row_num)
void registerOutputFormatProcessorProtobuf(FormatFactory & factory)
{
factory.registerOutputFormatProcessor(
"Protobuf",
[](WriteBuffer & buf,
const Block & header,
FormatFactory::WriteCallback callback,
const FormatSettings & settings)
{
return std::make_shared<ProtobufRowOutputFormat>(buf, header, std::move(callback),
FormatSchemaInfo(settings.schema.format_schema, "Protobuf", true,
settings.schema.is_server, settings.schema.format_schema_path));
});
/// Register the same output processor twice: as "Protobuf" (single_message_mode == false)
/// and as "ProtobufSingle" (single_message_mode == true).
for (bool single_message_mode : {false, true})
{
factory.registerOutputFormatProcessor(
single_message_mode ? "ProtobufSingle" : "Protobuf",
/// The mode is captured by value, so each registered creator keeps its own flag.
[single_message_mode](WriteBuffer & buf,
const Block & header,
FormatFactory::WriteCallback callback,
const FormatSettings & settings)
{
/// Note: the schema info is built with the name "Protobuf" for both registered formats;
/// only the trailing single_message_mode argument differs between them.
return std::make_shared<ProtobufRowOutputFormat>(buf, header, std::move(callback),
FormatSchemaInfo(settings.schema.format_schema, "Protobuf", true,
settings.schema.is_server, settings.schema.format_schema_path),
single_message_mode);
});
}
}
}

View File

@ -38,7 +38,8 @@ public:
WriteBuffer & out_,
const Block & header,
FormatFactory::WriteCallback callback,
const FormatSchemaInfo & format_schema);
const FormatSchemaInfo & format_schema,
const bool single_message_mode_);
String getName() const override { return "ProtobufRowOutputFormat"; }

View File

@ -25,6 +25,8 @@ public:
const Block & header);
~WriteBufferToKafkaProducer() override;
/// Always returns true for this buffer, overriding the WriteBuffer default of false.
bool producesIsolatedRows() override { return true; }
void countRow(const Columns & columns, size_t row);
void flush();

View File

@ -34,6 +34,7 @@ public:
);
~WriteBufferToRabbitMQProducer() override;
/// Always returns true for this buffer, overriding the WriteBuffer default of false.
bool producesIsolatedRows() override { return true; }
void countRow();
void activateWriting() { writing_task->activateAndSchedule(); }