Merge pull request #17089 from ClickHouse/fix_ddl_worker_non_leader

Fix ON CLUSTER queries hung for non leader replicas
This commit is contained in:
alesapin 2020-11-18 10:21:35 +03:00 committed by GitHub
commit 0bc60e2d53
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 116 additions and 11 deletions

View File

@ -871,13 +871,16 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica(
zookeeper->tryCreate(tries_to_execute_path, "0", zkutil::CreateMode::Persistent);
static constexpr int MAX_TRIES_TO_EXECUTE = 3;
static constexpr int MAX_EXECUTION_TIMEOUT_SEC = 3600;
String executed_by;
zkutil::EventPtr event = std::make_shared<Poco::Event>();
if (zookeeper->tryGet(is_executed_path, executed_by, nullptr, event))
/// We must use exists request instead of get, because zookeeper will not setup event
/// for non existing node after get request
if (zookeeper->exists(is_executed_path, nullptr, event))
{
LOG_DEBUG(log, "Task {} has already been executed by replica ({}) of the same shard.", task.entry_name, executed_by);
LOG_DEBUG(log, "Task {} has already been executed by replica ({}) of the same shard.", task.entry_name, zookeeper->get(is_executed_path));
return true;
}
@ -885,8 +888,13 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica(
auto lock = createSimpleZooKeeperLock(zookeeper, shard_path, "lock", task.host_id_str);
Stopwatch stopwatch;
bool executed_by_leader = false;
while (true)
/// Defensive programming. One hour is more than enough to execute almost all DDL queries.
/// If it will be very long query like ALTER DELETE for a huge table it's still will be executed,
/// but DDL worker can continue processing other queries.
while (stopwatch.elapsedSeconds() <= MAX_EXECUTION_TIMEOUT_SEC)
{
StorageReplicatedMergeTree::Status status;
replicated_storage->getStatus(status);
@ -895,8 +903,8 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica(
if (status.is_leader && lock->tryLock())
{
/// In replicated merge tree we can have multiple leaders. So we can
/// be "leader", but another "leader" replica may already execute
/// this task.
/// be "leader" and took lock, but another "leader" replica may have
/// already executed this task.
if (zookeeper->tryGet(is_executed_path, executed_by))
{
LOG_DEBUG(log, "Task {} has already been executed by replica ({}) of the same shard.", task.entry_name, executed_by);
@ -904,7 +912,7 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica(
break;
}
/// Doing it exclusively
/// Checking and incrementing counter exclusively.
size_t counter = parse<int>(zookeeper->get(tries_to_execute_path));
if (counter > MAX_TRIES_TO_EXECUTE)
break;
@ -923,24 +931,45 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica(
lock->unlock();
}
/// Waiting for someone who will execute query and change is_executed_path node
if (event->tryWait(std::uniform_int_distribution<int>(0, 1000)(rng)))
{
LOG_DEBUG(log, "Task {} has already been executed by replica ({}) of the same shard.", task.entry_name, zookeeper->get(is_executed_path));
executed_by_leader = true;
break;
}
else if (parse<int>(zookeeper->get(tries_to_execute_path)) > MAX_TRIES_TO_EXECUTE)
else
{
String tries_count;
zookeeper->tryGet(tries_to_execute_path, tries_count);
if (parse<int>(tries_count) > MAX_TRIES_TO_EXECUTE)
{
/// Nobody will try to execute query again
LOG_WARNING(log, "Maximum retries count for task {} exceeded, cannot execute replicated DDL query", task.entry_name);
break;
}
else
{
/// Will try to wait or execute
LOG_TRACE(log, "Task {} still not executed, will try to wait for it or execute ourselves, tries count {}", task.entry_name, tries_count);
}
}
}
/// Not executed by leader so was not executed at all
if (!executed_by_leader)
{
task.execution_status = ExecutionStatus(ErrorCodes::NOT_IMPLEMENTED, "Cannot execute replicated DDL query");
/// If we failed with timeout
if (stopwatch.elapsedSeconds() >= MAX_EXECUTION_TIMEOUT_SEC)
{
LOG_WARNING(log, "Task {} was not executed by anyone, maximum timeout {} seconds exceeded", task.entry_name, MAX_EXECUTION_TIMEOUT_SEC);
task.execution_status = ExecutionStatus(ErrorCodes::TIMEOUT_EXCEEDED, "Cannot execute replicated DDL query, timeout exceeded");
}
else /// If we exceeded amount of tries
{
LOG_WARNING(log, "Task {} was not executed by anyone, maximum number of retries exceeded", task.entry_name);
task.execution_status = ExecutionStatus(ErrorCodes::UNFINISHED, "Cannot execute replicated DDL query, maximum retires exceeded");
}
return false;
}

View File

@ -0,0 +1,17 @@
<yandex>
<remote_servers>
<test_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</yandex>

View File

@ -0,0 +1,59 @@
import pytest
import time
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from helpers.client import QueryRuntimeException
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_non_leader_replica(started_cluster):
node1.query('''CREATE TABLE sometable(id UInt32, value String)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/sometable', '1') ORDER BY tuple()''')
node2.query('''CREATE TABLE sometable(id UInt32, value String)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/sometable', '2') ORDER BY tuple() SETTINGS replicated_can_become_leader = 0''')
node1.query("INSERT INTO sometable SELECT number, toString(number) FROM numbers(100)")
node2.query("SYSTEM SYNC REPLICA sometable", timeout=10)
assert node1.query("SELECT COUNT() FROM sometable") == "100\n"
assert node2.query("SELECT COUNT() FROM sometable") == "100\n"
with PartitionManager() as pm:
pm.drop_instance_zk_connections(node1)
# this query should be executed by leader, but leader partitioned from zookeeper
with pytest.raises(Exception):
node2.query("ALTER TABLE sometable ON CLUSTER 'test_cluster' MODIFY COLUMN value UInt64 SETTINGS distributed_ddl_task_timeout=5")
for _ in range(100):
if 'UInt64' in node1.query("SELECT type FROM system.columns WHERE name='value' and table = 'sometable'"):
break
time.sleep(0.1)
for _ in range(100):
if 'UInt64' in node2.query("SELECT type FROM system.columns WHERE name='value' and table = 'sometable'"):
break
time.sleep(0.1)
assert 'UInt64' in node1.query("SELECT type FROM system.columns WHERE name='value' and table = 'sometable'")
assert 'UInt64' in node2.query("SELECT type FROM system.columns WHERE name='value' and table = 'sometable'")
# Checking that DDLWorker doesn't hung and still able to execute DDL queries
node1.query("CREATE TABLE new_table_with_ddl ON CLUSTER 'test_cluster' (key UInt32) ENGINE=MergeTree() ORDER BY tuple()", settings={"distributed_ddl_task_timeout": "10"})
assert node1.query("EXISTS new_table_with_ddl") == "1\n"
assert node2.query("EXISTS new_table_with_ddl") == "1\n"