try to fix Nested

This commit is contained in:
Anton Popov 2022-07-26 17:31:56 +00:00
parent 3d03b2714b
commit 9321ca34cf
7 changed files with 29 additions and 15 deletions

View File

@ -224,7 +224,11 @@ String ISerialization::getSubcolumnNameForStream(const SubstreamPath & path, siz
/// Stores `column` in the substreams cache under the subcolumn name derived from `path`.
/// Does nothing when no cache is supplied, when `path` is empty,
/// or when the name derived from `path` is empty.
void ISerialization::addToSubstreamsCache(SubstreamsCache * cache, const SubstreamPath & path, ColumnPtr column)
{
    /// Single guard: a null cache must never reach the dereference below.
    if (!cache || path.empty())
        return;

    /// Derive the name once and reuse it both for the emptiness check and as the cache key.
    auto subcolumn_name = getSubcolumnNameForStream(path);
    if (!subcolumn_name.empty())
        cache->emplace(subcolumn_name, column);
}

View File

@ -448,12 +448,7 @@ void IMergeTreeDataPart::setColumns(const NamesAndTypesList & new_columns)
for (const auto & column : columns)
column_name_to_position.emplace(column.name, pos++);
/// For wide parts convert plain arrays to Nested for
/// more convenient management of the shared offsets column.
if (part_type == Type::Wide)
columns_description = ColumnsDescription(Nested::collect(columns));
else
columns_description = ColumnsDescription(columns);
columns_description = ColumnsDescription(columns);
}
NameAndTypePair IMergeTreeDataPart::getColumn(const String & column_name) const

View File

@ -136,6 +136,7 @@ public:
/// Takes a new list of columns for the part.
/// NOTE(review): only the tail of the definition is visible here; it rebuilds
/// `column_name_to_position` and `columns_description` — confirm the rest against the .cpp.
void setColumns(const NamesAndTypesList & new_columns);
/// Returns the `columns` member by const reference.
const NamesAndTypesList & getColumns() const { return columns; }
/// Returns the `columns_description` member by const reference.
const ColumnsDescription & getColumnsDescription() const { return columns_description; }
/// Looks up a column by name and returns its name/type pair.
/// NOTE(review): behaviour for an unknown name is not visible here — presumably throws; confirm.
NameAndTypePair getColumn(const String & name) const;
/// Lookup variant that returns std::optional.
/// NOTE(review): presumably an empty optional when the column is absent — confirm.
std::optional<NameAndTypePair> tryGetColumn(const String & column_name) const;

View File

@ -55,16 +55,14 @@ public:
/// Returns the `columns` member by const reference.
const NamesAndTypesList & getColumns() const { return columns; }
/// Number of entries in `columns`.
size_t numColumnsInResult() const { return columns.size(); }
/// Returns `begin` of the first element of `all_mark_ranges`.
/// NOTE(review): `front()` is called unconditionally, so `all_mark_ranges` must be
/// non-empty when this is called — confirm that callers guarantee it.
size_t getFirstMarkToRead() const { return all_mark_ranges.front().begin; }
MergeTreeData::DataPartPtr data_part;
protected:
/// Returns actual column name in part, which can differ from table metadata.
String getColumnNameInPart(const NameAndTypePair & required_column) const;
/// Returns actual column name and type in part, which can differ from table metadata.
NameAndTypePair getColumnInPart(const NameAndTypePair & required_column) const;

View File

@ -49,7 +49,7 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartWide::getReader(
{
auto ptr = std::static_pointer_cast<const MergeTreeDataPartWide>(shared_from_this());
return std::make_unique<MergeTreeReaderWide>(
ptr, Nested::convertToSubcolumns(columns_to_read),
ptr, columns_to_read,
metadata_snapshot, uncompressed_cache,
mark_cache, mark_ranges, reader_settings,
avg_value_size_hints, profile_callback);
@ -66,7 +66,7 @@ IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartWide::getWriter(
{
return std::make_unique<MergeTreeDataPartWriterWide>(
shared_from_this(), data_part_storage_builder,
Nested::convertToSubcolumns(columns_list), metadata_snapshot, indices_to_recalc,
columns_list, metadata_snapshot, indices_to_recalc,
index_granularity_info.marks_file_extension,
default_codec_, writer_settings, computed_index_granularity);
}

View File

@ -4,6 +4,7 @@
#include <Columns/ColumnSparse.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypeNested.h>
#include <Interpreters/inplaceBlockConversions.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Storages/MergeTree/MergeTreeDataPartWide.h>
@ -60,6 +61,19 @@ MergeTreeReaderWide::MergeTreeReaderWide(
}
}
/// Chooses the key under which the substream cache for `column` is looked up.
///
/// A column that is not a subcolumn and has Array type is keyed by the first part of
/// its split name, provided the second part is non-empty and the part's columns
/// description reports that first part as Nested. Every other column is keyed by its
/// name in storage.
/// NOTE(review): presumably this lets arrays of one Nested structure share a single
/// cache entry (e.g. for common offsets) — confirm.
String MergeTreeReaderWide::getNameForSubstreamCache(const NameAndTypePair & column) const
{
    /// Subcolumns and non-array columns always use their storage name.
    if (column.isSubcolumn() || !isArray(column.type))
        return column.getNameInStorage();

    auto name_parts = Nested::splitName(column.name);
    const auto & columns_in_part = data_part->getColumnsDescription();

    /// No nested suffix, or the prefix is not a Nested column in this part.
    if (name_parts.second.empty() || !columns_in_part.hasNested(name_parts.first))
        return column.getNameInStorage();

    return name_parts.first;
}
size_t MergeTreeReaderWide::readRows(
size_t from_mark, size_t current_task_last_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns)
@ -86,7 +100,7 @@ size_t MergeTreeReaderWide::readRows(
auto column_from_part = getColumnInPart(*name_and_type);
try
{
auto & cache = caches[column_from_part.getNameInStorage()];
auto & cache = caches[getNameForSubstreamCache(column_from_part)];
prefetch(column_from_part, from_mark, continue_reading, current_task_last_mark, cache, prefetched_streams);
}
catch (Exception & e)
@ -117,7 +131,7 @@ size_t MergeTreeReaderWide::readRows(
try
{
size_t column_size_before_reading = column->size();
auto & cache = caches[column_from_part.getNameInStorage()];
auto & cache = caches[getNameForSubstreamCache(column_from_part)];
readData(
column_from_part, column, from_mark, continue_reading, current_task_last_mark,

View File

@ -38,6 +38,8 @@ public:
private:
FileStreams streams;
/// Returns the name used as the key into the per-column substream caches for `column`.
String getNameForSubstreamCache(const NameAndTypePair & column) const;
void addStreams(const NameAndTypePair & name_and_type,
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type);