asof join inequalities

This commit is contained in:
chertus 2019-10-11 20:56:26 +03:00
parent 9a89ee8eab
commit 91c7ae83a3
10 changed files with 141 additions and 52 deletions

View File

@ -5,6 +5,7 @@
#include <Core/SettingsCommon.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Interpreters/IJoin.h>
#include <Interpreters/asof.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <DataStreams/SizeLimits.h>
@ -48,6 +49,7 @@ class AnalyzedJoin
ASTs key_asts_left;
ASTs key_asts_right;
ASTTableJoin table_join;
ASOF::Inequality asof_inequality = ASOF::Inequality::GreaterOrEquals;
/// All columns which can be read from joined table. Duplicating names are qualified.
NamesAndTypesList columns_from_joined_table;
@ -100,6 +102,9 @@ public:
void addJoinedColumn(const NameAndTypePair & joined_column);
void addJoinedColumnsAndCorrectNullability(Block & sample_block) const;
void setAsofInequality(ASOF::Inequality inequality) { asof_inequality = inequality; }
ASOF::Inequality getAsofInequality() { return asof_inequality; }
ASTPtr leftKeysList() const;
ASTPtr rightKeysList() const; /// For ON syntax only

View File

@ -32,16 +32,24 @@ void CollectJoinOnKeysMatcher::Data::addJoinKeys(const ASTPtr & left_ast, const
}
void CollectJoinOnKeysMatcher::Data::addAsofJoinKeys(const ASTPtr & left_ast, const ASTPtr & right_ast,
const std::pair<size_t, size_t> & table_no)
const std::pair<size_t, size_t> & table_no, const ASOF::Inequality & inequality)
{
if (table_no.first == 1 || table_no.second == 2)
{
asof_left_key = left_ast->clone();
asof_right_key = right_ast->clone();
analyzed_join.setAsofInequality(inequality);
return;
}
else if (table_no.first == 2 || table_no.second == 1)
{
asof_left_key = right_ast->clone();
asof_right_key = left_ast->clone();
analyzed_join.setAsofInequality(ASOF::reverseInequality(inequality));
return;
}
throw Exception("ASOF JOIN for (left_table.x <= right_table.x) is not implemented", ErrorCodes::NOT_IMPLEMENTED);
throw Exception("ASOF JOIN requires keys inequality from different tables", ErrorCodes::NOT_IMPLEMENTED);
}
void CollectJoinOnKeysMatcher::Data::asofToJoinKeys()
@ -66,10 +74,9 @@ void CollectJoinOnKeysMatcher::visit(const ASTFunction & func, const ASTPtr & as
return;
}
bool less_or_equals = (func.name == "lessOrEquals");
bool greater_or_equals = (func.name == "greaterOrEquals");
ASOF::Inequality inequality = ASOF::getInequality(func.name);
if (data.is_asof && (less_or_equals || greater_or_equals))
if (data.is_asof && (inequality != ASOF::Inequality::None))
{
if (data.asof_left_key || data.asof_right_key)
throwSyntaxException("ASOF JOIN expects exactly one inequality in ON section, unexpected " + queryToString(ast) + ".");
@ -78,11 +85,7 @@ void CollectJoinOnKeysMatcher::visit(const ASTFunction & func, const ASTPtr & as
ASTPtr right = func.arguments->children.at(1);
auto table_numbers = getTableNumbers(ast, left, right, data);
if (greater_or_equals)
data.addAsofJoinKeys(left, right, table_numbers);
else
data.addAsofJoinKeys(right, left, std::make_pair(table_numbers.second, table_numbers.first));
data.addAsofJoinKeys(left, right, table_numbers, inequality);
return;
}

View File

@ -12,6 +12,11 @@ namespace DB
class ASTIdentifier;
class AnalyzedJoin;
namespace ASOF
{
enum class Inequality;
}
class CollectJoinOnKeysMatcher
{
public:
@ -29,7 +34,8 @@ public:
bool has_some{false};
void addJoinKeys(const ASTPtr & left_ast, const ASTPtr & right_ast, const std::pair<size_t, size_t> & table_no);
void addAsofJoinKeys(const ASTPtr & left_ast, const ASTPtr & right_ast, const std::pair<size_t, size_t> & table_no);
void addAsofJoinKeys(const ASTPtr & left_ast, const ASTPtr & right_ast, const std::pair<size_t, size_t> & table_no,
const ASOF::Inequality & asof_inequality);
void asofToJoinKeys();
};

View File

@ -70,6 +70,7 @@ Join::Join(std::shared_ptr<AnalyzedJoin> table_join_, const Block & right_sample
, nullable_right_side(table_join->forceNullableRight())
, nullable_left_side(table_join->forceNullableLeft())
, any_take_last_row(any_take_last_row_)
, asof_inequality(table_join->getAsofInequality())
, log(&Logger::get("Join"))
{
setSampleBlock(right_sample_block);
@ -635,7 +636,7 @@ std::unique_ptr<IColumn::Offsets> NO_INLINE joinRightIndexedColumns(
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof)
{
if (const RowRef * found = mapped.findAsof(join.getAsofType(), asof_column, i))
if (const RowRef * found = mapped.findAsof(join.getAsofType(), join.getAsofInequality(), asof_column, i))
{
filter[i] = 1;
mapped.setUsed();

View File

@ -166,6 +166,7 @@ public:
ASTTableJoin::Kind getKind() const { return kind; }
ASTTableJoin::Strictness getStrictness() const { return strictness; }
AsofRowRefs::Type getAsofType() const { return *asof_type; }
ASOF::Inequality getAsofInequality() const { return asof_inequality; }
bool anyTakeLastRow() const { return any_take_last_row; }
/// Different types of keys for maps.
@ -305,6 +306,7 @@ private:
Type type = Type::EMPTY;
std::optional<AsofRowRefs::Type> asof_type;
ASOF::Inequality asof_inequality;
static Type chooseMethod(const ColumnRawPtrs & key_columns, Sizes & key_sizes);

View File

@ -58,26 +58,27 @@ void AsofRowRefs::insert(Type type, const IColumn * asof_column, const Block * b
callWithType(type, call);
}
const RowRef * AsofRowRefs::findAsof(Type type, const IColumn * asof_column, size_t row_num) const
const RowRef * AsofRowRefs::findAsof(Type type, ASOF::Inequality inequality, const IColumn * asof_column, size_t row_num) const
{
const RowRef * out = nullptr;
bool ascending = (inequality == ASOF::Inequality::Less) || (inequality == ASOF::Inequality::LessOrEquals);
bool is_strict = (inequality == ASOF::Inequality::Less) || (inequality == ASOF::Inequality::Greater);
auto call = [&](const auto & t)
{
using T = std::decay_t<decltype(t)>;
using LookupPtr = typename Entry<T>::LookupPtr;
using EntryType = Entry<T>;
using LookupPtr = typename EntryType::LookupPtr;
auto * column = typeid_cast<const ColumnVector<T> *>(asof_column);
T key = column->getElement(row_num);
auto & typed_lookup = std::get<LookupPtr>(lookups);
// The first thread that calls upper_bound ensures that the data is sorted
auto it = typed_lookup->upper_bound(Entry<T>(key));
// cbegin() is safe to call now because the array is immutable after sorting
// hence the pointer to a entry can be returned
if (it != typed_lookup->cbegin())
out = &((--it)->row_ref);
if (is_strict)
out = typed_lookup->upperBound(EntryType(key), ascending);
else
out = typed_lookup->lowerBound(EntryType(key), ascending);
};
callWithType(type, call);

View File

@ -1,8 +1,8 @@
#pragma once
#include <Common/Arena.h>
#include <Common/RadixSort.h>
#include <Columns/IColumn.h>
#include <Interpreters/asof.h>
#include <optional>
#include <variant>
@ -144,34 +144,44 @@ public:
array.push_back(std::forward<U>(x), std::forward<TAllocatorParams>(allocator_params)...);
}
// Transition into second stage, ensures that the vector is sorted
typename Base::const_iterator upper_bound(const TEntry & k)
const RowRef * upperBound(const TEntry & k, bool ascending)
{
sort();
return std::upper_bound(array.cbegin(), array.cend(), k);
sort(ascending);
auto it = std::upper_bound(array.cbegin(), array.cend(), k, (ascending ? less<true> : less<false>));
if (it != array.cend())
return &(it->row_ref);
return nullptr;
}
// After ensuring that the vector is sorted by calling a lookup these are safe to call
typename Base::const_iterator cbegin() const { return array.cbegin(); }
typename Base::const_iterator cend() const { return array.cend(); }
const RowRef * lowerBound(const TEntry & k, bool ascending)
{
sort(ascending);
auto it = std::lower_bound(array.cbegin(), array.cend(), k, (ascending ? less<true> : less<false>));
if (it != array.cend())
return &(it->row_ref);
return nullptr;
}
private:
std::atomic<bool> sorted = false;
Base array;
mutable std::mutex lock;
struct RadixSortTraits : RadixSortNumTraits<TKey>
template <bool ascending>
static bool less(const TEntry & a, const TEntry & b)
{
using Element = TEntry;
static TKey & extractKey(Element & elem) { return elem.asof_value; }
};
if constexpr (ascending)
return a.asof_value < b.asof_value;
else
return a.asof_value > b.asof_value;
}
// Double checked locking with SC atomics works in C++
// https://preshing.com/20130930/double-checked-locking-is-fixed-in-cpp11/
// The first thread that calls one of the lookup methods sorts the data
// After calling the first lookup method it is no longer allowed to insert any data
// the array becomes immutable
void sort()
void sort(bool ascending)
{
if (!sorted.load(std::memory_order_acquire))
{
@ -179,13 +189,7 @@ private:
if (!sorted.load(std::memory_order_relaxed))
{
if (!array.empty())
{
/// TODO: It has been tested only for UInt32 yet. It needs to check UInt64, Float32/64.
if constexpr (std::is_same_v<TKey, UInt32>)
RadixSort<RadixSortTraits>::executeLSD(&array[0], array.size());
else
std::sort(array.begin(), array.end());
}
std::sort(array.begin(), array.end(), (ascending ? less<true> : less<false>));
sorted.store(true, std::memory_order_release);
}
@ -206,11 +210,6 @@ public:
Entry(T v) : asof_value(v) {}
Entry(T v, RowRef rr) : asof_value(v), row_ref(rr) {}
bool operator < (const Entry & o) const
{
return asof_value < o.asof_value;
}
};
using Lookups = std::variant<
@ -236,7 +235,7 @@ public:
void insert(Type type, const IColumn * asof_column, const Block * block, size_t row_num);
// This will internally synchronize
const RowRef * findAsof(Type type, const IColumn * asof_column, size_t row_num) const;
const RowRef * findAsof(Type type, ASOF::Inequality inequality, const IColumn * asof_column, size_t row_num) const;
private:
// Lookups can be stored in a HashTable because it is memmovable

View File

@ -0,0 +1,46 @@
#pragma once
#include <string>
namespace DB
{
namespace ASOF
{
enum class Inequality
{
None = 0,
Less,
Greater,
LessOrEquals,
GreaterOrEquals,
};
inline Inequality getInequality(const std::string & func_name)
{
Inequality inequality{Inequality::None};
if (func_name == "less")
inequality = Inequality::Less;
else if (func_name == "greater")
inequality = Inequality::Greater;
else if (func_name == "lessOrEquals")
inequality = Inequality::LessOrEquals;
else if (func_name == "greaterOrEquals")
inequality = Inequality::GreaterOrEquals;
return inequality;
}
inline Inequality reverseInequality(Inequality inequality)
{
if (inequality == Inequality::Less)
return Inequality::Greater;
else if (inequality == Inequality::Greater)
return Inequality::Less;
else if (inequality == Inequality::LessOrEquals)
return Inequality::GreaterOrEquals;
else if (inequality == Inequality::GreaterOrEquals)
return Inequality::LessOrEquals;
return Inequality::None;
}
}
}

View File

@ -11,3 +11,25 @@
1 2 1 2
1 3 1 2
2 3 2 3
-
1 1 1 2
1 2 1 2
1 3 1 4
2 1 2 3
2 2 2 3
2 3 2 3
-
1 1 1 2
1 2 1 2
1 3 1 4
2 1 2 3
2 2 2 3
2 3 2 3
-
1 3 1 2
-
1 1 1 2
1 2 1 4
1 3 1 4
2 1 2 3
2 2 2 3

View File

@ -9,11 +9,15 @@ INSERT INTO B (b,t) VALUES (1,2),(1,4),(2,3);
SELECT A.a, A.t, B.b, B.t FROM A ASOF LEFT JOIN B ON A.a == B.b AND A.t >= B.t ORDER BY (A.a, A.t);
SELECT count() FROM A ASOF LEFT JOIN B ON A.a == B.b AND B.t <= A.t;
SELECT A.a, A.t, B.b, B.t FROM A ASOF INNER JOIN B ON B.t <= A.t AND A.a == B.b;
SELECT count() FROM A ASOF JOIN B ON A.a == B.b AND A.t <= B.t; -- { serverError 48 }
SELECT count() FROM A ASOF JOIN B ON A.a == B.b AND B.t >= A.t; -- { serverError 48 }
SELECT count() FROM A ASOF JOIN B ON A.a == B.b AND A.t > B.t; -- { serverError 403 }
SELECT count() FROM A ASOF JOIN B ON A.a == B.b AND A.t < B.t; -- { serverError 403 }
SELECT A.a, A.t, B.b, B.t FROM A ASOF INNER JOIN B ON B.t <= A.t AND A.a == B.b ORDER BY (A.a, A.t);
SELECT '-';
SELECT A.a, A.t, B.b, B.t FROM A ASOF JOIN B ON A.a == B.b AND A.t <= B.t ORDER BY (A.a, A.t);
SELECT '-';
SELECT A.a, A.t, B.b, B.t FROM A ASOF JOIN B ON A.a == B.b AND B.t >= A.t ORDER BY (A.a, A.t);
SELECT '-';
SELECT A.a, A.t, B.b, B.t FROM A ASOF JOIN B ON A.a == B.b AND A.t > B.t ORDER BY (A.a, A.t);
SELECT '-';
SELECT A.a, A.t, B.b, B.t FROM A ASOF JOIN B ON A.a == B.b AND A.t < B.t ORDER BY (A.a, A.t);
SELECT count() FROM A ASOF JOIN B ON A.a == B.b AND A.t == B.t; -- { serverError 403 }
SELECT count() FROM A ASOF JOIN B ON A.a == B.b AND A.t != B.t; -- { serverError 403 }