This commit is contained in:
Nikita Mikhaylov 2021-09-06 22:07:41 +00:00
parent 74c0b61e53
commit cfe2207be3
5 changed files with 20 additions and 30 deletions

View File

@ -78,6 +78,7 @@ String BackgroundJobsAssignee::toString(Type type)
case Type::Moving: case Type::Moving:
return "Moving"; return "Moving";
} }
__builtin_unreachable();
} }
void BackgroundJobsAssignee::start() void BackgroundJobsAssignee::start()

View File

@ -9,19 +9,19 @@
namespace DB namespace DB
{ {
class ExecutableTask class IExecutableTask
{ {
public: public:
virtual bool execute() = 0; virtual bool execute() = 0;
virtual void onCompleted() = 0; virtual void onCompleted() = 0;
virtual StorageID getStorageID() = 0; virtual StorageID getStorageID() = 0;
virtual ~ExecutableTask() = default; virtual ~IExecutableTask() = default;
}; };
using ExecutableTaskPtr = std::shared_ptr<ExecutableTask>; using ExecutableTaskPtr = std::shared_ptr<IExecutableTask>;
class LambdaAdapter : public shared_ptr_helper<LambdaAdapter>, public ExecutableTask class LambdaAdapter : public shared_ptr_helper<LambdaAdapter>, public IExecutableTask
{ {
public: public:
@ -32,6 +32,7 @@ public:
bool execute() override bool execute() override
{ {
res = inner(); res = inner();
inner = {};
return false; return false;
} }

View File

@ -33,7 +33,7 @@ void MergeTreeBackgroundExecutor::updateConfiguration()
pending.set_capacity(new_max_tasks_count); pending.set_capacity(new_max_tasks_count);
active.set_capacity(new_max_tasks_count); active.set_capacity(new_max_tasks_count);
pool.setMaxFreeThreads(new_threads_count); pool.setMaxFreeThreads(0);
pool.setMaxThreads(new_threads_count); pool.setMaxThreads(new_threads_count);
pool.setQueueSize(new_max_tasks_count); pool.setQueueSize(new_max_tasks_count);
} }
@ -130,13 +130,6 @@ void MergeTreeBackgroundExecutor::removeTasksCorrespondingToStorage(StorageID id
{ {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
for (auto & item : tasks_to_wait)
{
assert(item.use_count() == 1);
item.reset();
}
currently_deleting.erase(id); currently_deleting.erase(id);
} }
} }
@ -146,7 +139,7 @@ void MergeTreeBackgroundExecutor::routine(ItemPtr item)
{ {
setThreadName(name.c_str()); setThreadName(name.c_str());
auto erase_from_active = [&] auto erase_from_active = [this, item]
{ {
active.erase(std::remove(active.begin(), active.end(), item), active.end()); active.erase(std::remove(active.begin(), active.end(), item), active.end());
}; };
@ -177,6 +170,7 @@ void MergeTreeBackgroundExecutor::routine(ItemPtr item)
/// because it may interact somehow with BackgroundSchedulePool, which may allocate memory /// because it may interact somehow with BackgroundSchedulePool, which may allocate memory
/// But it is rather safe, because we have try...catch block here, and another one in ThreadPool. /// But it is rather safe, because we have try...catch block here, and another one in ThreadPool.
item->task->onCompleted(); item->task->onCompleted();
item->task.reset();
} }
catch (...) catch (...)
{ {
@ -192,8 +186,6 @@ void MergeTreeBackgroundExecutor::routine(ItemPtr item)
void MergeTreeBackgroundExecutor::schedulerThreadFunction() void MergeTreeBackgroundExecutor::schedulerThreadFunction()
{ {
DENY_ALLOCATIONS_IN_SCOPE;
while (true) while (true)
{ {
std::unique_lock lock(mutex); std::unique_lock lock(mutex);
@ -206,19 +198,15 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction()
ItemPtr item = std::move(pending.front()); ItemPtr item = std::move(pending.front());
pending.pop_front(); pending.pop_front();
bool res = false; /// Execute a piece of task
bool res = pool.trySchedule([this, item] () mutable
{ {
ALLOW_ALLOCATIONS_IN_SCOPE; routine(item);
/// Execute a piece of task /// When storage shutdowns it will wait until all related background tasks
res = pool.trySchedule([this, item] /// are finished, because they may want to interact with its fields
{ /// and this will cause segfault.
routine(item); item->is_done.set();
/// When storage shutdowns it will wait until all related background tasks });
/// are finished, because they may want to interact with its fields
/// and this will cause segfault.
item->is_done.set();
});
}
if (!res) if (!res)
{ {

View File

@ -14,7 +14,7 @@
#include <common/logger_useful.h> #include <common/logger_useful.h>
#include <Common/ThreadPool.h> #include <Common/ThreadPool.h>
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
#include <Storages/MergeTree/ExecutableTask.h> #include <Storages/MergeTree/IExecutableTask.h>
namespace DB namespace DB

View File

@ -5,7 +5,7 @@
#include <memory> #include <memory>
#include <random> #include <random>
#include <Storages/MergeTree/ExecutableTask.h> #include <Storages/MergeTree/IExecutableTask.h>
#include <Storages/MergeTree/MergeTreeBackgroundExecutor.h> #include <Storages/MergeTree/MergeTreeBackgroundExecutor.h>
using namespace DB; using namespace DB;
@ -17,7 +17,7 @@ namespace CurrentMetrics
std::random_device device; std::random_device device;
class FakeExecutableTask : public ExecutableTask class FakeExecutableTask : public IExecutableTask
{ {
public: public:
explicit FakeExecutableTask(String name_) : generator(device()), distribution(0, 5), name(name_) explicit FakeExecutableTask(String name_) : generator(device()), distribution(0, 5), name(name_)