dbms: more fixes on aggregate function columns [METR-10894]

This commit is contained in:
Sergey Fedorov 2014-05-30 20:21:30 +04:00
parent 782346b01b
commit edd482acf3
6 changed files with 33 additions and 15 deletions

View File

@ -98,11 +98,11 @@ public:
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
{
static_cast<ColumnAggregateFunction &>(to).insertData(place, 0);
static_cast<ColumnAggregateFunction &>(to).getData().push_back(const_cast<AggregateDataPtr>(place));
}
/// Для аггрегатных функции типа state никогда не нужно вызывать insertResultInto
bool canBeFinal() const { return false; }
/// Аггрегатная функция или состояние аггрегатной функции.
bool isState() const { return true; }
};

View File

@ -97,8 +97,8 @@ public:
/// Вставить результат в столбец.
virtual void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const = 0;
/// Можно ли вызывать метод insertResultInto, или всегда нужно запоминать состояние.
virtual bool canBeFinal() const { return true; }
/// Аггрегатная функция или состояние аггрегатной функции.
virtual bool isState() const { return false; }
};

View File

@ -111,7 +111,9 @@ public:
void insert(const Field & x)
{
data.push_back(AggregateDataPtr());
if (arenas.empty())
arenas.push_back(new Arena());
data.push_back(arenas.back()->alloc(func->sizeOfData()*10));
func->create(data.back());
ReadBufferFromString read_buffer(x.safeGet<const String &>());
func->deserializeMerge(data.back(), read_buffer);
@ -120,10 +122,6 @@ public:
void insertData(const char * pos, size_t length)
{
data.push_back(*reinterpret_cast<const AggregateDataPtr *>(pos));
// For debugging:
// AggregateDataPtr tmp = AggregateDataPtr();
// func->create(tmp);
// func->merge(tmp, data.back());
}
ColumnPtr cut(size_t start, size_t length) const
@ -145,7 +143,22 @@ public:
ColumnPtr filter(const Filter & filter) const
{
throw Exception("Method filter is not supported for ColumnAggregateFunction.", ErrorCodes::NOT_IMPLEMENTED);
size_t size = data.size();
if (size != filter.size())
throw Exception("Size of filter doesn't match size of column.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
ColumnAggregateFunction * res_ = new ColumnAggregateFunction(func, arenas);
ColumnPtr res = res_;
if (size == 0)
return res;
res_->data.reserve(size);
for (size_t i = 0; i < size; ++i)
if (filter[i])
res_->data.push_back(this->data[i]);
return res;
}
ColumnPtr permute(const Permutation & perm, size_t limit) const

View File

@ -89,10 +89,10 @@ void AggregatingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainP
current_key = std::move(next_key);
next_key.resize(description.size());
++merged_rows;
/// Запишем данные для очередной группы.
setRow(current_row, current);
insertCurrentRow(merged_columns);
++merged_rows;
}
else
{

View File

@ -35,6 +35,10 @@ void DataTypeAggregateFunction::serializeBinary(const IColumn & column, WriteBuf
const ColumnAggregateFunction & real_column = dynamic_cast<const ColumnAggregateFunction &>(column);
const ColumnAggregateFunction::Container_t & vec = real_column.getData();
for (auto & val: vec)
std::cerr << static_cast<void *>(val) << " ";
std::cerr << std::endl;
ColumnAggregateFunction::Container_t::const_iterator it = vec.begin() + offset;
ColumnAggregateFunction::Container_t::const_iterator end = limit ? it + limit : vec.end();

View File

@ -349,7 +349,7 @@ void Aggregator::destroyImpl(
* после вставки ключа в хэш-таблицу, но до создания всех состояний агрегатных функций,
* то data будет равен nullptr-у.
*/
if (nullptr != data)
if (nullptr != data && !aggregate_functions[i]->isState())
aggregate_functions[i]->destroy(data + offsets_of_aggregate_states[i]);
}
}
@ -559,7 +559,7 @@ Block Aggregator::convertToBlock(AggregatedDataVariants & data_variants, bool fi
column.column = column.type->createColumn();
column.column->reserve(rows);
if (!aggregate_functions[i]->canBeFinal())
if (aggregate_functions[i]->isState())
{
/// Столбец ColumnAggregateFunction захватывает разделяемое владение ареной с состояниями агрегатных функций.
ColumnAggregateFunction & column_aggregate_func = static_cast<ColumnAggregateFunction &>(*column.column);
@ -811,7 +811,8 @@ void Aggregator::destroyAllAggregateStates(AggregatedDataVariants & result)
AggregatedDataWithoutKey & res_data = result.without_key;
for (size_t i = 0; i < aggregates_size; ++i)
aggregate_functions[i]->destroy(res_data + offsets_of_aggregate_states[i]);
if (!aggregate_functions[i]->isState())
aggregate_functions[i]->destroy(res_data + offsets_of_aggregate_states[i]);
}
if (result.type == AggregatedDataVariants::KEY_64)