mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-23 16:12:01 +00:00
dbms: development [#CONV-2944].
This commit is contained in:
parent
8e76b228e1
commit
12a5e955c1
@ -1,8 +1,8 @@
|
||||
#pragma once
|
||||
|
||||
#include <Poco/Mutex.h>
|
||||
#include <Poco/Thread.h>
|
||||
#include <Poco/Runnable.h>
|
||||
#include <statdaemons/threadpool.hpp>
|
||||
|
||||
#include <Poco/Event.h>
|
||||
|
||||
#include <DB/DataStreams/IProfilingBlockInputStream.h>
|
||||
|
||||
@ -10,53 +10,6 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/** thread-safe очередь из одного элемента,
|
||||
* рассчитанная на одного producer-а и одного consumer-а.
|
||||
*/
|
||||
template <typename T>
|
||||
class OneElementQueue
|
||||
{
|
||||
private:
|
||||
T data;
|
||||
Poco::FastMutex mutex_fill; /// Захвачен, когда данные есть.
|
||||
Poco::FastMutex mutex_empty; /// Захвачен, когда данных нет.
|
||||
|
||||
public:
|
||||
OneElementQueue()
|
||||
{
|
||||
mutex_empty.lock();
|
||||
}
|
||||
|
||||
/// Вызывается единственным producer-ом.
|
||||
void push(const T & x)
|
||||
{
|
||||
mutex_fill.lock();
|
||||
data = x;
|
||||
mutex_empty.unlock();
|
||||
}
|
||||
|
||||
/// Вызывается единственным consumer-ом.
|
||||
void pop(T & x)
|
||||
{
|
||||
mutex_empty.lock();
|
||||
x = data;
|
||||
mutex_fill.unlock();
|
||||
}
|
||||
|
||||
/// Позволяет ждать элемента не дольше заданного таймаута. Вызывается единственным consumer-ом.
|
||||
bool poll(UInt64 milliseconds)
|
||||
{
|
||||
if (mutex_empty.tryLock(milliseconds))
|
||||
{
|
||||
mutex_empty.unlock();
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
/** Выполняет другой BlockInputStream в отдельном потоке.
|
||||
* Это служит для двух целей:
|
||||
* 1. Позволяет сделать так, чтобы разные стадии конвеьера выполнения запроса работали параллельно.
|
||||
@ -67,135 +20,107 @@ public:
|
||||
class AsynchronousBlockInputStream : public IProfilingBlockInputStream
|
||||
{
|
||||
public:
|
||||
AsynchronousBlockInputStream(BlockInputStreamPtr in_) : in(in_), started(false), runnable(*this)
|
||||
AsynchronousBlockInputStream(BlockInputStreamPtr in_) : in(in_), pool(1), started(false)
|
||||
{
|
||||
children.push_back(in);
|
||||
}
|
||||
|
||||
|
||||
/** Ждать готовность данных не более заданного таймаута. Запустить получение данных, если нужно.
|
||||
* Если функция вернула true - данные готовы и можно делать read().
|
||||
*/
|
||||
bool poll(UInt64 milliseconds)
|
||||
{
|
||||
startIfNeed();
|
||||
return output_queue.poll(milliseconds);
|
||||
}
|
||||
|
||||
|
||||
String getName() const { return "AsynchronousBlockInputStream"; }
|
||||
|
||||
BlockInputStreamPtr clone() { return new AsynchronousBlockInputStream(in); }
|
||||
|
||||
|
||||
|
||||
/** Ждать готовность данных не более заданного таймаута. Запустить получение данных, если нужно.
|
||||
* Если функция вернула true - данные готовы и можно делать read(); нельзя вызвать функцию сразу ещё раз.
|
||||
*/
|
||||
bool poll(UInt64 milliseconds)
|
||||
{
|
||||
if (!started)
|
||||
{
|
||||
next();
|
||||
started = true;
|
||||
}
|
||||
|
||||
return ready.tryWait(milliseconds);
|
||||
}
|
||||
|
||||
|
||||
~AsynchronousBlockInputStream()
|
||||
{
|
||||
if (started)
|
||||
thread->join();
|
||||
pool.wait();
|
||||
}
|
||||
|
||||
protected:
|
||||
Block readImpl()
|
||||
{
|
||||
OutputData res;
|
||||
|
||||
startIfNeed();
|
||||
|
||||
/// Будем ждать, пока будет готов следующий блок или будет выкинуто исключение.
|
||||
output_queue.pop(res);
|
||||
|
||||
if (res.exception)
|
||||
res.exception->rethrow();
|
||||
|
||||
return res.block;
|
||||
}
|
||||
|
||||
|
||||
void startIfNeed()
|
||||
{
|
||||
if (!started)
|
||||
{
|
||||
started = true;
|
||||
thread = new Poco::Thread;
|
||||
thread->start(runnable);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// Вычисления, которые могут выполняться в отдельном потоке
|
||||
class Thread : public Poco::Runnable
|
||||
{
|
||||
public:
|
||||
Thread(AsynchronousBlockInputStream & parent_) : parent(parent_) {}
|
||||
|
||||
void run()
|
||||
{
|
||||
ExceptionPtr exception;
|
||||
|
||||
try
|
||||
{
|
||||
loop();
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
exception = e.clone();
|
||||
}
|
||||
catch (const Poco::Exception & e)
|
||||
{
|
||||
exception = e.clone();
|
||||
}
|
||||
catch (const std::exception & e)
|
||||
{
|
||||
exception = new Exception(e.what(), ErrorCodes::STD_EXCEPTION);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
exception = new Exception("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);
|
||||
}
|
||||
|
||||
if (exception)
|
||||
{
|
||||
parent.cancel();
|
||||
|
||||
/// Отдаём эксепшен в основной поток.
|
||||
parent.output_queue.push(exception);
|
||||
}
|
||||
}
|
||||
|
||||
void loop()
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
Block res = parent.in->read();
|
||||
parent.output_queue.push(res);
|
||||
|
||||
if (!res)
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
AsynchronousBlockInputStream & parent;
|
||||
};
|
||||
|
||||
|
||||
BlockInputStreamPtr in;
|
||||
boost::threadpool::pool pool;
|
||||
Poco::Event ready;
|
||||
bool started;
|
||||
|
||||
struct OutputData
|
||||
Block block;
|
||||
ExceptionPtr exception;
|
||||
|
||||
|
||||
Block readImpl()
|
||||
{
|
||||
Block block;
|
||||
ExceptionPtr exception;
|
||||
/// Если вычислений ещё не было - вычислим первый блок синхронно
|
||||
if (!started)
|
||||
{
|
||||
calculate();
|
||||
started = true;
|
||||
}
|
||||
else /// Если вычисления уже идут - подождём результата
|
||||
pool.wait();
|
||||
|
||||
OutputData() {}
|
||||
OutputData(Block & block_) : block(block_) {}
|
||||
OutputData(ExceptionPtr & exception_) : exception(exception_) {}
|
||||
};
|
||||
if (exception)
|
||||
exception->rethrow();
|
||||
|
||||
OneElementQueue<OutputData> output_queue;
|
||||
Block res = block;
|
||||
if (!res)
|
||||
return res;
|
||||
|
||||
Thread runnable;
|
||||
SharedPtr<Poco::Thread> thread;
|
||||
/// Запустим вычисления следующего блока
|
||||
block = Block();
|
||||
next();
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
void next()
|
||||
{
|
||||
pool.schedule(boost::bind(&AsynchronousBlockInputStream::calculate, this));
|
||||
}
|
||||
|
||||
|
||||
/// Вычисления, которые могут выполняться в отдельном потоке
|
||||
void calculate()
|
||||
{
|
||||
ready.reset();
|
||||
|
||||
try
|
||||
{
|
||||
block = in->read();
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
exception = e.clone();
|
||||
}
|
||||
catch (const Poco::Exception & e)
|
||||
{
|
||||
exception = e.clone();
|
||||
}
|
||||
catch (const std::exception & e)
|
||||
{
|
||||
exception = new Exception(e.what(), ErrorCodes::STD_EXCEPTION);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
exception = new Exception("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);
|
||||
}
|
||||
|
||||
ready.set();
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user