ClickHouse/tests/integration/test_system_replicated_fetches/test.py
2022-03-22 17:39:58 +01:00

136 lines
4.6 KiB
Python

#!/usr/bin/env python3
import pytest
import time
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from helpers.test_tools import assert_eq_with_retry
import random
import string
import json
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance("node1", with_zookeeper=True)
node2 = cluster.add_instance("node2", with_zookeeper=True)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def get_random_string(length):
return "".join(
random.choice(string.ascii_uppercase + string.digits) for _ in range(length)
)
def test_system_replicated_fetches(started_cluster):
node1.query(
"CREATE TABLE t (key UInt64, data String) ENGINE = ReplicatedMergeTree('/clickhouse/test/t', '1') ORDER BY tuple()"
)
node2.query(
"CREATE TABLE t (key UInt64, data String) ENGINE = ReplicatedMergeTree('/clickhouse/test/t', '2') ORDER BY tuple()"
)
with PartitionManager() as pm:
node2.query("SYSTEM STOP FETCHES t")
node1.query(
"INSERT INTO t SELECT number, '{}' FROM numbers(10000)".format(
get_random_string(104857)
)
)
pm.add_network_delay(node1, 80)
node2.query("SYSTEM START FETCHES t")
fetches_result = []
for _ in range(1000):
result = json.loads(
node2.query("SELECT * FROM system.replicated_fetches FORMAT JSON")
)
if not result["data"]:
if fetches_result:
break
time.sleep(0.1)
else:
fetches_result.append(result["data"][0])
print(fetches_result[-1])
time.sleep(0.1)
node2.query("SYSTEM SYNC REPLICA t", timeout=10)
assert node2.query("SELECT COUNT() FROM t") == "10000\n"
for elem in fetches_result:
elem["bytes_read_compressed"] = float(elem["bytes_read_compressed"])
elem["total_size_bytes_compressed"] = float(elem["total_size_bytes_compressed"])
elem["progress"] = float(elem["progress"])
elem["elapsed"] = float(elem["elapsed"])
assert len(fetches_result) > 0
first_non_empty = fetches_result[0]
assert first_non_empty["database"] == "default"
assert first_non_empty["table"] == "t"
assert first_non_empty["source_replica_hostname"] == "node1"
assert first_non_empty["source_replica_port"] == 9009
assert first_non_empty["source_replica_path"] == "/clickhouse/test/t/replicas/1"
assert first_non_empty["interserver_scheme"] == "http"
assert first_non_empty["result_part_name"] == "all_0_0_0"
assert first_non_empty["result_part_path"].startswith("/var/lib/clickhouse/")
assert first_non_empty["result_part_path"].endswith("all_0_0_0/")
assert first_non_empty["partition_id"] == "all"
assert first_non_empty["URI"].startswith(
"http://node1:9009/?endpoint=DataPartsExchange"
)
for elem in fetches_result:
assert (
elem["bytes_read_compressed"] <= elem["total_size_bytes_compressed"]
), "Bytes read ({}) more than total bytes ({}). It's a bug".format(
elem["bytes_read_compressed"], elem["total_size_bytes_compressed"]
)
assert (
0.0 <= elem["progress"] <= 1.0
), "Progress shouldn't less than 0 and bigger than 1, got {}".format(
elem["progress"]
)
assert (
0.0 <= elem["elapsed"]
), "Elapsed time must be greater than 0, got {}".format(elem["elapsed"])
prev_progress = first_non_empty["progress"]
for elem in fetches_result:
assert (
elem["progress"] >= prev_progress
), "Progress decreasing prev{}, next {}? It's a bug".format(
prev_progress, elem["progress"]
)
prev_progress = elem["progress"]
prev_bytes = first_non_empty["bytes_read_compressed"]
for elem in fetches_result:
assert (
elem["bytes_read_compressed"] >= prev_bytes
), "Bytes read decreasing prev {}, next {}? It's a bug".format(
prev_bytes, elem["bytes_read_compressed"]
)
prev_bytes = elem["bytes_read_compressed"]
prev_elapsed = first_non_empty["elapsed"]
for elem in fetches_result:
assert (
elem["elapsed"] >= prev_elapsed
), "Elapsed time decreasing prev {}, next {}? It's a bug".format(
prev_elapsed, elem["elapsed"]
)
prev_elapsed = elem["elapsed"]
node1.query("DROP TABLE IF EXISTS t SYNC")
node2.query("DROP TABLE IF EXISTS t SYNC")