Merge pull request #60158 from ClickHouse/fix-slowdown-count-side-effect
Fix: custom key failover test flakiness
Commit: 0ead912127
@@ -253,13 +253,13 @@ ConnectionPoolWithFailover::tryGetEntry(
 }

 std::vector<ConnectionPoolWithFailover::Base::ShuffledPool>
-ConnectionPoolWithFailover::getShuffledPools(const Settings & settings, GetPriorityForLoadBalancing::Func priority_func)
+ConnectionPoolWithFailover::getShuffledPools(const Settings & settings, GetPriorityForLoadBalancing::Func priority_func, bool use_slowdown_count)
 {
     if (!priority_func)
         priority_func = makeGetPriorityFunc(settings);

     UInt64 max_ignored_errors = settings.distributed_replica_max_ignored_errors.value;
-    return Base::getShuffledPools(max_ignored_errors, priority_func);
+    return Base::getShuffledPools(max_ignored_errors, priority_func, use_slowdown_count);
 }

 }
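
The new use_slowdown_count flag is threaded from the public ConnectionPoolWithFailover API down into PoolWithFailoverBase, defaulting to false at every layer so that existing call sites keep the old ordering behaviour. A minimal sketch of the pattern, with simplified stand-in types rather than the real ClickHouse signatures:

    #include <vector>

    struct Pool { /* ... */ };

    // Base layer: consults slowdown counters only when asked to.
    std::vector<Pool> getShuffledPoolsBase(bool use_slowdown_count);

    // Public wrapper: the default argument preserves the old behaviour
    // for callers written before the flag existed.
    std::vector<Pool> getShuffledPools(bool use_slowdown_count = false)
    {
        return getShuffledPoolsBase(use_slowdown_count);
    }

    // An opting-in call site then reads:
    //     auto pools = getShuffledPools(/* use_slowdown_count */ true);
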
@@ -83,15 +83,15 @@ public:
     struct NestedPoolStatus
     {
         const Base::NestedPoolPtr pool;
-        size_t error_count;
-        size_t slowdown_count;
+        size_t error_count = 0;
+        size_t slowdown_count = 0;
         std::chrono::seconds estimated_recovery_time;
     };

     using Status = std::vector<NestedPoolStatus>;
     Status getStatus() const;

-    std::vector<Base::ShuffledPool> getShuffledPools(const Settings & settings, GetPriorityFunc priority_func = {});
+    std::vector<Base::ShuffledPool> getShuffledPools(const Settings & settings, GetPriorityFunc priority_func = {}, bool use_slowdown_count = false);

     size_t getMaxErrorCup() const { return Base::max_error_cap; }

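
The added "= 0" default member initializers matter because a default-initialized aggregate leaves plain integral members with indeterminate values; with the initializers, the counters are zero regardless of how the struct is constructed. A small standalone illustration, using generic stand-in types rather than the ClickHouse structs:

    #include <cstdio>

    struct WithoutDefaults { int error_count; int slowdown_count; };
    struct WithDefaults    { int error_count = 0; int slowdown_count = 0; };

    int main()
    {
        WithoutDefaults a;  // members are indeterminate; reading them is undefined behaviour
        WithDefaults b;     // members are guaranteed to be zero
        std::printf("%d %d\n", b.error_count, b.slowdown_count);  // prints "0 0"
        (void)a;
        return 0;
    }
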
@@ -40,7 +40,8 @@ HedgedConnectionsFactory::HedgedConnectionsFactory(
     , max_parallel_replicas(max_parallel_replicas_)
     , skip_unavailable_shards(skip_unavailable_shards_)
 {
-    shuffled_pools = pool->getShuffledPools(settings_, priority_func);
+    shuffled_pools = pool->getShuffledPools(settings_, priority_func, /* use_slowdown_count */ true);
+
     for (const auto & shuffled_pool : shuffled_pools)
         replicas.emplace_back(
             std::make_unique<ConnectionEstablisherAsync>(shuffled_pool.pool, &timeouts, settings_, log, table_to_check.get()));
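
In this diff, HedgedConnectionsFactory is the only call site that passes true, so only hedged connections let slowdown counters influence replica ordering; ordinary pools are no longer perturbed by counters that hedged requests accumulated as a side effect. The "/* use_slowdown_count */ true" spelling just names the boolean at the call site; a hypothetical alternative (a design option, not what the patch does) would use an enum to make the call site self-documenting:

    // Hypothetical alternative to the bool parameter:
    enum class SlowdownCount { Ignore, Use };

    // std::vector<ShuffledPool> getShuffledPools(..., SlowdownCount mode = SlowdownCount::Ignore);
    // Call site: getShuffledPools(..., SlowdownCount::Use);
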
@@ -133,7 +133,7 @@ protected:

     void updateErrorCounts(PoolStates & states, time_t & last_decrease_time) const;

-    std::vector<ShuffledPool> getShuffledPools(size_t max_ignored_errors, const GetPriorityFunc & get_priority);
+    std::vector<ShuffledPool> getShuffledPools(size_t max_ignored_errors, const GetPriorityFunc & get_priority, bool use_slowdown_count = false);

     inline void updateSharedErrorCounts(std::vector<ShuffledPool> & shuffled_pools);

@@ -160,7 +160,7 @@ protected:
 template <typename TNestedPool>
 std::vector<typename PoolWithFailoverBase<TNestedPool>::ShuffledPool>
 PoolWithFailoverBase<TNestedPool>::getShuffledPools(
-    size_t max_ignored_errors, const PoolWithFailoverBase::GetPriorityFunc & get_priority)
+    size_t max_ignored_errors, const PoolWithFailoverBase::GetPriorityFunc & get_priority, bool use_slowdown_count)
 {
     /// Update random numbers and error counts.
     PoolStates pool_states = updatePoolStates(max_ignored_errors);
@@ -175,13 +175,13 @@ PoolWithFailoverBase<TNestedPool>::getShuffledPools(
     std::vector<ShuffledPool> shuffled_pools;
     shuffled_pools.reserve(nested_pools.size());
     for (size_t i = 0; i < nested_pools.size(); ++i)
-        shuffled_pools.push_back(ShuffledPool{nested_pools[i], &pool_states[i], i, /* error_count = */ 0, /* slowdown_count = */ 0});
+        shuffled_pools.emplace_back(ShuffledPool{.pool = nested_pools[i], .state = &pool_states[i], .index = i});

     ::sort(
         shuffled_pools.begin(), shuffled_pools.end(),
-        [](const ShuffledPool & lhs, const ShuffledPool & rhs)
+        [use_slowdown_count](const ShuffledPool & lhs, const ShuffledPool & rhs)
         {
-            return PoolState::compare(*lhs.state, *rhs.state);
+            return PoolState::compare(*lhs.state, *rhs.state, use_slowdown_count);
         });

     return shuffled_pools;
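
The switch to designated initializers leans on a C++20 aggregate rule: members not named in the braces are initialized from their default member initializers, or value-initialized (zeroed) otherwise, which replaces the old explicit "/* error_count = */ 0, /* slowdown_count = */ 0" arguments. A sketch with assumed, simplified member types:

    #include <cstddef>

    struct State { /* ... */ };

    struct ShuffledPool
    {
        int pool = 0;             // stand-in for the real nested pool pointer
        State * state = nullptr;
        size_t index = 0;
        size_t error_count = 0;
        size_t slowdown_count = 0;
    };

    State s;
    // error_count and slowdown_count are not named, so they end up zero,
    // matching the zeros the old push_back passed explicitly.
    ShuffledPool p{.pool = 1, .state = &s, .index = 0};

Note that emplace_back(ShuffledPool{...}) still constructs a temporary and moves it into the vector, so the change from push_back here is stylistic rather than an optimization.
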
@@ -344,10 +344,14 @@ struct PoolWithFailoverBase<TNestedPool>::PoolState
         random = rng();
     }

-    static bool compare(const PoolState & lhs, const PoolState & rhs)
+    static bool compare(const PoolState & lhs, const PoolState & rhs, bool use_slowdown_count)
     {
-        return std::forward_as_tuple(lhs.error_count, lhs.slowdown_count, lhs.config_priority, lhs.priority, lhs.random)
-            < std::forward_as_tuple(rhs.error_count, rhs.slowdown_count, rhs.config_priority, rhs.priority, rhs.random);
+        if (use_slowdown_count)
+            return std::forward_as_tuple(lhs.error_count, lhs.slowdown_count, lhs.config_priority, lhs.priority, lhs.random)
+                < std::forward_as_tuple(rhs.error_count, rhs.slowdown_count, rhs.config_priority, rhs.priority, rhs.random);
+        else
+            return std::forward_as_tuple(lhs.error_count, lhs.config_priority, lhs.priority, lhs.random)
+                < std::forward_as_tuple(rhs.error_count, rhs.config_priority, rhs.priority, rhs.random);
     }

 private:
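
This branch is the heart of the fix: std::forward_as_tuple builds a tuple of references and tuples compare lexicographically, so omitting slowdown_count from the tuple removes it from the ordering entirely when use_slowdown_count is false. A standalone sketch with assumed field values:

    #include <cassert>
    #include <cstddef>
    #include <tuple>

    struct PoolState
    {
        size_t error_count = 0;
        size_t slowdown_count = 0;
    };

    int main()
    {
        PoolState lhs{.error_count = 0, .slowdown_count = 5};
        PoolState rhs{.error_count = 0, .slowdown_count = 0};

        // With slowdown counts considered, rhs orders strictly before lhs.
        assert((std::forward_as_tuple(rhs.error_count, rhs.slowdown_count)
              < std::forward_as_tuple(lhs.error_count, lhs.slowdown_count)));

        // Without them, the two states tie on error_count alone, so a stale
        // slowdown counter can no longer change which replica sorts first.
        assert(!(std::forward_as_tuple(lhs.error_count)
               < std::forward_as_tuple(rhs.error_count)));
        return 0;
    }
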
@@ -108,12 +108,9 @@ def test_parallel_replicas_custom_key_failover(
         == "subqueries\t4\n"
     )

-    # currently this assert is flaky with asan and tsan builds, disable the assert in such cases for now
-    # will be investigated separately
-    if (
-        not node1.is_built_with_thread_sanitizer()
-        and not node1.is_built_with_address_sanitizer()
-    ):
+    # With enabled hedged requests, we can't guarantee exact query distribution among nodes
+    # In case of a replica being slow in terms of responsiveness, hedged connection can change initial replicas choice
+    if use_hedged_requests == 0:
         assert (
             node1.query(
                 f"SELECT h, count() FROM clusterAllReplicas({cluster_name}, system.query_log) WHERE initial_query_id = '{query_id}' AND type ='QueryFinish' GROUP BY hostname() as h ORDER BY h SETTINGS skip_unavailable_shards=1"