diff --git a/dbms/programs/server/TCPHandler.cpp b/dbms/programs/server/TCPHandler.cpp index da2a5bbea2b..79c8e08fd14 100644 --- a/dbms/programs/server/TCPHandler.cpp +++ b/dbms/programs/server/TCPHandler.cpp @@ -199,8 +199,10 @@ void TCPHandler::runImpl() else processOrdinaryQuery(); - sendLogs(); + /// Do it before sending end of stream, to have a chance to show log message in client. + query_scope->logPeakMemoryUsage(); + sendLogs(); sendEndOfStream(); query_scope.reset(); diff --git a/dbms/src/Common/CurrentThread.h b/dbms/src/Common/CurrentThread.h index 6daf7a224cd..9820b3620ce 100644 --- a/dbms/src/Common/CurrentThread.h +++ b/dbms/src/Common/CurrentThread.h @@ -75,11 +75,14 @@ public: static void detachQuery(); static void detachQueryIfNotDetached(); - /// Initializes query with current thread as master thread in constructor, and detaches it in desstructor + /// Initializes query with current thread as master thread in constructor, and detaches it in destructor struct QueryScope { explicit QueryScope(Context & query_context); ~QueryScope(); + + void logPeakMemoryUsage(); + bool log_peak_memory_usage_in_destructor = true; }; /// Implicitly finalizes current thread in the destructor diff --git a/dbms/src/Common/MemoryTracker.cpp b/dbms/src/Common/MemoryTracker.cpp index 082477c1d9a..326f93c754d 100644 --- a/dbms/src/Common/MemoryTracker.cpp +++ b/dbms/src/Common/MemoryTracker.cpp @@ -43,8 +43,7 @@ MemoryTracker::~MemoryTracker() * then memory usage of 'next' memory trackers will be underestimated, * because amount will be decreased twice (first - here, second - when real 'free' happens). */ - if (auto value = amount.load(std::memory_order_relaxed)) - free(value); + reset(); } @@ -74,7 +73,7 @@ void MemoryTracker::alloc(Int64 size) */ Int64 will_be = size + amount.fetch_add(size, std::memory_order_relaxed); - if (!parent.load(std::memory_order_relaxed)) + if (metric != CurrentMetrics::end()) CurrentMetrics::add(metric, size); Int64 current_limit = limit.load(std::memory_order_relaxed); @@ -154,7 +153,8 @@ void MemoryTracker::free(Int64 size) if (auto loaded_next = parent.load(std::memory_order_relaxed)) loaded_next->free(size); - else + + if (metric != CurrentMetrics::end()) CurrentMetrics::sub(metric, size); } @@ -170,7 +170,8 @@ void MemoryTracker::resetCounters() void MemoryTracker::reset() { if (!parent.load(std::memory_order_relaxed)) - CurrentMetrics::sub(metric, amount.load(std::memory_order_relaxed)); + if (auto value = amount.load(std::memory_order_relaxed)) + free(value); resetCounters(); } diff --git a/dbms/src/Common/MemoryTracker.h b/dbms/src/Common/MemoryTracker.h index 68c145393fe..9f439c7550c 100644 --- a/dbms/src/Common/MemoryTracker.h +++ b/dbms/src/Common/MemoryTracker.h @@ -7,12 +7,6 @@ #include -namespace CurrentMetrics -{ - extern const Metric MemoryTracking; -} - - /** Tracks memory consumption. * It throws an exception if amount of consumed memory become greater than certain limit. * The same memory tracker could be simultaneously used in different threads. @@ -31,7 +25,7 @@ class MemoryTracker std::atomic parent {}; /// You could specify custom metric to track memory usage. - CurrentMetrics::Metric metric = CurrentMetrics::MemoryTracking; + CurrentMetrics::Metric metric = CurrentMetrics::end(); /// This description will be used as prefix into log messages (if isn't nullptr) const char * description = nullptr; diff --git a/dbms/src/Interpreters/ProcessList.cpp b/dbms/src/Interpreters/ProcessList.cpp index 07dca964c4f..557a006663d 100644 --- a/dbms/src/Interpreters/ProcessList.cpp +++ b/dbms/src/Interpreters/ProcessList.cpp @@ -14,6 +14,11 @@ #include +namespace CurrentMetrics +{ + extern const Metric MemoryTracking; +} + namespace DB { @@ -74,6 +79,13 @@ static bool isUnlimitedQuery(const IAST * ast) } +ProcessList::ProcessList(size_t max_size_) + : max_size(max_size_) +{ + total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking); +} + + ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * ast, Context & query_context) { EntryPtr res; @@ -207,7 +219,7 @@ ProcessListEntry::~ProcessListEntry() /// Destroy all streams to avoid long lock of ProcessList it->releaseQueryStreams(); - std::lock_guard lock(parent.mutex); + std::lock_guard lock(parent.mutex); String user = it->getClientInfo().current_user; String query_id = it->getClientInfo().current_query_id; diff --git a/dbms/src/Interpreters/ProcessList.h b/dbms/src/Interpreters/ProcessList.h index 56d46f2aaa3..87e43162202 100644 --- a/dbms/src/Interpreters/ProcessList.h +++ b/dbms/src/Interpreters/ProcessList.h @@ -287,7 +287,7 @@ protected: QueryStatus * tryGetProcessListElement(const String & current_query_id, const String & current_user); public: - ProcessList(size_t max_size_ = 0) : max_size(max_size_) {} + ProcessList(size_t max_size_ = 0); using EntryPtr = std::shared_ptr; diff --git a/dbms/src/Interpreters/ThreadStatusExt.cpp b/dbms/src/Interpreters/ThreadStatusExt.cpp index 558e2ea3f4f..eac9251cdf0 100644 --- a/dbms/src/Interpreters/ThreadStatusExt.cpp +++ b/dbms/src/Interpreters/ThreadStatusExt.cpp @@ -130,13 +130,13 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits) assertState({ThreadState::AttachedToQuery}, __PRETTY_FUNCTION__); finalizePerformanceCounters(); - /// For better logging ({query_id} will be shown here) - if (thread_group && thread_group.use_count() == 1) - thread_group->memory_tracker.logPeakMemoryUsage(); - /// Detach from thread group performance_counters.setParent(&ProfileEvents::global_counters); + memory_tracker.reset(); + + /// Must reset pointer to thread_group's memory_tracker, because it will be destroyed two lines below. memory_tracker.setParent(nullptr); + query_context = nullptr; thread_group.reset(); @@ -244,10 +244,19 @@ CurrentThread::QueryScope::QueryScope(Context & query_context) CurrentThread::attachQueryContext(query_context); } +void CurrentThread::QueryScope::logPeakMemoryUsage() +{ + log_peak_memory_usage_in_destructor = false; + CurrentThread::getGroup()->memory_tracker.logPeakMemoryUsage(); +} + CurrentThread::QueryScope::~QueryScope() { try { + if (log_peak_memory_usage_in_destructor) + logPeakMemoryUsage(); + CurrentThread::detachQueryIfNotDetached(); } catch (...) diff --git a/dbms/tests/queries/0_stateless/00725_memory_tracking.reference b/dbms/tests/queries/0_stateless/00725_memory_tracking.reference new file mode 100644 index 00000000000..9590230ceee --- /dev/null +++ b/dbms/tests/queries/0_stateless/00725_memory_tracking.reference @@ -0,0 +1,3 @@ +0 +100000000 +0 diff --git a/dbms/tests/queries/0_stateless/00725_memory_tracking.sql b/dbms/tests/queries/0_stateless/00725_memory_tracking.sql new file mode 100644 index 00000000000..46d7948b1b3 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00725_memory_tracking.sql @@ -0,0 +1,3 @@ +SELECT least(value, 0) FROM system.metrics WHERE metric = 'MemoryTracking'; +SELECT length(range(100000000)); +SELECT least(value, 0) FROM system.metrics WHERE metric = 'MemoryTracking';