diff --git a/src/Common/CurrentMemoryTracker.cpp b/src/Common/CurrentMemoryTracker.cpp index 737f2566efc..d38a5a9c70c 100644 --- a/src/Common/CurrentMemoryTracker.cpp +++ b/src/Common/CurrentMemoryTracker.cpp @@ -27,30 +27,45 @@ namespace CurrentMemoryTracker using DB::current_thread; -void alloc(Int64 size) +namespace { - if (auto * memory_tracker = getMemoryTracker()) + void allocImpl(Int64 size, bool throw_if_memory_exceeded) { - if (current_thread) + if (auto * memory_tracker = getMemoryTracker()) { - current_thread->untracked_memory += size; - if (current_thread->untracked_memory > current_thread->untracked_memory_limit) + if (current_thread) { - /// Zero untracked before track. If tracker throws out-of-limit we would be able to alloc up to untracked_memory_limit bytes - /// more. It could be useful to enlarge Exception message in rethrow logic. - Int64 tmp = current_thread->untracked_memory; - current_thread->untracked_memory = 0; - memory_tracker->alloc(tmp); + current_thread->untracked_memory += size; + if (current_thread->untracked_memory > current_thread->untracked_memory_limit) + { + /// Zero untracked before track. If tracker throws out-of-limit we would be able to alloc up to untracked_memory_limit bytes + /// more. It could be useful to enlarge Exception message in rethrow logic. + Int64 tmp = current_thread->untracked_memory; + current_thread->untracked_memory = 0; + memory_tracker->allocImpl(tmp, throw_if_memory_exceeded); + } + } + /// total_memory_tracker only, ignore untracked_memory + else + { + memory_tracker->allocImpl(size, throw_if_memory_exceeded); } } - /// total_memory_tracker only, ignore untracked_memory - else - { - memory_tracker->alloc(size); - } } } +void alloc(Int64 size) +{ + bool throw_if_memory_exceeded = true; + allocImpl(size, throw_if_memory_exceeded); +} + +void allocNoThrow(Int64 size) +{ + bool throw_if_memory_exceeded = false; + allocImpl(size, throw_if_memory_exceeded); +} + void realloc(Int64 old_size, Int64 new_size) { Int64 addition = new_size - old_size; diff --git a/src/Common/CurrentMemoryTracker.h b/src/Common/CurrentMemoryTracker.h index 3a9e8990d66..5090b7c3687 100644 --- a/src/Common/CurrentMemoryTracker.h +++ b/src/Common/CurrentMemoryTracker.h @@ -6,6 +6,7 @@ namespace CurrentMemoryTracker { void alloc(Int64 size); + void allocNoThrow(Int64 size); void realloc(Int64 old_size, Int64 new_size); void free(Int64 size); } diff --git a/src/Common/MemoryTracker.cpp b/src/Common/MemoryTracker.cpp index e4c4e0b0ab1..e9ad40075e6 100644 --- a/src/Common/MemoryTracker.cpp +++ b/src/Common/MemoryTracker.cpp @@ -128,7 +128,7 @@ void MemoryTracker::logMemoryUsage(Int64 current) const } -void MemoryTracker::alloc(Int64 size) +void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded) { if (size < 0) throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Negative size ({}) is passed to MemoryTracker. It is a bug.", size); @@ -137,7 +137,7 @@ void MemoryTracker::alloc(Int64 size) { /// Since the BlockerInThread should respect the level, we should go to the next parent. if (auto * loaded_next = parent.load(std::memory_order_relaxed)) - loaded_next->alloc(size); + loaded_next->allocImpl(size, throw_if_memory_exceeded); return; } @@ -173,7 +173,7 @@ void MemoryTracker::alloc(Int64 size) } #ifdef MEMORY_TRACKER_DEBUG_CHECKS - if (unlikely(_memory_tracker_always_throw_logical_error_on_allocation)) + if (unlikely(_memory_tracker_always_throw_logical_error_on_allocation) && throw_if_memory_exceeded) { _memory_tracker_always_throw_logical_error_on_allocation = false; throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Memory tracker: allocations not allowed."); @@ -181,21 +181,28 @@ void MemoryTracker::alloc(Int64 size) #endif std::bernoulli_distribution fault(fault_probability); - if (unlikely(fault_probability && fault(thread_local_rng)) && memoryTrackerCanThrow(level, true)) + if (unlikely(fault_probability && fault(thread_local_rng)) && memoryTrackerCanThrow(level, true) && throw_if_memory_exceeded) { + ProfileEvents::increment(ProfileEvents::QueryMemoryLimitExceeded); + amount.fetch_sub(size, std::memory_order_relaxed); + /// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc BlockerInThread untrack_lock(VariableContext::Global); ProfileEvents::increment(ProfileEvents::QueryMemoryLimitExceeded); const auto * description = description_ptr.load(std::memory_order_relaxed); amount.fetch_sub(size, std::memory_order_relaxed); - throw DB::Exception(DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED, - "Memory tracker{}{}: fault injected. Would use {} (attempt to allocate chunk of {} bytes), maximum: {}", - description ? " " : "", description ? description : "", - formatReadableSizeWithBinarySuffix(will_be), - size, formatReadableSizeWithBinarySuffix(current_hard_limit)); + throw DB::Exception( + DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED, + "Memory tracker{}{}: fault injected. Would use {} (attempt to allocate chunk of {} bytes), maximum: {}", + description ? " " : "", + description ? description : "", + formatReadableSizeWithBinarySuffix(will_be), + size, + formatReadableSizeWithBinarySuffix(current_hard_limit)); } + if (unlikely(current_profiler_limit && will_be > current_profiler_limit)) { BlockerInThread untrack_lock(VariableContext::Global); @@ -210,36 +217,59 @@ void MemoryTracker::alloc(Int64 size) DB::TraceCollector::collect(DB::TraceType::MemorySample, StackTrace(), size); } - if (unlikely(current_hard_limit && will_be > current_hard_limit) && memoryTrackerCanThrow(level, false)) + if (unlikely(current_hard_limit && will_be > current_hard_limit) && memoryTrackerCanThrow(level, false) && throw_if_memory_exceeded) { /// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc BlockerInThread untrack_lock(VariableContext::Global); - ProfileEvents::increment(ProfileEvents::QueryMemoryLimitExceeded); const auto * description = description_ptr.load(std::memory_order_relaxed); - amount.fetch_sub(size, std::memory_order_relaxed); - throw DB::Exception(DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED, - "Memory limit{}{} exceeded: would use {} (attempt to allocate chunk of {} bytes), maximum: {}", - description ? " " : "", description ? description : "", - formatReadableSizeWithBinarySuffix(will_be), - size, formatReadableSizeWithBinarySuffix(current_hard_limit)); + throw DB::Exception( + DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED, + "Memory limit{}{} exceeded: would use {} (attempt to allocate chunk of {} bytes), maximum: {}", + description ? " " : "", + description ? description : "", + formatReadableSizeWithBinarySuffix(will_be), + size, + formatReadableSizeWithBinarySuffix(current_hard_limit)); } - updatePeak(will_be); + if (throw_if_memory_exceeded) + { + /// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc + BlockerInThread untrack_lock(VariableContext::Global); + bool log_memory_usage = true; + updatePeak(will_be, log_memory_usage); + } + else + { + bool log_memory_usage = false; + updatePeak(will_be, log_memory_usage); + } if (auto * loaded_next = parent.load(std::memory_order_relaxed)) - loaded_next->alloc(size); + loaded_next->allocImpl(size, throw_if_memory_exceeded); } +void MemoryTracker::alloc(Int64 size) +{ + bool throw_if_memory_exceeded = true; + allocImpl(size, throw_if_memory_exceeded); +} -void MemoryTracker::updatePeak(Int64 will_be) +void MemoryTracker::allocNoThrow(Int64 size) +{ + bool throw_if_memory_exceeded = false; + allocImpl(size, throw_if_memory_exceeded); +} + +void MemoryTracker::updatePeak(Int64 will_be, bool log_memory_usage) { auto peak_old = peak.load(std::memory_order_relaxed); if (will_be > peak_old) /// Races doesn't matter. Could rewrite with CAS, but not worth. { peak.store(will_be, std::memory_order_relaxed); - if ((level == VariableContext::Process || level == VariableContext::Global) + if (log_memory_usage && (level == VariableContext::Process || level == VariableContext::Global) && will_be / log_peak_memory_usage_every > peak_old / log_peak_memory_usage_every) logMemoryUsage(will_be); } @@ -317,7 +347,9 @@ void MemoryTracker::reset() void MemoryTracker::set(Int64 to) { amount.store(to, std::memory_order_relaxed); - updatePeak(to); + + bool log_memory_usage = true; + updatePeak(to, log_memory_usage); } diff --git a/src/Common/MemoryTracker.h b/src/Common/MemoryTracker.h index 4dada6f4ef7..e57fc2e0b75 100644 --- a/src/Common/MemoryTracker.h +++ b/src/Common/MemoryTracker.h @@ -58,7 +58,7 @@ private: /// This description will be used as prefix into log messages (if isn't nullptr) std::atomic description_ptr = nullptr; - void updatePeak(Int64 will_be); + void updatePeak(Int64 will_be, bool log_memory_usage); void logMemoryUsage(Int64 current) const; public: @@ -73,6 +73,10 @@ public: */ void alloc(Int64 size); + void allocNoThrow(Int64 size); + + void allocImpl(Int64 size, bool throw_if_memory_exceeded); + void realloc(Int64 old_size, Int64 new_size) { Int64 addition = new_size - old_size; diff --git a/src/Common/new_delete.cpp b/src/Common/new_delete.cpp index be3724666f8..56173fb108a 100644 --- a/src/Common/new_delete.cpp +++ b/src/Common/new_delete.cpp @@ -41,9 +41,9 @@ struct InitializeJemallocZoneAllocatorForOSX namespace Memory { -inline ALWAYS_INLINE void trackMemory(std::size_t size) +inline ALWAYS_INLINE size_t getActualAllocationSize(size_t size) { - std::size_t actual_size = size; + size_t actual_size = size; #if USE_JEMALLOC && JEMALLOC_VERSION_MAJOR >= 5 /// The nallocx() function allocates no memory, but it performs the same size computation as the mallocx() function @@ -52,21 +52,13 @@ inline ALWAYS_INLINE void trackMemory(std::size_t size) actual_size = nallocx(size, 0); #endif - CurrentMemoryTracker::alloc(actual_size); + return actual_size; } -inline ALWAYS_INLINE bool trackMemoryNoExcept(std::size_t size) noexcept +inline ALWAYS_INLINE void trackMemory(std::size_t size) { - try - { - trackMemory(size); - } - catch (...) - { - return false; - } - - return true; + std::size_t actual_size = getActualAllocationSize(size); + CurrentMemoryTracker::allocNoThrow(actual_size); } inline ALWAYS_INLINE void untrackMemory(void * ptr [[maybe_unused]], std::size_t size [[maybe_unused]] = 0) noexcept @@ -98,27 +90,29 @@ inline ALWAYS_INLINE void untrackMemory(void * ptr [[maybe_unused]], std::size_t void * operator new(std::size_t size) { Memory::trackMemory(size); + return Memory::newImpl(size); } void * operator new[](std::size_t size) { Memory::trackMemory(size); + return Memory::newImpl(size); } void * operator new(std::size_t size, const std::nothrow_t &) noexcept { - if (likely(Memory::trackMemoryNoExcept(size))) - return Memory::newNoExept(size); - return nullptr; + Memory::trackMemory(size); + + return Memory::newNoExept(size); } void * operator new[](std::size_t size, const std::nothrow_t &) noexcept { - if (likely(Memory::trackMemoryNoExcept(size))) - return Memory::newNoExept(size); - return nullptr; + Memory::trackMemory(size); + + return Memory::newNoExept(size); } /// delete