Merge pull request #39919 from pzhdfy/UniqSketch

UniqThetaSketch support set operation such as union/intersect/not
This commit is contained in:
Kruglov Pavel 2022-09-05 13:42:14 +02:00 committed by GitHub
commit 582216a3ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 515 additions and 1 deletions

@ -1 +1 @@
Subproject commit 7d73d7610db31d4e1ecde0fb3a7ee90ef371207f
Subproject commit 7abd49bb2e72bf9a5029993d31dcb1872da88292

View File

@ -0,0 +1,94 @@
---
slug: /en/sql-reference/functions/uniqtheta-functions
---
# uniqTheta Functions
uniqTheta functions work for two uniqThetaSketch objects to do set operation calculations such as / ∩ / × (union/intersect/not), it is to return a new uniqThetaSketch object contain the result.
A uniqThetaSketch object is to be constructed by aggregation function uniqTheta with -State.
UniqThetaSketch is a data structure storage of approximate values set.
For more information on RoaringBitmap, see: [Theta Sketch Framework](https://datasketches.apache.org/docs/Theta/ThetaSketchFramework.html).
## uniqThetaUnion
Two uniqThetaSketch objects to do union calculation(set operation ), the result is a new uniqThetaSketch.
``` sql
uniqThetaUnion(uniqThetaSketch,uniqThetaSketch)
```
**Arguments**
- `uniqThetaSketch` uniqThetaSketch object.
**Example**
``` sql
select finalizeAggregation(uniqThetaUnion(a, b)) as a_union_b, finalizeAggregation(a) as a_cardinality, finalizeAggregation(b) as b_cardinality
from
(select arrayReduce('uniqThetaState',[1,2]) as a, arrayReduce('uniqThetaState',[2,3,4]) as b );
```
``` text
┌─a_union_b─┬─a_cardinality─┬─b_cardinality─┐
│ 4 │ 2 │ 3 │
└───────────┴───────────────┴───────────────┘
```
## uniqThetaIntersect
Two uniqThetaSketch objects to do intersect calculation(set operation ∩), the result is a new uniqThetaSketch.
``` sql
uniqThetaIntersect(uniqThetaSketch,uniqThetaSketch)
```
**Arguments**
- `uniqThetaSketch` uniqThetaSketch object.
**Example**
``` sql
select finalizeAggregation(uniqThetaIntersect(a, b)) as a_intersect_b, finalizeAggregation(a) as a_cardinality, finalizeAggregation(b) as b_cardinality
from
(select arrayReduce('uniqThetaState',[1,2]) as a, arrayReduce('uniqThetaState',[2,3,4]) as b );
```
``` text
┌─a_intersect_b─┬─a_cardinality─┬─b_cardinality─┐
│ 1 │ 2 │ 3 │
└───────────────┴───────────────┴───────────────┘
```
## uniqThetaNot
Two uniqThetaSketch objects to do a_not_b calculation(set operation ×), the result is a new uniqThetaSketch.
``` sql
uniqThetaNot(uniqThetaSketch,uniqThetaSketch)
```
**Arguments**
- `uniqThetaSketch` uniqThetaSketch object.
**Example**
``` sql
select finalizeAggregation(uniqThetaNot(a, b)) as a_not_b, finalizeAggregation(a) as a_cardinality, finalizeAggregation(b) as b_cardinality
from
(select arrayReduce('uniqThetaState',[2,3,4]) as a, arrayReduce('uniqThetaState',[1,2]) as b );
```
``` text
┌─a_not_b─┬─a_cardinality─┬─b_cardinality─┐
│ 2 │ 3 │ 2 │
└─────────┴───────────────┴───────────────┘
```
**See Also**
- [uniqThetaSketch](../../sql-reference/aggregate-functions/reference/uniqthetasketch.md#agg_function-uniqthetasketch)

View File

@ -9,6 +9,8 @@
#include <base/StringRef.h>
#include <theta_sketch.hpp>
#include <theta_union.hpp>
#include <theta_intersection.hpp>
#include <theta_a_not_b.hpp>
namespace DB
@ -80,6 +82,58 @@ public:
u->update(rhs.sk_union->get_result());
}
void intersect(const ThetaSketchData & rhs)
{
datasketches::theta_union * u = getSkUnion();
if (sk_update)
{
u->update(*sk_update);
sk_update.reset(nullptr);
}
datasketches::theta_intersection theta_intersection;
theta_intersection.update(u->get_result());
if (rhs.sk_update)
theta_intersection.update(*rhs.sk_update);
else if (rhs.sk_union)
theta_intersection.update(rhs.sk_union->get_result());
sk_union.reset(nullptr);
u = getSkUnion();
u->update(theta_intersection.get_result());
}
void aNotB(const ThetaSketchData & rhs)
{
datasketches::theta_union * u = getSkUnion();
if (sk_update)
{
u->update(*sk_update);
sk_update.reset(nullptr);
}
datasketches::theta_a_not_b a_not_b;
if (rhs.sk_update)
{
datasketches::compact_theta_sketch result = a_not_b.compute(u->get_result(), *rhs.sk_update);
sk_union.reset(nullptr);
u = getSkUnion();
u->update(result);
}
else if (rhs.sk_union)
{
datasketches::compact_theta_sketch result = a_not_b.compute(u->get_result(), rhs.sk_union->get_result());
sk_union.reset(nullptr);
u = getSkUnion();
u->update(result);
}
}
/// You can only call for an empty object.
void read(DB::ReadBuffer & in)
{

View File

@ -92,6 +92,11 @@ list (APPEND OBJECT_LIBS $<TARGET_OBJECTS:clickhouse_functions_url>)
add_subdirectory(array)
list (APPEND OBJECT_LIBS $<TARGET_OBJECTS:clickhouse_functions_array>)
if (TARGET ch_contrib::datasketches)
add_subdirectory(UniqTheta)
list (APPEND OBJECT_LIBS $<TARGET_OBJECTS:clickhouse_functions_uniqtheta>)
endif()
add_subdirectory(JSONPath)
list (APPEND PRIVATE_LIBS clickhouse_functions_jsonpath)

View File

@ -0,0 +1,9 @@
include("${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake")
add_library(clickhouse_functions_uniqtheta FunctionsUniqTheta.cpp)
target_link_libraries(clickhouse_functions_uniqtheta PRIVATE dbms)
if (TARGET ch_contrib::datasketches)
target_link_libraries (clickhouse_functions_uniqtheta PRIVATE ch_contrib::datasketches)
endif ()

View File

@ -0,0 +1,68 @@
#include <Functions/FunctionFactory.h>
#include "FunctionsUniqTheta.h"
#if USE_DATASKETCHES
namespace DB
{
REGISTER_FUNCTION(UniqTheta)
{
factory.registerFunction<FunctionUniqThetaIntersect>(
{
R"(
Two uniqThetaSketch objects to do intersect calculation(set operation ), the result is a new uniqThetaSketch.
A uniqThetaSketch object is to be constructed by aggregation function uniqTheta with -State.
UniqThetaSketch is a data structure storage of approximate values set.
For more information on RoaringBitmap, see: [Theta Sketch Framework](https://datasketches.apache.org/docs/Theta/ThetaSketchFramework.html).
Typical usage:
[example:typical]
)",
Documentation::Examples{
{"typical", "select finalizeAggregation(uniqThetaIntersect(arrayReduce('uniqThetaState',[1,2]), arrayReduce('uniqThetaState',[2,3,4])));"}},
Documentation::Categories{"uniqTheta"}
});
factory.registerFunction<FunctionUniqThetaUnion>(
{
R"(
Two uniqThetaSketch objects to do union calculation(set operation ), the result is a new uniqThetaSketch.
A uniqThetaSketch object is to be constructed by aggregation function uniqTheta with -State.
UniqThetaSketch is a data structure storage of approximate values set.
For more information on RoaringBitmap, see: [Theta Sketch Framework](https://datasketches.apache.org/docs/Theta/ThetaSketchFramework.html).
Typical usage:
[example:typical]
)",
Documentation::Examples{
{"typical", "select finalizeAggregation(uniqThetaUnion(arrayReduce('uniqThetaState',[1,2]), arrayReduce('uniqThetaState',[2,3,4])));"}},
Documentation::Categories{"uniqTheta"}
});
factory.registerFunction<FunctionUniqThetaNot>(
{
R"(
Two uniqThetaSketch objects to do a_not_b calculation(set operation ×), the result is a new uniqThetaSketch.
A uniqThetaSketch object is to be constructed by aggregation function uniqTheta with -State.
UniqThetaSketch is a data structure storage of approximate values set.
For more information on RoaringBitmap, see: [Theta Sketch Framework](https://datasketches.apache.org/docs/Theta/ThetaSketchFramework.html).
Typical usage:
[example:typical]
)",
Documentation::Examples{
{"typical", "select finalizeAggregation(uniqThetaNot(arrayReduce('uniqThetaState',[1,2]), arrayReduce('uniqThetaState',[2,3,4])));"}},
Documentation::Categories{"uniqTheta"}
});
}
}
#endif

View File

@ -0,0 +1,176 @@
#pragma once
#include <Common/config.h>
#if USE_DATASKETCHES
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <Columns/ColumnAggregateFunction.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/IFunction.h>
#include <Interpreters/castColumn.h>
#include <Common/assert_cast.h>
#include <Common/typeid_cast.h>
#include <AggregateFunctions/AggregateFunctionUniq.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
struct UniqThetaIntersectImpl
{
static void apply(AggregateFunctionUniqThetaData & sketch_data_1, const AggregateFunctionUniqThetaData & sketch_data_2)
{
sketch_data_1.set.intersect(sketch_data_2.set);
}
};
struct UniqThetaUnionImpl
{
static void apply(AggregateFunctionUniqThetaData & sketch_data_1, const AggregateFunctionUniqThetaData & sketch_data_2)
{
sketch_data_1.set.merge(sketch_data_2.set);
}
};
struct UniqThetaNotImpl
{
static void apply(AggregateFunctionUniqThetaData & sketch_data_1, const AggregateFunctionUniqThetaData & sketch_data_2)
{
sketch_data_1.set.aNotB(sketch_data_2.set);
}
};
template <typename Impl, typename Name>
class FunctionUniqTheta : public IFunction
{
public:
static constexpr auto name = Name::name;
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionUniqTheta>(); }
String getName() const override { return name; }
bool isVariadic() const override { return false; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
size_t getNumberOfArguments() const override { return 2; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
const auto * sketch_type0 = typeid_cast<const DataTypeAggregateFunction *>(arguments[0].get());
if (!(sketch_type0 && sketch_type0->getFunctionName() == "uniqTheta"))
throw Exception(
"First argument for function " + getName() + " must be a uniqTheta but it has type " + arguments[0]->getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
const auto * sketch_type1 = typeid_cast<const DataTypeAggregateFunction *>(arguments[1].get());
if (!(sketch_type1 && sketch_type1->getFunctionName() == "uniqTheta"))
throw Exception(
"Second argument for function " + getName() + " must be a uniqTheta but it has type " + arguments[1]->getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
const DataTypes & arg_data_types0 = sketch_type0->getArgumentsDataTypes();
const DataTypes & arg_data_types1 = sketch_type1->getArgumentsDataTypes();
if (arg_data_types0.size() != arg_data_types1.size())
throw Exception(
"The nested type in uniqThetas must be the same length, but one is " + std::to_string(arg_data_types0.size())
+ ", and the other is " + std::to_string(arg_data_types1.size()),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
size_t types_size = arg_data_types0.size();
for (size_t i = 0; i < types_size; ++i)
{
if (!arg_data_types0[i]->equals(*arg_data_types1[i]))
throw Exception(
"The " + std::to_string(i) + "th nested type in uniqThetas must be the same, but one is " + arg_data_types0[i]->getName()
+ ", and the other is " + arg_data_types1[i]->getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
return arguments[0];
}
bool useDefaultImplementationForConstants() const override { return true; }
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
const ColumnAggregateFunction * column_ptrs[2];
bool is_column_const[2];
for (size_t i = 0; i < 2; ++i)
{
if (const auto * argument_column_const = typeid_cast<const ColumnConst *>(arguments[i].column.get()))
{
column_ptrs[i] = typeid_cast<const ColumnAggregateFunction *>(argument_column_const->getDataColumnPtr().get());
is_column_const[i] = true;
}
else
{
column_ptrs[i] = typeid_cast<const ColumnAggregateFunction *>(arguments[i].column.get());
is_column_const[i] = false;
}
}
auto col_to = ColumnAggregateFunction::create(column_ptrs[0]->getAggregateFunction());
col_to->reserve(input_rows_count);
const PaddedPODArray<AggregateDataPtr> & container0 = column_ptrs[0]->getData();
const PaddedPODArray<AggregateDataPtr> & container1 = column_ptrs[1]->getData();
for (size_t i = 0; i < input_rows_count; ++i)
{
const AggregateDataPtr data_ptr_0 = is_column_const[0] ? container0[0] : container0[i];
const AggregateDataPtr data_ptr_1 = is_column_const[1] ? container1[0] : container1[i];
col_to->insertFrom(data_ptr_0);
AggregateFunctionUniqThetaData & sketch_data_1 = *reinterpret_cast<AggregateFunctionUniqThetaData *>(col_to->getData()[i]);
const AggregateFunctionUniqThetaData & sketch_data_2
= *reinterpret_cast<const AggregateFunctionUniqThetaData *>(data_ptr_1);
Impl::apply(sketch_data_1, sketch_data_2);
}
return col_to;
}
};
struct NameUniqThetaIntersect
{
static constexpr auto name = "uniqThetaIntersect";
};
struct NameUniqThetaUnion
{
static constexpr auto name = "uniqThetaUnion";
};
struct NameUniqThetaNot
{
static constexpr auto name = "uniqThetaNot";
};
using FunctionUniqThetaIntersect = FunctionUniqTheta<UniqThetaIntersectImpl, NameUniqThetaIntersect>;
using FunctionUniqThetaUnion = FunctionUniqTheta<UniqThetaUnionImpl, NameUniqThetaUnion>;
using FunctionUniqThetaNot = FunctionUniqTheta<UniqThetaNotImpl, NameUniqThetaNot>;
}
#endif

View File

@ -0,0 +1,18 @@
uniqTheta union test
0 0 0
4 2 3
4 3 2
uniqTheta intersect test
0 0 0
1 2 3
1 3 2
uniqTheta union test
0 0 0
1 2 3
2 3 2
uniqTheta retention test
4 9 4
uniqTheta retention with AggregatingMergeTree test
0.5 2 4
uniqTheta retention with MergeTree test
0.5 2 4

View File

@ -0,0 +1,90 @@
-- Tags: no-fasttest
SELECT 'uniqTheta union test';
select finalizeAggregation(uniqThetaUnion(a, b)), finalizeAggregation(a), finalizeAggregation(b) from (select arrayReduce('uniqThetaState',[]) as a, arrayReduce('uniqThetaState',[]) as b );
select finalizeAggregation(uniqThetaUnion(a, b)), finalizeAggregation(a), finalizeAggregation(b) from (select arrayReduce('uniqThetaState',[1,2]) as a, arrayReduce('uniqThetaState',[2,3,4]) as b );
select finalizeAggregation(uniqThetaUnion(a, b)), finalizeAggregation(a), finalizeAggregation(b) from (select arrayReduce('uniqThetaState',[2,3,4]) as a, arrayReduce('uniqThetaState',[1,2]) as b );
SELECT 'uniqTheta intersect test';
select finalizeAggregation(uniqThetaIntersect(a, b)), finalizeAggregation(a), finalizeAggregation(b) from (select arrayReduce('uniqThetaState',[]) as a, arrayReduce('uniqThetaState',[]) as b );
select finalizeAggregation(uniqThetaIntersect(a, b)), finalizeAggregation(a), finalizeAggregation(b) from (select arrayReduce('uniqThetaState',[1,2]) as a, arrayReduce('uniqThetaState',[2,3,4]) as b );
select finalizeAggregation(uniqThetaIntersect(a, b)), finalizeAggregation(a), finalizeAggregation(b) from (select arrayReduce('uniqThetaState',[2,3,4]) as a, arrayReduce('uniqThetaState',[1,2]) as b );
SELECT 'uniqTheta union test';
select finalizeAggregation(uniqThetaNot(a, b)), finalizeAggregation(a), finalizeAggregation(b) from (select arrayReduce('uniqThetaState',[]) as a, arrayReduce('uniqThetaState',[]) as b );
select finalizeAggregation(uniqThetaNot(a, b)), finalizeAggregation(a), finalizeAggregation(b) from (select arrayReduce('uniqThetaState',[1,2]) as a, arrayReduce('uniqThetaState',[2,3,4]) as b );
select finalizeAggregation(uniqThetaNot(a, b)), finalizeAggregation(a), finalizeAggregation(b) from (select arrayReduce('uniqThetaState',[2,3,4]) as a, arrayReduce('uniqThetaState',[1,2]) as b );
SELECT 'uniqTheta retention test';
select finalizeAggregation(uniqThetaIntersect(a,b)), finalizeAggregation(a),finalizeAggregation(b) from
(
select (uniqThetaStateIf(number, number>0)) as a, (uniqThetaStateIf(number, number>5)) as b
from
(select number FROM system.numbers LIMIT 10)
);
SELECT 'uniqTheta retention with AggregatingMergeTree test';
DROP TABLE IF EXISTS test1;
CREATE TABLE test1
(
`year` String ,
`uv` AggregateFunction(uniqTheta, Int64)
)
ENGINE = AggregatingMergeTree()
ORDER BY (year);
INSERT INTO TABLE test1(year, uv) select '2021',uniqThetaState(toInt64(1));
INSERT INTO TABLE test1(year, uv) select '2021',uniqThetaState(toInt64(2));
INSERT INTO TABLE test1(year, uv) select '2021',uniqThetaState(toInt64(3));
INSERT INTO TABLE test1(year, uv) select '2021',uniqThetaState(toInt64(4));
INSERT INTO TABLE test1(year, uv) select '2022',uniqThetaState(toInt64(1));
INSERT INTO TABLE test1(year, uv) select '2022',uniqThetaState(toInt64(3));
select finalizeAggregation(uniqThetaIntersect(uv2021,uv2022))/finalizeAggregation(uv2021),finalizeAggregation(uniqThetaIntersect(uv2021,uv2022)),finalizeAggregation(uv2021)
from
(
select uniqThetaMergeStateIf(uv,year='2021') as uv2021, uniqThetaMergeStateIf(uv,year='2022') as uv2022
from test1
);
DROP TABLE IF EXISTS test1;
SELECT 'uniqTheta retention with MergeTree test';
DROP TABLE IF EXISTS test2;
CREATE TABLE test2
(
`year` String ,
`uv` Int64
)
ENGINE = MergeTree()
ORDER BY (year);
INSERT INTO TABLE test2(year, uv) select '2021',1;
INSERT INTO TABLE test2(year, uv) select '2021',2;
INSERT INTO TABLE test2(year, uv) select '2021',3;
INSERT INTO TABLE test2(year, uv) select '2021',4;
INSERT INTO TABLE test2(year, uv) select '2022',1;
INSERT INTO TABLE test2(year, uv) select '2022',3;
select finalizeAggregation(uniqThetaIntersect(uv2021,uv2022))/finalizeAggregation(uv2021),finalizeAggregation(uniqThetaIntersect(uv2021,uv2022)),finalizeAggregation(uv2021)
from
(
select uniqThetaStateIf(uv,year='2021') as uv2021, uniqThetaStateIf(uv,year='2022') as uv2022
from test2
);
DROP TABLE IF EXISTS test2;