mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-29 19:12:03 +00:00
Fix windowFunnel counting same event several times
This commit is contained in:
parent
5217cdcb92
commit
4444a6d45b
@ -12,7 +12,6 @@
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Common/assert_cast.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
struct Settings;
|
||||
@ -28,33 +27,52 @@ namespace ErrorCodes
|
||||
namespace
|
||||
{
|
||||
|
||||
constexpr size_t max_events = 32;
|
||||
constexpr size_t MAX_EVENTS = 32;
|
||||
|
||||
template <typename T>
|
||||
struct AggregateFunctionWindowFunnelData
|
||||
{
|
||||
using TimestampEvent = std::pair<T, UInt8>;
|
||||
using TimestampEvents = PODArrayWithStackMemory<TimestampEvent, 64>;
|
||||
struct TimestampEvent
|
||||
{
|
||||
T timestamp;
|
||||
UInt8 event_type;
|
||||
UInt64 unique_id;
|
||||
|
||||
bool sorted = true;
|
||||
TimestampEvent() = default;
|
||||
TimestampEvent(T timestamp_, UInt8 event_type_, UInt64 unique_id_)
|
||||
: timestamp(timestamp_), event_type(event_type_), unique_id(unique_id_) {}
|
||||
|
||||
// Comparison operator for sorting events
|
||||
bool operator<(const TimestampEvent & other) const
|
||||
{
|
||||
return std::tie(timestamp, event_type, unique_id) < std::tie(other.timestamp, other.event_type, other.unique_id);
|
||||
}
|
||||
};
|
||||
|
||||
using TimestampEvents = PODArrayWithStackMemory<TimestampEvent, 64>;
|
||||
TimestampEvents events_list;
|
||||
|
||||
/// Next unique identifier for events
|
||||
/// Used to distinguish events with the same timestamp that matches several conditions.
|
||||
UInt64 next_unique_id = 1;
|
||||
bool sorted = true;
|
||||
|
||||
size_t size() const
|
||||
{
|
||||
return events_list.size();
|
||||
}
|
||||
|
||||
void add(T timestamp, UInt8 event)
|
||||
void advanceId()
|
||||
{
|
||||
/// Since most events should have already been sorted by timestamp.
|
||||
if (sorted && events_list.size() > 0)
|
||||
{
|
||||
if (events_list.back().first == timestamp)
|
||||
sorted = events_list.back().second <= event;
|
||||
else
|
||||
sorted = events_list.back().first <= timestamp;
|
||||
}
|
||||
events_list.emplace_back(timestamp, event);
|
||||
++next_unique_id;
|
||||
}
|
||||
void add(T timestamp, UInt8 event_type)
|
||||
{
|
||||
TimestampEvent new_event(timestamp, event_type, next_unique_id);
|
||||
/// Check if the new event maintains the sorted order
|
||||
if (sorted && !events_list.empty())
|
||||
sorted = events_list.back() < new_event;
|
||||
events_list.push_back(new_event);
|
||||
}
|
||||
|
||||
void merge(const AggregateFunctionWindowFunnelData & other)
|
||||
@ -62,18 +80,28 @@ struct AggregateFunctionWindowFunnelData
|
||||
if (other.events_list.empty())
|
||||
return;
|
||||
|
||||
const auto size = events_list.size();
|
||||
const auto current_size = events_list.size();
|
||||
UInt64 new_next_unique_id = next_unique_id;
|
||||
|
||||
events_list.insert(std::begin(other.events_list), std::end(other.events_list));
|
||||
for (auto other_event : other.events_list)
|
||||
{
|
||||
/// Assign unique IDs to the new events to prevent conflicts
|
||||
other_event.unique_id += next_unique_id;
|
||||
new_next_unique_id = std::max(new_next_unique_id, other_event.unique_id + 1);
|
||||
events_list.push_back(other_event);
|
||||
}
|
||||
next_unique_id = new_next_unique_id;
|
||||
|
||||
/// either sort whole container or do so partially merging ranges afterwards
|
||||
/// Sort the combined events list
|
||||
if (!sorted && !other.sorted)
|
||||
std::stable_sort(std::begin(events_list), std::end(events_list));
|
||||
{
|
||||
std::stable_sort(events_list.begin(), events_list.end());
|
||||
}
|
||||
else
|
||||
{
|
||||
const auto begin = std::begin(events_list);
|
||||
const auto middle = std::next(begin, size);
|
||||
const auto end = std::end(events_list);
|
||||
auto begin = events_list.begin();
|
||||
auto middle = begin + current_size;
|
||||
auto end = events_list.end();
|
||||
|
||||
if (!sorted)
|
||||
std::stable_sort(begin, middle);
|
||||
@ -91,44 +119,54 @@ struct AggregateFunctionWindowFunnelData
|
||||
{
|
||||
if (!sorted)
|
||||
{
|
||||
std::stable_sort(std::begin(events_list), std::end(events_list));
|
||||
std::stable_sort(events_list.begin(), events_list.end());
|
||||
sorted = true;
|
||||
}
|
||||
}
|
||||
|
||||
void serialize(WriteBuffer & buf) const
|
||||
/// Param match_each_once indicates whether to write the unique_id.
|
||||
void serialize(WriteBuffer & buf, bool match_each_once) const
|
||||
{
|
||||
writeBinary(sorted, buf);
|
||||
writeBinary(events_list.size(), buf);
|
||||
|
||||
for (const auto & events : events_list)
|
||||
for (const auto & event : events_list)
|
||||
{
|
||||
writeBinary(events.first, buf);
|
||||
writeBinary(events.second, buf);
|
||||
writeBinary(event.timestamp, buf);
|
||||
writeBinary(event.event_type, buf);
|
||||
if (match_each_once)
|
||||
writeBinary(event.unique_id, buf);
|
||||
}
|
||||
}
|
||||
|
||||
void deserialize(ReadBuffer & buf)
|
||||
void deserialize(ReadBuffer & buf, bool match_each_once)
|
||||
{
|
||||
readBinary(sorted, buf);
|
||||
|
||||
size_t size;
|
||||
readBinary(size, buf);
|
||||
size_t events_size;
|
||||
readBinary(events_size, buf);
|
||||
|
||||
if (size > 100'000'000) /// The constant is arbitrary
|
||||
if (events_size > 100'000'000) /// Arbitrary limit to prevent excessive memory allocation
|
||||
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Too large size of the state of windowFunnel");
|
||||
|
||||
events_list.clear();
|
||||
events_list.reserve(size);
|
||||
events_list.reserve(events_size);
|
||||
|
||||
T timestamp;
|
||||
UInt8 event;
|
||||
UInt8 event_type;
|
||||
UInt64 unique_id;
|
||||
|
||||
for (size_t i = 0; i < size; ++i)
|
||||
for (size_t i = 0; i < events_size; ++i)
|
||||
{
|
||||
readBinary(timestamp, buf);
|
||||
readBinary(event, buf);
|
||||
events_list.emplace_back(timestamp, event);
|
||||
readBinary(event_type, buf);
|
||||
if (match_each_once)
|
||||
readBinary(unique_id, buf);
|
||||
else
|
||||
unique_id = next_unique_id;
|
||||
|
||||
events_list.emplace_back(timestamp, event_type, unique_id);
|
||||
next_unique_id = std::max(next_unique_id, unique_id + 1);
|
||||
}
|
||||
}
|
||||
};
|
||||
@ -169,26 +207,32 @@ private:
|
||||
|
||||
data.sort();
|
||||
|
||||
/// events_timestamp stores the timestamp of the first and previous i-th level event happen within time window
|
||||
std::vector<std::optional<std::pair<UInt64, UInt64>>> events_timestamp(events_size);
|
||||
/// Stores the timestamp of the first and last i-th level event happen within time window
|
||||
struct EventMatchTimeWindow
|
||||
{
|
||||
UInt64 first_timestamp;
|
||||
UInt64 last_timestamp;
|
||||
std::array<UInt64, MAX_EVENTS> event_path;
|
||||
|
||||
EventMatchTimeWindow() = default;
|
||||
EventMatchTimeWindow(UInt64 first_ts, UInt64 last_ts)
|
||||
: first_timestamp(first_ts), last_timestamp(last_ts) {}
|
||||
};
|
||||
|
||||
/// We track all possible event sequences up to the current event.
|
||||
/// It's required because one event can meet several conditions.
|
||||
/// For example: for events 'start', 'a', 'b', 'a', 'end'.
|
||||
/// The second occurrence of 'a' should be counted only once in one sequence.
|
||||
/// However, we do not know in advance if the next event will be 'b' or 'end', so we try to keep both paths.
|
||||
std::vector<std::list<EventMatchTimeWindow>> event_sequences(events_size);
|
||||
|
||||
bool has_first_event = false;
|
||||
size_t deduplicate_up_to = 0;
|
||||
size_t i = 0;
|
||||
for (; i < data.events_list.size(); ++i)
|
||||
for (size_t i = 0; i < data.events_list.size(); ++i)
|
||||
{
|
||||
const auto & current_event = data.events_list[i];
|
||||
while (deduplicate_up_to && i + 1 < data.events_list.size())
|
||||
{
|
||||
const auto & next_event = data.events_list[i + 1];
|
||||
if (next_event.second <= deduplicate_up_to && next_event.first == current_event.first && next_event.second == current_event.second + 1)
|
||||
++i;
|
||||
else
|
||||
break;
|
||||
}
|
||||
deduplicate_up_to = 0;
|
||||
|
||||
auto timestamp = data.events_list[i].first;
|
||||
Int64 event_idx = data.events_list[i].second - 1;
|
||||
auto timestamp = current_event.timestamp;
|
||||
Int64 event_idx = current_event.event_type - 1;
|
||||
UInt64 unique_id = current_event.unique_id;
|
||||
|
||||
if (strict_order && event_idx == -1)
|
||||
{
|
||||
@ -199,44 +243,66 @@ private:
|
||||
}
|
||||
else if (event_idx == 0)
|
||||
{
|
||||
events_timestamp[0] = std::make_pair(timestamp, timestamp);
|
||||
auto & event_seq = event_sequences[0].emplace_back(timestamp, timestamp);
|
||||
event_seq.event_path[0] = unique_id;
|
||||
has_first_event = true;
|
||||
}
|
||||
else if (strict_deduplication && events_timestamp[event_idx].has_value())
|
||||
else if (strict_deduplication && !event_sequences[event_idx].empty())
|
||||
{
|
||||
return data.events_list[i - 1].second;
|
||||
return data.events_list[i - 1].event_type;
|
||||
}
|
||||
else if (strict_order && has_first_event && !events_timestamp[event_idx - 1].has_value())
|
||||
else if (strict_order && has_first_event && event_sequences[event_idx - 1].empty())
|
||||
{
|
||||
for (size_t event = 0; event < events_timestamp.size(); ++event)
|
||||
for (size_t event = 0; event < event_sequences.size(); ++event)
|
||||
{
|
||||
if (!events_timestamp[event].has_value())
|
||||
if (event_sequences[event].empty())
|
||||
return event;
|
||||
}
|
||||
}
|
||||
else if (events_timestamp[event_idx - 1].has_value())
|
||||
else if (!event_sequences[event_idx - 1].empty())
|
||||
{
|
||||
auto prev_timestamp = events_timestamp[event_idx - 1]->first;
|
||||
bool time_matched = timestamp <= prev_timestamp + window;
|
||||
if (time_matched && strict_increase)
|
||||
auto & prev_level = event_sequences[event_idx - 1];
|
||||
for (auto it = prev_level.begin(); it != prev_level.end();)
|
||||
{
|
||||
time_matched = events_timestamp[event_idx - 1]->second < timestamp;
|
||||
if (events_timestamp[event_idx - 1]->second == timestamp)
|
||||
deduplicate_up_to = event_idx + 1;
|
||||
}
|
||||
auto first_ts = it->first_timestamp;
|
||||
bool time_matched = timestamp <= first_ts + window;
|
||||
if (!time_matched && prev_level.size() > 1)
|
||||
{
|
||||
// Remove old events that are out of the window, but keep at least one
|
||||
it = prev_level.erase(it);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (time_matched)
|
||||
{
|
||||
events_timestamp[event_idx] = std::make_pair(prev_timestamp, timestamp);
|
||||
if (event_idx + 1 == events_size)
|
||||
return events_size;
|
||||
auto prev_path = it->event_path;
|
||||
chassert(event_idx > 0);
|
||||
|
||||
/// Ensure the unique_id hasn't been used in the path already
|
||||
for (size_t j = 0; j < static_cast<size_t>(event_idx); ++j)
|
||||
{
|
||||
if (!time_matched)
|
||||
break;
|
||||
time_matched = prev_path[j] != unique_id;
|
||||
}
|
||||
|
||||
if (time_matched && strict_increase)
|
||||
time_matched = it->last_timestamp < timestamp;
|
||||
|
||||
if (time_matched)
|
||||
{
|
||||
prev_path[event_idx] = unique_id;
|
||||
auto & new_seq = event_sequences[event_idx].emplace_back(first_ts, timestamp);
|
||||
new_seq.event_path = std::move(prev_path);
|
||||
if (event_idx + 1 == events_size)
|
||||
return events_size;
|
||||
}
|
||||
++it;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (size_t event = events_timestamp.size(); event > 0; --event)
|
||||
for (size_t event = event_sequences.size(); event > 0; --event)
|
||||
{
|
||||
if (events_timestamp[event - 1].has_value())
|
||||
if (!event_sequences[event - 1].empty())
|
||||
return event;
|
||||
}
|
||||
return 0;
|
||||
@ -267,9 +333,9 @@ public:
|
||||
else if (option == "strict_increase")
|
||||
strict_increase = true;
|
||||
else if (option == "strict")
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "strict is replaced with strict_deduplication in Aggregate function {}", getName());
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Parameter 'strict' is replaced with 'strict_deduplication' in Aggregate function {}", getName());
|
||||
else
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Aggregate function {} doesn't support a parameter: {}", getName(), option);
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Aggregate function {} doesn't support parameter: {}", getName(), option);
|
||||
}
|
||||
}
|
||||
|
||||
@ -279,11 +345,10 @@ public:
|
||||
{
|
||||
bool has_event = false;
|
||||
const auto timestamp = assert_cast<const ColumnVector<T> *>(columns[0])->getData()[row_num];
|
||||
/// reverse iteration and stable sorting are needed for events that are qualified by more than one condition.
|
||||
for (size_t i = events_size; i > 0; --i)
|
||||
for (size_t i = 1; i <= events_size; ++i)
|
||||
{
|
||||
UInt8 event = assert_cast<const ColumnVector<UInt8> *>(columns[i])->getData()[row_num];
|
||||
if (event)
|
||||
UInt8 event_occurred = assert_cast<const ColumnVector<UInt8> *>(columns[i])->getData()[row_num];
|
||||
if (event_occurred)
|
||||
{
|
||||
this->data(place).add(timestamp, i);
|
||||
has_event = true;
|
||||
@ -292,6 +357,9 @@ public:
|
||||
|
||||
if (strict_order && !has_event)
|
||||
this->data(place).add(timestamp, 0);
|
||||
|
||||
// Advance to the next unique event
|
||||
this->data(place).advanceId();
|
||||
}
|
||||
|
||||
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override
|
||||
@ -299,14 +367,24 @@ public:
|
||||
this->data(place).merge(this->data(rhs));
|
||||
}
|
||||
|
||||
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
|
||||
/// Versioning for serialization
|
||||
/// Version 1 supports deduplication of the same event several times
|
||||
static constexpr auto MIN_REVISION_FOR_V1 = 54470;
|
||||
bool isVersioned() const override { return true; }
|
||||
size_t getDefaultVersion() const override { return 1; }
|
||||
size_t getVersionFromRevision(size_t revision) const override
|
||||
{
|
||||
this->data(place).serialize(buf);
|
||||
return revision >= MIN_REVISION_FOR_V1 ? 1 : 0;
|
||||
}
|
||||
|
||||
void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena *) const override
|
||||
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> version) const override
|
||||
{
|
||||
this->data(place).deserialize(buf);
|
||||
this->data(place).serialize(buf, version.value_or(0));
|
||||
}
|
||||
|
||||
void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> version, Arena *) const override
|
||||
{
|
||||
this->data(place).deserialize(buf, version.value_or(0));
|
||||
}
|
||||
|
||||
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
|
||||
@ -329,7 +407,7 @@ createAggregateFunctionWindowFunnel(const std::string & name, const DataTypes &
|
||||
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
|
||||
"Aggregate function {} requires one timestamp argument and at least one event condition.", name);
|
||||
|
||||
if (arguments.size() > max_events + 1)
|
||||
if (arguments.size() > MAX_EVENTS + 1)
|
||||
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Too many event arguments for aggregate function {}", name);
|
||||
|
||||
for (size_t i = 1; i < arguments.size(); ++i)
|
||||
@ -352,7 +430,7 @@ createAggregateFunctionWindowFunnel(const std::string & name, const DataTypes &
|
||||
|
||||
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
|
||||
"Illegal type {} of first argument of aggregate function {}, must "
|
||||
"be Unsigned Number, Date, DateTime", arguments.front().get()->getName(), name);
|
||||
"be Unsigned Number, Date, or DateTime", arguments.front().get()->getName(), name);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1,77 +1,177 @@
|
||||
-- { echoOn }
|
||||
drop table if exists funnel_test;
|
||||
create table funnel_test (timestamp UInt32, event UInt32) engine=Memory;
|
||||
insert into funnel_test values (0,1000),(1,1001),(2,1002),(3,1003),(4,1004),(5,1005),(6,1006),(7,1007),(8,1008);
|
||||
select 1 = windowFunnel(10000)(timestamp, event = 1000) from funnel_test;
|
||||
1
|
||||
select 2 = windowFunnel(10000)(timestamp, event = 1000, event = 1001) from funnel_test;
|
||||
1
|
||||
select 3 = windowFunnel(10000)(timestamp, event = 1000, event = 1001, event = 1002) from funnel_test;
|
||||
1
|
||||
select 4 = windowFunnel(10000)(timestamp, event = 1000, event = 1001, event = 1002, event = 1008) from funnel_test;
|
||||
1
|
||||
select 1 = windowFunnel(1)(timestamp, event = 1000) from funnel_test;
|
||||
1
|
||||
select 3 = windowFunnel(2)(timestamp, event = 1003, event = 1004, event = 1005, event = 1006, event = 1007) from funnel_test;
|
||||
1
|
||||
select 4 = windowFunnel(3)(timestamp, event = 1003, event = 1004, event = 1005, event = 1006, event = 1007) from funnel_test;
|
||||
1
|
||||
select 5 = windowFunnel(4)(timestamp, event = 1003, event = 1004, event = 1005, event = 1006, event = 1007) from funnel_test;
|
||||
1
|
||||
drop table if exists funnel_test2;
|
||||
create table funnel_test2 (uid UInt32 default 1,timestamp DateTime, event UInt32) engine=Memory;
|
||||
insert into funnel_test2(timestamp, event) values ('2018-01-01 01:01:01',1001),('2018-01-01 01:01:02',1002),('2018-01-01 01:01:03',1003),('2018-01-01 01:01:04',1004),('2018-01-01 01:01:05',1005),('2018-01-01 01:01:06',1006),('2018-01-01 01:01:07',1007),('2018-01-01 01:01:08',1008);
|
||||
select 5 = windowFunnel(4)(timestamp, event = 1003, event = 1004, event = 1005, event = 1006, event = 1007) from funnel_test2;
|
||||
1
|
||||
select 2 = windowFunnel(10000)(timestamp, event = 1001, event = 1008) from funnel_test2;
|
||||
1
|
||||
select 1 = windowFunnel(10000)(timestamp, event = 1008, event = 1001) from funnel_test2;
|
||||
1
|
||||
select 5 = windowFunnel(4)(timestamp, event = 1003, event = 1004, event = 1005, event = 1006, event = 1007) from funnel_test2;
|
||||
1
|
||||
select 4 = windowFunnel(4)(timestamp, event <= 1007, event >= 1002, event <= 1006, event >= 1004) from funnel_test2;
|
||||
1
|
||||
drop table if exists funnel_test_u64;
|
||||
create table funnel_test_u64 (uid UInt32 default 1,timestamp UInt64, event UInt32) engine=Memory;
|
||||
insert into funnel_test_u64(timestamp, event) values ( 1e14 + 1 ,1001),(1e14 + 2,1002),(1e14 + 3,1003),(1e14 + 4,1004),(1e14 + 5,1005),(1e14 + 6,1006),(1e14 + 7,1007),(1e14 + 8,1008);
|
||||
select 5 = windowFunnel(4)(timestamp, event = 1003, event = 1004, event = 1005, event = 1006, event = 1007) from funnel_test_u64;
|
||||
1
|
||||
select 2 = windowFunnel(10000)(timestamp, event = 1001, event = 1008) from funnel_test_u64;
|
||||
1
|
||||
select 1 = windowFunnel(10000)(timestamp, event = 1008, event = 1001) from funnel_test_u64;
|
||||
1
|
||||
select 5 = windowFunnel(4)(timestamp, event = 1003, event = 1004, event = 1005, event = 1006, event = 1007) from funnel_test_u64;
|
||||
1
|
||||
select 4 = windowFunnel(4)(timestamp, event <= 1007, event >= 1002, event <= 1006, event >= 1004) from funnel_test_u64;
|
||||
1
|
||||
drop table if exists funnel_test_strict;
|
||||
create table funnel_test_strict (timestamp UInt32, event UInt32) engine=Memory;
|
||||
insert into funnel_test_strict values (00,1000),(10,1001),(20,1002),(30,1003),(40,1004),(50,1005),(51,1005),(60,1006),(70,1007),(80,1008);
|
||||
select 6 = windowFunnel(10000, 'strict_deduplication')(timestamp, event = 1000, event = 1001, event = 1002, event = 1003, event = 1004, event = 1005, event = 1006) from funnel_test_strict;
|
||||
1
|
||||
select 7 = windowFunnel(10000)(timestamp, event = 1000, event = 1001, event = 1002, event = 1003, event = 1004, event = 1005, event = 1006) from funnel_test_strict;
|
||||
1
|
||||
drop table funnel_test;
|
||||
drop table funnel_test2;
|
||||
drop table funnel_test_u64;
|
||||
drop table funnel_test_strict;
|
||||
drop table if exists funnel_test_strict_order;
|
||||
create table funnel_test_strict_order (dt DateTime, user int, event String) engine = MergeTree() partition by dt order by user;
|
||||
insert into funnel_test_strict_order values (1, 1, 'a') (2, 1, 'b') (3, 1, 'c');
|
||||
insert into funnel_test_strict_order values (1, 2, 'a') (2, 2, 'd') (3, 2, 'b') (4, 2, 'c');
|
||||
insert into funnel_test_strict_order values (1, 3, 'a') (2, 3, 'a') (3, 3, 'b') (4, 3, 'b') (5, 3, 'c') (6, 3, 'c');
|
||||
insert into funnel_test_strict_order values (1, 4, 'a') (2, 4, 'a') (3, 4, 'a') (4, 4, 'a') (5, 4, 'b') (6, 4, 'b') (7, 4, 'c') (8, 4, 'c');
|
||||
insert into funnel_test_strict_order values (1, 5, 'a') (2, 5, 'a') (3, 5, 'b') (4, 5, 'b') (5, 5, 'd') (6, 5, 'c') (7, 5, 'c');
|
||||
insert into funnel_test_strict_order values (1, 6, 'c') (2, 6, 'c') (3, 6, 'b') (4, 6, 'b') (5, 6, 'a') (6, 6, 'a');
|
||||
select user, windowFunnel(86400)(dt, event='a', event='b', event='c') as s from funnel_test_strict_order group by user order by user format JSONCompactEachRow;
|
||||
[1, 3]
|
||||
[2, 3]
|
||||
[3, 3]
|
||||
[4, 3]
|
||||
[5, 3]
|
||||
[6, 1]
|
||||
select user, windowFunnel(86400, 'strict_order')(dt, event='a', event='b', event='c') as s from funnel_test_strict_order group by user order by user format JSONCompactEachRow;
|
||||
[1, 3]
|
||||
[2, 1]
|
||||
[3, 3]
|
||||
[4, 3]
|
||||
[5, 2]
|
||||
[6, 1]
|
||||
select user, windowFunnel(86400, 'strict_deduplication', 'strict_order')(dt, event='a', event='b', event='c') as s from funnel_test_strict_order group by user order by user format JSONCompactEachRow;
|
||||
[1, 3]
|
||||
[2, 1]
|
||||
[3, 2]
|
||||
[4, 2]
|
||||
[5, 2]
|
||||
[6, 1]
|
||||
insert into funnel_test_strict_order values (1, 7, 'a') (2, 7, 'c') (3, 7, 'b');
|
||||
select user, windowFunnel(10, 'strict_order')(dt, event = 'a', event = 'b', event = 'c') as s from funnel_test_strict_order where user = 7 group by user format JSONCompactEachRow;
|
||||
[7, 1]
|
||||
drop table funnel_test_strict_order;
|
||||
--https://github.com/ClickHouse/ClickHouse/issues/27469
|
||||
drop table if exists strict_BiteTheDDDD;
|
||||
create table strict_BiteTheDDDD (ts UInt64, event String) engine = Log();
|
||||
insert into strict_BiteTheDDDD values (1,'a') (2,'b') (3,'c') (4,'b') (5,'d');
|
||||
select 3 = windowFunnel(86400, 'strict_deduplication')(ts, event='a', event='b', event='c', event='d') from strict_BiteTheDDDD format JSONCompactEachRow;
|
||||
[1]
|
||||
drop table strict_BiteTheDDDD;
|
||||
drop table if exists funnel_test_non_null;
|
||||
create table funnel_test_non_null (`dt` DateTime, `u` int, `a` Nullable(String), `b` Nullable(String)) engine = MergeTree() partition by dt order by u;
|
||||
insert into funnel_test_non_null values (1, 1, 'a1', 'b1') (2, 1, 'a2', 'b2');
|
||||
insert into funnel_test_non_null values (1, 2, 'a1', null) (2, 2, 'a2', null);
|
||||
insert into funnel_test_non_null values (1, 3, null, null);
|
||||
insert into funnel_test_non_null values (1, 4, null, 'b1') (2, 4, 'a2', null) (3, 4, null, 'b3');
|
||||
select u, windowFunnel(86400)(dt, COALESCE(a, '') = 'a1', COALESCE(a, '') = 'a2') as s from funnel_test_non_null group by u order by u format JSONCompactEachRow;
|
||||
[1, 2]
|
||||
[2, 2]
|
||||
[3, 0]
|
||||
[4, 0]
|
||||
select u, windowFunnel(86400)(dt, COALESCE(a, '') = 'a1', COALESCE(b, '') = 'b2') as s from funnel_test_non_null group by u order by u format JSONCompactEachRow;
|
||||
[1, 2]
|
||||
[2, 1]
|
||||
[3, 0]
|
||||
[4, 0]
|
||||
select u, windowFunnel(86400)(dt, a is null and b is null) as s from funnel_test_non_null group by u order by u format JSONCompactEachRow;
|
||||
[1, 0]
|
||||
[2, 0]
|
||||
[3, 1]
|
||||
[4, 0]
|
||||
select u, windowFunnel(86400)(dt, a is null, COALESCE(b, '') = 'b3') as s from funnel_test_non_null group by u order by u format JSONCompactEachRow;
|
||||
[1, 0]
|
||||
[2, 0]
|
||||
[3, 1]
|
||||
[4, 2]
|
||||
select u, windowFunnel(86400, 'strict_order')(dt, a is null, COALESCE(b, '') = 'b3') as s from funnel_test_non_null group by u order by u format JSONCompactEachRow;
|
||||
[1, 0]
|
||||
[2, 0]
|
||||
[3, 1]
|
||||
[4, 1]
|
||||
drop table funnel_test_non_null;
|
||||
create table funnel_test_strict_increase (timestamp UInt32, event UInt32) engine=Memory;
|
||||
insert into funnel_test_strict_increase values (0,1000),(1,1001),(1,1002),(1,1003),(2,1004);
|
||||
select 5 = windowFunnel(10000)(timestamp, event = 1000, event = 1001, event = 1002, event = 1003, event = 1004) from funnel_test_strict_increase;
|
||||
1
|
||||
select 2 = windowFunnel(10000, 'strict_increase')(timestamp, event = 1000, event = 1001, event = 1002, event = 1003, event = 1004) from funnel_test_strict_increase;
|
||||
1
|
||||
select 1 = windowFunnel(10000)(timestamp, event = 1004, event = 1004, event = 1004) from funnel_test_strict_increase;
|
||||
1
|
||||
select 1 = windowFunnel(10000, 'strict_increase')(timestamp, event = 1004, event = 1004, event = 1004) from funnel_test_strict_increase;
|
||||
1
|
||||
DROP TABLE IF EXISTS funnel_test2;
|
||||
create table funnel_test2 (event_ts UInt32, result String, uid UInt32) engine=Memory;
|
||||
insert into funnel_test2 SELECT data.1, data.2, data.3 FROM (
|
||||
SELECT arrayJoin([
|
||||
(100, 'failure', 234),
|
||||
(200, 'success', 345),
|
||||
(210, 'failure', 345),
|
||||
(230, 'success', 345),
|
||||
(250, 'failure', 234),
|
||||
(180, 'failure', 123),
|
||||
(220, 'failure', 123),
|
||||
(250, 'success', 123)
|
||||
]) data);
|
||||
SELECT '-';
|
||||
-
|
||||
SELECT uid, windowFunnel(200, 'strict_increase')( toUInt32(event_ts), result='failure', result='failure', result='success' )
|
||||
FROM funnel_test2 WHERE event_ts >= 0 AND event_ts <= 300 GROUP BY uid ORDER BY uid;
|
||||
123 3
|
||||
234 2
|
||||
345 1
|
||||
SELECT '-';
|
||||
-
|
||||
SELECT uid, windowFunnel(200)( toUInt32(event_ts), result='failure', result='failure', result='success' )
|
||||
FROM funnel_test2 WHERE event_ts >= 0 AND event_ts <= 300 GROUP BY uid ORDER BY uid;
|
||||
123 3
|
||||
234 2
|
||||
345 3
|
||||
345 1
|
||||
SELECT '-';
|
||||
-
|
||||
123 1
|
||||
234 1
|
||||
345 3
|
||||
SELECT uid, windowFunnel(200, 'strict_deduplication')( toUInt32(event_ts), result='failure', result='failure', result='success' )
|
||||
FROM funnel_test2 WHERE event_ts >= 0 AND event_ts <= 300 GROUP BY uid ORDER BY uid;
|
||||
123 3
|
||||
234 2
|
||||
345 1
|
||||
SELECT '-';
|
||||
-
|
||||
DROP TABLE IF EXISTS funnel_test2;
|
||||
drop table funnel_test_strict_increase;
|
||||
|
@ -1,3 +1,4 @@
|
||||
-- { echoOn }
|
||||
drop table if exists funnel_test;
|
||||
|
||||
create table funnel_test (timestamp UInt32, event UInt32) engine=Memory;
|
||||
@ -92,7 +93,7 @@ insert into funnel_test_strict_increase values (0,1000),(1,1001),(1,1002),(1,100
|
||||
|
||||
select 5 = windowFunnel(10000)(timestamp, event = 1000, event = 1001, event = 1002, event = 1003, event = 1004) from funnel_test_strict_increase;
|
||||
select 2 = windowFunnel(10000, 'strict_increase')(timestamp, event = 1000, event = 1001, event = 1002, event = 1003, event = 1004) from funnel_test_strict_increase;
|
||||
select 3 = windowFunnel(10000)(timestamp, event = 1004, event = 1004, event = 1004) from funnel_test_strict_increase;
|
||||
select 1 = windowFunnel(10000)(timestamp, event = 1004, event = 1004, event = 1004) from funnel_test_strict_increase;
|
||||
select 1 = windowFunnel(10000, 'strict_increase')(timestamp, event = 1004, event = 1004, event = 1004) from funnel_test_strict_increase;
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user