diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp
index b3ca721a6d2..4b0e0d68fb4 100644
--- a/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/src/Storages/MergeTree/MergeTreeData.cpp
@@ -4714,6 +4714,25 @@ std::set MergeTreeData::getPartitionIdsAffectedByCommands(
     return affected_partition_ids;
 }
 
+/// Collects the partition IDs of all parts that are currently in the Active state.
+/// A partition without any Active part does not appear in the result.
+std::unordered_set<String> MergeTreeData::getAllPartitionIds() const
+{
+    std::unordered_set<String> res;
+    /// Remember the ID inserted last: a part whose partition ID equals it is skipped
+    /// after a single string comparison, without touching the hash set.
+    String prev_id;
+    for (const auto & part : getDataPartsStateRange(DataPartState::Active))
+    {
+        if (prev_id == part->info.partition_id)
+            continue;
+
+        res.insert(part->info.partition_id);
+        prev_id = part->info.partition_id;
+    }
+    return res;
+}
+
 
 MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVectorForInternalUsage(
     const DataPartStates & affordable_states, const DataPartsLock & /*lock*/, DataPartStateVector * out_states) const
diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h
index e1cc62b5e35..02303031baa 100644
--- a/src/Storages/MergeTree/MergeTreeData.h
+++ b/src/Storages/MergeTree/MergeTreeData.h
@@ -801,6 +801,10 @@ public:
     std::unordered_set getPartitionIDsFromQuery(const ASTs & asts, ContextPtr context) const;
     std::set getPartitionIdsAffectedByCommands(const MutationCommands & commands, ContextPtr query_context) const;
 
+    /// Returns the set of partition IDs of all parts in the Active state.
+    /// The result is empty when the table has no Active parts.
+    std::unordered_set<String> getAllPartitionIds() const;
+
     /// Extracts MergeTreeData of other *MergeTree* storage
     /// and checks that their structure suitable for ALTER TABLE ATTACH PARTITION FROM
     /// Tables structure should be locked.
diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
index 176d45367f6..87cddabe4dd 100644
--- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
+++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
@@ -1772,9 +1772,9 @@ size_t ReplicatedMergeTreeQueue::countFinishedMutations() const
 }
 
 
-ReplicatedMergeTreeMergePredicate ReplicatedMergeTreeQueue::getMergePredicate(zkutil::ZooKeeperPtr & zookeeper)
+ReplicatedMergeTreeMergePredicate ReplicatedMergeTreeQueue::getMergePredicate(zkutil::ZooKeeperPtr & zookeeper, PartitionIdsHint && partition_ids_hint)
 {
-    return ReplicatedMergeTreeMergePredicate(*this, zookeeper);
+    return ReplicatedMergeTreeMergePredicate(*this, zookeeper, std::move(partition_ids_hint));
 }
 
 
@@ -1882,8 +1882,13 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
 
     /// We need to check committing block numbers and new parts which could be committed.
     /// Actually we don't need most of predicate logic here but it all the code related to committing blocks
-    /// and updatading queue state is implemented there.
-    auto merge_pred = getMergePredicate(zookeeper);
+    /// and updating queue state is implemented there. Only partitions of the candidate mutations are passed as the hint.
+    PartitionIdsHint partition_ids_hint;
+    for (const auto & candidate : candidates)
+        for (const auto & partitions : candidate->block_numbers)
+            partition_ids_hint.insert(partitions.first);
+
+    auto merge_pred = getMergePredicate(zookeeper, std::move(partition_ids_hint));
 
     std::vector finished;
     for (const auto & candidate : candidates)
@@ -2081,8 +2086,9 @@ ReplicatedMergeTreeQueue::QueueLocks ReplicatedMergeTreeQueue::lockQueue()
 }
 
 ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
-    ReplicatedMergeTreeQueue & queue_, zkutil::ZooKeeperPtr & zookeeper)
+    ReplicatedMergeTreeQueue & queue_, zkutil::ZooKeeperPtr & zookeeper, PartitionIdsHint && partition_ids_hint_)
     : queue(queue_)
+    , partition_ids_hint(std::move(partition_ids_hint_))
     , prev_virtual_parts(queue.format_version)
 {
     {
@@ -2094,7 +2100,15 @@ ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
     auto quorum_status_future = zookeeper->asyncTryGet(fs::path(queue.zookeeper_path) / "quorum" / "status");
 
     /// Load current inserts
-    Strings partitions = zookeeper->getChildren(fs::path(queue.zookeeper_path) / "block_numbers");
+    /// The hint avoids listing partitions that we don't really need.
+    /// Dropped (or cleaned up by TTL) partitions are never removed from ZK,
+    /// so without the hint it can take a few thousand requests (if not using MultiRead).
+    Strings partitions;
+    if (partition_ids_hint.empty())
+        partitions = zookeeper->getChildren(fs::path(queue.zookeeper_path) / "block_numbers");
+    else
+        partitions.assign(partition_ids_hint.begin(), partition_ids_hint.end());
+
     std::vector paths;
     paths.reserve(partitions.size());
     for (const String & partition : partitions)
@@ -2226,6 +2240,15 @@ bool ReplicatedMergeTreeMergePredicate::canMergeTwoParts(
 
     if (left_max_block + 1 < right_min_block)
     {
+        /// With a non-empty hint only the hinted partitions were loaded by the constructor,
+        /// so the gap between the parts cannot be checked for any other partition: refuse the merge.
+        if (!partition_ids_hint.empty() && !partition_ids_hint.contains(left->info.partition_id))
+        {
+            if (out_reason)
+                *out_reason = fmt::format("Uncommitted block were not loaded for unexpected partition {}", left->info.partition_id);
+            return false;
+        }
+
         auto committing_blocks_in_partition = committing_blocks.find(left->info.partition_id);
         if (committing_blocks_in_partition != committing_blocks.end())
         {
@@ -2419,6 +2442,11 @@ bool ReplicatedMergeTreeMergePredicate::isMutationFinished(const std::string & z
         const String & partition_id = kv.first;
         Int64 block_num = kv.second;
 
+        /// Every partition of the checked mutation must be covered by the hint
+        /// (an empty hint means that all partitions were listed from ZooKeeper).
+        if (!partition_ids_hint.empty() && !partition_ids_hint.contains(partition_id))
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Partition id {} was not provided as hint, it's a bug", partition_id);
+
         auto partition_it = committing_blocks.find(partition_id);
         if (partition_it != committing_blocks.end())
         {
diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h
index 95bb23029a0..36f1ee07ad4 100644
--- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h
+++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h
@@ -25,6 +25,9 @@ class MergeTreeDataMergerMutator;
 class ReplicatedMergeTreeMergePredicate;
 class ReplicatedMergeTreeMergeStrategyPicker;
 
+/// Set of partition IDs that the merge predicate should load from ZooKeeper.
+/// An empty set means "no hint": all partitions under block_numbers are listed.
+using PartitionIdsHint = std::unordered_set<String>;
 
 class ReplicatedMergeTreeQueue
 {
@@ -382,7 +385,8 @@ public:
     size_t countFinishedMutations() const;
 
     /// Returns functor which used by MergeTreeMergerMutator to select parts for merge
-    ReplicatedMergeTreeMergePredicate getMergePredicate(zkutil::ZooKeeperPtr & zookeeper);
+    /// partition_ids_hint limits the partitions whose current inserts are loaded; an empty set loads all of them.
+    ReplicatedMergeTreeMergePredicate getMergePredicate(zkutil::ZooKeeperPtr & zookeeper, PartitionIdsHint && partition_ids_hint);
 
     /// Return the version (block number) of the last mutation that we don't need to apply to the part
     /// with getDataVersion() == data_version. (Either this mutation was already applied or the part
@@ -486,7 +490,7 @@ public:
 class ReplicatedMergeTreeMergePredicate
 {
 public:
-    ReplicatedMergeTreeMergePredicate(ReplicatedMergeTreeQueue & queue_, zkutil::ZooKeeperPtr & zookeeper);
+    ReplicatedMergeTreeMergePredicate(ReplicatedMergeTreeQueue & queue_, zkutil::ZooKeeperPtr & zookeeper, PartitionIdsHint && partition_ids_hint_);
 
     /// Depending on the existence of left part checks a merge predicate for two parts or for single part.
     bool operator()(const MergeTreeData::DataPartPtr & left,
@@ -531,6 +535,9 @@ public:
 private:
     const ReplicatedMergeTreeQueue & queue;
 
+    /// Partitions that were loaded by the constructor; empty means that all partitions were loaded.
+    PartitionIdsHint partition_ids_hint;
+
     /// A snapshot of active parts that would appear if the replica executes all log entries in its queue.
     ActiveDataPartSet prev_virtual_parts;
     /// partition ID -> block numbers of the inserts and mutations that are about to commit
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index 8a1e4ebaec8..3110ae8aa97 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -3128,7 +3128,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
 
         auto zookeeper = getZooKeeperAndAssertNotReadonly();
 
-        ReplicatedMergeTreeMergePredicate merge_pred = queue.getMergePredicate(zookeeper);
+        ReplicatedMergeTreeMergePredicate merge_pred = queue.getMergePredicate(zookeeper, getAllPartitionIds());
 
         /// If many merges is already queued, then will queue only small enough merges.
/// Otherwise merge queue could be filled with only large merges, @@ -4573,7 +4573,12 @@ bool StorageReplicatedMergeTree::optimize( /// We must select parts for merge under merge_selecting_mutex because other threads /// (merge_selecting_thread or OPTIMIZE queries) could assign new merges. std::lock_guard merge_selecting_lock(merge_selecting_mutex); - ReplicatedMergeTreeMergePredicate can_merge = queue.getMergePredicate(zookeeper); + PartitionIdsHint partition_ids_hint; + if (partition_id.empty()) + partition_ids_hint = getAllPartitionIds(); + else + partition_ids_hint.insert(partition_id); + ReplicatedMergeTreeMergePredicate can_merge = queue.getMergePredicate(zookeeper, std::move(partition_ids_hint)); auto future_merged_part = std::make_shared(); if (storage_settings.get()->assign_part_uuids) @@ -7057,7 +7062,7 @@ void StorageReplicatedMergeTree::movePartitionToShard( throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Part {} does not have an uuid assigned and it can't be moved between shards", part_name); - ReplicatedMergeTreeMergePredicate merge_pred = queue.getMergePredicate(zookeeper); + ReplicatedMergeTreeMergePredicate merge_pred = queue.getMergePredicate(zookeeper, {part_info.partition_id}); /// The following block is pretty much copy & paste from StorageReplicatedMergeTree::dropPart to avoid conflicts while this is WIP. /// Extract it to a common method and re-use it before merging. 
@@ -7265,7 +7270,8 @@ bool StorageReplicatedMergeTree::dropPartImpl(
 
     while (true)
     {
-        ReplicatedMergeTreeMergePredicate merge_pred = queue.getMergePredicate(zookeeper);
+        /// Hint the predicate with the single partition that part_info belongs to.
+        ReplicatedMergeTreeMergePredicate merge_pred = queue.getMergePredicate(zookeeper, PartitionIdsHint{part_info.partition_id});
 
         auto part = getPartIfExists(part_info, {MergeTreeDataPartState::Active});
 
@@ -8334,7 +8340,8 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP
     /// We can enqueue part for check from DataPartExchange or SelectProcessor
     /// and it's hard to synchronize it with ReplicatedMergeTreeQueue and PartCheckThread...
     /// But at least we can ignore parts that are definitely not needed according to virtual parts and drop ranges.
-    auto pred = queue.getMergePredicate(zookeeper);
+    /// Hint the predicate with the single partition of new_part_info.
+    auto pred = queue.getMergePredicate(zookeeper, PartitionIdsHint{new_part_info.partition_id});
     String covering_virtual = pred.getCoveringVirtualPart(lost_part_name);
     if (covering_virtual.empty())
     {