Merge pull request #49757 from CurtizJ/add-forgotten-lock

Add forgotten lock (addition to #49117)
This commit is contained in:
Anton Popov 2023-05-12 23:55:09 +02:00 committed by GitHub
commit 401030e2be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 14 additions and 2 deletions

View File

@ -619,8 +619,13 @@ bool comparator(const PartVersionWithName & f, const PartVersionWithName & s)
std::optional<MergeTreeMutationStatus> StorageMergeTree::getIncompleteMutationsStatus(
Int64 mutation_version, std::set<String> * mutation_ids, bool from_another_mutation) const
{
std::lock_guard lock(currently_processing_in_background_mutex);
std::unique_lock lock(currently_processing_in_background_mutex);
return getIncompleteMutationsStatusUnlocked(mutation_version, lock, mutation_ids, from_another_mutation);
}
std::optional<MergeTreeMutationStatus> StorageMergeTree::getIncompleteMutationsStatusUnlocked(
Int64 mutation_version, std::unique_lock<std::mutex> & /*lock*/, std::set<String> * mutation_ids, bool from_another_mutation) const
{
auto current_mutation_it = current_mutations_by_version.find(mutation_version);
/// Killed
if (current_mutation_it == current_mutations_by_version.end())
@ -1343,10 +1348,12 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
size_t StorageMergeTree::getNumberOfUnfinishedMutations() const
{
std::unique_lock lock(currently_processing_in_background_mutex);
size_t count = 0;
for (const auto & [version, _] : current_mutations_by_version | std::views::reverse)
{
auto status = getIncompleteMutationsStatus(version);
auto status = getIncompleteMutationsStatusUnlocked(version, lock);
if (!status)
continue;
@ -2154,6 +2161,8 @@ void StorageMergeTree::backupData(BackupEntriesCollector & backup_entries_collec
BackupEntries StorageMergeTree::backupMutations(UInt64 version, const String & data_path_in_backup) const
{
std::lock_guard lock(currently_processing_in_background_mutex);
fs::path mutations_path_in_backup = fs::path{data_path_in_backup} / "mutations";
BackupEntries backup_entries;
for (auto it = current_mutations_by_version.lower_bound(version); it != current_mutations_by_version.end(); ++it)

View File

@ -257,6 +257,9 @@ private:
std::optional<MergeTreeMutationStatus> getIncompleteMutationsStatus(Int64 mutation_version, std::set<String> * mutation_ids = nullptr,
bool from_another_mutation = false) const;
std::optional<MergeTreeMutationStatus> getIncompleteMutationsStatusUnlocked(Int64 mutation_version, std::unique_lock<std::mutex> & lock,
std::set<String> * mutation_ids = nullptr, bool from_another_mutation = false) const;
void fillNewPartName(MutableDataPartPtr & part, DataPartsLock & lock);
void startBackgroundMovesIfNeeded() override;