Keeper reconfig -- sleep before removing leader to avoid losing accepted but uncommitted commands
Mike Kot 2023-08-16 14:37:30 +00:00
parent c391527c86
commit 785183bb3c
12 changed files with 317 additions and 14 deletions

contrib/NuRaft vendored

@@ -1 +1 @@
Subproject commit eb1572129c71beb2156dcdaadc3fb136954aed96
Subproject commit b7ea89b817a18dc0eafc1f909d568869f02d2d04


@@ -32,6 +32,7 @@ struct Settings;
M(Milliseconds, shutdown_timeout, 5000, "How much time we will wait until RAFT shutdown", 0) \
M(Milliseconds, session_shutdown_timeout, 10000, "How much time we will wait until sessions are closed during shutdown", 0) \
M(Milliseconds, startup_timeout, 180000, "How much time we will wait until RAFT to start.", 0) \
M(Milliseconds, sleep_before_leader_change_ms, 8000, "How much time we will wait before removing the leader (so the leader can commit accepted but not-yet-committed commands and they won't be lost -- leader removal is not synchronized with committing)", 0) \
M(LogsLevel, raft_logs_level, LogsLevel::information, "Log internal RAFT logs into main server log level. Valid values: 'trace', 'debug', 'information', 'warning', 'error', 'fatal', 'none'", 0) \
M(UInt64, rotate_log_storage_interval, 100000, "How many records will be stored in one log storage file", 0) \
M(UInt64, snapshots_to_keep, 3, "How many compressed snapshots to keep on disk", 0) \


@@ -813,6 +813,8 @@ void KeeperDispatcher::clusterUpdateWithReconfigDisabledThread()
void KeeperDispatcher::clusterUpdateThread()
{
using enum KeeperServer::ConfigUpdateState;
bool last_command_was_leader_change = false;
auto & shutdown_called = keeper_context->shutdown_called;
while (!shutdown_called)
{
@@ -820,13 +822,18 @@ void KeeperDispatcher::clusterUpdateThread()
if (!cluster_update_queue.pop(action))
return;
if (server->applyConfigUpdate(action))
if (const auto res = server->applyConfigUpdate(action, last_command_was_leader_change); res == Accepted)
LOG_DEBUG(log, "Processing config update {}: accepted", action);
else // TODO (myrrc) sleep a random amount? sleep less?
else
{
last_command_was_leader_change = res == WaitBeforeChangingLeader;
(void)cluster_update_queue.pushFront(action);
LOG_DEBUG(log, "Processing config update {}: declined, backoff", action);
std::this_thread::sleep_for(50ms);
std::this_thread::sleep_for(last_command_was_leader_change
? configuration_and_settings->coordination_settings->sleep_before_leader_change_ms
: 50ms);
}
}
}
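
The hunk above is the consumer side of the new behaviour: a declined leader removal is pushed back to the front of the queue, and the thread backs off for sleep_before_leader_change_ms (8000 ms by default) instead of the usual 50 ms before retrying. A minimal, self-contained model of that loop, with the Keeper classes replaced by illustrative stand-ins (applyConfigUpdateSketch, a plain std::deque instead of the real update queue -- none of these are the actual ClickHouse classes), might look like this:

#include <chrono>
#include <deque>
#include <iostream>
#include <string>
#include <thread>

using namespace std::chrono_literals;

enum class ConfigUpdateState { Accepted, Declined, WaitBeforeChangingLeader };

// Illustrative stand-in for KeeperServer::applyConfigUpdate: the first attempt to
// remove the leader asks the caller to wait, any later attempt is accepted.
static ConfigUpdateState applyConfigUpdateSketch(const std::string & action, bool last_command_was_leader_change)
{
    if (action == "remove leader" && !last_command_was_leader_change)
        return ConfigUpdateState::WaitBeforeChangingLeader;
    return ConfigUpdateState::Accepted;
}

int main()
{
    const auto sleep_before_leader_change = 8000ms; // default of the new setting
    std::deque<std::string> cluster_update_queue{"remove leader"};
    bool last_command_was_leader_change = false;

    while (!cluster_update_queue.empty())
    {
        auto action = cluster_update_queue.front();
        cluster_update_queue.pop_front();

        if (const auto res = applyConfigUpdateSketch(action, last_command_was_leader_change);
            res == ConfigUpdateState::Accepted)
        {
            std::cout << "Processing config update '" << action << "': accepted\n";
        }
        else
        {
            last_command_was_leader_change = res == ConfigUpdateState::WaitBeforeChangingLeader;
            cluster_update_queue.push_front(action); // retry the same action after the backoff
            std::cout << "Processing config update '" << action << "': declined, backoff\n";
            std::this_thread::sleep_for(
                last_command_was_leader_change ? sleep_before_leader_change : 50ms);
        }
    }
}

This extra back-off is also why the tests further down raise their wait_configs_equal timeouts to 50 s.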


@@ -870,36 +870,50 @@ std::vector<int64_t> KeeperServer::getDeadSessions()
return state_machine->getDeadSessions();
}
bool KeeperServer::applyConfigUpdate(const ClusterUpdateAction & action)
KeeperServer::ConfigUpdateState KeeperServer::applyConfigUpdate(
const ClusterUpdateAction & action, bool last_command_was_leader_change)
{
using enum ConfigUpdateState;
std::lock_guard _{server_write_mutex};
if (const auto * add = std::get_if<AddRaftServer>(&action))
{
if (raft_instance->get_srv_config(add->id) != nullptr)
return true;
return Accepted;
auto resp = raft_instance->add_srv(static_cast<nuraft::srv_config>(*add));
resp->get();
return resp->get_accepted();
return resp->get_accepted() ? Accepted : Declined;
}
else if (const auto * remove = std::get_if<RemoveRaftServer>(&action))
{
// This corner case is the most problematic. The issue is as follows: if we agree on a number
// of commands but don't commit them on the leader, and then issue a leadership change via
// yield/request, the leader can pause writes before all commits, so those commands will be lost
// (a leadership change is not synchronized with committing in NuRaft).
// However, waiting until some commands get _committed_ instead of merely _accepted_ is hard
// with the current library design and brings many layers of complexity
// (see the history of https://github.com/ClickHouse/ClickHouse/pull/53481). So, the compromise here
// is a timeout before issuing the leadership change, with the ability to raise it if the user knows
// they have a particularly slow network.
if (remove->id == raft_instance->get_leader())
{
if (!last_command_was_leader_change)
return WaitBeforeChangingLeader;
if (isLeader())
raft_instance->yield_leadership();
else
raft_instance->request_leadership();
return false;
return Declined;
}
if (raft_instance->get_srv_config(remove->id) == nullptr)
return true;
return Accepted;
auto resp = raft_instance->remove_srv(remove->id);
resp->get();
return resp->get_accepted();
return resp->get_accepted() ? Accepted : Declined;
}
else if (const auto * update = std::get_if<UpdateRaftServerPriority>(&action))
{
@@ -908,10 +922,10 @@ bool KeeperServer::applyConfigUpdate(const ClusterUpdateAction & action)
"Attempt to apply {} but server is not present in Raft",
action);
else if (ptr->get_priority() == update->priority)
return true;
return Accepted;
raft_instance->set_priority(update->id, update->priority, /*broadcast on live leader*/true);
return true;
return Accepted;
}
UNREACHABLE();
}
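
Taken together with the dispatcher loop, the RemoveRaftServer branch above implements a two-pass handshake: the first call for a leader removal only returns WaitBeforeChangingLeader, and the retried call, issued after the sleep, performs the actual yield_leadership()/request_leadership() and still returns Declined, so the removal itself succeeds on a later retry once leadership has moved. A stripped-down sketch of just that control flow, where RaftHandleSketch and its methods are hypothetical stand-ins rather than the real NuRaft API:

#include <iostream>

enum class ConfigUpdateState { Accepted, Declined, WaitBeforeChangingLeader };

// Hypothetical stand-in for the NuRaft raft_instance used by KeeperServer.
struct RaftHandleSketch
{
    int leader_id = 1;
    int my_id = 1;

    bool is_leader() const { return my_id == leader_id; }
    void yield_leadership() { std::cout << "leader yields leadership\n"; }
    void request_leadership() { std::cout << "follower requests leadership change\n"; }
};

// Mirrors only the RemoveRaftServer branch of the diff above.
static ConfigUpdateState removeServerSketch(RaftHandleSketch & raft, int id_to_remove, bool last_command_was_leader_change)
{
    if (id_to_remove == raft.leader_id)
    {
        // First pass: ask the dispatcher to back off for sleep_before_leader_change_ms,
        // so the leader can commit entries that were accepted but not yet committed.
        if (!last_command_was_leader_change)
            return ConfigUpdateState::WaitBeforeChangingLeader;

        // Second pass: actually move leadership away; the removal stays in the queue
        // and is accepted on a later retry, once this node is no longer the leader.
        if (raft.is_leader())
            raft.yield_leadership();
        else
            raft.request_leadership();
        return ConfigUpdateState::Declined;
    }
    return ConfigUpdateState::Accepted;
}

int main()
{
    RaftHandleSketch raft;

    auto first = removeServerSketch(raft, /*id_to_remove=*/1, /*last_command_was_leader_change=*/false);
    std::cout << "first call waits before changing leader: "
              << (first == ConfigUpdateState::WaitBeforeChangingLeader) << '\n';

    auto second = removeServerSketch(raft, 1, first == ConfigUpdateState::WaitBeforeChangingLeader);
    std::cout << "second call declined after initiating leadership change: "
              << (second == ConfigUpdateState::Declined) << '\n';
}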


@@ -128,7 +128,10 @@ public:
int getServerID() const { return server_id; }
bool applyConfigUpdate(const ClusterUpdateAction& action);
enum class ConfigUpdateState { Accepted, Declined, WaitBeforeChangingLeader };
ConfigUpdateState applyConfigUpdate(
const ClusterUpdateAction& action,
bool last_command_was_leader_change = false);
// TODO (myrrc) these functions should be removed once "reconfig" is stabilized
void applyConfigUpdateWithReconfigDisabled(const ClusterUpdateAction& action);


@@ -83,7 +83,8 @@ def test_reconfig_replace_leader(started_cluster):
assert "node3" in config
assert "node4" not in config
ku.wait_configs_equal(config, zk2)
# additional 20s wait before removing leader
ku.wait_configs_equal(config, zk2, timeout=50)
node4.start_clickhouse()
config = zk2.reconfig(joining="server.4=node4:9234", leaving=None, new_members=None)


@@ -0,0 +1,35 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<enable_reconfiguration>true</enable_reconfiguration>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>


@@ -0,0 +1,35 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<enable_reconfiguration>true</enable_reconfiguration>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>


@@ -0,0 +1,35 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<enable_reconfiguration>true</enable_reconfiguration>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>


@@ -0,0 +1,21 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>4</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<enable_reconfiguration>true</enable_reconfiguration>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server> <id>2</id> <hostname>node2</hostname> <port>9234</port> </server>
<server> <id>3</id> <hostname>node3</hostname> <port>9234</port> </server>
<server> <id>4</id> <hostname>node4</hostname> <port>9234</port> </server>
</raft_configuration>
</keeper_server>
</clickhouse>


@@ -0,0 +1,151 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
from os.path import join, dirname, realpath
import time
import helpers.keeper_utils as ku
import typing as tp
cluster = ClickHouseCluster(__file__)
CONFIG_DIR = join(dirname(realpath(__file__)), "configs")
node1 = cluster.add_instance("node1", main_configs=["configs/keeper1.xml"])
node2 = cluster.add_instance("node2", main_configs=["configs/keeper2.xml"])
node3 = cluster.add_instance("node3", main_configs=["configs/keeper3.xml"])
node4 = cluster.add_instance("node4", stay_alive=True)
zk1, zk2, zk3, zk4 = None, None, None, None
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
node4.stop_clickhouse()
node4.copy_file_to_container(
join(CONFIG_DIR, "keeper4.xml"),
"/etc/clickhouse-server/config.d/keeper.xml",
)
yield cluster
finally:
conn: tp.Optional[ku.KeeperClient]
for conn in [zk1, zk2, zk3, zk4]:
if conn:
conn.stop()
conn.close()
cluster.shutdown()
# can't use create_client as clickhouse-keeper-client's reconfig doesn't support
# joining and leaving in a single reconfig command, hence the duplication
# TODO myrrc this should be removed once keeper-client is updated
def get_fake_zk(node):
return ku.get_fake_zk(cluster, node)
def get_config_str(zk):
return ku.get_config_str(zk)[0].decode("utf-8")
def wait_configs_equal(
left_config: str, right_zk: ku.KeeperClient, timeout: float = 30.0
):
"""
Check whether left_config equals the result of 'get /keeper/config'
on the right_zk ZK connection.
"""
elapsed: float = 0.0
while sorted(left_config.split("\n")) != sorted(
get_config_str(right_zk).split("\n")
):
time.sleep(1)
elapsed += 1
if elapsed >= timeout:
raise Exception(
f"timeout while checking nodes configs to get equal. "
f"Left: {left_config}, right: {get_config_str(right_zk)}"
)
def test_reconfig_replace_leader_in_one_command(started_cluster):
"""
Remove leader from a cluster of 3 and add a new node to this cluster in a single command
"""
zk1 = get_fake_zk(node1)
config = get_config_str(zk1)
assert len(config.split("\n")) == 3
assert "node1" in config
assert "node2" in config
assert "node3" in config
assert "node4" not in config
for i in range(100):
zk1.create(f"/test_four_{i}", b"somedata")
zk2 = get_fake_zk(node2)
zk2.sync("/test_four_0")
wait_configs_equal(config, zk2)
zk3 = get_fake_zk(node3)
zk3.sync("/test_four_0")
wait_configs_equal(config, zk3)
for i in range(100):
assert zk2.exists(f"/test_four_{i}") is not None
assert zk3.exists(f"/test_four_{i}") is not None
assert ku.is_leader(cluster, node1)
node4.start_clickhouse()
config, _ = zk2.reconfig(
joining="server.4=node4:9234", leaving="1", new_members=None
)
config = config.decode("utf-8")
print("After removing 1 and adding 4", config)
assert len(config.split("\n")) == 3
assert "node1" not in config
assert "node2" in config
assert "node3" in config
assert "node4" in config
ku.wait_until_connected(cluster, node4)
time.sleep(1)
zk4 = get_fake_zk(node4)
zk4.sync("/test_four_0")
# we have an additional 20s timeout for removing leader
wait_configs_equal(config, zk4, timeout=50)
for i in range(100):
assert zk4.exists(f"test_four_{i}") is not None
zk4.create(f"/test_four_{100 + i}", b"somedata")
with pytest.raises(Exception):
zk1.stop()
zk1.close()
zk1 = get_fake_zk(node1)
zk1.sync("/test_four_0")
zk2.stop()
zk2.close()
zk2 = get_fake_zk(node2)
zk2.sync("/test_four_0")
wait_configs_equal(config, zk2)
zk3.stop()
zk3.close()
zk3 = get_fake_zk(node3)
zk3.sync("/test_four_0")
wait_configs_equal(config, zk3)
for i in range(200):
assert zk2.exists(f"test_four_{i}") is not None
assert zk3.exists(f"test_four_{i}") is not None