Do not use BlockInputStream for NonJoined

This commit is contained in:
vdimir 2021-08-17 16:30:01 +03:00
parent 9d8178d04c
commit 2dfbbe364b
No known key found for this signature in database
GPG Key ID: F57B3E10A21DBB31
10 changed files with 32 additions and 74 deletions

View File

@ -21,7 +21,6 @@
#include <Storages/StorageDictionary.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/materializeBlock.h>
#include <Core/ColumnNumbers.h>
@ -629,7 +628,7 @@ bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits)
ConstNullMapPtr null_map{};
ColumnPtr null_map_holder = extractNestedColumnsAndNullMap(key_columns, null_map);
/// If RIGHT or FULL save blocks with nulls for NotJoinedInputStream
/// If RIGHT or FULL save blocks with nulls for NotJoinedBlocks
UInt8 save_nullmap = 0;
if (isRightOrFull(kind) && null_map)
{
@ -1468,7 +1467,7 @@ struct AdderNonJoined
/// Stream from not joined earlier rows of the right table.
class NotJoinedHash final : public NotJoinedInputStream::RightColumnsFiller
class NotJoinedHash final : public NotJoinedBlocks::RightColumnsFiller
{
public:
NotJoinedHash(const HashJoin & parent_, UInt64 max_block_size_)
@ -1578,7 +1577,7 @@ private:
};
BlockInputStreamPtr HashJoin::createStreamWithNonJoinedRows(const Block & result_sample_block, UInt64 max_block_size) const
std::shared_ptr<NotJoinedBlocks> HashJoin::getNonJoinedBlocks(const Block & result_sample_block, UInt64 max_block_size) const
{
if (table_join->strictness() == ASTTableJoin::Strictness::Asof ||
table_join->strictness() == ASTTableJoin::Strictness::Semi ||
@ -1589,7 +1588,7 @@ BlockInputStreamPtr HashJoin::createStreamWithNonJoinedRows(const Block & result
size_t left_columns_count = result_sample_block.columns() - required_right_keys.columns() - sample_block_with_columns_to_add.columns();
auto non_joined = std::make_unique<NotJoinedHash>(*this, max_block_size);
return std::make_shared<NotJoinedInputStream>(std::move(non_joined), result_sample_block, left_columns_count, table_join->leftToRightKeyRemap());
return std::make_shared<NotJoinedBlocks>(std::move(non_joined), result_sample_block, left_columns_count, table_join->leftToRightKeyRemap());
}
void HashJoin::reuseJoinedData(const HashJoin & join)

View File

@ -20,7 +20,6 @@
#include <Columns/ColumnFixedString.h>
#include <DataStreams/SizeLimits.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <Core/Block.h>
@ -164,7 +163,7 @@ public:
* Use only after all calls to joinBlock was done.
* left_sample_block is passed without account of 'use_nulls' setting (columns will be converted to Nullable inside).
*/
BlockInputStreamPtr createStreamWithNonJoinedRows(const Block & result_sample_block, UInt64 max_block_size) const override;
std::shared_ptr<NotJoinedBlocks> getNonJoinedBlocks(const Block & result_sample_block, UInt64 max_block_size) const override;
/// Number of keys in all built JOIN maps.
size_t getTotalRowCount() const final;

View File

@ -5,7 +5,6 @@
#include <Core/Names.h>
#include <Columns/IColumn.h>
#include <DataStreams/IBlockStream_fwd.h>
namespace DB
{
@ -15,6 +14,7 @@ struct ExtraBlock;
using ExtraBlockPtr = std::shared_ptr<ExtraBlock>;
class TableJoin;
class NotJoinedBlocks;
class IJoin
{
@ -43,7 +43,7 @@ public:
/// Different query plan is used for such joins.
virtual bool isFilled() const { return false; }
virtual BlockInputStreamPtr createStreamWithNonJoinedRows(const Block &, UInt64) const { return {}; }
virtual std::shared_ptr<NotJoinedBlocks> getNonJoinedBlocks(const Block &, UInt64) const = 0;
};
using JoinPtr = std::shared_ptr<IJoin>;

View File

@ -56,9 +56,9 @@ public:
return join->alwaysReturnsEmptySet();
}
BlockInputStreamPtr createStreamWithNonJoinedRows(const Block & block, UInt64 max_block_size) const override
std::shared_ptr<NotJoinedBlocks> getNonJoinedBlocks(const Block & block, UInt64 max_block_size) const override
{
return join->createStreamWithNonJoinedRows(block, max_block_size);
return join->getNonJoinedBlocks(block, max_block_size);
}
private:
@ -74,38 +74,4 @@ private:
void switchJoin();
};
/// Creates NonJoinedBlockInputStream on the first read. Allows to swap join algo before it.
class LazyNonJoinedBlockInputStream : public IBlockInputStream
{
public:
LazyNonJoinedBlockInputStream(const IJoin & join_, const Block & block, UInt64 max_block_size_)
: join(join_)
, result_sample_block(block)
, max_block_size(max_block_size_)
{}
String getName() const override { return "LazyNonMergeJoined"; }
Block getHeader() const override { return result_sample_block; }
protected:
Block readImpl() override
{
if (!stream)
{
stream = join.createStreamWithNonJoinedRows(result_sample_block, max_block_size);
if (!stream)
return {};
}
return stream->read();
}
private:
BlockInputStreamPtr stream;
const IJoin & join;
Block result_sample_block;
UInt64 max_block_size;
};
}

View File

@ -1028,7 +1028,7 @@ void MergeJoin::initRightTableWriter()
}
/// Stream from not joined earlier rows of the right table.
class NotJoinedMerge final : public NotJoinedInputStream::RightColumnsFiller
class NotJoinedMerge final : public NotJoinedBlocks::RightColumnsFiller
{
public:
NotJoinedMerge(const MergeJoin & parent_, UInt64 max_block_size_)
@ -1089,15 +1089,15 @@ private:
};
BlockInputStreamPtr MergeJoin::createStreamWithNonJoinedRows(const Block & result_sample_block, UInt64 max_block_size) const
std::shared_ptr<NotJoinedBlocks> MergeJoin::getNonJoinedBlocks(const Block & result_sample_block, UInt64 max_block_size) const
{
if (table_join->strictness() == ASTTableJoin::Strictness::All && (is_right || is_full))
{
size_t left_columns_count = result_sample_block.columns() - right_columns_to_add.columns();
auto non_joined = std::make_unique<NotJoinedMerge>(*this, max_block_size);
return std::make_shared<NotJoinedInputStream>(std::move(non_joined), result_sample_block, left_columns_count, table_join->leftToRightKeyRemap());
return std::make_shared<NotJoinedBlocks>(std::move(non_joined), result_sample_block, left_columns_count, table_join->leftToRightKeyRemap());
}
return {};
return nullptr;
}
bool MergeJoin::needConditionJoinColumn() const

View File

@ -35,7 +35,7 @@ public:
/// Has to be called only after setTotals()/mergeRightBlocks()
bool alwaysReturnsEmptySet() const override { return (is_right || is_inner) && min_max_right_blocks.empty(); }
BlockInputStreamPtr createStreamWithNonJoinedRows(const Block & result_sample_block, UInt64 max_block_size) const override;
std::shared_ptr<NotJoinedBlocks> getNonJoinedBlocks(const Block & result_sample_block, UInt64 max_block_size) const override;
private:
friend class NotJoinedMerge;

View File

@ -492,7 +492,7 @@ void splitAdditionalColumns(const Names & key_names, const Block & sample_block,
}
NotJoinedInputStream::NotJoinedInputStream(std::unique_ptr<RightColumnsFiller> filler_,
NotJoinedBlocks::NotJoinedBlocks(std::unique_ptr<RightColumnsFiller> filler_,
const Block & result_sample_block_,
size_t left_columns_count,
const LeftToRightKeyRemap & left_to_right_key_remap)
@ -537,7 +537,7 @@ NotJoinedInputStream::NotJoinedInputStream(std::unique_ptr<RightColumnsFiller> f
ErrorCodes::LOGICAL_ERROR);
}
void NotJoinedInputStream::setRightIndex(size_t right_pos, size_t result_position)
void NotJoinedBlocks::setRightIndex(size_t right_pos, size_t result_position)
{
if (!column_indices_right.contains(right_pos))
{
@ -548,7 +548,7 @@ void NotJoinedInputStream::setRightIndex(size_t right_pos, size_t result_positio
same_result_keys[result_position] = column_indices_right[right_pos];
}
void NotJoinedInputStream::extractColumnChanges(size_t right_pos, size_t result_pos)
void NotJoinedBlocks::extractColumnChanges(size_t right_pos, size_t result_pos)
{
auto src_props = getLowcardAndNullability(saved_block_sample.getByPosition(right_pos).column);
auto dst_props = getLowcardAndNullability(result_sample_block.getByPosition(result_pos).column);
@ -560,7 +560,7 @@ void NotJoinedInputStream::extractColumnChanges(size_t right_pos, size_t result_
right_lowcard_changes.push_back({result_pos, dst_props.is_lowcard});
}
void NotJoinedInputStream::correctLowcardAndNullability(Block & block)
void NotJoinedBlocks::correctLowcardAndNullability(Block & block)
{
for (auto & [pos, added] : right_nullability_changes)
{
@ -588,7 +588,7 @@ void NotJoinedInputStream::correctLowcardAndNullability(Block & block)
}
}
void NotJoinedInputStream::addLeftColumns(Block & block, size_t rows_added) const
void NotJoinedBlocks::addLeftColumns(Block & block, size_t rows_added) const
{
for (size_t pos : column_indices_left)
{
@ -600,7 +600,7 @@ void NotJoinedInputStream::addLeftColumns(Block & block, size_t rows_added) cons
}
}
void NotJoinedInputStream::addRightColumns(Block & block, MutableColumns & columns_right) const
void NotJoinedBlocks::addRightColumns(Block & block, MutableColumns & columns_right) const
{
for (const auto & pr : column_indices_right)
{
@ -610,7 +610,7 @@ void NotJoinedInputStream::addRightColumns(Block & block, MutableColumns & colum
}
}
void NotJoinedInputStream::copySameKeys(Block & block) const
void NotJoinedBlocks::copySameKeys(Block & block) const
{
for (const auto & pr : same_result_keys)
{
@ -620,7 +620,7 @@ void NotJoinedInputStream::copySameKeys(Block & block) const
}
}
Block NotJoinedInputStream::readImpl()
Block NotJoinedBlocks::read()
{
Block right_block = filler->getEmptyBlock();
@ -635,7 +635,7 @@ Block NotJoinedInputStream::readImpl()
correctLowcardAndNullability(right_block);
#ifndef NDEBUG
assertBlocksHaveEqualStructure(right_block, result_sample_block, getName());
assertBlocksHaveEqualStructure(right_block, result_sample_block, "NotJoinedBlocks");
#endif
return right_block;
}

View File

@ -5,7 +5,6 @@
#include <Interpreters/IJoin.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/ExpressionActions.h>
#include <DataStreams/IBlockInputStream.h>
namespace DB
{
@ -65,7 +64,7 @@ void changeLowCardinalityInplace(ColumnWithTypeAndName & column);
}
/// Creates result from right table data in RIGHT and FULL JOIN when keys are not present in left table.
class NotJoinedInputStream : public IBlockInputStream
class NotJoinedBlocks final
{
public:
using LeftToRightKeyRemap = std::unordered_map<String, String>;
@ -82,16 +81,12 @@ public:
virtual ~RightColumnsFiller() = default;
};
NotJoinedInputStream(std::unique_ptr<RightColumnsFiller> filler_,
NotJoinedBlocks(std::unique_ptr<RightColumnsFiller> filler_,
const Block & result_sample_block_,
size_t left_columns_count,
const LeftToRightKeyRemap & left_to_right_key_remap);
String getName() const override { return "NonJoined"; }
Block getHeader() const override { return result_sample_block; }
protected:
Block readImpl() override final;
Block read();
private:
void extractColumnChanges(size_t right_pos, size_t result_pos);

View File

@ -113,7 +113,7 @@ void JoiningTransform::work()
}
else
{
if (!non_joined_stream)
if (!non_joined_blocks)
{
if (!finish_counter || !finish_counter->isLast())
{
@ -121,15 +121,15 @@ void JoiningTransform::work()
return;
}
non_joined_stream = join->createStreamWithNonJoinedRows(outputs.front().getHeader(), max_block_size);
if (!non_joined_stream)
non_joined_blocks = join->getNonJoinedBlocks(outputs.front().getHeader(), max_block_size);
if (!non_joined_blocks)
{
process_non_joined = false;
return;
}
}
auto block = non_joined_stream->read();
Block block = non_joined_blocks->read();
if (!block)
{
process_non_joined = false;

View File

@ -8,8 +8,7 @@ namespace DB
class IJoin;
using JoinPtr = std::shared_ptr<IJoin>;
class IBlockInputStream;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
class NotJoinedBlocks;
/// Join rows to chunk form left table.
/// This transform usually has two input ports and one output.
@ -76,7 +75,7 @@ private:
ExtraBlockPtr not_processed;
FinishCounterPtr finish_counter;
BlockInputStreamPtr non_joined_stream;
std::shared_ptr<NotJoinedBlocks> non_joined_blocks;
size_t max_block_size;
Block readExecute(Chunk & chunk);