2014-11-30 18:22:57 +00:00
|
|
|
|
#pragma once
|
|
|
|
|
|
|
|
|
|
#include <list>
|
2015-04-23 00:20:41 +00:00
|
|
|
|
#include <queue>
|
2014-11-30 18:22:57 +00:00
|
|
|
|
#include <atomic>
|
|
|
|
|
#include <thread>
|
|
|
|
|
#include <mutex>
|
|
|
|
|
|
2015-09-29 19:19:54 +00:00
|
|
|
|
#include <common/logger_useful.h>
|
2014-11-30 18:22:57 +00:00
|
|
|
|
|
|
|
|
|
#include <DB/DataStreams/IProfilingBlockInputStream.h>
|
2015-09-24 18:54:21 +00:00
|
|
|
|
#include <DB/Common/setThreadName.h>
|
2016-01-21 01:47:28 +00:00
|
|
|
|
#include <DB/Common/CurrentMetrics.h>
|
2014-11-30 18:22:57 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** Позволяет обработать множество источников блоков параллельно, используя указанное количество потоков.
|
|
|
|
|
* Вынимает из любого доступного источника блок и передаёт его на обработку в предоставленный handler.
|
|
|
|
|
*
|
|
|
|
|
* Устроено так:
|
|
|
|
|
* - есть набор источников, из которых можно вынимать блоки;
|
|
|
|
|
* - есть набор потоков, которые могут одновременно вынимать блоки из разных источников;
|
|
|
|
|
* - "свободные" источники (с которыми сейчас не работает никакой поток) кладутся в очередь источников;
|
|
|
|
|
* - когда поток берёт источник для обработки, он удаляет его из очереди источников,
|
|
|
|
|
* вынимает из него блок, и затем кладёт источник обратно в очередь источников;
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
|
{
|
|
|
|
|
|
2015-10-12 14:53:16 +00:00
|
|
|
|
/** Режим объединения.
|
|
|
|
|
*/
|
|
|
|
|
enum class StreamUnionMode
|
|
|
|
|
{
|
|
|
|
|
Basic = 0, /// вынимать блоки
|
|
|
|
|
ExtraInfo /// вынимать блоки + дополнительную информацию
|
|
|
|
|
};
|
2014-11-30 18:22:57 +00:00
|
|
|
|
|
|
|
|
|
/// Пример обработчика.
|
|
|
|
|
struct ParallelInputsHandler
|
|
|
|
|
{
|
|
|
|
|
/// Обработка блока данных.
|
|
|
|
|
void onBlock(Block & block, size_t thread_num) {}
|
|
|
|
|
|
2015-10-12 14:53:16 +00:00
|
|
|
|
/// Обработка блока данных + дополнительных информаций.
|
|
|
|
|
void onBlock(Block & block, BlockExtraInfo & extra_info, size_t thread_num) {}
|
|
|
|
|
|
2015-12-01 21:20:14 +00:00
|
|
|
|
/// Вызывается для каждого потока, когда потоку стало больше нечего делать.
|
|
|
|
|
/// Из-за того, что иссякла часть источников, и сейчас источников осталось меньше, чем потоков.
|
|
|
|
|
/// Вызывается, если метод onException не кидает исключение; вызывается до метода onFinish.
|
|
|
|
|
void onFinishThread(size_t thread_num) {}
|
|
|
|
|
|
2014-11-30 18:22:57 +00:00
|
|
|
|
/// Блоки закончились. Из-за того, что все источники иссякли или из-за отмены работы.
|
2014-12-14 08:27:22 +00:00
|
|
|
|
/// Этот метод всегда вызывается ровно один раз, в конце работы, если метод onException не кидает исключение.
|
2014-11-30 18:22:57 +00:00
|
|
|
|
void onFinish() {}
|
|
|
|
|
|
|
|
|
|
/// Обработка исключения. Разумно вызывать в этом методе метод ParallelInputsProcessor::cancel, а также передавать эксепшен в основной поток.
|
2015-10-05 05:40:27 +00:00
|
|
|
|
void onException(std::exception_ptr & exception, size_t thread_num) {}
|
2014-11-30 18:22:57 +00:00
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
2015-10-12 14:53:16 +00:00
|
|
|
|
template <typename Handler, StreamUnionMode mode = StreamUnionMode::Basic>
|
2014-11-30 18:22:57 +00:00
|
|
|
|
class ParallelInputsProcessor
|
|
|
|
|
{
|
|
|
|
|
public:
|
2015-07-21 21:29:02 +00:00
|
|
|
|
/** additional_input_at_end - если не nullptr,
|
|
|
|
|
* то из этого источника начинают доставаться блоки лишь после того, как все остальные источники обработаны.
|
|
|
|
|
* Это делается в основном потоке.
|
|
|
|
|
*
|
|
|
|
|
* Предназначено для реализации FULL и RIGHT JOIN
|
|
|
|
|
* - где нужно сначала параллельно сделать JOIN, при этом отмечая, какие ключи не найдены,
|
|
|
|
|
* и только после завершения этой работы, создать блоки из ненайденных ключей.
|
|
|
|
|
*/
|
|
|
|
|
ParallelInputsProcessor(BlockInputStreams inputs_, BlockInputStreamPtr additional_input_at_end_, size_t max_threads_, Handler & handler_)
|
|
|
|
|
: inputs(inputs_), additional_input_at_end(additional_input_at_end_), max_threads(std::min(inputs_.size(), max_threads_)), handler(handler_)
|
2014-11-30 18:22:57 +00:00
|
|
|
|
{
|
|
|
|
|
for (size_t i = 0; i < inputs_.size(); ++i)
|
2015-04-23 00:20:41 +00:00
|
|
|
|
available_inputs.emplace(inputs_[i], i);
|
2014-11-30 18:22:57 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
~ParallelInputsProcessor()
|
|
|
|
|
{
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
wait();
|
|
|
|
|
}
|
|
|
|
|
catch (...)
|
|
|
|
|
{
|
|
|
|
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Запустить фоновые потоки, начать работу.
|
|
|
|
|
void process()
|
|
|
|
|
{
|
|
|
|
|
active_threads = max_threads;
|
|
|
|
|
threads.reserve(max_threads);
|
|
|
|
|
for (size_t i = 0; i < max_threads; ++i)
|
2014-12-15 05:23:44 +00:00
|
|
|
|
threads.emplace_back(std::bind(&ParallelInputsProcessor::thread, this, current_memory_tracker, i));
|
2014-11-30 18:22:57 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Попросить все источники остановиться раньше, чем они иссякнут.
|
|
|
|
|
void cancel()
|
|
|
|
|
{
|
|
|
|
|
finish = true;
|
|
|
|
|
|
|
|
|
|
for (auto & input : inputs)
|
|
|
|
|
{
|
|
|
|
|
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&*input))
|
|
|
|
|
{
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
child->cancel();
|
|
|
|
|
}
|
|
|
|
|
catch (...)
|
|
|
|
|
{
|
|
|
|
|
/** Если не удалось попросить остановиться одного или несколько источников.
|
|
|
|
|
* (например, разорвано соединение при распределённой обработке запроса)
|
|
|
|
|
* - то пофиг.
|
|
|
|
|
*/
|
|
|
|
|
LOG_ERROR(log, "Exception while cancelling " << child->getName());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Подождать завершения работы всех потоков раньше деструктора.
|
|
|
|
|
void wait()
|
|
|
|
|
{
|
|
|
|
|
if (joined_threads)
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
|
|
for (auto & thread : threads)
|
|
|
|
|
thread.join();
|
|
|
|
|
|
|
|
|
|
threads.clear();
|
|
|
|
|
joined_threads = true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
size_t getNumActiveThreads() const
|
|
|
|
|
{
|
|
|
|
|
return active_threads;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private:
|
|
|
|
|
/// Данные отдельного источника
|
|
|
|
|
struct InputData
|
|
|
|
|
{
|
|
|
|
|
BlockInputStreamPtr in;
|
|
|
|
|
size_t i; /// Порядковый номер источника (для отладки).
|
|
|
|
|
|
|
|
|
|
InputData() {}
|
|
|
|
|
InputData(BlockInputStreamPtr & in_, size_t i_) : in(in_), i(i_) {}
|
|
|
|
|
};
|
|
|
|
|
|
2015-10-12 14:53:16 +00:00
|
|
|
|
template <StreamUnionMode mode2 = mode>
|
|
|
|
|
void publishPayload(BlockInputStreamPtr & stream, Block & block, size_t thread_num,
|
|
|
|
|
typename std::enable_if<mode2 == StreamUnionMode::Basic>::type * = nullptr)
|
|
|
|
|
{
|
|
|
|
|
handler.onBlock(block, thread_num);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
template <StreamUnionMode mode2 = mode>
|
|
|
|
|
void publishPayload(BlockInputStreamPtr & stream, Block & block, size_t thread_num,
|
|
|
|
|
typename std::enable_if<mode2 == StreamUnionMode::ExtraInfo>::type * = nullptr)
|
|
|
|
|
{
|
|
|
|
|
BlockExtraInfo extra_info = stream->getBlockExtraInfo();
|
|
|
|
|
handler.onBlock(block, extra_info, thread_num);
|
|
|
|
|
}
|
2014-11-30 18:22:57 +00:00
|
|
|
|
|
|
|
|
|
void thread(MemoryTracker * memory_tracker, size_t thread_num)
|
|
|
|
|
{
|
|
|
|
|
current_memory_tracker = memory_tracker;
|
2015-10-05 05:40:27 +00:00
|
|
|
|
std::exception_ptr exception;
|
2014-11-30 18:22:57 +00:00
|
|
|
|
|
2015-09-24 18:54:21 +00:00
|
|
|
|
setThreadName("ParalInputsProc");
|
2016-01-21 01:47:28 +00:00
|
|
|
|
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
|
2015-09-24 18:54:21 +00:00
|
|
|
|
|
2014-11-30 18:22:57 +00:00
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
loop(thread_num);
|
|
|
|
|
}
|
|
|
|
|
catch (...)
|
|
|
|
|
{
|
2015-10-05 05:40:27 +00:00
|
|
|
|
exception = std::current_exception();
|
2014-11-30 18:22:57 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (exception)
|
|
|
|
|
{
|
|
|
|
|
handler.onException(exception, thread_num);
|
|
|
|
|
}
|
2014-12-14 08:27:22 +00:00
|
|
|
|
|
2015-12-01 21:20:14 +00:00
|
|
|
|
handler.onFinishThread(thread_num);
|
|
|
|
|
|
2014-12-14 08:27:22 +00:00
|
|
|
|
/// Последний поток при выходе сообщает, что данных больше нет.
|
2015-07-22 02:59:47 +00:00
|
|
|
|
if (0 == --active_threads)
|
2014-12-14 08:27:22 +00:00
|
|
|
|
{
|
2015-07-22 02:59:47 +00:00
|
|
|
|
/// И ещё обрабатывает дополнительный источник, если такой есть.
|
|
|
|
|
if (additional_input_at_end)
|
|
|
|
|
{
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
while (Block block = additional_input_at_end->read())
|
2015-10-12 14:53:16 +00:00
|
|
|
|
publishPayload(additional_input_at_end, block, thread_num);
|
2015-07-22 02:59:47 +00:00
|
|
|
|
}
|
|
|
|
|
catch (...)
|
|
|
|
|
{
|
2015-10-05 05:40:27 +00:00
|
|
|
|
exception = std::current_exception();
|
2015-07-22 02:59:47 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (exception)
|
|
|
|
|
{
|
|
|
|
|
handler.onException(exception, thread_num);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2015-12-06 15:29:16 +00:00
|
|
|
|
handler.onFinish(); /// TODO Если в onFinish или onFinishThread эксепшен, то вызывается std::terminate.
|
2014-12-14 08:27:22 +00:00
|
|
|
|
}
|
2014-11-30 18:22:57 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void loop(size_t thread_num)
|
|
|
|
|
{
|
|
|
|
|
while (!finish) /// Может потребоваться прекратить работу раньше, чем все источники иссякнут.
|
|
|
|
|
{
|
|
|
|
|
InputData input;
|
|
|
|
|
|
|
|
|
|
/// Выбираем следующий источник.
|
|
|
|
|
{
|
2015-04-23 00:20:41 +00:00
|
|
|
|
std::lock_guard<std::mutex> lock(available_inputs_mutex);
|
2014-11-30 18:22:57 +00:00
|
|
|
|
|
|
|
|
|
/// Если свободных источников нет, то этот поток больше не нужен. (Но другие потоки могут работать со своими источниками.)
|
2015-04-23 00:20:41 +00:00
|
|
|
|
if (available_inputs.empty())
|
2014-11-30 18:22:57 +00:00
|
|
|
|
break;
|
|
|
|
|
|
2015-04-23 00:20:41 +00:00
|
|
|
|
input = available_inputs.front();
|
2014-11-30 18:22:57 +00:00
|
|
|
|
|
|
|
|
|
/// Убираем источник из очереди доступных источников.
|
2015-04-23 00:20:41 +00:00
|
|
|
|
available_inputs.pop();
|
2014-11-30 18:22:57 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Основная работа.
|
|
|
|
|
Block block = input.in->read();
|
|
|
|
|
|
|
|
|
|
{
|
|
|
|
|
if (finish)
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
/// Если этот источник ещё не иссяк, то положим полученный блок в очередь готовых.
|
|
|
|
|
{
|
2015-04-23 00:20:41 +00:00
|
|
|
|
std::lock_guard<std::mutex> lock(available_inputs_mutex);
|
2014-11-30 18:22:57 +00:00
|
|
|
|
|
|
|
|
|
if (block)
|
|
|
|
|
{
|
2015-04-23 00:20:41 +00:00
|
|
|
|
available_inputs.push(input);
|
2014-11-30 18:22:57 +00:00
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
2015-04-23 00:20:41 +00:00
|
|
|
|
if (available_inputs.empty())
|
2014-11-30 18:22:57 +00:00
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (finish)
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
if (block)
|
2015-10-12 14:53:16 +00:00
|
|
|
|
publishPayload(input.in, block, thread_num);
|
2014-11-30 18:22:57 +00:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
BlockInputStreams inputs;
|
2015-07-21 21:29:02 +00:00
|
|
|
|
BlockInputStreamPtr additional_input_at_end;
|
2014-11-30 18:22:57 +00:00
|
|
|
|
unsigned max_threads;
|
|
|
|
|
|
|
|
|
|
Handler & handler;
|
|
|
|
|
|
|
|
|
|
/// Потоки.
|
|
|
|
|
typedef std::vector<std::thread> ThreadsData;
|
|
|
|
|
ThreadsData threads;
|
|
|
|
|
|
2015-04-23 00:20:41 +00:00
|
|
|
|
/** Набор доступных источников, которые не заняты каким-либо потоком в данный момент.
|
|
|
|
|
* Каждый поток берёт из этого набора один источник, вынимает из источника блок (в этот момент источник делает вычисления),
|
|
|
|
|
* и (если источник не исчерпан), кладёт назад в набор доступных источников.
|
|
|
|
|
*
|
|
|
|
|
* Возникает вопрос, что лучше использовать:
|
|
|
|
|
* - очередь (только что обработанный источник будет в следующий раз обработан позже остальных)
|
|
|
|
|
* - стек (только что обработанный источник будет обработан как можно раньше).
|
|
|
|
|
*
|
|
|
|
|
* Стек лучше очереди, когда надо выполнять работу по чтению одного источника более последовательно,
|
|
|
|
|
* и теоретически, это позволяет достичь более последовательных чтений с диска.
|
|
|
|
|
*
|
|
|
|
|
* Но при использовании стека, возникает проблема при распределённой обработке запроса:
|
|
|
|
|
* данные всё-время читаются только с части серверов, а на остальных серверах
|
|
|
|
|
* возникает таймаут при send-е, и обработка запроса завершается с исключением.
|
|
|
|
|
*
|
|
|
|
|
* Поэтому, используется очередь. Это можно улучшить в дальнейшем.
|
2015-03-07 22:00:58 +00:00
|
|
|
|
*/
|
2015-04-23 00:20:41 +00:00
|
|
|
|
typedef std::queue<InputData> AvailableInputs;
|
|
|
|
|
AvailableInputs available_inputs;
|
2014-11-30 18:22:57 +00:00
|
|
|
|
|
2015-04-23 00:20:41 +00:00
|
|
|
|
/// Для операций с available_inputs.
|
|
|
|
|
std::mutex available_inputs_mutex;
|
2014-11-30 18:22:57 +00:00
|
|
|
|
|
|
|
|
|
/// Сколько источников иссякло.
|
|
|
|
|
std::atomic<size_t> active_threads { 0 };
|
|
|
|
|
/// Завершить работу потоков (раньше, чем иссякнут источники).
|
|
|
|
|
std::atomic<bool> finish { false };
|
|
|
|
|
/// Подождали завершения всех потоков.
|
|
|
|
|
std::atomic<bool> joined_threads { false };
|
|
|
|
|
|
|
|
|
|
Logger * log = &Logger::get("ParallelInputsProcessor");
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
}
|