ClickHouse/dbms/include/DB/DataStreams/UnionBlockInputStream.h

411 lines
11 KiB
C
Raw Normal View History

2012-01-10 22:11:51 +00:00
#pragma once
#include <list>
#include <queue>
#include <Poco/Thread.h>
#include <Poco/Mutex.h>
2012-01-10 22:11:51 +00:00
#include <Poco/Semaphore.h>
#include <Yandex/logger_useful.h>
2012-01-10 22:11:51 +00:00
#include <DB/DataStreams/IProfilingBlockInputStream.h>
namespace DB
{
using Poco::SharedPtr;
/** Очень простая thread-safe очередь ограниченной длины.
* Если пытаться вынуть элемент из пустой очереди, то поток блокируется, пока очередь не станет непустой.
* Если пытаться вставить элемент в переполненную очередь, то поток блокируется, пока в очереди не появится элемент.
*/
template <typename T>
class ConcurrentBoundedQueue
{
private:
size_t max_fill;
std::queue<T> queue;
Poco::Mutex mutex;
Poco::Semaphore fill_count;
Poco::Semaphore empty_count;
public:
ConcurrentBoundedQueue(size_t max_fill)
: fill_count(0, max_fill), empty_count(max_fill, max_fill) {}
void push(const T & x)
{
empty_count.wait();
{
Poco::ScopedLock<Poco::Mutex> lock(mutex);
queue.push(x);
}
fill_count.set();
}
void pop(T & x)
{
fill_count.wait();
{
Poco::ScopedLock<Poco::Mutex> lock(mutex);
x = queue.front();
queue.pop();
}
empty_count.set();
}
2012-10-20 23:02:13 +00:00
2012-11-10 05:20:56 +00:00
bool tryPush(const T & x)
{
if (empty_count.tryWait(0))
{
{
Poco::ScopedLock<Poco::Mutex> lock(mutex);
queue.push(x);
}
fill_count.set();
return true;
}
return false;
}
bool tryPop(T & x)
{
if (fill_count.tryWait(0))
{
{
Poco::ScopedLock<Poco::Mutex> lock(mutex);
x = queue.front();
queue.pop();
}
empty_count.set();
return true;
}
return false;
}
2012-10-20 23:02:13 +00:00
void clear()
{
while (fill_count.tryWait(0))
{
{
Poco::ScopedLock<Poco::Mutex> lock(mutex);
queue.pop();
}
empty_count.set();
}
}
};
2012-01-10 22:11:51 +00:00
/** Объединяет несколько источников в один.
* Блоки из разных источников перемежаются друг с другом произвольным образом.
* Можно указать количество потоков (max_threads),
* в которых будет выполняться получение данных из разных источников.
*
* Устроено так:
* - есть набор источников, из которых можно вынимать блоки;
* - есть набор потоков, которые могут одновременно вынимать блоки из разных источников;
* - "свободные" источники (с которыми сейчас не работает никакой поток) кладутся в очередь источников;
* - когда поток берёт источник для обработки, он удаляет его из очереди источников,
* вынимает из него блок, и затем кладёт источник обратно в очередь источников;
* - полученные блоки складываются в ограниченную очередь готовых блоков;
* - основной поток вынимает готовые блоки из очереди готовых блоков.
2012-01-10 22:11:51 +00:00
*/
class UnionBlockInputStream : public IProfilingBlockInputStream
{
class Thread;
2012-01-10 22:11:51 +00:00
public:
UnionBlockInputStream(BlockInputStreams inputs_, unsigned max_threads_ = 1)
: max_threads(std::min(inputs_.size(), static_cast<size_t>(max_threads_))),
output_queue(max_threads), exhausted_inputs(0), finish(false), all_read(false),
log(&Logger::get("UnionBlockInputStream"))
2012-01-10 22:11:51 +00:00
{
children.insert(children.end(), inputs_.begin(), inputs_.end());
2012-06-24 23:17:06 +00:00
2012-01-10 22:11:51 +00:00
for (size_t i = 0; i < inputs_.size(); ++i)
2012-06-24 23:17:06 +00:00
{
input_queue.push(InputData());
input_queue.back().in = inputs_[i];
input_queue.back().i = i;
2012-06-24 23:17:06 +00:00
}
2012-01-10 22:11:51 +00:00
}
2012-10-20 02:10:47 +00:00
String getName() const { return "UnionBlockInputStream"; }
BlockInputStreamPtr clone() { return new UnionBlockInputStream(children, max_threads); }
~UnionBlockInputStream()
{
LOG_TRACE(log, "Waiting for threads to finish");
finish = true;
cancel();
2012-10-20 06:40:55 +00:00
2012-12-06 17:32:48 +00:00
ExceptionPtr exception;
2012-10-20 23:02:13 +00:00
/// Вынем всё, что есть в очереди готовых данных.
2012-11-10 05:20:56 +00:00
OutputData res;
while (output_queue.tryPop(res))
2012-12-06 17:32:48 +00:00
if (res.exception && !exception)
exception = res.exception;
2012-10-20 23:02:13 +00:00
/** В этот момент, запоздавшие потоки ещё могут вставить в очередь какие-нибудь блоки, но очередь не переполнится.
* PS. Может быть, для переменной finish нужен барьер?
*/
2012-10-20 02:10:47 +00:00
for (ThreadsData::iterator it = threads_data.begin(); it != threads_data.end(); ++it)
it->thread->join();
2012-11-10 05:20:56 +00:00
/// Может быть, нам под конец положили эксепшен.
while (output_queue.tryPop(res))
2012-12-06 17:32:48 +00:00
if (res.exception && !exception)
exception = res.exception;
if (exception && !std::uncaught_exception())
exception->rethrow();
2012-11-10 05:20:56 +00:00
2012-10-20 02:10:47 +00:00
LOG_TRACE(log, "Waited for threads to finish");
}
/** Отличается от реализации по-умолчанию тем, что пытается остановить все источники,
* пропуская отвалившиеся по эксепшену.
*/
void cancel()
{
2012-11-10 05:13:46 +00:00
if (!__sync_bool_compare_and_swap(&is_cancelled, false, true))
return;
ExceptionPtr exception;
for (BlockInputStreams::iterator it = children.begin(); it != children.end(); ++it)
{
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&**it))
{
try
{
child->cancel();
}
catch (const Exception & e)
{
if (!exception)
exception = e.clone();
}
catch (const Poco::Exception & e)
{
if (!exception)
exception = e.clone();
}
catch (const std::exception & e)
{
if (!exception)
exception = new Exception(e.what(), ErrorCodes::STD_EXCEPTION);
}
catch (...)
{
if (!exception)
exception = new Exception("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);
}
}
}
if (exception)
exception->rethrow();
}
2012-10-20 02:10:47 +00:00
protected:
2012-01-10 22:11:51 +00:00
Block readImpl()
{
OutputData res;
if (all_read)
return res.block;
2012-10-20 02:10:47 +00:00
/// Запускаем потоки, если это ещё не было сделано.
if (threads_data.empty())
2012-01-10 22:11:51 +00:00
{
threads_data.resize(max_threads);
for (ThreadsData::iterator it = threads_data.begin(); it != threads_data.end(); ++it)
2012-06-22 18:27:40 +00:00
{
it->runnable = new Thread(*this);
it->thread = new Poco::Thread;
it->thread->start(*it->runnable);
2012-01-10 22:11:51 +00:00
}
}
2012-06-24 23:17:06 +00:00
/// Будем ждать, пока будет готов следующий блок или будет выкинуто исключение.
output_queue.pop(res);
2012-01-10 22:11:51 +00:00
if (res.exception)
res.exception->rethrow();
if (!res.block)
all_read = true;
return res.block;
2012-01-10 22:11:51 +00:00
}
private:
2012-01-10 22:11:51 +00:00
/// Данные отдельного источника
struct InputData
2012-01-10 22:11:51 +00:00
{
BlockInputStreamPtr in;
size_t i; /// Порядковый номер источника (для отладки).
};
2012-01-10 22:11:51 +00:00
/// Данные отдельного потока
struct ThreadData
{
SharedPtr<Poco::Thread> thread;
SharedPtr<Thread> runnable;
2012-01-10 22:11:51 +00:00
};
class Thread : public Poco::Runnable
2012-01-10 22:11:51 +00:00
{
public:
Thread(UnionBlockInputStream & parent_) : parent(parent_) {}
void run()
2012-01-10 22:11:51 +00:00
{
ExceptionPtr exception;
try
{
loop();
}
catch (const Exception & e)
2012-01-10 22:11:51 +00:00
{
exception = e.clone();
2012-01-10 22:11:51 +00:00
}
catch (const Poco::Exception & e)
{
exception = e.clone();
}
catch (const std::exception & e)
{
exception = new Exception(e.what(), ErrorCodes::STD_EXCEPTION);
}
catch (...)
{
exception = new Exception("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);
}
if (exception)
{
/// Попросим остальные потоки побыстрее прекратить работу.
parent.finish = true;
try
{
parent.cancel();
}
catch (...)
{
/** Если не удалось попросить остановиться одного или несколько источников.
* (например, разорвано соединение при распределённой обработке запроса)
* - то пофиг.
*/
}
/// Отдаём эксепшен в основной поток.
parent.output_queue.push(exception);
}
}
void loop()
{
while (!parent.finish) /// Может потребоваться прекратить работу раньше, чем все источники иссякнут.
{
InputData input;
/// Выбираем следующий источник.
{
Poco::ScopedLock<Poco::FastMutex> lock(parent.mutex);
/// Если свободных источников нет, то этот поток больше не нужен. (Но другие потоки могут работать со своими источниками.)
if (parent.input_queue.empty())
break;
input = parent.input_queue.front();
/// Убираем источник из очереди доступных источников.
parent.input_queue.pop();
}
/// Основная работа.
Block block = input.in->read();
{
Poco::ScopedLock<Poco::FastMutex> lock(parent.mutex);
/// Если этот источник ещё не иссяк, то положим полученный блок в очередь готовых.
if (block)
{
parent.input_queue.push(input);
parent.output_queue.push(block);
}
else
{
++parent.exhausted_inputs;
/// Если все источники иссякли.
if (parent.exhausted_inputs == parent.children.size())
{
/// Отдаём в основной поток пустой блок, что означает, что данных больше нет.
parent.output_queue.push(OutputData());
parent.finish = true;
break;
}
}
}
}
2012-01-10 22:11:51 +00:00
}
private:
UnionBlockInputStream & parent;
};
unsigned max_threads;
/// Потоки.
typedef std::list<ThreadData> ThreadsData;
ThreadsData threads_data;
/// Очередь доступных источников, которые не заняты каким-либо потоком в данный момент.
typedef std::queue<InputData> InputQueue;
InputQueue input_queue;
/// Блок или эксепшен.
struct OutputData
{
Block block;
ExceptionPtr exception;
OutputData() {}
OutputData(Block & block_) : block(block_) {}
OutputData(ExceptionPtr & exception_) : exception(exception_) {}
};
/// Очередь готовых блоков. Также туда можно положить эксепшен вместо блока.
typedef ConcurrentBoundedQueue<OutputData> OutputQueue;
OutputQueue output_queue;
/// Для операций с очередями.
Poco::FastMutex mutex;
/// Сколько источников иссякло.
size_t exhausted_inputs;
/// Завершить работу потоков (раньше, чем иссякнут источники).
volatile bool finish;
bool all_read;
Logger * log;
2012-01-10 22:11:51 +00:00
};
}