DataStreams/SummingSortedBlockInputStream: use builtin aggregated functions

This replaces custom summation function implementations with an implementation
using built-in aggregation functions (sum and sumMap). The goal is to be able to
use specialized variants of aggregation functions, and to have a single
efficient implementation.
This commit is contained in:
Marek Vavruša 2017-10-08 22:56:22 -07:00 committed by alexey-milovidov
parent c9be3719bc
commit c09a43a7b2
2 changed files with 122 additions and 42 deletions

View File

@ -1,11 +1,16 @@
#include <DataStreams/SummingSortedBlockInputStream.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeNested.h>
#include <DataTypes/DataTypeArray.h>
#include <Columns/ColumnTuple.h>
#include <Common/StringUtils.h>
#include <Core/FieldVisitors.h>
#include <common/logger_useful.h>
#include <Common/typeid_cast.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <Functions/FunctionFactory.h>
#include <Interpreters/Context.h>
namespace DB
{
@ -31,7 +36,20 @@ String SummingSortedBlockInputStream::getID() const
void SummingSortedBlockInputStream::insertCurrentRow(ColumnPlainPtrs & merged_columns)
{
for (size_t i = 0; i < num_columns; ++i)
for (auto & desc : columns_to_aggregate)
{
// Do not insert if the aggregation state hasn't been created
if (desc.created)
{
desc.function->insertResultInto(desc.state.data(), *desc.merged_column);
desc.function->destroy(desc.state.data());
desc.created = false;
}
else
desc.merged_column->insertDefault();
}
for (auto i : column_numbers_not_to_aggregate)
merged_columns[i]->insert(current_row[i]);
}
@ -67,6 +85,8 @@ Block SummingSortedBlockInputStream::readImpl()
/// Additional initialization.
if (current_row.empty())
{
auto & factory = AggregateFunctionFactory::instance();
current_row.resize(num_columns);
next_key.columns.resize(description.size());
@ -88,7 +108,10 @@ Block SummingSortedBlockInputStream::readImpl()
const auto map_name = DataTypeNested::extractNestedTableName(column.name);
/// if nested table name ends with `Map` it is a possible candidate for special handling
if (map_name == column.name || !endsWith(map_name, "Map"))
{
column_numbers_not_to_aggregate.push_back(i);
continue;
}
discovered_maps[map_name].emplace_back(i);
}
@ -100,17 +123,30 @@ Block SummingSortedBlockInputStream::readImpl()
column.type->getName() == "DateTime" ||
column.type->getName() == "Nullable(Date)" ||
column.type->getName() == "Nullable(DateTime)")
{
column_numbers_not_to_aggregate.push_back(i);
continue;
}
/// Do they enter the PK?
if (isInPrimaryKey(description, column.name, i))
{
column_numbers_not_to_aggregate.push_back(i);
continue;
}
if (column_names_to_sum.empty()
|| column_names_to_sum.end() !=
std::find(column_names_to_sum.begin(), column_names_to_sum.end(), column.name))
{
column_numbers_to_sum.push_back(i);
// Create aggregator to sum this column
auto desc = AggregateDescription{};
desc.column_numbers = {i};
desc.merged_column = column.column;
desc.function = factory.get("sum", {column.type});
desc.function->setArguments({column.type});
desc.state.resize(desc.function->sizeOfData());
columns_to_aggregate.emplace_back(std::move(desc));
}
}
}
@ -130,8 +166,11 @@ Block SummingSortedBlockInputStream::readImpl()
if (column_num_it != map.second.end())
continue;
/// collect key and value columns
MapDescription map_description;
// Wrap aggregated columns in a tuple to match function signature
DataTypes argument_types = {};
auto tuple = std::make_shared<ColumnTuple>();
auto & tuple_columns = tuple->getColumns();
auto desc = AggregateDescription{};
column_num_it = map.second.begin();
for (; column_num_it != map.second.end(); ++column_num_it)
@ -150,20 +189,43 @@ Block SummingSortedBlockInputStream::readImpl()
|| nested_type.getName() == "Float64")
break;
map_description.key_col_nums.emplace_back(*column_num_it);
desc.key_col_nums.push_back(*column_num_it);
}
else
{
if (!nested_type.behavesAsNumber())
break;
map_description.val_col_nums.emplace_back(*column_num_it);
desc.val_col_nums.push_back(*column_num_it);
}
// Add column to function arguments
desc.column_numbers.push_back(*column_num_it);
argument_types.push_back(key_col.type);
tuple_columns.push_back(key_col.column);
}
if (column_num_it != map.second.end())
continue;
maps_to_sum.emplace_back(std::move(map_description));
if (map.second.size() == 2)
{
// Create parametric aggregation for all columns in the map
desc.merged_column = static_cast<ColumnPtr>(tuple);
desc.function = factory.get("sumMap", argument_types);
desc.function->setArguments(argument_types);
desc.state.resize(desc.function->sizeOfData());
columns_to_aggregate.emplace_back(std::move(desc));
}
else
{
// Fall back to legacy mergeMaps for composite keys
for (auto i : desc.key_col_nums)
column_numbers_not_to_aggregate.push_back(i);
for (auto i : desc.val_col_nums)
column_numbers_not_to_aggregate.push_back(i);
maps_to_sum.emplace_back(std::move(desc));
}
}
}
@ -219,12 +281,27 @@ void SummingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std:
setRow(current_row, current);
current_row_is_zero = false;
/// Reset aggregation states for next row
for (auto & desc : columns_to_aggregate)
{
desc.function->create(desc.state.data());
desc.created = true;
}
}
else
{
current_row_is_zero = !addRow(current_row, current);
// Merge maps only for same rows
for (auto & desc : maps_to_sum)
{
if(mergeMap(desc, current_row, current))
current_row_is_zero = false;
}
}
if (addRow(current_row, current))
current_row_is_zero = false;
if (!current->isLast())
{
current->next();
@ -249,21 +326,7 @@ void SummingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std:
}
template <typename TSortCursor>
bool SummingSortedBlockInputStream::mergeMaps(Row & row, TSortCursor & cursor)
{
bool non_empty_map_present = false;
/// merge nested maps
for (const auto & map : maps_to_sum)
if (mergeMap(map, row, cursor))
non_empty_map_present = true;
return non_empty_map_present;
}
template <typename TSortCursor>
bool SummingSortedBlockInputStream::mergeMap(const MapDescription & desc, Row & row, TSortCursor & cursor)
bool SummingSortedBlockInputStream::mergeMap(const AggregateDescription & desc, Row & row, TSortCursor & cursor)
{
/// Strongly non-optimal.
@ -349,13 +412,31 @@ bool SummingSortedBlockInputStream::mergeMap(const MapDescription & desc, Row &
template <typename TSortCursor>
bool SummingSortedBlockInputStream::addRow(Row & row, TSortCursor & cursor)
{
bool res = mergeMaps(row, cursor); /// Is there at least one non-zero number or non-empty array
for (size_t i = 0, size = column_numbers_to_sum.size(); i < size; ++i)
bool res = false;
for (auto & desc : columns_to_aggregate)
{
size_t j = column_numbers_to_sum[i];
if (applyVisitor(FieldVisitorSum((*cursor->all_columns[j])[cursor->pos]), row[j]))
res = true;
if (desc.created)
{
// Specialized case for unary functions
if (desc.column_numbers.size() == 1)
{
auto & col = cursor->all_columns[desc.column_numbers[0]];
desc.function->add(desc.state.data(), &col, cursor->pos, nullptr);
// This stream discards rows that are zero across all summed columns
if (!res)
res = col->get64(cursor->pos) != 0;
}
else
{
// Gather all source columns into a vector
std::vector<const IColumn *> columns;
columns.resize(desc.column_numbers.size());
for (size_t i = 0; i < desc.column_numbers.size(); ++i)
columns[i] = cursor->all_columns[desc.column_numbers[i]];
desc.function->add(desc.state.data(), columns.data(), cursor->pos, nullptr);
}
}
}
return res;

View File

@ -3,7 +3,7 @@
#include <Core/Row.h>
#include <Core/ColumnNumbers.h>
#include <DataStreams/MergingSortedBlockInputStream.h>
#include <AggregateFunctions/IAggregateFunction.h>
namespace DB
{
@ -47,7 +47,7 @@ private:
/// Columns with which numbers should be summed.
Names column_names_to_sum; /// If set, it is converted to column_numbers_to_sum when initialized.
ColumnNumbers column_numbers_to_sum;
ColumnNumbers column_numbers_not_to_aggregate;
/** A table can have nested tables that are treated in a special way.
* If the name of the nested table ends in `Map` and it contains at least two columns,
@ -69,15 +69,20 @@ private:
* and can be deleted at any time.
*/
/// Stores numbers of key-columns and value-columns.
struct MapDescription
struct AggregateDescription
{
AggregateFunctionPtr function;
std::vector<size_t> column_numbers;
ColumnPtr merged_column;
std::vector<char> state;
bool created = false;
/* Compatibility with the mergeMap */
std::vector<size_t> key_col_nums;
std::vector<size_t> val_col_nums;
};
/// Found nested Map-tables.
std::vector<MapDescription> maps_to_sum;
std::vector<AggregateDescription> columns_to_aggregate;
std::vector<AggregateDescription> maps_to_sum;
RowRef current_key; /// The current primary key.
RowRef next_key; /// The primary key of the next row.
@ -96,14 +101,8 @@ private:
/// Insert the summed row for the current group into the result.
void insertCurrentRow(ColumnPlainPtrs & merged_columns);
/** For nested Map, a merge by key is performed with the ejection of rows of nested arrays, in which
* all items are zero.
*/
template <typename TSortCursor>
bool mergeMaps(Row & row, TSortCursor & cursor);
template <typename TSortCursor>
bool mergeMap(const MapDescription & map, Row & row, TSortCursor & cursor);
bool mergeMap(const AggregateDescription & map, Row & row, TSortCursor & cursor);
/** Add the row under the cursor to the `row`.
* Returns false if the result is zero.