Merge pull request #45878 from liuneng1994/optimize_parquet_reader

Optimization for parquet reader
commit c83f701696
Antonio Andelic, 2023-02-03 10:24:56 +01:00 (committed by GitHub)
6 changed files with 35 additions and 25 deletions

src/Core/Settings.h

@@ -774,6 +774,7 @@ class IColumn;
     M(Bool, input_format_tsv_detect_header, true, "Automatically detect header with names and types in TSV format", 0) \
     M(Bool, input_format_custom_detect_header, true, "Automatically detect header with names and types in CustomSeparated format", 0) \
     M(Bool, input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format Parquet", 0) \
+    M(UInt64, input_format_parquet_max_block_size, 8192, "Max block size for parquet reader.", 0) \
     M(Bool, input_format_protobuf_skip_fields_with_unsupported_types_in_schema_inference, false, "Skip fields with unsupported types while schema inference for format Protobuf", 0) \
     M(Bool, input_format_capn_proto_skip_fields_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format CapnProto", 0) \
     M(Bool, input_format_orc_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format ORC", 0) \
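Aside, for readers unfamiliar with this file: each `M(type, name, default, description, flags)` entry above is an X-macro. Below is a minimal, self-contained sketch of the technique, much simplified relative to ClickHouse's actual Settings machinery:

```cpp
#include <cstdint>
#include <iostream>

using UInt64 = uint64_t;

// Simplified stand-in for the settings macro list above; not ClickHouse's real machinery.
#define APPLY_FOR_SETTINGS(M) \
    M(UInt64, input_format_parquet_max_block_size, 8192, "Max block size for parquet reader.", 0)

// One expansion turns each entry into a struct member initialized to its default.
struct Settings
{
#define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION, FLAGS) TYPE NAME = DEFAULT;
    APPLY_FOR_SETTINGS(DECLARE)
#undef DECLARE
};

int main()
{
    Settings settings;
    std::cout << settings.input_format_parquet_max_block_size << '\n';  // prints 8192
}
```

The point of the pattern is that the same list can be expanded several times, once for member declarations, once for documentation, once for serialization, and so on, which is why the description and flags travel with each entry.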

src/Formats/FormatFactory.cpp

@@ -116,6 +116,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
     format_settings.parquet.allow_missing_columns = settings.input_format_parquet_allow_missing_columns;
     format_settings.parquet.skip_columns_with_unsupported_types_in_schema_inference = settings.input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference;
     format_settings.parquet.output_string_as_string = settings.output_format_parquet_string_as_string;
+    format_settings.parquet.max_block_size = settings.input_format_parquet_max_block_size;
     format_settings.pretty.charset = settings.output_format_pretty_grid_charset.toString() == "ASCII" ? FormatSettings::Pretty::Charset::ASCII : FormatSettings::Pretty::Charset::UTF8;
     format_settings.pretty.color = settings.output_format_pretty_color;
     format_settings.pretty.max_column_pad_width = settings.output_format_pretty_max_column_pad_width;

src/Formats/FormatSettings.h

@@ -183,6 +183,7 @@ struct FormatSettings
         bool case_insensitive_column_matching = false;
         std::unordered_set<int> skip_row_groups = {};
         bool output_string_as_string = false;
+        UInt64 max_block_size = 8192;
     } parquet;
 
     struct Pretty

src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp

@@ -17,6 +17,7 @@
 #include "ArrowFieldIndexUtil.h"
+#include <DataTypes/NestedUtils.h>
 
 namespace DB
 {
@@ -44,35 +45,40 @@ Chunk ParquetBlockInputFormat::generate()
     block_missing_values.clear();
 
     if (!file_reader)
+    {
         prepareReader();
+        file_reader->set_batch_size(format_settings.parquet.max_block_size);
+        std::vector<int> row_group_indices;
+        for (int i = 0; i < row_group_total; ++i)
+        {
+            if (!skip_row_groups.contains(i))
+                row_group_indices.emplace_back(i);
+        }
+        auto read_status = file_reader->GetRecordBatchReader(row_group_indices, column_indices, &current_record_batch_reader);
+        if (!read_status.ok())
+            throw DB::ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading Parquet data: {}", read_status.ToString());
+    }
 
     if (is_stopped)
         return {};
 
-    while (row_group_current < row_group_total && skip_row_groups.contains(row_group_current))
-        ++row_group_current;
-
-    if (row_group_current >= row_group_total)
-        return res;
-
-    std::shared_ptr<arrow::Table> table;
-
-    std::unique_ptr<::arrow::RecordBatchReader> rbr;
-    std::vector<int> row_group_indices { row_group_current };
-    arrow::Status get_batch_reader_status = file_reader->GetRecordBatchReader(row_group_indices, column_indices, &rbr);
-
-    if (!get_batch_reader_status.ok())
-        throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading Parquet data: {}",
-            get_batch_reader_status.ToString());
-
-    arrow::Status read_status = rbr->ReadAll(&table);
-
-    if (!read_status.ok())
-        throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading Parquet data: {}", read_status.ToString());
-
-    ++row_group_current;
-
-    arrow_column_to_ch_column->arrowTableToCHChunk(res, table, table->num_rows());
+    auto batch = current_record_batch_reader->Next();
+    if (!batch.ok())
+    {
+        throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading Parquet data: {}",
+                               batch.status().ToString());
+    }
+    if (*batch)
+    {
+        auto tmp_table = arrow::Table::FromRecordBatches({*batch});
+        arrow_column_to_ch_column->arrowTableToCHChunk(res, *tmp_table, (*tmp_table)->num_rows());
+    }
+    else
+    {
+        current_record_batch_reader.reset();
+        file_reader.reset();
+        return {};
+    }
 
     /// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields.
     /// Otherwise fill the missing columns with zero values of its type.
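The core of the change, in plain terms: the old path materialized one entire row group per generate() call via ReadAll(), while the new path creates a single RecordBatchReader spanning all non-skipped row groups and pulls one batch of at most input_format_parquet_max_block_size rows per call, dropping the reader once Next() returns a null batch. Below is a standalone sketch of that Arrow API pattern (a sketch only, not ClickHouse code; the path argument and the 8192 constant are illustrative):

```cpp
#include <memory>
#include <numeric>
#include <string>
#include <vector>

#include <arrow/api.h>
#include <arrow/io/file.h>
#include <parquet/arrow/reader.h>

// Sketch assuming an Arrow version with these overloads, as the code above uses.
arrow::Status readInBatches(const std::string & path)
{
    // Open the Parquet file for random-access reads.
    ARROW_ASSIGN_OR_RAISE(auto file, arrow::io::ReadableFile::Open(path));

    std::unique_ptr<parquet::arrow::FileReader> file_reader;
    ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(file, arrow::default_memory_pool(), &file_reader));

    // Cap the number of rows per batch, as input_format_parquet_max_block_size does.
    file_reader->set_batch_size(8192);

    // One reader over every row group, instead of one ReadAll() per row group.
    std::vector<int> row_group_indices(file_reader->num_row_groups());
    std::iota(row_group_indices.begin(), row_group_indices.end(), 0);

    std::shared_ptr<arrow::RecordBatchReader> batch_reader;
    ARROW_RETURN_NOT_OK(file_reader->GetRecordBatchReader(row_group_indices, &batch_reader));

    while (true)
    {
        ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::RecordBatch> batch, batch_reader->Next());
        if (!batch)
            break;  // a null batch signals the end of the stream

        // Each batch holds at most 8192 rows; convert to a Table if a consumer needs one.
        ARROW_ASSIGN_OR_RAISE(auto table, arrow::Table::FromRecordBatches({batch}));
        (void)table;  // ... hand the rows to the consumer here
    }
    return arrow::Status::OK();
}
```

Memory granularity is the point: ReadAll() buffers a whole row group as one arrow::Table, potentially very large, while Next() never yields more rows than the configured batch size.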

src/Processors/Formats/Impl/ParquetBlockInputFormat.h

@@ -8,7 +8,7 @@
 namespace parquet::arrow { class FileReader; }
 
-namespace arrow { class Buffer; }
+namespace arrow { class Buffer; class RecordBatchReader; }
 
 namespace DB
 {
@@ -46,6 +46,7 @@ private:
     BlockMissingValues block_missing_values;
     const FormatSettings format_settings;
     const std::unordered_set<int> & skip_row_groups;
+    std::shared_ptr<arrow::RecordBatchReader> current_record_batch_reader;
 
     std::atomic<int> is_stopped{0};
 };
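One detail worth noting in this header: arrow::RecordBatchReader is only forward-declared, yet holding it in a std::shared_ptr member is fine, because std::shared_ptr can be declared, reset, and destroyed with an incomplete type; only the code that actually creates the pointer (the .cpp above, which includes the full Arrow headers) needs the complete type. A minimal illustration of the idiom, with Widget as a hypothetical stand-in:

```cpp
#include <memory>

class Widget;  // forward declaration, like arrow::RecordBatchReader above

class Holder
{
public:
    Holder();                          // defined below, where Widget is complete
    void reset() { widget.reset(); }   // OK even while Widget is incomplete
private:
    std::shared_ptr<Widget> widget;    // OK: shared_ptr tolerates incomplete types
};

// The "implementation file": Widget is now complete, so the pointer can be created.
class Widget { };
Holder::Holder() : widget(std::make_shared<Widget>()) { }

int main()
{
    Holder h;
    h.reset();
}
```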

tests/queries/0_stateless/… (Parquet load test script)

@@ -37,6 +37,6 @@
 DATA_FILE=$CUR_DIR/data_parquet/int-list-zero-based-chunked-array.parquet
 ${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS parquet_load"
 ${CLICKHOUSE_CLIENT} --query="CREATE TABLE parquet_load (arr Array(Int64)) ENGINE = Memory"
 cat "$DATA_FILE" | ${CLICKHOUSE_CLIENT} -q "INSERT INTO parquet_load FORMAT Parquet"
-${CLICKHOUSE_CLIENT} --query="SELECT * FROM parquet_load" | md5sum
+${CLICKHOUSE_CLIENT} --query="SELECT * FROM parquet_load SETTINGS max_threads=1" | md5sum
 ${CLICKHOUSE_CLIENT} --query="SELECT count() FROM parquet_load"
 ${CLICKHOUSE_CLIENT} --query="drop table parquet_load"