Add slowdown_count and show actual information in system.clusters

Author: Pavel Kruglov
Date: 2021-03-06 02:45:17 +03:00
Parent: 5f6c8eb1d0
Commit: dbae58f30b
10 changed files with 169 additions and 39 deletions

View File

@@ -110,15 +110,20 @@ ConnectionPoolWithFailover::Status ConnectionPoolWithFailover::getStatus() const
     ConnectionPoolWithFailover::Status result;
     result.reserve(states.size());
     const time_t since_last_error_decrease = time(nullptr) - error_decrease_time;
+    /// Update error_count and slowdown_count in states to return up-to-date information.
+    auto updated_states = states;
+    auto updated_error_decrease_time = error_decrease_time;
+    Base::updateErrorCounts(updated_states, updated_error_decrease_time);
     for (size_t i = 0; i < states.size(); ++i)
     {
         const auto rounds_to_zero_errors = states[i].error_count ? bitScanReverse(states[i].error_count) + 1 : 0;
-        const auto seconds_to_zero_errors = std::max(static_cast<time_t>(0), rounds_to_zero_errors * decrease_error_period - since_last_error_decrease);
+        const auto rounds_to_zero_slowdowns = states[i].slowdown_count ? bitScanReverse(states[i].slowdown_count) + 1 : 0;
+        const auto seconds_to_zero_errors = std::max(static_cast<time_t>(0), std::max(rounds_to_zero_errors, rounds_to_zero_slowdowns) * decrease_error_period - since_last_error_decrease);
         result.emplace_back(NestedPoolStatus{
             pools[i],
-            states[i].error_count,
+            updated_states[i].error_count,
+            updated_states[i].slowdown_count,
             std::chrono::seconds{seconds_to_zero_errors}
         });
     }
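For reference, the recovery-time estimate above relies on the counters being halved once every decrease_error_period seconds, so a counter of N needs bitScanReverse(N) + 1 halvings to reach zero, and the slower-decaying of the two counters dominates. Below is a minimal standalone sketch of that arithmetic; it is not ClickHouse code: bit_scan_reverse is a local helper, and the period and counter values are made-up inputs.

#include <algorithm>
#include <chrono>
#include <cstdint>
#include <ctime>
#include <iostream>

// Stand-in for bitScanReverse: index of the highest set bit (x must be non-zero).
static uint64_t bit_scan_reverse(uint64_t x)
{
    uint64_t index = 0;
    while (x >>= 1)
        ++index;
    return index;
}

int main()
{
    // Made-up inputs: both counters are halved once per period.
    const time_t decrease_error_period = 60;  // seconds between halvings (assumed value)
    const uint64_t error_count = 3;           // needs 2 halvings to reach zero
    const uint64_t slowdown_count = 5;        // needs 3 halvings to reach zero
    const time_t since_last_decrease = 45;    // seconds already elapsed in the current period

    const uint64_t rounds_to_zero_errors = error_count ? bit_scan_reverse(error_count) + 1 : 0;
    const uint64_t rounds_to_zero_slowdowns = slowdown_count ? bit_scan_reverse(slowdown_count) + 1 : 0;

    // The slower-decaying counter dominates the estimate, as in the hunk above.
    const time_t seconds_to_zero = std::max<time_t>(
        0,
        std::max(rounds_to_zero_errors, rounds_to_zero_slowdowns) * decrease_error_period - since_last_decrease);

    std::cout << std::chrono::seconds{seconds_to_zero}.count() << "s\n";  // 3 * 60 - 45 = 135s
}

With these inputs the estimate is 135 seconds: three halving periods of 60 seconds minus the 45 seconds already elapsed in the current period.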

View File

@@ -74,6 +74,7 @@ public:
     {
         const Base::NestedPoolPtr pool;
         size_t error_count;
+        size_t slowdown_count;
         std::chrono::seconds estimated_recovery_time;
     };

View File

@@ -215,7 +215,11 @@ HedgedConnectionsFactory::State HedgedConnectionsFactory::processEpollEvents(boo
                 return state;
         }
         else if (timeout_fd_to_replica_index.contains(event_fd))
-            replicas[timeout_fd_to_replica_index[event_fd]].change_replica_timeout.reset();
+        {
+            int index = timeout_fd_to_replica_index[event_fd];
+            replicas[index].change_replica_timeout.reset();
+            ++shuffled_pools[index].slowdown_count;
+        }
         else
             throw Exception("Unknown event from epoll", ErrorCodes::LOGICAL_ERROR);
@@ -285,6 +289,7 @@ HedgedConnectionsFactory::State HedgedConnectionsFactory::processFinishedConnect
         ProfileEvents::increment(ProfileEvents::DistributedConnectionFailTry);
         shuffled_pool.error_count = std::min(pool->getMaxErrorCup(), shuffled_pool.error_count + 1);
+        shuffled_pool.slowdown_count = 0;
         if (shuffled_pool.error_count >= max_tries)
         {
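Taken together, the two hunks above split the bookkeeping: a fired change-replica timer only bumps slowdown_count on the per-query ShuffledPool, while an actual connection failure bumps the capped error_count and clears slowdown_count, so a replica that ultimately fails is counted as erroring rather than as slow. A toy sketch of that lifecycle, using made-up types and a made-up cap rather than the real HedgedConnectionsFactory state:

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <vector>

// Toy model of the per-query bookkeeping (not the real HedgedConnectionsFactory types).
struct ToyShuffledPool
{
    std::size_t error_count = 0;
    std::size_t slowdown_count = 0;
};

constexpr std::size_t max_error_cap = 1000;  // assumed cap, mirrors the capped increment above

// Called when the "change replica" timer fires: the replica is merely slow.
void on_change_replica_timeout(ToyShuffledPool & pool)
{
    ++pool.slowdown_count;
}

// Called when a connection attempt to the replica actually fails.
void on_connection_failed(ToyShuffledPool & pool)
{
    pool.error_count = std::min(max_error_cap, pool.error_count + 1);
    pool.slowdown_count = 0;  // the failure is now counted as an error, not a slowdown
}

int main()
{
    std::vector<ToyShuffledPool> shuffled_pools(2);

    on_change_replica_timeout(shuffled_pools[0]);  // replica 0 was too slow, a hedged connection is started
    on_connection_failed(shuffled_pools[1]);       // replica 1 refused the connection

    for (std::size_t i = 0; i < shuffled_pools.size(); ++i)
        std::cout << "replica " << i
                  << ": errors=" << shuffled_pools[i].error_count
                  << " slowdowns=" << shuffled_pools[i].slowdown_count << '\n';
}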

View File

@@ -103,6 +103,7 @@ public:
     {
         const PoolState * state{};
         size_t index = 0;
         size_t error_count = 0;
+        size_t slowdown_count = 0;
     };
     /// This functor must be provided by a client. It must perform a single try that takes a connection
@@ -133,6 +134,8 @@ protected:
+    /// This function returns a copy of pool states to avoid race conditions when modifying shared pool states.
     PoolStates updatePoolStates(size_t max_ignored_errors);
+    void updateErrorCounts(PoolStates & states, time_t & last_decrease_time) const;
     std::vector<ShuffledPool> getShuffledPools(size_t max_ignored_errors, const GetPriorityFunc & get_priority);
     inline void updateSharedErrorCounts(std::vector<ShuffledPool> & shuffled_pools);
@@ -193,6 +196,7 @@ inline void PoolWithFailoverBase<TNestedPool>::updateSharedErrorCounts(std::vect
     {
         auto & pool_state = shared_pool_states[pool.index];
         pool_state.error_count = std::min<UInt64>(max_error_cap, pool_state.error_count + pool.error_count);
+        pool_state.slowdown_count += pool.slowdown_count;
     }
 }
@@ -332,6 +336,8 @@ template <typename TNestedPool>
 struct PoolWithFailoverBase<TNestedPool>::PoolState
 {
     UInt64 error_count = 0;
+    /// The number of slowdowns that led to changing the replica in HedgedConnectionsFactory.
+    UInt64 slowdown_count = 0;
     /// Priority from the <remote_server> configuration.
     Int64 config_priority = 1;
     /// Priority from the GetPriorityFunc.
@@ -345,8 +351,8 @@ struct PoolWithFailoverBase<TNestedPool>::PoolState
     static bool compare(const PoolState & lhs, const PoolState & rhs)
     {
-        return std::forward_as_tuple(lhs.error_count, lhs.config_priority, lhs.priority, lhs.random)
-            < std::forward_as_tuple(rhs.error_count, rhs.config_priority, rhs.priority, rhs.random);
+        return std::forward_as_tuple(lhs.error_count, lhs.slowdown_count, lhs.config_priority, lhs.priority, lhs.random)
+            < std::forward_as_tuple(rhs.error_count, rhs.slowdown_count, rhs.config_priority, rhs.priority, rhs.random);
     }
 private:
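Because compare() is a lexicographic tuple comparison, slowdown_count only acts as a tie-breaker among replicas with equal error_count, ahead of the configured priority and the random shuffle component. A self-contained sketch with made-up values; ToyPoolState is a trimmed stand-in, not the real PoolState:

#include <cstdint>
#include <iostream>
#include <tuple>

// Trimmed-down stand-in for PoolState, keeping only the fields used by compare().
struct ToyPoolState
{
    uint64_t error_count = 0;
    uint64_t slowdown_count = 0;
    int64_t config_priority = 1;
    int64_t priority = 0;
    uint64_t random = 0;
};

// Same shape as the comparator above: lexicographic, so later fields only break ties.
static bool compare(const ToyPoolState & lhs, const ToyPoolState & rhs)
{
    return std::forward_as_tuple(lhs.error_count, lhs.slowdown_count, lhs.config_priority, lhs.priority, lhs.random)
         < std::forward_as_tuple(rhs.error_count, rhs.slowdown_count, rhs.config_priority, rhs.priority, rhs.random);
}

int main()
{
    ToyPoolState healthy;                      // no errors, no slowdowns
    ToyPoolState slow;
    slow.slowdown_count = 1;                   // same error_count, but it has been slow
    ToyPoolState failing;
    failing.error_count = 2;                   // errors dominate everything else

    std::cout << std::boolalpha
              << compare(healthy, slow) << '\n'    // true: prefer the replica without slowdowns
              << compare(slow, failing) << '\n';   // true: a slow replica still beats a failing one
}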
@@ -366,39 +372,7 @@ PoolWithFailoverBase<TNestedPool>::updatePoolStates(size_t max_ignored_errors)
         for (auto & state : shared_pool_states)
             state.randomize();
-        time_t current_time = time(nullptr);
-        if (last_error_decrease_time)
-        {
-            time_t delta = current_time - last_error_decrease_time;
-            if (delta >= 0)
-            {
-                const UInt64 MAX_BITS = sizeof(UInt64) * CHAR_BIT;
-                size_t shift_amount = MAX_BITS;
-                /// Divide error counts by 2 every decrease_error_period seconds.
-                if (decrease_error_period)
-                    shift_amount = delta / decrease_error_period;
-                /// Update time but don't do it more often than once a period.
-                /// Else if the function is called often enough, error count will never decrease.
-                if (shift_amount)
-                    last_error_decrease_time = current_time;
-                if (shift_amount >= MAX_BITS)
-                {
-                    for (auto & state : shared_pool_states)
-                        state.error_count = 0;
-                }
-                else if (shift_amount)
-                {
-                    for (auto & state : shared_pool_states)
-                        state.error_count >>= shift_amount;
-                }
-            }
-        }
-        else
-            last_error_decrease_time = current_time;
+        updateErrorCounts(shared_pool_states, last_error_decrease_time);
         result.assign(shared_pool_states.begin(), shared_pool_states.end());
     }
@@ -408,3 +382,46 @@ PoolWithFailoverBase<TNestedPool>::updatePoolStates(size_t max_ignored_errors)
     return result;
 }
+template <typename TNestedPool>
+void PoolWithFailoverBase<TNestedPool>::updateErrorCounts(PoolWithFailoverBase<TNestedPool>::PoolStates & states, time_t & last_decrease_time) const
+{
+    time_t current_time = time(nullptr);
+    if (last_decrease_time)
+    {
+        time_t delta = current_time - last_decrease_time;
+        if (delta >= 0)
+        {
+            const UInt64 MAX_BITS = sizeof(UInt64) * CHAR_BIT;
+            size_t shift_amount = MAX_BITS;
+            /// Divide error counts by 2 every decrease_error_period seconds.
+            if (decrease_error_period)
+                shift_amount = delta / decrease_error_period;
+            /// Update the time, but not more often than once a period.
+            /// Otherwise, if the function is called often enough, the error count will never decrease.
+            if (shift_amount)
+                last_decrease_time = current_time;
+            if (shift_amount >= MAX_BITS)
+            {
+                for (auto & state : states)
+                {
+                    state.error_count = 0;
+                    state.slowdown_count = 0;
+                }
+            }
+            else if (shift_amount)
+            {
+                for (auto & state : states)
+                {
+                    state.error_count >>= shift_amount;
+                    state.slowdown_count >>= shift_amount;
+                }
+            }
+        }
+    }
+    else
+        last_decrease_time = current_time;
+}
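The decay in updateErrorCounts divides both counters by two for every full decrease_error_period that has elapsed, implemented as a right shift by delta / decrease_error_period and a reset to zero once the shift would cover all 64 bits. A standalone sketch of the same logic under those assumptions, with a free function and a toy state instead of the class members:

#include <climits>
#include <cstdint>
#include <ctime>
#include <iostream>
#include <vector>

// Toy counterpart of a pool state: only the two decaying counters.
struct ToyState
{
    uint64_t error_count = 0;
    uint64_t slowdown_count = 0;
};

// Free-function sketch of the decay above: halve both counters once per elapsed period.
void decay_counts(std::vector<ToyState> & states, time_t & last_decrease_time,
                  time_t current_time, time_t decrease_error_period)
{
    if (!last_decrease_time)
    {
        last_decrease_time = current_time;
        return;
    }

    const time_t delta = current_time - last_decrease_time;
    if (delta < 0)
        return;

    const uint64_t MAX_BITS = sizeof(uint64_t) * CHAR_BIT;
    uint64_t shift_amount = MAX_BITS;
    if (decrease_error_period)
        shift_amount = delta / decrease_error_period;

    if (!shift_amount)
        return;  // less than one full period elapsed: keep last_decrease_time unchanged

    last_decrease_time = current_time;
    for (auto & state : states)
    {
        if (shift_amount >= MAX_BITS)
        {
            state.error_count = 0;
            state.slowdown_count = 0;
        }
        else
        {
            state.error_count >>= shift_amount;
            state.slowdown_count >>= shift_amount;
        }
    }
}

int main()
{
    std::vector<ToyState> states{{7, 3}};
    time_t last_decrease_time = 1000;

    // 2.5 periods of 60s have elapsed -> shift by 2: 7 -> 1, 3 -> 0.
    decay_counts(states, last_decrease_time, 1000 + 150, 60);
    std::cout << states[0].error_count << ' ' << states[0].slowdown_count << '\n';  // prints "1 0"
}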

View File

@@ -23,6 +23,7 @@ NamesAndTypesList StorageSystemClusters::getNamesAndTypes()
         {"user", std::make_shared<DataTypeString>()},
         {"default_database", std::make_shared<DataTypeString>()},
         {"errors_count", std::make_shared<DataTypeUInt32>()},
+        {"slowdowns_count", std::make_shared<DataTypeUInt32>()},
         {"estimated_recovery_time", std::make_shared<DataTypeUInt32>()}
     };
 }
@@ -71,6 +72,7 @@ void StorageSystemClusters::writeCluster(MutableColumns & res_columns, const Nam
             res_columns[i++]->insert(address.user);
             res_columns[i++]->insert(address.default_database);
             res_columns[i++]->insert(pool_status[replica_index].error_count);
+            res_columns[i++]->insert(pool_status[replica_index].slowdown_count);
             res_columns[i++]->insert(pool_status[replica_index].estimated_recovery_time.count());
         }
     }
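The two hunks above have to stay in sync: the position of the new slowdowns_count column in getNamesAndTypes() must match the position of the corresponding insert in writeCluster(). A toy sketch of that column-wise pattern, using plain vectors and made-up replica data instead of ClickHouse's MutableColumns:

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

// Toy stand-in for one replica's status as reported by the connection pool.
struct ToyReplicaStatus
{
    std::string host_name;
    uint64_t error_count = 0;
    uint64_t slowdown_count = 0;
    uint64_t estimated_recovery_time = 0;  // seconds
};

int main()
{
    // Column names must line up with the values inserted below, mirroring how
    // getNamesAndTypes() and writeCluster() have to agree on column order.
    const std::vector<std::string> names{"host_name", "errors_count", "slowdowns_count", "estimated_recovery_time"};
    std::vector<std::vector<std::string>> columns(names.size());

    const std::vector<ToyReplicaStatus> replicas{{"node_1", 3, 1, 135}, {"node_2", 0, 0, 0}};
    for (const auto & replica : replicas)
    {
        std::size_t i = 0;
        columns[i++].push_back(replica.host_name);
        columns[i++].push_back(std::to_string(replica.error_count));
        columns[i++].push_back(std::to_string(replica.slowdown_count));   // the new column slots in here
        columns[i++].push_back(std::to_string(replica.estimated_recovery_time));
    }

    for (std::size_t i = 0; i < names.size(); ++i)
    {
        std::cout << names[i] << ':';
        for (const auto & value : columns[i])
            std::cout << ' ' << value;
        std::cout << '\n';
    }
}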

View File

@@ -85,11 +85,29 @@ def check_settings(node_name, sleep_in_send_tables_status, sleep_in_send_data):
 def test_stuck_replica(started_cluster):
     cluster.pause_container("node_1")
     check_query(expected_replica="node_2")
+    result = NODES['node'].query("SELECT slowdowns_count FROM system.clusters WHERE cluster='test_cluster' and host_name='node_1'")
+    assert TSV(result) == TSV("1")
+    result = NODES['node'].query("SELECT hostName(), id FROM distributed ORDER BY id LIMIT 1")
+    assert TSV(result) == TSV("node_2\t0")
+    # Check that we didn't choose node_1 first again and that slowdowns_count didn't increase.
+    result = NODES['node'].query("SELECT slowdowns_count FROM system.clusters WHERE cluster='test_cluster' and host_name='node_1'")
+    assert TSV(result) == TSV("1")
     cluster.unpause_container("node_1")
+def test_long_query(started_cluster):
+    # Restart to reset pool states.
+    NODES['node'].restart_clickhouse()
+    result = NODES['node'].query("select hostName(), max(id + sleep(1.5)) from distributed settings max_block_size = 1, max_threads = 1;")
+    assert TSV(result) == TSV("node_1\t99")

View File

@@ -0,0 +1,14 @@
+<yandex>
+    <remote_servers>
+        <test_cluster>
+            <shard>
+                <internal_replication>true</internal_replication>
+                <replica>
+                    <host>node_1</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+        </test_cluster>
+    </remote_servers>
+</yandex>

View File

@@ -0,0 +1,8 @@
+<?xml version="1.0"?>
+<yandex>
+    <profiles>
+        <default>
+            <sleep_in_send_tables_status>5</sleep_in_send_tables_status>
+        </default>
+    </profiles>
+</yandex>

View File

@@ -0,0 +1,60 @@
+import os
+import sys
+import time
+import pytest
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+from helpers.cluster import ClickHouseCluster
+from helpers.network import PartitionManager
+cluster = ClickHouseCluster(__file__)
+node = cluster.add_instance('node', with_zookeeper=True, main_configs=['configs/remote_servers.xml'])
+node_1 = cluster.add_instance('node_1', with_zookeeper=True)
+@pytest.fixture(scope="module")
+def started_cluster():
+    try:
+        cluster.start()
+        node_1.query('''CREATE TABLE replicated (id UInt32, date Date) ENGINE =
+            ReplicatedMergeTree('/clickhouse/tables/replicated', 'node_1') ORDER BY id PARTITION BY toYYYYMM(date)''')
+        node.query("CREATE TABLE distributed (id UInt32, date Date) ENGINE = Distributed('test_cluster', 'default', 'replicated')")
+        yield cluster
+    finally:
+        cluster.shutdown()
+def test(started_cluster):
+    cluster.pause_container("node_1")
+    node.query("SYSTEM RELOAD CONFIG")
+    node.query_and_get_error("SELECT count() FROM distributed SETTINGS receive_timeout=1")
+    result = node.query("SELECT errors_count, estimated_recovery_time FROM system.clusters WHERE cluster='test_cluster' and host_name='node_1'")
+    errors_count, recovery_time = map(int, result.split())
+    assert errors_count == 3
+    while True:
+        time.sleep(1)
+        result = node.query("SELECT errors_count, estimated_recovery_time FROM system.clusters WHERE cluster='test_cluster' and host_name='node_1'")
+        prev_time = recovery_time
+        errors_count, recovery_time = map(int, result.split())
+        if recovery_time == 0:
+            break
+        assert recovery_time < prev_time
+        assert errors_count > 0
+    assert recovery_time == 0
+    assert errors_count == 0
+    cluster.unpause_container("node_1")