diff --git a/contrib/capnproto b/contrib/capnproto index dc8b50b9997..976209a6d18 160000 --- a/contrib/capnproto +++ b/contrib/capnproto @@ -1 +1 @@ -Subproject commit dc8b50b999777bcb23c89bb5907c785c3f654441 +Subproject commit 976209a6d18074804f60d18ef99b6a809d27dadf diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 3a23127e2fd..c7b515d4246 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -981,7 +981,7 @@ class IColumn; M(Bool, output_format_orc_string_as_string, false, "Use ORC String type instead of Binary for String columns", 0) \ M(ORCCompression, output_format_orc_compression_method, "lz4", "Compression method for ORC output format. Supported codecs: lz4, snappy, zlib, zstd, none (uncompressed)", 0) \ \ - M(EnumComparingMode, format_capn_proto_enum_comparising_mode, FormatSettings::EnumComparingMode::BY_VALUES, "How to map ClickHouse Enum and CapnProto Enum", 0) \ + M(CapnProtoEnumComparingMode, format_capn_proto_enum_comparising_mode, FormatSettings::CapnProtoEnumComparingMode::BY_VALUES, "How to map ClickHouse Enum and CapnProto Enum", 0) \ \ M(String, input_format_mysql_dump_table_name, "", "Name of the table in MySQL dump from which to read data", 0) \ M(Bool, input_format_mysql_dump_map_column_names, true, "Match columns from table in MySQL dump and columns from ClickHouse table by names", 0) \ diff --git a/src/Core/SettingsEnums.cpp b/src/Core/SettingsEnums.cpp index e0f16ea00db..a291a23c140 100644 --- a/src/Core/SettingsEnums.cpp +++ b/src/Core/SettingsEnums.cpp @@ -144,10 +144,10 @@ IMPLEMENT_SETTING_ENUM(TransactionsWaitCSNMode, ErrorCodes::BAD_ARGUMENTS, {"wait", TransactionsWaitCSNMode::WAIT}, {"wait_unknown", TransactionsWaitCSNMode::WAIT_UNKNOWN}}) -IMPLEMENT_SETTING_ENUM(EnumComparingMode, ErrorCodes::BAD_ARGUMENTS, - {{"by_names", FormatSettings::EnumComparingMode::BY_NAMES}, - {"by_values", FormatSettings::EnumComparingMode::BY_VALUES}, - {"by_names_case_insensitive", FormatSettings::EnumComparingMode::BY_NAMES_CASE_INSENSITIVE}}) +IMPLEMENT_SETTING_ENUM(CapnProtoEnumComparingMode, ErrorCodes::BAD_ARGUMENTS, + {{"by_names", FormatSettings::CapnProtoEnumComparingMode::BY_NAMES}, + {"by_values", FormatSettings::CapnProtoEnumComparingMode::BY_VALUES}, + {"by_names_case_insensitive", FormatSettings::CapnProtoEnumComparingMode::BY_NAMES_CASE_INSENSITIVE}}) IMPLEMENT_SETTING_ENUM(EscapingRule, ErrorCodes::BAD_ARGUMENTS, {{"None", FormatSettings::EscapingRule::None}, diff --git a/src/Core/SettingsEnums.h b/src/Core/SettingsEnums.h index 3ae7bfaa673..1c5be910ef7 100644 --- a/src/Core/SettingsEnums.h +++ b/src/Core/SettingsEnums.h @@ -188,7 +188,7 @@ enum class TransactionsWaitCSNMode DECLARE_SETTING_ENUM(TransactionsWaitCSNMode) -DECLARE_SETTING_ENUM_WITH_RENAME(EnumComparingMode, FormatSettings::EnumComparingMode) +DECLARE_SETTING_ENUM_WITH_RENAME(CapnProtoEnumComparingMode, FormatSettings::CapnProtoEnumComparingMode) DECLARE_SETTING_ENUM_WITH_RENAME(EscapingRule, FormatSettings::EscapingRule) diff --git a/src/Formats/CapnProtoSchema.cpp b/src/Formats/CapnProtoSchema.cpp new file mode 100644 index 00000000000..559047a6f8d --- /dev/null +++ b/src/Formats/CapnProtoSchema.cpp @@ -0,0 +1,298 @@ +#include + +#if USE_CAPNP + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int CANNOT_PARSE_CAPN_PROTO_SCHEMA; + extern const int BAD_TYPE_OF_FIELD; + extern const int FILE_DOESNT_EXIST; + extern const int UNKNOWN_EXCEPTION; + extern const int CAPN_PROTO_BAD_TYPE; + extern const int BAD_ARGUMENTS; +} + +capnp::StructSchema CapnProtoSchemaParser::getMessageSchema(const FormatSchemaInfo & schema_info) +{ + capnp::ParsedSchema schema; + try + { + int fd; + KJ_SYSCALL(fd = open(schema_info.schemaDirectory().data(), O_RDONLY)); // NOLINT(bugprone-suspicious-semicolon) + auto schema_dir = kj::newDiskDirectory(kj::OsFileHandle(fd)); + schema = impl.parseFromDirectory(*schema_dir, kj::Path::parse(schema_info.schemaPath()), {}); + } + catch (const kj::Exception & e) + { + /// That's not good to determine the type of error by its description, but + /// this is the only way to do it here, because kj doesn't specify the type of error. + auto description = std::string_view(e.getDescription().cStr()); + if (description.find("No such file or directory") != String::npos || description.find("no such directory") != String::npos || description.find("no such file") != String::npos) + throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Cannot open CapnProto schema, file {} doesn't exists", schema_info.absoluteSchemaPath()); + + if (description.find("Parse error") != String::npos) + throw Exception(ErrorCodes::CANNOT_PARSE_CAPN_PROTO_SCHEMA, "Cannot parse CapnProto schema {}:{}", schema_info.schemaPath(), e.getLine()); + + throw Exception(ErrorCodes::UNKNOWN_EXCEPTION, + "Unknown exception while parsing CapnProto schema: {}, schema dir and file: {}, {}", + description, schema_info.schemaDirectory(), schema_info.schemaPath()); + } + + auto message_maybe = schema.findNested(schema_info.messageName()); + auto * message_schema = kj::_::readMaybe(message_maybe); + if (!message_schema) + throw Exception(ErrorCodes::CANNOT_PARSE_CAPN_PROTO_SCHEMA, + "CapnProto schema doesn't contain message with name {}", schema_info.messageName()); + return message_schema->asStruct(); +} + +bool checkIfStructContainsUnnamedUnion(const capnp::StructSchema & struct_schema) +{ + return struct_schema.getFields().size() != struct_schema.getNonUnionFields().size(); +} + +bool checkIfStructIsNamedUnion(const capnp::StructSchema & struct_schema) +{ + return struct_schema.getFields().size() == struct_schema.getUnionFields().size(); +} + +/// Get full name of type for better exception messages. +String getCapnProtoFullTypeName(const capnp::Type & type) +{ + static const std::map capnp_simple_type_names = + { + {capnp::schema::Type::Which::BOOL, "Bool"}, + {capnp::schema::Type::Which::VOID, "Void"}, + {capnp::schema::Type::Which::INT8, "Int8"}, + {capnp::schema::Type::Which::INT16, "Int16"}, + {capnp::schema::Type::Which::INT32, "Int32"}, + {capnp::schema::Type::Which::INT64, "Int64"}, + {capnp::schema::Type::Which::UINT8, "UInt8"}, + {capnp::schema::Type::Which::UINT16, "UInt16"}, + {capnp::schema::Type::Which::UINT32, "UInt32"}, + {capnp::schema::Type::Which::UINT64, "UInt64"}, + {capnp::schema::Type::Which::FLOAT32, "Float32"}, + {capnp::schema::Type::Which::FLOAT64, "Float64"}, + {capnp::schema::Type::Which::TEXT, "Text"}, + {capnp::schema::Type::Which::DATA, "Data"}, + {capnp::schema::Type::Which::INTERFACE, "Interface"}, + {capnp::schema::Type::Which::ANY_POINTER, "AnyPointer"}, + }; + + switch (type.which()) + { + case capnp::schema::Type::Which::STRUCT: + { + auto struct_schema = type.asStruct(); + + auto non_union_fields = struct_schema.getNonUnionFields(); + std::vector non_union_field_names; + for (auto nested_field : non_union_fields) + non_union_field_names.push_back(String(nested_field.getProto().getName()) + " " + getCapnProtoFullTypeName(nested_field.getType())); + + auto union_fields = struct_schema.getUnionFields(); + std::vector union_field_names; + for (auto nested_field : union_fields) + union_field_names.push_back(String(nested_field.getProto().getName()) + " " + getCapnProtoFullTypeName(nested_field.getType())); + + String union_name = "Union(" + boost::algorithm::join(union_field_names, ", ") + ")"; + /// Check if the struct is a named union. + if (non_union_field_names.empty()) + return union_name; + + String type_name = "Struct(" + boost::algorithm::join(non_union_field_names, ", "); + /// Check if the struct contains unnamed union. + if (!union_field_names.empty()) + type_name += ", " + union_name; + type_name += ")"; + return type_name; + } + case capnp::schema::Type::Which::LIST: + return "List(" + getCapnProtoFullTypeName(type.asList().getElementType()) + ")"; + case capnp::schema::Type::Which::ENUM: + { + auto enum_schema = type.asEnum(); + String enum_name = "Enum("; + auto enumerants = enum_schema.getEnumerants(); + for (unsigned i = 0; i != enumerants.size(); ++i) + { + enum_name += String(enumerants[i].getProto().getName()) + " = " + std::to_string(enumerants[i].getOrdinal()); + if (i + 1 != enumerants.size()) + enum_name += ", "; + } + enum_name += ")"; + return enum_name; + } + default: + auto it = capnp_simple_type_names.find(type.which()); + if (it == capnp_simple_type_names.end()) + throw Exception(ErrorCodes::BAD_TYPE_OF_FIELD, "Unknown CapnProto type"); + return it->second; + } +} + +namespace +{ + + template + DataTypePtr getEnumDataTypeFromEnumerants(const capnp::EnumSchema::EnumerantList & enumerants) + { + std::vector> values; + for (auto enumerant : enumerants) + values.emplace_back(enumerant.getProto().getName(), ValueType(enumerant.getOrdinal())); + return std::make_shared>(std::move(values)); + } + + DataTypePtr getEnumDataTypeFromEnumSchema(const capnp::EnumSchema & enum_schema) + { + auto enumerants = enum_schema.getEnumerants(); + if (enumerants.size() < 128) + return getEnumDataTypeFromEnumerants(enumerants); + if (enumerants.size() < 32768) + return getEnumDataTypeFromEnumerants(enumerants); + + throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "ClickHouse supports only 8 and 16-bit Enums"); + } + + DataTypePtr getDataTypeFromCapnProtoType(const capnp::Type & capnp_type, bool skip_unsupported_fields) + { + switch (capnp_type.which()) + { + case capnp::schema::Type::INT8: + return std::make_shared(); + case capnp::schema::Type::INT16: + return std::make_shared(); + case capnp::schema::Type::INT32: + return std::make_shared(); + case capnp::schema::Type::INT64: + return std::make_shared(); + case capnp::schema::Type::BOOL: [[fallthrough]]; + case capnp::schema::Type::UINT8: + return std::make_shared(); + case capnp::schema::Type::UINT16: + return std::make_shared(); + case capnp::schema::Type::UINT32: + return std::make_shared(); + case capnp::schema::Type::UINT64: + return std::make_shared(); + case capnp::schema::Type::FLOAT32: + return std::make_shared(); + case capnp::schema::Type::FLOAT64: + return std::make_shared(); + case capnp::schema::Type::DATA: [[fallthrough]]; + case capnp::schema::Type::TEXT: + return std::make_shared(); + case capnp::schema::Type::ENUM: + return getEnumDataTypeFromEnumSchema(capnp_type.asEnum()); + case capnp::schema::Type::LIST: + { + auto list_schema = capnp_type.asList(); + auto nested_type = getDataTypeFromCapnProtoType(list_schema.getElementType(), skip_unsupported_fields); + if (!nested_type) + return nullptr; + return std::make_shared(nested_type); + } + case capnp::schema::Type::STRUCT: + { + auto struct_schema = capnp_type.asStruct(); + + + if (struct_schema.getFields().size() == 0) + { + if (skip_unsupported_fields) + return nullptr; + throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Empty messages are not supported"); + } + + /// Check if it can be Nullable. + if (checkIfStructIsNamedUnion(struct_schema)) + { + auto fields = struct_schema.getUnionFields(); + if (fields.size() != 2 || (!fields[0].getType().isVoid() && !fields[1].getType().isVoid())) + { + if (skip_unsupported_fields) + return nullptr; + throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Unions are not supported"); + } + auto value_type = fields[0].getType().isVoid() ? fields[1].getType() : fields[0].getType(); + if (value_type.isStruct() || value_type.isList()) + { + if (skip_unsupported_fields) + return nullptr; + throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Tuples and Lists cannot be inside Nullable"); + } + + auto nested_type = getDataTypeFromCapnProtoType(value_type, skip_unsupported_fields); + if (!nested_type) + return nullptr; + return std::make_shared(nested_type); + } + + if (checkIfStructContainsUnnamedUnion(struct_schema)) + throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Unnamed union is not supported"); + + /// Treat Struct as Tuple. + DataTypes nested_types; + Names nested_names; + for (auto field : struct_schema.getNonUnionFields()) + { + auto nested_type = getDataTypeFromCapnProtoType(field.getType(), skip_unsupported_fields); + if (!nested_type) + continue; + nested_names.push_back(field.getProto().getName()); + nested_types.push_back(nested_type); + } + if (nested_types.empty()) + return nullptr; + return std::make_shared(std::move(nested_types), std::move(nested_names)); + } + default: + { + if (skip_unsupported_fields) + return nullptr; + throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Unsupported CapnProtoType: {}", getCapnProtoFullTypeName(capnp_type)); + } + } +} + +} + +NamesAndTypesList capnProtoSchemaToCHSchema(const capnp::StructSchema & schema, bool skip_unsupported_fields) +{ + if (checkIfStructContainsUnnamedUnion(schema)) + throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Unnamed union is not supported"); + + NamesAndTypesList names_and_types; + for (auto field : schema.getNonUnionFields()) + { + auto name = field.getProto().getName(); + auto type = getDataTypeFromCapnProtoType(field.getType(), skip_unsupported_fields); + if (type) + names_and_types.emplace_back(name, type); + } + if (names_and_types.empty()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Cannot convert CapnProto schema to ClickHouse table schema, all fields have unsupported types"); + + return names_and_types; +} + +} + +#endif diff --git a/src/Formats/CapnProtoUtils.h b/src/Formats/CapnProtoSchema.h similarity index 59% rename from src/Formats/CapnProtoUtils.h rename to src/Formats/CapnProtoSchema.h index 2d8cdb418d7..225f6f56207 100644 --- a/src/Formats/CapnProtoUtils.h +++ b/src/Formats/CapnProtoSchema.h @@ -30,17 +30,14 @@ public: capnp::StructSchema getMessageSchema(const FormatSchemaInfo & schema_info); }; -std::pair splitCapnProtoFieldName(const String & name); +bool checkIfStructContainsUnnamedUnion(const capnp::StructSchema & struct_schema); +bool checkIfStructIsNamedUnion(const capnp::StructSchema & struct_schema); -bool compareEnumNames(const String & first, const String & second, FormatSettings::EnumComparingMode mode); - -std::pair getStructBuilderAndFieldByColumnName(capnp::DynamicStruct::Builder struct_builder, const String & name); - -capnp::DynamicValue::Reader getReaderByColumnName(const capnp::DynamicStruct::Reader & struct_reader, const String & name); - -void checkCapnProtoSchemaStructure(const capnp::StructSchema & schema, const Block & header, FormatSettings::EnumComparingMode mode); +/// Get full name of type for better exception messages. +String getCapnProtoFullTypeName(const capnp::Type & type); NamesAndTypesList capnProtoSchemaToCHSchema(const capnp::StructSchema & schema, bool skip_unsupported_fields); + } #endif diff --git a/src/Formats/CapnProtoSerializer.cpp b/src/Formats/CapnProtoSerializer.cpp new file mode 100644 index 00000000000..b306cca4f94 --- /dev/null +++ b/src/Formats/CapnProtoSerializer.cpp @@ -0,0 +1,1538 @@ +#include "config.h" + +#if USE_CAPNP + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int THERE_IS_NO_COLUMN; + extern const int CAPN_PROTO_BAD_CAST; + extern const int INCORRECT_DATA; + extern const int ILLEGAL_COLUMN; +} + +namespace +{ + std::pair splitFieldName(const String & name) + { + const auto * begin = name.data(); + const auto * end = name.data() + name.size(); + const auto * it = find_first_symbols<'_', '.'>(begin, end); + String first = String(begin, it); + String second = it == end ? "" : String(it + 1, end); + return {first, second}; + } + + std::optional findFieldByName(const capnp::StructSchema & struct_schema, const String & name) + { + const auto & fields = struct_schema.getFields(); + for (auto field : fields) + { + auto field_name = String(field.getProto().getName()); + if (boost::to_lower_copy(name) == boost::to_lower_copy(field_name)) + return field; + } + return std::nullopt; + } + + [[noreturn]] void throwCannotConvert(const DataTypePtr & type, const String & name, const capnp::Type & capnp_type) + { + throw Exception( + ErrorCodes::CAPN_PROTO_BAD_CAST, + "Cannot convert ClickHouse column \"{}\" with type {} to CapnProto type {}", + name, + type->getName(), + getCapnProtoFullTypeName(capnp_type)); + } + + struct FieldBuilder + { + virtual ~FieldBuilder() = default; + }; + + struct ListBuilder : public FieldBuilder + { + explicit ListBuilder(capnp::DynamicValue::Builder builder, UInt32 elements_size) : impl(builder.as()), nested_builders(elements_size) + { + } + + capnp::DynamicList::Builder impl; + std::vector> nested_builders; + }; + + struct StructBuilder : public FieldBuilder + { + explicit StructBuilder(capnp::DynamicStruct::Builder struct_builder, size_t fields_size) : impl(std::move(struct_builder)), field_builders(fields_size) + { + } + + capnp::DynamicStruct::Builder impl; + std::vector> field_builders; + }; + + template + std::unique_ptr initStructBuilder(ParentBuilder & parent_builder, UInt32 offset_or_index, const capnp::_::StructSize & struct_size, size_t elements, const capnp::StructSchema & schema) + { + capnp::DynamicStruct::Builder builder_impl; + if constexpr (std::is_same_v) + builder_impl = capnp::DynamicStruct::Builder(schema, parent_builder.getBuilderImpl().getPointerField(offset_or_index).initStruct(struct_size)); + else + builder_impl = capnp::DynamicStruct::Builder(schema, parent_builder.getBuilderImpl().getStructElement(offset_or_index)); + return std::make_unique(std::move(builder_impl), elements); + } + + class ICapnProtoSerializer + { + public: + /// Write row as struct field. + virtual void writeRow( + const ColumnPtr & column, + std::unique_ptr & builder, /// Maybe unused for simple types, needed to initialize structs and lists. + capnp::DynamicStruct::Builder & parent_struct_builder, + UInt32 slot_offset, + size_t row_num) = 0; + + /// Write row as list element. + virtual void writeRow( + const ColumnPtr & column, + std::unique_ptr & builder, /// Maybe unused for simple types, needed to initialize structs and lists. + capnp::DynamicList::Builder & parent_list_builder, + UInt32 array_index, + size_t row_num) = 0; + + /// Read row from struct field at slot_offset. + virtual void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) = 0; + + /// Read row from list element at array_index. + virtual void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) = 0; + + virtual ~ICapnProtoSerializer() = default; + }; + + template + class CapnProtoIntegerSerializer : public ICapnProtoSerializer + { + public: + void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override + { + parent_struct_builder.getBuilderImpl().setDataField(slot_offset, getValue(column, row_num)); + } + + void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicList::Builder & parent_list_builder, UInt32 array_index, size_t row_num) override + { + parent_list_builder.getBuilderImpl().setDataElement(array_index, getValue(column, row_num)); + } + + void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override + { + insertValue(column, parent_struct_reader.getReaderImpl().getDataField(slot_offset)); + } + + void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override + { + insertValue(column, parent_list_reader.getReaderImpl().getDataElement(array_index)); + } + + private: + CapnProtoNumericType getValue(const ColumnPtr & column, size_t row_num) + { + return static_cast(assert_cast &>(*column).getElement(row_num)); + } + + void insertValue(IColumn & column, CapnProtoNumericType value) + { + if constexpr (convert_to_bool_on_read) + assert_cast(column).insertValue(static_cast(value)); + else + assert_cast &>(column).insertValue(static_cast(value)); + } + }; + + template + std::unique_ptr createIntegerSerializer(const DataTypePtr & data_type, const String & column_name, const capnp::Type & capnp_type) + { + switch (capnp_type.which()) + { + case capnp::schema::Type::INT8: + return std::make_unique>(); + case capnp::schema::Type::INT16: + return std::make_unique>(); + case capnp::schema::Type::INT32: + return std::make_unique>(); + case capnp::schema::Type::INT64: + return std::make_unique>(); + case capnp::schema::Type::UINT8: + return std::make_unique>(); + case capnp::schema::Type::UINT16: + return std::make_unique>(); + case capnp::schema::Type::UINT32: + return std::make_unique>(); + case capnp::schema::Type::UINT64: + return std::make_unique>(); + case capnp::schema::Type::BOOL: + return std::make_unique>(); + default: + throwCannotConvert(data_type, column_name, capnp_type); + } + } + + template + class CapnProtoFloatSerializer : public ICapnProtoSerializer + { + public: + void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override + { + parent_struct_builder.getBuilderImpl().setDataField(slot_offset, getValue(column, row_num)); + } + + void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicList::Builder & parent_list_builder, UInt32 array_index, size_t row_num) override + { + parent_list_builder.getBuilderImpl().setDataElement(array_index, getValue(column, row_num)); + } + + void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override + { + insertValue(column, parent_struct_reader.getReaderImpl().getDataField(slot_offset)); + } + + void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override + { + insertValue(column, parent_list_reader.getReaderImpl().getDataElement(array_index)); + } + + private: + CapnProtoFloatType getValue(const ColumnPtr & column, size_t row_num) + { + return static_cast(assert_cast &>(*column).getElement(row_num)); + } + + void insertValue(IColumn & column, CapnProtoFloatType value) + { + assert_cast &>(column).insertValue(static_cast(value)); + } + }; + + template + std::unique_ptr createFloatSerializer(const DataTypePtr & data_type, const String & column_name, const capnp::Type & capnp_type) + { + switch (capnp_type.which()) + { + case capnp::schema::Type::FLOAT32: + return std::make_unique>(); + case capnp::schema::Type::FLOAT64: + return std::make_unique>(); + default: + throwCannotConvert(data_type, column_name, capnp_type); + } + } + + template + class CapnProtoEnumSerializer : public ICapnProtoSerializer + { + public: + CapnProtoEnumSerializer( + const DataTypePtr & data_type_, + const String & column_name, + const capnp::Type & capnp_type, + const FormatSettings::CapnProtoEnumComparingMode enum_comparing_mode_) : data_type(data_type_), enum_comparing_mode(enum_comparing_mode_) + { + if (!capnp_type.isEnum()) + throwCannotConvert(data_type, column_name, capnp_type); + + const auto * enum_type = assert_cast *>(data_type.get()); + const auto & enum_values = dynamic_cast &>(*enum_type); + + enum_schema = capnp_type.asEnum(); + auto enumerants = enum_schema.getEnumerants(); + if (enum_comparing_mode == FormatSettings::CapnProtoEnumComparingMode::BY_VALUES) + { + auto ch_enum_values = enum_values.getSetOfAllValues(); + std::unordered_set capn_enum_values; + for (auto enumerant : enumerants) + capn_enum_values.insert(enumerant.getOrdinal()); + + /// Check if ClickHouse values is a superset of CapnProto values. + ch_enum_is_superset = true; + /// In CapnProto Enum fields are numbered sequentially starting from zero. + /// Check if max CapnProto value exceeds max ClickHouse value. + constexpr auto max_value = std::is_same_v ? INT8_MAX : INT16_MAX; + if (enumerants.size() > max_value) + { + ch_enum_is_superset = false; + } + else + { + for (auto capnp_value : capn_enum_values) + { + if (!ch_enum_values.contains(static_cast(capnp_value))) + { + ch_enum_is_superset = false; + break; + } + } + } + + /// Check if CapnProto values is a superset of ClickHouse values. + capnp_enum_is_superset = true; + for (auto ch_value : ch_enum_values) + { + /// Capnp doesn't support negative enum values. + if (ch_value < 0 || !capn_enum_values.contains(static_cast(ch_value))) + { + capnp_enum_is_superset = false; + break; + } + } + } + else + { + bool to_lower = enum_comparing_mode == FormatSettings::CapnProtoEnumComparingMode::BY_NAMES_CASE_INSENSITIVE; + + auto all_values = enum_values.getValues(); + std::unordered_map ch_name_to_value; + for (auto & [name, value] : all_values) + ch_name_to_value[to_lower ? boost::algorithm::to_lower_copy(name) : name] = value; + + std::unordered_map capnp_name_to_value; + for (auto enumerant : enumerants) + { + String capnp_name = enumerant.getProto().getName(); + capnp_name_to_value[to_lower ? boost::algorithm::to_lower_copy(capnp_name) : capnp_name] = enumerant.getOrdinal(); + } + + /// Check if ClickHouse names is a superset of CapnProto names. + ch_enum_is_superset = true; + for (auto & [capnp_name, capnp_value] : capnp_name_to_value) + { + auto it = ch_name_to_value.find(capnp_name); + if (it == ch_name_to_value.end()) + { + ch_enum_is_superset = false; + break; + } + capnp_to_ch_values[capnp_value] = it->second; + } + + /// Check if CapnProto names is a superset of ClickHouse names. + capnp_enum_is_superset = true; + + for (auto & [ch_name, ch_value] : ch_name_to_value) + { + auto it = capnp_name_to_value.find(ch_name); + if (it == capnp_name_to_value.end()) + { + capnp_enum_is_superset = false; + break; + } + ch_to_capnp_values[ch_value] = it->second; + } + } + } + + void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override + { + parent_struct_builder.getBuilderImpl().setDataField(slot_offset, getValue(column, row_num)); + } + + void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicList::Builder & parent_list_builder, UInt32 array_index, size_t row_num) override + { + parent_list_builder.getBuilderImpl().setDataElement(array_index, getValue(column, row_num)); + } + + void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override + { + insertValue(column, parent_struct_reader.getReaderImpl().getDataField(slot_offset)); + } + + void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override + { + insertValue(column, parent_list_reader.getReaderImpl().getDataElement(array_index)); + } + + private: + UInt16 getValue(const ColumnPtr & column, size_t row_num) + { + if (!capnp_enum_is_superset) + throw Exception(ErrorCodes::CAPN_PROTO_BAD_CAST, "Cannot convert ClickHouse enum to CapnProto enum: CapnProto enum values/names is not a superset of ClickHouse enum values/names"); + + EnumType enum_value = assert_cast &>(*column).getElement(row_num); + if (enum_comparing_mode == FormatSettings::CapnProtoEnumComparingMode::BY_VALUES) + return static_cast(enum_value); + auto it = ch_to_capnp_values.find(enum_value); + if (it == ch_to_capnp_values.end()) + throw Exception(ErrorCodes::INCORRECT_DATA, "Unexpected value {} in ClickHouse enum", enum_value); + + return it->second; + } + + void insertValue(IColumn & column, UInt16 capnp_enum_value) + { + if (!ch_enum_is_superset) + throw Exception(ErrorCodes::CAPN_PROTO_BAD_CAST, "Cannot convert CapnProto enum to ClickHouse enum: ClickHouse enum values/names is not a superset of CapnProto enum values/names"); + + if (enum_comparing_mode == FormatSettings::CapnProtoEnumComparingMode::BY_VALUES) + { + assert_cast &>(column).insertValue(static_cast(capnp_enum_value)); + } + else + { + auto it = capnp_to_ch_values.find(capnp_enum_value); + if (it == capnp_to_ch_values.end()) + throw Exception(ErrorCodes::INCORRECT_DATA, "Unexpected value {} in CapnProto enum", capnp_enum_value); + + assert_cast &>(column).insertValue(it->second); + } + } + + DataTypePtr data_type; + capnp::EnumSchema enum_schema; + const FormatSettings::CapnProtoEnumComparingMode enum_comparing_mode; + bool ch_enum_is_superset; + bool capnp_enum_is_superset; + std::unordered_map ch_to_capnp_values; + std::unordered_map capnp_to_ch_values; + }; + + class CapnProtoDateSerializer : public ICapnProtoSerializer + { + public: + CapnProtoDateSerializer(const DataTypePtr & data_type, const String & column_name, const capnp::Type & capnp_type) + { + if (!capnp_type.isUInt16()) + throwCannotConvert(data_type, column_name, capnp_type); + } + + void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override + { + parent_struct_builder.getBuilderImpl().setDataField(slot_offset, getValue(column, row_num)); + } + + void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicList::Builder & parent_list_builder, UInt32 array_index, size_t row_num) override + { + parent_list_builder.getBuilderImpl().setDataElement(array_index, getValue(column, row_num)); + } + + void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override + { + insertValue(column, parent_struct_reader.getReaderImpl().getDataField(slot_offset)); + } + + void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override + { + insertValue(column, parent_list_reader.getReaderImpl().getDataElement(array_index)); + } + + private: + UInt16 getValue(const ColumnPtr & column, size_t row_num) + { + return assert_cast(*column).getElement(row_num); + } + + void insertValue(IColumn & column, UInt16 value) + { + assert_cast(column).insertValue(value); + } + }; + + class CapnProtoDate32Serializer : public ICapnProtoSerializer + { + public: + CapnProtoDate32Serializer(const DataTypePtr & data_type, const String & column_name, const capnp::Type & capnp_type) + { + if (!capnp_type.isInt32()) + throwCannotConvert(data_type, column_name, capnp_type); + } + + void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override + { + parent_struct_builder.getBuilderImpl().setDataField(slot_offset, getValue(column, row_num)); + } + + void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicList::Builder & parent_list_builder, UInt32 array_index, size_t row_num) override + { + parent_list_builder.getBuilderImpl().setDataElement(array_index, getValue(column, row_num)); + } + + void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override + { + insertValue(column, parent_struct_reader.getReaderImpl().getDataField(slot_offset)); + } + + void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override + { + insertValue(column, parent_list_reader.getReaderImpl().getDataElement(array_index)); + } + + private: + Int32 getValue(const ColumnPtr & column, size_t row_num) + { + return assert_cast(*column).getElement(row_num); + } + + void insertValue(IColumn & column, Int32 value) + { + assert_cast(column).insertValue(value); + } + }; + + class CapnProtoDateTimeSerializer : public ICapnProtoSerializer + { + public: + CapnProtoDateTimeSerializer(const DataTypePtr & data_type, const String & column_name, const capnp::Type & capnp_type) + { + if (!capnp_type.isUInt32()) + throwCannotConvert(data_type, column_name, capnp_type); + } + + void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override + { + parent_struct_builder.getBuilderImpl().setDataField(slot_offset, getValue(column, row_num)); + } + + void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicList::Builder & parent_list_builder, UInt32 array_index, size_t row_num) override + { + parent_list_builder.getBuilderImpl().setDataElement(array_index, getValue(column, row_num)); + } + + void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override + { + insertValue(column, parent_struct_reader.getReaderImpl().getDataField(slot_offset)); + } + + void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override + { + insertValue(column, parent_list_reader.getReaderImpl().getDataElement(array_index)); + } + + private: + UInt32 getValue(const ColumnPtr & column, size_t row_num) + { + return assert_cast(*column).getElement(row_num); + } + + void insertValue(IColumn & column, UInt32 value) + { + assert_cast(column).insertValue(value); + } + }; + + class CapnProtoDateTime64Serializer : public ICapnProtoSerializer + { + public: + CapnProtoDateTime64Serializer(const DataTypePtr & data_type, const String & column_name, const capnp::Type & capnp_type) + { + if (!capnp_type.isInt64()) + throwCannotConvert(data_type, column_name, capnp_type); + } + + void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override + { + parent_struct_builder.getBuilderImpl().setDataField(slot_offset, getValue(column, row_num)); + } + + void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicList::Builder & parent_list_builder, UInt32 array_index, size_t row_num) override + { + parent_list_builder.getBuilderImpl().setDataElement(array_index, getValue(column, row_num)); + } + + void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override + { + insertValue(column, parent_struct_reader.getReaderImpl().getDataField(slot_offset)); + } + + void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override + { + insertValue(column, parent_list_reader.getReaderImpl().getDataElement(array_index)); + } + + private: + Int64 getValue(const ColumnPtr & column, size_t row_num) + { + return assert_cast(*column).getElement(row_num); + } + + void insertValue(IColumn & column, Int64 value) + { + assert_cast(column).insertValue(value); + } + }; + + template + class CapnProtoDecimalSerializer : public ICapnProtoSerializer + { + public: + using NativeType = typename DecimalType::NativeType; + + CapnProtoDecimalSerializer(const DataTypePtr & data_type, const String & column_name, const capnp::Type & capnp_type) + { + auto which = WhichDataType(data_type); + if ((!capnp_type.isInt32() && which.isDecimal32()) || (!capnp_type.isInt64() && which.isDecimal64())) + throwCannotConvert(data_type, column_name, capnp_type); + } + + void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override + { + parent_struct_builder.getBuilderImpl().setDataField(slot_offset, getValue(column, row_num)); + } + + void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicList::Builder & parent_list_builder, UInt32 array_index, size_t row_num) override + { + parent_list_builder.getBuilderImpl().setDataElement(array_index, getValue(column, row_num)); + } + + void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override + { + insertValue(column, parent_struct_reader.getReaderImpl().getDataField(slot_offset)); + } + + void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override + { + insertValue(column, parent_list_reader.getReaderImpl().getDataElement(array_index)); + } + + private: + NativeType getValue(const ColumnPtr & column, size_t row_num) + { + return assert_cast &>(*column).getElement(row_num); + } + + void insertValue(IColumn & column, NativeType value) + { + assert_cast &>(column).insertValue(value); + } + }; + + + class CapnProtoIPv4Serializer : public ICapnProtoSerializer + { + public: + CapnProtoIPv4Serializer(const DataTypePtr & data_type, const String & column_name, const capnp::Type & capnp_type) + { + if (!capnp_type.isUInt32()) + throwCannotConvert(data_type, column_name, capnp_type); + } + + void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override + { + parent_struct_builder.getBuilderImpl().setDataField(slot_offset, getValue(column, row_num)); + } + + void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicList::Builder & parent_list_builder, UInt32 array_index, size_t row_num) override + { + parent_list_builder.getBuilderImpl().setDataElement(array_index, getValue(column, row_num)); + } + + void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override + { + insertValue(column, parent_struct_reader.getReaderImpl().getDataField(slot_offset)); + } + + void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override + { + insertValue(column, parent_list_reader.getReaderImpl().getDataElement(array_index)); + } + + private: + UInt32 getValue(const ColumnPtr & column, size_t row_num) + { + return assert_cast(*column).getElement(row_num); + } + + void insertValue(IColumn & column, UInt32 value) + { + assert_cast(column).insertValue(IPv4(value)); + } + }; + + template + class CapnProtoFixedSizeRawDataSerializer : public ICapnProtoSerializer + { + private: + static constexpr size_t expected_value_size = sizeof(T); + + public: + CapnProtoFixedSizeRawDataSerializer(const DataTypePtr & data_type_, const String & column_name, const capnp::Type & capnp_type) : data_type(data_type_) + { + if (!capnp_type.isData()) + throwCannotConvert(data_type, column_name, capnp_type); + } + + void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override + { + parent_struct_builder.getBuilderImpl().getPointerField(slot_offset).setBlob(getData(column, row_num)); + } + + void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicList::Builder & parent_struct_builder, UInt32 array_index, size_t row_num) override + { + parent_struct_builder.getBuilderImpl().getPointerElement(array_index).setBlob(getData(column, row_num)); + } + + void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override + { + insertData(column, parent_struct_reader.getReaderImpl().getPointerField(slot_offset).getBlob(nullptr, 0)); + } + + void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override + { + insertData(column, parent_list_reader.getReaderImpl().getPointerElement(array_index).getBlob(nullptr, 0)); + } + + private: + capnp::Data::Reader getData(const ColumnPtr & column, size_t row_num) + { + auto data = column->getDataAt(row_num); + return capnp::Data::Reader(reinterpret_cast(data.data), data.size); + } + + void insertData(IColumn & column, capnp::Data::Reader data) + { + if (data.size() != expected_value_size) + throw Exception(ErrorCodes::INCORRECT_DATA, "Unexpected size of {} value: {}", data_type->getName(), data.size()); + + column.insertData(reinterpret_cast(data.begin()), data.size()); + } + + DataTypePtr data_type; + }; + + template + class CapnProtoStringSerializer : public ICapnProtoSerializer + { + public: + CapnProtoStringSerializer(const DataTypePtr & data_type, const String & column_name, const capnp::Type & capnp_type) + { + if (!capnp_type.isData() && !capnp_type.isText()) + throwCannotConvert(data_type, column_name, capnp_type); + } + + void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override + { + parent_struct_builder.getBuilderImpl().getPointerField(slot_offset).setBlob(getData(column, row_num)); + } + + void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicList::Builder & parent_struct_builder, UInt32 array_index, size_t row_num) override + { + parent_struct_builder.getBuilderImpl().getPointerElement(array_index).setBlob(getData(column, row_num)); + } + + void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override + { + insertData(column, parent_struct_reader.getReaderImpl().getPointerField(slot_offset).getBlob(nullptr, 0)); + } + + void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override + { + insertData(column, parent_list_reader.getReaderImpl().getPointerElement(array_index).getBlob(nullptr, 0)); + } + + private: + using Reader = typename CapnpType::Reader; + + CapnpType::Reader getData(const ColumnPtr & column, size_t row_num) + { + auto data = column->getDataAt(row_num); + if constexpr (std::is_same_v) + return Reader(reinterpret_cast(data.data), data.size); + else + return Reader(data.data, data.size); + } + + void insertData(IColumn & column, Reader data) + { + column.insertData(reinterpret_cast(data.begin()), data.size()); + } + }; + + template + class CapnProtoFixedStringSerializer : public ICapnProtoSerializer + { + private: + + public: + CapnProtoFixedStringSerializer(const DataTypePtr & data_type, const String & column_name, const capnp::Type & capnp_type_) : capnp_type(capnp_type_) + { + if (!capnp_type.isData() && !capnp_type.isText()) + throwCannotConvert(data_type, column_name, capnp_type); + } + + void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override + { + parent_struct_builder.getBuilderImpl().getPointerField(slot_offset).setBlob(getData(column, row_num)); + } + + void writeRow(const ColumnPtr & column, std::unique_ptr &, capnp::DynamicList::Builder & parent_struct_builder, UInt32 array_index, size_t row_num) override + { + parent_struct_builder.getBuilderImpl().getPointerElement(array_index).setBlob(getData(column, row_num)); + } + + void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override + { + insertData(column, parent_struct_reader.getReaderImpl().getPointerField(slot_offset).getBlob(nullptr, 0)); + } + + void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override + { + insertData(column, parent_list_reader.getReaderImpl().getPointerElement(array_index).getBlob(nullptr, 0)); + } + + private: + using Reader = typename CapnpType::Reader; + + CapnpType::Reader getData(const ColumnPtr & column, size_t row_num) + { + auto data = column->getDataAt(row_num); + if constexpr (std::is_same_v) + { + return Reader(reinterpret_cast(data.data), data.size); + } + else + { + if (data.data[data.size - 1] == 0) + return Reader(data.data, data.size); + + /// In TEXT type data should be null-terminated, but ClickHouse FixedString data could not be. + /// To make data null-terminated we should copy it to temporary String object and use it in capnp::Text::Reader. + /// Note that capnp::Text::Reader works only with pointer to the data and it's size, so we should + /// guarantee that new String object life time is longer than capnp::Text::Reader life time. + tmp_string = data.toString(); + return Reader(tmp_string.data(), tmp_string.size()); + } + } + + void insertData(IColumn & column, Reader data) + { + auto & fixed_string_column = assert_cast(column); + if (data.size() > fixed_string_column.getN()) + throw Exception(ErrorCodes::INCORRECT_DATA, "Cannot read data with size {} to FixedString with size {}", data.size(), fixed_string_column.getN()); + + fixed_string_column.insertData(reinterpret_cast(data.begin()), data.size()); + } + + String tmp_string; + capnp::Type capnp_type; + }; + + std::unique_ptr createSerializer(const DataTypePtr & type, const String & name, const capnp::Type & capnp_type, const FormatSettings::CapnProto & settings); + + class CapnProtoLowCardinalitySerializer : public ICapnProtoSerializer + { + public: + CapnProtoLowCardinalitySerializer(const DataTypePtr & data_type, const String & column_name, const capnp::Type & capnp_type, const FormatSettings::CapnProto & settings) + { + nested_serializer = createSerializer(assert_cast(*data_type).getDictionaryType(), column_name, capnp_type, settings); + } + + void writeRow(const ColumnPtr & column, std::unique_ptr & field_builder, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override + { + writeRowImpl(column, field_builder, parent_struct_builder, slot_offset, row_num); + } + + void writeRow(const ColumnPtr & column, std::unique_ptr & field_builder, capnp::DynamicList::Builder & parent_list_builder, UInt32 array_index, size_t row_num) override + { + writeRowImpl(column, field_builder, parent_list_builder, array_index, row_num); + } + + void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override + { + readRowImpl(column, parent_struct_reader, slot_offset); + } + + void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override + { + readRowImpl(column, parent_list_reader, array_index); + } + + private: + template + void writeRowImpl(const ColumnPtr & column, std::unique_ptr & field_builder, ParentBuilder & parent_builder, UInt32 offset_or_index, size_t row_num) + { + const auto & low_cardinality_column = assert_cast(*column); + size_t index = low_cardinality_column.getIndexAt(row_num); + const auto & dict_column = low_cardinality_column.getDictionary().getNestedColumn(); + nested_serializer->writeRow(dict_column, field_builder, parent_builder, offset_or_index, index); + } + + template + void readRowImpl(IColumn & column, const ParentReader & parent_reader, UInt32 offset_or_index) + { + auto & low_cardinality_column = assert_cast(column); + auto tmp_column = low_cardinality_column.getDictionary().getNestedColumn()->cloneEmpty(); + nested_serializer->readRow(*tmp_column, parent_reader, offset_or_index); + low_cardinality_column.insertFromFullColumn(*tmp_column, 0); + } + + std::unique_ptr nested_serializer; + }; + + class CapnProtoNullableSerializer : public ICapnProtoSerializer + { + public: + CapnProtoNullableSerializer(const DataTypePtr & data_type, const String & column_name, const capnp::Type & capnp_type, const FormatSettings::CapnProto & settings) + { + if (!capnp_type.isStruct()) + throw Exception( + ErrorCodes::CAPN_PROTO_BAD_CAST, + "Cannot convert column \"{}\": Nullable can be represented only as a named union of type Void and nested type, got CapnProto type {}", + column_name, + getCapnProtoFullTypeName(capnp_type)); + + /// Check that struct is a named union of type VOID and one arbitrary type. + struct_schema = capnp_type.asStruct(); + auto node = struct_schema.getProto().getStruct(); + struct_size = capnp::_::StructSize(node.getDataWordCount(), node.getPointerCount()); + discriminant_offset = node.getDiscriminantOffset(); + if (!checkIfStructIsNamedUnion(struct_schema)) + throw Exception( + ErrorCodes::CAPN_PROTO_BAD_CAST, + "Cannot convert column \"{}\": Nullable can be represented only as a named union of type Void and nested type." + "Given CapnProto struct is not a named union: {}", + column_name, + getCapnProtoFullTypeName(capnp_type)); + + auto union_fields = struct_schema.getUnionFields(); + if (union_fields.size() != 2) + throw Exception( + ErrorCodes::CAPN_PROTO_BAD_CAST, + "Cannot convert column \"{}\": Nullable can be represented only as a named union of type Void and nested type." + "Given CapnProto union have more than 2 fields: {}", + column_name, + getCapnProtoFullTypeName(capnp_type)); + + auto first = union_fields[0]; + auto second = union_fields[1]; + auto nested_type = assert_cast(data_type.get())->getNestedType(); + nested_slot_offset = first.getProto().getSlot().getOffset(); /// Both fields have the same offset. + if (first.getType().isVoid()) + { + nested_serializer = createSerializer(nested_type, column_name, second.getType(), settings); + null_discriminant = 0; + nested_discriminant = 1; + } + else if (second.getType().isVoid()) + { + nested_serializer = createSerializer(nested_type, column_name, first.getType(), settings); + null_discriminant = 1; + nested_discriminant = 0; + } + else + throw Exception( + ErrorCodes::CAPN_PROTO_BAD_CAST, + "Cannot convert column \"{}\": Nullable can be represented only as a named union of type Void and nested type." + "Given CapnProto union doesn't have field with type Void: {}", + column_name, + getCapnProtoFullTypeName(capnp_type)); + } + + void writeRow(const ColumnPtr & column, std::unique_ptr & field_builder, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override + { + writeRowImpl(column, field_builder, parent_struct_builder, slot_offset, row_num); + } + + void writeRow(const ColumnPtr & column, std::unique_ptr & field_builder, capnp::DynamicList::Builder & parent_list_builder, UInt32 array_index, size_t row_num) override + { + writeRowImpl(column, field_builder, parent_list_builder, array_index, row_num); + } + + void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override + { + auto struct_reader = capnp::DynamicStruct::Reader(struct_schema, parent_struct_reader.getReaderImpl().getPointerField(slot_offset).getStruct(nullptr)); + readRowImpl(column, struct_reader); + } + + void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override + { + auto struct_reader = capnp::DynamicStruct::Reader(struct_schema, parent_list_reader.getReaderImpl().getStructElement(array_index)); + readRowImpl(column, struct_reader); + } + + private: + template + void writeRowImpl(const ColumnPtr & column, std::unique_ptr & field_builder, ParentBuilder & parent_builder, UInt32 offset_or_index, size_t row_num) + { + if (!field_builder) + field_builder = initStructBuilder(parent_builder, offset_or_index, struct_size, 1, struct_schema); + + auto & struct_builder = assert_cast(*field_builder); + + const auto & nullable_column = assert_cast(*column); + if (nullable_column.isNullAt(row_num)) + { + auto struct_builder_impl = struct_builder.impl.getBuilderImpl(); + struct_builder_impl.setDataField(discriminant_offset, null_discriminant); + struct_builder_impl.setDataField(nested_slot_offset, capnp::Void()); + } + else + { + const auto & nested_column = nullable_column.getNestedColumnPtr(); + struct_builder.impl.getBuilderImpl().setDataField(discriminant_offset, nested_discriminant); + nested_serializer->writeRow(nested_column, struct_builder.field_builders[0], struct_builder.impl, nested_slot_offset, row_num); + } + } + + void readRowImpl(IColumn & column, capnp::DynamicStruct::Reader & struct_reader) + { + auto & nullable_column = assert_cast(column); + auto discriminant = struct_reader.getReaderImpl().getDataField(discriminant_offset); + + if (discriminant == null_discriminant) + nullable_column.insertDefault(); + else + { + auto & nested_column = nullable_column.getNestedColumn(); + nested_serializer->readRow(nested_column, struct_reader, nested_slot_offset); + nullable_column.getNullMapData().push_back(0); + } + } + + + std::unique_ptr nested_serializer; + capnp::StructSchema struct_schema; + capnp::_::StructSize struct_size; + UInt32 discriminant_offset; + UInt16 null_discriminant; + UInt16 nested_discriminant; + UInt32 nested_slot_offset; + }; + + class CapnProtoArraySerializer : public ICapnProtoSerializer + { + public: + CapnProtoArraySerializer(const DataTypePtr & data_type, const String & column_name, const capnp::Type & capnp_type, const FormatSettings::CapnProto & settings) + { + if (!capnp_type.isList()) + throwCannotConvert(data_type, column_name, capnp_type); + + auto nested_type = assert_cast(data_type.get())->getNestedType(); + list_schema = capnp_type.asList(); + auto element_type = list_schema.getElementType(); + element_size = capnp::elementSizeFor(element_type.which()); + if (element_type.isStruct()) + { + element_is_struct = true; + auto node = element_type.asStruct().getProto().getStruct(); + element_struct_size = capnp::_::StructSize(node.getDataWordCount(), node.getPointerCount()); + } + + nested_serializer = createSerializer(nested_type, column_name, capnp_type.asList().getElementType(), settings); + } + + void writeRow(const ColumnPtr & column, std::unique_ptr & field_builder, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override + { + writeRowImpl(column, field_builder, parent_struct_builder, slot_offset, row_num); + } + + void writeRow(const ColumnPtr & column, std::unique_ptr & field_builder, capnp::DynamicList::Builder & parent_list_builder, UInt32 array_index, size_t row_num) override + { + writeRowImpl(column, field_builder, parent_list_builder, array_index, row_num); + } + + void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override + { + auto list_reader = capnp::DynamicList::Reader(list_schema, parent_struct_reader.getReaderImpl().getPointerField(slot_offset).getList(element_size, nullptr)); + readRowImpl(column, list_reader); + } + + void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override + { + auto list_reader = capnp::DynamicList::Reader(list_schema, parent_list_reader.getReaderImpl().getPointerElement(array_index).getList(element_size, nullptr)); + readRowImpl(column, list_reader); + } + + private: + template + void writeRowImpl(const ColumnPtr & column, std::unique_ptr & field_builder, ParentBuilder & parent_builder, UInt32 offset_or_index, size_t row_num) + { + const auto * array_column = assert_cast(column.get()); + const auto & nested_column = array_column->getDataPtr(); + const auto & offsets = array_column->getOffsets(); + auto offset = offsets[row_num - 1]; + UInt32 size = static_cast(offsets[row_num] - offset); + + if (!field_builder) + field_builder = std::make_unique(capnp::DynamicList::Builder(list_schema, initListBuilder(parent_builder, offset_or_index, size)), size); + + auto & list_builder = assert_cast(*field_builder); + for (UInt32 i = 0; i != size; ++i) + nested_serializer->writeRow(nested_column, list_builder.nested_builders[i], list_builder.impl, i, offset + i); + } + + template + capnp::_::ListBuilder initListBuilder(ParentBuilder & parent_builder, UInt32 offset_or_index, UInt32 size) + { + if (element_is_struct) + { + if constexpr (std::is_same_v) + return parent_builder.getBuilderImpl().getPointerField(offset_or_index).initStructList(size, element_struct_size); + else + return parent_builder.getBuilderImpl().getPointerElement(offset_or_index).initStructList(size, element_struct_size); + } + + if constexpr (std::is_same_v) + return parent_builder.getBuilderImpl().getPointerField(offset_or_index).initList(element_size, size); + else + return parent_builder.getBuilderImpl().getPointerElement(offset_or_index).initList(element_size, size); + } + + void readRowImpl(IColumn & column, const capnp::DynamicList::Reader & list_reader) + { + UInt32 size = list_reader.size(); + auto & column_array = assert_cast(column); + auto & offsets = column_array.getOffsets(); + offsets.push_back(offsets.back() + list_reader.size()); + + auto & nested_column = column_array.getData(); + for (UInt32 i = 0; i != size; ++i) + nested_serializer->readRow(nested_column, list_reader, i); + } + + capnp::ListSchema list_schema; + std::unique_ptr nested_serializer; + capnp::ElementSize element_size; + capnp::_::StructSize element_struct_size; + bool element_is_struct = false; + + }; + + class CapnProtoMapSerializer : public ICapnProtoSerializer + { + public: + CapnProtoMapSerializer(const DataTypePtr & data_type, const String & column_name, const capnp::Type & capnp_type, const FormatSettings::CapnProto & settings) + { + /// We output/input Map type as follow CapnProto schema + /// + /// struct Map { + /// struct Entry { + /// key @0: Key; + /// value @1: Value; + /// } + /// entries @0 :List(Entry); + /// } + + if (!capnp_type.isStruct()) + throwCannotConvert(data_type, column_name, capnp_type); + + struct_schema = capnp_type.asStruct(); + auto node = struct_schema.getProto().getStruct(); + struct_size = capnp::_::StructSize(node.getDataWordCount(), node.getPointerCount()); + + if (checkIfStructContainsUnnamedUnion(struct_schema)) + throw Exception( + ErrorCodes::CAPN_PROTO_BAD_CAST, + "Cannot convert ClickHouse column \"{}\" with type {} to CapnProto Struct with unnamed union {}", + column_name, + data_type->getName(), + getCapnProtoFullTypeName(capnp_type)); + + if (struct_schema.getFields().size() != 1) + throw Exception( + ErrorCodes::CAPN_PROTO_BAD_CAST, + "Cannot convert ClickHouse column \"{}\": Map type can be represented as a Struct with one list field, got struct: {}", + column_name, + getCapnProtoFullTypeName(capnp_type)); + + const auto & field_type = struct_schema.getFields()[0].getType(); + if (!field_type.isList()) + throw Exception( + ErrorCodes::CAPN_PROTO_BAD_CAST, + "Cannot convert ClickHouse column \"{}\": Map type can be represented as a Struct with one list field, got field: {}", + column_name, + getCapnProtoFullTypeName(field_type)); + + auto list_element_type = field_type.asList().getElementType(); + if (!list_element_type.isStruct()) + throw Exception( + ErrorCodes::CAPN_PROTO_BAD_CAST, + "Cannot convert ClickHouse column \"{}\": Field of struct that represents Map should be a list of structs, got list of {}", + column_name, + getCapnProtoFullTypeName(list_element_type)); + + auto key_value_struct = list_element_type.asStruct(); + if (checkIfStructContainsUnnamedUnion(key_value_struct)) + throw Exception( + ErrorCodes::CAPN_PROTO_BAD_CAST, + "Cannot convert ClickHouse column \"{}\": struct that represents Map entries is unnamed union: {}", + column_name, + getCapnProtoFullTypeName(list_element_type)); + + if (key_value_struct.getFields().size() != 2) + throw Exception( + ErrorCodes::CAPN_PROTO_BAD_CAST, + "Cannot convert ClickHouse column \"{}\": struct that represents Map entries should contain only 2 fields, got struct {}", + column_name, + getCapnProtoFullTypeName(list_element_type)); + + const auto & map_type = assert_cast(*data_type); + DataTypes types = {map_type.getKeyType(), map_type.getValueType()}; + Names names = {"key", "value"}; + auto entries_type = std::make_shared(std::make_shared(types, names)); + nested_serializer = createSerializer(entries_type, column_name, field_type, settings); + entries_slot_offset = struct_schema.getFields()[0].getProto().getSlot().getOffset(); + } + + void writeRow(const ColumnPtr & column, std::unique_ptr & field_builder, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override + { + writeRowImpl(column, field_builder, parent_struct_builder, slot_offset, row_num); + } + + void writeRow(const ColumnPtr & column, std::unique_ptr & field_builder, capnp::DynamicList::Builder & parent_list_builder, UInt32 array_index, size_t row_num) override + { + writeRowImpl(column, field_builder, parent_list_builder, array_index, row_num); + } + + void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override + { + auto struct_reader = capnp::DynamicStruct::Reader(struct_schema, parent_struct_reader.getReaderImpl().getPointerField(slot_offset).getStruct(nullptr)); + readRowImpl(column, struct_reader); + } + + void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override + { + auto struct_reader = capnp::DynamicStruct::Reader(struct_schema, parent_list_reader.getReaderImpl().getStructElement(array_index)); + readRowImpl(column, struct_reader); + } + + private: + template + void writeRowImpl(const ColumnPtr & column, std::unique_ptr & field_builder, ParentBuilder & parent_builder, UInt32 offset_or_index, size_t row_num) + { + if (!field_builder) + field_builder = initStructBuilder(parent_builder, offset_or_index, struct_size, 1, struct_schema); + + auto & struct_builder = assert_cast(*field_builder); + const auto & entries_column = assert_cast(column.get())->getNestedColumnPtr(); + nested_serializer->writeRow(entries_column, struct_builder.field_builders[0], struct_builder.impl, entries_slot_offset, row_num); + } + + void readRowImpl(IColumn & column, const capnp::DynamicStruct::Reader & struct_reader) + { + auto & entries_column = assert_cast(column).getNestedColumn(); + nested_serializer->readRow(entries_column, struct_reader, entries_slot_offset); + } + + std::unique_ptr nested_serializer; + capnp::StructSchema struct_schema; + capnp::_::StructSize struct_size; + UInt32 entries_slot_offset; + }; + + class CapnProtoStructureSerializer : public ICapnProtoSerializer + { + public: + CapnProtoStructureSerializer(const DataTypes & data_types, const Names & names, const capnp::StructSchema & schema, const FormatSettings::CapnProto & settings) : struct_schema(schema) + { + if (checkIfStructIsNamedUnion(schema) || checkIfStructContainsUnnamedUnion(schema)) + throw Exception(ErrorCodes::CAPN_PROTO_BAD_CAST, "Root CapnProto Struct cannot be named union/struct with unnamed union"); + + initialize(data_types, names, settings); + } + + CapnProtoStructureSerializer(const DataTypePtr & data_type, const String & column_name, const capnp::Type & capnp_type, const FormatSettings::CapnProto & settings) + { + if (!capnp_type.isStruct()) + throwCannotConvert(data_type, column_name, capnp_type); + + struct_schema = capnp_type.asStruct(); + + if (checkIfStructIsNamedUnion(struct_schema) || checkIfStructContainsUnnamedUnion(struct_schema)) + throw Exception( + ErrorCodes::CAPN_PROTO_BAD_CAST, + "Cannot convert ClickHouse column \"{}\" with type {} to CapnProto named union/struct with unnamed union {}", + column_name, + data_type->getName(), + getCapnProtoFullTypeName(capnp_type)); + + const auto * tuple_data_type = assert_cast(data_type.get()); + auto nested_types = tuple_data_type->getElements(); + Names nested_names; + bool have_explicit_names = tuple_data_type->haveExplicitNames(); + auto structure_fields = struct_schema.getFields(); + if (!have_explicit_names) + { + if (nested_types.size() != structure_fields.size()) + throw Exception( + ErrorCodes::CAPN_PROTO_BAD_CAST, + "Cannot convert ClickHouse column \"{}\" with type {} to CapnProto type {}: Tuple and Struct have different sizes {} != {}", + column_name, + data_type->getName(), + getCapnProtoFullTypeName(capnp_type), + nested_types.size(), + structure_fields.size()); + nested_names.reserve(structure_fields.size()); + for (auto field : structure_fields) + nested_names.push_back(field.getProto().getName()); + } + else + { + nested_names = tuple_data_type->getElementNames(); + } + + try + { + initialize(nested_types, nested_names, settings); + } + catch (Exception & e) + { + e.addMessage("(while converting column {})", column_name); + throw std::move(e); + } + } + + void writeRow(const ColumnPtr & column, std::unique_ptr & field_builder, capnp::DynamicStruct::Builder & parent_struct_builder, UInt32 slot_offset, size_t row_num) override + { + writeRowImpl(column, field_builder, parent_struct_builder, slot_offset, row_num); + } + + void writeRow(const ColumnPtr & column, std::unique_ptr & field_builder, capnp::DynamicList::Builder & parent_list_builder, UInt32 array_index, size_t row_num) override + { + writeRowImpl(column, field_builder, parent_list_builder, array_index, row_num); + } + + /// Method for writing root struct. + void writeRow(const Columns & columns, StructBuilder & struct_builder, size_t row_num) + { + for (size_t i = 0; i != columns.size(); ++i) + fields_serializers[i]->writeRow(columns[i], struct_builder.field_builders[fields_indexes[i]], struct_builder.impl, fields_offsets[i], row_num); + } + + void readRow(IColumn & column, const capnp::DynamicStruct::Reader & parent_struct_reader, UInt32 slot_offset) override + { + auto struct_reader = capnp::DynamicStruct::Reader(struct_schema, parent_struct_reader.getReaderImpl().getPointerField(slot_offset).getStruct(nullptr)); + readRowImpl(column, struct_reader); + } + + void readRow(IColumn & column, const capnp::DynamicList::Reader & parent_list_reader, UInt32 array_index) override + { + auto struct_reader = capnp::DynamicStruct::Reader(struct_schema, parent_list_reader.getReaderImpl().getStructElement(array_index)); + readRowImpl(column, struct_reader); + } + + /// Method for reading from root struct. + void readRow(MutableColumns & columns, const capnp::DynamicStruct::Reader & reader) + { + for (size_t i = 0; i != columns.size(); ++i) + fields_serializers[i]->readRow(*columns[i], reader, fields_offsets[i]); + } + + private: + void initialize(const DataTypes & data_types, const Names & names, const FormatSettings::CapnProto & settings) + { + auto node = struct_schema.getProto().getStruct(); + struct_size = capnp::_::StructSize(node.getDataWordCount(), node.getPointerCount()); + fields_count = struct_schema.getFields().size(); + fields_serializers.reserve(data_types.size()); + fields_offsets.reserve(data_types.size()); + fields_indexes.reserve(data_types.size()); + for (size_t i = 0; i != data_types.size(); ++i) + { + auto [field_name, _] = splitFieldName(names[i]); + auto field = findFieldByName(struct_schema, field_name); + if (!field) + throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "Capnproto schema doesn't contain field with name {}", field_name); + + auto capnp_type = field->getType(); + fields_serializers.push_back(createSerializer(data_types[i], names[i], capnp_type, settings)); + fields_offsets.push_back(field->getProto().getSlot().getOffset()); + fields_indexes.push_back(field->getIndex()); + } + } + + template + void writeRowImpl(const ColumnPtr & column, std::unique_ptr & field_builder, ParentBuilder & parent_builder, UInt32 offset_or_index, size_t row_num) + { + if (!field_builder) + field_builder = initStructBuilder(parent_builder, offset_or_index, struct_size, fields_count, struct_schema); + + auto & struct_builder = assert_cast(*field_builder); + if (const auto * tuple_column = typeid_cast(column.get())) + { + const auto & columns = tuple_column->getColumns(); + for (size_t i = 0; i != columns.size(); ++i) + fields_serializers[i]->writeRow(columns[i], struct_builder.field_builders[fields_indexes[i]], struct_builder.impl, fields_offsets[i], row_num); + } + else + { + fields_serializers[0]->writeRow(column, struct_builder.field_builders[fields_indexes[0]], struct_builder.impl, fields_offsets[0], row_num); + } + } + + void readRowImpl(IColumn & column, const capnp::DynamicStruct::Reader & struct_reader) + { + if (auto * tuple_column = typeid_cast(&column)) + { + for (size_t i = 0; i != tuple_column->tupleSize(); ++i) + fields_serializers[i]->readRow(tuple_column->getColumn(i), struct_reader, fields_offsets[i]); + } + else + fields_serializers[0]->readRow(column, struct_reader, fields_offsets[0]); + } + + capnp::StructSchema struct_schema; + capnp::_::StructSize struct_size; + size_t fields_count; + std::vector> fields_serializers; + std::vector fields_offsets; + std::vector fields_indexes; + + }; + + std::unique_ptr createSerializer(const DataTypePtr & type, const String & name, const capnp::Type & capnp_type, const FormatSettings::CapnProto & settings) + { + auto [field_name, nested_name] = splitFieldName(name); + if (!nested_name.empty() && !capnp_type.isList()) + { + if (!capnp_type.isStruct()) + throw Exception(ErrorCodes::CAPN_PROTO_BAD_CAST, "Field {} is not a struct", field_name); + + return std::make_unique(DataTypes{type}, Names{nested_name}, capnp_type.asStruct(), settings); + } + + switch (type->getTypeId()) + { + case TypeIndex::Int8: + return createIntegerSerializer(type, name, capnp_type); + case TypeIndex::UInt8: + if (isBool(type)) + return createIntegerSerializer(type, name, capnp_type); + return createIntegerSerializer(type, name, capnp_type); + case TypeIndex::Int16: + return createIntegerSerializer(type, name, capnp_type); + case TypeIndex::UInt16: + return createIntegerSerializer(type, name, capnp_type); + case TypeIndex::Int32: + return createIntegerSerializer(type, name, capnp_type); + case TypeIndex::UInt32: + return createIntegerSerializer(type, name, capnp_type); + case TypeIndex::Int64: + return createIntegerSerializer(type, name, capnp_type); + case TypeIndex::UInt64: + return createIntegerSerializer(type, name, capnp_type); + case TypeIndex::Int128: + return std::make_unique>(type, name, capnp_type); + case TypeIndex::UInt128: + return std::make_unique>(type, name, capnp_type); + case TypeIndex::Int256: + return std::make_unique>(type, name, capnp_type); + case TypeIndex::UInt256: + return std::make_unique>(type, name, capnp_type); + case TypeIndex::Float32: + return createFloatSerializer(type, name, capnp_type); + case TypeIndex::Float64: + return createFloatSerializer(type, name, capnp_type); + case TypeIndex::Date: + return std::make_unique(type, name, capnp_type); + case TypeIndex::Date32: + return std::make_unique(type, name, capnp_type); + case TypeIndex::DateTime: + return std::make_unique(type, name, capnp_type); + case TypeIndex::DateTime64: + return std::make_unique(type, name, capnp_type); + case TypeIndex::Decimal32: + return std::make_unique>(type, name, capnp_type); + case TypeIndex::Decimal64: + return std::make_unique>(type, name, capnp_type); + case TypeIndex::Decimal128: + return std::make_unique>(type, name, capnp_type); + case TypeIndex::Decimal256: + return std::make_unique>(type, name, capnp_type); + case TypeIndex::IPv4: + return std::make_unique(type, name, capnp_type); + case TypeIndex::IPv6: + return std::make_unique>(type, name, capnp_type); + case TypeIndex::UUID: + return std::make_unique>(type, name, capnp_type); + case TypeIndex::Enum8: + return std::make_unique>(type, name, capnp_type, settings.enum_comparing_mode); + case TypeIndex::Enum16: + return std::make_unique>(type, name, capnp_type, settings.enum_comparing_mode); + case TypeIndex::String: + if (capnp_type.isData()) + return std::make_unique>(type, name, capnp_type); + return std::make_unique>(type, name, capnp_type); + case TypeIndex::FixedString: + if (capnp_type.isData()) + return std::make_unique>(type, name, capnp_type); + return std::make_unique>(type, name, capnp_type); + case TypeIndex::LowCardinality: + return std::make_unique(type, name, capnp_type, settings); + case TypeIndex::Nullable: + return std::make_unique(type, name, capnp_type, settings); + case TypeIndex::Array: + return std::make_unique(type, name, capnp_type, settings); + case TypeIndex::Map: + return std::make_unique(type, name, capnp_type, settings); + case TypeIndex::Tuple: + return std::make_unique(type, name, capnp_type, settings); + default: + throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Type {} is not supported in CapnProto format", type->getName()); + } + } +} + +class CapnProtoSerializer::Impl +{ +public: + Impl(const DataTypes & data_types, const Names & names, const capnp::StructSchema & schema, const FormatSettings::CapnProto & settings) + : struct_serializer(std::make_unique(data_types, names, schema, settings)) + , fields_size(schema.getFields().size()) + { + } + + void writeRow(const Columns & columns, capnp::DynamicStruct::Builder builder, size_t row_num) + { + StructBuilder struct_builder(std::move(builder), fields_size); + struct_serializer->writeRow(columns, struct_builder, row_num); + } + + void readRow(MutableColumns & columns, capnp::DynamicStruct::Reader & reader) + { + struct_serializer->readRow(columns, reader); + } + +private: + std::unique_ptr struct_serializer; + size_t fields_size; +}; + +CapnProtoSerializer::CapnProtoSerializer(const DataTypes & data_types, const Names & names, const capnp::StructSchema & schema, const FormatSettings::CapnProto & settings) + : serializer_impl(std::make_unique(data_types, names, schema, settings)) +{ +} + +void CapnProtoSerializer::writeRow(const Columns & columns, capnp::DynamicStruct::Builder builder, size_t row_num) +{ + serializer_impl->writeRow(columns, std::move(builder), row_num); +} + +void CapnProtoSerializer::readRow(MutableColumns & columns, capnp::DynamicStruct::Reader & reader) +{ + serializer_impl->readRow(columns, reader); +} + +CapnProtoSerializer::~CapnProtoSerializer() = default; + +} + +#endif diff --git a/src/Formats/CapnProtoSerializer.h b/src/Formats/CapnProtoSerializer.h new file mode 100644 index 00000000000..5bdd1a0e554 --- /dev/null +++ b/src/Formats/CapnProtoSerializer.h @@ -0,0 +1,30 @@ +#pragma once + +#if USE_CAPNP + +#include +#include +#include + +namespace DB +{ + +class CapnProtoSerializer +{ +public: + CapnProtoSerializer(const DataTypes & data_types, const Names & names, const capnp::StructSchema & schema, const FormatSettings::CapnProto & settings); + + void writeRow(const Columns & columns, capnp::DynamicStruct::Builder builder, size_t row_num); + + void readRow(MutableColumns & columns, capnp::DynamicStruct::Reader & reader); + + ~CapnProtoSerializer(); + +private: + class Impl; + std::unique_ptr serializer_impl; +}; + +} + +#endif diff --git a/src/Formats/CapnProtoUtils.cpp b/src/Formats/CapnProtoUtils.cpp deleted file mode 100644 index d6c032408bb..00000000000 --- a/src/Formats/CapnProtoUtils.cpp +++ /dev/null @@ -1,734 +0,0 @@ -#include - -#if USE_CAPNP - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace DB -{ - -namespace ErrorCodes -{ - extern const int CANNOT_PARSE_CAPN_PROTO_SCHEMA; - extern const int THERE_IS_NO_COLUMN; - extern const int BAD_TYPE_OF_FIELD; - extern const int CAPN_PROTO_BAD_CAST; - extern const int FILE_DOESNT_EXIST; - extern const int UNKNOWN_EXCEPTION; - extern const int INCORRECT_DATA; - extern const int CAPN_PROTO_BAD_TYPE; - extern const int BAD_ARGUMENTS; -} - -std::pair splitCapnProtoFieldName(const String & name) -{ - const auto * begin = name.data(); - const auto * end = name.data() + name.size(); - const auto * it = find_first_symbols<'_', '.'>(begin, end); - String first = String(begin, it); - String second = it == end ? "" : String(it + 1, end); - return {first, second}; -} - -capnp::StructSchema CapnProtoSchemaParser::getMessageSchema(const FormatSchemaInfo & schema_info) -{ - capnp::ParsedSchema schema; - try - { - int fd; - KJ_SYSCALL(fd = open(schema_info.schemaDirectory().data(), O_RDONLY)); // NOLINT(bugprone-suspicious-semicolon) - auto schema_dir = kj::newDiskDirectory(kj::OsFileHandle(fd)); - schema = impl.parseFromDirectory(*schema_dir, kj::Path::parse(schema_info.schemaPath()), {}); - } - catch (const kj::Exception & e) - { - /// That's not good to determine the type of error by its description, but - /// this is the only way to do it here, because kj doesn't specify the type of error. - auto description = std::string_view(e.getDescription().cStr()); - if (description.find("No such file or directory") != String::npos || description.find("no such directory") != String::npos) - throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Cannot open CapnProto schema, file {} doesn't exists", schema_info.absoluteSchemaPath()); - - if (description.find("Parse error") != String::npos) - throw Exception(ErrorCodes::CANNOT_PARSE_CAPN_PROTO_SCHEMA, "Cannot parse CapnProto schema {}:{}", schema_info.schemaPath(), e.getLine()); - - throw Exception(ErrorCodes::UNKNOWN_EXCEPTION, - "Unknown exception while parsing CapnProto schema: {}, schema dir and file: {}, {}", - description, schema_info.schemaDirectory(), schema_info.schemaPath()); - } - - auto message_maybe = schema.findNested(schema_info.messageName()); - auto * message_schema = kj::_::readMaybe(message_maybe); - if (!message_schema) - throw Exception(ErrorCodes::CANNOT_PARSE_CAPN_PROTO_SCHEMA, - "CapnProto schema doesn't contain message with name {}", schema_info.messageName()); - return message_schema->asStruct(); -} - -bool compareEnumNames(const String & first, const String & second, FormatSettings::EnumComparingMode mode) -{ - if (mode == FormatSettings::EnumComparingMode::BY_NAMES_CASE_INSENSITIVE) - return boost::algorithm::to_lower_copy(first) == boost::algorithm::to_lower_copy(second); - return first == second; -} - -static const std::map capnp_simple_type_names = -{ - {capnp::schema::Type::Which::BOOL, "Bool"}, - {capnp::schema::Type::Which::VOID, "Void"}, - {capnp::schema::Type::Which::INT8, "Int8"}, - {capnp::schema::Type::Which::INT16, "Int16"}, - {capnp::schema::Type::Which::INT32, "Int32"}, - {capnp::schema::Type::Which::INT64, "Int64"}, - {capnp::schema::Type::Which::UINT8, "UInt8"}, - {capnp::schema::Type::Which::UINT16, "UInt16"}, - {capnp::schema::Type::Which::UINT32, "UInt32"}, - {capnp::schema::Type::Which::UINT64, "UInt64"}, - {capnp::schema::Type::Which::FLOAT32, "Float32"}, - {capnp::schema::Type::Which::FLOAT64, "Float64"}, - {capnp::schema::Type::Which::TEXT, "Text"}, - {capnp::schema::Type::Which::DATA, "Data"}, - {capnp::schema::Type::Which::INTERFACE, "Interface"}, - {capnp::schema::Type::Which::ANY_POINTER, "AnyPointer"}, -}; - -static bool checkIfStructContainsUnnamedUnion(const capnp::StructSchema & struct_schema) -{ - return struct_schema.getFields().size() != struct_schema.getNonUnionFields().size(); -} - -static bool checkIfStructIsNamedUnion(const capnp::StructSchema & struct_schema) -{ - return struct_schema.getFields().size() == struct_schema.getUnionFields().size(); -} - -/// Get full name of type for better exception messages. -static String getCapnProtoFullTypeName(const capnp::Type & type) -{ - switch (type.which()) - { - case capnp::schema::Type::Which::STRUCT: - { - auto struct_schema = type.asStruct(); - - auto non_union_fields = struct_schema.getNonUnionFields(); - std::vector non_union_field_names; - for (auto nested_field : non_union_fields) - non_union_field_names.push_back(String(nested_field.getProto().getName()) + " " + getCapnProtoFullTypeName(nested_field.getType())); - - auto union_fields = struct_schema.getUnionFields(); - std::vector union_field_names; - for (auto nested_field : union_fields) - union_field_names.push_back(String(nested_field.getProto().getName()) + " " + getCapnProtoFullTypeName(nested_field.getType())); - - String union_name = "Union(" + boost::algorithm::join(union_field_names, ", ") + ")"; - /// Check if the struct is a named union. - if (non_union_field_names.empty()) - return union_name; - - String type_name = "Struct(" + boost::algorithm::join(non_union_field_names, ", "); - /// Check if the struct contains unnamed union. - if (!union_field_names.empty()) - type_name += ", " + union_name; - type_name += ")"; - return type_name; - } - case capnp::schema::Type::Which::LIST: - return "List(" + getCapnProtoFullTypeName(type.asList().getElementType()) + ")"; - case capnp::schema::Type::Which::ENUM: - { - auto enum_schema = type.asEnum(); - String enum_name = "Enum("; - auto enumerants = enum_schema.getEnumerants(); - for (unsigned i = 0; i != enumerants.size(); ++i) - { - enum_name += String(enumerants[i].getProto().getName()) + " = " + std::to_string(enumerants[i].getOrdinal()); - if (i + 1 != enumerants.size()) - enum_name += ", "; - } - enum_name += ")"; - return enum_name; - } - default: - auto it = capnp_simple_type_names.find(type.which()); - if (it == capnp_simple_type_names.end()) - throw Exception(ErrorCodes::BAD_TYPE_OF_FIELD, "Unknown CapnProto type"); - return it->second; - } -} - -template -static bool checkEnums(const capnp::Type & capnp_type, const DataTypePtr column_type, FormatSettings::EnumComparingMode mode, UInt64 max_value, String & error_message) -{ - if (!capnp_type.isEnum()) - return false; - - auto enum_schema = capnp_type.asEnum(); - bool to_lower = mode == FormatSettings::EnumComparingMode::BY_NAMES_CASE_INSENSITIVE; - const auto * enum_type = assert_cast *>(column_type.get()); - const auto & enum_values = dynamic_cast &>(*enum_type); - - auto enumerants = enum_schema.getEnumerants(); - if (mode == FormatSettings::EnumComparingMode::BY_VALUES) - { - /// In CapnProto Enum fields are numbered sequentially starting from zero. - if (enumerants.size() > max_value) - { - error_message += "Enum from CapnProto schema contains values that is out of range for Clickhouse Enum"; - return false; - } - - auto values = enum_values.getSetOfAllValues(); - std::unordered_set capn_enum_values; - for (auto enumerant : enumerants) - capn_enum_values.insert(Type(enumerant.getOrdinal())); - auto result = values == capn_enum_values; - if (!result) - error_message += "The set of values in Enum from CapnProto schema is different from the set of values in ClickHouse Enum"; - return result; - } - - auto names = enum_values.getSetOfAllNames(to_lower); - std::unordered_set capn_enum_names; - - for (auto enumerant : enumerants) - { - String name = enumerant.getProto().getName(); - capn_enum_names.insert(to_lower ? boost::algorithm::to_lower_copy(name) : name); - } - - auto result = names == capn_enum_names; - if (!result) - error_message += "The set of names in Enum from CapnProto schema is different from the set of names in ClickHouse Enum"; - return result; -} - -static bool checkCapnProtoType(const capnp::Type & capnp_type, const DataTypePtr & data_type, FormatSettings::EnumComparingMode mode, String & error_message, const String & column_name); - -static bool checkNullableType(const capnp::Type & capnp_type, const DataTypePtr & data_type, FormatSettings::EnumComparingMode mode, String & error_message, const String & column_name) -{ - if (!capnp_type.isStruct()) - return false; - - /// Check that struct is a named union of type VOID and one arbitrary type. - auto struct_schema = capnp_type.asStruct(); - if (!checkIfStructIsNamedUnion(struct_schema)) - return false; - - auto union_fields = struct_schema.getUnionFields(); - if (union_fields.size() != 2) - return false; - - auto first = union_fields[0]; - auto second = union_fields[1]; - - auto nested_type = assert_cast(data_type.get())->getNestedType(); - if (first.getType().isVoid()) - return checkCapnProtoType(second.getType(), nested_type, mode, error_message, column_name); - if (second.getType().isVoid()) - return checkCapnProtoType(first.getType(), nested_type, mode, error_message, column_name); - return false; -} - -static bool checkTupleType(const capnp::Type & capnp_type, const DataTypePtr & data_type, FormatSettings::EnumComparingMode mode, String & error_message) -{ - if (!capnp_type.isStruct()) - return false; - auto struct_schema = capnp_type.asStruct(); - - if (checkIfStructIsNamedUnion(struct_schema)) - return false; - - if (checkIfStructContainsUnnamedUnion(struct_schema)) - { - error_message += "CapnProto struct contains unnamed union"; - return false; - } - - const auto * tuple_data_type = assert_cast(data_type.get()); - auto nested_types = tuple_data_type->getElements(); - if (nested_types.size() != struct_schema.getFields().size()) - { - error_message += "Tuple and Struct types have different sizes"; - return false; - } - - bool have_explicit_names = tuple_data_type->haveExplicitNames(); - const auto & nested_names = tuple_data_type->getElementNames(); - for (uint32_t i = 0; i != nested_names.size(); ++i) - { - if (have_explicit_names) - { - KJ_IF_MAYBE (field, struct_schema.findFieldByName(nested_names[i])) - { - if (!checkCapnProtoType(field->getType(), nested_types[tuple_data_type->getPositionByName(nested_names[i])], mode, error_message, nested_names[i])) - return false; - } - else - { - error_message += "CapnProto struct doesn't contain a field with name " + nested_names[i]; - return false; - } - } - else if (!checkCapnProtoType(struct_schema.getFields()[i].getType(), nested_types[tuple_data_type->getPositionByName(nested_names[i])], mode, error_message, nested_names[i])) - return false; - } - - return true; -} - -static bool checkArrayType(const capnp::Type & capnp_type, const DataTypePtr & data_type, FormatSettings::EnumComparingMode mode, String & error_message, const String & column_name) -{ - if (!capnp_type.isList()) - return false; - auto list_schema = capnp_type.asList(); - auto nested_type = assert_cast(data_type.get())->getNestedType(); - - auto [field_name, nested_name] = splitCapnProtoFieldName(column_name); - if (!nested_name.empty() && list_schema.getElementType().isStruct()) - { - auto struct_schema = list_schema.getElementType().asStruct(); - KJ_IF_MAYBE(field, struct_schema.findFieldByName(nested_name)) - return checkCapnProtoType(field->getType(), nested_type, mode, error_message, nested_name); - - error_message += "Element type of List {} doesn't contain field with name " + nested_name; - return false; - } - - return checkCapnProtoType(list_schema.getElementType(), nested_type, mode, error_message, column_name); -} - -static bool checkMapType(const capnp::Type & capnp_type, const DataTypePtr & data_type, FormatSettings::EnumComparingMode mode, String & error_message) -{ - /// We output/input Map type as follow CapnProto schema - /// - /// struct Map { - /// struct Entry { - /// key @0: Key; - /// value @1: Value; - /// } - /// entries @0 :List(Entry); - /// } - - if (!capnp_type.isStruct()) - return false; - auto struct_schema = capnp_type.asStruct(); - - if (checkIfStructContainsUnnamedUnion(struct_schema)) - { - error_message += "CapnProto struct contains unnamed union"; - return false; - } - - if (struct_schema.getFields().size() != 1) - { - error_message += "CapnProto struct that represents Map type can contain only one field"; - return false; - } - - const auto & field_type = struct_schema.getFields()[0].getType(); - if (!field_type.isList()) - { - error_message += "Field of CapnProto struct that represents Map is not a list"; - return false; - } - - auto list_element_type = field_type.asList().getElementType(); - if (!list_element_type.isStruct()) - { - error_message += "Field of CapnProto struct that represents Map is not a list of structs"; - return false; - } - - auto key_value_struct = list_element_type.asStruct(); - if (checkIfStructContainsUnnamedUnion(key_value_struct)) - { - error_message += "CapnProto struct contains unnamed union"; - return false; - } - - if (key_value_struct.getFields().size() != 2) - { - error_message += "Key-value structure for Map struct should have exactly 2 fields"; - return false; - } - - const auto & map_type = assert_cast(*data_type); - DataTypes types = {map_type.getKeyType(), map_type.getValueType()}; - Names names = {"key", "value"}; - - for (size_t i = 0; i != types.size(); ++i) - { - KJ_IF_MAYBE(field, key_value_struct.findFieldByName(names[i])) - { - if (!checkCapnProtoType(field->getType(), types[i], mode, error_message, names[i])) - return false; - } - else - { - error_message += R"(Key-value structure for Map struct should have exactly 2 fields with names "key" and "value")"; - return false; - } - } - - return true; -} - -static bool isCapnInteger(const capnp::Type & capnp_type) -{ - return capnp_type.isInt8() || capnp_type.isUInt8() || capnp_type.isInt16() || capnp_type.isUInt16() || capnp_type.isInt32() - || capnp_type.isUInt32() || capnp_type.isInt64() || capnp_type.isUInt64(); -} - -static bool checkCapnProtoType(const capnp::Type & capnp_type, const DataTypePtr & data_type, FormatSettings::EnumComparingMode mode, String & error_message, const String & column_name) -{ - switch (data_type->getTypeId()) - { - case TypeIndex::UInt8: - return capnp_type.isBool() || isCapnInteger(capnp_type); - case TypeIndex::Int8: [[fallthrough]]; - case TypeIndex::Int16: [[fallthrough]]; - case TypeIndex::UInt16: [[fallthrough]]; - case TypeIndex::Int32: [[fallthrough]]; - case TypeIndex::UInt32: [[fallthrough]]; - case TypeIndex::Int64: [[fallthrough]]; - case TypeIndex::UInt64: - /// Allow integer conversions durin input/output. - return isCapnInteger(capnp_type); - case TypeIndex::Date: - return capnp_type.isUInt16(); - case TypeIndex::DateTime: [[fallthrough]]; - case TypeIndex::IPv4: - return capnp_type.isUInt32(); - case TypeIndex::Date32: [[fallthrough]]; - case TypeIndex::Decimal32: - return capnp_type.isInt32() || capnp_type.isUInt32(); - case TypeIndex::DateTime64: [[fallthrough]]; - case TypeIndex::Decimal64: - return capnp_type.isInt64() || capnp_type.isUInt64(); - case TypeIndex::Float32:[[fallthrough]]; - case TypeIndex::Float64: - /// Allow converting between Float32 and isFloat64 - return capnp_type.isFloat32() || capnp_type.isFloat64(); - case TypeIndex::Enum8: - return checkEnums(capnp_type, data_type, mode, INT8_MAX, error_message); - case TypeIndex::Enum16: - return checkEnums(capnp_type, data_type, mode, INT16_MAX, error_message); - case TypeIndex::Int128: [[fallthrough]]; - case TypeIndex::UInt128: [[fallthrough]]; - case TypeIndex::Int256: [[fallthrough]]; - case TypeIndex::UInt256: [[fallthrough]]; - case TypeIndex::Decimal128: [[fallthrough]]; - case TypeIndex::Decimal256: - return capnp_type.isData(); - case TypeIndex::Tuple: - return checkTupleType(capnp_type, data_type, mode, error_message); - case TypeIndex::Nullable: - { - auto result = checkNullableType(capnp_type, data_type, mode, error_message, column_name); - if (!result) - error_message += "Nullable can be represented only as a named union of type Void and nested type"; - return result; - } - case TypeIndex::Array: - return checkArrayType(capnp_type, data_type, mode, error_message, column_name); - case TypeIndex::LowCardinality: - return checkCapnProtoType(capnp_type, assert_cast(data_type.get())->getDictionaryType(), mode, error_message, column_name); - case TypeIndex::FixedString: [[fallthrough]]; - case TypeIndex::IPv6: [[fallthrough]]; - case TypeIndex::String: - return capnp_type.isText() || capnp_type.isData(); - case TypeIndex::Map: - return checkMapType(capnp_type, data_type, mode, error_message); - default: - return false; - } -} - -capnp::DynamicValue::Reader getReaderByColumnName(const capnp::DynamicStruct::Reader & struct_reader, const String & name) -{ - auto [field_name, nested_name] = splitCapnProtoFieldName(name); - KJ_IF_MAYBE(field, struct_reader.getSchema().findFieldByName(field_name)) - { - capnp::DynamicValue::Reader field_reader; - try - { - field_reader = struct_reader.get(*field); - } - catch (const kj::Exception & e) - { - throw Exception(ErrorCodes::INCORRECT_DATA, - "Cannot extract field value from struct by provided schema, error: " - "{} Perhaps the data was generated by another schema", String(e.getDescription().cStr())); - } - - if (nested_name.empty()) - return field_reader; - - /// Support reading Nested as List of Structs. - if (field_reader.getType() == capnp::DynamicValue::LIST) - { - auto list_schema = field->getType().asList(); - if (!list_schema.getElementType().isStruct()) - throw Exception(ErrorCodes::CAPN_PROTO_BAD_CAST, "Element type of List {} is not a struct", field_name); - - auto struct_schema = list_schema.getElementType().asStruct(); - KJ_IF_MAYBE(nested_field, struct_schema.findFieldByName(nested_name)) - return field_reader; - - throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "Element type of List {} doesn't contain field with name \"{}\"", field_name, nested_name); - } - - if (field_reader.getType() != capnp::DynamicValue::STRUCT) - throw Exception(ErrorCodes::CAPN_PROTO_BAD_CAST, "Field {} is not a struct", field_name); - - return getReaderByColumnName(field_reader.as(), nested_name); - } - - throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "Capnproto struct doesn't contain field with name {}", field_name); -} - -std::pair getStructBuilderAndFieldByColumnName(capnp::DynamicStruct::Builder struct_builder, const String & name) -{ - auto [field_name, nested_name] = splitCapnProtoFieldName(name); - KJ_IF_MAYBE(field, struct_builder.getSchema().findFieldByName(field_name)) - { - if (nested_name.empty()) - return {struct_builder, *field}; - - auto field_builder = struct_builder.get(*field); - - /// Support reading Nested as List of Structs. - if (field_builder.getType() == capnp::DynamicValue::LIST) - { - auto list_schema = field->getType().asList(); - if (!list_schema.getElementType().isStruct()) - throw Exception(ErrorCodes::CAPN_PROTO_BAD_CAST, "Element type of List {} is not a struct", field_name); - - auto struct_schema = list_schema.getElementType().asStruct(); - KJ_IF_MAYBE(nested_field, struct_schema.findFieldByName(nested_name)) - return {struct_builder, *field}; - - throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "Element type of List {} doesn't contain field with name \"{}\"", field_name, nested_name); - } - - if (field_builder.getType() != capnp::DynamicValue::STRUCT) - throw Exception(ErrorCodes::CAPN_PROTO_BAD_CAST, "Field {} is not a struct", field_name); - - return getStructBuilderAndFieldByColumnName(field_builder.as(), nested_name); - } - - throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "Capnproto struct doesn't contain field with name {}", field_name); -} - -static std::pair getFieldByName(const capnp::StructSchema & schema, const String & name) -{ - auto [field_name, nested_name] = splitCapnProtoFieldName(name); - KJ_IF_MAYBE(field, schema.findFieldByName(field_name)) - { - if (nested_name.empty()) - return {*field, name}; - - /// Support reading Nested as List of Structs. - if (field->getType().isList()) - { - auto list_schema = field->getType().asList(); - if (!list_schema.getElementType().isStruct()) - throw Exception(ErrorCodes::CAPN_PROTO_BAD_CAST, "Element type of List {} is not a struct", field_name); - - auto struct_schema = list_schema.getElementType().asStruct(); - KJ_IF_MAYBE(nested_field, struct_schema.findFieldByName(nested_name)) - return {*field, name}; - - throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "Element type of List {} doesn't contain field with name \"{}\"", field_name, nested_name); - } - - if (!field->getType().isStruct()) - throw Exception(ErrorCodes::CAPN_PROTO_BAD_CAST, "Field {} is not a struct", field_name); - - return getFieldByName(field->getType().asStruct(), nested_name); - } - - throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "Capnproto schema doesn't contain field with name {}", field_name); -} - -void checkCapnProtoSchemaStructure(const capnp::StructSchema & schema, const Block & header, FormatSettings::EnumComparingMode mode) -{ - /// Firstly check that struct doesn't contain unnamed union, because we don't support it. - if (checkIfStructContainsUnnamedUnion(schema)) - throw Exception(ErrorCodes::CAPN_PROTO_BAD_CAST, "Schema contains unnamed union that is not supported"); - auto names_and_types = header.getNamesAndTypesList(); - String additional_error_message; - for (auto & [name, type] : names_and_types) - { - auto [field, field_name] = getFieldByName(schema, name); - if (!checkCapnProtoType(field.getType(), type, mode, additional_error_message, field_name)) - { - auto e = Exception( - ErrorCodes::CAPN_PROTO_BAD_CAST, - "Cannot convert ClickHouse type {} to CapnProto type {}", - type->getName(), - getCapnProtoFullTypeName(field.getType())); - if (!additional_error_message.empty()) - e.addMessage(additional_error_message); - throw std::move(e); - } - } -} - -template -static DataTypePtr getEnumDataTypeFromEnumerants(const capnp::EnumSchema::EnumerantList & enumerants) -{ - std::vector> values; - for (auto enumerant : enumerants) - values.emplace_back(enumerant.getProto().getName(), ValueType(enumerant.getOrdinal())); - return std::make_shared>(std::move(values)); -} - -static DataTypePtr getEnumDataTypeFromEnumSchema(const capnp::EnumSchema & enum_schema) -{ - auto enumerants = enum_schema.getEnumerants(); - if (enumerants.size() < 128) - return getEnumDataTypeFromEnumerants(enumerants); - if (enumerants.size() < 32768) - return getEnumDataTypeFromEnumerants(enumerants); - - throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "ClickHouse supports only 8 and 16-bit Enums"); -} - -static DataTypePtr getDataTypeFromCapnProtoType(const capnp::Type & capnp_type, bool skip_unsupported_fields) -{ - switch (capnp_type.which()) - { - case capnp::schema::Type::INT8: - return std::make_shared(); - case capnp::schema::Type::INT16: - return std::make_shared(); - case capnp::schema::Type::INT32: - return std::make_shared(); - case capnp::schema::Type::INT64: - return std::make_shared(); - case capnp::schema::Type::BOOL: [[fallthrough]]; - case capnp::schema::Type::UINT8: - return std::make_shared(); - case capnp::schema::Type::UINT16: - return std::make_shared(); - case capnp::schema::Type::UINT32: - return std::make_shared(); - case capnp::schema::Type::UINT64: - return std::make_shared(); - case capnp::schema::Type::FLOAT32: - return std::make_shared(); - case capnp::schema::Type::FLOAT64: - return std::make_shared(); - case capnp::schema::Type::DATA: [[fallthrough]]; - case capnp::schema::Type::TEXT: - return std::make_shared(); - case capnp::schema::Type::ENUM: - return getEnumDataTypeFromEnumSchema(capnp_type.asEnum()); - case capnp::schema::Type::LIST: - { - auto list_schema = capnp_type.asList(); - auto nested_type = getDataTypeFromCapnProtoType(list_schema.getElementType(), skip_unsupported_fields); - if (!nested_type) - return nullptr; - return std::make_shared(nested_type); - } - case capnp::schema::Type::STRUCT: - { - auto struct_schema = capnp_type.asStruct(); - - - if (struct_schema.getFields().size() == 0) - { - if (skip_unsupported_fields) - return nullptr; - throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Empty messages are not supported"); - } - - /// Check if it can be Nullable. - if (checkIfStructIsNamedUnion(struct_schema)) - { - auto fields = struct_schema.getUnionFields(); - if (fields.size() != 2 || (!fields[0].getType().isVoid() && !fields[1].getType().isVoid())) - { - if (skip_unsupported_fields) - return nullptr; - throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Unions are not supported"); - } - auto value_type = fields[0].getType().isVoid() ? fields[1].getType() : fields[0].getType(); - if (value_type.isStruct() || value_type.isList()) - { - if (skip_unsupported_fields) - return nullptr; - throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Tuples and Lists cannot be inside Nullable"); - } - - auto nested_type = getDataTypeFromCapnProtoType(value_type, skip_unsupported_fields); - if (!nested_type) - return nullptr; - return std::make_shared(nested_type); - } - - if (checkIfStructContainsUnnamedUnion(struct_schema)) - throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Unnamed union is not supported"); - - /// Treat Struct as Tuple. - DataTypes nested_types; - Names nested_names; - for (auto field : struct_schema.getNonUnionFields()) - { - auto nested_type = getDataTypeFromCapnProtoType(field.getType(), skip_unsupported_fields); - if (!nested_type) - continue; - nested_names.push_back(field.getProto().getName()); - nested_types.push_back(nested_type); - } - if (nested_types.empty()) - return nullptr; - return std::make_shared(std::move(nested_types), std::move(nested_names)); - } - default: - { - if (skip_unsupported_fields) - return nullptr; - throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Unsupported CapnProtoType: {}", getCapnProtoFullTypeName(capnp_type)); - } - } -} - -NamesAndTypesList capnProtoSchemaToCHSchema(const capnp::StructSchema & schema, bool skip_unsupported_fields) -{ - if (checkIfStructContainsUnnamedUnion(schema)) - throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Unnamed union is not supported"); - - NamesAndTypesList names_and_types; - for (auto field : schema.getNonUnionFields()) - { - auto name = field.getProto().getName(); - auto type = getDataTypeFromCapnProtoType(field.getType(), skip_unsupported_fields); - if (type) - names_and_types.emplace_back(name, type); - } - if (names_and_types.empty()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, - "Cannot convert CapnProto schema to ClickHouse table schema, all fields have unsupported types"); - - return names_and_types; -} - -} - -#endif diff --git a/src/Formats/FormatSettings.h b/src/Formats/FormatSettings.h index 74a96e477f4..54282a0c232 100644 --- a/src/Formats/FormatSettings.h +++ b/src/Formats/FormatSettings.h @@ -327,16 +327,16 @@ struct FormatSettings /// For capnProto format we should determine how to /// compare ClickHouse Enum and Enum from schema. - enum class EnumComparingMode + enum class CapnProtoEnumComparingMode { BY_NAMES, // Names in enums should be the same, values can be different. BY_NAMES_CASE_INSENSITIVE, // Case-insensitive name comparison. BY_VALUES, // Values should be the same, names can be different. }; - struct + struct CapnProto { - EnumComparingMode enum_comparing_mode = EnumComparingMode::BY_VALUES; + CapnProtoEnumComparingMode enum_comparing_mode = CapnProtoEnumComparingMode::BY_VALUES; bool skip_fields_with_unsupported_types_in_schema_inference = false; } capn_proto; diff --git a/src/Processors/Formats/Impl/CapnProtoRowInputFormat.cpp b/src/Processors/Formats/Impl/CapnProtoRowInputFormat.cpp index 2f84e9bde3c..c056ee2b4a4 100644 --- a/src/Processors/Formats/Impl/CapnProtoRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/CapnProtoRowInputFormat.cpp @@ -9,42 +9,22 @@ #include #include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include - namespace DB { namespace ErrorCodes { - extern const int LOGICAL_ERROR; extern const int INCORRECT_DATA; } -CapnProtoRowInputFormat::CapnProtoRowInputFormat(ReadBuffer & in_, Block header, Params params_, const FormatSchemaInfo & info, const FormatSettings & format_settings_) - : IRowInputFormat(std::move(header), in_, std::move(params_)) +CapnProtoRowInputFormat::CapnProtoRowInputFormat(ReadBuffer & in_, Block header_, Params params_, const FormatSchemaInfo & info, const FormatSettings & format_settings) + : IRowInputFormat(std::move(header_), in_, std::move(params_)) , parser(std::make_shared()) - , format_settings(format_settings_) - , column_types(getPort().getHeader().getDataTypes()) - , column_names(getPort().getHeader().getNames()) { // Parse the schema and fetch the root object - root = parser->getMessageSchema(info); - checkCapnProtoSchemaStructure(root, getPort().getHeader(), format_settings.capn_proto.enum_comparing_mode); + schema = parser->getMessageSchema(info); + const auto & header = getPort().getHeader(); + serializer = std::make_unique(header.getDataTypes(), header.getNames(), schema, format_settings.capn_proto); } kj::Array CapnProtoRowInputFormat::readMessage() @@ -82,213 +62,6 @@ kj::Array CapnProtoRowInputFormat::readMessage() return msg; } -static void insertInteger(IColumn & column, const DataTypePtr & column_type, UInt64 value) -{ - switch (column_type->getTypeId()) - { - case TypeIndex::Int8: - assert_cast(column).insertValue(value); - break; - case TypeIndex::UInt8: - assert_cast(column).insertValue(value); - break; - case TypeIndex::Int16: - assert_cast(column).insertValue(value); - break; - case TypeIndex::Date: [[fallthrough]]; - case TypeIndex::UInt16: - assert_cast(column).insertValue(value); - break; - case TypeIndex::Int32: - assert_cast(column).insertValue(static_cast(value)); - break; - case TypeIndex::DateTime: [[fallthrough]]; - case TypeIndex::UInt32: - assert_cast(column).insertValue(static_cast(value)); - break; - case TypeIndex::IPv4: - assert_cast(column).insertValue(IPv4(static_cast(value))); - break; - case TypeIndex::Int64: - assert_cast(column).insertValue(value); - break; - case TypeIndex::UInt64: - assert_cast(column).insertValue(value); - break; - case TypeIndex::DateTime64: - assert_cast &>(column).insertValue(value); - break; - case TypeIndex::Decimal32: - assert_cast &>(column).insertValue(static_cast(value)); - break; - case TypeIndex::Decimal64: - assert_cast &>(column).insertValue(value); - break; - default: - throw Exception(ErrorCodes::LOGICAL_ERROR, "Column type {} cannot be parsed from integer", column_type->getName()); - } -} - -static void insertFloat(IColumn & column, const DataTypePtr & column_type, Float64 value) -{ - switch (column_type->getTypeId()) - { - case TypeIndex::Float32: - assert_cast(column).insertValue(static_cast(value)); - break; - case TypeIndex::Float64: - assert_cast(column).insertValue(value); - break; - default: - throw Exception(ErrorCodes::LOGICAL_ERROR, "Column type is not a float."); - } -} - -template -static void insertData(IColumn & column, const DataTypePtr & column_type, Value value) -{ - if (column_type->haveMaximumSizeOfValue() && value.size() != column_type->getSizeOfValueInMemory()) - throw Exception(ErrorCodes::INCORRECT_DATA, "Unexpected size of {} value: {}", column_type->getName(), value.size()); - - column.insertData(reinterpret_cast(value.begin()), value.size()); -} - -template -static void insertEnum(IColumn & column, const DataTypePtr & column_type, const capnp::DynamicEnum & enum_value, FormatSettings::EnumComparingMode enum_comparing_mode) -{ - auto enumerant = *kj::_::readMaybe(enum_value.getEnumerant()); - auto enum_type = assert_cast *>(column_type.get()); - DataTypePtr nested_type = std::make_shared>(); - switch (enum_comparing_mode) - { - case FormatSettings::EnumComparingMode::BY_VALUES: - insertInteger(column, nested_type, Int64(enumerant.getOrdinal())); - return; - case FormatSettings::EnumComparingMode::BY_NAMES: - insertInteger(column, nested_type, Int64(enum_type->getValue(String(enumerant.getProto().getName())))); - return; - case FormatSettings::EnumComparingMode::BY_NAMES_CASE_INSENSITIVE: - { - /// Find the same enum name case insensitive. - String enum_name = enumerant.getProto().getName(); - for (auto & name : enum_type->getAllRegisteredNames()) - { - if (compareEnumNames(name, enum_name, enum_comparing_mode)) - { - insertInteger(column, nested_type, Int64(enum_type->getValue(name))); - break; - } - } - } - } -} - -static void insertValue(IColumn & column, const DataTypePtr & column_type, const String & column_name, const capnp::DynamicValue::Reader & value, FormatSettings::EnumComparingMode enum_comparing_mode) -{ - if (column_type->lowCardinality()) - { - auto & lc_column = assert_cast(column); - auto tmp_column = lc_column.getDictionary().getNestedColumn()->cloneEmpty(); - auto dict_type = assert_cast(column_type.get())->getDictionaryType(); - insertValue(*tmp_column, dict_type, column_name, value, enum_comparing_mode); - lc_column.insertFromFullColumn(*tmp_column, 0); - return; - } - - switch (value.getType()) - { - case capnp::DynamicValue::Type::INT: - insertInteger(column, column_type, value.as()); - break; - case capnp::DynamicValue::Type::UINT: - insertInteger(column, column_type, value.as()); - break; - case capnp::DynamicValue::Type::FLOAT: - insertFloat(column, column_type, value.as()); - break; - case capnp::DynamicValue::Type::BOOL: - insertInteger(column, column_type, UInt64(value.as())); - break; - case capnp::DynamicValue::Type::DATA: - insertData(column, column_type, value.as()); - break; - case capnp::DynamicValue::Type::TEXT: - insertData(column, column_type, value.as()); - break; - case capnp::DynamicValue::Type::ENUM: - if (column_type->getTypeId() == TypeIndex::Enum8) - insertEnum(column, column_type, value.as(), enum_comparing_mode); - else - insertEnum(column, column_type, value.as(), enum_comparing_mode); - break; - case capnp::DynamicValue::LIST: - { - auto list_value = value.as(); - auto & column_array = assert_cast(column); - auto & offsets = column_array.getOffsets(); - offsets.push_back(offsets.back() + list_value.size()); - - auto & nested_column = column_array.getData(); - auto nested_type = assert_cast(column_type.get())->getNestedType(); - for (const auto & nested_value : list_value) - insertValue(nested_column, nested_type, column_name, nested_value, enum_comparing_mode); - break; - } - case capnp::DynamicValue::Type::STRUCT: - { - auto struct_value = value.as(); - if (column_type->isNullable()) - { - auto & nullable_column = assert_cast(column); - auto field = *kj::_::readMaybe(struct_value.which()); - if (field.getType().isVoid()) - nullable_column.insertDefault(); - else - { - auto & nested_column = nullable_column.getNestedColumn(); - auto nested_type = assert_cast(column_type.get())->getNestedType(); - auto nested_value = struct_value.get(field); - insertValue(nested_column, nested_type, column_name, nested_value, enum_comparing_mode); - nullable_column.getNullMapData().push_back(0); - } - } - else if (isTuple(column_type)) - { - auto & tuple_column = assert_cast(column); - const auto * tuple_type = assert_cast(column_type.get()); - bool have_explicit_names = tuple_type->haveExplicitNames(); - auto struct_schema = struct_value.getSchema(); - for (uint32_t i = 0; i != tuple_column.tupleSize(); ++i) - insertValue( - tuple_column.getColumn(i), - tuple_type->getElements()[i], - tuple_type->getElementNames()[i], - struct_value.get(have_explicit_names ? struct_schema.getFieldByName(tuple_type->getElementNames()[i]) : struct_schema.getFields()[i]), - enum_comparing_mode); - } - else if (isMap(column_type)) - { - const auto & map_type = assert_cast(*column_type); - DataTypes key_value_types = {map_type.getKeyType(), map_type.getValueType()}; - Names key_value_names = {"key", "value"}; - auto entries_type = std::make_shared(std::make_shared(key_value_types, key_value_names)); - auto & entries_column = assert_cast(column).getNestedColumn(); - auto entries_field = struct_value.getSchema().getFields()[0]; - insertValue(entries_column, entries_type, column_name, struct_value.get(entries_field), enum_comparing_mode); - } - else - { - /// It can be nested column from Nested type. - auto [field_name, nested_name] = splitCapnProtoFieldName(column_name); - insertValue(column, column_type, nested_name, struct_value.get(nested_name), enum_comparing_mode); - } - break; - } - default: - throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected CapnProto value type."); - } -} - bool CapnProtoRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &) { if (in->eof()) @@ -298,12 +71,8 @@ bool CapnProtoRowInputFormat::readRow(MutableColumns & columns, RowReadExtension { auto array = readMessage(); capnp::FlatArrayMessageReader msg(array); - auto root_reader = msg.getRoot(root); - for (size_t i = 0; i != columns.size(); ++i) - { - auto value = getReaderByColumnName(root_reader, column_names[i]); - insertValue(*columns[i], column_types[i], column_names[i], value, format_settings.capn_proto.enum_comparing_mode); - } + auto root_reader = msg.getRoot(schema); + serializer->readRow(columns, root_reader); } catch (const kj::Exception & e) { @@ -343,7 +112,14 @@ void registerInputFormatCapnProto(FormatFactory & factory) factory.markFormatSupportsSubsetOfColumns("CapnProto"); factory.registerFileExtension("capnp", "CapnProto"); factory.registerAdditionalInfoForSchemaCacheGetter( - "CapnProto", [](const FormatSettings & settings) { return fmt::format("format_schema={}", settings.schema.format_schema); }); + "CapnProto", + [](const FormatSettings & settings) + { + return fmt::format( + "format_schema={}, skip_fields_with_unsupported_types_in_schema_inference={}", + settings.schema.format_schema, + settings.capn_proto.skip_fields_with_unsupported_types_in_schema_inference); + }); } void registerCapnProtoSchemaReader(FormatFactory & factory) diff --git a/src/Processors/Formats/Impl/CapnProtoRowInputFormat.h b/src/Processors/Formats/Impl/CapnProtoRowInputFormat.h index cf23f22b643..06e94da123f 100644 --- a/src/Processors/Formats/Impl/CapnProtoRowInputFormat.h +++ b/src/Processors/Formats/Impl/CapnProtoRowInputFormat.h @@ -4,7 +4,8 @@ #if USE_CAPNP #include -#include +#include +#include #include #include @@ -33,10 +34,8 @@ private: kj::Array readMessage(); std::shared_ptr parser; - capnp::StructSchema root; - const FormatSettings format_settings; - DataTypes column_types; - Names column_names; + capnp::StructSchema schema; + std::unique_ptr serializer; }; class CapnProtoSchemaReader : public IExternalSchemaReader diff --git a/src/Processors/Formats/Impl/CapnProtoRowOutputFormat.cpp b/src/Processors/Formats/Impl/CapnProtoRowOutputFormat.cpp index 0225680b396..66a7160dd89 100644 --- a/src/Processors/Formats/Impl/CapnProtoRowOutputFormat.cpp +++ b/src/Processors/Formats/Impl/CapnProtoRowOutputFormat.cpp @@ -1,37 +1,16 @@ #include #if USE_CAPNP -#include +#include #include +#include #include #include #include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include - namespace DB { -namespace ErrorCodes -{ - extern const int LOGICAL_ERROR; -} - - CapnProtoOutputStream::CapnProtoOutputStream(WriteBuffer & out_) : out(out_) { } @@ -45,252 +24,25 @@ CapnProtoRowOutputFormat::CapnProtoRowOutputFormat( WriteBuffer & out_, const Block & header_, const FormatSchemaInfo & info, - const FormatSettings & format_settings_) - : IRowOutputFormat(header_, out_), column_names(header_.getNames()), column_types(header_.getDataTypes()), output_stream(std::make_unique(out_)), format_settings(format_settings_) + const FormatSettings & format_settings) + : IRowOutputFormat(header_, out_) + , column_names(header_.getNames()) + , column_types(header_.getDataTypes()) + , output_stream(std::make_unique(out_)) { schema = schema_parser.getMessageSchema(info); - checkCapnProtoSchemaStructure(schema, getPort(PortKind::Main).getHeader(), format_settings.capn_proto.enum_comparing_mode); -} - -template -static capnp::DynamicEnum getDynamicEnum( - const ColumnPtr & column, - const DataTypePtr & data_type, - size_t row_num, - const capnp::EnumSchema & enum_schema, - FormatSettings::EnumComparingMode mode) -{ - const auto * enum_data_type = assert_cast *>(data_type.get()); - EnumValue enum_value = column->getInt(row_num); - if (mode == FormatSettings::EnumComparingMode::BY_VALUES) - return capnp::DynamicEnum(enum_schema, enum_value); - - auto enum_name = enum_data_type->getNameForValue(enum_value); - for (const auto enumerant : enum_schema.getEnumerants()) - { - if (compareEnumNames(String(enum_name), enumerant.getProto().getName(), mode)) - return capnp::DynamicEnum(enumerant); - } - - throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot convert CLickHouse Enum value to CapnProto Enum"); -} - -static capnp::DynamicValue::Builder initStructFieldBuilder(const ColumnPtr & column, size_t row_num, capnp::DynamicStruct::Builder & struct_builder, capnp::StructSchema::Field field) -{ - if (const auto * array_column = checkAndGetColumn(*column)) - { - size_t size = array_column->getOffsets()[row_num] - array_column->getOffsets()[row_num - 1]; - return struct_builder.init(field, static_cast(size)); - } - - if (field.getType().isStruct()) - return struct_builder.init(field); - - return struct_builder.get(field); -} - -static std::optional convertToDynamicValue( - const ColumnPtr & column, - const DataTypePtr & data_type, - size_t row_num, - const String & column_name, - capnp::DynamicValue::Builder builder, - FormatSettings::EnumComparingMode enum_comparing_mode, - std::vector> & temporary_text_data_storage) -{ - /// Here we don't do any types validation, because we did it in CapnProtoRowOutputFormat constructor. - - if (data_type->lowCardinality()) - { - const auto * lc_column = assert_cast(column.get()); - const auto & dict_type = assert_cast(data_type.get())->getDictionaryType(); - size_t index = lc_column->getIndexAt(row_num); - return convertToDynamicValue(lc_column->getDictionary().getNestedColumn(), dict_type, index, column_name, builder, enum_comparing_mode, temporary_text_data_storage); - } - - switch (builder.getType()) - { - case capnp::DynamicValue::Type::INT: - return capnp::DynamicValue::Reader(column->getInt(row_num)); - case capnp::DynamicValue::Type::UINT: - { - /// IPv4 column doesn't support getUInt method. - if (isIPv4(data_type)) - return capnp::DynamicValue::Reader(assert_cast(column.get())->getElement(row_num)); - return capnp::DynamicValue::Reader(column->getUInt(row_num)); - } - case capnp::DynamicValue::Type::BOOL: - return capnp::DynamicValue::Reader(column->getBool(row_num)); - case capnp::DynamicValue::Type::FLOAT: - return capnp::DynamicValue::Reader(column->getFloat64(row_num)); - case capnp::DynamicValue::Type::ENUM: - { - auto enum_schema = builder.as().getSchema(); - if (data_type->getTypeId() == TypeIndex::Enum8) - return capnp::DynamicValue::Reader( - getDynamicEnum(column, data_type, row_num, enum_schema, enum_comparing_mode)); - return capnp::DynamicValue::Reader( - getDynamicEnum(column, data_type, row_num, enum_schema, enum_comparing_mode)); - } - case capnp::DynamicValue::Type::DATA: - { - auto data = column->getDataAt(row_num); - return capnp::DynamicValue::Reader(capnp::Data::Reader(reinterpret_cast(data.data), data.size)); - } - case capnp::DynamicValue::Type::TEXT: - { - /// In TEXT type data should be null-terminated, but ClickHouse String data could not be. - /// To make data null-terminated we should copy it to temporary String object, but - /// capnp::Text::Reader works only with pointer to the data and it's size, so we should - /// guarantee that new String object life time is longer than capnp::Text::Reader life time. - /// To do this we store new String object in a temporary storage, passed in this function - /// by reference. We use unique_ptr instead of just String to avoid pointers - /// invalidation on vector reallocation. - temporary_text_data_storage.push_back(std::make_unique(column->getDataAt(row_num))); - auto & data = temporary_text_data_storage.back(); - return capnp::DynamicValue::Reader(capnp::Text::Reader(data->data(), data->size())); - } - case capnp::DynamicValue::Type::STRUCT: - { - auto struct_builder = builder.as(); - auto nested_struct_schema = struct_builder.getSchema(); - /// Struct can represent Tuple, Nullable (named union with two fields) or single column when it contains one nested column. - if (data_type->isNullable()) - { - const auto * nullable_type = assert_cast(data_type.get()); - const auto * nullable_column = assert_cast(column.get()); - auto fields = nested_struct_schema.getUnionFields(); - if (nullable_column->isNullAt(row_num)) - { - auto null_field = fields[0].getType().isVoid() ? fields[0] : fields[1]; - struct_builder.set(null_field, capnp::Void()); - } - else - { - auto value_field = fields[0].getType().isVoid() ? fields[1] : fields[0]; - struct_builder.clear(value_field); - const auto & nested_column = nullable_column->getNestedColumnPtr(); - auto value_builder = initStructFieldBuilder(nested_column, row_num, struct_builder, value_field); - auto value = convertToDynamicValue(nested_column, nullable_type->getNestedType(), row_num, column_name, value_builder, enum_comparing_mode, temporary_text_data_storage); - if (value) - struct_builder.set(value_field, *value); - } - } - else if (isTuple(data_type)) - { - const auto * tuple_data_type = assert_cast(data_type.get()); - const auto & nested_types = tuple_data_type->getElements(); - const auto & nested_names = tuple_data_type->getElementNames(); - const auto & nested_columns = assert_cast(column.get())->getColumns(); - bool have_explicit_names = tuple_data_type->haveExplicitNames(); - for (uint32_t i = 0; i != nested_names.size(); ++i) - { - capnp::StructSchema::Field nested_field = have_explicit_names ? nested_struct_schema.getFieldByName(nested_names[i]) : nested_struct_schema.getFields()[i]; - auto field_builder = initStructFieldBuilder(nested_columns[i], row_num, struct_builder, nested_field); - auto value = convertToDynamicValue(nested_columns[i], nested_types[i], row_num, nested_names[i], field_builder, enum_comparing_mode, temporary_text_data_storage); - if (value) - struct_builder.set(nested_field, *value); - } - } - else if (isMap(data_type)) - { - /// We output Map type as follow CapnProto schema - /// - /// struct Map { - /// struct Entry { - /// key @0: Key; - /// value @1: Value; - /// } - /// entries @0 :List(Entry); - /// } - /// - /// And we don't need to check that struct have this form here because we checked it before. - const auto & map_type = assert_cast(*data_type); - DataTypes key_value_types = {map_type.getKeyType(), map_type.getValueType()}; - Names key_value_names = {"key", "value"}; - auto entries_type = std::make_shared(std::make_shared(key_value_types, key_value_names)); - - /// Nested column in Map is actually Array(Tuple), so we can output it according to "entries" field schema. - const auto & entries_column = assert_cast(column.get())->getNestedColumnPtr(); - - auto entries_field = nested_struct_schema.getFields()[0]; - auto field_builder = initStructFieldBuilder(entries_column, row_num, struct_builder, entries_field); - auto entries_value = convertToDynamicValue(entries_column, entries_type, row_num, column_name, field_builder, enum_comparing_mode, temporary_text_data_storage); - if (entries_value) - struct_builder.set(entries_field, *entries_value); - } - else - { - /// It can be nested column from Nested type. - auto [field_name, nested_name] = splitCapnProtoFieldName(column_name); - auto nested_field = nested_struct_schema.getFieldByName(nested_name); - auto field_builder = initStructFieldBuilder(column, row_num, struct_builder, nested_field); - auto value = convertToDynamicValue(column, data_type, row_num, nested_name, field_builder, enum_comparing_mode, temporary_text_data_storage); - if (value) - struct_builder.set(nested_field, *value); - } - return std::nullopt; - } - case capnp::DynamicValue::Type::LIST: - { - auto list_builder = builder.as(); - const auto * array_column = assert_cast(column.get()); - const auto & nested_column = array_column->getDataPtr(); - const auto & nested_type = assert_cast(data_type.get())->getNestedType(); - const auto & offsets = array_column->getOffsets(); - auto offset = offsets[row_num - 1]; - size_t size = offsets[row_num] - offset; - - const auto * nested_array_column = checkAndGetColumn(*nested_column); - for (unsigned i = 0; i != static_cast(size); ++i) - { - capnp::DynamicValue::Builder value_builder; - /// For nested arrays we need to initialize nested list builder. - if (nested_array_column) - { - const auto & nested_offset = nested_array_column->getOffsets(); - size_t nested_array_size = nested_offset[offset + i] - nested_offset[offset + i - 1]; - value_builder = list_builder.init(i, static_cast(nested_array_size)); - } - else - value_builder = list_builder[i]; - - auto value = convertToDynamicValue(nested_column, nested_type, offset + i, column_name, value_builder, enum_comparing_mode, temporary_text_data_storage); - if (value) - list_builder.set(i, *value); - } - return std::nullopt; - } - default: - throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected CapnProto type."); - } + const auto & header = getPort(PortKind::Main).getHeader(); + serializer = std::make_unique(header.getDataTypes(), header.getNames(), schema, format_settings.capn_proto); + capnp::MallocMessageBuilder message; } void CapnProtoRowOutputFormat::write(const Columns & columns, size_t row_num) { capnp::MallocMessageBuilder message; - /// Temporary storage for data that will be outputted in fields with CapnProto type TEXT. - /// See comment in convertToDynamicValue() for more details. - std::vector> temporary_text_data_storage; capnp::DynamicStruct::Builder root = message.initRoot(schema); - - /// Some columns can share same field builder. For example when we have - /// column with Nested type that was flattened into several columns. - std::unordered_map field_builders; - for (size_t i = 0; i != columns.size(); ++i) - { - auto [struct_builder, field] = getStructBuilderAndFieldByColumnName(root, column_names[i]); - if (!field_builders.contains(field.getIndex())) - { - auto field_builder = initStructFieldBuilder(columns[i], row_num, struct_builder, field); - field_builders[field.getIndex()] = field_builder; - } - auto value = convertToDynamicValue(columns[i], column_types[i], row_num, column_names[i], field_builders[field.getIndex()], format_settings.capn_proto.enum_comparing_mode, temporary_text_data_storage); - if (value) - struct_builder.set(field, *value); - } - + serializer->writeRow(columns, std::move(root), row_num); capnp::writeMessage(*output_stream, message); + } void registerOutputFormatCapnProto(FormatFactory & factory) diff --git a/src/Processors/Formats/Impl/CapnProtoRowOutputFormat.h b/src/Processors/Formats/Impl/CapnProtoRowOutputFormat.h index 5cc7099d4c7..dd9dcc6b340 100644 --- a/src/Processors/Formats/Impl/CapnProtoRowOutputFormat.h +++ b/src/Processors/Formats/Impl/CapnProtoRowOutputFormat.h @@ -3,15 +3,17 @@ #include "config.h" #if USE_CAPNP -#include -#include -#include -#include -#include -#include +# include +# include +# include +# include +# include +# include +# include namespace DB { + class CapnProtoOutputStream : public kj::OutputStream { public: @@ -43,8 +45,9 @@ private: DataTypes column_types; capnp::StructSchema schema; std::unique_ptr output_stream; - const FormatSettings format_settings; CapnProtoSchemaParser schema_parser; + std::unique_ptr serializer; + }; } diff --git a/src/Processors/Formats/Impl/ProtobufListInputFormat.cpp b/src/Processors/Formats/Impl/ProtobufListInputFormat.cpp index 9777f2361a2..6098923a195 100644 --- a/src/Processors/Formats/Impl/ProtobufListInputFormat.cpp +++ b/src/Processors/Formats/Impl/ProtobufListInputFormat.cpp @@ -88,7 +88,14 @@ void registerInputFormatProtobufList(FormatFactory & factory) }); factory.markFormatSupportsSubsetOfColumns("ProtobufList"); factory.registerAdditionalInfoForSchemaCacheGetter( - "ProtobufList", [](const FormatSettings & settings) { return fmt::format("format_schema={}", settings.schema.format_schema); }); + "ProtobufList", + [](const FormatSettings & settings) + { + return fmt::format( + "format_schema={}, skip_fields_with_unsupported_types_in_schema_inference={}", + settings.schema.format_schema, + settings.protobuf.skip_fields_with_unsupported_types_in_schema_inference); + }); } void registerProtobufListSchemaReader(FormatFactory & factory) diff --git a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp index ee60501dba5..126f3673571 100644 --- a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp @@ -128,7 +128,14 @@ void registerProtobufSchemaReader(FormatFactory & factory) for (const auto & name : {"Protobuf", "ProtobufSingle"}) factory.registerAdditionalInfoForSchemaCacheGetter( - name, [](const FormatSettings & settings) { return fmt::format("format_schema={}", settings.schema.format_schema); }); + name, + [](const FormatSettings & settings) + { + return fmt::format( + "format_schema={}, skip_fields_with_unsupported_types_in_schema_inference={}", + settings.schema.format_schema, + settings.protobuf.skip_fields_with_unsupported_types_in_schema_inference); + }); } } diff --git a/tests/queries/0_stateless/02030_capnp_format.reference b/tests/queries/0_stateless/02030_capnp_format.reference index 2b2307bfc6a..e08b1eb1271 100644 --- a/tests/queries/0_stateless/02030_capnp_format.reference +++ b/tests/queries/0_stateless/02030_capnp_format.reference @@ -12,6 +12,9 @@ \N [NULL,NULL,42] (NULL) 1 [1,NULL,2] (1) \N [NULL,NULL,42] (NULL) +OK +OK +OK one two tHrEe @@ -21,6 +24,14 @@ threE first second third +first +second +third +OK +one +two +tHrEe +OK OK OK OK diff --git a/tests/queries/0_stateless/02030_capnp_format.sh b/tests/queries/0_stateless/02030_capnp_format.sh index c15d6fe442e..b4484ca3766 100755 --- a/tests/queries/0_stateless/02030_capnp_format.sh +++ b/tests/queries/0_stateless/02030_capnp_format.sh @@ -71,16 +71,25 @@ $CLICKHOUSE_CLIENT --query="DROP TABLE capnp_nullable" $CLICKHOUSE_CLIENT --query="SELECT CAST(number, 'Enum(\'one\' = 0, \'two\' = 1, \'tHrEe\' = 2)') AS value FROM numbers(3) FORMAT CapnProto SETTINGS format_schema='$CLIENT_SCHEMADIR/02030_capnp_enum:Message'" > $CAPN_PROTO_FILE +$CLICKHOUSE_CLIENT --query="SELECT CAST(number % 2, 'Enum(\'one\' = 0, \'two\' = 1, \'tHrEe\' = 2, \'four\' = 4)') AS value FROM numbers(3) FORMAT CapnProto SETTINGS format_schema='$CLIENT_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_names'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL'; +$CLICKHOUSE_CLIENT --query="SELECT CAST(number % 2, 'Enum(\'one\' = 0, \'two\' = 1, \'tHrEe\' = 2, \'four\' = 4)') AS value FROM numbers(3) FORMAT CapnProto SETTINGS format_schema='$CLIENT_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_names_case_insensitive'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL'; +$CLICKHOUSE_CLIENT --query="SELECT CAST(number % 2, 'Enum(\'one\' = 0, \'two\' = 1, \'tHrEe\' = 2, \'four\' = 4)') AS value FROM numbers(3) FORMAT CapnProto SETTINGS format_schema='$CLIENT_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_values'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL'; $CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'value Enum(\'one\' = 1, \'two\' = 2, \'tHrEe\' = 3)') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_names'" $CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'value Enum(\'oNe\' = 1, \'tWo\' = 2, \'threE\' = 3)') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_names_case_insensitive'" $CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'value Enum(\'first\' = 0, \'second\' = 1, \'third\' = 2)') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_values'" - +$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'value Enum(\'first\' = 0, \'second\' = 1, \'third\' = 2)') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_values'" $CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'value Enum(\'one\' = 0, \'two\' = 1, \'three\' = 2)') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_names'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL'; -$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'value Enum(\'one\' = 0, \'two\' = 1, \'tHrEe\' = 2, \'four\' = 3)') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_names'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL'; +$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'value Enum(\'one\' = 0, \'two\' = 1, \'tHrEe\' = 2, \'four\' = 3)') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_names'" $CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'value Enum(\'one\' = 1, \'two\' = 2, \'tHrEe\' = 3)') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_values'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL'; $CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'value Enum(\'first\' = 1, \'two\' = 2, \'three\' = 3)') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_names_case_insensitive'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL'; +$CLICKHOUSE_CLIENT --query="SELECT CAST(number % 2, 'Enum(\'one\' = 0, \'two\' = 1)') AS value FROM numbers(3) FORMAT CapnProto SETTINGS format_schema='$CLIENT_SCHEMADIR/02030_capnp_enum:Message'" > $CAPN_PROTO_FILE +$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'value Enum(\'first\' = 0, \'two\' = 1)') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_values'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL'; +$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'value Enum(\'first\' = 0, \'two\' = 1)') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_names'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL'; +$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'value Enum(\'first\' = 0, \'two\' = 1)') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_names_case_insensitive'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL'; + + $CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS capnp_low_cardinality" $CLICKHOUSE_CLIENT --query="CREATE TABLE capnp_low_cardinality (lc1 LowCardinality(String), lc2 LowCardinality(Nullable(String)), lc3 Array(LowCardinality(Nullable(String)))) ENGINE=Memory" $CLICKHOUSE_CLIENT --query="INSERT INTO capnp_low_cardinality VALUES ('one', 'two', ['one', Null, 'two', Null]), ('two', Null, [Null])" @@ -96,8 +105,8 @@ $CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'a_b U $CLICKHOUSE_CLIENT --query="SELECT number AS a_b, number + 1 AS a_c_d, number + 2 AS a_c_e_f FROM numbers(5) FORMAT CapnProto SETTINGS format_schema='$CLIENT_SCHEMADIR/02030_capnp_nested_tuples:Message'" > $CAPN_PROTO_FILE $CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'a Tuple(b UInt64, c Tuple(d UInt64, e Tuple(f UInt64)))') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_nested_tuples:Message'" -$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'a Tuple(bb UInt64, c Tuple(d UInt64, e Tuple(f UInt64)))') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_nested_tuples:Message'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL'; -$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'a Tuple(b UInt64, c Tuple(d UInt64, e Tuple(ff UInt64)))') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_nested_tuples:Message'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL'; +$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'a Tuple(bb UInt64, c Tuple(d UInt64, e Tuple(f UInt64)))') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_nested_tuples:Message'" 2>&1 | grep -F -q "THERE_IS_NO_COLUMN" && echo 'OK' || echo 'FAIL'; +$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'a Tuple(b UInt64, c Tuple(d UInt64, e Tuple(ff UInt64)))') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_nested_tuples:Message'" 2>&1 | grep -F -q "THERE_IS_NO_COLUMN" && echo 'OK' || echo 'FAIL'; $CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'string String') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_simple_types:Message'" 2>&1 | grep -F -q "INCORRECT_DATA" && echo 'OK' || echo 'FAIL'; diff --git a/tests/queries/0_stateless/02735_capnp_case_insensitive_names_matching.reference b/tests/queries/0_stateless/02735_capnp_case_insensitive_names_matching.reference new file mode 100644 index 00000000000..f34c857e2f6 --- /dev/null +++ b/tests/queries/0_stateless/02735_capnp_case_insensitive_names_matching.reference @@ -0,0 +1 @@ +42 (42,42) diff --git a/tests/queries/0_stateless/02735_capnp_case_insensitive_names_matching.sh b/tests/queries/0_stateless/02735_capnp_case_insensitive_names_matching.sh new file mode 100755 index 00000000000..c3835948437 --- /dev/null +++ b/tests/queries/0_stateless/02735_capnp_case_insensitive_names_matching.sh @@ -0,0 +1,10 @@ +#!/usr/bin/env bash +# Tags: no-fasttest, no-parallel, no-replicated-database + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +SCHEMADIR=$CURDIR/format_schemas +$CLICKHOUSE_LOCAL -q "select 42 as Field1, (42, 42)::Tuple(Field1 UInt32, Field2 UInt32) as Nested format CapnProto settings format_schema='$SCHEMADIR/02735_case_insensitive_names_matching:Message'" | $CLICKHOUSE_LOCAL --input-format CapnProto --structure "Field1 UInt32, Nested Tuple(Field1 UInt32, Field2 UInt32)" -q "select * from table" --format_schema="$SCHEMADIR/02735_case_insensitive_names_matching:Message" + diff --git a/tests/queries/0_stateless/02736_reading_and_writing_structure_fields.reference b/tests/queries/0_stateless/02736_reading_and_writing_structure_fields.reference new file mode 100644 index 00000000000..b6e6d485929 --- /dev/null +++ b/tests/queries/0_stateless/02736_reading_and_writing_structure_fields.reference @@ -0,0 +1,3 @@ +(42,(42,42),[(42,42),(24,24)]) [(42,(42,42),[(42,42),(24,24)]),(24,(24,24),[(24,24),(42,42)])] +42 42 42 +[42,24] [42,24] [42,24] [[42,24],[24,42]] [[42,24],[24,42]] diff --git a/tests/queries/0_stateless/02736_reading_and_writing_structure_fields.sh b/tests/queries/0_stateless/02736_reading_and_writing_structure_fields.sh new file mode 100755 index 00000000000..c669be2ed33 --- /dev/null +++ b/tests/queries/0_stateless/02736_reading_and_writing_structure_fields.sh @@ -0,0 +1,24 @@ +#!/usr/bin/env bash +# Tags: no-fasttest, no-parallel, no-replicated-database + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +SCHEMADIR=$CURDIR/format_schemas +DATA_FILE=02736_$CLICKHOUSE_TEST_UNIQUE_NAME.bin + +$CLICKHOUSE_LOCAL -q "select tuple(42, tuple(42, 42), [tuple(42, 42), tuple(24, 24)]) as nested, [tuple(42, tuple(42, 42), [tuple(42, 42), tuple(24, 24)]), tuple(24, tuple(24, 24), [tuple(24, 24), tuple(42, 42)])] as nestedList format CapnProto settings format_schema='$SCHEMADIR/02736_nested_structures:Message'" > $DATA_FILE + +$CLICKHOUSE_LOCAL -q "select * from file($DATA_FILE, CapnProto) settings format_schema='$SCHEMADIR/02736_nested_structures:Message'" + +$CLICKHOUSE_LOCAL -q "select 42 as nested_field1, 42 as nested_nested_field1, 42 as nested_nested_field2 format CapnProto settings format_schema='$SCHEMADIR/02736_nested_structures:Message'" > $DATA_FILE + +$CLICKHOUSE_LOCAL -q "select * from file($DATA_FILE, CapnProto, 'nested_field1 UInt32, nested_nested_field1 UInt32, nested_nested_field2 UInt32') settings format_schema='$SCHEMADIR/02736_nested_structures:Message'" + +$CLICKHOUSE_LOCAL -q "select [42, 24] as nestedList_field1, [42, 24] as nestedList_nested_field1, [42, 24] as nestedList_nested_field2, [[42, 24], [24, 42]] as nestedList_nestedList_field1, [[42, 24], [24, 42]] as nestedList_nestedList_field2 format CapnProto settings format_schema='$SCHEMADIR/02736_nested_structures:Message'" > $DATA_FILE + +$CLICKHOUSE_LOCAL -q "select * from file($DATA_FILE, CapnProto, 'nestedList_field1 Array(UInt32), nestedList_nested_field1 Array(UInt32), nestedList_nested_field2 Array(UInt32), nestedList_nestedList_field1 Array(Array(UInt32)), nestedList_nestedList_field2 Array(Array(UInt32))') settings format_schema='$SCHEMADIR/02736_nested_structures:Message'" + +rm $DATA_FILE + diff --git a/tests/queries/0_stateless/format_schemas/02735_case_insensitive_names_matching.capnp b/tests/queries/0_stateless/format_schemas/02735_case_insensitive_names_matching.capnp new file mode 100644 index 00000000000..6b12aab081a --- /dev/null +++ b/tests/queries/0_stateless/format_schemas/02735_case_insensitive_names_matching.capnp @@ -0,0 +1,13 @@ +@0x9ef128e10a8010b8; + +struct Nested +{ + field1 @0 : UInt32; + field2 @1 : UInt32; +} + +struct Message +{ + field1 @0 : UInt32; + nested @1 : Nested; +} diff --git a/tests/queries/0_stateless/format_schemas/02736_nested_structures.capnp b/tests/queries/0_stateless/format_schemas/02736_nested_structures.capnp new file mode 100644 index 00000000000..a03eb27f383 --- /dev/null +++ b/tests/queries/0_stateless/format_schemas/02736_nested_structures.capnp @@ -0,0 +1,21 @@ +@0x9ef128e10a8010b8; + +struct Nested2 +{ + field1 @0 : UInt32; + field2 @1 : UInt32; +} + +struct Nested +{ + field1 @0 : UInt32; + nested @1 : Nested2; + nestedList @2 : List(Nested2); +} + +struct Message +{ + nested @0 : Nested; + nestedList @1 : List(Nested); +} +