mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-18 12:22:12 +00:00
Merge pull request #6990 from Akazz/timeout_for_sync_replica_cmd
Fixed timeout mechanism for SYNC REPLICA command + simplified related code
(cherry picked from commit 123b8cb43c
)
This commit is contained in:
parent
9690fbec09
commit
599e63425a
@ -38,6 +38,7 @@ namespace ErrorCodes
|
|||||||
extern const int BAD_ARGUMENTS;
|
extern const int BAD_ARGUMENTS;
|
||||||
extern const int CANNOT_KILL;
|
extern const int CANNOT_KILL;
|
||||||
extern const int NOT_IMPLEMENTED;
|
extern const int NOT_IMPLEMENTED;
|
||||||
|
extern const int TIMEOUT_EXCEEDED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -331,7 +332,17 @@ void InterpreterSystemQuery::syncReplica(ASTSystemQuery & query)
|
|||||||
StoragePtr table = context.getTable(database_name, table_name);
|
StoragePtr table = context.getTable(database_name, table_name);
|
||||||
|
|
||||||
if (auto storage_replicated = dynamic_cast<StorageReplicatedMergeTree *>(table.get()))
|
if (auto storage_replicated = dynamic_cast<StorageReplicatedMergeTree *>(table.get()))
|
||||||
storage_replicated->waitForShrinkingQueueSize(0, context.getSettingsRef().receive_timeout.value.milliseconds());
|
{
|
||||||
|
LOG_TRACE(log, "Synchronizing entries in replica's queue with table's log and waiting for it to become empty");
|
||||||
|
if (!storage_replicated->waitForShrinkingQueueSize(0, context.getSettingsRef().receive_timeout.totalMilliseconds()))
|
||||||
|
{
|
||||||
|
LOG_ERROR(log, "SYNC REPLICA " + database_name + "." + table_name + ": Timed out!");
|
||||||
|
throw Exception(
|
||||||
|
"SYNC REPLICA " + database_name + "." + table_name + ": command timed out! "
|
||||||
|
"See the 'receive_timeout' setting", ErrorCodes::TIMEOUT_EXCEEDED);
|
||||||
|
}
|
||||||
|
LOG_TRACE(log, "SYNC REPLICA " + database_name + "." + table_name + ": OK");
|
||||||
|
}
|
||||||
else
|
else
|
||||||
throw Exception("Table " + database_name + "." + table_name + " is not replicated", ErrorCodes::BAD_ARGUMENTS);
|
throw Exception("Table " + database_name + "." + table_name + " is not replicated", ErrorCodes::BAD_ARGUMENTS);
|
||||||
}
|
}
|
||||||
|
@ -5126,38 +5126,29 @@ ActionLock StorageReplicatedMergeTree::getActionLock(StorageActionBlockType acti
|
|||||||
|
|
||||||
bool StorageReplicatedMergeTree::waitForShrinkingQueueSize(size_t queue_size, UInt64 max_wait_milliseconds)
|
bool StorageReplicatedMergeTree::waitForShrinkingQueueSize(size_t queue_size, UInt64 max_wait_milliseconds)
|
||||||
{
|
{
|
||||||
|
Stopwatch watch;
|
||||||
|
|
||||||
/// Let's fetch new log entries firstly
|
/// Let's fetch new log entries firstly
|
||||||
queue.pullLogsToQueue(getZooKeeper());
|
queue.pullLogsToQueue(getZooKeeper());
|
||||||
|
|
||||||
Stopwatch watch;
|
Poco::Event target_size_event;
|
||||||
Poco::Event event;
|
auto callback = [&target_size_event, queue_size] (size_t new_queue_size)
|
||||||
std::atomic<bool> cond_reached{false};
|
|
||||||
|
|
||||||
auto callback = [&event, &cond_reached, queue_size] (size_t new_queue_size)
|
|
||||||
{
|
{
|
||||||
if (new_queue_size <= queue_size)
|
if (new_queue_size <= queue_size)
|
||||||
cond_reached.store(true, std::memory_order_relaxed);
|
target_size_event.set();
|
||||||
|
|
||||||
event.set();
|
|
||||||
};
|
};
|
||||||
|
const auto handler = queue.addSubscriber(std::move(callback));
|
||||||
|
|
||||||
auto handler = queue.addSubscriber(std::move(callback));
|
while (!target_size_event.tryWait(50))
|
||||||
|
|
||||||
while (true)
|
|
||||||
{
|
{
|
||||||
event.tryWait(50);
|
|
||||||
|
|
||||||
if (max_wait_milliseconds && watch.elapsedMilliseconds() > max_wait_milliseconds)
|
if (max_wait_milliseconds && watch.elapsedMilliseconds() > max_wait_milliseconds)
|
||||||
break;
|
return false;
|
||||||
|
|
||||||
if (cond_reached)
|
|
||||||
break;
|
|
||||||
|
|
||||||
if (partial_shutdown_called)
|
if (partial_shutdown_called)
|
||||||
throw Exception("Shutdown is called for table", ErrorCodes::ABORTED);
|
throw Exception("Shutdown is called for table", ErrorCodes::ABORTED);
|
||||||
}
|
}
|
||||||
|
|
||||||
return cond_reached.load(std::memory_order_relaxed);
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -0,0 +1 @@
|
|||||||
|
OK
|
30
dbms/tests/queries/0_stateless/01013_sync_replica_timeout_zookeeper.sh
Executable file
30
dbms/tests/queries/0_stateless/01013_sync_replica_timeout_zookeeper.sh
Executable file
@ -0,0 +1,30 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
|
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||||
|
. $CURDIR/../shell_config.sh
|
||||||
|
|
||||||
|
|
||||||
|
R1=table_1013_1
|
||||||
|
R2=table_1013_2
|
||||||
|
|
||||||
|
${CLICKHOUSE_CLIENT} -n -q "
|
||||||
|
DROP TABLE IF EXISTS $R1;
|
||||||
|
DROP TABLE IF EXISTS $R2;
|
||||||
|
|
||||||
|
CREATE TABLE $R1 (x UInt32) ENGINE ReplicatedMergeTree('/clickhouse/tables/${CLICKHOUSE_DATABASE}.table_1013', 'r1') ORDER BY x;
|
||||||
|
CREATE TABLE $R2 (x UInt32) ENGINE ReplicatedMergeTree('/clickhouse/tables/${CLICKHOUSE_DATABASE}.table_1013', 'r2') ORDER BY x;
|
||||||
|
|
||||||
|
SYSTEM STOP FETCHES $R2;
|
||||||
|
INSERT INTO $R1 VALUES (1)
|
||||||
|
"
|
||||||
|
|
||||||
|
timeout 10s ${CLICKHOUSE_CLIENT} -n -q "
|
||||||
|
SET receive_timeout=1;
|
||||||
|
SYSTEM SYNC REPLICA $R2
|
||||||
|
" 2>&1 | fgrep -q "DB::Exception: SYNC REPLICA ${CLICKHOUSE_DATABASE}.$R2: command timed out!" && echo 'OK' || echo 'Failed!'
|
||||||
|
|
||||||
|
# By dropping tables all related SYNC REPLICA queries would be terminated as well
|
||||||
|
${CLICKHOUSE_CLIENT} -n -q "
|
||||||
|
DROP TABLE IF EXISTS $R2;
|
||||||
|
DROP TABLE IF EXISTS $R1;
|
||||||
|
"
|
Loading…
Reference in New Issue
Block a user