From 599e63425abe31c42cdf57b176f881db6f1f9e49 Mon Sep 17 00:00:00 2001 From: alexey-milovidov Date: Fri, 20 Sep 2019 20:32:51 +0300 Subject: [PATCH] Merge pull request #6990 from Akazz/timeout_for_sync_replica_cmd Fixed timeout mechanism for SYNC REPLICA command + simplified related code (cherry picked from commit 123b8cb43ccb3342805db79764dc03766289d904) --- .../Interpreters/InterpreterSystemQuery.cpp | 13 +++++++- .../Storages/StorageReplicatedMergeTree.cpp | 27 ++++++----------- ...3_sync_replica_timeout_zookeeper.reference | 1 + .../01013_sync_replica_timeout_zookeeper.sh | 30 +++++++++++++++++++ 4 files changed, 52 insertions(+), 19 deletions(-) create mode 100644 dbms/tests/queries/0_stateless/01013_sync_replica_timeout_zookeeper.reference create mode 100755 dbms/tests/queries/0_stateless/01013_sync_replica_timeout_zookeeper.sh diff --git a/dbms/src/Interpreters/InterpreterSystemQuery.cpp b/dbms/src/Interpreters/InterpreterSystemQuery.cpp index 6e434189c66..b6ed4dcb2e6 100644 --- a/dbms/src/Interpreters/InterpreterSystemQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSystemQuery.cpp @@ -38,6 +38,7 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; extern const int CANNOT_KILL; extern const int NOT_IMPLEMENTED; + extern const int TIMEOUT_EXCEEDED; } @@ -331,7 +332,17 @@ void InterpreterSystemQuery::syncReplica(ASTSystemQuery & query) StoragePtr table = context.getTable(database_name, table_name); if (auto storage_replicated = dynamic_cast<StorageReplicatedMergeTree *>(table.get())) - storage_replicated->waitForShrinkingQueueSize(0, context.getSettingsRef().receive_timeout.value.milliseconds()); + { + LOG_TRACE(log, "Synchronizing entries in replica's queue with table's log and waiting for it to become empty"); + if (!storage_replicated->waitForShrinkingQueueSize(0, context.getSettingsRef().receive_timeout.totalMilliseconds())) + { + LOG_ERROR(log, "SYNC REPLICA " + database_name + "." + table_name + ": Timed out!"); + throw Exception( + "SYNC REPLICA " + database_name + "."
+ table_name + ": command timed out! " + "See the 'receive_timeout' setting", ErrorCodes::TIMEOUT_EXCEEDED); + } + LOG_TRACE(log, "SYNC REPLICA " + database_name + "." + table_name + ": OK"); + } else throw Exception("Table " + database_name + "." + table_name + " is not replicated", ErrorCodes::BAD_ARGUMENTS); } diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index e5821c1bcaf..64e011df99c 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -5126,38 +5126,29 @@ ActionLock StorageReplicatedMergeTree::getActionLock(StorageActionBlockType acti bool StorageReplicatedMergeTree::waitForShrinkingQueueSize(size_t queue_size, UInt64 max_wait_milliseconds) { + Stopwatch watch; + /// Let's fetch new log entries firstly queue.pullLogsToQueue(getZooKeeper()); - Stopwatch watch; - Poco::Event event; - std::atomic<bool> cond_reached{false}; - - auto callback = [&event, &cond_reached, queue_size] (size_t new_queue_size) + Poco::Event target_size_event; + auto callback = [&target_size_event, queue_size] (size_t new_queue_size) { if (new_queue_size <= queue_size) - cond_reached.store(true, std::memory_order_relaxed); - - event.set(); + target_size_event.set(); }; + const auto handler = queue.addSubscriber(std::move(callback)); - auto handler = queue.addSubscriber(std::move(callback)); - - while (true) + while (!target_size_event.tryWait(50)) { - event.tryWait(50); - if (max_wait_milliseconds && watch.elapsedMilliseconds() > max_wait_milliseconds) - break; - - if (cond_reached) - break; + return false; if (partial_shutdown_called) throw Exception("Shutdown is called for table", ErrorCodes::ABORTED); } - return cond_reached.load(std::memory_order_relaxed); + return true; } diff --git a/dbms/tests/queries/0_stateless/01013_sync_replica_timeout_zookeeper.reference b/dbms/tests/queries/0_stateless/01013_sync_replica_timeout_zookeeper.reference new file mode
100644 index 00000000000..d86bac9de59 --- /dev/null +++ b/dbms/tests/queries/0_stateless/01013_sync_replica_timeout_zookeeper.reference @@ -0,0 +1 @@ +OK diff --git a/dbms/tests/queries/0_stateless/01013_sync_replica_timeout_zookeeper.sh b/dbms/tests/queries/0_stateless/01013_sync_replica_timeout_zookeeper.sh new file mode 100755 index 00000000000..9e846b42591 --- /dev/null +++ b/dbms/tests/queries/0_stateless/01013_sync_replica_timeout_zookeeper.sh @@ -0,0 +1,30 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. $CURDIR/../shell_config.sh + + +R1=table_1013_1 +R2=table_1013_2 + +${CLICKHOUSE_CLIENT} -n -q " + DROP TABLE IF EXISTS $R1; + DROP TABLE IF EXISTS $R2; + + CREATE TABLE $R1 (x UInt32) ENGINE ReplicatedMergeTree('/clickhouse/tables/${CLICKHOUSE_DATABASE}.table_1013', 'r1') ORDER BY x; + CREATE TABLE $R2 (x UInt32) ENGINE ReplicatedMergeTree('/clickhouse/tables/${CLICKHOUSE_DATABASE}.table_1013', 'r2') ORDER BY x; + + SYSTEM STOP FETCHES $R2; + INSERT INTO $R1 VALUES (1) +" + +timeout 10s ${CLICKHOUSE_CLIENT} -n -q " + SET receive_timeout=1; + SYSTEM SYNC REPLICA $R2 +" 2>&1 | fgrep -q "DB::Exception: SYNC REPLICA ${CLICKHOUSE_DATABASE}.$R2: command timed out!" && echo 'OK' || echo 'Failed!' + +# By dropping tables all related SYNC REPLICA queries would be terminated as well +${CLICKHOUSE_CLIENT} -n -q " + DROP TABLE IF EXISTS $R2; + DROP TABLE IF EXISTS $R1; +"