From 2dfbbe364b357699e12888093540e1b6431a8e7a Mon Sep 17 00:00:00 2001 From: vdimir Date: Tue, 17 Aug 2021 16:30:01 +0300 Subject: [PATCH] Do not use BlockInputStream for NonJoined --- src/Interpreters/HashJoin.cpp | 9 ++--- src/Interpreters/HashJoin.h | 3 +- src/Interpreters/IJoin.h | 4 +- src/Interpreters/JoinSwitcher.h | 38 +------------------ src/Interpreters/MergeJoin.cpp | 8 ++-- src/Interpreters/MergeJoin.h | 2 +- src/Interpreters/join_common.cpp | 18 ++++----- src/Interpreters/join_common.h | 11 ++---- .../Transforms/JoiningTransform.cpp | 8 ++-- src/Processors/Transforms/JoiningTransform.h | 5 +-- 10 files changed, 32 insertions(+), 74 deletions(-) diff --git a/src/Interpreters/HashJoin.cpp b/src/Interpreters/HashJoin.cpp index 4384072377d..6abaddd6270 100644 --- a/src/Interpreters/HashJoin.cpp +++ b/src/Interpreters/HashJoin.cpp @@ -21,7 +21,6 @@ #include -#include #include #include @@ -629,7 +628,7 @@ bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits) ConstNullMapPtr null_map{}; ColumnPtr null_map_holder = extractNestedColumnsAndNullMap(key_columns, null_map); - /// If RIGHT or FULL save blocks with nulls for NotJoinedInputStream + /// If RIGHT or FULL save blocks with nulls for NotJoinedBlocks UInt8 save_nullmap = 0; if (isRightOrFull(kind) && null_map) { @@ -1468,7 +1467,7 @@ struct AdderNonJoined /// Stream from not joined earlier rows of the right table. -class NotJoinedHash final : public NotJoinedInputStream::RightColumnsFiller +class NotJoinedHash final : public NotJoinedBlocks::RightColumnsFiller { public: NotJoinedHash(const HashJoin & parent_, UInt64 max_block_size_) @@ -1578,7 +1577,7 @@ private: }; -BlockInputStreamPtr HashJoin::createStreamWithNonJoinedRows(const Block & result_sample_block, UInt64 max_block_size) const +std::shared_ptr HashJoin::getNonJoinedBlocks(const Block & result_sample_block, UInt64 max_block_size) const { if (table_join->strictness() == ASTTableJoin::Strictness::Asof || table_join->strictness() == ASTTableJoin::Strictness::Semi || @@ -1589,7 +1588,7 @@ BlockInputStreamPtr HashJoin::createStreamWithNonJoinedRows(const Block & result size_t left_columns_count = result_sample_block.columns() - required_right_keys.columns() - sample_block_with_columns_to_add.columns(); auto non_joined = std::make_unique(*this, max_block_size); - return std::make_shared(std::move(non_joined), result_sample_block, left_columns_count, table_join->leftToRightKeyRemap()); + return std::make_shared(std::move(non_joined), result_sample_block, left_columns_count, table_join->leftToRightKeyRemap()); } void HashJoin::reuseJoinedData(const HashJoin & join) diff --git a/src/Interpreters/HashJoin.h b/src/Interpreters/HashJoin.h index f6e47b59d25..2e691f189c4 100644 --- a/src/Interpreters/HashJoin.h +++ b/src/Interpreters/HashJoin.h @@ -20,7 +20,6 @@ #include #include -#include #include @@ -164,7 +163,7 @@ public: * Use only after all calls to joinBlock was done. * left_sample_block is passed without account of 'use_nulls' setting (columns will be converted to Nullable inside). */ - BlockInputStreamPtr createStreamWithNonJoinedRows(const Block & result_sample_block, UInt64 max_block_size) const override; + std::shared_ptr getNonJoinedBlocks(const Block & result_sample_block, UInt64 max_block_size) const override; /// Number of keys in all built JOIN maps. size_t getTotalRowCount() const final; diff --git a/src/Interpreters/IJoin.h b/src/Interpreters/IJoin.h index 8fa85de4951..2215402e1d4 100644 --- a/src/Interpreters/IJoin.h +++ b/src/Interpreters/IJoin.h @@ -5,7 +5,6 @@ #include #include -#include namespace DB { @@ -15,6 +14,7 @@ struct ExtraBlock; using ExtraBlockPtr = std::shared_ptr; class TableJoin; +class NotJoinedBlocks; class IJoin { @@ -43,7 +43,7 @@ public: /// Different query plan is used for such joins. virtual bool isFilled() const { return false; } - virtual BlockInputStreamPtr createStreamWithNonJoinedRows(const Block &, UInt64) const { return {}; } + virtual std::shared_ptr getNonJoinedBlocks(const Block &, UInt64) const = 0; }; using JoinPtr = std::shared_ptr; diff --git a/src/Interpreters/JoinSwitcher.h b/src/Interpreters/JoinSwitcher.h index a89ac6d5d98..e750bc5eed0 100644 --- a/src/Interpreters/JoinSwitcher.h +++ b/src/Interpreters/JoinSwitcher.h @@ -56,9 +56,9 @@ public: return join->alwaysReturnsEmptySet(); } - BlockInputStreamPtr createStreamWithNonJoinedRows(const Block & block, UInt64 max_block_size) const override + std::shared_ptr getNonJoinedBlocks(const Block & block, UInt64 max_block_size) const override { - return join->createStreamWithNonJoinedRows(block, max_block_size); + return join->getNonJoinedBlocks(block, max_block_size); } private: @@ -74,38 +74,4 @@ private: void switchJoin(); }; - -/// Creates NonJoinedBlockInputStream on the first read. Allows to swap join algo before it. -class LazyNonJoinedBlockInputStream : public IBlockInputStream -{ -public: - LazyNonJoinedBlockInputStream(const IJoin & join_, const Block & block, UInt64 max_block_size_) - : join(join_) - , result_sample_block(block) - , max_block_size(max_block_size_) - {} - - String getName() const override { return "LazyNonMergeJoined"; } - Block getHeader() const override { return result_sample_block; } - -protected: - Block readImpl() override - { - if (!stream) - { - stream = join.createStreamWithNonJoinedRows(result_sample_block, max_block_size); - if (!stream) - return {}; - } - - return stream->read(); - } - -private: - BlockInputStreamPtr stream; - const IJoin & join; - Block result_sample_block; - UInt64 max_block_size; -}; - } diff --git a/src/Interpreters/MergeJoin.cpp b/src/Interpreters/MergeJoin.cpp index 611f1742fa4..0150bbe1d93 100644 --- a/src/Interpreters/MergeJoin.cpp +++ b/src/Interpreters/MergeJoin.cpp @@ -1028,7 +1028,7 @@ void MergeJoin::initRightTableWriter() } /// Stream from not joined earlier rows of the right table. -class NotJoinedMerge final : public NotJoinedInputStream::RightColumnsFiller +class NotJoinedMerge final : public NotJoinedBlocks::RightColumnsFiller { public: NotJoinedMerge(const MergeJoin & parent_, UInt64 max_block_size_) @@ -1089,15 +1089,15 @@ private: }; -BlockInputStreamPtr MergeJoin::createStreamWithNonJoinedRows(const Block & result_sample_block, UInt64 max_block_size) const +std::shared_ptr MergeJoin::getNonJoinedBlocks(const Block & result_sample_block, UInt64 max_block_size) const { if (table_join->strictness() == ASTTableJoin::Strictness::All && (is_right || is_full)) { size_t left_columns_count = result_sample_block.columns() - right_columns_to_add.columns(); auto non_joined = std::make_unique(*this, max_block_size); - return std::make_shared(std::move(non_joined), result_sample_block, left_columns_count, table_join->leftToRightKeyRemap()); + return std::make_shared(std::move(non_joined), result_sample_block, left_columns_count, table_join->leftToRightKeyRemap()); } - return {}; + return nullptr; } bool MergeJoin::needConditionJoinColumn() const diff --git a/src/Interpreters/MergeJoin.h b/src/Interpreters/MergeJoin.h index 89371d8b13b..9e765041846 100644 --- a/src/Interpreters/MergeJoin.h +++ b/src/Interpreters/MergeJoin.h @@ -35,7 +35,7 @@ public: /// Has to be called only after setTotals()/mergeRightBlocks() bool alwaysReturnsEmptySet() const override { return (is_right || is_inner) && min_max_right_blocks.empty(); } - BlockInputStreamPtr createStreamWithNonJoinedRows(const Block & result_sample_block, UInt64 max_block_size) const override; + std::shared_ptr getNonJoinedBlocks(const Block & result_sample_block, UInt64 max_block_size) const override; private: friend class NotJoinedMerge; diff --git a/src/Interpreters/join_common.cpp b/src/Interpreters/join_common.cpp index c640fea3a36..349ba56e74a 100644 --- a/src/Interpreters/join_common.cpp +++ b/src/Interpreters/join_common.cpp @@ -492,7 +492,7 @@ void splitAdditionalColumns(const Names & key_names, const Block & sample_block, } -NotJoinedInputStream::NotJoinedInputStream(std::unique_ptr filler_, +NotJoinedBlocks::NotJoinedBlocks(std::unique_ptr filler_, const Block & result_sample_block_, size_t left_columns_count, const LeftToRightKeyRemap & left_to_right_key_remap) @@ -537,7 +537,7 @@ NotJoinedInputStream::NotJoinedInputStream(std::unique_ptr f ErrorCodes::LOGICAL_ERROR); } -void NotJoinedInputStream::setRightIndex(size_t right_pos, size_t result_position) +void NotJoinedBlocks::setRightIndex(size_t right_pos, size_t result_position) { if (!column_indices_right.contains(right_pos)) { @@ -548,7 +548,7 @@ void NotJoinedInputStream::setRightIndex(size_t right_pos, size_t result_positio same_result_keys[result_position] = column_indices_right[right_pos]; } -void NotJoinedInputStream::extractColumnChanges(size_t right_pos, size_t result_pos) +void NotJoinedBlocks::extractColumnChanges(size_t right_pos, size_t result_pos) { auto src_props = getLowcardAndNullability(saved_block_sample.getByPosition(right_pos).column); auto dst_props = getLowcardAndNullability(result_sample_block.getByPosition(result_pos).column); @@ -560,7 +560,7 @@ void NotJoinedInputStream::extractColumnChanges(size_t right_pos, size_t result_ right_lowcard_changes.push_back({result_pos, dst_props.is_lowcard}); } -void NotJoinedInputStream::correctLowcardAndNullability(Block & block) +void NotJoinedBlocks::correctLowcardAndNullability(Block & block) { for (auto & [pos, added] : right_nullability_changes) { @@ -588,7 +588,7 @@ void NotJoinedInputStream::correctLowcardAndNullability(Block & block) } } -void NotJoinedInputStream::addLeftColumns(Block & block, size_t rows_added) const +void NotJoinedBlocks::addLeftColumns(Block & block, size_t rows_added) const { for (size_t pos : column_indices_left) { @@ -600,7 +600,7 @@ void NotJoinedInputStream::addLeftColumns(Block & block, size_t rows_added) cons } } -void NotJoinedInputStream::addRightColumns(Block & block, MutableColumns & columns_right) const +void NotJoinedBlocks::addRightColumns(Block & block, MutableColumns & columns_right) const { for (const auto & pr : column_indices_right) { @@ -610,7 +610,7 @@ void NotJoinedInputStream::addRightColumns(Block & block, MutableColumns & colum } } -void NotJoinedInputStream::copySameKeys(Block & block) const +void NotJoinedBlocks::copySameKeys(Block & block) const { for (const auto & pr : same_result_keys) { @@ -620,7 +620,7 @@ void NotJoinedInputStream::copySameKeys(Block & block) const } } -Block NotJoinedInputStream::readImpl() +Block NotJoinedBlocks::read() { Block right_block = filler->getEmptyBlock(); @@ -635,7 +635,7 @@ Block NotJoinedInputStream::readImpl() correctLowcardAndNullability(right_block); #ifndef NDEBUG - assertBlocksHaveEqualStructure(right_block, result_sample_block, getName()); + assertBlocksHaveEqualStructure(right_block, result_sample_block, "NotJoinedBlocks"); #endif return right_block; } diff --git a/src/Interpreters/join_common.h b/src/Interpreters/join_common.h index 32fa4a4ee9e..ec2e1d3bd50 100644 --- a/src/Interpreters/join_common.h +++ b/src/Interpreters/join_common.h @@ -5,7 +5,6 @@ #include #include #include -#include namespace DB { @@ -65,7 +64,7 @@ void changeLowCardinalityInplace(ColumnWithTypeAndName & column); } /// Creates result from right table data in RIGHT and FULL JOIN when keys are not present in left table. -class NotJoinedInputStream : public IBlockInputStream +class NotJoinedBlocks final { public: using LeftToRightKeyRemap = std::unordered_map; @@ -82,16 +81,12 @@ public: virtual ~RightColumnsFiller() = default; }; - NotJoinedInputStream(std::unique_ptr filler_, + NotJoinedBlocks(std::unique_ptr filler_, const Block & result_sample_block_, size_t left_columns_count, const LeftToRightKeyRemap & left_to_right_key_remap); - String getName() const override { return "NonJoined"; } - Block getHeader() const override { return result_sample_block; } - -protected: - Block readImpl() override final; + Block read(); private: void extractColumnChanges(size_t right_pos, size_t result_pos); diff --git a/src/Processors/Transforms/JoiningTransform.cpp b/src/Processors/Transforms/JoiningTransform.cpp index c1329d02fed..95a12e2291d 100644 --- a/src/Processors/Transforms/JoiningTransform.cpp +++ b/src/Processors/Transforms/JoiningTransform.cpp @@ -113,7 +113,7 @@ void JoiningTransform::work() } else { - if (!non_joined_stream) + if (!non_joined_blocks) { if (!finish_counter || !finish_counter->isLast()) { @@ -121,15 +121,15 @@ void JoiningTransform::work() return; } - non_joined_stream = join->createStreamWithNonJoinedRows(outputs.front().getHeader(), max_block_size); - if (!non_joined_stream) + non_joined_blocks = join->getNonJoinedBlocks(outputs.front().getHeader(), max_block_size); + if (!non_joined_blocks) { process_non_joined = false; return; } } - auto block = non_joined_stream->read(); + Block block = non_joined_blocks->read(); if (!block) { process_non_joined = false; diff --git a/src/Processors/Transforms/JoiningTransform.h b/src/Processors/Transforms/JoiningTransform.h index 98038946f3b..96c4032dabc 100644 --- a/src/Processors/Transforms/JoiningTransform.h +++ b/src/Processors/Transforms/JoiningTransform.h @@ -8,8 +8,7 @@ namespace DB class IJoin; using JoinPtr = std::shared_ptr; -class IBlockInputStream; -using BlockInputStreamPtr = std::shared_ptr; +class NotJoinedBlocks; /// Join rows to chunk form left table. /// This transform usually has two input ports and one output. @@ -76,7 +75,7 @@ private: ExtraBlockPtr not_processed; FinishCounterPtr finish_counter; - BlockInputStreamPtr non_joined_stream; + std::shared_ptr non_joined_blocks; size_t max_block_size; Block readExecute(Chunk & chunk);