This commit is contained in:
vdimir 2024-09-18 13:33:38 -07:00 committed by GitHub
commit a0302112ae
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 89 additions and 27 deletions

View File

@ -261,7 +261,7 @@ windowFunnel(window, [mode, [mode, ... ]])(timestamp, cond1, cond2, ..., condN)
- `window` — Length of the sliding window, it is the time interval between the first and the last condition. The unit of `window` depends on the `timestamp` itself and varies. Determined using the expression `timestamp of cond1 <= timestamp of cond2 <= ... <= timestamp of condN <= timestamp of cond1 + window`.
- `mode` — It is an optional argument. One or more modes can be set.
- `'strict_deduplication'` — If the same condition holds for the sequence of events, then such repeating event interrupts further processing.
- `'strict_deduplication'` — If the same condition holds for the sequence of events, then such repeating event interrupts further processing. Note: it may work unexpectedly if several conditions hold for the same event.
- `'strict_order'` — Don't allow interventions of other events. E.g. in the case of `A->B->D->C`, it stops finding `A->B->C` at the `D` and the max event level is 2.
- `'strict_increase'` — Apply conditions only to events with strictly increasing timestamps.

View File

@ -170,14 +170,28 @@ private:
/// events_timestamp stores the timestamp of the first and previous i-th level event happen within time window
std::vector<std::optional<std::pair<UInt64, UInt64>>> events_timestamp(events_size);
bool first_event = false;
for (size_t i = 0; i < data.events_list.size(); ++i)
bool has_first_event = false;
size_t deduplicate_up_to = 0;
size_t i = 0;
for (; i < data.events_list.size(); ++i)
{
const T & timestamp = data.events_list[i].first;
const auto & event_idx = data.events_list[i].second - 1;
const auto & current_event = data.events_list[i];
while (deduplicate_up_to && i + 1 < data.events_list.size())
{
const auto & next_event = data.events_list[i + 1];
if (next_event.second <= deduplicate_up_to && next_event.first == current_event.first && next_event.second == current_event.second + 1)
++i;
else
break;
}
deduplicate_up_to = 0;
auto timestamp = data.events_list[i].first;
Int64 event_idx = data.events_list[i].second - 1;
if (strict_order && event_idx == -1)
{
if (first_event)
if (has_first_event)
break;
else
continue;
@ -185,13 +199,13 @@ private:
else if (event_idx == 0)
{
events_timestamp[0] = std::make_pair(timestamp, timestamp);
first_event = true;
has_first_event = true;
}
else if (strict_deduplication && events_timestamp[event_idx].has_value())
{
return data.events_list[i - 1].second;
}
else if (strict_order && first_event && !events_timestamp[event_idx - 1].has_value())
else if (strict_order && has_first_event && !events_timestamp[event_idx - 1].has_value())
{
for (size_t event = 0; event < events_timestamp.size(); ++event)
{
@ -201,13 +215,18 @@ private:
}
else if (events_timestamp[event_idx - 1].has_value())
{
auto first_timestamp = events_timestamp[event_idx - 1]->first;
bool time_matched = timestamp <= first_timestamp + window;
if (strict_increase)
time_matched = time_matched && events_timestamp[event_idx - 1]->second < timestamp;
auto prev_timestamp = events_timestamp[event_idx - 1]->first;
bool time_matched = timestamp <= prev_timestamp + window;
if (time_matched && strict_increase)
{
time_matched = events_timestamp[event_idx - 1]->second < timestamp;
if (events_timestamp[event_idx - 1]->second == timestamp)
deduplicate_up_to = event_idx + 1;
}
if (time_matched)
{
events_timestamp[event_idx] = std::make_pair(first_timestamp, timestamp);
events_timestamp[event_idx] = std::make_pair(prev_timestamp, timestamp);
if (event_idx + 1 == events_size)
return events_size;
}
@ -260,9 +279,9 @@ public:
bool has_event = false;
const auto timestamp = assert_cast<const ColumnVector<T> *>(columns[0])->getData()[row_num];
/// reverse iteration and stable sorting are needed for events that are qualified by more than one condition.
for (auto i = events_size; i > 0; --i)
for (size_t i = events_size; i > 0; --i)
{
auto event = assert_cast<const ColumnVector<UInt8> *>(columns[i])->getData()[row_num];
UInt8 event = assert_cast<const ColumnVector<UInt8> *>(columns[i])->getData()[row_num];
if (event)
{
this->data(place).add(timestamp, i);
@ -312,13 +331,13 @@ createAggregateFunctionWindowFunnel(const std::string & name, const DataTypes &
if (arguments.size() > max_events + 1)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Too many event arguments for aggregate function {}", name);
for (const auto i : collections::range(1, arguments.size()))
for (size_t i = 1; i < arguments.size(); ++i)
{
const auto * cond_arg = arguments[i].get();
if (!isUInt8(cond_arg))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of argument {} of aggregate function {}, must be UInt8",
cond_arg->getName(), toString(i + 1), name);
"Illegal type {} of argument {} of aggregate function '{}', must be UInt8",
cond_arg->getName(), i + 1, name);
}
AggregateFunctionPtr res(createWithUnsignedIntegerType<AggregateFunctionWindowFunnel, Data>(*arguments[0], arguments, params));

View File

@ -62,3 +62,16 @@
1
1
1
-
123 3
234 2
345 1
-
123 3
234 2
345 3
-
123 1
234 1
345 3
-

View File

@ -95,4 +95,34 @@ select 2 = windowFunnel(10000, 'strict_increase')(timestamp, event = 1000, event
select 3 = windowFunnel(10000)(timestamp, event = 1004, event = 1004, event = 1004) from funnel_test_strict_increase;
select 1 = windowFunnel(10000, 'strict_increase')(timestamp, event = 1004, event = 1004, event = 1004) from funnel_test_strict_increase;
DROP TABLE IF EXISTS funnel_test2;
create table funnel_test2 (event_ts UInt32, result String, uid UInt32) engine=Memory;
insert into funnel_test2 SELECT data.1, data.2, data.3 FROM (
SELECT arrayJoin([
(100, 'failure', 234),
(200, 'success', 345),
(210, 'failure', 345),
(230, 'success', 345),
(250, 'failure', 234),
(180, 'failure', 123),
(220, 'failure', 123),
(250, 'success', 123)
]) data);
SELECT '-';
SELECT uid, windowFunnel(200, 'strict_increase')( toUInt32(event_ts), result='failure', result='failure', result='success' )
FROM funnel_test2 WHERE event_ts >= 0 AND event_ts <= 300 GROUP BY uid ORDER BY uid;
SELECT '-';
SELECT uid, windowFunnel(200)( toUInt32(event_ts), result='failure', result='failure', result='success' )
FROM funnel_test2 WHERE event_ts >= 0 AND event_ts <= 300 GROUP BY uid ORDER BY uid;
SELECT '-';
SELECT uid, windowFunnel(200, 'strict_deduplication')( toUInt32(event_ts), result='failure', result='failure', result='success' )
FROM funnel_test2 WHERE event_ts >= 0 AND event_ts <= 300 GROUP BY uid ORDER BY uid;
SELECT '-';
DROP TABLE IF EXISTS funnel_test2;
drop table funnel_test_strict_increase;