mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
improvements in lost replica recovery
This commit is contained in:
parent
ce3d18e8c5
commit
ea16370024
@ -58,7 +58,7 @@ struct Settings;
|
||||
/** Replication settings. */ \
|
||||
M(UInt64, replicated_deduplication_window, 100, "How many last blocks of hashes should be kept in ZooKeeper (old blocks will be deleted).", 0) \
|
||||
M(UInt64, replicated_deduplication_window_seconds, 7 * 24 * 60 * 60 /* one week */, "Similar to \"replicated_deduplication_window\", but determines old blocks by their lifetime. Hash of an inserted block will be deleted (and the block will not be deduplicated after) if it outside of one \"window\". You can set very big replicated_deduplication_window to avoid duplicating INSERTs during that period of time.", 0) \
|
||||
M(UInt64, max_replicated_logs_to_keep, 100, "How many records may be in log, if there is inactive replica.", 0) \
|
||||
M(UInt64, max_replicated_logs_to_keep, 1000, "How many records may be in log, if there is inactive replica. Inactive replica becomes lost when when this number exceed.", 0) \
|
||||
M(UInt64, min_replicated_logs_to_keep, 10, "Keep about this number of last records in ZooKeeper log, even if they are obsolete. It doesn't affect work of tables: used only to diagnose ZooKeeper log before cleaning.", 0) \
|
||||
M(Seconds, prefer_fetch_merged_part_time_threshold, 3600, "If time passed after replication log entry creation exceeds this threshold and sum size of parts is greater than \"prefer_fetch_merged_part_size_threshold\", prefer fetching merged part from replica instead of doing merge locally. To speed up very long merges.", 0) \
|
||||
M(UInt64, prefer_fetch_merged_part_size_threshold, 10ULL * 1024 * 1024 * 1024, "If sum size of parts exceeds this threshold and time passed after replication log entry creation is greater than \"prefer_fetch_merged_part_time_threshold\", prefer fetching merged part from replica instead of doing merge locally. To speed up very long merges.", 0) \
|
||||
@ -75,6 +75,7 @@ struct Settings;
|
||||
M(UInt64, replicated_max_parallel_sends_for_table, 0, "Limit parallel sends for one table.", 0) \
|
||||
M(Bool, replicated_can_become_leader, true, "If true, Replicated tables replicas on this node will try to acquire leadership.", 0) \
|
||||
M(Seconds, zookeeper_session_expiration_check_period, 60, "ZooKeeper session expiration check period, in seconds.", 0) \
|
||||
M(Bool, detach_old_local_parts_when_cloning_replica, 1, "Do not remove old local parts when repairing lost replica.", 0) \
|
||||
\
|
||||
/** Check delay of replicas settings. */ \
|
||||
M(UInt64, min_relative_delay_to_measure, 120, "Calculate relative replica delay only if absolute delay is not less that this value.", 0) \
|
||||
|
@ -274,6 +274,7 @@ void ReplicatedMergeTreeCleanupThread::markLostReplicas(const std::unordered_map
|
||||
for (const auto & pair : log_pointers_candidate_lost_replicas)
|
||||
{
|
||||
String replica = pair.first;
|
||||
LOG_WARNING(log, "Will mark replica {} as lost, because it has stale log pointer: {}", replica, pair.second);
|
||||
Coordination::Requests ops;
|
||||
/// If host changed version we can not mark replicas, because replica started to be active.
|
||||
ops.emplace_back(zkutil::makeCheckRequest(
|
||||
|
@ -2182,8 +2182,6 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
|
||||
|
||||
void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coordination::Stat source_is_lost_stat, zkutil::ZooKeeperPtr & zookeeper)
|
||||
{
|
||||
LOG_INFO(log, "Will mimic {}", source_replica);
|
||||
|
||||
String source_path = zookeeper_path + "/replicas/" + source_replica;
|
||||
|
||||
/** TODO: it will be deleted! (It is only to support old version of CH server).
|
||||
@ -2309,6 +2307,17 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
|
||||
LOG_WARNING(log, "Source replica does not have part {}. Removing it from working set.", part->name);
|
||||
}
|
||||
}
|
||||
|
||||
if (getSettings()->detach_old_local_parts_when_cloning_replica)
|
||||
{
|
||||
auto metadata_snapshot = getInMemoryMetadataPtr();
|
||||
for (const auto & part : parts_to_remove_from_working_set)
|
||||
{
|
||||
LOG_INFO(log, "Detaching {}", part->relative_path);
|
||||
part->makeCloneInDetached("clone", metadata_snapshot);
|
||||
}
|
||||
}
|
||||
|
||||
removePartsFromWorkingSet(parts_to_remove_from_working_set, true);
|
||||
|
||||
for (const String & name : active_parts)
|
||||
@ -2336,47 +2345,102 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
|
||||
|
||||
void StorageReplicatedMergeTree::cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zookeeper)
|
||||
{
|
||||
Coordination::Stat is_lost_stat;
|
||||
bool is_new_replica = true;
|
||||
String res;
|
||||
if (zookeeper->tryGet(replica_path + "/is_lost", res))
|
||||
if (zookeeper->tryGet(replica_path + "/is_lost", res, &is_lost_stat))
|
||||
{
|
||||
if (res == "0")
|
||||
return;
|
||||
if (is_lost_stat.version)
|
||||
is_new_replica = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Replica was created by old version of CH, so me must create "/is_lost".
|
||||
/// Note that in old version of CH there was no "lost" replicas possible.
|
||||
/// TODO is_lost node should always exist since v18.12, maybe we can replace `tryGet` with `get` and remove old code?
|
||||
zookeeper->create(replica_path + "/is_lost", "0", zkutil::CreateMode::Persistent);
|
||||
return;
|
||||
}
|
||||
|
||||
/// is_lost is "1": it means that we are in repair mode.
|
||||
|
||||
String source_replica;
|
||||
Coordination::Stat source_is_lost_stat;
|
||||
source_is_lost_stat.version = -1;
|
||||
|
||||
for (const String & source_replica_name : zookeeper->getChildren(zookeeper_path + "/replicas"))
|
||||
/// Try choose source replica to clone.
|
||||
/// Source replica must not be lost and should have minimal queue size and maximal log pointer.
|
||||
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
|
||||
std::vector<zkutil::ZooKeeper::FutureGet> futures;
|
||||
for (const String & source_replica_name : replicas)
|
||||
{
|
||||
/// Do not clone from myself.
|
||||
if (source_replica_name == replica_name)
|
||||
continue;
|
||||
|
||||
String source_replica_path = zookeeper_path + "/replicas/" + source_replica_name;
|
||||
|
||||
/// Do not clone from myself.
|
||||
if (source_replica_path != replica_path)
|
||||
/// Obviously the following get operations are not atomic, but it's ok to choose good enough replica, not the best one.
|
||||
/// NOTE: We may count some entries twice if log_pointer is moved.
|
||||
futures.emplace_back(zookeeper->asyncTryGet(source_replica_path + "/is_lost"));
|
||||
futures.emplace_back(zookeeper->asyncTryGet(source_replica_path + "/log_pointer"));
|
||||
futures.emplace_back(zookeeper->asyncTryGet(source_replica_path + "/queue"));
|
||||
}
|
||||
|
||||
Strings log_entries = zookeeper->getChildren(zookeeper_path + "/log");
|
||||
size_t max_log_entry = 0;
|
||||
if (!log_entries.empty())
|
||||
{
|
||||
String last_entry_num = std::max_element(log_entries.begin(), log_entries.end())->substr(4);
|
||||
max_log_entry = std::stol(last_entry_num);
|
||||
}
|
||||
++max_log_entry;
|
||||
|
||||
size_t min_replication_lag = std::numeric_limits<size_t>::max();
|
||||
String source_replica;
|
||||
Coordination::Stat source_is_lost_stat;
|
||||
size_t future_num = 0;
|
||||
for (const String & source_replica_name : replicas)
|
||||
{
|
||||
if (source_replica_name == replica_name)
|
||||
continue;
|
||||
|
||||
auto get_is_lost = futures[future_num++].get();
|
||||
auto get_log_pointer = futures[future_num++].get();
|
||||
auto get_queue = futures[future_num++].get();
|
||||
|
||||
if (get_is_lost.error == Coordination::Error::ZNONODE)
|
||||
{
|
||||
/// Do not clone from lost replicas.
|
||||
String source_replica_is_lost_value;
|
||||
if (!zookeeper->tryGet(source_replica_path + "/is_lost", source_replica_is_lost_value, &source_is_lost_stat)
|
||||
|| source_replica_is_lost_value == "0")
|
||||
{
|
||||
source_replica = source_replica_name;
|
||||
break;
|
||||
}
|
||||
/// For compatibility with older ClickHouse versions
|
||||
get_is_lost.stat.version = -1;
|
||||
}
|
||||
else if (get_is_lost.data != "0")
|
||||
continue;
|
||||
if (get_log_pointer.error != Coordination::Error::ZOK)
|
||||
continue;
|
||||
if (get_queue.error != Coordination::Error::ZOK)
|
||||
continue;
|
||||
|
||||
/// Replica is not lost and we can clone it. Let's calculate approx replication lag.
|
||||
size_t source_log_pointer = get_log_pointer.data.empty() ? 0 : parse<UInt64>(get_log_pointer.data);
|
||||
assert(source_log_pointer <= max_log_entry);
|
||||
size_t replica_queue_lag = max_log_entry - source_log_pointer;
|
||||
size_t replica_queue_size = get_queue.stat.numChildren;
|
||||
size_t replication_lag = replica_queue_lag + replica_queue_size;
|
||||
LOG_INFO(log, "Replica {} has approximate {} queue lag and {} queue size", source_replica_name, replica_queue_lag, replica_queue_size);
|
||||
if (replication_lag < min_replication_lag)
|
||||
{
|
||||
source_replica = source_replica_name;
|
||||
source_is_lost_stat = get_is_lost.stat;
|
||||
min_replication_lag = replication_lag;
|
||||
}
|
||||
}
|
||||
|
||||
if (source_replica.empty())
|
||||
throw Exception("All replicas are lost", ErrorCodes::ALL_REPLICAS_LOST);
|
||||
|
||||
if (is_new_replica)
|
||||
LOG_INFO(log, "Will mimic {}", source_replica);
|
||||
else
|
||||
LOG_WARNING(log, "Will mimic {}", source_replica);
|
||||
|
||||
/// Clear obsolete queue that we no longer need.
|
||||
zookeeper->removeChildren(replica_path + "/queue");
|
||||
|
||||
|
@ -1,19 +0,0 @@
|
||||
<yandex>
|
||||
<remote_servers>
|
||||
<test_cluster>
|
||||
<shard>
|
||||
<internal_replication>true</internal_replication>
|
||||
<replica>
|
||||
<default_database>shard_0</default_database>
|
||||
<host>node1</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
<replica>
|
||||
<default_database>shard_0</default_database>
|
||||
<host>node2</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
</test_cluster>
|
||||
</remote_servers>
|
||||
</yandex>
|
@ -9,16 +9,15 @@ def fill_nodes(nodes, shard):
|
||||
for node in nodes:
|
||||
node.query(
|
||||
'''
|
||||
CREATE DATABASE test;
|
||||
|
||||
CREATE TABLE test_table(date Date, id UInt32)
|
||||
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test{shard}/replicated', '{replica}') ORDER BY id PARTITION BY toYYYYMM(date) SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0;
|
||||
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/replicated', '{replica}') ORDER BY id PARTITION BY toYYYYMM(date) SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0;
|
||||
'''.format(shard=shard, replica=node.name))
|
||||
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
|
||||
node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
|
||||
node1 = cluster.add_instance('node1', with_zookeeper=True)
|
||||
node2 = cluster.add_instance('node2', with_zookeeper=True)
|
||||
node3 = cluster.add_instance('node3', with_zookeeper=True)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
@ -26,7 +25,7 @@ def start_cluster():
|
||||
try:
|
||||
cluster.start()
|
||||
|
||||
fill_nodes([node1, node2], 1)
|
||||
fill_nodes([node1, node2, node3], 1)
|
||||
|
||||
yield cluster
|
||||
|
||||
@ -49,3 +48,29 @@ def test_recovery(start_cluster):
|
||||
check_callback=lambda x: len(node2.query("select * from test_table")) > 0)
|
||||
|
||||
assert_eq_with_retry(node2, "SELECT count(*) FROM test_table", node1.query("SELECT count(*) FROM test_table"))
|
||||
lost_marker = "Will mark replica node2 as lost"
|
||||
assert node1.contains_in_log(lost_marker) or node3.contains_in_log(lost_marker)
|
||||
|
||||
def test_choose_source_replica(start_cluster):
|
||||
node3.query("INSERT INTO test_table VALUES (2, 1)")
|
||||
time.sleep(1)
|
||||
node2.query("DETACH TABLE test_table")
|
||||
node1.query("SYSTEM STOP FETCHES test_table") # node1 will have many entries in queue, so node2 will clone node3
|
||||
|
||||
for i in range(100):
|
||||
node3.query("INSERT INTO test_table VALUES (2, {})".format(i))
|
||||
|
||||
node2.query_with_retry("ATTACH TABLE test_table",
|
||||
check_callback=lambda x: len(node2.query("select * from test_table")) > 0)
|
||||
|
||||
node1.query("SYSTEM START FETCHES test_table")
|
||||
node1.query("SYSTEM SYNC REPLICA test_table")
|
||||
node2.query("SYSTEM SYNC REPLICA test_table")
|
||||
|
||||
assert node1.query("SELECT count(*) FROM test_table") == node3.query("SELECT count(*) FROM test_table")
|
||||
assert node2.query("SELECT count(*) FROM test_table") == node3.query("SELECT count(*) FROM test_table")
|
||||
|
||||
lost_marker = "Will mark replica node2 as lost"
|
||||
assert node1.contains_in_log(lost_marker) or node3.contains_in_log(lost_marker)
|
||||
assert node2.contains_in_log("Will mimic node3")
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user