Try fix aggregating.

This commit is contained in:
Nikolai Kochetov 2020-04-04 18:18:35 +03:00
parent c53b902d6c
commit 9fac9a7d38
2 changed files with 16 additions and 12 deletions

View File

@ -184,26 +184,24 @@ void AggregatingSortedTransform::merge()
if (key_differs) if (key_differs)
{ {
/// Write the simple aggregation result for the previous group.
if (merged_data.mergedRows() > 0)
insertSimpleAggregationResult();
merged_data.insertRow();
/// if there are enough rows accumulated and the last one is calculated completely /// if there are enough rows accumulated and the last one is calculated completely
if (merged_data.hasEnoughRows()) if (merged_data.hasEnoughRows())
{
/// Write the simple aggregation result for the previous group.
insertSimpleAggregationResult();
return; return;
}
/// We will write the data for the group. We copy the values of ordinary columns. /// We will write the data for the group. We copy the values of ordinary columns.
merged_data.insertRow(current->all_columns, current->pos, merged_data.initializeRow(current->all_columns, current->pos,
columns_definition.column_numbers_not_to_aggregate); columns_definition.column_numbers_not_to_aggregate);
/// Add the empty aggregation state to the aggregate columns. The state will be updated in the `addRow` function. /// Add the empty aggregation state to the aggregate columns. The state will be updated in the `addRow` function.
for (auto & column_to_aggregate : columns_definition.columns_to_aggregate) for (auto & column_to_aggregate : columns_definition.columns_to_aggregate)
column_to_aggregate.column->insertDefault(); column_to_aggregate.column->insertDefault();
/// Write the simple aggregation result for the previous group.
if (merged_data.mergedRows() > 0)
insertSimpleAggregationResult();
/// Reset simple aggregation states for next row /// Reset simple aggregation states for next row
for (auto & desc : columns_definition.columns_to_simple_aggregate) for (auto & desc : columns_definition.columns_to_simple_aggregate)
desc.createState(); desc.createState();
@ -229,7 +227,10 @@ void AggregatingSortedTransform::merge()
/// Write the simple aggregation result for the previous group. /// Write the simple aggregation result for the previous group.
if (merged_data.mergedRows() > 0) if (merged_data.mergedRows() > 0)
{
insertSimpleAggregationResult(); insertSimpleAggregationResult();
merged_data.insertRow();
}
last_chunk_sort_columns.clear(); last_chunk_sort_columns.clear();
is_finished = true; is_finished = true;

View File

@ -59,11 +59,14 @@ private:
public: public:
using MergedData::MergedData; using MergedData::MergedData;
void insertRow(const ColumnRawPtrs & raw_columns, size_t row, const ColumnNumbers & column_numbers) void initializeRow(const ColumnRawPtrs & raw_columns, size_t row, const ColumnNumbers & column_numbers)
{ {
for (auto column_number :column_numbers) for (auto column_number : column_numbers)
columns[column_number]->insertFrom(*raw_columns[column_number], row); columns[column_number]->insertFrom(*raw_columns[column_number], row);
}
void insertRow()
{
++total_merged_rows; ++total_merged_rows;
++merged_rows; ++merged_rows;
/// TODO: sum_blocks_granularity += block_size; /// TODO: sum_blocks_granularity += block_size;