2011-09-26 11:49:39 +00:00
|
|
|
|
#include <DB/Columns/ColumnsNumber.h>
|
|
|
|
|
|
2011-09-19 03:34:23 +00:00
|
|
|
|
#include <DB/DataStreams/AggregatingBlockInputStream.h>
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
|
2011-09-25 03:37:09 +00:00
|
|
|
|
/** Creates a stream that aggregates the data read from `input_`.
  *
  * The source stream is registered as a child of this stream. The key names and
  * the aggregate descriptions are obtained from `expression` via getAggregateInfo()
  * and are used to construct the Aggregator.
  */
AggregatingBlockInputStream::AggregatingBlockInputStream(BlockInputStreamPtr input_, SharedPtr<Expression> expression)
    : input(input_), has_been_read(false)
{
    children.push_back(input_);

    /// Both containers are filled in by the expression.
    AggregateDescriptions aggregate_descriptions;
    Names grouping_key_names;
    expression->getAggregateInfo(grouping_key_names, aggregate_descriptions);

    aggregator = new Aggregator(grouping_key_names, aggregate_descriptions);
}
|
2011-09-24 20:32:41 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
2011-09-19 03:34:23 +00:00
|
|
|
|
/** Performs the whole aggregation on the first call and returns the result as one block.
  *
  * The first call hands `input` to the aggregator, then copies the contents of whichever
  * aggregation structure turned out to be non-empty into a block obtained from
  * aggregator->getSampleBlock(). Every subsequent call returns an empty Block.
  *
  * Column layout written here: key value(s) first (if the structure has keys),
  * followed by the aggregate functions, one per column.
  */
Block AggregatingBlockInputStream::readImpl()
{
    /// Everything is returned by the first call; later calls have nothing to give.
    if (has_been_read)
        return Block();

    has_been_read = true;

    AggregatedDataVariants variants;
    aggregator->execute(input, variants);

    Block result = aggregator->getSampleBlock();
    size_t row_count = 0;

    /// Which data structure holds the aggregated data?
    if (!variants.without_key.empty())
    {
        /// No keys: a single row, aggregate functions start at column 0.
        AggregatedDataWithoutKey & functions = variants.without_key;
        row_count = 1;

        size_t pos = 0;
        for (AggregateFunctionsPlainPtrs::const_iterator func = functions.begin(); func != functions.end(); ++func)
            result.getByPosition(pos++).column->insert(*func);
    }
    else if (!variants.key64.empty())
    {
        /// Single key stored under `first`: column 0 is the key, aggregate functions follow.
        AggregatedDataWithUInt64Key & groups = variants.key64;
        row_count = groups.size();

        IColumn & key_column = *result.getByPosition(0).column;

        /// For a signed integer key column the stored key is converted to Int64 before insertion.
        const bool key_is_signed =
            dynamic_cast<ColumnInt8 *>(&key_column)
            || dynamic_cast<ColumnInt16 *>(&key_column)
            || dynamic_cast<ColumnInt32 *>(&key_column)
            || dynamic_cast<ColumnInt64 *>(&key_column);

        for (AggregatedDataWithUInt64Key::const_iterator group = groups.begin(); group != groups.end(); ++group)
        {
            if (!key_is_signed)
                key_column.insert(group->first);
            else
                key_column.insert(static_cast<Int64>(group->first));

            size_t pos = 1;
            for (AggregateFunctionsPlainPtrs::const_iterator func = group->second.begin(); func != group->second.end(); ++func)
                result.getByPosition(pos++).column->insert(*func);
        }
    }
    else if (!variants.key_string.empty())
    {
        /// Single key stored under `first`: column 0 is the key, aggregate functions follow.
        AggregatedDataWithStringKey & groups = variants.key_string;
        row_count = groups.size();

        IColumn & key_column = *result.getByPosition(0).column;

        for (AggregatedDataWithStringKey::const_iterator group = groups.begin(); group != groups.end(); ++group)
        {
            key_column.insert(group->first);

            size_t pos = 1;
            for (AggregateFunctionsPlainPtrs::const_iterator func = group->second.begin(); func != group->second.end(); ++func)
                result.getByPosition(pos++).column->insert(*func);
        }
    }
    else if (!variants.hashed.empty())
    {
        /// The mapped value is a pair: the key values (a Row) and the aggregate functions.
        AggregatedDataHashed & groups = variants.hashed;
        row_count = groups.size();

        for (AggregatedDataHashed::const_iterator group = groups.begin(); group != groups.end(); ++group)
        {
            size_t pos = 0;

            for (Row::const_iterator key_part = group->second.first.begin(); key_part != group->second.first.end(); ++key_part)
                result.getByPosition(pos++).column->insert(*key_part);

            /// Aggregate functions continue from the column right after the last key.
            for (AggregateFunctionsPlainPtrs::const_iterator func = group->second.second.begin(); func != group->second.second.end(); ++func)
                result.getByPosition(pos++).column->insert(*func);
        }
    }
    else
    {
        /// Fallback: the generic structure, where a Row of key values maps to the aggregate functions.
        AggregatedData & groups = variants.generic;
        row_count = groups.size();

        for (AggregatedData::const_iterator group = groups.begin(); group != groups.end(); ++group)
        {
            size_t pos = 0;

            for (Row::const_iterator key_part = group->first.begin(); key_part != group->first.end(); ++key_part)
                result.getByPosition(pos++).column->insert(*key_part);

            /// Aggregate functions continue from the column right after the last key.
            for (AggregateFunctionsPlainPtrs::const_iterator func = group->second.begin(); func != group->second.end(); ++func)
                result.getByPosition(pos++).column->insert(*func);
        }
    }

    /// Resize the constant columns in the block.
    const size_t column_count = result.columns();
    for (size_t pos = 0; pos != column_count; ++pos)
    {
        if (!result.getByPosition(pos).column->isConst())
            continue;

        result.getByPosition(pos).column->cut(0, row_count);
    }

    return result;
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
}
|