add test case

liuneng 2024-05-28 15:17:08 +08:00
parent 15fc35699f
commit b4c2fa7e27
10 changed files with 40 additions and 14 deletions

View File

@@ -1053,7 +1053,7 @@ class IColumn;
 M(Bool, input_format_tsv_detect_header, true, "Automatically detect header with names and types in TSV format", 0) \
 M(Bool, input_format_custom_detect_header, true, "Automatically detect header with names and types in CustomSeparated format", 0) \
 M(Bool, input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format Parquet", 0) \
-M(UInt64, input_format_parquet_max_block_size, DEFAULT_BLOCK_SIZE, "Max block size for parquet reader.", 0) \
+M(UInt64, input_format_parquet_max_block_rows, DEFAULT_BLOCK_SIZE, "Max block size for parquet reader.", 0) \
 M(UInt64, input_format_parquet_prefer_block_bytes, DEFAULT_BLOCK_SIZE * 256, "Average block bytes output by parquet reader", 0) \
 M(Bool, input_format_protobuf_skip_fields_with_unsupported_types_in_schema_inference, false, "Skip fields with unsupported types while schema inference for format Protobuf", 0) \
 M(Bool, input_format_capn_proto_skip_fields_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format CapnProto", 0) \
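
Note: the two Parquet settings interact. input_format_parquet_max_block_rows is a hard cap on the row count of each chunk the reader emits, while input_format_parquet_prefer_block_bytes steers the adaptive sizing introduced further down in this commit, which picks a row count whose uncompressed size lands near that byte target.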

View File

@@ -92,8 +92,8 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
 {"hdfs_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in HDFS table engine"},
 {"azure_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in AzureBlobStorage table engine"},
 {"s3_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in S3 table engine"},
-{"input_format_parquet_max_block_size", 8192, DEFAULT_BLOCK_SIZE, "Max block size for parquet reader."},
-{"input_format_parquet_prefer_block_bytes", 0, DEFAULT_BLOCK_SIZE * 256, "Average block bytes output by parquet reader."},
+{"input_format_parquet_max_block_rows", 8192, DEFAULT_BLOCK_SIZE, "Max block size for parquet reader."},
+{"input_format_parquet_prefer_block_bytes", 0, DEFAULT_BLOCK_SIZE * 256, "Average block bytes output by parquet reader."},
 }},
 {"24.5", {{"allow_deprecated_functions", true, false, "Allow usage of deprecated functions"},
 {"allow_experimental_join_condition", false, false, "Support join with inequal conditions which involve columns from both left and right table. e.g. t1.y < t2.y."},

View File

@@ -117,7 +117,7 @@ void doSettingsSanityCheckClamp(Settings & current_settings, LoggerPtr log)
 "min_insert_block_size_bytes_for_materialized_views",
 "min_external_table_block_size_rows",
 "max_joined_block_size_rows",
-"input_format_parquet_max_block_size"};
+"input_format_parquet_max_block_rows"};
 for (auto const & setting : block_rows_settings)
 {
 if (auto block_size = get_current_value(setting).get<UInt64>();

View File

@@ -160,7 +160,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
 format_settings.parquet.skip_columns_with_unsupported_types_in_schema_inference = settings.input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference;
 format_settings.parquet.output_string_as_string = settings.output_format_parquet_string_as_string;
 format_settings.parquet.output_fixed_string_as_fixed_byte_array = settings.output_format_parquet_fixed_string_as_fixed_byte_array;
-format_settings.parquet.max_block_size = settings.input_format_parquet_max_block_size;
+format_settings.parquet.max_block_rows = settings.input_format_parquet_max_block_rows;
 format_settings.parquet.prefer_block_bytes = settings.input_format_parquet_prefer_block_bytes;
 format_settings.parquet.output_compression_method = settings.output_format_parquet_compression_method;
 format_settings.parquet.output_compliant_nested_types = settings.output_format_parquet_compliant_nested_types;

View File

@@ -265,7 +265,7 @@ struct FormatSettings
 bool preserve_order = false;
 bool use_custom_encoder = true;
 bool parallel_encoding = true;
-UInt64 max_block_size = DEFAULT_BLOCK_SIZE;
+UInt64 max_block_rows = DEFAULT_BLOCK_SIZE;
 size_t prefer_block_bytes = DEFAULT_BLOCK_SIZE * 256;
 ParquetVersion output_version;
 ParquetCompression output_compression_method = ParquetCompression::SNAPPY;

View File

@@ -307,7 +307,7 @@ ParquetRecordReader::ParquetRecordReader(
 : file_reader(createFileReader(std::move(arrow_file), std::move(metadata)))
 , reader_properties(reader_properties_)
 , header(std::move(header_))
-, max_block_size(format_settings.parquet.max_block_size)
+, max_block_rows(format_settings.parquet.max_block_rows)
 , row_groups_indices(std::move(row_groups_indices_))
 , left_rows(getTotalRows(*file_reader->metadata()))
 {
@@ -356,7 +356,7 @@ Chunk ParquetRecordReader::readChunk()
 }
 Columns columns(header.columns());
-auto num_rows_read = std::min(max_block_size, cur_row_group_left_rows);
+auto num_rows_read = std::min(max_block_rows, cur_row_group_left_rows);
 for (size_t i = 0; i < header.columns(); i++)
 {
 columns[i] = castColumn(
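
Note: the readChunk change only swaps the identifier, but the surrounding logic is worth spelling out: each call emits at most max_block_rows rows, bounded by whatever is left in the current row group. A minimal standalone sketch (not the actual ClickHouse reader; the names and values are illustrative):

    #include <algorithm>
    #include <cstdint>
    #include <cstdio>

    int main()
    {
        // Illustrative values: input_format_parquet_max_block_rows = 16,
        // and a row group holding 100 rows.
        const uint64_t max_block_rows = 16;
        uint64_t cur_row_group_left_rows = 100;

        while (cur_row_group_left_rows > 0)
        {
            // Same min() as in ParquetRecordReader::readChunk above.
            uint64_t num_rows_read = std::min(max_block_rows, cur_row_group_left_rows);
            cur_row_group_left_rows -= num_rows_read;
            std::printf("chunk of %llu rows\n", static_cast<unsigned long long>(num_rows_read));
        }
        // Prints six chunks of 16 rows followed by one chunk of 4.
    }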

View File

@@ -36,7 +36,7 @@ private:
 std::shared_ptr<parquet::RowGroupReader> cur_row_group_reader;
 ParquetColReaders column_readers;
-UInt64 max_block_size;
+UInt64 max_block_rows;
 std::vector<int> parquet_col_indice;
 std::vector<int> row_groups_indices;

View File

@@ -424,13 +424,14 @@ void ParquetBlockInputFormat::initializeIfNeeded()
 auto row_group_meta = metadata->RowGroup(row_group_idx);
 for (int column_index : column_indices)
 {
-auto column = row_group_meta->ColumnChunk(column_index);
+total_size += row_group_meta->ColumnChunk(column_index)->total_uncompressed_size();
 }
 if (!total_size || !format_settings.parquet.prefer_block_bytes) return 0;
-auto average_row_bytes = total_size / row_group_meta->num_rows();
-/// max_block_size >= num_rows >= 128
-auto num_rows = std::min(format_settings.parquet.prefer_block_bytes/average_row_bytes, format_settings.parquet.max_block_size);
-return std::max(num_rows, 128UL);
+auto average_row_bytes = static_cast<double>(total_size) / row_group_meta->num_rows();
+const size_t preferred_num_rows = static_cast<size_t>(format_settings.parquet.prefer_block_bytes/average_row_bytes);
+const size_t MIN_ROW_NUM = 128;
+return std::min(std::max(preferred_num_rows, MIN_ROW_NUM), format_settings.parquet.max_block_rows);
 };
 for (int row_group = 0; row_group < num_row_groups; ++row_group)
@@ -453,7 +454,7 @@ void ParquetBlockInputFormat::initializeIfNeeded()
 row_group_batches.back().total_rows += metadata->RowGroup(row_group)->num_rows();
 row_group_batches.back().total_bytes_compressed += metadata->RowGroup(row_group)->total_compressed_size();
 auto rows = adative_chunk_size(row_group);
-row_group_batches.back().adaptive_chunk_size = rows ? rows : format_settings.parquet.max_block_size;
+row_group_batches.back().adaptive_chunk_size = rows ? rows : format_settings.parquet.max_block_rows;
 }
 }
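
Note: to make the adaptive sizing above concrete, here is a self-contained sketch of the same clamp, with hypothetical row-group numbers (the real code reads them from the Parquet row-group metadata):

    #include <algorithm>
    #include <cstddef>
    #include <cstdint>
    #include <cstdio>

    // Same shape as the lambda above: derive a preferred row count from
    // prefer_block_bytes and the average uncompressed row width, then clamp
    // it into [MIN_ROW_NUM, max_block_rows]. Returns 0 to signal "fall back
    // to max_block_rows", exactly as the caller above does.
    static size_t adaptiveChunkSize(uint64_t total_uncompressed_bytes, int64_t num_rows,
                                    size_t prefer_block_bytes, size_t max_block_rows)
    {
        if (!total_uncompressed_bytes || !prefer_block_bytes)
            return 0;
        double average_row_bytes = static_cast<double>(total_uncompressed_bytes) / num_rows;
        const size_t preferred_num_rows = static_cast<size_t>(prefer_block_bytes / average_row_bytes);
        const size_t MIN_ROW_NUM = 128;
        return std::min(std::max(preferred_num_rows, MIN_ROW_NUM), max_block_rows);
    }

    int main()
    {
        // Hypothetical row group: 1,000,000 rows, ~55 bytes per row uncompressed.
        // prefer_block_bytes = 30 is below one row, so the result clamps to 128;
        // prefer_block_bytes = 120000 yields 120000 / 55 = 2181 rows.
        std::printf("%zu\n", adaptiveChunkSize(55000000, 1000000, 30, 65409));     // 128
        std::printf("%zu\n", adaptiveChunkSize(55000000, 1000000, 120000, 65409)); // 2181
    }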

View File

@@ -0,0 +1,4 @@
+65409
+16
+128
+2183
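
Note: read together with the test file below, these four expected values trace the clamp: 65409 is DEFAULT_BLOCK_SIZE (the default cap), 16 comes from input_format_parquet_max_block_rows=16, 128 is the MIN_ROW_NUM floor hit when input_format_parquet_prefer_block_bytes=30, and 2183 is presumably prefer_block_bytes=30000 divided by the average uncompressed row width of the generated data.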

View File

@@ -0,0 +1,21 @@
+DROP TABLE IF EXISTS test_parquet;
+CREATE TABLE test_parquet (col1 String, col2 String, col3 String, col4 String, col5 String, col6 String, col7 String) ENGINE=File(Parquet);
+INSERT INTO test_parquet SELECT rand(),rand(),rand(),rand(),rand(),rand(),rand() FROM numbers(100000);
+SELECT max(blockSize()) FROM test_parquet;
+DROP TABLE IF EXISTS test_parquet;
+CREATE TABLE test_parquet (col1 String, col2 String, col3 String, col4 String, col5 String, col6 String, col7 String) ENGINE=File(Parquet) settings input_format_parquet_max_block_rows=16;
+INSERT INTO test_parquet SELECT rand(),rand(),rand(),rand(),rand(),rand(),rand() FROM numbers(100000);
+SELECT max(blockSize()) FROM test_parquet;
+DROP TABLE IF EXISTS test_parquet;
+CREATE TABLE test_parquet (col1 String, col2 String, col3 String, col4 String, col5 String, col6 String, col7 String) ENGINE=File(Parquet) settings input_format_parquet_prefer_block_bytes=30;
+INSERT INTO test_parquet SELECT rand(),rand(),rand(),rand(),rand(),rand(),rand() FROM numbers(100000);
+SELECT max(blockSize()) FROM test_parquet;
+DROP TABLE IF EXISTS test_parquet;
+CREATE TABLE test_parquet (col1 String, col2 String, col3 String, col4 String, col5 String, col6 String, col7 String) ENGINE=File(Parquet) settings input_format_parquet_prefer_block_bytes=30000;
+INSERT INTO test_parquet SELECT rand(),rand(),rand(),rand(),rand(),rand(),rand() FROM numbers(100000);
+SELECT max(blockSize()) FROM test_parquet;
+DROP TABLE IF EXISTS test_parquet;