From 511e608080eb86639b24f8efd01dade7e8c12ba0 Mon Sep 17 00:00:00 2001 From: Mikhail Surin Date: Sat, 16 Jun 2018 02:13:47 +0300 Subject: [PATCH] use arena --- .../AggregateFunctionHistogram.h | 90 +++++++++++-------- 1 file changed, 54 insertions(+), 36 deletions(-) diff --git a/dbms/src/AggregateFunctions/AggregateFunctionHistogram.h b/dbms/src/AggregateFunctions/AggregateFunctionHistogram.h index deb3533b89b..05d5a08ef5a 100644 --- a/dbms/src/AggregateFunctions/AggregateFunctionHistogram.h +++ b/dbms/src/AggregateFunctions/AggregateFunctionHistogram.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -61,20 +62,20 @@ private: private: UInt32 max_bins; + UInt32 size; Mean lower_bound; Mean upper_bound; - static constexpr UInt32 preallocated_bins = 16; static constexpr Float32 epsilon = 1e-8; // Weighted values representation of histogram. // We allow up to max_bins * 2 values stored in intermediate states - PODArray> points; + WeightedValue* points; private: void sort() { - RadixSort::execute(&points[0], points.size()); + RadixSort::execute(points, size); } /** @@ -83,14 +84,14 @@ private: void compress() { sort(); - while (points.size() > max_bins) + while (size > max_bins) { size_t min_index = 0; auto quality = [&](size_t i) { return points[i + 1].mean - points[i].mean; }; - for (size_t i = 0; i + 1 != points.size(); ++i) + for (size_t i = 0; i + 1 != size; ++i) { if (quality(min_index) > quality(i)) { @@ -99,38 +100,46 @@ private: } points[min_index] = points[min_index] + points[min_index + 1]; - for (size_t i = min_index + 1; i + 1 < points.size(); ++i) + for (size_t i = min_index + 1; i + 1 < size; ++i) { points[i] = points[i + 1]; } - points.pop_back(); + size--; } } /*** - * Delete too close points from histogram + * Delete too close points from histogram. + * Assume that points are sorted. */ void unique() { - auto l = points.begin(); - for (auto r = std::next(l); r != points.end(); r++) + size_t l = 0; + for (auto r = l + 1; r != size; r++) { - if (abs(l->mean - r->mean) < epsilon) + if (abs(points[l].mean - points[r].mean) < epsilon) { - *l = *l + *r; + points[l] = points[l] + points[r]; } else { l++; - *l = *r; + points[l] = points[r]; } } - points.resize(l - points.begin() + 1); + size = l + 1; + } + + void init(Arena* arena) + { + points = reinterpret_cast(arena->alloc(max_bins * 2 * sizeof(WeightedValue))); } public: AggregateFunctionHistogramData(UInt32 max_bins) : max_bins(max_bins) + , size(0) + , points(nullptr) { } @@ -138,36 +147,39 @@ public: compress(); unique(); - for (size_t i = 0; i < points.size(); i++) + for (size_t i = 0; i < size; i++) { to_lower.insert((i == 0) ? lower_bound : (points[i].mean + points[i - 1].mean) / 2); - to_upper.insert((i + 1 == points.size()) ? upper_bound : (points[i].mean + points[i + 1].mean) / 2); + to_upper.insert((i + 1 == size) ? upper_bound : (points[i].mean + points[i + 1].mean) / 2); // linear density approximation Weight lower_weight = (i == 0) ? points[i].weight : ((points[i - 1].weight) + points[i].weight * 3) / 4; - Weight upper_weight = (i + 1 == points.size()) ? points[i].weight : (points[i + 1].weight + points[i].weight * 3) / 4; + Weight upper_weight = (i + 1 == size) ? points[i].weight : (points[i + 1].weight + points[i].weight * 3) / 4; to_weights.insert((lower_weight + upper_weight) / 2); } } - void add(Mean value, Weight weight) + void add(Mean value, Weight weight, Arena* arena) { - points.push_back({value, weight}); + if (!points) + init(arena); + points[size++] = {value, weight}; lower_bound = std::min(lower_bound, value); upper_bound = std::max(upper_bound, value); - if (points.size() >= max_bins * 2) + if (size > max_bins * 2) { compress(); } } - void merge(const AggregateFunctionHistogramData& other) + void merge(const AggregateFunctionHistogramData& other, Arena* arena) { lower_bound = std::min(lower_bound, other.lower_bound); upper_bound = std::max(lower_bound, other.upper_bound); - for (auto pt: other.points) { - add(pt.mean, pt.weight); + for (size_t i = 0; i < other.size; i++) + { + add(other.points[i].mean, other.points[i].weight, arena); } } @@ -176,23 +188,24 @@ public: buf.write(reinterpret_cast(&lower_bound), sizeof(lower_bound)); buf.write(reinterpret_cast(&upper_bound), sizeof(upper_bound)); - writeVarUInt(points.size(), buf); - buf.write(reinterpret_cast(&points[0]), points.size() * sizeof(points[0])); + writeVarUInt(size, buf); + buf.write(reinterpret_cast(points), size * sizeof(WeightedValue)); } - void read(ReadBuffer & buf) + void read(ReadBuffer & buf, Arena* arena) { buf.read(reinterpret_cast(&lower_bound), sizeof(lower_bound)); buf.read(reinterpret_cast(&upper_bound), sizeof(upper_bound)); - size_t size = 0; readVarUInt(size, buf); if (size > max_bins) throw Exception("Too many bins", ErrorCodes::TOO_LARGE_ARRAY_SIZE); - points.resize(size); - buf.read(reinterpret_cast(&points[0]), size * sizeof(points[0])); + if (!points) + init(arena); + + buf.read(reinterpret_cast(points), size * sizeof(points[0])); } }; @@ -263,15 +276,20 @@ public: return std::make_shared(tuple); } - void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override + bool allocatesMemoryInArena() const override { - auto val = static_cast &>(*columns[0]).getData()[row_num]; - this->data(place).add(static_cast(val), 1); + return true; } - void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override + void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override { - this->data(place).merge(this->data(rhs)); + auto val = static_cast &>(*columns[0]).getData()[row_num]; + this->data(place).add(static_cast(val), 1, arena); + } + + void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override + { + this->data(place).merge(this->data(rhs), arena); } void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override @@ -279,9 +297,9 @@ public: this->data(place).write(buf); } - void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override + void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override { - this->data(place).read(buf); + this->data(place).read(buf, arena); } void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override