ClickHouse/dbms/src/DataStreams/AggregatingBlockInputStream.cpp

52 lines
1.3 KiB
C++
Raw Normal View History

2011-09-19 03:34:23 +00:00
#include <DB/DataStreams/AggregatingBlockInputStream.h>
namespace DB
{
2011-09-25 03:37:09 +00:00
AggregatingBlockInputStream::AggregatingBlockInputStream(BlockInputStreamPtr input_, SharedPtr<Expression> expression)
2011-09-24 20:32:41 +00:00
: input(input_), has_been_read(false)
{
children.push_back(input);
2011-09-25 03:37:09 +00:00
Names key_names;
AggregateDescriptions aggregates;
expression->getAggregateInfo(key_names, aggregates);
aggregator = new Aggregator(key_names, aggregates);
}
2011-09-24 20:32:41 +00:00
2011-09-19 03:34:23 +00:00
Block AggregatingBlockInputStream::readImpl()
{
if (has_been_read)
return Block();
has_been_read = true;
2011-09-25 03:37:09 +00:00
AggregatedData data = aggregator->execute(input);
Block res = aggregator->getSampleBlock();
2011-09-19 03:34:23 +00:00
for (AggregatedData::const_iterator it = data.begin(); it != data.end(); ++it)
{
size_t i = 0;
for (Row::const_iterator jt = it->first.begin(); jt != it->first.end(); ++jt, ++i)
res.getByPosition(i).column->insert(*jt);
for (AggregateFunctions::const_iterator jt = it->second.begin(); jt != it->second.end(); ++jt, ++i)
res.getByPosition(i).column->insert(*jt);
}
2011-09-26 01:50:32 +00:00
/// Изменяем размер столбцов-констант в блоке.
size_t columns = res.columns();
for (size_t i = 0; i < columns; ++i)
if (res.getByPosition(i).column->isConst())
res.getByPosition(i).column->cut(0, data.size());
2011-09-19 03:34:23 +00:00
return res;
}
}