From 17fc22a21e3b36ece62f84cb024673184fa26466 Mon Sep 17 00:00:00 2001 From: liuneng <1398775315@qq.com> Date: Wed, 1 Feb 2023 18:29:20 +0800 Subject: [PATCH] add parquet max_block_size setting --- src/Core/Settings.h | 1 + src/Formats/FormatFactory.cpp | 1 + src/Formats/FormatSettings.h | 1 + .../Formats/Impl/ParquetBlockInputFormat.cpp | 12 ++++++++---- 4 files changed, 11 insertions(+), 4 deletions(-) diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 8ddd42dbecf..db44184a77d 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -774,6 +774,7 @@ class IColumn; M(Bool, input_format_tsv_detect_header, true, "Automatically detect header with names and types in TSV format", 0) \ M(Bool, input_format_custom_detect_header, true, "Automatically detect header with names and types in CustomSeparated format", 0) \ M(Bool, input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format Parquet", 0) \ + M(UInt64, input_format_parquet_max_block_size, 8192, "Max block size for parquet reader.", 0) \ M(Bool, input_format_protobuf_skip_fields_with_unsupported_types_in_schema_inference, false, "Skip fields with unsupported types while schema inference for format Protobuf", 0) \ M(Bool, input_format_capn_proto_skip_fields_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format CapnProto", 0) \ M(Bool, input_format_orc_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format ORC", 0) \ diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index a9045733cac..3fcecd23f5b 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -116,6 +116,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings) format_settings.parquet.allow_missing_columns = settings.input_format_parquet_allow_missing_columns; format_settings.parquet.skip_columns_with_unsupported_types_in_schema_inference = settings.input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference; format_settings.parquet.output_string_as_string = settings.output_format_parquet_string_as_string; + format_settings.parquet.max_block_size = settings.input_format_parquet_max_block_size; format_settings.pretty.charset = settings.output_format_pretty_grid_charset.toString() == "ASCII" ? FormatSettings::Pretty::Charset::ASCII : FormatSettings::Pretty::Charset::UTF8; format_settings.pretty.color = settings.output_format_pretty_color; format_settings.pretty.max_column_pad_width = settings.output_format_pretty_max_column_pad_width; diff --git a/src/Formats/FormatSettings.h b/src/Formats/FormatSettings.h index 54de8907169..92e499abb10 100644 --- a/src/Formats/FormatSettings.h +++ b/src/Formats/FormatSettings.h @@ -183,6 +183,7 @@ struct FormatSettings bool case_insensitive_column_matching = false; std::unordered_set skip_row_groups = {}; bool output_string_as_string = false; + UInt64 max_block_size = 8192; } parquet; struct Pretty diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp index 550da8adef0..6f0b6b62ad8 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp @@ -47,23 +47,27 @@ Chunk ParquetBlockInputFormat::generate() if (!file_reader) { prepareReader(); - /// It may be necessary to add a parameter - file_reader->set_batch_size(8192); + file_reader->set_batch_size(format_settings.parquet.max_block_size); std::vector row_group_indices; - for (int i = 0; i < file_reader->num_row_groups(); ++i) + for (int i = 0; i < row_group_total; ++i) { if (!skip_row_groups.contains(i)) row_group_indices.emplace_back(i); } auto read_status = file_reader->GetRecordBatchReader(row_group_indices, column_indices, ¤t_record_batch_reader); if (!read_status.ok()) - throw DB::Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading Parquet data: {}", read_status.ToString()); + throw DB::ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading Parquet data: {}", read_status.ToString()); } if (is_stopped) return {}; auto batch = current_record_batch_reader->Next(); + if (!batch.ok()) + { + throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading Parquet data: {}", + batch.status().ToString()); + } if (*batch) { auto tmp_table = arrow::Table::FromRecordBatches({*batch});