add timeouts to ConnectionPoolWithFailover and use max_tries from settings

This commit is contained in:
Konstantin Podshumok 2019-03-02 02:03:41 +03:00
parent d18fd5f10b
commit eb31345224
3 changed files with 45 additions and 28 deletions

View File

@ -8,6 +8,8 @@
#include <Common/ProfileEvents.h>
#include <Core/Settings.h>
#include <IO/ConnectionTimeouts.h>
namespace ProfileEvents
{
@ -29,9 +31,8 @@ namespace ErrorCodes
ConnectionPoolWithFailover::ConnectionPoolWithFailover(
ConnectionPoolPtrs nested_pools_,
LoadBalancing load_balancing,
size_t max_tries_,
time_t decrease_error_period_)
: Base(std::move(nested_pools_), max_tries_, decrease_error_period_, &Logger::get("ConnectionPoolWithFailover"))
: Base(std::move(nested_pools_), decrease_error_period_, &Logger::get("ConnectionPoolWithFailover"))
, default_load_balancing(load_balancing)
{
const std::string & local_hostname = getFQDNOrHostName();
@ -44,11 +45,13 @@ ConnectionPoolWithFailover::ConnectionPoolWithFailover(
}
}
IConnectionPool::Entry ConnectionPoolWithFailover::get(const Settings * settings, bool /*force_connected*/)
IConnectionPool::Entry ConnectionPoolWithFailover::get(const ConnectionTimeouts & timeouts,
const Settings * settings,
bool /*force_connected*/)
{
TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
{
return tryGetEntry(pool, fail_message, settings);
return tryGetEntry(pool, timeouts, fail_message, settings);
};
GetPriorityFunc get_priority;
@ -70,11 +73,13 @@ IConnectionPool::Entry ConnectionPoolWithFailover::get(const Settings * settings
return Base::get(try_get_entry, get_priority);
}
std::vector<IConnectionPool::Entry> ConnectionPoolWithFailover::getMany(const Settings * settings, PoolMode pool_mode)
std::vector<IConnectionPool::Entry> ConnectionPoolWithFailover::getMany(const ConnectionTimeouts & timeouts,
const Settings * settings,
PoolMode pool_mode)
{
TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
{
return tryGetEntry(pool, fail_message, settings);
return tryGetEntry(pool, timeouts, fail_message, settings);
};
std::vector<TryResult> results = getManyImpl(settings, pool_mode, try_get_entry);
@ -86,22 +91,27 @@ std::vector<IConnectionPool::Entry> ConnectionPoolWithFailover::getMany(const Se
return entries;
}
std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyForTableFunction(const Settings * settings, PoolMode pool_mode)
std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyForTableFunction(
const ConnectionTimeouts & timeouts,
const Settings * settings,
PoolMode pool_mode)
{
TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
{
return tryGetEntry(pool, fail_message, settings);
return tryGetEntry(pool, timeouts, fail_message, settings);
};
return getManyImpl(settings, pool_mode, try_get_entry);
}
std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyChecked(
const Settings * settings, PoolMode pool_mode, const QualifiedTableName & table_to_check)
const ConnectionTimeouts & timeouts,
const Settings * settings, PoolMode pool_mode,
const QualifiedTableName & table_to_check)
{
TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
{
return tryGetEntry(pool, fail_message, settings, &table_to_check);
return tryGetEntry(pool, timeouts, fail_message, settings, &table_to_check);
};
return getManyImpl(settings, pool_mode, try_get_entry);
@ -113,6 +123,9 @@ std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::g
const TryGetEntryFunc & try_get_entry)
{
size_t min_entries = (settings && settings->skip_unavailable_shards) ? 0 : 1;
size_t max_tries = (settings ?
size_t{settings->connections_with_failover_max_tries} :
size_t{DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES});
size_t max_entries;
if (pool_mode == PoolMode::GET_ALL)
{
@ -144,12 +157,13 @@ std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::g
bool fallback_to_stale_replicas = settings ? bool(settings->fallback_to_stale_replicas_for_distributed_queries) : true;
return Base::getMany(min_entries, max_entries, try_get_entry, get_priority, fallback_to_stale_replicas);
return Base::getMany(min_entries, max_entries, max_tries, try_get_entry, get_priority, fallback_to_stale_replicas);
}
ConnectionPoolWithFailover::TryResult
ConnectionPoolWithFailover::tryGetEntry(
IConnectionPool & pool,
const ConnectionTimeouts & timeouts,
std::string & fail_message,
const Settings * settings,
const QualifiedTableName * table_to_check)
@ -157,15 +171,15 @@ ConnectionPoolWithFailover::tryGetEntry(
TryResult result;
try
{
result.entry = pool.get(settings, /* force_connected = */ false);
result.entry = pool.get(timeouts, settings, /* force_connected = */ false);
UInt64 server_revision = 0;
if (table_to_check)
server_revision = result.entry->getServerRevision();
server_revision = result.entry->getServerRevision(timeouts);
if (!table_to_check || server_revision < DBMS_MIN_REVISION_WITH_TABLES_STATUS)
{
result.entry->forceConnected();
result.entry->forceConnected(timeouts);
result.is_usable = true;
result.is_up_to_date = true;
return result;
@ -176,7 +190,7 @@ ConnectionPoolWithFailover::tryGetEntry(
TablesStatusRequest status_request;
status_request.tables.emplace(*table_to_check);
TablesStatusResponse status_response = result.entry->getTablesStatus(status_request);
TablesStatusResponse status_response = result.entry->getTablesStatus(timeouts, status_request);
auto table_status_it = status_response.table_states_by_id.find(*table_to_check);
if (table_status_it == status_response.table_states_by_id.end())
{

View File

@ -34,21 +34,24 @@ public:
ConnectionPoolWithFailover(
ConnectionPoolPtrs nested_pools_,
LoadBalancing load_balancing,
size_t max_tries_ = DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES,
time_t decrease_error_period_ = DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_DECREASE_ERROR_PERIOD);
using Entry = IConnectionPool::Entry;
/** Allocates connection to work. */
Entry get(const Settings * settings = nullptr, bool force_connected = true) override; /// From IConnectionPool
Entry get(const ConnectionTimeouts & timeouts,
const Settings * settings = nullptr,
bool force_connected = true) override; /// From IConnectionPool
/** Allocates up to the specified number of connections to work.
* Connections provide access to different replicas of one shard.
*/
std::vector<Entry> getMany(const Settings * settings, PoolMode pool_mode);
std::vector<Entry> getMany(const ConnectionTimeouts & timeouts,
const Settings * settings, PoolMode pool_mode);
/// The same as getMany(), but return std::vector<TryResult>.
std::vector<TryResult> getManyForTableFunction(const Settings * settings, PoolMode pool_mode);
std::vector<TryResult> getManyForTableFunction(const ConnectionTimeouts & timeouts,
const Settings * settings, PoolMode pool_mode);
using Base = PoolWithFailoverBase<IConnectionPool>;
using TryResult = Base::TryResult;
@ -56,7 +59,10 @@ public:
/// The same as getMany(), but check that replication delay for table_to_check is acceptable.
/// Delay threshold is taken from settings.
std::vector<TryResult> getManyChecked(
const Settings * settings, PoolMode pool_mode, const QualifiedTableName & table_to_check);
const ConnectionTimeouts & timeouts,
const Settings * settings,
PoolMode pool_mode,
const QualifiedTableName & table_to_check);
private:
/// Get the values of relevant settings and call Base::getMany()
@ -70,6 +76,7 @@ private:
/// for this table is not too large.
TryResult tryGetEntry(
IConnectionPool & pool,
const ConnectionTimeouts & timeouts,
std::string & fail_message,
const Settings * settings,
const QualifiedTableName * table_to_check = nullptr);

View File

@ -55,11 +55,9 @@ public:
PoolWithFailoverBase(
NestedPools nested_pools_,
size_t max_tries_,
time_t decrease_error_period_,
Logger * log_)
: nested_pools(std::move(nested_pools_))
, max_tries(max_tries_)
, decrease_error_period(decrease_error_period_)
, shared_pool_states(nested_pools.size())
, log(log_)
@ -108,7 +106,7 @@ public:
/// The method will throw if it is unable to get min_entries alive connections or
/// if fallback_to_stale_replicas is false and it is unable to get min_entries connections to up-to-date replicas.
std::vector<TryResult> getMany(
size_t min_entries, size_t max_entries,
size_t min_entries, size_t max_entries, size_t max_tries,
const TryGetEntryFunc & try_get_entry,
const GetPriorityFunc & get_priority = GetPriorityFunc(),
bool fallback_to_stale_replicas = true);
@ -125,8 +123,6 @@ protected:
NestedPools nested_pools;
const size_t max_tries;
const time_t decrease_error_period;
std::mutex pool_states_mutex;
@ -141,7 +137,7 @@ template <typename TNestedPool>
typename TNestedPool::Entry
PoolWithFailoverBase<TNestedPool>::get(const TryGetEntryFunc & try_get_entry, const GetPriorityFunc & get_priority)
{
std::vector<TryResult> results = getMany(1, 1, try_get_entry, get_priority);
std::vector<TryResult> results = getMany(1, 1, 1, try_get_entry, get_priority);
if (results.empty() || results[0].entry.isNull())
throw DB::Exception(
"PoolWithFailoverBase::getMany() returned less than min_entries entries.",
@ -152,7 +148,7 @@ PoolWithFailoverBase<TNestedPool>::get(const TryGetEntryFunc & try_get_entry, co
template <typename TNestedPool>
std::vector<typename PoolWithFailoverBase<TNestedPool>::TryResult>
PoolWithFailoverBase<TNestedPool>::getMany(
size_t min_entries, size_t max_entries,
size_t min_entries, size_t max_entries, size_t max_tries,
const TryGetEntryFunc & try_get_entry,
const GetPriorityFunc & get_priority,
bool fallback_to_stale_replicas)
@ -192,7 +188,7 @@ PoolWithFailoverBase<TNestedPool>::getMany(
size_t up_to_date_count = 0;
size_t failed_pools_count = 0;
/// At exit update shared error counts with error counts occured during this call.
/// At exit update shared error counts with error counts occurred during this call.
SCOPE_EXIT(
{
std::lock_guard lock(pool_states_mutex);