Add repeatable uniq ID for processor and step

This commit is contained in:
qhsong 2024-05-08 16:18:13 +08:00 committed by qhsong
parent bf2edb972e
commit 83b79fce83
14 changed files with 73 additions and 11 deletions

View File

@ -221,6 +221,16 @@ bool ThreadStatus::isQueryCanceled() const
return false;
}
size_t ThreadStatus::incrStepIndex()
{
return ++(*local_data.step_count);
}
size_t ThreadStatus::incrProcessorIndex()
{
return ++(*local_data.processor_count);
}
ThreadStatus::~ThreadStatus()
{
flushUntrackedMemory();

View File

@ -10,6 +10,7 @@
#include <boost/noncopyable.hpp>
#include <atomic>
#include <functional>
#include <map>
#include <memory>
@ -90,6 +91,11 @@ public:
String query_for_logs;
UInt64 normalized_query_hash = 0;
//QueryPlan can not build parallel, but processor may build parallel in expand() function.
//so we use atomic_size_t for processor_count
std::shared_ptr<size_t> step_count = std::make_shared<size_t>(0);
std::shared_ptr<std::atomic_size_t> processor_count = std::make_shared<std::atomic_size_t>(0);
QueryIsCanceledPredicate query_is_canceled_predicate = {};
};
@ -309,6 +315,9 @@ public:
void initGlobalProfiler(UInt64 global_profiler_real_time_period, UInt64 global_profiler_cpu_time_period);
size_t incrStepIndex();
size_t incrProcessorIndex();
private:
void applyGlobalSettings();
void applyQuerySettings();

View File

@ -1336,7 +1336,9 @@ private:
std::shared_ptr<Clusters> getClustersImpl(std::lock_guard<std::mutex> & lock) const;
/// Throttling
public:
ThrottlerPtr getReplicatedFetchesThrottler() const;
ThrottlerPtr getReplicatedSendsThrottler() const;

View File

@ -42,6 +42,8 @@ ColumnsDescription ProcessorProfileLogElement::getColumnsDescription()
{"input_bytes", std::make_shared<DataTypeUInt64>(), "The number of bytes consumed by processor."},
{"output_rows", std::make_shared<DataTypeUInt64>(), "The number of rows generated by processor."},
{"output_bytes", std::make_shared<DataTypeUInt64>(), "The number of bytes generated by processor."},
{"processor_uniq_id", std::make_shared<DataTypeString>(), "The uniq processor id in pipeline."},
{"step_uniq_id", std::make_shared<DataTypeString>(), "The uniq step id in plan."},
};
}
@ -75,6 +77,8 @@ void ProcessorProfileLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(input_bytes);
columns[i++]->insert(output_rows);
columns[i++]->insert(output_bytes);
columns[i++]->insert(processor_uniq_id);
columns[i++]->insert(step_uniq_id);
}

View File

@ -17,12 +17,14 @@ struct ProcessorProfileLogElement
UInt64 id{};
std::vector<UInt64> parent_ids;
UInt64 plan_step{};
UInt64 plan_step;
UInt64 plan_group{};
String initial_query_id;
String query_id;
String processor_name;
String processor_uniq_id;
String step_uniq_id;
/// Milliseconds spend in IProcessor::work()
UInt32 elapsed_us{};

View File

@ -460,6 +460,8 @@ void logQueryFinish(
processor_elem.plan_step = reinterpret_cast<std::uintptr_t>(processor->getQueryPlanStep());
processor_elem.plan_group = processor->getQueryPlanStepGroup();
processor_elem.processor_uniq_id = processor->getUniqID();
processor_elem.step_uniq_id = processor->getStepUniqID();
processor_elem.processor_name = processor->getName();

View File

@ -79,7 +79,7 @@ bool ExecutionThreadContext::executeTask()
if (trace_processors)
{
span = std::make_unique<OpenTelemetry::SpanHolder>(node->processor->getName());
span = std::make_unique<OpenTelemetry::SpanHolder>(node->processor->getUniqID());
span->addAttribute("thread_number", thread_number);
}
std::optional<Stopwatch> execution_time_watch;

View File

@ -3,7 +3,9 @@
#include <memory>
#include <Processors/Port.h>
#include <Common/Stopwatch.h>
#include <Common/CurrentThread.h>
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <Interpreters/Context.h>
class EventCounter;
@ -121,7 +123,10 @@ protected:
OutputPorts outputs;
public:
IProcessor() = default;
IProcessor()
{
setProcessorIndex();
}
IProcessor(InputPorts inputs_, OutputPorts outputs_)
: inputs(std::move(inputs_)), outputs(std::move(outputs_))
@ -130,9 +135,16 @@ public:
port.processor = this;
for (auto & port : outputs)
port.processor = this;
setProcessorIndex();
}
void setProcessorIndex()
{
processor_index = CurrentThread::get().incrProcessorIndex();
}
virtual String getName() const = 0;
String getUniqID() const { return fmt::format("{}_{}", getName(), processor_index); }
enum class Status
{
@ -300,11 +312,16 @@ public:
/// Step of QueryPlan from which processor was created.
void setQueryPlanStep(IQueryPlanStep * step, size_t group = 0)
{
query_plan_step = step;
if (step != nullptr)
{
query_plan_step = step;
step_uniq_id = step->getUniqID();
}
query_plan_step_group = group;
}
IQueryPlanStep * getQueryPlanStep() const { return query_plan_step; }
const String &getStepUniqID() const { return step_uniq_id; }
size_t getQueryPlanStepGroup() const { return query_plan_step_group; }
uint64_t getElapsedUs() const { return elapsed_us; }
@ -392,7 +409,10 @@ private:
size_t stream_number = NO_STREAM;
IQueryPlanStep * query_plan_step = nullptr;
String step_uniq_id;
size_t query_plan_step_group = 0;
size_t processor_index = 0;
};

View File

@ -2,6 +2,9 @@
#include <Core/Block.h>
#include <Core/SortDescription.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <fmt/core.h>
#include <Common/CurrentThread.h>
#include <Interpreters/Context.h>
namespace DB
{
@ -71,6 +74,10 @@ using QueryPlanRawPtrs = std::list<QueryPlan *>;
class IQueryPlanStep
{
public:
IQueryPlanStep()
{
step_index = CurrentThread::get().incrStepIndex();
}
virtual ~IQueryPlanStep() = default;
virtual String getName() const = 0;
@ -138,7 +145,7 @@ public:
}
virtual bool canUpdateInputStream() const { return false; }
String getUniqID() const { return fmt::format("{}_{}", getName(), step_index); }
protected:
virtual void updateOutputStream() { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented"); }
@ -153,6 +160,9 @@ protected:
Processors processors;
static void describePipeline(const Processors & processors, FormatSettings & settings);
private:
size_t step_index = 0;
};
using QueryPlanStepPtr = std::unique_ptr<IQueryPlanStep>;

View File

@ -206,6 +206,7 @@ QueryPipelineBuilderPtr QueryPlan::buildQueryPipeline(
static void explainStep(const IQueryPlanStep & step, JSONBuilder::JSONMap & map, const QueryPlan::ExplainPlanOptions & options)
{
map.add("Node Type", step.getName());
map.add("Node Id", step.getUniqID());
if (options.description)
{

View File

@ -400,10 +400,10 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesRightLe
left->pipe.collected_processors = collected_processors;
/// Collect the NEW processors for the right pipeline.
QueryPipelineProcessorsCollector collector(*right);
/// Remember the last step of the right pipeline.
IQueryPlanStep * step = right->pipe.processors->back()->getQueryPlanStep();
/// Collect the NEW processors for the right pipeline.
QueryPipelineProcessorsCollector collector(*right, step);
/// In case joined subquery has totals, and we don't, add default chunk to totals.
bool default_totals = false;

View File

@ -113,7 +113,7 @@ void printPipelineCompact(const Processors & processors, WriteBuffer & out, bool
if (item.first != nullptr)
{
out << " subgraph cluster_" << next_step << " {\n";
out << " label =\"" << item.first->getName() << "\";\n";
out << " label =\"" << item.first->getUniqID() << "\";\n";
out << " style=filled;\n";
out << " color=lightgrey;\n";
out << " node [style=filled,color=white];\n";
@ -125,7 +125,7 @@ void printPipelineCompact(const Processors & processors, WriteBuffer & out, bool
for (const auto & node : item.second)
{
const auto & processor = node->agents.front();
out << " n" << node->id << " [label=\"" << processor->getName();
out << " n" << node->id << " [label=\"" << processor->getUniqID();
if (node->agents.size() > 1)
out << " × " << node->agents.size();

View File

@ -30,7 +30,7 @@ void printPipeline(const Processors & processors, const Statuses & statuses, Wri
for (const auto & processor : processors)
{
const auto & description = processor->getDescription();
out << " n" << get_proc_id(*processor) << "[label=\"" << processor->getName() << (description.empty() ? "" : ":") << description;
out << " n" << get_proc_id(*processor) << "[label=\"" << processor->getUniqID() << (description.empty() ? "" : ":") << description;
if (statuses_iter != statuses.end())
{

View File

@ -29,6 +29,7 @@
Granules: 2/3
-----------------
"Node Type": "ReadFromMergeTree",
"Node Id": "ReadFromMergeTree_0",
"Description": "default.test_index",
"Indexes": [
{
@ -126,6 +127,7 @@
Granules: 3/6
-----------------
"Node Type": "ReadFromMergeTree",
"Node Id": "ReadFromMergeTree_0",
"Description": "default.test_index",
"Indexes": [
{