Merge pull request #10206 from ClickHouse/fix-limit-with-infinite-sources

Fix limit with infinite sources
This commit is contained in:
alexey-milovidov 2020-04-13 16:32:02 +03:00 committed by GitHub
commit d1c436788b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 42 additions and 2 deletions

View File

@ -263,6 +263,8 @@ bool PipelineExecutor::tryAddProcessorToStackIfUpdated(Edge & edge, Queue & queu
node.status = ExecStatus::Preparing;
return prepareProcessor(edge.to, thread_number, queue, std::move(lock));
}
else
graph[edge.to].processor->onUpdatePorts();
return true;
}

View File

@ -164,7 +164,7 @@ void TreeExecutorBlockInputStream::execute(bool on_totals, bool on_extremes)
}
};
while (!stack.empty())
while (!stack.empty() && !is_cancelled)
{
IProcessor * node = stack.top();
@ -295,7 +295,7 @@ void TreeExecutorBlockInputStream::initRowsBeforeLimit()
Block TreeExecutorBlockInputStream::readImpl()
{
while (true)
while (!is_cancelled)
{
if (input_port->isFinished())
{
@ -338,6 +338,8 @@ Block TreeExecutorBlockInputStream::readImpl()
execute(false, false);
}
return {};
}
void TreeExecutorBlockInputStream::setProgressCallback(const ProgressCallback & callback)
@ -373,4 +375,12 @@ void TreeExecutorBlockInputStream::addTotalRowsApprox(size_t value)
sources_with_progress.front()->addTotalRowsApprox(value);
}
void TreeExecutorBlockInputStream::cancel(bool kill)
{
IBlockInputStream::cancel(kill);
for (auto & processor : processors)
processor->cancel();
}
}

View File

@ -39,6 +39,8 @@ public:
String getName() const override { return "TreeExecutor"; }
Block getHeader() const override { return root->getOutputs().front().getHeader(); }
void cancel(bool kill) override;
/// This methods does not affect TreeExecutor as IBlockInputStream itself.
/// They just passed to all SourceWithProgress processors.
void setProgressCallback(const ProgressCallback & callback) final;

View File

@ -233,6 +233,10 @@ public:
onCancel();
}
/// Additional method which is called in case if ports were updated while work() method.
/// May be used to stop execution in rare cases.
virtual void onUpdatePorts() {}
virtual ~IProcessor() = default;
auto & getInputs() { return inputs; }

View File

@ -176,6 +176,9 @@ Chunk SourceFromInputStream::generate()
return {};
}
if (isCancelled())
return {};
#ifndef NDEBUG
assertBlocksHaveEqualStructure(getPort().getHeader(), block, "SourceFromInputStream");
#endif

View File

@ -37,6 +37,13 @@ public:
void setProgressCallback(const ProgressCallback & callback) final { stream->setProgressCallback(callback); }
void addTotalRowsApprox(size_t value) final { stream->addTotalRowsApprox(value); }
/// Stop reading from stream if output port is finished.
void onUpdatePorts() override
{
if (getPort().isFinished())
cancel();
}
protected:
void onCancel() override { stream->cancel(false); }

View File

@ -0,0 +1,11 @@
SELECT number
FROM
(
SELECT zero AS number
FROM remote('127.0.0.2', system.zeros)
UNION ALL
SELECT number + sleep(0.5)
FROM system.numbers
)
WHERE number = 1
LIMIT 1