From 9d8178d04c6321ad301ee82ead42106a2bb928f9 Mon Sep 17 00:00:00 2001 From: vdimir Date: Mon, 9 Aug 2021 17:30:37 +0300 Subject: [PATCH] Refactor NotJoined pt2: rename classes, get rid of inheritance --- src/Interpreters/HashJoin.cpp | 21 +++++++--------- src/Interpreters/HashJoin.h | 2 +- src/Interpreters/MergeJoin.cpp | 24 +++++++------------ src/Interpreters/MergeJoin.h | 2 +- src/Interpreters/join_common.cpp | 37 +++++++++++++++++++++------- src/Interpreters/join_common.h | 41 +++++++++++++------------------- 6 files changed, 65 insertions(+), 62 deletions(-) diff --git a/src/Interpreters/HashJoin.cpp b/src/Interpreters/HashJoin.cpp index 4130431485e..4384072377d 100644 --- a/src/Interpreters/HashJoin.cpp +++ b/src/Interpreters/HashJoin.cpp @@ -629,7 +629,7 @@ bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits) ConstNullMapPtr null_map{}; ColumnPtr null_map_holder = extractNestedColumnsAndNullMap(key_columns, null_map); - /// If RIGHT or FULL save blocks with nulls for NonJoinedBlockInputStream + /// If RIGHT or FULL save blocks with nulls for NotJoinedInputStream UInt8 save_nullmap = 0; if (isRightOrFull(kind) && null_map) { @@ -1468,21 +1468,15 @@ struct AdderNonJoined /// Stream from not joined earlier rows of the right table. 
-class NonJoinedBlockInputStream final : public NotJoined +class NotJoinedHash final : public NotJoinedInputStream::RightColumnsFiller { public: - NonJoinedBlockInputStream( - const HashJoin & parent_, - const Block & result_sample_block_, - size_t left_columns_count, - UInt64 max_block_size_) - : NotJoined(parent_.savedBlockSample(), result_sample_block_, - left_columns_count, parent_.table_join->leftToRightKeyRemap()) - , parent(parent_) - , max_block_size(max_block_size_) + NotJoinedHash(const HashJoin & parent_, UInt64 max_block_size_) + : parent(parent_), max_block_size(max_block_size_) {} -protected: + Block getEmptyBlock() override { return parent.savedBlockSample().cloneEmpty(); } + size_t fillColumns(MutableColumns & columns_right) override { size_t rows_added = 0; @@ -1594,7 +1588,8 @@ BlockInputStreamPtr HashJoin::createStreamWithNonJoinedRows(const Block & result } size_t left_columns_count = result_sample_block.columns() - required_right_keys.columns() - sample_block_with_columns_to_add.columns(); - return std::make_shared(*this, result_sample_block, left_columns_count, max_block_size); + auto non_joined = std::make_unique(*this, max_block_size); + return std::make_shared(std::move(non_joined), result_sample_block, left_columns_count, table_join->leftToRightKeyRemap()); } void HashJoin::reuseJoinedData(const HashJoin & join) diff --git a/src/Interpreters/HashJoin.h b/src/Interpreters/HashJoin.h index 65e3f5dbabe..f6e47b59d25 100644 --- a/src/Interpreters/HashJoin.h +++ b/src/Interpreters/HashJoin.h @@ -337,7 +337,7 @@ public: bool isUsed(size_t off) const { return used_flags.getUsedSafe(off); } private: - friend class NonJoinedBlockInputStream; + friend class NotJoinedHash; friend class JoinSource; std::shared_ptr table_join; diff --git a/src/Interpreters/MergeJoin.cpp b/src/Interpreters/MergeJoin.cpp index 84d5a80cff5..611f1742fa4 100644 --- a/src/Interpreters/MergeJoin.cpp +++ b/src/Interpreters/MergeJoin.cpp @@ -1028,24 +1028,15 @@ void 
MergeJoin::initRightTableWriter() } /// Stream from not joined earlier rows of the right table. -class NonMergeJoinedBlockInputStream final : public NotJoined +class NotJoinedMerge final : public NotJoinedInputStream::RightColumnsFiller { public: - NonMergeJoinedBlockInputStream(const MergeJoin & parent_, - const Block & result_sample_block, - size_t left_columns_count, - UInt64 max_block_size_) - : NotJoined(parent_.modifyRightBlock(parent_.right_sample_block), - result_sample_block, - left_columns_count, - parent_.table_join->leftToRightKeyRemap()) - , parent(parent_) - , max_block_size(max_block_size_) + NotJoinedMerge(const MergeJoin & parent_, UInt64 max_block_size_) + : parent(parent_), max_block_size(max_block_size_) {} - String getName() const override { return "NonMergeJoined"; } + Block getEmptyBlock() override { return parent.modifyRightBlock(parent.right_sample_block).cloneEmpty(); } -protected: size_t fillColumns(MutableColumns & columns_right) override { const RowBitmaps & bitmaps = *parent.used_rows_bitmap; @@ -1100,9 +1091,12 @@ private: BlockInputStreamPtr MergeJoin::createStreamWithNonJoinedRows(const Block & result_sample_block, UInt64 max_block_size) const { - size_t left_columns_count = result_sample_block.columns() - right_columns_to_add.columns(); if (table_join->strictness() == ASTTableJoin::Strictness::All && (is_right || is_full)) - return std::make_shared(*this, result_sample_block, left_columns_count, max_block_size); + { + size_t left_columns_count = result_sample_block.columns() - right_columns_to_add.columns(); + auto non_joined = std::make_unique(*this, max_block_size); + return std::make_shared(std::move(non_joined), result_sample_block, left_columns_count, table_join->leftToRightKeyRemap()); + } return {}; } diff --git a/src/Interpreters/MergeJoin.h b/src/Interpreters/MergeJoin.h index 4aa26ead0a0..89371d8b13b 100644 --- a/src/Interpreters/MergeJoin.h +++ b/src/Interpreters/MergeJoin.h @@ -38,7 +38,7 @@ public: BlockInputStreamPtr 
createStreamWithNonJoinedRows(const Block & result_sample_block, UInt64 max_block_size) const override; private: - friend class NonMergeJoinedBlockInputStream; + friend class NotJoinedMerge; struct NotProcessed : public ExtraBlock { diff --git a/src/Interpreters/join_common.cpp b/src/Interpreters/join_common.cpp index 2c6a2731a0e..c640fea3a36 100644 --- a/src/Interpreters/join_common.cpp +++ b/src/Interpreters/join_common.cpp @@ -492,11 +492,12 @@ void splitAdditionalColumns(const Names & key_names, const Block & sample_block, } -NotJoined::NotJoined(const Block & saved_block_sample_, +NotJoinedInputStream::NotJoinedInputStream(std::unique_ptr filler_, const Block & result_sample_block_, size_t left_columns_count, const LeftToRightKeyRemap & left_to_right_key_remap) - : saved_block_sample(saved_block_sample_) + : filler(std::move(filler_)) + , saved_block_sample(filler->getEmptyBlock()) , result_sample_block(materializeBlock(result_sample_block_)) { for (size_t left_pos = 0; left_pos < left_columns_count; ++left_pos) @@ -536,7 +537,7 @@ NotJoined::NotJoined(const Block & saved_block_sample_, ErrorCodes::LOGICAL_ERROR); } -void NotJoined::setRightIndex(size_t right_pos, size_t result_position) +void NotJoinedInputStream::setRightIndex(size_t right_pos, size_t result_position) { if (!column_indices_right.contains(right_pos)) { @@ -547,7 +548,7 @@ void NotJoined::setRightIndex(size_t right_pos, size_t result_position) same_result_keys[result_position] = column_indices_right[right_pos]; } -void NotJoined::extractColumnChanges(size_t right_pos, size_t result_pos) +void NotJoinedInputStream::extractColumnChanges(size_t right_pos, size_t result_pos) { auto src_props = getLowcardAndNullability(saved_block_sample.getByPosition(right_pos).column); auto dst_props = getLowcardAndNullability(result_sample_block.getByPosition(result_pos).column); @@ -559,7 +560,7 @@ void NotJoined::extractColumnChanges(size_t right_pos, size_t result_pos) 
right_lowcard_changes.push_back({result_pos, dst_props.is_lowcard}); } -void NotJoined::correctLowcardAndNullability(Block & block) +void NotJoinedInputStream::correctLowcardAndNullability(Block & block) { for (auto & [pos, added] : right_nullability_changes) { @@ -587,7 +588,7 @@ void NotJoined::correctLowcardAndNullability(Block & block) } } -void NotJoined::addLeftColumns(Block & block, size_t rows_added) const +void NotJoinedInputStream::addLeftColumns(Block & block, size_t rows_added) const { for (size_t pos : column_indices_left) { @@ -599,7 +600,7 @@ void NotJoined::addLeftColumns(Block & block, size_t rows_added) const } } -void NotJoined::addRightColumns(Block & block, MutableColumns & columns_right) const +void NotJoinedInputStream::addRightColumns(Block & block, MutableColumns & columns_right) const { for (const auto & pr : column_indices_right) { @@ -609,7 +610,7 @@ void NotJoined::addRightColumns(Block & block, MutableColumns & columns_right) c } } -void NotJoined::copySameKeys(Block & block) const +void NotJoinedInputStream::copySameKeys(Block & block) const { for (const auto & pr : same_result_keys) { @@ -619,4 +620,24 @@ void NotJoined::copySameKeys(Block & block) const } } +Block NotJoinedInputStream::readImpl() +{ + Block right_block = filler->getEmptyBlock(); + MutableColumns columns_right = right_block.cloneEmptyColumns(); + size_t rows_added = filler->fillColumns(columns_right); + if (rows_added == 0) + return {}; + /// The helpers below index columns by position in the result header, so build the output from it rather than from the right block. + Block res = result_sample_block.cloneEmpty(); + addLeftColumns(res, rows_added); + addRightColumns(res, columns_right); + copySameKeys(res); + correctLowcardAndNullability(res); + +#ifndef NDEBUG + assertBlocksHaveEqualStructure(res, result_sample_block, getName()); +#endif + return res; +} + } diff --git a/src/Interpreters/join_common.h b/src/Interpreters/join_common.h index f61e110627b..32fa4a4ee9e 100644 --- a/src/Interpreters/join_common.h +++ b/src/Interpreters/join_common.h @@ -65,12 +65,24 @@ void 
changeLowCardinalityInplace(ColumnWithTypeAndName & column); } /// Creates result from right table data in RIGHT and FULL JOIN when keys are not present in left table. -class NotJoined : public IBlockInputStream +class NotJoinedInputStream : public IBlockInputStream { public: using LeftToRightKeyRemap = std::unordered_map; - NotJoined(const Block & saved_block_sample_, + /// Returns non joined columns from right part of join + class RightColumnsFiller + { + public: + /// Create empty block for right part + virtual Block getEmptyBlock() = 0; + /// Fill columns from right part of join with not joined rows + virtual size_t fillColumns(MutableColumns & columns_right) = 0; + + virtual ~RightColumnsFiller() = default; + }; + + NotJoinedInputStream(std::unique_ptr filler_, const Block & result_sample_block_, size_t left_columns_count, const LeftToRightKeyRemap & left_to_right_key_remap); @@ -79,28 +91,7 @@ public: Block getHeader() const override { return result_sample_block; } protected: - Block readImpl() override final - { - Block result = saved_block_sample.cloneEmpty(); - MutableColumns columns_right = result.mutateColumns(); - - size_t rows_added = fillColumns(columns_right); - if (rows_added == 0) - return {}; - - Block res = result_sample_block.cloneEmpty(); - addLeftColumns(res, rows_added); - addRightColumns(res, columns_right); - copySameKeys(res); - correctLowcardAndNullability(res); - -#ifndef NDEBUG - assertBlocksHaveEqualStructure(res, result_sample_block, getName()); -#endif - return res; - } - - virtual size_t fillColumns(MutableColumns & columns_right) = 0; + Block readImpl() override final; private: void extractColumnChanges(size_t right_pos, size_t result_pos); @@ -109,6 +100,8 @@ private: void addRightColumns(Block & block, MutableColumns & columns_right) const; void copySameKeys(Block & block) const; + std::unique_ptr filler; + /// Right block saved in Join Block saved_block_sample;