This commit is contained in:
Yarik Briukhovetskyi 2024-09-27 13:40:03 +02:00 committed by GitHub
parent f3150e8e57
commit 49df0f95fe
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1,9 +1,8 @@
#pragma once
#include <chrono>
#include <memory>
#include <mutex>
#include <Interpreters/ProcessList.h>
#include <Common/logger_useful.h>
#include <base/types.h>
namespace DB
@ -93,6 +92,7 @@ public:
// Method to add a new task to the multiset
void appendTask(const std::shared_ptr<QueryStatus> & query, const UInt64 & timeout)
{
LOG_TRACE(getLogger("CANCELLATION CHECKER"), "added. query: {}, timeout: {} milliseconds", query->getInfo().query, timeout);
const auto & now = std::chrono::steady_clock::now();
const UInt64 & end_time = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count() + timeout;
querySet.emplace(query, timeout, end_time);
@ -102,35 +102,47 @@ public:
// Used when some task is done
void appendDoneTasks(const std::shared_ptr<QueryStatus> & query)
{
LOG_TRACE(getLogger("CANCELLATION CHECKER"), "added to done tasks, query: {}", query->getInfo().query);
done_tasks.push_back(query);
LOG_TRACE(getLogger("CANCELLATION CHECKER"), "done tasks size: {}", done_tasks.size());
cond_var.notify_all();
}
// Used when some task is cancelled
void addToCancelledTasks(const std::shared_ptr<QueryStatus> & query)
{
LOG_TRACE(getLogger("CANCELLATION CHECKER"), "added to cancelled tasks, query: {}", query->getInfo().query);
cancelled_tasks.push_back(query);
LOG_TRACE(getLogger("CANCELLATION CHECKER"), "cancelled tasks size: {}", cancelled_tasks.size());
cond_var.notify_all();
}
// Worker thread function
void workerFunction()
{
LOG_TRACE(getLogger("CANCELLATION CHECKER"), "workerFunction()");
std::unique_lock<std::mutex> lock(m);
while (!stop_thread)
{
LOG_TRACE(getLogger("CANCELLATION CHECKER"), "Iteration");
if (querySet.empty())
{
LOG_TRACE(getLogger("CANCELLATION CHECKER"), "minPriorityQueue.empty()");
// Wait until a new task is added or the thread is stopped
cond_var.wait(lock, [this]() { return stop_thread || !querySet.empty(); });
}
else
{
LOG_TRACE(getLogger("CANCELLATION CHECKER"), "else");
if (!cancelled_tasks.empty())
{
LOG_TRACE(getLogger("CANCELLATION CHECKER"), "Something needs to be cancelled");
for (auto it = cancelled_tasks.begin(); it != cancelled_tasks.end();)
{
LOG_TRACE(getLogger("CANCELLATION CHECKER"), "removing {} from cancelled tasks", (*it)->getInfo().query);
LOG_TRACE(getLogger("CANCELLATION CHECKER"), "cancelling this query.");
cancelTask(*it);
removeQueryFromSet(*it);
cancelled_tasks.erase(it);
@ -141,8 +153,10 @@ public:
if (!done_tasks.empty())
{
LOG_TRACE(getLogger("CANCELLATION CHECKER"), "Something is done");
for (auto it = done_tasks.begin(); it != done_tasks.end();)
{
LOG_TRACE(getLogger("CANCELLATION CHECKER"), "removing {} from done tasks", (*it)->getInfo().query);
removeQueryFromSet(*it);
done_tasks.erase(it);
}
@ -161,11 +175,13 @@ public:
if ((end_time_ms <= now_ms && duration_milliseconds.count() != 0))
{
LOG_TRACE(getLogger("CANCELLATION CHECKER"), "cancel task because of the timeout, end_time_ms: {}, now_ms: {}", end_time_ms, now_ms);
cancelTask(next_task.query);
querySet.erase(next_task);
}
else
{
LOG_TRACE(getLogger("CANCELLATION CHECKER"), "condvar, duration: {}", duration_milliseconds.count());
// Wait until the nearest endTime or until a new task is added that might have an earlier endTime or maybe some task cancelled
if (duration_milliseconds.count())
{