Improve performance of ASOF join

Maksim Kita 2022-03-23 12:19:38 +01:00
parent 0ab9a9c0f6
commit 3c5c267c17
4 changed files with 82 additions and 33 deletions

View File

@@ -515,6 +515,11 @@ public:
         radixSortLSDInternal<false>(arr, size, false, nullptr);
     }
 
+    static void executeLSD(Element * arr, size_t size, bool reverse)
+    {
+        radixSortLSDInternal<false>(arr, size, reverse, nullptr);
+    }
+
     /** This function will start to sort inplace (modify 'arr')
       * but on the last step it will write result directly to the destination
       * instead of finishing sorting 'arr'.
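The new overload forwards a caller-supplied reverse flag to radixSortLSDInternal, so a descending radix sort can be requested directly instead of sorting ascending and reversing afterwards. A minimal stand-in showing the contract the overload exposes; this executeLSD is a hypothetical sketch built on std::sort, not ClickHouse's radix sort:

#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <functional>

/// Hypothetical stand-in mirroring the new overload's contract:
/// ascending order by default, descending when `reverse` is set.
/// The real implementation forwards to radixSortLSDInternal.
template <typename Element>
static void executeLSD(Element * arr, size_t size, bool reverse)
{
    if (reverse)
        std::sort(arr, arr + size, std::greater<Element>());
    else
        std::sort(arr, arr + size);
}

int main()
{
    unsigned keys[] = {3, 1, 2};
    executeLSD(keys, 3, /*reverse=*/ true);
    for (unsigned k : keys)
        std::printf("%u ", k); /// prints: 3 2 1
}

The ASOF code later in this commit uses the flag for the Greater/GreaterOrEquals inequalities, where the lookup vector must be sorted in descending order.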

View File

@@ -1237,16 +1237,16 @@ NO_INLINE IColumn::Filter joinRightColumns(
         {
             const IColumn & left_asof_key = added_columns.leftAsofKey();
 
-            auto [block, row_num] = mapped->findAsof(left_asof_key, i);
-            if (block)
+            auto row_ref = mapped->findAsof(left_asof_key, i);
+            if (row_ref.block)
             {
                 setUsed<need_filter>(filter, i);
                 if constexpr (multiple_disjuncts)
-                    used_flags.template setUsed<jf.need_flags, multiple_disjuncts>(block, row_num, 0);
+                    used_flags.template setUsed<jf.need_flags, multiple_disjuncts>(row_ref.block, row_ref.row_num, 0);
                 else
                     used_flags.template setUsed<jf.need_flags, multiple_disjuncts>(find_result);
 
-                added_columns.appendFromBlock<jf.add_missing>(*block, row_num);
+                added_columns.appendFromBlock<jf.add_missing>(*row_ref.block, row_ref.row_num);
             }
             else
                 addNotFoundRow<jf.add_missing, jf.need_replication>(added_columns, current_offset);
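The change in the join loop is mechanical: findAsof now returns a RowRef by value rather than a std::tuple of its members, so the not-found case becomes a null check on row_ref.block and the block pointer and row number travel together under one name. A minimal sketch of the shape being assumed here (only the two members this diff actually touches):

#include <cstddef>

struct Block; /// opaque here; holds the right-hand table's columns

/// Assumed shape, taken from the members used in this diff.
struct RowRef
{
    const Block * block = nullptr;
    size_t row_num = 0;
};

/// Before: callers unpacked a tuple and tested the pointer element:
///     auto [block, row_num] = mapped->findAsof(key, i);
///     if (block) ...
/// After: a named struct keeps pointer and row together:
///     auto row_ref = mapped->findAsof(key, i);
///     if (row_ref.block) ...
bool isFound(const RowRef & row_ref)
{
    return row_ref.block != nullptr;
}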

View File

@@ -1,5 +1,6 @@
 #include <Interpreters/RowRefs.h>
 
+#include <Common/RadixSort.h>
 #include <AggregateFunctions/Helpers.h>
 #include <Columns/IColumn.h>
 #include <DataTypes/IDataType.h>
@@ -44,38 +45,52 @@ class SortedLookupVector : public SortedLookupVectorBase
 {
     struct Entry
     {
-        /// We don't store a RowRef and instead keep its members separately (and return a tuple) to reduce the memory usage.
-        /// For example, for sizeof(T) == 4 => sizeof(Entry) == 16 (while before it would be 20). Then when you put it into a vector, the effect is even greater
-        decltype(RowRef::block) block;
-        decltype(RowRef::row_num) row_num;
-        TKey asof_value;
+        TKey value;
+        uint32_t row_ref_index;
 
         Entry() = delete;
-        Entry(TKey v, const Block * b, size_t r) : block(b), row_num(r), asof_value(v) { }
+        Entry(TKey value_, uint32_t row_ref_index_)
+            : value(value_)
+            , row_ref_index(row_ref_index_)
+        { }
+    };
 
-        bool operator<(const Entry & other) const { return asof_value < other.asof_value; }
+    struct LessEntryOperator
+    {
+        ALWAYS_INLINE bool operator()(const Entry & lhs, const Entry & rhs) const
+        {
+            return lhs.value < rhs.value;
+        }
     };
 
     struct GreaterEntryOperator
     {
-        bool operator()(Entry const & a, Entry const & b) const { return a.asof_value > b.asof_value; }
+        ALWAYS_INLINE bool operator()(const Entry & lhs, const Entry & rhs) const
+        {
+            return lhs.value > rhs.value;
+        }
     };
 
 public:
-    using Base = std::vector<Entry>;
-    using Keys = std::vector<TKey>;
-    static constexpr bool isDescending = (inequality == ASOF::Inequality::Greater || inequality == ASOF::Inequality::GreaterOrEquals);
-    static constexpr bool isStrict = (inequality == ASOF::Inequality::Less) || (inequality == ASOF::Inequality::Greater);
+    using Entries = PaddedPODArray<Entry>;
+    using RowRefs = PaddedPODArray<RowRef>;
+
+    static constexpr bool is_descending = (inequality == ASOF::Inequality::Greater || inequality == ASOF::Inequality::GreaterOrEquals);
+    static constexpr bool is_strict = (inequality == ASOF::Inequality::Less) || (inequality == ASOF::Inequality::Greater);
 
     void insert(const IColumn & asof_column, const Block * block, size_t row_num) override
     {
         using ColumnType = ColumnVectorOrDecimal<TKey>;
         const auto & column = assert_cast<const ColumnType &>(asof_column);
-        TKey k = column.getElement(row_num);
+        TKey key = column.getElement(row_num);
 
         assert(!sorted.load(std::memory_order_acquire));
-        array.emplace_back(k, block, row_num);
+        entries.emplace_back(key, row_refs.size());
+        row_refs.emplace_back(RowRef(block, row_num));
     }
 
     /// Unrolled version of upper_bound and lower_bound
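The point of this hunk is to shrink the element that the binary search touches. The old Entry carried a block pointer and row number through every probe; the new one is just the key plus a 32-bit index into a parallel row_refs array, which is dereferenced once, after the search finishes. Smaller fixed-width entries mean more of them fit per cache line, and they become eligible for radix sort further down. An illustrative sketch of the size difference for a 4-byte key, consistent with the removed comment's sizeof(Entry) == 16 (layouts assume a 64-bit platform):

#include <cstdint>

struct Block;

/// Old layout for a 4-byte key: pointer + row number + key,
/// all dragged through every binary search probe.
struct EntryOld
{
    const Block * block;     /// 8 bytes
    uint32_t row_num;        /// 4 bytes
    uint32_t value;          /// 4 bytes
};

/// New layout: only what the search compares, plus an index into
/// the parallel PaddedPODArray<RowRef>, touched once per lookup.
struct EntryNew
{
    uint32_t value;
    uint32_t row_ref_index;
};

static_assert(sizeof(EntryOld) == 16, "assuming 64-bit pointers");
static_assert(sizeof(EntryNew) == 8, "half the bytes per probed element");

int main() { return 0; }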
@@ -84,30 +99,30 @@ public:
     /// at https://en.algorithmica.org/hpc/data-structures/s-tree/
     size_t boundSearch(TKey value)
     {
-        size_t size = array.size();
+        size_t size = entries.size();
         size_t low = 0;
 
         /// This is a single binary search iteration as a macro to unroll. Takes into account the inequality:
-        /// isStrict -> Equal values are not requested
-        /// isDescending -> The vector is sorted in reverse (for greater or greaterOrEquals)
+        /// is_strict -> Equal values are not requested
+        /// is_descending -> The vector is sorted in reverse (for greater or greaterOrEquals)
 #define BOUND_ITERATION \
         { \
             size_t half = size / 2; \
             size_t other_half = size - half; \
             size_t probe = low + half; \
             size_t other_low = low + other_half; \
-            TKey v = array[probe].asof_value; \
+            TKey & v = entries[probe].value; \
             size = half; \
-            if constexpr (isDescending) \
+            if constexpr (is_descending) \
             { \
-                if constexpr (isStrict) \
+                if constexpr (is_strict) \
                     low = value <= v ? other_low : low; \
                 else \
                     low = value < v ? other_low : low; \
             } \
             else \
             { \
-                if constexpr (isStrict) \
+                if constexpr (is_strict) \
                     low = value >= v ? other_low : low; \
                 else \
                     low = value > v ? other_low : low; \
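BOUND_ITERATION is one step of a branchless binary search (see the linked Algorithmica page): both candidate values of low are computed and one is selected with a ternary that compilers lower to a conditional move, so there is no data-dependent branch to mispredict; writing it as a macro lets the caller unroll a fixed number of steps. A self-contained sketch of the same step for the ascending, non-strict case:

#include <cstddef>
#include <cstdint>
#include <vector>

/// One BOUND_ITERATION step, written as a plain loop: halve `size`
/// and conditionally advance `low`; the ternary becomes a cmov.
size_t branchlessLowerBound(const std::vector<uint32_t> & a, uint32_t value)
{
    size_t low = 0;
    size_t size = a.size();
    while (size > 0)
    {
        size_t half = size / 2;
        size_t other_half = size - half;
        size_t probe = low + half;
        size_t other_low = low + other_half;
        uint32_t v = a[probe];
        size = half;
        low = value > v ? other_low : low; /// non-strict ascending case
    }
    return low; /// first index with a[low] >= value, or a.size()
}

For {1, 3, 5} and value 3 this returns index 1; a result equal to a.size() means no bound exists, which is exactly the pos != entries.size() check in findAsof below.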
@@ -130,7 +145,7 @@ public:
         return low;
     }
 
-    std::tuple<decltype(RowRef::block), decltype(RowRef::row_num)> findAsof(const IColumn & asof_column, size_t row_num) override
+    RowRef findAsof(const IColumn & asof_column, size_t row_num) override
    {
        sort();
 
@@ -139,8 +154,11 @@ public:
         TKey k = column.getElement(row_num);
 
         size_t pos = boundSearch(k);
-        if (pos != array.size())
-            return std::make_tuple(array[pos].block, array[pos].row_num);
+        if (pos != entries.size())
+        {
+            size_t row_ref_index = entries[pos].row_ref_index;
+            return row_refs[row_ref_index];
+        }
 
         return {nullptr, 0};
     }
@@ -148,7 +166,8 @@ public:
 private:
     std::atomic<bool> sorted = false;
     mutable std::mutex lock;
-    Base array;
+    Entries entries;
+    RowRefs row_refs;
 
     // Double checked locking with SC atomics works in C++
     // https://preshing.com/20130930/double-checked-locking-is-fixed-in-cpp11/
@@ -160,12 +179,37 @@ private:
         if (!sorted.load(std::memory_order_acquire))
         {
             std::lock_guard<std::mutex> l(lock);
             if (!sorted.load(std::memory_order_relaxed))
             {
-                if constexpr (isDescending)
-                    ::sort(array.begin(), array.end(), GreaterEntryOperator());
+                if constexpr (std::is_arithmetic_v<TKey> && !std::is_floating_point_v<TKey>)
+                {
+                    if (likely(entries.size() > 256))
+                    {
+                        struct RadixSortTraits : RadixSortNumTraits<TKey>
+                        {
+                            using Element = Entry;
+                            using Result = Element;
+
+                            static TKey & extractKey(Element & elem) { return elem.value; }
+                            static Element extractResult(Element & elem) { return elem; }
+                        };
+
+                        if constexpr (is_descending)
+                            RadixSort<RadixSortTraits>::executeLSD(entries.data(), entries.size(), true);
+                        else
+                            RadixSort<RadixSortTraits>::executeLSD(entries.data(), entries.size(), false);
+
+                        sorted.store(true, std::memory_order_release);
+                        return;
+                    }
+                }
+
+                if constexpr (is_descending)
+                    ::sort(entries.begin(), entries.end(), GreaterEntryOperator());
                 else
-                    ::sort(array.begin(), array.end());
+                    ::sort(entries.begin(), entries.end(), LessEntryOperator());
 
                 sorted.store(true, std::memory_order_release);
            }
        }
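Two things happen in sort(). For integral keys with more than 256 entries, the comparison sort is replaced by the LSD radix sort, with the local RadixSortTraits adapter telling it that the key lives in Entry::value; below that threshold, and for key types that are not non-floating-point arithmetic types, the code keeps ::sort with the comparator pair. The surrounding double-checked locking is unchanged: the acquire load keeps repeated lookups lock-free once sorting is done, and the release store publishes the result. A self-contained sketch of that locking skeleton (LazySorted and doSort are hypothetical names):

#include <atomic>
#include <mutex>

/// Double-checked locking as used around sort(): threads race to sort
/// lazily on first lookup, only one does the work, and later readers
/// skip the mutex entirely.
class LazySorted
{
public:
    void ensureSorted()
    {
        if (!sorted.load(std::memory_order_acquire)) /// fast path: already published
        {
            std::lock_guard<std::mutex> guard(lock);
            if (!sorted.load(std::memory_order_relaxed)) /// re-check under the lock
            {
                doSort();
                sorted.store(true, std::memory_order_release); /// publish sorted data
            }
        }
    }

private:
    void doSort() { /* hypothetical: sort the underlying arrays */ }

    std::atomic<bool> sorted{false};
    std::mutex lock;
};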

View File

@@ -146,7 +146,7 @@ private:
 struct SortedLookupVectorBase
 {
     SortedLookupVectorBase() = default;
-    virtual ~SortedLookupVectorBase() { }
+    virtual ~SortedLookupVectorBase() = default;
 
     static std::optional<TypeIndex> getTypeSize(const IColumn & asof_column, size_t & type_size);
 
@@ -154,7 +154,7 @@ struct SortedLookupVectorBase
     virtual void insert(const IColumn &, const Block *, size_t) = 0;
 
     // This needs to be synchronized internally
-    virtual std::tuple<decltype(RowRef::block), decltype(RowRef::row_num)> findAsof(const IColumn &, size_t) = 0;
+    virtual RowRef findAsof(const IColumn &, size_t) = 0;
 };
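Taken together, the new lookup path is: binary-search the compact entries array, then resolve the winning entry through the parallel row_refs array exactly once. A toy end-to-end version for the ascending, non-strict case, using simplified stand-in types rather than the ClickHouse API:

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

struct Block;

struct RowRef
{
    const Block * block = nullptr;
    size_t row_num = 0;
};

struct Entry
{
    uint32_t value;
    uint32_t row_ref_index;

    bool operator<(const Entry & other) const { return value < other.value; }
};

/// Toy findAsof: locate the first entry with value >= key in the hot,
/// compact array, then touch the cold row_refs array exactly once.
RowRef findAsof(const std::vector<Entry> & entries,
                const std::vector<RowRef> & row_refs,
                uint32_t key)
{
    auto it = std::lower_bound(entries.begin(), entries.end(), Entry{key, 0});
    if (it != entries.end())
        return row_refs[it->row_ref_index];
    return {nullptr, 0};
}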