mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Merge pull request #16206 from ClickHouse/fix-memory-tracking
Fix multiple issues with memory tracking
This commit is contained in:
commit
0b7430dda1
@ -30,6 +30,8 @@ namespace ProfileEvents
|
|||||||
|
|
||||||
static constexpr size_t log_peak_memory_usage_every = 1ULL << 30;
|
static constexpr size_t log_peak_memory_usage_every = 1ULL << 30;
|
||||||
|
|
||||||
|
thread_local bool MemoryTracker::BlockerInThread::is_blocked = false;
|
||||||
|
|
||||||
MemoryTracker total_memory_tracker(nullptr, VariableContext::Global);
|
MemoryTracker total_memory_tracker(nullptr, VariableContext::Global);
|
||||||
|
|
||||||
|
|
||||||
@ -56,13 +58,15 @@ MemoryTracker::~MemoryTracker()
|
|||||||
void MemoryTracker::logPeakMemoryUsage() const
|
void MemoryTracker::logPeakMemoryUsage() const
|
||||||
{
|
{
|
||||||
const auto * description = description_ptr.load(std::memory_order_relaxed);
|
const auto * description = description_ptr.load(std::memory_order_relaxed);
|
||||||
LOG_DEBUG(&Poco::Logger::get("MemoryTracker"), "Peak memory usage{}: {}.", (description ? " " + std::string(description) : ""), ReadableSize(peak));
|
LOG_DEBUG(&Poco::Logger::get("MemoryTracker"),
|
||||||
|
"Peak memory usage{}: {}.", (description ? " " + std::string(description) : ""), ReadableSize(peak));
|
||||||
}
|
}
|
||||||
|
|
||||||
void MemoryTracker::logMemoryUsage(Int64 current) const
|
void MemoryTracker::logMemoryUsage(Int64 current) const
|
||||||
{
|
{
|
||||||
const auto * description = description_ptr.load(std::memory_order_relaxed);
|
const auto * description = description_ptr.load(std::memory_order_relaxed);
|
||||||
LOG_DEBUG(&Poco::Logger::get("MemoryTracker"), "Current memory usage{}: {}.", (description ? " " + std::string(description) : ""), ReadableSize(current));
|
LOG_DEBUG(&Poco::Logger::get("MemoryTracker"),
|
||||||
|
"Current memory usage{}: {}.", (description ? " " + std::string(description) : ""), ReadableSize(current));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -71,7 +75,7 @@ void MemoryTracker::alloc(Int64 size)
|
|||||||
if (size < 0)
|
if (size < 0)
|
||||||
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Negative size ({}) is passed to MemoryTracker. It is a bug.", size);
|
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Negative size ({}) is passed to MemoryTracker. It is a bug.", size);
|
||||||
|
|
||||||
if (blocker.isCancelled())
|
if (BlockerInThread::isBlocked())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
/** Using memory_order_relaxed means that if allocations are done simultaneously,
|
/** Using memory_order_relaxed means that if allocations are done simultaneously,
|
||||||
@ -86,12 +90,15 @@ void MemoryTracker::alloc(Int64 size)
|
|||||||
Int64 current_hard_limit = hard_limit.load(std::memory_order_relaxed);
|
Int64 current_hard_limit = hard_limit.load(std::memory_order_relaxed);
|
||||||
Int64 current_profiler_limit = profiler_limit.load(std::memory_order_relaxed);
|
Int64 current_profiler_limit = profiler_limit.load(std::memory_order_relaxed);
|
||||||
|
|
||||||
/// Cap the limit to the total_memory_tracker, since it may include some drift.
|
/// Cap the limit to the total_memory_tracker, since it may include some drift
|
||||||
|
/// for user-level memory tracker.
|
||||||
///
|
///
|
||||||
/// And since total_memory_tracker is reset to the process resident
|
/// And since total_memory_tracker is reset to the process resident
|
||||||
/// memory peridically (in AsynchronousMetrics::update()), any limit can be
|
/// memory peridically (in AsynchronousMetrics::update()), any limit can be
|
||||||
/// capped to it, to avoid possible drift.
|
/// capped to it, to avoid possible drift.
|
||||||
if (unlikely(current_hard_limit && will_be > current_hard_limit))
|
if (unlikely(current_hard_limit
|
||||||
|
&& will_be > current_hard_limit
|
||||||
|
&& level == VariableContext::User))
|
||||||
{
|
{
|
||||||
Int64 total_amount = total_memory_tracker.get();
|
Int64 total_amount = total_memory_tracker.get();
|
||||||
if (amount > total_amount)
|
if (amount > total_amount)
|
||||||
@ -104,10 +111,8 @@ void MemoryTracker::alloc(Int64 size)
|
|||||||
std::bernoulli_distribution fault(fault_probability);
|
std::bernoulli_distribution fault(fault_probability);
|
||||||
if (unlikely(fault_probability && fault(thread_local_rng)))
|
if (unlikely(fault_probability && fault(thread_local_rng)))
|
||||||
{
|
{
|
||||||
free(size);
|
|
||||||
|
|
||||||
/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
|
/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
|
||||||
auto untrack_lock = blocker.cancel(); // NOLINT
|
BlockerInThread untrack_lock;
|
||||||
|
|
||||||
ProfileEvents::increment(ProfileEvents::QueryMemoryLimitExceeded);
|
ProfileEvents::increment(ProfileEvents::QueryMemoryLimitExceeded);
|
||||||
std::stringstream message;
|
std::stringstream message;
|
||||||
@ -118,12 +123,13 @@ void MemoryTracker::alloc(Int64 size)
|
|||||||
<< " (attempt to allocate chunk of " << size << " bytes)"
|
<< " (attempt to allocate chunk of " << size << " bytes)"
|
||||||
<< ", maximum: " << formatReadableSizeWithBinarySuffix(current_hard_limit);
|
<< ", maximum: " << formatReadableSizeWithBinarySuffix(current_hard_limit);
|
||||||
|
|
||||||
|
amount.fetch_sub(size, std::memory_order_relaxed);
|
||||||
throw DB::Exception(message.str(), DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED);
|
throw DB::Exception(message.str(), DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (unlikely(current_profiler_limit && will_be > current_profiler_limit))
|
if (unlikely(current_profiler_limit && will_be > current_profiler_limit))
|
||||||
{
|
{
|
||||||
auto no_track = blocker.cancel();
|
BlockerInThread untrack_lock;
|
||||||
DB::TraceCollector::collect(DB::TraceType::Memory, StackTrace(), size);
|
DB::TraceCollector::collect(DB::TraceType::Memory, StackTrace(), size);
|
||||||
setOrRaiseProfilerLimit((will_be + profiler_step - 1) / profiler_step * profiler_step);
|
setOrRaiseProfilerLimit((will_be + profiler_step - 1) / profiler_step * profiler_step);
|
||||||
}
|
}
|
||||||
@ -131,16 +137,14 @@ void MemoryTracker::alloc(Int64 size)
|
|||||||
std::bernoulli_distribution sample(sample_probability);
|
std::bernoulli_distribution sample(sample_probability);
|
||||||
if (unlikely(sample_probability && sample(thread_local_rng)))
|
if (unlikely(sample_probability && sample(thread_local_rng)))
|
||||||
{
|
{
|
||||||
auto no_track = blocker.cancel();
|
BlockerInThread untrack_lock;
|
||||||
DB::TraceCollector::collect(DB::TraceType::MemorySample, StackTrace(), size);
|
DB::TraceCollector::collect(DB::TraceType::MemorySample, StackTrace(), size);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (unlikely(current_hard_limit && will_be > current_hard_limit))
|
if (unlikely(current_hard_limit && will_be > current_hard_limit))
|
||||||
{
|
{
|
||||||
free(size);
|
|
||||||
|
|
||||||
/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
|
/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
|
||||||
auto no_track = blocker.cancel(); // NOLINT
|
BlockerInThread untrack_lock;
|
||||||
|
|
||||||
ProfileEvents::increment(ProfileEvents::QueryMemoryLimitExceeded);
|
ProfileEvents::increment(ProfileEvents::QueryMemoryLimitExceeded);
|
||||||
std::stringstream message;
|
std::stringstream message;
|
||||||
@ -151,6 +155,7 @@ void MemoryTracker::alloc(Int64 size)
|
|||||||
<< " (attempt to allocate chunk of " << size << " bytes)"
|
<< " (attempt to allocate chunk of " << size << " bytes)"
|
||||||
<< ", maximum: " << formatReadableSizeWithBinarySuffix(current_hard_limit);
|
<< ", maximum: " << formatReadableSizeWithBinarySuffix(current_hard_limit);
|
||||||
|
|
||||||
|
amount.fetch_sub(size, std::memory_order_relaxed);
|
||||||
throw DB::Exception(message.str(), DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED);
|
throw DB::Exception(message.str(), DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -177,13 +182,13 @@ void MemoryTracker::updatePeak(Int64 will_be)
|
|||||||
|
|
||||||
void MemoryTracker::free(Int64 size)
|
void MemoryTracker::free(Int64 size)
|
||||||
{
|
{
|
||||||
if (blocker.isCancelled())
|
if (BlockerInThread::isBlocked())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
std::bernoulli_distribution sample(sample_probability);
|
std::bernoulli_distribution sample(sample_probability);
|
||||||
if (unlikely(sample_probability && sample(thread_local_rng)))
|
if (unlikely(sample_probability && sample(thread_local_rng)))
|
||||||
{
|
{
|
||||||
auto no_track = blocker.cancel();
|
BlockerInThread untrack_lock;
|
||||||
DB::TraceCollector::collect(DB::TraceType::MemorySample, StackTrace(), -size);
|
DB::TraceCollector::collect(DB::TraceType::MemorySample, StackTrace(), -size);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -298,11 +303,3 @@ namespace CurrentMemoryTracker
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
DB::SimpleActionLock getCurrentMemoryTrackerActionLock()
|
|
||||||
{
|
|
||||||
auto * memory_tracker = DB::CurrentThread::getMemoryTracker();
|
|
||||||
if (!memory_tracker)
|
|
||||||
return {};
|
|
||||||
return memory_tracker->blocker.cancel();
|
|
||||||
}
|
|
||||||
|
@ -3,7 +3,6 @@
|
|||||||
#include <atomic>
|
#include <atomic>
|
||||||
#include <common/types.h>
|
#include <common/types.h>
|
||||||
#include <Common/CurrentMetrics.h>
|
#include <Common/CurrentMetrics.h>
|
||||||
#include <Common/SimpleActionBlocker.h>
|
|
||||||
#include <Common/VariableContext.h>
|
#include <Common/VariableContext.h>
|
||||||
|
|
||||||
|
|
||||||
@ -131,8 +130,18 @@ public:
|
|||||||
/// Prints info about peak memory consumption into log.
|
/// Prints info about peak memory consumption into log.
|
||||||
void logPeakMemoryUsage() const;
|
void logPeakMemoryUsage() const;
|
||||||
|
|
||||||
/// To be able to temporarily stop memory tracker
|
/// To be able to temporarily stop memory tracking from current thread.
|
||||||
DB::SimpleActionBlocker blocker;
|
struct BlockerInThread
|
||||||
|
{
|
||||||
|
private:
|
||||||
|
BlockerInThread(const BlockerInThread &) = delete;
|
||||||
|
BlockerInThread & operator=(const BlockerInThread &) = delete;
|
||||||
|
static thread_local bool is_blocked;
|
||||||
|
public:
|
||||||
|
BlockerInThread() { is_blocked = true; }
|
||||||
|
~BlockerInThread() { is_blocked = false; }
|
||||||
|
static bool isBlocked() { return is_blocked; }
|
||||||
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
extern MemoryTracker total_memory_tracker;
|
extern MemoryTracker total_memory_tracker;
|
||||||
@ -145,7 +154,3 @@ namespace CurrentMemoryTracker
|
|||||||
void realloc(Int64 old_size, Int64 new_size);
|
void realloc(Int64 old_size, Int64 new_size);
|
||||||
void free(Int64 size);
|
void free(Int64 size);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/// Holding this object will temporarily disable memory tracking.
|
|
||||||
DB::SimpleActionLock getCurrentMemoryTrackerActionLock();
|
|
||||||
|
@ -233,7 +233,7 @@ void SystemLog<LogElement>::add(const LogElement & element)
|
|||||||
/// The size of allocation can be in order of a few megabytes.
|
/// The size of allocation can be in order of a few megabytes.
|
||||||
/// But this should not be accounted for query memory usage.
|
/// But this should not be accounted for query memory usage.
|
||||||
/// Otherwise the tests like 01017_uniqCombined_memory_usage.sql will be flacky.
|
/// Otherwise the tests like 01017_uniqCombined_memory_usage.sql will be flacky.
|
||||||
auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
|
MemoryTracker::BlockerInThread temporarily_disable_memory_tracker;
|
||||||
|
|
||||||
/// Should not log messages under mutex.
|
/// Should not log messages under mutex.
|
||||||
bool queue_is_half_full = false;
|
bool queue_is_half_full = false;
|
||||||
|
@ -157,7 +157,7 @@ static void setExceptionStackTrace(QueryLogElement & elem)
|
|||||||
{
|
{
|
||||||
/// Disable memory tracker for stack trace.
|
/// Disable memory tracker for stack trace.
|
||||||
/// Because if exception is "Memory limit (for query) exceed", then we probably can't allocate another one string.
|
/// Because if exception is "Memory limit (for query) exceed", then we probably can't allocate another one string.
|
||||||
auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
|
MemoryTracker::BlockerInThread temporarily_disable_memory_tracker;
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
@ -408,7 +408,7 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks
|
|||||||
/// Memory should not be limited during ATTACH TABLE query.
|
/// Memory should not be limited during ATTACH TABLE query.
|
||||||
/// This is already true at the server startup but must be also ensured for manual table ATTACH.
|
/// This is already true at the server startup but must be also ensured for manual table ATTACH.
|
||||||
/// Motivation: memory for index is shared between queries - not belong to the query itself.
|
/// Motivation: memory for index is shared between queries - not belong to the query itself.
|
||||||
auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
|
MemoryTracker::BlockerInThread temporarily_disable_memory_tracker;
|
||||||
|
|
||||||
loadColumns(require_columns_checksums);
|
loadColumns(require_columns_checksums);
|
||||||
loadChecksums(require_columns_checksums);
|
loadChecksums(require_columns_checksums);
|
||||||
|
@ -212,7 +212,7 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializePrimaryIndex(const Bloc
|
|||||||
* And otherwise it will look like excessively growing memory consumption in context of query.
|
* And otherwise it will look like excessively growing memory consumption in context of query.
|
||||||
* (observed in long INSERT SELECTs)
|
* (observed in long INSERT SELECTs)
|
||||||
*/
|
*/
|
||||||
auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
|
MemoryTracker::BlockerInThread temporarily_disable_memory_tracker;
|
||||||
|
|
||||||
/// Write index. The index contains Primary Key value for each `index_granularity` row.
|
/// Write index. The index contains Primary Key value for each `index_granularity` row.
|
||||||
|
|
||||||
|
@ -48,7 +48,7 @@ const MarkInCompressedFile & MergeTreeMarksLoader::getMark(size_t row_index, siz
|
|||||||
MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
|
MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
|
||||||
{
|
{
|
||||||
/// Memory for marks must not be accounted as memory usage for query, because they are stored in shared cache.
|
/// Memory for marks must not be accounted as memory usage for query, because they are stored in shared cache.
|
||||||
auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
|
MemoryTracker::BlockerInThread temporarily_disable_memory_tracker;
|
||||||
|
|
||||||
size_t file_size = disk->getFileSize(mrk_path);
|
size_t file_size = disk->getFileSize(mrk_path);
|
||||||
size_t mark_size = index_granularity_info.getMarkSizeInBytes(columns_in_mark);
|
size_t mark_size = index_granularity_info.getMarkSizeInBytes(columns_in_mark);
|
||||||
|
@ -316,7 +316,7 @@ static void appendBlock(const Block & from, Block & to)
|
|||||||
|
|
||||||
size_t old_rows = to.rows();
|
size_t old_rows = to.rows();
|
||||||
|
|
||||||
auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
|
MemoryTracker::BlockerInThread temporarily_disable_memory_tracker;
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@ -694,7 +694,7 @@ void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr tabl
|
|||||||
}
|
}
|
||||||
auto destination_metadata_snapshot = table->getInMemoryMetadataPtr();
|
auto destination_metadata_snapshot = table->getInMemoryMetadataPtr();
|
||||||
|
|
||||||
auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
|
MemoryTracker::BlockerInThread temporarily_disable_memory_tracker;
|
||||||
|
|
||||||
auto insert = std::make_shared<ASTInsertQuery>();
|
auto insert = std::make_shared<ASTInsertQuery>();
|
||||||
insert->table_id = destination_id;
|
insert->table_id = destination_id;
|
||||||
|
10
tests/queries/0_stateless/01529_bad_memory_tracking.sh
Executable file
10
tests/queries/0_stateless/01529_bad_memory_tracking.sh
Executable file
@ -0,0 +1,10 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
|
CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL=fatal
|
||||||
|
|
||||||
|
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||||
|
. "$CURDIR"/../shell_config.sh
|
||||||
|
|
||||||
|
for _ in {1..10}; do
|
||||||
|
${CLICKHOUSE_CLIENT} --max_memory_usage '10G' --query "SELECT i FROM generateRandom('i Array(Int8)', 1, 1, 1048577) LIMIT 65536" 2>&1 | grep -v -P '^(Received exception from server|Code: 241)' ||:
|
||||||
|
done
|
Loading…
Reference in New Issue
Block a user