Refactor NotJoined pt2: rename classes, get rig of inheritance

This commit is contained in:
vdimir 2021-08-09 17:30:37 +03:00
parent afa748c717
commit 9d8178d04c
No known key found for this signature in database
GPG Key ID: F57B3E10A21DBB31
6 changed files with 65 additions and 62 deletions

View File

@ -629,7 +629,7 @@ bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits)
ConstNullMapPtr null_map{};
ColumnPtr null_map_holder = extractNestedColumnsAndNullMap(key_columns, null_map);
/// If RIGHT or FULL save blocks with nulls for NonJoinedBlockInputStream
/// If RIGHT or FULL save blocks with nulls for NotJoinedInputStream
UInt8 save_nullmap = 0;
if (isRightOrFull(kind) && null_map)
{
@ -1468,21 +1468,15 @@ struct AdderNonJoined
/// Stream from not joined earlier rows of the right table.
class NonJoinedBlockInputStream final : public NotJoined
class NotJoinedHash final : public NotJoinedInputStream::RightColumnsFiller
{
public:
NonJoinedBlockInputStream(
const HashJoin & parent_,
const Block & result_sample_block_,
size_t left_columns_count,
UInt64 max_block_size_)
: NotJoined(parent_.savedBlockSample(), result_sample_block_,
left_columns_count, parent_.table_join->leftToRightKeyRemap())
, parent(parent_)
, max_block_size(max_block_size_)
NotJoinedHash(const HashJoin & parent_, UInt64 max_block_size_)
: parent(parent_), max_block_size(max_block_size_)
{}
protected:
Block getEmptyBlock() override { return parent.savedBlockSample().cloneEmpty(); }
size_t fillColumns(MutableColumns & columns_right) override
{
size_t rows_added = 0;
@ -1594,7 +1588,8 @@ BlockInputStreamPtr HashJoin::createStreamWithNonJoinedRows(const Block & result
}
size_t left_columns_count = result_sample_block.columns() - required_right_keys.columns() - sample_block_with_columns_to_add.columns();
return std::make_shared<NonJoinedBlockInputStream>(*this, result_sample_block, left_columns_count, max_block_size);
auto non_joined = std::make_unique<NotJoinedHash>(*this, max_block_size);
return std::make_shared<NotJoinedInputStream>(std::move(non_joined), result_sample_block, left_columns_count, table_join->leftToRightKeyRemap());
}
void HashJoin::reuseJoinedData(const HashJoin & join)

View File

@ -337,7 +337,7 @@ public:
bool isUsed(size_t off) const { return used_flags.getUsedSafe(off); }
private:
friend class NonJoinedBlockInputStream;
friend class NotJoinedHash;
friend class JoinSource;
std::shared_ptr<TableJoin> table_join;

View File

@ -1028,24 +1028,15 @@ void MergeJoin::initRightTableWriter()
}
/// Stream from not joined earlier rows of the right table.
class NonMergeJoinedBlockInputStream final : public NotJoined
class NotJoinedMerge final : public NotJoinedInputStream::RightColumnsFiller
{
public:
NonMergeJoinedBlockInputStream(const MergeJoin & parent_,
const Block & result_sample_block,
size_t left_columns_count,
UInt64 max_block_size_)
: NotJoined(parent_.modifyRightBlock(parent_.right_sample_block),
result_sample_block,
left_columns_count,
parent_.table_join->leftToRightKeyRemap())
, parent(parent_)
, max_block_size(max_block_size_)
NotJoinedMerge(const MergeJoin & parent_, UInt64 max_block_size_)
: parent(parent_), max_block_size(max_block_size_)
{}
String getName() const override { return "NonMergeJoined"; }
Block getEmptyBlock() override { return parent.modifyRightBlock(parent.right_sample_block).cloneEmpty(); }
protected:
size_t fillColumns(MutableColumns & columns_right) override
{
const RowBitmaps & bitmaps = *parent.used_rows_bitmap;
@ -1100,9 +1091,12 @@ private:
BlockInputStreamPtr MergeJoin::createStreamWithNonJoinedRows(const Block & result_sample_block, UInt64 max_block_size) const
{
size_t left_columns_count = result_sample_block.columns() - right_columns_to_add.columns();
if (table_join->strictness() == ASTTableJoin::Strictness::All && (is_right || is_full))
return std::make_shared<NonMergeJoinedBlockInputStream>(*this, result_sample_block, left_columns_count, max_block_size);
{
size_t left_columns_count = result_sample_block.columns() - right_columns_to_add.columns();
auto non_joined = std::make_unique<NotJoinedMerge>(*this, max_block_size);
return std::make_shared<NotJoinedInputStream>(std::move(non_joined), result_sample_block, left_columns_count, table_join->leftToRightKeyRemap());
}
return {};
}

View File

@ -38,7 +38,7 @@ public:
BlockInputStreamPtr createStreamWithNonJoinedRows(const Block & result_sample_block, UInt64 max_block_size) const override;
private:
friend class NonMergeJoinedBlockInputStream;
friend class NotJoinedMerge;
struct NotProcessed : public ExtraBlock
{

View File

@ -492,11 +492,12 @@ void splitAdditionalColumns(const Names & key_names, const Block & sample_block,
}
NotJoined::NotJoined(const Block & saved_block_sample_,
NotJoinedInputStream::NotJoinedInputStream(std::unique_ptr<RightColumnsFiller> filler_,
const Block & result_sample_block_,
size_t left_columns_count,
const LeftToRightKeyRemap & left_to_right_key_remap)
: saved_block_sample(saved_block_sample_)
: filler(std::move(filler_))
, saved_block_sample(filler->getEmptyBlock())
, result_sample_block(materializeBlock(result_sample_block_))
{
for (size_t left_pos = 0; left_pos < left_columns_count; ++left_pos)
@ -536,7 +537,7 @@ NotJoined::NotJoined(const Block & saved_block_sample_,
ErrorCodes::LOGICAL_ERROR);
}
void NotJoined::setRightIndex(size_t right_pos, size_t result_position)
void NotJoinedInputStream::setRightIndex(size_t right_pos, size_t result_position)
{
if (!column_indices_right.contains(right_pos))
{
@ -547,7 +548,7 @@ void NotJoined::setRightIndex(size_t right_pos, size_t result_position)
same_result_keys[result_position] = column_indices_right[right_pos];
}
void NotJoined::extractColumnChanges(size_t right_pos, size_t result_pos)
void NotJoinedInputStream::extractColumnChanges(size_t right_pos, size_t result_pos)
{
auto src_props = getLowcardAndNullability(saved_block_sample.getByPosition(right_pos).column);
auto dst_props = getLowcardAndNullability(result_sample_block.getByPosition(result_pos).column);
@ -559,7 +560,7 @@ void NotJoined::extractColumnChanges(size_t right_pos, size_t result_pos)
right_lowcard_changes.push_back({result_pos, dst_props.is_lowcard});
}
void NotJoined::correctLowcardAndNullability(Block & block)
void NotJoinedInputStream::correctLowcardAndNullability(Block & block)
{
for (auto & [pos, added] : right_nullability_changes)
{
@ -587,7 +588,7 @@ void NotJoined::correctLowcardAndNullability(Block & block)
}
}
void NotJoined::addLeftColumns(Block & block, size_t rows_added) const
void NotJoinedInputStream::addLeftColumns(Block & block, size_t rows_added) const
{
for (size_t pos : column_indices_left)
{
@ -599,7 +600,7 @@ void NotJoined::addLeftColumns(Block & block, size_t rows_added) const
}
}
void NotJoined::addRightColumns(Block & block, MutableColumns & columns_right) const
void NotJoinedInputStream::addRightColumns(Block & block, MutableColumns & columns_right) const
{
for (const auto & pr : column_indices_right)
{
@ -609,7 +610,7 @@ void NotJoined::addRightColumns(Block & block, MutableColumns & columns_right) c
}
}
void NotJoined::copySameKeys(Block & block) const
void NotJoinedInputStream::copySameKeys(Block & block) const
{
for (const auto & pr : same_result_keys)
{
@ -619,4 +620,24 @@ void NotJoined::copySameKeys(Block & block) const
}
}
Block NotJoinedInputStream::readImpl()
{
Block right_block = filler->getEmptyBlock();
MutableColumns columns_right = right_block.cloneEmptyColumns();
size_t rows_added = filler->fillColumns(columns_right);
if (rows_added == 0)
return {};
addLeftColumns(right_block, rows_added);
addRightColumns(right_block, columns_right);
copySameKeys(right_block);
correctLowcardAndNullability(right_block);
#ifndef NDEBUG
assertBlocksHaveEqualStructure(right_block, result_sample_block, getName());
#endif
return right_block;
}
}

View File

@ -65,12 +65,24 @@ void changeLowCardinalityInplace(ColumnWithTypeAndName & column);
}
/// Creates result from right table data in RIGHT and FULL JOIN when keys are not present in left table.
class NotJoined : public IBlockInputStream
class NotJoinedInputStream : public IBlockInputStream
{
public:
using LeftToRightKeyRemap = std::unordered_map<String, String>;
NotJoined(const Block & saved_block_sample_,
/// Returns non joined columns from right part of join
class RightColumnsFiller
{
public:
/// Create empty block for right part
virtual Block getEmptyBlock() = 0;
/// Fill columns from right part of join with not joined rows
virtual size_t fillColumns(MutableColumns & columns_right) = 0;
virtual ~RightColumnsFiller() = default;
};
NotJoinedInputStream(std::unique_ptr<RightColumnsFiller> filler_,
const Block & result_sample_block_,
size_t left_columns_count,
const LeftToRightKeyRemap & left_to_right_key_remap);
@ -79,28 +91,7 @@ public:
Block getHeader() const override { return result_sample_block; }
protected:
Block readImpl() override final
{
Block result = saved_block_sample.cloneEmpty();
MutableColumns columns_right = result.mutateColumns();
size_t rows_added = fillColumns(columns_right);
if (rows_added == 0)
return {};
Block res = result_sample_block.cloneEmpty();
addLeftColumns(res, rows_added);
addRightColumns(res, columns_right);
copySameKeys(res);
correctLowcardAndNullability(res);
#ifndef NDEBUG
assertBlocksHaveEqualStructure(res, result_sample_block, getName());
#endif
return res;
}
virtual size_t fillColumns(MutableColumns & columns_right) = 0;
Block readImpl() override final;
private:
void extractColumnChanges(size_t right_pos, size_t result_pos);
@ -109,6 +100,8 @@ private:
void addRightColumns(Block & block, MutableColumns & columns_right) const;
void copySameKeys(Block & block) const;
std::unique_ptr<RightColumnsFiller> filler;
/// Right block saved in Join
Block saved_block_sample;