ClickHouse/dbms/include/DB/DataStreams/UnionBlockInputStream.h

318 lines
9.0 KiB
C
Raw Normal View History

2012-01-10 22:11:51 +00:00
#pragma once
#include <list>
#include <queue>
#include <atomic>
2014-07-16 02:06:58 +00:00
#include <thread>
2012-01-10 22:11:51 +00:00
#include <Yandex/logger_useful.h>
2012-01-10 22:11:51 +00:00
2013-05-03 02:25:50 +00:00
#include <DB/Common/ConcurrentBoundedQueue.h>
2012-01-10 22:11:51 +00:00
#include <DB/DataStreams/IProfilingBlockInputStream.h>
namespace DB
{
using Poco::SharedPtr;
/** Объединяет несколько источников в один.
* Блоки из разных источников перемежаются друг с другом произвольным образом.
* Можно указать количество потоков (max_threads),
* в которых будет выполняться получение данных из разных источников.
*
* Устроено так:
* - есть набор источников, из которых можно вынимать блоки;
* - есть набор потоков, которые могут одновременно вынимать блоки из разных источников;
* - "свободные" источники (с которыми сейчас не работает никакой поток) кладутся в очередь источников;
* - когда поток берёт источник для обработки, он удаляет его из очереди источников,
* вынимает из него блок, и затем кладёт источник обратно в очередь источников;
* - полученные блоки складываются в ограниченную очередь готовых блоков;
* - основной поток вынимает готовые блоки из очереди готовых блоков.
2012-01-10 22:11:51 +00:00
*/
class UnionBlockInputStream : public IProfilingBlockInputStream
{
public:
UnionBlockInputStream(BlockInputStreams inputs_, unsigned max_threads_ = 1)
: max_threads(std::min(inputs_.size(), static_cast<size_t>(max_threads_))),
output_queue(max_threads)
2012-01-10 22:11:51 +00:00
{
children.insert(children.end(), inputs_.begin(), inputs_.end());
2012-06-24 23:17:06 +00:00
2012-01-10 22:11:51 +00:00
for (size_t i = 0; i < inputs_.size(); ++i)
2014-07-16 02:06:58 +00:00
input_queue.emplace(inputs_[i], i);
2012-01-10 22:11:51 +00:00
}
2012-10-20 02:10:47 +00:00
String getName() const { return "UnionBlockInputStream"; }
String getID() const
{
std::stringstream res;
res << "Union(";
Strings children_ids(children.size());
for (size_t i = 0; i < children.size(); ++i)
children_ids[i] = children[i]->getID();
/// Порядок не имеет значения.
std::sort(children_ids.begin(), children_ids.end());
for (size_t i = 0; i < children_ids.size(); ++i)
res << (i == 0 ? "" : ", ") << children_ids[i];
res << ")";
return res.str();
}
2013-09-13 20:33:09 +00:00
2012-10-20 02:10:47 +00:00
~UnionBlockInputStream()
{
2013-09-14 05:14:22 +00:00
try
{
if (!all_read)
cancel();
finalize();
2013-09-14 05:14:22 +00:00
}
catch (...)
{
LOG_ERROR(log, "Exception while destroying UnionBlockInputStream.");
}
2012-10-20 02:10:47 +00:00
}
/** Отличается от реализации по-умолчанию тем, что пытается остановить все источники,
* пропуская отвалившиеся по эксепшену.
*/
void cancel()
{
2012-11-10 05:13:46 +00:00
if (!__sync_bool_compare_and_swap(&is_cancelled, false, true))
return;
finish = true;
for (BlockInputStreams::iterator it = children.begin(); it != children.end(); ++it)
{
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&**it))
{
try
{
child->cancel();
}
catch (...)
{
2013-09-14 07:43:45 +00:00
LOG_ERROR(log, "Exception while cancelling " << child->getName());
}
}
}
}
protected:
void finalize()
{
2014-07-16 02:06:58 +00:00
if (threads.empty())
return;
LOG_TRACE(log, "Waiting for threads to finish");
/// Вынем всё, что есть в очереди готовых данных.
2013-12-08 05:18:22 +00:00
output_queue.clear();
2014-07-16 02:06:58 +00:00
/// В этот момент, запоздавшие потоки ещё могут вставить в очередь какие-нибудь блоки, но очередь не переполнится.
for (auto & thread : threads)
thread.join();
2014-07-16 02:06:58 +00:00
threads.clear();
LOG_TRACE(log, "Waited for threads to finish");
}
2012-01-10 22:11:51 +00:00
Block readImpl()
{
OutputData res;
if (all_read)
return res.block;
2012-10-20 02:10:47 +00:00
/// Запускаем потоки, если это ещё не было сделано.
2014-07-16 02:06:58 +00:00
if (threads.empty())
2012-01-10 22:11:51 +00:00
{
2014-07-16 02:06:58 +00:00
threads.reserve(max_threads);
for (size_t i = 0; i < max_threads; ++i)
threads.emplace_back([=] { thread(current_memory_tracker); });
}
2012-06-24 23:17:06 +00:00
/// Будем ждать, пока будет готов следующий блок или будет выкинуто исключение.
output_queue.pop(res);
2012-01-10 22:11:51 +00:00
if (res.exception)
res.exception->rethrow();
if (!res.block)
all_read = true;
return res.block;
2012-01-10 22:11:51 +00:00
}
void readSuffix()
2013-09-13 20:33:09 +00:00
{
if (!all_read && !is_cancelled)
throw Exception("readSuffix called before all data is read", ErrorCodes::LOGICAL_ERROR);
2013-11-29 15:04:07 +00:00
/// Может быть, в очереди есть ещё эксепшен.
OutputData res;
2014-07-16 02:06:58 +00:00
while (output_queue.tryPop(res))
if (res.exception)
res.exception->rethrow();
finalize();
for (size_t i = 0; i < children.size(); ++i)
children[i]->readSuffix();
2013-09-13 20:33:09 +00:00
}
private:
2012-01-10 22:11:51 +00:00
/// Данные отдельного источника
struct InputData
2012-01-10 22:11:51 +00:00
{
BlockInputStreamPtr in;
size_t i; /// Порядковый номер источника (для отладки).
2012-01-10 22:11:51 +00:00
2014-07-16 02:06:58 +00:00
InputData() {}
InputData(BlockInputStreamPtr & in_, size_t i_) : in(in_), i(i_) {}
2012-01-10 22:11:51 +00:00
};
2014-07-16 02:06:58 +00:00
void thread(MemoryTracker * memory_tracker)
2012-01-10 22:11:51 +00:00
{
2014-07-16 02:06:58 +00:00
current_memory_tracker = memory_tracker;
ExceptionPtr exception;
try
{
2014-07-16 02:06:58 +00:00
loop();
}
catch (...)
{
exception = cloneCurrentException();
}
2014-07-16 02:06:58 +00:00
if (exception)
2012-01-10 22:11:51 +00:00
{
2014-07-16 02:06:58 +00:00
/// Отдаём эксепшен в основной поток.
output_queue.push(exception);
try
{
2014-07-16 02:06:58 +00:00
cancel();
}
catch (...)
{
2014-07-16 02:06:58 +00:00
/** Если не удалось попросить остановиться одного или несколько источников.
* (например, разорвано соединение при распределённой обработке запроса)
* - то пофиг.
*/
}
}
2014-07-16 02:06:58 +00:00
}
2014-07-16 02:06:58 +00:00
void loop()
{
while (!finish) /// Может потребоваться прекратить работу раньше, чем все источники иссякнут.
{
2014-07-16 02:06:58 +00:00
InputData input;
/// Выбираем следующий источник.
{
2014-07-16 02:06:58 +00:00
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
2014-07-16 02:06:58 +00:00
/// Если свободных источников нет, то этот поток больше не нужен. (Но другие потоки могут работать со своими источниками.)
if (input_queue.empty())
break;
2014-07-16 02:06:58 +00:00
input = input_queue.front();
2014-07-16 02:06:58 +00:00
/// Убираем источник из очереди доступных источников.
input_queue.pop();
}
2014-07-16 02:06:58 +00:00
/// Основная работа.
Block block = input.in->read();
2014-07-16 02:06:58 +00:00
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
2014-07-16 02:06:58 +00:00
/// Если этот источник ещё не иссяк, то положим полученный блок в очередь готовых.
if (block)
{
2014-07-16 02:06:58 +00:00
input_queue.push(input);
2014-07-16 02:06:58 +00:00
if (finish)
break;
2013-12-08 05:18:22 +00:00
2014-07-16 02:06:58 +00:00
output_queue.push(block);
}
else
{
++exhausted_inputs;
2013-12-08 05:18:22 +00:00
2014-07-16 02:06:58 +00:00
/// Если все источники иссякли.
if (exhausted_inputs == children.size())
{
2014-07-16 02:06:58 +00:00
finish = true;
break;
}
}
}
2012-01-10 22:11:51 +00:00
}
2014-07-16 02:06:58 +00:00
if (finish)
{
/// Отдаём в основной поток пустой блок, что означает, что данных больше нет; только один раз.
if (false == pushed_end_of_output_queue.exchange(true))
output_queue.push(OutputData());
}
}
unsigned max_threads;
/// Потоки.
2014-07-16 02:06:58 +00:00
typedef std::vector<std::thread> ThreadsData;
ThreadsData threads;
/// Очередь доступных источников, которые не заняты каким-либо потоком в данный момент.
typedef std::queue<InputData> InputQueue;
InputQueue input_queue;
/// Блок или эксепшен.
struct OutputData
{
Block block;
ExceptionPtr exception;
OutputData() {}
OutputData(Block & block_) : block(block_) {}
OutputData(ExceptionPtr & exception_) : exception(exception_) {}
};
/// Очередь готовых блоков. Также туда можно положить эксепшен вместо блока.
typedef ConcurrentBoundedQueue<OutputData> OutputQueue;
OutputQueue output_queue;
/// Для операций с input_queue.
Poco::FastMutex mutex;
/// Сколько источников иссякло.
size_t exhausted_inputs = 0;
/// Завершить работу потоков (раньше, чем иссякнут источники).
std::atomic<bool> finish { false };
/// Положили ли в output_queue пустой блок.
std::atomic<bool> pushed_end_of_output_queue { false };
bool all_read { false };
Logger * log = &Logger::get("UnionBlockInputStream");
2012-01-10 22:11:51 +00:00
};
}