Merge pull request #11669 from azat/distributed_replica_error_ignore

Add number of errors to ignore while choosing replicas
Commit e76941b52c by Alexander Kuzmenkov, 2020-06-22 22:06:04 +03:00 (committed via GitHub)
5 changed files with 114 additions and 37 deletions


@@ -821,6 +821,10 @@ ClickHouse supports the following algorithms of choosing replicas:
 - [First or random](#load_balancing-first_or_random)
 - [Round robin](#load_balancing-round_robin)
 
+See also:
+
+- [distributed\_replica\_max\_ignored\_errors](#settings-distributed_replica_max_ignored_errors)
+
 ### Random (by Default) {#load_balancing-random}
 
 ``` sql
@@ -1170,8 +1174,10 @@ Controls how fast errors in distributed tables are zeroed. If a replica is unavailable
 
 See also:
 
 - [load\_balancing](#load_balancing-round_robin)
 - [Table engine Distributed](../../engines/table-engines/special/distributed.md)
+- [distributed\_replica\_error\_cap](#settings-distributed_replica_error_cap)
+- [distributed\_replica\_max\_ignored\_errors](#settings-distributed_replica_max_ignored_errors)
 
 ## distributed\_replica\_error\_cap {#settings-distributed_replica_error_cap}
@@ -1182,8 +1188,24 @@ Error count of each replica is capped at this value, preventing a single replica
 
 See also:
 
 - [load\_balancing](#load_balancing-round_robin)
 - [Table engine Distributed](../../engines/table-engines/special/distributed.md)
+- [distributed\_replica\_error\_half\_life](#settings-distributed_replica_error_half_life)
+- [distributed\_replica\_max\_ignored\_errors](#settings-distributed_replica_max_ignored_errors)
+
+## distributed\_replica\_max\_ignored\_errors {#settings-distributed_replica_max_ignored_errors}
+
+- Type: unsigned int
+- Default value: 0
+
+The number of errors that will be ignored while choosing replicas (according to the `load_balancing` algorithm).
+
+See also:
+
+- [load\_balancing](#load_balancing-round_robin)
+- [Table engine Distributed](../../engines/table-engines/special/distributed.md)
+- [distributed\_replica\_error\_cap](#settings-distributed_replica_error_cap)
+- [distributed\_replica\_error\_half\_life](#settings-distributed_replica_error_half_life)
 
 ## distributed\_directory\_monitor\_sleep\_time\_ms {#distributed_directory_monitor_sleep_time_ms}
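For orientation, here is how the new setting is meant to be used from a client. This is a hedged sketch modeled on the integration test at the end of this PR; the `node` handle and the `dist` Distributed table are placeholders from that test, not part of the documentation itself.

```python
# Sketch: tolerate one accumulated error per replica when routing a query
# through a Distributed table. 'node' and 'dist' come from the test harness
# below; in plain SQL this is just a SETTINGS clause on the query.
node.query(
    'SELECT * FROM dist',
    settings={
        'load_balancing': 'in_order',
        # replicas with error_count <= 1 are treated as error-free:
        'distributed_replica_max_ignored_errors': 1,
    })
```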


@@ -84,7 +84,10 @@ IConnectionPool::Entry ConnectionPoolWithFailover::get(const ConnectionTimeouts
         break;
     }
 
-    return Base::get(try_get_entry, get_priority);
+    UInt64 max_ignored_errors = settings ? settings->distributed_replica_max_ignored_errors.value : 0;
+    bool fallback_to_stale_replicas = settings ? settings->fallback_to_stale_replicas_for_distributed_queries.value : true;
+
+    return Base::get(max_ignored_errors, fallback_to_stale_replicas, try_get_entry, get_priority);
 }
 
 ConnectionPoolWithFailover::Status ConnectionPoolWithFailover::getStatus() const
@@ -206,9 +209,12 @@ std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::g
         break;
     }
 
-    bool fallback_to_stale_replicas = settings ? bool(settings->fallback_to_stale_replicas_for_distributed_queries) : true;
+    UInt64 max_ignored_errors = settings ? settings->distributed_replica_max_ignored_errors.value : 0;
+    bool fallback_to_stale_replicas = settings ? settings->fallback_to_stale_replicas_for_distributed_queries.value : true;
 
-    return Base::getMany(min_entries, max_entries, max_tries, try_get_entry, get_priority, fallback_to_stale_replicas);
+    return Base::getMany(min_entries, max_entries, max_tries,
+                         max_ignored_errors, fallback_to_stale_replicas,
+                         try_get_entry, get_priority);
 }
 
 ConnectionPoolWithFailover::TryResult
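Both hunks follow the same pattern: the pool can also be used without a query context, in which case `settings` is null and the pre-PR behaviour is kept (ignore no errors, allow stale replicas). A rough Python rendering of that defaulting, illustrative only and not the project's API:

```python
# Illustrative mirror of the null-settings fallback in the C++ above:
# no Settings object means "ignore no errors, allow stale replicas".
def effective_pool_options(settings):
    if settings is None:
        return 0, True
    return (settings['distributed_replica_max_ignored_errors'],
            settings['fallback_to_stale_replicas_for_distributed_queries'])
```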


@@ -100,28 +100,28 @@ public:
     /// this functor. The pools with lower result value will be tried first.
     using GetPriorityFunc = std::function<size_t(size_t index)>;
 
-    /// Returns a single connection.
-    Entry get(const TryGetEntryFunc & try_get_entry, const GetPriorityFunc & get_priority = GetPriorityFunc());
-
     /// Returns at least min_entries and at most max_entries connections (at most one connection per nested pool).
     /// The method will throw if it is unable to get min_entries alive connections or
     /// if fallback_to_stale_replicas is false and it is unable to get min_entries connections to up-to-date replicas.
     std::vector<TryResult> getMany(
            size_t min_entries, size_t max_entries, size_t max_tries,
+           size_t max_ignored_errors,
+           bool fallback_to_stale_replicas,
            const TryGetEntryFunc & try_get_entry,
-           const GetPriorityFunc & get_priority = GetPriorityFunc(),
-           bool fallback_to_stale_replicas = true);
-
-    void reportError(const Entry & entry);
+           const GetPriorityFunc & get_priority = GetPriorityFunc());
 
 protected:
     struct PoolState;
     using PoolStates = std::vector<PoolState>;
 
+    /// Returns a single connection.
+    Entry get(size_t max_ignored_errors, bool fallback_to_stale_replicas,
+              const TryGetEntryFunc & try_get_entry, const GetPriorityFunc & get_priority = GetPriorityFunc());
+
     /// This function returns a copy of pool states to avoid race conditions when modifying shared pool states.
-    PoolStates updatePoolStates();
+    PoolStates updatePoolStates(size_t max_ignored_errors);
     PoolStates getPoolStates() const;
 
     NestedPools nested_pools;
@@ -139,9 +139,13 @@ protected:
 
 template <typename TNestedPool>
 typename TNestedPool::Entry
-PoolWithFailoverBase<TNestedPool>::get(const TryGetEntryFunc & try_get_entry, const GetPriorityFunc & get_priority)
+PoolWithFailoverBase<TNestedPool>::get(size_t max_ignored_errors, bool fallback_to_stale_replicas,
+                                       const TryGetEntryFunc & try_get_entry, const GetPriorityFunc & get_priority)
 {
-    std::vector<TryResult> results = getMany(1, 1, 1, try_get_entry, get_priority);
+    std::vector<TryResult> results = getMany(
+        1 /* min entries */, 1 /* max entries */, 1 /* max tries */,
+        max_ignored_errors, fallback_to_stale_replicas,
+        try_get_entry, get_priority);
     if (results.empty() || results[0].entry.isNull())
         throw DB::Exception(
             "PoolWithFailoverBase::getMany() returned less than min_entries entries.",
@@ -153,12 +157,13 @@ template <typename TNestedPool>
 std::vector<typename PoolWithFailoverBase<TNestedPool>::TryResult>
 PoolWithFailoverBase<TNestedPool>::getMany(
         size_t min_entries, size_t max_entries, size_t max_tries,
+        size_t max_ignored_errors,
+        bool fallback_to_stale_replicas,
         const TryGetEntryFunc & try_get_entry,
-        const GetPriorityFunc & get_priority,
-        bool fallback_to_stale_replicas)
+        const GetPriorityFunc & get_priority)
 {
     /// Update random numbers and error counts.
-    PoolStates pool_states = updatePoolStates();
+    PoolStates pool_states = updatePoolStates(max_ignored_errors);
 
     if (get_priority)
     {
         for (size_t i = 0; i < pool_states.size(); ++i)
@@ -295,22 +300,6 @@ PoolWithFailoverBase<TNestedPool>::getMany(
 
     return try_results;
 }
 
-template <typename TNestedPool>
-void PoolWithFailoverBase<TNestedPool>::reportError(const Entry & entry)
-{
-    for (size_t i = 0; i < nested_pools.size(); ++i)
-    {
-        if (nested_pools[i]->contains(entry))
-        {
-            std::lock_guard lock(pool_states_mutex);
-            auto & pool_state = shared_pool_states[i];
-            pool_state.error_count = std::min(max_error_cap, pool_state.error_count + 1);
-            return;
-        }
-    }
-    throw DB::Exception("Can't find pool to report error", DB::ErrorCodes::LOGICAL_ERROR);
-}
-
 template <typename TNestedPool>
 struct PoolWithFailoverBase<TNestedPool>::PoolState
 {
@@ -335,7 +324,7 @@ private:
 
 template <typename TNestedPool>
 typename PoolWithFailoverBase<TNestedPool>::PoolStates
-PoolWithFailoverBase<TNestedPool>::updatePoolStates()
+PoolWithFailoverBase<TNestedPool>::updatePoolStates(size_t max_ignored_errors)
 {
     PoolStates result;
     result.reserve(nested_pools.size());
@@ -354,14 +343,17 @@ PoolWithFailoverBase<TNestedPool>::updatePoolStates()
             if (delta >= 0)
             {
+                const UInt64 MAX_BITS = sizeof(UInt64) * CHAR_BIT;
+                size_t shift_amount = MAX_BITS;
                 /// Divide error counts by 2 every decrease_error_period seconds.
-                size_t shift_amount = delta / decrease_error_period;
+                if (decrease_error_period)
+                    shift_amount = delta / decrease_error_period;
                 /// Update time but don't do it more often than once a period.
                 /// Else if the function is called often enough, error count will never decrease.
                 if (shift_amount)
                     last_error_decrease_time = current_time;
 
-                if (shift_amount >= sizeof(UInt64) * CHAR_BIT)
+                if (shift_amount >= MAX_BITS)
                 {
                     for (auto & state : shared_pool_states)
                         state.error_count = 0;
@@ -378,6 +370,11 @@ PoolWithFailoverBase<TNestedPool>::updatePoolStates()
         result.assign(shared_pool_states.begin(), shared_pool_states.end());
     }
 
+    /// distributed_replica_max_ignored_errors
+    for (auto & state : result)
+        state.error_count = std::max<UInt64>(0, state.error_count - max_ignored_errors);
+
    return result;
 }
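Putting the two `updatePoolStates()` changes together: error counts decay exponentially (halved every `decrease_error_period` seconds, implemented as a right shift), now with a guard against a zero period, and afterwards up to `max_ignored_errors` errors are discounted per replica. Below is a self-contained Python model of that bookkeeping; the names are mine, not the project's. Note that the model clamps at zero explicitly, since the counters in the C++ above are `UInt64` and a plain unsigned subtraction could wrap around.

```python
MAX_BITS = 64  # the C++ counters are UInt64

def discounted_error_counts(error_counts, last_decrease_time, now,
                            decrease_error_period, max_ignored_errors):
    """Model of updatePoolStates(): decay error counts, then ignore a few."""
    delta = now - last_decrease_time
    if delta >= 0:
        # Halve the counters once per elapsed period; a zero period means
        # "forget everything" (the new guard in the diff above).
        shift = delta // decrease_error_period if decrease_error_period else MAX_BITS
        if shift:  # don't advance the clock more often than once a period
            last_decrease_time = now
        error_counts = [0 if shift >= MAX_BITS else c >> shift
                        for c in error_counts]
    # distributed_replica_max_ignored_errors: subtract with a clamp at zero
    error_counts = [max(0, c - max_ignored_errors) for c in error_counts]
    return error_counts, last_decrease_time
```

With `error_counts = [3, 0, 1]` and `max_ignored_errors = 1`, replicas are ranked as if the counts were `[2, 0, 0]`, so a single stray error no longer demotes a replica in the `load_balancing` ordering.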


@@ -348,6 +348,7 @@ struct Settings : public SettingsCollection<Settings>
     \
     M(SettingSeconds, distributed_replica_error_half_life, DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_DECREASE_ERROR_PERIOD, "Time period reduces replica error counter by 2 times.", 0) \
     M(SettingUInt64, distributed_replica_error_cap, DBMS_CONNECTION_POOL_WITH_FAILOVER_MAX_ERROR_COUNT, "Max number of errors per replica, prevents piling up an incredible amount of errors if replica was offline for some time and allows it to be reconsidered in a shorter amount of time.", 0) \
+    M(SettingUInt64, distributed_replica_max_ignored_errors, 0, "Number of errors that will be ignored while choosing replicas", 0) \
     \
     M(SettingBool, allow_experimental_live_view, false, "Enable LIVE VIEW. Not mature enough.", 0) \
     M(SettingSeconds, live_view_heartbeat_interval, DEFAULT_LIVE_VIEW_HEARTBEAT_INTERVAL_SEC, "The heartbeat interval in seconds to indicate live query is alive.", 0) \


@@ -16,8 +16,23 @@ n3 = cluster.add_instance('n3', main_configs=['configs/remote_servers.xml'])
 nodes = len(cluster.instances)
 queries = nodes*5
 
-def create_tables():
+def bootstrap():
     for n in cluster.instances.values():
+        # At startup, the server loads its configuration files.
+        #
+        # However, ConfigReloader does not know about already loaded files
+        # (its file list is empty), hence it will always reload the configuration
+        # just after the server starts (+ 2 seconds, the reload timeout).
+        #
+        # On configuration reload the clusters will be re-created, so some
+        # internal state will be reset:
+        # - error_count
+        # - last_used (round_robin)
+        #
+        # If the reload happens during the round_robin test, querying will start
+        # from the beginning again, so issue a config reload just after start
+        # to avoid a reload in the middle of the test execution.
+        n.query('SYSTEM RELOAD CONFIG')
         n.query('DROP TABLE IF EXISTS data')
         n.query('DROP TABLE IF EXISTS dist')
         n.query('CREATE TABLE data (key Int) Engine=Memory()')
@@ -36,7 +51,7 @@ def make_uuid():
 def start_cluster():
     try:
         cluster.start()
-        create_tables()
+        bootstrap()
         yield cluster
     finally:
         cluster.shutdown()
@@ -112,3 +127,39 @@ def test_load_balancing_round_robin():
         unique_nodes.add(get_node(n1, settings={'load_balancing': 'round_robin'}))
     assert len(unique_nodes) == nodes, unique_nodes
     assert unique_nodes == set(['n1', 'n2', 'n3'])
+
+def test_distributed_replica_max_ignored_errors():
+    settings = {
+        'load_balancing': 'in_order',
+        'prefer_localhost_replica': 0,
+        'connect_timeout': 2,
+        'receive_timeout': 2,
+        'send_timeout': 2,
+        'idle_connection_timeout': 2,
+        'tcp_keep_alive_timeout': 2,
+        'distributed_replica_max_ignored_errors': 0,
+        'distributed_replica_error_half_life': 60,
+    }
+
+    # initiate the connection (in case only this test is run)
+    n2.query('SELECT * FROM dist', settings=settings)
+
+    cluster.pause_container('n1')
+
+    # n1 is paused -- the query skips it and increments error_count for n1,
+    # but the query succeeds, so no need for query_and_get_error()
+    n2.query('SELECT * FROM dist', settings=settings)
+    # XXX: due to config reloading we need a second query (sigh)
+    n2.query('SELECT * FROM dist', settings=settings)
+
+    # check error_count for n1
+    assert int(n2.query("""
+    SELECT errors_count FROM system.clusters
+    WHERE cluster = 'replicas_cluster' AND host_name = 'n1'
+    """, settings=settings)) == 1
+
+    cluster.unpause_container('n1')
+    # with no ignored errors, n1 still has error_count = 1: still n2
+    assert get_node(n2, settings=settings) == 'n2'
+    # with one ignored error, n1 counts as error-free: now n1
+    settings['distributed_replica_max_ignored_errors'] = 1
+    assert get_node(n2, settings=settings) == 'n1'
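The assertions rely on a `get_node()` helper defined earlier in test.py and not shown in this diff. A hypothetical stand-in that captures its contract (run one query through `dist` and report which replica served it) could look like the following; the `hostName()` trick and the assumption that `data` was populated during bootstrap are mine, not the PR's:

```python
# Hypothetical stand-in for get_node(); the real helper is outside this diff.
# With prefer_localhost_replica=0, hostName() is evaluated on whichever
# replica executes the remote part of the query, which is the node we want.
# Assumes the underlying 'data' table holds at least one row.
def get_node(query_node, settings):
    return query_node.query('SELECT hostName() FROM dist LIMIT 1',
                            settings=settings).strip()
```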