Merge pull request #35525 from kitaisreal/asof-join-improve-performance

Improve performance of ASOF JOIN
Maksim Kita 2022-03-23 23:10:40 +01:00 committed by GitHub
commit 6d1154b505
4 changed files with 82 additions and 33 deletions

Common/RadixSort.h

@@ -515,6 +515,11 @@ public:
         radixSortLSDInternal<false>(arr, size, false, nullptr);
     }
 
+    static void executeLSD(Element * arr, size_t size, bool reverse)
+    {
+        radixSortLSDInternal<false>(arr, size, reverse, nullptr);
+    }
+
     /** This function will start to sort inplace (modify 'arr')
       * but on the last step it will write result directly to the destination
       * instead of finishing sorting 'arr'.

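Note: the new overload only threads a `reverse` flag through to radixSortLSDInternal. As a rough, self-contained stand-in for what executeLSD(arr, size, reverse) computes (ClickHouse's RadixSort.h is far more general, with traits for signed/float keys and MSD variants), a byte-wise LSD radix sort over unsigned keys could look like the sketch below; everything in it is illustrative, not the actual implementation.

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

/// Byte-wise LSD radix sort of 32-bit unsigned keys; `reverse` selects descending output.
void radix_sort_lsd(std::vector<uint32_t> & arr, bool reverse)
{
    std::vector<uint32_t> buffer(arr.size());
    for (size_t shift = 0; shift < 32; shift += 8)
    {
        size_t count[256] = {};
        for (uint32_t x : arr)
            ++count[(x >> shift) & 0xFF];       /// histogram of the current byte
        size_t offset = 0;
        for (size_t i = 0; i < 256; ++i)        /// exclusive prefix sums -> output positions
        {
            size_t c = count[i];
            count[i] = offset;
            offset += c;
        }
        for (uint32_t x : arr)                  /// stable scatter by the current byte
            buffer[count[(x >> shift) & 0xFF]++] = x;
        arr.swap(buffer);
    }
    if (reverse)                                /// descending result = reversed ascending result
        std::reverse(arr.begin(), arr.end());
}

int main()
{
    std::vector<uint32_t> keys{42, 7, 100000, 7, 3};
    radix_sort_lsd(keys, /*reverse=*/ true);
    for (uint32_t k : keys)
        std::cout << k << ' ';                  /// prints: 100000 42 7 7 3
    std::cout << '\n';
}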
Interpreters/HashJoin.cpp

@@ -1237,16 +1237,16 @@ NO_INLINE IColumn::Filter joinRightColumns(
             {
                 const IColumn & left_asof_key = added_columns.leftAsofKey();
 
-                auto [block, row_num] = mapped->findAsof(left_asof_key, i);
-                if (block)
+                auto row_ref = mapped->findAsof(left_asof_key, i);
+                if (row_ref.block)
                 {
                     setUsed<need_filter>(filter, i);
                     if constexpr (multiple_disjuncts)
-                        used_flags.template setUsed<jf.need_flags, multiple_disjuncts>(block, row_num, 0);
+                        used_flags.template setUsed<jf.need_flags, multiple_disjuncts>(row_ref.block, row_ref.row_num, 0);
                     else
                         used_flags.template setUsed<jf.need_flags, multiple_disjuncts>(find_result);
 
-                    added_columns.appendFromBlock<jf.add_missing>(*block, row_num);
+                    added_columns.appendFromBlock<jf.add_missing>(*row_ref.block, row_ref.row_num);
                 }
                 else
                     addNotFoundRow<jf.add_missing, jf.need_replication>(added_columns, current_offset);

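Note: findAsof now returns a RowRef by value instead of a std::tuple of its members. The field names in the sketch below match how the result is consumed in the hunk above (row_ref.block, row_ref.row_num); the rest of the definition is only an approximation, not the exact declaration from RowRefs.h.

#include <cstddef>

class Block;                           /// opaque here

/// Approximate shape of the RowRef returned by findAsof: a pointer to the stored
/// block plus a row index, with a null block meaning "no match was found".
struct RowRef
{
    const Block * block = nullptr;     /// checked via `if (row_ref.block)` above
    size_t row_num = 0;                /// consumed via `row_ref.row_num` above
};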
Interpreters/RowRefs.cpp

@@ -1,5 +1,6 @@
 #include <Interpreters/RowRefs.h>
+#include <Common/RadixSort.h>
 #include <AggregateFunctions/Helpers.h>
 #include <Columns/IColumn.h>
 #include <DataTypes/IDataType.h>
@@ -44,38 +45,52 @@ class SortedLookupVector : public SortedLookupVectorBase
 {
     struct Entry
     {
-        /// We don't store a RowRef and instead keep it's members separately (and return a tuple) to reduce the memory usage.
-        /// For example, for sizeof(T) == 4 => sizeof(Entry) == 16 (while before it would be 20). Then when you put it into a vector, the effect is even greater
-        decltype(RowRef::block) block;
-        decltype(RowRef::row_num) row_num;
-        TKey asof_value;
+        TKey value;
+        uint32_t row_ref_index;
 
         Entry() = delete;
-        Entry(TKey v, const Block * b, size_t r) : block(b), row_num(r), asof_value(v) { }
+        Entry(TKey value_, uint32_t row_ref_index_)
+            : value(value_)
+            , row_ref_index(row_ref_index_)
+        { }
+    };
 
-        bool operator<(const Entry & other) const { return asof_value < other.asof_value; }
+    struct LessEntryOperator
+    {
+        ALWAYS_INLINE bool operator()(const Entry & lhs, const Entry & rhs) const
+        {
+            return lhs.value < rhs.value;
+        }
     };
 
     struct GreaterEntryOperator
     {
-        bool operator()(Entry const & a, Entry const & b) const { return a.asof_value > b.asof_value; }
+        ALWAYS_INLINE bool operator()(const Entry & lhs, const Entry & rhs) const
+        {
+            return lhs.value > rhs.value;
+        }
     };
 
 public:
-    using Base = std::vector<Entry>;
     using Keys = std::vector<TKey>;
-    static constexpr bool isDescending = (inequality == ASOF::Inequality::Greater || inequality == ASOF::Inequality::GreaterOrEquals);
-    static constexpr bool isStrict = (inequality == ASOF::Inequality::Less) || (inequality == ASOF::Inequality::Greater);
+    using Entries = PaddedPODArray<Entry>;
+    using RowRefs = PaddedPODArray<RowRef>;
+
+    static constexpr bool is_descending = (inequality == ASOF::Inequality::Greater || inequality == ASOF::Inequality::GreaterOrEquals);
+    static constexpr bool is_strict = (inequality == ASOF::Inequality::Less) || (inequality == ASOF::Inequality::Greater);
 
     void insert(const IColumn & asof_column, const Block * block, size_t row_num) override
     {
         using ColumnType = ColumnVectorOrDecimal<TKey>;
         const auto & column = assert_cast<const ColumnType &>(asof_column);
-        TKey k = column.getElement(row_num);
+        TKey key = column.getElement(row_num);
 
         assert(!sorted.load(std::memory_order_acquire));
-        array.emplace_back(k, block, row_num);
+
+        entries.emplace_back(key, row_refs.size());
+        row_refs.emplace_back(RowRef(block, row_num));
     }
 
     /// Unrolled version of upper_bound and lower_bound
@@ -84,30 +99,30 @@ public:
     /// at https://en.algorithmica.org/hpc/data-structures/s-tree/
     size_t boundSearch(TKey value)
     {
-        size_t size = array.size();
+        size_t size = entries.size();
         size_t low = 0;
 
         /// This is a single binary search iteration as a macro to unroll. Takes into account the inequality:
-        /// isStrict -> Equal values are not requested
-        /// isDescending -> The vector is sorted in reverse (for greater or greaterOrEquals)
+        /// is_strict -> Equal values are not requested
+        /// is_descending -> The vector is sorted in reverse (for greater or greaterOrEquals)
 #define BOUND_ITERATION \
     { \
         size_t half = size / 2; \
         size_t other_half = size - half; \
         size_t probe = low + half; \
         size_t other_low = low + other_half; \
-        TKey v = array[probe].asof_value; \
+        TKey & v = entries[probe].value; \
         size = half; \
-        if constexpr (isDescending) \
+        if constexpr (is_descending) \
        { \
-            if constexpr (isStrict) \
+            if constexpr (is_strict) \
                 low = value <= v ? other_low : low; \
             else \
                 low = value < v ? other_low : low; \
         } \
         else \
         { \
-            if constexpr (isStrict) \
+            if constexpr (is_strict) \
                 low = value >= v ? other_low : low; \
             else \
                 low = value > v ? other_low : low; \
@@ -130,7 +145,7 @@ public:
         return low;
     }
 
-    std::tuple<decltype(RowRef::block), decltype(RowRef::row_num)> findAsof(const IColumn & asof_column, size_t row_num) override
+    RowRef findAsof(const IColumn & asof_column, size_t row_num) override
     {
         sort();
@@ -139,8 +154,11 @@ public:
         TKey k = column.getElement(row_num);
 
         size_t pos = boundSearch(k);
-        if (pos != array.size())
-            return std::make_tuple(array[pos].block, array[pos].row_num);
+        if (pos != entries.size())
+        {
+            size_t row_ref_index = entries[pos].row_ref_index;
+            return row_refs[row_ref_index];
+        }
 
         return {nullptr, 0};
     }
@@ -148,7 +166,8 @@ public:
 private:
     std::atomic<bool> sorted = false;
     mutable std::mutex lock;
-    Base array;
+    Entries entries;
+    RowRefs row_refs;
 
     // Double checked locking with SC atomics works in C++
     // https://preshing.com/20130930/double-checked-locking-is-fixed-in-cpp11/
@@ -160,12 +179,37 @@ private:
         if (!sorted.load(std::memory_order_acquire))
         {
             std::lock_guard<std::mutex> l(lock);
             if (!sorted.load(std::memory_order_relaxed))
             {
-                if constexpr (isDescending)
-                    ::sort(array.begin(), array.end(), GreaterEntryOperator());
+                if constexpr (std::is_arithmetic_v<TKey> && !std::is_floating_point_v<TKey>)
+                {
+                    if (likely(entries.size() > 256))
+                    {
+                        struct RadixSortTraits : RadixSortNumTraits<TKey>
+                        {
+                            using Element = Entry;
+                            using Result = Element;
+
+                            static TKey & extractKey(Element & elem) { return elem.value; }
+                            static Element extractResult(Element & elem) { return elem; }
+                        };
+
+                        if constexpr (is_descending)
+                            RadixSort<RadixSortTraits>::executeLSD(entries.data(), entries.size(), true);
+                        else
+                            RadixSort<RadixSortTraits>::executeLSD(entries.data(), entries.size(), false);
+
+                        sorted.store(true, std::memory_order_release);
+                        return;
+                    }
+                }
+
+                if constexpr (is_descending)
+                    ::sort(entries.begin(), entries.end(), GreaterEntryOperator());
                 else
-                    ::sort(array.begin(), array.end());
+                    ::sort(entries.begin(), entries.end(), LessEntryOperator());
+
                 sorted.store(true, std::memory_order_release);
             }
         }

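Note: the central layout change above is that sorting no longer moves (key, block pointer, row number) triples. Each Entry is reduced to the key plus a 32-bit index, and the heavier RowRef payload sits in a parallel array that is never reordered. A minimal standalone sketch of that split follows; container and type names here are illustrative, not the PaddedPODArray/Block types used above.

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

struct Payload                          /// stand-in for RowRef
{
    const void * block = nullptr;
    size_t row_num = 0;
};

struct Entry                            /// small, trivially copyable sort element
{
    uint64_t value;                     /// ASOF key
    uint32_t payload_index;             /// index into the side array instead of the payload itself
};

int main()
{
    std::vector<Entry> entries;
    std::vector<Payload> payloads;

    /// Insertion appends to both arrays; only `entries` is touched when sorting.
    auto insert = [&](uint64_t key, size_t row)
    {
        entries.push_back({key, static_cast<uint32_t>(payloads.size())});
        payloads.push_back({nullptr, row});
    };

    insert(30, 0);
    insert(10, 1);
    insert(20, 2);

    /// The sort shuffles small entries only; payloads stay put and are reached by index.
    std::sort(entries.begin(), entries.end(),
              [](const Entry & a, const Entry & b) { return a.value < b.value; });

    for (const auto & e : entries)
        std::cout << e.value << " -> row " << payloads[e.payload_index].row_num << '\n';
}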
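Note: boundSearch() above is a branch-free binary search (see the algorithmica.org link in the comments), unrolled via the BOUND_ITERATION macro and parameterized by is_strict/is_descending. Stripped of the unrolling and the inequality variants, the underlying loop is roughly the following standalone sketch; names are illustrative.

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

/// Returns the index of the first element >= value (lower_bound) without branches
/// in the hot loop: the comparison result only selects the next `low` offset.
size_t branchless_lower_bound(const std::vector<uint64_t> & sorted, uint64_t value)
{
    size_t size = sorted.size();
    size_t low = 0;
    while (size > 0)
    {
        size_t half = size / 2;
        size_t other_half = size - half;
        size_t probe = low + half;
        size_t other_low = low + other_half;
        uint64_t v = sorted[probe];
        size = half;
        low = value > v ? other_low : low;   /// non-strict search keeps equal elements reachable
    }
    return low;
}

int main()
{
    std::vector<uint64_t> keys{1, 3, 3, 7, 9};
    std::cout << branchless_lower_bound(keys, 3) << '\n';   /// prints 1
    std::cout << branchless_lower_bound(keys, 8) << '\n';   /// prints 4
}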
Interpreters/RowRefs.h

@@ -146,7 +146,7 @@ private:
 struct SortedLookupVectorBase
 {
     SortedLookupVectorBase() = default;
-    virtual ~SortedLookupVectorBase() { }
+    virtual ~SortedLookupVectorBase() = default;
 
     static std::optional<TypeIndex> getTypeSize(const IColumn & asof_column, size_t & type_size);
@@ -154,7 +154,7 @@ struct SortedLookupVectorBase
     virtual void insert(const IColumn &, const Block *, size_t) = 0;
 
     // This needs to be synchronized internally
-    virtual std::tuple<decltype(RowRef::block), decltype(RowRef::row_num)> findAsof(const IColumn &, size_t) = 0;
+    virtual RowRef findAsof(const IColumn &, size_t) = 0;
 };
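
Note: sort() in RowRefs.cpp (last hunk of that file above) keeps using double-checked locking: an acquire load on the fast path, a mutex plus a relaxed re-check, and a release store once the data is sorted, as described in the Preshing article linked in the comments. A minimal standalone sketch of that pattern, with an illustrative payload rather than the ClickHouse types:

#include <algorithm>
#include <atomic>
#include <iostream>
#include <mutex>
#include <vector>

class LazySortedVector
{
public:
    void insert(int x) { data.push_back(x); }   /// single-threaded build phase

    /// Safe to call concurrently once the build phase is over.
    void ensureSorted()
    {
        if (!sorted.load(std::memory_order_acquire))            /// fast path: already sorted
        {
            std::lock_guard<std::mutex> guard(lock);
            if (!sorted.load(std::memory_order_relaxed))        /// re-check under the lock
            {
                std::sort(data.begin(), data.end());
                sorted.store(true, std::memory_order_release);  /// publish the sorted state
            }
        }
    }

    const std::vector<int> & get() const { return data; }

private:
    std::vector<int> data;
    std::atomic<bool> sorted{false};
    std::mutex lock;
};

int main()
{
    LazySortedVector v;
    v.insert(3);
    v.insert(1);
    v.insert(2);
    v.ensureSorted();
    for (int x : v.get())
        std::cout << x << ' ';                  /// prints: 1 2 3
    std::cout << '\n';
}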