Improve replicated fetches timeouts test and make it 3x faster
Commit 84489b8243 (parent 6181daf92d)
@@ -1,6 +0,0 @@
<yandex>
    <merge_tree>
        <replicated_fetches_http_connection_timeout>30</replicated_fetches_http_connection_timeout>
        <replicated_fetches_http_receive_timeout>1</replicated_fetches_http_receive_timeout>
    </merge_tree>
</yandex>
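The deleted file pinned both fetch timeouts statically, and only for the one node that loaded it (node3 in the old test below); changing them mid-test would have required a restart. The rewritten test drops the file and adjusts the timeouts at runtime with ALTER TABLE ... MODIFY SETTING instead. A quick way to inspect such settings, sketched under the assumption that they are exposed in system.merge_tree_settings (`node` is a hypothetical ClickHouseCluster instance handle):

    # Hedged sketch: list the replicated-fetch timeout settings and their
    # current values on an integration-test node.
    print(node.query(
        "SELECT name, value FROM system.merge_tree_settings "
        "WHERE name LIKE 'replicated_fetches_http%'"))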
@@ -1,88 +0,0 @@
#!/usr/bin/env python3

import pytest
import time
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
import random
import string

cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', with_zookeeper=True)
node2 = cluster.add_instance('node2', with_zookeeper=True)
node3 = cluster.add_instance('node3', with_zookeeper=True, main_configs=['configs/merge_tree.xml'])


@pytest.fixture(scope="module")
def started_cluster():
    try:
        cluster.start()

        yield cluster

    finally:
        cluster.shutdown()


def get_random_string(length):
    return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(length))


def test_no_stall(started_cluster):
    node1.query("CREATE TABLE t (key UInt64, data String) ENGINE = ReplicatedMergeTree('/clickhouse/test/t', '1') ORDER BY tuple() PARTITION BY key")
    node2.query("CREATE TABLE t (key UInt64, data String) ENGINE = ReplicatedMergeTree('/clickhouse/test/t', '2') ORDER BY tuple() PARTITION BY key")
    node3.query("CREATE TABLE t (key UInt64, data String) ENGINE = ReplicatedMergeTree('/clickhouse/test/t', '3') ORDER BY tuple() PARTITION BY key")

    node1.query("SYSTEM STOP MERGES")
    node2.query("SYSTEM STOP MERGES")
    node3.query("SYSTEM STOP MERGES")

    # Pause node3 until the test setup is prepared.
    node3.query("SYSTEM STOP FETCHES t")

    node1.query("INSERT INTO t SELECT 1, '{}' FROM numbers(5000)".format(get_random_string(104857)))
    node1.query("INSERT INTO t SELECT 2, '{}' FROM numbers(5000)".format(get_random_string(104857)))
    node1.query("INSERT INTO t SELECT 3, '{}' FROM numbers(5000)".format(get_random_string(104857)))
    node1.query("INSERT INTO t SELECT 4, '{}' FROM numbers(5000)".format(get_random_string(104857)))
    node1.query("INSERT INTO t SELECT 5, '{}' FROM numbers(5000)".format(get_random_string(104857)))

    # Make sure node2 has all the parts.
    node2.query("SYSTEM SYNC REPLICA t")

    # Do not allow sending from replica 2 yet; force node3 to initiate replication from node1.
    node2.query("SYSTEM STOP REPLICATED SENDS")

    print("replica 2 fully synced")

    with PartitionManager() as pm:
        # Make node1 very slow; node3 should eventually replicate from node2 instead.
        pm.add_network_delay(node1, 2000)

        # node3 starts to replicate from node1.
        node3.query("SYSTEM START FETCHES t")

        # Give node3 some time to try, and fail, to replicate from node1.
        time.sleep(10)

        # Wait for replication...
        node2.query("SYSTEM START REPLICATED SENDS")

        for _ in range(1000):
            print('Currently running fetches:')
            print(node3.query("SELECT result_part_name, source_replica_hostname, progress FROM system.replicated_fetches").strip())
            print()

            parts_fetched = node3.query("SELECT count() FROM system.parts WHERE table = 't'").strip()
            print('parts_fetched:', parts_fetched)
            print()

            # Replication done.
            if parts_fetched == "5":
                break

            time.sleep(3)

    for n in [node1, node2, node3]:
        # Workaround for DROP not finishing if it is started while the table is readonly.
        n.query("SYSTEM RESTART REPLICA t")
        n.query("DROP TABLE t SYNC")
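For scale: each INSERT in the deleted test materialized 5000 rows carrying a ~100 KiB string, roughly 500 MB of raw column data per part and ~2.4 GiB across the five parts (the string repeats across rows, so the compressed on-disk size is smaller). A back-of-the-envelope check:

    # Rough arithmetic behind the old test's data volume.
    rows_per_insert = 5000
    string_bytes = 104857                            # ~100 KiB per row
    inserts = 5
    per_insert = rows_per_insert * string_bytes      # raw data per part
    print(per_insert / 2**20, inserts * per_insert / 2**30)  # ~500 MiB, ~2.4 GiB

The new version below keeps the same string size but cuts this to 500 rows and two inserts, and it polls every 0.1 s instead of sleeping in 3 to 10 second steps, which is presumably where most of the speedup in the commit title comes from.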
@@ -0,0 +1,3 @@
<yandex>
    <background_processing_pool_task_sleep_seconds_when_no_work_max>0.1</background_processing_pool_task_sleep_seconds_when_no_work_max>
</yandex>
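This three-line file (loaded by both new nodes as configs/server.xml) caps the background processing pool's idle sleep at 0.1 s, so a failed fetch is retried with very little dead time rather than after the larger default back-off. The retries are visible in system.replication_queue; a hedged sketch, using node2 from the test below:

    # Watch retry attempts accumulate for the stalled fetches.
    print(node2.query(
        "SELECT num_tries, postpone_reason, last_exception "
        "FROM system.replication_queue"))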
tests/integration/test_replicated_fetches_timeouts/test.py (new file, 95 lines)
@@ -0,0 +1,95 @@
#!/usr/bin/env python3

import random
import string
import time

import pytest
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager

cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
    'node1', with_zookeeper=True,
    main_configs=['configs/server.xml'])

node2 = cluster.add_instance(
    'node2', with_zookeeper=True,
    main_configs=['configs/server.xml'])


@pytest.fixture(scope="module")
def started_cluster():
    try:
        cluster.start()

        yield cluster

    finally:
        cluster.shutdown()


def get_random_string(length):
    return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(length))


def test_no_stall(started_cluster):
    for instance in started_cluster.instances.values():
        instance.query("""
            CREATE TABLE t (key UInt64, data String)
            ENGINE = ReplicatedMergeTree('/clickhouse/test/t', '{instance}')
            ORDER BY tuple()
            PARTITION BY key""")

    # Pause node2 until the test setup is prepared.
    node2.query("SYSTEM STOP FETCHES t")

    node1.query("INSERT INTO t SELECT 1, '{}' FROM numbers(500)".format(get_random_string(104857)))
    node1.query("INSERT INTO t SELECT 2, '{}' FROM numbers(500)".format(get_random_string(104857)))

    with PartitionManager() as pm:
        pm.add_network_delay(node1, 2000)
        node2.query("SYSTEM START FETCHES t")

        # Wait for connect-timeout exceptions to confirm that the timeout is triggered.
        while True:
            conn_timeout_exceptions = int(node2.query(
                """
                SELECT count()
                FROM system.replication_queue
                WHERE last_exception LIKE '%connect timed out%'
                """))

            if conn_timeout_exceptions >= 2:
                break

            time.sleep(0.1)

        print("Connection timeouts tested!")

        # Increase the connection timeout and wait for receive timeouts instead.
        node2.query("""
            ALTER TABLE t
            MODIFY SETTING replicated_fetches_http_connection_timeout = 30,
                           replicated_fetches_http_receive_timeout = 1""")

        while True:
            timeout_exceptions = int(node2.query(
                """
                SELECT count()
                FROM system.replication_queue
                WHERE last_exception LIKE '%e.displayText() = Timeout%'
                    AND last_exception NOT LIKE '%connect timed out%'
                """).strip())

            if timeout_exceptions >= 2:
                break

            time.sleep(0.1)

    for instance in started_cluster.instances.values():
        # Workaround for DROP TABLE not finishing if it is started while the table is readonly.
        instance.query("SYSTEM RESTART REPLICA t")

        # Clean up the data directory so the data does not end up in the test results archive.
        instance.query("DROP TABLE t SYNC")