CLICKHOUSE-3847 fix race in set log_pointer in cloneReplica()

This commit is contained in:
VadimPE 2018-08-08 18:25:07 +03:00
parent 3b9002dfc9
commit c16d876867

View File

@ -1970,26 +1970,19 @@ bool StorageReplicatedMergeTree::cloneReplica(const String & source_replica, zku
/// The order of the following three actions is important. Entries in the log can be duplicated, but they can not be lost.
/// Copy reference to the log from `reference/master` replica.
zkutil::Requests rec;
/// We must check is_active and set log_pointer atomically in order to cleanupThread can not clear log with our log_pointer.
rec.push_back(zkutil::makeCheckRequest(source_path + "/is_active", 0));
rec.push_back(zkutil::makeSetRequest(replica_path + "/log_pointer", zookeeper->get(source_path + "/log_pointer"), -1));
zookeeper->set(replica_path + "/log_pointer", zookeeper->get(source_path + "/log_pointer"), -1);
try
{
zookeeper->multi(rec);
}
catch (const zkutil::KeeperException & e)
{
if (e.code == ZooKeeperImpl::ZooKeeper::ZBADVERSION)
return false;
else
throw e;
}
String raw_log_pointer = zookeeper->get(replica_path + "/log_pointer");
zookeeper->set(replica_path + "/log_pointer", zookeeper->get(source_path + "/log_pointer"));
Strings entries = zookeeper->getChildren(zookeeper_path + "/log");
if (!entries.empty())
return false;
std::sort(entries.begin(), entries.end());
if ("log-" + padIndex(parse<UInt64>(raw_log_pointer)) < entries[0])
return false;
/// Let's remember the queue of the reference/master replica.
Strings source_queue_names = zookeeper->getChildren(source_path + "/queue");
@ -2058,7 +2051,8 @@ void StorageReplicatedMergeTree::cloneReplicaIfNeeded()
for (const String & replica_name : zookeeper->getChildren(zookeeper_path + "/replicas"))
{
String source_replica_path = zookeeper_path + "/replicas/" + replica_name;
if (source_replica_path != replica_path && zookeeper->exists(source_replica_path + "/is_active") && !(zookeeper->get(source_replica_path + "/log_pointer").empty()))
String source_log_pointer_raw = zookeeper->get(source_replica_path + "/log_pointer");
if ((source_replica_path != replica_path) && (!source_log_pointer_raw.empty()) && ("log-" + padIndex(parse<UInt64>(source_log_pointer_raw) >= entries[0]))
source_replica = replica_name;
}
}