ClickHouse/dbms/include/DB/DataStreams/AsynchronousBlockInputStream.h

127 lines
3.2 KiB
C
Raw Normal View History

2011-11-28 05:48:52 +00:00
#pragma once
2012-10-20 05:36:32 +00:00
#include <statdaemons/threadpool.hpp>
#include <Poco/Event.h>
2011-11-28 05:48:52 +00:00
2011-11-28 06:22:25 +00:00
#include <DB/DataStreams/IProfilingBlockInputStream.h>
2011-11-28 05:48:52 +00:00
namespace DB
{
2012-10-20 02:10:47 +00:00
/** Выполняет другой BlockInputStream в отдельном потоке.
* Это служит для двух целей:
* 1. Позволяет сделать так, чтобы разные стадии конвеьера выполнения запроса работали параллельно.
* 2. Позволяет не ждать до того, как данные будут готовы, а периодически проверять их готовность без блокировки.
* Это нужно, например, чтобы можно было во время ожидания проверить, не пришёл ли по сети пакет с просьбой прервать выполнение запроса.
* Также это позволяет выполнить несколько запросов одновременно.
*/
class AsynchronousBlockInputStream : public IProfilingBlockInputStream
{
public:
2012-10-20 05:36:32 +00:00
AsynchronousBlockInputStream(BlockInputStreamPtr in_) : in(in_), pool(1), started(false)
2012-10-20 02:10:47 +00:00
{
children.push_back(in);
}
2012-10-20 05:36:32 +00:00
String getName() const { return "AsynchronousBlockInputStream"; }
BlockInputStreamPtr clone() { return new AsynchronousBlockInputStream(in); }
2011-11-28 05:48:52 +00:00
2012-10-20 02:10:47 +00:00
/** Ждать готовность данных не более заданного таймаута. Запустить получение данных, если нужно.
2012-10-20 05:36:32 +00:00
* Если функция вернула true - данные готовы и можно делать read(); нельзя вызвать функцию сразу ещё раз.
2012-10-20 02:10:47 +00:00
*/
bool poll(UInt64 milliseconds)
{
2012-10-20 05:36:32 +00:00
if (!started)
{
next();
started = true;
}
return ready.tryWait(milliseconds);
2011-11-28 05:48:52 +00:00
}
2012-03-05 02:34:20 +00:00
~AsynchronousBlockInputStream()
{
if (started)
2012-10-20 05:36:32 +00:00
pool.wait();
2012-03-05 02:34:20 +00:00
}
2011-11-28 05:48:52 +00:00
protected:
2012-10-20 05:36:32 +00:00
BlockInputStreamPtr in;
boost::threadpool::pool pool;
Poco::Event ready;
bool started;
2012-10-20 02:10:47 +00:00
2012-10-20 05:36:32 +00:00
Block block;
ExceptionPtr exception;
2012-10-20 02:10:47 +00:00
2012-10-20 05:36:32 +00:00
Block readImpl()
2011-11-28 05:48:52 +00:00
{
2012-10-20 05:36:32 +00:00
/// Если вычислений ещё не было - вычислим первый блок синхронно
2012-10-20 02:10:47 +00:00
if (!started)
2011-11-28 05:48:52 +00:00
{
2012-10-20 05:36:32 +00:00
calculate();
2012-10-20 02:35:44 +00:00
started = true;
2011-11-28 05:48:52 +00:00
}
2012-10-20 05:36:32 +00:00
else /// Если вычисления уже идут - подождём результата
pool.wait();
if (exception)
exception->rethrow();
Block res = block;
if (!res)
return res;
/// Запустим вычисления следующего блока
block = Block();
next();
return res;
2012-10-20 02:10:47 +00:00
}
2012-10-20 05:36:32 +00:00
2012-10-20 02:10:47 +00:00
2012-10-20 05:36:32 +00:00
void next()
{
ready.reset();
2012-10-20 05:36:32 +00:00
pool.schedule(boost::bind(&AsynchronousBlockInputStream::calculate, this));
}
2012-10-20 02:10:47 +00:00
/// Вычисления, которые могут выполняться в отдельном потоке
2012-10-20 05:36:32 +00:00
void calculate()
2012-10-20 02:10:47 +00:00
{
2012-10-20 05:36:32 +00:00
try
2011-11-28 05:48:52 +00:00
{
2012-10-20 05:36:32 +00:00
block = in->read();
2011-11-28 05:48:52 +00:00
}
2012-10-20 05:36:32 +00:00
catch (const Exception & e)
2011-11-28 05:48:52 +00:00
{
2012-10-20 05:36:32 +00:00
exception = e.clone();
}
catch (const Poco::Exception & e)
{
exception = e.clone();
}
catch (const std::exception & e)
{
exception = new Exception(e.what(), ErrorCodes::STD_EXCEPTION);
}
catch (...)
{
exception = new Exception("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);
2011-11-28 05:48:52 +00:00
}
2012-10-20 02:10:47 +00:00
2012-10-20 05:36:32 +00:00
ready.set();
}
2011-11-28 05:48:52 +00:00
};
}