mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-29 02:52:13 +00:00
Merge pull request #69456 from ClickHouse/backport/24.3/69390
Backport #69390 to 24.3: Fix undefined behavior if all connection tries fail
This commit is contained in:
commit
65b188cb65
@ -168,7 +168,7 @@ std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::g
|
|||||||
{ return tryGetEntry(pool, timeouts, fail_message, settings, &table_to_check, /*async_callback=*/ {}); };
|
{ return tryGetEntry(pool, timeouts, fail_message, settings, &table_to_check, /*async_callback=*/ {}); };
|
||||||
|
|
||||||
return getManyImpl(settings, pool_mode, try_get_entry,
|
return getManyImpl(settings, pool_mode, try_get_entry,
|
||||||
/*skip_unavailable_endpoints=*/ std::nullopt,
|
/*skip_unavailable_endpoints=*/ false, /// skip_unavailable_endpoints is used to get the min number of entries, and we need at least one
|
||||||
/*priority_func=*/ {},
|
/*priority_func=*/ {},
|
||||||
settings.distributed_insert_skip_read_only_replicas);
|
settings.distributed_insert_skip_read_only_replicas);
|
||||||
}
|
}
|
||||||
|
@ -42,7 +42,7 @@ public:
|
|||||||
size_t max_error_cap = DBMS_CONNECTION_POOL_WITH_FAILOVER_MAX_ERROR_COUNT);
|
size_t max_error_cap = DBMS_CONNECTION_POOL_WITH_FAILOVER_MAX_ERROR_COUNT);
|
||||||
|
|
||||||
using Entry = IConnectionPool::Entry;
|
using Entry = IConnectionPool::Entry;
|
||||||
using PoolWithFailoverBase<IConnectionPool>::isTryResultInvalid;
|
using PoolWithFailoverBase<IConnectionPool>::getValidTryResult;
|
||||||
|
|
||||||
/** Allocates connection to work. */
|
/** Allocates connection to work. */
|
||||||
Entry get(const ConnectionTimeouts & timeouts) override;
|
Entry get(const ConnectionTimeouts & timeouts) override;
|
||||||
@ -98,7 +98,7 @@ public:
|
|||||||
|
|
||||||
std::vector<Base::ShuffledPool> getShuffledPools(const Settings & settings, GetPriorityFunc priority_func = {}, bool use_slowdown_count = false);
|
std::vector<Base::ShuffledPool> getShuffledPools(const Settings & settings, GetPriorityFunc priority_func = {}, bool use_slowdown_count = false);
|
||||||
|
|
||||||
size_t getMaxErrorCup() const { return Base::max_error_cap; }
|
size_t getMaxErrorCap() const { return Base::max_error_cap; }
|
||||||
|
|
||||||
void updateSharedError(std::vector<ShuffledPool> & shuffled_pools)
|
void updateSharedError(std::vector<ShuffledPool> & shuffled_pools)
|
||||||
{
|
{
|
||||||
|
@ -329,7 +329,7 @@ HedgedConnectionsFactory::State HedgedConnectionsFactory::processFinishedConnect
|
|||||||
LOG_INFO(log, "Connection failed at try №{}, reason: {}", (shuffled_pool.error_count + 1), fail_message);
|
LOG_INFO(log, "Connection failed at try №{}, reason: {}", (shuffled_pool.error_count + 1), fail_message);
|
||||||
ProfileEvents::increment(ProfileEvents::DistributedConnectionFailTry);
|
ProfileEvents::increment(ProfileEvents::DistributedConnectionFailTry);
|
||||||
|
|
||||||
shuffled_pool.error_count = std::min(pool->getMaxErrorCup(), shuffled_pool.error_count + 1);
|
shuffled_pool.error_count = std::min(pool->getMaxErrorCap(), shuffled_pool.error_count + 1);
|
||||||
shuffled_pool.slowdown_count = 0;
|
shuffled_pool.slowdown_count = 0;
|
||||||
|
|
||||||
if (shuffled_pool.error_count >= max_tries)
|
if (shuffled_pool.error_count >= max_tries)
|
||||||
|
@ -123,6 +123,20 @@ public:
|
|||||||
return result.entry.isNull() || !result.is_usable || (skip_read_only_replicas && result.is_readonly);
|
return result.entry.isNull() || !result.is_usable || (skip_read_only_replicas && result.is_readonly);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TryResult getValidTryResult(const std::vector<TryResult> & results, bool skip_read_only_replicas) const
|
||||||
|
{
|
||||||
|
if (results.empty())
|
||||||
|
throw DB::Exception(DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED, "Cannot get any valid connection because all connection tries failed");
|
||||||
|
|
||||||
|
auto result = results.front();
|
||||||
|
if (isTryResultInvalid(result, skip_read_only_replicas))
|
||||||
|
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR,
|
||||||
|
"Got an invalid connection result: entry.isNull {}, is_usable {}, is_up_to_date {}, delay {}, is_readonly {}, skip_read_only_replicas {}",
|
||||||
|
result.entry.isNull(), result.is_usable, result.is_up_to_date, result.delay, result.is_readonly, skip_read_only_replicas);
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
size_t getPoolSize() const { return nested_pools.size(); }
|
size_t getPoolSize() const { return nested_pools.size(); }
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
@ -40,7 +40,7 @@ static constexpr auto SHOW_CHARS_ON_SYNTAX_ERROR = ptrdiff_t(160);
|
|||||||
/// each period reduces the error counter by 2 times
|
/// each period reduces the error counter by 2 times
|
||||||
/// too short a period can cause errors to disappear immediately after creation.
|
/// too short a period can cause errors to disappear immediately after creation.
|
||||||
static constexpr auto DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_DECREASE_ERROR_PERIOD = 60;
|
static constexpr auto DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_DECREASE_ERROR_PERIOD = 60;
|
||||||
/// replica error max cap, this is to prevent replica from accumulating too many errors and taking to long to recover.
|
/// replica error max cap, this is to prevent replica from accumulating too many errors and taking too long to recover.
|
||||||
static constexpr auto DBMS_CONNECTION_POOL_WITH_FAILOVER_MAX_ERROR_COUNT = 1000;
|
static constexpr auto DBMS_CONNECTION_POOL_WITH_FAILOVER_MAX_ERROR_COUNT = 1000;
|
||||||
|
|
||||||
/// The boundary on which the blocks for asynchronous file operations should be aligned.
|
/// The boundary on which the blocks for asynchronous file operations should be aligned.
|
||||||
|
@ -234,10 +234,7 @@ void DistributedAsyncInsertBatch::sendBatch(const SettingsChanges & settings_cha
|
|||||||
|
|
||||||
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
|
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
|
||||||
auto results = parent.pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName());
|
auto results = parent.pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName());
|
||||||
auto result = results.front();
|
auto result = parent.pool->getValidTryResult(results, insert_settings.distributed_insert_skip_read_only_replicas);
|
||||||
if (parent.pool->isTryResultInvalid(result, insert_settings.distributed_insert_skip_read_only_replicas))
|
|
||||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got an invalid connection result");
|
|
||||||
|
|
||||||
connection = std::move(result.entry);
|
connection = std::move(result.entry);
|
||||||
compression_expected = connection->getCompression() == Protocol::Compression::Enable;
|
compression_expected = connection->getCompression() == Protocol::Compression::Enable;
|
||||||
|
|
||||||
@ -296,10 +293,7 @@ void DistributedAsyncInsertBatch::sendSeparateFiles(const SettingsChanges & sett
|
|||||||
|
|
||||||
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
|
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
|
||||||
auto results = parent.pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName());
|
auto results = parent.pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName());
|
||||||
auto result = results.front();
|
auto result = parent.pool->getValidTryResult(results, insert_settings.distributed_insert_skip_read_only_replicas);
|
||||||
if (parent.pool->isTryResultInvalid(result, insert_settings.distributed_insert_skip_read_only_replicas))
|
|
||||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got an invalid connection result");
|
|
||||||
|
|
||||||
auto connection = std::move(result.entry);
|
auto connection = std::move(result.entry);
|
||||||
bool compression_expected = connection->getCompression() == Protocol::Compression::Enable;
|
bool compression_expected = connection->getCompression() == Protocol::Compression::Enable;
|
||||||
|
|
||||||
|
@ -413,10 +413,7 @@ void DistributedAsyncInsertDirectoryQueue::processFile(std::string & file_path,
|
|||||||
|
|
||||||
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
|
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
|
||||||
auto results = pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName());
|
auto results = pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName());
|
||||||
auto result = results.front();
|
auto result = pool->getValidTryResult(results, insert_settings.distributed_insert_skip_read_only_replicas);
|
||||||
if (pool->isTryResultInvalid(result, insert_settings.distributed_insert_skip_read_only_replicas))
|
|
||||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got an invalid connection result");
|
|
||||||
|
|
||||||
auto connection = std::move(result.entry);
|
auto connection = std::move(result.entry);
|
||||||
|
|
||||||
LOG_DEBUG(log, "Sending `{}` to {} ({} rows, {} bytes)",
|
LOG_DEBUG(log, "Sending `{}` to {} ({} rows, {} bytes)",
|
||||||
|
@ -373,10 +373,7 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si
|
|||||||
/// NOTE: INSERT will also take into account max_replica_delay_for_distributed_queries
|
/// NOTE: INSERT will also take into account max_replica_delay_for_distributed_queries
|
||||||
/// (anyway fallback_to_stale_replicas_for_distributed_queries=true by default)
|
/// (anyway fallback_to_stale_replicas_for_distributed_queries=true by default)
|
||||||
auto results = shard_info.pool->getManyCheckedForInsert(timeouts, settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName());
|
auto results = shard_info.pool->getManyCheckedForInsert(timeouts, settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName());
|
||||||
auto result = results.front();
|
auto result = shard_info.pool->getValidTryResult(results, settings.distributed_insert_skip_read_only_replicas);
|
||||||
if (shard_info.pool->isTryResultInvalid(result, settings.distributed_insert_skip_read_only_replicas))
|
|
||||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got an invalid connection result");
|
|
||||||
|
|
||||||
job.connection_entry = std::move(result.entry);
|
job.connection_entry = std::move(result.entry);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
Loading…
Reference in New Issue
Block a user