From 84489b82433783f32572f8150dbd8ef4d1959acd Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Mon, 8 Feb 2021 21:19:32 +0000 Subject: [PATCH] Improve replicated fetches timeouts test and make it 3x faster --- .../configs/merge_tree.xml | 6 -- .../test.py | 88 ----------------- .../__init__.py | 0 .../configs/server.xml | 3 + .../test_replicated_fetches_timeouts/test.py | 95 +++++++++++++++++++ 5 files changed, 98 insertions(+), 94 deletions(-) delete mode 100644 tests/integration/test_replicated_fetches_network_partition/configs/merge_tree.xml delete mode 100644 tests/integration/test_replicated_fetches_network_partition/test.py rename tests/integration/{test_replicated_fetches_network_partition => test_replicated_fetches_timeouts}/__init__.py (100%) create mode 100644 tests/integration/test_replicated_fetches_timeouts/configs/server.xml create mode 100644 tests/integration/test_replicated_fetches_timeouts/test.py diff --git a/tests/integration/test_replicated_fetches_network_partition/configs/merge_tree.xml b/tests/integration/test_replicated_fetches_network_partition/configs/merge_tree.xml deleted file mode 100644 index eba2c5e8ffc..00000000000 --- a/tests/integration/test_replicated_fetches_network_partition/configs/merge_tree.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - 30 - 1 - - diff --git a/tests/integration/test_replicated_fetches_network_partition/test.py b/tests/integration/test_replicated_fetches_network_partition/test.py deleted file mode 100644 index b0b5534cf1f..00000000000 --- a/tests/integration/test_replicated_fetches_network_partition/test.py +++ /dev/null @@ -1,88 +0,0 @@ -#!/usr/bin/env python3 - -import pytest -import time -from helpers.cluster import ClickHouseCluster -from helpers.network import PartitionManager -import random -import string - -cluster = ClickHouseCluster(__file__) -node1 = cluster.add_instance('node1', with_zookeeper=True) -node2 = cluster.add_instance('node2', with_zookeeper=True) -node3 = cluster.add_instance('node3', with_zookeeper=True, main_configs=['configs/merge_tree.xml']) - - -@pytest.fixture(scope="module") -def started_cluster(): - try: - cluster.start() - - yield cluster - - finally: - cluster.shutdown() - - -def get_random_string(length): - return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(length)) - - -def test_no_stall(started_cluster): - node1.query("CREATE TABLE t (key UInt64, data String) ENGINE = ReplicatedMergeTree('/clickhouse/test/t', '1') ORDER BY tuple() PARTITION BY key") - node2.query("CREATE TABLE t (key UInt64, data String) ENGINE = ReplicatedMergeTree('/clickhouse/test/t', '2') ORDER BY tuple() PARTITION BY key") - node3.query("CREATE TABLE t (key UInt64, data String) ENGINE = ReplicatedMergeTree('/clickhouse/test/t', '3') ORDER BY tuple() PARTITION BY key") - - node1.query("SYSTEM STOP MERGES") - node2.query("SYSTEM STOP MERGES") - node3.query("SYSTEM STOP MERGES") - - # Pause node3 until the test setup is prepared - node3.query("SYSTEM STOP FETCHES t") - - node1.query("INSERT INTO t SELECT 1, '{}' FROM numbers(5000)".format(get_random_string(104857))) - node1.query("INSERT INTO t SELECT 2, '{}' FROM numbers(5000)".format(get_random_string(104857))) - node1.query("INSERT INTO t SELECT 3, '{}' FROM numbers(5000)".format(get_random_string(104857))) - node1.query("INSERT INTO t SELECT 4, '{}' FROM numbers(5000)".format(get_random_string(104857))) - node1.query("INSERT INTO t SELECT 5, '{}' FROM numbers(5000)".format(get_random_string(104857))) - - # Make sure node2 has all the parts. - node2.query("SYSTEM SYNC REPLICA t") - - # Do not allow sending from replica 2 yet, force node3 to initiate replication from node1. - node2.query("SYSTEM STOP REPLICATED SENDS") - - print("replica 2 fully synced") - - with PartitionManager() as pm: - # Make node1 very slow, node3 should replicate from node2 instead. - pm.add_network_delay(node1, 2000) - - # node3 starts to replicate from node 1 - node3.query("SYSTEM START FETCHES t") - - # Wait some time to give a chance for node3 to try replicating without success from node1. - time.sleep(10) - - # Wait for replication... - node2.query("SYSTEM START REPLICATED SENDS") - - for _ in range(1000): - print('Currently running fetches:') - print(node3.query("SELECT result_part_name, source_replica_hostname, progress FROM system.replicated_fetches").strip()) - print() - - parts_fetched = node3.query("SELECT count() FROM system.parts WHERE table = 't'").strip() - print('parts_fetched:', parts_fetched) - print() - - # Replication done. - if parts_fetched == "5": - break - - time.sleep(3) - - for n in [node1, node2, node3]: - # Workaround for drop not finishing if it is started while table is readonly. - n.query("SYSTEM RESTART REPLICA t") - n.query("DROP TABLE t SYNC") diff --git a/tests/integration/test_replicated_fetches_network_partition/__init__.py b/tests/integration/test_replicated_fetches_timeouts/__init__.py similarity index 100% rename from tests/integration/test_replicated_fetches_network_partition/__init__.py rename to tests/integration/test_replicated_fetches_timeouts/__init__.py diff --git a/tests/integration/test_replicated_fetches_timeouts/configs/server.xml b/tests/integration/test_replicated_fetches_timeouts/configs/server.xml new file mode 100644 index 00000000000..d4b441b91fb --- /dev/null +++ b/tests/integration/test_replicated_fetches_timeouts/configs/server.xml @@ -0,0 +1,3 @@ + + 0.1 + diff --git a/tests/integration/test_replicated_fetches_timeouts/test.py b/tests/integration/test_replicated_fetches_timeouts/test.py new file mode 100644 index 00000000000..963ec2487fd --- /dev/null +++ b/tests/integration/test_replicated_fetches_timeouts/test.py @@ -0,0 +1,95 @@ +#!/usr/bin/env python3 + +import random +import string +import time + +import pytest +from helpers.cluster import ClickHouseCluster +from helpers.network import PartitionManager + +cluster = ClickHouseCluster(__file__) +node1 = cluster.add_instance( + 'node1', with_zookeeper=True, + main_configs=['configs/server.xml']) + +node2 = cluster.add_instance( + 'node2', with_zookeeper=True, + main_configs=['configs/server.xml']) + + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster.start() + + yield cluster + + finally: + cluster.shutdown() + + +def get_random_string(length): + return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(length)) + + +def test_no_stall(started_cluster): + for instance in started_cluster.instances.values(): + instance.query(""" + CREATE TABLE t (key UInt64, data String) + ENGINE = ReplicatedMergeTree('/clickhouse/test/t', '{instance}') + ORDER BY tuple() + PARTITION BY key""") + + # Pause node3 until the test setup is prepared + node2.query("SYSTEM STOP FETCHES t") + + node1.query("INSERT INTO t SELECT 1, '{}' FROM numbers(500)".format(get_random_string(104857))) + node1.query("INSERT INTO t SELECT 2, '{}' FROM numbers(500)".format(get_random_string(104857))) + + with PartitionManager() as pm: + pm.add_network_delay(node1, 2000) + node2.query("SYSTEM START FETCHES t") + + # Wait for timeout exceptions to confirm that timeout is triggered. + while True: + conn_timeout_exceptions = int(node2.query( + """ + SELECT count() + FROM system.replication_queue + WHERE last_exception LIKE '%connect timed out%' + """)) + + if conn_timeout_exceptions >= 2: + break + + time.sleep(0.1) + + print("Connection timeouts tested!") + + # Increase connection timeout and wait for receive timeouts. + node2.query(""" + ALTER TABLE t + MODIFY SETTING replicated_fetches_http_connection_timeout = 30, + replicated_fetches_http_receive_timeout = 1""") + + while True: + timeout_exceptions = int(node2.query( + """ + SELECT count() + FROM system.replication_queue + WHERE last_exception LIKE '%e.displayText() = Timeout%' + AND last_exception NOT LIKE '%connect timed out%' + """).strip()) + + if timeout_exceptions >= 2: + break + + time.sleep(0.1) + + for instance in started_cluster.instances.values(): + # Workaround for DROP TABLE not finishing if it is started while table is readonly. + instance.query("SYSTEM RESTART REPLICA t") + + # Cleanup data directory from test results archive. + instance.query("DROP TABLE t SYNC")