ProtobufSingle input format

Allows to read Single protobuf message at once (w/o length-delemeters).
This commit is contained in:
Mikhail Filimonov 2020-09-23 17:10:04 +02:00
parent 190fd88af9
commit b602d18dd8
No known key found for this signature in database
GPG Key ID: 6E49C2E9AF1220BE
6 changed files with 100 additions and 31 deletions

View File

@ -398,7 +398,12 @@ class IColumn;
M(Bool, force_optimize_skip_unused_shards_no_nested, false, "Obsolete setting, does nothing. Will be removed after 2020-12-01. Use force_optimize_skip_unused_shards_nesting instead.", 0) \ M(Bool, force_optimize_skip_unused_shards_no_nested, false, "Obsolete setting, does nothing. Will be removed after 2020-12-01. Use force_optimize_skip_unused_shards_nesting instead.", 0) \
M(Bool, experimental_use_processors, true, "Obsolete setting, does nothing. Will be removed after 2020-11-29.", 0) \ M(Bool, experimental_use_processors, true, "Obsolete setting, does nothing. Will be removed after 2020-11-29.", 0) \
M(Bool, optimize_trivial_insert_select, true, "Optimize trivial 'INSERT INTO table SELECT ... FROM TABLES' query", 0) \ M(Bool, optimize_trivial_insert_select, true, "Optimize trivial 'INSERT INTO table SELECT ... FROM TABLES' query", 0) \
M(Bool, allow_experimental_database_atomic, true, "Obsolete setting, does nothing. Will be removed after 2021-02-12", 0) M(Bool, allow_experimental_database_atomic, true, "Obsolete setting, does nothing. Will be removed after 2021-02-12", 0) \
M(Bool, allow_non_metadata_alters, true, "Allow to execute alters which affects not only tables metadata, but also data on disk", 0) \
M(Bool, enable_global_with_statement, false, "Propagate WITH statements to UNION queries and all subqueries", 0)
// End of COMMON_SETTINGS
// !!! please add settings related to formats into the FORMAT_FACTORY_SETTINGS below !!!
#define FORMAT_FACTORY_SETTINGS(M) \ #define FORMAT_FACTORY_SETTINGS(M) \
M(Char, format_csv_delimiter, ',', "The character to be considered as a delimiter in CSV data. If setting with a string, a string has to have a length of 1.", 0) \ M(Char, format_csv_delimiter, ',', "The character to be considered as a delimiter in CSV data. If setting with a string, a string has to have a length of 1.", 0) \
@ -462,9 +467,10 @@ class IColumn;
\ \
M(Bool, output_format_enable_streaming, false, "Enable streaming in output formats that support it.", 0) \ M(Bool, output_format_enable_streaming, false, "Enable streaming in output formats that support it.", 0) \
M(Bool, output_format_write_statistics, true, "Write statistics about read rows, bytes, time elapsed in suitable output formats.", 0) \ M(Bool, output_format_write_statistics, true, "Write statistics about read rows, bytes, time elapsed in suitable output formats.", 0) \
M(Bool, allow_non_metadata_alters, true, "Allow to execute alters which affects not only tables metadata, but also data on disk", 0) \ M(Bool, output_format_pretty_row_numbers, false, "Add row numbers before each row for pretty output format", 0)
M(Bool, output_format_pretty_row_numbers, false, "Add row numbers before each row for pretty output format", 0) \
M(Bool, enable_global_with_statement, false, "Propagate WITH statements to UNION queries and all subqueries", 0) \ // End of FORMAT_FACTORY_SETTINGS
// !!! please add settings non-related to formats into the COMMON_SETTINGS above !!!
#define LIST_OF_SETTINGS(M) \ #define LIST_OF_SETTINGS(M) \
COMMON_SETTINGS(M) \ COMMON_SETTINGS(M) \

View File

@ -38,36 +38,47 @@ namespace
// Those inequations helps checking conditions in ProtobufReader::SimpleReader. // Those inequations helps checking conditions in ProtobufReader::SimpleReader.
constexpr Int64 END_OF_VARINT = -1; constexpr Int64 END_OF_VARINT = -1;
constexpr Int64 END_OF_GROUP = -2; constexpr Int64 END_OF_GROUP = -2;
constexpr Int64 END_OF_FILE = -3;
Int64 decodeZigZag(UInt64 n) { return static_cast<Int64>((n >> 1) ^ (~(n & 1) + 1)); } Int64 decodeZigZag(UInt64 n) { return static_cast<Int64>((n >> 1) ^ (~(n & 1) + 1)); }
[[noreturn]] void throwUnknownFormat()
{
throw Exception("Protobuf messages are corrupted or don't match the provided schema. Please note that Protobuf stream is length-delimited: every message is prefixed by its length in varint.", ErrorCodes::UNKNOWN_PROTOBUF_FORMAT);
}
} }
// SimpleReader is an utility class to deserialize protobufs. // SimpleReader is an utility class to deserialize protobufs.
// Knows nothing about protobuf schemas, just provides useful functions to deserialize data. // Knows nothing about protobuf schemas, just provides useful functions to deserialize data.
ProtobufReader::SimpleReader::SimpleReader(ReadBuffer & in_) ProtobufReader::SimpleReader::SimpleReader(ReadBuffer & in_, const bool single_message_mode_)
: in(in_) : in(in_)
, cursor(0) , cursor(0)
, current_message_level(0) , current_message_level(0)
, current_message_end(0) , current_message_end(0)
, field_end(0) , field_end(0)
, last_string_pos(-1) , last_string_pos(-1)
, single_message_mode(single_message_mode_)
{ {
} }
[[noreturn]] void ProtobufReader::SimpleReader::throwUnknownFormat()
{
throw Exception(std::string("Protobuf messages are corrupted or don't match the provided schema.") + (single_message_mode ? "" : " Please note that Protobuf stream is length-delimited: every message is prefixed by its length in varint."), ErrorCodes::UNKNOWN_PROTOBUF_FORMAT);
}
bool ProtobufReader::SimpleReader::startMessage() bool ProtobufReader::SimpleReader::startMessage()
{ {
// Start reading a root message. // Start reading a root message.
assert(!current_message_level); assert(!current_message_level);
if (unlikely(in.eof())) if (unlikely(in.eof()))
return false; return false;
if (single_message_mode)
{
current_message_end = END_OF_FILE;
}
else
{
size_t size_of_message = readVarint(); size_t size_of_message = readVarint();
current_message_end = cursor + size_of_message; current_message_end = cursor + size_of_message;
}
++current_message_level; ++current_message_level;
field_end = cursor; field_end = cursor;
return true; return true;
@ -150,7 +161,15 @@ bool ProtobufReader::SimpleReader::readFieldNumber(UInt32 & field_number)
throwUnknownFormat(); throwUnknownFormat();
} }
if ((cursor >= current_message_end) && (current_message_end != END_OF_GROUP)) if (current_message_end == END_OF_FILE)
{
if (unlikely(in.eof()))
{
current_message_end = cursor;
return false;
}
}
else if ((cursor >= current_message_end) && (current_message_end != END_OF_GROUP))
return false; return false;
UInt64 varint = readVarint(); UInt64 varint = readVarint();
@ -1077,8 +1096,8 @@ std::unique_ptr<ProtobufReader::IConverter> ProtobufReader::createConverter<goog
ProtobufReader::ProtobufReader( ProtobufReader::ProtobufReader(
ReadBuffer & in_, const google::protobuf::Descriptor * message_type, const std::vector<String> & column_names) ReadBuffer & in_, const google::protobuf::Descriptor * message_type, const std::vector<String> & column_names, const bool single_message_mode_)
: simple_reader(in_) : simple_reader(in_, single_message_mode_)
{ {
root_message = ProtobufColumnMatcher::matchColumns<ColumnMatcherTraits>(column_names, message_type); root_message = ProtobufColumnMatcher::matchColumns<ColumnMatcherTraits>(column_names, message_type);
setTraitsDataAfterMatchingColumns(root_message.get()); setTraitsDataAfterMatchingColumns(root_message.get());

View File

@ -37,7 +37,7 @@ using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>;
class ProtobufReader : private boost::noncopyable class ProtobufReader : private boost::noncopyable
{ {
public: public:
ProtobufReader(ReadBuffer & in_, const google::protobuf::Descriptor * message_type, const std::vector<String> & column_names); ProtobufReader(ReadBuffer & in_, const google::protobuf::Descriptor * message_type, const std::vector<String> & column_names, const bool single_message_mode_);
~ProtobufReader(); ~ProtobufReader();
/// Should be called when we start reading a new message. /// Should be called when we start reading a new message.
@ -93,7 +93,7 @@ private:
class SimpleReader class SimpleReader
{ {
public: public:
SimpleReader(ReadBuffer & in_); SimpleReader(ReadBuffer & in_, const bool single_message_mode_);
bool startMessage(); bool startMessage();
void endMessage(bool ignore_errors); void endMessage(bool ignore_errors);
void startNestedMessage(); void startNestedMessage();
@ -126,6 +126,7 @@ private:
UInt64 continueReadingVarint(UInt64 first_byte); UInt64 continueReadingVarint(UInt64 first_byte);
void ignoreVarint(); void ignoreVarint();
void ignoreGroup(); void ignoreGroup();
[[noreturn]] void throwUnknownFormat();
ReadBuffer & in; ReadBuffer & in;
Int64 cursor; Int64 cursor;
@ -134,6 +135,7 @@ private:
std::vector<Int64> parent_message_ends; std::vector<Int64> parent_message_ends;
Int64 field_end; Int64 field_end;
Int64 last_string_pos; Int64 last_string_pos;
const bool single_message_mode;
}; };
class IConverter class IConverter

View File

@ -11,10 +11,10 @@
namespace DB namespace DB
{ {
ProtobufRowInputFormat::ProtobufRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSchemaInfo & info_) ProtobufRowInputFormat::ProtobufRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSchemaInfo & info_, const bool single_message_mode_)
: IRowInputFormat(header_, in_, params_) : IRowInputFormat(header_, in_, params_)
, data_types(header_.getDataTypes()) , data_types(header_.getDataTypes())
, reader(in, ProtobufSchemas::instance().getMessageTypeForFormatSchema(info_), header_.getNames()) , reader(in, ProtobufSchemas::instance().getMessageTypeForFormatSchema(info_), header_.getNames(), single_message_mode_)
{ {
} }
@ -67,7 +67,9 @@ void ProtobufRowInputFormat::syncAfterError()
void registerInputFormatProcessorProtobuf(FormatFactory & factory) void registerInputFormatProcessorProtobuf(FormatFactory & factory)
{ {
factory.registerInputFormatProcessor("Protobuf", []( for (bool single_message_mode : {false, true})
{
factory.registerInputFormatProcessor(single_message_mode ? "ProtobufSingle" : "Protobuf", [single_message_mode](
ReadBuffer & buf, ReadBuffer & buf,
const Block & sample, const Block & sample,
IRowInputFormat::Params params, IRowInputFormat::Params params,
@ -75,9 +77,11 @@ void registerInputFormatProcessorProtobuf(FormatFactory & factory)
{ {
return std::make_shared<ProtobufRowInputFormat>(buf, sample, std::move(params), return std::make_shared<ProtobufRowInputFormat>(buf, sample, std::move(params),
FormatSchemaInfo(settings.schema.format_schema, "Protobuf", true, FormatSchemaInfo(settings.schema.format_schema, "Protobuf", true,
settings.schema.is_server, settings.schema.format_schema_path)); settings.schema.is_server, settings.schema.format_schema_path),
single_message_mode);
}); });
} }
}
} }

View File

@ -16,17 +16,20 @@ class FormatSchemaInfo;
/** Stream designed to deserialize data from the google protobuf format. /** Stream designed to deserialize data from the google protobuf format.
* Each row is read as a separated message. * One Protobuf message is parsed as one row of data.
* These messages are delimited according to documentation *
* Input buffer may contain single protobuf message (single_message_mode_ = true),
* or any number of messages (single_message_mode_ = false). In the second case
* parser assumes messages are length-delimited according to documentation
* https://github.com/protocolbuffers/protobuf/blob/master/src/google/protobuf/util/delimited_message_util.h * https://github.com/protocolbuffers/protobuf/blob/master/src/google/protobuf/util/delimited_message_util.h
* Serializing in the protobuf format requires the 'format_schema' setting to be set, e.g. * Parsing of the protobuf format requires the 'format_schema' setting to be set, e.g.
* INSERT INTO table FORMAT Protobuf SETTINGS format_schema = 'schema:Message' * INSERT INTO table FORMAT Protobuf SETTINGS format_schema = 'schema:Message'
* where schema is the name of "schema.proto" file specifying protobuf schema. * where schema is the name of "schema.proto" file specifying protobuf schema.
*/ */
class ProtobufRowInputFormat : public IRowInputFormat class ProtobufRowInputFormat : public IRowInputFormat
{ {
public: public:
ProtobufRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSchemaInfo & info_); ProtobufRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSchemaInfo & info_, const bool single_message_mode_);
~ProtobufRowInputFormat() override; ~ProtobufRowInputFormat() override;
String getName() const override { return "ProtobufRowInputFormat"; } String getName() const override { return "ProtobufRowInputFormat"; }

View File

@ -103,6 +103,17 @@ def kafka_produce_protobuf_messages(topic, start_index, num_messages):
producer.flush() producer.flush()
print(("Produced {} messages for topic {}".format(num_messages, topic))) print(("Produced {} messages for topic {}".format(num_messages, topic)))
def kafka_produce_protobuf_messages_no_delimeters(topic, start_index, num_messages):
data = ''
producer = KafkaProducer(bootstrap_servers="localhost:9092")
for i in range(start_index, start_index + num_messages):
msg = kafka_pb2.KeyValuePair()
msg.key = i
msg.value = str(i)
serialized_msg = msg.SerializeToString()
producer.send(topic=topic, value=serialized_msg)
producer.flush()
print("Produced {} messages for topic {}".format(num_messages, topic))
def avro_confluent_message(schema_registry_client, value): def avro_confluent_message(schema_registry_client, value):
# type: (CachedSchemaRegistryClient, dict) -> str # type: (CachedSchemaRegistryClient, dict) -> str
@ -971,6 +982,30 @@ def test_kafka_protobuf(kafka_cluster):
kafka_check_result(result, True) kafka_check_result(result, True)
@pytest.mark.timeout(180)
def test_kafka_protobuf_no_delimiter(kafka_cluster):
instance.query('''
CREATE TABLE test.kafka (key UInt64, value String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'pb_no_delimiter',
kafka_group_name = 'pb_no_delimiter',
kafka_format = 'ProtobufSingle',
kafka_schema = 'kafka.proto:KeyValuePair';
''')
kafka_produce_protobuf_messages_no_delimeters('pb_no_delimiter', 0, 20)
kafka_produce_protobuf_messages_no_delimeters('pb_no_delimiter', 20, 1)
kafka_produce_protobuf_messages_no_delimeters('pb_no_delimiter', 21, 29)
result = ''
while True:
result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
if kafka_check_result(result):
break
kafka_check_result(result, True)
@pytest.mark.timeout(180) @pytest.mark.timeout(180)
def test_kafka_materialized_view(kafka_cluster): def test_kafka_materialized_view(kafka_cluster):
instance.query(''' instance.query('''