easier init and deactivation of BackgroundSchedulePool tasks

Alexey Zatelepin 2018-05-31 16:05:05 +03:00
parent f6c72f8e56
commit d89221c458
16 changed files with 175 additions and 185 deletions

View File

@ -17,29 +17,22 @@ namespace DB
{
// TaskNotification
class TaskNotification final : public Poco::Notification
{
public:
explicit TaskNotification(const BackgroundSchedulePool::TaskHandle & task) : task(task) {}
explicit TaskNotification(const BackgroundSchedulePool::TaskInfoPtr & task) : task(task) {}
void execute() { task->execute(); }
private:
BackgroundSchedulePool::TaskHandle task;
BackgroundSchedulePool::TaskInfoPtr task;
};
// BackgroundSchedulePool::TaskInfo
BackgroundSchedulePool::TaskInfo::TaskInfo(BackgroundSchedulePool & pool, const std::string & name, const Task & function):
name(name),
pool(pool),
function(function)
BackgroundSchedulePool::TaskInfo::TaskInfo(BackgroundSchedulePool & pool_, const std::string & log_name_, const TaskFunc & function_)
: pool(pool_) , log_name(log_name_) , function(function_)
{
}
bool BackgroundSchedulePool::TaskInfo::schedule()
{
std::lock_guard lock(schedule_mutex);
@ -49,7 +42,7 @@ bool BackgroundSchedulePool::TaskInfo::schedule()
scheduled = true;
if(!executing)
if (!executing)
{
if (delayed)
pool.cancelDelayedTask(shared_from_this(), lock);
@ -60,7 +53,6 @@ bool BackgroundSchedulePool::TaskInfo::schedule()
return true;
}
bool BackgroundSchedulePool::TaskInfo::scheduleAfter(size_t ms)
{
std::lock_guard lock(schedule_mutex);
@ -72,7 +64,6 @@ bool BackgroundSchedulePool::TaskInfo::scheduleAfter(size_t ms)
return true;
}
void BackgroundSchedulePool::TaskInfo::deactivate()
{
std::lock_guard lock_exec(exec_mutex);
@ -88,16 +79,17 @@ void BackgroundSchedulePool::TaskInfo::deactivate()
pool.cancelDelayedTask(shared_from_this(), lock_schedule);
}
void BackgroundSchedulePool::TaskInfo::activate()
{
std::lock_guard lock(schedule_mutex);
deactivated = false;
}
void BackgroundSchedulePool::TaskInfo::execute()
{
Stopwatch watch;
CurrentMetrics::Increment metric_increment{CurrentMetrics::BackgroundSchedulePoolTask};
std::lock_guard lock_exec(exec_mutex);
{
@ -110,17 +102,14 @@ void BackgroundSchedulePool::TaskInfo::execute()
executing = true;
}
CurrentMetrics::Increment metric_increment{CurrentMetrics::BackgroundSchedulePoolTask};
Stopwatch watch;
function();
UInt64 milliseconds = watch.elapsedMilliseconds();
/// If the task execution takes longer than the specified time, it will be logged.
static const int32_t slow_execution_threshold_ms = 50;
static const int32_t slow_execution_threshold_ms = 200;
if (milliseconds >= slow_execution_threshold_ms)
LOG_INFO(&Logger::get("BackgroundSchedulePool"), "Executing " << name << " took " << milliseconds << " ms.");
LOG_TRACE(&Logger::get(log_name), "Execution took " << milliseconds << " ms.");
{
std::lock_guard lock_schedule(schedule_mutex);
@ -132,7 +121,7 @@ void BackgroundSchedulePool::TaskInfo::execute()
/// will have their chance to execute
if(scheduled)
pool.queue.enqueueNotification(new TaskNotification(shared_from_this()));
pool.queue.enqueueNotification(new TaskNotification(shared_from_this()));
}
}
@ -145,8 +134,6 @@ zkutil::WatchCallback BackgroundSchedulePool::TaskInfo::getWatchCallback()
}
// BackgroundSchedulePool
BackgroundSchedulePool::BackgroundSchedulePool(size_t size)
: size(size)
{
@ -165,7 +152,7 @@ BackgroundSchedulePool::~BackgroundSchedulePool()
try
{
{
std::unique_lock lock(delayed_tasks_lock);
std::unique_lock lock(delayed_tasks_mutex);
shutdown = true;
wakeup_cond.notify_all();
}
@ -184,24 +171,18 @@ BackgroundSchedulePool::~BackgroundSchedulePool()
}
BackgroundSchedulePool::TaskHandle BackgroundSchedulePool::addTask(const std::string & name, const Task & task)
BackgroundSchedulePool::TaskHolder BackgroundSchedulePool::createTask(const std::string & name, const TaskFunc & function)
{
return std::make_shared<TaskInfo>(*this, name, task);
return TaskHolder(std::make_shared<TaskInfo>(*this, name, function));
}
void BackgroundSchedulePool::removeTask(const TaskHandle & task)
{
task->deactivate();
}
void BackgroundSchedulePool::scheduleDelayedTask(const TaskHandle & task, size_t ms, std::lock_guard<std::mutex> & /* schedule_mutex_lock */)
void BackgroundSchedulePool::scheduleDelayedTask(const TaskInfoPtr & task, size_t ms, std::lock_guard<std::mutex> & /* task_schedule_mutex_lock */)
{
Poco::Timestamp current_time;
{
std::lock_guard lock(delayed_tasks_lock);
std::lock_guard lock(delayed_tasks_mutex);
if (task->delayed)
delayed_tasks.erase(task->iterator);
@ -214,10 +195,10 @@ void BackgroundSchedulePool::scheduleDelayedTask(const TaskHandle & task, size_t
}
void BackgroundSchedulePool::cancelDelayedTask(const TaskHandle & task, std::lock_guard<std::mutex> & /* schedule_mutex_lock */)
void BackgroundSchedulePool::cancelDelayedTask(const TaskInfoPtr & task, std::lock_guard<std::mutex> & /* task_schedule_mutex_lock */)
{
{
std::lock_guard lock(delayed_tasks_lock);
std::lock_guard lock(delayed_tasks_mutex);
delayed_tasks.erase(task->iterator);
task->delayed = false;
}
@ -253,11 +234,11 @@ void BackgroundSchedulePool::delayExecutionThreadFunction()
while (!shutdown)
{
TaskHandle task;
TaskInfoPtr task;
bool found = false;
{
std::unique_lock lock(delayed_tasks_lock);
std::unique_lock lock(delayed_tasks_mutex);
while(!shutdown)
{

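The hunks above rename the public types (TaskHandle becomes TaskInfoPtr, Task becomes TaskFunc) but leave the scheduling protocol intact. A minimal self-contained sketch of that protocol, simplified from the code above (the real TaskInfo also serializes executions with exec_mutex and enqueues a TaskNotification into a Poco notification queue; `enqueue` below is a stand-in for pool.queue.enqueueNotification):

#include <functional>
#include <mutex>

struct TaskSketch
{
    std::mutex schedule_mutex;
    bool deactivated = false;
    bool scheduled = false;
    bool executing = false;
    std::function<void()> function;  /// the user-supplied task body
    std::function<void()> enqueue;   /// hands the task to a pool worker (stand-in)

    bool schedule()
    {
        std::lock_guard lock(schedule_mutex);
        if (deactivated || scheduled)
            return false;
        scheduled = true;
        /// If we are mid-execution, execute() re-enqueues the task at the end,
        /// so that other tasks get their chance to run in between.
        if (!executing)
            enqueue();
        return true;
    }

    void execute()
    {
        {
            std::lock_guard lock(schedule_mutex);
            if (deactivated || !scheduled)
                return;
            scheduled = false;
            executing = true;
        }

        function();

        {
            std::lock_guard lock(schedule_mutex);
            executing = false;
            if (scheduled)  /// schedule() arrived while function() was running
                enqueue();
        }
    }
};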
View File

@ -33,16 +33,14 @@ class BackgroundSchedulePool
{
public:
class TaskInfo;
using TaskHandle = std::shared_ptr<TaskInfo>;
using Tasks = std::multimap<Poco::Timestamp, TaskHandle>;
using Task = std::function<void()>;
using TaskInfoPtr = std::shared_ptr<TaskInfo>;
using TaskFunc = std::function<void()>;
using DelayedTasks = std::multimap<Poco::Timestamp, TaskInfoPtr>;
class TaskInfo : public std::enable_shared_from_this<TaskInfo>, private boost::noncopyable
{
public:
TaskInfo(BackgroundSchedulePool & pool, const std::string & name, const Task & function);
/// All these methods wait for the current execution of the task.
TaskInfo(BackgroundSchedulePool & pool_, const std::string & log_name_, const TaskFunc & function_);
/// Schedule for execution as soon as possible (if not already scheduled).
/// If the task was already scheduled with delay, the delay will be ignored.
@ -53,10 +51,10 @@ public:
/// Further attempts to schedule become no-ops.
void deactivate();
void activate();
/// get zkutil::WatchCallback needed for zookeeper callbacks.
/// get zkutil::WatchCallback needed for notifications from ZooKeeper watches.
zkutil::WatchCallback getWatchCallback();
private:
@ -65,28 +63,52 @@ public:
void execute();
std::mutex schedule_mutex;
std::mutex exec_mutex;
BackgroundSchedulePool & pool;
std::string log_name;
TaskFunc function;
std::mutex exec_mutex;
std::mutex schedule_mutex;
std::string name;
bool deactivated = false;
bool scheduled = false;
bool delayed = false;
bool executing = false;
BackgroundSchedulePool & pool;
Task function;
/// If the task is scheduled with a delay, points to the element of delayed_tasks.
Tasks::iterator iterator;
DelayedTasks::iterator iterator;
};
class TaskHolder
{
public:
TaskHolder() = default;
explicit TaskHolder(const TaskInfoPtr & task_info_) : task_info(task_info_) {}
TaskHolder(const TaskHolder & other) = delete;
TaskHolder(TaskHolder && other) noexcept = default;
TaskHolder & operator=(const TaskHolder & other) noexcept = delete;
TaskHolder & operator=(TaskHolder && other) noexcept = default;
~TaskHolder()
{
if (task_info)
task_info->deactivate();
}
TaskInfo * operator->() { return task_info.get(); }
const TaskInfo * operator->() const { return task_info.get(); }
private:
TaskInfoPtr task_info;
};
TaskHolder createTask(const std::string & log_name, const TaskFunc & function);
size_t getNumberOfThreads() const { return size; }
BackgroundSchedulePool(size_t size);
~BackgroundSchedulePool();
TaskHandle addTask(const std::string & name, const Task & task);
void removeTask(const TaskHandle & task);
size_t getNumberOfThreads() const { return size; }
private:
using Threads = std::vector<std::thread>;
@ -94,10 +116,10 @@ private:
void delayExecutionThreadFunction();
/// Schedule task for execution after specified delay from now.
void scheduleDelayedTask(const TaskHandle & task, size_t ms, std::lock_guard<std::mutex> &);
void scheduleDelayedTask(const TaskInfoPtr & task_info, size_t ms, std::lock_guard<std::mutex> & task_schedule_mutex_lock);
/// Remove a task that was scheduled with a delay from the schedule.
void cancelDelayedTask(const TaskHandle & task, std::lock_guard<std::mutex> &);
void cancelDelayedTask(const TaskInfoPtr & task_info, std::lock_guard<std::mutex> & task_schedule_mutex_lock);
/// Number of worker threads.
const size_t size;
@ -108,11 +130,11 @@ private:
/// Delayed notifications.
std::condition_variable wakeup_cond;
std::mutex delayed_tasks_lock;
std::mutex delayed_tasks_mutex;
/// Thread waiting for next delayed task.
std::thread delayed_thread;
/// Tasks ordered by scheduled time.
Tasks delayed_tasks;
DelayedTasks delayed_tasks;
};
using BackgroundSchedulePoolPtr = std::shared_ptr<BackgroundSchedulePool>;

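Taken together, the header changes give tasks an RAII lifecycle: createTask() replaces the old addTask()/removeTask() pair, and ~TaskHolder() calls deactivate() automatically. A hypothetical call site (the Example class and its run() body are illustrative, not part of this commit):

class Example
{
public:
    explicit Example(DB::BackgroundSchedulePool & pool)
    {
        /// createTask() only constructs the TaskInfo; nothing runs until schedule().
        task = pool.createTask("Example", [this] { run(); });
        task->schedule();
    }

    /// No explicit destructor needed: ~TaskHolder() calls deactivate(), which
    /// waits for any in-flight execution (it takes exec_mutex) before returning.

private:
    void run()
    {
        /// ... one iteration of work, then re-arm:
        task->scheduleAfter(1000);  /// milliseconds
    }

    DB::BackgroundSchedulePool::TaskHolder task;
};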
View File

@ -39,8 +39,10 @@ public:
*/
LeaderElection(DB::BackgroundSchedulePool & pool_, const std::string & path_, ZooKeeper & zookeeper_, LeadershipHandler handler_, const std::string & identifier_ = "")
: pool(pool_), path(path_), zookeeper(zookeeper_), handler(handler_), identifier(identifier_)
, log_name("LeaderElection (" + path + ")")
, log(&Logger::get(log_name))
{
task_handle = pool.addTask("LeaderElection", [this] { threadFunction(); });
task = pool.createTask(log_name, [this] { threadFunction(); });
createNode();
}
@ -50,22 +52,23 @@ public:
return;
shutdown_called = true;
task_handle->deactivate();
task->deactivate();
}
~LeaderElection()
{
releaseNode();
pool.removeTask(task_handle);
}
private:
DB::BackgroundSchedulePool & pool;
DB::BackgroundSchedulePool::TaskHandle task_handle;
DB::BackgroundSchedulePool::TaskHolder task;
std::string path;
ZooKeeper & zookeeper;
LeadershipHandler handler;
std::string identifier;
std::string log_name;
Logger * log;
EphemeralNodeHolderPtr node;
std::string node_name;
@ -82,8 +85,8 @@ private:
std::string node_path = node->getPath();
node_name = node_path.substr(node_path.find_last_of('/') + 1);
task_handle->activate();
task_handle->schedule();
task->activate();
task->schedule();
}
void releaseNode()
@ -111,25 +114,25 @@ private:
return;
}
if (!zookeeper.existsWatch(path + "/" + *(it - 1), nullptr, task_handle->getWatchCallback()))
task_handle->schedule();
if (!zookeeper.existsWatch(path + "/" + *(it - 1), nullptr, task->getWatchCallback()))
task->schedule();
success = true;
}
catch (const KeeperException & e)
{
DB::tryLogCurrentException("LeaderElection");
DB::tryLogCurrentException(log);
if (e.code == ZooKeeperImpl::ZooKeeper::ZSESSIONEXPIRED)
return;
}
catch (...)
{
DB::tryLogCurrentException("LeaderElection");
DB::tryLogCurrentException(log);
}
if (!success)
task_handle->scheduleAfter(10 * 1000);
task->scheduleAfter(10 * 1000);
}
};

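LeaderElection now wires a ZooKeeper watch straight to the task via getWatchCallback(), so a change of the watched node re-runs threadFunction(), and the old explicit pool.removeTask(task_handle) in the destructor disappears. The diff shows only the declaration; a plausible sketch of what such a callback returns (the parameter is left generic because the exact WatchResponse type is an assumption here):

zkutil::WatchCallback BackgroundSchedulePool::TaskInfo::getWatchCallback()
{
    /// Keep the TaskInfo alive for as long as the watch can fire.
    return [t = shared_from_this()](auto && /* watch_response */)
    {
        t->schedule();
    };
}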
View File

@ -14,17 +14,13 @@ namespace DB
static const auto ALTER_ERROR_SLEEP_MS = 10 * 1000;
ReplicatedMergeTreeAlterThread::ReplicatedMergeTreeAlterThread(StorageReplicatedMergeTree & storage_) :
storage(storage_),
log(&Logger::get(storage.database_name + "." + storage.table_name + " (StorageReplicatedMergeTree, AlterThread)"))
{
task_handle = storage_.context.getSchedulePool().addTask("ReplicatedMergeTreeAlterThread", [this]{run();});
task_handle->schedule();
}
ReplicatedMergeTreeAlterThread::~ReplicatedMergeTreeAlterThread()
ReplicatedMergeTreeAlterThread::ReplicatedMergeTreeAlterThread(StorageReplicatedMergeTree & storage_)
: storage(storage_)
, log_name(storage.database_name + "." + storage.table_name + " (ReplicatedMergeTreeAlterThread)")
, log(&Logger::get(log_name))
{
storage.context.getSchedulePool().removeTask(task_handle);
task = storage_.context.getSchedulePool().createTask(log_name, [this]{ run(); });
task->schedule();
}
void ReplicatedMergeTreeAlterThread::run()
@ -59,7 +55,7 @@ void ReplicatedMergeTreeAlterThread::run()
auto zookeeper = storage.getZooKeeper();
zkutil::Stat stat;
const String columns_str = zookeeper->getWatch(storage.zookeeper_path + "/columns", &stat, task_handle->getWatchCallback());
const String columns_str = zookeeper->getWatch(storage.zookeeper_path + "/columns", &stat, task->getWatchCallback());
auto columns_in_zk = ColumnsDescription::parse(columns_str);
bool changed_version = (stat.version != storage.columns_version);
@ -197,14 +193,14 @@ void ReplicatedMergeTreeAlterThread::run()
return;
force_recheck_parts = true;
task_handle->scheduleAfter(ALTER_ERROR_SLEEP_MS);
task->scheduleAfter(ALTER_ERROR_SLEEP_MS);
}
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
force_recheck_parts = true;
task_handle->scheduleAfter(ALTER_ERROR_SLEEP_MS);
task->scheduleAfter(ALTER_ERROR_SLEEP_MS);
}
}

View File

@ -22,14 +22,14 @@ class ReplicatedMergeTreeAlterThread
{
public:
ReplicatedMergeTreeAlterThread(StorageReplicatedMergeTree & storage_);
~ReplicatedMergeTreeAlterThread();
private:
void run();
StorageReplicatedMergeTree & storage;
String log_name;
Logger * log;
BackgroundSchedulePool::TaskHandle task_handle;
BackgroundSchedulePool::TaskHolder task;
};
}

View File

@ -304,7 +304,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
if (multi_code == ZooKeeperImpl::ZooKeeper::ZOK)
{
transaction.commit();
storage.merge_selecting_task_handle->schedule();
storage.merge_selecting_task->schedule();
/// Lock nodes have already been deleted, do not delete them in the destructor
block_number_lock->assumeUnlocked();

View File

@ -16,16 +16,12 @@ namespace ErrorCodes
ReplicatedMergeTreeCleanupThread::ReplicatedMergeTreeCleanupThread(StorageReplicatedMergeTree & storage_)
: storage(storage_),
log(&Logger::get(storage.database_name + "." + storage.table_name + " (StorageReplicatedMergeTree, CleanupThread)"))
: storage(storage_)
, log_name(storage.database_name + "." + storage.table_name + " (ReplicatedMergeTreeCleanupThread)")
, log(&Logger::get(log_name))
{
task_handle = storage.context.getSchedulePool().addTask("ReplicatedMergeTreeCleanupThread", [this]{ run(); });
task_handle->schedule();
}
ReplicatedMergeTreeCleanupThread::~ReplicatedMergeTreeCleanupThread()
{
storage.context.getSchedulePool().removeTask(task_handle);
task = storage.context.getSchedulePool().createTask(log_name, [this]{ run(); });
task->schedule();
}
void ReplicatedMergeTreeCleanupThread::run()
@ -49,7 +45,7 @@ void ReplicatedMergeTreeCleanupThread::run()
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
task_handle->scheduleAfter(CLEANUP_SLEEP_MS);
task->scheduleAfter(CLEANUP_SLEEP_MS);
}

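The cleanup thread shows the periodic idiom this API encourages: no dedicated thread sleeping in a loop, just a task function that re-arms itself. Condensed from the hunks above (the body of the iteration is elided):

void ReplicatedMergeTreeCleanupThread::run()
{
    try
    {
        /// ... one cleanup iteration: old log entries, old blocks, etc.
    }
    catch (...)
    {
        tryLogCurrentException(log, __PRETTY_FUNCTION__);
    }

    /// Re-arm unconditionally; an error simply means we retry after the same delay.
    task->scheduleAfter(CLEANUP_SLEEP_MS);
}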
View File

@ -24,14 +24,13 @@ class ReplicatedMergeTreeCleanupThread
public:
ReplicatedMergeTreeCleanupThread(StorageReplicatedMergeTree & storage_);
~ReplicatedMergeTreeCleanupThread();
void schedule() { task_handle->schedule(); }
void schedule() { task->schedule(); }
private:
StorageReplicatedMergeTree & storage;
String log_name;
Logger * log;
BackgroundSchedulePool::TaskHandle task_handle;
BackgroundSchedulePool::TaskHolder task;
pcg64 rng;
void run();

View File

@ -18,25 +18,25 @@ static const auto PART_CHECK_ERROR_SLEEP_MS = 5 * 1000;
ReplicatedMergeTreePartCheckThread::ReplicatedMergeTreePartCheckThread(StorageReplicatedMergeTree & storage_)
: storage(storage_),
log(&Logger::get(storage.database_name + "." + storage.table_name + " (StorageReplicatedMergeTree, PartCheckThread)"))
: storage(storage_)
, log_name(storage.database_name + "." + storage.table_name + " (ReplicatedMergeTreePartCheckThread)")
, log(&Logger::get(log_name))
{
task_handle = storage.context.getSchedulePool().addTask("ReplicatedMergeTreePartCheckThread", [this] { run(); });
task_handle->schedule();
task = storage.context.getSchedulePool().createTask(log_name, [this] { run(); });
task->schedule();
}
ReplicatedMergeTreePartCheckThread::~ReplicatedMergeTreePartCheckThread()
{
stop();
storage.context.getSchedulePool().removeTask(task_handle);
}
void ReplicatedMergeTreePartCheckThread::start()
{
std::lock_guard<std::mutex> lock(start_stop_mutex);
need_stop = false;
task_handle->activate();
task_handle->schedule();
task->activate();
task->schedule();
}
void ReplicatedMergeTreePartCheckThread::stop()
@ -46,7 +46,7 @@ void ReplicatedMergeTreePartCheckThread::stop()
std::lock_guard<std::mutex> lock(start_stop_mutex);
need_stop = true;
task_handle->deactivate();
task->deactivate();
}
void ReplicatedMergeTreePartCheckThread::enqueuePart(const String & name, time_t delay_to_check_seconds)
@ -58,7 +58,7 @@ void ReplicatedMergeTreePartCheckThread::enqueuePart(const String & name, time_t
parts_queue.emplace_back(name, time(nullptr) + delay_to_check_seconds);
parts_set.insert(name);
task_handle->schedule();
task->schedule();
}
@ -340,7 +340,7 @@ void ReplicatedMergeTreePartCheckThread::run()
}
}
task_handle->schedule();
task->schedule();
}
catch (const zkutil::KeeperException & e)
{
@ -349,12 +349,12 @@ void ReplicatedMergeTreePartCheckThread::run()
if (e.code == ZooKeeperImpl::ZooKeeper::ZSESSIONEXPIRED)
return;
task_handle->scheduleAfter(PART_CHECK_ERROR_SLEEP_MS);
task->scheduleAfter(PART_CHECK_ERROR_SLEEP_MS);
}
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
task_handle->scheduleAfter(PART_CHECK_ERROR_SLEEP_MS);
task->scheduleAfter(PART_CHECK_ERROR_SLEEP_MS);
}
}

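start() and stop() above show that a task is reused across restarts rather than destroyed and recreated: deactivate() forbids further scheduling and waits for a running execution, activate() lifts the ban. The pattern in isolation (a sketch):

void start()
{
    task->activate();   /// clears the deactivated flag
    task->schedule();   /// kick off the next run
}

void stop()
{
    /// Returns only when no execution is in flight; until activate() is
    /// called again, schedule() and scheduleAfter() are no-ops.
    task->deactivate();
}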
View File

@ -75,6 +75,7 @@ private:
void searchForMissingPart(const String & part_name);
StorageReplicatedMergeTree & storage;
String log_name;
Logger * log;
using StringSet = std::set<String>;
@ -92,7 +93,7 @@ private:
std::mutex start_stop_mutex;
std::atomic<bool> need_stop { false };
BackgroundSchedulePool::TaskHandle task_handle;
BackgroundSchedulePool::TaskHolder task;
};
}

View File

@ -288,23 +288,20 @@ bool ReplicatedMergeTreeQueue::removeFromVirtualParts(const MergeTreePartInfo &
}
void ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, BackgroundSchedulePool::TaskHandle update_task_handle)
void ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, zkutil::WatchCallback watch_callback)
{
std::lock_guard lock(pull_logs_to_queue_mutex);
String index_str = zookeeper->get(replica_path + "/log_pointer");
UInt64 index;
zkutil::WatchCallback watch_callback;
if (update_task_handle)
watch_callback = update_task_handle->getWatchCallback();
Strings log_entries = zookeeper->getChildrenWatch(zookeeper_path + "/log", nullptr, watch_callback);
/// We update mutations after we have loaded the list of log entries, but before we insert them
/// in the queue.
/// With this we ensure that if you read the log state L1 and then the state of mutations M1,
/// then L1 "happened-before" M1.
updateMutations(zookeeper, nullptr);
updateMutations(zookeeper);
if (index_str.empty())
{
@ -434,13 +431,10 @@ void ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, B
}
}
void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, BackgroundSchedulePool::TaskHandle update_task_handle)
void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, zkutil::WatchCallback watch_callback)
{
std::lock_guard lock(update_mutations_mutex);
zkutil::WatchCallback watch_callback;
if (update_task_handle)
watch_callback = update_task_handle->getWatchCallback();
Strings entries_in_zk = zookeeper->getChildrenWatch(zookeeper_path + "/mutations", nullptr, watch_callback);
StringSet entries_in_zk_set(entries_in_zk.begin(), entries_in_zk.end());
@ -506,7 +500,7 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, B
}
}
storage.merge_selecting_task_handle->schedule();
storage.merge_selecting_task->schedule();
}
}
@ -1099,7 +1093,7 @@ ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
}
}
queue_.pullLogsToQueue(zookeeper, nullptr);
queue_.pullLogsToQueue(zookeeper);
/// Load current quorum status.
zookeeper->tryGet(queue.zookeeper_path + "/quorum/last_part", last_quorum_part);

View File

@ -214,15 +214,15 @@ public:
bool removeFromVirtualParts(const MergeTreePartInfo & part_info);
/** Copy the new entries from the shared log to the queue of this replica. Set the log_pointer to the appropriate value.
* If update_task_handle != nullptr, will schedule this task when new entries appear in the log.
* If watch_callback is not empty, will call it when new entries appear in the log.
* If there were new entries, notifies storage.queue_task_handle.
* Additionally loads mutations (so that the set of mutations is always more recent than the queue).
*/
void pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, BackgroundSchedulePool::TaskHandle update_task_handle);
void pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, zkutil::WatchCallback watch_callback = {});
/// Load new mutation entries. If something new is loaded, schedule storage.merge_selecting_task_handle.
/// If update_task_handle != nullptr, will schedule this task when new mutations appear in ZK.
void updateMutations(zkutil::ZooKeeperPtr zookeeper, BackgroundSchedulePool::TaskHandle update_task_handle);
/// Load new mutation entries. If something new is loaded, schedule storage.merge_selecting_task.
/// If watch_callback is not empty, will call it when new mutations appear in ZK.
void updateMutations(zkutil::ZooKeeperPtr zookeeper, zkutil::WatchCallback watch_callback = {});
/** Remove the action from the queue with the parts covered by part_name (from ZK and from the RAM).
* And also wait for the completion of their execution, if they are now being executed.

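The queue API drops the TaskHandle parameter in favor of a plain zkutil::WatchCallback defaulted to {}. WatchCallback is a std::function, so a default-constructed value is empty, and the ZooKeeper wrappers treat an empty callback as "set no watch"; this removes the `if (update_task_handle)` branching from pullLogsToQueue() and updateMutations(). The resulting call sites, as seen elsewhere in this commit:

/// Without a watch (merge predicate, startup, queue-size waiting):
queue.pullLogsToQueue(zookeeper);

/// With a watch: re-schedule the updating task when new entries appear.
queue.pullLogsToQueue(zookeeper, queue_updating_task->getWatchCallback());
queue.updateMutations(zookeeper, mutations_updating_task->getWatchCallback());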
View File

@ -40,9 +40,10 @@ static String generateActiveNodeIdentifier()
}
ReplicatedMergeTreeRestartingThread::ReplicatedMergeTreeRestartingThread(StorageReplicatedMergeTree & storage_)
: storage(storage_),
log(&Logger::get(storage.database_name + "." + storage.table_name + " (StorageReplicatedMergeTree, RestartingThread)")),
active_node_identifier(generateActiveNodeIdentifier())
: storage(storage_)
, log_name(storage.database_name + "." + storage.table_name + " (ReplicatedMergeTreeRestartingThread)")
, log(&Logger::get(log_name))
, active_node_identifier(generateActiveNodeIdentifier())
{
check_period_ms = storage.data.settings.zookeeper_session_expiration_check_period.totalSeconds() * 1000;
@ -50,20 +51,13 @@ ReplicatedMergeTreeRestartingThread::ReplicatedMergeTreeRestartingThread(Storage
if (check_period_ms > static_cast<Int64>(storage.data.settings.check_delay_period) * 1000)
check_period_ms = storage.data.settings.check_delay_period * 1000;
storage.queue_updating_task_handle = storage.context.getSchedulePool().addTask("StorageReplicatedMergeTree::queueUpdatingThread", [this]{ storage.queueUpdatingThread(); });
storage.queue_updating_task_handle->deactivate();
storage.mutations_updating_task_handle = storage.context.getSchedulePool().addTask("StorageReplicatedMergeTree::mutationsUpdatingThread", [this]{ storage.mutationsUpdatingThread(); });
task_handle = storage.context.getSchedulePool().addTask("ReplicatedMergeTreeRestartingThread", [this]{ run(); });
task_handle->schedule();
task = storage.context.getSchedulePool().createTask(log_name, [this]{ run(); });
task->schedule();
}
ReplicatedMergeTreeRestartingThread::~ReplicatedMergeTreeRestartingThread()
{
storage.context.getSchedulePool().removeTask(task_handle);
completeShutdown();
storage.context.getSchedulePool().removeTask(storage.queue_updating_task_handle);
}
void ReplicatedMergeTreeRestartingThread::run()
@ -105,7 +99,7 @@ void ReplicatedMergeTreeRestartingThread::run()
if (first_time)
storage.startup_event.set();
task_handle->scheduleAfter(retry_period_ms);
task->scheduleAfter(retry_period_ms);
return;
}
@ -113,7 +107,7 @@ void ReplicatedMergeTreeRestartingThread::run()
{
if (first_time)
storage.startup_event.set();
task_handle->scheduleAfter(retry_period_ms);
task->scheduleAfter(retry_period_ms);
return;
}
@ -170,7 +164,7 @@ void ReplicatedMergeTreeRestartingThread::run()
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
task_handle->scheduleAfter(check_period_ms);
task->scheduleAfter(check_period_ms);
}
void ReplicatedMergeTreeRestartingThread::completeShutdown()
@ -214,10 +208,10 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
storage.shutdown_called = false;
storage.shutdown_event.reset();
storage.queue_updating_task_handle->activate();
storage.queue_updating_task_handle->schedule();
storage.mutations_updating_task_handle->activate();
storage.mutations_updating_task_handle->schedule();
storage.queue_updating_task->activate();
storage.queue_updating_task->schedule();
storage.mutations_updating_task->activate();
storage.mutations_updating_task->schedule();
storage.part_check_thread.start();
storage.alter_thread = std::make_unique<ReplicatedMergeTreeAlterThread>(storage);
storage.cleanup_thread = std::make_unique<ReplicatedMergeTreeCleanupThread>(storage);
@ -365,8 +359,8 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown()
storage.exitLeaderElection();
storage.queue_updating_task_handle->deactivate();
storage.mutations_updating_task_handle->deactivate();
storage.queue_updating_task->deactivate();
storage.mutations_updating_task->deactivate();
storage.cleanup_thread.reset();
storage.alter_thread.reset();

View File

@ -28,7 +28,7 @@ public:
void wakeup()
{
wakeup_event.set();
task_handle->schedule();
task->schedule();
}
Poco::Event & getWakeupEvent()
@ -44,6 +44,7 @@ public:
private:
StorageReplicatedMergeTree & storage;
String log_name;
Logger * log;
Poco::Event wakeup_event;
std::atomic<bool> need_stop {false};
@ -51,7 +52,7 @@ private:
/// The random data we wrote into `/replicas/me/is_active`.
String active_node_identifier;
BackgroundSchedulePool::TaskHandle task_handle;
BackgroundSchedulePool::TaskHolder task;
Int64 check_period_ms; /// The frequency of checking expiration of session in ZK.
bool first_time = true; /// Activate replica for the first time.
time_t prev_time_of_check_delay = 0;

View File

@ -227,11 +227,17 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
zookeeper_path = "/" + zookeeper_path;
replica_path = zookeeper_path + "/replicas/" + replica_name;
queue_updating_task = context.getSchedulePool().createTask(database_name + "." + table_name + " (StorageReplicatedMergeTree::queueUpdatingTask)", [this]{ queueUpdatingTask(); });
mutations_updating_task = context.getSchedulePool().createTask(database_name + "." + table_name + " (StorageReplicatedMergeTree::mutationsUpdatingTask)", [this]{ mutationsUpdatingTask(); });
merge_selecting_task = context.getSchedulePool().createTask(database_name + "." + table_name + " (StorageReplicatedMergeTree::mergeSelectingTask)", [this] { mergeSelectingTask(); });
/// Will be activated if we win leader election.
merge_selecting_task->deactivate();
if (context.hasZooKeeper())
current_zookeeper = context.getZooKeeper();
merge_selecting_task_handle = context_.getSchedulePool().addTask("StorageReplicatedMergeTree::mergeSelectingThread", [this] { mergeSelectingThread(); });
bool skip_sanity_checks = false;
if (current_zookeeper && current_zookeeper->exists(replica_path + "/flags/force_restore_data"))
@ -1298,7 +1304,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const StorageReplicatedMergeTre
/** With `ZSESSIONEXPIRED` or `ZOPERATIONTIMEOUT`, we can inadvertently roll back local changes to the parts.
* This is not a problem, because in this case the merge will remain in the queue, and we will try again.
*/
merge_selecting_task_handle->schedule();
merge_selecting_task->schedule();
ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges);
write_part_log({});
@ -1406,7 +1412,7 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
/** With `ZSESSIONEXPIRED` or `ZOPERATIONTIMEOUT`, we can inadvertently roll back local changes to the parts.
* This is not a problem, because in this case the entry will remain in the queue, and we will try again.
*/
merge_selecting_task_handle->schedule();
merge_selecting_task->schedule();
ProfileEvents::increment(ProfileEvents::ReplicatedPartMutations);
write_part_log({});
@ -2026,7 +2032,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const StorageReplicatedMerg
}
void StorageReplicatedMergeTree::queueUpdatingThread()
void StorageReplicatedMergeTree::queueUpdatingTask()
{
// Most probably this check is not relevant
if (shutdown_called)
@ -2039,7 +2045,7 @@ void StorageReplicatedMergeTree::queueUpdatingThread()
}
try
{
queue.pullLogsToQueue(getZooKeeper(), queue_updating_task_handle);
queue.pullLogsToQueue(getZooKeeper(), queue_updating_task->getWatchCallback());
last_queue_update_finish_time.store(time(nullptr));
queue_update_in_progress = false;
}
@ -2050,21 +2056,21 @@ void StorageReplicatedMergeTree::queueUpdatingThread()
if (e.code == ZooKeeperImpl::ZooKeeper::ZSESSIONEXPIRED)
return;
queue_updating_task_handle->scheduleAfter(QUEUE_UPDATE_ERROR_SLEEP_MS);
queue_updating_task->scheduleAfter(QUEUE_UPDATE_ERROR_SLEEP_MS);
}
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
queue_updating_task_handle->scheduleAfter(QUEUE_UPDATE_ERROR_SLEEP_MS);
queue_updating_task->scheduleAfter(QUEUE_UPDATE_ERROR_SLEEP_MS);
}
}
void StorageReplicatedMergeTree::mutationsUpdatingThread()
void StorageReplicatedMergeTree::mutationsUpdatingTask()
{
try
{
queue.updateMutations(getZooKeeper(), mutations_updating_task_handle);
queue.updateMutations(getZooKeeper(), mutations_updating_task->getWatchCallback());
}
catch (const zkutil::KeeperException & e)
{
@ -2073,12 +2079,12 @@ void StorageReplicatedMergeTree::mutationsUpdatingThread()
if (e.code == ZooKeeperImpl::ZooKeeper::ZSESSIONEXPIRED)
return;
mutations_updating_task_handle->scheduleAfter(QUEUE_UPDATE_ERROR_SLEEP_MS);
mutations_updating_task->scheduleAfter(QUEUE_UPDATE_ERROR_SLEEP_MS);
}
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
mutations_updating_task_handle->scheduleAfter(QUEUE_UPDATE_ERROR_SLEEP_MS);
mutations_updating_task->scheduleAfter(QUEUE_UPDATE_ERROR_SLEEP_MS);
}
}
@ -2158,7 +2164,7 @@ bool StorageReplicatedMergeTree::queueTask()
}
void StorageReplicatedMergeTree::mergeSelectingThread()
void StorageReplicatedMergeTree::mergeSelectingTask()
{
if (!is_leader)
return;
@ -2232,9 +2238,9 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
return;
if (!success)
merge_selecting_task_handle->scheduleAfter(MERGE_SELECTING_SLEEP_MS);
merge_selecting_task->scheduleAfter(MERGE_SELECTING_SLEEP_MS);
else
merge_selecting_task_handle->schedule();
merge_selecting_task->schedule();
}
@ -2378,8 +2384,8 @@ void StorageReplicatedMergeTree::enterLeaderElection()
LOG_INFO(log, "Became leader");
is_leader = true;
merge_selecting_task_handle->activate();
merge_selecting_task_handle->schedule();
merge_selecting_task->activate();
merge_selecting_task->schedule();
};
try
@ -2414,7 +2420,7 @@ void StorageReplicatedMergeTree::exitLeaderElection()
LOG_INFO(log, "Stopped being leader");
is_leader = false;
merge_selecting_task_handle->deactivate();
merge_selecting_task->deactivate();
}
/// Delete the node in ZK only after we have stopped the merge_selecting_thread - so that only one
@ -2697,7 +2703,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
if (quorum)
updateQuorum(part_name);
merge_selecting_task_handle->schedule();
merge_selecting_task->schedule();
for (const auto & replaced_part : replaced_parts)
{
@ -2737,7 +2743,7 @@ void StorageReplicatedMergeTree::startup()
database_name + "." + table_name + " (ReplicatedMergeTreeQueue)",
data.getDataParts(), current_zookeeper);
queue.pullLogsToQueue(current_zookeeper, nullptr);
queue.pullLogsToQueue(current_zookeeper);
last_queue_update_finish_time.store(time(nullptr));
/// NOTE: not updating last_queue_update_start_time because it must contain the time when
/// the notification of queue change was received. In the beginning it is effectively infinite.
@ -2783,8 +2789,6 @@ StorageReplicatedMergeTree::~StorageReplicatedMergeTree()
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
context.getSchedulePool().removeTask(merge_selecting_task_handle);
}
@ -3759,7 +3763,7 @@ void StorageReplicatedMergeTree::getReplicaDelays(time_t & out_absolute_delay, t
* The conclusion that the replica does not lag may be incorrect,
* because the information about `min_unprocessed_insert_time` is taken
* only from that part of the log that has been moved to the queue.
* If `queueUpdatingTask` on this replica has stalled for some reason,
* If the replica for some reason has stalled `queueUpdatingTask`,
* then `min_unprocessed_insert_time` will be incorrect.
*/
@ -4503,7 +4507,7 @@ ActionLock StorageReplicatedMergeTree::getActionLock(StorageActionBlockType acti
bool StorageReplicatedMergeTree::waitForShrinkingQueueSize(size_t queue_size, UInt64 max_wait_milliseconds)
{
/// Let's fetch new log entries firstly
queue.pullLogsToQueue(getZooKeeper(), nullptr);
queue.pullLogsToQueue(getZooKeeper());
Stopwatch watch;
Poco::Event event;

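scheduleAfter(), used for every retry path in this file, is backed by the delayed-tasks machinery from the first file of this commit: a multimap keyed by deadline plus a single thread that sleeps until the earliest one. A self-contained sketch of that mechanism (std::chrono and std::function stand in for Poco::Timestamp and TaskInfoPtr):

#include <chrono>
#include <condition_variable>
#include <functional>
#include <map>
#include <mutex>

struct DelayedTasksSketch
{
    using Clock = std::chrono::steady_clock;

    std::mutex delayed_tasks_mutex;
    std::condition_variable wakeup_cond;
    std::multimap<Clock::time_point, std::function<void()>> delayed_tasks;
    bool shutdown = false;

    void scheduleAfter(std::function<void()> schedule_task, size_t ms)
    {
        std::lock_guard lock(delayed_tasks_mutex);
        delayed_tasks.emplace(Clock::now() + std::chrono::milliseconds(ms),
                              std::move(schedule_task));
        wakeup_cond.notify_all();  /// the delay thread re-checks the earliest deadline
    }

    /// Runs in its own thread (delayExecutionThreadFunction in the real pool).
    void delayExecutionThreadFunction()
    {
        std::unique_lock lock(delayed_tasks_mutex);
        while (!shutdown)
        {
            if (delayed_tasks.empty())
            {
                wakeup_cond.wait(lock);
            }
            else if (auto first = delayed_tasks.begin(); first->first <= Clock::now())
            {
                auto ready = std::move(first->second);
                delayed_tasks.erase(first);

                lock.unlock();
                ready();  /// in the real pool: task->schedule(), handing over to a worker
                lock.lock();
            }
            else
            {
                /// Sleep until the earliest deadline or until a new task arrives.
                wakeup_cond.wait_until(lock, first->first);
            }
        }
    }
};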
View File

@ -198,7 +198,6 @@ private:
friend struct ReplicatedMergeTreeLogEntry;
friend class ScopedPartitionMergeLock;
friend class ReplicatedMergeTreeQueue;
friend class ReplicatedMergeTreeMergeSelectingThread;
friend class MergeTreeData;
using LogEntry = ReplicatedMergeTreeLogEntry;
@ -269,15 +268,15 @@ private:
/// A task that keeps track of the updates in the logs of all replicas and loads them into the queue.
bool queue_update_in_progress = false;
BackgroundSchedulePool::TaskHandle queue_updating_task_handle;
BackgroundSchedulePool::TaskHolder queue_updating_task;
BackgroundSchedulePool::TaskHandle mutations_updating_task_handle;
BackgroundSchedulePool::TaskHolder mutations_updating_task;
/// A task that performs actions from the queue.
BackgroundProcessingPool::TaskHandle queue_task_handle;
/// A task that selects parts to merge.
BackgroundSchedulePool::TaskHandle merge_selecting_task_handle;
BackgroundSchedulePool::TaskHolder merge_selecting_task;
/// It is acquired for each iteration of the selection of parts to merge or each OPTIMIZE query.
std::mutex merge_selecting_mutex;
@ -385,9 +384,9 @@ private:
/** Updates the queue.
*/
void queueUpdatingThread();
void queueUpdatingTask();
void mutationsUpdatingThread();
void mutationsUpdatingTask();
/** Performs actions from the queue.
*/
@ -405,7 +404,7 @@ private:
/** Selects the parts to merge and writes to the log.
*/
void mergeSelectingThread();
void mergeSelectingTask();
/** Write the selected parts to merge into the log,
* Call when merge_selecting_mutex is locked.