refactor and comments

This commit is contained in:
Nikita Lapkov 2019-07-05 13:48:47 +00:00
parent dc6e6eaca7
commit b8585a5630
21 changed files with 141 additions and 180 deletions

View File

@ -294,6 +294,8 @@
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</query_log>
<!-- Trace log. Stores stack traces collected by query profilers.
See query_profiler_real_time_period and query_profiler_cpu_time_period settings. -->
<trace_log>
<database>system</database>
<table>trace_log</table>

View File

@ -5,11 +5,4 @@ namespace DB
LazyPipe trace_pipe;
void CloseQueryTraceStream()
{
DB::WriteBufferFromFileDescriptor out(trace_pipe.fds_rw[1]);
DB::writeIntBinary(true, out);
out.next();
}
}

View File

@ -27,28 +27,31 @@ enum class TimerType : UInt8
Cpu,
};
void CloseQueryTraceStream();
namespace
{
/// Normally query_id is a UUID (string with a fixed length) but user can provide custom query_id.
/// Thus upper bound on query_id length should be introduced to avoid buffer overflow in signal handler.
constexpr UInt32 QUERY_ID_MAX_LEN = 1024;
void writeTraceInfo(TimerType timer_type, int /* sig */, siginfo_t * /* info */, void * context)
{
char buffer[DBMS_DEFAULT_BUFFER_SIZE];
DB::WriteBufferFromFileDescriptor out(
/* fd */ trace_pipe.fds_rw[1],
/* buf_size */ DBMS_DEFAULT_BUFFER_SIZE,
/* existing_memory */ buffer
);
constexpr UInt32 buf_size = sizeof(char) + // TraceCollector stop flag
8 * sizeof(char) + // maximum VarUInt length for string size
QUERY_ID_MAX_LEN * sizeof(char) + // maximum query_id length
sizeof(StackTrace) + // collected stack trace
sizeof(TimerType); // timer type
char buffer[buf_size];
DB::WriteBufferFromFileDescriptor out(trace_pipe.fds_rw[1], buf_size, buffer);
const std::string & query_id = CurrentThread::getQueryId();
const auto signal_context = *reinterpret_cast<ucontext_t *>(context);
const StackTrace stack_trace(signal_context);
DB::writeIntBinary(false, out);
DB::writeChar(false, out);
DB::writeStringBinary(query_id, out);
DB::writePODBinary(stack_trace, out);
DB::writeIntBinary(timer_type, out);
DB::writePODBinary(timer_type, out);
out.next();
}
@ -56,6 +59,17 @@ namespace
}
/**
* Query profiler implementation for selected thread.
*
* This class installs timer and signal handler on creation to:
* 1. periodically pause given thread
* 2. collect thread's current stack trace
* 3. write collected stack trace to trace_pipe for TraceCollector
*
* Destructor tries to unset timer and restore previous signal handler.
* Note that signal handler implementation is defined by template parameter. See QueryProfilerReal and QueryProfilerCpu.
*/
template <typename ProfilerImpl>
class QueryProfilerBase
{
@ -112,6 +126,7 @@ private:
struct sigaction * previous_handler = nullptr;
};
/// Query profiler with timer based on real clock
class QueryProfilerReal : public QueryProfilerBase<QueryProfilerReal>
{
public:
@ -125,6 +140,7 @@ public:
}
};
/// Query profiler with timer based on CPU clock
class QueryProfilerCpu : public QueryProfilerBase<QueryProfilerCpu>
{
public:

View File

@ -10,6 +10,17 @@
#include <csignal>
#include <common/Pipe.h>
namespace
{
/// Return codes of the child process that tell us (for sure) that an error happened while creating it.
/// The exact values are not important; they are chosen so that a conflict with
/// the return code of the launched program is rare.
enum class ReturnCodes : int
{
    CANNOT_DUP_STDIN  = 0x55555555,
    CANNOT_DUP_STDOUT = 0x55555556,
    CANNOT_DUP_STDERR = 0x55555557,
    CANNOT_EXEC       = 0x55555558,
};
}
namespace DB
{
@ -23,15 +34,6 @@ namespace ErrorCodes
extern const int CANNOT_CREATE_CHILD_PROCESS;
}
/// Return codes of the child process that tell us (for sure) that an error happened while creating it.
/// The exact values are not important; they are chosen so that a conflict with
/// the return code of the launched program is rare.
enum class ReturnCodes : int
{
    CANNOT_DUP_STDIN  = 0x55555555,
    CANNOT_DUP_STDOUT = 0x55555556,
    CANNOT_DUP_STDERR = 0x55555557,
    CANNOT_EXEC       = 0x55555558,
};
ShellCommand::ShellCommand(pid_t pid, int in_fd, int out_fd, int err_fd, bool terminate_in_destructor_)
: pid(pid)
, terminate_in_destructor(terminate_in_destructor_)

View File

@ -11,7 +11,6 @@
#include <shared_mutex>
#include <functional>
#include <boost/noncopyable.hpp>
#include <signal.h>
namespace Poco
@ -176,7 +175,6 @@ protected:
size_t queries_started = 0;
// CPU and Real time query profilers
bool has_query_profiler = false;
std::unique_ptr<QueryProfilerReal> query_profiler_real;
std::unique_ptr<QueryProfilerCpu> query_profiler_cpu;

View File

@ -218,8 +218,8 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, empty_result_for_aggregation_by_empty_set, false, "Return empty result when aggregating without keys on empty set.") \
M(SettingBool, allow_distributed_ddl, true, "If it is set to true, then a user is allowed to executed distributed DDL queries.") \
M(SettingUInt64, odbc_max_field_size, 1024, "Max size of filed can be read from ODBC dictionary. Long strings are truncated.") \
M(SettingUInt64, query_profiler_real_time_period, 500000000, "Period for real clock timer of query profiler (in nanoseconds).") \
M(SettingUInt64, query_profiler_cpu_time_period, 500000000, "Period for CPU clock timer of query profiler (in nanoseconds).") \
M(SettingUInt64, query_profiler_real_time_period, 500000000, "Period for real clock timer of query profiler (in nanoseconds). Set 0 value to turn off real clock query profiler") \
M(SettingUInt64, query_profiler_cpu_time_period, 500000000, "Period for CPU clock timer of query profiler (in nanoseconds). Set 0 value to turn off CPU clock query profiler") \
\
\
/** Limits during query execution are part of the settings. \

View File

@ -84,17 +84,20 @@ public:
auto result_column = ColumnString::create();
size_t pos = 0;
StackTrace::Frames frames;
size_t current_offset = 0;
for (size_t i = 0; i < offsets.size(); ++i)
{
std::vector<void *> frames;
for (; pos < offsets[i]; ++pos)
size_t current_size = 0;
for (; current_size < frames.size() && current_offset + current_size < offsets[i]; ++current_size)
{
frames.push_back(reinterpret_cast<void *>(data[pos]));
frames[current_size] = reinterpret_cast<void *>(data[current_offset + current_size]);
}
std::string backtrace = StackTrace(frames).toString();
std::string backtrace = StackTrace(frames.begin(), frames.begin() + current_size).toString();
result_column->insertDataWithTerminatingZero(backtrace.c_str(), backtrace.length() + 1);
current_offset = offsets[i];
}
block.getByPosition(result).column = std::move(result_column);

View File

@ -295,7 +295,7 @@ struct ContextShared
if (trace_collector != nullptr)
{
/// Stop trace collector
CloseQueryTraceStream();
NotifyTraceCollectorToStop();
trace_collector_thread.join();
/// Close trace pipe - definitely nobody needs to write there after

View File

@ -11,12 +11,6 @@
#include <IO/WriteHelpers.h>
#include <common/logger_useful.h>
#include <csignal>
#include <time.h>
#include <signal.h>
#include <sys/syscall.h>
#include <unistd.h>
/// Implement some methods of ThreadStatus and CurrentThread here to avoid extra linking dependencies in clickhouse_common_io
/// TODO It doesn't make sense.
@ -156,19 +150,12 @@ void ThreadStatus::initQueryProfiler()
/* thread_id */ os_thread_id,
/* period */ static_cast<UInt32>(settings.query_profiler_cpu_time_period)
);
has_query_profiler = true;
}
/// Releases both query profilers (real-time and CPU-time) owned by this thread status.
/// Does nothing unless has_query_profiler is set.
void ThreadStatus::finalizeQueryProfiler()
{
    if (has_query_profiler)
    {
        /// Dropping the owning pointers destroys the profiler objects.
        query_profiler_real.reset();
        query_profiler_cpu.reset();

        has_query_profiler = false;
    }
}
void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)

View File

@ -1,6 +1,7 @@
#include "TraceCollector.h"
#include <common/Sleep.h>
#include <Core/Field.h>
#include <Poco/Logger.h>
#include <common/StackTrace.h>
#include <common/logger_useful.h>
#include <IO/ReadHelpers.h>
@ -9,50 +10,62 @@
#include <Common/QueryProfiler.h>
#include <Interpreters/TraceLog.h>
namespace DB
using namespace DB;
/**
* Sends TraceCollector stop message
*
* Each sequence of data for TraceCollector thread starts with a boolean flag.
* If this flag is true, TraceCollector must stop reading trace_pipe and exit.
* This function sends flag with a true value to stop TraceCollector gracefully.
*
* NOTE: TraceCollector will NOT stop immediately as there may be some data left in the pipe
* before stop message.
*/
void DB::NotifyTraceCollectorToStop()
{
WriteBufferFromFileDescriptor out(trace_pipe.fds_rw[1]);
writeIntBinary(true, out);
out.next();
}
TraceCollector::TraceCollector(std::shared_ptr<TraceLog> trace_log)
: log(&Logger::get("TraceCollector"))
, trace_log(trace_log)
TraceCollector::TraceCollector(std::shared_ptr<TraceLog> trace_log)
: log(&Poco::Logger::get("TraceCollector"))
, trace_log(trace_log)
{
if (trace_log == nullptr)
throw Poco::Exception("Invalid trace log pointer passed");
}
void TraceCollector::run()
{
ReadBufferFromFileDescriptor in(trace_pipe.fds_rw[0]);
while (true)
{
}
char is_last;
readChar(is_last, in);
if (is_last)
break;
void TraceCollector::run()
{
DB::ReadBufferFromFileDescriptor in(trace_pipe.fds_rw[0]);
std::string query_id;
StackTrace stack_trace(NoCapture{});
TimerType timer_type;
while (true)
{
SleepForMicroseconds(1);
readStringBinary(query_id, in);
readPODBinary(stack_trace, in);
readPODBinary(timer_type, in);
bool is_last;
DB::readIntBinary(is_last, in);
if (is_last)
break;
const auto size = stack_trace.getSize();
const auto& frames = stack_trace.getFrames();
std::string query_id;
StackTrace stack_trace(NoCapture{});
TimerType timer_type;
Array trace;
trace.reserve(size);
for (size_t i = 0; i < size; i++)
trace.emplace_back(UInt64(reinterpret_cast<uintptr_t>(frames[i])));
DB::readStringBinary(query_id, in);
DB::readPODBinary(stack_trace, in);
DB::readIntBinary(timer_type, in);
TraceLogElement element{std::time(nullptr), timer_type, query_id, trace};
if (trace_log != nullptr)
{
const auto size = stack_trace.getSize();
const auto& frames = stack_trace.getFrames();
std::vector<UInt64> trace;
trace.reserve(size);
for (size_t i = 0; i < size; i++)
trace.push_back(reinterpret_cast<uintptr_t>(frames[i]));
TraceLogElement element{std::time(nullptr), timer_type, query_id, trace};
trace_log->add(element);
}
}
trace_log->add(element);
}
}

View File

@ -1,24 +1,28 @@
#pragma once
#include <future>
#include <Poco/Runnable.h>
#include <Poco/Logger.h>
#include <ext/singleton.h>
#include <Interpreters/Context.h>
namespace Poco
{
class Logger;
}
namespace DB
{
using Poco::Logger;
class TraceCollector : public Poco::Runnable
{
private:
Logger * log;
std::shared_ptr<TraceLog> trace_log;
void NotifyTraceCollectorToStop();
public:
TraceCollector(std::shared_ptr<TraceLog> trace_log);
class TraceCollector : public Poco::Runnable
{
private:
Poco::Logger * log;
std::shared_ptr<TraceLog> trace_log;
public:
TraceCollector(std::shared_ptr<TraceLog> trace_log);
void run() override;
};
void run() override;
};
}

View File

@ -36,13 +36,7 @@ void TraceLogElement::appendToBlock(Block &block) const
columns[i++]->insert(event_time);
columns[i++]->insert(static_cast<UInt8>(timer_type));
columns[i++]->insertData(query_id.data(), query_id.size());
{
Array trace_array;
trace_array.reserve(trace.size());
for (const UInt32 trace_address : trace)
trace_array.emplace_back(UInt64(trace_address));
columns[i++]->insert(trace_array);
}
columns[i++]->insert(trace);
block.setColumns(std::move(columns));
}

View File

@ -17,7 +17,7 @@ struct TraceLogElement
time_t event_time{};
TimerType timer_type;
String query_id{};
std::vector<UInt64> trace{};
Array trace{};
static std::string name() { return "TraceLog"; }
static Block createBlock();

View File

@ -63,8 +63,7 @@ add_library (common
include/ext/unlock_guard.h
include/ext/singleton.h
${CONFIG_COMMON}
)
${CONFIG_COMMON})
if (USE_UNWIND)
target_compile_definitions (common PRIVATE USE_UNWIND=1)
@ -133,8 +132,7 @@ target_link_libraries (common
PRIVATE
${MALLOC_LIBRARIES}
Threads::Threads
${MEMCPY_LIBRARIES}
)
${MEMCPY_LIBRARIES})
if (RT_LIBRARY)
target_link_libraries (common PRIVATE ${RT_LIBRARY})

View File

@ -1,52 +0,0 @@
# Detects libunwind and decides whether it is used at all (USE_UNWIND) and whether
# the system copy or the bundled copy from contrib/ is used (USE_INTERNAL_UNWIND_LIBRARY).
# Results: USE_UNWIND, UNWIND_LIBRARY, UNWIND_INCLUDE_DIR.

# Save the CMAKE_REQUIRED_* state, because the compile check below modifies it.
include (CMakePushCheckState)
cmake_push_check_state ()
option (ENABLE_UNWIND "Enable libunwind (better stacktraces)" ON)
if (ENABLE_UNWIND)
# The bundled library is offered by default only on Linux that is neither ARM nor 32-bit;
# everywhere else the default is to use the system library.
if (CMAKE_SYSTEM MATCHES "Linux" AND NOT ARCH_ARM AND NOT ARCH_32)
option (USE_INTERNAL_UNWIND_LIBRARY "Set to FALSE to use system unwind library instead of bundled" ${NOT_UNBUNDLED})
else ()
option (USE_INTERNAL_UNWIND_LIBRARY "Set to FALSE to use system unwind library instead of bundled" OFF)
endif ()
# System library requested: locate it and verify that it provides unw_init_local2.
if (NOT USE_INTERNAL_UNWIND_LIBRARY)
find_library (UNWIND_LIBRARY unwind)
find_path (UNWIND_INCLUDE_DIR NAMES unwind.h PATHS ${UNWIND_INCLUDE_PATHS})
include (CheckCXXSourceCompiles)
set(CMAKE_REQUIRED_INCLUDES ${UNWIND_INCLUDE_DIR})
set(CMAKE_REQUIRED_LIBRARIES ${UNWIND_LIBRARY})
# Try to build a tiny program that calls unw_init_local2 with UNW_INIT_SIGNAL_FRAME.
check_cxx_source_compiles("
#include <ucontext.h>
#define UNW_LOCAL_ONLY
#include <libunwind.h>
int main () {
ucontext_t context;
unw_cursor_t cursor;
unw_init_local2(&cursor, &context, UNW_INIT_SIGNAL_FRAME);
return 0;
}
" HAVE_UNW_INIT_LOCAL2)
# The system library failed the check: forget it so that the fallback below can apply.
if (NOT HAVE_UNW_INIT_LOCAL2)
set(UNWIND_LIBRARY "")
set(UNWIND_INCLUDE_DIR "")
endif ()
endif ()
# Use whatever was found; otherwise fall back to the bundled library on supported platforms.
if (UNWIND_LIBRARY AND UNWIND_INCLUDE_DIR)
set (USE_UNWIND 1)
elseif (CMAKE_SYSTEM MATCHES "Linux" AND NOT ARCH_ARM AND NOT ARCH_32)
set (USE_INTERNAL_UNWIND_LIBRARY 1)
set (UNWIND_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/libunwind/include")
set (UNWIND_LIBRARY unwind)
set (USE_UNWIND 1)
endif ()
endif ()
message (STATUS "Using unwind=${USE_UNWIND}: ${UNWIND_INCLUDE_DIR} : ${UNWIND_LIBRARY}")
# Restore the CMAKE_REQUIRED_* state saved at the top.
cmake_pop_check_state ()

View File

@ -1,11 +1,7 @@
#pragma once
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <fcntl.h>
#include <dlfcn.h>
#include <Common/Exception.h>
#include <port/unistd.h>
#include <stdexcept>
struct LazyPipe

View File

@ -1,8 +1,6 @@
#pragma once
#include <cstdint>
#include <time.h>
#include <errno.h>
void SleepForNanoseconds(uint64_t nanoseconds);

View File

@ -35,7 +35,14 @@ public:
StackTrace(NoCapture);
/// Fills stack trace frames with provided sequence
StackTrace(const std::vector<void *> & source_frames);
/// Fills stack trace frames with the elements of the range [it, end).
/// Copying stops as soon as `capacity` frames are stored, so extra elements are ignored.
/// NOTE(review): relies on `size` being zero before the body runs - confirm against the member initializer.
template <typename Iterator>
StackTrace(Iterator it, Iterator end)
{
    for (; size < capacity && it != end; ++it, ++size)
        frames[size] = *it;
}
size_t getSize() const;
const Frames & getFrames() const;

View File

@ -1,6 +1,17 @@
#include "common/Sleep.h"
#include <time.h>
#include <errno.h>
/**
* Sleep with nanoseconds precision
*
* In case query profiler is turned on, all threads spawned for
* query execution are repeatedly interrupted by signals from timer.
* Functions for relative sleep (sleep(3), nanosleep(2), etc.) have
* problems in this setup and man page for nanosleep(2) suggests
* using absolute deadlines, for instance clock_nanosleep(2).
*/
void SleepForNanoseconds(uint64_t nanoseconds)
{
const auto clock_type = CLOCK_REALTIME;

View File

@ -195,12 +195,6 @@ StackTrace::StackTrace(NoCapture)
{
}
/// Copies the leading frames of source_frames, at most `capacity` of them,
/// and leaves `size` equal to the number of frames copied.
StackTrace::StackTrace(const std::vector<void *> & source_frames)
{
    const auto frames_to_copy = std::min(source_frames.size(), capacity);

    size = 0;
    while (size < frames_to_copy)
    {
        frames[size] = source_frames[size];
        ++size;
    }
}
void StackTrace::tryCapture()
{
size = 0;

View File

@ -1,7 +1,4 @@
#include <daemon/BaseDaemon.h>
#include <common/Pipe.h>
#include <loggers/OwnFormattingChannel.h>
#include <loggers/OwnPatternFormatter.h>
#include <Common/Config/ConfigProcessor.h>
#include <sys/stat.h>
#include <sys/types.h>