ClickHouse/dbms/src/DataStreams/AggregatingBlockInputStream.cpp

97 lines
2.8 KiB
C++
Raw Normal View History

2011-09-19 03:34:23 +00:00
#include <DB/DataStreams/AggregatingBlockInputStream.h>
namespace DB
{
2011-09-25 03:37:09 +00:00
AggregatingBlockInputStream::AggregatingBlockInputStream(BlockInputStreamPtr input_, SharedPtr<Expression> expression)
2011-09-24 20:32:41 +00:00
: input(input_), has_been_read(false)
{
children.push_back(input);
2011-09-25 03:37:09 +00:00
Names key_names;
AggregateDescriptions aggregates;
expression->getAggregateInfo(key_names, aggregates);
aggregator = new Aggregator(key_names, aggregates);
}
2011-09-24 20:32:41 +00:00
2011-09-19 03:34:23 +00:00
Block AggregatingBlockInputStream::readImpl()
{
if (has_been_read)
return Block();
has_been_read = true;
2011-09-26 07:25:22 +00:00
AggregatedDataVariants data_variants;
aggregator->execute(input, data_variants);
2011-09-25 03:37:09 +00:00
Block res = aggregator->getSampleBlock();
2011-09-26 07:25:22 +00:00
size_t rows = 0;
2011-09-19 03:34:23 +00:00
2011-09-26 07:25:22 +00:00
/// В какой структуре данных, агрегированы данные?
if (!data_variants.without_key.empty())
2011-09-19 03:34:23 +00:00
{
2011-09-26 07:25:22 +00:00
AggregatedDataWithoutKey & data = data_variants.without_key;
rows = 1;
2011-09-19 03:34:23 +00:00
size_t i = 0;
2011-09-26 07:25:22 +00:00
for (AggregateFunctions::const_iterator jt = data.begin(); jt != data.end(); ++jt, ++i)
2011-09-19 03:34:23 +00:00
res.getByPosition(i).column->insert(*jt);
2011-09-26 07:25:22 +00:00
}
else if (!data_variants.key64.empty())
{
AggregatedDataWithUInt64Key & data = data_variants.key64;
rows = data.size();
IColumn & first_column = *res.getByPosition(0).column;
for (AggregatedDataWithUInt64Key::const_iterator it = data.begin(); it != data.end(); ++it)
{
first_column.insert(it->first);
2011-09-19 03:34:23 +00:00
2011-09-26 07:25:22 +00:00
size_t i = 1;
for (AggregateFunctions::const_iterator jt = it->second.begin(); jt != it->second.end(); ++jt, ++i)
res.getByPosition(i).column->insert(*jt);
}
}
else if (!data_variants.hashed.empty())
{
AggregatedDataHashed & data = data_variants.hashed;
rows = data.size();
for (AggregatedDataHashed::const_iterator it = data.begin(); it != data.end(); ++it)
{
size_t i = 0;
for (Row::const_iterator jt = it->second.first.begin(); jt != it->second.first.end(); ++jt, ++i)
res.getByPosition(i).column->insert(*jt);
for (AggregateFunctions::const_iterator jt = it->second.second.begin(); jt != it->second.second.end(); ++jt, ++i)
res.getByPosition(i).column->insert(*jt);
}
}
else
{
AggregatedData & data = data_variants.generic;
rows = data.size();
for (AggregatedData::const_iterator it = data.begin(); it != data.end(); ++it)
{
size_t i = 0;
for (Row::const_iterator jt = it->first.begin(); jt != it->first.end(); ++jt, ++i)
res.getByPosition(i).column->insert(*jt);
for (AggregateFunctions::const_iterator jt = it->second.begin(); jt != it->second.end(); ++jt, ++i)
res.getByPosition(i).column->insert(*jt);
}
2011-09-19 03:34:23 +00:00
}
2011-09-26 01:50:32 +00:00
/// Изменяем размер столбцов-констант в блоке.
size_t columns = res.columns();
for (size_t i = 0; i < columns; ++i)
if (res.getByPosition(i).column->isConst())
2011-09-26 07:25:22 +00:00
res.getByPosition(i).column->cut(0, rows);
2011-09-26 01:50:32 +00:00
2011-09-19 03:34:23 +00:00
return res;
}
}