Merge remote-tracking branch 'upstream/master' into fix3

This commit is contained in:
proller 2018-08-22 17:06:19 +03:00
commit f47d93c92f
9 changed files with 43 additions and 37 deletions

View File

@ -85,8 +85,7 @@ private:
std::string node_path = node->getPath();
node_name = node_path.substr(node_path.find_last_of('/') + 1);
task->activate();
task->schedule();
task->activateAndSchedule();
}
void releaseNode()

View File

@ -43,17 +43,7 @@ bool BackgroundSchedulePool::TaskInfo::schedule()
if (deactivated || scheduled)
return false;
scheduled = true;
if (delayed)
pool.cancelDelayedTask(shared_from_this(), lock);
/// If the task is not executing at the moment, enqueue it for immediate execution.
/// But if it is currently executing, do nothing because it will be enqueued
/// at the end of the execute() method.
if (!executing)
pool.queue.enqueueNotification(new TaskNotification(shared_from_this()));
scheduleImpl(lock);
return true;
}
@ -89,6 +79,18 @@ void BackgroundSchedulePool::TaskInfo::activate()
deactivated = false;
}
bool BackgroundSchedulePool::TaskInfo::activateAndSchedule()
{
std::lock_guard lock(schedule_mutex);
deactivated = false;
if (scheduled)
return false;
scheduleImpl(lock);
return true;
}
void BackgroundSchedulePool::TaskInfo::execute()
{
Stopwatch watch;
@ -129,6 +131,20 @@ void BackgroundSchedulePool::TaskInfo::execute()
}
}
void BackgroundSchedulePool::TaskInfo::scheduleImpl(std::lock_guard<std::mutex> & schedule_mutex_lock)
{
scheduled = true;
if (delayed)
pool.cancelDelayedTask(shared_from_this(), schedule_mutex_lock);
/// If the task is not executing at the moment, enqueue it for immediate execution.
/// But if it is currently executing, do nothing because it will be enqueued
/// at the end of the execute() method.
if (!executing)
pool.queue.enqueueNotification(new TaskNotification(shared_from_this()));
}
zkutil::WatchCallback BackgroundSchedulePool::TaskInfo::getWatchCallback()
{
return [t = shared_from_this()](const ZooKeeperImpl::ZooKeeper::WatchResponse &)

View File

@ -50,11 +50,14 @@ public:
/// Schedule for execution after specified delay.
bool scheduleAfter(size_t ms);
/// Further attempts to schedule become no-op.
/// Further attempts to schedule become no-op. Will wait till the end of the current execution of the task.
void deactivate();
void activate();
/// Atomically activate task and schedule it for execution.
bool activateAndSchedule();
/// get zkutil::WatchCallback needed for notifications from ZooKeeper watches.
zkutil::WatchCallback getWatchCallback();
@ -64,6 +67,8 @@ public:
void execute();
void scheduleImpl(std::lock_guard<std::mutex> & schedule_mutex_lock);
BackgroundSchedulePool & pool;
std::string log_name;
TaskFunc function;

View File

@ -23,11 +23,7 @@ class ReplicatedMergeTreeAlterThread
public:
ReplicatedMergeTreeAlterThread(StorageReplicatedMergeTree & storage_);
void start()
{
task->activate();
task->schedule();
}
void start() { task->activateAndSchedule(); }
void stop() { task->deactivate(); }

View File

@ -23,11 +23,7 @@ class ReplicatedMergeTreeCleanupThread
public:
ReplicatedMergeTreeCleanupThread(StorageReplicatedMergeTree & storage_);
void start()
{
task->activate();
task->schedule();
}
void start() { task->activateAndSchedule(); }
void wakeup() { task->schedule(); }

View File

@ -35,8 +35,7 @@ void ReplicatedMergeTreePartCheckThread::start()
{
std::lock_guard<std::mutex> lock(start_stop_mutex);
need_stop = false;
task->activate();
task->schedule();
task->activateAndSchedule();
}
void ReplicatedMergeTreePartCheckThread::stop()

View File

@ -179,10 +179,9 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
storage.partial_shutdown_called = false;
storage.partial_shutdown_event.reset();
storage.queue_updating_task->activate();
storage.queue_updating_task->schedule();
storage.mutations_updating_task->activate();
storage.mutations_updating_task->schedule();
storage.queue_updating_task->activateAndSchedule();
storage.mutations_updating_task->activateAndSchedule();
storage.mutations_finalizing_task->activateAndSchedule();
storage.cleanup_thread.start();
storage.alter_thread.start();
storage.part_check_thread.start();
@ -328,6 +327,7 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown()
storage.queue_updating_task->deactivate();
storage.mutations_updating_task->deactivate();
storage.mutations_finalizing_task->deactivate();
storage.cleanup_thread.stop();
storage.alter_thread.stop();

View File

@ -24,11 +24,7 @@ class ReplicatedMergeTreeRestartingThread
public:
ReplicatedMergeTreeRestartingThread(StorageReplicatedMergeTree & storage_);
void start()
{
task->activate();
task->schedule();
}
void start() { task->activateAndSchedule(); }
void wakeup() { task->schedule(); }

View File

@ -2411,8 +2411,7 @@ void StorageReplicatedMergeTree::enterLeaderElection()
LOG_INFO(log, "Became leader");
is_leader = true;
merge_selecting_task->activate();
merge_selecting_task->schedule();
merge_selecting_task->activateAndSchedule();
};
try