ClickHouse/src/Storages/MergeTree/MergeTreeBackgroundExecutor.cpp

217 lines
5.5 KiB
C++
Raw Normal View History

2021-09-06 12:01:16 +00:00
#include <Storages/MergeTree/MergeTreeBackgroundExecutor.h>
2021-08-30 19:37:03 +00:00
2021-09-07 12:45:39 +00:00
#include <algorithm>
#include <Common/setThreadName.h>
2021-09-06 12:01:16 +00:00
#include <Storages/MergeTree/BackgroundJobsAssignee.h>
2021-08-30 19:37:03 +00:00
namespace DB
{
2021-08-31 12:09:35 +00:00
2021-09-02 10:39:27 +00:00
/// Returns the textual name of an executor type:
/// "MergeMutate", "Fetch" or "Move".
String MergeTreeBackgroundExecutor::toString(Type type)
{
    switch (type)
    {
        case Type::MERGE_MUTATE: return "MergeMutate";
        case Type::FETCH:        return "Fetch";
        case Type::MOVE:         return "Move";
    }

    /// Each case above returns, so control gets here only for a value
    /// that is not one of the listed enumerators.
    __builtin_unreachable();
}
2021-09-03 22:15:20 +00:00
void MergeTreeBackgroundExecutor::updateConfiguration()
{
2021-09-07 12:45:39 +00:00
auto new_threads_count = std::max<size_t>(1u, threads_count_getter());
auto new_max_tasks_count = std::max<size_t>(1, max_task_count_getter());
2021-09-03 22:15:20 +00:00
try
{
pending.set_capacity(new_max_tasks_count);
active.set_capacity(new_max_tasks_count);
2021-09-06 22:07:41 +00:00
pool.setMaxFreeThreads(0);
2021-09-03 22:15:20 +00:00
pool.setMaxThreads(new_threads_count);
pool.setQueueSize(new_max_tasks_count);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
threads_count = new_threads_count;
max_tasks_count = new_max_tasks_count;
}
void MergeTreeBackgroundExecutor::wait()
{
{
std::lock_guard lock(mutex);
shutdown = true;
has_tasks.notify_all();
}
if (scheduler.joinable())
scheduler.join();
2021-09-07 12:45:39 +00:00
pool.wait();
2021-09-03 22:15:20 +00:00
}
/// Tries to enqueue a task for background execution.
///
/// Returns false (the task is not taken) when:
///  - the executor is shutting down;
///  - the value of `metric` has reached max_tasks_count;
///  - the scheduler thread is not running and could not be restarted.
/// Returns true when the task was put into `pending` and the scheduler was notified.
///
/// The whole function runs under `mutex`.
bool MergeTreeBackgroundExecutor::trySchedule(ExecutableTaskPtr task)
{
    std::lock_guard lock(mutex);

    if (shutdown)
        return false;

    try
    {
        /// This is needed to increase / decrease the number of threads at runtime
        if (update_timer.compareAndRestartDeferred(10.))
            updateConfiguration();
    }
    catch (...)
    {
        /// A failed refresh must not prevent scheduling with the current settings.
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }

    auto & value = CurrentMetrics::values[metric];
    if (value.load() >= static_cast<int64_t>(max_tasks_count))
        return false;

    /// Make sure the scheduler thread exists; try to start it again if it does not.
    if (!scheduler.joinable())
    {
        LOG_ERROR(&Poco::Logger::get("MergeTreeBackgroundExecutor"), "Scheduler thread is dead. Trying to alive..");
        scheduler = ThreadFromGlobalPool([this]() { schedulerThreadFunction(); });

        if (!scheduler.joinable())
        {
            LOG_FATAL(&Poco::Logger::get("MergeTreeBackgroundExecutor"), "Scheduler thread is dead permanently. Restart is needed");
            /// Nobody would ever pick the task up from `pending`, so do not accept it.
            return false;
        }
    }

    pending.push_back(std::make_shared<Item>(std::move(task), metric));
    has_tasks.notify_one();
    return true;
}
2021-08-31 11:02:39 +00:00
/// Removes every task that belongs to the storage `id`.
///
/// Pending tasks of that storage are dropped from the queue. Tasks that are currently
/// active are marked with is_currently_deleting, and the function blocks until each
/// of them signals is_done.
///
/// The wait happens after `mutex` is released.
void MergeTreeBackgroundExecutor::removeTasksCorrespondingToStorage(StorageID id)
{
    /// Items are taken by const reference so that inspecting one does not copy the pointer.
    const auto belongs_to_storage = [&id](const auto & item) -> bool
    {
        return item->task->getStorageID() == id;
    };

    std::vector<ItemPtr> tasks_to_wait;
    {
        std::lock_guard lock(mutex);

        /// Erase storage related tasks from pending
        pending.erase(std::remove_if(pending.begin(), pending.end(), belongs_to_storage), pending.end());

        /// Copy active items to wait for their completion
        std::copy_if(active.begin(), active.end(), std::back_inserter(tasks_to_wait), belongs_to_storage);

        /// Mark them only after the whole selection succeeded.
        for (auto & item : tasks_to_wait)
            item->is_currently_deleting = true;
    }

    for (auto & item : tasks_to_wait)
        item->is_done.wait();
}
2021-08-30 19:37:03 +00:00
2021-09-03 12:27:49 +00:00
/// Executes one piece of the task held by `item` and updates the queues accordingly.
///
///  - execute() returned true: the task wants to run again. The item is moved from
///    `active` back to `pending`, unless its storage is being removed
///    (is_currently_deleting), in which case the task is dropped.
///  - execute() returned false or threw: the task is finished. The item is removed
///    from `active`, the scheduler is notified and onCompleted() is called exactly once.
///
/// On every path where the item is not re-queued the task is released here, under
/// `mutex`, consistently with the normal completion path.
/// NOTE(review): releasing it before the caller signals is_done presumably matters
/// because the task's destructor may touch the storage - confirm.
///
/// No exception leaves this function: failures of execute() and onCompleted() are logged.
/// Must be called without `mutex` held.
void MergeTreeBackgroundExecutor::routine(ItemPtr item)
{
    setThreadName(name.c_str());

    /// Must be called with `mutex` held.
    auto erase_from_active = [this, &item]
    {
        active.erase(std::remove(active.begin(), active.end(), item), active.end());
    };

    bool need_execute_again = false;
    try
    {
        need_execute_again = item->task->execute();
    }
    catch (...)
    {
        /// A failed step is treated like a finished task: it is not re-queued.
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }

    std::lock_guard guard(mutex);
    erase_from_active();

    if (need_execute_again)
    {
        if (item->is_currently_deleting)
        {
            /// Nobody will run this task again.
            item->task.reset();
            return;
        }

        pending.push_back(item);
        has_tasks.notify_one();
        return;
    }

    has_tasks.notify_one();

    try
    {
        /// In a situation of a lack of memory this method can throw an exception,
        /// because it may interact somehow with BackgroundSchedulePool, which may allocate memory
        item->task->onCompleted();
    }
    catch (...)
    {
        /// Log it here, so that the callback is never invoked a second time.
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }

    item->task.reset();
}
2021-08-30 19:37:03 +00:00
void MergeTreeBackgroundExecutor::schedulerThreadFunction()
{
while (true)
{
2021-08-31 23:20:23 +00:00
std::unique_lock lock(mutex);
2021-08-30 19:37:03 +00:00
2021-09-03 22:15:20 +00:00
has_tasks.wait(lock, [this](){ return !pending.empty() || shutdown; });
2021-08-30 19:37:03 +00:00
2021-09-03 22:15:20 +00:00
if (shutdown)
2021-08-31 23:20:23 +00:00
break;
2021-08-30 19:37:03 +00:00
2021-09-03 22:15:20 +00:00
ItemPtr item = std::move(pending.front());
pending.pop_front();
2021-08-30 19:37:03 +00:00
2021-09-06 22:07:41 +00:00
/// Execute a piece of task
bool res = pool.trySchedule([this, item] () mutable
2021-09-03 13:02:49 +00:00
{
2021-09-06 22:07:41 +00:00
routine(item);
/// When storage shutdowns it will wait until all related background tasks
/// are finished, because they may want to interact with its fields
/// and this will cause segfault.
2021-09-07 12:45:39 +00:00
if (item->is_currently_deleting)
item->is_done.set();
2021-09-06 22:07:41 +00:00
});
2021-08-30 19:37:03 +00:00
if (!res)
{
2021-09-03 22:15:20 +00:00
pending.push_back(item);
2021-09-03 12:27:49 +00:00
continue;
2021-08-30 19:37:03 +00:00
}
2021-08-31 23:20:23 +00:00
2021-09-03 22:15:20 +00:00
active.push_back(std::move(item));
2021-08-30 19:37:03 +00:00
}
}
}