from contextlib import contextmanager import pytest from helpers.cluster import ClickHouseCluster cluster = ClickHouseCluster(__file__) node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml']) node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml']) @pytest.fixture(scope="module") def started_cluster(): try: cluster.start() for node in (node1, node2): node.query('''CREATE TABLE local_table(id UInt32, val String) ENGINE = MergeTree ORDER BY id;''') node1.query('''CREATE TABLE distributed_table(id UInt32, val String) ENGINE = Distributed(test_cluster, default, local_table, id);''') yield cluster finally: cluster.shutdown() def test_start_and_stop_replica_send(started_cluster): node1.query("SYSTEM STOP DISTRIBUTED SENDS distributed_table;") node1.query("INSERT INTO distributed_table VALUES (0, 'node1')") node1.query("INSERT INTO distributed_table VALUES (1, 'node2')") # Write only to this node when stop distributed sends assert node1.query("SELECT COUNT() FROM distributed_table").rstrip() == '1' node1.query("SYSTEM START DISTRIBUTED SENDS distributed_table;") node1.query("SYSTEM FLUSH DISTRIBUTED distributed_table;") assert node1.query("SELECT COUNT() FROM distributed_table").rstrip() == '2'