ClickHouse/dbms/include/DB/DataStreams/UnionBlockInputStream.h

244 lines
7.4 KiB
C
Raw Normal View History

2012-01-10 22:11:51 +00:00
#pragma once
2015-09-29 19:19:54 +00:00
#include <common/logger_useful.h>
2012-01-10 22:11:51 +00:00
2013-05-03 02:25:50 +00:00
#include <DB/Common/ConcurrentBoundedQueue.h>
2012-01-10 22:11:51 +00:00
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/DataStreams/ParallelInputsProcessor.h>
2012-01-10 22:11:51 +00:00
namespace DB
{
using Poco::SharedPtr;
/** Объединяет несколько источников в один.
* Блоки из разных источников перемежаются друг с другом произвольным образом.
* Можно указать количество потоков (max_threads),
* в которых будет выполняться получение данных из разных источников.
*
* Устроено так:
* - с помощью ParallelInputsProcessor в нескольких потоках вынимает из источников блоки;
* - полученные блоки складываются в ограниченную очередь готовых блоков;
* - основной поток вынимает готовые блоки из очереди готовых блоков.
2012-01-10 22:11:51 +00:00
*/
2012-01-10 22:11:51 +00:00
class UnionBlockInputStream : public IProfilingBlockInputStream
{
public:
UnionBlockInputStream(BlockInputStreams inputs, BlockInputStreamPtr additional_input_at_end, size_t max_threads) :
2015-04-22 21:26:12 +00:00
output_queue(std::min(inputs.size(), max_threads)),
handler(*this),
processor(inputs, additional_input_at_end, max_threads, handler)
2012-01-10 22:11:51 +00:00
{
children = inputs;
if (additional_input_at_end)
children.push_back(additional_input_at_end);
2012-01-10 22:11:51 +00:00
}
String getName() const override { return "Union"; }
2012-10-20 02:10:47 +00:00
String getID() const override
{
std::stringstream res;
res << "Union(";
Strings children_ids(children.size());
for (size_t i = 0; i < children.size(); ++i)
children_ids[i] = children[i]->getID();
/// Порядок не имеет значения.
std::sort(children_ids.begin(), children_ids.end());
for (size_t i = 0; i < children_ids.size(); ++i)
res << (i == 0 ? "" : ", ") << children_ids[i];
res << ")";
return res.str();
}
2013-09-13 20:33:09 +00:00
~UnionBlockInputStream() override
2012-10-20 02:10:47 +00:00
{
2013-09-14 05:14:22 +00:00
try
{
if (!all_read)
cancel();
finalize();
2013-09-14 05:14:22 +00:00
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
2013-09-14 05:14:22 +00:00
}
2012-10-20 02:10:47 +00:00
}
/** Отличается от реализации по-умолчанию тем, что пытается остановить все источники,
* пропуская отвалившиеся по эксепшену.
*/
void cancel() override
{
bool old_val = false;
if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
2012-11-10 05:13:46 +00:00
return;
//std::cerr << "cancelling\n";
processor.cancel();
}
protected:
void finalize()
{
if (!started)
return;
LOG_TRACE(log, "Waiting for threads to finish");
ExceptionPtr exception;
if (!all_read)
{
/** Прочитаем всё до конца, чтобы ParallelInputsProcessor не заблокировался при попытке вставить в очередь.
* Может быть, в очереди есть ещё эксепшен.
*/
OutputData res;
while (true)
{
//std::cerr << "popping\n";
output_queue.pop(res);
if (res.exception)
{
if (!exception)
exception = res.exception;
else if (DB::Exception * e = dynamic_cast<DB::Exception *>(&*exception))
e->addMessage("\n" + res.exception->displayText());
}
else if (!res.block)
break;
}
all_read = true;
}
processor.wait();
LOG_TRACE(log, "Waited for threads to finish");
if (exception)
exception->rethrow();
}
/** Возможны следующие варианты:
* 1. Функция readImpl вызывается до тех пор, пока она не вернёт пустой блок.
* Затем вызывается функция readSuffix и затем деструктор.
* 2. Вызывается функция readImpl. В какой-то момент, возможно из другого потока вызывается функция cancel.
* Затем вызывается функция readSuffix и затем деструктор.
* 3. В любой момент, объект может быть и так уничтожен (вызываться деструктор).
*/
Block readImpl() override
2012-01-10 22:11:51 +00:00
{
OutputData res;
if (all_read)
return res.block;
2012-10-20 02:10:47 +00:00
/// Запускаем потоки, если это ещё не было сделано.
if (!started)
2012-01-10 22:11:51 +00:00
{
started = true;
processor.process();
}
2012-06-24 23:17:06 +00:00
/// Будем ждать, пока будет готов следующий блок или будет выкинуто исключение.
//std::cerr << "popping\n";
output_queue.pop(res);
2012-01-10 22:11:51 +00:00
if (res.exception)
res.exception->rethrow();
if (!res.block)
all_read = true;
return res.block;
2012-01-10 22:11:51 +00:00
}
/// Вызывается либо после того, как всё прочитано, либо после cancel-а.
void readSuffix() override
2013-09-13 20:33:09 +00:00
{
//std::cerr << "readSuffix\n";
if (!all_read && !is_cancelled.load(std::memory_order_seq_cst))
throw Exception("readSuffix called before all data is read", ErrorCodes::LOGICAL_ERROR);
2013-11-29 15:04:07 +00:00
finalize();
for (size_t i = 0; i < children.size(); ++i)
children[i]->readSuffix();
2013-09-13 20:33:09 +00:00
}
private:
/// Блок или эксепшен.
struct OutputData
2012-01-10 22:11:51 +00:00
{
Block block;
ExceptionPtr exception;
2012-01-10 22:11:51 +00:00
OutputData() {}
OutputData(Block & block_) : block(block_) {}
OutputData(ExceptionPtr & exception_) : exception(exception_) {}
2012-01-10 22:11:51 +00:00
};
/** Очередь готовых блоков. Также туда можно положить эксепшен вместо блока.
* Когда данные закончатся - в очередь вставляется пустой блок.
* В очередь всегда (даже после исключения или отмены запроса) рано или поздно вставляется пустой блок.
* Очередь всегда (даже после исключения или отмены запроса, даже в деструкторе) нужно дочитывать до пустого блока,
* иначе ParallelInputsProcessor может заблокироваться при вставке в очередь.
*/
typedef ConcurrentBoundedQueue<OutputData> OutputQueue;
OutputQueue output_queue;
2014-07-16 02:06:58 +00:00
struct Handler
{
Handler(UnionBlockInputStream & parent_) : parent(parent_) {}
void onBlock(Block & block, size_t thread_num)
2012-01-10 22:11:51 +00:00
{
//std::cerr << "pushing block\n";
parent.output_queue.push(block);
}
void onFinish()
{
//std::cerr << "pushing end\n";
parent.output_queue.push(OutputData());
2012-01-10 22:11:51 +00:00
}
void onException(ExceptionPtr & exception, size_t thread_num)
2014-07-16 02:06:58 +00:00
{
//std::cerr << "pushing exception\n";
/// Порядок строк имеет значение. Если его поменять, то возможна ситуация,
/// когда перед эксепшеном, в очередь окажется вставлен пустой блок (конец данных),
/// и эксепшен потеряется.
parent.output_queue.push(exception);
parent.cancel(); /// Не кидает исключений.
}
UnionBlockInputStream & parent;
};
Handler handler;
ParallelInputsProcessor<Handler> processor;
bool started = false;
bool all_read = false;
Logger * log = &Logger::get("UnionBlockInputStream");
2012-01-10 22:11:51 +00:00
};
}