Add CapnProto output format, refactor CapnProto input format

This commit is contained in:
avogar 2021-09-28 15:59:22 +03:00
parent 746964af88
commit ce22f534c4
32 changed files with 1416 additions and 272 deletions

View File

@ -589,6 +589,8 @@
M(619, POSTGRESQL_REPLICATION_INTERNAL_ERROR) \
M(620, QUERY_NOT_ALLOWED) \
M(621, CANNOT_NORMALIZE_STRING) \
M(622, CANNOT_PARSE_CAPN_PROTO_SCHEMA) \
M(623, CAPN_PROTO_BAD_CAST) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -625,7 +625,8 @@ class IColumn;
M(Bool, cross_to_inner_join_rewrite, true, "Use inner join instead of comma/cross join if possible", 0) \
\
M(Bool, output_format_arrow_low_cardinality_as_dictionary, false, "Enable output LowCardinality type as Dictionary Arrow type", 0) \
\
M(EnumComparingMode, format_capn_proto_enum_comparising_mode, FormatSettings::EnumComparingMode::BY_VALUES, "How to map ClickHouse Enum and CapnProto Enum", 0)\
// End of FORMAT_FACTORY_SETTINGS
// Please add settings non-related to formats into the COMMON_SETTINGS above.

View File

@ -116,4 +116,9 @@ IMPLEMENT_SETTING_ENUM(ShortCircuitFunctionEvaluation, ErrorCodes::BAD_ARGUMENTS
{{"enable", ShortCircuitFunctionEvaluation::ENABLE},
{"force_enable", ShortCircuitFunctionEvaluation::FORCE_ENABLE},
{"disable", ShortCircuitFunctionEvaluation::DISABLE}})
/// String spellings accepted for a setting of type EnumComparingMode and the
/// FormatSettings::EnumComparingMode value each spelling maps to.
/// ErrorCodes::BAD_ARGUMENTS is the error code handed to the macro (presumably used for unknown spellings).
IMPLEMENT_SETTING_ENUM(EnumComparingMode, ErrorCodes::BAD_ARGUMENTS,
{{"by_names", FormatSettings::EnumComparingMode::BY_NAMES},
{"by_values", FormatSettings::EnumComparingMode::BY_VALUES},
{"by_names_case_insensitive", FormatSettings::EnumComparingMode::BY_NAMES_CASE_INSENSITIVE}})
}

View File

@ -168,4 +168,6 @@ enum class ShortCircuitFunctionEvaluation
DECLARE_SETTING_ENUM(ShortCircuitFunctionEvaluation)
DECLARE_SETTING_ENUM_WITH_RENAME(EnumComparingMode, FormatSettings::EnumComparingMode)
}

View File

@ -1,4 +1,5 @@
#include <DataTypes/EnumValues.h>
#include <boost/algorithm/string.hpp>
namespace DB
{
@ -82,6 +83,24 @@ Names EnumValues<T>::getAllRegisteredNames() const
return result;
}
/// Gathers the names of all enum elements into a hash set.
/// When to_lower is true, each name is lower-cased before it is stored.
template <typename T>
std::unordered_set<String> EnumValues<T>::getSetOfAllNames(bool to_lower) const
{
    std::unordered_set<String> names;
    for (const auto & element : values)
    {
        if (to_lower)
            names.insert(boost::algorithm::to_lower_copy(element.first));
        else
            names.insert(element.first);
    }
    return names;
}
/// Gathers the numeric values of all enum elements into a hash set.
template <typename T>
std::unordered_set<T> EnumValues<T>::getSetOfAllValues() const
{
    std::unordered_set<T> numeric_values;
    for (const auto & element : values)
    {
        const auto & number = element.second;
        numeric_values.insert(number);
    }
    return numeric_values;
}
template class EnumValues<Int8>;
template class EnumValues<Int16>;

View File

@ -80,6 +80,10 @@ public:
}
Names getAllRegisteredNames() const override;
/// Returns the set of all element names; names are lower-cased first when to_lower is true.
std::unordered_set<String> getSetOfAllNames(bool to_lower) const;
/// Returns the set of all element values.
std::unordered_set<T> getSetOfAllValues() const;
};
}

View File

@ -0,0 +1,406 @@
#include <Formats/CapnProtoUtils.h>
#if USE_CAPNP
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/IDataType.h>
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/join.hpp>
#include <capnp/schema.h>
#include <capnp/schema-parser.h>
#include <fcntl.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_PARSE_CAPN_PROTO_SCHEMA;
extern const int THERE_IS_NO_COLUMN;
extern const int BAD_TYPE_OF_FIELD;
extern const int CAPN_PROTO_BAD_CAST;
extern const int FILE_DOESNT_EXIST;
extern const int UNKNOWN_EXCEPTION;
}
/// Parses the schema file described by schema_info and returns the struct schema
/// of the message named schema_info.messageName().
///
/// Throws:
///  - FILE_DOESNT_EXIST if kj reports that the schema directory is missing;
///  - CANNOT_PARSE_CAPN_PROTO_SCHEMA on a parse error or when the message is not found in the schema;
///  - UNKNOWN_EXCEPTION for any other kj::Exception.
capnp::StructSchema CapnProtoSchemaParser::getMessageSchema(const FormatSchemaInfo & schema_info)
{
    capnp::ParsedSchema schema;
    try
    {
        int fd;
        KJ_SYSCALL(fd = open(schema_info.schemaDirectory().data(), O_RDONLY));
        /// The descriptor is handed over to the kj file handle wrapper.
        auto schema_dir = kj::newDiskDirectory(kj::OsFileHandle(fd));
        schema = impl.parseFromDirectory(*schema_dir, kj::Path::parse(schema_info.schemaPath()), {});
    }
    catch (const kj::Exception & e)
    {
        /// That's not good to determine the type of error by its description, but
        /// this is the only way to do it here, because kj doesn't specify the type of error.
        String description = String(e.getDescription().cStr());
        if (description.starts_with("no such directory"))
            throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Cannot open CapnProto schema, file {} doesn't exist", schema_info.absoluteSchemaPath());

        if (description.starts_with("Parse error"))
            throw Exception(ErrorCodes::CANNOT_PARSE_CAPN_PROTO_SCHEMA, "Cannot parse CapnProto schema {}:{}", schema_info.schemaPath(), e.getLine());

        throw Exception(ErrorCodes::UNKNOWN_EXCEPTION, "Unknown exception while parsing CapnProto schema: {}, schema dir and file: {}, {}", description, schema_info.schemaDirectory(), schema_info.schemaPath());
    }

    /// Use the public KJ_IF_MAYBE macro (as the rest of this file does) instead of the internal kj::_::readMaybe.
    KJ_IF_MAYBE(message_schema, schema.findNested(schema_info.messageName()))
    {
        return message_schema->asStruct();
    }

    throw Exception(ErrorCodes::CANNOT_PARSE_CAPN_PROTO_SCHEMA, "CapnProto schema doesn't contain message with name {}", schema_info.messageName());
}
bool compareEnumNames(const String & first, const String & second, FormatSettings::EnumComparingMode mode)
{
if (mode == FormatSettings::EnumComparingMode::BY_NAMES_CASE_INSENSITIVE)
return boost::algorithm::to_lower_copy(first) == boost::algorithm::to_lower_copy(second);
return first == second;
}
/// Printable names of the non-composite CapnProto types, keyed by the type tag.
/// STRUCT and LIST are intentionally absent: getCapnProtoFullTypeName renders them
/// recursively before it consults this table.
static const std::map<capnp::schema::Type::Which, String> capnp_simple_type_names =
{
{capnp::schema::Type::Which::BOOL, "Bool"},
{capnp::schema::Type::Which::VOID, "Void"},
{capnp::schema::Type::Which::INT8, "Int8"},
{capnp::schema::Type::Which::INT16, "Int16"},
{capnp::schema::Type::Which::INT32, "Int32"},
{capnp::schema::Type::Which::INT64, "Int64"},
{capnp::schema::Type::Which::UINT8, "UInt8"},
{capnp::schema::Type::Which::UINT16, "UInt16"},
{capnp::schema::Type::Which::UINT32, "UInt32"},
{capnp::schema::Type::Which::UINT64, "UInt64"},
{capnp::schema::Type::Which::FLOAT32, "Float32"},
{capnp::schema::Type::Which::FLOAT64, "Float64"},
{capnp::schema::Type::Which::TEXT, "Text"},
{capnp::schema::Type::Which::DATA, "Data"},
{capnp::schema::Type::Which::ENUM, "Enum"},
{capnp::schema::Type::Which::INTERFACE, "Interface"},
{capnp::schema::Type::Which::ANY_POINTER, "AnyPointer"},
};
/// A struct is treated as containing an unnamed union when the number of all its fields
/// differs from the number of its non-union fields.
static bool checkIfStructContainsUnnamedUnion(const capnp::StructSchema & struct_schema)
{
    const auto total_fields = struct_schema.getFields().size();
    const auto plain_fields = struct_schema.getNonUnionFields().size();
    return total_fields != plain_fields;
}
/// A struct is treated as a named union when every one of its fields is a union field.
static bool checkIfStructIsNamedUnion(const capnp::StructSchema & struct_schema)
{
    const auto total_fields = struct_schema.getFields().size();
    const auto union_fields = struct_schema.getUnionFields().size();
    return total_fields == union_fields;
}
/// Get full name of type for better exception messages.
///
/// Structs are rendered as "Struct(name type, ...)", named unions as "Union(name type, ...)",
/// a struct with an unnamed union as "Struct(..., Union(...))" and lists as "List(type)".
/// Throws BAD_TYPE_OF_FIELD for a type tag that has no known name.
static String getCapnProtoFullTypeName(const capnp::Type & type)
{
    if (type.isStruct())
    {
        /// Renders "<field name> <field type>" for every field of the given field list.
        auto describe_fields = [](const auto & fields)
        {
            std::vector<String> descriptions;
            descriptions.reserve(fields.size());
            for (auto nested_field : fields)
                descriptions.push_back(String(nested_field.getProto().getName()) + " " + getCapnProtoFullTypeName(nested_field.getType()));
            return descriptions;
        };

        auto struct_schema = type.asStruct();
        const std::vector<String> non_union_field_names = describe_fields(struct_schema.getNonUnionFields());
        const std::vector<String> union_field_names = describe_fields(struct_schema.getUnionFields());

        String union_name = "Union(" + boost::algorithm::join(union_field_names, ", ") + ")";
        /// Check if the struct is a named union.
        if (non_union_field_names.empty())
            return union_name;

        String type_name = "Struct(" + boost::algorithm::join(non_union_field_names, ", ");
        /// Check if the struct contains unnamed union.
        /// The separator matches the ", " used between all other elements.
        if (!union_field_names.empty())
            type_name += ", " + union_name;
        type_name += ")";
        return type_name;
    }

    if (type.isList())
        return "List(" + getCapnProtoFullTypeName(type.asList().getElementType()) + ")";

    /// Single lookup instead of contains() followed by at().
    const auto it = capnp_simple_type_names.find(type.which());
    if (it == capnp_simple_type_names.end())
        throw Exception(ErrorCodes::BAD_TYPE_OF_FIELD, "Unknown CapnProto type");

    return it->second;
}
/// Checks that a CapnProto enum can be mapped to the ClickHouse Enum column_type under the given mode.
///
/// - BY_VALUES: every enumerant ordinal must not exceed max_value and the set of ordinals
///   must be equal to the set of ClickHouse enum values.
/// - BY_NAMES / BY_NAMES_CASE_INSENSITIVE: the sets of names must be equal
///   (names are lower-cased on both sides in the case-insensitive mode).
///
/// Returns false when capnp_type is not an enum or the enums don't match; in the latter
/// case an explanation is appended to error_message.
/// Only the sets needed for the selected mode are built.
template <typename Type>
static bool checkEnums(const capnp::Type & capnp_type, const DataTypePtr & column_type, FormatSettings::EnumComparingMode mode, UInt64 max_value, String & error_message)
{
    if (!capnp_type.isEnum())
        return false;

    auto enum_schema = capnp_type.asEnum();
    auto enumerants = enum_schema.getEnumerants();
    /// DataTypeEnum<Type> exposes the EnumValues<Type> interface directly, no extra cast is required.
    const auto * enum_type = assert_cast<const DataTypeEnum<Type> *>(column_type.get());

    if (mode == FormatSettings::EnumComparingMode::BY_VALUES)
    {
        std::unordered_set<Type> capn_enum_values;
        for (auto enumerant : enumerants)
        {
            auto value = enumerant.getOrdinal();
            if (value > max_value)
            {
                error_message += "Enum from CapnProto schema contains value that is out of range for Clickhouse Enum";
                return false;
            }
            capn_enum_values.insert(Type(value));
        }

        const bool same_values = enum_type->getSetOfAllValues() == capn_enum_values;
        if (!same_values)
            error_message += "The set of values in Enum from CapnProto schema is different from the set of values in ClickHouse Enum";
        return same_values;
    }

    const bool to_lower = mode == FormatSettings::EnumComparingMode::BY_NAMES_CASE_INSENSITIVE;
    std::unordered_set<String> capn_enum_names;
    for (auto enumerant : enumerants)
    {
        String name = enumerant.getProto().getName();
        capn_enum_names.insert(to_lower ? boost::algorithm::to_lower_copy(name) : name);
    }

    const bool same_names = enum_type->getSetOfAllNames(to_lower) == capn_enum_names;
    if (!same_names)
        error_message += "The set of names in Enum from CapnProto schema is different from the set of names in ClickHouse Enum";
    return same_names;
}
/// Defined below; declared here because the composite type checks call it recursively.
static bool checkCapnProtoType(const capnp::Type & capnp_type, const DataTypePtr & data_type, FormatSettings::EnumComparingMode mode, String & error_message);

/// Nullable(T) matches a CapnProto struct that is a named union of exactly two members,
/// one of type Void and the other one compatible with T.
static bool checkNullableType(const capnp::Type & capnp_type, const DataTypePtr & data_type, FormatSettings::EnumComparingMode mode, String & error_message)
{
    if (!capnp_type.isStruct())
        return false;

    auto struct_schema = capnp_type.asStruct();
    if (!checkIfStructIsNamedUnion(struct_schema))
        return false;

    auto members = struct_schema.getUnionFields();
    if (members.size() != 2)
        return false;

    auto first_type = members[0].getType();
    auto second_type = members[1].getType();
    auto nested_type = assert_cast<const DataTypeNullable *>(data_type.get())->getNestedType();

    /// The non-Void member carries the value.
    if (first_type.isVoid())
        return checkCapnProtoType(second_type, nested_type, mode, error_message);
    if (second_type.isVoid())
        return checkCapnProtoType(first_type, nested_type, mode, error_message);

    return false;
}
/// A named Tuple matches a CapnProto struct (not a named union, no unnamed union inside)
/// with the same number of fields, where every tuple element name exists in the struct
/// and the element type is compatible with the field type.
/// On mismatch an explanation is appended to error_message (except for the non-struct / named-union cases).
static bool checkTupleType(const capnp::Type & capnp_type, const DataTypePtr & data_type, FormatSettings::EnumComparingMode mode, String & error_message)
{
    if (!capnp_type.isStruct())
        return false;

    auto struct_schema = capnp_type.asStruct();
    if (checkIfStructIsNamedUnion(struct_schema))
        return false;

    if (checkIfStructContainsUnnamedUnion(struct_schema))
    {
        error_message += "CapnProto struct contains unnamed union";
        return false;
    }

    const auto * tuple_data_type = assert_cast<const DataTypeTuple *>(data_type.get());
    /// Bind by reference: there is no need to copy the vector of element types.
    const auto & nested_types = tuple_data_type->getElements();
    if (nested_types.size() != struct_schema.getFields().size())
    {
        error_message += "Tuple and Struct types have different sizes";
        return false;
    }

    if (!tuple_data_type->haveExplicitNames())
    {
        error_message += "Only named Tuple can be converted to CapnProto Struct";
        return false;
    }

    for (const auto & name : tuple_data_type->getElementNames())
    {
        KJ_IF_MAYBE(field, struct_schema.findFieldByName(name))
        {
            if (!checkCapnProtoType(field->getType(), nested_types[tuple_data_type->getPositionByName(name)], mode, error_message))
                return false;
        }
        else
        {
            error_message += "CapnProto struct doesn't contain a field with name " + name;
            return false;
        }
    }

    return true;
}
/// Array(T) matches a CapnProto List whose element type is compatible with T.
static bool checkArrayType(const capnp::Type & capnp_type, const DataTypePtr & data_type, FormatSettings::EnumComparingMode mode, String & error_message)
{
    if (capnp_type.isList())
    {
        auto element_capnp_type = capnp_type.asList().getElementType();
        auto element_data_type = assert_cast<const DataTypeArray *>(data_type.get())->getNestedType();
        return checkCapnProtoType(element_capnp_type, element_data_type, mode, error_message);
    }
    return false;
}
/// Returns true if a value of ClickHouse type data_type can be represented by capnp_type.
/// Composite types are checked recursively; some failed checks append an explanation to error_message.
/// Any ClickHouse type not listed here is not supported and yields false.
static bool checkCapnProtoType(const capnp::Type & capnp_type, const DataTypePtr & data_type, FormatSettings::EnumComparingMode mode, String & error_message)
{
    switch (data_type->getTypeId())
    {
        /// Unsigned integers and the date/time types stored as unsigned integers.
        case TypeIndex::UInt8:
            return capnp_type.isBool() || capnp_type.isUInt8();
        case TypeIndex::Date:
        case TypeIndex::UInt16:
            return capnp_type.isUInt16();
        case TypeIndex::DateTime:
        case TypeIndex::UInt32:
            return capnp_type.isUInt32();
        case TypeIndex::UInt64:
            return capnp_type.isUInt64();

        /// Signed integers and the date/time types stored as signed integers.
        case TypeIndex::Int8:
            return capnp_type.isInt8();
        case TypeIndex::Int16:
            return capnp_type.isInt16();
        case TypeIndex::Date32:
        case TypeIndex::Int32:
            return capnp_type.isInt32();
        case TypeIndex::DateTime64:
        case TypeIndex::Int64:
            return capnp_type.isInt64();

        case TypeIndex::Float32:
            return capnp_type.isFloat32();
        case TypeIndex::Float64:
            return capnp_type.isFloat64();

        case TypeIndex::Enum8:
            return checkEnums<Int8>(capnp_type, data_type, mode, INT8_MAX, error_message);
        case TypeIndex::Enum16:
            return checkEnums<Int16>(capnp_type, data_type, mode, INT16_MAX, error_message);

        case TypeIndex::Tuple:
            return checkTupleType(capnp_type, data_type, mode, error_message);
        case TypeIndex::Nullable:
        {
            if (checkNullableType(capnp_type, data_type, mode, error_message))
                return true;
            error_message += "Nullable can be represented only as a named union of type Void and nested type";
            return false;
        }
        case TypeIndex::Array:
            return checkArrayType(capnp_type, data_type, mode, error_message);
        case TypeIndex::LowCardinality:
        {
            /// LowCardinality is transparent: check its dictionary type instead.
            const auto * low_cardinality_type = assert_cast<const DataTypeLowCardinality *>(data_type.get());
            return checkCapnProtoType(capnp_type, low_cardinality_type->getDictionaryType(), mode, error_message);
        }

        case TypeIndex::FixedString:
        case TypeIndex::String:
            return capnp_type.isText() || capnp_type.isData();

        default:
            return false;
    }
}
static std::pair<String, String> splitFieldName(const String & name)
{
const auto * begin = name.data();
const auto * end = name.data() + name.size();
const auto * it = find_first_symbols<'_', '.'>(begin, end);
String first = String(begin, it);
String second = it == end ? "" : String(it + 1, end);
return {first, second};
}
/// Resolves a column name to the value reader inside struct_reader.
/// The name is split by splitFieldName; a non-empty remainder is resolved recursively inside the nested struct.
/// Throws CAPN_PROTO_BAD_CAST if an intermediate field is not a struct and
/// THERE_IS_NO_COLUMN if the struct has no field with the requested name.
capnp::DynamicValue::Reader getReaderByColumnName(const capnp::DynamicStruct::Reader & struct_reader, const String & name)
{
    auto [field_name, nested_name] = splitFieldName(name);
    KJ_IF_MAYBE(field, struct_reader.getSchema().findFieldByName(field_name))
    {
        auto value_reader = struct_reader.get(*field);
        if (!nested_name.empty())
        {
            if (value_reader.getType() != capnp::DynamicValue::STRUCT)
                throw Exception(ErrorCodes::CAPN_PROTO_BAD_CAST, "Field {} is not a struct", field_name);

            return getReaderByColumnName(value_reader.as<capnp::DynamicStruct>(), nested_name);
        }
        return value_reader;
    }

    throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "Capnproto struct doesn't contain field with name {}", field_name);
}
/// Resolves a column name to the pair {builder of the struct that owns the field, the field itself}.
/// The name is split by splitFieldName; a non-empty remainder is resolved recursively inside the nested struct.
/// Throws CAPN_PROTO_BAD_CAST if an intermediate field is not a struct and
/// THERE_IS_NO_COLUMN if the struct has no field with the requested name.
std::pair<capnp::DynamicStruct::Builder, capnp::StructSchema::Field> getStructBuilderAndFieldByColumnName(capnp::DynamicStruct::Builder struct_builder, const String & name)
{
    auto [field_name, nested_name] = splitFieldName(name);
    KJ_IF_MAYBE(field, struct_builder.getSchema().findFieldByName(field_name))
    {
        if (!nested_name.empty())
        {
            auto nested_builder = struct_builder.get(*field);
            if (nested_builder.getType() != capnp::DynamicValue::STRUCT)
                throw Exception(ErrorCodes::CAPN_PROTO_BAD_CAST, "Field {} is not a struct", field_name);

            return getStructBuilderAndFieldByColumnName(nested_builder.as<capnp::DynamicStruct>(), nested_name);
        }
        return {struct_builder, *field};
    }

    throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "Capnproto struct doesn't contain field with name {}", field_name);
}
/// Resolves a column name to a field of the schema, descending into nested structs
/// for every further part of the name produced by splitFieldName.
/// Throws CAPN_PROTO_BAD_CAST if an intermediate field is not a struct and
/// THERE_IS_NO_COLUMN if the schema has no field with the requested name.
static capnp::StructSchema::Field getFieldByName(const capnp::StructSchema & schema, const String & name)
{
    auto [field_name, nested_name] = splitFieldName(name);
    KJ_IF_MAYBE(field, schema.findFieldByName(field_name))
    {
        if (!nested_name.empty())
        {
            if (!field->getType().isStruct())
                throw Exception(ErrorCodes::CAPN_PROTO_BAD_CAST, "Field {} is not a struct", field_name);

            return getFieldByName(field->getType().asStruct(), nested_name);
        }
        return *field;
    }

    throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "Capnproto schema doesn't contain field with name {}", field_name);
}
/// Verifies that every column of header can be mapped to a field of schema.
/// Throws CAPN_PROTO_BAD_CAST if the root struct contains an unnamed union or if some
/// column type is incompatible with the type of its field; lookup errors from getFieldByName propagate.
void checkCapnProtoSchemaStructure(const capnp::StructSchema & schema, const Block & header, FormatSettings::EnumComparingMode mode)
{
    /// Unnamed unions are not supported, so reject them before looking at the columns.
    if (checkIfStructContainsUnnamedUnion(schema))
        throw Exception(ErrorCodes::CAPN_PROTO_BAD_CAST, "Schema contains unnamed union that is not supported");

    const auto columns = header.getNamesAndTypesList();
    String details;
    for (const auto & [name, type] : columns)
    {
        auto field = getFieldByName(schema, name);
        if (checkCapnProtoType(field.getType(), type, mode, details))
            continue;

        auto error = Exception(
            ErrorCodes::CAPN_PROTO_BAD_CAST,
            "Cannot convert ClickHouse type {} to CapnProto type {}",
            type->getName(),
            getCapnProtoFullTypeName(field.getType()));
        /// Attach the explanation collected by the type checks, if there is one.
        if (!details.empty())
            error.addMessage(details);
        throw std::move(error);
    }
}
}
#endif

View File

@ -0,0 +1,43 @@
#pragma once
#include "config_formats.h"
#if USE_CAPNP
#include <Formats/FormatSchemaInfo.h>
#include <Formats/FormatSettings.h>
#include <Core/Block.h>
#include <capnp/schema-parser.h>
#include <capnp/dynamic.h>
namespace DB
{
// Wrapper for classes that could throw in destructor
// https://github.com/capnproto/capnproto/issues/553
//
// The wrapped object is stored in the public member `impl` and constructed from
// the arguments forwarded by the constructor.
template <typename T>
struct DestructorCatcher
{
T impl;
// Forwards all arguments to the constructor of T.
template <typename ... Arg>
DestructorCatcher(Arg && ... args) : impl(kj::fwd<Arg>(args)...) {}
// The destructor body is a function-try-block: an exception escaping while the
// object is destroyed reaches the catch-all handler, and the explicit `return`
// there keeps it from being rethrown at the end of the handler.
~DestructorCatcher() noexcept try { } catch (...) { return; }
};
/// CapnProto schema parser. The underlying capnp::SchemaParser is the inherited `impl` member.
class CapnProtoSchemaParser : public DestructorCatcher<capnp::SchemaParser>
{
public:
    CapnProtoSchemaParser() = default;

    /// Parses the schema described by schema_info and returns the schema of the requested message.
    capnp::StructSchema getMessageSchema(const FormatSchemaInfo & schema_info);
};
/// Compares two enum names; the comparison ignores case in BY_NAMES_CASE_INSENSITIVE mode.
bool compareEnumNames(const String & first, const String & second, FormatSettings::EnumComparingMode mode);
/// Finds the struct builder and the field that correspond to a (possibly nested) column name.
std::pair<capnp::DynamicStruct::Builder, capnp::StructSchema::Field> getStructBuilderAndFieldByColumnName(capnp::DynamicStruct::Builder struct_builder, const String & name);
/// Finds the value reader that corresponds to a (possibly nested) column name.
capnp::DynamicValue::Reader getReaderByColumnName(const capnp::DynamicStruct::Reader & struct_reader, const String & name);
/// Checks that all columns of header can be mapped to fields of schema; throws an exception otherwise.
void checkCapnProtoSchemaStructure(const capnp::StructSchema & schema, const Block & header, FormatSettings::EnumComparingMode mode);
}
#endif

View File

@ -112,6 +112,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.arrow.low_cardinality_as_dictionary = settings.output_format_arrow_low_cardinality_as_dictionary;
format_settings.arrow.import_nested = settings.input_format_arrow_import_nested;
format_settings.orc.import_nested = settings.input_format_orc_import_nested;
format_settings.capn_proto.enum_comparing_mode = settings.format_capn_proto_enum_comparising_mode;
/// Validate avro_schema_registry_url with RemoteHostFilter when non-empty and in Server context
if (format_settings.schema.is_server)

View File

@ -99,4 +99,10 @@ FormatSchemaInfo::FormatSchemaInfo(const String & format_schema, const String &
}
}
/// Convenience constructor: takes the schema string, the server flag and the schema
/// directory from settings.schema and delegates to the main constructor.
FormatSchemaInfo::FormatSchemaInfo(const FormatSettings & settings, const String & format, bool require_message)
    : FormatSchemaInfo(
        settings.schema.format_schema,
        format,
        require_message,
        settings.schema.is_server,
        settings.schema.format_schema_path)
{
}
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <base/types.h>
#include <Formats/FormatSettings.h>
namespace DB
{
@ -11,6 +12,7 @@ class FormatSchemaInfo
{
public:
FormatSchemaInfo(const String & format_schema, const String & format, bool require_message, bool is_server, const std::string & format_schema_path);
FormatSchemaInfo(const FormatSettings & settings, const String & format, bool require_message);
/// Returns path to the schema file.
const String & schemaPath() const { return schema_path; }

View File

@ -183,6 +183,20 @@ struct FormatSettings
{
bool import_nested = false;
} orc;
/// For the CapnProto format we should determine how to
/// compare a ClickHouse Enum with an Enum from the schema.
/// The way enum elements of the two sides are matched with each other.
enum class EnumComparingMode
{
BY_NAMES, // Names in enums should be the same, values can be different.
BY_NAMES_CASE_INSENSITIVE, // Case-insensitive name comparison.
BY_VALUES, // Values should be the same, names can be different.
};
/// Settings of the CapnProto format.
struct
{
/// Enums are matched by their values unless configured otherwise.
EnumComparingMode enum_comparing_mode = EnumComparingMode::BY_VALUES;
} capn_proto;
};
}

View File

@ -67,6 +67,7 @@ void registerOutputFormatNull(FormatFactory & factory);
void registerOutputFormatMySQLWire(FormatFactory & factory);
void registerOutputFormatMarkdown(FormatFactory & factory);
void registerOutputFormatPostgreSQLWire(FormatFactory & factory);
void registerOutputFormatCapnProto(FormatFactory & factory);
/// Input only formats.
@ -139,6 +140,7 @@ void registerFormats()
registerOutputFormatMySQLWire(factory);
registerOutputFormatMarkdown(factory);
registerOutputFormatPostgreSQLWire(factory);
/// Must match the declaration `void registerOutputFormatCapnProto(FormatFactory & factory);` above.
registerOutputFormatCapnProto(factory);
registerInputFormatRegexp(factory);
registerInputFormatJSONAsString(factory);

View File

@ -1,7 +1,6 @@
#include "CapnProtoRowInputFormat.h"
#if USE_CAPNP
#include <Core/Field.h>
#include <IO/ReadBuffer.h>
#include <Interpreters/Context.h>
#include <Formats/FormatFactory.h>
@ -9,198 +8,40 @@
#include <capnp/serialize.h>
#include <capnp/dynamic.h>
#include <capnp/common.h>
#include <base/logger_useful.h>
#include <base/find_symbols.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnLowCardinality.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnDecimal.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeLowCardinality.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_TYPE_OF_FIELD;
extern const int THERE_IS_NO_COLUMN;
extern const int LOGICAL_ERROR;
}
/// Builds the NestedField for the i-th column of header: the column name
/// (without a leading dot, if any) is split into tokens by '.' and '_',
/// and the column position is stored next to the tokens.
static CapnProtoRowInputFormat::NestedField split(const Block & header, size_t i)
{
    String column_name = header.safeGetByPosition(i).name;

    // Remove leading dot in field definition, e.g. ".msg" -> "msg"
    if (!column_name.empty() && column_name.front() == '.')
        column_name.erase(0, 1);

    CapnProtoRowInputFormat::NestedField result = {{}, i};
    splitInto<'.', '_'>(result.tokens, column_name);
    return result;
}
/// Converts a CapnProto dynamic value into a Field.
/// Lists become Array and structs become Tuple (both converted recursively),
/// Void becomes an empty Field, enums are converted to their raw value.
/// Throws BAD_TYPE_OF_FIELD for UNKNOWN, CAPABILITY and ANY_POINTER values.
static Field convertNodeToField(const capnp::DynamicValue::Reader & value)
{
    switch (value.getType())
    {
        case capnp::DynamicValue::UNKNOWN:
            throw Exception("Unknown field type", ErrorCodes::BAD_TYPE_OF_FIELD);
        case capnp::DynamicValue::VOID:
            return Field();
        case capnp::DynamicValue::BOOL:
            return value.as<bool>() ? 1u : 0u;
        case capnp::DynamicValue::INT:
            return value.as<int64_t>();
        case capnp::DynamicValue::UINT:
            return value.as<uint64_t>();
        case capnp::DynamicValue::FLOAT:
            return value.as<double>();
        case capnp::DynamicValue::TEXT:
        {
            auto text = value.as<capnp::Text>();
            return String(text.begin(), text.size());
        }
        case capnp::DynamicValue::DATA:
        {
            auto chars = value.as<capnp::Data>().asChars();
            return String(chars.begin(), chars.size());
        }
        case capnp::DynamicValue::LIST:
        {
            auto list = value.as<capnp::DynamicList>();
            Array elements(list.size());
            for (auto index : kj::indices(list))
                elements[index] = convertNodeToField(list[index]);
            return elements;
        }
        case capnp::DynamicValue::ENUM:
            return value.as<capnp::DynamicEnum>().getRaw();
        case capnp::DynamicValue::STRUCT:
        {
            auto structure = value.as<capnp::DynamicStruct>();
            const auto & schema_fields = structure.getSchema().getFields();
            Tuple members(schema_fields.size());
            for (auto index : kj::indices(schema_fields))
                members[index] = convertNodeToField(structure.get(schema_fields[index]));
            return members;
        }
        case capnp::DynamicValue::CAPABILITY:
            throw Exception("CAPABILITY type not supported", ErrorCodes::BAD_TYPE_OF_FIELD);
        case capnp::DynamicValue::ANY_POINTER:
            throw Exception("ANY_POINTER type not supported", ErrorCodes::BAD_TYPE_OF_FIELD);
    }

    /// Reached only if the type is none of the enumerators handled above.
    return Field();
}
/// Returns the field of node with the given name or throws THERE_IS_NO_COLUMN if there is no such field.
static capnp::StructSchema::Field getFieldOrThrow(capnp::StructSchema node, const std::string & field)
{
    KJ_IF_MAYBE(child, node.findFieldByName(field))
    {
        return *child;
    }

    throw Exception("Field " + field + " doesn't exist in schema " + node.getShortDisplayName().cStr(), ErrorCodes::THERE_IS_NO_COLUMN);
}
/// Translates the list of column fields into a sequence of actions (PUSH / POP / READ)
/// appended to `actions`. PUSH descends into a nested struct, POP returns to the parent,
/// READ reads a field into one or several column positions.
/// Throws LOGICAL_ERROR for a field without tokens and BAD_TYPE_OF_FIELD when an
/// intermediate token names a field that is neither a struct nor a list.
void CapnProtoRowInputFormat::createActions(const NestedFieldList & sorted_fields, capnp::StructSchema reader)
{
/// Columns in a table can map to fields in Cap'n'Proto or to structs.
/// Store common parents and their tokens in order to backtrack.
std::vector<capnp::StructSchema::Field> parents;
std::vector<std::string> parent_tokens;
/// Schema of the struct we are currently positioned in.
capnp::StructSchema cur_reader = reader;
for (const auto & field : sorted_fields)
{
if (field.tokens.empty())
throw Exception("Logical error in CapnProtoRowInputFormat", ErrorCodes::LOGICAL_ERROR);
// Backtrack to common parent
// (pop while the current parent path is not a proper prefix of the field's tokens)
while (field.tokens.size() < parent_tokens.size() + 1
|| !std::equal(parent_tokens.begin(), parent_tokens.end(), field.tokens.begin()))
{
actions.push_back({Action::POP});
parents.pop_back();
parent_tokens.pop_back();
if (parents.empty())
{
// Back at the root struct.
cur_reader = reader;
break;
}
else
cur_reader = parents.back().getType().asStruct();
}
// Go forward
// (descend until only the last token of the field is left)
while (parent_tokens.size() + 1 < field.tokens.size())
{
const auto & token = field.tokens[parents.size()];
auto node = getFieldOrThrow(cur_reader, token);
if (node.getType().isStruct())
{
// Descend to field structure
parents.emplace_back(node);
parent_tokens.emplace_back(token);
cur_reader = node.getType().asStruct();
actions.push_back({Action::PUSH, node});
}
else if (node.getType().isList())
{
break; // Collect list
}
else
throw Exception("Field " + token + " is neither Struct nor List", ErrorCodes::BAD_TYPE_OF_FIELD);
}
// Read field from the structure
auto node = getFieldOrThrow(cur_reader, field.tokens[parents.size()]);
if (node.getType().isList() && !actions.empty() && actions.back().field == node)
{
// The field list here flattens Nested elements into multiple arrays
// In order to map Nested types in Cap'nProto back, they need to be collected
// Since the field names are sorted, the order of field positions must be preserved
// For example, if the fields are { b @0 :Text, a @1 :Text }, the `a` would come first
// even though it's position is second.
auto & columns = actions.back().columns;
// Keep the column positions of the shared READ action in ascending order.
auto it = std::upper_bound(columns.cbegin(), columns.cend(), field.pos);
columns.insert(it, field.pos);
}
else
{
actions.push_back({Action::READ, node, {field.pos}});
}
}
}
/// Creates the input format: parses the CapnProto schema described by info, takes the
/// message schema as the root struct and verifies that every column of the header
/// can be read from it (throws otherwise).
/// NOTE(review): the original span interleaved the pre-refactoring signature and body
/// (parseDiskFile + createActions) with the refactored ones; only the refactored version is kept.
CapnProtoRowInputFormat::CapnProtoRowInputFormat(ReadBuffer & in_, Block header, Params params_, const FormatSchemaInfo & info, const FormatSettings & format_settings_)
    : IRowInputFormat(std::move(header), in_, std::move(params_))
    , parser(std::make_shared<CapnProtoSchemaParser>())
    , format_settings(format_settings_)
    , column_types(getPort().getHeader().getDataTypes())
    , column_names(getPort().getHeader().getNames())
{
    // Parse the schema and fetch the root object
    root = parser->getMessageSchema(info);
    checkCapnProtoSchemaStructure(root, getPort().getHeader(), format_settings.capn_proto.enum_comparing_mode);
}
kj::Array<capnp::word> CapnProtoRowInputFormat::readMessage()
@ -233,6 +74,186 @@ kj::Array<capnp::word> CapnProtoRowInputFormat::readMessage()
return msg;
}
/// Inserts a signed integer value into a column of a signed integer type or of a
/// date/time type that is stored as a signed integer.
/// Throws LOGICAL_ERROR for any other column type.
static void insertSignedInteger(IColumn & column, const DataTypePtr & column_type, Int64 value)
{
    switch (column_type->getTypeId())
    {
        case TypeIndex::Int8:
            assert_cast<ColumnInt8 &>(column).insertValue(value);
            break;
        case TypeIndex::Int16:
            assert_cast<ColumnInt16 &>(column).insertValue(value);
            break;
        /// Date32 is accepted as CapnProto Int32 by the schema check, so it has to be insertable here as well.
        case TypeIndex::Date32: [[fallthrough]];
        case TypeIndex::Int32:
            assert_cast<ColumnInt32 &>(column).insertValue(value);
            break;
        case TypeIndex::Int64:
            assert_cast<ColumnInt64 &>(column).insertValue(value);
            break;
        case TypeIndex::DateTime64:
            assert_cast<ColumnDecimal<DateTime64> &>(column).insertValue(value);
            break;
        default:
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Column type is not a signed integer.");
    }
}
/// Inserts an unsigned integer value into a column of an unsigned integer type or of a
/// date/time type that is stored as an unsigned integer (Date, DateTime).
/// Throws LOGICAL_ERROR for any other column type.
static void insertUnsignedInteger(IColumn & column, const DataTypePtr & column_type, UInt64 value)
{
    switch (column_type->getTypeId())
    {
        case TypeIndex::UInt8:
            assert_cast<ColumnUInt8 &>(column).insertValue(value);
            return;
        case TypeIndex::Date:
        case TypeIndex::UInt16:
            assert_cast<ColumnUInt16 &>(column).insertValue(value);
            return;
        case TypeIndex::DateTime:
        case TypeIndex::UInt32:
            assert_cast<ColumnUInt32 &>(column).insertValue(value);
            return;
        case TypeIndex::UInt64:
            assert_cast<ColumnUInt64 &>(column).insertValue(value);
            return;
        default:
            break;
    }

    throw Exception(ErrorCodes::LOGICAL_ERROR, "Column type is not an unsigned integer.");
}
/// Inserts a floating point value into a Float32 or Float64 column.
/// Throws LOGICAL_ERROR for any other column type.
static void insertFloat(IColumn & column, const DataTypePtr & column_type, Float64 value)
{
    switch (column_type->getTypeId())
    {
        case TypeIndex::Float32:
            assert_cast<ColumnFloat32 &>(column).insertValue(value);
            return;
        case TypeIndex::Float64:
            assert_cast<ColumnFloat64 &>(column).insertValue(value);
            return;
        default:
            break;
    }

    throw Exception(ErrorCodes::LOGICAL_ERROR, "Column type is not a float.");
}
/// Inserts the bytes [value.begin(), value.begin() + value.size()) into the column as one value.
template <typename Value>
static void insertString(IColumn & column, Value value)
{
    const auto * bytes = reinterpret_cast<const char *>(value.begin());
    const auto length = value.size();
    column.insertData(bytes, length);
}
/// Inserts a CapnProto enum value into a ClickHouse Enum column.
///
/// - BY_VALUES: the ordinal of the enumerant is inserted as is.
/// - BY_NAMES: the ClickHouse value registered for the enumerant name is inserted.
/// - BY_NAMES_CASE_INSENSITIVE: the ClickHouse value of the first registered name that is
///   equal to the enumerant name ignoring case is inserted.
///
/// Throws LOGICAL_ERROR if the raw value has no enumerant in the schema (previously this
/// dereferenced an empty Maybe) or if no ClickHouse name matches (previously nothing was
/// inserted, which left the column one row short).
/// NOTE(review): an unknown raw value comes from the input data, so a dedicated error code
/// may be more appropriate than LOGICAL_ERROR; it is used here because it is already declared in this file.
template <typename ValueType>
static void insertEnum(IColumn & column, const DataTypePtr & column_type, const capnp::DynamicEnum & enum_value, FormatSettings::EnumComparingMode enum_comparing_mode)
{
    const auto * enum_type = assert_cast<const DataTypeEnum<ValueType> *>(column_type.get());
    /// Insert directly into the numeric column instead of allocating a temporary data type for every value.
    auto & enum_column = assert_cast<ColumnVector<ValueType> &>(column);

    KJ_IF_MAYBE(enumerant, enum_value.getEnumerant())
    {
        if (enum_comparing_mode == FormatSettings::EnumComparingMode::BY_VALUES)
        {
            enum_column.insertValue(static_cast<ValueType>(enumerant->getOrdinal()));
            return;
        }

        const String enum_name = enumerant->getProto().getName();
        if (enum_comparing_mode == FormatSettings::EnumComparingMode::BY_NAMES)
        {
            enum_column.insertValue(enum_type->getValue(enum_name));
            return;
        }

        /// Find the same enum name case insensitive.
        for (const auto & name : enum_type->getAllRegisteredNames())
        {
            if (compareEnumNames(name, enum_name, enum_comparing_mode))
            {
                enum_column.insertValue(enum_type->getValue(name));
                return;
            }
        }

        throw Exception(ErrorCodes::LOGICAL_ERROR, "ClickHouse Enum doesn't contain a name matching CapnProto enumerant {}", enum_name);
    }

    throw Exception(ErrorCodes::LOGICAL_ERROR, "CapnProto enum value {} is not present in the schema", enum_value.getRaw());
}
/// Append one CapnProto value to `column`, dispatching on the dynamic type of the value.
///
///  - LowCardinality columns: the value is read into a temporary column of the dictionary type and then
///    inserted into the LowCardinality column.
///  - INT / UINT / FLOAT / BOOL / DATA / TEXT / ENUM: forwarded to the corresponding insert* helper.
///  - LIST: appended to an Array column, elements are inserted recursively.
///  - STRUCT: either a Nullable value (a union where the Void member means NULL) or a Tuple whose elements
///    are looked up in the struct by element name.
///
/// Throws LOGICAL_ERROR for an unexpected CapnProto value type, or when the active member of a
/// Nullable union cannot be determined.
static void insertValue(IColumn & column, const DataTypePtr & column_type, const capnp::DynamicValue::Reader & value, FormatSettings::EnumComparingMode enum_comparing_mode)
{
    if (column_type->lowCardinality())
    {
        auto & lc_column = assert_cast<ColumnLowCardinality &>(column);
        auto tmp_column = lc_column.getDictionary().getNestedColumn()->cloneEmpty();
        auto dict_type = assert_cast<const DataTypeLowCardinality *>(column_type.get())->getDictionaryType();
        insertValue(*tmp_column, dict_type, value, enum_comparing_mode);
        lc_column.insertFromFullColumn(*tmp_column, 0);
        return;
    }

    switch (value.getType())
    {
        case capnp::DynamicValue::Type::INT:
            insertSignedInteger(column, column_type, value.as<Int64>());
            break;
        case capnp::DynamicValue::Type::UINT:
            insertUnsignedInteger(column, column_type, value.as<UInt64>());
            break;
        case capnp::DynamicValue::Type::FLOAT:
            insertFloat(column, column_type, value.as<Float64>());
            break;
        case capnp::DynamicValue::Type::BOOL:
            insertUnsignedInteger(column, column_type, UInt64(value.as<bool>()));
            break;
        case capnp::DynamicValue::Type::DATA:
            insertString(column, value.as<capnp::Data>());
            break;
        case capnp::DynamicValue::Type::TEXT:
            insertString(column, value.as<capnp::Text>());
            break;
        case capnp::DynamicValue::Type::ENUM:
            if (column_type->getTypeId() == TypeIndex::Enum8)
                insertEnum<Int8>(column, column_type, value.as<capnp::DynamicEnum>(), enum_comparing_mode);
            else
                insertEnum<Int16>(column, column_type, value.as<capnp::DynamicEnum>(), enum_comparing_mode);
            break;
        case capnp::DynamicValue::Type::LIST:
        {
            auto list_value = value.as<capnp::DynamicList>();
            auto & column_array = assert_cast<ColumnArray &>(column);
            auto & offsets = column_array.getOffsets();
            offsets.push_back(offsets.back() + list_value.size());

            auto & nested_column = column_array.getData();
            auto nested_type = assert_cast<const DataTypeArray *>(column_type.get())->getNestedType();
            for (const auto & nested_value : list_value)
                insertValue(nested_column, nested_type, nested_value, enum_comparing_mode);
            break;
        }
        case capnp::DynamicValue::Type::STRUCT:
        {
            auto struct_value = value.as<capnp::DynamicStruct>();
            if (column_type->isNullable())
            {
                auto & nullable_column = assert_cast<ColumnNullable &>(column);

                /// which() returns an empty Maybe when the active union member is not known to the schema.
                /// Keep the Maybe in a local so that the pointer obtained from it stays valid.
                auto active_member = struct_value.which();
                const auto * field = kj::_::readMaybe(active_member);
                if (!field)
                    throw Exception(
                        ErrorCodes::LOGICAL_ERROR,
                        "Cannot read Nullable value from CapnProto union: the active union member is unknown");

                if (field->getType().isVoid())
                    nullable_column.insertDefault();
                else
                {
                    auto & nested_column = nullable_column.getNestedColumn();
                    auto nested_type = assert_cast<const DataTypeNullable *>(column_type.get())->getNestedType();
                    auto nested_value = struct_value.get(*field);
                    insertValue(nested_column, nested_type, nested_value, enum_comparing_mode);
                    nullable_column.getNullMapData().push_back(0);
                }
            }
            else
            {
                auto & tuple_column = assert_cast<ColumnTuple &>(column);
                const auto * tuple_type = assert_cast<const DataTypeTuple *>(column_type.get());
                for (size_t i = 0; i != tuple_column.tupleSize(); ++i)
                    insertValue(
                        tuple_column.getColumn(i),
                        tuple_type->getElements()[i],
                        struct_value.get(tuple_type->getElementNames()[i]),
                        enum_comparing_mode);
            }
            break;
        }
        default:
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected CapnProto value type.");
    }
}
bool CapnProtoRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
{
if (in->eof())
@ -245,51 +266,12 @@ bool CapnProtoRowInputFormat::readRow(MutableColumns & columns, RowReadExtension
#else
capnp::FlatArrayMessageReader msg(array);
#endif
std::vector<capnp::DynamicStruct::Reader> stack;
stack.push_back(msg.getRoot<capnp::DynamicStruct>(root));
for (auto action : actions)
auto root_reader = msg.getRoot<capnp::DynamicStruct>(root);
for (size_t i = 0; i != columns.size(); ++i)
{
switch (action.type)
{
case Action::READ:
{
Field value = convertNodeToField(stack.back().get(action.field));
if (action.columns.size() > 1)
{
// Nested columns must be flattened into several arrays
// e.g. Array(Tuple(x ..., y ...)) -> Array(x ...), Array(y ...)
const auto & collected = DB::get<const Array &>(value);
size_t size = collected.size();
// The flattened array contains an array of a part of the nested tuple
Array flattened(size);
for (size_t column_index = 0; column_index < action.columns.size(); ++column_index)
{
// Populate array with a single tuple elements
for (size_t off = 0; off < size; ++off)
{
const auto & tuple = DB::get<const Tuple &>(collected[off]);
flattened[off] = tuple[column_index];
}
auto & col = columns[action.columns[column_index]];
col->insert(flattened);
}
}
else
{
auto & col = columns[action.columns[0]];
col->insert(value);
}
break;
}
case Action::POP:
stack.pop_back();
break;
case Action::PUSH:
stack.push_back(stack.back().get(action.field).as<capnp::DynamicStruct>());
break;
}
auto value = getReaderByColumnName(root_reader, column_names[i]);
insertValue(*columns[i], column_types[i], value, format_settings.capn_proto.enum_comparing_mode);
}
return true;
@ -302,8 +284,7 @@ void registerInputFormatCapnProto(FormatFactory & factory)
[](ReadBuffer & buf, const Block & sample, IRowInputFormat::Params params, const FormatSettings & settings)
{
return std::make_shared<CapnProtoRowInputFormat>(buf, sample, std::move(params),
FormatSchemaInfo(settings.schema.format_schema, "CapnProto", true,
settings.schema.is_server, settings.schema.format_schema_path));
FormatSchemaInfo(settings, "CapnProto", true), settings);
});
}

View File

@ -4,8 +4,8 @@
#if USE_CAPNP
#include <Core/Block.h>
#include <Formats/CapnProtoUtils.h>
#include <Processors/Formats/IRowInputFormat.h>
#include <capnp/schema-parser.h>
namespace DB
{
@ -22,18 +22,7 @@ class ReadBuffer;
class CapnProtoRowInputFormat : public IRowInputFormat
{
public:
struct NestedField
{
std::vector<std::string> tokens;
size_t pos;
};
using NestedFieldList = std::vector<NestedField>;
/** schema_dir - base path for schema files
* schema_file - location of the capnproto schema, e.g. "schema.capnp"
* root_object - name to the root object, e.g. "Message"
*/
CapnProtoRowInputFormat(ReadBuffer & in_, Block header, Params params_, const FormatSchemaInfo & info);
CapnProtoRowInputFormat(ReadBuffer & in_, Block header, Params params_, const FormatSchemaInfo & info, const FormatSettings & format_settings_);
String getName() const override { return "CapnProtoRowInputFormat"; }
@ -42,34 +31,11 @@ public:
private:
kj::Array<capnp::word> readMessage();
// Build a traversal plan from a sorted list of fields
void createActions(const NestedFieldList & sorted_fields, capnp::StructSchema reader);
/* Action for state machine for traversing nested structures. */
using BlockPositionList = std::vector<size_t>;
struct Action
{
enum Type { POP, PUSH, READ };
Type type{};
capnp::StructSchema::Field field{};
BlockPositionList columns{};
};
// Wrapper for classes that could throw in destructor
// https://github.com/capnproto/capnproto/issues/553
template <typename T>
struct DestructorCatcher
{
T impl;
template <typename ... Arg>
DestructorCatcher(Arg && ... args) : impl(kj::fwd<Arg>(args)...) {}
~DestructorCatcher() noexcept try { } catch (...) { return; }
};
using SchemaParser = DestructorCatcher<capnp::SchemaParser>;
std::shared_ptr<SchemaParser> parser;
std::shared_ptr<CapnProtoSchemaParser> parser;
capnp::StructSchema root;
std::vector<Action> actions;
const FormatSettings format_settings;
DataTypes column_types;
Names column_names;
};
}

View File

@ -0,0 +1,251 @@
#include <Processors/Formats/Impl/CapnProtoRowOutputFormat.h>
#if USE_CAPNP
#include <Formats/CapnProtoUtils.h>
#include <Formats/FormatSettings.h>
#include <IO/WriteBuffer.h>
#include <capnp/dynamic.h>
#include <capnp/serialize-packed.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnLowCardinality.h>
#include <Columns/ColumnDecimal.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <forward_list>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// Bind the stream to the WriteBuffer that will receive the serialized bytes.
/// Only a reference is stored, so the buffer must outlive the stream.
CapnProtoOutputStream::CapnProtoOutputStream(WriteBuffer & out_)
    : out(out_)
{
}
/// kj::OutputStream interface: forward `size` bytes starting at `buffer` to the wrapped WriteBuffer.
void CapnProtoOutputStream::write(const void * buffer, size_t size)
{
    const auto * bytes = reinterpret_cast<const char *>(buffer);
    out.write(bytes, size);
}
/// Remember column names/types of the header, load the message schema described by `info`
/// and check that the schema is compatible with the header.
CapnProtoRowOutputFormat::CapnProtoRowOutputFormat(
    WriteBuffer & out_,
    const Block & header_,
    const RowOutputFormatParams & params_,
    const FormatSchemaInfo & info,
    const FormatSettings & format_settings_)
    : IRowOutputFormat(header_, out_, params_)
    , column_names(header_.getNames())
    , column_types(header_.getDataTypes())
    , output_stream(std::make_unique<CapnProtoOutputStream>(out_))
    , format_settings(format_settings_)
{
    schema = schema_parser.getMessageSchema(info);

    const auto & output_header = getPort(PortKind::Main).getHeader();
    const auto enum_mode = format_settings.capn_proto.enum_comparing_mode;
    checkCapnProtoSchemaStructure(schema, output_header, enum_mode);
}
/// Build the CapnProto enum value that corresponds to the ClickHouse Enum value at `row_num`.
///
/// With BY_VALUES the numeric ClickHouse value is used directly as the CapnProto enum value.
/// In the other modes the ClickHouse enum name is compared (via compareEnumNames) with the names
/// of the enumerants of `enum_schema`, and the first matching enumerant is returned.
///
/// Throws LOGICAL_ERROR if no enumerant matches.
template <typename EnumValue>
static capnp::DynamicEnum getDynamicEnum(
    const ColumnPtr & column,
    const DataTypePtr & data_type,
    size_t row_num,
    const capnp::EnumSchema & enum_schema,
    FormatSettings::EnumComparingMode mode)
{
    const auto * enum_data_type = assert_cast<const DataTypeEnum<EnumValue> *>(data_type.get());
    /// getInt() returns a wide integer; the column stores values of EnumValue, so the narrowing is intended.
    const auto enum_value = static_cast<EnumValue>(column->getInt(row_num));
    if (mode == FormatSettings::EnumComparingMode::BY_VALUES)
        return capnp::DynamicEnum(enum_schema, enum_value);

    auto enum_name = enum_data_type->getNameForValue(enum_value);
    for (const auto enumerant : enum_schema.getEnumerants())
    {
        if (compareEnumNames(String(enum_name), enumerant.getProto().getName(), mode))
            return capnp::DynamicEnum(enumerant);
    }

    throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot convert ClickHouse Enum value to CapnProto Enum");
}
/// Obtain a builder for `field` of `struct_builder`, initializing the field first when that is required:
/// list fields (the column is an Array) are initialized with the size of the array at `row_num`,
/// struct fields are initialized as empty structs, any other field is returned as is.
static capnp::DynamicValue::Builder initStructFieldBuilder(const ColumnPtr & column, size_t row_num, capnp::DynamicStruct::Builder & struct_builder, capnp::StructSchema::Field field)
{
    const auto * array_column = checkAndGetColumn<ColumnArray>(*column);
    if (array_column)
    {
        /// The array size is the difference between the offsets of this row and of the previous one.
        const auto & offsets = array_column->getOffsets();
        size_t size = offsets[row_num] - offsets[row_num - 1];
        return struct_builder.init(field, size);
    }

    if (!field.getType().isStruct())
        return struct_builder.get(field);

    return struct_builder.init(field);
}
static std::optional<capnp::DynamicValue::Reader> convertToDynamicValue(const ColumnPtr & column, const DataTypePtr & data_type, size_t row_num, capnp::DynamicValue::Builder builder, FormatSettings::EnumComparingMode enum_comparing_mode)
{
/// Here we don't do any types validation, because we did it in CapnProtoRowOutputFormat constructor.
if (data_type->lowCardinality())
{
const auto * lc_column = assert_cast<const ColumnLowCardinality *>(column.get());
const auto & dict_type = assert_cast<const DataTypeLowCardinality *>(data_type.get())->getDictionaryType();
size_t index = lc_column->getIndexAt(row_num);
return convertToDynamicValue(lc_column->getDictionary().getNestedColumn(), dict_type, index, builder, enum_comparing_mode);
}
switch (builder.getType())
{
case capnp::DynamicValue::Type::INT:
/// We allow output DateTime64 as Int64.
if (WhichDataType(data_type).isDateTime64())
return capnp::DynamicValue::Reader(assert_cast<const ColumnDecimal<DateTime64> *>(column.get())->getElement(row_num));
return capnp::DynamicValue::Reader(column->getInt(row_num));
case capnp::DynamicValue::Type::UINT:
return capnp::DynamicValue::Reader(column->getUInt(row_num));
case capnp::DynamicValue::Type::BOOL:
return capnp::DynamicValue::Reader(column->getBool(row_num));
case capnp::DynamicValue::Type::FLOAT:
return capnp::DynamicValue::Reader(column->getFloat64(row_num));
case capnp::DynamicValue::Type::ENUM:
{
auto enum_schema = builder.as<capnp::DynamicEnum>().getSchema();
if (data_type->getTypeId() == TypeIndex::Enum8)
return capnp::DynamicValue::Reader(
getDynamicEnum<Int8>(column, data_type, row_num, enum_schema, enum_comparing_mode));
return capnp::DynamicValue::Reader(
getDynamicEnum<Int16>(column, data_type, row_num, enum_schema, enum_comparing_mode));
}
case capnp::DynamicValue::Type::DATA:
{
auto data = column->getDataAt(row_num);
return capnp::DynamicValue::Reader(capnp::Data::Reader(reinterpret_cast<const kj::byte *>(data.data), data.size));
}
case capnp::DynamicValue::Type::TEXT:
{
auto data = String(column->getDataAt(row_num));
return capnp::DynamicValue::Reader(capnp::Text::Reader(data.data(), data.size()));
}
case capnp::DynamicValue::Type::STRUCT:
{
auto struct_builder = builder.as<capnp::DynamicStruct>();
auto nested_struct_schema = struct_builder.getSchema();
/// Struct can be represent Tuple or Naullable (named union with two fields)
if (data_type->isNullable())
{
const auto * nullable_type = assert_cast<const DataTypeNullable *>(data_type.get());
const auto * nullable_column = assert_cast<const ColumnNullable *>(column.get());
auto fields = nested_struct_schema.getUnionFields();
if (nullable_column->isNullAt(row_num))
{
auto null_field = fields[0].getType().isVoid() ? fields[0] : fields[1];
struct_builder.set(null_field, capnp::Void());
}
else
{
auto value_field = fields[0].getType().isVoid() ? fields[1] : fields[0];
struct_builder.clear(value_field);
const auto & nested_column = nullable_column->getNestedColumnPtr();
auto value_builder = initStructFieldBuilder(nested_column, row_num, struct_builder, value_field);
auto value = convertToDynamicValue(nested_column, nullable_type->getNestedType(), row_num, value_builder, enum_comparing_mode);
if (value)
struct_builder.set(value_field, std::move(*value));
}
}
else
{
const auto * tuple_data_type = assert_cast<const DataTypeTuple *>(data_type.get());
auto nested_types = tuple_data_type->getElements();
const auto & nested_columns = assert_cast<const ColumnTuple *>(column.get())->getColumns();
for (const auto & name : tuple_data_type->getElementNames())
{
auto pos = tuple_data_type->getPositionByName(name);
auto field_builder
= initStructFieldBuilder(nested_columns[pos], row_num, struct_builder, nested_struct_schema.getFieldByName(name));
auto value = convertToDynamicValue(nested_columns[pos], nested_types[pos], row_num, field_builder, enum_comparing_mode);
if (value)
struct_builder.set(name, std::move(*value));
}
}
return std::nullopt;
}
case capnp::DynamicValue::Type::LIST:
{
auto list_builder = builder.as<capnp::DynamicList>();
const auto * array_column = assert_cast<const ColumnArray *>(column.get());
const auto & nested_column = array_column->getDataPtr();
const auto & nested_type = assert_cast<const DataTypeArray *>(data_type.get())->getNestedType();
const auto & offsets = array_column->getOffsets();
auto offset = offsets[row_num - 1];
size_t size = offsets[row_num] - offset;
const auto * nested_array_column = checkAndGetColumn<ColumnArray>(*nested_column);
for (size_t i = 0; i != size; ++i)
{
capnp::DynamicValue::Builder value_builder;
/// For nested arrays we need to initialize nested list builder.
if (nested_array_column)
{
const auto & nested_offset = nested_array_column->getOffsets();
size_t nested_array_size = nested_offset[offset + i] - nested_offset[offset + i - 1];
value_builder = list_builder.init(i, nested_array_size);
}
else
value_builder = list_builder[i];
auto value = convertToDynamicValue(nested_column, nested_type, offset + i, value_builder, enum_comparing_mode);
if (value)
list_builder.set(i, std::move(*value));
}
return std::nullopt;
}
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected CapnProto type.");
}
}
void CapnProtoRowOutputFormat::write(const Columns & columns, size_t row_num)
{
capnp::MallocMessageBuilder message;
capnp::DynamicStruct::Builder root = message.initRoot<capnp::DynamicStruct>(schema);
for (size_t i = 0; i != columns.size(); ++i)
{
auto [struct_builder, field] = getStructBuilderAndFieldByColumnName(root, column_names[i]);
auto field_builder = initStructFieldBuilder(columns[i], row_num, struct_builder, field);
auto value = convertToDynamicValue(columns[i], column_types[i], row_num, field_builder, format_settings.capn_proto.enum_comparing_mode);
if (value)
struct_builder.set(field, std::move(*value));
}
capnp::writeMessage(*output_stream, message);
}
/// Register the "CapnProto" output format in the factory.
void registerOutputFormatProcessorsCapnProto(FormatFactory & factory)
{
    auto creator = [](
        WriteBuffer & buf,
        const Block & sample,
        const RowOutputFormatParams & params,
        const FormatSettings & format_settings)
    {
        /// The schema location is taken from the format settings; "CapnProto" is the format name.
        FormatSchemaInfo schema_info(format_settings, "CapnProto", true);
        return std::make_shared<CapnProtoRowOutputFormat>(buf, sample, params, schema_info, format_settings);
    };

    factory.registerOutputFormatProcessor("CapnProto", creator);
}
}
#else
/// Fallback used when the build has no CapnProto support (USE_CAPNP is not set).
namespace DB
{
class FormatFactory;
/// Does nothing: the CapnProto output format is not registered in this configuration.
void registerOutputFormatProcessorsCapnProto(FormatFactory &) {}
}
#endif // USE_CAPNP

View File

@ -0,0 +1,53 @@
#pragma once
#include "config_formats.h"
#if USE_CAPNP
#include <Processors/Formats/IRowOutputFormat.h>
#include <Formats/FormatSchemaInfo.h>
#include <Formats/CapnProtoUtils.h>
#include <capnp/schema.h>
#include <capnp/dynamic.h>
#include <kj/io.h>
namespace DB
{
/// Adapter that lets the CapnProto library write into a ClickHouse WriteBuffer
/// through the kj::OutputStream interface.
class CapnProtoOutputStream : public kj::OutputStream
{
public:
    /// Explicit, so that a WriteBuffer is never silently converted into a stream.
    /// Only a reference is stored: `out_` must outlive this object.
    explicit CapnProtoOutputStream(WriteBuffer & out_);

    /// Write `size` bytes starting at `buffer` into the wrapped WriteBuffer.
    void write(const void * buffer, size_t size) override;

private:
    WriteBuffer & out;
};
/// Row output format that serializes every row as a separate CapnProto message.
class CapnProtoRowOutputFormat : public IRowOutputFormat
{
public:
/// `info` describes where the CapnProto schema is located; `format_settings_` is copied into the format.
CapnProtoRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSchemaInfo & info,
const FormatSettings & format_settings_);
String getName() const override { return "CapnProtoRowOutputFormat"; }
/// Serializes the whole row at `row_num` at once.
void write(const Columns & columns, size_t row_num) override;
/// Intentionally empty: fields are not written one by one, see write().
void writeField(const IColumn &, const ISerialization &, size_t) override { }
private:
/// Names and types of the header columns, in header order.
Names column_names;
DataTypes column_types;
/// Schema of the root message; assigned in the constructor body from schema_parser.
capnp::StructSchema schema;
/// kj stream wrapper around the output WriteBuffer.
std::unique_ptr<CapnProtoOutputStream> output_stream;
const FormatSettings format_settings;
CapnProtoSchemaParser schema_parser;
};
}
#endif // USE_CAPNP

View File

@ -67,8 +67,7 @@ void registerInputFormatProtobuf(FormatFactory & factory)
const FormatSettings & settings)
{
return std::make_shared<ProtobufRowInputFormat>(buf, sample, std::move(params),
FormatSchemaInfo(settings.schema.format_schema, "Protobuf", true,
settings.schema.is_server, settings.schema.format_schema_path),
FormatSchemaInfo(settings, "Protobuf", true),
with_length_delimiter);
});
}

View File

@ -64,9 +64,7 @@ void registerOutputFormatProtobuf(FormatFactory & factory)
{
return std::make_shared<ProtobufRowOutputFormat>(
buf, header, params,
FormatSchemaInfo(settings.schema.format_schema, "Protobuf",
true, settings.schema.is_server,
settings.schema.format_schema_path),
FormatSchemaInfo(settings, "Protobuf", true),
settings,
with_length_delimiter);
});

View File

@ -0,0 +1,52 @@
-1 1 -1000 1000 -10000000 1000000 -1000000000 1000000000 123.123 123123123.12312312 Some string fixed Some data 2000-01-06 2000-06-01 19:42:42 2000-04-01 11:21:33.123
-1 1 -1000 1000 -10000000 1000000 -1000000000 1000000000 123.123 123123123.12312312 Some string fixed Some data 2000-01-06 2000-06-01 19:42:42 2000-04-01 11:21:33.123
1 (2,(3,4)) (((5)))
1 (2,(3,4)) (((5)))
1 [1,2,3] [[[1,2,3],[4,5,6]],[[7,8,9],[]],[]]
1 [1,2,3] [[[1,2,3],[4,5,6]],[[7,8,9],[]],[]]
1 ((2,[[3,4],[5,6],[]]),[([[(7,8),(9,10)],[(11,12),(13,14)],[]],[([15,16,17]),([])])])
1 ((2,[[3,4],[5,6],[]]),[([[(7,8),(9,10)],[(11,12),(13,14)],[]],[([15,16,17]),([])])])
[1,2,3] [[4,5,6],[],[7,8]] [(9,10),(11,12),(13,14)]
[1,2,3] [[4,5,6],[],[7,8]] [(9,10),(11,12),(13,14)]
1 [1,NULL,2] (1)
\N [NULL,NULL,42] (NULL)
1 [1,NULL,2] (1)
\N [NULL,NULL,42] (NULL)
one
two
tHrEe
oNe
tWo
threE
first
second
third
OK
OK
OK
OK
one two ['one',NULL,'two',NULL]
two \N [NULL]
one two ['one',NULL,'two',NULL]
two \N [NULL]
0 1 2
1 2 3
2 3 4
3 4 5
4 5 6
(0,(1,(2)))
(1,(2,(3)))
(2,(3,(4)))
(3,(4,(5)))
(4,(5,(6)))
OK
OK
OK
OK
OK
OK
OK
OK
OK
OK
OK

View File

@ -0,0 +1,109 @@
#!/usr/bin/env bash
# Tags: no-fasttest, no-parallel
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# Discover the server's user_files directory from the text of the error produced for a non-existent file.
USER_FILES_PATH=$(clickhouse-client --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}')
# Data file that is later read back through the file() table function.
CAPN_PROTO_FILE="$USER_FILES_PATH/data.capnp"
touch "$CAPN_PROTO_FILE"
# Discover the server's format schema directory from the text of the error produced for a non-existent schema.
SCHEMADIR=/$(clickhouse-client --query "select * from file('data.capnp', 'CapnProto', 'val1 char') settings format_schema='nonexist:Message'" 2>&1 | grep Exception | grep -oP "file \K.*(?=/nonexist.capnp)")
CLIENT_SCHEMADIR="$CURDIR/format_schemas"
SERVER_SCHEMADIR=test_02030
# Copy the schemas of this test into a dedicated subdirectory of the server schema directory.
# The paths are quoted so that directories containing spaces or glob characters are handled correctly;
# the glob itself stays outside of the quotes so that it is still expanded.
mkdir -p "$SCHEMADIR/$SERVER_SCHEMADIR"
cp -r "$CLIENT_SCHEMADIR"/02030_* "$SCHEMADIR/$SERVER_SCHEMADIR/"
# Simple types: write the table in CapnProto format and insert the output back into the same table,
# so the final SELECT prints the original row twice.
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS capnp_simple_types";
$CLICKHOUSE_CLIENT --query="CREATE TABLE capnp_simple_types (int8 Int8, uint8 UInt8, int16 Int16, uint16 UInt16, int32 Int32, uint32 UInt32, int64 Int64, uint64 UInt64, float32 Float32, float64 Float64, string String, fixed FixedString(5), data String, date Date, datetime DateTime, datetime64 DateTime64(3)) ENGINE=Memory"
$CLICKHOUSE_CLIENT --query="INSERT INTO capnp_simple_types values (-1, 1, -1000, 1000, -10000000, 1000000, -1000000000, 1000000000, 123.123, 123123123.123123123, 'Some string', 'fixed', 'Some data', '2000-01-06', '2000-06-01 19:42:42', '2000-04-01 11:21:33.123')"
$CLICKHOUSE_CLIENT --query="SELECT * FROM capnp_simple_types FORMAT CapnProto SETTINGS format_schema='$CLIENT_SCHEMADIR/02030_capnp_simple_types:Message'" | $CLICKHOUSE_CLIENT --query="INSERT INTO capnp_simple_types FORMAT CapnProto SETTINGS format_schema='$CLIENT_SCHEMADIR/02030_capnp_simple_types:Message'"
$CLICKHOUSE_CLIENT --query="SELECT * FROM capnp_simple_types"
$CLICKHOUSE_CLIENT --query="DROP TABLE capnp_simple_types"
# Named (and nested) Tuples: round trip through CapnProto structs.
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS capnp_tuples"
$CLICKHOUSE_CLIENT --query="CREATE TABLE capnp_tuples (value UInt64, tuple1 Tuple(one UInt64, two Tuple(three UInt64, four UInt64)), tuple2 Tuple(nested1 Tuple(nested2 Tuple(x UInt64)))) ENGINE=Memory";
$CLICKHOUSE_CLIENT --query="INSERT INTO capnp_tuples VALUES (1, (2, (3, 4)), (((5))))"
$CLICKHOUSE_CLIENT --query="SELECT * FROM capnp_tuples FORMAT CapnProto SETTINGS format_schema='$CLIENT_SCHEMADIR/02030_capnp_tuples:Message'" | $CLICKHOUSE_CLIENT --query="INSERT INTO capnp_tuples FORMAT CapnProto SETTINGS format_schema='$CLIENT_SCHEMADIR/02030_capnp_tuples:Message'"
$CLICKHOUSE_CLIENT --query="SELECT * FROM capnp_tuples"
$CLICKHOUSE_CLIENT --query="DROP TABLE capnp_tuples"
# Arrays, including nested and empty ones: round trip through CapnProto lists.
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS capnp_lists"
$CLICKHOUSE_CLIENT --query="CREATE TABLE capnp_lists (value UInt64, list1 Array(UInt64), list2 Array(Array(Array(UInt64)))) ENGINE=Memory";
$CLICKHOUSE_CLIENT --query="INSERT INTO capnp_lists VALUES (1, [1, 2, 3], [[[1, 2, 3], [4, 5, 6]], [[7, 8, 9], []], []])"
$CLICKHOUSE_CLIENT --query="SELECT * FROM capnp_lists FORMAT CapnProto SETTINGS format_schema='$CLIENT_SCHEMADIR/02030_capnp_lists:Message'" | $CLICKHOUSE_CLIENT --query="INSERT INTO capnp_lists FORMAT CapnProto SETTINGS format_schema='$CLIENT_SCHEMADIR/02030_capnp_lists:Message'"
$CLICKHOUSE_CLIENT --query="SELECT * FROM capnp_lists"
$CLICKHOUSE_CLIENT --query="DROP TABLE capnp_lists"
# Arrays of Tuples and Tuples of Arrays mixed at several nesting levels.
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS capnp_nested_lists_and_tuples"
$CLICKHOUSE_CLIENT --query="CREATE TABLE capnp_nested_lists_and_tuples (value UInt64, nested Tuple(a Tuple(b UInt64, c Array(Array(UInt64))), d Array(Tuple(e Array(Array(Tuple(f UInt64, g UInt64))), h Array(Tuple(k Array(UInt64))))))) ENGINE=Memory";
$CLICKHOUSE_CLIENT --query="INSERT INTO capnp_nested_lists_and_tuples VALUES (1, ((2, [[3, 4], [5, 6], []]), [([[(7, 8), (9, 10)], [(11, 12), (13, 14)], []], [([15, 16, 17]), ([])])]))"
$CLICKHOUSE_CLIENT --query="SELECT * FROM capnp_nested_lists_and_tuples FORMAT CapnProto SETTINGS format_schema='$CLIENT_SCHEMADIR/02030_capnp_nested_lists_and_tuples:Message'" | $CLICKHOUSE_CLIENT --query="INSERT INTO capnp_nested_lists_and_tuples FORMAT CapnProto SETTINGS format_schema='$CLIENT_SCHEMADIR/02030_capnp_nested_lists_and_tuples:Message'"
$CLICKHOUSE_CLIENT --query="SELECT * FROM capnp_nested_lists_and_tuples"
$CLICKHOUSE_CLIENT --query="DROP TABLE capnp_nested_lists_and_tuples"
# Nested column: round trip through a CapnProto struct whose fields are lists.
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS capnp_nested_table"
$CLICKHOUSE_CLIENT --query="CREATE TABLE capnp_nested_table (nested Nested(value UInt64, array Array(UInt64), tuple Tuple(one UInt64, two UInt64))) ENGINE=Memory";
$CLICKHOUSE_CLIENT --query="INSERT INTO capnp_nested_table VALUES ([1, 2, 3], [[4, 5, 6], [], [7, 8]], [(9, 10), (11, 12), (13, 14)])"
$CLICKHOUSE_CLIENT --query="SELECT * FROM capnp_nested_table FORMAT CapnProto SETTINGS format_schema='$CLIENT_SCHEMADIR/02030_capnp_nested_table:Message'" | $CLICKHOUSE_CLIENT --query="INSERT INTO capnp_nested_table FORMAT CapnProto SETTINGS format_schema='$CLIENT_SCHEMADIR/02030_capnp_nested_table:Message'"
$CLICKHOUSE_CLIENT --query="SELECT * FROM capnp_nested_table"
$CLICKHOUSE_CLIENT --query="DROP TABLE capnp_nested_table"
# Nullable values at top level, inside an Array and inside a Tuple, with both NULL and non-NULL rows.
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS capnp_nullable"
$CLICKHOUSE_CLIENT --query="CREATE TABLE capnp_nullable (nullable Nullable(UInt64), array Array(Nullable(UInt64)), tuple Tuple(nullable Nullable(UInt64))) ENGINE=Memory";
$CLICKHOUSE_CLIENT --query="INSERT INTO capnp_nullable VALUES (1, [1, Null, 2], (1)), (Null, [Null, Null, 42], (Null))"
$CLICKHOUSE_CLIENT --query="SELECT * FROM capnp_nullable FORMAT CapnProto SETTINGS format_schema='$CLIENT_SCHEMADIR/02030_capnp_nullable:Message'" | $CLICKHOUSE_CLIENT --query="INSERT INTO capnp_nullable FORMAT CapnProto SETTINGS format_schema='$CLIENT_SCHEMADIR/02030_capnp_nullable:Message'"
$CLICKHOUSE_CLIENT --query="SELECT * FROM capnp_nullable"
$CLICKHOUSE_CLIENT --query="DROP TABLE capnp_nullable"
# Enums: write three values once, then read them back with every enum comparing mode.
# The redirect target is quoted so that a path with spaces does not produce an ambiguous redirect.
$CLICKHOUSE_CLIENT --query="SELECT CAST(number, 'Enum(\'one\' = 0, \'two\' = 1, \'tHrEe\' = 2)') AS value FROM numbers(3) FORMAT CapnProto SETTINGS format_schema='$CLIENT_SCHEMADIR/02030_capnp_enum:Message'" > "$CAPN_PROTO_FILE"
# Successful reads: same names with other values, names differing only in case, same values with other names.
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'value Enum(\'one\' = 1, \'two\' = 2, \'tHrEe\' = 3)') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_names'"
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'value Enum(\'oNe\' = 1, \'tWo\' = 2, \'threE\' = 3)') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_names_case_insensitive'"
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'value Enum(\'first\' = 0, \'second\' = 1, \'third\' = 2)') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_values'"
# Reads that must fail with CAPN_PROTO_BAD_CAST because the ClickHouse Enum does not match the schema in the given mode.
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'value Enum(\'one\' = 0, \'two\' = 1, \'three\' = 2)') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_names'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'value Enum(\'one\' = 0, \'two\' = 1, \'tHrEe\' = 2, \'four\' = 3)') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_names'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'value Enum(\'one\' = 1, \'two\' = 2, \'tHrEe\' = 3)') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_values'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'value Enum(\'first\' = 1, \'two\' = 2, \'three\' = 3)') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_enum:Message', format_capn_proto_enum_comparising_mode='by_names_case_insensitive'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL';
# LowCardinality columns, also combined with Nullable and Array.
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS capnp_low_cardinality"
$CLICKHOUSE_CLIENT --query="CREATE TABLE capnp_low_cardinality (lc1 LowCardinality(String), lc2 LowCardinality(Nullable(String)), lc3 Array(LowCardinality(Nullable(String)))) ENGINE=Memory"
$CLICKHOUSE_CLIENT --query="INSERT INTO capnp_low_cardinality VALUES ('one', 'two', ['one', Null, 'two', Null]), ('two', Null, [Null])"
$CLICKHOUSE_CLIENT --query="SELECT * FROM capnp_low_cardinality FORMAT CapnProto SETTINGS format_schema='$CLIENT_SCHEMADIR/02030_capnp_low_cardinality:Message'" | $CLICKHOUSE_CLIENT --query="INSERT INTO capnp_low_cardinality FORMAT CapnProto SETTINGS format_schema='$CLIENT_SCHEMADIR/02030_capnp_low_cardinality:Message'"
$CLICKHOUSE_CLIENT --query="SELECT * FROM capnp_low_cardinality"
$CLICKHOUSE_CLIENT --query="DROP TABLE capnp_low_cardinality"
# Nested structs addressed either as one nested Tuple or as flattened column names (a_b, a_c_d, ...).
# Written as a Tuple and read as flattened columns, then the other way round.
# The redirect target is quoted so that a path with spaces does not produce an ambiguous redirect.
$CLICKHOUSE_CLIENT --query="SELECT CAST(tuple(number, tuple(number + 1, tuple(number + 2))), 'Tuple(b UInt64, c Tuple(d UInt64, e Tuple(f UInt64)))') AS a FROM numbers(5) FORMAT CapnProto SETTINGS format_schema='$CLIENT_SCHEMADIR/02030_capnp_nested_tuples:Message'" > "$CAPN_PROTO_FILE"
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'a_b UInt64, a_c_d UInt64, a_c_e_f UInt64') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_nested_tuples:Message'"
$CLICKHOUSE_CLIENT --query="SELECT number AS a_b, number + 1 AS a_c_d, number + 2 AS a_c_e_f FROM numbers(5) FORMAT CapnProto SETTINGS format_schema='$CLIENT_SCHEMADIR/02030_capnp_nested_tuples:Message'" > "$CAPN_PROTO_FILE"
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'a Tuple(b UInt64, c Tuple(d UInt64, e Tuple(f UInt64)))') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_nested_tuples:Message'"
# Tuple element names that do not exist in the schema must be rejected.
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'a Tuple(bb UInt64, c Tuple(d UInt64, e Tuple(f UInt64)))') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_nested_tuples:Message'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'a Tuple(b UInt64, c Tuple(d UInt64, e Tuple(ff UInt64)))') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_nested_tuples:Message'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL';
# Type mismatches: a UInt64 field read as an incompatible ClickHouse type must fail with CAPN_PROTO_BAD_CAST.
# The redirect target is quoted so that a path with spaces does not produce an ambiguous redirect.
$CLICKHOUSE_CLIENT --query="SELECT number AS uint64 FROM numbers(5) FORMAT CapnProto SETTINGS format_schema='$CLIENT_SCHEMADIR/02030_capnp_simple_types:Message'" > "$CAPN_PROTO_FILE"
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'uint64 String') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_simple_types:Message'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'uint64 Array(UInt64)') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_simple_types:Message'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'uint64 Enum(\'one\' = 1)') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_simple_types:Message'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'uint64 Tuple(UInt64)') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_simple_types:Message'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'uint64 Nullable(UInt64)') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_simple_types:Message'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.capnp', 'CapnProto', 'uint64 Int32') SETTINGS format_schema='$SERVER_SCHEMADIR/02030_capnp_simple_types:Message'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL';
# Output into schemas that cannot represent the columns: an unnamed union, and structs that only look like a Nullable.
$CLICKHOUSE_CLIENT --query="SELECT number AS a, toString(number) as b FROM numbers(5) FORMAT CapnProto SETTINGS format_schema='$CLIENT_SCHEMADIR/02030_capnp_unnamed_union:Message'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="SELECT toNullable(toString(number)) as nullable1 FROM numbers(5) FORMAT CapnProto SETTINGS format_schema='$CLIENT_SCHEMADIR/02030_capnp_fake_nullable:Message'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="SELECT toNullable(toString(number)) as nullable2 FROM numbers(5) FORMAT CapnProto SETTINGS format_schema='$CLIENT_SCHEMADIR/02030_capnp_fake_nullable:Message'" 2>&1 | grep -F -q "CAPN_PROTO_BAD_CAST" && echo 'OK' || echo 'FAIL';
# Remove the data file and the schemas that were copied to the server.
# Arguments are quoted, and ${SERVER_SCHEMADIR:?} aborts instead of removing the whole
# schema directory if the subdirectory name is ever empty.
rm "$CAPN_PROTO_FILE"
rm -rf "$SCHEMADIR/${SERVER_SCHEMADIR:?}"

View File

@ -0,0 +1,13 @@
@0x9ef128e10a8010b2;

struct Message {
  # Root struct with a single enum-typed field.
  # NOTE(review): presumably paired with the EnumComparingMode setting
  # (by_names / by_values / by_names_case_insensitive) — confirm against the test script.
  value @0 :EnumType;

  enum EnumType {
    one @0;
    two @1;
    tHrEe @2;
    # NOTE(review): the mixed-case spelling looks deliberate (case-insensitive name
    # matching); do not normalize it without checking the test that uses it.
  }
}

View File

@ -0,0 +1,23 @@
@0xd8dd7b35452d1c4c;

struct FakeNullable1 {
  # Unnamed union with three members: value, null and an extra Text member `trash`.
  # NOTE(review): presumably "fake" because a Nullable mapping expects exactly a
  # value/null pair — confirm against the format implementation.
  union {
    value @0 :Text;
    null @1 :Void;
    trash @2 :Text;
  }
}

struct FakeNullable2 {
  # Same field names as a value/null pair, but declared as ordinary struct fields
  # rather than as members of a union.
  value @0 :Text;
  null @1 :Void;
}

struct Message {
  # Root struct: one field per "fake nullable" variant.
  nullable1 @0 :FakeNullable1;
  nullable2 @1 :FakeNullable2;
}

View File

@ -0,0 +1,8 @@
@0x9ef128e10a8010b7;

struct Message {
  # Root struct: a scalar UInt64, a flat list and a triply nested list of UInt64.
  value @0 :UInt64;
  list1 @1 :List(UInt64);
  list2 @2 :List(List(List(UInt64)));
}

View File

@ -0,0 +1,17 @@
@0xc5a2f1937be4d806;
# File ID. Every .capnp file must carry its own unique 64-bit ID (normally produced by
# `capnp id`). The previous value, 0x9ef128e10a8010b7, was also used by another schema
# file of this test set, which makes the two files impossible to compile or load together.

struct NullableText {
  # Text-or-null, expressed as an unnamed union of a Text value and a Void marker.
  union {
    value @0 :Text;
    null @1 :Void;
  }
}

struct Message {
  # Root struct: plain Text, a nullable Text and a list of nullable Text.
  # NOTE(review): the `lc*` names suggest LowCardinality columns on the ClickHouse side —
  # confirm against the test script.
  lc1 @0 :Text;
  lc2 @1 :NullableText;
  lc3 @2 :List(NullableText);
}

View File

@ -0,0 +1,36 @@
@0xb9e04d6a21c87f53;
# File ID. Every .capnp file must carry its own unique 64-bit ID (normally produced by
# `capnp id`). The previous value, 0x9ef128e10a8010b2, was also used by other schema
# files of this test set, which makes those files impossible to compile or load together.

struct Nested1 {
  # A scalar plus a list of lists of UInt64.
  b @0 :UInt64;
  c @1 :List(List(UInt64));
}

struct Nested2 {
  # Lists whose elements are themselves structs (Nested3 / Nested4, declared below).
  e @0 :List(List(Nested3));
  h @1 :List(Nested4);
}

struct Nested3 {
  f @0 :UInt64;
  g @1 :UInt64;
}

struct Nested4 {
  k @0 :List(UInt64);
}

struct Nested {
  # Combines a struct-typed field with a list of structs.
  a @0 :Nested1;
  d @1 :List(Nested2);
}

struct Message {
  # Root struct: a scalar next to the nested hierarchy above.
  value @0 :UInt64;
  nested @1 :Nested;
}

View File

@ -0,0 +1,20 @@
@0x9ef128e10a8010b3;

struct Nested1 {
  # Element type of Nested.tuple: two UInt64 members.
  one @0 :UInt64;
  two @1 :UInt64;
}

struct Nested {
  # Every field is a list: of scalars, of lists, and of Nested1 structs.
  # NOTE(review): presumably mirrors the sub-columns of a ClickHouse Nested column — confirm.
  value @0 :List(UInt64);
  array @1 :List(List(UInt64));
  tuple @2 :List(Nested1);
}

struct Message {
  # Root struct with the single struct-typed field `nested`.
  nested @0 :Nested;
}

View File

@ -0,0 +1,23 @@
@0x9ef128e12a8010b2;

struct Nested1 {
  # Middle level: a scalar plus a further nested struct (Nested2, declared below).
  d @0 :UInt64;
  e @1 :Nested2;
}

struct Nested2 {
  # Innermost level: a single scalar.
  f @0 :UInt64;
}

struct Nested {
  # Outer level: a scalar plus the Nested1 chain.
  b @0 :UInt64;
  c @1 :Nested1;
}

struct Message {
  # Root struct; the structs nest as a -> c -> e, with a UInt64 at each level (b, d, f).
  a @0 :Nested;
}

View File

@ -0,0 +1,22 @@
@0xe7314ab08c5d29f6;
# File ID. Every .capnp file must carry its own unique 64-bit ID (normally produced by
# `capnp id`). The previous value, 0x9ef128e10a8010b2, was also used by other schema
# files of this test set, which makes those files impossible to compile or load together.

struct NullableUInt64 {
  # UInt64-or-null, expressed as an unnamed union of a UInt64 value and a Void marker.
  union {
    value @0 :UInt64;
    null @1 :Void;
  }
}

struct Tuple {
  # Struct wrapper holding one nullable member.
  nullable @0 :NullableUInt64;
}

struct Message {
  # Root struct: the nullable type used directly, as a list element and inside a struct.
  nullable @0 :NullableUInt64;
  array @1 :List(NullableUInt64);
  tuple @2 :Tuple;
}

View File

@ -0,0 +1,21 @@
@0xd9dd7b35452d1c4f;

struct Message {
  # Root struct with one field per primitive type.

  # Signed / unsigned integers of every width.
  int8 @0 :Int8;
  uint8 @1 :UInt8;
  int16 @2 :Int16;
  uint16 @3 :UInt16;
  int32 @4 :Int32;
  uint32 @5 :UInt32;
  int64 @6 :Int64;
  uint64 @7 :UInt64;

  # Floating point.
  float32 @8 :Float32;
  float64 @9 :Float64;

  # Textual and binary payloads.
  # NOTE(review): `fixed` presumably maps to a ClickHouse FixedString column — confirm.
  string @10 :Text;
  fixed @11 :Text;
  data @12 :Data;

  # Declared here as plain integers.
  # NOTE(review): names suggest ClickHouse Date / DateTime / DateTime64 columns — confirm.
  date @13 :UInt16;
  datetime @14 :UInt32;
  datetime64 @15 :Int64;
}

View File

@ -0,0 +1,35 @@
@0x9ef128e10a8010b8;

struct Nested5 {
  # Innermost struct of the tuple2 chain.
  x @0 :UInt64;
}

struct Nested4 {
  nested2 @0 :Nested5;
}

struct Nested3 {
  # Type of Message.tuple2: wraps Nested4, which wraps Nested5.
  nested1 @0 :Nested4;
}

struct Nested2 {
  # Two scalars; used as Nested1.two.
  three @0 :UInt64;
  four @1 :UInt64;
}

struct Nested1 {
  # Type of Message.tuple1: a scalar plus a nested struct.
  one @0 :UInt64;
  two @1 :Nested2;
}

struct Message {
  # Root struct: a scalar and two struct-typed fields of different nesting depth.
  value @0 :UInt64;
  tuple1 @1 :Nested1;
  tuple2 @2 :Nested3;
}

View File

@ -0,0 +1,10 @@
@0xd8dd7b35452d1c4f;

struct Message {
  # Root struct whose only content is an unnamed union, so `a` and `b` are alternatives
  # rather than two independently settable fields.
  union {
    a @0 :UInt64;
    b @1 :Text;
  }
}