From 61212635d829c056402893510e8e50e044154992 Mon Sep 17 00:00:00 2001 From: Maksim Kita Date: Wed, 30 Mar 2022 16:03:36 +0200 Subject: [PATCH 01/22] MergeTree multiple ORDER BY columns improve insert performance --- src/Columns/ColumnVector.cpp | 54 +++++++++++++++++++++++++++++++++++- 1 file changed, 53 insertions(+), 1 deletion(-) diff --git a/src/Columns/ColumnVector.cpp b/src/Columns/ColumnVector.cpp index dded5ff6c99..82e2bba04d7 100644 --- a/src/Columns/ColumnVector.cpp +++ b/src/Columns/ColumnVector.cpp @@ -275,7 +275,59 @@ template void ColumnVector::updatePermutation(IColumn::PermutationSortDirection direction, IColumn::PermutationSortStability stability, size_t limit, int nan_direction_hint, IColumn::Permutation & res, EqualRanges & equal_ranges) const { - auto sort = [](auto begin, auto end, auto pred) { ::sort(begin, end, pred); }; + bool reverse = direction == IColumn::PermutationSortDirection::Descending; + bool ascending = direction == IColumn::PermutationSortDirection::Ascending; + bool sort_is_stable = stability == IColumn::PermutationSortStability::Stable; + + auto sort = [&](auto begin, auto end, auto pred) + { + /// A case for radix sort + if constexpr (is_arithmetic_v && !is_big_int_v) + { + /// TODO: LSD RadixSort is currently not stable if direction is descending, or value is floating point + bool use_radix_sort = (sort_is_stable && ascending && !std::is_floating_point_v) || !sort_is_stable; + size_t size = end - begin; + + /// Thresholds on size. Lower threshold is arbitrary. Upper threshold is chosen by the type for histogram counters. + if (size >= 256 && size <= std::numeric_limits::max() && use_radix_sort) + { + PaddedPODArray> pairs(size); + size_t index = 0; + + for (auto it = begin; it != end; ++it) + { + pairs[index] = {data[*it], static_cast(*it)}; + ++index; + } + + RadixSort>::executeLSD(pairs.data(), size, reverse, begin); + + /// Radix sort treats all NaNs to be greater than all numbers. + /// If the user needs the opposite, we must move them accordingly. + if (std::is_floating_point_v && nan_direction_hint < 0) + { + size_t nans_to_move = 0; + + for (size_t i = 0; i < size; ++i) + { + if (isNaN(data[begin[reverse ? i : size - 1 - i]])) + ++nans_to_move; + else + break; + } + + if (nans_to_move) + { + std::rotate(begin, begin + (reverse ? nans_to_move : size - nans_to_move), end); + } + } + + return; + } + } + + ::sort(begin, end, pred); + }; auto partial_sort = [](auto begin, auto mid, auto end, auto pred) { ::partial_sort(begin, mid, end, pred); }; if (direction == IColumn::PermutationSortDirection::Ascending && stability == IColumn::PermutationSortStability::Unstable) From 196ff48f3446529bb018ca34d5f1354187ee126a Mon Sep 17 00:00:00 2001 From: Kuz Le Date: Wed, 25 May 2022 12:55:55 +0300 Subject: [PATCH 02/22] Update delete-old-data.md MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Изменение смысла предложения в переводе на русский язык в статье по удалению старых данных --- docs/ru/faq/operations/delete-old-data.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/ru/faq/operations/delete-old-data.md b/docs/ru/faq/operations/delete-old-data.md index 92736ef5205..ab221f8303b 100644 --- a/docs/ru/faq/operations/delete-old-data.md +++ b/docs/ru/faq/operations/delete-old-data.md @@ -22,9 +22,9 @@ ClickHouse позволяет автоматически удалять данн ClickHouse не удаляет данные в реальном времени, как СУБД [OLTP](https://en.wikipedia.org/wiki/Online_transaction_processing). 
Больше всего на такое удаление похожи мутации. Они выполняются с помощью запросов `ALTER ... DELETE` или `ALTER ... UPDATE`. В отличие от обычных запросов `DELETE` и `UPDATE`, мутации выполняются асинхронно, в пакетном режиме, не в реальном времени. В остальном после слов `ALTER TABLE` синтаксис обычных запросов и мутаций одинаковый. -`ALTER DELETE` можно использовать для гибкого удаления устаревших данных. Если вам нужно делать это регулярно, единственный недостаток такого подхода будет заключаться в том, что потребуется внешняя система для запуска запроса. Кроме того, могут возникнуть некоторые проблемы с производительностью, поскольку мутации перезаписывают целые куски данных если в них содержится хотя бы одна строка, которую нужно удалить. +`ALTER DELETE` можно использовать для гибкого удаления устаревших данных. Если вам нужно делать это регулярно, основной недостаток такого подхода будет заключаться в том, что потребуется внешняя система для запуска запроса. Кроме того, могут возникнуть некоторые проблемы с производительностью, поскольку мутации перезаписывают целые куски данных если в них содержится хотя бы одна строка, которую нужно удалить. -Это самый распространенный подход к тому, чтобы обеспечить соблюдение принципов [GDPR](https://gdpr-info.eu) в вашей системе на ClickHouse. +Это - самый распространенный подход к тому, чтобы обеспечить соблюдение принципов [GDPR](https://gdpr-info.eu) в вашей системе на ClickHouse. Подробнее смотрите в разделе [Мутации](../../sql-reference/statements/alter/index.md#alter-mutations). From 7817d6aea37bb6150e96d92b1a8abb32c115d40c Mon Sep 17 00:00:00 2001 From: avogar Date: Wed, 25 May 2022 11:20:28 +0000 Subject: [PATCH 03/22] Support Maps and Records in Avro format --- .../Formats/Impl/AvroRowInputFormat.cpp | 113 ++++++++++++++---- .../Formats/Impl/AvroRowInputFormat.h | 8 +- .../Formats/Impl/AvroRowOutputFormat.cpp | 90 +++++++++++--- .../Formats/Impl/AvroRowOutputFormat.h | 6 +- .../02313_avro_records_and_maps.reference | 25 ++++ .../02313_avro_records_and_maps.sql | 24 ++++ 6 files changed, 216 insertions(+), 50 deletions(-) create mode 100644 tests/queries/0_stateless/02313_avro_records_and_maps.reference create mode 100644 tests/queries/0_stateless/02313_avro_records_and_maps.sql diff --git a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp index 24de8c65eeb..40dc49a8bd4 100644 --- a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp @@ -4,7 +4,6 @@ #include -#include #include #include @@ -13,12 +12,9 @@ #include #include -#include #include #include -#include -#include #include #include #include @@ -28,7 +24,7 @@ #include #include #include -#include +#include #include #include @@ -36,33 +32,24 @@ #include #include #include +#include +#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include +#include +#include +#include +#include +#include +#include +#include -#include #include #include #include #include -#include -#include #include #include -#include #include @@ -386,8 +373,69 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::Node } case avro::AVRO_SYMBOLIC: return createDeserializeFn(avro::resolveSymbol(root_node), target_type); - case avro::AVRO_MAP: case avro::AVRO_RECORD: + { + if (target.isTuple()) + { + const DataTypeTuple & tuple_type = assert_cast(*target_type); + const auto & 
nested_types = tuple_type.getElements(); + std::vector> nested_deserializers; + nested_deserializers.reserve(root_node->leaves()); + if (root_node->leaves() != nested_types.size()) + throw Exception(ErrorCodes::INCORRECT_DATA, "The number of leaves in record doesn't match the number of elements in tuple"); + + for (size_t i = 0; i != root_node->leaves(); ++i) + { + const auto & name = root_node->nameAt(i); + size_t pos = tuple_type.getPositionByName(name); + auto nested_deserializer = createDeserializeFn(root_node->leafAt(i), nested_types[pos]); + nested_deserializers.emplace_back(nested_deserializer, pos); + } + + return [nested_deserializers](IColumn & column, avro::Decoder & decoder) + { + ColumnTuple & column_tuple = assert_cast(column); + auto nested_columns = column_tuple.getColumns(); + for (const auto & [nested_deserializer, pos] : nested_deserializers) + nested_deserializer(*nested_columns[pos], decoder); + }; + } + break; + } + case avro::AVRO_MAP: + { + if (target.isMap()) + { + const auto & map_type = assert_cast(*target_type); + const auto & keys_type = map_type.getKeyType(); + const auto & values_type = map_type.getValueType(); + auto keys_source_type = root_node->leafAt(0); + auto values_source_type = root_node->leafAt(1); + auto keys_deserializer = createDeserializeFn(keys_source_type, keys_type); + auto values_deserializer = createDeserializeFn(values_source_type, values_type); + return [keys_deserializer, values_deserializer](IColumn & column, avro::Decoder & decoder) + { + ColumnMap & column_map = assert_cast(column); + ColumnArray & column_array = column_map.getNestedColumn(); + ColumnArray::Offsets & offsets = column_array.getOffsets(); + ColumnTuple & nested_columns = column_map.getNestedData(); + IColumn & keys_column = nested_columns.getColumn(0); + IColumn & values_column = nested_columns.getColumn(1); + size_t total = 0; + for (size_t n = decoder.mapStart(); n != 0; n = decoder.mapNext()) + { + total += n; + for (size_t i = 0; i < n; ++i) + { + keys_deserializer(keys_column, decoder); + values_deserializer(values_column, decoder); + } + } + offsets.push_back(offsets.back() + total); + }; + } + break; + } default: break; } @@ -896,6 +944,21 @@ DataTypePtr AvroSchemaReader::avroNodeToDataType(avro::NodePtr node) throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Avro type UNION is not supported for inserting."); case avro::Type::AVRO_SYMBOLIC: return avroNodeToDataType(avro::resolveSymbol(node)); + case avro::Type::AVRO_RECORD: + { + DataTypes nested_types; + nested_types.reserve(node->leaves()); + Names nested_names; + nested_names.reserve(node->leaves()); + for (size_t i = 0; i != node->leaves(); ++i) + { + nested_types.push_back(avroNodeToDataType(node->leafAt(i))); + nested_names.push_back(node->nameAt(i)); + } + return std::make_shared(nested_types, nested_names); + } + case avro::Type::AVRO_MAP: + return std::make_shared(avroNodeToDataType(node->leafAt(0)), avroNodeToDataType(node->leafAt(1))); default: throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Avro column {} is not supported for inserting."); } diff --git a/src/Processors/Formats/Impl/AvroRowInputFormat.h b/src/Processors/Formats/Impl/AvroRowInputFormat.h index 7a598de1f6a..3fde0e14ede 100644 --- a/src/Processors/Formats/Impl/AvroRowInputFormat.h +++ b/src/Processors/Formats/Impl/AvroRowInputFormat.h @@ -15,10 +15,10 @@ #include #include -#include -#include -#include -#include +#include +#include +#include +#include namespace DB diff --git a/src/Processors/Formats/Impl/AvroRowOutputFormat.cpp 
b/src/Processors/Formats/Impl/AvroRowOutputFormat.cpp index 7b86dcd4a64..ddee20c187b 100644 --- a/src/Processors/Formats/Impl/AvroRowOutputFormat.cpp +++ b/src/Processors/Formats/Impl/AvroRowOutputFormat.cpp @@ -1,23 +1,21 @@ #include "AvroRowOutputFormat.h" #if USE_AVRO -#include #include -#include #include #include -#include #include #include #include -#include #include #include #include #include #include +#include +#include #include #include @@ -25,21 +23,13 @@ #include #include #include +#include +#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include +#include +#include +#include #include @@ -321,6 +311,70 @@ AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeF } case TypeIndex::Nothing: return {avro::NullSchema(), [](const IColumn &, size_t, avro::Encoder & encoder) { encoder.encodeNull(); }}; + case TypeIndex::Tuple: + { + const auto & tuple_type = assert_cast(*data_type); + const auto & nested_types = tuple_type.getElements(); + const auto & nested_names = tuple_type.getElementNames(); + std::vector nested_serializers; + nested_serializers.reserve(nested_types.size()); + auto schema = avro::RecordSchema(column_name); + for (size_t i = 0; i != nested_types.size(); ++i) + { + auto nested_mapping = createSchemaWithSerializeFn(nested_types[i], type_name_increment, nested_names[i]); + schema.addField(nested_names[i], nested_mapping.schema); + nested_serializers.push_back(nested_mapping.serialize); + } + + return {schema, [nested_serializers](const IColumn & column, size_t row_num, avro::Encoder & encoder) + { + const ColumnTuple & column_tuple = assert_cast(column); + const auto & nested_columns = column_tuple.getColumns(); + for (size_t i = 0; i != nested_serializers.size(); ++i) + nested_serializers[i](*nested_columns[i], row_num, encoder); + }}; + } + case TypeIndex::Map: + { + const auto & map_type = assert_cast(*data_type); + const auto & keys_type = map_type.getKeyType(); + if (!isStringOrFixedString(keys_type)) + throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Avro Maps support only keys with type String, got {}", keys_type->getName()); + + auto keys_serializer = [](const IColumn & column, size_t row_num, avro::Encoder & encoder) + { + const StringRef & s = column.getDataAt(row_num); + encoder.encodeString(s.toString()); + }; + + const auto & values_type = map_type.getValueType(); + auto values_mapping = createSchemaWithSerializeFn(values_type, type_name_increment, column_name + ".value"); + auto schema = avro::MapSchema(values_mapping.schema); + + return {schema, [keys_serializer, values_mapping](const IColumn & column, size_t row_num, avro::Encoder & encoder) + { + const ColumnMap & column_map = assert_cast(column); + const ColumnArray & column_array = column_map.getNestedColumn(); + const ColumnArray::Offsets & offsets = column_array.getOffsets(); + size_t offset = offsets[row_num - 1]; + size_t next_offset = offsets[row_num]; + size_t row_count = next_offset - offset; + const ColumnTuple & nested_columns = column_map.getNestedData(); + const IColumn & keys_column = nested_columns.getColumn(0); + const IColumn & values_column = nested_columns.getColumn(1); + + encoder.mapStart(); + if (row_count > 0) + encoder.setItemCount(row_count); + + for (size_t i = offset; i < next_offset; ++i) + { + keys_serializer(keys_column, i, encoder); + values_mapping.serialize(values_column, i, encoder); + } + encoder.mapEnd(); + }}; + } default: break; } diff --git 
a/src/Processors/Formats/Impl/AvroRowOutputFormat.h b/src/Processors/Formats/Impl/AvroRowOutputFormat.h index 0a2acc93688..a36b36286c3 100644 --- a/src/Processors/Formats/Impl/AvroRowOutputFormat.h +++ b/src/Processors/Formats/Impl/AvroRowOutputFormat.h @@ -9,9 +9,9 @@ #include #include -#include -#include -#include +#include +#include +#include namespace DB diff --git a/tests/queries/0_stateless/02313_avro_records_and_maps.reference b/tests/queries/0_stateless/02313_avro_records_and_maps.reference new file mode 100644 index 00000000000..24fc635cdce --- /dev/null +++ b/tests/queries/0_stateless/02313_avro_records_and_maps.reference @@ -0,0 +1,25 @@ +t Tuple(a Int32, b String) +(0,'String') +(1,'String') +(2,'String') +t Tuple(a Int32, b Tuple(c Int32, d Int32), e Array(Int32)) +(0,(1,2),[]) +(1,(2,3),[0]) +(2,(3,4),[0,1]) +a.b Array(Int32) +a.c Array(Int32) +[0,1] [2,3] +[1,2] [3,4] +[2,3] [4,5] +a.b Array(Array(Tuple(c Int32, d Int32))) +[[(0,1),(2,3)]] +[[(1,2),(3,4)]] +[[(2,3),(4,5)]] +m Map(String, Int64) +{'key_0':0} +{'key_1':1} +{'key_2':2} +m Map(String, Tuple(`1` Int64, `2` Array(Int64))) +{'key_0':(0,[])} +{'key_1':(1,[0])} +{'key_2':(2,[0,1])} diff --git a/tests/queries/0_stateless/02313_avro_records_and_maps.sql b/tests/queries/0_stateless/02313_avro_records_and_maps.sql new file mode 100644 index 00000000000..bb487b3de07 --- /dev/null +++ b/tests/queries/0_stateless/02313_avro_records_and_maps.sql @@ -0,0 +1,24 @@ +insert into function file(data_02313.avro) select tuple(number, 'String')::Tuple(a UInt32, b String) as t from numbers(3) settings engine_file_truncate_on_insert=1; +desc file(data_02313.avro); +select * from file(data_02313.avro); + +insert into function file(data_02313.avro) select tuple(number, tuple(number + 1, number + 2), range(number))::Tuple(a UInt32, b Tuple(c UInt32, d UInt32), e Array(UInt32)) as t from numbers(3) settings engine_file_truncate_on_insert=1; +desc file(data_02313.avro); +select * from file(data_02313.avro); + +insert into function file(data_02313.avro, auto, 'a Nested(b UInt32, c UInt32)') select [number, number + 1], [number + 2, number + 3] from numbers(3) settings engine_file_truncate_on_insert=1; +desc file(data_02313.avro); +select * from file(data_02313.avro); + +insert into function file(data_02313.avro, auto, 'a Nested(b Nested(c UInt32, d UInt32))') select [[(number, number + 1), (number + 2, number + 3)]] from numbers(3) settings engine_file_truncate_on_insert=1; +desc file(data_02313.avro); +select * from file(data_02313.avro); + +insert into function file(data_02313.avro) select map(concat('key_', toString(number)), number) as m from numbers(3) settings engine_file_truncate_on_insert=1; +desc file(data_02313.avro); +select * from file(data_02313.avro); + +insert into function file(data_02313.avro) select map(concat('key_', toString(number)), tuple(number, range(number))) as m from numbers(3) settings engine_file_truncate_on_insert=1; +desc file(data_02313.avro); +select * from file(data_02313.avro); + From 12effa4c2ad7d08eb95432ef22979aae1a9b63d6 Mon Sep 17 00:00:00 2001 From: avogar Date: Wed, 25 May 2022 11:22:04 +0000 Subject: [PATCH 04/22] Add tags to test --- tests/queries/0_stateless/02313_avro_records_and_maps.sql | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/queries/0_stateless/02313_avro_records_and_maps.sql b/tests/queries/0_stateless/02313_avro_records_and_maps.sql index bb487b3de07..83ab342be9e 100644 --- a/tests/queries/0_stateless/02313_avro_records_and_maps.sql +++ 
b/tests/queries/0_stateless/02313_avro_records_and_maps.sql @@ -1,3 +1,5 @@ +-- Tags: no-parallel, no-fasttest + insert into function file(data_02313.avro) select tuple(number, 'String')::Tuple(a UInt32, b String) as t from numbers(3) settings engine_file_truncate_on_insert=1; desc file(data_02313.avro); select * from file(data_02313.avro); From 038a422aeb8f15cc6ffedda12bb0ddeedaef744f Mon Sep 17 00:00:00 2001 From: avogar Date: Wed, 25 May 2022 12:56:59 +0000 Subject: [PATCH 05/22] Add setting to insert null as default --- src/Core/Settings.h | 1 + src/Formats/FormatFactory.cpp | 1 + src/Formats/FormatSettings.h | 1 + .../Formats/Impl/AvroRowInputFormat.cpp | 67 +++++++++++------- .../Formats/Impl/AvroRowInputFormat.h | 8 ++- .../02314_avro_null_as_default.reference | 6 ++ .../0_stateless/02314_avro_null_as_default.sh | 17 +++++ .../0_stateless/data_avro/nullable_array.avro | Bin 0 -> 373 bytes 8 files changed, 72 insertions(+), 29 deletions(-) create mode 100644 tests/queries/0_stateless/02314_avro_null_as_default.reference create mode 100755 tests/queries/0_stateless/02314_avro_null_as_default.sh create mode 100644 tests/queries/0_stateless/data_avro/nullable_array.avro diff --git a/src/Core/Settings.h b/src/Core/Settings.h index bf9785fcc00..42483d87fbb 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -686,6 +686,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value) M(Bool, input_format_values_deduce_templates_of_expressions, true, "For Values format: if the field could not be parsed by streaming parser, run SQL parser, deduce template of the SQL expression, try to parse all rows using template and then interpret expression for all rows.", 0) \ M(Bool, input_format_values_accurate_types_of_literals, true, "For Values format: when parsing and interpreting expressions using template, check actual type of literal to avoid possible overflow and precision issues.", 0) \ M(Bool, input_format_avro_allow_missing_fields, false, "For Avro/AvroConfluent format: when field is not found in schema use default value instead of error", 0) \ + M(Bool, input_format_avro_null_as_default, false, "For Avro/AvroConfluent format: insert default in case of null and non Nullable column", 0) \ M(URI, format_avro_schema_registry_url, "", "For AvroConfluent format: Confluent Schema Registry URL.", 0) \ \ M(Bool, output_format_json_quote_64bit_integers, true, "Controls quoting of 64-bit integers in JSON output format.", 0) \ diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index 644e4d3ecfd..f7e18083cb3 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -56,6 +56,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings) format_settings.avro.schema_registry_url = settings.format_avro_schema_registry_url.toString(); format_settings.avro.string_column_pattern = settings.output_format_avro_string_column_pattern.toString(); format_settings.avro.output_rows_in_file = settings.output_format_avro_rows_in_file; + format_settings.avro.null_as_default = settings.input_format_avro_null_as_default; format_settings.csv.allow_double_quotes = settings.format_csv_allow_double_quotes; format_settings.csv.allow_single_quotes = settings.format_csv_allow_single_quotes; format_settings.csv.crlf_end_of_line = settings.output_format_csv_crlf_end_of_line; diff --git a/src/Formats/FormatSettings.h b/src/Formats/FormatSettings.h index e6f0a7d229e..64482f32381 100644 --- a/src/Formats/FormatSettings.h +++ 
b/src/Formats/FormatSettings.h @@ -92,6 +92,7 @@ struct FormatSettings bool allow_missing_fields = false; String string_column_pattern; UInt64 output_rows_in_file = 1; + bool null_as_default = false; } avro; String bool_true_representation = "true"; diff --git a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp index 40dc49a8bd4..6b056e64d41 100644 --- a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp @@ -279,30 +279,42 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::Node break; case avro::AVRO_UNION: { - auto nullable_deserializer = [root_node, target_type](size_t non_null_union_index) + if (root_node->leaves() == 2 + && (root_node->leafAt(0)->type() == avro::AVRO_NULL || root_node->leafAt(1)->type() == avro::AVRO_NULL)) { - auto nested_deserialize = createDeserializeFn(root_node->leafAt(non_null_union_index), removeNullable(target_type)); - return [non_null_union_index, nested_deserialize](IColumn & column, avro::Decoder & decoder) + size_t non_null_union_index = root_node->leafAt(0)->type() == avro::AVRO_NULL ? 1 : 0; + if (target.isNullable()) { - ColumnNullable & col = assert_cast(column); - size_t union_index = decoder.decodeUnionIndex(); - if (union_index == non_null_union_index) + auto nested_deserialize = this->createDeserializeFn(root_node->leafAt(non_null_union_index), removeNullable(target_type)); + return [non_null_union_index, nested_deserialize](IColumn & column, avro::Decoder & decoder) { - nested_deserialize(col.getNestedColumn(), decoder); - col.getNullMapData().push_back(0); - } - else + ColumnNullable & col = assert_cast(column); + size_t union_index = decoder.decodeUnionIndex(); + if (union_index == non_null_union_index) + { + nested_deserialize(col.getNestedColumn(), decoder); + col.getNullMapData().push_back(0); + } + else + { + col.insertDefault(); + } + }; + } + + if (null_as_default) + { + auto nested_deserialize = this->createDeserializeFn(root_node->leafAt(non_null_union_index), target_type); + return [non_null_union_index, nested_deserialize](IColumn & column, avro::Decoder & decoder) { - col.insertDefault(); - } - }; - }; - if (root_node->leaves() == 2 && target.isNullable()) - { - if (root_node->leafAt(0)->type() == avro::AVRO_NULL) - return nullable_deserializer(1); - if (root_node->leafAt(1)->type() == avro::AVRO_NULL) - return nullable_deserializer(0); + size_t union_index = decoder.decodeUnionIndex(); + if (union_index == non_null_union_index) + nested_deserialize(column, decoder); + else + column.insertDefault(); + }; + } + } break; } @@ -625,7 +637,8 @@ AvroDeserializer::Action AvroDeserializer::createAction(const Block & header, co } } -AvroDeserializer::AvroDeserializer(const Block & header, avro::ValidSchema schema, bool allow_missing_fields) +AvroDeserializer::AvroDeserializer(const Block & header, avro::ValidSchema schema, bool allow_missing_fields, bool null_as_default_) + : null_as_default(null_as_default_) { const auto & schema_root = schema.root(); if (schema_root->type() != avro::AVRO_RECORD) @@ -663,15 +676,15 @@ void AvroDeserializer::deserializeRow(MutableColumns & columns, avro::Decoder & AvroRowInputFormat::AvroRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_) - : IRowInputFormat(header_, in_, params_), - allow_missing_fields(format_settings_.avro.allow_missing_fields) + : IRowInputFormat(header_, in_, params_), 
format_settings(format_settings_) { } void AvroRowInputFormat::readPrefix() { file_reader_ptr = std::make_unique(std::make_unique(*in)); - deserializer_ptr = std::make_unique(output.getHeader(), file_reader_ptr->dataSchema(), allow_missing_fields); + deserializer_ptr = std::make_unique( + output.getHeader(), file_reader_ptr->dataSchema(), format_settings.avro.allow_missing_fields, format_settings.avro.null_as_default); file_reader_ptr->init(); } @@ -857,7 +870,8 @@ const AvroDeserializer & AvroConfluentRowInputFormat::getOrCreateDeserializer(Sc if (it == deserializer_cache.end()) { auto schema = schema_registry->getSchema(schema_id); - AvroDeserializer deserializer(output.getHeader(), schema, format_settings.avro.allow_missing_fields); + AvroDeserializer deserializer( + output.getHeader(), schema, format_settings.avro.allow_missing_fields, format_settings.avro.null_as_default); it = deserializer_cache.emplace(schema_id, deserializer).first; } return it->second; @@ -939,7 +953,8 @@ DataTypePtr AvroSchemaReader::avroNodeToDataType(avro::NodePtr node) if (node->leaves() == 2 && (node->leafAt(0)->type() == avro::Type::AVRO_NULL || node->leafAt(1)->type() == avro::Type::AVRO_NULL)) { size_t nested_leaf_index = node->leafAt(0)->type() == avro::Type::AVRO_NULL ? 1 : 0; - return makeNullable(avroNodeToDataType(node->leafAt(nested_leaf_index))); + auto nested_type = avroNodeToDataType(node->leafAt(nested_leaf_index)); + return nested_type->canBeInsideNullable() ? makeNullable(nested_type) : nested_type; } throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Avro type UNION is not supported for inserting."); case avro::Type::AVRO_SYMBOLIC: diff --git a/src/Processors/Formats/Impl/AvroRowInputFormat.h b/src/Processors/Formats/Impl/AvroRowInputFormat.h index 3fde0e14ede..13afa06b089 100644 --- a/src/Processors/Formats/Impl/AvroRowInputFormat.h +++ b/src/Processors/Formats/Impl/AvroRowInputFormat.h @@ -32,13 +32,13 @@ namespace ErrorCodes class AvroDeserializer { public: - AvroDeserializer(const Block & header, avro::ValidSchema schema, bool allow_missing_fields); + AvroDeserializer(const Block & header, avro::ValidSchema schema, bool allow_missing_fields, bool null_as_default_); void deserializeRow(MutableColumns & columns, avro::Decoder & decoder, RowReadExtension & ext) const; private: using DeserializeFn = std::function; using SkipFn = std::function; - static DeserializeFn createDeserializeFn(avro::NodePtr root_node, DataTypePtr target_type); + DeserializeFn createDeserializeFn(avro::NodePtr root_node, DataTypePtr target_type); SkipFn createSkipFn(avro::NodePtr root_node); struct Action @@ -113,6 +113,8 @@ private: /// Map from name of named Avro type (record, enum, fixed) to SkipFn. /// This is to avoid infinite recursion when Avro schema contains self-references. e.g. LinkedList std::map symbolic_skip_fn_map; + + bool null_as_default = false; }; class AvroRowInputFormat final : public IRowInputFormat @@ -128,7 +130,7 @@ private: std::unique_ptr file_reader_ptr; std::unique_ptr deserializer_ptr; - bool allow_missing_fields; + FormatSettings format_settings; }; /// Confluent framing + Avro binary datum encoding. Mainly used for Kafka. 
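For orientation, a minimal usage sketch of the new setting (the file name 'example.avro' and the column name 'x' are illustrative, not part of this patch): by default, reading an Avro union such as ["null", "long"] into a non-Nullable column is rejected; with the setting enabled, the deserializer falls back to insertDefault() on the target column, so Avro nulls arrive as the column type's default value.

SET input_format_avro_null_as_default = 1;
-- nulls from the Avro union become 0 in the non-Nullable UInt32 column
SELECT * FROM file('example.avro', 'Avro', 'x UInt32');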
diff --git a/tests/queries/0_stateless/02314_avro_null_as_default.reference b/tests/queries/0_stateless/02314_avro_null_as_default.reference new file mode 100644 index 00000000000..ba38a15f924 --- /dev/null +++ b/tests/queries/0_stateless/02314_avro_null_as_default.reference @@ -0,0 +1,6 @@ +a Nullable(Int64) +b Array(Tuple(c Nullable(Int64), d Nullable(String))) +1 [(100,'Q'),(200,'W')] +0 +0 +0 diff --git a/tests/queries/0_stateless/02314_avro_null_as_default.sh b/tests/queries/0_stateless/02314_avro_null_as_default.sh new file mode 100755 index 00000000000..207f814be10 --- /dev/null +++ b/tests/queries/0_stateless/02314_avro_null_as_default.sh @@ -0,0 +1,17 @@ +#!/usr/bin/env bash +# Tags: no-parallel, no-fasttest + +set -e + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CUR_DIR"/../shell_config.sh + +DATA_DIR=$CUR_DIR/data_avro + +$CLICKHOUSE_LOCAL -q "desc file('$DATA_DIR/nullable_array.avro') settings input_format_avro_null_as_default=1" +$CLICKHOUSE_LOCAL -q "select * from file('$DATA_DIR/nullable_array.avro') settings input_format_avro_null_as_default=1" + +$CLICKHOUSE_CLIENT -q "insert into function file(data_02314.avro) select NULL::Nullable(UInt32) as x from numbers(3) settings engine_file_truncate_on_insert=1" +$CLICKHOUSE_CLIENT -q "select * from file(data_02314.avro, auto, 'x UInt32') settings input_format_avro_null_as_default=1" + diff --git a/tests/queries/0_stateless/data_avro/nullable_array.avro b/tests/queries/0_stateless/data_avro/nullable_array.avro new file mode 100644 index 0000000000000000000000000000000000000000..915e319c8e4f376f36b38e82155154489051a080 GIT binary patch literal 373 zcmeZI%3@>@ODrqO*DFrWNX<>$!&0qOQdy9yWTjM;nw(#hqNJmgmzWFU=Vhj41|f?T z7bGTwB=U>W^%8;Xj8r|48laA}%+#EeVkN8SYMACkpeV%LXr;W;oE#uGCqFM;DYjMz zt|kdX4csk>MMa5~K Date: Wed, 25 May 2022 22:04:39 +0300 Subject: [PATCH 06/22] FPC codec --- src/Compression/CompressionCodecFPC.cpp | 498 ++++++++++++++++++++++++ src/Compression/CompressionFactory.cpp | 2 + src/Compression/CompressionInfo.h | 3 +- 3 files changed, 502 insertions(+), 1 deletion(-) create mode 100644 src/Compression/CompressionCodecFPC.cpp diff --git a/src/Compression/CompressionCodecFPC.cpp b/src/Compression/CompressionCodecFPC.cpp new file mode 100644 index 00000000000..3c7aac794c0 --- /dev/null +++ b/src/Compression/CompressionCodecFPC.cpp @@ -0,0 +1,498 @@ +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + + +namespace DB +{ + +class CompressionCodecFPC : public ICompressionCodec +{ +public: + explicit CompressionCodecFPC(UInt8 float_size, UInt8 compression_level); + + uint8_t getMethodByte() const override; + + void updateHash(SipHash & hash) const override; + +protected: + UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override; + + void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override; + + UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override; + + bool isCompression() const override { return true; } + bool isGenericCompression() const override { return false; } + + static constexpr UInt32 HEADER_SIZE{3}; + +private: + UInt8 float_width; + UInt8 level; +}; + + +namespace ErrorCodes +{ + extern const int CANNOT_COMPRESS; + extern const int CANNOT_DECOMPRESS; + extern const int ILLEGAL_CODEC_PARAMETER; + extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE; + extern const int BAD_ARGUMENTS; +} + +uint8_t 
CompressionCodecFPC::getMethodByte() const +{ + return static_cast(CompressionMethodByte::FPC); +} + +void CompressionCodecFPC::updateHash(SipHash & hash) const +{ + getCodecDesc()->updateTreeHash(hash); +} + +CompressionCodecFPC::CompressionCodecFPC(UInt8 float_size, UInt8 compression_level) + : float_width{float_size}, level{compression_level} +{ + setCodecDescription("FPC", {std::make_shared(static_cast(level))}); +} + +UInt32 CompressionCodecFPC::getMaxCompressedDataSize(UInt32 uncompressed_size) const +{ + auto float_count = (uncompressed_size + float_width - 1) / float_width; + if (float_count % 2 != 0) { + ++float_count; + } + return HEADER_SIZE + (float_count + float_count / 2) * float_width; +} + +namespace +{ + +UInt8 getFloatBytesSize(const IDataType & column_type) +{ + if (!WhichDataType(column_type).isFloat()) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, "FPC codec is not applicable for {} because the data type is not float", + column_type.getName()); + } + + if (auto float_size = column_type.getSizeOfValueInMemory(); float_size >= 4) + { + return static_cast(float_size); + } + throw Exception(ErrorCodes::BAD_ARGUMENTS, "FPC codec is not applicable for floats of size less than 4 bytes. Given type {}", + column_type.getName()); +} + +UInt8 encodeEndianness(std::endian endian) +{ + switch (endian) + { + case std::endian::little: + return 0; + case std::endian::big: + return 1; + } + throw Exception("Unsupported endianness", ErrorCodes::BAD_ARGUMENTS); +} + +std::endian decodeEndianness(UInt8 endian) { + switch (endian) + { + case 0: + return std::endian::little; + case 1: + return std::endian::big; + } + throw Exception("Unsupported endianness", ErrorCodes::BAD_ARGUMENTS); +} + +} + +void registerCodecFPC(CompressionCodecFactory & factory) +{ + auto method_code = static_cast(CompressionMethodByte::FPC); + auto codec_builder = [&](const ASTPtr & arguments, const IDataType * column_type) -> CompressionCodecPtr + { + if (!column_type) + { + throw Exception("FPC codec must have associated column", ErrorCodes::BAD_ARGUMENTS); + } + UInt8 level{0}; + if (arguments && !arguments->children.empty()) + { + if (arguments->children.size() > 1) + { + throw Exception(ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE, + "FPC codec must have 1 parameter, given {}", arguments->children.size()); + } + + const auto * literal = arguments->children.front()->as(); + if (!literal) + { + throw Exception("FPC codec argument must be integer", ErrorCodes::ILLEGAL_CODEC_PARAMETER); + } + + level = literal->value.safeGet(); + } + return std::make_shared(getFloatBytesSize(*column_type), level); + }; + factory.registerCompressionCodecWithType("FPC", method_code, codec_builder); +} + +namespace +{ + +template requires (sizeof(TUint) >= 4) +class DfcmPredictor { +public: + explicit DfcmPredictor(std::size_t table_size): table(table_size, 0), prev_value{0}, hash{0} + { + } + + [[nodiscard]] + TUint predict() const noexcept + { + return table[hash] + prev_value; + } + + void add(TUint value) noexcept + { + table[hash] = value - prev_value; + recalculateHash(); + prev_value = value; + } + +private: + void recalculateHash() noexcept + { + auto value = table[hash]; + if constexpr (sizeof(TUint) >= 8) + { + hash = ((hash << 2) ^ static_cast(value >> 40)) & (table.size() - 1); + } + else + { + hash = ((hash << 4) ^ static_cast(value >> 23)) & (table.size() - 1); + } + } + + std::vector table; + TUint prev_value{0}; + std::size_t hash{0}; +}; + +template requires (sizeof(TUint) >= 4) +class FcmPredictor { +public: + 
explicit FcmPredictor(std::size_t table_size): table(table_size, 0), hash{0} + { + } + + [[nodiscard]] + TUint predict() const noexcept + { + return table[hash]; + } + + void add(TUint value) noexcept + { + table[hash] = value; + recalculateHash(); + } + +private: + void recalculateHash() noexcept + { + auto value = table[hash]; + if constexpr (sizeof(TUint) >= 8) + { + hash = ((hash << 6) ^ static_cast(value >> 48)) & (table.size() - 1); + } + else + { + hash = ((hash << 1) ^ static_cast(value >> 22)) & (table.size() - 1); + } + } + + std::vector table; + std::size_t hash{0}; +}; + +template + requires (Endian == std::endian::little || Endian == std::endian::big) +class FPCOperation +{ + static constexpr std::size_t CHUNK_SIZE{64}; + + static constexpr auto VALUE_SIZE = sizeof(TUint); + static constexpr std::byte DFCM_BIT_1{1u << 7}; + static constexpr std::byte DFCM_BIT_2{1u << 3}; + static constexpr unsigned MAX_COMPRESSED_SIZE{0b111u}; + +public: + explicit FPCOperation(std::span destination, UInt8 compression_level) + : dfcm_predictor(1 << compression_level), fcm_predictor(1 << compression_level), chunk{}, result{destination} + { + } + + std::size_t encode(std::span data) && + { + auto initial_size = result.size(); + + std::span chunk_view(chunk); + for (std::size_t i = 0; i < data.size(); i += chunk_view.size_bytes()) + { + auto written_values = importChunk(data.subspan(i), chunk_view); + encodeChunk(chunk_view.subspan(0, written_values)); + } + + return initial_size - result.size(); + } + + void decode(std::span values, std::size_t decoded_size) && + { + std::size_t read_bytes{0}; + + std::span chunk_view(chunk); + for (std::size_t i = 0; i < decoded_size; i += chunk_view.size_bytes()) + { + if (i + chunk_view.size_bytes() > decoded_size) + chunk_view = chunk_view.first(ceilBytesToEvenValues(decoded_size - i)); + read_bytes += decodeChunk(values.subspan(read_bytes), chunk_view); + exportChunk(chunk_view); + } + } + +private: + static std::size_t ceilBytesToEvenValues(std::size_t bytes_count) + { + auto values_count = (bytes_count + VALUE_SIZE - 1) / VALUE_SIZE; + return values_count % 2 == 0 ? 
values_count : values_count + 1; + } + + std::size_t importChunk(std::span values, std::span chnk) + { + if (auto chunk_view = std::as_writable_bytes(chnk); chunk_view.size() <= values.size()) + { + std::memcpy(chunk_view.data(), values.data(), chunk_view.size()); + return chunk_view.size() / VALUE_SIZE; + } + else + { + std::memset(chunk_view.data(), 0, chunk_view.size()); + std::memcpy(chunk_view.data(), values.data(), values.size()); + return ceilBytesToEvenValues(values.size()); + } + } + + void exportChunk(std::span chnk) + { + auto chunk_view = std::as_bytes(chnk).first(std::min(result.size(), chnk.size_bytes())); + std::memcpy(result.data(), chunk_view.data(), chunk_view.size()); + result = result.subspan(chunk_view.size()); + } + + void encodeChunk(std::span seq) + { + for (std::size_t i = 0; i < seq.size(); i += 2) + { + encodePair(seq[i], seq[i + 1]); + } + } + + struct CompressedValue + { + TUint value; + unsigned compressed_size; + bool is_dfcm_predictor; + }; + + unsigned encodeCompressedSize(int compressed) + { + if constexpr (VALUE_SIZE > MAX_COMPRESSED_SIZE) + { + if (compressed >= 4) + --compressed; + } + return std::min(static_cast(compressed), MAX_COMPRESSED_SIZE); + } + + unsigned decodeCompressedSize(unsigned encoded_size) + { + if constexpr (VALUE_SIZE > MAX_COMPRESSED_SIZE) + { + if (encoded_size > 3) + ++encoded_size; + } + return encoded_size; + } + + CompressedValue compressValue(TUint value) noexcept + { + TUint compressed_dfcm = dfcm_predictor.predict() ^ value; + TUint compressed_fcm = fcm_predictor.predict() ^ value; + dfcm_predictor.add(value); + fcm_predictor.add(value); + auto zeroes_dfcm = std::countl_zero(compressed_dfcm); + auto zeroes_fcm = std::countl_zero(compressed_fcm); + if (zeroes_dfcm > zeroes_fcm) + return {compressed_dfcm, encodeCompressedSize(zeroes_dfcm / CHAR_BIT), true}; + return {compressed_fcm, encodeCompressedSize(zeroes_fcm / CHAR_BIT), false}; + } + + void encodePair(TUint first, TUint second) + { + auto [value1, compressed_size1, is_dfcm_predictor1] = compressValue(first); + auto [value2, compressed_size2, is_dfcm_predictor2] = compressValue(second); + std::byte header{0x0}; + if (is_dfcm_predictor1) + header |= DFCM_BIT_1; + if (is_dfcm_predictor2) + header |= DFCM_BIT_2; + header |= static_cast((compressed_size1 << 4) | compressed_size2); + result.front() = header; + + compressed_size1 = decodeCompressedSize(compressed_size1); + compressed_size2 = decodeCompressedSize(compressed_size2); + auto tail_size1 = VALUE_SIZE - compressed_size1; + auto tail_size2 = VALUE_SIZE - compressed_size2; + + std::memcpy(result.data() + 1, valueTail(value1, compressed_size1), tail_size1); + std::memcpy(result.data() + 1 + tail_size1, valueTail(value2, compressed_size2), tail_size2); + result = result.subspan(1 + tail_size1 + tail_size2); + } + + std::size_t decodeChunk(std::span values, std::span seq) + { + std::size_t read_bytes{0}; + for (std::size_t i = 0; i < seq.size(); i += 2) + { + read_bytes += decodePair(values.subspan(read_bytes), seq[i], seq[i + 1]); + } + return read_bytes; + } + + TUint decompressValue(TUint value, bool isDfcmPredictor) + { + TUint decompressed; + if (isDfcmPredictor) + { + decompressed = dfcm_predictor.predict() ^ value; + } + else + { + decompressed = fcm_predictor.predict() ^ value; + } + dfcm_predictor.add(decompressed); + fcm_predictor.add(decompressed); + return decompressed; + } + + std::size_t decodePair(std::span bytes, TUint& first, TUint& second) + { + if (bytes.empty()) + throw 
Exception(ErrorCodes::CANNOT_DECOMPRESS, "Unexpected end of encoded sequence"); + + auto compressed_size1 = decodeCompressedSize(static_cast(bytes.front() >> 4) & MAX_COMPRESSED_SIZE); + auto compressed_size2 = decodeCompressedSize(static_cast(bytes.front()) & MAX_COMPRESSED_SIZE); + + auto tail_size1 = VALUE_SIZE - compressed_size1; + auto tail_size2 = VALUE_SIZE - compressed_size2; + + if (bytes.size() < 1 + tail_size1 + tail_size2) + throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Unexpected end of encoded sequence"); + + TUint value1{0}; + TUint value2{0}; + + std::memcpy(valueTail(value1, compressed_size1), bytes.data() + 1, tail_size1); + std::memcpy(valueTail(value2, compressed_size2), bytes.data() + 1 + tail_size1, tail_size2); + + auto is_dfcm_predictor1 = static_cast(bytes.front() & DFCM_BIT_1); + auto is_dfcm_predictor2 = static_cast(bytes.front() & DFCM_BIT_2); + first = decompressValue(value1, is_dfcm_predictor1 != 0); + second = decompressValue(value2, is_dfcm_predictor2 != 0); + + return 1 + tail_size1 + tail_size2; + } + + static void* valueTail(TUint& value, unsigned compressed_size) + { + if constexpr (Endian == std::endian::little) + { + return &value; + } + else + { + return reinterpret_cast(&value) + compressed_size; + } + } + + DfcmPredictor dfcm_predictor; + FcmPredictor fcm_predictor; + std::array chunk{}; + std::span result{}; +}; + +} + +UInt32 CompressionCodecFPC::doCompressData(const char * source, UInt32 source_size, char * dest) const +{ + dest[0] = static_cast(float_width); + dest[1] = static_cast(level); + dest[2] = static_cast(encodeEndianness(std::endian::native)); + + auto destination = std::as_writable_bytes(std::span(dest, source_size).subspan(HEADER_SIZE)); + auto src = std::as_bytes(std::span(source, source_size)); + switch (float_width) + { + case sizeof(Float64): + return HEADER_SIZE + FPCOperation(destination, level).encode(src); + case sizeof(Float32): + return HEADER_SIZE + FPCOperation(destination, level).encode(src); + default: + break; + } + throw Exception("Cannot compress. File has incorrect float width", ErrorCodes::CANNOT_COMPRESS); +} + +void CompressionCodecFPC::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const +{ + if (source_size < HEADER_SIZE) + throw Exception("Cannot decompress. File has wrong header", ErrorCodes::CANNOT_DECOMPRESS); + + auto compressed_data = std::span(source, source_size); + if (static_cast(compressed_data[0]) != float_width) + throw Exception("Cannot decompress. File has incorrect float width", ErrorCodes::CANNOT_DECOMPRESS); + if (static_cast(compressed_data[1]) != level) + throw Exception("Cannot decompress. File has incorrect compression level", ErrorCodes::CANNOT_DECOMPRESS); + if (decodeEndianness(static_cast(compressed_data[2])) != std::endian::native) + throw Exception("Cannot decompress. 
File has incorrect endianness", ErrorCodes::CANNOT_DECOMPRESS); + + auto destination = std::as_writable_bytes(std::span(dest, uncompressed_size)); + auto src = std::as_bytes(compressed_data.subspan(HEADER_SIZE)); + switch (float_width) + { + case sizeof(Float64): + FPCOperation(destination, level).decode(src, uncompressed_size); + break; + case sizeof(Float32): + FPCOperation(destination, level).decode(src, uncompressed_size); + break; + default: + break; + } +} + +} diff --git a/src/Compression/CompressionFactory.cpp b/src/Compression/CompressionFactory.cpp index abf5e38a8c3..b8a1c5877a4 100644 --- a/src/Compression/CompressionFactory.cpp +++ b/src/Compression/CompressionFactory.cpp @@ -177,6 +177,7 @@ void registerCodecT64(CompressionCodecFactory & factory); void registerCodecDoubleDelta(CompressionCodecFactory & factory); void registerCodecGorilla(CompressionCodecFactory & factory); void registerCodecEncrypted(CompressionCodecFactory & factory); +void registerCodecFPC(CompressionCodecFactory & factory); #endif @@ -194,6 +195,7 @@ CompressionCodecFactory::CompressionCodecFactory() registerCodecDoubleDelta(*this); registerCodecGorilla(*this); registerCodecEncrypted(*this); + registerCodecFPC(*this); #endif default_codec = get("LZ4", {}); diff --git a/src/Compression/CompressionInfo.h b/src/Compression/CompressionInfo.h index bbe8315f3ea..839fb68e8c3 100644 --- a/src/Compression/CompressionInfo.h +++ b/src/Compression/CompressionInfo.h @@ -44,7 +44,8 @@ enum class CompressionMethodByte : uint8_t DoubleDelta = 0x94, Gorilla = 0x95, AES_128_GCM_SIV = 0x96, - AES_256_GCM_SIV = 0x97 + AES_256_GCM_SIV = 0x97, + FPC = 0x98 }; } From adf888811cf036612ceb33ac245a47eb6a24e689 Mon Sep 17 00:00:00 2001 From: koloshmet Date: Thu, 26 May 2022 04:53:07 +0300 Subject: [PATCH 07/22] fixed max size computation --- src/Compression/CompressionCodecFPC.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/Compression/CompressionCodecFPC.cpp b/src/Compression/CompressionCodecFPC.cpp index 3c7aac794c0..0e50d62893c 100644 --- a/src/Compression/CompressionCodecFPC.cpp +++ b/src/Compression/CompressionCodecFPC.cpp @@ -72,7 +72,7 @@ UInt32 CompressionCodecFPC::getMaxCompressedDataSize(UInt32 uncompressed_size) c if (float_count % 2 != 0) { ++float_count; } - return HEADER_SIZE + (float_count + float_count / 2) * float_width; + return HEADER_SIZE + float_count * float_width + float_count / 2; } namespace @@ -453,7 +453,8 @@ UInt32 CompressionCodecFPC::doCompressData(const char * source, UInt32 source_si dest[1] = static_cast(level); dest[2] = static_cast(encodeEndianness(std::endian::native)); - auto destination = std::as_writable_bytes(std::span(dest, source_size).subspan(HEADER_SIZE)); + auto dest_size = getMaxCompressedDataSize(source_size); + auto destination = std::as_writable_bytes(std::span(dest, dest_size).subspan(HEADER_SIZE)); auto src = std::as_bytes(std::span(source, source_size)); switch (float_width) { From cd3e28e3b1e4c8881c642c29d3fcf59bbf25e621 Mon Sep 17 00:00:00 2001 From: koloshmet Date: Thu, 26 May 2022 06:53:48 +0300 Subject: [PATCH 08/22] fixed decoding parameters --- src/Compression/CompressionCodecFPC.cpp | 30 ++++++++++++------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/src/Compression/CompressionCodecFPC.cpp b/src/Compression/CompressionCodecFPC.cpp index 0e50d62893c..d6b96f8a2f2 100644 --- a/src/Compression/CompressionCodecFPC.cpp +++ b/src/Compression/CompressionCodecFPC.cpp @@ -124,11 +124,11 @@ void 
registerCodecFPC(CompressionCodecFactory & factory) auto method_code = static_cast(CompressionMethodByte::FPC); auto codec_builder = [&](const ASTPtr & arguments, const IDataType * column_type) -> CompressionCodecPtr { - if (!column_type) - { - throw Exception("FPC codec must have associated column", ErrorCodes::BAD_ARGUMENTS); - } - UInt8 level{0}; + UInt8 float_width{0}; + if (column_type != nullptr) + float_width = getFloatBytesSize(*column_type); + + UInt8 level{12}; if (arguments && !arguments->children.empty()) { if (arguments->children.size() > 1) @@ -139,13 +139,11 @@ void registerCodecFPC(CompressionCodecFactory & factory) const auto * literal = arguments->children.front()->as(); if (!literal) - { throw Exception("FPC codec argument must be integer", ErrorCodes::ILLEGAL_CODEC_PARAMETER); - } level = literal->value.safeGet(); } - return std::make_shared(getFloatBytesSize(*column_type), level); + return std::make_shared(float_width, level); }; factory.registerCompressionCodecWithType("FPC", method_code, codec_builder); } @@ -474,26 +472,28 @@ void CompressionCodecFPC::doDecompressData(const char * source, UInt32 source_si throw Exception("Cannot decompress. File has wrong header", ErrorCodes::CANNOT_DECOMPRESS); auto compressed_data = std::span(source, source_size); - if (static_cast(compressed_data[0]) != float_width) - throw Exception("Cannot decompress. File has incorrect float width", ErrorCodes::CANNOT_DECOMPRESS); - if (static_cast(compressed_data[1]) != level) - throw Exception("Cannot decompress. File has incorrect compression level", ErrorCodes::CANNOT_DECOMPRESS); if (decodeEndianness(static_cast(compressed_data[2])) != std::endian::native) throw Exception("Cannot decompress. File has incorrect endianness", ErrorCodes::CANNOT_DECOMPRESS); + auto compressed_float_width = static_cast(compressed_data[0]); + auto compressed_level = static_cast(compressed_data[1]); + if (compressed_level == 0) + throw Exception("Cannot decompress. File has incorrect level", ErrorCodes::CANNOT_DECOMPRESS); + auto destination = std::as_writable_bytes(std::span(dest, uncompressed_size)); auto src = std::as_bytes(compressed_data.subspan(HEADER_SIZE)); - switch (float_width) + switch (compressed_float_width) { case sizeof(Float64): - FPCOperation(destination, level).decode(src, uncompressed_size); + FPCOperation(destination, compressed_level).decode(src, uncompressed_size); break; case sizeof(Float32): - FPCOperation(destination, level).decode(src, uncompressed_size); + FPCOperation(destination, compressed_level).decode(src, uncompressed_size); break; default: break; } + throw Exception("Cannot decompress. File has incorrect float width", ErrorCodes::CANNOT_DECOMPRESS); } } From 1bfeb982af9bffea3c42ff2f87daa99ea016f83e Mon Sep 17 00:00:00 2001 From: koloshmet Date: Thu, 26 May 2022 06:58:34 +0300 Subject: [PATCH 09/22] fixed decoding parameters --- src/Compression/CompressionCodecFPC.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/Compression/CompressionCodecFPC.cpp b/src/Compression/CompressionCodecFPC.cpp index d6b96f8a2f2..269a935da4b 100644 --- a/src/Compression/CompressionCodecFPC.cpp +++ b/src/Compression/CompressionCodecFPC.cpp @@ -491,9 +491,8 @@ void CompressionCodecFPC::doDecompressData(const char * source, UInt32 source_si FPCOperation(destination, compressed_level).decode(src, uncompressed_size); break; default: - break; + throw Exception("Cannot decompress. 
File has incorrect float width", ErrorCodes::CANNOT_DECOMPRESS); } - throw Exception("Cannot decompress. File has incorrect float width", ErrorCodes::CANNOT_DECOMPRESS); } } From 4d41121c096918493b4f0ac05dbab4546b8c9331 Mon Sep 17 00:00:00 2001 From: koloshmet Date: Thu, 26 May 2022 10:58:00 +0300 Subject: [PATCH 10/22] added test query --- .../02313_test_fpc_codec.reference | 2 + .../0_stateless/02313_test_fpc_codec.sql | 61 +++++++++++++++++++ 2 files changed, 63 insertions(+) create mode 100644 tests/queries/0_stateless/02313_test_fpc_codec.reference create mode 100644 tests/queries/0_stateless/02313_test_fpc_codec.sql diff --git a/tests/queries/0_stateless/02313_test_fpc_codec.reference b/tests/queries/0_stateless/02313_test_fpc_codec.reference new file mode 100644 index 00000000000..5e871ea0329 --- /dev/null +++ b/tests/queries/0_stateless/02313_test_fpc_codec.reference @@ -0,0 +1,2 @@ +F64 +F32 diff --git a/tests/queries/0_stateless/02313_test_fpc_codec.sql b/tests/queries/0_stateless/02313_test_fpc_codec.sql new file mode 100644 index 00000000000..e077e59b07b --- /dev/null +++ b/tests/queries/0_stateless/02313_test_fpc_codec.sql @@ -0,0 +1,61 @@ +DROP TABLE IF EXISTS codecTest; + +CREATE TABLE codecTest ( + key UInt64, + name String, + ref_valueF64 Float64, + ref_valueF32 Float32, + valueF64 Float64 CODEC(FPC), + valueF32 Float32 CODEC(FPC) +) Engine = MergeTree ORDER BY key; + +-- best case - same value +INSERT INTO codecTest (key, name, ref_valueF64, valueF64, ref_valueF32, valueF32) + SELECT number AS n, 'e()', e() AS v, v, v, v FROM system.numbers LIMIT 1, 100; + +-- good case - values that grow insignificantly +INSERT INTO codecTest (key, name, ref_valueF64, valueF64, ref_valueF32, valueF32) + SELECT number AS n, 'log2(n)', log2(n) AS v, v, v, v FROM system.numbers LIMIT 101, 100; + +-- bad case - values differ significantly +INSERT INTO codecTest (key, name, ref_valueF64, valueF64, ref_valueF32, valueF32) + SELECT number AS n, 'n*sqrt(n)', n*sqrt(n) AS v, v, v, v FROM system.numbers LIMIT 201, 100; + +-- worst case - almost like a random values +INSERT INTO codecTest (key, name, ref_valueF64, valueF64, ref_valueF32, valueF32) + SELECT number AS n, 'sin(n*n*n)*n', sin(n * n * n * n* n) AS v, v, v, v FROM system.numbers LIMIT 301, 100; + + +-- These floating-point values are expected to be BINARY equal, so comparing by-value is Ok here. + +-- referencing previous row key, value, and case name to simplify debugging. 
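+-- Each check joins every row with its predecessor and returns rows only where
+-- the value decoded through CODEC(FPC) differs from the reference column, so a
+-- correct codec produces no output beyond the 'F64' and 'F32' markers.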
+SELECT 'F64'; +SELECT + c1.key, c1.name, + c1.ref_valueF64, c1.valueF64, c1.ref_valueF64 - c1.valueF64 AS dF64, + 'prev:', + c2.key, c2.ref_valueF64 +FROM + codecTest as c1, codecTest as c2 +WHERE + dF64 != 0 +AND + c2.key = c1.key - 1 +LIMIT 10; + + +SELECT 'F32'; +SELECT + c1.key, c1.name, + c1.ref_valueF32, c1.valueF32, c1.ref_valueF32 - c1.valueF32 AS dF32, + 'prev:', + c2.key, c2.ref_valueF32 +FROM + codecTest as c1, codecTest as c2 +WHERE + dF32 != 0 +AND + c2.key = c1.key - 1 +LIMIT 10; + +DROP TABLE IF EXISTS codecTest; From 821100f145cf75c8ef3a3c30d16c4bafcc05378e Mon Sep 17 00:00:00 2001 From: koloshmet Date: Thu, 26 May 2022 11:09:36 +0300 Subject: [PATCH 11/22] code style fixes --- src/Compression/CompressionCodecFPC.cpp | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/Compression/CompressionCodecFPC.cpp b/src/Compression/CompressionCodecFPC.cpp index 269a935da4b..0f68f512ad8 100644 --- a/src/Compression/CompressionCodecFPC.cpp +++ b/src/Compression/CompressionCodecFPC.cpp @@ -69,9 +69,8 @@ CompressionCodecFPC::CompressionCodecFPC(UInt8 float_size, UInt8 compression_lev UInt32 CompressionCodecFPC::getMaxCompressedDataSize(UInt32 uncompressed_size) const { auto float_count = (uncompressed_size + float_width - 1) / float_width; - if (float_count % 2 != 0) { + if (float_count % 2 != 0) ++float_count; - } return HEADER_SIZE + float_count * float_width + float_count / 2; } @@ -106,7 +105,8 @@ UInt8 encodeEndianness(std::endian endian) throw Exception("Unsupported endianness", ErrorCodes::BAD_ARGUMENTS); } -std::endian decodeEndianness(UInt8 endian) { +std::endian decodeEndianness(UInt8 endian) +{ switch (endian) { case 0: @@ -152,7 +152,8 @@ namespace { template requires (sizeof(TUint) >= 4) -class DfcmPredictor { +class DfcmPredictor +{ public: explicit DfcmPredictor(std::size_t table_size): table(table_size, 0), prev_value{0}, hash{0} { @@ -191,7 +192,8 @@ private: }; template requires (sizeof(TUint) >= 4) -class FcmPredictor { +class FcmPredictor +{ public: explicit FcmPredictor(std::size_t table_size): table(table_size, 0), hash{0} { From 7e69779575b712aae530c71f82f9e33f10e48d76 Mon Sep 17 00:00:00 2001 From: koloshmet Date: Thu, 26 May 2022 22:32:56 +0300 Subject: [PATCH 12/22] added fpc codec to float perftest --- src/Compression/CompressionCodecFPC.cpp | 2 + tests/performance/codecs_float_insert.xml | 1 + tests/performance/codecs_float_select.xml | 1 + .../02313_test_fpc_codec.reference | 2 + .../0_stateless/02313_test_fpc_codec.sql | 60 +++++++++++++++++++ 5 files changed, 66 insertions(+) diff --git a/src/Compression/CompressionCodecFPC.cpp b/src/Compression/CompressionCodecFPC.cpp index 0f68f512ad8..f3106204e01 100644 --- a/src/Compression/CompressionCodecFPC.cpp +++ b/src/Compression/CompressionCodecFPC.cpp @@ -142,6 +142,8 @@ void registerCodecFPC(CompressionCodecFactory & factory) throw Exception("FPC codec argument must be integer", ErrorCodes::ILLEGAL_CODEC_PARAMETER); level = literal->value.safeGet(); + if (level == 0) + throw Exception("FPC codec level must be at least 1", ErrorCodes::ILLEGAL_CODEC_PARAMETER); } return std::make_shared(float_width, level); }; diff --git a/tests/performance/codecs_float_insert.xml b/tests/performance/codecs_float_insert.xml index b31e0eafdd7..64325d30189 100644 --- a/tests/performance/codecs_float_insert.xml +++ b/tests/performance/codecs_float_insert.xml @@ -12,6 +12,7 @@ ZSTD DoubleDelta Gorilla + FPC diff --git a/tests/performance/codecs_float_select.xml 
b/tests/performance/codecs_float_select.xml index 82489daf524..4743a756ac3 100644 --- a/tests/performance/codecs_float_select.xml +++ b/tests/performance/codecs_float_select.xml @@ -12,6 +12,7 @@ ZSTD DoubleDelta Gorilla + FPC diff --git a/tests/queries/0_stateless/02313_test_fpc_codec.reference b/tests/queries/0_stateless/02313_test_fpc_codec.reference index 5e871ea0329..23c75ed1ac0 100644 --- a/tests/queries/0_stateless/02313_test_fpc_codec.reference +++ b/tests/queries/0_stateless/02313_test_fpc_codec.reference @@ -1,2 +1,4 @@ F64 F32 +F64 +F32 diff --git a/tests/queries/0_stateless/02313_test_fpc_codec.sql b/tests/queries/0_stateless/02313_test_fpc_codec.sql index e077e59b07b..3b1127350f0 100644 --- a/tests/queries/0_stateless/02313_test_fpc_codec.sql +++ b/tests/queries/0_stateless/02313_test_fpc_codec.sql @@ -44,6 +44,66 @@ AND LIMIT 10; +SELECT 'F32'; +SELECT + c1.key, c1.name, + c1.ref_valueF32, c1.valueF32, c1.ref_valueF32 - c1.valueF32 AS dF32, + 'prev:', + c2.key, c2.ref_valueF32 +FROM + codecTest as c1, codecTest as c2 +WHERE + dF32 != 0 +AND + c2.key = c1.key - 1 +LIMIT 10; + +DROP TABLE IF EXISTS codecTest; + +CREATE TABLE codecTest ( + key UInt64, + name String, + ref_valueF64 Float64, + ref_valueF32 Float32, + valueF64 Float64 CODEC(FPC(4)), + valueF32 Float32 CODEC(FPC(4)) +) Engine = MergeTree ORDER BY key; + +-- best case - same value +INSERT INTO codecTest (key, name, ref_valueF64, valueF64, ref_valueF32, valueF32) + SELECT number AS n, 'e()', e() AS v, v, v, v FROM system.numbers LIMIT 1, 100; + +-- good case - values that grow insignificantly +INSERT INTO codecTest (key, name, ref_valueF64, valueF64, ref_valueF32, valueF32) + SELECT number AS n, 'log2(n)', log2(n) AS v, v, v, v FROM system.numbers LIMIT 101, 100; + +-- bad case - values differ significantly +INSERT INTO codecTest (key, name, ref_valueF64, valueF64, ref_valueF32, valueF32) + SELECT number AS n, 'n*sqrt(n)', n*sqrt(n) AS v, v, v, v FROM system.numbers LIMIT 201, 100; + +-- worst case - almost like a random values +INSERT INTO codecTest (key, name, ref_valueF64, valueF64, ref_valueF32, valueF32) + SELECT number AS n, 'sin(n*n*n)*n', sin(n * n * n * n* n) AS v, v, v, v FROM system.numbers LIMIT 301, 100; + + +-- These floating-point values are expected to be BINARY equal, so comparing by-value is Ok here. + +-- referencing previous row key, value, and case name to simplify debugging. 
+SELECT 'F64'; +SELECT + c1.key, c1.name, + c1.ref_valueF64, c1.valueF64, c1.ref_valueF64 - c1.valueF64 AS dF64, + 'prev:', + c2.key, c2.ref_valueF64 +FROM + codecTest as c1, codecTest as c2 +WHERE + dF64 != 0 +AND + c2.key = c1.key - 1 +LIMIT 10; + + SELECT 'F32'; SELECT c1.key, c1.name, From 0e63583b8f100a04af554aa086dd83167ae64aa6 Mon Sep 17 00:00:00 2001 From: Dmitry Novik Date: Tue, 31 May 2022 00:10:47 +0000 Subject: [PATCH 13/22] Support types with non-standard defaults in ROLLUP, CUBE, GROUPING SETS --- src/Columns/IColumn.h | 2 +- src/DataTypes/IDataType.cpp | 7 ++ src/DataTypes/IDataType.h | 2 + src/Processors/QueryPlan/AggregatingStep.cpp | 4 +- src/Processors/Transforms/CubeTransform.cpp | 5 +- src/Processors/Transforms/RollupTransform.cpp | 10 +- ...modifiers_with_non-default_types.reference | 113 ++++++++++++++++++ ...up_by_modifiers_with_non-default_types.sql | 39 ++++++ 8 files changed, 178 insertions(+), 4 deletions(-) create mode 100644 tests/queries/0_stateless/02313_group_by_modifiers_with_non-default_types.reference create mode 100644 tests/queries/0_stateless/02313_group_by_modifiers_with_non-default_types.sql diff --git a/src/Columns/IColumn.h b/src/Columns/IColumn.h index f62f6c444b3..a99d4172e5b 100644 --- a/src/Columns/IColumn.h +++ b/src/Columns/IColumn.h @@ -90,7 +90,7 @@ public: /// Creates column with the same type and specified size. /// If size is less current size, then data is cut. /// If size is greater, than default values are appended. - [[nodiscard]] virtual MutablePtr cloneResized(size_t /*size*/) const { throw Exception("Cannot cloneResized() column " + getName(), ErrorCodes::NOT_IMPLEMENTED); } + [[nodiscard]] virtual MutablePtr cloneResized(size_t /*size*/) const { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot cloneResized() column {}", getName()); } /// Returns number of values in column. 
[[nodiscard]] virtual size_t size() const = 0; diff --git a/src/DataTypes/IDataType.cpp b/src/DataTypes/IDataType.cpp index 0976233c031..f2bb878a533 100644 --- a/src/DataTypes/IDataType.cpp +++ b/src/DataTypes/IDataType.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -162,6 +163,12 @@ void IDataType::insertDefaultInto(IColumn & column) const column.insertDefault(); } +void IDataType::insertManyDefaultsInto(IColumn & column, size_t n) const +{ + for (size_t i = 0; i < n; ++i) + insertDefaultInto(column); +} + void IDataType::setCustomization(DataTypeCustomDescPtr custom_desc_) const { /// replace only if not null diff --git a/src/DataTypes/IDataType.h b/src/DataTypes/IDataType.h index fc9e50dc55b..420ef61a13f 100644 --- a/src/DataTypes/IDataType.h +++ b/src/DataTypes/IDataType.h @@ -159,6 +159,8 @@ public: */ virtual void insertDefaultInto(IColumn & column) const; + void insertManyDefaultsInto(IColumn & column, size_t n) const; + /// Checks that two instances belong to the same type virtual bool equals(const IDataType & rhs) const = 0; diff --git a/src/Processors/QueryPlan/AggregatingStep.cpp b/src/Processors/QueryPlan/AggregatingStep.cpp index 4114eff5c56..17a0498fb7e 100644 --- a/src/Processors/QueryPlan/AggregatingStep.cpp +++ b/src/Processors/QueryPlan/AggregatingStep.cpp @@ -241,7 +241,9 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B if (missign_column_index < missing_columns.size() && missing_columns[missign_column_index] == i) { ++missign_column_index; - auto column = ColumnConst::create(col.column->cloneResized(1), 0); + auto column_with_default = col.column->cloneEmpty(); + col.type->insertDefaultInto(*column_with_default); + auto column = ColumnConst::create(std::move(column_with_default), 0); const auto * node = &dag->addColumn({ColumnPtr(std::move(column)), col.type, col.name}); node = &dag->materializeNode(*node); index.push_back(node); diff --git a/src/Processors/Transforms/CubeTransform.cpp b/src/Processors/Transforms/CubeTransform.cpp index c699e724ffc..83ed346dabe 100644 --- a/src/Processors/Transforms/CubeTransform.cpp +++ b/src/Processors/Transforms/CubeTransform.cpp @@ -35,6 +35,8 @@ void CubeTransform::consume(Chunk chunk) consumed_chunks.emplace_back(std::move(chunk)); } +MutableColumnPtr getColumnWithDefaults(Block const & header, size_t key, size_t n); + Chunk CubeTransform::generate() { if (!consumed_chunks.empty()) @@ -53,8 +55,9 @@ Chunk CubeTransform::generate() current_zero_columns.clear(); current_zero_columns.reserve(keys.size()); + auto const & input_header = getInputPort().getHeader(); for (auto key : keys) - current_zero_columns.emplace_back(current_columns[key]->cloneEmpty()->cloneResized(num_rows)); + current_zero_columns.emplace_back(getColumnWithDefaults(input_header, key, num_rows)); } auto gen_chunk = std::move(cube_chunk); diff --git a/src/Processors/Transforms/RollupTransform.cpp b/src/Processors/Transforms/RollupTransform.cpp index db60b99102d..b69a691323c 100644 --- a/src/Processors/Transforms/RollupTransform.cpp +++ b/src/Processors/Transforms/RollupTransform.cpp @@ -29,6 +29,14 @@ Chunk RollupTransform::merge(Chunks && chunks, bool final) return Chunk(rollup_block.getColumns(), num_rows); } +MutableColumnPtr getColumnWithDefaults(Block const & header, size_t key, size_t n) +{ + auto const & col = header.getByPosition(key); + auto result_column = col.column->cloneEmpty(); + col.type->insertManyDefaultsInto(*result_column, n); + return result_column; +} + Chunk RollupTransform::generate() { if 
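getColumnWithDefaults, defined just above, is the heart of this patch: cloneResized pads with the column's raw default, while subtotal rows produced by ROLLUP and CUBE must carry the type's default. A minimal standalone sketch of the difference, assuming ClickHouse's DataTypeEnum headers (enumSubtotalDefaults is an illustrative name, not from the patch):

#include <DataTypes/DataTypeEnum.h>

/// For Enum8('one' = 1, 'two' = 2) the underlying ColumnInt8 default is the
/// raw byte 0, which names no value of this enum; the type default is the
/// first enum value, 'one' = 1.
ColumnPtr enumSubtotalDefaults(size_t n)
{
    DataTypePtr type = std::make_shared<DataTypeEnum8>(
        DataTypeEnum8::Values{{"one", 1}, {"two", 2}});

    /// Old path: pads with the column default, n raw zeros (invalid here).
    [[maybe_unused]] auto wrong = type->createColumn()->cloneResized(n);

    /// New path, as in getColumnWithDefaults above: n copies of 'one'.
    auto right = type->createColumn();
    type->insertManyDefaultsInto(*right, n);
    return right;
}

This is exactly what the 02313 test below checks: every rolled-up Enum cell must read 'one' or 'default', never a bare 0.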
(!consumed_chunks.empty()) @@ -51,7 +59,7 @@ Chunk RollupTransform::generate() auto num_rows = gen_chunk.getNumRows(); auto columns = gen_chunk.getColumns(); - columns[key] = columns[key]->cloneEmpty()->cloneResized(num_rows); + columns[key] = getColumnWithDefaults(getInputPort().getHeader(), key, num_rows); Chunks chunks; chunks.emplace_back(std::move(columns), num_rows); diff --git a/tests/queries/0_stateless/02313_group_by_modifiers_with_non-default_types.reference b/tests/queries/0_stateless/02313_group_by_modifiers_with_non-default_types.reference new file mode 100644 index 00000000000..183c63d1222 --- /dev/null +++ b/tests/queries/0_stateless/02313_group_by_modifiers_with_non-default_types.reference @@ -0,0 +1,113 @@ +-- { echoOn } +SELECT + count() as d, a, b, c +FROM test02313 +GROUP BY ROLLUP(a, b, c) +ORDER BY d, a, b, c; +1 one default 0 +1 one default 2 +1 one default 4 +1 one default 6 +1 one default 8 +1 two non-default 1 +1 two non-default 3 +1 two non-default 5 +1 two non-default 7 +1 two non-default 9 +5 one default 0 +5 one default 0 +5 two default 0 +5 two non-default 0 +10 one default 0 +SELECT + count() as d, a, b, c +FROM test02313 +GROUP BY CUBE(a, b, c) +ORDER BY d, a, b, c; +1 one default 0 +1 one default 0 +1 one default 0 +1 one default 0 +1 one default 1 +1 one default 2 +1 one default 2 +1 one default 2 +1 one default 2 +1 one default 3 +1 one default 4 +1 one default 4 +1 one default 4 +1 one default 4 +1 one default 5 +1 one default 6 +1 one default 6 +1 one default 6 +1 one default 6 +1 one default 7 +1 one default 8 +1 one default 8 +1 one default 8 +1 one default 8 +1 one default 9 +1 one non-default 1 +1 one non-default 3 +1 one non-default 5 +1 one non-default 7 +1 one non-default 9 +1 two default 1 +1 two default 3 +1 two default 5 +1 two default 7 +1 two default 9 +1 two non-default 1 +1 two non-default 3 +1 two non-default 5 +1 two non-default 7 +1 two non-default 9 +5 one default 0 +5 one default 0 +5 one default 0 +5 one non-default 0 +5 two default 0 +5 two non-default 0 +10 one default 0 +SELECT + count() as d, a, b, c +FROM test02313 +GROUP BY GROUPING SETS + ( + (c), + (a, c), + (b, c) + ) +ORDER BY d, a, b, c; +1 one default 0 +1 one default 0 +1 one default 0 +1 one default 1 +1 one default 2 +1 one default 2 +1 one default 2 +1 one default 3 +1 one default 4 +1 one default 4 +1 one default 4 +1 one default 5 +1 one default 6 +1 one default 6 +1 one default 6 +1 one default 7 +1 one default 8 +1 one default 8 +1 one default 8 +1 one default 9 +1 one non-default 1 +1 one non-default 3 +1 one non-default 5 +1 one non-default 7 +1 one non-default 9 +1 two default 1 +1 two default 3 +1 two default 5 +1 two default 7 +1 two default 9 diff --git a/tests/queries/0_stateless/02313_group_by_modifiers_with_non-default_types.sql b/tests/queries/0_stateless/02313_group_by_modifiers_with_non-default_types.sql new file mode 100644 index 00000000000..d30cc930429 --- /dev/null +++ b/tests/queries/0_stateless/02313_group_by_modifiers_with_non-default_types.sql @@ -0,0 +1,39 @@ +DROP TABLE IF EXISTS test02313; + +CREATE TABLE test02313 +( + a Enum('one' = 1, 'two' = 2), + b Enum('default' = 0, 'non-default' = 1), + c UInt8 +) +ENGINE = MergeTree() +ORDER BY (a, b, c); + +INSERT INTO test02313 SELECT number % 2 + 1 AS a, number % 2 AS b, number FROM numbers(10); + +-- { echoOn } +SELECT + count() as d, a, b, c +FROM test02313 +GROUP BY ROLLUP(a, b, c) +ORDER BY d, a, b, c; + +SELECT + count() as d, a, b, c +FROM test02313 +GROUP BY CUBE(a, b, c) +ORDER BY d, a, 
b, c; + +SELECT + count() as d, a, b, c +FROM test02313 +GROUP BY GROUPING SETS + ( + (c), + (a, c), + (b, c) + ) +ORDER BY d, a, b, c; + +-- { echoOff } +DROP TABLE test02313; From d5064dd5b5480f93966f847e4752d620eebc6656 Mon Sep 17 00:00:00 2001 From: koloshmet Date: Wed, 8 Jun 2022 02:17:34 +0300 Subject: [PATCH 14/22] so many improvements --- src/Compression/CompressionCodecFPC.cpp | 129 +++++++++++++----------- 1 file changed, 69 insertions(+), 60 deletions(-) diff --git a/src/Compression/CompressionCodecFPC.cpp b/src/Compression/CompressionCodecFPC.cpp index f3106204e01..3b66060b6fc 100644 --- a/src/Compression/CompressionCodecFPC.cpp +++ b/src/Compression/CompressionCodecFPC.cpp @@ -17,12 +17,15 @@ namespace DB class CompressionCodecFPC : public ICompressionCodec { public: - explicit CompressionCodecFPC(UInt8 float_size, UInt8 compression_level); + CompressionCodecFPC(UInt8 float_size, UInt8 compression_level); uint8_t getMethodByte() const override; void updateHash(SipHash & hash) const override; + static constexpr UInt8 MAX_COMPRESSION_LEVEL{28}; + static constexpr UInt8 DEFAULT_COMPRESSION_LEVEL{12}; + protected: UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override; @@ -33,11 +36,11 @@ protected: bool isCompression() const override { return true; } bool isGenericCompression() const override { return false; } +private: static constexpr UInt32 HEADER_SIZE{3}; -private: - UInt8 float_width; - UInt8 level; + UInt8 float_width; // size of uncompressed float in bytes + UInt8 level; // compression level, 2^level * float_width is the size of predictors table in bytes }; @@ -93,21 +96,21 @@ UInt8 getFloatBytesSize(const IDataType & column_type) column_type.getName()); } -UInt8 encodeEndianness(std::endian endian) +std::byte encodeEndianness(std::endian endian) { switch (endian) { case std::endian::little: - return 0; + return std::byte{0}; case std::endian::big: - return 1; + return std::byte{1}; } throw Exception("Unsupported endianness", ErrorCodes::BAD_ARGUMENTS); } -std::endian decodeEndianness(UInt8 endian) +std::endian decodeEndianness(std::byte endian) { - switch (endian) + switch (std::to_integer(endian)) { case 0: return std::endian::little; @@ -128,7 +131,7 @@ void registerCodecFPC(CompressionCodecFactory & factory) if (column_type != nullptr) float_width = getFloatBytesSize(*column_type); - UInt8 level{12}; + UInt8 level = CompressionCodecFPC::DEFAULT_COMPRESSION_LEVEL; if (arguments && !arguments->children.empty()) { if (arguments->children.size() > 1) @@ -144,6 +147,8 @@ void registerCodecFPC(CompressionCodecFactory & factory) level = literal->value.safeGet(); if (level == 0) throw Exception("FPC codec level must be at least 1", ErrorCodes::ILLEGAL_CODEC_PARAMETER); + if (level > CompressionCodecFPC::MAX_COMPRESSION_LEVEL) + throw Exception("FPC codec level must be at most 28", ErrorCodes::ILLEGAL_CODEC_PARAMETER); } return std::make_shared(float_width, level); }; @@ -153,7 +158,8 @@ void registerCodecFPC(CompressionCodecFactory & factory) namespace { -template requires (sizeof(TUint) >= 4) +template + requires (sizeof(TUint) >= 4) class DfcmPredictor { public: @@ -189,11 +195,12 @@ private: } std::vector table; - TUint prev_value{0}; - std::size_t hash{0}; + TUint prev_value; + std::size_t hash; }; -template requires (sizeof(TUint) >= 4) +template + requires (sizeof(TUint) >= 4) class FcmPredictor { public: @@ -228,7 +235,7 @@ private: } std::vector table; - std::size_t hash{0}; + std::size_t hash; }; template @@ -238,13 +245,15 @@ class 
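For readers new to FPC (Burtscher and Ratanaworabhan's floating-point compressor), the two predictor classes above implement its prediction step: guess the next value from a hash-indexed table of history, XOR the guess with the actual value, and keep only the bytes after the run of leading zeros. A simplified, self-contained sketch of the FCM half (the hash mixing and the power-of-two table mirror FcmPredictor above; TinyFcm and fcmResidual are illustrative, not the patch's code):

#include <bit>
#include <cstdint>
#include <utility>
#include <vector>

struct TinyFcm
{
    explicit TinyFcm(std::size_t level) : table(std::size_t{1} << level, 0) {}

    uint64_t predict() const { return table[hash]; }

    void add(uint64_t value)
    {
        table[hash] = value;
        /// Mix the high bits of the new value into the table index;
        /// the table size is a power of two, so masking wraps the hash.
        hash = ((hash << 6) ^ (value >> 48)) & (table.size() - 1);
    }

    std::vector<uint64_t> table;
    std::size_t hash = 0;
};

/// Residual plus the number of leading zero bytes that need not be stored.
std::pair<uint64_t, int> fcmResidual(TinyFcm & predictor, uint64_t value)
{
    uint64_t diff = predictor.predict() ^ value;
    predictor.add(value);
    return {diff, std::countl_zero(diff) / 8};
}

When prediction is good the residual is mostly leading zeros, which is why the encoder below only needs a 3-bit zero-byte count per value.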
FPCOperation static constexpr std::size_t CHUNK_SIZE{64}; static constexpr auto VALUE_SIZE = sizeof(TUint); - static constexpr std::byte DFCM_BIT_1{1u << 7}; - static constexpr std::byte DFCM_BIT_2{1u << 3}; - static constexpr unsigned MAX_COMPRESSED_SIZE{0b111u}; + static constexpr std::byte FCM_BIT{0}; + static constexpr std::byte DFCM_BIT{1u << 3}; + static constexpr auto DFCM_BIT_1 = DFCM_BIT << 4; + static constexpr auto DFCM_BIT_2 = DFCM_BIT; + static constexpr unsigned MAX_ZERO_BYTE_COUNT{0b111u}; public: - explicit FPCOperation(std::span destination, UInt8 compression_level) - : dfcm_predictor(1 << compression_level), fcm_predictor(1 << compression_level), chunk{}, result{destination} + FPCOperation(std::span destination, UInt8 compression_level) + : dfcm_predictor(1u << compression_level), fcm_predictor(1u << compression_level), chunk{}, result{destination} { } @@ -317,22 +326,22 @@ private: { TUint value; unsigned compressed_size; - bool is_dfcm_predictor; + std::byte predictor; }; - unsigned encodeCompressedSize(int compressed) + unsigned encodeCompressedZeroByteCount(int compressed) { - if constexpr (VALUE_SIZE > MAX_COMPRESSED_SIZE) + if constexpr (VALUE_SIZE == MAX_ZERO_BYTE_COUNT + 1) { if (compressed >= 4) --compressed; } - return std::min(static_cast(compressed), MAX_COMPRESSED_SIZE); + return std::min(static_cast(compressed), MAX_ZERO_BYTE_COUNT); } - unsigned decodeCompressedSize(unsigned encoded_size) + unsigned decodeCompressedZeroByteCount(unsigned encoded_size) { - if constexpr (VALUE_SIZE > MAX_COMPRESSED_SIZE) + if constexpr (VALUE_SIZE == MAX_ZERO_BYTE_COUNT + 1) { if (encoded_size > 3) ++encoded_size; @@ -342,6 +351,8 @@ private: CompressedValue compressValue(TUint value) noexcept { + static constexpr auto BITS_PER_BYTE = std::numeric_limits::digits; + TUint compressed_dfcm = dfcm_predictor.predict() ^ value; TUint compressed_fcm = fcm_predictor.predict() ^ value; dfcm_predictor.add(value); @@ -349,29 +360,26 @@ private: auto zeroes_dfcm = std::countl_zero(compressed_dfcm); auto zeroes_fcm = std::countl_zero(compressed_fcm); if (zeroes_dfcm > zeroes_fcm) - return {compressed_dfcm, encodeCompressedSize(zeroes_dfcm / CHAR_BIT), true}; - return {compressed_fcm, encodeCompressedSize(zeroes_fcm / CHAR_BIT), false}; + return {compressed_dfcm, encodeCompressedZeroByteCount(zeroes_dfcm / BITS_PER_BYTE), DFCM_BIT}; + return {compressed_fcm, encodeCompressedZeroByteCount(zeroes_fcm / BITS_PER_BYTE), FCM_BIT}; } void encodePair(TUint first, TUint second) { - auto [value1, compressed_size1, is_dfcm_predictor1] = compressValue(first); - auto [value2, compressed_size2, is_dfcm_predictor2] = compressValue(second); + auto [value1, zero_byte_count1, predictor1] = compressValue(first); + auto [value2, zero_byte_count2, predictor2] = compressValue(second); std::byte header{0x0}; - if (is_dfcm_predictor1) - header |= DFCM_BIT_1; - if (is_dfcm_predictor2) - header |= DFCM_BIT_2; - header |= static_cast((compressed_size1 << 4) | compressed_size2); + header |= (predictor1 << 4) | predictor2; + header |= static_cast((zero_byte_count1 << 4) | zero_byte_count2); result.front() = header; - compressed_size1 = decodeCompressedSize(compressed_size1); - compressed_size2 = decodeCompressedSize(compressed_size2); - auto tail_size1 = VALUE_SIZE - compressed_size1; - auto tail_size2 = VALUE_SIZE - compressed_size2; + zero_byte_count1 = decodeCompressedZeroByteCount(zero_byte_count1); + zero_byte_count2 = decodeCompressedZeroByteCount(zero_byte_count2); + auto tail_size1 = VALUE_SIZE - 
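Since the bit packing in encodePair above and decodePair below is easy to misread, here is the layout of the shared header byte and a sketch of parsing it (bit positions follow DFCM_BIT_1, DFCM_BIT_2 and MAX_ZERO_BYTE_COUNT above; PairHeader is illustrative, not the patch's code):

#include <cstddef>
#include <cstdint>

/// Header byte written once per pair of values:
///   bit 7       predictor for value 1 (0 = FCM, 1 = DFCM)
///   bits 6..4   encoded count of leading zero bytes dropped from value 1
///   bit 3       predictor for value 2
///   bits 2..0   encoded count of leading zero bytes dropped from value 2
/// (For 8-byte floats the encode/decode helpers above squeeze counts 0..8
/// into 3 bits by never emitting a count of exactly 4.)
struct PairHeader
{
    bool first_is_dfcm;
    bool second_is_dfcm;
    unsigned zero_bytes1;
    unsigned zero_bytes2;
};

PairHeader parsePairHeader(std::byte header)
{
    auto bits = std::to_integer<uint8_t>(header);
    return {
        (bits & (1u << 7)) != 0,
        (bits & (1u << 3)) != 0,
        (bits >> 4) & 0b111u,
        bits & 0b111u,
    };
}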
zero_byte_count1; + auto tail_size2 = VALUE_SIZE - zero_byte_count2; - std::memcpy(result.data() + 1, valueTail(value1, compressed_size1), tail_size1); - std::memcpy(result.data() + 1 + tail_size1, valueTail(value2, compressed_size2), tail_size2); + std::memcpy(result.data() + 1, valueTail(value1, zero_byte_count1), tail_size1); + std::memcpy(result.data() + 1 + tail_size1, valueTail(value2, zero_byte_count2), tail_size2); result = result.subspan(1 + tail_size1 + tail_size2); } @@ -406,11 +414,13 @@ private: if (bytes.empty()) throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Unexpected end of encoded sequence"); - auto compressed_size1 = decodeCompressedSize(static_cast(bytes.front() >> 4) & MAX_COMPRESSED_SIZE); - auto compressed_size2 = decodeCompressedSize(static_cast(bytes.front()) & MAX_COMPRESSED_SIZE); + auto zero_byte_count1 = decodeCompressedZeroByteCount( + std::to_integer(bytes.front() >> 4) & MAX_ZERO_BYTE_COUNT); + auto zero_byte_count2 = decodeCompressedZeroByteCount( + std::to_integer(bytes.front()) & MAX_ZERO_BYTE_COUNT); - auto tail_size1 = VALUE_SIZE - compressed_size1; - auto tail_size2 = VALUE_SIZE - compressed_size2; + auto tail_size1 = VALUE_SIZE - zero_byte_count1; + auto tail_size2 = VALUE_SIZE - zero_byte_count2; if (bytes.size() < 1 + tail_size1 + tail_size2) throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Unexpected end of encoded sequence"); @@ -418,13 +428,13 @@ private: TUint value1{0}; TUint value2{0}; - std::memcpy(valueTail(value1, compressed_size1), bytes.data() + 1, tail_size1); - std::memcpy(valueTail(value2, compressed_size2), bytes.data() + 1 + tail_size1, tail_size2); + std::memcpy(valueTail(value1, zero_byte_count1), bytes.data() + 1, tail_size1); + std::memcpy(valueTail(value2, zero_byte_count2), bytes.data() + 1 + tail_size1, tail_size2); - auto is_dfcm_predictor1 = static_cast(bytes.front() & DFCM_BIT_1); - auto is_dfcm_predictor2 = static_cast(bytes.front() & DFCM_BIT_2); - first = decompressValue(value1, is_dfcm_predictor1 != 0); - second = decompressValue(value2, is_dfcm_predictor2 != 0); + auto is_dfcm_predictor1 = std::to_integer(bytes.front() & DFCM_BIT_1) != 0; + auto is_dfcm_predictor2 = std::to_integer(bytes.front() & DFCM_BIT_2) != 0; + first = decompressValue(value1, is_dfcm_predictor1); + second = decompressValue(value2, is_dfcm_predictor2); return 1 + tail_size1 + tail_size2; } @@ -453,7 +463,7 @@ UInt32 CompressionCodecFPC::doCompressData(const char * source, UInt32 source_si { dest[0] = static_cast(float_width); dest[1] = static_cast(level); - dest[2] = static_cast(encodeEndianness(std::endian::native)); + dest[2] = std::to_integer(encodeEndianness(std::endian::native)); auto dest_size = getMaxCompressedDataSize(source_size); auto destination = std::as_writable_bytes(std::span(dest, dest_size).subspan(HEADER_SIZE)); @@ -475,17 +485,16 @@ void CompressionCodecFPC::doDecompressData(const char * source, UInt32 source_si if (source_size < HEADER_SIZE) throw Exception("Cannot decompress. File has wrong header", ErrorCodes::CANNOT_DECOMPRESS); - auto compressed_data = std::span(source, source_size); - if (decodeEndianness(static_cast(compressed_data[2])) != std::endian::native) + auto compressed_data = std::as_bytes(std::span(source, source_size)); + auto compressed_float_width = std::to_integer(compressed_data[0]); + auto compressed_level = std::to_integer(compressed_data[1]); + if (compressed_level == 0 || compressed_level > MAX_COMPRESSION_LEVEL) + throw Exception("Cannot decompress. 
File has incorrect level", ErrorCodes::CANNOT_DECOMPRESS); + if (decodeEndianness(compressed_data[2]) != std::endian::native) throw Exception("Cannot decompress. File has incorrect endianness", ErrorCodes::CANNOT_DECOMPRESS); - auto compressed_float_width = static_cast(compressed_data[0]); - auto compressed_level = static_cast(compressed_data[1]); - if (compressed_level == 0) - throw Exception("Cannot decompress. File has incorrect level", ErrorCodes::CANNOT_DECOMPRESS); - auto destination = std::as_writable_bytes(std::span(dest, uncompressed_size)); - auto src = std::as_bytes(compressed_data.subspan(HEADER_SIZE)); + auto src = compressed_data.subspan(HEADER_SIZE); switch (compressed_float_width) { case sizeof(Float64): From e701adbd2828895763c6d2d6cbb735a1a74e1922 Mon Sep 17 00:00:00 2001 From: "Mikhail f. Shiryaev" Date: Tue, 14 Jun 2022 16:22:41 +0200 Subject: [PATCH 15/22] Use workflow URL as "link" ref --- tests/ci/build_report_check.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/ci/build_report_check.py b/tests/ci/build_report_check.py index b2d54eadd60..1ba91a38a60 100644 --- a/tests/ci/build_report_check.py +++ b/tests/ci/build_report_check.py @@ -93,7 +93,7 @@ def get_failed_report( elapsed_seconds=0, with_coverage=False, ) - return [build_result], [[""]], [""] + return [build_result], [[""]], [GITHUB_RUN_URL] def process_report( From dc2e117cce48323e8c7da31278c5c68f75f63f6f Mon Sep 17 00:00:00 2001 From: Maksim Kita Date: Tue, 14 Jun 2022 17:30:11 +0200 Subject: [PATCH 16/22] UnaryLogicalFunctions improve performance using dynamic dispatch --- src/Functions/FunctionsLogical.cpp | 69 ++++++++----------- tests/performance/unary_logical_functions.xml | 31 +++++++++ 2 files changed, 59 insertions(+), 41 deletions(-) create mode 100644 tests/performance/unary_logical_functions.xml diff --git a/src/Functions/FunctionsLogical.cpp b/src/Functions/FunctionsLogical.cpp index b615f52652c..be295186943 100644 --- a/src/Functions/FunctionsLogical.cpp +++ b/src/Functions/FunctionsLogical.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include @@ -50,17 +51,15 @@ MutableColumnPtr buildColumnFromTernaryData(const UInt8Container & ternary_data, const size_t rows_count = ternary_data.size(); auto new_column = ColumnUInt8::create(rows_count); - std::transform( - ternary_data.cbegin(), ternary_data.cend(), new_column->getData().begin(), - [](const auto x) { return x == Ternary::True; }); + for (size_t i = 0; i < rows_count; ++i) + new_column->getData()[i] = (ternary_data[i] == Ternary::True); if (!make_nullable) return new_column; auto null_column = ColumnUInt8::create(rows_count); - std::transform( - ternary_data.cbegin(), ternary_data.cend(), null_column->getData().begin(), - [](const auto x) { return x == Ternary::Null; }); + for (size_t i = 0; i < rows_count; ++i) + null_column->getData()[i] = (ternary_data[i] == Ternary::Null); return ColumnNullable::create(std::move(new_column), std::move(null_column)); } @@ -68,13 +67,14 @@ MutableColumnPtr buildColumnFromTernaryData(const UInt8Container & ternary_data, template bool tryConvertColumnToBool(const IColumn * column, UInt8Container & res) { - const auto col = checkAndGetColumn>(column); - if (!col) + const auto column_typed = checkAndGetColumn>(column); + if (!column_typed) return false; - std::transform( - col->getData().cbegin(), col->getData().cend(), res.begin(), - [](const auto x) { return !!x; }); + auto & data = column_typed->getData(); + size_t data_size = data.size(); + for (size_t i = 0; i 
< data_size; ++i)
+        res[i] = static_cast<bool>(data[i]);
 
     return true;
 }
@@ -99,7 +99,7 @@ bool extractConstColumns(ColumnRawPtrs & in, UInt8 & res, Func && func)
 {
     bool has_res = false;
 
-    for (int i = static_cast<int>(in.size()) - 1; i >= 0; --i)
+    for (Int64 i = static_cast<Int64>(in.size()) - 1; i >= 0; --i)
     {
         UInt8 x;
@@ -458,7 +458,9 @@ ColumnPtr basicExecuteImpl(ColumnRawPtrs arguments, size_t input_rows_count)
     for (const IColumn * column : arguments)
     {
         if (const auto * uint8_column = checkAndGetColumn<ColumnUInt8>(column))
+        {
             uint8_args.push_back(uint8_column);
+        }
         else
         {
             auto converted_column = ColumnUInt8::create(input_rows_count);
@@ -596,14 +598,14 @@ ColumnPtr FunctionAnyArityLogical<Impl, Name>::executeShortCircuit(ColumnsWithTy
     if (nulls)
         applyTernaryLogic(mask, *nulls);
 
-    MutableColumnPtr res = ColumnUInt8::create();
-    typeid_cast<ColumnUInt8 *>(res.get())->getData() = std::move(mask);
+    auto res = ColumnUInt8::create();
+    res->getData() = std::move(mask);
 
     if (!nulls)
         return res;
 
-    MutableColumnPtr bytemap = ColumnUInt8::create();
-    typeid_cast<ColumnUInt8 *>(bytemap.get())->getData() = std::move(*nulls);
+    auto bytemap = ColumnUInt8::create();
+    bytemap->getData() = std::move(*nulls);
 
     return ColumnNullable::create(std::move(res), std::move(bytemap));
 }
@@ -692,29 +694,14 @@ ColumnPtr FunctionAnyArityLogical<Impl, Name>::getConstantResultForNonConstArgum
     return result_column;
 }
 
-template <typename A, typename Op>
-struct UnaryOperationImpl
-{
-    using ResultType = typename Op::ResultType;
-    using ArrayA = typename ColumnVector<A>::Container;
-    using ArrayC = typename ColumnVector<ResultType>::Container;
-
-    static void NO_INLINE vector(const ArrayA & a, ArrayC & c)
-    {
-        std::transform(
-            a.cbegin(), a.cend(), c.begin(),
-            [](const auto x) { return Op::apply(x); });
-    }
-};
-
 template
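The commit subject mentions dynamic dispatch: the idea is to compile simple indexed loops like the ones introduced above for several instruction sets and pick the widest one the CPU supports at run time, something the removed std::transform-based UnaryOperationImpl tended to defeat. A sketch of the pattern using the DECLARE_MULTITARGET_CODE machinery from Common/TargetSpecific.h (the exact wiring inside FunctionsLogical.cpp may differ from this; applyUnary and applyUnaryDispatch are illustrative names):

#include <Common/TargetSpecific.h>
#include <base/types.h>

DECLARE_MULTITARGET_CODE(

template <typename Op>
void applyUnary(const UInt8 * __restrict src, UInt8 * __restrict dst, size_t size)
{
    /// A plain indexed loop over contiguous data auto-vectorizes reliably.
    for (size_t i = 0; i < size; ++i)
        dst[i] = Op::apply(src[i]);
}

) // DECLARE_MULTITARGET_CODE

template <typename Op>
void applyUnaryDispatch(const UInt8 * src, UInt8 * dst, size_t size)
{
    if (isArchSupported(TargetArch::AVX2))
        TargetSpecific::AVX2::applyUnary<Op>(src, dst, size);
    else
        TargetSpecific::Default::applyUnary<Op>(src, dst, size);
}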