diff --git a/src/Core/BackgroundSchedulePool.cpp b/src/Core/BackgroundSchedulePool.cpp index 6f5a5a1fa54..b7a33c4930d 100644 --- a/src/Core/BackgroundSchedulePool.cpp +++ b/src/Core/BackgroundSchedulePool.cpp @@ -73,6 +73,11 @@ bool BackgroundSchedulePoolTaskInfo::activateAndSchedule() return true; } +std::unique_lock<std::mutex> BackgroundSchedulePoolTaskInfo::getExecLock() +{ + return std::unique_lock{exec_mutex}; +} + void BackgroundSchedulePoolTaskInfo::execute() { Stopwatch watch; diff --git a/src/Core/BackgroundSchedulePool.h b/src/Core/BackgroundSchedulePool.h index ebd0d52ee20..36cbad145c9 100644 --- a/src/Core/BackgroundSchedulePool.h +++ b/src/Core/BackgroundSchedulePool.h @@ -121,6 +121,10 @@ public: /// get Coordination::WatchCallback needed for notifications from ZooKeeper watches. Coordination::WatchCallback getWatchCallback(); + /// Returns lock that protects from concurrent task execution. + /// This lock should not be held for a long time. + std::unique_lock<std::mutex> getExecLock(); + private: friend class TaskNotification; friend class BackgroundSchedulePool; diff --git a/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp index 53397106a56..a5206d42be9 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp @@ -70,23 +70,21 @@ void ReplicatedMergeTreePartCheckThread::enqueuePart(const String & name, time_t void ReplicatedMergeTreePartCheckThread::cancelRemovedPartsCheck(const MergeTreePartInfo & drop_range_info) { /// Wait for running tasks to finish and temporarily stop checking - stop(); - SCOPE_EXIT({ start(); }); + auto pause_checking_parts = task->getExecLock(); + + std::lock_guard lock(parts_mutex); + for (auto it = parts_queue.begin(); it != parts_queue.end();) { - std::lock_guard lock(parts_mutex); - for (auto it = parts_queue.begin(); it != parts_queue.end();) + if 
(drop_range_info.contains(MergeTreePartInfo::fromPartName(it->first, storage.format_version))) { - if (drop_range_info.contains(MergeTreePartInfo::fromPartName(it->first, storage.format_version))) - { - /// Remove part from the queue to avoid part resurrection - /// if we will check it and enqueue fetch after DROP/REPLACE execution. - parts_set.erase(it->first); - it = parts_queue.erase(it); - } - else - { - ++it; - } + /// Remove part from the queue to avoid part resurrection + /// if we will check it and enqueue fetch after DROP/REPLACE execution. + parts_set.erase(it->first); + it = parts_queue.erase(it); + } + else + { + ++it; } } } diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 2c32d9f266c..c7a7c18848f 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -1024,7 +1024,7 @@ void ReplicatedMergeTreeQueue::removePartProducingOpsInRange( [[maybe_unused]] bool called_from_alter_query_directly = covering_entry && covering_entry->replace_range_entry && covering_entry->replace_range_entry->columns_version < 0; [[maybe_unused]] bool called_for_broken_part = !covering_entry; - assert(currently_executing_drop_or_replace_range || called_from_alter_query_directly || called_for_broken_part); + assert(currently_executing_drop_replace_ranges.contains(part_info) || called_from_alter_query_directly || called_for_broken_part); for (Queue::iterator it = queue.begin(); it != queue.end();) { @@ -1367,15 +1367,26 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( /// DROP_RANGE and REPLACE_RANGE entries remove other entries, which produce parts in the range. /// If such part producing operations are currently executing, then DROP/REPLACE RANGE wait them to finish. /// Deadlock is possible if multiple DROP/REPLACE RANGE entries are executing in parallel and wait each other. + /// But it should not happen if ranges are disjoint. 
/// See also removePartProducingOpsInRange(...) and ReplicatedMergeTreeQueue::CurrentlyExecuting. - if (currently_executing_drop_or_replace_range) + + if (auto drop_range = entry.getDropRange(format_version)) { - out_postpone_reason = fmt::format( - "Not executing log entry {} of type {} for part {} " - "because another DROP_RANGE or REPLACE_RANGE entry are currently executing.", - entry.znode_name, entry.typeToString(), entry.new_part_name); - LOG_TRACE(log, fmt::runtime(out_postpone_reason)); - return false; + auto drop_range_info = MergeTreePartInfo::fromPartName(*drop_range, format_version); + for (const auto & info : currently_executing_drop_replace_ranges) + { + if (drop_range_info.isDisjoint(info)) + continue; + out_postpone_reason = fmt::format( + "Not executing log entry {} of type {} for part {} " + "because another DROP_RANGE or REPLACE_RANGE entry with not disjoint range {} is currently executing.", + entry.znode_name, + entry.typeToString(), + entry.new_part_name, + info.getPartName()); + LOG_TRACE(log, fmt::runtime(out_postpone_reason)); + return false; + } } if (entry.isDropPart(format_version)) @@ -1442,10 +1453,11 @@ ReplicatedMergeTreeQueue::CurrentlyExecuting::CurrentlyExecuting( const ReplicatedMergeTreeQueue::LogEntryPtr & entry_, ReplicatedMergeTreeQueue & queue_, std::unique_lock<std::mutex> & /* state_lock */) : entry(entry_), queue(queue_) { - if (entry->type == ReplicatedMergeTreeLogEntry::DROP_RANGE || entry->type == ReplicatedMergeTreeLogEntry::REPLACE_RANGE) + if (auto drop_range = entry->getDropRange(queue.format_version)) { - assert(!queue.currently_executing_drop_or_replace_range); - queue.currently_executing_drop_or_replace_range = true; + auto drop_range_info = MergeTreePartInfo::fromPartName(*drop_range, queue.format_version); + [[maybe_unused]] bool inserted = queue.currently_executing_drop_replace_ranges.emplace(drop_range_info).second; + assert(inserted); } entry->currently_executing = true; ++entry->num_tries; @@ -1497,10 +1509,11 @@ 
ReplicatedMergeTreeQueue::CurrentlyExecuting::~CurrentlyExecuting() { std::lock_guard lock(queue.state_mutex); - if (entry->type == ReplicatedMergeTreeLogEntry::DROP_RANGE || entry->type == ReplicatedMergeTreeLogEntry::REPLACE_RANGE) + if (auto drop_range = entry->getDropRange(queue.format_version)) { - assert(queue.currently_executing_drop_or_replace_range); - queue.currently_executing_drop_or_replace_range = false; + auto drop_range_info = MergeTreePartInfo::fromPartName(*drop_range, queue.format_version); + [[maybe_unused]] bool removed = queue.currently_executing_drop_replace_ranges.erase(drop_range_info); + assert(removed); } entry->currently_executing = false; entry->execution_complete.notify_all(); diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index f4cae7152ef..e8362e5cc6b 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -96,7 +96,7 @@ private: FuturePartsSet future_parts; /// Avoid parallel execution of queue enties, which may remove other entries from the queue. - bool currently_executing_drop_or_replace_range = false; + std::set<MergeTreePartInfo> currently_executing_drop_replace_ranges; /** What will be the set of active parts after executing all log entries up to log_pointer. 
* Used to determine which merges can be assigned (see ReplicatedMergeTreeMergePredicate) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index dfb5eb0bd69..11f668bafbe 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -151,13 +151,13 @@ bool ReplicatedMergeTreeRestartingThread::runImpl() setNotReadonly(); /// Start queue processing - storage.part_check_thread.start(); storage.background_operations_assignee.start(); storage.queue_updating_task->activateAndSchedule(); storage.mutations_updating_task->activateAndSchedule(); storage.mutations_finalizing_task->activateAndSchedule(); storage.merge_selecting_task->activateAndSchedule(); storage.cleanup_thread.start(); + storage.part_check_thread.start(); return true; } @@ -356,6 +356,7 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown(bool part_of_full_shut storage.mutations_finalizing_task->deactivate(); storage.cleanup_thread.stop(); + storage.part_check_thread.stop(); /// Stop queue processing { @@ -365,9 +366,6 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown(bool part_of_full_shut storage.background_operations_assignee.finish(); } - /// Stop part_check_thread after queue processing, because some queue tasks may restart part_check_thread - storage.part_check_thread.stop(); - LOG_TRACE(log, "Threads finished"); } diff --git a/tests/queries/0_stateless/00993_system_parts_race_condition_drop_zookeeper.sh b/tests/queries/0_stateless/00993_system_parts_race_condition_drop_zookeeper.sh index dc01ce40398..5ccef802c0c 100755 --- a/tests/queries/0_stateless/00993_system_parts_race_condition_drop_zookeeper.sh +++ b/tests/queries/0_stateless/00993_system_parts_race_condition_drop_zookeeper.sh @@ -61,6 +61,7 @@ function thread6() done } + # 
https://stackoverflow.com/questions/9954794/execute-a-shell-function-with-timeout export -f thread1; export -f thread2; diff --git a/tests/queries/0_stateless/01154_move_partition_long.sh b/tests/queries/0_stateless/01154_move_partition_long.sh index 7cefac28e22..a8f12d6afbd 100755 --- a/tests/queries/0_stateless/01154_move_partition_long.sh +++ b/tests/queries/0_stateless/01154_move_partition_long.sh @@ -9,6 +9,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) # shellcheck source=./replication.lib . "$CURDIR"/replication.lib + declare -A engines engines[0]="MergeTree" engines[1]="ReplicatedMergeTree('/test/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/{shard}/src', '{replica}_' || toString(randConstant()))"