Implement sequenceNextNode

This commit is contained in:
achimbab 2021-01-20 19:17:34 +09:00
parent b77e3a485b
commit fce1ca255d
8 changed files with 583 additions and 3 deletions

View File

@ -156,10 +156,17 @@ public:
{
ColumnNullable & to_concrete = assert_cast<ColumnNullable &>(to);
if (getFlag(place))
{
if (unlikely(nested_function->doesInsertResultNeedNullableColumn()))
{
nested_function->insertResultInto(nestedPlace(place), to_concrete, arena);
}
else
{
nested_function->insertResultInto(nestedPlace(place), to_concrete.getNestedColumn(), arena);
to_concrete.getNullMapData().push_back(0);
}
}
else
{
to_concrete.insertDefault();

View File

@ -0,0 +1,102 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionSequenceNextNode.h>
#include <AggregateFunctions/Helpers.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeNullable.h>
#include <ext/range.h>
#include "registerAggregateFunctions.h"
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int BAD_ARGUMENTS;
}
namespace
{
template <typename TYPE>
inline AggregateFunctionPtr createAggregateFunctionSequenceNextNodeImpl(const DataTypePtr data_type, const DataTypes & argument_types, bool descending_order)
{
if (descending_order)
return std::make_shared<SequenceNextNodeImpl<TYPE, NodeString, true>>(data_type, argument_types);
else
return std::make_shared<SequenceNextNodeImpl<TYPE, NodeString, false>>(data_type, argument_types);
}
AggregateFunctionPtr createAggregateFunctionSequenceNextNode(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
bool descending_order = false;
if (parameters.size() == 1)
{
auto type = parameters[0].getType();
if (type != Field::Types::Int64 && type != Field::Types::UInt64)
throw Exception("The first parameter for aggregate function " + name + " should be 0 or 1", ErrorCodes::BAD_ARGUMENTS);
descending_order = parameters[0].get<UInt64>();
}
else
throw Exception("Incorrect number of parameters for aggregate function " + name + ", should be 1",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
if (argument_types.size() < 3)
throw Exception("Aggregate function " + name + " requires at least three arguments.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
else if (argument_types.size() > 2 + 64)
throw Exception("Aggregate function " + name + " requires at most 66(timestamp, value_column, 64 events) arguments.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (const auto i : ext::range(2, argument_types.size()))
{
const auto * cond_arg = argument_types[i].get();
if (!isUInt8(cond_arg))
throw Exception{"Illegal type " + cond_arg->getName() + " of argument " + toString(i + 1) + " of aggregate function "
+ name + ", must be UInt8", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
}
if (WhichDataType(argument_types[1].get()).idx != TypeIndex::String)
throw Exception{"Illegal type " + argument_types.front().get()->getName()
+ " of second argument of aggregate function " + name + ", must be String",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
DataTypePtr data_type;
if (typeid_cast<const DataTypeNullable *>(argument_types[1].get()))
data_type = argument_types[1];
else
data_type = std::make_shared<DataTypeNullable>(argument_types[1]);
WhichDataType timestamp_type(argument_types[0].get());
if (timestamp_type.idx == TypeIndex::UInt8)
return createAggregateFunctionSequenceNextNodeImpl<UInt8>(data_type, argument_types, descending_order);
if (timestamp_type.idx == TypeIndex::UInt16)
return createAggregateFunctionSequenceNextNodeImpl<UInt16>(data_type, argument_types, descending_order);
if (timestamp_type.idx == TypeIndex::UInt32)
return createAggregateFunctionSequenceNextNodeImpl<UInt32>(data_type, argument_types, descending_order);
if (timestamp_type.idx == TypeIndex::UInt64)
return createAggregateFunctionSequenceNextNodeImpl<UInt64>(data_type, argument_types, descending_order);
if (timestamp_type.isDate())
return createAggregateFunctionSequenceNextNodeImpl<DataTypeDate::FieldType>(data_type, argument_types, descending_order);
if (timestamp_type.isDateTime())
return createAggregateFunctionSequenceNextNodeImpl<DataTypeDateTime::FieldType>(data_type, argument_types, descending_order);
throw Exception{"Illegal type " + argument_types.front().get()->getName()
+ " of first argument of aggregate function " + name + ", must be Unsigned Number, Date, DateTime",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
}
}
void registerAggregateFunctionSequenceNextNode(AggregateFunctionFactory & factory)
{
AggregateFunctionProperties properties = { .returns_default_when_only_null = true, .is_order_dependent = false };
factory.registerFunction("sequenceNextNode", { createAggregateFunctionSequenceNextNode, properties });
}
}

View File

@ -0,0 +1,298 @@
#pragma once
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeDateTime.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnNullable.h>
#include <Common/ArenaAllocator.h>
#include <Common/assert_cast.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <type_traits>
namespace DB
{
template <typename Node>
struct NodeBase
{
UInt64 size; // size of payload
DataTypeDateTime::FieldType event_time;
UInt64 events_bitmap;
/// Returns pointer to actual payload
char * data() { return reinterpret_cast<char *>(this) + sizeof(Node); }
const char * data() const { return reinterpret_cast<const char *>(this) + sizeof(Node); }
/// Clones existing node (does not modify next field)
Node * clone(Arena * arena) const
{
return reinterpret_cast<Node *>(
const_cast<char *>(arena->alignedInsert(reinterpret_cast<const char *>(this), sizeof(Node) + size, alignof(Node))));
}
/// Write node to buffer
void write(WriteBuffer & buf) const
{
writeVarUInt(size, buf);
buf.write(data(), size);
}
/// Reads and allocates node from ReadBuffer's data (doesn't set next)
static Node * read(ReadBuffer & buf, Arena * arena)
{
UInt64 size;
readVarUInt(size, buf);
Node * node = reinterpret_cast<Node *>(arena->alignedAlloc(sizeof(Node) + size, alignof(Node)));
node->size = size;
buf.read(node->data(), size);
return node;
}
};
struct NodeString : public NodeBase<NodeString>
{
using Node = NodeString;
/// Create node from string
static Node * allocate(const IColumn & column, size_t row_num, Arena * arena)
{
StringRef string = assert_cast<const ColumnString &>(column).getDataAt(row_num);
Node * node = reinterpret_cast<Node *>(arena->alignedAlloc(sizeof(Node) + string.size, alignof(Node)));
node->size = string.size;
memcpy(node->data(), string.data, string.size);
return node;
}
void insertInto(IColumn & column)
{
assert_cast<ColumnString &>(column).insertData(data(), size);
}
};
template <typename T, typename Node, bool Descending>
struct SequenceNextNodeGeneralData
{
// Switch to ordinary Allocator after 4096 bytes to avoid fragmentation and trash in Arena
using Allocator = MixedAlignedArenaAllocator<alignof(Node *), 4096>;
using Array = PODArray<Node *, 32, Allocator>;
Array value;
bool sorted = false;
struct Comparator final
{
bool operator()(const Node * lhs, const Node * rhs) const
{
if (Descending)
return lhs->event_time == rhs->event_time ?
lhs->events_bitmap < rhs->events_bitmap: lhs->event_time > rhs->event_time;
else
return lhs->event_time == rhs->event_time ?
lhs->events_bitmap < rhs->events_bitmap : lhs->event_time < rhs->event_time;
}
};
void sort()
{
if (!sorted)
{
std::stable_sort(std::begin(value), std::end(value), Comparator{});
sorted = true;
}
}
};
/// Implementation of groupArray for String or any ComplexObject via Array
template <typename T, typename Node, bool Descending>
class SequenceNextNodeImpl final
: public IAggregateFunctionDataHelper<SequenceNextNodeGeneralData<T, Node, Descending>, SequenceNextNodeImpl<T, Node, Descending>>
{
using Data = SequenceNextNodeGeneralData<T, Node, Descending>;
static Data & data(AggregateDataPtr place) { return *reinterpret_cast<Data *>(place); }
static const Data & data(ConstAggregateDataPtr place) { return *reinterpret_cast<const Data *>(place); }
DataTypePtr & data_type;
UInt8 events_size;
UInt64 max_elems;
public:
SequenceNextNodeImpl(const DataTypePtr & data_type_, const DataTypes & arguments, UInt64 max_elems_ = std::numeric_limits<UInt64>::max())
: IAggregateFunctionDataHelper<SequenceNextNodeGeneralData<T, Node, Descending>, SequenceNextNodeImpl<T, Node, Descending>>(
{data_type_}, {})
, data_type(this->argument_types[0])
, events_size(arguments.size() - 2)
, max_elems(max_elems_)
{
}
String getName() const override { return "sequenceNextNode"; }
DataTypePtr getReturnType() const override { return data_type; }
void insert(Data & a, const Node * v, Arena * arena) const
{
++a.total_values;
a.value.push_back(v->clone(arena), arena);
}
void create(AggregateDataPtr place) const override
{
[[maybe_unused]] auto a = new (place) Data;
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override
{
Node * node = Node::allocate(*columns[1], row_num, arena);
const auto timestamp = assert_cast<const ColumnVector<T> *>(columns[0])->getData()[row_num];
UInt64 events_bitmap = 0;
for (UInt8 i = 0; i < events_size; ++i)
if (assert_cast<const ColumnVector<UInt8> *>(columns[2 + i])->getData()[row_num])
events_bitmap += (1 << i);
node->event_time = timestamp;
node->events_bitmap = events_bitmap;
data(place).value.push_back(node, arena);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
if (data(rhs).value.empty()) /// rhs state is empty
return;
UInt64 new_elems;
if (data(place).value.size() >= max_elems)
return;
new_elems = std::min(data(rhs).value.size(), static_cast<size_t>(max_elems) - data(place).value.size());
auto & a = data(place).value;
const auto size = a.size();
auto & b = data(rhs).value;
for (UInt64 i = 0; i < new_elems; ++i)
a.push_back(b[i]->clone(arena), arena);
using Comparator = typename SequenceNextNodeGeneralData<T, Node, Descending>::Comparator;
/// either sort whole container or do so partially merging ranges afterwards
if (!data(place).sorted && !data(rhs).sorted)
std::stable_sort(std::begin(a), std::end(a), Comparator{});
else
{
const auto begin = std::begin(a);
const auto middle = std::next(begin, size);
const auto end = std::end(a);
if (!data(place).sorted)
std::stable_sort(begin, middle, Comparator{});
if (!data(rhs).sorted)
std::stable_sort(middle, end, Comparator{});
std::inplace_merge(begin, middle, end, Comparator{});
}
data(place).sorted = true;
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
writeVarUInt(data(place).value.size(), buf);
auto & value = data(place).value;
for (auto & node : value)
node->write(buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override
{
UInt64 elems;
readVarUInt(elems, buf);
if (unlikely(elems == 0))
return;
auto & value = data(place).value;
value.resize(elems, arena);
for (UInt64 i = 0; i < elems; ++i)
value[i] = Node::read(buf, arena);
}
inline UInt64 getSkipCount(const Data & data, const UInt64 i, const UInt64 j) const
{
UInt64 k = 0;
for (; k < events_size - j; ++k)
if (data.value[i - j]->events_bitmap & (1 << (events_size - 1 - j - k)))
return k;
return k;
}
UInt64 getNextNodeIndex(Data & data) const
{
if (data.value.size() <= events_size)
return 0;
data.sort();
UInt64 i = events_size - 1;
while (i < data.value.size())
{
UInt64 j = 0;
for (; j < events_size; ++j)
if (!(data.value[i - j]->events_bitmap & (1 << (events_size - 1 - j))))
break;
if (j == events_size)
return i + 1;
i += getSkipCount(data, i, j);
}
return 0;
}
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
auto & value = data(place).value;
UInt64 event_idx = getNextNodeIndex(this->data(place));
if (event_idx != 0 && event_idx < value.size())
{
ColumnNullable & to_concrete = assert_cast<ColumnNullable &>(to);
value[event_idx]->insertInto(to_concrete.getNestedColumn());
to_concrete.getNullMapData().push_back(0);
}
else
to.insertDefault();
}
bool doesInsertResultNeedNullableColumn() const override { return true; }
bool allocatesMemoryInArena() const override { return true; }
};
}

View File

@ -112,6 +112,7 @@ public:
/// in `runningAccumulate`, or when calculating an aggregate function as a
/// window function.
virtual void insertResultInto(AggregateDataPtr place, IColumn & to, Arena * arena) const = 0;
virtual bool doesInsertResultNeedNullableColumn() const { return false; }
/// Used for machine learning methods. Predict result from trained model.
/// Will insert result into `to` column for rows in range [offset, offset + limit).

View File

@ -44,6 +44,7 @@ void registerAggregateFunctionRankCorrelation(AggregateFunctionFactory &);
void registerAggregateFunctionMannWhitney(AggregateFunctionFactory &);
void registerAggregateFunctionWelchTTest(AggregateFunctionFactory &);
void registerAggregateFunctionStudentTTest(AggregateFunctionFactory &);
void registerAggregateFunctionSequenceNextNode(AggregateFunctionFactory &);
class AggregateFunctionCombinatorFactory;
void registerAggregateFunctionCombinatorIf(AggregateFunctionCombinatorFactory &);
@ -101,6 +102,7 @@ void registerAggregateFunctions()
registerAggregateFunctionMannWhitney(factory);
registerAggregateFunctionWelchTTest(factory);
registerAggregateFunctionStudentTTest(factory);
registerAggregateFunctionSequenceNextNode(factory);
}
{

View File

@ -34,7 +34,6 @@ ColumnNullable::ColumnNullable(MutableColumnPtr && nested_column_, MutableColumn
throw Exception{"ColumnNullable cannot have constant null map", ErrorCodes::ILLEGAL_COLUMN};
}
void ColumnNullable::updateHashWithValue(size_t n, SipHash & hash) const
{
const auto & arr = getNullMapData();

View File

@ -0,0 +1,120 @@
(0, A) 1 B
(0, A) 2 B
(0, A) 3 B
(0, A) 4 A
(0, A) 5 B
(0, A) 6 B
(0, B) 1 C
(0, B) 2 D
(0, B) 3 \N
(0, B) 4 C
(0, B) 5 A
(0, B) 6 A
(0, C) 1 D
(0, C) 2 \N
(0, C) 3 \N
(0, C) 4 \N
(0, C) 5 \N
(0, C) 6 \N
(0, D) 1 \N
(0, D) 2 C
(0, D) 3 \N
(0, D) 4 \N
(0, D) 5 \N
(0, D) 6 \N
(0, E) 1 \N
(0, E) 2 \N
(0, E) 3 \N
(0, E) 4 \N
(0, E) 5 \N
(0, E) 6 \N
(1, A) 1 \N
(1, A) 2 \N
(1, A) 3 \N
(1, A) 4 A
(1, A) 5 B
(1, A) 6 B
(1, B) 1 A
(1, B) 2 A
(1, B) 3 A
(1, B) 4 A
(1, B) 5 A
(1, B) 6 A
(1, C) 1 B
(1, C) 2 D
(1, C) 3 \N
(1, C) 4 B
(1, C) 5 A
(1, C) 6 B
(1, D) 1 C
(1, D) 2 B
(1, D) 3 \N
(1, D) 4 \N
(1, D) 5 \N
(1, D) 6 \N
(1, E) 1 \N
(1, E) 2 \N
(1, E) 3 \N
(1, E) 4 \N
(1, E) 5 \N
(1, E) 6 \N
(0, A->B) 1 C
(0, A->B) 2 D
(0, A->B) 3 \N
(0, A->B) 4 C
(0, A->B) 5 A
(0, A->B) 6 A
(0, A->C) 1 \N
(0, A->C) 2 \N
(0, A->C) 3 \N
(0, A->C) 4 \N
(0, A->C) 5 \N
(0, A->C) 6 \N
(0, B->A) 1 \N
(0, B->A) 2 \N
(0, B->A) 3 \N
(0, B->A) 4 \N
(0, B->A) 5 C
(0, B->A) 6 B
(1, A->B) 1 \N
(1, A->B) 2 \N
(1, A->B) 3 \N
(1, A->B) 4 \N
(1, A->B) 5 A
(1, A->B) 6 A
(1, A->C) 1 \N
(1, A->C) 2 \N
(1, A->C) 3 \N
(1, A->C) 4 \N
(1, A->C) 5 \N
(1, A->C) 6 \N
(1, B->A) 1 \N
(1, B->A) 2 \N
(1, B->A) 3 \N
(1, B->A) 4 A
(1, B->A) 5 \N
(1, B->A) 6 B
(0, A->A->B) 1 \N
(0, A->A->B) 2 \N
(0, A->A->B) 3 \N
(0, A->A->B) 4 C
(0, A->A->B) 5 \N
(0, A->A->B) 6 \N
(0, B->A->A) 1 \N
(0, B->A->A) 2 \N
(0, B->A->A) 3 \N
(0, B->A->A) 4 \N
(0, B->A->A) 5 \N
(0, B->A->A) 6 \N
(1, A->A->B) 1 \N
(1, A->A->B) 2 \N
(1, A->A->B) 3 \N
(1, A->A->B) 4 \N
(1, A->A->B) 5 \N
(1, A->A->B) 6 \N
(1, B->A->A) 1 \N
(1, B->A->A) 2 \N
(1, B->A->A) 3 \N
(1, B->A->A) 4 A
(1, B->A->A) 5 \N
(1, B->A->A) 6 \N

View File

@ -0,0 +1,51 @@
DROP TABLE IF EXISTS test_sequenceNextNode;
CREATE TABLE iF NOT EXISTS test_sequenceNextNode (dt DateTime, id int, action String) ENGINE = MergeTree() PARTITION BY dt ORDER BY id;
INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:01',1,'A');
INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:02',1,'B');
INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:03',1,'C');
INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:04',1,'D');
INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:01',2,'A');
INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:02',2,'B');
INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:03',2,'D');
INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:04',2,'C');
INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:01',3,'A');
INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:02',3,'B');
INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:01',4,'A');
INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:02',4,'A');
INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:03',4,'A');
INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:04',4,'B');
INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:05',4,'C');
INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:01',5,'A');
INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:02',5,'B');
INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:03',5,'A');
INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:04',5,'C');
INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:01',6,'A');
INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:02',6,'B');
INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:03',6,'A');
INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:04',6,'B');
INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:05',6,'C');
SELECT '(0, A)', id, sequenceNextNode(0)(dt, action, action = 'A') AS next_node FROM test GROUP BY id ORDER BY id;
SELECT '(0, B)', id, sequenceNextNode(0)(dt, action, action = 'B') AS next_node FROM test GROUP BY id ORDER BY id;
SELECT '(0, C)', id, sequenceNextNode(0)(dt, action, action = 'C') AS next_node FROM test GROUP BY id ORDER BY id;
SELECT '(0, D)', id, sequenceNextNode(0)(dt, action, action = 'D') AS next_node FROM test GROUP BY id ORDER BY id;
SELECT '(0, E)', id, sequenceNextNode(0)(dt, action, action = 'E') AS next_node FROM test GROUP BY id ORDER BY id;
SELECT '(1, A)', id, sequenceNextNode(1)(dt, action, action = 'A') AS next_node FROM test GROUP BY id ORDER BY id;
SELECT '(1, B)', id, sequenceNextNode(1)(dt, action, action = 'B') AS next_node FROM test GROUP BY id ORDER BY id;
SELECT '(1, C)', id, sequenceNextNode(1)(dt, action, action = 'C') AS next_node FROM test GROUP BY id ORDER BY id;
SELECT '(1, D)', id, sequenceNextNode(1)(dt, action, action = 'D') AS next_node FROM test GROUP BY id ORDER BY id;
SELECT '(1, E)', id, sequenceNextNode(1)(dt, action, action = 'E') AS next_node FROM test GROUP BY id ORDER BY id;
SELECT '(0, A->B)', id, sequenceNextNode(0)(dt, action, action = 'A', action = 'B') AS next_node FROM test GROUP BY id ORDER BY id;
SELECT '(0, A->C)', id, sequenceNextNode(0)(dt, action, action = 'A', action = 'C') AS next_node FROM test GROUP BY id ORDER BY id;
SELECT '(0, B->A)', id, sequenceNextNode(0)(dt, action, action = 'B', action = 'A') AS next_node FROM test GROUP BY id ORDER BY id;
SELECT '(1, A->B)', id, sequenceNextNode(1)(dt, action, action = 'A', action = 'B') AS next_node FROM test GROUP BY id ORDER BY id;
SELECT '(1, A->C)', id, sequenceNextNode(1)(dt, action, action = 'A', action = 'C') AS next_node FROM test GROUP BY id ORDER BY id;
SELECT '(1, B->A)', id, sequenceNextNode(1)(dt, action, action = 'B', action = 'A') AS next_node FROM test GROUP BY id ORDER BY id;
SELECT '(0, A->A->B)', id, sequenceNextNode(0)(dt, action, action = 'A', action = 'A', action = 'B') AS next_node FROM test GROUP BY id ORDER BY id;
SELECT '(0, B->A->A)', id, sequenceNextNode(0)(dt, action, action = 'B', action = 'A', action = 'A') AS next_node FROM test GROUP BY id ORDER BY id;
SELECT '(1, A->A->B)', id, sequenceNextNode(1)(dt, action, action = 'A', action = 'A', action = 'B') AS next_node FROM test GROUP BY id ORDER BY id;
SELECT '(1, B->A->A)', id, sequenceNextNode(1)(dt, action, action = 'B', action = 'A', action = 'A') AS next_node FROM test GROUP BY id ORDER BY id;
DROP TABLE IF EXISTS test_sequenceNextNode;