diff --git a/src/AggregateFunctions/AggregateFunctionWindowFunnel.cpp b/src/AggregateFunctions/AggregateFunctionWindowFunnel.cpp index d3b150e37fc..8e80320760f 100644 --- a/src/AggregateFunctions/AggregateFunctionWindowFunnel.cpp +++ b/src/AggregateFunctions/AggregateFunctionWindowFunnel.cpp @@ -12,7 +12,6 @@ #include #include - namespace DB { struct Settings; @@ -28,33 +27,52 @@ namespace ErrorCodes namespace { -constexpr size_t max_events = 32; +constexpr size_t MAX_EVENTS = 32; template struct AggregateFunctionWindowFunnelData { - using TimestampEvent = std::pair; - using TimestampEvents = PODArrayWithStackMemory; + struct TimestampEvent + { + T timestamp; + UInt8 event_type; + UInt64 unique_id; - bool sorted = true; + TimestampEvent() = default; + TimestampEvent(T timestamp_, UInt8 event_type_, UInt64 unique_id_) + : timestamp(timestamp_), event_type(event_type_), unique_id(unique_id_) {} + + // Comparison operator for sorting events + bool operator<(const TimestampEvent & other) const + { + return std::tie(timestamp, event_type, unique_id) < std::tie(other.timestamp, other.event_type, other.unique_id); + } + }; + + using TimestampEvents = PODArrayWithStackMemory; TimestampEvents events_list; + /// Next unique identifier for events + /// Used to distinguish events with the same timestamp that matches several conditions. + UInt64 next_unique_id = 1; + bool sorted = true; + size_t size() const { return events_list.size(); } - void add(T timestamp, UInt8 event) + void advanceId() { - /// Since most events should have already been sorted by timestamp. - if (sorted && events_list.size() > 0) - { - if (events_list.back().first == timestamp) - sorted = events_list.back().second <= event; - else - sorted = events_list.back().first <= timestamp; - } - events_list.emplace_back(timestamp, event); + ++next_unique_id; + } + void add(T timestamp, UInt8 event_type) + { + TimestampEvent new_event(timestamp, event_type, next_unique_id); + /// Check if the new event maintains the sorted order + if (sorted && !events_list.empty()) + sorted = events_list.back() < new_event; + events_list.push_back(new_event); } void merge(const AggregateFunctionWindowFunnelData & other) @@ -62,18 +80,28 @@ struct AggregateFunctionWindowFunnelData if (other.events_list.empty()) return; - const auto size = events_list.size(); + const auto current_size = events_list.size(); + UInt64 new_next_unique_id = next_unique_id; - events_list.insert(std::begin(other.events_list), std::end(other.events_list)); + for (auto other_event : other.events_list) + { + /// Assign unique IDs to the new events to prevent conflicts + other_event.unique_id += next_unique_id; + new_next_unique_id = std::max(new_next_unique_id, other_event.unique_id + 1); + events_list.push_back(other_event); + } + next_unique_id = new_next_unique_id; - /// either sort whole container or do so partially merging ranges afterwards + /// Sort the combined events list if (!sorted && !other.sorted) - std::stable_sort(std::begin(events_list), std::end(events_list)); + { + std::stable_sort(events_list.begin(), events_list.end()); + } else { - const auto begin = std::begin(events_list); - const auto middle = std::next(begin, size); - const auto end = std::end(events_list); + auto begin = events_list.begin(); + auto middle = begin + current_size; + auto end = events_list.end(); if (!sorted) std::stable_sort(begin, middle); @@ -91,44 +119,54 @@ struct AggregateFunctionWindowFunnelData { if (!sorted) { - std::stable_sort(std::begin(events_list), std::end(events_list)); + std::stable_sort(events_list.begin(), events_list.end()); sorted = true; } } - void serialize(WriteBuffer & buf) const + /// Param match_each_once indicates whether to write the unique_id. + void serialize(WriteBuffer & buf, bool match_each_once) const { writeBinary(sorted, buf); writeBinary(events_list.size(), buf); - for (const auto & events : events_list) + for (const auto & event : events_list) { - writeBinary(events.first, buf); - writeBinary(events.second, buf); + writeBinary(event.timestamp, buf); + writeBinary(event.event_type, buf); + if (match_each_once) + writeBinary(event.unique_id, buf); } } - void deserialize(ReadBuffer & buf) + void deserialize(ReadBuffer & buf, bool match_each_once) { readBinary(sorted, buf); - size_t size; - readBinary(size, buf); + size_t events_size; + readBinary(events_size, buf); - if (size > 100'000'000) /// The constant is arbitrary + if (events_size > 100'000'000) /// Arbitrary limit to prevent excessive memory allocation throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Too large size of the state of windowFunnel"); events_list.clear(); - events_list.reserve(size); + events_list.reserve(events_size); T timestamp; - UInt8 event; + UInt8 event_type; + UInt64 unique_id; - for (size_t i = 0; i < size; ++i) + for (size_t i = 0; i < events_size; ++i) { readBinary(timestamp, buf); - readBinary(event, buf); - events_list.emplace_back(timestamp, event); + readBinary(event_type, buf); + if (match_each_once) + readBinary(unique_id, buf); + else + unique_id = next_unique_id; + + events_list.emplace_back(timestamp, event_type, unique_id); + next_unique_id = std::max(next_unique_id, unique_id + 1); } } }; @@ -169,26 +207,32 @@ private: data.sort(); - /// events_timestamp stores the timestamp of the first and previous i-th level event happen within time window - std::vector>> events_timestamp(events_size); + /// Stores the timestamp of the first and last i-th level event happen within time window + struct EventMatchTimeWindow + { + UInt64 first_timestamp; + UInt64 last_timestamp; + std::array event_path; + + EventMatchTimeWindow() = default; + EventMatchTimeWindow(UInt64 first_ts, UInt64 last_ts) + : first_timestamp(first_ts), last_timestamp(last_ts) {} + }; + + /// We track all possible event sequences up to the current event. + /// It's required because one event can meet several conditions. + /// For example: for events 'start', 'a', 'b', 'a', 'end'. + /// The second occurrence of 'a' should be counted only once in one sequence. + /// However, we do not know in advance if the next event will be 'b' or 'end', so we try to keep both paths. + std::vector> event_sequences(events_size); + bool has_first_event = false; - size_t deduplicate_up_to = 0; - size_t i = 0; - for (; i < data.events_list.size(); ++i) + for (size_t i = 0; i < data.events_list.size(); ++i) { const auto & current_event = data.events_list[i]; - while (deduplicate_up_to && i + 1 < data.events_list.size()) - { - const auto & next_event = data.events_list[i + 1]; - if (next_event.second <= deduplicate_up_to && next_event.first == current_event.first && next_event.second == current_event.second + 1) - ++i; - else - break; - } - deduplicate_up_to = 0; - - auto timestamp = data.events_list[i].first; - Int64 event_idx = data.events_list[i].second - 1; + auto timestamp = current_event.timestamp; + Int64 event_idx = current_event.event_type - 1; + UInt64 unique_id = current_event.unique_id; if (strict_order && event_idx == -1) { @@ -199,44 +243,66 @@ private: } else if (event_idx == 0) { - events_timestamp[0] = std::make_pair(timestamp, timestamp); + auto & event_seq = event_sequences[0].emplace_back(timestamp, timestamp); + event_seq.event_path[0] = unique_id; has_first_event = true; } - else if (strict_deduplication && events_timestamp[event_idx].has_value()) + else if (strict_deduplication && !event_sequences[event_idx].empty()) { - return data.events_list[i - 1].second; + return data.events_list[i - 1].event_type; } - else if (strict_order && has_first_event && !events_timestamp[event_idx - 1].has_value()) + else if (strict_order && has_first_event && event_sequences[event_idx - 1].empty()) { - for (size_t event = 0; event < events_timestamp.size(); ++event) + for (size_t event = 0; event < event_sequences.size(); ++event) { - if (!events_timestamp[event].has_value()) + if (event_sequences[event].empty()) return event; } } - else if (events_timestamp[event_idx - 1].has_value()) + else if (!event_sequences[event_idx - 1].empty()) { - auto prev_timestamp = events_timestamp[event_idx - 1]->first; - bool time_matched = timestamp <= prev_timestamp + window; - if (time_matched && strict_increase) + auto & prev_level = event_sequences[event_idx - 1]; + for (auto it = prev_level.begin(); it != prev_level.end();) { - time_matched = events_timestamp[event_idx - 1]->second < timestamp; - if (events_timestamp[event_idx - 1]->second == timestamp) - deduplicate_up_to = event_idx + 1; - } + auto first_ts = it->first_timestamp; + bool time_matched = timestamp <= first_ts + window; + if (!time_matched && prev_level.size() > 1) + { + // Remove old events that are out of the window, but keep at least one + it = prev_level.erase(it); + continue; + } - if (time_matched) - { - events_timestamp[event_idx] = std::make_pair(prev_timestamp, timestamp); - if (event_idx + 1 == events_size) - return events_size; + auto prev_path = it->event_path; + chassert(event_idx > 0); + + /// Ensure the unique_id hasn't been used in the path already + for (size_t j = 0; j < static_cast(event_idx); ++j) + { + if (!time_matched) + break; + time_matched = prev_path[j] != unique_id; + } + + if (time_matched && strict_increase) + time_matched = it->last_timestamp < timestamp; + + if (time_matched) + { + prev_path[event_idx] = unique_id; + auto & new_seq = event_sequences[event_idx].emplace_back(first_ts, timestamp); + new_seq.event_path = std::move(prev_path); + if (event_idx + 1 == events_size) + return events_size; + } + ++it; } } } - for (size_t event = events_timestamp.size(); event > 0; --event) + for (size_t event = event_sequences.size(); event > 0; --event) { - if (events_timestamp[event - 1].has_value()) + if (!event_sequences[event - 1].empty()) return event; } return 0; @@ -267,9 +333,9 @@ public: else if (option == "strict_increase") strict_increase = true; else if (option == "strict") - throw Exception(ErrorCodes::BAD_ARGUMENTS, "strict is replaced with strict_deduplication in Aggregate function {}", getName()); + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Parameter 'strict' is replaced with 'strict_deduplication' in Aggregate function {}", getName()); else - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Aggregate function {} doesn't support a parameter: {}", getName(), option); + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Aggregate function {} doesn't support parameter: {}", getName(), option); } } @@ -279,11 +345,10 @@ public: { bool has_event = false; const auto timestamp = assert_cast *>(columns[0])->getData()[row_num]; - /// reverse iteration and stable sorting are needed for events that are qualified by more than one condition. - for (size_t i = events_size; i > 0; --i) + for (size_t i = 1; i <= events_size; ++i) { - UInt8 event = assert_cast *>(columns[i])->getData()[row_num]; - if (event) + UInt8 event_occurred = assert_cast *>(columns[i])->getData()[row_num]; + if (event_occurred) { this->data(place).add(timestamp, i); has_event = true; @@ -292,6 +357,9 @@ public: if (strict_order && !has_event) this->data(place).add(timestamp, 0); + + // Advance to the next unique event + this->data(place).advanceId(); } void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override @@ -299,14 +367,24 @@ public: this->data(place).merge(this->data(rhs)); } - void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional /* version */) const override + /// Versioning for serialization + /// Version 1 supports deduplication of the same event several times + static constexpr auto MIN_REVISION_FOR_V1 = 54470; + bool isVersioned() const override { return true; } + size_t getDefaultVersion() const override { return 1; } + size_t getVersionFromRevision(size_t revision) const override { - this->data(place).serialize(buf); + return revision >= MIN_REVISION_FOR_V1 ? 1 : 0; } - void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional /* version */, Arena *) const override + void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional version) const override { - this->data(place).deserialize(buf); + this->data(place).serialize(buf, version.value_or(0)); + } + + void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional version, Arena *) const override + { + this->data(place).deserialize(buf, version.value_or(0)); } void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override @@ -329,7 +407,7 @@ createAggregateFunctionWindowFunnel(const std::string & name, const DataTypes & throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Aggregate function {} requires one timestamp argument and at least one event condition.", name); - if (arguments.size() > max_events + 1) + if (arguments.size() > MAX_EVENTS + 1) throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Too many event arguments for aggregate function {}", name); for (size_t i = 1; i < arguments.size(); ++i) @@ -352,7 +430,7 @@ createAggregateFunctionWindowFunnel(const std::string & name, const DataTypes & throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of first argument of aggregate function {}, must " - "be Unsigned Number, Date, DateTime", arguments.front().get()->getName(), name); + "be Unsigned Number, Date, or DateTime", arguments.front().get()->getName(), name); } } diff --git a/tests/queries/0_stateless/00632_aggregation_window_funnel.reference b/tests/queries/0_stateless/00632_aggregation_window_funnel.reference index 318e2a22ae5..807cc3758d2 100644 --- a/tests/queries/0_stateless/00632_aggregation_window_funnel.reference +++ b/tests/queries/0_stateless/00632_aggregation_window_funnel.reference @@ -1,77 +1,177 @@ +-- { echoOn } +drop table if exists funnel_test; +create table funnel_test (timestamp UInt32, event UInt32) engine=Memory; +insert into funnel_test values (0,1000),(1,1001),(2,1002),(3,1003),(4,1004),(5,1005),(6,1006),(7,1007),(8,1008); +select 1 = windowFunnel(10000)(timestamp, event = 1000) from funnel_test; 1 +select 2 = windowFunnel(10000)(timestamp, event = 1000, event = 1001) from funnel_test; 1 +select 3 = windowFunnel(10000)(timestamp, event = 1000, event = 1001, event = 1002) from funnel_test; 1 +select 4 = windowFunnel(10000)(timestamp, event = 1000, event = 1001, event = 1002, event = 1008) from funnel_test; 1 +select 1 = windowFunnel(1)(timestamp, event = 1000) from funnel_test; 1 +select 3 = windowFunnel(2)(timestamp, event = 1003, event = 1004, event = 1005, event = 1006, event = 1007) from funnel_test; 1 +select 4 = windowFunnel(3)(timestamp, event = 1003, event = 1004, event = 1005, event = 1006, event = 1007) from funnel_test; 1 +select 5 = windowFunnel(4)(timestamp, event = 1003, event = 1004, event = 1005, event = 1006, event = 1007) from funnel_test; 1 +drop table if exists funnel_test2; +create table funnel_test2 (uid UInt32 default 1,timestamp DateTime, event UInt32) engine=Memory; +insert into funnel_test2(timestamp, event) values ('2018-01-01 01:01:01',1001),('2018-01-01 01:01:02',1002),('2018-01-01 01:01:03',1003),('2018-01-01 01:01:04',1004),('2018-01-01 01:01:05',1005),('2018-01-01 01:01:06',1006),('2018-01-01 01:01:07',1007),('2018-01-01 01:01:08',1008); +select 5 = windowFunnel(4)(timestamp, event = 1003, event = 1004, event = 1005, event = 1006, event = 1007) from funnel_test2; 1 +select 2 = windowFunnel(10000)(timestamp, event = 1001, event = 1008) from funnel_test2; 1 +select 1 = windowFunnel(10000)(timestamp, event = 1008, event = 1001) from funnel_test2; 1 +select 5 = windowFunnel(4)(timestamp, event = 1003, event = 1004, event = 1005, event = 1006, event = 1007) from funnel_test2; 1 +select 4 = windowFunnel(4)(timestamp, event <= 1007, event >= 1002, event <= 1006, event >= 1004) from funnel_test2; 1 +drop table if exists funnel_test_u64; +create table funnel_test_u64 (uid UInt32 default 1,timestamp UInt64, event UInt32) engine=Memory; +insert into funnel_test_u64(timestamp, event) values ( 1e14 + 1 ,1001),(1e14 + 2,1002),(1e14 + 3,1003),(1e14 + 4,1004),(1e14 + 5,1005),(1e14 + 6,1006),(1e14 + 7,1007),(1e14 + 8,1008); +select 5 = windowFunnel(4)(timestamp, event = 1003, event = 1004, event = 1005, event = 1006, event = 1007) from funnel_test_u64; 1 +select 2 = windowFunnel(10000)(timestamp, event = 1001, event = 1008) from funnel_test_u64; 1 +select 1 = windowFunnel(10000)(timestamp, event = 1008, event = 1001) from funnel_test_u64; 1 +select 5 = windowFunnel(4)(timestamp, event = 1003, event = 1004, event = 1005, event = 1006, event = 1007) from funnel_test_u64; 1 +select 4 = windowFunnel(4)(timestamp, event <= 1007, event >= 1002, event <= 1006, event >= 1004) from funnel_test_u64; 1 +drop table if exists funnel_test_strict; +create table funnel_test_strict (timestamp UInt32, event UInt32) engine=Memory; +insert into funnel_test_strict values (00,1000),(10,1001),(20,1002),(30,1003),(40,1004),(50,1005),(51,1005),(60,1006),(70,1007),(80,1008); +select 6 = windowFunnel(10000, 'strict_deduplication')(timestamp, event = 1000, event = 1001, event = 1002, event = 1003, event = 1004, event = 1005, event = 1006) from funnel_test_strict; 1 +select 7 = windowFunnel(10000)(timestamp, event = 1000, event = 1001, event = 1002, event = 1003, event = 1004, event = 1005, event = 1006) from funnel_test_strict; 1 +drop table funnel_test; +drop table funnel_test2; +drop table funnel_test_u64; +drop table funnel_test_strict; +drop table if exists funnel_test_strict_order; +create table funnel_test_strict_order (dt DateTime, user int, event String) engine = MergeTree() partition by dt order by user; +insert into funnel_test_strict_order values (1, 1, 'a') (2, 1, 'b') (3, 1, 'c'); +insert into funnel_test_strict_order values (1, 2, 'a') (2, 2, 'd') (3, 2, 'b') (4, 2, 'c'); +insert into funnel_test_strict_order values (1, 3, 'a') (2, 3, 'a') (3, 3, 'b') (4, 3, 'b') (5, 3, 'c') (6, 3, 'c'); +insert into funnel_test_strict_order values (1, 4, 'a') (2, 4, 'a') (3, 4, 'a') (4, 4, 'a') (5, 4, 'b') (6, 4, 'b') (7, 4, 'c') (8, 4, 'c'); +insert into funnel_test_strict_order values (1, 5, 'a') (2, 5, 'a') (3, 5, 'b') (4, 5, 'b') (5, 5, 'd') (6, 5, 'c') (7, 5, 'c'); +insert into funnel_test_strict_order values (1, 6, 'c') (2, 6, 'c') (3, 6, 'b') (4, 6, 'b') (5, 6, 'a') (6, 6, 'a'); +select user, windowFunnel(86400)(dt, event='a', event='b', event='c') as s from funnel_test_strict_order group by user order by user format JSONCompactEachRow; [1, 3] [2, 3] [3, 3] [4, 3] [5, 3] [6, 1] +select user, windowFunnel(86400, 'strict_order')(dt, event='a', event='b', event='c') as s from funnel_test_strict_order group by user order by user format JSONCompactEachRow; [1, 3] [2, 1] [3, 3] [4, 3] [5, 2] [6, 1] +select user, windowFunnel(86400, 'strict_deduplication', 'strict_order')(dt, event='a', event='b', event='c') as s from funnel_test_strict_order group by user order by user format JSONCompactEachRow; [1, 3] [2, 1] [3, 2] [4, 2] [5, 2] [6, 1] +insert into funnel_test_strict_order values (1, 7, 'a') (2, 7, 'c') (3, 7, 'b'); +select user, windowFunnel(10, 'strict_order')(dt, event = 'a', event = 'b', event = 'c') as s from funnel_test_strict_order where user = 7 group by user format JSONCompactEachRow; [7, 1] +drop table funnel_test_strict_order; +--https://github.com/ClickHouse/ClickHouse/issues/27469 +drop table if exists strict_BiteTheDDDD; +create table strict_BiteTheDDDD (ts UInt64, event String) engine = Log(); +insert into strict_BiteTheDDDD values (1,'a') (2,'b') (3,'c') (4,'b') (5,'d'); +select 3 = windowFunnel(86400, 'strict_deduplication')(ts, event='a', event='b', event='c', event='d') from strict_BiteTheDDDD format JSONCompactEachRow; [1] +drop table strict_BiteTheDDDD; +drop table if exists funnel_test_non_null; +create table funnel_test_non_null (`dt` DateTime, `u` int, `a` Nullable(String), `b` Nullable(String)) engine = MergeTree() partition by dt order by u; +insert into funnel_test_non_null values (1, 1, 'a1', 'b1') (2, 1, 'a2', 'b2'); +insert into funnel_test_non_null values (1, 2, 'a1', null) (2, 2, 'a2', null); +insert into funnel_test_non_null values (1, 3, null, null); +insert into funnel_test_non_null values (1, 4, null, 'b1') (2, 4, 'a2', null) (3, 4, null, 'b3'); +select u, windowFunnel(86400)(dt, COALESCE(a, '') = 'a1', COALESCE(a, '') = 'a2') as s from funnel_test_non_null group by u order by u format JSONCompactEachRow; [1, 2] [2, 2] [3, 0] [4, 0] +select u, windowFunnel(86400)(dt, COALESCE(a, '') = 'a1', COALESCE(b, '') = 'b2') as s from funnel_test_non_null group by u order by u format JSONCompactEachRow; [1, 2] [2, 1] [3, 0] [4, 0] +select u, windowFunnel(86400)(dt, a is null and b is null) as s from funnel_test_non_null group by u order by u format JSONCompactEachRow; [1, 0] [2, 0] [3, 1] [4, 0] +select u, windowFunnel(86400)(dt, a is null, COALESCE(b, '') = 'b3') as s from funnel_test_non_null group by u order by u format JSONCompactEachRow; [1, 0] [2, 0] [3, 1] [4, 2] +select u, windowFunnel(86400, 'strict_order')(dt, a is null, COALESCE(b, '') = 'b3') as s from funnel_test_non_null group by u order by u format JSONCompactEachRow; [1, 0] [2, 0] [3, 1] [4, 1] +drop table funnel_test_non_null; +create table funnel_test_strict_increase (timestamp UInt32, event UInt32) engine=Memory; +insert into funnel_test_strict_increase values (0,1000),(1,1001),(1,1002),(1,1003),(2,1004); +select 5 = windowFunnel(10000)(timestamp, event = 1000, event = 1001, event = 1002, event = 1003, event = 1004) from funnel_test_strict_increase; 1 +select 2 = windowFunnel(10000, 'strict_increase')(timestamp, event = 1000, event = 1001, event = 1002, event = 1003, event = 1004) from funnel_test_strict_increase; 1 +select 1 = windowFunnel(10000)(timestamp, event = 1004, event = 1004, event = 1004) from funnel_test_strict_increase; 1 +select 1 = windowFunnel(10000, 'strict_increase')(timestamp, event = 1004, event = 1004, event = 1004) from funnel_test_strict_increase; 1 +DROP TABLE IF EXISTS funnel_test2; +create table funnel_test2 (event_ts UInt32, result String, uid UInt32) engine=Memory; +insert into funnel_test2 SELECT data.1, data.2, data.3 FROM ( + SELECT arrayJoin([ + (100, 'failure', 234), + (200, 'success', 345), + (210, 'failure', 345), + (230, 'success', 345), + (250, 'failure', 234), + (180, 'failure', 123), + (220, 'failure', 123), + (250, 'success', 123) + ]) data); +SELECT '-'; - +SELECT uid, windowFunnel(200, 'strict_increase')( toUInt32(event_ts), result='failure', result='failure', result='success' ) +FROM funnel_test2 WHERE event_ts >= 0 AND event_ts <= 300 GROUP BY uid ORDER BY uid; 123 3 234 2 345 1 +SELECT '-'; - +SELECT uid, windowFunnel(200)( toUInt32(event_ts), result='failure', result='failure', result='success' ) +FROM funnel_test2 WHERE event_ts >= 0 AND event_ts <= 300 GROUP BY uid ORDER BY uid; 123 3 234 2 -345 3 +345 1 +SELECT '-'; - -123 1 -234 1 -345 3 +SELECT uid, windowFunnel(200, 'strict_deduplication')( toUInt32(event_ts), result='failure', result='failure', result='success' ) +FROM funnel_test2 WHERE event_ts >= 0 AND event_ts <= 300 GROUP BY uid ORDER BY uid; +123 3 +234 2 +345 1 +SELECT '-'; - +DROP TABLE IF EXISTS funnel_test2; +drop table funnel_test_strict_increase; diff --git a/tests/queries/0_stateless/00632_aggregation_window_funnel.sql b/tests/queries/0_stateless/00632_aggregation_window_funnel.sql index 403a8e6d406..0e5f955a80c 100644 --- a/tests/queries/0_stateless/00632_aggregation_window_funnel.sql +++ b/tests/queries/0_stateless/00632_aggregation_window_funnel.sql @@ -1,3 +1,4 @@ +-- { echoOn } drop table if exists funnel_test; create table funnel_test (timestamp UInt32, event UInt32) engine=Memory; @@ -92,7 +93,7 @@ insert into funnel_test_strict_increase values (0,1000),(1,1001),(1,1002),(1,100 select 5 = windowFunnel(10000)(timestamp, event = 1000, event = 1001, event = 1002, event = 1003, event = 1004) from funnel_test_strict_increase; select 2 = windowFunnel(10000, 'strict_increase')(timestamp, event = 1000, event = 1001, event = 1002, event = 1003, event = 1004) from funnel_test_strict_increase; -select 3 = windowFunnel(10000)(timestamp, event = 1004, event = 1004, event = 1004) from funnel_test_strict_increase; +select 1 = windowFunnel(10000)(timestamp, event = 1004, event = 1004, event = 1004) from funnel_test_strict_increase; select 1 = windowFunnel(10000, 'strict_increase')(timestamp, event = 1004, event = 1004, event = 1004) from funnel_test_strict_increase;