Merge pull request #1250 from bocharov/master

Add new aggregate function sumMap(key, value).
This commit is contained in:
alexey-milovidov 2017-09-19 21:31:24 +03:00 committed by GitHub
commit 76e3f7ac0c
11 changed files with 339 additions and 21 deletions

View File

@ -0,0 +1,27 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionSumMap.h>
#include <AggregateFunctions/Helpers.h>
namespace DB
{
namespace
{
AggregateFunctionPtr createAggregateFunctionSumMap(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
if (argument_types.size() != 2)
throw Exception("Incorrect number of arguments for aggregate function " + name + ", should be 2",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return std::make_shared<AggregateFunctionSumMap>();
}
}
void registerAggregateFunctionSumMap(AggregateFunctionFactory & factory)
{
factory.registerFunction("sumMap", createAggregateFunctionSumMap);
}
}

View File

@ -0,0 +1,192 @@
#pragma once
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeTuple.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnTuple.h>
#include <Core/FieldVisitors.h>
#include <AggregateFunctions/IBinaryAggregateFunction.h>
#include <Functions/FunctionHelpers.h>
#include <map>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
struct AggregateFunctionSumMapData
{
std::map<Field, Field> merged_maps;
};
/** Aggregate function, that takes two arguments: keys and values, and as a result, builds an array of 2 arrays -
* ordered keys and values summed up by corresponding keys.
*
* This function is the most useful when using SummingMergeTree to sum Nested columns, which name ends in "Map".
*
* Example: sumMap(k, v) of:
* k v
* [1,2,3] [10,10,10]
* [3,4,5] [10,10,10]
* [4,5,6] [10,10,10]
* [6,7,8] [10,10,10]
* [7,5,3] [5,15,25]
* [8,9,10] [20,20,20]
* will return:
* ([1,2,3,4,5,6,7,8,9,10],[10,10,45,20,35,20,15,30,20,20])
*/
class AggregateFunctionSumMap final : public IBinaryAggregateFunction<struct AggregateFunctionSumMapData, AggregateFunctionSumMap>
{
private:
DataTypePtr keys_type;
DataTypePtr values_type;
public:
String getName() const override { return "sumMap"; }
DataTypePtr getReturnType() const override
{
DataTypes types;
types.emplace_back(std::make_shared<DataTypeArray>(keys_type));
types.emplace_back(std::make_shared<DataTypeArray>(values_type));
return std::make_shared<DataTypeTuple>(types);
}
void setArgumentsImpl(const DataTypes & arguments)
{
if (2 != arguments.size())
throw Exception("Aggregate function " + getName() + "require exactly two arguments of array type.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
const auto * array_type = checkAndGetDataType<DataTypeArray>(arguments[0].get());
if (!array_type)
throw Exception("First argument for function " + getName() + " must be an array.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
keys_type = array_type->getNestedType();
array_type = checkAndGetDataType<DataTypeArray>(arguments[1].get());
if (!array_type)
throw Exception("Second argument for function " + getName() + " must be an array.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
values_type = array_type->getNestedType();
}
void setParameters(const Array & params) override
{
if (!params.empty())
throw Exception("This instantiation of " + getName() + "aggregate function doesn't accept any parameters.",
ErrorCodes::LOGICAL_ERROR);
}
void addImpl(AggregateDataPtr place, const IColumn & column_keys, const IColumn & column_values, size_t row_num, Arena *) const
{
Field field_keys;
column_keys.get(row_num, field_keys);
const auto & keys = field_keys.get<Array &>();
Field field_values;
column_values.get(row_num, field_values);
const auto & values = field_values.get<Array &>();
auto & merged_maps = this->data(place).merged_maps;
if (keys.size() != values.size())
throw Exception("Sizes of keys and values arrays do not match", ErrorCodes::LOGICAL_ERROR);
size_t size = keys.size();
for (size_t i = 0; i < size; ++i)
{
if (merged_maps.find(keys[i]) != merged_maps.end())
applyVisitor(FieldVisitorSum(values[i]), merged_maps[keys[i]]);
else
merged_maps[keys[i]] = values[i];
}
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
auto & merged_maps = this->data(place).merged_maps;
const auto & rhs_maps = this->data(rhs).merged_maps;
for (const auto &rhs_map : rhs_maps)
{
if (merged_maps.find(rhs_map.first) != merged_maps.end())
applyVisitor(FieldVisitorSum(rhs_map.second), merged_maps[rhs_map.first]);
else
merged_maps[rhs_map.first] = rhs_map.second;
}
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
const auto & merged_maps = this->data(place).merged_maps;
size_t size = merged_maps.size();
writeVarUInt(size, buf);
for (const auto &v : merged_maps)
{
keys_type->serializeBinary(v.first, buf);
values_type->serializeBinary(v.second, buf);
}
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
auto & merged_maps = this->data(place).merged_maps;
size_t size = 0;
readVarUInt(size, buf);
for (size_t i = 0; i < size; ++i)
{
Field key, value;
keys_type->deserializeBinary(key, buf);
values_type->deserializeBinary(value, buf);
merged_maps[key] = value;
}
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
auto & to_cols = static_cast<ColumnTuple &>(to).getColumns();
auto & to_keys_arr = static_cast<ColumnArray &>(*to_cols[0]);
auto & to_values_arr = static_cast<ColumnArray &>(*to_cols[1]);
auto & to_keys_col = to_keys_arr.getData();
auto & to_keys_offsets = to_keys_arr.getOffsets();
auto & to_values_col = to_values_arr.getData();
auto & to_values_offsets = to_values_arr.getOffsets();
const auto & merged_maps = this->data(place).merged_maps;
size_t size = merged_maps.size();
to_keys_col.reserve(size);
to_values_col.reserve(size);
for (const auto &v : merged_maps)
{
to_keys_col.insert(v.first);
to_values_col.insert(v.second);
}
to_keys_offsets.push_back((to_keys_offsets.empty() ? 0 : to_keys_offsets.back()) + size);
to_values_offsets.push_back((to_values_offsets.empty() ? 0 : to_values_offsets.back()) + size);
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
}

View File

@ -20,6 +20,7 @@ void registerAggregateFunctionsSequenceMatch(AggregateFunctionFactory & factory)
void registerAggregateFunctionsMinMaxAny(AggregateFunctionFactory & factory); void registerAggregateFunctionsMinMaxAny(AggregateFunctionFactory & factory);
void registerAggregateFunctionsStatistics(AggregateFunctionFactory & factory); void registerAggregateFunctionsStatistics(AggregateFunctionFactory & factory);
void registerAggregateFunctionSum(AggregateFunctionFactory & factory); void registerAggregateFunctionSum(AggregateFunctionFactory & factory);
void registerAggregateFunctionSumMap(AggregateFunctionFactory & factory);
void registerAggregateFunctionsUniq(AggregateFunctionFactory & factory); void registerAggregateFunctionsUniq(AggregateFunctionFactory & factory);
void registerAggregateFunctionUniqUpTo(AggregateFunctionFactory & factory); void registerAggregateFunctionUniqUpTo(AggregateFunctionFactory & factory);
void registerAggregateFunctionTopK(AggregateFunctionFactory & factory); void registerAggregateFunctionTopK(AggregateFunctionFactory & factory);
@ -45,6 +46,7 @@ void registerAggregateFunctions()
registerAggregateFunctionsMinMaxAny(factory); registerAggregateFunctionsMinMaxAny(factory);
registerAggregateFunctionsStatistics(factory); registerAggregateFunctionsStatistics(factory);
registerAggregateFunctionSum(factory); registerAggregateFunctionSum(factory);
registerAggregateFunctionSumMap(factory);
registerAggregateFunctionsUniq(factory); registerAggregateFunctionsUniq(factory);
registerAggregateFunctionUniqUpTo(factory); registerAggregateFunctionUniqUpTo(factory);
registerAggregateFunctionTopK(factory); registerAggregateFunctionTopK(factory);

View File

@ -14,6 +14,7 @@ namespace DB
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int CANNOT_CONVERT_TYPE; extern const int CANNOT_CONVERT_TYPE;
extern const int LOGICAL_ERROR;
} }
@ -299,4 +300,23 @@ public:
bool operator() (const Tuple & l, const Tuple & r) const { return l < r; } bool operator() (const Tuple & l, const Tuple & r) const { return l < r; }
}; };
/** Implements `+=` operation.
* Returns false if the result is zero.
*/
class FieldVisitorSum : public StaticVisitor<bool>
{
private:
const Field & rhs;
public:
explicit FieldVisitorSum(const Field & rhs_) : rhs(rhs_) {}
bool operator() (UInt64 & x) const { x += get<UInt64>(rhs); return x != 0; }
bool operator() (Int64 & x) const { x += get<Int64>(rhs); return x != 0; }
bool operator() (Float64 & x) const { x += get<Float64>(rhs); return x != 0; }
bool operator() (Null & x) const { throw Exception("Cannot sum Nulls", ErrorCodes::LOGICAL_ERROR); }
bool operator() (String & x) const { throw Exception("Cannot sum Strings", ErrorCodes::LOGICAL_ERROR); }
bool operator() (Array & x) const { throw Exception("Cannot sum Arrays", ErrorCodes::LOGICAL_ERROR); }
};
} }

View File

@ -248,27 +248,6 @@ void SummingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std:
finished = true; finished = true;
} }
/** Implements `+=` operation.
* Returns false if the result is zero.
*/
class FieldVisitorSum : public StaticVisitor<bool>
{
private:
const Field & rhs;
public:
explicit FieldVisitorSum(const Field & rhs_) : rhs(rhs_) {}
bool operator() (UInt64 & x) const { x += get<UInt64>(rhs); return x != 0; }
bool operator() (Int64 & x) const { x += get<Int64>(rhs); return x != 0; }
bool operator() (Float64 & x) const { x += get<Float64>(rhs); return x != 0; }
bool operator() (Null & x) const { throw Exception("Cannot sum Nulls", ErrorCodes::LOGICAL_ERROR); }
bool operator() (String & x) const { throw Exception("Cannot sum Strings", ErrorCodes::LOGICAL_ERROR); }
bool operator() (Array & x) const { throw Exception("Cannot sum Arrays", ErrorCodes::LOGICAL_ERROR); }
};
template <typename TSortCursor> template <typename TSortCursor>
bool SummingSortedBlockInputStream::mergeMaps(Row & row, TSortCursor & cursor) bool SummingSortedBlockInputStream::mergeMaps(Row & row, TSortCursor & cursor)
{ {

View File

@ -0,0 +1,10 @@
2000-01-01 2000-01-01 00:00:00 [1,2,3] [10,10,10]
2000-01-01 2000-01-01 00:00:00 [3,4,5] [10,10,10]
2000-01-01 2000-01-01 00:01:00 [4,5,6] [10,10,10]
2000-01-01 2000-01-01 00:01:00 [6,7,8] [10,10,10]
([1,2,3,4,5,6,7,8],[10,10,20,20,20,20,10,10])
([1,2,3,4,5,6,7,8],[10,10,20,20,20,20,10,10])
2000-01-01 00:00:00 ([1,2,3,4,5],[10,10,20,10,10])
2000-01-01 00:01:00 ([4,5,6,7,8],[10,10,20,10,10])
2000-01-01 00:00:00 [1,2,3,4,5] [10,10,20,10,10]
2000-01-01 00:01:00 [4,5,6,7,8] [10,10,20,10,10]

View File

@ -0,0 +1,12 @@
DROP TABLE IF EXISTS test.sum_map;
CREATE TABLE test.sum_map(date Date, timeslot DateTime, statusMap Nested(status UInt16, requests UInt64)) ENGINE = Log;
INSERT INTO test.sum_map VALUES ('2000-01-01', '2000-01-01 00:00:00', [1, 2, 3], [10, 10, 10]), ('2000-01-01', '2000-01-01 00:00:00', [3, 4, 5], [10, 10, 10]), ('2000-01-01', '2000-01-01 00:01:00', [4, 5, 6], [10, 10, 10]), ('2000-01-01', '2000-01-01 00:01:00', [6, 7, 8], [10, 10, 10]);
SELECT * FROM test.sum_map;
SELECT sumMap(statusMap.status, statusMap.requests) FROM test.sum_map;
SELECT sumMapMerge(s) FROM (SELECT sumMapState(statusMap.status, statusMap.requests) AS s FROM test.sum_map);
SELECT timeslot, sumMap(statusMap.status, statusMap.requests) FROM test.sum_map GROUP BY timeslot;
SELECT timeslot, sumMap(statusMap.status, statusMap.requests).1, sumMap(statusMap.status, statusMap.requests).2 FROM test.sum_map GROUP BY timeslot;
DROP TABLE test.sum_map;

View File

@ -44,6 +44,42 @@ sum(x)
Calculates the sum. Calculates the sum.
Only works for numbers. Only works for numbers.
sumMap(key, value)
------
Performs summation of array 'value' by corresponding keys of array 'key'.
Number of elements in 'key' and 'value' arrays should be the same for each row, on which summation is being performed.
Returns a tuple of two arrays - sorted keys and values, summed up by corresponding keys.
Example:
.. code-block:: sql
CREATE TABLE sum_map(
date Date,
timeslot DateTime,
statusMap Nested(
status UInt16,
requests UInt64
)
) ENGINE = Log;
INSERT INTO sum_map VALUES
('2000-01-01', '2000-01-01 00:00:00', [1, 2, 3], [10, 10, 10]),
('2000-01-01', '2000-01-01 00:00:00', [3, 4, 5], [10, 10, 10]),
('2000-01-01', '2000-01-01 00:01:00', [4, 5, 6], [10, 10, 10]),
('2000-01-01', '2000-01-01 00:01:00', [6, 7, 8], [10, 10, 10]);
SELECT
timeslot,
sumMap(statusMap.status, statusMap.requests)
FROM sum_map
GROUP BY timeslot
.. code-block:: text
┌────────────timeslot─┬─sumMap(statusMap.status, statusMap.requests)─┐
│ 2000-01-01 00:00:00 │ ([1,2,3,4,5],[10,10,20,10,10]) │
│ 2000-01-01 00:01:00 │ ([4,5,6,7,8],[10,10,20,10,10]) │
└─────────────────────┴──────────────────────────────────────────────┘
avg(x) avg(x)
------ ------
Calculates the average. Calculates the average.

View File

@ -37,6 +37,8 @@ Examples:
[(1, 100)] + [(1, 150), (2, 150)] -> [(1, 250), (2, 150)] [(1, 100)] + [(1, 150), (2, 150)] -> [(1, 250), (2, 150)]
[(1, 100), (2, 150)] + [(1, -100)] -> [(2, 150)] [(1, 100), (2, 150)] + [(1, -100)] -> [(2, 150)]
For aggregating Map use function sumMap(key, value).
For nested data structures, you don't need to specify the columns as a list of columns for totaling. For nested data structures, you don't need to specify the columns as a list of columns for totaling.
This table engine is not particularly useful. Remember that when saving just pre-aggregated data, you lose some of the system's advantages. This table engine is not particularly useful. Remember that when saving just pre-aggregated data, you lose some of the system's advantages.

View File

@ -44,6 +44,42 @@ sum(x)
Вычисляет сумму. Вычисляет сумму.
Работает только для чисел. Работает только для чисел.
sumMap(key, value)
------
Производит суммирование массива 'value' по соотвествующим ключам заданным в массиве 'key'.
Количество элементов в 'key' и 'value' должно быть одинаковым для каждой строки, для которой происходит суммирование.
Возвращает кортеж из двух массивов - ключи в отсортированном порядке и значения, просуммированные по соотвествующим ключам.
Пример:
.. code-block:: sql
CREATE TABLE sum_map(
date Date,
timeslot DateTime,
statusMap Nested(
status UInt16,
requests UInt64
)
) ENGINE = Log;
INSERT INTO sum_map VALUES
('2000-01-01', '2000-01-01 00:00:00', [1, 2, 3], [10, 10, 10]),
('2000-01-01', '2000-01-01 00:00:00', [3, 4, 5], [10, 10, 10]),
('2000-01-01', '2000-01-01 00:01:00', [4, 5, 6], [10, 10, 10]),
('2000-01-01', '2000-01-01 00:01:00', [6, 7, 8], [10, 10, 10]);
SELECT
timeslot,
sumMap(statusMap.status, statusMap.requests)
FROM sum_map
GROUP BY timeslot
.. code-block:: text
┌────────────timeslot─┬─sumMap(statusMap.status, statusMap.requests)─┐
│ 2000-01-01 00:00:00 │ ([1,2,3,4,5],[10,10,20,10,10]) │
│ 2000-01-01 00:01:00 │ ([4,5,6,7,8],[10,10,20,10,10]) │
└─────────────────────┴──────────────────────────────────────────────┘
avg(x) avg(x)
------ ------
Вычисляет среднее. Вычисляет среднее.

View File

@ -36,6 +36,8 @@ SummingMergeTree
[(1, 100)] + [(1, 150), (2, 150)] -> [(1, 250), (2, 150)] [(1, 100)] + [(1, 150), (2, 150)] -> [(1, 250), (2, 150)]
[(1, 100), (2, 150)] + [(1, -100)] -> [(2, 150)] [(1, 100), (2, 150)] + [(1, -100)] -> [(2, 150)]
Для агрегации Map используйте функцию sumMap(key, value).
Для вложенных структур данных не нужно указывать её столбцы в качестве списка столбцов для суммирования. Для вложенных структур данных не нужно указывать её столбцы в качестве списка столбцов для суммирования.
Этот движок таблиц разработан по просьбе БК, и является мало полезным. Помните, что при хранении лишь предагрегированных данных, вы теряете часть преимуществ системы. Этот движок таблиц разработан по просьбе БК, и является мало полезным. Помните, что при хранении лишь предагрегированных данных, вы теряете часть преимуществ системы.