This commit is contained in:
Ivan Lezhankin 2020-01-16 15:37:29 +03:00
parent c566f406c5
commit 1934706ca9
12 changed files with 270 additions and 149 deletions

View File

@ -52,12 +52,12 @@ IncludeCategories:
ReflowComments: false
AlignEscapedNewlinesLeft: false
AlignEscapedNewlines: DontAlign
AlignTrailingComments: true
# Not changed:
AccessModifierOffset: -4
AlignConsecutiveAssignments: false
AlignOperands: false
AlignTrailingComments: false
AllowAllParametersOfDeclarationOnNextLine: true
AllowShortBlocksOnASingleLine: false
AllowShortCaseLabelsOnASingleLine: false

View File

@ -93,8 +93,9 @@ public:
/// Allocate memory range.
void * alloc(size_t size, size_t alignment = 0)
{
auto * ptr = allocNoTrack(size, alignment);
CurrentMemoryTracker::alloc(size);
return allocNoTrack(size, alignment);
return ptr;
}
/// Free memory range.
@ -118,13 +119,13 @@ public:
else if (old_size < MMAP_THRESHOLD && new_size < MMAP_THRESHOLD
&& alignment <= MALLOC_MIN_ALIGNMENT)
{
/// Resize malloc'd memory region with no special alignment requirement.
CurrentMemoryTracker::realloc(old_size, new_size);
void * new_buf = ::realloc(buf, new_size);
if (nullptr == new_buf)
DB::throwFromErrno("Allocator: Cannot realloc from " + formatReadableSizeWithBinarySuffix(old_size) + " to " + formatReadableSizeWithBinarySuffix(new_size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
/// Resize malloc'd memory region with no special alignment requirement.
CurrentMemoryTracker::realloc(old_size, new_size);
buf = new_buf;
if constexpr (clear_memory)
if (new_size > old_size)
@ -132,25 +133,26 @@ public:
}
else if (old_size >= MMAP_THRESHOLD && new_size >= MMAP_THRESHOLD)
{
/// Resize mmap'd memory region.
CurrentMemoryTracker::realloc(old_size, new_size);
// On apple and freebsd self-implemented mremap used (common/mremap.h)
buf = clickhouse_mremap(buf, old_size, new_size, MREMAP_MAYMOVE,
PROT_READ | PROT_WRITE, mmap_flags, -1, 0);
if (MAP_FAILED == buf)
DB::throwFromErrno("Allocator: Cannot mremap memory chunk from " + formatReadableSizeWithBinarySuffix(old_size) + " to " + formatReadableSizeWithBinarySuffix(new_size) + ".", DB::ErrorCodes::CANNOT_MREMAP);
/// Resize mmap'd memory region.
CurrentMemoryTracker::realloc(old_size, new_size);
/// No need for zero-fill, because mmap guarantees it.
}
else if (new_size < MMAP_THRESHOLD)
{
/// Small allocs that requires a copy. Assume there's enough memory in system. Call CurrentMemoryTracker once.
CurrentMemoryTracker::realloc(old_size, new_size);
void * new_buf = allocNoTrack(new_size, alignment);
memcpy(new_buf, buf, std::min(old_size, new_size));
freeNoTrack(buf, old_size);
/// Small allocs that requires a copy. Assume there's enough memory in system. Call CurrentMemoryTracker once.
CurrentMemoryTracker::realloc(old_size, new_size);
buf = new_buf;
}
else

View File

@ -1,12 +1,15 @@
#include <cstdlib>
#include "MemoryTracker.h"
#include <common/likely.h>
#include <common/logger_useful.h>
#include <IO/WriteHelpers.h>
#include "Common/TraceCollector.h"
#include <Common/CurrentThread.h>
#include <Common/Exception.h>
#include <Common/formatReadable.h>
#include <Common/CurrentThread.h>
#include <IO/WriteHelpers.h>
#include <common/likely.h>
#include <common/logger_useful.h>
#include <common/singleton.h>
#include <cstdlib>
namespace DB
@ -73,7 +76,7 @@ void MemoryTracker::alloc(Int64 size)
return;
/** Using memory_order_relaxed means that if allocations are done simultaneously,
* we allow exception about memory limit exceeded to be thrown only on next allocation.
* we allow exception about memory limit exceeded to be thrown only on next allocation.
* So, we allow over-allocations.
*/
Int64 will_be = size + amount.fetch_add(size, std::memory_order_relaxed);
@ -207,10 +210,13 @@ namespace CurrentMemoryTracker
if (untracked > untracked_memory_limit)
{
/// Zero untracked before track. If tracker throws out-of-limit we would be able to alloc up to untracked_memory_limit bytes
/// more. It could be usefull for enlarge Exception message in rethrow logic.
/// more. It could be useful to enlarge Exception message in rethrow logic.
Int64 tmp = untracked;
untracked = 0;
memory_tracker->alloc(tmp);
auto no_track = memory_tracker->blocker.cancel();
Singleton<DB::TraceCollector>()->collect(tmp);
}
}
}
@ -218,10 +224,7 @@ namespace CurrentMemoryTracker
void realloc(Int64 old_size, Int64 new_size)
{
Int64 addition = new_size - old_size;
if (addition > 0)
alloc(addition);
else
free(-addition);
addition > 0 ? alloc(addition) : free(-addition);
}
void free(Int64 size)

View File

@ -1,92 +1,38 @@
#include "QueryProfiler.h"
#include <random>
#include <common/phdr_cache.h>
#include <common/config_common.h>
#include <common/StringRef.h>
#include <common/logger_useful.h>
#include <Common/PipeFDs.h>
#include <Common/StackTrace.h>
#include <Common/CurrentThread.h>
#include <Common/Exception.h>
#include <Common/thread_local_rng.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromFileDescriptorDiscardOnFailure.h>
#include <Common/Exception.h>
#include <Common/StackTrace.h>
#include <Common/TraceCollector.h>
#include <Common/thread_local_rng.h>
#include <common/StringRef.h>
#include <common/config_common.h>
#include <common/logger_useful.h>
#include <common/phdr_cache.h>
#include <common/singleton.h>
#include <random>
namespace ProfileEvents
{
extern const Event QueryProfilerSignalOverruns;
}
namespace DB
{
extern LazyPipeFDs trace_pipe;
namespace
{
/// Normally query_id is a UUID (string with a fixed length) but user can provide custom query_id.
/// Thus upper bound on query_id length should be introduced to avoid buffer overflow in signal handler.
constexpr size_t QUERY_ID_MAX_LEN = 1024;
#if defined(OS_LINUX)
thread_local size_t write_trace_iteration = 0;
#endif
void writeTraceInfo(TraceType trace_type, int /* sig */, siginfo_t * info, void * context)
{
int overrun_count = 0;
#if defined(OS_LINUX)
/// Quickly drop if signal handler is called too frequently.
/// Otherwise we may end up infinitelly processing signals instead of doing any useful work.
++write_trace_iteration;
if (info && info->si_overrun > 0)
{
/// But pass with some frequency to avoid drop of all traces.
if (write_trace_iteration % info->si_overrun == 0)
{
ProfileEvents::increment(ProfileEvents::QueryProfilerSignalOverruns, info->si_overrun);
}
else
{
ProfileEvents::increment(ProfileEvents::QueryProfilerSignalOverruns, info->si_overrun + 1);
return;
}
}
if (info)
overrun_count = info->si_overrun;
#else
UNUSED(info);
#endif
constexpr size_t buf_size = sizeof(char) + // TraceCollector stop flag
8 * sizeof(char) + // maximum VarUInt length for string size
QUERY_ID_MAX_LEN * sizeof(char) + // maximum query_id length
sizeof(UInt8) + // number of stack frames
sizeof(StackTrace::Frames) + // collected stack trace, maximum capacity
sizeof(TraceType) + // timer type
sizeof(UInt32); // thread_number
char buffer[buf_size];
WriteBufferFromFileDescriptorDiscardOnFailure out(trace_pipe.fds_rw[1], buf_size, buffer);
StringRef query_id = CurrentThread::getQueryId();
query_id.size = std::min(query_id.size, QUERY_ID_MAX_LEN);
UInt32 thread_number = CurrentThread::get().thread_number;
const auto signal_context = *reinterpret_cast<ucontext_t *>(context);
const StackTrace stack_trace(signal_context);
writeChar(false, out);
writeStringBinary(query_id, out);
size_t stack_trace_size = stack_trace.getSize();
size_t stack_trace_offset = stack_trace.getOffset();
writeIntBinary(UInt8(stack_trace_size - stack_trace_offset), out);
for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
writePODBinary(stack_trace.getFrames()[i], out);
writePODBinary(trace_type, out);
writePODBinary(thread_number, out);
out.next();
Singleton<TraceCollector>()->collect(trace_type, stack_trace, overrun_count);
}
[[maybe_unused]] const UInt32 TIMER_PRECISION = 1e9;

View File

@ -15,13 +15,6 @@ namespace Poco
namespace DB
{
enum class TraceType : UInt8
{
REAL_TIME,
CPU_TIME,
MEMORY,
};
/**
* Query profiler implementation for selected thread.
*

View File

@ -1,25 +1,38 @@
#include "TraceCollector.h"
#include <Core/Field.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromFileDescriptorDiscardOnFailure.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/TraceLog.h>
#include <Poco/Logger.h>
#include <Common/Exception.h>
#include <Common/PipeFDs.h>
#include <Common/StackTrace.h>
#include <common/logger_useful.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <Common/Exception.h>
#include <Interpreters/TraceLog.h>
#include <unistd.h>
#include <fcntl.h>
namespace ProfileEvents
{
extern const Event QueryProfilerSignalOverruns;
}
namespace DB
{
LazyPipeFDs trace_pipe;
namespace
{
/// Normally query_id is a UUID (string with a fixed length) but user can provide custom query_id.
/// Thus upper bound on query_id length should be introduced to avoid buffer overflow in signal handler.
constexpr size_t QUERY_ID_MAX_LEN = 1024;
thread_local size_t write_trace_iteration = 0;
}
namespace ErrorCodes
{
@ -27,20 +40,15 @@ namespace ErrorCodes
extern const int THREAD_IS_NOT_JOINABLE;
}
TraceCollector::TraceCollector(std::shared_ptr<TraceLog> & trace_log_)
: log(&Poco::Logger::get("TraceCollector"))
, trace_log(trace_log_)
TraceCollector::TraceCollector() : log(&Poco::Logger::get("TraceCollector"))
{
if (trace_log == nullptr)
throw Exception("Invalid trace log pointer passed", ErrorCodes::NULL_POINTER_DEREFERENCE);
trace_pipe.open();
pipe.open();
/** Turn write end of pipe to non-blocking mode to avoid deadlocks
* when QueryProfiler is invoked under locks and TraceCollector cannot pull data from pipe.
*/
trace_pipe.setNonBlocking();
trace_pipe.tryIncreaseSize(1 << 20);
pipe.setNonBlocking();
pipe.tryIncreaseSize(1 << 20);
thread = ThreadFromGlobalPool(&TraceCollector::run, this);
}
@ -51,11 +59,100 @@ TraceCollector::~TraceCollector()
LOG_ERROR(log, "TraceCollector thread is malformed and cannot be joined");
else
{
TraceCollector::notifyToStop();
stop();
thread.join();
}
trace_pipe.close();
pipe.close();
}
void TraceCollector::collect(TraceType trace_type, const StackTrace & stack_trace, int overrun_count)
{
/// Quickly drop if signal handler is called too frequently.
/// Otherwise we may end up infinitelly processing signals instead of doing any useful work.
++write_trace_iteration;
if (overrun_count)
{
/// But pass with some frequency to avoid drop of all traces.
if (write_trace_iteration % overrun_count == 0)
{
ProfileEvents::increment(ProfileEvents::QueryProfilerSignalOverruns, overrun_count);
}
else
{
ProfileEvents::increment(ProfileEvents::QueryProfilerSignalOverruns, overrun_count + 1);
return;
}
}
constexpr size_t buf_size = sizeof(char) + // TraceCollector stop flag
8 * sizeof(char) + // maximum VarUInt length for string size
QUERY_ID_MAX_LEN * sizeof(char) + // maximum query_id length
sizeof(UInt8) + // number of stack frames
sizeof(StackTrace::Frames) + // collected stack trace, maximum capacity
sizeof(TraceType) + // trace type
sizeof(UInt32) + // thread_number
sizeof(Int64) + // size
sizeof(UInt64); // pointer
char buffer[buf_size];
WriteBufferFromFileDescriptorDiscardOnFailure out(pipe.fds_rw[1], buf_size, buffer);
StringRef query_id = CurrentThread::getQueryId();
query_id.size = std::min(query_id.size, QUERY_ID_MAX_LEN);
UInt32 thread_number = CurrentThread::get().thread_number;
writeChar(false, out);
writeStringBinary(query_id, out);
size_t stack_trace_size = stack_trace.getSize();
size_t stack_trace_offset = stack_trace.getOffset();
writeIntBinary(UInt8(stack_trace_size - stack_trace_offset), out);
for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
writePODBinary(stack_trace.getFrames()[i], out);
writePODBinary(trace_type, out);
writePODBinary(thread_number, out);
writePODBinary(0, out);
writePODBinary(0, out);
out.next();
}
void TraceCollector::collect(UInt64 size)
{
constexpr size_t buf_size = sizeof(char) + // TraceCollector stop flag
8 * sizeof(char) + // maximum VarUInt length for string size
QUERY_ID_MAX_LEN * sizeof(char) + // maximum query_id length
sizeof(UInt8) + // number of stack frames
sizeof(StackTrace::Frames) + // collected stack trace, maximum capacity
sizeof(TraceType) + // trace type
sizeof(UInt32) + // thread_number
sizeof(UInt64); // size
char buffer[buf_size];
WriteBufferFromFileDescriptorDiscardOnFailure out(pipe.fds_rw[1], buf_size, buffer);
StringRef query_id = CurrentThread::getQueryId();
query_id.size = std::min(query_id.size, QUERY_ID_MAX_LEN);
UInt32 thread_number = CurrentThread::get().thread_number;
writeChar(false, out);
writeStringBinary(query_id, out);
const auto & stack_trace = StackTrace();
size_t stack_trace_size = stack_trace.getSize();
size_t stack_trace_offset = stack_trace.getOffset();
writeIntBinary(UInt8(stack_trace_size - stack_trace_offset), out);
for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
writePODBinary(stack_trace.getFrames()[i], out);
writePODBinary(TraceType::MEMORY, out);
writePODBinary(thread_number, out);
writePODBinary(size, out);
out.next();
}
/**
@ -68,16 +165,20 @@ TraceCollector::~TraceCollector()
* NOTE: TraceCollector will NOT stop immediately as there may be some data left in the pipe
* before stop message.
*/
void TraceCollector::notifyToStop()
void TraceCollector::stop()
{
WriteBufferFromFileDescriptor out(trace_pipe.fds_rw[1]);
WriteBufferFromFileDescriptor out(pipe.fds_rw[1]);
writeChar(true, out);
out.next();
}
void TraceCollector::run()
{
ReadBufferFromFileDescriptor in(trace_pipe.fds_rw[0]);
ReadBufferFromFileDescriptor in(pipe.fds_rw[0]);
/// FIXME: use condvar to wait for |trace_log|
while (!trace_log)
sleep(1);
while (true)
{
@ -89,13 +190,13 @@ void TraceCollector::run()
std::string query_id;
readStringBinary(query_id, in);
UInt8 size = 0;
readIntBinary(size, in);
UInt8 trace_size = 0;
readIntBinary(trace_size, in);
Array trace;
trace.reserve(size);
trace.reserve(trace_size);
for (size_t i = 0; i < size; i++)
for (size_t i = 0; i < trace_size; i++)
{
uintptr_t addr = 0;
readPODBinary(addr, in);
@ -108,7 +209,13 @@ void TraceCollector::run()
UInt32 thread_number;
readPODBinary(thread_number, in);
TraceLogElement element{std::time(nullptr), trace_type, thread_number, query_id, trace};
Int64 size;
readPODBinary(size, in);
UInt64 pointer;
readPODBinary(pointer, in);
TraceLogElement element{std::time(nullptr), trace_type, thread_number, query_id, trace, size, pointer};
trace_log->add(element);
}
}

View File

@ -1,7 +1,10 @@
#pragma once
#include "Common/PipeFDs.h"
#include <Common/ThreadPool.h>
class StackTrace;
namespace Poco
{
class Logger;
@ -12,21 +15,33 @@ namespace DB
class TraceLog;
enum class TraceType : UInt8
{
REAL_TIME,
CPU_TIME,
MEMORY,
};
class TraceCollector
{
public:
TraceCollector();
~TraceCollector();
void setTraceLog(const std::shared_ptr<TraceLog> & trace_log_) { trace_log = trace_log_; }
void collect(TraceType type, const StackTrace & stack_trace, int overrun_count = 0);
void collect(UInt64 size);
void stop();
private:
Poco::Logger * log;
std::shared_ptr<TraceLog> trace_log;
ThreadFromGlobalPool thread;
LazyPipeFDs pipe;
void run();
static void notifyToStop();
public:
TraceCollector(std::shared_ptr<TraceLog> & trace_log_);
~TraceCollector();
};
}

View File

@ -1,14 +1,16 @@
#if defined(OS_LINUX)
#include <malloc.h>
#elif defined(OS_DARWIN)
#include <malloc/malloc.h>
#endif
#include <new>
#include <common/config_common.h>
#include <common/memory.h>
#include <Common/MemoryTracker.h>
#include <iostream>
#include <new>
#if defined(OS_LINUX)
# include <malloc.h>
#elif defined(OS_DARWIN)
# include <malloc/malloc.h>
#endif
/// Replace default new/delete with memory tracking versions.
/// @sa https://en.cppreference.com/w/cpp/memory/new/operator_new
/// https://en.cppreference.com/w/cpp/memory/new/operator_delete
@ -29,7 +31,7 @@ ALWAYS_INLINE void trackMemory(std::size_t size)
#endif
}
ALWAYS_INLINE bool trackMemoryNoExept(std::size_t size) noexcept
ALWAYS_INLINE bool trackMemoryNoExcept(std::size_t size) noexcept
{
try
{
@ -54,11 +56,11 @@ ALWAYS_INLINE void untrackMemory(void * ptr [[maybe_unused]], std::size_t size [
#else
if (size)
CurrentMemoryTracker::free(size);
#ifdef _GNU_SOURCE
# ifdef _GNU_SOURCE
/// It's innaccurate resource free for sanitizers. malloc_usable_size() result is greater or equal to allocated size.
else
CurrentMemoryTracker::free(malloc_usable_size(ptr));
#endif
# endif
#endif
}
catch (...)
@ -83,14 +85,14 @@ void * operator new[](std::size_t size)
void * operator new(std::size_t size, const std::nothrow_t &) noexcept
{
if (likely(Memory::trackMemoryNoExept(size)))
if (likely(Memory::trackMemoryNoExcept(size)))
return Memory::newNoExept(size);
return nullptr;
}
void * operator new[](std::size_t size, const std::nothrow_t &) noexcept
{
if (likely(Memory::trackMemoryNoExept(size)))
if (likely(Memory::trackMemoryNoExcept(size)))
return Memory::newNoExept(size);
return nullptr;
}

View File

@ -55,6 +55,7 @@
#include <Common/TraceCollector.h>
#include <common/logger_useful.h>
#include <Common/RemoteHostFilter.h>
#include <common/singleton.h>
namespace ProfileEvents
{
@ -298,8 +299,7 @@ struct ContextShared
schedule_pool.reset();
ddl_worker.reset();
/// Stop trace collector if any
trace_collector.reset();
Singleton<TraceCollector>()->stop();
}
bool hasTraceCollector()
@ -312,7 +312,7 @@ struct ContextShared
if (trace_log == nullptr)
return;
trace_collector = std::make_unique<TraceCollector>(trace_log);
Singleton<TraceCollector>()->setTraceLog(trace_log);
}
private:

View File

@ -27,7 +27,9 @@ Block TraceLogElement::createBlock()
{std::make_shared<TraceDataType>(trace_values), "trace_type"},
{std::make_shared<DataTypeUInt32>(), "thread_number"},
{std::make_shared<DataTypeString>(), "query_id"},
{std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>()), "trace"}
{std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>()), "trace"},
{std::make_shared<DataTypeInt64>(), "size"},
{std::make_shared<DataTypeUInt64>(), "pointer"},
};
}
@ -44,6 +46,8 @@ void TraceLogElement::appendToBlock(Block & block) const
columns[i++]->insert(thread_number);
columns[i++]->insertData(query_id.data(), query_id.size());
columns[i++]->insert(trace);
columns[i++]->insert(size);
columns[i++]->insert(pointer);
block.setColumns(std::move(columns));
}

View File

@ -1,9 +1,10 @@
#pragma once
#include <Common/QueryProfiler.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/SystemLog.h>
#include <Common/QueryProfiler.h>
#include <Common/TraceCollector.h>
namespace DB
{
@ -19,6 +20,10 @@ struct TraceLogElement
String query_id;
Array trace;
/// for |TraceType::MEMORY|
Int64 size; /// Allocation size in bytes. In case of deallocation should match the allocation size.
UInt64 pointer; /// Address of allocated region - to track the deallocations.
static std::string name() { return "TraceLog"; }
static Block createBlock();
void appendToBlock(Block & block) const;

View File

@ -0,0 +1,44 @@
#pragma once
#include <memory>
#include <type_traits>
template <class T, typename DefaultConstructable = void>
class Singleton;
/// For default-constructable type we don't need to implement |create()|
/// and may use "arrow" operator immediately.
template <class T>
class Singleton<T, std::enable_if_t<std::is_default_constructible_v<T>>>
{
public:
T * operator->()
{
static T instance;
return &instance;
}
};
/// For custom-constructed type we have to enforce call to |create()|
/// before any use of "arrow" operator.
template <class T>
class Singleton<T, std::enable_if_t<!std::is_default_constructible_v<T>>>
{
public:
Singleton() = default;
template <typename ... Args>
Singleton(const Args & ... args)
{
instance.reset(new T(args...));
/// TODO: throw exception on double-creation.
}
T * operator->()
{
return instance.get();
}
private:
inline static std::unique_ptr<T> instance{};
};