ClickHouse/src/DataStreams/AsynchronousBlockInputStream.h

117 lines
2.8 KiB
C++
Raw Normal View History

2011-11-28 05:48:52 +00:00
#pragma once
2012-10-20 05:36:32 +00:00
#include <Poco/Event.h>
2011-11-28 05:48:52 +00:00
#include <DataStreams/IBlockInputStream.h>
#include <Common/CurrentMetrics.h>
#include <Common/ThreadPool.h>
2011-11-28 05:48:52 +00:00
namespace CurrentMetrics
{
extern const Metric QueryThread;
}
2011-11-28 05:48:52 +00:00
namespace DB
{
2017-05-13 22:19:04 +00:00
/** Executes another BlockInputStream in a separate thread.
* This serves two purposes:
* 1. Allows you to make the different stages of the query execution pipeline work in parallel.
* 2. Allows you not to wait until the data is ready, and periodically check their readiness without blocking.
* This is necessary, for example, so that during the waiting period you can check if a packet
* has come over the network with a request to interrupt the execution of the query.
* It also allows you to execute multiple queries at the same time.
2012-10-20 02:10:47 +00:00
*/
class AsynchronousBlockInputStream : public IBlockInputStream
2012-10-20 02:10:47 +00:00
{
public:
AsynchronousBlockInputStream(const BlockInputStreamPtr & in)
{
children.push_back(in);
}
String getName() const override { return "Asynchronous"; }
void waitInnerThread()
{
if (started)
pool.wait();
}
void readPrefix() override
{
2017-05-13 22:19:04 +00:00
/// Do not call `readPrefix` on the child, so that the corresponding actions are performed in a separate thread.
if (!started)
{
next();
started = true;
}
}
void readSuffix() override
{
if (started)
{
pool.wait();
if (exception)
std::rethrow_exception(exception);
children.back()->readSuffix();
started = false;
}
}
2017-05-13 22:19:04 +00:00
/** Wait for the data to be ready no more than the specified timeout. Start receiving data if necessary.
* If the function returned true - the data is ready and you can do `read()`; You can not call the function just at the same moment again.
*/
bool poll(UInt64 milliseconds)
{
if (!started)
{
next();
started = true;
}
return ready.tryWait(milliseconds);
}
Block getHeader() const override { return children.at(0)->getHeader(); }
void cancel(bool kill) override
{
IBlockInputStream::cancel(kill);
2020-08-08 00:47:03 +00:00
/// Wait for some background calculations to be sure,
/// that after end of stream nothing is being executing.
if (started)
pool.wait();
}
~AsynchronousBlockInputStream() override
{
if (started)
pool.wait();
}
2012-03-05 02:34:20 +00:00
2011-11-28 05:48:52 +00:00
protected:
ThreadPool pool{1};
Poco::Event ready;
bool started = false;
bool first = true;
Block block;
std::exception_ptr exception;
Block readImpl() override;
void next();
2017-05-13 22:19:04 +00:00
/// Calculations that can be performed in a separate thread
void calculate();
2011-11-28 05:48:52 +00:00
};
}