Fixed memory tracking. [#CLICKHOUSE-2910]

Vitaliy Lyudvichenko 2018-06-09 18:29:08 +03:00
parent ed7cd86f09
commit c04dfb40a6
15 changed files with 91 additions and 66 deletions

View File

@ -521,7 +521,7 @@ Connection::Packet Connection::receivePacket()
}
else
{
LOG_TRACE(log_wrapper.get(), "Receiving packet type");
//LOG_TRACE(log_wrapper.get(), "Receiving packet type " << StackTrace().toString());
readVarUInt(res.type, *in);
}

View File

@ -18,7 +18,7 @@ namespace DB
MemoryTracker::~MemoryTracker()
{
if (peak)
if (level != VariableContext::Thread && peak)
{
try
{
@ -114,18 +114,26 @@ void MemoryTracker::free(Int64 size)
if (blocker.isCancelled())
return;
Int64 new_amount = amount.fetch_sub(size, std::memory_order_relaxed) - size;
/** Sometimes, query could free some data, that was allocated outside of query context.
* Example: cache eviction.
* To avoid negative memory usage, we "saturate" amount.
* Memory usage will be calculated with some error.
* NOTE The code is not atomic. Not worth to fix.
*/
if (new_amount < 0)
if (level == VariableContext::Thread)
{
amount.fetch_sub(new_amount);
size += new_amount;
/// Could become negative if memory allocated in this thread is freed in another one
amount.fetch_sub(size, std::memory_order_relaxed);
}
else
{
Int64 new_amount = amount.fetch_sub(size, std::memory_order_relaxed) - size;
/** Sometimes a query can free data that was allocated outside of the query context.
* Example: cache eviction.
* To avoid negative memory usage, we "saturate" amount.
* Memory usage will be calculated with some error.
* NOTE: The code is not atomic. Not worth fixing.
*/
if (unlikely(new_amount < 0))
{
amount.fetch_sub(new_amount);
size += new_amount;
}
}
if (auto loaded_next = parent.load(std::memory_order_relaxed))
@ -135,14 +143,20 @@ void MemoryTracker::free(Int64 size)
}
void MemoryTracker::resetCounters()
{
amount.store(0, std::memory_order_relaxed);
peak.store(0, std::memory_order_relaxed);
limit.store(0, std::memory_order_relaxed);
}
void MemoryTracker::reset()
{
if (!parent.load(std::memory_order_relaxed))
CurrentMetrics::sub(metric, amount.load(std::memory_order_relaxed));
amount.store(0, std::memory_order_relaxed);
peak.store(0, std::memory_order_relaxed);
limit.store(0, std::memory_order_relaxed);
resetCounters();
}
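
To make the thread-level vs. higher-level behaviour of MemoryTracker::free() above concrete, here is a minimal standalone sketch of the same idea on a bare atomic counter (illustration only, not the ClickHouse class):

#include <atomic>
#include <cstdint>
#include <iostream>

// Toy counter, illustrative only.
std::atomic<int64_t> amount{0};

// Thread-level behaviour: simply subtract; the counter may go negative when
// memory allocated in this thread is freed in another one.
void free_thread_level(int64_t size)
{
    amount.fetch_sub(size, std::memory_order_relaxed);
}

// Higher-level behaviour: "saturate" at zero, so that freeing memory that was
// allocated outside the tracked context (e.g. cache eviction) does not drive
// the total negative. As in the code above, the sequence is not atomic as a whole.
void free_saturating(int64_t size)
{
    int64_t new_amount = amount.fetch_sub(size, std::memory_order_relaxed) - size;
    if (new_amount < 0)
        amount.fetch_sub(new_amount, std::memory_order_relaxed);  // add the overshoot back
}

int main()
{
    amount.store(100);
    free_saturating(150);                // would overshoot by 50
    std::cout << amount.load() << "\n";  // prints 0 (saturated)
}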

View File

@ -1,5 +1,6 @@
#pragma once
#include "VariableContext.h"
#include <atomic>
#include <common/Types.h>
#include <Common/CurrentMetrics.h>
@ -36,12 +37,14 @@ class MemoryTracker
const char * description = nullptr;
public:
MemoryTracker() {}
MemoryTracker(Int64 limit_) : limit(limit_) {}
MemoryTracker(MemoryTracker * parent_) : parent(parent_) {}
MemoryTracker(VariableContext level = VariableContext::Thread) : level(level) {}
MemoryTracker(Int64 limit_, VariableContext level = VariableContext::Thread) : limit(limit_), level(level) {}
MemoryTracker(MemoryTracker * parent_, VariableContext level = VariableContext::Thread) : parent(parent_), level(level) {}
~MemoryTracker();
VariableContext level;
/** Call the following functions before calling of corresponding operations with memory allocators.
*/
void alloc(Int64 size);
@ -103,7 +106,10 @@ public:
description = description_;
}
/// Reset the accumulated data.
/// Reset the accumulated data
void resetCounters();
/// Reset the accumulated data and the parent.
void reset();
/// Prints info about peak memory consumption into log.
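
A possible way to use the constructors declared above (an illustrative sketch: that alloc() propagates to the parent is assumed by analogy with the free() code in this diff, and the header name is assumed):

// Usage sketch, assuming the MemoryTracker header shown above is included.
void example()
{
    MemoryTracker total(VariableContext::Global);            // root of the hierarchy
    MemoryTracker query(&total, VariableContext::Process);   // e.g. one query or one merge
    MemoryTracker thread(&query, VariableContext::Thread);   // per-thread, may go negative

    thread.setDescription("(for thread)");
    thread.alloc(1 << 20);     // 1 MiB, assumed to be accounted in thread, query and total
    thread.free(1 << 20);
    thread.resetCounters();    // clears amount, peak and limit
}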

View File

@ -181,7 +181,7 @@ Counters global_counters(global_counters_array);
const Event Counters::num_counters = END;
Counters::Counters(Level level, Counters * parent)
Counters::Counters(VariableContext level, Counters * parent)
: counters_holder(new Counter[num_counters] {}),
parent(parent),
level(level)
@ -206,7 +206,7 @@ void Counters::reset()
Counters Counters::getPartiallyAtomicSnapshot() const
{
Counters res(Level::Snapshot, nullptr);
Counters res(VariableContext::Snapshot, nullptr);
for (Event i = 0; i < num_counters; ++i)
res.counters[i].store(counters[i].load(std::memory_order_relaxed), std::memory_order_relaxed);
return res;

View File

@ -1,5 +1,6 @@
#pragma once
#include "VariableContext.h"
#include <stddef.h>
#include <atomic>
#include <memory>
@ -27,15 +28,6 @@ namespace ProfileEvents
/// Counters - how many times each event happened
extern Counters global_counters;
enum class Level
{
Global = 0,
User,
Process,
Thread,
Snapshot
};
class Counters
{
Counter * counters = nullptr;
@ -45,14 +37,14 @@ namespace ProfileEvents
public:
Level level = Level::Thread;
VariableContext level = VariableContext::Thread;
/// By default, any instance have to increment global counters
Counters(Level level = Level::Thread, Counters * parent = &global_counters);
Counters(VariableContext level = VariableContext::Thread, Counters * parent = &global_counters);
/// Global level static initializer
Counters(Counter * allocated_counters)
: counters(allocated_counters), parent(nullptr), level(Level::Global) {}
: counters(allocated_counters), parent(nullptr), level(VariableContext::Global) {}
inline Counter & operator[] (Event event)
{

View File

@ -58,6 +58,7 @@ public:
ThreadStatus & thread = *CurrentThread::get();
LOG_DEBUG(thread.log, "Thread " << thread.thread_number << " exited");
thread.memory_tracker.logPeakMemoryUsage();
if (thread.getCurrentState() != ThreadStatus::ThreadState::DetachedFromQuery)
thread.detachQuery(true);
@ -192,14 +193,12 @@ struct TasksStatsCounters
TasksStatsCounters TasksStatsCounters::current()
{
TasksStatsCounters res;
current_thread->taskstats_getter->getStat(res.stat);
current_thread->taskstats_getter->getStat(res.stat, current_thread->os_thread_id);
return res;
}
ThreadStatus::ThreadStatus()
: performance_counters(ProfileEvents::Level::Thread),
log(&Poco::Logger::get("ThreadStatus"))
{
thread_number = Poco::ThreadNumber::get();
os_thread_id = TaskStatsInfoGetter::getCurrentTID();
@ -208,6 +207,9 @@ ThreadStatus::ThreadStatus()
last_taskstats = std::make_unique<TasksStatsCounters>();
taskstats_getter = std::make_unique<TaskStatsInfoGetter>();
memory_tracker.setDescription("(for thread)");
log = &Poco::Logger::get("ThreadStatus");
/// NOTE: It is important not to do any non-trivial actions (like updating ProfileEvents or logging) before ThreadStatus is created
/// Otherwise it could lead to SIGSEGV due to current_thread dereferencing
}
@ -287,7 +289,10 @@ void ThreadStatus::attachQuery(
/// Clear stats from previous query if a new query is started
/// TODO: make separate query_thread_performance_counters and thread_performance_counters
if (queries_started != 1)
{
performance_counters.resetCounters();
memory_tracker.resetCounters();
}
*last_rusage = RusageCounters::current(query_start_time_nanoseconds);
*last_taskstats = TasksStatsCounters::current();
@ -327,6 +332,7 @@ void ThreadStatus::detachQuery(bool thread_exits)
log_profile_events = true;
}
void ThreadStatus::updatePerfomanceCountersImpl()
{
try
@ -352,7 +358,7 @@ void ThreadStatus::logToQueryThreadLog(QueryThreadLog & thread_log)
elem.read_bytes = progress_in.bytes.load(std::memory_order_relaxed);
elem.written_rows = progress_out.rows.load(std::memory_order_relaxed);
elem.written_bytes = progress_out.bytes.load(std::memory_order_relaxed);
elem.memory_usage = std::max(0, memory_tracker.getPeak());
elem.memory_usage = memory_tracker.getPeak();
elem.thread_name = getThreadName();
elem.thread_number = thread_number;

View File

@ -40,8 +40,9 @@ public:
/// Linux's PID (or TGID) (the same id is shown by ps util)
Int32 os_thread_id = -1;
ProfileEvents::Counters performance_counters;
MemoryTracker memory_tracker;
/// TODO: merge them into common entity
ProfileEvents::Counters performance_counters{VariableContext::Thread};
MemoryTracker memory_tracker{VariableContext::Thread};
/// Statistics of read and write rows/bytes
Progress progress_in;
@ -67,11 +68,6 @@ public:
return thread_state.load(std::memory_order_relaxed);
}
Context * getGlobalContext()
{
return global_context.load(std::memory_order_relaxed);
}
SystemLogsQueuePtr getSystemLogsQueue() const
{
std::lock_guard lock(mutex);
@ -84,6 +80,11 @@ public:
logs_queue_ptr = logs_queue;
}
Context * getGlobalContext()
{
return global_context.load(std::memory_order_relaxed);
}
~ThreadStatus();
protected:

View File

@ -0,0 +1,12 @@
#pragma once
/// Used in ProfileEvents and MemoryTracker to determine their hierarchy level
/// The lower the value, the higher the level (zero is the root)
enum class VariableContext
{
Global = 0,
User, /// Group of processes
Process, /// For example, a query or a merge
Thread, /// A thread of a process
Snapshot /// Does not belong to anybody
};
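
A tiny compile-time illustration of the ordering convention stated in the comment (lower value = higher level):

// Illustration only; relational comparison of values of the same scoped enum is allowed.
// (Assumes VariableContext.h is included.)
static_assert(VariableContext::Global < VariableContext::User, "Global is the root level");
static_assert(VariableContext::User < VariableContext::Process, "a user groups processes");
static_assert(VariableContext::Process < VariableContext::Thread, "a process groups threads");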

View File

@ -168,8 +168,8 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as
/// Attach master thread
CurrentThread::attachQuery(&*process_it);
CurrentThread::getMemoryTracker().setOrRaiseLimit(settings.max_memory_usage);
CurrentThread::getMemoryTracker().setDescription("(for thread)");
/// NOTE: Do not set a limit on the thread-level memory tracker, since it could show misleading values
/// (allocation and deallocation can happen in different threads)
if (!user_process_list.user_throttler)
{
@ -272,7 +272,6 @@ QueryStatus::QueryStatus(
query(query_),
client_info(client_info_),
priority_handle(std::move(priority_handle_)),
performance_counters(ProfileEvents::Level::Process),
num_queries_increment{CurrentMetrics::Query}
{
memory_tracker.setOrRaiseLimit(max_memory_usage);
@ -282,10 +281,7 @@ QueryStatus::QueryStatus(
memory_tracker.setFaultProbability(memory_tracker_fault_probability);
}
QueryStatus::~QueryStatus()
{
LOG_DEBUG(&Poco::Logger::get("QueryStatus"), __PRETTY_FUNCTION__ << ":" << __LINE__);
}
QueryStatus::~QueryStatus() = default;
void QueryStatus::setQueryStreams(const BlockIO & io)
{
@ -444,9 +440,7 @@ ProcessList::Info ProcessList::getInfo(bool get_thread_list, bool get_profile_ev
}
ProcessListForUser::ProcessListForUser()
: user_performance_counters(ProfileEvents::Level::User, &ProfileEvents::global_counters)
{}
ProcessListForUser::ProcessListForUser() = default;
}
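
The NOTE above about not limiting the thread-level tracker can be illustrated with a standalone sketch (hypothetical numbers, plain counters instead of MemoryTracker): memory allocated in one thread and freed in another leaves one per-thread counter high and the other negative, while only the process-level total stays meaningful:

// Standalone illustration (not ClickHouse code): two thread-level counters under
// one process-level total. Allocation happens in thread A, deallocation in thread B.
#include <cstdint>
#include <iostream>

int main()
{
    int64_t thread_a = 0, thread_b = 0, process_total = 0;

    // thread A allocates 8 GiB
    thread_a += 8LL << 30;  process_total += 8LL << 30;

    // thread B frees the same 8 GiB
    thread_b -= 8LL << 30;  process_total -= 8LL << 30;

    std::cout << "thread A: " << thread_a        // 8 GiB: looks huge, but nothing is leaked
              << ", thread B: " << thread_b      // -8 GiB: negative
              << ", process: " << process_total  // 0: the only sensible place for a limit
              << "\n";
}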

View File

@ -96,8 +96,8 @@ protected:
QueryPriorities::Handle priority_handle;
ProfileEvents::Counters performance_counters;
MemoryTracker memory_tracker;
ProfileEvents::Counters performance_counters{VariableContext::Process};
MemoryTracker memory_tracker{VariableContext::Process};
mutable std::shared_mutex threads_mutex;
/// Key is Poco's thread_id
@ -207,9 +207,9 @@ struct ProcessListForUser
using QueryToElement = std::unordered_multimap<String, QueryStatus *>;
QueryToElement queries;
ProfileEvents::Counters user_performance_counters;
ProfileEvents::Counters user_performance_counters{VariableContext::User, &ProfileEvents::global_counters};
/// Limit and counter for memory of all simultaneously running queries of single user.
MemoryTracker user_memory_tracker;
MemoryTracker user_memory_tracker{VariableContext::User};
/// Count network usage for all simultaneously running queries of single user.
ThrottlerPtr user_throttler;
@ -282,7 +282,7 @@ protected:
QueryPriorities priorities;
/// Limit and counter for memory of all simultaneously running queries.
MemoryTracker total_memory_tracker;
MemoryTracker total_memory_tracker{VariableContext::Global};
/// Limit network bandwidth for all users
ThrottlerPtr total_network_throttler;

View File

@ -30,7 +30,7 @@ Block QueryThreadLogElement::createBlock()
{std::make_shared<DataTypeUInt64>(), "read_bytes"},
{std::make_shared<DataTypeUInt64>(), "written_rows"},
{std::make_shared<DataTypeUInt64>(), "written_bytes"},
{std::make_shared<DataTypeUInt64>(), "memory_usage"},
{std::make_shared<DataTypeInt64>(), "memory_usage"},
{std::make_shared<DataTypeString>(), "thread_name"},
{std::make_shared<DataTypeUInt32>(), "thread_number"},
@ -80,7 +80,7 @@ void QueryThreadLogElement::appendToBlock(Block & block) const
columns[i++]->insert(UInt64(written_rows));
columns[i++]->insert(UInt64(written_bytes));
columns[i++]->insert(UInt64(memory_usage));
columns[i++]->insert(Int64(memory_usage));
columns[i++]->insertData(thread_name.data(), thread_name.size());
columns[i++]->insert(UInt64(thread_number));

View File

@ -22,7 +22,7 @@ struct QueryThreadLogElement
UInt64 written_rows{};
UInt64 written_bytes{};
UInt64 memory_usage{};
Int64 memory_usage{};
String thread_name;
UInt32 thread_number{};

View File

@ -1529,7 +1529,7 @@ public:
("echo", "in batch mode, print query before execution")
("max_client_network_bandwidth", po::value<int>(), "the maximum speed of data exchange over the network for the client in bytes per second.")
("compression", po::value<bool>(), "enable or disable compression")
("log-level", po::value<std::string>(), "log level")
("log-level", po::value<std::string>(), "client log level")
("server_logs_file", po::value<std::string>(), "put server logs into specified file")
APPLY_FOR_SETTINGS(DECLARE_SETTING)
;

View File

@ -65,7 +65,7 @@ struct MergeListElement : boost::noncopyable
/// Updated only for Vertical algorithm
std::atomic<UInt64> columns_written{};
MemoryTracker memory_tracker;
MemoryTracker memory_tracker{VariableContext::Process};
MemoryTracker * background_thread_memory_tracker;
MemoryTracker * background_thread_memory_tracker_prev_parent = nullptr;

View File

@ -70,7 +70,7 @@ number AS di10,
[hex(number), hex(number+1)] AS \`n.s\`
FROM system.numbers LIMIT $res_rows"
while [[ `get_num_parts` -ne 1 ]] ; do $CLICKHOUSE_CLIENT -q "OPTIMIZE TABLE $name PARTITION 197001"; done
while [[ `get_num_parts` -ne 1 ]] ; do $CLICKHOUSE_CLIENT -q "OPTIMIZE TABLE $name PARTITION 197001" --server_logs_file=/dev/null; done
$CLICKHOUSE_CLIENT -q "ALTER TABLE $name ADD COLUMN n.a Array(String)"
$CLICKHOUSE_CLIENT -q "ALTER TABLE $name ADD COLUMN da Array(String) DEFAULT ['def']"