mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-29 13:10:48 +00:00
CLICKHOUSE-3847 fix bug when replica recover and it's log_pointer will be deleted
This commit is contained in:
parent
e7d6bc3dbc
commit
cf0a7c1529
@ -82,6 +82,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
|
||||
|
||||
Strings replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas", &stat);
|
||||
UInt64 min_saved_log_pointer = std::numeric_limits<UInt64>::max();
|
||||
UInt64 min_inactive_log_pointer = std::numeric_limits<UInt64>::max();
|
||||
|
||||
Strings entries = zookeeper->getChildren(storage.zookeeper_path + "/log");
|
||||
|
||||
@ -94,6 +95,8 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
|
||||
? entries.size() - storage.data.settings.max_replicated_logs_to_keep.value
|
||||
: 0];
|
||||
|
||||
std::unordered_set<String> recovering_replicas;
|
||||
|
||||
std::unordered_map<String, UInt32> host_versions_inactive_replicas;
|
||||
std::unordered_map<String, String> log_pointers_lost_replicas;
|
||||
|
||||
@ -111,13 +114,25 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
|
||||
|
||||
/// Check status of replica (active or not).
|
||||
/// If replica was not active, we could check when it's log_pointer locates.
|
||||
|
||||
String res;
|
||||
|
||||
bool new_version_of_replica = zookeeper->tryGet(storage.zookeeper_path + "/replicas/" + replica + "/is_lost", res);
|
||||
|
||||
if (zookeeper->exists(storage.zookeeper_path + "/replicas/" + replica + "/is_active"))
|
||||
min_saved_log_pointer = std::min(min_saved_log_pointer, log_pointer);
|
||||
if (new_version_of_replica && res == "1")
|
||||
recovering_replicas.insert(replica);
|
||||
else
|
||||
min_saved_log_pointer = std::min(min_saved_log_pointer, log_pointer);
|
||||
else
|
||||
{
|
||||
String res;
|
||||
if (!zookeeper->tryGet(storage.zookeeper_path + "/replicas/" + replica + "/is_lost", res))
|
||||
if (!new_version_of_replica)
|
||||
{
|
||||
/// Only to support old versions CH.
|
||||
/// If replica did not have "/is_lost" we must save it's log_pointer.
|
||||
/// Because old version CH can not work with recovering.
|
||||
min_saved_log_pointer = std::min(min_saved_log_pointer, log_pointer);
|
||||
}
|
||||
else
|
||||
if (res == "0")
|
||||
{
|
||||
@ -128,15 +143,27 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
|
||||
{
|
||||
host_versions_inactive_replicas[replica] = host_stat.version;
|
||||
log_pointers_lost_replicas[replica] = log_pointer_str;
|
||||
min_inactive_log_pointer = std::min(min_saved_log_pointer, log_pointer);
|
||||
}
|
||||
}
|
||||
/// Only to support old versions CH.
|
||||
else
|
||||
++replicas_were_marked_is_lost;
|
||||
host_versions_inactive_replicas[replica] = host_stat.version;
|
||||
}
|
||||
}
|
||||
|
||||
/// We must check log_pointer recovering replicas at the end.
|
||||
/// Because log pointer recovering replicas can move backward.
|
||||
for (const String & replica : recovering_replicas)
|
||||
{
|
||||
String pointer = zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/log_pointer");
|
||||
UInt32 log_pointer = parse<UInt64>(pointer);
|
||||
min_saved_log_pointer = std::min(min_saved_log_pointer, log_pointer);
|
||||
}
|
||||
|
||||
if (recovering_replicas.size() != 0)
|
||||
min_saved_log_pointer = std::min(min_saved_log_pointer, min_inactive_log_pointer);
|
||||
|
||||
/// We will not touch the last `min_replicated_logs_to_keep` records.
|
||||
entries.erase(entries.end() - std::min(entries.size(), storage.data.settings.min_replicated_logs_to_keep.value), entries.end());
|
||||
/// We will not touch records that are no less than `min_pointer_active_replica`.
|
||||
@ -154,9 +181,9 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
|
||||
|
||||
if (ops.size() > 4 * zkutil::MULTI_BATCH_SIZE || i + 1 == entries.size())
|
||||
{
|
||||
/// we need to check this because the replica that was restored from one of the marked replicas does not copy a non-valid log_pointer.
|
||||
for (auto host_version: host_versions_inactive_replicas)
|
||||
ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas" + host_version.first + "/host", host_version.second));
|
||||
/// we need to check this because the replica that was restored from one of the marked replicas does not copy a non-valid log_pointer.
|
||||
for (auto host_version: host_versions_inactive_replicas)
|
||||
ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas" + host_version.first + "/host", host_version.second));
|
||||
|
||||
/// Simultaneously with clearing the log, we check to see if replica was added since we received replicas list.
|
||||
ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas", stat.version));
|
||||
|
@ -1999,7 +1999,7 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, zku
|
||||
if (error == ZooKeeperImpl::ZooKeeper::ZBADVERSION)
|
||||
throw Exception("Can not clone replica, because a " + source_path + " became lost", ErrorCodes::REPLICA_STATUS_CHANGED);
|
||||
else if (error == ZooKeeperImpl::ZooKeeper::ZNODEEXISTS)
|
||||
throw Exception("Can not clone replica, because the clickHouse server updated to new version", ErrorCodes::REPLICA_STATUS_CHANGED);
|
||||
throw Exception("Can not clone replica, because the ClickHouse server updated to new version", ErrorCodes::REPLICA_STATUS_CHANGED);
|
||||
else
|
||||
zkutil::KeeperMultiException::check(error, ops, resp);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user