diff --git a/src/AggregateFunctions/AggregateFunctionNull.h b/src/AggregateFunctions/AggregateFunctionNull.h index 5c94e68cb26..3bfcacf7d7b 100644 --- a/src/AggregateFunctions/AggregateFunctionNull.h +++ b/src/AggregateFunctions/AggregateFunctionNull.h @@ -157,8 +157,15 @@ public: ColumnNullable & to_concrete = assert_cast(to); if (getFlag(place)) { - nested_function->insertResultInto(nestedPlace(place), to_concrete.getNestedColumn(), arena); - to_concrete.getNullMapData().push_back(0); + if (unlikely(nested_function->doesInsertResultNeedNullableColumn())) + { + nested_function->insertResultInto(nestedPlace(place), to_concrete, arena); + } + else + { + nested_function->insertResultInto(nestedPlace(place), to_concrete.getNestedColumn(), arena); + to_concrete.getNullMapData().push_back(0); + } } else { diff --git a/src/AggregateFunctions/AggregateFunctionSequenceNextNode.cpp b/src/AggregateFunctions/AggregateFunctionSequenceNextNode.cpp new file mode 100644 index 00000000000..66f24ec8cbf --- /dev/null +++ b/src/AggregateFunctions/AggregateFunctionSequenceNextNode.cpp @@ -0,0 +1,102 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include "registerAggregateFunctions.h" + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int ILLEGAL_TYPE_OF_ARGUMENT; + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int BAD_ARGUMENTS; +} + +namespace +{ + +template +inline AggregateFunctionPtr createAggregateFunctionSequenceNextNodeImpl(const DataTypePtr data_type, const DataTypes & argument_types, bool descending_order) +{ + if (descending_order) + return std::make_shared>(data_type, argument_types); + else + return std::make_shared>(data_type, argument_types); +} + +AggregateFunctionPtr createAggregateFunctionSequenceNextNode(const std::string & name, const DataTypes & argument_types, const Array & parameters) +{ + bool descending_order = false; + + if (parameters.size() == 1) + { + auto type = parameters[0].getType(); + if (type != Field::Types::Int64 && type != Field::Types::UInt64) + throw Exception("The first parameter for aggregate function " + name + " should be 0 or 1", ErrorCodes::BAD_ARGUMENTS); + + descending_order = parameters[0].get(); + } + else + throw Exception("Incorrect number of parameters for aggregate function " + name + ", should be 1", + ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + + if (argument_types.size() < 3) + throw Exception("Aggregate function " + name + " requires at least three arguments.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + else if (argument_types.size() > 2 + 64) + throw Exception("Aggregate function " + name + " requires at most 66(timestamp, value_column, 64 events) arguments.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + + for (const auto i : ext::range(2, argument_types.size())) + { + const auto * cond_arg = argument_types[i].get(); + if (!isUInt8(cond_arg)) + throw Exception{"Illegal type " + cond_arg->getName() + " of argument " + toString(i + 1) + " of aggregate function " + + name + ", must be UInt8", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT}; + } + + if (WhichDataType(argument_types[1].get()).idx != TypeIndex::String) + throw Exception{"Illegal type " + argument_types.front().get()->getName() + + " of second argument of aggregate function " + name + ", must be String", + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT}; + + DataTypePtr data_type; + if (typeid_cast(argument_types[1].get())) + data_type = argument_types[1]; + else + data_type = std::make_shared(argument_types[1]); + + WhichDataType timestamp_type(argument_types[0].get()); + if (timestamp_type.idx == TypeIndex::UInt8) + return createAggregateFunctionSequenceNextNodeImpl(data_type, argument_types, descending_order); + if (timestamp_type.idx == TypeIndex::UInt16) + return createAggregateFunctionSequenceNextNodeImpl(data_type, argument_types, descending_order); + if (timestamp_type.idx == TypeIndex::UInt32) + return createAggregateFunctionSequenceNextNodeImpl(data_type, argument_types, descending_order); + if (timestamp_type.idx == TypeIndex::UInt64) + return createAggregateFunctionSequenceNextNodeImpl(data_type, argument_types, descending_order); + if (timestamp_type.isDate()) + return createAggregateFunctionSequenceNextNodeImpl(data_type, argument_types, descending_order); + if (timestamp_type.isDateTime()) + return createAggregateFunctionSequenceNextNodeImpl(data_type, argument_types, descending_order); + + throw Exception{"Illegal type " + argument_types.front().get()->getName() + + " of first argument of aggregate function " + name + ", must be Unsigned Number, Date, DateTime", + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT}; +} + +} + +void registerAggregateFunctionSequenceNextNode(AggregateFunctionFactory & factory) +{ + AggregateFunctionProperties properties = { .returns_default_when_only_null = true, .is_order_dependent = false }; + + factory.registerFunction("sequenceNextNode", { createAggregateFunctionSequenceNextNode, properties }); +} + +} diff --git a/src/AggregateFunctions/AggregateFunctionSequenceNextNode.h b/src/AggregateFunctions/AggregateFunctionSequenceNextNode.h new file mode 100644 index 00000000000..a455e16e267 --- /dev/null +++ b/src/AggregateFunctions/AggregateFunctionSequenceNextNode.h @@ -0,0 +1,298 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include + +#include + +#include + + +namespace DB +{ + +template +struct NodeBase +{ + UInt64 size; // size of payload + + DataTypeDateTime::FieldType event_time; + UInt64 events_bitmap; + + /// Returns pointer to actual payload + char * data() { return reinterpret_cast(this) + sizeof(Node); } + + const char * data() const { return reinterpret_cast(this) + sizeof(Node); } + + /// Clones existing node (does not modify next field) + Node * clone(Arena * arena) const + { + return reinterpret_cast( + const_cast(arena->alignedInsert(reinterpret_cast(this), sizeof(Node) + size, alignof(Node)))); + } + + /// Write node to buffer + void write(WriteBuffer & buf) const + { + writeVarUInt(size, buf); + buf.write(data(), size); + } + + /// Reads and allocates node from ReadBuffer's data (doesn't set next) + static Node * read(ReadBuffer & buf, Arena * arena) + { + UInt64 size; + readVarUInt(size, buf); + + Node * node = reinterpret_cast(arena->alignedAlloc(sizeof(Node) + size, alignof(Node))); + node->size = size; + buf.read(node->data(), size); + return node; + } +}; + +struct NodeString : public NodeBase +{ + using Node = NodeString; + + /// Create node from string + static Node * allocate(const IColumn & column, size_t row_num, Arena * arena) + { + StringRef string = assert_cast(column).getDataAt(row_num); + + Node * node = reinterpret_cast(arena->alignedAlloc(sizeof(Node) + string.size, alignof(Node))); + node->size = string.size; + memcpy(node->data(), string.data, string.size); + + return node; + } + + void insertInto(IColumn & column) + { + assert_cast(column).insertData(data(), size); + } +}; + +template +struct SequenceNextNodeGeneralData +{ + // Switch to ordinary Allocator after 4096 bytes to avoid fragmentation and trash in Arena + using Allocator = MixedAlignedArenaAllocator; + using Array = PODArray; + + Array value; + bool sorted = false; + + struct Comparator final + { + bool operator()(const Node * lhs, const Node * rhs) const + { + if (Descending) + return lhs->event_time == rhs->event_time ? + lhs->events_bitmap < rhs->events_bitmap: lhs->event_time > rhs->event_time; + else + return lhs->event_time == rhs->event_time ? + lhs->events_bitmap < rhs->events_bitmap : lhs->event_time < rhs->event_time; + } + }; + + void sort() + { + if (!sorted) + { + std::stable_sort(std::begin(value), std::end(value), Comparator{}); + sorted = true; + } + } +}; + +/// Implementation of groupArray for String or any ComplexObject via Array +template +class SequenceNextNodeImpl final + : public IAggregateFunctionDataHelper, SequenceNextNodeImpl> +{ + using Data = SequenceNextNodeGeneralData; + static Data & data(AggregateDataPtr place) { return *reinterpret_cast(place); } + static const Data & data(ConstAggregateDataPtr place) { return *reinterpret_cast(place); } + + DataTypePtr & data_type; + UInt8 events_size; + UInt64 max_elems; + +public: + SequenceNextNodeImpl(const DataTypePtr & data_type_, const DataTypes & arguments, UInt64 max_elems_ = std::numeric_limits::max()) + : IAggregateFunctionDataHelper, SequenceNextNodeImpl>( + {data_type_}, {}) + , data_type(this->argument_types[0]) + , events_size(arguments.size() - 2) + , max_elems(max_elems_) + { + } + + String getName() const override { return "sequenceNextNode"; } + + DataTypePtr getReturnType() const override { return data_type; } + + void insert(Data & a, const Node * v, Arena * arena) const + { + ++a.total_values; + a.value.push_back(v->clone(arena), arena); + } + + void create(AggregateDataPtr place) const override + { + [[maybe_unused]] auto a = new (place) Data; + } + + void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override + { + Node * node = Node::allocate(*columns[1], row_num, arena); + + const auto timestamp = assert_cast *>(columns[0])->getData()[row_num]; + + UInt64 events_bitmap = 0; + for (UInt8 i = 0; i < events_size; ++i) + if (assert_cast *>(columns[2 + i])->getData()[row_num]) + events_bitmap += (1 << i); + + node->event_time = timestamp; + node->events_bitmap = events_bitmap; + + data(place).value.push_back(node, arena); + } + + void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override + { + if (data(rhs).value.empty()) /// rhs state is empty + return; + + UInt64 new_elems; + if (data(place).value.size() >= max_elems) + return; + + new_elems = std::min(data(rhs).value.size(), static_cast(max_elems) - data(place).value.size()); + + auto & a = data(place).value; + const auto size = a.size(); + + auto & b = data(rhs).value; + for (UInt64 i = 0; i < new_elems; ++i) + a.push_back(b[i]->clone(arena), arena); + + using Comparator = typename SequenceNextNodeGeneralData::Comparator; + + /// either sort whole container or do so partially merging ranges afterwards + if (!data(place).sorted && !data(rhs).sorted) + std::stable_sort(std::begin(a), std::end(a), Comparator{}); + else + { + const auto begin = std::begin(a); + const auto middle = std::next(begin, size); + const auto end = std::end(a); + + if (!data(place).sorted) + std::stable_sort(begin, middle, Comparator{}); + + if (!data(rhs).sorted) + std::stable_sort(middle, end, Comparator{}); + + std::inplace_merge(begin, middle, end, Comparator{}); + } + + data(place).sorted = true; + } + + void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override + { + writeVarUInt(data(place).value.size(), buf); + + auto & value = data(place).value; + for (auto & node : value) + node->write(buf); + } + + void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override + { + UInt64 elems; + readVarUInt(elems, buf); + + if (unlikely(elems == 0)) + return; + + auto & value = data(place).value; + + value.resize(elems, arena); + for (UInt64 i = 0; i < elems; ++i) + value[i] = Node::read(buf, arena); + } + + inline UInt64 getSkipCount(const Data & data, const UInt64 i, const UInt64 j) const + { + UInt64 k = 0; + for (; k < events_size - j; ++k) + if (data.value[i - j]->events_bitmap & (1 << (events_size - 1 - j - k))) + return k; + return k; + } + + UInt64 getNextNodeIndex(Data & data) const + { + if (data.value.size() <= events_size) + return 0; + + data.sort(); + + UInt64 i = events_size - 1; + while (i < data.value.size()) + { + UInt64 j = 0; + for (; j < events_size; ++j) + if (!(data.value[i - j]->events_bitmap & (1 << (events_size - 1 - j)))) + break; + + if (j == events_size) + return i + 1; + + i += getSkipCount(data, i, j); + } + + return 0; + } + + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override + { + auto & value = data(place).value; + + UInt64 event_idx = getNextNodeIndex(this->data(place)); + if (event_idx != 0 && event_idx < value.size()) + { + ColumnNullable & to_concrete = assert_cast(to); + value[event_idx]->insertInto(to_concrete.getNestedColumn()); + to_concrete.getNullMapData().push_back(0); + } + else + to.insertDefault(); + } + + bool doesInsertResultNeedNullableColumn() const override { return true; } + + bool allocatesMemoryInArena() const override { return true; } +}; + +} diff --git a/src/AggregateFunctions/IAggregateFunction.h b/src/AggregateFunctions/IAggregateFunction.h index a9fe26688d7..d9570fa5f8b 100644 --- a/src/AggregateFunctions/IAggregateFunction.h +++ b/src/AggregateFunctions/IAggregateFunction.h @@ -112,6 +112,7 @@ public: /// in `runningAccumulate`, or when calculating an aggregate function as a /// window function. virtual void insertResultInto(AggregateDataPtr place, IColumn & to, Arena * arena) const = 0; + virtual bool doesInsertResultNeedNullableColumn() const { return false; } /// Used for machine learning methods. Predict result from trained model. /// Will insert result into `to` column for rows in range [offset, offset + limit). diff --git a/src/AggregateFunctions/registerAggregateFunctions.cpp b/src/AggregateFunctions/registerAggregateFunctions.cpp index d8e4eb7ba98..28b758aee2c 100644 --- a/src/AggregateFunctions/registerAggregateFunctions.cpp +++ b/src/AggregateFunctions/registerAggregateFunctions.cpp @@ -44,6 +44,7 @@ void registerAggregateFunctionRankCorrelation(AggregateFunctionFactory &); void registerAggregateFunctionMannWhitney(AggregateFunctionFactory &); void registerAggregateFunctionWelchTTest(AggregateFunctionFactory &); void registerAggregateFunctionStudentTTest(AggregateFunctionFactory &); +void registerAggregateFunctionSequenceNextNode(AggregateFunctionFactory &); class AggregateFunctionCombinatorFactory; void registerAggregateFunctionCombinatorIf(AggregateFunctionCombinatorFactory &); @@ -101,6 +102,7 @@ void registerAggregateFunctions() registerAggregateFunctionMannWhitney(factory); registerAggregateFunctionWelchTTest(factory); registerAggregateFunctionStudentTTest(factory); + registerAggregateFunctionSequenceNextNode(factory); } { diff --git a/src/Columns/ColumnNullable.cpp b/src/Columns/ColumnNullable.cpp index 35ce005073a..a9be62e515f 100644 --- a/src/Columns/ColumnNullable.cpp +++ b/src/Columns/ColumnNullable.cpp @@ -34,7 +34,6 @@ ColumnNullable::ColumnNullable(MutableColumnPtr && nested_column_, MutableColumn throw Exception{"ColumnNullable cannot have constant null map", ErrorCodes::ILLEGAL_COLUMN}; } - void ColumnNullable::updateHashWithValue(size_t n, SipHash & hash) const { const auto & arr = getNullMapData(); diff --git a/tests/queries/0_stateless/01656_sequence_next_node.reference b/tests/queries/0_stateless/01656_sequence_next_node.reference new file mode 100644 index 00000000000..540e5bdfb5a --- /dev/null +++ b/tests/queries/0_stateless/01656_sequence_next_node.reference @@ -0,0 +1,120 @@ +(0, A) 1 B +(0, A) 2 B +(0, A) 3 B +(0, A) 4 A +(0, A) 5 B +(0, A) 6 B +(0, B) 1 C +(0, B) 2 D +(0, B) 3 \N +(0, B) 4 C +(0, B) 5 A +(0, B) 6 A +(0, C) 1 D +(0, C) 2 \N +(0, C) 3 \N +(0, C) 4 \N +(0, C) 5 \N +(0, C) 6 \N +(0, D) 1 \N +(0, D) 2 C +(0, D) 3 \N +(0, D) 4 \N +(0, D) 5 \N +(0, D) 6 \N +(0, E) 1 \N +(0, E) 2 \N +(0, E) 3 \N +(0, E) 4 \N +(0, E) 5 \N +(0, E) 6 \N +(1, A) 1 \N +(1, A) 2 \N +(1, A) 3 \N +(1, A) 4 A +(1, A) 5 B +(1, A) 6 B +(1, B) 1 A +(1, B) 2 A +(1, B) 3 A +(1, B) 4 A +(1, B) 5 A +(1, B) 6 A +(1, C) 1 B +(1, C) 2 D +(1, C) 3 \N +(1, C) 4 B +(1, C) 5 A +(1, C) 6 B +(1, D) 1 C +(1, D) 2 B +(1, D) 3 \N +(1, D) 4 \N +(1, D) 5 \N +(1, D) 6 \N +(1, E) 1 \N +(1, E) 2 \N +(1, E) 3 \N +(1, E) 4 \N +(1, E) 5 \N +(1, E) 6 \N +(0, A->B) 1 C +(0, A->B) 2 D +(0, A->B) 3 \N +(0, A->B) 4 C +(0, A->B) 5 A +(0, A->B) 6 A +(0, A->C) 1 \N +(0, A->C) 2 \N +(0, A->C) 3 \N +(0, A->C) 4 \N +(0, A->C) 5 \N +(0, A->C) 6 \N +(0, B->A) 1 \N +(0, B->A) 2 \N +(0, B->A) 3 \N +(0, B->A) 4 \N +(0, B->A) 5 C +(0, B->A) 6 B +(1, A->B) 1 \N +(1, A->B) 2 \N +(1, A->B) 3 \N +(1, A->B) 4 \N +(1, A->B) 5 A +(1, A->B) 6 A +(1, A->C) 1 \N +(1, A->C) 2 \N +(1, A->C) 3 \N +(1, A->C) 4 \N +(1, A->C) 5 \N +(1, A->C) 6 \N +(1, B->A) 1 \N +(1, B->A) 2 \N +(1, B->A) 3 \N +(1, B->A) 4 A +(1, B->A) 5 \N +(1, B->A) 6 B +(0, A->A->B) 1 \N +(0, A->A->B) 2 \N +(0, A->A->B) 3 \N +(0, A->A->B) 4 C +(0, A->A->B) 5 \N +(0, A->A->B) 6 \N +(0, B->A->A) 1 \N +(0, B->A->A) 2 \N +(0, B->A->A) 3 \N +(0, B->A->A) 4 \N +(0, B->A->A) 5 \N +(0, B->A->A) 6 \N +(1, A->A->B) 1 \N +(1, A->A->B) 2 \N +(1, A->A->B) 3 \N +(1, A->A->B) 4 \N +(1, A->A->B) 5 \N +(1, A->A->B) 6 \N +(1, B->A->A) 1 \N +(1, B->A->A) 2 \N +(1, B->A->A) 3 \N +(1, B->A->A) 4 A +(1, B->A->A) 5 \N +(1, B->A->A) 6 \N diff --git a/tests/queries/0_stateless/01656_sequence_next_node.sql b/tests/queries/0_stateless/01656_sequence_next_node.sql new file mode 100644 index 00000000000..b11a5c7bc0e --- /dev/null +++ b/tests/queries/0_stateless/01656_sequence_next_node.sql @@ -0,0 +1,51 @@ +DROP TABLE IF EXISTS test_sequenceNextNode; + +CREATE TABLE iF NOT EXISTS test_sequenceNextNode (dt DateTime, id int, action String) ENGINE = MergeTree() PARTITION BY dt ORDER BY id; + +INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:01',1,'A'); +INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:02',1,'B'); +INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:03',1,'C'); +INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:04',1,'D'); +INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:01',2,'A'); +INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:02',2,'B'); +INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:03',2,'D'); +INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:04',2,'C'); +INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:01',3,'A'); +INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:02',3,'B'); +INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:01',4,'A'); +INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:02',4,'A'); +INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:03',4,'A'); +INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:04',4,'B'); +INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:05',4,'C'); +INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:01',5,'A'); +INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:02',5,'B'); +INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:03',5,'A'); +INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:04',5,'C'); +INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:01',6,'A'); +INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:02',6,'B'); +INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:03',6,'A'); +INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:04',6,'B'); +INSERT INTO test_sequenceNextNode values ('1970-01-01 09:00:05',6,'C'); + +SELECT '(0, A)', id, sequenceNextNode(0)(dt, action, action = 'A') AS next_node FROM test GROUP BY id ORDER BY id; +SELECT '(0, B)', id, sequenceNextNode(0)(dt, action, action = 'B') AS next_node FROM test GROUP BY id ORDER BY id; +SELECT '(0, C)', id, sequenceNextNode(0)(dt, action, action = 'C') AS next_node FROM test GROUP BY id ORDER BY id; +SELECT '(0, D)', id, sequenceNextNode(0)(dt, action, action = 'D') AS next_node FROM test GROUP BY id ORDER BY id; +SELECT '(0, E)', id, sequenceNextNode(0)(dt, action, action = 'E') AS next_node FROM test GROUP BY id ORDER BY id; +SELECT '(1, A)', id, sequenceNextNode(1)(dt, action, action = 'A') AS next_node FROM test GROUP BY id ORDER BY id; +SELECT '(1, B)', id, sequenceNextNode(1)(dt, action, action = 'B') AS next_node FROM test GROUP BY id ORDER BY id; +SELECT '(1, C)', id, sequenceNextNode(1)(dt, action, action = 'C') AS next_node FROM test GROUP BY id ORDER BY id; +SELECT '(1, D)', id, sequenceNextNode(1)(dt, action, action = 'D') AS next_node FROM test GROUP BY id ORDER BY id; +SELECT '(1, E)', id, sequenceNextNode(1)(dt, action, action = 'E') AS next_node FROM test GROUP BY id ORDER BY id; +SELECT '(0, A->B)', id, sequenceNextNode(0)(dt, action, action = 'A', action = 'B') AS next_node FROM test GROUP BY id ORDER BY id; +SELECT '(0, A->C)', id, sequenceNextNode(0)(dt, action, action = 'A', action = 'C') AS next_node FROM test GROUP BY id ORDER BY id; +SELECT '(0, B->A)', id, sequenceNextNode(0)(dt, action, action = 'B', action = 'A') AS next_node FROM test GROUP BY id ORDER BY id; +SELECT '(1, A->B)', id, sequenceNextNode(1)(dt, action, action = 'A', action = 'B') AS next_node FROM test GROUP BY id ORDER BY id; +SELECT '(1, A->C)', id, sequenceNextNode(1)(dt, action, action = 'A', action = 'C') AS next_node FROM test GROUP BY id ORDER BY id; +SELECT '(1, B->A)', id, sequenceNextNode(1)(dt, action, action = 'B', action = 'A') AS next_node FROM test GROUP BY id ORDER BY id; +SELECT '(0, A->A->B)', id, sequenceNextNode(0)(dt, action, action = 'A', action = 'A', action = 'B') AS next_node FROM test GROUP BY id ORDER BY id; +SELECT '(0, B->A->A)', id, sequenceNextNode(0)(dt, action, action = 'B', action = 'A', action = 'A') AS next_node FROM test GROUP BY id ORDER BY id; +SELECT '(1, A->A->B)', id, sequenceNextNode(1)(dt, action, action = 'A', action = 'A', action = 'B') AS next_node FROM test GROUP BY id ORDER BY id; +SELECT '(1, B->A->A)', id, sequenceNextNode(1)(dt, action, action = 'B', action = 'A', action = 'A') AS next_node FROM test GROUP BY id ORDER BY id; + +DROP TABLE IF EXISTS test_sequenceNextNode;