CurrentlyExecuting: Require mutex usage explicitly

This commit is contained in:
Raúl Marín 2022-01-26 18:44:35 +01:00
parent 662444fe13
commit 5a59d976dd
2 changed files with 19 additions and 8 deletions

View File

@ -1123,7 +1123,7 @@ bool ReplicatedMergeTreeQueue::addFuturePartIfNotCoveredByThem(const String & pa
if (isNotCoveredByFuturePartsImpl(entry, part_name, reject_reason, lock))
{
CurrentlyExecuting::setActualPartName(entry, part_name, *this);
CurrentlyExecuting::setActualPartName(entry, part_name, *this, lock);
return true;
}
@ -1375,7 +1375,8 @@ Int64 ReplicatedMergeTreeQueue::getCurrentMutationVersion(const String & partiti
}
ReplicatedMergeTreeQueue::CurrentlyExecuting::CurrentlyExecuting(const ReplicatedMergeTreeQueue::LogEntryPtr & entry_, ReplicatedMergeTreeQueue & queue_)
ReplicatedMergeTreeQueue::CurrentlyExecuting::CurrentlyExecuting(
const ReplicatedMergeTreeQueue::LogEntryPtr & entry_, ReplicatedMergeTreeQueue & queue_, std::lock_guard<std::mutex> & /* state_lock */)
: entry(entry_), queue(queue_)
{
if (entry->type == ReplicatedMergeTreeLogEntry::DROP_RANGE || entry->type == ReplicatedMergeTreeLogEntry::REPLACE_RANGE)
@ -1397,8 +1398,11 @@ ReplicatedMergeTreeQueue::CurrentlyExecuting::CurrentlyExecuting(const Replicate
}
void ReplicatedMergeTreeQueue::CurrentlyExecuting::setActualPartName(ReplicatedMergeTreeQueue::LogEntry & entry,
const String & actual_part_name, ReplicatedMergeTreeQueue & queue)
void ReplicatedMergeTreeQueue::CurrentlyExecuting::setActualPartName(
ReplicatedMergeTreeQueue::LogEntry & entry,
const String & actual_part_name,
ReplicatedMergeTreeQueue & queue,
std::lock_guard<std::mutex> & /* state_lock */)
{
if (!entry.actual_new_part_name.empty())
throw Exception("Entry actual part isn't empty yet. This is a bug.", ErrorCodes::LOGICAL_ERROR);
@ -1477,7 +1481,7 @@ ReplicatedMergeTreeQueue::SelectedEntryPtr ReplicatedMergeTreeQueue::selectEntry
}
if (entry)
return std::make_shared<SelectedEntry>(entry, std::unique_ptr<CurrentlyExecuting>{ new CurrentlyExecuting(entry, *this) });
return std::make_shared<SelectedEntry>(entry, std::unique_ptr<CurrentlyExecuting>{new CurrentlyExecuting(entry, *this, lock)});
else
return {};
}

View File

@ -251,11 +251,18 @@ private:
friend class ReplicatedMergeTreeQueue;
/// Created only in the selectEntryToProcess function. It is called under mutex.
CurrentlyExecuting(const ReplicatedMergeTreeQueue::LogEntryPtr & entry_, ReplicatedMergeTreeQueue & queue_);
CurrentlyExecuting(
const ReplicatedMergeTreeQueue::LogEntryPtr & entry_,
ReplicatedMergeTreeQueue & queue_,
std::lock_guard<std::mutex> & state_lock);
/// In case of fetch, we determine actual part during the execution, so we need to update entry. It is called under state_mutex.
static void setActualPartName(ReplicatedMergeTreeQueue::LogEntry & entry, const String & actual_part_name,
ReplicatedMergeTreeQueue & queue);
static void setActualPartName(
ReplicatedMergeTreeQueue::LogEntry & entry,
const String & actual_part_name,
ReplicatedMergeTreeQueue & queue,
std::lock_guard<std::mutex> & state_lock);
public:
~CurrentlyExecuting();
};