initial initialization of cluster discovery in main thread

This commit is contained in:
vdimir 2021-11-19 12:42:00 +03:00
parent 7a38716360
commit fbdb5c60bd
No known key found for this signature in database
GPG Key ID: 9B404D301C0CC7EB
4 changed files with 41 additions and 23 deletions

View File

@ -1555,7 +1555,14 @@ if (ThreadFuzzer::instance().isEffective())
server.start();
LOG_INFO(log, "Ready for connections.");
global_context->startClusterDiscovery();
try
{
global_context->startClusterDiscovery();
}
catch (...)
{
tryLogCurrentException(log, "Caught exception while starting cluster discovery");
}
SCOPE_EXIT_SAFE({
LOG_DEBUG(log, "Received termination signal.");

View File

@ -190,7 +190,7 @@ bool ClusterDiscovery::needUpdate(const Strings & node_uuids, const NodesInfo &
return has_difference;
}
ClusterPtr ClusterDiscovery::getCluster(const ClusterInfo & cluster_info)
ClusterPtr ClusterDiscovery::makeCluster(const ClusterInfo & cluster_info)
{
Strings replica_adresses;
replica_adresses.reserve(cluster_info.nodes_info.size());
@ -262,7 +262,7 @@ bool ClusterDiscovery::updateCluster(ClusterInfo & cluster_info)
return false;
}
auto cluster = getCluster(cluster_info);
auto cluster = makeCluster(cluster_info);
context->setCluster(cluster_info.name, cluster);
return true;
}
@ -280,6 +280,8 @@ bool ClusterDiscovery::updateCluster(const String & cluster_name)
void ClusterDiscovery::registerInZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info)
{
LOG_DEBUG(log, "Registering current node {} in cluster {}", node_name, info.name);
String node_path = getShardsListPath(info.zk_root) / node_name;
zk->createAncestors(node_path);
@ -297,26 +299,35 @@ void ClusterDiscovery::start()
return;
}
LOG_TRACE(log, "Starting working thread");
main_thread = ThreadFromGlobalPool([this] { runMainThread(); });
auto zk = context->getZooKeeper();
for (auto & [_, info] : clusters_info)
{
registerInZk(zk, info);
if (!updateCluster(info))
{
LOG_WARNING(log, "Error on updating cluster '{}', will retry", info.name);
clusters_to_update->set(info.name);
}
}
main_thread = ThreadFromGlobalPool([this]
{
try
{
runMainThread();
}
catch (...)
{
tryLogCurrentException(log, "Caught exception in cluster discovery runMainThread");
}
});
}
void ClusterDiscovery::runMainThread()
{
setThreadName("ClusterDiscover");
LOG_DEBUG(log, "Worker thread started");
// setThreadName("ClusterDiscovery");
{
auto zk = context->getZooKeeper();
for (auto & [_, info] : clusters_info)
{
registerInZk(zk, info);
if (!updateCluster(info))
{
LOG_WARNING(log, "Error on updating cluster '{}', will retry", info.name);
clusters_to_update->set(info.name);
}
}
}
using namespace std::chrono_literals;

View File

@ -71,7 +71,7 @@ private:
NodesInfo getNodes(zkutil::ZooKeeperPtr & zk, const String & zk_root, const Strings & node_uuids);
ClusterPtr getCluster(const ClusterInfo & cluster_info);
ClusterPtr makeCluster(const ClusterInfo & cluster_info);
bool needUpdate(const Strings & node_uuids, const NodesInfo & nodes);
bool updateCluster(const String & cluster_name);

View File

@ -31,11 +31,11 @@ def check_nodes_count_in_cluster(nodes, expected, cluster_name, *, retries=5):
assert 1 <= retries <= 6
for retry in range(1, retries + 1):
nodes_cnt = [
int(node.query(f"SELECT count() FROM system.clusters WHERE cluster = '{cluster_name}'"))
nodes_cnt = {
node.name: int(node.query(f"SELECT count() FROM system.clusters WHERE cluster = '{cluster_name}'"))
for node in nodes
]
if all(actual == expected for actual in nodes_cnt):
}
if all(actual == expected for actual in nodes_cnt.values()):
break
if retry != retries: