don't destroy Cleanup- and AlterThread objects, just deactivate the tasks [#CLICKHOUSE-3862]

Otherwise segfaults are possible when the objects are recreated in the middle of the queue task execution.
This commit is contained in:
Alexey Zatelepin 2018-07-30 20:34:55 +03:00 committed by alexey-milovidov
parent cfb3537eb6
commit cc7950a733
8 changed files with 58 additions and 56 deletions

View File

@ -20,7 +20,6 @@ ReplicatedMergeTreeAlterThread::ReplicatedMergeTreeAlterThread(StorageReplicated
, log(&Logger::get(log_name)) , log(&Logger::get(log_name))
{ {
task = storage_.context.getSchedulePool().createTask(log_name, [this]{ run(); }); task = storage_.context.getSchedulePool().createTask(log_name, [this]{ run(); });
task->schedule();
} }
void ReplicatedMergeTreeAlterThread::run() void ReplicatedMergeTreeAlterThread::run()

View File

@ -23,6 +23,14 @@ class ReplicatedMergeTreeAlterThread
public: public:
ReplicatedMergeTreeAlterThread(StorageReplicatedMergeTree & storage_); ReplicatedMergeTreeAlterThread(StorageReplicatedMergeTree & storage_);
void start()
{
task->activate();
task->schedule();
}
void stop() { task->deactivate(); }
private: private:
void run(); void run();

View File

@ -21,7 +21,6 @@ ReplicatedMergeTreeCleanupThread::ReplicatedMergeTreeCleanupThread(StorageReplic
, log(&Logger::get(log_name)) , log(&Logger::get(log_name))
{ {
task = storage.context.getSchedulePool().createTask(log_name, [this]{ run(); }); task = storage.context.getSchedulePool().createTask(log_name, [this]{ run(); });
task->schedule();
} }
void ReplicatedMergeTreeCleanupThread::run() void ReplicatedMergeTreeCleanupThread::run()

View File

@ -24,7 +24,15 @@ class ReplicatedMergeTreeCleanupThread
public: public:
ReplicatedMergeTreeCleanupThread(StorageReplicatedMergeTree & storage_); ReplicatedMergeTreeCleanupThread(StorageReplicatedMergeTree & storage_);
void schedule() { task->schedule(); } void start()
{
task->activate();
task->schedule();
}
void wakeup() { task->schedule(); }
void stop() { task->deactivate(); }
private: private:
StorageReplicatedMergeTree & storage; StorageReplicatedMergeTree & storage;

View File

@ -57,7 +57,29 @@ ReplicatedMergeTreeRestartingThread::ReplicatedMergeTreeRestartingThread(Storage
ReplicatedMergeTreeRestartingThread::~ReplicatedMergeTreeRestartingThread() ReplicatedMergeTreeRestartingThread::~ReplicatedMergeTreeRestartingThread()
{ {
completeShutdown(); try
{
/// Stop restarting_thread before stopping other tasks - so that it won't restart them again.
need_stop = true;
task->deactivate();
LOG_TRACE(log, "Restarting thread finished");
/// Cancel fetches, merges and mutations to force the queue_task to finish ASAP.
storage.fetcher.blocker.cancelForever();
storage.merger_mutator.actions_blocker.cancelForever();
/// Stop other tasks.
partialShutdown();
if (storage.queue_task_handle)
storage.context.getBackgroundPool().removeTask(storage.queue_task_handle);
storage.queue_task_handle.reset();
}
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
} }
void ReplicatedMergeTreeRestartingThread::run() void ReplicatedMergeTreeRestartingThread::run()
@ -167,29 +189,6 @@ void ReplicatedMergeTreeRestartingThread::run()
task->scheduleAfter(check_period_ms); task->scheduleAfter(check_period_ms);
} }
void ReplicatedMergeTreeRestartingThread::completeShutdown()
{
try
{
storage.data_parts_exchange_endpoint_holder->getBlocker().cancelForever();
storage.data_parts_exchange_endpoint_holder = nullptr;
/// Cancel fetches, merges and mutations to force the queue_task to finish ASAP.
storage.fetcher.blocker.cancelForever();
storage.merger_mutator.actions_blocker.cancelForever();
partialShutdown();
if (storage.queue_task_handle)
storage.context.getBackgroundPool().removeTask(storage.queue_task_handle);
storage.queue_task_handle.reset();
}
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
}
bool ReplicatedMergeTreeRestartingThread::tryStartup() bool ReplicatedMergeTreeRestartingThread::tryStartup()
{ {
@ -212,9 +211,9 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
storage.queue_updating_task->schedule(); storage.queue_updating_task->schedule();
storage.mutations_updating_task->activate(); storage.mutations_updating_task->activate();
storage.mutations_updating_task->schedule(); storage.mutations_updating_task->schedule();
storage.cleanup_thread.start();
storage.alter_thread.start();
storage.part_check_thread.start(); storage.part_check_thread.start();
storage.alter_thread = std::make_unique<ReplicatedMergeTreeAlterThread>(storage);
storage.cleanup_thread = std::make_unique<ReplicatedMergeTreeCleanupThread>(storage);
if (!storage.queue_task_handle) if (!storage.queue_task_handle)
storage.queue_task_handle = storage.context.getBackgroundPool().addTask( storage.queue_task_handle = storage.context.getBackgroundPool().addTask(
@ -362,8 +361,8 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown()
storage.queue_updating_task->deactivate(); storage.queue_updating_task->deactivate();
storage.mutations_updating_task->deactivate(); storage.mutations_updating_task->deactivate();
storage.cleanup_thread.reset(); storage.cleanup_thread.stop();
storage.alter_thread.reset(); storage.alter_thread.stop();
storage.part_check_thread.stop(); storage.part_check_thread.stop();
LOG_TRACE(log, "Threads finished"); LOG_TRACE(log, "Threads finished");

View File

@ -36,12 +36,6 @@ public:
return wakeup_event; return wakeup_event;
} }
void stop()
{
need_stop = true;
wakeup_event.set();
}
private: private:
StorageReplicatedMergeTree & storage; StorageReplicatedMergeTree & storage;
String log_name; String log_name;
@ -59,7 +53,6 @@ private:
bool startup_completed = false; bool startup_completed = false;
void run(); void run();
void completeShutdown();
/// Start or stop background threads. Used for partial reinitialization when re-creating a session in ZooKeeper. /// Start or stop background threads. Used for partial reinitialization when re-creating a session in ZooKeeper.
bool tryStartup(); /// Returns false if ZooKeeper is not available. bool tryStartup(); /// Returns false if ZooKeeper is not available.

View File

@ -215,7 +215,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
[this] (const std::string & name) { enqueuePartForCheck(name); }), [this] (const std::string & name) { enqueuePartForCheck(name); }),
reader(data), writer(data), merger_mutator(data, context.getBackgroundPool()), queue(*this), reader(data), writer(data), merger_mutator(data, context.getBackgroundPool()), queue(*this),
fetcher(data), fetcher(data),
shutdown_event(false), part_check_thread(*this), shutdown_event(false),
cleanup_thread(*this), alter_thread(*this), part_check_thread(*this),
log(&Logger::get(database_name + "." + table_name + " (StorageReplicatedMergeTree)")) log(&Logger::get(database_name + "." + table_name + " (StorageReplicatedMergeTree)"))
{ {
if (path_.empty()) if (path_.empty())
@ -1653,7 +1654,7 @@ void StorageReplicatedMergeTree::executeDropRange(const LogEntry & entry)
/// We want to remove dropped parts from disk as soon as possible /// We want to remove dropped parts from disk as soon as possible
/// To be removed a partition should have zero refcount, therefore call the cleanup thread at exit /// To be removed a partition should have zero refcount, therefore call the cleanup thread at exit
parts_to_remove.clear(); parts_to_remove.clear();
cleanup_thread->schedule(); cleanup_thread.wakeup();
} }
@ -2034,7 +2035,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
tryRemovePartsFromZooKeeperWithRetries(parts_to_remove); tryRemovePartsFromZooKeeperWithRetries(parts_to_remove);
res_parts.clear(); res_parts.clear();
parts_to_remove.clear(); parts_to_remove.clear();
cleanup_thread->schedule(); cleanup_thread.wakeup();
return true; return true;
} }
@ -2668,7 +2669,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
{ {
LOG_DEBUG(log, "Part " << part->getNameWithState() << " should be deleted after previous attempt before fetch"); LOG_DEBUG(log, "Part " << part->getNameWithState() << " should be deleted after previous attempt before fetch");
/// Force immediate parts cleanup to delete the part that was left from the previous fetch attempt. /// Force immediate parts cleanup to delete the part that was left from the previous fetch attempt.
cleanup_thread->schedule(); cleanup_thread.wakeup();
return false; return false;
} }
@ -2791,11 +2792,7 @@ void StorageReplicatedMergeTree::startup()
void StorageReplicatedMergeTree::shutdown() void StorageReplicatedMergeTree::shutdown()
{ {
if (restarting_thread)
{
restarting_thread->stop();
restarting_thread.reset(); restarting_thread.reset();
}
if (data_parts_exchange_endpoint_holder) if (data_parts_exchange_endpoint_holder)
{ {
@ -4541,7 +4538,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_
/// Speedup removing of replaced parts from filesystem /// Speedup removing of replaced parts from filesystem
parts_to_remove.clear(); parts_to_remove.clear();
cleanup_thread->schedule(); cleanup_thread.wakeup();
/// If necessary, wait until the operation is performed on all replicas. /// If necessary, wait until the operation is performed on all replicas.
if (context.getSettingsRef().replication_alter_partitions_sync > 1) if (context.getSettingsRef().replication_alter_partitions_sync > 1)

View File

@ -283,25 +283,24 @@ private:
/// A task that selects parts to merge. /// A task that selects parts to merge.
BackgroundSchedulePool::TaskHolder merge_selecting_task; BackgroundSchedulePool::TaskHolder merge_selecting_task;
/// It is acquired for each iteration of the selection of parts to merge or each OPTIMIZE query.
std::mutex merge_selecting_mutex;
/// A task that marks finished mutations as done. /// A task that marks finished mutations as done.
BackgroundSchedulePool::TaskHolder mutations_finalizing_task; BackgroundSchedulePool::TaskHolder mutations_finalizing_task;
/// It is acquired for each iteration of the selection of parts to merge or each OPTIMIZE query.
std::mutex merge_selecting_mutex;
/// A thread that removes old parts, log entries, and blocks. /// A thread that removes old parts, log entries, and blocks.
std::unique_ptr<ReplicatedMergeTreeCleanupThread> cleanup_thread; ReplicatedMergeTreeCleanupThread cleanup_thread;
/// A thread that processes reconnection to ZooKeeper when the session expires.
std::unique_ptr<ReplicatedMergeTreeRestartingThread> restarting_thread;
/// A thread monitoring changes to the column list in ZooKeeper and updating the parts in accordance with these changes. /// A thread monitoring changes to the column list in ZooKeeper and updating the parts in accordance with these changes.
std::unique_ptr<ReplicatedMergeTreeAlterThread> alter_thread; ReplicatedMergeTreeAlterThread alter_thread;
/// A thread that checks the data of the parts, as well as the queue of the parts to be checked. /// A thread that checks the data of the parts, as well as the queue of the parts to be checked.
ReplicatedMergeTreePartCheckThread part_check_thread; ReplicatedMergeTreePartCheckThread part_check_thread;
/// A thread that processes reconnection to ZooKeeper when the session expires.
std::unique_ptr<ReplicatedMergeTreeRestartingThread> restarting_thread;
/// An event that awakens `alter` method from waiting for the completion of the ALTER query. /// An event that awakens `alter` method from waiting for the completion of the ALTER query.
zkutil::EventPtr alter_query_event = std::make_shared<Poco::Event>(); zkutil::EventPtr alter_query_event = std::make_shared<Poco::Event>();