use arena

This commit is contained in:
Mikhail Surin 2018-06-16 02:13:47 +03:00
parent 2f300e9df9
commit 511e608080

View File

@ -1,5 +1,6 @@
#pragma once
#include <Common/Arena.h>
#include <Common/PODArray.h>
#include <Common/RadixSort.h>
#include <Columns/ColumnVector.h>
@ -61,20 +62,20 @@ private:
private:
UInt32 max_bins;
UInt32 size;
Mean lower_bound;
Mean upper_bound;
static constexpr UInt32 preallocated_bins = 16;
static constexpr Float32 epsilon = 1e-8;
// Weighted values representation of histogram.
// We allow up to max_bins * 2 values stored in intermediate states
PODArray<WeightedValue, preallocated_bins * sizeof(WeightedValue), Allocator<false>> points;
WeightedValue* points;
private:
void sort()
{
RadixSort<RadixSortTraits>::execute(&points[0], points.size());
RadixSort<RadixSortTraits>::execute(points, size);
}
/**
@ -83,14 +84,14 @@ private:
void compress()
{
sort();
while (points.size() > max_bins)
while (size > max_bins)
{
size_t min_index = 0;
auto quality = [&](size_t i)
{
return points[i + 1].mean - points[i].mean;
};
for (size_t i = 0; i + 1 != points.size(); ++i)
for (size_t i = 0; i + 1 != size; ++i)
{
if (quality(min_index) > quality(i))
{
@ -99,38 +100,46 @@ private:
}
points[min_index] = points[min_index] + points[min_index + 1];
for (size_t i = min_index + 1; i + 1 < points.size(); ++i)
for (size_t i = min_index + 1; i + 1 < size; ++i)
{
points[i] = points[i + 1];
}
points.pop_back();
size--;
}
}
/***
* Delete too close points from histogram
* Delete too close points from histogram.
* Assume that points are sorted.
*/
void unique()
{
auto l = points.begin();
for (auto r = std::next(l); r != points.end(); r++)
size_t l = 0;
for (auto r = l + 1; r != size; r++)
{
if (abs(l->mean - r->mean) < epsilon)
if (abs(points[l].mean - points[r].mean) < epsilon)
{
*l = *l + *r;
points[l] = points[l] + points[r];
}
else
{
l++;
*l = *r;
points[l] = points[r];
}
}
points.resize(l - points.begin() + 1);
size = l + 1;
}
void init(Arena* arena)
{
points = reinterpret_cast<WeightedValue*>(arena->alloc(max_bins * 2 * sizeof(WeightedValue)));
}
public:
AggregateFunctionHistogramData(UInt32 max_bins)
: max_bins(max_bins)
, size(0)
, points(nullptr)
{
}
@ -138,36 +147,39 @@ public:
compress();
unique();
for (size_t i = 0; i < points.size(); i++)
for (size_t i = 0; i < size; i++)
{
to_lower.insert((i == 0) ? lower_bound : (points[i].mean + points[i - 1].mean) / 2);
to_upper.insert((i + 1 == points.size()) ? upper_bound : (points[i].mean + points[i + 1].mean) / 2);
to_upper.insert((i + 1 == size) ? upper_bound : (points[i].mean + points[i + 1].mean) / 2);
// linear density approximation
Weight lower_weight = (i == 0) ? points[i].weight : ((points[i - 1].weight) + points[i].weight * 3) / 4;
Weight upper_weight = (i + 1 == points.size()) ? points[i].weight : (points[i + 1].weight + points[i].weight * 3) / 4;
Weight upper_weight = (i + 1 == size) ? points[i].weight : (points[i + 1].weight + points[i].weight * 3) / 4;
to_weights.insert((lower_weight + upper_weight) / 2);
}
}
void add(Mean value, Weight weight)
void add(Mean value, Weight weight, Arena* arena)
{
points.push_back({value, weight});
if (!points)
init(arena);
points[size++] = {value, weight};
lower_bound = std::min(lower_bound, value);
upper_bound = std::max(upper_bound, value);
if (points.size() >= max_bins * 2)
if (size > max_bins * 2)
{
compress();
}
}
void merge(const AggregateFunctionHistogramData& other)
void merge(const AggregateFunctionHistogramData& other, Arena* arena)
{
lower_bound = std::min(lower_bound, other.lower_bound);
upper_bound = std::max(lower_bound, other.upper_bound);
for (auto pt: other.points) {
add(pt.mean, pt.weight);
for (size_t i = 0; i < other.size; i++)
{
add(other.points[i].mean, other.points[i].weight, arena);
}
}
@ -176,23 +188,24 @@ public:
buf.write(reinterpret_cast<const char *>(&lower_bound), sizeof(lower_bound));
buf.write(reinterpret_cast<const char *>(&upper_bound), sizeof(upper_bound));
writeVarUInt(points.size(), buf);
buf.write(reinterpret_cast<const char *>(&points[0]), points.size() * sizeof(points[0]));
writeVarUInt(size, buf);
buf.write(reinterpret_cast<const char *>(points), size * sizeof(WeightedValue));
}
void read(ReadBuffer & buf)
void read(ReadBuffer & buf, Arena* arena)
{
buf.read(reinterpret_cast<char *>(&lower_bound), sizeof(lower_bound));
buf.read(reinterpret_cast<char *>(&upper_bound), sizeof(upper_bound));
size_t size = 0;
readVarUInt(size, buf);
if (size > max_bins)
throw Exception("Too many bins", ErrorCodes::TOO_LARGE_ARRAY_SIZE);
points.resize(size);
buf.read(reinterpret_cast<char *>(&points[0]), size * sizeof(points[0]));
if (!points)
init(arena);
buf.read(reinterpret_cast<char *>(points), size * sizeof(points[0]));
}
};
@ -263,15 +276,20 @@ public:
return std::make_shared<DataTypeArray>(tuple);
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
bool allocatesMemoryInArena() const override
{
auto val = static_cast<const ColumnVector<T> &>(*columns[0]).getData()[row_num];
this->data(place).add(static_cast<Data::Mean>(val), 1);
return true;
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override
{
this->data(place).merge(this->data(rhs));
auto val = static_cast<const ColumnVector<T> &>(*columns[0]).getData()[row_num];
this->data(place).add(static_cast<Data::Mean>(val), 1, arena);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
this->data(place).merge(this->data(rhs), arena);
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
@ -279,9 +297,9 @@ public:
this->data(place).write(buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override
{
this->data(place).read(buf);
this->data(place).read(buf, arena);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override