mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
commit
3ceeb95145
@ -5,6 +5,7 @@
|
||||
#include <Core/SettingsCommon.h>
|
||||
#include <Parsers/ASTTablesInSelectQuery.h>
|
||||
#include <Interpreters/IJoin.h>
|
||||
#include <Interpreters/asof.h>
|
||||
#include <DataStreams/IBlockStream_fwd.h>
|
||||
#include <DataStreams/SizeLimits.h>
|
||||
|
||||
@ -49,6 +50,7 @@ class AnalyzedJoin
|
||||
ASTs key_asts_left;
|
||||
ASTs key_asts_right;
|
||||
ASTTableJoin table_join;
|
||||
ASOF::Inequality asof_inequality = ASOF::Inequality::GreaterOrEquals;
|
||||
|
||||
/// All columns which can be read from joined table. Duplicating names are qualified.
|
||||
NamesAndTypesList columns_from_joined_table;
|
||||
@ -106,6 +108,9 @@ public:
|
||||
void addJoinedColumn(const NameAndTypePair & joined_column);
|
||||
void addJoinedColumnsAndCorrectNullability(Block & sample_block) const;
|
||||
|
||||
void setAsofInequality(ASOF::Inequality inequality) { asof_inequality = inequality; }
|
||||
ASOF::Inequality getAsofInequality() { return asof_inequality; }
|
||||
|
||||
ASTPtr leftKeysList() const;
|
||||
ASTPtr rightKeysList() const; /// For ON syntax only
|
||||
|
||||
|
@ -32,16 +32,20 @@ void CollectJoinOnKeysMatcher::Data::addJoinKeys(const ASTPtr & left_ast, const
|
||||
}
|
||||
|
||||
void CollectJoinOnKeysMatcher::Data::addAsofJoinKeys(const ASTPtr & left_ast, const ASTPtr & right_ast,
|
||||
const std::pair<size_t, size_t> & table_no)
|
||||
const std::pair<size_t, size_t> & table_no, const ASOF::Inequality & inequality)
|
||||
{
|
||||
if (table_no.first == 1 || table_no.second == 2)
|
||||
{
|
||||
asof_left_key = left_ast->clone();
|
||||
asof_right_key = right_ast->clone();
|
||||
return;
|
||||
analyzed_join.setAsofInequality(inequality);
|
||||
}
|
||||
else if (table_no.first == 2 || table_no.second == 1)
|
||||
{
|
||||
asof_left_key = right_ast->clone();
|
||||
asof_right_key = left_ast->clone();
|
||||
analyzed_join.setAsofInequality(ASOF::reverseInequality(inequality));
|
||||
}
|
||||
|
||||
throw Exception("ASOF JOIN for (left_table.x <= right_table.x) is not implemented", ErrorCodes::NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
void CollectJoinOnKeysMatcher::Data::asofToJoinKeys()
|
||||
@ -66,10 +70,9 @@ void CollectJoinOnKeysMatcher::visit(const ASTFunction & func, const ASTPtr & as
|
||||
return;
|
||||
}
|
||||
|
||||
bool less_or_equals = (func.name == "lessOrEquals");
|
||||
bool greater_or_equals = (func.name == "greaterOrEquals");
|
||||
ASOF::Inequality inequality = ASOF::getInequality(func.name);
|
||||
|
||||
if (data.is_asof && (less_or_equals || greater_or_equals))
|
||||
if (data.is_asof && (inequality != ASOF::Inequality::None))
|
||||
{
|
||||
if (data.asof_left_key || data.asof_right_key)
|
||||
throwSyntaxException("ASOF JOIN expects exactly one inequality in ON section, unexpected " + queryToString(ast) + ".");
|
||||
@ -78,11 +81,7 @@ void CollectJoinOnKeysMatcher::visit(const ASTFunction & func, const ASTPtr & as
|
||||
ASTPtr right = func.arguments->children.at(1);
|
||||
auto table_numbers = getTableNumbers(ast, left, right, data);
|
||||
|
||||
if (greater_or_equals)
|
||||
data.addAsofJoinKeys(left, right, table_numbers);
|
||||
else
|
||||
data.addAsofJoinKeys(right, left, std::make_pair(table_numbers.second, table_numbers.first));
|
||||
|
||||
data.addAsofJoinKeys(left, right, table_numbers, inequality);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -12,6 +12,11 @@ namespace DB
|
||||
class ASTIdentifier;
|
||||
class AnalyzedJoin;
|
||||
|
||||
namespace ASOF
|
||||
{
|
||||
enum class Inequality;
|
||||
}
|
||||
|
||||
class CollectJoinOnKeysMatcher
|
||||
{
|
||||
public:
|
||||
@ -29,7 +34,8 @@ public:
|
||||
bool has_some{false};
|
||||
|
||||
void addJoinKeys(const ASTPtr & left_ast, const ASTPtr & right_ast, const std::pair<size_t, size_t> & table_no);
|
||||
void addAsofJoinKeys(const ASTPtr & left_ast, const ASTPtr & right_ast, const std::pair<size_t, size_t> & table_no);
|
||||
void addAsofJoinKeys(const ASTPtr & left_ast, const ASTPtr & right_ast, const std::pair<size_t, size_t> & table_no,
|
||||
const ASOF::Inequality & asof_inequality);
|
||||
void asofToJoinKeys();
|
||||
};
|
||||
|
||||
|
@ -70,6 +70,7 @@ Join::Join(std::shared_ptr<AnalyzedJoin> table_join_, const Block & right_sample
|
||||
, nullable_right_side(table_join->forceNullableRight())
|
||||
, nullable_left_side(table_join->forceNullableLeft())
|
||||
, any_take_last_row(any_take_last_row_)
|
||||
, asof_inequality(table_join->getAsofInequality())
|
||||
, log(&Logger::get("Join"))
|
||||
{
|
||||
setSampleBlock(right_sample_block);
|
||||
@ -638,7 +639,7 @@ std::unique_ptr<IColumn::Offsets> NO_INLINE joinRightIndexedColumns(
|
||||
|
||||
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof)
|
||||
{
|
||||
if (const RowRef * found = mapped.findAsof(join.getAsofType(), asof_column, i))
|
||||
if (const RowRef * found = mapped.findAsof(join.getAsofType(), join.getAsofInequality(), asof_column, i))
|
||||
{
|
||||
filter[i] = 1;
|
||||
mapped.setUsed();
|
||||
|
@ -168,6 +168,7 @@ public:
|
||||
ASTTableJoin::Kind getKind() const { return kind; }
|
||||
ASTTableJoin::Strictness getStrictness() const { return strictness; }
|
||||
AsofRowRefs::Type getAsofType() const { return *asof_type; }
|
||||
ASOF::Inequality getAsofInequality() const { return asof_inequality; }
|
||||
bool anyTakeLastRow() const { return any_take_last_row; }
|
||||
|
||||
/// Different types of keys for maps.
|
||||
@ -308,6 +309,7 @@ private:
|
||||
|
||||
Type type = Type::EMPTY;
|
||||
std::optional<AsofRowRefs::Type> asof_type;
|
||||
ASOF::Inequality asof_inequality;
|
||||
|
||||
static Type chooseMethod(const ColumnRawPtrs & key_columns, Sizes & key_sizes);
|
||||
|
||||
|
@ -58,26 +58,27 @@ void AsofRowRefs::insert(Type type, const IColumn * asof_column, const Block * b
|
||||
callWithType(type, call);
|
||||
}
|
||||
|
||||
const RowRef * AsofRowRefs::findAsof(Type type, const IColumn * asof_column, size_t row_num) const
|
||||
const RowRef * AsofRowRefs::findAsof(Type type, ASOF::Inequality inequality, const IColumn * asof_column, size_t row_num) const
|
||||
{
|
||||
const RowRef * out = nullptr;
|
||||
|
||||
bool ascending = (inequality == ASOF::Inequality::Less) || (inequality == ASOF::Inequality::LessOrEquals);
|
||||
bool is_strict = (inequality == ASOF::Inequality::Less) || (inequality == ASOF::Inequality::Greater);
|
||||
|
||||
auto call = [&](const auto & t)
|
||||
{
|
||||
using T = std::decay_t<decltype(t)>;
|
||||
using LookupPtr = typename Entry<T>::LookupPtr;
|
||||
using EntryType = Entry<T>;
|
||||
using LookupPtr = typename EntryType::LookupPtr;
|
||||
|
||||
auto * column = typeid_cast<const ColumnVector<T> *>(asof_column);
|
||||
T key = column->getElement(row_num);
|
||||
auto & typed_lookup = std::get<LookupPtr>(lookups);
|
||||
|
||||
// The first thread that calls upper_bound ensures that the data is sorted
|
||||
auto it = typed_lookup->upper_bound(Entry<T>(key));
|
||||
|
||||
// cbegin() is safe to call now because the array is immutable after sorting
|
||||
// hence the pointer to a entry can be returned
|
||||
if (it != typed_lookup->cbegin())
|
||||
out = &((--it)->row_ref);
|
||||
if (is_strict)
|
||||
out = typed_lookup->upperBound(EntryType(key), ascending);
|
||||
else
|
||||
out = typed_lookup->lowerBound(EntryType(key), ascending);
|
||||
};
|
||||
|
||||
callWithType(type, call);
|
||||
|
@ -1,8 +1,8 @@
|
||||
#pragma once
|
||||
|
||||
#include <Common/Arena.h>
|
||||
#include <Common/RadixSort.h>
|
||||
#include <Columns/IColumn.h>
|
||||
#include <Interpreters/asof.h>
|
||||
|
||||
#include <optional>
|
||||
#include <variant>
|
||||
@ -144,34 +144,45 @@ public:
|
||||
array.push_back(std::forward<U>(x), std::forward<TAllocatorParams>(allocator_params)...);
|
||||
}
|
||||
|
||||
// Transition into second stage, ensures that the vector is sorted
|
||||
typename Base::const_iterator upper_bound(const TEntry & k)
|
||||
const RowRef * upperBound(const TEntry & k, bool ascending)
|
||||
{
|
||||
sort();
|
||||
return std::upper_bound(array.cbegin(), array.cend(), k);
|
||||
sort(ascending);
|
||||
auto it = std::upper_bound(array.cbegin(), array.cend(), k, (ascending ? less : greater));
|
||||
if (it != array.cend())
|
||||
return &(it->row_ref);
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// After ensuring that the vector is sorted by calling a lookup these are safe to call
|
||||
typename Base::const_iterator cbegin() const { return array.cbegin(); }
|
||||
typename Base::const_iterator cend() const { return array.cend(); }
|
||||
const RowRef * lowerBound(const TEntry & k, bool ascending)
|
||||
{
|
||||
sort(ascending);
|
||||
auto it = std::lower_bound(array.cbegin(), array.cend(), k, (ascending ? less : greater));
|
||||
if (it != array.cend())
|
||||
return &(it->row_ref);
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
private:
|
||||
std::atomic<bool> sorted = false;
|
||||
Base array;
|
||||
mutable std::mutex lock;
|
||||
|
||||
struct RadixSortTraits : RadixSortNumTraits<TKey>
|
||||
static bool less(const TEntry & a, const TEntry & b)
|
||||
{
|
||||
using Element = TEntry;
|
||||
static TKey & extractKey(Element & elem) { return elem.asof_value; }
|
||||
};
|
||||
return a.asof_value < b.asof_value;
|
||||
}
|
||||
|
||||
static bool greater(const TEntry & a, const TEntry & b)
|
||||
{
|
||||
return a.asof_value > b.asof_value;
|
||||
}
|
||||
|
||||
// Double checked locking with SC atomics works in C++
|
||||
// https://preshing.com/20130930/double-checked-locking-is-fixed-in-cpp11/
|
||||
// The first thread that calls one of the lookup methods sorts the data
|
||||
// After calling the first lookup method it is no longer allowed to insert any data
|
||||
// the array becomes immutable
|
||||
void sort()
|
||||
void sort(bool ascending)
|
||||
{
|
||||
if (!sorted.load(std::memory_order_acquire))
|
||||
{
|
||||
@ -179,13 +190,7 @@ private:
|
||||
if (!sorted.load(std::memory_order_relaxed))
|
||||
{
|
||||
if (!array.empty())
|
||||
{
|
||||
/// TODO: It has been tested only for UInt32 yet. It needs to check UInt64, Float32/64.
|
||||
if constexpr (std::is_same_v<TKey, UInt32>)
|
||||
RadixSort<RadixSortTraits>::executeLSD(&array[0], array.size());
|
||||
else
|
||||
std::sort(array.begin(), array.end());
|
||||
}
|
||||
std::sort(array.begin(), array.end(), (ascending ? less : greater));
|
||||
|
||||
sorted.store(true, std::memory_order_release);
|
||||
}
|
||||
@ -206,11 +211,6 @@ public:
|
||||
|
||||
Entry(T v) : asof_value(v) {}
|
||||
Entry(T v, RowRef rr) : asof_value(v), row_ref(rr) {}
|
||||
|
||||
bool operator < (const Entry & o) const
|
||||
{
|
||||
return asof_value < o.asof_value;
|
||||
}
|
||||
};
|
||||
|
||||
using Lookups = std::variant<
|
||||
@ -236,7 +236,7 @@ public:
|
||||
void insert(Type type, const IColumn * asof_column, const Block * block, size_t row_num);
|
||||
|
||||
// This will internally synchronize
|
||||
const RowRef * findAsof(Type type, const IColumn * asof_column, size_t row_num) const;
|
||||
const RowRef * findAsof(Type type, ASOF::Inequality inequality, const IColumn * asof_column, size_t row_num) const;
|
||||
|
||||
private:
|
||||
// Lookups can be stored in a HashTable because it is memmovable
|
||||
|
46
dbms/src/Interpreters/asof.h
Normal file
46
dbms/src/Interpreters/asof.h
Normal file
@ -0,0 +1,46 @@
|
||||
#pragma once
|
||||
#include <string>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ASOF
|
||||
{
|
||||
|
||||
enum class Inequality
|
||||
{
|
||||
None = 0,
|
||||
Less,
|
||||
Greater,
|
||||
LessOrEquals,
|
||||
GreaterOrEquals,
|
||||
};
|
||||
|
||||
inline Inequality getInequality(const std::string & func_name)
|
||||
{
|
||||
Inequality inequality{Inequality::None};
|
||||
if (func_name == "less")
|
||||
inequality = Inequality::Less;
|
||||
else if (func_name == "greater")
|
||||
inequality = Inequality::Greater;
|
||||
else if (func_name == "lessOrEquals")
|
||||
inequality = Inequality::LessOrEquals;
|
||||
else if (func_name == "greaterOrEquals")
|
||||
inequality = Inequality::GreaterOrEquals;
|
||||
return inequality;
|
||||
}
|
||||
|
||||
inline Inequality reverseInequality(Inequality inequality)
|
||||
{
|
||||
if (inequality == Inequality::Less)
|
||||
return Inequality::Greater;
|
||||
else if (inequality == Inequality::Greater)
|
||||
return Inequality::Less;
|
||||
else if (inequality == Inequality::LessOrEquals)
|
||||
return Inequality::GreaterOrEquals;
|
||||
else if (inequality == Inequality::GreaterOrEquals)
|
||||
return Inequality::LessOrEquals;
|
||||
return Inequality::None;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
@ -11,3 +11,25 @@
|
||||
1 2 1 2
|
||||
1 3 1 2
|
||||
2 3 2 3
|
||||
-
|
||||
1 1 1 2
|
||||
1 2 1 2
|
||||
1 3 1 4
|
||||
2 1 2 3
|
||||
2 2 2 3
|
||||
2 3 2 3
|
||||
-
|
||||
1 1 1 2
|
||||
1 2 1 2
|
||||
1 3 1 4
|
||||
2 1 2 3
|
||||
2 2 2 3
|
||||
2 3 2 3
|
||||
-
|
||||
1 3 1 2
|
||||
-
|
||||
1 1 1 2
|
||||
1 2 1 4
|
||||
1 3 1 4
|
||||
2 1 2 3
|
||||
2 2 2 3
|
||||
|
@ -9,11 +9,15 @@ INSERT INTO B (b,t) VALUES (1,2),(1,4),(2,3);
|
||||
|
||||
SELECT A.a, A.t, B.b, B.t FROM A ASOF LEFT JOIN B ON A.a == B.b AND A.t >= B.t ORDER BY (A.a, A.t);
|
||||
SELECT count() FROM A ASOF LEFT JOIN B ON A.a == B.b AND B.t <= A.t;
|
||||
SELECT A.a, A.t, B.b, B.t FROM A ASOF INNER JOIN B ON B.t <= A.t AND A.a == B.b;
|
||||
SELECT count() FROM A ASOF JOIN B ON A.a == B.b AND A.t <= B.t; -- { serverError 48 }
|
||||
SELECT count() FROM A ASOF JOIN B ON A.a == B.b AND B.t >= A.t; -- { serverError 48 }
|
||||
SELECT count() FROM A ASOF JOIN B ON A.a == B.b AND A.t > B.t; -- { serverError 403 }
|
||||
SELECT count() FROM A ASOF JOIN B ON A.a == B.b AND A.t < B.t; -- { serverError 403 }
|
||||
SELECT A.a, A.t, B.b, B.t FROM A ASOF INNER JOIN B ON B.t <= A.t AND A.a == B.b ORDER BY (A.a, A.t);
|
||||
SELECT '-';
|
||||
SELECT A.a, A.t, B.b, B.t FROM A ASOF JOIN B ON A.a == B.b AND A.t <= B.t ORDER BY (A.a, A.t);
|
||||
SELECT '-';
|
||||
SELECT A.a, A.t, B.b, B.t FROM A ASOF JOIN B ON A.a == B.b AND B.t >= A.t ORDER BY (A.a, A.t);
|
||||
SELECT '-';
|
||||
SELECT A.a, A.t, B.b, B.t FROM A ASOF JOIN B ON A.a == B.b AND A.t > B.t ORDER BY (A.a, A.t);
|
||||
SELECT '-';
|
||||
SELECT A.a, A.t, B.b, B.t FROM A ASOF JOIN B ON A.a == B.b AND A.t < B.t ORDER BY (A.a, A.t);
|
||||
SELECT count() FROM A ASOF JOIN B ON A.a == B.b AND A.t == B.t; -- { serverError 403 }
|
||||
SELECT count() FROM A ASOF JOIN B ON A.a == B.b AND A.t != B.t; -- { serverError 403 }
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user