Measure time that Process spent in work/NeedData/PortFull

Note, that right now it is done not in IProcessor, but in
ExecutingGraph/ExecutionThreadContext, to avoid lots of changes in the
IProcessor interface, to make review easier, but I'm not against of
change the IProcessor interface to incapsulate it there.

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
Azat Khuzhin 2022-02-06 14:44:34 +03:00
parent 14538f6456
commit 5fd402eaba
3 changed files with 46 additions and 1 deletions

View File

@ -263,7 +263,30 @@ bool ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue
try try
{ {
node.last_processor_status = node.processor->prepare(node.updated_input_ports, node.updated_output_ports); auto & processor = *node.processor;
IProcessor::Status last_status = node.last_processor_status;
IProcessor::Status status = processor.prepare(node.updated_input_ports, node.updated_output_ports);
node.last_processor_status = status;
/// NeedData
if (last_status != IProcessor::Status::NeedData && status == IProcessor::Status::NeedData)
{
processor.need_data_watch.restart();
}
else if (last_status == IProcessor::Status::NeedData && status != IProcessor::Status::NeedData)
{
processor.need_data_elapsed_us += processor.need_data_watch.elapsedMicroseconds();
}
/// PortFull
if (last_status != IProcessor::Status::PortFull && status == IProcessor::Status::PortFull)
{
processor.port_full_watch.restart();
}
else if (last_status == IProcessor::Status::PortFull && status != IProcessor::Status::PortFull)
{
processor.port_full_elapsed_us += processor.port_full_watch.elapsedMicroseconds();
}
} }
catch (...) catch (...)
{ {

View File

@ -58,6 +58,7 @@ bool ExecutionThreadContext::executeTask()
Stopwatch execution_time_watch; Stopwatch execution_time_watch;
#endif #endif
Stopwatch watch;
try try
{ {
executeJob(node->processor); executeJob(node->processor);
@ -68,6 +69,7 @@ bool ExecutionThreadContext::executeTask()
{ {
node->exception = std::current_exception(); node->exception = std::current_exception();
} }
node->processor->elapsed_us += watch.elapsedMicroseconds();
#ifndef NDEBUG #ifndef NDEBUG
execution_time_ns += execution_time_watch.elapsed(); execution_time_ns += execution_time_watch.elapsed();

View File

@ -2,6 +2,7 @@
#include <memory> #include <memory>
#include <Processors/Port.h> #include <Processors/Port.h>
#include <Common/Stopwatch.h>
class EventCounter; class EventCounter;
@ -299,14 +300,33 @@ public:
IQueryPlanStep * getQueryPlanStep() const { return query_plan_step; } IQueryPlanStep * getQueryPlanStep() const { return query_plan_step; }
size_t getQueryPlanStepGroup() const { return query_plan_step_group; } size_t getQueryPlanStepGroup() const { return query_plan_step_group; }
uint64_t getElapsedUs() const { return elapsed_us; }
uint64_t getNeedDataElapsedUs() const { return need_data_elapsed_us; }
uint64_t getPortFullElapsedUs() const { return port_full_elapsed_us; }
protected: protected:
virtual void onCancel() {} virtual void onCancel() {}
private: private:
/// For:
/// - elapsed_us
friend class ExecutionThreadContext;
/// For
/// - need_data_elapsed_us
/// - port_full_elapsed_us
friend class ExecutingGraph;
std::atomic<bool> is_cancelled{false}; std::atomic<bool> is_cancelled{false};
std::string processor_description; std::string processor_description;
/// For processors_profile_log
uint64_t elapsed_us = 0;
Stopwatch need_data_watch;
uint64_t need_data_elapsed_us = 0;
Stopwatch port_full_watch;
uint64_t port_full_elapsed_us = 0;
size_t stream_number = NO_STREAM; size_t stream_number = NO_STREAM;
IQueryPlanStep * query_plan_step = nullptr; IQueryPlanStep * query_plan_step = nullptr;