mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
Merge pull request #8344 from ClickHouse/system-stack-trace
Added system.stack_trace table.
This commit is contained in:
commit
28c7e78d36
@ -476,6 +476,7 @@ namespace ErrorCodes
|
||||
extern const int S3_ERROR = 499;
|
||||
extern const int CANNOT_CREATE_DICTIONARY_FROM_METADATA = 500;
|
||||
extern const int CANNOT_CREATE_DATABASE = 501;
|
||||
extern const int CANNOT_SIGQUEUE = 502;
|
||||
|
||||
extern const int KEEPER_EXCEPTION = 999;
|
||||
extern const int POCO_EXCEPTION = 1000;
|
||||
|
112
dbms/src/Common/PipeFDs.cpp
Normal file
112
dbms/src/Common/PipeFDs.cpp
Normal file
@ -0,0 +1,112 @@
|
||||
#include <Common/PipeFDs.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/formatReadable.h>
|
||||
|
||||
#include <common/logger_useful.h>
|
||||
|
||||
#include <unistd.h>
|
||||
#include <fcntl.h>
|
||||
#include <string>
|
||||
#include <algorithm>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int CANNOT_PIPE;
|
||||
extern const int CANNOT_FCNTL;
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
void LazyPipeFDs::open()
|
||||
{
|
||||
for (int & fd : fds_rw)
|
||||
if (fd >= 0)
|
||||
throw Exception("Pipe is already opened", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
#ifndef __APPLE__
|
||||
if (0 != pipe2(fds_rw, O_CLOEXEC))
|
||||
throwFromErrno("Cannot create pipe", ErrorCodes::CANNOT_PIPE);
|
||||
#else
|
||||
if (0 != pipe(fds_rw))
|
||||
throwFromErrno("Cannot create pipe", ErrorCodes::CANNOT_PIPE);
|
||||
if (0 != fcntl(fds_rw[0], F_SETFD, FD_CLOEXEC))
|
||||
throwFromErrno("Cannot setup auto-close on exec for read end of pipe", ErrorCodes::CANNOT_FCNTL);
|
||||
if (0 != fcntl(fds_rw[1], F_SETFD, FD_CLOEXEC))
|
||||
throwFromErrno("Cannot setup auto-close on exec for write end of pipe", ErrorCodes::CANNOT_FCNTL);
|
||||
#endif
|
||||
}
|
||||
|
||||
void LazyPipeFDs::close()
|
||||
{
|
||||
for (int & fd : fds_rw)
|
||||
{
|
||||
if (fd < 0)
|
||||
continue;
|
||||
if (0 != ::close(fd))
|
||||
throwFromErrno("Cannot close pipe", ErrorCodes::CANNOT_PIPE);
|
||||
fd = -1;
|
||||
}
|
||||
}
|
||||
|
||||
PipeFDs::PipeFDs()
|
||||
{
|
||||
open();
|
||||
}
|
||||
|
||||
LazyPipeFDs::~LazyPipeFDs()
|
||||
{
|
||||
try
|
||||
{
|
||||
close();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void LazyPipeFDs::setNonBlocking()
|
||||
{
|
||||
int flags = fcntl(fds_rw[1], F_GETFL, 0);
|
||||
if (-1 == flags)
|
||||
throwFromErrno("Cannot get file status flags of pipe", ErrorCodes::CANNOT_FCNTL);
|
||||
if (-1 == fcntl(fds_rw[1], F_SETFL, flags | O_NONBLOCK))
|
||||
throwFromErrno("Cannot set non-blocking mode of pipe", ErrorCodes::CANNOT_FCNTL);
|
||||
}
|
||||
|
||||
void LazyPipeFDs::tryIncreaseSize(int desired_size)
|
||||
{
|
||||
#if defined(OS_LINUX)
|
||||
Poco::Logger * log = &Poco::Logger::get("Pipe");
|
||||
|
||||
/** Increase pipe size to avoid slowdown during fine-grained trace collection.
|
||||
*/
|
||||
int pipe_size = fcntl(fds_rw[1], F_GETPIPE_SZ);
|
||||
if (-1 == pipe_size)
|
||||
{
|
||||
if (errno == EINVAL)
|
||||
{
|
||||
LOG_INFO(log, "Cannot get pipe capacity, " << errnoToString(ErrorCodes::CANNOT_FCNTL) << ". Very old Linux kernels have no support for this fcntl.");
|
||||
/// It will work nevertheless.
|
||||
}
|
||||
else
|
||||
throwFromErrno("Cannot get pipe capacity", ErrorCodes::CANNOT_FCNTL);
|
||||
}
|
||||
else
|
||||
{
|
||||
for (errno = 0; errno != EPERM && pipe_size < desired_size; pipe_size *= 2)
|
||||
if (-1 == fcntl(fds_rw[1], F_SETPIPE_SZ, pipe_size * 2) && errno != EPERM)
|
||||
throwFromErrno("Cannot increase pipe capacity to " + std::to_string(pipe_size * 2), ErrorCodes::CANNOT_FCNTL);
|
||||
|
||||
LOG_TRACE(log, "Pipe capacity is " << formatReadableSizeWithBinarySuffix(std::min(pipe_size, desired_size)));
|
||||
}
|
||||
#else
|
||||
(void)desired_size;
|
||||
#endif
|
||||
}
|
||||
|
||||
}
|
35
dbms/src/Common/PipeFDs.h
Normal file
35
dbms/src/Common/PipeFDs.h
Normal file
@ -0,0 +1,35 @@
|
||||
#pragma once
|
||||
|
||||
#include <cstddef>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/** Struct containing a pipe with lazy initialization.
|
||||
* Use `open` and `close` methods to manipulate pipe and `fds_rw` field to access
|
||||
* pipe's file descriptors.
|
||||
*/
|
||||
struct LazyPipeFDs
|
||||
{
|
||||
int fds_rw[2] = {-1, -1};
|
||||
|
||||
void open();
|
||||
void close();
|
||||
|
||||
void setNonBlocking();
|
||||
void tryIncreaseSize(int desired_size);
|
||||
|
||||
~LazyPipeFDs();
|
||||
};
|
||||
|
||||
|
||||
/** Struct which opens new pipe on creation and closes it on destruction.
|
||||
* Use `fds_rw` field to access pipe's file descriptors.
|
||||
*/
|
||||
struct PipeFDs : public LazyPipeFDs
|
||||
{
|
||||
PipeFDs();
|
||||
};
|
||||
|
||||
}
|
@ -1,12 +1,12 @@
|
||||
#include "QueryProfiler.h"
|
||||
|
||||
#include <random>
|
||||
#include <common/Pipe.h>
|
||||
#include <common/phdr_cache.h>
|
||||
#include <common/config_common.h>
|
||||
#include <Common/StackTrace.h>
|
||||
#include <common/StringRef.h>
|
||||
#include <common/logger_useful.h>
|
||||
#include <Common/PipeFDs.h>
|
||||
#include <Common/StackTrace.h>
|
||||
#include <Common/CurrentThread.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/thread_local_rng.h>
|
||||
@ -22,7 +22,7 @@ namespace ProfileEvents
|
||||
namespace DB
|
||||
{
|
||||
|
||||
extern LazyPipe trace_pipe;
|
||||
extern LazyPipeFDs trace_pipe;
|
||||
|
||||
namespace
|
||||
{
|
||||
|
@ -4,11 +4,11 @@
|
||||
#include <dlfcn.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/ShellCommand.h>
|
||||
#include <Common/PipeFDs.h>
|
||||
#include <common/logger_useful.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <port/unistd.h>
|
||||
#include <csignal>
|
||||
#include <common/Pipe.h>
|
||||
|
||||
namespace
|
||||
{
|
||||
@ -66,9 +66,9 @@ std::unique_ptr<ShellCommand> ShellCommand::executeImpl(const char * filename, c
|
||||
if (!real_vfork)
|
||||
throwFromErrno("Cannot find symbol vfork in myself", ErrorCodes::CANNOT_DLSYM);
|
||||
|
||||
Pipe pipe_stdin;
|
||||
Pipe pipe_stdout;
|
||||
Pipe pipe_stderr;
|
||||
PipeFDs pipe_stdin;
|
||||
PipeFDs pipe_stdout;
|
||||
PipeFDs pipe_stderr;
|
||||
|
||||
pid_t pid = reinterpret_cast<pid_t(*)()>(real_vfork)();
|
||||
|
||||
|
@ -24,7 +24,7 @@ public:
|
||||
/// Whether the current process has permissions (sudo or cap_net_admin capabilties) to get taskstats info
|
||||
static bool checkPermissions();
|
||||
|
||||
#if defined(__linux__)
|
||||
#if defined(OS_LINUX)
|
||||
private:
|
||||
int netlink_socket_fd = -1;
|
||||
UInt16 taskstats_family_id = 0;
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
#include <Core/Field.h>
|
||||
#include <Poco/Logger.h>
|
||||
#include <common/Pipe.h>
|
||||
#include <Common/PipeFDs.h>
|
||||
#include <Common/StackTrace.h>
|
||||
#include <common/logger_useful.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
@ -19,13 +19,12 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
LazyPipe trace_pipe;
|
||||
LazyPipeFDs trace_pipe;
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int NULL_POINTER_DEREFERENCE;
|
||||
extern const int THREAD_IS_NOT_JOINABLE;
|
||||
extern const int CANNOT_FCNTL;
|
||||
}
|
||||
|
||||
TraceCollector::TraceCollector(std::shared_ptr<TraceLog> & trace_log_)
|
||||
@ -40,36 +39,8 @@ TraceCollector::TraceCollector(std::shared_ptr<TraceLog> & trace_log_)
|
||||
/** Turn write end of pipe to non-blocking mode to avoid deadlocks
|
||||
* when QueryProfiler is invoked under locks and TraceCollector cannot pull data from pipe.
|
||||
*/
|
||||
int flags = fcntl(trace_pipe.fds_rw[1], F_GETFL, 0);
|
||||
if (-1 == flags)
|
||||
throwFromErrno("Cannot get file status flags of pipe", ErrorCodes::CANNOT_FCNTL);
|
||||
if (-1 == fcntl(trace_pipe.fds_rw[1], F_SETFL, flags | O_NONBLOCK))
|
||||
throwFromErrno("Cannot set non-blocking mode of pipe", ErrorCodes::CANNOT_FCNTL);
|
||||
|
||||
#if defined(OS_LINUX)
|
||||
/** Increase pipe size to avoid slowdown during fine-grained trace collection.
|
||||
*/
|
||||
int pipe_size = fcntl(trace_pipe.fds_rw[1], F_GETPIPE_SZ);
|
||||
if (-1 == pipe_size)
|
||||
{
|
||||
if (errno == EINVAL)
|
||||
{
|
||||
LOG_INFO(log, "Cannot get pipe capacity, " << errnoToString(ErrorCodes::CANNOT_FCNTL) << ". Very old Linux kernels have no support for this fcntl.");
|
||||
/// It will work nevertheless.
|
||||
}
|
||||
else
|
||||
throwFromErrno("Cannot get pipe capacity", ErrorCodes::CANNOT_FCNTL);
|
||||
}
|
||||
else
|
||||
{
|
||||
constexpr int max_pipe_capacity_to_set = 1048576;
|
||||
for (errno = 0; errno != EPERM && pipe_size < max_pipe_capacity_to_set; pipe_size *= 2)
|
||||
if (-1 == fcntl(trace_pipe.fds_rw[1], F_SETPIPE_SZ, pipe_size * 2) && errno != EPERM)
|
||||
throwFromErrno("Cannot increase pipe capacity to " + toString(pipe_size * 2), ErrorCodes::CANNOT_FCNTL);
|
||||
|
||||
LOG_TRACE(log, "Pipe capacity is " << formatReadableSizeWithBinarySuffix(std::min(pipe_size, max_pipe_capacity_to_set)));
|
||||
}
|
||||
#endif
|
||||
trace_pipe.setNonBlocking();
|
||||
trace_pipe.tryIncreaseSize(1 << 20);
|
||||
|
||||
thread = ThreadFromGlobalPool(&TraceCollector::run, this);
|
||||
}
|
||||
|
@ -83,6 +83,10 @@ public:
|
||||
{
|
||||
abort();
|
||||
}
|
||||
else if (mode == "std::terminate")
|
||||
{
|
||||
std::terminate();
|
||||
}
|
||||
else if (mode == "use after free")
|
||||
{
|
||||
int * x_ptr;
|
||||
|
@ -197,6 +197,7 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
|
||||
/// Must reset pointer to thread_group's memory_tracker, because it will be destroyed two lines below.
|
||||
memory_tracker.setParent(nullptr);
|
||||
|
||||
query_id.clear();
|
||||
query_context = nullptr;
|
||||
thread_group.reset();
|
||||
|
||||
|
219
dbms/src/Storages/System/StorageSystemStackTrace.cpp
Normal file
219
dbms/src/Storages/System/StorageSystemStackTrace.cpp
Normal file
@ -0,0 +1,219 @@
|
||||
#ifdef OS_LINUX /// Because of 'sigqueue' functions and RT signals.
|
||||
|
||||
#include <signal.h>
|
||||
#include <poll.h>
|
||||
|
||||
#include <mutex>
|
||||
#include <filesystem>
|
||||
|
||||
#include <ext/scope_guard.h>
|
||||
|
||||
#include <Storages/System/StorageSystemStackTrace.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <DataTypes/DataTypeArray.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <Common/PipeFDs.h>
|
||||
#include <common/getThreadNumber.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int CANNOT_SIGQUEUE;
|
||||
extern const int CANNOT_MANIPULATE_SIGSET;
|
||||
extern const int CANNOT_SET_SIGNAL_HANDLER;
|
||||
extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
|
||||
namespace
|
||||
{
|
||||
const pid_t expected_pid = getpid();
|
||||
const int sig = SIGRTMIN;
|
||||
|
||||
int sequence_num = 0; /// For messages sent via pipe.
|
||||
|
||||
UInt32 thread_number{0};
|
||||
std::optional<StackTrace> stack_trace;
|
||||
|
||||
static constexpr size_t max_query_id_size = 128;
|
||||
char query_id_data[max_query_id_size];
|
||||
size_t query_id_size = 0;
|
||||
|
||||
LazyPipeFDs notification_pipe;
|
||||
|
||||
void signalHandler(int, siginfo_t * info, void * context)
|
||||
{
|
||||
/// In case malicious user is sending signals manually (for unknown reason).
|
||||
/// If we don't check - it may break our synchronization.
|
||||
if (info->si_pid != expected_pid)
|
||||
return;
|
||||
|
||||
/// Signal received too late.
|
||||
if (info->si_value.sival_int != sequence_num)
|
||||
return;
|
||||
|
||||
/// All these methods are signal-safe.
|
||||
const ucontext_t signal_context = *reinterpret_cast<ucontext_t *>(context);
|
||||
stack_trace.emplace(signal_context);
|
||||
thread_number = getThreadNumber();
|
||||
|
||||
StringRef query_id = CurrentThread::getQueryId();
|
||||
query_id_size = std::min(query_id.size, max_query_id_size);
|
||||
memcpy(query_id_data, query_id.data, query_id_size);
|
||||
|
||||
int notification_num = info->si_value.sival_int;
|
||||
ssize_t res = ::write(notification_pipe.fds_rw[1], ¬ification_num, sizeof(notification_num));
|
||||
|
||||
/// We cannot do anything if write failed.
|
||||
(void)res;
|
||||
}
|
||||
|
||||
/// Wait for data in pipe and read it.
|
||||
bool wait(int timeout_ms)
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
int fd = notification_pipe.fds_rw[0];
|
||||
pollfd poll_fd{fd, POLLIN, 0};
|
||||
|
||||
int poll_res = poll(&poll_fd, 1, timeout_ms);
|
||||
if (poll_res < 0)
|
||||
{
|
||||
if (errno == EINTR)
|
||||
{
|
||||
--timeout_ms; /// Quite a hacky way to update timeout. Just to make sure we avoid infinite waiting.
|
||||
if (timeout_ms == 0)
|
||||
return false;
|
||||
continue;
|
||||
}
|
||||
|
||||
throwFromErrno("Cannot poll pipe", ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR);
|
||||
}
|
||||
if (poll_res == 0)
|
||||
return false;
|
||||
|
||||
int notification_num = 0;
|
||||
ssize_t read_res = ::read(fd, ¬ification_num, sizeof(notification_num));
|
||||
|
||||
if (read_res < 0)
|
||||
{
|
||||
if (errno == EINTR)
|
||||
continue;
|
||||
|
||||
throwFromErrno("Cannot read from pipe", ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR);
|
||||
}
|
||||
|
||||
if (read_res == sizeof(notification_num))
|
||||
{
|
||||
if (notification_num == sequence_num)
|
||||
return true;
|
||||
else
|
||||
continue; /// Drain delayed notifications.
|
||||
}
|
||||
|
||||
throw Exception("Logical error: read wrong number of bytes from pipe", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
StorageSystemStackTrace::StorageSystemStackTrace(const String & name_)
|
||||
: IStorageSystemOneBlock<StorageSystemStackTrace>(name_)
|
||||
{
|
||||
notification_pipe.open();
|
||||
|
||||
/// Setup signal handler.
|
||||
|
||||
struct sigaction sa{};
|
||||
sa.sa_sigaction = signalHandler;
|
||||
sa.sa_flags = SA_SIGINFO;
|
||||
|
||||
if (sigemptyset(&sa.sa_mask))
|
||||
throwFromErrno("Cannot set signal handler.", ErrorCodes::CANNOT_MANIPULATE_SIGSET);
|
||||
|
||||
if (sigaddset(&sa.sa_mask, sig))
|
||||
throwFromErrno("Cannot set signal handler.", ErrorCodes::CANNOT_MANIPULATE_SIGSET);
|
||||
|
||||
if (sigaction(sig, &sa, nullptr))
|
||||
throwFromErrno("Cannot set signal handler.", ErrorCodes::CANNOT_SET_SIGNAL_HANDLER);
|
||||
}
|
||||
|
||||
|
||||
NamesAndTypesList StorageSystemStackTrace::getNamesAndTypes()
|
||||
{
|
||||
return
|
||||
{
|
||||
{ "thread_number", std::make_shared<DataTypeUInt32>() },
|
||||
{ "query_id", std::make_shared<DataTypeString>() },
|
||||
{ "trace", std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>()) }
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
void StorageSystemStackTrace::fillData(MutableColumns & res_columns, const Context &, const SelectQueryInfo &) const
|
||||
{
|
||||
/// It shouldn't be possible to do concurrent reads from this table.
|
||||
std::lock_guard lock(mutex);
|
||||
|
||||
/// Send a signal to every thread and wait for result.
|
||||
/// We must wait for every thread one by one sequentially,
|
||||
/// because there is a limit on number of queued signals in OS and otherwise signals may get lost.
|
||||
/// Also, non-RT signals are not delivered if previous signal is handled right now (by default; but we use RT signals).
|
||||
|
||||
/// Obviously, results for different threads may be out of sync.
|
||||
|
||||
/// There is no better way to enumerate threads in a process other than looking into procfs.
|
||||
|
||||
std::filesystem::directory_iterator end;
|
||||
for (std::filesystem::directory_iterator it("/proc/self/task"); it != end; ++it)
|
||||
{
|
||||
pid_t tid = parse<pid_t>(it->path().filename());
|
||||
|
||||
sigval sig_value{};
|
||||
sig_value.sival_int = sequence_num;
|
||||
if (0 != ::sigqueue(tid, sig, sig_value))
|
||||
{
|
||||
/// The thread may has been already finished.
|
||||
if (ESRCH == errno)
|
||||
continue;
|
||||
|
||||
throwFromErrno("Cannot send signal with sigqueue", ErrorCodes::CANNOT_SIGQUEUE);
|
||||
}
|
||||
|
||||
/// Just in case we will wait for pipe with timeout. In case signal didn't get processed.
|
||||
|
||||
if (wait(100))
|
||||
{
|
||||
size_t stack_trace_size = stack_trace->getSize();
|
||||
size_t stack_trace_offset = stack_trace->getOffset();
|
||||
|
||||
Array arr;
|
||||
arr.reserve(stack_trace_size - stack_trace_offset);
|
||||
for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
|
||||
arr.emplace_back(reinterpret_cast<intptr_t>(stack_trace->getFrames()[i]));
|
||||
|
||||
res_columns[0]->insert(thread_number);
|
||||
res_columns[1]->insertData(query_id_data, query_id_size);
|
||||
res_columns[2]->insert(arr);
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Cannot obtain a stack trace. But create a record in result nevertheless.
|
||||
|
||||
res_columns[0]->insert(tid); /// TODO Replace all thread numbers to OS thread numbers.
|
||||
res_columns[1]->insertDefault();
|
||||
res_columns[2]->insertDefault();
|
||||
}
|
||||
|
||||
sequence_num = static_cast<int>(static_cast<unsigned>(sequence_num) + 1);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#endif
|
37
dbms/src/Storages/System/StorageSystemStackTrace.h
Normal file
37
dbms/src/Storages/System/StorageSystemStackTrace.h
Normal file
@ -0,0 +1,37 @@
|
||||
#pragma once
|
||||
|
||||
#ifdef OS_LINUX /// Because of 'sigqueue' functions and RT signals.
|
||||
|
||||
#include <mutex>
|
||||
#include <ext/shared_ptr_helper.h>
|
||||
#include <Storages/System/IStorageSystemOneBlock.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class Context;
|
||||
|
||||
|
||||
/// Allows to introspect stack trace of all server threads.
|
||||
/// It acts like an embedded debugger.
|
||||
/// More than one instance of this table cannot be used.
|
||||
class StorageSystemStackTrace : public ext::shared_ptr_helper<StorageSystemStackTrace>, public IStorageSystemOneBlock<StorageSystemStackTrace>
|
||||
{
|
||||
friend struct ext::shared_ptr_helper<StorageSystemStackTrace>;
|
||||
public:
|
||||
String getName() const override { return "SystemStackTrace"; }
|
||||
static NamesAndTypesList getNamesAndTypes();
|
||||
|
||||
StorageSystemStackTrace(const String & name_);
|
||||
|
||||
protected:
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
void fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const override;
|
||||
|
||||
mutable std::mutex mutex;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif
|
@ -39,6 +39,10 @@
|
||||
#include <Storages/System/StorageSystemDisks.h>
|
||||
#include <Storages/System/StorageSystemStoragePolicies.h>
|
||||
|
||||
#ifdef OS_LINUX
|
||||
#include <Storages/System/StorageSystemStackTrace.h>
|
||||
#endif
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -65,6 +69,9 @@ void attachSystemTablesLocal(IDatabase & system_database)
|
||||
system_database.attachTable("collations", StorageSystemCollations::create("collations"));
|
||||
system_database.attachTable("table_engines", StorageSystemTableEngines::create("table_engines"));
|
||||
system_database.attachTable("contributors", StorageSystemContributors::create("contributors"));
|
||||
#ifdef OS_LINUX
|
||||
system_database.attachTable("stack_trace", StorageSystemStackTrace::create("stack_trace"));
|
||||
#endif
|
||||
}
|
||||
|
||||
void attachSystemTablesServer(IDatabase & system_database, bool has_zookeeper)
|
||||
|
@ -23,7 +23,6 @@ add_library (common
|
||||
src/getThreadNumber.cpp
|
||||
src/sleep.cpp
|
||||
src/argsToConfig.cpp
|
||||
src/Pipe.cpp
|
||||
src/phdr_cache.cpp
|
||||
src/coverage.cpp
|
||||
|
||||
@ -47,7 +46,6 @@ add_library (common
|
||||
include/common/setTerminalEcho.h
|
||||
include/common/find_symbols.h
|
||||
include/common/constexpr_helpers.h
|
||||
include/common/Pipe.h
|
||||
include/common/getThreadNumber.h
|
||||
include/common/sleep.h
|
||||
include/common/SimpleCache.h
|
||||
|
@ -1,34 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <unistd.h>
|
||||
#include <fcntl.h>
|
||||
#include <stdexcept>
|
||||
|
||||
/**
|
||||
* Struct containing a pipe with lazy initialization.
|
||||
* Use `open` and `close` methods to manipulate pipe and `fds_rw` field to access
|
||||
* pipe's file descriptors.
|
||||
*/
|
||||
struct LazyPipe
|
||||
{
|
||||
int fds_rw[2] = {-1, -1};
|
||||
|
||||
LazyPipe() = default;
|
||||
|
||||
void open();
|
||||
|
||||
void close();
|
||||
|
||||
virtual ~LazyPipe() = default;
|
||||
};
|
||||
|
||||
/**
|
||||
* Struct which opens new pipe on creation and closes it on destruction.
|
||||
* Use `fds_rw` field to access pipe's file descriptors.
|
||||
*/
|
||||
struct Pipe : public LazyPipe
|
||||
{
|
||||
Pipe();
|
||||
|
||||
~Pipe();
|
||||
};
|
@ -1,45 +0,0 @@
|
||||
#include "common/Pipe.h"
|
||||
|
||||
void LazyPipe::open()
|
||||
{
|
||||
for (int & fd : fds_rw)
|
||||
{
|
||||
if (fd >= 0)
|
||||
{
|
||||
throw std::logic_error("Pipe is already opened");
|
||||
}
|
||||
}
|
||||
|
||||
#ifndef __APPLE__
|
||||
if (0 != pipe2(fds_rw, O_CLOEXEC))
|
||||
throw std::runtime_error("Cannot create pipe");
|
||||
#else
|
||||
if (0 != pipe(fds_rw))
|
||||
throw std::runtime_error("Cannot create pipe");
|
||||
if (0 != fcntl(fds_rw[0], F_SETFD, FD_CLOEXEC))
|
||||
throw std::runtime_error("Cannot setup auto-close on exec for read end of pipe");
|
||||
if (0 != fcntl(fds_rw[1], F_SETFD, FD_CLOEXEC))
|
||||
throw std::runtime_error("Cannot setup auto-close on exec for write end of pipe");
|
||||
#endif
|
||||
}
|
||||
|
||||
void LazyPipe::close()
|
||||
{
|
||||
for (int fd : fds_rw)
|
||||
{
|
||||
if (fd >= 0)
|
||||
{
|
||||
::close(fd);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Pipe::Pipe()
|
||||
{
|
||||
open();
|
||||
}
|
||||
|
||||
Pipe::~Pipe()
|
||||
{
|
||||
close();
|
||||
}
|
@ -19,10 +19,12 @@
|
||||
#include <Poco/Version.h>
|
||||
#include <common/Types.h>
|
||||
#include <common/logger_useful.h>
|
||||
#include <common/getThreadNumber.h>
|
||||
#include <daemon/GraphiteWriter.h>
|
||||
#include <Common/Config/ConfigProcessor.h>
|
||||
#include <loggers/Loggers.h>
|
||||
|
||||
|
||||
namespace Poco { class TaskManager; }
|
||||
|
||||
|
||||
|
@ -1,5 +1,5 @@
|
||||
#include <daemon/BaseDaemon.h>
|
||||
#include <Common/Config/ConfigProcessor.h>
|
||||
|
||||
#include <sys/stat.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/time.h>
|
||||
@ -12,20 +12,15 @@
|
||||
#include <unistd.h>
|
||||
|
||||
#include <typeinfo>
|
||||
#include <common/logger_useful.h>
|
||||
#include <common/ErrorHandlers.h>
|
||||
#include <common/Pipe.h>
|
||||
#include <Common/StackTrace.h>
|
||||
#include <sys/time.h>
|
||||
#include <sys/resource.h>
|
||||
#include <iostream>
|
||||
#include <fstream>
|
||||
#include <sstream>
|
||||
#include <memory>
|
||||
|
||||
#include <Poco/Observer.h>
|
||||
#include <Poco/AutoPtr.h>
|
||||
#include <common/getThreadNumber.h>
|
||||
#include <common/coverage.h>
|
||||
#include <Poco/PatternFormatter.h>
|
||||
#include <Poco/TaskManager.h>
|
||||
#include <Poco/File.h>
|
||||
@ -37,16 +32,25 @@
|
||||
#include <Poco/Condition.h>
|
||||
#include <Poco/SyslogChannel.h>
|
||||
#include <Poco/DirectoryIterator.h>
|
||||
#include <Common/Exception.h>
|
||||
|
||||
#include <common/logger_useful.h>
|
||||
#include <common/ErrorHandlers.h>
|
||||
#include <common/argsToConfig.h>
|
||||
#include <common/getThreadNumber.h>
|
||||
#include <common/coverage.h>
|
||||
|
||||
#include <IO/WriteBufferFromFile.h>
|
||||
#include <IO/WriteBufferFromFileDescriptorDiscardOnFailure.h>
|
||||
#include <IO/ReadBufferFromFileDescriptor.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/PipeFDs.h>
|
||||
#include <Common/StackTrace.h>
|
||||
#include <Common/getMultipleKeysFromConfig.h>
|
||||
#include <Common/ClickHouseRevision.h>
|
||||
#include <Common/Config/ConfigProcessor.h>
|
||||
#include <Common/config_version.h>
|
||||
#include <common/argsToConfig.h>
|
||||
|
||||
#ifdef __APPLE__
|
||||
// ucontext is not available without _XOPEN_SOURCE
|
||||
@ -55,7 +59,7 @@
|
||||
#include <ucontext.h>
|
||||
|
||||
|
||||
Pipe signal_pipe;
|
||||
DB::PipeFDs signal_pipe;
|
||||
|
||||
|
||||
/** Reset signal handler to the default and send signal to itself.
|
||||
@ -68,8 +72,16 @@ static void call_default_signal_handler(int sig)
|
||||
}
|
||||
|
||||
|
||||
using ThreadNumber = decltype(getThreadNumber());
|
||||
static const size_t buf_size = sizeof(int) + sizeof(siginfo_t) + sizeof(ucontext_t) + sizeof(StackTrace) + sizeof(ThreadNumber);
|
||||
static constexpr size_t max_query_id_size = 127;
|
||||
|
||||
static const size_t buf_size =
|
||||
sizeof(int)
|
||||
+ sizeof(siginfo_t)
|
||||
+ sizeof(ucontext_t)
|
||||
+ sizeof(StackTrace)
|
||||
+ sizeof(UInt32)
|
||||
+ max_query_id_size + 1; /// query_id + varint encoded length
|
||||
|
||||
|
||||
using signal_function = void(int, siginfo_t*, void*);
|
||||
|
||||
@ -93,9 +105,9 @@ static void terminateRequestedSignalHandler(int sig, siginfo_t * info, void * co
|
||||
}
|
||||
|
||||
|
||||
/** Handler for "fault" signals. Send data about fault to separate thread to write into log.
|
||||
/** Handler for "fault" or diagnostic signals. Send data about fault to separate thread to write into log.
|
||||
*/
|
||||
static void faultSignalHandler(int sig, siginfo_t * info, void * context)
|
||||
static void signalHandler(int sig, siginfo_t * info, void * context)
|
||||
{
|
||||
char buf[buf_size];
|
||||
DB::WriteBufferFromFileDescriptorDiscardOnFailure out(signal_pipe.fds_rw[1], buf_size, buf);
|
||||
@ -103,11 +115,15 @@ static void faultSignalHandler(int sig, siginfo_t * info, void * context)
|
||||
const ucontext_t signal_context = *reinterpret_cast<ucontext_t *>(context);
|
||||
const StackTrace stack_trace(signal_context);
|
||||
|
||||
StringRef query_id = CurrentThread::getQueryId(); /// This is signal safe.
|
||||
query_id.size = std::min(query_id.size, max_query_id_size);
|
||||
|
||||
DB::writeBinary(sig, out);
|
||||
DB::writePODBinary(*info, out);
|
||||
DB::writePODBinary(signal_context, out);
|
||||
DB::writePODBinary(stack_trace, out);
|
||||
DB::writeBinary(getThreadNumber(), out);
|
||||
DB::writeBinary(UInt32(getThreadNumber()), out);
|
||||
DB::writeStringBinary(query_id, out);
|
||||
|
||||
out.next();
|
||||
|
||||
@ -163,7 +179,7 @@ public:
|
||||
}
|
||||
else if (sig == Signals::StdTerminate)
|
||||
{
|
||||
ThreadNumber thread_num;
|
||||
UInt32 thread_num;
|
||||
std::string message;
|
||||
|
||||
DB::readBinary(thread_num, in);
|
||||
@ -182,16 +198,18 @@ public:
|
||||
siginfo_t info;
|
||||
ucontext_t context;
|
||||
StackTrace stack_trace(NoCapture{});
|
||||
ThreadNumber thread_num;
|
||||
UInt32 thread_num;
|
||||
std::string query_id;
|
||||
|
||||
DB::readPODBinary(info, in);
|
||||
DB::readPODBinary(context, in);
|
||||
DB::readPODBinary(stack_trace, in);
|
||||
DB::readBinary(thread_num, in);
|
||||
DB::readBinary(query_id, in);
|
||||
|
||||
/// This allows to receive more signals if failure happens inside onFault function.
|
||||
/// Example: segfault while symbolizing stack trace.
|
||||
std::thread([=] { onFault(sig, info, context, stack_trace, thread_num); }).detach();
|
||||
std::thread([=] { onFault(sig, info, context, stack_trace, thread_num, query_id); }).detach();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -201,16 +219,33 @@ private:
|
||||
BaseDaemon & daemon;
|
||||
|
||||
private:
|
||||
void onTerminate(const std::string & message, ThreadNumber thread_num) const
|
||||
void onTerminate(const std::string & message, UInt32 thread_num) const
|
||||
{
|
||||
LOG_FATAL(log, "(version " << VERSION_STRING << VERSION_OFFICIAL << ") (from thread " << thread_num << ") " << message);
|
||||
}
|
||||
|
||||
void onFault(int sig, const siginfo_t & info, const ucontext_t & context, const StackTrace & stack_trace, ThreadNumber thread_num) const
|
||||
void onFault(
|
||||
int sig,
|
||||
const siginfo_t & info,
|
||||
const ucontext_t & context,
|
||||
const StackTrace & stack_trace,
|
||||
UInt32 thread_num,
|
||||
const std::string & query_id) const
|
||||
{
|
||||
LOG_FATAL(log, "########################################");
|
||||
LOG_FATAL(log, "(version " << VERSION_STRING << VERSION_OFFICIAL << ") (from thread " << thread_num << ") "
|
||||
<< "Received signal " << strsignal(sig) << " (" << sig << ")" << ".");
|
||||
|
||||
{
|
||||
std::stringstream message;
|
||||
message << "(version " << VERSION_STRING << VERSION_OFFICIAL << ")";
|
||||
message << " (from thread " << thread_num << ")";
|
||||
if (query_id.empty())
|
||||
message << " (no query)";
|
||||
else
|
||||
message << " (query_id: " << query_id << ")";
|
||||
message << " Received signal " << strsignal(sig) << " (" << sig << ")" << ".";
|
||||
|
||||
LOG_FATAL(log, message.rdbuf());
|
||||
}
|
||||
|
||||
LOG_FATAL(log, signalToErrorMessage(sig, info, context));
|
||||
|
||||
@ -265,7 +300,7 @@ static void terminate_handler()
|
||||
DB::WriteBufferFromFileDescriptor out(signal_pipe.fds_rw[1], buf_size, buf);
|
||||
|
||||
DB::writeBinary(static_cast<int>(SignalListener::StdTerminate), out);
|
||||
DB::writeBinary(getThreadNumber(), out);
|
||||
DB::writeBinary(UInt32(getThreadNumber()), out);
|
||||
DB::writeBinary(log_message, out);
|
||||
out.next();
|
||||
|
||||
@ -723,7 +758,7 @@ void BaseDaemon::initializeTerminationAndSignalProcessing()
|
||||
|
||||
/// SIGTSTP is added for debugging purposes. To output a stack trace of any running thread at anytime.
|
||||
|
||||
add_signal_handler({SIGABRT, SIGSEGV, SIGILL, SIGBUS, SIGSYS, SIGFPE, SIGPIPE, SIGTSTP}, faultSignalHandler);
|
||||
add_signal_handler({SIGABRT, SIGSEGV, SIGILL, SIGBUS, SIGSYS, SIGFPE, SIGPIPE, SIGTSTP}, signalHandler);
|
||||
add_signal_handler({SIGHUP, SIGUSR1}, closeLogsSignalHandler);
|
||||
add_signal_handler({SIGINT, SIGQUIT, SIGTERM}, terminateRequestedSignalHandler);
|
||||
|
||||
@ -731,6 +766,9 @@ void BaseDaemon::initializeTerminationAndSignalProcessing()
|
||||
static KillingErrorHandler killing_error_handler;
|
||||
Poco::ErrorHandler::set(&killing_error_handler);
|
||||
|
||||
signal_pipe.setNonBlocking();
|
||||
signal_pipe.tryIncreaseSize(1 << 20);
|
||||
|
||||
signal_listener.reset(new SignalListener(*this));
|
||||
signal_listener_thread.start(*signal_listener);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user