Fix load balancing for hedged connections

Igor Nikonov 2023-12-16 21:17:01 +00:00
parent a735820400
commit ec1539728d
3 changed files with 18 additions and 17 deletions

@@ -33,9 +33,7 @@ std::function<Priority(size_t index)> GetPriorityForLoadBalancing::getPriorityFunc
         get_priority = [offset](size_t i) { return i != offset ? Priority{1} : Priority{0}; };
         break;
     case LoadBalancing::ROUND_ROBIN:
-        auto local_last_used = last_used.fetch_add(1);
-        --local_last_used;
-        local_last_used = (pool_size - 1) - local_last_used % pool_size;
+        auto local_last_used = last_used.fetch_add(1) % pool_size;
         get_priority = [pool_size, local_last_used](size_t i)
         {
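
The hunk only shows the start of the ROUND_ROBIN branch, so the effect of the new line is easier to see in a small standalone sketch: each call to the priority builder atomically bumps last_used and reduces it modulo pool_size, so the replica that is preferred first rotates on every call. The Priority struct and the ranking formula below are illustrative stand-ins, not the actual code from this file.

#include <atomic>
#include <cstddef>
#include <cstdio>

// Illustrative stand-in: in ClickHouse, a smaller Priority value means "try this replica earlier".
struct Priority { long value; };

int main()
{
    std::atomic<size_t> last_used{0};
    const size_t pool_size = 3;

    // Mirrors `last_used.fetch_add(1) % pool_size`: each invocation rotates
    // which replica index is considered most preferred.
    auto make_get_priority = [&]()
    {
        const size_t local_last_used = last_used.fetch_add(1) % pool_size;
        return [pool_size, local_last_used](size_t i)
        {
            // Assumed ranking: forward distance from the rotating offset, wrapping around.
            return Priority{static_cast<long>((i + pool_size - local_last_used) % pool_size)};
        };
    };

    for (size_t call = 0; call < 3; ++call)
    {
        auto get_priority = make_get_priority();
        for (size_t i = 0; i < pool_size; ++i)
            std::printf("call %zu: replica %zu -> priority %ld\n", call, i, get_priority(i).value);
    }
    return 0;
}

With pool_size = 3 the output shows priority 0 moving to a different replica on every call.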

@@ -150,9 +150,9 @@ ContextMutablePtr updateSettingsForCluster(const Cluster & cluster,
         new_settings.timeout_overflow_mode = settings.timeout_overflow_mode_leaf;
     }
 
-    /// in case of parallel replicas custom key w/o hedged connections use round robin load balancing
-    /// load_balancing setting doesn't affect hedged connections
-    if (context->canUseParallelReplicasCustomKey(cluster) && !settings.load_balancing.changed && !settings.use_hedged_requests)
+    /// in case of parallel replicas custom key use round robin load balancing
+    /// so custom key partitions will be spread over nodes in round-robin fashion
+    if (context->canUseParallelReplicasCustomKey(cluster) && !settings.load_balancing.changed)
     {
         new_settings.load_balancing = LoadBalancing::ROUND_ROBIN;
     }
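
The new comment says custom-key partitions should be spread over nodes in round-robin fashion. A rough illustration of why forcing ROUND_ROBIN achieves that (pickReplica and its ranking formula are hypothetical helpers for this sketch, not ClickHouse code): with a rotating offset, consecutive per-partition subqueries prefer different replicas.

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <cstdio>
#include <vector>

// Hypothetical helper: rank replicas with a rotating round-robin offset and
// return the index of the most preferred one (lowest rank).
static std::atomic<size_t> last_used{0};

size_t pickReplica(size_t pool_size)
{
    const size_t offset = last_used.fetch_add(1) % pool_size;
    std::vector<size_t> rank(pool_size);
    for (size_t i = 0; i < pool_size; ++i)
        rank[i] = (i + pool_size - offset) % pool_size; // lower = preferred (illustrative formula)
    return static_cast<size_t>(std::min_element(rank.begin(), rank.end()) - rank.begin());
}

int main()
{
    const size_t replicas = 4;   // e.g. max_parallel_replicas = 4, as in the test below
    const size_t partitions = 4; // one subquery per custom-key partition
    for (size_t p = 0; p < partitions; ++p)
        std::printf("partition %zu -> replica %zu\n", p, pickReplica(replicas));
    return 0;
}

Each partition ends up preferring a different replica; with a load balancing mode that does not rotate, every subquery could prefer the same node.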

@@ -67,11 +67,13 @@ def create_tables(cluster, table_name):
 @pytest.mark.parametrize("use_hedged_requests", [1, 0])
 @pytest.mark.parametrize("custom_key", ["sipHash64(key)", "key"])
 @pytest.mark.parametrize("filter_type", ["default", "range"])
+@pytest.mark.parametrize("prefer_localhost_replica", [0, 1])
 def test_parallel_replicas_custom_key_failover(
     start_cluster,
     use_hedged_requests,
     custom_key,
     filter_type,
+    prefer_localhost_replica,
 ):
     filter_type = "default"
     cluster = "test_single_shard_multiple_replicas"
@@ -89,7 +91,7 @@ def test_parallel_replicas_custom_key_failover(
         f"SELECT key, count() FROM {table}_d GROUP BY key ORDER BY key",
         settings={
             "log_comment": log_comment,
-            "prefer_localhost_replica": 0,
+            "prefer_localhost_replica": prefer_localhost_replica,
             "max_parallel_replicas": 4,
             "parallel_replicas_custom_key": custom_key,
             "parallel_replicas_custom_key_filter_type": filter_type,
@@ -111,16 +113,17 @@ def test_parallel_replicas_custom_key_failover(
     assert query_id != ""
     query_id = query_id[:-1]
 
-    assert (
-        node1.query(
-            f"SELECT 'subqueries', count() FROM clusterAllReplicas({cluster}, system.query_log) WHERE initial_query_id = '{query_id}' AND type ='QueryFinish' AND query_id != initial_query_id SETTINGS skip_unavailable_shards=1"
-        )
-        == "subqueries\t4\n"
-    )
+    if prefer_localhost_replica == 0:
+        assert (
+            node1.query(
+                f"SELECT 'subqueries', count() FROM clusterAllReplicas({cluster}, system.query_log) WHERE initial_query_id = '{query_id}' AND type ='QueryFinish' AND query_id != initial_query_id SETTINGS skip_unavailable_shards=1"
+            )
+            == "subqueries\t4\n"
+        )
 
-    assert (
-        node1.query(
-            f"SELECT h, count() FROM clusterAllReplicas({cluster}, system.query_log) WHERE initial_query_id = '{query_id}' AND type ='QueryFinish' GROUP BY hostname() as h SETTINGS skip_unavailable_shards=1"
-        )
-        == "n1\t3\nn3\t2\n"
-    )
+        assert (
+            node1.query(
+                f"SELECT h, count() FROM clusterAllReplicas({cluster}, system.query_log) WHERE initial_query_id = '{query_id}' AND type ='QueryFinish' GROUP BY hostname() as h SETTINGS skip_unavailable_shards=1"
+            )
+            == "n1\t3\nn3\t2\n"
+        )