2021-09-06 12:01:16 +00:00
|
|
|
#include <Storages/MergeTree/MergeTreeBackgroundExecutor.h>
|
2021-08-30 19:37:03 +00:00
|
|
|
|
2021-09-02 21:31:32 +00:00
|
|
|
#include <Common/setThreadName.h>
|
2021-09-06 12:01:16 +00:00
|
|
|
#include <Storages/MergeTree/BackgroundJobsAssignee.h>
|
2021-08-30 19:37:03 +00:00
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2021-08-31 12:09:35 +00:00
|
|
|
|
2021-09-02 10:39:27 +00:00
|
|
|
/// Maps an executor type to the string name used for it.
String MergeTreeBackgroundExecutor::toString(Type type)
{
    switch (type)
    {
        case Type::MERGE_MUTATE: return "MergeMutate";
        case Type::FETCH:        return "Fetch";
        case Type::MOVE:         return "Move";
    }

    /// Control gets here only when `type` matches none of the cases above.
    __builtin_unreachable();
}
|
|
|
|
|
|
|
|
|
2021-09-03 22:15:20 +00:00
|
|
|
void MergeTreeBackgroundExecutor::updateConfiguration()
|
|
|
|
{
|
|
|
|
auto new_threads_count = threads_count_getter();
|
|
|
|
auto new_max_tasks_count = max_task_count_getter();
|
|
|
|
|
|
|
|
try
|
|
|
|
{
|
|
|
|
pending.set_capacity(new_max_tasks_count);
|
|
|
|
active.set_capacity(new_max_tasks_count);
|
|
|
|
|
|
|
|
pool.setMaxFreeThreads(new_threads_count);
|
|
|
|
pool.setMaxThreads(new_threads_count);
|
|
|
|
pool.setQueueSize(new_max_tasks_count);
|
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
|
|
|
}
|
|
|
|
|
|
|
|
threads_count = new_threads_count;
|
|
|
|
max_tasks_count = new_max_tasks_count;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
void MergeTreeBackgroundExecutor::wait()
|
|
|
|
{
|
|
|
|
{
|
|
|
|
std::lock_guard lock(mutex);
|
|
|
|
shutdown = true;
|
|
|
|
has_tasks.notify_all();
|
|
|
|
}
|
|
|
|
|
|
|
|
if (scheduler.joinable())
|
|
|
|
scheduler.join();
|
|
|
|
|
2021-09-06 12:01:16 +00:00
|
|
|
/// ThreadPool will be finalized in destructor.
|
2021-09-03 22:15:20 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/// Tries to put `task` into the pending queue.
/// Returns false if the executor is shutting down or if the current value of
/// the metric has already reached `max_tasks_count`; returns true after the
/// task has been queued and the scheduler has been notified.
bool MergeTreeBackgroundExecutor::trySchedule(ExecutableTaskPtr task)
{
    std::lock_guard guard(mutex);

    if (shutdown)
        return false;

    try
    {
        /// This is needed to increase / decrease the number of threads at runtime
        const bool time_to_refresh = update_timer.compareAndRestartDeferred(10.);
        if (time_to_refresh)
            updateConfiguration();
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }

    auto & tasks_metric = CurrentMetrics::values[metric];
    const auto tasks_limit = static_cast<int64_t>(max_tasks_count);
    if (tasks_metric.load() >= tasks_limit)
        return false;

    /// Just check if the main scheduler thread in excellent condition
    if (!scheduler.joinable())
    {
        auto * log = &Poco::Logger::get("MergeTreeBackgroundExecutor");

        LOG_ERROR(log, "Scheduler thread is dead. Trying to alive..");
        scheduler = ThreadFromGlobalPool([this]() { schedulerThreadFunction(); });

        if (!scheduler.joinable())
            LOG_FATAL(log, "Scheduler thread is dead permanently. Restart is needed");
    }

    auto new_item = std::make_shared<Item>(std::move(task), metric);
    pending.push_back(std::move(new_item));

    has_tasks.notify_one();
    return true;
}
|
|
|
|
|
|
|
|
|
2021-08-31 11:02:39 +00:00
|
|
|
/// Removes all tasks that belong to the storage `id` from the executor.
/// Tasks still sitting in `pending` are erased right away; for tasks found in
/// `active` the call blocks on their `is_done` event. While the call is in
/// progress `id` is kept in `currently_deleting`.
void MergeTreeBackgroundExecutor::removeTasksCorrespondingToStorage(StorageID id)
{
    /// Executor is global, so protect from any concurrent storage shutdowns
    std::lock_guard remove_lock(remove_mutex);

    /// The element is taken by const reference so that scanning the queues
    /// does not copy the item pointer for every element inspected.
    const auto belongs_to_storage = [&id](const auto & item) -> bool
    {
        return item->task->getStorageID() == id;
    };

    std::vector<ItemPtr> tasks_to_wait;
    {
        std::lock_guard lock(mutex);

        /// Mark this StorageID as deleting
        currently_deleting.emplace(id);

        /// Erase storage related tasks from pending and select active tasks to wait for
        pending.erase(std::remove_if(pending.begin(), pending.end(), belongs_to_storage), pending.end());

        /// Copy items to wait for their completion
        std::copy_if(active.begin(), active.end(), std::back_inserter(tasks_to_wait), belongs_to_storage);
    }

    for (auto & item : tasks_to_wait)
        item->is_done.wait();

    {
        std::lock_guard lock(mutex);

        /// Release our references under the lock.
        /// There is deliberately no assertion that use_count() == 1 here: the pool job
        /// which signals `is_done` captures the item pointer by value and may still own
        /// that copy for a short time after the event is set, and use_count() is only
        /// approximate when other threads hold copies. Such an assertion could fire
        /// spuriously in debug builds.
        for (auto & item : tasks_to_wait)
            item.reset();

        currently_deleting.erase(id);
    }
}
|
|
|
|
|
2021-08-30 19:37:03 +00:00
|
|
|
|
2021-09-03 12:27:49 +00:00
|
|
|
/// Runs one execute() call of the task held by `item`.
/// If execute() returns true the item is moved from `active` back to `pending`
/// (unless its storage is listed in `currently_deleting`, in which case it is
/// only removed from `active`). Otherwise the item is removed from `active`
/// and the task's onCompleted() is called.
/// Exceptions are logged and not propagated; onCompleted() is called at most once.
void MergeTreeBackgroundExecutor::routine(ItemPtr item)
{
    setThreadName(name.c_str());

    /// Every call site below holds `mutex` when invoking this.
    auto erase_from_active = [&]
    {
        active.erase(std::remove(active.begin(), active.end(), item), active.end());
    };

    /// Becomes true right before onCompleted() is called on the normal path.
    /// If that call throws, the handler below must not call the callback again.
    bool completion_callback_invoked = false;

    try
    {
        if (item->task->execute())
        {
            std::lock_guard guard(mutex);

            /// The storage is marked as deleting: drop the task instead of re-queueing it.
            if (currently_deleting.contains(item->task->getStorageID()))
            {
                erase_from_active();
                return;
            }

            pending.push_back(item);
            erase_from_active();
            item->is_done.reset();
            has_tasks.notify_one();
            return;
        }

        std::lock_guard guard(mutex);
        erase_from_active();
        has_tasks.notify_one();

        /// In a situation of a lack of memory this method can throw an exception,
        /// because it may interact somehow with BackgroundSchedulePool, which may allocate memory
        /// But it is rather safe, because we have try...catch block here, and another one in ThreadPool.
        completion_callback_invoked = true;
        item->task->onCompleted();
    }
    catch (...)
    {
        std::lock_guard guard(mutex);
        erase_from_active();
        has_tasks.notify_one();
        tryLogCurrentException(__PRETTY_FUNCTION__);

        /// Do not want any exceptions.
        /// The callback is skipped when the exception came from onCompleted() itself.
        if (!completion_callback_invoked)
        {
            try { item->task->onCompleted(); } catch (...) {}
        }
    }
}
|
|
|
|
|
|
|
|
|
2021-08-30 19:37:03 +00:00
|
|
|
void MergeTreeBackgroundExecutor::schedulerThreadFunction()
|
|
|
|
{
|
2021-09-02 21:31:32 +00:00
|
|
|
DENY_ALLOCATIONS_IN_SCOPE;
|
|
|
|
|
2021-08-30 19:37:03 +00:00
|
|
|
while (true)
|
|
|
|
{
|
2021-08-31 23:20:23 +00:00
|
|
|
std::unique_lock lock(mutex);
|
2021-08-30 19:37:03 +00:00
|
|
|
|
2021-09-03 22:15:20 +00:00
|
|
|
has_tasks.wait(lock, [this](){ return !pending.empty() || shutdown; });
|
2021-08-30 19:37:03 +00:00
|
|
|
|
2021-09-03 22:15:20 +00:00
|
|
|
if (shutdown)
|
2021-08-31 23:20:23 +00:00
|
|
|
break;
|
2021-08-30 19:37:03 +00:00
|
|
|
|
2021-09-03 22:15:20 +00:00
|
|
|
ItemPtr item = std::move(pending.front());
|
|
|
|
pending.pop_front();
|
2021-08-30 19:37:03 +00:00
|
|
|
|
2021-09-06 11:37:51 +00:00
|
|
|
bool res = false;
|
2021-09-03 13:02:49 +00:00
|
|
|
{
|
2021-09-06 11:37:51 +00:00
|
|
|
ALLOW_ALLOCATIONS_IN_SCOPE;
|
|
|
|
/// Execute a piece of task
|
|
|
|
res = pool.trySchedule([this, item]
|
|
|
|
{
|
|
|
|
routine(item);
|
|
|
|
/// When storage shutdowns it will wait until all related background tasks
|
|
|
|
/// are finished, because they may want to interact with its fields
|
|
|
|
/// and this will cause segfault.
|
|
|
|
item->is_done.set();
|
|
|
|
});
|
|
|
|
}
|
2021-08-30 19:37:03 +00:00
|
|
|
|
|
|
|
if (!res)
|
|
|
|
{
|
2021-09-03 22:15:20 +00:00
|
|
|
active.erase(std::remove(active.begin(), active.end(), item), active.end());
|
|
|
|
pending.push_back(item);
|
2021-09-03 12:27:49 +00:00
|
|
|
continue;
|
2021-08-30 19:37:03 +00:00
|
|
|
}
|
2021-08-31 23:20:23 +00:00
|
|
|
|
2021-09-03 22:15:20 +00:00
|
|
|
active.push_back(std::move(item));
|
2021-08-30 19:37:03 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
}
|