support reading simple types by native parquet reader

Change-Id: I38b8368b022263d9a71cb3f3e9fdad5d6ca26753
copperybean 2024-01-28 09:56:36 +08:00
parent 8fb89cec9f
commit dbdff6c038
9 changed files with 153 additions and 73 deletions

View File

@@ -1013,6 +1013,7 @@ class IColumn;
     M(Bool, input_format_parquet_case_insensitive_column_matching, false, "Ignore case when matching Parquet columns with CH columns.", 0) \
     M(Bool, input_format_parquet_preserve_order, false, "Avoid reordering rows when reading from Parquet files. Usually makes it much slower.", 0) \
     M(Bool, input_format_parquet_filter_push_down, true, "When reading Parquet files, skip whole row groups based on the WHERE/PREWHERE expressions and min/max statistics in the Parquet metadata.", 0) \
+    M(Bool, input_format_parquet_use_native_reader, false, "When reading Parquet files, use native reader instead of arrow reader.", 0) \
     M(Bool, input_format_allow_seeks, true, "Allow seeks while reading in ORC/Parquet/Arrow input formats", 0) \
     M(Bool, input_format_orc_allow_missing_columns, true, "Allow missing columns while reading ORC input formats", 0) \
     M(Bool, input_format_orc_use_fast_decoder, true, "Use a faster ORC decoder implementation.", 0) \

View File

@@ -154,6 +154,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & settings)
     format_settings.parquet.case_insensitive_column_matching = settings.input_format_parquet_case_insensitive_column_matching;
     format_settings.parquet.preserve_order = settings.input_format_parquet_preserve_order;
     format_settings.parquet.filter_push_down = settings.input_format_parquet_filter_push_down;
+    format_settings.parquet.use_native_reader = settings.input_format_parquet_use_native_reader;
     format_settings.parquet.allow_missing_columns = settings.input_format_parquet_allow_missing_columns;
     format_settings.parquet.skip_columns_with_unsupported_types_in_schema_inference = settings.input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference;
     format_settings.parquet.output_string_as_string = settings.output_format_parquet_string_as_string;

View File

@@ -258,6 +258,7 @@ struct FormatSettings
         bool skip_columns_with_unsupported_types_in_schema_inference = false;
         bool case_insensitive_column_matching = false;
         bool filter_push_down = true;
+        bool use_native_reader = false;
         std::unordered_set<int> skip_row_groups = {};
         bool output_string_as_string = false;
        bool output_fixed_string_as_fixed_byte_array = true;
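
Taken together, the three hunks above wire a new user-facing switch through to the format layer: the input_format_parquet_use_native_reader setting (enabled per session with `SET input_format_parquet_use_native_reader = 1`) is copied into the per-query FormatSettings snapshot that the Parquet input format later consults. A minimal standalone sketch of that plumbing, with the surrounding ClickHouse settings machinery stripped away (simplified types; the real structs carry many more fields):

    struct Settings
    {
        bool input_format_parquet_use_native_reader = false;
    };

    struct FormatSettings
    {
        struct { bool use_native_reader = false; } parquet;
    };

    FormatSettings getFormatSettings(const Settings & settings)
    {
        FormatSettings format_settings;
        format_settings.parquet.use_native_reader = settings.input_format_parquet_use_native_reader;
        return format_settings;
    }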

View File

@@ -34,7 +34,7 @@ public:
     void ALWAYS_INLINE readValue(TValue & dst)
     {
         checkAvaible(sizeof(TValue));
-        dst = *reinterpret_cast<const TValue *>(data);
+        dst = *(reinterpret_cast<const TValue *>(data));
         consume(sizeof(TValue));
     }
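
The added parentheses are purely cosmetic; the interesting part is the read pattern itself: bounds-check, reinterpret the raw page bytes, advance the cursor. A standalone sketch of the same plain-decoding step (illustrative names, not the actual ParquetDataBuffer API); std::memcpy is used here because it also tolerates unaligned source bytes, while byte order is covered by the little-endian static_assert in ParquetRecordReader's constructor, visible later in this diff:

    #include <cstring>
    #include <stdexcept>

    template <typename TValue>
    TValue readPlainValue(const char *& data, const char * end)
    {
        if (data + sizeof(TValue) > end)            // checkAvaible() equivalent
            throw std::runtime_error("not enough bytes left in page");
        TValue value;
        std::memcpy(&value, data, sizeof(TValue));  // byte copy; safe for unaligned input
        data += sizeof(TValue);                     // consume() equivalent
        return value;
    }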

View File

@@ -274,7 +274,14 @@ void ParquetLeafColReader<TColumn>::resetColumn(UInt64 rows_num)
 template <typename TColumn>
 void ParquetLeafColReader<TColumn>::degradeDictionary()
 {
+    // if last batch read all dictionary indices, then degrade is not needed this time
+    if (!column)
+    {
+        dictionary = nullptr;
+        return;
+    }
     assert(dictionary && column->size());
+
     null_map = std::make_unique<LazyNullMap>(reading_rows_num);
     auto col_existing = std::move(column);
     column = ColumnString::create();
@@ -304,7 +311,8 @@ void ParquetLeafColReader<TColumn>::degradeDictionary()
             col_dest.getOffsets()[i] = col_dest.getChars().size();
         }
     });
-    LOG_INFO(log, "degraded dictionary to normal column");
+    dictionary = nullptr;
+    LOG_DEBUG(log, "degraded dictionary to normal column");
 }
 
 template <typename TColumn>
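
The new early return handles the case where every previously read batch was fully dictionary-encoded, so there is nothing to rewrite; the rest of the function materializes already-emitted dictionary indices into a plain column. A minimal sketch of that degrade step (illustrative types; the real code rewrites ClickHouse column buffers in place):

    #include <cstdint>
    #include <string>
    #include <vector>

    // Expand dictionary indices into plain values once a non-dictionary-encoded
    // page shows up mid-column and the two representations must be unified.
    std::vector<std::string> degradeDictionary(const std::vector<std::string> & dict,
                                               const std::vector<uint32_t> & indices)
    {
        std::vector<std::string> plain;
        plain.reserve(indices.size());
        for (uint32_t idx : indices)
            plain.push_back(dict[idx]);
        return plain;
    }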
@@ -364,7 +372,7 @@ void ParquetLeafColReader<TColumn>::readPage()
         throw new Exception(
             ErrorCodes::NOT_IMPLEMENTED, "Unsupported dictionary page encoding {}", dict_page.encoding());
     }
-    LOG_INFO(log, "{} values in dictionary page of column {}", dict_page.num_values(), col_descriptor.name());
+    LOG_DEBUG(log, "{} values in dictionary page of column {}", dict_page.num_values(), col_descriptor.name());
 
     dictionary = readDictPage<TColumn>(dict_page, col_descriptor, base_data_type);
     if (std::is_same_v<TColumn, ColumnString>)

View File

@@ -45,17 +45,15 @@ namespace ErrorCodes
 namespace
 {
-Int64 getTotalRows(const parquet::FileMetaData & meta_data)
+std::unique_ptr<parquet::ParquetFileReader> createFileReader(
+    std::shared_ptr<::arrow::io::RandomAccessFile> arrow_file)
 {
-    Int64 res = 0;
-    for (int i = 0; i < meta_data.num_row_groups(); i++)
-    {
-        res += meta_data.RowGroup(i)->num_rows();
-    }
+    std::unique_ptr<parquet::ParquetFileReader> res;
+    THROW_PARQUET_EXCEPTION(res = parquet::ParquetFileReader::Open(std::move(arrow_file)));
     return res;
 }
 
-std::unique_ptr<ParquetColumnReader> createReader(
+std::unique_ptr<ParquetColumnReader> createColReader(
     const parquet::ColumnDescriptor & col_descriptor,
     DataTypePtr ch_type,
     std::unique_ptr<parquet::ColumnChunkMetaData> meta,
@@ -86,7 +84,7 @@ std::unique_ptr<ParquetColumnReader> createReader(
         }
         case parquet::Type::FIXED_LEN_BYTE_ARRAY:
         {
-            if (col_descriptor.type_length() <= static_cast<int>(DecimalUtils::max_precision<Decimal128>))
+            if (col_descriptor.type_length() <= static_cast<int>(sizeof(Decimal128)))
             {
                 auto data_type = std::make_shared<DataTypeDecimal128>(
                     col_descriptor.type_precision(), col_descriptor.type_scale());
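
The corrected bound compares the column's physical byte length against the byte width of Decimal128 (16), not against its maximum decimal precision (38): type_length() is measured in bytes, so the old comparison would have accepted FIXED_LEN_BYTE_ARRAY columns up to 38 bytes wide, far more than a 16-byte Decimal128 can hold. A standalone sketch of the underlying arithmetic (an assumed helper, not ClickHouse's DecimalUtils):

    #include <cmath>
    #include <cstddef>

    // Bytes needed for a signed decimal of the given precision:
    // ceil((P * log2(10) + 1) / 8). For P == 38 this yields 16,
    // i.e. precision 38 is exactly what fits in a 16-byte Decimal128.
    std::size_t decimalByteWidth(int precision)
    {
        return static_cast<std::size_t>(
            std::ceil((precision * std::log2(10.0) + 1.0) / 8.0));
    }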
@@ -148,16 +146,21 @@ std::unique_ptr<ParquetColumnReader> createReader(
 ParquetRecordReader::ParquetRecordReader(
     Block header_,
-    std::shared_ptr<::arrow::io::RandomAccessFile> file,
-    const parquet::ReaderProperties& properties)
-    : header(std::move(header_))
+    parquet::ArrowReaderProperties reader_properties_,
+    std::shared_ptr<::arrow::io::RandomAccessFile> arrow_file,
+    const FormatSettings & format_settings,
+    std::vector<int> row_groups_indices_)
+    : file_reader(createFileReader(std::move(arrow_file)))
+    , reader_properties(reader_properties_)
+    , header(std::move(header_))
+    , max_block_size(format_settings.parquet.max_block_size)
+    , row_groups_indices(std::move(row_groups_indices_))
+    , left_rows(getTotalRows(*file_reader->metadata()))
 {
     // Only little endian system is supported currently
     static_assert(std::endian::native == std::endian::little);
 
     log = &Poco::Logger::get("ParquetRecordReader");
-    THROW_PARQUET_EXCEPTION(file_reader = parquet::ParquetFileReader::Open(std::move(file), properties));
-    left_rows = getTotalRows(*file_reader->metadata());
 
     parquet_col_indice.reserve(header.columns());
     column_readers.reserve(header.columns());
@@ -167,13 +170,18 @@ ParquetRecordReader::ParquetRecordReader(
         if (idx < 0)
         {
             auto msg = PreformattedMessage::create("can not find column with name: {}", col_with_name.name);
-            throw Exception(std::move(msg), ErrorCodes::BAD_ARGUMENTS);
+            throw Exception(std::move(msg), ErrorCodes::PARQUET_EXCEPTION);
         }
         parquet_col_indice.push_back(idx);
     }
+    if (reader_properties.pre_buffer())
+    {
+        THROW_PARQUET_EXCEPTION(file_reader->PreBuffer(
+            row_groups_indices, parquet_col_indice, reader_properties.io_context(), reader_properties.cache_options()));
+    }
 }
 
-Chunk ParquetRecordReader::readChunk(size_t num_rows)
+Chunk ParquetRecordReader::readChunk()
 {
     if (!left_rows)
     {
@@ -185,7 +193,7 @@ Chunk ParquetRecordReader::readChunk(size_t num_rows)
     }
 
     Columns columns(header.columns());
-    auto num_rows_read = std::min(num_rows, cur_row_group_left_rows);
+    auto num_rows_read = std::min(max_block_size, cur_row_group_left_rows);
     for (size_t i = 0; i < header.columns(); i++)
     {
         columns[i] = castColumn(
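
With the num_rows parameter gone from readChunk(), chunk sizing is now internal to the reader: each call returns at most max_block_size rows, clamped to what remains in the current row group (the std::min above). For instance, assuming max_block_size = 8192 and a row group with 20000 rows, successive calls would yield chunks of 8192, 8192, and 3616 rows before the reader moves on to the next selected row group.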
@@ -201,20 +209,33 @@ Chunk ParquetRecordReader::readChunk(size_t num_rows)
 void ParquetRecordReader::loadNextRowGroup()
 {
     Stopwatch watch(CLOCK_MONOTONIC);
-    cur_row_group_reader = file_reader->RowGroup(next_row_group_idx);
+    cur_row_group_reader = file_reader->RowGroup(row_groups_indices[next_row_group_idx]);
 
     column_readers.clear();
     for (size_t i = 0; i < parquet_col_indice.size(); i++)
     {
-        column_readers.emplace_back(createReader(
+        column_readers.emplace_back(createColReader(
             *file_reader->metadata()->schema()->Column(parquet_col_indice[i]),
             header.getByPosition(i).type,
             cur_row_group_reader->metadata()->ColumnChunk(parquet_col_indice[i]),
             cur_row_group_reader->GetColumnPageReader(parquet_col_indice[i])));
     }
-    LOG_DEBUG(log, "reading row group {} consumed {} ms", next_row_group_idx, watch.elapsedNanoseconds() / 1e6);
+
+    auto duration = watch.elapsedNanoseconds() / 1e6;
+    LOG_DEBUG(log, "reading row group {} consumed {} ms", row_groups_indices[next_row_group_idx], duration);
+
     ++next_row_group_idx;
     cur_row_group_left_rows = cur_row_group_reader->metadata()->num_rows();
 }
+
+Int64 ParquetRecordReader::getTotalRows(const parquet::FileMetaData & meta_data)
+{
+    Int64 res = 0;
+    for (size_t i = 0; i < row_groups_indices.size(); i++)
+    {
+        res += meta_data.RowGroup(row_groups_indices[i])->num_rows();
+    }
+    return res;
+}
 
 }
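
End to end, the refactored reader now opens the file in its constructor, pre-buffers only the selected row groups and columns when enabled, and paces itself by max_block_size. A sketch of how a caller drives it (setup of the header, properties, and file handle is assumed to come from the surrounding input format, as in the .cpp above; process() is a placeholder):

    ParquetRecordReader reader(
        header, reader_properties, arrow_file, format_settings, /*row_groups_indices_=*/ {0, 2});

    // an empty chunk signals that all selected row groups are exhausted
    while (Chunk chunk = reader.readChunk())
        process(std::move(chunk));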

View File

@@ -1,6 +1,7 @@
 #pragma once
 
 #include <Core/Block.h>
+#include <Formats/FormatSettings.h>
 #include <Processors/Chunk.h>
 #include <Processors/Formats/Impl/Parquet/ParquetColumnReader.h>
@@ -18,23 +19,29 @@ class ParquetRecordReader
 public:
     ParquetRecordReader(
         Block header_,
-        std::shared_ptr<::arrow::io::RandomAccessFile> file,
-        const parquet::ReaderProperties& properties);
+        parquet::ArrowReaderProperties reader_properties_,
+        std::shared_ptr<::arrow::io::RandomAccessFile> arrow_file,
+        const FormatSettings & format_settings,
+        std::vector<int> row_groups_indices_);
 
-    Chunk readChunk(size_t num_rows);
+    Chunk readChunk();
 
     // follow the scale generated by spark
     static constexpr UInt8 default_datetime64_scale = 9;
 
 private:
     std::unique_ptr<parquet::ParquetFileReader> file_reader;
+    parquet::ArrowReaderProperties reader_properties;
 
     Block header;
 
     std::shared_ptr<parquet::RowGroupReader> cur_row_group_reader;
     ParquetColReaders column_readers;
 
+    UInt64 max_block_size;
+
     std::vector<int> parquet_col_indice;
+    std::vector<int> row_groups_indices;
     UInt64 left_rows;
     UInt64 cur_row_group_left_rows = 0;
     int next_row_group_idx = 0;
@@ -42,6 +49,7 @@ private:
     Poco::Logger * log;
 
     void loadNextRowGroup();
+    Int64 getTotalRows(const parquet::FileMetaData & meta_data);
 };
 
 }

View File

@@ -23,6 +23,7 @@
 #include <DataTypes/DataTypeLowCardinality.h>
 #include <DataTypes/DataTypeNullable.h>
 #include <Common/FieldVisitorsAccurateComparison.h>
+#include <Processors/Formats/Impl/Parquet/ParquetRecordReader.h>
 
 namespace CurrentMetrics
 {
@@ -392,6 +393,8 @@ void ParquetBlockInputFormat::initializeIfNeeded()
 {
     if (std::exchange(is_initialized, true))
         return;
+    if (format_settings.parquet.use_native_reader)
+        LOG_INFO(&Poco::Logger::get("ParquetBlockInputFormat"), "using native parquet reader");
 
     // Create arrow file adapter.
     // TODO: Make the adapter do prefetching on IO threads, based on the full set of ranges that
@@ -479,6 +482,17 @@ void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_batch_idx)
     if (metadata->writer_version().VersionLt(parquet::ApplicationVersion::PARQUET_816_FIXED_VERSION()))
         properties.set_pre_buffer(false);
 
+    if (format_settings.parquet.use_native_reader)
+    {
+        row_group_batch.native_record_reader = std::make_shared<ParquetRecordReader>(
+            getPort().getHeader(),
+            std::move(properties),
+            arrow_file,
+            format_settings,
+            row_group_batch.row_groups_idxs);
+    }
+    else
+    {
     parquet::arrow::FileReaderBuilder builder;
     THROW_ARROW_NOT_OK(
         builder.Open(arrow_file, /* not to be confused with ArrowReaderProperties */ parquet::default_reader_properties(), metadata));
@@ -496,6 +510,7 @@ void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_batch_idx)
             format_settings.null_as_default,
             format_settings.date_time_overflow_behavior,
             format_settings.parquet.case_insensitive_column_matching);
+    }
 }
 
 void ParquetBlockInputFormat::scheduleRowGroup(size_t row_group_batch_idx)
@@ -561,6 +576,7 @@ void ParquetBlockInputFormat::decodeOneChunk(size_t row_group_batch_idx, std::unique_lock<std::mutex> & lock)
     lock.unlock();
 
     auto end_of_row_group = [&] {
+        row_group_batch.native_record_reader.reset();
         row_group_batch.arrow_column_to_ch_column.reset();
         row_group_batch.record_batch_reader.reset();
         row_group_batch.file_reader.reset();
@@ -573,10 +589,37 @@ void ParquetBlockInputFormat::decodeOneChunk(size_t row_group_batch_idx, std::unique_lock<std::mutex> & lock)
         // reached. Wake up read() instead.
         condvar.notify_all();
     };
+
+    auto get_pending_chunk = [&](size_t num_rows, Chunk chunk = {})
+    {
+        size_t approx_chunk_original_size = static_cast<size_t>(std::ceil(
+            static_cast<double>(row_group_batch.total_bytes_compressed) / row_group_batch.total_rows * num_rows));
+        return PendingChunk{
+            .chunk = std::move(chunk),
+            .block_missing_values = {},
+            .chunk_idx = row_group_batch.next_chunk_idx,
+            .row_group_batch_idx = row_group_batch_idx,
+            .approx_original_chunk_size = approx_chunk_original_size
+        };
+    };
 
-    if (!row_group_batch.record_batch_reader)
+    if (!row_group_batch.record_batch_reader && !row_group_batch.native_record_reader)
         initializeRowGroupBatchReader(row_group_batch_idx);
 
+    PendingChunk res;
+    if (format_settings.parquet.use_native_reader)
+    {
+        auto chunk = row_group_batch.native_record_reader->readChunk();
+        if (!chunk)
+        {
+            end_of_row_group();
+            return;
+        }
+
+        auto num_rows = chunk.getNumRows();
+        res = get_pending_chunk(num_rows, std::move(chunk));
+    }
+    else
+    {
     auto batch = row_group_batch.record_batch_reader->Next();
     if (!batch.ok())
         throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading Parquet data: {}", batch.status().ToString());
@@ -588,20 +631,13 @@ void ParquetBlockInputFormat::decodeOneChunk(size_t row_group_batch_idx, std::unique_lock<std::mutex> & lock)
     }
 
     auto tmp_table = arrow::Table::FromRecordBatches({*batch});
-
-    size_t approx_chunk_original_size = static_cast<size_t>(std::ceil(static_cast<double>(row_group_batch.total_bytes_compressed) / row_group_batch.total_rows * (*tmp_table)->num_rows()));
-    PendingChunk res = {
-        .chunk = {},
-        .block_missing_values = {},
-        .chunk_idx = row_group_batch.next_chunk_idx,
-        .row_group_batch_idx = row_group_batch_idx,
-        .approx_original_chunk_size = approx_chunk_original_size
-    };
+    res = get_pending_chunk((*tmp_table)->num_rows());
 
     /// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields.
     /// Otherwise fill the missing columns with zero values of its type.
     BlockMissingValues * block_missing_values_ptr = format_settings.defaults_for_omitted_fields ? &res.block_missing_values : nullptr;
     res.chunk = row_group_batch.arrow_column_to_ch_column->arrowTableToCHChunk(*tmp_table, (*tmp_table)->num_rows(), block_missing_values_ptr);
+    }
 
     lock.lock();
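
The get_pending_chunk lambda factors out the size estimate both branches need for scheduling: scale the row group's compressed byte count by the fraction of rows the chunk covers. A standalone restatement (field names mirror the diff; the numbers are only an illustration: 10 MiB compressed, 1,000,000 total rows, and a 100,000-row chunk estimate to roughly 1 MiB):

    #include <cmath>
    #include <cstddef>

    std::size_t approxChunkOriginalSize(
        std::size_t total_bytes_compressed, std::size_t total_rows, std::size_t num_rows)
    {
        // proportional estimate: compressed bytes per row times rows in this chunk
        return static_cast<std::size_t>(std::ceil(
            static_cast<double>(total_bytes_compressed) / total_rows * num_rows));
    }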

View File

@@ -16,6 +16,7 @@ namespace DB
 {
 
 class ArrowColumnToCHColumn;
+class ParquetRecordReader;
 
 // Parquet files contain a metadata block with the following information:
 //  * list of columns,
@@ -210,6 +211,9 @@ private:
         std::vector<int> row_groups_idxs;
 
         // These are only used by the decoding thread, so don't require locking the mutex.
+        // If use_native_reader, only native_record_reader is used;
+        // otherwise, native_record_reader is not used.
+        std::shared_ptr<ParquetRecordReader> native_record_reader;
         std::unique_ptr<parquet::arrow::FileReader> file_reader;
         std::shared_ptr<arrow::RecordBatchReader> record_batch_reader;
         std::unique_ptr<ArrowColumnToCHColumn> arrow_column_to_ch_column;