mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Merge pull request #17646 from azat/Buffer-memory-tracking
Do not ignore server memory limits during Buffer flush
This commit is contained in:
commit
19e0e1a403
@ -28,7 +28,7 @@
|
||||
#include <common/mremap.h>
|
||||
#include <common/getPageSize.h>
|
||||
|
||||
#include <Common/MemoryTracker.h>
|
||||
#include <Common/CurrentMemoryTracker.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/formatReadable.h>
|
||||
|
||||
|
81
src/Common/CurrentMemoryTracker.cpp
Normal file
81
src/Common/CurrentMemoryTracker.cpp
Normal file
@ -0,0 +1,81 @@
|
||||
#include <Common/MemoryTracker.h>
|
||||
#include <Common/CurrentThread.h>
|
||||
|
||||
#include <Common/CurrentMemoryTracker.h>
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
MemoryTracker * getMemoryTracker()
|
||||
{
|
||||
if (auto * thread_memory_tracker = DB::CurrentThread::getMemoryTracker())
|
||||
return thread_memory_tracker;
|
||||
|
||||
/// Once the main thread is initialized,
|
||||
/// total_memory_tracker is initialized too.
|
||||
/// And can be used, since MainThreadStatus is required for profiling.
|
||||
if (DB::MainThreadStatus::get())
|
||||
return &total_memory_tracker;
|
||||
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
namespace CurrentMemoryTracker
|
||||
{
|
||||
|
||||
using DB::current_thread;
|
||||
|
||||
void alloc(Int64 size)
|
||||
{
|
||||
if (auto * memory_tracker = getMemoryTracker())
|
||||
{
|
||||
if (current_thread)
|
||||
{
|
||||
current_thread->untracked_memory += size;
|
||||
if (current_thread->untracked_memory > current_thread->untracked_memory_limit)
|
||||
{
|
||||
/// Zero untracked before track. If tracker throws out-of-limit we would be able to alloc up to untracked_memory_limit bytes
|
||||
/// more. It could be useful to enlarge Exception message in rethrow logic.
|
||||
Int64 tmp = current_thread->untracked_memory;
|
||||
current_thread->untracked_memory = 0;
|
||||
memory_tracker->alloc(tmp);
|
||||
}
|
||||
}
|
||||
/// total_memory_tracker only, ignore untracked_memory
|
||||
else
|
||||
{
|
||||
memory_tracker->alloc(size);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void realloc(Int64 old_size, Int64 new_size)
|
||||
{
|
||||
Int64 addition = new_size - old_size;
|
||||
addition > 0 ? alloc(addition) : free(-addition);
|
||||
}
|
||||
|
||||
void free(Int64 size)
|
||||
{
|
||||
if (auto * memory_tracker = getMemoryTracker())
|
||||
{
|
||||
if (current_thread)
|
||||
{
|
||||
current_thread->untracked_memory -= size;
|
||||
if (current_thread->untracked_memory < -current_thread->untracked_memory_limit)
|
||||
{
|
||||
memory_tracker->free(-current_thread->untracked_memory);
|
||||
current_thread->untracked_memory = 0;
|
||||
}
|
||||
}
|
||||
/// total_memory_tracker only, ignore untracked_memory
|
||||
else
|
||||
{
|
||||
memory_tracker->free(size);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
11
src/Common/CurrentMemoryTracker.h
Normal file
11
src/Common/CurrentMemoryTracker.h
Normal file
@ -0,0 +1,11 @@
|
||||
#pragma once
|
||||
|
||||
#include <common/types.h>
|
||||
|
||||
/// Convenience methods, that use current thread's memory_tracker if it is available.
|
||||
namespace CurrentMemoryTracker
|
||||
{
|
||||
void alloc(Int64 size);
|
||||
void realloc(Int64 old_size, Int64 new_size);
|
||||
void free(Int64 size);
|
||||
}
|
@ -2,7 +2,7 @@
|
||||
#include <common/defines.h>
|
||||
#include <boost/context/stack_context.hpp>
|
||||
#include <Common/formatReadable.h>
|
||||
#include <Common/MemoryTracker.h>
|
||||
#include <Common/CurrentMemoryTracker.h>
|
||||
|
||||
#include <sys/time.h>
|
||||
#include <sys/resource.h>
|
||||
|
@ -2,7 +2,6 @@
|
||||
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include "Common/TraceCollector.h"
|
||||
#include <Common/CurrentThread.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/formatReadable.h>
|
||||
#include <common/logger_useful.h>
|
||||
@ -16,20 +15,6 @@
|
||||
namespace
|
||||
{
|
||||
|
||||
MemoryTracker * getMemoryTracker()
|
||||
{
|
||||
if (auto * thread_memory_tracker = DB::CurrentThread::getMemoryTracker())
|
||||
return thread_memory_tracker;
|
||||
|
||||
/// Once the main thread is initialized,
|
||||
/// total_memory_tracker is initialized too.
|
||||
/// And can be used, since MainThreadStatus is required for profiling.
|
||||
if (DB::MainThreadStatus::get())
|
||||
return &total_memory_tracker;
|
||||
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
/// MemoryTracker cannot throw MEMORY_LIMIT_EXCEEDED (either configured memory
|
||||
/// limit reached or fault injected), in the following cases:
|
||||
///
|
||||
@ -41,9 +26,9 @@ MemoryTracker * getMemoryTracker()
|
||||
/// NOTE: that since C++11 destructor marked with noexcept by default, and
|
||||
/// this means that any throw from destructor (that is not marked with
|
||||
/// noexcept(false)) will cause std::terminate()
|
||||
bool inline memoryTrackerCanThrow()
|
||||
bool inline memoryTrackerCanThrow(VariableContext level, bool fault_injection)
|
||||
{
|
||||
return !MemoryTracker::LockExceptionInThread::isBlocked() && !std::uncaught_exceptions();
|
||||
return !MemoryTracker::LockExceptionInThread::isBlocked(level, fault_injection) && !std::uncaught_exceptions();
|
||||
}
|
||||
|
||||
}
|
||||
@ -64,8 +49,40 @@ namespace ProfileEvents
|
||||
|
||||
static constexpr size_t log_peak_memory_usage_every = 1ULL << 30;
|
||||
|
||||
// BlockerInThread
|
||||
thread_local uint64_t MemoryTracker::BlockerInThread::counter = 0;
|
||||
thread_local VariableContext MemoryTracker::BlockerInThread::level = VariableContext::Global;
|
||||
MemoryTracker::BlockerInThread::BlockerInThread(VariableContext level_)
|
||||
: previous_level(level)
|
||||
{
|
||||
++counter;
|
||||
level = level_;
|
||||
}
|
||||
MemoryTracker::BlockerInThread::~BlockerInThread()
|
||||
{
|
||||
--counter;
|
||||
level = previous_level;
|
||||
}
|
||||
|
||||
/// LockExceptionInThread
|
||||
thread_local uint64_t MemoryTracker::LockExceptionInThread::counter = 0;
|
||||
thread_local VariableContext MemoryTracker::LockExceptionInThread::level = VariableContext::Global;
|
||||
thread_local bool MemoryTracker::LockExceptionInThread::block_fault_injections = false;
|
||||
MemoryTracker::LockExceptionInThread::LockExceptionInThread(VariableContext level_, bool block_fault_injections_)
|
||||
: previous_level(level)
|
||||
, previous_block_fault_injections(block_fault_injections)
|
||||
{
|
||||
++counter;
|
||||
level = level_;
|
||||
block_fault_injections = block_fault_injections_;
|
||||
}
|
||||
MemoryTracker::LockExceptionInThread::~LockExceptionInThread()
|
||||
{
|
||||
--counter;
|
||||
level = previous_level;
|
||||
block_fault_injections = previous_block_fault_injections;
|
||||
}
|
||||
|
||||
|
||||
MemoryTracker total_memory_tracker(nullptr, VariableContext::Global);
|
||||
|
||||
@ -110,8 +127,13 @@ void MemoryTracker::alloc(Int64 size)
|
||||
if (size < 0)
|
||||
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Negative size ({}) is passed to MemoryTracker. It is a bug.", size);
|
||||
|
||||
if (BlockerInThread::isBlocked())
|
||||
if (BlockerInThread::isBlocked(level))
|
||||
{
|
||||
/// Since the BlockerInThread should respect the level, we should go to the next parent.
|
||||
if (auto * loaded_next = parent.load(std::memory_order_relaxed))
|
||||
loaded_next->alloc(size);
|
||||
return;
|
||||
}
|
||||
|
||||
/** Using memory_order_relaxed means that if allocations are done simultaneously,
|
||||
* we allow exception about memory limit exceeded to be thrown only on next allocation.
|
||||
@ -144,7 +166,7 @@ void MemoryTracker::alloc(Int64 size)
|
||||
}
|
||||
|
||||
std::bernoulli_distribution fault(fault_probability);
|
||||
if (unlikely(fault_probability && fault(thread_local_rng)) && memoryTrackerCanThrow())
|
||||
if (unlikely(fault_probability && fault(thread_local_rng)) && memoryTrackerCanThrow(level, true))
|
||||
{
|
||||
/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
|
||||
BlockerInThread untrack_lock;
|
||||
@ -173,7 +195,7 @@ void MemoryTracker::alloc(Int64 size)
|
||||
DB::TraceCollector::collect(DB::TraceType::MemorySample, StackTrace(), size);
|
||||
}
|
||||
|
||||
if (unlikely(current_hard_limit && will_be > current_hard_limit) && memoryTrackerCanThrow())
|
||||
if (unlikely(current_hard_limit && will_be > current_hard_limit) && memoryTrackerCanThrow(level, false))
|
||||
{
|
||||
/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
|
||||
BlockerInThread untrack_lock;
|
||||
@ -211,7 +233,7 @@ void MemoryTracker::updatePeak(Int64 will_be)
|
||||
|
||||
void MemoryTracker::free(Int64 size)
|
||||
{
|
||||
if (BlockerInThread::isBlocked())
|
||||
if (BlockerInThread::isBlocked(level))
|
||||
return;
|
||||
|
||||
std::bernoulli_distribution sample(sample_probability);
|
||||
@ -292,60 +314,3 @@ void MemoryTracker::setOrRaiseProfilerLimit(Int64 value)
|
||||
while (old_value < value && !profiler_limit.compare_exchange_weak(old_value, value))
|
||||
;
|
||||
}
|
||||
|
||||
|
||||
namespace CurrentMemoryTracker
|
||||
{
|
||||
using DB::current_thread;
|
||||
|
||||
void alloc(Int64 size)
|
||||
{
|
||||
if (auto * memory_tracker = getMemoryTracker())
|
||||
{
|
||||
if (current_thread)
|
||||
{
|
||||
current_thread->untracked_memory += size;
|
||||
if (current_thread->untracked_memory > current_thread->untracked_memory_limit)
|
||||
{
|
||||
/// Zero untracked before track. If tracker throws out-of-limit we would be able to alloc up to untracked_memory_limit bytes
|
||||
/// more. It could be useful to enlarge Exception message in rethrow logic.
|
||||
Int64 tmp = current_thread->untracked_memory;
|
||||
current_thread->untracked_memory = 0;
|
||||
memory_tracker->alloc(tmp);
|
||||
}
|
||||
}
|
||||
/// total_memory_tracker only, ignore untracked_memory
|
||||
else
|
||||
{
|
||||
memory_tracker->alloc(size);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void realloc(Int64 old_size, Int64 new_size)
|
||||
{
|
||||
Int64 addition = new_size - old_size;
|
||||
addition > 0 ? alloc(addition) : free(-addition);
|
||||
}
|
||||
|
||||
void free(Int64 size)
|
||||
{
|
||||
if (auto * memory_tracker = getMemoryTracker())
|
||||
{
|
||||
if (current_thread)
|
||||
{
|
||||
current_thread->untracked_memory -= size;
|
||||
if (current_thread->untracked_memory < -current_thread->untracked_memory_limit)
|
||||
{
|
||||
memory_tracker->free(-current_thread->untracked_memory);
|
||||
current_thread->untracked_memory = 0;
|
||||
}
|
||||
}
|
||||
/// total_memory_tracker only, ignore untracked_memory
|
||||
else
|
||||
{
|
||||
memory_tracker->free(size);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -136,11 +136,20 @@ public:
|
||||
private:
|
||||
BlockerInThread(const BlockerInThread &) = delete;
|
||||
BlockerInThread & operator=(const BlockerInThread &) = delete;
|
||||
|
||||
static thread_local uint64_t counter;
|
||||
static thread_local VariableContext level;
|
||||
|
||||
VariableContext previous_level;
|
||||
public:
|
||||
BlockerInThread() { ++counter; }
|
||||
~BlockerInThread() { --counter; }
|
||||
static bool isBlocked() { return counter > 0; }
|
||||
/// level_ - block in level and above
|
||||
BlockerInThread(VariableContext level_ = VariableContext::Global);
|
||||
~BlockerInThread();
|
||||
|
||||
static bool isBlocked(VariableContext current_level)
|
||||
{
|
||||
return counter > 0 && current_level >= level;
|
||||
}
|
||||
};
|
||||
|
||||
/// To be able to avoid MEMORY_LIMIT_EXCEEDED Exception in destructors:
|
||||
@ -160,21 +169,24 @@ public:
|
||||
private:
|
||||
LockExceptionInThread(const LockExceptionInThread &) = delete;
|
||||
LockExceptionInThread & operator=(const LockExceptionInThread &) = delete;
|
||||
|
||||
static thread_local uint64_t counter;
|
||||
static thread_local VariableContext level;
|
||||
static thread_local bool block_fault_injections;
|
||||
|
||||
VariableContext previous_level;
|
||||
bool previous_block_fault_injections;
|
||||
public:
|
||||
LockExceptionInThread() { ++counter; }
|
||||
~LockExceptionInThread() { --counter; }
|
||||
static bool isBlocked() { return counter > 0; }
|
||||
/// level_ - block in level and above
|
||||
/// block_fault_injections_ - block in fault injection too
|
||||
LockExceptionInThread(VariableContext level_ = VariableContext::Global, bool block_fault_injections_ = true);
|
||||
~LockExceptionInThread();
|
||||
|
||||
static bool isBlocked(VariableContext current_level, bool fault_injection)
|
||||
{
|
||||
return counter > 0 && current_level >= level && (!fault_injection || (fault_injection && block_fault_injections));
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
extern MemoryTracker total_memory_tracker;
|
||||
|
||||
|
||||
/// Convenience methods, that use current thread's memory_tracker if it is available.
|
||||
namespace CurrentMemoryTracker
|
||||
{
|
||||
void alloc(Int64 size);
|
||||
void realloc(Int64 old_size, Int64 new_size);
|
||||
void free(Int64 size);
|
||||
}
|
||||
|
@ -1,5 +1,5 @@
|
||||
#include <common/memory.h>
|
||||
#include <Common/MemoryTracker.h>
|
||||
#include <Common/CurrentMemoryTracker.h>
|
||||
|
||||
#include <iostream>
|
||||
#include <new>
|
||||
|
@ -33,6 +33,7 @@ SRCS(
|
||||
Config/ConfigProcessor.cpp
|
||||
Config/ConfigReloader.cpp
|
||||
Config/configReadClient.cpp
|
||||
CurrentMemoryTracker.cpp
|
||||
CurrentMetrics.cpp
|
||||
CurrentThread.cpp
|
||||
DNSResolver.cpp
|
||||
|
@ -384,11 +384,11 @@ static void appendBlock(const Block & from, Block & to)
|
||||
|
||||
size_t old_rows = to.rows();
|
||||
|
||||
MemoryTracker::BlockerInThread temporarily_disable_memory_tracker;
|
||||
|
||||
MutableColumnPtr last_col;
|
||||
try
|
||||
{
|
||||
MemoryTracker::BlockerInThread temporarily_disable_memory_tracker(VariableContext::User);
|
||||
|
||||
for (size_t column_no = 0, columns = to.columns(); column_no < columns; ++column_no)
|
||||
{
|
||||
const IColumn & col_from = *from.getByPosition(column_no).column.get();
|
||||
@ -402,6 +402,11 @@ static void appendBlock(const Block & from, Block & to)
|
||||
catch (...)
|
||||
{
|
||||
/// Rollback changes.
|
||||
|
||||
/// In case of rollback, it is better to ignore memory limits instead of abnormal server termination.
|
||||
/// So ignore any memory limits, even global (since memory tracking has drift).
|
||||
MemoryTracker::BlockerInThread temporarily_ignore_any_memory_limits(VariableContext::Global);
|
||||
|
||||
try
|
||||
{
|
||||
for (size_t column_no = 0, columns = to.columns(); column_no < columns; ++column_no)
|
||||
@ -774,7 +779,7 @@ void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr tabl
|
||||
}
|
||||
auto destination_metadata_snapshot = table->getInMemoryMetadataPtr();
|
||||
|
||||
MemoryTracker::BlockerInThread temporarily_disable_memory_tracker;
|
||||
MemoryTracker::BlockerInThread temporarily_disable_memory_tracker(VariableContext::User);
|
||||
|
||||
auto insert = std::make_shared<ASTInsertQuery>();
|
||||
insert->table_id = destination_id;
|
||||
|
@ -1,5 +1,6 @@
|
||||
DROP TABLE IF EXISTS null_;
|
||||
DROP TABLE IF EXISTS buffer_;
|
||||
DROP TABLE IF EXISTS aggregation_;
|
||||
|
||||
-- Each UInt64 is 8 bytes
|
||||
-- So 10e6 rows is 80e6 bytes
|
||||
@ -30,6 +31,11 @@ INSERT INTO buffer_ SELECT toUInt64(number) FROM system.numbers LIMIT toUInt64(1
|
||||
|
||||
OPTIMIZE TABLE buffer_; -- flush just in case
|
||||
|
||||
-- create complex aggregation to fail with Memory limit exceede error while writing to Buffer()
|
||||
-- String over UInt64 is enough to trigger the problem.
|
||||
CREATE MATERIALIZED VIEW aggregation_ engine=Memory() AS SELECT toString(key) FROM null_;
|
||||
|
||||
-- Check that max_memory_usage is ignored during write from StorageBuffer
|
||||
SET min_insert_block_size_bytes=0;
|
||||
SET min_insert_block_size_rows=100e3;
|
||||
INSERT INTO buffer_ SELECT toUInt64(number) FROM system.numbers LIMIT toUInt64(10e6+1);
|
||||
@ -38,3 +44,4 @@ SELECT count() FROM buffer_;
|
||||
|
||||
DROP TABLE null_;
|
||||
DROP TABLE buffer_;
|
||||
DROP TABLE aggregation_;
|
||||
|
Loading…
Reference in New Issue
Block a user