#pragma once

#include <optional>
#include <variant>
#include <list>
#include <mutex>
#include <algorithm>
#include <atomic>
#include <cassert>
#include <memory>
#include <vector>

#include <base/sort.h>

#include <Common/Arena.h>
#include <Common/Exception.h>
#include <Columns/IColumn.h>
#include <Interpreters/asof.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}

class Block;

/// Reference to the row in block.
struct __attribute__((__packed__)) RowRef
{
    using SizeT = uint32_t; /// Do not use size_t because of memory economy

    const Block * block = nullptr;
    SizeT row_num = 0;

    RowRef() {}
    RowRef(const Block * block_, size_t row_num_) : block(block_), row_num(row_num_) {}
};

/// Single linked list of references to rows. Used for ALL JOINs (non-unique JOINs)
struct RowRefList : RowRef
{
    /// Portion of RowRefs: a fixed-size batch of MAX_SIZE row references plus a link to the next batch.
    struct Batch
    {
        static constexpr size_t MAX_SIZE = 7; /// Adequate values are 3, 7, 15, 31.

        SizeT size = 0; /// It's smaller than size_t but keeps alignment in Arena.
        Batch * next;
        RowRef row_refs[MAX_SIZE];

        Batch(Batch * parent)
            : next(parent)
        {}

        bool full() const { return size == MAX_SIZE; }

        Batch * insert(RowRef && row_ref, Arena & pool)
        {
            if (full())
            {
                auto batch = pool.alloc<Batch>();
                *batch = Batch(this);
                batch->insert(std::move(row_ref), pool);
                return batch;
            }

            row_refs[size++] = std::move(row_ref);
            return this;
        }
    };
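
    /// Note on layout (inferred from Batch::insert above, not a guaranteed property of the interface):
    /// the head RowRefList stores the first row inline, further rows go into arena-allocated Batches,
    /// and each freshly allocated Batch becomes the new head of the chain, so iteration order is not
    /// insertion order.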

    class ForwardIterator
    {
    public:
        ForwardIterator(const RowRefList * begin)
            : root(begin)
            , first(true)
            , batch(root->next)
            , position(0)
        {}

        const RowRef * operator -> () const
        {
            if (first)
                return root;
            return &batch->row_refs[position];
        }

        const RowRef * operator * () const
        {
            if (first)
                return root;
            return &batch->row_refs[position];
        }

        void operator ++ ()
        {
            if (first)
            {
                first = false;
                return;
            }

            if (batch)
            {
                ++position;
                if (position >= batch->size)
                {
                    batch = batch->next;
                    position = 0;
                }
            }
        }

        bool ok() const { return first || batch; }

    private:
        const RowRefList * root;
        bool first;
        Batch * batch;
        size_t position;
    };

    RowRefList() {}
    RowRefList(const Block * block_, size_t row_num_) : RowRef(block_, row_num_) {}

    ForwardIterator begin() const { return ForwardIterator(this); }

    /// insert element after current one
    void insert(RowRef && row_ref, Arena & pool)
    {
        if (!next)
        {
            next = pool.alloc<Batch>();
            *next = Batch(nullptr);
        }
        next = next->insert(std::move(row_ref), pool);
    }

private:
    Batch * next = nullptr;
};
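
/// Illustrative usage sketch for RowRefList (not part of the interface). It assumes a `Block block`
/// and an `Arena pool` are available, and `consume` is a hypothetical callback:
///
///     RowRefList refs(&block, 0);                 /// the head holds the first row inline
///     refs.insert(RowRef(&block, 1), pool);       /// further rows go into arena-allocated batches
///     refs.insert(RowRef(&block, 2), pool);
///     for (auto it = refs.begin(); it.ok(); ++it)
///         consume(it->block, it->row_num);        /// ForwardIterator visits every referenced row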

/**
  * This class is intended to push sortable data into.
  * When looking up values the container ensures that it is sorted for log(N) lookup.
  * After calling any of the lookup methods, it is no longer allowed to insert more data as this would
  * invalidate the references that can be returned by the lookup methods.
  */
template <typename TEntry>
class SortedLookupVector
{
public:
    using Base = std::vector<TEntry>;
    using TKey = decltype(TEntry::asof_value);
    using Keys = std::vector<TKey>;

    void insert(const TKey & key, const Block * block, size_t row_num)
    {
        assert(!sorted.load(std::memory_order_acquire));
        array.emplace_back(key, block, row_num);
    }

    /// Find an element based on the inequality rules.
    /// Note that this function uses 2 arrays: one with only the keys (so it's smaller and more memory efficient)
    /// and a second one with both the key and the RowRef to be returned.
    /// Both are sorted only once, in a concurrency-safe manner.
    const RowRef * find(const TKey & k, ASOF::Inequality inequality)
    {
        sort();

        if (keys.empty())
            return nullptr;

        auto it = keys.cend();
        switch (inequality)
        {
            case ASOF::Inequality::LessOrEquals:
            {
                it = std::lower_bound(keys.cbegin(), keys.cend(), k);
                break;
            }
            case ASOF::Inequality::Less:
            {
                it = std::upper_bound(keys.cbegin(), keys.cend(), k);
                break;
            }
            case ASOF::Inequality::GreaterOrEquals:
            {
                /// Find the last key that is less than or equal to k.
                auto first_ge = std::upper_bound(keys.cbegin(), keys.cend(), k);
                if (first_ge == keys.cend())
                    first_ge--;
                while (first_ge != keys.cbegin() && *first_ge > k)
                    first_ge--;
                if (*first_ge <= k)
                    it = first_ge;
                break;
            }
            case ASOF::Inequality::Greater:
            {
                /// Find the last key that is strictly less than k.
                auto first_ge = std::upper_bound(keys.cbegin(), keys.cend(), k);
                if (first_ge == keys.cend())
                    first_ge--;
                while (first_ge != keys.cbegin() && *first_ge >= k)
                    first_ge--;
                if (*first_ge < k)
                    it = first_ge;
                break;
            }
            default:
                throw Exception("Invalid ASOF Join order", ErrorCodes::LOGICAL_ERROR);
        }

        if (it != keys.cend())
            return &((array.cbegin() + (it - keys.cbegin()))->row_ref);

        return nullptr;
    }

private:
    std::atomic<bool> sorted = false;
    mutable std::mutex lock;

    Base array;
    /// We keep a separate copy of just the keys to make the searches more memory efficient
    Keys keys;

    // Double checked locking with SC atomics works in C++
    // https://preshing.com/20130930/double-checked-locking-is-fixed-in-cpp11/
    // The first thread that calls one of the lookup methods sorts the data.
    // After calling the first lookup method it is no longer allowed to insert any data:
    // the array becomes immutable.
    void sort()
    {
        if (!sorted.load(std::memory_order_acquire))
        {
            std::lock_guard<std::mutex> l(lock);
            if (!sorted.load(std::memory_order_relaxed))
            {
                if (!array.empty())
                {
                    ::sort(array.begin(), array.end());
                    keys.reserve(array.size());
                    for (auto & e : array)
                        keys.push_back(e.asof_value);
                }

                sorted.store(true, std::memory_order_release);
            }
        }
    }
};
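
/// Illustrative example of the lookup semantics above (a sketch, not part of the interface).
/// Here TEntry is AsofRowRefs::Entry<UInt64> (defined below) and `block` is some stored block:
///
///     SortedLookupVector<AsofRowRefs::Entry<UInt64>> v;
///     v.insert(10, &block, 0);
///     v.insert(20, &block, 1);
///     v.insert(30, &block, 2);
///     v.find(25, ASOF::Inequality::LessOrEquals);     /// -> row 2 (smallest key >= 25, i.e. 30)
///     v.find(25, ASOF::Inequality::GreaterOrEquals);  /// -> row 1 (largest key <= 25, i.e. 20)
///     v.find(5,  ASOF::Inequality::Greater);          /// -> nullptr (no stored key is < 5)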

class AsofRowRefs
{
public:
    template <typename T>
    struct Entry
    {
        using LookupType = SortedLookupVector<Entry<T>>;
        using LookupPtr = std::unique_ptr<LookupType>;

        T asof_value;
        RowRef row_ref;

        Entry(T v) : asof_value(v) {}
        Entry(T v, RowRef rr) : asof_value(v), row_ref(rr) {}
        Entry(T v, const Block * block, size_t row_num) : asof_value(v), row_ref(block, row_num) {}

        bool operator<(const Entry & other) const { return asof_value < other.asof_value; }
    };

    using Lookups = std::variant<
        Entry<UInt8>::LookupPtr,
        Entry<UInt16>::LookupPtr,
        Entry<UInt32>::LookupPtr,
        Entry<UInt64>::LookupPtr,
        Entry<Int8>::LookupPtr,
        Entry<Int16>::LookupPtr,
        Entry<Int32>::LookupPtr,
        Entry<Int64>::LookupPtr,
        Entry<Float32>::LookupPtr,
        Entry<Float64>::LookupPtr,
        Entry<Decimal32>::LookupPtr,
        Entry<Decimal64>::LookupPtr,
        Entry<Decimal128>::LookupPtr,
        Entry<DateTime64>::LookupPtr>;

    AsofRowRefs() {}
    AsofRowRefs(TypeIndex t);

    static std::optional<TypeIndex> getTypeSize(const IColumn & asof_column, size_t & type_size);

    // This will be synchronized by the rwlock mutex in Join.h
    void insert(TypeIndex type, const IColumn & asof_column, const Block * block, size_t row_num);

    // This will internally synchronize
    const RowRef * findAsof(TypeIndex type, ASOF::Inequality inequality, const IColumn & asof_column, size_t row_num) const;

private:
    // Lookups can be stored in a HashTable because it is memmovable
    // A std::variant contains a currently active type id (memmovable), together with a union of the types
    // The types are all std::unique_ptr, which contains a single pointer, which is memmovable.
    // Source: https://github.com/ClickHouse/ClickHouse/issues/4906
    Lookups lookups;
};
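
/// Illustrative usage sketch for AsofRowRefs (a sketch only; the real wiring lives in the join code
/// that owns these structures). `asof_column`, `probe_column`, `block` and `i` are assumed to exist:
///
///     size_t type_size = 0;
///     auto type = AsofRowRefs::getTypeSize(asof_column, type_size);
///     if (type)
///     {
///         AsofRowRefs refs(*type);
///         refs.insert(*type, asof_column, &block, /* row_num = */ 0);
///         const RowRef * match = refs.findAsof(*type, ASOF::Inequality::LessOrEquals, probe_column, i);
///     }
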
}