force_connected flag for connection establisher

This commit is contained in:
Igor Nikonov 2024-07-29 20:29:15 +00:00
parent 82c06920e3
commit 2cae0cb5ec
3 changed files with 7 additions and 5 deletions

View File

@ -33,12 +33,12 @@ ConnectionEstablisher::ConnectionEstablisher(
{
}
void ConnectionEstablisher::run(ConnectionEstablisher::TryResult & result, std::string & fail_message)
void ConnectionEstablisher::run(ConnectionEstablisher::TryResult & result, std::string & fail_message, bool force_connected)
{
try
{
ProfileEvents::increment(ProfileEvents::DistributedConnectionTries);
result.entry = pool->get(*timeouts, settings, /* force_connected = */ true);
result.entry = pool->get(*timeouts, settings, force_connected);
AsyncCallbackSetter async_setter(&*result.entry, std::move(async_callback));
UInt64 server_revision = 0;

View File

@ -24,7 +24,9 @@ public:
const QualifiedTableName * table_to_check = nullptr);
/// Establish connection and save it in result, write possible exception message in fail_message.
void run(TryResult & result, std::string & fail_message);
/// The connection is returned from the pool, it can be stale. Use force_connected flag
/// to ensure that connection is working one
void run(TryResult & result, std::string & fail_message, bool force_connected = false);
/// Set async callback that will be called when reading from socket blocks.
void setAsyncCallback(AsyncCallback async_callback_) { async_callback = std::move(async_callback_); }

View File

@ -89,12 +89,12 @@ RemoteQueryExecutor::RemoteQueryExecutor(
auto table_name = main_table.getQualifiedName();
ConnectionEstablisher connection_establisher(pool, &timeouts, current_settings, log, &table_name);
connection_establisher.run(result, fail_message);
connection_establisher.run(result, fail_message, /*force_connected=*/ true);
}
else
{
ConnectionEstablisher connection_establisher(pool, &timeouts, current_settings, log, nullptr);
connection_establisher.run(result, fail_message);
connection_establisher.run(result, fail_message, /*force_connected=*/ true);
}
std::vector<IConnectionPool::Entry> connection_entries;