diff --git a/dbms/src/Client/Connection.cpp b/dbms/src/Client/Connection.cpp index cbd588a25d1..59285d3b03f 100644 --- a/dbms/src/Client/Connection.cpp +++ b/dbms/src/Client/Connection.cpp @@ -521,7 +521,7 @@ Connection::Packet Connection::receivePacket() } else { - LOG_TRACE(log_wrapper.get(), "Receiving packet type"); + //LOG_TRACE(log_wrapper.get(), "Receiving packet type " << StackTrace().toString()); readVarUInt(res.type, *in); } diff --git a/dbms/src/Common/MemoryTracker.cpp b/dbms/src/Common/MemoryTracker.cpp index dc8f86c78f7..72f03d17829 100644 --- a/dbms/src/Common/MemoryTracker.cpp +++ b/dbms/src/Common/MemoryTracker.cpp @@ -18,7 +18,7 @@ namespace DB MemoryTracker::~MemoryTracker() { - if (peak) + if (level != VariableContext::Thread && peak) { try { @@ -114,18 +114,26 @@ void MemoryTracker::free(Int64 size) if (blocker.isCancelled()) return; - Int64 new_amount = amount.fetch_sub(size, std::memory_order_relaxed) - size; - - /** Sometimes, query could free some data, that was allocated outside of query context. - * Example: cache eviction. - * To avoid negative memory usage, we "saturate" amount. - * Memory usage will be calculated with some error. - * NOTE The code is not atomic. Not worth to fix. - */ - if (new_amount < 0) + if (level == VariableContext::Thread) { - amount.fetch_sub(new_amount); - size += new_amount; + /// Could become negative if memory allocated in this thread is freed in another one + amount.fetch_sub(size, std::memory_order_relaxed); + } + else + { + Int64 new_amount = amount.fetch_sub(size, std::memory_order_relaxed) - size; + + /** Sometimes, query could free some data, that was allocated outside of query context. + * Example: cache eviction. + * To avoid negative memory usage, we "saturate" amount. + * Memory usage will be calculated with some error. + * NOTE: The code is not atomic. Not worth to fix. + */ + if (unlikely(new_amount < 0)) + { + amount.fetch_sub(new_amount); + size += new_amount; + } } if (auto loaded_next = parent.load(std::memory_order_relaxed)) @@ -135,14 +143,20 @@ void MemoryTracker::free(Int64 size) } +void MemoryTracker::resetCounters() +{ + amount.store(0, std::memory_order_relaxed); + peak.store(0, std::memory_order_relaxed); + limit.store(0, std::memory_order_relaxed); +} + + void MemoryTracker::reset() { if (!parent.load(std::memory_order_relaxed)) CurrentMetrics::sub(metric, amount.load(std::memory_order_relaxed)); - amount.store(0, std::memory_order_relaxed); - peak.store(0, std::memory_order_relaxed); - limit.store(0, std::memory_order_relaxed); + resetCounters(); } diff --git a/dbms/src/Common/MemoryTracker.h b/dbms/src/Common/MemoryTracker.h index 63e21da6ac1..bf6a0a42bc0 100644 --- a/dbms/src/Common/MemoryTracker.h +++ b/dbms/src/Common/MemoryTracker.h @@ -1,5 +1,6 @@ #pragma once +#include "VariableContext.h" #include #include #include @@ -36,12 +37,14 @@ class MemoryTracker const char * description = nullptr; public: - MemoryTracker() {} - MemoryTracker(Int64 limit_) : limit(limit_) {} - MemoryTracker(MemoryTracker * parent_) : parent(parent_) {} + MemoryTracker(VariableContext level = VariableContext::Thread) : level(level) {} + MemoryTracker(Int64 limit_, VariableContext level = VariableContext::Thread) : limit(limit_), level(level) {} + MemoryTracker(MemoryTracker * parent_, VariableContext level = VariableContext::Thread) : parent(parent_), level(level) {} ~MemoryTracker(); + VariableContext level; + /** Call the following functions before calling of corresponding operations with memory allocators. */ void alloc(Int64 size); @@ -103,7 +106,10 @@ public: description = description_; } - /// Reset the accumulated data. + /// Reset the accumulated data + void resetCounters(); + + /// Reset the accumulated data and the parent. void reset(); /// Prints info about peak memory consumption into log. diff --git a/dbms/src/Common/ProfileEvents.cpp b/dbms/src/Common/ProfileEvents.cpp index 976f8976547..020601797f1 100644 --- a/dbms/src/Common/ProfileEvents.cpp +++ b/dbms/src/Common/ProfileEvents.cpp @@ -181,7 +181,7 @@ Counters global_counters(global_counters_array); const Event Counters::num_counters = END; -Counters::Counters(Level level, Counters * parent) +Counters::Counters(VariableContext level, Counters * parent) : counters_holder(new Counter[num_counters] {}), parent(parent), level(level) @@ -206,7 +206,7 @@ void Counters::reset() Counters Counters::getPartiallyAtomicSnapshot() const { - Counters res(Level::Snapshot, nullptr); + Counters res(VariableContext::Snapshot, nullptr); for (Event i = 0; i < num_counters; ++i) res.counters[i].store(counters[i].load(std::memory_order_relaxed), std::memory_order_relaxed); return res; diff --git a/dbms/src/Common/ProfileEvents.h b/dbms/src/Common/ProfileEvents.h index ed69c8d78ac..ef18ef5531a 100644 --- a/dbms/src/Common/ProfileEvents.h +++ b/dbms/src/Common/ProfileEvents.h @@ -1,5 +1,6 @@ #pragma once +#include "VariableContext.h" #include #include #include @@ -27,15 +28,6 @@ namespace ProfileEvents /// Counters - how many times each event happened extern Counters global_counters; - enum class Level - { - Global = 0, - User, - Process, - Thread, - Snapshot - }; - class Counters { Counter * counters = nullptr; @@ -45,14 +37,14 @@ namespace ProfileEvents public: - Level level = Level::Thread; + VariableContext level = VariableContext::Thread; /// By default, any instance have to increment global counters - Counters(Level level = Level::Thread, Counters * parent = &global_counters); + Counters(VariableContext level = VariableContext::Thread, Counters * parent = &global_counters); /// Global level static initializer Counters(Counter * allocated_counters) - : counters(allocated_counters), parent(nullptr), level(Level::Global) {} + : counters(allocated_counters), parent(nullptr), level(VariableContext::Global) {} inline Counter & operator[] (Event event) { diff --git a/dbms/src/Common/ThreadStatus.cpp b/dbms/src/Common/ThreadStatus.cpp index a117d9ed692..abaaa12da3e 100644 --- a/dbms/src/Common/ThreadStatus.cpp +++ b/dbms/src/Common/ThreadStatus.cpp @@ -58,6 +58,7 @@ public: ThreadStatus & thread = *CurrentThread::get(); LOG_DEBUG(thread.log, "Thread " << thread.thread_number << " exited"); + thread.memory_tracker.logPeakMemoryUsage(); if (thread.getCurrentState() != ThreadStatus::ThreadState::DetachedFromQuery) thread.detachQuery(true); @@ -192,14 +193,12 @@ struct TasksStatsCounters TasksStatsCounters TasksStatsCounters::current() { TasksStatsCounters res; - current_thread->taskstats_getter->getStat(res.stat); + current_thread->taskstats_getter->getStat(res.stat, current_thread->os_thread_id); return res; } ThreadStatus::ThreadStatus() - : performance_counters(ProfileEvents::Level::Thread), - log(&Poco::Logger::get("ThreadStatus")) { thread_number = Poco::ThreadNumber::get(); os_thread_id = TaskStatsInfoGetter::getCurrentTID(); @@ -208,6 +207,9 @@ ThreadStatus::ThreadStatus() last_taskstats = std::make_unique(); taskstats_getter = std::make_unique(); + memory_tracker.setDescription("(for thread)"); + log = &Poco::Logger::get("ThreadStatus"); + /// NOTE: It is important not to do any non-trivial actions (like updating ProfileEvents or logging) before ThreadStatus is created /// Otherwise it could lead to SIGSEGV due to current_thread dereferencing } @@ -287,7 +289,10 @@ void ThreadStatus::attachQuery( /// Clear stats from previous query if a new query is started /// TODO: make separate query_thread_performance_counters and thread_performance_counters if (queries_started != 1) + { performance_counters.resetCounters(); + memory_tracker.resetCounters(); + } *last_rusage = RusageCounters::current(query_start_time_nanoseconds); *last_taskstats = TasksStatsCounters::current(); @@ -327,6 +332,7 @@ void ThreadStatus::detachQuery(bool thread_exits) log_profile_events = true; } + void ThreadStatus::updatePerfomanceCountersImpl() { try @@ -352,7 +358,7 @@ void ThreadStatus::logToQueryThreadLog(QueryThreadLog & thread_log) elem.read_bytes = progress_in.bytes.load(std::memory_order_relaxed); elem.written_rows = progress_out.rows.load(std::memory_order_relaxed); elem.written_bytes = progress_out.bytes.load(std::memory_order_relaxed); - elem.memory_usage = std::max(0, memory_tracker.getPeak()); + elem.memory_usage = memory_tracker.getPeak(); elem.thread_name = getThreadName(); elem.thread_number = thread_number; diff --git a/dbms/src/Common/ThreadStatus.h b/dbms/src/Common/ThreadStatus.h index 7d920597191..f5ede8593a9 100644 --- a/dbms/src/Common/ThreadStatus.h +++ b/dbms/src/Common/ThreadStatus.h @@ -40,8 +40,9 @@ public: /// Linux's PID (or TGID) (the same id is shown by ps util) Int32 os_thread_id = -1; - ProfileEvents::Counters performance_counters; - MemoryTracker memory_tracker; + /// TODO: merge them into common entity + ProfileEvents::Counters performance_counters{VariableContext::Thread}; + MemoryTracker memory_tracker{VariableContext::Thread}; /// Statistics of read and write rows/bytes Progress progress_in; @@ -67,11 +68,6 @@ public: return thread_state.load(std::memory_order_relaxed); } - Context * getGlobalContext() - { - return global_context.load(std::memory_order_relaxed); - } - SystemLogsQueuePtr getSystemLogsQueue() const { std::lock_guard lock(mutex); @@ -84,6 +80,11 @@ public: logs_queue_ptr = logs_queue; } + Context * getGlobalContext() + { + return global_context.load(std::memory_order_relaxed); + } + ~ThreadStatus(); protected: diff --git a/dbms/src/Common/VariableContext.h b/dbms/src/Common/VariableContext.h new file mode 100644 index 00000000000..2fe4ffb565a --- /dev/null +++ b/dbms/src/Common/VariableContext.h @@ -0,0 +1,12 @@ +#pragma once + +/// Used in ProfileEvents and MemoryTracker to determine their hierarchy level +/// The less value the higher level (zero level is the root) +enum class VariableContext +{ + Global = 0, + User, /// Group of processes + Process, /// For example, a query or a merge + Thread, /// A thread of a process + Snapshot /// Does not belong to anybody +}; diff --git a/dbms/src/Interpreters/ProcessList.cpp b/dbms/src/Interpreters/ProcessList.cpp index b8595d604ac..0e55ccc4642 100644 --- a/dbms/src/Interpreters/ProcessList.cpp +++ b/dbms/src/Interpreters/ProcessList.cpp @@ -168,8 +168,8 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as /// Attach master thread CurrentThread::attachQuery(&*process_it); - CurrentThread::getMemoryTracker().setOrRaiseLimit(settings.max_memory_usage); - CurrentThread::getMemoryTracker().setDescription("(for thread)"); + /// NOTE: Do not set the limit for thread-level memory tracker since it could show unreal values + /// since allocation and deallocation could happen in different threads if (!user_process_list.user_throttler) { @@ -272,7 +272,6 @@ QueryStatus::QueryStatus( query(query_), client_info(client_info_), priority_handle(std::move(priority_handle_)), - performance_counters(ProfileEvents::Level::Process), num_queries_increment{CurrentMetrics::Query} { memory_tracker.setOrRaiseLimit(max_memory_usage); @@ -282,10 +281,7 @@ QueryStatus::QueryStatus( memory_tracker.setFaultProbability(memory_tracker_fault_probability); } -QueryStatus::~QueryStatus() -{ - LOG_DEBUG(&Poco::Logger::get("QueryStatus"), __PRETTY_FUNCTION__ << ":" << __LINE__); -} +QueryStatus::~QueryStatus() = default; void QueryStatus::setQueryStreams(const BlockIO & io) { @@ -444,9 +440,7 @@ ProcessList::Info ProcessList::getInfo(bool get_thread_list, bool get_profile_ev } -ProcessListForUser::ProcessListForUser() -: user_performance_counters(ProfileEvents::Level::User, &ProfileEvents::global_counters) -{} +ProcessListForUser::ProcessListForUser() = default; } diff --git a/dbms/src/Interpreters/ProcessList.h b/dbms/src/Interpreters/ProcessList.h index df1db61b498..584143380b4 100644 --- a/dbms/src/Interpreters/ProcessList.h +++ b/dbms/src/Interpreters/ProcessList.h @@ -96,8 +96,8 @@ protected: QueryPriorities::Handle priority_handle; - ProfileEvents::Counters performance_counters; - MemoryTracker memory_tracker; + ProfileEvents::Counters performance_counters{VariableContext::Process}; + MemoryTracker memory_tracker{VariableContext::Process}; mutable std::shared_mutex threads_mutex; /// Key is Poco's thread_id @@ -207,9 +207,9 @@ struct ProcessListForUser using QueryToElement = std::unordered_multimap; QueryToElement queries; - ProfileEvents::Counters user_performance_counters; + ProfileEvents::Counters user_performance_counters{VariableContext::User, &ProfileEvents::global_counters}; /// Limit and counter for memory of all simultaneously running queries of single user. - MemoryTracker user_memory_tracker; + MemoryTracker user_memory_tracker{VariableContext::User}; /// Count network usage for all simultaneously running queries of single user. ThrottlerPtr user_throttler; @@ -282,7 +282,7 @@ protected: QueryPriorities priorities; /// Limit and counter for memory of all simultaneously running queries. - MemoryTracker total_memory_tracker; + MemoryTracker total_memory_tracker{VariableContext::Global}; /// Limit network bandwidth for all users ThrottlerPtr total_network_throttler; diff --git a/dbms/src/Interpreters/QueryThreadLog.cpp b/dbms/src/Interpreters/QueryThreadLog.cpp index dc290917f56..7b9dbe02d83 100644 --- a/dbms/src/Interpreters/QueryThreadLog.cpp +++ b/dbms/src/Interpreters/QueryThreadLog.cpp @@ -30,7 +30,7 @@ Block QueryThreadLogElement::createBlock() {std::make_shared(), "read_bytes"}, {std::make_shared(), "written_rows"}, {std::make_shared(), "written_bytes"}, - {std::make_shared(), "memory_usage"}, + {std::make_shared(), "memory_usage"}, {std::make_shared(), "thread_name"}, {std::make_shared(), "thread_number"}, @@ -80,7 +80,7 @@ void QueryThreadLogElement::appendToBlock(Block & block) const columns[i++]->insert(UInt64(written_rows)); columns[i++]->insert(UInt64(written_bytes)); - columns[i++]->insert(UInt64(memory_usage)); + columns[i++]->insert(Int64(memory_usage)); columns[i++]->insertData(thread_name.data(), thread_name.size()); columns[i++]->insert(UInt64(thread_number)); diff --git a/dbms/src/Interpreters/QueryThreadLog.h b/dbms/src/Interpreters/QueryThreadLog.h index 3552f15c623..9de116fa8b6 100644 --- a/dbms/src/Interpreters/QueryThreadLog.h +++ b/dbms/src/Interpreters/QueryThreadLog.h @@ -22,7 +22,7 @@ struct QueryThreadLogElement UInt64 written_rows{}; UInt64 written_bytes{}; - UInt64 memory_usage{}; + Int64 memory_usage{}; String thread_name; UInt32 thread_number{}; diff --git a/dbms/src/Server/Client.cpp b/dbms/src/Server/Client.cpp index b013d195209..5b251355e88 100644 --- a/dbms/src/Server/Client.cpp +++ b/dbms/src/Server/Client.cpp @@ -1529,7 +1529,7 @@ public: ("echo", "in batch mode, print query before execution") ("max_client_network_bandwidth", po::value(), "the maximum speed of data exchange over the network for the client in bytes per second.") ("compression", po::value(), "enable or disable compression") - ("log-level", po::value(), "log level") + ("log-level", po::value(), "client log level") ("server_logs_file", po::value(), "put server logs into specified file") APPLY_FOR_SETTINGS(DECLARE_SETTING) ; diff --git a/dbms/src/Storages/MergeTree/MergeList.h b/dbms/src/Storages/MergeTree/MergeList.h index 464cca89812..7bc98a02610 100644 --- a/dbms/src/Storages/MergeTree/MergeList.h +++ b/dbms/src/Storages/MergeTree/MergeList.h @@ -65,7 +65,7 @@ struct MergeListElement : boost::noncopyable /// Updated only for Vertical algorithm std::atomic columns_written{}; - MemoryTracker memory_tracker; + MemoryTracker memory_tracker{VariableContext::Process}; MemoryTracker * background_thread_memory_tracker; MemoryTracker * background_thread_memory_tracker_prev_parent = nullptr; diff --git a/dbms/tests/queries/0_stateless/00443_optimize_final_vertical_merge.sh b/dbms/tests/queries/0_stateless/00443_optimize_final_vertical_merge.sh index 43392107b31..c8453119eb9 100755 --- a/dbms/tests/queries/0_stateless/00443_optimize_final_vertical_merge.sh +++ b/dbms/tests/queries/0_stateless/00443_optimize_final_vertical_merge.sh @@ -70,7 +70,7 @@ number AS di10, [hex(number), hex(number+1)] AS \`n.s\` FROM system.numbers LIMIT $res_rows" -while [[ `get_num_parts` -ne 1 ]] ; do $CLICKHOUSE_CLIENT -q "OPTIMIZE TABLE $name PARTITION 197001"; done +while [[ `get_num_parts` -ne 1 ]] ; do $CLICKHOUSE_CLIENT -q "OPTIMIZE TABLE $name PARTITION 197001" --server_logs_file=/dev/null; done $CLICKHOUSE_CLIENT -q "ALTER TABLE $name ADD COLUMN n.a Array(String)" $CLICKHOUSE_CLIENT -q "ALTER TABLE $name ADD COLUMN da Array(String) DEFAULT ['def']"