From 12101d82aab42a2b5e8d44ddf52e690930d90180 Mon Sep 17 00:00:00 2001 From: Dmitry Novik Date: Mon, 6 Dec 2021 21:34:52 +0300 Subject: [PATCH] Fix overcommit ratio comparison and race condition --- src/Common/MemoryTracker.cpp | 23 +++------ src/Common/MemoryTracker.h | 2 +- src/Common/OvercommitTracker.cpp | 37 +++++++++----- src/Common/OvercommitTracker.h | 8 ++- src/Interpreters/ProcessList.cpp | 13 +++-- .../0_stateless/02104_overcommit_memory.sh | 50 +++++++++++++++++++ 6 files changed, 95 insertions(+), 38 deletions(-) create mode 100755 tests/queries/0_stateless/02104_overcommit_memory.sh diff --git a/src/Common/MemoryTracker.cpp b/src/Common/MemoryTracker.cpp index 4c0e6fa7503..d93b13ebc27 100644 --- a/src/Common/MemoryTracker.cpp +++ b/src/Common/MemoryTracker.cpp @@ -101,17 +101,6 @@ MemoryTracker::MemoryTracker(MemoryTracker * parent_, VariableContext level_) : MemoryTracker::~MemoryTracker() { - if (level == VariableContext::Process) - { - auto * loaded_next = getParent(); - while (loaded_next != nullptr) - { - if (auto * next_overcommit_tracker = loaded_next->overcommit_tracker) - next_overcommit_tracker->unsubscribe(this); - loaded_next = loaded_next->getParent(); - } - } - if ((level == VariableContext::Process || level == VariableContext::User) && peak) { try @@ -141,7 +130,7 @@ void MemoryTracker::logMemoryUsage(Int64 current) const } -void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded) +void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded, MemoryTracker * query_tracker) { if (size < 0) throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Negative size ({}) is passed to MemoryTracker. It is a bug.", size); @@ -150,7 +139,8 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded) { /// Since the BlockerInThread should respect the level, we should go to the next parent. if (auto * loaded_next = parent.load(std::memory_order_relaxed)) - loaded_next->allocImpl(size, throw_if_memory_exceeded); + loaded_next->allocImpl(size, throw_if_memory_exceeded, + level == VariableContext::Process ? this : query_tracker); return; } @@ -233,8 +223,8 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded) if (unlikely(current_hard_limit && will_be > current_hard_limit) && memoryTrackerCanThrow(level, false) && throw_if_memory_exceeded) { bool need_to_throw = true; - if (overcommit_tracker) - need_to_throw = overcommit_tracker->needToStopQuery(this); + if (!!overcommit_tracker && !!query_tracker) + need_to_throw = overcommit_tracker->needToStopQuery(query_tracker); if (need_to_throw) { @@ -278,7 +268,8 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded) } if (auto * loaded_next = parent.load(std::memory_order_relaxed)) - loaded_next->allocImpl(size, throw_if_memory_exceeded); + loaded_next->allocImpl(size, throw_if_memory_exceeded, + level == VariableContext::Process ? this : query_tracker); } void MemoryTracker::alloc(Int64 size) diff --git a/src/Common/MemoryTracker.h b/src/Common/MemoryTracker.h index 7602a15e2a4..85139fcc386 100644 --- a/src/Common/MemoryTracker.h +++ b/src/Common/MemoryTracker.h @@ -86,7 +86,7 @@ public: void allocNoThrow(Int64 size); - void allocImpl(Int64 size, bool throw_if_memory_exceeded); + void allocImpl(Int64 size, bool throw_if_memory_exceeded, MemoryTracker * query_tracker = nullptr); void realloc(Int64 old_size, Int64 new_size) { diff --git a/src/Common/OvercommitTracker.cpp b/src/Common/OvercommitTracker.cpp index 21e27b7c825..9e3917981f9 100644 --- a/src/Common/OvercommitTracker.cpp +++ b/src/Common/OvercommitTracker.cpp @@ -1,5 +1,6 @@ #include "OvercommitTracker.h" +#include #include #include @@ -22,12 +23,14 @@ bool OvercommitTracker::needToStopQuery(MemoryTracker * tracker) pickQueryToExclude(); assert(cancelation_state == QueryCancelationState::RUNNING); - if (picked_tracker == tracker || picked_tracker == nullptr) + + if (picked_tracker == nullptr) { - ++waiting_to_stop; + cancelation_state = QueryCancelationState::NONE; return true; } - + if (picked_tracker == tracker) + return true; return cv.wait_for(lk, max_wait_time, [this]() { return cancelation_state == QueryCancelationState::NONE; @@ -37,15 +40,13 @@ bool OvercommitTracker::needToStopQuery(MemoryTracker * tracker) void OvercommitTracker::unsubscribe(MemoryTracker * tracker) { std::unique_lock lk(overcommit_m); - if (picked_tracker == tracker || picked_tracker == nullptr) + if (picked_tracker == tracker) { - --waiting_to_stop; - if (waiting_to_stop == 0) - { - picked_tracker = nullptr; - cancelation_state = QueryCancelationState::NONE; - cv.notify_all(); - } + LOG_DEBUG(&Poco::Logger::get("OvercommitTracker"), "Picked query stopped"); + + picked_tracker = nullptr; + cancelation_state = QueryCancelationState::NONE; + cv.notify_all(); } } @@ -58,16 +59,26 @@ void UserOvercommitTracker::pickQueryToExcludeImpl() MemoryTracker * current_tracker = nullptr; OvercommitRatio current_ratio{0, 0}; // At this moment query list must be read only - for (auto const & query : user_process_list->queries) + auto & queries = user_process_list->queries; + LOG_DEBUG(&Poco::Logger::get("OvercommitTracker"), + "Trying to choose query to stop from {} queries", queries.size()); + for (auto const & query : queries) { + if (query.second->isKilled()) + continue; auto * memory_tracker = query.second->getMemoryTracker(); auto ratio = memory_tracker->getOvercommitRatio(); - if (current_ratio < ratio) + LOG_DEBUG(&Poco::Logger::get("OvercommitTracker"), + "Query has ratio {}/{}", ratio.committed, ratio.soft_limit); + if (ratio.soft_limit != 0 && current_ratio < ratio) { current_tracker = memory_tracker; current_ratio = ratio; } } + LOG_DEBUG(&Poco::Logger::get("OvercommitTracker"), + "Selected to stop query with overcommit ratio {}/{}", + current_ratio.committed, current_ratio.soft_limit); picked_tracker = current_tracker; } diff --git a/src/Common/OvercommitTracker.h b/src/Common/OvercommitTracker.h index 2de24623b1e..5d02d47259c 100644 --- a/src/Common/OvercommitTracker.h +++ b/src/Common/OvercommitTracker.h @@ -18,7 +18,9 @@ struct OvercommitRatio friend bool operator<(OvercommitRatio const& lhs, OvercommitRatio const& rhs) noexcept { // (a / b < c / d) <=> (a * d < c * b) - return (lhs.committed * rhs.soft_limit) < (rhs.committed * lhs.soft_limit); + return (lhs.committed * rhs.soft_limit) < (rhs.committed * lhs.soft_limit) + || (lhs.soft_limit == 0 && rhs.soft_limit > 0) + || (lhs.committed == 0 && rhs.committed == 0 && lhs.soft_limit > rhs.soft_limit); } Int64 committed; @@ -62,10 +64,6 @@ protected: private: - // Number of queries are being canceled at the moment. Overcommit tracker - // must be in RUNNING state until this counter is not equal to 0. - UInt64 waiting_to_stop = 0; - void pickQueryToExclude() { if (cancelation_state != QueryCancelationState::RUNNING) diff --git a/src/Interpreters/ProcessList.cpp b/src/Interpreters/ProcessList.cpp index 1719f683add..5ffc65762aa 100644 --- a/src/Interpreters/ProcessList.cpp +++ b/src/Interpreters/ProcessList.cpp @@ -7,6 +7,7 @@ #include #include #include +#include "Common/tests/gtest_global_context.h" #include #include #include @@ -249,9 +250,6 @@ ProcessListEntry::~ProcessListEntry() const QueryStatus * process_list_element_ptr = &*it; - /// This removes the memory_tracker of one request. - parent.processes.erase(it); - auto user_process_list_it = parent.user_to_queries.find(user); if (user_process_list_it == parent.user_to_queries.end()) { @@ -273,6 +271,9 @@ ProcessListEntry::~ProcessListEntry() } } + /// This removes the memory_tracker of one request. + parent.processes.erase(it); + if (!found) { LOG_ERROR(&Poco::Logger::get("ProcessList"), "Logical error: cannot find query by query_id and pointer to ProcessListElement in ProcessListForUser"); @@ -303,6 +304,12 @@ QueryStatus::QueryStatus( QueryStatus::~QueryStatus() { assert(executors.empty()); + + auto * memory_tracker = getMemoryTracker(); + if (user_process_list) + user_process_list->user_overcommit_tracker.unsubscribe(memory_tracker); + if (auto context = getContext()) + context->getGlobalOvercommitTracker()->unsubscribe(memory_tracker); } CancellationCode QueryStatus::cancelQuery(bool) diff --git a/tests/queries/0_stateless/02104_overcommit_memory.sh b/tests/queries/0_stateless/02104_overcommit_memory.sh new file mode 100755 index 00000000000..77e8758c18f --- /dev/null +++ b/tests/queries/0_stateless/02104_overcommit_memory.sh @@ -0,0 +1,50 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +$CLICKHOUSE_CLIENT -q 'SELECT number FROM numbers(1000000) GROUP BY number SETTINGS max_memory_usage_for_user = 1' 2>&1 | grep -F -q "MEMORY_LIMIT_EXCEEDED" && echo 'OK' || echo 'FAIL' +$CLICKHOUSE_CLIENT -q 'SELECT number FROM numbers(1000000) GROUP BY number SETTINGS max_guaranteed_memory_usage_for_user = 1' 2>&1 | grep -F -q "MEMORY_LIMIT_EXCEEDED" && echo 'FAIL' || echo 'OK' + +$CLICKHOUSE_CLIENT -q 'CREATE USER IF NOT EXISTS u1 IDENTIFIED WITH no_password' +$CLICKHOUSE_CLIENT -q 'GRANT ALL ON *.* TO u1' + +function overcommited() +{ + while true; do + $CLICKHOUSE_CLIENT -u u1 -q 'SELECT number FROM numbers(130000) GROUP BY number SETTINGS max_guaranteed_memory_usage=1,memory_usage_overcommit_max_wait_microseconds=500' 2>&1 | grep -F -q "MEMORY_LIMIT_EXCEEDED" && echo "OVERCOMMITED WITH USER LIMIT IS KILLED" + done +} + +function expect_execution() +{ + while true; do + $CLICKHOUSE_CLIENT -u u1 -q 'SELECT number FROM numbers(130000) GROUP BY number SETTINGS max_memory_usage_for_user=5000000,max_guaranteed_memory_usage=2,memory_usage_overcommit_max_wait_microseconds=500' >/dev/null 2>/dev/null 2>&1 | grep -F -q "MEMORY_LIMIT_EXCEEDED" && echo "bla\n" + done +} + +export -f overcommited +export -f expect_execution + +function user_test() +{ + for _ in {1..10}; + do + timeout 3 bash -c overcommited & + timeout 3 bash -c expect_execution & + done; + + wait +} + +output=$(user_test) + +if test -z "$output" +then + echo "OVERCOMMITED WITH USER LIMIT WAS NOT KILLED" +else + echo "OVERCOMMITED WITH USER LIMIT WAS KILLED" +fi + +$CLICKHOUSE_CLIENT -q 'DROP USER IF EXISTS u1'