From 8434ad930e0fbd3ea8ed4df5f1c66af12770fbea Mon Sep 17 00:00:00 2001 From: liuneng1994 Date: Sun, 7 Jul 2024 18:30:08 +0800 Subject: [PATCH] plain int64 with range filter --- .../Formats/Impl/Parquet/ColumnFilter.cpp | 1 + .../Formats/Impl/Parquet/ColumnFilter.h | 59 +++++-- .../Formats/Impl/Parquet/ParquetReader.cpp | 70 ++++++++ .../Formats/Impl/Parquet/ParquetReader.h | 49 ++++++ .../Impl/Parquet/SelectiveColumnReader.cpp | 153 ++++++++++++++---- .../Impl/Parquet/SelectiveColumnReader.h | 61 +++++-- .../tests/gtest_native_parquet_reader.cpp | 85 ++++++++++ 7 files changed, 425 insertions(+), 53 deletions(-) create mode 100644 src/Processors/Formats/Impl/Parquet/ParquetReader.cpp create mode 100644 src/Processors/Formats/Impl/Parquet/ParquetReader.h create mode 100644 src/Processors/tests/gtest_native_parquet_reader.cpp diff --git a/src/Processors/Formats/Impl/Parquet/ColumnFilter.cpp b/src/Processors/Formats/Impl/Parquet/ColumnFilter.cpp index b9965dd3705..b9d841cbbe8 100644 --- a/src/Processors/Formats/Impl/Parquet/ColumnFilter.cpp +++ b/src/Processors/Formats/Impl/Parquet/ColumnFilter.cpp @@ -1 +1,2 @@ #include "ColumnFilter.h" + diff --git a/src/Processors/Formats/Impl/Parquet/ColumnFilter.h b/src/Processors/Formats/Impl/Parquet/ColumnFilter.h index fdb7883a14d..0d685e4c5af 100644 --- a/src/Processors/Formats/Impl/Parquet/ColumnFilter.h +++ b/src/Processors/Formats/Impl/Parquet/ColumnFilter.h @@ -8,6 +8,7 @@ using ColumnFilterPtr = std::shared_ptr; enum ColumnFilterKind { + Unknown, AlwaysTrue, AlwaysFalse, IsNull, @@ -17,19 +18,57 @@ enum ColumnFilterKind Int64In, }; + class ColumnFilter { public: - bool testNull(); - bool testNotNull(); - bool testInt64(Int64 value); - bool testFloat32(Float32 value); - bool testFloat64(Float64 value); - bool testBool(bool value); - bool testInt64Range(Int64 min, Int64 max); - bool testFloat32Range(Float32 min, Float32 max); - bool testFloat64Range(Float64 min, Float64 max); + virtual ~ColumnFilter() { } + virtual ColumnFilterKind kind() { return Unknown; } + virtual bool testNull() { return true; } + virtual bool testNotNull() { return true; } + virtual bool testInt64(Int64) { return true; } + virtual bool testFloat32(Float32) { return true; } + virtual bool testFloat64(Float64) { return true; } + virtual bool testBool(bool) { return true; } + virtual bool testInt64Range(Int64, Int64) { return true; } + virtual bool testFloat32Range(Float32, Float32) { return true; } + virtual bool testFloat64Range(Float64, Float64) { return true; } +}; + +class AlwaysTrueFilter : public ColumnFilter +{ +public: + ColumnFilterKind kind() override { return AlwaysTrue; } + bool testNull() override { return true; } + bool testNotNull() override { return true; } + bool testInt64(Int64) override { return true; } + bool testFloat32(Float32) override { return true; } + bool testFloat64(Float64) override { return true; } + bool testBool(bool) override { return true; } + bool testInt64Range(Int64, Int64) override { return true; } + bool testFloat32Range(Float32, Float32) override { return true; } + bool testFloat64Range(Float64, Float64) override { return true; } +}; + +class Int64RangeFilter : public ColumnFilter +{ +public: + explicit Int64RangeFilter(Int64 min_, Int64 max_) : max(max_), min(min_) { } + ~Int64RangeFilter() override = default; + ColumnFilterKind kind() override { return Int64Range; } + bool testNull() override { return false; } + bool testNotNull() override { return true; } + bool testInt64(Int64 int64) override { return int64 >= min && int64 <= max; } + bool testFloat32(Float32 float32) override { return ColumnFilter::testFloat32(float32); } + bool testFloat64(Float64 float64) override { return ColumnFilter::testFloat64(float64); } + bool testBool(bool b) override { return ColumnFilter::testBool(b); } + bool testInt64Range(Int64 int64, Int64 int641) override { return ColumnFilter::testInt64Range(int64, int641); } + bool testFloat32Range(Float32 float32, Float32 float321) override { return ColumnFilter::testFloat32Range(float32, float321); } + bool testFloat64Range(Float64 float64, Float64 float641) override { return ColumnFilter::testFloat64Range(float64, float641); } + +private: + Int64 max = INT64_MAX; + Int64 min = INT64_MIN; }; } - diff --git a/src/Processors/Formats/Impl/Parquet/ParquetReader.cpp b/src/Processors/Formats/Impl/Parquet/ParquetReader.cpp new file mode 100644 index 00000000000..e0895859920 --- /dev/null +++ b/src/Processors/Formats/Impl/Parquet/ParquetReader.cpp @@ -0,0 +1,70 @@ +#include "ParquetReader.h" + +namespace DB +{ +namespace ErrorCodes +{ +extern const int NOT_IMPLEMENTED; +extern const int PARQUET_EXCEPTION; +} + +#define THROW_PARQUET_EXCEPTION(s) \ + do \ + { \ + try { (s); } \ + catch (const ::parquet::ParquetException & e) \ + { \ + throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Parquet exception: {}", e.what()); \ + } \ + } while (false) + + +std::unique_ptr createFileReader( + std::shared_ptr<::arrow::io::RandomAccessFile> arrow_file, + parquet::ReaderProperties reader_properties, + std::shared_ptr metadata = nullptr) +{ + std::unique_ptr res; + THROW_PARQUET_EXCEPTION(res = parquet::ParquetFileReader::Open(std::move(arrow_file), reader_properties, metadata)); + return res; +} + +ParquetReader::ParquetReader( + Block header_, + parquet::ArrowReaderProperties arrow_properties_, + parquet::ReaderProperties reader_properties_, + std::shared_ptr<::arrow::io::RandomAccessFile> arrow_file_, + const FormatSettings & format_settings, + std::vector row_groups_indices_, + std::shared_ptr metadata) + : file_reader(createFileReader(arrow_file_, reader_properties_, metadata)) + , arrow_properties(arrow_properties_) + , header(std::move(header_)) + , max_block_size(format_settings.parquet.max_block_size) + , row_groups_indices(std::move(row_groups_indices_)) + , meta_data(file_reader->metadata()) +{ +} + +void ParquetReader::loadRowGroupChunkReaderIfNeeded() +{ + if ((!row_group_chunk_reader || !row_group_chunk_reader->hasMoreRows()) && next_row_group_idx < row_groups_indices.size()) + { + row_group_chunk_reader = std::make_unique( + this, + file_reader->RowGroup(row_groups_indices[next_row_group_idx]), + filters); + next_row_group_idx ++; + } +} +Block ParquetReader::read() +{ + loadRowGroupChunkReaderIfNeeded(); + auto chunk = row_group_chunk_reader->readChunk(max_block_size); + return header.cloneWithColumns(chunk.detachColumns()); +} +void ParquetReader::addFilter(const String & column_name, const ColumnFilterPtr filter) +{ + filters[column_name] = filter; +} +} diff --git a/src/Processors/Formats/Impl/Parquet/ParquetReader.h b/src/Processors/Formats/Impl/Parquet/ParquetReader.h new file mode 100644 index 00000000000..6c0cbe74a32 --- /dev/null +++ b/src/Processors/Formats/Impl/Parquet/ParquetReader.h @@ -0,0 +1,49 @@ +#pragma once +#include +#include +#include +#include + + +#include +#include +#include + +namespace DB +{ +class ParquetReader +{ +public: + friend class RowGroupChunkReader; + ParquetReader( + Block header_, + parquet::ArrowReaderProperties arrow_properties_, + parquet::ReaderProperties reader_properties_, + std::shared_ptr<::arrow::io::RandomAccessFile> arrow_file, + const FormatSettings & format_settings, + std::vector row_groups_indices_, + std::shared_ptr metadata = nullptr); + + Block read(); + void addFilter(const String & column_name, const ColumnFilterPtr filter); +private: + void loadRowGroupChunkReaderIfNeeded(); + + std::unique_ptr file_reader; + parquet::ArrowReaderProperties arrow_properties; + + Block header; + + std::unique_ptr row_group_chunk_reader; + + UInt64 max_block_size; + + std::unordered_map filters; + std::vector parquet_col_indice; + std::vector row_groups_indices; + size_t next_row_group_idx = 0; + std::shared_ptr meta_data; +}; +} + + diff --git a/src/Processors/Formats/Impl/Parquet/SelectiveColumnReader.cpp b/src/Processors/Formats/Impl/Parquet/SelectiveColumnReader.cpp index 9f1cbbc74f1..1f8f8fe2a12 100644 --- a/src/Processors/Formats/Impl/Parquet/SelectiveColumnReader.cpp +++ b/src/Processors/Formats/Impl/Parquet/SelectiveColumnReader.cpp @@ -2,7 +2,8 @@ #include #include -#include +#include +#include #include namespace DB @@ -11,11 +12,13 @@ namespace DB namespace ErrorCodes { extern const int UNSUPPORTED_METHOD; +extern const int PARQUET_EXCEPTION; } Chunk RowGroupChunkReader::readChunk(size_t rows) { - Columns columns; + rows = std::min(rows, remain_rows); + MutableColumns columns; for (auto & reader : column_readers) { @@ -26,7 +29,9 @@ Chunk RowGroupChunkReader::readChunk(size_t rows) size_t rows_read = 0; while (rows_read < rows) { - size_t rows_to_read = rows - rows_read; + size_t rows_to_read = std::min(rows - rows_read, remain_rows); + if (!rows_to_read) + break; for (auto & reader : column_readers) { if (!reader->currentRemainRows()) @@ -41,7 +46,7 @@ Chunk RowGroupChunkReader::readChunk(size_t rows) RowSet row_set(rows_to_read); for (auto & column : filter_columns) { - reader_columns_mapping[column]->computeRowSet(row_set, rows); + reader_columns_mapping[column]->computeRowSet(row_set, rows_to_read); } bool skip_all = row_set.none(); for (size_t i = 0; i < column_readers.size(); i++) @@ -51,9 +56,51 @@ Chunk RowGroupChunkReader::readChunk(size_t rows) else column_readers[i]->read(columns[i], row_set, rows_to_read); } - rows_read += columns[0]->size(); + remain_rows -= rows_to_read; + rows_read = columns[0]->size(); } - return Chunk(columns, rows_read); + return Chunk(std::move(columns), rows_read); +} +RowGroupChunkReader::RowGroupChunkReader(ParquetReader * parquetReader, + std::shared_ptr rowGroupReader, + std::unordered_map filters) + : parquet_reader(parquetReader), row_group_reader(rowGroupReader) +{ + std::unordered_map parquet_columns; + const auto * root = parquet_reader->meta_data->schema()->group_node(); + for (int i = 0; i < root->field_count(); ++i) + { + const auto & node = root->field(i); + parquet_columns.emplace(node->name(), node); + } + + column_readers.reserve(parquet_reader->header.columns()); + for (const auto & col_with_name : parquet_reader->header) + { + if (!parquet_columns.contains(col_with_name.name)) + throw Exception(ErrorCodes::PARQUET_EXCEPTION, "no column with '{}' in parquet file", col_with_name.name); + + const auto & node = parquet_columns.at(col_with_name.name); + if (!node->is_primitive()) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "arrays and maps are not implemented in native parquet reader"); + + auto idx = parquet_reader->meta_data->schema()->ColumnIndex(*node); + auto filter = filters.contains(col_with_name.name) ? filters.at(col_with_name.name) : nullptr; + auto column_reader = SelectiveColumnReaderFactory::createLeafColumnReader( + *row_group_reader->metadata()->ColumnChunk(idx), + parquet_reader->meta_data->schema()->Column(idx), + row_group_reader->GetColumnPageReader(idx), + filter); + if (node->is_optional()) + { + column_reader = SelectiveColumnReaderFactory::createOptionalColumnReader(column_reader, filter); + } + column_readers.push_back(column_reader); + reader_columns_mapping[col_with_name.name] = column_reader; + chassert(idx >= 0); + if (filter) filter_columns.push_back(col_with_name.name); + } + remain_rows = row_group_reader->metadata()->num_rows(); } @@ -72,7 +119,7 @@ void SelectiveColumnReader::readPage() state.page = page; if (page->type() == parquet::PageType::DATA_PAGE) { - readDataPageV1(assert_cast(*page)); + readDataPageV1(static_cast(*page)); } else if (page->type() == parquet::PageType::DICTIONARY_PAGE) { @@ -95,61 +142,69 @@ void SelectiveColumnReader::readDataPageV1(const parquet::DataPageV1 & page) state.rep_levels.resize(0); if (scan_spec.column_desc->max_repetition_level() > 0) { - auto rep_bytes = decoder.SetData(page.repetition_level_encoding(), max_rep_level, state.remain_rows, state.buffer, max_size); + auto rep_bytes = decoder.SetData(page.repetition_level_encoding(), max_rep_level, static_cast(state.remain_rows), state.buffer, max_size); max_size -= rep_bytes; state.buffer += rep_bytes; state.rep_levels.resize_fill(state.remain_rows); - decoder.Decode(state.remain_rows, state.rep_levels.data()); + decoder.Decode(static_cast(state.remain_rows), state.rep_levels.data()); } if (scan_spec.column_desc->max_definition_level() > 0) { - auto def_bytes = decoder.SetData(page.definition_level_encoding(), max_def_level, state.remain_rows, state.buffer, max_size); + auto def_bytes = decoder.SetData(page.definition_level_encoding(), max_def_level, static_cast(state.remain_rows), state.buffer, max_size); state.buffer += def_bytes; state.def_levels.resize_fill(state.remain_rows); - decoder.Decode(state.remain_rows, state.def_levels.data()); + decoder.Decode(static_cast(state.remain_rows), state.def_levels.data()); } } -void Int64ColumnDirectReader::computeRowSet(RowSet& row_set, size_t offset, size_t value_offset, size_t rows_to_read) +template +void Int64ColumnDirectReader::computeRowSet(RowSet& row_set, size_t rows_to_read) { readPageIfNeeded(); chassert(rows_to_read <= state.remain_rows); - const Int64 * start = reinterpret_cast(state.buffer) + value_offset; + const Int64 * start = reinterpret_cast(state.buffer); if (scan_spec.filter) { for (size_t i = 0; i < rows_to_read; i++) { - row_set.set(offset + i, scan_spec.filter->testInt64(start[i])); + bool pass = scan_spec.filter->testInt64(start[i]); + row_set.set(i, pass); } } } -void Int64ColumnDirectReader::read(MutableColumnPtr & column, RowSet & row_set, size_t rows_to_read) +template +void Int64ColumnDirectReader::read(MutableColumnPtr & column, RowSet & row_set, size_t rows_to_read) { - ColumnInt64 * data = assert_cast(column.get()); + auto * int_column = static_cast(column.get()); + auto & data = int_column->getData(); size_t rows_read = 0; const Int64 * start = reinterpret_cast(state.buffer); while (rows_read < rows_to_read) { if (row_set.get(rows_read)) { - data->getData().push_back(start[rows_read]); + data.push_back(start[rows_read]); } - rows_readed ++; + rows_read ++; } state.buffer += rows_to_read * sizeof(Int64); state.remain_rows -= rows_to_read; } -void Int64ColumnDirectReader::skip(size_t rows) + +template +void Int64ColumnDirectReader::skip(size_t rows) { state.remain_rows -= rows; state.buffer += rows * sizeof(Int64); } -void Int64ColumnDirectReader::readSpace(MutableColumnPtr & column, RowSet & row_set, PaddedPODArray& null_map, size_t rows_to_read) + +template +void Int64ColumnDirectReader::readSpace(MutableColumnPtr & column, RowSet & row_set, PaddedPODArray& null_map, size_t rows_to_read) { - auto * column = assert_cast(column.get()); - auto & data = column->getData(); + auto * int_column = static_cast(column.get()); + auto & data = int_column->getData(); size_t rows_read = 0; const Int64 * start = reinterpret_cast(state.buffer); size_t count = 0; @@ -164,7 +219,7 @@ void Int64ColumnDirectReader::readSpace(MutableColumnPtr & column, RowSet & row_ else { data.push_back(start[count]); - count++ + count++; } } rows_read ++; @@ -172,7 +227,9 @@ void Int64ColumnDirectReader::readSpace(MutableColumnPtr & column, RowSet & row_ state.buffer += count * sizeof(Int64); state.remain_rows -= rows_to_read; } -void Int64ColumnDirectReader::computeRowSetSpace(RowSet & row_set, PaddedPODArray & null_map, size_t rows_to_read) + +template +void Int64ColumnDirectReader::computeRowSetSpace(RowSet & row_set, PaddedPODArray & null_map, size_t rows_to_read) { readPageIfNeeded(); const Int64 * start = reinterpret_cast(state.buffer); @@ -191,9 +248,17 @@ void Int64ColumnDirectReader::computeRowSetSpace(RowSet & row_set, PaddedPODArra } } } -MutableColumnPtr Int64ColumnDirectReader::createColumn() + +template +MutableColumnPtr Int64ColumnDirectReader::createColumn() +{ + return DataType::ColumnType::create(); +} + +template +Int64ColumnDirectReader::Int64ColumnDirectReader(std::unique_ptr page_reader_, ScanSpec scan_spec_) + : SelectiveColumnReader(std::move(page_reader_), scan_spec_) { - return ColumnInt64::create(); } size_t OptionalColumnReader::currentRemainRows() const @@ -225,7 +290,7 @@ void OptionalColumnReader::computeRowSet(RowSet& row_set, size_t rows_to_read) { for (size_t i = 0; i < rows_to_read; i++) { - if (null_map[i]) + if (cur_null_map[i]) { row_set.set(i, scan_spec.filter->testNull()); } @@ -241,11 +306,11 @@ void OptionalColumnReader::computeRowSet(RowSet& row_set, size_t rows_to_read) child->computeRowSet(row_set, rows_to_read); } -void OptionalColumnReader::read(MutableColumnPtr & column, RowSet& row_set, size_t , size_t rows_to_read) +void OptionalColumnReader::read(MutableColumnPtr & column, RowSet& row_set, size_t rows_to_read) { rows_to_read = std::min(child->currentRemainRows(), rows_to_read); auto* nullable_column = static_cast(column.get()); - auto & nested_column = nullable_column->getNestedColumn(); + auto nested_column = nullable_column->getNestedColumnPtr()->assumeMutable(); auto & null_data = nullable_column->getNullMapData(); for (size_t i = 0; i < rows_to_read; i++) @@ -263,6 +328,7 @@ void OptionalColumnReader::read(MutableColumnPtr & column, RowSet& row_set, size { child->read(nested_column, row_set, rows_to_read); } + cleanNullMap(); } void OptionalColumnReader::skip(size_t rows) @@ -273,6 +339,7 @@ void OptionalColumnReader::skip(size_t rows) chassert(rows == cur_null_map.size()); child->skipNulls(cur_null_count); child->skip(rows - cur_null_count); + cleanNullMap(); } MutableColumnPtr OptionalColumnReader::createColumn() @@ -281,4 +348,32 @@ MutableColumnPtr OptionalColumnReader::createColumn() } +SelectiveColumnReaderPtr SelectiveColumnReaderFactory::createLeafColumnReader( + const parquet::ColumnChunkMetaData& column_metadata, const parquet::ColumnDescriptor * column_desc, std::unique_ptr page_reader, ColumnFilterPtr filter) +{ + ScanSpec scan_spec{.column_name=column_desc->name(), .column_desc=column_desc, .filter=filter}; + if (column_desc->physical_type() == parquet::Type::INT64 && + (column_desc->logical_type()->type() == parquet::LogicalType::Type::INT + || column_desc->logical_type()->type() == parquet::LogicalType::Type::NONE)) + { + bool plain_encoding = column_metadata.encodings().size() == 1 && column_metadata.encodings()[0] == parquet::Encoding::PLAIN; + if (plain_encoding) + return std::make_shared>(std::move(page_reader), scan_spec); + else + throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Unsupported encoding for int64 column"); + } + else + { + throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Unsupported column type"); + } +} +SelectiveColumnReaderPtr SelectiveColumnReaderFactory::createOptionalColumnReader(SelectiveColumnReaderPtr child, ColumnFilterPtr filter) +{ + ScanSpec scan_spec; + scan_spec.filter = filter; + return std::make_shared(scan_spec, std::move(child)); +} + + +template class Int64ColumnDirectReader; } diff --git a/src/Processors/Formats/Impl/Parquet/SelectiveColumnReader.h b/src/Processors/Formats/Impl/Parquet/SelectiveColumnReader.h index 8a9ffcd846c..3fda6a8b02d 100644 --- a/src/Processors/Formats/Impl/Parquet/SelectiveColumnReader.h +++ b/src/Processors/Formats/Impl/Parquet/SelectiveColumnReader.h @@ -6,6 +6,7 @@ #include #include #include +#include #include namespace parquet @@ -38,7 +39,7 @@ public: } if (!count) return true; - false; + return false; } private: @@ -53,7 +54,6 @@ using SelectiveColumnReaderPtr = std::shared_ptr; struct ScanSpec { String column_name; - DataTypePtr expected_type; const parquet::ColumnDescriptor * column_desc = nullptr; ColumnFilterPtr filter; }; @@ -71,11 +71,15 @@ public: class SelectiveColumnReader { public: - virtual ~SelectiveColumnReader() = 0; + SelectiveColumnReader(std::unique_ptr page_reader_, const ScanSpec scan_spec_) + : page_reader(std::move(page_reader_)), scan_spec(scan_spec_) + { + } + virtual ~SelectiveColumnReader() = default; virtual void computeRowSet(RowSet& row_set, size_t rows_to_read) = 0; - virtual void computeRowSetSpace(RowSet& row_set, PaddedPODArray& null_map, size_t rows_to_read) {}; + virtual void computeRowSetSpace(RowSet& , PaddedPODArray& , size_t ) {} virtual void read(MutableColumnPtr & column, RowSet& row_set, size_t rows_to_read) = 0; - virtual void readSpace(MutableColumnPtr & column, RowSet& row_set, PaddedPODArray& null_map, size_t rows_to_read) {}; + virtual void readSpace(MutableColumnPtr & , RowSet& , PaddedPODArray& , size_t ) {} virtual void getValues() { } void readPageIfNeeded(); @@ -98,10 +102,6 @@ public: virtual void skip(size_t rows) = 0; -protected: - void readPage(); - void readDataPageV1(const parquet::DataPageV1 & page); - void readDictPage(const parquet::DictionaryPage & page) {} int16_t max_definition_level() const { return scan_spec.column_desc->max_definition_level(); @@ -112,43 +112,69 @@ protected: return scan_spec.column_desc->max_repetition_level(); } +protected: + void readPage(); + void readDataPageV1(const parquet::DataPageV1 & page); + void readDictPage(const parquet::DictionaryPage & ) {} + + std::unique_ptr page_reader; ScanState state; ScanSpec scan_spec; }; - +template class Int64ColumnDirectReader : public SelectiveColumnReader { public: - ~Int64ColumnDirectReader() override = default; + Int64ColumnDirectReader(std::unique_ptr page_reader_, ScanSpec scan_spec_); + ~Int64ColumnDirectReader() override { } MutableColumnPtr createColumn() override; - void computeRowSet(RowSet& row_set, size_t offset, size_t value_offset, size_t rows_to_read) override; + void computeRowSet(RowSet& row_set, size_t rows_to_read) override; void computeRowSetSpace(RowSet & row_set, PaddedPODArray & null_map, size_t rows_to_read) override; void read(MutableColumnPtr & column, RowSet& row_set, size_t rows_to_read) override; void readSpace(MutableColumnPtr & column, RowSet & row_set, PaddedPODArray& null_map, size_t rows_to_read) override; void skip(size_t rows) override; }; +class ParquetReader; + class RowGroupChunkReader { public: + RowGroupChunkReader(ParquetReader * parquetReader, + std::shared_ptr rowGroupReader, + std::unordered_map filters); Chunk readChunk(size_t rows); + bool hasMoreRows() const + { + return remain_rows > 0; + } private: + ParquetReader * parquet_reader; + std::shared_ptr row_group_reader; std::vector filter_columns; std::unordered_map reader_columns_mapping; std::vector column_readers; + size_t remain_rows = 0; }; class OptionalColumnReader : public SelectiveColumnReader { public: - ~OptionalColumnReader() override {} + OptionalColumnReader(const ScanSpec & scanSpec, const SelectiveColumnReaderPtr child_) + : SelectiveColumnReader(nullptr, scanSpec), child(child_) + { + def_level = child->max_definition_level(); + rep_level = child->max_repetition_level(); + } + + ~OptionalColumnReader() override = default; MutableColumnPtr createColumn() override; size_t currentRemainRows() const override; void computeRowSet(RowSet& row_set, size_t rows_to_read) override; - void read(MutableColumnPtr & column, RowSet& row_set, size_t offset, size_t rows_to_read) override; + void read(MutableColumnPtr & column, RowSet& row_set, size_t rows_to_read) override; void skip(size_t rows) override; private: @@ -165,4 +191,11 @@ private: int def_level = 0; int rep_level = 0; }; + +class SelectiveColumnReaderFactory +{ +public: + static SelectiveColumnReaderPtr createLeafColumnReader(const parquet::ColumnChunkMetaData& column_metadata, const parquet::ColumnDescriptor * column_desc, std::unique_ptr page_reader, ColumnFilterPtr filter); + static SelectiveColumnReaderPtr createOptionalColumnReader(SelectiveColumnReaderPtr child, ColumnFilterPtr filter); +}; } diff --git a/src/Processors/tests/gtest_native_parquet_reader.cpp b/src/Processors/tests/gtest_native_parquet_reader.cpp new file mode 100644 index 00000000000..930504c3348 --- /dev/null +++ b/src/Processors/tests/gtest_native_parquet_reader.cpp @@ -0,0 +1,85 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include + +using namespace DB; + +TEST(Processors, TestReadInt64) +{ + auto col1 = ColumnInt64::create(); + auto col2 = ColumnInt64::create(); + auto col3 = ColumnInt64::create(); + int rows = 500000; + for (int i = 0; i < rows; ++i) + { + col1->insertValue(i); + col2->insertValue(std::rand()); + col3->insertValue(std::rand()); + } + Columns columns; + columns.emplace_back(std::move(col1)); + columns.emplace_back(std::move(col2)); + columns.emplace_back(std::move(col3)); + Chunk chunk(std::move(columns), rows); + + Block header = {ColumnWithTypeAndName(ColumnInt64::create(), std::make_shared(), "x"), + ColumnWithTypeAndName(ColumnInt64::create(), std::make_shared(), "y"), + ColumnWithTypeAndName(ColumnInt64::create(), std::make_shared(), "z")}; + + auto source = std::make_shared(header, std::move(chunk)); + WriteBufferFromFile out("/tmp/test.parquet"); + FormatSettings formatSettings; + auto parquet_output = std::make_shared(out, header, formatSettings); + + QueryPipelineBuilder builder; + builder.init(Pipe(source)); + auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder)); + pipeline.complete(std::move(parquet_output)); + CompletedPipelineExecutor executor(pipeline); + executor.execute(); + + parquet::ArrowReaderProperties arrow_properties; + parquet::ReaderProperties reader_properties(ArrowMemoryPool::instance()); + arrow_properties.set_use_threads(false); + arrow_properties.set_batch_size(8192); + + arrow_properties.set_pre_buffer(true); + auto cache_options = arrow::io::CacheOptions::LazyDefaults(); + cache_options.hole_size_limit = 10000000; + cache_options.range_size_limit = 1l << 40; // reading the whole row group at once is fine + arrow_properties.set_cache_options(cache_options); + out.close(); + ReadBufferFromFile in("/tmp/test.parquet"); + std::cerr << in.getFileSize() << std::endl; + std::atomic is_cancelled{0}; + FormatSettings settings; + settings.parquet.max_block_size = 8192; + auto arrow_file = asArrowFile(in, settings, is_cancelled, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true); + + + ParquetReader reader(header.cloneEmpty(), arrow_properties, reader_properties, arrow_file, settings, {0}); + + reader.addFilter("x", std::make_shared( 1000, 2000)); + int count = 0; + while (auto block = reader.read()) + { + if (block.rows() == 0) + break; + count += block.rows(); + } + ASSERT_EQ(count, 1001); + +}