# Start from a clean slate: remove any leftover copy of the table on both nodes.
for replica_node in (node1, node3):
    replica_node.query(f"DROP TABLE IF EXISTS {table_name} SYNC")

# Recreate the table on each node; both register under the same
# '/test_parallel_replicas/shard1/<table>' path, differing only in replica name.
for replica_node, replica_name in ((node1, "r1"), (node3, "r3")):
    replica_node.query(
        f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', '{replica_name}') ORDER BY (key)"
    )

# populate data
# Each insert goes through node1 as its own statement and adds 1000 rows whose
# keys cycle through 0..3 (number % 4).
row_sources = ("numbers(1000)", "numbers(1000, 1000)", "numbers(2000, 1000)")
for row_source in row_sources:
    node1.query(
        f"INSERT INTO {table_name} SELECT number % 4, number FROM {row_source}"
    )
node1.query(
f"INSERT INTO {table_name} SELECT number % 4, number FROM numbers(3000, 1000)"
# the subqueries should be spread over available nodes
query_id=node1.query(
f"SELECT query_id FROM system.query_log WHERE current_database = currentDatabase() AND log_comment = '{log_comment}' AND type = 'QueryFinish' AND initial_query_id = query_id"
f"SELECT 'subqueries', count() FROM clusterAllReplicas({cluster_name}, system.query_log) WHERE initial_query_id = '{query_id}' AND type ='QueryFinish' AND query_id != initial_query_id SETTINGS skip_unavailable_shards=1"
# With hedged requests enabled, we can't guarantee the exact distribution of queries among nodes.
# If a replica is slow to respond, a hedged connection can change the initial choice of replicas.
ifuse_hedged_requests==0:
assert(
node1.query(
f"SELECT h, count() FROM clusterAllReplicas({cluster_name}, system.query_log) WHERE initial_query_id = '{query_id}' AND type ='QueryFinish' GROUP BY hostname() as h ORDER BY h SETTINGS skip_unavailable_shards=1"