#ifdef OS_LINUX /// Because of 'sigqueue' functions and RT signals.

#include <poll.h>
#include <signal.h>
#include <unistd.h>

#include <algorithm>
#include <atomic>
#include <cerrno>
#include <chrono>
#include <climits>
#include <cstring>
#include <filesystem>
#include <mutex>
#include <optional>

#include <Storages/System/StorageSystemStackTrace.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <IO/ReadHelpers.h>
#include <Common/CurrentThread.h>
#include <Common/Exception.h>
#include <Common/PipeFDs.h>
#include <Common/StackTrace.h>
#include <common/getThreadNumber.h>


namespace DB
{

namespace ErrorCodes
{
    extern const int CANNOT_SIGQUEUE;
    extern const int CANNOT_MANIPULATE_SIGSET;
    extern const int CANNOT_SET_SIGNAL_HANDLER;
    extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
    extern const int LOGICAL_ERROR;
}


namespace
{
    /// Signals not sent by this very process are ignored by the handler.
    const pid_t expected_pid = getpid();
    const int sig = SIGRTMIN;

    /// State written by the signal handler and read by fillData.
    /// fillData reads it only after receiving, through the pipe, the notification
    /// carrying the number of its current request.
    UInt32 thread_number{0};
    std::optional<StackTrace> stack_trace;

    static constexpr size_t max_query_id_size = 128;
    char query_id_data[max_query_id_size];
    size_t query_id_size = 0;

    LazyPipeFDs notification_pipe;

    /// Number of the request currently being served. It is sent along with every signal
    /// (as sival_int) and echoed back through the pipe, so that a late reply to an earlier
    /// request (one that already timed out) can be recognized and ignored.
    std::atomic<int> sequence_num{0};

    /// Set while a signal handler is writing the shared state above,
    /// so that two handlers never write it concurrently.
    std::atomic<bool> signal_latch{false};

    /// Atomics used from a signal handler must be lock-free to be async-signal-safe.
    static_assert(std::atomic<int>::is_always_lock_free);
    static_assert(std::atomic<bool>::is_always_lock_free);


    void signalHandler(int, siginfo_t * info, void * context)
    {
        /// write() below may change errno; the interrupted thread must see its own value.
        const int saved_errno = errno;

        /// In case a malicious user is sending signals manually (for unknown reason).
        /// If we don't check - it may break our synchronization.
        if (info->si_pid == expected_pid)
        {
            bool latch_was_free = false;
            if (signal_latch.compare_exchange_strong(latch_was_free, true, std::memory_order_acquire))
            {
                const int notification_num = info->si_value.sival_int;

                /// The reader may have already given up on this request (timeout) and moved on.
                /// Checked while holding the latch, so a stale handler cannot overwrite
                /// the data that belongs to the current request.
                if (notification_num == sequence_num.load(std::memory_order_acquire))
                {
                    /// All these methods are signal-safe.
                    const ucontext_t signal_context = *reinterpret_cast<ucontext_t *>(context);
                    stack_trace.emplace(signal_context);

                    thread_number = getThreadNumber();

                    StringRef query_id = CurrentThread::getQueryId();
                    query_id_size = std::min(query_id.size, max_query_id_size);
                    if (query_id_size != 0)
                        memcpy(query_id_data, query_id.data, query_id_size);

                    /// Echo the request number back. We cannot do anything if write failed.
                    (void)::write(notification_pipe.fds_rw[1], &notification_num, sizeof(notification_num));
                }

                signal_latch.store(false, std::memory_order_release);
            }
        }

        errno = saved_errno;
    }


    /// Choose the number for the next request, wrapping around without signed overflow.
    /// Only called from fillData (which holds the table mutex).
    int nextSequenceNum()
    {
        const int current = sequence_num.load(std::memory_order_relaxed);
        const int next = (current == INT_MAX) ? 0 : current + 1;
        sequence_num.store(next);
        return next;
    }


    /// Wait up to timeout_ms for the notification carrying expected_num.
    /// Notifications with other numbers are late replies to earlier requests: they are drained.
    /// Returns true if the expected notification arrived, false on timeout.
    /// Throws if poll/read fail for a reason other than EINTR.
    bool wait(int expected_num, int timeout_ms)
    {
        const auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
        const int fd = notification_pipe.fds_rw[0];

        while (true)
        {
            /// Recomputed on every iteration, so EINTR and drained notifications cannot extend the wait.
            const auto remaining = std::chrono::duration_cast<std::chrono::milliseconds>(
                deadline - std::chrono::steady_clock::now()).count();
            const int poll_timeout_ms = remaining > 0 ? static_cast<int>(remaining) : 0;

            pollfd poll_fd{fd, POLLIN, 0};
            int poll_res = poll(&poll_fd, 1, poll_timeout_ms);
            if (poll_res < 0)
            {
                if (errno == EINTR)
                    continue;

                throwFromErrno("Cannot poll pipe", ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR);
            }
            if (poll_res == 0)
                return false;

            int notification_num = 0;
            ssize_t read_res = ::read(fd, &notification_num, sizeof(notification_num));

            if (read_res < 0)
            {
                if (errno == EINTR)
                    continue;

                throwFromErrno("Cannot read from pipe", ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR);
            }

            if (read_res != static_cast<ssize_t>(sizeof(notification_num)))
                throw Exception("Logical error: read wrong number of bytes from notification pipe", ErrorCodes::LOGICAL_ERROR);

            if (notification_num == expected_num)
                return true;

            /// Otherwise: a late reply to a previous request. Drop it and keep waiting.
        }
    }
}


StorageSystemStackTrace::StorageSystemStackTrace(const String & name_)
    : IStorageSystemOneBlock<StorageSystemStackTrace>(name_)
{
    notification_pipe.open();

    /// Setup signal handler.

    struct sigaction sa{};
    sa.sa_sigaction = signalHandler;
    sa.sa_flags = SA_SIGINFO;

    if (sigemptyset(&sa.sa_mask))
        throwFromErrno("Cannot set signal handler.", ErrorCodes::CANNOT_MANIPULATE_SIGSET);

    if (sigaddset(&sa.sa_mask, sig))
        throwFromErrno("Cannot set signal handler.", ErrorCodes::CANNOT_MANIPULATE_SIGSET);

    if (sigaction(sig, &sa, nullptr))
        throwFromErrno("Cannot set signal handler.", ErrorCodes::CANNOT_SET_SIGNAL_HANDLER);
}


NamesAndTypesList StorageSystemStackTrace::getNamesAndTypes()
{
    return
    {
        { "thread_number", std::make_shared<DataTypeUInt32>() },
        { "query_id", std::make_shared<DataTypeString>() },
        { "trace", std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>()) }
    };
}


void StorageSystemStackTrace::fillData(MutableColumns & res_columns, const Context &, const SelectQueryInfo &) const
{
    /// It shouldn't be possible to do concurrent reads from this table.
    std::lock_guard lock(mutex);

    /// Send a signal to every thread and wait for result.
    /// We must wait for every thread one by one sequentially,
    ///  because there is a limit on number of queued signals in OS and otherwise signals may get lost.
    /// Also, non-RT signals are not delivered if previous signal is handled right now (by default; but we use RT signals).

    /// Obviously, results for different threads may be out of sync.

    /// There is no better way to enumerate threads in a process other than looking into procfs.

    std::filesystem::directory_iterator end;
    for (std::filesystem::directory_iterator it("/proc/self/task"); it != end; ++it)
    {
        pid_t tid = parse<pid_t>(it->path().filename());

        /// Tag this request with a fresh number; the handler echoes it back through the pipe.
        sigval sig_value{};
        sig_value.sival_int = nextSequenceNum();

        if (0 != ::sigqueue(tid, sig, sig_value))
        {
            /// The thread may have already finished.
            if (ESRCH == errno)
                continue;

            throwFromErrno("Cannot send signal with sigqueue", ErrorCodes::CANNOT_SIGQUEUE);
        }

        /// Wait with a timeout, just in case the signal doesn't get processed.
        /// A late reply to this request is recognized by its number and discarded later.
        if (wait(sig_value.sival_int, 100))
        {
            size_t stack_trace_size = stack_trace->getSize();
            size_t stack_trace_offset = stack_trace->getOffset();

            Array arr;
            arr.reserve(stack_trace_size - stack_trace_offset);
            for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
                arr.emplace_back(reinterpret_cast<uintptr_t>(stack_trace->getFrames()[i]));

            res_columns[0]->insert(thread_number);
            res_columns[1]->insertData(query_id_data, query_id_size);
            res_columns[2]->insert(arr);
        }
        else
        {
            /// Cannot obtain a stack trace. But create a record in result nevertheless.
            res_columns[0]->insert(tid);    /// TODO Replace all thread numbers to OS thread numbers.
            res_columns[1]->insertDefault();
            res_columns[2]->insertDefault();
        }
    }
}

}

#endif