diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h index bf4eba1c354..010a97625d1 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.h +++ b/src/Storages/MergeTree/IMergeTreeDataPart.h @@ -524,9 +524,10 @@ private: /// Map from name of column to its serialization info. SerializationInfoByName serialization_infos; + /// Serializations for every columns and subcolumns by their names. SerializationByName serializations; - /// Columns description for more convenient access + /// Columns description for more convenient access /// to columns by name and getting subcolumns. ColumnsDescription columns_description; diff --git a/src/Storages/MergeTree/IMergeTreeReader.cpp b/src/Storages/MergeTree/IMergeTreeReader.cpp index 30969939622..8c861248580 100644 --- a/src/Storages/MergeTree/IMergeTreeReader.cpp +++ b/src/Storages/MergeTree/IMergeTreeReader.cpp @@ -40,6 +40,8 @@ IMergeTreeReader::IMergeTreeReader( , metadata_snapshot(metadata_snapshot_) , all_mark_ranges(all_mark_ranges_) , alter_conversions(storage.getAlterConversionsForPart(data_part)) + /// For wide parts convert plain arrays of Nested to subcolumns + /// to allow to use shared offset column from cache. , requested_columns(isWidePart(data_part) ? Nested::convertToSubcolumns(columns_) : columns_) , part_columns(isWidePart(data_part) ? Nested::collect(data_part->getColumns()) : data_part->getColumns()) { diff --git a/src/Storages/MergeTree/IMergeTreeReader.h b/src/Storages/MergeTree/IMergeTreeReader.h index 0d6c0e607cd..453563522a5 100644 --- a/src/Storages/MergeTree/IMergeTreeReader.h +++ b/src/Storages/MergeTree/IMergeTreeReader.h @@ -75,7 +75,10 @@ protected: /// Stores states for IDataType::deserializeBinaryBulk DeserializeBinaryBulkStateMap deserialize_binary_bulk_state_map; + /// Actual column names and types of columns in part, + /// which may differ from table metadata. NamesAndTypes columns_to_read; + /// Actual serialization of columns in part. Serializations serializations; UncompressedCache * uncompressed_cache; diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp index 712c3f74bdd..3d4aa0a7707 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp @@ -583,7 +583,7 @@ void MergeTreeDataPartWriterWide::finishDataSerialization(bool sync) { if (column.type->isValueRepresentedByNumber() && !column.type->haveSubtypes() - && data_part->getSerialization(columnn.name)->getKind() == ISerialization::Kind::DEFAULT) + && data_part->getSerialization(column.name)->getKind() == ISerialization::Kind::DEFAULT) { validateColumnOfFixedSize(column); } diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 5b54c1e6ae7..9b41c7bc623 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -459,8 +459,21 @@ static NameToNameVector collectFilesForRenames( const MutationCommands & commands_for_removes, const String & mrk_extension) { + /// Collect counts for shared streams of different columns. As an example, Nested columns have shared stream with array sizes. + std::unordered_map stream_counts; + for (const auto & column : source_part->getColumns()) + { + if (auto serialization = source_part->tryGetSerialization(column.name)) + { + serialization->enumerateStreams( + [&](const ISerialization::SubstreamPath & substream_path) + { + ++stream_counts[ISerialization::getFileNameForStream(column, substream_path)]; + }); + } + } + NameToNameVector rename_vector; - NameSet renamed_streams; /// Remove old data for (const auto & command : commands_for_removes) @@ -483,6 +496,22 @@ static NameToNameVector collectFilesForRenames( if (source_part->checksums.has(command.column_name + ".proj")) rename_vector.emplace_back(command.column_name + ".proj", ""); } + else if (command.type == MutationCommand::Type::DROP_COLUMN) + { + ISerialization::StreamCallback callback = [&](const ISerialization::SubstreamPath & substream_path) + { + String stream_name = ISerialization::getFileNameForStream({command.column_name, command.data_type}, substream_path); + /// Delete files if they are no longer shared with another column. + if (--stream_counts[stream_name] == 0) + { + rename_vector.emplace_back(stream_name + ".bin", ""); + rename_vector.emplace_back(stream_name + mrk_extension, ""); + } + }; + + if (auto serialization = source_part->tryGetSerialization(command.column_name)) + serialization->enumerateStreams(callback); + } else if (command.type == MutationCommand::Type::RENAME_COLUMN) { String escaped_name_from = escapeForFileName(command.column_name); @@ -495,7 +524,6 @@ static NameToNameVector collectFilesForRenames( if (stream_from != stream_to) { - renamed_streams.insert(stream_from); rename_vector.emplace_back(stream_from + ".bin", stream_to + ".bin"); rename_vector.emplace_back(stream_from + mrk_extension, stream_to + mrk_extension); } @@ -504,41 +532,38 @@ static NameToNameVector collectFilesForRenames( if (auto serialization = source_part->tryGetSerialization(command.column_name)) serialization->enumerateStreams(callback); } - } - - auto collect_all_stream_names = [&](const auto & data_part) - { - NameSet res; - for (const auto & column : data_part->getColumns()) + else if (command.type == MutationCommand::Type::READ_COLUMN) { - if (auto serialization = data_part->tryGetSerialization(column.name)) + /// Remove files for streams that exist in source_part, + /// but were removed in new_part by MODIFY COLUMN from + /// type with higher number of streams (e.g. LowCardinality -> String). + + auto collect_stream_names = [&](const auto & data_part) { - serialization->enumerateStreams( - [&](const ISerialization::SubstreamPath & substream_path) - { - res.insert(ISerialization::getFileNameForStream(column.name, substream_path)); - }); + NameSet res; + if (auto serialization = data_part->tryGetSerialization(command.column_name)) + { + serialization->enumerateStreams( + [&](const ISerialization::SubstreamPath & substream_path) + { + res.insert(ISerialization::getFileNameForStream(command.column_name, substream_path)); + }); + } + return res; + }; + + auto old_streams = collect_stream_names(source_part); + auto new_streams = collect_stream_names(new_part); + + for (const auto & old_stream : old_streams) + { + if (!new_streams.contains(old_stream)) + { + rename_vector.emplace_back(old_stream + ".bin", ""); + rename_vector.emplace_back(old_stream + mrk_extension, ""); + } } } - - return res; - }; - - /// Remove files for streams that exists in source part, - /// but were removed in new_part by DROP COLUMN - /// or MODIFY COLUMN from type with higher number of streams - /// (e.g. LowCardinality -> String). - - auto old_streams = collect_all_stream_names(source_part); - auto new_streams = collect_all_stream_names(new_part); - - for (const auto & old_stream : old_streams) - { - if (!new_streams.contains(old_stream) && !renamed_streams.contains(old_stream)) - { - rename_vector.emplace_back(old_stream + ".bin", ""); - rename_vector.emplace_back(old_stream + mrk_extension, ""); - } } return rename_vector;