dbms: probably fixed error in Aggregator [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2013-02-13 19:24:19 +00:00
parent 12f324cb4c
commit 427be97e00

View File

@ -27,6 +27,26 @@ void Aggregator::initialize(Block & block)
initialized = true; initialized = true;
aggregate_functions.resize(aggregates_size);
for (size_t i = 0; i < aggregates_size; ++i)
aggregate_functions[i] = &*aggregates[i].function;
/// Инициализируем размеры состояний и смещения для агрегатных функций.
offsets_of_aggregate_states.resize(aggregates_size);
total_size_of_aggregate_states = 0;
for (size_t i = 0; i < aggregates_size; ++i)
{
offsets_of_aggregate_states[i] = total_size_of_aggregate_states;
total_size_of_aggregate_states += aggregates[i].function->sizeOfData();
}
/** Всё остальное - только если передан непустой block.
* (всё остальное не нужно в методе merge блоков с готовыми состояниями агрегатных функций).
*/
if (!block)
return;
/// Преобразуем имена столбцов в номера, если номера не заданы /// Преобразуем имена столбцов в номера, если номера не заданы
if (keys.empty() && !key_names.empty()) if (keys.empty() && !key_names.empty())
for (Names::const_iterator it = key_names.begin(); it != key_names.end(); ++it) for (Names::const_iterator it = key_names.begin(); it != key_names.end(); ++it)
@ -37,10 +57,6 @@ void Aggregator::initialize(Block & block)
for (Names::const_iterator jt = it->argument_names.begin(); jt != it->argument_names.end(); ++jt) for (Names::const_iterator jt = it->argument_names.begin(); jt != it->argument_names.end(); ++jt)
it->arguments.push_back(block.getPositionByName(*jt)); it->arguments.push_back(block.getPositionByName(*jt));
aggregate_functions.resize(aggregates_size);
for (size_t i = 0; i < aggregates_size; ++i)
aggregate_functions[i] = &*aggregates[i].function;
/// Создадим пример блока, описывающего результат /// Создадим пример блока, описывающего результат
if (!sample) if (!sample)
{ {
@ -72,16 +88,6 @@ void Aggregator::initialize(Block & block)
for (size_t i = 0; i < columns; ++i) for (size_t i = 0; i < columns; ++i)
if (block.getByPosition(i).column->isConst()) if (block.getByPosition(i).column->isConst())
sample.insert(block.getByPosition(i).cloneEmpty()); sample.insert(block.getByPosition(i).cloneEmpty());
/// Инициализируем размеры состояний и смещения для агрегатных функций.
offsets_of_aggregate_states.resize(aggregates_size);
total_size_of_aggregate_states = 0;
for (size_t i = 0; i < aggregates_size; ++i)
{
offsets_of_aggregate_states[i] = total_size_of_aggregate_states;
total_size_of_aggregate_states += aggregates[i].function->sizeOfData();
}
} }
} }
@ -750,6 +756,9 @@ void Aggregator::merge(BlockInputStreamPtr stream, AggregatedDataVariants & resu
typedef std::vector<AggregateColumn> AggregateColumns; typedef std::vector<AggregateColumn> AggregateColumns;
AggregateColumns aggregate_columns(aggregates_size); AggregateColumns aggregate_columns(aggregates_size);
Block empty_block;
initialize(empty_block);
/// Читаем все данные /// Читаем все данные
while (Block block = stream->read()) while (Block block = stream->read())
{ {