CLICKHOUSE-3847 change createReplica() and change the check in cloneReplica()

This commit is contained in:
VadimPE 2018-08-22 17:01:54 +03:00
parent 297be5c303
commit d81d4dfc70
6 changed files with 102 additions and 218 deletions

View File

@ -12,6 +12,8 @@ namespace DB
namespace ErrorCodes
{
extern const int NOT_FOUND_NODE;
extern const int ALL_REPLICAS_LOST;
extern const int REPLICA_IS_ACTIVE;
}
@ -65,25 +67,6 @@ void ReplicatedMergeTreeCleanupThread::iterate()
/// Picks the log-cleaning routine to run.
/// If at least one replica listed under `<zookeeper_path>/replicas` has no `is_lost` node,
/// oldCleaner() is run; newCleaner() is run only when every replica has that node.
/// NOTE(review): presumably replicas without `is_lost` were created by an older server version - confirm.
void ReplicatedMergeTreeCleanupThread::clearOldLogs()
{
    auto zookeeper = storage.getZooKeeper();

    /// Build the common prefix once instead of on every iteration.
    const auto replicas_prefix = storage.zookeeper_path + "/replicas/";
    const auto replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas");

    /// Replica names are only read here, so iterate by const reference and avoid copying each string.
    for (const auto & replica : replicas)
    {
        if (!zookeeper->exists(replicas_prefix + replica + "/is_lost"))
        {
            oldCleaner();
            return;
        }
    }

    newCleaner();
}
void ReplicatedMergeTreeCleanupThread::newCleaner()
{
auto zookeeper = storage.getZooKeeper();
@ -114,7 +97,6 @@ void ReplicatedMergeTreeCleanupThread::newCleaner()
std::unordered_map<String, UInt32> hosts_version;
std::unordered_map<String, String> log_pointers_lost_replicas;
std::unordered_map<String, UInt32> log_pointers_version;
for (const String & replica : replicas)
{
@ -130,21 +112,23 @@ void ReplicatedMergeTreeCleanupThread::newCleaner()
/// Check status of replica (active or not).
/// If the replica is not active, we check where its log_pointer is located.
/// If the replica is active but is running cloneReplica(), we should check its log_pointer in the log.
if (zookeeper->exists(storage.zookeeper_path + "/replicas/" + replica + "/is_active") && log_pointer_str >= entries[0])
if (zookeeper->exists(storage.zookeeper_path + "/replicas/" + replica + "/is_active") || !zookeeper->exists(storage.zookeeper_path + "/replicas/" + replica + "/is_lost"))
min_pointer_active_replica = std::min(min_pointer_active_replica, log_pointer);
else
{
hosts_version[replica] = host_stat.version;
log_pointers_lost_replicas[replica] = log_pointer_str;
log_pointers_version[replica] = log_pointer_stat.version;
if (log_pointer_str >= min_saved_record_log_str)
/// We can not mark lost replicas.
if (zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/is_lost") == "0")
{
if (min_pointer_inactive_replica_str != "" && min_pointer_inactive_replica_str >= log_pointer_str)
min_pointer_inactive_replica_str = log_pointer_str;
else if (min_pointer_inactive_replica_str == "")
min_pointer_inactive_replica_str = log_pointer_str;
hosts_version[replica] = host_stat.version;
log_pointers_lost_replicas[replica] = log_pointer_str;
if (log_pointer_str >= min_saved_record_log_str)
{
if (min_pointer_inactive_replica_str != "" && min_pointer_inactive_replica_str >= log_pointer_str)
min_pointer_inactive_replica_str = log_pointer_str;
else if (min_pointer_inactive_replica_str == "")
min_pointer_inactive_replica_str = log_pointer_str;
}
}
}
}
@ -163,12 +147,7 @@ void ReplicatedMergeTreeCleanupThread::newCleaner()
if (entries.empty())
return;
/// We must mark lost replicas.
if (!markLostReplicas(hosts_version, log_pointers_lost_replicas, entries.back(), zookeeper))
{
LOG_DEBUG(log, "Can not mark lost replicas");
return;
}
markLostReplicas(hosts_version, log_pointers_lost_replicas, entries.back(), zookeeper);
zkutil::Requests ops;
for (size_t i = 0; i < entries.size(); ++i)
@ -188,63 +167,11 @@ void ReplicatedMergeTreeCleanupThread::newCleaner()
}
void ReplicatedMergeTreeCleanupThread::oldCleaner()
{
auto zookeeper = storage.getZooKeeper();
zkutil::Stat stat;
if (!zookeeper->exists(storage.zookeeper_path + "/log", &stat))
throw Exception(storage.zookeeper_path + "/log doesn't exist", ErrorCodes::NOT_FOUND_NODE);
int children_count = stat.numChildren;
/// We will wait for 1.1 times more records to accumulate than necessary.
if (static_cast<double>(children_count) < storage.data.settings.max_replicated_logs_to_keep * 1.1)
return;
Strings replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas", &stat);
UInt64 min_pointer = std::numeric_limits<UInt64>::max();
for (const String & replica : replicas)
{
String pointer = zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/log_pointer");
if (pointer.empty())
return;
min_pointer = std::min(min_pointer, parse<UInt64>(pointer));
}
Strings entries = zookeeper->getChildren(storage.zookeeper_path + "/log");
std::sort(entries.begin(), entries.end());
/// We will not touch the last `max_replicated_logs_to_keep` records.
entries.erase(entries.end() - std::min(entries.size(), storage.data.settings.max_replicated_logs_to_keep.value), entries.end());
/// We will not touch records that are no less than `min_pointer`.
entries.erase(std::lower_bound(entries.begin(), entries.end(), "log-" + padIndex(min_pointer)), entries.end());
if (entries.empty())
return;
zkutil::Requests ops;
for (size_t i = 0; i < entries.size(); ++i)
{
ops.emplace_back(zkutil::makeRemoveRequest(storage.zookeeper_path + "/log/" + entries[i], -1));
if (ops.size() > 4 * zkutil::MULTI_BATCH_SIZE || i + 1 == entries.size())
{
/// Simultaneously with clearing the log, we check to see if replica was added since we received replicas list.
ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas", stat.version));
zookeeper->multi(ops);
ops.clear();
}
}
LOG_DEBUG(log, "Removed " << entries.size() << " old log entries: " << entries.front() << " - " << entries.back());
}
bool ReplicatedMergeTreeCleanupThread::markLostReplicas(const std::unordered_map<String, UInt32> & hosts_version,
void ReplicatedMergeTreeCleanupThread::markLostReplicas(const std::unordered_map<String, UInt32> & hosts_version,
const std::unordered_map<String, String> & log_pointers_lost_replicas,
const String & remove_border, const zkutil::ZooKeeperPtr & zookeeper)
{
std::vector<zkutil::Requests> requests;
std::vector<zkutil::ZooKeeper::FutureMulti> futures;
for (auto pair : log_pointers_lost_replicas)
@ -253,26 +180,27 @@ bool ReplicatedMergeTreeCleanupThread::markLostReplicas(const std::unordered_map
if (pair.second <= remove_border)
{
zkutil::Requests ops;
/// If log pointer and host change version we can not mark replicas, so we check it.
/// If the host node changed version we cannot mark the replica as lost, because the replica has started to become active.
ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas/" + replica + "/host", hosts_version.at(replica)));
ops.emplace_back(zkutil::makeSetRequest(storage.zookeeper_path + "/replicas/" + replica + "/is_lost", "1", -1));
futures.push_back(zookeeper->tryAsyncMulti(ops));
requests.push_back(ops);
}
}
/// We must return if all replicas will be lost.
if (futures.size() == (zookeeper->getChildren(storage.zookeeper_path + "/replicas")).size())
{
return false;
}
if (requests.size() == (zookeeper->getChildren(storage.zookeeper_path + "/replicas")).size())
throw Exception("All replicas are lost", ErrorCodes::ALL_REPLICAS_LOST);
for (auto & req : requests)
futures.push_back(zookeeper->tryAsyncMulti(req));
for (auto & future : futures)
{
auto res = future.get();
if (res.error == ZooKeeperImpl::ZooKeeper::ZBADVERSION)
return false;
throw Exception("One of the replicas became active, when we clear log", ErrorCodes::REPLICA_IS_ACTIVE);
else if (res.error != ZooKeeperImpl::ZooKeeper::ZOK)
throw;
}
return true;
}

View File

@ -45,12 +45,9 @@ private:
/// Remove old records from ZooKeeper.
void clearOldLogs();
void newCleaner();
void oldCleaner();
/// Mark lost replicas.
bool markLostReplicas(const std::unordered_map<String, UInt32> & hosts_version,
void markLostReplicas(const std::unordered_map<String, UInt32> & hosts_version,
const std::unordered_map<String, String> & log_pointers_lost_replicas,
const String & remove_border, const zkutil::ZooKeeperPtr & zookeeper);

View File

@ -53,8 +53,11 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
log_pointer = log_pointer_str.empty() ? 0 : parse<UInt64>(log_pointer_str);
std::unordered_set<String> already_loaded_paths;
for (const LogEntryPtr & log_entry : queue)
already_loaded_paths.insert(log_entry->znode_name);
{
std::lock_guard lock(state_mutex);
for (const LogEntryPtr & log_entry : queue)
already_loaded_paths.insert(log_entry->znode_name);
}
Strings children = zookeeper->getChildren(queue_path);

View File

@ -198,11 +198,11 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
{
removeFailedQuorumParts();
activateReplica();
storage.cloneReplicaIfNeeded();
const auto & zookeeper = storage.getZooKeeper();
storage.cloneReplicaIfNeeded(zookeeper);
storage.queue.load(zookeeper);
/// pullLogsToQueue() after we mark replica 'is_active' and clone();

View File

@ -108,6 +108,7 @@ namespace ErrorCodes
extern const int KEEPER_EXCEPTION;
extern const int ALL_REPLICAS_LOST;
extern const int CAN_NOT_CLONE_REPLICA;
extern const int SOURCE_REPLICA_IS_LOST;
}
namespace ActionLocks
@ -591,39 +592,35 @@ void StorageReplicatedMergeTree::createReplica()
auto zookeeper = getZooKeeper();
LOG_DEBUG(log, "Creating replica " << replica_path);
/// Create an empty replica. We'll create `columns` node at the end - we'll use it as a sign that replica creation is complete.
zkutil::Requests ops;
ops.emplace_back(zkutil::makeCreateRequest(replica_path, "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/host", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/log_pointer", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/queue", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/parts", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/flags", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/is_lost", "1", zkutil::CreateMode::Persistent));
try
{
zookeeper->multi(ops);
}
catch (const zkutil::KeeperException & e)
{
if (e.code == ZooKeeperImpl::ZooKeeper::ZNODEEXISTS)
throw Exception("Replica " + replica_path + " already exists.", ErrorCodes::REPLICA_IS_ALREADY_EXIST);
throw;
}
/// If replica is first, is lost will be "0".
if ((zookeeper->getChildren(zookeeper_path + "/replicas")).size() == 1)
zookeeper->set(replica_path + "/is_lost", "0", zkutil::CreateMode::Persistent);
int32_t code;
do
{
zkutil::Stat replicas_stat;
String last_added_replica = zookeeper->get(zookeeper_path + "/replicas", &replicas_stat);
String is_lost_value = last_added_replica == "" ? "0" : "1";
/// Create an empty replica. We'll create `columns` node at the end - we'll use it as a sign that replica creation is complete.
zkutil::Requests ops;
zkutil::Responses resps;
ops.emplace_back(zkutil::makeCreateRequest(replica_path, "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/host", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/log_pointer", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/queue", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/parts", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/flags", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/is_lost", is_lost_value, zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/columns", getColumns().toString(), zkutil::CreateMode::Persistent));
/// Check version of /replicas to see if there are any replicas.
ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/replicas", replicas_stat.version));
ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/replicas", "last added replica: " + replica_name, -1));
/** You need to change the data of nodes/replicas to anything, so that the thread that removes old entries in the log,
* stumbled over this change and does not delete the entries we have not yet read.
*/
zookeeper->set(zookeeper_path + "/replicas", "last added replica: " + replica_name);
zookeeper->create(replica_path + "/columns", getColumns().toString(), zkutil::CreateMode::Persistent);
code = zookeeper->tryMulti(ops, resps);
if (code == ZooKeeperImpl::ZooKeeper::ZNODEEXISTS)
throw Exception("Replica " + replica_path + " already exists.", ErrorCodes::REPLICA_IS_ALREADY_EXIST);
} while (code != ZooKeeperImpl::ZooKeeper::ZOK);
}
@ -1952,27 +1949,19 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
}
bool StorageReplicatedMergeTree::cloneReplica(const String & source_replica, zkutil::ZooKeeperPtr & zookeeper)
void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, zkutil::Stat is_lost_stat, zkutil::ZooKeeperPtr & zookeeper)
{
LOG_INFO(log, "Will mimic " << source_replica);
String source_path = zookeeper_path + "/replicas/" + source_replica;
zkutil::Stat is_lost_stat;
/// If the replica gets the is_lost, it'll help us check.
is_lost_stat.version = -2;
String res;
if (zookeeper->tryGet(source_path + "/is_lost", res, &is_lost_stat))
if (res == "1")
return false;
/** If the reference/master replica is not yet fully created, let's wait.
* NOTE: If something went wrong while creating it, we can hang around forever.
* You can create an ephemeral node at the time of creation to make sure that the replica is created, and not abandoned.
* The same can be done for the table. You can automatically delete a replica/table node,
* if you see that it was not created up to the end, and the one who created it died.
*/
/** This check will be removed later (it is only needed for old versions of CH server).
* If the reference/master replica is not yet fully created, let's wait.
* NOTE: If something went wrong while creating it, we can hang around forever.
* You can create an ephemeral node at the time of creation to make sure that the replica is created, and not abandoned.
* The same can be done for the table. You can automatically delete a replica/table node,
* if you see that it was not created up to the end, and the one who created it died.
*/
while (!zookeeper->exists(source_path + "/columns"))
{
LOG_INFO(log, "Waiting for replica " << source_path << " to be fully created");
@ -1989,28 +1978,29 @@ bool StorageReplicatedMergeTree::cloneReplica(const String & source_replica, zku
/// The order of the following three actions is important. Entries in the log can be duplicated, but they can not be lost.
/// We must set log_pointer atomically in order to cleanupThread can not clear log with our log_pointer.
String raw_log_pointer = zookeeper->get(source_path + "/log_pointer");
zookeeper->set(replica_path + "/log_pointer", raw_log_pointer, -1);
if (!checkStat(zookeeper, source_path, is_lost_stat))
return false;
/// Check that log_pointer in entries.
Strings entries = zookeeper->getChildren(zookeeper_path + "/log");
if (entries.empty())
return true;
zkutil::Requests ops;
ops.push_back(zkutil::makeSetRequest(replica_path + "/log_pointer", raw_log_pointer, -1));
auto min_record = std::min_element(entries.begin(), entries.end());
/// If log_pointer out of log, we must retry cloneReplica();
if ("log-" + padIndex(parse<UInt64>(raw_log_pointer)) < *min_record)
if (is_lost_stat.version == -1)
{
LOG_DEBUG(log, "Can not clone replica, because log_pointer out of log");
return false;
ops.push_back(zkutil::makeCreateRequest(replica_path + "/is_lost", "0", zkutil::CreateMode::PersistentSequential));
ops.push_back(zkutil::makeRemoveRequest(replica_path + "/is_lost", -1));
}
else
ops.push_back(zkutil::makeCheckRequest(source_path + "/is_lost", is_lost_stat.version));
zkutil::Responses resp;
auto error = zookeeper->tryMulti(ops, resp);
if (error == ZooKeeperImpl::ZooKeeper::ZBADVERSION)
throw Exception("Can not clone replica, because a source replica is lost", ErrorCodes::SOURCE_REPLICA_IS_LOST);
else if (error == ZooKeeperImpl::ZooKeeper::ZNODEEXISTS)
throw Exception("Replica changed version");
else if (error != ZooKeeperImpl::ZooKeeper::ZOK)
throw ("cloneReplica() failed");
/// Let's remember the queue of the reference/master replica.
Strings source_queue_names = zookeeper->getChildren(source_path + "/queue");
@ -2023,9 +2013,6 @@ bool StorageReplicatedMergeTree::cloneReplica(const String & source_replica, zku
continue;
source_queue.push_back(entry);
}
if (!checkStat(zookeeper, source_path, is_lost_stat))
return false;
/// Add to the queue jobs to receive all the active parts that the reference/master replica has.
Strings parts = zookeeper->getChildren(source_path + "/parts");
@ -2041,9 +2028,6 @@ bool StorageReplicatedMergeTree::cloneReplica(const String & source_replica, zku
log_entry.create_time = tryGetPartCreateTime(zookeeper, source_path, name);
zookeeper->create(replica_path + "/queue/queue-", log_entry.toString(), zkutil::CreateMode::PersistentSequential);
if (!checkStat(zookeeper, source_path, is_lost_stat))
return false;
}
LOG_DEBUG(log, "Queued " << active_parts.size() << " parts to be fetched");
@ -2055,32 +2039,11 @@ bool StorageReplicatedMergeTree::cloneReplica(const String & source_replica, zku
}
LOG_DEBUG(log, "Copied " << source_queue.size() << " queue entries");
return true;
}
bool StorageReplicatedMergeTree::checkStat(const zkutil::ZooKeeperPtr & zookeeper, const String & source_path, zkutil::Stat stat)
void StorageReplicatedMergeTree::cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zookeeper)
{
zkutil::Requests ops;
zkutil::Responses resp;
ops.push_back(zkutil::makeCheckRequest(source_path + "/is_lost", stat.version));
auto error = zookeeper->tryMulti(ops, resp);
if (error == ZooKeeperImpl::ZooKeeper::ZBADVERSION)
{
LOG_DEBUG(log, "Can not clone replica, because a source replica is lost");
return false;
}
return true;
}
void StorageReplicatedMergeTree::cloneReplicaIfNeeded()
{
auto zookeeper = getZooKeeper();
String res;
if (zookeeper->tryGet(replica_path + "/is_lost", res))
{
@ -2089,35 +2052,31 @@ void StorageReplicatedMergeTree::cloneReplicaIfNeeded()
}
else
{
/// replica is_active, and we can create is_lost (for old version).
/// If old replica is_active, and we must create is_lost node (for old version of CH).
zookeeper->create(replica_path + "/is_lost", "0", zkutil::CreateMode::Persistent);
return;
}
String source_replica;
zkutil::Stat is_lost_stat;
is_lost_stat.version = -1;
for (const String & replica_name : zookeeper->getChildren(zookeeper_path + "/replicas"))
{
String source_replica_path = zookeeper_path + "/replicas/" + replica_name;
if ((source_replica_path != replica_path) && !zookeeper->exists(source_replica_path + "/is_lost"))
if (source_replica_path != replica_path)
{
source_replica = replica_name;
break;
}
if ((source_replica_path != replica_path) && (zookeeper->get(source_replica_path + "/is_lost") == "0"))
{
source_replica = replica_name;
break;
String resp;
if (!zookeeper->tryGet(source_replica_path + "/is_lost", resp, &is_lost_stat) || resp == "0")
source_replica = replica_name;
}
}
if (source_replica == "")
throw Exception("All replicas are lost", ErrorCodes::ALL_REPLICAS_LOST);
if (!cloneReplica(source_replica, zookeeper))
throw Exception("Can not clone replica from " + source_replica, ErrorCodes::CAN_NOT_CLONE_REPLICA);
cloneReplica(source_replica, is_lost_stat, zookeeper);
zookeeper->set(replica_path + "/is_lost", "0");
}

View File

@ -404,13 +404,10 @@ private:
* return true, if replica wil be cloned
* else return false (when source_replica was lost or log was empty).
*/
bool cloneReplica(const String & source_replica, zkutil::ZooKeeperPtr & zookeeper);
/// Check that source replica does not change;
bool checkStat(const zkutil::ZooKeeperPtr & zookeeper, const String & source_replica, zkutil::Stat stat);
void cloneReplica(const String & source_replica, zkutil::Stat is_lost_stat, zkutil::ZooKeeperPtr & zookeeper);
/// Clone replica if it is lost.
void cloneReplicaIfNeeded();
void cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zookeeper);
/** Performs actions from the queue.
*/