From 4548957f5a73600d227dcc26926c7678dd284c5b Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 15 Aug 2024 15:07:07 +0000 Subject: [PATCH] Add default status to not-prepared processors. --- src/Processors/Executors/ExecutingGraph.cpp | 4 ++-- src/Processors/Executors/ExecutingGraph.h | 2 +- src/Processors/Executors/PipelineExecutor.cpp | 2 +- src/Processors/IProcessor.cpp | 7 +++++-- src/Processors/IProcessor.h | 2 +- 5 files changed, 10 insertions(+), 7 deletions(-) diff --git a/src/Processors/Executors/ExecutingGraph.cpp b/src/Processors/Executors/ExecutingGraph.cpp index 6d5b60d8159..02ae8af5f52 100644 --- a/src/Processors/Executors/ExecutingGraph.cpp +++ b/src/Processors/Executors/ExecutingGraph.cpp @@ -279,7 +279,7 @@ bool ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue try { auto & processor = *node.processor; - IProcessor::Status last_status = node.last_processor_status; + const auto last_status = node.last_processor_status; IProcessor::Status status = processor.prepare(node.updated_input_ports, node.updated_output_ports); node.last_processor_status = status; @@ -319,7 +319,7 @@ bool ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue node.updated_input_ports.clear(); node.updated_output_ports.clear(); - switch (node.last_processor_status) + switch (*node.last_processor_status) { case IProcessor::Status::NeedData: case IProcessor::Status::PortFull: diff --git a/src/Processors/Executors/ExecutingGraph.h b/src/Processors/Executors/ExecutingGraph.h index 71dcd360a2c..6f91c120d47 100644 --- a/src/Processors/Executors/ExecutingGraph.h +++ b/src/Processors/Executors/ExecutingGraph.h @@ -92,7 +92,7 @@ public: std::exception_ptr exception; /// Last state for profiling. - IProcessor::Status last_processor_status = IProcessor::Status::NeedData; + std::optional last_processor_status; /// Ports which have changed their state since last processor->prepare() call. /// They changed when neighbour processors interact with connected ports. diff --git a/src/Processors/Executors/PipelineExecutor.cpp b/src/Processors/Executors/PipelineExecutor.cpp index 82cad471a29..d4630f21688 100644 --- a/src/Processors/Executors/PipelineExecutor.cpp +++ b/src/Processors/Executors/PipelineExecutor.cpp @@ -432,7 +432,7 @@ String PipelineExecutor::dumpPipeline() const } } - std::vector statuses; + std::vector> statuses; std::vector proc_list; statuses.reserve(graph->nodes.size()); proc_list.reserve(graph->nodes.size()); diff --git a/src/Processors/IProcessor.cpp b/src/Processors/IProcessor.cpp index edb4d662d8b..fc595a7b565 100644 --- a/src/Processors/IProcessor.cpp +++ b/src/Processors/IProcessor.cpp @@ -55,9 +55,12 @@ void IProcessor::dump() const } -std::string IProcessor::statusToName(Status status) +std::string IProcessor::statusToName(std::optional status) { - switch (status) + if (status == std::nullopt) + return "NotStarted"; + + switch (*status) { case Status::NeedData: return "NeedData"; diff --git a/src/Processors/IProcessor.h b/src/Processors/IProcessor.h index f1ce044d92f..02b8a3daa28 100644 --- a/src/Processors/IProcessor.h +++ b/src/Processors/IProcessor.h @@ -162,7 +162,7 @@ public: ExpandPipeline, }; - static std::string statusToName(Status status); + static std::string statusToName(std::optional status); /** Method 'prepare' is responsible for all cheap ("instantaneous": O(1) of data volume, no wait) calculations. *