From 4213ec609fd66a4e3bf947b5ffcfb0232d87a385 Mon Sep 17 00:00:00 2001 From: avogar Date: Mon, 13 Mar 2023 18:22:09 +0000 Subject: [PATCH] Proper fix for bug in parquet, revert reverted #45878 --- src/Core/Settings.h | 1 + src/Formats/FormatFactory.cpp | 1 + src/Formats/FormatSettings.h | 1 + .../Formats/Impl/ArrowColumnToCHColumn.cpp | 14 ++--- .../Formats/Impl/ParquetBlockInputFormat.cpp | 56 ++++++++++--------- 5 files changed, 39 insertions(+), 34 deletions(-) diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 881cbe42a02..f67ce6be9ed 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -803,6 +803,7 @@ class IColumn; M(Bool, input_format_tsv_detect_header, true, "Automatically detect header with names and types in TSV format", 0) \ M(Bool, input_format_custom_detect_header, true, "Automatically detect header with names and types in CustomSeparated format", 0) \ M(Bool, input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format Parquet", 0) \ + M(UInt64, input_format_parquet_max_block_size, 8192, "Max block size for parquet reader.", 0) \ M(Bool, input_format_protobuf_skip_fields_with_unsupported_types_in_schema_inference, false, "Skip fields with unsupported types while schema inference for format Protobuf", 0) \ M(Bool, input_format_capn_proto_skip_fields_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format CapnProto", 0) \ M(Bool, input_format_orc_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format ORC", 0) \ diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index a6db9d5ba0d..a951a7fdd92 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -117,6 +117,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings) format_settings.parquet.skip_columns_with_unsupported_types_in_schema_inference = settings.input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference; format_settings.parquet.output_string_as_string = settings.output_format_parquet_string_as_string; format_settings.parquet.output_fixed_string_as_fixed_byte_array = settings.output_format_parquet_fixed_string_as_fixed_byte_array; + format_settings.parquet.max_block_size = settings.input_format_parquet_max_block_size; format_settings.parquet.output_compression_method = settings.output_format_parquet_compression_method; format_settings.pretty.charset = settings.output_format_pretty_grid_charset.toString() == "ASCII" ? FormatSettings::Pretty::Charset::ASCII : FormatSettings::Pretty::Charset::UTF8; format_settings.pretty.color = settings.output_format_pretty_color; diff --git a/src/Formats/FormatSettings.h b/src/Formats/FormatSettings.h index ef6be805bea..7be7b5b98aa 100644 --- a/src/Formats/FormatSettings.h +++ b/src/Formats/FormatSettings.h @@ -211,6 +211,7 @@ struct FormatSettings std::unordered_set skip_row_groups = {}; bool output_string_as_string = false; bool output_fixed_string_as_fixed_byte_array = true; + UInt64 max_block_size = 8192; ParquetVersion output_version; ParquetCompression output_compression_method = ParquetCompression::SNAPPY; } parquet; diff --git a/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp b/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp index 26ceff95a86..54a6c8493ea 100644 --- a/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp +++ b/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp @@ -93,7 +93,7 @@ static ColumnWithTypeAndName readColumnWithNumericData(std::shared_ptr buffer = chunk->data()->buffers[1]; - const auto * raw_data = reinterpret_cast(buffer->data()); + const auto * raw_data = reinterpret_cast(buffer->data()) + chunk->offset(); column_data.insert_assume_reserved(raw_data, raw_data + chunk->length()); } return {std::move(internal_column), std::move(internal_type), column_name}; @@ -159,8 +159,8 @@ static ColumnWithTypeAndName readColumnWithFixedStringData(std::shared_ptrnum_chunks(); chunk_i < num_chunks; ++chunk_i) { arrow::FixedSizeBinaryArray & chunk = dynamic_cast(*(arrow_column->chunk(chunk_i))); - std::shared_ptr buffer = chunk.values(); - column_chars_t.insert_assume_reserved(buffer->data(), buffer->data() + buffer->size()); + const uint8_t * raw_data = chunk.raw_values(); + column_chars_t.insert_assume_reserved(raw_data, raw_data + fixed_len * chunk.length()); } return {std::move(internal_column), std::move(internal_type), column_name}; } @@ -178,9 +178,6 @@ static ColumnWithTypeAndName readColumnWithBooleanData(std::shared_ptr buffer = chunk.data()->buffers[1]; - for (size_t bool_i = 0; bool_i != static_cast(chunk.length()); ++bool_i) column_data.emplace_back(chunk.Value(bool_i)); } @@ -402,7 +399,7 @@ static ColumnWithTypeAndName readColumnWithIndexesDataImpl(std::shared_ptr buffer = chunk->data()->buffers[1]; - const auto * data = reinterpret_cast(buffer->data()); + const auto * data = reinterpret_cast(buffer->data()) + chunk->offset(); /// Check that indexes are correct (protection against corrupted files) /// Note that on null values index can be arbitrary value. @@ -554,8 +551,7 @@ static ColumnWithTypeAndName readIPv6ColumnFromBinaryData(std::shared_ptrnum_chunks(); chunk_i < num_chunks; ++chunk_i) { auto & chunk = dynamic_cast(*(arrow_column->chunk(chunk_i))); - std::shared_ptr buffer = chunk.value_data(); - const auto * raw_data = reinterpret_cast(buffer->data()); + const auto * raw_data = reinterpret_cast(chunk.raw_data() + chunk.raw_value_offsets()[0]); data.insert_assume_reserved(raw_data, raw_data + chunk.length()); } return {std::move(internal_column), std::move(internal_type), column_name}; diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp index 87a0fbf77a8..fca097d8ea7 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp @@ -45,38 +45,44 @@ Chunk ParquetBlockInputFormat::generate() block_missing_values.clear(); if (!file_reader) + { prepareReader(); + file_reader->set_batch_size(format_settings.parquet.max_block_size); + std::vector row_group_indices; + for (int i = 0; i < row_group_total; ++i) + { + if (!skip_row_groups.contains(i)) + row_group_indices.emplace_back(i); + } + auto read_status = file_reader->GetRecordBatchReader(row_group_indices, column_indices, ¤t_record_batch_reader); + if (!read_status.ok()) + throw DB::ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading Parquet data: {}", read_status.ToString()); + } if (is_stopped) return {}; - while (row_group_current < row_group_total && skip_row_groups.contains(row_group_current)) - ++row_group_current; - - if (row_group_current >= row_group_total) - return res; - - std::shared_ptr table; - - std::unique_ptr<::arrow::RecordBatchReader> rbr; - std::vector row_group_indices { row_group_current }; - arrow::Status get_batch_reader_status = file_reader->GetRecordBatchReader(row_group_indices, column_indices, &rbr); - - if (!get_batch_reader_status.ok()) + auto batch = current_record_batch_reader->Next(); + if (!batch.ok()) + { throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading Parquet data: {}", - get_batch_reader_status.ToString()); + batch.status().ToString()); + } + if (*batch) + { + auto tmp_table = arrow::Table::FromRecordBatches({*batch}); + /// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields. + /// Otherwise fill the missing columns with zero values of its type. + BlockMissingValues * block_missing_values_ptr = format_settings.defaults_for_omitted_fields ? &block_missing_values : nullptr; + arrow_column_to_ch_column->arrowTableToCHChunk(res, *tmp_table, (*tmp_table)->num_rows(), block_missing_values_ptr); + } + else + { + current_record_batch_reader.reset(); + file_reader.reset(); + return {}; + } - arrow::Status read_status = rbr->ReadAll(&table); - - if (!read_status.ok()) - throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading Parquet data: {}", read_status.ToString()); - - ++row_group_current; - - /// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields. - /// Otherwise fill the missing columns with zero values of its type. - BlockMissingValues * block_missing_values_ptr = format_settings.defaults_for_omitted_fields ? &block_missing_values : nullptr; - arrow_column_to_ch_column->arrowTableToCHChunk(res, table, table->num_rows(), block_missing_values_ptr); return res; }