Owning future

This commit is contained in:
Nikita Mikhaylov 2021-09-03 12:27:49 +00:00
parent d777c6cd70
commit ca7adb76b9
3 changed files with 98 additions and 89 deletions

View File

@ -54,19 +54,24 @@ public:
/// In case of T = std::shared_ptr<Something> it won't cause any allocations
template <typename Predicate>
void eraseAll(Predicate && predicate)
bool eraseAll(Predicate && predicate)
{
/// Shift all elements to the beginning of the buffer
std::rotate(buffer.begin(), buffer.begin() + position, buffer.end());
position = 0;
/// Remove elements
auto end_removed = std::remove_if(buffer.begin(), buffer.begin() + count, predicate);
if (end_removed == buffer.begin() + count)
return false;
size_t new_count = std::distance(buffer.begin(), end_removed);
for (size_t i = new_count; i < count; ++i)
buffer[i] = T{};
count = new_count;
position = 0;
return true;
}
template <class Predicate>

View File

@ -26,7 +26,7 @@ void MergeTreeBackgroundExecutor::removeTasksCorrespondingToStorage(StorageID id
{
std::lock_guard remove_lock(remove_mutex);
std::vector<ItemPtr> tasks_to_wait;
std::vector<ActiveMeta> tasks_to_wait;
{
std::lock_guard lock(mutex);
@ -35,23 +35,91 @@ void MergeTreeBackgroundExecutor::removeTasksCorrespondingToStorage(StorageID id
/// Erase storage related tasks from pending and select active tasks to wait for
pending.eraseAll([&] (auto item) -> bool { return item->task->getStorageID() == id; });
tasks_to_wait = active.getAll([&] (auto item) -> bool { return item->task->getStorageID() == id; });
tasks_to_wait = active.getAll([&] (auto elem) -> bool { return elem.item->task->getStorageID() == id; });
}
for (const auto & item : tasks_to_wait)
for (auto & [item, future] : tasks_to_wait)
{
assert(item->future.valid());
item->future.wait();
assert(future.valid());
try
{
future.wait();
}
catch (...) {}
}
{
std::lock_guard lock(mutex);
for (auto & [item, future] : tasks_to_wait)
{
assert(item.use_count() == 1);
item.reset();
}
currently_deleting.erase(id);
}
}
void MergeTreeBackgroundExecutor::routine(ItemPtr item)
{
setThreadName(name.c_str());
bool checked{false};
auto check_if_currently_deleting = [&] ()
{
checked = true;
return active.eraseAll([&] (auto & x) { return x.item == item; });
};
SCOPE_EXIT({
if (checked)
return;
std::lock_guard guard(mutex);
check_if_currently_deleting();
});
try
{
if (item->task->execute())
{
std::lock_guard guard(mutex);
if (check_if_currently_deleting())
return;
pending.tryPush(item);
has_tasks.notify_one();
return;
}
/// In a situation of a lack of memory this method can throw an exception,
/// because it may interact somehow with BackgroundSchedulePool, which may allocate memory
/// But it is rather safe, because we have try...catch block here, and another one in ThreadPool.
item->task->onCompleted();
std::lock_guard guard(mutex);
has_tasks.notify_one();
}
catch(...)
{
std::lock_guard guard(mutex);
has_tasks.notify_one();
try
{
item->task->onCompleted();
}
catch (...) {}
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void MergeTreeBackgroundExecutor::schedulerThreadFunction()
{
DENY_ALLOCATIONS_IN_SCOPE;
@ -71,88 +139,20 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction()
if (!pending.tryPop(&item))
continue;
status = active.tryPush(item);
assert(status);
bool res = pool.trySchedule([this, item] ()
{
setThreadName(name.c_str());
/// Storage may want to destroy and it needs to finish all task related to it.
/// But checking causes some interaction with storage methods, for example it calls getStorageID.
/// So, we must execute this checking once, signal another thread that we are finished and be destroyed.
/// Not doing any complex stuff, especially interaction with Storage...
/// Calling this check twice may cause segfault.
auto check_if_currently_deleting = [&] () -> bool
{
active.eraseAll([&] (auto x) { return x == item; });
for (const auto & id : currently_deleting)
{
if (item->task->getStorageID() == id)
{
item->promise.set_value();
return true;
}
}
return false;
};
bool checked{false};
SCOPE_EXIT({
if (checked)
return;
checked = true;
std::lock_guard guard(mutex);
check_if_currently_deleting();
});
try
{
if (item->task->execute())
{
std::lock_guard guard(mutex);
if (check_if_currently_deleting())
return;
pending.tryPush(item);
has_tasks.notify_one();
return;
}
/// In a situation of a lack of memory this method can throw an exception,
/// because it may interact somehow with BackgroundSchedulePool, which may allocate memory
/// But it is rather safe, because we have try...catch block here, and another one in ThreadPool.
item->task->onCompleted();
std::lock_guard guard(mutex);
has_tasks.notify_one();
}
catch(...)
{
std::lock_guard guard(mutex);
has_tasks.notify_one();
try
{
item->task->onCompleted();
}
catch (...) {}
tryLogCurrentException(__PRETTY_FUNCTION__);
}
});
auto thread_pool_job = std::make_shared<std::packaged_task<void()>>([this, item] { routine(item); });
auto future = thread_pool_job->get_future();
bool res = pool.trySchedule([thread_pool_job] { (*thread_pool_job)(); });
if (!res)
{
active.eraseAll([&] (auto x) { return x == item; });
active.eraseAll([&] (auto x) { return x.item == item; });
status = pending.tryPush(item);
assert(status);
continue;
}
status = active.tryPush({std::move(item), std::move(future)});
assert(status);
}
}

View File

@ -136,8 +136,6 @@ private:
max_tasks_count = new_max_tasks_count;
}
void schedulerThreadFunction();
static String toString(Type type);
Type type;
@ -156,22 +154,28 @@ private:
explicit Item(ExecutableTaskPtr && task_, CurrentMetrics::Metric metric_)
: task(std::move(task_))
, increment(std::move(metric_))
, future(promise.get_future())
{
}
ExecutableTaskPtr task;
CurrentMetrics::Increment increment;
std::promise<void> promise;
std::future<void> future;
};
using ItemPtr = std::shared_ptr<Item>;
void routine(ItemPtr item);
void schedulerThreadFunction();
/// Initially it will be empty
RingBuffer<ItemPtr> pending{0};
RingBuffer<ItemPtr> active{0};
struct ActiveMeta
{
ItemPtr item;
std::shared_future<void> future;
};
RingBuffer<ActiveMeta> active{0};
std::set<StorageID> currently_deleting;
std::mutex remove_mutex;