Fix: custom key failover test flakiness

slowdown_count is used only by hedged connections, but it is stored
inside the connection pool and was always used for pool shuffling.
As a result, query execution that used hedged connections could affect
connection load balancing for queries without hedged connections by
updating slowdown_count.
This commit is contained in:
Igor Nikonov 2024-02-19 20:44:55 +00:00
parent 425531067d
commit bb5a6dd8d3
5 changed files with 22 additions and 23 deletions

View File

@ -253,13 +253,13 @@ ConnectionPoolWithFailover::tryGetEntry(
}
std::vector<ConnectionPoolWithFailover::Base::ShuffledPool>
ConnectionPoolWithFailover::getShuffledPools(const Settings & settings, GetPriorityForLoadBalancing::Func priority_func)
ConnectionPoolWithFailover::getShuffledPools(const Settings & settings, GetPriorityForLoadBalancing::Func priority_func, bool use_slowdown_count)
{
if (!priority_func)
priority_func = makeGetPriorityFunc(settings);
UInt64 max_ignored_errors = settings.distributed_replica_max_ignored_errors.value;
return Base::getShuffledPools(max_ignored_errors, priority_func);
return Base::getShuffledPools(max_ignored_errors, priority_func, use_slowdown_count);
}
}

View File

@ -91,7 +91,7 @@ public:
using Status = std::vector<NestedPoolStatus>;
Status getStatus() const;
std::vector<Base::ShuffledPool> getShuffledPools(const Settings & settings, GetPriorityFunc priority_func = {});
std::vector<Base::ShuffledPool> getShuffledPools(const Settings & settings, GetPriorityFunc priority_func = {}, bool use_slowdown_count = false);
size_t getMaxErrorCup() const { return Base::max_error_cap; }

View File

@ -40,7 +40,8 @@ HedgedConnectionsFactory::HedgedConnectionsFactory(
, max_parallel_replicas(max_parallel_replicas_)
, skip_unavailable_shards(skip_unavailable_shards_)
{
shuffled_pools = pool->getShuffledPools(settings_, priority_func);
shuffled_pools = pool->getShuffledPools(settings_, priority_func, /* use_slowdown_count */ true);
for (const auto & shuffled_pool : shuffled_pools)
replicas.emplace_back(
std::make_unique<ConnectionEstablisherAsync>(shuffled_pool.pool, &timeouts, settings_, log, table_to_check.get()));

View File

@ -133,7 +133,7 @@ protected:
void updateErrorCounts(PoolStates & states, time_t & last_decrease_time) const;
std::vector<ShuffledPool> getShuffledPools(size_t max_ignored_errors, const GetPriorityFunc & get_priority);
std::vector<ShuffledPool> getShuffledPools(size_t max_ignored_errors, const GetPriorityFunc & get_priority, bool use_slowdown_count = false);
inline void updateSharedErrorCounts(std::vector<ShuffledPool> & shuffled_pools);
@ -160,7 +160,7 @@ protected:
template <typename TNestedPool>
std::vector<typename PoolWithFailoverBase<TNestedPool>::ShuffledPool>
PoolWithFailoverBase<TNestedPool>::getShuffledPools(
size_t max_ignored_errors, const PoolWithFailoverBase::GetPriorityFunc & get_priority)
size_t max_ignored_errors, const PoolWithFailoverBase::GetPriorityFunc & get_priority, bool use_slowdown_count)
{
/// Update random numbers and error counts.
PoolStates pool_states = updatePoolStates(max_ignored_errors);
@ -175,13 +175,13 @@ PoolWithFailoverBase<TNestedPool>::getShuffledPools(
std::vector<ShuffledPool> shuffled_pools;
shuffled_pools.reserve(nested_pools.size());
for (size_t i = 0; i < nested_pools.size(); ++i)
shuffled_pools.push_back(ShuffledPool{nested_pools[i], &pool_states[i], i, /* error_count = */ 0, /* slowdown_count = */ 0});
shuffled_pools.emplace_back(ShuffledPool{.pool = nested_pools[i], .state = &pool_states[i], .index = i});
::sort(
shuffled_pools.begin(), shuffled_pools.end(),
[](const ShuffledPool & lhs, const ShuffledPool & rhs)
[use_slowdown_count](const ShuffledPool & lhs, const ShuffledPool & rhs)
{
return PoolState::compare(*lhs.state, *rhs.state);
return PoolState::compare(*lhs.state, *rhs.state, use_slowdown_count);
});
return shuffled_pools;
@ -344,10 +344,14 @@ struct PoolWithFailoverBase<TNestedPool>::PoolState
random = rng();
}
static bool compare(const PoolState & lhs, const PoolState & rhs)
static bool compare(const PoolState & lhs, const PoolState & rhs, bool use_slowdown_count)
{
return std::forward_as_tuple(lhs.error_count, lhs.slowdown_count, lhs.config_priority, lhs.priority, lhs.random)
< std::forward_as_tuple(rhs.error_count, rhs.slowdown_count, rhs.config_priority, rhs.priority, rhs.random);
if (use_slowdown_count)
return std::forward_as_tuple(lhs.error_count, lhs.slowdown_count, lhs.config_priority, lhs.priority, lhs.random)
< std::forward_as_tuple(rhs.error_count, rhs.slowdown_count, rhs.config_priority, rhs.priority, rhs.random);
else
return std::forward_as_tuple(lhs.error_count, lhs.config_priority, lhs.priority, lhs.random)
< std::forward_as_tuple(rhs.error_count, rhs.config_priority, rhs.priority, rhs.random);
}
private:

View File

@ -108,15 +108,9 @@ def test_parallel_replicas_custom_key_failover(
== "subqueries\t4\n"
)
# currently this assert is flaky with asan and tsan builds, disable the assert in such cases for now
# will be investigated separately
if (
not node1.is_built_with_thread_sanitizer()
and not node1.is_built_with_address_sanitizer()
):
assert (
node1.query(
f"SELECT h, count() FROM clusterAllReplicas({cluster_name}, system.query_log) WHERE initial_query_id = '{query_id}' AND type ='QueryFinish' GROUP BY hostname() as h ORDER BY h SETTINGS skip_unavailable_shards=1"
)
== "n1\t3\nn3\t2\n"
assert (
node1.query(
f"SELECT h, count() FROM clusterAllReplicas({cluster_name}, system.query_log) WHERE initial_query_id = '{query_id}' AND type ='QueryFinish' GROUP BY hostname() as h ORDER BY h SETTINGS skip_unavailable_shards=1"
)
== "n1\t3\nn3\t2\n"
)