adapting parquet reader output block rows

liuneng 2024-05-27 16:02:15 +08:00
parent 7a552f5b06
commit b30d11f046
6 changed files with 27 additions and 3 deletions

@@ -1053,7 +1053,8 @@ class IColumn;
M(Bool, input_format_tsv_detect_header, true, "Automatically detect header with names and types in TSV format", 0) \
M(Bool, input_format_custom_detect_header, true, "Automatically detect header with names and types in CustomSeparated format", 0) \
M(Bool, input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format Parquet", 0) \
M(UInt64, input_format_parquet_max_block_size, 8192, "Max block size for parquet reader.", 0) \
M(UInt64, input_format_parquet_max_block_size, DEFAULT_BLOCK_SIZE, "Max block size for parquet reader.", 0) \
M(UInt64, input_format_parquet_prefer_block_bytes, DEFAULT_BLOCK_SIZE * 256, "Average block bytes output by parquet reader", 0) \
M(Bool, input_format_protobuf_skip_fields_with_unsupported_types_in_schema_inference, false, "Skip fields with unsupported types while schema inference for format Protobuf", 0) \
M(Bool, input_format_capn_proto_skip_fields_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format CapnProto", 0) \
M(Bool, input_format_orc_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format ORC", 0) \

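For a sense of scale, here is a minimal sketch of what the new defaults work out to. It assumes DEFAULT_BLOCK_SIZE is 65409 rows, as defined in ClickHouse's Core/Defines.h (that constant is not part of this diff):

    #include <cstdint>
    #include <iostream>

    int main()
    {
        /// Assumption: DEFAULT_BLOCK_SIZE == 65409, as in Core/Defines.h.
        constexpr uint64_t default_block_size = 65409;
        constexpr uint64_t max_block_size = default_block_size;            /// default of input_format_parquet_max_block_size
        constexpr uint64_t prefer_block_bytes = default_block_size * 256;  /// default of input_format_parquet_prefer_block_bytes

        /// Roughly 16 MiB of uncompressed data per block, capped at ~64K rows.
        std::cout << "max rows per block:    " << max_block_size << "\n"
                  << "preferred block bytes: " << prefer_block_bytes << "\n";
        return 0;
    }

So out of the box the reader now aims for up to ~64K rows per output chunk, bounded by roughly 16 MiB of uncompressed data per block, instead of the previous fixed 8192 rows.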

@@ -92,6 +92,8 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{"hdfs_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in HDFS table engine"},
{"azure_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in AzureBlobStorage table engine"},
{"s3_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in S3 table engine"},
{"input_format_parquet_max_block_size", 8192, DEFAULT_BLOCK_SIZE, "Max block size for parquet reader."},
{"input_format_parquet_prefer_block_bytes", 0, DEFAULT_BLOCK_SIZE * 256, "Average block bytes output by parquet reader."},
}},
{"24.5", {{"allow_deprecated_functions", true, false, "Allow usage of deprecated functions"},
{"allow_experimental_join_condition", false, false, "Support join with inequal conditions which involve columns from both left and right table. e.g. t1.y < t2.y."},

@@ -161,6 +161,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
format_settings.parquet.output_string_as_string = settings.output_format_parquet_string_as_string;
format_settings.parquet.output_fixed_string_as_fixed_byte_array = settings.output_format_parquet_fixed_string_as_fixed_byte_array;
format_settings.parquet.max_block_size = settings.input_format_parquet_max_block_size;
format_settings.parquet.prefer_block_bytes = settings.input_format_parquet_prefer_block_bytes;
format_settings.parquet.output_compression_method = settings.output_format_parquet_compression_method;
format_settings.parquet.output_compliant_nested_types = settings.output_format_parquet_compliant_nested_types;
format_settings.parquet.use_custom_encoder = settings.output_format_parquet_use_custom_encoder;

@@ -265,7 +265,8 @@ struct FormatSettings
bool preserve_order = false;
bool use_custom_encoder = true;
bool parallel_encoding = true;
UInt64 max_block_size = 8192;
UInt64 max_block_size = DEFAULT_BLOCK_SIZE;
size_t prefer_block_bytes = DEFAULT_BLOCK_SIZE * 256;
ParquetVersion output_version;
ParquetCompression output_compression_method = ParquetCompression::SNAPPY;
bool output_compliant_nested_types = true;

@@ -418,6 +418,21 @@ void ParquetBlockInputFormat::initializeIfNeeded()
int num_row_groups = metadata->num_row_groups();
row_group_batches.reserve(num_row_groups);
auto adaptive_chunk_size = [&](int row_group_idx) -> size_t
{
size_t total_size = 0;
auto row_group_meta = metadata->RowGroup(row_group_idx);
for (int column_index : column_indices)
{
total_size += row_group_meta->ColumnChunk(column_index)->total_uncompressed_size();
}
if (!total_size || !format_settings.parquet.prefer_block_bytes) return 0;
auto average_row_bytes = total_size / row_group_meta->num_rows();
/// Guard against division by zero below when rows average less than one byte.
if (!average_row_bytes) return 0;
/// max_block_size >= num_rows >= 128
auto num_rows = std::min(format_settings.parquet.prefer_block_bytes / average_row_bytes, format_settings.parquet.max_block_size);
return std::max(num_rows, 128UL);
};
for (int row_group = 0; row_group < num_row_groups; ++row_group)
{
if (skip_row_groups.contains(row_group))
@@ -437,6 +452,8 @@ void ParquetBlockInputFormat::initializeIfNeeded()
row_group_batches.back().row_groups_idxs.push_back(row_group);
row_group_batches.back().total_rows += metadata->RowGroup(row_group)->num_rows();
row_group_batches.back().total_bytes_compressed += metadata->RowGroup(row_group)->total_compressed_size();
auto rows = adaptive_chunk_size(row_group);
row_group_batches.back().adaptive_chunk_size = rows ? rows : format_settings.parquet.max_block_size;
}
}
@@ -446,7 +463,7 @@ void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_bat
parquet::ArrowReaderProperties properties;
properties.set_use_threads(false);
properties.set_batch_size(format_settings.parquet.max_block_size);
properties.set_batch_size(row_group_batch.adaptive_chunk_size);
// When reading a row group, arrow will:
// 1. Look at `metadata` to get all byte ranges it'll need to read from the file (typically one

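To make the estimate above easier to follow, here is a standalone sketch of the same calculation. The function name and parameters are illustrative, not the actual ClickHouse helpers; the logic mirrors the lambda in the hunk above: derive rows per chunk from the row group's average uncompressed row size, cap it at max_block_size, and never go below 128 rows.

    #include <algorithm>
    #include <cstddef>
    #include <iostream>

    /// How many rows fit into prefer_block_bytes for a row group, capped by
    /// max_block_size and floored at 128; 0 means "no estimate, use max_block_size".
    static size_t adaptiveChunkSize(size_t total_uncompressed_bytes, size_t num_rows,
                                    size_t prefer_block_bytes, size_t max_block_size)
    {
        if (!total_uncompressed_bytes || !prefer_block_bytes || !num_rows)
            return 0;
        size_t average_row_bytes = total_uncompressed_bytes / num_rows;
        if (!average_row_bytes)
            return 0; /// rows average less than one byte; fall back to max_block_size
        size_t rows = std::min(prefer_block_bytes / average_row_bytes, max_block_size);
        return std::max<size_t>(rows, 128);
    }

    int main()
    {
        /// A row group with 1,000,000 rows and 200 MB uncompressed data has ~200 bytes
        /// per row; 16 MiB / 200 B is ~83,700 rows, which max_block_size (65409) caps.
        std::cout << adaptiveChunkSize(200'000'000, 1'000'000, 65409UL * 256, 65409) << "\n";
        return 0;
    }

With the default prefer_block_bytes (~16 MiB), row groups with wide rows get smaller chunks, while narrow rows keep the full max_block_size.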

@@ -208,6 +208,8 @@ private:
size_t total_rows = 0;
size_t total_bytes_compressed = 0;
size_t adaptive_chunk_size = 0;
std::vector<int> row_groups_idxs;
// These are only used by the decoding thread, so don't require locking the mutex.
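
Finally, a minimal sketch (hypothetical struct and helper names) of how the new adaptive_chunk_size field is meant to be consumed: 0 means no estimate was recorded, and the reader falls back to the input_format_parquet_max_block_size setting, mirroring the assignment and set_batch_size changes in the .cpp hunks above.

    #include <cstddef>
    #include <iostream>

    /// Hypothetical, trimmed-down view of the batch bookkeeping: only the fields touched
    /// by this commit. adaptive_chunk_size holds the per-batch row target; 0 means unset.
    struct RowGroupBatchSketch
    {
        size_t total_rows = 0;
        size_t total_bytes_compressed = 0;
        size_t adaptive_chunk_size = 0;
    };

    /// Mirrors the fallback in the .cpp hunk: use the estimate when present,
    /// otherwise the input_format_parquet_max_block_size setting.
    static size_t effectiveBatchSize(const RowGroupBatchSketch & batch, size_t max_block_size)
    {
        return batch.adaptive_chunk_size ? batch.adaptive_chunk_size : max_block_size;
    }

    int main()
    {
        RowGroupBatchSketch with_estimate;
        with_estimate.adaptive_chunk_size = 32768;  /// estimate derived from row group metadata
        RowGroupBatchSketch without_estimate;       /// left at 0

        std::cout << effectiveBatchSize(with_estimate, 65409) << "\n"      /// prints 32768
                  << effectiveBatchSize(without_estimate, 65409) << "\n";  /// prints 65409
        return 0;
    }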