Fix test for streams.

This commit is contained in:
Nikolai Kochetov 2020-04-13 12:02:50 +03:00
parent 29fc8f145b
commit 9a9bedc8cc
2 changed files with 14 additions and 2 deletions

View File

@ -164,7 +164,7 @@ void TreeExecutorBlockInputStream::execute(bool on_totals, bool on_extremes)
}
};
while (!stack.empty())
while (!stack.empty() && !is_cancelled)
{
IProcessor * node = stack.top();
@ -295,7 +295,7 @@ void TreeExecutorBlockInputStream::initRowsBeforeLimit()
Block TreeExecutorBlockInputStream::readImpl()
{
while (true)
while (!is_cancelled)
{
if (input_port->isFinished())
{
@ -338,6 +338,8 @@ Block TreeExecutorBlockInputStream::readImpl()
execute(false, false);
}
return {};
}
void TreeExecutorBlockInputStream::setProgressCallback(const ProgressCallback & callback)
@ -373,4 +375,12 @@ void TreeExecutorBlockInputStream::addTotalRowsApprox(size_t value)
sources_with_progress.front()->addTotalRowsApprox(value);
}
void TreeExecutorBlockInputStream::cancel(bool kill)
{
IBlockInputStream::cancel(kill);
for (auto & processor : processors)
processor->cancel();
}
}

View File

@ -39,6 +39,8 @@ public:
String getName() const override { return "TreeExecutor"; }
Block getHeader() const override { return root->getOutputs().front().getHeader(); }
void cancel(bool kill) override;
/// This methods does not affect TreeExecutor as IBlockInputStream itself.
/// They just passed to all SourceWithProgress processors.
void setProgressCallback(const ProgressCallback & callback) final;