From 15e3dbe3f210e5478825de8c997d3513c3ad890f Mon Sep 17 00:00:00 2001 From: Vitaly Baranov Date: Tue, 30 Nov 2021 14:43:30 +0300 Subject: [PATCH 1/2] Fix skipping columns in Nested while writing protobuf. --- src/Formats/ProtobufSerializer.cpp | 132 +++++++++++------- ..._format_skipped_column_in_nested.reference | 27 ++++ ...rotobuf_format_skipped_column_in_nested.sh | 55 ++++++++ ...obuf_format_skipped_column_in_nested.proto | 29 ++++ 4 files changed, 194 insertions(+), 49 deletions(-) create mode 100644 tests/queries/0_stateless/00825_protobuf_format_skipped_column_in_nested.reference create mode 100755 tests/queries/0_stateless/00825_protobuf_format_skipped_column_in_nested.sh create mode 100644 tests/queries/0_stateless/format_schemas/00825_protobuf_format_skipped_column_in_nested.proto diff --git a/src/Formats/ProtobufSerializer.cpp b/src/Formats/ProtobufSerializer.cpp index ac89203c6e0..94a385aa067 100644 --- a/src/Formats/ProtobufSerializer.cpp +++ b/src/Formats/ProtobufSerializer.cpp @@ -2062,7 +2062,7 @@ namespace }; ProtobufSerializerMessage( - std::vector field_descs_, + std::vector && field_descs_, const FieldDescriptor * parent_field_descriptor_, bool with_length_delimiter_, const ProtobufReaderOrWriter & reader_or_writer_) @@ -2091,8 +2091,10 @@ namespace for (const FieldInfo & info : field_infos) { field_columns.clear(); + field_columns.reserve(info.column_indices.size()); for (size_t column_index : info.column_indices) { + assert(column_index < num_columns_); field_columns.emplace_back(columns_[column_index]); } info.field_serializer->setColumns(field_columns.data(), field_columns.size()); @@ -2103,11 +2105,9 @@ namespace missing_column_indices.resize(num_columns_); for (size_t column_index : collections::range(num_columns_)) missing_column_indices[column_index] = column_index; - for (const FieldInfo & info : field_infos) - { - for (size_t column_index : info.column_indices) + for (const auto & field_info : field_infos) + for (size_t column_index : field_info.column_indices) missing_column_indices[column_index] = static_cast(-1); - } boost::range::remove_erase(missing_column_indices, static_cast(-1)); } } @@ -2195,6 +2195,7 @@ namespace reader->endNestedMessage(); else reader->endMessage(false); + addDefaultsToMissingColumns(row_num); } @@ -2229,9 +2230,9 @@ namespace void addDefaultsToMissingColumns(size_t row_num) { - for (size_t column_idx : missing_column_indices) + for (size_t column_index : missing_column_indices) { - auto & column = columns[column_idx]; + auto & column = columns[column_index]; size_t old_size = column->size(); if (row_num >= old_size) column->assumeMutableRef().insertDefault(); @@ -2241,7 +2242,7 @@ namespace struct FieldInfo { FieldInfo( - std::vector column_indices_, + std::vector && column_indices_, const FieldDescriptor & field_descriptor_, std::unique_ptr field_serializer_) : column_indices(std::move(column_indices_)) @@ -2277,8 +2278,8 @@ namespace class ProtobufSerializerTupleAsNestedMessage : public ProtobufSerializer { public: - explicit ProtobufSerializerTupleAsNestedMessage(std::unique_ptr nested_message_serializer_) - : nested_message_serializer(std::move(nested_message_serializer_)) + explicit ProtobufSerializerTupleAsNestedMessage(std::unique_ptr message_serializer_) + : message_serializer(std::move(message_serializer_)) { } @@ -2292,7 +2293,7 @@ namespace element_columns.reserve(tuple_size); for (size_t i : collections::range(tuple_size)) element_columns.emplace_back(column_tuple.getColumnPtr(i)); - nested_message_serializer->setColumns(element_columns.data(), element_columns.size()); + message_serializer->setColumns(element_columns.data(), element_columns.size()); } void setColumns(const MutableColumnPtr * columns, [[maybe_unused]] size_t num_columns) override @@ -2302,12 +2303,12 @@ namespace setColumns(&column0, 1); } - void writeRow(size_t row_num) override { nested_message_serializer->writeRow(row_num); } - void readRow(size_t row_num) override { nested_message_serializer->readRow(row_num); } - void insertDefaults(size_t row_num) override { nested_message_serializer->insertDefaults(row_num); } + void writeRow(size_t row_num) override { message_serializer->writeRow(row_num); } + void readRow(size_t row_num) override { message_serializer->readRow(row_num); } + void insertDefaults(size_t row_num) override { message_serializer->insertDefaults(row_num); } private: - const std::unique_ptr nested_message_serializer; + const std::unique_ptr message_serializer; }; @@ -2317,8 +2318,8 @@ namespace { public: explicit ProtobufSerializerFlattenedNestedAsArrayOfNestedMessages( - std::unique_ptr nested_message_serializer_) - : nested_message_serializer(std::move(nested_message_serializer_)) + std::unique_ptr message_serializer_) + : message_serializer(std::move(message_serializer_)) { } @@ -2340,7 +2341,7 @@ namespace std::sort(offset_columns.begin(), offset_columns.end()); offset_columns.erase(std::unique(offset_columns.begin(), offset_columns.end()), offset_columns.end()); - nested_message_serializer->setColumns(data_columns.data(), data_columns.size()); + message_serializer->setColumns(data_columns.data(), data_columns.size()); } void setColumns(const MutableColumnPtr * columns, size_t num_columns) override @@ -2364,7 +2365,7 @@ namespace throw Exception("Components of FlattenedNested have different sizes", ErrorCodes::PROTOBUF_BAD_CAST); } for (size_t i : collections::range(start_offset, end_offset)) - nested_message_serializer->writeRow(i); + message_serializer->writeRow(i); } void readRow(size_t row_num) override @@ -2377,7 +2378,7 @@ namespace try { - nested_message_serializer->readRow(old_data_size); + message_serializer->readRow(old_data_size); size_t data_size = data_columns[0]->size(); if (data_size != old_data_size + 1) throw Exception("Unexpected number of elements of ColumnArray has been read", ErrorCodes::LOGICAL_ERROR); @@ -2433,7 +2434,7 @@ namespace } private: - const std::unique_ptr nested_message_serializer; + const std::unique_ptr message_serializer; Columns data_columns; Columns offset_columns; }; @@ -2445,7 +2446,7 @@ namespace public: explicit ProtobufSerializerBuilder(const ProtobufReaderOrWriter & reader_or_writer_) : reader_or_writer(reader_or_writer_) {} - std::unique_ptr buildMessageSerializer( + std::unique_ptr buildMessageSerializer( const Strings & column_names, const DataTypes & data_types, std::vector & missing_column_indices, @@ -2453,16 +2454,17 @@ namespace bool with_length_delimiter) { std::vector used_column_indices; - auto serializer = buildMessageSerializerImpl( + auto message_serializer = buildMessageSerializerImpl( /* num_columns = */ column_names.size(), column_names.data(), data_types.data(), - used_column_indices, message_descriptor, with_length_delimiter, - /* parent_field_descriptor = */ nullptr); + /* parent_field_descriptor = */ nullptr, + used_column_indices, + /* columns_are_reordered_outside = */ false); - if (!serializer) + if (!message_serializer) { throw Exception( "Not found matches between the names of the columns {" + boost::algorithm::join(column_names, ", ") @@ -2473,10 +2475,12 @@ namespace missing_column_indices.clear(); missing_column_indices.reserve(column_names.size() - used_column_indices.size()); - boost::range::set_difference(collections::range(column_names.size()), used_column_indices, + auto used_column_indices_sorted = std::move(used_column_indices); + std::sort(used_column_indices_sorted.begin(), used_column_indices_sorted.end()); + boost::range::set_difference(collections::range(column_names.size()), used_column_indices_sorted, std::back_inserter(missing_column_indices)); - return serializer; + return message_serializer; } private: @@ -2621,24 +2625,38 @@ namespace } /// Builds a serializer for a protobuf message (root or nested). + /// + /// Some of the passed columns might be skipped, the function sets `used_column_indices` to + /// the list of those columns which match any fields in the protobuf message. + /// + /// Normally `columns_are_reordered_outside` should be false - if it's false it means that + /// the used column indices will be passed to ProtobufSerializerMessage, which will write/read + /// only those columns and set the rest of columns by default. + /// Set `columns_are_reordered_outside` to true if you're going to reorder columns + /// according to `used_column_indices` returned and pass to + /// ProtobufSerializerMessage::setColumns() only the columns which are actually used. template std::unique_ptr buildMessageSerializerImpl( size_t num_columns, const StringOrStringViewT * column_names, const DataTypePtr * data_types, - std::vector & used_column_indices, const MessageDescriptor & message_descriptor, bool with_length_delimiter, - const FieldDescriptor * parent_field_descriptor) + const FieldDescriptor * parent_field_descriptor, + std::vector & used_column_indices, + bool columns_are_reordered_outside) { std::vector field_descs; boost::container::flat_map field_descriptors_in_use; used_column_indices.clear(); used_column_indices.reserve(num_columns); + boost::container::flat_set used_column_indices_sorted; + used_column_indices_sorted.reserve(num_columns); + size_t sequential_column_index = 0; auto add_field_serializer = [&](const std::string_view & column_name_, - std::vector column_indices_, + std::vector && column_indices_, const FieldDescriptor & field_descriptor_, std::unique_ptr field_serializer_) { @@ -2652,12 +2670,17 @@ namespace ErrorCodes::MULTIPLE_COLUMNS_SERIALIZED_TO_SAME_PROTOBUF_FIELD); } - for (size_t column_index : column_indices_) + used_column_indices.insert(used_column_indices.end(), column_indices_.begin(), column_indices_.end()); + used_column_indices_sorted.insert(column_indices_.begin(), column_indices_.end()); + + auto column_indices_to_pass_to_message_serializer = std::move(column_indices_); + if (columns_are_reordered_outside) { - /// Keep `used_column_indices` sorted. - used_column_indices.insert(boost::range::upper_bound(used_column_indices, column_index), column_index); + for (auto & index : column_indices_to_pass_to_message_serializer) + index = sequential_column_index++; } - field_descs.push_back({std::move(column_indices_), &field_descriptor_, std::move(field_serializer_)}); + + field_descs.push_back({std::move(column_indices_to_pass_to_message_serializer), &field_descriptor_, std::move(field_serializer_)}); field_descriptors_in_use.emplace(&field_descriptor_, column_name_); }; @@ -2666,7 +2689,7 @@ namespace /// We're going through all the passed columns. for (size_t column_idx : collections::range(num_columns)) { - if (boost::range::binary_search(used_column_indices, column_idx)) + if (used_column_indices_sorted.count(column_idx)) continue; const auto & column_name = column_names[column_idx]; @@ -2702,7 +2725,7 @@ namespace for (size_t j : collections::range(column_idx + 1, num_columns)) { - if (boost::range::binary_search(used_column_indices, j)) + if (used_column_indices_sorted.count(j)) continue; std::string_view other_suffix; if (!columnNameStartsWithFieldName(column_names[j], *field_descriptor, other_suffix)) @@ -2740,10 +2763,15 @@ namespace nested_column_names.size(), nested_column_names.data(), nested_data_types.data(), - used_column_indices_in_nested, *field_descriptor->message_type(), - false, - field_descriptor); + /* with_length_delimiter = */ false, + field_descriptor, + used_column_indices_in_nested, + /* columns_are_reordered_outside = */ true); + + /// `columns_are_reordered_outside` is true because column indices are + /// going to be transformed and then written to the outer message, + /// see add_field_serializer() below. if (nested_message_serializer) { @@ -2774,10 +2802,15 @@ namespace nested_column_names.size(), nested_column_names.data(), nested_data_types.data(), - used_column_indices_in_nested, *field_descriptor->message_type(), - false, - field_descriptor); + /* with_length_delimiter = */ false, + field_descriptor, + used_column_indices_in_nested, + /* columns_are_reordered_outside = */ true); + + /// `columns_are_reordered_outside` is true because column indices are + /// going to be transformed and then written to the outer message, + /// see add_field_serializer() below. if (nested_message_serializer) { @@ -2907,16 +2940,17 @@ namespace { /// Try to serialize as a nested message. std::vector used_column_indices; - auto nested_message_serializer = buildMessageSerializerImpl( + auto message_serializer = buildMessageSerializerImpl( size_of_tuple, tuple_data_type.getElementNames().data(), tuple_data_type.getElements().data(), - used_column_indices, *field_descriptor.message_type(), - false, - &field_descriptor); + /* with_length_delimiter = */ false, + &field_descriptor, + used_column_indices, + /* columns_are_reordered_outside = */ false); - if (!nested_message_serializer) + if (!message_serializer) { throw Exception( "Not found matches between the names of the tuple's elements {" @@ -2926,7 +2960,7 @@ namespace ErrorCodes::NO_COLUMNS_SERIALIZED_TO_PROTOBUF_FIELDS); } - return std::make_unique(std::move(nested_message_serializer)); + return std::make_unique(std::move(message_serializer)); } /// Serialize as a repeated field. diff --git a/tests/queries/0_stateless/00825_protobuf_format_skipped_column_in_nested.reference b/tests/queries/0_stateless/00825_protobuf_format_skipped_column_in_nested.reference new file mode 100644 index 00000000000..1a80e6401db --- /dev/null +++ b/tests/queries/0_stateless/00825_protobuf_format_skipped_column_in_nested.reference @@ -0,0 +1,27 @@ +e4048ead-30a2-45e5-90be-2af1c7137523 dummy [1] [50639] [58114] [[5393]] [[1]] [[3411]] [[17811]] [[(10,20)]] + +Binary representation: +00000000 44 0a 24 65 34 30 34 38 65 61 64 2d 33 30 61 32 |D.$e4048ead-30a2| +00000010 2d 34 35 65 35 2d 39 30 62 65 2d 32 61 66 31 63 |-45e5-90be-2af1c| +00000020 37 31 33 37 35 32 33 62 1c 10 01 18 cf 8b 03 20 |7137523b....... | +00000030 82 c6 03 5a 10 28 01 30 91 2a 40 93 8b 01 52 05 |...Z.(.0.*@...R.| +00000040 4d 00 00 a0 41 |M...A| +00000045 + +MESSAGE #1 AT 0x00000001 +identifier: "e4048ead-30a2-45e5-90be-2af1c7137523" +modules { + module_id: 1 + supply: 50639 + temp: 58114 + nodes { + node_id: 1 + opening_time: 5393 + current: 17811 + coords { + y: 20 + } + } +} + +Binary representation is as expected diff --git a/tests/queries/0_stateless/00825_protobuf_format_skipped_column_in_nested.sh b/tests/queries/0_stateless/00825_protobuf_format_skipped_column_in_nested.sh new file mode 100755 index 00000000000..b413385fb77 --- /dev/null +++ b/tests/queries/0_stateless/00825_protobuf_format_skipped_column_in_nested.sh @@ -0,0 +1,55 @@ +#!/usr/bin/env bash +# Tags: no-fasttest + +# https://github.com/ClickHouse/ClickHouse/issues/31160 + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +SCHEMADIR=$CURDIR/format_schemas +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +set -eo pipefail + +# Run the client. +$CLICKHOUSE_CLIENT --multiquery < "$BINARY_FILE_PATH" + +# Check the output in the protobuf format +echo +$CURDIR/helpers/protobuf_length_delimited_encoder.py --decode_and_check --format_schema "$SCHEMADIR/00825_protobuf_format_skipped_column_in_nested:UpdateMessage" --input "$BINARY_FILE_PATH" + +# Check the input in the protobuf format (now the table contains the same data twice). +#echo +#$CLICKHOUSE_CLIENT --query "INSERT INTO table_skipped_column_in_nested_00825 FORMAT Protobuf SETTINGS format_schema='$SCHEMADIR/00825_protobuf_format_skipped_column_in_nested:UpdateMessage'" < "$BINARY_FILE_PATH" +#$CLICKHOUSE_CLIENT --query "SELECT * FROM table_skipped_column_in_nested_00825" + +rm "$BINARY_FILE_PATH" +$CLICKHOUSE_CLIENT --query "DROP TABLE table_skipped_column_in_nested_00825" diff --git a/tests/queries/0_stateless/format_schemas/00825_protobuf_format_skipped_column_in_nested.proto b/tests/queries/0_stateless/format_schemas/00825_protobuf_format_skipped_column_in_nested.proto new file mode 100644 index 00000000000..054de349e24 --- /dev/null +++ b/tests/queries/0_stateless/format_schemas/00825_protobuf_format_skipped_column_in_nested.proto @@ -0,0 +1,29 @@ +syntax = "proto3"; + +message UpdateMessage { + string identifier = 1; + //string unused1 = 100; + + message Module { + uint32 module_id = 2; + uint32 supply = 3; + uint32 temp = 4; + + message ModuleNode { + uint32 node_id = 5; + uint32 opening_time = 6; + uint32 closing_time = 7; // The column in the table is named `closing_time_time` + uint32 current = 8; + + message Coords { + //float x = 8; + float y = 9; + } + Coords coords = 10; + } + + repeated ModuleNode nodes = 11; + } + + repeated Module modules = 12; +} From 2e0b4800440e76a9877ce6f8871b904d01c421aa Mon Sep 17 00:00:00 2001 From: Vitaly Baranov Date: Wed, 1 Dec 2021 21:19:47 +0300 Subject: [PATCH 2/2] Improve error handling while serializing protobufs. --- src/Formats/ProtobufSerializer.cpp | 494 ++++++++++++++++++++--------- src/Formats/ProtobufSerializer.h | 4 +- 2 files changed, 351 insertions(+), 147 deletions(-) diff --git a/src/Formats/ProtobufSerializer.cpp b/src/Formats/ProtobufSerializer.cpp index 94a385aa067..efe01740cf6 100644 --- a/src/Formats/ProtobufSerializer.cpp +++ b/src/Formats/ProtobufSerializer.cpp @@ -28,6 +28,7 @@ # include # include # include +# include # include # include # include @@ -139,6 +140,15 @@ namespace } + WriteBuffer & writeIndent(WriteBuffer & out, size_t size) { return out << String(size * 4, ' '); } + + + [[noreturn]] void wrongNumberOfColumns(size_t number_of_columns, const String & expected) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong number of columns: expected {}, specified {}", expected, number_of_columns); + } + + struct ProtobufReaderOrWriter { ProtobufReaderOrWriter(ProtobufReader & reader_) : reader(&reader_) {} // NOLINT(google-explicit-constructor) @@ -152,8 +162,12 @@ namespace class ProtobufSerializerSingleValue : public ProtobufSerializer { protected: - ProtobufSerializerSingleValue(const FieldDescriptor & field_descriptor_, const ProtobufReaderOrWriter & reader_or_writer_) - : field_descriptor(field_descriptor_) + ProtobufSerializerSingleValue( + const std::string_view & column_name_, + const FieldDescriptor & field_descriptor_, + const ProtobufReaderOrWriter & reader_or_writer_) + : column_name(column_name_) + , field_descriptor(field_descriptor_) , field_typeid(field_descriptor_.type()) , field_tag(field_descriptor.number()) , reader(reader_or_writer_.reader) @@ -164,13 +178,15 @@ namespace void setColumns(const ColumnPtr * columns, [[maybe_unused]] size_t num_columns) override { - assert(num_columns == 1); + if (num_columns != 1) + wrongNumberOfColumns(num_columns, "1"); column = columns[0]; } void setColumns(const MutableColumnPtr * columns, [[maybe_unused]] size_t num_columns) override { - assert(num_columns == 1); + if (num_columns != 1) + wrongNumberOfColumns(num_columns, "1"); column = columns[0]->getPtr(); } @@ -259,14 +275,28 @@ namespace return result; } + [[noreturn]] void incompatibleColumnType(const std::string_view & column_type) const + { + throw Exception( + ErrorCodes::DATA_TYPE_INCOMPATIBLE_WITH_PROTOBUF_FIELD, + "The column {} ({}) cannot be serialized to the field {} ({}) due to their types are not compatible", + quoteString(column_name), + column_type, + quoteString(field_descriptor.full_name()), + field_descriptor.type_name()); + } + [[noreturn]] void cannotConvertValue(const std::string_view & src_value, const std::string_view & src_type_name, const std::string_view & dest_type_name) const { throw Exception( - "Could not convert value '" + String{src_value} + "' from type " + String{src_type_name} + " to type " + String{dest_type_name} + - " while " + (reader ? "reading" : "writing") + " field " + field_descriptor.name(), + "Could not convert value '" + String{src_value} + "' from type " + String{src_type_name} + " to type " + + String{dest_type_name} + " while " + (reader ? "reading" : "writing") + " field " + + quoteString(field_descriptor.name()) + " " + (reader ? "for inserting into" : "extracted from") + " column " + + quoteString(column_name), ErrorCodes::PROTOBUF_BAD_CAST); } + const String column_name; const FieldDescriptor & field_descriptor; const FieldTypeId field_typeid; const int field_tag; @@ -289,8 +319,8 @@ namespace public: using ColumnType = ColumnVector; - ProtobufSerializerNumber(const FieldDescriptor & field_descriptor_, const ProtobufReaderOrWriter & reader_or_writer_) - : ProtobufSerializerSingleValue(field_descriptor_, reader_or_writer_) + ProtobufSerializerNumber(const std::string_view & column_name_, const FieldDescriptor & field_descriptor_, const ProtobufReaderOrWriter & reader_or_writer_) + : ProtobufSerializerSingleValue(column_name_, field_descriptor_, reader_or_writer_) { setFunctions(); } @@ -319,6 +349,13 @@ namespace column_vector.insertValue(getDefaultNumber()); } + void describeTree(WriteBuffer & out, size_t indent) const override + { + writeIndent(out, indent) << "ProtobufSerializerNumber<" << TypeName << ">: column " << quoteString(column_name) + << " -> field " << quoteString(field_descriptor.full_name()) << " (" << field_descriptor.type_name() + << ")\n"; + } + private: void setFunctions() { @@ -469,7 +506,7 @@ namespace case FieldTypeId::TYPE_ENUM: { if (std::is_floating_point_v) - failedToSetFunctions(); + incompatibleColumnType(TypeName); write_function = [this](NumberType value) { @@ -484,18 +521,10 @@ namespace } default: - failedToSetFunctions(); + incompatibleColumnType(TypeName); } } - [[noreturn]] void failedToSetFunctions() const - { - throw Exception( - "The field " + quoteString(field_descriptor.full_name()) + " has an incompatible type " + field_descriptor.type_name() - + " for serialization of the data type " + quoteString(TypeName), - ErrorCodes::DATA_TYPE_INCOMPATIBLE_WITH_PROTOBUF_FIELD); - } - NumberType getDefaultNumber() { if (!default_number) @@ -529,10 +558,11 @@ namespace using ColumnType = std::conditional_t; ProtobufSerializerString( + const std::string_view & column_name_, const std::shared_ptr & fixed_string_data_type_, const google::protobuf::FieldDescriptor & field_descriptor_, const ProtobufReaderOrWriter & reader_or_writer_) - : ProtobufSerializerSingleValue(field_descriptor_, reader_or_writer_) + : ProtobufSerializerSingleValue(column_name_, field_descriptor_, reader_or_writer_) , fixed_string_data_type(fixed_string_data_type_) , n(fixed_string_data_type->getN()) { @@ -542,8 +572,10 @@ namespace } ProtobufSerializerString( - const google::protobuf::FieldDescriptor & field_descriptor_, const ProtobufReaderOrWriter & reader_or_writer_) - : ProtobufSerializerSingleValue(field_descriptor_, reader_or_writer_) + const std::string_view & column_name_, + const google::protobuf::FieldDescriptor & field_descriptor_, + const ProtobufReaderOrWriter & reader_or_writer_) + : ProtobufSerializerSingleValue(column_name_, field_descriptor_, reader_or_writer_) { static_assert(!is_fixed_string, "This constructor for String only"); setFunctions(); @@ -649,6 +681,13 @@ namespace } } + void describeTree(WriteBuffer & out, size_t indent) const override + { + writeIndent(out, indent) << "ProtobufSerializerString<" << (is_fixed_string ? "fixed" : "") << ">: column " + << quoteString(column_name) << " -> field " << quoteString(field_descriptor.full_name()) << " (" + << field_descriptor.type_name() << ")\n"; + } + private: void setFunctions() { @@ -799,18 +838,10 @@ namespace } default: - failedToSetFunctions(); + this->incompatibleColumnType(is_fixed_string ? "FixedString" : "String"); } } - [[noreturn]] void failedToSetFunctions() - { - throw Exception( - "The field " + quoteString(field_descriptor.full_name()) + " has an incompatible type " + field_descriptor.type_name() - + " for serialization of the data type " + quoteString(is_fixed_string ? "FixedString" : "String"), - ErrorCodes::DATA_TYPE_INCOMPATIBLE_WITH_PROTOBUF_FIELD); - } - const PaddedPODArray & getDefaultString() { if (!default_string) @@ -890,16 +921,24 @@ namespace using BaseClass = ProtobufSerializerNumber; ProtobufSerializerEnum( + const std::string_view & column_name_, const std::shared_ptr & enum_data_type_, const FieldDescriptor & field_descriptor_, const ProtobufReaderOrWriter & reader_or_writer_) - : BaseClass(field_descriptor_, reader_or_writer_), enum_data_type(enum_data_type_) + : BaseClass(column_name_, field_descriptor_, reader_or_writer_), enum_data_type(enum_data_type_) { assert(enum_data_type); setFunctions(); prepareEnumMapping(); } + void describeTree(WriteBuffer & out, size_t indent) const override + { + writeIndent(out, indent) << "ProtobufSerializerEnum<" << TypeName << ">: column " << quoteString(this->column_name) + << " -> field " << quoteString(this->field_descriptor.full_name()) << " (" + << this->field_descriptor.type_name() << ")\n"; + } + private: void setFunctions() { @@ -964,18 +1003,10 @@ namespace } default: - failedToSetFunctions(); + this->incompatibleColumnType(enum_data_type->getName()); } } - [[noreturn]] void failedToSetFunctions() - { - throw Exception( - "The field " + quoteString(this->field_descriptor.full_name()) + " has an incompatible type " + this->field_descriptor.type_name() - + " for serialization of the data type " + quoteString(enum_data_type->getName()), - ErrorCodes::DATA_TYPE_INCOMPATIBLE_WITH_PROTOBUF_FIELD); - } - void checkEnumDataTypeValue(NumberType value) { enum_data_type->findByValue(value); /// Throws an exception if the value isn't defined in the DataTypeEnum. @@ -1089,10 +1120,11 @@ namespace using ColumnType = ColumnDecimal; ProtobufSerializerDecimal( + const std::string_view & column_name_, const DataTypeDecimalBase & decimal_data_type_, const FieldDescriptor & field_descriptor_, const ProtobufReaderOrWriter & reader_or_writer_) - : ProtobufSerializerSingleValue(field_descriptor_, reader_or_writer_) + : ProtobufSerializerSingleValue(column_name_, field_descriptor_, reader_or_writer_) , precision(decimal_data_type_.getPrecision()) , scale(decimal_data_type_.getScale()) { @@ -1123,6 +1155,13 @@ namespace column_decimal.insertValue(getDefaultDecimal()); } + void describeTree(WriteBuffer & out, size_t indent) const override + { + writeIndent(out, indent) << "ProtobufSerializerDecimal<" << TypeName << ">: column " << quoteString(column_name) + << " -> field " << quoteString(field_descriptor.full_name()) << " (" << field_descriptor.type_name() + << ")\n"; + } + private: void setFunctions() { @@ -1227,7 +1266,7 @@ namespace case FieldTypeId::TYPE_BOOL: { if (std::is_same_v) - failedToSetFunctions(); + incompatibleColumnType(TypeName); else { write_function = [this](const DecimalType & decimal) @@ -1281,18 +1320,10 @@ namespace } default: - failedToSetFunctions(); + incompatibleColumnType(TypeName); } } - [[noreturn]] void failedToSetFunctions() - { - throw Exception( - "The field " + quoteString(field_descriptor.full_name()) + " has an incompatible type " + field_descriptor.type_name() - + " for serialization of the data type " + quoteString(TypeName), - ErrorCodes::DATA_TYPE_INCOMPATIBLE_WITH_PROTOBUF_FIELD); - } - DecimalType getDefaultDecimal() { if (!default_decimal) @@ -1349,13 +1380,20 @@ namespace { public: ProtobufSerializerDate( + const std::string_view & column_name_, const FieldDescriptor & field_descriptor_, const ProtobufReaderOrWriter & reader_or_writer_) - : ProtobufSerializerNumber(field_descriptor_, reader_or_writer_) + : ProtobufSerializerNumber(column_name_, field_descriptor_, reader_or_writer_) { setFunctions(); } + void describeTree(WriteBuffer & out, size_t indent) const override + { + writeIndent(out, indent) << "ProtobufSerializerDate: column " << quoteString(column_name) << " -> field " + << quoteString(field_descriptor.full_name()) << " (" << field_descriptor.type_name() << ")\n"; + } + private: void setFunctions() { @@ -1395,7 +1433,7 @@ namespace } default: - failedToSetFunctions(); + incompatibleColumnType("Date"); } } @@ -1412,14 +1450,6 @@ namespace readDateText(date, buf); return date; } - - [[noreturn]] void failedToSetFunctions() - { - throw Exception( - "The field " + quoteString(field_descriptor.full_name()) + " has an incompatible type " + field_descriptor.type_name() - + " for serialization of the data type 'Date'", - ErrorCodes::DATA_TYPE_INCOMPATIBLE_WITH_PROTOBUF_FIELD); - } }; @@ -1428,15 +1458,22 @@ namespace { public: ProtobufSerializerDateTime( + const std::string_view & column_name_, const DataTypeDateTime & type, const FieldDescriptor & field_descriptor_, const ProtobufReaderOrWriter & reader_or_writer_) - : ProtobufSerializerNumber(field_descriptor_, reader_or_writer_), + : ProtobufSerializerNumber(column_name_, field_descriptor_, reader_or_writer_), date_lut(type.getTimeZone()) { setFunctions(); } + void describeTree(WriteBuffer & out, size_t indent) const override + { + writeIndent(out, indent) << "ProtobufSerializerDateTime: column " << quoteString(column_name) << " -> field " + << quoteString(field_descriptor.full_name()) << " (" << field_descriptor.type_name() << ")\n"; + } + protected: const DateLUTImpl & date_lut; @@ -1478,7 +1515,7 @@ namespace } default: - failedToSetFunctions(); + incompatibleColumnType("DateTime"); } } @@ -1497,14 +1534,6 @@ namespace tm = 0; return tm; } - - [[noreturn]] void failedToSetFunctions() - { - throw Exception( - "The field " + quoteString(field_descriptor.full_name()) + " has an incompatible type " + field_descriptor.type_name() - + " for serialization of the data type 'DateTime'", - ErrorCodes::DATA_TYPE_INCOMPATIBLE_WITH_PROTOBUF_FIELD); - } }; @@ -1513,9 +1542,10 @@ namespace { public: ProtobufSerializerUUID( + const std::string_view & column_name_, const google::protobuf::FieldDescriptor & field_descriptor_, const ProtobufReaderOrWriter & reader_or_writer_) - : ProtobufSerializerSingleValue(field_descriptor_, reader_or_writer_) + : ProtobufSerializerSingleValue(column_name_, field_descriptor_, reader_or_writer_) { setFunctions(); } @@ -1544,16 +1574,17 @@ namespace column_vector.insertDefault(); } + void describeTree(WriteBuffer & out, size_t indent) const override + { + writeIndent(out, indent) << "ProtobufSerializerUUID: column " << quoteString(column_name) << " -> field " + << quoteString(field_descriptor.full_name()) << " (" << field_descriptor.type_name() << ")\n"; + } + private: void setFunctions() { if ((field_typeid != FieldTypeId::TYPE_STRING) && (field_typeid != FieldTypeId::TYPE_BYTES)) - { - throw Exception( - "The field " + quoteString(field_descriptor.full_name()) + " has an incompatible type " + field_descriptor.type_name() - + " for serialization of the data type UUID", - ErrorCodes::DATA_TYPE_INCOMPATIBLE_WITH_PROTOBUF_FIELD); - } + incompatibleColumnType("UUID"); write_function = [this](UUID value) { @@ -1591,20 +1622,16 @@ namespace { public: ProtobufSerializerAggregateFunction( + const std::string_view & column_name_, const std::shared_ptr & aggregate_function_data_type_, const google::protobuf::FieldDescriptor & field_descriptor_, const ProtobufReaderOrWriter & reader_or_writer_) - : ProtobufSerializerSingleValue(field_descriptor_, reader_or_writer_) + : ProtobufSerializerSingleValue(column_name_, field_descriptor_, reader_or_writer_) , aggregate_function_data_type(aggregate_function_data_type_) , aggregate_function(aggregate_function_data_type->getFunction()) { if ((field_typeid != FieldTypeId::TYPE_STRING) && (field_typeid != FieldTypeId::TYPE_BYTES)) - { - throw Exception( - "The field " + quoteString(field_descriptor.full_name()) + " has an incompatible type " + field_descriptor.type_name() - + " for serialization of the data type " + quoteString(aggregate_function_data_type->getName()), - ErrorCodes::DATA_TYPE_INCOMPATIBLE_WITH_PROTOBUF_FIELD); - } + incompatibleColumnType(aggregate_function_data_type->getName()); } void writeRow(size_t row_num) override @@ -1642,6 +1669,12 @@ namespace column_af.getData().push_back(data); } + void describeTree(WriteBuffer & out, size_t indent) const override + { + writeIndent(out, indent) << "ProtobufSerializerAggregateFunction: column " << quoteString(column_name) << " -> field " + << quoteString(field_descriptor.full_name()) << " (" << field_descriptor.type_name() << ")\n"; + } + private: void dataToString(ConstAggregateDataPtr data, String & str) const { @@ -1684,7 +1717,8 @@ namespace void setColumns(const ColumnPtr * columns, [[maybe_unused]] size_t num_columns) override { - assert(num_columns == 1); + if (num_columns != 1) + wrongNumberOfColumns(num_columns, "1"); column = columns[0]; const auto & column_nullable = assert_cast(*column); ColumnPtr nested_column = column_nullable.getNestedColumnPtr(); @@ -1693,7 +1727,8 @@ namespace void setColumns(const MutableColumnPtr * columns, [[maybe_unused]] size_t num_columns) override { - assert(num_columns == 1); + if (num_columns != 1) + wrongNumberOfColumns(num_columns, "1"); ColumnPtr column0 = columns[0]->getPtr(); setColumns(&column0, 1); } @@ -1744,6 +1779,12 @@ namespace column_nullable.insertDefault(); } + void describeTree(WriteBuffer & out, size_t indent) const override + { + writeIndent(out, indent) << "ProtobufSerializerNullable ->\n"; + nested_serializer->describeTree(out, indent + 1); + } + private: const std::unique_ptr nested_serializer; ColumnPtr column; @@ -1761,7 +1802,8 @@ namespace void setColumns(const ColumnPtr * columns, [[maybe_unused]] size_t num_columns) override { - assert(num_columns == 1); + if (num_columns != 1) + wrongNumberOfColumns(num_columns, "1"); const auto & column_map = assert_cast(*columns[0]); ColumnPtr nested_column = column_map.getNestedColumnPtr(); nested_serializer->setColumns(&nested_column, 1); @@ -1769,7 +1811,8 @@ namespace void setColumns(const MutableColumnPtr * columns, [[maybe_unused]] size_t num_columns) override { - assert(num_columns == 1); + if (num_columns != 1) + wrongNumberOfColumns(num_columns, "1"); ColumnPtr column0 = columns[0]->getPtr(); setColumns(&column0, 1); } @@ -1778,6 +1821,12 @@ namespace void readRow(size_t row_num) override { nested_serializer->readRow(row_num); } void insertDefaults(size_t row_num) override { nested_serializer->insertDefaults(row_num); } + void describeTree(WriteBuffer & out, size_t indent) const override + { + writeIndent(out, indent) << "ProtobufSerializerMap ->\n"; + nested_serializer->describeTree(out, indent + 1); + } + private: const std::unique_ptr nested_serializer; }; @@ -1794,7 +1843,8 @@ namespace void setColumns(const ColumnPtr * columns, [[maybe_unused]] size_t num_columns) override { - assert(num_columns == 1); + if (num_columns != 1) + wrongNumberOfColumns(num_columns, "1"); column = columns[0]; const auto & column_lc = assert_cast(*column); ColumnPtr nested_column = column_lc.getDictionary().getNestedColumn(); @@ -1804,7 +1854,8 @@ namespace void setColumns(const MutableColumnPtr * columns, [[maybe_unused]] size_t num_columns) override { - assert(num_columns == 1); + if (num_columns != 1) + wrongNumberOfColumns(num_columns, "1"); ColumnPtr column0 = columns[0]->getPtr(); setColumns(&column0, 1); } @@ -1862,6 +1913,12 @@ namespace column_lc.insertFromFullColumn(*default_value_column, 0); } + void describeTree(WriteBuffer & out, size_t indent) const override + { + writeIndent(out, indent) << "ProtobufSerializerLowCardinality ->\n"; + nested_serializer->describeTree(out, indent + 1); + } + private: const std::unique_ptr nested_serializer; ColumnPtr column; @@ -1882,7 +1939,8 @@ namespace void setColumns(const ColumnPtr * columns, [[maybe_unused]] size_t num_columns) override { - assert(num_columns == 1); + if (num_columns != 1) + wrongNumberOfColumns(num_columns, "1"); column = columns[0]; const auto & column_array = assert_cast(*column); ColumnPtr data_column = column_array.getDataPtr(); @@ -1891,7 +1949,8 @@ namespace void setColumns(const MutableColumnPtr * columns, [[maybe_unused]] size_t num_columns) override { - assert(num_columns == 1); + if (num_columns != 1) + wrongNumberOfColumns(num_columns, "1"); ColumnPtr column0 = columns[0]->getPtr(); setColumns(&column0, 1); } @@ -1944,6 +2003,12 @@ namespace column_array.insertDefault(); } + void describeTree(WriteBuffer & out, size_t indent) const override + { + writeIndent(out, indent) << "ProtobufSerializerArray ->\n"; + element_serializer->describeTree(out, indent + 1); + } + private: const std::unique_ptr element_serializer; ColumnPtr column; @@ -1955,10 +2020,12 @@ namespace { public: ProtobufSerializerTupleAsArray( + const std::string_view & column_name_, const std::shared_ptr & tuple_data_type_, const FieldDescriptor & field_descriptor_, std::vector> element_serializers_) - : tuple_data_type(tuple_data_type_) + : column_name(column_name_) + , tuple_data_type(tuple_data_type_) , tuple_size(tuple_data_type->getElements().size()) , field_descriptor(field_descriptor_) , element_serializers(std::move(element_serializers_)) @@ -1969,7 +2036,8 @@ namespace void setColumns(const ColumnPtr * columns, [[maybe_unused]] size_t num_columns) override { - assert(num_columns == 1); + if (num_columns != 1) + wrongNumberOfColumns(num_columns, "1"); column = columns[0]; const auto & column_tuple = assert_cast(*column); for (size_t i : collections::range(tuple_size)) @@ -1982,7 +2050,8 @@ namespace void setColumns(const MutableColumnPtr * columns, [[maybe_unused]] size_t num_columns) override { - assert(num_columns == 1); + if (num_columns != 1) + wrongNumberOfColumns(num_columns, "1"); ColumnPtr column0 = columns[0]->getPtr(); setColumns(&column0, 1); } @@ -2006,9 +2075,12 @@ namespace if (current_element_index >= tuple_size) { throw Exception( - "Too many (" + std::to_string(current_element_index) + ") elements was read from the field " - + field_descriptor.full_name() + " to fit in the data type " + tuple_data_type->getName(), - ErrorCodes::PROTOBUF_BAD_CAST); + ErrorCodes::PROTOBUF_BAD_CAST, + "Column {}: More than {} elements was read from the field {} to fit in the data type {}", + quoteString(column_name), + tuple_size, + quoteString(field_descriptor.full_name()), + tuple_data_type->getName()); } element_serializers[current_element_index]->readRow(row_num); @@ -2040,7 +2112,17 @@ namespace } } + void describeTree(WriteBuffer & out, size_t indent) const override + { + writeIndent(out, indent) << "ProtobufSerializerTupleAsArray: column " << quoteString(column_name) << " (" + << tuple_data_type->getName() << ") -> field " << quoteString(field_descriptor.full_name()) << " (" + << field_descriptor.type_name() << ") ->\n"; + for (const auto & element_serializer : element_serializers) + element_serializer->describeTree(out, indent + 1); + } + private: + const String column_name; const std::shared_ptr tuple_data_type; const size_t tuple_size; const FieldDescriptor & field_descriptor; @@ -2085,6 +2167,9 @@ namespace void setColumns(const ColumnPtr * columns_, size_t num_columns_) override { + if (!num_columns_) + wrongNumberOfColumns(num_columns_, ">0"); + columns.assign(columns_, columns_ + num_columns_); std::vector field_columns; @@ -2094,7 +2179,8 @@ namespace field_columns.reserve(info.column_indices.size()); for (size_t column_index : info.column_indices) { - assert(column_index < num_columns_); + if (column_index >= num_columns_) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong column index {}, expected column indices <{}", column_index, num_columns_); field_columns.emplace_back(columns_[column_index]); } info.field_serializer->setColumns(field_columns.data(), field_columns.size()); @@ -2206,6 +2292,32 @@ namespace addDefaultsToMissingColumns(row_num); } + void describeTree(WriteBuffer & out, size_t indent) const override + { + size_t num_columns = 0; + for (const auto & field_info : field_infos) + num_columns += field_info.column_indices.size(); + + writeIndent(out, indent) << "ProtobufSerializerMessage: " << num_columns << " columns ->"; + if (parent_field_descriptor) + out << " field " << quoteString(parent_field_descriptor->full_name()) << " (" << parent_field_descriptor->type_name() << ")"; + + for (size_t i = 0; i != field_infos.size(); ++i) + { + out << "\n"; + const auto & field_info = field_infos[i]; + writeIndent(out, indent + 1) << "Columns #"; + for (size_t j = 0; j != field_info.column_indices.size(); ++j) + { + if (j) + out << ", "; + out << field_info.column_indices[j]; + } + out << " ->\n"; + field_info.field_serializer->describeTree(out, indent + 2); + } + } + private: size_t findFieldIndexByFieldTag(int field_tag) { @@ -2285,7 +2397,8 @@ namespace void setColumns(const ColumnPtr * columns, [[maybe_unused]] size_t num_columns) override { - assert(num_columns == 1); + if (num_columns != 1) + wrongNumberOfColumns(num_columns, "1"); const auto & column_tuple = assert_cast(*columns[0]); size_t tuple_size = column_tuple.tupleSize(); assert(tuple_size); @@ -2298,7 +2411,8 @@ namespace void setColumns(const MutableColumnPtr * columns, [[maybe_unused]] size_t num_columns) override { - assert(num_columns == 1); + if (num_columns != 1) + wrongNumberOfColumns(num_columns, "1"); ColumnPtr column0 = columns[0]->getPtr(); setColumns(&column0, 1); } @@ -2307,6 +2421,12 @@ namespace void readRow(size_t row_num) override { message_serializer->readRow(row_num); } void insertDefaults(size_t row_num) override { message_serializer->insertDefaults(row_num); } + void describeTree(WriteBuffer & out, size_t indent) const override + { + writeIndent(out, indent) << "ProtobufSerializerTupleAsNestedMessage ->\n"; + message_serializer->describeTree(out, indent + 1); + } + private: const std::unique_ptr message_serializer; }; @@ -2318,14 +2438,23 @@ namespace { public: explicit ProtobufSerializerFlattenedNestedAsArrayOfNestedMessages( - std::unique_ptr message_serializer_) - : message_serializer(std::move(message_serializer_)) + const std::vector & column_names_, + const FieldDescriptor * parent_field_descriptor_, + std::unique_ptr message_serializer_, + const std::function & get_root_desc_function_) + : parent_field_descriptor(parent_field_descriptor_) + , message_serializer(std::move(message_serializer_)) + , get_root_desc_function(get_root_desc_function_) { + column_names.reserve(column_names_.size()); + for (const auto & column_name : column_names_) + column_names.emplace_back(column_name); } void setColumns(const ColumnPtr * columns, size_t num_columns) override { - assert(num_columns); + if (!num_columns) + wrongNumberOfColumns(num_columns, ">0"); data_columns.clear(); data_columns.reserve(num_columns); offset_columns.clear(); @@ -2335,11 +2464,26 @@ namespace { const auto & column_array = assert_cast(*columns[i]); data_columns.emplace_back(column_array.getDataPtr()); - offset_columns.emplace_back(column_array.getOffsetsPtr()); - } - std::sort(offset_columns.begin(), offset_columns.end()); - offset_columns.erase(std::unique(offset_columns.begin(), offset_columns.end()), offset_columns.end()); + auto offset_column = column_array.getOffsetsPtr(); + if (std::binary_search(offset_columns.begin(), offset_columns.end(), offset_column)) + continue; + + /// Keep `offset_columns` sorted. + offset_columns.insert(std::upper_bound(offset_columns.begin(), offset_columns.end(), offset_column), offset_column); + + /// All the columns listed in `offset_columns` should have equal offsets. + if (i >= 1) + { + const auto & column_array0 = assert_cast(*columns[0]); + if (!column_array0.hasEqualOffsets(column_array)) + { + throw Exception(ErrorCodes::PROTOBUF_BAD_CAST, + "Column #{} {} and column #{} {} are supposed to have equal offsets according to the following serialization tree:\n{}", + 0, quoteString(column_names[0]), i, quoteString(column_names[i]), get_root_desc_function(0)); + } + } + } message_serializer->setColumns(data_columns.data(), data_columns.size()); } @@ -2358,12 +2502,6 @@ namespace const auto & offset_column0 = assert_cast(*offset_columns[0]); size_t start_offset = offset_column0.getElement(row_num - 1); size_t end_offset = offset_column0.getElement(row_num); - for (size_t i : collections::range(1, offset_columns.size())) - { - const auto & offset_column = assert_cast(*offset_columns[i]); - if (offset_column.getElement(row_num) != end_offset) - throw Exception("Components of FlattenedNested have different sizes", ErrorCodes::PROTOBUF_BAD_CAST); - } for (size_t i : collections::range(start_offset, end_offset)) message_serializer->writeRow(i); } @@ -2433,8 +2571,26 @@ namespace } } + void describeTree(WriteBuffer & out, size_t indent) const override + { + writeIndent(out, indent) << "ProtobufSerializerFlattenedNestedAsArrayOfNestedMessages: columns "; + for (size_t i = 0; i != column_names.size(); ++i) + { + if (i) + out << ", "; + out << "#" << i << " " << quoteString(column_names[i]); + } + out << " ->"; + if (parent_field_descriptor) + out << " field " << quoteString(parent_field_descriptor->full_name()) << " (" << parent_field_descriptor->type_name() << ") ->\n"; + message_serializer->describeTree(out, indent + 1); + } + private: + Strings column_names; + const FieldDescriptor * parent_field_descriptor; const std::unique_ptr message_serializer; + const std::function get_root_desc_function; Columns data_columns; Columns offset_columns; }; @@ -2453,6 +2609,14 @@ namespace const MessageDescriptor & message_descriptor, bool with_length_delimiter) { + root_serializer_ptr = std::make_shared(); + get_root_desc_function = [root_serializer_ptr = root_serializer_ptr](size_t indent) -> String + { + WriteBufferFromOwnString buf; + (*root_serializer_ptr)->describeTree(buf, indent); + return buf.str(); + }; + std::vector used_column_indices; auto message_serializer = buildMessageSerializerImpl( /* num_columns = */ column_names.size(), @@ -2480,6 +2644,12 @@ namespace boost::range::set_difference(collections::range(column_names.size()), used_column_indices_sorted, std::back_inserter(missing_column_indices)); + *root_serializer_ptr = message_serializer.get(); + +#if 0 + LOG_INFO(&Poco::Logger::get("ProtobufSerializer"), "Serialization tree:\n{}", get_root_desc_function(0)); +#endif + return message_serializer; } @@ -2635,10 +2805,35 @@ namespace /// Set `columns_are_reordered_outside` to true if you're going to reorder columns /// according to `used_column_indices` returned and pass to /// ProtobufSerializerMessage::setColumns() only the columns which are actually used. - template std::unique_ptr buildMessageSerializerImpl( size_t num_columns, - const StringOrStringViewT * column_names, + const String * column_names, + const DataTypePtr * data_types, + const MessageDescriptor & message_descriptor, + bool with_length_delimiter, + const FieldDescriptor * parent_field_descriptor, + std::vector & used_column_indices, + bool columns_are_reordered_outside) + { + std::vector column_names_sv; + column_names_sv.reserve(num_columns); + for (size_t i = 0; i != num_columns; ++i) + column_names_sv.emplace_back(column_names[i]); + + return buildMessageSerializerImpl( + num_columns, + column_names_sv.data(), + data_types, + message_descriptor, + with_length_delimiter, + parent_field_descriptor, + used_column_indices, + columns_are_reordered_outside); + } + + std::unique_ptr buildMessageSerializerImpl( + size_t num_columns, + const std::string_view * column_names, const DataTypePtr * data_types, const MessageDescriptor & message_descriptor, bool with_length_delimiter, @@ -2814,7 +3009,11 @@ namespace if (nested_message_serializer) { - auto field_serializer = std::make_unique(std::move(nested_message_serializer)); + std::vector column_names_used; + for (size_t i : used_column_indices_in_nested) + column_names_used.emplace_back(nested_column_names[i]); + auto field_serializer = std::make_unique( + std::move(column_names_used), field_descriptor, std::move(nested_message_serializer), get_root_desc_function); transformColumnIndices(used_column_indices_in_nested, nested_column_indices); add_field_serializer(column_name, std::move(used_column_indices_in_nested), *field_descriptor, std::move(field_serializer)); break; @@ -2856,34 +3055,34 @@ namespace auto data_type_id = data_type->getTypeId(); switch (data_type_id) { - case TypeIndex::UInt8: return std::make_unique>(field_descriptor, reader_or_writer); - case TypeIndex::UInt16: return std::make_unique>(field_descriptor, reader_or_writer); - case TypeIndex::UInt32: return std::make_unique>(field_descriptor, reader_or_writer); - case TypeIndex::UInt64: return std::make_unique>(field_descriptor, reader_or_writer); - case TypeIndex::UInt128: return std::make_unique>(field_descriptor, reader_or_writer); - case TypeIndex::UInt256: return std::make_unique>(field_descriptor, reader_or_writer); - case TypeIndex::Int8: return std::make_unique>(field_descriptor, reader_or_writer); - case TypeIndex::Int16: return std::make_unique>(field_descriptor, reader_or_writer); - case TypeIndex::Int32: return std::make_unique>(field_descriptor, reader_or_writer); - case TypeIndex::Int64: return std::make_unique>(field_descriptor, reader_or_writer); - case TypeIndex::Int128: return std::make_unique>(field_descriptor, reader_or_writer); - case TypeIndex::Int256: return std::make_unique>(field_descriptor, reader_or_writer); - case TypeIndex::Float32: return std::make_unique>(field_descriptor, reader_or_writer); - case TypeIndex::Float64: return std::make_unique>(field_descriptor, reader_or_writer); - case TypeIndex::Date: return std::make_unique(field_descriptor, reader_or_writer); - case TypeIndex::DateTime: return std::make_unique(assert_cast(*data_type), field_descriptor, reader_or_writer); - case TypeIndex::DateTime64: return std::make_unique(assert_cast(*data_type), field_descriptor, reader_or_writer); - case TypeIndex::String: return std::make_unique>(field_descriptor, reader_or_writer); - case TypeIndex::FixedString: return std::make_unique>(typeid_cast>(data_type), field_descriptor, reader_or_writer); - case TypeIndex::Enum8: return std::make_unique>(typeid_cast>(data_type), field_descriptor, reader_or_writer); - case TypeIndex::Enum16: return std::make_unique>(typeid_cast>(data_type), field_descriptor, reader_or_writer); - case TypeIndex::Decimal32: return std::make_unique>(assert_cast &>(*data_type), field_descriptor, reader_or_writer); - case TypeIndex::Decimal64: return std::make_unique>(assert_cast &>(*data_type), field_descriptor, reader_or_writer); - case TypeIndex::Decimal128: return std::make_unique>(assert_cast &>(*data_type), field_descriptor, reader_or_writer); - case TypeIndex::Decimal256: return std::make_unique>(assert_cast &>(*data_type), field_descriptor, reader_or_writer); - case TypeIndex::UUID: return std::make_unique(field_descriptor, reader_or_writer); - case TypeIndex::Interval: return std::make_unique(field_descriptor, reader_or_writer); - case TypeIndex::AggregateFunction: return std::make_unique(typeid_cast>(data_type), field_descriptor, reader_or_writer); + case TypeIndex::UInt8: return std::make_unique>(column_name, field_descriptor, reader_or_writer); + case TypeIndex::UInt16: return std::make_unique>(column_name, field_descriptor, reader_or_writer); + case TypeIndex::UInt32: return std::make_unique>(column_name, field_descriptor, reader_or_writer); + case TypeIndex::UInt64: return std::make_unique>(column_name, field_descriptor, reader_or_writer); + case TypeIndex::UInt128: return std::make_unique>(column_name, field_descriptor, reader_or_writer); + case TypeIndex::UInt256: return std::make_unique>(column_name, field_descriptor, reader_or_writer); + case TypeIndex::Int8: return std::make_unique>(column_name, field_descriptor, reader_or_writer); + case TypeIndex::Int16: return std::make_unique>(column_name, field_descriptor, reader_or_writer); + case TypeIndex::Int32: return std::make_unique>(column_name, field_descriptor, reader_or_writer); + case TypeIndex::Int64: return std::make_unique>(column_name, field_descriptor, reader_or_writer); + case TypeIndex::Int128: return std::make_unique>(column_name, field_descriptor, reader_or_writer); + case TypeIndex::Int256: return std::make_unique>(column_name, field_descriptor, reader_or_writer); + case TypeIndex::Float32: return std::make_unique>(column_name, field_descriptor, reader_or_writer); + case TypeIndex::Float64: return std::make_unique>(column_name, field_descriptor, reader_or_writer); + case TypeIndex::Date: return std::make_unique(column_name, field_descriptor, reader_or_writer); + case TypeIndex::DateTime: return std::make_unique(column_name, assert_cast(*data_type), field_descriptor, reader_or_writer); + case TypeIndex::DateTime64: return std::make_unique(column_name, assert_cast(*data_type), field_descriptor, reader_or_writer); + case TypeIndex::String: return std::make_unique>(column_name, field_descriptor, reader_or_writer); + case TypeIndex::FixedString: return std::make_unique>(column_name, typeid_cast>(data_type), field_descriptor, reader_or_writer); + case TypeIndex::Enum8: return std::make_unique>(column_name, typeid_cast>(data_type), field_descriptor, reader_or_writer); + case TypeIndex::Enum16: return std::make_unique>(column_name, typeid_cast>(data_type), field_descriptor, reader_or_writer); + case TypeIndex::Decimal32: return std::make_unique>(column_name, assert_cast &>(*data_type), field_descriptor, reader_or_writer); + case TypeIndex::Decimal64: return std::make_unique>(column_name, assert_cast &>(*data_type), field_descriptor, reader_or_writer); + case TypeIndex::Decimal128: return std::make_unique>(column_name, assert_cast &>(*data_type), field_descriptor, reader_or_writer); + case TypeIndex::Decimal256: return std::make_unique>(column_name, assert_cast &>(*data_type), field_descriptor, reader_or_writer); + case TypeIndex::UUID: return std::make_unique(column_name, field_descriptor, reader_or_writer); + case TypeIndex::Interval: return std::make_unique(column_name, field_descriptor, reader_or_writer); + case TypeIndex::AggregateFunction: return std::make_unique(column_name, typeid_cast>(data_type), field_descriptor, reader_or_writer); case TypeIndex::Nullable: { @@ -2981,6 +3180,7 @@ namespace return nullptr; return std::make_unique( + column_name, typeid_cast>(data_type), field_descriptor, std::move(nested_serializers)); @@ -3007,6 +3207,8 @@ namespace } const ProtobufReaderOrWriter reader_or_writer; + std::function get_root_desc_function; + std::shared_ptr root_serializer_ptr; }; } diff --git a/src/Formats/ProtobufSerializer.h b/src/Formats/ProtobufSerializer.h index 315a138f9cf..3eaca6a18d6 100644 --- a/src/Formats/ProtobufSerializer.h +++ b/src/Formats/ProtobufSerializer.h @@ -15,7 +15,7 @@ class ProtobufWriter; class IDataType; using DataTypePtr = std::shared_ptr; using DataTypes = std::vector; - +class WriteBuffer; /// Utility class, does all the work for serialization in the Protobuf format. class ProtobufSerializer @@ -30,6 +30,8 @@ public: virtual void readRow(size_t row_num) = 0; virtual void insertDefaults(size_t row_num) = 0; + virtual void describeTree(WriteBuffer & out, size_t indent) const = 0; + static std::unique_ptr create( const Strings & column_names, const DataTypes & data_types,