Add new aggregate function sumMap(key, value).

This commit is contained in:
Alex Bocharov 2017-09-15 12:14:19 +01:00
parent a43b9ec398
commit 591cd5c009
11 changed files with 338 additions and 0 deletions

View File

@ -0,0 +1,27 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionSumMap.h>
#include <AggregateFunctions/Helpers.h>
namespace DB
{
namespace
{
AggregateFunctionPtr createAggregateFunctionSumMap(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
if (argument_types.size() != 2)
throw Exception("Incorrect number of arguments for aggregate function " + name + ", should be 2",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return std::make_shared<AggregateFunctionSumMap>();
}
}
void registerAggregateFunctionSumMap(AggregateFunctionFactory & factory)
{
factory.registerFunction("sumMap", createAggregateFunctionSumMap);
}
}

View File

@ -0,0 +1,189 @@
#pragma once
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnVector.h>
#include <AggregateFunctions/IBinaryAggregateFunction.h>
#include <Functions/FunctionHelpers.h>
#include <map>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
struct AggregateFunctionSumMapData
{
std::map<Field, Field> merged_maps;
};
/** Aggregate function, that takes two arguments: keys and values, and as a result, builds an array of 2 arrays -
* ordered keys and values summed up by corresponding keys.
*
* This function is the most useful when using SummingMergeTree to sum Nested columns, which name ends in "Map".
*
* Example: sumMap(k, v) of:
* k v
* [1,2,3] [10,10,10]
* [3,4,5] [10,10,10]
* [4,5,6] [10,10,10]
* [6,7,8] [10,10,10]
* [7,5,3] [5,15,25]
* [8,9,10] [20,20,20]
* will return:
* [[1,2,3,4,5,6,7,8,9,10],[10,10,45,20,35,20,15,30,20,20]]
*/
class AggregateFunctionSumMap final : public IBinaryAggregateFunction<struct AggregateFunctionSumMapData, AggregateFunctionSumMap>
{
private:
DataTypePtr type;
public:
String getName() const override { return "sumMap"; }
DataTypePtr getReturnType() const override
{
return std::make_shared<DataTypeArray>(type);
}
void setArgumentsImpl(const DataTypes & arguments)
{
if (2 != arguments.size())
throw Exception("Aggregate function " + getName() + "require exactly two arguments of array type.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
const auto * array_type = checkAndGetDataType<DataTypeArray>(arguments[0].get());
if (!array_type)
throw Exception("First argument for function " + getName() + " must be an array.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
array_type = checkAndGetDataType<DataTypeArray>(arguments[1].get());
if (!array_type)
throw Exception("Second argument for function " + getName() + " must be an array.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
type = arguments.front();
}
void setParameters(const Array & params) override
{
if (!params.empty())
throw Exception("This instantiation of " + getName() + "aggregate function doesn't accept any parameters.",
ErrorCodes::LOGICAL_ERROR);
}
void addImpl(AggregateDataPtr place, const IColumn & column_keys, const IColumn & column_values, size_t row_num, Arena *) const
{
Field field_keys;
column_keys.get(row_num, field_keys);
const auto & keys = field_keys.get<Array &>();
Field field_values;
column_values.get(row_num, field_values);
const auto & values = field_values.get<Array &>();
auto & merged_maps = this->data(place).merged_maps;
if (keys.size() != values.size())
throw Exception("Sizes of keys and values arrays do not match", ErrorCodes::LOGICAL_ERROR);
size_t size = keys.size();
for (size_t i = 0; i < size; ++i)
{
if (merged_maps.find(keys[i]) != merged_maps.end())
merged_maps[keys[i]] += values[i];
else
merged_maps[keys[i]] = values[i];
}
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
auto & merged_maps = this->data(place).merged_maps;
const auto & rhs_maps = this->data(rhs).merged_maps;
for (const auto &rhs_map : rhs_maps)
{
if (merged_maps.find(rhs_map.first) != merged_maps.end())
merged_maps[rhs_map.first] += rhs_map.second;
else
merged_maps[rhs_map.first] = rhs_map.second;
}
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
/// Serialize merged_maps as two vectors. Using boost::archive could be better but it's unavailable.
const auto & merged_maps = this->data(place).merged_maps;
size_t size = merged_maps.size();
Array keys, values;
keys.reserve(size);
values.reserve(size);
for (const auto &v : merged_maps)
{
keys.push_back(v.first);
values.push_back(v.second);
}
writeVarUInt(size, buf);
buf.write(reinterpret_cast<const char *>(&keys[0]), size * sizeof(keys[0]));
buf.write(reinterpret_cast<const char *>(&values[0]), size * sizeof(values[0]));
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
auto & merged_maps = this->data(place).merged_maps;
size_t size = 0;
readVarUInt(size, buf);
Array keys, values;
keys.resize(size);
values.resize(size);
buf.read(reinterpret_cast<char *>(&keys[0]), size * sizeof(keys[0]));
buf.read(reinterpret_cast<char *>(&values[0]), size * sizeof(values[0]));
for (size_t i = 0; i < size; ++i)
{
merged_maps[keys[i]] = values[i];
}
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
auto & to_array = static_cast<ColumnArray &>(to);
auto & to_data = to_array.getData();
auto & to_offsets = to_array.getOffsets();
const auto & merged_maps = this->data(place).merged_maps;
size_t size = merged_maps.size();
Array keys, values;
keys.reserve(size);
values.reserve(size);
for (const auto &v : merged_maps)
{
keys.push_back(v.first);
values.push_back(v.second);
}
to_data.insert(keys);
to_data.insert(values);
to_offsets.push_back((to_offsets.empty() ? 0 : to_offsets.back()) + 2);
}
};
}

View File

@ -20,6 +20,7 @@ void registerAggregateFunctionsSequenceMatch(AggregateFunctionFactory & factory)
void registerAggregateFunctionsMinMaxAny(AggregateFunctionFactory & factory);
void registerAggregateFunctionsStatistics(AggregateFunctionFactory & factory);
void registerAggregateFunctionSum(AggregateFunctionFactory & factory);
void registerAggregateFunctionSumMap(AggregateFunctionFactory & factory);
void registerAggregateFunctionsUniq(AggregateFunctionFactory & factory);
void registerAggregateFunctionUniqUpTo(AggregateFunctionFactory & factory);
void registerAggregateFunctionTopK(AggregateFunctionFactory & factory);
@ -46,6 +47,7 @@ void registerAggregateFunctions()
registerAggregateFunctionsMinMaxAny(factory);
registerAggregateFunctionsStatistics(factory);
registerAggregateFunctionSum(factory);
registerAggregateFunctionSumMap(factory);
registerAggregateFunctionsUniq(factory);
registerAggregateFunctionUniqUpTo(factory);
registerAggregateFunctionTopK(factory);

View File

@ -152,6 +152,27 @@ public:
return *this;
}
Field & operator+= (const Field & rhs)
{
if (which != rhs.which)
throw Exception("Adding different types is not allowed.", ErrorCodes::BAD_TYPE_OF_FIELD);
else
{
switch (which)
{
case Types::UInt64: assignConcrete<UInt64>(get<UInt64>() + rhs.get<UInt64>()); break;
case Types::Int64: assignConcrete<Int64>(get<Int64>() + rhs.get<Int64>()); break;
case Types::Float64: assignConcrete<Float64>(get<Float64>() + rhs.get<Float64>()); break;
case Types::String: assignConcrete<String>(get<String>() + rhs.get<String>()); break;
default:
throw Exception("Bad type of Field to add", ErrorCodes::BAD_TYPE_OF_FIELD);
}
}
return *this;
}
Field & operator= (Field && rhs)
{
if (this != &rhs)

View File

@ -23,6 +23,7 @@
#include <AggregateFunctions/AggregateFunctionState.h>
#include <AggregateFunctions/AggregateFunctionStatistics.h>
#include <AggregateFunctions/AggregateFunctionSum.h>
#include <AggregateFunctions/AggregateFunctionSumMap.h>
#include <AggregateFunctions/AggregateFunctionTopK.h>
#include <AggregateFunctions/AggregateFunctionUniq.h>
#include <AggregateFunctions/AggregateFunctionUniqUpTo.h>

View File

@ -0,0 +1,10 @@
2000-01-01 2000-01-01 00:00:00 [1,2,3] [10,10,10]
2000-01-01 2000-01-01 00:00:00 [3,4,5] [10,10,10]
2000-01-01 2000-01-01 00:01:00 [4,5,6] [10,10,10]
2000-01-01 2000-01-01 00:01:00 [6,7,8] [10,10,10]
[[1,2,3,4,5,6,7,8],[10,10,20,20,20,20,10,10]]
[[1,2,3,4,5,6,7,8],[10,10,20,20,20,20,10,10]]
2000-01-01 00:00:00 [[1,2,3,4,5],[10,10,20,10,10]]
2000-01-01 00:01:00 [[4,5,6,7,8],[10,10,20,10,10]]
2000-01-01 00:00:00 [1,2,3,4,5] [10,10,20,10,10]
2000-01-01 00:01:00 [4,5,6,7,8] [10,10,20,10,10]

View File

@ -0,0 +1,12 @@
DROP TABLE IF EXISTS test.sum_map;
CREATE TABLE test.sum_map(date Date, timeslot DateTime, statusMap Nested(status UInt16, requests UInt64)) ENGINE = Log;
INSERT INTO test.sum_map VALUES ('2000-01-01', '2000-01-01 00:00:00', [1, 2, 3], [10, 10, 10]), ('2000-01-01', '2000-01-01 00:00:00', [3, 4, 5], [10, 10, 10]), ('2000-01-01', '2000-01-01 00:01:00', [4, 5, 6], [10, 10, 10]), ('2000-01-01', '2000-01-01 00:01:00', [6, 7, 8], [10, 10, 10]);
SELECT * FROM test.sum_map;
SELECT sumMap(statusMap.status, statusMap.requests) FROM test.sum_map;
SELECT sumMapMerge(s) FROM (SELECT sumMapState(statusMap.status, statusMap.requests) AS s FROM test.sum_map);
SELECT timeslot, sumMap(statusMap.status, statusMap.requests) FROM test.sum_map GROUP BY timeslot;
SELECT timeslot, sumMap(statusMap.status, statusMap.requests)[1], sumMap(statusMap.status, statusMap.requests)[2] FROM test.sum_map GROUP BY timeslot;
DROP TABLE test.sum_map;

View File

@ -44,6 +44,42 @@ sum(x)
Calculates the sum.
Only works for numbers.
sumMap(key, value)
------
Performs summation of array 'value' by corresponding keys of array 'key'.
Number of elements in 'key' and 'value' arrays should be the same for each row, on which summation is being performed.
Returns array of two arrays - sorted keys and values, summed up by corresponding keys.
Example:
.. code-block:: sql
CREATE TABLE sum_map(
date Date,
timeslot DateTime,
statusMap Nested(
status UInt16,
requests UInt64
)
) ENGINE = Log;
INSERT INTO sum_map VALUES
('2000-01-01', '2000-01-01 00:00:00', [1, 2, 3], [10, 10, 10]),
('2000-01-01', '2000-01-01 00:00:00', [3, 4, 5], [10, 10, 10]),
('2000-01-01', '2000-01-01 00:01:00', [4, 5, 6], [10, 10, 10]),
('2000-01-01', '2000-01-01 00:01:00', [6, 7, 8], [10, 10, 10]);
SELECT
timeslot,
sumMap(statusMap.status, statusMap.requests)
FROM sum_map
GROUP BY timeslot
.. code-block:: text
┌────────────timeslot─┬─sumMap(statusMap.status, statusMap.requests)─┐
│ 2000-01-01 00:00:00 │ [[1,2,3,4,5],[10,10,20,10,10]] │
│ 2000-01-01 00:01:00 │ [[4,5,6,7,8],[10,10,20,10,10]] │
└─────────────────────┴──────────────────────────────────────────────┘
avg(x)
------
Calculates the average.

View File

@ -37,6 +37,8 @@ Examples:
[(1, 100)] + [(1, 150), (2, 150)] -> [(1, 250), (2, 150)]
[(1, 100), (2, 150)] + [(1, -100)] -> [(2, 150)]
For aggregating Map use function sumMap(key, value).
For nested data structures, you don't need to specify the columns as a list of columns for totaling.
This table engine is not particularly useful. Remember that when saving just pre-aggregated data, you lose some of the system's advantages.

View File

@ -44,6 +44,42 @@ sum(x)
Вычисляет сумму.
Работает только для чисел.
sumMap(key, value)
------
Производит суммирование массива 'value' по соотвествующим ключам заданным в массиве 'key'.
Количество элементов в 'key' и 'value' должно быть одинаковым для каждой строки, для которой происходит суммирование.
Возвращает массив из двух массивов - ключи в отсортированном порядке и значения, просуммированные по соотвествующим ключам.
Пример:
.. code-block:: sql
CREATE TABLE sum_map(
date Date,
timeslot DateTime,
statusMap Nested(
status UInt16,
requests UInt64
)
) ENGINE = Log;
INSERT INTO sum_map VALUES
('2000-01-01', '2000-01-01 00:00:00', [1, 2, 3], [10, 10, 10]),
('2000-01-01', '2000-01-01 00:00:00', [3, 4, 5], [10, 10, 10]),
('2000-01-01', '2000-01-01 00:01:00', [4, 5, 6], [10, 10, 10]),
('2000-01-01', '2000-01-01 00:01:00', [6, 7, 8], [10, 10, 10]);
SELECT
timeslot,
sumMap(statusMap.status, statusMap.requests)
FROM sum_map
GROUP BY timeslot
.. code-block:: text
┌────────────timeslot─┬─sumMap(statusMap.status, statusMap.requests)─┐
│ 2000-01-01 00:00:00 │ [[1,2,3,4,5],[10,10,20,10,10]] │
│ 2000-01-01 00:01:00 │ [[4,5,6,7,8],[10,10,20,10,10]] │
└─────────────────────┴──────────────────────────────────────────────┘
avg(x)
------
Вычисляет среднее.

View File

@ -36,6 +36,8 @@ SummingMergeTree
[(1, 100)] + [(1, 150), (2, 150)] -> [(1, 250), (2, 150)]
[(1, 100), (2, 150)] + [(1, -100)] -> [(2, 150)]
Для агрегации Map используйте функцию sumMap(key, value).
Для вложенных структур данных не нужно указывать её столбцы в качестве списка столбцов для суммирования.
Этот движок таблиц разработан по просьбе БК, и является мало полезным. Помните, что при хранении лишь предагрегированных данных, вы теряете часть преимуществ системы.