2018-05-12 15:49:17 +00:00
|
|
|
#pragma once
|
|
|
|
|
|
|
|
#include <iostream>
|
2018-05-13 09:36:51 +00:00
|
|
|
#include <sstream>
|
|
|
|
#include <unordered_set>
|
2018-05-12 15:49:17 +00:00
|
|
|
#include <Columns/ColumnsNumber.h>
|
2018-05-13 09:36:51 +00:00
|
|
|
#include <DataTypes/DataTypeDateTime.h>
|
|
|
|
#include <DataTypes/DataTypesNumber.h>
|
|
|
|
#include <IO/ReadHelpers.h>
|
|
|
|
#include <IO/WriteHelpers.h>
|
2018-05-12 15:49:17 +00:00
|
|
|
#include <Common/ArenaAllocator.h>
|
2018-05-13 09:36:51 +00:00
|
|
|
#include <ext/range.h>
|
2018-05-12 15:49:17 +00:00
|
|
|
|
|
|
|
#include <AggregateFunctions/IAggregateFunction.h>
|
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
2018-11-22 21:19:58 +00:00
|
|
|
|
2018-05-12 15:49:17 +00:00
|
|
|
/// Error codes used by windowFunnel. They are only declared here (extern);
/// their definitions live in another translation unit.
namespace ErrorCodes
{
    extern const int ILLEGAL_TYPE_OF_ARGUMENT;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int TOO_MANY_ARGUMENTS_FOR_FUNCTION;
}
|
|
|
|
|
|
|
|
/// Strict weak ordering on std::pair that looks only at `.first`.
/// `.second` is ignored, so two pairs with equal keys compare equivalent; combined with
/// stable algorithms this preserves the original relative order of ties.
struct ComparePairFirst final
{
    template <typename T1, typename T2>
    bool operator()(const std::pair<T1, T2> & lhs, const std::pair<T1, T2> & rhs) const
    {
        const auto & left_key = lhs.first;
        const auto & right_key = rhs.first;
        return left_key < right_key;
    }
};
|
|
|
|
|
|
|
|
struct AggregateFunctionWindowFunnelData
|
|
|
|
{
|
|
|
|
static constexpr auto max_events = 32;
|
2018-05-13 08:18:35 +00:00
|
|
|
using TimestampEvent = std::pair<UInt32, UInt8>;
|
2018-05-12 15:49:17 +00:00
|
|
|
|
|
|
|
static constexpr size_t bytes_on_stack = 64;
|
2018-05-13 09:36:51 +00:00
|
|
|
using TimestampEvents = PODArray<TimestampEvent, bytes_on_stack, AllocatorWithStackMemory<Allocator<false>, bytes_on_stack>>;
|
|
|
|
|
2018-05-12 15:49:17 +00:00
|
|
|
using Comparator = ComparePairFirst;
|
|
|
|
|
|
|
|
bool sorted = true;
|
|
|
|
TimestampEvents events_list;
|
|
|
|
|
|
|
|
size_t size() const
|
|
|
|
{
|
|
|
|
return events_list.size();
|
|
|
|
}
|
|
|
|
|
2018-05-13 08:18:35 +00:00
|
|
|
void add(UInt32 timestamp, UInt8 event)
|
2018-05-12 15:49:17 +00:00
|
|
|
{
|
|
|
|
// Since most events should have already been sorted by timestamp.
|
|
|
|
if (sorted && events_list.size() > 0 && events_list.back().first > timestamp)
|
|
|
|
sorted = false;
|
|
|
|
events_list.emplace_back(timestamp, event);
|
|
|
|
}
|
|
|
|
|
|
|
|
void merge(const AggregateFunctionWindowFunnelData & other)
|
|
|
|
{
|
|
|
|
const auto size = events_list.size();
|
|
|
|
|
|
|
|
events_list.insert(std::begin(other.events_list), std::end(other.events_list));
|
|
|
|
|
|
|
|
/// either sort whole container or do so partially merging ranges afterwards
|
|
|
|
if (!sorted && !other.sorted)
|
2018-08-04 05:42:09 +00:00
|
|
|
std::stable_sort(std::begin(events_list), std::end(events_list), Comparator{});
|
2018-05-12 15:49:17 +00:00
|
|
|
else
|
|
|
|
{
|
|
|
|
const auto begin = std::begin(events_list);
|
|
|
|
const auto middle = std::next(begin, size);
|
|
|
|
const auto end = std::end(events_list);
|
|
|
|
|
|
|
|
if (!sorted)
|
2018-08-04 05:42:09 +00:00
|
|
|
std::stable_sort(begin, middle, Comparator{});
|
2018-05-12 15:49:17 +00:00
|
|
|
|
|
|
|
if (!other.sorted)
|
2018-08-04 05:42:09 +00:00
|
|
|
std::stable_sort(middle, end, Comparator{});
|
2018-05-12 15:49:17 +00:00
|
|
|
|
|
|
|
std::inplace_merge(begin, middle, end, Comparator{});
|
|
|
|
}
|
|
|
|
|
|
|
|
sorted = true;
|
|
|
|
}
|
|
|
|
|
|
|
|
void sort()
|
|
|
|
{
|
|
|
|
if (!sorted)
|
2018-05-13 09:31:13 +00:00
|
|
|
{
|
2018-08-04 05:42:09 +00:00
|
|
|
std::stable_sort(std::begin(events_list), std::end(events_list), Comparator{});
|
2018-05-12 15:49:17 +00:00
|
|
|
sorted = true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void serialize(WriteBuffer & buf) const
|
|
|
|
{
|
|
|
|
writeBinary(sorted, buf);
|
|
|
|
writeBinary(events_list.size(), buf);
|
|
|
|
|
|
|
|
for (const auto & events : events_list)
|
|
|
|
{
|
|
|
|
writeBinary(events.first, buf);
|
|
|
|
writeBinary(events.second, buf);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void deserialize(ReadBuffer & buf)
|
|
|
|
{
|
|
|
|
readBinary(sorted, buf);
|
|
|
|
|
|
|
|
size_t size;
|
|
|
|
readBinary(size, buf);
|
2018-05-13 09:36:51 +00:00
|
|
|
|
2018-05-13 09:31:13 +00:00
|
|
|
/// TODO Protection against huge size
|
2018-05-12 15:49:17 +00:00
|
|
|
|
|
|
|
events_list.clear();
|
2018-07-27 07:19:29 +00:00
|
|
|
events_list.reserve(size);
|
2018-05-12 15:49:17 +00:00
|
|
|
|
2018-05-13 08:18:35 +00:00
|
|
|
UInt32 timestamp;
|
2018-05-12 15:49:17 +00:00
|
|
|
UInt8 event;
|
|
|
|
|
|
|
|
for (size_t i = 0; i < size; ++i)
|
|
|
|
{
|
|
|
|
readBinary(timestamp, buf);
|
|
|
|
readBinary(event, buf);
|
|
|
|
events_list.emplace_back(timestamp, event);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
/** Calculates the max event level in a sliding window.
|
|
|
|
* The max size of events is 32, that's enough for funnel analytics
|
|
|
|
*
|
|
|
|
* Usage:
|
2018-05-13 08:18:35 +00:00
|
|
|
* - windowFunnel(window)(timestamp, cond1, cond2, cond3, ....)
|
2018-05-12 15:49:17 +00:00
|
|
|
*/
|
2018-05-13 09:36:51 +00:00
|
|
|
class AggregateFunctionWindowFunnel final
|
|
|
|
: public IAggregateFunctionDataHelper<AggregateFunctionWindowFunnelData, AggregateFunctionWindowFunnel>
|
2018-05-12 15:49:17 +00:00
|
|
|
{
|
|
|
|
private:
|
2018-05-13 08:18:35 +00:00
|
|
|
UInt32 window;
|
2018-05-13 09:36:51 +00:00
|
|
|
UInt8 events_size;
|
2018-05-12 15:49:17 +00:00
|
|
|
|
|
|
|
|
2018-05-13 08:18:35 +00:00
|
|
|
// Loop through the entire events_list, update the event timestamp value
|
|
|
|
// The level path must be 1---2---3---...---check_events_size, find the max event level that statisfied the path in the sliding window.
|
|
|
|
// If found, returns the max event level, else return 0.
|
2018-05-12 15:49:17 +00:00
|
|
|
// The Algorithm complexity is O(n).
|
2018-05-13 08:18:35 +00:00
|
|
|
UInt8 getEventLevel(const AggregateFunctionWindowFunnelData & data) const
|
2018-05-12 15:49:17 +00:00
|
|
|
{
|
2018-05-13 09:31:13 +00:00
|
|
|
if (data.size() == 0)
|
|
|
|
return 0;
|
2018-05-13 08:18:35 +00:00
|
|
|
if (events_size == 1)
|
2018-05-12 15:49:17 +00:00
|
|
|
return 1;
|
|
|
|
|
|
|
|
const_cast<AggregateFunctionWindowFunnelData &>(data).sort();
|
|
|
|
|
2018-05-13 10:05:34 +00:00
|
|
|
// events_timestamp stores the timestamp that latest i-th level event happen withing time window after previous level event.
|
2018-05-13 08:18:35 +00:00
|
|
|
// timestamp defaults to -1, which unsigned timestamp value never meet
|
|
|
|
std::vector<Int32> events_timestamp(events_size, -1);
|
2018-05-13 10:05:34 +00:00
|
|
|
for (const auto & pair : data.events_list)
|
2018-05-12 15:49:17 +00:00
|
|
|
{
|
2018-05-13 10:05:34 +00:00
|
|
|
const auto & timestamp = pair.first;
|
|
|
|
const auto & event_idx = pair.second - 1;
|
2018-05-13 09:36:51 +00:00
|
|
|
if (event_idx == 0)
|
2018-05-12 15:49:17 +00:00
|
|
|
events_timestamp[0] = timestamp;
|
2018-05-13 09:36:51 +00:00
|
|
|
else if (events_timestamp[event_idx - 1] >= 0 && timestamp <= events_timestamp[event_idx - 1] + window)
|
2018-05-12 15:49:17 +00:00
|
|
|
{
|
2018-05-13 08:18:35 +00:00
|
|
|
events_timestamp[event_idx] = events_timestamp[event_idx - 1];
|
2018-05-13 09:36:51 +00:00
|
|
|
if (event_idx + 1 == events_size)
|
|
|
|
return events_size;
|
2018-05-12 15:49:17 +00:00
|
|
|
}
|
|
|
|
}
|
2018-05-13 09:36:51 +00:00
|
|
|
for (size_t event = events_timestamp.size(); event > 0; --event)
|
2018-05-12 15:49:17 +00:00
|
|
|
{
|
2018-05-13 09:36:51 +00:00
|
|
|
if (events_timestamp[event - 1] >= 0)
|
|
|
|
return event;
|
2018-05-12 15:49:17 +00:00
|
|
|
}
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
|
|
|
public:
|
2018-05-13 09:36:51 +00:00
|
|
|
String getName() const override
|
|
|
|
{
|
|
|
|
return "windowFunnel";
|
|
|
|
}
|
2018-05-12 15:49:17 +00:00
|
|
|
|
|
|
|
AggregateFunctionWindowFunnel(const DataTypes & arguments, const Array & params)
|
2019-02-11 19:26:32 +00:00
|
|
|
: IAggregateFunctionDataHelper<AggregateFunctionWindowFunnelData, AggregateFunctionWindowFunnel>(arguments, params)
|
2018-05-12 15:49:17 +00:00
|
|
|
{
|
2018-05-13 08:18:35 +00:00
|
|
|
const auto time_arg = arguments.front().get();
|
2018-09-10 17:09:07 +00:00
|
|
|
if (!WhichDataType(time_arg).isDateTime() && !WhichDataType(time_arg).isUInt32())
|
2018-05-13 09:36:51 +00:00
|
|
|
throw Exception{"Illegal type " + time_arg->getName() + " of first argument of aggregate function " + getName()
|
2018-11-22 21:19:58 +00:00
|
|
|
+ ", must be DateTime or UInt32", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
|
2018-05-13 09:36:51 +00:00
|
|
|
|
|
|
|
for (const auto i : ext::range(1, arguments.size()))
|
|
|
|
{
|
|
|
|
auto cond_arg = arguments[i].get();
|
2018-09-10 17:09:07 +00:00
|
|
|
if (!isUInt8(cond_arg))
|
2018-05-13 09:36:51 +00:00
|
|
|
throw Exception{"Illegal type " + cond_arg->getName() + " of argument " + toString(i + 1) + " of aggregate function "
|
|
|
|
+ getName() + ", must be UInt8",
|
|
|
|
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
|
|
|
|
}
|
|
|
|
|
2018-05-13 08:18:35 +00:00
|
|
|
events_size = arguments.size() - 1;
|
2018-05-13 10:05:34 +00:00
|
|
|
window = params.at(0).safeGet<UInt64>();
|
2018-05-12 15:49:17 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
DataTypePtr getReturnType() const override
|
|
|
|
{
|
|
|
|
return std::make_shared<DataTypeUInt8>();
|
|
|
|
}
|
|
|
|
|
|
|
|
void add(AggregateDataPtr place, const IColumn ** columns, const size_t row_num, Arena *) const override
|
|
|
|
{
|
2018-08-04 05:42:09 +00:00
|
|
|
const auto timestamp = static_cast<const ColumnVector<UInt32> *>(columns[0])->getData()[row_num];
|
|
|
|
// reverse iteration and stable sorting are needed for events that are qualified by more than one condition.
|
|
|
|
for (auto i = events_size; i > 0; --i)
|
2018-05-12 15:49:17 +00:00
|
|
|
{
|
2018-05-13 09:36:51 +00:00
|
|
|
auto event = static_cast<const ColumnVector<UInt8> *>(columns[i])->getData()[row_num];
|
|
|
|
if (event)
|
2018-08-04 05:42:09 +00:00
|
|
|
this->data(place).add(timestamp, i);
|
2018-05-13 08:18:35 +00:00
|
|
|
}
|
2018-05-12 15:49:17 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override
|
|
|
|
{
|
|
|
|
this->data(place).merge(this->data(rhs));
|
|
|
|
}
|
|
|
|
|
|
|
|
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
|
|
|
|
{
|
|
|
|
this->data(place).serialize(buf);
|
|
|
|
}
|
|
|
|
|
|
|
|
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
|
|
|
|
{
|
|
|
|
this->data(place).deserialize(buf);
|
|
|
|
}
|
|
|
|
|
|
|
|
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
|
|
|
{
|
2018-05-13 08:18:35 +00:00
|
|
|
static_cast<ColumnUInt8 &>(to).getData().push_back(getEventLevel(this->data(place)));
|
2018-05-12 15:49:17 +00:00
|
|
|
}
|
|
|
|
|
2018-05-13 09:36:51 +00:00
|
|
|
const char * getHeaderFilePath() const override
|
|
|
|
{
|
|
|
|
return __FILE__;
|
|
|
|
}
|
2018-05-12 15:49:17 +00:00
|
|
|
};
|
2018-05-13 10:05:34 +00:00
|
|
|
|
2018-05-12 15:49:17 +00:00
|
|
|
}
|