From b30d11f046c3bb80612e4510f863c78200a98b93 Mon Sep 17 00:00:00 2001
From: liuneng <1398775315@qq.com>
Date: Mon, 27 May 2024 16:02:15 +0800
Subject: [PATCH] adapting parquet reader output block rows

---
 src/Core/Settings.h                           |  3 ++-
 src/Core/SettingsChangesHistory.h             |  2 ++
 src/Formats/FormatFactory.cpp                 |  1 +
 src/Formats/FormatSettings.h                  |  3 ++-
 .../Formats/Impl/ParquetBlockInputFormat.cpp  | 19 ++++++++++++++++++-
 .../Formats/Impl/ParquetBlockInputFormat.h    |  2 ++
 6 files changed, 27 insertions(+), 3 deletions(-)

diff --git a/src/Core/Settings.h b/src/Core/Settings.h
index f0389e7e2d5..c9efd1e4a97 100644
--- a/src/Core/Settings.h
+++ b/src/Core/Settings.h
@@ -1053,7 +1053,8 @@ class IColumn;
     M(Bool, input_format_tsv_detect_header, true, "Automatically detect header with names and types in TSV format", 0) \
     M(Bool, input_format_custom_detect_header, true, "Automatically detect header with names and types in CustomSeparated format", 0) \
     M(Bool, input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format Parquet", 0) \
-    M(UInt64, input_format_parquet_max_block_size, 8192, "Max block size for parquet reader.", 0) \
+    M(UInt64, input_format_parquet_max_block_size, DEFAULT_BLOCK_SIZE, "Max block size for parquet reader.", 0) \
+    M(UInt64, input_format_parquet_prefer_block_bytes, DEFAULT_BLOCK_SIZE * 256, "Average block bytes output by parquet reader", 0) \
     M(Bool, input_format_protobuf_skip_fields_with_unsupported_types_in_schema_inference, false, "Skip fields with unsupported types while schema inference for format Protobuf", 0) \
     M(Bool, input_format_capn_proto_skip_fields_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format CapnProto", 0) \
     M(Bool, input_format_orc_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format ORC", 0) \
diff --git a/src/Core/SettingsChangesHistory.h b/src/Core/SettingsChangesHistory.h
index 16f28d94640..be031592c12 100644
--- a/src/Core/SettingsChangesHistory.h
+++ b/src/Core/SettingsChangesHistory.h
@@ -92,6 +92,8 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
         {"hdfs_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in HDFS table engine"},
         {"azure_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in AzureBlobStorage table engine"},
         {"s3_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in S3 table engine"},
+        {"input_format_parquet_max_block_size", 8192, DEFAULT_BLOCK_SIZE, "Max block size for parquet reader."},
+        {"input_format_parquet_prefer_block_bytes", 0, DEFAULT_BLOCK_SIZE * 256, "Average block bytes output by parquet reader."},
     }},
     {"24.5", {{"allow_deprecated_functions", true, false, "Allow usage of deprecated functions"},
     {"allow_experimental_join_condition", false, false, "Support join with inequal conditions which involve columns from both left and right table. e.g. t1.y < t2.y."},
diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp
index a7883919c4c..e90986f2236 100644
--- a/src/Formats/FormatFactory.cpp
+++ b/src/Formats/FormatFactory.cpp
@@ -161,6 +161,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
     format_settings.parquet.output_string_as_string = settings.output_format_parquet_string_as_string;
     format_settings.parquet.output_fixed_string_as_fixed_byte_array = settings.output_format_parquet_fixed_string_as_fixed_byte_array;
     format_settings.parquet.max_block_size = settings.input_format_parquet_max_block_size;
+    format_settings.parquet.prefer_block_bytes = settings.input_format_parquet_prefer_block_bytes;
     format_settings.parquet.output_compression_method = settings.output_format_parquet_compression_method;
     format_settings.parquet.output_compliant_nested_types = settings.output_format_parquet_compliant_nested_types;
     format_settings.parquet.use_custom_encoder = settings.output_format_parquet_use_custom_encoder;
diff --git a/src/Formats/FormatSettings.h b/src/Formats/FormatSettings.h
index b296928e4d4..337aafbbe9c 100644
--- a/src/Formats/FormatSettings.h
+++ b/src/Formats/FormatSettings.h
@@ -265,7 +265,8 @@ struct FormatSettings
         bool preserve_order = false;
         bool use_custom_encoder = true;
         bool parallel_encoding = true;
-        UInt64 max_block_size = 8192;
+        UInt64 max_block_size = DEFAULT_BLOCK_SIZE;
+        size_t prefer_block_bytes = DEFAULT_BLOCK_SIZE * 256;
         ParquetVersion output_version;
         ParquetCompression output_compression_method = ParquetCompression::SNAPPY;
         bool output_compliant_nested_types = true;
diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
index 7fc7b9c3cab..fd2e7f88fb2 100644
--- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
+++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
@@ -418,6 +418,21 @@ void ParquetBlockInputFormat::initializeIfNeeded()
     int num_row_groups = metadata->num_row_groups();
     row_group_batches.reserve(num_row_groups);
 
+    auto adaptive_chunk_size = [&](int row_group_idx) -> size_t
+    {
+        size_t total_size = 0;
+        auto row_group_meta = metadata->RowGroup(row_group_idx);
+        for (int column_index : column_indices)
+        {
+            total_size += row_group_meta->ColumnChunk(column_index)->total_uncompressed_size();
+        }
+        if (!total_size || !format_settings.parquet.prefer_block_bytes) return 0;
+        auto average_row_bytes = total_size / row_group_meta->num_rows();
+        /// max_block_size >= num_rows >= 128
+        auto num_rows = std::min(format_settings.parquet.prefer_block_bytes / average_row_bytes, format_settings.parquet.max_block_size);
+        return std::max(num_rows, 128UL);
+    };
+
     for (int row_group = 0; row_group < num_row_groups; ++row_group)
     {
         if (skip_row_groups.contains(row_group))
@@ -437,6 +452,8 @@ void ParquetBlockInputFormat::initializeIfNeeded()
         row_group_batches.back().row_groups_idxs.push_back(row_group);
         row_group_batches.back().total_rows += metadata->RowGroup(row_group)->num_rows();
         row_group_batches.back().total_bytes_compressed += metadata->RowGroup(row_group)->total_compressed_size();
+        auto rows = adaptive_chunk_size(row_group);
+        row_group_batches.back().adaptive_chunk_size = rows ? rows : format_settings.parquet.max_block_size;
     }
 }
 
@@ -446,7 +463,7 @@ void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_bat
 
     parquet::ArrowReaderProperties properties;
     properties.set_use_threads(false);
-    properties.set_batch_size(format_settings.parquet.max_block_size);
+    properties.set_batch_size(row_group_batch.adaptive_chunk_size);
 
     // When reading a row group, arrow will:
     // 1. Look at `metadata` to get all byte ranges it'll need to read from the file (typically one
diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h
index d6591f5c0a3..24735ee4371 100644
--- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h
+++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h
@@ -208,6 +208,8 @@ private:
         size_t total_rows = 0;
         size_t total_bytes_compressed = 0;
 
+        size_t adaptive_chunk_size = 0;
+
        std::vector<int> row_groups_idxs;

        // These are only used by the decoding thread, so don't require locking the mutex.
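
For reference, the sizing heuristic this patch introduces can be exercised in isolation. The following is a minimal standalone sketch, not the patch code itself: the ParquetReadSettings struct and the adaptiveChunkSize helper are stand-ins invented for this sketch, the guard against a zero average row size is an addition (the patch divides by average_row_bytes unconditionally), and DEFAULT_BLOCK_SIZE is taken to be 65409 as defined in src/Core/Defines.h.

// adaptive_chunk_size_sketch.cpp -- standalone model of the heuristic above.
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iostream>

static constexpr size_t DEFAULT_BLOCK_SIZE = 65409; // value from src/Core/Defines.h

struct ParquetReadSettings // hypothetical stand-in for FormatSettings::Parquet
{
    uint64_t max_block_size = DEFAULT_BLOCK_SIZE;         // input_format_parquet_max_block_size
    size_t prefer_block_bytes = DEFAULT_BLOCK_SIZE * 256; // input_format_parquet_prefer_block_bytes
};

// Preferred Arrow batch size in rows for one row group; 0 means "no adaptation",
// in which case the caller falls back to max_block_size.
size_t adaptiveChunkSize(size_t total_uncompressed_bytes, size_t num_rows, const ParquetReadSettings & settings)
{
    if (!total_uncompressed_bytes || !settings.prefer_block_bytes || !num_rows)
        return 0;
    const size_t average_row_bytes = total_uncompressed_bytes / num_rows;
    if (!average_row_bytes)
        return 0; // sketch-only guard: rows narrower than 1 byte on average would otherwise divide by zero
    // Clamp so that max_block_size >= result >= 128.
    const size_t rows = std::min<size_t>(settings.prefer_block_bytes / average_row_bytes, settings.max_block_size);
    return std::max<size_t>(rows, 128);
}

int main()
{
    ParquetReadSettings settings;
    // Wide rows (~4 KiB each): batches shrink to roughly prefer_block_bytes worth of rows.
    std::cout << adaptiveChunkSize(4096UL * 1'000'000, 1'000'000, settings) << '\n'; // prints 4088
    // Narrow rows (~16 bytes each): clamped up to max_block_size.
    std::cout << adaptiveChunkSize(16UL * 1'000'000, 1'000'000, settings) << '\n';   // prints 65409
}

The 128-row floor keeps row groups with extremely wide rows from degenerating into tiny batches, while the max_block_size cap preserves the previous upper bound on rows per block.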