Fix windowFunnel counting same event several times

This commit is contained in:
vdimir 2024-09-18 15:29:49 +00:00
parent ecbabd6aaa
commit e9b57820fc
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862
3 changed files with 270 additions and 91 deletions

View File

@ -11,7 +11,6 @@
#include <IO/WriteHelpers.h>
#include <Common/assert_cast.h>
namespace DB
struct Settings;
@ -27,33 +26,52 @@ namespace ErrorCodes
constexpr size_t max_events = 32;
constexpr size_t MAX_EVENTS = 32;
template <typename T>
struct AggregateFunctionWindowFunnelData
using TimestampEvent = std::pair<T, UInt8>;
using TimestampEvents = PODArrayWithStackMemory<TimestampEvent, 64>;
struct TimestampEvent
T timestamp;
UInt8 event_type;
UInt64 unique_id;
bool sorted = true;
TimestampEvent() = default;
TimestampEvent(T timestamp_, UInt8 event_type_, UInt64 unique_id_)
: timestamp(timestamp_), event_type(event_type_), unique_id(unique_id_) {}
// Comparison operator for sorting events
bool operator<(const TimestampEvent & other) const
return std::tie(timestamp, event_type, unique_id) < std::tie(other.timestamp, other.event_type, other.unique_id);
using TimestampEvents = PODArrayWithStackMemory<TimestampEvent, 64>;
TimestampEvents events_list;
/// Next unique identifier for events
/// Used to distinguish events with the same timestamp that matches several conditions.
UInt64 next_unique_id = 1;
bool sorted = true;
size_t size() const
return events_list.size();
void add(T timestamp, UInt8 event)
void advanceId()
/// Since most events should have already been sorted by timestamp.
if (sorted && events_list.size() > 0)
if (events_list.back().first == timestamp)
sorted = events_list.back().second <= event;
sorted = events_list.back().first <= timestamp;
events_list.emplace_back(timestamp, event);
void add(T timestamp, UInt8 event_type)
TimestampEvent new_event(timestamp, event_type, next_unique_id);
/// Check if the new event maintains the sorted order
if (sorted && !events_list.empty())
sorted = events_list.back() < new_event;
void merge(const AggregateFunctionWindowFunnelData & other)
@ -61,18 +79,28 @@ struct AggregateFunctionWindowFunnelData
if (other.events_list.empty())
const auto size = events_list.size();
const auto current_size = events_list.size();
UInt64 new_next_unique_id = next_unique_id;
events_list.insert(std::begin(other.events_list), std::end(other.events_list));
for (auto other_event : other.events_list)
/// Assign unique IDs to the new events to prevent conflicts
other_event.unique_id += next_unique_id;
new_next_unique_id = std::max(new_next_unique_id, other_event.unique_id + 1);
next_unique_id = new_next_unique_id;
/// either sort whole container or do so partially merging ranges afterwards
/// Sort the combined events list
if (!sorted && !other.sorted)
std::stable_sort(std::begin(events_list), std::end(events_list));
std::stable_sort(events_list.begin(), events_list.end());
const auto begin = std::begin(events_list);
const auto middle = std::next(begin, size);
const auto end = std::end(events_list);
auto begin = events_list.begin();
auto middle = begin + current_size;
auto end = events_list.end();
if (!sorted)
std::stable_sort(begin, middle);
@ -90,44 +118,54 @@ struct AggregateFunctionWindowFunnelData
if (!sorted)
std::stable_sort(std::begin(events_list), std::end(events_list));
std::stable_sort(events_list.begin(), events_list.end());
sorted = true;
void serialize(WriteBuffer & buf) const
/// Param match_each_once indicates whether to write the unique_id.
void serialize(WriteBuffer & buf, bool match_each_once) const
writeBinary(sorted, buf);
writeBinary(events_list.size(), buf);
for (const auto & events : events_list)
for (const auto & event : events_list)
writeBinary(events.first, buf);
writeBinary(events.second, buf);
writeBinary(event.timestamp, buf);
writeBinary(event.event_type, buf);
if (match_each_once)
writeBinary(event.unique_id, buf);
void deserialize(ReadBuffer & buf)
void deserialize(ReadBuffer & buf, bool match_each_once)
readBinary(sorted, buf);
size_t size;
readBinary(size, buf);
size_t events_size;
readBinary(events_size, buf);
if (size > 100'000'000) /// The constant is arbitrary
if (events_size > 100'000'000) /// Arbitrary limit to prevent excessive memory allocation
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Too large size of the state of windowFunnel");
T timestamp;
UInt8 event;
UInt8 event_type;
UInt64 unique_id;
for (size_t i = 0; i < size; ++i)
for (size_t i = 0; i < events_size; ++i)
readBinary(timestamp, buf);
readBinary(event, buf);
events_list.emplace_back(timestamp, event);
readBinary(event_type, buf);
if (match_each_once)
readBinary(unique_id, buf);
unique_id = next_unique_id;
events_list.emplace_back(timestamp, event_type, unique_id);
next_unique_id = std::max(next_unique_id, unique_id + 1);
@ -168,26 +206,32 @@ private:
/// events_timestamp stores the timestamp of the first and previous i-th level event happen within time window
std::vector<std::optional<std::pair<UInt64, UInt64>>> events_timestamp(events_size);
/// Stores the timestamp of the first and last i-th level event happen within time window
struct EventMatchTimeWindow
UInt64 first_timestamp;
UInt64 last_timestamp;
std::array<UInt64, MAX_EVENTS> event_path;
EventMatchTimeWindow() = default;
EventMatchTimeWindow(UInt64 first_ts, UInt64 last_ts)
: first_timestamp(first_ts), last_timestamp(last_ts) {}
/// We track all possible event sequences up to the current event.
/// It's required because one event can meet several conditions.
/// For example: for events 'start', 'a', 'b', 'a', 'end'.
/// The second occurrence of 'a' should be counted only once in one sequence.
/// However, we do not know in advance if the next event will be 'b' or 'end', so we try to keep both paths.
std::vector<std::list<EventMatchTimeWindow>> event_sequences(events_size);
bool has_first_event = false;
size_t deduplicate_up_to = 0;
size_t i = 0;
for (; i < data.events_list.size(); ++i)
for (size_t i = 0; i < data.events_list.size(); ++i)
const auto & current_event = data.events_list[i];
while (deduplicate_up_to && i + 1 < data.events_list.size())
const auto & next_event = data.events_list[i + 1];
if (next_event.second <= deduplicate_up_to && next_event.first == current_event.first && next_event.second == current_event.second + 1)
deduplicate_up_to = 0;
auto timestamp = data.events_list[i].first;
Int64 event_idx = data.events_list[i].second - 1;
auto timestamp = current_event.timestamp;
Int64 event_idx = current_event.event_type - 1;
UInt64 unique_id = current_event.unique_id;
if (strict_order && event_idx == -1)
@ -198,44 +242,66 @@ private:
else if (event_idx == 0)
events_timestamp[0] = std::make_pair(timestamp, timestamp);
auto & event_seq = event_sequences[0].emplace_back(timestamp, timestamp);
event_seq.event_path[0] = unique_id;
has_first_event = true;
else if (strict_deduplication && events_timestamp[event_idx].has_value())
else if (strict_deduplication && !event_sequences[event_idx].empty())
return data.events_list[i - 1].second;
return data.events_list[i - 1].event_type;
else if (strict_order && has_first_event && !events_timestamp[event_idx - 1].has_value())
else if (strict_order && has_first_event && event_sequences[event_idx - 1].empty())
for (size_t event = 0; event < events_timestamp.size(); ++event)
for (size_t event = 0; event < event_sequences.size(); ++event)
if (!events_timestamp[event].has_value())
if (event_sequences[event].empty())
return event;
else if (events_timestamp[event_idx - 1].has_value())
else if (!event_sequences[event_idx - 1].empty())
auto prev_timestamp = events_timestamp[event_idx - 1]->first;
bool time_matched = timestamp <= prev_timestamp + window;
if (time_matched && strict_increase)
auto & prev_level = event_sequences[event_idx - 1];
for (auto it = prev_level.begin(); it != prev_level.end();)
time_matched = events_timestamp[event_idx - 1]->second < timestamp;
if (events_timestamp[event_idx - 1]->second == timestamp)
deduplicate_up_to = event_idx + 1;
auto first_ts = it->first_timestamp;
bool time_matched = timestamp <= first_ts + window;
if (!time_matched && prev_level.size() > 1)
// Remove old events that are out of the window, but keep at least one
it = prev_level.erase(it);
auto prev_path = it->event_path;
chassert(event_idx > 0);
/// Ensure the unique_id hasn't been used in the path already
for (size_t j = 0; j < static_cast<size_t>(event_idx); ++j)
if (!time_matched)
time_matched = prev_path[j] != unique_id;
if (time_matched && strict_increase)
time_matched = it->last_timestamp < timestamp;
if (time_matched)
events_timestamp[event_idx] = std::make_pair(prev_timestamp, timestamp);
prev_path[event_idx] = unique_id;
auto & new_seq = event_sequences[event_idx].emplace_back(first_ts, timestamp);
new_seq.event_path = std::move(prev_path);
if (event_idx + 1 == events_size)
return events_size;
for (size_t event = events_timestamp.size(); event > 0; --event)
for (size_t event = event_sequences.size(); event > 0; --event)
if (events_timestamp[event - 1].has_value())
if (!event_sequences[event - 1].empty())
return event;
return 0;
@ -266,9 +332,9 @@ public:
else if (option == "strict_increase")
strict_increase = true;
else if (option == "strict")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "strict is replaced with strict_deduplication in Aggregate function {}", getName());
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Parameter 'strict' is replaced with 'strict_deduplication' in Aggregate function {}", getName());
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Aggregate function {} doesn't support a parameter: {}", getName(), option);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Aggregate function {} doesn't support parameter: {}", getName(), option);
@ -278,11 +344,10 @@ public:
bool has_event = false;
const auto timestamp = assert_cast<const ColumnVector<T> *>(columns[0])->getData()[row_num];
/// reverse iteration and stable sorting are needed for events that are qualified by more than one condition.
for (size_t i = events_size; i > 0; --i)
for (size_t i = 1; i <= events_size; ++i)
UInt8 event = assert_cast<const ColumnVector<UInt8> *>(columns[i])->getData()[row_num];
if (event)
UInt8 event_occurred = assert_cast<const ColumnVector<UInt8> *>(columns[i])->getData()[row_num];
if (event_occurred)
this->data(place).add(timestamp, i);
has_event = true;
@ -291,6 +356,9 @@ public:
if (strict_order && !has_event)
this->data(place).add(timestamp, 0);
// Advance to the next unique event
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override
@ -298,14 +366,24 @@ public:
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
/// Versioning for serialization
/// Version 1 supports deduplication of the same event several times
static constexpr auto MIN_REVISION_FOR_V1 = 54470;
bool isVersioned() const override { return true; }
size_t getDefaultVersion() const override { return 1; }
size_t getVersionFromRevision(size_t revision) const override
return revision >= MIN_REVISION_FOR_V1 ? 1 : 0;
void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena *) const override
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> version) const override
this->data(place).serialize(buf, version.value_or(0));
void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> version, Arena *) const override
this->data(place).deserialize(buf, version.value_or(0));
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
@ -328,7 +406,7 @@ createAggregateFunctionWindowFunnel(const std::string & name, const DataTypes &
"Aggregate function {} requires one timestamp argument and at least one event condition.", name);
if (arguments.size() > max_events + 1)
if (arguments.size() > MAX_EVENTS + 1)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Too many event arguments for aggregate function {}", name);
for (size_t i = 1; i < arguments.size(); ++i)
@ -351,7 +429,7 @@ createAggregateFunctionWindowFunnel(const std::string & name, const DataTypes &
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of first argument of aggregate function {}, must "
"be Unsigned Number, Date, DateTime", arguments.front().get()->getName(), name);
"be Unsigned Number, Date, or DateTime", arguments.front().get()->getName(), name);

View File

@ -1,77 +1,177 @@
-- { echoOn }
drop table if exists funnel_test;
create table funnel_test (timestamp UInt32, event UInt32) engine=Memory;
insert into funnel_test values (0,1000),(1,1001),(2,1002),(3,1003),(4,1004),(5,1005),(6,1006),(7,1007),(8,1008);
select 1 = windowFunnel(10000)(timestamp, event = 1000) from funnel_test;
select 2 = windowFunnel(10000)(timestamp, event = 1000, event = 1001) from funnel_test;
select 3 = windowFunnel(10000)(timestamp, event = 1000, event = 1001, event = 1002) from funnel_test;
select 4 = windowFunnel(10000)(timestamp, event = 1000, event = 1001, event = 1002, event = 1008) from funnel_test;
select 1 = windowFunnel(1)(timestamp, event = 1000) from funnel_test;
select 3 = windowFunnel(2)(timestamp, event = 1003, event = 1004, event = 1005, event = 1006, event = 1007) from funnel_test;
select 4 = windowFunnel(3)(timestamp, event = 1003, event = 1004, event = 1005, event = 1006, event = 1007) from funnel_test;
select 5 = windowFunnel(4)(timestamp, event = 1003, event = 1004, event = 1005, event = 1006, event = 1007) from funnel_test;
drop table if exists funnel_test2;
create table funnel_test2 (uid UInt32 default 1,timestamp DateTime, event UInt32) engine=Memory;
insert into funnel_test2(timestamp, event) values ('2018-01-01 01:01:01',1001),('2018-01-01 01:01:02',1002),('2018-01-01 01:01:03',1003),('2018-01-01 01:01:04',1004),('2018-01-01 01:01:05',1005),('2018-01-01 01:01:06',1006),('2018-01-01 01:01:07',1007),('2018-01-01 01:01:08',1008);
select 5 = windowFunnel(4)(timestamp, event = 1003, event = 1004, event = 1005, event = 1006, event = 1007) from funnel_test2;
select 2 = windowFunnel(10000)(timestamp, event = 1001, event = 1008) from funnel_test2;
select 1 = windowFunnel(10000)(timestamp, event = 1008, event = 1001) from funnel_test2;
select 5 = windowFunnel(4)(timestamp, event = 1003, event = 1004, event = 1005, event = 1006, event = 1007) from funnel_test2;
select 4 = windowFunnel(4)(timestamp, event <= 1007, event >= 1002, event <= 1006, event >= 1004) from funnel_test2;
drop table if exists funnel_test_u64;
create table funnel_test_u64 (uid UInt32 default 1,timestamp UInt64, event UInt32) engine=Memory;
insert into funnel_test_u64(timestamp, event) values ( 1e14 + 1 ,1001),(1e14 + 2,1002),(1e14 + 3,1003),(1e14 + 4,1004),(1e14 + 5,1005),(1e14 + 6,1006),(1e14 + 7,1007),(1e14 + 8,1008);
select 5 = windowFunnel(4)(timestamp, event = 1003, event = 1004, event = 1005, event = 1006, event = 1007) from funnel_test_u64;
select 2 = windowFunnel(10000)(timestamp, event = 1001, event = 1008) from funnel_test_u64;
select 1 = windowFunnel(10000)(timestamp, event = 1008, event = 1001) from funnel_test_u64;
select 5 = windowFunnel(4)(timestamp, event = 1003, event = 1004, event = 1005, event = 1006, event = 1007) from funnel_test_u64;
select 4 = windowFunnel(4)(timestamp, event <= 1007, event >= 1002, event <= 1006, event >= 1004) from funnel_test_u64;
drop table if exists funnel_test_strict;
create table funnel_test_strict (timestamp UInt32, event UInt32) engine=Memory;
insert into funnel_test_strict values (00,1000),(10,1001),(20,1002),(30,1003),(40,1004),(50,1005),(51,1005),(60,1006),(70,1007),(80,1008);
select 6 = windowFunnel(10000, 'strict_deduplication')(timestamp, event = 1000, event = 1001, event = 1002, event = 1003, event = 1004, event = 1005, event = 1006) from funnel_test_strict;
select 7 = windowFunnel(10000)(timestamp, event = 1000, event = 1001, event = 1002, event = 1003, event = 1004, event = 1005, event = 1006) from funnel_test_strict;
drop table funnel_test;
drop table funnel_test2;
drop table funnel_test_u64;
drop table funnel_test_strict;
drop table if exists funnel_test_strict_order;
create table funnel_test_strict_order (dt DateTime, user int, event String) engine = MergeTree() partition by dt order by user;
insert into funnel_test_strict_order values (1, 1, 'a') (2, 1, 'b') (3, 1, 'c');
insert into funnel_test_strict_order values (1, 2, 'a') (2, 2, 'd') (3, 2, 'b') (4, 2, 'c');
insert into funnel_test_strict_order values (1, 3, 'a') (2, 3, 'a') (3, 3, 'b') (4, 3, 'b') (5, 3, 'c') (6, 3, 'c');
insert into funnel_test_strict_order values (1, 4, 'a') (2, 4, 'a') (3, 4, 'a') (4, 4, 'a') (5, 4, 'b') (6, 4, 'b') (7, 4, 'c') (8, 4, 'c');
insert into funnel_test_strict_order values (1, 5, 'a') (2, 5, 'a') (3, 5, 'b') (4, 5, 'b') (5, 5, 'd') (6, 5, 'c') (7, 5, 'c');
insert into funnel_test_strict_order values (1, 6, 'c') (2, 6, 'c') (3, 6, 'b') (4, 6, 'b') (5, 6, 'a') (6, 6, 'a');
select user, windowFunnel(86400)(dt, event='a', event='b', event='c') as s from funnel_test_strict_order group by user order by user format JSONCompactEachRow;
[1, 3]
[2, 3]
[3, 3]
[4, 3]
[5, 3]
[6, 1]
select user, windowFunnel(86400, 'strict_order')(dt, event='a', event='b', event='c') as s from funnel_test_strict_order group by user order by user format JSONCompactEachRow;
[1, 3]
[2, 1]
[3, 3]
[4, 3]
[5, 2]
[6, 1]
select user, windowFunnel(86400, 'strict_deduplication', 'strict_order')(dt, event='a', event='b', event='c') as s from funnel_test_strict_order group by user order by user format JSONCompactEachRow;
[1, 3]
[2, 1]
[3, 2]
[4, 2]
[5, 2]
[6, 1]
insert into funnel_test_strict_order values (1, 7, 'a') (2, 7, 'c') (3, 7, 'b');
select user, windowFunnel(10, 'strict_order')(dt, event = 'a', event = 'b', event = 'c') as s from funnel_test_strict_order where user = 7 group by user format JSONCompactEachRow;
[7, 1]
drop table funnel_test_strict_order;
drop table if exists strict_BiteTheDDDD;
create table strict_BiteTheDDDD (ts UInt64, event String) engine = Log();
insert into strict_BiteTheDDDD values (1,'a') (2,'b') (3,'c') (4,'b') (5,'d');
select 3 = windowFunnel(86400, 'strict_deduplication')(ts, event='a', event='b', event='c', event='d') from strict_BiteTheDDDD format JSONCompactEachRow;
drop table strict_BiteTheDDDD;
drop table if exists funnel_test_non_null;
create table funnel_test_non_null (`dt` DateTime, `u` int, `a` Nullable(String), `b` Nullable(String)) engine = MergeTree() partition by dt order by u;
insert into funnel_test_non_null values (1, 1, 'a1', 'b1') (2, 1, 'a2', 'b2');
insert into funnel_test_non_null values (1, 2, 'a1', null) (2, 2, 'a2', null);
insert into funnel_test_non_null values (1, 3, null, null);
insert into funnel_test_non_null values (1, 4, null, 'b1') (2, 4, 'a2', null) (3, 4, null, 'b3');
select u, windowFunnel(86400)(dt, COALESCE(a, '') = 'a1', COALESCE(a, '') = 'a2') as s from funnel_test_non_null group by u order by u format JSONCompactEachRow;
[1, 2]
[2, 2]
[3, 0]
[4, 0]
select u, windowFunnel(86400)(dt, COALESCE(a, '') = 'a1', COALESCE(b, '') = 'b2') as s from funnel_test_non_null group by u order by u format JSONCompactEachRow;
[1, 2]
[2, 1]
[3, 0]
[4, 0]
select u, windowFunnel(86400)(dt, a is null and b is null) as s from funnel_test_non_null group by u order by u format JSONCompactEachRow;
[1, 0]
[2, 0]
[3, 1]
[4, 0]
select u, windowFunnel(86400)(dt, a is null, COALESCE(b, '') = 'b3') as s from funnel_test_non_null group by u order by u format JSONCompactEachRow;
[1, 0]
[2, 0]
[3, 1]
[4, 2]
select u, windowFunnel(86400, 'strict_order')(dt, a is null, COALESCE(b, '') = 'b3') as s from funnel_test_non_null group by u order by u format JSONCompactEachRow;
[1, 0]
[2, 0]
[3, 1]
[4, 1]
drop table funnel_test_non_null;
create table funnel_test_strict_increase (timestamp UInt32, event UInt32) engine=Memory;
insert into funnel_test_strict_increase values (0,1000),(1,1001),(1,1002),(1,1003),(2,1004);
select 5 = windowFunnel(10000)(timestamp, event = 1000, event = 1001, event = 1002, event = 1003, event = 1004) from funnel_test_strict_increase;
select 2 = windowFunnel(10000, 'strict_increase')(timestamp, event = 1000, event = 1001, event = 1002, event = 1003, event = 1004) from funnel_test_strict_increase;
select 1 = windowFunnel(10000)(timestamp, event = 1004, event = 1004, event = 1004) from funnel_test_strict_increase;
select 1 = windowFunnel(10000, 'strict_increase')(timestamp, event = 1004, event = 1004, event = 1004) from funnel_test_strict_increase;
DROP TABLE IF EXISTS funnel_test2;
create table funnel_test2 (event_ts UInt32, result String, uid UInt32) engine=Memory;
insert into funnel_test2 SELECT data.1, data.2, data.3 FROM (
SELECT arrayJoin([
(100, 'failure', 234),
(200, 'success', 345),
(210, 'failure', 345),
(230, 'success', 345),
(250, 'failure', 234),
(180, 'failure', 123),
(220, 'failure', 123),
(250, 'success', 123)
]) data);
SELECT uid, windowFunnel(200, 'strict_increase')( toUInt32(event_ts), result='failure', result='failure', result='success' )
FROM funnel_test2 WHERE event_ts >= 0 AND event_ts <= 300 GROUP BY uid ORDER BY uid;
123 3
234 2
345 1
SELECT uid, windowFunnel(200)( toUInt32(event_ts), result='failure', result='failure', result='success' )
FROM funnel_test2 WHERE event_ts >= 0 AND event_ts <= 300 GROUP BY uid ORDER BY uid;
123 3
234 2
345 3
345 1
123 1
234 1
345 3
SELECT uid, windowFunnel(200, 'strict_deduplication')( toUInt32(event_ts), result='failure', result='failure', result='success' )
FROM funnel_test2 WHERE event_ts >= 0 AND event_ts <= 300 GROUP BY uid ORDER BY uid;
123 3
234 2
345 1
DROP TABLE IF EXISTS funnel_test2;
drop table funnel_test_strict_increase;

View File

@ -1,3 +1,4 @@
-- { echoOn }
drop table if exists funnel_test;
create table funnel_test (timestamp UInt32, event UInt32) engine=Memory;
@ -92,7 +93,7 @@ insert into funnel_test_strict_increase values (0,1000),(1,1001),(1,1002),(1,100
select 5 = windowFunnel(10000)(timestamp, event = 1000, event = 1001, event = 1002, event = 1003, event = 1004) from funnel_test_strict_increase;
select 2 = windowFunnel(10000, 'strict_increase')(timestamp, event = 1000, event = 1001, event = 1002, event = 1003, event = 1004) from funnel_test_strict_increase;
select 3 = windowFunnel(10000)(timestamp, event = 1004, event = 1004, event = 1004) from funnel_test_strict_increase;
select 1 = windowFunnel(10000)(timestamp, event = 1004, event = 1004, event = 1004) from funnel_test_strict_increase;
select 1 = windowFunnel(10000, 'strict_increase')(timestamp, event = 1004, event = 1004, event = 1004) from funnel_test_strict_increase;