diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 4d24f491551..8b0751f4bbf 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -1123,7 +1123,7 @@ bool ReplicatedMergeTreeQueue::addFuturePartIfNotCoveredByThem(const String & pa if (isNotCoveredByFuturePartsImpl(entry, part_name, reject_reason, lock)) { - CurrentlyExecuting::setActualPartName(entry, part_name, *this); + CurrentlyExecuting::setActualPartName(entry, part_name, *this, lock); return true; } @@ -1375,7 +1375,8 @@ Int64 ReplicatedMergeTreeQueue::getCurrentMutationVersion(const String & partiti } -ReplicatedMergeTreeQueue::CurrentlyExecuting::CurrentlyExecuting(const ReplicatedMergeTreeQueue::LogEntryPtr & entry_, ReplicatedMergeTreeQueue & queue_) +ReplicatedMergeTreeQueue::CurrentlyExecuting::CurrentlyExecuting( + const ReplicatedMergeTreeQueue::LogEntryPtr & entry_, ReplicatedMergeTreeQueue & queue_, std::lock_guard & /* state_lock */) : entry(entry_), queue(queue_) { if (entry->type == ReplicatedMergeTreeLogEntry::DROP_RANGE || entry->type == ReplicatedMergeTreeLogEntry::REPLACE_RANGE) @@ -1397,8 +1398,11 @@ ReplicatedMergeTreeQueue::CurrentlyExecuting::CurrentlyExecuting(const Replicate } -void ReplicatedMergeTreeQueue::CurrentlyExecuting::setActualPartName(ReplicatedMergeTreeQueue::LogEntry & entry, - const String & actual_part_name, ReplicatedMergeTreeQueue & queue) +void ReplicatedMergeTreeQueue::CurrentlyExecuting::setActualPartName( + ReplicatedMergeTreeQueue::LogEntry & entry, + const String & actual_part_name, + ReplicatedMergeTreeQueue & queue, + std::lock_guard & /* state_lock */) { if (!entry.actual_new_part_name.empty()) throw Exception("Entry actual part isn't empty yet. This is a bug.", ErrorCodes::LOGICAL_ERROR); @@ -1477,7 +1481,7 @@ ReplicatedMergeTreeQueue::SelectedEntryPtr ReplicatedMergeTreeQueue::selectEntry } if (entry) - return std::make_shared(entry, std::unique_ptr{ new CurrentlyExecuting(entry, *this) }); + return std::make_shared(entry, std::unique_ptr{new CurrentlyExecuting(entry, *this, lock)}); else return {}; } diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 133c154059e..208ce73e5f1 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -251,11 +251,18 @@ private: friend class ReplicatedMergeTreeQueue; /// Created only in the selectEntryToProcess function. It is called under mutex. - CurrentlyExecuting(const ReplicatedMergeTreeQueue::LogEntryPtr & entry_, ReplicatedMergeTreeQueue & queue_); + CurrentlyExecuting( + const ReplicatedMergeTreeQueue::LogEntryPtr & entry_, + ReplicatedMergeTreeQueue & queue_, + std::lock_guard & state_lock); /// In case of fetch, we determine actual part during the execution, so we need to update entry. It is called under state_mutex. - static void setActualPartName(ReplicatedMergeTreeQueue::LogEntry & entry, const String & actual_part_name, - ReplicatedMergeTreeQueue & queue); + static void setActualPartName( + ReplicatedMergeTreeQueue::LogEntry & entry, + const String & actual_part_name, + ReplicatedMergeTreeQueue & queue, + std::lock_guard & state_lock); + public: ~CurrentlyExecuting(); };