Use mode for new windowFunnel algorithm

This commit is contained in:
vdimir 2024-09-19 10:33:44 +00:00
parent e9b57820fc
commit 799894d724
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862
4 changed files with 32 additions and 22 deletions

View File

@ -264,6 +264,7 @@ windowFunnel(window, [mode, [mode, ... ]])(timestamp, cond1, cond2, ..., condN)
- `'strict_deduplication'` — If the same condition holds for the sequence of events, then such repeating event interrupts further processing. Note: it may work unexpectedly if several conditions hold for the same event.
- `'strict_order'` — Don't allow interventions of other events. E.g. in the case of `A->B->D->C`, it stops finding `A->B->C` at the `D` and the max event level is 2.
- `'strict_increase'` — Apply conditions only to events with strictly increasing timestamps.
- `'strict_once'` — Count each event only once in the chain, even if it meets the condition several times.
**Returned value**

View File

@ -193,6 +193,9 @@ private:
/// Applies conditions only to events with strictly increasing timestamps
bool strict_increase;
/// Count each event only once in the chain even if it meets the condition several times
bool strict_once;
/// Loop through the entire events_list, update the event timestamp value
/// The level path must be 1---2---3---...---check_events_size, find the max event level that satisfied the path in the sliding window.
/// If found, returns the max event level, else return 0.
@ -242,6 +245,10 @@ private:
}
else if (event_idx == 0)
{
if (!strict_once)
/// When the `strict_once` mode is disabled, drop the previously stored sequences so that only the newest one is kept
event_sequences[0].clear();
auto & event_seq = event_sequences[0].emplace_back(timestamp, timestamp);
event_seq.event_path[0] = unique_id;
has_first_event = true;
@ -289,6 +296,10 @@ private:
if (time_matched)
{
prev_path[event_idx] = unique_id;
if (!strict_once)
/// When the `strict_once` mode is disabled, drop the previously stored sequences so that only the newest one is kept
event_sequences[event_idx].clear();
auto & new_seq = event_sequences[event_idx].emplace_back(first_ts, timestamp);
new_seq.event_path = std::move(prev_path);
if (event_idx + 1 == events_size)
@ -322,6 +333,7 @@ public:
strict_deduplication = false;
strict_order = false;
strict_increase = false;
strict_once = false;
for (size_t i = 1; i < params.size(); ++i)
{
String option = params.at(i).safeGet<String>();
@ -331,6 +343,8 @@ public:
strict_order = true;
else if (option == "strict_increase")
strict_increase = true;
else if (option == "strict_once")
strict_once = true;
else if (option == "strict")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Parameter 'strict' is replaced with 'strict_deduplication' in Aggregate function {}", getName());
else
@ -350,6 +364,8 @@ public:
if (event_occurred)
{
this->data(place).add(timestamp, i);
if (!strict_once)
this->data(place).advanceId();
has_event = true;
}
}
@ -366,24 +382,14 @@ public:
this->data(place).merge(this->data(rhs));
}
/// Versioning for serialization
/// Version 1 supports deduplication of the same event several times
static constexpr auto MIN_REVISION_FOR_V1 = 54470;
bool isVersioned() const override { return true; }
size_t getDefaultVersion() const override { return 1; }
size_t getVersionFromRevision(size_t revision) const override
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t>) const override
{
return revision >= MIN_REVISION_FOR_V1 ? 1 : 0;
this->data(place).serialize(buf, strict_once);
}
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> version) const override
void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t>, Arena *) const override
{
this->data(place).serialize(buf, version.value_or(0));
}
void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> version, Arena *) const override
{
this->data(place).deserialize(buf, version.value_or(0));
this->data(place).deserialize(buf, strict_once);
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override

View File

@ -133,7 +133,9 @@ select 5 = windowFunnel(10000)(timestamp, event = 1000, event = 1001, event = 10
1
select 2 = windowFunnel(10000, 'strict_increase')(timestamp, event = 1000, event = 1001, event = 1002, event = 1003, event = 1004) from funnel_test_strict_increase;
1
select 1 = windowFunnel(10000)(timestamp, event = 1004, event = 1004, event = 1004) from funnel_test_strict_increase;
select 3 = windowFunnel(10000)(timestamp, event = 1004, event = 1004, event = 1004) from funnel_test_strict_increase;
1
select 1 = windowFunnel(10000, 'strict_once')(timestamp, event = 1004, event = 1004, event = 1004) from funnel_test_strict_increase;
1
select 1 = windowFunnel(10000, 'strict_increase')(timestamp, event = 1004, event = 1004, event = 1004) from funnel_test_strict_increase;
1
@ -152,21 +154,21 @@ insert into funnel_test2 SELECT data.1, data.2, data.3 FROM (
]) data);
SELECT '-';
-
SELECT uid, windowFunnel(200, 'strict_increase')( toUInt32(event_ts), result='failure', result='failure', result='success' )
SELECT uid, windowFunnel(200, 'strict_once', 'strict_increase')( toUInt32(event_ts), result='failure', result='failure', result='success' )
FROM funnel_test2 WHERE event_ts >= 0 AND event_ts <= 300 GROUP BY uid ORDER BY uid;
123 3
234 2
345 1
SELECT '-';
-
SELECT uid, windowFunnel(200)( toUInt32(event_ts), result='failure', result='failure', result='success' )
SELECT uid, windowFunnel(200, 'strict_once')( toUInt32(event_ts), result='failure', result='failure', result='success' )
FROM funnel_test2 WHERE event_ts >= 0 AND event_ts <= 300 GROUP BY uid ORDER BY uid;
123 3
234 2
345 1
SELECT '-';
-
SELECT uid, windowFunnel(200, 'strict_deduplication')( toUInt32(event_ts), result='failure', result='failure', result='success' )
SELECT uid, windowFunnel(200, 'strict_once', 'strict_deduplication')( toUInt32(event_ts), result='failure', result='failure', result='success' )
FROM funnel_test2 WHERE event_ts >= 0 AND event_ts <= 300 GROUP BY uid ORDER BY uid;
123 3
234 2

View File

@ -93,7 +93,8 @@ insert into funnel_test_strict_increase values (0,1000),(1,1001),(1,1002),(1,100
select 5 = windowFunnel(10000)(timestamp, event = 1000, event = 1001, event = 1002, event = 1003, event = 1004) from funnel_test_strict_increase;
select 2 = windowFunnel(10000, 'strict_increase')(timestamp, event = 1000, event = 1001, event = 1002, event = 1003, event = 1004) from funnel_test_strict_increase;
select 1 = windowFunnel(10000)(timestamp, event = 1004, event = 1004, event = 1004) from funnel_test_strict_increase;
select 3 = windowFunnel(10000)(timestamp, event = 1004, event = 1004, event = 1004) from funnel_test_strict_increase;
select 1 = windowFunnel(10000, 'strict_once')(timestamp, event = 1004, event = 1004, event = 1004) from funnel_test_strict_increase;
select 1 = windowFunnel(10000, 'strict_increase')(timestamp, event = 1004, event = 1004, event = 1004) from funnel_test_strict_increase;
@ -113,13 +114,13 @@ insert into funnel_test2 SELECT data.1, data.2, data.3 FROM (
]) data);
SELECT '-';
SELECT uid, windowFunnel(200, 'strict_increase')( toUInt32(event_ts), result='failure', result='failure', result='success' )
SELECT uid, windowFunnel(200, 'strict_once', 'strict_increase')( toUInt32(event_ts), result='failure', result='failure', result='success' )
FROM funnel_test2 WHERE event_ts >= 0 AND event_ts <= 300 GROUP BY uid ORDER BY uid;
SELECT '-';
SELECT uid, windowFunnel(200)( toUInt32(event_ts), result='failure', result='failure', result='success' )
SELECT uid, windowFunnel(200, 'strict_once')( toUInt32(event_ts), result='failure', result='failure', result='success' )
FROM funnel_test2 WHERE event_ts >= 0 AND event_ts <= 300 GROUP BY uid ORDER BY uid;
SELECT '-';
SELECT uid, windowFunnel(200, 'strict_deduplication')( toUInt32(event_ts), result='failure', result='failure', result='success' )
SELECT uid, windowFunnel(200, 'strict_once', 'strict_deduplication')( toUInt32(event_ts), result='failure', result='failure', result='success' )
FROM funnel_test2 WHERE event_ts >= 0 AND event_ts <= 300 GROUP BY uid ORDER BY uid;
SELECT '-';