From 591cd5c00971ec77ba4c0ecc39862dfc0a96cf24 Mon Sep 17 00:00:00 2001
From: Alex Bocharov
Date: Fri, 15 Sep 2017 12:14:19 +0100
Subject: [PATCH] Add new aggregate function sumMap(key, value).

---
 .../AggregateFunctionSumMap.cpp | 27 +++
 .../AggregateFunctionSumMap.h | 191 ++++++++++++++++++
 .../registerAggregateFunctions.cpp | 2 +
 dbms/src/Core/Field.h | 21 ++
 dbms/src/Interpreters/SpecializedAggregator.h | 1 +
 .../0_stateless/00502_sum_map.reference | 10 +
 .../queries/0_stateless/00502_sum_map.sql | 12 ++
 docs/en/agg_functions/index.rst | 36 ++++
 docs/en/table_engines/summingmergetree.rst | 2 +
 docs/ru/agg_functions/index.rst | 36 ++++
 docs/ru/table_engines/summingmergetree.rst | 2 +
 11 files changed, 340 insertions(+)
 create mode 100644 dbms/src/AggregateFunctions/AggregateFunctionSumMap.cpp
 create mode 100644 dbms/src/AggregateFunctions/AggregateFunctionSumMap.h
 create mode 100644 dbms/tests/queries/0_stateless/00502_sum_map.reference
 create mode 100644 dbms/tests/queries/0_stateless/00502_sum_map.sql

diff --git a/dbms/src/AggregateFunctions/AggregateFunctionSumMap.cpp b/dbms/src/AggregateFunctions/AggregateFunctionSumMap.cpp
new file mode 100644
index 00000000000..c2906e69ca1
--- /dev/null
+++ b/dbms/src/AggregateFunctions/AggregateFunctionSumMap.cpp
@@ -0,0 +1,27 @@
+#include
+#include
+#include
+
+namespace DB
+{
+
+namespace
+{
+
+AggregateFunctionPtr createAggregateFunctionSumMap(const std::string & name, const DataTypes & argument_types, const Array & parameters)
+{
+    if (argument_types.size() != 2)
+        throw Exception("Incorrect number of arguments for aggregate function " + name + ", should be 2",
+            ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
+
+    return std::make_shared<AggregateFunctionSumMap>();
+}
+
+}
+
+void registerAggregateFunctionSumMap(AggregateFunctionFactory & factory)
+{
+    factory.registerFunction("sumMap", createAggregateFunctionSumMap);
+}
+
+}
diff --git a/dbms/src/AggregateFunctions/AggregateFunctionSumMap.h b/dbms/src/AggregateFunctions/AggregateFunctionSumMap.h
new file mode 100644
index 00000000000..ee1303ad4f8
--- /dev/null
+++ b/dbms/src/AggregateFunctions/AggregateFunctionSumMap.h
@@ -0,0 +1,191 @@
+#pragma once
+
+#include
+#include
+
+#include
+#include
+
+#include
+#include
+
+#include
+#include
+#include
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+    extern const int ILLEGAL_TYPE_OF_ARGUMENT;
+    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
+}
+
+struct AggregateFunctionSumMapData
+{
+    std::map<Field, Field> merged_maps;
+};
+
+/** Aggregate function that takes two arguments, keys and values, and builds an array of 2 arrays -
+  * the ordered keys and the values summed up by the corresponding keys.
+  *
+  * This function is most useful together with SummingMergeTree, to sum Nested columns whose names end in "Map".
+  *
+  * Example: sumMap(k, v) of:
+  *  k           v
+  *  [1,2,3]     [10,10,10]
+  *  [3,4,5]     [10,10,10]
+  *  [4,5,6]     [10,10,10]
+  *  [6,7,8]     [10,10,10]
+  *  [7,5,3]     [5,15,25]
+  *  [8,9,10]    [20,20,20]
+  * will return:
+  *  [[1,2,3,4,5,6,7,8,9,10],[10,10,45,20,35,20,15,30,20,20]]
+  */
+class AggregateFunctionSumMap final : public IBinaryAggregateFunction<AggregateFunctionSumMapData, AggregateFunctionSumMap>
+{
+private:
+    DataTypePtr type;
+
+public:
+    String getName() const override { return "sumMap"; }
+
+    DataTypePtr getReturnType() const override
+    {
+        return std::make_shared<DataTypeArray>(type);
+    }
+
+    void setArgumentsImpl(const DataTypes & arguments)
+    {
+        if (2 != arguments.size())
+            throw Exception("Aggregate function " + getName() + " requires exactly two arguments of array type.",
+                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
+
+        const auto * array_type = checkAndGetDataType<DataTypeArray>(arguments[0].get());
+        if (!array_type)
+            throw Exception("First argument for function " + getName() + " must be an array.",
+                ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
+
+        array_type = checkAndGetDataType<DataTypeArray>(arguments[1].get());
+        if (!array_type)
+            throw Exception("Second argument for function " + getName() + " must be an array.",
+                ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
+
+        type = arguments.front();
+    }
+
+    void setParameters(const Array & params) override
+    {
+        if (!params.empty())
+            throw Exception("This instantiation of " + getName() + " aggregate function doesn't accept any parameters.",
+                ErrorCodes::LOGICAL_ERROR);
+    }
+
+    void addImpl(AggregateDataPtr place, const IColumn & column_keys, const IColumn & column_values, size_t row_num, Arena *) const
+    {
+        Field field_keys;
+        column_keys.get(row_num, field_keys);
+        const auto & keys = field_keys.get<Array &>();
+
+        Field field_values;
+        column_values.get(row_num, field_values);
+        const auto & values = field_values.get<Array &>();
+
+        auto & merged_maps = this->data(place).merged_maps;
+
+        if (keys.size() != values.size())
+            throw Exception("Sizes of keys and values arrays do not match", ErrorCodes::LOGICAL_ERROR);
+
+        size_t size = keys.size();
+
+        for (size_t i = 0; i < size; ++i)
+        {
+            auto found = merged_maps.find(keys[i]);
+            if (found != merged_maps.end())
+                found->second += values[i];
+            else
+                merged_maps.emplace(keys[i], values[i]);
+        }
+    }
+
+    void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
+    {
+        auto & merged_maps = this->data(place).merged_maps;
+        const auto & rhs_maps = this->data(rhs).merged_maps;
+
+        for (const auto & rhs_map : rhs_maps)
+        {
+            auto found = merged_maps.find(rhs_map.first);
+            if (found != merged_maps.end())
+                found->second += rhs_map.second;
+            else
+                merged_maps.emplace(rhs_map.first, rhs_map.second);
+        }
+    }
+
+    void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
+    {
+        /// Serialize merged_maps as two vectors. FIXME: this copies the raw bytes of Field objects, which is unsafe for Fields that own memory (e.g. String), and takes &keys[0] of an empty Array when size == 0; deserialize() has the same problem.
+        const auto & merged_maps = this->data(place).merged_maps;
+        size_t size = merged_maps.size();
+
+        Array keys, values;
+        keys.reserve(size);
+        values.reserve(size);
+        for (const auto & v : merged_maps)
+        {
+            keys.push_back(v.first);
+            values.push_back(v.second);
+        }
+
+        writeVarUInt(size, buf);
+        buf.write(reinterpret_cast<const char *>(&keys[0]), size * sizeof(keys[0]));
+        buf.write(reinterpret_cast<const char *>(&values[0]), size * sizeof(values[0]));
+    }
+
+    void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
+    {
+        auto & merged_maps = this->data(place).merged_maps;
+
+        size_t size = 0;
+        readVarUInt(size, buf);
+
+        Array keys, values;
+        keys.resize(size);
+        values.resize(size);
+        buf.read(reinterpret_cast<char *>(&keys[0]), size * sizeof(keys[0]));
+        buf.read(reinterpret_cast<char *>(&values[0]), size * sizeof(values[0]));
+
+        for (size_t i = 0; i < size; ++i)
+        {
+            merged_maps[keys[i]] = values[i];
+        }
+    }
+
+    void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
+    {
+        auto & to_array = static_cast<ColumnArray &>(to);
+        auto & to_data = to_array.getData();
+        auto & to_offsets = to_array.getOffsets();
+
+        const auto & merged_maps = this->data(place).merged_maps;
+        size_t size = merged_maps.size();
+
+        Array keys, values;
+        keys.reserve(size);
+        values.reserve(size);
+        for (const auto & v : merged_maps)
+        {
+            keys.push_back(v.first);
+            values.push_back(v.second);
+        }
+
+        to_data.insert(keys);
+        to_data.insert(values);
+        to_offsets.push_back((to_offsets.empty() ? 0 : to_offsets.back()) + 2);
+    }
+};
+
+}
diff --git a/dbms/src/AggregateFunctions/registerAggregateFunctions.cpp b/dbms/src/AggregateFunctions/registerAggregateFunctions.cpp
index e37ddeade76..3397e2bd28d 100644
--- a/dbms/src/AggregateFunctions/registerAggregateFunctions.cpp
+++ b/dbms/src/AggregateFunctions/registerAggregateFunctions.cpp
@@ -20,6 +20,7 @@ void registerAggregateFunctionsSequenceMatch(AggregateFunctionFactory & factory)
 void registerAggregateFunctionsMinMaxAny(AggregateFunctionFactory & factory);
 void registerAggregateFunctionsStatistics(AggregateFunctionFactory & factory);
 void registerAggregateFunctionSum(AggregateFunctionFactory & factory);
+void registerAggregateFunctionSumMap(AggregateFunctionFactory & factory);
 void registerAggregateFunctionsUniq(AggregateFunctionFactory & factory);
 void registerAggregateFunctionUniqUpTo(AggregateFunctionFactory & factory);
 void registerAggregateFunctionTopK(AggregateFunctionFactory & factory);
@@ -46,6 +47,7 @@ void registerAggregateFunctions()
     registerAggregateFunctionsMinMaxAny(factory);
     registerAggregateFunctionsStatistics(factory);
     registerAggregateFunctionSum(factory);
+    registerAggregateFunctionSumMap(factory);
     registerAggregateFunctionsUniq(factory);
     registerAggregateFunctionUniqUpTo(factory);
     registerAggregateFunctionTopK(factory);
diff --git a/dbms/src/Core/Field.h b/dbms/src/Core/Field.h
index affa2dcec96..10b9120fb50 100644
--- a/dbms/src/Core/Field.h
+++ b/dbms/src/Core/Field.h
@@ -152,6 +152,27 @@ public:
         return *this;
     }
 
+    Field & operator+= (const Field & rhs)
+    {
+        if (which != rhs.which)
+            throw Exception("Adding different types is not allowed.", ErrorCodes::BAD_TYPE_OF_FIELD);
+        else
+        {
+            switch (which)
+            {
+                case Types::UInt64: assignConcrete(get<UInt64>() + rhs.get<UInt64>()); break;
+                case Types::Int64: assignConcrete(get<Int64>() + rhs.get<Int64>()); break;
+                case Types::Float64: assignConcrete(get<Float64>() + rhs.get<Float64>()); break;
+                case Types::String: assignConcrete(get<String>() + rhs.get<String>()); break;
+
+                default:
+                    throw Exception("Bad type of Field to add", ErrorCodes::BAD_TYPE_OF_FIELD);
+            }
+        }
+
+        return *this;
+    }
+
     Field & operator= (Field && rhs)
     {
         if (this != &rhs)
diff --git a/dbms/src/Interpreters/SpecializedAggregator.h b/dbms/src/Interpreters/SpecializedAggregator.h
index 56a1d803a90..2bde58adb57 100644
--- a/dbms/src/Interpreters/SpecializedAggregator.h
+++ b/dbms/src/Interpreters/SpecializedAggregator.h
@@ -23,6 +23,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
diff --git a/dbms/tests/queries/0_stateless/00502_sum_map.reference b/dbms/tests/queries/0_stateless/00502_sum_map.reference
new file mode 100644
index 00000000000..d79161d8bfa
--- /dev/null
+++ b/dbms/tests/queries/0_stateless/00502_sum_map.reference
@@ -0,0 +1,10 @@
+2000-01-01 2000-01-01 00:00:00 [1,2,3] [10,10,10]
+2000-01-01 2000-01-01 00:00:00 [3,4,5] [10,10,10]
+2000-01-01 2000-01-01 00:01:00 [4,5,6] [10,10,10]
+2000-01-01 2000-01-01 00:01:00 [6,7,8] [10,10,10]
+[[1,2,3,4,5,6,7,8],[10,10,20,20,20,20,10,10]]
+[[1,2,3,4,5,6,7,8],[10,10,20,20,20,20,10,10]]
+2000-01-01 00:00:00 [[1,2,3,4,5],[10,10,20,10,10]]
+2000-01-01 00:01:00 [[4,5,6,7,8],[10,10,20,10,10]]
+2000-01-01 00:00:00 [1,2,3,4,5] [10,10,20,10,10]
+2000-01-01 00:01:00 [4,5,6,7,8] [10,10,20,10,10]
diff --git a/dbms/tests/queries/0_stateless/00502_sum_map.sql b/dbms/tests/queries/0_stateless/00502_sum_map.sql
new file mode 100644
index 00000000000..6a22ce8e1b1
--- /dev/null
+++ b/dbms/tests/queries/0_stateless/00502_sum_map.sql
@@ -0,0 +1,12 @@
+DROP TABLE IF EXISTS test.sum_map;
+CREATE TABLE test.sum_map(date Date, timeslot DateTime, statusMap Nested(status UInt16, requests UInt64)) ENGINE = Log;
+
+INSERT INTO test.sum_map VALUES ('2000-01-01', '2000-01-01 00:00:00', [1, 2, 3], [10, 10, 10]), ('2000-01-01', '2000-01-01 00:00:00', [3, 4, 5], [10, 10, 10]), ('2000-01-01', '2000-01-01 00:01:00', [4, 5, 6], [10, 10, 10]), ('2000-01-01', '2000-01-01 00:01:00', [6, 7, 8], [10, 10, 10]);
+
+SELECT * FROM test.sum_map;
+SELECT
sumMap(statusMap.status, statusMap.requests) FROM test.sum_map;
+SELECT sumMapMerge(s) FROM (SELECT sumMapState(statusMap.status, statusMap.requests) AS s FROM test.sum_map);
+SELECT timeslot, sumMap(statusMap.status, statusMap.requests) FROM test.sum_map GROUP BY timeslot;
+SELECT timeslot, sumMap(statusMap.status, statusMap.requests)[1], sumMap(statusMap.status, statusMap.requests)[2] FROM test.sum_map GROUP BY timeslot;
+
+DROP TABLE test.sum_map;
diff --git a/docs/en/agg_functions/index.rst b/docs/en/agg_functions/index.rst
index 8db5af9ab24..3ece9a89d28 100644
--- a/docs/en/agg_functions/index.rst
+++ b/docs/en/agg_functions/index.rst
@@ -44,6 +44,42 @@ sum(x)
 Calculates the sum.
 Only works for numbers.
 
+sumMap(key, value)
+------------------
+Sums the 'value' array according to the corresponding keys in the 'key' array.
+The number of elements in 'key' and 'value' must be the same for each row that is summed.
+Returns an array of two arrays: the keys in sorted order, and the values summed up for the corresponding keys.
+
+Example:
+
+.. code-block:: sql
+
+    CREATE TABLE sum_map(
+        date Date,
+        timeslot DateTime,
+        statusMap Nested(
+            status UInt16,
+            requests UInt64
+        )
+    ) ENGINE = Log;
+    INSERT INTO sum_map VALUES
+        ('2000-01-01', '2000-01-01 00:00:00', [1, 2, 3], [10, 10, 10]),
+        ('2000-01-01', '2000-01-01 00:00:00', [3, 4, 5], [10, 10, 10]),
+        ('2000-01-01', '2000-01-01 00:01:00', [4, 5, 6], [10, 10, 10]),
+        ('2000-01-01', '2000-01-01 00:01:00', [6, 7, 8], [10, 10, 10]);
+    SELECT
+        timeslot,
+        sumMap(statusMap.status, statusMap.requests)
+    FROM sum_map
+    GROUP BY timeslot
+
+.. code-block:: text
+
+    ┌────────────timeslot─┬─sumMap(statusMap.status, statusMap.requests)─┐
+    │ 2000-01-01 00:00:00 │ [[1,2,3,4,5],[10,10,20,10,10]]               │
+    │ 2000-01-01 00:01:00 │ [[4,5,6,7,8],[10,10,20,10,10]]               │
+    └─────────────────────┴──────────────────────────────────────────────┘
+
 avg(x)
 ------
 Calculates the average.
diff --git a/docs/en/table_engines/summingmergetree.rst b/docs/en/table_engines/summingmergetree.rst
index 16fb9406871..26ed69ae5a3 100644
--- a/docs/en/table_engines/summingmergetree.rst
+++ b/docs/en/table_engines/summingmergetree.rst
@@ -37,6 +37,8 @@ Examples:
 [(1, 100)] + [(1, 150), (2, 150)] -> [(1, 250), (2, 150)]
 [(1, 100), (2, 150)] + [(1, -100)] -> [(2, 150)]
 
+To aggregate a Map, use the sumMap(key, value) function.
+
 For nested data structures, you don't need to specify the columns as a list of columns for totaling.
 
 This table engine is not particularly useful. Remember that when saving just pre-aggregated data, you lose some of the system's advantages.
diff --git a/docs/ru/agg_functions/index.rst b/docs/ru/agg_functions/index.rst
index 50570bde1b1..5b738a67a16 100644
--- a/docs/ru/agg_functions/index.rst
+++ b/docs/ru/agg_functions/index.rst
@@ -44,6 +44,42 @@ sum(x)
 Вычисляет сумму.
 Работает только для чисел.
 
+sumMap(key, value)
+------------------
+Производит суммирование массива 'value' по соответствующим ключам, заданным в массиве 'key'.
+Количество элементов в 'key' и 'value' должно быть одинаковым для каждой строки, для которой происходит суммирование.
+Возвращает массив из двух массивов - ключи в отсортированном порядке и значения, просуммированные по соответствующим ключам.
+
+Пример:
+
+.. code-block:: sql
+
+    CREATE TABLE sum_map(
+        date Date,
+        timeslot DateTime,
+        statusMap Nested(
+            status UInt16,
+            requests UInt64
+        )
+    ) ENGINE = Log;
+    INSERT INTO sum_map VALUES
+        ('2000-01-01', '2000-01-01 00:00:00', [1, 2, 3], [10, 10, 10]),
+        ('2000-01-01', '2000-01-01 00:00:00', [3, 4, 5], [10, 10, 10]),
+        ('2000-01-01', '2000-01-01 00:01:00', [4, 5, 6], [10, 10, 10]),
+        ('2000-01-01', '2000-01-01 00:01:00', [6, 7, 8], [10, 10, 10]);
+    SELECT
+        timeslot,
+        sumMap(statusMap.status, statusMap.requests)
+    FROM sum_map
+    GROUP BY timeslot
+
+.. code-block:: text
+
+    ┌────────────timeslot─┬─sumMap(statusMap.status, statusMap.requests)─┐
+    │ 2000-01-01 00:00:00 │ [[1,2,3,4,5],[10,10,20,10,10]]               │
+    │ 2000-01-01 00:01:00 │ [[4,5,6,7,8],[10,10,20,10,10]]               │
+    └─────────────────────┴──────────────────────────────────────────────┘
+
 avg(x)
 ------
 Вычисляет среднее.
diff --git a/docs/ru/table_engines/summingmergetree.rst b/docs/ru/table_engines/summingmergetree.rst
index edfc3dfd88e..d70125f884e 100644
--- a/docs/ru/table_engines/summingmergetree.rst
+++ b/docs/ru/table_engines/summingmergetree.rst
@@ -36,6 +36,8 @@ SummingMergeTree
 [(1, 100)] + [(1, 150), (2, 150)] -> [(1, 250), (2, 150)]
 [(1, 100), (2, 150)] + [(1, -100)] -> [(2, 150)]
 
+Для агрегации Map используйте функцию sumMap(key, value).
+
 Для вложенных структур данных не нужно указывать её столбцы в качестве списка столбцов для суммирования.
 
 Этот движок таблиц разработан по просьбе БК, и является мало полезным. Помните, что при хранении лишь предагрегированных данных, вы теряете часть преимуществ системы.