After CR fixes

This commit is contained in:
Mikhail Filimonov 2020-10-06 16:32:01 +02:00
parent c37a456c49
commit f6b00f2cb6
No known key found for this signature in database
GPG Key ID: 6E49C2E9AF1220BE
19 changed files with 86 additions and 83 deletions

View File

@ -203,7 +203,7 @@ BlockInputStreamPtr FormatFactory::getInput(
BlockOutputStreamPtr FormatFactory::getOutput(
const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback) const
const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback, const bool ignore_no_row_delimiter) const
{
if (!getCreators(name).output_processor_creator)
{
@ -221,7 +221,7 @@ BlockOutputStreamPtr FormatFactory::getOutput(
output_getter(buf, sample, std::move(callback), format_settings), sample);
}
auto format = getOutputFormat(name, buf, sample, context, std::move(callback));
auto format = getOutputFormat(name, buf, sample, context, std::move(callback), ignore_no_row_delimiter);
return std::make_shared<MaterializingBlockOutputStream>(std::make_shared<OutputStreamToOutputFormat>(format), sample);
}
@ -260,7 +260,7 @@ InputFormatPtr FormatFactory::getInputFormat(
OutputFormatPtr FormatFactory::getOutputFormat(
const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback) const
const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback, const bool ignore_no_row_delimiter) const
{
const auto & output_getter = getCreators(name).output_processor_creator;
if (!output_getter)
@ -270,6 +270,7 @@ OutputFormatPtr FormatFactory::getOutputFormat(
FormatSettings format_settings = getOutputFormatSetting(settings, context);
RowOutputFormatParams params;
params.ignore_no_row_delimiter = ignore_no_row_delimiter;
params.callback = std::move(callback);
/** TODO: Materialization is needed, because formats can use the functions `IDataType`,

View File

@ -108,7 +108,7 @@ public:
ReadCallback callback = {}) const;
BlockOutputStreamPtr getOutput(const String & name, WriteBuffer & buf,
const Block & sample, const Context & context, WriteCallback callback = {}) const;
const Block & sample, const Context & context, WriteCallback callback = {}, const bool ignore_no_row_delimiter = false) const;
InputFormatPtr getInputFormat(
const String & name,
@ -119,7 +119,7 @@ public:
ReadCallback callback = {}) const;
OutputFormatPtr getOutputFormat(
const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback = {}) const;
const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback = {}, const bool ignore_no_row_delimiter = false) const;
/// Register format by its name.
void registerInputFormat(const String & name, InputCreator input_creator);

View File

@ -47,20 +47,20 @@ namespace
// SimpleReader is an utility class to deserialize protobufs.
// Knows nothing about protobuf schemas, just provides useful functions to deserialize data.
ProtobufReader::SimpleReader::SimpleReader(ReadBuffer & in_, const bool single_message_mode_)
ProtobufReader::SimpleReader::SimpleReader(ReadBuffer & in_, const bool use_length_delimiters_)
: in(in_)
, cursor(0)
, current_message_level(0)
, current_message_end(0)
, field_end(0)
, last_string_pos(-1)
, single_message_mode(single_message_mode_)
, use_length_delimiters(use_length_delimiters_)
{
}
[[noreturn]] void ProtobufReader::SimpleReader::throwUnknownFormat() const
{
throw Exception(std::string("Protobuf messages are corrupted or don't match the provided schema.") + (single_message_mode ? "" : " Please note that Protobuf stream is length-delimited: every message is prefixed by its length in varint."), ErrorCodes::UNKNOWN_PROTOBUF_FORMAT);
throw Exception(std::string("Protobuf messages are corrupted or don't match the provided schema.") + (use_length_delimiters ? " Please note that Protobuf stream is length-delimited: every message is prefixed by its length in varint." : ""), ErrorCodes::UNKNOWN_PROTOBUF_FORMAT);
}
bool ProtobufReader::SimpleReader::startMessage()
@ -70,15 +70,15 @@ bool ProtobufReader::SimpleReader::startMessage()
if (unlikely(in.eof()))
return false;
if (single_message_mode)
{
current_message_end = END_OF_FILE;
}
else
if (use_length_delimiters)
{
size_t size_of_message = readVarint();
current_message_end = cursor + size_of_message;
}
else
{
current_message_end = END_OF_FILE;
}
++current_message_level;
field_end = cursor;
return true;
@ -161,16 +161,23 @@ bool ProtobufReader::SimpleReader::readFieldNumber(UInt32 & field_number)
throwUnknownFormat();
}
if (current_message_end == END_OF_FILE)
if (cursor >= current_message_end)
{
if (unlikely(in.eof()))
if (current_message_end == END_OF_FILE)
{
current_message_end = cursor;
return false;
if (unlikely(in.eof()))
{
current_message_end = cursor;
return false;
}
}
else if (current_message_end == END_OF_GROUP)
{
/// We'll check for the `GROUP_END` marker later.
}
else
return false;
}
else if ((cursor >= current_message_end) && (current_message_end != END_OF_GROUP))
return false;
UInt64 varint = readVarint();
if (unlikely(varint & (static_cast<UInt64>(0xFFFFFFFF) << 32)))
@ -1096,8 +1103,8 @@ std::unique_ptr<ProtobufReader::IConverter> ProtobufReader::createConverter<goog
ProtobufReader::ProtobufReader(
ReadBuffer & in_, const google::protobuf::Descriptor * message_type, const std::vector<String> & column_names, const bool single_message_mode_)
: simple_reader(in_, single_message_mode_)
ReadBuffer & in_, const google::protobuf::Descriptor * message_type, const std::vector<String> & column_names, const bool use_length_delimiters_)
: simple_reader(in_, use_length_delimiters_)
{
root_message = ProtobufColumnMatcher::matchColumns<ColumnMatcherTraits>(column_names, message_type);
setTraitsDataAfterMatchingColumns(root_message.get());

View File

@ -37,7 +37,7 @@ using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>;
class ProtobufReader : private boost::noncopyable
{
public:
ProtobufReader(ReadBuffer & in_, const google::protobuf::Descriptor * message_type, const std::vector<String> & column_names, const bool single_message_mode_);
ProtobufReader(ReadBuffer & in_, const google::protobuf::Descriptor * message_type, const std::vector<String> & column_names, const bool use_length_delimiters_);
~ProtobufReader();
/// Should be called when we start reading a new message.
@ -93,7 +93,7 @@ private:
class SimpleReader
{
public:
SimpleReader(ReadBuffer & in_, const bool single_message_mode_);
SimpleReader(ReadBuffer & in_, const bool use_length_delimiters_);
bool startMessage();
void endMessage(bool ignore_errors);
void startNestedMessage();
@ -135,7 +135,7 @@ private:
std::vector<Int64> parent_message_ends;
Int64 field_end;
Int64 last_string_pos;
const bool single_message_mode;
const bool use_length_delimiters;
};
class IConverter

View File

@ -21,7 +21,6 @@ namespace ErrorCodes
extern const int NO_DATA_FOR_REQUIRED_PROTOBUF_FIELD;
extern const int PROTOBUF_BAD_CAST;
extern const int PROTOBUF_FIELD_NOT_REPEATED;
extern const int TOO_MANY_ROWS;
}
@ -124,16 +123,11 @@ namespace
// SimpleWriter is an utility class to serialize protobufs.
// Knows nothing about protobuf schemas, just provides useful functions to serialize data.
ProtobufWriter::SimpleWriter::SimpleWriter(WriteBuffer & out_, const bool single_message_mode_)
ProtobufWriter::SimpleWriter::SimpleWriter(WriteBuffer & out_, const bool use_length_delimiters_)
: out(out_)
, current_piece_start(0)
, num_bytes_skipped(0)
, produce_length_delimiters(!single_message_mode_)
// normally for single_message_mode we forbid outputting more than one row
// (it would lead to malformed protobuf message), but Kafka/Rabbit push every row
// in a separate message, so it's valid case there and should be allowed.
, allow_several_messages(!single_message_mode_ || out.producesIsolatedRows())
, row_was_send(false)
, use_length_delimiters(use_length_delimiters_)
{
}
@ -141,17 +135,12 @@ ProtobufWriter::SimpleWriter::~SimpleWriter() = default;
void ProtobufWriter::SimpleWriter::startMessage()
{
if (!allow_several_messages && row_was_send)
{
throw Exception("ProtobufSingle can output only single row at a time.", ErrorCodes::TOO_MANY_ROWS);
}
row_was_send = true;
}
void ProtobufWriter::SimpleWriter::endMessage()
{
pieces.emplace_back(current_piece_start, buffer.size());
if (produce_length_delimiters)
if (use_length_delimiters)
{
size_t size_of_message = buffer.size() - num_bytes_skipped;
writeVarint(size_of_message, out);
@ -845,8 +834,8 @@ std::unique_ptr<ProtobufWriter::IConverter> ProtobufWriter::createConverter<goog
ProtobufWriter::ProtobufWriter(
WriteBuffer & out, const google::protobuf::Descriptor * message_type, const std::vector<String> & column_names, const bool single_message_mode_)
: simple_writer(out, single_message_mode_)
WriteBuffer & out, const google::protobuf::Descriptor * message_type, const std::vector<String> & column_names, const bool use_length_delimiters_)
: simple_writer(out, use_length_delimiters_)
{
std::vector<const google::protobuf::FieldDescriptor *> field_descriptors_without_match;
root_message = ProtobufColumnMatcher::matchColumns<ColumnMatcherTraits>(column_names, message_type, field_descriptors_without_match);

View File

@ -37,7 +37,7 @@ using ConstAggregateDataPtr = const char *;
class ProtobufWriter : private boost::noncopyable
{
public:
ProtobufWriter(WriteBuffer & out, const google::protobuf::Descriptor * message_type, const std::vector<String> & column_names, const bool single_message_mode_);
ProtobufWriter(WriteBuffer & out, const google::protobuf::Descriptor * message_type, const std::vector<String> & column_names, const bool use_length_delimiters_);
~ProtobufWriter();
/// Should be called at the beginning of writing a message.
@ -89,7 +89,7 @@ private:
class SimpleWriter
{
public:
SimpleWriter(WriteBuffer & out_, const bool single_message_mode_);
SimpleWriter(WriteBuffer & out_, const bool use_length_delimiters_);
~SimpleWriter();
void startMessage();
@ -138,10 +138,7 @@ private:
size_t current_piece_start;
size_t num_bytes_skipped;
std::vector<NestedInfo> nested_infos;
const bool produce_length_delimiters;
const bool allow_several_messages;
bool row_was_send;
const bool use_length_delimiters;
};
class IConverter

View File

@ -38,13 +38,14 @@ try
FormatSettings format_settings;
RowInputFormatParams params{DEFAULT_INSERT_BLOCK_SIZE, 0, 0, []{}};
RowInputFormatParams in_params{DEFAULT_INSERT_BLOCK_SIZE, 0, 0, []{}};
RowOutputFormatParams out_params{[](const Columns & /* columns */, size_t /* row */){},false};
InputFormatPtr input_format = std::make_shared<TabSeparatedRowInputFormat>(sample, in_buf, params, false, false, format_settings);
InputFormatPtr input_format = std::make_shared<TabSeparatedRowInputFormat>(sample, in_buf, in_params, false, false, format_settings);
BlockInputStreamPtr block_input = std::make_shared<InputStreamFromInputFormat>(std::move(input_format));
BlockOutputStreamPtr block_output = std::make_shared<OutputStreamToOutputFormat>(
std::make_shared<TabSeparatedRowOutputFormat>(out_buf, sample, false, false, [](const Columns & /* columns */, size_t /* row */){}, format_settings));
std::make_shared<TabSeparatedRowOutputFormat>(out_buf, sample, false, false, out_params, format_settings));
copyData(*block_input, *block_output);
return 0;

View File

@ -60,12 +60,6 @@ public:
*/
virtual ~WriteBuffer() {}
/**
* some buffers (kafka / rabbit) split the rows internally, so we can push there formats without
* framing / delimiters (like ProtobufSingle)
*/
virtual bool producesIsolatedRows() { return false; }
inline void nextIfAtEnd()
{
if (!hasPendingData())

View File

@ -9,13 +9,20 @@
namespace DB
{
/// Common parameters for generating blocks.
struct RowOutputFormatParams
{
using WriteCallback = std::function<void(const Columns & columns,size_t row)>;
// Callback used to indicate that another row is written.
WriteCallback callback;
/**
* some buffers (kafka / rabbit) split the rows internally using callback
* so we can push there formats without framing / delimiters
* (like ProtobufSingle). In other cases you can't write more than single row
* in unframed format.
*/
bool ignore_no_row_delimiter = false;
};
class WriteBuffer;
@ -26,6 +33,7 @@ class IRowOutputFormat : public IOutputFormat
{
protected:
DataTypes types;
bool first_row = true;
void consume(Chunk chunk) override;
void consumeTotals(Chunk chunk) override;
@ -66,7 +74,6 @@ public:
virtual void writeLastSuffix() {} /// Write something after resultset, totals end extremes.
private:
bool first_row = true;
bool prefix_written = false;
bool suffix_written = false;

View File

@ -11,10 +11,10 @@
namespace DB
{
ProtobufRowInputFormat::ProtobufRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSchemaInfo & info_, const bool single_message_mode_)
ProtobufRowInputFormat::ProtobufRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSchemaInfo & info_, const bool use_length_delimiters_)
: IRowInputFormat(header_, in_, params_)
, data_types(header_.getDataTypes())
, reader(in, ProtobufSchemas::instance().getMessageTypeForFormatSchema(info_), header_.getNames(), single_message_mode_)
, reader(in, ProtobufSchemas::instance().getMessageTypeForFormatSchema(info_), header_.getNames(), use_length_delimiters_)
{
}
@ -67,9 +67,9 @@ void ProtobufRowInputFormat::syncAfterError()
void registerInputFormatProcessorProtobuf(FormatFactory & factory)
{
for (bool single_message_mode : {false, true})
for (bool use_length_delimiters : {false, true})
{
factory.registerInputFormatProcessor(single_message_mode ? "ProtobufSingle" : "Protobuf", [single_message_mode](
factory.registerInputFormatProcessor(use_length_delimiters ? "Protobuf" : "ProtobufSingle", [use_length_delimiters](
ReadBuffer & buf,
const Block & sample,
IRowInputFormat::Params params,
@ -78,7 +78,7 @@ void registerInputFormatProcessorProtobuf(FormatFactory & factory)
return std::make_shared<ProtobufRowInputFormat>(buf, sample, std::move(params),
FormatSchemaInfo(settings.schema.format_schema, "Protobuf", true,
settings.schema.is_server, settings.schema.format_schema_path),
single_message_mode);
use_length_delimiters);
});
}
}

View File

@ -18,8 +18,8 @@ class FormatSchemaInfo;
/** Stream designed to deserialize data from the google protobuf format.
* One Protobuf message is parsed as one row of data.
*
* Input buffer may contain single protobuf message (single_message_mode_ = true),
* or any number of messages (single_message_mode_ = false). In the second case
* Input buffer may contain single protobuf message (use_length_delimiters_ = false),
* or any number of messages (use_length_delimiters = true). In the second case
* parser assumes messages are length-delimited according to documentation
* https://github.com/protocolbuffers/protobuf/blob/master/src/google/protobuf/util/delimited_message_util.h
* Parsing of the protobuf format requires the 'format_schema' setting to be set, e.g.
@ -29,7 +29,7 @@ class FormatSchemaInfo;
class ProtobufRowInputFormat : public IRowInputFormat
{
public:
ProtobufRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSchemaInfo & info_, const bool single_message_mode_);
ProtobufRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSchemaInfo & info_, const bool use_length_delimiters_);
~ProtobufRowInputFormat() override;
String getName() const override { return "ProtobufRowInputFormat"; }

View File

@ -14,6 +14,7 @@ namespace DB
{
namespace ErrorCodes
{
extern const int TOO_MANY_ROWS;
}
@ -22,16 +23,22 @@ ProtobufRowOutputFormat::ProtobufRowOutputFormat(
const Block & header,
const RowOutputFormatParams & params,
const FormatSchemaInfo & format_schema,
const bool single_message_mode_)
const bool use_length_delimiters_)
: IRowOutputFormat(header, out_, params)
, data_types(header.getDataTypes())
, writer(out, ProtobufSchemas::instance().getMessageTypeForFormatSchema(format_schema), header.getNames(), single_message_mode_)
, writer(out, ProtobufSchemas::instance().getMessageTypeForFormatSchema(format_schema), header.getNames(), use_length_delimiters_)
, throw_on_multiple_rows_undelimited(!use_length_delimiters_ && !params.ignore_no_row_delimiter)
{
value_indices.resize(header.columns());
}
void ProtobufRowOutputFormat::write(const Columns & columns, size_t row_num)
{
if (throw_on_multiple_rows_undelimited && !first_row)
{
throw Exception("The ProtobufSingle format can't be used to write multiple rows because this format doesn't have any row delimiter.", ErrorCodes::TOO_MANY_ROWS);
}
writer.startMessage();
std::fill(value_indices.begin(), value_indices.end(), 0);
size_t column_index;
@ -44,11 +51,11 @@ void ProtobufRowOutputFormat::write(const Columns & columns, size_t row_num)
void registerOutputFormatProcessorProtobuf(FormatFactory & factory)
{
for (bool single_message_mode : {false, true})
for (bool use_length_delimiters : {false, true})
{
factory.registerOutputFormatProcessor(
single_message_mode ? "ProtobufSingle" : "Protobuf",
[single_message_mode](WriteBuffer & buf,
use_length_delimiters ? "Protobuf" : "ProtobufSingle",
[use_length_delimiters](WriteBuffer & buf,
const Block & header,
const RowOutputFormatParams & params,
const FormatSettings & settings)
@ -56,7 +63,7 @@ void registerOutputFormatProcessorProtobuf(FormatFactory & factory)
return std::make_shared<ProtobufRowOutputFormat>(buf, header, params,
FormatSchemaInfo(settings.schema.format_schema, "Protobuf", true,
settings.schema.is_server, settings.schema.format_schema_path),
single_message_mode);
use_length_delimiters);
});
}
}

View File

@ -26,7 +26,7 @@ namespace DB
/** Stream designed to serialize data in the google protobuf format.
* Each row is written as a separated message.
*
* With single_message_mode=1 it can write only single row as plain protobuf message,
* With use_length_delimiters=0 it can write only single row as plain protobuf message,
* otherwise Protobuf messages are delimited according to documentation
* https://github.com/protocolbuffers/protobuf/blob/master/src/google/protobuf/util/delimited_message_util.h
* Serializing in the protobuf format requires the 'format_schema' setting to be set, e.g.
@ -41,7 +41,7 @@ public:
const Block & header,
const RowOutputFormatParams & params,
const FormatSchemaInfo & format_schema,
const bool single_message_mode_);
const bool use_length_delimiters_);
String getName() const override { return "ProtobufRowOutputFormat"; }
@ -53,6 +53,7 @@ private:
DataTypes data_types;
ProtobufWriter writer;
std::vector<size_t> value_indices;
const bool throw_on_multiple_rows_undelimited;
};
}

View File

@ -9,8 +9,8 @@ namespace DB
RawBLOBRowOutputFormat::RawBLOBRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
FormatFactory::WriteCallback callback)
: IRowOutputFormat(header_, out_, callback)
const RowOutputFormatParams & params)
: IRowOutputFormat(header_, out_, params)
{
}
@ -27,10 +27,10 @@ void registerOutputFormatProcessorRawBLOB(FormatFactory & factory)
factory.registerOutputFormatProcessor("RawBLOB", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings &)
{
return std::make_shared<RawBLOBRowOutputFormat>(buf, sample, callback);
return std::make_shared<RawBLOBRowOutputFormat>(buf, sample, params);
});
}

View File

@ -30,7 +30,7 @@ public:
RawBLOBRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
FormatFactory::WriteCallback callback);
const RowOutputFormatParams & params);
String getName() const override { return "RawBLOBRowOutputFormat"; }

View File

@ -32,7 +32,7 @@ void KafkaBlockOutputStream::writePrefix()
if (!buffer)
throw Exception("Failed to create Kafka producer!", ErrorCodes::CANNOT_CREATE_IO_BUFFER);
child = FormatFactory::instance().getOutput(storage.getFormatName(), *buffer, getHeader(), *context, [this](const Columns & columns, size_t row){ buffer->countRow(columns, row); });
child = FormatFactory::instance().getOutput(storage.getFormatName(), *buffer, getHeader(), *context, [this](const Columns & columns, size_t row){ buffer->countRow(columns, row); }, true);
}
void KafkaBlockOutputStream::write(const Block & block)

View File

@ -25,8 +25,6 @@ public:
const Block & header);
~WriteBufferToKafkaProducer() override;
bool producesIsolatedRows() override { return true; }
void countRow(const Columns & columns, size_t row);
void flush();

View File

@ -46,7 +46,9 @@ void RabbitMQBlockOutputStream::writePrefix()
storage.getFormatName(), *buffer, getHeader(), context, [this](const Columns & /* columns */, size_t /* rows */)
{
buffer->countRow();
});
},
true
);
}

View File

@ -34,7 +34,6 @@ public:
);
~WriteBufferToRabbitMQProducer() override;
bool producesIsolatedRows() override { return true; }
void countRow();
void activateWriting() { writing_task->activateAndSchedule(); }