2011-11-28 05:48:52 +00:00
|
|
|
|
#pragma once
|
|
|
|
|
|
2012-10-20 05:36:32 +00:00
|
|
|
|
#include <Poco/Event.h>
|
2011-11-28 05:48:52 +00:00
|
|
|
|
|
2011-11-28 06:22:25 +00:00
|
|
|
|
#include <DB/DataStreams/IProfilingBlockInputStream.h>
|
2015-09-24 18:54:21 +00:00
|
|
|
|
#include <DB/Common/setThreadName.h>
|
2016-01-21 01:47:28 +00:00
|
|
|
|
#include <DB/Common/CurrentMetrics.h>
|
2016-08-02 01:46:05 +00:00
|
|
|
|
#include <DB/Common/ThreadPool.h>
|
2011-11-28 05:48:52 +00:00
|
|
|
|
|
|
|
|
|
|
2016-10-24 04:06:27 +00:00
|
|
|
|
namespace CurrentMetrics
|
|
|
|
|
{
|
|
|
|
|
extern const Metric QueryThread;
|
|
|
|
|
}
|
|
|
|
|
|
2011-11-28 05:48:52 +00:00
|
|
|
|
namespace DB
|
|
|
|
|
{
|
|
|
|
|
|
2012-10-20 02:10:47 +00:00
|
|
|
|
/** Выполняет другой BlockInputStream в отдельном потоке.
|
|
|
|
|
* Это служит для двух целей:
|
2017-01-27 12:38:59 +00:00
|
|
|
|
* 1. Позволяет сделать так, чтобы разные стадии конвейера выполнения запроса работали параллельно.
|
2012-10-20 02:10:47 +00:00
|
|
|
|
* 2. Позволяет не ждать до того, как данные будут готовы, а периодически проверять их готовность без блокировки.
|
|
|
|
|
* Это нужно, например, чтобы можно было во время ожидания проверить, не пришёл ли по сети пакет с просьбой прервать выполнение запроса.
|
|
|
|
|
* Также это позволяет выполнить несколько запросов одновременно.
|
|
|
|
|
*/
|
|
|
|
|
class AsynchronousBlockInputStream : public IProfilingBlockInputStream
|
|
|
|
|
{
|
|
|
|
|
public:
|
2015-01-18 08:25:56 +00:00
|
|
|
|
AsynchronousBlockInputStream(BlockInputStreamPtr in_)
|
2012-10-20 02:10:47 +00:00
|
|
|
|
{
|
2013-05-04 04:05:15 +00:00
|
|
|
|
children.push_back(in_);
|
2012-10-20 02:10:47 +00:00
|
|
|
|
}
|
|
|
|
|
|
2015-06-08 20:22:02 +00:00
|
|
|
|
String getName() const override { return "Asynchronous"; }
|
2012-10-20 05:36:32 +00:00
|
|
|
|
|
2014-11-08 23:52:18 +00:00
|
|
|
|
String getID() const override
|
2013-05-03 10:20:53 +00:00
|
|
|
|
{
|
|
|
|
|
std::stringstream res;
|
2013-05-04 05:20:07 +00:00
|
|
|
|
res << "Asynchronous(" << children.back()->getID() << ")";
|
2013-05-03 10:20:53 +00:00
|
|
|
|
return res.str();
|
|
|
|
|
}
|
|
|
|
|
|
2016-05-20 20:43:07 +00:00
|
|
|
|
void readPrefix() override
|
2013-12-27 13:22:32 +00:00
|
|
|
|
{
|
2016-05-20 20:43:07 +00:00
|
|
|
|
/// Не будем вызывать readPrefix у ребёнка, чтобы соответствующие действия совершались в отдельном потоке.
|
2016-05-24 18:22:15 +00:00
|
|
|
|
if (!started)
|
|
|
|
|
{
|
|
|
|
|
next();
|
|
|
|
|
started = true;
|
|
|
|
|
}
|
2013-12-27 13:22:32 +00:00
|
|
|
|
}
|
|
|
|
|
|
2014-11-08 23:52:18 +00:00
|
|
|
|
void readSuffix() override
|
2013-12-27 13:22:32 +00:00
|
|
|
|
{
|
|
|
|
|
if (started)
|
|
|
|
|
{
|
|
|
|
|
pool.wait();
|
|
|
|
|
if (exception)
|
2015-10-05 05:40:27 +00:00
|
|
|
|
std::rethrow_exception(exception);
|
2013-12-27 13:22:32 +00:00
|
|
|
|
children.back()->readSuffix();
|
|
|
|
|
started = false;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2011-11-28 05:48:52 +00:00
|
|
|
|
|
2012-10-20 02:10:47 +00:00
|
|
|
|
/** Ждать готовность данных не более заданного таймаута. Запустить получение данных, если нужно.
|
2012-10-20 05:36:32 +00:00
|
|
|
|
* Если функция вернула true - данные готовы и можно делать read(); нельзя вызвать функцию сразу ещё раз.
|
2012-10-20 02:10:47 +00:00
|
|
|
|
*/
|
|
|
|
|
bool poll(UInt64 milliseconds)
|
|
|
|
|
{
|
2012-10-20 05:36:32 +00:00
|
|
|
|
if (!started)
|
|
|
|
|
{
|
|
|
|
|
next();
|
|
|
|
|
started = true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return ready.tryWait(milliseconds);
|
2011-11-28 05:48:52 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
2017-03-09 04:26:17 +00:00
|
|
|
|
~AsynchronousBlockInputStream() override
|
2012-03-05 02:34:20 +00:00
|
|
|
|
{
|
|
|
|
|
if (started)
|
2012-10-20 05:36:32 +00:00
|
|
|
|
pool.wait();
|
2012-03-05 02:34:20 +00:00
|
|
|
|
}
|
|
|
|
|
|
2011-11-28 05:48:52 +00:00
|
|
|
|
protected:
|
2016-08-02 01:46:05 +00:00
|
|
|
|
ThreadPool pool{1};
|
2012-10-20 05:36:32 +00:00
|
|
|
|
Poco::Event ready;
|
2015-01-18 08:25:56 +00:00
|
|
|
|
bool started = false;
|
2016-05-20 20:43:07 +00:00
|
|
|
|
bool first = true;
|
2012-10-20 02:10:47 +00:00
|
|
|
|
|
2012-10-20 05:36:32 +00:00
|
|
|
|
Block block;
|
2015-10-05 05:40:27 +00:00
|
|
|
|
std::exception_ptr exception;
|
2012-10-20 02:10:47 +00:00
|
|
|
|
|
2014-07-16 02:06:58 +00:00
|
|
|
|
|
2014-11-08 23:52:18 +00:00
|
|
|
|
Block readImpl() override
|
2011-11-28 05:48:52 +00:00
|
|
|
|
{
|
2012-10-20 05:36:32 +00:00
|
|
|
|
/// Если вычислений ещё не было - вычислим первый блок синхронно
|
2012-10-20 02:10:47 +00:00
|
|
|
|
if (!started)
|
2011-11-28 05:48:52 +00:00
|
|
|
|
{
|
2014-05-03 22:57:43 +00:00
|
|
|
|
calculate(current_memory_tracker);
|
2012-10-20 02:35:44 +00:00
|
|
|
|
started = true;
|
2011-11-28 05:48:52 +00:00
|
|
|
|
}
|
2012-10-20 05:36:32 +00:00
|
|
|
|
else /// Если вычисления уже идут - подождём результата
|
|
|
|
|
pool.wait();
|
|
|
|
|
|
|
|
|
|
if (exception)
|
2015-10-05 05:40:27 +00:00
|
|
|
|
std::rethrow_exception(exception);
|
2012-10-20 05:36:32 +00:00
|
|
|
|
|
|
|
|
|
Block res = block;
|
|
|
|
|
if (!res)
|
|
|
|
|
return res;
|
|
|
|
|
|
|
|
|
|
/// Запустим вычисления следующего блока
|
|
|
|
|
block = Block();
|
|
|
|
|
next();
|
|
|
|
|
|
|
|
|
|
return res;
|
2012-10-20 02:10:47 +00:00
|
|
|
|
}
|
2014-07-16 02:06:58 +00:00
|
|
|
|
|
2012-10-20 02:10:47 +00:00
|
|
|
|
|
2012-10-20 05:36:32 +00:00
|
|
|
|
void next()
|
|
|
|
|
{
|
2012-10-29 02:55:14 +00:00
|
|
|
|
ready.reset();
|
2014-10-16 01:21:03 +00:00
|
|
|
|
pool.schedule(std::bind(&AsynchronousBlockInputStream::calculate, this, current_memory_tracker));
|
2012-10-20 05:36:32 +00:00
|
|
|
|
}
|
2014-07-16 02:06:58 +00:00
|
|
|
|
|
2012-10-20 02:10:47 +00:00
|
|
|
|
|
|
|
|
|
/// Вычисления, которые могут выполняться в отдельном потоке
|
2014-05-03 22:57:43 +00:00
|
|
|
|
void calculate(MemoryTracker * memory_tracker)
|
2012-10-20 02:10:47 +00:00
|
|
|
|
{
|
2016-01-21 01:47:28 +00:00
|
|
|
|
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
|
2014-05-03 22:57:43 +00:00
|
|
|
|
|
2012-10-20 05:36:32 +00:00
|
|
|
|
try
|
2011-11-28 05:48:52 +00:00
|
|
|
|
{
|
2016-05-20 21:37:30 +00:00
|
|
|
|
if (first)
|
|
|
|
|
{
|
|
|
|
|
first = false;
|
|
|
|
|
setThreadName("AsyncBlockInput");
|
|
|
|
|
current_memory_tracker = memory_tracker;
|
|
|
|
|
children.back()->readPrefix();
|
|
|
|
|
}
|
|
|
|
|
|
2013-05-04 05:20:07 +00:00
|
|
|
|
block = children.back()->read();
|
2011-11-28 05:48:52 +00:00
|
|
|
|
}
|
2012-10-20 05:36:32 +00:00
|
|
|
|
catch (...)
|
|
|
|
|
{
|
2015-10-05 05:40:27 +00:00
|
|
|
|
exception = std::current_exception();
|
2011-11-28 05:48:52 +00:00
|
|
|
|
}
|
2012-10-20 02:10:47 +00:00
|
|
|
|
|
2012-10-20 05:36:32 +00:00
|
|
|
|
ready.set();
|
|
|
|
|
}
|
2011-11-28 05:48:52 +00:00
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|