diff --git a/src/AggregateFunctions/AggregateFunctionWindowFunnel.cpp b/src/AggregateFunctions/AggregateFunctionWindowFunnel.cpp index 1e9f2782d95..ed732a197a1 100644 --- a/src/AggregateFunctions/AggregateFunctionWindowFunnel.cpp +++ b/src/AggregateFunctions/AggregateFunctionWindowFunnel.cpp @@ -6,7 +6,6 @@ #include #include -#include "registerAggregateFunctions.h" namespace DB diff --git a/src/AggregateFunctions/AggregateFunctionWindowFunnel.h b/src/AggregateFunctions/AggregateFunctionWindowFunnel.h index c765024507e..e4a275555bf 100644 --- a/src/AggregateFunctions/AggregateFunctionWindowFunnel.h +++ b/src/AggregateFunctions/AggregateFunctionWindowFunnel.h @@ -29,6 +29,7 @@ struct ComparePair final }; static constexpr auto max_events = 32; + template struct AggregateFunctionWindowFunnelData { @@ -46,7 +47,7 @@ struct AggregateFunctionWindowFunnelData void add(T timestamp, UInt8 event) { - // Since most events should have already been sorted by timestamp. + /// Since most events should have already been sorted by timestamp. if (sorted && events_list.size() > 0) { if (events_list.back().first == timestamp) @@ -145,14 +146,20 @@ class AggregateFunctionWindowFunnel final private: UInt64 window; UInt8 events_size; - UInt8 strict; // When the 'strict' is set, it applies conditions only for the not repeating values. - UInt8 strict_order; // When the 'strict_order' is set, it doesn't allow interventions of other events. - // In the case of 'A->B->D->C', it stops finding 'A->B->C' at the 'D' and the max event level is 2. + /// When the 'strict' is set, it applies conditions only for the not repeating values. + bool strict; - // Loop through the entire events_list, update the event timestamp value - // The level path must be 1---2---3---...---check_events_size, find the max event level that satisfied the path in the sliding window. - // If found, returns the max event level, else return 0. - // The Algorithm complexity is O(n). + /// When the 'strict_order' is set, it doesn't allow interventions of other events. + /// In the case of 'A->B->D->C', it stops finding 'A->B->C' at the 'D' and the max event level is 2. + bool strict_order; + + /// Applies conditions only to events with strictly increasing timestamps + bool strict_increase; + + /// Loop through the entire events_list, update the event timestamp value + /// The level path must be 1---2---3---...---check_events_size, find the max event level that satisfied the path in the sliding window. + /// If found, returns the max event level, else return 0. + /// The Algorithm complexity is O(n). UInt8 getEventLevel(Data & data) const { if (data.size() == 0) @@ -162,16 +169,13 @@ private: data.sort(); - /// events_timestamp stores the timestamp that latest i-th level event happen within time window after previous level event. - /// timestamp defaults to -1, which unsigned timestamp value never meet - /// there may be some bugs when UInt64 type timstamp overflows Int64, but it works on most cases. - std::vector events_timestamp(events_size, -1); + /// events_timestamp stores the timestamp of the first and previous i-th level event happen within time window + std::vector>> events_timestamp(events_size); bool first_event = false; for (const auto & pair : data.events_list) { const T & timestamp = pair.first; const auto & event_idx = pair.second - 1; - if (strict_order && event_idx == -1) { if (first_event) @@ -181,31 +185,39 @@ private: } else if (event_idx == 0) { - events_timestamp[0] = timestamp; + events_timestamp[0] = std::make_pair(timestamp, timestamp); first_event = true; } - else if (strict && events_timestamp[event_idx] >= 0) + else if (strict && events_timestamp[event_idx].has_value()) { return event_idx + 1; } - else if (strict_order && first_event && events_timestamp[event_idx - 1] == -1) + else if (strict_order && first_event && !events_timestamp[event_idx - 1].has_value()) { for (size_t event = 0; event < events_timestamp.size(); ++event) { - if (events_timestamp[event] == -1) + if (!events_timestamp[event].has_value()) return event; } } - else if (events_timestamp[event_idx - 1] >= 0 && timestamp <= events_timestamp[event_idx - 1] + window) + else if (events_timestamp[event_idx - 1].has_value()) { - events_timestamp[event_idx] = events_timestamp[event_idx - 1]; - if (event_idx + 1 == events_size) - return events_size; + auto first_timestamp = events_timestamp[event_idx - 1]->first; + bool time_matched = timestamp <= first_timestamp + window; + if (strict_increase) + time_matched = time_matched && events_timestamp[event_idx - 1]->second < timestamp; + if (time_matched) + { + events_timestamp[event_idx] = std::make_pair(first_timestamp, timestamp); + if (event_idx + 1 == events_size) + return events_size; + } } } + for (size_t event = events_timestamp.size(); event > 0; --event) { - if (events_timestamp[event - 1] >= 0) + if (events_timestamp[event - 1].has_value()) return event; } return 0; @@ -223,15 +235,18 @@ public: events_size = arguments.size() - 1; window = params.at(0).safeGet(); - strict = 0; - strict_order = 0; + strict = false; + strict_order = false; + strict_increase = false; for (size_t i = 1; i < params.size(); ++i) { String option = params.at(i).safeGet(); - if (option.compare("strict") == 0) - strict = 1; - else if (option.compare("strict_order") == 0) - strict_order = 1; + if (option == "strict") + strict = true; + else if (option == "strict_order") + strict_order = true; + else if (option == "strict_increase") + strict_increase = true; else throw Exception{"Aggregate function " + getName() + " doesn't support a parameter: " + option, ErrorCodes::BAD_ARGUMENTS}; } @@ -253,7 +268,7 @@ public: { bool has_event = false; const auto timestamp = assert_cast *>(columns[0])->getData()[row_num]; - // reverse iteration and stable sorting are needed for events that are qualified by more than one condition. + /// reverse iteration and stable sorting are needed for events that are qualified by more than one condition. for (auto i = events_size; i > 0; --i) { auto event = assert_cast *>(columns[i])->getData()[row_num]; diff --git a/tests/queries/0_stateless/00632_aggregation_window_funnel.reference b/tests/queries/0_stateless/00632_aggregation_window_funnel.reference index 492135567ea..2c68f277bfa 100644 --- a/tests/queries/0_stateless/00632_aggregation_window_funnel.reference +++ b/tests/queries/0_stateless/00632_aggregation_window_funnel.reference @@ -57,3 +57,7 @@ [2, 0] [3, 1] [4, 1] +1 +1 +1 +1 diff --git a/tests/queries/0_stateless/00632_aggregation_window_funnel.sql b/tests/queries/0_stateless/00632_aggregation_window_funnel.sql index 5a1610256ac..d9991be5583 100644 --- a/tests/queries/0_stateless/00632_aggregation_window_funnel.sql +++ b/tests/queries/0_stateless/00632_aggregation_window_funnel.sql @@ -79,3 +79,11 @@ select u, windowFunnel(86400)(dt, a is null and b is null) as s from funnel_test select u, windowFunnel(86400)(dt, a is null, b = 'b3') as s from funnel_test_non_null group by u order by u format JSONCompactEachRow; select u, windowFunnel(86400, 'strict_order')(dt, a is null, b = 'b3') as s from funnel_test_non_null group by u order by u format JSONCompactEachRow; drop table funnel_test_non_null; + +create table funnel_test_strict_increase (timestamp UInt32, event UInt32) engine=Memory; +insert into funnel_test_strict_increase values (0,1000),(1,1001),(1,1002),(1,1003),(2,1004); + +select 5 = windowFunnel(10000)(timestamp, event = 1000, event = 1001, event = 1002, event = 1003, event = 1004) from funnel_test_strict_increase; +select 2 = windowFunnel(10000, 'strict_increase')(timestamp, event = 1000, event = 1001, event = 1002, event = 1003, event = 1004) from funnel_test_strict_increase; +select 3 = windowFunnel(10000)(timestamp, event = 1004, event = 1004, event = 1004) from funnel_test_strict_increase; +select 1 = windowFunnel(10000, 'strict_increase')(timestamp, event = 1004, event = 1004, event = 1004) from funnel_test_strict_increase;