mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-27 18:12:02 +00:00
implement n log(n) compression algorigthm
This commit is contained in:
parent
b09323c547
commit
984fe8e4fd
@ -7,27 +7,36 @@
|
||||
|
||||
namespace DB {
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
AggregateFunctionPtr createAggregateFunctionHistogram(const std::string & name, const DataTypes & arguments, const Array & params)
|
||||
{
|
||||
if (params.size() != 1)
|
||||
{
|
||||
throw Exception("Function " + name + " requires only bins count");
|
||||
}
|
||||
throw Exception("Function " + name + " requires bins count", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
||||
|
||||
UInt32 bins_count = applyVisitor(FieldVisitorConvertToNumber<UInt32>(), params[0]);
|
||||
|
||||
if (bins_count == 0)
|
||||
throw Exception("Bin count should be positive", ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
assertUnary(name, arguments);
|
||||
|
||||
UInt32 bins_count;
|
||||
|
||||
#define READ(VAL, PARAM) \
|
||||
VAL = applyVisitor(FieldVisitorConvertToNumber<decltype(VAL)>(), PARAM);
|
||||
|
||||
READ(bins_count, params[0]);
|
||||
#undef READ
|
||||
|
||||
AggregateFunctionPtr res(createWithNumericType<AggregateFunctionHistogram>(*arguments[0], bins_count));
|
||||
|
||||
if (!res)
|
||||
throw Exception("Illegal type " + arguments[0]->getName() + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void registerAggregateFunctionHistogram(AggregateFunctionFactory & factory)
|
||||
{
|
||||
factory.registerFunction("histogram", createAggregateFunctionHistogram);
|
||||
|
@ -2,7 +2,8 @@
|
||||
|
||||
#include <Common/Arena.h>
|
||||
#include <Common/PODArray.h>
|
||||
#include <Common/RadixSort.h>
|
||||
#include <Common/AutoArray.h>
|
||||
|
||||
#include <Columns/ColumnVector.h>
|
||||
#include <Columns/ColumnTuple.h>
|
||||
#include <Columns/ColumnArray.h>
|
||||
@ -17,6 +18,8 @@
|
||||
|
||||
#include <AggregateFunctions/IAggregateFunction.h>
|
||||
|
||||
#include <queue>
|
||||
|
||||
namespace DB {
|
||||
|
||||
namespace ErrorCodes
|
||||
@ -31,8 +34,8 @@ namespace ErrorCodes
|
||||
class AggregateFunctionHistogramData
|
||||
{
|
||||
public:
|
||||
using Mean = Float32;
|
||||
using Weight = Float32;
|
||||
using Mean = Float64;
|
||||
using Weight = Float64;
|
||||
|
||||
private:
|
||||
struct WeightedValue {
|
||||
@ -41,83 +44,113 @@ private:
|
||||
|
||||
WeightedValue operator + (const WeightedValue& other)
|
||||
{
|
||||
return {(other.mean * other.weight + mean * weight) / (other.weight + weight), other.weight + weight};
|
||||
return {mean + other.weight * (other.mean - mean) / (other.weight + weight), other.weight + weight};
|
||||
}
|
||||
};
|
||||
|
||||
struct RadixSortTraits
|
||||
{
|
||||
using Element = WeightedValue;
|
||||
using Key = Mean;
|
||||
using CountType = UInt32;
|
||||
using KeyBits = UInt32;
|
||||
|
||||
static constexpr size_t PART_SIZE_BITS = 8;
|
||||
|
||||
using Transform = RadixSortFloatTransform<KeyBits>;
|
||||
using Allocator = RadixSortMallocAllocator;
|
||||
|
||||
static Key & extractKey(Element & elem) { return elem.mean; }
|
||||
};
|
||||
|
||||
private:
|
||||
UInt32 max_bins;
|
||||
|
||||
// quantity of stored weighted-values
|
||||
UInt32 size;
|
||||
|
||||
// calculated lower and upper bounds of seen points
|
||||
Mean lower_bound;
|
||||
Mean upper_bound;
|
||||
|
||||
static constexpr Float32 epsilon = 1e-8;
|
||||
static constexpr Mean epsilon = 1e-8;
|
||||
|
||||
// Weighted values representation of histogram.
|
||||
// We allow up to max_bins * 2 values stored in intermediate states
|
||||
// Is allocated in arena, so there is no explicit management.
|
||||
WeightedValue* points;
|
||||
|
||||
private:
|
||||
void sort()
|
||||
{
|
||||
RadixSort<RadixSortTraits>::execute(points, size);
|
||||
std::sort(points, points + size,
|
||||
[](const WeightedValue& first, const WeightedValue& second)
|
||||
{
|
||||
return first.mean < second.mean;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Repeatedly fuse until max_bins bins left
|
||||
* Repeatedly fuse most close values until max_bins bins left
|
||||
*/
|
||||
void compress()
|
||||
{
|
||||
sort();
|
||||
while (size > max_bins)
|
||||
{
|
||||
size_t min_index = 0;
|
||||
auto quality = [&](size_t i)
|
||||
{
|
||||
return points[i + 1].mean - points[i].mean;
|
||||
};
|
||||
for (size_t i = 0; i + 1 != size; ++i)
|
||||
{
|
||||
if (quality(min_index) > quality(i))
|
||||
{
|
||||
min_index = i;
|
||||
}
|
||||
}
|
||||
auto newsz = size;
|
||||
if (size <= max_bins) return;
|
||||
|
||||
points[min_index] = points[min_index] + points[min_index + 1];
|
||||
for (size_t i = min_index + 1; i + 1 < size; ++i)
|
||||
{
|
||||
points[i] = points[i + 1];
|
||||
}
|
||||
size--;
|
||||
// Maintain doubly-linked list of "active" points
|
||||
// and store neighbour pairs in priority queue by distance
|
||||
AutoArray<UInt32> previous(size + 1);
|
||||
AutoArray<UInt32> next(size + 1);
|
||||
AutoArray<bool> active(size + 1, true);
|
||||
active[size] = false;
|
||||
auto delete_node = [&](UInt32 i)
|
||||
{
|
||||
previous[next[i]] = previous[i];
|
||||
next[previous[i]] = next[i];
|
||||
active[i] = false;
|
||||
};
|
||||
for (size_t i = 0; i <= size; i++)
|
||||
{
|
||||
previous[i] = i - 1;
|
||||
next[i] = i + 1;
|
||||
}
|
||||
next[size] = 0;
|
||||
previous[0] = size;
|
||||
|
||||
using QueueItem = std::pair<Mean, UInt32>;
|
||||
|
||||
std::vector<QueueItem> storage;
|
||||
storage.reserve(2 * size - max_bins);
|
||||
std::priority_queue<QueueItem, std::vector<QueueItem>, std::greater<QueueItem>> queue;
|
||||
|
||||
auto quality = [&](UInt32 i) { return points[next[i]].mean - points[i].mean; };
|
||||
for (size_t i = 0; i + 1 < size; i++)
|
||||
queue.push({quality(i), i});
|
||||
|
||||
while (newsz > max_bins && !queue.empty())
|
||||
{
|
||||
auto min_item = queue.top();
|
||||
queue.pop();
|
||||
auto l = min_item.second;
|
||||
auto r = next[l];
|
||||
if (!active[l] || !active[r] || quality(l) > min_item.first)
|
||||
continue;
|
||||
|
||||
points[l] = points[l] + points[r];
|
||||
|
||||
delete_node(r);
|
||||
if (active[next[l]])
|
||||
queue.push({quality(l), l});
|
||||
if (active[previous[l]])
|
||||
queue.push({quality(previous[l]), previous[l]});
|
||||
|
||||
newsz--;
|
||||
}
|
||||
|
||||
size_t l = 0;
|
||||
for (size_t r = 0; r < size; r++)
|
||||
if (active[r])
|
||||
points[l++] = points[r];
|
||||
size = newsz;
|
||||
}
|
||||
|
||||
/***
|
||||
* Delete too close points from histogram.
|
||||
* Assume that points are sorted.
|
||||
* Assumes that points are sorted.
|
||||
*/
|
||||
void unique()
|
||||
{
|
||||
size_t l = 0;
|
||||
for (auto r = l + 1; r != size; r++)
|
||||
for (auto r = l + 1; r < size; r++)
|
||||
{
|
||||
if (abs(points[l].mean - points[r].mean) < epsilon)
|
||||
if (points[l].mean + epsilon >= points[r].mean)
|
||||
{
|
||||
points[l] = points[l] + points[r];
|
||||
}
|
||||
@ -139,11 +172,15 @@ public:
|
||||
AggregateFunctionHistogramData(UInt32 max_bins)
|
||||
: max_bins(max_bins)
|
||||
, size(0)
|
||||
, lower_bound(std::numeric_limits<Mean>::max())
|
||||
, upper_bound(std::numeric_limits<Mean>::lowest())
|
||||
, points(nullptr)
|
||||
{
|
||||
}
|
||||
|
||||
void insertResultInto(ColumnVector<Mean>& to_lower, ColumnVector<Mean>& to_upper, ColumnVector<Weight>& to_weights) {
|
||||
if (!points) return;
|
||||
|
||||
compress();
|
||||
unique();
|
||||
|
||||
@ -167,7 +204,7 @@ public:
|
||||
lower_bound = std::min(lower_bound, value);
|
||||
upper_bound = std::max(upper_bound, value);
|
||||
|
||||
if (size > max_bins * 2)
|
||||
if (size >= max_bins * 2)
|
||||
{
|
||||
compress();
|
||||
}
|
||||
@ -199,7 +236,7 @@ public:
|
||||
|
||||
readVarUInt(size, buf);
|
||||
|
||||
if (size > max_bins)
|
||||
if (size > max_bins * 2)
|
||||
throw Exception("Too many bins", ErrorCodes::TOO_LARGE_ARRAY_SIZE);
|
||||
|
||||
if (!points)
|
||||
|
Loading…
Reference in New Issue
Block a user