improve parquet struct field reading

This commit is contained in:
lgbo-ustc 2023-10-30 16:10:29 +08:00
parent f1c7665feb
commit 8334585eaf
2 changed files with 97 additions and 22 deletions

View File

@ -22,6 +22,7 @@ namespace ErrorCodes
extern const int THERE_IS_NO_COLUMN;
}
// This is only used for parquet now.
class ArrowFieldIndexUtil
{
public:
@ -61,30 +62,13 @@ public:
std::unordered_set<int> added_indices;
/// Flatten all named fields' index information into a map.
auto fields_indices = calculateFieldIndices(schema);
for (size_t i = 0; i < header.columns(); ++i)
for (size_t i = 0, n = header.columns(); i < n; ++i)
{
const auto & named_col = header.getByPosition(i);
std::string col_name = named_col.name;
if (ignore_case)
boost::to_lower(col_name);
/// Since all named fields are flattened into a map, we should find the column by name
/// in this map.
auto it = fields_indices.find(col_name);
if (it == fields_indices.end())
{
if (!allow_missing_columns)
throw Exception(
ErrorCodes::THERE_IS_NO_COLUMN, "Not found field ({}) in the following Arrow schema:\n{}\n", named_col.name, schema.ToString());
else
continue;
}
for (int j = 0; j < it->second.second; ++j)
{
auto index = it->second.first + j;
if (added_indices.insert(index).second)
required_indices.emplace_back(index);
}
findRequiredIndices(col_name, named_col.type, fields_indices, added_indices, required_indices);
}
return required_indices;
}
@ -146,9 +130,7 @@ private:
calculateFieldIndices(*sub_field, sub_field->name(), current_start_index, result, full_path_name);
}
}
else if (
field_type->id() == arrow::Type::LIST
&& static_cast<arrow::ListType *>(field_type.get())->value_type()->id() == arrow::Type::STRUCT)
else if (field_type->id() == arrow::Type::LIST)
{
// It is a nested table.
const auto * list_type = static_cast<arrow::ListType *>(field_type.get());
@ -159,12 +141,75 @@ private:
// rewrite it back to the original value.
index_info.first = index_snapshot;
}
else if (field_type->id() == arrow::Type::MAP)
{
const auto * map_type = static_cast<arrow::MapType *>(field_type.get());
auto index_snapshot = current_start_index;
current_start_index += countIndicesForType(map_type->key_type());
calculateFieldIndices(*map_type->item_field(), field_name, current_start_index, result, name_prefix);
index_info.first = index_snapshot;
}
else
{
current_start_index += countIndicesForType(field_type);
}
index_info.second = current_start_index - index_info.first;
}
/// Collects the flattened field indices needed to read column `name` of type `data_type`.
///
/// A Nullable wrapper is stripped first. Then:
///  - a tuple with explicit element names recurses into every element, using
///    Nested::concatenateName(name, element_name) as the element's lookup name
///    (element names are lower-cased when `ignore_case` is set);
///  - an array recurses into its nested type under the same `name`;
///  - a map recurses into its key type and then its value type, both under the same `name`;
///  - anything else (including a tuple without explicit names) is looked up by `name`
///    in `field_indices`, whose mapped value is read as (first index, number of indices).
///
/// Every index in the found range is appended to `required_indices` unless it is already
/// present in `added_indices`, so the output contains no duplicates and keeps first-seen order.
///
/// Throws THERE_IS_NO_COLUMN when `name` is absent from `field_indices` and
/// `allow_missing_columns` is false; when it is true the missing field is skipped silently.
void findRequiredIndices(
    const String & name,
    DataTypePtr data_type,
    const std::unordered_map<std::string, std::pair<int, int>> & field_indices,
    std::unordered_set<int> & added_indices,
    std::vector<int> & required_indices)
{
    const auto nested_type = removeNullable(data_type);
    if (const auto * type_tuple = typeid_cast<const DB::DataTypeTuple *>(nested_type.get()))
    {
        if (type_tuple->haveExplicitNames())
        {
            /// Bind by reference: copying the name and type vectors for every tuple
            /// visited during the recursion is unnecessary work.
            const auto & field_names = type_tuple->getElementNames();
            const auto & field_types = type_tuple->getElements();
            for (size_t i = 0, n = field_names.size(); i < n; ++i)
            {
                /// A copy is required here because the name may be lower-cased in place.
                auto field_name = field_names[i];
                if (ignore_case)
                    boost::to_lower(field_name);
                findRequiredIndices(
                    Nested::concatenateName(name, field_name), field_types[i], field_indices, added_indices, required_indices);
            }
            return;
        }
        /// Tuples without explicit names fall through to the plain lookup by `name` below.
    }
    else if (const auto * type_array = typeid_cast<const DB::DataTypeArray *>(nested_type.get()))
    {
        findRequiredIndices(name, type_array->getNestedType(), field_indices, added_indices, required_indices);
        return;
    }
    else if (const auto * type_map = typeid_cast<const DB::DataTypeMap *>(nested_type.get()))
    {
        /// NOTE(review): key and value are both resolved under the same `name`. If `field_indices`
        /// holds an entry for the whole map under that name, a non-nested key type would select
        /// the map's full index range - confirm against calculateFieldIndices.
        findRequiredIndices(name, type_map->getKeyType(), field_indices, added_indices, required_indices);
        findRequiredIndices(name, type_map->getValueType(), field_indices, added_indices, required_indices);
        return;
    }

    const auto it = field_indices.find(name);
    if (it == field_indices.end())
    {
        if (!allow_missing_columns)
            throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "Not found field ({})", name);
        return;
    }

    const auto & [first_index, index_count] = it->second;
    for (int j = 0; j < index_count; ++j)
    {
        const int index = first_index + j;
        /// insert() reports whether the index is new; only new indices are emitted.
        if (added_indices.insert(index).second)
            required_indices.emplace_back(index);
    }
}
};
}
#endif

View File

@ -0,0 +1,30 @@
<test>
<!-- Reads of individual nested fields from a Parquet file that contains tuple, array-of-tuple and map-of-tuple columns. -->
<!-- Fill step: writes 1000000 generateRandom rows to the file 'test_pq' in Parquet format. -->
<fill_query>
INSERT INTO FUNCTION file('test_pq', Parquet) SELECT * FROM generateRandom('int64_column Nullable(Int64), tuple_column Tuple(a Nullable(String), b Nullable(Float64), c Tuple(i UInt32, j UInt32)),array_tuple_column Array(Tuple(a Nullable(String), b Nullable(Float64), c Nullable(Int64))), map_tuple_column Map(String, Tuple(a Nullable(String), b Nullable(Float64), c Nullable(Int64)))') limit 1000000
</fill_query>
<!-- Explicit structure that declares only element a of tuple_column. -->
<query>
SELECT * FROM file('test_pq', Parquet, 'tuple_column Tuple(a Nullable(String))') Format Null
</query>
<!-- Subcolumn tuple_column.a selected without an explicit structure argument. -->
<query>
SELECT tuple_column.a FROM file('test_pq', Parquet) Format Null
</query>
<!-- Subcolumn tuple_column.a selected with an explicit structure that declares only element a. -->
<query>
SELECT tuple_column.a FROM file('test_pq', Parquet, 'tuple_column Tuple(a Nullable(String))') Format Null
</query>
<!-- Element i of the tuple c nested inside tuple_column. -->
<query>
SELECT tuple_column.c.i FROM file('test_pq', Parquet) Format Null
</query>
<!-- Array of tuples, with a structure that declares only element a. -->
<query>
SELECT * FROM file('test_pq', Parquet, 'array_tuple_column Array (Tuple(a Nullable(String)))') Format Null
</query>
<!-- Map with tuple values, with a structure that declares only element a of the value. -->
<query>
SELECT * FROM file('test_pq', Parquet, 'map_tuple_column Map(String, Tuple(a Nullable(String)))') Format Null
</query>
</test>