From 9a9bedc8ccd7f1ded6c1a237b7452481887651f8 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Mon, 13 Apr 2020 12:02:50 +0300 Subject: [PATCH] Fix test for streams. --- .../Executors/TreeExecutorBlockInputStream.cpp | 14 ++++++++++++-- .../Executors/TreeExecutorBlockInputStream.h | 2 ++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/src/Processors/Executors/TreeExecutorBlockInputStream.cpp b/src/Processors/Executors/TreeExecutorBlockInputStream.cpp index 84fd97f4781..f797fee3ab5 100644 --- a/src/Processors/Executors/TreeExecutorBlockInputStream.cpp +++ b/src/Processors/Executors/TreeExecutorBlockInputStream.cpp @@ -164,7 +164,7 @@ void TreeExecutorBlockInputStream::execute(bool on_totals, bool on_extremes) } }; - while (!stack.empty()) + while (!stack.empty() && !is_cancelled) { IProcessor * node = stack.top(); @@ -295,7 +295,7 @@ void TreeExecutorBlockInputStream::initRowsBeforeLimit() Block TreeExecutorBlockInputStream::readImpl() { - while (true) + while (!is_cancelled) { if (input_port->isFinished()) { @@ -338,6 +338,8 @@ Block TreeExecutorBlockInputStream::readImpl() execute(false, false); } + + return {}; } void TreeExecutorBlockInputStream::setProgressCallback(const ProgressCallback & callback) @@ -373,4 +375,12 @@ void TreeExecutorBlockInputStream::addTotalRowsApprox(size_t value) sources_with_progress.front()->addTotalRowsApprox(value); } +void TreeExecutorBlockInputStream::cancel(bool kill) +{ + IBlockInputStream::cancel(kill); + + for (auto & processor : processors) + processor->cancel(); +} + } diff --git a/src/Processors/Executors/TreeExecutorBlockInputStream.h b/src/Processors/Executors/TreeExecutorBlockInputStream.h index dfe8e66ed09..d96492b3fb8 100644 --- a/src/Processors/Executors/TreeExecutorBlockInputStream.h +++ b/src/Processors/Executors/TreeExecutorBlockInputStream.h @@ -39,6 +39,8 @@ public: String getName() const override { return "TreeExecutor"; } Block getHeader() const override { return root->getOutputs().front().getHeader(); } + void cancel(bool kill) override; + /// This methods does not affect TreeExecutor as IBlockInputStream itself. /// They just passed to all SourceWithProgress processors. void setProgressCallback(const ProgressCallback & callback) final;