Backport #69390 to 24.8: Fix undefined behavior if all connection tries fail

This commit is contained in:
robot-clickhouse 2024-09-10 16:07:50 +00:00
parent 918928e2a3
commit 1cfe69dc96
8 changed files with 23 additions and 21 deletions

View File

@ -168,7 +168,7 @@ std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::g
{ return tryGetEntry(pool, timeouts, fail_message, settings, &table_to_check, /*async_callback=*/ {}); }; { return tryGetEntry(pool, timeouts, fail_message, settings, &table_to_check, /*async_callback=*/ {}); };
return getManyImpl(settings, pool_mode, try_get_entry, return getManyImpl(settings, pool_mode, try_get_entry,
/*skip_unavailable_endpoints=*/ std::nullopt, /*skip_unavailable_endpoints=*/ false, /// skip_unavailable_endpoints is used to get the min number of entries, and we need at least one
/*priority_func=*/ {}, /*priority_func=*/ {},
settings.distributed_insert_skip_read_only_replicas); settings.distributed_insert_skip_read_only_replicas);
} }

View File

@ -42,7 +42,7 @@ public:
size_t max_error_cap = DBMS_CONNECTION_POOL_WITH_FAILOVER_MAX_ERROR_COUNT); size_t max_error_cap = DBMS_CONNECTION_POOL_WITH_FAILOVER_MAX_ERROR_COUNT);
using Entry = IConnectionPool::Entry; using Entry = IConnectionPool::Entry;
using PoolWithFailoverBase<IConnectionPool>::isTryResultInvalid; using PoolWithFailoverBase<IConnectionPool>::getValidTryResult;
/** Allocates connection to work. */ /** Allocates connection to work. */
Entry get(const ConnectionTimeouts & timeouts) override; Entry get(const ConnectionTimeouts & timeouts) override;
@ -98,7 +98,7 @@ public:
std::vector<Base::ShuffledPool> getShuffledPools(const Settings & settings, GetPriorityFunc priority_func = {}, bool use_slowdown_count = false); std::vector<Base::ShuffledPool> getShuffledPools(const Settings & settings, GetPriorityFunc priority_func = {}, bool use_slowdown_count = false);
size_t getMaxErrorCup() const { return Base::max_error_cap; } size_t getMaxErrorCap() const { return Base::max_error_cap; }
void updateSharedError(std::vector<ShuffledPool> & shuffled_pools) void updateSharedError(std::vector<ShuffledPool> & shuffled_pools)
{ {

View File

@ -327,7 +327,7 @@ HedgedConnectionsFactory::State HedgedConnectionsFactory::processFinishedConnect
ShuffledPool & shuffled_pool = shuffled_pools[index]; ShuffledPool & shuffled_pool = shuffled_pools[index];
LOG_INFO(log, "Connection failed at try №{}, reason: {}", (shuffled_pool.error_count + 1), fail_message); LOG_INFO(log, "Connection failed at try №{}, reason: {}", (shuffled_pool.error_count + 1), fail_message);
shuffled_pool.error_count = std::min(pool->getMaxErrorCup(), shuffled_pool.error_count + 1); shuffled_pool.error_count = std::min(pool->getMaxErrorCap(), shuffled_pool.error_count + 1);
shuffled_pool.slowdown_count = 0; shuffled_pool.slowdown_count = 0;
if (shuffled_pool.error_count >= max_tries) if (shuffled_pool.error_count >= max_tries)

View File

@ -122,6 +122,20 @@ public:
return result.entry.isNull() || !result.is_usable || (skip_read_only_replicas && result.is_readonly); return result.entry.isNull() || !result.is_usable || (skip_read_only_replicas && result.is_readonly);
} }
TryResult getValidTryResult(const std::vector<TryResult> & results, bool skip_read_only_replicas) const
{
if (results.empty())
throw DB::Exception(DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED, "Cannot get any valid connection because all connection tries failed");
auto result = results.front();
if (isTryResultInvalid(result, skip_read_only_replicas))
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR,
"Got an invalid connection result: entry.isNull {}, is_usable {}, is_up_to_date {}, delay {}, is_readonly {}, skip_read_only_replicas {}",
result.entry.isNull(), result.is_usable, result.is_up_to_date, result.delay, result.is_readonly, skip_read_only_replicas);
return result;
}
size_t getPoolSize() const { return nested_pools.size(); } size_t getPoolSize() const { return nested_pools.size(); }
protected: protected:

View File

@ -40,7 +40,7 @@ static constexpr auto SHOW_CHARS_ON_SYNTAX_ERROR = ptrdiff_t(160);
/// each period reduces the error counter by 2 times /// each period reduces the error counter by 2 times
/// too short a period can cause errors to disappear immediately after creation. /// too short a period can cause errors to disappear immediately after creation.
static constexpr auto DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_DECREASE_ERROR_PERIOD = 60; static constexpr auto DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_DECREASE_ERROR_PERIOD = 60;
/// replica error max cap, this is to prevent replica from accumulating too many errors and taking to long to recover. /// replica error max cap, this is to prevent replica from accumulating too many errors and taking too long to recover.
static constexpr auto DBMS_CONNECTION_POOL_WITH_FAILOVER_MAX_ERROR_COUNT = 1000; static constexpr auto DBMS_CONNECTION_POOL_WITH_FAILOVER_MAX_ERROR_COUNT = 1000;
/// The boundary on which the blocks for asynchronous file operations should be aligned. /// The boundary on which the blocks for asynchronous file operations should be aligned.

View File

@ -243,10 +243,7 @@ void DistributedAsyncInsertBatch::sendBatch(const SettingsChanges & settings_cha
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings); auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
auto results = parent.pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName()); auto results = parent.pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName());
auto result = results.front(); auto result = parent.pool->getValidTryResult(results, insert_settings.distributed_insert_skip_read_only_replicas);
if (parent.pool->isTryResultInvalid(result, insert_settings.distributed_insert_skip_read_only_replicas))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got an invalid connection result");
connection = std::move(result.entry); connection = std::move(result.entry);
compression_expected = connection->getCompression() == Protocol::Compression::Enable; compression_expected = connection->getCompression() == Protocol::Compression::Enable;
@ -305,10 +302,7 @@ void DistributedAsyncInsertBatch::sendSeparateFiles(const SettingsChanges & sett
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings); auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
auto results = parent.pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName()); auto results = parent.pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName());
auto result = results.front(); auto result = parent.pool->getValidTryResult(results, insert_settings.distributed_insert_skip_read_only_replicas);
if (parent.pool->isTryResultInvalid(result, insert_settings.distributed_insert_skip_read_only_replicas))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got an invalid connection result");
auto connection = std::move(result.entry); auto connection = std::move(result.entry);
bool compression_expected = connection->getCompression() == Protocol::Compression::Enable; bool compression_expected = connection->getCompression() == Protocol::Compression::Enable;

View File

@ -413,10 +413,7 @@ void DistributedAsyncInsertDirectoryQueue::processFile(std::string & file_path,
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings); auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
auto results = pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName()); auto results = pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName());
auto result = results.front(); auto result = pool->getValidTryResult(results, insert_settings.distributed_insert_skip_read_only_replicas);
if (pool->isTryResultInvalid(result, insert_settings.distributed_insert_skip_read_only_replicas))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got an invalid connection result");
auto connection = std::move(result.entry); auto connection = std::move(result.entry);
LOG_DEBUG(log, "Sending `{}` to {} ({} rows, {} bytes)", LOG_DEBUG(log, "Sending `{}` to {} ({} rows, {} bytes)",

View File

@ -377,10 +377,7 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si
/// NOTE: INSERT will also take into account max_replica_delay_for_distributed_queries /// NOTE: INSERT will also take into account max_replica_delay_for_distributed_queries
/// (anyway fallback_to_stale_replicas_for_distributed_queries=true by default) /// (anyway fallback_to_stale_replicas_for_distributed_queries=true by default)
auto results = shard_info.pool->getManyCheckedForInsert(timeouts, settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName()); auto results = shard_info.pool->getManyCheckedForInsert(timeouts, settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName());
auto result = results.front(); auto result = shard_info.pool->getValidTryResult(results, settings.distributed_insert_skip_read_only_replicas);
if (shard_info.pool->isTryResultInvalid(result, settings.distributed_insert_skip_read_only_replicas))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got an invalid connection result");
job.connection_entry = std::move(result.entry); job.connection_entry = std::move(result.entry);
} }
else else