Fix windowFunnel result with strict_increase mode in case of several conditions holds for the same event

This commit is contained in:
vdimir 2024-09-05 15:21:28 +00:00
parent 4c4a051d5e
commit ecbabd6aaa
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862
4 changed files with 89 additions and 27 deletions

View File

@ -261,7 +261,7 @@ windowFunnel(window, [mode, [mode, ... ]])(timestamp, cond1, cond2, ..., condN)
- `window` — Length of the sliding window, it is the time interval between the first and the last condition. The unit of `window` depends on the `timestamp` itself and varies. Determined using the expression `timestamp of cond1 <= timestamp of cond2 <= ... <= timestamp of condN <= timestamp of cond1 + window`. - `window` — Length of the sliding window, it is the time interval between the first and the last condition. The unit of `window` depends on the `timestamp` itself and varies. Determined using the expression `timestamp of cond1 <= timestamp of cond2 <= ... <= timestamp of condN <= timestamp of cond1 + window`.
- `mode` — It is an optional argument. One or more modes can be set. - `mode` — It is an optional argument. One or more modes can be set.
- `'strict_deduplication'` — If the same condition holds for the sequence of events, then such repeating event interrupts further processing. - `'strict_deduplication'` — If the same condition holds for the sequence of events, then such repeating event interrupts further processing. Note: it may work unexpectedly if several conditions hold for the same event.
- `'strict_order'` — Don't allow interventions of other events. E.g. in the case of `A->B->D->C`, it stops finding `A->B->C` at the `D` and the max event level is 2. - `'strict_order'` — Don't allow interventions of other events. E.g. in the case of `A->B->D->C`, it stops finding `A->B->C` at the `D` and the max event level is 2.
- `'strict_increase'` — Apply conditions only to events with strictly increasing timestamps. - `'strict_increase'` — Apply conditions only to events with strictly increasing timestamps.
@ -490,7 +490,7 @@ Where:
## uniqUpTo(N)(x) ## uniqUpTo(N)(x)
Calculates the number of different values of the argument up to a specified limit, `N`. If the number of different argument values is greater than `N`, this function returns `N` + 1, otherwise it calculates the exact value. Calculates the number of different values of the argument up to a specified limit, `N`. If the number of different argument values is greater than `N`, this function returns `N` + 1, otherwise it calculates the exact value.
Recommended for use with small `N`s, up to 10. The maximum value of `N` is 100. Recommended for use with small `N`s, up to 10. The maximum value of `N` is 100.
@ -522,7 +522,7 @@ This function behaves the same as [sumMap](../../sql-reference/aggregate-functio
- `keys`: [Array](../data-types/array.md) of keys. - `keys`: [Array](../data-types/array.md) of keys.
- `values`: [Array](../data-types/array.md) of values. - `values`: [Array](../data-types/array.md) of values.
**Returned Value** **Returned Value**
- Returns a tuple of two arrays: keys in sorted order, and values summed for the corresponding keys. - Returns a tuple of two arrays: keys in sorted order, and values summed for the corresponding keys.
@ -539,10 +539,10 @@ CREATE TABLE sum_map
) )
ENGINE = Log ENGINE = Log
INSERT INTO sum_map VALUES INSERT INTO sum_map VALUES
('2000-01-01', '2000-01-01 00:00:00', [1, 2, 3], [10, 10, 10]), ('2000-01-01', '2000-01-01 00:00:00', [1, 2, 3], [10, 10, 10]),
('2000-01-01', '2000-01-01 00:00:00', [3, 4, 5], [10, 10, 10]), ('2000-01-01', '2000-01-01 00:00:00', [3, 4, 5], [10, 10, 10]),
('2000-01-01', '2000-01-01 00:01:00', [4, 5, 6], [10, 10, 10]), ('2000-01-01', '2000-01-01 00:01:00', [4, 5, 6], [10, 10, 10]),
('2000-01-01', '2000-01-01 00:01:00', [6, 7, 8], [10, 10, 10]); ('2000-01-01', '2000-01-01 00:01:00', [6, 7, 8], [10, 10, 10]);
``` ```
@ -572,7 +572,7 @@ This function behaves the same as [sumMap](../../sql-reference/aggregate-functio
- `keys`: [Array](../data-types/array.md) of keys. - `keys`: [Array](../data-types/array.md) of keys.
- `values`: [Array](../data-types/array.md) of values. - `values`: [Array](../data-types/array.md) of values.
**Returned Value** **Returned Value**
- Returns a tuple of two arrays: keys in sorted order, and values summed for the corresponding keys. - Returns a tuple of two arrays: keys in sorted order, and values summed for the corresponding keys.
@ -591,10 +591,10 @@ CREATE TABLE sum_map
) )
ENGINE = Log ENGINE = Log
INSERT INTO sum_map VALUES INSERT INTO sum_map VALUES
('2000-01-01', '2000-01-01 00:00:00', [1, 2, 3], [10, 10, 10]), ('2000-01-01', '2000-01-01 00:00:00', [1, 2, 3], [10, 10, 10]),
('2000-01-01', '2000-01-01 00:00:00', [3, 4, 5], [10, 10, 10]), ('2000-01-01', '2000-01-01 00:00:00', [3, 4, 5], [10, 10, 10]),
('2000-01-01', '2000-01-01 00:01:00', [4, 5, 6], [10, 10, 10]), ('2000-01-01', '2000-01-01 00:01:00', [4, 5, 6], [10, 10, 10]),
('2000-01-01', '2000-01-01 00:01:00', [6, 7, 8], [10, 10, 10]); ('2000-01-01', '2000-01-01 00:01:00', [6, 7, 8], [10, 10, 10]);
``` ```

View File

@ -170,14 +170,28 @@ private:
/// events_timestamp stores the timestamp of the first and previous i-th level event happen within time window /// events_timestamp stores the timestamp of the first and previous i-th level event happen within time window
std::vector<std::optional<std::pair<UInt64, UInt64>>> events_timestamp(events_size); std::vector<std::optional<std::pair<UInt64, UInt64>>> events_timestamp(events_size);
bool first_event = false; bool has_first_event = false;
for (size_t i = 0; i < data.events_list.size(); ++i) size_t deduplicate_up_to = 0;
size_t i = 0;
for (; i < data.events_list.size(); ++i)
{ {
const T & timestamp = data.events_list[i].first; const auto & current_event = data.events_list[i];
const auto & event_idx = data.events_list[i].second - 1; while (deduplicate_up_to && i + 1 < data.events_list.size())
{
const auto & next_event = data.events_list[i + 1];
if (next_event.second <= deduplicate_up_to && next_event.first == current_event.first && next_event.second == current_event.second + 1)
++i;
else
break;
}
deduplicate_up_to = 0;
auto timestamp = data.events_list[i].first;
Int64 event_idx = data.events_list[i].second - 1;
if (strict_order && event_idx == -1) if (strict_order && event_idx == -1)
{ {
if (first_event) if (has_first_event)
break; break;
else else
continue; continue;
@ -185,13 +199,13 @@ private:
else if (event_idx == 0) else if (event_idx == 0)
{ {
events_timestamp[0] = std::make_pair(timestamp, timestamp); events_timestamp[0] = std::make_pair(timestamp, timestamp);
first_event = true; has_first_event = true;
} }
else if (strict_deduplication && events_timestamp[event_idx].has_value()) else if (strict_deduplication && events_timestamp[event_idx].has_value())
{ {
return data.events_list[i - 1].second; return data.events_list[i - 1].second;
} }
else if (strict_order && first_event && !events_timestamp[event_idx - 1].has_value()) else if (strict_order && has_first_event && !events_timestamp[event_idx - 1].has_value())
{ {
for (size_t event = 0; event < events_timestamp.size(); ++event) for (size_t event = 0; event < events_timestamp.size(); ++event)
{ {
@ -201,13 +215,18 @@ private:
} }
else if (events_timestamp[event_idx - 1].has_value()) else if (events_timestamp[event_idx - 1].has_value())
{ {
auto first_timestamp = events_timestamp[event_idx - 1]->first; auto prev_timestamp = events_timestamp[event_idx - 1]->first;
bool time_matched = timestamp <= first_timestamp + window; bool time_matched = timestamp <= prev_timestamp + window;
if (strict_increase) if (time_matched && strict_increase)
time_matched = time_matched && events_timestamp[event_idx - 1]->second < timestamp; {
time_matched = events_timestamp[event_idx - 1]->second < timestamp;
if (events_timestamp[event_idx - 1]->second == timestamp)
deduplicate_up_to = event_idx + 1;
}
if (time_matched) if (time_matched)
{ {
events_timestamp[event_idx] = std::make_pair(first_timestamp, timestamp); events_timestamp[event_idx] = std::make_pair(prev_timestamp, timestamp);
if (event_idx + 1 == events_size) if (event_idx + 1 == events_size)
return events_size; return events_size;
} }
@ -260,9 +279,9 @@ public:
bool has_event = false; bool has_event = false;
const auto timestamp = assert_cast<const ColumnVector<T> *>(columns[0])->getData()[row_num]; const auto timestamp = assert_cast<const ColumnVector<T> *>(columns[0])->getData()[row_num];
/// reverse iteration and stable sorting are needed for events that are qualified by more than one condition. /// reverse iteration and stable sorting are needed for events that are qualified by more than one condition.
for (auto i = events_size; i > 0; --i) for (size_t i = events_size; i > 0; --i)
{ {
auto event = assert_cast<const ColumnVector<UInt8> *>(columns[i])->getData()[row_num]; UInt8 event = assert_cast<const ColumnVector<UInt8> *>(columns[i])->getData()[row_num];
if (event) if (event)
{ {
this->data(place).add(timestamp, i); this->data(place).add(timestamp, i);
@ -312,13 +331,13 @@ createAggregateFunctionWindowFunnel(const std::string & name, const DataTypes &
if (arguments.size() > max_events + 1) if (arguments.size() > max_events + 1)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Too many event arguments for aggregate function {}", name); throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Too many event arguments for aggregate function {}", name);
for (const auto i : collections::range(1, arguments.size())) for (size_t i = 1; i < arguments.size(); ++i)
{ {
const auto * cond_arg = arguments[i].get(); const auto * cond_arg = arguments[i].get();
if (!isUInt8(cond_arg)) if (!isUInt8(cond_arg))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of argument {} of aggregate function {}, must be UInt8", "Illegal type {} of argument {} of aggregate function '{}', must be UInt8",
cond_arg->getName(), toString(i + 1), name); cond_arg->getName(), i + 1, name);
} }
AggregateFunctionPtr res(createWithUnsignedIntegerType<AggregateFunctionWindowFunnel, Data>(*arguments[0], arguments, params)); AggregateFunctionPtr res(createWithUnsignedIntegerType<AggregateFunctionWindowFunnel, Data>(*arguments[0], arguments, params));

View File

@ -62,3 +62,16 @@
1 1
1 1
1 1
-
123 3
234 2
345 1
-
123 3
234 2
345 3
-
123 1
234 1
345 3
-

View File

@ -95,4 +95,34 @@ select 2 = windowFunnel(10000, 'strict_increase')(timestamp, event = 1000, event
select 3 = windowFunnel(10000)(timestamp, event = 1004, event = 1004, event = 1004) from funnel_test_strict_increase; select 3 = windowFunnel(10000)(timestamp, event = 1004, event = 1004, event = 1004) from funnel_test_strict_increase;
select 1 = windowFunnel(10000, 'strict_increase')(timestamp, event = 1004, event = 1004, event = 1004) from funnel_test_strict_increase; select 1 = windowFunnel(10000, 'strict_increase')(timestamp, event = 1004, event = 1004, event = 1004) from funnel_test_strict_increase;
DROP TABLE IF EXISTS funnel_test2;
create table funnel_test2 (event_ts UInt32, result String, uid UInt32) engine=Memory;
insert into funnel_test2 SELECT data.1, data.2, data.3 FROM (
SELECT arrayJoin([
(100, 'failure', 234),
(200, 'success', 345),
(210, 'failure', 345),
(230, 'success', 345),
(250, 'failure', 234),
(180, 'failure', 123),
(220, 'failure', 123),
(250, 'success', 123)
]) data);
SELECT '-';
SELECT uid, windowFunnel(200, 'strict_increase')( toUInt32(event_ts), result='failure', result='failure', result='success' )
FROM funnel_test2 WHERE event_ts >= 0 AND event_ts <= 300 GROUP BY uid ORDER BY uid;
SELECT '-';
SELECT uid, windowFunnel(200)( toUInt32(event_ts), result='failure', result='failure', result='success' )
FROM funnel_test2 WHERE event_ts >= 0 AND event_ts <= 300 GROUP BY uid ORDER BY uid;
SELECT '-';
SELECT uid, windowFunnel(200, 'strict_deduplication')( toUInt32(event_ts), result='failure', result='failure', result='success' )
FROM funnel_test2 WHERE event_ts >= 0 AND event_ts <= 300 GROUP BY uid ORDER BY uid;
SELECT '-';
DROP TABLE IF EXISTS funnel_test2;
drop table funnel_test_strict_increase; drop table funnel_test_strict_increase;