From a57f5a0983e0d0d7c31196b53aad05f0c8a2a653 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Fri, 8 Dec 2023 16:15:33 +0100 Subject: [PATCH 1/2] Cover optimizations for thread_id/thread_name in system.stack_trace separately Signed-off-by: Azat Khuzhin --- ..._system_stacktrace_optimizations.reference | 5 +++++ .../02940_system_stacktrace_optimizations.sh | 20 +++++++++++++++++++ 2 files changed, 25 insertions(+) create mode 100644 tests/queries/0_stateless/02940_system_stacktrace_optimizations.reference create mode 100755 tests/queries/0_stateless/02940_system_stacktrace_optimizations.sh diff --git a/tests/queries/0_stateless/02940_system_stacktrace_optimizations.reference b/tests/queries/0_stateless/02940_system_stacktrace_optimizations.reference new file mode 100644 index 00000000000..f08b8ee767b --- /dev/null +++ b/tests/queries/0_stateless/02940_system_stacktrace_optimizations.reference @@ -0,0 +1,5 @@ +thread = 0 +thread != 0 +Send signal to +thread_name = 'foo' +Send signal to 0 threads (total) diff --git a/tests/queries/0_stateless/02940_system_stacktrace_optimizations.sh b/tests/queries/0_stateless/02940_system_stacktrace_optimizations.sh new file mode 100755 index 00000000000..0e23bb6c42b --- /dev/null +++ b/tests/queries/0_stateless/02940_system_stacktrace_optimizations.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash +# Tags: no-parallel + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CUR_DIR"/../shell_config.sh + +# NOTE: because the output is piped through grep, messages like "Cannot obtain a stack trace for thread {}" are ignored automatically, which is the intention. 
+ +# no message at all +echo "thread = 0" +$CLICKHOUSE_CLIENT --allow_repeated_settings --send_logs_level=test -nm -q "select * from system.stack_trace where thread_id = 0" |& grep -F -o 'Send signal to' + +# send messages to some threads +echo "thread != 0" +$CLICKHOUSE_CLIENT --allow_repeated_settings --send_logs_level=test -nm -q "select * from system.stack_trace where thread_id != 0 format Null" |& grep -F -o 'Send signal to' | grep -v 'Send signal to 0 threads (total)' + +# there is no thread with comm="foo", so no signals will be sent +echo "thread_name = 'foo'" +$CLICKHOUSE_CLIENT --allow_repeated_settings --send_logs_level=test -nm -q "select * from system.stack_trace where thread_name = 'foo' format Null" |& grep -F -o 'Send signal to 0 threads (total)' From f876bea0500e3d150609b2cd79878dd04d36b2dd Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Fri, 8 Dec 2023 14:58:44 +0100 Subject: [PATCH 2/2] Add support for system.stack_trace filtering optimizations for analyzer Signed-off-by: Azat Khuzhin --- .../System/StorageSystemStackTrace.cpp | 84 ++++++++++++++++--- src/Storages/System/StorageSystemStackTrace.h | 7 +- 2 files changed, 75 insertions(+), 16 deletions(-) diff --git a/src/Storages/System/StorageSystemStackTrace.cpp b/src/Storages/System/StorageSystemStackTrace.cpp index 477b784952e..efd425faa97 100644 --- a/src/Storages/System/StorageSystemStackTrace.cpp +++ b/src/Storages/System/StorageSystemStackTrace.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include @@ -24,9 +25,14 @@ #include #include #include +#include #include +#include #include +#include +#include #include +#include #include @@ -161,7 +167,7 @@ bool wait(int timeout_ms) } using ThreadIdToName = std::unordered_map>; -ThreadIdToName getFilteredThreadNames(ASTPtr query, ContextPtr context, const PaddedPODArray & thread_ids, Poco::Logger * log) +ThreadIdToName getFilteredThreadNames(const ActionsDAG::Node * predicate, ContextPtr context, const PaddedPODArray & thread_ids, 
Poco::Logger * log) { ThreadIdToName tid_to_name; MutableColumnPtr all_thread_names = ColumnString::create(); @@ -192,7 +198,7 @@ ThreadIdToName getFilteredThreadNames(ASTPtr query, ContextPtr context, const Pa LOG_TEST(log, "Read {} thread names for {} threads, took {} ms", tid_to_name.size(), thread_ids.size(), watch.elapsedMilliseconds()); Block block { ColumnWithTypeAndName(std::move(all_thread_names), std::make_shared(), "thread_name") }; - VirtualColumnUtils::filterBlockWithQuery(query, block, context); + VirtualColumnUtils::filterBlockWithPredicate(predicate, block, context); ColumnPtr thread_names = std::move(block.getByPosition(0).column); std::unordered_set filtered_thread_names; @@ -217,14 +223,16 @@ ThreadIdToName getFilteredThreadNames(ASTPtr query, ContextPtr context, const Pa /// We must wait for every thread one by one sequentially, /// because there is a limit on number of queued signals in OS and otherwise signals may get lost. /// Also, non-RT signals are not delivered if previous signal is handled right now (by default; but we use RT signals). -class StorageSystemStackTraceSource : public ISource +class StackTraceSource : public ISource { public: - StorageSystemStackTraceSource(const Names & column_names, Block header_, const ASTPtr query_, ContextPtr context_, UInt64 max_block_size_, Poco::Logger * log_) + StackTraceSource(const Names & column_names, Block header_, ASTPtr && query_, ActionsDAGPtr && filter_dag_, ContextPtr context_, UInt64 max_block_size_, Poco::Logger * log_) : ISource(header_) , context(context_) , header(std::move(header_)) - , query(query_) + , query(std::move(query_)) + , filter_dag(std::move(filter_dag_)) + , predicate(filter_dag ? 
filter_dag->getOutputs().at(0) : nullptr) , max_block_size(max_block_size_) , pipe_read_timeout_ms(static_cast(context->getSettingsRef().storage_system_stack_trace_pipe_read_timeout_ms.totalMilliseconds())) , log(log_) @@ -259,7 +267,7 @@ protected: ThreadIdToName thread_names; if (read_thread_names) - thread_names = getFilteredThreadNames(query, context, thread_ids_data, log); + thread_names = getFilteredThreadNames(predicate, context, thread_ids_data, log); for (UInt64 tid : thread_ids_data) { @@ -343,6 +351,8 @@ private: ContextPtr context; Block header; const ASTPtr query; + const ActionsDAGPtr filter_dag; + const ActionsDAG::Node * predicate; const size_t max_block_size; const int pipe_read_timeout_ms; @@ -372,11 +382,55 @@ private: } Block block { ColumnWithTypeAndName(std::move(all_thread_ids), std::make_shared(), "thread_id") }; - VirtualColumnUtils::filterBlockWithQuery(query, block, context); + VirtualColumnUtils::filterBlockWithPredicate(predicate, block, context); + return block.getByPosition(0).column; } }; +class ReadFromSystemStackTrace : public SourceStepWithFilter +{ +public: + std::string getName() const override { return "ReadFromSystemStackTrace"; } + + void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override + { + auto filter_actions_dag = ActionsDAG::buildFilterActionsDAG(filter_nodes.nodes, {}, context); + Pipe pipe(std::make_shared( + column_names, + getOutputStream().header, + std::move(query), + std::move(filter_actions_dag), + context, + max_block_size, + log)); + pipeline.init(std::move(pipe)); + } + + ReadFromSystemStackTrace( + const Names & column_names_, + Block sample_block, + ASTPtr && query_, + ContextPtr context_, + size_t max_block_size_, + Poco::Logger * log_) + : SourceStepWithFilter(DataStream{.header = std::move(sample_block)}) + , column_names(column_names_) + , query(query_) + , context(std::move(context_)) + , max_block_size(max_block_size_) + , log(log_) + { + } + +private: + 
Names column_names; + ASTPtr query; + ContextPtr context; + size_t max_block_size; + Poco::Logger * log; +}; + } @@ -412,23 +466,27 @@ StorageSystemStackTrace::StorageSystemStackTrace(const StorageID & table_id_) } -Pipe StorageSystemStackTrace::read( +void StorageSystemStackTrace::read( + QueryPlan & query_plan, const Names & column_names, const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info, ContextPtr context, QueryProcessingStage::Enum /*processed_stage*/, - const size_t max_block_size, - const size_t /*num_streams*/) + size_t max_block_size, + size_t /*num_streams*/) { storage_snapshot->check(column_names); - return Pipe(std::make_shared( + Block sample_block = storage_snapshot->metadata->getSampleBlock(); + + auto reading = std::make_unique( column_names, - storage_snapshot->metadata->getSampleBlock(), + sample_block, query_info.query->clone(), context, max_block_size, - log)); + log); + query_plan.addStep(std::move(reading)); } } diff --git a/src/Storages/System/StorageSystemStackTrace.h b/src/Storages/System/StorageSystemStackTrace.h index 9f15499ce90..18216cea1bd 100644 --- a/src/Storages/System/StorageSystemStackTrace.h +++ b/src/Storages/System/StorageSystemStackTrace.h @@ -25,14 +25,15 @@ public: String getName() const override { return "SystemStackTrace"; } - Pipe read( + void read( + QueryPlan & query_plan, const Names & column_names, const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info, ContextPtr context, - QueryProcessingStage::Enum processed_stage, + QueryProcessingStage::Enum /*processed_stage*/, size_t max_block_size, - size_t num_streams) override; + size_t /*num_streams*/) override; bool isSystemStorage() const override { return true; }