support read tuple and map without filter

This commit is contained in:
liuneng1994 2024-11-19 11:39:58 +08:00
parent b5ab19225e
commit 01681a746a
6 changed files with 254 additions and 70 deletions

View File

@ -5,6 +5,8 @@
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeMap.h>
#include <Processors/Formats/Impl/Parquet/SelectiveColumnReader.h>
#include <Processors/Formats/Impl/Parquet/RowGroupChunkReader.h>
#include <Processors/Formats/Impl/Parquet/ParquetReader.h>
@ -440,7 +442,16 @@ bool isListElement(parquet::schema::Node & node)
node.name().ends_with("_tuple");
}
SelectiveColumnReaderPtr createColumnReaderRecursive(const RowGroupContext& context, parquet::schema::NodePtr node, int def_level, int rep_level, bool condition_column, const ColumnFilterPtr & filter, const DataTypePtr & target_type)
std::shared_ptr<parquet::schema::GroupNode> checkAndGetGroupNode(parquet::schema::NodePtr node)
{
if (!node)
return nullptr;
if (!node->is_group())
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "need group node");
return std::static_pointer_cast<parquet::schema::GroupNode>(node);
}
SelectiveColumnReaderPtr ColumnReaderBuilder::buildReader(parquet::schema::NodePtr node, const DataTypePtr & target_type, int def_level, int rep_level)
{
if (node->repetition() == parquet::Repetition::UNDEFINED)
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Undefined repetition level");
@ -450,9 +461,10 @@ SelectiveColumnReaderPtr createColumnReaderRecursive(const RowGroupContext& cont
rep_level++;
if (node->is_primitive())
{
auto full_name = node->path()->ToDotString();
int column_idx = context.parquet_reader->metaData().schema()->ColumnIndex(*node);
RowGroupPrefetchPtr row_group_prefetch;
if (condition_column)
if (predicate_columns.contains(full_name))
row_group_prefetch = context.prefetch_conditions;
else
row_group_prefetch = context.prefetch;
@ -473,35 +485,92 @@ SelectiveColumnReaderPtr createColumnReaderRecursive(const RowGroupContext& cont
};
const auto * column_desc = context.parquet_reader->metaData().schema()->Column(column_idx);
return ParquetColumnReaderFactory::builder()
.nullable(node->is_optional())
.dictionary(context.row_group_meta->ColumnChunk(column_idx)->has_dictionary_page())
.columnDescriptor(column_desc)
.pageReader(std::move(creator))
.targetType(target_type)
.filter(filter)
.build();
auto leaf_reader = ParquetColumnReaderFactory::builder()
.nullable(node->is_optional())
.dictionary(context.row_group_meta->ColumnChunk(column_idx)->has_dictionary_page())
.columnDescriptor(column_desc)
.pageReader(std::move(creator))
.targetType(target_type)
.filter(inplace_filter_mapping.contains(full_name) ? inplace_filter_mapping.at(full_name) : nullptr)
.build();
return leaf_reader;
}
else if (node->converted_type() == parquet::ConvertedType::LIST)
{
auto group_node = std::static_pointer_cast<parquet::schema::GroupNode>(node);
auto group_node = checkAndGetGroupNode(node);
if (group_node->field_count() != 1)
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "List group node must have exactly one field");
auto repeated_field = group_node->field(0);
if (isListElement(*repeated_field))
{
const auto * array_type = checkAndGetDataType<DataTypeArray>(target_type.get());
auto reader = createColumnReaderRecursive(context, repeated_field, def_level, rep_level, condition_column, nullptr, array_type->getNestedType());
auto reader = buildReader(repeated_field, array_type->getNestedType(), def_level, rep_level);
return std::make_shared<ListColumnReader>(rep_level, def_level, reader);
}
else
{
auto child_field = std::static_pointer_cast<parquet::schema::GroupNode>(repeated_field)->field(0);
const auto * array_type = checkAndGetDataType<DataTypeArray>(target_type.get());
auto reader = createColumnReaderRecursive(context, child_field, def_level+1, rep_level+1, condition_column, nullptr, array_type->getNestedType());
auto reader = buildReader(child_field, array_type->getNestedType(), def_level, rep_level);
return std::make_shared<ListColumnReader>(rep_level, def_level, reader);
}
}
return DB::SelectiveColumnReaderPtr();
else if (node->converted_type() == parquet::ConvertedType::MAP || node->converted_type() == parquet::ConvertedType::MAP_KEY_VALUE)
{
auto map_node = checkAndGetGroupNode(node);
if (map_node->field_count() != 1)
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Map group node must have exactly one field");
auto key_value_node = checkAndGetGroupNode(map_node->field(0));
if (key_value_node->field_count() != 2)
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Map key-value group node must have exactly two fields");
if (!key_value_node->field(0)->is_primitive())
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Map key field must be primitive");
const auto& map_type = checkAndGetDataType<DataTypeMap>(*target_type);
auto key_value_types = checkAndGetDataType<DataTypeTuple>(*checkAndGetDataType<DataTypeArray>(*map_type.getNestedType()).getNestedType()).getElements();
auto key_reader = buildReader(key_value_node->field(0), key_value_types.front(), def_level+1, rep_level+1);
auto value_reader = buildReader(key_value_node->field(1), key_value_types.back(), def_level+1, rep_level+1);
return std::make_shared<MapColumnReader>(rep_level, def_level, key_reader, value_reader);
}
// Structure type
else
{
auto struct_node = checkAndGetGroupNode(node);
const auto *struct_type = checkAndGetDataType<DataTypeTuple>(target_type.get());
if (!struct_type)
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Target type for node {} must be DataTypeTuple", struct_node->name());
auto names = struct_type->getElementNames();
int child_num = struct_node->field_count();
std::unordered_map<String, SelectiveColumnReaderPtr> readers;
for (const auto& name : names)
{
for (int i = 0; i < child_num; ++i)
{
if (struct_node->field(i)->name() == name)
{
auto child_field = struct_node->field(i);
auto child_type = struct_type->getElements().at(i);
auto reader = buildReader(child_field, child_type, def_level, rep_level);
readers.emplace(name, reader);
}
}
if (!readers.contains(name))
{
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "{} not found in struct node {}", name, struct_node->name());
}
}
return std::make_shared<StructColumnReader>(readers, target_type);
}
}
ColumnReaderBuilder::ColumnReaderBuilder(
const Block & requiredColumns_,
const RowGroupContext & context_,
const std::unordered_map<String, ColumnFilterPtr> & inplaceFilterMapping_,
const std::unordered_set<String> & predicateColumns_)
: required_columns(requiredColumns_)
, context(context_)
, inplace_filter_mapping(inplaceFilterMapping_)
, predicate_columns(predicateColumns_)
{
}
}

View File

@ -48,6 +48,20 @@ public:
static Builder builder();
};
SelectiveColumnReaderPtr createColumnReaderRecursive(const RowGroupContext& context, parquet::schema::NodePtr node, int def_level, int rep_level, bool condition_column, const ColumnFilterPtr & filter, const DataTypePtr & target_type);
class ColumnReaderBuilder
{
public:
ColumnReaderBuilder(
const Block & requiredColumns,
const RowGroupContext & context,
const std::unordered_map<String, ColumnFilterPtr> & inplaceFilterMapping,
const std::unordered_set<String> & predicateColumns);
SelectiveColumnReaderPtr buildReader(parquet::schema::NodePtr node, const DataTypePtr & target_type, int def_level = 0, int rep_level = 0);
private:
const Block& required_columns;
const RowGroupContext& context;
const std::unordered_map<String, ColumnFilterPtr>& inplace_filter_mapping;
const std::unordered_set<String>& predicate_columns;
};
}

View File

@ -223,6 +223,8 @@ RowGroupChunkReader::RowGroupChunkReader(
context.prefetch_conditions = prefetch_conditions;
context.filter_columns = filter_columns;
remain_rows = row_group_meta->num_rows();
builder = std::make_unique<ColumnReaderBuilder>(parquet_reader->header, context, parquet_reader->filters, parquet_reader->condition_columns);
for (const auto & col_with_name : parquet_reader->header)
{
if (!parquetReader->parquet_columns.contains(col_with_name.name))
@ -231,61 +233,7 @@ RowGroupChunkReader::RowGroupChunkReader(
const auto & node = parquetReader->parquet_columns.at(col_with_name.name);
SelectiveColumnReaderPtr column_reader;
auto filter = filters.contains(col_with_name.name) ? filters.at(col_with_name.name) : nullptr;
bool condition = parquet_reader->condition_columns.contains(col_with_name.name);
column_reader = createColumnReaderRecursive(context, node, 0, 0, condition, filter, col_with_name.type);
// if (!node->is_primitive())
// throw Exception(ErrorCodes::NOT_IMPLEMENTED, "arrays and maps are not implemented in native parquet reader");
//
// auto idx = parquet_reader->meta_data->schema()->ColumnIndex(*node);
// auto filter = filters.contains(col_with_name.name) ? filters.at(col_with_name.name) : nullptr;
// remain_rows = row_group_meta->ColumnChunk(idx)->num_values();
// SelectiveColumnReaderPtr column_reader;
// if (parquet_reader->condition_columns.contains(col_with_name.name))
// {
// PageReaderCreator creator = [&, idx]
// {
// Stopwatch time;
// auto data = prefetch_conditions->readRange(getColumnRange(*row_group_meta->ColumnChunk(idx)));
// auto page_reader = std::make_unique<LazyPageReader>(
// std::make_shared<ReadBufferFromMemory>(reinterpret_cast<char *>(data.data), data.size),
// parquet_reader->properties,
// remain_rows,
// row_group_meta->ColumnChunk(idx)->compression());
// ProfileEvents::increment(ProfileEvents::ParquetFetchWaitTimeMicroseconds, time.elapsedMicroseconds());
// return page_reader;
// };
// const auto * column_desc = parquet_reader->meta_data->schema()->Column(idx);
// column_reader = ParquetColumnReaderFactory::builder()
// .nullable(node->is_optional())
// .dictionary(row_group_meta->ColumnChunk(idx)->has_dictionary_page())
// .columnDescriptor(column_desc)
// .pageReader(std::move(creator))
// .targetType(col_with_name.type)
// .filter(filter)
// .build();
// }
// else
// {
// PageReaderCreator creator = [&, idx]
// {
// auto data = prefetch->readRange(getColumnRange(*row_group_meta->ColumnChunk(idx)));
// auto page_reader = std::make_unique<LazyPageReader>(
// std::make_shared<ReadBufferFromMemory>(reinterpret_cast<char *>(data.data), data.size),
// parquet_reader->properties,
// remain_rows,
// row_group_meta->ColumnChunk(idx)->compression());
// return page_reader;
// };
// const auto * column_desc = parquet_reader->meta_data->schema()->Column(idx);
// column_reader = ParquetColumnReaderFactory::builder()
// .nullable(node->is_optional())
// .dictionary(row_group_meta->ColumnChunk(idx)->has_dictionary_page())
// .columnDescriptor(column_desc)
// .pageReader(std::move(creator))
// .targetType(col_with_name.type)
// .filter(filter)
// .build();
// }
column_reader = builder->buildReader(node, col_with_name.type, 0, 0);
column_readers.push_back(column_reader);
reader_columns_mapping[col_with_name.name] = column_reader;
if (filter)

View File

@ -1,5 +1,6 @@
#pragma once
#include <Processors/Formats/Impl/Parquet/SelectiveColumnReader.h>
#include <Processors/Formats/Impl/Parquet/ParquetColumnReaderFactory.h>
#include <Common/threadPoolCallbackRunner.h>
namespace DB
@ -125,5 +126,6 @@ private:
ReadMetrics metrics;
std::unique_ptr<SelectConditions> selectConditions;
RowGroupContext context;
std::unique_ptr<ColumnReaderBuilder> builder;
};
}

View File

@ -2,10 +2,13 @@
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnMap.h>
#include <Columns/ColumnTuple.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeTuple.h>
#include <Processors/Formats/Impl/Parquet/ParquetColumnReaderFactory.h>
#include <Processors/Formats/Impl/Parquet/ParquetReader.h>
#include <Common/assert_cast.h>
@ -1371,4 +1374,105 @@ size_t ListColumnReader::skipValuesInCurrentPage(size_t )
{
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "unimplemented operation");
}
void MapColumnReader::read(MutableColumnPtr & column, OptionalRowSet & row_set, size_t rows_to_read)
{
ColumnNullable* null_column = nullptr;
if (column->isNullable())
{
null_column = static_cast<ColumnNullable *>(column.get());
column = null_column->getNestedColumnPtr()->assumeMutable();
}
if (!checkColumn<ColumnMap>(*column))
{
throw DB::Exception(ErrorCodes::PARQUET_EXCEPTION, "column type should be map, but is {}", column->getName());
}
ColumnMap* map_column = static_cast<ColumnMap *>(column.get());
ColumnArray* nested_column = &map_column->getNestedColumn();
auto data_column = nested_column->getDataPtr();
const ColumnTuple & tuple_col = checkAndGetColumn<ColumnTuple>(*data_column);
if (tuple_col.getColumns().size() !=2)
{
throw DB::Exception(ErrorCodes::PARQUET_EXCEPTION, "map column should have 2 columns, but has {}", tuple_col.getColumns().size());
}
auto key_column = tuple_col.getColumns()[0]->assumeMutable();
auto value_column = tuple_col.getColumns()[1]->assumeMutable();
if (!row_set)
{
auto & offsets = nested_column->getOffsets();
auto old_size = offsets.size();
offsets.reserve(old_size + rows_to_read);
size_t count = 0;
const auto & rep_levels = key_reader->getRepetitionLevels();
size_t start = rep_levels.size() - key_reader->currentRemainRows();
// generate offsets;
int array_size = 0;
while (offsets.size() - old_size < rows_to_read)
{
size_t idx = start + count;
if (idx >= rep_levels.size())
break;
if (rep_levels[idx] <= rep_level)
{
if (count)
{
offsets.push_back(offsets.back() + array_size);
array_size = 0;
}
array_size++;
}
else
array_size ++;
count++;
}
offsets.push_back(offsets.back() + array_size);
OptionalRowSet filter;
key_reader->read(key_column, filter, offsets.back());
value_reader->read(value_column, filter, offsets.back());
}
}
void MapColumnReader::computeRowSet(std::optional<RowSet> & , size_t )
{
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "unsupported operation");
}
MutableColumnPtr MapColumnReader::createColumn()
{
MutableColumns columns;
columns.push_back(key_reader->createColumn());
columns.push_back(value_reader->createColumn());
MutableColumnPtr tuple = ColumnTuple::create(std::move(columns));
MutableColumnPtr array = ColumnArray::create(std::move(tuple));
return ColumnMap::create(std::move(array));
}
size_t MapColumnReader::skipValuesInCurrentPage(size_t )
{
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "unimplemented operation");
}
void StructColumnReader::read(MutableColumnPtr & column, OptionalRowSet & row_set, size_t rows_to_read)
{
checkColumn<ColumnTuple>(*column);
ColumnTuple * tuple_column = static_cast<ColumnTuple*>(column.get());
const auto *tuple_type = checkAndGetDataType<DataTypeTuple>(structType.get());
auto names = tuple_type->getElementNames();
for (size_t i = 0; i < names.size(); i++)
{
auto nested_column = tuple_column->getColumn(i).assumeMutable();
auto & nested_reader = children.at(names.at(i));
nested_reader->read(nested_column, row_set, rows_to_read);
}
}
void StructColumnReader::computeRowSet(std::optional<RowSet> & , size_t )
{
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "unsupported operation");
}
MutableColumnPtr StructColumnReader::createColumn()
{
return structType->createColumn();
}
size_t StructColumnReader::skipValuesInCurrentPage(size_t )
{
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "unimplemented operation");
}
}

View File

@ -475,4 +475,51 @@ private:
int16_t def_level = 0;
int16_t rep_level = 0;
};
class MapColumnReader : public SelectiveColumnReader
{
public:
MapColumnReader(int16_t rep_level_, int16_t def_level_, const SelectiveColumnReaderPtr key_, const SelectiveColumnReaderPtr value_)
: SelectiveColumnReader(nullptr, ScanSpec{}), key_reader(key_), value_reader(value_), def_level(def_level_), rep_level(rep_level_)
{
}
~MapColumnReader() override = default;
void read(MutableColumnPtr & column, OptionalRowSet & row_set, size_t rows_to_read) override;
int16_t maxDefinitionLevel() const override { return def_level; }
int16_t maxRepetitionLevel() const override { return rep_level; }
void computeRowSet(std::optional<RowSet> & row_set, size_t rows_to_read) override;
MutableColumnPtr createColumn() override;
size_t skipValuesInCurrentPage(size_t rows_to_skip) override;
private:
SelectiveColumnReaderPtr key_reader;
SelectiveColumnReaderPtr value_reader;
int16_t def_level = 0;
int16_t rep_level = 0;
};
class StructColumnReader : public SelectiveColumnReader
{
public:
StructColumnReader(const std::unordered_map<String, SelectiveColumnReaderPtr> & children_, DataTypePtr structType_)
: SelectiveColumnReader(nullptr, ScanSpec{}), children(children_), structType(structType_)
{
}
~StructColumnReader() override = default;
void read(MutableColumnPtr & column, OptionalRowSet & row_set, size_t rows_to_read) override;
void computeRowSet(OptionalRowSet & row_set, size_t rows_to_read) override;
MutableColumnPtr createColumn() override;
size_t skipValuesInCurrentPage(size_t rows_to_skip) override;
private:
std::unordered_map<String, SelectiveColumnReaderPtr> children;
DataTypePtr structType;
};
}