Merge pull request #29595 from CurtizJ/generalize-update-permutation

Generalize code in `IColumn::updatePermutation`
This commit is contained in:
Nikita Mikhaylov 2021-10-04 20:36:57 +03:00 committed by GitHub
commit 613b814e24
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 222 additions and 474 deletions

View File

@ -1,5 +1,8 @@
#pragma once
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wold-style-cast"
#if !defined(ARCADIA_BUILD)
# include <miniselect/floyd_rivest_select.h> // Y_IGNORE
#else
@ -34,4 +37,7 @@ void partial_sort(RandomIt first, RandomIt middle, RandomIt last, Compare compar
#else
::std::partial_sort(first, middle, last, compare);
#endif
#pragma GCC diagnostic pop
}

View File

@ -361,7 +361,9 @@ endif()
target_include_directories(clickhouse_common_io PUBLIC "${CMAKE_CURRENT_BINARY_DIR}/Core/include") # uses some includes from core
dbms_target_include_directories(PUBLIC "${CMAKE_CURRENT_BINARY_DIR}/Core/include")
target_include_directories(clickhouse_common_io BEFORE PUBLIC ${PDQSORT_INCLUDE_DIR})
dbms_target_include_directories(SYSTEM BEFORE PUBLIC ${PDQSORT_INCLUDE_DIR})
target_include_directories(clickhouse_common_io BEFORE PUBLIC ${MINISELECT_INCLUDE_DIR})
dbms_target_include_directories(SYSTEM BEFORE PUBLIC ${MINISELECT_INCLUDE_DIR})
if (ZSTD_LIBRARY)

View File

@ -386,11 +386,8 @@ bool ColumnArray::hasEqualValues() const
return hasEqualValuesImpl<ColumnArray>();
}
namespace
{
template <bool positive>
struct Cmp
struct ColumnArray::Cmp
{
const ColumnArray & parent;
int nan_direction_hint;
@ -406,13 +403,14 @@ struct Cmp
res = parent.compareAtWithCollation(lhs, rhs, parent, nan_direction_hint, *collator);
else
res = parent.compareAt(lhs, rhs, parent, nan_direction_hint);
return positive ? res : -res;
if constexpr (positive)
return res;
else
return -res;
}
};
}
void ColumnArray::reserve(size_t n)
{
getOffsets().reserve(n);
@ -854,82 +852,6 @@ void ColumnArray::getPermutationImpl(size_t limit, Permutation & res, Comparator
std::sort(res.begin(), res.end(), less);
}
template <typename Comparator>
void ColumnArray::updatePermutationImpl(size_t limit, Permutation & res, EqualRanges & equal_range, Comparator cmp) const
{
if (equal_range.empty())
return;
if (limit >= size() || limit >= equal_range.back().second)
limit = 0;
size_t number_of_ranges = equal_range.size();
if (limit)
--number_of_ranges;
auto less = [&cmp](size_t lhs, size_t rhs){ return cmp(lhs, rhs) < 0; };
EqualRanges new_ranges;
for (size_t i = 0; i < number_of_ranges; ++i)
{
const auto & [first, last] = equal_range[i];
std::sort(res.begin() + first, res.begin() + last, less);
auto new_first = first;
for (auto j = first + 1; j < last; ++j)
{
if (cmp(res[new_first], res[j]) != 0)
{
if (j - new_first > 1)
new_ranges.emplace_back(new_first, j);
new_first = j;
}
}
if (last - new_first > 1)
new_ranges.emplace_back(new_first, last);
}
if (limit)
{
const auto & [first, last] = equal_range.back();
if (limit < first || limit > last)
return;
/// Since then we are working inside the interval.
partial_sort(res.begin() + first, res.begin() + limit, res.begin() + last, less);
auto new_first = first;
for (auto j = first + 1; j < limit; ++j)
{
if (cmp(res[new_first], res[j]) != 0)
{
if (j - new_first > 1)
new_ranges.emplace_back(new_first, j);
new_first = j;
}
}
auto new_last = limit;
for (auto j = limit; j < last; ++j)
{
if (cmp(res[new_first], res[j]) == 0)
{
std::swap(res[new_last], res[j]);
++new_last;
}
}
if (new_last - new_first > 1)
{
new_ranges.emplace_back(new_first, new_last);
}
}
equal_range = std::move(new_ranges);
}
void ColumnArray::getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const
{
if (reverse)

View File

@ -28,6 +28,8 @@ private:
ColumnArray(const ColumnArray &) = default;
template <bool positive> struct Cmp;
public:
/** Create immutable column using immutable arguments. These arguments may be shared with other columns.
* Use IColumn::mutate in order to make mutable column and mutate shared nested columns.
@ -185,9 +187,6 @@ private:
template <typename Comparator>
void getPermutationImpl(size_t limit, Permutation & res, Comparator cmp) const;
template <typename Comparator>
void updatePermutationImpl(size_t limit, Permutation & res, EqualRanges & equal_range, Comparator cmp) const;
};

View File

@ -150,82 +150,20 @@ void ColumnDecimal<T>::getPermutation(bool reverse, size_t limit, int , IColumn:
template <is_decimal T>
void ColumnDecimal<T>::updatePermutation(bool reverse, size_t limit, int, IColumn::Permutation & res, EqualRanges & equal_ranges) const
{
if (equal_ranges.empty())
return;
if (limit >= data.size() || limit >= equal_ranges.back().second)
limit = 0;
size_t number_of_ranges = equal_ranges.size();
if (limit)
--number_of_ranges;
EqualRanges new_ranges;
SCOPE_EXIT({equal_ranges = std::move(new_ranges);});
for (size_t i = 0; i < number_of_ranges; ++i)
{
const auto& [first, last] = equal_ranges[i];
if (reverse)
std::sort(res.begin() + first, res.begin() + last,
[this](size_t a, size_t b) { return data[a] > data[b]; });
else
std::sort(res.begin() + first, res.begin() + last,
[this](size_t a, size_t b) { return data[a] < data[b]; });
auto new_first = first;
for (auto j = first + 1; j < last; ++j)
{
if (data[res[new_first]] != data[res[j]])
{
if (j - new_first > 1)
new_ranges.emplace_back(new_first, j);
new_first = j;
}
}
if (last - new_first > 1)
new_ranges.emplace_back(new_first, last);
}
if (limit)
{
const auto & [first, last] = equal_ranges.back();
if (limit < first || limit > last)
return;
/// Since then we are working inside the interval.
auto equals = [this](size_t lhs, size_t rhs) { return data[lhs] == data[rhs]; };
auto sort = [](auto begin, auto end, auto pred) { std::sort(begin, end, pred); };
auto partial_sort = [](auto begin, auto mid, auto end, auto pred) { ::partial_sort(begin, mid, end, pred); };
if (reverse)
partial_sort(res.begin() + first, res.begin() + limit, res.begin() + last,
[this](size_t a, size_t b) { return data[a] > data[b]; });
this->updatePermutationImpl(
limit, res, equal_ranges,
[this](size_t lhs, size_t rhs) { return data[lhs] > data[rhs]; },
equals, sort, partial_sort);
else
partial_sort(res.begin() + first, res.begin() + limit, res.begin() + last,
[this](size_t a, size_t b) { return data[a] < data[b]; });
auto new_first = first;
for (auto j = first + 1; j < limit; ++j)
{
if (data[res[new_first]] != data[res[j]])
{
if (j - new_first > 1)
new_ranges.emplace_back(new_first, j);
new_first = j;
}
}
auto new_last = limit;
for (auto j = limit; j < last; ++j)
{
if (data[res[new_first]] == data[res[j]])
{
std::swap(res[new_last], res[j]);
++new_last;
}
}
if (new_last - new_first > 1)
new_ranges.emplace_back(new_first, new_last);
}
this->updatePermutationImpl(
limit, res, equal_ranges,
[this](size_t lhs, size_t rhs) { return data[lhs] < data[rhs]; },
equals, sort, partial_sort);
}
template <is_decimal T>

View File

@ -15,6 +15,7 @@
namespace DB
{
/// PaddedPODArray extended by Decimal scale
template <typename T>
class DecimalPaddedPODArray : public PaddedPODArray<T>

View File

@ -137,17 +137,35 @@ void ColumnFixedString::updateHashFast(SipHash & hash) const
}
template <bool positive>
struct ColumnFixedString::less
struct ColumnFixedString::Cmp
{
const ColumnFixedString & parent;
explicit less(const ColumnFixedString & parent_) : parent(parent_) {}
bool operator()(size_t lhs, size_t rhs) const
explicit Cmp(const ColumnFixedString & parent_) : parent(parent_) {}
int operator()(size_t lhs, size_t rhs) const
{
int res = memcmpSmallAllowOverflow15(parent.chars.data() + lhs * parent.n, parent.chars.data() + rhs * parent.n, parent.n);
return positive ? (res < 0) : (res > 0);
if constexpr (positive)
return res;
else
return -res;
}
};
/// Strict "less than" predicate over row indices of a ColumnFixedString.
/// Wraps the three-way comparator Cmp<true> and reports whether its result is negative.
/// The call operator returns bool: the value produced is the outcome of a '<' test,
/// and sorting algorithms expect a boolean predicate.
struct ColumnFixedString::less
{
    Cmp<true> cmp;

    explicit less(const ColumnFixedString & parent_) : cmp(parent_) {}

    /// lhs and rhs are row indices forwarded unchanged to the wrapped comparator.
    bool operator()(size_t lhs, size_t rhs) const { return cmp(lhs, rhs) < 0; }
};
/// Strict "greater than" predicate over row indices of a ColumnFixedString.
/// Wraps the three-way comparator Cmp<true> and reports whether its result is positive.
/// The call operator returns bool: the value produced is the outcome of a '>' test,
/// and sorting algorithms expect a boolean predicate.
struct ColumnFixedString::greater
{
    Cmp<true> cmp;

    explicit greater(const ColumnFixedString & parent_) : cmp(parent_) {}

    /// lhs and rhs are row indices forwarded unchanged to the wrapped comparator.
    bool operator()(size_t lhs, size_t rhs) const { return cmp(lhs, rhs) > 0; }
};
void ColumnFixedString::getPermutation(bool reverse, size_t limit, int /*nan_direction_hint*/, Permutation & res) const
{
size_t s = size();
@ -161,93 +179,25 @@ void ColumnFixedString::getPermutation(bool reverse, size_t limit, int /*nan_dir
if (limit)
{
if (reverse)
partial_sort(res.begin(), res.begin() + limit, res.end(), less<false>(*this));
::partial_sort(res.begin(), res.begin() + limit, res.end(), greater(*this));
else
partial_sort(res.begin(), res.begin() + limit, res.end(), less<true>(*this));
::partial_sort(res.begin(), res.begin() + limit, res.end(), less(*this));
}
else
{
if (reverse)
std::sort(res.begin(), res.end(), less<false>(*this));
std::sort(res.begin(), res.end(), greater(*this));
else
std::sort(res.begin(), res.end(), less<true>(*this));
std::sort(res.begin(), res.end(), less(*this));
}
}
void ColumnFixedString::updatePermutation(bool reverse, size_t limit, int, Permutation & res, EqualRanges & equal_ranges) const
{
if (equal_ranges.empty())
return;
if (limit >= size() || limit >= equal_ranges.back().second)
limit = 0;
size_t number_of_ranges = equal_ranges.size();
if (limit)
--number_of_ranges;
EqualRanges new_ranges;
SCOPE_EXIT({equal_ranges = std::move(new_ranges);});
for (size_t i = 0; i < number_of_ranges; ++i)
{
const auto& [first, last] = equal_ranges[i];
if (reverse)
std::sort(res.begin() + first, res.begin() + last, less<false>(*this));
updatePermutationImpl(limit, res, equal_ranges, Cmp<false>(*this));
else
std::sort(res.begin() + first, res.begin() + last, less<true>(*this));
auto new_first = first;
for (auto j = first + 1; j < last; ++j)
{
if (memcmpSmallAllowOverflow15(chars.data() + res[j] * n, chars.data() + res[new_first] * n, n) != 0)
{
if (j - new_first > 1)
new_ranges.emplace_back(new_first, j);
new_first = j;
}
}
if (last - new_first > 1)
new_ranges.emplace_back(new_first, last);
}
if (limit)
{
const auto & [first, last] = equal_ranges.back();
if (limit < first || limit > last)
return;
/// Since then we are working inside the interval.
if (reverse)
partial_sort(res.begin() + first, res.begin() + limit, res.begin() + last, less<false>(*this));
else
partial_sort(res.begin() + first, res.begin() + limit, res.begin() + last, less<true>(*this));
auto new_first = first;
for (auto j = first + 1; j < limit; ++j)
{
if (memcmpSmallAllowOverflow15(chars.data() + res[j] * n, chars.data() + res[new_first] * n, n) != 0)
{
if (j - new_first > 1)
new_ranges.emplace_back(new_first, j);
new_first = j;
}
}
auto new_last = limit;
for (auto j = limit; j < last; ++j)
{
if (memcmpSmallAllowOverflow15(chars.data() + res[j] * n, chars.data() + res[new_first] * n, n) == 0)
{
std::swap(res[new_last], res[j]);
++new_last;
}
}
if (new_last - new_first > 1)
new_ranges.emplace_back(new_first, new_last);
}
updatePermutationImpl(limit, res, equal_ranges, Cmp<true>(*this));
}
void ColumnFixedString::insertRangeFrom(const IColumn & src, size_t start, size_t length)
@ -464,13 +414,12 @@ void ColumnFixedString::getExtremes(Field & min, Field & max) const
size_t min_idx = 0;
size_t max_idx = 0;
less<true> less_op(*this);
auto cmp_less = less(*this);
for (size_t i = 1; i < col_size; ++i)
{
if (less_op(i, min_idx))
if (cmp_less(i, min_idx))
min_idx = i;
else if (less_op(max_idx, i))
else if (cmp_less(max_idx, i))
max_idx = i;
}

View File

@ -32,8 +32,9 @@ private:
/// The size of the rows.
const size_t n;
template <bool positive>
template <bool positive> struct Cmp;
struct less;
struct greater;
/** Create an empty column of strings of fixed-length `n` */
ColumnFixedString(size_t n_) : n(n_) {}

View File

@ -394,81 +394,6 @@ void ColumnLowCardinality::getPermutationImpl(bool reverse, size_t limit, int na
}
}
template <typename Cmp>
void ColumnLowCardinality::updatePermutationImpl(size_t limit, Permutation & res, EqualRanges & equal_ranges, Cmp comparator) const
{
if (equal_ranges.empty())
return;
if (limit >= size() || limit >= equal_ranges.back().second)
limit = 0;
size_t number_of_ranges = equal_ranges.size();
if (limit)
--number_of_ranges;
EqualRanges new_ranges;
SCOPE_EXIT({equal_ranges = std::move(new_ranges);});
auto less = [&comparator](size_t lhs, size_t rhs){ return comparator(lhs, rhs) < 0; };
for (size_t i = 0; i < number_of_ranges; ++i)
{
const auto& [first, last] = equal_ranges[i];
std::sort(res.begin() + first, res.begin() + last, less);
auto new_first = first;
for (auto j = first + 1; j < last; ++j)
{
if (comparator(res[new_first], res[j]) != 0)
{
if (j - new_first > 1)
new_ranges.emplace_back(new_first, j);
new_first = j;
}
}
if (last - new_first > 1)
new_ranges.emplace_back(new_first, last);
}
if (limit)
{
const auto & [first, last] = equal_ranges.back();
if (limit < first || limit > last)
return;
/// Since then we are working inside the interval.
partial_sort(res.begin() + first, res.begin() + limit, res.begin() + last, less);
auto new_first = first;
for (auto j = first + 1; j < limit; ++j)
{
if (comparator(res[new_first],res[j]) != 0)
{
if (j - new_first > 1)
new_ranges.emplace_back(new_first, j);
new_first = j;
}
}
auto new_last = limit;
for (auto j = limit; j < last; ++j)
{
if (comparator(res[new_first], res[j]) == 0)
{
std::swap(res[new_last], res[j]);
++new_last;
}
}
if (new_last - new_first > 1)
new_ranges.emplace_back(new_first, new_last);
}
}
void ColumnLowCardinality::getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const
{
getPermutationImpl(reverse, limit, nan_direction_hint, res);

View File

@ -333,9 +333,6 @@ private:
int compareAtImpl(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint, const Collator * collator=nullptr) const;
void getPermutationImpl(bool reverse, size_t limit, int nan_direction_hint, Permutation & res, const Collator * collator = nullptr) const;
template <typename Cmp>
void updatePermutationImpl(size_t limit, Permutation & res, EqualRanges & equal_ranges, Cmp comparator) const;
};

View File

@ -357,7 +357,10 @@ struct ColumnString::Cmp
parent.chars.data() + parent.offsetAt(lhs), parent.sizeAt(lhs) - 1,
parent.chars.data() + parent.offsetAt(rhs), parent.sizeAt(rhs) - 1);
return positive ? res : -res;
if constexpr (positive)
return res;
else
return -res;
}
};
@ -380,78 +383,6 @@ void ColumnString::getPermutationImpl(size_t limit, Permutation & res, Comparato
std::sort(res.begin(), res.end(), less);
}
template <typename Comparator>
void ColumnString::updatePermutationImpl(size_t limit, Permutation & res, EqualRanges & equal_ranges, Comparator cmp) const
{
if (equal_ranges.empty())
return;
if (limit >= size() || limit > equal_ranges.back().second)
limit = 0;
EqualRanges new_ranges;
SCOPE_EXIT({equal_ranges = std::move(new_ranges);});
size_t number_of_ranges = equal_ranges.size();
if (limit)
--number_of_ranges;
auto less = [&cmp](size_t lhs, size_t rhs){ return cmp(lhs, rhs) < 0; };
for (size_t i = 0; i < number_of_ranges; ++i)
{
const auto & [first, last] = equal_ranges[i];
std::sort(res.begin() + first, res.begin() + last, less);
size_t new_first = first;
for (size_t j = first + 1; j < last; ++j)
{
if (cmp(res[j], res[new_first]) != 0)
{
if (j - new_first > 1)
new_ranges.emplace_back(new_first, j);
new_first = j;
}
}
if (last - new_first > 1)
new_ranges.emplace_back(new_first, last);
}
if (limit)
{
const auto & [first, last] = equal_ranges.back();
if (limit < first || limit > last)
return;
/// Since then we are working inside the interval.
partial_sort(res.begin() + first, res.begin() + limit, res.begin() + last, less);
size_t new_first = first;
for (size_t j = first + 1; j < limit; ++j)
{
if (cmp(res[j], res[new_first]) != 0)
{
if (j - new_first > 1)
new_ranges.emplace_back(new_first, j);
new_first = j;
}
}
size_t new_last = limit;
for (size_t j = limit; j < last; ++j)
{
if (cmp(res[j], res[new_first]) == 0)
{
std::swap(res[j], res[new_last]);
++new_last;
}
}
if (new_last - new_first > 1)
new_ranges.emplace_back(new_first, new_last);
}
}
void ColumnString::getPermutation(bool reverse, size_t limit, int /*nan_direction_hint*/, Permutation & res) const
{
if (reverse)

View File

@ -54,9 +54,6 @@ private:
template <typename Comparator>
void getPermutationImpl(size_t limit, Permutation & res, Comparator cmp) const;
template <typename Comparator>
void updatePermutationImpl(size_t limit, Permutation & res, EqualRanges & equal_ranges, Comparator cmp) const;
public:
const char * getFamilyName() const override { return "String"; }
TypeIndex getDataType() const override { return TypeIndex::String; }

View File

@ -109,6 +109,15 @@ struct ColumnVector<T>::greater
bool operator()(size_t lhs, size_t rhs) const { return CompareHelper<T>::greater(parent.data[lhs], parent.data[rhs], nan_direction_hint); }
};
/// Equality predicate over row indices of a ColumnVector.
/// Looks up both values in parent.data and delegates to CompareHelper<T>::equals,
/// passing along the nan_direction_hint stored at construction.
template <typename T>
struct ColumnVector<T>::equals
{
    const Self & parent;
    int nan_direction_hint;

    equals(const Self & parent_, int nan_direction_hint_)
        : parent(parent_)
        , nan_direction_hint(nan_direction_hint_)
    {
    }

    bool operator()(size_t lhs, size_t rhs) const
    {
        const auto & left_value = parent.data[lhs];
        const auto & right_value = parent.data[rhs];
        return CompareHelper<T>::equals(left_value, right_value, nan_direction_hint);
    }
};
namespace
{
@ -204,80 +213,21 @@ void ColumnVector<T>::getPermutation(bool reverse, size_t limit, int nan_directi
template <typename T>
void ColumnVector<T>::updatePermutation(bool reverse, size_t limit, int nan_direction_hint, IColumn::Permutation & res, EqualRanges & equal_range) const
{
if (equal_range.empty())
return;
if (limit >= data.size() || limit >= equal_range.back().second)
limit = 0;
EqualRanges new_ranges;
SCOPE_EXIT({equal_range = std::move(new_ranges);});
for (size_t i = 0; i < equal_range.size() - bool(limit); ++i)
{
const auto & [first, last] = equal_range[i];
if (reverse)
pdqsort(res.begin() + first, res.begin() + last, greater(*this, nan_direction_hint));
else
pdqsort(res.begin() + first, res.begin() + last, less(*this, nan_direction_hint));
size_t new_first = first;
for (size_t j = first + 1; j < last; ++j)
{
if (less(*this, nan_direction_hint)(res[j], res[new_first]) || greater(*this, nan_direction_hint)(res[j], res[new_first]))
{
if (j - new_first > 1)
{
new_ranges.emplace_back(new_first, j);
}
new_first = j;
}
}
if (last - new_first > 1)
{
new_ranges.emplace_back(new_first, last);
}
}
if (limit)
{
const auto & [first, last] = equal_range.back();
if (limit < first || limit > last)
return;
/// Since then, we are working inside the interval.
auto sort = [](auto begin, auto end, auto pred) { pdqsort(begin, end, pred); };
auto partial_sort = [](auto begin, auto mid, auto end, auto pred) { ::partial_sort(begin, mid, end, pred); };
if (reverse)
partial_sort(res.begin() + first, res.begin() + limit, res.begin() + last, greater(*this, nan_direction_hint));
this->updatePermutationImpl(
limit, res, equal_range,
greater(*this, nan_direction_hint),
equals(*this, nan_direction_hint),
sort, partial_sort);
else
partial_sort(res.begin() + first, res.begin() + limit, res.begin() + last, less(*this, nan_direction_hint));
size_t new_first = first;
for (size_t j = first + 1; j < limit; ++j)
{
if (less(*this, nan_direction_hint)(res[j], res[new_first]) || greater(*this, nan_direction_hint)(res[j], res[new_first]))
{
if (j - new_first > 1)
{
new_ranges.emplace_back(new_first, j);
}
new_first = j;
}
}
size_t new_last = limit;
for (size_t j = limit; j < last; ++j)
{
if (!less(*this, nan_direction_hint)(res[j], res[new_first]) && !greater(*this, nan_direction_hint)(res[j], res[new_first]))
{
std::swap(res[j], res[new_last]);
++new_last;
}
}
if (new_last - new_first > 1)
{
new_ranges.emplace_back(new_first, new_last);
}
}
this->updatePermutationImpl(
limit, res, equal_range,
less(*this, nan_direction_hint),
equals(*this, nan_direction_hint),
sort, partial_sort);
}
template <typename T>

View File

@ -30,6 +30,7 @@ struct CompareHelper
{
static constexpr bool less(T a, U b, int /*nan_direction_hint*/) { return a < b; }
static constexpr bool greater(T a, U b, int /*nan_direction_hint*/) { return a > b; }
/// Equality via plain operator==; the nan_direction_hint argument is accepted but not used by this helper.
static constexpr bool equals(T a, U b, int /*nan_direction_hint*/) { return a == b; }
/** Compares two numbers. Returns a number less than zero, equal to zero, or greater than zero if a < b, a == b, a > b, respectively.
* If one of the values is NaN, then
@ -76,6 +77,11 @@ struct FloatCompareHelper
return a > b;
}
/// Two values are considered equal exactly when compare() yields 0 for them
/// under the same nan_direction_hint.
static constexpr bool equals(T a, T b, int nan_direction_hint)
{
    const int order = compare(a, b, nan_direction_hint);
    return order == 0;
}
static constexpr int compare(T a, T b, int nan_direction_hint)
{
const bool isnan_a = std::isnan(a);
@ -112,6 +118,7 @@ private:
struct less;
struct greater;
struct equals;
public:
using ValueType = T;

View File

@ -488,6 +488,28 @@ protected:
template <typename Derived>
bool hasEqualValuesImpl() const;
/// Uses std::sort and partial_sort as default algorithms.
/// Implements 'less' and 'equals' via comparator.
/// If 'less' and 'equals' can be implemented more efficiently
/// (e.g. with fewer comparisons), you can call
/// the second overload of this method directly.
template <typename Comparator>
void updatePermutationImpl(
size_t limit,
Permutation & res,
EqualRanges & equal_ranges,
Comparator cmp) const;
/// General form: the caller supplies both the ordering and the sorting algorithms.
/// 'less' and 'equals' are called with two row indices taken from 'res'.
/// 'full_sort' is invoked as full_sort(begin, end, less) and
/// 'partial_sort' as partial_sort(begin, mid, end, less), on iterators into 'res'.
/// If 'equal_ranges' is not empty on entry, it is replaced on return by the ranges
/// (longer than one row) whose rows still compare equal.
template <typename Less, typename Equals, typename Sort, typename PartialSort>
void updatePermutationImpl(
size_t limit,
Permutation & res,
EqualRanges & equal_ranges,
Less less,
Equals equals,
Sort full_sort,
PartialSort partial_sort) const;
};
using ColumnPtr = IColumn::Ptr;

View File

@ -8,6 +8,8 @@
#include <Columns/IColumn.h>
#include <Common/PODArray.h>
#include <base/sort.h>
#include <algorithm>
namespace DB
{
@ -139,4 +141,103 @@ bool IColumn::hasEqualValuesImpl() const
return true;
}
/// Convenience overload: builds 'less' (result < 0) and 'equals' (result == 0)
/// from a three-way comparator over row indices, and forwards to the general
/// overload with std::sort and ::partial_sort as the sorting algorithms.
template <typename Comparator>
void IColumn::updatePermutationImpl(
    size_t limit,
    Permutation & res,
    EqualRanges & equal_ranges,
    Comparator cmp) const
{
    /// Both predicates refer to the same comparator object; it outlives the call below.
    auto is_less = [&cmp](size_t lhs, size_t rhs) { return cmp(lhs, rhs) < 0; };
    auto is_equal = [&cmp](size_t lhs, size_t rhs) { return cmp(lhs, rhs) == 0; };

    auto sort_whole = [](auto begin, auto end, auto pred) { std::sort(begin, end, pred); };
    auto sort_prefix = [](auto begin, auto mid, auto end, auto pred) { ::partial_sort(begin, mid, end, pred); };

    updatePermutationImpl(limit, res, equal_ranges, is_less, is_equal, sort_whole, sort_prefix);
}
template <typename Less, typename Equals, typename Sort, typename PartialSort>
void IColumn::updatePermutationImpl(
size_t limit,
Permutation & res,
EqualRanges & equal_ranges,
Less less,
Equals equals,
Sort full_sort,
PartialSort partial_sort) const
{
if (equal_ranges.empty())
return;
if (limit >= size() || limit > equal_ranges.back().second)
limit = 0;
EqualRanges new_ranges;
size_t number_of_ranges = equal_ranges.size();
if (limit)
--number_of_ranges;
for (size_t i = 0; i < number_of_ranges; ++i)
{
const auto & [first, last] = equal_ranges[i];
full_sort(res.begin() + first, res.begin() + last, less);
size_t new_first = first;
for (size_t j = first + 1; j < last; ++j)
{
if (!equals(res[j], res[new_first]))
{
if (j - new_first > 1)
new_ranges.emplace_back(new_first, j);
new_first = j;
}
}
if (last - new_first > 1)
new_ranges.emplace_back(new_first, last);
}
if (limit)
{
const auto & [first, last] = equal_ranges.back();
if (limit < first || limit > last)
{
equal_ranges = std::move(new_ranges);
return;
}
/// Since then we are working inside the interval.
partial_sort(res.begin() + first, res.begin() + limit, res.begin() + last, less);
size_t new_first = first;
for (size_t j = first + 1; j < limit; ++j)
{
if (!equals(res[j], res[new_first]))
{
if (j - new_first > 1)
new_ranges.emplace_back(new_first, j);
new_first = j;
}
}
size_t new_last = limit;
for (size_t j = limit; j < last; ++j)
{
if (equals(res[j], res[new_first]))
{
std::swap(res[j], res[new_last]);
++new_last;
}
}
if (new_last - new_first > 1)
new_ranges.emplace_back(new_first, new_last);
}
equal_ranges = std::move(new_ranges);
}
}