dbms: probably fixed error [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2012-09-03 19:52:57 +00:00
parent ffc6ef092e
commit fd0815a735

View File

@ -1,6 +1,7 @@
#pragma once
#include <Poco/Semaphore.h>
#include <Poco/ThreadPool.h>
#include <statdaemons/threadpool.hpp>
@ -13,6 +14,24 @@ namespace DB
using Poco::SharedPtr;
/** Будем использовать Poco::ThreadPool вместо boost::threadpool.
* Последний неудобен тем, что в нём не совсем так как надо узнаётся количество свободных потоков.
*/
class RunnableFromFunction : public Poco::Runnable
{
public:
typedef boost::function<void()> Func;
RunnableFromFunction() {}
RunnableFromFunction(const Func & func_) : func(func_) {}
void run() { func(); }
private:
Func func;
};
/** Объединяет несколько источников в один.
* Блоки из разных источников перемежаются друг с другом произвольным образом.
* Можно указать количество потоков (max_threads),
@ -23,7 +42,7 @@ class UnionBlockInputStream : public IProfilingBlockInputStream
public:
UnionBlockInputStream(BlockInputStreams inputs_, unsigned max_threads_ = 1)
: max_threads(std::min(inputs_.size(), static_cast<size_t>(max_threads_))),
pool(max_threads),
pool(max_threads, max_threads),
ready_any(0, inputs_.size())
{
children.insert(children.end(), inputs_.begin(), inputs_.end());
@ -48,7 +67,7 @@ public:
if (threads_data.empty())
return res;
ssize_t max_threads_to_start = static_cast<ssize_t>(pool.size()) - (pool.pending() + pool.active());
ssize_t max_threads_to_start = pool.available();
if (max_threads_to_start > 0)
{
@ -63,11 +82,15 @@ public:
++it->count;
++started_threads;
it->runnable = RunnableFromFunction::Func(
boost::bind(&UnionBlockInputStream::calculate, this, boost::ref(threads_data.back())));
RunnableFromFunction & runnable = it->runnable;
/// Переносим этот источник в конец списка
threads_data.push_back(*it);
threads_data.erase(it++);
pool.schedule(boost::bind(&UnionBlockInputStream::calculate, this, boost::ref(threads_data.back())));
pool.start(runnable);
if (started_threads == max_threads_to_start)
break;
@ -122,7 +145,8 @@ public:
it->reset();
// std::cerr << "Scheduling again " << it->i << std::endl;
++it->count;
pool.schedule(boost::bind(&UnionBlockInputStream::calculate, this, boost::ref(*it)));
it->runnable = RunnableFromFunction::Func(boost::bind(&UnionBlockInputStream::calculate, this, boost::ref(*it)));
pool.start(it->runnable);
return res;
}
@ -135,18 +159,18 @@ public:
~UnionBlockInputStream()
{
pool.clear();
pool.wait();
pool.joinAll();
}
private:
unsigned max_threads;
boost::threadpool::pool pool;
Poco::ThreadPool pool;
/// Данные отдельного источника
struct ThreadData
{
RunnableFromFunction runnable;
BlockInputStreamPtr in;
unsigned count; /// Сколько блоков было вычислено
bool ready; /// Блок уже вычислен
@ -183,8 +207,6 @@ private:
data.ready = true;
data.block = block;
}
ready_any.set();
}
catch (const Exception & e)
{
@ -202,6 +224,8 @@ private:
{
data.exception = new Exception("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);
}
ready_any.set();
}
};