diff --git a/src/Client/ConnectionPoolWithFailover.cpp b/src/Client/ConnectionPoolWithFailover.cpp index fb895d17763..a5c14dc9957 100644 --- a/src/Client/ConnectionPoolWithFailover.cpp +++ b/src/Client/ConnectionPoolWithFailover.cpp @@ -168,7 +168,7 @@ std::vector ConnectionPoolWithFailover::g { return tryGetEntry(pool, timeouts, fail_message, settings, &table_to_check, /*async_callback=*/ {}); }; return getManyImpl(settings, pool_mode, try_get_entry, - /*skip_unavailable_endpoints=*/ std::nullopt, + /*skip_unavailable_endpoints=*/ false, /// skip_unavailable_endpoints is used to get the min number of entries, and we need at least one /*priority_func=*/ {}, settings.distributed_insert_skip_read_only_replicas); } diff --git a/src/Client/ConnectionPoolWithFailover.h b/src/Client/ConnectionPoolWithFailover.h index 19232e0ee7c..6db52140854 100644 --- a/src/Client/ConnectionPoolWithFailover.h +++ b/src/Client/ConnectionPoolWithFailover.h @@ -42,7 +42,7 @@ public: size_t max_error_cap = DBMS_CONNECTION_POOL_WITH_FAILOVER_MAX_ERROR_COUNT); using Entry = IConnectionPool::Entry; - using PoolWithFailoverBase::checkTryResultIsValid; + using PoolWithFailoverBase::getValidTryResult; /** Allocates connection to work. */ Entry get(const ConnectionTimeouts & timeouts) override; diff --git a/src/Common/PoolWithFailoverBase.h b/src/Common/PoolWithFailoverBase.h index 53a746c316e..a0da72071f9 100644 --- a/src/Common/PoolWithFailoverBase.h +++ b/src/Common/PoolWithFailoverBase.h @@ -122,12 +122,18 @@ public: return result.entry.isNull() || !result.is_usable || (skip_read_only_replicas && result.is_readonly); } - void checkTryResultIsValid(const TryResult & result, bool skip_read_only_replicas) const + TryResult getValidTryResult(const std::vector & results, bool skip_read_only_replicas) const { + if (results.empty()) + throw DB::Exception(DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED, "Cannot get any valid connection. Probably all connection tries failed"); + + auto result = results.front(); if (isTryResultInvalid(result, skip_read_only_replicas)) throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Got an invalid connection result: entry.isNull {}, is_usable {}, is_up_to_date {}, delay {}, is_readonly {}, skip_read_only_replicas {}", result.entry.isNull(), result.is_usable, result.is_up_to_date, result.delay, result.is_readonly, skip_read_only_replicas); + + return result; } size_t getPoolSize() const { return nested_pools.size(); } diff --git a/src/Storages/Distributed/DistributedAsyncInsertBatch.cpp b/src/Storages/Distributed/DistributedAsyncInsertBatch.cpp index 2db2bdf3981..625c64128e7 100644 --- a/src/Storages/Distributed/DistributedAsyncInsertBatch.cpp +++ b/src/Storages/Distributed/DistributedAsyncInsertBatch.cpp @@ -242,8 +242,7 @@ void DistributedAsyncInsertBatch::sendBatch(const SettingsChanges & settings_cha auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings); auto results = parent.pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName()); - auto result = results.front(); - parent.pool->checkTryResultIsValid(result, insert_settings.distributed_insert_skip_read_only_replicas); + auto result = parent.pool->getValidTryResult(results, insert_settings.distributed_insert_skip_read_only_replicas); connection = std::move(result.entry); compression_expected = connection->getCompression() == Protocol::Compression::Enable; @@ -302,8 +301,7 @@ void DistributedAsyncInsertBatch::sendSeparateFiles(const SettingsChanges & sett auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings); auto results = parent.pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName()); - auto result = results.front(); - parent.pool->checkTryResultIsValid(result, insert_settings.distributed_insert_skip_read_only_replicas); + auto result = parent.pool->getValidTryResult(results, insert_settings.distributed_insert_skip_read_only_replicas); auto connection = std::move(result.entry); bool compression_expected = connection->getCompression() == Protocol::Compression::Enable; diff --git a/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.cpp b/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.cpp index 2400de4c07c..7f368102dfd 100644 --- a/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.cpp +++ b/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.cpp @@ -415,8 +415,7 @@ void DistributedAsyncInsertDirectoryQueue::processFile(std::string & file_path, auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings); auto results = pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName()); - auto result = results.front(); - pool->checkTryResultIsValid(result, insert_settings.distributed_insert_skip_read_only_replicas); + auto result = pool->getValidTryResult(results, insert_settings.distributed_insert_skip_read_only_replicas); auto connection = std::move(result.entry); LOG_DEBUG(log, "Sending `{}` to {} ({} rows, {} bytes)", diff --git a/src/Storages/Distributed/DistributedSink.cpp b/src/Storages/Distributed/DistributedSink.cpp index 39f75fd7d56..f01ea10065c 100644 --- a/src/Storages/Distributed/DistributedSink.cpp +++ b/src/Storages/Distributed/DistributedSink.cpp @@ -377,8 +377,7 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si /// NOTE: INSERT will also take into account max_replica_delay_for_distributed_queries /// (anyway fallback_to_stale_replicas_for_distributed_queries=true by default) auto results = shard_info.pool->getManyCheckedForInsert(timeouts, settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName()); - auto result = results.front(); - shard_info.pool->checkTryResultIsValid(result, settings.distributed_insert_skip_read_only_replicas); + auto result = shard_info.pool->getValidTryResult(results, settings.distributed_insert_skip_read_only_replicas); job.connection_entry = std::move(result.entry); } else