mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-18 21:51:57 +00:00
163 lines
3.9 KiB
C++
163 lines
3.9 KiB
C++
#pragma once
|
|
|
|
#include <zkutil/ZooKeeper.h>
|
|
#include <functional>
|
|
#include <memory>
|
|
#include <common/logger_useful.h>
|
|
#include <DB/Common/CurrentMetrics.h>
|
|
|
|
|
|
namespace ProfileEvents
|
|
{
|
|
extern const Event ObsoleteEphemeralNode;
|
|
extern const Event LeaderElectionAcquiredLeadership;
|
|
}
|
|
|
|
namespace CurrentMetrics
|
|
{
|
|
extern const Metric LeaderElection;
|
|
}
|
|
|
|
|
|
namespace zkutil
|
|
{
|
|
|
|
/** Implements leader election algorithm described here: http://zookeeper.apache.org/doc/r3.4.5/recipes.html#sc_leaderElection
|
|
*/
|
|
class LeaderElection
|
|
{
|
|
public:
|
|
using LeadershipHandler = std::function<void()>;
|
|
|
|
/** handler is called when this instance become leader.
|
|
*
|
|
* identifier - if not empty, must uniquely (within same path) identify participant of leader election.
|
|
* It means that different participants of leader election have different identifiers
|
|
* and existence of more than one ephemeral node with same identifier indicates an error
|
|
* (see cleanOldEphemeralNodes).
|
|
*/
|
|
LeaderElection(const std::string & path_, ZooKeeper & zookeeper_, LeadershipHandler handler_, const std::string & identifier_ = "")
|
|
: path(path_), zookeeper(zookeeper_), handler(handler_), identifier(identifier_)
|
|
{
|
|
createNode();
|
|
}
|
|
|
|
void yield()
|
|
{
|
|
releaseNode();
|
|
createNode();
|
|
}
|
|
|
|
~LeaderElection()
|
|
{
|
|
releaseNode();
|
|
}
|
|
|
|
private:
|
|
std::string path;
|
|
ZooKeeper & zookeeper;
|
|
LeadershipHandler handler;
|
|
std::string identifier;
|
|
|
|
EphemeralNodeHolderPtr node;
|
|
std::string node_name;
|
|
|
|
std::thread thread;
|
|
std::atomic<bool> shutdown {false};
|
|
zkutil::EventPtr event = std::make_shared<Poco::Event>();
|
|
|
|
CurrentMetrics::Increment metric_increment{CurrentMetrics::LeaderElection};
|
|
|
|
void createNode()
|
|
{
|
|
shutdown = false;
|
|
node = EphemeralNodeHolder::createSequential(path + "/leader_election-", zookeeper, identifier);
|
|
|
|
std::string node_path = node->getPath();
|
|
node_name = node_path.substr(node_path.find_last_of('/') + 1);
|
|
|
|
cleanOldEphemeralNodes();
|
|
|
|
thread = std::thread(&LeaderElection::threadFunction, this);
|
|
}
|
|
|
|
void cleanOldEphemeralNodes()
|
|
{
|
|
if (identifier.empty())
|
|
return;
|
|
|
|
/** If there are nodes with same identifier, remove them.
|
|
* Such nodes could still be alive after failed attempt of removal,
|
|
* if it was temporary communication failure, that was continued for more than session timeout,
|
|
* but ZK session is still alive for unknown reason, and someone still holds that ZK session.
|
|
* See comments in destructor of EphemeralNodeHolder.
|
|
*/
|
|
Strings brothers = zookeeper.getChildren(path);
|
|
for (const auto & brother : brothers)
|
|
{
|
|
if (brother == node_name)
|
|
continue;
|
|
|
|
std::string brother_path = path + "/" + brother;
|
|
std::string brother_identifier = zookeeper.get(brother_path);
|
|
|
|
if (brother_identifier == identifier)
|
|
{
|
|
ProfileEvents::increment(ProfileEvents::ObsoleteEphemeralNode);
|
|
LOG_WARNING(&Logger::get("LeaderElection"), "Found obsolete ephemeral node for identifier "
|
|
+ identifier + ", removing: " + brother_path);
|
|
zookeeper.tryRemoveWithRetries(brother_path);
|
|
}
|
|
}
|
|
}
|
|
|
|
void releaseNode()
|
|
{
|
|
shutdown = true;
|
|
event->set();
|
|
if (thread.joinable())
|
|
thread.join();
|
|
node = nullptr;
|
|
}
|
|
|
|
void threadFunction()
|
|
{
|
|
while (!shutdown)
|
|
{
|
|
bool success = false;
|
|
|
|
try
|
|
{
|
|
Strings children = zookeeper.getChildren(path);
|
|
std::sort(children.begin(), children.end());
|
|
auto it = std::lower_bound(children.begin(), children.end(), node_name);
|
|
if (it == children.end() || *it != node_name)
|
|
throw Poco::Exception("Assertion failed in LeaderElection");
|
|
|
|
if (it == children.begin())
|
|
{
|
|
ProfileEvents::increment(ProfileEvents::LeaderElectionAcquiredLeadership);
|
|
handler();
|
|
return;
|
|
}
|
|
|
|
if (zookeeper.exists(path + "/" + *(it - 1), nullptr, event))
|
|
event->wait();
|
|
|
|
success = true;
|
|
}
|
|
catch (...)
|
|
{
|
|
DB::tryLogCurrentException("LeaderElection");
|
|
}
|
|
|
|
if (!success)
|
|
event->tryWait(10 * 1000);
|
|
}
|
|
}
|
|
};
|
|
|
|
using LeaderElectionPtr = std::shared_ptr<LeaderElection>;
|
|
|
|
}
|