Merge pull request #58136 from azat/system.stack_trace-rt_tgsigqueueinfo-v2

Fix system.stack_trace for threads with blocked SIGRTMIN (resubmit)
This commit is contained in:
Alexey Milovidov 2023-12-24 03:51:13 +01:00 committed by GitHub
commit b4bf1d1c4c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1,4 +1,4 @@
#ifdef OS_LINUX /// Because of 'sigqueue' functions and RT signals.
#ifdef OS_LINUX /// Because of 'rt_tgsigqueueinfo' functions and RT signals.
#include <csignal>
#include <poll.h>
@ -34,7 +34,7 @@
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <base/getThreadId.h>
#include <sys/syscall.h>
namespace DB
{
@ -54,8 +54,8 @@ namespace
{
// Initialized in StorageSystemStackTrace's ctor and used in signalHandler.
std::atomic<pid_t> expected_pid;
const int sig = SIGRTMIN;
std::atomic<pid_t> server_pid;
const int STACK_TRACE_SERVICE_SIGNAL = SIGRTMIN;
std::atomic<int> sequence_num = 0; /// For messages sent via pipe.
std::atomic<int> data_ready_num = 0;
@ -79,6 +79,11 @@ size_t query_id_size = 0;
LazyPipeFDs notification_pipe;
int rt_tgsigqueueinfo(pid_t tgid, pid_t tid, int sig, siginfo_t *info)
{
return static_cast<int>(syscall(__NR_rt_tgsigqueueinfo, tgid, tid, sig, info));
}
void signalHandler(int, siginfo_t * info, void * context)
{
DENY_ALLOCATIONS_IN_SCOPE;
@ -86,7 +91,7 @@ void signalHandler(int, siginfo_t * info, void * context)
/// In case malicious user is sending signals manually (for unknown reason).
/// If we don't check - it may break our synchronization.
if (info->si_pid != expected_pid)
if (info->si_pid != server_pid)
return;
/// Signal received too late.
@ -220,6 +225,48 @@ ThreadIdToName getFilteredThreadNames(const ActionsDAG::Node * predicate, Contex
return tid_to_name;
}
bool parseHexNumber(std::string_view sv, UInt64 & res)
{
errno = 0; /// Functions strto* don't clear errno.
char * pos_integer = const_cast<char *>(sv.begin());
res = std::strtoull(sv.begin(), &pos_integer, 16);
return (pos_integer == sv.begin() + sv.size() && errno != ERANGE);
}
bool isSignalBlocked(UInt64 tid, int signal)
{
String buffer;
try
{
ReadBufferFromFile status(fmt::format("/proc/{}/status", tid));
while (!status.eof())
{
readEscapedStringUntilEOL(buffer, status);
if (!status.eof())
++status.position();
if (buffer.starts_with("SigBlk:"))
break;
}
status.close();
std::string_view line(buffer);
line = line.substr(strlen("SigBlk:"));
line = line.substr(0, line.rend() - std::find_if_not(line.rbegin(), line.rend(), ::isspace));
UInt64 sig_blk;
if (parseHexNumber(line, sig_blk))
return sig_blk & signal;
}
catch (const Exception & e)
{
/// Ignore TOCTOU error
if (e.code() != ErrorCodes::FILE_DOESNT_EXIST)
throw;
}
return false;
}
/// Send a signal to every thread and wait for result.
/// We must wait for every thread one by one sequentially,
/// because there is a limit on number of queued signals in OS and otherwise signals may get lost.
@ -240,6 +287,7 @@ public:
, proc_it("/proc/self/task")
/// It shouldn't be possible to do concurrent reads from this table.
, lock(mutex)
, signal_str(strsignal(STACK_TRACE_SERVICE_SIGNAL)) /// NOLINT(concurrency-mt-unsafe) // not thread-safe but ok in this context
{
/// Create a mask of what columns are needed in the result.
NameSet names_set(column_names.begin(), column_names.end());
@ -265,6 +313,7 @@ protected:
const auto & thread_ids_data = assert_cast<const ColumnUInt64 &>(*thread_ids).getData();
/// NOTE: This is racy, so you may get incorrect thread_name.
ThreadIdToName thread_names;
if (read_thread_names)
thread_names = getFilteredThreadNames(predicate, context, thread_ids_data, log);
@ -291,53 +340,71 @@ protected:
}
else
{
++signals_sent;
Stopwatch watch;
SCOPE_EXIT({ signals_sent_ms += watch.elapsedMilliseconds(); });
sigval sig_value{};
sig_value.sival_int = sequence_num.load(std::memory_order_acquire);
if (0 != ::sigqueue(static_cast<int>(tid), sig, sig_value))
/// NOTE: This check is racy (thread can be
/// destroyed/replaced/...), but it is OK, since only the
/// following could happen:
/// - it will incorrectly detect that the signal is blocked and
/// will not send it this time
/// - it will incorrectly detect that the signal is not blocked
/// then it will wait storage_system_stack_trace_pipe_read_timeout_ms
bool signal_blocked = isSignalBlocked(tid, STACK_TRACE_SERVICE_SIGNAL);
if (!signal_blocked)
{
/// The thread may has been already finished.
if (ESRCH == errno)
++signals_sent;
Stopwatch watch;
SCOPE_EXIT({
signals_sent_ms += watch.elapsedMilliseconds();
/// Signed integer overflow is undefined behavior in both C and C++. However, according to
/// C++ standard, Atomic signed integer arithmetic is defined to use two's complement; there
/// are no undefined results. See https://en.cppreference.com/w/cpp/atomic/atomic and
/// http://eel.is/c++draft/atomics.types.generic#atomics.types.int-8
++sequence_num;
});
siginfo_t sig_info{};
sig_info.si_code = SI_QUEUE; /// sigqueue()
sig_info.si_pid = server_pid;
sig_info.si_value.sival_int = sequence_num.load(std::memory_order_acquire);
if (0 != rt_tgsigqueueinfo(server_pid, static_cast<pid_t>(tid), STACK_TRACE_SERVICE_SIGNAL, &sig_info))
{
/// The thread may has been already finished.
if (ESRCH == errno)
continue;
throw ErrnoException(ErrorCodes::CANNOT_SIGQUEUE, "Cannot queue a signal");
}
/// Just in case we will wait for pipe with timeout. In case signal didn't get processed.
if (wait(pipe_read_timeout_ms) && sig_info.si_value.sival_int == data_ready_num.load(std::memory_order_acquire))
{
size_t stack_trace_size = stack_trace.getSize();
size_t stack_trace_offset = stack_trace.getOffset();
Array arr;
arr.reserve(stack_trace_size - stack_trace_offset);
for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
arr.emplace_back(reinterpret_cast<intptr_t>(stack_trace.getFramePointers()[i]));
res_columns[res_index++]->insert(thread_name);
res_columns[res_index++]->insert(tid);
res_columns[res_index++]->insertData(query_id_data, query_id_size);
res_columns[res_index++]->insert(arr);
continue;
throw ErrnoException(ErrorCodes::CANNOT_SIGQUEUE, "Cannot send signal with sigqueue");
}
}
/// Just in case we will wait for pipe with timeout. In case signal didn't get processed.
if (wait(pipe_read_timeout_ms) && sig_value.sival_int == data_ready_num.load(std::memory_order_acquire))
{
size_t stack_trace_size = stack_trace.getSize();
size_t stack_trace_offset = stack_trace.getOffset();
Array arr;
arr.reserve(stack_trace_size - stack_trace_offset);
for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
arr.emplace_back(reinterpret_cast<intptr_t>(stack_trace.getFramePointers()[i]));
res_columns[res_index++]->insert(thread_name);
res_columns[res_index++]->insert(tid);
res_columns[res_index++]->insertData(query_id_data, query_id_size);
res_columns[res_index++]->insert(arr);
}
if (signal_blocked)
LOG_DEBUG(log, "Thread {} ({}) blocks SIG{} signal", tid, thread_name, signal_str);
else
{
LOG_DEBUG(log, "Cannot obtain a stack trace for thread {}", tid);
LOG_DEBUG(log, "Cannot obtain a stack trace for thread {} ({})", tid, thread_name);
res_columns[res_index++]->insert(thread_name);
res_columns[res_index++]->insert(tid);
res_columns[res_index++]->insertDefault();
res_columns[res_index++]->insertDefault();
}
/// Signed integer overflow is undefined behavior in both C and C++. However, according to
/// C++ standard, Atomic signed integer arithmetic is defined to use two's complement; there
/// are no undefined results. See https://en.cppreference.com/w/cpp/atomic/atomic and
/// http://eel.is/c++draft/atomics.types.generic#atomics.types.int-8
++sequence_num;
res_columns[res_index++]->insert(thread_name);
res_columns[res_index++]->insert(tid);
res_columns[res_index++]->insertDefault();
res_columns[res_index++]->insertDefault();
}
}
LOG_TRACE(log, "Send signal to {} threads (total), took {} ms", signals_sent, signals_sent_ms);
@ -368,6 +435,7 @@ private:
size_t signals_sent_ms = 0;
std::unique_lock<std::mutex> lock;
const char * signal_str;
ColumnPtr getFilteredThreadIds()
{
@ -450,7 +518,7 @@ StorageSystemStackTrace::StorageSystemStackTrace(const StorageID & table_id_)
notification_pipe.open();
/// Setup signal handler.
expected_pid = getpid();
server_pid = getpid();
struct sigaction sa{};
sa.sa_sigaction = signalHandler;
sa.sa_flags = SA_SIGINFO;
@ -458,10 +526,10 @@ StorageSystemStackTrace::StorageSystemStackTrace(const StorageID & table_id_)
if (sigemptyset(&sa.sa_mask))
throw ErrnoException(ErrorCodes::CANNOT_MANIPULATE_SIGSET, "Cannot set signal handler");
if (sigaddset(&sa.sa_mask, sig))
if (sigaddset(&sa.sa_mask, STACK_TRACE_SERVICE_SIGNAL))
throw ErrnoException(ErrorCodes::CANNOT_MANIPULATE_SIGSET, "Cannot set signal handler");
if (sigaction(sig, &sa, nullptr))
if (sigaction(STACK_TRACE_SERVICE_SIGNAL, &sa, nullptr))
throw ErrnoException(ErrorCodes::CANNOT_SET_SIGNAL_HANDLER, "Cannot set signal handler");
}