2012-01-10 22:11:51 +00:00
|
|
|
|
#pragma once
|
|
|
|
|
|
2012-09-24 02:05:40 +00:00
|
|
|
|
#include <list>
|
|
|
|
|
#include <queue>
|
|
|
|
|
|
|
|
|
|
#include <Poco/Thread.h>
|
2012-01-10 22:11:51 +00:00
|
|
|
|
|
2012-09-24 02:05:40 +00:00
|
|
|
|
#include <Yandex/logger_useful.h>
|
2012-01-10 22:11:51 +00:00
|
|
|
|
|
2013-05-03 02:25:50 +00:00
|
|
|
|
#include <DB/Common/ConcurrentBoundedQueue.h>
|
2012-01-10 22:11:51 +00:00
|
|
|
|
#include <DB/DataStreams/IProfilingBlockInputStream.h>
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
using Poco::SharedPtr;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** Объединяет несколько источников в один.
|
|
|
|
|
* Блоки из разных источников перемежаются друг с другом произвольным образом.
|
|
|
|
|
* Можно указать количество потоков (max_threads),
|
|
|
|
|
* в которых будет выполняться получение данных из разных источников.
|
2012-09-24 02:05:40 +00:00
|
|
|
|
*
|
|
|
|
|
* Устроено так:
|
|
|
|
|
* - есть набор источников, из которых можно вынимать блоки;
|
|
|
|
|
* - есть набор потоков, которые могут одновременно вынимать блоки из разных источников;
|
|
|
|
|
* - "свободные" источники (с которыми сейчас не работает никакой поток) кладутся в очередь источников;
|
|
|
|
|
* - когда поток берёт источник для обработки, он удаляет его из очереди источников,
|
|
|
|
|
* вынимает из него блок, и затем кладёт источник обратно в очередь источников;
|
|
|
|
|
* - полученные блоки складываются в ограниченную очередь готовых блоков;
|
|
|
|
|
* - основной поток вынимает готовые блоки из очереди готовых блоков.
|
2012-01-10 22:11:51 +00:00
|
|
|
|
*/
|
|
|
|
|
class UnionBlockInputStream : public IProfilingBlockInputStream
|
|
|
|
|
{
|
2012-09-04 19:57:17 +00:00
|
|
|
|
class Thread;
|
2012-01-10 22:11:51 +00:00
|
|
|
|
public:
|
|
|
|
|
UnionBlockInputStream(BlockInputStreams inputs_, unsigned max_threads_ = 1)
|
|
|
|
|
: max_threads(std::min(inputs_.size(), static_cast<size_t>(max_threads_))),
|
2013-11-29 12:25:54 +00:00
|
|
|
|
output_queue(max_threads), exhausted_inputs(0), finish(false),
|
|
|
|
|
pushed_end_of_output_queue(false), all_read(false), log(&Logger::get("UnionBlockInputStream"))
|
2012-01-10 22:11:51 +00:00
|
|
|
|
{
|
|
|
|
|
children.insert(children.end(), inputs_.begin(), inputs_.end());
|
2012-06-24 23:17:06 +00:00
|
|
|
|
|
2012-01-10 22:11:51 +00:00
|
|
|
|
for (size_t i = 0; i < inputs_.size(); ++i)
|
2012-06-24 23:17:06 +00:00
|
|
|
|
{
|
2012-09-24 02:05:40 +00:00
|
|
|
|
input_queue.push(InputData());
|
|
|
|
|
input_queue.back().in = inputs_[i];
|
|
|
|
|
input_queue.back().i = i;
|
2012-06-24 23:17:06 +00:00
|
|
|
|
}
|
2012-01-10 22:11:51 +00:00
|
|
|
|
}
|
|
|
|
|
|
2012-10-20 02:10:47 +00:00
|
|
|
|
String getName() const { return "UnionBlockInputStream"; }
|
|
|
|
|
|
2013-05-03 10:20:53 +00:00
|
|
|
|
String getID() const
|
|
|
|
|
{
|
|
|
|
|
std::stringstream res;
|
|
|
|
|
res << "Union(";
|
|
|
|
|
|
|
|
|
|
Strings children_ids(children.size());
|
|
|
|
|
for (size_t i = 0; i < children.size(); ++i)
|
|
|
|
|
children_ids[i] = children[i]->getID();
|
|
|
|
|
|
|
|
|
|
/// Порядок не имеет значения.
|
|
|
|
|
std::sort(children_ids.begin(), children_ids.end());
|
|
|
|
|
|
|
|
|
|
for (size_t i = 0; i < children_ids.size(); ++i)
|
|
|
|
|
res << (i == 0 ? "" : ", ") << children_ids[i];
|
|
|
|
|
|
|
|
|
|
res << ")";
|
|
|
|
|
return res.str();
|
|
|
|
|
}
|
|
|
|
|
|
2013-09-13 20:33:09 +00:00
|
|
|
|
|
2012-10-20 02:10:47 +00:00
|
|
|
|
~UnionBlockInputStream()
|
|
|
|
|
{
|
2013-09-14 05:14:22 +00:00
|
|
|
|
try
|
|
|
|
|
{
|
2013-11-25 10:46:25 +00:00
|
|
|
|
if (!all_read)
|
|
|
|
|
cancel();
|
2013-11-29 18:44:02 +00:00
|
|
|
|
|
|
|
|
|
finalize();
|
2013-09-14 05:14:22 +00:00
|
|
|
|
}
|
|
|
|
|
catch (...)
|
|
|
|
|
{
|
|
|
|
|
LOG_ERROR(log, "Exception while destroying UnionBlockInputStream.");
|
|
|
|
|
}
|
2012-10-20 02:10:47 +00:00
|
|
|
|
}
|
|
|
|
|
|
2012-10-30 19:17:41 +00:00
|
|
|
|
/** Отличается от реализации по-умолчанию тем, что пытается остановить все источники,
|
|
|
|
|
* пропуская отвалившиеся по эксепшену.
|
|
|
|
|
*/
|
|
|
|
|
void cancel()
|
|
|
|
|
{
|
2012-11-10 05:13:46 +00:00
|
|
|
|
if (!__sync_bool_compare_and_swap(&is_cancelled, false, true))
|
|
|
|
|
return;
|
2012-11-10 04:42:26 +00:00
|
|
|
|
|
2013-11-25 10:46:25 +00:00
|
|
|
|
finish = true;
|
2012-10-30 19:17:41 +00:00
|
|
|
|
for (BlockInputStreams::iterator it = children.begin(); it != children.end(); ++it)
|
|
|
|
|
{
|
|
|
|
|
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&**it))
|
|
|
|
|
{
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
child->cancel();
|
|
|
|
|
}
|
|
|
|
|
catch (...)
|
|
|
|
|
{
|
2013-09-14 07:43:45 +00:00
|
|
|
|
LOG_ERROR(log, "Exception while cancelling " << child->getName());
|
2012-10-30 19:17:41 +00:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2013-11-29 18:44:02 +00:00
|
|
|
|
}
|
|
|
|
|
|
2013-11-25 10:46:25 +00:00
|
|
|
|
|
2013-11-29 18:44:02 +00:00
|
|
|
|
protected:
|
|
|
|
|
void finalize()
|
|
|
|
|
{
|
2013-12-27 13:22:32 +00:00
|
|
|
|
if (threads_data.empty())
|
|
|
|
|
return;
|
|
|
|
|
|
2013-11-25 10:46:25 +00:00
|
|
|
|
LOG_TRACE(log, "Waiting for threads to finish");
|
|
|
|
|
|
|
|
|
|
/// Вынем всё, что есть в очереди готовых данных.
|
2013-12-08 05:18:22 +00:00
|
|
|
|
output_queue.clear();
|
2013-11-25 10:46:25 +00:00
|
|
|
|
|
|
|
|
|
/** В этот момент, запоздавшие потоки ещё могут вставить в очередь какие-нибудь блоки, но очередь не переполнится.
|
|
|
|
|
* PS. Может быть, для переменной finish нужен барьер?
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
for (ThreadsData::iterator it = threads_data.begin(); it != threads_data.end(); ++it)
|
|
|
|
|
it->thread->join();
|
|
|
|
|
|
2013-11-29 18:44:02 +00:00
|
|
|
|
threads_data.clear();
|
|
|
|
|
|
2013-11-25 10:46:25 +00:00
|
|
|
|
LOG_TRACE(log, "Waited for threads to finish");
|
2012-10-30 19:17:41 +00:00
|
|
|
|
}
|
|
|
|
|
|
2012-01-10 22:11:51 +00:00
|
|
|
|
Block readImpl()
|
|
|
|
|
{
|
2012-10-12 16:53:45 +00:00
|
|
|
|
OutputData res;
|
2013-11-29 08:08:43 +00:00
|
|
|
|
if (all_read)
|
2012-10-12 16:53:45 +00:00
|
|
|
|
return res.block;
|
2012-10-20 02:10:47 +00:00
|
|
|
|
|
2012-09-24 02:05:40 +00:00
|
|
|
|
/// Запускаем потоки, если это ещё не было сделано.
|
|
|
|
|
if (threads_data.empty())
|
2012-01-10 22:11:51 +00:00
|
|
|
|
{
|
2012-09-24 02:05:40 +00:00
|
|
|
|
threads_data.resize(max_threads);
|
|
|
|
|
for (ThreadsData::iterator it = threads_data.begin(); it != threads_data.end(); ++it)
|
2012-06-22 18:27:40 +00:00
|
|
|
|
{
|
2012-10-12 16:53:45 +00:00
|
|
|
|
it->runnable = new Thread(*this);
|
2012-09-24 02:05:40 +00:00
|
|
|
|
it->thread = new Poco::Thread;
|
|
|
|
|
it->thread->start(*it->runnable);
|
2012-01-10 22:11:51 +00:00
|
|
|
|
}
|
2012-09-24 02:05:40 +00:00
|
|
|
|
}
|
2012-06-24 23:17:06 +00:00
|
|
|
|
|
2012-10-12 16:53:45 +00:00
|
|
|
|
/// Будем ждать, пока будет готов следующий блок или будет выкинуто исключение.
|
2012-09-24 02:05:40 +00:00
|
|
|
|
output_queue.pop(res);
|
2012-01-10 22:11:51 +00:00
|
|
|
|
|
2012-10-12 16:53:45 +00:00
|
|
|
|
if (res.exception)
|
|
|
|
|
res.exception->rethrow();
|
|
|
|
|
|
2012-11-10 04:42:26 +00:00
|
|
|
|
if (!res.block)
|
|
|
|
|
all_read = true;
|
|
|
|
|
|
2012-10-12 16:53:45 +00:00
|
|
|
|
return res.block;
|
2012-01-10 22:11:51 +00:00
|
|
|
|
}
|
|
|
|
|
|
2013-12-27 13:22:32 +00:00
|
|
|
|
void readSuffix()
|
2013-09-13 20:33:09 +00:00
|
|
|
|
{
|
2013-11-25 10:46:25 +00:00
|
|
|
|
if (!all_read && !is_cancelled)
|
2013-12-27 13:22:32 +00:00
|
|
|
|
throw Exception("readSuffix called before all data is read", ErrorCodes::LOGICAL_ERROR);
|
2013-11-29 15:04:07 +00:00
|
|
|
|
|
|
|
|
|
/// Может быть, в очереди есть ещё эксепшен.
|
|
|
|
|
OutputData res;
|
|
|
|
|
while (output_queue.tryPop(res) && res.exception)
|
|
|
|
|
res.exception->rethrow();
|
2013-12-27 13:22:32 +00:00
|
|
|
|
|
|
|
|
|
finalize();
|
|
|
|
|
|
|
|
|
|
for (size_t i = 0; i < children.size(); ++i)
|
|
|
|
|
children[i]->readSuffix();
|
2013-09-13 20:33:09 +00:00
|
|
|
|
}
|
|
|
|
|
|
2012-09-24 02:05:40 +00:00
|
|
|
|
private:
|
2012-01-10 22:11:51 +00:00
|
|
|
|
/// Данные отдельного источника
|
2012-09-04 19:57:17 +00:00
|
|
|
|
struct InputData
|
2012-01-10 22:11:51 +00:00
|
|
|
|
{
|
|
|
|
|
BlockInputStreamPtr in;
|
2012-09-24 02:05:40 +00:00
|
|
|
|
size_t i; /// Порядковый номер источника (для отладки).
|
|
|
|
|
};
|
2012-01-10 22:11:51 +00:00
|
|
|
|
|
|
|
|
|
|
2012-09-24 02:05:40 +00:00
|
|
|
|
/// Данные отдельного потока
|
|
|
|
|
struct ThreadData
|
|
|
|
|
{
|
|
|
|
|
SharedPtr<Poco::Thread> thread;
|
|
|
|
|
SharedPtr<Thread> runnable;
|
2012-01-10 22:11:51 +00:00
|
|
|
|
};
|
|
|
|
|
|
2012-09-24 02:05:40 +00:00
|
|
|
|
|
2012-09-04 19:57:17 +00:00
|
|
|
|
class Thread : public Poco::Runnable
|
2012-01-10 22:11:51 +00:00
|
|
|
|
{
|
2012-09-04 19:57:17 +00:00
|
|
|
|
public:
|
2012-10-12 16:53:45 +00:00
|
|
|
|
Thread(UnionBlockInputStream & parent_) : parent(parent_) {}
|
2012-09-04 19:57:17 +00:00
|
|
|
|
|
|
|
|
|
void run()
|
2012-01-10 22:11:51 +00:00
|
|
|
|
{
|
2012-10-12 16:53:45 +00:00
|
|
|
|
ExceptionPtr exception;
|
|
|
|
|
|
2012-09-04 19:57:17 +00:00
|
|
|
|
try
|
|
|
|
|
{
|
2012-09-24 02:05:40 +00:00
|
|
|
|
loop();
|
2012-09-04 19:57:17 +00:00
|
|
|
|
}
|
|
|
|
|
catch (...)
|
|
|
|
|
{
|
2013-11-02 23:42:10 +00:00
|
|
|
|
exception = cloneCurrentException();
|
2012-09-04 19:57:17 +00:00
|
|
|
|
}
|
|
|
|
|
|
2012-09-24 02:05:40 +00:00
|
|
|
|
if (exception)
|
|
|
|
|
{
|
2012-10-30 19:17:41 +00:00
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
parent.cancel();
|
|
|
|
|
}
|
|
|
|
|
catch (...)
|
|
|
|
|
{
|
|
|
|
|
/** Если не удалось попросить остановиться одного или несколько источников.
|
|
|
|
|
* (например, разорвано соединение при распределённой обработке запроса)
|
|
|
|
|
* - то пофиг.
|
|
|
|
|
*/
|
|
|
|
|
}
|
2012-09-24 02:05:40 +00:00
|
|
|
|
|
2012-10-12 16:53:45 +00:00
|
|
|
|
/// Отдаём эксепшен в основной поток.
|
|
|
|
|
parent.output_queue.push(exception);
|
2012-09-24 02:05:40 +00:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void loop()
|
|
|
|
|
{
|
|
|
|
|
while (!parent.finish) /// Может потребоваться прекратить работу раньше, чем все источники иссякнут.
|
|
|
|
|
{
|
|
|
|
|
InputData input;
|
|
|
|
|
|
|
|
|
|
/// Выбираем следующий источник.
|
|
|
|
|
{
|
|
|
|
|
Poco::ScopedLock<Poco::FastMutex> lock(parent.mutex);
|
|
|
|
|
|
|
|
|
|
/// Если свободных источников нет, то этот поток больше не нужен. (Но другие потоки могут работать со своими источниками.)
|
|
|
|
|
if (parent.input_queue.empty())
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
input = parent.input_queue.front();
|
|
|
|
|
|
|
|
|
|
/// Убираем источник из очереди доступных источников.
|
|
|
|
|
parent.input_queue.pop();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Основная работа.
|
|
|
|
|
Block block = input.in->read();
|
|
|
|
|
|
|
|
|
|
{
|
|
|
|
|
Poco::ScopedLock<Poco::FastMutex> lock(parent.mutex);
|
|
|
|
|
|
|
|
|
|
/// Если этот источник ещё не иссяк, то положим полученный блок в очередь готовых.
|
|
|
|
|
if (block)
|
|
|
|
|
{
|
|
|
|
|
parent.input_queue.push(input);
|
2013-12-08 05:18:22 +00:00
|
|
|
|
|
|
|
|
|
if (parent.finish)
|
|
|
|
|
break;
|
|
|
|
|
|
2012-09-24 02:05:40 +00:00
|
|
|
|
parent.output_queue.push(block);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
++parent.exhausted_inputs;
|
|
|
|
|
|
|
|
|
|
/// Если все источники иссякли.
|
|
|
|
|
if (parent.exhausted_inputs == parent.children.size())
|
|
|
|
|
{
|
|
|
|
|
parent.finish = true;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2013-11-29 08:08:43 +00:00
|
|
|
|
|
2013-11-25 10:46:25 +00:00
|
|
|
|
if (parent.finish)
|
|
|
|
|
{
|
|
|
|
|
Poco::ScopedLock<Poco::FastMutex> lock(parent.mutex);
|
|
|
|
|
|
2013-11-29 08:08:43 +00:00
|
|
|
|
/// Отдаём в основной поток пустой блок, что означает, что данных больше нет.
|
2013-11-29 12:25:54 +00:00
|
|
|
|
if (!parent.pushed_end_of_output_queue)
|
|
|
|
|
{
|
|
|
|
|
parent.pushed_end_of_output_queue = true;
|
|
|
|
|
parent.output_queue.push(OutputData());
|
|
|
|
|
}
|
2013-11-25 10:46:25 +00:00
|
|
|
|
}
|
2012-01-10 22:11:51 +00:00
|
|
|
|
}
|
2012-09-03 19:52:57 +00:00
|
|
|
|
|
2012-09-04 19:57:17 +00:00
|
|
|
|
private:
|
|
|
|
|
UnionBlockInputStream & parent;
|
|
|
|
|
};
|
|
|
|
|
|
2012-09-24 02:05:40 +00:00
|
|
|
|
|
|
|
|
|
unsigned max_threads;
|
|
|
|
|
|
|
|
|
|
/// Потоки.
|
|
|
|
|
typedef std::list<ThreadData> ThreadsData;
|
|
|
|
|
ThreadsData threads_data;
|
|
|
|
|
|
|
|
|
|
/// Очередь доступных источников, которые не заняты каким-либо потоком в данный момент.
|
|
|
|
|
typedef std::queue<InputData> InputQueue;
|
|
|
|
|
InputQueue input_queue;
|
|
|
|
|
|
2012-10-12 16:53:45 +00:00
|
|
|
|
/// Блок или эксепшен.
|
|
|
|
|
struct OutputData
|
|
|
|
|
{
|
|
|
|
|
Block block;
|
|
|
|
|
ExceptionPtr exception;
|
|
|
|
|
|
|
|
|
|
OutputData() {}
|
|
|
|
|
OutputData(Block & block_) : block(block_) {}
|
|
|
|
|
OutputData(ExceptionPtr & exception_) : exception(exception_) {}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
/// Очередь готовых блоков. Также туда можно положить эксепшен вместо блока.
|
|
|
|
|
typedef ConcurrentBoundedQueue<OutputData> OutputQueue;
|
2012-09-24 02:05:40 +00:00
|
|
|
|
OutputQueue output_queue;
|
|
|
|
|
|
|
|
|
|
/// Для операций с очередями.
|
2012-09-04 19:57:17 +00:00
|
|
|
|
Poco::FastMutex mutex;
|
2012-09-24 02:05:40 +00:00
|
|
|
|
|
|
|
|
|
/// Сколько источников иссякло.
|
|
|
|
|
size_t exhausted_inputs;
|
|
|
|
|
|
|
|
|
|
/// Завершить работу потоков (раньше, чем иссякнут источники).
|
|
|
|
|
volatile bool finish;
|
2013-11-29 12:25:54 +00:00
|
|
|
|
/// Положили ли в output_queue пустой блок.
|
2013-12-08 05:18:22 +00:00
|
|
|
|
volatile bool pushed_end_of_output_queue;
|
2012-11-10 04:42:26 +00:00
|
|
|
|
bool all_read;
|
2012-09-24 02:05:40 +00:00
|
|
|
|
|
|
|
|
|
Logger * log;
|
2012-01-10 22:11:51 +00:00
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
}
|