This commit is contained in:
pingyu 2021-03-29 00:46:50 +08:00
parent 48b745206c
commit 1ac42e003c
2 changed files with 48 additions and 25 deletions

View File

@ -19,16 +19,17 @@ class ThetaSketchData : private boost::noncopyable
private:
mutable datasketches::update_theta_sketch sk_update;
mutable datasketches::theta_union sk_union;
bool is_merged;
Poco::Logger * log;
void internal_merge() const
{
if (!sk_update.is_empty())
{
sk_union.update(sk_update);
sk_update = datasketches::update_theta_sketch::builder().build();
}
}
// void internal_merge() const
// {
// if (!sk_update.is_empty())
// {
// sk_union.update(sk_update);
// sk_update = datasketches::update_theta_sketch::builder().build();
// }
// }
public:
using value_type = Key;
@ -36,63 +37,84 @@ public:
ThetaSketchData()
: sk_update(datasketches::update_theta_sketch::builder().build()),
sk_union(datasketches::theta_union::builder().build()),
is_merged(false),
log(&Poco::Logger::get("ThetaSketchData"))
{
}
~ThetaSketchData() = default;
/// Insert original value without hash, as `datasketches::update_theta_sketch.update` will do the hash internal.
void insert_original(StringRef value)
void insert_original(const StringRef & value)
{
sk_update.update(value.data, value.size);
LOG_WARNING(log, "insert_origin() {}, {}", value.toString(), sk_update.to_string());
LOG_WARNING(log, "insert_origin() {}", value.toString());
}
/// Note that `datasketches::update_theta_sketch.update` will do the hash again.
void insert(Key value)
{
sk_update.update(value);
LOG_WARNING(log, "insert() {}, {}", value, sk_update.to_string());
LOG_WARNING(log, "insert() {}", value);
}
UInt64 size() const
{
LOG_WARNING(log, "size() update:{}, union:{}", sk_update.get_estimate(), sk_union.get_result().get_estimate());
internal_merge();
return static_cast<UInt64>(sk_union.get_result().get_estimate());
if (!is_merged)
return static_cast<UInt64>(sk_update.get_estimate());
else
return static_cast<UInt64>(sk_union.get_result().get_estimate());
}
void merge(const ThetaSketchData & rhs)
{
rhs.internal_merge();
sk_union.update(rhs.sk_union.get_result());
if (!is_merged && !sk_update.is_empty())
{
sk_union.update(sk_update);
}
is_merged = true;
LOG_WARNING(log, "merge() result:{}, rhs:{}", sk_union.get_result().to_string(), rhs.sk_union.get_result().to_string());
if (!rhs.is_merged && !rhs.sk_update.is_empty())
sk_union.update(rhs.sk_update);
else if (rhs.is_merged)
sk_union.update(rhs.sk_union.get_result());
LOG_WARNING(log, "merge() result:{}", sk_union.get_result().to_string());
}
/// You can only call for an empty object.
void read(DB::ReadBuffer & in)
{
LOG_WARNING(log, "read() {}", sk_union.get_result().to_string());
datasketches::compact_theta_sketch::vector_bytes bytes;
readVectorBinary(bytes, in);
auto sk = datasketches::compact_theta_sketch::deserialize(bytes.data(), bytes.size());
sk_union = datasketches::theta_union::builder().build();
sk_union.update(sk);
is_merged = true;
LOG_WARNING(log, "read() {}", sk_union.get_result().to_string());
LOG_WARNING(log, "read()[after] {}", sk_union.get_result().to_string());
}
void readAndMerge(DB::ReadBuffer &)
{
assert(0);
}
// void readAndMerge(DB::ReadBuffer &)
// {
// LOG_WARNING(log, "readAndMerge() {}", sk_union.get_result().to_string());
// }
void write(DB::WriteBuffer & out) const
{
internal_merge();
auto bytes = sk_union.get_result().serialize();
writeVectorBinary(bytes, out);
if (!is_merged)
{
auto bytes = sk_update.compact().serialize();
writeVectorBinary(bytes, out);
}
else
{
auto bytes = sk_union.get_result().serialize();
writeVectorBinary(bytes, out);
}
LOG_WARNING(log, "write() {}", sk_union.get_result().to_string());
}

View File

@ -12,7 +12,8 @@ namespace DB
static bool isUniq(const ASTFunction & func)
{
return func.name == "uniq" || func.name == "uniqExact" || func.name == "uniqHLL12"
|| func.name == "uniqCombined" || func.name == "uniqCombined64";
|| func.name == "uniqCombined" || func.name == "uniqCombined64"
|| func.name == "uniqThetaSketchState";
}
/// Remove injective functions of one argument: replace with a child