ClickHouse/dbms/include/DB/DataStreams/UnionBlockInputStream.h

322 lines
9.9 KiB
C
Raw Normal View History

2012-01-10 22:11:51 +00:00
#pragma once
2015-09-29 19:19:54 +00:00
#include <common/logger_useful.h>
2012-01-10 22:11:51 +00:00
2013-05-03 02:25:50 +00:00
#include <DB/Common/ConcurrentBoundedQueue.h>
2012-01-10 22:11:51 +00:00
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/DataStreams/ParallelInputsProcessor.h>
2012-01-10 22:11:51 +00:00
namespace DB
{
using Poco::SharedPtr;
2016-01-12 02:39:12 +00:00
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
2015-10-12 14:53:16 +00:00
namespace
{
template <StreamUnionMode mode>
struct OutputData;
/// Блок или эксепшен.
template <>
struct OutputData<StreamUnionMode::Basic>
{
Block block;
std::exception_ptr exception;
OutputData() {}
OutputData(Block & block_) : block(block_) {}
OutputData(std::exception_ptr & exception_) : exception(exception_) {}
};
/// Блок + дополнительнцю информацию или эксепшен.
template <>
struct OutputData<StreamUnionMode::ExtraInfo>
{
Block block;
BlockExtraInfo extra_info;
std::exception_ptr exception;
OutputData() {}
OutputData(Block & block_, BlockExtraInfo & extra_info_) : block(block_), extra_info(extra_info_) {}
OutputData(std::exception_ptr & exception_) : exception(exception_) {}
};
}
2012-01-10 22:11:51 +00:00
/** Объединяет несколько источников в один.
* Блоки из разных источников перемежаются друг с другом произвольным образом.
* Можно указать количество потоков (max_threads),
* в которых будет выполняться получение данных из разных источников.
*
* Устроено так:
* - с помощью ParallelInputsProcessor в нескольких потоках вынимает из источников блоки;
* - полученные блоки складываются в ограниченную очередь готовых блоков;
2015-10-12 14:53:16 +00:00
* - основной поток вынимает готовые блоки из очереди готовых блоков;
* - если указан режим StreamUnionMode::ExtraInfo, в дополнение к блокам UnionBlockInputStream
* вынимает информацию о блоках; в таком случае все источники должны поддержать такой режим.
2012-01-10 22:11:51 +00:00
*/
2015-10-12 14:53:16 +00:00
template <StreamUnionMode mode = StreamUnionMode::Basic>
2012-01-10 22:11:51 +00:00
class UnionBlockInputStream : public IProfilingBlockInputStream
{
2016-03-04 16:33:31 +00:00
public:
using ExceptionCallback = std::function<void()>;
2015-10-12 14:53:16 +00:00
private:
using Self = UnionBlockInputStream<mode>;
2012-01-10 22:11:51 +00:00
public:
2016-03-04 16:33:31 +00:00
UnionBlockInputStream(BlockInputStreams inputs, BlockInputStreamPtr additional_input_at_end, size_t max_threads,
ExceptionCallback exception_callback_ = ExceptionCallback()) :
2015-04-22 21:26:12 +00:00
output_queue(std::min(inputs.size(), max_threads)),
handler(*this),
2016-03-04 16:33:31 +00:00
processor(inputs, additional_input_at_end, max_threads, handler),
exception_callback(exception_callback_)
2012-01-10 22:11:51 +00:00
{
children = inputs;
if (additional_input_at_end)
children.push_back(additional_input_at_end);
2012-01-10 22:11:51 +00:00
}
String getName() const override { return "Union"; }
2012-10-20 02:10:47 +00:00
String getID() const override
{
std::stringstream res;
res << "Union(";
Strings children_ids(children.size());
for (size_t i = 0; i < children.size(); ++i)
children_ids[i] = children[i]->getID();
/// Порядок не имеет значения.
std::sort(children_ids.begin(), children_ids.end());
for (size_t i = 0; i < children_ids.size(); ++i)
res << (i == 0 ? "" : ", ") << children_ids[i];
res << ")";
return res.str();
}
2013-09-13 20:33:09 +00:00
~UnionBlockInputStream() override
2012-10-20 02:10:47 +00:00
{
2013-09-14 05:14:22 +00:00
try
{
if (!all_read)
cancel();
finalize();
2013-09-14 05:14:22 +00:00
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
2013-09-14 05:14:22 +00:00
}
2012-10-20 02:10:47 +00:00
}
/** Отличается от реализации по-умолчанию тем, что пытается остановить все источники,
* пропуская отвалившиеся по эксепшену.
*/
void cancel() override
{
bool old_val = false;
if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
2012-11-10 05:13:46 +00:00
return;
//std::cerr << "cancelling\n";
processor.cancel();
}
2015-10-12 14:53:16 +00:00
BlockExtraInfo getBlockExtraInfo() const override
{
return doGetBlockExtraInfo();
}
protected:
void finalize()
{
if (!started)
return;
LOG_TRACE(log, "Waiting for threads to finish");
2015-10-05 05:40:27 +00:00
std::exception_ptr exception;
if (!all_read)
{
/** Прочитаем всё до конца, чтобы ParallelInputsProcessor не заблокировался при попытке вставить в очередь.
* Может быть, в очереди есть ещё эксепшен.
*/
2015-10-12 14:53:16 +00:00
OutputData<mode> res;
while (true)
{
//std::cerr << "popping\n";
output_queue.pop(res);
if (res.exception)
{
if (!exception)
exception = res.exception;
2015-10-05 05:40:27 +00:00
else if (Exception * e = exception_cast<Exception *>(exception))
e->addMessage("\n" + getExceptionMessage(res.exception, false));
}
else if (!res.block)
break;
}
all_read = true;
}
processor.wait();
LOG_TRACE(log, "Waited for threads to finish");
if (exception)
2015-10-05 05:40:27 +00:00
std::rethrow_exception(exception);
}
/** Возможны следующие варианты:
* 1. Функция readImpl вызывается до тех пор, пока она не вернёт пустой блок.
* Затем вызывается функция readSuffix и затем деструктор.
* 2. Вызывается функция readImpl. В какой-то момент, возможно из другого потока вызывается функция cancel.
* Затем вызывается функция readSuffix и затем деструктор.
* 3. В любой момент, объект может быть и так уничтожен (вызываться деструктор).
*/
Block readImpl() override
2012-01-10 22:11:51 +00:00
{
if (all_read)
2015-10-12 14:53:16 +00:00
return received_payload.block;
2012-10-20 02:10:47 +00:00
/// Запускаем потоки, если это ещё не было сделано.
if (!started)
2012-01-10 22:11:51 +00:00
{
started = true;
processor.process();
}
2012-06-24 23:17:06 +00:00
/// Будем ждать, пока будет готов следующий блок или будет выкинуто исключение.
//std::cerr << "popping\n";
2015-10-12 14:53:16 +00:00
output_queue.pop(received_payload);
2012-01-10 22:11:51 +00:00
2015-10-12 14:53:16 +00:00
if (received_payload.exception)
2016-03-04 16:33:31 +00:00
{
if (exception_callback)
exception_callback();
2015-10-12 14:53:16 +00:00
std::rethrow_exception(received_payload.exception);
2016-03-04 16:33:31 +00:00
}
2015-10-12 14:53:16 +00:00
if (!received_payload.block)
all_read = true;
2015-10-12 14:53:16 +00:00
return received_payload.block;
2012-01-10 22:11:51 +00:00
}
/// Вызывается либо после того, как всё прочитано, либо после cancel-а.
void readSuffix() override
2013-09-13 20:33:09 +00:00
{
//std::cerr << "readSuffix\n";
if (!all_read && !is_cancelled.load(std::memory_order_seq_cst))
throw Exception("readSuffix called before all data is read", ErrorCodes::LOGICAL_ERROR);
2013-11-29 15:04:07 +00:00
finalize();
for (size_t i = 0; i < children.size(); ++i)
children[i]->readSuffix();
2013-09-13 20:33:09 +00:00
}
private:
2015-10-12 14:53:16 +00:00
template<StreamUnionMode mode2 = mode>
BlockExtraInfo doGetBlockExtraInfo(typename std::enable_if<mode2 == StreamUnionMode::ExtraInfo>::type * = nullptr) const
2012-01-10 22:11:51 +00:00
{
2015-10-12 14:53:16 +00:00
return received_payload.extra_info;
}
2012-01-10 22:11:51 +00:00
2015-10-12 14:53:16 +00:00
template<StreamUnionMode mode2 = mode>
BlockExtraInfo doGetBlockExtraInfo(typename std::enable_if<mode2 == StreamUnionMode::Basic>::type * = nullptr) const
{
throw Exception("Method getBlockExtraInfo is not supported for mode StreamUnionMode::Basic",
ErrorCodes::NOT_IMPLEMENTED);
}
2012-01-10 22:11:51 +00:00
2015-10-12 14:53:16 +00:00
private:
using Payload = OutputData<mode>;
using OutputQueue = ConcurrentBoundedQueue<Payload>;
private:
/** Очередь готовых блоков. Также туда можно положить эксепшен вместо блока.
* Когда данные закончатся - в очередь вставляется пустой блок.
* В очередь всегда (даже после исключения или отмены запроса) рано или поздно вставляется пустой блок.
* Очередь всегда (даже после исключения или отмены запроса, даже в деструкторе) нужно дочитывать до пустого блока,
* иначе ParallelInputsProcessor может заблокироваться при вставке в очередь.
*/
OutputQueue output_queue;
struct Handler
{
2015-10-12 14:53:16 +00:00
Handler(Self & parent_) : parent(parent_) {}
2015-10-12 14:53:16 +00:00
template <StreamUnionMode mode2 = mode>
void onBlock(Block & block, size_t thread_num,
typename std::enable_if<mode2 == StreamUnionMode::Basic>::type * = nullptr)
2012-01-10 22:11:51 +00:00
{
//std::cerr << "pushing block\n";
2015-10-12 14:53:16 +00:00
parent.output_queue.push(Payload(block));
}
template <StreamUnionMode mode2 = mode>
void onBlock(Block & block, BlockExtraInfo & extra_info, size_t thread_num,
typename std::enable_if<mode2 == StreamUnionMode::ExtraInfo>::type * = nullptr)
{
//std::cerr << "pushing block with extra info\n";
parent.output_queue.push(Payload(block, extra_info));
}
void onFinish()
{
//std::cerr << "pushing end\n";
2015-10-12 14:53:16 +00:00
parent.output_queue.push(Payload());
2012-01-10 22:11:51 +00:00
}
2015-12-01 21:20:14 +00:00
void onFinishThread(size_t thread_num)
{
}
2015-10-05 05:40:27 +00:00
void onException(std::exception_ptr & exception, size_t thread_num)
2014-07-16 02:06:58 +00:00
{
//std::cerr << "pushing exception\n";
/// Порядок строк имеет значение. Если его поменять, то возможна ситуация,
/// когда перед эксепшеном, в очередь окажется вставлен пустой блок (конец данных),
/// и эксепшен потеряется.
parent.output_queue.push(exception);
parent.cancel(); /// Не кидает исключений.
}
2015-10-12 14:53:16 +00:00
Self & parent;
};
Handler handler;
2015-10-12 14:53:16 +00:00
ParallelInputsProcessor<Handler, mode> processor;
2016-03-04 16:33:31 +00:00
ExceptionCallback exception_callback;
2015-10-12 14:53:16 +00:00
Payload received_payload;
bool started = false;
bool all_read = false;
Logger * log = &Logger::get("UnionBlockInputStream");
2012-01-10 22:11:51 +00:00
};
}