diff --git a/src/Interpreters/DDLWorker.cpp b/src/Interpreters/DDLWorker.cpp
index fc7f5c2f765..1ba1fc0cb0d 100644
--- a/src/Interpreters/DDLWorker.cpp
+++ b/src/Interpreters/DDLWorker.cpp
@@ -871,13 +871,16 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica(
     zookeeper->tryCreate(tries_to_execute_path, "0", zkutil::CreateMode::Persistent);
 
     static constexpr int MAX_TRIES_TO_EXECUTE = 3;
+    static constexpr int MAX_EXECUTION_TIMEOUT_SEC = 3600;
 
     String executed_by;
 
     zkutil::EventPtr event = std::make_shared<Poco::Event>();
-    if (zookeeper->tryGet(is_executed_path, executed_by, nullptr, event))
+    /// We must use the exists request instead of get, because ZooKeeper will not set up the event
+    /// for a non-existent node after a get request
+    if (zookeeper->exists(is_executed_path, nullptr, event))
     {
-        LOG_DEBUG(log, "Task {} has already been executed by replica ({}) of the same shard.", task.entry_name, executed_by);
+        LOG_DEBUG(log, "Task {} has already been executed by replica ({}) of the same shard.", task.entry_name, zookeeper->get(is_executed_path));
         return true;
     }
 
@@ -885,8 +888,13 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica(
 
     auto lock = createSimpleZooKeeperLock(zookeeper, shard_path, "lock", task.host_id_str);
 
+    Stopwatch stopwatch;
+
     bool executed_by_leader = false;
-    while (true)
+    /// Defensive programming. One hour is more than enough to execute almost all DDL queries.
+    /// If it is a very long query, like an ALTER DELETE for a huge table, it will still be executed,
+    /// but the DDL worker can continue processing other queries.
+    while (stopwatch.elapsedSeconds() <= MAX_EXECUTION_TIMEOUT_SEC)
     {
         StorageReplicatedMergeTree::Status status;
         replicated_storage->getStatus(status);
@@ -895,8 +903,8 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica(
         if (status.is_leader && lock->tryLock())
         {
             /// In replicated merge tree we can have multiple leaders. So we can
-            /// be "leader", but another "leader" replica may already execute
-            /// this task.
+            /// be the "leader" and have taken the lock, but another "leader" replica may have
+            /// already executed this task.
             if (zookeeper->tryGet(is_executed_path, executed_by))
             {
                 LOG_DEBUG(log, "Task {} has already been executed by replica ({}) of the same shard.", task.entry_name, executed_by);
@@ -904,7 +912,7 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica(
                 break;
             }
 
-            /// Doing it exclusively
+            /// Checking and incrementing the counter exclusively.
             size_t counter = parse<int>(zookeeper->get(tries_to_execute_path));
             if (counter > MAX_TRIES_TO_EXECUTE)
                 break;
@@ -923,24 +931,45 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica(
 
             lock->unlock();
         }
-
+        /// Waiting for someone to execute the query and change the is_executed_path node
         if (event->tryWait(std::uniform_int_distribution<int>(0, 1000)(rng)))
         {
             LOG_DEBUG(log, "Task {} has already been executed by replica ({}) of the same shard.", task.entry_name, zookeeper->get(is_executed_path));
             executed_by_leader = true;
             break;
         }
-        else if (parse<int>(zookeeper->get(tries_to_execute_path)) > MAX_TRIES_TO_EXECUTE)
+        else
         {
-            /// Nobody will try to execute query again
-            break;
+            String tries_count;
+            zookeeper->tryGet(tries_to_execute_path, tries_count);
+            if (parse<int>(tries_count) > MAX_TRIES_TO_EXECUTE)
+            {
+                /// Nobody will try to execute the query again
+                LOG_WARNING(log, "Maximum retry count for task {} exceeded, cannot execute replicated DDL query", task.entry_name);
+                break;
+            }
+            else
+            {
+                /// Will try to wait or execute
+                LOG_TRACE(log, "Task {} has not been executed yet, will wait for it or try to execute it ourselves, tries count {}", task.entry_name, tries_count);
+            }
         }
     }
 
     /// Not executed by the leader, so it was not executed at all
     if (!executed_by_leader)
     {
-        task.execution_status = ExecutionStatus(ErrorCodes::NOT_IMPLEMENTED, "Cannot execute replicated DDL query");
+        /// If we failed with a timeout
+        if (stopwatch.elapsedSeconds() >= MAX_EXECUTION_TIMEOUT_SEC)
+        {
+            LOG_WARNING(log, "Task {} was not executed by anyone, maximum timeout {} seconds exceeded", task.entry_name, MAX_EXECUTION_TIMEOUT_SEC);
+            task.execution_status = ExecutionStatus(ErrorCodes::TIMEOUT_EXCEEDED, "Cannot execute replicated DDL query, timeout exceeded");
+        }
+        else /// If we exceeded the number of tries
+        {
+            LOG_WARNING(log, "Task {} was not executed by anyone, maximum number of retries exceeded", task.entry_name);
+            task.execution_status = ExecutionStatus(ErrorCodes::UNFINISHED, "Cannot execute replicated DDL query, maximum retries exceeded");
+        }
         return false;
     }
diff --git a/tests/integration/test_ddl_worker_non_leader/__init__.py b/tests/integration/test_ddl_worker_non_leader/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/integration/test_ddl_worker_non_leader/configs/remote_servers.xml b/tests/integration/test_ddl_worker_non_leader/configs/remote_servers.xml
new file mode 100644
index 00000000000..64239dfdb6c
--- /dev/null
+++ b/tests/integration/test_ddl_worker_non_leader/configs/remote_servers.xml
@@ -0,0 +1,17 @@
+<yandex>
+    <remote_servers>
+        <test_cluster>
+            <shard>
+                <internal_replication>true</internal_replication>
+                <replica>
+                    <host>node1</host>
+                    <port>9000</port>
+                </replica>
+                <replica>
+                    <host>node2</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+        </test_cluster>
+    </remote_servers>
+</yandex>
diff --git a/tests/integration/test_ddl_worker_non_leader/test.py b/tests/integration/test_ddl_worker_non_leader/test.py
new file mode 100644
index 00000000000..b64f99d5345
--- /dev/null
+++ b/tests/integration/test_ddl_worker_non_leader/test.py
@@ -0,0 +1,59 @@
+import pytest
+import time
+from helpers.cluster import ClickHouseCluster
+from helpers.network import PartitionManager
+from helpers.client import QueryRuntimeException
+
+cluster = ClickHouseCluster(__file__)
+node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
+node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
+
+@pytest.fixture(scope="module")
+def started_cluster():
+    try:
+        cluster.start()
+
+        yield cluster
+    finally:
+        cluster.shutdown()
+
+
+def test_non_leader_replica(started_cluster):
+
+    node1.query('''CREATE TABLE sometable(id UInt32, value String)
+    ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/sometable', '1') ORDER BY tuple()''')
+
+    node2.query('''CREATE TABLE sometable(id UInt32, value String)
+    ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/sometable', '2') ORDER BY tuple() SETTINGS replicated_can_become_leader = 0''')
+
+    node1.query("INSERT INTO sometable SELECT number, toString(number) FROM numbers(100)")
+    node2.query("SYSTEM SYNC REPLICA sometable", timeout=10)
+
+    assert node1.query("SELECT COUNT() FROM sometable") == "100\n"
+    assert node2.query("SELECT COUNT() FROM sometable") == "100\n"
+
+
+    with PartitionManager() as pm:
+        pm.drop_instance_zk_connections(node1)
+
+        # This query should be executed by the leader, but the leader is partitioned from ZooKeeper
+        with pytest.raises(Exception):
+            node2.query("ALTER TABLE sometable ON CLUSTER 'test_cluster' MODIFY COLUMN value UInt64 SETTINGS distributed_ddl_task_timeout=5")
+
+    for _ in range(100):
+        if 'UInt64' in node1.query("SELECT type FROM system.columns WHERE name='value' and table = 'sometable'"):
+            break
+        time.sleep(0.1)
+
+    for _ in range(100):
+        if 'UInt64' in node2.query("SELECT type FROM system.columns WHERE name='value' and table = 'sometable'"):
+            break
+        time.sleep(0.1)
+
+    assert 'UInt64' in node1.query("SELECT type FROM system.columns WHERE name='value' and table = 'sometable'")
+    assert 'UInt64' in node2.query("SELECT type FROM system.columns WHERE name='value' and table = 'sometable'")
+
+    # Check that DDLWorker doesn't hang and is still able to execute DDL queries
+    node1.query("CREATE TABLE new_table_with_ddl ON CLUSTER 'test_cluster' (key UInt32) ENGINE=MergeTree() ORDER BY tuple()", settings={"distributed_ddl_task_timeout": "10"})
+    assert node1.query("EXISTS new_table_with_ddl") == "1\n"
+    assert node2.query("EXISTS new_table_with_ddl") == "1\n"
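
Note (illustration, not part of the patch): the switch above from tryGet to exists when installing the watch relies on standard ZooKeeper semantics. An exists call registers a watch even when the node does not exist yet, while a get on a missing node fails without leaving a watch behind, so a waiter could sleep until the timeout without ever being notified. A minimal Python sketch of the same behaviour, assuming a local ZooKeeper at 127.0.0.1:2181 and the kazoo client library (the demo path and callback names are hypothetical), could look like this:

    # Demonstrates why exists() is used to install the watch instead of get().
    from kazoo.client import KazooClient
    from kazoo.exceptions import NoNodeError

    zk = KazooClient(hosts="127.0.0.1:2181")
    zk.start()

    def on_event(event):
        print("watch fired:", event)

    path = "/ddl_demo/is_executed"  # hypothetical node used only for this demo

    # exists(): returns None for a missing node, but the watch is still registered,
    # so on_event() fires as soon as another client creates the node.
    stat = zk.exists(path, watch=on_event)
    print("node exists now:", stat is not None)

    # get(): on a missing node this raises NoNodeError and registers no watch,
    # which is why the old tryGet-based wait could miss the notification entirely.
    try:
        zk.get(path, watch=on_event)
    except NoNodeError:
        print("get() failed and did not leave a watch behind")

    zk.stop()

Creating the demo node from another session after the exists() call fires on_event once, mirroring how a non-leader replica in the loop above is woken up when the leader creates the is_executed node.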