From 12a5e955c11b713ecbbe8d293833774e42244824 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sat, 20 Oct 2012 05:36:32 +0000 Subject: [PATCH] dbms: development [#CONV-2944]. --- .../AsynchronousBlockInputStream.h | 241 ++++++------------ 1 file changed, 83 insertions(+), 158 deletions(-) diff --git a/dbms/include/DB/DataStreams/AsynchronousBlockInputStream.h b/dbms/include/DB/DataStreams/AsynchronousBlockInputStream.h index 5df3f7baf8d..0777561c05a 100644 --- a/dbms/include/DB/DataStreams/AsynchronousBlockInputStream.h +++ b/dbms/include/DB/DataStreams/AsynchronousBlockInputStream.h @@ -1,8 +1,8 @@ #pragma once -#include -#include -#include +#include + +#include #include @@ -10,53 +10,6 @@ namespace DB { -/** thread-safe очередь из одного элемента, - * рассчитанная на одного producer-а и одного consumer-а. - */ -template -class OneElementQueue -{ -private: - T data; - Poco::FastMutex mutex_fill; /// Захвачен, когда данные есть. - Poco::FastMutex mutex_empty; /// Захвачен, когда данных нет. - -public: - OneElementQueue() - { - mutex_empty.lock(); - } - - /// Вызывается единственным producer-ом. - void push(const T & x) - { - mutex_fill.lock(); - data = x; - mutex_empty.unlock(); - } - - /// Вызывается единственным consumer-ом. - void pop(T & x) - { - mutex_empty.lock(); - x = data; - mutex_fill.unlock(); - } - - /// Позволяет ждать элемента не дольше заданного таймаута. Вызывается единственным consumer-ом. - bool poll(UInt64 milliseconds) - { - if (mutex_empty.tryLock(milliseconds)) - { - mutex_empty.unlock(); - return true; - } - - return false; - } -}; - - /** Выполняет другой BlockInputStream в отдельном потоке. * Это служит для двух целей: * 1. Позволяет сделать так, чтобы разные стадии конвеьера выполнения запроса работали параллельно. @@ -67,135 +20,107 @@ public: class AsynchronousBlockInputStream : public IProfilingBlockInputStream { public: - AsynchronousBlockInputStream(BlockInputStreamPtr in_) : in(in_), started(false), runnable(*this) + AsynchronousBlockInputStream(BlockInputStreamPtr in_) : in(in_), pool(1), started(false) { children.push_back(in); } - - /** Ждать готовность данных не более заданного таймаута. Запустить получение данных, если нужно. - * Если функция вернула true - данные готовы и можно делать read(). - */ - bool poll(UInt64 milliseconds) - { - startIfNeed(); - return output_queue.poll(milliseconds); - } - - String getName() const { return "AsynchronousBlockInputStream"; } BlockInputStreamPtr clone() { return new AsynchronousBlockInputStream(in); } - + + + /** Ждать готовность данных не более заданного таймаута. Запустить получение данных, если нужно. + * Если функция вернула true - данные готовы и можно делать read(); нельзя вызвать функцию сразу ещё раз. + */ + bool poll(UInt64 milliseconds) + { + if (!started) + { + next(); + started = true; + } + + return ready.tryWait(milliseconds); + } + ~AsynchronousBlockInputStream() { if (started) - thread->join(); + pool.wait(); } protected: - Block readImpl() - { - OutputData res; - - startIfNeed(); - - /// Будем ждать, пока будет готов следующий блок или будет выкинуто исключение. - output_queue.pop(res); - - if (res.exception) - res.exception->rethrow(); - - return res.block; - } - - - void startIfNeed() - { - if (!started) - { - started = true; - thread = new Poco::Thread; - thread->start(runnable); - } - } - - - /// Вычисления, которые могут выполняться в отдельном потоке - class Thread : public Poco::Runnable - { - public: - Thread(AsynchronousBlockInputStream & parent_) : parent(parent_) {} - - void run() - { - ExceptionPtr exception; - - try - { - loop(); - } - catch (const Exception & e) - { - exception = e.clone(); - } - catch (const Poco::Exception & e) - { - exception = e.clone(); - } - catch (const std::exception & e) - { - exception = new Exception(e.what(), ErrorCodes::STD_EXCEPTION); - } - catch (...) - { - exception = new Exception("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION); - } - - if (exception) - { - parent.cancel(); - - /// Отдаём эксепшен в основной поток. - parent.output_queue.push(exception); - } - } - - void loop() - { - while (true) - { - Block res = parent.in->read(); - parent.output_queue.push(res); - - if (!res) - break; - } - } - - private: - AsynchronousBlockInputStream & parent; - }; - - BlockInputStreamPtr in; + boost::threadpool::pool pool; + Poco::Event ready; bool started; - struct OutputData + Block block; + ExceptionPtr exception; + + + Block readImpl() { - Block block; - ExceptionPtr exception; + /// Если вычислений ещё не было - вычислим первый блок синхронно + if (!started) + { + calculate(); + started = true; + } + else /// Если вычисления уже идут - подождём результата + pool.wait(); - OutputData() {} - OutputData(Block & block_) : block(block_) {} - OutputData(ExceptionPtr & exception_) : exception(exception_) {} - }; + if (exception) + exception->rethrow(); - OneElementQueue output_queue; + Block res = block; + if (!res) + return res; - Thread runnable; - SharedPtr thread; + /// Запустим вычисления следующего блока + block = Block(); + next(); + + return res; + } + + + void next() + { + pool.schedule(boost::bind(&AsynchronousBlockInputStream::calculate, this)); + } + + + /// Вычисления, которые могут выполняться в отдельном потоке + void calculate() + { + ready.reset(); + + try + { + block = in->read(); + } + catch (const Exception & e) + { + exception = e.clone(); + } + catch (const Poco::Exception & e) + { + exception = e.clone(); + } + catch (const std::exception & e) + { + exception = new Exception(e.what(), ErrorCodes::STD_EXCEPTION); + } + catch (...) + { + exception = new Exception("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION); + } + + ready.set(); + } }; }