From 2619efadc8c5f1b5cb235b12c59962e03726fe12 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 21 Oct 2020 03:31:12 +0300 Subject: [PATCH 1/2] Fix multiple issues with memory tracking --- src/Common/MemoryTracker.cpp | 43 +++++++++---------- src/Common/MemoryTracker.h | 19 +++++--- src/Interpreters/SystemLog.h | 2 +- src/Interpreters/executeQuery.cpp | 2 +- src/Storages/MergeTree/IMergeTreeDataPart.cpp | 2 +- .../MergeTreeDataPartWriterOnDisk.cpp | 2 +- .../MergeTree/MergeTreeMarksLoader.cpp | 2 +- src/Storages/StorageBuffer.cpp | 4 +- .../01529_bad_memory_tracking.reference | 0 .../0_stateless/01529_bad_memory_tracking.sh | 10 +++++ 10 files changed, 49 insertions(+), 37 deletions(-) create mode 100644 tests/queries/0_stateless/01529_bad_memory_tracking.reference create mode 100755 tests/queries/0_stateless/01529_bad_memory_tracking.sh diff --git a/src/Common/MemoryTracker.cpp b/src/Common/MemoryTracker.cpp index 5d51fc9f301..87567591ddf 100644 --- a/src/Common/MemoryTracker.cpp +++ b/src/Common/MemoryTracker.cpp @@ -30,6 +30,8 @@ namespace ProfileEvents static constexpr size_t log_peak_memory_usage_every = 1ULL << 30; +thread_local bool MemoryTracker::BlockerInThread::is_blocked = false; + MemoryTracker total_memory_tracker(nullptr, VariableContext::Global); @@ -56,13 +58,15 @@ MemoryTracker::~MemoryTracker() void MemoryTracker::logPeakMemoryUsage() const { const auto * description = description_ptr.load(std::memory_order_relaxed); - LOG_DEBUG(&Poco::Logger::get("MemoryTracker"), "Peak memory usage{}: {}.", (description ? " " + std::string(description) : ""), ReadableSize(peak)); + LOG_DEBUG(&Poco::Logger::get("MemoryTracker"), + "Peak memory usage{}: {}.", (description ? 
" " + std::string(description) : ""), ReadableSize(peak)); } void MemoryTracker::logMemoryUsage(Int64 current) const { const auto * description = description_ptr.load(std::memory_order_relaxed); - LOG_DEBUG(&Poco::Logger::get("MemoryTracker"), "Current memory usage{}: {}.", (description ? " " + std::string(description) : ""), ReadableSize(current)); + LOG_DEBUG(&Poco::Logger::get("MemoryTracker"), + "Current memory usage{}: {}.", (description ? " " + std::string(description) : ""), ReadableSize(current)); } @@ -71,7 +75,7 @@ void MemoryTracker::alloc(Int64 size) if (size < 0) throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Negative size ({}) is passed to MemoryTracker. It is a bug.", size); - if (blocker.isCancelled()) + if (BlockerInThread::isBlocked()) return; /** Using memory_order_relaxed means that if allocations are done simultaneously, @@ -86,12 +90,15 @@ void MemoryTracker::alloc(Int64 size) Int64 current_hard_limit = hard_limit.load(std::memory_order_relaxed); Int64 current_profiler_limit = profiler_limit.load(std::memory_order_relaxed); - /// Cap the limit to the total_memory_tracker, since it may include some drift. + /// Cap the limit to the total_memory_tracker, since it may include some drift + /// for user-level memory tracker. /// /// And since total_memory_tracker is reset to the process resident /// memory peridically (in AsynchronousMetrics::update()), any limit can be /// capped to it, to avoid possible drift. - if (unlikely(current_hard_limit && will_be > current_hard_limit)) + if (unlikely(current_hard_limit + && will_be > current_hard_limit + && level == VariableContext::User)) { Int64 total_amount = total_memory_tracker.get(); if (amount > total_amount) @@ -104,10 +111,8 @@ void MemoryTracker::alloc(Int64 size) std::bernoulli_distribution fault(fault_probability); if (unlikely(fault_probability && fault(thread_local_rng))) { - free(size); - /// Prevent recursion. 
Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc - auto untrack_lock = blocker.cancel(); // NOLINT + BlockerInThread untrack_lock; ProfileEvents::increment(ProfileEvents::QueryMemoryLimitExceeded); std::stringstream message; @@ -118,12 +123,13 @@ void MemoryTracker::alloc(Int64 size) << " (attempt to allocate chunk of " << size << " bytes)" << ", maximum: " << formatReadableSizeWithBinarySuffix(current_hard_limit); + amount.fetch_sub(size, std::memory_order_relaxed); throw DB::Exception(message.str(), DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED); } if (unlikely(current_profiler_limit && will_be > current_profiler_limit)) { - auto no_track = blocker.cancel(); + BlockerInThread untrack_lock; DB::TraceCollector::collect(DB::TraceType::Memory, StackTrace(), size); setOrRaiseProfilerLimit((will_be + profiler_step - 1) / profiler_step * profiler_step); } @@ -131,16 +137,14 @@ void MemoryTracker::alloc(Int64 size) std::bernoulli_distribution sample(sample_probability); if (unlikely(sample_probability && sample(thread_local_rng))) { - auto no_track = blocker.cancel(); + BlockerInThread untrack_lock; DB::TraceCollector::collect(DB::TraceType::MemorySample, StackTrace(), size); } if (unlikely(current_hard_limit && will_be > current_hard_limit)) { - free(size); - /// Prevent recursion. 
Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc - auto no_track = blocker.cancel(); // NOLINT + BlockerInThread untrack_lock; ProfileEvents::increment(ProfileEvents::QueryMemoryLimitExceeded); std::stringstream message; @@ -151,6 +155,7 @@ void MemoryTracker::alloc(Int64 size) << " (attempt to allocate chunk of " << size << " bytes)" << ", maximum: " << formatReadableSizeWithBinarySuffix(current_hard_limit); + amount.fetch_sub(size, std::memory_order_relaxed); throw DB::Exception(message.str(), DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED); } @@ -177,13 +182,13 @@ void MemoryTracker::updatePeak(Int64 will_be) void MemoryTracker::free(Int64 size) { - if (blocker.isCancelled()) + if (BlockerInThread::isBlocked()) return; std::bernoulli_distribution sample(sample_probability); if (unlikely(sample_probability && sample(thread_local_rng))) { - auto no_track = blocker.cancel(); + BlockerInThread untrack_lock; DB::TraceCollector::collect(DB::TraceType::MemorySample, StackTrace(), -size); } @@ -298,11 +303,3 @@ namespace CurrentMemoryTracker } } } - -DB::SimpleActionLock getCurrentMemoryTrackerActionLock() -{ - auto * memory_tracker = DB::CurrentThread::getMemoryTracker(); - if (!memory_tracker) - return {}; - return memory_tracker->blocker.cancel(); -} diff --git a/src/Common/MemoryTracker.h b/src/Common/MemoryTracker.h index 8af683ae790..9f4f4357024 100644 --- a/src/Common/MemoryTracker.h +++ b/src/Common/MemoryTracker.h @@ -3,7 +3,6 @@ #include #include #include -#include #include @@ -131,8 +130,18 @@ public: /// Prints info about peak memory consumption into log. void logPeakMemoryUsage() const; - /// To be able to temporarily stop memory tracker - DB::SimpleActionBlocker blocker; + /// To be able to temporarily stop memory tracking from current thread. 
+ struct BlockerInThread + { + BlockerInThread() { is_blocked = true; } + ~BlockerInThread() { is_blocked = was_blocked; } + BlockerInThread(const BlockerInThread &) = delete; + BlockerInThread & operator=(const BlockerInThread &) = delete; + static bool isBlocked() { return is_blocked; } + private: + static thread_local bool is_blocked; + const bool was_blocked = is_blocked; /// Flag value on entry (captured before the constructor body runs); the destructor restores it so that a nested blocker does not re-enable tracking for the enclosing one. + }; }; extern MemoryTracker total_memory_tracker; @@ -145,7 +154,3 @@ namespace CurrentMemoryTracker void realloc(Int64 old_size, Int64 new_size); void free(Int64 size); } - - -/// Holding this object will temporarily disable memory tracking. -DB::SimpleActionLock getCurrentMemoryTrackerActionLock(); diff --git a/src/Interpreters/SystemLog.h b/src/Interpreters/SystemLog.h index 2a0ce9cef53..99a85405348 100644 --- a/src/Interpreters/SystemLog.h +++ b/src/Interpreters/SystemLog.h @@ -233,7 +233,7 @@ void SystemLog::add(const LogElement & element) /// The size of allocation can be in order of a few megabytes. /// But this should not be accounted for query memory usage. /// Otherwise the tests like 01017_uniqCombined_memory_usage.sql will be flacky. - auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock(); + MemoryTracker::BlockerInThread temporarily_disable_memory_tracker; /// Should not log messages under mutex. bool queue_is_half_full = false; diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index d66fdeea46f..81768283944 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -157,7 +157,7 @@ static void setExceptionStackTrace(QueryLogElement & elem) { /// Disable memory tracker for stack trace. /// Because if exception is "Memory limit (for query) exceed", then we probably can't allocate another one string. 
- auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock(); + MemoryTracker::BlockerInThread temporarily_disable_memory_tracker; try { diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index 27d306e1642..319b486c2c6 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -408,7 +408,7 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks /// Memory should not be limited during ATTACH TABLE query. /// This is already true at the server startup but must be also ensured for manual table ATTACH. /// Motivation: memory for index is shared between queries - not belong to the query itself. - auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock(); + MemoryTracker::BlockerInThread temporarily_disable_memory_tracker; loadColumns(require_columns_checksums); loadChecksums(require_columns_checksums); diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp index c6b689da33a..21db4827f42 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp @@ -212,7 +212,7 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializePrimaryIndex(const Bloc * And otherwise it will look like excessively growing memory consumption in context of query. * (observed in long INSERT SELECTs) */ - auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock(); + MemoryTracker::BlockerInThread temporarily_disable_memory_tracker; /// Write index. The index contains Primary Key value for each `index_granularity` row. 
diff --git a/src/Storages/MergeTree/MergeTreeMarksLoader.cpp b/src/Storages/MergeTree/MergeTreeMarksLoader.cpp index a7107789bfd..c5a99b128e9 100644 --- a/src/Storages/MergeTree/MergeTreeMarksLoader.cpp +++ b/src/Storages/MergeTree/MergeTreeMarksLoader.cpp @@ -48,7 +48,7 @@ const MarkInCompressedFile & MergeTreeMarksLoader::getMark(size_t row_index, siz MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl() { /// Memory for marks must not be accounted as memory usage for query, because they are stored in shared cache. - auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock(); + MemoryTracker::BlockerInThread temporarily_disable_memory_tracker; size_t file_size = disk->getFileSize(mrk_path); size_t mark_size = index_granularity_info.getMarkSizeInBytes(columns_in_mark); diff --git a/src/Storages/StorageBuffer.cpp b/src/Storages/StorageBuffer.cpp index 659df2026c8..87d11e32eae 100644 --- a/src/Storages/StorageBuffer.cpp +++ b/src/Storages/StorageBuffer.cpp @@ -315,7 +315,7 @@ static void appendBlock(const Block & from, Block & to) size_t old_rows = to.rows(); - auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock(); + MemoryTracker::BlockerInThread temporarily_disable_memory_tracker; try { @@ -693,7 +693,7 @@ void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr tabl } auto destination_metadata_snapshot = table->getInMemoryMetadataPtr(); - auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock(); + MemoryTracker::BlockerInThread temporarily_disable_memory_tracker; auto insert = std::make_shared(); insert->table_id = destination_id; diff --git a/tests/queries/0_stateless/01529_bad_memory_tracking.reference b/tests/queries/0_stateless/01529_bad_memory_tracking.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/01529_bad_memory_tracking.sh b/tests/queries/0_stateless/01529_bad_memory_tracking.sh new file mode 100755 index 
00000000000..dc3851831c2 --- /dev/null +++ b/tests/queries/0_stateless/01529_bad_memory_tracking.sh @@ -0,0 +1,10 @@ +#!/usr/bin/env bash + +CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL=fatal + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. "$CURDIR"/../shell_config.sh + +for _ in {1..10}; do + ${CLICKHOUSE_CLIENT} --max_memory_usage '10G' --query "SELECT i FROM generateRandom('i Array(Int8)', 1, 1, 1048577) LIMIT 65536" 2>&1 | grep -v -P '^(Received exception from server|Code: 241)' +done From 3636ff5b37294f37a15629e97c16f2a2acd7b267 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 21 Oct 2020 04:34:16 +0300 Subject: [PATCH 2/2] Fix test --- tests/queries/0_stateless/01529_bad_memory_tracking.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/queries/0_stateless/01529_bad_memory_tracking.sh b/tests/queries/0_stateless/01529_bad_memory_tracking.sh index dc3851831c2..f91f6ebaf80 100755 --- a/tests/queries/0_stateless/01529_bad_memory_tracking.sh +++ b/tests/queries/0_stateless/01529_bad_memory_tracking.sh @@ -6,5 +6,5 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) . "$CURDIR"/../shell_config.sh for _ in {1..10}; do - ${CLICKHOUSE_CLIENT} --max_memory_usage '10G' --query "SELECT i FROM generateRandom('i Array(Int8)', 1, 1, 1048577) LIMIT 65536" 2>&1 | grep -v -P '^(Received exception from server|Code: 241)' + ${CLICKHOUSE_CLIENT} --max_memory_usage '10G' --query "SELECT i FROM generateRandom('i Array(Int8)', 1, 1, 1048577) LIMIT 65536" 2>&1 | grep -v -P '^(Received exception from server|Code: 241)' ||: done