address review comments

This commit is contained in:
pingyu 2021-04-10 23:00:34 +08:00
parent eb4403c572
commit df78a1b3fd
7 changed files with 25 additions and 10 deletions

View File

@ -22,6 +22,8 @@ endif()
# Turn DataSketches support on only when both the library and its include
# directory variables evaluate to true; otherwise set the flag to 0 explicitly.
if (DATASKETCHES_LIBRARY AND DATASKETCHES_INCLUDE_DIR)
set(USE_DATASKETCHES 1)
else()
set(USE_DATASKETCHES 0)
endif()
endif()

View File

@ -133,8 +133,11 @@ void registerAggregateFunctionsUniq(AggregateFunctionFactory & factory)
factory.registerFunction("uniqExact",
{createAggregateFunctionUniq<true, AggregateFunctionUniqExactData, AggregateFunctionUniqExactData<String>>, properties});
#if USE_DATASKETCHES
factory.registerFunction("uniqThetaSketch",
{createAggregateFunctionUniq<AggregateFunctionUniqThetaSketchData, AggregateFunctionUniqThetaSketchDataForVariadic>, properties});
#endif
}
}

View File

@ -126,7 +126,7 @@ struct AggregateFunctionUniqExactData<String>
/// uniqThetaSketch
#if USE_DATASKETCHES
struct AggregateFunctionUniqThetaSketchData
{
using Set = ThetaSketchData<UInt64>;
@ -143,6 +143,7 @@ struct AggregateFunctionUniqThetaSketchDataForVariadic
static String getName() { return "uniqThetaSketch"; }
};
#endif
namespace detail
{
@ -209,10 +210,12 @@ struct OneAdder
data.set.insert(key);
}
}
#if USE_DATASKETCHES
else if constexpr (std::is_same_v<Data, AggregateFunctionUniqThetaSketchData>)
{
data.set.insert_original(column.getDataAt(row_num));
data.set.insertOriginal(column.getDataAt(row_num));
}
#endif
}
};

View File

@ -1,10 +1,14 @@
#pragma once
#include <Common/config.h>
#if USE_DATASKETCHES
#include <boost/noncopyable.hpp>
#include <memory>
#include <theta_sketch.hpp>
#include <theta_union.hpp>
#include <memory>
namespace DB
{
@ -17,14 +21,14 @@ private:
std::unique_ptr<datasketches::update_theta_sketch> sk_update;
std::unique_ptr<datasketches::theta_union> sk_union;
/// Returns a non-owning pointer to the update sketch, creating it on first use
/// from a builder on which no options have been set.
/// NOTE(review): the two signature lines below look like the before/after of a
/// snake_case -> camelCase rename whose diff markers were lost — confirm which one is current.
inline datasketches::update_theta_sketch * get_sk_update()
inline datasketches::update_theta_sketch * getSkUpdate()
{
if (!sk_update)
sk_update = std::make_unique<datasketches::update_theta_sketch>(datasketches::update_theta_sketch::builder().build());
return sk_update.get();
}
inline datasketches::theta_union * get_sk_union()
inline datasketches::theta_union * getSkUnion()
{
if (!sk_union)
sk_union = std::make_unique<datasketches::theta_union>(datasketches::theta_union::builder().build());
@ -38,15 +42,15 @@ public:
~ThetaSketchData() = default;
/// Insert the original value without hashing it first, because
/// `datasketches::update_theta_sketch::update` hashes its input internally.
/// The value is handed over as `value.data` together with `value.size`.
/// NOTE(review): the paired snake_case / camelCase lines below look like the before/after
/// of a rename whose diff markers were lost — confirm which lines are current.
void insert_original(const StringRef & value)
void insertOriginal(const StringRef & value)
{
get_sk_update()->update(value.data, value.size);
getSkUpdate()->update(value.data, value.size);
}
/// Insert a key value into the update sketch.
/// Note that `datasketches::update_theta_sketch::update` will hash the value again.
/// NOTE(review): the two `update` calls below look like the before/after of a
/// snake_case -> camelCase rename whose diff markers were lost — confirm which one is current.
void insert(Key value)
{
get_sk_update()->update(value);
getSkUpdate()->update(value);
}
UInt64 size() const
@ -61,7 +65,7 @@ public:
void merge(const ThetaSketchData & rhs)
{
datasketches::theta_union * u = get_sk_union();
datasketches::theta_union * u = getSkUnion();
if (sk_update)
{
@ -83,7 +87,7 @@ public:
if (!bytes.empty())
{
auto sk = datasketches::compact_theta_sketch::deserialize(bytes.data(), bytes.size());
get_sk_union()->update(sk);
getSkUnion()->update(sk);
}
}
@ -109,3 +113,5 @@ public:
}
#endif

View File

@ -15,3 +15,4 @@
#cmakedefine01 USE_GRPC
#cmakedefine01 USE_STATS
#cmakedefine01 CLICKHOUSE_SPLIT_BINARY
#cmakedefine01 USE_DATASKETCHES