Fix overcommit ratio comparison and race condition

This commit is contained in:
Dmitry Novik 2021-12-06 21:34:52 +03:00
parent cbe6d89c69
commit 12101d82aa
6 changed files with 95 additions and 38 deletions

View File

@ -101,17 +101,6 @@ MemoryTracker::MemoryTracker(MemoryTracker * parent_, VariableContext level_) :
MemoryTracker::~MemoryTracker()
{
if (level == VariableContext::Process)
{
auto * loaded_next = getParent();
while (loaded_next != nullptr)
{
if (auto * next_overcommit_tracker = loaded_next->overcommit_tracker)
next_overcommit_tracker->unsubscribe(this);
loaded_next = loaded_next->getParent();
}
}
if ((level == VariableContext::Process || level == VariableContext::User) && peak)
{
try
@ -141,7 +130,7 @@ void MemoryTracker::logMemoryUsage(Int64 current) const
}
void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded, MemoryTracker * query_tracker)
{
if (size < 0)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Negative size ({}) is passed to MemoryTracker. It is a bug.", size);
@ -150,7 +139,8 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
{
/// Since the BlockerInThread should respect the level, we should go to the next parent.
if (auto * loaded_next = parent.load(std::memory_order_relaxed))
loaded_next->allocImpl(size, throw_if_memory_exceeded);
loaded_next->allocImpl(size, throw_if_memory_exceeded,
level == VariableContext::Process ? this : query_tracker);
return;
}
@ -233,8 +223,8 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
if (unlikely(current_hard_limit && will_be > current_hard_limit) && memoryTrackerCanThrow(level, false) && throw_if_memory_exceeded)
{
bool need_to_throw = true;
if (overcommit_tracker)
need_to_throw = overcommit_tracker->needToStopQuery(this);
if (!!overcommit_tracker && !!query_tracker)
need_to_throw = overcommit_tracker->needToStopQuery(query_tracker);
if (need_to_throw)
{
@ -278,7 +268,8 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
}
if (auto * loaded_next = parent.load(std::memory_order_relaxed))
loaded_next->allocImpl(size, throw_if_memory_exceeded);
loaded_next->allocImpl(size, throw_if_memory_exceeded,
level == VariableContext::Process ? this : query_tracker);
}
void MemoryTracker::alloc(Int64 size)

View File

@ -86,7 +86,7 @@ public:
void allocNoThrow(Int64 size);
void allocImpl(Int64 size, bool throw_if_memory_exceeded);
void allocImpl(Int64 size, bool throw_if_memory_exceeded, MemoryTracker * query_tracker = nullptr);
void realloc(Int64 old_size, Int64 new_size)
{

View File

@ -1,5 +1,6 @@
#include "OvercommitTracker.h"
#include <base/logger_useful.h>
#include <chrono>
#include <Interpreters/ProcessList.h>
@ -22,12 +23,14 @@ bool OvercommitTracker::needToStopQuery(MemoryTracker * tracker)
pickQueryToExclude();
assert(cancelation_state == QueryCancelationState::RUNNING);
if (picked_tracker == tracker || picked_tracker == nullptr)
if (picked_tracker == nullptr)
{
++waiting_to_stop;
cancelation_state = QueryCancelationState::NONE;
return true;
}
if (picked_tracker == tracker)
return true;
return cv.wait_for(lk, max_wait_time, [this]()
{
return cancelation_state == QueryCancelationState::NONE;
@ -37,15 +40,13 @@ bool OvercommitTracker::needToStopQuery(MemoryTracker * tracker)
void OvercommitTracker::unsubscribe(MemoryTracker * tracker)
{
std::unique_lock<std::mutex> lk(overcommit_m);
if (picked_tracker == tracker || picked_tracker == nullptr)
if (picked_tracker == tracker)
{
--waiting_to_stop;
if (waiting_to_stop == 0)
{
picked_tracker = nullptr;
cancelation_state = QueryCancelationState::NONE;
cv.notify_all();
}
LOG_DEBUG(&Poco::Logger::get("OvercommitTracker"), "Picked query stopped");
picked_tracker = nullptr;
cancelation_state = QueryCancelationState::NONE;
cv.notify_all();
}
}
@ -58,16 +59,26 @@ void UserOvercommitTracker::pickQueryToExcludeImpl()
MemoryTracker * current_tracker = nullptr;
OvercommitRatio current_ratio{0, 0};
// At this moment query list must be read only
for (auto const & query : user_process_list->queries)
auto & queries = user_process_list->queries;
LOG_DEBUG(&Poco::Logger::get("OvercommitTracker"),
"Trying to choose query to stop from {} queries", queries.size());
for (auto const & query : queries)
{
if (query.second->isKilled())
continue;
auto * memory_tracker = query.second->getMemoryTracker();
auto ratio = memory_tracker->getOvercommitRatio();
if (current_ratio < ratio)
LOG_DEBUG(&Poco::Logger::get("OvercommitTracker"),
"Query has ratio {}/{}", ratio.committed, ratio.soft_limit);
if (ratio.soft_limit != 0 && current_ratio < ratio)
{
current_tracker = memory_tracker;
current_ratio = ratio;
}
}
LOG_DEBUG(&Poco::Logger::get("OvercommitTracker"),
"Selected to stop query with overcommit ratio {}/{}",
current_ratio.committed, current_ratio.soft_limit);
picked_tracker = current_tracker;
}

View File

@ -18,7 +18,9 @@ struct OvercommitRatio
friend bool operator<(OvercommitRatio const& lhs, OvercommitRatio const& rhs) noexcept
{
// (a / b < c / d) <=> (a * d < c * b)
return (lhs.committed * rhs.soft_limit) < (rhs.committed * lhs.soft_limit);
return (lhs.committed * rhs.soft_limit) < (rhs.committed * lhs.soft_limit)
|| (lhs.soft_limit == 0 && rhs.soft_limit > 0)
|| (lhs.committed == 0 && rhs.committed == 0 && lhs.soft_limit > rhs.soft_limit);
}
Int64 committed;
@ -62,10 +64,6 @@ protected:
private:
// Number of queries are being canceled at the moment. Overcommit tracker
// must be in RUNNING state until this counter is not equal to 0.
UInt64 waiting_to_stop = 0;
void pickQueryToExclude()
{
if (cancelation_state != QueryCancelationState::RUNNING)

View File

@ -7,6 +7,7 @@
#include <Parsers/ASTKillQueryQuery.h>
#include <Parsers/queryNormalization.h>
#include <Processors/Executors/PipelineExecutor.h>
#include "Common/tests/gtest_global_context.h"
#include <Common/typeid_cast.h>
#include <Common/Exception.h>
#include <Common/CurrentThread.h>
@ -249,9 +250,6 @@ ProcessListEntry::~ProcessListEntry()
const QueryStatus * process_list_element_ptr = &*it;
/// This removes the memory_tracker of one request.
parent.processes.erase(it);
auto user_process_list_it = parent.user_to_queries.find(user);
if (user_process_list_it == parent.user_to_queries.end())
{
@ -273,6 +271,9 @@ ProcessListEntry::~ProcessListEntry()
}
}
/// This removes the memory_tracker of one request.
parent.processes.erase(it);
if (!found)
{
LOG_ERROR(&Poco::Logger::get("ProcessList"), "Logical error: cannot find query by query_id and pointer to ProcessListElement in ProcessListForUser");
@ -303,6 +304,12 @@ QueryStatus::QueryStatus(
QueryStatus::~QueryStatus()
{
assert(executors.empty());
auto * memory_tracker = getMemoryTracker();
if (user_process_list)
user_process_list->user_overcommit_tracker.unsubscribe(memory_tracker);
if (auto context = getContext())
context->getGlobalOvercommitTracker()->unsubscribe(memory_tracker);
}
CancellationCode QueryStatus::cancelQuery(bool)

View File

@ -0,0 +1,50 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q 'SELECT number FROM numbers(1000000) GROUP BY number SETTINGS max_memory_usage_for_user = 1' 2>&1 | grep -F -q "MEMORY_LIMIT_EXCEEDED" && echo 'OK' || echo 'FAIL'
$CLICKHOUSE_CLIENT -q 'SELECT number FROM numbers(1000000) GROUP BY number SETTINGS max_guaranteed_memory_usage_for_user = 1' 2>&1 | grep -F -q "MEMORY_LIMIT_EXCEEDED" && echo 'FAIL' || echo 'OK'
$CLICKHOUSE_CLIENT -q 'CREATE USER IF NOT EXISTS u1 IDENTIFIED WITH no_password'
$CLICKHOUSE_CLIENT -q 'GRANT ALL ON *.* TO u1'
function overcommited()
{
while true; do
$CLICKHOUSE_CLIENT -u u1 -q 'SELECT number FROM numbers(130000) GROUP BY number SETTINGS max_guaranteed_memory_usage=1,memory_usage_overcommit_max_wait_microseconds=500' 2>&1 | grep -F -q "MEMORY_LIMIT_EXCEEDED" && echo "OVERCOMMITED WITH USER LIMIT IS KILLED"
done
}
function expect_execution()
{
while true; do
$CLICKHOUSE_CLIENT -u u1 -q 'SELECT number FROM numbers(130000) GROUP BY number SETTINGS max_memory_usage_for_user=5000000,max_guaranteed_memory_usage=2,memory_usage_overcommit_max_wait_microseconds=500' >/dev/null 2>/dev/null 2>&1 | grep -F -q "MEMORY_LIMIT_EXCEEDED" && echo "bla\n"
done
}
export -f overcommited
export -f expect_execution
function user_test()
{
for _ in {1..10};
do
timeout 3 bash -c overcommited &
timeout 3 bash -c expect_execution &
done;
wait
}
output=$(user_test)
if test -z "$output"
then
echo "OVERCOMMITED WITH USER LIMIT WAS NOT KILLED"
else
echo "OVERCOMMITED WITH USER LIMIT WAS KILLED"
fi
$CLICKHOUSE_CLIENT -q 'DROP USER IF EXISTS u1'