Fix named tuples output in ORC/Arrow/Parquet formats

This commit is contained in:
avogar 2022-05-23 14:21:08 +00:00
parent 008de5c779
commit ce4adb447f
6 changed files with 40 additions and 19 deletions

View File

@ -36,7 +36,6 @@
#include <boost/algorithm/string.hpp> #include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/case_conv.hpp> #include <boost/algorithm/string/case_conv.hpp>
/// UINT16 and UINT32 are processed separately, see comments in readColumnFromArrowColumn. /// UINT16 and UINT32 are processed separately, see comments in readColumnFromArrowColumn.
#define FOR_ARROW_NUMERIC_TYPES(M) \ #define FOR_ARROW_NUMERIC_TYPES(M) \
M(arrow::Type::UINT8, DB::UInt8) \ M(arrow::Type::UINT8, DB::UInt8) \

View File

@ -11,7 +11,6 @@
#include <Columns/ColumnLowCardinality.h> #include <Columns/ColumnLowCardinality.h>
#include <Columns/ColumnMap.h> #include <Columns/ColumnMap.h>
#include <Core/callOnTypeIndex.h> #include <Core/callOnTypeIndex.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeNullable.h> #include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesDecimal.h> #include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypeArray.h> #include <DataTypes/DataTypeArray.h>
@ -215,14 +214,16 @@ namespace DB
std::unordered_map<String, std::shared_ptr<arrow::Array>> & dictionary_values) std::unordered_map<String, std::shared_ptr<arrow::Array>> & dictionary_values)
{ {
const auto * column_tuple = assert_cast<const ColumnTuple *>(column.get()); const auto * column_tuple = assert_cast<const ColumnTuple *>(column.get());
const auto & nested_types = assert_cast<const DataTypeTuple *>(column_type.get())->getElements(); const auto * type_tuple = assert_cast<const DataTypeTuple *>(column_type.get());
const auto & nested_types = type_tuple->getElements();
const auto & nested_names = type_tuple->getElementNames();
arrow::StructBuilder & builder = assert_cast<arrow::StructBuilder &>(*array_builder); arrow::StructBuilder & builder = assert_cast<arrow::StructBuilder &>(*array_builder);
for (size_t i = 0; i != column_tuple->tupleSize(); ++i) for (size_t i = 0; i != column_tuple->tupleSize(); ++i)
{ {
ColumnPtr nested_column = column_tuple->getColumnPtr(i); ColumnPtr nested_column = column_tuple->getColumnPtr(i);
fillArrowArray(column_name + "." + std::to_string(i), nested_column, nested_types[i], null_bytemap, builder.field_builder(i), format_name, start, end, output_string_as_string, dictionary_values); fillArrowArray(column_name + "." + nested_names[i], nested_column, nested_types[i], null_bytemap, builder.field_builder(i), format_name, start, end, output_string_as_string, dictionary_values);
} }
for (size_t i = start; i != end; ++i) for (size_t i = start; i != end; ++i)
@ -661,14 +662,15 @@ namespace DB
if (isTuple(column_type)) if (isTuple(column_type))
{ {
const auto & nested_types = assert_cast<const DataTypeTuple *>(column_type.get())->getElements(); const auto & tuple_type = assert_cast<const DataTypeTuple *>(column_type.get());
const auto & nested_types = tuple_type->getElements();
const auto & nested_names = tuple_type->getElementNames();
const auto * tuple_column = assert_cast<const ColumnTuple *>(column.get()); const auto * tuple_column = assert_cast<const ColumnTuple *>(column.get());
std::vector<std::shared_ptr<arrow::Field>> nested_fields; std::vector<std::shared_ptr<arrow::Field>> nested_fields;
for (size_t i = 0; i != nested_types.size(); ++i) for (size_t i = 0; i != nested_types.size(); ++i)
{ {
String name = column_name + "." + std::to_string(i); auto nested_arrow_type = getArrowType(nested_types[i], tuple_column->getColumnPtr(i), nested_names[i], format_name, output_string_as_string, out_is_column_nullable);
auto nested_arrow_type = getArrowType(nested_types[i], tuple_column->getColumnPtr(i), name, format_name, output_string_as_string, out_is_column_nullable); nested_fields.push_back(std::make_shared<arrow::Field>(nested_names[i], nested_arrow_type, *out_is_column_nullable));
nested_fields.push_back(std::make_shared<arrow::Field>(name, nested_arrow_type, *out_is_column_nullable));
} }
return arrow::struct_(nested_fields); return arrow::struct_(nested_fields);
} }

View File

@ -55,7 +55,7 @@ ORCBlockOutputFormat::ORCBlockOutputFormat(WriteBuffer & out_, const Block & hea
data_types.push_back(recursiveRemoveLowCardinality(type)); data_types.push_back(recursiveRemoveLowCardinality(type));
} }
ORC_UNIQUE_PTR<orc::Type> ORCBlockOutputFormat::getORCType(const DataTypePtr & type, const std::string & column_name) ORC_UNIQUE_PTR<orc::Type> ORCBlockOutputFormat::getORCType(const DataTypePtr & type)
{ {
switch (type->getTypeId()) switch (type->getTypeId())
{ {
@ -106,12 +106,12 @@ ORC_UNIQUE_PTR<orc::Type> ORCBlockOutputFormat::getORCType(const DataTypePtr & t
} }
case TypeIndex::Nullable: case TypeIndex::Nullable:
{ {
return getORCType(removeNullable(type), column_name); return getORCType(removeNullable(type));
} }
case TypeIndex::Array: case TypeIndex::Array:
{ {
const auto * array_type = assert_cast<const DataTypeArray *>(type.get()); const auto * array_type = assert_cast<const DataTypeArray *>(type.get());
return orc::createListType(getORCType(array_type->getNestedType(), column_name)); return orc::createListType(getORCType(array_type->getNestedType()));
} }
case TypeIndex::Decimal32: case TypeIndex::Decimal32:
{ {
@ -131,21 +131,19 @@ ORC_UNIQUE_PTR<orc::Type> ORCBlockOutputFormat::getORCType(const DataTypePtr & t
case TypeIndex::Tuple: case TypeIndex::Tuple:
{ {
const auto * tuple_type = assert_cast<const DataTypeTuple *>(type.get()); const auto * tuple_type = assert_cast<const DataTypeTuple *>(type.get());
const auto & nested_names = tuple_type->getElementNames();
const auto & nested_types = tuple_type->getElements(); const auto & nested_types = tuple_type->getElements();
auto struct_type = orc::createStructType(); auto struct_type = orc::createStructType();
for (size_t i = 0; i < nested_types.size(); ++i) for (size_t i = 0; i < nested_types.size(); ++i)
{ struct_type->addStructField(nested_names[i], getORCType(nested_types[i]));
String name = column_name + "." + std::to_string(i);
struct_type->addStructField(name, getORCType(nested_types[i], name));
}
return struct_type; return struct_type;
} }
case TypeIndex::Map: case TypeIndex::Map:
{ {
const auto * map_type = assert_cast<const DataTypeMap *>(type.get()); const auto * map_type = assert_cast<const DataTypeMap *>(type.get());
return orc::createMapType( return orc::createMapType(
getORCType(map_type->getKeyType(), column_name), getORCType(map_type->getKeyType()),
getORCType(map_type->getValueType(), column_name) getORCType(map_type->getValueType())
); );
} }
default: default:
@ -514,7 +512,7 @@ void ORCBlockOutputFormat::prepareWriter()
options.setCompression(orc::CompressionKind::CompressionKind_NONE); options.setCompression(orc::CompressionKind::CompressionKind_NONE);
size_t columns_count = header.columns(); size_t columns_count = header.columns();
for (size_t i = 0; i != columns_count; ++i) for (size_t i = 0; i != columns_count; ++i)
schema->addStructField(header.safeGetByPosition(i).name, getORCType(recursiveRemoveLowCardinality(data_types[i]), header.safeGetByPosition(i).name)); schema->addStructField(header.safeGetByPosition(i).name, getORCType(recursiveRemoveLowCardinality(data_types[i])));
writer = orc::createWriter(*schema, &output_stream, options); writer = orc::createWriter(*schema, &output_stream, options);
} }

View File

@ -42,7 +42,7 @@ private:
void consume(Chunk chunk) override; void consume(Chunk chunk) override;
void finalizeImpl() override; void finalizeImpl() override;
ORC_UNIQUE_PTR<orc::Type> getORCType(const DataTypePtr & type, const std::string & column_name); ORC_UNIQUE_PTR<orc::Type> getORCType(const DataTypePtr & type);
/// ConvertFunc is needed for type UInt8, because firstly UInt8 (char8_t) must be /// ConvertFunc is needed for type UInt8, because firstly UInt8 (char8_t) must be
/// converted to unsigned char (bugprone-signed-char-misuse in clang). /// converted to unsigned char (bugprone-signed-char-misuse in clang).

View File

@ -0,0 +1,9 @@
(1,2)
(2,3)
(3,4)
(1,2)
(2,3)
(3,4)
(1,2)
(2,3)
(3,4)

View File

@ -0,0 +1,13 @@
create table parquet_02312 (x Tuple(a UInt32, b UInt32)) engine=File(Parquet);
insert into parquet_02312 values ((1,2)), ((2,3)), ((3,4));
select * from parquet_02312;
drop table parquet_02312;
create table parquet_02312 (x Tuple(a UInt32, b UInt32)) engine=File(Arrow);
insert into parquet_02312 values ((1,2)), ((2,3)), ((3,4));
select * from parquet_02312;
drop table parquet_02312;
create table parquet_02312 (x Tuple(a UInt32, b UInt32)) engine=File(ORC);
insert into parquet_02312 values ((1,2)), ((2,3)), ((3,4));
select * from parquet_02312;
drop table parquet_02312;