check version in ClusterDiscovery::updateCluster

This commit is contained in:
vdimir 2021-11-16 12:02:44 +03:00
parent 5c47229797
commit aa0d79659b
No known key found for this signature in database
GPG Key ID: 9B404D301C0CC7EB
2 changed files with 54 additions and 28 deletions

View File

@ -49,23 +49,11 @@ ClusterDiscovery::ClusterDiscovery(
}
}
Strings ClusterDiscovery::getNodes(zkutil::ZooKeeperPtr & zk, const String & zk_root, const Strings & nodes)
{
Strings result;
result.reserve(nodes.size());
for (const auto & node : nodes)
{
/// TODO (vdimir@): use batch request?
if (bool ok = zk->tryGet(getReplicasListPath(zk_root) / node, result.emplace_back()); !ok)
{
result.pop_back();
LOG_WARNING(log, "Cluster configuration was changed during update, skip nonexisting node");
}
}
return result;
}
Strings ClusterDiscovery::getNodeNames(zkutil::ZooKeeperPtr & zk, const String & zk_root, const String & cluster_name)
Strings ClusterDiscovery::getNodeNames(zkutil::ZooKeeperPtr & zk,
const String & zk_root,
const String & cluster_name,
int * version,
bool set_callback)
{
auto watch_callback = [cluster_name, queue=queue, log=log](const Coordination::WatchResponse &)
{
@ -77,20 +65,48 @@ Strings ClusterDiscovery::getNodeNames(zkutil::ZooKeeperPtr & zk, const String &
}
};
return zk->getChildrenWatch(getReplicasListPath(zk_root), nullptr, watch_callback);
Coordination::Stat stat;
Strings nodes = zk->getChildrenWatch(getReplicasListPath(zk_root), &stat, set_callback ? watch_callback : Coordination::WatchCallback{});
if (version)
*version = stat.cversion;
return nodes;
}
void ClusterDiscovery::updateCluster(const String & cluster_name, const String & zk_root)
Strings ClusterDiscovery::getNodes(zkutil::ZooKeeperPtr & zk, const String & zk_root, const Strings & nodes)
{
Strings result;
result.reserve(nodes.size());
for (const auto & node : nodes)
{
bool ok = zk->tryGet(getReplicasListPath(zk_root) / node, result.emplace_back());
if (!ok)
{
result.pop_back();
LOG_WARNING(log, "Cluster configuration was changed during update, skip nonexisting node");
}
}
return result;
}
bool ClusterDiscovery::updateCluster(const String & cluster_name, const String & zk_root)
{
LOG_TRACE(log, "Updating cluster '{}'", cluster_name);
auto zk = context->getZooKeeper();
Strings nodes = getNodeNames(zk, zk_root, cluster_name);
int start_version;
Strings nodes = getNodeNames(zk, zk_root, cluster_name, &start_version, false);
Strings replicas = getNodes(zk, zk_root, nodes);
if (replicas.empty())
return;
return false;
int current_version;
getNodeNames(zk, zk_root, cluster_name, &current_version, true);
if (current_version != start_version)
return false;
std::vector<std::vector<String>> shards = {replicas};
@ -107,11 +123,12 @@ void ClusterDiscovery::updateCluster(const String & cluster_name, const String &
secure);
context->setCluster(cluster_name, cluster);
return true;
}
void ClusterDiscovery::updateCluster(const String & cluster_name)
bool ClusterDiscovery::updateCluster(const String & cluster_name)
{
updateCluster(cluster_name, clusters[cluster_name]);
return updateCluster(cluster_name, clusters[cluster_name]);
}
void ClusterDiscovery::start()
@ -143,9 +160,13 @@ void ClusterDiscovery::runMainThread()
while (!stop_flag)
{
std::string cluster_name;
if (bool ok = queue->tryPop(cluster_name, QUEUE_OP_TIMEOUT_MS))
if (queue->tryPop(cluster_name, QUEUE_OP_TIMEOUT_MS))
{
updateCluster(cluster_name);
bool ok = updateCluster(cluster_name);
if (!ok)
{
LOG_WARNING(log, "Error on updating cluster '{}', configuration changed during update, will retry", cluster_name);
}
}
}
LOG_TRACE(log, "Worker thread stopped");

View File

@ -27,11 +27,16 @@ public:
~ClusterDiscovery();
private:
Strings getNodeNames(zkutil::ZooKeeperPtr & zk, const String & zk_root, const String & cluster_name);
Strings getNodeNames(zkutil::ZooKeeperPtr & zk,
const String & zk_root,
const String & cluster_name,
int * version = nullptr,
bool set_callback = true);
Strings getNodes(zkutil::ZooKeeperPtr & zk, const String & zk_root, const Strings & nodes);
void updateCluster(const String & cluster_name);
void updateCluster(const String & cluster_name, const String & zk_root);
bool updateCluster(const String & cluster_name);
bool updateCluster(const String & cluster_name, const String & zk_root);
void runMainThread();
void shutdown();