ClickHouse/dbms/include/DB/DataStreams/ParallelInputsProcessor.h

307 lines
11 KiB
C
Raw Normal View History

#pragma once
#include <list>
2015-04-23 00:20:41 +00:00
#include <queue>
#include <atomic>
#include <thread>
#include <mutex>
2015-09-29 19:19:54 +00:00
#include <common/logger_useful.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/Common/setThreadName.h>
/** Позволяет обработать множество источников блоков параллельно, используя указанное количество потоков.
* Вынимает из любого доступного источника блок и передаёт его на обработку в предоставленный handler.
*
* Устроено так:
* - есть набор источников, из которых можно вынимать блоки;
* - есть набор потоков, которые могут одновременно вынимать блоки из разных источников;
* - "свободные" источники (с которыми сейчас не работает никакой поток) кладутся в очередь источников;
* - когда поток берёт источник для обработки, он удаляет его из очереди источников,
* вынимает из него блок, и затем кладёт источник обратно в очередь источников;
*/
namespace DB
{
2015-10-12 14:53:16 +00:00
/** Режим объединения.
*/
enum class StreamUnionMode
{
Basic = 0, /// вынимать блоки
ExtraInfo /// вынимать блоки + дополнительную информацию
};
/// Пример обработчика.
struct ParallelInputsHandler
{
/// Обработка блока данных.
void onBlock(Block & block, size_t thread_num) {}
2015-10-12 14:53:16 +00:00
/// Обработка блока данных + дополнительных информаций.
void onBlock(Block & block, BlockExtraInfo & extra_info, size_t thread_num) {}
/// Блоки закончились. Из-за того, что все источники иссякли или из-за отмены работы.
/// Этот метод всегда вызывается ровно один раз, в конце работы, если метод onException не кидает исключение.
void onFinish() {}
/// Обработка исключения. Разумно вызывать в этом методе метод ParallelInputsProcessor::cancel, а также передавать эксепшен в основной поток.
2015-10-05 05:40:27 +00:00
void onException(std::exception_ptr & exception, size_t thread_num) {}
};
2015-10-12 14:53:16 +00:00
template <typename Handler, StreamUnionMode mode = StreamUnionMode::Basic>
class ParallelInputsProcessor
{
public:
/** additional_input_at_end - если не nullptr,
* то из этого источника начинают доставаться блоки лишь после того, как все остальные источники обработаны.
* Это делается в основном потоке.
*
* Предназначено для реализации FULL и RIGHT JOIN
* - где нужно сначала параллельно сделать JOIN, при этом отмечая, какие ключи не найдены,
* и только после завершения этой работы, создать блоки из ненайденных ключей.
*/
ParallelInputsProcessor(BlockInputStreams inputs_, BlockInputStreamPtr additional_input_at_end_, size_t max_threads_, Handler & handler_)
: inputs(inputs_), additional_input_at_end(additional_input_at_end_), max_threads(std::min(inputs_.size(), max_threads_)), handler(handler_)
{
for (size_t i = 0; i < inputs_.size(); ++i)
2015-04-23 00:20:41 +00:00
available_inputs.emplace(inputs_[i], i);
}
~ParallelInputsProcessor()
{
try
{
wait();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
/// Запустить фоновые потоки, начать работу.
void process()
{
active_threads = max_threads;
threads.reserve(max_threads);
for (size_t i = 0; i < max_threads; ++i)
threads.emplace_back(std::bind(&ParallelInputsProcessor::thread, this, current_memory_tracker, i));
}
/// Попросить все источники остановиться раньше, чем они иссякнут.
void cancel()
{
finish = true;
for (auto & input : inputs)
{
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&*input))
{
try
{
child->cancel();
}
catch (...)
{
/** Если не удалось попросить остановиться одного или несколько источников.
* (например, разорвано соединение при распределённой обработке запроса)
* - то пофиг.
*/
LOG_ERROR(log, "Exception while cancelling " << child->getName());
}
}
}
}
/// Подождать завершения работы всех потоков раньше деструктора.
void wait()
{
if (joined_threads)
return;
for (auto & thread : threads)
thread.join();
threads.clear();
joined_threads = true;
}
size_t getNumActiveThreads() const
{
return active_threads;
}
private:
/// Данные отдельного источника
struct InputData
{
BlockInputStreamPtr in;
size_t i; /// Порядковый номер источника (для отладки).
InputData() {}
InputData(BlockInputStreamPtr & in_, size_t i_) : in(in_), i(i_) {}
};
2015-10-12 14:53:16 +00:00
template <StreamUnionMode mode2 = mode>
void publishPayload(BlockInputStreamPtr & stream, Block & block, size_t thread_num,
typename std::enable_if<mode2 == StreamUnionMode::Basic>::type * = nullptr)
{
handler.onBlock(block, thread_num);
}
template <StreamUnionMode mode2 = mode>
void publishPayload(BlockInputStreamPtr & stream, Block & block, size_t thread_num,
typename std::enable_if<mode2 == StreamUnionMode::ExtraInfo>::type * = nullptr)
{
BlockExtraInfo extra_info = stream->getBlockExtraInfo();
handler.onBlock(block, extra_info, thread_num);
}
void thread(MemoryTracker * memory_tracker, size_t thread_num)
{
current_memory_tracker = memory_tracker;
2015-10-05 05:40:27 +00:00
std::exception_ptr exception;
setThreadName("ParalInputsProc");
try
{
loop(thread_num);
}
catch (...)
{
2015-10-05 05:40:27 +00:00
exception = std::current_exception();
}
if (exception)
{
handler.onException(exception, thread_num);
}
/// Последний поток при выходе сообщает, что данных больше нет.
if (0 == --active_threads)
{
/// И ещё обрабатывает дополнительный источник, если такой есть.
if (additional_input_at_end)
{
try
{
while (Block block = additional_input_at_end->read())
2015-10-12 14:53:16 +00:00
publishPayload(additional_input_at_end, block, thread_num);
}
catch (...)
{
2015-10-05 05:40:27 +00:00
exception = std::current_exception();
}
if (exception)
{
handler.onException(exception, thread_num);
}
}
handler.onFinish();
}
}
void loop(size_t thread_num)
{
while (!finish) /// Может потребоваться прекратить работу раньше, чем все источники иссякнут.
{
InputData input;
/// Выбираем следующий источник.
{
2015-04-23 00:20:41 +00:00
std::lock_guard<std::mutex> lock(available_inputs_mutex);
/// Если свободных источников нет, то этот поток больше не нужен. (Но другие потоки могут работать со своими источниками.)
2015-04-23 00:20:41 +00:00
if (available_inputs.empty())
break;
2015-04-23 00:20:41 +00:00
input = available_inputs.front();
/// Убираем источник из очереди доступных источников.
2015-04-23 00:20:41 +00:00
available_inputs.pop();
}
/// Основная работа.
Block block = input.in->read();
{
if (finish)
break;
/// Если этот источник ещё не иссяк, то положим полученный блок в очередь готовых.
{
2015-04-23 00:20:41 +00:00
std::lock_guard<std::mutex> lock(available_inputs_mutex);
if (block)
{
2015-04-23 00:20:41 +00:00
available_inputs.push(input);
}
else
{
2015-04-23 00:20:41 +00:00
if (available_inputs.empty())
break;
}
}
if (finish)
break;
if (block)
2015-10-12 14:53:16 +00:00
publishPayload(input.in, block, thread_num);
}
}
}
BlockInputStreams inputs;
BlockInputStreamPtr additional_input_at_end;
unsigned max_threads;
Handler & handler;
/// Потоки.
typedef std::vector<std::thread> ThreadsData;
ThreadsData threads;
2015-04-23 00:20:41 +00:00
/** Набор доступных источников, которые не заняты каким-либо потоком в данный момент.
* Каждый поток берёт из этого набора один источник, вынимает из источника блок (в этот момент источник делает вычисления),
* и (если источник не исчерпан), кладёт назад в набор доступных источников.
*
* Возникает вопрос, что лучше использовать:
* - очередь (только что обработанный источник будет в следующий раз обработан позже остальных)
* - стек (только что обработанный источник будет обработан как можно раньше).
*
* Стек лучше очереди, когда надо выполнять работу по чтению одного источника более последовательно,
* и теоретически, это позволяет достичь более последовательных чтений с диска.
*
* Но при использовании стека, возникает проблема при распределённой обработке запроса:
* данные всё-время читаются только с части серверов, а на остальных серверах
* возникает таймаут при send-е, и обработка запроса завершается с исключением.
*
* Поэтому, используется очередь. Это можно улучшить в дальнейшем.
*/
2015-04-23 00:20:41 +00:00
typedef std::queue<InputData> AvailableInputs;
AvailableInputs available_inputs;
2015-04-23 00:20:41 +00:00
/// Для операций с available_inputs.
std::mutex available_inputs_mutex;
/// Сколько источников иссякло.
std::atomic<size_t> active_threads { 0 };
/// Завершить работу потоков (раньше, чем иссякнут источники).
std::atomic<bool> finish { false };
/// Подождали завершения всех потоков.
std::atomic<bool> joined_threads { false };
Logger * log = &Logger::get("ParallelInputsProcessor");
};
}