Fixed wrong code around Memory Profiler

This commit is contained in:
Alexey Milovidov 2020-03-03 03:24:44 +03:00
parent ac7860dc49
commit dc086b20a9
11 changed files with 90 additions and 157 deletions

View File

@ -52,7 +52,7 @@ IncludeCategories:
ReflowComments: false
AlignEscapedNewlinesLeft: false
AlignEscapedNewlines: DontAlign
AlignTrailingComments: true
AlignTrailingComments: false
# Not changed:
AccessModifierOffset: -4

View File

@ -1,44 +0,0 @@
#pragma once
#include <memory>
namespace ext
{
/** Thread-unsafe singleton. It works simply like a global variable.
* Supports deinitialization.
*
* In most of the cases, you don't need this class.
* Use "Meyers Singleton" instead: static T & instance() { static T x; return x; }
*/
template <class T>
class Singleton
{
public:
Singleton()
{
if (!instance)
instance = std::make_unique<T>();
}
T * operator->()
{
return instance.get();
}
static bool isInitialized()
{
return !!instance;
}
static void reset()
{
instance.reset();
}
private:
inline static std::unique_ptr<T> instance{};
};
}

View File

@ -7,7 +7,6 @@
#include <Common/formatReadable.h>
#include <common/likely.h>
#include <common/logger_useful.h>
#include <ext/singleton.h>
#include <atomic>
#include <cmath>
@ -78,7 +77,7 @@ void MemoryTracker::alloc(Int64 size)
return;
/** Using memory_order_relaxed means that if allocations are done simultaneously,
* we allow exception about memory limit exceeded to be thrown only on next allocation.
* we allow exception about memory limit exceeded to be thrown only on next allocation.
* So, we allow over-allocations.
*/
Int64 will_be = size + amount.fetch_add(size, std::memory_order_relaxed);
@ -112,8 +111,8 @@ void MemoryTracker::alloc(Int64 size)
if (unlikely(current_profiler_limit && will_be > current_profiler_limit))
{
auto no_track = blocker.cancel();
ext::Singleton<DB::TraceCollector>()->collect(size);
setOrRaiseProfilerLimit(current_profiler_limit + Int64(std::ceil((will_be - current_profiler_limit) / profiler_step)) * profiler_step);
DB::TraceCollector::collect(DB::TraceType::Memory, StackTrace(), size);
setOrRaiseProfilerLimit((will_be + profiler_step - 1) / profiler_step * profiler_step);
}
if (unlikely(current_hard_limit && will_be > current_hard_limit))
@ -212,7 +211,6 @@ void MemoryTracker::setOrRaiseHardLimit(Int64 value)
void MemoryTracker::setOrRaiseProfilerLimit(Int64 value)
{
/// This is just atomic set to maximum.
Int64 old_value = profiler_limit.load(std::memory_order_relaxed);
while (old_value < value && !profiler_limit.compare_exchange_weak(old_value, value))
;

View File

@ -9,24 +9,48 @@
#include <common/config_common.h>
#include <common/logger_useful.h>
#include <common/phdr_cache.h>
#include <ext/singleton.h>
#include <random>
namespace ProfileEvents
{
extern const Event QueryProfilerSignalOverruns;
}
namespace DB
{
namespace
{
thread_local size_t write_trace_iteration = 0;
void writeTraceInfo(TraceType trace_type, int /* sig */, siginfo_t * info, void * context)
{
auto saved_errno = errno; /// We must restore previous value of errno in signal handler.
int overrun_count = 0;
#if defined(OS_LINUX)
if (info)
overrun_count = info->si_overrun;
{
int overrun_count = info->si_overrun;
/// Quickly drop if signal handler is called too frequently.
/// Otherwise we may end up infinitelly processing signals instead of doing any useful work.
++write_trace_iteration;
if (overrun_count)
{
/// But pass with some frequency to avoid drop of all traces.
if (write_trace_iteration % overrun_count == 0)
{
ProfileEvents::increment(ProfileEvents::QueryProfilerSignalOverruns, overrun_count);
}
else
{
ProfileEvents::increment(ProfileEvents::QueryProfilerSignalOverruns, overrun_count + 1);
return;
}
}
}
#else
UNUSED(info);
#endif
@ -34,7 +58,7 @@ namespace
const auto signal_context = *reinterpret_cast<ucontext_t *>(context);
const StackTrace stack_trace(signal_context);
ext::Singleton<TraceCollector>()->collect(trace_type, stack_trace, overrun_count);
TraceCollector::collect(trace_type, stack_trace, 0);
errno = saved_errno;
}
@ -156,7 +180,7 @@ QueryProfilerReal::QueryProfilerReal(const UInt64 thread_id, const UInt32 period
void QueryProfilerReal::signalHandler(int sig, siginfo_t * info, void * context)
{
writeTraceInfo(TraceType::REAL_TIME, sig, info, context);
writeTraceInfo(TraceType::Real, sig, info, context);
}
QueryProfilerCpu::QueryProfilerCpu(const UInt64 thread_id, const UInt32 period)
@ -165,7 +189,7 @@ QueryProfilerCpu::QueryProfilerCpu(const UInt64 thread_id, const UInt32 period)
void QueryProfilerCpu::signalHandler(int sig, siginfo_t * info, void * context)
{
writeTraceInfo(TraceType::CPU_TIME, sig, info, context);
writeTraceInfo(TraceType::CPU, sig, info, context);
}
}

View File

@ -17,11 +17,6 @@
#include <fcntl.h>
namespace ProfileEvents
{
extern const Event QueryProfilerSignalOverruns;
}
namespace DB
{
@ -30,15 +25,13 @@ namespace
/// Normally query_id is a UUID (string with a fixed length) but user can provide custom query_id.
/// Thus upper bound on query_id length should be introduced to avoid buffer overflow in signal handler.
constexpr size_t QUERY_ID_MAX_LEN = 1024;
thread_local size_t write_trace_iteration = 0;
}
namespace ErrorCodes
{
}
LazyPipeFDs pipe;
TraceCollector::TraceCollector()
TraceCollector::TraceCollector(std::shared_ptr<TraceLog> trace_log_)
: trace_log(std::move(trace_log_))
{
pipe.open();
@ -51,38 +44,20 @@ TraceCollector::TraceCollector()
thread = ThreadFromGlobalPool(&TraceCollector::run, this);
}
TraceCollector::~TraceCollector()
{
if (!thread.joinable())
LOG_ERROR(&Poco::Logger::get("TraceCollector"), "TraceCollector thread is malformed and cannot be joined");
else
{
stop();
thread.join();
}
pipe.close();
}
void TraceCollector::collect(TraceType trace_type, const StackTrace & stack_trace, int overrun_count)
{
/// Quickly drop if signal handler is called too frequently.
/// Otherwise we may end up infinitelly processing signals instead of doing any useful work.
++write_trace_iteration;
if (overrun_count)
{
/// But pass with some frequency to avoid drop of all traces.
if (write_trace_iteration % overrun_count == 0)
{
ProfileEvents::increment(ProfileEvents::QueryProfilerSignalOverruns, overrun_count);
}
else
{
ProfileEvents::increment(ProfileEvents::QueryProfilerSignalOverruns, overrun_count + 1);
return;
}
}
void TraceCollector::collect(TraceType trace_type, const StackTrace & stack_trace, UInt64 size)
{
constexpr size_t buf_size = sizeof(char) + // TraceCollector stop flag
8 * sizeof(char) + // maximum VarUInt length for string size
QUERY_ID_MAX_LEN * sizeof(char) + // maximum query_id length
@ -99,7 +74,7 @@ void TraceCollector::collect(TraceType trace_type, const StackTrace & stack_trac
auto thread_id = CurrentThread::get().thread_id;
writeChar(false, out);
writeChar(false, out); /// true if requested to stop the collecting thread.
writeStringBinary(query_id, out);
size_t stack_trace_size = stack_trace.getSize();
@ -110,64 +85,27 @@ void TraceCollector::collect(TraceType trace_type, const StackTrace & stack_trac
writePODBinary(trace_type, out);
writePODBinary(thread_id, out);
writePODBinary(UInt64(0), out);
out.next();
}
void TraceCollector::collect(UInt64 size)
{
constexpr size_t buf_size = sizeof(char) + // TraceCollector stop flag
8 * sizeof(char) + // maximum VarUInt length for string size
QUERY_ID_MAX_LEN * sizeof(char) + // maximum query_id length
sizeof(UInt8) + // number of stack frames
sizeof(StackTrace::Frames) + // collected stack trace, maximum capacity
sizeof(TraceType) + // trace type
sizeof(UInt64) + // thread_id
sizeof(UInt64); // size
char buffer[buf_size];
WriteBufferFromFileDescriptorDiscardOnFailure out(pipe.fds_rw[1], buf_size, buffer);
StringRef query_id = CurrentThread::getQueryId();
query_id.size = std::min(query_id.size, QUERY_ID_MAX_LEN);
auto thread_id = CurrentThread::get().thread_id;
writeChar(false, out);
writeStringBinary(query_id, out);
const auto & stack_trace = StackTrace();
size_t stack_trace_size = stack_trace.getSize();
size_t stack_trace_offset = stack_trace.getOffset();
writeIntBinary(UInt8(stack_trace_size - stack_trace_offset), out);
for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
writePODBinary(stack_trace.getFrames()[i], out);
writePODBinary(TraceType::MEMORY, out);
writePODBinary(thread_id, out);
writePODBinary(size, out);
out.next();
}
/**
* Sends TraceCollector stop message
/** Sends TraceCollector stop message
*
* Each sequence of data for TraceCollector thread starts with a boolean flag.
* If this flag is true, TraceCollector must stop reading trace_pipe and exit.
* This function sends flag with a true value to stop TraceCollector gracefully.
*
* NOTE: TraceCollector will NOT stop immediately as there may be some data left in the pipe
* before stop message.
*/
void TraceCollector::stop()
{
WriteBufferFromFileDescriptor out(pipe.fds_rw[1]);
writeChar(true, out);
out.next();
thread.join();
}
void TraceCollector::run()
{
ReadBufferFromFileDescriptor in(pipe.fds_rw[0]);

View File

@ -15,28 +15,27 @@ namespace DB
class TraceLog;
enum class TraceType : UInt8
enum class TraceType : uint8_t
{
REAL_TIME,
CPU_TIME,
MEMORY,
Real,
CPU,
Memory,
};
class TraceCollector
{
public:
TraceCollector();
TraceCollector(std::shared_ptr<TraceLog> trace_log_);
~TraceCollector();
void setTraceLog(const std::shared_ptr<TraceLog> & trace_log_) { trace_log = trace_log_; }
void collect(TraceType type, const StackTrace & stack_trace, int overrun_count = 0);
void collect(UInt64 size);
/// Collect a stack trace. This method is signal safe.
/// Precondition: the TraceCollector object must be created.
/// size - for memory tracing is the amount of memory allocated; for other trace types it is 0.
static void collect(TraceType trace_type, const StackTrace & stack_trace, UInt64 size);
private:
std::shared_ptr<TraceLog> trace_log;
ThreadFromGlobalPool thread;
LazyPipeFDs pipe;
void run();
void stop();

View File

@ -56,7 +56,7 @@
#include <Common/TraceCollector.h>
#include <common/logger_useful.h>
#include <Common/RemoteHostFilter.h>
#include <ext/singleton.h>
namespace ProfileEvents
{
@ -164,6 +164,8 @@ struct ContextShared
RemoteHostFilter remote_host_filter; /// Allowed URL from config.xml
std::optional<TraceCollector> trace_collector; /// Thread collecting traces from threads executing queries
/// Named sessions. The user could specify session identifier to reuse settings and temporary tables in subsequent requests.
class SessionKeyHash
@ -294,15 +296,21 @@ struct ContextShared
schedule_pool.reset();
ddl_worker.reset();
ext::Singleton<TraceCollector>::reset();
/// Stop trace collector if any
trace_collector.reset();
}
bool hasTraceCollector() const
{
return trace_collector.has_value();
}
void initializeTraceCollector(std::shared_ptr<TraceLog> trace_log)
{
if (trace_log == nullptr)
if (hasTraceCollector())
return;
ext::Singleton<TraceCollector>()->setTraceLog(trace_log);
trace_collector.emplace(std::move(trace_log));
}
};
@ -1786,6 +1794,11 @@ void Context::initializeTraceCollector()
shared->initializeTraceCollector(getTraceLog());
}
bool Context::hasTraceCollector() const
{
return shared->hasTraceCollector();
}
std::shared_ptr<QueryLog> Context::getQueryLog()
{

View File

@ -516,8 +516,10 @@ public:
/// Call after initialization before using system logs. Call for global context.
void initializeSystemLogs();
/// Call after initialization before using trace collector.
void initializeTraceCollector();
bool hasTraceCollector();
bool hasTraceCollector() const;
/// Nullptr if the query log is not ready for this moment.
std::shared_ptr<QueryLog> getQueryLog();

View File

@ -197,8 +197,14 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as
/// Set query-level memory trackers
thread_group->memory_tracker.setOrRaiseHardLimit(process_it->max_memory_usage);
thread_group->memory_tracker.setOrRaiseProfilerLimit(settings.memory_profiler_step);
thread_group->memory_tracker.setProfilerStep(settings.memory_profiler_step);
if (query_context.hasTraceCollector())
{
/// Set up memory profiling
thread_group->memory_tracker.setOrRaiseProfilerLimit(settings.memory_profiler_step);
thread_group->memory_tracker.setProfilerStep(settings.memory_profiler_step);
}
thread_group->memory_tracker.setDescription("(for query)");
if (process_it->memory_tracker_fault_probability)
thread_group->memory_tracker.setFaultProbability(process_it->memory_tracker_fault_probability);

View File

@ -9,8 +9,6 @@
#include <Common/ThreadProfileEvents.h>
#include <Common/TraceCollector.h>
#include <ext/singleton.h>
#if defined(OS_LINUX)
# include <Common/hasLinuxCapability.h>
@ -157,7 +155,7 @@ void ThreadStatus::finalizePerformanceCounters()
void ThreadStatus::initQueryProfiler()
{
/// query profilers are useless without trace collector
if (!global_context || !ext::Singleton<TraceCollector>::isInitialized())
if (!global_context || !global_context->hasTraceCollector())
return;
const auto & settings = query_context->getSettingsRef();

View File

@ -14,13 +14,12 @@ struct TraceLogElement
using TraceDataType = DataTypeEnum8;
static const TraceDataType::Values trace_values;
time_t event_time;
TraceType trace_type;
UInt64 thread_id;
String query_id;
Array trace;
UInt64 size; /// Allocation size in bytes for |TraceType::MEMORY|
time_t event_time{};
TraceType trace_type{};
UInt64 thread_id{};
String query_id{};
Array trace{};
UInt64 size{}; /// Allocation size in bytes for TraceType::Memory
static std::string name() { return "TraceLog"; }
static Block createBlock();