fix left asof join with join_use_nulls (#13291)

This commit is contained in:
Artem Zuikov 2020-08-04 02:11:39 +03:00 committed by GitHub
parent 43f048d2cb
commit d4266d9619
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 199 additions and 66 deletions

View File

@ -33,7 +33,6 @@ namespace DB
namespace ErrorCodes
{
extern const int BAD_TYPE_OF_FIELD;
extern const int NOT_IMPLEMENTED;
extern const int NO_SUCH_COLUMN_IN_TABLE;
extern const int INCOMPATIBLE_TYPE_OF_JOIN;
@ -158,25 +157,21 @@ HashJoin::HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_s
}
else if (strictness == ASTTableJoin::Strictness::Asof)
{
if (kind != ASTTableJoin::Kind::Left and kind != ASTTableJoin::Kind::Inner)
throw Exception("ASOF only supports LEFT and INNER as base joins", ErrorCodes::NOT_IMPLEMENTED);
/// @note ASOF JOIN is not INNER. It's better avoid use of 'INNER ASOF' combination in messages.
/// In fact INNER means 'LEFT SEMI ASOF' while LEFT means 'LEFT OUTER ASOF'.
if (!isLeft(kind) && !isInner(kind))
throw Exception("Wrong ASOF JOIN type. Only ASOF and LEFT ASOF joins are supported", ErrorCodes::NOT_IMPLEMENTED);
if (key_columns.size() <= 1)
throw Exception("ASOF join needs at least one equi-join column", ErrorCodes::SYNTAX_ERROR);
if (right_table_keys.getByName(key_names_right.back()).type->isNullable())
throw Exception("ASOF join over right table Nullable column is not implemented", ErrorCodes::NOT_IMPLEMENTED);
const IColumn * asof_column = key_columns.back();
size_t asof_size;
asof_type = AsofRowRefs::getTypeSize(asof_column, asof_size);
if (!asof_type)
{
std::string msg = "ASOF join not supported for type: ";
msg += asof_column->getFamilyName();
throw Exception(msg, ErrorCodes::BAD_TYPE_OF_FIELD);
}
asof_type = AsofRowRefs::getTypeSize(*key_columns.back(), asof_size);
key_columns.pop_back();
if (key_columns.empty())
throw Exception("ASOF join cannot be done without a joining column", ErrorCodes::SYNTAX_ERROR);
/// this is going to set up the appropriate hash table for the direct lookup part of the join
/// However, this does not depend on the size of the asof join key (as that goes into the BST)
/// Therefore, add it back in such that it can be extracted appropriately from the full stored
@ -248,11 +243,6 @@ HashJoin::Type HashJoin::chooseMethod(const ColumnRawPtrs & key_columns, Sizes &
return Type::hashed;
}
static const IColumn * extractAsofColumn(const ColumnRawPtrs & key_columns)
{
return key_columns.back();
}
template<typename KeyGetter, bool is_asof_join>
static KeyGetter createKeyGetter(const ColumnRawPtrs & key_columns, const Sizes & key_sizes)
{
@ -428,14 +418,15 @@ namespace
}
static ALWAYS_INLINE void insertAsof(HashJoin & join, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool,
const IColumn * asof_column)
const IColumn & asof_column)
{
auto emplace_result = key_getter.emplaceKey(map, i, pool);
typename Map::mapped_type * time_series_map = &emplace_result.getMapped();
TypeIndex asof_type = *join.getAsofType();
if (emplace_result.isInserted())
time_series_map = new (time_series_map) typename Map::mapped_type(join.getAsofType());
time_series_map->insert(join.getAsofType(), asof_column, stored_block, i);
time_series_map = new (time_series_map) typename Map::mapped_type(asof_type);
time_series_map->insert(asof_type, asof_column, stored_block, i);
}
};
@ -451,7 +442,7 @@ namespace
const IColumn * asof_column [[maybe_unused]] = nullptr;
if constexpr (is_asof_join)
asof_column = extractAsofColumn(key_columns);
asof_column = key_columns.back();
auto key_getter = createKeyGetter<KeyGetter, is_asof_join>(key_columns, key_sizes);
@ -461,7 +452,7 @@ namespace
continue;
if constexpr (is_asof_join)
Inserter<Map, KeyGetter>::insertAsof(join, map, key_getter, stored_block, i, pool, asof_column);
Inserter<Map, KeyGetter>::insertAsof(join, map, key_getter, stored_block, i, pool, *asof_column);
else if constexpr (mapped_one)
Inserter<Map, KeyGetter>::insertOne(join, map, key_getter, stored_block, i, pool);
else
@ -614,21 +605,22 @@ class AddedColumns
public:
using TypeAndNames = std::vector<std::pair<decltype(ColumnWithTypeAndName::type), decltype(ColumnWithTypeAndName::name)>>;
AddedColumns(const Block & sample_block_with_columns_to_add,
const Block & block_with_columns_to_add,
AddedColumns(const Block & block_with_columns_to_add,
const Block & block,
const Block & saved_block_sample,
const ColumnsWithTypeAndName & extras,
const HashJoin & join_,
const HashJoin & join,
const ColumnRawPtrs & key_columns_,
const Sizes & key_sizes_)
: join(join_)
, key_columns(key_columns_)
const Sizes & key_sizes_,
bool is_asof_join)
: key_columns(key_columns_)
, key_sizes(key_sizes_)
, rows_to_add(block.rows())
, need_filter(false)
, asof_type(join.getAsofType())
, asof_inequality(join.getAsofInequality())
{
size_t num_columns_to_add = sample_block_with_columns_to_add.columns();
size_t num_columns_to_add = block_with_columns_to_add.columns();
if (is_asof_join)
++num_columns_to_add;
columns.reserve(num_columns_to_add);
type_name.reserve(num_columns_to_add);
@ -641,8 +633,12 @@ public:
addColumn(src_column);
}
for (const auto & extra : extras)
addColumn(extra);
if (is_asof_join)
{
const ColumnWithTypeAndName & right_asof_column = join.rightAsofKeyColumn();
addColumn(right_asof_column);
left_asof_key = key_columns.back();
}
for (auto & tn : type_name)
right_indexes.push_back(saved_block_sample.getPositionByName(tn.second));
@ -680,18 +676,25 @@ public:
}
}
const HashJoin & join;
TypeIndex asofType() const { return *asof_type; }
ASOF::Inequality asofInequality() const { return asof_inequality; }
const IColumn & leftAsofKey() const { return *left_asof_key; }
const ColumnRawPtrs & key_columns;
const Sizes & key_sizes;
size_t rows_to_add;
std::unique_ptr<IColumn::Offsets> offsets_to_replicate;
bool need_filter;
bool need_filter = false;
private:
TypeAndNames type_name;
MutableColumns columns;
std::vector<size_t> right_indexes;
size_t lazy_defaults_count = 0;
/// for ASOF
std::optional<TypeIndex> asof_type;
ASOF::Inequality asof_inequality;
const IColumn * left_asof_key = nullptr;
void addColumn(const ColumnWithTypeAndName & src_column)
{
@ -760,10 +763,6 @@ NO_INLINE IColumn::Filter joinRightColumns(const Map & map, AddedColumns & added
if constexpr (need_replication)
added_columns.offsets_to_replicate = std::make_unique<IColumn::Offsets>(rows);
const IColumn * asof_column [[maybe_unused]] = nullptr;
if constexpr (is_asof_join)
asof_column = extractAsofColumn(added_columns.key_columns);
auto key_getter = createKeyGetter<KeyGetter, is_asof_join>(added_columns.key_columns, added_columns.key_sizes);
IColumn::Offset current_offset = 0;
@ -790,8 +789,11 @@ NO_INLINE IColumn::Filter joinRightColumns(const Map & map, AddedColumns & added
if constexpr (is_asof_join)
{
const HashJoin & join = added_columns.join;
if (const RowRef * found = mapped.findAsof(join.getAsofType(), join.getAsofInequality(), asof_column, i))
TypeIndex asof_type = added_columns.asofType();
ASOF::Inequality asof_inequality = added_columns.asofInequality();
const IColumn & left_asof_key = added_columns.leftAsofKey();
if (const RowRef * found = mapped.findAsof(asof_type, asof_inequality, left_asof_key, i))
{
setUsed<need_filter>(filter, i);
mapped.setUsed();
@ -932,11 +934,11 @@ void HashJoin::joinBlockImpl(
/// Rare case, when keys are constant or low cardinality. To avoid code bloat, simply materialize them.
Columns materialized_keys = JoinCommon::materializeColumns(block, key_names_left);
ColumnRawPtrs key_columns = JoinCommon::getRawPointers(materialized_keys);
ColumnRawPtrs left_key_columns = JoinCommon::getRawPointers(materialized_keys);
/// Keys with NULL value in any column won't join to anything.
ConstNullMapPtr null_map{};
ColumnPtr null_map_holder = extractNestedColumnsAndNullMap(key_columns, null_map);
ColumnPtr null_map_holder = extractNestedColumnsAndNullMap(left_key_columns, null_map);
size_t existing_columns = block.columns();
@ -957,12 +959,8 @@ void HashJoin::joinBlockImpl(
* but they will not be used at this stage of joining (and will be in `AdderNonJoined`), and they need to be skipped.
* For ASOF, the last column is used as the ASOF column
*/
ColumnsWithTypeAndName extras;
if constexpr (is_asof_join)
extras.push_back(right_table_keys.getByName(key_names_right.back()));
AddedColumns added_columns(sample_block_with_columns_to_add, block_with_columns_to_add, block, savedBlockSample(),
extras, *this, key_columns, key_sizes);
AddedColumns added_columns(block_with_columns_to_add, block, savedBlockSample(), *this, left_key_columns, key_sizes, is_asof_join);
bool has_required_right_keys = (required_right_keys.columns() != 0);
added_columns.need_filter = need_filter || has_required_right_keys;

View File

@ -191,10 +191,16 @@ public:
ASTTableJoin::Kind getKind() const { return kind; }
ASTTableJoin::Strictness getStrictness() const { return strictness; }
TypeIndex getAsofType() const { return *asof_type; }
const std::optional<TypeIndex> & getAsofType() const { return asof_type; }
ASOF::Inequality getAsofInequality() const { return asof_inequality; }
bool anyTakeLastRow() const { return any_take_last_row; }
const ColumnWithTypeAndName & rightAsofKeyColumn() const
{
/// It should be nullable if nullable_right_side is true
return savedBlockSample().getByName(key_names_right.back());
}
/// Different types of keys for maps.
#define APPLY_FOR_JOIN_VARIANTS(M) \
M(key8) \

View File

@ -12,6 +12,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_TYPE_OF_FIELD;
}
namespace
{
@ -56,7 +61,7 @@ AsofRowRefs::AsofRowRefs(TypeIndex type)
callWithType(type, call);
}
void AsofRowRefs::insert(TypeIndex type, const IColumn * asof_column, const Block * block, size_t row_num)
void AsofRowRefs::insert(TypeIndex type, const IColumn & asof_column, const Block * block, size_t row_num)
{
auto call = [&](const auto & t)
{
@ -64,9 +69,9 @@ void AsofRowRefs::insert(TypeIndex type, const IColumn * asof_column, const Bloc
using LookupPtr = typename Entry<T>::LookupPtr;
using ColumnType = std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<T>, ColumnVector<T>>;
auto * column = typeid_cast<const ColumnType *>(asof_column);
const auto & column = typeid_cast<const ColumnType &>(asof_column);
T key = column->getElement(row_num);
T key = column.getElement(row_num);
auto entry = Entry<T>(key, RowRef(block, row_num));
std::get<LookupPtr>(lookups)->insert(entry);
};
@ -74,7 +79,7 @@ void AsofRowRefs::insert(TypeIndex type, const IColumn * asof_column, const Bloc
callWithType(type, call);
}
const RowRef * AsofRowRefs::findAsof(TypeIndex type, ASOF::Inequality inequality, const IColumn * asof_column, size_t row_num) const
const RowRef * AsofRowRefs::findAsof(TypeIndex type, ASOF::Inequality inequality, const IColumn & asof_column, size_t row_num) const
{
const RowRef * out = nullptr;
@ -88,8 +93,8 @@ const RowRef * AsofRowRefs::findAsof(TypeIndex type, ASOF::Inequality inequality
using LookupPtr = typename EntryType::LookupPtr;
using ColumnType = std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<T>, ColumnVector<T>>;
auto * column = typeid_cast<const ColumnType *>(asof_column);
T key = column->getElement(row_num);
const auto & column = typeid_cast<const ColumnType &>(asof_column);
T key = column.getElement(row_num);
auto & typed_lookup = std::get<LookupPtr>(lookups);
if (is_strict)
@ -102,9 +107,9 @@ const RowRef * AsofRowRefs::findAsof(TypeIndex type, ASOF::Inequality inequality
return out;
}
std::optional<TypeIndex> AsofRowRefs::getTypeSize(const IColumn * asof_column, size_t & size)
std::optional<TypeIndex> AsofRowRefs::getTypeSize(const IColumn & asof_column, size_t & size)
{
TypeIndex idx = asof_column->getDataType();
TypeIndex idx = asof_column.getDataType();
switch (idx)
{
@ -152,8 +157,7 @@ std::optional<TypeIndex> AsofRowRefs::getTypeSize(const IColumn * asof_column, s
break;
}
size = 0;
return {};
throw Exception("ASOF join not supported for type: " + std::string(asof_column.getFamilyName()), ErrorCodes::BAD_TYPE_OF_FIELD);
}
}

View File

@ -233,13 +233,13 @@ public:
AsofRowRefs() {}
AsofRowRefs(TypeIndex t);
static std::optional<TypeIndex> getTypeSize(const IColumn * asof_column, size_t & type_size);
static std::optional<TypeIndex> getTypeSize(const IColumn & asof_column, size_t & type_size);
// This will be synchronized by the rwlock mutex in Join.h
void insert(TypeIndex type, const IColumn * asof_column, const Block * block, size_t row_num);
void insert(TypeIndex type, const IColumn & asof_column, const Block * block, size_t row_num);
// This will internally synchronize
const RowRef * findAsof(TypeIndex type, ASOF::Inequality inequality, const IColumn * asof_column, size_t row_num) const;
const RowRef * findAsof(TypeIndex type, ASOF::Inequality inequality, const IColumn & asof_column, size_t row_num) const;
private:
// Lookups can be stored in a HashTable because it is memmovable

View File

@ -0,0 +1,20 @@
left asof using
0 \N 0 \N UInt8 Nullable(UInt8) UInt8 Nullable(UInt8)
1 \N 1 \N UInt8 Nullable(UInt8) UInt8 Nullable(UInt8)
1 1 2 2 UInt8 Nullable(UInt8) UInt8 Nullable(UInt8)
0 \N 0 \N UInt8 Nullable(UInt8) Nullable(UInt8) Nullable(UInt8)
1 \N 1 \N UInt8 Nullable(UInt8) Nullable(UInt8) Nullable(UInt8)
1 1 2 2 UInt8 Nullable(UInt8) Nullable(UInt8) Nullable(UInt8)
left asof on
0 \N 0 \N UInt8 Nullable(UInt8) UInt8 Nullable(UInt8)
1 \N 1 \N UInt8 Nullable(UInt8) UInt8 Nullable(UInt8)
1 1 2 2 UInt8 Nullable(UInt8) UInt8 Nullable(UInt8)
0 \N 0 \N UInt8 Nullable(UInt8) Nullable(UInt8) Nullable(UInt8)
1 \N 1 \N UInt8 Nullable(UInt8) Nullable(UInt8) Nullable(UInt8)
1 1 2 2 UInt8 Nullable(UInt8) Nullable(UInt8) Nullable(UInt8)
asof using
1 1 2 2 UInt8 UInt8 UInt8 UInt8
1 1 2 2 UInt8 UInt8 Nullable(UInt8) UInt8
asof on
1 1 2 2 UInt8 UInt8 UInt8 UInt8
1 1 2 2 UInt8 UInt8 Nullable(UInt8) UInt8

View File

@ -0,0 +1,105 @@
SET join_use_nulls = 1;
select 'left asof using';
SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt))
FROM (SELECT toUInt8(number) > 0 as pk, toUInt8(number) as dt FROM numbers(3)) a
ASOF LEFT JOIN (SELECT 1 as pk, 2 as dt) b
USING(pk, dt)
ORDER BY a.dt;
SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt))
FROM (SELECT toUInt8(number) > 0 as pk, toNullable(toUInt8(number)) as dt FROM numbers(3)) a
ASOF LEFT JOIN (SELECT 1 as pk, 2 as dt) b
USING(pk, dt)
ORDER BY a.dt;
SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt))
FROM (SELECT toUInt8(number) > 0 as pk, toUInt8(number) as dt FROM numbers(3)) a
ASOF LEFT JOIN (SELECT 1 as pk, toNullable(0) as dt) b
USING(pk, dt)
ORDER BY a.dt;-- { serverError 48 }
SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt))
FROM (SELECT toUInt8(number) > 0 as pk, toNullable(toUInt8(number)) as dt FROM numbers(3)) a
ASOF LEFT JOIN (SELECT 1 as pk, toNullable(0) as dt) b
USING(pk, dt)
ORDER BY a.dt;-- { serverError 48 }
select 'left asof on';
SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt))
FROM (SELECT toUInt8(number) > 0 as pk, toUInt8(number) as dt FROM numbers(3)) a
ASOF LEFT JOIN (SELECT 1 as pk, 2 as dt) b
ON a.pk = b.pk AND a.dt >= b.dt
ORDER BY a.dt;
SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt))
FROM (SELECT toUInt8(number) > 0 as pk, toNullable(toUInt8(number)) as dt FROM numbers(3)) a
ASOF LEFT JOIN (SELECT 1 as pk, 2 as dt) b
ON a.pk = b.pk AND a.dt >= b.dt
ORDER BY a.dt;
SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt))
FROM (SELECT toUInt8(number) > 0 as pk, toUInt8(number) as dt FROM numbers(3)) a
ASOF LEFT JOIN (SELECT 1 as pk, toNullable(0) as dt) b
ON a.pk = b.pk AND a.dt >= b.dt
ORDER BY a.dt;-- { serverError 48 }
SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt))
FROM (SELECT toUInt8(number) > 0 as pk, toNullable(toUInt8(number)) as dt FROM numbers(3)) a
ASOF LEFT JOIN (SELECT 1 as pk, toNullable(0) as dt) b
ON a.pk = b.pk AND a.dt >= b.dt
ORDER BY a.dt;-- { serverError 48 }
select 'asof using';
SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt))
FROM (SELECT toUInt8(number) > 0 as pk, toUInt8(number) as dt FROM numbers(3)) a
ASOF JOIN (SELECT 1 as pk, 2 as dt) b
USING(pk, dt)
ORDER BY a.dt;
SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt))
FROM (SELECT toUInt8(number) > 0 as pk, toNullable(toUInt8(number)) as dt FROM numbers(3)) a
ASOF JOIN (SELECT 1 as pk, 2 as dt) b
USING(pk, dt)
ORDER BY a.dt;
SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt))
FROM (SELECT toUInt8(number) > 0 as pk, toUInt8(number) as dt FROM numbers(3)) a
ASOF JOIN (SELECT 1 as pk, toNullable(0) as dt) b
USING(pk, dt)
ORDER BY a.dt;-- { serverError 48 }
SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt))
FROM (SELECT toUInt8(number) > 0 as pk, toNullable(toUInt8(number)) as dt FROM numbers(3)) a
ASOF JOIN (SELECT 1 as pk, toNullable(0) as dt) b
USING(pk, dt)
ORDER BY a.dt;-- { serverError 48 }
select 'asof on';
SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt))
FROM (SELECT toUInt8(number) > 0 as pk, toUInt8(number) as dt FROM numbers(3)) a
ASOF JOIN (SELECT 1 as pk, 2 as dt) b
ON a.pk = b.pk AND a.dt >= b.dt
ORDER BY a.dt;
SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt))
FROM (SELECT toUInt8(number) > 0 as pk, toNullable(toUInt8(number)) as dt FROM numbers(3)) a
ASOF JOIN (SELECT 1 as pk, 2 as dt) b
ON a.pk = b.pk AND a.dt >= b.dt
ORDER BY a.dt;
SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt))
FROM (SELECT toUInt8(number) > 0 as pk, toUInt8(number) as dt FROM numbers(3)) a
ASOF JOIN (SELECT 1 as pk, toNullable(0) as dt) b
ON a.pk = b.pk AND a.dt >= b.dt
ORDER BY a.dt;-- { serverError 48 }
SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt))
FROM (SELECT toUInt8(number) > 0 as pk, toNullable(toUInt8(number)) as dt FROM numbers(3)) a
ASOF JOIN (SELECT 1 as pk, toNullable(0) as dt) b
ON a.pk = b.pk AND a.dt >= b.dt
ORDER BY a.dt;-- { serverError 48 }