From b2e52f80cd81bcff9d9b9b4900b8b2e3233e4e49 Mon Sep 17 00:00:00 2001 From: avogar Date: Mon, 3 Apr 2023 19:36:43 +0000 Subject: [PATCH 1/2] Fix serializing LowCardinality as Arrow dictionary --- .../Formats/Impl/ArrowBlockOutputFormat.cpp | 1 + .../Formats/Impl/CHColumnToArrowColumn.cpp | 127 ++++++++++++++---- .../Formats/Impl/CHColumnToArrowColumn.h | 2 +- ...706_arrow_different_dictionaries.reference | 20 +++ .../02706_arrow_different_dictionaries.sh | 9 ++ 5 files changed, 132 insertions(+), 27 deletions(-) create mode 100644 tests/queries/0_stateless/02706_arrow_different_dictionaries.reference create mode 100755 tests/queries/0_stateless/02706_arrow_different_dictionaries.sh diff --git a/src/Processors/Formats/Impl/ArrowBlockOutputFormat.cpp b/src/Processors/Formats/Impl/ArrowBlockOutputFormat.cpp index c85c0342c8c..b5dc074c1e6 100644 --- a/src/Processors/Formats/Impl/ArrowBlockOutputFormat.cpp +++ b/src/Processors/Formats/Impl/ArrowBlockOutputFormat.cpp @@ -98,6 +98,7 @@ void ArrowBlockOutputFormat::prepareWriter(const std::shared_ptr arrow::Result> writer_status; arrow::ipc::IpcWriteOptions options = arrow::ipc::IpcWriteOptions::Defaults(); options.codec = *arrow::util::Codec::Create(getArrowCompression(format_settings.arrow.output_compression_method)); + options.emit_dictionary_deltas = true; // TODO: should we use arrow::ipc::IpcOptions::alignment? 
if (stream) diff --git a/src/Processors/Formats/Impl/CHColumnToArrowColumn.cpp b/src/Processors/Formats/Impl/CHColumnToArrowColumn.cpp index 8698b343eb3..86dc05a4af8 100644 --- a/src/Processors/Formats/Impl/CHColumnToArrowColumn.cpp +++ b/src/Processors/Formats/Impl/CHColumnToArrowColumn.cpp @@ -194,7 +194,7 @@ namespace DB size_t end, bool output_string_as_string, bool output_fixed_string_as_fixed_byte_array, - std::unordered_map> & dictionary_values); + std::unordered_map & dictionary_values); template static void fillArrowArrayWithArrayColumnData( @@ -208,7 +208,7 @@ namespace DB size_t end, bool output_string_as_string, bool output_fixed_string_as_fixed_byte_array, - std::unordered_map> & dictionary_values) + std::unordered_map & dictionary_values) { const auto * column_array = assert_cast(column.get()); ColumnPtr nested_column = column_array->getDataPtr(); @@ -239,7 +239,7 @@ namespace DB size_t end, bool output_string_as_string, bool output_fixed_string_as_fixed_byte_array, - std::unordered_map> & dictionary_values) + std::unordered_map & dictionary_values) { const auto * column_tuple = assert_cast(column.get()); const auto * type_tuple = assert_cast(column_type.get()); @@ -270,7 +270,7 @@ namespace DB } template - static PaddedPODArray extractIndexesImpl(ColumnPtr column, size_t start, size_t end, bool shift) + static PaddedPODArray extractIndexes(ColumnPtr column, size_t start, size_t end, bool shift) { const PaddedPODArray & data = assert_cast *>(column.get())->getData(); PaddedPODArray result; @@ -282,23 +282,72 @@ namespace DB return result; } - static PaddedPODArray extractIndexesImpl(ColumnPtr column, size_t start, size_t end, bool shift) + static PaddedPODArray extractIndexes(ColumnPtr column, size_t start, size_t end, bool shift) { switch (column->getDataType()) { case TypeIndex::UInt8: - return extractIndexesImpl(column, start, end, shift); + return extractIndexes(column, start, end, shift); case TypeIndex::UInt16: - return 
extractIndexesImpl(column, start, end, shift); + return extractIndexes(column, start, end, shift); case TypeIndex::UInt32: - return extractIndexesImpl(column, start, end, shift); + return extractIndexes(column, start, end, shift); case TypeIndex::UInt64: - return extractIndexesImpl(column, start, end, shift); + return extractIndexes(column, start, end, shift); default: throw Exception(ErrorCodes::LOGICAL_ERROR, "Indexes column must be ColumnUInt, got {}.", column->getName()); } } + template + static PaddedPODArray extractIndexesWithRemapping(ColumnPtr indexes, ColumnPtr mapping, size_t start, size_t end, bool shift) + { + const PaddedPODArray & indexes_data = assert_cast *>(indexes.get())->getData(); + const PaddedPODArray & mapping_data = assert_cast *>(mapping.get())->getData(); + PaddedPODArray result; + result.reserve(end - start); + if (shift) + std::transform(indexes_data.begin() + start, indexes_data.begin() + end, std::back_inserter(result), [&](IndexesType value) { return mapping_data[Int64(value)] - 1; }); + else + std::transform(indexes_data.begin() + start, indexes_data.begin() + end, std::back_inserter(result), [&](IndexesType value) { return mapping_data[Int64(value)]; }); + return result; + } + + template + static PaddedPODArray extractIndexesWithRemapping(ColumnPtr indexes, ColumnPtr mapping, size_t start, size_t end, bool shift) + { + switch (mapping->getDataType()) + { + case TypeIndex::UInt8: + return extractIndexesWithRemapping(indexes, mapping, start, end, shift); + case TypeIndex::UInt16: + return extractIndexesWithRemapping(indexes, mapping, start, end, shift); + case TypeIndex::UInt32: + return extractIndexesWithRemapping(indexes, mapping, start, end, shift); + case TypeIndex::UInt64: + return extractIndexesWithRemapping(indexes, mapping, start, end, shift); + default: + throw Exception(ErrorCodes::LOGICAL_ERROR, "Indexes column must be ColumnUInt, got {}.", indexes->getName()); + } + } + + static PaddedPODArray 
extractIndexesWithRemapping(ColumnPtr indexes, ColumnPtr mapping, size_t start, size_t end, bool shift) + { + switch (indexes->getDataType()) + { + case TypeIndex::UInt8: + return extractIndexesWithRemapping(indexes, mapping, start, end, shift); + case TypeIndex::UInt16: + return extractIndexesWithRemapping(indexes, mapping, start, end, shift); + case TypeIndex::UInt32: + return extractIndexesWithRemapping(indexes, mapping, start, end, shift); + case TypeIndex::UInt64: + return extractIndexesWithRemapping(indexes, mapping, start, end, shift); + default: + throw Exception(ErrorCodes::LOGICAL_ERROR, "Indexes column must be ColumnUInt, got {}.", indexes->getName()); + } + } + template static void fillArrowArrayWithLowCardinalityColumnDataImpl( const String & column_name, @@ -311,35 +360,61 @@ namespace DB size_t end, bool output_string_as_string, bool output_fixed_string_as_fixed_byte_array, - std::unordered_map> & dictionary_values) + std::unordered_map & dictionary_values) { const auto * column_lc = assert_cast(column.get()); arrow::DictionaryBuilder * builder = assert_cast *>(array_builder); auto & dict_values = dictionary_values[column_name]; bool is_nullable = column_type->isLowCardinalityNullable(); - /// Convert dictionary from LowCardinality to Arrow dictionary only once and then reuse it. 
+ ColumnPtr mapping; if (!dict_values) { - auto value_type = assert_cast(builder->type().get())->value_type(); - std::unique_ptr values_builder; - arrow::MemoryPool* pool = arrow::default_memory_pool(); - arrow::Status status = MakeBuilder(pool, value_type, &values_builder); - checkStatus(status, column->getName(), format_name); - - auto dict_column = column_lc->getDictionary().getNestedNotNullableColumn(); - const auto & dict_type = removeNullable(assert_cast(column_type.get())->getDictionaryType()); - fillArrowArray(column_name, dict_column, dict_type, nullptr, values_builder.get(), format_name, is_nullable, dict_column->size(), output_string_as_string, output_fixed_string_as_fixed_byte_array, dictionary_values); - status = values_builder->Finish(&dict_values); - checkStatus(status, column->getName(), format_name); + /// The first time, just remember the first dictionary. + dict_values = IColumn::mutate(column_lc->getDictionaryPtr()); + } + else + { + /// In ClickHouse, blocks with the same header can contain LowCardinality columns with + /// different dictionaries. + /// Arrow supports only a single dictionary for all batches, but it allows extending + /// the dictionary if the previous dictionary is a prefix of the new one. + /// So, if a new LowCardinality column has a different dictionary, + /// we extend the previous one by using IColumnUnique::uniqueInsertRangeFrom + /// and then remap indexes so they match the new extended dictionary. + const auto & new_dict = column_lc->getDictionary(); + auto & dict = dynamic_cast(*dict_values); + if (dict.getHash() != new_dict.getHash()) + { + const auto & new_values = new_dict.getNestedColumn(); + mapping = dict.uniqueInsertRangeFrom(*new_values, 0, new_values->size()); + } } - arrow::Status status = builder->InsertMemoValues(*dict_values); + /// Convert dictionary values to an Arrow array. 
+ auto value_type = assert_cast(builder->type().get())->value_type(); + std::unique_ptr values_builder; + arrow::MemoryPool* pool = arrow::default_memory_pool(); + arrow::Status status = MakeBuilder(pool, value_type, &values_builder); + checkStatus(status, column->getName(), format_name); + + auto dict_column = dynamic_cast(*dict_values).getNestedNotNullableColumn(); + const auto & dict_type = removeNullable(assert_cast(column_type.get())->getDictionaryType()); + fillArrowArray(column_name, dict_column, dict_type, nullptr, values_builder.get(), format_name, is_nullable, dict_column->size(), output_string_as_string, output_fixed_string_as_fixed_byte_array, dictionary_values); + std::shared_ptr arrow_dict_array; + status = values_builder->Finish(&arrow_dict_array); + checkStatus(status, column->getName(), format_name); + + status = builder->InsertMemoValues(*arrow_dict_array); checkStatus(status, column->getName(), format_name); /// AppendIndices in DictionaryBuilder works only with int64_t data, so we cannot use /// fillArrowArray here and should copy all indexes to int64_t container. 
- auto indexes = extractIndexesImpl(column_lc->getIndexesPtr(), start, end, is_nullable); + PaddedPODArray indexes; + if (mapping) + indexes = extractIndexesWithRemapping(column_lc->getIndexesPtr(), mapping, start, end, is_nullable); + else + indexes = extractIndexes(column_lc->getIndexesPtr(), start, end, is_nullable); const uint8_t * arrow_null_bytemap_raw_ptr = nullptr; PaddedPODArray arrow_null_bytemap; if (column_type->isLowCardinalityNullable()) @@ -367,7 +442,7 @@ namespace DB size_t end, bool output_string_as_string, bool output_fixed_string_as_fixed_byte_array, - std::unordered_map> & dictionary_values) + std::unordered_map & dictionary_values) { auto value_type = assert_cast(array_builder->type().get())->value_type(); @@ -552,7 +627,7 @@ namespace DB size_t end, bool output_string_as_string, bool output_fixed_string_as_fixed_byte_array, - std::unordered_map> & dictionary_values) + std::unordered_map & dictionary_values) { const String column_type_name = column_type->getFamilyName(); diff --git a/src/Processors/Formats/Impl/CHColumnToArrowColumn.h b/src/Processors/Formats/Impl/CHColumnToArrowColumn.h index 62fdcaa8086..3649d0eed9b 100644 --- a/src/Processors/Formats/Impl/CHColumnToArrowColumn.h +++ b/src/Processors/Formats/Impl/CHColumnToArrowColumn.h @@ -26,7 +26,7 @@ private: /// Map {column name : arrow dictionary}. /// To avoid converting dictionary from LowCardinality to Arrow /// Dictionary every chunk we save it and reuse. 
- std::unordered_map> dictionary_values; + std::unordered_map dictionary_values; /// We should initialize arrow fields on first call of chChunkToArrowTable, not in constructor /// because LowCardinality column from header always has indexes type UInt8, so, we should get diff --git a/tests/queries/0_stateless/02706_arrow_different_dictionaries.reference b/tests/queries/0_stateless/02706_arrow_different_dictionaries.reference new file mode 100644 index 00000000000..7b36cc96f5e --- /dev/null +++ b/tests/queries/0_stateless/02706_arrow_different_dictionaries.reference @@ -0,0 +1,20 @@ +0 +1 +2 +3 +4 +5 +6 +7 +8 +9 +0 +1 +2 +3 +4 +5 +6 +7 +8 +9 diff --git a/tests/queries/0_stateless/02706_arrow_different_dictionaries.sh b/tests/queries/0_stateless/02706_arrow_different_dictionaries.sh new file mode 100755 index 00000000000..b755696aec2 --- /dev/null +++ b/tests/queries/0_stateless/02706_arrow_different_dictionaries.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash +# Tags: no-fasttest + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. 
"$CURDIR"/../shell_config.sh + +$CLICKHOUSE_LOCAL -q "select toLowCardinality(toString(number % 10)) as x from numbers(20) format Arrow settings max_block_size=7, output_format_arrow_low_cardinality_as_dictionary=1" | $CLICKHOUSE_LOCAL -q "select * from table" --input-format='Arrow' + From 5c9b404c6eef9a81da5d001c1cb14a268289e3be Mon Sep 17 00:00:00 2001 From: Kruglov Pavel <48961922+Avogar@users.noreply.github.com> Date: Mon, 17 Apr 2023 14:02:07 +0200 Subject: [PATCH 2/2] Update src/Processors/Formats/Impl/CHColumnToArrowColumn.cpp Co-authored-by: Yakov Olkhovskiy <99031427+yakov-olkhovskiy@users.noreply.github.com> --- src/Processors/Formats/Impl/CHColumnToArrowColumn.cpp | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/Processors/Formats/Impl/CHColumnToArrowColumn.cpp b/src/Processors/Formats/Impl/CHColumnToArrowColumn.cpp index ce7bb98e148..19b2dcccf64 100644 --- a/src/Processors/Formats/Impl/CHColumnToArrowColumn.cpp +++ b/src/Processors/Formats/Impl/CHColumnToArrowColumn.cpp @@ -313,10 +313,7 @@ namespace DB const PaddedPODArray & mapping_data = assert_cast *>(mapping.get())->getData(); PaddedPODArray result; result.reserve(end - start); - if (shift) - std::transform(indexes_data.begin() + start, indexes_data.begin() + end, std::back_inserter(result), [&](IndexesType value) { return mapping_data[Int64(value)] - 1; }); - else - std::transform(indexes_data.begin() + start, indexes_data.begin() + end, std::back_inserter(result), [&](IndexesType value) { return mapping_data[Int64(value)]; }); + std::transform(indexes_data.begin() + start, indexes_data.begin() + end, std::back_inserter(result), [&](IndexesType value) { return mapping_data[Int64(value)] - shift; }); return result; }