2011-09-19 03:34:23 +00:00
|
|
|
#include <DB/DataStreams/AggregatingBlockInputStream.h>
|
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
|
2011-09-24 20:32:41 +00:00
|
|
|
/*AggregatingBlockInputStream::AggregatingBlockInputStream(BlockInputStreamPtr input_, SharedPtr<Expression> expression)
|
|
|
|
: input(input_), has_been_read(false)
|
|
|
|
{
|
|
|
|
children.push_back(input);
|
|
|
|
|
|
|
|
|
|
|
|
}*/
|
|
|
|
|
|
|
|
|
|
|
|
|
2011-09-19 03:34:23 +00:00
|
|
|
Block AggregatingBlockInputStream::readImpl()
|
|
|
|
{
|
|
|
|
if (has_been_read)
|
|
|
|
return Block();
|
|
|
|
|
|
|
|
has_been_read = true;
|
|
|
|
|
|
|
|
AggregatedData data = aggregator.execute(input);
|
|
|
|
Block res = aggregator.getSampleBlock();
|
|
|
|
|
|
|
|
for (AggregatedData::const_iterator it = data.begin(); it != data.end(); ++it)
|
|
|
|
{
|
|
|
|
size_t i = 0;
|
|
|
|
for (Row::const_iterator jt = it->first.begin(); jt != it->first.end(); ++jt, ++i)
|
|
|
|
res.getByPosition(i).column->insert(*jt);
|
|
|
|
|
|
|
|
for (AggregateFunctions::const_iterator jt = it->second.begin(); jt != it->second.end(); ++jt, ++i)
|
|
|
|
res.getByPosition(i).column->insert(*jt);
|
|
|
|
}
|
|
|
|
|
|
|
|
return res;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
}
|