Merge pull request #24483 from kitaisreal/memory-tracker-new-no-throw

MemoryTracker new no throw
This commit is contained in:
Maksim Kita 2021-05-27 10:35:55 +03:00 committed by GitHub
commit e5b78723bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 104 additions and 58 deletions

View File

@ -27,30 +27,45 @@ namespace CurrentMemoryTracker
using DB::current_thread; using DB::current_thread;
void alloc(Int64 size) namespace
{ {
if (auto * memory_tracker = getMemoryTracker()) void allocImpl(Int64 size, bool throw_if_memory_exceeded)
{ {
if (current_thread) if (auto * memory_tracker = getMemoryTracker())
{ {
current_thread->untracked_memory += size; if (current_thread)
if (current_thread->untracked_memory > current_thread->untracked_memory_limit)
{ {
/// Zero untracked before track. If tracker throws out-of-limit we would be able to alloc up to untracked_memory_limit bytes current_thread->untracked_memory += size;
/// more. It could be useful to enlarge Exception message in rethrow logic. if (current_thread->untracked_memory > current_thread->untracked_memory_limit)
Int64 tmp = current_thread->untracked_memory; {
current_thread->untracked_memory = 0; /// Zero untracked before track. If tracker throws out-of-limit we would be able to alloc up to untracked_memory_limit bytes
memory_tracker->alloc(tmp); /// more. It could be useful to enlarge Exception message in rethrow logic.
Int64 tmp = current_thread->untracked_memory;
current_thread->untracked_memory = 0;
memory_tracker->allocImpl(tmp, throw_if_memory_exceeded);
}
}
/// total_memory_tracker only, ignore untracked_memory
else
{
memory_tracker->allocImpl(size, throw_if_memory_exceeded);
} }
} }
/// total_memory_tracker only, ignore untracked_memory
else
{
memory_tracker->alloc(size);
}
} }
} }
void alloc(Int64 size)
{
bool throw_if_memory_exceeded = true;
allocImpl(size, throw_if_memory_exceeded);
}
void allocNoThrow(Int64 size)
{
bool throw_if_memory_exceeded = false;
allocImpl(size, throw_if_memory_exceeded);
}
void realloc(Int64 old_size, Int64 new_size) void realloc(Int64 old_size, Int64 new_size)
{ {
Int64 addition = new_size - old_size; Int64 addition = new_size - old_size;

View File

@ -6,6 +6,7 @@
namespace CurrentMemoryTracker namespace CurrentMemoryTracker
{ {
void alloc(Int64 size); void alloc(Int64 size);
void allocNoThrow(Int64 size);
void realloc(Int64 old_size, Int64 new_size); void realloc(Int64 old_size, Int64 new_size);
void free(Int64 size); void free(Int64 size);
} }

View File

@ -128,7 +128,7 @@ void MemoryTracker::logMemoryUsage(Int64 current) const
} }
void MemoryTracker::alloc(Int64 size) void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
{ {
if (size < 0) if (size < 0)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Negative size ({}) is passed to MemoryTracker. It is a bug.", size); throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Negative size ({}) is passed to MemoryTracker. It is a bug.", size);
@ -137,7 +137,7 @@ void MemoryTracker::alloc(Int64 size)
{ {
/// Since the BlockerInThread should respect the level, we should go to the next parent. /// Since the BlockerInThread should respect the level, we should go to the next parent.
if (auto * loaded_next = parent.load(std::memory_order_relaxed)) if (auto * loaded_next = parent.load(std::memory_order_relaxed))
loaded_next->alloc(size); loaded_next->allocImpl(size, throw_if_memory_exceeded);
return; return;
} }
@ -173,7 +173,7 @@ void MemoryTracker::alloc(Int64 size)
} }
#ifdef MEMORY_TRACKER_DEBUG_CHECKS #ifdef MEMORY_TRACKER_DEBUG_CHECKS
if (unlikely(_memory_tracker_always_throw_logical_error_on_allocation)) if (unlikely(_memory_tracker_always_throw_logical_error_on_allocation) && throw_if_memory_exceeded)
{ {
_memory_tracker_always_throw_logical_error_on_allocation = false; _memory_tracker_always_throw_logical_error_on_allocation = false;
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Memory tracker: allocations not allowed."); throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Memory tracker: allocations not allowed.");
@ -181,21 +181,28 @@ void MemoryTracker::alloc(Int64 size)
#endif #endif
std::bernoulli_distribution fault(fault_probability); std::bernoulli_distribution fault(fault_probability);
if (unlikely(fault_probability && fault(thread_local_rng)) && memoryTrackerCanThrow(level, true)) if (unlikely(fault_probability && fault(thread_local_rng)) && memoryTrackerCanThrow(level, true) && throw_if_memory_exceeded)
{ {
ProfileEvents::increment(ProfileEvents::QueryMemoryLimitExceeded);
amount.fetch_sub(size, std::memory_order_relaxed);
/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc /// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
BlockerInThread untrack_lock(VariableContext::Global); BlockerInThread untrack_lock(VariableContext::Global);
ProfileEvents::increment(ProfileEvents::QueryMemoryLimitExceeded); ProfileEvents::increment(ProfileEvents::QueryMemoryLimitExceeded);
const auto * description = description_ptr.load(std::memory_order_relaxed); const auto * description = description_ptr.load(std::memory_order_relaxed);
amount.fetch_sub(size, std::memory_order_relaxed); amount.fetch_sub(size, std::memory_order_relaxed);
throw DB::Exception(DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED, throw DB::Exception(
"Memory tracker{}{}: fault injected. Would use {} (attempt to allocate chunk of {} bytes), maximum: {}", DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED,
description ? " " : "", description ? description : "", "Memory tracker{}{}: fault injected. Would use {} (attempt to allocate chunk of {} bytes), maximum: {}",
formatReadableSizeWithBinarySuffix(will_be), description ? " " : "",
size, formatReadableSizeWithBinarySuffix(current_hard_limit)); description ? description : "",
formatReadableSizeWithBinarySuffix(will_be),
size,
formatReadableSizeWithBinarySuffix(current_hard_limit));
} }
if (unlikely(current_profiler_limit && will_be > current_profiler_limit)) if (unlikely(current_profiler_limit && will_be > current_profiler_limit))
{ {
BlockerInThread untrack_lock(VariableContext::Global); BlockerInThread untrack_lock(VariableContext::Global);
@ -210,36 +217,59 @@ void MemoryTracker::alloc(Int64 size)
DB::TraceCollector::collect(DB::TraceType::MemorySample, StackTrace(), size); DB::TraceCollector::collect(DB::TraceType::MemorySample, StackTrace(), size);
} }
if (unlikely(current_hard_limit && will_be > current_hard_limit) && memoryTrackerCanThrow(level, false)) if (unlikely(current_hard_limit && will_be > current_hard_limit) && memoryTrackerCanThrow(level, false) && throw_if_memory_exceeded)
{ {
/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc /// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
BlockerInThread untrack_lock(VariableContext::Global); BlockerInThread untrack_lock(VariableContext::Global);
ProfileEvents::increment(ProfileEvents::QueryMemoryLimitExceeded); ProfileEvents::increment(ProfileEvents::QueryMemoryLimitExceeded);
const auto * description = description_ptr.load(std::memory_order_relaxed); const auto * description = description_ptr.load(std::memory_order_relaxed);
amount.fetch_sub(size, std::memory_order_relaxed); throw DB::Exception(
throw DB::Exception(DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED, DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED,
"Memory limit{}{} exceeded: would use {} (attempt to allocate chunk of {} bytes), maximum: {}", "Memory limit{}{} exceeded: would use {} (attempt to allocate chunk of {} bytes), maximum: {}",
description ? " " : "", description ? description : "", description ? " " : "",
formatReadableSizeWithBinarySuffix(will_be), description ? description : "",
size, formatReadableSizeWithBinarySuffix(current_hard_limit)); formatReadableSizeWithBinarySuffix(will_be),
size,
formatReadableSizeWithBinarySuffix(current_hard_limit));
} }
updatePeak(will_be); if (throw_if_memory_exceeded)
{
/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
BlockerInThread untrack_lock(VariableContext::Global);
bool log_memory_usage = true;
updatePeak(will_be, log_memory_usage);
}
else
{
bool log_memory_usage = false;
updatePeak(will_be, log_memory_usage);
}
if (auto * loaded_next = parent.load(std::memory_order_relaxed)) if (auto * loaded_next = parent.load(std::memory_order_relaxed))
loaded_next->alloc(size); loaded_next->allocImpl(size, throw_if_memory_exceeded);
} }
void MemoryTracker::alloc(Int64 size)
{
bool throw_if_memory_exceeded = true;
allocImpl(size, throw_if_memory_exceeded);
}
void MemoryTracker::updatePeak(Int64 will_be) void MemoryTracker::allocNoThrow(Int64 size)
{
bool throw_if_memory_exceeded = false;
allocImpl(size, throw_if_memory_exceeded);
}
void MemoryTracker::updatePeak(Int64 will_be, bool log_memory_usage)
{ {
auto peak_old = peak.load(std::memory_order_relaxed); auto peak_old = peak.load(std::memory_order_relaxed);
if (will_be > peak_old) /// Races doesn't matter. Could rewrite with CAS, but not worth. if (will_be > peak_old) /// Races doesn't matter. Could rewrite with CAS, but not worth.
{ {
peak.store(will_be, std::memory_order_relaxed); peak.store(will_be, std::memory_order_relaxed);
if ((level == VariableContext::Process || level == VariableContext::Global) if (log_memory_usage && (level == VariableContext::Process || level == VariableContext::Global)
&& will_be / log_peak_memory_usage_every > peak_old / log_peak_memory_usage_every) && will_be / log_peak_memory_usage_every > peak_old / log_peak_memory_usage_every)
logMemoryUsage(will_be); logMemoryUsage(will_be);
} }
@ -317,7 +347,9 @@ void MemoryTracker::reset()
void MemoryTracker::set(Int64 to) void MemoryTracker::set(Int64 to)
{ {
amount.store(to, std::memory_order_relaxed); amount.store(to, std::memory_order_relaxed);
updatePeak(to);
bool log_memory_usage = true;
updatePeak(to, log_memory_usage);
} }

View File

@ -58,7 +58,7 @@ private:
/// This description will be used as prefix into log messages (if isn't nullptr) /// This description will be used as prefix into log messages (if isn't nullptr)
std::atomic<const char *> description_ptr = nullptr; std::atomic<const char *> description_ptr = nullptr;
void updatePeak(Int64 will_be); void updatePeak(Int64 will_be, bool log_memory_usage);
void logMemoryUsage(Int64 current) const; void logMemoryUsage(Int64 current) const;
public: public:
@ -73,6 +73,10 @@ public:
*/ */
void alloc(Int64 size); void alloc(Int64 size);
void allocNoThrow(Int64 size);
void allocImpl(Int64 size, bool throw_if_memory_exceeded);
void realloc(Int64 old_size, Int64 new_size) void realloc(Int64 old_size, Int64 new_size)
{ {
Int64 addition = new_size - old_size; Int64 addition = new_size - old_size;

View File

@ -41,9 +41,9 @@ struct InitializeJemallocZoneAllocatorForOSX
namespace Memory namespace Memory
{ {
inline ALWAYS_INLINE void trackMemory(std::size_t size) inline ALWAYS_INLINE size_t getActualAllocationSize(size_t size)
{ {
std::size_t actual_size = size; size_t actual_size = size;
#if USE_JEMALLOC && JEMALLOC_VERSION_MAJOR >= 5 #if USE_JEMALLOC && JEMALLOC_VERSION_MAJOR >= 5
/// The nallocx() function allocates no memory, but it performs the same size computation as the mallocx() function /// The nallocx() function allocates no memory, but it performs the same size computation as the mallocx() function
@ -52,21 +52,13 @@ inline ALWAYS_INLINE void trackMemory(std::size_t size)
actual_size = nallocx(size, 0); actual_size = nallocx(size, 0);
#endif #endif
CurrentMemoryTracker::alloc(actual_size); return actual_size;
} }
inline ALWAYS_INLINE bool trackMemoryNoExcept(std::size_t size) noexcept inline ALWAYS_INLINE void trackMemory(std::size_t size)
{ {
try std::size_t actual_size = getActualAllocationSize(size);
{ CurrentMemoryTracker::allocNoThrow(actual_size);
trackMemory(size);
}
catch (...)
{
return false;
}
return true;
} }
inline ALWAYS_INLINE void untrackMemory(void * ptr [[maybe_unused]], std::size_t size [[maybe_unused]] = 0) noexcept inline ALWAYS_INLINE void untrackMemory(void * ptr [[maybe_unused]], std::size_t size [[maybe_unused]] = 0) noexcept
@ -98,27 +90,29 @@ inline ALWAYS_INLINE void untrackMemory(void * ptr [[maybe_unused]], std::size_t
void * operator new(std::size_t size) void * operator new(std::size_t size)
{ {
Memory::trackMemory(size); Memory::trackMemory(size);
return Memory::newImpl(size); return Memory::newImpl(size);
} }
void * operator new[](std::size_t size) void * operator new[](std::size_t size)
{ {
Memory::trackMemory(size); Memory::trackMemory(size);
return Memory::newImpl(size); return Memory::newImpl(size);
} }
void * operator new(std::size_t size, const std::nothrow_t &) noexcept void * operator new(std::size_t size, const std::nothrow_t &) noexcept
{ {
if (likely(Memory::trackMemoryNoExcept(size))) Memory::trackMemory(size);
return Memory::newNoExept(size);
return nullptr; return Memory::newNoExept(size);
} }
void * operator new[](std::size_t size, const std::nothrow_t &) noexcept void * operator new[](std::size_t size, const std::nothrow_t &) noexcept
{ {
if (likely(Memory::trackMemoryNoExcept(size))) Memory::trackMemory(size);
return Memory::newNoExept(size);
return nullptr; return Memory::newNoExept(size);
} }
/// delete /// delete