ClickHouse/tests/integration/test_parallel_replicas_protocol/test.py
2024-11-27 19:57:56 +00:00

81 lines
2.6 KiB
Python

import re
import uuid
from random import randint
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
nodes = [
cluster.add_instance(
f"node{num}",
main_configs=["configs/remote_servers.xml"],
with_zookeeper=True,
macros={"replica": f"node{num}", "shard": "shard"},
)
for num in range(3)
]
def _create_tables(table_name):
nodes[0].query(
f"DROP TABLE IF EXISTS {table_name} ON CLUSTER 'parallel_replicas'",
settings={"database_atomic_wait_for_drop_and_detach_synchronously": True},
)
# big number of granules + low total size in bytes = super tiny granules = big min_marks_per_task
# => big mark_segment_size will be chosen. it is not required to be big, just not equal to the default
nodes[0].query(
f"""
CREATE TABLE {table_name} ON CLUSTER 'parallel_replicas' (value Int64)
Engine=ReplicatedMergeTree('/test_parallel_replicas/shard/{table_name}', '{{replica}}')
ORDER BY ()
SETTINGS index_granularity = 1
"""
)
nodes[0].query(f"INSERT INTO {table_name} SELECT 42 FROM numbers(1000)")
nodes[0].query(f"SYSTEM SYNC REPLICA ON CLUSTER 'parallel_replicas' {table_name}")
table_name = "t"
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
try:
cluster.start()
_create_tables(table_name)
yield cluster
finally:
cluster.shutdown()
# now mark_segment_size is part of the protocol and is communicated to the initiator.
# let's check that the correct value is actually used by the coordinator
@pytest.mark.parametrize("local_plan", [0, 1])
@pytest.mark.parametrize("index_analysis_only_on_coordinator", [0, 1])
def test_mark_segment_size_communicated_correctly(
start_cluster, local_plan, index_analysis_only_on_coordinator
):
query_id = f"query_id_{str(uuid.uuid4())}"
nodes[0].query(
f"SELECT sum(value) FROM {table_name}",
settings={
"allow_experimental_parallel_reading_from_replicas": 2,
"max_parallel_replicas": 100,
"cluster_for_parallel_replicas": "parallel_replicas",
"parallel_replicas_mark_segment_size": 0,
"parallel_replicas_local_plan": local_plan,
"query_id": query_id,
"parallel_replicas_index_analysis_only_on_coordinator": index_analysis_only_on_coordinator,
},
)
nodes[0].query("SYSTEM FLUSH LOGS")
log_line = nodes[0].grep_in_log(f"{query_id}.*Reading state is fully initialized")
assert re.search(r"mark_segment_size: (\d+)", log_line).group(1) == "16384"