This commit is contained in:
Michael Kolupaev 2024-05-28 19:35:06 +00:00
parent 10a78f0841
commit 4e38e89ffd
3 changed files with 18 additions and 14 deletions

View File

@ -46,12 +46,13 @@ namespace
std::unique_ptr<parquet::ParquetFileReader> createFileReader(
std::shared_ptr<::arrow::io::RandomAccessFile> arrow_file,
parquet::ReaderProperties reader_properties,
std::shared_ptr<parquet::FileMetaData> metadata = nullptr)
{
std::unique_ptr<parquet::ParquetFileReader> res;
THROW_PARQUET_EXCEPTION(res = parquet::ParquetFileReader::Open(
std::move(arrow_file),
parquet::default_reader_properties(),
reader_properties,
metadata));
return res;
}
@ -60,12 +61,12 @@ class ColReaderFactory
{
public:
ColReaderFactory(
const parquet::ArrowReaderProperties & reader_properties_,
const parquet::ArrowReaderProperties & arrow_properties_,
const parquet::ColumnDescriptor & col_descriptor_,
DataTypePtr ch_type_,
std::unique_ptr<parquet::ColumnChunkMetaData> meta_,
std::unique_ptr<parquet::PageReader> page_reader_)
: reader_properties(reader_properties_)
: arrow_properties(arrow_properties_)
, col_descriptor(col_descriptor_)
, ch_type(std::move(ch_type_))
, meta(std::move(meta_))
@ -74,7 +75,7 @@ public:
std::unique_ptr<ParquetColumnReader> makeReader();
private:
const parquet::ArrowReaderProperties & reader_properties;
const parquet::ArrowReaderProperties & arrow_properties;
const parquet::ColumnDescriptor & col_descriptor;
DataTypePtr ch_type;
std::unique_ptr<parquet::ColumnChunkMetaData> meta;
@ -274,7 +275,7 @@ std::unique_ptr<ParquetColumnReader> ColReaderFactory::makeReader()
DataTypePtr read_type = ch_type;
if (!isDateTime64(ch_type))
{
auto scale = getScaleFromArrowTimeUnit(reader_properties.coerce_int96_timestamp_unit());
auto scale = getScaleFromArrowTimeUnit(arrow_properties.coerce_int96_timestamp_unit());
read_type = std::make_shared<DataTypeDateTime64>(scale);
}
return std::make_unique<ParquetLeafColReader<ColumnDecimal<DateTime64>>>(
@ -299,13 +300,14 @@ std::unique_ptr<ParquetColumnReader> ColReaderFactory::makeReader()
ParquetRecordReader::ParquetRecordReader(
Block header_,
parquet::ArrowReaderProperties reader_properties_,
parquet::ArrowReaderProperties arrow_properties_,
parquet::ReaderProperties reader_properties_,
std::shared_ptr<::arrow::io::RandomAccessFile> arrow_file,
const FormatSettings & format_settings,
std::vector<int> row_groups_indices_,
std::shared_ptr<parquet::FileMetaData> metadata)
: file_reader(createFileReader(std::move(arrow_file), std::move(metadata)))
, reader_properties(reader_properties_)
: file_reader(createFileReader(std::move(arrow_file), reader_properties_, std::move(metadata)))
, arrow_properties(arrow_properties_)
, header(std::move(header_))
, max_block_size(format_settings.parquet.max_block_size)
, row_groups_indices(std::move(row_groups_indices_))
@ -337,10 +339,10 @@ ParquetRecordReader::ParquetRecordReader(
chassert(idx >= 0);
parquet_col_indice.push_back(idx);
}
if (reader_properties.pre_buffer())
if (arrow_properties.pre_buffer())
{
THROW_PARQUET_EXCEPTION(file_reader->PreBuffer(
row_groups_indices, parquet_col_indice, reader_properties.io_context(), reader_properties.cache_options()));
row_groups_indices, parquet_col_indice, arrow_properties.io_context(), arrow_properties.cache_options()));
}
}
@ -378,7 +380,7 @@ void ParquetRecordReader::loadNextRowGroup()
for (size_t i = 0; i < parquet_col_indice.size(); i++)
{
ColReaderFactory factory(
reader_properties,
arrow_properties,
*file_reader->metadata()->schema()->Column(parquet_col_indice[i]),
header.getByPosition(i).type,
cur_row_group_reader->metadata()->ColumnChunk(parquet_col_indice[i]),

View File

@ -19,7 +19,8 @@ class ParquetRecordReader
public:
ParquetRecordReader(
Block header_,
parquet::ArrowReaderProperties reader_properties_,
parquet::ArrowReaderProperties arrow_properties_,
parquet::ReaderProperties reader_properties_,
std::shared_ptr<::arrow::io::RandomAccessFile> arrow_file,
const FormatSettings & format_settings,
std::vector<int> row_groups_indices_,
@ -29,7 +30,7 @@ public:
private:
std::unique_ptr<parquet::ParquetFileReader> file_reader;
parquet::ArrowReaderProperties reader_properties;
parquet::ArrowReaderProperties arrow_properties;
Block header;

View File

@ -499,7 +499,8 @@ void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_bat
row_group_batch.native_record_reader = std::make_shared<ParquetRecordReader>(
getPort().getHeader(),
std::move(properties),
arrow_properties,
reader_properties,
arrow_file,
format_settings,
row_group_batch.row_groups_idxs);