mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 08:40:50 +00:00
Proper fix for bug in parquet, revert reverted #45878
This commit is contained in:
parent
ee6c18cbc6
commit
4213ec609f
@ -803,6 +803,7 @@ class IColumn;
|
||||
M(Bool, input_format_tsv_detect_header, true, "Automatically detect header with names and types in TSV format", 0) \
|
||||
M(Bool, input_format_custom_detect_header, true, "Automatically detect header with names and types in CustomSeparated format", 0) \
|
||||
M(Bool, input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format Parquet", 0) \
|
||||
M(UInt64, input_format_parquet_max_block_size, 8192, "Max block size for parquet reader.", 0) \
|
||||
M(Bool, input_format_protobuf_skip_fields_with_unsupported_types_in_schema_inference, false, "Skip fields with unsupported types while schema inference for format Protobuf", 0) \
|
||||
M(Bool, input_format_capn_proto_skip_fields_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format CapnProto", 0) \
|
||||
M(Bool, input_format_orc_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format ORC", 0) \
|
||||
|
@ -117,6 +117,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
|
||||
format_settings.parquet.skip_columns_with_unsupported_types_in_schema_inference = settings.input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference;
|
||||
format_settings.parquet.output_string_as_string = settings.output_format_parquet_string_as_string;
|
||||
format_settings.parquet.output_fixed_string_as_fixed_byte_array = settings.output_format_parquet_fixed_string_as_fixed_byte_array;
|
||||
format_settings.parquet.max_block_size = settings.input_format_parquet_max_block_size;
|
||||
format_settings.parquet.output_compression_method = settings.output_format_parquet_compression_method;
|
||||
format_settings.pretty.charset = settings.output_format_pretty_grid_charset.toString() == "ASCII" ? FormatSettings::Pretty::Charset::ASCII : FormatSettings::Pretty::Charset::UTF8;
|
||||
format_settings.pretty.color = settings.output_format_pretty_color;
|
||||
|
@ -211,6 +211,7 @@ struct FormatSettings
|
||||
std::unordered_set<int> skip_row_groups = {};
|
||||
bool output_string_as_string = false;
|
||||
bool output_fixed_string_as_fixed_byte_array = true;
|
||||
UInt64 max_block_size = 8192;
|
||||
ParquetVersion output_version;
|
||||
ParquetCompression output_compression_method = ParquetCompression::SNAPPY;
|
||||
} parquet;
|
||||
|
@ -93,7 +93,7 @@ static ColumnWithTypeAndName readColumnWithNumericData(std::shared_ptr<arrow::Ch
|
||||
|
||||
/// buffers[0] is a null bitmap and buffers[1] are actual values
|
||||
std::shared_ptr<arrow::Buffer> buffer = chunk->data()->buffers[1];
|
||||
const auto * raw_data = reinterpret_cast<const NumericType *>(buffer->data());
|
||||
const auto * raw_data = reinterpret_cast<const NumericType *>(buffer->data()) + chunk->offset();
|
||||
column_data.insert_assume_reserved(raw_data, raw_data + chunk->length());
|
||||
}
|
||||
return {std::move(internal_column), std::move(internal_type), column_name};
|
||||
@ -159,8 +159,8 @@ static ColumnWithTypeAndName readColumnWithFixedStringData(std::shared_ptr<arrow
|
||||
for (int chunk_i = 0, num_chunks = arrow_column->num_chunks(); chunk_i < num_chunks; ++chunk_i)
|
||||
{
|
||||
arrow::FixedSizeBinaryArray & chunk = dynamic_cast<arrow::FixedSizeBinaryArray &>(*(arrow_column->chunk(chunk_i)));
|
||||
std::shared_ptr<arrow::Buffer> buffer = chunk.values();
|
||||
column_chars_t.insert_assume_reserved(buffer->data(), buffer->data() + buffer->size());
|
||||
const uint8_t * raw_data = chunk.raw_values();
|
||||
column_chars_t.insert_assume_reserved(raw_data, raw_data + fixed_len * chunk.length());
|
||||
}
|
||||
return {std::move(internal_column), std::move(internal_type), column_name};
|
||||
}
|
||||
@ -178,9 +178,6 @@ static ColumnWithTypeAndName readColumnWithBooleanData(std::shared_ptr<arrow::Ch
|
||||
if (chunk.length() == 0)
|
||||
continue;
|
||||
|
||||
/// buffers[0] is a null bitmap and buffers[1] are actual values
|
||||
std::shared_ptr<arrow::Buffer> buffer = chunk.data()->buffers[1];
|
||||
|
||||
for (size_t bool_i = 0; bool_i != static_cast<size_t>(chunk.length()); ++bool_i)
|
||||
column_data.emplace_back(chunk.Value(bool_i));
|
||||
}
|
||||
@ -402,7 +399,7 @@ static ColumnWithTypeAndName readColumnWithIndexesDataImpl(std::shared_ptr<arrow
|
||||
|
||||
/// buffers[0] is a null bitmap and buffers[1] are actual values
|
||||
std::shared_ptr<arrow::Buffer> buffer = chunk->data()->buffers[1];
|
||||
const auto * data = reinterpret_cast<const NumericType *>(buffer->data());
|
||||
const auto * data = reinterpret_cast<const NumericType *>(buffer->data()) + chunk->offset();
|
||||
|
||||
/// Check that indexes are correct (protection against corrupted files)
|
||||
/// Note that on null values index can be arbitrary value.
|
||||
@ -554,8 +551,7 @@ static ColumnWithTypeAndName readIPv6ColumnFromBinaryData(std::shared_ptr<arrow:
|
||||
for (int chunk_i = 0, num_chunks = arrow_column->num_chunks(); chunk_i < num_chunks; ++chunk_i)
|
||||
{
|
||||
auto & chunk = dynamic_cast<arrow::BinaryArray &>(*(arrow_column->chunk(chunk_i)));
|
||||
std::shared_ptr<arrow::Buffer> buffer = chunk.value_data();
|
||||
const auto * raw_data = reinterpret_cast<const IPv6 *>(buffer->data());
|
||||
const auto * raw_data = reinterpret_cast<const IPv6 *>(chunk.raw_data() + chunk.raw_value_offsets()[0]);
|
||||
data.insert_assume_reserved(raw_data, raw_data + chunk.length());
|
||||
}
|
||||
return {std::move(internal_column), std::move(internal_type), column_name};
|
||||
|
@ -45,38 +45,44 @@ Chunk ParquetBlockInputFormat::generate()
|
||||
block_missing_values.clear();
|
||||
|
||||
if (!file_reader)
|
||||
{
|
||||
prepareReader();
|
||||
file_reader->set_batch_size(format_settings.parquet.max_block_size);
|
||||
std::vector<int> row_group_indices;
|
||||
for (int i = 0; i < row_group_total; ++i)
|
||||
{
|
||||
if (!skip_row_groups.contains(i))
|
||||
row_group_indices.emplace_back(i);
|
||||
}
|
||||
auto read_status = file_reader->GetRecordBatchReader(row_group_indices, column_indices, ¤t_record_batch_reader);
|
||||
if (!read_status.ok())
|
||||
throw DB::ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading Parquet data: {}", read_status.ToString());
|
||||
}
|
||||
|
||||
if (is_stopped)
|
||||
return {};
|
||||
|
||||
while (row_group_current < row_group_total && skip_row_groups.contains(row_group_current))
|
||||
++row_group_current;
|
||||
|
||||
if (row_group_current >= row_group_total)
|
||||
return res;
|
||||
|
||||
std::shared_ptr<arrow::Table> table;
|
||||
|
||||
std::unique_ptr<::arrow::RecordBatchReader> rbr;
|
||||
std::vector<int> row_group_indices { row_group_current };
|
||||
arrow::Status get_batch_reader_status = file_reader->GetRecordBatchReader(row_group_indices, column_indices, &rbr);
|
||||
|
||||
if (!get_batch_reader_status.ok())
|
||||
auto batch = current_record_batch_reader->Next();
|
||||
if (!batch.ok())
|
||||
{
|
||||
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading Parquet data: {}",
|
||||
get_batch_reader_status.ToString());
|
||||
batch.status().ToString());
|
||||
}
|
||||
if (*batch)
|
||||
{
|
||||
auto tmp_table = arrow::Table::FromRecordBatches({*batch});
|
||||
/// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields.
|
||||
/// Otherwise fill the missing columns with zero values of its type.
|
||||
BlockMissingValues * block_missing_values_ptr = format_settings.defaults_for_omitted_fields ? &block_missing_values : nullptr;
|
||||
arrow_column_to_ch_column->arrowTableToCHChunk(res, *tmp_table, (*tmp_table)->num_rows(), block_missing_values_ptr);
|
||||
}
|
||||
else
|
||||
{
|
||||
current_record_batch_reader.reset();
|
||||
file_reader.reset();
|
||||
return {};
|
||||
}
|
||||
|
||||
arrow::Status read_status = rbr->ReadAll(&table);
|
||||
|
||||
if (!read_status.ok())
|
||||
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading Parquet data: {}", read_status.ToString());
|
||||
|
||||
++row_group_current;
|
||||
|
||||
/// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields.
|
||||
/// Otherwise fill the missing columns with zero values of its type.
|
||||
BlockMissingValues * block_missing_values_ptr = format_settings.defaults_for_omitted_fields ? &block_missing_values : nullptr;
|
||||
arrow_column_to_ch_column->arrowTableToCHChunk(res, table, table->num_rows(), block_missing_values_ptr);
|
||||
return res;
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user