mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 17:12:03 +00:00
add FunctionsUniqTheta
This commit is contained in:
parent
9e46abc560
commit
2d5446a86b
@ -9,6 +9,8 @@
|
||||
#include <base/StringRef.h>
|
||||
#include <theta_sketch.hpp>
|
||||
#include <theta_union.hpp>
|
||||
#include <theta_intersection.hpp>
|
||||
#include <theta_a_not_b.hpp>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -80,6 +82,58 @@ public:
|
||||
u->update(rhs.sk_union->get_result());
|
||||
}
|
||||
|
||||
void intersect(const ThetaSketchData & rhs)
|
||||
{
|
||||
datasketches::theta_union * u = getSkUnion();
|
||||
|
||||
if (sk_update)
|
||||
{
|
||||
u->update(*sk_update);
|
||||
sk_update.reset(nullptr);
|
||||
}
|
||||
|
||||
datasketches::theta_intersection theta_intersection;
|
||||
|
||||
theta_intersection.update(u->get_result());
|
||||
|
||||
if (rhs.sk_update)
|
||||
theta_intersection.update(*rhs.sk_update);
|
||||
else if (rhs.sk_union)
|
||||
theta_intersection.update(rhs.sk_union->get_result());
|
||||
|
||||
sk_union.reset(nullptr);
|
||||
u = getSkUnion();
|
||||
u->update(theta_intersection.get_result());
|
||||
}
|
||||
|
||||
void a_not_b(const ThetaSketchData & rhs)
|
||||
{
|
||||
datasketches::theta_union * u = getSkUnion();
|
||||
|
||||
if (sk_update)
|
||||
{
|
||||
u->update(*sk_update);
|
||||
sk_update.reset(nullptr);
|
||||
}
|
||||
|
||||
datasketches::theta_a_not_b a_not_b;
|
||||
|
||||
if (rhs.sk_update)
|
||||
{
|
||||
datasketches::compact_theta_sketch result = a_not_b.compute(u->get_result(), *rhs.sk_update);
|
||||
sk_union.reset(nullptr);
|
||||
u = getSkUnion();
|
||||
u->update(result);
|
||||
}
|
||||
else if (rhs.sk_union)
|
||||
{
|
||||
datasketches::compact_theta_sketch result = a_not_b.compute(u->get_result(), rhs.sk_union->get_result());
|
||||
sk_union.reset(nullptr);
|
||||
u = getSkUnion();
|
||||
u->update(result);
|
||||
}
|
||||
}
|
||||
|
||||
/// You can only call for an empty object.
|
||||
void read(DB::ReadBuffer & in)
|
||||
{
|
||||
|
@ -92,6 +92,11 @@ list (APPEND OBJECT_LIBS $<TARGET_OBJECTS:clickhouse_functions_url>)
|
||||
add_subdirectory(array)
|
||||
list (APPEND OBJECT_LIBS $<TARGET_OBJECTS:clickhouse_functions_array>)
|
||||
|
||||
if (TARGET ch_contrib::datasketches)
|
||||
add_subdirectory(UniqTheta)
|
||||
list (APPEND OBJECT_LIBS $<TARGET_OBJECTS:clickhouse_functions_uniqtheta>)
|
||||
endif()
|
||||
|
||||
add_subdirectory(JSONPath)
|
||||
list (APPEND PRIVATE_LIBS clickhouse_functions_jsonpath)
|
||||
|
||||
|
9
src/Functions/UniqTheta/CMakeLists.txt
Normal file
9
src/Functions/UniqTheta/CMakeLists.txt
Normal file
@ -0,0 +1,9 @@
|
||||
include("${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake")
|
||||
|
||||
add_library(clickhouse_functions_uniqtheta FunctionsUniqTheta.cpp)
|
||||
|
||||
target_link_libraries(clickhouse_functions_uniqtheta PRIVATE dbms)
|
||||
|
||||
if (TARGET ch_contrib::datasketches)
|
||||
target_link_libraries (clickhouse_functions_uniqtheta PRIVATE ch_contrib::datasketches)
|
||||
endif ()
|
19
src/Functions/UniqTheta/FunctionsUniqTheta.cpp
Normal file
19
src/Functions/UniqTheta/FunctionsUniqTheta.cpp
Normal file
@ -0,0 +1,19 @@
|
||||
#include <Functions/FunctionFactory.h>
|
||||
|
||||
#include "FunctionsUniqTheta.h"
|
||||
|
||||
#if USE_DATASKETCHES
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
REGISTER_FUNCTION(UniqTheta)
|
||||
{
|
||||
factory.registerFunction<FunctionUniqThetaIntersect>();
|
||||
factory.registerFunction<FunctionUniqThetaUnion>();
|
||||
factory.registerFunction<FunctionUniqThetaNot>();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#endif
|
164
src/Functions/UniqTheta/FunctionsUniqTheta.h
Normal file
164
src/Functions/UniqTheta/FunctionsUniqTheta.h
Normal file
@ -0,0 +1,164 @@
|
||||
#pragma once
|
||||
|
||||
#include <Common/config.h>
|
||||
|
||||
#if USE_DATASKETCHES
|
||||
|
||||
#include <AggregateFunctions/AggregateFunctionFactory.h>
|
||||
#include <Columns/ColumnAggregateFunction.h>
|
||||
#include <Columns/ColumnArray.h>
|
||||
#include <Columns/ColumnConst.h>
|
||||
#include <Columns/ColumnString.h>
|
||||
#include <Columns/ColumnVector.h>
|
||||
#include <Columns/ColumnsNumber.h>
|
||||
#include <DataTypes/DataTypeAggregateFunction.h>
|
||||
#include <DataTypes/DataTypeArray.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <Functions/FunctionHelpers.h>
|
||||
#include <Functions/IFunction.h>
|
||||
#include <Interpreters/castColumn.h>
|
||||
#include <Common/assert_cast.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
|
||||
#include <AggregateFunctions/AggregateFunctionUniq.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
struct UniqThetaIntersectImpl
|
||||
{
|
||||
static void apply(AggregateFunctionUniqThetaData & sketch_data_1, const AggregateFunctionUniqThetaData & sketch_data_2)
|
||||
{
|
||||
sketch_data_1.set.intersect(sketch_data_2.set);
|
||||
}
|
||||
};
|
||||
|
||||
struct UniqThetaUnionImpl
|
||||
{
|
||||
static void apply(AggregateFunctionUniqThetaData & sketch_data_1, const AggregateFunctionUniqThetaData & sketch_data_2)
|
||||
{
|
||||
sketch_data_1.set.merge(sketch_data_2.set);
|
||||
}
|
||||
};
|
||||
|
||||
struct UniqThetaNotImpl
|
||||
{
|
||||
static void apply(AggregateFunctionUniqThetaData & sketch_data_1, const AggregateFunctionUniqThetaData & sketch_data_2)
|
||||
{
|
||||
sketch_data_1.set.a_not_b(sketch_data_2.set);
|
||||
}
|
||||
};
|
||||
|
||||
template <typename Impl, typename Name>
|
||||
class FunctionUniqTheta : public IFunction
|
||||
{
|
||||
public:
|
||||
static constexpr auto name = Name::name;
|
||||
|
||||
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionUniqTheta>(); }
|
||||
|
||||
String getName() const override { return name; }
|
||||
|
||||
bool isVariadic() const override { return false; }
|
||||
|
||||
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
|
||||
|
||||
size_t getNumberOfArguments() const override { return 2; }
|
||||
|
||||
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
|
||||
{
|
||||
const auto * sketch_type0 = typeid_cast<const DataTypeAggregateFunction *>(arguments[0].get());
|
||||
if (!(sketch_type0 && sketch_type0->getFunctionName() == "uniqTheta"))
|
||||
throw Exception(
|
||||
"First argument for function " + getName() + " must be a uniqTheta but it has type " + arguments[0]->getName(),
|
||||
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
|
||||
|
||||
const auto * sketch_type1 = typeid_cast<const DataTypeAggregateFunction *>(arguments[1].get());
|
||||
if (!(sketch_type1 && sketch_type1->getFunctionName() == "uniqTheta"))
|
||||
throw Exception(
|
||||
"Second argument for function " + getName() + " must be a uniqTheta but it has type " + arguments[1]->getName(),
|
||||
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
|
||||
|
||||
//todo:check
|
||||
if (sketch_type0->getArgumentsDataTypes()[0]->getTypeId() != sketch_type1->getArgumentsDataTypes()[0]->getTypeId())
|
||||
throw Exception(
|
||||
"The nested type in uniqThetas must be the same, but one is " + sketch_type0->getArgumentsDataTypes()[0]->getName()
|
||||
+ ", and the other is " + sketch_type1->getArgumentsDataTypes()[0]->getName(),
|
||||
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
|
||||
|
||||
return arguments[0];
|
||||
}
|
||||
|
||||
bool useDefaultImplementationForConstants() const override { return true; }
|
||||
|
||||
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
|
||||
{
|
||||
const ColumnAggregateFunction * column_ptrs[2];
|
||||
bool is_column_const[2];
|
||||
for (size_t i = 0; i < 2; ++i)
|
||||
{
|
||||
if (const auto * argument_column_const = typeid_cast<const ColumnConst *>(arguments[i].column.get()))
|
||||
{
|
||||
column_ptrs[i] = typeid_cast<const ColumnAggregateFunction *>(argument_column_const->getDataColumnPtr().get());
|
||||
is_column_const[i] = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
column_ptrs[i] = typeid_cast<const ColumnAggregateFunction *>(arguments[i].column.get());
|
||||
is_column_const[i] = false;
|
||||
}
|
||||
}
|
||||
|
||||
auto col_to = ColumnAggregateFunction::create(column_ptrs[0]->getAggregateFunction());
|
||||
|
||||
col_to->reserve(input_rows_count);
|
||||
|
||||
const PaddedPODArray<AggregateDataPtr> & container0 = column_ptrs[0]->getData();
|
||||
const PaddedPODArray<AggregateDataPtr> & container1 = column_ptrs[1]->getData();
|
||||
|
||||
for (size_t i = 0; i < input_rows_count; ++i)
|
||||
{
|
||||
const AggregateDataPtr data_ptr_0 = is_column_const[0] ? container0[0] : container0[i];
|
||||
const AggregateDataPtr data_ptr_1 = is_column_const[1] ? container1[0] : container1[i];
|
||||
|
||||
col_to->insertFrom(data_ptr_0);
|
||||
AggregateFunctionUniqThetaData & sketch_data_1 = *reinterpret_cast<AggregateFunctionUniqThetaData *>(col_to->getData()[i]);
|
||||
const AggregateFunctionUniqThetaData & sketch_data_2
|
||||
= *reinterpret_cast<const AggregateFunctionUniqThetaData *>(data_ptr_1);
|
||||
Impl::apply(sketch_data_1, sketch_data_2);
|
||||
}
|
||||
return col_to;
|
||||
}
|
||||
};
|
||||
|
||||
struct NameUniqThetaIntersect
|
||||
{
|
||||
static constexpr auto name = "uniqThetaIntersect";
|
||||
};
|
||||
|
||||
struct NameUniqThetaUnion
|
||||
{
|
||||
static constexpr auto name = "uniqThetaUnion";
|
||||
};
|
||||
|
||||
struct NameUniqThetaNot
|
||||
{
|
||||
static constexpr auto name = "uniqThetaNot";
|
||||
};
|
||||
|
||||
using FunctionUniqThetaIntersect = FunctionUniqTheta<UniqThetaIntersectImpl, NameUniqThetaIntersect>;
|
||||
using FunctionUniqThetaUnion = FunctionUniqTheta<UniqThetaUnionImpl, NameUniqThetaUnion>;
|
||||
using FunctionUniqThetaNot = FunctionUniqTheta<UniqThetaNotImpl, NameUniqThetaNot>;
|
||||
|
||||
}
|
||||
|
||||
|
||||
#endif
|
Loading…
Reference in New Issue
Block a user