diff --git a/src/Storages/MergeTree/IMergeTreeReader.cpp b/src/Storages/MergeTree/IMergeTreeReader.cpp index a2984421c2a..624de2886a8 100644 --- a/src/Storages/MergeTree/IMergeTreeReader.cpp +++ b/src/Storages/MergeTree/IMergeTreeReader.cpp @@ -248,7 +248,23 @@ void IMergeTreeReader::performRequiredConversions(Columns & res_columns) } } -void IMergeTreeReader::checkNumberOfColumns(size_t num_columns_to_read) +IMergeTreeReader::ColumnPosition IMergeTreeReader::findColumnForOffsets(const String & column_name) const +{ + String table_name = Nested::extractTableName(column_name); + for (const auto & part_column : data_part->getColumns()) + { + if (typeid_cast(part_column.type.get())) + { + auto position = data_part->getColumnPosition(part_column.name); + if (position && Nested::extractTableName(part_column.name) == table_name) + return position; + } + } + + return {}; +} + +void IMergeTreeReader::checkNumberOfColumns(size_t num_columns_to_read) const { if (num_columns_to_read != columns.size()) throw Exception("invalid number of columns passed to MergeTreeReader::readRows. " diff --git a/src/Storages/MergeTree/IMergeTreeReader.h b/src/Storages/MergeTree/IMergeTreeReader.h index 79f7860d1cc..90a43a61536 100644 --- a/src/Storages/MergeTree/IMergeTreeReader.h +++ b/src/Storages/MergeTree/IMergeTreeReader.h @@ -61,7 +61,7 @@ protected: /// Returns actual column type in part, which can differ from table metadata. NameAndTypePair getColumnFromPart(const NameAndTypePair & required_column) const; - void checkNumberOfColumns(size_t columns_num_to_read); + void checkNumberOfColumns(size_t columns_num_to_read) const; /// avg_value_size_hints are used to reduce the number of reallocations when creating columns of variable size. ValueSizeMap avg_value_size_hints; @@ -79,6 +79,9 @@ protected: const MergeTreeData & storage; MarkRanges all_mark_ranges; + using ColumnPosition = std::optional; + ColumnPosition findColumnForOffsets(const String & column_name) const; + friend class MergeTreeRangeReader::DelayedStream; private: diff --git a/src/Storages/MergeTree/MergeTreeReaderCompact.cpp b/src/Storages/MergeTree/MergeTreeReaderCompact.cpp index a63397b9b9c..5b84069cc2c 100644 --- a/src/Storages/MergeTree/MergeTreeReaderCompact.cpp +++ b/src/Storages/MergeTree/MergeTreeReaderCompact.cpp @@ -84,12 +84,11 @@ MergeTreeReaderCompact::MergeTreeReaderCompact( if (!position && typeid_cast(type.get())) { /// If array of Nested column is missing in part, - /// we have to read it's offsets if they exists. + /// we have to read its offsets if they exist. position = findColumnForOffsets(name); read_only_offsets[i] = (position != std::nullopt); } - column_positions[i] = std::move(position); } @@ -168,23 +167,6 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading, return read_rows; } -MergeTreeReaderCompact::ColumnPosition MergeTreeReaderCompact::findColumnForOffsets(const String & column_name) -{ - String table_name = Nested::extractTableName(column_name); - for (const auto & part_column : data_part->getColumns()) - { - if (typeid_cast(part_column.type.get())) - { - auto position = data_part->getColumnPosition(part_column.name); - if (position && Nested::extractTableName(part_column.name) == table_name) - return position; - } - } - - return {}; -} - - void MergeTreeReaderCompact::readData( const String & name, IColumn & column, const IDataType & type, size_t from_mark, size_t column_position, size_t rows_to_read, bool only_offsets) diff --git a/src/Storages/MergeTree/MergeTreeReaderCompact.h b/src/Storages/MergeTree/MergeTreeReaderCompact.h index 827306cd983..75d1da342fb 100644 --- a/src/Storages/MergeTree/MergeTreeReaderCompact.h +++ b/src/Storages/MergeTree/MergeTreeReaderCompact.h @@ -40,7 +40,6 @@ private: MergeTreeMarksLoader marks_loader; - using ColumnPosition = std::optional; /// Positions of columns in part structure. std::vector column_positions; /// Should we read full column or only it's offsets @@ -53,8 +52,6 @@ private: void readData(const String & name, IColumn & column, const IDataType & type, size_t from_mark, size_t column_position, size_t rows_to_read, bool only_offsets = false); - - ColumnPosition findColumnForOffsets(const String & column_name); }; } diff --git a/src/Storages/MergeTree/MergeTreeReaderInMemory.cpp b/src/Storages/MergeTree/MergeTreeReaderInMemory.cpp index 5e4c3e49e3b..ce6eb44a50b 100644 --- a/src/Storages/MergeTree/MergeTreeReaderInMemory.cpp +++ b/src/Storages/MergeTree/MergeTreeReaderInMemory.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include namespace DB @@ -24,10 +25,20 @@ MergeTreeReaderInMemory::MergeTreeReaderInMemory( std::move(settings_), {}) , part_in_memory(std::move(data_part_)) { + for (const auto & name_and_type : columns) + { + auto [name, type] = getColumnFromPart(name_and_type); + if (!part_in_memory->block.has(name) && typeid_cast(type.get())) + if (auto offset_position = findColumnForOffsets(name)) + positions_for_offsets[name] = *offset_position; + } } -size_t MergeTreeReaderInMemory::readRows(size_t from_mark, bool /* continue_reading */, size_t max_rows_to_read, Columns & res_columns) +size_t MergeTreeReaderInMemory::readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns) { + if (!continue_reading) + total_rows_read = 0; + size_t total_marks = data_part->index_granularity.getMarksCount(); if (from_mark >= total_marks) throw Exception("Mark " + toString(from_mark) + " is out of bound. Max mark: " @@ -41,34 +52,49 @@ size_t MergeTreeReaderInMemory::readRows(size_t from_mark, bool /* continue_read throw Exception("Cannot read data in MergeTreeReaderInMemory. Rows already read: " + toString(total_rows_read) + ". Rows in part: " + toString(part_rows), ErrorCodes::CANNOT_READ_ALL_DATA); + size_t rows_to_read = std::min(max_rows_to_read, part_rows - total_rows_read); auto column_it = columns.begin(); - size_t rows_read = 0; for (size_t i = 0; i < num_columns; ++i, ++column_it) { auto [name, type] = getColumnFromPart(*column_it); - if (!part_in_memory->block.has(name)) - continue; - const auto & block_column = part_in_memory->block.getByName(name).column; - if (total_rows_read == 0 && part_rows <= max_rows_to_read) - { - res_columns[i] = block_column; - rows_read = part_rows; - } - else + auto offsets_it = positions_for_offsets.find(name); + if (offsets_it != positions_for_offsets.end()) { + const auto & source_offsets = assert_cast( + *part_in_memory->block.getByPosition(offsets_it->second).column).getOffsets(); + if (res_columns[i] == nullptr) res_columns[i] = type->createColumn(); auto mutable_column = res_columns[i]->assumeMutable(); - rows_read = std::min(max_rows_to_read, part_rows - total_rows_read); - mutable_column->insertRangeFrom(*block_column, total_rows_read, rows_read); + auto & res_offstes = assert_cast(*mutable_column).getOffsets(); + for (size_t row = 0; row < rows_to_read; ++row) + res_offstes.push_back(source_offsets[total_rows_read + row]); + res_columns[i] = std::move(mutable_column); } + else if (part_in_memory->block.has(name)) + { + const auto & block_column = part_in_memory->block.getByName(name).column; + if (rows_to_read == part_rows) + { + res_columns[i] = block_column; + } + else + { + if (res_columns[i] == nullptr) + res_columns[i] = type->createColumn(); + + auto mutable_column = res_columns[i]->assumeMutable(); + mutable_column->insertRangeFrom(*block_column, total_rows_read, rows_to_read); + res_columns[i] = std::move(mutable_column); + } + } } - total_rows_read += rows_read; - return rows_read; + total_rows_read += rows_to_read; + return rows_to_read; } } diff --git a/src/Storages/MergeTree/MergeTreeReaderInMemory.h b/src/Storages/MergeTree/MergeTreeReaderInMemory.h index 6d64801682e..196fc53725a 100644 --- a/src/Storages/MergeTree/MergeTreeReaderInMemory.h +++ b/src/Storages/MergeTree/MergeTreeReaderInMemory.h @@ -29,6 +29,8 @@ public: private: size_t total_rows_read = 0; DataPartInMemoryPtr part_in_memory; + + std::unordered_map positions_for_offsets; }; } diff --git a/tests/queries/0_stateless/01130_in_memory_parts_nested.reference b/tests/queries/0_stateless/01130_in_memory_parts_nested.reference new file mode 100644 index 00000000000..abc233c46f4 --- /dev/null +++ b/tests/queries/0_stateless/01130_in_memory_parts_nested.reference @@ -0,0 +1,15 @@ +[0] +[0,0,0] +[0,0,0,0,0] +[0,0,0,0,0,0,0] +[0,0,0,0,0,0,0,0,0] +[0] +[0,2,4] +[0,2,4,6,8] +[0,2,4,6,8,10,12] +[0,2,4,6,8,10,12,14,16] +[0] [0] +[0,1,2] [0,2,4] +[0,1,2,3,4] [0,2,4,6,8] +[0,1,2,3,4,5,6] [0,2,4,6,8,10,12] +[0,1,2,3,4,5,6,7,8] [0,2,4,6,8,10,12,14,16] diff --git a/tests/queries/0_stateless/01130_in_memory_parts_nested.sql b/tests/queries/0_stateless/01130_in_memory_parts_nested.sql new file mode 100644 index 00000000000..c09593d01bc --- /dev/null +++ b/tests/queries/0_stateless/01130_in_memory_parts_nested.sql @@ -0,0 +1,16 @@ +-- Test 00576_nested_and_prewhere, but with in-memory parts. +DROP TABLE IF EXISTS nested; + +CREATE TABLE nested (x UInt64, filter UInt8, n Nested(a UInt64)) ENGINE = MergeTree ORDER BY x + SETTINGS min_rows_for_compact_part = 200000, min_rows_for_wide_part = 300000; + +INSERT INTO nested SELECT number, number % 2, range(number % 10) FROM system.numbers LIMIT 100000; + +ALTER TABLE nested ADD COLUMN n.b Array(UInt64); +SELECT DISTINCT n.b FROM nested PREWHERE filter; + +ALTER TABLE nested ADD COLUMN n.c Array(UInt64) DEFAULT arrayMap(x -> x * 2, n.a); +SELECT DISTINCT n.c FROM nested PREWHERE filter; +SELECT DISTINCT n.a, n.c FROM nested PREWHERE filter; + +DROP TABLE nested;