ClickHouse/dbms/include/DB/DataStreams/ParallelInputsProcessor.h
2015-12-02 00:20:14 +03:00

314 lines
12 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#pragma once
#include <list>
#include <queue>
#include <atomic>
#include <thread>
#include <mutex>
#include <common/logger_useful.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/Common/setThreadName.h>
/** Позволяет обработать множество источников блоков параллельно, используя указанное количество потоков.
* Вынимает из любого доступного источника блок и передаёт его на обработку в предоставленный handler.
*
* Устроено так:
* - есть набор источников, из которых можно вынимать блоки;
* - есть набор потоков, которые могут одновременно вынимать блоки из разных источников;
* - "свободные" источники (с которыми сейчас не работает никакой поток) кладутся в очередь источников;
* - когда поток берёт источник для обработки, он удаляет его из очереди источников,
* вынимает из него блок, и затем кладёт источник обратно в очередь источников;
*/
namespace DB
{
/** Режим объединения.
*/
enum class StreamUnionMode
{
Basic = 0, /// вынимать блоки
ExtraInfo /// вынимать блоки + дополнительную информацию
};
/// Пример обработчика.
struct ParallelInputsHandler
{
/// Обработка блока данных.
void onBlock(Block & block, size_t thread_num) {}
/// Обработка блока данных + дополнительных информаций.
void onBlock(Block & block, BlockExtraInfo & extra_info, size_t thread_num) {}
/// Вызывается для каждого потока, когда потоку стало больше нечего делать.
/// Из-за того, что иссякла часть источников, и сейчас источников осталось меньше, чем потоков.
/// Вызывается, если метод onException не кидает исключение; вызывается до метода onFinish.
void onFinishThread(size_t thread_num) {}
/// Блоки закончились. Из-за того, что все источники иссякли или из-за отмены работы.
/// Этот метод всегда вызывается ровно один раз, в конце работы, если метод onException не кидает исключение.
void onFinish() {}
/// Обработка исключения. Разумно вызывать в этом методе метод ParallelInputsProcessor::cancel, а также передавать эксепшен в основной поток.
void onException(std::exception_ptr & exception, size_t thread_num) {}
};
template <typename Handler, StreamUnionMode mode = StreamUnionMode::Basic>
class ParallelInputsProcessor
{
public:
/** additional_input_at_end - если не nullptr,
* то из этого источника начинают доставаться блоки лишь после того, как все остальные источники обработаны.
* Это делается в основном потоке.
*
* Предназначено для реализации FULL и RIGHT JOIN
* - где нужно сначала параллельно сделать JOIN, при этом отмечая, какие ключи не найдены,
* и только после завершения этой работы, создать блоки из ненайденных ключей.
*/
ParallelInputsProcessor(BlockInputStreams inputs_, BlockInputStreamPtr additional_input_at_end_, size_t max_threads_, Handler & handler_)
: inputs(inputs_), additional_input_at_end(additional_input_at_end_), max_threads(std::min(inputs_.size(), max_threads_)), handler(handler_)
{
for (size_t i = 0; i < inputs_.size(); ++i)
available_inputs.emplace(inputs_[i], i);
}
~ParallelInputsProcessor()
{
try
{
wait();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
/// Запустить фоновые потоки, начать работу.
void process()
{
active_threads = max_threads;
threads.reserve(max_threads);
for (size_t i = 0; i < max_threads; ++i)
threads.emplace_back(std::bind(&ParallelInputsProcessor::thread, this, current_memory_tracker, i));
}
/// Попросить все источники остановиться раньше, чем они иссякнут.
void cancel()
{
finish = true;
for (auto & input : inputs)
{
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&*input))
{
try
{
child->cancel();
}
catch (...)
{
/** Если не удалось попросить остановиться одного или несколько источников.
* (например, разорвано соединение при распределённой обработке запроса)
* - то пофиг.
*/
LOG_ERROR(log, "Exception while cancelling " << child->getName());
}
}
}
}
/// Подождать завершения работы всех потоков раньше деструктора.
void wait()
{
if (joined_threads)
return;
for (auto & thread : threads)
thread.join();
threads.clear();
joined_threads = true;
}
size_t getNumActiveThreads() const
{
return active_threads;
}
private:
/// Данные отдельного источника
struct InputData
{
BlockInputStreamPtr in;
size_t i; /// Порядковый номер источника (для отладки).
InputData() {}
InputData(BlockInputStreamPtr & in_, size_t i_) : in(in_), i(i_) {}
};
template <StreamUnionMode mode2 = mode>
void publishPayload(BlockInputStreamPtr & stream, Block & block, size_t thread_num,
typename std::enable_if<mode2 == StreamUnionMode::Basic>::type * = nullptr)
{
handler.onBlock(block, thread_num);
}
template <StreamUnionMode mode2 = mode>
void publishPayload(BlockInputStreamPtr & stream, Block & block, size_t thread_num,
typename std::enable_if<mode2 == StreamUnionMode::ExtraInfo>::type * = nullptr)
{
BlockExtraInfo extra_info = stream->getBlockExtraInfo();
handler.onBlock(block, extra_info, thread_num);
}
void thread(MemoryTracker * memory_tracker, size_t thread_num)
{
current_memory_tracker = memory_tracker;
std::exception_ptr exception;
setThreadName("ParalInputsProc");
try
{
loop(thread_num);
}
catch (...)
{
exception = std::current_exception();
}
if (exception)
{
handler.onException(exception, thread_num);
}
handler.onFinishThread(thread_num);
/// Последний поток при выходе сообщает, что данных больше нет.
if (0 == --active_threads)
{
/// И ещё обрабатывает дополнительный источник, если такой есть.
if (additional_input_at_end)
{
try
{
while (Block block = additional_input_at_end->read())
publishPayload(additional_input_at_end, block, thread_num);
}
catch (...)
{
exception = std::current_exception();
}
if (exception)
{
handler.onException(exception, thread_num);
}
}
handler.onFinish();
}
}
void loop(size_t thread_num)
{
while (!finish) /// Может потребоваться прекратить работу раньше, чем все источники иссякнут.
{
InputData input;
/// Выбираем следующий источник.
{
std::lock_guard<std::mutex> lock(available_inputs_mutex);
/// Если свободных источников нет, то этот поток больше не нужен. (Но другие потоки могут работать со своими источниками.)
if (available_inputs.empty())
break;
input = available_inputs.front();
/// Убираем источник из очереди доступных источников.
available_inputs.pop();
}
/// Основная работа.
Block block = input.in->read();
{
if (finish)
break;
/// Если этот источник ещё не иссяк, то положим полученный блок в очередь готовых.
{
std::lock_guard<std::mutex> lock(available_inputs_mutex);
if (block)
{
available_inputs.push(input);
}
else
{
if (available_inputs.empty())
break;
}
}
if (finish)
break;
if (block)
publishPayload(input.in, block, thread_num);
}
}
}
BlockInputStreams inputs;
BlockInputStreamPtr additional_input_at_end;
unsigned max_threads;
Handler & handler;
/// Потоки.
typedef std::vector<std::thread> ThreadsData;
ThreadsData threads;
/** Набор доступных источников, которые не заняты каким-либо потоком в данный момент.
* Каждый поток берёт из этого набора один источник, вынимает из источника блок (в этот момент источник делает вычисления),
* и (если источник не исчерпан), кладёт назад в набор доступных источников.
*
* Возникает вопрос, что лучше использовать:
* - очередь (только что обработанный источник будет в следующий раз обработан позже остальных)
* - стек (только что обработанный источник будет обработан как можно раньше).
*
* Стек лучше очереди, когда надо выполнять работу по чтению одного источника более последовательно,
* и теоретически, это позволяет достичь более последовательных чтений с диска.
*
* Но при использовании стека, возникает проблема при распределённой обработке запроса:
* данные всё-время читаются только с части серверов, а на остальных серверах
* возникает таймаут при send-е, и обработка запроса завершается с исключением.
*
* Поэтому, используется очередь. Это можно улучшить в дальнейшем.
*/
typedef std::queue<InputData> AvailableInputs;
AvailableInputs available_inputs;
/// Для операций с available_inputs.
std::mutex available_inputs_mutex;
/// Сколько источников иссякло.
std::atomic<size_t> active_threads { 0 };
/// Завершить работу потоков (раньше, чем иссякнут источники).
std::atomic<bool> finish { false };
/// Подождали завершения всех потоков.
std::atomic<bool> joined_threads { false };
Logger * log = &Logger::get("ParallelInputsProcessor");
};
}