mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
97 lines
4.2 KiB
Python
97 lines
4.2 KiB
Python
#!/usr/bin/env python3
|
|
|
|
|
|
import pytest
|
|
import time
|
|
from helpers.cluster import ClickHouseCluster
|
|
from helpers.network import PartitionManager
|
|
from helpers.test_tools import assert_eq_with_retry
|
|
import random
|
|
import string
|
|
import json
|
|
|
|
cluster = ClickHouseCluster(__file__)
|
|
node1 = cluster.add_instance('node1', with_zookeeper=True)
|
|
node2 = cluster.add_instance('node2', with_zookeeper=True)
|
|
|
|
@pytest.fixture(scope="module")
|
|
def started_cluster():
|
|
try:
|
|
cluster.start()
|
|
|
|
yield cluster
|
|
|
|
finally:
|
|
cluster.shutdown()
|
|
|
|
def get_random_string(length):
|
|
return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(length))
|
|
|
|
def test_system_replicated_fetches(started_cluster):
|
|
node1.query("CREATE TABLE t (key UInt64, data String) ENGINE = ReplicatedMergeTree('/clickhouse/test/t', '1') ORDER BY tuple()")
|
|
node2.query("CREATE TABLE t (key UInt64, data String) ENGINE = ReplicatedMergeTree('/clickhouse/test/t', '2') ORDER BY tuple()")
|
|
|
|
with PartitionManager() as pm:
|
|
node2.query("SYSTEM STOP FETCHES t")
|
|
node1.query("INSERT INTO t SELECT number, '{}' FROM numbers(10000)".format(get_random_string(104857)))
|
|
pm.add_network_delay(node1, 80)
|
|
node2.query("SYSTEM START FETCHES t")
|
|
fetches_result = []
|
|
for _ in range(1000):
|
|
result = json.loads(node2.query("SELECT * FROM system.replicated_fetches FORMAT JSON"))
|
|
if not result["data"]:
|
|
if fetches_result:
|
|
break
|
|
time.sleep(0.1)
|
|
else:
|
|
fetches_result.append(result["data"][0])
|
|
print(fetches_result[-1])
|
|
time.sleep(0.1)
|
|
|
|
node2.query("SYSTEM SYNC REPLICA t", timeout=10)
|
|
assert node2.query("SELECT COUNT() FROM t") == "10000\n"
|
|
|
|
for elem in fetches_result:
|
|
elem['bytes_read_compressed'] = float(elem['bytes_read_compressed'])
|
|
elem['total_size_bytes_compressed'] = float(elem['total_size_bytes_compressed'])
|
|
elem['progress'] = float(elem['progress'])
|
|
elem['elapsed'] = float(elem['elapsed'])
|
|
|
|
assert len(fetches_result) > 0
|
|
first_non_empty = fetches_result[0]
|
|
|
|
assert first_non_empty['database'] == "default"
|
|
assert first_non_empty['table'] == "t"
|
|
assert first_non_empty['source_replica_hostname'] == 'node1'
|
|
assert first_non_empty['source_replica_port'] == 9009
|
|
assert first_non_empty['source_replica_path'] == '/clickhouse/test/t/replicas/1'
|
|
assert first_non_empty['interserver_scheme'] == 'http'
|
|
assert first_non_empty['result_part_name'] == 'all_0_0_0'
|
|
assert first_non_empty['result_part_path'].startswith('/var/lib/clickhouse/')
|
|
assert first_non_empty['result_part_path'].endswith('all_0_0_0/')
|
|
assert first_non_empty['partition_id'] == 'all'
|
|
assert first_non_empty['URI'].startswith('http://node1:9009/?endpoint=DataPartsExchange')
|
|
|
|
for elem in fetches_result:
|
|
assert elem['bytes_read_compressed'] <= elem['total_size_bytes_compressed'], "Bytes read ({}) more than total bytes ({}). It's a bug".format(elem['bytes_read_compressed'], elem['total_size_bytes_compressed'])
|
|
assert 0.0 <= elem['progress'] <= 1.0, "Progress shouldn't less than 0 and bigger than 1, got {}".format(elem['progress'])
|
|
assert 0.0 <= elem['elapsed'], "Elapsed time must be greater than 0, got {}".format(elem['elapsed'])
|
|
|
|
prev_progress = first_non_empty['progress']
|
|
for elem in fetches_result:
|
|
assert elem['progress'] >= prev_progress, "Progress decreasing prev{}, next {}? It's a bug".format(prev_progress, elem['progress'])
|
|
prev_progress = elem['progress']
|
|
|
|
prev_bytes = first_non_empty['bytes_read_compressed']
|
|
for elem in fetches_result:
|
|
assert elem['bytes_read_compressed'] >= prev_bytes, "Bytes read decreasing prev {}, next {}? It's a bug".format(prev_bytes, elem['bytes_read_compressed'])
|
|
prev_bytes = elem['bytes_read_compressed']
|
|
|
|
prev_elapsed = first_non_empty['elapsed']
|
|
for elem in fetches_result:
|
|
assert elem['elapsed'] >= prev_elapsed, "Elapsed time decreasing prev {}, next {}? It's a bug".format(prev_elapsed, elem['elapsed'])
|
|
prev_elapsed = elem['elapsed']
|
|
|
|
node1.query("DROP TABLE IF EXISTS t SYNC")
|
|
node2.query("DROP TABLE IF EXISTS t SYNC")
|