Merge branch 'issues/70174/cluster_versions' of github.com:ClickHouse/ClickHouse into issues/70174/cluster_versions

This commit is contained in:
Mikhail Artemenko 2024-11-19 12:51:56 +00:00
commit 19aec5e572

View File

@ -591,13 +591,21 @@ def test_cluster_table_function(started_cluster, format_version, storage_type):
table_function_expr = get_creation_expression( table_function_expr = get_creation_expression(
storage_type, TABLE_NAME, started_cluster, table_function=True storage_type, TABLE_NAME, started_cluster, table_function=True
) )
select_regular = instance.query(f"SELECT * FROM {table_function_expr}").strip().split() select_regular = (
instance.query(f"SELECT * FROM {table_function_expr}").strip().split()
)
# Cluster Query with node1 as coordinator # Cluster Query with node1 as coordinator
table_function_expr_cluster = get_creation_expression( table_function_expr_cluster = get_creation_expression(
storage_type, TABLE_NAME, started_cluster, table_function=True, run_on_cluster=True storage_type,
TABLE_NAME,
started_cluster,
table_function=True,
run_on_cluster=True,
)
select_cluster = (
instance.query(f"SELECT * FROM {table_function_expr_cluster}").strip().split()
) )
select_cluster = instance.query(f"SELECT * FROM {table_function_expr_cluster}").strip().split()
# Simple size check # Simple size check
assert len(select_regular) == 600 assert len(select_regular) == 600
@ -611,8 +619,9 @@ def test_cluster_table_function(started_cluster, format_version, storage_type):
replica.query("SYSTEM FLUSH LOGS") replica.query("SYSTEM FLUSH LOGS")
for node_name, replica in started_cluster.instances.items(): for node_name, replica in started_cluster.instances.items():
cluster_secondary_queries = replica.query( cluster_secondary_queries = (
f""" replica.query(
f"""
SELECT query, type, is_initial_query, read_rows, read_bytes FROM system.query_log SELECT query, type, is_initial_query, read_rows, read_bytes FROM system.query_log
WHERE WHERE
type = 'QueryStart' AND type = 'QueryStart' AND
@ -621,9 +630,14 @@ def test_cluster_table_function(started_cluster, format_version, storage_type):
position(query, 'system.query_log') = 0 AND position(query, 'system.query_log') = 0 AND
NOT is_initial_query NOT is_initial_query
""" """
).strip().split("\n") )
.strip()
.split("\n")
)
logging.info(f"[{node_name}] cluster_secondary_queries: {cluster_secondary_queries}") logging.info(
f"[{node_name}] cluster_secondary_queries: {cluster_secondary_queries}"
)
assert len(cluster_secondary_queries) == 1 assert len(cluster_secondary_queries) == 1