ClickHouse/dbms/src/AggregateFunctions/AggregateFunctionHistogram.h

329 lines
9.4 KiB
C++
Raw Normal View History

2018-06-22 18:30:09 +00:00
#pragma once
#include <Common/Arena.h>
#include <Common/PODArray.h>
#include <Common/AutoArray.h>

#include <Columns/ColumnVector.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnArray.h>

#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeTuple.h>

#include <IO/WriteBuffer.h>
#include <IO/ReadBuffer.h>
#include <IO/VarInt.h>

#include <AggregateFunctions/IAggregateFunction.h>

#include <algorithm>
#include <functional>
#include <limits>
#include <queue>
#include <utility>
#include <vector>

#include <stddef.h>
2018-07-04 22:28:15 +00:00
namespace DB
{
2018-06-22 18:30:09 +00:00
/// Error codes used by this header. They are only declared here (extern);
/// TOO_LARGE_ARRAY_SIZE is thrown when a serialized state holds too many bins.
namespace ErrorCodes
{
extern const int TOO_LARGE_ARRAY_SIZE;
}
/**
* Distance compression algorithm implementation, see
* http://jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf
*/
class AggregateFunctionHistogramData
{
public:
using Mean = Float64;
using Weight = Float64;
2018-06-22 18:30:09 +00:00
private:
2018-07-04 21:49:32 +00:00
struct WeightedValue
{
2018-06-22 18:30:09 +00:00
Mean mean;
Weight weight;
2018-07-05 13:14:05 +00:00
WeightedValue operator+ (const WeightedValue& other)
2018-06-22 18:30:09 +00:00
{
return {mean + other.weight * (other.mean - mean) / (other.weight + weight), other.weight + weight};
2018-06-22 18:30:09 +00:00
}
};
private:
// quantity of stored weighted-values
2018-06-22 18:30:09 +00:00
UInt32 size;
// calculated lower and upper bounds of seen points
2018-06-22 18:30:09 +00:00
Mean lower_bound;
Mean upper_bound;
static constexpr Mean epsilon = 1e-8;
2018-06-22 18:30:09 +00:00
// Weighted values representation of histogram.
2018-07-05 11:33:59 +00:00
WeightedValue points[0];
2018-06-22 18:30:09 +00:00
private:
void sort()
{
std::sort(points, points + size,
2018-07-04 21:49:32 +00:00
[](const WeightedValue & first, const WeightedValue & second)
{
return first.mean < second.mean;
});
2018-06-22 18:30:09 +00:00
}
/**
* Repeatedly fuse most close values until max_bins bins left
2018-06-22 18:30:09 +00:00
*/
2018-07-05 11:33:59 +00:00
void compress(UInt32 max_bins)
2018-06-22 18:30:09 +00:00
{
sort();
2018-07-04 21:49:32 +00:00
auto new_size = size;
if (size <= max_bins)
return;
// Maintain doubly-linked list of "active" points
// and store neighbour pairs in priority queue by distance
AutoArray<UInt32> previous(size + 1);
AutoArray<UInt32> next(size + 1);
AutoArray<bool> active(size + 1, true);
active[size] = false;
2018-07-04 21:49:32 +00:00
auto delete_node = [&](UInt32 i)
2018-06-22 18:30:09 +00:00
{
previous[next[i]] = previous[i];
next[previous[i]] = next[i];
active[i] = false;
};
2018-07-04 21:49:32 +00:00
for (size_t i = 0; i <= size; ++i)
{
previous[i] = i - 1;
next[i] = i + 1;
}
2018-07-04 21:49:32 +00:00
next[size] = 0;
previous[0] = size;
2018-06-22 18:30:09 +00:00
using QueueItem = std::pair<Mean, UInt32>;
std::vector<QueueItem> storage;
storage.reserve(2 * size - max_bins);
std::priority_queue<QueueItem, std::vector<QueueItem>, std::greater<QueueItem>> queue;
auto quality = [&](UInt32 i) { return points[next[i]].mean - points[i].mean; };
2018-07-04 21:49:32 +00:00
for (size_t i = 0; i + 1 < size; ++i)
queue.push({quality(i), i});
2018-07-04 21:49:32 +00:00
while (new_size > max_bins && !queue.empty())
{
auto min_item = queue.top();
queue.pop();
2018-07-04 21:49:32 +00:00
auto left = min_item.second;
auto right = next[left];
if (!active[left] || !active[right] || quality(left) > min_item.first)
continue;
2018-07-04 21:49:32 +00:00
points[left] = points[left] + points[right];
2018-07-04 21:49:32 +00:00
delete_node(right);
if (active[next[left]])
queue.push({quality(left), left});
if (active[previous[left]])
queue.push({quality(previous[left]), previous[left]});
2018-07-04 21:49:32 +00:00
--new_size;
2018-06-22 18:30:09 +00:00
}
2018-07-04 21:49:32 +00:00
size_t left = 0;
for (size_t right = 0; right < size; ++right)
{
if (active[right])
{
points[left] = points[right];
++left;
}
}
size = new_size;
2018-06-22 18:30:09 +00:00
}
/***
* Delete too close points from histogram.
* Assumes that points are sorted.
2018-06-22 18:30:09 +00:00
*/
void unique()
{
2018-07-04 21:49:32 +00:00
size_t left = 0;
2018-07-05 13:14:05 +00:00
for (auto right = left + 1; right < size; right++)
2018-06-22 18:30:09 +00:00
{
2018-07-05 13:14:05 +00:00
if (points[left].mean + epsilon >= points[right].mean)
2018-06-22 18:30:09 +00:00
{
2018-07-05 13:14:05 +00:00
points[left] = points[left] + points[right];
2018-06-22 18:30:09 +00:00
}
else
{
2018-07-04 21:49:32 +00:00
++left;
2018-07-05 13:14:05 +00:00
points[left] = points[right];
2018-06-22 18:30:09 +00:00
}
}
2018-07-04 21:49:32 +00:00
size = left + 1;
2018-06-22 18:30:09 +00:00
}
public:
2018-07-05 11:33:59 +00:00
AggregateFunctionHistogramData()
: size(0)
, lower_bound(std::numeric_limits<Mean>::max())
, upper_bound(std::numeric_limits<Mean>::lowest())
2018-06-22 18:30:09 +00:00
{
2018-07-05 11:33:59 +00:00
static_assert(offsetof(AggregateFunctionHistogramData, points) == sizeof(AggregateFunctionHistogramData), "points should be last member");
2018-06-22 18:30:09 +00:00
}
2018-07-05 13:14:05 +00:00
static size_t structSize(size_t max_bins)
2018-06-22 18:30:09 +00:00
{
2018-07-05 11:33:59 +00:00
return sizeof(AggregateFunctionHistogramData) + max_bins * 2 * sizeof(WeightedValue);
}
2018-07-05 11:33:59 +00:00
/// Compresses the histogram to at most max_bins points, fuses near-duplicates and appends
/// one (lower bound, upper bound, weight) triple per remaining point to the three columns.
/// Bin borders are the midpoints between neighbouring means; the outermost borders are the
/// smallest and largest values seen.
void insertResultInto(ColumnVector<Mean> & to_lower, ColumnVector<Mean> & to_upper, ColumnVector<Weight> & to_weights, UInt32 max_bins)
{
    compress(max_bins);
    unique();

    for (size_t idx = 0; idx < size; ++idx)
    {
        const bool is_first = (idx == 0);
        const bool is_last = (idx + 1 == size);

        if (is_first)
            to_lower.insert(lower_bound);
        else
            to_lower.insert((points[idx].mean + points[idx - 1].mean) / 2);

        if (is_last)
            to_upper.insert(upper_bound);
        else
            to_upper.insert((points[idx].mean + points[idx + 1].mean) / 2);

        // linear density approximation
        Weight lower_weight = points[idx].weight;
        if (!is_first)
            lower_weight = ((points[idx - 1].weight) + points[idx].weight * 3) / 4;

        Weight upper_weight = points[idx].weight;
        if (!is_last)
            upper_weight = (points[idx + 1].weight + points[idx].weight * 3) / 4;

        to_weights.insert((lower_weight + upper_weight) / 2);
    }
}
2018-07-05 11:33:59 +00:00
/// Appends one weighted value, widens the seen bounds and compresses the histogram
/// back to max_bins points once 2 * max_bins points have accumulated.
void add(Mean value, Weight weight, UInt32 max_bins)
{
    WeightedValue & slot = points[size];
    slot = {value, weight};
    size += 1;

    lower_bound = std::min(lower_bound, value);
    upper_bound = std::max(upper_bound, value);

    const bool buffer_full = size >= max_bins * 2;
    if (buffer_full)
        compress(max_bins);
}
2018-07-05 11:33:59 +00:00
void merge(const AggregateFunctionHistogramData& other, UInt32 max_bins)
2018-06-22 18:30:09 +00:00
{
lower_bound = std::min(lower_bound, other.lower_bound);
upper_bound = std::max(lower_bound, other.upper_bound);
for (size_t i = 0; i < other.size; i++)
{
2018-07-05 11:33:59 +00:00
add(other.points[i].mean, other.points[i].weight, max_bins);
2018-06-22 18:30:09 +00:00
}
}
/// Serializes the state: raw lower bound, raw upper bound, the number of points
/// as a var-int, then the raw bytes of the points.
void write(WriteBuffer & buf) const
{
    const auto write_raw = [&buf](const void * src, size_t bytes)
    {
        buf.write(static_cast<const char *>(src), bytes);
    };

    write_raw(&lower_bound, sizeof(lower_bound));
    write_raw(&upper_bound, sizeof(upper_bound));

    writeVarUInt(size, buf);
    write_raw(points, size * sizeof(WeightedValue));
}
2018-07-05 11:33:59 +00:00
/// Restores the state written by write().
/// Throws TOO_LARGE_ARRAY_SIZE when the stored number of points exceeds 2 * max_bins.
void read(ReadBuffer & buf, UInt32 max_bins)
{
    const auto read_raw = [&buf](void * dst, size_t bytes)
    {
        buf.read(static_cast<char *>(dst), bytes);
    };

    read_raw(&lower_bound, sizeof(lower_bound));
    read_raw(&upper_bound, sizeof(upper_bound));

    readVarUInt(size, buf);

    // Reject point counts larger than the 2 * max_bins limit before reading into `points`.
    if (size > max_bins * 2)
        throw Exception("Too many bins", ErrorCodes::TOO_LARGE_ARRAY_SIZE);

    read_raw(points, size * sizeof(WeightedValue));
}
};
template <typename T>
2018-07-04 21:49:32 +00:00
class AggregateFunctionHistogram final: public IAggregateFunctionDataHelper<AggregateFunctionHistogramData, AggregateFunctionHistogram<T>>
2018-06-22 18:30:09 +00:00
{
private:
using Data = AggregateFunctionHistogramData;
2018-07-04 22:28:15 +00:00
const UInt32 max_bins;
2018-06-22 18:30:09 +00:00
public:
AggregateFunctionHistogram(UInt32 max_bins)
: max_bins(max_bins)
{
}
size_t sizeOfData() const override
{
2018-07-05 11:33:59 +00:00
return Data::structSize(max_bins);
2018-06-22 18:30:09 +00:00
}
DataTypePtr getReturnType() const override
{
DataTypes types;
auto mean = std::make_shared<DataTypeNumber<Data::Mean>>();
auto weight = std::make_shared<DataTypeNumber<Data::Weight>>();
// lower bound
types.emplace_back(mean);
// upper bound
types.emplace_back(mean);
// weight
types.emplace_back(weight);
2018-07-04 22:28:15 +00:00
auto tuple = std::make_shared<DataTypeTuple>(types);
2018-06-22 18:30:09 +00:00
return std::make_shared<DataTypeArray>(tuple);
}
2018-07-05 11:33:59 +00:00
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
2018-06-22 18:30:09 +00:00
{
auto val = static_cast<const ColumnVector<T> &>(*columns[0]).getData()[row_num];
2018-07-05 11:33:59 +00:00
this->data(place).add(static_cast<Data::Mean>(val), 1, max_bins);
2018-06-22 18:30:09 +00:00
}
2018-07-05 11:33:59 +00:00
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override
2018-06-22 18:30:09 +00:00
{
2018-07-05 11:33:59 +00:00
this->data(place).merge(this->data(rhs), max_bins);
2018-06-22 18:30:09 +00:00
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
this->data(place).write(buf);
}
2018-07-05 11:33:59 +00:00
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
2018-06-22 18:30:09 +00:00
{
2018-07-05 11:33:59 +00:00
this->data(place).read(buf, max_bins);
2018-06-22 18:30:09 +00:00
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
auto& data = this->data(const_cast<AggregateDataPtr>(place));
auto & to_array = static_cast<ColumnArray &>(to);
ColumnArray::Offsets & offsets_to = to_array.getOffsets();
auto & to_tuple = static_cast<ColumnTuple &>(to_array.getData());
auto & to_lower = static_cast<ColumnVector<Data::Mean> &>(to_tuple.getColumn(0));
auto & to_upper = static_cast<ColumnVector<Data::Mean> &>(to_tuple.getColumn(1));
auto & to_weights = static_cast<ColumnVector<Data::Weight> &>(to_tuple.getColumn(2));
2018-07-05 11:33:59 +00:00
data.insertResultInto(to_lower, to_upper, to_weights, max_bins);
2018-06-22 18:30:09 +00:00
offsets_to.push_back(to_tuple.size());
}
const char * getHeaderFilePath() const override { return __FILE__; }
String getName() const override { return "histogram"; }
};
}