From 096616bd1f5f105a760243dfaec5f4493bccabeb Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Fri, 28 Jun 2024 02:28:02 +0000 Subject: [PATCH] introduce mutations snapshot --- src/Interpreters/MutationsInterpreter.cpp | 17 ++- src/Interpreters/MutationsInterpreter.h | 6 +- .../optimizeUseAggregateProjection.cpp | 2 +- .../optimizeUseNormalProjection.cpp | 2 +- .../Optimizations/projectionsCommon.cpp | 9 +- src/Processors/QueryPlan/PartsSplitter.cpp | 1 - .../QueryPlan/ReadFromMergeTree.cpp | 24 ++-- src/Processors/QueryPlan/ReadFromMergeTree.h | 10 +- src/Storages/MergeTree/AlterConversions.cpp | 9 +- src/Storages/MergeTree/AlterConversions.h | 3 +- src/Storages/MergeTree/MergeTask.cpp | 14 ++- src/Storages/MergeTree/MergeTask.h | 2 + src/Storages/MergeTree/MergeTreeData.cpp | 77 ++++++++---- src/Storages/MergeTree/MergeTreeData.h | 46 +++++-- .../MergeTree/MergeTreeDataSelectExecutor.cpp | 51 ++------ .../MergeTree/MergeTreeDataSelectExecutor.h | 8 +- .../MergeTree/MergeTreeMutationEntry.cpp | 12 +- .../MergeTree/MergeTreeMutationEntry.h | 2 +- .../MergeTree/MergeTreePrefetchedReadPool.cpp | 2 + .../MergeTree/MergeTreePrefetchedReadPool.h | 1 + src/Storages/MergeTree/MergeTreeReadPool.cpp | 2 + src/Storages/MergeTree/MergeTreeReadPool.h | 1 + .../MergeTree/MergeTreeReadPoolBase.cpp | 6 +- .../MergeTree/MergeTreeReadPoolBase.h | 4 + .../MergeTree/MergeTreeReadPoolInOrder.cpp | 2 + .../MergeTree/MergeTreeReadPoolInOrder.h | 1 + .../MergeTreeReadPoolParallelReplicas.cpp | 2 + .../MergeTreeReadPoolParallelReplicas.h | 1 + ...rgeTreeReadPoolParallelReplicasInOrder.cpp | 2 + ...MergeTreeReadPoolParallelReplicasInOrder.h | 1 + .../MergeTree/MergeTreeSequentialSource.cpp | 21 +++- .../MergeTree/MergeTreeSequentialSource.h | 2 + src/Storages/MergeTree/MutateTask.cpp | 27 ++-- src/Storages/MergeTree/RangesInDataPart.h | 6 +- .../MergeTree/ReplicatedMergeTreeLogEntry.h | 1 - .../MergeTree/ReplicatedMergeTreeQueue.cpp | 82 ++++++++---- .../MergeTree/ReplicatedMergeTreeQueue.h | 24 +++- .../MergeTree/StorageFromMergeTreeDataPart.h | 11 +- src/Storages/StorageMergeTree.cpp | 117 ++++++++++-------- src/Storages/StorageMergeTree.h | 24 +++- src/Storages/StorageReplicatedMergeTree.cpp | 6 +- src/Storages/StorageReplicatedMergeTree.h | 2 +- 42 files changed, 413 insertions(+), 230 deletions(-) diff --git a/src/Interpreters/MutationsInterpreter.cpp b/src/Interpreters/MutationsInterpreter.cpp index 6d3a4f30b34..fc15a20f992 100644 --- a/src/Interpreters/MutationsInterpreter.cpp +++ b/src/Interpreters/MutationsInterpreter.cpp @@ -145,6 +145,7 @@ ColumnDependencies getAllColumnDependencies( bool isStorageTouchedByMutations( MergeTreeData & storage, MergeTreeData::DataPartPtr source_part, + MergeTreeData::MutationsSnapshotPtr mutations_snapshot, const StorageMetadataPtr & metadata_snapshot, const std::vector & commands, ContextPtr context) @@ -181,7 +182,7 @@ bool isStorageTouchedByMutations( if (all_commands_can_be_skipped) return false; - auto storage_from_part = std::make_shared(source_part); + auto storage_from_part = std::make_shared(source_part, mutations_snapshot); std::optional interpreter_select_query; BlockIO io; @@ -283,8 +284,13 @@ MutationsInterpreter::Source::Source(StoragePtr storage_) : storage(std::move(st { } -MutationsInterpreter::Source::Source(MergeTreeData & storage_, MergeTreeData::DataPartPtr source_part_) - : data(&storage_), part(std::move(source_part_)) +MutationsInterpreter::Source::Source( + MergeTreeData & storage_, + MergeTreeData::DataPartPtr source_part_, + AlterConversionsPtr alter_conversions_) + : data(&storage_) + , part(std::move(source_part_)) + , alter_conversions(std::move(alter_conversions_)) { } @@ -384,13 +390,14 @@ MutationsInterpreter::MutationsInterpreter( MutationsInterpreter::MutationsInterpreter( MergeTreeData & storage_, MergeTreeData::DataPartPtr source_part_, + AlterConversionsPtr alter_conversions_, StorageMetadataPtr metadata_snapshot_, MutationCommands commands_, Names available_columns_, ContextPtr context_, Settings settings_) : MutationsInterpreter( - Source(storage_, std::move(source_part_)), + Source(storage_, std::move(source_part_), std::move(alter_conversions_)), std::move(metadata_snapshot_), std::move(commands_), std::move(available_columns_), std::move(context_), std::move(settings_)) { @@ -1210,7 +1217,7 @@ void MutationsInterpreter::Source::read( createReadFromPartStep( MergeTreeSequentialSourceType::Mutation, plan, *data, storage_snapshot, - part, required_columns, + part, alter_conversions, required_columns, apply_deleted_mask_, filter, context_, getLogger("MutationsInterpreter")); } diff --git a/src/Interpreters/MutationsInterpreter.h b/src/Interpreters/MutationsInterpreter.h index 6aaa233cda3..8ae438efc19 100644 --- a/src/Interpreters/MutationsInterpreter.h +++ b/src/Interpreters/MutationsInterpreter.h @@ -6,6 +6,7 @@ #include #include #include +#include "Storages/MergeTree/AlterConversions.h" namespace DB @@ -21,6 +22,7 @@ using QueryPipelineBuilderPtr = std::unique_ptr; bool isStorageTouchedByMutations( MergeTreeData & storage, MergeTreeData::DataPartPtr source_part, + MergeTreeData::MutationsSnapshotPtr mutations_snapshot, const StorageMetadataPtr & metadata_snapshot, const std::vector & commands, ContextPtr context @@ -71,6 +73,7 @@ public: MutationsInterpreter( MergeTreeData & storage_, MergeTreeData::DataPartPtr source_part_, + AlterConversionsPtr alter_conversions_, StorageMetadataPtr metadata_snapshot_, MutationCommands commands_, Names available_columns_, @@ -138,7 +141,7 @@ public: bool can_execute_) const; explicit Source(StoragePtr storage_); - Source(MergeTreeData & storage_, MergeTreeData::DataPartPtr source_part_); + Source(MergeTreeData & storage_, MergeTreeData::DataPartPtr source_part_, AlterConversionsPtr alter_conversions_); private: StoragePtr storage; @@ -146,6 +149,7 @@ public: /// Special case for *MergeTree. MergeTreeData * data = nullptr; MergeTreeData::DataPartPtr part; + AlterConversionsPtr alter_conversions; }; private: diff --git a/src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp b/src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp index 70327bc95b4..7e69734a7e5 100644 --- a/src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp +++ b/src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp @@ -764,7 +764,7 @@ std::optional optimizeUseAggregateProjections(QueryPlan::Node & node, Qu projection_reading = reader.readFromParts( /* parts = */ {}, - /* alter_conversions = */ {}, + reading->getMutationsSnapshot()->cloneEmpty(), best_candidate->dag->getRequiredColumnsNames(), proj_snapshot, projection_query_info, diff --git a/src/Processors/QueryPlan/Optimizations/optimizeUseNormalProjection.cpp b/src/Processors/QueryPlan/Optimizations/optimizeUseNormalProjection.cpp index 0af3869ccf1..43e60318004 100644 --- a/src/Processors/QueryPlan/Optimizations/optimizeUseNormalProjection.cpp +++ b/src/Processors/QueryPlan/Optimizations/optimizeUseNormalProjection.cpp @@ -199,7 +199,7 @@ std::optional optimizeUseNormalProjections(Stack & stack, QueryPlan::Nod auto projection_reading = reader.readFromParts( /*parts=*/ {}, - /*alter_conversions=*/ {}, + reading->getMutationsSnapshot()->cloneEmpty(), required_columns, proj_snapshot, query_info_copy, diff --git a/src/Processors/QueryPlan/Optimizations/projectionsCommon.cpp b/src/Processors/QueryPlan/Optimizations/projectionsCommon.cpp index af1578d6af8..e6939581b9e 100644 --- a/src/Processors/QueryPlan/Optimizations/projectionsCommon.cpp +++ b/src/Processors/QueryPlan/Optimizations/projectionsCommon.cpp @@ -217,20 +217,15 @@ bool analyzeProjectionCandidate( { MergeTreeData::DataPartsVector projection_parts; MergeTreeData::DataPartsVector normal_parts; - std::vector alter_conversions; + for (const auto & part_with_ranges : parts_with_ranges) { const auto & created_projections = part_with_ranges.data_part->getProjectionParts(); auto it = created_projections.find(candidate.projection->name); if (it != created_projections.end() && !it->second->is_broken) - { projection_parts.push_back(it->second); - } else - { normal_parts.push_back(part_with_ranges.data_part); - alter_conversions.push_back(part_with_ranges.alter_conversions); - } } if (projection_parts.empty()) @@ -255,7 +250,7 @@ bool analyzeProjectionCandidate( if (!normal_parts.empty()) { /// TODO: We can reuse existing analysis_result by filtering out projection parts - auto normal_result_ptr = reading.selectRangesToRead(std::move(normal_parts), std::move(alter_conversions)); + auto normal_result_ptr = reading.selectRangesToRead(std::move(normal_parts)); if (normal_result_ptr->selected_marks != 0) { diff --git a/src/Processors/QueryPlan/PartsSplitter.cpp b/src/Processors/QueryPlan/PartsSplitter.cpp index ed4b1906635..65f1c89f990 100644 --- a/src/Processors/QueryPlan/PartsSplitter.cpp +++ b/src/Processors/QueryPlan/PartsSplitter.cpp @@ -229,7 +229,6 @@ public: { ranges_in_data_parts.emplace_back( initial_ranges_in_data_parts[part_index].data_part, - initial_ranges_in_data_parts[part_index].alter_conversions, initial_ranges_in_data_parts[part_index].part_index_in_query, MarkRanges{mark_range}); part_index_to_initial_ranges_in_data_parts_index[it->second] = part_index; diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index aba3f6ff2da..e958430ff16 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -262,7 +262,7 @@ void ReadFromMergeTree::AnalysisResult::checkLimits(const Settings & settings, c ReadFromMergeTree::ReadFromMergeTree( MergeTreeData::DataPartsVector parts_, - std::vector alter_conversions_, + MergeTreeData::MutationsSnapshotPtr mutations_, Names all_column_names_, const MergeTreeData & data_, const SelectQueryInfo & query_info_, @@ -279,7 +279,7 @@ ReadFromMergeTree::ReadFromMergeTree( query_info_.prewhere_info)}, all_column_names_, query_info_, storage_snapshot_, context_) , reader_settings(getMergeTreeReaderSettings(context_, query_info_)) , prepared_parts(std::move(parts_)) - , alter_conversions_for_parts(std::move(alter_conversions_)) + , mutations_snapshot(std::move(mutations_)) , all_column_names(std::move(all_column_names_)) , data(data_) , actions_settings(ExpressionActionsSettings::fromContext(context_)) @@ -361,6 +361,7 @@ Pipe ReadFromMergeTree::readFromPoolParallelReplicas( auto pool = std::make_shared( std::move(extension), std::move(parts_with_range), + mutations_snapshot, shared_virtual_fields, storage_snapshot, prewhere_info, @@ -442,6 +443,7 @@ Pipe ReadFromMergeTree::readFromPool( { pool = std::make_shared( std::move(parts_with_range), + mutations_snapshot, shared_virtual_fields, storage_snapshot, prewhere_info, @@ -455,6 +457,7 @@ Pipe ReadFromMergeTree::readFromPool( { pool = std::make_shared( std::move(parts_with_range), + mutations_snapshot, shared_virtual_fields, storage_snapshot, prewhere_info, @@ -535,6 +538,7 @@ Pipe ReadFromMergeTree::readInOrder( std::move(extension), mode, parts_with_ranges, + mutations_snapshot, shared_virtual_fields, storage_snapshot, prewhere_info, @@ -550,6 +554,7 @@ Pipe ReadFromMergeTree::readInOrder( has_limit_below_one_block, read_type, parts_with_ranges, + mutations_snapshot, shared_virtual_fields, storage_snapshot, prewhere_info, @@ -1016,7 +1021,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder( } ranges_to_get_from_part = split_ranges(ranges_to_get_from_part, input_order_info->direction); - new_parts.emplace_back(part.data_part, part.alter_conversions, part.part_index_in_query, std::move(ranges_to_get_from_part)); + new_parts.emplace_back(part.data_part, part.part_index_in_query, std::move(ranges_to_get_from_part)); } splitted_parts_and_ranges.emplace_back(std::move(new_parts)); @@ -1243,7 +1248,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal( RangesInDataParts new_parts; for (auto part_it = parts_to_merge_ranges[range_index]; part_it != parts_to_merge_ranges[range_index + 1]; ++part_it) - new_parts.emplace_back(part_it->data_part, part_it->alter_conversions, part_it->part_index_in_query, part_it->ranges); + new_parts.emplace_back(part_it->data_part, part_it->part_index_in_query, part_it->ranges); if (new_parts.empty()) continue; @@ -1356,15 +1361,13 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal( ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(bool find_exact_ranges) const { - return selectRangesToRead(prepared_parts, alter_conversions_for_parts, find_exact_ranges); + return selectRangesToRead(prepared_parts, find_exact_ranges); } -ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead( - MergeTreeData::DataPartsVector parts, std::vector alter_conversions, bool find_exact_ranges) const +ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(MergeTreeData::DataPartsVector parts, bool find_exact_ranges) const { return selectRangesToRead( std::move(parts), - std::move(alter_conversions), metadata_for_reading, query_info, context, @@ -1534,7 +1537,6 @@ void ReadFromMergeTree::applyFilters(ActionDAGNodes added_filter_nodes) ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead( MergeTreeData::DataPartsVector parts, - std::vector alter_conversions, const StorageMetadataPtr & metadata_snapshot, const SelectQueryInfo & query_info_, ContextPtr context_, @@ -1596,10 +1598,9 @@ ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead( { MergeTreeDataSelectExecutor::filterPartsByPartition( + parts, indexes->partition_pruner, indexes->minmax_idx_condition, - parts, - alter_conversions, indexes->part_values, metadata_snapshot, data, @@ -1628,7 +1629,6 @@ ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead( auto reader_settings = getMergeTreeReaderSettings(context_, query_info_); result.parts_with_ranges = MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipIndexes( std::move(parts), - std::move(alter_conversions), metadata_snapshot, context_, indexes->key_condition, diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.h b/src/Processors/QueryPlan/ReadFromMergeTree.h index caa8aa2e1bd..57e19441b82 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.h +++ b/src/Processors/QueryPlan/ReadFromMergeTree.h @@ -110,7 +110,7 @@ public: ReadFromMergeTree( MergeTreeData::DataPartsVector parts_, - std::vector alter_conversions_, + MergeTreeData::MutationsSnapshotPtr mutations_snapshot_, Names all_column_names_, const MergeTreeData & data_, const SelectQueryInfo & query_info_, @@ -154,7 +154,6 @@ public: static AnalysisResultPtr selectRangesToRead( MergeTreeData::DataPartsVector parts, - std::vector alter_conversions, const StorageMetadataPtr & metadata_snapshot, const SelectQueryInfo & query_info, ContextPtr context, @@ -166,8 +165,7 @@ public: std::optional & indexes, bool find_exact_ranges); - AnalysisResultPtr selectRangesToRead( - MergeTreeData::DataPartsVector parts, std::vector alter_conversions, bool find_exact_ranges = false) const; + AnalysisResultPtr selectRangesToRead(MergeTreeData::DataPartsVector parts, bool find_exact_ranges = false) const; AnalysisResultPtr selectRangesToRead(bool find_exact_ranges = false) const; @@ -188,7 +186,7 @@ public: void setAnalyzedResult(AnalysisResultPtr analyzed_result_ptr_) { analyzed_result_ptr = std::move(analyzed_result_ptr_); } const MergeTreeData::DataPartsVector & getParts() const { return prepared_parts; } - const std::vector & getAlterConvertionsForParts() const { return alter_conversions_for_parts; } + MergeTreeData::MutationsSnapshotPtr getMutationsSnapshot() const { return mutations_snapshot; } const MergeTreeData & getMergeTreeData() const { return data; } size_t getMaxBlockSize() const { return block_size.max_block_size_rows; } @@ -209,7 +207,7 @@ private: MergeTreeReaderSettings reader_settings; MergeTreeData::DataPartsVector prepared_parts; - std::vector alter_conversions_for_parts; + MergeTreeData::MutationsSnapshotPtr mutations_snapshot; Names all_column_names; diff --git a/src/Storages/MergeTree/AlterConversions.cpp b/src/Storages/MergeTree/AlterConversions.cpp index 31f8f17e2c1..82bef500b34 100644 --- a/src/Storages/MergeTree/AlterConversions.cpp +++ b/src/Storages/MergeTree/AlterConversions.cpp @@ -9,9 +9,14 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -bool AlterConversions::supportsMutationCommandType(MutationCommand::Type t) +bool AlterConversions::isSupportedDataMutation(MutationCommand::Type) { - return t == MutationCommand::Type::RENAME_COLUMN; + return false; +} + +bool AlterConversions::isSupportedMetadataMutation(MutationCommand::Type type) +{ + return type == MutationCommand::Type::RENAME_COLUMN; } void AlterConversions::addMutationCommand(const MutationCommand & command) diff --git a/src/Storages/MergeTree/AlterConversions.h b/src/Storages/MergeTree/AlterConversions.h index 0f857d351dd..68966f88f84 100644 --- a/src/Storages/MergeTree/AlterConversions.h +++ b/src/Storages/MergeTree/AlterConversions.h @@ -35,7 +35,8 @@ public: /// Get column old name before rename (lookup by key in rename_map) std::string getColumnOldName(const std::string & new_name) const; - static bool supportsMutationCommandType(MutationCommand::Type); + static bool isSupportedDataMutation(MutationCommand::Type type); + static bool isSupportedMetadataMutation(MutationCommand::Type type); private: /// Rename map new_name -> old_name. diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index 7ab8fa2430a..5dab0cd0c08 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -257,6 +257,10 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() if (enabledBlockOffsetColumn(global_ctx)) addGatheringColumn(global_ctx, BlockOffsetColumn::name, BlockOffsetColumn::type); + auto mutations_snapshot = global_ctx->data->getMutationsSnapshot( + global_ctx->metadata_snapshot->getMetadataVersion(), + /*need_data_mutations=*/ false); + SerializationInfo::Settings info_settings = { .ratio_of_defaults_for_sparse = global_ctx->data->getSettings()->ratio_of_defaults_for_sparse_serialization, @@ -264,10 +268,12 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() }; SerializationInfoByName infos(global_ctx->storage_columns, info_settings); + global_ctx->alter_conversions.reserve(global_ctx->future_part->parts.size()); for (const auto & part : global_ctx->future_part->parts) { global_ctx->new_data_part->ttl_infos.update(part->ttl_infos); + if (global_ctx->metadata_snapshot->hasAnyTTL() && !part->checkAllTTLCalculated(global_ctx->metadata_snapshot)) { LOG_INFO(ctx->log, "Some TTL values were not calculated for part {}. Will calculate them forcefully during merge.", part->name); @@ -288,6 +294,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() infos.add(part_infos); } + + global_ctx->alter_conversions.push_back(MergeTreeData::getAlterConversionsForPart(part, mutations_snapshot)); } const auto & local_part_min_ttl = global_ctx->new_data_part->ttl_infos.part_min_ttl; @@ -604,6 +612,7 @@ Pipe MergeTask::VerticalMergeStage::createPipeForReadingOneColumn(const String & *global_ctx->data, global_ctx->storage_snapshot, global_ctx->future_part->parts[part_num], + global_ctx->alter_conversions[part_num], Names{column_name}, /*mark_ranges=*/ {}, global_ctx->input_rows_filtered, @@ -996,13 +1005,14 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() global_ctx->horizontal_stage_progress = std::make_unique( ctx->column_sizes ? ctx->column_sizes->keyColumnsWeight() : 1.0); - for (const auto & part : global_ctx->future_part->parts) + for (size_t i = 0; i < global_ctx->future_part->parts.size(); ++i) { Pipe pipe = createMergeTreeSequentialSource( MergeTreeSequentialSourceType::Merge, *global_ctx->data, global_ctx->storage_snapshot, - part, + global_ctx->future_part->parts[i], + global_ctx->alter_conversions[i], global_ctx->merging_columns.getNames(), /*mark_ranges=*/ {}, global_ctx->input_rows_filtered, diff --git a/src/Storages/MergeTree/MergeTask.h b/src/Storages/MergeTree/MergeTask.h index 56909d1b7a0..c394a47aff0 100644 --- a/src/Storages/MergeTree/MergeTask.h +++ b/src/Storages/MergeTree/MergeTask.h @@ -4,6 +4,7 @@ #include #include +#include "Storages/MergeTree/AlterConversions.h" #include #include @@ -154,6 +155,7 @@ private: StorageSnapshotPtr storage_snapshot{nullptr}; StorageMetadataPtr metadata_snapshot{nullptr}; FutureMergedMutatedPartPtr future_part{nullptr}; + std::vector alter_conversions; /// This will be either nullptr or new_data_part, so raw pointer is ok. IMergeTreeDataPart * parent_part{nullptr}; ContextPtr context{nullptr}; diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index f9cc65871fe..98c308f5fd1 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -8115,11 +8115,13 @@ bool MergeTreeData::canUsePolymorphicParts(const MergeTreeSettings & settings, S return true; } -AlterConversionsPtr MergeTreeData::getAlterConversionsForPart(MergeTreeDataPartPtr part) const +AlterConversionsPtr MergeTreeData::getAlterConversionsForPart( + const MergeTreeDataPartPtr & part, + const MutationsSnapshotPtr & snapshot) { - auto commands = getAlterMutationCommandsForPart(part); - + auto commands = snapshot->getAlterMutationCommandsForPart(part); auto result = std::make_shared(); + for (const auto & command : commands | std::views::reverse) result->addMutationCommand(command); @@ -8427,9 +8429,9 @@ StorageSnapshotPtr MergeTreeData::getStorageSnapshot(const StorageMetadataPtr & object_columns_copy = object_columns; } - snapshot_data->alter_conversions.reserve(snapshot_data->parts.size()); - for (const auto & part : snapshot_data->parts) - snapshot_data->alter_conversions.push_back(getAlterConversionsForPart(part)); + snapshot_data->mutations_snapshot = getMutationsSnapshot( + metadata_snapshot->getMetadataVersion(), + query_context->getSettingsRef().apply_mutations_on_fly); return std::make_shared(*this, metadata_snapshot, std::move(object_columns_copy), std::move(snapshot_data)); } @@ -8616,28 +8618,59 @@ void MergeTreeData::verifySortingKey(const KeyDescription & sorting_key) } } -bool updateAlterConversionsMutations(const MutationCommands & commands, std::atomic & alter_conversions_mutations, bool remove) +static void updateMutationsCounters( + Int64 & data_mutations_to_apply, + Int64 & metadata_mutations_to_apply, + const MutationCommands & commands, + Int64 increment) { + if (data_mutations_to_apply < 0) + throw Exception(ErrorCodes::LOGICAL_ERROR, "On-fly data mutations counter is negative ({})", data_mutations_to_apply); + + if (metadata_mutations_to_apply < 0) + throw Exception(ErrorCodes::LOGICAL_ERROR, "On-fly metadata mutations counter is negative ({})", metadata_mutations_to_apply); + + bool has_data_mutation = false; + bool has_metadata_mutation = false; + for (const auto & command : commands) { - if (AlterConversions::supportsMutationCommandType(command.type)) + if (!has_data_mutation && AlterConversions::isSupportedDataMutation(command.type)) { - if (remove) - { - --alter_conversions_mutations; - if (alter_conversions_mutations < 0) - throw Exception(ErrorCodes::LOGICAL_ERROR, "On-fly mutations counter is negative ({})", alter_conversions_mutations); - } - else - { - if (alter_conversions_mutations < 0) - throw Exception(ErrorCodes::LOGICAL_ERROR, "On-fly mutations counter is negative ({})", alter_conversions_mutations); - ++alter_conversions_mutations; - } - return true; + data_mutations_to_apply += increment; + has_data_mutation = true; + + if (data_mutations_to_apply < 0) + throw Exception(ErrorCodes::LOGICAL_ERROR, "On-fly data mutations counter is negative ({})", data_mutations_to_apply); + } + + if (!has_metadata_mutation && AlterConversions::isSupportedMetadataMutation(command.type)) + { + metadata_mutations_to_apply += increment; + has_metadata_mutation = true; + + if (metadata_mutations_to_apply < 0) + throw Exception(ErrorCodes::LOGICAL_ERROR, "On-fly metadata mutations counter is negative ({})", metadata_mutations_to_apply); } } - return false; +} + +void incrementMutationsCounters( + Int64 & data_mutations_to_apply, + Int64 & metadata_mutations_to_apply, + const MutationCommands & commands, + std::lock_guard & /*lock*/) +{ + return updateMutationsCounters(data_mutations_to_apply, metadata_mutations_to_apply, commands, 1); +} + +void decrementMutationsCounters( + Int64 & data_mutations_to_apply, + Int64 & metadata_mutations_to_apply, + const MutationCommands & commands, + std::lock_guard & /*lock*/) +{ + return updateMutationsCounters(data_mutations_to_apply, metadata_mutations_to_apply, commands, -1); } } diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index c6f736a4afd..765b68b7559 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -35,6 +35,7 @@ #include #include #include +#include "Storages/ProjectionsDescription.h" #include #include @@ -445,12 +446,27 @@ public: bool supportsTrivialCountOptimization(const StorageSnapshotPtr &, ContextPtr) const override; + struct IMutationsSnapshot + { + /// Return pending mutations that weren't applied to `part` yet and should be applied on the fly + /// (i.e. when reading from the part). Mutations not supported by AlterConversions + /// (supportsMutationCommandType()) can be omitted. + /// + /// @return list of mutations, in *reverse* order (newest to oldest) + virtual MutationCommands getAlterMutationCommandsForPart(const DataPartPtr & part) const = 0; + virtual std::shared_ptr cloneEmpty() const = 0; + + virtual ~IMutationsSnapshot() = default; + }; + + using MutationsSnapshotPtr = std::shared_ptr; + /// Snapshot for MergeTree contains the current set of data parts - /// at the moment of the start of query. + /// and mutations required to be applied at the moment of the start of query. struct SnapshotData : public StorageSnapshot::Data { DataPartsVector parts; - std::vector alter_conversions; + MutationsSnapshotPtr mutations_snapshot; }; StorageSnapshotPtr getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot, ContextPtr query_context) const override; @@ -934,8 +950,13 @@ public: Disks getDisks() const { return getStoragePolicy()->getDisks(); } + /// TODO: comment + virtual MutationsSnapshotPtr getMutationsSnapshot(Int64 metadata_version, bool need_data_mutations) const = 0; + /// Return alter conversions for part which must be applied on fly. - AlterConversionsPtr getAlterConversionsForPart(MergeTreeDataPartPtr part) const; + static AlterConversionsPtr getAlterConversionsForPart( + const MergeTreeDataPartPtr & part, + const MutationsSnapshotPtr & snapshot); /// Returns destination disk or volume for the TTL rule according to current storage policy. SpacePtr getDestinationForMoveTTL(const TTLDescription & move_ttl) const; @@ -1448,13 +1469,6 @@ protected: /// mechanisms for parts locking virtual bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const = 0; - /// Return pending mutations that weren't applied to `part` yet and should be applied on the fly - /// (i.e. when reading from the part). Mutations not supported by AlterConversions - /// (supportsMutationCommandType()) can be omitted. - /// - /// @return list of mutations, in *reverse* order (newest to oldest) - virtual MutationCommands getAlterMutationCommandsForPart(const DataPartPtr & part) const = 0; - struct PartBackupEntries { String part_name; @@ -1738,6 +1752,16 @@ struct CurrentlySubmergingEmergingTagger /// Look at MutationCommands if it contains mutations for AlterConversions, update the counter. /// Return true if the counter had been updated -bool updateAlterConversionsMutations(const MutationCommands & commands, std::atomic & alter_conversions_mutations, bool remove); +void incrementMutationsCounters( + Int64 & data_mutations_to_apply, + Int64 & metadata_mutations_to_apply, + const MutationCommands & commands, + std::lock_guard & lock); + +void decrementMutationsCounters( + Int64 & data_mutations_to_apply, + Int64 & metadata_mutations_to_apply, + const MutationCommands & commands, + std::lock_guard & lock); } diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 2e287ff3042..0ad7bd47648 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -130,12 +130,10 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( bool enable_parallel_reading) const { const auto & snapshot_data = assert_cast(*storage_snapshot->data); - const auto & parts = snapshot_data.parts; - const auto & alter_conversions = snapshot_data.alter_conversions; auto step = readFromParts( - parts, - alter_conversions, + snapshot_data.parts, + snapshot_data.mutations_snapshot, column_names_to_return, storage_snapshot, query_info, @@ -491,10 +489,9 @@ std::optional> MergeTreeDataSelectExecutor::filterPar } void MergeTreeDataSelectExecutor::filterPartsByPartition( + MergeTreeData::DataPartsVector & parts, const std::optional & partition_pruner, const std::optional & minmax_idx_condition, - MergeTreeData::DataPartsVector & parts, - std::vector & alter_conversions, const std::optional> & part_values, const StorageMetadataPtr & metadata_snapshot, const MergeTreeData & data, @@ -503,8 +500,6 @@ void MergeTreeDataSelectExecutor::filterPartsByPartition( LoggerPtr log, ReadFromMergeTree::IndexStats & index_stats) { - chassert(alter_conversions.empty() || parts.size() == alter_conversions.size()); - const Settings & settings = context->getSettingsRef(); DataTypes minmax_columns_types; @@ -528,7 +523,6 @@ void MergeTreeDataSelectExecutor::filterPartsByPartition( if (query_context->getSettingsRef().allow_experimental_query_deduplication) selectPartsToReadWithUUIDFilter( parts, - alter_conversions, part_values, data.getPinnedPartUUIDs(), minmax_idx_condition, @@ -541,7 +535,6 @@ void MergeTreeDataSelectExecutor::filterPartsByPartition( else selectPartsToRead( parts, - alter_conversions, part_values, minmax_idx_condition, minmax_columns_types, @@ -580,7 +573,6 @@ void MergeTreeDataSelectExecutor::filterPartsByPartition( RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipIndexes( MergeTreeData::DataPartsVector && parts, - std::vector && alter_conversions, StorageMetadataPtr metadata_snapshot, const ContextPtr & context, const KeyCondition & key_condition, @@ -593,8 +585,6 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd bool use_skip_indexes, bool find_exact_ranges) { - chassert(alter_conversions.empty() || parts.size() == alter_conversions.size()); - RangesInDataParts parts_with_ranges; parts_with_ranges.resize(parts.size()); const Settings & settings = context->getSettingsRef(); @@ -653,11 +643,8 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd auto process_part = [&](size_t part_index) { auto & part = parts[part_index]; - auto alter_conversions_for_part = !alter_conversions.empty() - ? alter_conversions[part_index] - : std::make_shared(); - RangesInDataPart ranges(part, alter_conversions_for_part, part_index); + RangesInDataPart ranges(part, part_index); size_t total_marks_count = part->index_granularity.getMarksCountWithoutFinal(); if (metadata_snapshot->hasPrimaryKey() || part_offset_condition) @@ -907,11 +894,11 @@ ReadFromMergeTree::AnalysisResultPtr MergeTreeDataSelectExecutor::estimateNumMar return std::make_shared(); std::optional indexes; - /// NOTE: We don't need alter_conversions because the returned analysis_result is only used for: - /// 1. estimate the number of rows to read; 2. projection reading, which doesn't have alter_conversions. + /// NOTE: We don't need mutations snapshot because the returned analysis_result is only used for: + /// 1. estimate the number of rows to read; + /// 2. projection reading, which doesn't have alter conversions. return ReadFromMergeTree::selectRangesToRead( std::move(parts), - /*alter_conversions=*/{}, metadata_snapshot, query_info, context, @@ -926,7 +913,7 @@ ReadFromMergeTree::AnalysisResultPtr MergeTreeDataSelectExecutor::estimateNumMar QueryPlanStepPtr MergeTreeDataSelectExecutor::readFromParts( MergeTreeData::DataPartsVector parts, - std::vector alter_conversions, + MergeTreeData::MutationsSnapshotPtr mutations_snapshot, const Names & column_names_to_return, const StorageSnapshotPtr & storage_snapshot, const SelectQueryInfo & query_info, @@ -948,7 +935,7 @@ QueryPlanStepPtr MergeTreeDataSelectExecutor::readFromParts( return std::make_unique( std::move(parts), - std::move(alter_conversions), + std::move(mutations_snapshot), column_names_to_return, data, query_info, @@ -1546,7 +1533,6 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingMergedIndex( void MergeTreeDataSelectExecutor::selectPartsToRead( MergeTreeData::DataPartsVector & parts, - std::vector & alter_conversions, const std::optional> & part_values, const std::optional & minmax_idx_condition, const DataTypes & minmax_columns_types, @@ -1555,10 +1541,7 @@ void MergeTreeDataSelectExecutor::selectPartsToRead( PartFilterCounters & counters) { MergeTreeData::DataPartsVector prev_parts; - std::vector prev_conversions; - std::swap(prev_parts, parts); - std::swap(prev_conversions, alter_conversions); for (size_t i = 0; i < prev_parts.size(); ++i) { @@ -1600,14 +1583,11 @@ void MergeTreeDataSelectExecutor::selectPartsToRead( counters.num_granules_after_partition_pruner += num_granules; parts.push_back(prev_parts[i]); - if (!prev_conversions.empty()) - alter_conversions.push_back(prev_conversions[i]); } } void MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter( MergeTreeData::DataPartsVector & parts, - std::vector & alter_conversions, const std::optional> & part_values, MergeTreeData::PinnedPartUUIDsPtr pinned_part_uuids, const std::optional & minmax_idx_condition, @@ -1620,18 +1600,13 @@ void MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter( { /// process_parts prepare parts that have to be read for the query, /// returns false if duplicated parts' UUID have been met - auto select_parts = [&] ( - MergeTreeData::DataPartsVector & selected_parts, - std::vector & selected_conversions) -> bool + auto select_parts = [&](MergeTreeData::DataPartsVector & selected_parts) -> bool { auto ignored_part_uuids = query_context->getIgnoredPartUUIDs(); std::unordered_set temp_part_uuids; MergeTreeData::DataPartsVector prev_parts; - std::vector prev_conversions; - std::swap(prev_parts, selected_parts); - std::swap(prev_conversions, selected_conversions); for (size_t i = 0; i < prev_parts.size(); ++i) { @@ -1686,8 +1661,6 @@ void MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter( } selected_parts.push_back(prev_parts[i]); - if (!prev_conversions.empty()) - selected_conversions.push_back(prev_conversions[i]); } if (!temp_part_uuids.empty()) @@ -1706,7 +1679,7 @@ void MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter( }; /// Process parts that have to be read for a query. - auto needs_retry = !select_parts(parts, alter_conversions); + auto needs_retry = !select_parts(parts); /// If any duplicated part UUIDs met during the first step, try to ignore them in second pass. /// This may happen when `prefer_localhost_replica` is set and "distributed" stage runs in the same process with "remote" stage. @@ -1717,7 +1690,7 @@ void MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter( counters = PartFilterCounters(); /// Second attempt didn't help, throw an exception - if (!select_parts(parts, alter_conversions)) + if (!select_parts(parts)) throw Exception(ErrorCodes::DUPLICATED_PART_UUIDS, "Found duplicate UUIDs while processing query."); } } diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h index 788355c1e59..0d02456e480 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h @@ -40,7 +40,7 @@ public: /// The same as read, but with specified set of parts. QueryPlanStepPtr readFromParts( MergeTreeData::DataPartsVector parts, - std::vector alter_conversions, + MergeTreeData::MutationsSnapshotPtr mutations_snapshot, const Names & column_names, const StorageSnapshotPtr & storage_snapshot, const SelectQueryInfo & query_info, @@ -120,7 +120,6 @@ private: /// as well as `max_block_number_to_read`. static void selectPartsToRead( MergeTreeData::DataPartsVector & parts, - std::vector & alter_conversions, const std::optional> & part_values, const std::optional & minmax_idx_condition, const DataTypes & minmax_columns_types, @@ -131,7 +130,6 @@ private: /// Same as previous but also skip parts uuids if any to the query context, or skip parts which uuids marked as excluded. static void selectPartsToReadWithUUIDFilter( MergeTreeData::DataPartsVector & parts, - std::vector & alter_conversions, const std::optional> & part_values, MergeTreeData::PinnedPartUUIDsPtr pinned_part_uuids, const std::optional & minmax_idx_condition, @@ -175,10 +173,9 @@ public: /// Filter parts using minmax index and partition key. static void filterPartsByPartition( + MergeTreeData::DataPartsVector & parts, const std::optional & partition_pruner, const std::optional & minmax_idx_condition, - MergeTreeData::DataPartsVector & parts, - std::vector & alter_conversions, const std::optional> & part_values, const StorageMetadataPtr & metadata_snapshot, const MergeTreeData & data, @@ -192,7 +189,6 @@ public: /// If 'check_limits = true' it will throw exception if the amount of data exceed the limits from settings. static RangesInDataParts filterPartsByPrimaryKeyAndSkipIndexes( MergeTreeData::DataPartsVector && parts, - std::vector && alter_conversions, StorageMetadataPtr metadata_snapshot, const ContextPtr & context, const KeyCondition & key_condition, diff --git a/src/Storages/MergeTree/MergeTreeMutationEntry.cpp b/src/Storages/MergeTree/MergeTreeMutationEntry.cpp index 4dbccb91620..6f06b921031 100644 --- a/src/Storages/MergeTree/MergeTreeMutationEntry.cpp +++ b/src/Storages/MergeTree/MergeTreeMutationEntry.cpp @@ -6,6 +6,7 @@ #include #include #include +#include "Storages/MutationCommands.h" #include @@ -50,7 +51,7 @@ UInt64 MergeTreeMutationEntry::parseFileName(const String & file_name_) MergeTreeMutationEntry::MergeTreeMutationEntry(MutationCommands commands_, DiskPtr disk_, const String & path_prefix_, UInt64 tmp_number, const TransactionID & tid_, const WriteSettings & settings) : create_time(time(nullptr)) - , commands(std::move(commands_)) + , commands(std::make_shared(std::move(commands_))) , disk(std::move(disk_)) , path_prefix(path_prefix_) , file_name("tmp_mutation_" + toString(tmp_number) + ".txt") @@ -63,7 +64,7 @@ MergeTreeMutationEntry::MergeTreeMutationEntry(MutationCommands commands_, DiskP *out << "format version: 1\n" << "create time: " << LocalDateTime(create_time, DateLUT::serverTimezoneInstance()) << "\n"; *out << "commands: "; - commands.writeText(*out, /* with_pure_metadata_commands = */ false); + commands->writeText(*out, /* with_pure_metadata_commands = */ false); *out << "\n"; if (tid.isPrehistoric()) { @@ -116,7 +117,8 @@ void MergeTreeMutationEntry::writeCSN(CSN csn_) } MergeTreeMutationEntry::MergeTreeMutationEntry(DiskPtr disk_, const String & path_prefix_, const String & file_name_) - : disk(std::move(disk_)) + : commands(std::make_shared()) + , disk(std::move(disk_)) , path_prefix(path_prefix_) , file_name(file_name_) , is_temp(false) @@ -133,7 +135,7 @@ MergeTreeMutationEntry::MergeTreeMutationEntry(DiskPtr disk_, const String & pat create_time_dt.hour(), create_time_dt.minute(), create_time_dt.second()); *buf >> "commands: "; - commands.readText(*buf); + commands->readText(*buf); *buf >> "\n"; if (buf->eof()) @@ -177,7 +179,7 @@ std::shared_ptr MergeTreeMutationEntry::backup() const out << "block number: " << block_number << "\n"; out << "commands: "; - commands.writeText(out, /* with_pure_metadata_commands = */ false); + commands->writeText(out, /* with_pure_metadata_commands = */ false); out << "\n"; return std::make_shared(out.str()); diff --git a/src/Storages/MergeTree/MergeTreeMutationEntry.h b/src/Storages/MergeTree/MergeTreeMutationEntry.h index 04297f2852a..f41ad2a17f8 100644 --- a/src/Storages/MergeTree/MergeTreeMutationEntry.h +++ b/src/Storages/MergeTree/MergeTreeMutationEntry.h @@ -16,7 +16,7 @@ class IBackupEntry; struct MergeTreeMutationEntry { time_t create_time = 0; - MutationCommands commands; + std::shared_ptr commands; DiskPtr disk; String path_prefix; diff --git a/src/Storages/MergeTree/MergeTreePrefetchedReadPool.cpp b/src/Storages/MergeTree/MergeTreePrefetchedReadPool.cpp index 2c249f7b63b..2aaf06fde7f 100644 --- a/src/Storages/MergeTree/MergeTreePrefetchedReadPool.cpp +++ b/src/Storages/MergeTree/MergeTreePrefetchedReadPool.cpp @@ -84,6 +84,7 @@ MergeTreeReadTask::Readers MergeTreePrefetchedReadPool::PrefetchedReaders::get() MergeTreePrefetchedReadPool::MergeTreePrefetchedReadPool( RangesInDataParts && parts_, + MutationsSnapshotPtr mutations_snapshot_, VirtualFields shared_virtual_fields_, const StorageSnapshotPtr & storage_snapshot_, const PrewhereInfoPtr & prewhere_info_, @@ -94,6 +95,7 @@ MergeTreePrefetchedReadPool::MergeTreePrefetchedReadPool( const ContextPtr & context_) : MergeTreeReadPoolBase( std::move(parts_), + std::move(mutations_snapshot_), std::move(shared_virtual_fields_), storage_snapshot_, prewhere_info_, diff --git a/src/Storages/MergeTree/MergeTreePrefetchedReadPool.h b/src/Storages/MergeTree/MergeTreePrefetchedReadPool.h index a3a57227630..65a7d62ad2d 100644 --- a/src/Storages/MergeTree/MergeTreePrefetchedReadPool.h +++ b/src/Storages/MergeTree/MergeTreePrefetchedReadPool.h @@ -19,6 +19,7 @@ class MergeTreePrefetchedReadPool : public MergeTreeReadPoolBase, private WithCo public: MergeTreePrefetchedReadPool( RangesInDataParts && parts_, + MutationsSnapshotPtr mutations_snapshot_, VirtualFields shared_virtual_fields_, const StorageSnapshotPtr & storage_snapshot_, const PrewhereInfoPtr & prewhere_info_, diff --git a/src/Storages/MergeTree/MergeTreeReadPool.cpp b/src/Storages/MergeTree/MergeTreeReadPool.cpp index dc1ba030f45..d45e2e9c578 100644 --- a/src/Storages/MergeTree/MergeTreeReadPool.cpp +++ b/src/Storages/MergeTree/MergeTreeReadPool.cpp @@ -35,6 +35,7 @@ size_t getApproxSizeOfPart(const IMergeTreeDataPart & part, const Names & column MergeTreeReadPool::MergeTreeReadPool( RangesInDataParts && parts_, + MutationsSnapshotPtr mutations_snapshot_, VirtualFields shared_virtual_fields_, const StorageSnapshotPtr & storage_snapshot_, const PrewhereInfoPtr & prewhere_info_, @@ -45,6 +46,7 @@ MergeTreeReadPool::MergeTreeReadPool( const ContextPtr & context_) : MergeTreeReadPoolBase( std::move(parts_), + std::move(mutations_snapshot_), std::move(shared_virtual_fields_), storage_snapshot_, prewhere_info_, diff --git a/src/Storages/MergeTree/MergeTreeReadPool.h b/src/Storages/MergeTree/MergeTreeReadPool.h index cb0e8a9657f..d7b354a2799 100644 --- a/src/Storages/MergeTree/MergeTreeReadPool.h +++ b/src/Storages/MergeTree/MergeTreeReadPool.h @@ -26,6 +26,7 @@ public: MergeTreeReadPool( RangesInDataParts && parts_, + MutationsSnapshotPtr mutations_snapshot_, VirtualFields shared_virtual_fields_, const StorageSnapshotPtr & storage_snapshot_, const PrewhereInfoPtr & prewhere_info_, diff --git a/src/Storages/MergeTree/MergeTreeReadPoolBase.cpp b/src/Storages/MergeTree/MergeTreeReadPoolBase.cpp index 0ea19370d45..2935890cba5 100644 --- a/src/Storages/MergeTree/MergeTreeReadPoolBase.cpp +++ b/src/Storages/MergeTree/MergeTreeReadPoolBase.cpp @@ -13,6 +13,7 @@ namespace ErrorCodes MergeTreeReadPoolBase::MergeTreeReadPoolBase( RangesInDataParts && parts_, + MutationsSnapshotPtr mutations_snapshot_, VirtualFields shared_virtual_fields_, const StorageSnapshotPtr & storage_snapshot_, const PrewhereInfoPtr & prewhere_info_, @@ -22,6 +23,7 @@ MergeTreeReadPoolBase::MergeTreeReadPoolBase( const PoolSettings & pool_settings_, const ContextPtr & context_) : parts_ranges(std::move(parts_)) + , mutations_snapshot(std::move(mutations_snapshot_)) , shared_virtual_fields(std::move(shared_virtual_fields_)) , storage_snapshot(storage_snapshot_) , prewhere_info(prewhere_info_) @@ -67,9 +69,9 @@ void MergeTreeReadPoolBase::fillPerPartInfos() } read_task_info.part_index_in_query = part_with_ranges.part_index_in_query; - read_task_info.alter_conversions = part_with_ranges.alter_conversions; + read_task_info.alter_conversions = MergeTreeData::getAlterConversionsForPart(part_with_ranges.data_part, mutations_snapshot); - LoadedMergeTreeDataPartInfoForReader part_info(part_with_ranges.data_part, part_with_ranges.alter_conversions); + LoadedMergeTreeDataPartInfoForReader part_info(part_with_ranges.data_part, read_task_info.alter_conversions); read_task_info.task_columns = getReadTaskColumns( part_info, diff --git a/src/Storages/MergeTree/MergeTreeReadPoolBase.h b/src/Storages/MergeTree/MergeTreeReadPoolBase.h index 1b5bfec5898..8286ff52a5c 100644 --- a/src/Storages/MergeTree/MergeTreeReadPoolBase.h +++ b/src/Storages/MergeTree/MergeTreeReadPoolBase.h @@ -9,6 +9,8 @@ namespace DB class MergeTreeReadPoolBase : public IMergeTreeReadPool { public: + using MutationsSnapshotPtr = MergeTreeData::MutationsSnapshotPtr; + struct PoolSettings { size_t threads = 0; @@ -23,6 +25,7 @@ public: MergeTreeReadPoolBase( RangesInDataParts && parts_, + MutationsSnapshotPtr mutations_snapshot_, VirtualFields shared_virtual_fields_, const StorageSnapshotPtr & storage_snapshot_, const PrewhereInfoPtr & prewhere_info_, @@ -37,6 +40,7 @@ public: protected: /// Initialized in constructor const RangesInDataParts parts_ranges; + const MutationsSnapshotPtr mutations_snapshot; const VirtualFields shared_virtual_fields; const StorageSnapshotPtr storage_snapshot; const PrewhereInfoPtr prewhere_info; diff --git a/src/Storages/MergeTree/MergeTreeReadPoolInOrder.cpp b/src/Storages/MergeTree/MergeTreeReadPoolInOrder.cpp index 4c0391ffa57..60f127acdae 100644 --- a/src/Storages/MergeTree/MergeTreeReadPoolInOrder.cpp +++ b/src/Storages/MergeTree/MergeTreeReadPoolInOrder.cpp @@ -12,6 +12,7 @@ MergeTreeReadPoolInOrder::MergeTreeReadPoolInOrder( bool has_limit_below_one_block_, MergeTreeReadType read_type_, RangesInDataParts parts_, + MutationsSnapshotPtr mutations_snapshot_, VirtualFields shared_virtual_fields_, const StorageSnapshotPtr & storage_snapshot_, const PrewhereInfoPtr & prewhere_info_, @@ -22,6 +23,7 @@ MergeTreeReadPoolInOrder::MergeTreeReadPoolInOrder( const ContextPtr & context_) : MergeTreeReadPoolBase( std::move(parts_), + std::move(mutations_snapshot_), std::move(shared_virtual_fields_), storage_snapshot_, prewhere_info_, diff --git a/src/Storages/MergeTree/MergeTreeReadPoolInOrder.h b/src/Storages/MergeTree/MergeTreeReadPoolInOrder.h index 9fedf396a6b..a3668acb170 100644 --- a/src/Storages/MergeTree/MergeTreeReadPoolInOrder.h +++ b/src/Storages/MergeTree/MergeTreeReadPoolInOrder.h @@ -11,6 +11,7 @@ public: bool has_limit_below_one_block_, MergeTreeReadType read_type_, RangesInDataParts parts_, + MutationsSnapshotPtr mutations_snapshot_, VirtualFields shared_virtual_fields_, const StorageSnapshotPtr & storage_snapshot_, const PrewhereInfoPtr & prewhere_info_, diff --git a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp index 38035d97f56..0d615fae443 100644 --- a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp +++ b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp @@ -13,6 +13,7 @@ namespace ErrorCodes MergeTreeReadPoolParallelReplicas::MergeTreeReadPoolParallelReplicas( ParallelReadingExtension extension_, RangesInDataParts && parts_, + MutationsSnapshotPtr mutations_snapshot_, VirtualFields shared_virtual_fields_, const StorageSnapshotPtr & storage_snapshot_, const PrewhereInfoPtr & prewhere_info_, @@ -23,6 +24,7 @@ MergeTreeReadPoolParallelReplicas::MergeTreeReadPoolParallelReplicas( const ContextPtr & context_) : MergeTreeReadPoolBase( std::move(parts_), + std::move(mutations_snapshot_), std::move(shared_virtual_fields_), storage_snapshot_, prewhere_info_, diff --git a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.h b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.h index ca159edb91c..d9d628b8be2 100644 --- a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.h +++ b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.h @@ -11,6 +11,7 @@ public: MergeTreeReadPoolParallelReplicas( ParallelReadingExtension extension_, RangesInDataParts && parts_, + MutationsSnapshotPtr mutations_snapshot_, VirtualFields shared_virtual_fields_, const StorageSnapshotPtr & storage_snapshot_, const PrewhereInfoPtr & prewhere_info_, diff --git a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.cpp b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.cpp index 01c0a9f91be..b1f9ffc8ea4 100644 --- a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.cpp +++ b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.cpp @@ -12,6 +12,7 @@ MergeTreeReadPoolParallelReplicasInOrder::MergeTreeReadPoolParallelReplicasInOrd ParallelReadingExtension extension_, CoordinationMode mode_, RangesInDataParts parts_, + MutationsSnapshotPtr mutations_snapshot_, VirtualFields shared_virtual_fields_, const StorageSnapshotPtr & storage_snapshot_, const PrewhereInfoPtr & prewhere_info_, @@ -22,6 +23,7 @@ MergeTreeReadPoolParallelReplicasInOrder::MergeTreeReadPoolParallelReplicasInOrd const ContextPtr & context_) : MergeTreeReadPoolBase( std::move(parts_), + std::move(mutations_snapshot_), std::move(shared_virtual_fields_), storage_snapshot_, prewhere_info_, diff --git a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.h b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.h index 4fe3f7a699c..7c549ed3c4a 100644 --- a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.h +++ b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.h @@ -12,6 +12,7 @@ public: ParallelReadingExtension extension_, CoordinationMode mode_, RangesInDataParts parts_, + MutationsSnapshotPtr mutations_snapshot_, VirtualFields shared_virtual_fields_, const StorageSnapshotPtr & storage_snapshot_, const PrewhereInfoPtr & prewhere_info_, diff --git a/src/Storages/MergeTree/MergeTreeSequentialSource.cpp b/src/Storages/MergeTree/MergeTreeSequentialSource.cpp index 02f8d6f4f6a..bd4aa066dfd 100644 --- a/src/Storages/MergeTree/MergeTreeSequentialSource.cpp +++ b/src/Storages/MergeTree/MergeTreeSequentialSource.cpp @@ -13,6 +13,7 @@ #include #include #include +#include "Storages/MergeTree/AlterConversions.h" #include namespace DB @@ -38,6 +39,7 @@ public: const MergeTreeData & storage_, const StorageSnapshotPtr & storage_snapshot_, MergeTreeData::DataPartPtr data_part_, + AlterConversionsPtr alter_conversions_, Names columns_to_read_, std::optional mark_ranges_, bool apply_deleted_mask, @@ -62,6 +64,9 @@ private: /// Data part will not be removed if the pointer owns it MergeTreeData::DataPartPtr data_part; + /// TODO: comment. + AlterConversionsPtr alter_conversions; + /// Columns we have to read (each Block from read will contain them) Names columns_to_read; @@ -91,6 +96,7 @@ MergeTreeSequentialSource::MergeTreeSequentialSource( const MergeTreeData & storage_, const StorageSnapshotPtr & storage_snapshot_, MergeTreeData::DataPartPtr data_part_, + AlterConversionsPtr alter_conversions_, Names columns_to_read_, std::optional mark_ranges_, bool apply_deleted_mask, @@ -100,6 +106,7 @@ MergeTreeSequentialSource::MergeTreeSequentialSource( , storage(storage_) , storage_snapshot(storage_snapshot_) , data_part(std::move(data_part_)) + , alter_conversions(std::move(alter_conversions_)) , columns_to_read(std::move(columns_to_read_)) , read_with_direct_io(read_with_direct_io_) , mark_ranges(std::move(mark_ranges_)) @@ -113,8 +120,6 @@ MergeTreeSequentialSource::MergeTreeSequentialSource( LOG_DEBUG(log, "Reading {} marks from part {}, total {} rows starting from the beginning of the part", data_part->getMarksCount(), data_part->name, data_part->rows_count); - auto alter_conversions = storage.getAlterConversionsForPart(data_part); - /// Note, that we don't check setting collaborate_with_coordinator presence, because this source /// is only used in background merges. addTotalRowsApprox(data_part->rows_count); @@ -300,6 +305,7 @@ Pipe createMergeTreeSequentialSource( const MergeTreeData & storage, const StorageSnapshotPtr & storage_snapshot, MergeTreeData::DataPartPtr data_part, + AlterConversionsPtr alter_conversions, Names columns_to_read, std::optional mark_ranges, std::shared_ptr> filtered_rows_count, @@ -316,7 +322,8 @@ Pipe createMergeTreeSequentialSource( columns_to_read.emplace_back(RowExistsColumn::name); auto column_part_source = std::make_shared(type, - storage, storage_snapshot, data_part, columns_to_read, std::move(mark_ranges), + storage, storage_snapshot, data_part, alter_conversions, + columns_to_read, std::move(mark_ranges), /*apply_deleted_mask=*/ false, read_with_direct_io, prefetch); Pipe pipe(std::move(column_part_source)); @@ -347,6 +354,7 @@ public: const MergeTreeData & storage_, const StorageSnapshotPtr & storage_snapshot_, MergeTreeData::DataPartPtr data_part_, + AlterConversionsPtr alter_conversions_, Names columns_to_read_, bool apply_deleted_mask_, ActionsDAGPtr filter_, @@ -357,6 +365,7 @@ public: , storage(storage_) , storage_snapshot(storage_snapshot_) , data_part(std::move(data_part_)) + , alter_conversions(std::move(alter_conversions_)) , columns_to_read(std::move(columns_to_read_)) , apply_deleted_mask(apply_deleted_mask_) , filter(std::move(filter_)) @@ -400,6 +409,7 @@ public: storage, storage_snapshot, data_part, + alter_conversions, columns_to_read, std::move(mark_ranges), /*filtered_rows_count=*/ nullptr, @@ -415,6 +425,7 @@ private: const MergeTreeData & storage; StorageSnapshotPtr storage_snapshot; MergeTreeData::DataPartPtr data_part; + AlterConversionsPtr alter_conversions; Names columns_to_read; bool apply_deleted_mask; ActionsDAGPtr filter; @@ -428,6 +439,7 @@ void createReadFromPartStep( const MergeTreeData & storage, const StorageSnapshotPtr & storage_snapshot, MergeTreeData::DataPartPtr data_part, + AlterConversionsPtr alter_conversions, Names columns_to_read, bool apply_deleted_mask, ActionsDAGPtr filter, @@ -435,7 +447,8 @@ void createReadFromPartStep( LoggerPtr log) { auto reading = std::make_unique(type, - storage, storage_snapshot, std::move(data_part), + storage, storage_snapshot, + std::move(data_part), std::move(alter_conversions), std::move(columns_to_read), apply_deleted_mask, filter, std::move(context), log); diff --git a/src/Storages/MergeTree/MergeTreeSequentialSource.h b/src/Storages/MergeTree/MergeTreeSequentialSource.h index e6f055f776c..1ecc721d4a8 100644 --- a/src/Storages/MergeTree/MergeTreeSequentialSource.h +++ b/src/Storages/MergeTree/MergeTreeSequentialSource.h @@ -21,6 +21,7 @@ Pipe createMergeTreeSequentialSource( const MergeTreeData & storage, const StorageSnapshotPtr & storage_snapshot, MergeTreeData::DataPartPtr data_part, + AlterConversionsPtr alter_conversions, Names columns_to_read, std::optional mark_ranges, std::shared_ptr> filtered_rows_count, @@ -36,6 +37,7 @@ void createReadFromPartStep( const MergeTreeData & storage, const StorageSnapshotPtr & storage_snapshot, MergeTreeData::DataPartPtr data_part, + AlterConversionsPtr alter_conversions, Names columns_to_read, bool apply_deleted_mask, ActionsDAGPtr filter, diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index a552ee89aee..3c4ef44dbd8 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -29,6 +29,7 @@ #include #include #include +#include "Storages/MergeTree/AlterConversions.h" #include @@ -104,6 +105,7 @@ static UInt64 getExistingRowsCount(const Block & block) static void splitAndModifyMutationCommands( MergeTreeData::DataPartPtr part, StorageMetadataPtr metadata_snapshot, + AlterConversionsPtr alter_conversions, const MutationCommands & commands, MutationCommands & for_interpreter, MutationCommands & for_file_renames, @@ -169,8 +171,6 @@ static void splitAndModifyMutationCommands( } - auto alter_conversions = part->storage.getAlterConversionsForPart(part); - /// We don't add renames from commands, instead we take them from rename_map. /// It's important because required renames depend not only on part's data version (i.e. mutation version) /// but also on part's metadata version. Why we have such logic only for renames? Because all other types of alter @@ -286,7 +286,6 @@ static void splitAndModifyMutationCommands( } } - auto alter_conversions = part->storage.getAlterConversionsForPart(part); /// We don't add renames from commands, instead we take them from rename_map. /// It's important because required renames depend not only on part's data version (i.e. mutation version) /// but also on part's metadata version. Why we have such logic only for renames? Because all other types of alter @@ -2119,6 +2118,14 @@ bool MutateTask::prepare() ctx->num_mutations = std::make_unique(CurrentMetrics::PartMutation); + auto mutations_snapshot = ctx->data->getMutationsSnapshot( + ctx->metadata_snapshot->getMetadataVersion(), + /*need_data_mutations=*/ false); + + auto alter_conversions = MergeTreeData::getAlterConversionsForPart( + ctx->source_part, + mutations_snapshot); + auto context_for_reading = Context::createCopy(ctx->context); /// Allow mutations to work when force_index_by_date or force_primary_key is on. @@ -2133,7 +2140,7 @@ bool MutateTask::prepare() ctx->commands_for_part.emplace_back(command); if (ctx->source_part->isStoredOnDisk() && !isStorageTouchedByMutations( - *ctx->data, ctx->source_part, ctx->metadata_snapshot, ctx->commands_for_part, context_for_reading)) + *ctx->data, ctx->source_part, mutations_snapshot, ctx->metadata_snapshot, ctx->commands_for_part, context_for_reading)) { NameSet files_to_copy_instead_of_hardlinks; auto settings_ptr = ctx->data->getSettings(); @@ -2192,8 +2199,13 @@ bool MutateTask::prepare() context_for_reading->setSetting("read_from_filesystem_cache_if_exists_otherwise_bypass_cache", 1); MutationHelpers::splitAndModifyMutationCommands( - ctx->source_part, ctx->metadata_snapshot, - ctx->commands_for_part, ctx->for_interpreter, ctx->for_file_renames, ctx->log); + ctx->source_part, + ctx->metadata_snapshot, + alter_conversions, + ctx->commands_for_part, + ctx->for_interpreter, + ctx->for_file_renames, + ctx->log); ctx->stage_progress = std::make_unique(1.0); @@ -2205,7 +2217,8 @@ bool MutateTask::prepare() settings.apply_deleted_mask = false; ctx->interpreter = std::make_unique( - *ctx->data, ctx->source_part, ctx->metadata_snapshot, ctx->for_interpreter, + *ctx->data, ctx->source_part, alter_conversions, + ctx->metadata_snapshot, ctx->for_interpreter, ctx->metadata_snapshot->getColumns().getNamesOfPhysical(), context_for_reading, settings); ctx->materialized_indices = ctx->interpreter->grabMaterializedIndices(); diff --git a/src/Storages/MergeTree/RangesInDataPart.h b/src/Storages/MergeTree/RangesInDataPart.h index bf9e4c7dfb2..966637d0812 100644 --- a/src/Storages/MergeTree/RangesInDataPart.h +++ b/src/Storages/MergeTree/RangesInDataPart.h @@ -42,7 +42,6 @@ struct RangesInDataPartsDescription: public std::deque #include #include +#include +#include #include #include @@ -949,7 +951,7 @@ int32_t ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper { const auto commands = entry.commands; it = mutations_by_znode.erase(it); - updateAlterConversionsMutations(commands, alter_conversions_mutations, /* remove= */ true); + decrementMutationsCounters(data_mutations_to_apply, metadata_mutations_to_apply, commands, lock); } else it = mutations_by_znode.erase(it); @@ -1001,7 +1003,7 @@ int32_t ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper auto & mutation = mutations_by_znode.emplace(entry->znode_name, MutationStatus(entry, format_version)) .first->second; - updateAlterConversionsMutations(entry->commands, alter_conversions_mutations, /* remove= */ false); + incrementMutationsCounters(data_mutations_to_apply, metadata_mutations_to_apply, entry->commands, lock); NOEXCEPT_SCOPE({ for (const auto & pair : entry->block_numbers) { @@ -1075,7 +1077,7 @@ ReplicatedMergeTreeMutationEntryPtr ReplicatedMergeTreeQueue::removeMutation( } mutations_by_znode.erase(it); - /// updateAlterConversionsMutations() will be called in updateMutations() + /// decrementMutationsCounters() will be called in updateMutations() LOG_DEBUG(log, "Removed mutation {} from local state.", entry->znode_name); } @@ -1899,25 +1901,15 @@ ReplicatedMergeTreeMergePredicate ReplicatedMergeTreeQueue::getMergePredicate(zk return ReplicatedMergeTreeMergePredicate(*this, zookeeper, std::move(partition_ids_hint)); } - -MutationCommands ReplicatedMergeTreeQueue::getAlterMutationCommandsForPart(const MergeTreeData::DataPartPtr & part) const +MutationCommands ReplicatedMergeTreeQueue::MutationsSnapshot::getAlterMutationCommandsForPart(const MergeTreeData::DataPartPtr & part) const { - int32_t part_metadata_version = part->getMetadataVersion(); - int32_t metadata_version = storage.getInMemoryMetadataPtr()->getMetadataVersion(); - - chassert(alter_conversions_mutations >= 0); - /// NOTE: that just checking part_metadata_version is not enough, since we - /// need to check for non-metadata mutations as well. - if (alter_conversions_mutations == 0 && metadata_version == part_metadata_version) - return {}; - - std::unique_lock lock(state_mutex); - auto in_partition = mutations_by_partition.find(part->info.partition_id); if (in_partition == mutations_by_partition.end()) return {}; Int64 part_data_version = part->info.getDataVersion(); + int32_t part_metadata_version = part->getMetadataVersion(); + MutationCommands result; bool seen_all_data_mutations = false; @@ -1926,20 +1918,22 @@ MutationCommands ReplicatedMergeTreeQueue::getAlterMutationCommandsForPart(const auto add_to_result = [&](const ReplicatedMergeTreeMutationEntryPtr & entry) { for (const auto & command : entry->commands | std::views::reverse) - if (AlterConversions::supportsMutationCommandType(command.type)) - result.emplace_back(command); + { + if (need_data_mutations && AlterConversions::isSupportedDataMutation(command.type)) + result.push_back(command); + else if (AlterConversions::isSupportedMetadataMutation(command.type)) + result.push_back(command); + } }; /// Here we return mutation commands for part which has bigger alter version than part metadata version. /// Please note, we don't use getDataVersion(). It's because these alter commands are used for in-fly conversions /// of part's metadata. - for (const auto & [mutation_version, mutation_status] : in_partition->second | std::views::reverse) + for (const auto & [mutation_version, entry] : in_partition->second | std::views::reverse) { if (seen_all_data_mutations && seen_all_metadata_mutations) break; - auto & entry = mutation_status->entry; - auto alter_version = entry->alter_version; if (alter_version != -1) { @@ -1964,6 +1958,48 @@ MutationCommands ReplicatedMergeTreeQueue::getAlterMutationCommandsForPart(const return result; } +MergeTreeData::MutationsSnapshotPtr ReplicatedMergeTreeQueue::getMutationsSnapshot(Int64 metadata_version, bool need_data_mutations) const +{ + auto res = std::make_shared(); + res->metadata_version = metadata_version; + res->need_data_mutations = need_data_mutations; + + std::lock_guard lock(state_mutex); + + bool have_data_mutations = res->need_data_mutations && data_mutations_to_apply > 0; + bool have_metadata_mutations = metadata_mutations_to_apply > 0; + + if (!have_data_mutations && !have_metadata_mutations) + return res; + + for (const auto & [partition_id, mutations] : mutations_by_partition) + { + auto & in_partition = res->mutations_by_partition[partition_id]; + + for (const auto & [version, status] : mutations | std::views::reverse) + { + if (status->is_done) + break; + + bool has_required_command = std::ranges::any_of(status->entry->commands, [&](const auto & command) + { + if (have_data_mutations && AlterConversions::isSupportedDataMutation(command.type)) + return true; + + if (have_metadata_mutations && AlterConversions::isSupportedMetadataMutation(command.type)) + return true; + + return false; + }); + + if (has_required_command) + in_partition.emplace(version, status->entry); + } + } + + return res; +} + MutationCommands ReplicatedMergeTreeQueue::getMutationCommands( const MergeTreeData::DataPartPtr & part, Int64 desired_mutation_version, Strings & mutation_ids) const { @@ -2044,7 +2080,7 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep mutation.parts_to_do.clear(); } - updateAlterConversionsMutations(mutation.entry->commands, alter_conversions_mutations, /* remove= */ true); + decrementMutationsCounters(data_mutations_to_apply, metadata_mutations_to_apply, mutation.entry->commands, lock); } else if (mutation.parts_to_do.size() == 0) { @@ -2101,7 +2137,7 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep LOG_TRACE(log, "Finishing data alter with version {} for entry {}", entry->alter_version, entry->znode_name); alter_sequence.finishDataAlter(entry->alter_version, lock); } - updateAlterConversionsMutations(entry->commands, alter_conversions_mutations, /* remove= */ true); + decrementMutationsCounters(data_mutations_to_apply, metadata_mutations_to_apply, entry->commands, lock); } } } diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 89ef6240558..f9d5487ee3f 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include @@ -151,8 +152,11 @@ private: /// Mapping from znode path to Mutations Status std::map mutations_by_znode; - /// Unfinished mutations that is required AlterConversions (see getAlterMutationCommandsForPart()) - std::atomic alter_conversions_mutations = 0; + + /// Unfinished mutations that are required for AlterConversions. + Int64 data_mutations_to_apply = 0; + Int64 metadata_mutations_to_apply = 0; + /// Partition -> (block_number -> MutationStatus) std::unordered_map> mutations_by_partition; /// Znode ID of the latest mutation that is done. @@ -409,10 +413,24 @@ public: MutationCommands getMutationCommands(const MergeTreeData::DataPartPtr & part, Int64 desired_mutation_version, Strings & mutation_ids) const; + struct MutationsSnapshot : public MergeTreeData::IMutationsSnapshot + { + MutationsSnapshot() = default; + + Int64 metadata_version = -1; + bool need_data_mutations = false; + + using MutationsByPartititon = std::unordered_map>; + MutationsByPartititon mutations_by_partition; + + MutationCommands getAlterMutationCommandsForPart(const MergeTreeData::DataPartPtr & part) const override; + std::shared_ptr cloneEmpty() const override { return std::make_shared(); } + }; + /// Return mutation commands for part which could be not applied to /// it according to part mutation version. Used when we apply alter commands on fly, /// without actual data modification on disk. - MutationCommands getAlterMutationCommandsForPart(const MergeTreeData::DataPartPtr & part) const; + MergeTreeData::MutationsSnapshotPtr getMutationsSnapshot(Int64 metadata_version, bool need_data_mutations) const; /// Mark finished mutations as done. If the function needs to be called again at some later time /// (because some mutations are probably done but we are not sure yet), returns true. diff --git a/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h b/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h index a94508ad41f..f871157c2c9 100644 --- a/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h +++ b/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h @@ -26,10 +26,12 @@ class StorageFromMergeTreeDataPart final : public IStorage { public: /// Used in part mutation. - explicit StorageFromMergeTreeDataPart(const MergeTreeData::DataPartPtr & part_) + explicit StorageFromMergeTreeDataPart( + const MergeTreeData::DataPartPtr & part_, + const MergeTreeData::MutationsSnapshotPtr & mutations_snapshot_) : IStorage(getIDFromPart(part_)) , parts({part_}) - , alter_conversions({part_->storage.getAlterConversionsForPart(part_)}) + , mutations_snapshot(mutations_snapshot_) , storage(part_->storage) , partition_id(part_->info.partition_id) { @@ -71,10 +73,11 @@ public: size_t max_block_size, size_t num_streams) override { + /// TODO: fix query_plan.addStep(MergeTreeDataSelectExecutor(storage) .readFromParts( parts, - alter_conversions, + mutations_snapshot, column_names, storage_snapshot, query_info, @@ -121,7 +124,7 @@ public: private: const MergeTreeData::DataPartsVector parts; - const std::vector alter_conversions; + const MergeTreeData::MutationsSnapshotPtr mutations_snapshot; const MergeTreeData & storage; const String partition_id; const ReadFromMergeTree::AnalysisResultPtr analysis_result_ptr; diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 9255ee00340..636d2ba5d53 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -498,18 +498,11 @@ Int64 StorageMergeTree::startMutation(const MutationCommands & commands, Context if (txn) txn->addMutation(shared_from_this(), mutation_id); - bool alter_conversions_mutations_updated = updateAlterConversionsMutations(entry.commands, alter_conversions_mutations, /* remove= */ false); - bool inserted = current_mutations_by_version.try_emplace(version, std::move(entry)).second; + auto [it, inserted] = current_mutations_by_version.try_emplace(version, std::move(entry)); if (!inserted) - { - if (alter_conversions_mutations_updated) - { - --alter_conversions_mutations; - chassert(alter_conversions_mutations >= 0); - } throw Exception(ErrorCodes::LOGICAL_ERROR, "Mutation {} already exists, it's a bug", version); - } + incrementMutationsCounters(data_mutations_to_apply, metadata_mutations_to_apply, *it->second.commands, lock); LOG_INFO(log, "Added mutation: {}{}", mutation_id, additional_info); } background_operations_assignee.trigger(); @@ -545,7 +538,7 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re if (static_cast(result_part->part_info.mutation) == it->first) mutation_backoff_policy.removePartFromFailed(failed_part->name); - updateAlterConversionsMutations(it->second.commands, alter_conversions_mutations, /* remove= */ true); + decrementMutationsCounters(data_mutations_to_apply, metadata_mutations_to_apply, *entry.commands, lock); } } else @@ -744,17 +737,15 @@ std::map StorageMergeTree::getUnfinishedMutationC std::map result; - for (const auto & kv : current_mutations_by_version) + for (const auto & [mutation_version, entry] : current_mutations_by_version) { - Int64 mutation_version = kv.first; - const MergeTreeMutationEntry & entry = kv.second; - const PartVersionWithName needle{mutation_version, ""}; + const PartVersionWithName needle{static_cast(mutation_version), ""}; auto versions_it = std::lower_bound( part_versions_with_names.begin(), part_versions_with_names.end(), needle, comparator); size_t parts_to_do = versions_it - part_versions_with_names.begin(); if (parts_to_do > 0) - result.emplace(entry.file_name, entry.commands); + result.emplace(entry.file_name, *entry.commands); } return result; } @@ -787,7 +778,7 @@ std::vector StorageMergeTree::getMutationsStatus() cons std::map block_numbers_map({{"", entry.block_number}}); - for (const MutationCommand & command : entry.commands) + for (const MutationCommand & command : *entry.commands) { WriteBufferFromOwnString buf; formatAST(*command.ast, buf, false, true); @@ -824,20 +815,15 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id) auto it = current_mutations_by_version.find(mutation_version); if (it != current_mutations_by_version.end()) { - bool mutation_finished = true; if (std::optional min_version = getMinPartDataVersion()) - mutation_finished = *min_version > static_cast(mutation_version); + { + bool mutation_finished = *min_version > static_cast(mutation_version); + if (!mutation_finished) + decrementMutationsCounters(data_mutations_to_apply, metadata_mutations_to_apply, *it->second.commands, lock); + } to_kill.emplace(std::move(it->second)); - - if (!mutation_finished) - { - const auto commands = it->second.commands; - current_mutations_by_version.erase(it); - updateAlterConversionsMutations(commands, alter_conversions_mutations, /* remove= */ true); - } - else - current_mutations_by_version.erase(it); + current_mutations_by_version.erase(it); } } @@ -885,6 +871,8 @@ void StorageMergeTree::loadDeduplicationLog() void StorageMergeTree::loadMutations() { + std::lock_guard lock(currently_processing_in_background_mutex); + for (const auto & disk : getDisks()) { for (auto it = disk->iterateDirectory(relative_data_path); it->isValid(); it->next()) @@ -893,7 +881,7 @@ void StorageMergeTree::loadMutations() { MergeTreeMutationEntry entry(disk, relative_data_path, it->name()); UInt64 block_number = entry.block_number; - LOG_DEBUG(log, "Loading mutation: {} entry, commands size: {}", it->name(), entry.commands.size()); + LOG_DEBUG(log, "Loading mutation: {} entry, commands size: {}", it->name(), entry.commands->size()); if (!entry.tid.isPrehistoric() && !entry.csn) { @@ -912,10 +900,11 @@ void StorageMergeTree::loadMutations() } } - auto inserted = current_mutations_by_version.try_emplace(block_number, std::move(entry)).second; + auto [entry_it, inserted] = current_mutations_by_version.try_emplace(block_number, std::move(entry)); if (!inserted) throw Exception(ErrorCodes::LOGICAL_ERROR, "Mutation {} already exists, it's a bug", block_number); - updateAlterConversionsMutations(entry.commands, alter_conversions_mutations, /* remove= */ false); + + incrementMutationsCounters(data_mutations_to_apply, metadata_mutations_to_apply, *entry_it->second.commands, lock); } else if (startsWith(it->name(), "tmp_mutation_")) { @@ -1264,7 +1253,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate( size_t commands_size = 0; MutationCommands commands_for_size_validation; - for (const auto & command : it->second.commands) + for (const auto & command : *it->second.commands) { if (command.type != MutationCommand::Type::DROP_COLUMN && command.type != MutationCommand::Type::DROP_INDEX @@ -1308,11 +1297,11 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate( const auto & single_mutation_commands = it->second.commands; - if (single_mutation_commands.containBarrierCommand()) + if (single_mutation_commands->containBarrierCommand()) { if (commands->empty()) { - commands->insert(commands->end(), single_mutation_commands.begin(), single_mutation_commands.end()); + commands->insert(commands->end(), single_mutation_commands->begin(), single_mutation_commands->end()); last_mutation_to_apply = it; } break; @@ -1320,7 +1309,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate( else { current_ast_elements += commands_size; - commands->insert(commands->end(), single_mutation_commands.begin(), single_mutation_commands.end()); + commands->insert(commands->end(), single_mutation_commands->begin(), single_mutation_commands->end()); last_mutation_to_apply = it; } @@ -2431,34 +2420,62 @@ void StorageMergeTree::attachRestoredParts(MutableDataPartsVector && parts) } } - -MutationCommands StorageMergeTree::getAlterMutationCommandsForPart(const DataPartPtr & part) const +MutationCommands StorageMergeTree::MutationsSnapshot::getAlterMutationCommandsForPart(const DataPartPtr & part) const { - /// NOTE: there is no need to check part metadata_version, since - /// ALTER_METADATA cannot be done asynchronously, like in - /// ReplicatedMergeTree. - chassert(alter_conversions_mutations >= 0); - if (alter_conversions_mutations == 0) - return {}; - - std::lock_guard lock(currently_processing_in_background_mutex); - - UInt64 part_data_version = part->info.getDataVersion(); MutationCommands result; + UInt64 part_data_version = part->info.getDataVersion(); - for (const auto & [mutation_version, entry] : current_mutations_by_version | std::views::reverse) + for (const auto & [mutation_version, commands] : mutations_by_version | std::views::reverse) { if (mutation_version <= part_data_version) break; - for (const auto & command : entry.commands | std::views::reverse) - if (AlterConversions::supportsMutationCommandType(command.type)) - result.emplace_back(command); + for (const auto & command : *commands | std::views::reverse) + { + if (need_data_mutations && AlterConversions::isSupportedDataMutation(command.type)) + result.push_back(command); + else if (AlterConversions::isSupportedMetadataMutation(command.type)) + result.push_back(command); + } } return result; } +MergeTreeData::MutationsSnapshotPtr StorageMergeTree::getMutationsSnapshot(Int64 metadata_version, bool need_data_mutations) const +{ + auto res = std::make_shared(); + res->metadata_version = metadata_version; + res->need_data_mutations = need_data_mutations; + + std::lock_guard lock(currently_processing_in_background_mutex); + + bool have_data_mutations = res->need_data_mutations && data_mutations_to_apply > 0; + bool have_metadata_mutations = metadata_mutations_to_apply > 0; + + if (!have_data_mutations && !have_metadata_mutations) + return res; + + for (const auto & [version, entry] : current_mutations_by_version) + { + bool has_required_command = std::ranges::any_of(*entry.commands, [&](const auto & command) + { + if (have_data_mutations && AlterConversions::isSupportedDataMutation(command.type)) + return true; + + if (have_metadata_mutations && AlterConversions::isSupportedMetadataMutation(command.type)) + return true; + + return false; + }); + + if (has_required_command) + res->mutations_by_version.emplace(version, entry.commands); + } + + return res; +} + void StorageMergeTree::startBackgroundMovesIfNeeded() { if (areBackgroundMovesNeeded()) diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index 4d819508934..1f2af8b9571 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -17,6 +17,7 @@ #include #include +#include "Storages/MutationCommands.h" namespace DB @@ -147,8 +148,10 @@ private: DataParts currently_merging_mutating_parts; std::map current_mutations_by_version; - /// Unfinished mutations that is required AlterConversions (see getAlterMutationCommandsForPart()) - std::atomic alter_conversions_mutations = 0; + + /// Unfinished mutations that are required for AlterConversions. + Int64 data_mutations_to_apply = 0; + Int64 metadata_mutations_to_apply = 0; std::atomic shutdown_called {false}; std::atomic flush_called {false}; @@ -309,8 +312,21 @@ private: ContextPtr context; }; -protected: - MutationCommands getAlterMutationCommandsForPart(const DataPartPtr & part) const override; + struct MutationsSnapshot : public IMutationsSnapshot + { + MutationsSnapshot() = default; + + Int64 metadata_version = -1; + bool need_data_mutations = false; + + using MutationsByVersion = std::map>; + MutationsByVersion mutations_by_version; + + MutationCommands getAlterMutationCommandsForPart(const MergeTreeData::DataPartPtr & part) const override; + std::shared_ptr cloneEmpty() const override { return std::make_shared(); } + }; + + MutationsSnapshotPtr getMutationsSnapshot(Int64 metadata_version, bool need_data_mutations) const override; }; } diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index a127384c03c..e2bba1e8068 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -9149,13 +9149,11 @@ bool StorageReplicatedMergeTree::canUseAdaptiveGranularity() const (!has_non_adaptive_index_granularity_parts && !other_replicas_fixed_granularity)); } - -MutationCommands StorageReplicatedMergeTree::getAlterMutationCommandsForPart(const DataPartPtr & part) const +MergeTreeData::MutationsSnapshotPtr StorageReplicatedMergeTree::getMutationsSnapshot(Int64 metadata_version, bool need_data_mutations) const { - return queue.getAlterMutationCommandsForPart(part); + return queue.getMutationsSnapshot(metadata_version, need_data_mutations); } - void StorageReplicatedMergeTree::startBackgroundMovesIfNeeded() { if (areBackgroundMovesNeeded()) diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index f96206ce657..3ef367d09ce 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -932,7 +932,7 @@ private: void waitMutationToFinishOnReplicas( const Strings & replicas, const String & mutation_id) const; - MutationCommands getAlterMutationCommandsForPart(const DataPartPtr & part) const override; + MutationsSnapshotPtr getMutationsSnapshot(Int64 metadata_version, bool need_data_mutations) const override; void startBackgroundMovesIfNeeded() override;