Merge pull request #9938 from achimbab/pr_windowFunnel

Fix bugs in windowFunnel()
This commit is contained in:
alexey-milovidov 2020-03-31 22:44:07 +03:00 committed by GitHub
commit 8dffb62b47
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 73 additions and 9 deletions

View File

@ -51,6 +51,10 @@ public:
if (!has_nullable_types)
throw Exception("Aggregate function combinator 'Null' requires at least one argument to be Nullable", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if (nested_function)
if (auto adapter = nested_function->getOwnNullAdapter(nested_function, arguments, params))
return adapter;
/// Special case for 'count' function. It could be called with Nullable arguments
/// - that means - count number of calls, when all arguments are not NULL.
if (nested_function && nested_function->getName() == "count")
@ -71,9 +75,9 @@ public:
else
{
if (return_type_is_nullable)
return std::make_shared<AggregateFunctionNullVariadic<true>>(nested_function, arguments, params);
return std::make_shared<AggregateFunctionNullVariadic<true, true>>(nested_function, arguments, params);
else
return std::make_shared<AggregateFunctionNullVariadic<false>>(nested_function, arguments, params);
return std::make_shared<AggregateFunctionNullVariadic<false, true>>(nested_function, arguments, params);
}
}
};

View File

@ -204,12 +204,12 @@ public:
};
template <bool result_is_nullable>
class AggregateFunctionNullVariadic final : public AggregateFunctionNullBase<result_is_nullable, AggregateFunctionNullVariadic<result_is_nullable>>
template <bool result_is_nullable, bool null_is_skipped>
class AggregateFunctionNullVariadic final : public AggregateFunctionNullBase<result_is_nullable, AggregateFunctionNullVariadic<result_is_nullable, null_is_skipped>>
{
public:
AggregateFunctionNullVariadic(AggregateFunctionPtr nested_function_, const DataTypes & arguments, const Array & params)
: AggregateFunctionNullBase<result_is_nullable, AggregateFunctionNullVariadic<result_is_nullable>>(std::move(nested_function_), arguments, params),
: AggregateFunctionNullBase<result_is_nullable, AggregateFunctionNullVariadic<result_is_nullable, null_is_skipped>>(std::move(nested_function_), arguments, params),
number_of_arguments(arguments.size())
{
if (number_of_arguments == 1)
@ -233,7 +233,7 @@ public:
if (is_nullable[i])
{
const ColumnNullable & nullable_col = assert_cast<const ColumnNullable &>(*columns[i]);
if (nullable_col.isNullAt(row_num))
if (null_is_skipped && nullable_col.isNullAt(row_num))
{
/// If at least one column has a null value in the current row,
/// we don't process this row.

View File

@ -11,7 +11,7 @@
#include <Common/ArenaAllocator.h>
#include <Common/assert_cast.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <AggregateFunctions/AggregateFunctionNull.h>
namespace DB
{
@ -186,6 +186,14 @@ private:
{
return event_idx + 1;
}
else if (strict_order && first_event && events_timestamp[event_idx - 1] == -1)
{
for (size_t event = 0; event < events_timestamp.size(); ++event)
{
if (events_timestamp[event] == -1)
return event;
}
}
else if (events_timestamp[event_idx - 1] >= 0 && timestamp <= events_timestamp[event_idx - 1] + window)
{
events_timestamp[event_idx] = events_timestamp[event_idx - 1];
@ -232,6 +240,11 @@ public:
return std::make_shared<DataTypeUInt8>();
}
AggregateFunctionPtr getOwnNullAdapter(const AggregateFunctionPtr & nested_function, const DataTypes & arguments, const Array & params) const override
{
return std::make_shared<AggregateFunctionNullVariadic<false, false>>(nested_function, arguments, params);
}
void add(AggregateDataPtr place, const IColumn ** columns, const size_t row_num, Arena *) const override
{
bool has_event = false;

View File

@ -31,6 +31,8 @@ using DataTypes = std::vector<DataTypePtr>;
using AggregateDataPtr = char *;
using ConstAggregateDataPtr = const char *;
class IAggregateFunction;
using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>;
/** Aggregate functions interface.
* Instances of classes with this interface do not contain the data itself for aggregation,
@ -149,6 +151,17 @@ public:
virtual void addBatchArray(
size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, const UInt64 * offsets, Arena * arena) const = 0;
/** By default all NULLs are skipped during aggregation.
* If it returns nullptr, the default one will be used.
* If an aggregate function wants to use something instead of the default one, it overrides this function and returns its own null adapter.
* nested_function is a smart pointer to this aggregate function itself.
* arguments and params are for nested_function.
*/
virtual AggregateFunctionPtr getOwnNullAdapter(const AggregateFunctionPtr & /*nested_function*/, const DataTypes & /*arguments*/, const Array & /*params*/) const
{
return nullptr;
}
const DataTypes & getArgumentTypes() const { return argument_types; }
const Array & getParameters() const { return parameters; }
@ -244,6 +257,4 @@ public:
};
using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>;
}

View File

@ -36,3 +36,24 @@
[4, 2]
[5, 2]
[6, 1]
[7, 1]
[1, 2]
[2, 2]
[3, 0]
[4, 0]
[1, 2]
[2, 1]
[3, 0]
[4, 0]
[1, 0]
[2, 0]
[3, 1]
[4, 0]
[1, 0]
[2, 0]
[3, 1]
[4, 2]
[1, 0]
[2, 0]
[3, 1]
[4, 1]

View File

@ -63,4 +63,19 @@ insert into funnel_test_strict_order values (1, 6, 'c') (2, 6, 'c') (3, 6, 'b')
select user, windowFunnel(86400)(dt, event='a', event='b', event='c') as s from funnel_test_strict_order group by user order by user format JSONCompactEachRow;
select user, windowFunnel(86400, 'strict_order')(dt, event='a', event='b', event='c') as s from funnel_test_strict_order group by user order by user format JSONCompactEachRow;
select user, windowFunnel(86400, 'strict', 'strict_order')(dt, event='a', event='b', event='c') as s from funnel_test_strict_order group by user order by user format JSONCompactEachRow;
insert into funnel_test_strict_order values (1, 7, 'a') (2, 7, 'c') (3, 7, 'b');
select user, windowFunnel(10, 'strict_order')(dt, event = 'a', event = 'b', event = 'c') as s from funnel_test_strict_order where user = 7 group by user format JSONCompactEachRow;
drop table funnel_test_strict_order;
drop table if exists funnel_test_non_null;
create table funnel_test_non_null (`dt` DateTime, `u` int, `a` Nullable(String), `b` Nullable(String)) engine = MergeTree() partition by dt order by u;
insert into funnel_test_non_null values (1, 1, 'a1', 'b1') (2, 1, 'a2', 'b2');
insert into funnel_test_non_null values (1, 2, 'a1', null) (2, 2, 'a2', null);
insert into funnel_test_non_null values (1, 3, null, null);
insert into funnel_test_non_null values (1, 4, null, 'b1') (2, 4, 'a2', null) (3, 4, null, 'b3');
select u, windowFunnel(86400)(dt, a = 'a1', a = 'a2') as s from funnel_test_non_null group by u order by u format JSONCompactEachRow;
select u, windowFunnel(86400)(dt, a = 'a1', b = 'b2') as s from funnel_test_non_null group by u order by u format JSONCompactEachRow;
select u, windowFunnel(86400)(dt, a is null and b is null) as s from funnel_test_non_null group by u order by u format JSONCompactEachRow;
select u, windowFunnel(86400)(dt, a is null, b = 'b3') as s from funnel_test_non_null group by u order by u format JSONCompactEachRow;
select u, windowFunnel(86400, 'strict_order')(dt, a is null, b = 'b3') as s from funnel_test_non_null group by u order by u format JSONCompactEachRow;
drop table funnel_test_non_null;