Merge pull request #63518 from qhsong/dev/profile_uniq_id

Enhance ClickHouse Profile: generate a uniq id for steps and processors
Max Kainov 2024-11-15 12:19:05 +00:00 committed by GitHub
commit f29d7841fe
17 changed files with 276 additions and 53 deletions

View File

@ -161,6 +161,8 @@ Settings:
- `actions` — Prints detailed information about step actions. Default: 0.
- `json` — Prints query plan steps as a row in [JSON](../../interfaces/formats.md#json) format. Default: 0. It is recommended to use [TSVRaw](../../interfaces/formats.md#tabseparatedraw) format to avoid unnecessary escaping.
When `json=1`, step names will contain an additional suffix with a unique step identifier.
Example:
```sql
@ -194,30 +196,25 @@ EXPLAIN json = 1, description = 0 SELECT 1 UNION ALL SELECT 2 FORMAT TSVRaw;
{
"Plan": {
"Node Type": "Union",
"Node Id": "Union_10",
"Plans": [
{
"Node Type": "Expression",
"Node Id": "Expression_13",
"Plans": [
{
"Node Type": "SettingQuotaAndLimits",
"Plans": [
{
"Node Type": "ReadFromStorage"
}
]
"Node Type": "ReadFromStorage",
"Node Id": "ReadFromStorage_0"
}
]
},
{
"Node Type": "Expression",
"Node Id": "Expression_16",
"Plans": [
{
"Node Type": "SettingQuotaAndLimits",
"Plans": [
{
"Node Type": "ReadFromStorage"
}
]
"Node Type": "ReadFromStorage",
"Node Id": "ReadFromStorage_4"
}
]
}
@ -249,6 +246,7 @@ EXPLAIN json = 1, description = 0, header = 1 SELECT 1, 2 + dummy;
{
"Plan": {
"Node Type": "Expression",
"Node Id": "Expression_5",
"Header": [
{
"Name": "1",
@ -261,23 +259,13 @@ EXPLAIN json = 1, description = 0, header = 1 SELECT 1, 2 + dummy;
],
"Plans": [
{
"Node Type": "SettingQuotaAndLimits",
"Node Type": "ReadFromStorage",
"Node Id": "ReadFromStorage_0",
"Header": [
{
"Name": "dummy",
"Type": "UInt8"
}
],
"Plans": [
{
"Node Type": "ReadFromStorage",
"Header": [
{
"Name": "dummy",
"Type": "UInt8"
}
]
}
]
}
]
@ -351,17 +339,31 @@ EXPLAIN json = 1, actions = 1, description = 0 SELECT 1 FORMAT TSVRaw;
{
"Plan": {
"Node Type": "Expression",
"Node Id": "Expression_5",
"Expression": {
"Inputs": [],
"Inputs": [
{
"Name": "dummy",
"Type": "UInt8"
}
],
"Actions": [
{
"Node Type": "Column",
"Node Type": "INPUT",
"Result Type": "UInt8",
"Result Type": "Column",
"Result Name": "dummy",
"Arguments": [0],
"Removed Arguments": [0],
"Result": 0
},
{
"Node Type": "COLUMN",
"Result Type": "UInt8",
"Result Name": "1",
"Column": "Const(UInt8)",
"Arguments": [],
"Removed Arguments": [],
"Result": 0
"Result": 1
}
],
"Outputs": [
@ -370,17 +372,12 @@ EXPLAIN json = 1, actions = 1, description = 0 SELECT 1 FORMAT TSVRaw;
"Type": "UInt8"
}
],
"Positions": [0],
"Project Input": true
"Positions": [1]
},
"Plans": [
{
"Node Type": "SettingQuotaAndLimits",
"Plans": [
{
"Node Type": "ReadFromStorage"
}
]
"Node Type": "ReadFromStorage",
"Node Id": "ReadFromStorage_0"
}
]
}
@ -396,6 +393,8 @@ Settings:
- `graph` — Prints a graph described in the [DOT](https://en.wikipedia.org/wiki/DOT_(graph_description_language)) graph description language. Default: 0.
- `compact` — Prints graph in compact mode if `graph` setting is enabled. Default: 1.
When `compact=0` and `graph=1`, processor names will contain an additional suffix with a unique processor identifier.
Example:
```sql

View File

@ -204,6 +204,16 @@ bool ThreadStatus::isQueryCanceled() const
return false;
}
size_t ThreadStatus::getNextPlanStepIndex() const
{
return local_data.plan_step_index->fetch_add(1);
}
size_t ThreadStatus::getNextPipelineProcessorIndex() const
{
return local_data.pipeline_processor_index->fetch_add(1);
}
ThreadStatus::~ThreadStatus()
{
flushUntrackedMemory();
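
To make the indexing scheme concrete, here is a minimal standalone sketch of the counter pattern these accessors rely on (simplified types, not the actual ClickHouse API):

```cpp
#include <atomic>
#include <cstddef>
#include <memory>

// Stand-in for ThreadStatus local data: the shared_ptr keeps the struct
// copyable even though std::atomic itself is not, and lets every thread
// of the same query group hand out indexes from one shared counter.
struct LocalDataSketch
{
    std::shared_ptr<std::atomic_size_t> plan_step_index = std::make_shared<std::atomic_size_t>(0);
};

size_t getNextPlanStepIndex(LocalDataSketch & local_data)
{
    // fetch_add returns the value *before* the increment, so concurrent
    // callers each observe a distinct index: 0, 1, 2, ...
    return local_data.plan_step_index->fetch_add(1);
}
```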

View File

@ -11,6 +11,7 @@
#include <boost/noncopyable.hpp>
#include <atomic>
#include <functional>
#include <memory>
#include <mutex>
@ -90,6 +91,11 @@ public:
String query_for_logs;
UInt64 normalized_query_hash = 0;
// Since processors might be added on the fly within the expand() function, we use atomic_size_t.
// These two fields are used for EXPLAIN PLAN / PIPELINE.
std::shared_ptr<std::atomic_size_t> plan_step_index = std::make_shared<std::atomic_size_t>(0);
std::shared_ptr<std::atomic_size_t> pipeline_processor_index = std::make_shared<std::atomic_size_t>(0);
QueryIsCanceledPredicate query_is_canceled_predicate = {};
};
@ -313,6 +319,9 @@ public:
void initGlobalProfiler(UInt64 global_profiler_real_time_period, UInt64 global_profiler_cpu_time_period);
size_t getNextPlanStepIndex() const;
size_t getNextPipelineProcessorIndex() const;
private:
void applyGlobalSettings();
void applyQuerySettings();

View File

@ -48,6 +48,8 @@ ColumnsDescription ProcessorProfileLogElement::getColumnsDescription()
{"input_bytes", std::make_shared<DataTypeUInt64>(), "The number of bytes consumed by processor."},
{"output_rows", std::make_shared<DataTypeUInt64>(), "The number of rows generated by processor."},
{"output_bytes", std::make_shared<DataTypeUInt64>(), "The number of bytes generated by processor."},
{"processor_uniq_id", std::make_shared<DataTypeString>(), "The uniq processor id in pipeline."},
{"step_uniq_id", std::make_shared<DataTypeString>(), "The uniq step id in plan."},
};
}
@ -83,6 +85,8 @@ void ProcessorProfileLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(input_bytes);
columns[i++]->insert(output_rows);
columns[i++]->insert(output_bytes);
columns[i++]->insert(processor_uniq_id);
columns[i++]->insert(step_uniq_id);
}
void logProcessorProfile(ContextPtr context, const Processors & processors)
@ -120,6 +124,8 @@ void logProcessorProfile(ContextPtr context, const Processors & processors)
processor_elem.plan_step_name = processor->getPlanStepName();
processor_elem.plan_step_description = processor->getPlanStepDescription();
processor_elem.plan_group = processor->getQueryPlanStepGroup();
processor_elem.processor_uniq_id = processor->getUniqID();
processor_elem.step_uniq_id = processor->getStepUniqID();
processor_elem.processor_name = processor->getName();

View File

@ -17,7 +17,7 @@ struct ProcessorProfileLogElement
UInt64 id{};
std::vector<UInt64> parent_ids;
UInt64 plan_step{};
UInt64 plan_step;
UInt64 plan_group{};
String plan_step_name;
String plan_step_description;
@ -25,6 +25,8 @@ struct ProcessorProfileLogElement
String initial_query_id;
String query_id;
String processor_name;
String processor_uniq_id;
String step_uniq_id;
/// Milliseconds spend in IProcessor::work()
UInt64 elapsed_us{};

View File

@ -79,7 +79,7 @@ bool ExecutionThreadContext::executeTask()
if (trace_processors)
{
span = std::make_unique<OpenTelemetry::SpanHolder>(node->processor->getName());
span = std::make_unique<OpenTelemetry::SpanHolder>(node->processor->getUniqID());
span->addAttribute("thread_number", thread_number);
}
std::optional<Stopwatch> execution_time_watch;

View File

@ -10,6 +10,20 @@
namespace DB
{
IProcessor::IProcessor()
{
processor_index = CurrentThread::isInitialized() ? CurrentThread::get().getNextPipelineProcessorIndex() : 0;
}
IProcessor::IProcessor(InputPorts inputs_, OutputPorts outputs_) : inputs(std::move(inputs_)), outputs(std::move(outputs_))
{
for (auto & port : inputs)
port.processor = this;
for (auto & port : outputs)
port.processor = this;
processor_index = CurrentThread::isInitialized() ? CurrentThread::get().getNextPipelineProcessorIndex() : 0;
}
void IProcessor::setQueryPlanStep(IQueryPlanStep * step, size_t group)
{
query_plan_step = step;
@ -18,6 +32,7 @@ void IProcessor::setQueryPlanStep(IQueryPlanStep * step, size_t group)
{
plan_step_name = step->getName();
plan_step_description = step->getStepDescription();
step_uniq_id = step->getUniqID();
}
}

View File

@ -1,9 +1,12 @@
#pragma once
#include <memory>
#include <Interpreters/Context.h>
#include <Processors/Port.h>
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <Common/CurrentThread.h>
#include <Common/Stopwatch.h>
#include <memory>
class EventCounter;
@ -121,19 +124,14 @@ protected:
OutputPorts outputs;
public:
IProcessor() = default;
IProcessor();
IProcessor(InputPorts inputs_, OutputPorts outputs_)
: inputs(std::move(inputs_)), outputs(std::move(outputs_))
{
for (auto & port : inputs)
port.processor = this;
for (auto & port : outputs)
port.processor = this;
}
IProcessor(InputPorts inputs_, OutputPorts outputs_);
virtual String getName() const = 0;
String getUniqID() const { return fmt::format("{}_{}", getName(), processor_index); }
enum class Status : uint8_t
{
/// Processor needs some data at its inputs to proceed.
@ -314,6 +312,7 @@ public:
void setQueryPlanStep(IQueryPlanStep * step, size_t group = 0);
IQueryPlanStep * getQueryPlanStep() const { return query_plan_step; }
const String & getStepUniqID() const { return step_uniq_id; }
size_t getQueryPlanStepGroup() const { return query_plan_step_group; }
const String & getPlanStepName() const { return plan_step_name; }
const String & getPlanStepDescription() const { return plan_step_description; }
@ -407,7 +406,10 @@ private:
size_t stream_number = NO_STREAM;
IQueryPlanStep * query_plan_step = nullptr;
String step_uniq_id;
size_t query_plan_step_group = 0;
size_t processor_index = 0;
String plan_step_name;
String plan_step_description;
};
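
A self-contained sketch of how `getUniqID()` composes the identifier (mock types; the real index comes from the per-query counter reached through `CurrentThread`, as in the constructors above):

```cpp
#include <atomic>
#include <cstddef>
#include <string>

#include <fmt/core.h>

// Stand-in for the per-query counter normally reached through CurrentThread.
static std::atomic_size_t mock_processor_index{0};

struct MockProcessor
{
    MockProcessor() : processor_index(mock_processor_index.fetch_add(1)) {}

    std::string getName() const { return "ExpressionTransform"; }

    // Same "{Name}_{index}" scheme as IProcessor::getUniqID above.
    std::string getUniqID() const { return fmt::format("{}_{}", getName(), processor_index); }

    size_t processor_index;
};

// MockProcessor a, b;
// a.getUniqID() == "ExpressionTransform_0"
// b.getUniqID() == "ExpressionTransform_1"
```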

View File

@ -10,6 +10,11 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
IQueryPlanStep::IQueryPlanStep()
{
step_index = CurrentThread::isInitialized() ? CurrentThread::get().getNextPlanStepIndex() : 0;
}
void IQueryPlanStep::updateInputHeaders(Headers input_headers_)
{
input_headers = std::move(input_headers_);

View File

@ -1,8 +1,13 @@
#pragma once
#include <Common/CurrentThread.h>
#include <Core/Block.h>
#include <Core/SortDescription.h>
#include <Interpreters/Context.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <fmt/core.h>
namespace DB
{
@ -26,6 +31,8 @@ using Headers = std::vector<Header>;
class IQueryPlanStep
{
public:
IQueryPlanStep();
virtual ~IQueryPlanStep() = default;
virtual String getName() const = 0;
@ -77,6 +84,8 @@ public:
/// Updates the input streams of the given step. Used during query plan optimizations.
/// It won't do any validation of new streams, so it is your responsibility to ensure that this update doesn't break anything
String getUniqID() const { return fmt::format("{}_{}", getName(), step_index); }
/// (e.g. you correctly remove / add columns).
void updateInputHeaders(Headers input_headers_);
void updateInputHeader(Header input_header, size_t idx = 0);
@ -95,6 +104,9 @@ protected:
Processors processors;
static void describePipeline(const Processors & processors, FormatSettings & settings);
private:
size_t step_index = 0;
};
using QueryPlanStepPtr = std::unique_ptr<IQueryPlanStep>;
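
Steps and processors draw from two independent counters (`plan_step_index` and `pipeline_processor_index` in `ThreadStatus`), so a step id such as `Expression_16` and a processor id such as `ExpressionTransform_16` are numbered separately and are only unique within their own namespace for a single query. A minimal illustration (hypothetical helper names):

```cpp
#include <atomic>
#include <cstddef>

// Two independent per-query counters, mirroring ThreadStatus above.
static std::atomic_size_t plan_step_index{0};
static std::atomic_size_t pipeline_processor_index{0};

// Hypothetical helpers: a query that creates three steps and two processors,
// in any interleaving, gets step indexes {0, 1, 2} and processor indexes
// {0, 1}; the two sequences never interact.
size_t nextStepIndex()      { return plan_step_index.fetch_add(1); }
size_t nextProcessorIndex() { return pipeline_processor_index.fetch_add(1); }
```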

View File

@ -207,6 +207,7 @@ QueryPipelineBuilderPtr QueryPlan::buildQueryPipeline(
static void explainStep(const IQueryPlanStep & step, JSONBuilder::JSONMap & map, const QueryPlan::ExplainPlanOptions & options)
{
map.add("Node Type", step.getName());
map.add("Node Id", step.getUniqID());
if (options.description)
{

View File

@ -398,10 +398,10 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesRightLe
left->pipe.collected_processors = collected_processors;
/// Collect the NEW processors for the right pipeline.
QueryPipelineProcessorsCollector collector(*right);
/// Remember the last step of the right pipeline.
IQueryPlanStep * step = right->pipe.processors->back()->getQueryPlanStep();
/// Collect the NEW processors for the right pipeline.
QueryPipelineProcessorsCollector collector(*right, step);
/// In case joined subquery has totals, and we don't, add default chunk to totals.
bool default_totals = false;

View File

@ -30,7 +30,7 @@ void printPipeline(const Processors & processors, const Statuses & statuses, Wri
for (const auto & processor : processors)
{
const auto & description = processor->getDescription();
out << " n" << get_proc_id(*processor) << "[label=\"" << processor->getName() << (description.empty() ? "" : ":") << description;
out << " n" << get_proc_id(*processor) << "[label=\"" << processor->getUniqID() << (description.empty() ? "" : ":") << description;
if (statuses_iter != statuses.end())
{

View File

@ -29,6 +29,7 @@
Granules: 2/3
-----------------
"Node Type": "ReadFromMergeTree",
"Node Id": "ReadFromMergeTree_0",
"Description": "default.test_index",
"Indexes": [
{
@ -132,6 +133,7 @@
Granules: 3/6
-----------------
"Node Type": "ReadFromMergeTree",
"Node Id": "ReadFromMergeTree_0",
"Description": "default.test_index",
"Indexes": [
{

View File

@ -2,20 +2,25 @@
{
"Plan": {
"Node Type": "Union",
"Node Id": "Union_10",
"Plans": [
{
"Node Type": "Expression",
"Node Id": "Expression_13",
"Plans": [
{
"Node Type": "ReadFromStorage"
"Node Type": "ReadFromStorage",
"Node Id": "ReadFromStorage_0"
}
]
},
{
"Node Type": "Expression",
"Node Id": "Expression_16",
"Plans": [
{
"Node Type": "ReadFromStorage"
"Node Type": "ReadFromStorage",
"Node Id": "ReadFromStorage_4"
}
]
}
@ -35,6 +40,7 @@
}
--------
"Node Type": "Aggregating",
"Node Id": "Aggregating_3",
"Header": [
{
"Name": "__table1.number",
@ -73,13 +79,16 @@
],
--------
"Node Type": "ArrayJoin",
"Node Id": "ArrayJoin_4",
"Left": false,
"Columns": ["__table1.x", "__table1.y"],
--------
"Node Type": "Distinct",
"Node Id": "Distinct_4",
"Columns": ["intDiv(__table1.number, 2_UInt8)", "intDiv(__table1.number, 3_UInt8)"],
--
"Node Type": "Distinct",
"Node Id": "Distinct_3",
"Columns": ["intDiv(__table1.number, 2_UInt8)", "intDiv(__table1.number, 3_UInt8)"],
--------
"Sort Description": [

View File

@ -0,0 +1,91 @@
[\n {\n "Plan": {\n "Node Type": "CreatingSets",\n "Node Id": "CreatingSets_22",\n "Description": "Create sets before main query execution",\n "Plans": [\n {\n "Node Type": "Expression",\n "Node Id": "Expression_18",\n "Description": "(Project names + (Before ORDER BY + Projection) [lifted up part])",\n "Plans": [\n {\n "Node Type": "Sorting",\n "Node Id": "Sorting_7",\n "Description": "Sorting for ORDER BY",\n "Plans": [\n {\n "Node Type": "Expression",\n "Node Id": "Expression_16",\n "Description": "(Before ORDER BY + Projection)",\n "Plans": [\n {\n "Node Type": "Aggregating",\n "Node Id": "Aggregating_4",\n "Plans": [\n {\n "Node Type": "Expression",\n "Node Id": "Expression_3",\n "Description": "Before GROUP BY",\n "Plans": [\n {\n "Node Type": "Filter",\n "Node Id": "Filter_14",\n "Description": "(WHERE + Change column names to column identifiers)",\n "Plans": [\n {\n "Node Type": "ReadFromMergeTree",\n "Node Id": "ReadFromMergeTree_0",\n "Description": "default.t"\n }\n ]\n }\n ]\n }\n ]\n }\n ]\n }\n ]\n }\n ]\n }\n ]\n }\n }\n]
digraph
{
rankdir="LR";
{ node [shape = rect]
n0[label="MergeTreeSelect_5"];
n1[label="FilterTransform_6"];
n2[label="ExpressionTransform_7"];
n3[label="AggregatingTransform_8"];
n4[label="Resize_9"];
n5[label="ExpressionTransform_10"];
n6[label="ExpressionTransform_11"];
n7[label="ExpressionTransform_12"];
n8[label="ExpressionTransform_13"];
n9[label="PartialSortingTransform_14"];
n10[label="PartialSortingTransform_15"];
n11[label="PartialSortingTransform_16"];
n12[label="PartialSortingTransform_17"];
n13[label="LimitsCheckingTransform_18"];
n14[label="LimitsCheckingTransform_19"];
n15[label="LimitsCheckingTransform_20"];
n16[label="LimitsCheckingTransform_21"];
n17[label="MergeSortingTransform_22"];
n18[label="MergeSortingTransform_23"];
n19[label="MergeSortingTransform_24"];
n20[label="MergeSortingTransform_25"];
n21[label="MergingSortedTransform_26"];
n22[label="ExpressionTransform_27"];
}
n0 -> n1;
n1 -> n2;
n2 -> n3;
n3 -> n4;
n4 -> n5;
n4 -> n6;
n4 -> n7;
n4 -> n8;
n5 -> n9;
n6 -> n10;
n7 -> n11;
n8 -> n12;
n9 -> n13;
n10 -> n14;
n11 -> n15;
n12 -> n16;
n13 -> n17;
n14 -> n18;
n15 -> n19;
n16 -> n20;
n17 -> n21;
n18 -> n21;
n19 -> n21;
n20 -> n21;
n21 -> n22;
}
('AggregatingTransform_8','Aggregating_4')
('ConvertingAggregatedToChunksTransform_32','')
('CreatingSetsTransform_2','CreatingSet_19')
('EmptySink_3','')
('ExpressionTransform_1','Expression_21')
('ExpressionTransform_10','Expression_16')
('ExpressionTransform_11','Expression_16')
('ExpressionTransform_12','Expression_16')
('ExpressionTransform_13','Expression_16')
('ExpressionTransform_27','Expression_18')
('ExpressionTransform_7','Expression_3')
('FilterTransform_6','Filter_14')
('LazyOutputFormat_29','')
('LimitsCheckingTransform_18','Sorting_7')
('LimitsCheckingTransform_19','Sorting_7')
('LimitsCheckingTransform_20','Sorting_7')
('LimitsCheckingTransform_21','Sorting_7')
('LimitsCheckingTransform_28','')
('MergeSortingTransform_22','Sorting_7')
('MergeSortingTransform_23','Sorting_7')
('MergeSortingTransform_24','Sorting_7')
('MergeSortingTransform_25','Sorting_7')
('MergeTreeSelect_5','ReadFromMergeTree_0')
('MergingSortedTransform_26','Sorting_7')
('NullSource_30','')
('NullSource_31','')
('NumbersRange_0','ReadFromSystemNumbers_9')
('PartialSortingTransform_14','Sorting_7')
('PartialSortingTransform_15','Sorting_7')
('PartialSortingTransform_16','Sorting_7')
('PartialSortingTransform_17','Sorting_7')
('Resize_9','Aggregating_4')

View File

@ -0,0 +1,60 @@
#!/usr/bin/env bash
# Tags: no-random-settings, no-random-merge-tree-settings
set -e
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
opts=(
--enable_analyzer=1
--max_threads=4
)
$CLICKHOUSE_CLIENT -q "
CREATE TABLE t
(
a UInt32
)
ENGINE = MergeTree
ORDER BY a;
INSERT INTO t SELECT number FROM numbers_mt(1e6);
OPTIMIZE TABLE t FINAL;
"
query="
WITH t0 AS
(
SELECT *
FROM numbers(1000)
)
SELECT a * 3
FROM t
WHERE a IN (t0)
GROUP BY a
ORDER BY a
"
$CLICKHOUSE_CLIENT "${opts[@]}" -q "EXPLAIN json=1 $query"
printf "\n\n"
$CLICKHOUSE_CLIENT "${opts[@]}" -q "SELECT replaceRegexpAll(explain, '(\w+)\(.*\)', '\\1') FROM (EXPLAIN PIPELINE compact=0,graph=1 $query)"
printf "\n\n"
query_id="03269_explain_unique_ids_$RANDOM$RANDOM"
$CLICKHOUSE_CLIENT "${opts[@]}" --log_processors_profiles=1 --query_id="$query_id" --format Null -q "$query"
$CLICKHOUSE_CLIENT -q "
SYSTEM FLUSH LOGS;
SELECT DISTINCT (replaceRegexpAll(processor_uniq_id, '(\w+)\(.*\)', '\\1'), step_uniq_id)
FROM system.processors_profile_log
WHERE query_id = '$query_id'
ORDER BY ALL;
"