From bbe8e2d751e2445afc07d97e18625d53868b2235 Mon Sep 17 00:00:00 2001 From: Maksim Kita Date: Sun, 7 Apr 2024 20:35:27 +0300 Subject: [PATCH 1/2] Arrow schema to ClickHouse schema Nullable fix --- .../Formats/Impl/ArrowBlockInputFormat.cpp | 2 +- .../Formats/Impl/ArrowColumnToCHColumn.cpp | 365 ++++++++++++------ .../Formats/Impl/ArrowColumnToCHColumn.h | 23 +- .../Formats/Impl/ORCBlockInputFormat.cpp | 4 +- .../Formats/Impl/ParquetBlockInputFormat.cpp | 2 +- .../DataLakes/DeltaLakeMetadataParser.cpp | 3 +- 6 files changed, 264 insertions(+), 135 deletions(-) diff --git a/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp b/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp index 206e244c75f..fc9a827be66 100644 --- a/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp @@ -86,7 +86,7 @@ Chunk ArrowBlockInputFormat::read() /// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields. /// Otherwise fill the missing columns with zero values of its type. BlockMissingValues * block_missing_values_ptr = format_settings.defaults_for_omitted_fields ? &block_missing_values : nullptr; - arrow_column_to_ch_column->arrowTableToCHChunk(res, *table_result, (*table_result)->num_rows(), block_missing_values_ptr); + res = arrow_column_to_ch_column->arrowTableToCHChunk(*table_result, (*table_result)->num_rows(), block_missing_values_ptr); /// There is no easy way to get original record batch size from Arrow metadata. /// Let's just use the number of bytes read from read buffer. diff --git a/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp b/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp index 65704c85026..ec2d17d73cb 100644 --- a/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp +++ b/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp @@ -78,7 +78,7 @@ namespace ErrorCodes /// Inserts numeric data right into internal column data to reduce an overhead template > -static ColumnWithTypeAndName readColumnWithNumericData(std::shared_ptr & arrow_column, const String & column_name) +static ColumnWithTypeAndName readColumnWithNumericData(const std::shared_ptr & arrow_column, const String & column_name) { auto internal_type = std::make_shared>(); auto internal_column = internal_type->createColumn(); @@ -103,7 +103,7 @@ static ColumnWithTypeAndName readColumnWithNumericData(std::shared_ptr -static ColumnWithTypeAndName readColumnWithStringData(std::shared_ptr & arrow_column, const String & column_name) +static ColumnWithTypeAndName readColumnWithStringData(const std::shared_ptr & arrow_column, const String & column_name) { auto internal_type = std::make_shared(); auto internal_column = internal_type->createColumn(); @@ -147,7 +147,7 @@ static ColumnWithTypeAndName readColumnWithStringData(std::shared_ptr & arrow_column, const String & column_name) +static ColumnWithTypeAndName readColumnWithFixedStringData(const std::shared_ptr & arrow_column, const String & column_name) { const auto * fixed_type = assert_cast(arrow_column->type().get()); size_t fixed_len = fixed_type->byte_width(); @@ -166,7 +166,7 @@ static ColumnWithTypeAndName readColumnWithFixedStringData(std::shared_ptr -static ColumnWithTypeAndName readColumnWithBigIntegerFromFixedBinaryData(std::shared_ptr & arrow_column, const String & column_name, const DataTypePtr & column_type) +static ColumnWithTypeAndName readColumnWithBigIntegerFromFixedBinaryData(const std::shared_ptr & arrow_column, const String & column_name, 
const DataTypePtr & column_type) { const auto * fixed_type = assert_cast(arrow_column->type().get()); size_t fixed_len = fixed_type->byte_width(); @@ -193,7 +193,7 @@ static ColumnWithTypeAndName readColumnWithBigIntegerFromFixedBinaryData(std::sh } template -static ColumnWithTypeAndName readColumnWithBigNumberFromBinaryData(std::shared_ptr & arrow_column, const String & column_name, const DataTypePtr & column_type) +static ColumnWithTypeAndName readColumnWithBigNumberFromBinaryData(const std::shared_ptr & arrow_column, const String & column_name, const DataTypePtr & column_type) { size_t total_size = 0; for (int chunk_i = 0, num_chunks = arrow_column->num_chunks(); chunk_i < num_chunks; ++chunk_i) @@ -229,7 +229,7 @@ static ColumnWithTypeAndName readColumnWithBigNumberFromBinaryData(std::shared_p return {std::move(internal_column), column_type, column_name}; } -static ColumnWithTypeAndName readColumnWithBooleanData(std::shared_ptr & arrow_column, const String & column_name) +static ColumnWithTypeAndName readColumnWithBooleanData(const std::shared_ptr & arrow_column, const String & column_name) { auto internal_type = DataTypeFactory::instance().get("Bool"); auto internal_column = internal_type->createColumn(); @@ -248,7 +248,7 @@ static ColumnWithTypeAndName readColumnWithBooleanData(std::shared_ptr & arrow_column, const String & column_name, +static ColumnWithTypeAndName readColumnWithDate32Data(const std::shared_ptr & arrow_column, const String & column_name, const DataTypePtr & type_hint, FormatSettings::DateTimeOverflowBehavior date_time_overflow_behavior) { DataTypePtr internal_type; @@ -310,7 +310,7 @@ static ColumnWithTypeAndName readColumnWithDate32Data(std::shared_ptr & arrow_column, const String & column_name) +static ColumnWithTypeAndName readColumnWithDate64Data(const std::shared_ptr & arrow_column, const String & column_name) { auto internal_type = std::make_shared(); auto internal_column = internal_type->createColumn(); @@ -329,7 +329,7 @@ static ColumnWithTypeAndName readColumnWithDate64Data(std::shared_ptr & arrow_column, const String & column_name) +static ColumnWithTypeAndName readColumnWithTimestampData(const std::shared_ptr & arrow_column, const String & column_name) { const auto & arrow_type = static_cast(*(arrow_column->type())); const UInt8 scale = arrow_type.unit() * 3; @@ -350,7 +350,7 @@ static ColumnWithTypeAndName readColumnWithTimestampData(std::shared_ptr -static ColumnWithTypeAndName readColumnWithTimeData(std::shared_ptr & arrow_column, const String & column_name) +static ColumnWithTypeAndName readColumnWithTimeData(const std::shared_ptr & arrow_column, const String & column_name) { const auto & arrow_type = static_cast(*(arrow_column->type())); const UInt8 scale = arrow_type.unit() * 3; @@ -373,18 +373,18 @@ static ColumnWithTypeAndName readColumnWithTimeData(std::shared_ptr & arrow_column, const String & column_name) +static ColumnWithTypeAndName readColumnWithTime32Data(const std::shared_ptr & arrow_column, const String & column_name) { return readColumnWithTimeData(arrow_column, column_name); } -static ColumnWithTypeAndName readColumnWithTime64Data(std::shared_ptr & arrow_column, const String & column_name) +static ColumnWithTypeAndName readColumnWithTime64Data(const std::shared_ptr & arrow_column, const String & column_name) { return readColumnWithTimeData(arrow_column, column_name); } template -static ColumnWithTypeAndName readColumnWithDecimalDataImpl(std::shared_ptr & arrow_column, const String & column_name, DataTypePtr internal_type) +static 
ColumnWithTypeAndName readColumnWithDecimalDataImpl(const std::shared_ptr & arrow_column, const String & column_name, DataTypePtr internal_type) { auto internal_column = internal_type->createColumn(); auto & column = assert_cast &>(*internal_column); @@ -403,7 +403,7 @@ static ColumnWithTypeAndName readColumnWithDecimalDataImpl(std::shared_ptr -static ColumnWithTypeAndName readColumnWithDecimalData(std::shared_ptr & arrow_column, const String & column_name) +static ColumnWithTypeAndName readColumnWithDecimalData(const std::shared_ptr & arrow_column, const String & column_name) { const auto * arrow_decimal_type = static_cast(arrow_column->type().get()); size_t precision = arrow_decimal_type->precision(); @@ -418,7 +418,7 @@ static ColumnWithTypeAndName readColumnWithDecimalData(std::shared_ptr & arrow_column) +static ColumnPtr readByteMapFromArrowColumn(const std::shared_ptr & arrow_column) { if (!arrow_column->null_count()) return ColumnUInt8::create(arrow_column->length(), 0); @@ -453,7 +453,7 @@ struct ArrowOffsetArray }; template -static ColumnPtr readOffsetsFromArrowListColumn(std::shared_ptr & arrow_column) +static ColumnPtr readOffsetsFromArrowListColumn(const std::shared_ptr & arrow_column) { auto offsets_column = ColumnUInt64::create(); ColumnArray::Offsets & offsets_data = assert_cast &>(*offsets_column).getData(); @@ -463,7 +463,7 @@ static ColumnPtr readOffsetsFromArrowListColumn(std::shared_ptr(*(arrow_column->chunk(chunk_i))); auto arrow_offsets_array = list_chunk.offsets(); - auto & arrow_offsets = dynamic_cast::type &>(*arrow_offsets_array); + auto & arrow_offsets = dynamic_cast::type &>(*arrow_offsets_array); /* * CH uses element size as "offsets", while arrow uses actual offsets as offsets. @@ -620,7 +620,7 @@ static ColumnPtr readColumnWithIndexesData(std::shared_ptr } template -static std::shared_ptr getNestedArrowColumn(std::shared_ptr & arrow_column) +static std::shared_ptr getNestedArrowColumn(const std::shared_ptr & arrow_column) { arrow::ArrayVector array_vector; array_vector.reserve(arrow_column->num_chunks()); @@ -648,7 +648,7 @@ static std::shared_ptr getNestedArrowColumn(std::shared_ptr return std::make_shared(array_vector); } -static ColumnWithTypeAndName readIPv6ColumnFromBinaryData(std::shared_ptr & arrow_column, const String & column_name) +static ColumnWithTypeAndName readIPv6ColumnFromBinaryData(const std::shared_ptr & arrow_column, const String & column_name) { size_t total_size = 0; for (int chunk_i = 0, num_chunks = arrow_column->num_chunks(); chunk_i < num_chunks; ++chunk_i) @@ -684,7 +684,7 @@ static ColumnWithTypeAndName readIPv6ColumnFromBinaryData(std::shared_ptr & arrow_column, const String & column_name) +static ColumnWithTypeAndName readIPv4ColumnWithInt32Data(const std::shared_ptr & arrow_column, const String & column_name) { auto internal_type = std::make_shared(); auto internal_column = internal_type->createColumn(); @@ -705,35 +705,31 @@ static ColumnWithTypeAndName readIPv4ColumnWithInt32Data(std::shared_ptr & arrow_column, - const std::string & column_name, - const std::string & format_name, - bool is_nullable, - std::unordered_map & dictionary_infos, - bool allow_null_type, - bool skip_columns_with_unsupported_types, - bool & skipped, - FormatSettings::DateTimeOverflowBehavior date_time_overflow_behavior = FormatSettings::DateTimeOverflowBehavior::Ignore, - DataTypePtr type_hint = nullptr, - bool is_map_nested = false) +struct ReadColumnFromArrowColumnSettings { - if (!is_nullable && (arrow_column->null_count() || (type_hint && 
type_hint->isNullable())) && arrow_column->type()->id() != arrow::Type::LIST - && arrow_column->type()->id() != arrow::Type::MAP && arrow_column->type()->id() != arrow::Type::STRUCT && - arrow_column->type()->id() != arrow::Type::DICTIONARY) - { - DataTypePtr nested_type_hint; - if (type_hint) - nested_type_hint = removeNullable(type_hint); - auto nested_column = readColumnFromArrowColumn(arrow_column, column_name, format_name, true, dictionary_infos, allow_null_type, skip_columns_with_unsupported_types, skipped, date_time_overflow_behavior, nested_type_hint); - if (skipped) - return {}; - auto nullmap_column = readByteMapFromArrowColumn(arrow_column); - auto nullable_type = std::make_shared(std::move(nested_column.type)); - auto nullable_column = ColumnNullable::create(nested_column.column, nullmap_column); - return {std::move(nullable_column), std::move(nullable_type), column_name}; - } + std::string format_name; + FormatSettings::DateTimeOverflowBehavior date_time_overflow_behavior; + bool allow_arrow_null_type; + bool skip_columns_with_unsupported_types; +}; +static ColumnWithTypeAndName readColumnFromArrowColumn( + const std::shared_ptr & arrow_column, + std::string column_name, + std::unordered_map dictionary_infos, + DataTypePtr type_hint, + bool is_nullable_column, + bool is_map_nested_column, + const ReadColumnFromArrowColumnSettings & settings); + +static ColumnWithTypeAndName readNonNullableColumnFromArrowColumn( + const std::shared_ptr & arrow_column, + std::string column_name, + std::unordered_map dictionary_infos, + DataTypePtr type_hint, + bool is_map_nested_column, + const ReadColumnFromArrowColumnSettings & settings) +{ switch (arrow_column->type()->id()) { case arrow::Type::STRING: @@ -790,7 +786,7 @@ static ColumnWithTypeAndName readColumnFromArrowColumn( case arrow::Type::BOOL: return readColumnWithBooleanData(arrow_column, column_name); case arrow::Type::DATE32: - return readColumnWithDate32Data(arrow_column, column_name, type_hint, date_time_overflow_behavior); + return readColumnWithDate32Data(arrow_column, column_name, type_hint, settings.date_time_overflow_behavior); case arrow::Type::DATE64: return readColumnWithDate64Data(arrow_column, column_name); // ClickHouse writes Date as arrow UINT16 and DateTime as arrow UINT32, @@ -837,9 +833,16 @@ static ColumnWithTypeAndName readColumnFromArrowColumn( key_type_hint = map_type_hint->getKeyType(); } } + auto arrow_nested_column = getNestedArrowColumn(arrow_column); - auto nested_column = readColumnFromArrowColumn(arrow_nested_column, column_name, format_name, false, dictionary_infos, allow_null_type, skip_columns_with_unsupported_types, skipped, date_time_overflow_behavior, nested_type_hint, true); - if (skipped) + auto nested_column = readColumnFromArrowColumn(arrow_nested_column, + column_name, + dictionary_infos, + nested_type_hint, + false /*is_nullable_column*/, + true /*is_map_nested_column*/, + settings); + if (!nested_column.column) return {}; auto offsets_column = readOffsetsFromArrowListColumn(arrow_column); @@ -866,7 +869,7 @@ static ColumnWithTypeAndName readColumnFromArrowColumn( case arrow::Type::LIST: case arrow::Type::LARGE_LIST: { - bool is_large = arrow_column->type()->id() == arrow::Type::LARGE_LIST; + bool is_large_list = arrow_column->type()->id() == arrow::Type::LARGE_LIST; DataTypePtr nested_type_hint; if (type_hint) { @@ -874,12 +877,33 @@ static ColumnWithTypeAndName readColumnFromArrowColumn( if (array_type_hint) nested_type_hint = array_type_hint->getNestedType(); } - auto arrow_nested_column = 
is_large ? getNestedArrowColumn(arrow_column) : getNestedArrowColumn(arrow_column); - auto nested_column = readColumnFromArrowColumn(arrow_nested_column, column_name, format_name, false, dictionary_infos, allow_null_type, skip_columns_with_unsupported_types, skipped, date_time_overflow_behavior, nested_type_hint); - if (skipped) + + bool is_nested_nullable_column = false; + if (is_large_list) + { + auto * arrow_large_list_type = assert_cast(arrow_column->type().get()); + is_nested_nullable_column = arrow_large_list_type->value_field()->nullable(); + } + else + { + auto * arrow_list_type = assert_cast(arrow_column->type().get()); + is_nested_nullable_column = arrow_list_type->value_field()->nullable(); + } + + auto arrow_nested_column = is_large_list ? getNestedArrowColumn(arrow_column) : getNestedArrowColumn(arrow_column); + auto nested_column = readColumnFromArrowColumn(arrow_nested_column, + column_name, + dictionary_infos, + nested_type_hint, + is_nested_nullable_column, + false /*is_map_nested_column*/, + settings); + if (!nested_column.column) return {}; - auto offsets_column = is_large ? readOffsetsFromArrowListColumn(arrow_column) : readOffsetsFromArrowListColumn(arrow_column); + + auto offsets_column = is_large_list ? readOffsetsFromArrowListColumn(arrow_column) : readOffsetsFromArrowListColumn(arrow_column); auto array_column = ColumnArray::create(nested_column.column, offsets_column); + DataTypePtr array_type; /// If type hint is Nested, we should return Nested type, /// because we differentiate Nested and simple Array(Tuple) @@ -913,11 +937,13 @@ static ColumnWithTypeAndName readColumnFromArrowColumn( for (int i = 0; i != arrow_struct_type->num_fields(); ++i) { - auto field_name = arrow_struct_type->field(i)->name(); + const auto & field = arrow_struct_type->field(i); + const auto & field_name = field->name(); + DataTypePtr nested_type_hint; if (tuple_type_hint) { - if (tuple_type_hint->haveExplicitNames() && !is_map_nested) + if (tuple_type_hint->haveExplicitNames() && !is_map_nested_column) { auto pos = tuple_type_hint->tryGetPositionByName(field_name); if (pos) @@ -926,13 +952,21 @@ static ColumnWithTypeAndName readColumnFromArrowColumn( else if (size_t(i) < tuple_type_hint->getElements().size()) nested_type_hint = tuple_type_hint->getElement(i); } + auto nested_arrow_column = std::make_shared(nested_arrow_columns[i]); - auto element = readColumnFromArrowColumn(nested_arrow_column, field_name, format_name, false, dictionary_infos, allow_null_type, skip_columns_with_unsupported_types, skipped, date_time_overflow_behavior, nested_type_hint); - if (skipped) + auto column_with_type_and_name = readColumnFromArrowColumn(nested_arrow_column, + field_name, + dictionary_infos, + nested_type_hint, + field->nullable(), + false /*is_map_nested_column*/, + settings); + if (!column_with_type_and_name.column) return {}; - tuple_elements.emplace_back(std::move(element.column)); - tuple_types.emplace_back(std::move(element.type)); - tuple_names.emplace_back(std::move(element.name)); + + tuple_elements.emplace_back(std::move(column_with_type_and_name.column)); + tuple_types.emplace_back(std::move(column_with_type_and_name.type)); + tuple_names.emplace_back(std::move(column_with_type_and_name.name)); } auto tuple_column = ColumnTuple::create(std::move(tuple_elements)); @@ -953,8 +987,19 @@ static ColumnWithTypeAndName readColumnFromArrowColumn( arrow::DictionaryArray & dict_chunk = dynamic_cast(*(arrow_column->chunk(chunk_i))); dict_array.emplace_back(dict_chunk.dictionary()); } + auto 
arrow_dict_column = std::make_shared(dict_array); - auto dict_column = readColumnFromArrowColumn(arrow_dict_column, column_name, format_name, false, dictionary_infos, allow_null_type, skip_columns_with_unsupported_types, skipped, date_time_overflow_behavior); + auto dict_column = readColumnFromArrowColumn(arrow_dict_column, + column_name, + dictionary_infos, + nullptr /*nested_type_hint*/, + false /*is_nullable_column*/, + false /*is_map_nested_column*/, + settings); + + if (!dict_column.column) + return {}; + for (size_t i = 0; i != dict_column.column->size(); ++i) { if (dict_column.column->isDefaultAt(i)) @@ -963,6 +1008,7 @@ static ColumnWithTypeAndName readColumnFromArrowColumn( break; } } + auto lc_type = std::make_shared(is_lc_nullable ? makeNullable(dict_column.type) : dict_column.type); auto tmp_lc_column = lc_type->createColumn(); auto tmp_dict_column = IColumn::mutate(assert_cast(tmp_lc_column.get())->getDictionaryPtr()); @@ -1002,7 +1048,7 @@ static ColumnWithTypeAndName readColumnFromArrowColumn( // TODO: read UUID as a string? case arrow::Type::NA: { - if (allow_null_type) + if (settings.allow_arrow_null_type) { auto type = std::make_shared(); auto column = ColumnNothing::create(arrow_column->length()); @@ -1012,11 +1058,8 @@ static ColumnWithTypeAndName readColumnFromArrowColumn( } default: { - if (skip_columns_with_unsupported_types) - { - skipped = true; + if (settings.skip_columns_with_unsupported_types) return {}; - } throw Exception( ErrorCodes::UNKNOWN_TYPE, @@ -1024,14 +1067,59 @@ static ColumnWithTypeAndName readColumnFromArrowColumn( "If it happens during schema inference and you want to skip columns with " "unsupported types, you can enable setting input_format_{}" "_skip_columns_with_unsupported_types_in_schema_inference", - format_name, + settings.format_name, arrow_column->type()->name(), column_name, - boost::algorithm::to_lower_copy(format_name)); + boost::algorithm::to_lower_copy(settings.format_name)); } } } +static ColumnWithTypeAndName readColumnFromArrowColumn( + const std::shared_ptr & arrow_column, + std::string column_name, + std::unordered_map dictionary_infos, + DataTypePtr type_hint, + bool is_nullable_column, + bool is_map_nested_column, + const ReadColumnFromArrowColumnSettings & settings) +{ + bool read_as_nullable_column = arrow_column->null_count() || is_nullable_column || (type_hint && type_hint->isNullable()); + if (read_as_nullable_column && + arrow_column->type()->id() != arrow::Type::LIST && + arrow_column->type()->id() != arrow::Type::LARGE_LIST && + arrow_column->type()->id() != arrow::Type::MAP && + arrow_column->type()->id() != arrow::Type::STRUCT && + arrow_column->type()->id() != arrow::Type::DICTIONARY) + { + DataTypePtr nested_type_hint; + if (type_hint) + nested_type_hint = removeNullable(type_hint); + + auto nested_column = readNonNullableColumnFromArrowColumn(arrow_column, + column_name, + dictionary_infos, + nested_type_hint, + is_map_nested_column, + settings); + + if (!nested_column.column) + return {}; + + auto nullmap_column = readByteMapFromArrowColumn(arrow_column); + auto nullable_type = std::make_shared(std::move(nested_column.type)); + auto nullable_column = ColumnNullable::create(nested_column.column, nullmap_column); + + return {std::move(nullable_column), std::move(nullable_type), column_name}; + } + + return readNonNullableColumnFromArrowColumn(arrow_column, + column_name, + dictionary_infos, + type_hint, + is_map_nested_column, + settings); +} // Creating CH header by arrow schema. 
Will be useful in task about inserting // data from file without knowing table structure. @@ -1042,44 +1130,56 @@ static void checkStatus(const arrow::Status & status, const String & column_name throw Exception{ErrorCodes::UNKNOWN_EXCEPTION, "Error with a {} column '{}': {}.", format_name, column_name, status.ToString()}; } +/// Create empty arrow column using specified field +static std::shared_ptr createArrowColumn(const std::shared_ptr & field, const String & format_name) +{ + arrow::MemoryPool * pool = arrow::default_memory_pool(); + std::unique_ptr array_builder; + arrow::Status status = MakeBuilder(pool, field->type(), &array_builder); + checkStatus(status, field->name(), format_name); + + std::shared_ptr arrow_array; + status = array_builder->Finish(&arrow_array); + checkStatus(status, field->name(), format_name); + + return std::make_shared(arrow::ArrayVector{arrow_array}); +} Block ArrowColumnToCHColumn::arrowSchemaToCHHeader( - const arrow::Schema & schema, const std::string & format_name, - bool skip_columns_with_unsupported_types, const Block * hint_header, bool ignore_case) + const arrow::Schema & schema, + const std::string & format_name, + bool skip_columns_with_unsupported_types) { + ReadColumnFromArrowColumnSettings settings + { + .format_name = format_name, + .date_time_overflow_behavior = FormatSettings::DateTimeOverflowBehavior::Ignore, + .allow_arrow_null_type = false, + .skip_columns_with_unsupported_types = skip_columns_with_unsupported_types + }; + ColumnsWithTypeAndName sample_columns; - std::unordered_set nested_table_names; - if (hint_header) - nested_table_names = Nested::getAllTableNames(*hint_header, ignore_case); for (const auto & field : schema.fields()) { - if (hint_header && !hint_header->has(field->name(), ignore_case) - && !nested_table_names.contains(ignore_case ? boost::to_lower_copy(field->name()) : field->name())) - continue; - /// Create empty arrow column by it's type and convert it to ClickHouse column. 
- arrow::MemoryPool * pool = arrow::default_memory_pool(); - std::unique_ptr array_builder; - arrow::Status status = MakeBuilder(pool, field->type(), &array_builder); - checkStatus(status, field->name(), format_name); + auto arrow_column = createArrowColumn(field, format_name); - std::shared_ptr arrow_array; - status = array_builder->Finish(&arrow_array); - checkStatus(status, field->name(), format_name); - - arrow::ArrayVector array_vector = {arrow_array}; - auto arrow_column = std::make_shared(array_vector); std::unordered_map dict_infos; - bool skipped = false; - bool allow_null_type = false; - if (hint_header && hint_header->has(field->name()) && hint_header->getByName(field->name()).type->isNullable()) - allow_null_type = true; - ColumnWithTypeAndName sample_column = readColumnFromArrowColumn( - arrow_column, field->name(), format_name, false, dict_infos, allow_null_type, skip_columns_with_unsupported_types, skipped); - if (!skipped) + + auto sample_column = readColumnFromArrowColumn( + arrow_column, + field->name(), + dict_infos, + nullptr /*nested_type_hint*/, + field->nullable() /*is_nullable_column*/, + false /*is_map_nested_column*/, + settings); + + if (sample_column.column) sample_columns.emplace_back(std::move(sample_column)); } + return Block(std::move(sample_columns)); } @@ -1101,30 +1201,43 @@ ArrowColumnToCHColumn::ArrowColumnToCHColumn( { } -void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr & table, size_t num_rows, BlockMissingValues * block_missing_values) +Chunk ArrowColumnToCHColumn::arrowTableToCHChunk(const std::shared_ptr & table, size_t num_rows, BlockMissingValues * block_missing_values) { - NameToColumnPtr name_to_column_ptr; + NameToArrowColumn name_to_arrow_column; + for (auto column_name : table->ColumnNames()) { - std::shared_ptr arrow_column = table->GetColumnByName(column_name); + auto arrow_column = table->GetColumnByName(column_name); if (!arrow_column) throw Exception(ErrorCodes::DUPLICATE_COLUMN, "Column '{}' is duplicated", column_name); + auto arrow_field = table->schema()->GetFieldByName(column_name); + if (case_insensitive_matching) boost::to_lower(column_name); - name_to_column_ptr[std::move(column_name)] = arrow_column; + + name_to_arrow_column[std::move(column_name)] = {std::move(arrow_column), std::move(arrow_field)}; } - arrowColumnsToCHChunk(res, name_to_column_ptr, num_rows, block_missing_values); + return arrowColumnsToCHChunk(name_to_arrow_column, num_rows, block_missing_values); } -void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr & name_to_column_ptr, size_t num_rows, BlockMissingValues * block_missing_values) +Chunk ArrowColumnToCHColumn::arrowColumnsToCHChunk(const NameToArrowColumn & name_to_arrow_column, size_t num_rows, BlockMissingValues * block_missing_values) { - Columns columns_list; - columns_list.reserve(header.columns()); + ReadColumnFromArrowColumnSettings settings + { + .format_name = format_name, + .date_time_overflow_behavior = date_time_overflow_behavior, + .allow_arrow_null_type = true, + .skip_columns_with_unsupported_types = false + }; + + Columns columns; + columns.reserve(header.columns()); + std::unordered_map>> nested_tables; - bool skipped = false; - for (size_t column_i = 0, columns = header.columns(); column_i < columns; ++column_i) + + for (size_t column_i = 0, header_columns = header.columns(); column_i < header_columns; ++column_i) { const ColumnWithTypeAndName & header_column = header.getByPosition(column_i); @@ -1133,15 +1246,17 @@ void 
ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr &
             boost::to_lower(search_column_name);
 
         ColumnWithTypeAndName column;
-        if (!name_to_column_ptr.contains(search_column_name))
+        if (!name_to_arrow_column.contains(search_column_name))
         {
             bool read_from_nested = false;
+
+            /// Check if it's a subcolumn from some struct.
             String nested_table_name = Nested::extractTableName(header_column.name);
             String search_nested_table_name = nested_table_name;
             if (case_insensitive_matching)
                 boost::to_lower(search_nested_table_name);
-            if (name_to_column_ptr.contains(search_nested_table_name))
+
+            if (name_to_arrow_column.contains(search_nested_table_name))
             {
                 if (!nested_tables.contains(search_nested_table_name))
                 {
@@ -1153,10 +1268,19 @@ void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr &
                     }
 
                     auto nested_table_type = Nested::collect(nested_columns).front().type;
-                    std::shared_ptr<arrow::ChunkedArray> arrow_column = name_to_column_ptr[search_nested_table_name];
-                    ColumnsWithTypeAndName cols = {
-                        readColumnFromArrowColumn(arrow_column, nested_table_name, format_name, false, dictionary_infos, true, false,
-                        skipped, date_time_overflow_behavior, nested_table_type)};
+                    const auto & arrow_column = name_to_arrow_column.find(search_nested_table_name)->second;
+
+                    ColumnsWithTypeAndName cols =
+                    {
+                        readColumnFromArrowColumn(arrow_column.column,
+                            nested_table_name,
+                            dictionary_infos,
+                            nested_table_type,
+                            arrow_column.field->nullable() /*is_nullable_column*/,
+                            false /*is_map_nested_column*/,
+                            settings)
+                    };
+
                     BlockPtr block_ptr = std::make_shared<Block>(cols);
                     auto column_extractor = std::make_shared<NestedColumnExtractHelper>(*block_ptr, case_insensitive_matching);
                     nested_tables[search_nested_table_name] = {block_ptr, column_extractor};
                 }
@@ -1180,7 +1304,7 @@ void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr &
                 column.name = header_column.name;
                 column.type = header_column.type;
                 column.column = header_column.column->cloneResized(num_rows);
-                columns_list.push_back(std::move(column.column));
+                columns.push_back(std::move(column.column));
                 if (block_missing_values)
                     block_missing_values->setBits(column_i, num_rows);
                 continue;
@@ -1189,9 +1313,14 @@ void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr &
         }
         else
         {
-            auto arrow_column = name_to_column_ptr[search_column_name];
-            column = readColumnFromArrowColumn(
-                arrow_column, header_column.name, format_name, false, dictionary_infos, true, false, skipped, date_time_overflow_behavior, header_column.type);
+            const auto & arrow_column = name_to_arrow_column.find(search_column_name)->second;
+            column = readColumnFromArrowColumn(arrow_column.column,
+                header_column.name,
+                dictionary_infos,
+                header_column.type,
+                arrow_column.field->nullable(),
+                false /*is_map_nested_column*/,
+                settings);
         }
 
         if (null_as_default)
@@ -1216,10 +1345,10 @@ void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr &
         }
 
         column.type = header_column.type;
-        columns_list.push_back(std::move(column.column));
+        columns.push_back(std::move(column.column));
     }
 
-    res.setColumns(columns_list, num_rows);
+    return Chunk(std::move(columns), num_rows);
 }
 
 }
diff --git a/src/Processors/Formats/Impl/ArrowColumnToCHColumn.h b/src/Processors/Formats/Impl/ArrowColumnToCHColumn.h
index 079e0374917..27e9afdf763 100644
--- a/src/Processors/Formats/Impl/ArrowColumnToCHColumn.h
+++ b/src/Processors/Formats/Impl/ArrowColumnToCHColumn.h
@@ -19,8 +19,6 @@ class Chunk;
 class ArrowColumnToCHColumn
 {
 public:
-    using NameToColumnPtr = std::unordered_map<std::string, std::shared_ptr<arrow::ChunkedArray>>;
-
     ArrowColumnToCHColumn(
         const Block & header_,
         const std::string & format_name_,
@@ -30,18 +28,13 @@ public:
         bool case_insensitive_matching_ = false,
         bool is_stream_ = false);
 
-    void arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table, size_t num_rows, BlockMissingValues * block_missing_values = nullptr);
+    Chunk arrowTableToCHChunk(const std::shared_ptr<arrow::Table> & table, size_t num_rows, BlockMissingValues * block_missing_values = nullptr);
 
-    void arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr & name_to_column_ptr, size_t num_rows, BlockMissingValues * block_missing_values = nullptr);
-
-    /// Transform arrow schema to ClickHouse header. If hint_header is provided,
-    /// we will skip columns in schema that are not in hint_header.
+    /// Transform arrow schema to ClickHouse header
     static Block arrowSchemaToCHHeader(
         const arrow::Schema & schema,
         const std::string & format_name,
-        bool skip_columns_with_unsupported_types = false,
-        const Block * hint_header = nullptr,
-        bool ignore_case = false);
+        bool skip_columns_with_unsupported_types = false);
 
     struct DictionaryInfo
     {
@@ -52,6 +45,16 @@ public:
 
 private:
+    struct ArrowColumn
+    {
+        std::shared_ptr<arrow::ChunkedArray> column;
+        std::shared_ptr<arrow::Field> field;
+    };
+
+    using NameToArrowColumn = std::unordered_map<std::string, ArrowColumn>;
+
+    Chunk arrowColumnsToCHChunk(const NameToArrowColumn & name_to_arrow_column, size_t num_rows, BlockMissingValues * block_missing_values);
+
     const Block & header;
     const std::string format_name;
     /// If false, throw exception if some columns in header not exists in arrow table.
diff --git a/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp b/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp
index a41eacf26b7..aa83b87b2d2 100644
--- a/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp
+++ b/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp
@@ -71,12 +71,10 @@ Chunk ORCBlockInputFormat::read()
     approx_bytes_read_for_chunk = file_reader->GetRawORCReader()->getStripe(stripe_current)->getDataLength();
     ++stripe_current;
 
-    Chunk res;
     /// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields.
     /// Otherwise fill the missing columns with zero values of its type.
     BlockMissingValues * block_missing_values_ptr = format_settings.defaults_for_omitted_fields ? &block_missing_values : nullptr;
-    arrow_column_to_ch_column->arrowTableToCHChunk(res, table, num_rows, block_missing_values_ptr);
-    return res;
+    return arrow_column_to_ch_column->arrowTableToCHChunk(table, num_rows, block_missing_values_ptr);
 }
 
 void ORCBlockInputFormat::resetParser()
diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
index 62e576d4953..d41cb3447de 100644
--- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
+++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
@@ -601,7 +601,7 @@ void ParquetBlockInputFormat::decodeOneChunk(size_t row_group_batch_idx, std::un
     /// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields.
     /// Otherwise fill the missing columns with zero values of its type.
     BlockMissingValues * block_missing_values_ptr = format_settings.defaults_for_omitted_fields ? &res.block_missing_values : nullptr;
-        row_group_batch.arrow_column_to_ch_column->arrowTableToCHChunk(res.chunk, *tmp_table, (*tmp_table)->num_rows(), block_missing_values_ptr);
+        res.chunk = row_group_batch.arrow_column_to_ch_column->arrowTableToCHChunk(*tmp_table, (*tmp_table)->num_rows(), block_missing_values_ptr);
 
     lock.lock();
 
diff --git a/src/Storages/DataLakes/DeltaLakeMetadataParser.cpp b/src/Storages/DataLakes/DeltaLakeMetadataParser.cpp
index 3584f137225..14a912a180d 100644
--- a/src/Storages/DataLakes/DeltaLakeMetadataParser.cpp
+++ b/src/Storages/DataLakes/DeltaLakeMetadataParser.cpp
@@ -282,11 +282,10 @@ struct DeltaLakeMetadataParser::Impl
             format_settings.date_time_overflow_behavior,
             /* case_insensitive_column_matching */false);
 
-        Chunk res;
         std::shared_ptr<arrow::Table> table;
         THROW_ARROW_NOT_OK(reader->ReadTable(&table));
 
-        column_reader.arrowTableToCHChunk(res, table, reader->parquet_reader()->metadata()->num_rows());
+        Chunk res = column_reader.arrowTableToCHChunk(table, reader->parquet_reader()->metadata()->num_rows());
 
         const auto & res_columns = res.getColumns();
         if (res_columns.size() != 2)

From 1e05d9ed3f4a960ca6a219514fd56fa13c644efc Mon Sep 17 00:00:00 2001
From: Maksim Kita
Date: Sun, 7 Apr 2024 20:36:02 +0300
Subject: [PATCH 2/2] Added tests

---
 .../03036_parquet_arrow_nullable.reference    | 40 ++++++++++++
 .../03036_parquet_arrow_nullable.sh           | 63 +++++++++++++++++++
 2 files changed, 103 insertions(+)
 create mode 100644 tests/queries/0_stateless/03036_parquet_arrow_nullable.reference
 create mode 100755 tests/queries/0_stateless/03036_parquet_arrow_nullable.sh

diff --git a/tests/queries/0_stateless/03036_parquet_arrow_nullable.reference b/tests/queries/0_stateless/03036_parquet_arrow_nullable.reference
new file mode 100644
index 00000000000..8820bb7cb9f
--- /dev/null
+++ b/tests/queries/0_stateless/03036_parquet_arrow_nullable.reference
@@ -0,0 +1,40 @@
+Parquet
+a UInt64
+a_nullable Nullable(UInt64)
+Arrow
+a UInt64
+a_nullable Nullable(UInt64)
+Parquet
+b Array(Nullable(UInt64))
+b_nullable Array(Nullable(UInt64))
+Arrow
+b Array(Nullable(UInt64))
+b_nullable Array(Nullable(UInt64))
+Parquet
+c Tuple(\n    a UInt64,\n    b String)
+c_nullable Tuple(\n    a Nullable(UInt64),\n    b Nullable(String))
+Arrow
+c Tuple(\n    a UInt64,\n    b String)
+c_nullable Tuple(\n    a Nullable(UInt64),\n    b Nullable(String))
+Parquet
+d Tuple(\n    a UInt64,\n    b Tuple(\n        a UInt64,\n        b String),\n    d_nullable Tuple(\n        a UInt64,\n        b Tuple(\n            a Nullable(UInt64),\n            b Nullable(String))))
+Arrow
+d Tuple(\n    a UInt64,\n    b Tuple(\n        a UInt64,\n        b String),\n    d_nullable Tuple(\n        a UInt64,\n        b Tuple(\n            a Nullable(UInt64),\n            b Nullable(String))))
+Parquet
+e Map(UInt64, Nullable(String))
+e_nullable Map(UInt64, Nullable(String))
+Arrow
+e Map(UInt64, Nullable(String))
+e_nullable Map(UInt64, Nullable(String))
+Parquet
+f Map(UInt64, Map(UInt64, Nullable(String)))
+f_nullables Map(UInt64, Map(UInt64, Nullable(String)))
+Arrow
+f Map(UInt64, Map(UInt64, Nullable(String)))
+f_nullables Map(UInt64, Map(UInt64, Nullable(String)))
+Parquet
+g String
+g_nullable Nullable(String)
+Arrow
+g LowCardinality(String)
+g_nullable LowCardinality(String)
diff --git a/tests/queries/0_stateless/03036_parquet_arrow_nullable.sh b/tests/queries/0_stateless/03036_parquet_arrow_nullable.sh
new file mode 100755
index 00000000000..bdd641e2b94
--- /dev/null
+++ b/tests/queries/0_stateless/03036_parquet_arrow_nullable.sh
@@ -0,0 +1,63 @@
+#!/usr/bin/env bash
+# Tags: no-fasttest
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CURDIR"/../shell_config.sh
+
+
+DATA_FILE=$CLICKHOUSE_TEST_UNIQUE_NAME.data
+
+formats="Parquet Arrow"
+
+for format in $formats
+do
+    echo $format
+    $CLICKHOUSE_LOCAL -q "select * from generateRandom('a UInt64, a_nullable Nullable(UInt64)', 42) limit 10 format $format" > $DATA_FILE
+    $CLICKHOUSE_LOCAL -q "desc file('$DATA_FILE') SETTINGS schema_inference_make_columns_nullable = 0"
+done
+
+for format in $formats
+do
+    echo $format
+    $CLICKHOUSE_LOCAL -q "select * from generateRandom('b Array(UInt64), b_nullable Array(Nullable(UInt64))', 42) limit 10 format $format" > $DATA_FILE
+    $CLICKHOUSE_LOCAL -q "desc file('$DATA_FILE') SETTINGS schema_inference_make_columns_nullable = 0"
+done
+
+for format in $formats
+do
+    echo $format
+    $CLICKHOUSE_LOCAL -q "select * from generateRandom('c Tuple(a UInt64, b String), c_nullable Tuple(a Nullable(UInt64), b Nullable(String))', 42) limit 10 format $format" > $DATA_FILE
+    $CLICKHOUSE_LOCAL -q "desc file('$DATA_FILE') SETTINGS schema_inference_make_columns_nullable = 0"
+done
+
+for format in $formats
+do
+    echo $format
+    $CLICKHOUSE_LOCAL -q "select * from generateRandom('d Tuple(a UInt64, b Tuple(a UInt64, b String), d_nullable Tuple(a UInt64, b Tuple(a Nullable(UInt64), b Nullable(String))))', 42) limit 10 format $format" > $DATA_FILE
+    $CLICKHOUSE_LOCAL -q "desc file('$DATA_FILE') SETTINGS schema_inference_make_columns_nullable = 0"
+done
+
+for format in $formats
+do
+    echo $format
+    $CLICKHOUSE_LOCAL -q "select * from generateRandom('e Map(UInt64, String), e_nullable Map(UInt64, Nullable(String))', 42) limit 10 format $format" > $DATA_FILE
+    $CLICKHOUSE_LOCAL -q "desc file('$DATA_FILE') SETTINGS schema_inference_make_columns_nullable = 0"
+done
+
+for format in $formats
+do
+    echo $format
+    $CLICKHOUSE_LOCAL -q "select * from generateRandom('f Map(UInt64, Map(UInt64, String)), f_nullables Map(UInt64, Map(UInt64, Nullable(String)))', 42) limit 10 format $format" > $DATA_FILE
+    $CLICKHOUSE_LOCAL -q "desc file('$DATA_FILE') SETTINGS schema_inference_make_columns_nullable = 0"
+done
+
+for format in $formats
+do
+    echo $format
+    $CLICKHOUSE_LOCAL -q "select * from generateRandom('g LowCardinality(String), g_nullable LowCardinality(Nullable(String))', 42) limit 10 settings output_format_arrow_low_cardinality_as_dictionary=1, allow_suspicious_low_cardinality_types=1 format $format" > $DATA_FILE
+    $CLICKHOUSE_LOCAL -q "desc file('$DATA_FILE') SETTINGS schema_inference_make_columns_nullable = 0"
+done
+
+rm $DATA_FILE
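
Note (illustration, not part of the patch): the key behavioural change is that the Arrow-to-ClickHouse schema and chunk conversion now consult the nullability flag of each Arrow field (field->nullable()) when deciding whether to produce a Nullable ClickHouse type, which is what the new test exercises with schema_inference_make_columns_nullable = 0. A minimal Arrow C++ sketch of the kind of schema covered by the first test case follows; makeExampleSchema is a hypothetical helper, while arrow::field, arrow::uint64 and arrow::schema are standard Arrow C++ APIs. Per the reference file above, such fields are expected to be inferred as "a UInt64" and "a_nullable Nullable(UInt64)".

    // Illustration only: two fields with the same physical type but different
    // nullability flags, mirroring the first case in 03036_parquet_arrow_nullable.sh.
    #include <arrow/api.h>
    #include <memory>

    std::shared_ptr<arrow::Schema> makeExampleSchema()
    {
        // The third argument of arrow::field() is the nullable flag that the
        // schema-to-header conversion now reads per field.
        auto a          = arrow::field("a", arrow::uint64(), /*nullable=*/false);
        auto a_nullable = arrow::field("a_nullable", arrow::uint64(), /*nullable=*/true);
        return arrow::schema({a, a_nullable});
    }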