Support reading Array(Record) into a flattened Nested table in Avro

This commit is contained in:
avogar 2022-08-23 11:05:02 +00:00
parent a3e4753d83
commit 581e569d04
6 changed files with 112 additions and 1 deletions

View File

@ -265,6 +265,18 @@ std::unordered_set<String> getAllTableNames(const Block & block, bool to_lower_c
}
return nested_table_names;
}
/// Returns the names of all columns in `block` for which extractTableName()
/// yields exactly `table_name`, in the order block.getNames() reports them.
/// The result is empty when no column matches.
Names getAllNestedColumnsForTable(const Block & block, const std::string & table_name)
{
    Names nested_columns;
    for (const auto & column_name : block.getNames())
    {
        /// Skip columns that belong to a different table.
        if (extractTableName(column_name) != table_name)
            continue;
        nested_columns.push_back(column_name);
    }
    return nested_columns;
}
}
NestedColumnExtractHelper::NestedColumnExtractHelper(const Block & block_, bool case_insentive_)

View File

@ -34,6 +34,9 @@ namespace Nested
/// Get all nested tables names from a block.
std::unordered_set<String> getAllTableNames(const Block & block, bool to_lower_case = false);
/// Get the names of all columns in a block that belong to the specified nested table.
Names getAllNestedColumnsForTable(const Block & block, const std::string & table_name);
}
/// Use this class to extract element columns from columns of nested type in a block, e.g. named Tuple.

View File

@ -25,6 +25,7 @@
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/IDataType.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/NestedUtils.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnFixedString.h>
@ -572,6 +573,39 @@ AvroDeserializer::SkipFn AvroDeserializer::createSkipFn(avro::NodePtr root_node)
}
}
/// Deserializes one Avro array value into all columns of a flattened Nested table.
///
/// Every column listed in `nested_column_indexes` is treated as a ColumnArray.
/// The array items are read from `decoder` exactly once: for each item the
/// deserializers in `nested_deserializers` are invoked in order, the j-th one
/// writing into the data column of the j-th nested column. Afterwards one new
/// offset (previous offset + number of items read) is appended to each column,
/// and each column is marked as read in `ext.read_columns`.
void AvroDeserializer::Action::deserializeNested(MutableColumns & columns, avro::Decoder & decoder, RowReadExtension & ext) const
{
    const size_t nested_count = nested_column_indexes.size();

    std::vector<IColumn *> data_columns;
    std::vector<ColumnArray::Offsets *> offsets_columns;
    data_columns.reserve(nested_count);
    offsets_columns.reserve(nested_count);

    for (size_t column_index : nested_column_indexes)
    {
        ColumnArray & array_column = assert_cast<ColumnArray &>(*columns[column_index]);
        offsets_columns.push_back(&array_column.getOffsets());
        data_columns.push_back(&array_column.getData());
        ext.read_columns[column_index] = true;
    }

    /// The Avro array is delivered in blocks: arrayStart() gives the size of the
    /// first block, arrayNext() the size of each following one, and 0 ends the array.
    size_t items_read = 0;
    size_t block_size = decoder.arrayStart();
    while (block_size != 0)
    {
        for (size_t item = 0; item < block_size; ++item)
        {
            for (size_t field = 0; field != nested_deserializers.size(); ++field)
                nested_deserializers[field](*data_columns[field], decoder);
        }

        items_read += block_size;
        block_size = decoder.arrayNext();
    }

    /// All nested columns receive the same number of elements for this row.
    for (auto * offsets : offsets_columns)
        offsets->push_back(offsets->back() + items_read);
}
static inline std::string concatPath(const std::string & a, const std::string & b)
{
return a.empty() ? b : a + "." + b;
@ -631,6 +665,42 @@ AvroDeserializer::Action AvroDeserializer::createAction(const Block & header, co
}
return AvroDeserializer::Action::unionAction(branch_actions);
}
else if (node->type() == avro::AVRO_ARRAY)
{
/// If header doesn't have column with current_path name and node is Array(Record),
/// check if we have a flattened Nested table with such name.
/// Any mismatch found below falls back to a Skip action for the whole Avro node.
Names nested_names = Nested::getAllNestedColumnsForTable(header, current_path);
/// Leaf 0 of the array node is inspected as the array's item schema.
auto nested_avro_node = node->leafAt(0);
/// No header columns under this path, or the array items are not records: skip the value.
if (nested_names.empty() || nested_avro_node->type() != avro::AVRO_RECORD)
return AvroDeserializer::Action(createSkipFn(node));
/// Check that all nested columns are Arrays.
/// While checking, remember the element type of every nested column,
/// keyed by the second component returned by Nested::splitName().
std::unordered_map<String, DataTypePtr> nested_types;
for (const auto & name : nested_names)
{
auto type = header.getByName(name).type;
if (!isArray(type))
return AvroDeserializer::Action(createSkipFn(node));
nested_types[Nested::splitName(name).second] = assert_cast<const DataTypeArray *>(type.get())->getNestedType();
}
/// Create nested deserializer for each nested column.
/// The two vectors are filled in lockstep: nested_deserializers[k] writes
/// into the header column at position nested_indexes[k].
std::vector<DeserializeFn> nested_deserializers;
std::vector<size_t> nested_indexes;
for (size_t i = 0; i != nested_avro_node->leaves(); ++i)
{
const auto & name = nested_avro_node->nameAt(i);
/// NOTE(review): a record field without a matching nested column makes the whole
/// array be skipped (not just that field) - confirm this is intended.
if (!nested_types.contains(name))
return AvroDeserializer::Action(createSkipFn(node));
size_t nested_column_index = header.getPositionByName(Nested::concatenateName(current_path, name));
column_found[nested_column_index] = true;
auto nested_deserializer = createDeserializeFn(nested_avro_node->leafAt(i), nested_types[name]);
nested_deserializers.emplace_back(nested_deserializer);
nested_indexes.push_back(nested_column_index);
}
/// Produces an Action of type Nested that reads all matched columns together.
return AvroDeserializer::Action(nested_indexes, nested_deserializers);
}
else
{
return AvroDeserializer::Action(createSkipFn(node));

View File

@ -37,13 +37,15 @@ public:
private:
/// Callable that receives a destination column and an Avro decoder.
using DeserializeFn = std::function<void(IColumn & column, avro::Decoder & decoder)>;
/// NOTE(review): same signature as DeserializeFn and not referenced in the visible code -
/// confirm whether this alias is still needed.
using DeserializeNestedFn = std::function<void(IColumn & column, avro::Decoder & decoder)>;
/// Callable that receives only an Avro decoder (no destination column).
using SkipFn = std::function<void(avro::Decoder & decoder)>;
/// Builds a DeserializeFn for the given Avro schema node and target data type.
DeserializeFn createDeserializeFn(avro::NodePtr root_node, DataTypePtr target_type);
/// Builds a SkipFn for the given Avro schema node.
SkipFn createSkipFn(avro::NodePtr root_node);
struct Action
{
enum Type {Noop, Deserialize, Skip, Record, Union};
enum Type {Noop, Deserialize, Skip, Record, Union, Nested};
Type type;
/// Deserialize
int target_column_idx;
@ -52,6 +54,9 @@ private:
SkipFn skip_fn;
/// Record | Union
std::vector<Action> actions;
/// For flattened Nested column
std::vector<size_t> nested_column_indexes;
std::vector<DeserializeFn> nested_deserializers;
Action() : type(Noop) {}
@ -65,6 +70,11 @@ private:
: type(Skip)
, skip_fn(skip_fn_) {}
Action(std::vector<size_t> nested_column_indexes_, std::vector<DeserializeFn> nested_deserializers_)
: type(Nested)
, nested_column_indexes(nested_column_indexes_)
, nested_deserializers(nested_deserializers_) {}
static Action recordAction(std::vector<Action> field_actions) { return Action(Type::Record, field_actions); }
static Action unionAction(std::vector<Action> branch_actions) { return Action(Type::Union, branch_actions); }
@ -87,6 +97,9 @@ private:
for (const auto & action : actions)
action.execute(columns, decoder, ext);
break;
case Nested:
deserializeNested(columns, decoder, ext);
break;
case Union:
auto index = decoder.decodeUnionIndex();
if (index >= actions.size())
@ -101,6 +114,8 @@ private:
Action(Type type_, std::vector<Action> actions_)
: type(type_)
, actions(actions_) {}
void deserializeNested(MutableColumns & columns, avro::Decoder & decoder, RowReadExtension & ext) const;
};
/// Populate actions by recursively traversing root schema

View File

@ -0,0 +1,2 @@
[1,2] ['aa','bb']
[1,5] [(2,['aa','bb']),(6,['ee','ff'])] [[(3,'cc'),(4,'dd')],[(7,'gg'),(8,'hh')]]

View File

@ -0,0 +1,9 @@
-- Tags: no-fasttest, no-parallel
set flatten_nested = 1;
-- Case 1: write a simple Nested(x, y) value to an Avro file and read it back
-- with a Nested structure while flatten_nested is enabled.
insert into function file(02405_data.avro) select [(1, 'aa'), (2, 'bb')]::Nested(x UInt32, y String) as nested settings engine_file_truncate_on_insert=1;
select * from file(02405_data.avro, auto, 'nested Nested(x UInt32, y String)');
-- Case 2: same round trip with a Tuple element and an inner Nested element.
-- The select must read the file written by the insert just above (02405_data.avro).
insert into function file(02405_data.avro) select [(1, (2, ['aa', 'bb']), [(3, 'cc'), (4, 'dd')]), (5, (6, ['ee', 'ff']), [(7, 'gg'), (8, 'hh')])]::Nested(x UInt32, y Tuple(y1 UInt32, y2 Array(String)), z Nested(z1 UInt32, z2 String)) as nested settings engine_file_truncate_on_insert=1;
select * from file(02405_data.avro, auto, 'nested Nested(x UInt32, y Tuple(y1 UInt32, y2 Array(String)), z Nested(z1 UInt32, z2 String))');