Compare commits

...

6 Commits

Author SHA1 Message Date
Eliot Hautefeuille
ca916a5082
Merge 167196bdd2 into 7fd2207626 2024-09-18 14:43:15 +02:00
Eliot Hautefeuille
167196bdd2
fix: formats: protobuf: serde: fix client call, target correct db 2024-09-17 23:37:31 +02:00
Eliot Hautefeuille
a44d6021df
fix: formats: protobuf: serde: formats appear disabled in fasttest 2024-09-17 21:47:25 +02:00
Eliot Hautefeuille
93ae7957b7
fix: formats: protobuf: serde: pre-create format schema files folder 2024-09-17 21:23:56 +02:00
Eliot Hautefeuille
2cef7f79fc
fix: formats: protobuf: serde: fix style 2024-09-17 20:58:12 +02:00
Eliot Hautefeuille
df482bbbba
fix: protobuf: serde: exception bypass for canonical repeated nested 2024-09-11 19:37:18 +02:00
7 changed files with 259 additions and 105 deletions

View File

@ -3322,124 +3322,138 @@ namespace
}
}
/// Complex case: one or more columns are serialized as a nested message.
for (const auto & [field_descriptor, suffix] : field_descriptors_with_suffixes)
{
if (!suffix.empty())
if (suffix.empty())
continue;
std::vector<size_t> nested_column_indices;
std::vector<std::string_view> nested_column_names;
nested_column_indices.reserve(num_columns - used_column_indices.size());
nested_column_names.reserve(num_columns - used_column_indices.size());
nested_column_indices.push_back(column_idx);
nested_column_names.push_back(suffix);
for (size_t j : collections::range(column_idx + 1, num_columns))
{
/// Complex case: one or more columns are serialized as a nested message.
std::vector<size_t> nested_column_indices;
std::vector<std::string_view> nested_column_names;
nested_column_indices.reserve(num_columns - used_column_indices.size());
nested_column_names.reserve(num_columns - used_column_indices.size());
nested_column_indices.push_back(column_idx);
nested_column_names.push_back(suffix);
if (used_column_indices_sorted.count(j))
continue;
std::string_view other_suffix;
if (!columnNameStartsWithFieldName(column_names[j], *field_descriptor, other_suffix))
continue;
nested_column_indices.push_back(j);
nested_column_names.push_back(other_suffix);
}
for (size_t j : collections::range(column_idx + 1, num_columns))
DataTypes nested_data_types;
nested_data_types.reserve(nested_column_indices.size());
for (size_t j : nested_column_indices)
nested_data_types.push_back(data_types[j]);
/// Now we have up to `nested_column_names.size()` columns
/// which can be serialized as one or more nested messages.
/// If the field is repeated and ALL matching columns are arrays, we serialize them as an array of nested messages.
/// Otherwise, we first try to serialize those columns as one nested message,
/// then, if that fails, as an array of nested messages (provided some of those columns are arrays).
bool repeated_field_matching_nested_columns_are_all_arrays = false;
bool repeated_field_matching_nested_columns_have_some_arrays = false;
if (field_descriptor->is_repeated())
{
repeated_field_matching_nested_columns_are_all_arrays = true;
for (const auto & nested_data_type : nested_data_types)
{
if (used_column_indices_sorted.count(j))
continue;
std::string_view other_suffix;
if (!columnNameStartsWithFieldName(column_names[j], *field_descriptor, other_suffix))
continue;
nested_column_indices.push_back(j);
nested_column_names.push_back(other_suffix);
if (nested_data_type->getTypeId() == TypeIndex::Array)
repeated_field_matching_nested_columns_have_some_arrays = true;
else
repeated_field_matching_nested_columns_are_all_arrays = false;
}
}
std::vector<size_t> used_column_indices_in_nested;
auto attempt_build_serializer = [&](const DataTypes & passed_nested_data_types)
{
return buildMessageSerializerImpl(
nested_column_names.size(),
nested_column_names.data(),
passed_nested_data_types.data(),
*field_descriptor->message_type(),
/* with_length_delimiter = */ false,
google_wrappers_special_treatment,
field_descriptor,
used_column_indices_in_nested,
/* columns_are_reordered_outside = */ true,
/* check_nested_while_filling_missing_columns = */ false);
/// `columns_are_reordered_outside` is true because column indices are
/// going to be transformed and then written to the outer message,
/// see next calls to add_field_serializer() further below.
};
auto attempt_unwrap_and_build_array_serializer = [&]()
{
DataTypes unwrapped_nested_data_types;
unwrapped_nested_data_types.reserve(nested_data_types.size());
for (DataTypePtr & dt : nested_data_types)
unwrapped_nested_data_types.push_back(assert_cast<const DataTypeArray &>(*dt).getNestedType());
if (auto serializer = attempt_build_serializer(unwrapped_nested_data_types))
{
std::vector<std::string_view> column_names_used;
column_names_used.reserve(used_column_indices_in_nested.size());
for (const size_t i : used_column_indices_in_nested)
column_names_used.emplace_back(nested_column_names[i]);
auto array_serializer = std::make_unique<ProtobufSerializerFlattenedNestedAsArrayOfNestedMessages>(
std::move(column_names_used), field_descriptor, std::move(serializer), get_root_desc_function);
transformColumnIndices(used_column_indices_in_nested, nested_column_indices);
add_field_serializer(column_name,std::move(used_column_indices_in_nested), *field_descriptor, std::move(array_serializer));
return true;
}
DataTypes nested_data_types;
nested_data_types.reserve(nested_column_indices.size());
for (size_t j : nested_column_indices)
nested_data_types.push_back(data_types[j]);
return false;
};
/// Now we have up to `nested_message_column_names.size()` columns
/// which can be serialized as a nested message.
/// if the protobuf field has the repeated label,
/// for ALL matching nested cols, since they are all of type array
/// try as ProtobufSerializerFlattenedNestedAsArrayOfNestedMessages
if (repeated_field_matching_nested_columns_are_all_arrays)
{
if (attempt_unwrap_and_build_array_serializer())
break;
}
/// We will try to serialize those columns as one nested message,
/// then, if failed, as an array of nested messages (on condition if those columns are array).
bool has_fallback_to_array_of_nested_messages = false;
if (field_descriptor->is_repeated())
/// for ALL matching nested cols
/// try as ProtobufSerializerMessage
try
{
if (auto serializer = attempt_build_serializer(nested_data_types))
{
bool has_arrays
= boost::range::find_if(
nested_data_types, [](const DataTypePtr & dt) { return (dt->getTypeId() == TypeIndex::Array); })
!= nested_data_types.end();
if (has_arrays)
has_fallback_to_array_of_nested_messages = true;
transformColumnIndices(used_column_indices_in_nested, nested_column_indices);
add_field_serializer(column_name,std::move(used_column_indices_in_nested), *field_descriptor, std::move(serializer));
break;
}
}
/// Try to serialize those columns as one nested message.
try
{
std::vector<size_t> used_column_indices_in_nested;
auto nested_message_serializer = buildMessageSerializerImpl(
nested_column_names.size(),
nested_column_names.data(),
nested_data_types.data(),
*field_descriptor->message_type(),
/* with_length_delimiter = */ false,
google_wrappers_special_treatment,
field_descriptor,
used_column_indices_in_nested,
/* columns_are_reordered_outside = */ true,
/* check_nested_while_filling_missing_columns = */ false);
catch (Exception & e)
{
if ((e.code() != ErrorCodes::PROTOBUF_FIELD_NOT_REPEATED) || !repeated_field_matching_nested_columns_have_some_arrays)
throw;
}
/// `columns_are_reordered_outside` is true because column indices are
/// going to be transformed and then written to the outer message,
/// see add_field_serializer() below.
if (nested_message_serializer)
{
transformColumnIndices(used_column_indices_in_nested, nested_column_indices);
add_field_serializer(
column_name,
std::move(used_column_indices_in_nested),
*field_descriptor,
std::move(nested_message_serializer));
break;
}
}
catch (Exception & e)
{
if ((e.code() != ErrorCodes::PROTOBUF_FIELD_NOT_REPEATED) || !has_fallback_to_array_of_nested_messages)
throw;
}
if (has_fallback_to_array_of_nested_messages)
{
/// Try to serialize those columns as an array of nested messages.
removeNonArrayElements(nested_data_types, nested_column_names, nested_column_indices);
for (DataTypePtr & dt : nested_data_types)
dt = assert_cast<const DataTypeArray &>(*dt).getNestedType();
std::vector<size_t> used_column_indices_in_nested;
auto nested_message_serializer = buildMessageSerializerImpl(
nested_column_names.size(),
nested_column_names.data(),
nested_data_types.data(),
*field_descriptor->message_type(),
/* with_length_delimiter = */ false,
google_wrappers_special_treatment,
field_descriptor,
used_column_indices_in_nested,
/* columns_are_reordered_outside = */ true,
/* check_nested_while_filling_missing_columns = */ false);
/// `columns_are_reordered_outside` is true because column indices are
/// going to be transformed and then written to the outer message,
/// see add_field_serializer() below.
if (nested_message_serializer)
{
std::vector<std::string_view> column_names_used;
column_names_used.reserve(used_column_indices_in_nested.size());
for (size_t i : used_column_indices_in_nested)
column_names_used.emplace_back(nested_column_names[i]);
auto field_serializer = std::make_unique<ProtobufSerializerFlattenedNestedAsArrayOfNestedMessages>(
std::move(column_names_used), field_descriptor, std::move(nested_message_serializer), get_root_desc_function);
transformColumnIndices(used_column_indices_in_nested, nested_column_indices);
add_field_serializer(column_name, std::move(used_column_indices_in_nested), *field_descriptor, std::move(field_serializer));
break;
}
}
/// if the protobuf field has the repeated label,
/// only for the SUBSET of matching nested cols that are of type Array,
/// try as ProtobufSerializerFlattenedNestedAsArrayOfNestedMessages
if (repeated_field_matching_nested_columns_have_some_arrays)
{
removeNonArrayElements(nested_data_types, nested_column_names, nested_column_indices);
if (attempt_unwrap_and_build_array_serializer())
break;
}
}
}

View File

@ -0,0 +1,3 @@
{"error_not_raised":1}
{"i":1001,"j.k":[2101,2102],"j.l":[2201,2202],"m":3001,"n":[4001,4002,4003,4004],"o.key":[5001,5002],"o.value":[{"p":[{"q":5111,"r":5121}]},{"p":[{"q":5112,"r":5122},{"q":5113,"r":5123}]}]}
{"i":6001,"j.k":[7101],"j.l":[7201],"m":8001,"n":[],"o.key":[9001],"o.value":[{"p":[{"q":10111,"r":10121}]}]}

View File

@ -0,0 +1,50 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh
# Scenario: write two rows into a File(Protobuf) table whose schema combines a
# repeated nested message, a repeated scalar and a map of nested messages, read
# them back, and check that the PROTOBUF_FIELD_NOT_REPEATED counter in
# system.errors holds the same value before and after the insert.
SCHEMA_NAME="03234_proto_complex_nested_repeated_noexception.proto"
SCHEMA_ORIGIN="${CURDIR}/format_schemas/${SCHEMA_NAME}"
SCHEMA_INSTALLED="${CLICKHOUSE_SCHEMA_FILES}/${SCHEMA_NAME}"
# The schema directory may not exist yet, so create it before copying the schema in.
mkdir -p "${CLICKHOUSE_SCHEMA_FILES}"
cp "${SCHEMA_ORIGIN}" "${SCHEMA_INSTALLED}"
# Quoted here-document: the SQL below (backticks included) is passed to the client verbatim.
$CLICKHOUSE_CLIENT -mn <<'EOF'
DROP TABLE IF EXISTS exception_counter ;
CREATE TABLE exception_counter (`val` UInt32) ENGINE = Memory ;
INSERT INTO exception_counter SELECT sum(value) FROM system.errors WHERE name = 'PROTOBUF_FIELD_NOT_REPEATED' ;
DROP TABLE IF EXISTS table_file ;
CREATE TABLE table_file (
`i` UInt32,
`j.k` Array(UInt32),
`j.l` Array(UInt32),
`m` UInt32,
`n` Array(UInt32),
`o` Nested(
`key` UInt32,
`value` Tuple(
`p` Nested(
`q` UInt32,
`r` UInt32
)
)
)
) ENGINE File(Protobuf) SETTINGS format_schema = '03234_proto_complex_nested_repeated_noexception.proto:A' ;
INSERT INTO table_file VALUES
( 1001, [2101, 2102], [2201, 2202], 3001, [4001, 4002, 4003, 4004], [5001,5002] , [ ([(5111,5121)]), ([(5112,5122),(5113,5123)]) ] ),
( 6001, [7101], [7201], 8001, [], [9001] , [ ([(10111,10121)]) ] ) ;
INSERT INTO exception_counter SELECT sum(value) FROM system.errors WHERE name = 'PROTOBUF_FIELD_NOT_REPEATED' ;
SELECT min(val) == max(val) as error_not_raised FROM exception_counter FORMAT JSONEachRow ;
SELECT * FROM table_file FORMAT JSONEachRow ;
DROP TABLE exception_counter ;
DROP TABLE table_file ;
EOF
# Remove the schema copy made above.
rm -f "${SCHEMA_INSTALLED}"

View File

@ -0,0 +1,5 @@
{"error_not_raised":1}
{"u":1001,"v.w":[],"v.x":[],"v.y":[],"v.z":[]}
{"u":2002,"v.w":[2102],"v.x":[2202],"v.y":[[2302]],"v.z":[[2402,2403]]}
{"u":3003,"v.w":[3103,3104],"v.x":[3203,3204],"v.y":[[3303],[3304]],"v.z":[[3403,3404],[3405,3406]]}
{"u":4004,"v.w":[4104,4105,4106],"v.x":[4204,4205,4206],"v.y":[[4304],[4305],[4306]],"v.z":[[4304,4305],[4306,4307],[4308,4309]]}

View File

@ -0,0 +1,44 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh
# Scenario: write four rows into a File(Protobuf) table whose `v.*` columns are
# arrays (two of them arrays of arrays), read them back, and check that the
# PROTOBUF_FIELD_NOT_REPEATED counter in system.errors holds the same value
# before and after the insert.
SCHEMA_NAME="03234_proto_simple_nested_repeated_noexception.proto"
SCHEMA_ORIGIN="${CURDIR}/format_schemas/${SCHEMA_NAME}"
SCHEMA_INSTALLED="${CLICKHOUSE_SCHEMA_FILES}/${SCHEMA_NAME}"
# The schema directory may not exist yet, so create it before copying the schema in.
mkdir -p "${CLICKHOUSE_SCHEMA_FILES}"
cp "${SCHEMA_ORIGIN}" "${SCHEMA_INSTALLED}"
# Quoted here-document: the SQL below (backticks included) is passed to the client verbatim.
$CLICKHOUSE_CLIENT -mn <<'EOF'
DROP TABLE IF EXISTS exception_counter ;
CREATE TABLE exception_counter (`val` UInt32) ENGINE = Memory ;
INSERT INTO exception_counter SELECT sum(value) FROM system.errors WHERE name = 'PROTOBUF_FIELD_NOT_REPEATED' ;
DROP TABLE IF EXISTS table_file ;
CREATE TABLE table_file (
`u` UInt32,
`v.w` Array(UInt32),
`v.x` Array(UInt32),
`v.y` Array(Array(UInt32)),
`v.z` Array(Array(UInt32))
) ENGINE File(Protobuf) SETTINGS format_schema = '03234_proto_simple_nested_repeated_noexception.proto:M' ;
INSERT INTO table_file VALUES
( 1001, [], [], [], []),
( 2002, [2102], [2202], [[2302]], [[2402, 2403]]),
( 3003, [3103, 3104], [3203, 3204], [[3303], [3304]], [[3403, 3404], [3405, 3406]]),
( 4004, [4104, 4105, 4106], [4204, 4205, 4206], [[4304], [4305], [4306]], [[4304, 4305], [4306, 4307], [4308, 4309]]);
INSERT INTO exception_counter SELECT sum(value) FROM system.errors WHERE name = 'PROTOBUF_FIELD_NOT_REPEATED' ;
SELECT min(val) == max(val) as error_not_raised FROM exception_counter FORMAT JSONEachRow ;
SELECT * FROM table_file FORMAT JSONEachRow ;
DROP TABLE exception_counter ;
DROP TABLE table_file ;
EOF
# Remove the schema copy made above.
rm -f "${SCHEMA_INSTALLED}"

View File

@ -0,0 +1,25 @@
syntax = "proto3";
// Root message of this schema: three scalar-like fields (`i`, `m`, repeated `n`),
// a repeated nested message (`j`) and a map with message values (`o`).
message A {
// Element type of the repeated field `j`.
message B {
uint32 k = 2100 ;
uint32 l = 2200 ;
}
uint32 i = 1000 ;
// Repeated nested message with sub-fields `k` and `l`.
repeated B j = 2000 ;
uint32 m = 3000 ;
// Repeated scalar field.
repeated uint32 n = 4000 ;
// Map from uint32 keys to C messages; C is declared at top level later in this file.
map<uint32, C> o = 5000 ;
}
// Pair of scalars; used as the element type of the repeated field `p` in message C.
message D {
uint32 q = 5110 ;
uint32 r = 5120 ;
}
// Value type of the map field `o` in message A; wraps a repeated list of D messages.
message C {
repeated D p = 5100 ;
}

View File

@ -0,0 +1,13 @@
syntax = "proto3";
// Root message of this schema: one scalar (`u`) and one repeated nested message (`v`).
message M {
uint32 u = 1000 ;
// Repeated nested message; its type N is declared just below, inside M.
repeated N v = 2000 ;
// Element type of `v`: two singular scalars (`w`, `x`) and two repeated scalars (`y`, `z`).
message N {
uint32 w = 2100 ;
uint32 x = 2200 ;
repeated uint32 y = 2300 ;
repeated uint32 z = 2400 ;
}
}