Merge pull request #2185 from yandex/fix-leader-election-race

Fix races in leader election.
commit 84726e17e8
alexey-milovidov, 2018-04-07 00:18:26 +03:00 (committed via GitHub)
5 changed files with 88 additions and 74 deletions

dbms/src/Common/ZooKeeper/LeaderElection.h:

@@ -41,10 +41,15 @@ public:
         createNode();
     }
 
-    void yield()
+    void shutdown()
     {
-        releaseNode();
-        createNode();
+        if (shutdown_called)
+            return;
+
+        shutdown_called = true;
+        event->set();
+        if (thread.joinable())
+            thread.join();
     }
 
     ~LeaderElection()
@@ -62,14 +67,14 @@ private:
     std::string node_name;
     std::thread thread;
-    std::atomic<bool> shutdown {false};
+    std::atomic<bool> shutdown_called {false};
     zkutil::EventPtr event = std::make_shared<Poco::Event>();
 
     CurrentMetrics::Increment metric_increment{CurrentMetrics::LeaderElection};
 
     void createNode()
     {
-        shutdown = false;
+        shutdown_called = false;
         node = EphemeralNodeHolder::createSequential(path + "/leader_election-", zookeeper, identifier);
         std::string node_path = node->getPath();
@@ -80,16 +85,13 @@ private:
     void releaseNode()
    {
-        shutdown = true;
-        event->set();
-        if (thread.joinable())
-            thread.join();
+        shutdown();
         node = nullptr;
     }
 
     void threadFunction()
     {
-        while (!shutdown)
+        while (!shutdown_called)
        {
             bool success = false;
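
The point of replacing `yield()` with `shutdown()` is that shutdown is now a one-way, idempotent operation: the first call wakes the election thread and joins it, and any later call (e.g. from both `releaseNode()` and the destructor) is a no-op. A minimal standalone sketch of the same idiom, with a hypothetical `Worker` class and a `std::condition_variable` standing in for the `Poco::Event` used in the real code:

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

class Worker
{
public:
    Worker() : thread(&Worker::threadFunction, this) {}

    /// Idempotent and one-way: only the first caller wakes and joins the thread.
    void shutdown()
    {
        if (shutdown_called.exchange(true))
            return;
        {
            /// Briefly take the mutex so the flag update cannot slip between the
            /// worker's check of the flag and its call to wait_for (lost wakeup).
            std::lock_guard<std::mutex> lock(mutex);
        }
        event.notify_all();
        if (thread.joinable())
            thread.join();
    }

    ~Worker() { shutdown(); }

private:
    std::atomic<bool> shutdown_called {false};
    std::mutex mutex;
    std::condition_variable event;
    std::thread thread;

    void threadFunction()
    {
        std::unique_lock<std::mutex> lock(mutex);
        while (!shutdown_called)
        {
            /// ... one round of work (in LeaderElection: check our place in the
            /// queue of ephemeral nodes) would go here ...
            event.wait_for(lock, std::chrono::seconds(10));
        }
    }
};

int main()
{
    Worker w;
    w.shutdown();   /// explicit call...
    /// ...and the destructor's second call is a harmless no-op
}
```

The real `LeaderElection` gets the lost-wakeup safety for free because `Poco::Event::set()` is sticky: a set event satisfies the next wait even if nobody is waiting at the moment of the call.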

dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp:

@@ -55,7 +55,7 @@ void ReplicatedMergeTreeCleanupThread::iterate()
 
     /// This is loose condition: no problem if we actually had lost leadership at this moment
     /// and two replicas will try to do cleanup simultaneously.
-    if (storage.is_leader_node)
+    if (storage.is_leader)
     {
         clearOldLogs();
         clearOldBlocks();

dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp:

@@ -17,7 +17,6 @@ namespace ProfileEvents
 namespace CurrentMetrics
 {
     extern const Metric ReadonlyReplica;
-    extern const Metric LeaderReplica;
 }
@@ -139,7 +138,7 @@ void ReplicatedMergeTreeRestartingThread::run()
                 prev_time_of_check_delay = current_time;
 
                 /// We give up leadership if the relative lag is greater than threshold.
-                if (storage.is_leader_node
+                if (storage.is_leader
                     && relative_delay > static_cast<time_t>(storage.data.settings.min_relative_delay_to_yield_leadership))
                 {
                     LOG_INFO(log, "Relative replica delay (" << relative_delay << " seconds) is bigger than threshold ("
@@ -147,11 +146,11 @@ void ReplicatedMergeTreeRestartingThread::run()
 
                     ProfileEvents::increment(ProfileEvents::ReplicaYieldLeadership);
 
-                    storage.is_leader_node = false;
-                    CurrentMetrics::sub(CurrentMetrics::LeaderReplica);
-                    if (storage.merge_selecting_thread.joinable())
-                        storage.merge_selecting_thread.join();
-                    storage.leader_election->yield();
+                    storage.exitLeaderElection();
+                    /// NOTE: enterLeaderElection() can throw if node creation in ZK fails.
+                    /// This is bad because we can end up without a leader on any replica.
+                    /// In this case we rely on the fact that the session will expire and we will reconnect.
+                    storage.enterLeaderElection();
                }
            }
        }
@@ -169,6 +168,8 @@ void ReplicatedMergeTreeRestartingThread::run()
     storage.data_parts_exchange_endpoint_holder->cancelForever();
     storage.data_parts_exchange_endpoint_holder = nullptr;
 
+    /// Cancel fetches and merges to force the queue_task to finish ASAP.
+    storage.fetcher.blocker.cancelForever();
     storage.merger.merges_blocker.cancelForever();
 
     partialShutdown();
@@ -195,12 +196,7 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
         updateQuorumIfWeHavePart();
 
         if (storage.data.settings.replicated_can_become_leader)
-            storage.leader_election = std::make_shared<zkutil::LeaderElection>(
-                storage.zookeeper_path + "/leader_election",
-                *storage.current_zookeeper,     /// current_zookeeper lives for the lifetime of leader_election,
-                                                /// since before changing `current_zookeeper`, `leader_election` object is destroyed in `partialShutdown` method.
-                [this] { storage.becomeLeader(); CurrentMetrics::add(CurrentMetrics::LeaderReplica); },
-                storage.replica_name);
+            storage.enterLeaderElection();
 
         /// Anything above can throw a KeeperException if something is wrong with ZK.
         /// Anything below should not throw exceptions.
@@ -222,7 +218,6 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
     catch (...)
     {
         storage.replica_is_active_node = nullptr;
-        storage.leader_election = nullptr;
 
         try
         {
@@ -366,17 +361,9 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown()
     storage.replica_is_active_node = nullptr;
 
     LOG_TRACE(log, "Waiting for threads to finish");
 
-    {
-        std::lock_guard<std::mutex> lock(storage.leader_node_mutex);
-        bool old_val = true;
-        if (storage.is_leader_node.compare_exchange_strong(old_val, false))
-        {
-            CurrentMetrics::sub(CurrentMetrics::LeaderReplica);
-            if (storage.merge_selecting_thread.joinable())
-                storage.merge_selecting_thread.join();
-        }
-    }
+    storage.exitLeaderElection();
 
     if (storage.queue_updating_thread.joinable())
         storage.queue_updating_thread.join();
@@ -384,20 +371,6 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown()
     storage.alter_thread.reset();
     storage.part_check_thread.stop();
 
-    /// Yielding leadership only after finish of merge_selecting_thread.
-    /// Otherwise race condition with parallel run of merge selecting thread on different servers is possible.
-    ///
-    /// On the other hand, leader_election could call becomeLeader() from own thread after
-    /// merge_selecting_thread is finished and restarting_thread is destroyed.
-    /// becomeLeader() recreates merge_selecting_thread and it becomes joinable again, even restarting_thread is destroyed.
-    /// But restarting_thread is responsible to stop merge_selecting_thread.
-    /// It will lead to std::terminate in ~StorageReplicatedMergeTree().
-    /// Such behaviour was rarely observed on DROP queries.
-    /// Therefore we need either avoid becoming leader after first shutdown call (more deliberate choice),
-    /// either manually wait merge_selecting_thread.join() inside ~StorageReplicatedMergeTree(), either or something third.
-    /// So, we added shutdown check in becomeLeader() and made its creation and deletion atomic.
-    storage.leader_election = nullptr;
-
     LOG_TRACE(log, "Threads finished");
 }
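
Yielding leadership for replication lag is now expressed as a full exit/re-enter cycle instead of the old in-place `leader_election->yield()`. A toy model of that control flow, including the failure mode the NOTE warns about (all types and names here are hypothetical stand-ins, not the real API):

```cpp
#include <iostream>
#include <memory>
#include <stdexcept>

struct ElectionHandle {};  /// stands in for the ZK ephemeral node + watch thread

struct Replica
{
    bool is_leader = false;
    std::shared_ptr<ElectionHandle> leader_election;

    void exitLeaderElection()
    {
        if (!leader_election)
            return;
        is_leader = false;          /// stop leader-only work first...
        leader_election = nullptr;  /// ...then drop out of the election
    }

    void enterLeaderElection(bool zk_ok)
    {
        if (!zk_ok)
            throw std::runtime_error("node creation in ZK failed");
        leader_election = std::make_shared<ElectionHandle>();
    }
};

int main()
{
    Replica replica;
    replica.enterLeaderElection(true);

    /// relative_delay exceeded min_relative_delay_to_yield_leadership: give it up.
    replica.exitLeaderElection();
    try
    {
        replica.enterLeaderElection(false);  /// simulate ZK failure on re-entry
    }
    catch (const std::exception & e)
    {
        /// The case the NOTE describes: we may now be leaderless on every replica
        /// and rely on ZK session expiry to trigger a reconnect and a new election.
        std::cout << "leaderless until session expiry: " << e.what() << '\n';
    }
}
```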

dbms/src/Storages/StorageReplicatedMergeTree.cpp:

@@ -59,6 +59,12 @@ namespace ProfileEvents
     extern const Event DataAfterMergeDiffersFromReplica;
 }
 
+namespace CurrentMetrics
+{
+    extern const Metric LeaderReplica;
+}
+
 namespace DB
 {
@@ -1883,7 +1889,7 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
             && cached_merging_predicate.get(now, uncached_merging_predicate, merging_predicate_args_to_key, left, right);
     };
 
-    while (!shutdown_called && is_leader_node)
+    while (is_leader)
     {
         bool success = false;
@@ -1932,7 +1938,7 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
             tryLogCurrentException(__PRETTY_FUNCTION__);
         }
 
-        if (shutdown_called || !is_leader_node)
+        if (!is_leader)
             break;
 
         if (!success)
@@ -2037,23 +2043,55 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n
 }
 
-void StorageReplicatedMergeTree::becomeLeader()
+void StorageReplicatedMergeTree::enterLeaderElection()
 {
-    std::lock_guard<std::mutex> lock(leader_node_mutex);
-
-    if (shutdown_called)
-        return;
-
-    if (merge_selecting_thread.joinable())
-    {
-        LOG_INFO(log, "Deleting old leader");
-        is_leader_node = false; /// exit trigger inside thread
-        merge_selecting_thread.join();
-    }
-
-    LOG_INFO(log, "Became leader");
-    is_leader_node = true;
-    merge_selecting_thread = std::thread(&StorageReplicatedMergeTree::mergeSelectingThread, this);
+    auto callback = [this]()
+    {
+        CurrentMetrics::add(CurrentMetrics::LeaderReplica);
+        LOG_INFO(log, "Became leader");
+        is_leader = true;
+        merge_selecting_thread = std::thread(&StorageReplicatedMergeTree::mergeSelectingThread, this);
+    };
+
+    try
+    {
+        leader_election = std::make_shared<zkutil::LeaderElection>(
+            zookeeper_path + "/leader_election",
+            *current_zookeeper,     /// current_zookeeper lives for the lifetime of leader_election,
+                                    /// since before changing `current_zookeeper`, `leader_election` object is destroyed in `partialShutdown` method.
+            callback,
+            replica_name);
+    }
+    catch (...)
+    {
+        leader_election = nullptr;
+        throw;
+    }
+}
+
+void StorageReplicatedMergeTree::exitLeaderElection()
+{
+    if (!leader_election)
+        return;
+
+    /// Shut down the leader election thread to avoid suddenly becoming the leader again after
+    /// we have stopped the merge_selecting_thread, but before we have deleted the leader_election object.
+    leader_election->shutdown();
+
+    if (is_leader)
+    {
+        CurrentMetrics::sub(CurrentMetrics::LeaderReplica);
+        LOG_INFO(log, "Stopped being leader");
+
+        is_leader = false;
+        merge_selecting_event.set();
+        merge_selecting_thread.join();
+    }
+
+    /// Delete the node in ZK only after we have stopped the merge_selecting_thread - so that only one
+    /// replica assigns merges at any given time.
+    leader_election = nullptr;
 }
@@ -2382,12 +2420,6 @@ void StorageReplicatedMergeTree::startup()
 void StorageReplicatedMergeTree::shutdown()
 {
-    /** This must be done before waiting for restarting_thread.
-      * Because restarting_thread will wait for finishing of tasks in background pool,
-      * and parts are fetched in that tasks.
-      */
-    fetcher.blocker.cancelForever();
-
     if (restarting_thread)
     {
         restarting_thread->stop();
@@ -2399,6 +2431,8 @@ void StorageReplicatedMergeTree::shutdown()
         data_parts_exchange_endpoint_holder->cancelForever();
         data_parts_exchange_endpoint_holder = nullptr;
     }
+
+    fetcher.blocker.cancelForever();
 }
@@ -2487,7 +2521,7 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p
 {
     assertNotReadonly();
 
-    if (!is_leader_node)
+    if (!is_leader)
     {
         sendRequestToLeaderReplica(query, context.getSettingsRef());
         return true;
@@ -2813,7 +2847,7 @@ void StorageReplicatedMergeTree::dropPartition(const ASTPtr & query, const ASTPt
     zkutil::ZooKeeperPtr zookeeper = getZooKeeper();
 
-    if (!is_leader_node)
+    if (!is_leader)
     {
         sendRequestToLeaderReplica(query, context.getSettingsRef());
         return;
@@ -3171,7 +3205,7 @@ void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields)
 {
     auto zookeeper = tryGetZooKeeper();
 
-    res.is_leader = is_leader_node;
+    res.is_leader = is_leader;
     res.is_readonly = is_readonly;
     res.is_session_expired = !zookeeper || zookeeper->expired();
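
The ordering inside `exitLeaderElection()` is the crux of the race fix: silence the election thread first (so its callback cannot make us leader again mid-teardown), then stop the merge-selecting thread, and only then delete the ZK node, so a successor is elected only after this replica has stopped assigning merges. A runnable toy of that ordering (hypothetical stand-ins; the real wake-up goes through `merge_selecting_event`):

```cpp
#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>

int main()
{
    std::atomic<bool> is_leader {true};

    /// Stands in for mergeSelectingThread(): runs only while we are the leader.
    std::thread merge_selecting_thread([&]
    {
        while (is_leader)
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
    });

    /// 1. leader_election->shutdown(): after this point the election callback can
    ///    no longer fire, so nothing can set is_leader back to true.

    /// 2. Stop leader-only work.
    is_leader = false;
    merge_selecting_thread.join();

    /// 3. leader_election = nullptr: the ephemeral ZK node disappears only now, so
    ///    another replica can win the election only after step 2 has completed -
    ///    at any given time at most one replica assigns merges.
    std::cout << "leadership released without overlap\n";
}
```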

dbms/src/Storages/StorageReplicatedMergeTree.h:

@@ -220,8 +220,8 @@ private:
     /** Is this replica "leading". The leader replica selects the parts to merge.
       */
-    std::atomic_bool is_leader_node {false};
-    std::mutex leader_node_mutex;
+    std::atomic<bool> is_leader {false};
+    zkutil::LeaderElectionPtr leader_election;
 
     InterserverIOEndpointHolderPtr data_parts_exchange_endpoint_holder;
@@ -239,7 +239,6 @@ private:
     DataPartsExchange::Fetcher fetcher;
 
-    zkutil::LeaderElectionPtr leader_election;
 
     /// When activated, replica is initialized and startup() method could exit
     Poco::Event startup_event;
@@ -368,9 +367,15 @@ private:
       */
     bool queueTask();
 
-    /// Select the parts to merge.
-    void becomeLeader();
+    /// Postcondition:
+    /// either leader_election is fully initialized (node in ZK is created and the watching thread is launched)
+    /// or an exception is thrown and leader_election is destroyed.
+    void enterLeaderElection();
+
+    /// Postcondition:
+    /// is_leader is false, merge_selecting_thread is stopped, leader_election is nullptr.
+    /// leader_election node in ZK is either deleted, or the session is marked expired.
+    void exitLeaderElection();
 
     /** Selects the parts to merge and writes to the log.
       */
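
The new declarations are documented by postcondition, which pins down the only two stable states the storage can be in. A small sketch expressing those postconditions as assertions (hypothetical stub types, single-threaded for clarity):

```cpp
#include <cassert>
#include <memory>

struct LeaderElection {};

struct Storage
{
    bool is_leader = false;
    bool merge_selecting_thread_running = false;
    std::shared_ptr<LeaderElection> leader_election;

    /// Postcondition: leader_election fully initialized, or an exception is
    /// thrown and leader_election stays destroyed.
    void enterLeaderElection()
    {
        leader_election = std::make_shared<LeaderElection>();
        /// Becoming leader (is_leader = true, starting the merge-selecting
        /// thread) happens later, from the election callback, never here.
    }

    /// Postcondition: is_leader == false, the merge-selecting thread is
    /// stopped, leader_election == nullptr.
    void exitLeaderElection()
    {
        if (!leader_election)
            return;
        is_leader = false;
        merge_selecting_thread_running = false;
        leader_election = nullptr;
    }
};

int main()
{
    Storage s;

    s.enterLeaderElection();
    assert(s.leader_election);

    s.exitLeaderElection();
    assert(!s.is_leader && !s.merge_selecting_thread_running && !s.leader_election);

    s.exitLeaderElection();  /// idempotent: calling again is a harmless no-op
}
```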