Native Parquet reader: support reading plain-encoded Int64 columns with range-filter pushdown

This commit is contained in:
liuneng1994 2024-07-07 18:30:08 +08:00
parent a7d9ec3e90
commit 8434ad930e
7 changed files with 425 additions and 53 deletions

View File

@ -1 +1,2 @@
#include "ColumnFilter.h"

View File

@ -8,6 +8,7 @@ using ColumnFilterPtr = std::shared_ptr<ColumnFilter>;
enum ColumnFilterKind
{
Unknown,
AlwaysTrue,
AlwaysFalse,
IsNull,
@ -17,19 +18,57 @@ enum ColumnFilterKind
Int64In,
};
/// Base class of all column filters. The default implementation accepts every
/// value (all test* methods return true), so subclasses only override the
/// tests they can actually decide.
/// NOTE: the pre-refactor non-virtual declarations were removed — keeping both
/// the old `bool testNull();` declarations and the new virtual definitions in
/// one class body is ill-formed (duplicate member declarations).
class ColumnFilter
{
public:
    virtual ~ColumnFilter() = default;
    /// Identifies the concrete filter type without RTTI.
    virtual ColumnFilterKind kind() { return Unknown; }
    /// Per-value tests; `true` means the value may pass the filter.
    virtual bool testNull() { return true; }
    virtual bool testNotNull() { return true; }
    virtual bool testInt64(Int64) { return true; }
    virtual bool testFloat32(Float32) { return true; }
    virtual bool testFloat64(Float64) { return true; }
    virtual bool testBool(bool) { return true; }
    /// Range tests: `true` means some value inside [min, max] may pass.
    /// Conservative by default (never prunes).
    virtual bool testInt64Range(Int64, Int64) { return true; }
    virtual bool testFloat32Range(Float32, Float32) { return true; }
    virtual bool testFloat64Range(Float64, Float64) { return true; }
};
/// Pass-through filter: accepts every value and every range.
/// Used when a column participates in scanning but has no predicate.
class AlwaysTrueFilter : public ColumnFilter
{
public:
    ColumnFilterKind kind() override { return AlwaysTrue; }

    /// Null handling — both nulls and non-nulls pass.
    bool testNull() override { return true; }
    bool testNotNull() override { return true; }

    /// Scalar value tests — everything passes.
    bool testInt64(Int64) override { return true; }
    bool testFloat32(Float32) override { return true; }
    bool testFloat64(Float64) override { return true; }
    bool testBool(bool) override { return true; }

    /// Range tests — no range can ever be pruned.
    bool testInt64Range(Int64, Int64) override { return true; }
    bool testFloat32Range(Float32, Float32) override { return true; }
    bool testFloat64Range(Float64, Float64) override { return true; }
};
/// Accepts Int64 values inside the closed interval [min, max]; rejects nulls.
/// Overrides that merely forwarded to the base class were removed — the
/// inherited defaults already return true, so the forwarding was pure noise.
class Int64RangeFilter : public ColumnFilter
{
public:
    explicit Int64RangeFilter(Int64 min_, Int64 max_) : min(min_), max(max_) { }
    ~Int64RangeFilter() override = default;

    ColumnFilterKind kind() override { return Int64Range; }

    /// A null can never satisfy a value-range predicate.
    bool testNull() override { return false; }
    bool testNotNull() override { return true; }

    bool testInt64(Int64 value) override { return value >= min && value <= max; }

    /// Some value of [range_min, range_max] may pass iff the interval
    /// intersects [min, max]. Still conservative: returning true only means
    /// "cannot prune", never "all values pass".
    bool testInt64Range(Int64 range_min, Int64 range_max) override { return range_min <= max && range_max >= min; }

private:
    Int64 min = INT64_MIN;
    Int64 max = INT64_MAX;
};
}

View File

@ -0,0 +1,70 @@
#include "ParquetReader.h"
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int PARQUET_EXCEPTION;
}
/// Executes statement `s`, converting any parquet::ParquetException it throws
/// into a ClickHouse Exception with error code PARQUET_EXCEPTION.
/// Comments must stay outside the macro body: a trailing backslash would
/// splice the next line into a // comment.
#define THROW_PARQUET_EXCEPTION(s) \
do \
{ \
try { (s); } \
catch (const ::parquet::ParquetException & e) \
{ \
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Parquet exception: {}", e.what()); \
} \
} while (false)
/// Opens a parquet file reader over an Arrow random-access file, translating
/// parquet exceptions into ClickHouse exceptions.
/// @param metadata optional pre-parsed footer; when null the footer is read
///        from the file — presumably to avoid re-reading it, TODO confirm.
std::unique_ptr<parquet::ParquetFileReader> createFileReader(
    std::shared_ptr<::arrow::io::RandomAccessFile> arrow_file,
    parquet::ReaderProperties reader_properties,
    std::shared_ptr<parquet::FileMetaData> metadata = nullptr)
{
    std::unique_ptr<parquet::ParquetFileReader> file_reader;
    THROW_PARQUET_EXCEPTION(file_reader = parquet::ParquetFileReader::Open(std::move(arrow_file), reader_properties, metadata));
    return file_reader;
}
/// Constructs a reader that produces blocks matching `header_` from the given
/// row groups of a parquet file.
/// @param header_             output block structure (column names and types)
/// @param arrow_file_         random-access view of the parquet file
/// @param format_settings     only parquet.max_block_size is used here
/// @param row_groups_indices_ row groups this reader instance will scan
/// @param metadata            optional pre-parsed file footer (may be null)
/// NOTE: member-initializer order follows the declaration order in the header;
/// file_reader must be created before meta_data is taken from it.
ParquetReader::ParquetReader(
Block header_,
parquet::ArrowReaderProperties arrow_properties_,
parquet::ReaderProperties reader_properties_,
std::shared_ptr<::arrow::io::RandomAccessFile> arrow_file_,
const FormatSettings & format_settings,
std::vector<int> row_groups_indices_,
std::shared_ptr<parquet::FileMetaData> metadata)
: file_reader(createFileReader(arrow_file_, reader_properties_, metadata))
, arrow_properties(arrow_properties_)
, header(std::move(header_))
, max_block_size(format_settings.parquet.max_block_size)
, row_groups_indices(std::move(row_groups_indices_))
, meta_data(file_reader->metadata())
{
}
/// Lazily advances to the next row group: opens a RowGroupChunkReader for the
/// next requested row group once the current one (if any) runs dry.
void ParquetReader::loadRowGroupChunkReaderIfNeeded()
{
    /// Current chunk reader still has rows to serve — nothing to do.
    if (row_group_chunk_reader && row_group_chunk_reader->hasMoreRows())
        return;

    /// Every requested row group has already been opened.
    if (next_row_group_idx >= row_groups_indices.size())
        return;

    row_group_chunk_reader = std::make_unique<RowGroupChunkReader>(
        this,
        file_reader->RowGroup(row_groups_indices[next_row_group_idx]),
        filters);
    ++next_row_group_idx;
}
/// Reads the next block of at most max_block_size rows.
/// Returns an empty block (header structure, zero rows) once all requested
/// row groups are exhausted.
Block ParquetReader::read()
{
    loadRowGroupChunkReaderIfNeeded();
    /// loadRowGroupChunkReaderIfNeeded() leaves row_group_chunk_reader null
    /// (or drained) when no row groups remain; the original code would then
    /// dereference a null pointer here.
    if (!row_group_chunk_reader || !row_group_chunk_reader->hasMoreRows())
        return header.cloneEmpty();
    auto chunk = row_group_chunk_reader->readChunk(max_block_size);
    return header.cloneWithColumns(chunk.detachColumns());
}
/// Registers (or replaces) the pushed-down filter for a column, keyed by the
/// column's name in the output header.
void ParquetReader::addFilter(const String & column_name, const ColumnFilterPtr filter)
{
    filters.insert_or_assign(column_name, filter);
}
}

View File

@ -0,0 +1,49 @@
#pragma once
#include <Core/Block.h>
#include <Formats/FormatSettings.h>
#include <Processors/Chunk.h>
#include <Processors/Formats/Impl/Parquet/SelectiveColumnReader.h>
#include <arrow/io/interfaces.h>
#include <parquet/file_reader.h>
#include <parquet/properties.h>
namespace DB
{
/// Native (non-Arrow) parquet reader: produces ClickHouse Blocks matching
/// `header` from selected row groups, with optional per-column filter
/// pushdown via addFilter().
class ParquetReader
{
public:
friend class RowGroupChunkReader;
ParquetReader(
Block header_,
parquet::ArrowReaderProperties arrow_properties_,
parquet::ReaderProperties reader_properties_,
std::shared_ptr<::arrow::io::RandomAccessFile> arrow_file,
const FormatSettings & format_settings,
std::vector<int> row_groups_indices_,
std::shared_ptr<parquet::FileMetaData> metadata = nullptr);
/// Returns the next block of at most max_block_size rows.
Block read();
/// Attaches a filter to a column; must be keyed by a header column name.
void addFilter(const String & column_name, const ColumnFilterPtr filter);
private:
/// Opens the next row group's chunk reader once the current one runs dry.
void loadRowGroupChunkReaderIfNeeded();
std::unique_ptr<parquet::ParquetFileReader> file_reader;
parquet::ArrowReaderProperties arrow_properties;
Block header;
/// Reader over the row group currently being scanned; null before first read.
std::unique_ptr<RowGroupChunkReader> row_group_chunk_reader;
UInt64 max_block_size;
/// Column name -> pushed-down filter.
std::unordered_map<String, ColumnFilterPtr> filters;
std::vector<int> parquet_col_indice;
/// Row groups this reader was asked to scan, in scan order.
std::vector<int> row_groups_indices;
size_t next_row_group_idx = 0;
std::shared_ptr<parquet::FileMetaData> meta_data;
};
}

View File

@ -2,7 +2,8 @@
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnNullable.h>
#include <bits/stat.h>
#include <DataTypes/DataTypesNumber.h>
#include <Processors/Formats/Impl/Parquet/ParquetReader.h>
#include <Common/assert_cast.h>
namespace DB
@ -11,11 +12,13 @@ namespace DB
namespace ErrorCodes
{
extern const int UNSUPPORTED_METHOD;
extern const int PARQUET_EXCEPTION;
}
Chunk RowGroupChunkReader::readChunk(size_t rows)
{
Columns columns;
rows = std::min(rows, remain_rows);
MutableColumns columns;
for (auto & reader : column_readers)
{
@ -26,7 +29,9 @@ Chunk RowGroupChunkReader::readChunk(size_t rows)
size_t rows_read = 0;
while (rows_read < rows)
{
size_t rows_to_read = rows - rows_read;
size_t rows_to_read = std::min(rows - rows_read, remain_rows);
if (!rows_to_read)
break;
for (auto & reader : column_readers)
{
if (!reader->currentRemainRows())
@ -41,7 +46,7 @@ Chunk RowGroupChunkReader::readChunk(size_t rows)
RowSet row_set(rows_to_read);
for (auto & column : filter_columns)
{
reader_columns_mapping[column]->computeRowSet(row_set, rows);
reader_columns_mapping[column]->computeRowSet(row_set, rows_to_read);
}
bool skip_all = row_set.none();
for (size_t i = 0; i < column_readers.size(); i++)
@ -51,9 +56,51 @@ Chunk RowGroupChunkReader::readChunk(size_t rows)
else
column_readers[i]->read(columns[i], row_set, rows_to_read);
}
rows_read += columns[0]->size();
remain_rows -= rows_to_read;
rows_read = columns[0]->size();
}
return Chunk(columns, rows_read);
return Chunk(std::move(columns), rows_read);
}
/// Builds one SelectiveColumnReader per header column of `parquetReader`,
/// wiring in pushed-down filters and wrapping optional (nullable) columns.
/// @throws PARQUET_EXCEPTION if a header column is absent from the file
/// @throws NOT_IMPLEMENTED for non-primitive (nested) columns
RowGroupChunkReader::RowGroupChunkReader(ParquetReader * parquetReader,
    std::shared_ptr<parquet::RowGroupReader> rowGroupReader,
    std::unordered_map<String, ColumnFilterPtr> filters)
    : parquet_reader(parquetReader), row_group_reader(rowGroupReader)
{
    /// Index the schema's top-level fields by name for O(1) lookup.
    std::unordered_map<String, parquet::schema::NodePtr> parquet_columns;
    const auto * root = parquet_reader->meta_data->schema()->group_node();
    for (int i = 0; i < root->field_count(); ++i)
    {
        const auto & node = root->field(i);
        parquet_columns.emplace(node->name(), node);
    }

    column_readers.reserve(parquet_reader->header.columns());
    for (const auto & col_with_name : parquet_reader->header)
    {
        if (!parquet_columns.contains(col_with_name.name))
            throw Exception(ErrorCodes::PARQUET_EXCEPTION, "no column with '{}' in parquet file", col_with_name.name);

        const auto & node = parquet_columns.at(col_with_name.name);
        if (!node->is_primitive())
            throw Exception(ErrorCodes::NOT_IMPLEMENTED, "arrays and maps are not implemented in native parquet reader");

        auto idx = parquet_reader->meta_data->schema()->ColumnIndex(*node);
        /// Validate the index BEFORE using it — the original asserted only
        /// after idx had already been used to fetch column-chunk metadata.
        chassert(idx >= 0);

        auto filter = filters.contains(col_with_name.name) ? filters.at(col_with_name.name) : nullptr;
        auto column_reader = SelectiveColumnReaderFactory::createLeafColumnReader(
            *row_group_reader->metadata()->ColumnChunk(idx),
            parquet_reader->meta_data->schema()->Column(idx),
            row_group_reader->GetColumnPageReader(idx),
            filter);
        /// Nullable columns need definition-level handling on top of the leaf reader.
        if (node->is_optional())
        {
            column_reader = SelectiveColumnReaderFactory::createOptionalColumnReader(column_reader, filter);
        }
        column_readers.push_back(column_reader);
        reader_columns_mapping[col_with_name.name] = column_reader;
        if (filter)
            filter_columns.push_back(col_with_name.name);
    }
    remain_rows = row_group_reader->metadata()->num_rows();
}
@ -72,7 +119,7 @@ void SelectiveColumnReader::readPage()
state.page = page;
if (page->type() == parquet::PageType::DATA_PAGE)
{
readDataPageV1(assert_cast<const parquet::DataPageV1 &>(*page));
readDataPageV1(static_cast<const parquet::DataPageV1 &>(*page));
}
else if (page->type() == parquet::PageType::DICTIONARY_PAGE)
{
@ -95,61 +142,69 @@ void SelectiveColumnReader::readDataPageV1(const parquet::DataPageV1 & page)
state.rep_levels.resize(0);
if (scan_spec.column_desc->max_repetition_level() > 0)
{
auto rep_bytes = decoder.SetData(page.repetition_level_encoding(), max_rep_level, state.remain_rows, state.buffer, max_size);
auto rep_bytes = decoder.SetData(page.repetition_level_encoding(), max_rep_level, static_cast<int>(state.remain_rows), state.buffer, max_size);
max_size -= rep_bytes;
state.buffer += rep_bytes;
state.rep_levels.resize_fill(state.remain_rows);
decoder.Decode(state.remain_rows, state.rep_levels.data());
decoder.Decode(static_cast<int>(state.remain_rows), state.rep_levels.data());
}
if (scan_spec.column_desc->max_definition_level() > 0)
{
auto def_bytes = decoder.SetData(page.definition_level_encoding(), max_def_level, state.remain_rows, state.buffer, max_size);
auto def_bytes = decoder.SetData(page.definition_level_encoding(), max_def_level, static_cast<int>(state.remain_rows), state.buffer, max_size);
state.buffer += def_bytes;
state.def_levels.resize_fill(state.remain_rows);
decoder.Decode(state.remain_rows, state.def_levels.data());
decoder.Decode(static_cast<int>(state.remain_rows), state.def_levels.data());
}
}
void Int64ColumnDirectReader::computeRowSet(RowSet& row_set, size_t offset, size_t value_offset, size_t rows_to_read)
template <typename DataType>
void Int64ColumnDirectReader<DataType>::computeRowSet(RowSet& row_set, size_t rows_to_read)
{
readPageIfNeeded();
chassert(rows_to_read <= state.remain_rows);
const Int64 * start = reinterpret_cast<const Int64 *>(state.buffer) + value_offset;
const Int64 * start = reinterpret_cast<const Int64 *>(state.buffer);
if (scan_spec.filter)
{
for (size_t i = 0; i < rows_to_read; i++)
{
row_set.set(offset + i, scan_spec.filter->testInt64(start[i]));
bool pass = scan_spec.filter->testInt64(start[i]);
row_set.set(i, pass);
}
}
}
void Int64ColumnDirectReader::read(MutableColumnPtr & column, RowSet & row_set, size_t rows_to_read)
template <typename DataType>
void Int64ColumnDirectReader<DataType>::read(MutableColumnPtr & column, RowSet & row_set, size_t rows_to_read)
{
ColumnInt64 * data = assert_cast<ColumnInt64 *>(column.get());
auto * int_column = static_cast<DataType::ColumnType *>(column.get());
auto & data = int_column->getData();
size_t rows_read = 0;
const Int64 * start = reinterpret_cast<const Int64 *>(state.buffer);
while (rows_read < rows_to_read)
{
if (row_set.get(rows_read))
{
data->getData().push_back(start[rows_read]);
data.push_back(start[rows_read]);
}
rows_readed ++;
rows_read ++;
}
state.buffer += rows_to_read * sizeof(Int64);
state.remain_rows -= rows_to_read;
}
void Int64ColumnDirectReader::skip(size_t rows)
template <typename DataType>
void Int64ColumnDirectReader<DataType>::skip(size_t rows)
{
state.remain_rows -= rows;
state.buffer += rows * sizeof(Int64);
}
void Int64ColumnDirectReader::readSpace(MutableColumnPtr & column, RowSet & row_set, PaddedPODArray<UInt8>& null_map, size_t rows_to_read)
template <typename DataType>
void Int64ColumnDirectReader<DataType>::readSpace(MutableColumnPtr & column, RowSet & row_set, PaddedPODArray<UInt8>& null_map, size_t rows_to_read)
{
auto * column = assert_cast<ColumnInt64 *>(column.get());
auto & data = column->getData();
auto * int_column = static_cast<DataType::ColumnType *>(column.get());
auto & data = int_column->getData();
size_t rows_read = 0;
const Int64 * start = reinterpret_cast<const Int64 *>(state.buffer);
size_t count = 0;
@ -164,7 +219,7 @@ void Int64ColumnDirectReader::readSpace(MutableColumnPtr & column, RowSet & row_
else
{
data.push_back(start[count]);
count++
count++;
}
}
rows_read ++;
@ -172,7 +227,9 @@ void Int64ColumnDirectReader::readSpace(MutableColumnPtr & column, RowSet & row_
state.buffer += count * sizeof(Int64);
state.remain_rows -= rows_to_read;
}
void Int64ColumnDirectReader::computeRowSetSpace(RowSet & row_set, PaddedPODArray<UInt8> & null_map, size_t rows_to_read)
template <typename DataType>
void Int64ColumnDirectReader<DataType>::computeRowSetSpace(RowSet & row_set, PaddedPODArray<UInt8> & null_map, size_t rows_to_read)
{
readPageIfNeeded();
const Int64 * start = reinterpret_cast<const Int64 *>(state.buffer);
@ -191,9 +248,17 @@ void Int64ColumnDirectReader::computeRowSetSpace(RowSet & row_set, PaddedPODArra
}
}
}
MutableColumnPtr Int64ColumnDirectReader::createColumn()
template <typename DataType>
MutableColumnPtr Int64ColumnDirectReader<DataType>::createColumn()
{
return DataType::ColumnType::create();
}
template <typename DataType>
Int64ColumnDirectReader<DataType>::Int64ColumnDirectReader(std::unique_ptr<parquet::PageReader> page_reader_, ScanSpec scan_spec_)
: SelectiveColumnReader(std::move(page_reader_), scan_spec_)
{
return ColumnInt64::create();
}
size_t OptionalColumnReader::currentRemainRows() const
@ -225,7 +290,7 @@ void OptionalColumnReader::computeRowSet(RowSet& row_set, size_t rows_to_read)
{
for (size_t i = 0; i < rows_to_read; i++)
{
if (null_map[i])
if (cur_null_map[i])
{
row_set.set(i, scan_spec.filter->testNull());
}
@ -241,11 +306,11 @@ void OptionalColumnReader::computeRowSet(RowSet& row_set, size_t rows_to_read)
child->computeRowSet(row_set, rows_to_read);
}
void OptionalColumnReader::read(MutableColumnPtr & column, RowSet& row_set, size_t , size_t rows_to_read)
void OptionalColumnReader::read(MutableColumnPtr & column, RowSet& row_set, size_t rows_to_read)
{
rows_to_read = std::min(child->currentRemainRows(), rows_to_read);
auto* nullable_column = static_cast<ColumnNullable *>(column.get());
auto & nested_column = nullable_column->getNestedColumn();
auto nested_column = nullable_column->getNestedColumnPtr()->assumeMutable();
auto & null_data = nullable_column->getNullMapData();
for (size_t i = 0; i < rows_to_read; i++)
@ -263,6 +328,7 @@ void OptionalColumnReader::read(MutableColumnPtr & column, RowSet& row_set, size
{
child->read(nested_column, row_set, rows_to_read);
}
cleanNullMap();
}
void OptionalColumnReader::skip(size_t rows)
@ -273,6 +339,7 @@ void OptionalColumnReader::skip(size_t rows)
chassert(rows == cur_null_map.size());
child->skipNulls(cur_null_count);
child->skip(rows - cur_null_count);
cleanNullMap();
}
MutableColumnPtr OptionalColumnReader::createColumn()
@ -281,4 +348,32 @@ MutableColumnPtr OptionalColumnReader::createColumn()
}
/// Creates the leaf reader for one physical parquet column.
/// Currently supports only plain-encoded INT64 columns (logical type INT or
/// NONE); anything else raises UNSUPPORTED_METHOD.
SelectiveColumnReaderPtr SelectiveColumnReaderFactory::createLeafColumnReader(
const parquet::ColumnChunkMetaData& column_metadata, const parquet::ColumnDescriptor * column_desc, std::unique_ptr<parquet::PageReader> page_reader, ColumnFilterPtr filter)
{
    ScanSpec scan_spec{.column_name=column_desc->name(), .column_desc=column_desc, .filter=filter};

    const auto logical = column_desc->logical_type()->type();
    const bool is_int64 = column_desc->physical_type() == parquet::Type::INT64
        && (logical == parquet::LogicalType::Type::INT || logical == parquet::LogicalType::Type::NONE);
    if (!is_int64)
        throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Unsupported column type");

    /// Only pure PLAIN encoding is handled; dictionary/mixed encodings are not.
    const auto & encodings = column_metadata.encodings();
    if (encodings.size() != 1 || encodings[0] != parquet::Encoding::PLAIN)
        throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Unsupported encoding for int64 column");

    return std::make_shared<Int64ColumnDirectReader<DataTypeInt64>>(std::move(page_reader), scan_spec);
}
/// Wraps a leaf reader so that definition levels (nulls) of an optional
/// column are handled; the wrapper's ScanSpec carries only the filter.
SelectiveColumnReaderPtr SelectiveColumnReaderFactory::createOptionalColumnReader(SelectiveColumnReaderPtr child, ColumnFilterPtr filter)
{
    ScanSpec scan_spec{.filter = filter};
    return std::make_shared<OptionalColumnReader>(scan_spec, std::move(child));
}
template class Int64ColumnDirectReader<DataTypeInt64>;
}

View File

@ -6,6 +6,7 @@
#include <Processors/Chunk.h>
#include <parquet/column_page.h>
#include <parquet/column_reader.h>
#include <parquet/file_reader.h>
#include <Common/PODArray.h>
namespace parquet
@ -38,7 +39,7 @@ public:
}
if (!count)
return true;
false;
return false;
}
private:
@ -53,7 +54,6 @@ using SelectiveColumnReaderPtr = std::shared_ptr<SelectiveColumnReader>;
struct ScanSpec
{
String column_name;
DataTypePtr expected_type;
const parquet::ColumnDescriptor * column_desc = nullptr;
ColumnFilterPtr filter;
};
@ -71,11 +71,15 @@ public:
class SelectiveColumnReader
{
public:
virtual ~SelectiveColumnReader() = 0;
SelectiveColumnReader(std::unique_ptr<parquet::PageReader> page_reader_, const ScanSpec scan_spec_)
: page_reader(std::move(page_reader_)), scan_spec(scan_spec_)
{
}
virtual ~SelectiveColumnReader() = default;
virtual void computeRowSet(RowSet& row_set, size_t rows_to_read) = 0;
virtual void computeRowSetSpace(RowSet& row_set, PaddedPODArray<UInt8>& null_map, size_t rows_to_read) {};
virtual void computeRowSetSpace(RowSet& , PaddedPODArray<UInt8>& , size_t ) {}
virtual void read(MutableColumnPtr & column, RowSet& row_set, size_t rows_to_read) = 0;
virtual void readSpace(MutableColumnPtr & column, RowSet& row_set, PaddedPODArray<UInt8>& null_map, size_t rows_to_read) {};
virtual void readSpace(MutableColumnPtr & , RowSet& , PaddedPODArray<UInt8>& , size_t ) {}
virtual void getValues() { }
void readPageIfNeeded();
@ -98,10 +102,6 @@ public:
virtual void skip(size_t rows) = 0;
protected:
void readPage();
void readDataPageV1(const parquet::DataPageV1 & page);
void readDictPage(const parquet::DictionaryPage & page) {}
int16_t max_definition_level() const
{
return scan_spec.column_desc->max_definition_level();
@ -112,43 +112,69 @@ protected:
return scan_spec.column_desc->max_repetition_level();
}
protected:
void readPage();
void readDataPageV1(const parquet::DataPageV1 & page);
void readDictPage(const parquet::DictionaryPage & ) {}
std::unique_ptr<parquet::PageReader> page_reader;
ScanState state;
ScanSpec scan_spec;
};
/// Reader for plain-encoded INT64 columns, templated on the ClickHouse
/// destination type (e.g. DataTypeInt64) whose ColumnType receives the values.
/// NOTE: this span mixed pre- and post-refactor diff lines (two destructors,
/// a stale 4-argument computeRowSet signature); only the current declaration
/// is kept.
template <typename DataType>
class Int64ColumnDirectReader : public SelectiveColumnReader
{
public:
    Int64ColumnDirectReader(std::unique_ptr<parquet::PageReader> page_reader_, ScanSpec scan_spec_);
    ~Int64ColumnDirectReader() override = default;
    MutableColumnPtr createColumn() override;
    /// Evaluates the pushed-down filter over the next rows_to_read values.
    void computeRowSet(RowSet& row_set, size_t rows_to_read) override;
    void computeRowSetSpace(RowSet & row_set, PaddedPODArray<UInt8> & null_map, size_t rows_to_read) override;
    /// Appends the rows selected by row_set to `column`.
    void read(MutableColumnPtr & column, RowSet& row_set, size_t rows_to_read) override;
    void readSpace(MutableColumnPtr & column, RowSet & row_set, PaddedPODArray<UInt8>& null_map, size_t rows_to_read) override;
    void skip(size_t rows) override;
};
class ParquetReader;
/// Reads one parquet row group column-by-column, applying pushed-down filters
/// (filter columns first, then materializing the surviving rows).
class RowGroupChunkReader
{
public:
RowGroupChunkReader(ParquetReader * parquetReader,
std::shared_ptr<parquet::RowGroupReader> rowGroupReader,
std::unordered_map<String, ColumnFilterPtr> filters);
/// Returns up to `rows` rows that pass all filters.
Chunk readChunk(size_t rows);
/// True while unread rows remain in this row group.
bool hasMoreRows() const
{
return remain_rows > 0;
}
private:
/// Non-owning back-pointer; the ParquetReader outlives its chunk reader.
ParquetReader * parquet_reader;
std::shared_ptr<parquet::RowGroupReader> row_group_reader;
/// Names of columns that carry a filter; evaluated before materialization.
std::vector<String> filter_columns;
std::unordered_map<String, SelectiveColumnReaderPtr> reader_columns_mapping;
/// One reader per output column, in header order.
std::vector<SelectiveColumnReaderPtr> column_readers;
size_t remain_rows = 0;
};
class OptionalColumnReader : public SelectiveColumnReader
{
public:
~OptionalColumnReader() override {}
OptionalColumnReader(const ScanSpec & scanSpec, const SelectiveColumnReaderPtr child_)
: SelectiveColumnReader(nullptr, scanSpec), child(child_)
{
def_level = child->max_definition_level();
rep_level = child->max_repetition_level();
}
~OptionalColumnReader() override = default;
MutableColumnPtr createColumn() override;
size_t currentRemainRows() const override;
void computeRowSet(RowSet& row_set, size_t rows_to_read) override;
void read(MutableColumnPtr & column, RowSet& row_set, size_t offset, size_t rows_to_read) override;
void read(MutableColumnPtr & column, RowSet& row_set, size_t rows_to_read) override;
void skip(size_t rows) override;
private:
@ -165,4 +191,11 @@ private:
int def_level = 0;
int rep_level = 0;
};
/// Factory for column readers: leaf readers for physical columns and an
/// optional-column wrapper that layers null handling on top of a leaf reader.
class SelectiveColumnReaderFactory
{
public:
static SelectiveColumnReaderPtr createLeafColumnReader(const parquet::ColumnChunkMetaData& column_metadata, const parquet::ColumnDescriptor * column_desc, std::unique_ptr<parquet::PageReader> page_reader, ColumnFilterPtr filter);
static SelectiveColumnReaderPtr createOptionalColumnReader(SelectiveColumnReaderPtr child, ColumnFilterPtr filter);
};
}

View File

@ -0,0 +1,85 @@
#include <gtest/gtest.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Formats/Impl/ParquetBlockOutputFormat.h>
#include <Processors/Formats/Impl/Parquet/ParquetReader.h>
#include <Processors/Formats/Impl/ArrowBufferedStreams.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadBufferFromFile.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypesNumber.h>
using namespace DB;
/// End-to-end test: writes a parquet file with three Int64 columns, then reads
/// it back through the native ParquetReader with a range filter [1000, 2000]
/// on column "x" and checks the surviving row count (1001 values: 1000..2000).
TEST(Processors, TestReadInt64)
{
/// Build source data: "x" is 0..rows-1 (so the filter result is predictable),
/// "y" and "z" are random noise.
auto col1 = ColumnInt64::create();
auto col2 = ColumnInt64::create();
auto col3 = ColumnInt64::create();
int rows = 500000;
for (int i = 0; i < rows; ++i)
{
col1->insertValue(i);
col2->insertValue(std::rand());
col3->insertValue(std::rand());
}
Columns columns;
columns.emplace_back(std::move(col1));
columns.emplace_back(std::move(col2));
columns.emplace_back(std::move(col3));
Chunk chunk(std::move(columns), rows);
Block header = {ColumnWithTypeAndName(ColumnInt64::create(), std::make_shared<DataTypeInt64>(), "x"),
ColumnWithTypeAndName(ColumnInt64::create(), std::make_shared<DataTypeInt64>(), "y"),
ColumnWithTypeAndName(ColumnInt64::create(), std::make_shared<DataTypeInt64>(), "z")};
/// Write the chunk to /tmp/test.parquet via the parquet output format.
auto source = std::make_shared<SourceFromSingleChunk>(header, std::move(chunk));
WriteBufferFromFile out("/tmp/test.parquet");
FormatSettings formatSettings;
auto parquet_output = std::make_shared<ParquetBlockOutputFormat>(out, header, formatSettings);
QueryPipelineBuilder builder;
builder.init(Pipe(source));
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
pipeline.complete(std::move(parquet_output));
CompletedPipelineExecutor executor(pipeline);
executor.execute();
/// Configure the reader; pre-buffering with a huge range limit reads whole
/// row groups at once.
parquet::ArrowReaderProperties arrow_properties;
parquet::ReaderProperties reader_properties(ArrowMemoryPool::instance());
arrow_properties.set_use_threads(false);
arrow_properties.set_batch_size(8192);
arrow_properties.set_pre_buffer(true);
auto cache_options = arrow::io::CacheOptions::LazyDefaults();
cache_options.hole_size_limit = 10000000;
cache_options.range_size_limit = 1l << 40; // reading the whole row group at once is fine
arrow_properties.set_cache_options(cache_options);
out.close();
ReadBufferFromFile in("/tmp/test.parquet");
std::cerr << in.getFileSize() << std::endl;
std::atomic<int> is_cancelled{0};
FormatSettings settings;
settings.parquet.max_block_size = 8192;
auto arrow_file = asArrowFile(in, settings, is_cancelled, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true);
/// Read row group 0 with a range filter on "x"; count surviving rows.
ParquetReader reader(header.cloneEmpty(), arrow_properties, reader_properties, arrow_file, settings, {0});
reader.addFilter("x", std::make_shared<Int64RangeFilter>( 1000, 2000));
int count = 0;
while (auto block = reader.read())
{
if (block.rows() == 0)
break;
count += block.rows();
}
/// [1000, 2000] is a closed interval over consecutive integers -> 1001 rows.
ASSERT_EQ(count, 1001);
}