From 4a7393cfc6ffa403b0183fd66bb40886e7b7a680 Mon Sep 17 00:00:00 2001 From: Martijn Bakker Date: Sun, 31 Mar 2019 20:03:57 +0100 Subject: [PATCH 1/6] include the asof column in the block stored. overall not working yet but at least doesnt crash anymore --- dbms/src/Interpreters/Join.cpp | 39 ++++++++++++++++--- .../00927_asof_join_correct_bt.reference | 5 +++ .../00927_asof_join_correct_bt.sql | 15 +++++++ 3 files changed, 53 insertions(+), 6 deletions(-) create mode 100644 dbms/tests/queries/0_stateless/00927_asof_join_correct_bt.reference create mode 100644 dbms/tests/queries/0_stateless/00927_asof_join_correct_bt.sql diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index 2d959be98f2..cb1be05015a 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -534,8 +534,15 @@ bool Join::insertFromBlock(const Block & block) NameSet erased; /// HOTFIX: there could be duplicates in JOIN ON section /// Remove the key columns from stored_block, as they are not needed. + /// However, do not erase the ASOF column if this is an asof join for (const auto & name : key_names_right) { + if (strictness == ASTTableJoin::Strictness::Asof && name == key_names_right.back()) + { + LOG_DEBUG(log, "preventing removal of ASOF join column with name=" << name); + break; // this is the last column so break is OK + } + if (!erased.count(name)) stored_block->erase(stored_block->getPositionByName(name)); erased.insert(name); @@ -606,8 +613,13 @@ public: void appendFromBlock(const Block & block, size_t row_num) { - for (size_t j = 0; j < right_indexes.size(); ++j) + std::cout << "appendFromBlock block=" << block.dumpStructure() << " row_num=" << row_num << std::endl; + + + for (size_t j = 0; j < right_indexes.size(); ++j) { + std::cout << "right_index=" << right_indexes[j] << std::endl; columns[j]->insertFrom(*block.getByPosition(right_indexes[j]).column, row_num); + } } @@ -616,19 +628,25 @@ public: for (size_t j = 0; j < right_indexes.size(); ++j) columns[j]->insertDefault(); } + void addExtraColumn(const ColumnWithTypeAndName & src_column) + { + addColumn(src_column, columns.size()); + } + private: - TypeAndNames type_name; - MutableColumns columns; - std::vector right_indexes; - void addColumn(const ColumnWithTypeAndName & src_column, size_t idx) { + std::cout << "adding column from src=" << src_column.dumpStructure() << " idx=" << idx << std::endl; columns.push_back(src_column.column->cloneEmpty()); columns.back()->reserve(src_column.column->size()); type_name.emplace_back(src_column.type, src_column.name); right_indexes.push_back(idx); } + + TypeAndNames type_name; + MutableColumns columns; + std::vector right_indexes; }; template @@ -825,17 +843,26 @@ void Join::joinBlockImpl( num_columns_to_skip = keys_size; /// Add new columns to the block. - AddedColumns added(sample_block_with_columns_to_add, block_with_columns_to_add, block, num_columns_to_skip); + if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof) + { + // Add the last key column which is the ASOF key + added.addExtraColumn(sample_block_with_keys.safeGetByPosition(sample_block_with_keys.columns()-1)); + } + std::unique_ptr offsets_to_replicate; IColumn::Filter row_filter = switchJoinRightColumns( type, maps_, block.rows(), key_columns, key_sizes, added, null_map, offsets_to_replicate); + LOG_DEBUG(log, "joinBlockImpl - switchJoinRightColumns"); + for (size_t i = 0; i < added.size(); ++i) block.insert(added.moveColumn(i)); + LOG_DEBUG(log, "joinBlockImpl - after insert: " << block.dumpStructure()); + /// Filter & insert missing rows auto right_keys = requiredRightKeys(key_names_right, columns_added_by_join); diff --git a/dbms/tests/queries/0_stateless/00927_asof_join_correct_bt.reference b/dbms/tests/queries/0_stateless/00927_asof_join_correct_bt.reference new file mode 100644 index 00000000000..2e22280c8da --- /dev/null +++ b/dbms/tests/queries/0_stateless/00927_asof_join_correct_bt.reference @@ -0,0 +1,5 @@ +1 101 1 0 0 0 +1 102 2 2 102 1 +1 103 3 2 102 1 +1 104 4 4 104 1 +1 105 5 4 104 1 diff --git a/dbms/tests/queries/0_stateless/00927_asof_join_correct_bt.sql b/dbms/tests/queries/0_stateless/00927_asof_join_correct_bt.sql new file mode 100644 index 00000000000..27860bb5d05 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00927_asof_join_correct_bt.sql @@ -0,0 +1,15 @@ +USE test; + +DROP TABLE IF EXISTS A; +DROP TABLE IF EXISTS B; + +CREATE TABLE A(k UInt32, t UInt32, a UInt64) ENGINE = MergeTree() ORDER BY (k, t); +INSERT INTO A(k,t,a) VALUES (1,101,1),(1,102,2),(1,103,3),(1,104,4),(1,105,5); + +CREATE TABLE B(k UInt32, t UInt32, b UInt64) ENGINE = MergeTree() ORDER BY (k, t); +INSERT INTO B(k,t,b) VALUES (1,102,2), (1,104,4); + +SELECT A.k, A.t, A.a, B.b, B.t, B.k FROM A ASOF LEFT JOIN B USING(k,t) ORDER BY (A.k, A.t); + +DROP TABLE A; +DROP TABLE B; From 7ea03f6fa78d122a1083a0a8f0700b098c52d2af Mon Sep 17 00:00:00 2001 From: Martijn Bakker Date: Sun, 31 Mar 2019 22:14:43 +0100 Subject: [PATCH 2/6] appears to work and get the correct timestamps --- dbms/src/Interpreters/Join.cpp | 79 +++++++++++++++++++++------------- 1 file changed, 49 insertions(+), 30 deletions(-) diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index cb1be05015a..a0c667da175 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -420,6 +420,8 @@ namespace if (emplace_result.isInserted()) time_series_map = new (time_series_map) typename Map::mapped_type(join.getAsofType()); + + std::cout << "inserting rhs block=" << stored_block->dumpStructure() << std::endl; time_series_map->insert(asof_column, stored_block, i, pool); } }; @@ -485,7 +487,6 @@ namespace bool Join::insertFromBlock(const Block & block) { std::unique_lock lock(rwlock); - LOG_DEBUG(log, "joinBlock: " << block.dumpStructure()); if (empty()) throw Exception("Logical error: Join was not initialized", ErrorCodes::LOGICAL_ERROR); @@ -549,6 +550,8 @@ bool Join::insertFromBlock(const Block & block) } } + LOG_DEBUG(log, "insertFromBlock stored_block=" << stored_block->dumpStructure()); + size_t size = stored_block->columns(); /// Rare case, when joined columns are constant. To avoid code bloat, simply materialize them. @@ -586,7 +589,7 @@ public: AddedColumns(const Block & sample_block_with_columns_to_add, const Block & block_with_columns_to_add, - const Block & block, size_t num_columns_to_skip) + const Block & block) { size_t num_columns_to_add = sample_block_with_columns_to_add.columns(); @@ -599,11 +602,32 @@ public: const ColumnWithTypeAndName & src_column = sample_block_with_columns_to_add.safeGetByPosition(i); /// Don't insert column if it's in left block or not explicitly required. - if (!block.has(src_column.name) && block_with_columns_to_add.has(src_column.name)) - addColumn(src_column, num_columns_to_skip + i); + if (!block.has(src_column.name) && block_with_columns_to_add.has(src_column.name)) { + addColumn(src_column); + } } } + void addColumn(const ColumnWithTypeAndName & src_column) + { + std::cout << "adding column from src=" << src_column.dumpStructure() << std::endl; + columns.push_back(src_column.column->cloneEmpty()); + columns.back()->reserve(src_column.column->size()); + type_name.emplace_back(src_column.type, src_column.name); + } + + void fillRightIndices(const Block& rhs_block) + { + for(auto& tn : type_name) { + right_indexes.push_back(rhs_block.getPositionByName(tn.second)); + } + + for(unsigned i = 0; i < right_indexes.size(); i++) { + std::cout << "ri i=" << i << " ri=" << right_indexes[i] << std::endl; + } + loaded = true; + } + size_t size() const { return columns.size(); } ColumnWithTypeAndName moveColumn(size_t i) @@ -613,39 +637,29 @@ public: void appendFromBlock(const Block & block, size_t row_num) { - std::cout << "appendFromBlock block=" << block.dumpStructure() << " row_num=" << row_num << std::endl; + if(!loaded) + return; + std::cout << "appendFromBlock block=" << block.dumpStructure() << " row_num=" << row_num << " right_indexes=["; - for (size_t j = 0; j < right_indexes.size(); ++j) { - std::cout << "right_index=" << right_indexes[j] << std::endl; + for (size_t j = 0; j < columns.size(); ++j) { + std::cout << right_indexes[j] << " "; columns[j]->insertFrom(*block.getByPosition(right_indexes[j]).column, row_num); } - } + std::cout << "]" << std::endl; + } void appendDefaultRow() { - for (size_t j = 0; j < right_indexes.size(); ++j) + for (size_t j = 0; j < columns.size(); ++j) columns[j]->insertDefault(); } - void addExtraColumn(const ColumnWithTypeAndName & src_column) - { - addColumn(src_column, columns.size()); - } - private: - void addColumn(const ColumnWithTypeAndName & src_column, size_t idx) - { - std::cout << "adding column from src=" << src_column.dumpStructure() << " idx=" << idx << std::endl; - columns.push_back(src_column.column->cloneEmpty()); - columns.back()->reserve(src_column.column->size()); - type_name.emplace_back(src_column.type, src_column.name); - right_indexes.push_back(idx); - } - TypeAndNames type_name; MutableColumns columns; + bool loaded = false; std::vector right_indexes; }; @@ -838,17 +852,22 @@ void Join::joinBlockImpl( * For FULL/RIGHT JOIN, the saved blocks contain keys; * but they will not be used at this stage of joining (and will be in `AdderNonJoined`), and they need to be skipped. */ - size_t num_columns_to_skip = 0; - if constexpr (right_or_full) - num_columns_to_skip = keys_size; +// size_t num_columns_to_skip = 0; +// if constexpr (right_or_full) +// num_columns_to_skip = keys_size; /// Add new columns to the block. - AddedColumns added(sample_block_with_columns_to_add, block_with_columns_to_add, block, num_columns_to_skip); + LOG_DEBUG(log, "joinBlockImpl - sample_block_with_columns_to_add" << sample_block_with_columns_to_add.dumpStructure()); + + AddedColumns added(sample_block_with_columns_to_add, block_with_columns_to_add, block); if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof) - { - // Add the last key column which is the ASOF key - added.addExtraColumn(sample_block_with_keys.safeGetByPosition(sample_block_with_keys.columns()-1)); + added.addColumn(sample_block_with_keys.safeGetByPosition(sample_block_with_keys.columns()-1)); + + if(!blocks.empty()) { + added.fillRightIndices(*blocks.begin()); + } else { + LOG_DEBUG(log, "unable to fill right index of added columns"); } std::unique_ptr offsets_to_replicate; From 02320de49c2357f99d8682743024e2604756aa1a Mon Sep 17 00:00:00 2001 From: Martijn Bakker Date: Sun, 31 Mar 2019 22:22:58 +0100 Subject: [PATCH 3/6] fix up the timestamps to match the real timestamps --- .../00927_asof_join_noninclusive.reference | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/dbms/tests/queries/0_stateless/00927_asof_join_noninclusive.reference b/dbms/tests/queries/0_stateless/00927_asof_join_noninclusive.reference index b4022cef7da..5d19ee97374 100644 --- a/dbms/tests/queries/0_stateless/00927_asof_join_noninclusive.reference +++ b/dbms/tests/queries/0_stateless/00927_asof_join_noninclusive.reference @@ -1,29 +1,29 @@ 1 1970-01-01 00:00:01 1 0 0000-00-00 00:00:00 0 1 1970-01-01 00:00:02 2 2 1970-01-01 00:00:02 1 -1 1970-01-01 00:00:03 3 2 1970-01-01 00:00:03 1 +1 1970-01-01 00:00:03 3 2 1970-01-01 00:00:02 1 1 1970-01-01 00:00:04 4 4 1970-01-01 00:00:04 1 -1 1970-01-01 00:00:05 5 4 1970-01-01 00:00:05 1 +1 1970-01-01 00:00:05 5 4 1970-01-01 00:00:04 1 2 1970-01-01 00:00:01 1 0 0000-00-00 00:00:00 0 2 1970-01-01 00:00:02 2 0 0000-00-00 00:00:00 0 2 1970-01-01 00:00:03 3 3 1970-01-01 00:00:03 2 -2 1970-01-01 00:00:04 4 3 1970-01-01 00:00:04 2 -2 1970-01-01 00:00:05 5 3 1970-01-01 00:00:05 2 +2 1970-01-01 00:00:04 4 3 1970-01-01 00:00:03 2 +2 1970-01-01 00:00:05 5 3 1970-01-01 00:00:03 2 3 1970-01-01 00:00:01 1 0 0000-00-00 00:00:00 0 3 1970-01-01 00:00:02 2 0 0000-00-00 00:00:00 0 3 1970-01-01 00:00:03 3 0 0000-00-00 00:00:00 0 3 1970-01-01 00:00:04 4 0 0000-00-00 00:00:00 0 3 1970-01-01 00:00:05 5 0 0000-00-00 00:00:00 0 1 1970-01-01 00:00:02 2 2 1970-01-01 00:00:02 1 -1 1970-01-01 00:00:03 3 2 1970-01-01 00:00:03 1 +1 1970-01-01 00:00:03 3 2 1970-01-01 00:00:02 1 1 1970-01-01 00:00:04 4 4 1970-01-01 00:00:04 1 -1 1970-01-01 00:00:05 5 4 1970-01-01 00:00:05 1 +1 1970-01-01 00:00:05 5 4 1970-01-01 00:00:04 1 2 1970-01-01 00:00:03 3 3 1970-01-01 00:00:03 2 -2 1970-01-01 00:00:04 4 3 1970-01-01 00:00:04 2 -2 1970-01-01 00:00:05 5 3 1970-01-01 00:00:05 2 +2 1970-01-01 00:00:04 4 3 1970-01-01 00:00:03 2 +2 1970-01-01 00:00:05 5 3 1970-01-01 00:00:03 2 1 1970-01-01 00:00:02 2 2 1970-01-01 00:00:02 1 -1 1970-01-01 00:00:03 3 2 1970-01-01 00:00:03 1 +1 1970-01-01 00:00:03 3 2 1970-01-01 00:00:02 1 1 1970-01-01 00:00:04 4 4 1970-01-01 00:00:04 1 -1 1970-01-01 00:00:05 5 4 1970-01-01 00:00:05 1 +1 1970-01-01 00:00:05 5 4 1970-01-01 00:00:04 1 2 1970-01-01 00:00:03 3 3 1970-01-01 00:00:03 2 -2 1970-01-01 00:00:04 4 3 1970-01-01 00:00:04 2 -2 1970-01-01 00:00:05 5 3 1970-01-01 00:00:05 2 +2 1970-01-01 00:00:04 4 3 1970-01-01 00:00:03 2 +2 1970-01-01 00:00:05 5 3 1970-01-01 00:00:03 2 From 27776ca929c3b529dcb505aff0d00a1a712a4771 Mon Sep 17 00:00:00 2001 From: Martijn Bakker Date: Sun, 31 Mar 2019 22:56:37 +0100 Subject: [PATCH 4/6] fix up wrong assumption that the sample_block_with_keys has same ordering as key_names_right --- dbms/src/Interpreters/Join.cpp | 8 +++++++- .../00927_asof_join_correct_bt.reference | 10 ++++++++++ .../0_stateless/00927_asof_join_correct_bt.sql | 14 ++++++++++++-- 3 files changed, 29 insertions(+), 3 deletions(-) diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index a0c667da175..110cce8e7da 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -330,6 +330,7 @@ void Join::setSampleBlock(const Block & block) sample_block_with_columns_to_add = materializeBlock(block); + LOG_DEBUG(log, "setSampleBlock sample_block_with_columns_to_add " << sample_block_with_columns_to_add.dumpStructure()); /// Move from `sample_block_with_columns_to_add` key columns to `sample_block_with_keys`, keeping the order. size_t pos = 0; @@ -361,6 +362,9 @@ void Join::setSampleBlock(const Block & block) if (use_nulls && isLeftOrFull(kind)) for (size_t i = 0; i < num_columns_to_add; ++i) convertColumnToNullable(sample_block_with_columns_to_add.getByPosition(i)); + + LOG_DEBUG(log, "setSampleBlock sample_block_with_keys " << sample_block_with_keys.dumpStructure()); + LOG_DEBUG(log, "setSampleBlock sample_block_with_columns_to_add " << sample_block_with_columns_to_add.dumpStructure()); } namespace @@ -618,6 +622,7 @@ public: void fillRightIndices(const Block& rhs_block) { + std::cout << "rhs_block=" << rhs_block.dumpStructure() << std::endl; for(auto& tn : type_name) { right_indexes.push_back(rhs_block.getPositionByName(tn.second)); } @@ -861,8 +866,9 @@ void Join::joinBlockImpl( AddedColumns added(sample_block_with_columns_to_add, block_with_columns_to_add, block); + // the last column in the key names is the asof column if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof) - added.addColumn(sample_block_with_keys.safeGetByPosition(sample_block_with_keys.columns()-1)); + added.addColumn(sample_block_with_keys.getByName(key_names_right.back())); if(!blocks.empty()) { added.fillRightIndices(*blocks.begin()); diff --git a/dbms/tests/queries/0_stateless/00927_asof_join_correct_bt.reference b/dbms/tests/queries/0_stateless/00927_asof_join_correct_bt.reference index 2e22280c8da..bb199d0159a 100644 --- a/dbms/tests/queries/0_stateless/00927_asof_join_correct_bt.reference +++ b/dbms/tests/queries/0_stateless/00927_asof_join_correct_bt.reference @@ -3,3 +3,13 @@ 1 103 3 2 102 1 1 104 4 4 104 1 1 105 5 4 104 1 +1 101 1 0 0 0 +1 102 2 2 102 1 +1 103 3 2 102 1 +1 104 4 4 104 1 +1 105 5 4 104 1 +1 101 1 0 0 0 +1 102 2 2 102 1 +1 103 3 2 102 1 +1 104 4 4 104 1 +1 105 5 4 104 1 diff --git a/dbms/tests/queries/0_stateless/00927_asof_join_correct_bt.sql b/dbms/tests/queries/0_stateless/00927_asof_join_correct_bt.sql index 27860bb5d05..a813f2fa410 100644 --- a/dbms/tests/queries/0_stateless/00927_asof_join_correct_bt.sql +++ b/dbms/tests/queries/0_stateless/00927_asof_join_correct_bt.sql @@ -8,8 +8,18 @@ INSERT INTO A(k,t,a) VALUES (1,101,1),(1,102,2),(1,103,3),(1,104,4),(1,105,5); CREATE TABLE B(k UInt32, t UInt32, b UInt64) ENGINE = MergeTree() ORDER BY (k, t); INSERT INTO B(k,t,b) VALUES (1,102,2), (1,104,4); - SELECT A.k, A.t, A.a, B.b, B.t, B.k FROM A ASOF LEFT JOIN B USING(k,t) ORDER BY (A.k, A.t); +DROP TABLE B; + + +CREATE TABLE B(t UInt32, k UInt32, b UInt64) ENGINE = MergeTree() ORDER BY (k, t); +INSERT INTO B(k,t,b) VALUES (1,102,2), (1,104,4); +SELECT A.k, A.t, A.a, B.b, B.t, B.k FROM A ASOF LEFT JOIN B USING(k,t) ORDER BY (A.k, A.t); +DROP TABLE B; + +CREATE TABLE B(k UInt32, b UInt64, t UInt32) ENGINE = MergeTree() ORDER BY (k, t); +INSERT INTO B(k,t,b) VALUES (1,102,2), (1,104,4); +SELECT A.k, A.t, A.a, B.b, B.t, B.k FROM A ASOF LEFT JOIN B USING(k,t) ORDER BY (A.k, A.t); +DROP TABLE B; DROP TABLE A; -DROP TABLE B; From a64b8afc7e7511430e603ef653dd80b3a471517e Mon Sep 17 00:00:00 2001 From: Martijn Bakker Date: Mon, 1 Apr 2019 00:09:00 +0100 Subject: [PATCH 5/6] cleanup --- dbms/src/Interpreters/Join.cpp | 162 +++++++++++++-------------------- dbms/src/Interpreters/Join.h | 4 + 2 files changed, 69 insertions(+), 97 deletions(-) diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index 110cce8e7da..8a17fd8e22c 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -330,7 +330,9 @@ void Join::setSampleBlock(const Block & block) sample_block_with_columns_to_add = materializeBlock(block); - LOG_DEBUG(log, "setSampleBlock sample_block_with_columns_to_add " << sample_block_with_columns_to_add.dumpStructure()); + + blocklist_sample = Block(block.getColumnsWithTypeAndName()); + prepareBlockListStructure(blocklist_sample); /// Move from `sample_block_with_columns_to_add` key columns to `sample_block_with_keys`, keeping the order. size_t pos = 0; @@ -362,9 +364,6 @@ void Join::setSampleBlock(const Block & block) if (use_nulls && isLeftOrFull(kind)) for (size_t i = 0; i < num_columns_to_add; ++i) convertColumnToNullable(sample_block_with_columns_to_add.getByPosition(i)); - - LOG_DEBUG(log, "setSampleBlock sample_block_with_keys " << sample_block_with_keys.dumpStructure()); - LOG_DEBUG(log, "setSampleBlock sample_block_with_columns_to_add " << sample_block_with_columns_to_add.dumpStructure()); } namespace @@ -424,8 +423,6 @@ namespace if (emplace_result.isInserted()) time_series_map = new (time_series_map) typename Map::mapped_type(join.getAsofType()); - - std::cout << "inserting rhs block=" << stored_block->dumpStructure() << std::endl; time_series_map->insert(asof_column, stored_block, i, pool); } }; @@ -488,6 +485,44 @@ namespace } } +void Join::prepareBlockListStructure(Block& stored_block) +{ + if (isRightOrFull(kind)) + { + /** Move the key columns to the beginning of the block. + * This is where NonJoinedBlockInputStream will expect. + */ + size_t key_num = 0; + for (const auto & name : key_names_right) + { + size_t pos = stored_block.getPositionByName(name); + ColumnWithTypeAndName col = stored_block.safeGetByPosition(pos); + stored_block.erase(pos); + stored_block.insert(key_num, std::move(col)); + ++key_num; + } + } + else + { + NameSet erased; /// HOTFIX: there could be duplicates in JOIN ON section + + /// Remove the key columns from stored_block, as they are not needed. + /// However, do not erase the ASOF column if this is an asof join + for (const auto & name : key_names_right) + { + if (strictness == ASTTableJoin::Strictness::Asof && name == key_names_right.back()) + { + LOG_DEBUG(log, "preventing removal of ASOF join column with name=" << name); + break; // this is the last column so break is OK + } + + if (!erased.count(name)) + stored_block.erase(stored_block.getPositionByName(name)); + erased.insert(name); + } + } +} + bool Join::insertFromBlock(const Block & block) { std::unique_lock lock(rwlock); @@ -519,40 +554,7 @@ bool Join::insertFromBlock(const Block & block) blocks.push_back(block); Block * stored_block = &blocks.back(); - if (isRightOrFull(kind)) - { - /** Move the key columns to the beginning of the block. - * This is where NonJoinedBlockInputStream will expect. - */ - size_t key_num = 0; - for (const auto & name : key_names_right) - { - size_t pos = stored_block->getPositionByName(name); - ColumnWithTypeAndName col = stored_block->safeGetByPosition(pos); - stored_block->erase(pos); - stored_block->insert(key_num, std::move(col)); - ++key_num; - } - } - else - { - NameSet erased; /// HOTFIX: there could be duplicates in JOIN ON section - - /// Remove the key columns from stored_block, as they are not needed. - /// However, do not erase the ASOF column if this is an asof join - for (const auto & name : key_names_right) - { - if (strictness == ASTTableJoin::Strictness::Asof && name == key_names_right.back()) - { - LOG_DEBUG(log, "preventing removal of ASOF join column with name=" << name); - break; // this is the last column so break is OK - } - - if (!erased.count(name)) - stored_block->erase(stored_block->getPositionByName(name)); - erased.insert(name); - } - } + prepareBlockListStructure(*stored_block); LOG_DEBUG(log, "insertFromBlock stored_block=" << stored_block->dumpStructure()); @@ -593,7 +595,9 @@ public: AddedColumns(const Block & sample_block_with_columns_to_add, const Block & block_with_columns_to_add, - const Block & block) + const Block & block, + const Block & blocklist_sample, + const ColumnsWithTypeAndName& extras) { size_t num_columns_to_add = sample_block_with_columns_to_add.columns(); @@ -606,31 +610,15 @@ public: const ColumnWithTypeAndName & src_column = sample_block_with_columns_to_add.safeGetByPosition(i); /// Don't insert column if it's in left block or not explicitly required. - if (!block.has(src_column.name) && block_with_columns_to_add.has(src_column.name)) { + if (!block.has(src_column.name) && block_with_columns_to_add.has(src_column.name)) addColumn(src_column); - } - } - } - - void addColumn(const ColumnWithTypeAndName & src_column) - { - std::cout << "adding column from src=" << src_column.dumpStructure() << std::endl; - columns.push_back(src_column.column->cloneEmpty()); - columns.back()->reserve(src_column.column->size()); - type_name.emplace_back(src_column.type, src_column.name); - } - - void fillRightIndices(const Block& rhs_block) - { - std::cout << "rhs_block=" << rhs_block.dumpStructure() << std::endl; - for(auto& tn : type_name) { - right_indexes.push_back(rhs_block.getPositionByName(tn.second)); } - for(unsigned i = 0; i < right_indexes.size(); i++) { - std::cout << "ri i=" << i << " ri=" << right_indexes[i] << std::endl; - } - loaded = true; + for (auto& extra : extras) + addColumn(extra); + + for (auto& tn : type_name) + right_indexes.push_back(blocklist_sample.getPositionByName(tn.second)); } size_t size() const { return columns.size(); } @@ -642,30 +630,28 @@ public: void appendFromBlock(const Block & block, size_t row_num) { - if(!loaded) - return; - - std::cout << "appendFromBlock block=" << block.dumpStructure() << " row_num=" << row_num << " right_indexes=["; - - for (size_t j = 0; j < columns.size(); ++j) { - std::cout << right_indexes[j] << " "; + for (size_t j = 0; j < right_indexes.size(); ++j) columns[j]->insertFrom(*block.getByPosition(right_indexes[j]).column, row_num); - } - - std::cout << "]" << std::endl; } + void appendDefaultRow() { - for (size_t j = 0; j < columns.size(); ++j) + for (size_t j = 0; j < right_indexes.size(); ++j) columns[j]->insertDefault(); } private: TypeAndNames type_name; MutableColumns columns; - bool loaded = false; std::vector right_indexes; + + void addColumn(const ColumnWithTypeAndName & src_column) + { + columns.push_back(src_column.column->cloneEmpty()); + columns.back()->reserve(src_column.column->size()); + type_name.emplace_back(src_column.type, src_column.name); + } }; template @@ -856,40 +842,22 @@ void Join::joinBlockImpl( /** For LEFT/INNER JOIN, the saved blocks do not contain keys. * For FULL/RIGHT JOIN, the saved blocks contain keys; * but they will not be used at this stage of joining (and will be in `AdderNonJoined`), and they need to be skipped. + * For ASOF, the last column is used as the ASOF column */ -// size_t num_columns_to_skip = 0; -// if constexpr (right_or_full) -// num_columns_to_skip = keys_size; - - /// Add new columns to the block. - LOG_DEBUG(log, "joinBlockImpl - sample_block_with_columns_to_add" << sample_block_with_columns_to_add.dumpStructure()); - - AddedColumns added(sample_block_with_columns_to_add, block_with_columns_to_add, block); - - // the last column in the key names is the asof column + ColumnsWithTypeAndName extras; if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof) - added.addColumn(sample_block_with_keys.getByName(key_names_right.back())); - - if(!blocks.empty()) { - added.fillRightIndices(*blocks.begin()); - } else { - LOG_DEBUG(log, "unable to fill right index of added columns"); - } + extras.push_back(sample_block_with_keys.getByName(key_names_right.back())); + AddedColumns added(sample_block_with_columns_to_add, block_with_columns_to_add, block, blocklist_sample, extras); std::unique_ptr offsets_to_replicate; IColumn::Filter row_filter = switchJoinRightColumns( type, maps_, block.rows(), key_columns, key_sizes, added, null_map, offsets_to_replicate); - LOG_DEBUG(log, "joinBlockImpl - switchJoinRightColumns"); - for (size_t i = 0; i < added.size(); ++i) block.insert(added.moveColumn(i)); - LOG_DEBUG(log, "joinBlockImpl - after insert: " << block.dumpStructure()); - /// Filter & insert missing rows - auto right_keys = requiredRightKeys(key_names_right, columns_added_by_join); if constexpr (STRICTNESS == ASTTableJoin::Strictness::Any || STRICTNESS == ASTTableJoin::Strictness::Asof) diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index f6ddaf87af0..77a2abacb5a 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -377,6 +377,10 @@ private: /// Block with key columns in the same order they appear in the right-side table. Block sample_block_with_keys; + /// Block as it would appear in the BlockList + void prepareBlockListStructure(Block& stored_block); + Block blocklist_sample; + Poco::Logger * log; /// Limits for maximum map size. From d4ec3bbf70b6e4229bbf0574e54e48870dceb4a6 Mon Sep 17 00:00:00 2001 From: Martijn Bakker Date: Mon, 1 Apr 2019 11:35:37 +0100 Subject: [PATCH 6/6] fix style --- dbms/src/Interpreters/Join.cpp | 8 ++++---- dbms/src/Interpreters/Join.h | 6 +++++- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index 8a17fd8e22c..f19781ca380 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -485,7 +485,7 @@ namespace } } -void Join::prepareBlockListStructure(Block& stored_block) +void Join::prepareBlockListStructure(Block & stored_block) { if (isRightOrFull(kind)) { @@ -597,7 +597,7 @@ public: const Block & block_with_columns_to_add, const Block & block, const Block & blocklist_sample, - const ColumnsWithTypeAndName& extras) + const ColumnsWithTypeAndName & extras) { size_t num_columns_to_add = sample_block_with_columns_to_add.columns(); @@ -614,10 +614,10 @@ public: addColumn(src_column); } - for (auto& extra : extras) + for (auto & extra : extras) addColumn(extra); - for (auto& tn : type_name) + for (auto & tn : type_name) right_indexes.push_back(blocklist_sample.getPositionByName(tn.second)); } diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index 77a2abacb5a..01bd1335cbd 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -378,7 +378,6 @@ private: Block sample_block_with_keys; /// Block as it would appear in the BlockList - void prepareBlockListStructure(Block& stored_block); Block blocklist_sample; Poco::Logger * log; @@ -397,6 +396,11 @@ private: void init(Type type_); + /** Take an inserted block and discard everything that does not need to be stored + * Example, remove the keys as they come from the LHS block, but do keep the ASOF timestamps + */ + void prepareBlockListStructure(Block & stored_block); + /// Throw an exception if blocks have different types of key columns. void checkTypesOfKeys(const Block & block_left, const Names & key_names_left, const Block & block_right) const;