diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 8a9dbceba04..6325b1adca4 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -646,7 +646,7 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C } } - if (some_active_mutations_were_killed) + if (some_active_mutations_were_killed && storage.queue_task_handle) storage.queue_task_handle->signalReadyToRun(); if (!entries_to_load.empty()) @@ -759,7 +759,7 @@ ReplicatedMergeTreeMutationEntryPtr ReplicatedMergeTreeQueue::removeMutation( LOG_DEBUG(log, "Removed mutation {} from local state.", entry->znode_name); } - if (mutation_was_active) + if (mutation_was_active && storage.queue_task_handle) storage.queue_task_handle->signalReadyToRun(); return entry; diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index d8e45b97438..7cb4a149ec5 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -5660,9 +5660,16 @@ bool StorageReplicatedMergeTree::waitForShrinkingQueueSize(size_t queue_size, UI /// Let's fetch new log entries firstly queue.pullLogsToQueue(getZooKeeper()); - /// This is significant, because the execution of this task could be delayed at BackgroundPool. - /// And we force it to be executed. - queue_task_handle->signalReadyToRun(); + + { + auto lock = queue.lockQueue(); + if (!queue_task_handle) + return false; + + /// This is significant, because the execution of this task could be delayed at BackgroundPool. + /// And we force it to be executed. + queue_task_handle->signalReadyToRun(); + } Poco::Event target_size_event; auto callback = [&target_size_event, queue_size] (size_t new_queue_size) diff --git a/tests/queries/0_stateless/01320_create_sync_race_condition.reference b/tests/queries/0_stateless/01320_create_sync_race_condition.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/01320_create_sync_race_condition.sh b/tests/queries/0_stateless/01320_create_sync_race_condition.sh new file mode 100755 index 00000000000..2e42033644a --- /dev/null +++ b/tests/queries/0_stateless/01320_create_sync_race_condition.sh @@ -0,0 +1,28 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. $CURDIR/../shell_config.sh + +set -e + +$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS r;" + +function thread1() +{ + while true; do $CLICKHOUSE_CLIENT -n --query "CREATE TABLE r (x UInt64) ENGINE = ReplicatedMergeTree('/test/table', 'r') ORDER BY x; DROP TABLE r;"; done +} + +function thread2() +{ + while true; do $CLICKHOUSE_CLIENT --query "SYSTEM SYNC REPLICA r" 2>/dev/null; done +} + +export -f thread1 +export -f thread2 + +timeout 10 bash -c thread1 & +timeout 10 bash -c thread2 & + +wait + +$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS r;"