fix nested and subcolumns

This commit is contained in:
Anton Popov 2020-12-07 22:02:26 +03:00
parent d7aad3bf79
commit 06d5b87bc9
7 changed files with 54 additions and 19 deletions

View File

@ -86,7 +86,7 @@ Block flatten(const Block & block)
for (const auto & elem : block)
{
const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(elem.type.get());
if (!isNested(elem.type) && type_arr)
if (type_arr)
{
const DataTypeTuple * type_tuple = typeid_cast<const DataTypeTuple *>(type_arr->getNestedType().get());
if (type_tuple && type_tuple->haveExplicitNames())
@ -130,12 +130,14 @@ Block flatten(const Block & block)
return res;
}
NamesAndTypesList collect(const NamesAndTypesList & names_and_types)
namespace
{
NamesAndTypesList res = names_and_types;
std::map<std::string, NamesAndTypesList> nested;
using NameToDataType = std::map<String, DataTypePtr>;
NameToDataType getSubcolumnsOfNested(const NamesAndTypesList & names_and_types)
{
std::unordered_map<String, NamesAndTypesList> nested;
for (const auto & name_type : names_and_types)
{
const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(name_type.type.get());
@ -149,10 +151,36 @@ NamesAndTypesList collect(const NamesAndTypesList & names_and_types)
}
}
std::unordered_map<String, DataTypePtr> nested_types;
std::map<String, DataTypePtr> nested_types;
for (const auto & [name, elems] : nested)
nested_types.emplace(name, createNested(elems.getTypes(), elems.getNames()));
return nested_types;
}
}
/// Builds a new list from `names_and_types`:
///  - first, every entry whose name prefix (the first component returned by splitName)
///    is NOT a key of the map returned by getSubcolumnsOfNested, in the original order;
///  - then one (name, type) entry for each element of that map, in the map's iteration order.
NamesAndTypesList collect(const NamesAndTypesList & names_and_types)
{
    const auto collected_nested = getSubcolumnsOfNested(names_and_types);

    NamesAndTypesList result;

    /// Keep only the columns that were not folded into one of the nested types.
    for (const auto & column : names_and_types)
    {
        const auto split = splitName(column.name);
        if (collected_nested.count(split.first) == 0)
            result.push_back(column);
    }

    /// Append the nested columns themselves.
    for (const auto & [nested_name, nested_type] : collected_nested)
        result.emplace_back(nested_name, nested_type);

    return result;
}
NamesAndTypesList convertToSubcolumns(const NamesAndTypesList & names_and_types)
{
auto nested_types = getSubcolumnsOfNested(names_and_types);
auto res = names_and_types;
for (auto & name_type : res)
{
auto split = splitName(name_type.name);

View File

@ -23,6 +23,9 @@ namespace Nested
/// Collect Array columns named in the form `column_name.element_name` into a single Array(Tuple(...)) column.
NamesAndTypesList collect(const NamesAndTypesList & names_and_types);
/// Convert old-style nested columns (single arrays with the same prefix: `n.a`, `n.b`, ...) to subcolumns of the data type Nested.
NamesAndTypesList convertToSubcolumns(const NamesAndTypesList & names_and_types);
/// Check that the sizes of arrays that are elements of nested data structures are equal.
void validateArraySizes(const Block & block);
}

View File

@ -51,7 +51,7 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartInMemory::getReader(
{
auto ptr = std::static_pointer_cast<const MergeTreeDataPartInMemory>(shared_from_this());
return std::make_unique<MergeTreeReaderInMemory>(
ptr, Nested::collect(columns_to_read), metadata_snapshot, mark_ranges, reader_settings);
ptr, columns_to_read, metadata_snapshot, mark_ranges, reader_settings);
}
IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartInMemory::getWriter(

View File

@ -50,7 +50,7 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartWide::getReader(
{
auto ptr = std::static_pointer_cast<const MergeTreeDataPartWide>(shared_from_this());
return std::make_unique<MergeTreeReaderWide>(
ptr, Nested::collect(columns_to_read), metadata_snapshot, uncompressed_cache,
ptr, Nested::convertToSubcolumns(columns_to_read), metadata_snapshot, uncompressed_cache,
mark_cache, mark_ranges, reader_settings,
avg_value_size_hints, profile_callback);
}

View File

@ -95,7 +95,7 @@ public:
BufferSource(const Names & column_names_, StorageBuffer::Buffer & buffer_, const StorageBuffer & storage, const StorageMetadataPtr & metadata_snapshot)
: SourceWithProgress(
metadata_snapshot->getSampleBlockForColumns(column_names_, storage.getVirtuals(), storage.getStorageID()))
, column_names(column_names_.begin(), column_names_.end())
, column_names_and_types(metadata_snapshot->getColumns().getAllWithSubcolumns().addTypes(column_names_))
, buffer(buffer_) {}
String getName() const override { return "Buffer"; }
@ -115,10 +115,16 @@ protected:
return res;
Columns columns;
columns.reserve(column_names.size());
columns.reserve(column_names_and_types.size());
for (const auto & name : column_names)
columns.push_back(buffer.data.getByName(name).column);
for (const auto & elem : column_names_and_types)
{
const auto & current_column = buffer.data.getByName(elem.getStorageName()).column;
if (elem.isSubcolumn())
columns.emplace_back(elem.getStorageType()->getSubcolumn(elem.getSubcolumnName(), *current_column->assumeMutable()));
else
columns.emplace_back(std::move(current_column));
}
UInt64 size = columns.at(0)->size();
res.setColumns(std::move(columns), size);
@ -127,7 +133,7 @@ protected:
}
private:
Names column_names;
NamesAndTypesList column_names_and_types;
StorageBuffer::Buffer & buffer;
bool has_been_read = false;
};
@ -188,8 +194,8 @@ void StorageBuffer::read(
{
const auto & dest_columns = destination_metadata_snapshot->getColumns();
const auto & our_columns = metadata_snapshot->getColumns();
return dest_columns.hasPhysical(column_name) &&
dest_columns.get(column_name).type->equals(*our_columns.get(column_name).type);
return dest_columns.hasPhysicalOrSubcolumn(column_name) &&
dest_columns.getPhysicalOrSubcolumn(column_name).type->equals(*our_columns.getPhysicalOrSubcolumn(column_name).type);
});
if (dst_has_same_structure)

View File

@ -57,7 +57,6 @@ public:
for (const auto & name_type : columns)
res.insert({ name_type.type->createColumn(), name_type.type, name_type.name });
// return Nested::flatten(res);
return res;
}
@ -628,7 +627,7 @@ Pipe StorageLog::read(
loadMarks();
auto all_columns = metadata_snapshot->getColumns().getAllWithSubcolumns().addTypes(column_names);
all_columns = Nested::collect(all_columns);
all_columns = Nested::convertToSubcolumns(all_columns);
std::shared_lock<std::shared_mutex> lock(rwlock);

View File

@ -219,7 +219,6 @@ Chunk TinyLogSource::generate()
streams.clear();
}
// auto flatten = Nested::flatten(res);
return Chunk(res.getColumns(), res.rows());
}
@ -449,7 +448,7 @@ Pipe StorageTinyLog::read(
// When reading, we lock the entire storage, because we only have one file
// per column and can't modify it concurrently.
return Pipe(std::make_shared<TinyLogSource>(
max_block_size, Nested::collect(all_columns),
max_block_size, Nested::convertToSubcolumns(all_columns),
*this, context.getSettingsRef().max_read_buffer_size));
}