From e6e79c3c4ada9f2eb82e6e9c63c736663cfb8e02 Mon Sep 17 00:00:00 2001 From: Pablo Marcos Date: Mon, 9 Sep 2024 11:36:18 +0000 Subject: [PATCH 1/3] Fix minor typos --- src/Client/ConnectionPoolWithFailover.h | 2 +- src/Client/HedgedConnectionsFactory.cpp | 2 +- src/Core/Defines.h | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Client/ConnectionPoolWithFailover.h b/src/Client/ConnectionPoolWithFailover.h index 308644ebbdb..19232e0ee7c 100644 --- a/src/Client/ConnectionPoolWithFailover.h +++ b/src/Client/ConnectionPoolWithFailover.h @@ -98,7 +98,7 @@ public: std::vector getShuffledPools(const Settings & settings, GetPriorityFunc priority_func = {}, bool use_slowdown_count = false); - size_t getMaxErrorCup() const { return Base::max_error_cap; } + size_t getMaxErrorCap() const { return Base::max_error_cap; } void updateSharedError(std::vector & shuffled_pools) { diff --git a/src/Client/HedgedConnectionsFactory.cpp b/src/Client/HedgedConnectionsFactory.cpp index be7397b0fad..df63a124539 100644 --- a/src/Client/HedgedConnectionsFactory.cpp +++ b/src/Client/HedgedConnectionsFactory.cpp @@ -327,7 +327,7 @@ HedgedConnectionsFactory::State HedgedConnectionsFactory::processFinishedConnect ShuffledPool & shuffled_pool = shuffled_pools[index]; LOG_INFO(log, "Connection failed at try №{}, reason: {}", (shuffled_pool.error_count + 1), fail_message); - shuffled_pool.error_count = std::min(pool->getMaxErrorCup(), shuffled_pool.error_count + 1); + shuffled_pool.error_count = std::min(pool->getMaxErrorCap(), shuffled_pool.error_count + 1); shuffled_pool.slowdown_count = 0; if (shuffled_pool.error_count >= max_tries) diff --git a/src/Core/Defines.h b/src/Core/Defines.h index 6df335a9c8f..3341669fed2 100644 --- a/src/Core/Defines.h +++ b/src/Core/Defines.h @@ -40,7 +40,7 @@ static constexpr auto SHOW_CHARS_ON_SYNTAX_ERROR = ptrdiff_t(160); /// each period reduces the error counter by 2 times /// too short a period can cause errors to disappear immediately after creation. static constexpr auto DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_DECREASE_ERROR_PERIOD = 60; -/// replica error max cap, this is to prevent replica from accumulating too many errors and taking to long to recover. +/// replica error max cap, this is to prevent replica from accumulating too many errors and taking too long to recover. static constexpr auto DBMS_CONNECTION_POOL_WITH_FAILOVER_MAX_ERROR_COUNT = 1000; /// The boundary on which the blocks for asynchronous file operations should be aligned. From 67b57eb89fb8724c542cc197a7371c3e60009566 Mon Sep 17 00:00:00 2001 From: Pablo Marcos Date: Mon, 9 Sep 2024 13:55:14 +0000 Subject: [PATCH 2/3] Fix undefined behavior if all connection tries fail For some reason, getManyCheckedForInsert was calling getManyImpl with skip_unavailable_endpoints=nullptr, which resulted in getManyImpl using the `skip_unavailable_shards` settings. Since this is true by default, min_entries was set to 0. Having min_entries set to 0 while using PoolMode::GET_ONE was strange, to say the least. There was an edge case where if all connection attempts failed and min_entries was 0, it was returning an empty vector. That was not considered to be possible by the caller and it was getting the front entry of an empty vector, causing undefined behavior. Conclusion: set `skip_unavailable_endpoints=false` explicitly in getManyCheckedForInsert so that min_entries=1. In case all connections fail, an exception will be thrown. Also, add some defensive programming to ensure we never ever try to get the front element of an empty vector. --- src/Client/ConnectionPoolWithFailover.cpp | 2 +- src/Client/ConnectionPoolWithFailover.h | 2 +- src/Common/PoolWithFailoverBase.h | 8 +++++++- src/Storages/Distributed/DistributedAsyncInsertBatch.cpp | 6 ++---- .../Distributed/DistributedAsyncInsertDirectoryQueue.cpp | 3 +-- src/Storages/Distributed/DistributedSink.cpp | 3 +-- 6 files changed, 13 insertions(+), 11 deletions(-) diff --git a/src/Client/ConnectionPoolWithFailover.cpp b/src/Client/ConnectionPoolWithFailover.cpp index fb895d17763..a5c14dc9957 100644 --- a/src/Client/ConnectionPoolWithFailover.cpp +++ b/src/Client/ConnectionPoolWithFailover.cpp @@ -168,7 +168,7 @@ std::vector ConnectionPoolWithFailover::g { return tryGetEntry(pool, timeouts, fail_message, settings, &table_to_check, /*async_callback=*/ {}); }; return getManyImpl(settings, pool_mode, try_get_entry, - /*skip_unavailable_endpoints=*/ std::nullopt, + /*skip_unavailable_endpoints=*/ false, /// skip_unavailable_endpoints is used to get the min number of entries, and we need at least one /*priority_func=*/ {}, settings.distributed_insert_skip_read_only_replicas); } diff --git a/src/Client/ConnectionPoolWithFailover.h b/src/Client/ConnectionPoolWithFailover.h index 19232e0ee7c..6db52140854 100644 --- a/src/Client/ConnectionPoolWithFailover.h +++ b/src/Client/ConnectionPoolWithFailover.h @@ -42,7 +42,7 @@ public: size_t max_error_cap = DBMS_CONNECTION_POOL_WITH_FAILOVER_MAX_ERROR_COUNT); using Entry = IConnectionPool::Entry; - using PoolWithFailoverBase::checkTryResultIsValid; + using PoolWithFailoverBase::getValidTryResult; /** Allocates connection to work. */ Entry get(const ConnectionTimeouts & timeouts) override; diff --git a/src/Common/PoolWithFailoverBase.h b/src/Common/PoolWithFailoverBase.h index 53a746c316e..a0da72071f9 100644 --- a/src/Common/PoolWithFailoverBase.h +++ b/src/Common/PoolWithFailoverBase.h @@ -122,12 +122,18 @@ public: return result.entry.isNull() || !result.is_usable || (skip_read_only_replicas && result.is_readonly); } - void checkTryResultIsValid(const TryResult & result, bool skip_read_only_replicas) const + TryResult getValidTryResult(const std::vector & results, bool skip_read_only_replicas) const { + if (results.empty()) + throw DB::Exception(DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED, "Cannot get any valid connection. Probably all connection tries failed"); + + auto result = results.front(); if (isTryResultInvalid(result, skip_read_only_replicas)) throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Got an invalid connection result: entry.isNull {}, is_usable {}, is_up_to_date {}, delay {}, is_readonly {}, skip_read_only_replicas {}", result.entry.isNull(), result.is_usable, result.is_up_to_date, result.delay, result.is_readonly, skip_read_only_replicas); + + return result; } size_t getPoolSize() const { return nested_pools.size(); } diff --git a/src/Storages/Distributed/DistributedAsyncInsertBatch.cpp b/src/Storages/Distributed/DistributedAsyncInsertBatch.cpp index 2db2bdf3981..625c64128e7 100644 --- a/src/Storages/Distributed/DistributedAsyncInsertBatch.cpp +++ b/src/Storages/Distributed/DistributedAsyncInsertBatch.cpp @@ -242,8 +242,7 @@ void DistributedAsyncInsertBatch::sendBatch(const SettingsChanges & settings_cha auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings); auto results = parent.pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName()); - auto result = results.front(); - parent.pool->checkTryResultIsValid(result, insert_settings.distributed_insert_skip_read_only_replicas); + auto result = parent.pool->getValidTryResult(results, insert_settings.distributed_insert_skip_read_only_replicas); connection = std::move(result.entry); compression_expected = connection->getCompression() == Protocol::Compression::Enable; @@ -302,8 +301,7 @@ void DistributedAsyncInsertBatch::sendSeparateFiles(const SettingsChanges & sett auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings); auto results = parent.pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName()); - auto result = results.front(); - parent.pool->checkTryResultIsValid(result, insert_settings.distributed_insert_skip_read_only_replicas); + auto result = parent.pool->getValidTryResult(results, insert_settings.distributed_insert_skip_read_only_replicas); auto connection = std::move(result.entry); bool compression_expected = connection->getCompression() == Protocol::Compression::Enable; diff --git a/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.cpp b/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.cpp index 2400de4c07c..7f368102dfd 100644 --- a/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.cpp +++ b/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.cpp @@ -415,8 +415,7 @@ void DistributedAsyncInsertDirectoryQueue::processFile(std::string & file_path, auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings); auto results = pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName()); - auto result = results.front(); - pool->checkTryResultIsValid(result, insert_settings.distributed_insert_skip_read_only_replicas); + auto result = pool->getValidTryResult(results, insert_settings.distributed_insert_skip_read_only_replicas); auto connection = std::move(result.entry); LOG_DEBUG(log, "Sending `{}` to {} ({} rows, {} bytes)", diff --git a/src/Storages/Distributed/DistributedSink.cpp b/src/Storages/Distributed/DistributedSink.cpp index 39f75fd7d56..f01ea10065c 100644 --- a/src/Storages/Distributed/DistributedSink.cpp +++ b/src/Storages/Distributed/DistributedSink.cpp @@ -377,8 +377,7 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si /// NOTE: INSERT will also take into account max_replica_delay_for_distributed_queries /// (anyway fallback_to_stale_replicas_for_distributed_queries=true by default) auto results = shard_info.pool->getManyCheckedForInsert(timeouts, settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName()); - auto result = results.front(); - shard_info.pool->checkTryResultIsValid(result, settings.distributed_insert_skip_read_only_replicas); + auto result = shard_info.pool->getValidTryResult(results, settings.distributed_insert_skip_read_only_replicas); job.connection_entry = std::move(result.entry); } else From f7dee100309a0c47d6daaad19164b4e968fd3b8d Mon Sep 17 00:00:00 2001 From: Pablo Marcos Date: Mon, 9 Sep 2024 14:38:06 +0000 Subject: [PATCH 3/3] Make a more assertive exception text --- src/Common/PoolWithFailoverBase.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Common/PoolWithFailoverBase.h b/src/Common/PoolWithFailoverBase.h index a0da72071f9..989ffd888f8 100644 --- a/src/Common/PoolWithFailoverBase.h +++ b/src/Common/PoolWithFailoverBase.h @@ -125,7 +125,7 @@ public: TryResult getValidTryResult(const std::vector & results, bool skip_read_only_replicas) const { if (results.empty()) - throw DB::Exception(DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED, "Cannot get any valid connection. Probably all connection tries failed"); + throw DB::Exception(DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED, "Cannot get any valid connection because all connection tries failed"); auto result = results.front(); if (isTryResultInvalid(result, skip_read_only_replicas))