#pragma once

#include <algorithm>
#include <cassert>
#include <list>
#include <mutex>
#include <optional>
#include <variant>

#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnVector.h>
#include <Columns/IColumn.h>
#include <Interpreters/asof.h>
#include <base/sort.h>
#include <Common/Arena.h>

namespace DB
{

class Block;

/// Reference to the row in block.
struct RowRef
{
    using SizeT = uint32_t; /// Do not use size_t because of memory economy

    const Block * block = nullptr;
    SizeT row_num = 0;

    RowRef() = default;
    RowRef(const Block * block_, size_t row_num_) : block(block_), row_num(row_num_) {}
};

/// Singly linked list of references to rows. Used for ALL JOINs (non-unique JOINs)
struct RowRefList : RowRef
{
    /// Portion of RowRefs, 16 * (MAX_SIZE + 1) bytes sized.
    struct Batch
    {
        static constexpr size_t MAX_SIZE = 7; /// Adequate values are 3, 7, 15, 31.

        SizeT size = 0; /// It's smaller than size_t but keeps alignment in Arena.
        Batch * next;
        RowRef row_refs[MAX_SIZE];

        Batch(Batch * parent)
            : next(parent)
        {}

        bool full() const { return size == MAX_SIZE; }

        Batch * insert(RowRef && row_ref, Arena & pool)
        {
            if (full())
            {
                auto batch = pool.alloc<Batch>();
                *batch = Batch(this);
                batch->insert(std::move(row_ref), pool);
                return batch;
            }

            row_refs[size++] = std::move(row_ref);
            return this;
        }
    };

    class ForwardIterator
    {
    public:
        ForwardIterator(const RowRefList * begin)
            : root(begin)
            , first(true)
            , batch(root->next)
            , position(0)
        {}

        const RowRef * operator -> () const
        {
            if (first)
                return root;
            return &batch->row_refs[position];
        }

        const RowRef * operator * () const
        {
            if (first)
                return root;
            return &batch->row_refs[position];
        }

        void operator ++ ()
        {
            if (first)
            {
                first = false;
                return;
            }

            if (batch)
            {
                ++position;
                if (position >= batch->size)
                {
                    batch = batch->next;
                    position = 0;
                }
            }
        }

        bool ok() const { return first || batch; }

    private:
        const RowRefList * root;
        bool first;
        Batch * batch;
        size_t position;
    };

    RowRefList() {}
    RowRefList(const Block * block_, size_t row_num_) : RowRef(block_, row_num_) {}

    ForwardIterator begin() const { return ForwardIterator(this); }

    /// insert element after current one
    void insert(RowRef && row_ref, Arena & pool)
    {
        if (!next)
        {
            next = pool.alloc<Batch>();
            *next = Batch(nullptr);
        }
        next = next->insert(std::move(row_ref), pool);
    }

private:
    Batch * next = nullptr;
};
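/** Minimal usage sketch (illustrative, not part of the original header). `pool` is an Arena owned
  * by the caller, the blocks/rows are assumed to come from the saved right-hand side of the join,
  * and `processMatch` is a hypothetical callback:
  *
  *     RowRefList list(first_block, first_row);
  *     list.insert(RowRef(other_block, other_row), pool);  /// appended into Arena-allocated Batches
  *
  *     for (auto it = list.begin(); it.ok(); ++it)
  *         processMatch(it->block, it->row_num);
  */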
/** This class is intended to push sortable data into.
  * When looking up values, the container ensures that it is sorted for log(N) lookup.
  * After calling any of the lookup methods, it is no longer allowed to insert more data as this would invalidate the
  * references that can be returned by the lookup methods.
  */
struct SortedLookupVectorBase
{
    SortedLookupVectorBase() = default;
    virtual ~SortedLookupVectorBase() { }

    static std::optional<TypeIndex> getTypeSize(const IColumn & asof_column, size_t & type_size);

    // This will be synchronized by the rwlock mutex in Join.h
    virtual void insert(const IColumn &, const Block *, size_t) = 0;

    // This needs to be synchronized internally
    virtual std::tuple<decltype(RowRef::block), decltype(RowRef::row_num)> findAsof(const IColumn &, size_t) = 0;
};


template <typename TKey, ASOF::Inequality inequality>
class SortedLookupVector : public SortedLookupVectorBase
{
public:
    struct Entry
    {
        /// We don't store a RowRef and instead keep its members separately (and return a tuple) to reduce the memory usage.
        /// For example, for sizeof(T) == 4 => sizeof(Entry) == 16 (while before it would be 20).
        /// Then when you put it into a vector, the effect is even greater.
        decltype(RowRef::block) block;
        decltype(RowRef::row_num) row_num;
        TKey asof_value;

        Entry() = delete;
        Entry(TKey v, const Block * b, size_t r) : block(b), row_num(r), asof_value(v) { }

        bool operator<(const Entry & other) const { return asof_value < other.asof_value; }
    };

    struct greaterEntryOperator
    {
        bool operator()(Entry const & a, Entry const & b) const { return a.asof_value > b.asof_value; }
    };

public:
    using Base = std::vector<Entry>;
    using Keys = std::vector<TKey>;
    static constexpr bool isDescending = (inequality == ASOF::Inequality::Greater || inequality == ASOF::Inequality::GreaterOrEquals);
    static constexpr bool isStrict = (inequality == ASOF::Inequality::Less) || (inequality == ASOF::Inequality::Greater);

    void insert(const IColumn & asof_column, const Block * block, size_t row_num) override
    {
        using ColumnType = ColumnVectorOrDecimal<TKey>;
        const auto & column = assert_cast<const ColumnType &>(asof_column);
        TKey k = column.getElement(row_num);

        assert(!sorted.load(std::memory_order_acquire));

        array.emplace_back(k, block, row_num);
    }

    /// Unrolled version of upper_bound and lower_bound.
    /// Loosely based on https://academy.realm.io/posts/how-we-beat-cpp-stl-binary-search/
    /// In the future it'd be interesting to replace it with a B+Tree layout as described
    /// at https://en.algorithmica.org/hpc/data-structures/s-tree/
    size_t boundSearch(TKey value)
    {
        size_t size = array.size();
        size_t low = 0;

        /// This is a single binary search iteration as a macro to unroll. Takes into account the inequality:
        /// isStrict -> Equal values are not requested
        /// isDescending -> The vector is sorted in reverse (for Greater or GreaterOrEquals)
#define BOUND_ITERATION \
        { \
            size_t half = size / 2; \
            size_t other_half = size - half; \
            size_t probe = low + half; \
            size_t other_low = low + other_half; \
            TKey v = array[probe].asof_value; \
            size = half; \
            if constexpr (isDescending) \
            { \
                if constexpr (isStrict) \
                    low = value <= v ? other_low : low; \
                else \
                    low = value < v ? other_low : low; \
            } \
            else \
            { \
                if constexpr (isStrict) \
                    low = value >= v ? other_low : low; \
                else \
                    low = value > v ? other_low : low; \
            } \
        }

        while (size >= 8)
        {
            BOUND_ITERATION
            BOUND_ITERATION
            BOUND_ITERATION
        }

        while (size > 0)
        {
            BOUND_ITERATION
        }

#undef BOUND_ITERATION

        return low;
    }

    std::tuple<decltype(RowRef::block), decltype(RowRef::row_num)> findAsof(const IColumn & asof_column, size_t row_num) override
    {
        sort();

        using ColumnType = ColumnVectorOrDecimal<TKey>;
        const auto & column = assert_cast<const ColumnType &>(asof_column);
        TKey k = column.getElement(row_num);

        size_t pos = boundSearch(k);
        if (pos != array.size())
            return std::make_tuple(array[pos].block, array[pos].row_num);

        return {nullptr, 0};
    }

private:
    std::atomic<bool> sorted = false;
    mutable std::mutex lock;
    Base array;

    // Double checked locking with SC atomics works in C++:
    // https://preshing.com/20130930/double-checked-locking-is-fixed-in-cpp11/
    // The first thread that calls one of the lookup methods sorts the data.
    // After calling the first lookup method it is no longer allowed to insert any data:
    // the array becomes immutable.
    void sort()
    {
        if (!sorted.load(std::memory_order_acquire))
        {
            std::lock_guard l(lock);
            if (!sorted.load(std::memory_order_relaxed))
            {
                if constexpr (isDescending)
                    ::sort(array.begin(), array.end(), greaterEntryOperator());
                else
                    ::sort(array.begin(), array.end());
                sorted.store(true, std::memory_order_release);
            }
        }
    }
};
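/** Worked example (illustrative, not from the original header), assuming the right-hand side holds
  * asof values {10, 20, 30} and the left-hand probe value is 15:
  *   - Less / LessOrEquals keep the array ascending; findAsof() returns the row with value 20
  *     (the smallest right value satisfying the inequality).
  *   - Greater / GreaterOrEquals keep the array descending; findAsof() returns the row with value 10
  *     (the largest right value satisfying the inequality).
  * A hypothetical instantiation could look like:
  *
  *     SortedLookupVector<UInt32, ASOF::Inequality::LessOrEquals> lookup;
  *     lookup.insert(right_asof_column, &right_block, row);           /// build phase
  *     auto [block, row_num] = lookup.findAsof(left_asof_column, i);  /// first call sorts and freezes the array
  */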
// It only contains a std::unique_ptr which is memmovable.
// Source: https://github.com/ClickHouse/ClickHouse/issues/4906
using AsofRowRefs = std::unique_ptr<SortedLookupVectorBase>;

AsofRowRefs createAsofRowRef(TypeIndex type, ASOF::Inequality inequality);

}
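/** Illustrative sketch (not part of the original header) of the factory in use through the type-erased
  * base interface; the real call sites live in the join implementation, this only shows the intended shape:
  *
  *     AsofRowRefs refs = createAsofRowRef(TypeIndex::UInt32, ASOF::Inequality::LessOrEquals);
  *     refs->insert(right_asof_column, &right_block, row);           /// guarded externally during the build phase
  *     auto [block, row_num] = refs->findAsof(left_asof_column, i);  /// synchronized internally, sorts once
  */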