diff --git a/src/Core/Settings.h b/src/Core/Settings.h
index c9efd1e4a97..011541088ac 100644
--- a/src/Core/Settings.h
+++ b/src/Core/Settings.h
@@ -1053,7 +1053,7 @@ class IColumn;
     M(Bool, input_format_tsv_detect_header, true, "Automatically detect header with names and types in TSV format", 0) \
     M(Bool, input_format_custom_detect_header, true, "Automatically detect header with names and types in CustomSeparated format", 0) \
     M(Bool, input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format Parquet", 0) \
-    M(UInt64, input_format_parquet_max_block_size, DEFAULT_BLOCK_SIZE, "Max block size for parquet reader.", 0) \
+    M(UInt64, input_format_parquet_max_block_rows, DEFAULT_BLOCK_SIZE, "Max block size in rows for Parquet reader.", 0) \
     M(UInt64, input_format_parquet_prefer_block_bytes, DEFAULT_BLOCK_SIZE * 256, "Average block bytes output by parquet reader", 0) \
     M(Bool, input_format_protobuf_skip_fields_with_unsupported_types_in_schema_inference, false, "Skip fields with unsupported types while schema inference for format Protobuf", 0) \
     M(Bool, input_format_capn_proto_skip_fields_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format CapnProto", 0) \
diff --git a/src/Core/SettingsChangesHistory.h b/src/Core/SettingsChangesHistory.h
index be031592c12..ab83da5de8d 100644
--- a/src/Core/SettingsChangesHistory.h
+++ b/src/Core/SettingsChangesHistory.h
@@ -92,8 +92,8 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
               {"hdfs_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in HDFS table engine"},
               {"azure_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in AzureBlobStorage table engine"},
               {"s3_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in S3 table engine"},
-              {"input_format_parquet_max_block_size", 8192, DEFAULT_BLOCK_SIZE, "Max block size for parquet reader."},
-              {"input_format_parquet_prefer_block_bytes", 0, DEFAULT_BLOCK_SIZE * 256, "Average block bytes output by parquet reader."},
+              {"input_format_parquet_max_block_rows", 8192, DEFAULT_BLOCK_SIZE, "Max block size in rows for Parquet reader."},
+              {"input_format_parquet_prefer_block_bytes", 0, DEFAULT_BLOCK_SIZE * 256, "Average block bytes output by parquet reader."},
               }},
    {"24.5", {{"allow_deprecated_functions", true, false, "Allow usage of deprecated functions"},
              {"allow_experimental_join_condition", false, false, "Support join with inequal conditions which involve columns from both left and right table. e.g. t1.y < t2.y."},
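For readers unfamiliar with `SettingsChangesHistory.h`: each entry records the setting name, its previous default, its new default, and a human-readable reason, and a server running with the `compatibility` setting pinned before 24.6 gets the previous default (8192 rows here) back. `DEFAULT_BLOCK_SIZE` is 65409 rows. A minimal sketch of the tuple layout, with hypothetical field names; the real entries store values as ClickHouse `Field`s:

```cpp
#include <cstdint>
#include <string>

// Hypothetical illustration of one settings-changes entry; not the
// actual struct from SettingsChangesHistory.h.
struct SettingChangeEntry
{
    std::string name;        // "input_format_parquet_max_block_rows"
    uint64_t previous_value; // 8192: the default before 24.6
    uint64_t new_value;      // DEFAULT_BLOCK_SIZE (65409): the default from 24.6 on
    std::string reason;      // description shown to users
};
```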
diff --git a/src/Core/SettingsQuirks.cpp b/src/Core/SettingsQuirks.cpp
index 5541cc19653..4065ee40285 100644
--- a/src/Core/SettingsQuirks.cpp
+++ b/src/Core/SettingsQuirks.cpp
@@ -117,7 +117,7 @@ void doSettingsSanityCheckClamp(Settings & current_settings, LoggerPtr log)
         "min_insert_block_size_bytes_for_materialized_views",
         "min_external_table_block_size_rows",
         "max_joined_block_size_rows",
-        "input_format_parquet_max_block_size"};
+        "input_format_parquet_max_block_rows"};
     for (auto const & setting : block_rows_settings)
     {
         if (auto block_size = get_current_value(setting).get<UInt64>();
diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp
index e90986f2236..a01be503c4f 100644
--- a/src/Formats/FormatFactory.cpp
+++ b/src/Formats/FormatFactory.cpp
@@ -160,7 +160,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
    format_settings.parquet.skip_columns_with_unsupported_types_in_schema_inference = settings.input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference;
    format_settings.parquet.output_string_as_string = settings.output_format_parquet_string_as_string;
    format_settings.parquet.output_fixed_string_as_fixed_byte_array = settings.output_format_parquet_fixed_string_as_fixed_byte_array;
-    format_settings.parquet.max_block_size = settings.input_format_parquet_max_block_size;
+    format_settings.parquet.max_block_rows = settings.input_format_parquet_max_block_rows;
    format_settings.parquet.prefer_block_bytes = settings.input_format_parquet_prefer_block_bytes;
    format_settings.parquet.output_compression_method = settings.output_format_parquet_compression_method;
    format_settings.parquet.output_compliant_nested_types = settings.output_format_parquet_compliant_nested_types;
diff --git a/src/Formats/FormatSettings.h b/src/Formats/FormatSettings.h
index 337aafbbe9c..f7b57ddd4aa 100644
--- a/src/Formats/FormatSettings.h
+++ b/src/Formats/FormatSettings.h
@@ -265,7 +265,7 @@ struct FormatSettings
        bool preserve_order = false;
        bool use_custom_encoder = true;
        bool parallel_encoding = true;
-        UInt64 max_block_size = DEFAULT_BLOCK_SIZE;
+        UInt64 max_block_rows = DEFAULT_BLOCK_SIZE;
        size_t prefer_block_bytes = DEFAULT_BLOCK_SIZE * 256;
        ParquetVersion output_version;
        ParquetCompression output_compression_method = ParquetCompression::SNAPPY;
diff --git a/src/Processors/Formats/Impl/Parquet/ParquetRecordReader.cpp b/src/Processors/Formats/Impl/Parquet/ParquetRecordReader.cpp
index a7e51f88b3c..ad98db3b8ab 100644
--- a/src/Processors/Formats/Impl/Parquet/ParquetRecordReader.cpp
+++ b/src/Processors/Formats/Impl/Parquet/ParquetRecordReader.cpp
@@ -307,7 +307,7 @@ ParquetRecordReader::ParquetRecordReader(
     : file_reader(createFileReader(std::move(arrow_file), std::move(metadata)))
     , reader_properties(reader_properties_)
     , header(std::move(header_))
-    , max_block_size(format_settings.parquet.max_block_size)
+    , max_block_rows(format_settings.parquet.max_block_rows)
     , row_groups_indices(std::move(row_groups_indices_))
     , left_rows(getTotalRows(*file_reader->metadata()))
 {
@@ -356,7 +356,7 @@ Chunk ParquetRecordReader::readChunk()
     }

     Columns columns(header.columns());
-    auto num_rows_read = std::min(max_block_size, cur_row_group_left_rows);
+    auto num_rows_read = std::min(max_block_rows, cur_row_group_left_rows);
     for (size_t i = 0; i < header.columns(); i++)
     {
         columns[i] = castColumn(
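To make the rename concrete, here is a standalone sketch, not the actual reader code, of how `max_block_rows` bounds every chunk emitted while a row group is drained; the function and loop structure are illustrative only, with names borrowed from the diff:

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Illustrative only: produce chunk sizes the way readChunk() caps them.
std::vector<uint64_t> chunkSizes(uint64_t rows_in_group, uint64_t max_block_rows)
{
    std::vector<uint64_t> sizes;
    uint64_t cur_row_group_left_rows = rows_in_group;
    while (cur_row_group_left_rows > 0)
    {
        // Same expression as in ParquetRecordReader::readChunk():
        // never emit more than max_block_rows rows per chunk.
        uint64_t num_rows_read = std::min(max_block_rows, cur_row_group_left_rows);
        sizes.push_back(num_rows_read);
        cur_row_group_left_rows -= num_rows_read;
    }
    return sizes; // e.g. {65409, 65409, ..., remainder}
}
```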
diff --git a/src/Processors/Formats/Impl/Parquet/ParquetRecordReader.h b/src/Processors/Formats/Impl/Parquet/ParquetRecordReader.h
index 2f728a586a0..a682d724960 100644
--- a/src/Processors/Formats/Impl/Parquet/ParquetRecordReader.h
+++ b/src/Processors/Formats/Impl/Parquet/ParquetRecordReader.h
@@ -36,7 +36,7 @@ private:
     std::shared_ptr<parquet::RowGroupReader> cur_row_group_reader;
     ParquetColReaders column_readers;

-    UInt64 max_block_size;
+    UInt64 max_block_rows;

     std::vector<int> parquet_col_indice;
     std::vector<int> row_groups_indices;
diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
index 95da938f4e6..008b7b41b57 100644
--- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
+++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
@@ -424,13 +424,14 @@ void ParquetBlockInputFormat::initializeIfNeeded()
            auto row_group_meta = metadata->RowGroup(row_group_idx);
            for (int column_index : column_indices)
            {
-                total_size += row_group_meta->ColumnChunk(column_index)->total_uncompressed_size();
+                auto column = row_group_meta->ColumnChunk(column_index);
+                total_size += column->total_uncompressed_size();
            }
            if (!total_size || !format_settings.parquet.prefer_block_bytes) return 0;
-            auto average_row_bytes = total_size / row_group_meta->num_rows();
-            /// max_block_size >= num_rows >= 128
-            auto num_rows = std::min(format_settings.parquet.prefer_block_bytes/average_row_bytes, format_settings.parquet.max_block_size);
-            return std::max(num_rows, 128UL);
+            auto average_row_bytes = static_cast<double>(total_size) / row_group_meta->num_rows();
+            const size_t preferred_num_rows = static_cast<size_t>(format_settings.parquet.prefer_block_bytes / average_row_bytes);
+            const size_t MIN_ROW_NUM = 128;
+            return std::min(std::max(preferred_num_rows, MIN_ROW_NUM), format_settings.parquet.max_block_rows);
        };

        for (int row_group = 0; row_group < num_row_groups; ++row_group)
@@ -453,7 +454,7 @@ void ParquetBlockInputFormat::initializeIfNeeded()
            row_group_batches.back().total_rows += metadata->RowGroup(row_group)->num_rows();
            row_group_batches.back().total_bytes_compressed += metadata->RowGroup(row_group)->total_compressed_size();
            auto rows = adative_chunk_size(row_group);
-            row_group_batches.back().adaptive_chunk_size = rows ? rows : format_settings.parquet.max_block_size;
+            row_group_batches.back().adaptive_chunk_size = rows ? rows : format_settings.parquet.max_block_rows;
        }
 }
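The clamp above is the heart of the change. Here is a self-contained sketch of the same arithmetic (the standalone function and its parameter names are mine, not ClickHouse API) that can be compiled and poked at directly. Note that `std::min` is applied last, so an explicit `input_format_parquet_max_block_rows` below 128 still wins; the old `return std::max(num_rows, 128UL)` did not guarantee that:

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>

// Sketch of the adaptive chunk sizing in initializeIfNeeded(): target
// prefer_block_bytes per block, but stay within [128, max_block_rows].
size_t adaptiveChunkSize(uint64_t total_uncompressed_bytes, int64_t num_rows,
                         size_t prefer_block_bytes, size_t max_block_rows)
{
    if (total_uncompressed_bytes == 0 || prefer_block_bytes == 0)
        return 0; // the caller falls back to max_block_rows
    double average_row_bytes = static_cast<double>(total_uncompressed_bytes) / num_rows;
    auto preferred_num_rows = static_cast<size_t>(prefer_block_bytes / average_row_bytes);
    const size_t MIN_ROW_NUM = 128;
    return std::min(std::max(preferred_num_rows, MIN_ROW_NUM), max_block_rows);
}
```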
diff --git a/tests/queries/0_stateless/03164_adapting_parquet_reader_output_size.reference b/tests/queries/0_stateless/03164_adapting_parquet_reader_output_size.reference
new file mode 100644
index 00000000000..332202dd23b
--- /dev/null
+++ b/tests/queries/0_stateless/03164_adapting_parquet_reader_output_size.reference
@@ -0,0 +1,4 @@
+65409
+16
+128
+2183
diff --git a/tests/queries/0_stateless/03164_adapting_parquet_reader_output_size.sql b/tests/queries/0_stateless/03164_adapting_parquet_reader_output_size.sql
new file mode 100644
index 00000000000..25fe4695e25
--- /dev/null
+++ b/tests/queries/0_stateless/03164_adapting_parquet_reader_output_size.sql
@@ -0,0 +1,21 @@
+DROP TABLE IF EXISTS test_parquet;
+CREATE TABLE test_parquet (col1 String, col2 String, col3 String, col4 String, col5 String, col6 String, col7 String) ENGINE=File(Parquet);
+INSERT INTO test_parquet SELECT rand(),rand(),rand(),rand(),rand(),rand(),rand() FROM numbers(100000);
+SELECT max(blockSize()) FROM test_parquet;
+
+DROP TABLE IF EXISTS test_parquet;
+CREATE TABLE test_parquet (col1 String, col2 String, col3 String, col4 String, col5 String, col6 String, col7 String) ENGINE=File(Parquet) settings input_format_parquet_max_block_rows=16;
+INSERT INTO test_parquet SELECT rand(),rand(),rand(),rand(),rand(),rand(),rand() FROM numbers(100000);
+SELECT max(blockSize()) FROM test_parquet;
+
+DROP TABLE IF EXISTS test_parquet;
+CREATE TABLE test_parquet (col1 String, col2 String, col3 String, col4 String, col5 String, col6 String, col7 String) ENGINE=File(Parquet) settings input_format_parquet_prefer_block_bytes=30;
+INSERT INTO test_parquet SELECT rand(),rand(),rand(),rand(),rand(),rand(),rand() FROM numbers(100000);
+SELECT max(blockSize()) FROM test_parquet;
+
+DROP TABLE IF EXISTS test_parquet;
+CREATE TABLE test_parquet (col1 String, col2 String, col3 String, col4 String, col5 String, col6 String, col7 String) ENGINE=File(Parquet) settings input_format_parquet_prefer_block_bytes=30000;
+INSERT INTO test_parquet SELECT rand(),rand(),rand(),rand(),rand(),rand(),rand() FROM numbers(100000);
+SELECT max(blockSize()) FROM test_parquet;
+
+DROP TABLE IF EXISTS test_parquet;
\ No newline at end of file
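For intuition about the four reference values: 65409 is `DEFAULT_BLOCK_SIZE`; 16 shows an explicit `input_format_parquet_max_block_rows` winning over the 128-row floor; 128 is the floor when `prefer_block_bytes=30` would imply only ~2 rows; and 2183 is roughly 30000 bytes divided by ~13.7 uncompressed bytes per row. That per-row figure is back-derived from the expected output rather than measured, so treat the following sanity check of the clamp arithmetic as a hedged sketch:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Same clamp as the adaptiveChunkSize sketch above; the 13.74 bytes/row
// average is an assumption back-derived from the expected value 2183.
static size_t clampRows(double prefer_bytes, double avg_row_bytes, size_t max_block_rows)
{
    const size_t MIN_ROW_NUM = 128;
    auto preferred = static_cast<size_t>(prefer_bytes / avg_row_bytes);
    return std::min(std::max(preferred, MIN_ROW_NUM), max_block_rows);
}

int main()
{
    const size_t DEFAULT_BLOCK_SIZE = 65409; // ClickHouse's default max block rows
    const double avg = 13.74;                // assumed average uncompressed row size

    assert(clampRows(DEFAULT_BLOCK_SIZE * 256.0, avg, DEFAULT_BLOCK_SIZE) == 65409); // defaults
    assert(clampRows(DEFAULT_BLOCK_SIZE * 256.0, avg, 16) == 16);  // max_block_rows = 16
    assert(clampRows(30.0, avg, DEFAULT_BLOCK_SIZE) == 128);       // prefer_block_bytes = 30
    assert(clampRows(30000.0, avg, DEFAULT_BLOCK_SIZE) == 2183);   // prefer_block_bytes = 30000
}
```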