2021-10-22 12:56:09 +00:00
|
|
|
#include "OvercommitTracker.h"
|
|
|
|
|
2021-10-22 15:15:33 +00:00
|
|
|
#include <chrono>
|
2021-10-22 12:56:09 +00:00
|
|
|
#include <Interpreters/ProcessList.h>
|
|
|
|
|
2021-10-22 15:15:33 +00:00
|
|
|
using namespace std::chrono_literals;
|
|
|
|
|
|
|
|
OvercommitTracker::OvercommitTracker()
|
|
|
|
: max_wait_time(0us)
|
|
|
|
, picked_tracker(nullptr)
|
|
|
|
, cancelation_state(QueryCancelationState::NONE)
|
|
|
|
{}
|
|
|
|
|
|
|
|
/// Sets the upper bound on how long needToStopQuery() may wait.
/// `wait_time` is interpreted as a number of microseconds.
/// NOTE(review): max_wait_time is read under overcommit_m in needToStopQuery(),
/// but is written here without taking that mutex — confirm callers cannot race with waiters.
void OvercommitTracker::setMaxWaitTime(UInt64 wait_time)
{
    auto const wait_time_us = wait_time * 1us;
    max_wait_time = wait_time_us;
}
|
|
|
|
|
2021-10-22 12:56:09 +00:00
|
|
|
/// Decides whether the query that owns `tracker` has to be stopped.
///
/// Under overcommit_m a query is picked for exclusion via pickQueryToExclude().
/// If the caller's own tracker was picked, the caller must stop and `true` is returned
/// immediately. Otherwise the caller waits (for at most max_wait_time) until
/// cancelation_state goes back to NONE.
///
/// @param tracker  memory tracker of the query asking whether it must stop.
/// @return true if the calling query must be stopped: either it was picked itself,
///         or the wait timed out while the cancelation state was still not NONE.
///         false if the state returned to NONE within max_wait_time.
bool OvercommitTracker::needToStopQuery(MemoryTracker * tracker)
{
    std::unique_lock<std::mutex> lk(overcommit_m);

    pickQueryToExclude();
    assert(cancelation_state == QueryCancelationState::RUNNING);

    // The caller itself is the query chosen for exclusion.
    if (tracker == picked_tracker)
        return true;

    // wait_for() with a predicate returns the predicate's final value:
    // true  -> the state went back to NONE before the timeout,
    // false -> the timeout expired while the state was still not NONE.
    bool const cancelation_finished = cv.wait_for(lk, max_wait_time, [this]()
    {
        return cancelation_state == QueryCancelationState::NONE;
    });

    // Only a timed-out wait means the caller has to be stopped; returning the raw
    // wait_for() result would report exactly the opposite.
    return !cancelation_finished;
}
|
|
|
|
|
|
|
|
UserOvercommitTracker::UserOvercommitTracker(DB::ProcessListForUser * user_process_list_)
|
|
|
|
: user_process_list(user_process_list_)
|
|
|
|
{}
|
|
|
|
|
|
|
|
void UserOvercommitTracker::pickQueryToExcludeImpl()
|
|
|
|
{
|
|
|
|
MemoryTracker * current_tracker = nullptr;
|
|
|
|
OvercommitRatio current_ratio{0, 0};
|
2021-10-26 12:32:17 +00:00
|
|
|
// At this moment query list must be read only
|
2021-10-22 12:56:09 +00:00
|
|
|
for (auto const & query : user_process_list->queries)
|
|
|
|
{
|
|
|
|
auto * memory_tracker = query.second->getMemoryTracker();
|
|
|
|
auto ratio = memory_tracker->getOvercommitRatio();
|
|
|
|
if (current_ratio < ratio)
|
|
|
|
{
|
|
|
|
current_tracker = memory_tracker;
|
|
|
|
current_ratio = ratio;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
assert(current_tracker != nullptr);
|
|
|
|
picked_tracker = current_tracker;
|
|
|
|
}
|
|
|
|
|
|
|
|
void GlobalOvercommitTracker::pickQueryToExcludeImpl()
|
|
|
|
{
|
|
|
|
MemoryTracker * current_tracker = nullptr;
|
|
|
|
OvercommitRatio current_ratio{0, 0};
|
|
|
|
process_list->processEachQueryStatus([&](DB::QueryStatus const & query)
|
|
|
|
{
|
|
|
|
auto * memory_tracker = query.getMemoryTracker();
|
2021-10-26 12:32:17 +00:00
|
|
|
Int64 user_soft_limit = 0;
|
|
|
|
if (auto const * user_process_list = query.getUserProcessList())
|
|
|
|
user_soft_limit = user_process_list->user_memory_tracker.getSoftLimit();
|
|
|
|
|
|
|
|
auto ratio = memory_tracker->getOvercommitRatio(user_soft_limit);
|
2021-10-22 12:56:09 +00:00
|
|
|
if (current_ratio < ratio)
|
|
|
|
{
|
|
|
|
current_tracker = memory_tracker;
|
|
|
|
current_ratio = ratio;
|
|
|
|
}
|
|
|
|
});
|
|
|
|
assert(current_tracker != nullptr);
|
|
|
|
picked_tracker = current_tracker;
|
|
|
|
}
|