From c9626314f7e7b128eccfd5fca7e04c9ff1fc45c5 Mon Sep 17 00:00:00 2001 From: avogar Date: Wed, 31 May 2023 19:22:44 +0000 Subject: [PATCH] Better --- contrib/capnproto | 2 +- src/Formats/CapnProtoSerializer.cpp | 1028 ++++++++--------- ...lumnsStructureToQueryWithClusterEngine.cpp | 52 + ...ColumnsStructureToQueryWithClusterEngine.h | 14 + 4 files changed, 529 insertions(+), 567 deletions(-) create mode 100644 src/Storages/addColumnsStructureToQueryWithClusterEngine.cpp create mode 100644 src/Storages/addColumnsStructureToQueryWithClusterEngine.h diff --git a/contrib/capnproto b/contrib/capnproto index dc8b50b9997..976209a6d18 160000 --- a/contrib/capnproto +++ b/contrib/capnproto @@ -1 +1 @@ -Subproject commit dc8b50b999777bcb23c89bb5907c785c3f654441 +Subproject commit 976209a6d18074804f60d18ef99b6a809d27dadf diff --git a/src/Formats/CapnProtoSerializer.cpp b/src/Formats/CapnProtoSerializer.cpp index 91e207a1846..e36f5fa4947 100644 --- a/src/Formats/CapnProtoSerializer.cpp +++ b/src/Formats/CapnProtoSerializer.cpp @@ -94,9 +94,21 @@ namespace std::vector> field_builders; }; + template + std::unique_ptr initStructBuilder(ParentBuilder & parent_builder, UInt32 offset_or_index, const capnp::_::StructSize & struct_size, size_t elements, const capnp::StructSchema & schema) + { + capnp::DynamicStruct::Builder builder_impl; + if constexpr (std::is_same_v) + builder_impl = capnp::DynamicStruct::Builder(schema, parent_builder.getBuilderImpl().getPointerField(offset_or_index).initStruct(struct_size)); + else + builder_impl = capnp::DynamicStruct::Builder(schema, parent_builder.getBuilderImpl().getStructElement(offset_or_index)); + return std::make_unique(std::move(builder_impl), elements); + } + class ICapnProtoSerializer { public: + /// Write row as struct field. virtual void writeRow( const ColumnPtr & column, std::unique_ptr & builder, @@ -104,6 +116,7 @@ namespace UInt32 slot_offset, size_t row_num) = 0; + /// Write row as list element. virtual void writeRow( const ColumnPtr & column, std::unique_ptr & builder, @@ -111,8 +124,10 @@ namespace UInt32 array_index, size_t row_num) = 0; + /// Read row from struct field at slot_offset. virtual void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) = 0; + /// Read row from list element at array_index. virtual void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) = 0; virtual ~ICapnProtoSerializer() = default; @@ -124,32 +139,32 @@ namespace public: void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override { - auto & builder_impl = parent_struct_builder.getBuilderImpl(); - CapnProtoNumericType value = static_cast(assert_cast &>(*column).getElement(row_num)); - builder_impl.setDataField(slot_offset, value); + parent_struct_builder.getBuilderImpl().setDataField(slot_offset, getValue(column, row_num)); } void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicList::Builder & parent_list_builder, UInt32 array_index, size_t row_num) override { - auto & builder_impl = parent_list_builder.getBuilderImpl(); - CapnProtoNumericType value = static_cast(assert_cast &>(*column).getElement(row_num)); - builder_impl.setDataElement(array_index, value); + parent_list_builder.getBuilderImpl().setDataElement(array_index, getValue(column, row_num)); } void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override { - const auto & reader_impl = parent_struct_reader.getReaderImpl(); - CapnProtoNumericType value = reader_impl.getDataField(slot_offset); - if constexpr (convert_to_bool_on_read) - assert_cast(column).insertValue(static_cast(value)); - else - assert_cast &>(column).insertValue(static_cast(value)); + insertValue(column, parent_struct_reader.getReaderImpl().getDataField(slot_offset)); } void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override { - const auto & reader_impl = parent_list_reader.getReaderImpl(); - CapnProtoNumericType value = reader_impl.getDataElement(array_index); + insertValue(column, parent_list_reader.getReaderImpl().getDataElement(array_index)); + } + + private: + CapnProtoNumericType getValue(const ColumnPtr & column, size_t row_num) + { + return static_cast(assert_cast &>(*column).getElement(row_num)); + } + + void insertValue(IColumn & column, CapnProtoNumericType value) + { if constexpr (convert_to_bool_on_read) assert_cast(column).insertValue(static_cast(value)); else @@ -191,29 +206,32 @@ namespace public: void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override { - auto & builder_impl = parent_struct_builder.getBuilderImpl(); - CapnProtoFloatType value = static_cast(assert_cast &>(*column).getElement(row_num)); - builder_impl.setDataField(slot_offset, value); + parent_struct_builder.getBuilderImpl().setDataField(slot_offset, getValue(column, row_num)); } void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicList::Builder & parent_list_builder, UInt32 array_index, size_t row_num) override { - auto & builder_impl = parent_list_builder.getBuilderImpl(); - CapnProtoFloatType value = static_cast(assert_cast &>(*column).getElement(row_num)); - builder_impl.setDataElement(array_index, value); + parent_list_builder.getBuilderImpl().setDataElement(array_index, getValue(column, row_num)); } void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override { - const auto & reader_impl = parent_struct_reader.getReaderImpl(); - CapnProtoFloatType value = reader_impl.getDataField(slot_offset); - assert_cast &>(column).insertValue(static_cast(value)); + insertValue(column, parent_struct_reader.getReaderImpl().getDataField(slot_offset)); } void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override { - const auto & reader_impl = parent_list_reader.getReaderImpl(); - CapnProtoFloatType value = reader_impl.getDataElement(array_index); + insertValue(column, parent_list_reader.getReaderImpl().getDataElement(array_index)); + } + + private: + CapnProtoFloatType getValue(const ColumnPtr & column, size_t row_num) + { + return static_cast(assert_cast &>(*column).getElement(row_num)); + } + + void insertValue(IColumn & column, CapnProtoFloatType value) + { assert_cast &>(column).insertValue(static_cast(value)); } }; @@ -298,57 +316,41 @@ namespace void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override { - auto & builder_impl = parent_struct_builder.getBuilderImpl(); - EnumType enum_value = assert_cast &>(*column).getElement(row_num); - UInt16 capnp_value; - if (enum_comparing_mode == FormatSettings::CapnProtoEnumComparingMode::BY_VALUES) - capnp_value = static_cast(enum_value); - else - capnp_value = ch_to_capnp_values[enum_value]; - - builder_impl.setDataField(slot_offset, capnp_value); + parent_struct_builder.getBuilderImpl().setDataField(slot_offset, getValue(column, row_num)); } void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicList::Builder & parent_list_builder, UInt32 array_index, size_t row_num) override { - auto & builder_impl = parent_list_builder.getBuilderImpl(); - EnumType enum_value = assert_cast &>(*column).getElement(row_num); - UInt16 capnp_value; - if (enum_comparing_mode == FormatSettings::CapnProtoEnumComparingMode::BY_VALUES) - capnp_value = static_cast(enum_value); - else - capnp_value = ch_to_capnp_values[enum_value]; - - builder_impl.setDataElement(array_index, capnp_value); + parent_list_builder.getBuilderImpl().setDataElement(array_index, getValue(column, row_num)); } void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override { - const auto & reader_impl = parent_struct_reader.getReaderImpl(); - UInt16 capnp_value = reader_impl.getDataField(slot_offset); - EnumType value; - if (enum_comparing_mode == FormatSettings::CapnProtoEnumComparingMode::BY_VALUES) - value = static_cast(capnp_value); - else - value = capnp_to_ch_values[capnp_value]; - - assert_cast &>(column).insertValue(value); + insertValue(column, parent_struct_reader.getReaderImpl().getDataField(slot_offset)); } void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override { - const auto & reader_impl = parent_list_reader.getReaderImpl(); - UInt16 capnp_value = reader_impl.getDataElement(array_index); - EnumType value; - if (enum_comparing_mode == FormatSettings::CapnProtoEnumComparingMode::BY_VALUES) - value = static_cast(capnp_value); - else - value = capnp_to_ch_values[capnp_value]; - - assert_cast &>(column).insertValue(value); + insertValue(column, parent_list_reader.getReaderImpl().getDataElement(array_index)); } private: + UInt16 getValue(const ColumnPtr & column, size_t row_num) + { + EnumType enum_value = assert_cast &>(*column).getElement(row_num); + if (enum_comparing_mode == FormatSettings::CapnProtoEnumComparingMode::BY_VALUES) + return static_cast(enum_value); + return ch_to_capnp_values[enum_value]; + } + + void insertValue(IColumn & column, UInt16 capnp_enum_value) + { + if (enum_comparing_mode == FormatSettings::CapnProtoEnumComparingMode::BY_VALUES) + assert_cast &>(column).insertValue(static_cast(capnp_enum_value)); + else + assert_cast &>(column).insertValue(capnp_to_ch_values[capnp_enum_value]); + } + DataTypePtr data_type; capnp::EnumSchema enum_schema; const FormatSettings::CapnProtoEnumComparingMode enum_comparing_mode; @@ -367,29 +369,32 @@ namespace void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override { - auto & builder_impl = parent_struct_builder.getBuilderImpl(); - UInt16 value = assert_cast(*column).getElement(row_num); - builder_impl.setDataField(slot_offset, value); + parent_struct_builder.getBuilderImpl().setDataField(slot_offset, getValue(column, row_num)); } void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicList::Builder & parent_list_builder, UInt32 array_index, size_t row_num) override { - auto & builder_impl = parent_list_builder.getBuilderImpl(); - UInt16 value = assert_cast(*column).getElement(row_num); - builder_impl.setDataElement(array_index, value); + parent_list_builder.getBuilderImpl().setDataElement(array_index, getValue(column, row_num)); } void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override { - const auto & reader_impl = parent_struct_reader.getReaderImpl(); - UInt16 value = reader_impl.getDataField(slot_offset); - assert_cast(column).insertValue(value); + insertValue(column, parent_struct_reader.getReaderImpl().getDataField(slot_offset)); } void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override { - const auto & reader_impl = parent_list_reader.getReaderImpl(); - UInt16 value = reader_impl.getDataElement(array_index); + insertValue(column, parent_list_reader.getReaderImpl().getDataElement(array_index)); + } + + private: + UInt16 getValue(const ColumnPtr & column, size_t row_num) + { + return assert_cast(*column).getElement(row_num); + } + + void insertValue(IColumn & column, UInt16 value) + { assert_cast(column).insertValue(value); } }; @@ -405,29 +410,32 @@ namespace void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override { - auto & builder_impl = parent_struct_builder.getBuilderImpl(); - Int32 value = assert_cast(*column).getElement(row_num); - builder_impl.setDataField(slot_offset, value); + parent_struct_builder.getBuilderImpl().setDataField(slot_offset, getValue(column, row_num)); } void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicList::Builder & parent_list_builder, UInt32 array_index, size_t row_num) override { - auto & builder_impl = parent_list_builder.getBuilderImpl(); - Int32 value = assert_cast(*column).getElement(row_num); - builder_impl.setDataElement(array_index, value); + parent_list_builder.getBuilderImpl().setDataElement(array_index, getValue(column, row_num)); } void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override { - const auto & reader_impl = parent_struct_reader.getReaderImpl(); - Int32 value = reader_impl.getDataField(slot_offset); - assert_cast(column).insertValue(value); + insertValue(column, parent_struct_reader.getReaderImpl().getDataField(slot_offset)); } void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override { - const auto & reader_impl = parent_list_reader.getReaderImpl(); - Int32 value = reader_impl.getDataElement(array_index); + insertValue(column, parent_list_reader.getReaderImpl().getDataElement(array_index)); + } + + private: + Int32 getValue(const ColumnPtr & column, size_t row_num) + { + return assert_cast(*column).getElement(row_num); + } + + void insertValue(IColumn & column, Int32 value) + { assert_cast(column).insertValue(value); } }; @@ -443,29 +451,32 @@ namespace void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override { - auto & builder_impl = parent_struct_builder.getBuilderImpl(); - UInt32 value = assert_cast(*column).getElement(row_num); - builder_impl.setDataField(slot_offset, value); + parent_struct_builder.getBuilderImpl().setDataField(slot_offset, getValue(column, row_num)); } void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicList::Builder & parent_list_builder, UInt32 array_index, size_t row_num) override { - auto & builder_impl = parent_list_builder.getBuilderImpl(); - UInt32 value = assert_cast(*column).getElement(row_num); - builder_impl.setDataElement(array_index, value); + parent_list_builder.getBuilderImpl().setDataElement(array_index, getValue(column, row_num)); } void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override { - const auto & reader_impl = parent_struct_reader.getReaderImpl(); - UInt32 value = reader_impl.getDataField(slot_offset); - assert_cast(column).insertValue(value); + insertValue(column, parent_struct_reader.getReaderImpl().getDataField(slot_offset)); } void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override { - const auto & reader_impl = parent_list_reader.getReaderImpl(); - UInt32 value = reader_impl.getDataElement(array_index); + insertValue(column, parent_list_reader.getReaderImpl().getDataElement(array_index)); + } + + private: + UInt32 getValue(const ColumnPtr & column, size_t row_num) + { + return assert_cast(*column).getElement(row_num); + } + + void insertValue(IColumn & column, UInt32 value) + { assert_cast(column).insertValue(value); } }; @@ -481,29 +492,32 @@ namespace void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override { - auto & builder_impl = parent_struct_builder.getBuilderImpl(); - Int64 value = assert_cast(*column).getElement(row_num); - builder_impl.setDataField(slot_offset, value); + parent_struct_builder.getBuilderImpl().setDataField(slot_offset, getValue(column, row_num)); } void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicList::Builder & parent_list_builder, UInt32 array_index, size_t row_num) override { - auto & builder_impl = parent_list_builder.getBuilderImpl(); - Int64 value = assert_cast(*column).getElement(row_num); - builder_impl.setDataElement(array_index, value); + parent_list_builder.getBuilderImpl().setDataElement(array_index, getValue(column, row_num)); } void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override { - const auto & reader_impl = parent_struct_reader.getReaderImpl(); - Int64 value = reader_impl.getDataField(slot_offset); - assert_cast(column).insertValue(value); + insertValue(column, parent_struct_reader.getReaderImpl().getDataField(slot_offset)); } void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override { - const auto & reader_impl = parent_list_reader.getReaderImpl(); - Int64 value = reader_impl.getDataElement(array_index); + insertValue(column, parent_list_reader.getReaderImpl().getDataElement(array_index)); + } + + private: + Int64 getValue(const ColumnPtr & column, size_t row_num) + { + return assert_cast(*column).getElement(row_num); + } + + void insertValue(IColumn & column, Int64 value) + { assert_cast(column).insertValue(value); } }; @@ -523,275 +537,36 @@ namespace void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override { - auto & builder_impl = parent_struct_builder.getBuilderImpl(); - DecimalType value = assert_cast &>(*column).getElement(row_num); - builder_impl.setDataField(slot_offset, value); + parent_struct_builder.getBuilderImpl().setDataField(slot_offset, getValue(column, row_num)); } void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicList::Builder & parent_list_builder, UInt32 array_index, size_t row_num) override { - auto & builder_impl = parent_list_builder.getBuilderImpl(); - DecimalType value = assert_cast &>(*column).getElement(row_num); - builder_impl.setDataElement(array_index, value); + parent_list_builder.getBuilderImpl().setDataElement(array_index, getValue(column, row_num)); } void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override { - const auto & reader_impl = parent_struct_reader.getReaderImpl(); - NativeType value = reader_impl.getDataField(slot_offset); - assert_cast &>(column).insertValue(value); + insertValue(column, parent_struct_reader.getReaderImpl().getDataField(slot_offset)); } void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override { - const auto & reader_impl = parent_list_reader.getReaderImpl(); - NativeType value = reader_impl.getDataElement(array_index); + insertValue(column, parent_list_reader.getReaderImpl().getDataElement(array_index)); + } + + private: + NativeType getValue(const ColumnPtr & column, size_t row_num) + { + return assert_cast &>(*column).getElement(row_num); + } + + void insertValue(IColumn & column, NativeType value) + { assert_cast &>(column).insertValue(value); } }; - template - class CapnProtoFixedSizeRawDataSerializer : public ICapnProtoSerializer - { - private: - static constexpr size_t value_size = sizeof(T); - - public: - CapnProtoFixedSizeRawDataSerializer(const DataTypePtr & data_type_, const String & column_name, const capnp::Type & capnp_type) : data_type(data_type_) - { - if (!capnp_type.isData()) - throwCannotConvert(data_type, column_name, capnp_type); - } - - void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override - { - auto & builder_impl = parent_struct_builder.getBuilderImpl(); - auto data = column->getDataAt(row_num); - capnp::Data::Reader value = capnp::Data::Reader(reinterpret_cast(data.data), data.size); - builder_impl.getPointerField(slot_offset).template setBlob(value); - } - - void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicList::Builder & parent_struct_builder, UInt32 array_index, size_t row_num) override - { - auto & builder_impl = parent_struct_builder.getBuilderImpl(); - auto data = column->getDataAt(row_num); - capnp::Data::Reader value = capnp::Data::Reader(reinterpret_cast(data.data), data.size); - builder_impl.getPointerElement(array_index).setBlob(value); - } - - void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override - { - const auto & reader_impl = parent_struct_reader.getReaderImpl(); - capnp::Data::Reader value = reader_impl.getPointerField(slot_offset).template getBlob(nullptr, 0); - if (value.size() != value_size) - throw Exception(ErrorCodes::INCORRECT_DATA, "Unexpected size of {} value: {}", data_type->getName(), value.size()); - - column.insertData(reinterpret_cast(value.begin()), value.size()); - } - - void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override - { - const auto & reader_impl = parent_list_reader.getReaderImpl(); - capnp::Data::Reader value = reader_impl.getPointerElement(array_index).getBlob(nullptr, 0); - if (value.size() != value_size) - throw Exception(ErrorCodes::INCORRECT_DATA, "Unexpected size of {} value: {}", data_type->getName(), value.size()); - - column.insertData(reinterpret_cast(value.begin()), value.size()); - } - - private: - DataTypePtr data_type; - }; - - template - class CapnProtoStringSerializer : public ICapnProtoSerializer - { - public: - CapnProtoStringSerializer(const DataTypePtr & data_type, const String & column_name, const capnp::Type & capnp_type) - { - if (!capnp_type.isData() && !capnp_type.isText()) - throwCannotConvert(data_type, column_name, capnp_type); - } - - void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override - { - auto & builder_impl = parent_struct_builder.getBuilderImpl(); - auto data = column->getDataAt(row_num); - if constexpr (is_binary) - { - capnp::Data::Reader value = capnp::Data::Reader(reinterpret_cast(data.data), data.size); - builder_impl.getPointerField(slot_offset).setBlob(value); - } - else - { - capnp::Text::Reader value = capnp::Text::Reader(data.data, data.size); - builder_impl.getPointerField(slot_offset).setBlob(value); - } - } - - void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicList::Builder & parent_struct_builder, UInt32 array_index, size_t row_num) override - { - auto & builder_impl = parent_struct_builder.getBuilderImpl(); - auto data = column->getDataAt(row_num); - if constexpr (is_binary) - { - capnp::Data::Reader value = capnp::Data::Reader(reinterpret_cast(data.data), data.size); - builder_impl.getPointerElement(array_index).setBlob(value); - } - else - { - capnp::Text::Reader value = capnp::Text::Reader(data.data, data.size); - builder_impl.getPointerElement(array_index).setBlob(value); - } - } - - void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override - { - const auto & reader_impl = parent_struct_reader.getReaderImpl(); - if constexpr (is_binary) - { - capnp::Data::Reader value = reader_impl.getPointerField(slot_offset).getBlob(nullptr, 0); - column.insertData(reinterpret_cast(value.begin()), value.size()); - } - else - { - capnp::Text::Reader value = reader_impl.getPointerField(slot_offset).getBlob(nullptr, 0); - column.insertData(reinterpret_cast(value.begin()), value.size()); - } - } - - void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override - { - const auto & reader_impl = parent_list_reader.getReaderImpl(); - if constexpr (is_binary) - { - capnp::Data::Reader value = reader_impl.getPointerElement(array_index).getBlob(nullptr, 0); - column.insertData(reinterpret_cast(value.begin()), value.size()); - } - else - { - capnp::Text::Reader value = reader_impl.getPointerElement(array_index).getBlob(nullptr, 0); - column.insertData(reinterpret_cast(value.begin()), value.size()); - } - } - }; - - template - class CapnProtoFixedStringSerializer : public ICapnProtoSerializer - { - public: - CapnProtoFixedStringSerializer(const DataTypePtr & data_type, const String & column_name, const capnp::Type & capnp_type_) : capnp_type(capnp_type_) - { - if (!capnp_type.isData() && !capnp_type.isText()) - throwCannotConvert(data_type, column_name, capnp_type); - } - - void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override - { - auto & builder_impl = parent_struct_builder.getBuilderImpl(); - auto data = column->getDataAt(row_num); - if constexpr (is_binary) - { - capnp::Data::Reader value = capnp::Data::Reader(reinterpret_cast(data.data), data.size); - builder_impl.getPointerField(slot_offset).setBlob(value); - } - else - { - if (data.data[data.size - 1] == 0) - { - capnp::Text::Reader value = capnp::Text::Reader(data.data, data.size); - builder_impl.getPointerField(slot_offset).setBlob(value); - } - else - { - /// In TEXT type data should be null-terminated, but ClickHouse FixedString data could not be. - /// To make data null-terminated we should copy it to temporary String object and use it in capnp::Text::Reader. - /// Note that capnp::Text::Reader works only with pointer to the data and it's size, so we should - /// guarantee that new String object life time is longer than capnp::Text::Reader life time. - tmp_string = data.toString(); - capnp::Text::Reader value = capnp::Text::Reader(tmp_string.data(), tmp_string.size()); - builder_impl.getPointerField(slot_offset).setBlob(value); - } - } - } - - void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicList::Builder & parent_struct_builder, UInt32 array_index, size_t row_num) override - { - auto & builder_impl = parent_struct_builder.getBuilderImpl(); - auto data = column->getDataAt(row_num); - if constexpr (is_binary) - { - capnp::Data::Reader value = capnp::Data::Reader(reinterpret_cast(data.data), data.size); - builder_impl.getPointerElement(array_index).setBlob(value); - } - else - { - if (data.data[data.size - 1] == 0) - { - capnp::Text::Reader value = capnp::Text::Reader(data.data, data.size); - builder_impl.getPointerElement(array_index).setBlob(value); - } - else - { - /// In TEXT type data should be null-terminated, but ClickHouse FixedString data could not be. - /// To make data null-terminated we should copy it to temporary String object and use it in capnp::Text::Reader. - /// Note that capnp::Text::Reader works only with pointer to the data and it's size, so we should - /// guarantee that new String object life time is longer than capnp::Text::Reader life time. - tmp_string = data.toString(); - capnp::Text::Reader value = capnp::Text::Reader(tmp_string.data(), tmp_string.size()); - builder_impl.getPointerElement(array_index).setBlob(value); - } - } - } - - void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override - { - const auto & reader_impl = parent_struct_reader.getReaderImpl(); - auto & fixed_string_column = assert_cast(column); - if constexpr (is_binary) - { - capnp::Data::Reader value = reader_impl.getPointerField(slot_offset).getBlob(nullptr, 0); - if (value.size() > fixed_string_column.getN()) - throw Exception(ErrorCodes::INCORRECT_DATA, "Cannot read data with size {} to FixedString with size {}", value.size(), fixed_string_column.getN()); - - fixed_string_column.insertData(reinterpret_cast(value.begin()), value.size()); - } - else - { - capnp::Text::Reader value = reader_impl.getPointerField(slot_offset).getBlob(nullptr, 0); - if (value.size() > fixed_string_column.getN()) - throw Exception(ErrorCodes::INCORRECT_DATA, "Cannot read data with size {} to FixedString with size {}", value.size(), fixed_string_column.getN()); - - fixed_string_column.insertData(reinterpret_cast(value.begin()), value.size()); - } - } - - void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override - { - const auto & reader_impl = parent_list_reader.getReaderImpl(); - auto & fixed_string_column = assert_cast(column); - if constexpr (is_binary) - { - capnp::Data::Reader value = reader_impl.getPointerElement(array_index).getBlob(nullptr, 0); - if (value.size() > fixed_string_column.getN()) - throw Exception(ErrorCodes::INCORRECT_DATA, "Cannot read data with size {} to FixedString with size {}", value.size(), fixed_string_column.getN()); - - fixed_string_column.insertData(reinterpret_cast(value.begin()), value.size()); - } - else - { - capnp::Text::Reader value = reader_impl.getPointerElement(array_index).getBlob(nullptr, 0); - if (value.size() > fixed_string_column.getN()) - throw Exception(ErrorCodes::INCORRECT_DATA, "Cannot read data with size {} to FixedString with size {}", value.size(), fixed_string_column.getN()); - - fixed_string_column.insertData(reinterpret_cast(value.begin()), value.size()); - } - } - - private: - String tmp_string; - capnp::Type capnp_type; - }; class CapnProtoIPv4Serializer : public ICapnProtoSerializer { @@ -804,33 +579,204 @@ namespace void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override { - auto & builder_impl = parent_struct_builder.getBuilderImpl(); - UInt32 value = assert_cast(*column).getElement(row_num); - builder_impl.setDataField(slot_offset, value); + parent_struct_builder.getBuilderImpl().setDataField(slot_offset, getValue(column, row_num)); } void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicList::Builder & parent_list_builder, UInt32 array_index, size_t row_num) override { - auto & builder_impl = parent_list_builder.getBuilderImpl(); - UInt32 value = assert_cast(*column).getElement(row_num); - builder_impl.setDataElement(array_index, value); + parent_list_builder.getBuilderImpl().setDataElement(array_index, getValue(column, row_num)); } void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override { - const auto & reader_impl = parent_struct_reader.getReaderImpl(); - UInt32 value = reader_impl.getDataField(slot_offset); - assert_cast(column).insertValue(IPv4(value)); + insertValue(column, parent_struct_reader.getReaderImpl().getDataField(slot_offset)); } void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override { - const auto & reader_impl = parent_list_reader.getReaderImpl(); - UInt32 value = reader_impl.getDataElement(array_index); + insertValue(column, parent_list_reader.getReaderImpl().getDataElement(array_index)); + } + + private: + UInt32 getValue(const ColumnPtr & column, size_t row_num) + { + return assert_cast(*column).getElement(row_num); + } + + void insertValue(IColumn & column, UInt32 value) + { assert_cast(column).insertValue(IPv4(value)); } }; + template + class CapnProtoFixedSizeRawDataSerializer : public ICapnProtoSerializer + { + private: + static constexpr size_t expected_value_size = sizeof(T); + + public: + CapnProtoFixedSizeRawDataSerializer(const DataTypePtr & data_type_, const String & column_name, const capnp::Type & capnp_type) : data_type(data_type_) + { + if (!capnp_type.isData()) + throwCannotConvert(data_type, column_name, capnp_type); + } + + void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override + { + parent_struct_builder.getBuilderImpl().getPointerField(slot_offset).setBlob(getData(column, row_num)); + } + + void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicList::Builder & parent_struct_builder, UInt32 array_index, size_t row_num) override + { + parent_struct_builder.getBuilderImpl().getPointerElement(array_index).setBlob(getData(column, row_num)); + } + + void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override + { + insertData(column, parent_struct_reader.getReaderImpl().getPointerField(slot_offset).getBlob(nullptr, 0)); + } + + void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override + { + insertData(column, parent_list_reader.getReaderImpl().getPointerElement(array_index).getBlob(nullptr, 0)); + } + + private: + capnp::Data::Reader getData(const ColumnPtr & column, size_t row_num) + { + auto data = column->getDataAt(row_num); + return capnp::Data::Reader(reinterpret_cast(data.data), data.size); + } + + void insertData(IColumn & column, capnp::Data::Reader data) + { + if (data.size() != expected_value_size) + throw Exception(ErrorCodes::INCORRECT_DATA, "Unexpected size of {} value: {}", data_type->getName(), data.size()); + + column.insertData(reinterpret_cast(data.begin()), data.size()); + } + + DataTypePtr data_type; + }; + + template + class CapnProtoStringSerializer : public ICapnProtoSerializer + { + public: + CapnProtoStringSerializer(const DataTypePtr & data_type, const String & column_name, const capnp::Type & capnp_type) + { + if (!capnp_type.isData() && !capnp_type.isText()) + throwCannotConvert(data_type, column_name, capnp_type); + } + + void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override + { + parent_struct_builder.getBuilderImpl().getPointerField(slot_offset).setBlob(getData(column, row_num)); + } + + void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicList::Builder & parent_struct_builder, UInt32 array_index, size_t row_num) override + { + parent_struct_builder.getBuilderImpl().getPointerElement(array_index).setBlob(getData(column, row_num)); + } + + void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override + { + insertData(column, parent_struct_reader.getReaderImpl().getPointerField(slot_offset).getBlob(nullptr, 0)); + } + + void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override + { + insertData(column, parent_list_reader.getReaderImpl().getPointerElement(array_index).getBlob(nullptr, 0)); + } + + private: + using Reader = typename CapnpType::Reader; + + CapnpType::Reader getData(const ColumnPtr & column, size_t row_num) + { + auto data = column->getDataAt(row_num); + if constexpr (std::is_same_v) + return Reader(reinterpret_cast(data.data), data.size); + else + return Reader(data.data, data.size); + } + + void insertData(IColumn & column, Reader data) + { + column.insertData(reinterpret_cast(data.begin()), data.size()); + } + }; + + template + class CapnProtoFixedStringSerializer : public ICapnProtoSerializer + { + private: + + public: + CapnProtoFixedStringSerializer(const DataTypePtr & data_type, const String & column_name, const capnp::Type & capnp_type_) : capnp_type(capnp_type_) + { + if (!capnp_type.isData() && !capnp_type.isText()) + throwCannotConvert(data_type, column_name, capnp_type); + } + + void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override + { + parent_struct_builder.getBuilderImpl().getPointerField(slot_offset).setBlob(getData(column, row_num)); + } + + void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicList::Builder & parent_struct_builder, UInt32 array_index, size_t row_num) override + { + parent_struct_builder.getBuilderImpl().getPointerElement(array_index).setBlob(getData(column, row_num)); + } + + void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override + { + insertData(column, parent_struct_reader.getReaderImpl().getPointerField(slot_offset).getBlob(nullptr, 0)); + } + + void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override + { + insertData(column, parent_list_reader.getReaderImpl().getPointerElement(array_index).getBlob(nullptr, 0)); + } + + private: + using Reader = typename CapnpType::Reader; + + CapnpType::Reader getData(const ColumnPtr & column, size_t row_num) + { + auto data = column->getDataAt(row_num); + if constexpr (std::is_same_v) + { + return Reader(reinterpret_cast(data.data), data.size); + } + else + { + if (data.data[data.size - 1] == 0) + return Reader(data.data, data.size); + + /// In TEXT type data should be null-terminated, but ClickHouse FixedString data could not be. + /// To make data null-terminated we should copy it to temporary String object and use it in capnp::Text::Reader. + /// Note that capnp::Text::Reader works only with pointer to the data and it's size, so we should + /// guarantee that new String object life time is longer than capnp::Text::Reader life time. + tmp_string = data.toString(); + return Reader(tmp_string.data(), tmp_string.size()); + } + } + + void insertData(IColumn & column, Reader data) + { + auto & fixed_string_column = assert_cast(column); + if (data.size() > fixed_string_column.getN()) + throw Exception(ErrorCodes::INCORRECT_DATA, "Cannot read data with size {} to FixedString with size {}", data.size(), fixed_string_column.getN()); + + fixed_string_column.insertData(reinterpret_cast(data.begin()), data.size()); + } + + String tmp_string; + capnp::Type capnp_type; + }; + std::unique_ptr createSerializer(const DataTypePtr & type, const String & name, const capnp::Type & capnp_type, const FormatSettings::CapnProto & settings); class CapnProtoLowCardinalitySerializer : public ICapnProtoSerializer @@ -843,37 +789,43 @@ namespace void writeRow(const ColumnPtr & column, std::unique_ptr & field_builder, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override { - const auto & low_cardinality_column = assert_cast(*column); - size_t index = low_cardinality_column.getIndexAt(row_num); - const auto & dict_column = low_cardinality_column.getDictionary().getNestedColumn(); - nested_serializer->writeRow(dict_column, field_builder, parent_struct_builder, slot_offset, index); + writeRowImpl(column, field_builder, parent_struct_builder, slot_offset, row_num); } void writeRow(const ColumnPtr & column, std::unique_ptr & field_builder, capnp::DynamicList::Builder & parent_list_builder, UInt32 array_index, size_t row_num) override { - const auto & low_cardinality_column = assert_cast(*column); - size_t index = low_cardinality_column.getIndexAt(row_num); - const auto & dict_column = low_cardinality_column.getDictionary().getNestedColumn(); - nested_serializer->writeRow(dict_column, field_builder, parent_list_builder, array_index, index); + writeRowImpl(column, field_builder, parent_list_builder, array_index, row_num); } void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override { - auto & low_cardinality_column = assert_cast(column); - auto tmp_column = low_cardinality_column.getDictionary().getNestedColumn()->cloneEmpty(); - nested_serializer->readRow(*tmp_column, parent_struct_reader, slot_offset); - low_cardinality_column.insertFromFullColumn(*tmp_column, 0); + readRowImpl(column, parent_struct_reader, slot_offset); } void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override { - auto & low_cardinality_column = assert_cast(column); - auto tmp_column = low_cardinality_column.getDictionary().getNestedColumn()->cloneEmpty(); - nested_serializer->readRow(*tmp_column, parent_list_reader, array_index); - low_cardinality_column.insertFromFullColumn(*tmp_column, 0); + readRowImpl(column, parent_list_reader, array_index); } private: + template + void writeRowImpl(const ColumnPtr & column, std::unique_ptr & field_builder, ParentBuilder & parent_builder, UInt32 offset_or_index, size_t row_num) + { + const auto & low_cardinality_column = assert_cast(*column); + size_t index = low_cardinality_column.getIndexAt(row_num); + const auto & dict_column = low_cardinality_column.getDictionary().getNestedColumn(); + nested_serializer->writeRow(dict_column, field_builder, parent_builder, offset_or_index, index); + } + + template + void readRowImpl(IColumn & column, const ParentReader & parent_reader, UInt32 offset_or_index) + { + auto & low_cardinality_column = assert_cast(column); + auto tmp_column = low_cardinality_column.getDictionary().getNestedColumn()->cloneEmpty(); + nested_serializer->readRow(*tmp_column, parent_reader, offset_or_index); + low_cardinality_column.insertFromFullColumn(*tmp_column, 0); + } + std::unique_ptr nested_serializer; }; @@ -938,38 +890,32 @@ namespace void writeRow(const ColumnPtr & column, std::unique_ptr & field_builder, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override { - if (!field_builder) - { - auto builder_impl = parent_struct_builder.getBuilderImpl(); - auto struct_builder_impl = capnp::DynamicStruct::Builder(struct_schema, builder_impl.getPointerField(slot_offset).initStruct(struct_size)); - field_builder = std::make_unique(std::move(struct_builder_impl), 1); - } - - auto & struct_builder = assert_cast(*field_builder); - - const auto & nullable_column = assert_cast(*column); - if (nullable_column.isNullAt(row_num)) - { - auto struct_builder_impl = struct_builder.impl.getBuilderImpl(); - struct_builder_impl.setDataField(discriminant_offset, null_discriminant); - struct_builder_impl.setDataField(nested_slot_offset, capnp::Void()); - } - else - { - const auto & nested_column = nullable_column.getNestedColumnPtr(); - struct_builder.impl.getBuilderImpl().setDataField(discriminant_offset, nested_discriminant); - nested_serializer->writeRow(nested_column, struct_builder.field_builders[0], struct_builder.impl, nested_slot_offset, row_num); - } + writeRowImpl(column, field_builder, parent_struct_builder, slot_offset, row_num); } void writeRow(const ColumnPtr & column, std::unique_ptr & field_builder, capnp::DynamicList::Builder & parent_list_builder, UInt32 array_index, size_t row_num) override + { + writeRowImpl(column, field_builder, parent_list_builder, array_index, row_num); + } + + void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override + { + auto struct_reader = capnp::DynamicStruct::Reader(struct_schema, parent_struct_reader.getReaderImpl().getPointerField(slot_offset).getStruct(nullptr)); + readRowImpl(column, struct_reader); + } + + void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override + { + auto struct_reader = capnp::DynamicStruct::Reader(struct_schema, parent_list_reader.getReaderImpl().getStructElement(array_index)); + readRowImpl(column, struct_reader); + } + + private: + template + void writeRowImpl(const ColumnPtr & column, std::unique_ptr & field_builder, ParentBuilder & parent_builder, UInt32 offset_or_index, size_t row_num) { if (!field_builder) - { - auto builder_impl = parent_list_builder.getBuilderImpl(); - auto struct_builder_impl = capnp::DynamicStruct::Builder(struct_schema, builder_impl.getStructElement(array_index)); - field_builder = std::make_unique(std::move(struct_builder_impl), 1); - } + field_builder = initStructBuilder(parent_builder, offset_or_index, struct_size, 1, struct_schema); auto & struct_builder = assert_cast(*field_builder); @@ -988,12 +934,9 @@ namespace } } - void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override + void readRowImpl(IColumn & column, capnp::DynamicStruct::Reader & struct_reader) { auto & nullable_column = assert_cast(column); - auto reader_impl = parent_struct_reader.getReaderImpl(); - auto struct_reader = capnp::DynamicStruct::Reader(struct_schema, reader_impl.getPointerField(slot_offset).getStruct(nullptr)); - auto discriminant = struct_reader.getReaderImpl().getDataField(discriminant_offset); if (discriminant == null_discriminant) @@ -1006,25 +949,7 @@ namespace } } - void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override - { - auto & nullable_column = assert_cast(column); - auto reader_impl = parent_list_reader.getReaderImpl(); - auto struct_reader = capnp::DynamicStruct::Reader(struct_schema, reader_impl.getStructElement(array_index)); - auto discriminant = struct_reader.getReaderImpl().getDataField(discriminant_offset); - - if (discriminant == null_discriminant) - nullable_column.insertDefault(); - else - { - auto & nested_column = nullable_column.getNestedColumn(); - nested_serializer->readRow(nested_column, struct_reader, nested_slot_offset); - nullable_column.getNullMapData().push_back(0); - } - } - - private: std::unique_ptr nested_serializer; capnp::StructSchema struct_schema; capnp::_::StructSize struct_size; @@ -1058,29 +983,29 @@ namespace void writeRow(const ColumnPtr & column, std::unique_ptr & field_builder, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override { - const auto * array_column = assert_cast(column.get()); - const auto & nested_column = array_column->getDataPtr(); - const auto & offsets = array_column->getOffsets(); - auto offset = offsets[row_num - 1]; - UInt32 size = static_cast(offsets[row_num] - offset); - - if (!field_builder) - { - auto builder_impl = parent_struct_builder.getBuilderImpl(); - capnp::DynamicList::Builder list_builder_impl; - if (element_is_struct) - list_builder_impl = capnp::DynamicList::Builder(list_schema, builder_impl.getPointerField(slot_offset).initStructList(size, element_struct_size)); - else - list_builder_impl = capnp::DynamicList::Builder(list_schema, builder_impl.getPointerField(slot_offset).initList(element_size, size)); - field_builder = std::make_unique(std::move(list_builder_impl), size); - } - - auto & list_builder = assert_cast(*field_builder); - for (UInt32 i = 0; i != size; ++i) - nested_serializer->writeRow(nested_column, list_builder.nested_builders[i], list_builder.impl, i, offset + i); + writeRowImpl(column, field_builder, parent_struct_builder, slot_offset, row_num); } void writeRow(const ColumnPtr & column, std::unique_ptr & field_builder, capnp::DynamicList::Builder & parent_list_builder, UInt32 array_index, size_t row_num) override + { + writeRowImpl(column, field_builder, parent_list_builder, array_index, row_num); + } + + void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override + { + auto list_reader = capnp::DynamicList::Reader(list_schema, parent_struct_reader.getReaderImpl().getPointerField(slot_offset).getList(element_size, nullptr)); + readRowImpl(column, list_reader); + } + + void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override + { + auto list_reader = capnp::DynamicList::Reader(list_schema, parent_list_reader.getReaderImpl().getPointerElement(array_index).getList(element_size, nullptr)); + readRowImpl(column, list_reader); + } + + private: + template + void writeRowImpl(const ColumnPtr & column, std::unique_ptr & field_builder, ParentBuilder & parent_builder, UInt32 offset_or_index, size_t row_num) { const auto * array_column = assert_cast(column.get()); const auto & nested_column = array_column->getDataPtr(); @@ -1089,25 +1014,32 @@ namespace UInt32 size = static_cast(offsets[row_num] - offset); if (!field_builder) - { - auto builder_impl = parent_list_builder.getBuilderImpl(); - capnp::DynamicList::Builder list_builder_impl; - if (element_is_struct) - list_builder_impl = capnp::DynamicList::Builder(list_schema, builder_impl.getPointerElement(array_index).initStructList(size, element_struct_size)); - else - list_builder_impl = capnp::DynamicList::Builder(list_schema, builder_impl.getPointerElement(array_index).initList(element_size, size)); - field_builder = std::make_unique(std::move(list_builder_impl), size); - } + field_builder = std::make_unique(capnp::DynamicList::Builder(list_schema, initListBuilder(parent_builder, offset_or_index, size)), size); auto & list_builder = assert_cast(*field_builder); for (UInt32 i = 0; i != size; ++i) nested_serializer->writeRow(nested_column, list_builder.nested_builders[i], list_builder.impl, i, offset + i); } - void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override + template + capnp::_::ListBuilder initListBuilder(ParentBuilder & parent_builder, UInt32 offset_or_index, UInt32 size) + { + if (element_is_struct) + { + if constexpr (std::is_same_v) + return parent_builder.getBuilderImpl().getPointerField(offset_or_index).initStructList(size, element_struct_size); + else + return parent_builder.getBuilderImpl().getPointerElement(offset_or_index).initStructList(size, element_struct_size); + } + + if constexpr (std::is_same_v) + return parent_builder.getBuilderImpl().getPointerField(offset_or_index).initList(element_size, size); + else + return parent_builder.getBuilderImpl().getPointerElement(offset_or_index).initList(element_size, size); + } + + void readRowImpl(IColumn & column, const capnp::DynamicList::Reader & list_reader) { - const auto & reader_impl = parent_struct_reader.getReaderImpl(); - auto list_reader = capnp::DynamicList::Reader(list_schema, reader_impl.getPointerField(slot_offset).getList(element_size, nullptr)); UInt32 size = list_reader.size(); auto & column_array = assert_cast(column); auto & offsets = column_array.getOffsets(); @@ -1118,21 +1050,6 @@ namespace nested_serializer->readRow(nested_column, list_reader, i); } - void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override - { - const auto & reader_impl = parent_list_reader.getReaderImpl(); - auto list_reader = capnp::DynamicList::Reader(list_schema, reader_impl.getPointerElement(array_index).getList(element_size, nullptr)); - UInt32 size = list_reader.size(); - auto & column_array = assert_cast(column); - auto & offsets = column_array.getOffsets(); - offsets.push_back(offsets.back() + list_reader.size()); - - auto & nested_column = column_array.getData(); - for (UInt32 i = 0; i != size; ++i) - nested_serializer->readRow(nested_column, list_reader, i); - } - - private: capnp::ListSchema list_schema; std::unique_ptr nested_serializer; capnp::ElementSize element_size; @@ -1219,49 +1136,44 @@ namespace void writeRow(const ColumnPtr & column, std::unique_ptr & field_builder, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override { - if (!field_builder) - { - auto builder_impl = parent_struct_builder.getBuilderImpl(); - auto struct_builder_impl = capnp::DynamicStruct::Builder(struct_schema, builder_impl.getPointerField(slot_offset).initStruct(struct_size)); - field_builder = std::make_unique(std::move(struct_builder_impl), 1); - } - - auto & struct_builder = assert_cast(*field_builder); - const auto & entries_column = assert_cast(column.get())->getNestedColumnPtr(); - nested_serializer->writeRow(entries_column, struct_builder.field_builders[0], struct_builder.impl, entries_slot_offset, row_num); + writeRowImpl(column, field_builder, parent_struct_builder, slot_offset, row_num); } void writeRow(const ColumnPtr & column, std::unique_ptr & field_builder, capnp::DynamicList::Builder & parent_list_builder, UInt32 array_index, size_t row_num) override + { + writeRowImpl(column, field_builder, parent_list_builder, array_index, row_num); + } + + void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override + { + auto struct_reader = capnp::DynamicStruct::Reader(struct_schema, parent_struct_reader.getReaderImpl().getPointerField(slot_offset).getStruct(nullptr)); + readRowImpl(column, struct_reader); + } + + void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override + { + auto struct_reader = capnp::DynamicStruct::Reader(struct_schema, parent_list_reader.getReaderImpl().getStructElement(array_index)); + readRowImpl(column, struct_reader); + } + + private: + template + void writeRowImpl(const ColumnPtr & column, std::unique_ptr & field_builder, ParentBuilder & parent_builder, UInt32 offset_or_index, size_t row_num) { if (!field_builder) - { - auto builder_impl = parent_list_builder.getBuilderImpl(); - auto struct_builder_impl = capnp::DynamicStruct::Builder(struct_schema, builder_impl.getStructElement(array_index)); - field_builder = std::make_unique(std::move(struct_builder_impl), 1); - } + field_builder = initStructBuilder(parent_builder, offset_or_index, struct_size, 1, struct_schema); auto & struct_builder = assert_cast(*field_builder); const auto & entries_column = assert_cast(column.get())->getNestedColumnPtr(); nested_serializer->writeRow(entries_column, struct_builder.field_builders[0], struct_builder.impl, entries_slot_offset, row_num); } - void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override + void readRowImpl(IColumn & column, const capnp::DynamicStruct::Reader & struct_reader) { - auto reader_impl = parent_struct_reader.getReaderImpl(); - auto struct_reader = capnp::DynamicStruct::Reader(struct_schema, reader_impl.getPointerField(slot_offset).getStruct(nullptr)); auto & entries_column = assert_cast(column).getNestedColumn(); nested_serializer->readRow(entries_column, struct_reader, entries_slot_offset); } - void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override - { - auto reader_impl = parent_list_reader.getReaderImpl(); - auto struct_reader = capnp::DynamicStruct::Reader(struct_schema, reader_impl.getStructElement(array_index)); - auto & entries_column = assert_cast(column).getNestedColumn(); - nested_serializer->readRow(entries_column, struct_reader, entries_slot_offset); - } - - private: std::unique_ptr nested_serializer; capnp::StructSchema struct_schema; capnp::_::StructSize struct_size; @@ -1332,48 +1244,15 @@ namespace void writeRow(const ColumnPtr & column, std::unique_ptr & field_builder, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override { - if (!field_builder) - { - auto builder_impl = parent_struct_builder.getBuilderImpl(); - auto struct_builder_impl = capnp::DynamicStruct::Builder(struct_schema, builder_impl.getPointerField(slot_offset).initStruct(struct_size)); - field_builder = std::make_unique(std::move(struct_builder_impl), fields_count); - } - - auto & struct_builder = assert_cast(*field_builder); - if (const auto * tuple_column = typeid_cast(column.get())) - { - const auto & columns = tuple_column->getColumns(); - for (size_t i = 0; i != columns.size(); ++i) - fields_serializers[i]->writeRow(columns[i], struct_builder.field_builders[fields_indexes[i]], struct_builder.impl, fields_offsets[i], row_num); - } - else - { - fields_serializers[0]->writeRow(column, struct_builder.field_builders[fields_indexes[0]], struct_builder.impl, fields_offsets[0], row_num); - } + writeRowImpl(column, field_builder, parent_struct_builder, slot_offset, row_num); } void writeRow(const ColumnPtr & column, std::unique_ptr & field_builder, capnp::DynamicList::Builder & parent_list_builder, UInt32 array_index, size_t row_num) override { - if (!field_builder) - { - auto builder_impl = parent_list_builder.getBuilderImpl(); - auto struct_builder_impl = capnp::DynamicStruct::Builder(struct_schema, builder_impl.getStructElement(array_index)); - field_builder = std::make_unique(std::move(struct_builder_impl), fields_count); - } - - auto & struct_builder = assert_cast(*field_builder); - if (const auto * tuple_column = typeid_cast(column.get())) - { - const auto & columns = tuple_column->getColumns(); - for (size_t i = 0; i != columns.size(); ++i) - fields_serializers[i]->writeRow(columns[i], struct_builder.field_builders[fields_indexes[i]], struct_builder.impl, fields_offsets[i], row_num); - } - else - { - fields_serializers[0]->writeRow(column, struct_builder.field_builders[fields_indexes[0]], struct_builder.impl, fields_offsets[0], row_num); - } + writeRowImpl(column, field_builder, parent_list_builder, array_index, row_num); } + /// Method for writing root struct. void writeRow(const Columns & columns, StructBuilder & struct_builder, size_t row_num) { for (size_t i = 0; i != columns.size(); ++i) @@ -1382,30 +1261,17 @@ namespace void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override { - auto reader_impl = parent_struct_reader.getReaderImpl(); - auto struct_reader = capnp::DynamicStruct::Reader(struct_schema, reader_impl.getPointerField(slot_offset).getStruct(nullptr)); - if (auto * tuple_column = typeid_cast(&column)) - { - for (size_t i = 0; i != tuple_column->tupleSize(); ++i) - fields_serializers[i]->readRow(tuple_column->getColumn(i), struct_reader, fields_offsets[i]); - } - else - fields_serializers[0]->readRow(column, struct_reader, fields_offsets[0]); + auto struct_reader = capnp::DynamicStruct::Reader(struct_schema, parent_struct_reader.getReaderImpl().getPointerField(slot_offset).getStruct(nullptr)); + readRowImpl(column, struct_reader); } void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override { - auto reader_impl = parent_list_reader.getReaderImpl(); - auto struct_reader = capnp::DynamicStruct::Reader(struct_schema, reader_impl.getStructElement(array_index)); - if (auto * tuple_column = typeid_cast(&column)) - { - for (size_t i = 0; i != tuple_column->tupleSize(); ++i) - fields_serializers[i]->readRow(tuple_column->getColumn(i), struct_reader, fields_offsets[i]); - } - else - fields_serializers[0]->readRow(column, struct_reader, fields_offsets[0]); + auto struct_reader = capnp::DynamicStruct::Reader(struct_schema, parent_list_reader.getReaderImpl().getStructElement(array_index)); + readRowImpl(column, struct_reader); } + /// Method for reading from root struct. void readRow(MutableColumns & columns, const capnp::DynamicStruct::Reader & reader) { for (size_t i = 0; i != columns.size(); ++i) @@ -1435,6 +1301,36 @@ namespace } } + template + void writeRowImpl(const ColumnPtr & column, std::unique_ptr & field_builder, ParentBuilder & parent_builder, UInt32 offset_or_index, size_t row_num) + { + if (!field_builder) + field_builder = initStructBuilder(parent_builder, offset_or_index, struct_size, fields_count, struct_schema); + + auto & struct_builder = assert_cast(*field_builder); + if (const auto * tuple_column = typeid_cast(column.get())) + { + const auto & columns = tuple_column->getColumns(); + for (size_t i = 0; i != columns.size(); ++i) + fields_serializers[i]->writeRow(columns[i], struct_builder.field_builders[fields_indexes[i]], struct_builder.impl, fields_offsets[i], row_num); + } + else + { + fields_serializers[0]->writeRow(column, struct_builder.field_builders[fields_indexes[0]], struct_builder.impl, fields_offsets[0], row_num); + } + } + + void readRowImpl(IColumn & column, const capnp::DynamicStruct::Reader & struct_reader) + { + if (auto * tuple_column = typeid_cast(&column)) + { + for (size_t i = 0; i != tuple_column->tupleSize(); ++i) + fields_serializers[i]->readRow(tuple_column->getColumn(i), struct_reader, fields_offsets[i]); + } + else + fields_serializers[0]->readRow(column, struct_reader, fields_offsets[0]); + } + capnp::StructSchema struct_schema; capnp::_::StructSize struct_size; size_t fields_count; @@ -1515,12 +1411,12 @@ namespace return std::make_unique>(type, name, capnp_type, settings.enum_comparing_mode); case TypeIndex::String: if (capnp_type.isData()) - return std::make_unique>(type, name, capnp_type); - return std::make_unique>(type, name, capnp_type); + return std::make_unique>(type, name, capnp_type); + return std::make_unique>(type, name, capnp_type); case TypeIndex::FixedString: if (capnp_type.isData()) - return std::make_unique>(type, name, capnp_type); - return std::make_unique>(type, name, capnp_type); + return std::make_unique>(type, name, capnp_type); + return std::make_unique>(type, name, capnp_type); case TypeIndex::LowCardinality: return std::make_unique(type, name, capnp_type, settings); case TypeIndex::Nullable: diff --git a/src/Storages/addColumnsStructureToQueryWithClusterEngine.cpp b/src/Storages/addColumnsStructureToQueryWithClusterEngine.cpp new file mode 100644 index 00000000000..106161ae620 --- /dev/null +++ b/src/Storages/addColumnsStructureToQueryWithClusterEngine.cpp @@ -0,0 +1,52 @@ +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query) +{ + auto * select_query = query->as(); + if (!select_query || !select_query->tables()) + return nullptr; + + auto * tables = select_query->tables()->as(); + auto * table_expression = tables->children[0]->as()->table_expression->as(); + if (!table_expression->table_function) + return nullptr; + + auto * table_function = table_expression->table_function->as(); + return table_function->arguments->as(); +} + +void addColumnsStructureToQueryWithClusterEngine(ASTPtr & query, const String & structure, size_t max_arguments, const String & function_name) +{ + ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(query); + if (!expression_list) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query from table function {}, got '{}'", function_name, queryToString(query)); + auto structure_literal = std::make_shared(structure); + + if (expression_list->children.size() < 2 || expression_list->children.size() > max_arguments) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected 2 to {} arguments in {} table functions, got {}", + function_name, max_arguments, expression_list->children.size()); + + if (expression_list->children.size() == 2 || expression_list->children.size() == max_arguments - 1) + { + auto format_literal = std::make_shared("auto"); + expression_list->children.push_back(format_literal); + } + + expression_list->children.push_back(structure_literal); +} + +} diff --git a/src/Storages/addColumnsStructureToQueryWithClusterEngine.h b/src/Storages/addColumnsStructureToQueryWithClusterEngine.h new file mode 100644 index 00000000000..5939f3f43aa --- /dev/null +++ b/src/Storages/addColumnsStructureToQueryWithClusterEngine.h @@ -0,0 +1,14 @@ +#pragma once + +#include +#include + +namespace DB +{ + +ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query); + +/// Add structure argument for queries with s3Cluster/hdfsCluster table function. +void addColumnsStructureToQueryWithClusterEngine(ASTPtr & query, const String & structure, size_t max_arguments, const String & function_name); + +}