Get rid of allocations in OvercommitTracker

This commit is contained in:
Dmitry Novik 2022-06-21 10:15:33 +00:00
parent 9ca368ac20
commit e10f079bd3
2 changed files with 18 additions and 11 deletions

View File

@ -20,6 +20,8 @@ OvercommitTracker::OvercommitTracker(std::mutex & global_mutex_)
, global_mutex(global_mutex_)
, freed_memory(0)
, required_memory(0)
, next_id(0)
, id_to_release(0)
, allow_release(true)
{}
@ -42,6 +44,8 @@ OvercommitResult OvercommitTracker::needToStopQuery(MemoryTracker * tracker, Int
std::unique_lock<std::mutex> global_lock(global_mutex);
std::unique_lock<std::mutex> lk(overcommit_m);
size_t id = next_id++;
auto max_wait_time = tracker->getOvercommitWaitingTime();
if (max_wait_time == ZERO_MICROSEC)
@ -73,23 +77,21 @@ OvercommitResult OvercommitTracker::needToStopQuery(MemoryTracker * tracker, Int
allow_release = true;
required_memory += amount;
required_per_thread[tracker] = amount;
auto wait_start_time = std::chrono::system_clock::now();
bool timeout = !cv.wait_for(lk, max_wait_time, [this, tracker]()
bool timeout = !cv.wait_for(lk, max_wait_time, [this, id]()
{
return required_per_thread[tracker] == 0 || cancellation_state == QueryCancellationState::NONE;
return id < id_to_release || cancellation_state == QueryCancellationState::NONE;
});
auto wait_end_time = std::chrono::system_clock::now();
ProfileEvents::increment(ProfileEvents::MemoryOvercommitWaitTimeMicroseconds, (wait_end_time - wait_start_time) / 1us);
LOG_DEBUG_SAFE(getLogger(), "Memory was{} freed within timeout", (timeout ? " not" : ""));
required_memory -= amount;
Int64 still_need = required_per_thread[tracker]; // If enough memory is freed it will be 0
required_per_thread.erase(tracker);
bool still_need = !(id < id_to_release); // True if thread wasn't released
// If threads where not released since last call of this method,
// we can release them now.
if (allow_release && required_memory <= freed_memory && still_need != 0)
if (allow_release && required_memory <= freed_memory && still_need)
releaseThreads();
// All required amount of memory is free now and selected query to stop doesn't know about it.
@ -98,7 +100,7 @@ OvercommitResult OvercommitTracker::needToStopQuery(MemoryTracker * tracker, Int
reset();
if (timeout)
return OvercommitResult::TIMEOUTED;
if (still_need != 0)
if (still_need)
return OvercommitResult::NOT_ENOUGH_FREED;
else
return OvercommitResult::MEMORY_FREED;
@ -132,8 +134,7 @@ void OvercommitTracker::onQueryStop(MemoryTracker * tracker)
void OvercommitTracker::releaseThreads()
{
for (auto & required : required_per_thread)
required.second = 0;
id_to_release = next_id;
freed_memory = 0;
allow_release = false; // To avoid repeating call of this method in OvercommitTracker::needToStopQuery
cv.notify_all();

View File

@ -7,6 +7,7 @@
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <unordered_map>
@ -104,6 +105,10 @@ private:
picked_tracker = nullptr;
cancellation_state = QueryCancellationState::NONE;
freed_memory = 0;
next_id = 0;
id_to_release = 0;
allow_release = true;
}
@ -111,8 +116,6 @@ private:
QueryCancellationState cancellation_state;
std::unordered_map<MemoryTracker *, Int64> required_per_thread;
// Global mutex which is used in ProcessList to synchronize
// insertion and deletion of queries.
// OvercommitTracker::pickQueryToExcludeImpl() implementations
@ -122,6 +125,9 @@ private:
Int64 freed_memory;
Int64 required_memory;
size_t next_id; // Id provided to the next thread to come in OvercommitTracker
size_t id_to_release; // We can release all threads with id smaller than this
bool allow_release;
};