Merge pull request #6990 from Akazz/timeout_for_sync_replica_cmd

Fixed timeout mechanism for SYNC REPLICA command + simplified related code
This commit is contained in:
alexey-milovidov 2019-09-20 20:32:51 +03:00 committed by GitHub
commit 123b8cb43c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 52 additions and 19 deletions

View File

@ -38,6 +38,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
extern const int CANNOT_KILL;
extern const int NOT_IMPLEMENTED;
extern const int TIMEOUT_EXCEEDED;
}
@ -338,7 +339,17 @@ void InterpreterSystemQuery::syncReplica(ASTSystemQuery & query)
StoragePtr table = context.getTable(database_name, table_name);
if (auto storage_replicated = dynamic_cast<StorageReplicatedMergeTree *>(table.get()))
storage_replicated->waitForShrinkingQueueSize(0, context.getSettingsRef().receive_timeout.value.milliseconds());
{
LOG_TRACE(log, "Synchronizing entries in replica's queue with table's log and waiting for it to become empty");
if (!storage_replicated->waitForShrinkingQueueSize(0, context.getSettingsRef().receive_timeout.totalMilliseconds()))
{
LOG_ERROR(log, "SYNC REPLICA " + database_name + "." + table_name + ": Timed out!");
throw Exception(
"SYNC REPLICA " + database_name + "." + table_name + ": command timed out! "
"See the 'receive_timeout' setting", ErrorCodes::TIMEOUT_EXCEEDED);
}
LOG_TRACE(log, "SYNC REPLICA " + database_name + "." + table_name + ": OK");
}
else
throw Exception("Table " + database_name + "." + table_name + " is not replicated", ErrorCodes::BAD_ARGUMENTS);
}

View File

@ -5110,38 +5110,29 @@ ActionLock StorageReplicatedMergeTree::getActionLock(StorageActionBlockType acti
bool StorageReplicatedMergeTree::waitForShrinkingQueueSize(size_t queue_size, UInt64 max_wait_milliseconds)
{
Stopwatch watch;
/// Let's fetch new log entries firstly
queue.pullLogsToQueue(getZooKeeper());
Stopwatch watch;
Poco::Event event;
std::atomic<bool> cond_reached{false};
auto callback = [&event, &cond_reached, queue_size] (size_t new_queue_size)
Poco::Event target_size_event;
auto callback = [&target_size_event, queue_size] (size_t new_queue_size)
{
if (new_queue_size <= queue_size)
cond_reached.store(true, std::memory_order_relaxed);
event.set();
target_size_event.set();
};
const auto handler = queue.addSubscriber(std::move(callback));
auto handler = queue.addSubscriber(std::move(callback));
while (true)
while (!target_size_event.tryWait(50))
{
event.tryWait(50);
if (max_wait_milliseconds && watch.elapsedMilliseconds() > max_wait_milliseconds)
break;
if (cond_reached)
break;
return false;
if (partial_shutdown_called)
throw Exception("Shutdown is called for table", ErrorCodes::ABORTED);
}
return cond_reached.load(std::memory_order_relaxed);
return true;
}

View File

@ -0,0 +1,30 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
R1=table_1013_1
R2=table_1013_2
${CLICKHOUSE_CLIENT} -n -q "
DROP TABLE IF EXISTS $R1;
DROP TABLE IF EXISTS $R2;
CREATE TABLE $R1 (x UInt32) ENGINE ReplicatedMergeTree('/clickhouse/tables/${CLICKHOUSE_DATABASE}.table_1013', 'r1') ORDER BY x;
CREATE TABLE $R2 (x UInt32) ENGINE ReplicatedMergeTree('/clickhouse/tables/${CLICKHOUSE_DATABASE}.table_1013', 'r2') ORDER BY x;
SYSTEM STOP FETCHES $R2;
INSERT INTO $R1 VALUES (1)
"
timeout 10s ${CLICKHOUSE_CLIENT} -n -q "
SET receive_timeout=1;
SYSTEM SYNC REPLICA $R2
" 2>&1 | fgrep -q "DB::Exception: SYNC REPLICA ${CLICKHOUSE_DATABASE}.$R2: command timed out!" && echo 'OK' || echo 'Failed!'
# By dropping tables all related SYNC REPLICA queries would be terminated as well
${CLICKHOUSE_CLIENT} -n -q "
DROP TABLE IF EXISTS $R2;
DROP TABLE IF EXISTS $R1;
"