Merge pull request #49752 from Avogar/better-capnproto-3

Refactor CapnProto format to improve input/output performance
This commit is contained in:
Kruglov Pavel 2023-06-13 16:20:38 +02:00 committed by GitHub
commit 8fdcd91c38
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 2034 additions and 1269 deletions

2
contrib/capnproto vendored

@ -1 +1 @@
Subproject commit dc8b50b999777bcb23c89bb5907c785c3f654441
Subproject commit 976209a6d18074804f60d18ef99b6a809d27dadf

View File

@ -981,7 +981,7 @@ class IColumn;
M(Bool, output_format_orc_string_as_string, false, "Use ORC String type instead of Binary for String columns", 0) \
M(ORCCompression, output_format_orc_compression_method, "lz4", "Compression method for ORC output format. Supported codecs: lz4, snappy, zlib, zstd, none (uncompressed)", 0) \
\
M(EnumComparingMode, format_capn_proto_enum_comparising_mode, FormatSettings::EnumComparingMode::BY_VALUES, "How to map ClickHouse Enum and CapnProto Enum", 0) \
M(CapnProtoEnumComparingMode, format_capn_proto_enum_comparising_mode, FormatSettings::CapnProtoEnumComparingMode::BY_VALUES, "How to map ClickHouse Enum and CapnProto Enum", 0) \
\
M(String, input_format_mysql_dump_table_name, "", "Name of the table in MySQL dump from which to read data", 0) \
M(Bool, input_format_mysql_dump_map_column_names, true, "Match columns from table in MySQL dump and columns from ClickHouse table by names", 0) \

View File

@ -144,10 +144,10 @@ IMPLEMENT_SETTING_ENUM(TransactionsWaitCSNMode, ErrorCodes::BAD_ARGUMENTS,
{"wait", TransactionsWaitCSNMode::WAIT},
{"wait_unknown", TransactionsWaitCSNMode::WAIT_UNKNOWN}})
IMPLEMENT_SETTING_ENUM(EnumComparingMode, ErrorCodes::BAD_ARGUMENTS,
{{"by_names", FormatSettings::EnumComparingMode::BY_NAMES},
{"by_values", FormatSettings::EnumComparingMode::BY_VALUES},
{"by_names_case_insensitive", FormatSettings::EnumComparingMode::BY_NAMES_CASE_INSENSITIVE}})
IMPLEMENT_SETTING_ENUM(CapnProtoEnumComparingMode, ErrorCodes::BAD_ARGUMENTS,
{{"by_names", FormatSettings::CapnProtoEnumComparingMode::BY_NAMES},
{"by_values", FormatSettings::CapnProtoEnumComparingMode::BY_VALUES},
{"by_names_case_insensitive", FormatSettings::CapnProtoEnumComparingMode::BY_NAMES_CASE_INSENSITIVE}})
IMPLEMENT_SETTING_ENUM(EscapingRule, ErrorCodes::BAD_ARGUMENTS,
{{"None", FormatSettings::EscapingRule::None},

View File

@ -188,7 +188,7 @@ enum class TransactionsWaitCSNMode
DECLARE_SETTING_ENUM(TransactionsWaitCSNMode)
DECLARE_SETTING_ENUM_WITH_RENAME(EnumComparingMode, FormatSettings::EnumComparingMode)
DECLARE_SETTING_ENUM_WITH_RENAME(CapnProtoEnumComparingMode, FormatSettings::CapnProtoEnumComparingMode)
DECLARE_SETTING_ENUM_WITH_RENAME(EscapingRule, FormatSettings::EscapingRule)

View File

@ -0,0 +1,298 @@
#include <Formats/CapnProtoSchema.h>
#if USE_CAPNP
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/IDataType.h>
#include <Common/StringUtils/StringUtils.h>
#include <boost/algorithm/string/join.hpp>
#include <capnp/schema.h>
#include <capnp/schema-parser.h>
#include <fcntl.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_PARSE_CAPN_PROTO_SCHEMA;
extern const int BAD_TYPE_OF_FIELD;
extern const int FILE_DOESNT_EXIST;
extern const int UNKNOWN_EXCEPTION;
extern const int CAPN_PROTO_BAD_TYPE;
extern const int BAD_ARGUMENTS;
}
capnp::StructSchema CapnProtoSchemaParser::getMessageSchema(const FormatSchemaInfo & schema_info)
{
capnp::ParsedSchema schema;
try
{
int fd;
KJ_SYSCALL(fd = open(schema_info.schemaDirectory().data(), O_RDONLY)); // NOLINT(bugprone-suspicious-semicolon)
auto schema_dir = kj::newDiskDirectory(kj::OsFileHandle(fd));
schema = impl.parseFromDirectory(*schema_dir, kj::Path::parse(schema_info.schemaPath()), {});
}
catch (const kj::Exception & e)
{
/// That's not good to determine the type of error by its description, but
/// this is the only way to do it here, because kj doesn't specify the type of error.
auto description = std::string_view(e.getDescription().cStr());
if (description.find("No such file or directory") != String::npos || description.find("no such directory") != String::npos || description.find("no such file") != String::npos)
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Cannot open CapnProto schema, file {} doesn't exists", schema_info.absoluteSchemaPath());
if (description.find("Parse error") != String::npos)
throw Exception(ErrorCodes::CANNOT_PARSE_CAPN_PROTO_SCHEMA, "Cannot parse CapnProto schema {}:{}", schema_info.schemaPath(), e.getLine());
throw Exception(ErrorCodes::UNKNOWN_EXCEPTION,
"Unknown exception while parsing CapnProto schema: {}, schema dir and file: {}, {}",
description, schema_info.schemaDirectory(), schema_info.schemaPath());
}
auto message_maybe = schema.findNested(schema_info.messageName());
auto * message_schema = kj::_::readMaybe(message_maybe);
if (!message_schema)
throw Exception(ErrorCodes::CANNOT_PARSE_CAPN_PROTO_SCHEMA,
"CapnProto schema doesn't contain message with name {}", schema_info.messageName());
return message_schema->asStruct();
}
bool checkIfStructContainsUnnamedUnion(const capnp::StructSchema & struct_schema)
{
return struct_schema.getFields().size() != struct_schema.getNonUnionFields().size();
}
bool checkIfStructIsNamedUnion(const capnp::StructSchema & struct_schema)
{
return struct_schema.getFields().size() == struct_schema.getUnionFields().size();
}
/// Get full name of type for better exception messages.
String getCapnProtoFullTypeName(const capnp::Type & type)
{
static const std::map<capnp::schema::Type::Which, String> capnp_simple_type_names =
{
{capnp::schema::Type::Which::BOOL, "Bool"},
{capnp::schema::Type::Which::VOID, "Void"},
{capnp::schema::Type::Which::INT8, "Int8"},
{capnp::schema::Type::Which::INT16, "Int16"},
{capnp::schema::Type::Which::INT32, "Int32"},
{capnp::schema::Type::Which::INT64, "Int64"},
{capnp::schema::Type::Which::UINT8, "UInt8"},
{capnp::schema::Type::Which::UINT16, "UInt16"},
{capnp::schema::Type::Which::UINT32, "UInt32"},
{capnp::schema::Type::Which::UINT64, "UInt64"},
{capnp::schema::Type::Which::FLOAT32, "Float32"},
{capnp::schema::Type::Which::FLOAT64, "Float64"},
{capnp::schema::Type::Which::TEXT, "Text"},
{capnp::schema::Type::Which::DATA, "Data"},
{capnp::schema::Type::Which::INTERFACE, "Interface"},
{capnp::schema::Type::Which::ANY_POINTER, "AnyPointer"},
};
switch (type.which())
{
case capnp::schema::Type::Which::STRUCT:
{
auto struct_schema = type.asStruct();
auto non_union_fields = struct_schema.getNonUnionFields();
std::vector<String> non_union_field_names;
for (auto nested_field : non_union_fields)
non_union_field_names.push_back(String(nested_field.getProto().getName()) + " " + getCapnProtoFullTypeName(nested_field.getType()));
auto union_fields = struct_schema.getUnionFields();
std::vector<String> union_field_names;
for (auto nested_field : union_fields)
union_field_names.push_back(String(nested_field.getProto().getName()) + " " + getCapnProtoFullTypeName(nested_field.getType()));
String union_name = "Union(" + boost::algorithm::join(union_field_names, ", ") + ")";
/// Check if the struct is a named union.
if (non_union_field_names.empty())
return union_name;
String type_name = "Struct(" + boost::algorithm::join(non_union_field_names, ", ");
/// Check if the struct contains unnamed union.
if (!union_field_names.empty())
type_name += ", " + union_name;
type_name += ")";
return type_name;
}
case capnp::schema::Type::Which::LIST:
return "List(" + getCapnProtoFullTypeName(type.asList().getElementType()) + ")";
case capnp::schema::Type::Which::ENUM:
{
auto enum_schema = type.asEnum();
String enum_name = "Enum(";
auto enumerants = enum_schema.getEnumerants();
for (unsigned i = 0; i != enumerants.size(); ++i)
{
enum_name += String(enumerants[i].getProto().getName()) + " = " + std::to_string(enumerants[i].getOrdinal());
if (i + 1 != enumerants.size())
enum_name += ", ";
}
enum_name += ")";
return enum_name;
}
default:
auto it = capnp_simple_type_names.find(type.which());
if (it == capnp_simple_type_names.end())
throw Exception(ErrorCodes::BAD_TYPE_OF_FIELD, "Unknown CapnProto type");
return it->second;
}
}
namespace
{
template <typename ValueType>
DataTypePtr getEnumDataTypeFromEnumerants(const capnp::EnumSchema::EnumerantList & enumerants)
{
std::vector<std::pair<String, ValueType>> values;
for (auto enumerant : enumerants)
values.emplace_back(enumerant.getProto().getName(), ValueType(enumerant.getOrdinal()));
return std::make_shared<DataTypeEnum<ValueType>>(std::move(values));
}
DataTypePtr getEnumDataTypeFromEnumSchema(const capnp::EnumSchema & enum_schema)
{
auto enumerants = enum_schema.getEnumerants();
if (enumerants.size() < 128)
return getEnumDataTypeFromEnumerants<Int8>(enumerants);
if (enumerants.size() < 32768)
return getEnumDataTypeFromEnumerants<Int16>(enumerants);
throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "ClickHouse supports only 8 and 16-bit Enums");
}
DataTypePtr getDataTypeFromCapnProtoType(const capnp::Type & capnp_type, bool skip_unsupported_fields)
{
switch (capnp_type.which())
{
case capnp::schema::Type::INT8:
return std::make_shared<DataTypeInt8>();
case capnp::schema::Type::INT16:
return std::make_shared<DataTypeInt16>();
case capnp::schema::Type::INT32:
return std::make_shared<DataTypeInt32>();
case capnp::schema::Type::INT64:
return std::make_shared<DataTypeInt64>();
case capnp::schema::Type::BOOL: [[fallthrough]];
case capnp::schema::Type::UINT8:
return std::make_shared<DataTypeUInt8>();
case capnp::schema::Type::UINT16:
return std::make_shared<DataTypeUInt16>();
case capnp::schema::Type::UINT32:
return std::make_shared<DataTypeUInt32>();
case capnp::schema::Type::UINT64:
return std::make_shared<DataTypeUInt64>();
case capnp::schema::Type::FLOAT32:
return std::make_shared<DataTypeFloat32>();
case capnp::schema::Type::FLOAT64:
return std::make_shared<DataTypeFloat64>();
case capnp::schema::Type::DATA: [[fallthrough]];
case capnp::schema::Type::TEXT:
return std::make_shared<DataTypeString>();
case capnp::schema::Type::ENUM:
return getEnumDataTypeFromEnumSchema(capnp_type.asEnum());
case capnp::schema::Type::LIST:
{
auto list_schema = capnp_type.asList();
auto nested_type = getDataTypeFromCapnProtoType(list_schema.getElementType(), skip_unsupported_fields);
if (!nested_type)
return nullptr;
return std::make_shared<DataTypeArray>(nested_type);
}
case capnp::schema::Type::STRUCT:
{
auto struct_schema = capnp_type.asStruct();
if (struct_schema.getFields().size() == 0)
{
if (skip_unsupported_fields)
return nullptr;
throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Empty messages are not supported");
}
/// Check if it can be Nullable.
if (checkIfStructIsNamedUnion(struct_schema))
{
auto fields = struct_schema.getUnionFields();
if (fields.size() != 2 || (!fields[0].getType().isVoid() && !fields[1].getType().isVoid()))
{
if (skip_unsupported_fields)
return nullptr;
throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Unions are not supported");
}
auto value_type = fields[0].getType().isVoid() ? fields[1].getType() : fields[0].getType();
if (value_type.isStruct() || value_type.isList())
{
if (skip_unsupported_fields)
return nullptr;
throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Tuples and Lists cannot be inside Nullable");
}
auto nested_type = getDataTypeFromCapnProtoType(value_type, skip_unsupported_fields);
if (!nested_type)
return nullptr;
return std::make_shared<DataTypeNullable>(nested_type);
}
if (checkIfStructContainsUnnamedUnion(struct_schema))
throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Unnamed union is not supported");
/// Treat Struct as Tuple.
DataTypes nested_types;
Names nested_names;
for (auto field : struct_schema.getNonUnionFields())
{
auto nested_type = getDataTypeFromCapnProtoType(field.getType(), skip_unsupported_fields);
if (!nested_type)
continue;
nested_names.push_back(field.getProto().getName());
nested_types.push_back(nested_type);
}
if (nested_types.empty())
return nullptr;
return std::make_shared<DataTypeTuple>(std::move(nested_types), std::move(nested_names));
}
default:
{
if (skip_unsupported_fields)
return nullptr;
throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Unsupported CapnProtoType: {}", getCapnProtoFullTypeName(capnp_type));
}
}
}
}
NamesAndTypesList capnProtoSchemaToCHSchema(const capnp::StructSchema & schema, bool skip_unsupported_fields)
{
if (checkIfStructContainsUnnamedUnion(schema))
throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Unnamed union is not supported");
NamesAndTypesList names_and_types;
for (auto field : schema.getNonUnionFields())
{
auto name = field.getProto().getName();
auto type = getDataTypeFromCapnProtoType(field.getType(), skip_unsupported_fields);
if (type)
names_and_types.emplace_back(name, type);
}
if (names_and_types.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Cannot convert CapnProto schema to ClickHouse table schema, all fields have unsupported types");
return names_and_types;
}
}
#endif

View File

@ -30,17 +30,14 @@ public:
capnp::StructSchema getMessageSchema(const FormatSchemaInfo & schema_info);
};
std::pair<String, String> splitCapnProtoFieldName(const String & name);
bool checkIfStructContainsUnnamedUnion(const capnp::StructSchema & struct_schema);
bool checkIfStructIsNamedUnion(const capnp::StructSchema & struct_schema);
bool compareEnumNames(const String & first, const String & second, FormatSettings::EnumComparingMode mode);
std::pair<capnp::DynamicStruct::Builder, capnp::StructSchema::Field> getStructBuilderAndFieldByColumnName(capnp::DynamicStruct::Builder struct_builder, const String & name);
capnp::DynamicValue::Reader getReaderByColumnName(const capnp::DynamicStruct::Reader & struct_reader, const String & name);
void checkCapnProtoSchemaStructure(const capnp::StructSchema & schema, const Block & header, FormatSettings::EnumComparingMode mode);
/// Get full name of type for better exception messages.
String getCapnProtoFullTypeName(const capnp::Type & type);
NamesAndTypesList capnProtoSchemaToCHSchema(const capnp::StructSchema & schema, bool skip_unsupported_fields);
}
#endif

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,30 @@
#pragma once
#if USE_CAPNP
#include <Core/Block.h>
#include <capnp/dynamic.h>
#include <Formats/FormatSettings.h>
namespace DB
{
class CapnProtoSerializer
{
public:
CapnProtoSerializer(const DataTypes & data_types, const Names & names, const capnp::StructSchema & schema, const FormatSettings::CapnProto & settings);
void writeRow(const Columns & columns, capnp::DynamicStruct::Builder builder, size_t row_num);
void readRow(MutableColumns & columns, capnp::DynamicStruct::Reader & reader);
~CapnProtoSerializer();
private:
class Impl;
std::unique_ptr<Impl> serializer_impl;
};
}
#endif

View File

@ -1,734 +0,0 @@
#include <Formats/CapnProtoUtils.h>
#if USE_CAPNP
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/IDataType.h>
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/join.hpp>
#include <capnp/schema.h>
#include <capnp/schema-parser.h>
#include <fcntl.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_PARSE_CAPN_PROTO_SCHEMA;
extern const int THERE_IS_NO_COLUMN;
extern const int BAD_TYPE_OF_FIELD;
extern const int CAPN_PROTO_BAD_CAST;
extern const int FILE_DOESNT_EXIST;
extern const int UNKNOWN_EXCEPTION;
extern const int INCORRECT_DATA;
extern const int CAPN_PROTO_BAD_TYPE;
extern const int BAD_ARGUMENTS;
}
std::pair<String, String> splitCapnProtoFieldName(const String & name)
{
const auto * begin = name.data();
const auto * end = name.data() + name.size();
const auto * it = find_first_symbols<'_', '.'>(begin, end);
String first = String(begin, it);
String second = it == end ? "" : String(it + 1, end);
return {first, second};
}
capnp::StructSchema CapnProtoSchemaParser::getMessageSchema(const FormatSchemaInfo & schema_info)
{
capnp::ParsedSchema schema;
try
{
int fd;
KJ_SYSCALL(fd = open(schema_info.schemaDirectory().data(), O_RDONLY)); // NOLINT(bugprone-suspicious-semicolon)
auto schema_dir = kj::newDiskDirectory(kj::OsFileHandle(fd));
schema = impl.parseFromDirectory(*schema_dir, kj::Path::parse(schema_info.schemaPath()), {});
}
catch (const kj::Exception & e)
{
/// That's not good to determine the type of error by its description, but
/// this is the only way to do it here, because kj doesn't specify the type of error.
auto description = std::string_view(e.getDescription().cStr());
if (description.find("No such file or directory") != String::npos || description.find("no such directory") != String::npos)
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Cannot open CapnProto schema, file {} doesn't exists", schema_info.absoluteSchemaPath());
if (description.find("Parse error") != String::npos)
throw Exception(ErrorCodes::CANNOT_PARSE_CAPN_PROTO_SCHEMA, "Cannot parse CapnProto schema {}:{}", schema_info.schemaPath(), e.getLine());
throw Exception(ErrorCodes::UNKNOWN_EXCEPTION,
"Unknown exception while parsing CapnProto schema: {}, schema dir and file: {}, {}",
description, schema_info.schemaDirectory(), schema_info.schemaPath());
}
auto message_maybe = schema.findNested(schema_info.messageName());
auto * message_schema = kj::_::readMaybe(message_maybe);
if (!message_schema)
throw Exception(ErrorCodes::CANNOT_PARSE_CAPN_PROTO_SCHEMA,
"CapnProto schema doesn't contain message with name {}", schema_info.messageName());
return message_schema->asStruct();
}
bool compareEnumNames(const String & first, const String & second, FormatSettings::EnumComparingMode mode)
{
if (mode == FormatSettings::EnumComparingMode::BY_NAMES_CASE_INSENSITIVE)
return boost::algorithm::to_lower_copy(first) == boost::algorithm::to_lower_copy(second);
return first == second;
}
static const std::map<capnp::schema::Type::Which, String> capnp_simple_type_names =
{
{capnp::schema::Type::Which::BOOL, "Bool"},
{capnp::schema::Type::Which::VOID, "Void"},
{capnp::schema::Type::Which::INT8, "Int8"},
{capnp::schema::Type::Which::INT16, "Int16"},
{capnp::schema::Type::Which::INT32, "Int32"},
{capnp::schema::Type::Which::INT64, "Int64"},
{capnp::schema::Type::Which::UINT8, "UInt8"},
{capnp::schema::Type::Which::UINT16, "UInt16"},
{capnp::schema::Type::Which::UINT32, "UInt32"},
{capnp::schema::Type::Which::UINT64, "UInt64"},
{capnp::schema::Type::Which::FLOAT32, "Float32"},
{capnp::schema::Type::Which::FLOAT64, "Float64"},
{capnp::schema::Type::Which::TEXT, "Text"},
{capnp::schema::Type::Which::DATA, "Data"},
{capnp::schema::Type::Which::INTERFACE, "Interface"},
{capnp::schema::Type::Which::ANY_POINTER, "AnyPointer"},
};
static bool checkIfStructContainsUnnamedUnion(const capnp::StructSchema & struct_schema)
{
return struct_schema.getFields().size() != struct_schema.getNonUnionFields().size();
}
static bool checkIfStructIsNamedUnion(const capnp::StructSchema & struct_schema)
{
return struct_schema.getFields().size() == struct_schema.getUnionFields().size();
}
/// Get full name of type for better exception messages.
static String getCapnProtoFullTypeName(const capnp::Type & type)
{
switch (type.which())
{
case capnp::schema::Type::Which::STRUCT:
{
auto struct_schema = type.asStruct();
auto non_union_fields = struct_schema.getNonUnionFields();
std::vector<String> non_union_field_names;
for (auto nested_field : non_union_fields)
non_union_field_names.push_back(String(nested_field.getProto().getName()) + " " + getCapnProtoFullTypeName(nested_field.getType()));
auto union_fields = struct_schema.getUnionFields();
std::vector<String> union_field_names;
for (auto nested_field : union_fields)
union_field_names.push_back(String(nested_field.getProto().getName()) + " " + getCapnProtoFullTypeName(nested_field.getType()));
String union_name = "Union(" + boost::algorithm::join(union_field_names, ", ") + ")";
/// Check if the struct is a named union.
if (non_union_field_names.empty())
return union_name;
String type_name = "Struct(" + boost::algorithm::join(non_union_field_names, ", ");
/// Check if the struct contains unnamed union.
if (!union_field_names.empty())
type_name += ", " + union_name;
type_name += ")";
return type_name;
}
case capnp::schema::Type::Which::LIST:
return "List(" + getCapnProtoFullTypeName(type.asList().getElementType()) + ")";
case capnp::schema::Type::Which::ENUM:
{
auto enum_schema = type.asEnum();
String enum_name = "Enum(";
auto enumerants = enum_schema.getEnumerants();
for (unsigned i = 0; i != enumerants.size(); ++i)
{
enum_name += String(enumerants[i].getProto().getName()) + " = " + std::to_string(enumerants[i].getOrdinal());
if (i + 1 != enumerants.size())
enum_name += ", ";
}
enum_name += ")";
return enum_name;
}
default:
auto it = capnp_simple_type_names.find(type.which());
if (it == capnp_simple_type_names.end())
throw Exception(ErrorCodes::BAD_TYPE_OF_FIELD, "Unknown CapnProto type");
return it->second;
}
}
template <typename Type>
static bool checkEnums(const capnp::Type & capnp_type, const DataTypePtr column_type, FormatSettings::EnumComparingMode mode, UInt64 max_value, String & error_message)
{
if (!capnp_type.isEnum())
return false;
auto enum_schema = capnp_type.asEnum();
bool to_lower = mode == FormatSettings::EnumComparingMode::BY_NAMES_CASE_INSENSITIVE;
const auto * enum_type = assert_cast<const DataTypeEnum<Type> *>(column_type.get());
const auto & enum_values = dynamic_cast<const EnumValues<Type> &>(*enum_type);
auto enumerants = enum_schema.getEnumerants();
if (mode == FormatSettings::EnumComparingMode::BY_VALUES)
{
/// In CapnProto Enum fields are numbered sequentially starting from zero.
if (enumerants.size() > max_value)
{
error_message += "Enum from CapnProto schema contains values that is out of range for Clickhouse Enum";
return false;
}
auto values = enum_values.getSetOfAllValues();
std::unordered_set<Type> capn_enum_values;
for (auto enumerant : enumerants)
capn_enum_values.insert(Type(enumerant.getOrdinal()));
auto result = values == capn_enum_values;
if (!result)
error_message += "The set of values in Enum from CapnProto schema is different from the set of values in ClickHouse Enum";
return result;
}
auto names = enum_values.getSetOfAllNames(to_lower);
std::unordered_set<String> capn_enum_names;
for (auto enumerant : enumerants)
{
String name = enumerant.getProto().getName();
capn_enum_names.insert(to_lower ? boost::algorithm::to_lower_copy(name) : name);
}
auto result = names == capn_enum_names;
if (!result)
error_message += "The set of names in Enum from CapnProto schema is different from the set of names in ClickHouse Enum";
return result;
}
static bool checkCapnProtoType(const capnp::Type & capnp_type, const DataTypePtr & data_type, FormatSettings::EnumComparingMode mode, String & error_message, const String & column_name);
static bool checkNullableType(const capnp::Type & capnp_type, const DataTypePtr & data_type, FormatSettings::EnumComparingMode mode, String & error_message, const String & column_name)
{
if (!capnp_type.isStruct())
return false;
/// Check that struct is a named union of type VOID and one arbitrary type.
auto struct_schema = capnp_type.asStruct();
if (!checkIfStructIsNamedUnion(struct_schema))
return false;
auto union_fields = struct_schema.getUnionFields();
if (union_fields.size() != 2)
return false;
auto first = union_fields[0];
auto second = union_fields[1];
auto nested_type = assert_cast<const DataTypeNullable *>(data_type.get())->getNestedType();
if (first.getType().isVoid())
return checkCapnProtoType(second.getType(), nested_type, mode, error_message, column_name);
if (second.getType().isVoid())
return checkCapnProtoType(first.getType(), nested_type, mode, error_message, column_name);
return false;
}
static bool checkTupleType(const capnp::Type & capnp_type, const DataTypePtr & data_type, FormatSettings::EnumComparingMode mode, String & error_message)
{
if (!capnp_type.isStruct())
return false;
auto struct_schema = capnp_type.asStruct();
if (checkIfStructIsNamedUnion(struct_schema))
return false;
if (checkIfStructContainsUnnamedUnion(struct_schema))
{
error_message += "CapnProto struct contains unnamed union";
return false;
}
const auto * tuple_data_type = assert_cast<const DataTypeTuple *>(data_type.get());
auto nested_types = tuple_data_type->getElements();
if (nested_types.size() != struct_schema.getFields().size())
{
error_message += "Tuple and Struct types have different sizes";
return false;
}
bool have_explicit_names = tuple_data_type->haveExplicitNames();
const auto & nested_names = tuple_data_type->getElementNames();
for (uint32_t i = 0; i != nested_names.size(); ++i)
{
if (have_explicit_names)
{
KJ_IF_MAYBE (field, struct_schema.findFieldByName(nested_names[i]))
{
if (!checkCapnProtoType(field->getType(), nested_types[tuple_data_type->getPositionByName(nested_names[i])], mode, error_message, nested_names[i]))
return false;
}
else
{
error_message += "CapnProto struct doesn't contain a field with name " + nested_names[i];
return false;
}
}
else if (!checkCapnProtoType(struct_schema.getFields()[i].getType(), nested_types[tuple_data_type->getPositionByName(nested_names[i])], mode, error_message, nested_names[i]))
return false;
}
return true;
}
static bool checkArrayType(const capnp::Type & capnp_type, const DataTypePtr & data_type, FormatSettings::EnumComparingMode mode, String & error_message, const String & column_name)
{
if (!capnp_type.isList())
return false;
auto list_schema = capnp_type.asList();
auto nested_type = assert_cast<const DataTypeArray *>(data_type.get())->getNestedType();
auto [field_name, nested_name] = splitCapnProtoFieldName(column_name);
if (!nested_name.empty() && list_schema.getElementType().isStruct())
{
auto struct_schema = list_schema.getElementType().asStruct();
KJ_IF_MAYBE(field, struct_schema.findFieldByName(nested_name))
return checkCapnProtoType(field->getType(), nested_type, mode, error_message, nested_name);
error_message += "Element type of List {} doesn't contain field with name " + nested_name;
return false;
}
return checkCapnProtoType(list_schema.getElementType(), nested_type, mode, error_message, column_name);
}
static bool checkMapType(const capnp::Type & capnp_type, const DataTypePtr & data_type, FormatSettings::EnumComparingMode mode, String & error_message)
{
/// We output/input Map type as follow CapnProto schema
///
/// struct Map {
/// struct Entry {
/// key @0: Key;
/// value @1: Value;
/// }
/// entries @0 :List(Entry);
/// }
if (!capnp_type.isStruct())
return false;
auto struct_schema = capnp_type.asStruct();
if (checkIfStructContainsUnnamedUnion(struct_schema))
{
error_message += "CapnProto struct contains unnamed union";
return false;
}
if (struct_schema.getFields().size() != 1)
{
error_message += "CapnProto struct that represents Map type can contain only one field";
return false;
}
const auto & field_type = struct_schema.getFields()[0].getType();
if (!field_type.isList())
{
error_message += "Field of CapnProto struct that represents Map is not a list";
return false;
}
auto list_element_type = field_type.asList().getElementType();
if (!list_element_type.isStruct())
{
error_message += "Field of CapnProto struct that represents Map is not a list of structs";
return false;
}
auto key_value_struct = list_element_type.asStruct();
if (checkIfStructContainsUnnamedUnion(key_value_struct))
{
error_message += "CapnProto struct contains unnamed union";
return false;
}
if (key_value_struct.getFields().size() != 2)
{
error_message += "Key-value structure for Map struct should have exactly 2 fields";
return false;
}
const auto & map_type = assert_cast<const DataTypeMap &>(*data_type);
DataTypes types = {map_type.getKeyType(), map_type.getValueType()};
Names names = {"key", "value"};
for (size_t i = 0; i != types.size(); ++i)
{
KJ_IF_MAYBE(field, key_value_struct.findFieldByName(names[i]))
{
if (!checkCapnProtoType(field->getType(), types[i], mode, error_message, names[i]))
return false;
}
else
{
error_message += R"(Key-value structure for Map struct should have exactly 2 fields with names "key" and "value")";
return false;
}
}
return true;
}
static bool isCapnInteger(const capnp::Type & capnp_type)
{
return capnp_type.isInt8() || capnp_type.isUInt8() || capnp_type.isInt16() || capnp_type.isUInt16() || capnp_type.isInt32()
|| capnp_type.isUInt32() || capnp_type.isInt64() || capnp_type.isUInt64();
}
static bool checkCapnProtoType(const capnp::Type & capnp_type, const DataTypePtr & data_type, FormatSettings::EnumComparingMode mode, String & error_message, const String & column_name)
{
switch (data_type->getTypeId())
{
case TypeIndex::UInt8:
return capnp_type.isBool() || isCapnInteger(capnp_type);
case TypeIndex::Int8: [[fallthrough]];
case TypeIndex::Int16: [[fallthrough]];
case TypeIndex::UInt16: [[fallthrough]];
case TypeIndex::Int32: [[fallthrough]];
case TypeIndex::UInt32: [[fallthrough]];
case TypeIndex::Int64: [[fallthrough]];
case TypeIndex::UInt64:
/// Allow integer conversions durin input/output.
return isCapnInteger(capnp_type);
case TypeIndex::Date:
return capnp_type.isUInt16();
case TypeIndex::DateTime: [[fallthrough]];
case TypeIndex::IPv4:
return capnp_type.isUInt32();
case TypeIndex::Date32: [[fallthrough]];
case TypeIndex::Decimal32:
return capnp_type.isInt32() || capnp_type.isUInt32();
case TypeIndex::DateTime64: [[fallthrough]];
case TypeIndex::Decimal64:
return capnp_type.isInt64() || capnp_type.isUInt64();
case TypeIndex::Float32:[[fallthrough]];
case TypeIndex::Float64:
/// Allow converting between Float32 and isFloat64
return capnp_type.isFloat32() || capnp_type.isFloat64();
case TypeIndex::Enum8:
return checkEnums<Int8>(capnp_type, data_type, mode, INT8_MAX, error_message);
case TypeIndex::Enum16:
return checkEnums<Int16>(capnp_type, data_type, mode, INT16_MAX, error_message);
case TypeIndex::Int128: [[fallthrough]];
case TypeIndex::UInt128: [[fallthrough]];
case TypeIndex::Int256: [[fallthrough]];
case TypeIndex::UInt256: [[fallthrough]];
case TypeIndex::Decimal128: [[fallthrough]];
case TypeIndex::Decimal256:
return capnp_type.isData();
case TypeIndex::Tuple:
return checkTupleType(capnp_type, data_type, mode, error_message);
case TypeIndex::Nullable:
{
auto result = checkNullableType(capnp_type, data_type, mode, error_message, column_name);
if (!result)
error_message += "Nullable can be represented only as a named union of type Void and nested type";
return result;
}
case TypeIndex::Array:
return checkArrayType(capnp_type, data_type, mode, error_message, column_name);
case TypeIndex::LowCardinality:
return checkCapnProtoType(capnp_type, assert_cast<const DataTypeLowCardinality *>(data_type.get())->getDictionaryType(), mode, error_message, column_name);
case TypeIndex::FixedString: [[fallthrough]];
case TypeIndex::IPv6: [[fallthrough]];
case TypeIndex::String:
return capnp_type.isText() || capnp_type.isData();
case TypeIndex::Map:
return checkMapType(capnp_type, data_type, mode, error_message);
default:
return false;
}
}
capnp::DynamicValue::Reader getReaderByColumnName(const capnp::DynamicStruct::Reader & struct_reader, const String & name)
{
auto [field_name, nested_name] = splitCapnProtoFieldName(name);
KJ_IF_MAYBE(field, struct_reader.getSchema().findFieldByName(field_name))
{
capnp::DynamicValue::Reader field_reader;
try
{
field_reader = struct_reader.get(*field);
}
catch (const kj::Exception & e)
{
throw Exception(ErrorCodes::INCORRECT_DATA,
"Cannot extract field value from struct by provided schema, error: "
"{} Perhaps the data was generated by another schema", String(e.getDescription().cStr()));
}
if (nested_name.empty())
return field_reader;
/// Support reading Nested as List of Structs.
if (field_reader.getType() == capnp::DynamicValue::LIST)
{
auto list_schema = field->getType().asList();
if (!list_schema.getElementType().isStruct())
throw Exception(ErrorCodes::CAPN_PROTO_BAD_CAST, "Element type of List {} is not a struct", field_name);
auto struct_schema = list_schema.getElementType().asStruct();
KJ_IF_MAYBE(nested_field, struct_schema.findFieldByName(nested_name))
return field_reader;
throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "Element type of List {} doesn't contain field with name \"{}\"", field_name, nested_name);
}
if (field_reader.getType() != capnp::DynamicValue::STRUCT)
throw Exception(ErrorCodes::CAPN_PROTO_BAD_CAST, "Field {} is not a struct", field_name);
return getReaderByColumnName(field_reader.as<capnp::DynamicStruct>(), nested_name);
}
throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "Capnproto struct doesn't contain field with name {}", field_name);
}
std::pair<capnp::DynamicStruct::Builder, capnp::StructSchema::Field> getStructBuilderAndFieldByColumnName(capnp::DynamicStruct::Builder struct_builder, const String & name)
{
auto [field_name, nested_name] = splitCapnProtoFieldName(name);
KJ_IF_MAYBE(field, struct_builder.getSchema().findFieldByName(field_name))
{
if (nested_name.empty())
return {struct_builder, *field};
auto field_builder = struct_builder.get(*field);
/// Support reading Nested as List of Structs.
if (field_builder.getType() == capnp::DynamicValue::LIST)
{
auto list_schema = field->getType().asList();
if (!list_schema.getElementType().isStruct())
throw Exception(ErrorCodes::CAPN_PROTO_BAD_CAST, "Element type of List {} is not a struct", field_name);
auto struct_schema = list_schema.getElementType().asStruct();
KJ_IF_MAYBE(nested_field, struct_schema.findFieldByName(nested_name))
return {struct_builder, *field};
throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "Element type of List {} doesn't contain field with name \"{}\"", field_name, nested_name);
}
if (field_builder.getType() != capnp::DynamicValue::STRUCT)
throw Exception(ErrorCodes::CAPN_PROTO_BAD_CAST, "Field {} is not a struct", field_name);
return getStructBuilderAndFieldByColumnName(field_builder.as<capnp::DynamicStruct>(), nested_name);
}
throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "Capnproto struct doesn't contain field with name {}", field_name);
}
static std::pair<capnp::StructSchema::Field, String> getFieldByName(const capnp::StructSchema & schema, const String & name)
{
auto [field_name, nested_name] = splitCapnProtoFieldName(name);
KJ_IF_MAYBE(field, schema.findFieldByName(field_name))
{
if (nested_name.empty())
return {*field, name};
/// Support reading Nested as List of Structs.
if (field->getType().isList())
{
auto list_schema = field->getType().asList();
if (!list_schema.getElementType().isStruct())
throw Exception(ErrorCodes::CAPN_PROTO_BAD_CAST, "Element type of List {} is not a struct", field_name);
auto struct_schema = list_schema.getElementType().asStruct();
KJ_IF_MAYBE(nested_field, struct_schema.findFieldByName(nested_name))
return {*field, name};
throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "Element type of List {} doesn't contain field with name \"{}\"", field_name, nested_name);
}
if (!field->getType().isStruct())
throw Exception(ErrorCodes::CAPN_PROTO_BAD_CAST, "Field {} is not a struct", field_name);
return getFieldByName(field->getType().asStruct(), nested_name);
}
throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "Capnproto schema doesn't contain field with name {}", field_name);
}
void checkCapnProtoSchemaStructure(const capnp::StructSchema & schema, const Block & header, FormatSettings::EnumComparingMode mode)
{
/// Firstly check that struct doesn't contain unnamed union, because we don't support it.
if (checkIfStructContainsUnnamedUnion(schema))
throw Exception(ErrorCodes::CAPN_PROTO_BAD_CAST, "Schema contains unnamed union that is not supported");
auto names_and_types = header.getNamesAndTypesList();
String additional_error_message;
for (auto & [name, type] : names_and_types)
{
auto [field, field_name] = getFieldByName(schema, name);
if (!checkCapnProtoType(field.getType(), type, mode, additional_error_message, field_name))
{
auto e = Exception(
ErrorCodes::CAPN_PROTO_BAD_CAST,
"Cannot convert ClickHouse type {} to CapnProto type {}",
type->getName(),
getCapnProtoFullTypeName(field.getType()));
if (!additional_error_message.empty())
e.addMessage(additional_error_message);
throw std::move(e);
}
}
}
template <typename ValueType>
static DataTypePtr getEnumDataTypeFromEnumerants(const capnp::EnumSchema::EnumerantList & enumerants)
{
std::vector<std::pair<String, ValueType>> values;
for (auto enumerant : enumerants)
values.emplace_back(enumerant.getProto().getName(), ValueType(enumerant.getOrdinal()));
return std::make_shared<DataTypeEnum<ValueType>>(std::move(values));
}
static DataTypePtr getEnumDataTypeFromEnumSchema(const capnp::EnumSchema & enum_schema)
{
auto enumerants = enum_schema.getEnumerants();
if (enumerants.size() < 128)
return getEnumDataTypeFromEnumerants<Int8>(enumerants);
if (enumerants.size() < 32768)
return getEnumDataTypeFromEnumerants<Int16>(enumerants);
throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "ClickHouse supports only 8 and 16-bit Enums");
}
static DataTypePtr getDataTypeFromCapnProtoType(const capnp::Type & capnp_type, bool skip_unsupported_fields)
{
switch (capnp_type.which())
{
case capnp::schema::Type::INT8:
return std::make_shared<DataTypeInt8>();
case capnp::schema::Type::INT16:
return std::make_shared<DataTypeInt16>();
case capnp::schema::Type::INT32:
return std::make_shared<DataTypeInt32>();
case capnp::schema::Type::INT64:
return std::make_shared<DataTypeInt64>();
case capnp::schema::Type::BOOL: [[fallthrough]];
case capnp::schema::Type::UINT8:
return std::make_shared<DataTypeUInt8>();
case capnp::schema::Type::UINT16:
return std::make_shared<DataTypeUInt16>();
case capnp::schema::Type::UINT32:
return std::make_shared<DataTypeUInt32>();
case capnp::schema::Type::UINT64:
return std::make_shared<DataTypeUInt64>();
case capnp::schema::Type::FLOAT32:
return std::make_shared<DataTypeFloat32>();
case capnp::schema::Type::FLOAT64:
return std::make_shared<DataTypeFloat64>();
case capnp::schema::Type::DATA: [[fallthrough]];
case capnp::schema::Type::TEXT:
return std::make_shared<DataTypeString>();
case capnp::schema::Type::ENUM:
return getEnumDataTypeFromEnumSchema(capnp_type.asEnum());
case capnp::schema::Type::LIST:
{
auto list_schema = capnp_type.asList();
auto nested_type = getDataTypeFromCapnProtoType(list_schema.getElementType(), skip_unsupported_fields);
if (!nested_type)
return nullptr;
return std::make_shared<DataTypeArray>(nested_type);
}
case capnp::schema::Type::STRUCT:
{
auto struct_schema = capnp_type.asStruct();
if (struct_schema.getFields().size() == 0)
{
if (skip_unsupported_fields)
return nullptr;
throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Empty messages are not supported");
}
/// Check if it can be Nullable.
if (checkIfStructIsNamedUnion(struct_schema))
{
auto fields = struct_schema.getUnionFields();
if (fields.size() != 2 || (!fields[0].getType().isVoid() && !fields[1].getType().isVoid()))
{
if (skip_unsupported_fields)
return nullptr;
throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Unions are not supported");
}
auto value_type = fields[0].getType().isVoid() ? fields[1].getType() : fields[0].getType();
if (value_type.isStruct() || value_type.isList())
{
if (skip_unsupported_fields)
return nullptr;
throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Tuples and Lists cannot be inside Nullable");
}
auto nested_type = getDataTypeFromCapnProtoType(value_type, skip_unsupported_fields);
if (!nested_type)
return nullptr;
return std::make_shared<DataTypeNullable>(nested_type);
}
if (checkIfStructContainsUnnamedUnion(struct_schema))
throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Unnamed union is not supported");
/// Treat Struct as Tuple.
DataTypes nested_types;
Names nested_names;
for (auto field : struct_schema.getNonUnionFields())
{
auto nested_type = getDataTypeFromCapnProtoType(field.getType(), skip_unsupported_fields);
if (!nested_type)
continue;
nested_names.push_back(field.getProto().getName());
nested_types.push_back(nested_type);
}
if (nested_types.empty())
return nullptr;
return std::make_shared<DataTypeTuple>(std::move(nested_types), std::move(nested_names));
}
default:
{
if (skip_unsupported_fields)
return nullptr;
throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Unsupported CapnProtoType: {}", getCapnProtoFullTypeName(capnp_type));
}
}
}
NamesAndTypesList capnProtoSchemaToCHSchema(const capnp::StructSchema & schema, bool skip_unsupported_fields)
{
if (checkIfStructContainsUnnamedUnion(schema))
throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Unnamed union is not supported");
NamesAndTypesList names_and_types;
for (auto field : schema.getNonUnionFields())
{
auto name = field.getProto().getName();
auto type = getDataTypeFromCapnProtoType(field.getType(), skip_unsupported_fields);
if (type)
names_and_types.emplace_back(name, type);
}
if (names_and_types.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Cannot convert CapnProto schema to ClickHouse table schema, all fields have unsupported types");
return names_and_types;
}
}
#endif

View File

@ -327,16 +327,16 @@ struct FormatSettings
/// For capnProto format we should determine how to
/// compare ClickHouse Enum and Enum from schema.
enum class EnumComparingMode
enum class CapnProtoEnumComparingMode
{
BY_NAMES, // Names in enums should be the same, values can be different.
BY_NAMES_CASE_INSENSITIVE, // Case-insensitive name comparison.
BY_VALUES, // Values should be the same, names can be different.
};
struct
struct CapnProto
{
EnumComparingMode enum_comparing_mode = EnumComparingMode::BY_VALUES;
CapnProtoEnumComparingMode enum_comparing_mode = CapnProtoEnumComparingMode::BY_VALUES;
bool skip_fields_with_unsupported_types_in_schema_inference = false;
} capn_proto;

View File

@ -9,42 +9,22 @@
#include <capnp/dynamic.h>
#include <capnp/common.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnLowCardinality.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnMap.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeMap.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int INCORRECT_DATA;
}
CapnProtoRowInputFormat::CapnProtoRowInputFormat(ReadBuffer & in_, Block header, Params params_, const FormatSchemaInfo & info, const FormatSettings & format_settings_)
: IRowInputFormat(std::move(header), in_, std::move(params_))
CapnProtoRowInputFormat::CapnProtoRowInputFormat(ReadBuffer & in_, Block header_, Params params_, const FormatSchemaInfo & info, const FormatSettings & format_settings)
: IRowInputFormat(std::move(header_), in_, std::move(params_))
, parser(std::make_shared<CapnProtoSchemaParser>())
, format_settings(format_settings_)
, column_types(getPort().getHeader().getDataTypes())
, column_names(getPort().getHeader().getNames())
{
// Parse the schema and fetch the root object
root = parser->getMessageSchema(info);
checkCapnProtoSchemaStructure(root, getPort().getHeader(), format_settings.capn_proto.enum_comparing_mode);
schema = parser->getMessageSchema(info);
const auto & header = getPort().getHeader();
serializer = std::make_unique<CapnProtoSerializer>(header.getDataTypes(), header.getNames(), schema, format_settings.capn_proto);
}
kj::Array<capnp::word> CapnProtoRowInputFormat::readMessage()
@ -82,213 +62,6 @@ kj::Array<capnp::word> CapnProtoRowInputFormat::readMessage()
return msg;
}
static void insertInteger(IColumn & column, const DataTypePtr & column_type, UInt64 value)
{
switch (column_type->getTypeId())
{
case TypeIndex::Int8:
assert_cast<ColumnInt8 &>(column).insertValue(value);
break;
case TypeIndex::UInt8:
assert_cast<ColumnUInt8 &>(column).insertValue(value);
break;
case TypeIndex::Int16:
assert_cast<ColumnInt16 &>(column).insertValue(value);
break;
case TypeIndex::Date: [[fallthrough]];
case TypeIndex::UInt16:
assert_cast<ColumnUInt16 &>(column).insertValue(value);
break;
case TypeIndex::Int32:
assert_cast<ColumnInt32 &>(column).insertValue(static_cast<Int32>(value));
break;
case TypeIndex::DateTime: [[fallthrough]];
case TypeIndex::UInt32:
assert_cast<ColumnUInt32 &>(column).insertValue(static_cast<UInt32>(value));
break;
case TypeIndex::IPv4:
assert_cast<ColumnIPv4 &>(column).insertValue(IPv4(static_cast<UInt32>(value)));
break;
case TypeIndex::Int64:
assert_cast<ColumnInt64 &>(column).insertValue(value);
break;
case TypeIndex::UInt64:
assert_cast<ColumnUInt64 &>(column).insertValue(value);
break;
case TypeIndex::DateTime64:
assert_cast<ColumnDecimal<DateTime64> &>(column).insertValue(value);
break;
case TypeIndex::Decimal32:
assert_cast<ColumnDecimal<Decimal32> &>(column).insertValue(static_cast<Int32>(value));
break;
case TypeIndex::Decimal64:
assert_cast<ColumnDecimal<Decimal64> &>(column).insertValue(value);
break;
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Column type {} cannot be parsed from integer", column_type->getName());
}
}
static void insertFloat(IColumn & column, const DataTypePtr & column_type, Float64 value)
{
switch (column_type->getTypeId())
{
case TypeIndex::Float32:
assert_cast<ColumnFloat32 &>(column).insertValue(static_cast<Float32>(value));
break;
case TypeIndex::Float64:
assert_cast<ColumnFloat64 &>(column).insertValue(value);
break;
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Column type is not a float.");
}
}
template <typename Value>
static void insertData(IColumn & column, const DataTypePtr & column_type, Value value)
{
if (column_type->haveMaximumSizeOfValue() && value.size() != column_type->getSizeOfValueInMemory())
throw Exception(ErrorCodes::INCORRECT_DATA, "Unexpected size of {} value: {}", column_type->getName(), value.size());
column.insertData(reinterpret_cast<const char *>(value.begin()), value.size());
}
template <typename ValueType>
static void insertEnum(IColumn & column, const DataTypePtr & column_type, const capnp::DynamicEnum & enum_value, FormatSettings::EnumComparingMode enum_comparing_mode)
{
auto enumerant = *kj::_::readMaybe(enum_value.getEnumerant());
auto enum_type = assert_cast<const DataTypeEnum<ValueType> *>(column_type.get());
DataTypePtr nested_type = std::make_shared<DataTypeNumber<ValueType>>();
switch (enum_comparing_mode)
{
case FormatSettings::EnumComparingMode::BY_VALUES:
insertInteger(column, nested_type, Int64(enumerant.getOrdinal()));
return;
case FormatSettings::EnumComparingMode::BY_NAMES:
insertInteger(column, nested_type, Int64(enum_type->getValue(String(enumerant.getProto().getName()))));
return;
case FormatSettings::EnumComparingMode::BY_NAMES_CASE_INSENSITIVE:
{
/// Find the same enum name case insensitive.
String enum_name = enumerant.getProto().getName();
for (auto & name : enum_type->getAllRegisteredNames())
{
if (compareEnumNames(name, enum_name, enum_comparing_mode))
{
insertInteger(column, nested_type, Int64(enum_type->getValue(name)));
break;
}
}
}
}
}
static void insertValue(IColumn & column, const DataTypePtr & column_type, const String & column_name, const capnp::DynamicValue::Reader & value, FormatSettings::EnumComparingMode enum_comparing_mode)
{
if (column_type->lowCardinality())
{
auto & lc_column = assert_cast<ColumnLowCardinality &>(column);
auto tmp_column = lc_column.getDictionary().getNestedColumn()->cloneEmpty();
auto dict_type = assert_cast<const DataTypeLowCardinality *>(column_type.get())->getDictionaryType();
insertValue(*tmp_column, dict_type, column_name, value, enum_comparing_mode);
lc_column.insertFromFullColumn(*tmp_column, 0);
return;
}
switch (value.getType())
{
case capnp::DynamicValue::Type::INT:
insertInteger(column, column_type, value.as<Int64>());
break;
case capnp::DynamicValue::Type::UINT:
insertInteger(column, column_type, value.as<UInt64>());
break;
case capnp::DynamicValue::Type::FLOAT:
insertFloat(column, column_type, value.as<Float64>());
break;
case capnp::DynamicValue::Type::BOOL:
insertInteger(column, column_type, UInt64(value.as<bool>()));
break;
case capnp::DynamicValue::Type::DATA:
insertData(column, column_type, value.as<capnp::Data>());
break;
case capnp::DynamicValue::Type::TEXT:
insertData(column, column_type, value.as<capnp::Text>());
break;
case capnp::DynamicValue::Type::ENUM:
if (column_type->getTypeId() == TypeIndex::Enum8)
insertEnum<Int8>(column, column_type, value.as<capnp::DynamicEnum>(), enum_comparing_mode);
else
insertEnum<Int16>(column, column_type, value.as<capnp::DynamicEnum>(), enum_comparing_mode);
break;
case capnp::DynamicValue::LIST:
{
auto list_value = value.as<capnp::DynamicList>();
auto & column_array = assert_cast<ColumnArray &>(column);
auto & offsets = column_array.getOffsets();
offsets.push_back(offsets.back() + list_value.size());
auto & nested_column = column_array.getData();
auto nested_type = assert_cast<const DataTypeArray *>(column_type.get())->getNestedType();
for (const auto & nested_value : list_value)
insertValue(nested_column, nested_type, column_name, nested_value, enum_comparing_mode);
break;
}
case capnp::DynamicValue::Type::STRUCT:
{
auto struct_value = value.as<capnp::DynamicStruct>();
if (column_type->isNullable())
{
auto & nullable_column = assert_cast<ColumnNullable &>(column);
auto field = *kj::_::readMaybe(struct_value.which());
if (field.getType().isVoid())
nullable_column.insertDefault();
else
{
auto & nested_column = nullable_column.getNestedColumn();
auto nested_type = assert_cast<const DataTypeNullable *>(column_type.get())->getNestedType();
auto nested_value = struct_value.get(field);
insertValue(nested_column, nested_type, column_name, nested_value, enum_comparing_mode);
nullable_column.getNullMapData().push_back(0);
}
}
else if (isTuple(column_type))
{
auto & tuple_column = assert_cast<ColumnTuple &>(column);
const auto * tuple_type = assert_cast<const DataTypeTuple *>(column_type.get());
bool have_explicit_names = tuple_type->haveExplicitNames();
auto struct_schema = struct_value.getSchema();
for (uint32_t i = 0; i != tuple_column.tupleSize(); ++i)
insertValue(
tuple_column.getColumn(i),
tuple_type->getElements()[i],
tuple_type->getElementNames()[i],
struct_value.get(have_explicit_names ? struct_schema.getFieldByName(tuple_type->getElementNames()[i]) : struct_schema.getFields()[i]),
enum_comparing_mode);
}
else if (isMap(column_type))
{
const auto & map_type = assert_cast<const DataTypeMap &>(*column_type);
DataTypes key_value_types = {map_type.getKeyType(), map_type.getValueType()};
Names key_value_names = {"key", "value"};
auto entries_type = std::make_shared<DataTypeArray>(std::make_shared<DataTypeTuple>(key_value_types, key_value_names));
auto & entries_column = assert_cast<ColumnMap &>(column).getNestedColumn();
auto entries_field = struct_value.getSchema().getFields()[0];
insertValue(entries_column, entries_type, column_name, struct_value.get(entries_field), enum_comparing_mode);
}
else
{
/// It can be nested column from Nested type.
auto [field_name, nested_name] = splitCapnProtoFieldName(column_name);
insertValue(column, column_type, nested_name, struct_value.get(nested_name), enum_comparing_mode);
}
break;
}
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected CapnProto value type.");
}
}
bool CapnProtoRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
{
if (in->eof())
@ -298,12 +71,8 @@ bool CapnProtoRowInputFormat::readRow(MutableColumns & columns, RowReadExtension
{
auto array = readMessage();
capnp::FlatArrayMessageReader msg(array);
auto root_reader = msg.getRoot<capnp::DynamicStruct>(root);
for (size_t i = 0; i != columns.size(); ++i)
{
auto value = getReaderByColumnName(root_reader, column_names[i]);
insertValue(*columns[i], column_types[i], column_names[i], value, format_settings.capn_proto.enum_comparing_mode);
}
auto root_reader = msg.getRoot<capnp::DynamicStruct>(schema);
serializer->readRow(columns, root_reader);
}
catch (const kj::Exception & e)
{
@ -343,7 +112,14 @@ void registerInputFormatCapnProto(FormatFactory & factory)
factory.markFormatSupportsSubsetOfColumns("CapnProto");
factory.registerFileExtension("capnp", "CapnProto");
factory.registerAdditionalInfoForSchemaCacheGetter(
"CapnProto", [](const FormatSettings & settings) { return fmt::format("format_schema={}", settings.schema.format_schema); });
"CapnProto",
[](const FormatSettings & settings)
{
return fmt::format(
"format_schema={}, skip_fields_with_unsupported_types_in_schema_inference={}",
settings.schema.format_schema,
settings.capn_proto.skip_fields_with_unsupported_types_in_schema_inference);
});
}
void registerCapnProtoSchemaReader(FormatFactory & factory)

View File

@ -4,7 +4,8 @@
#if USE_CAPNP
#include <Core/Block.h>
#include <Formats/CapnProtoUtils.h>
#include <Formats/CapnProtoSchema.h>
#include <Formats/CapnProtoSerializer.h>
#include <Processors/Formats/IRowInputFormat.h>
#include <Processors/Formats/ISchemaReader.h>
@ -33,10 +34,8 @@ private:
kj::Array<capnp::word> readMessage();
std::shared_ptr<CapnProtoSchemaParser> parser;
capnp::StructSchema root;
const FormatSettings format_settings;
DataTypes column_types;
Names column_names;
capnp::StructSchema schema;
std::unique_ptr<CapnProtoSerializer> serializer;
};
class CapnProtoSchemaReader : public IExternalSchemaReader

View File

@ -1,37 +1,16 @@
#include <Processors/Formats/Impl/CapnProtoRowOutputFormat.h>
#if USE_CAPNP
#include <Formats/CapnProtoUtils.h>
#include <Formats/CapnProtoSchema.h>
#include <Formats/FormatSettings.h>
#include <Formats/CapnProtoSerializer.h>
#include <IO/WriteBuffer.h>
#include <capnp/dynamic.h>
#include <capnp/serialize-packed.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnLowCardinality.h>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnMap.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeMap.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
CapnProtoOutputStream::CapnProtoOutputStream(WriteBuffer & out_) : out(out_)
{
}
@ -45,252 +24,25 @@ CapnProtoRowOutputFormat::CapnProtoRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const FormatSchemaInfo & info,
const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_), column_names(header_.getNames()), column_types(header_.getDataTypes()), output_stream(std::make_unique<CapnProtoOutputStream>(out_)), format_settings(format_settings_)
const FormatSettings & format_settings)
: IRowOutputFormat(header_, out_)
, column_names(header_.getNames())
, column_types(header_.getDataTypes())
, output_stream(std::make_unique<CapnProtoOutputStream>(out_))
{
schema = schema_parser.getMessageSchema(info);
checkCapnProtoSchemaStructure(schema, getPort(PortKind::Main).getHeader(), format_settings.capn_proto.enum_comparing_mode);
}
template <typename EnumValue>
static capnp::DynamicEnum getDynamicEnum(
const ColumnPtr & column,
const DataTypePtr & data_type,
size_t row_num,
const capnp::EnumSchema & enum_schema,
FormatSettings::EnumComparingMode mode)
{
const auto * enum_data_type = assert_cast<const DataTypeEnum<EnumValue> *>(data_type.get());
EnumValue enum_value = column->getInt(row_num);
if (mode == FormatSettings::EnumComparingMode::BY_VALUES)
return capnp::DynamicEnum(enum_schema, enum_value);
auto enum_name = enum_data_type->getNameForValue(enum_value);
for (const auto enumerant : enum_schema.getEnumerants())
{
if (compareEnumNames(String(enum_name), enumerant.getProto().getName(), mode))
return capnp::DynamicEnum(enumerant);
}
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot convert CLickHouse Enum value to CapnProto Enum");
}
static capnp::DynamicValue::Builder initStructFieldBuilder(const ColumnPtr & column, size_t row_num, capnp::DynamicStruct::Builder & struct_builder, capnp::StructSchema::Field field)
{
if (const auto * array_column = checkAndGetColumn<ColumnArray>(*column))
{
size_t size = array_column->getOffsets()[row_num] - array_column->getOffsets()[row_num - 1];
return struct_builder.init(field, static_cast<unsigned>(size));
}
if (field.getType().isStruct())
return struct_builder.init(field);
return struct_builder.get(field);
}
static std::optional<capnp::DynamicValue::Reader> convertToDynamicValue(
const ColumnPtr & column,
const DataTypePtr & data_type,
size_t row_num,
const String & column_name,
capnp::DynamicValue::Builder builder,
FormatSettings::EnumComparingMode enum_comparing_mode,
std::vector<std::unique_ptr<String>> & temporary_text_data_storage)
{
/// Here we don't do any types validation, because we did it in CapnProtoRowOutputFormat constructor.
if (data_type->lowCardinality())
{
const auto * lc_column = assert_cast<const ColumnLowCardinality *>(column.get());
const auto & dict_type = assert_cast<const DataTypeLowCardinality *>(data_type.get())->getDictionaryType();
size_t index = lc_column->getIndexAt(row_num);
return convertToDynamicValue(lc_column->getDictionary().getNestedColumn(), dict_type, index, column_name, builder, enum_comparing_mode, temporary_text_data_storage);
}
switch (builder.getType())
{
case capnp::DynamicValue::Type::INT:
return capnp::DynamicValue::Reader(column->getInt(row_num));
case capnp::DynamicValue::Type::UINT:
{
/// IPv4 column doesn't support getUInt method.
if (isIPv4(data_type))
return capnp::DynamicValue::Reader(assert_cast<const ColumnIPv4 *>(column.get())->getElement(row_num));
return capnp::DynamicValue::Reader(column->getUInt(row_num));
}
case capnp::DynamicValue::Type::BOOL:
return capnp::DynamicValue::Reader(column->getBool(row_num));
case capnp::DynamicValue::Type::FLOAT:
return capnp::DynamicValue::Reader(column->getFloat64(row_num));
case capnp::DynamicValue::Type::ENUM:
{
auto enum_schema = builder.as<capnp::DynamicEnum>().getSchema();
if (data_type->getTypeId() == TypeIndex::Enum8)
return capnp::DynamicValue::Reader(
getDynamicEnum<Int8>(column, data_type, row_num, enum_schema, enum_comparing_mode));
return capnp::DynamicValue::Reader(
getDynamicEnum<Int16>(column, data_type, row_num, enum_schema, enum_comparing_mode));
}
case capnp::DynamicValue::Type::DATA:
{
auto data = column->getDataAt(row_num);
return capnp::DynamicValue::Reader(capnp::Data::Reader(reinterpret_cast<const kj::byte *>(data.data), data.size));
}
case capnp::DynamicValue::Type::TEXT:
{
/// In TEXT type data should be null-terminated, but ClickHouse String data could not be.
/// To make data null-terminated we should copy it to temporary String object, but
/// capnp::Text::Reader works only with pointer to the data and it's size, so we should
/// guarantee that new String object life time is longer than capnp::Text::Reader life time.
/// To do this we store new String object in a temporary storage, passed in this function
/// by reference. We use unique_ptr<String> instead of just String to avoid pointers
/// invalidation on vector reallocation.
temporary_text_data_storage.push_back(std::make_unique<String>(column->getDataAt(row_num)));
auto & data = temporary_text_data_storage.back();
return capnp::DynamicValue::Reader(capnp::Text::Reader(data->data(), data->size()));
}
case capnp::DynamicValue::Type::STRUCT:
{
auto struct_builder = builder.as<capnp::DynamicStruct>();
auto nested_struct_schema = struct_builder.getSchema();
/// Struct can represent Tuple, Nullable (named union with two fields) or single column when it contains one nested column.
if (data_type->isNullable())
{
const auto * nullable_type = assert_cast<const DataTypeNullable *>(data_type.get());
const auto * nullable_column = assert_cast<const ColumnNullable *>(column.get());
auto fields = nested_struct_schema.getUnionFields();
if (nullable_column->isNullAt(row_num))
{
auto null_field = fields[0].getType().isVoid() ? fields[0] : fields[1];
struct_builder.set(null_field, capnp::Void());
}
else
{
auto value_field = fields[0].getType().isVoid() ? fields[1] : fields[0];
struct_builder.clear(value_field);
const auto & nested_column = nullable_column->getNestedColumnPtr();
auto value_builder = initStructFieldBuilder(nested_column, row_num, struct_builder, value_field);
auto value = convertToDynamicValue(nested_column, nullable_type->getNestedType(), row_num, column_name, value_builder, enum_comparing_mode, temporary_text_data_storage);
if (value)
struct_builder.set(value_field, *value);
}
}
else if (isTuple(data_type))
{
const auto * tuple_data_type = assert_cast<const DataTypeTuple *>(data_type.get());
const auto & nested_types = tuple_data_type->getElements();
const auto & nested_names = tuple_data_type->getElementNames();
const auto & nested_columns = assert_cast<const ColumnTuple *>(column.get())->getColumns();
bool have_explicit_names = tuple_data_type->haveExplicitNames();
for (uint32_t i = 0; i != nested_names.size(); ++i)
{
capnp::StructSchema::Field nested_field = have_explicit_names ? nested_struct_schema.getFieldByName(nested_names[i]) : nested_struct_schema.getFields()[i];
auto field_builder = initStructFieldBuilder(nested_columns[i], row_num, struct_builder, nested_field);
auto value = convertToDynamicValue(nested_columns[i], nested_types[i], row_num, nested_names[i], field_builder, enum_comparing_mode, temporary_text_data_storage);
if (value)
struct_builder.set(nested_field, *value);
}
}
else if (isMap(data_type))
{
/// We output Map type as follow CapnProto schema
///
/// struct Map {
/// struct Entry {
/// key @0: Key;
/// value @1: Value;
/// }
/// entries @0 :List(Entry);
/// }
///
/// And we don't need to check that struct have this form here because we checked it before.
const auto & map_type = assert_cast<const DataTypeMap &>(*data_type);
DataTypes key_value_types = {map_type.getKeyType(), map_type.getValueType()};
Names key_value_names = {"key", "value"};
auto entries_type = std::make_shared<DataTypeArray>(std::make_shared<DataTypeTuple>(key_value_types, key_value_names));
/// Nested column in Map is actually Array(Tuple), so we can output it according to "entries" field schema.
const auto & entries_column = assert_cast<const ColumnMap *>(column.get())->getNestedColumnPtr();
auto entries_field = nested_struct_schema.getFields()[0];
auto field_builder = initStructFieldBuilder(entries_column, row_num, struct_builder, entries_field);
auto entries_value = convertToDynamicValue(entries_column, entries_type, row_num, column_name, field_builder, enum_comparing_mode, temporary_text_data_storage);
if (entries_value)
struct_builder.set(entries_field, *entries_value);
}
else
{
/// It can be nested column from Nested type.
auto [field_name, nested_name] = splitCapnProtoFieldName(column_name);
auto nested_field = nested_struct_schema.getFieldByName(nested_name);
auto field_builder = initStructFieldBuilder(column, row_num, struct_builder, nested_field);
auto value = convertToDynamicValue(column, data_type, row_num, nested_name, field_builder, enum_comparing_mode, temporary_text_data_storage);
if (value)
struct_builder.set(nested_field, *value);
}
return std::nullopt;
}
case capnp::DynamicValue::Type::LIST:
{
auto list_builder = builder.as<capnp::DynamicList>();
const auto * array_column = assert_cast<const ColumnArray *>(column.get());
const auto & nested_column = array_column->getDataPtr();
const auto & nested_type = assert_cast<const DataTypeArray *>(data_type.get())->getNestedType();
const auto & offsets = array_column->getOffsets();
auto offset = offsets[row_num - 1];
size_t size = offsets[row_num] - offset;
const auto * nested_array_column = checkAndGetColumn<ColumnArray>(*nested_column);
for (unsigned i = 0; i != static_cast<unsigned>(size); ++i)
{
capnp::DynamicValue::Builder value_builder;
/// For nested arrays we need to initialize nested list builder.
if (nested_array_column)
{
const auto & nested_offset = nested_array_column->getOffsets();
size_t nested_array_size = nested_offset[offset + i] - nested_offset[offset + i - 1];
value_builder = list_builder.init(i, static_cast<unsigned>(nested_array_size));
}
else
value_builder = list_builder[i];
auto value = convertToDynamicValue(nested_column, nested_type, offset + i, column_name, value_builder, enum_comparing_mode, temporary_text_data_storage);
if (value)
list_builder.set(i, *value);
}
return std::nullopt;
}
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected CapnProto type.");
}
const auto & header = getPort(PortKind::Main).getHeader();
serializer = std::make_unique<CapnProtoSerializer>(header.getDataTypes(), header.getNames(), schema, format_settings.capn_proto);
capnp::MallocMessageBuilder message;
}
void CapnProtoRowOutputFormat::write(const Columns & columns, size_t row_num)
{
capnp::MallocMessageBuilder message;
/// Temporary storage for data that will be outputted in fields with CapnProto type TEXT.
/// See comment in convertToDynamicValue() for more details.
std::vector<std::unique_ptr<String>> temporary_text_data_storage;
capnp::DynamicStruct::Builder root = message.initRoot<capnp::DynamicStruct>(schema);
/// Some columns can share same field builder. For example when we have
/// column with Nested type that was flattened into several columns.
std::unordered_map<size_t, capnp::DynamicValue::Builder> field_builders;
for (size_t i = 0; i != columns.size(); ++i)
{
auto [struct_builder, field] = getStructBuilderAndFieldByColumnName(root, column_names[i]);
if (!field_builders.contains(field.getIndex()))
{
auto field_builder = initStructFieldBuilder(columns[i], row_num, struct_builder, field);
field_builders[field.getIndex()] = field_builder;
}
auto value = convertToDynamicValue(columns[i], column_types[i], row_num, column_names[i], field_builders[field.getIndex()], format_settings.capn_proto.enum_comparing_mode, temporary_text_data_storage);
if (value)
struct_builder.set(field, *value);
}
serializer->writeRow(columns, std::move(root), row_num);
capnp::writeMessage(*output_stream, message);
}
void registerOutputFormatCapnProto(FormatFactory & factory)

View File

@ -3,15 +3,17 @@
#include "config.h"
#if USE_CAPNP
#include <Processors/Formats/IRowOutputFormat.h>
#include <Formats/FormatSchemaInfo.h>
#include <Formats/CapnProtoUtils.h>
#include <capnp/schema.h>
#include <capnp/dynamic.h>
#include <kj/io.h>
# include <Formats/CapnProtoSchema.h>
# include <Formats/CapnProtoSerializer.h>
# include <Formats/FormatSchemaInfo.h>
# include <Processors/Formats/IRowOutputFormat.h>
# include <capnp/dynamic.h>
# include <capnp/schema.h>
# include <kj/io.h>
namespace DB
{
class CapnProtoOutputStream : public kj::OutputStream
{
public:
@ -43,8 +45,9 @@ private:
DataTypes column_types;
capnp::StructSchema schema;
std::unique_ptr<CapnProtoOutputStream> output_stream;
const FormatSettings format_settings;
CapnProtoSchemaParser schema_parser;
std::unique_ptr<CapnProtoSerializer> serializer;
};
}

View File

@ -88,7 +88,14 @@ void registerInputFormatProtobufList(FormatFactory & factory)
});
factory.markFormatSupportsSubsetOfColumns("ProtobufList");
factory.registerAdditionalInfoForSchemaCacheGetter(
"ProtobufList", [](const FormatSettings & settings) { return fmt::format("format_schema={}", settings.schema.format_schema); });
"ProtobufList",
[](const FormatSettings & settings)
{
return fmt::format(
"format_schema={}, skip_fields_with_unsupported_types_in_schema_inference={}",
settings.schema.format_schema,
settings.protobuf.skip_fields_with_unsupported_types_in_schema_inference);
});
}
void registerProtobufListSchemaReader(FormatFactory & factory)

View File

@ -128,7 +128,14 @@ void registerProtobufSchemaReader(FormatFactory & factory)
for (const auto & name : {"Protobuf", "ProtobufSingle"})
factory.registerAdditionalInfoForSchemaCacheGetter(
name, [](const FormatSettings & settings) { return fmt::format("format_schema={}", settings.schema.format_schema); });
name,
[](const FormatSettings & settings)
{
return fmt::format(
"format_schema={}, skip_fields_with_unsupported_types_in_schema_inference={}",
settings.schema.format_schema,
settings.protobuf.skip_fields_with_unsupported_types_in_schema_inference);
});
}
}

View File

@ -12,6 +12,9 @@
\N [NULL,NULL,42] (NULL)
1 [1,NULL,2] (1)
\N [NULL,NULL,42] (NULL)
OK
OK
OK
one
two
tHrEe
@ -21,6 +24,14 @@ threE
first
second
third
first
second
third
OK
one
two
tHrEe
OK
OK
OK
OK

View File

@ -71,16 +71,25 @@ $CLICKHOUSE_CLIENT --query="DROP TABLE capnp_nullable"
$CLICKHOUSE_CLIENT --query="SELECT CAST(number, 'Enum(\'one\' = 0, \'two\' = 1, \'tHrEe\' = 2)') AS value FROM numbers(3) FORMAT CapnProto SETTINGS format_schema='$CLIENT_SCHEMADIR/02030_capnp_enum:Message'" > $CAPN_PROTO_FILE
$CLICKHOUSE_CLIENT --query="SELECT CAST(number % 2, 'Enum(\'one\' = 0, \'two\' = 1, \'tHrEe\' = 2, \'four\' = 4)') AS value FROM numbers(3) FORMAT CapnProto SETTINGS format_schema='$CLIENT_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_names'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="SELECT CAST(number % 2, 'Enum(\'one\' = 0, \'two\' = 1, \'tHrEe\' = 2, \'four\' = 4)') AS value FROM numbers(3) FORMAT CapnProto SETTINGS format_schema='$CLIENT_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_names_case_insensitive'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="SELECT CAST(number % 2, 'Enum(\'one\' = 0, \'two\' = 1, \'tHrEe\' = 2, \'four\' = 4)') AS value FROM numbers(3) FORMAT CapnProto SETTINGS format_schema='$CLIENT_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_values'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'value Enum(\'one\' = 1, \'two\' = 2, \'tHrEe\' = 3)') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_names'"
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'value Enum(\'oNe\' = 1, \'tWo\' = 2, \'threE\' = 3)') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_names_case_insensitive'"
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'value Enum(\'first\' = 0, \'second\' = 1, \'third\' = 2)') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_values'"
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'value Enum(\'first\' = 0, \'second\' = 1, \'third\' = 2)') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_values'"
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'value Enum(\'one\' = 0, \'two\' = 1, \'three\' = 2)') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_names'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'value Enum(\'one\' = 0, \'two\' = 1, \'tHrEe\' = 2, \'four\' = 3)') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_names'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'value Enum(\'one\' = 0, \'two\' = 1, \'tHrEe\' = 2, \'four\' = 3)') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_names'"
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'value Enum(\'one\' = 1, \'two\' = 2, \'tHrEe\' = 3)') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_values'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'value Enum(\'first\' = 1, \'two\' = 2, \'three\' = 3)') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_names_case_insensitive'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="SELECT CAST(number % 2, 'Enum(\'one\' = 0, \'two\' = 1)') AS value FROM numbers(3) FORMAT CapnProto SETTINGS format_schema='$CLIENT_SCHEMADIR/02030_capnp_enum:Message'" > $CAPN_PROTO_FILE
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'value Enum(\'first\' = 0, \'two\' = 1)') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_values'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'value Enum(\'first\' = 0, \'two\' = 1)') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_names'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'value Enum(\'first\' = 0, \'two\' = 1)') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_names_case_insensitive'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS capnp_low_cardinality"
$CLICKHOUSE_CLIENT --query="CREATE TABLE capnp_low_cardinality (lc1 LowCardinality(String), lc2 LowCardinality(Nullable(String)), lc3 Array(LowCardinality(Nullable(String)))) ENGINE=Memory"
$CLICKHOUSE_CLIENT --query="INSERT INTO capnp_low_cardinality VALUES ('one', 'two', ['one', Null, 'two', Null]), ('two', Null, [Null])"
@ -96,8 +105,8 @@ $CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'a_b U
$CLICKHOUSE_CLIENT --query="SELECT number AS a_b, number + 1 AS a_c_d, number + 2 AS a_c_e_f FROM numbers(5) FORMAT CapnProto SETTINGS format_schema='$CLIENT_SCHEMADIR/02030_capnp_nested_tuples:Message'" > $CAPN_PROTO_FILE
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'a Tuple(b UInt64, c Tuple(d UInt64, e Tuple(f UInt64)))') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_nested_tuples:Message'"
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'a Tuple(bb UInt64, c Tuple(d UInt64, e Tuple(f UInt64)))') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_nested_tuples:Message'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'a Tuple(b UInt64, c Tuple(d UInt64, e Tuple(ff UInt64)))') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_nested_tuples:Message'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'a Tuple(bb UInt64, c Tuple(d UInt64, e Tuple(f UInt64)))') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_nested_tuples:Message'" 2>&1 | grep -F -q "THERE_IS_NO_COLUMN" && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'a Tuple(b UInt64, c Tuple(d UInt64, e Tuple(ff UInt64)))') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_nested_tuples:Message'" 2>&1 | grep -F -q "THERE_IS_NO_COLUMN" && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'string String') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_simple_types:Message'" 2>&1 | grep -F -q "INCORRECT_DATA" && echo 'OK' || echo 'FAIL';

View File

@ -0,0 +1,10 @@
#!/usr/bin/env bash
# Tags: no-fasttest, no-parallel, no-replicated-database
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
SCHEMADIR=$CURDIR/format_schemas
$CLICKHOUSE_LOCAL -q "select 42 as Field1, (42, 42)::Tuple(Field1 UInt32, Field2 UInt32) as Nested format CapnProto settings format_schema='$SCHEMADIR/02735_case_insensitive_names_matching:Message'" | $CLICKHOUSE_LOCAL --input-format CapnProto --structure "Field1 UInt32, Nested Tuple(Field1 UInt32, Field2 UInt32)" -q "select * from table" --format_schema="$SCHEMADIR/02735_case_insensitive_names_matching:Message"

View File

@ -0,0 +1,3 @@
(42,(42,42),[(42,42),(24,24)]) [(42,(42,42),[(42,42),(24,24)]),(24,(24,24),[(24,24),(42,42)])]
42 42 42
[42,24] [42,24] [42,24] [[42,24],[24,42]] [[42,24],[24,42]]

View File

@ -0,0 +1,24 @@
#!/usr/bin/env bash
# Tags: no-fasttest, no-parallel, no-replicated-database
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
SCHEMADIR=$CURDIR/format_schemas
DATA_FILE=02736_$CLICKHOUSE_TEST_UNIQUE_NAME.bin
$CLICKHOUSE_LOCAL -q "select tuple(42, tuple(42, 42), [tuple(42, 42), tuple(24, 24)]) as nested, [tuple(42, tuple(42, 42), [tuple(42, 42), tuple(24, 24)]), tuple(24, tuple(24, 24), [tuple(24, 24), tuple(42, 42)])] as nestedList format CapnProto settings format_schema='$SCHEMADIR/02736_nested_structures:Message'" > $DATA_FILE
$CLICKHOUSE_LOCAL -q "select * from file($DATA_FILE, CapnProto) settings format_schema='$SCHEMADIR/02736_nested_structures:Message'"
$CLICKHOUSE_LOCAL -q "select 42 as nested_field1, 42 as nested_nested_field1, 42 as nested_nested_field2 format CapnProto settings format_schema='$SCHEMADIR/02736_nested_structures:Message'" > $DATA_FILE
$CLICKHOUSE_LOCAL -q "select * from file($DATA_FILE, CapnProto, 'nested_field1 UInt32, nested_nested_field1 UInt32, nested_nested_field2 UInt32') settings format_schema='$SCHEMADIR/02736_nested_structures:Message'"
$CLICKHOUSE_LOCAL -q "select [42, 24] as nestedList_field1, [42, 24] as nestedList_nested_field1, [42, 24] as nestedList_nested_field2, [[42, 24], [24, 42]] as nestedList_nestedList_field1, [[42, 24], [24, 42]] as nestedList_nestedList_field2 format CapnProto settings format_schema='$SCHEMADIR/02736_nested_structures:Message'" > $DATA_FILE
$CLICKHOUSE_LOCAL -q "select * from file($DATA_FILE, CapnProto, 'nestedList_field1 Array(UInt32), nestedList_nested_field1 Array(UInt32), nestedList_nested_field2 Array(UInt32), nestedList_nestedList_field1 Array(Array(UInt32)), nestedList_nestedList_field2 Array(Array(UInt32))') settings format_schema='$SCHEMADIR/02736_nested_structures:Message'"
rm $DATA_FILE

View File

@ -0,0 +1,13 @@
@0x9ef128e10a8010b8;
struct Nested
{
field1 @0 : UInt32;
field2 @1 : UInt32;
}
struct Message
{
field1 @0 : UInt32;
nested @1 : Nested;
}

View File

@ -0,0 +1,21 @@
@0x9ef128e10a8010b8;
struct Nested2
{
field1 @0 : UInt32;
field2 @1 : UInt32;
}
struct Nested
{
field1 @0 : UInt32;
nested @1 : Nested2;
nestedList @2 : List(Nested2);
}
struct Message
{
nested @0 : Nested;
nestedList @1 : List(Nested);
}