diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 73ea2098c71..c0749818ebf 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -1549,6 +1549,10 @@ std::vector ReplicatedMergeTreeQueue::getMutationsStatu return result; } +ReplicatedMergeTreeQueue::QueueLocks ReplicatedMergeTreeQueue::lockQueue() +{ + return QueueLocks(state_mutex, pull_logs_to_queue_mutex, update_mutations_mutex); +} ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate( ReplicatedMergeTreeQueue & queue_, zkutil::ZooKeeperPtr & zookeeper) diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 22d198b9f19..534978873c2 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -384,6 +384,15 @@ public: std::vector getMutationsStatus() const; void removeCurrentPartsFromMutations(); + + using QueueLocks = std::scoped_lock; + + /// This method locks all important queue mutexes: state_mutex, + /// pull_logs_to_queue and update_mutations_mutex. It should be used only + /// once when we want to shut down our queue and remove its task from pool. + /// It's needed because queue itself can trigger its task handler and in + /// this case race condition is possible. 
+ QueueLocks lockQueue(); }; class ReplicatedMergeTreeMergePredicate diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index 37b428e1e47..cd82a865827 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -334,7 +334,6 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown() storage.partial_shutdown_called = true; storage.partial_shutdown_event.set(); - storage.alter_query_event->set(); storage.replica_is_active_node = nullptr; LOG_TRACE(log, "Waiting for threads to finish"); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 08fa596e382..386e232cc60 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -2882,7 +2882,14 @@ void StorageReplicatedMergeTree::shutdown() if (queue_task_handle) global_context.getBackgroundPool().removeTask(queue_task_handle); - queue_task_handle.reset(); + + { + /// Queue can trigger queue_task_handle itself. So we ensure that all + /// queue processes finished and after that reset queue_task_handle. + auto lock = queue.lockQueue(); + queue_task_handle.reset(); + } + if (move_parts_task_handle) global_context.getBackgroundMovePool().removeTask(move_parts_task_handle); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.h b/dbms/src/Storages/StorageReplicatedMergeTree.h index f6483baf353..1b92310b39d 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.h +++ b/dbms/src/Storages/StorageReplicatedMergeTree.h @@ -284,9 +284,6 @@ private: /// A thread that processes reconnection to ZooKeeper when the session expires. ReplicatedMergeTreeRestartingThread restarting_thread; - /// An event that awakens `alter` method from waiting for the completion of the ALTER query. 
- zkutil::EventPtr alter_query_event = std::make_shared(); - /// True if replica was created for existing table with fixed granularity bool other_replicas_fixed_granularity = false; diff --git a/dbms/tests/queries/0_stateless/01103_optimize_drop_race_zookeeper.reference b/dbms/tests/queries/0_stateless/01103_optimize_drop_race_zookeeper.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/queries/0_stateless/01103_optimize_drop_race_zookeeper.sh b/dbms/tests/queries/0_stateless/01103_optimize_drop_race_zookeeper.sh new file mode 100755 index 00000000000..82df41d6223 --- /dev/null +++ b/dbms/tests/queries/0_stateless/01103_optimize_drop_race_zookeeper.sh @@ -0,0 +1,61 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. $CURDIR/../shell_config.sh + +set -e + +function thread1() +{ + while true; do + $CLICKHOUSE_CLIENT -q "INSERT INTO concurrent_optimize_table SELECT rand(1), rand(2), 1 / rand(3), toString(rand(4)), [rand(5), rand(6)], rand(7) % 2 ? 
NULL : generateUUIDv4(), (rand(8), rand(9)) FROM numbers(10000)"; + done +} + + +function thread2() +{ + while true; do + $CLICKHOUSE_CLIENT -q "OPTIMIZE TABLE concurrent_optimize_table FINAL"; + sleep 0.$RANDOM; + done +} + +function thread3() +{ + while true; do + $CLICKHOUSE_CLIENT -n -q "DROP TABLE IF EXISTS concurrent_optimize_table; + CREATE TABLE concurrent_optimize_table (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16)) ENGINE = ReplicatedMergeTree('/clickhouse/tables/concurrent_optimize_table', '1') ORDER BY a PARTITION BY b % 10 SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 0;"; + sleep 0.$RANDOM; + sleep 0.$RANDOM; + sleep 0.$RANDOM; + done +} + + +export -f thread1; +export -f thread2; +export -f thread3; + + +TIMEOUT=15 + +timeout $TIMEOUT bash -c thread1 2> /dev/null & +timeout $TIMEOUT bash -c thread2 2> /dev/null & +timeout $TIMEOUT bash -c thread3 2> /dev/null & + +timeout $TIMEOUT bash -c thread1 2> /dev/null & +timeout $TIMEOUT bash -c thread2 2> /dev/null & +timeout $TIMEOUT bash -c thread3 2> /dev/null & + +timeout $TIMEOUT bash -c thread1 2> /dev/null & +timeout $TIMEOUT bash -c thread2 2> /dev/null & +timeout $TIMEOUT bash -c thread3 2> /dev/null & + +timeout $TIMEOUT bash -c thread1 2> /dev/null & +timeout $TIMEOUT bash -c thread2 2> /dev/null & +timeout $TIMEOUT bash -c thread3 2> /dev/null & + +wait + +$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_optimize_table"