diff --git a/src/Common/RadixSort.h b/src/Common/RadixSort.h
index 944ab860355..2f02ebb9e03 100644
--- a/src/Common/RadixSort.h
+++ b/src/Common/RadixSort.h
@@ -515,6 +515,11 @@ public:
         radixSortLSDInternal(arr, size, false, nullptr);
     }
 
+    static void executeLSD(Element * arr, size_t size, bool reverse)
+    {
+        radixSortLSDInternal(arr, size, reverse, nullptr);
+    }
+
     /** This function will start to sort inplace (modify 'arr')
       * but on the last step it will write result directly to the destination
       * instead of finishing sorting 'arr'.
diff --git a/src/Interpreters/HashJoin.cpp b/src/Interpreters/HashJoin.cpp
index c56529b3214..e81db1427ef 100644
--- a/src/Interpreters/HashJoin.cpp
+++ b/src/Interpreters/HashJoin.cpp
@@ -1237,16 +1237,16 @@ NO_INLINE IColumn::Filter joinRightColumns(
             {
                 const IColumn & left_asof_key = added_columns.leftAsofKey();
 
-                auto [block, row_num] = mapped->findAsof(left_asof_key, i);
-                if (block)
+                auto row_ref = mapped->findAsof(left_asof_key, i);
+                if (row_ref.block)
                 {
                     setUsed(filter, i);
                     if constexpr (multiple_disjuncts)
-                        used_flags.template setUsed(block, row_num, 0);
+                        used_flags.template setUsed(row_ref.block, row_ref.row_num, 0);
                     else
                         used_flags.template setUsed(find_result);
-                    added_columns.appendFromBlock(*block, row_num);
+                    added_columns.appendFromBlock(*row_ref.block, row_ref.row_num);
                 }
                 else
                     addNotFoundRow(added_columns, current_offset);
diff --git a/src/Interpreters/RowRefs.cpp b/src/Interpreters/RowRefs.cpp
index 39fc7965eb2..0385b003f3c 100644
--- a/src/Interpreters/RowRefs.cpp
+++ b/src/Interpreters/RowRefs.cpp
@@ -1,5 +1,6 @@
 #include <Interpreters/RowRefs.h>
 
+#include <Common/RadixSort.h>
 #include
 #include
 #include
@@ -44,38 +45,52 @@ class SortedLookupVector : public SortedLookupVectorBase
 {
     struct Entry
     {
-        /// We don't store a RowRef and instead keep it's members separately (and return a tuple) to reduce the memory usage.
-        /// For example, for sizeof(T) == 4 => sizeof(Entry) == 16 (while before it would be 20). Then when you put it into a vector, the effect is even greater
-        decltype(RowRef::block) block;
-        decltype(RowRef::row_num) row_num;
-        TKey asof_value;
+        TKey value;
+        uint32_t row_ref_index;
 
         Entry() = delete;
-        Entry(TKey v, const Block * b, size_t r) : block(b), row_num(r), asof_value(v) { }
+        Entry(TKey value_, uint32_t row_ref_index_)
+            : value(value_)
+            , row_ref_index(row_ref_index_)
+        { }
+    };
 
-        bool operator<(const Entry & other) const { return asof_value < other.asof_value; }
+    struct LessEntryOperator
+    {
+        ALWAYS_INLINE bool operator()(const Entry & lhs, const Entry & rhs) const
+        {
+            return lhs.value < rhs.value;
+        }
     };
 
     struct GreaterEntryOperator
     {
-        bool operator()(Entry const & a, Entry const & b) const { return a.asof_value > b.asof_value; }
+        ALWAYS_INLINE bool operator()(const Entry & lhs, const Entry & rhs) const
+        {
+            return lhs.value > rhs.value;
+        }
     };
 
 public:
-    using Base = std::vector<Entry>;
     using Keys = std::vector<TKey>;
-    static constexpr bool isDescending = (inequality == ASOF::Inequality::Greater || inequality == ASOF::Inequality::GreaterOrEquals);
-    static constexpr bool isStrict = (inequality == ASOF::Inequality::Less) || (inequality == ASOF::Inequality::Greater);
+    using Entries = PaddedPODArray<Entry>;
+    using RowRefs = PaddedPODArray<RowRef>;
+
+    static constexpr bool is_descending = (inequality == ASOF::Inequality::Greater || inequality == ASOF::Inequality::GreaterOrEquals);
+    static constexpr bool is_strict = (inequality == ASOF::Inequality::Less) || (inequality == ASOF::Inequality::Greater);
 
     void insert(const IColumn & asof_column, const Block * block, size_t row_num) override
     {
         using ColumnType = ColumnVectorOrDecimal<TKey>;
         const auto & column = assert_cast<const ColumnType &>(asof_column);
-        TKey k = column.getElement(row_num);
+        TKey key = column.getElement(row_num);
 
         assert(!sorted.load(std::memory_order_acquire));
-        array.emplace_back(k, block, row_num);
+
+        entries.emplace_back(key, row_refs.size());
+        row_refs.emplace_back(RowRef(block, row_num));
     }
 
     /// Unrolled version of upper_bound and lower_bound
@@ -84,30 +99,30 @@ public:
     /// at https://en.algorithmica.org/hpc/data-structures/s-tree/
     size_t boundSearch(TKey value)
    {
-        size_t size = array.size();
+        size_t size = entries.size();
         size_t low = 0;
 
         /// This is a single binary search iteration as a macro to unroll. Takes into account the inequality:
-        /// isStrict -> Equal values are not requested
-        /// isDescending -> The vector is sorted in reverse (for greater or greaterOrEquals)
+        /// is_strict -> Equal values are not requested
+        /// is_descending -> The vector is sorted in reverse (for greater or greaterOrEquals)
 #define BOUND_ITERATION \
         { \
             size_t half = size / 2; \
             size_t other_half = size - half; \
             size_t probe = low + half; \
             size_t other_low = low + other_half; \
-            TKey v = array[probe].asof_value; \
+            TKey & v = entries[probe].value; \
             size = half; \
-            if constexpr (isDescending) \
+            if constexpr (is_descending) \
             { \
-                if constexpr (isStrict) \
+                if constexpr (is_strict) \
                     low = value <= v ? other_low : low; \
                 else \
                     low = value < v ? other_low : low; \
             } \
             else \
             { \
-                if constexpr (isStrict) \
+                if constexpr (is_strict) \
                     low = value >= v ? other_low : low; \
                 else \
                     low = value > v ? other_low : low; \
@@ -130,7 +145,7 @@ public:
         return low;
     }
 
-    std::tuple<decltype(RowRef::block), decltype(RowRef::row_num)> findAsof(const IColumn & asof_column, size_t row_num) override
+    RowRef findAsof(const IColumn & asof_column, size_t row_num) override
     {
         sort();
 
@@ -139,8 +154,11 @@ public:
         TKey k = column.getElement(row_num);
 
         size_t pos = boundSearch(k);
-        if (pos != array.size())
-            return std::make_tuple(array[pos].block, array[pos].row_num);
+        if (pos != entries.size())
+        {
+            size_t row_ref_index = entries[pos].row_ref_index;
+            return row_refs[row_ref_index];
+        }
 
         return {nullptr, 0};
     }
 
@@ -148,7 +166,8 @@ public:
 private:
     std::atomic<bool> sorted = false;
     mutable std::mutex lock;
-    Base array;
+    Entries entries;
+    RowRefs row_refs;
 
     // Double checked locking with SC atomics works in C++
     // https://preshing.com/20130930/double-checked-locking-is-fixed-in-cpp11/
@@ -160,12 +179,37 @@ private:
         if (!sorted.load(std::memory_order_acquire))
         {
             std::lock_guard l(lock);
+
             if (!sorted.load(std::memory_order_relaxed))
             {
-                if constexpr (isDescending)
-                    ::sort(array.begin(), array.end(), GreaterEntryOperator());
+                if constexpr (std::is_arithmetic_v<TKey> && !std::is_floating_point_v<TKey>)
+                {
+                    if (likely(entries.size() > 256))
+                    {
+                        struct RadixSortTraits : RadixSortNumTraits<TKey>
+                        {
+                            using Element = Entry;
+                            using Result = Element;
+
+                            static TKey & extractKey(Element & elem) { return elem.value; }
+                            static Element extractResult(Element & elem) { return elem; }
+                        };
+
+                        if constexpr (is_descending)
+                            RadixSort<RadixSortTraits>::executeLSD(entries.data(), entries.size(), true);
+                        else
+                            RadixSort<RadixSortTraits>::executeLSD(entries.data(), entries.size(), false);
+
+                        sorted.store(true, std::memory_order_release);
+                        return;
+                    }
+                }
+
+                if constexpr (is_descending)
+                    ::sort(entries.begin(), entries.end(), GreaterEntryOperator());
                 else
-                    ::sort(array.begin(), array.end());
+                    ::sort(entries.begin(), entries.end(), LessEntryOperator());
+
                 sorted.store(true, std::memory_order_release);
             }
         }
diff --git a/src/Interpreters/RowRefs.h b/src/Interpreters/RowRefs.h
index 02462833050..fa5ce867613 100644
--- a/src/Interpreters/RowRefs.h
+++ b/src/Interpreters/RowRefs.h
@@ -146,7 +146,7 @@ private:
 struct SortedLookupVectorBase
 {
     SortedLookupVectorBase() = default;
-    virtual ~SortedLookupVectorBase() { }
+    virtual ~SortedLookupVectorBase() = default;
 
     static std::optional<TypeIndex> getTypeSize(const IColumn & asof_column, size_t & type_size);
 
@@ -154,7 +154,7 @@ struct SortedLookupVectorBase
     virtual void insert(const IColumn &, const Block *, size_t) = 0;
 
     // This needs to be synchronized internally
-    virtual std::tuple<decltype(RowRef::block), decltype(RowRef::row_num)> findAsof(const IColumn &, size_t) = 0;
+    virtual RowRef findAsof(const IColumn &, size_t) = 0;
 };
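Since the layout change is the heart of the patch, here is a minimal standalone sketch (not part of the patch) of the new `SortedLookupVector` data layout: the ASOF entry is split into a dense, radix-sortable `{key, index}` pair plus a side array of `RowRef`s. All type names are illustrative stand-ins; `std::sort` and `std::lower_bound` stand in for the patch's `LessEntryOperator`/`GreaterEntryOperator` sorting and the unrolled `boundSearch`, and `RowRef::row_num` is assumed to be 32-bit (which is what the removed `sizeof(Entry) == 16` comment's arithmetic implies). Builds with any C++17 compiler.

```cpp
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <vector>

// Stand-ins for the ClickHouse types; only the layout matters here.
struct Block;  // opaque, only a pointer is stored
struct RowRef
{
    const Block * block = nullptr;
    uint32_t row_num = 0;  // assumption: 32-bit, as in RowRef::SizeT
};

// Old layout: the RowRef members inlined into every searched entry.
struct OldEntry
{
    const Block * block;
    uint32_t row_num;
    uint32_t asof_value;
};

// New layout: key plus a 32-bit index into a side array of RowRefs.
// The array scanned by binary search stays small and cache-friendly,
// and a flat {key, index} pair is directly radix-sortable.
struct NewEntry
{
    uint32_t value;
    uint32_t row_ref_index;
};

int main()
{
    std::vector<NewEntry> entries;
    std::vector<RowRef> row_refs;

    // insert(): keys go into 'entries', row locations into 'row_refs'.
    for (uint32_t key : {30u, 10u, 20u})
    {
        entries.push_back({key, static_cast<uint32_t>(row_refs.size())});
        row_refs.push_back(RowRef{nullptr, key});  // dummy row locations
    }

    // sort() for the ascending (Less / LessOrEquals) case.
    std::sort(entries.begin(), entries.end(),
              [](const NewEntry & lhs, const NewEntry & rhs) { return lhs.value < rhs.value; });

    // findAsof() for the non-strict ascending case (LessOrEquals):
    // first entry whose value is >= the probe key.
    uint32_t probe = 15;
    auto it = std::lower_bound(entries.begin(), entries.end(), probe,
                               [](const NewEntry & e, uint32_t v) { return e.value < v; });
    if (it != entries.end())
        printf("matched row %u\n", row_refs[it->row_ref_index].row_num);

    printf("sizeof(OldEntry) = %zu, sizeof(NewEntry) = %zu\n",
           sizeof(OldEntry), sizeof(NewEntry));
}
```

On a typical LP64 target this prints `sizeof(OldEntry) = 16, sizeof(NewEntry) = 8`, so the array walked by the binary search is twice as dense. And because `NewEntry` is a flat pair of integers, the patch can hand `entries.data()` straight to `RadixSort<RadixSortTraits>::executeLSD` for non-floating-point arithmetic key types once there are more than 256 entries, falling back to comparison sort otherwise.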