From e0000bef989a7fff327f22e8cf4e4443e0e45dff Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 22 Dec 2019 20:20:33 +0300 Subject: [PATCH 01/16] Added "system.stack_trace" table (development) --- dbms/src/Common/ErrorCodes.cpp | 1 + dbms/src/Common/PipeFDs.cpp | 110 +++++++++++++ dbms/src/Common/PipeFDs.h | 35 +++++ dbms/src/Common/QueryProfiler.cpp | 6 +- dbms/src/Common/ShellCommand.cpp | 8 +- dbms/src/Common/TaskStatsInfoGetter.h | 2 +- dbms/src/Common/TraceCollector.cpp | 37 +---- .../System/StorageSystemStackTrace.cpp | 145 ++++++++++++++++++ .../Storages/System/StorageSystemStackTrace.h | 30 ++++ .../Storages/System/attachSystemTables.cpp | 2 + libs/libcommon/CMakeLists.txt | 2 - libs/libcommon/include/common/Pipe.h | 34 ---- libs/libcommon/src/Pipe.cpp | 45 ------ libs/libdaemon/include/daemon/BaseDaemon.h | 8 + libs/libdaemon/src/BaseDaemon.cpp | 66 +++++--- 15 files changed, 389 insertions(+), 142 deletions(-) create mode 100644 dbms/src/Common/PipeFDs.cpp create mode 100644 dbms/src/Common/PipeFDs.h create mode 100644 dbms/src/Storages/System/StorageSystemStackTrace.cpp create mode 100644 dbms/src/Storages/System/StorageSystemStackTrace.h delete mode 100644 libs/libcommon/include/common/Pipe.h delete mode 100644 libs/libcommon/src/Pipe.cpp diff --git a/dbms/src/Common/ErrorCodes.cpp b/dbms/src/Common/ErrorCodes.cpp index 25a3bb7ba91..25d1b015a03 100644 --- a/dbms/src/Common/ErrorCodes.cpp +++ b/dbms/src/Common/ErrorCodes.cpp @@ -476,6 +476,7 @@ namespace ErrorCodes extern const int S3_ERROR = 499; extern const int CANNOT_CREATE_DICTIONARY_FROM_METADATA = 500; extern const int CANNOT_CREATE_DATABASE = 501; + extern const int CANNOT_SIGQUEUE = 502; extern const int KEEPER_EXCEPTION = 999; extern const int POCO_EXCEPTION = 1000; diff --git a/dbms/src/Common/PipeFDs.cpp b/dbms/src/Common/PipeFDs.cpp new file mode 100644 index 00000000000..463897f2c08 --- /dev/null +++ b/dbms/src/Common/PipeFDs.cpp @@ -0,0 +1,110 @@ +#include +#include +#include + +#include + +#include +#include +#include +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int CANNOT_PIPE; + extern const int CANNOT_FCNTL; + extern const int LOGICAL_ERROR; +} + +void LazyPipeFDs::open() +{ + for (int & fd : fds_rw) + if (fd >= 0) + throw Exception("Pipe is already opened", ErrorCodes::LOGICAL_ERROR); + +#ifndef __APPLE__ + if (0 != pipe2(fds_rw, O_CLOEXEC)) + throwFromErrno("Cannot create pipe", ErrorCodes::CANNOT_PIPE); +#else + if (0 != pipe(fds_rw)) + throwFromErrno("Cannot create pipe", ErrorCodes::CANNOT_PIPE); + if (0 != fcntl(fds_rw[0], F_SETFD, FD_CLOEXEC)) + throwFromErrno("Cannot setup auto-close on exec for read end of pipe", ErrorCodes::CANNOT_FCNTL); + if (0 != fcntl(fds_rw[1], F_SETFD, FD_CLOEXEC)) + throwFromErrno("Cannot setup auto-close on exec for write end of pipe", ErrorCodes::CANNOT_FCNTL); +#endif +} + +void LazyPipeFDs::close() +{ + for (int & fd : fds_rw) + { + if (fd < 0) + continue; + if (0 != ::close(fd)) + throwFromErrno("Cannot close pipe", ErrorCodes::CANNOT_PIPE); + fd = -1; + } +} + +PipeFDs::PipeFDs() +{ + open(); +} + +LazyPipeFDs::~LazyPipeFDs() +{ + try + { + close(); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } +} + + +void LazyPipeFDs::setNonBlocking() +{ + int flags = fcntl(fds_rw[1], F_GETFL, 0); + if (-1 == flags) + throwFromErrno("Cannot get file status flags of pipe", ErrorCodes::CANNOT_FCNTL); + if (-1 == fcntl(fds_rw[1], F_SETFL, flags | O_NONBLOCK)) + throwFromErrno("Cannot set non-blocking mode of pipe", ErrorCodes::CANNOT_FCNTL); +} + +void LazyPipeFDs::tryIncreaseSize(int desired_size) +{ +#if defined(OS_LINUX) + Poco::Logger * log = &Poco::Logger::get("Pipe"); + + /** Increase pipe size to avoid slowdown during fine-grained trace collection. + */ + int pipe_size = fcntl(fds_rw[1], F_GETPIPE_SZ); + if (-1 == pipe_size) + { + if (errno == EINVAL) + { + LOG_INFO(log, "Cannot get pipe capacity, " << errnoToString(ErrorCodes::CANNOT_FCNTL) << ". Very old Linux kernels have no support for this fcntl."); + /// It will work nevertheless. + } + else + throwFromErrno("Cannot get pipe capacity", ErrorCodes::CANNOT_FCNTL); + } + else + { + for (errno = 0; errno != EPERM && pipe_size < desired_size; pipe_size *= 2) + if (-1 == fcntl(fds_rw[1], F_SETPIPE_SZ, pipe_size * 2) && errno != EPERM) + throwFromErrno("Cannot increase pipe capacity to " + std::to_string(pipe_size * 2), ErrorCodes::CANNOT_FCNTL); + + LOG_TRACE(log, "Pipe capacity is " << formatReadableSizeWithBinarySuffix(std::min(pipe_size, desired_size))); + } +#endif +} + +} diff --git a/dbms/src/Common/PipeFDs.h b/dbms/src/Common/PipeFDs.h new file mode 100644 index 00000000000..fe76740da70 --- /dev/null +++ b/dbms/src/Common/PipeFDs.h @@ -0,0 +1,35 @@ +#pragma once + +#include + + +namespace DB +{ + +/** Struct containing a pipe with lazy initialization. + * Use `open` and `close` methods to manipulate pipe and `fds_rw` field to access + * pipe's file descriptors. + */ +struct LazyPipeFDs +{ + int fds_rw[2] = {-1, -1}; + + void open(); + void close(); + + void setNonBlocking(); + void tryIncreaseSize(int desired_size); + + ~LazyPipeFDs(); +}; + + +/** Struct which opens new pipe on creation and closes it on destruction. + * Use `fds_rw` field to access pipe's file descriptors. + */ +struct PipeFDs : public LazyPipeFDs +{ + PipeFDs(); +}; + +} diff --git a/dbms/src/Common/QueryProfiler.cpp b/dbms/src/Common/QueryProfiler.cpp index 34d6acc27b1..e142be2e4d9 100644 --- a/dbms/src/Common/QueryProfiler.cpp +++ b/dbms/src/Common/QueryProfiler.cpp @@ -1,12 +1,12 @@ #include "QueryProfiler.h" #include -#include #include #include -#include #include #include +#include +#include #include #include #include @@ -22,7 +22,7 @@ namespace ProfileEvents namespace DB { -extern LazyPipe trace_pipe; +extern LazyPipeFDs trace_pipe; namespace { diff --git a/dbms/src/Common/ShellCommand.cpp b/dbms/src/Common/ShellCommand.cpp index 8807d795a0d..9dbe3e2f074 100644 --- a/dbms/src/Common/ShellCommand.cpp +++ b/dbms/src/Common/ShellCommand.cpp @@ -4,11 +4,11 @@ #include #include #include +#include #include #include #include #include -#include namespace { @@ -66,9 +66,9 @@ std::unique_ptr ShellCommand::executeImpl(const char * filename, c if (!real_vfork) throwFromErrno("Cannot find symbol vfork in myself", ErrorCodes::CANNOT_DLSYM); - Pipe pipe_stdin; - Pipe pipe_stdout; - Pipe pipe_stderr; + PipeFDs pipe_stdin; + PipeFDs pipe_stdout; + PipeFDs pipe_stderr; pid_t pid = reinterpret_cast(real_vfork)(); diff --git a/dbms/src/Common/TaskStatsInfoGetter.h b/dbms/src/Common/TaskStatsInfoGetter.h index b3e35d65674..f3a581b2c78 100644 --- a/dbms/src/Common/TaskStatsInfoGetter.h +++ b/dbms/src/Common/TaskStatsInfoGetter.h @@ -24,7 +24,7 @@ public: /// Whether the current process has permissions (sudo or cap_net_admin capabilties) to get taskstats info static bool checkPermissions(); -#if defined(__linux__) +#if defined(OS_LINUX) private: int netlink_socket_fd = -1; UInt16 taskstats_family_id = 0; diff --git a/dbms/src/Common/TraceCollector.cpp b/dbms/src/Common/TraceCollector.cpp index bd06a200460..4b582c5ad4f 100644 --- a/dbms/src/Common/TraceCollector.cpp +++ b/dbms/src/Common/TraceCollector.cpp @@ -2,7 +2,7 @@ #include #include -#include +#include #include #include #include @@ -19,13 +19,12 @@ namespace DB { -LazyPipe trace_pipe; +LazyPipeFDs trace_pipe; namespace ErrorCodes { extern const int NULL_POINTER_DEREFERENCE; extern const int THREAD_IS_NOT_JOINABLE; - extern const int CANNOT_FCNTL; } TraceCollector::TraceCollector(std::shared_ptr & trace_log_) @@ -40,36 +39,8 @@ TraceCollector::TraceCollector(std::shared_ptr & trace_log_) /** Turn write end of pipe to non-blocking mode to avoid deadlocks * when QueryProfiler is invoked under locks and TraceCollector cannot pull data from pipe. */ - int flags = fcntl(trace_pipe.fds_rw[1], F_GETFL, 0); - if (-1 == flags) - throwFromErrno("Cannot get file status flags of pipe", ErrorCodes::CANNOT_FCNTL); - if (-1 == fcntl(trace_pipe.fds_rw[1], F_SETFL, flags | O_NONBLOCK)) - throwFromErrno("Cannot set non-blocking mode of pipe", ErrorCodes::CANNOT_FCNTL); - -#if defined(OS_LINUX) - /** Increase pipe size to avoid slowdown during fine-grained trace collection. - */ - int pipe_size = fcntl(trace_pipe.fds_rw[1], F_GETPIPE_SZ); - if (-1 == pipe_size) - { - if (errno == EINVAL) - { - LOG_INFO(log, "Cannot get pipe capacity, " << errnoToString(ErrorCodes::CANNOT_FCNTL) << ". Very old Linux kernels have no support for this fcntl."); - /// It will work nevertheless. - } - else - throwFromErrno("Cannot get pipe capacity", ErrorCodes::CANNOT_FCNTL); - } - else - { - constexpr int max_pipe_capacity_to_set = 1048576; - for (errno = 0; errno != EPERM && pipe_size < max_pipe_capacity_to_set; pipe_size *= 2) - if (-1 == fcntl(trace_pipe.fds_rw[1], F_SETPIPE_SZ, pipe_size * 2) && errno != EPERM) - throwFromErrno("Cannot increase pipe capacity to " + toString(pipe_size * 2), ErrorCodes::CANNOT_FCNTL); - - LOG_TRACE(log, "Pipe capacity is " << formatReadableSizeWithBinarySuffix(std::min(pipe_size, max_pipe_capacity_to_set))); - } -#endif + trace_pipe.setNonBlocking(); + trace_pipe.tryIncreaseSize(1 << 20); thread = ThreadFromGlobalPool(&TraceCollector::run, this); } diff --git a/dbms/src/Storages/System/StorageSystemStackTrace.cpp b/dbms/src/Storages/System/StorageSystemStackTrace.cpp new file mode 100644 index 00000000000..97149fa8712 --- /dev/null +++ b/dbms/src/Storages/System/StorageSystemStackTrace.cpp @@ -0,0 +1,145 @@ +#include + +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int CANNOT_SIGQUEUE; +} + + +NamesAndTypesList StorageSystemStackTrace::getNamesAndTypes() +{ + return + { + { "thread_number", std::make_shared() }, + { "query_id", std::make_shared() }, + { "trace", std::make_shared(std::make_shared()) } + }; +} + +namespace +{ + struct State + { + std::mutex mutex; + std::condition_variable condvar; + + size_t total_threads; + size_t threads_processed; + std::exception_ptr exception; + MutableColumns * columns_to_fill; + + State() { reset(); } + + void reset(MutableColumns * columns_to_fill_ = nullptr) + { + total_threads = 0; + threads_processed = 0; + exception = std::exception_ptr(); + columns_to_fill = columns_to_fill_; + } + + operator bool() + { + return columns_to_fill != nullptr; + } + }; + + State state; + + void callback(const siginfo_t &, const StackTrace & stack_trace, UInt32 thread_number) + { + std::lock_guard lock(state.mutex); + + std::cerr << thread_number << " !\n"; + + if (!state) + return; + + try + { + size_t stack_trace_size = stack_trace.getSize(); + size_t stack_trace_offset = stack_trace.getOffset(); + + Array arr; + arr.reserve(stack_trace_size - stack_trace_offset); + for (size_t i = stack_trace_offset; i < stack_trace_size; ++i) + arr.emplace_back(reinterpret_cast(stack_trace.getFrames()[i])); + + std::cerr << thread_number << " !!\n"; + + state.columns_to_fill->at(0)->insert(thread_number); + state.columns_to_fill->at(1)->insertDefault(); + state.columns_to_fill->at(2)->insert(arr); + + std::cerr << thread_number << " !!!\n"; + + ++state.threads_processed; + + std::cerr << state.threads_processed << ", " << state.total_threads << " !!!!\n"; + if (state.threads_processed >= state.total_threads) + state.condvar.notify_one(); + } + catch (...) + { + state.reset(); + state.exception = std::current_exception(); + state.condvar.notify_one(); + } + } +} + +void StorageSystemStackTrace::fillData(MutableColumns & res_columns, const Context &, const SelectQueryInfo &) const +{ + std::unique_lock lock(state.mutex); + + state.reset(&res_columns); + SCOPE_EXIT({ state.reset(); }); + + std::cerr << state.columns_to_fill->size() << "\n"; + + /// Send a signal to every thread + std::filesystem::directory_iterator end; + for (std::filesystem::directory_iterator it("/proc/self/task"); it != end; ++it) + { + sigval sig_value; + sig_value.sival_ptr = reinterpret_cast(&callback); + pid_t tid = parse(it->path().filename()); + if (0 == ::sigqueue(tid, SIGTSTP, sig_value)) + { + ++state.total_threads; + } + else + { + /// The thread may have been already finished. + if (ESRCH != errno) + throwFromErrno("Cannot send signal with sigqueue", ErrorCodes::CANNOT_SIGQUEUE); + } + } + + std::cerr << state.threads_processed << ", " << state.total_threads << " sent\n"; + + /// Timeout one second for the case the signal pipe will be full and messages will be dropped. + state.condvar.wait_for(lock, std::chrono::seconds(1), []{ return state.threads_processed >= state.total_threads || state.exception; }); + if (state.exception) + std::rethrow_exception(state.exception); +} + +} + diff --git a/dbms/src/Storages/System/StorageSystemStackTrace.h b/dbms/src/Storages/System/StorageSystemStackTrace.h new file mode 100644 index 00000000000..a402f56b420 --- /dev/null +++ b/dbms/src/Storages/System/StorageSystemStackTrace.h @@ -0,0 +1,30 @@ +#pragma once + +#include +#include + + +namespace DB +{ + +class Context; + + +/// Allows to introspect stack trace of all server threads. +/// It acts like an embedded debugger. +class StorageSystemStackTrace : public ext::shared_ptr_helper, public IStorageSystemOneBlock +{ + friend struct ext::shared_ptr_helper; +public: + String getName() const override { return "SystemStackTrace"; } + + static NamesAndTypesList getNamesAndTypes(); + +protected: + using IStorageSystemOneBlock::IStorageSystemOneBlock; + + void fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const override; +}; + +} + diff --git a/dbms/src/Storages/System/attachSystemTables.cpp b/dbms/src/Storages/System/attachSystemTables.cpp index 2b8e630cbed..cd224353acb 100644 --- a/dbms/src/Storages/System/attachSystemTables.cpp +++ b/dbms/src/Storages/System/attachSystemTables.cpp @@ -38,6 +38,7 @@ #include #include #include +#include namespace DB @@ -65,6 +66,7 @@ void attachSystemTablesLocal(IDatabase & system_database) system_database.attachTable("collations", StorageSystemCollations::create("collations")); system_database.attachTable("table_engines", StorageSystemTableEngines::create("table_engines")); system_database.attachTable("contributors", StorageSystemContributors::create("contributors")); + system_database.attachTable("stack_trace", StorageSystemStackTrace::create("stack_trace")); } void attachSystemTablesServer(IDatabase & system_database, bool has_zookeeper) diff --git a/libs/libcommon/CMakeLists.txt b/libs/libcommon/CMakeLists.txt index 3e58cba0164..f9d8ea696f3 100644 --- a/libs/libcommon/CMakeLists.txt +++ b/libs/libcommon/CMakeLists.txt @@ -23,7 +23,6 @@ add_library (common src/getThreadNumber.cpp src/sleep.cpp src/argsToConfig.cpp - src/Pipe.cpp src/phdr_cache.cpp include/common/SimpleCache.h @@ -46,7 +45,6 @@ add_library (common include/common/setTerminalEcho.h include/common/find_symbols.h include/common/constexpr_helpers.h - include/common/Pipe.h include/common/getThreadNumber.h include/common/sleep.h include/common/SimpleCache.h diff --git a/libs/libcommon/include/common/Pipe.h b/libs/libcommon/include/common/Pipe.h deleted file mode 100644 index 0137c3d97af..00000000000 --- a/libs/libcommon/include/common/Pipe.h +++ /dev/null @@ -1,34 +0,0 @@ -#pragma once - -#include -#include -#include - -/** - * Struct containing a pipe with lazy initialization. - * Use `open` and `close` methods to manipulate pipe and `fds_rw` field to access - * pipe's file descriptors. - */ -struct LazyPipe -{ - int fds_rw[2] = {-1, -1}; - - LazyPipe() = default; - - void open(); - - void close(); - - virtual ~LazyPipe() = default; -}; - -/** - * Struct which opens new pipe on creation and closes it on destruction. - * Use `fds_rw` field to access pipe's file descriptors. - */ -struct Pipe : public LazyPipe -{ - Pipe(); - - ~Pipe(); -}; diff --git a/libs/libcommon/src/Pipe.cpp b/libs/libcommon/src/Pipe.cpp deleted file mode 100644 index 83268b76ea6..00000000000 --- a/libs/libcommon/src/Pipe.cpp +++ /dev/null @@ -1,45 +0,0 @@ -#include "common/Pipe.h" - -void LazyPipe::open() -{ - for (int & fd : fds_rw) - { - if (fd >= 0) - { - throw std::logic_error("Pipe is already opened"); - } - } - -#ifndef __APPLE__ - if (0 != pipe2(fds_rw, O_CLOEXEC)) - throw std::runtime_error("Cannot create pipe"); -#else - if (0 != pipe(fds_rw)) - throw std::runtime_error("Cannot create pipe"); - if (0 != fcntl(fds_rw[0], F_SETFD, FD_CLOEXEC)) - throw std::runtime_error("Cannot setup auto-close on exec for read end of pipe"); - if (0 != fcntl(fds_rw[1], F_SETFD, FD_CLOEXEC)) - throw std::runtime_error("Cannot setup auto-close on exec for write end of pipe"); -#endif -} - -void LazyPipe::close() -{ - for (int fd : fds_rw) - { - if (fd >= 0) - { - ::close(fd); - } - } -} - -Pipe::Pipe() -{ - open(); -} - -Pipe::~Pipe() -{ - close(); -} diff --git a/libs/libdaemon/include/daemon/BaseDaemon.h b/libs/libdaemon/include/daemon/BaseDaemon.h index 9d323492c1f..9457d9cdbe0 100644 --- a/libs/libdaemon/include/daemon/BaseDaemon.h +++ b/libs/libdaemon/include/daemon/BaseDaemon.h @@ -19,10 +19,12 @@ #include #include #include +#include #include #include #include + namespace Poco { class TaskManager; } @@ -234,3 +236,9 @@ std::optional> BaseDaemon::tryGetInstance() else return {}; } + + +/// If you send TSTP signal with value (sigqueue) to a thread, it will make a callback +/// from a separate thread and you can call non signal-safe function from there. +using SignalCallback = void(const siginfo_t &, const StackTrace &, UInt32); + diff --git a/libs/libdaemon/src/BaseDaemon.cpp b/libs/libdaemon/src/BaseDaemon.cpp index 15b61c9b454..a3d56511508 100644 --- a/libs/libdaemon/src/BaseDaemon.cpp +++ b/libs/libdaemon/src/BaseDaemon.cpp @@ -1,5 +1,5 @@ #include -#include + #include #include #include @@ -12,19 +12,15 @@ #include #include -#include -#include -#include -#include #include #include #include #include #include #include + #include #include -#include #include #include #include @@ -36,16 +32,23 @@ #include #include #include -#include + +#include +#include +#include + #include #include #include #include #include +#include +#include +#include #include #include +#include #include -#include #ifdef __APPLE__ // ucontext is not available without _XOPEN_SOURCE @@ -54,7 +57,7 @@ #include -Pipe signal_pipe; +DB::PipeFDs signal_pipe; /** Reset signal handler to the default and send signal to itself. @@ -67,8 +70,13 @@ static void call_default_signal_handler(int sig) } -using ThreadNumber = decltype(getThreadNumber()); -static const size_t buf_size = sizeof(int) + sizeof(siginfo_t) + sizeof(ucontext_t) + sizeof(StackTrace) + sizeof(ThreadNumber); +/// Normally query_id is a UUID (string with a fixed length) but user can provide custom query_id. +/// Thus upper bound on query_id length should be introduced to avoid buffer overflow in signal handler. +constexpr size_t QUERY_ID_MAX_LEN = 1024; + +static const size_t buf_size = sizeof(int) + sizeof(siginfo_t) + sizeof(ucontext_t) + sizeof(StackTrace) + sizeof(UInt32) + + QUERY_ID_MAX_LEN + 2 /* varint encoding query_id length */; + using signal_function = void(int, siginfo_t*, void*); @@ -92,11 +100,12 @@ static void terminateRequestedSignalHandler(int sig, siginfo_t * info, void * co } -/** Handler for "fault" signals. Send data about fault to separate thread to write into log. +/** Handler for "fault" or diagnostic signals. Send data about fault to separate thread to write into log. */ -static void faultSignalHandler(int sig, siginfo_t * info, void * context) +static void signalHandler(int sig, siginfo_t * info, void * context) { char buf[buf_size]; + std::cerr << "Size of buffer: " << buf_size << "\n"; DB::WriteBufferFromFileDescriptorDiscardOnFailure out(signal_pipe.fds_rw[1], buf_size, buf); const ucontext_t signal_context = *reinterpret_cast(context); @@ -106,7 +115,7 @@ static void faultSignalHandler(int sig, siginfo_t * info, void * context) DB::writePODBinary(*info, out); DB::writePODBinary(signal_context, out); DB::writePODBinary(stack_trace, out); - DB::writeBinary(getThreadNumber(), out); + DB::writeBinary(UInt32(getThreadNumber()), out); out.next(); @@ -162,7 +171,7 @@ public: } else if (sig == Signals::StdTerminate) { - ThreadNumber thread_num; + UInt32 thread_num; std::string message; DB::readBinary(thread_num, in); @@ -181,13 +190,27 @@ public: siginfo_t info; ucontext_t context; StackTrace stack_trace(NoCapture{}); - ThreadNumber thread_num; + UInt32 thread_num; DB::readPODBinary(info, in); DB::readPODBinary(context, in); DB::readPODBinary(stack_trace, in); DB::readBinary(thread_num, in); + if (sig == SIGTSTP && info.si_value.sival_ptr) + { + /// TSTP signal with value is used to make a custom callback from this thread. + try + { + reinterpret_cast(info.si_value.sival_ptr)(info, stack_trace, thread_num); + continue; + } + catch (...) + { + /// Failed to process, will use 'onFault' function. + } + } + /// This allows to receive more signals if failure happens inside onFault function. /// Example: segfault while symbolizing stack trace. std::thread([=] { onFault(sig, info, context, stack_trace, thread_num); }).detach(); @@ -200,12 +223,12 @@ private: BaseDaemon & daemon; private: - void onTerminate(const std::string & message, ThreadNumber thread_num) const + void onTerminate(const std::string & message, UInt32 thread_num) const { LOG_FATAL(log, "(version " << VERSION_STRING << VERSION_OFFICIAL << ") (from thread " << thread_num << ") " << message); } - void onFault(int sig, const siginfo_t & info, const ucontext_t & context, const StackTrace & stack_trace, ThreadNumber thread_num) const + void onFault(int sig, const siginfo_t & info, const ucontext_t & context, const StackTrace & stack_trace, UInt32 thread_num) const { LOG_FATAL(log, "########################################"); LOG_FATAL(log, "(version " << VERSION_STRING << VERSION_OFFICIAL << ") (from thread " << thread_num << ") " @@ -264,7 +287,7 @@ static void terminate_handler() DB::WriteBufferFromFileDescriptor out(signal_pipe.fds_rw[1], buf_size, buf); DB::writeBinary(static_cast(SignalListener::StdTerminate), out); - DB::writeBinary(getThreadNumber(), out); + DB::writeBinary(UInt32(getThreadNumber()), out); DB::writeBinary(log_message, out); out.next(); @@ -721,7 +744,7 @@ void BaseDaemon::initializeTerminationAndSignalProcessing() /// SIGTSTP is added for debugging purposes. To output a stack trace of any running thread at anytime. - add_signal_handler({SIGABRT, SIGSEGV, SIGILL, SIGBUS, SIGSYS, SIGFPE, SIGPIPE, SIGTSTP}, faultSignalHandler); + add_signal_handler({SIGABRT, SIGSEGV, SIGILL, SIGBUS, SIGSYS, SIGFPE, SIGPIPE, SIGTSTP}, signalHandler); add_signal_handler({SIGHUP, SIGUSR1}, closeLogsSignalHandler); add_signal_handler({SIGINT, SIGQUIT, SIGTERM}, terminateRequestedSignalHandler); @@ -729,6 +752,9 @@ void BaseDaemon::initializeTerminationAndSignalProcessing() static KillingErrorHandler killing_error_handler; Poco::ErrorHandler::set(&killing_error_handler); + signal_pipe.setNonBlocking(); + signal_pipe.tryIncreaseSize(1 << 20); + signal_listener.reset(new SignalListener(*this)); signal_listener_thread.start(*signal_listener); } From 9c868c910a5e7c46178e247393787ff96dc48b3f Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 22 Dec 2019 23:17:16 +0300 Subject: [PATCH 02/16] Simplification --- .../System/StorageSystemStackTrace.cpp | 225 +++++++++++------- .../Storages/System/StorageSystemStackTrace.h | 7 +- libs/libdaemon/src/BaseDaemon.cpp | 15 -- 3 files changed, 141 insertions(+), 106 deletions(-) diff --git a/dbms/src/Storages/System/StorageSystemStackTrace.cpp b/dbms/src/Storages/System/StorageSystemStackTrace.cpp index 97149fa8712..7f538a7c6de 100644 --- a/dbms/src/Storages/System/StorageSystemStackTrace.cpp +++ b/dbms/src/Storages/System/StorageSystemStackTrace.cpp @@ -1,7 +1,7 @@ #include +#include #include -#include #include #include @@ -10,8 +10,9 @@ #include #include #include -#include #include +#include +#include namespace DB @@ -20,6 +21,100 @@ namespace DB namespace ErrorCodes { extern const int CANNOT_SIGQUEUE; + extern const int CANNOT_MANIPULATE_SIGSET; + extern const int CANNOT_SET_SIGNAL_HANDLER; + extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR; + extern const int LOGICAL_ERROR; +} + + +namespace +{ + const pid_t expected_pid = getpid(); + const int sig = SIGRTMIN; + UInt32 thread_number{0}; + std::optional stack_trace; + LazyPipeFDs notification_pipe; + + void signalHandler(int, siginfo_t * info, void * context) + { + /// In case malicious user is sending signals manually (for unknown reason). + /// If we don't check - it may break our synchronization. + if (info->si_pid != expected_pid) + return; + + /// All these methods are signal-safe. + const ucontext_t signal_context = *reinterpret_cast(context); + stack_trace.emplace(signal_context); + thread_number = getThreadNumber(); + + char buf = 0; + /// We cannot do anything if write failed. + (void)::write(notification_pipe.fds_rw[1], &buf, 1); + } + + /// Wait for data in pipe. + bool wait(int timeout_ms) + { + while (true) + { + int fd = notification_pipe.fds_rw[0]; + pollfd poll_fd{fd, POLLIN, 0}; + + int poll_res = poll(&poll_fd, 1, timeout_ms); + if (poll_res < 0) + { + if (errno == EINTR) + { + --timeout_ms; /// Quite a hacky way to update timeout. Just to make sure we avoid infinite waiting. + if (timeout_ms == 0) + return false; + continue; + } + + throwFromErrno("Cannot poll pipe", ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR); + } + if (poll_res == 0) + return false; + + char buf = 0; + ssize_t read_res = ::read(fd, &buf, 1); + if (read_res == 1) + return true; + + if (read_res < 0) + { + if (errno == EINTR) + continue; + + throwFromErrno("Cannot read from pipe", ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR); + } + + throw Exception("Logical error: read for one byte returned more than one byte", ErrorCodes::LOGICAL_ERROR); + } + } +} + + +StorageSystemStackTrace::StorageSystemStackTrace(const String & name) + : IStorageSystemOneBlock(name) +{ + notification_pipe.open(); + + /// Setup signal handler. + + struct sigaction sa{}; + sa.sa_sigaction = signalHandler; + sa.sa_flags = SA_SIGINFO; + + if (sigemptyset(&sa.sa_mask)) + throwFromErrno("Cannot set signal handler.", ErrorCodes::CANNOT_MANIPULATE_SIGSET); + + if (sigaddset(&sa.sa_mask, sig)) + throwFromErrno("Cannot set signal handler.", ErrorCodes::CANNOT_MANIPULATE_SIGSET); + + if (sigaction(sig, &sa, nullptr)) + throwFromErrno("Cannot set signal handler.", ErrorCodes::CANNOT_SET_SIGNAL_HANDLER); } @@ -33,112 +128,64 @@ NamesAndTypesList StorageSystemStackTrace::getNamesAndTypes() }; } -namespace + +void StorageSystemStackTrace::fillData(MutableColumns & res_columns, const Context &, const SelectQueryInfo &) const { - struct State + /// It shouldn't be possible to do concurrent reads from this table. + std::lock_guard lock(mutex); + + /// Send a signal to every thread and wait for result. + /// We must wait for every thread one by one sequentially, + /// because there is a limit on number of queued signals in OS and otherwise signals may get lost. + /// Also, non-RT signals are not delivered if previous signal is handled right now (by default; but we use RT signals). + + /// Obviously, results for different threads may be out of sync. + + /// There is no better way to enumerate threads in a process other than looking into procfs. + + std::filesystem::directory_iterator end; + for (std::filesystem::directory_iterator it("/proc/self/task"); it != end; ++it) { - std::mutex mutex; - std::condition_variable condvar; + sigval sig_value{}; + pid_t tid = parse(it->path().filename()); - size_t total_threads; - size_t threads_processed; - std::exception_ptr exception; - MutableColumns * columns_to_fill; + std::cerr << "Requested: " << tid << "\n"; - State() { reset(); } - - void reset(MutableColumns * columns_to_fill_ = nullptr) + if (0 != ::sigqueue(tid, sig, sig_value)) { - total_threads = 0; - threads_processed = 0; - exception = std::exception_ptr(); - columns_to_fill = columns_to_fill_; + /// The thread may has been already finished. + if (ESRCH == errno) + continue; + + throwFromErrno("Cannot send signal with sigqueue", ErrorCodes::CANNOT_SIGQUEUE); } - operator bool() + /// Just in case we will wait for pipe with timeout. In case signal didn't get processed. + if (wait(100)) { - return columns_to_fill != nullptr; - } - }; - - State state; - - void callback(const siginfo_t &, const StackTrace & stack_trace, UInt32 thread_number) - { - std::lock_guard lock(state.mutex); - - std::cerr << thread_number << " !\n"; - - if (!state) - return; - - try - { - size_t stack_trace_size = stack_trace.getSize(); - size_t stack_trace_offset = stack_trace.getOffset(); + size_t stack_trace_size = stack_trace->getSize(); + size_t stack_trace_offset = stack_trace->getOffset(); Array arr; arr.reserve(stack_trace_size - stack_trace_offset); for (size_t i = stack_trace_offset; i < stack_trace_size; ++i) - arr.emplace_back(reinterpret_cast(stack_trace.getFrames()[i])); + arr.emplace_back(reinterpret_cast(stack_trace->getFrames()[i])); - std::cerr << thread_number << " !!\n"; + std::cerr << tid << ", " << thread_number << " !!\n"; - state.columns_to_fill->at(0)->insert(thread_number); - state.columns_to_fill->at(1)->insertDefault(); - state.columns_to_fill->at(2)->insert(arr); - - std::cerr << thread_number << " !!!\n"; - - ++state.threads_processed; - - std::cerr << state.threads_processed << ", " << state.total_threads << " !!!!\n"; - if (state.threads_processed >= state.total_threads) - state.condvar.notify_one(); - } - catch (...) - { - state.reset(); - state.exception = std::current_exception(); - state.condvar.notify_one(); - } - } -} - -void StorageSystemStackTrace::fillData(MutableColumns & res_columns, const Context &, const SelectQueryInfo &) const -{ - std::unique_lock lock(state.mutex); - - state.reset(&res_columns); - SCOPE_EXIT({ state.reset(); }); - - std::cerr << state.columns_to_fill->size() << "\n"; - - /// Send a signal to every thread - std::filesystem::directory_iterator end; - for (std::filesystem::directory_iterator it("/proc/self/task"); it != end; ++it) - { - sigval sig_value; - sig_value.sival_ptr = reinterpret_cast(&callback); - pid_t tid = parse(it->path().filename()); - if (0 == ::sigqueue(tid, SIGTSTP, sig_value)) - { - ++state.total_threads; + res_columns[0]->insert(thread_number); + res_columns[1]->insertDefault(); + res_columns[2]->insert(arr); } else { - /// The thread may have been already finished. - if (ESRCH != errno) - throwFromErrno("Cannot send signal with sigqueue", ErrorCodes::CANNOT_SIGQUEUE); + /// Cannot obtain a stack trace. But create a record in result nevertheless. + + res_columns[0]->insert(tid); + res_columns[1]->insertDefault(); + res_columns[2]->insertDefault(); } } - - std::cerr << state.threads_processed << ", " << state.total_threads << " sent\n"; - - /// Timeout one second for the case the signal pipe will be full and messages will be dropped. - state.condvar.wait_for(lock, std::chrono::seconds(1), []{ return state.threads_processed >= state.total_threads || state.exception; }); - if (state.exception) - std::rethrow_exception(state.exception); } } diff --git a/dbms/src/Storages/System/StorageSystemStackTrace.h b/dbms/src/Storages/System/StorageSystemStackTrace.h index a402f56b420..161e7f2e2fb 100644 --- a/dbms/src/Storages/System/StorageSystemStackTrace.h +++ b/dbms/src/Storages/System/StorageSystemStackTrace.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include @@ -17,13 +18,15 @@ class StorageSystemStackTrace : public ext::shared_ptr_helper; public: String getName() const override { return "SystemStackTrace"; } - static NamesAndTypesList getNamesAndTypes(); + StorageSystemStackTrace(const String & name); + protected: using IStorageSystemOneBlock::IStorageSystemOneBlock; - void fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const override; + + mutable std::mutex mutex; }; } diff --git a/libs/libdaemon/src/BaseDaemon.cpp b/libs/libdaemon/src/BaseDaemon.cpp index a3d56511508..c09139257a9 100644 --- a/libs/libdaemon/src/BaseDaemon.cpp +++ b/libs/libdaemon/src/BaseDaemon.cpp @@ -105,7 +105,6 @@ static void terminateRequestedSignalHandler(int sig, siginfo_t * info, void * co static void signalHandler(int sig, siginfo_t * info, void * context) { char buf[buf_size]; - std::cerr << "Size of buffer: " << buf_size << "\n"; DB::WriteBufferFromFileDescriptorDiscardOnFailure out(signal_pipe.fds_rw[1], buf_size, buf); const ucontext_t signal_context = *reinterpret_cast(context); @@ -197,20 +196,6 @@ public: DB::readPODBinary(stack_trace, in); DB::readBinary(thread_num, in); - if (sig == SIGTSTP && info.si_value.sival_ptr) - { - /// TSTP signal with value is used to make a custom callback from this thread. - try - { - reinterpret_cast(info.si_value.sival_ptr)(info, stack_trace, thread_num); - continue; - } - catch (...) - { - /// Failed to process, will use 'onFault' function. - } - } - /// This allows to receive more signals if failure happens inside onFault function. /// Example: segfault while symbolizing stack trace. std::thread([=] { onFault(sig, info, context, stack_trace, thread_num); }).detach(); From 48d126e88a577881674e55d30acc7487c79bb73d Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 22 Dec 2019 23:19:29 +0300 Subject: [PATCH 03/16] Removed unrelated changes --- libs/libdaemon/include/daemon/BaseDaemon.h | 6 ------ libs/libdaemon/src/BaseDaemon.cpp | 7 +------ 2 files changed, 1 insertion(+), 12 deletions(-) diff --git a/libs/libdaemon/include/daemon/BaseDaemon.h b/libs/libdaemon/include/daemon/BaseDaemon.h index 9457d9cdbe0..462cbb95418 100644 --- a/libs/libdaemon/include/daemon/BaseDaemon.h +++ b/libs/libdaemon/include/daemon/BaseDaemon.h @@ -236,9 +236,3 @@ std::optional> BaseDaemon::tryGetInstance() else return {}; } - - -/// If you send TSTP signal with value (sigqueue) to a thread, it will make a callback -/// from a separate thread and you can call non signal-safe function from there. -using SignalCallback = void(const siginfo_t &, const StackTrace &, UInt32); - diff --git a/libs/libdaemon/src/BaseDaemon.cpp b/libs/libdaemon/src/BaseDaemon.cpp index c09139257a9..233a11707a9 100644 --- a/libs/libdaemon/src/BaseDaemon.cpp +++ b/libs/libdaemon/src/BaseDaemon.cpp @@ -70,12 +70,7 @@ static void call_default_signal_handler(int sig) } -/// Normally query_id is a UUID (string with a fixed length) but user can provide custom query_id. -/// Thus upper bound on query_id length should be introduced to avoid buffer overflow in signal handler. -constexpr size_t QUERY_ID_MAX_LEN = 1024; - -static const size_t buf_size = sizeof(int) + sizeof(siginfo_t) + sizeof(ucontext_t) + sizeof(StackTrace) + sizeof(UInt32) - + QUERY_ID_MAX_LEN + 2 /* varint encoding query_id length */; +static const size_t buf_size = sizeof(int) + sizeof(siginfo_t) + sizeof(ucontext_t) + sizeof(StackTrace) + sizeof(UInt32); using signal_function = void(int, siginfo_t*, void*); From ca25e2f30fe723f9bd68687a54659c2cdba3a0d2 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 22 Dec 2019 23:37:29 +0300 Subject: [PATCH 04/16] Added query_id to "system.stack_trace" --- .../System/StorageSystemStackTrace.cpp | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/dbms/src/Storages/System/StorageSystemStackTrace.cpp b/dbms/src/Storages/System/StorageSystemStackTrace.cpp index 7f538a7c6de..8a932a3eb7c 100644 --- a/dbms/src/Storages/System/StorageSystemStackTrace.cpp +++ b/dbms/src/Storages/System/StorageSystemStackTrace.cpp @@ -32,8 +32,14 @@ namespace { const pid_t expected_pid = getpid(); const int sig = SIGRTMIN; + UInt32 thread_number{0}; std::optional stack_trace; + + static constexpr size_t max_query_id_size = 128; + char query_id_data[max_query_id_size]; + size_t query_id_size = 0; + LazyPipeFDs notification_pipe; void signalHandler(int, siginfo_t * info, void * context) @@ -48,12 +54,16 @@ namespace stack_trace.emplace(signal_context); thread_number = getThreadNumber(); + StringRef query_id = CurrentThread::getQueryId(); + query_id_size = std::min(query_id.size, max_query_id_size); + memcpy(query_id_data, query_id.data, query_id_size); + char buf = 0; /// We cannot do anything if write failed. (void)::write(notification_pipe.fds_rw[1], &buf, 1); } - /// Wait for data in pipe. + /// Wait for data in pipe and read it. bool wait(int timeout_ms) { while (true) @@ -149,8 +159,6 @@ void StorageSystemStackTrace::fillData(MutableColumns & res_columns, const Conte sigval sig_value{}; pid_t tid = parse(it->path().filename()); - std::cerr << "Requested: " << tid << "\n"; - if (0 != ::sigqueue(tid, sig, sig_value)) { /// The thread may has been already finished. @@ -161,6 +169,7 @@ void StorageSystemStackTrace::fillData(MutableColumns & res_columns, const Conte } /// Just in case we will wait for pipe with timeout. In case signal didn't get processed. + if (wait(100)) { size_t stack_trace_size = stack_trace->getSize(); @@ -171,17 +180,15 @@ void StorageSystemStackTrace::fillData(MutableColumns & res_columns, const Conte for (size_t i = stack_trace_offset; i < stack_trace_size; ++i) arr.emplace_back(reinterpret_cast(stack_trace->getFrames()[i])); - std::cerr << tid << ", " << thread_number << " !!\n"; - res_columns[0]->insert(thread_number); - res_columns[1]->insertDefault(); + res_columns[1]->insertData(query_id_data, query_id_size); res_columns[2]->insert(arr); } else { /// Cannot obtain a stack trace. But create a record in result nevertheless. - res_columns[0]->insert(tid); + res_columns[0]->insert(tid); /// TODO Replace all thread numbers to OS thread numbers. res_columns[1]->insertDefault(); res_columns[2]->insertDefault(); } From 1e83ebe5214df6fe0ddd301c0ff6ada71645caf9 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 22 Dec 2019 23:50:00 +0300 Subject: [PATCH 05/16] Added test --- .../queries/0_stateless/01051_system_stack_trace.reference | 1 + dbms/tests/queries/0_stateless/01051_system_stack_trace.sql | 2 ++ 2 files changed, 3 insertions(+) create mode 100644 dbms/tests/queries/0_stateless/01051_system_stack_trace.reference create mode 100644 dbms/tests/queries/0_stateless/01051_system_stack_trace.sql diff --git a/dbms/tests/queries/0_stateless/01051_system_stack_trace.reference b/dbms/tests/queries/0_stateless/01051_system_stack_trace.reference new file mode 100644 index 00000000000..d00491fd7e5 --- /dev/null +++ b/dbms/tests/queries/0_stateless/01051_system_stack_trace.reference @@ -0,0 +1 @@ +1 diff --git a/dbms/tests/queries/0_stateless/01051_system_stack_trace.sql b/dbms/tests/queries/0_stateless/01051_system_stack_trace.sql new file mode 100644 index 00000000000..32d344fce7e --- /dev/null +++ b/dbms/tests/queries/0_stateless/01051_system_stack_trace.sql @@ -0,0 +1,2 @@ +-- at least this query should be present +SELECT count() > 0 FROM system.stack_trace WHERE query_id != ''; From c78443d9393781b7ca00e04673b16fa9522a42f4 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 22 Dec 2019 23:50:33 +0300 Subject: [PATCH 06/16] Avoid using query_id from old threads --- dbms/src/Interpreters/ThreadStatusExt.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/dbms/src/Interpreters/ThreadStatusExt.cpp b/dbms/src/Interpreters/ThreadStatusExt.cpp index 8c578422d6e..0bf1ac36d3d 100644 --- a/dbms/src/Interpreters/ThreadStatusExt.cpp +++ b/dbms/src/Interpreters/ThreadStatusExt.cpp @@ -197,6 +197,7 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits) /// Must reset pointer to thread_group's memory_tracker, because it will be destroyed two lines below. memory_tracker.setParent(nullptr); + query_id.clear(); query_context = nullptr; thread_group.reset(); From 4d349e9ed0f24ec8335501cbce0c1d180f59aa19 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 23 Dec 2019 00:01:12 +0300 Subject: [PATCH 07/16] Added TODO --- dbms/src/Storages/System/StorageSystemStackTrace.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/dbms/src/Storages/System/StorageSystemStackTrace.cpp b/dbms/src/Storages/System/StorageSystemStackTrace.cpp index 8a932a3eb7c..284c4241f48 100644 --- a/dbms/src/Storages/System/StorageSystemStackTrace.cpp +++ b/dbms/src/Storages/System/StorageSystemStackTrace.cpp @@ -169,6 +169,7 @@ void StorageSystemStackTrace::fillData(MutableColumns & res_columns, const Conte } /// Just in case we will wait for pipe with timeout. In case signal didn't get processed. + /// TODO How to deal with stale values in a pipe? TSan will also argue. if (wait(100)) { From 56870c6b3b03bd5b926f06f07862f8a17c13ff3d Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 23 Dec 2019 19:49:06 +0300 Subject: [PATCH 08/16] Fixed -Wshadow --- dbms/src/Storages/System/StorageSystemStackTrace.cpp | 4 ++-- dbms/src/Storages/System/StorageSystemStackTrace.h | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dbms/src/Storages/System/StorageSystemStackTrace.cpp b/dbms/src/Storages/System/StorageSystemStackTrace.cpp index 284c4241f48..b9c299cffaa 100644 --- a/dbms/src/Storages/System/StorageSystemStackTrace.cpp +++ b/dbms/src/Storages/System/StorageSystemStackTrace.cpp @@ -106,8 +106,8 @@ namespace } -StorageSystemStackTrace::StorageSystemStackTrace(const String & name) - : IStorageSystemOneBlock(name) +StorageSystemStackTrace::StorageSystemStackTrace(const String & name_) + : IStorageSystemOneBlock(name_) { notification_pipe.open(); diff --git a/dbms/src/Storages/System/StorageSystemStackTrace.h b/dbms/src/Storages/System/StorageSystemStackTrace.h index 161e7f2e2fb..249ceebfd8c 100644 --- a/dbms/src/Storages/System/StorageSystemStackTrace.h +++ b/dbms/src/Storages/System/StorageSystemStackTrace.h @@ -20,7 +20,7 @@ public: String getName() const override { return "SystemStackTrace"; } static NamesAndTypesList getNamesAndTypes(); - StorageSystemStackTrace(const String & name); + StorageSystemStackTrace(const String & name_); protected: using IStorageSystemOneBlock::IStorageSystemOneBlock; From f04a2a5f7bb17f16dd8a634a5ffa042395945765 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 23 Dec 2019 19:54:51 +0300 Subject: [PATCH 09/16] Fixed Darwin build --- dbms/src/Common/PipeFDs.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dbms/src/Common/PipeFDs.cpp b/dbms/src/Common/PipeFDs.cpp index 463897f2c08..17eeb9aaef7 100644 --- a/dbms/src/Common/PipeFDs.cpp +++ b/dbms/src/Common/PipeFDs.cpp @@ -104,6 +104,8 @@ void LazyPipeFDs::tryIncreaseSize(int desired_size) LOG_TRACE(log, "Pipe capacity is " << formatReadableSizeWithBinarySuffix(std::min(pipe_size, desired_size))); } +#else + (void)desired_size; #endif } From 9da4b63fe953392fb004b73b93e54b58e2f6fe03 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 23 Dec 2019 21:56:57 +0300 Subject: [PATCH 10/16] Fixed Darwin build --- dbms/src/Storages/System/StorageSystemStackTrace.cpp | 3 +++ dbms/src/Storages/System/StorageSystemStackTrace.h | 3 +++ dbms/src/Storages/System/attachSystemTables.cpp | 5 +++++ 3 files changed, 11 insertions(+) diff --git a/dbms/src/Storages/System/StorageSystemStackTrace.cpp b/dbms/src/Storages/System/StorageSystemStackTrace.cpp index b9c299cffaa..24426eabca7 100644 --- a/dbms/src/Storages/System/StorageSystemStackTrace.cpp +++ b/dbms/src/Storages/System/StorageSystemStackTrace.cpp @@ -1,3 +1,5 @@ +#ifdef OS_LINUX /// Because of 'sigqueue' functions and RT signals. + #include #include @@ -198,3 +200,4 @@ void StorageSystemStackTrace::fillData(MutableColumns & res_columns, const Conte } +#endif diff --git a/dbms/src/Storages/System/StorageSystemStackTrace.h b/dbms/src/Storages/System/StorageSystemStackTrace.h index 249ceebfd8c..79185ca805a 100644 --- a/dbms/src/Storages/System/StorageSystemStackTrace.h +++ b/dbms/src/Storages/System/StorageSystemStackTrace.h @@ -1,5 +1,7 @@ #pragma once +#ifdef OS_LINUX /// Because of 'sigqueue' functions and RT signals. + #include #include #include @@ -31,3 +33,4 @@ protected: } +#endif diff --git a/dbms/src/Storages/System/attachSystemTables.cpp b/dbms/src/Storages/System/attachSystemTables.cpp index cd224353acb..fc3a36d7c2c 100644 --- a/dbms/src/Storages/System/attachSystemTables.cpp +++ b/dbms/src/Storages/System/attachSystemTables.cpp @@ -38,7 +38,10 @@ #include #include #include + +#ifdef OS_LINUX #include +#endif namespace DB @@ -66,7 +69,9 @@ void attachSystemTablesLocal(IDatabase & system_database) system_database.attachTable("collations", StorageSystemCollations::create("collations")); system_database.attachTable("table_engines", StorageSystemTableEngines::create("table_engines")); system_database.attachTable("contributors", StorageSystemContributors::create("contributors")); +#ifdef OS_LINUX system_database.attachTable("stack_trace", StorageSystemStackTrace::create("stack_trace")); +#endif } void attachSystemTablesServer(IDatabase & system_database, bool has_zookeeper) From e907ce103eb5aa720afd2bcf39391641b5ef98c7 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 23 Dec 2019 21:58:42 +0300 Subject: [PATCH 11/16] Fixed warning --- dbms/src/Storages/System/StorageSystemStackTrace.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dbms/src/Storages/System/StorageSystemStackTrace.cpp b/dbms/src/Storages/System/StorageSystemStackTrace.cpp index 24426eabca7..b964ab0f51d 100644 --- a/dbms/src/Storages/System/StorageSystemStackTrace.cpp +++ b/dbms/src/Storages/System/StorageSystemStackTrace.cpp @@ -61,8 +61,10 @@ namespace memcpy(query_id_data, query_id.data, query_id_size); char buf = 0; + ssize_t res = ::write(notification_pipe.fds_rw[1], &buf, 1); + /// We cannot do anything if write failed. - (void)::write(notification_pipe.fds_rw[1], &buf, 1); + (void)res; } /// Wait for data in pipe and read it. From 3b7f3b07cd2f59346857d28937b96fa19a0f934a Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 23 Dec 2019 22:23:54 +0300 Subject: [PATCH 12/16] Better handling of signals --- .../System/StorageSystemStackTrace.cpp | 32 +++++++++++++------ .../Storages/System/StorageSystemStackTrace.h | 1 + 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/dbms/src/Storages/System/StorageSystemStackTrace.cpp b/dbms/src/Storages/System/StorageSystemStackTrace.cpp index b964ab0f51d..20767464038 100644 --- a/dbms/src/Storages/System/StorageSystemStackTrace.cpp +++ b/dbms/src/Storages/System/StorageSystemStackTrace.cpp @@ -35,6 +35,8 @@ namespace const pid_t expected_pid = getpid(); const int sig = SIGRTMIN; + int sequence_num = 0; /// For messages sent via pipe. + UInt32 thread_number{0}; std::optional stack_trace; @@ -51,6 +53,10 @@ namespace if (info->si_pid != expected_pid) return; + /// Signal received too late. + if (info->si_value.sival_int != sequence_num) + return; + /// All these methods are signal-safe. const ucontext_t signal_context = *reinterpret_cast(context); stack_trace.emplace(signal_context); @@ -60,8 +66,8 @@ namespace query_id_size = std::min(query_id.size, max_query_id_size); memcpy(query_id_data, query_id.data, query_id_size); - char buf = 0; - ssize_t res = ::write(notification_pipe.fds_rw[1], &buf, 1); + int notification_num = info->si_value.sival_int; + ssize_t res = ::write(notification_pipe.fds_rw[1], ¬ification_num, sizeof(notification_num)); /// We cannot do anything if write failed. (void)res; @@ -91,10 +97,8 @@ namespace if (poll_res == 0) return false; - char buf = 0; - ssize_t read_res = ::read(fd, &buf, 1); - if (read_res == 1) - return true; + int notification_num = 0; + ssize_t read_res = ::read(fd, ¬ification_num, sizeof(notification_num)); if (read_res < 0) { @@ -104,7 +108,15 @@ namespace throwFromErrno("Cannot read from pipe", ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR); } - throw Exception("Logical error: read for one byte returned more than one byte", ErrorCodes::LOGICAL_ERROR); + if (read_res == sizeof(notification_num)) + { + if (notification_num == sequence_num) + return true; + else + continue; /// Drain delayed notifications. + } + + throw Exception("Logical error: read wrong number of bytes from pipe", ErrorCodes::LOGICAL_ERROR); } } } @@ -160,9 +172,10 @@ void StorageSystemStackTrace::fillData(MutableColumns & res_columns, const Conte std::filesystem::directory_iterator end; for (std::filesystem::directory_iterator it("/proc/self/task"); it != end; ++it) { - sigval sig_value{}; pid_t tid = parse(it->path().filename()); + sigval sig_value{}; + sig_value.sival_int = sequence_num; if (0 != ::sigqueue(tid, sig, sig_value)) { /// The thread may has been already finished. @@ -173,7 +186,6 @@ void StorageSystemStackTrace::fillData(MutableColumns & res_columns, const Conte } /// Just in case we will wait for pipe with timeout. In case signal didn't get processed. - /// TODO How to deal with stale values in a pipe? TSan will also argue. if (wait(100)) { @@ -197,6 +209,8 @@ void StorageSystemStackTrace::fillData(MutableColumns & res_columns, const Conte res_columns[1]->insertDefault(); res_columns[2]->insertDefault(); } + + sequence_num = static_cast(static_cast(sequence_num) + 1); } } diff --git a/dbms/src/Storages/System/StorageSystemStackTrace.h b/dbms/src/Storages/System/StorageSystemStackTrace.h index 79185ca805a..4961d786f59 100644 --- a/dbms/src/Storages/System/StorageSystemStackTrace.h +++ b/dbms/src/Storages/System/StorageSystemStackTrace.h @@ -15,6 +15,7 @@ class Context; /// Allows to introspect stack trace of all server threads. /// It acts like an embedded debugger. +/// More than one instance of this table cannot be used. class StorageSystemStackTrace : public ext::shared_ptr_helper, public IStorageSystemOneBlock { friend struct ext::shared_ptr_helper; From 9d631c2c779b06c552864750ab1209b2a4e4cc54 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 23 Dec 2019 23:19:49 +0300 Subject: [PATCH 13/16] Add query_id to crash messages --- libs/libdaemon/src/BaseDaemon.cpp | 41 +++++++++++++++++++++++++++---- 1 file changed, 36 insertions(+), 5 deletions(-) diff --git a/libs/libdaemon/src/BaseDaemon.cpp b/libs/libdaemon/src/BaseDaemon.cpp index ce3eb2cd787..1c0f86db600 100644 --- a/libs/libdaemon/src/BaseDaemon.cpp +++ b/libs/libdaemon/src/BaseDaemon.cpp @@ -72,7 +72,15 @@ static void call_default_signal_handler(int sig) } -static const size_t buf_size = sizeof(int) + sizeof(siginfo_t) + sizeof(ucontext_t) + sizeof(StackTrace) + sizeof(UInt32); +static constexpr size_t max_query_id_size = 127; + +static const size_t buf_size = + sizeof(int) + + sizeof(siginfo_t) + + sizeof(ucontext_t) + + sizeof(StackTrace) + + sizeof(UInt32) + + max_query_id_size + 1; /// query_id + varint encoded length using signal_function = void(int, siginfo_t*, void*); @@ -107,11 +115,15 @@ static void signalHandler(int sig, siginfo_t * info, void * context) const ucontext_t signal_context = *reinterpret_cast(context); const StackTrace stack_trace(signal_context); + StringRef query_id = CurrentThread::getQueryId(); /// This is signal safe. + query_id.size = std::min(query_id.size, max_query_id_size); + DB::writeBinary(sig, out); DB::writePODBinary(*info, out); DB::writePODBinary(signal_context, out); DB::writePODBinary(stack_trace, out); DB::writeBinary(UInt32(getThreadNumber()), out); + DB::writeStringBinary(query_id, out); out.next(); @@ -187,15 +199,17 @@ public: ucontext_t context; StackTrace stack_trace(NoCapture{}); UInt32 thread_num; + std::string query_id; DB::readPODBinary(info, in); DB::readPODBinary(context, in); DB::readPODBinary(stack_trace, in); DB::readBinary(thread_num, in); + DB::readBinary(query_id, in); /// This allows to receive more signals if failure happens inside onFault function. /// Example: segfault while symbolizing stack trace. - std::thread([=] { onFault(sig, info, context, stack_trace, thread_num); }).detach(); + std::thread([=] { onFault(sig, info, context, stack_trace, thread_num, query_id); }).detach(); } } } @@ -210,11 +224,28 @@ private: LOG_FATAL(log, "(version " << VERSION_STRING << VERSION_OFFICIAL << ") (from thread " << thread_num << ") " << message); } - void onFault(int sig, const siginfo_t & info, const ucontext_t & context, const StackTrace & stack_trace, UInt32 thread_num) const + void onFault( + int sig, + const siginfo_t & info, + const ucontext_t & context, + const StackTrace & stack_trace, + UInt32 thread_num, + const std::string & query_id) const { LOG_FATAL(log, "########################################"); - LOG_FATAL(log, "(version " << VERSION_STRING << VERSION_OFFICIAL << ") (from thread " << thread_num << ") " - << "Received signal " << strsignal(sig) << " (" << sig << ")" << "."); + + { + std::stringstream message; + message << "(version " << VERSION_STRING << VERSION_OFFICIAL << ")"; + message << " (from thread " << thread_num << ")"; + if (query_id.empty()) + message << " (no query)"; + else + message << " (query_id: " << query_id << ")"; + message << "Received signal " << strsignal(sig) << " (" << sig << ")" << "."; + + LOG_FATAL(log, message.rdbuf()); + } LOG_FATAL(log, signalToErrorMessage(sig, info, context)); From f7d9ada51cabadd7a9efee214807184e0911cb25 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 23 Dec 2019 23:26:11 +0300 Subject: [PATCH 14/16] Addition to prev. revision --- libs/libdaemon/src/BaseDaemon.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/libdaemon/src/BaseDaemon.cpp b/libs/libdaemon/src/BaseDaemon.cpp index 1c0f86db600..70cc7157344 100644 --- a/libs/libdaemon/src/BaseDaemon.cpp +++ b/libs/libdaemon/src/BaseDaemon.cpp @@ -242,7 +242,7 @@ private: message << " (no query)"; else message << " (query_id: " << query_id << ")"; - message << "Received signal " << strsignal(sig) << " (" << sig << ")" << "."; + message << " Received signal " << strsignal(sig) << " (" << sig << ")" << "."; LOG_FATAL(log, message.rdbuf()); } From badbee23856651c21a632dac0a58989db1308e5d Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 23 Dec 2019 23:26:46 +0300 Subject: [PATCH 15/16] Added another mode of diagnostic trap --- dbms/src/Functions/trap.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/dbms/src/Functions/trap.cpp b/dbms/src/Functions/trap.cpp index 217b7091dc1..9176a8656af 100644 --- a/dbms/src/Functions/trap.cpp +++ b/dbms/src/Functions/trap.cpp @@ -83,6 +83,10 @@ public: { abort(); } + else if (mode == "std::terminate") + { + std::terminate(); + } else if (mode == "use after free") { int * x_ptr; From bdefa9248c2beeb4527cc2e1bcc0948824bf53cb Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Tue, 24 Dec 2019 03:54:32 +0300 Subject: [PATCH 16/16] Removed test because it cannot run in Sandbox (CI) --- .../queries/0_stateless/01051_system_stack_trace.reference | 1 - dbms/tests/queries/0_stateless/01051_system_stack_trace.sql | 2 -- 2 files changed, 3 deletions(-) delete mode 100644 dbms/tests/queries/0_stateless/01051_system_stack_trace.reference delete mode 100644 dbms/tests/queries/0_stateless/01051_system_stack_trace.sql diff --git a/dbms/tests/queries/0_stateless/01051_system_stack_trace.reference b/dbms/tests/queries/0_stateless/01051_system_stack_trace.reference deleted file mode 100644 index d00491fd7e5..00000000000 --- a/dbms/tests/queries/0_stateless/01051_system_stack_trace.reference +++ /dev/null @@ -1 +0,0 @@ -1 diff --git a/dbms/tests/queries/0_stateless/01051_system_stack_trace.sql b/dbms/tests/queries/0_stateless/01051_system_stack_trace.sql deleted file mode 100644 index 32d344fce7e..00000000000 --- a/dbms/tests/queries/0_stateless/01051_system_stack_trace.sql +++ /dev/null @@ -1,2 +0,0 @@ --- at least this query should be present -SELECT count() > 0 FROM system.stack_trace WHERE query_id != '';