ClickHouse/dbms/src/DataStreams/LazyBlockInputStream.h
proller 82a361d0e9 Show error to client if query was killed (#1989)
* Show error to client if query was killed

* Kill exception v2

* Use kill

* fix

* wip

* fix

* fxi

* try fix

* Revert "try fix"

This reverts commit eb76e4c040.

* QUERY_WASCANCELLED

* Fxi all cancel()

* fix
2018-03-06 00:09:39 +03:00

78 lines
1.8 KiB
C++

#pragma once
#include <DataStreams/IProfilingBlockInputStream.h>
namespace DB
{
/** Initialize another source on the first `read` call, and then use it.
* This is needed, for example, to read from a table that will be populated
* after creation of LazyBlockInputStream object, but before the first `read` call.
*/
class LazyBlockInputStream : public IProfilingBlockInputStream
{
public:
using Generator = std::function<BlockInputStreamPtr()>;
LazyBlockInputStream(const Block & header_, Generator generator_)
: header(header_), generator(std::move(generator_))
{
}
LazyBlockInputStream(const char * name_, const Block & header_, Generator generator_)
: name(name_), header(header_), generator(std::move(generator_))
{
}
String getName() const override { return name; }
Block getHeader() const override
{
return header;
}
protected:
Block readImpl() override
{
if (!input)
{
input = generator();
if (!input)
return Block();
auto * p_input = dynamic_cast<IProfilingBlockInputStream *>(input.get());
if (p_input)
{
/// They could have been set before, but were not passed into the `input`.
if (progress_callback)
p_input->setProgressCallback(progress_callback);
if (process_list_elem)
p_input->setProcessListElement(process_list_elem);
}
input->readPrefix();
{
addChild(input);
if (isCancelled() && p_input)
p_input->cancel(is_killed);
}
}
return input->read();
}
private:
const char * name = "Lazy";
Block header;
Generator generator;
BlockInputStreamPtr input;
};
}