Move LockMemoryExceptionInThread and MemoryTrackerBlockerInThread

This commit is contained in:
Azat Khuzhin 2022-01-10 22:39:10 +03:00
parent c1dea66907
commit cb70544dfe
18 changed files with 145 additions and 126 deletions

View File

@ -2,7 +2,7 @@
#include <base/scope_guard.h>
#include <base/logger_useful.h>
#include <Common/MemoryTracker.h>
#include <Common/LockMemoryExceptionInThread.h>
/// Same as SCOPE_EXIT() but block the MEMORY_LIMIT_EXCEEDED errors.
///
@ -12,8 +12,7 @@
///
/// NOTE: it should be used with caution.
#define SCOPE_EXIT_MEMORY(...) SCOPE_EXIT( \
MemoryTracker::LockExceptionInThread \
lock_memory_tracker(VariableContext::Global); \
LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global); \
__VA_ARGS__; \
)
@ -57,8 +56,7 @@
#define SCOPE_EXIT_MEMORY_SAFE(...) SCOPE_EXIT( \
try \
{ \
MemoryTracker::LockExceptionInThread \
lock_memory_tracker(VariableContext::Global); \
LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global); \
__VA_ARGS__; \
} \
catch (...) \

View File

@ -11,6 +11,7 @@
#include <Common/CurrentThread.h>
#include <Common/DNSResolver.h>
#include <Common/setThreadName.h>
#include <Common/LockMemoryExceptionInThread.h>
#include <base/getThreadId.h>
#include <Common/SensitiveDataMasker.h>
#include <Common/IO.h>
@ -58,7 +59,7 @@ void OwnSplitChannel::tryLogSplit(const Poco::Message & msg)
/// but let's log it into the stderr at least.
catch (...)
{
MemoryTracker::LockExceptionInThread lock_memory_tracker(VariableContext::Global);
LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global);
const std::string & exception_message = getCurrentExceptionMessage(true);
const std::string & message = msg.getText();

View File

@ -15,6 +15,7 @@
#include <Common/formatReadable.h>
#include <Common/filesystemHelpers.h>
#include <Common/ErrorCodes.h>
#include <Common/LockMemoryExceptionInThread.h>
#include <filesystem>
#include <Common/config_version.h>
@ -175,7 +176,7 @@ void tryLogCurrentException(const char * log_name, const std::string & start_of_
///
/// And in this case the exception will not be logged, so let's block the
/// MemoryTracker until the exception will be logged.
MemoryTracker::LockExceptionInThread lock_memory_tracker(VariableContext::Global);
LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global);
/// Poco::Logger::get can allocate memory too
tryLogCurrentExceptionImpl(&Poco::Logger::get(log_name), start_of_message);
@ -188,7 +189,7 @@ void tryLogCurrentException(Poco::Logger * logger, const std::string & start_of_
///
/// And in this case the exception will not be logged, so let's block the
/// MemoryTracker until the exception will be logged.
MemoryTracker::LockExceptionInThread lock_memory_tracker(VariableContext::Global);
LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global);
tryLogCurrentExceptionImpl(logger, start_of_message);
}

View File

@ -0,0 +1,20 @@
#include <Common/LockMemoryExceptionInThread.h>
/// LockMemoryExceptionInThread
thread_local uint64_t LockMemoryExceptionInThread::counter = 0;
thread_local VariableContext LockMemoryExceptionInThread::level = VariableContext::Global;
thread_local bool LockMemoryExceptionInThread::block_fault_injections = false;
LockMemoryExceptionInThread::LockMemoryExceptionInThread(VariableContext level_, bool block_fault_injections_)
: previous_level(level)
, previous_block_fault_injections(block_fault_injections)
{
++counter;
level = level_;
block_fault_injections = block_fault_injections_;
}
LockMemoryExceptionInThread::~LockMemoryExceptionInThread()
{
--counter;
level = previous_level;
block_fault_injections = previous_block_fault_injections;
}

View File

@ -0,0 +1,39 @@
#pragma once
#include <Common/VariableContext.h>
/// To be able to avoid MEMORY_LIMIT_EXCEEDED Exception in destructors:
/// - either configured memory limit reached
/// - or fault injected
///
/// So this will simply ignore the configured memory limit (and avoid fault injection).
///
/// NOTE: exception will be silently ignored, no message in log
/// (since logging from MemoryTracker::alloc() is tricky)
///
/// NOTE: MEMORY_LIMIT_EXCEEDED Exception implicitly blocked if
/// stack unwinding is currently in progress in this thread (to avoid
/// std::terminate()), so you don't need to use it in this case explicitly.
struct LockMemoryExceptionInThread
{
private:
static thread_local uint64_t counter;
static thread_local VariableContext level;
static thread_local bool block_fault_injections;
VariableContext previous_level;
bool previous_block_fault_injections;
public:
/// level_ - block in level and above
/// block_fault_injections_ - block in fault injection too
explicit LockMemoryExceptionInThread(VariableContext level_ = VariableContext::User, bool block_fault_injections_ = true);
~LockMemoryExceptionInThread();
LockMemoryExceptionInThread(const LockMemoryExceptionInThread &) = delete;
LockMemoryExceptionInThread & operator=(const LockMemoryExceptionInThread &) = delete;
static bool isBlocked(VariableContext current_level, bool fault_injection)
{
return counter > 0 && current_level >= level && (!fault_injection || block_fault_injections);
}
};

View File

@ -3,6 +3,8 @@
#include <IO/WriteHelpers.h>
#include <Interpreters/TraceCollector.h>
#include <Common/Exception.h>
#include <Common/LockMemoryExceptionInThread.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <Common/formatReadable.h>
#include <Common/ProfileEvents.h>
#include <Common/thread_local_rng.h>
@ -34,7 +36,7 @@ namespace
/// noexcept(false)) will cause std::terminate()
bool inline memoryTrackerCanThrow(VariableContext level, bool fault_injection)
{
return !MemoryTracker::LockExceptionInThread::isBlocked(level, fault_injection) && !std::uncaught_exceptions();
return !LockMemoryExceptionInThread::isBlocked(level, fault_injection) && !std::uncaught_exceptions();
}
}
@ -55,41 +57,6 @@ namespace ProfileEvents
static constexpr size_t log_peak_memory_usage_every = 1ULL << 30;
// BlockerInThread
thread_local uint64_t MemoryTracker::BlockerInThread::counter = 0;
thread_local VariableContext MemoryTracker::BlockerInThread::level = VariableContext::Global;
MemoryTracker::BlockerInThread::BlockerInThread(VariableContext level_)
: previous_level(level)
{
++counter;
level = level_;
}
MemoryTracker::BlockerInThread::~BlockerInThread()
{
--counter;
level = previous_level;
}
/// LockExceptionInThread
thread_local uint64_t MemoryTracker::LockExceptionInThread::counter = 0;
thread_local VariableContext MemoryTracker::LockExceptionInThread::level = VariableContext::Global;
thread_local bool MemoryTracker::LockExceptionInThread::block_fault_injections = false;
MemoryTracker::LockExceptionInThread::LockExceptionInThread(VariableContext level_, bool block_fault_injections_)
: previous_level(level)
, previous_block_fault_injections(block_fault_injections)
{
++counter;
level = level_;
block_fault_injections = block_fault_injections_;
}
MemoryTracker::LockExceptionInThread::~LockExceptionInThread()
{
--counter;
level = previous_level;
block_fault_injections = previous_block_fault_injections;
}
MemoryTracker total_memory_tracker(nullptr, VariableContext::Global);
@ -133,9 +100,9 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
if (size < 0)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Negative size ({}) is passed to MemoryTracker. It is a bug.", size);
if (BlockerInThread::isBlocked(level))
if (MemoryTrackerBlockerInThread::isBlocked(level))
{
/// Since the BlockerInThread should respect the level, we should go to the next parent.
/// Since the MemoryTrackerBlockerInThread should respect the level, we should go to the next parent.
if (auto * loaded_next = parent.load(std::memory_order_relaxed))
loaded_next->allocImpl(size, throw_if_memory_exceeded);
return;
@ -184,7 +151,7 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
if (unlikely(fault_probability && fault(thread_local_rng)) && memoryTrackerCanThrow(level, true) && throw_if_memory_exceeded)
{
/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
BlockerInThread untrack_lock(VariableContext::Global);
MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);
ProfileEvents::increment(ProfileEvents::QueryMemoryLimitExceeded);
const auto * description = description_ptr.load(std::memory_order_relaxed);
@ -203,7 +170,7 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
bool allocation_traced = false;
if (unlikely(current_profiler_limit && will_be > current_profiler_limit))
{
BlockerInThread untrack_lock(VariableContext::Global);
MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);
DB::TraceCollector::collect(DB::TraceType::Memory, StackTrace(), size);
setOrRaiseProfilerLimit((will_be + profiler_step - 1) / profiler_step * profiler_step);
allocation_traced = true;
@ -212,7 +179,7 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
std::bernoulli_distribution sample(sample_probability);
if (unlikely(sample_probability && sample(thread_local_rng)))
{
BlockerInThread untrack_lock(VariableContext::Global);
MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);
DB::TraceCollector::collect(DB::TraceType::MemorySample, StackTrace(), size);
allocation_traced = true;
}
@ -220,7 +187,7 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
if (unlikely(current_hard_limit && will_be > current_hard_limit) && memoryTrackerCanThrow(level, false) && throw_if_memory_exceeded)
{
/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
BlockerInThread untrack_lock(VariableContext::Global);
MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);
ProfileEvents::increment(ProfileEvents::QueryMemoryLimitExceeded);
const auto * description = description_ptr.load(std::memory_order_relaxed);
throw DB::Exception(
@ -237,7 +204,7 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
if (throw_if_memory_exceeded)
{
/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
BlockerInThread untrack_lock(VariableContext::Global);
MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);
bool log_memory_usage = true;
peak_updated = updatePeak(will_be, log_memory_usage);
}
@ -249,7 +216,7 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
if (peak_updated && allocation_traced)
{
BlockerInThread untrack_lock(VariableContext::Global);
MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);
DB::TraceCollector::collect(DB::TraceType::MemoryPeak, StackTrace(), will_be);
}
@ -288,9 +255,9 @@ bool MemoryTracker::updatePeak(Int64 will_be, bool log_memory_usage)
void MemoryTracker::free(Int64 size)
{
if (BlockerInThread::isBlocked(level))
if (MemoryTrackerBlockerInThread::isBlocked(level))
{
/// Since the BlockerInThread should respect the level, we should go to the next parent.
/// Since the MemoryTrackerBlockerInThread should respect the level, we should go to the next parent.
if (auto * loaded_next = parent.load(std::memory_order_relaxed))
loaded_next->free(size);
return;
@ -299,7 +266,7 @@ void MemoryTracker::free(Int64 size)
std::bernoulli_distribution sample(sample_probability);
if (unlikely(sample_probability && sample(thread_local_rng)))
{
BlockerInThread untrack_lock(VariableContext::Global);
MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);
DB::TraceCollector::collect(DB::TraceType::MemorySample, StackTrace(), -size);
}

View File

@ -31,6 +31,9 @@ extern thread_local bool memory_tracker_always_throw_logical_error_on_allocation
/** Tracks memory consumption.
* It throws an exception if amount of consumed memory become greater than certain limit.
* The same memory tracker could be simultaneously used in different threads.
*
* @see LockMemoryExceptionInThread
* @see MemoryTrackerBlockerInThread
*/
class MemoryTracker
{
@ -167,64 +170,6 @@ public:
/// Prints info about peak memory consumption into log.
void logPeakMemoryUsage() const;
/// To be able to temporarily stop memory tracking from current thread.
struct BlockerInThread
{
private:
static thread_local uint64_t counter;
static thread_local VariableContext level;
VariableContext previous_level;
public:
/// level_ - block in level and above
explicit BlockerInThread(VariableContext level_ = VariableContext::User);
~BlockerInThread();
BlockerInThread(const BlockerInThread &) = delete;
BlockerInThread & operator=(const BlockerInThread &) = delete;
static bool isBlocked(VariableContext current_level)
{
return counter > 0 && current_level >= level;
}
};
/// To be able to avoid MEMORY_LIMIT_EXCEEDED Exception in destructors:
/// - either configured memory limit reached
/// - or fault injected
///
/// So this will simply ignore the configured memory limit (and avoid fault injection).
///
/// NOTE: exception will be silently ignored, no message in log
/// (since logging from MemoryTracker::alloc() is tricky)
///
/// NOTE: MEMORY_LIMIT_EXCEEDED Exception implicitly blocked if
/// stack unwinding is currently in progress in this thread (to avoid
/// std::terminate()), so you don't need to use it in this case explicitly.
struct LockExceptionInThread
{
private:
static thread_local uint64_t counter;
static thread_local VariableContext level;
static thread_local bool block_fault_injections;
VariableContext previous_level;
bool previous_block_fault_injections;
public:
/// level_ - block in level and above
/// block_fault_injections_ - block in fault injection too
explicit LockExceptionInThread(VariableContext level_ = VariableContext::User, bool block_fault_injections_ = true);
~LockExceptionInThread();
LockExceptionInThread(const LockExceptionInThread &) = delete;
LockExceptionInThread & operator=(const LockExceptionInThread &) = delete;
static bool isBlocked(VariableContext current_level, bool fault_injection)
{
return counter > 0 && current_level >= level && (!fault_injection || block_fault_injections);
}
};
};
extern MemoryTracker total_memory_tracker;

View File

@ -0,0 +1,16 @@
#include <Common/MemoryTrackerBlockerInThread.h>
// MemoryTrackerBlockerInThread
thread_local uint64_t MemoryTrackerBlockerInThread::counter = 0;
thread_local VariableContext MemoryTrackerBlockerInThread::level = VariableContext::Global;
MemoryTrackerBlockerInThread::MemoryTrackerBlockerInThread(VariableContext level_)
: previous_level(level)
{
++counter;
level = level_;
}
MemoryTrackerBlockerInThread::~MemoryTrackerBlockerInThread()
{
--counter;
level = previous_level;
}

View File

@ -0,0 +1,25 @@
#pragma once
#include <Common/VariableContext.h>
/// To be able to temporarily stop memory tracking from current thread.
struct MemoryTrackerBlockerInThread
{
private:
static thread_local uint64_t counter;
static thread_local VariableContext level;
VariableContext previous_level;
public:
/// level_ - block in level and above
explicit MemoryTrackerBlockerInThread(VariableContext level_ = VariableContext::User);
~MemoryTrackerBlockerInThread();
MemoryTrackerBlockerInThread(const MemoryTrackerBlockerInThread &) = delete;
MemoryTrackerBlockerInThread & operator=(const MemoryTrackerBlockerInThread &) = delete;
static bool isBlocked(VariableContext current_level)
{
return counter > 0 && current_level >= level;
}
};

View File

@ -8,7 +8,7 @@
#include <string.h>
#include <Common/Exception.h>
#include <Common/MemoryTracker.h>
#include <Common/LockMemoryExceptionInThread.h>
#include <IO/BufferBase.h>
@ -116,7 +116,7 @@ public:
return;
/// finalize() is often called from destructors.
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
LockMemoryExceptionInThread lock(VariableContext::Global);
try
{
finalizeImpl();

View File

@ -24,6 +24,7 @@
#include <Parsers/ASTInsertQuery.h>
#include <Storages/IStorage.h>
#include <Common/setThreadName.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <IO/WriteHelpers.h>
#include <Poco/Util/AbstractConfiguration.h>
@ -286,7 +287,7 @@ void SystemLog<LogElement>::add(const LogElement & element)
/// The size of allocation can be in order of a few megabytes.
/// But this should not be accounted for query memory usage.
/// Otherwise the tests like 01017_uniqCombined_memory_usage.sql will be flacky.
MemoryTracker::BlockerInThread temporarily_disable_memory_tracker(VariableContext::Global);
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker(VariableContext::Global);
/// Should not log messages under mutex.
bool queue_is_half_full = false;

View File

@ -16,6 +16,7 @@
#include <Common/SensitiveDataMasker.h>
#include <Common/ThreadProfileEvents.h>
#include <Common/setThreadName.h>
#include <Common/LockMemoryExceptionInThread.h>
#include <base/errnoToString.h>
#if defined(OS_LINUX)
@ -342,7 +343,7 @@ void ThreadStatus::finalizeQueryProfiler()
void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
{
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
LockMemoryExceptionInThread lock(VariableContext::Global);
if (exit_if_already_detached && thread_state == ThreadState::DetachedFromQuery)
{

View File

@ -2,6 +2,7 @@
#include <Common/PODArray.h>
#include <Common/typeid_cast.h>
#include <Common/ThreadProfileEvents.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <Interpreters/AsynchronousInsertQueue.h>
#include <IO/WriteBufferFromFile.h>
@ -195,7 +196,7 @@ static void setExceptionStackTrace(QueryLogElement & elem)
{
/// Disable memory tracker for stack trace.
/// Because if exception is "Memory limit (for query) exceed", then we probably can't allocate another one string.
MemoryTracker::BlockerInThread temporarily_disable_memory_tracker(VariableContext::Global);
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker(VariableContext::Global);
try
{

View File

@ -15,6 +15,7 @@
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/CurrentMetrics.h>
#include <Common/FieldVisitorsAccurateComparison.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <base/JSON.h>
#include <base/logger_useful.h>
#include <Compression/getCompressionCodecForFile.h>
@ -617,7 +618,7 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks
/// Memory should not be limited during ATTACH TABLE query.
/// This is already true at the server startup but must be also ensured for manual table ATTACH.
/// Motivation: memory for index is shared between queries - not belong to the query itself.
MemoryTracker::BlockerInThread temporarily_disable_memory_tracker(VariableContext::Global);
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker(VariableContext::Global);
loadUUID();
loadColumns(require_columns_checksums);

View File

@ -2,8 +2,9 @@
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
#include <Storages/MergeTree/FutureMergedMutatedPart.h>
#include <Common/CurrentMetrics.h>
#include <base/getThreadId.h>
#include <Common/CurrentThread.h>
#include <Common/MemoryTracker.h>
#include <base/getThreadId.h>
namespace DB

View File

@ -1,4 +1,5 @@
#include <Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <utility>
@ -184,7 +185,7 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializePrimaryIndex(const Bloc
* And otherwise it will look like excessively growing memory consumption in context of query.
* (observed in long INSERT SELECTs)
*/
MemoryTracker::BlockerInThread temporarily_disable_memory_tracker;
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker;
/// Write index. The index contains Primary Key value for each `index_granularity` row.
for (const auto & granule : granules_to_write)

View File

@ -1,5 +1,6 @@
#include <Storages/MergeTree/MergeTreeMarksLoader.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <IO/ReadBufferFromFile.h>
#include <utility>
@ -47,7 +48,7 @@ const MarkInCompressedFile & MergeTreeMarksLoader::getMark(size_t row_index, siz
MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
{
/// Memory for marks must not be accounted as memory usage for query, because they are stored in shared cache.
MemoryTracker::BlockerInThread temporarily_disable_memory_tracker;
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker;
size_t file_size = disk->getFileSize(mrk_path);
size_t mark_size = index_granularity_info.getMarkSizeInBytes(columns_in_mark);

View File

@ -13,7 +13,7 @@
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTExpressionList.h>
#include <Common/CurrentMetrics.h>
#include <Common/MemoryTracker.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <Common/FieldVisitorConvertToNumber.h>
#include <Common/quoteString.h>
#include <Common/typeid_cast.h>
@ -467,7 +467,7 @@ static void appendBlock(const Block & from, Block & to)
MutableColumnPtr last_col;
try
{
MemoryTracker::BlockerInThread temporarily_disable_memory_tracker;
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker;
if (to.rows() == 0)
{
@ -496,7 +496,7 @@ static void appendBlock(const Block & from, Block & to)
/// In case of rollback, it is better to ignore memory limits instead of abnormal server termination.
/// So ignore any memory limits, even global (since memory tracking has drift).
MemoryTracker::BlockerInThread temporarily_ignore_any_memory_limits(VariableContext::Global);
MemoryTrackerBlockerInThread temporarily_ignore_any_memory_limits(VariableContext::Global);
try
{
@ -924,7 +924,7 @@ void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr tabl
}
auto destination_metadata_snapshot = table->getInMemoryMetadataPtr();
MemoryTracker::BlockerInThread temporarily_disable_memory_tracker;
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker;
auto insert = std::make_shared<ASTInsertQuery>();
insert->table_id = destination_id;