From 2cae0cb5ecedc2fd041def829b35bdf4dbb50f2f Mon Sep 17 00:00:00 2001 From: Igor Nikonov Date: Mon, 29 Jul 2024 20:29:15 +0000 Subject: [PATCH] force_connected flag for connection establisher --- src/Client/ConnectionEstablisher.cpp | 4 ++-- src/Client/ConnectionEstablisher.h | 4 +++- src/QueryPipeline/RemoteQueryExecutor.cpp | 4 ++-- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/Client/ConnectionEstablisher.cpp b/src/Client/ConnectionEstablisher.cpp index 8cebe7a6183..f96546846c7 100644 --- a/src/Client/ConnectionEstablisher.cpp +++ b/src/Client/ConnectionEstablisher.cpp @@ -33,12 +33,12 @@ ConnectionEstablisher::ConnectionEstablisher( { } -void ConnectionEstablisher::run(ConnectionEstablisher::TryResult & result, std::string & fail_message) +void ConnectionEstablisher::run(ConnectionEstablisher::TryResult & result, std::string & fail_message, bool force_connected) { try { ProfileEvents::increment(ProfileEvents::DistributedConnectionTries); - result.entry = pool->get(*timeouts, settings, /* force_connected = */ true); + result.entry = pool->get(*timeouts, settings, force_connected); AsyncCallbackSetter async_setter(&*result.entry, std::move(async_callback)); UInt64 server_revision = 0; diff --git a/src/Client/ConnectionEstablisher.h b/src/Client/ConnectionEstablisher.h index a3a01e63246..304ec4d34b4 100644 --- a/src/Client/ConnectionEstablisher.h +++ b/src/Client/ConnectionEstablisher.h @@ -24,7 +24,9 @@ public: const QualifiedTableName * table_to_check = nullptr); /// Establish connection and save it in result, write possible exception message in fail_message. - void run(TryResult & result, std::string & fail_message); + /// The connection is returned from the pool, it can be stale. Use force_connected flag + /// to ensure that connection is working one + void run(TryResult & result, std::string & fail_message, bool force_connected = false); /// Set async callback that will be called when reading from socket blocks. void setAsyncCallback(AsyncCallback async_callback_) { async_callback = std::move(async_callback_); } diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp index b08f2002f64..09ea6a9fb3c 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.cpp +++ b/src/QueryPipeline/RemoteQueryExecutor.cpp @@ -89,12 +89,12 @@ RemoteQueryExecutor::RemoteQueryExecutor( auto table_name = main_table.getQualifiedName(); ConnectionEstablisher connection_establisher(pool, &timeouts, current_settings, log, &table_name); - connection_establisher.run(result, fail_message); + connection_establisher.run(result, fail_message, /*force_connected=*/ true); } else { ConnectionEstablisher connection_establisher(pool, &timeouts, current_settings, log, nullptr); - connection_establisher.run(result, fail_message); + connection_establisher.run(result, fail_message, /*force_connected=*/ true); } std::vector connection_entries;