Fix test flakiness with hedge connections enabled

This commit is contained in:
Igor Nikonov 2024-01-10 15:59:48 +00:00
parent f566423e19
commit e9fc869254
5 changed files with 65 additions and 30 deletions

View File

@ -257,6 +257,7 @@ ConnectionPoolWithFailover::getShuffledPools(const Settings & settings, GetPrior
{
if (!priority_func)
priority_func = makeGetPriorityFunc(settings);
UInt64 max_ignored_errors = settings.distributed_replica_max_ignored_errors.value;
return Base::getShuffledPools(max_ignored_errors, priority_func);
}

View File

@ -272,17 +272,25 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
const String query_string = formattedAST(query);
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
shard.shard_info.pool, query_string, output_stream->header, context, throttler, scalars, external_tables, stage);
remote_query_executor->setLogger(log);
remote_query_executor->setPoolMode(PoolMode::GET_ONE);
if (!priority_func_factory.has_value())
priority_func_factory = GetPriorityForLoadBalancing(LoadBalancing::ROUND_ROBIN, randomSeed());
remote_query_executor->setPriorityFunction(
priority_func_factory->getPriorityFunc(LoadBalancing::ROUND_ROBIN, 0, shard.shard_info.pool->getPoolSize()));
GetPriorityForLoadBalancing::Func priority_func
= priority_func_factory->getPriorityFunc(LoadBalancing::ROUND_ROBIN, 0, shard.shard_info.pool->getPoolSize());
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
shard.shard_info.pool,
query_string,
output_stream->header,
context,
throttler,
scalars,
external_tables,
stage,
std::nullopt,
priority_func);
remote_query_executor->setLogger(log);
remote_query_executor->setPoolMode(PoolMode::GET_ONE);
if (!table_func_ptr)
remote_query_executor->setMainTable(shard.main_table ? shard.main_table : main_table);

View File

@ -43,13 +43,24 @@ namespace ErrorCodes
}
RemoteQueryExecutor::RemoteQueryExecutor(
const String & query_, const Block & header_, ContextPtr context_,
const Scalars & scalars_, const Tables & external_tables_,
QueryProcessingStage::Enum stage_, std::optional<Extension> extension_)
: header(header_), query(query_), context(context_), scalars(scalars_)
, external_tables(external_tables_), stage(stage_)
const String & query_,
const Block & header_,
ContextPtr context_,
const Scalars & scalars_,
const Tables & external_tables_,
QueryProcessingStage::Enum stage_,
std::optional<Extension> extension_,
GetPriorityForLoadBalancing::Func priority_func_)
: header(header_)
, query(query_)
, context(context_)
, scalars(scalars_)
, external_tables(external_tables_)
, stage(stage_)
, extension(extension_)
{}
, priority_func(priority_func_)
{
}
RemoteQueryExecutor::RemoteQueryExecutor(
Connection & connection,
@ -100,10 +111,16 @@ RemoteQueryExecutor::RemoteQueryExecutor(
RemoteQueryExecutor::RemoteQueryExecutor(
const ConnectionPoolWithFailoverPtr & pool,
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_,
QueryProcessingStage::Enum stage_, std::optional<Extension> extension_)
: RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, extension_)
const String & query_,
const Block & header_,
ContextPtr context_,
const ThrottlerPtr & throttler,
const Scalars & scalars_,
const Tables & external_tables_,
QueryProcessingStage::Enum stage_,
std::optional<Extension> extension_,
GetPriorityForLoadBalancing::Func priority_func_)
: RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, extension_, priority_func_)
{
create_connections = [this, pool, throttler](AsyncCallback async_callback)->std::unique_ptr<IConnections>
{
@ -117,7 +134,8 @@ RemoteQueryExecutor::RemoteQueryExecutor(
if (main_table)
table_to_check = std::make_shared<QualifiedTableName>(main_table.getQualifiedName());
auto res = std::make_unique<HedgedConnections>(pool, context, timeouts, throttler, pool_mode, table_to_check, std::move(async_callback), priority_func);
auto res = std::make_unique<HedgedConnections>(
pool, context, timeouts, throttler, pool_mode, table_to_check, std::move(async_callback), priority_func);
if (extension && extension->replica_info)
res->setReplicaInfo(*extension->replica_info);
return res;

View File

@ -50,6 +50,7 @@ public:
std::shared_ptr<TaskIterator> task_iterator = nullptr;
std::shared_ptr<ParallelReplicasReadingCoordinator> parallel_reading_coordinator = nullptr;
std::optional<IConnections::ReplicaInfo> replica_info = {};
GetPriorityForLoadBalancing::Func priority_func;
};
/// Takes already set connection.
@ -76,9 +77,15 @@ public:
/// Takes a pool and gets one or several connections from it.
RemoteQueryExecutor(
const ConnectionPoolWithFailoverPtr & pool,
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, std::optional<Extension> extension_ = std::nullopt);
const String & query_,
const Block & header_,
ContextPtr context_,
const ThrottlerPtr & throttler = nullptr,
const Scalars & scalars_ = Scalars(),
const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
std::optional<Extension> extension_ = std::nullopt,
GetPriorityForLoadBalancing::Func priority_func = {});
~RemoteQueryExecutor();
@ -189,14 +196,16 @@ public:
bool isReplicaUnavailable() const { return extension && extension->parallel_reading_coordinator && connections->size() == 0; }
using LoadBalancingPriorityFunc = std::function<Priority(size_t index)>;
void setPriorityFunction(LoadBalancingPriorityFunc priority_func_) { priority_func = priority_func_; }
private:
RemoteQueryExecutor(
const String & query_, const Block & header_, ContextPtr context_,
const Scalars & scalars_, const Tables & external_tables_,
QueryProcessingStage::Enum stage_, std::optional<Extension> extension_);
const String & query_,
const Block & header_,
ContextPtr context_,
const Scalars & scalars_,
const Tables & external_tables_,
QueryProcessingStage::Enum stage_,
std::optional<Extension> extension_,
GetPriorityForLoadBalancing::Func priority_func = {});
Block header;
Block totals;
@ -276,7 +285,7 @@ private:
Poco::Logger * log = nullptr;
LoadBalancingPriorityFunc priority_func;
GetPriorityForLoadBalancing::Func priority_func;
/// Send all scalars to remote servers
void sendScalars();

View File

@ -75,7 +75,6 @@ def test_parallel_replicas_custom_key_failover(
filter_type,
prefer_localhost_replica,
):
filter_type = "default"
cluster = "test_single_shard_multiple_replicas"
table = "test_table"