diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index 2d959be98f2..f19781ca380 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -331,6 +331,9 @@ void Join::setSampleBlock(const Block & block) sample_block_with_columns_to_add = materializeBlock(block); + blocklist_sample = Block(block.getColumnsWithTypeAndName()); + prepareBlockListStructure(blocklist_sample); + /// Move from `sample_block_with_columns_to_add` key columns to `sample_block_with_keys`, keeping the order. size_t pos = 0; while (pos < sample_block_with_columns_to_add.columns()) @@ -482,10 +485,47 @@ namespace } } +void Join::prepareBlockListStructure(Block & stored_block) +{ + if (isRightOrFull(kind)) + { + /** Move the key columns to the beginning of the block. + * This is where NonJoinedBlockInputStream will expect. + */ + size_t key_num = 0; + for (const auto & name : key_names_right) + { + size_t pos = stored_block.getPositionByName(name); + ColumnWithTypeAndName col = stored_block.safeGetByPosition(pos); + stored_block.erase(pos); + stored_block.insert(key_num, std::move(col)); + ++key_num; + } + } + else + { + NameSet erased; /// HOTFIX: there could be duplicates in JOIN ON section + + /// Remove the key columns from stored_block, as they are not needed. + /// However, do not erase the ASOF column if this is an asof join + for (const auto & name : key_names_right) + { + if (strictness == ASTTableJoin::Strictness::Asof && name == key_names_right.back()) + { + LOG_DEBUG(log, "preventing removal of ASOF join column with name=" << name); + break; // this is the last column so break is OK + } + + if (!erased.count(name)) + stored_block.erase(stored_block.getPositionByName(name)); + erased.insert(name); + } + } +} + bool Join::insertFromBlock(const Block & block) { std::unique_lock lock(rwlock); - LOG_DEBUG(log, "joinBlock: " << block.dumpStructure()); if (empty()) throw Exception("Logical error: Join was not initialized", ErrorCodes::LOGICAL_ERROR); @@ -514,33 +554,9 @@ bool Join::insertFromBlock(const Block & block) blocks.push_back(block); Block * stored_block = &blocks.back(); - if (isRightOrFull(kind)) - { - /** Move the key columns to the beginning of the block. - * This is where NonJoinedBlockInputStream will expect. - */ - size_t key_num = 0; - for (const auto & name : key_names_right) - { - size_t pos = stored_block->getPositionByName(name); - ColumnWithTypeAndName col = stored_block->safeGetByPosition(pos); - stored_block->erase(pos); - stored_block->insert(key_num, std::move(col)); - ++key_num; - } - } - else - { - NameSet erased; /// HOTFIX: there could be duplicates in JOIN ON section + prepareBlockListStructure(*stored_block); - /// Remove the key columns from stored_block, as they are not needed. - for (const auto & name : key_names_right) - { - if (!erased.count(name)) - stored_block->erase(stored_block->getPositionByName(name)); - erased.insert(name); - } - } + LOG_DEBUG(log, "insertFromBlock stored_block=" << stored_block->dumpStructure()); size_t size = stored_block->columns(); @@ -579,7 +595,9 @@ public: AddedColumns(const Block & sample_block_with_columns_to_add, const Block & block_with_columns_to_add, - const Block & block, size_t num_columns_to_skip) + const Block & block, + const Block & blocklist_sample, + const ColumnsWithTypeAndName & extras) { size_t num_columns_to_add = sample_block_with_columns_to_add.columns(); @@ -593,8 +611,14 @@ public: /// Don't insert column if it's in left block or not explicitly required. if (!block.has(src_column.name) && block_with_columns_to_add.has(src_column.name)) - addColumn(src_column, num_columns_to_skip + i); + addColumn(src_column); } + + for (auto & extra : extras) + addColumn(extra); + + for (auto & tn : type_name) + right_indexes.push_back(blocklist_sample.getPositionByName(tn.second)); } size_t size() const { return columns.size(); } @@ -622,12 +646,11 @@ private: MutableColumns columns; std::vector right_indexes; - void addColumn(const ColumnWithTypeAndName & src_column, size_t idx) + void addColumn(const ColumnWithTypeAndName & src_column) { columns.push_back(src_column.column->cloneEmpty()); columns.back()->reserve(src_column.column->size()); type_name.emplace_back(src_column.type, src_column.name); - right_indexes.push_back(idx); } }; @@ -819,14 +842,12 @@ void Join::joinBlockImpl( /** For LEFT/INNER JOIN, the saved blocks do not contain keys. * For FULL/RIGHT JOIN, the saved blocks contain keys; * but they will not be used at this stage of joining (and will be in `AdderNonJoined`), and they need to be skipped. + * For ASOF, the last column is used as the ASOF column */ - size_t num_columns_to_skip = 0; - if constexpr (right_or_full) - num_columns_to_skip = keys_size; - - /// Add new columns to the block. - - AddedColumns added(sample_block_with_columns_to_add, block_with_columns_to_add, block, num_columns_to_skip); + ColumnsWithTypeAndName extras; + if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof) + extras.push_back(sample_block_with_keys.getByName(key_names_right.back())); + AddedColumns added(sample_block_with_columns_to_add, block_with_columns_to_add, block, blocklist_sample, extras); std::unique_ptr offsets_to_replicate; @@ -837,7 +858,6 @@ void Join::joinBlockImpl( block.insert(added.moveColumn(i)); /// Filter & insert missing rows - auto right_keys = requiredRightKeys(key_names_right, columns_added_by_join); if constexpr (STRICTNESS == ASTTableJoin::Strictness::Any || STRICTNESS == ASTTableJoin::Strictness::Asof) diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index f6ddaf87af0..01bd1335cbd 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -377,6 +377,9 @@ private: /// Block with key columns in the same order they appear in the right-side table. Block sample_block_with_keys; + /// Block as it would appear in the BlockList + Block blocklist_sample; + Poco::Logger * log; /// Limits for maximum map size. @@ -393,6 +396,11 @@ private: void init(Type type_); + /** Take an inserted block and discard everything that does not need to be stored + * Example, remove the keys as they come from the LHS block, but do keep the ASOF timestamps + */ + void prepareBlockListStructure(Block & stored_block); + /// Throw an exception if blocks have different types of key columns. void checkTypesOfKeys(const Block & block_left, const Names & key_names_left, const Block & block_right) const; diff --git a/dbms/tests/queries/0_stateless/00927_asof_join_correct_bt.reference b/dbms/tests/queries/0_stateless/00927_asof_join_correct_bt.reference new file mode 100644 index 00000000000..bb199d0159a --- /dev/null +++ b/dbms/tests/queries/0_stateless/00927_asof_join_correct_bt.reference @@ -0,0 +1,15 @@ +1 101 1 0 0 0 +1 102 2 2 102 1 +1 103 3 2 102 1 +1 104 4 4 104 1 +1 105 5 4 104 1 +1 101 1 0 0 0 +1 102 2 2 102 1 +1 103 3 2 102 1 +1 104 4 4 104 1 +1 105 5 4 104 1 +1 101 1 0 0 0 +1 102 2 2 102 1 +1 103 3 2 102 1 +1 104 4 4 104 1 +1 105 5 4 104 1 diff --git a/dbms/tests/queries/0_stateless/00927_asof_join_correct_bt.sql b/dbms/tests/queries/0_stateless/00927_asof_join_correct_bt.sql new file mode 100644 index 00000000000..a813f2fa410 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00927_asof_join_correct_bt.sql @@ -0,0 +1,25 @@ +USE test; + +DROP TABLE IF EXISTS A; +DROP TABLE IF EXISTS B; + +CREATE TABLE A(k UInt32, t UInt32, a UInt64) ENGINE = MergeTree() ORDER BY (k, t); +INSERT INTO A(k,t,a) VALUES (1,101,1),(1,102,2),(1,103,3),(1,104,4),(1,105,5); + +CREATE TABLE B(k UInt32, t UInt32, b UInt64) ENGINE = MergeTree() ORDER BY (k, t); +INSERT INTO B(k,t,b) VALUES (1,102,2), (1,104,4); +SELECT A.k, A.t, A.a, B.b, B.t, B.k FROM A ASOF LEFT JOIN B USING(k,t) ORDER BY (A.k, A.t); +DROP TABLE B; + + +CREATE TABLE B(t UInt32, k UInt32, b UInt64) ENGINE = MergeTree() ORDER BY (k, t); +INSERT INTO B(k,t,b) VALUES (1,102,2), (1,104,4); +SELECT A.k, A.t, A.a, B.b, B.t, B.k FROM A ASOF LEFT JOIN B USING(k,t) ORDER BY (A.k, A.t); +DROP TABLE B; + +CREATE TABLE B(k UInt32, b UInt64, t UInt32) ENGINE = MergeTree() ORDER BY (k, t); +INSERT INTO B(k,t,b) VALUES (1,102,2), (1,104,4); +SELECT A.k, A.t, A.a, B.b, B.t, B.k FROM A ASOF LEFT JOIN B USING(k,t) ORDER BY (A.k, A.t); +DROP TABLE B; + +DROP TABLE A; diff --git a/dbms/tests/queries/0_stateless/00927_asof_join_noninclusive.reference b/dbms/tests/queries/0_stateless/00927_asof_join_noninclusive.reference index b4022cef7da..5d19ee97374 100644 --- a/dbms/tests/queries/0_stateless/00927_asof_join_noninclusive.reference +++ b/dbms/tests/queries/0_stateless/00927_asof_join_noninclusive.reference @@ -1,29 +1,29 @@ 1 1970-01-01 00:00:01 1 0 0000-00-00 00:00:00 0 1 1970-01-01 00:00:02 2 2 1970-01-01 00:00:02 1 -1 1970-01-01 00:00:03 3 2 1970-01-01 00:00:03 1 +1 1970-01-01 00:00:03 3 2 1970-01-01 00:00:02 1 1 1970-01-01 00:00:04 4 4 1970-01-01 00:00:04 1 -1 1970-01-01 00:00:05 5 4 1970-01-01 00:00:05 1 +1 1970-01-01 00:00:05 5 4 1970-01-01 00:00:04 1 2 1970-01-01 00:00:01 1 0 0000-00-00 00:00:00 0 2 1970-01-01 00:00:02 2 0 0000-00-00 00:00:00 0 2 1970-01-01 00:00:03 3 3 1970-01-01 00:00:03 2 -2 1970-01-01 00:00:04 4 3 1970-01-01 00:00:04 2 -2 1970-01-01 00:00:05 5 3 1970-01-01 00:00:05 2 +2 1970-01-01 00:00:04 4 3 1970-01-01 00:00:03 2 +2 1970-01-01 00:00:05 5 3 1970-01-01 00:00:03 2 3 1970-01-01 00:00:01 1 0 0000-00-00 00:00:00 0 3 1970-01-01 00:00:02 2 0 0000-00-00 00:00:00 0 3 1970-01-01 00:00:03 3 0 0000-00-00 00:00:00 0 3 1970-01-01 00:00:04 4 0 0000-00-00 00:00:00 0 3 1970-01-01 00:00:05 5 0 0000-00-00 00:00:00 0 1 1970-01-01 00:00:02 2 2 1970-01-01 00:00:02 1 -1 1970-01-01 00:00:03 3 2 1970-01-01 00:00:03 1 +1 1970-01-01 00:00:03 3 2 1970-01-01 00:00:02 1 1 1970-01-01 00:00:04 4 4 1970-01-01 00:00:04 1 -1 1970-01-01 00:00:05 5 4 1970-01-01 00:00:05 1 +1 1970-01-01 00:00:05 5 4 1970-01-01 00:00:04 1 2 1970-01-01 00:00:03 3 3 1970-01-01 00:00:03 2 -2 1970-01-01 00:00:04 4 3 1970-01-01 00:00:04 2 -2 1970-01-01 00:00:05 5 3 1970-01-01 00:00:05 2 +2 1970-01-01 00:00:04 4 3 1970-01-01 00:00:03 2 +2 1970-01-01 00:00:05 5 3 1970-01-01 00:00:03 2 1 1970-01-01 00:00:02 2 2 1970-01-01 00:00:02 1 -1 1970-01-01 00:00:03 3 2 1970-01-01 00:00:03 1 +1 1970-01-01 00:00:03 3 2 1970-01-01 00:00:02 1 1 1970-01-01 00:00:04 4 4 1970-01-01 00:00:04 1 -1 1970-01-01 00:00:05 5 4 1970-01-01 00:00:05 1 +1 1970-01-01 00:00:05 5 4 1970-01-01 00:00:04 1 2 1970-01-01 00:00:03 3 3 1970-01-01 00:00:03 2 -2 1970-01-01 00:00:04 4 3 1970-01-01 00:00:04 2 -2 1970-01-01 00:00:05 5 3 1970-01-01 00:00:05 2 +2 1970-01-01 00:00:04 4 3 1970-01-01 00:00:03 2 +2 1970-01-01 00:00:05 5 3 1970-01-01 00:00:03 2