diff --git a/dbms/src/Processors/Executors/TreeExecutorBlockInputStream.cpp b/dbms/src/Processors/Executors/TreeExecutorBlockInputStream.cpp index ee482d62f27..44967440957 100644 --- a/dbms/src/Processors/Executors/TreeExecutorBlockInputStream.cpp +++ b/dbms/src/Processors/Executors/TreeExecutorBlockInputStream.cpp @@ -17,7 +17,7 @@ static void checkProcessorHasSingleOutput(IProcessor * processor) /// Check tree invariants (described in TreeExecutor.h). /// Collect sources with progress. -static void validateTree(const Processors & processors, IProcessor * root, std::vector & sources) +static void validateTree(const Processors & processors, IProcessor * root, IProcessor * totals_root, std::vector & sources) { std::unordered_map index; @@ -34,6 +34,8 @@ static void validateTree(const Processors & processors, IProcessor * root, std:: std::stack stack; stack.push(root); + if (totals_root) + stack.push(totals_root); while (!stack.empty()) { @@ -81,18 +83,33 @@ void TreeExecutorBlockInputStream::init() throw Exception("No processors were passed to TreeExecutorBlockInputStream.", ErrorCodes::LOGICAL_ERROR); root = &output_port.getProcessor(); + IProcessor * totals_root = nullptr; - validateTree(processors, root, sources_with_progress); + if (totals_port) + totals_root = &totals_port->getProcessor(); + + validateTree(processors, root, totals_root, sources_with_progress); input_port = std::make_unique(getHeader(), root); connect(output_port, *input_port); input_port->setNeeded(); + + if (totals_port) + { + input_totals_port = std::make_unique(totals_port->getHeader(), root); + connect(*totals_port, *input_totals_port); + input_totals_port->setNeeded(); + } } -void TreeExecutorBlockInputStream::execute() +void TreeExecutorBlockInputStream::execute(bool on_totals) { std::stack stack; - stack.push(root); + + if (on_totals) + stack.push(&totals_port->getProcessor()); + else + stack.push(root); auto prepare_processor = [](IProcessor * processor) { @@ -141,10 +158,6 @@ void TreeExecutorBlockInputStream::execute() break; } case IProcessor::Status::PortFull: - { - stack.pop(); - break; - } case IProcessor::Status::Finished: { stack.pop(); @@ -178,12 +191,21 @@ Block TreeExecutorBlockInputStream::readImpl() while (true) { if (input_port->isFinished()) + { + if (totals_port && !input_totals_port->isFinished()) + { + execute(true); + if (input_totals_port->hasData()) + totals = getHeader().cloneWithColumns(input_totals_port->pull().detachColumns()); + } + return {}; + } if (input_port->hasData()) return getHeader().cloneWithColumns(input_port->pull().detachColumns()); - execute(); + execute(false); } } diff --git a/dbms/src/Processors/Executors/TreeExecutorBlockInputStream.h b/dbms/src/Processors/Executors/TreeExecutorBlockInputStream.h index 8787d3090c1..36f065bae45 100644 --- a/dbms/src/Processors/Executors/TreeExecutorBlockInputStream.h +++ b/dbms/src/Processors/Executors/TreeExecutorBlockInputStream.h @@ -29,6 +29,7 @@ public: for (auto & context : pipe.getContexts()) interpreter_context.emplace_back(context); + totals_port = pipe.getTotalsPort(); processors = std::move(pipe).detachProcessors(); init(); } @@ -49,9 +50,11 @@ protected: private: OutputPort & output_port; + OutputPort * totals_port = nullptr; Processors processors; IProcessor * root = nullptr; std::unique_ptr input_port; + std::unique_ptr input_totals_port; /// Remember sources that support progress. std::vector sources_with_progress; @@ -60,7 +63,7 @@ private: void init(); /// Execute tree step-by-step until root returns next chunk or execution is finished. - void execute(); + void execute(bool on_totals); /// Moved from pipe. std::vector> interpreter_context;