Fix undefined behavior if all connection tries fail

For some reason, getManyCheckedForInsert was calling getManyImpl with
skip_unavailable_endpoints=nullptr, which resulted in getManyImpl using
the `skip_unavailable_shards` settings. Since this is true by default,
min_entries was set to 0.

Having min_entries set to 0 while using PoolMode::GET_ONE was strange,
to say the least. There was an edge case where if all connection
attempts failed and min_entries was 0, it was returning an empty vector.
That was not considered to be possible by the caller and it was getting
the front entry of an empty vector, causing undefined behavior.

Conclusion: set `skip_unavailable_endpoints=false` explicitly in
getManyCheckedForInsert so that min_entries=1. In case all connections
fail, an exception will be thrown. Also, add some defensive programming
to ensure we never ever try to get the front element of an empty vector.
This commit is contained in:
Pablo Marcos 2024-09-09 13:55:14 +00:00
parent e6e79c3c4a
commit 67b57eb89f
6 changed files with 13 additions and 11 deletions

View File

@ -168,7 +168,7 @@ std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::g
{ return tryGetEntry(pool, timeouts, fail_message, settings, &table_to_check, /*async_callback=*/ {}); }; { return tryGetEntry(pool, timeouts, fail_message, settings, &table_to_check, /*async_callback=*/ {}); };
return getManyImpl(settings, pool_mode, try_get_entry, return getManyImpl(settings, pool_mode, try_get_entry,
/*skip_unavailable_endpoints=*/ std::nullopt, /*skip_unavailable_endpoints=*/ false, /// skip_unavailable_endpoints is used to get the min number of entries, and we need at least one
/*priority_func=*/ {}, /*priority_func=*/ {},
settings.distributed_insert_skip_read_only_replicas); settings.distributed_insert_skip_read_only_replicas);
} }

View File

@ -42,7 +42,7 @@ public:
size_t max_error_cap = DBMS_CONNECTION_POOL_WITH_FAILOVER_MAX_ERROR_COUNT); size_t max_error_cap = DBMS_CONNECTION_POOL_WITH_FAILOVER_MAX_ERROR_COUNT);
using Entry = IConnectionPool::Entry; using Entry = IConnectionPool::Entry;
using PoolWithFailoverBase<IConnectionPool>::checkTryResultIsValid; using PoolWithFailoverBase<IConnectionPool>::getValidTryResult;
/** Allocates connection to work. */ /** Allocates connection to work. */
Entry get(const ConnectionTimeouts & timeouts) override; Entry get(const ConnectionTimeouts & timeouts) override;

View File

@ -122,12 +122,18 @@ public:
return result.entry.isNull() || !result.is_usable || (skip_read_only_replicas && result.is_readonly); return result.entry.isNull() || !result.is_usable || (skip_read_only_replicas && result.is_readonly);
} }
void checkTryResultIsValid(const TryResult & result, bool skip_read_only_replicas) const TryResult getValidTryResult(const std::vector<TryResult> & results, bool skip_read_only_replicas) const
{ {
if (results.empty())
throw DB::Exception(DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED, "Cannot get any valid connection. Probably all connection tries failed");
auto result = results.front();
if (isTryResultInvalid(result, skip_read_only_replicas)) if (isTryResultInvalid(result, skip_read_only_replicas))
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR,
"Got an invalid connection result: entry.isNull {}, is_usable {}, is_up_to_date {}, delay {}, is_readonly {}, skip_read_only_replicas {}", "Got an invalid connection result: entry.isNull {}, is_usable {}, is_up_to_date {}, delay {}, is_readonly {}, skip_read_only_replicas {}",
result.entry.isNull(), result.is_usable, result.is_up_to_date, result.delay, result.is_readonly, skip_read_only_replicas); result.entry.isNull(), result.is_usable, result.is_up_to_date, result.delay, result.is_readonly, skip_read_only_replicas);
return result;
} }
size_t getPoolSize() const { return nested_pools.size(); } size_t getPoolSize() const { return nested_pools.size(); }

View File

@ -242,8 +242,7 @@ void DistributedAsyncInsertBatch::sendBatch(const SettingsChanges & settings_cha
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings); auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
auto results = parent.pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName()); auto results = parent.pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName());
auto result = results.front(); auto result = parent.pool->getValidTryResult(results, insert_settings.distributed_insert_skip_read_only_replicas);
parent.pool->checkTryResultIsValid(result, insert_settings.distributed_insert_skip_read_only_replicas);
connection = std::move(result.entry); connection = std::move(result.entry);
compression_expected = connection->getCompression() == Protocol::Compression::Enable; compression_expected = connection->getCompression() == Protocol::Compression::Enable;
@ -302,8 +301,7 @@ void DistributedAsyncInsertBatch::sendSeparateFiles(const SettingsChanges & sett
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings); auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
auto results = parent.pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName()); auto results = parent.pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName());
auto result = results.front(); auto result = parent.pool->getValidTryResult(results, insert_settings.distributed_insert_skip_read_only_replicas);
parent.pool->checkTryResultIsValid(result, insert_settings.distributed_insert_skip_read_only_replicas);
auto connection = std::move(result.entry); auto connection = std::move(result.entry);
bool compression_expected = connection->getCompression() == Protocol::Compression::Enable; bool compression_expected = connection->getCompression() == Protocol::Compression::Enable;

View File

@ -415,8 +415,7 @@ void DistributedAsyncInsertDirectoryQueue::processFile(std::string & file_path,
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings); auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
auto results = pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName()); auto results = pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName());
auto result = results.front(); auto result = pool->getValidTryResult(results, insert_settings.distributed_insert_skip_read_only_replicas);
pool->checkTryResultIsValid(result, insert_settings.distributed_insert_skip_read_only_replicas);
auto connection = std::move(result.entry); auto connection = std::move(result.entry);
LOG_DEBUG(log, "Sending `{}` to {} ({} rows, {} bytes)", LOG_DEBUG(log, "Sending `{}` to {} ({} rows, {} bytes)",

View File

@ -377,8 +377,7 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si
/// NOTE: INSERT will also take into account max_replica_delay_for_distributed_queries /// NOTE: INSERT will also take into account max_replica_delay_for_distributed_queries
/// (anyway fallback_to_stale_replicas_for_distributed_queries=true by default) /// (anyway fallback_to_stale_replicas_for_distributed_queries=true by default)
auto results = shard_info.pool->getManyCheckedForInsert(timeouts, settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName()); auto results = shard_info.pool->getManyCheckedForInsert(timeouts, settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName());
auto result = results.front(); auto result = shard_info.pool->getValidTryResult(results, settings.distributed_insert_skip_read_only_replicas);
shard_info.pool->checkTryResultIsValid(result, settings.distributed_insert_skip_read_only_replicas);
job.connection_entry = std::move(result.entry); job.connection_entry = std::move(result.entry);
} }
else else