Merge pull request #39177 from azat/stack_trace-filter

Optimize accesses to system.stack_trace
This commit is contained in:
Yakov Olkhovskiy 2022-07-14 01:47:14 -04:00 committed by GitHub
commit d6a09acb38
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 205 additions and 75 deletions

View File

@ -5,10 +5,14 @@
#include <mutex>
#include <filesystem>
#include <unordered_map>
#include <base/scope_guard.h>
#include <Storages/System/StorageSystemStackTrace.h>
#include <Storages/VirtualColumnUtils.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
@ -16,8 +20,11 @@
#include <IO/ReadBufferFromFile.h>
#include <Common/PipeFDs.h>
#include <Common/CurrentThread.h>
#include <base/getThreadId.h>
#include <Common/HashTable/Hash.h>
#include <Common/logger_useful.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <QueryPipeline/Pipe.h>
#include <base/getThreadId.h>
namespace DB
@ -147,13 +154,82 @@ namespace
throw Exception("Logical error: read wrong number of bytes from pipe", ErrorCodes::LOGICAL_ERROR);
}
}
/// Collects the ids of all threads of the current process into a UInt64 column named "thread_id",
/// passes that single-column block through VirtualColumnUtils::filterBlockWithQuery together with
/// the given query and context, and returns whatever column the block holds afterwards.
/// NOTE(review): presumably the filter keeps only the rows that satisfy the query's conditions
/// on `thread_id` - confirm against VirtualColumnUtils.
ColumnPtr getFilteredThreadIds(ASTPtr query, ContextPtr context)
{
    MutableColumnPtr candidate_tids = ColumnUInt64::create();

    /// There is no better way to enumerate threads in a process other than looking into procfs.
    /// Every entry of /proc/self/task is named after a thread id.
    for (const auto & task_entry : std::filesystem::directory_iterator("/proc/self/task"))
    {
        pid_t task_id = parse<pid_t>(task_entry.path().filename());
        candidate_tids->insert(task_id);
    }

    Block block { ColumnWithTypeAndName(std::move(candidate_tids), std::make_shared<DataTypeUInt64>(), "thread_id") };
    VirtualColumnUtils::filterBlockWithQuery(query, block, context);

    return block.getByPosition(0).column;
}
using ThreadIdToName = std::unordered_map<UInt64, String, DefaultHash<UInt64>>;

/// Reads the name of every thread in `thread_ids` from /proc/self/task/<tid>/comm, passes the
/// names (as a String column named "thread_name") through VirtualColumnUtils::filterBlockWithQuery,
/// and returns a map tid -> name that contains only the threads whose name is still present in
/// the block after filtering.
///
/// A thread whose `comm` file does not exist gets an empty name.
/// NOTE(review): presumably the filter keeps only the rows that satisfy the query's conditions
/// on `thread_name` - confirm against VirtualColumnUtils.
ThreadIdToName getFilteredThreadNames(ASTPtr query, ContextPtr context, const PaddedPODArray<UInt64> & thread_ids)
{
    ThreadIdToName tid_to_name;
    MutableColumnPtr all_thread_names = ColumnString::create();

    for (UInt64 tid : thread_ids)
    {
        std::filesystem::path thread_name_path = fmt::format("/proc/self/task/{}/comm", tid);
        String thread_name;
        if (std::filesystem::exists(thread_name_path))
        {
            constexpr size_t comm_buf_size = 32; /// More than enough for thread name
            ReadBufferFromFile comm(thread_name_path.string(), comm_buf_size);
            readEscapedStringUntilEOL(thread_name, comm);
            comm.close();
        }

        tid_to_name[tid] = thread_name;
        all_thread_names->insert(thread_name);
    }

    Block block { ColumnWithTypeAndName(std::move(all_thread_names), std::make_shared<DataTypeString>(), "thread_name") };
    VirtualColumnUtils::filterBlockWithQuery(query, block, context);
    ColumnPtr thread_names = std::move(block.getByPosition(0).column);

    /// Set of the names that are present in the block after filtering.
    std::unordered_set<String> filtered_thread_names;
    for (size_t i = 0; i != thread_names->size(); ++i)
    {
        const auto & thread_name = thread_names->getDataAt(i);
        filtered_thread_names.emplace(thread_name);
    }

    /// Drop the threads whose name was filtered out.
    /// Erasing an element invalidates the iterator that points to it, so the map must not be
    /// modified from inside a range-based for loop over itself; advance via the iterator
    /// returned by erase() instead.
    for (auto it = tid_to_name.begin(); it != tid_to_name.end();)
    {
        if (!filtered_thread_names.contains(it->second))
            it = tid_to_name.erase(it);
        else
            ++it;
    }

    return tid_to_name;
}
}
StorageSystemStackTrace::StorageSystemStackTrace(const StorageID & table_id_)
: IStorageSystemOneBlock<StorageSystemStackTrace>(table_id_)
: IStorage(table_id_)
, log(&Poco::Logger::get("StorageSystemStackTrace"))
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(ColumnsDescription({
{ "thread_name", std::make_shared<DataTypeString>() },
{ "thread_id", std::make_shared<DataTypeUInt64>() },
{ "query_id", std::make_shared<DataTypeString>() },
{ "trace", std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>()) },
}, { /* aliases */ }));
setInMemoryMetadata(storage_metadata);
notification_pipe.open();
/// Setup signal handler.
@ -173,23 +249,40 @@ StorageSystemStackTrace::StorageSystemStackTrace(const StorageID & table_id_)
}
NamesAndTypesList StorageSystemStackTrace::getNamesAndTypes()
Pipe StorageSystemStackTrace::read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum /*processed_stage*/,
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
{
return
{
{ "thread_name", std::make_shared<DataTypeString>() },
{ "thread_id", std::make_shared<DataTypeUInt64>() },
{ "query_id", std::make_shared<DataTypeString>() },
{ "trace", std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>()) }
};
}
storage_snapshot->check(column_names);
void StorageSystemStackTrace::fillData(MutableColumns & res_columns, ContextPtr, const SelectQueryInfo &) const
{
/// It shouldn't be possible to do concurrent reads from this table.
std::lock_guard lock(mutex);
/// Create a mask of what columns are needed in the result.
NameSet names_set(column_names.begin(), column_names.end());
Block sample_block = storage_snapshot->metadata->getSampleBlock();
std::vector<UInt8> columns_mask(sample_block.columns());
for (size_t i = 0, size = columns_mask.size(); i < size; ++i)
{
if (names_set.contains(sample_block.getByPosition(i).name))
{
columns_mask[i] = 1;
}
}
bool send_signal = names_set.contains("trace") || names_set.contains("query_id");
bool read_thread_names = names_set.contains("thread_name");
MutableColumns res_columns = sample_block.cloneEmptyColumns();
/// Send a signal to every thread and wait for result.
/// We must wait for every thread one by one sequentially,
/// because there is a limit on number of queued signals in OS and otherwise signals may get lost.
@ -197,71 +290,80 @@ void StorageSystemStackTrace::fillData(MutableColumns & res_columns, ContextPtr,
/// Obviously, results for different threads may be out of sync.
/// There is no better way to enumerate threads in a process other than looking into procfs.
ColumnPtr thread_ids = getFilteredThreadIds(query_info.query, context);
const auto & thread_ids_data = assert_cast<const ColumnUInt64 &>(*thread_ids).getData();
std::filesystem::directory_iterator end;
for (std::filesystem::directory_iterator it("/proc/self/task"); it != end; ++it)
ThreadIdToName thread_names;
if (read_thread_names)
thread_names = getFilteredThreadNames(query_info.query, context, thread_ids_data);
for (UInt64 tid : thread_ids_data)
{
pid_t tid = parse<pid_t>(it->path().filename());
sigval sig_value{};
sig_value.sival_int = sequence_num.load(std::memory_order_acquire);
if (0 != ::sigqueue(tid, sig, sig_value))
{
/// The thread may have already finished.
if (ESRCH == errno)
continue;
throwFromErrno("Cannot send signal with sigqueue", ErrorCodes::CANNOT_SIGQUEUE);
}
std::filesystem::path thread_name_path = it->path();
thread_name_path.append("comm");
size_t res_index = 0;
String thread_name;
if (std::filesystem::exists(thread_name_path))
if (auto it = thread_names.find(tid); it != thread_names.end())
thread_name = it->second;
if (!send_signal)
{
constexpr size_t comm_buf_size = 32; /// More than enough for thread name
ReadBufferFromFile comm(thread_name_path.string(), comm_buf_size);
readEscapedStringUntilEOL(thread_name, comm);
comm.close();
}
/// Just in case we will wait for pipe with timeout. In case signal didn't get processed.
if (wait(100) && sig_value.sival_int == data_ready_num.load(std::memory_order_acquire))
{
size_t stack_trace_size = stack_trace.getSize();
size_t stack_trace_offset = stack_trace.getOffset();
Array arr;
arr.reserve(stack_trace_size - stack_trace_offset);
for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
arr.emplace_back(reinterpret_cast<intptr_t>(stack_trace.getFramePointers()[i]));
res_columns[0]->insert(thread_name);
res_columns[1]->insert(tid);
res_columns[2]->insertData(query_id_data, query_id_size);
res_columns[3]->insert(arr);
res_columns[res_index++]->insert(thread_name);
res_columns[res_index++]->insert(tid);
res_columns[res_index++]->insertDefault();
res_columns[res_index++]->insertDefault();
}
else
{
LOG_DEBUG(log, "Cannot obtain a stack trace for thread {}", tid);
sigval sig_value{};
/// Cannot obtain a stack trace. But create a record in result nevertheless.
sig_value.sival_int = sequence_num.load(std::memory_order_acquire);
if (0 != ::sigqueue(tid, sig, sig_value))
{
/// The thread may have already finished.
if (ESRCH == errno)
continue;
res_columns[0]->insert(thread_name);
res_columns[1]->insert(tid);
res_columns[2]->insertDefault();
res_columns[3]->insertDefault();
throwFromErrno("Cannot send signal with sigqueue", ErrorCodes::CANNOT_SIGQUEUE);
}
/// Just in case we will wait for pipe with timeout. In case signal didn't get processed.
if (send_signal && wait(100) && sig_value.sival_int == data_ready_num.load(std::memory_order_acquire))
{
size_t stack_trace_size = stack_trace.getSize();
size_t stack_trace_offset = stack_trace.getOffset();
Array arr;
arr.reserve(stack_trace_size - stack_trace_offset);
for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
arr.emplace_back(reinterpret_cast<intptr_t>(stack_trace.getFramePointers()[i]));
res_columns[res_index++]->insert(thread_name);
res_columns[res_index++]->insert(tid);
res_columns[res_index++]->insertData(query_id_data, query_id_size);
res_columns[res_index++]->insert(arr);
}
else
{
LOG_DEBUG(log, "Cannot obtain a stack trace for thread {}", tid);
res_columns[res_index++]->insert(thread_name);
res_columns[res_index++]->insert(tid);
res_columns[res_index++]->insertDefault();
res_columns[res_index++]->insertDefault();
}
/// Signed integer overflow is undefined behavior in both C and C++. However, according to
/// C++ standard, Atomic signed integer arithmetic is defined to use two's complement; there
/// are no undefined results. See https://en.cppreference.com/w/cpp/atomic/atomic and
/// http://eel.is/c++draft/atomics.types.generic#atomics.types.int-8
++sequence_num;
}
/// Signed integer overflow is undefined behavior in both C and C++. However, according to
/// C++ standard, Atomic signed integer arithmetic is defined to use two's complement; there
/// are no undefined results. See https://en.cppreference.com/w/cpp/atomic/atomic and
/// http://eel.is/c++draft/atomics.types.generic#atomics.types.int-8
++sequence_num;
}
UInt64 num_rows = res_columns.at(0)->size();
Chunk chunk(std::move(res_columns), num_rows);
return Pipe(std::make_shared<SourceFromSingleChunk>(sample_block, std::move(chunk)));
}
}

View File

@ -3,7 +3,7 @@
#ifdef OS_LINUX /// Because of 'sigqueue' functions and RT signals.
#include <mutex>
#include <Storages/System/IStorageSystemOneBlock.h>
#include <Storages/IStorage.h>
namespace Poco
{
@ -19,20 +19,26 @@ class Context;
/// Allows to introspect stack trace of all server threads.
/// It acts like an embedded debugger.
/// More than one instance of this table cannot be used.
class StorageSystemStackTrace final : public IStorageSystemOneBlock<StorageSystemStackTrace>
class StorageSystemStackTrace final : public IStorage
{
public:
explicit StorageSystemStackTrace(const StorageID & table_id_);
String getName() const override { return "SystemStackTrace"; }
static NamesAndTypesList getNamesAndTypes();
Pipe read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;
bool isSystemStorage() const override { return true; }
protected:
using IStorageSystemOneBlock::IStorageSystemOneBlock;
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
mutable std::mutex mutex;
Poco::Logger * log;
};

View File

@ -1 +1,15 @@
-- { echo }
SELECT count() > 0 FROM system.stack_trace WHERE query_id != '';
1
-- opimization for not reading /proc/self/task/{}/comm and avoid sending signal
SELECT countIf(thread_id > 0) > 0 FROM system.stack_trace;
1
-- optimization for trace
SELECT length(trace) > 0 FROM system.stack_trace LIMIT 1;
1
-- optimization for query_id
SELECT length(query_id) > 0 FROM system.stack_trace WHERE query_id != '' LIMIT 1;
1
-- optimization for thread_name
SELECT length(thread_name) > 0 FROM system.stack_trace WHERE thread_name != '' LIMIT 1;
1

View File

@ -1,4 +1,12 @@
-- Tags: race
-- at least this query should be present
-- { echo }
SELECT count() > 0 FROM system.stack_trace WHERE query_id != '';
-- opimization for not reading /proc/self/task/{}/comm and avoid sending signal
SELECT countIf(thread_id > 0) > 0 FROM system.stack_trace;
-- optimization for trace
SELECT length(trace) > 0 FROM system.stack_trace LIMIT 1;
-- optimization for query_id
SELECT length(query_id) > 0 FROM system.stack_trace WHERE query_id != '' LIMIT 1;
-- optimization for thread_name
SELECT length(thread_name) > 0 FROM system.stack_trace WHERE thread_name != '' LIMIT 1;