improve performance

This commit is contained in:
taiyang-li 2023-02-15 21:19:50 +08:00
parent a478e57905
commit ddba07b9c3

View File

@ -1,7 +1,8 @@
#pragma once
#include <cmath>
#include <base/sort.h>
#include <Common/RadixSort.h>
#include <IO/WriteBuffer.h>
#include <IO/ReadBuffer.h>
#include <IO/WriteHelpers.h>
@ -53,6 +54,8 @@ public:
, compressed(compressed_)
, sampled(sampled_)
{
sampled.reserve(compress_threshold);
backup_sampled.reserve(compress_threshold);
}
bool isCompressed() const { return compressed; }
@ -113,9 +116,8 @@ public:
void compress()
{
withHeadBufferInserted();
auto compressed_samples = doCompress(sampled, 2 * relative_error * count);
std::swap(sampled, compressed_samples);
doCompress(2 * relative_error * count);
compressed = true;
}
@ -165,8 +167,8 @@ public:
// `max(g_ab + delta_ab) <= floor(2 * eps_ab * (n_a + n_b))` since
// `max(g_ab + delta_ab) <= floor(2 * eps_a * n_a) + floor(2 * eps_b * n_b)`
// Finally, one can see how the `insert(x)` operation can be expressed as `merge([(x, 1, 0])`
std::vector<Stats> merged_sampled;
merged_sampled.reserve(sampled.size() + other.sampled.size());
backup_sampled.clear();
backup_sampled.reserve(sampled.size() + other.sampled.size());
double merged_relative_error = std::max(relative_error, other.relative_error);
size_t merged_count = count + other.count;
Int64 additional_self_delta = static_cast<Int64>(std::floor(2 * other.relative_error * other.count));
@ -198,27 +200,28 @@ public:
// Insert it
next_sample.delta += additional_delta;
merged_sampled.push_back(next_sample);
backup_sampled.emplace_back(std::move(next_sample));
}
// Copy the remaining samples from the other list
// (by construction, at most one `while` loop will run)
while (self_idx < sampled.size())
{
merged_sampled.push_back(sampled[self_idx]);
backup_sampled.emplace_back(sampled[self_idx]);
++self_idx;
}
while (other_idx < other.sampled.size())
{
merged_sampled.emplace_back(other.sampled[other_idx]);
backup_sampled.emplace_back(other.sampled[other_idx]);
++other_idx;
}
auto compressed_sampled = doCompress(merged_sampled, 2 * merged_relative_error * merged_count);
compress_threshold = other.compress_threshold;
std::swap(sampled, backup_sampled);
relative_error = merged_relative_error;
std::swap(compressed_sampled, sampled);
count = merged_count;
compress_threshold = other.compress_threshold;
doCompress(2 * merged_relative_error * merged_count);
compressed = true;
}
}
@ -287,13 +290,18 @@ private:
if (head_sampled.empty())
return;
size_t current_count = count;
std::sort(head_sampled.begin(), head_sampled.end());
std::vector<Stats> new_samples;
new_samples.reserve(sampled.size() + head_sampled.size());
bool use_radix_sort = head_sampled.size() >= 256 && (is_arithmetic_v<T> && !is_big_int_v<T>);
if (use_radix_sort)
RadixSort<RadixSortNumTraits<T>>::executeLSD(head_sampled.data(), head_sampled.size());
else
::sort(head_sampled.begin(), head_sampled.end());
backup_sampled.clear();
backup_sampled.reserve(sampled.size() + head_sampled.size());
size_t sample_idx = 0;
size_t ops_idx = 0;
size_t current_count = count;
for (; ops_idx < head_sampled.size(); ++ops_idx)
{
T current_sample = head_sampled[ops_idx];
@ -301,47 +309,47 @@ private:
// Add all the samples before the next observation.
while (sample_idx < sampled.size() && sampled[sample_idx].value <= current_sample)
{
new_samples.push_back(sampled[sample_idx]);
backup_sampled.emplace_back(sampled[sample_idx]);
++sample_idx;
}
// If it is the first one to insert, of if it is the last one
++current_count;
Int64 delta;
if (new_samples.empty() || (sample_idx == sampled.size() && ops_idx == (head_sampled.size() - 1)))
if (backup_sampled.empty() || (sample_idx == sampled.size() && ops_idx == (head_sampled.size() - 1)))
delta = 0;
else
delta = static_cast<Int64>(std::floor(2 * relative_error * current_count));
new_samples.emplace_back(current_sample, 1, delta);
backup_sampled.emplace_back(current_sample, 1, delta);
}
// Add all the remaining existing samples
for (; sample_idx < sampled.size(); ++sample_idx)
new_samples.push_back(sampled[sample_idx]);
backup_sampled.emplace_back(sampled[sample_idx]);
std::swap(sampled, new_samples);
std::swap(sampled, backup_sampled);
head_sampled.clear();
count = current_count;
}
static std::vector<Stats> doCompress(const std::vector<Stats> & current_samples, double merge_threshold)
void doCompress(double merge_threshold)
{
if (current_samples.empty())
return {};
if (sampled.empty())
return;
std::vector<Stats> res;
backup_sampled.clear();
// Start for the last element, which is always part of the set.
// The head contains the current new head, that may be merged with the current element.
auto head = current_samples.back();
ssize_t i = current_samples.size() - 2;
Stats head = sampled.back();
ssize_t i = sampled.size() - 2;
// Do not compress the last element
while (i >= 1)
{
// The current sample:
const auto & sample1 = current_samples[i];
const auto & sample1 = sampled[i];
// Do we need to compress?
if (sample1.g + head.g + head.delta < merge_threshold)
{
@ -351,22 +359,23 @@ private:
else
{
// Prepend the current head, and keep the current sample as target for merging.
res.insert(res.begin(), head);
backup_sampled.push_back(head);
head = sample1;
}
--i;
}
res.insert(res.begin(), head);
backup_sampled.push_back(head);
// If necessary, add the minimum element:
auto curr_head = current_samples.front();
auto curr_head = sampled.front();
// don't add the minimum element if `currentSamples` has only one element (both `currHead` and
// `head` point to the same element)
if (curr_head.value <= head.value && current_samples.size() > 1)
res.insert(res.begin(), current_samples.front());
if (curr_head.value <= head.value && sampled.size() > 1)
backup_sampled.emplace_back(sampled.front());
return res;
std::reverse(backup_sampled.begin(), backup_sampled.end());
std::swap(sampled, backup_sampled);
}
double relative_error;
@ -375,6 +384,8 @@ private:
bool compressed;
std::vector<Stats> sampled;
std::vector<Stats> backup_sampled;
std::vector<T> head_sampled;
static constexpr size_t default_compress_threshold = 10000;