mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 09:32:06 +00:00
Merge pull request #35637 from ClickHouse/memory-overcommit-free
Memory overcommit: continue query execution if memory is available
This commit is contained in:
commit
71b6f89166
@ -190,9 +190,8 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded, MemoryT
|
||||
if (unlikely(current_hard_limit && will_be > current_hard_limit) && memoryTrackerCanThrow(level, false) && throw_if_memory_exceeded)
|
||||
{
|
||||
bool need_to_throw = true;
|
||||
bool try_to_free_memory = overcommit_tracker != nullptr && query_tracker != nullptr;
|
||||
if (try_to_free_memory)
|
||||
need_to_throw = overcommit_tracker->needToStopQuery(query_tracker);
|
||||
if (auto * overcommit_tracker_ptr = overcommit_tracker.load(std::memory_order_relaxed); overcommit_tracker_ptr != nullptr && query_tracker != nullptr)
|
||||
need_to_throw = overcommit_tracker_ptr->needToStopQuery(query_tracker, size);
|
||||
|
||||
if (need_to_throw)
|
||||
{
|
||||
@ -211,6 +210,9 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded, MemoryT
|
||||
}
|
||||
else
|
||||
{
|
||||
// If OvercommitTracker::needToStopQuery returned false, it guarantees that enough memory is freed.
|
||||
// This memory is already counted in variable `amount` in the moment of `will_be` initialization.
|
||||
// Now we just need to update value stored in `will_be`, because it should have changed.
|
||||
will_be = amount.load(std::memory_order_relaxed);
|
||||
}
|
||||
}
|
||||
@ -308,6 +310,8 @@ void MemoryTracker::free(Int64 size)
|
||||
accounted_size += new_amount;
|
||||
}
|
||||
}
|
||||
if (auto * overcommit_tracker_ptr = overcommit_tracker.load(std::memory_order_relaxed); overcommit_tracker_ptr)
|
||||
overcommit_tracker_ptr->tryContinueQueryExecutionAfterFree(accounted_size);
|
||||
|
||||
if (auto * loaded_next = parent.load(std::memory_order_relaxed))
|
||||
loaded_next->free(size);
|
||||
|
@ -73,7 +73,7 @@ private:
|
||||
/// This description will be used as prefix into log messages (if isn't nullptr)
|
||||
std::atomic<const char *> description_ptr = nullptr;
|
||||
|
||||
OvercommitTracker * overcommit_tracker = nullptr;
|
||||
std::atomic<OvercommitTracker *> overcommit_tracker = nullptr;
|
||||
|
||||
bool updatePeak(Int64 will_be, bool log_memory_usage);
|
||||
void logMemoryUsage(Int64 current) const;
|
||||
@ -188,13 +188,18 @@ public:
|
||||
|
||||
void setOvercommitTracker(OvercommitTracker * tracker) noexcept
|
||||
{
|
||||
overcommit_tracker = tracker;
|
||||
overcommit_tracker.store(tracker, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
void resetOvercommitTracker() noexcept
|
||||
{
|
||||
overcommit_tracker.store(nullptr, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
/// Reset the accumulated data
|
||||
void resetCounters();
|
||||
|
||||
/// Reset the accumulated data and the parent.
|
||||
/// Reset the accumulated data.
|
||||
void reset();
|
||||
|
||||
/// Reset current counter to a new value.
|
||||
|
@ -11,8 +11,11 @@ constexpr std::chrono::microseconds ZERO_MICROSEC = 0us;
|
||||
OvercommitTracker::OvercommitTracker(std::mutex & global_mutex_)
|
||||
: max_wait_time(ZERO_MICROSEC)
|
||||
, picked_tracker(nullptr)
|
||||
, cancelation_state(QueryCancelationState::NONE)
|
||||
, cancellation_state(QueryCancellationState::NONE)
|
||||
, global_mutex(global_mutex_)
|
||||
, freed_memory(0)
|
||||
, required_memory(0)
|
||||
, allow_release(true)
|
||||
{}
|
||||
|
||||
void OvercommitTracker::setMaxWaitTime(UInt64 wait_time)
|
||||
@ -21,12 +24,12 @@ void OvercommitTracker::setMaxWaitTime(UInt64 wait_time)
|
||||
max_wait_time = wait_time * 1us;
|
||||
}
|
||||
|
||||
bool OvercommitTracker::needToStopQuery(MemoryTracker * tracker)
|
||||
bool OvercommitTracker::needToStopQuery(MemoryTracker * tracker, Int64 amount)
|
||||
{
|
||||
// NOTE: Do not change the order of locks
|
||||
//
|
||||
// global_mutex must be acquired before overcommit_m, because
|
||||
// method OvercommitTracker::unsubscribe(MemoryTracker *) is
|
||||
// method OvercommitTracker::onQueryStop(MemoryTracker *) is
|
||||
// always called with already acquired global_mutex in
|
||||
// ProcessListEntry::~ProcessListEntry().
|
||||
std::unique_lock<std::mutex> global_lock(global_mutex);
|
||||
@ -36,42 +39,82 @@ bool OvercommitTracker::needToStopQuery(MemoryTracker * tracker)
|
||||
return true;
|
||||
|
||||
pickQueryToExclude();
|
||||
assert(cancelation_state == QueryCancelationState::RUNNING);
|
||||
assert(cancellation_state != QueryCancellationState::NONE);
|
||||
global_lock.unlock();
|
||||
|
||||
// If no query was chosen we need to stop current query.
|
||||
// This may happen if no soft limit is set.
|
||||
if (picked_tracker == nullptr)
|
||||
{
|
||||
cancelation_state = QueryCancelationState::NONE;
|
||||
assert(cancellation_state == QueryCancellationState::SELECTED);
|
||||
cancellation_state = QueryCancellationState::NONE;
|
||||
return true;
|
||||
}
|
||||
if (picked_tracker == tracker)
|
||||
return true;
|
||||
bool timeout = !cv.wait_for(lk, max_wait_time, [this]()
|
||||
{
|
||||
return cancelation_state == QueryCancelationState::NONE;
|
||||
});
|
||||
if (timeout)
|
||||
LOG_DEBUG(getLogger(), "Need to stop query because reached waiting timeout");
|
||||
else
|
||||
LOG_DEBUG(getLogger(), "Memory freed within timeout");
|
||||
return timeout;
|
||||
assert(cancellation_state == QueryCancellationState::SELECTED);
|
||||
cancellation_state = QueryCancellationState::RUNNING;
|
||||
return true;
|
||||
}
|
||||
|
||||
void OvercommitTracker::unsubscribe(MemoryTracker * tracker)
|
||||
allow_release = true;
|
||||
|
||||
required_memory += amount;
|
||||
required_per_thread[tracker] = amount;
|
||||
bool timeout = !cv.wait_for(lk, max_wait_time, [this, tracker]()
|
||||
{
|
||||
return required_per_thread[tracker] == 0 || cancellation_state == QueryCancellationState::NONE;
|
||||
});
|
||||
LOG_DEBUG(getLogger(), "Memory was{} freed within timeout", (timeout ? " not" : ""));
|
||||
|
||||
required_memory -= amount;
|
||||
Int64 still_need = required_per_thread[tracker]; // If enough memory is freed it will be 0
|
||||
required_per_thread.erase(tracker);
|
||||
|
||||
// If threads where not released since last call of this method,
|
||||
// we can release them now.
|
||||
if (allow_release && required_memory <= freed_memory && still_need != 0)
|
||||
releaseThreads();
|
||||
|
||||
// All required amount of memory is free now and selected query to stop doesn't know about it.
|
||||
// As we don't need to free memory, we can continue execution of the selected query.
|
||||
if (required_memory == 0 && cancellation_state == QueryCancellationState::SELECTED)
|
||||
reset();
|
||||
return timeout || still_need != 0;
|
||||
}
|
||||
|
||||
void OvercommitTracker::tryContinueQueryExecutionAfterFree(Int64 amount)
|
||||
{
|
||||
std::lock_guard guard(overcommit_m);
|
||||
if (cancellation_state != QueryCancellationState::NONE)
|
||||
{
|
||||
freed_memory += amount;
|
||||
if (freed_memory >= required_memory)
|
||||
releaseThreads();
|
||||
}
|
||||
}
|
||||
|
||||
void OvercommitTracker::onQueryStop(MemoryTracker * tracker)
|
||||
{
|
||||
std::unique_lock<std::mutex> lk(overcommit_m);
|
||||
if (picked_tracker == tracker)
|
||||
{
|
||||
LOG_DEBUG(getLogger(), "Picked query stopped");
|
||||
|
||||
picked_tracker = nullptr;
|
||||
cancelation_state = QueryCancelationState::NONE;
|
||||
reset();
|
||||
cv.notify_all();
|
||||
}
|
||||
}
|
||||
|
||||
void OvercommitTracker::releaseThreads()
|
||||
{
|
||||
for (auto & required : required_per_thread)
|
||||
required.second = 0;
|
||||
freed_memory = 0;
|
||||
allow_release = false; // To avoid repeating call of this method in OvercommitTracker::needToStopQuery
|
||||
cv.notify_all();
|
||||
}
|
||||
|
||||
UserOvercommitTracker::UserOvercommitTracker(DB::ProcessList * process_list, DB::ProcessListForUser * user_process_list_)
|
||||
: OvercommitTracker(process_list->mutex)
|
||||
, user_process_list(user_process_list_)
|
||||
|
@ -34,6 +34,13 @@ struct OvercommitRatio
|
||||
|
||||
class MemoryTracker;
|
||||
|
||||
enum class QueryCancellationState
|
||||
{
|
||||
NONE = 0, // Hard limit is not reached, there is no selected query to kill.
|
||||
SELECTED = 1, // Hard limit is reached, query to stop was chosen but it still is not aware of cancellation.
|
||||
RUNNING = 2, // Hard limit is reached, selected query has started the process of cancellation.
|
||||
};
|
||||
|
||||
// Usually it's hard to set some reasonable hard memory limit
|
||||
// (especially, the default value). This class introduces new
|
||||
// mechanisim for the limiting of memory usage.
|
||||
@ -45,9 +52,11 @@ struct OvercommitTracker : boost::noncopyable
|
||||
{
|
||||
void setMaxWaitTime(UInt64 wait_time);
|
||||
|
||||
bool needToStopQuery(MemoryTracker * tracker);
|
||||
bool needToStopQuery(MemoryTracker * tracker, Int64 amount);
|
||||
|
||||
void unsubscribe(MemoryTracker * tracker);
|
||||
void tryContinueQueryExecutionAfterFree(Int64 amount);
|
||||
|
||||
void onQueryStop(MemoryTracker * tracker);
|
||||
|
||||
virtual ~OvercommitTracker() = default;
|
||||
|
||||
@ -58,23 +67,16 @@ protected:
|
||||
|
||||
// This mutex is used to disallow concurrent access
|
||||
// to picked_tracker and cancelation_state variables.
|
||||
mutable std::mutex overcommit_m;
|
||||
mutable std::condition_variable cv;
|
||||
std::mutex overcommit_m;
|
||||
std::condition_variable cv;
|
||||
|
||||
std::chrono::microseconds max_wait_time;
|
||||
|
||||
enum class QueryCancelationState
|
||||
{
|
||||
NONE,
|
||||
RUNNING,
|
||||
};
|
||||
|
||||
// Specifies memory tracker of the chosen to stop query.
|
||||
// If soft limit is not set, all the queries which reach hard limit must stop.
|
||||
// This case is represented as picked tracker pointer is set to nullptr and
|
||||
// overcommit tracker is in RUNNING state.
|
||||
// overcommit tracker is in SELECTED state.
|
||||
MemoryTracker * picked_tracker;
|
||||
QueryCancelationState cancelation_state;
|
||||
|
||||
virtual Poco::Logger * getLogger() = 0;
|
||||
|
||||
@ -82,19 +84,37 @@ private:
|
||||
|
||||
void pickQueryToExclude()
|
||||
{
|
||||
if (cancelation_state != QueryCancelationState::RUNNING)
|
||||
if (cancellation_state == QueryCancellationState::NONE)
|
||||
{
|
||||
pickQueryToExcludeImpl();
|
||||
cancelation_state = QueryCancelationState::RUNNING;
|
||||
cancellation_state = QueryCancellationState::SELECTED;
|
||||
}
|
||||
}
|
||||
|
||||
void reset() noexcept
|
||||
{
|
||||
picked_tracker = nullptr;
|
||||
cancellation_state = QueryCancellationState::NONE;
|
||||
freed_memory = 0;
|
||||
allow_release = true;
|
||||
}
|
||||
|
||||
void releaseThreads();
|
||||
|
||||
QueryCancellationState cancellation_state;
|
||||
|
||||
std::unordered_map<MemoryTracker *, Int64> required_per_thread;
|
||||
|
||||
// Global mutex which is used in ProcessList to synchronize
|
||||
// insertion and deletion of queries.
|
||||
// OvercommitTracker::pickQueryToExcludeImpl() implementations
|
||||
// require this mutex to be locked, because they read list (or sublist)
|
||||
// of queries.
|
||||
std::mutex & global_mutex;
|
||||
Int64 freed_memory;
|
||||
Int64 required_memory;
|
||||
|
||||
bool allow_release;
|
||||
};
|
||||
|
||||
namespace DB
|
||||
@ -110,7 +130,7 @@ struct UserOvercommitTracker : OvercommitTracker
|
||||
~UserOvercommitTracker() override = default;
|
||||
|
||||
protected:
|
||||
void pickQueryToExcludeImpl() override final;
|
||||
void pickQueryToExcludeImpl() override;
|
||||
|
||||
Poco::Logger * getLogger() override final { return logger; }
|
||||
private:
|
||||
@ -125,7 +145,7 @@ struct GlobalOvercommitTracker : OvercommitTracker
|
||||
~GlobalOvercommitTracker() override = default;
|
||||
|
||||
protected:
|
||||
void pickQueryToExcludeImpl() override final;
|
||||
void pickQueryToExcludeImpl() override;
|
||||
|
||||
Poco::Logger * getLogger() override final { return logger; }
|
||||
private:
|
||||
|
408
src/Common/tests/gtest_overcommit_tracker.cpp
Normal file
408
src/Common/tests/gtest_overcommit_tracker.cpp
Normal file
@ -0,0 +1,408 @@
|
||||
#include <gtest/gtest.h>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
#include <Common/MemoryTracker.h>
|
||||
#include <Common/OvercommitTracker.h>
|
||||
#include <Interpreters/ProcessList.h>
|
||||
|
||||
using namespace DB;
|
||||
|
||||
template <typename BaseTracker>
|
||||
struct OvercommitTrackerForTest : BaseTracker
|
||||
{
|
||||
template <typename ...Ts>
|
||||
explicit OvercommitTrackerForTest(Ts && ...args)
|
||||
: BaseTracker(std::move(args)...)
|
||||
{}
|
||||
|
||||
void setCandidate(MemoryTracker * candidate)
|
||||
{
|
||||
tracker = candidate;
|
||||
}
|
||||
|
||||
protected:
|
||||
void pickQueryToExcludeImpl() override
|
||||
{
|
||||
BaseTracker::picked_tracker = tracker;
|
||||
}
|
||||
|
||||
MemoryTracker * tracker;
|
||||
};
|
||||
|
||||
using UserOvercommitTrackerForTest = OvercommitTrackerForTest<UserOvercommitTracker>;
|
||||
using GlobalOvercommitTrackerForTest = OvercommitTrackerForTest<GlobalOvercommitTracker>;
|
||||
|
||||
static constexpr UInt64 WAIT_TIME = 3'000'000;
|
||||
|
||||
template <typename T>
|
||||
void free_not_continue_test(T & overcommit_tracker)
|
||||
{
|
||||
overcommit_tracker.setMaxWaitTime(WAIT_TIME);
|
||||
|
||||
static constexpr size_t THREADS = 5;
|
||||
std::vector<MemoryTracker> trackers(THREADS);
|
||||
std::atomic<int> need_to_stop = 0;
|
||||
std::vector<std::thread> threads;
|
||||
threads.reserve(THREADS);
|
||||
|
||||
MemoryTracker picked;
|
||||
overcommit_tracker.setCandidate(&picked);
|
||||
|
||||
for (size_t i = 0; i < THREADS; ++i)
|
||||
{
|
||||
threads.push_back(std::thread(
|
||||
[&, i]()
|
||||
{
|
||||
if (overcommit_tracker.needToStopQuery(&trackers[i], 100))
|
||||
++need_to_stop;
|
||||
}
|
||||
));
|
||||
}
|
||||
|
||||
std::thread([&]() { overcommit_tracker.tryContinueQueryExecutionAfterFree(50); }).join();
|
||||
|
||||
for (auto & thread : threads)
|
||||
{
|
||||
thread.join();
|
||||
}
|
||||
|
||||
ASSERT_EQ(need_to_stop, THREADS);
|
||||
}
|
||||
|
||||
TEST(OvercommitTracker, UserFreeNotContinue)
|
||||
{
|
||||
ProcessList process_list;
|
||||
ProcessListForUser user_process_list(&process_list);
|
||||
UserOvercommitTrackerForTest user_overcommit_tracker(&process_list, &user_process_list);
|
||||
free_not_continue_test(user_overcommit_tracker);
|
||||
}
|
||||
|
||||
TEST(OvercommitTracker, GlobalFreeNotContinue)
|
||||
{
|
||||
ProcessList process_list;
|
||||
GlobalOvercommitTrackerForTest global_overcommit_tracker(&process_list);
|
||||
free_not_continue_test(global_overcommit_tracker);
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
void free_continue_test(T & overcommit_tracker)
|
||||
{
|
||||
overcommit_tracker.setMaxWaitTime(WAIT_TIME);
|
||||
|
||||
static constexpr size_t THREADS = 5;
|
||||
std::vector<MemoryTracker> trackers(THREADS);
|
||||
std::atomic<int> need_to_stop = 0;
|
||||
std::vector<std::thread> threads;
|
||||
threads.reserve(THREADS);
|
||||
|
||||
MemoryTracker picked;
|
||||
overcommit_tracker.setCandidate(&picked);
|
||||
|
||||
for (size_t i = 0; i < THREADS; ++i)
|
||||
{
|
||||
threads.push_back(std::thread(
|
||||
[&, i]()
|
||||
{
|
||||
if (overcommit_tracker.needToStopQuery(&trackers[i], 100))
|
||||
++need_to_stop;
|
||||
}
|
||||
));
|
||||
}
|
||||
|
||||
std::thread([&]() { overcommit_tracker.tryContinueQueryExecutionAfterFree(5000); }).join();
|
||||
|
||||
for (auto & thread : threads)
|
||||
{
|
||||
thread.join();
|
||||
}
|
||||
|
||||
ASSERT_EQ(need_to_stop, 0);
|
||||
}
|
||||
|
||||
TEST(OvercommitTracker, UserFreeContinue)
|
||||
{
|
||||
ProcessList process_list;
|
||||
ProcessListForUser user_process_list(&process_list);
|
||||
UserOvercommitTrackerForTest user_overcommit_tracker(&process_list, &user_process_list);
|
||||
free_continue_test(user_overcommit_tracker);
|
||||
}
|
||||
|
||||
TEST(OvercommitTracker, GlobalFreeContinue)
|
||||
{
|
||||
ProcessList process_list;
|
||||
GlobalOvercommitTrackerForTest global_overcommit_tracker(&process_list);
|
||||
free_continue_test(global_overcommit_tracker);
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
void free_continue_and_alloc_test(T & overcommit_tracker)
|
||||
{
|
||||
overcommit_tracker.setMaxWaitTime(WAIT_TIME);
|
||||
|
||||
static constexpr size_t THREADS = 5;
|
||||
std::vector<MemoryTracker> trackers(THREADS);
|
||||
std::atomic<int> need_to_stop = 0;
|
||||
std::vector<std::thread> threads;
|
||||
threads.reserve(THREADS);
|
||||
|
||||
MemoryTracker picked;
|
||||
overcommit_tracker.setCandidate(&picked);
|
||||
|
||||
for (size_t i = 0; i < THREADS; ++i)
|
||||
{
|
||||
threads.push_back(std::thread(
|
||||
[&, i]()
|
||||
{
|
||||
if (overcommit_tracker.needToStopQuery(&trackers[i], 100))
|
||||
++need_to_stop;
|
||||
}
|
||||
));
|
||||
}
|
||||
|
||||
bool stopped_next = false;
|
||||
std::thread(
|
||||
[&]()
|
||||
{
|
||||
MemoryTracker failed;
|
||||
overcommit_tracker.tryContinueQueryExecutionAfterFree(5000);
|
||||
stopped_next = overcommit_tracker.needToStopQuery(&failed, 100);
|
||||
}
|
||||
).join();
|
||||
|
||||
for (auto & thread : threads)
|
||||
{
|
||||
thread.join();
|
||||
}
|
||||
|
||||
ASSERT_EQ(need_to_stop, 0);
|
||||
ASSERT_EQ(stopped_next, true);
|
||||
}
|
||||
|
||||
TEST(OvercommitTracker, UserFreeContinueAndAlloc)
|
||||
{
|
||||
ProcessList process_list;
|
||||
ProcessListForUser user_process_list(&process_list);
|
||||
UserOvercommitTrackerForTest user_overcommit_tracker(&process_list, &user_process_list);
|
||||
free_continue_and_alloc_test(user_overcommit_tracker);
|
||||
}
|
||||
|
||||
TEST(OvercommitTracker, GlobalFreeContinueAndAlloc)
|
||||
{
|
||||
ProcessList process_list;
|
||||
GlobalOvercommitTrackerForTest global_overcommit_tracker(&process_list);
|
||||
free_continue_and_alloc_test(global_overcommit_tracker);
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
void free_continue_and_alloc_2_test(T & overcommit_tracker)
|
||||
{
|
||||
overcommit_tracker.setMaxWaitTime(WAIT_TIME);
|
||||
|
||||
static constexpr size_t THREADS = 5;
|
||||
std::vector<MemoryTracker> trackers(THREADS);
|
||||
std::atomic<int> need_to_stop = 0;
|
||||
std::vector<std::thread> threads;
|
||||
threads.reserve(THREADS);
|
||||
|
||||
MemoryTracker picked;
|
||||
overcommit_tracker.setCandidate(&picked);
|
||||
|
||||
for (size_t i = 0; i < THREADS; ++i)
|
||||
{
|
||||
threads.push_back(std::thread(
|
||||
[&, i]()
|
||||
{
|
||||
if (overcommit_tracker.needToStopQuery(&trackers[i], 100))
|
||||
++need_to_stop;
|
||||
}
|
||||
));
|
||||
}
|
||||
|
||||
bool stopped_next = false;
|
||||
threads.push_back(std::thread(
|
||||
[&]()
|
||||
{
|
||||
MemoryTracker failed;
|
||||
overcommit_tracker.tryContinueQueryExecutionAfterFree(5000);
|
||||
stopped_next = overcommit_tracker.needToStopQuery(&failed, 100);
|
||||
}
|
||||
));
|
||||
|
||||
overcommit_tracker.tryContinueQueryExecutionAfterFree(90);
|
||||
|
||||
for (auto & thread : threads)
|
||||
{
|
||||
thread.join();
|
||||
}
|
||||
|
||||
ASSERT_EQ(need_to_stop, 0);
|
||||
ASSERT_EQ(stopped_next, true);
|
||||
}
|
||||
|
||||
TEST(OvercommitTracker, UserFreeContinueAndAlloc2)
|
||||
{
|
||||
ProcessList process_list;
|
||||
ProcessListForUser user_process_list(&process_list);
|
||||
UserOvercommitTrackerForTest user_overcommit_tracker(&process_list, &user_process_list);
|
||||
free_continue_and_alloc_2_test(user_overcommit_tracker);
|
||||
}
|
||||
|
||||
TEST(OvercommitTracker, GlobalFreeContinueAndAlloc2)
|
||||
{
|
||||
ProcessList process_list;
|
||||
GlobalOvercommitTrackerForTest global_overcommit_tracker(&process_list);
|
||||
free_continue_and_alloc_2_test(global_overcommit_tracker);
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
void free_continue_and_alloc_3_test(T & overcommit_tracker)
|
||||
{
|
||||
overcommit_tracker.setMaxWaitTime(WAIT_TIME);
|
||||
|
||||
static constexpr size_t THREADS = 5;
|
||||
std::vector<MemoryTracker> trackers(THREADS);
|
||||
std::atomic<int> need_to_stop = 0;
|
||||
std::vector<std::thread> threads;
|
||||
threads.reserve(THREADS);
|
||||
|
||||
MemoryTracker picked;
|
||||
overcommit_tracker.setCandidate(&picked);
|
||||
|
||||
for (size_t i = 0; i < THREADS; ++i)
|
||||
{
|
||||
threads.push_back(std::thread(
|
||||
[&, i]()
|
||||
{
|
||||
if (overcommit_tracker.needToStopQuery(&trackers[i], 100))
|
||||
++need_to_stop;
|
||||
}
|
||||
));
|
||||
}
|
||||
|
||||
bool stopped_next = false;
|
||||
threads.push_back(std::thread(
|
||||
[&]()
|
||||
{
|
||||
MemoryTracker failed;
|
||||
overcommit_tracker.tryContinueQueryExecutionAfterFree(5000);
|
||||
stopped_next = overcommit_tracker.needToStopQuery(&failed, 100);
|
||||
}
|
||||
));
|
||||
|
||||
overcommit_tracker.tryContinueQueryExecutionAfterFree(100);
|
||||
|
||||
for (auto & thread : threads)
|
||||
{
|
||||
thread.join();
|
||||
}
|
||||
|
||||
ASSERT_EQ(need_to_stop, 0);
|
||||
ASSERT_EQ(stopped_next, false);
|
||||
}
|
||||
|
||||
TEST(OvercommitTracker, UserFreeContinueAndAlloc3)
|
||||
{
|
||||
ProcessList process_list;
|
||||
ProcessListForUser user_process_list(&process_list);
|
||||
UserOvercommitTrackerForTest user_overcommit_tracker(&process_list, &user_process_list);
|
||||
free_continue_and_alloc_2_test(user_overcommit_tracker);
|
||||
}
|
||||
|
||||
TEST(OvercommitTracker, GlobalFreeContinueAndAlloc3)
|
||||
{
|
||||
ProcessList process_list;
|
||||
GlobalOvercommitTrackerForTest global_overcommit_tracker(&process_list);
|
||||
free_continue_and_alloc_2_test(global_overcommit_tracker);
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
void free_continue_2_test(T & overcommit_tracker)
|
||||
{
|
||||
overcommit_tracker.setMaxWaitTime(WAIT_TIME);
|
||||
|
||||
static constexpr size_t THREADS = 5;
|
||||
std::vector<MemoryTracker> trackers(THREADS);
|
||||
std::atomic<int> need_to_stop = 0;
|
||||
std::vector<std::thread> threads;
|
||||
threads.reserve(THREADS);
|
||||
|
||||
MemoryTracker picked;
|
||||
overcommit_tracker.setCandidate(&picked);
|
||||
|
||||
for (size_t i = 0; i < THREADS; ++i)
|
||||
{
|
||||
threads.push_back(std::thread(
|
||||
[&, i]()
|
||||
{
|
||||
if (overcommit_tracker.needToStopQuery(&trackers[i], 100))
|
||||
++need_to_stop;
|
||||
}
|
||||
));
|
||||
}
|
||||
|
||||
std::thread(
|
||||
[&]() { overcommit_tracker.tryContinueQueryExecutionAfterFree(300); }
|
||||
).join();
|
||||
|
||||
for (auto & thread : threads)
|
||||
{
|
||||
thread.join();
|
||||
}
|
||||
|
||||
ASSERT_EQ(need_to_stop, 2);
|
||||
}
|
||||
|
||||
TEST(OvercommitTracker, UserFreeContinue2)
|
||||
{
|
||||
ProcessList process_list;
|
||||
ProcessListForUser user_process_list(&process_list);
|
||||
UserOvercommitTrackerForTest user_overcommit_tracker(&process_list, &user_process_list);
|
||||
free_continue_2_test(user_overcommit_tracker);
|
||||
}
|
||||
|
||||
TEST(OvercommitTracker, GlobalFreeContinue2)
|
||||
{
|
||||
ProcessList process_list;
|
||||
GlobalOvercommitTrackerForTest global_overcommit_tracker(&process_list);
|
||||
free_continue_2_test(global_overcommit_tracker);
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
void query_stop_not_continue_test(T & overcommit_tracker)
|
||||
{
|
||||
overcommit_tracker.setMaxWaitTime(WAIT_TIME);
|
||||
|
||||
std::atomic<int> need_to_stop = 0;
|
||||
|
||||
MemoryTracker picked;
|
||||
overcommit_tracker.setCandidate(&picked);
|
||||
|
||||
MemoryTracker another;
|
||||
auto thread = std::thread(
|
||||
[&]()
|
||||
{
|
||||
if (overcommit_tracker.needToStopQuery(&another, 100))
|
||||
++need_to_stop;
|
||||
}
|
||||
);
|
||||
overcommit_tracker.onQueryStop(&picked);
|
||||
thread.join();
|
||||
|
||||
ASSERT_EQ(need_to_stop, 1);
|
||||
}
|
||||
|
||||
TEST(OvercommitTracker, UserQueryStopNotContinue)
|
||||
{
|
||||
ProcessList process_list;
|
||||
ProcessListForUser user_process_list(&process_list);
|
||||
UserOvercommitTrackerForTest user_overcommit_tracker(&process_list, &user_process_list);
|
||||
query_stop_not_continue_test(user_overcommit_tracker);
|
||||
}
|
||||
|
||||
TEST(OvercommitTracker, GlobalQueryStopNotContinue)
|
||||
{
|
||||
ProcessList process_list;
|
||||
GlobalOvercommitTrackerForTest global_overcommit_tracker(&process_list);
|
||||
query_stop_not_continue_test(global_overcommit_tracker);
|
||||
}
|
@ -712,14 +712,21 @@ void Changelog::flush()
|
||||
current_writer->flush(force_sync);
|
||||
}
|
||||
|
||||
void Changelog::shutdown()
|
||||
{
|
||||
if (!log_files_to_delete_queue.isFinished())
|
||||
log_files_to_delete_queue.finish();
|
||||
|
||||
if (clean_log_thread.joinable())
|
||||
clean_log_thread.join();
|
||||
}
|
||||
|
||||
Changelog::~Changelog()
|
||||
{
|
||||
try
|
||||
{
|
||||
flush();
|
||||
log_files_to_delete_queue.finish();
|
||||
if (clean_log_thread.joinable())
|
||||
clean_log_thread.join();
|
||||
shutdown();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
|
@ -121,6 +121,8 @@ public:
|
||||
/// Fsync latest log to disk and flush buffer
|
||||
void flush();
|
||||
|
||||
void shutdown();
|
||||
|
||||
uint64_t size() const
|
||||
{
|
||||
return logs.size();
|
||||
|
@ -118,4 +118,18 @@ nuraft::ptr<nuraft::log_entry> KeeperLogStore::getLatestConfigChange() const
|
||||
return changelog.getLatestConfigChange();
|
||||
}
|
||||
|
||||
void KeeperLogStore::shutdownChangelog()
|
||||
{
|
||||
std::lock_guard lock(changelog_lock);
|
||||
changelog.shutdown();
|
||||
}
|
||||
|
||||
bool KeeperLogStore::flushChangelogAndShutdown()
|
||||
{
|
||||
std::lock_guard lock(changelog_lock);
|
||||
changelog.flush();
|
||||
changelog.shutdown();
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -52,6 +52,12 @@ public:
|
||||
/// Call fsync to the stored data
|
||||
bool flush() override;
|
||||
|
||||
/// Stop background cleanup thread in change
|
||||
void shutdownChangelog();
|
||||
|
||||
/// Flush logstore and call shutdown of background thread in changelog
|
||||
bool flushChangelogAndShutdown();
|
||||
|
||||
/// Current log storage size
|
||||
uint64_t size() const;
|
||||
|
||||
|
@ -360,7 +360,7 @@ void KeeperServer::shutdownRaftServer()
|
||||
void KeeperServer::shutdown()
|
||||
{
|
||||
state_machine->shutdownStorage();
|
||||
state_manager->flushLogStore();
|
||||
state_manager->flushAndShutDownLogStore();
|
||||
shutdownRaftServer();
|
||||
}
|
||||
|
||||
|
@ -249,9 +249,9 @@ ClusterConfigPtr KeeperStateManager::getLatestConfigFromLogStore() const
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
void KeeperStateManager::flushLogStore()
|
||||
void KeeperStateManager::flushAndShutDownLogStore()
|
||||
{
|
||||
log_store->flush();
|
||||
log_store->flushChangelogAndShutdown();
|
||||
}
|
||||
|
||||
void KeeperStateManager::save_config(const nuraft::cluster_config & config)
|
||||
|
@ -52,7 +52,8 @@ public:
|
||||
|
||||
void loadLogStore(uint64_t last_commited_index, uint64_t logs_to_keep);
|
||||
|
||||
void flushLogStore();
|
||||
/// Flush logstore and call shutdown of background thread
|
||||
void flushAndShutDownLogStore();
|
||||
|
||||
/// Called on server start, in our case we don't use any separate logic for load
|
||||
nuraft::ptr<nuraft::cluster_config> load_config() override
|
||||
|
@ -455,6 +455,8 @@ struct ContextSharedPart
|
||||
delete_message_broker_schedule_pool.reset();
|
||||
delete_ddl_worker.reset();
|
||||
delete_access_control.reset();
|
||||
|
||||
total_memory_tracker.resetOvercommitTracker();
|
||||
}
|
||||
|
||||
bool hasTraceCollector() const
|
||||
|
@ -344,9 +344,9 @@ QueryStatus::~QueryStatus()
|
||||
if (auto * memory_tracker = getMemoryTracker())
|
||||
{
|
||||
if (user_process_list)
|
||||
user_process_list->user_overcommit_tracker.unsubscribe(memory_tracker);
|
||||
user_process_list->user_overcommit_tracker.onQueryStop(memory_tracker);
|
||||
if (auto shared_context = getContext())
|
||||
shared_context->getGlobalOvercommitTracker()->unsubscribe(memory_tracker);
|
||||
shared_context->getGlobalOvercommitTracker()->onQueryStop(memory_tracker);
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user