2012-01-10 22:11:51 +00:00
|
|
|
|
#pragma once
|
|
|
|
|
|
2015-09-29 19:19:54 +00:00
|
|
|
|
#include <common/logger_useful.h>
|
2012-01-10 22:11:51 +00:00
|
|
|
|
|
2013-05-03 02:25:50 +00:00
|
|
|
|
#include <DB/Common/ConcurrentBoundedQueue.h>
|
2012-01-10 22:11:51 +00:00
|
|
|
|
#include <DB/DataStreams/IProfilingBlockInputStream.h>
|
2014-11-30 18:22:57 +00:00
|
|
|
|
#include <DB/DataStreams/ParallelInputsProcessor.h>
|
2012-01-10 22:11:51 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
using Poco::SharedPtr;
|
|
|
|
|
|
2015-10-12 14:53:16 +00:00
|
|
|
|
namespace
|
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
template <StreamUnionMode mode>
|
|
|
|
|
struct OutputData;
|
|
|
|
|
|
|
|
|
|
/// Блок или эксепшен.
|
|
|
|
|
template <>
|
|
|
|
|
struct OutputData<StreamUnionMode::Basic>
|
|
|
|
|
{
|
|
|
|
|
Block block;
|
|
|
|
|
std::exception_ptr exception;
|
|
|
|
|
|
|
|
|
|
OutputData() {}
|
|
|
|
|
OutputData(Block & block_) : block(block_) {}
|
|
|
|
|
OutputData(std::exception_ptr & exception_) : exception(exception_) {}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
/// Блок + дополнительнцю информацию или эксепшен.
|
|
|
|
|
template <>
|
|
|
|
|
struct OutputData<StreamUnionMode::ExtraInfo>
|
|
|
|
|
{
|
|
|
|
|
Block block;
|
|
|
|
|
BlockExtraInfo extra_info;
|
|
|
|
|
std::exception_ptr exception;
|
|
|
|
|
|
|
|
|
|
OutputData() {}
|
|
|
|
|
OutputData(Block & block_, BlockExtraInfo & extra_info_) : block(block_), extra_info(extra_info_) {}
|
|
|
|
|
OutputData(std::exception_ptr & exception_) : exception(exception_) {}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
}
|
2012-01-10 22:11:51 +00:00
|
|
|
|
|
|
|
|
|
/** Объединяет несколько источников в один.
|
|
|
|
|
* Блоки из разных источников перемежаются друг с другом произвольным образом.
|
|
|
|
|
* Можно указать количество потоков (max_threads),
|
|
|
|
|
* в которых будет выполняться получение данных из разных источников.
|
2012-09-24 02:05:40 +00:00
|
|
|
|
*
|
|
|
|
|
* Устроено так:
|
2014-11-30 18:22:57 +00:00
|
|
|
|
* - с помощью ParallelInputsProcessor в нескольких потоках вынимает из источников блоки;
|
2012-09-24 02:05:40 +00:00
|
|
|
|
* - полученные блоки складываются в ограниченную очередь готовых блоков;
|
2015-10-12 14:53:16 +00:00
|
|
|
|
* - основной поток вынимает готовые блоки из очереди готовых блоков;
|
|
|
|
|
* - если указан режим StreamUnionMode::ExtraInfo, в дополнение к блокам UnionBlockInputStream
|
|
|
|
|
* вынимает информацию о блоках; в таком случае все источники должны поддержать такой режим.
|
2012-01-10 22:11:51 +00:00
|
|
|
|
*/
|
2014-11-30 18:22:57 +00:00
|
|
|
|
|
2015-10-12 14:53:16 +00:00
|
|
|
|
template <StreamUnionMode mode = StreamUnionMode::Basic>
|
2012-01-10 22:11:51 +00:00
|
|
|
|
class UnionBlockInputStream : public IProfilingBlockInputStream
|
|
|
|
|
{
|
2015-10-12 14:53:16 +00:00
|
|
|
|
private:
|
|
|
|
|
using Self = UnionBlockInputStream<mode>;
|
|
|
|
|
|
2012-01-10 22:11:51 +00:00
|
|
|
|
public:
|
2015-07-21 21:29:02 +00:00
|
|
|
|
UnionBlockInputStream(BlockInputStreams inputs, BlockInputStreamPtr additional_input_at_end, size_t max_threads) :
|
2015-04-22 21:26:12 +00:00
|
|
|
|
output_queue(std::min(inputs.size(), max_threads)),
|
|
|
|
|
handler(*this),
|
2015-07-21 21:29:02 +00:00
|
|
|
|
processor(inputs, additional_input_at_end, max_threads, handler)
|
2012-01-10 22:11:51 +00:00
|
|
|
|
{
|
2014-11-30 18:22:57 +00:00
|
|
|
|
children = inputs;
|
2015-07-21 21:29:02 +00:00
|
|
|
|
if (additional_input_at_end)
|
|
|
|
|
children.push_back(additional_input_at_end);
|
2012-01-10 22:11:51 +00:00
|
|
|
|
}
|
|
|
|
|
|
2015-06-08 20:22:02 +00:00
|
|
|
|
String getName() const override { return "Union"; }
|
2012-10-20 02:10:47 +00:00
|
|
|
|
|
2014-11-08 23:52:18 +00:00
|
|
|
|
String getID() const override
|
2013-05-03 10:20:53 +00:00
|
|
|
|
{
|
|
|
|
|
std::stringstream res;
|
|
|
|
|
res << "Union(";
|
|
|
|
|
|
|
|
|
|
Strings children_ids(children.size());
|
|
|
|
|
for (size_t i = 0; i < children.size(); ++i)
|
|
|
|
|
children_ids[i] = children[i]->getID();
|
|
|
|
|
|
|
|
|
|
/// Порядок не имеет значения.
|
|
|
|
|
std::sort(children_ids.begin(), children_ids.end());
|
|
|
|
|
|
|
|
|
|
for (size_t i = 0; i < children_ids.size(); ++i)
|
|
|
|
|
res << (i == 0 ? "" : ", ") << children_ids[i];
|
|
|
|
|
|
|
|
|
|
res << ")";
|
|
|
|
|
return res.str();
|
|
|
|
|
}
|
|
|
|
|
|
2013-09-13 20:33:09 +00:00
|
|
|
|
|
2014-11-08 23:52:18 +00:00
|
|
|
|
~UnionBlockInputStream() override
|
2012-10-20 02:10:47 +00:00
|
|
|
|
{
|
2013-09-14 05:14:22 +00:00
|
|
|
|
try
|
|
|
|
|
{
|
2013-11-25 10:46:25 +00:00
|
|
|
|
if (!all_read)
|
|
|
|
|
cancel();
|
2013-11-29 18:44:02 +00:00
|
|
|
|
|
|
|
|
|
finalize();
|
2013-09-14 05:14:22 +00:00
|
|
|
|
}
|
|
|
|
|
catch (...)
|
|
|
|
|
{
|
2014-12-14 08:27:22 +00:00
|
|
|
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
2013-09-14 05:14:22 +00:00
|
|
|
|
}
|
2012-10-20 02:10:47 +00:00
|
|
|
|
}
|
|
|
|
|
|
2012-10-30 19:17:41 +00:00
|
|
|
|
/** Отличается от реализации по-умолчанию тем, что пытается остановить все источники,
|
|
|
|
|
* пропуская отвалившиеся по эксепшену.
|
|
|
|
|
*/
|
2014-11-08 23:52:18 +00:00
|
|
|
|
void cancel() override
|
2012-10-30 19:17:41 +00:00
|
|
|
|
{
|
2015-03-20 16:20:47 +00:00
|
|
|
|
bool old_val = false;
|
|
|
|
|
if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
|
2012-11-10 05:13:46 +00:00
|
|
|
|
return;
|
2012-11-10 04:42:26 +00:00
|
|
|
|
|
2014-12-14 08:27:22 +00:00
|
|
|
|
//std::cerr << "cancelling\n";
|
2014-11-30 18:22:57 +00:00
|
|
|
|
processor.cancel();
|
2013-11-29 18:44:02 +00:00
|
|
|
|
}
|
|
|
|
|
|
2015-10-12 14:53:16 +00:00
|
|
|
|
BlockExtraInfo getBlockExtraInfo() const override
|
|
|
|
|
{
|
|
|
|
|
return doGetBlockExtraInfo();
|
|
|
|
|
}
|
2013-11-25 10:46:25 +00:00
|
|
|
|
|
2013-11-29 18:44:02 +00:00
|
|
|
|
protected:
|
|
|
|
|
void finalize()
|
|
|
|
|
{
|
2014-11-30 18:22:57 +00:00
|
|
|
|
if (!started)
|
2013-12-27 13:22:32 +00:00
|
|
|
|
return;
|
|
|
|
|
|
2013-11-25 10:46:25 +00:00
|
|
|
|
LOG_TRACE(log, "Waiting for threads to finish");
|
|
|
|
|
|
2015-10-05 05:40:27 +00:00
|
|
|
|
std::exception_ptr exception;
|
2014-12-14 08:27:22 +00:00
|
|
|
|
if (!all_read)
|
|
|
|
|
{
|
|
|
|
|
/** Прочитаем всё до конца, чтобы ParallelInputsProcessor не заблокировался при попытке вставить в очередь.
|
|
|
|
|
* Может быть, в очереди есть ещё эксепшен.
|
|
|
|
|
*/
|
2015-10-12 14:53:16 +00:00
|
|
|
|
OutputData<mode> res;
|
2014-12-14 08:27:22 +00:00
|
|
|
|
while (true)
|
|
|
|
|
{
|
|
|
|
|
//std::cerr << "popping\n";
|
|
|
|
|
output_queue.pop(res);
|
|
|
|
|
|
|
|
|
|
if (res.exception)
|
|
|
|
|
{
|
|
|
|
|
if (!exception)
|
|
|
|
|
exception = res.exception;
|
2015-10-05 05:40:27 +00:00
|
|
|
|
else if (Exception * e = exception_cast<Exception *>(exception))
|
|
|
|
|
e->addMessage("\n" + getExceptionMessage(res.exception, false));
|
2014-12-14 08:27:22 +00:00
|
|
|
|
}
|
|
|
|
|
else if (!res.block)
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
all_read = true;
|
|
|
|
|
}
|
|
|
|
|
|
2014-11-30 18:22:57 +00:00
|
|
|
|
processor.wait();
|
2013-11-29 18:44:02 +00:00
|
|
|
|
|
2013-11-25 10:46:25 +00:00
|
|
|
|
LOG_TRACE(log, "Waited for threads to finish");
|
2014-12-14 08:27:22 +00:00
|
|
|
|
|
|
|
|
|
if (exception)
|
2015-10-05 05:40:27 +00:00
|
|
|
|
std::rethrow_exception(exception);
|
2012-10-30 19:17:41 +00:00
|
|
|
|
}
|
|
|
|
|
|
2014-12-14 08:27:22 +00:00
|
|
|
|
/** Возможны следующие варианты:
|
|
|
|
|
* 1. Функция readImpl вызывается до тех пор, пока она не вернёт пустой блок.
|
|
|
|
|
* Затем вызывается функция readSuffix и затем деструктор.
|
|
|
|
|
* 2. Вызывается функция readImpl. В какой-то момент, возможно из другого потока вызывается функция cancel.
|
|
|
|
|
* Затем вызывается функция readSuffix и затем деструктор.
|
|
|
|
|
* 3. В любой момент, объект может быть и так уничтожен (вызываться деструктор).
|
|
|
|
|
*/
|
|
|
|
|
|
2014-11-08 23:52:18 +00:00
|
|
|
|
Block readImpl() override
|
2012-01-10 22:11:51 +00:00
|
|
|
|
{
|
2013-11-29 08:08:43 +00:00
|
|
|
|
if (all_read)
|
2015-10-12 14:53:16 +00:00
|
|
|
|
return received_payload.block;
|
2012-10-20 02:10:47 +00:00
|
|
|
|
|
2012-09-24 02:05:40 +00:00
|
|
|
|
/// Запускаем потоки, если это ещё не было сделано.
|
2014-11-30 18:22:57 +00:00
|
|
|
|
if (!started)
|
2012-01-10 22:11:51 +00:00
|
|
|
|
{
|
2014-11-30 18:22:57 +00:00
|
|
|
|
started = true;
|
|
|
|
|
processor.process();
|
2012-09-24 02:05:40 +00:00
|
|
|
|
}
|
2012-06-24 23:17:06 +00:00
|
|
|
|
|
2012-10-12 16:53:45 +00:00
|
|
|
|
/// Будем ждать, пока будет готов следующий блок или будет выкинуто исключение.
|
2014-12-14 08:27:22 +00:00
|
|
|
|
//std::cerr << "popping\n";
|
2015-10-12 14:53:16 +00:00
|
|
|
|
output_queue.pop(received_payload);
|
2012-01-10 22:11:51 +00:00
|
|
|
|
|
2015-10-12 14:53:16 +00:00
|
|
|
|
if (received_payload.exception)
|
|
|
|
|
std::rethrow_exception(received_payload.exception);
|
2012-10-12 16:53:45 +00:00
|
|
|
|
|
2015-10-12 14:53:16 +00:00
|
|
|
|
if (!received_payload.block)
|
2012-11-10 04:42:26 +00:00
|
|
|
|
all_read = true;
|
|
|
|
|
|
2015-10-12 14:53:16 +00:00
|
|
|
|
return received_payload.block;
|
2012-01-10 22:11:51 +00:00
|
|
|
|
}
|
|
|
|
|
|
2014-12-14 08:27:22 +00:00
|
|
|
|
/// Вызывается либо после того, как всё прочитано, либо после cancel-а.
|
2014-11-08 23:52:18 +00:00
|
|
|
|
void readSuffix() override
|
2013-09-13 20:33:09 +00:00
|
|
|
|
{
|
2014-12-14 08:27:22 +00:00
|
|
|
|
//std::cerr << "readSuffix\n";
|
2015-03-20 16:20:47 +00:00
|
|
|
|
if (!all_read && !is_cancelled.load(std::memory_order_seq_cst))
|
2013-12-27 13:22:32 +00:00
|
|
|
|
throw Exception("readSuffix called before all data is read", ErrorCodes::LOGICAL_ERROR);
|
2013-11-29 15:04:07 +00:00
|
|
|
|
|
2013-12-27 13:22:32 +00:00
|
|
|
|
finalize();
|
|
|
|
|
|
|
|
|
|
for (size_t i = 0; i < children.size(); ++i)
|
|
|
|
|
children[i]->readSuffix();
|
2013-09-13 20:33:09 +00:00
|
|
|
|
}
|
|
|
|
|
|
2012-09-24 02:05:40 +00:00
|
|
|
|
private:
|
2015-10-12 14:53:16 +00:00
|
|
|
|
template<StreamUnionMode mode2 = mode>
|
|
|
|
|
BlockExtraInfo doGetBlockExtraInfo(typename std::enable_if<mode2 == StreamUnionMode::ExtraInfo>::type * = nullptr) const
|
2012-01-10 22:11:51 +00:00
|
|
|
|
{
|
2015-10-12 14:53:16 +00:00
|
|
|
|
return received_payload.extra_info;
|
|
|
|
|
}
|
2012-01-10 22:11:51 +00:00
|
|
|
|
|
2015-10-12 14:53:16 +00:00
|
|
|
|
template<StreamUnionMode mode2 = mode>
|
|
|
|
|
BlockExtraInfo doGetBlockExtraInfo(typename std::enable_if<mode2 == StreamUnionMode::Basic>::type * = nullptr) const
|
|
|
|
|
{
|
|
|
|
|
throw Exception("Method getBlockExtraInfo is not supported for mode StreamUnionMode::Basic",
|
|
|
|
|
ErrorCodes::NOT_IMPLEMENTED);
|
|
|
|
|
}
|
2012-01-10 22:11:51 +00:00
|
|
|
|
|
2015-10-12 14:53:16 +00:00
|
|
|
|
private:
|
|
|
|
|
using Payload = OutputData<mode>;
|
|
|
|
|
using OutputQueue = ConcurrentBoundedQueue<Payload>;
|
|
|
|
|
|
|
|
|
|
private:
|
2014-12-14 08:27:22 +00:00
|
|
|
|
/** Очередь готовых блоков. Также туда можно положить эксепшен вместо блока.
|
|
|
|
|
* Когда данные закончатся - в очередь вставляется пустой блок.
|
|
|
|
|
* В очередь всегда (даже после исключения или отмены запроса) рано или поздно вставляется пустой блок.
|
|
|
|
|
* Очередь всегда (даже после исключения или отмены запроса, даже в деструкторе) нужно дочитывать до пустого блока,
|
|
|
|
|
* иначе ParallelInputsProcessor может заблокироваться при вставке в очередь.
|
|
|
|
|
*/
|
2014-11-30 18:22:57 +00:00
|
|
|
|
OutputQueue output_queue;
|
2012-09-24 02:05:40 +00:00
|
|
|
|
|
2014-11-30 18:22:57 +00:00
|
|
|
|
struct Handler
|
|
|
|
|
{
|
2015-10-12 14:53:16 +00:00
|
|
|
|
Handler(Self & parent_) : parent(parent_) {}
|
2012-09-04 19:57:17 +00:00
|
|
|
|
|
2015-10-12 14:53:16 +00:00
|
|
|
|
template <StreamUnionMode mode2 = mode>
|
|
|
|
|
void onBlock(Block & block, size_t thread_num,
|
|
|
|
|
typename std::enable_if<mode2 == StreamUnionMode::Basic>::type * = nullptr)
|
2012-01-10 22:11:51 +00:00
|
|
|
|
{
|
2014-12-14 08:27:22 +00:00
|
|
|
|
//std::cerr << "pushing block\n";
|
2015-10-12 14:53:16 +00:00
|
|
|
|
parent.output_queue.push(Payload(block));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
template <StreamUnionMode mode2 = mode>
|
|
|
|
|
void onBlock(Block & block, BlockExtraInfo & extra_info, size_t thread_num,
|
|
|
|
|
typename std::enable_if<mode2 == StreamUnionMode::ExtraInfo>::type * = nullptr)
|
|
|
|
|
{
|
|
|
|
|
//std::cerr << "pushing block with extra info\n";
|
|
|
|
|
parent.output_queue.push(Payload(block, extra_info));
|
2012-09-24 02:05:40 +00:00
|
|
|
|
}
|
|
|
|
|
|
2014-11-30 18:22:57 +00:00
|
|
|
|
void onFinish()
|
2012-09-24 02:05:40 +00:00
|
|
|
|
{
|
2014-12-14 08:27:22 +00:00
|
|
|
|
//std::cerr << "pushing end\n";
|
2015-10-12 14:53:16 +00:00
|
|
|
|
parent.output_queue.push(Payload());
|
2012-01-10 22:11:51 +00:00
|
|
|
|
}
|
2012-09-03 19:52:57 +00:00
|
|
|
|
|
2015-12-01 21:20:14 +00:00
|
|
|
|
void onFinishThread(size_t thread_num)
|
|
|
|
|
{
|
|
|
|
|
}
|
|
|
|
|
|
2015-10-05 05:40:27 +00:00
|
|
|
|
void onException(std::exception_ptr & exception, size_t thread_num)
|
2014-07-16 02:06:58 +00:00
|
|
|
|
{
|
2014-12-14 08:27:22 +00:00
|
|
|
|
//std::cerr << "pushing exception\n";
|
2012-09-24 02:05:40 +00:00
|
|
|
|
|
2014-11-30 18:22:57 +00:00
|
|
|
|
/// Порядок строк имеет значение. Если его поменять, то возможна ситуация,
|
|
|
|
|
/// когда перед эксепшеном, в очередь окажется вставлен пустой блок (конец данных),
|
2014-12-14 08:27:22 +00:00
|
|
|
|
/// и эксепшен потеряется.
|
2012-09-24 02:05:40 +00:00
|
|
|
|
|
2014-11-30 18:22:57 +00:00
|
|
|
|
parent.output_queue.push(exception);
|
2014-12-14 08:27:22 +00:00
|
|
|
|
parent.cancel(); /// Не кидает исключений.
|
2014-11-30 18:22:57 +00:00
|
|
|
|
}
|
2012-10-12 16:53:45 +00:00
|
|
|
|
|
2015-10-12 14:53:16 +00:00
|
|
|
|
Self & parent;
|
2012-10-12 16:53:45 +00:00
|
|
|
|
};
|
|
|
|
|
|
2014-11-30 18:22:57 +00:00
|
|
|
|
Handler handler;
|
2015-10-12 14:53:16 +00:00
|
|
|
|
ParallelInputsProcessor<Handler, mode> processor;
|
|
|
|
|
|
|
|
|
|
Payload received_payload;
|
2014-07-16 01:36:18 +00:00
|
|
|
|
|
2014-11-30 18:22:57 +00:00
|
|
|
|
bool started = false;
|
|
|
|
|
bool all_read = false;
|
2012-09-24 02:05:40 +00:00
|
|
|
|
|
2014-07-16 01:36:18 +00:00
|
|
|
|
Logger * log = &Logger::get("UnionBlockInputStream");
|
2012-01-10 22:11:51 +00:00
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
}
|