// Source: ClickHouse/src/Interpreters/RowRefs.cpp (271 lines, 8.0 KiB, C++)

2019-03-30 21:30:21 +00:00
#include <Interpreters/RowRefs.h>
2022-03-23 11:19:38 +00:00
#include <Common/RadixSort.h>
2019-03-30 21:30:21 +00:00
#include <Columns/IColumn.h>
2022-03-17 18:08:33 +00:00
#include <DataTypes/IDataType.h>
2022-07-29 16:30:50 +00:00
#include <Core/Joins.h>
2022-03-17 18:08:33 +00:00
#include <base/types.h>
2019-03-30 21:30:21 +00:00
namespace DB
{
/// Error codes thrown from this file. They are only declared here (extern);
/// the definitions live in another translation unit.
namespace ErrorCodes
{
    extern const int BAD_TYPE_OF_FIELD;
    extern const int LOGICAL_ERROR;
}
2019-04-01 16:44:15 +00:00
namespace
{
/// maps enum values to types
template <typename F>
2022-03-17 18:08:33 +00:00
void callWithType(TypeIndex type, F && f)
2019-03-30 21:30:21 +00:00
{
2022-03-17 18:08:33 +00:00
WhichDataType which(type);
#define DISPATCH(TYPE) \
if (which.idx == TypeIndex::TYPE) \
return f(TYPE());
FOR_NUMERIC_TYPES(DISPATCH)
DISPATCH(Decimal32)
DISPATCH(Decimal64)
DISPATCH(Decimal128)
DISPATCH(Decimal256)
DISPATCH(DateTime64)
#undef DISPATCH
2019-04-01 16:44:15 +00:00
UNREACHABLE();
2019-04-01 16:44:15 +00:00
}
2021-09-10 21:28:43 +00:00
2022-07-29 16:30:50 +00:00
template <typename TKey, ASOFJoinInequality inequality>
2022-03-17 16:02:13 +00:00
class SortedLookupVector : public SortedLookupVectorBase
{
struct Entry
{
2022-03-23 11:19:38 +00:00
TKey value;
uint32_t row_ref_index;
2022-03-17 16:02:13 +00:00
Entry() = delete;
2022-03-23 11:19:38 +00:00
Entry(TKey value_, uint32_t row_ref_index_)
: value(value_)
, row_ref_index(row_ref_index_)
{ }
2022-03-17 16:02:13 +00:00
2022-03-23 11:19:38 +00:00
};
struct LessEntryOperator
{
ALWAYS_INLINE bool operator()(const Entry & lhs, const Entry & rhs) const
{
2022-03-23 13:25:21 +00:00
return lhs.value < rhs.value;
2022-03-23 11:19:38 +00:00
}
2022-03-17 16:02:13 +00:00
};
2022-03-18 10:15:12 +00:00
struct GreaterEntryOperator
2022-03-17 16:02:13 +00:00
{
2022-03-23 11:19:38 +00:00
ALWAYS_INLINE bool operator()(const Entry & lhs, const Entry & rhs) const
{
return lhs.value > rhs.value;
}
2022-03-17 16:02:13 +00:00
};
public:
using Entries = PODArrayWithStackMemory<Entry, sizeof(Entry)>;
using RowRefs = PODArrayWithStackMemory<RowRef, sizeof(RowRef)>;
2022-03-23 11:19:38 +00:00
2022-07-29 16:30:50 +00:00
static constexpr bool is_descending = (inequality == ASOFJoinInequality::Greater || inequality == ASOFJoinInequality::GreaterOrEquals);
static constexpr bool is_strict = (inequality == ASOFJoinInequality::Less) || (inequality == ASOFJoinInequality::Greater);
2022-03-17 16:02:13 +00:00
void insert(const IColumn & asof_column, const Block * block, size_t row_num) override
{
using ColumnType = ColumnVectorOrDecimal<TKey>;
const auto & column = assert_cast<const ColumnType &>(asof_column);
2022-03-23 11:19:38 +00:00
TKey key = column.getElement(row_num);
2022-03-17 16:02:13 +00:00
assert(!sorted.load(std::memory_order_acquire));
2022-03-23 11:19:38 +00:00
entries.emplace_back(key, static_cast<UInt32>(row_refs.size()));
2022-03-23 11:19:38 +00:00
row_refs.emplace_back(RowRef(block, row_num));
2022-03-17 16:02:13 +00:00
}
/// Unrolled version of upper_bound and lower_bound
/// Loosely based on https://academy.realm.io/posts/how-we-beat-cpp-stl-binary-search/
/// In the future it'd interesting to replace it with a B+Tree Layout as described
/// at https://en.algorithmica.org/hpc/data-structures/s-tree/
size_t boundSearch(TKey value)
{
2022-03-23 11:19:38 +00:00
size_t size = entries.size();
2022-03-17 16:02:13 +00:00
size_t low = 0;
/// This is a single binary search iteration as a macro to unroll. Takes into account the inequality:
2022-03-23 11:19:38 +00:00
/// is_strict -> Equal values are not requested
/// is_descending -> The vector is sorted in reverse (for greater or greaterOrEquals)
2022-03-17 16:02:13 +00:00
#define BOUND_ITERATION \
{ \
size_t half = size / 2; \
size_t other_half = size - half; \
size_t probe = low + half; \
size_t other_low = low + other_half; \
2022-03-23 11:19:38 +00:00
TKey & v = entries[probe].value; \
2022-03-17 16:02:13 +00:00
size = half; \
2022-03-23 11:19:38 +00:00
if constexpr (is_descending) \
2022-03-17 16:02:13 +00:00
{ \
2022-03-23 11:19:38 +00:00
if constexpr (is_strict) \
2022-03-17 16:02:13 +00:00
low = value <= v ? other_low : low; \
else \
low = value < v ? other_low : low; \
} \
else \
{ \
2022-03-23 11:19:38 +00:00
if constexpr (is_strict) \
2022-03-17 16:02:13 +00:00
low = value >= v ? other_low : low; \
else \
low = value > v ? other_low : low; \
} \
}
while (size >= 8)
{
BOUND_ITERATION
BOUND_ITERATION
BOUND_ITERATION
}
while (size > 0)
{
BOUND_ITERATION
}
#undef BOUND_ITERATION
return low;
}
2022-03-23 11:19:38 +00:00
RowRef findAsof(const IColumn & asof_column, size_t row_num) override
2022-03-17 16:02:13 +00:00
{
sort();
using ColumnType = ColumnVectorOrDecimal<TKey>;
const auto & column = assert_cast<const ColumnType &>(asof_column);
TKey k = column.getElement(row_num);
size_t pos = boundSearch(k);
2022-03-23 11:19:38 +00:00
if (pos != entries.size())
{
size_t row_ref_index = entries[pos].row_ref_index;
return row_refs[row_ref_index];
}
2022-03-17 16:02:13 +00:00
return {nullptr, 0};
}
private:
std::atomic<bool> sorted = false;
mutable std::mutex lock;
2022-03-23 11:19:38 +00:00
Entries entries;
RowRefs row_refs;
2022-03-17 16:02:13 +00:00
// Double checked locking with SC atomics works in C++
// https://preshing.com/20130930/double-checked-locking-is-fixed-in-cpp11/
// The first thread that calls one of the lookup methods sorts the data
// After calling the first lookup method it is no longer allowed to insert any data
// the array becomes immutable
void sort()
{
if (!sorted.load(std::memory_order_acquire))
{
std::lock_guard<std::mutex> l(lock);
2022-03-23 11:19:38 +00:00
2022-03-17 16:02:13 +00:00
if (!sorted.load(std::memory_order_relaxed))
{
2022-03-23 11:19:38 +00:00
if constexpr (std::is_arithmetic_v<TKey> && !std::is_floating_point_v<TKey>)
{
if (likely(entries.size() > 256))
{
struct RadixSortTraits : RadixSortNumTraits<TKey>
{
using Element = Entry;
using Result = Element;
static TKey & extractKey(Element & elem) { return elem.value; }
static Result extractResult(Element & elem) { return elem; }
2022-03-23 11:19:38 +00:00
};
if constexpr (is_descending)
RadixSort<RadixSortTraits>::executeLSD(entries.data(), entries.size(), true);
else
RadixSort<RadixSortTraits>::executeLSD(entries.data(), entries.size(), false);
sorted.store(true, std::memory_order_release);
return;
}
}
if constexpr (is_descending)
::sort(entries.begin(), entries.end(), GreaterEntryOperator());
2022-03-17 16:02:13 +00:00
else
2022-03-23 11:19:38 +00:00
::sort(entries.begin(), entries.end(), LessEntryOperator());
2022-03-17 16:02:13 +00:00
sorted.store(true, std::memory_order_release);
}
}
}
};
}
2019-04-01 16:44:15 +00:00
2022-07-29 16:30:50 +00:00
/// Creates an empty SortedLookupVector for ASOF keys of type `type` and the given `inequality`.
///
/// `type` is resolved to a C++ key type through callWithType(), so it must be one of the
/// types that function handles.
/// Throws LOGICAL_ERROR if `inequality` is not one of
/// Less, LessOrEquals, Greater, GreaterOrEquals.
AsofRowRefs createAsofRowRef(TypeIndex type, ASOFJoinInequality inequality)
{
    AsofRowRefs result;

    /// Receives a default-constructed value whose static type is the key type to instantiate.
    auto call = [&](const auto & t)
    {
        using T = std::decay_t<decltype(t)>;
        switch (inequality)
        {
            case ASOFJoinInequality::LessOrEquals:
                result = std::make_unique<SortedLookupVector<T, ASOFJoinInequality::LessOrEquals>>();
                break;
            case ASOFJoinInequality::Less:
                result = std::make_unique<SortedLookupVector<T, ASOFJoinInequality::Less>>();
                break;
            case ASOFJoinInequality::GreaterOrEquals:
                result = std::make_unique<SortedLookupVector<T, ASOFJoinInequality::GreaterOrEquals>>();
                break;
            case ASOFJoinInequality::Greater:
                result = std::make_unique<SortedLookupVector<T, ASOFJoinInequality::Greater>>();
                break;
            default:
                throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid ASOF Join order");
        }
    };

    callWithType(type, call);
    return result;
}
std::optional<TypeIndex> SortedLookupVectorBase::getTypeSize(const IColumn & asof_column, size_t & size)
2019-03-30 21:30:21 +00:00
{
2022-03-17 18:08:33 +00:00
WhichDataType which(asof_column.getDataType());
#define DISPATCH(TYPE) \
if (which.idx == TypeIndex::TYPE) \
{ \
size = sizeof(TYPE); \
return asof_column.getDataType(); \
}
2019-04-01 16:44:15 +00:00
2022-03-17 18:08:33 +00:00
FOR_NUMERIC_TYPES(DISPATCH)
DISPATCH(Decimal32)
DISPATCH(Decimal64)
DISPATCH(Decimal128)
DISPATCH(Decimal256)
DISPATCH(DateTime64)
#undef DISPATCH
throw Exception(ErrorCodes::BAD_TYPE_OF_FIELD, "ASOF join not supported for type: {}", std::string(asof_column.getFamilyName()));
2019-03-30 21:30:21 +00:00
}
2019-03-31 10:56:54 +00:00
}