2011-09-19 03:34:23 +00:00
|
|
|
|
#pragma once
|
|
|
|
|
|
2013-02-04 03:31:31 +00:00
|
|
|
|
#include <iomanip>
|
|
|
|
|
|
|
|
|
|
#include <Yandex/logger_useful.h>
|
|
|
|
|
|
2011-09-19 03:34:23 +00:00
|
|
|
|
#include <DB/Columns/ColumnAggregateFunction.h>
|
|
|
|
|
|
|
|
|
|
#include <DB/DataStreams/IProfilingBlockInputStream.h>
|
|
|
|
|
|
2013-07-28 03:14:03 +00:00
|
|
|
|
#include <DB/Interpreters/Aggregator.h>
|
|
|
|
|
|
2011-09-19 03:34:23 +00:00
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
using Poco::SharedPtr;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** Преобразует агрегатные функции (с промежуточным состоянием) в потоке блоков в конечные значения.
|
|
|
|
|
*/
|
|
|
|
|
class FinalizingAggregatedBlockInputStream : public IProfilingBlockInputStream
|
|
|
|
|
{
|
|
|
|
|
public:
|
2013-07-28 03:14:03 +00:00
|
|
|
|
FinalizingAggregatedBlockInputStream(BlockInputStreamPtr input_, AggregateDescriptions & aggregates_)
|
|
|
|
|
: aggregates(aggregates_), log(&Logger::get("FinalizingAggregatedBlockInputStream"))
|
2011-09-19 03:34:23 +00:00
|
|
|
|
{
|
2013-05-04 04:05:15 +00:00
|
|
|
|
children.push_back(input_);
|
2011-09-19 03:34:23 +00:00
|
|
|
|
}
|
|
|
|
|
|
2012-10-20 02:10:47 +00:00
|
|
|
|
String getName() const { return "FinalizingAggregatedBlockInputStream"; }
|
|
|
|
|
|
2013-05-03 10:20:53 +00:00
|
|
|
|
String getID() const
|
|
|
|
|
{
|
|
|
|
|
std::stringstream res;
|
2013-05-04 05:20:07 +00:00
|
|
|
|
res << "FinalizingAggregated(" << children.back()->getID() << ")";
|
2013-05-03 10:20:53 +00:00
|
|
|
|
return res.str();
|
|
|
|
|
}
|
|
|
|
|
|
2012-10-20 02:10:47 +00:00
|
|
|
|
protected:
|
2011-09-19 03:34:23 +00:00
|
|
|
|
Block readImpl()
|
|
|
|
|
{
|
2013-05-04 05:20:07 +00:00
|
|
|
|
Block res = children.back()->read();
|
2011-09-19 03:34:23 +00:00
|
|
|
|
|
|
|
|
|
if (!res)
|
|
|
|
|
return res;
|
|
|
|
|
|
2013-02-04 03:31:31 +00:00
|
|
|
|
LOG_TRACE(log, "Finalizing aggregate functions");
|
|
|
|
|
Stopwatch watch;
|
|
|
|
|
|
2013-09-01 04:55:41 +00:00
|
|
|
|
finalizeBlock(res);
|
|
|
|
|
|
|
|
|
|
double elapsed_seconds = watch.elapsedSeconds();
|
|
|
|
|
if (elapsed_seconds > 0.001)
|
|
|
|
|
{
|
|
|
|
|
LOG_TRACE(log, std::fixed << std::setprecision(3)
|
|
|
|
|
<< "Finalized aggregate functions. "
|
|
|
|
|
<< res.rows() << " rows, " << res.bytes() / 1048576.0 << " MiB"
|
|
|
|
|
<< " in " << elapsed_seconds << " sec."
|
|
|
|
|
<< " (" << res.rows() / elapsed_seconds << " rows/sec., " << res.bytes() / elapsed_seconds / 1048576.0 << " MiB/sec.)");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return res;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private:
|
|
|
|
|
AggregateDescriptions aggregates;
|
|
|
|
|
Logger * log;
|
|
|
|
|
|
|
|
|
|
void finalizeBlock(Block & res)
|
|
|
|
|
{
|
2011-09-19 03:34:23 +00:00
|
|
|
|
size_t rows = res.rows();
|
|
|
|
|
size_t columns = res.columns();
|
2013-07-28 03:14:03 +00:00
|
|
|
|
size_t number_of_aggregate = 0;
|
2011-09-19 03:34:23 +00:00
|
|
|
|
for (size_t i = 0; i < columns; ++i)
|
|
|
|
|
{
|
|
|
|
|
ColumnWithNameAndType & column = res.getByPosition(i);
|
|
|
|
|
if (ColumnAggregateFunction * col = dynamic_cast<ColumnAggregateFunction *>(&*column.column))
|
|
|
|
|
{
|
|
|
|
|
ColumnAggregateFunction::Container_t & data = col->getData();
|
2013-07-28 03:14:03 +00:00
|
|
|
|
IAggregateFunction * func = aggregates[number_of_aggregate].function;
|
2013-02-08 23:41:05 +00:00
|
|
|
|
column.type = func->getReturnType();
|
2013-06-30 11:38:46 +00:00
|
|
|
|
column.column = column.type->createColumn();
|
|
|
|
|
IColumn & finalized_column = *column.column;
|
|
|
|
|
finalized_column.reserve(rows);
|
2011-09-19 03:34:23 +00:00
|
|
|
|
|
|
|
|
|
for (size_t j = 0; j < rows; ++j)
|
2013-06-30 11:38:46 +00:00
|
|
|
|
func->insertResultInto(data[j], finalized_column);
|
2013-07-28 03:14:03 +00:00
|
|
|
|
|
|
|
|
|
++number_of_aggregate;
|
2011-09-19 03:34:23 +00:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
}
|