diff --git a/src/AggregateFunctions/ThetaSketchData.h b/src/AggregateFunctions/ThetaSketchData.h index 0604c0eca64..bf419020609 100644 --- a/src/AggregateFunctions/ThetaSketchData.h +++ b/src/AggregateFunctions/ThetaSketchData.h @@ -19,16 +19,17 @@ class ThetaSketchData : private boost::noncopyable private: mutable datasketches::update_theta_sketch sk_update; mutable datasketches::theta_union sk_union; + bool is_merged; Poco::Logger * log; - void internal_merge() const - { - if (!sk_update.is_empty()) - { - sk_union.update(sk_update); - sk_update = datasketches::update_theta_sketch::builder().build(); - } - } + // void internal_merge() const + // { + // if (!sk_update.is_empty()) + // { + // sk_union.update(sk_update); + // sk_update = datasketches::update_theta_sketch::builder().build(); + // } + // } public: using value_type = Key; @@ -36,63 +37,84 @@ public: ThetaSketchData() : sk_update(datasketches::update_theta_sketch::builder().build()), sk_union(datasketches::theta_union::builder().build()), + is_merged(false), log(&Poco::Logger::get("ThetaSketchData")) { } ~ThetaSketchData() = default; /// Insert original value without hash, as `datasketches::update_theta_sketch.update` will do the hash internal. - void insert_original(StringRef value) + void insert_original(const StringRef & value) { sk_update.update(value.data, value.size); - LOG_WARNING(log, "insert_origin() {}, {}", value.toString(), sk_update.to_string()); + LOG_WARNING(log, "insert_origin() {}", value.toString()); } /// Note that `datasketches::update_theta_sketch.update` will do the hash again. void insert(Key value) { sk_update.update(value); - LOG_WARNING(log, "insert() {}, {}", value, sk_update.to_string()); + LOG_WARNING(log, "insert() {}", value); } UInt64 size() const { LOG_WARNING(log, "size() update:{}, union:{}", sk_update.get_estimate(), sk_union.get_result().get_estimate()); - internal_merge(); - return static_cast(sk_union.get_result().get_estimate()); + if (!is_merged) + return static_cast(sk_update.get_estimate()); + else + return static_cast(sk_union.get_result().get_estimate()); } void merge(const ThetaSketchData & rhs) { - rhs.internal_merge(); - sk_union.update(rhs.sk_union.get_result()); + if (!is_merged && !sk_update.is_empty()) + { + sk_union.update(sk_update); + } + is_merged = true; - LOG_WARNING(log, "merge() result:{}, rhs:{}", sk_union.get_result().to_string(), rhs.sk_union.get_result().to_string()); + if (!rhs.is_merged && !rhs.sk_update.is_empty()) + sk_union.update(rhs.sk_update); + else if (rhs.is_merged) + sk_union.update(rhs.sk_union.get_result()); + + LOG_WARNING(log, "merge() result:{}", sk_union.get_result().to_string()); } /// You can only call for an empty object. void read(DB::ReadBuffer & in) { + LOG_WARNING(log, "read() {}", sk_union.get_result().to_string()); + datasketches::compact_theta_sketch::vector_bytes bytes; readVectorBinary(bytes, in); auto sk = datasketches::compact_theta_sketch::deserialize(bytes.data(), bytes.size()); sk_union = datasketches::theta_union::builder().build(); sk_union.update(sk); + is_merged = true; - LOG_WARNING(log, "read() {}", sk_union.get_result().to_string()); + LOG_WARNING(log, "read()[after] {}", sk_union.get_result().to_string()); } - void readAndMerge(DB::ReadBuffer &) - { - assert(0); - } + // void readAndMerge(DB::ReadBuffer &) + // { + // LOG_WARNING(log, "readAndMerge() {}", sk_union.get_result().to_string()); + // } void write(DB::WriteBuffer & out) const { - internal_merge(); - auto bytes = sk_union.get_result().serialize(); - writeVectorBinary(bytes, out); + if (!is_merged) + { + auto bytes = sk_update.compact().serialize(); + writeVectorBinary(bytes, out); + } + else + { + auto bytes = sk_union.get_result().serialize(); + writeVectorBinary(bytes, out); + } LOG_WARNING(log, "write() {}", sk_union.get_result().to_string()); } diff --git a/src/Interpreters/RemoveInjectiveFunctionsVisitor.cpp b/src/Interpreters/RemoveInjectiveFunctionsVisitor.cpp index ae575b8aae7..04ae065da31 100644 --- a/src/Interpreters/RemoveInjectiveFunctionsVisitor.cpp +++ b/src/Interpreters/RemoveInjectiveFunctionsVisitor.cpp @@ -12,7 +12,8 @@ namespace DB static bool isUniq(const ASTFunction & func) { return func.name == "uniq" || func.name == "uniqExact" || func.name == "uniqHLL12" - || func.name == "uniqCombined" || func.name == "uniqCombined64"; + || func.name == "uniqCombined" || func.name == "uniqCombined64" + || func.name == "uniqThetaSketchState"; } /// Remove injective functions of one argument: replace with a child