mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 16:50:48 +00:00
Merge pull request #27299 from vdimir/issue-27091-v2
Refactor NotJoined
This commit is contained in:
commit
7fe7103e53
@ -21,7 +21,6 @@
|
||||
|
||||
#include <Storages/StorageDictionary.h>
|
||||
|
||||
#include <DataStreams/IBlockInputStream.h>
|
||||
#include <DataStreams/materializeBlock.h>
|
||||
|
||||
#include <Core/ColumnNumbers.h>
|
||||
@ -194,6 +193,13 @@ HashJoin::HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_s
|
||||
|
||||
required_right_keys = table_join->getRequiredRightKeys(right_table_keys, required_right_keys_sources);
|
||||
|
||||
LOG_DEBUG(log, "Right keys: [{}] (required: [{}]), left keys: [{}]",
|
||||
fmt::join(key_names_right, ", "),
|
||||
fmt::join(required_right_keys.getNames(), ", "),
|
||||
fmt::join(table_join->keyNamesLeft(), ", "));
|
||||
|
||||
LOG_DEBUG(log, "Columns to add: [{}]", sample_block_with_columns_to_add.dumpStructure());
|
||||
|
||||
std::tie(condition_mask_column_name_left, condition_mask_column_name_right) = table_join->joinConditionColumnNames();
|
||||
|
||||
JoinCommon::removeLowCardinalityInplace(right_table_keys);
|
||||
@ -629,7 +635,7 @@ bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits)
|
||||
ConstNullMapPtr null_map{};
|
||||
ColumnPtr null_map_holder = extractNestedColumnsAndNullMap(key_columns, null_map);
|
||||
|
||||
/// If RIGHT or FULL save blocks with nulls for NonJoinedBlockInputStream
|
||||
/// If RIGHT or FULL save blocks with nulls for NotJoinedBlocks
|
||||
UInt8 save_nullmap = 0;
|
||||
if (isRightOrFull(kind) && null_map)
|
||||
{
|
||||
@ -1468,40 +1474,17 @@ struct AdderNonJoined
|
||||
|
||||
|
||||
/// Stream from not joined earlier rows of the right table.
|
||||
class NonJoinedBlockInputStream : private NotJoined, public IBlockInputStream
|
||||
class NotJoinedHash final : public NotJoinedBlocks::RightColumnsFiller
|
||||
{
|
||||
public:
|
||||
NonJoinedBlockInputStream(const HashJoin & parent_, const Block & result_sample_block_, UInt64 max_block_size_)
|
||||
: NotJoined(*parent_.table_join,
|
||||
parent_.savedBlockSample(),
|
||||
parent_.right_sample_block,
|
||||
result_sample_block_)
|
||||
, parent(parent_)
|
||||
, max_block_size(max_block_size_)
|
||||
NotJoinedHash(const HashJoin & parent_, UInt64 max_block_size_)
|
||||
: parent(parent_), max_block_size(max_block_size_)
|
||||
{}
|
||||
|
||||
String getName() const override { return "NonJoined"; }
|
||||
Block getHeader() const override { return result_sample_block; }
|
||||
Block getEmptyBlock() override { return parent.savedBlockSample().cloneEmpty(); }
|
||||
|
||||
protected:
|
||||
Block readImpl() override
|
||||
size_t fillColumns(MutableColumns & columns_right) override
|
||||
{
|
||||
if (parent.data->blocks.empty())
|
||||
return Block();
|
||||
return createBlock();
|
||||
}
|
||||
|
||||
private:
|
||||
const HashJoin & parent;
|
||||
UInt64 max_block_size;
|
||||
|
||||
std::any position;
|
||||
std::optional<HashJoin::BlockNullmapList::const_iterator> nulls_position;
|
||||
|
||||
Block createBlock()
|
||||
{
|
||||
MutableColumns columns_right = saved_block_sample.cloneEmptyColumns();
|
||||
|
||||
size_t rows_added = 0;
|
||||
|
||||
auto fill_callback = [&](auto, auto strictness, auto & map)
|
||||
@ -1513,22 +1496,16 @@ private:
|
||||
throw Exception("Logical error: unknown JOIN strictness (must be on of: ANY, ALL, ASOF)", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
fillNullsFromBlocks(columns_right, rows_added);
|
||||
if (!rows_added)
|
||||
return {};
|
||||
|
||||
Block res = result_sample_block.cloneEmpty();
|
||||
addLeftColumns(res, rows_added);
|
||||
addRightColumns(res, columns_right);
|
||||
copySameKeys(res);
|
||||
correctLowcardAndNullability(res);
|
||||
|
||||
#ifndef NDEBUG
|
||||
assertBlocksHaveEqualStructure(res, result_sample_block, getName());
|
||||
#endif
|
||||
|
||||
return res;
|
||||
return rows_added;
|
||||
}
|
||||
|
||||
private:
|
||||
const HashJoin & parent;
|
||||
UInt64 max_block_size;
|
||||
|
||||
std::any position;
|
||||
std::optional<HashJoin::BlockNullmapList::const_iterator> nulls_position;
|
||||
|
||||
template <ASTTableJoin::Strictness STRICTNESS, typename Maps>
|
||||
size_t fillColumnsFromMap(const Maps & maps, MutableColumns & columns_keys_and_right)
|
||||
{
|
||||
@ -1607,15 +1584,18 @@ private:
|
||||
};
|
||||
|
||||
|
||||
BlockInputStreamPtr HashJoin::createStreamWithNonJoinedRows(const Block & result_sample_block, UInt64 max_block_size) const
|
||||
std::shared_ptr<NotJoinedBlocks> HashJoin::getNonJoinedBlocks(const Block & result_sample_block, UInt64 max_block_size) const
|
||||
{
|
||||
if (table_join->strictness() == ASTTableJoin::Strictness::Asof ||
|
||||
table_join->strictness() == ASTTableJoin::Strictness::Semi)
|
||||
table_join->strictness() == ASTTableJoin::Strictness::Semi ||
|
||||
!isRightOrFull(table_join->kind()))
|
||||
{
|
||||
return {};
|
||||
}
|
||||
|
||||
if (isRightOrFull(table_join->kind()))
|
||||
return std::make_shared<NonJoinedBlockInputStream>(*this, result_sample_block, max_block_size);
|
||||
return {};
|
||||
size_t left_columns_count = result_sample_block.columns() - required_right_keys.columns() - sample_block_with_columns_to_add.columns();
|
||||
auto non_joined = std::make_unique<NotJoinedHash>(*this, max_block_size);
|
||||
return std::make_shared<NotJoinedBlocks>(std::move(non_joined), result_sample_block, left_columns_count, table_join->leftToRightKeyRemap());
|
||||
}
|
||||
|
||||
void HashJoin::reuseJoinedData(const HashJoin & join)
|
||||
|
@ -20,7 +20,6 @@
|
||||
#include <Columns/ColumnFixedString.h>
|
||||
|
||||
#include <DataStreams/SizeLimits.h>
|
||||
#include <DataStreams/IBlockStream_fwd.h>
|
||||
|
||||
#include <Core/Block.h>
|
||||
|
||||
@ -164,7 +163,7 @@ public:
|
||||
* Use only after all calls to joinBlock was done.
|
||||
* left_sample_block is passed without account of 'use_nulls' setting (columns will be converted to Nullable inside).
|
||||
*/
|
||||
BlockInputStreamPtr createStreamWithNonJoinedRows(const Block & result_sample_block, UInt64 max_block_size) const override;
|
||||
std::shared_ptr<NotJoinedBlocks> getNonJoinedBlocks(const Block & result_sample_block, UInt64 max_block_size) const override;
|
||||
|
||||
/// Number of keys in all built JOIN maps.
|
||||
size_t getTotalRowCount() const final;
|
||||
@ -337,7 +336,7 @@ public:
|
||||
bool isUsed(size_t off) const { return used_flags.getUsedSafe(off); }
|
||||
|
||||
private:
|
||||
friend class NonJoinedBlockInputStream;
|
||||
friend class NotJoinedHash;
|
||||
friend class JoinSource;
|
||||
|
||||
std::shared_ptr<TableJoin> table_join;
|
||||
|
@ -5,7 +5,6 @@
|
||||
|
||||
#include <Core/Names.h>
|
||||
#include <Columns/IColumn.h>
|
||||
#include <DataStreams/IBlockStream_fwd.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -15,6 +14,7 @@ struct ExtraBlock;
|
||||
using ExtraBlockPtr = std::shared_ptr<ExtraBlock>;
|
||||
|
||||
class TableJoin;
|
||||
class NotJoinedBlocks;
|
||||
|
||||
class IJoin
|
||||
{
|
||||
@ -43,7 +43,7 @@ public:
|
||||
/// Different query plan is used for such joins.
|
||||
virtual bool isFilled() const { return false; }
|
||||
|
||||
virtual BlockInputStreamPtr createStreamWithNonJoinedRows(const Block &, UInt64) const { return {}; }
|
||||
virtual std::shared_ptr<NotJoinedBlocks> getNonJoinedBlocks(const Block &, UInt64) const = 0;
|
||||
};
|
||||
|
||||
using JoinPtr = std::shared_ptr<IJoin>;
|
||||
|
@ -56,9 +56,9 @@ public:
|
||||
return join->alwaysReturnsEmptySet();
|
||||
}
|
||||
|
||||
BlockInputStreamPtr createStreamWithNonJoinedRows(const Block & block, UInt64 max_block_size) const override
|
||||
std::shared_ptr<NotJoinedBlocks> getNonJoinedBlocks(const Block & block, UInt64 max_block_size) const override
|
||||
{
|
||||
return join->createStreamWithNonJoinedRows(block, max_block_size);
|
||||
return join->getNonJoinedBlocks(block, max_block_size);
|
||||
}
|
||||
|
||||
private:
|
||||
@ -74,38 +74,4 @@ private:
|
||||
void switchJoin();
|
||||
};
|
||||
|
||||
|
||||
/// Creates NonJoinedBlockInputStream on the first read. Allows to swap join algo before it.
|
||||
class LazyNonJoinedBlockInputStream : public IBlockInputStream
|
||||
{
|
||||
public:
|
||||
LazyNonJoinedBlockInputStream(const IJoin & join_, const Block & block, UInt64 max_block_size_)
|
||||
: join(join_)
|
||||
, result_sample_block(block)
|
||||
, max_block_size(max_block_size_)
|
||||
{}
|
||||
|
||||
String getName() const override { return "LazyNonMergeJoined"; }
|
||||
Block getHeader() const override { return result_sample_block; }
|
||||
|
||||
protected:
|
||||
Block readImpl() override
|
||||
{
|
||||
if (!stream)
|
||||
{
|
||||
stream = join.createStreamWithNonJoinedRows(result_sample_block, max_block_size);
|
||||
if (!stream)
|
||||
return {};
|
||||
}
|
||||
|
||||
return stream->read();
|
||||
}
|
||||
|
||||
private:
|
||||
BlockInputStreamPtr stream;
|
||||
const IJoin & join;
|
||||
Block result_sample_block;
|
||||
UInt64 max_block_size;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -1,7 +1,8 @@
|
||||
#include <limits>
|
||||
|
||||
#include <Columns/ColumnNullable.h>
|
||||
#include <Core/NamesAndTypes.h>
|
||||
#include <Columns/ColumnLowCardinality.h>
|
||||
|
||||
#include <Core/SortCursor.h>
|
||||
#include <DataStreams/TemporaryFileStream.h>
|
||||
#include <DataStreams/materializeBlock.h>
|
||||
@ -723,15 +724,7 @@ void MergeJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed)
|
||||
if (needConditionJoinColumn())
|
||||
block.erase(deriveTempName(mask_column_name_left));
|
||||
|
||||
for (const auto & column_name : lowcard_keys)
|
||||
{
|
||||
if (!block.has(column_name))
|
||||
continue;
|
||||
if (auto & col = block.getByName(column_name); !col.type->lowCardinality())
|
||||
JoinCommon::changeLowCardinalityInplace(col);
|
||||
}
|
||||
|
||||
JoinCommon::restoreLowCardinalityInplace(block);
|
||||
JoinCommon::restoreLowCardinalityInplace(block, lowcard_keys);
|
||||
}
|
||||
|
||||
template <bool in_memory, bool is_all>
|
||||
@ -1035,55 +1028,16 @@ void MergeJoin::initRightTableWriter()
|
||||
}
|
||||
|
||||
/// Stream from not joined earlier rows of the right table.
|
||||
class NonMergeJoinedBlockInputStream : private NotJoined, public IBlockInputStream
|
||||
class NotJoinedMerge final : public NotJoinedBlocks::RightColumnsFiller
|
||||
{
|
||||
public:
|
||||
NonMergeJoinedBlockInputStream(const MergeJoin & parent_,
|
||||
const Block & result_sample_block_,
|
||||
const Names & key_names_right_,
|
||||
UInt64 max_block_size_)
|
||||
: NotJoined(*parent_.table_join,
|
||||
parent_.modifyRightBlock(parent_.right_sample_block),
|
||||
parent_.right_sample_block,
|
||||
result_sample_block_,
|
||||
{}, key_names_right_)
|
||||
, parent(parent_)
|
||||
, max_block_size(max_block_size_)
|
||||
NotJoinedMerge(const MergeJoin & parent_, UInt64 max_block_size_)
|
||||
: parent(parent_), max_block_size(max_block_size_)
|
||||
{}
|
||||
|
||||
String getName() const override { return "NonMergeJoined"; }
|
||||
Block getHeader() const override { return result_sample_block; }
|
||||
Block getEmptyBlock() override { return parent.modifyRightBlock(parent.right_sample_block).cloneEmpty(); }
|
||||
|
||||
protected:
|
||||
Block readImpl() override
|
||||
{
|
||||
if (parent.getRightBlocksCount())
|
||||
return createBlock();
|
||||
return {};
|
||||
}
|
||||
|
||||
private:
|
||||
const MergeJoin & parent;
|
||||
size_t max_block_size;
|
||||
size_t block_number = 0;
|
||||
|
||||
Block createBlock()
|
||||
{
|
||||
MutableColumns columns_right = saved_block_sample.cloneEmptyColumns();
|
||||
|
||||
size_t rows_added = fillColumns(columns_right);
|
||||
if (!rows_added)
|
||||
return {};
|
||||
|
||||
Block res = result_sample_block.cloneEmpty();
|
||||
addLeftColumns(res, rows_added);
|
||||
addRightColumns(res, columns_right);
|
||||
copySameKeys(res);
|
||||
correctLowcardAndNullability(res);
|
||||
return res;
|
||||
}
|
||||
|
||||
size_t fillColumns(MutableColumns & columns_right)
|
||||
size_t fillColumns(MutableColumns & columns_right) override
|
||||
{
|
||||
const RowBitmaps & bitmaps = *parent.used_rows_bitmap;
|
||||
size_t rows_added = 0;
|
||||
@ -1127,14 +1081,23 @@ private:
|
||||
|
||||
return rows_added;
|
||||
}
|
||||
|
||||
private:
|
||||
const MergeJoin & parent;
|
||||
size_t max_block_size;
|
||||
size_t block_number = 0;
|
||||
};
|
||||
|
||||
|
||||
BlockInputStreamPtr MergeJoin::createStreamWithNonJoinedRows(const Block & result_sample_block, UInt64 max_block_size) const
|
||||
std::shared_ptr<NotJoinedBlocks> MergeJoin::getNonJoinedBlocks(const Block & result_sample_block, UInt64 max_block_size) const
|
||||
{
|
||||
if (table_join->strictness() == ASTTableJoin::Strictness::All && (is_right || is_full))
|
||||
return std::make_shared<NonMergeJoinedBlockInputStream>(*this, result_sample_block, key_names_right, max_block_size);
|
||||
return {};
|
||||
{
|
||||
size_t left_columns_count = result_sample_block.columns() - right_columns_to_add.columns();
|
||||
auto non_joined = std::make_unique<NotJoinedMerge>(*this, max_block_size);
|
||||
return std::make_shared<NotJoinedBlocks>(std::move(non_joined), result_sample_block, left_columns_count, table_join->leftToRightKeyRemap());
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
bool MergeJoin::needConditionJoinColumn() const
|
||||
|
@ -35,10 +35,10 @@ public:
|
||||
/// Has to be called only after setTotals()/mergeRightBlocks()
|
||||
bool alwaysReturnsEmptySet() const override { return (is_right || is_inner) && min_max_right_blocks.empty(); }
|
||||
|
||||
BlockInputStreamPtr createStreamWithNonJoinedRows(const Block & result_sample_block, UInt64 max_block_size) const override;
|
||||
std::shared_ptr<NotJoinedBlocks> getNonJoinedBlocks(const Block & result_sample_block, UInt64 max_block_size) const override;
|
||||
|
||||
private:
|
||||
friend class NonMergeJoinedBlockInputStream;
|
||||
friend class NotJoinedMerge;
|
||||
|
||||
struct NotProcessed : public ExtraBlock
|
||||
{
|
||||
@ -78,6 +78,7 @@ private:
|
||||
SortDescription right_merge_description;
|
||||
Block right_sample_block;
|
||||
Block right_table_keys;
|
||||
/// Columns from right side of join, both key and additional
|
||||
Block right_columns_to_add;
|
||||
SortedBlocksWriter::Blocks right_blocks;
|
||||
|
||||
|
@ -454,6 +454,24 @@ void TableJoin::addJoinCondition(const ASTPtr & ast, bool is_left)
|
||||
on_filter_condition_asts_right.push_back(ast);
|
||||
}
|
||||
|
||||
std::unordered_map<String, String> TableJoin::leftToRightKeyRemap() const
|
||||
{
|
||||
std::unordered_map<String, String> left_to_right_key_remap;
|
||||
if (hasUsing())
|
||||
{
|
||||
const auto & required_right_keys = requiredRightKeys();
|
||||
for (size_t i = 0; i < key_names_left.size(); ++i)
|
||||
{
|
||||
const String & left_key_name = key_names_left[i];
|
||||
const String & right_key_name = key_names_right[i];
|
||||
|
||||
if (!required_right_keys.contains(right_key_name))
|
||||
left_to_right_key_remap[left_key_name] = right_key_name;
|
||||
}
|
||||
}
|
||||
return left_to_right_key_remap;
|
||||
}
|
||||
|
||||
/// Returns all conditions related to one table joined with 'and' function
|
||||
static ASTPtr buildJoinConditionColumn(const ASTs & on_filter_condition_asts)
|
||||
{
|
||||
|
@ -229,6 +229,7 @@ public:
|
||||
Block getRequiredRightKeys(const Block & right_table_keys, std::vector<String> & keys_sources) const;
|
||||
|
||||
String renamedRightColumnName(const String & name) const;
|
||||
std::unordered_map<String, String> leftToRightKeyRemap() const;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -314,8 +314,16 @@ void removeLowCardinalityInplace(Block & block, const Names & names, bool change
|
||||
}
|
||||
}
|
||||
|
||||
void restoreLowCardinalityInplace(Block & block)
|
||||
void restoreLowCardinalityInplace(Block & block, const Names & lowcard_keys)
|
||||
{
|
||||
for (const auto & column_name : lowcard_keys)
|
||||
{
|
||||
if (!block.has(column_name))
|
||||
continue;
|
||||
if (auto & col = block.getByName(column_name); !col.type->lowCardinality())
|
||||
JoinCommon::changeLowCardinalityInplace(col);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < block.columns(); ++i)
|
||||
{
|
||||
auto & col = block.getByPosition(i);
|
||||
@ -484,49 +492,23 @@ void splitAdditionalColumns(const Names & key_names, const Block & sample_block,
|
||||
|
||||
}
|
||||
|
||||
|
||||
NotJoined::NotJoined(const TableJoin & table_join, const Block & saved_block_sample_, const Block & right_sample_block,
|
||||
const Block & result_sample_block_, const Names & key_names_left_, const Names & key_names_right_)
|
||||
: saved_block_sample(saved_block_sample_)
|
||||
NotJoinedBlocks::NotJoinedBlocks(std::unique_ptr<RightColumnsFiller> filler_,
|
||||
const Block & result_sample_block_,
|
||||
size_t left_columns_count,
|
||||
const LeftToRightKeyRemap & left_to_right_key_remap)
|
||||
: filler(std::move(filler_))
|
||||
, saved_block_sample(filler->getEmptyBlock())
|
||||
, result_sample_block(materializeBlock(result_sample_block_))
|
||||
, key_names_left(key_names_left_.empty() ? table_join.keyNamesLeft() : key_names_left_)
|
||||
, key_names_right(key_names_right_.empty() ? table_join.keyNamesRight() : key_names_right_)
|
||||
{
|
||||
std::vector<String> tmp;
|
||||
Block right_table_keys;
|
||||
Block sample_block_with_columns_to_add;
|
||||
|
||||
JoinCommon::splitAdditionalColumns(key_names_right, right_sample_block, right_table_keys,
|
||||
sample_block_with_columns_to_add);
|
||||
Block required_right_keys = table_join.getRequiredRightKeys(right_table_keys, tmp);
|
||||
|
||||
std::unordered_map<size_t, size_t> left_to_right_key_remap;
|
||||
|
||||
if (table_join.hasUsing())
|
||||
{
|
||||
for (size_t i = 0; i < key_names_left.size(); ++i)
|
||||
{
|
||||
const String & left_key_name = key_names_left[i];
|
||||
const String & right_key_name = key_names_right[i];
|
||||
|
||||
size_t left_key_pos = result_sample_block.getPositionByName(left_key_name);
|
||||
size_t right_key_pos = saved_block_sample.getPositionByName(right_key_name);
|
||||
|
||||
if (!required_right_keys.has(right_key_name))
|
||||
left_to_right_key_remap[left_key_pos] = right_key_pos;
|
||||
}
|
||||
}
|
||||
|
||||
/// result_sample_block: left_sample_block + left expressions, right not key columns, required right keys
|
||||
size_t left_columns_count = result_sample_block.columns() -
|
||||
sample_block_with_columns_to_add.columns() - required_right_keys.columns();
|
||||
|
||||
for (size_t left_pos = 0; left_pos < left_columns_count; ++left_pos)
|
||||
{
|
||||
/// We need right 'x' for 'RIGHT JOIN ... USING(x)'.
|
||||
if (left_to_right_key_remap.count(left_pos))
|
||||
/// We need right 'x' for 'RIGHT JOIN ... USING(x)'
|
||||
auto left_name = result_sample_block.getByPosition(left_pos).name;
|
||||
const auto & right_key = left_to_right_key_remap.find(left_name);
|
||||
if (right_key != left_to_right_key_remap.end())
|
||||
{
|
||||
size_t right_key_pos = left_to_right_key_remap[left_pos];
|
||||
size_t right_key_pos = saved_block_sample.getPositionByName(right_key->second);
|
||||
setRightIndex(right_key_pos, left_pos);
|
||||
}
|
||||
else
|
||||
@ -556,9 +538,9 @@ NotJoined::NotJoined(const TableJoin & table_join, const Block & saved_block_sam
|
||||
ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
void NotJoined::setRightIndex(size_t right_pos, size_t result_position)
|
||||
void NotJoinedBlocks::setRightIndex(size_t right_pos, size_t result_position)
|
||||
{
|
||||
if (!column_indices_right.count(right_pos))
|
||||
if (!column_indices_right.contains(right_pos))
|
||||
{
|
||||
column_indices_right[right_pos] = result_position;
|
||||
extractColumnChanges(right_pos, result_position);
|
||||
@ -567,7 +549,7 @@ void NotJoined::setRightIndex(size_t right_pos, size_t result_position)
|
||||
same_result_keys[result_position] = column_indices_right[right_pos];
|
||||
}
|
||||
|
||||
void NotJoined::extractColumnChanges(size_t right_pos, size_t result_pos)
|
||||
void NotJoinedBlocks::extractColumnChanges(size_t right_pos, size_t result_pos)
|
||||
{
|
||||
auto src_props = getLowcardAndNullability(saved_block_sample.getByPosition(right_pos).column);
|
||||
auto dst_props = getLowcardAndNullability(result_sample_block.getByPosition(result_pos).column);
|
||||
@ -579,7 +561,7 @@ void NotJoined::extractColumnChanges(size_t right_pos, size_t result_pos)
|
||||
right_lowcard_changes.push_back({result_pos, dst_props.is_lowcard});
|
||||
}
|
||||
|
||||
void NotJoined::correctLowcardAndNullability(Block & block)
|
||||
void NotJoinedBlocks::correctLowcardAndNullability(Block & block)
|
||||
{
|
||||
for (auto & [pos, added] : right_nullability_changes)
|
||||
{
|
||||
@ -607,7 +589,7 @@ void NotJoined::correctLowcardAndNullability(Block & block)
|
||||
}
|
||||
}
|
||||
|
||||
void NotJoined::addLeftColumns(Block & block, size_t rows_added) const
|
||||
void NotJoinedBlocks::addLeftColumns(Block & block, size_t rows_added) const
|
||||
{
|
||||
for (size_t pos : column_indices_left)
|
||||
{
|
||||
@ -619,7 +601,7 @@ void NotJoined::addLeftColumns(Block & block, size_t rows_added) const
|
||||
}
|
||||
}
|
||||
|
||||
void NotJoined::addRightColumns(Block & block, MutableColumns & columns_right) const
|
||||
void NotJoinedBlocks::addRightColumns(Block & block, MutableColumns & columns_right) const
|
||||
{
|
||||
for (const auto & pr : column_indices_right)
|
||||
{
|
||||
@ -629,7 +611,7 @@ void NotJoined::addRightColumns(Block & block, MutableColumns & columns_right) c
|
||||
}
|
||||
}
|
||||
|
||||
void NotJoined::copySameKeys(Block & block) const
|
||||
void NotJoinedBlocks::copySameKeys(Block & block) const
|
||||
{
|
||||
for (const auto & pr : same_result_keys)
|
||||
{
|
||||
@ -639,4 +621,26 @@ void NotJoined::copySameKeys(Block & block) const
|
||||
}
|
||||
}
|
||||
|
||||
Block NotJoinedBlocks::read()
|
||||
{
|
||||
Block result_block = result_sample_block.cloneEmpty();
|
||||
{
|
||||
Block right_block = filler->getEmptyBlock();
|
||||
MutableColumns columns_right = right_block.cloneEmptyColumns();
|
||||
size_t rows_added = filler->fillColumns(columns_right);
|
||||
if (rows_added == 0)
|
||||
return {};
|
||||
|
||||
addLeftColumns(result_block, rows_added);
|
||||
addRightColumns(result_block, columns_right);
|
||||
}
|
||||
copySameKeys(result_block);
|
||||
correctLowcardAndNullability(result_block);
|
||||
|
||||
#ifndef NDEBUG
|
||||
assertBlocksHaveEqualStructure(result_block, result_sample_block, "NotJoinedBlocks");
|
||||
#endif
|
||||
return result_block;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -30,7 +30,7 @@ ColumnRawPtrs materializeColumnsInplace(Block & block, const Names & names);
|
||||
ColumnRawPtrs getRawPointers(const Columns & columns);
|
||||
void removeLowCardinalityInplace(Block & block);
|
||||
void removeLowCardinalityInplace(Block & block, const Names & names, bool change_type = true);
|
||||
void restoreLowCardinalityInplace(Block & block);
|
||||
void restoreLowCardinalityInplace(Block & block, const Names & lowcard_keys);
|
||||
|
||||
ColumnRawPtrs extractKeysForJoin(const Block & block_keys, const Names & key_names_right);
|
||||
|
||||
@ -64,40 +64,58 @@ void changeLowCardinalityInplace(ColumnWithTypeAndName & column);
|
||||
}
|
||||
|
||||
/// Creates result from right table data in RIGHT and FULL JOIN when keys are not present in left table.
|
||||
class NotJoined
|
||||
class NotJoinedBlocks final
|
||||
{
|
||||
public:
|
||||
NotJoined(const TableJoin & table_join, const Block & saved_block_sample_, const Block & right_sample_block,
|
||||
const Block & result_sample_block_, const Names & key_names_left_ = {}, const Names & key_names_right_ = {});
|
||||
using LeftToRightKeyRemap = std::unordered_map<String, String>;
|
||||
|
||||
/// Returns non joined columns from right part of join
|
||||
class RightColumnsFiller
|
||||
{
|
||||
public:
|
||||
/// Create empty block for right part
|
||||
virtual Block getEmptyBlock() = 0;
|
||||
/// Fill columns from right part of join with not joined rows
|
||||
virtual size_t fillColumns(MutableColumns & columns_right) = 0;
|
||||
|
||||
virtual ~RightColumnsFiller() = default;
|
||||
};
|
||||
|
||||
NotJoinedBlocks(std::unique_ptr<RightColumnsFiller> filler_,
|
||||
const Block & result_sample_block_,
|
||||
size_t left_columns_count,
|
||||
const LeftToRightKeyRemap & left_to_right_key_remap);
|
||||
|
||||
Block read();
|
||||
|
||||
private:
|
||||
void extractColumnChanges(size_t right_pos, size_t result_pos);
|
||||
void correctLowcardAndNullability(Block & block);
|
||||
void addLeftColumns(Block & block, size_t rows_added) const;
|
||||
void addRightColumns(Block & block, MutableColumns & columns_right) const;
|
||||
void copySameKeys(Block & block) const;
|
||||
|
||||
protected:
|
||||
std::unique_ptr<RightColumnsFiller> filler;
|
||||
|
||||
/// Right block saved in Join
|
||||
Block saved_block_sample;
|
||||
|
||||
/// Output of join
|
||||
Block result_sample_block;
|
||||
|
||||
Names key_names_left;
|
||||
Names key_names_right;
|
||||
|
||||
~NotJoined() = default;
|
||||
|
||||
private:
|
||||
/// Indices of columns in result_sample_block that should be generated
|
||||
std::vector<size_t> column_indices_left;
|
||||
/// Indices of columns that come from the right-side table: right_pos -> result_pos
|
||||
std::unordered_map<size_t, size_t> column_indices_right;
|
||||
///
|
||||
|
||||
std::unordered_map<size_t, size_t> same_result_keys;
|
||||
/// Which right columns (saved in parent) need nullability change before placing them in result block
|
||||
|
||||
/// Which right columns (saved in parent) need Nullability/LowCardinality change
|
||||
/// before placing them in result block
|
||||
std::vector<std::pair<size_t, bool>> right_nullability_changes;
|
||||
/// Which right columns (saved in parent) need LowCardinality change before placing them in result block
|
||||
std::vector<std::pair<size_t, bool>> right_lowcard_changes;
|
||||
|
||||
void setRightIndex(size_t right_pos, size_t result_position);
|
||||
void extractColumnChanges(size_t right_pos, size_t result_pos);
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -1,7 +1,6 @@
|
||||
#include <Processors/Transforms/JoiningTransform.h>
|
||||
#include <Interpreters/ExpressionAnalyzer.h>
|
||||
#include <Interpreters/join_common.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <DataStreams/IBlockInputStream.h>
|
||||
|
||||
|
||||
@ -114,7 +113,7 @@ void JoiningTransform::work()
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!non_joined_stream)
|
||||
if (!non_joined_blocks)
|
||||
{
|
||||
if (!finish_counter || !finish_counter->isLast())
|
||||
{
|
||||
@ -122,15 +121,15 @@ void JoiningTransform::work()
|
||||
return;
|
||||
}
|
||||
|
||||
non_joined_stream = join->createStreamWithNonJoinedRows(outputs.front().getHeader(), max_block_size);
|
||||
if (!non_joined_stream)
|
||||
non_joined_blocks = join->getNonJoinedBlocks(outputs.front().getHeader(), max_block_size);
|
||||
if (!non_joined_blocks)
|
||||
{
|
||||
process_non_joined = false;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
auto block = non_joined_stream->read();
|
||||
Block block = non_joined_blocks->read();
|
||||
if (!block)
|
||||
{
|
||||
process_non_joined = false;
|
||||
|
@ -8,8 +8,7 @@ namespace DB
|
||||
class IJoin;
|
||||
using JoinPtr = std::shared_ptr<IJoin>;
|
||||
|
||||
class IBlockInputStream;
|
||||
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
|
||||
class NotJoinedBlocks;
|
||||
|
||||
/// Join rows to chunk form left table.
|
||||
/// This transform usually has two input ports and one output.
|
||||
@ -76,7 +75,7 @@ private:
|
||||
ExtraBlockPtr not_processed;
|
||||
|
||||
FinishCounterPtr finish_counter;
|
||||
BlockInputStreamPtr non_joined_stream;
|
||||
std::shared_ptr<NotJoinedBlocks> non_joined_blocks;
|
||||
size_t max_block_size;
|
||||
|
||||
Block readExecute(Chunk & chunk);
|
||||
|
Loading…
Reference in New Issue
Block a user