From 70c1000c227072722dd14f731d9f1549edb7a1c9 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Fri, 14 Apr 2023 19:42:17 +0200 Subject: [PATCH 1/3] fix some tests --- src/Interpreters/MergeTreeTransaction.cpp | 2 +- src/Storages/IStorage.h | 2 +- src/Storages/MergeTree/MergeTreeData.cpp | 9 ++++++++ src/Storages/StorageMergeTree.cpp | 23 +++++++++++-------- src/Storages/StorageMergeTree.h | 9 ++++---- .../config.d/merge_tree_old_dirs_cleanup.xml | 2 +- 6 files changed, 30 insertions(+), 17 deletions(-) diff --git a/src/Interpreters/MergeTreeTransaction.cpp b/src/Interpreters/MergeTreeTransaction.cpp index bfdda354c9b..1358e3ed3c2 100644 --- a/src/Interpreters/MergeTreeTransaction.cpp +++ b/src/Interpreters/MergeTreeTransaction.cpp @@ -184,7 +184,7 @@ scope_guard MergeTreeTransaction::beforeCommit() /// We should wait for mutations to finish before committing transaction, because some mutation may fail and cause rollback. for (const auto & table_and_mutation : mutations_to_wait) - table_and_mutation.first->waitForMutation(table_and_mutation.second); + table_and_mutation.first->waitForMutation(table_and_mutation.second, /* wait_for_another_mutation */ false); assert([&]() { diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index 351e147e6cd..a65b0f1442b 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -503,7 +503,7 @@ public: throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Mutations are not supported by storage {}", getName()); } - virtual void waitForMutation(const String & /*mutation_id*/) + virtual void waitForMutation(const String & /*mutation_id*/, bool /*wait_for_another_mutation*/) { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Mutations are not supported by storage {}", getName()); } diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 45759c449f6..ce42faccc79 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -2001,6 +2002,7 @@ size_t MergeTreeData::clearOldTemporaryDirectories(size_t custom_directories_lif { if (isOldPartDirectory(disk, it->path(), deadline)) { + ThreadFuzzer::maybeInjectSleep(); if (temporary_parts.contains(basename)) { /// Actually we don't rely on temporary_directories_lifetime when removing old temporaries directories, @@ -2008,6 +2010,13 @@ size_t MergeTreeData::clearOldTemporaryDirectories(size_t custom_directories_lif LOG_INFO(LogFrequencyLimiter(log, 10), "{} is in use (by merge/mutation/INSERT) (consider increasing temporary_directories_lifetime setting)", full_path); continue; } + else if (!disk->exists(it->path())) + { + /// We should recheck that the dir exists, otherwise we can get "No such file or directory" + /// due to a race condition with "Renaming temporary part" (temporary part holder could be already released, so the check above is not enough) + LOG_WARNING(log, "Temporary directory {} suddenly disappeared while iterating, assuming it was concurrently renamed to persistent", it->path()); + continue; + } else { LOG_WARNING(log, "Removing temporary directory {}", full_path); diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 6cb3ce35e5b..9d43bb77a92 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -341,7 +341,7 @@ void StorageMergeTree::alter( if (prev_mutation != 0) { LOG_DEBUG(log, "Cannot change metadata with barrier alter query, will wait for mutation {}", prev_mutation); - waitForMutation(prev_mutation); + waitForMutation(prev_mutation, /* from_another_mutation */ true); LOG_DEBUG(log, "Mutation {} finished", prev_mutation); } } @@ -537,19 +537,19 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re mutation_wait_event.notify_all(); } -void StorageMergeTree::waitForMutation(Int64 version) +void StorageMergeTree::waitForMutation(Int64 version, bool wait_for_another_mutation) { String mutation_id = MergeTreeMutationEntry::versionToFileName(version); - waitForMutation(version, mutation_id); + waitForMutation(version, mutation_id, wait_for_another_mutation); } -void StorageMergeTree::waitForMutation(const String & mutation_id) +void StorageMergeTree::waitForMutation(const String & mutation_id, bool wait_for_another_mutation) { Int64 version = MergeTreeMutationEntry::parseFileName(mutation_id); - waitForMutation(version, mutation_id); + waitForMutation(version, mutation_id, wait_for_another_mutation); } -void StorageMergeTree::waitForMutation(Int64 version, const String & mutation_id) +void StorageMergeTree::waitForMutation(Int64 version, const String & mutation_id, bool wait_for_another_mutation) { LOG_INFO(log, "Waiting mutation: {}", mutation_id); { @@ -569,7 +569,7 @@ void StorageMergeTree::waitForMutation(Int64 version, const String & mutation_id std::set mutation_ids; mutation_ids.insert(mutation_id); - auto mutation_status = getIncompleteMutationsStatus(version, &mutation_ids); + auto mutation_status = getIncompleteMutationsStatus(version, &mutation_ids, wait_for_another_mutation); checkMutationStatus(mutation_status, mutation_ids); LOG_INFO(log, "Mutation {} done", mutation_id); @@ -619,7 +619,8 @@ bool comparator(const PartVersionWithName & f, const PartVersionWithName & s) } -std::optional StorageMergeTree::getIncompleteMutationsStatus(Int64 mutation_version, std::set * mutation_ids) const +std::optional StorageMergeTree::getIncompleteMutationsStatus( + Int64 mutation_version, std::set * mutation_ids, bool from_another_mutation) const { std::lock_guard lock(currently_processing_in_background_mutex); @@ -633,7 +634,9 @@ std::optional StorageMergeTree::getIncompleteMutationsS const auto & mutation_entry = current_mutation_it->second; auto txn = tryGetTransactionForMutation(mutation_entry, log); - assert(txn || mutation_entry.tid.isPrehistoric()); + /// There's no way a transaction may finish before a mutation that was started by the transaction. + /// But sometimes we need to check status of an unrelated mutation, in this case we don't care about transactions. + assert(txn || mutation_entry.tid.isPrehistoric() || from_another_mutation); auto data_parts = getVisibleDataPartsVector(txn); for (const auto & data_part : data_parts) { @@ -658,7 +661,7 @@ std::optional StorageMergeTree::getIncompleteMutationsS mutation_ids->insert(it->second.file_name); } } - else if (txn) + else if (txn && !from_another_mutation) { /// Part is locked by concurrent transaction, most likely it will never be mutated TIDHash part_locked = data_part->version.removal_tid_lock.load(); diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index a0629bb8d3e..0ea72743753 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -183,9 +183,9 @@ private: /// and into in-memory structures. Wake up merge-mutation task. Int64 startMutation(const MutationCommands & commands, ContextPtr query_context); /// Wait until mutation with version will finish mutation for all parts - void waitForMutation(Int64 version); - void waitForMutation(const String & mutation_id) override; - void waitForMutation(Int64 version, const String & mutation_id); + void waitForMutation(Int64 version, bool wait_for_another_mutation = false); + void waitForMutation(const String & mutation_id, bool wait_for_another_mutation) override; + void waitForMutation(Int64 version, const String & mutation_id, bool wait_for_another_mutation = false); void setMutationCSN(const String & mutation_id, CSN csn) override; @@ -246,7 +246,8 @@ private: /// because we can execute several mutations at once. Order is important for /// better readability of exception message. If mutation was killed doesn't /// return any ids. - std::optional getIncompleteMutationsStatus(Int64 mutation_version, std::set * mutation_ids = nullptr) const; + std::optional getIncompleteMutationsStatus(Int64 mutation_version, std::set * mutation_ids = nullptr, + bool from_another_mutation = true) const; void fillNewPartName(MutableDataPartPtr & part, DataPartsLock & lock); diff --git a/tests/config/config.d/merge_tree_old_dirs_cleanup.xml b/tests/config/config.d/merge_tree_old_dirs_cleanup.xml index 41932cb6d61..2b8ea63b63d 100644 --- a/tests/config/config.d/merge_tree_old_dirs_cleanup.xml +++ b/tests/config/config.d/merge_tree_old_dirs_cleanup.xml @@ -3,6 +3,6 @@ 1 - 10 + 5 From fde55f3ea6ceae0e4128af6de9bcedf28a8c3a9f Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Tue, 25 Apr 2023 18:25:10 +0200 Subject: [PATCH 2/3] fix --- src/Storages/StorageMergeTree.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index 5849f30969d..3a39ae12856 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -254,7 +254,7 @@ private: /// better readability of exception message. If mutation was killed doesn't /// return any ids. std::optional getIncompleteMutationsStatus(Int64 mutation_version, std::set * mutation_ids = nullptr, - bool from_another_mutation = true) const; + bool from_another_mutation = false) const; void fillNewPartName(MutableDataPartPtr & part, DataPartsLock & lock); From d969de2d57a29ddaf78dc744826a1575d7c41802 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Tue, 25 Apr 2023 20:21:57 +0200 Subject: [PATCH 3/3] another fix --- .../00992_system_parts_race_condition_zookeeper_long.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/queries/0_stateless/00992_system_parts_race_condition_zookeeper_long.sh b/tests/queries/0_stateless/00992_system_parts_race_condition_zookeeper_long.sh index d49f63e143d..5b1c50262bf 100755 --- a/tests/queries/0_stateless/00992_system_parts_race_condition_zookeeper_long.sh +++ b/tests/queries/0_stateless/00992_system_parts_race_condition_zookeeper_long.sh @@ -13,8 +13,8 @@ $CLICKHOUSE_CLIENT -n -q " DROP TABLE IF EXISTS alter_table0; DROP TABLE IF EXISTS alter_table1; - CREATE TABLE alter_table0 (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16)) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r1') ORDER BY a PARTITION BY b % 10 SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, replicated_max_mutations_in_one_entry = $(($RANDOM / 50)); - CREATE TABLE alter_table1 (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16)) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r2') ORDER BY a PARTITION BY b % 10 SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, replicated_max_mutations_in_one_entry = $(($RANDOM / 50)); + CREATE TABLE alter_table0 (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16)) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r1') ORDER BY a PARTITION BY b % 10 SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, replicated_max_mutations_in_one_entry = $(($RANDOM / 50 + 100)); + CREATE TABLE alter_table1 (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16)) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r2') ORDER BY a PARTITION BY b % 10 SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, replicated_max_mutations_in_one_entry = $(($RANDOM / 50 + 200)); " function thread1()