Merge pull request #64427 from liuneng1994/adapting-parquet-block-size

Adapting parquet reader output block rows
This commit is contained in:
János Benjamin Antal 2024-06-11 09:16:36 +00:00 committed by GitHub
commit 8792db6321
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 72 additions and 3 deletions

View File

@ -2165,6 +2165,8 @@ To exchange data with Hadoop, you can use [HDFS table engine](/docs/en/engines/t
- [output_format_parquet_fixed_string_as_fixed_byte_array](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_fixed_string_as_fixed_byte_array) - use Parquet FIXED_LENGTH_BYTE_ARRAY type instead of Binary/String for FixedString columns. Default value - `true`.
- [output_format_parquet_version](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_version) - The version of Parquet format used in output format. Default value - `2.latest`.
- [output_format_parquet_compression_method](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_compression_method) - compression method used in output Parquet format. Default value - `lz4`.
- [input_format_parquet_max_block_size](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_max_block_size) - Max block row size for parquet reader. Default value - `65409`.
- [input_format_parquet_prefer_block_bytes](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_prefer_block_bytes) - Average block bytes output by parquet reader. Default value - `16744704`.
## ParquetMetadata {data-format-parquet-metadata}

View File

@ -1417,6 +1417,17 @@ Compression method used in output Parquet format. Supported codecs: `snappy`, `l
Default value: `lz4`.
### input_format_parquet_max_block_size {#input_format_parquet_max_block_size}
Max block row size for parquet reader. By controlling the number of rows in each block, you can control the memory usage,
and in some operators that cache blocks, you can improve the accuracy of the operator's memory control。
Default value: `65409`.
### input_format_parquet_prefer_block_bytes {#input_format_parquet_prefer_block_bytes}
Average block bytes output by parquet reader. Lowering the configuration in the case of reading some high compression parquet relieves the memory pressure.
Default value: `65409 * 256 = 16744704`
## Hive format settings {#hive-format-settings}
### input_format_hive_text_fields_delimiter {#input_format_hive_text_fields_delimiter}

View File

@ -1060,7 +1060,8 @@ class IColumn;
M(Bool, input_format_tsv_detect_header, true, "Automatically detect header with names and types in TSV format", 0) \
M(Bool, input_format_custom_detect_header, true, "Automatically detect header with names and types in CustomSeparated format", 0) \
M(Bool, input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format Parquet", 0) \
M(UInt64, input_format_parquet_max_block_size, 8192, "Max block size for parquet reader.", 0) \
M(UInt64, input_format_parquet_max_block_size, DEFAULT_BLOCK_SIZE, "Max block size for parquet reader.", 0) \
M(UInt64, input_format_parquet_prefer_block_bytes, DEFAULT_BLOCK_SIZE * 256, "Average block bytes output by parquet reader", 0) \
M(Bool, input_format_protobuf_skip_fields_with_unsupported_types_in_schema_inference, false, "Skip fields with unsupported types while schema inference for format Protobuf", 0) \
M(Bool, input_format_capn_proto_skip_fields_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format CapnProto", 0) \
M(Bool, input_format_orc_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format ORC", 0) \

View File

@ -96,6 +96,8 @@ static const std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges
{"hdfs_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in HDFS table engine"},
{"azure_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in AzureBlobStorage table engine"},
{"s3_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in S3 table engine"},
{"input_format_parquet_max_block_size", 8192, DEFAULT_BLOCK_SIZE, "Increase block size for parquet reader."},
{"input_format_parquet_prefer_block_bytes", 0, DEFAULT_BLOCK_SIZE * 256, "Average block bytes output by parquet reader."},
{"enable_blob_storage_log", true, true, "Write information about blob storage operations to system.blob_storage_log table"},
{"allow_statistic_optimize", false, false, "Old setting which popped up here being renamed."},
{"allow_experimental_statistic", false, false, "Old setting which popped up here being renamed."},

View File

@ -161,6 +161,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
format_settings.parquet.output_string_as_string = settings.output_format_parquet_string_as_string;
format_settings.parquet.output_fixed_string_as_fixed_byte_array = settings.output_format_parquet_fixed_string_as_fixed_byte_array;
format_settings.parquet.max_block_size = settings.input_format_parquet_max_block_size;
format_settings.parquet.prefer_block_bytes = settings.input_format_parquet_prefer_block_bytes;
format_settings.parquet.output_compression_method = settings.output_format_parquet_compression_method;
format_settings.parquet.output_compliant_nested_types = settings.output_format_parquet_compliant_nested_types;
format_settings.parquet.use_custom_encoder = settings.output_format_parquet_use_custom_encoder;

View File

@ -265,7 +265,8 @@ struct FormatSettings
bool preserve_order = false;
bool use_custom_encoder = true;
bool parallel_encoding = true;
UInt64 max_block_size = 8192;
UInt64 max_block_size = DEFAULT_BLOCK_SIZE;
size_t prefer_block_bytes = DEFAULT_BLOCK_SIZE * 256;
ParquetVersion output_version;
ParquetCompression output_compression_method = ParquetCompression::SNAPPY;
bool output_compliant_nested_types = true;

View File

@ -420,6 +420,24 @@ void ParquetBlockInputFormat::initializeIfNeeded()
int num_row_groups = metadata->num_row_groups();
row_group_batches.reserve(num_row_groups);
auto adative_chunk_size = [&](int row_group_idx) -> size_t
{
size_t total_size = 0;
auto row_group_meta = metadata->RowGroup(row_group_idx);
for (int column_index : column_indices)
{
total_size += row_group_meta->ColumnChunk(column_index)->total_uncompressed_size();
}
if (!total_size || !format_settings.parquet.prefer_block_bytes) return 0;
auto average_row_bytes = floor(static_cast<double>(total_size) / row_group_meta->num_rows());
// avoid inf preferred_num_rows;
if (average_row_bytes < 1) return 0;
const size_t preferred_num_rows = static_cast<size_t>(floor(format_settings.parquet.prefer_block_bytes/average_row_bytes));
const size_t MIN_ROW_NUM = 128;
// size_t != UInt64 in darwin
return std::min(std::max(preferred_num_rows, MIN_ROW_NUM), static_cast<size_t>(format_settings.parquet.max_block_size));
};
for (int row_group = 0; row_group < num_row_groups; ++row_group)
{
if (skip_row_groups.contains(row_group))
@ -439,6 +457,8 @@ void ParquetBlockInputFormat::initializeIfNeeded()
row_group_batches.back().row_groups_idxs.push_back(row_group);
row_group_batches.back().total_rows += metadata->RowGroup(row_group)->num_rows();
row_group_batches.back().total_bytes_compressed += metadata->RowGroup(row_group)->total_compressed_size();
auto rows = adative_chunk_size(row_group);
row_group_batches.back().adaptive_chunk_size = rows ? rows : format_settings.parquet.max_block_size;
}
}
@ -449,7 +469,7 @@ void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_bat
parquet::ArrowReaderProperties arrow_properties;
parquet::ReaderProperties reader_properties(ArrowMemoryPool::instance());
arrow_properties.set_use_threads(false);
arrow_properties.set_batch_size(format_settings.parquet.max_block_size);
arrow_properties.set_batch_size(row_group_batch.adaptive_chunk_size);
// When reading a row group, arrow will:
// 1. Look at `metadata` to get all byte ranges it'll need to read from the file (typically one

View File

@ -208,6 +208,8 @@ private:
size_t total_rows = 0;
size_t total_bytes_compressed = 0;
size_t adaptive_chunk_size = 0;
std::vector<int> row_groups_idxs;
// These are only used by the decoding thread, so don't require locking the mutex.

View File

@ -0,0 +1,4 @@
65409
16
128
2363

View File

@ -0,0 +1,25 @@
-- Tags: no-fasttest, no-parallel, no-random-settings
set max_insert_threads=1;
DROP TABLE IF EXISTS test_parquet;
CREATE TABLE test_parquet (col1 String, col2 String, col3 String, col4 String, col5 String, col6 String, col7 String) ENGINE=File(Parquet);
INSERT INTO test_parquet SELECT rand(),rand(),rand(),rand(),rand(),rand(),rand() FROM numbers(100000);
SELECT max(blockSize()) FROM test_parquet;
DROP TABLE IF EXISTS test_parquet;
CREATE TABLE test_parquet (col1 String, col2 String, col3 String, col4 String, col5 String, col6 String, col7 String) ENGINE=File(Parquet) settings input_format_parquet_max_block_size=16;
INSERT INTO test_parquet SELECT rand(),rand(),rand(),rand(),rand(),rand(),rand() FROM numbers(100000);
SELECT max(blockSize()) FROM test_parquet;
DROP TABLE IF EXISTS test_parquet;
CREATE TABLE test_parquet (col1 String, col2 String, col3 String, col4 String, col5 String, col6 String, col7 String) ENGINE=File(Parquet) settings input_format_parquet_prefer_block_bytes=30;
INSERT INTO test_parquet SELECT rand(),rand(),rand(),rand(),rand(),rand(),rand() FROM numbers(100000);
SELECT max(blockSize()) FROM test_parquet;
DROP TABLE IF EXISTS test_parquet;
CREATE TABLE test_parquet (col1 String, col2 String, col3 String, col4 String, col5 String, col6 String, col7 String) ENGINE=File(Parquet) settings input_format_parquet_prefer_block_bytes=30720;
INSERT INTO test_parquet SELECT rand(),rand(),rand(),rand(),rand(),rand(),rand() FROM numbers(100000);
SELECT max(blockSize()) FROM test_parquet;
DROP TABLE IF EXISTS test_parquet;