dbms: development [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2011-11-28 06:22:25 +00:00
parent c0a2326bf2
commit 44fdb6c7d1

View File

@ -1,8 +1,8 @@
#pragma once
#include <boost/thread.hpp>
#include <statdaemons/threadpool.hpp>
#include <DB/DataStreams/IBlockInputStream.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
namespace DB
@ -10,23 +10,24 @@ namespace DB
/** Выполняет другой BlockInputStream в отдельном потоке, используя двойную буферизацию.
*/
class AsynchronousBlockInputStream : public IBlockInputStream
class AsynchronousBlockInputStream : public IProfilingBlockInputStream
{
typedef SharedPtr<boost::thread> ThreadPtr;
public:
AsynchronousBlockInputStream(BlockInputStreamPtr in_) : in(in_)
AsynchronousBlockInputStream(BlockInputStreamPtr in_) : in(in_), pool(1), started(false)
{
children.push_back(in);
}
Block read()
Block readImpl()
{
/// Если вычислений ещё не было - вычислим первый блок синхронно
if (!thread)
if (!started)
{
calculate();
started = true;
}
else /// Если вычисления уже идут - подождём результата
thread->join();
pool.wait();
if (exception)
throw *exception;
@ -37,11 +38,9 @@ public:
/// Запустим вычисления следующего блока
block = Block();
thread = new boost::thread(&AsynchronousBlockInputStream::calculate, this);
pool.schedule(boost::bind(&AsynchronousBlockInputStream::calculate, this));
return res;
return in->read();
}
String getName() const { return "AsynchronousBlockInputStream"; }
@ -50,7 +49,8 @@ public:
protected:
BlockInputStreamPtr in;
ThreadPtr thread;
boost::threadpool::pool pool;
bool started;
Block block;
SharedPtr<Exception> exception;