SummingSortedBlockInputStream: fix summing of multiple blocks

Previously the destination columns were only computed for the first
block, so that subsequently written blocks failed to write
aggregation results to corrent columns.
This commit is contained in:
Marek Vavruša 2017-10-18 21:21:48 -07:00 committed by alexey-milovidov
parent e936c4d066
commit c0458999f9

View File

@ -1,6 +1,7 @@
#include <DataStreams/SummingSortedBlockInputStream.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeNested.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeArray.h>
#include <Columns/ColumnTuple.h>
#include <Common/StringUtils.h>
@ -10,6 +11,7 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <Interpreters/Context.h>
namespace DB
@ -151,7 +153,6 @@ Block SummingSortedBlockInputStream::readImpl()
// Create aggregator to sum this column
auto desc = AggregateDescription{};
desc.column_numbers = {i};
desc.merged_column = column.column;
desc.function = factory.get("sumWithOverflow", {column.type});
desc.function->setArguments({column.type});
desc.state.resize(desc.function->sizeOfData());
@ -188,10 +189,7 @@ Block SummingSortedBlockInputStream::readImpl()
continue;
}
// Wrap aggregated columns in a tuple to match function signature
DataTypes argument_types = {};
auto tuple = std::make_shared<ColumnTuple>();
auto & tuple_columns = tuple->getColumns();
auto desc = AggregateDescription{};
auto map_desc = MapDescription{};
@ -225,7 +223,6 @@ Block SummingSortedBlockInputStream::readImpl()
// Add column to function arguments
desc.column_numbers.push_back(*column_num_it);
argument_types.push_back(key_col.type);
tuple_columns.push_back(key_col.column);
}
if (column_num_it != map.second.end())
@ -238,7 +235,6 @@ Block SummingSortedBlockInputStream::readImpl()
if (map_desc.key_col_nums.size() == 1)
{
// Create summation for all value columns in the map
desc.merged_column = static_cast<ColumnPtr>(tuple);
desc.function = factory.get("sumMap", argument_types);
desc.function->setArguments(argument_types);
desc.state.resize(desc.function->sizeOfData());
@ -254,6 +250,23 @@ Block SummingSortedBlockInputStream::readImpl()
}
}
// Update aggregation result columns for current block
for (auto & desc : columns_to_aggregate)
{
// Wrap aggregated columns in a tuple to match function signature
if (checkDataType<DataTypeTuple>(desc.function->getReturnType().get()))
{
auto tuple = std::make_shared<ColumnTuple>();
auto & tuple_columns = tuple->getColumns();
for (auto i : desc.column_numbers)
tuple_columns.push_back(merged_block.safeGetByPosition(i).column);
desc.merged_column = tuple;
}
else
desc.merged_column = merged_block.safeGetByPosition(desc.column_numbers[0]).column;
}
if (has_collation)
merge(merged_columns, queue_with_collation);
else