diff --git a/src/Interpreters/HashJoin.cpp b/src/Interpreters/HashJoin.cpp index 27294a57675..a807a9fa4ee 100644 --- a/src/Interpreters/HashJoin.cpp +++ b/src/Interpreters/HashJoin.cpp @@ -33,7 +33,6 @@ namespace DB namespace ErrorCodes { - extern const int BAD_TYPE_OF_FIELD; extern const int NOT_IMPLEMENTED; extern const int NO_SUCH_COLUMN_IN_TABLE; extern const int INCOMPATIBLE_TYPE_OF_JOIN; @@ -158,25 +157,21 @@ HashJoin::HashJoin(std::shared_ptr table_join_, const Block & right_s } else if (strictness == ASTTableJoin::Strictness::Asof) { - if (kind != ASTTableJoin::Kind::Left and kind != ASTTableJoin::Kind::Inner) - throw Exception("ASOF only supports LEFT and INNER as base joins", ErrorCodes::NOT_IMPLEMENTED); + /// @note ASOF JOIN is not INNER. It's better avoid use of 'INNER ASOF' combination in messages. + /// In fact INNER means 'LEFT SEMI ASOF' while LEFT means 'LEFT OUTER ASOF'. + if (!isLeft(kind) && !isInner(kind)) + throw Exception("Wrong ASOF JOIN type. Only ASOF and LEFT ASOF joins are supported", ErrorCodes::NOT_IMPLEMENTED); + + if (key_columns.size() <= 1) + throw Exception("ASOF join needs at least one equi-join column", ErrorCodes::SYNTAX_ERROR); + + if (right_table_keys.getByName(key_names_right.back()).type->isNullable()) + throw Exception("ASOF join over right table Nullable column is not implemented", ErrorCodes::NOT_IMPLEMENTED); - const IColumn * asof_column = key_columns.back(); size_t asof_size; - - asof_type = AsofRowRefs::getTypeSize(asof_column, asof_size); - if (!asof_type) - { - std::string msg = "ASOF join not supported for type: "; - msg += asof_column->getFamilyName(); - throw Exception(msg, ErrorCodes::BAD_TYPE_OF_FIELD); - } - + asof_type = AsofRowRefs::getTypeSize(*key_columns.back(), asof_size); key_columns.pop_back(); - if (key_columns.empty()) - throw Exception("ASOF join cannot be done without a joining column", ErrorCodes::SYNTAX_ERROR); - /// this is going to set up the appropriate hash table for the direct lookup part of the join /// However, this does not depend on the size of the asof join key (as that goes into the BST) /// Therefore, add it back in such that it can be extracted appropriately from the full stored @@ -248,11 +243,6 @@ HashJoin::Type HashJoin::chooseMethod(const ColumnRawPtrs & key_columns, Sizes & return Type::hashed; } -static const IColumn * extractAsofColumn(const ColumnRawPtrs & key_columns) -{ - return key_columns.back(); -} - template static KeyGetter createKeyGetter(const ColumnRawPtrs & key_columns, const Sizes & key_sizes) { @@ -428,14 +418,15 @@ namespace } static ALWAYS_INLINE void insertAsof(HashJoin & join, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool, - const IColumn * asof_column) + const IColumn & asof_column) { auto emplace_result = key_getter.emplaceKey(map, i, pool); typename Map::mapped_type * time_series_map = &emplace_result.getMapped(); + TypeIndex asof_type = *join.getAsofType(); if (emplace_result.isInserted()) - time_series_map = new (time_series_map) typename Map::mapped_type(join.getAsofType()); - time_series_map->insert(join.getAsofType(), asof_column, stored_block, i); + time_series_map = new (time_series_map) typename Map::mapped_type(asof_type); + time_series_map->insert(asof_type, asof_column, stored_block, i); } }; @@ -451,7 +442,7 @@ namespace const IColumn * asof_column [[maybe_unused]] = nullptr; if constexpr (is_asof_join) - asof_column = extractAsofColumn(key_columns); + asof_column = key_columns.back(); auto key_getter = createKeyGetter(key_columns, key_sizes); @@ -461,7 +452,7 @@ namespace continue; if constexpr (is_asof_join) - Inserter::insertAsof(join, map, key_getter, stored_block, i, pool, asof_column); + Inserter::insertAsof(join, map, key_getter, stored_block, i, pool, *asof_column); else if constexpr (mapped_one) Inserter::insertOne(join, map, key_getter, stored_block, i, pool); else @@ -614,21 +605,22 @@ class AddedColumns public: using TypeAndNames = std::vector>; - AddedColumns(const Block & sample_block_with_columns_to_add, - const Block & block_with_columns_to_add, + AddedColumns(const Block & block_with_columns_to_add, const Block & block, const Block & saved_block_sample, - const ColumnsWithTypeAndName & extras, - const HashJoin & join_, + const HashJoin & join, const ColumnRawPtrs & key_columns_, - const Sizes & key_sizes_) - : join(join_) - , key_columns(key_columns_) + const Sizes & key_sizes_, + bool is_asof_join) + : key_columns(key_columns_) , key_sizes(key_sizes_) , rows_to_add(block.rows()) - , need_filter(false) + , asof_type(join.getAsofType()) + , asof_inequality(join.getAsofInequality()) { - size_t num_columns_to_add = sample_block_with_columns_to_add.columns(); + size_t num_columns_to_add = block_with_columns_to_add.columns(); + if (is_asof_join) + ++num_columns_to_add; columns.reserve(num_columns_to_add); type_name.reserve(num_columns_to_add); @@ -641,8 +633,12 @@ public: addColumn(src_column); } - for (const auto & extra : extras) - addColumn(extra); + if (is_asof_join) + { + const ColumnWithTypeAndName & right_asof_column = join.rightAsofKeyColumn(); + addColumn(right_asof_column); + left_asof_key = key_columns.back(); + } for (auto & tn : type_name) right_indexes.push_back(saved_block_sample.getPositionByName(tn.second)); @@ -680,18 +676,25 @@ public: } } - const HashJoin & join; + TypeIndex asofType() const { return *asof_type; } + ASOF::Inequality asofInequality() const { return asof_inequality; } + const IColumn & leftAsofKey() const { return *left_asof_key; } + const ColumnRawPtrs & key_columns; const Sizes & key_sizes; size_t rows_to_add; std::unique_ptr offsets_to_replicate; - bool need_filter; + bool need_filter = false; private: TypeAndNames type_name; MutableColumns columns; std::vector right_indexes; size_t lazy_defaults_count = 0; + /// for ASOF + std::optional asof_type; + ASOF::Inequality asof_inequality; + const IColumn * left_asof_key = nullptr; void addColumn(const ColumnWithTypeAndName & src_column) { @@ -760,10 +763,6 @@ NO_INLINE IColumn::Filter joinRightColumns(const Map & map, AddedColumns & added if constexpr (need_replication) added_columns.offsets_to_replicate = std::make_unique(rows); - const IColumn * asof_column [[maybe_unused]] = nullptr; - if constexpr (is_asof_join) - asof_column = extractAsofColumn(added_columns.key_columns); - auto key_getter = createKeyGetter(added_columns.key_columns, added_columns.key_sizes); IColumn::Offset current_offset = 0; @@ -790,8 +789,11 @@ NO_INLINE IColumn::Filter joinRightColumns(const Map & map, AddedColumns & added if constexpr (is_asof_join) { - const HashJoin & join = added_columns.join; - if (const RowRef * found = mapped.findAsof(join.getAsofType(), join.getAsofInequality(), asof_column, i)) + TypeIndex asof_type = added_columns.asofType(); + ASOF::Inequality asof_inequality = added_columns.asofInequality(); + const IColumn & left_asof_key = added_columns.leftAsofKey(); + + if (const RowRef * found = mapped.findAsof(asof_type, asof_inequality, left_asof_key, i)) { setUsed(filter, i); mapped.setUsed(); @@ -932,11 +934,11 @@ void HashJoin::joinBlockImpl( /// Rare case, when keys are constant or low cardinality. To avoid code bloat, simply materialize them. Columns materialized_keys = JoinCommon::materializeColumns(block, key_names_left); - ColumnRawPtrs key_columns = JoinCommon::getRawPointers(materialized_keys); + ColumnRawPtrs left_key_columns = JoinCommon::getRawPointers(materialized_keys); /// Keys with NULL value in any column won't join to anything. ConstNullMapPtr null_map{}; - ColumnPtr null_map_holder = extractNestedColumnsAndNullMap(key_columns, null_map); + ColumnPtr null_map_holder = extractNestedColumnsAndNullMap(left_key_columns, null_map); size_t existing_columns = block.columns(); @@ -957,12 +959,8 @@ void HashJoin::joinBlockImpl( * but they will not be used at this stage of joining (and will be in `AdderNonJoined`), and they need to be skipped. * For ASOF, the last column is used as the ASOF column */ - ColumnsWithTypeAndName extras; - if constexpr (is_asof_join) - extras.push_back(right_table_keys.getByName(key_names_right.back())); - AddedColumns added_columns(sample_block_with_columns_to_add, block_with_columns_to_add, block, savedBlockSample(), - extras, *this, key_columns, key_sizes); + AddedColumns added_columns(block_with_columns_to_add, block, savedBlockSample(), *this, left_key_columns, key_sizes, is_asof_join); bool has_required_right_keys = (required_right_keys.columns() != 0); added_columns.need_filter = need_filter || has_required_right_keys; diff --git a/src/Interpreters/HashJoin.h b/src/Interpreters/HashJoin.h index 67d83d27a6d..fb879e2c507 100644 --- a/src/Interpreters/HashJoin.h +++ b/src/Interpreters/HashJoin.h @@ -191,10 +191,16 @@ public: ASTTableJoin::Kind getKind() const { return kind; } ASTTableJoin::Strictness getStrictness() const { return strictness; } - TypeIndex getAsofType() const { return *asof_type; } + const std::optional & getAsofType() const { return asof_type; } ASOF::Inequality getAsofInequality() const { return asof_inequality; } bool anyTakeLastRow() const { return any_take_last_row; } + const ColumnWithTypeAndName & rightAsofKeyColumn() const + { + /// It should be nullable if nullable_right_side is true + return savedBlockSample().getByName(key_names_right.back()); + } + /// Different types of keys for maps. #define APPLY_FOR_JOIN_VARIANTS(M) \ M(key8) \ diff --git a/src/Interpreters/RowRefs.cpp b/src/Interpreters/RowRefs.cpp index 879a0bcf88e..a206456f1b6 100644 --- a/src/Interpreters/RowRefs.cpp +++ b/src/Interpreters/RowRefs.cpp @@ -12,6 +12,11 @@ namespace DB { +namespace ErrorCodes +{ + extern const int BAD_TYPE_OF_FIELD; +} + namespace { @@ -56,7 +61,7 @@ AsofRowRefs::AsofRowRefs(TypeIndex type) callWithType(type, call); } -void AsofRowRefs::insert(TypeIndex type, const IColumn * asof_column, const Block * block, size_t row_num) +void AsofRowRefs::insert(TypeIndex type, const IColumn & asof_column, const Block * block, size_t row_num) { auto call = [&](const auto & t) { @@ -64,9 +69,9 @@ void AsofRowRefs::insert(TypeIndex type, const IColumn * asof_column, const Bloc using LookupPtr = typename Entry::LookupPtr; using ColumnType = std::conditional_t, ColumnDecimal, ColumnVector>; - auto * column = typeid_cast(asof_column); + const auto & column = typeid_cast(asof_column); - T key = column->getElement(row_num); + T key = column.getElement(row_num); auto entry = Entry(key, RowRef(block, row_num)); std::get(lookups)->insert(entry); }; @@ -74,7 +79,7 @@ void AsofRowRefs::insert(TypeIndex type, const IColumn * asof_column, const Bloc callWithType(type, call); } -const RowRef * AsofRowRefs::findAsof(TypeIndex type, ASOF::Inequality inequality, const IColumn * asof_column, size_t row_num) const +const RowRef * AsofRowRefs::findAsof(TypeIndex type, ASOF::Inequality inequality, const IColumn & asof_column, size_t row_num) const { const RowRef * out = nullptr; @@ -88,8 +93,8 @@ const RowRef * AsofRowRefs::findAsof(TypeIndex type, ASOF::Inequality inequality using LookupPtr = typename EntryType::LookupPtr; using ColumnType = std::conditional_t, ColumnDecimal, ColumnVector>; - auto * column = typeid_cast(asof_column); - T key = column->getElement(row_num); + const auto & column = typeid_cast(asof_column); + T key = column.getElement(row_num); auto & typed_lookup = std::get(lookups); if (is_strict) @@ -102,9 +107,9 @@ const RowRef * AsofRowRefs::findAsof(TypeIndex type, ASOF::Inequality inequality return out; } -std::optional AsofRowRefs::getTypeSize(const IColumn * asof_column, size_t & size) +std::optional AsofRowRefs::getTypeSize(const IColumn & asof_column, size_t & size) { - TypeIndex idx = asof_column->getDataType(); + TypeIndex idx = asof_column.getDataType(); switch (idx) { @@ -152,8 +157,7 @@ std::optional AsofRowRefs::getTypeSize(const IColumn * asof_column, s break; } - size = 0; - return {}; + throw Exception("ASOF join not supported for type: " + std::string(asof_column.getFamilyName()), ErrorCodes::BAD_TYPE_OF_FIELD); } } diff --git a/src/Interpreters/RowRefs.h b/src/Interpreters/RowRefs.h index e8231b1c233..fc035bf626e 100644 --- a/src/Interpreters/RowRefs.h +++ b/src/Interpreters/RowRefs.h @@ -233,13 +233,13 @@ public: AsofRowRefs() {} AsofRowRefs(TypeIndex t); - static std::optional getTypeSize(const IColumn * asof_column, size_t & type_size); + static std::optional getTypeSize(const IColumn & asof_column, size_t & type_size); // This will be synchronized by the rwlock mutex in Join.h - void insert(TypeIndex type, const IColumn * asof_column, const Block * block, size_t row_num); + void insert(TypeIndex type, const IColumn & asof_column, const Block * block, size_t row_num); // This will internally synchronize - const RowRef * findAsof(TypeIndex type, ASOF::Inequality inequality, const IColumn * asof_column, size_t row_num) const; + const RowRef * findAsof(TypeIndex type, ASOF::Inequality inequality, const IColumn & asof_column, size_t row_num) const; private: // Lookups can be stored in a HashTable because it is memmovable diff --git a/tests/queries/0_stateless/01428_nullable_asof_join.reference b/tests/queries/0_stateless/01428_nullable_asof_join.reference new file mode 100644 index 00000000000..f04655fefaa --- /dev/null +++ b/tests/queries/0_stateless/01428_nullable_asof_join.reference @@ -0,0 +1,20 @@ +left asof using +0 \N 0 \N UInt8 Nullable(UInt8) UInt8 Nullable(UInt8) +1 \N 1 \N UInt8 Nullable(UInt8) UInt8 Nullable(UInt8) +1 1 2 2 UInt8 Nullable(UInt8) UInt8 Nullable(UInt8) +0 \N 0 \N UInt8 Nullable(UInt8) Nullable(UInt8) Nullable(UInt8) +1 \N 1 \N UInt8 Nullable(UInt8) Nullable(UInt8) Nullable(UInt8) +1 1 2 2 UInt8 Nullable(UInt8) Nullable(UInt8) Nullable(UInt8) +left asof on +0 \N 0 \N UInt8 Nullable(UInt8) UInt8 Nullable(UInt8) +1 \N 1 \N UInt8 Nullable(UInt8) UInt8 Nullable(UInt8) +1 1 2 2 UInt8 Nullable(UInt8) UInt8 Nullable(UInt8) +0 \N 0 \N UInt8 Nullable(UInt8) Nullable(UInt8) Nullable(UInt8) +1 \N 1 \N UInt8 Nullable(UInt8) Nullable(UInt8) Nullable(UInt8) +1 1 2 2 UInt8 Nullable(UInt8) Nullable(UInt8) Nullable(UInt8) +asof using +1 1 2 2 UInt8 UInt8 UInt8 UInt8 +1 1 2 2 UInt8 UInt8 Nullable(UInt8) UInt8 +asof on +1 1 2 2 UInt8 UInt8 UInt8 UInt8 +1 1 2 2 UInt8 UInt8 Nullable(UInt8) UInt8 diff --git a/tests/queries/0_stateless/01428_nullable_asof_join.sql b/tests/queries/0_stateless/01428_nullable_asof_join.sql new file mode 100644 index 00000000000..c812e6ecfab --- /dev/null +++ b/tests/queries/0_stateless/01428_nullable_asof_join.sql @@ -0,0 +1,105 @@ +SET join_use_nulls = 1; + +select 'left asof using'; + +SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt)) +FROM (SELECT toUInt8(number) > 0 as pk, toUInt8(number) as dt FROM numbers(3)) a +ASOF LEFT JOIN (SELECT 1 as pk, 2 as dt) b +USING(pk, dt) +ORDER BY a.dt; + +SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt)) +FROM (SELECT toUInt8(number) > 0 as pk, toNullable(toUInt8(number)) as dt FROM numbers(3)) a +ASOF LEFT JOIN (SELECT 1 as pk, 2 as dt) b +USING(pk, dt) +ORDER BY a.dt; + +SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt)) +FROM (SELECT toUInt8(number) > 0 as pk, toUInt8(number) as dt FROM numbers(3)) a +ASOF LEFT JOIN (SELECT 1 as pk, toNullable(0) as dt) b +USING(pk, dt) +ORDER BY a.dt;-- { serverError 48 } + +SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt)) +FROM (SELECT toUInt8(number) > 0 as pk, toNullable(toUInt8(number)) as dt FROM numbers(3)) a +ASOF LEFT JOIN (SELECT 1 as pk, toNullable(0) as dt) b +USING(pk, dt) +ORDER BY a.dt;-- { serverError 48 } + +select 'left asof on'; + +SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt)) +FROM (SELECT toUInt8(number) > 0 as pk, toUInt8(number) as dt FROM numbers(3)) a +ASOF LEFT JOIN (SELECT 1 as pk, 2 as dt) b +ON a.pk = b.pk AND a.dt >= b.dt +ORDER BY a.dt; + +SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt)) +FROM (SELECT toUInt8(number) > 0 as pk, toNullable(toUInt8(number)) as dt FROM numbers(3)) a +ASOF LEFT JOIN (SELECT 1 as pk, 2 as dt) b +ON a.pk = b.pk AND a.dt >= b.dt +ORDER BY a.dt; + +SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt)) +FROM (SELECT toUInt8(number) > 0 as pk, toUInt8(number) as dt FROM numbers(3)) a +ASOF LEFT JOIN (SELECT 1 as pk, toNullable(0) as dt) b +ON a.pk = b.pk AND a.dt >= b.dt +ORDER BY a.dt;-- { serverError 48 } + +SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt)) +FROM (SELECT toUInt8(number) > 0 as pk, toNullable(toUInt8(number)) as dt FROM numbers(3)) a +ASOF LEFT JOIN (SELECT 1 as pk, toNullable(0) as dt) b +ON a.pk = b.pk AND a.dt >= b.dt +ORDER BY a.dt;-- { serverError 48 } + +select 'asof using'; + +SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt)) +FROM (SELECT toUInt8(number) > 0 as pk, toUInt8(number) as dt FROM numbers(3)) a +ASOF JOIN (SELECT 1 as pk, 2 as dt) b +USING(pk, dt) +ORDER BY a.dt; + +SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt)) +FROM (SELECT toUInt8(number) > 0 as pk, toNullable(toUInt8(number)) as dt FROM numbers(3)) a +ASOF JOIN (SELECT 1 as pk, 2 as dt) b +USING(pk, dt) +ORDER BY a.dt; + +SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt)) +FROM (SELECT toUInt8(number) > 0 as pk, toUInt8(number) as dt FROM numbers(3)) a +ASOF JOIN (SELECT 1 as pk, toNullable(0) as dt) b +USING(pk, dt) +ORDER BY a.dt;-- { serverError 48 } + +SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt)) +FROM (SELECT toUInt8(number) > 0 as pk, toNullable(toUInt8(number)) as dt FROM numbers(3)) a +ASOF JOIN (SELECT 1 as pk, toNullable(0) as dt) b +USING(pk, dt) +ORDER BY a.dt;-- { serverError 48 } + +select 'asof on'; + +SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt)) +FROM (SELECT toUInt8(number) > 0 as pk, toUInt8(number) as dt FROM numbers(3)) a +ASOF JOIN (SELECT 1 as pk, 2 as dt) b +ON a.pk = b.pk AND a.dt >= b.dt +ORDER BY a.dt; + +SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt)) +FROM (SELECT toUInt8(number) > 0 as pk, toNullable(toUInt8(number)) as dt FROM numbers(3)) a +ASOF JOIN (SELECT 1 as pk, 2 as dt) b +ON a.pk = b.pk AND a.dt >= b.dt +ORDER BY a.dt; + +SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt)) +FROM (SELECT toUInt8(number) > 0 as pk, toUInt8(number) as dt FROM numbers(3)) a +ASOF JOIN (SELECT 1 as pk, toNullable(0) as dt) b +ON a.pk = b.pk AND a.dt >= b.dt +ORDER BY a.dt;-- { serverError 48 } + +SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt)) +FROM (SELECT toUInt8(number) > 0 as pk, toNullable(toUInt8(number)) as dt FROM numbers(3)) a +ASOF JOIN (SELECT 1 as pk, toNullable(0) as dt) b +ON a.pk = b.pk AND a.dt >= b.dt +ORDER BY a.dt;-- { serverError 48 }