Made merge_selecting_thread creation & deletion atomic. [#METR-22471]

This commit is contained in:
Vitaliy Lyudvichenko 2016-10-14 14:47:11 +03:00
parent aa863ea7bb
commit 669e2982a5
3 changed files with 12 additions and 5 deletions

View File

@ -248,6 +248,7 @@ private:
/** Является ли эта реплика "ведущей". Ведущая реплика выбирает куски для слияния.
*/
bool is_leader_node = false;
std::mutex leader_node_mutex;
InterserverIOEndpointHolderPtr endpoint_holder;
InterserverIOEndpointHolderPtr disk_space_monitor_endpoint_holder;

View File

@ -362,11 +362,15 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown()
storage.replica_is_active_node = nullptr;
LOG_TRACE(log, "Waiting for threads to finish");
if (storage.is_leader_node)
{
storage.is_leader_node = false;
if (storage.merge_selecting_thread.joinable())
storage.merge_selecting_thread.join();
std::lock_guard<std::mutex> lock(storage.leader_node_mutex);
if (storage.is_leader_node)
{
storage.is_leader_node = false;
if (storage.merge_selecting_thread.joinable())
storage.merge_selecting_thread.join();
}
}
if (storage.queue_updating_thread.joinable())
storage.queue_updating_thread.join();
@ -390,7 +394,7 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown()
/// Such behaviour was rarely observed on DROP queries.
/// Therefore we need either avoid becoming leader after first shutdown call (more deliberate choice),
/// either manually wait merge_selecting_thread.join() inside ~StorageReplicatedMergeTree(), either or something third.
/// So, shutdown check is added in becomeLeader().
/// So, we added shutdown check in becomeLeader() and made its creation and deletion atomic.
storage.leader_election = nullptr;
LOG_TRACE(log, "Threads finished");

View File

@ -1872,6 +1872,8 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n
void StorageReplicatedMergeTree::becomeLeader()
{
std::lock_guard<std::mutex> lock(leader_node_mutex);
if (shutdown_called)
return;