From e9fc86925406d721beaff519aaf023eeb435718c Mon Sep 17 00:00:00 2001 From: Igor Nikonov Date: Wed, 10 Jan 2024 15:59:48 +0000 Subject: [PATCH] Fix test flakiness with hedged connections enabled --- src/Client/ConnectionPoolWithFailover.cpp | 1 + src/Processors/QueryPlan/ReadFromRemote.cpp | 24 +++++++---- src/QueryPipeline/RemoteQueryExecutor.cpp | 40 ++++++++++++++----- src/QueryPipeline/RemoteQueryExecutor.h | 29 +++++++++----- .../test.py | 1 - 5 files changed, 65 insertions(+), 30 deletions(-) diff --git a/src/Client/ConnectionPoolWithFailover.cpp b/src/Client/ConnectionPoolWithFailover.cpp index e5f7bfb0747..43166659b18 100644 --- a/src/Client/ConnectionPoolWithFailover.cpp +++ b/src/Client/ConnectionPoolWithFailover.cpp @@ -257,6 +257,7 @@ ConnectionPoolWithFailover::getShuffledPools(const Settings & settings, GetPrior { if (!priority_func) priority_func = makeGetPriorityFunc(settings); + UInt64 max_ignored_errors = settings.distributed_replica_max_ignored_errors.value; return Base::getShuffledPools(max_ignored_errors, priority_func); } diff --git a/src/Processors/QueryPlan/ReadFromRemote.cpp b/src/Processors/QueryPlan/ReadFromRemote.cpp index 7e24ef8c342..56371a5de56 100644 --- a/src/Processors/QueryPlan/ReadFromRemote.cpp +++ b/src/Processors/QueryPlan/ReadFromRemote.cpp @@ -272,17 +272,25 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact const String query_string = formattedAST(query); - auto remote_query_executor = std::make_shared( - shard.shard_info.pool, query_string, output_stream->header, context, throttler, scalars, external_tables, stage); - remote_query_executor->setLogger(log); - - remote_query_executor->setPoolMode(PoolMode::GET_ONE); - if (!priority_func_factory.has_value()) priority_func_factory = GetPriorityForLoadBalancing(LoadBalancing::ROUND_ROBIN, randomSeed()); - remote_query_executor->setPriorityFunction( - priority_func_factory->getPriorityFunc(LoadBalancing::ROUND_ROBIN, 0, 
shard.shard_info.pool->getPoolSize())); + GetPriorityForLoadBalancing::Func priority_func + = priority_func_factory->getPriorityFunc(LoadBalancing::ROUND_ROBIN, 0, shard.shard_info.pool->getPoolSize()); + + auto remote_query_executor = std::make_shared( + shard.shard_info.pool, + query_string, + output_stream->header, + context, + throttler, + scalars, + external_tables, + stage, + std::nullopt, + priority_func); + remote_query_executor->setLogger(log); + remote_query_executor->setPoolMode(PoolMode::GET_ONE); if (!table_func_ptr) remote_query_executor->setMainTable(shard.main_table ? shard.main_table : main_table); diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp index eb3bf27b103..a43571c8114 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.cpp +++ b/src/QueryPipeline/RemoteQueryExecutor.cpp @@ -43,13 +43,24 @@ namespace ErrorCodes } RemoteQueryExecutor::RemoteQueryExecutor( - const String & query_, const Block & header_, ContextPtr context_, - const Scalars & scalars_, const Tables & external_tables_, - QueryProcessingStage::Enum stage_, std::optional extension_) - : header(header_), query(query_), context(context_), scalars(scalars_) - , external_tables(external_tables_), stage(stage_) + const String & query_, + const Block & header_, + ContextPtr context_, + const Scalars & scalars_, + const Tables & external_tables_, + QueryProcessingStage::Enum stage_, + std::optional extension_, + GetPriorityForLoadBalancing::Func priority_func_) + : header(header_) + , query(query_) + , context(context_) + , scalars(scalars_) + , external_tables(external_tables_) + , stage(stage_) , extension(extension_) -{} + , priority_func(priority_func_) +{ +} RemoteQueryExecutor::RemoteQueryExecutor( Connection & connection, @@ -100,10 +111,16 @@ RemoteQueryExecutor::RemoteQueryExecutor( RemoteQueryExecutor::RemoteQueryExecutor( const ConnectionPoolWithFailoverPtr & pool, - const String & query_, const Block & header_, ContextPtr 
context_, - const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, - QueryProcessingStage::Enum stage_, std::optional extension_) - : RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, extension_) + const String & query_, + const Block & header_, + ContextPtr context_, + const ThrottlerPtr & throttler, + const Scalars & scalars_, + const Tables & external_tables_, + QueryProcessingStage::Enum stage_, + std::optional extension_, + GetPriorityForLoadBalancing::Func priority_func_) + : RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, extension_, priority_func_) { create_connections = [this, pool, throttler](AsyncCallback async_callback)->std::unique_ptr { @@ -117,7 +134,8 @@ RemoteQueryExecutor::RemoteQueryExecutor( if (main_table) table_to_check = std::make_shared(main_table.getQualifiedName()); - auto res = std::make_unique(pool, context, timeouts, throttler, pool_mode, table_to_check, std::move(async_callback), priority_func); + auto res = std::make_unique( + pool, context, timeouts, throttler, pool_mode, table_to_check, std::move(async_callback), priority_func); if (extension && extension->replica_info) res->setReplicaInfo(*extension->replica_info); return res; diff --git a/src/QueryPipeline/RemoteQueryExecutor.h b/src/QueryPipeline/RemoteQueryExecutor.h index 5b9c485c8af..5a8ccc2592b 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.h +++ b/src/QueryPipeline/RemoteQueryExecutor.h @@ -50,6 +50,7 @@ public: std::shared_ptr task_iterator = nullptr; std::shared_ptr parallel_reading_coordinator = nullptr; std::optional replica_info = {}; + GetPriorityForLoadBalancing::Func priority_func; }; /// Takes already set connection. @@ -76,9 +77,15 @@ public: /// Takes a pool and gets one or several connections from it. 
RemoteQueryExecutor( const ConnectionPoolWithFailoverPtr & pool, - const String & query_, const Block & header_, ContextPtr context_, - const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(), - QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, std::optional extension_ = std::nullopt); + const String & query_, + const Block & header_, + ContextPtr context_, + const ThrottlerPtr & throttler = nullptr, + const Scalars & scalars_ = Scalars(), + const Tables & external_tables_ = Tables(), + QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, + std::optional extension_ = std::nullopt, + GetPriorityForLoadBalancing::Func priority_func = {}); ~RemoteQueryExecutor(); @@ -189,14 +196,16 @@ public: bool isReplicaUnavailable() const { return extension && extension->parallel_reading_coordinator && connections->size() == 0; } - using LoadBalancingPriorityFunc = std::function; - void setPriorityFunction(LoadBalancingPriorityFunc priority_func_) { priority_func = priority_func_; } - private: RemoteQueryExecutor( - const String & query_, const Block & header_, ContextPtr context_, - const Scalars & scalars_, const Tables & external_tables_, - QueryProcessingStage::Enum stage_, std::optional extension_); + const String & query_, + const Block & header_, + ContextPtr context_, + const Scalars & scalars_, + const Tables & external_tables_, + QueryProcessingStage::Enum stage_, + std::optional extension_, + GetPriorityForLoadBalancing::Func priority_func = {}); Block header; Block totals; @@ -276,7 +285,7 @@ private: Poco::Logger * log = nullptr; - LoadBalancingPriorityFunc priority_func; + GetPriorityForLoadBalancing::Func priority_func; /// Send all scalars to remote servers void sendScalars(); diff --git a/tests/integration/test_parallel_replicas_custom_key_failover/test.py b/tests/integration/test_parallel_replicas_custom_key_failover/test.py index df5b44e851a..2b5aa2682d5 100644 
--- a/tests/integration/test_parallel_replicas_custom_key_failover/test.py +++ b/tests/integration/test_parallel_replicas_custom_key_failover/test.py @@ -75,7 +75,6 @@ def test_parallel_replicas_custom_key_failover( filter_type, prefer_localhost_replica, ): - filter_type = "default" cluster = "test_single_shard_multiple_replicas" table = "test_table"