Fix TSan warning in system.stack_trace

This commit is contained in:
Alexey Milovidov 2020-12-16 01:43:07 +03:00
parent 13ac1bee7a
commit 7ca86f176c

View File

@ -38,8 +38,18 @@ namespace
const int sig = SIGRTMIN;
std::atomic<int> sequence_num = 0; /// For messages sent via pipe.
std::atomic<int> data_ready_num = 0;
std::optional<StackTrace> stack_trace;
/** Notes:
* Only one query from the table can be processed at the moment of time.
* This is ensured by the mutex in fillData function.
* We obtain information about threads by sending signal and receiving info from the signal handler.
* Information is passed via global variables and pipe is used for signaling.
* Actually we can send all information via pipe, but we read from it with timeout just in case,
* so it's convenient to use is only for signaling.
*/
StackTrace stack_trace{NoCapture{}};
constexpr size_t max_query_id_size = 128;
char query_id_data[max_query_id_size];
@ -47,7 +57,7 @@ namespace
LazyPipeFDs notification_pipe;
void NO_SANITIZE_THREAD signalHandler(int, siginfo_t * info, void * context)
void signalHandler(int, siginfo_t * info, void * context)
{
auto saved_errno = errno; /// We must restore previous value of errno in signal handler.
@ -57,19 +67,22 @@ namespace
return;
/// Signal received too late.
if (info->si_value.sival_int != sequence_num.load(std::memory_order_relaxed))
int notification_num = info->si_value.sival_int;
if (notification_num != sequence_num.load(std::memory_order_acquire))
return;
/// All these methods are signal-safe.
const ucontext_t signal_context = *reinterpret_cast<ucontext_t *>(context);
stack_trace.emplace(signal_context);
stack_trace = StackTrace(signal_context);
StringRef query_id = CurrentThread::getQueryId();
query_id_size = std::min(query_id.size, max_query_id_size);
if (query_id.data && query_id.size)
memcpy(query_id_data, query_id.data, query_id_size);
int notification_num = info->si_value.sival_int;
/// This is unneeded (because we synchronize through pipe) but makes TSan happy.
data_ready_num.store(sequence_num, std::memory_order_release);
ssize_t res = ::write(notification_pipe.fds_rw[1], &notification_num, sizeof(notification_num));
/// We cannot do anything if write failed.
@ -79,7 +92,7 @@ namespace
}
/// Wait for data in pipe and read it.
bool NO_SANITIZE_THREAD wait(int timeout_ms)
bool wait(int timeout_ms)
{
while (true)
{
@ -127,7 +140,7 @@ namespace
}
NO_SANITIZE_THREAD StorageSystemStackTrace::StorageSystemStackTrace(const StorageID & table_id_)
StorageSystemStackTrace::StorageSystemStackTrace(const StorageID & table_id_)
: IStorageSystemOneBlock<StorageSystemStackTrace>(table_id_)
{
notification_pipe.open();
@ -160,7 +173,7 @@ NamesAndTypesList StorageSystemStackTrace::getNamesAndTypes()
}
void NO_SANITIZE_THREAD StorageSystemStackTrace::fillData(MutableColumns & res_columns, const Context &, const SelectQueryInfo &) const
void StorageSystemStackTrace::fillData(MutableColumns & res_columns, const Context &, const SelectQueryInfo &) const
{
/// It shouldn't be possible to do concurrent reads from this table.
std::lock_guard lock(mutex);
@ -180,7 +193,7 @@ void NO_SANITIZE_THREAD StorageSystemStackTrace::fillData(MutableColumns & res_c
pid_t tid = parse<pid_t>(it->path().filename());
sigval sig_value{};
sig_value.sival_int = sequence_num.load(std::memory_order_relaxed);
sig_value.sival_int = sequence_num.load(std::memory_order_acquire);
if (0 != ::sigqueue(tid, sig, sig_value))
{
/// The thread may has been already finished.
@ -192,15 +205,15 @@ void NO_SANITIZE_THREAD StorageSystemStackTrace::fillData(MutableColumns & res_c
/// Just in case we will wait for pipe with timeout. In case signal didn't get processed.
if (wait(100))
if (wait(100) && sig_value.sival_int == data_ready_num.load(std::memory_order_acquire))
{
size_t stack_trace_size = stack_trace->getSize();
size_t stack_trace_offset = stack_trace->getOffset();
size_t stack_trace_size = stack_trace.getSize();
size_t stack_trace_offset = stack_trace.getOffset();
Array arr;
arr.reserve(stack_trace_size - stack_trace_offset);
for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
arr.emplace_back(reinterpret_cast<intptr_t>(stack_trace->getFramePointers()[i]));
arr.emplace_back(reinterpret_cast<intptr_t>(stack_trace.getFramePointers()[i]));
res_columns[0]->insert(tid);
res_columns[1]->insertData(query_id_data, query_id_size);