2012-02-27 06:28:20 +00:00
|
|
|
|
#pragma once
|
|
|
|
|
|
|
|
|
|
#include <statdaemons/threadpool.hpp>
|
|
|
|
|
|
|
|
|
|
#include <DB/Interpreters/Aggregator.h>
|
|
|
|
|
#include <DB/DataStreams/IProfilingBlockInputStream.h>
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
using Poco::SharedPtr;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** Агрегирует несколько источников параллельно.
|
|
|
|
|
* Запускает агрегацию отдельных источников в отдельных потоках, затем объединяет результаты.
|
2014-02-26 11:44:54 +00:00
|
|
|
|
* Если final=false, агрегатные функции не финализируются, то есть, не заменяются на своё значение, а содержат промежуточное состояние вычислений.
|
2012-02-27 06:28:20 +00:00
|
|
|
|
* Это необходимо, чтобы можно было продолжить агрегацию (например, объединяя потоки частично агрегированных данных).
|
|
|
|
|
*/
|
|
|
|
|
class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream
|
|
|
|
|
{
|
|
|
|
|
public:
|
2014-02-26 11:44:54 +00:00
|
|
|
|
ParallelAggregatingBlockInputStream(BlockInputStreams inputs_, const ColumnNumbers & keys_,
|
|
|
|
|
AggregateDescriptions & aggregates_, bool overflow_row_, bool final_, unsigned max_threads_ = 1,
|
2014-02-17 23:56:45 +00:00
|
|
|
|
size_t max_rows_to_group_by_ = 0, OverflowMode group_by_overflow_mode_ = OverflowMode::THROW)
|
2014-02-26 11:44:54 +00:00
|
|
|
|
: aggregator(new Aggregator(keys_, aggregates_, overflow_row_, max_rows_to_group_by_, group_by_overflow_mode_)),
|
|
|
|
|
has_been_read(false), final(final_), max_threads(max_threads_), pool(std::min(max_threads, inputs_.size()))
|
2012-02-27 06:28:20 +00:00
|
|
|
|
{
|
|
|
|
|
children.insert(children.end(), inputs_.begin(), inputs_.end());
|
|
|
|
|
}
|
|
|
|
|
|
2014-02-26 11:44:54 +00:00
|
|
|
|
/** Столбцы из key_names и аргументы агрегатных функций, уже должны быть вычислены.
|
2012-02-27 06:28:20 +00:00
|
|
|
|
*/
|
2014-02-26 11:44:54 +00:00
|
|
|
|
ParallelAggregatingBlockInputStream(BlockInputStreams inputs_, const Names & key_names,
|
|
|
|
|
const AggregateDescriptions & aggregates, bool overflow_row_, bool final_, unsigned max_threads_ = 1,
|
2014-02-17 23:56:45 +00:00
|
|
|
|
size_t max_rows_to_group_by_ = 0, OverflowMode group_by_overflow_mode_ = OverflowMode::THROW)
|
2014-02-26 11:44:54 +00:00
|
|
|
|
: has_been_read(false), final(final_), max_threads(max_threads_), pool(std::min(max_threads, inputs_.size()))
|
2012-02-27 06:28:20 +00:00
|
|
|
|
{
|
|
|
|
|
children.insert(children.end(), inputs_.begin(), inputs_.end());
|
|
|
|
|
|
2014-02-26 11:44:54 +00:00
|
|
|
|
aggregator = new Aggregator(key_names, aggregates, overflow_row_, max_rows_to_group_by_, group_by_overflow_mode_);
|
2012-02-27 06:28:20 +00:00
|
|
|
|
}
|
|
|
|
|
|
2012-10-20 02:10:47 +00:00
|
|
|
|
String getName() const { return "ParallelAggregatingBlockInputStream"; }
|
|
|
|
|
|
2013-05-03 10:20:53 +00:00
|
|
|
|
String getID() const
|
|
|
|
|
{
|
|
|
|
|
std::stringstream res;
|
|
|
|
|
res << "ParallelAggregating(";
|
|
|
|
|
|
|
|
|
|
Strings children_ids(children.size());
|
|
|
|
|
for (size_t i = 0; i < children.size(); ++i)
|
|
|
|
|
children_ids[i] = children[i]->getID();
|
|
|
|
|
|
|
|
|
|
/// Порядок не имеет значения.
|
|
|
|
|
std::sort(children_ids.begin(), children_ids.end());
|
|
|
|
|
|
|
|
|
|
for (size_t i = 0; i < children_ids.size(); ++i)
|
|
|
|
|
res << (i == 0 ? "" : ", ") << children_ids[i];
|
|
|
|
|
|
|
|
|
|
res << ", " << aggregator->getID() << ")";
|
|
|
|
|
return res.str();
|
|
|
|
|
}
|
|
|
|
|
|
2012-10-20 02:10:47 +00:00
|
|
|
|
protected:
|
2012-02-27 06:28:20 +00:00
|
|
|
|
Block readImpl()
|
|
|
|
|
{
|
|
|
|
|
if (has_been_read)
|
|
|
|
|
return Block();
|
|
|
|
|
|
|
|
|
|
has_been_read = true;
|
|
|
|
|
|
2013-05-04 04:05:15 +00:00
|
|
|
|
ManyAggregatedDataVariants many_data(children.size());
|
|
|
|
|
Exceptions exceptions(children.size());
|
2012-10-20 02:10:47 +00:00
|
|
|
|
|
2012-02-27 06:28:20 +00:00
|
|
|
|
for (size_t i = 0, size = many_data.size(); i < size; ++i)
|
|
|
|
|
{
|
|
|
|
|
many_data[i] = new AggregatedDataVariants;
|
2013-05-04 04:05:15 +00:00
|
|
|
|
pool.schedule(boost::bind(&ParallelAggregatingBlockInputStream::calculate, this, boost::ref(children[i]), boost::ref(*many_data[i]), boost::ref(exceptions[i])));
|
2012-02-27 06:28:20 +00:00
|
|
|
|
}
|
|
|
|
|
pool.wait();
|
2012-09-12 18:18:48 +00:00
|
|
|
|
|
2013-11-02 23:42:10 +00:00
|
|
|
|
rethrowFirstException(exceptions);
|
2012-10-20 02:10:47 +00:00
|
|
|
|
|
2013-01-07 01:41:06 +00:00
|
|
|
|
if (isCancelled())
|
|
|
|
|
return Block();
|
|
|
|
|
|
2012-02-27 06:28:20 +00:00
|
|
|
|
AggregatedDataVariantsPtr res = aggregator->merge(many_data);
|
2014-02-26 11:44:54 +00:00
|
|
|
|
return aggregator->convertToBlock(*res, final);
|
2012-02-27 06:28:20 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private:
|
|
|
|
|
SharedPtr<Aggregator> aggregator;
|
|
|
|
|
bool has_been_read;
|
2013-11-03 23:35:18 +00:00
|
|
|
|
bool final;
|
2012-02-27 06:28:20 +00:00
|
|
|
|
size_t max_threads;
|
|
|
|
|
boost::threadpool::pool pool;
|
|
|
|
|
|
2014-02-26 11:44:54 +00:00
|
|
|
|
/// Вычисления, которые выполнятся в отдельном потоке
|
2012-02-27 06:28:20 +00:00
|
|
|
|
void calculate(BlockInputStreamPtr & input, AggregatedDataVariants & data, ExceptionPtr & exception)
|
|
|
|
|
{
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
aggregator->execute(input, data);
|
|
|
|
|
}
|
|
|
|
|
catch (...)
|
|
|
|
|
{
|
2013-11-02 23:42:10 +00:00
|
|
|
|
exception = cloneCurrentException();
|
2012-02-27 06:28:20 +00:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
}
|