Merge pull request #6940 from 4ertus2/mjoin

PartialMergeJoin
This commit is contained in:
Artem Zuikov 2019-09-18 16:16:10 +03:00 committed by GitHub
commit aae82eed5a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 1318 additions and 352 deletions

View File

@ -156,6 +156,18 @@ void ColumnNullable::insertFrom(const IColumn & src, size_t n)
getNullMapData().push_back(src_concrete.getNullMapData()[n]);
}
void ColumnNullable::insertFromNotNullable(const IColumn & src, size_t n)
{
getNestedColumn().insertFrom(src, n);
getNullMapData().push_back(0);
}
void ColumnNullable::insertRangeFromNotNullable(const IColumn & src, size_t start, size_t length)
{
getNestedColumn().insertRangeFrom(src, start, length);
getNullMapData().resize_fill(getNullMapData().size() + length, 0);
}
void ColumnNullable::popBack(size_t n)
{
getNestedColumn().popBack(n);

View File

@ -61,6 +61,9 @@ public:
void insert(const Field & x) override;
void insertFrom(const IColumn & src, size_t n) override;
void insertFromNotNullable(const IColumn & src, size_t n);
void insertRangeFromNotNullable(const IColumn & src, size_t start, size_t length);
void insertDefault() override
{
getNestedColumn().insertDefault();

View File

@ -288,6 +288,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingUInt64, max_bytes_in_join, 0, "Maximum size of the hash table for JOIN (in number of bytes in memory).") \
M(SettingOverflowMode, join_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.") \
M(SettingBool, join_any_take_last_row, false, "When disabled (default) ANY JOIN will take the first found row for a key. When enabled, it will take the last row seen if there are multiple rows for the same key.") \
M(SettingBool, partial_merge_join, false, "Use partial merge join instead of hash join if possible.") \
\
M(SettingUInt64, max_rows_to_transfer, 0, "Maximum size (in rows) of the transmitted external table obtained when the GLOBAL IN/JOIN section is executed.") \
M(SettingUInt64, max_bytes_to_transfer, 0, "Maximum size (in uncompressed bytes) of the transmitted external table obtained when the GLOBAL IN/JOIN section is executed.") \

View File

@ -1,5 +1,4 @@
#include <Interpreters/Set.h>
#include <Interpreters/Join.h>
#include <DataStreams/materializeBlock.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/CreatingSetsBlockInputStream.h>
@ -124,12 +123,7 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
if (!done_with_join)
{
subquery.renameColumns(block);
if (subquery.joined_block_actions)
subquery.joined_block_actions->execute(block);
if (!subquery.join->insertFromBlock(block))
if (!subquery.insertJoinedBlock(block))
done_with_join = true;
}
@ -162,8 +156,7 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
head_rows = profile_info.rows;
if (subquery.join)
subquery.join->setTotals(subquery.source->getTotals());
subquery.setTotals();
if (head_rows != 0)
{

View File

@ -20,4 +20,10 @@ Block materializeBlock(const Block & block)
return res;
}
void materializeBlockInplace(Block & block)
{
for (size_t i = 0; i < block.columns(); ++i)
block.getByPosition(i).column = block.getByPosition(i).column->convertToFullColumnIfConst();
}
}

View File

@ -9,5 +9,6 @@ namespace DB
/** Converts columns-constants to full columns ("materializes" them).
*/
Block materializeBlock(const Block & block);
void materializeBlockInplace(Block & block);
}

View File

@ -4,17 +4,18 @@
namespace DB
{
class Context;
class Join;
using JoinPtr = std::shared_ptr<Join>;
using HashJoinPtr = std::shared_ptr<Join>;
class FunctionJoinGet final : public IFunction
{
public:
static constexpr auto name = "joinGet";
FunctionJoinGet(
TableStructureReadLockHolder table_lock_, StoragePtr storage_join_, JoinPtr join_, const String & attr_name_, DataTypePtr return_type_)
FunctionJoinGet(TableStructureReadLockHolder table_lock_, StoragePtr storage_join_, HashJoinPtr join_, const String & attr_name_,
DataTypePtr return_type_)
: table_lock(std::move(table_lock_))
, storage_join(std::move(storage_join_))
, join(std::move(join_))
@ -36,7 +37,7 @@ private:
private:
TableStructureReadLockHolder table_lock;
StoragePtr storage_join;
JoinPtr join;
HashJoinPtr join;
const String attr_name;
DataTypePtr return_type;
};

View File

@ -2,11 +2,13 @@
#include <Interpreters/DatabaseAndTableWithAlias.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/Join.h>
#include <Interpreters/MergeJoin.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <Core/Settings.h>
#include <Core/Block.h>
#include <Storages/IStorage.h>
@ -16,6 +18,17 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
AnalyzedJoin::AnalyzedJoin(const Settings & settings)
: size_limits(SizeLimits{settings.max_rows_in_join, settings.max_bytes_in_join, settings.join_overflow_mode})
, join_use_nulls(settings.join_use_nulls)
, partial_merge_join(settings.partial_merge_join)
{}
void AnalyzedJoin::addUsingKey(const ASTPtr & ast)
{
key_names_left.push_back(ast->getColumnName());
@ -129,6 +142,16 @@ Names AnalyzedJoin::requiredJoinedNames() const
return Names(required_columns_set.begin(), required_columns_set.end());
}
NameSet AnalyzedJoin::requiredRightKeys() const
{
NameSet required;
for (const auto & name : key_names_right)
for (const auto & column : columns_added_by_join)
if (name == column.name)
required.insert(name);
return required;
}
NamesWithAliases AnalyzedJoin::getRequiredColumns(const Block & sample, const Names & action_required_columns) const
{
NameSet required_columns(action_required_columns.begin(), action_required_columns.end());
@ -209,37 +232,7 @@ bool AnalyzedJoin::sameJoin(const AnalyzedJoin * x, const AnalyzedJoin * y)
&& x->table_join.strictness == y->table_join.strictness
&& x->key_names_left == y->key_names_left
&& x->key_names_right == y->key_names_right
&& x->columns_added_by_join == y->columns_added_by_join
&& x->hash_join == y->hash_join;
}
BlockInputStreamPtr AnalyzedJoin::createStreamWithNonJoinedDataIfFullOrRightJoin(const Block & source_header, UInt64 max_block_size) const
{
if (isRightOrFull(table_join.kind))
return hash_join->createStreamWithNonJoinedRows(source_header, *this, max_block_size);
return {};
}
JoinPtr AnalyzedJoin::makeHashJoin(const Block & sample_block, const SizeLimits & size_limits_for_join) const
{
auto join = std::make_shared<Join>(key_names_right, join_use_nulls, size_limits_for_join, table_join.kind, table_join.strictness);
join->setSampleBlock(sample_block);
return join;
}
void AnalyzedJoin::joinBlock(Block & block) const
{
hash_join->joinBlock(block, *this);
}
void AnalyzedJoin::joinTotals(Block & block) const
{
hash_join->joinTotals(block);
}
bool AnalyzedJoin::hasTotals() const
{
return hash_join->hasTotals();
&& x->columns_added_by_join == y->columns_added_by_join;
}
NamesAndTypesList getNamesAndTypeListFromTableExpression(const ASTTableExpression & table_expression, const Context & context)
@ -267,4 +260,11 @@ NamesAndTypesList getNamesAndTypeListFromTableExpression(const ASTTableExpressio
return names_and_type_list;
}
JoinPtr makeJoin(std::shared_ptr<AnalyzedJoin> table_join, const Block & right_sample_block)
{
if (table_join->partial_merge_join)
return std::make_shared<MergeJoin>(table_join, right_sample_block);
return std::make_shared<Join>(table_join, right_sample_block);
}
}

View File

@ -4,7 +4,9 @@
#include <Core/NamesAndTypes.h>
#include <Core/SettingsCommon.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Interpreters/IJoin.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <DataStreams/SizeLimits.h>
#include <utility>
#include <memory>
@ -17,8 +19,7 @@ class ASTSelectQuery;
struct DatabaseAndTableWithAlias;
class Block;
class Join;
using JoinPtr = std::shared_ptr<Join>;
struct Settings;
class AnalyzedJoin
{
@ -36,12 +37,15 @@ class AnalyzedJoin
friend class SyntaxAnalyzer;
const SizeLimits size_limits;
const bool join_use_nulls;
const bool partial_merge_join;
Names key_names_left;
Names key_names_right; /// Duplicating names are qualified.
ASTs key_asts_left;
ASTs key_asts_right;
ASTTableJoin table_join;
bool join_use_nulls = false;
/// All columns which can be read from joined table. Duplicating names are qualified.
NamesAndTypesList columns_from_joined_table;
@ -53,9 +57,28 @@ class AnalyzedJoin
/// Original name -> name. Only ranamed columns.
std::unordered_map<String, String> renames;
JoinPtr hash_join;
public:
AnalyzedJoin(const Settings &);
/// for StorageJoin
AnalyzedJoin(SizeLimits limits, bool use_nulls, ASTTableJoin::Kind kind, ASTTableJoin::Strictness strictness,
const Names & key_names_right_)
: size_limits(limits)
, join_use_nulls(use_nulls)
, partial_merge_join(false)
, key_names_right(key_names_right_)
{
table_join.kind = kind;
table_join.strictness = strictness;
}
ASTTableJoin::Kind kind() const { return table_join.kind; }
ASTTableJoin::Strictness strictness() const { return table_join.strictness; }
const SizeLimits & sizeLimits() const { return size_limits; }
bool forceNullableRight() const { return join_use_nulls && isLeftOrFull(table_join.kind); }
bool forceNullableLeft() const { return join_use_nulls && isRightOrFull(table_join.kind); }
void addUsingKey(const ASTPtr & ast);
void addOnKeys(ASTPtr & left_table_ast, ASTPtr & right_table_ast);
@ -69,6 +92,7 @@ public:
void deduplicateAndQualifyColumnNames(const NameSet & left_table_columns, const String & right_table_prefix);
size_t rightKeyInclusion(const String & name) const;
NameSet requiredRightKeys() const;
void addJoinedColumn(const NameAndTypePair & joined_column);
void addJoinedColumnsAndCorrectNullability(Block & sample_block) const;
@ -78,17 +102,12 @@ public:
Names requiredJoinedNames() const;
const Names & keyNamesLeft() const { return key_names_left; }
const Names & keyNamesRight() const { return key_names_right; }
const NamesAndTypesList & columnsFromJoinedTable() const { return columns_from_joined_table; }
const NamesAndTypesList & columnsAddedByJoin() const { return columns_added_by_join; }
void setHashJoin(JoinPtr join) { hash_join = join; }
JoinPtr makeHashJoin(const Block & sample_block, const SizeLimits & size_limits_for_join) const;
BlockInputStreamPtr createStreamWithNonJoinedDataIfFullOrRightJoin(const Block & source_header, UInt64 max_block_size) const;
void joinBlock(Block & block) const;
void joinTotals(Block & block) const;
bool hasTotals() const;
static bool sameJoin(const AnalyzedJoin * x, const AnalyzedJoin * y);
friend JoinPtr makeJoin(std::shared_ptr<AnalyzedJoin> table_join, const Block & right_sample_block);
};
struct ASTTableExpression;

View File

@ -160,11 +160,12 @@ ExpressionAction ExpressionAction::arrayJoin(const NameSet & array_joined_column
return a;
}
ExpressionAction ExpressionAction::ordinaryJoin(std::shared_ptr<AnalyzedJoin> table_join)
ExpressionAction ExpressionAction::ordinaryJoin(std::shared_ptr<AnalyzedJoin> table_join, JoinPtr join)
{
ExpressionAction a;
a.type = JOIN;
a.table_join = table_join;
a.join = join;
return a;
}
@ -475,7 +476,7 @@ void ExpressionAction::execute(Block & block, bool dry_run) const
case JOIN:
{
table_join->joinBlock(block);
join->joinBlock(block);
break;
}
@ -543,7 +544,7 @@ void ExpressionAction::executeOnTotals(Block & block) const
if (type != JOIN)
execute(block, false);
else
table_join->joinTotals(block);
join->joinTotals(block);
}
@ -763,7 +764,7 @@ void ExpressionActions::execute(Block & block, bool dry_run) const
bool ExpressionActions::hasTotalsInJoin() const
{
for (const auto & action : actions)
if (action.table_join && action.table_join->hasTotals())
if (action.table_join && action.join->hasTotals())
return true;
return false;
}
@ -1157,11 +1158,11 @@ void ExpressionActions::optimizeArrayJoin()
}
std::shared_ptr<const AnalyzedJoin> ExpressionActions::getTableJoin() const
JoinPtr ExpressionActions::getTableJoinAlgo() const
{
for (const auto & action : actions)
if (action.table_join)
return action.table_join;
if (action.join)
return action.join;
return {};
}

View File

@ -21,6 +21,8 @@ namespace ErrorCodes
}
class AnalyzedJoin;
class IJoin;
using JoinPtr = std::shared_ptr<IJoin>;
class IPreparedFunction;
using PreparedFunctionPtr = std::shared_ptr<IPreparedFunction>;
@ -101,6 +103,7 @@ public:
/// For JOIN
std::shared_ptr<const AnalyzedJoin> table_join;
JoinPtr join;
/// For PROJECT.
NamesWithAliases projection;
@ -116,7 +119,7 @@ public:
static ExpressionAction project(const Names & projected_columns_);
static ExpressionAction addAliases(const NamesWithAliases & aliased_columns_);
static ExpressionAction arrayJoin(const NameSet & array_joined_columns, bool array_join_is_left, const Context & context);
static ExpressionAction ordinaryJoin(std::shared_ptr<AnalyzedJoin> join);
static ExpressionAction ordinaryJoin(std::shared_ptr<AnalyzedJoin> table_join, JoinPtr join);
/// Which columns necessary to perform this action.
Names getNeededColumns() const;
@ -232,7 +235,7 @@ public:
static std::string getSmallestColumn(const NamesAndTypesList & columns);
std::shared_ptr<const AnalyzedJoin> getTableJoin() const;
JoinPtr getTableJoinAlgo() const;
const Settings & getSettings() const { return settings; }

View File

@ -30,6 +30,7 @@
#include <Interpreters/ExternalDictionaries.h>
#include <Interpreters/Set.h>
#include <Interpreters/AnalyzedJoin.h>
#include <Interpreters/Join.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/parseAggregateFunctionParameters.h>
@ -407,9 +408,9 @@ bool SelectQueryExpressionAnalyzer::appendArrayJoin(ExpressionActionsChain & cha
return true;
}
void ExpressionAnalyzer::addJoinAction(ExpressionActionsPtr & actions) const
void ExpressionAnalyzer::addJoinAction(ExpressionActionsPtr & actions, JoinPtr join) const
{
actions->add(ExpressionAction::ordinaryJoin(syntax->analyzed_join));
actions->add(ExpressionAction::ordinaryJoin(syntax->analyzed_join, join));
}
bool SelectQueryExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_types)
@ -418,13 +419,13 @@ bool SelectQueryExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, b
if (!ast_join)
return false;
makeTableJoin(*ast_join);
JoinPtr table_join = makeTableJoin(*ast_join);
initChain(chain, sourceColumns());
ExpressionActionsChain::Step & step = chain.steps.back();
getRootActions(analyzedJoin().leftKeysList(), only_types, step.actions);
addJoinAction(step.actions);
addJoinAction(step.actions, table_join);
return true;
}
@ -464,40 +465,40 @@ static ExpressionActionsPtr createJoinedBlockActions(const Context & context, co
return ExpressionAnalyzer(expression_list, syntax_result, context).getActions(true, false);
}
void SelectQueryExpressionAnalyzer::makeTableJoin(const ASTTablesInSelectQueryElement & join_element)
JoinPtr SelectQueryExpressionAnalyzer::makeTableJoin(const ASTTablesInSelectQueryElement & join_element)
{
/// Two JOINs are not supported with the same subquery, but different USINGs.
auto join_hash = join_element.getTreeHash();
String join_subquery_id = toString(join_hash.first) + "_" + toString(join_hash.second);
SubqueryForSet & subquery_for_set = subqueries_for_sets[join_subquery_id];
SubqueryForSet & subquery_for_join = subqueries_for_sets[join_subquery_id];
/// Special case - if table name is specified on the right of JOIN, then the table has the type Join (the previously prepared mapping).
if (!subquery_for_set.join)
subquery_for_set.join = tryGetStorageJoin(join_element, context);
if (!subquery_for_join.join)
subquery_for_join.join = tryGetStorageJoin(join_element, context);
if (!subquery_for_set.join)
if (!subquery_for_join.join)
{
/// Actions which need to be calculated on joined block.
ExpressionActionsPtr joined_block_actions = createJoinedBlockActions(context, analyzedJoin());
if (!subquery_for_set.source)
makeSubqueryForJoin(join_element, joined_block_actions, subquery_for_set);
/// Test actions on sample block (early error detection)
Block sample_block = subquery_for_set.renamedSampleBlock();
joined_block_actions->execute(sample_block);
if (!subquery_for_join.source)
{
NamesWithAliases required_columns_with_aliases =
analyzedJoin().getRequiredColumns(joined_block_actions->getSampleBlock(), joined_block_actions->getRequiredColumns());
makeSubqueryForJoin(join_element, std::move(required_columns_with_aliases), subquery_for_join);
}
/// TODO You do not need to set this up when JOIN is only needed on remote servers.
subquery_for_set.join = analyzedJoin().makeHashJoin(sample_block, settings.size_limits_for_join);
subquery_for_set.joined_block_actions = joined_block_actions;
subquery_for_join.setJoinActions(joined_block_actions); /// changes subquery_for_join.sample_block inside
subquery_for_join.join = makeJoin(syntax->analyzed_join, subquery_for_join.sample_block);
}
syntax->analyzed_join->setHashJoin(subquery_for_set.join);
return subquery_for_join.join;
}
void SelectQueryExpressionAnalyzer::makeSubqueryForJoin(const ASTTablesInSelectQueryElement & join_element,
const ExpressionActionsPtr & joined_block_actions,
NamesWithAliases && required_columns_with_aliases,
SubqueryForSet & subquery_for_set) const
{
/** For GLOBAL JOINs (in the case, for example, of the push method for executing GLOBAL subqueries), the following occurs
@ -505,10 +506,6 @@ void SelectQueryExpressionAnalyzer::makeSubqueryForJoin(const ASTTablesInSelectQ
* in the subquery_for_set object this subquery is exposed as source and the temporary table _data1 as the `table`.
* - this function shows the expression JOIN _data1.
*/
NamesWithAliases required_columns_with_aliases =
analyzedJoin().getRequiredColumns(joined_block_actions->getSampleBlock(), joined_block_actions->getRequiredColumns());
Names original_columns;
for (auto & pr : required_columns_with_aliases)
original_columns.push_back(pr.first);

View File

@ -20,6 +20,8 @@ class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
struct ASTTableJoin;
class IJoin;
using JoinPtr = std::shared_ptr<IJoin>;
class ASTFunction;
class ASTExpressionList;
@ -58,15 +60,11 @@ private:
struct ExtractedSettings
{
const bool use_index_for_in_with_subqueries;
const bool join_use_nulls;
const SizeLimits size_limits_for_set;
const SizeLimits size_limits_for_join;
ExtractedSettings(const Settings & settings_)
: use_index_for_in_with_subqueries(settings_.use_index_for_in_with_subqueries),
join_use_nulls(settings_.join_use_nulls),
size_limits_for_set(settings_.max_rows_in_set, settings_.max_bytes_in_set, settings_.set_overflow_mode),
size_limits_for_join(settings_.max_rows_in_join, settings_.max_bytes_in_join, settings_.join_overflow_mode)
size_limits_for_set(settings_.max_rows_in_set, settings_.max_bytes_in_set, settings_.set_overflow_mode)
{}
};
@ -127,7 +125,7 @@ protected:
void addMultipleArrayJoinAction(ExpressionActionsPtr & actions, bool is_left) const;
void addJoinAction(ExpressionActionsPtr & actions) const;
void addJoinAction(ExpressionActionsPtr & actions, JoinPtr = {}) const;
void getRootActions(const ASTPtr & ast, bool no_subqueries, ExpressionActionsPtr & actions, bool only_consts = false);
@ -219,8 +217,8 @@ private:
*/
void tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_or_table_name);
void makeTableJoin(const ASTTablesInSelectQueryElement & join_element);
void makeSubqueryForJoin(const ASTTablesInSelectQueryElement & join_element, const ExpressionActionsPtr & joined_block_actions,
JoinPtr makeTableJoin(const ASTTablesInSelectQueryElement & join_element);
void makeSubqueryForJoin(const ASTTablesInSelectQueryElement & join_element, NamesWithAliases && required_columns_with_aliases,
SubqueryForSet & subquery_for_set) const;
const ASTSelectQuery * getAggregatingQuery() const;

View File

@ -0,0 +1,39 @@
#pragma once
#include <memory>
#include <vector>
#include <Core/Names.h>
#include <Columns/IColumn.h>
#include <DataStreams/IBlockStream_fwd.h>
namespace DB
{
class Block;
class IJoin
{
public:
virtual ~IJoin() = default;
/// Add block of data from right hand of JOIN.
/// @returns false, if some limit was exceeded and you should not insert more data.
virtual bool addJoinedBlock(const Block & block) = 0;
/// Join the block with data from left hand of JOIN to the right hand data (that was previously built by calls to addJoinedBlock).
/// Could be called from different threads in parallel.
virtual void joinBlock(Block & block) = 0;
virtual bool hasTotals() const { return false; }
virtual void setTotals(const Block & block) = 0;
virtual void joinTotals(Block & block) const = 0;
virtual size_t getTotalRowCount() const = 0;
virtual BlockInputStreamPtr createStreamWithNonJoinedRows(const Block &, UInt64) const { return {}; }
};
using JoinPtr = std::shared_ptr<IJoin>;
}

View File

@ -1118,9 +1118,9 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
stream = std::make_shared<ExpressionBlockInputStream>(stream, expressions.before_join);
}
if (auto join = expressions.before_join->getTableJoin())
if (JoinPtr join = expressions.before_join->getTableJoinAlgo())
{
if (auto stream = join->createStreamWithNonJoinedDataIfFullOrRightJoin(header_before_join, settings.max_block_size))
if (auto stream = join->createStreamWithNonJoinedRows(header_before_join, settings.max_block_size))
{
if constexpr (pipeline_with_processors)
{

View File

@ -10,6 +10,7 @@
#include <DataTypes/DataTypeNullable.h>
#include <Interpreters/Join.h>
#include <Interpreters/join_common.h>
#include <Interpreters/AnalyzedJoin.h>
#include <Interpreters/joinDispatch.h>
#include <Interpreters/NullableUtils.h>
@ -35,35 +36,12 @@ namespace ErrorCodes
extern const int ILLEGAL_COLUMN;
}
static std::unordered_map<String, DataTypePtr> requiredRightKeys(const Names & key_names, const NamesAndTypesList & columns_added_by_join)
{
NameSet right_keys;
for (const auto & name : key_names)
right_keys.insert(name);
std::unordered_map<String, DataTypePtr> required;
for (const auto & column : columns_added_by_join)
if (right_keys.count(column.name))
required.insert({column.name, column.type});
return required;
}
static void convertColumnToNullable(ColumnWithTypeAndName & column)
{
if (column.type->isNullable() || !column.type->canBeInsideNullable())
return;
column.type = makeNullable(column.type);
if (column.column)
column.column = makeNullable(column.column);
}
/// Converts column to nullable if needed. No backward convertion.
static ColumnWithTypeAndName correctNullability(ColumnWithTypeAndName && column, bool nullable)
{
if (nullable)
convertColumnToNullable(column);
JoinCommon::convertColumnToNullable(column);
return std::move(column);
}
@ -71,7 +49,7 @@ static ColumnWithTypeAndName correctNullability(ColumnWithTypeAndName && column,
{
if (nullable)
{
convertColumnToNullable(column);
JoinCommon::convertColumnToNullable(column);
if (column.type->isNullable() && negative_null_map.size())
{
MutableColumnPtr mutable_column = (*std::move(column.column)).mutate();
@ -83,15 +61,18 @@ static ColumnWithTypeAndName correctNullability(ColumnWithTypeAndName && column,
}
Join::Join(const Names & key_names_right_, bool use_nulls_, const SizeLimits & limits_,
ASTTableJoin::Kind kind_, ASTTableJoin::Strictness strictness_, bool any_take_last_row_)
: kind(kind_), strictness(strictness_),
key_names_right(key_names_right_),
use_nulls(use_nulls_),
any_take_last_row(any_take_last_row_),
log(&Logger::get("Join")),
limits(limits_)
Join::Join(std::shared_ptr<AnalyzedJoin> table_join_, const Block & right_sample_block, bool any_take_last_row_)
: table_join(table_join_)
, kind(table_join->kind())
, strictness(table_join->strictness())
, key_names_right(table_join->keyNamesRight())
, required_right_keys(table_join->requiredRightKeys())
, nullable_right_side(table_join->forceNullableRight())
, nullable_left_side(table_join->forceNullableLeft())
, any_take_last_row(any_take_last_row_)
, log(&Logger::get("Join"))
{
setSampleBlock(right_sample_block);
}
@ -269,42 +250,15 @@ size_t Join::getTotalByteCount() const
void Join::setSampleBlock(const Block & block)
{
std::unique_lock lock(rwlock);
/// You have to restore this lock if you call the fuction outside of ctor.
//std::unique_lock lock(rwlock);
LOG_DEBUG(log, "setSampleBlock: " << block.dumpStructure());
if (!empty())
return;
size_t keys_size = key_names_right.size();
ColumnRawPtrs key_columns(keys_size);
sample_block_with_columns_to_add = materializeBlock(block);
for (size_t i = 0; i < keys_size; ++i)
{
const String & column_name = key_names_right[i];
/// there could be the same key names
if (sample_block_with_keys.has(column_name))
{
key_columns[i] = sample_block_with_keys.getByName(column_name).column.get();
continue;
}
auto & col = sample_block_with_columns_to_add.getByName(column_name);
col.column = recursiveRemoveLowCardinality(col.column);
col.type = recursiveRemoveLowCardinality(col.type);
/// Extract right keys with correct keys order.
sample_block_with_keys.insert(col);
sample_block_with_columns_to_add.erase(column_name);
key_columns[i] = sample_block_with_keys.getColumns().back().get();
/// We will join only keys, where all components are not NULL.
if (auto * nullable = checkAndGetColumn<ColumnNullable>(*key_columns[i]))
key_columns[i] = &nullable->getNestedColumn();
}
ColumnRawPtrs key_columns = JoinCommon::extractKeysForJoin(key_names_right, block, right_table_keys, sample_block_with_columns_to_add);
if (strictness == ASTTableJoin::Strictness::Asof)
{
@ -343,19 +297,10 @@ void Join::setSampleBlock(const Block & block)
blocklist_sample = Block(block.getColumnsWithTypeAndName());
prepareBlockListStructure(blocklist_sample);
size_t num_columns_to_add = sample_block_with_columns_to_add.columns();
JoinCommon::createMissedColumns(sample_block_with_columns_to_add);
for (size_t i = 0; i < num_columns_to_add; ++i)
{
auto & column = sample_block_with_columns_to_add.getByPosition(i);
if (!column.column)
column.column = column.type->createColumn();
}
/// In case of LEFT and FULL joins, if use_nulls, convert joined columns to Nullable.
if (use_nulls && isLeftOrFull(kind))
for (size_t i = 0; i < num_columns_to_add; ++i)
convertColumnToNullable(sample_block_with_columns_to_add.getByPosition(i));
if (nullable_right_side)
JoinCommon::convertColumnsToNullable(sample_block_with_columns_to_add);
}
namespace
@ -504,26 +449,16 @@ void Join::prepareBlockListStructure(Block & stored_block)
}
}
bool Join::insertFromBlock(const Block & block)
bool Join::addJoinedBlock(const Block & block)
{
std::unique_lock lock(rwlock);
if (empty())
throw Exception("Logical error: Join was not initialized", ErrorCodes::LOGICAL_ERROR);
size_t keys_size = key_names_right.size();
ColumnRawPtrs key_columns(keys_size);
/// Rare case, when keys are constant. To avoid code bloat, simply materialize them.
Columns materialized_columns;
materialized_columns.reserve(keys_size);
/// Memoize key columns to work.
for (size_t i = 0; i < keys_size; ++i)
{
materialized_columns.emplace_back(recursiveRemoveLowCardinality(block.getByName(key_names_right[i]).column->convertToFullColumnIfConst()));
key_columns[i] = materialized_columns.back().get();
}
ColumnRawPtrs key_columns = JoinCommon::temporaryMaterializeColumns(block, key_names_right, materialized_columns);
/// We will insert to the map only keys, where all components are not NULL.
ConstNullMapPtr null_map{};
@ -536,20 +471,11 @@ bool Join::insertFromBlock(const Block & block)
prepareBlockListStructure(*stored_block);
size_t size = stored_block->columns();
/// Rare case, when joined columns are constant. To avoid code bloat, simply materialize them.
for (size_t i = 0; i < size; ++i)
stored_block->safeGetByPosition(i).column = stored_block->safeGetByPosition(i).column->convertToFullColumnIfConst();
materializeBlockInplace(*stored_block);
/// In case of LEFT and FULL joins, if use_nulls, convert joined columns to Nullable.
if (use_nulls && isLeftOrFull(kind))
{
for (size_t i = isFull(kind) ? keys_size : 0; i < size; ++i)
{
convertColumnToNullable(stored_block->getByPosition(i));
}
}
if (nullable_right_side)
JoinCommon::convertColumnsToNullable(*stored_block, (isFull(kind) ? key_names_right.size() : 0));
if (kind != ASTTableJoin::Kind::Cross)
{
@ -570,7 +496,7 @@ bool Join::insertFromBlock(const Block & block)
blocks_nullmaps.emplace_back(stored_block, null_map_holder);
}
return limits.check(getTotalRowCount(), getTotalByteCount(), "JOIN", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED);
return table_join->sizeLimits().check(getTotalRowCount(), getTotalByteCount(), "JOIN", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED);
}
@ -783,23 +709,12 @@ template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename
void Join::joinBlockImpl(
Block & block,
const Names & key_names_left,
const NamesAndTypesList & columns_added_by_join,
const Block & block_with_columns_to_add,
const Maps & maps_) const
{
size_t keys_size = key_names_left.size();
ColumnRawPtrs key_columns(keys_size);
/// Rare case, when keys are constant. To avoid code bloat, simply materialize them.
Columns materialized_columns;
materialized_columns.reserve(keys_size);
/// Memoize key columns to work with.
for (size_t i = 0; i < keys_size; ++i)
{
materialized_columns.emplace_back(recursiveRemoveLowCardinality(block.getByName(key_names_left[i]).column->convertToFullColumnIfConst()));
key_columns[i] = materialized_columns.back().get();
}
ColumnRawPtrs key_columns = JoinCommon::temporaryMaterializeColumns(block, key_names_left, materialized_columns);
/// Keys with NULL value in any column won't join to anything.
ConstNullMapPtr null_map{};
@ -814,12 +729,10 @@ void Join::joinBlockImpl(
constexpr bool right_or_full = static_in_v<KIND, ASTTableJoin::Kind::Right, ASTTableJoin::Kind::Full>;
if constexpr (right_or_full)
{
for (size_t i = 0; i < existing_columns; ++i)
{
block.getByPosition(i).column = block.getByPosition(i).column->convertToFullColumnIfConst();
if (use_nulls)
convertColumnToNullable(block.getByPosition(i));
}
materializeBlockInplace(block);
if (nullable_left_side)
JoinCommon::convertColumnsToNullable(block);
}
/** For LEFT/INNER JOIN, the saved blocks do not contain keys.
@ -829,7 +742,7 @@ void Join::joinBlockImpl(
*/
ColumnsWithTypeAndName extras;
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof)
extras.push_back(sample_block_with_keys.getByName(key_names_right.back()));
extras.push_back(right_table_keys.getByName(key_names_right.back()));
AddedColumns added(sample_block_with_columns_to_add, block_with_columns_to_add, block, blocklist_sample, extras);
std::unique_ptr<IColumn::Offsets> offsets_to_replicate;
@ -841,11 +754,8 @@ void Join::joinBlockImpl(
block.insert(added.moveColumn(i));
/// Filter & insert missing rows
auto right_keys = requiredRightKeys(key_names_right, columns_added_by_join);
constexpr bool is_all_join = STRICTNESS == ASTTableJoin::Strictness::All;
constexpr bool inner_or_right = static_in_v<KIND, ASTTableJoin::Kind::Inner, ASTTableJoin::Kind::Right>;
constexpr bool left_or_full = static_in_v<KIND, ASTTableJoin::Kind::Left, ASTTableJoin::Kind::Full>;
std::vector<size_t> right_keys_to_replicate [[maybe_unused]];
@ -856,17 +766,16 @@ void Join::joinBlockImpl(
block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(row_filter, -1);
/// Add join key columns from right block if they has different name.
for (size_t i = 0; i < key_names_right.size(); ++i)
for (size_t i = 0; i < right_table_keys.columns(); ++i)
{
auto & right_name = key_names_right[i];
const auto & right_key = right_table_keys.getByPosition(i);
auto & left_name = key_names_left[i];
auto it = right_keys.find(right_name);
if (it != right_keys.end() && !block.has(right_name))
if (required_right_keys.count(right_key.name) && !block.has(right_key.name))
{
const auto & col = block.getByName(left_name);
bool is_nullable = it->second->isNullable();
block.insert(correctNullability({col.column, col.type, right_name}, is_nullable));
bool is_nullable = nullable_right_side || right_key.type->isNullable();
block.insert(correctNullability({col.column, col.type, right_key.name}, is_nullable));
}
}
}
@ -879,13 +788,12 @@ void Join::joinBlockImpl(
const IColumn::Filter & filter = null_map_filter.getData();
/// Add join key columns from right block if they has different name.
for (size_t i = 0; i < key_names_right.size(); ++i)
for (size_t i = 0; i < right_table_keys.columns(); ++i)
{
auto & right_name = key_names_right[i];
const auto & right_key = right_table_keys.getByPosition(i);
auto & left_name = key_names_left[i];
auto it = right_keys.find(right_name);
if (it != right_keys.end() && !block.has(right_name))
if (required_right_keys.count(right_key.name) && !block.has(right_key.name))
{
const auto & col = block.getByName(left_name);
ColumnPtr column = col.column->convertToFullColumnIfConst();
@ -900,11 +808,11 @@ void Join::joinBlockImpl(
mut_column->insertDefault();
}
bool is_nullable = (use_nulls && left_or_full) || it->second->isNullable();
block.insert(correctNullability({std::move(mut_column), col.type, right_name}, is_nullable, null_map_filter));
bool is_nullable = nullable_right_side || right_key.type->isNullable();
block.insert(correctNullability({std::move(mut_column), col.type, right_key.name}, is_nullable, null_map_filter));
if constexpr (is_all_join)
right_keys_to_replicate.push_back(block.getPositionByName(right_name));
right_keys_to_replicate.push_back(block.getPositionByName(right_key.name));
}
}
}
@ -974,27 +882,6 @@ void Join::joinBlockImplCross(Block & block) const
block = block.cloneWithColumns(std::move(dst_columns));
}
void Join::checkTypesOfKeys(const Block & block_left, const Names & key_names_left, const Block & block_right) const
{
size_t keys_size = key_names_left.size();
for (size_t i = 0; i < keys_size; ++i)
{
/// Compare up to Nullability.
DataTypePtr left_type = removeNullable(recursiveRemoveLowCardinality(block_left.getByName(key_names_left[i]).type));
DataTypePtr right_type = removeNullable(recursiveRemoveLowCardinality(block_right.getByName(key_names_right[i]).type));
if (!left_type->equals(*right_type))
throw Exception("Type mismatch of columns to JOIN by: "
+ key_names_left[i] + " " + left_type->getName() + " at left, "
+ key_names_right[i] + " " + right_type->getName() + " at right",
ErrorCodes::TYPE_MISMATCH);
}
}
static void checkTypeOfKey(const Block & block_left, const Block & block_right)
{
auto & [c1, left_type_origin, left_name] = block_left.safeGetByPosition(0);
@ -1024,7 +911,7 @@ template <typename Maps>
void Join::joinGetImpl(Block & block, const String & column_name, const Maps & maps_) const
{
joinBlockImpl<ASTTableJoin::Kind::Left, ASTTableJoin::Strictness::Any>(
block, {block.getByPosition(0).name}, {}, {sample_block_with_columns_to_add.getByName(column_name)}, maps_);
block, {block.getByPosition(0).name}, {sample_block_with_columns_to_add.getByName(column_name)}, maps_);
}
@ -1038,7 +925,7 @@ void Join::joinGet(Block & block, const String & column_name) const
if (key_names_right.size() != 1)
throw Exception("joinGet only supports StorageJoin containing exactly one key", ErrorCodes::LOGICAL_ERROR);
checkTypeOfKey(block, sample_block_with_keys);
checkTypeOfKey(block, right_table_keys);
if (kind == ASTTableJoin::Kind::Left && strictness == ASTTableJoin::Strictness::Any)
{
@ -1049,18 +936,16 @@ void Join::joinGet(Block & block, const String & column_name) const
}
void Join::joinBlock(Block & block, const AnalyzedJoin & join_params) const
void Join::joinBlock(Block & block)
{
const Names & key_names_left = join_params.keyNamesLeft();
const NamesAndTypesList & columns_added_by_join = join_params.columnsAddedByJoin();
std::shared_lock lock(rwlock);
checkTypesOfKeys(block, key_names_left, sample_block_with_keys);
const Names & key_names_left = table_join->keyNamesLeft();
JoinCommon::checkTypesOfKeys(block, key_names_left, right_table_keys, key_names_right);
if (joinDispatch(kind, strictness, maps, [&](auto kind_, auto strictness_, auto & map)
{
joinBlockImpl<kind_, strictness_>(block, key_names_left, columns_added_by_join, sample_block_with_columns_to_add, map);
joinBlockImpl<kind_, strictness_>(block, key_names_left, sample_block_with_columns_to_add, map);
}))
{
/// Joined
@ -1157,11 +1042,12 @@ struct AdderNonJoined<ASTTableJoin::Strictness::Asof, Mapped>
class NonJoinedBlockInputStream : public IBlockInputStream
{
public:
NonJoinedBlockInputStream(const Join & parent_, const Block & left_sample_block, const Names & key_names_left,
const NamesAndTypesList & columns_added_by_join, UInt64 max_block_size_)
NonJoinedBlockInputStream(const Join & parent_, const Block & left_sample_block, UInt64 max_block_size_)
: parent(parent_)
, max_block_size(max_block_size_)
{
const Names & key_names_left = parent_.table_join->keyNamesLeft();
/** left_sample_block contains keys and "left" columns.
* result_sample_block - keys, "left" columns, and "right" columns.
*/
@ -1180,10 +1066,9 @@ public:
const Block & right_sample_block = parent.sample_block_with_columns_to_add;
std::unordered_map<size_t, size_t> left_to_right_key_map;
makeResultSampleBlock(left_sample_block, right_sample_block, columns_added_by_join,
key_positions_left, left_to_right_key_map);
makeResultSampleBlock(left_sample_block, right_sample_block, key_positions_left, left_to_right_key_map);
auto nullability_changes = getNullabilityChanges(parent.sample_block_with_keys, result_sample_block,
auto nullability_changes = getNullabilityChanges(parent.right_table_keys, result_sample_block,
key_positions_left, left_to_right_key_map);
column_indices_left.reserve(left_sample_block.columns() - key_names_left.size());
@ -1249,16 +1134,12 @@ private:
void makeResultSampleBlock(const Block & left_sample_block, const Block & right_sample_block,
const NamesAndTypesList & columns_added_by_join,
const std::vector<size_t> & key_positions_left,
std::unordered_map<size_t, size_t> & left_to_right_key_map)
{
result_sample_block = materializeBlock(left_sample_block);
/// Convert left columns to Nullable if allowed
if (parent.use_nulls)
for (size_t i = 0; i < result_sample_block.columns(); ++i)
convertColumnToNullable(result_sample_block.getByPosition(i));
if (parent.nullable_left_side)
JoinCommon::convertColumnsToNullable(result_sample_block);
/// Add columns from the right-side table to the block.
for (size_t i = 0; i < right_sample_block.columns(); ++i)
@ -1268,23 +1149,19 @@ private:
result_sample_block.insert(src_column.cloneEmpty());
}
const auto & key_names_right = parent.key_names_right;
auto right_keys = requiredRightKeys(key_names_right, columns_added_by_join);
/// Add join key columns from right block if they has different name.
for (size_t i = 0; i < key_names_right.size(); ++i)
for (size_t i = 0; i < parent.right_table_keys.columns(); ++i)
{
auto & right_name = key_names_right[i];
const auto & right_key = parent.right_table_keys.getByPosition(i);
size_t left_key_pos = key_positions_left[i];
auto it = right_keys.find(right_name);
if (it != right_keys.end() && !result_sample_block.has(right_name))
if (parent.required_right_keys.count(right_key.name) && !result_sample_block.has(right_key.name))
{
const auto & col = result_sample_block.getByPosition(left_key_pos);
bool is_nullable = (parent.use_nulls && isFull(parent.kind)) || it->second->isNullable();
result_sample_block.insert(correctNullability({col.column, col.type, right_name}, is_nullable));
bool is_nullable = (parent.nullable_right_side && isFull(parent.kind)) || right_key.type->isNullable();
result_sample_block.insert(correctNullability({col.column, col.type, right_key.name}, is_nullable));
size_t right_key_pos = result_sample_block.getPositionByName(right_name);
size_t right_key_pos = result_sample_block.getPositionByName(right_key.name);
left_to_right_key_map[left_key_pos] = right_key_pos;
}
}
@ -1418,7 +1295,7 @@ private:
}
}
static std::unordered_set<size_t> getNullabilityChanges(const Block & sample_block_with_keys, const Block & out_block,
static std::unordered_set<size_t> getNullabilityChanges(const Block & right_table_keys, const Block & out_block,
const std::vector<size_t> & key_positions,
const std::unordered_map<size_t, size_t> & left_to_right_key_map)
{
@ -1433,7 +1310,7 @@ private:
key_pos = it->second;
const auto & dst = out_block.getByPosition(key_pos).column;
const auto & src = sample_block_with_keys.getByPosition(i).column;
const auto & src = right_table_keys.getByPosition(i).column;
if (dst->isNullable() != src->isNullable())
nullability_changes.insert(key_pos);
}
@ -1461,12 +1338,11 @@ private:
};
BlockInputStreamPtr Join::createStreamWithNonJoinedRows(const Block & left_sample_block, const AnalyzedJoin & join_params,
UInt64 max_block_size) const
BlockInputStreamPtr Join::createStreamWithNonJoinedRows(const Block & left_sample_block, UInt64 max_block_size) const
{
return std::make_shared<NonJoinedBlockInputStream>(*this, left_sample_block,
join_params.keyNamesLeft(), join_params.columnsAddedByJoin(), max_block_size);
if (isRightOrFull(table_join->kind()))
return std::make_shared<NonJoinedBlockInputStream>(*this, left_sample_block, max_block_size);
return {};
}
}

View File

@ -7,6 +7,7 @@
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Interpreters/IJoin.h>
#include <Interpreters/AggregationCommon.h>
#include <Interpreters/RowRefs.h>
#include <Core/SettingsCommon.h>
@ -120,30 +121,22 @@ using MappedAsof = WithFlags<AsofRowRefs, false>;
* If it is true, we always generate Nullable column and substitute NULLs for non-joined rows,
* as in standard SQL.
*/
class Join
class Join : public IJoin
{
public:
Join(const Names & key_names_right_, bool use_nulls_, const SizeLimits & limits_,
ASTTableJoin::Kind kind_, ASTTableJoin::Strictness strictness_, bool any_take_last_row_ = false);
Join(std::shared_ptr<AnalyzedJoin> table_join_, const Block & right_sample_block, bool any_take_last_row_ = false);
bool empty() { return type == Type::EMPTY; }
bool isNullUsedAsDefault() const { return use_nulls; }
/** Set information about structure of right hand of JOIN (joined data).
* You must call this method before subsequent calls to insertFromBlock.
*/
void setSampleBlock(const Block & block);
/** Add block of data from right hand of JOIN to the map.
* Returns false, if some limit was exceeded and you should not insert more data.
*/
bool insertFromBlock(const Block & block);
bool addJoinedBlock(const Block & block) override;
/** Join data from the map (that was previously built by calls to insertFromBlock) to the block with data from "left" table.
/** Join data from the map (that was previously built by calls to addJoinedBlock) to the block with data from "left" table.
* Could be called from different threads in parallel.
*/
void joinBlock(Block & block, const AnalyzedJoin & join_params) const;
void joinBlock(Block & block) override;
/// Infer the return type for joinGet function
DataTypePtr joinGetReturnType(const String & column_name) const;
@ -153,21 +146,20 @@ public:
/** Keep "totals" (separate part of dataset, see WITH TOTALS) to use later.
*/
void setTotals(const Block & block) { totals = block; }
bool hasTotals() const { return totals; }
void setTotals(const Block & block) override { totals = block; }
bool hasTotals() const override { return totals; }
void joinTotals(Block & block) const;
void joinTotals(Block & block) const override;
/** For RIGHT and FULL JOINs.
* A stream that will contain default values from left table, joined with rows from right table, that was not joined before.
* Use only after all calls to joinBlock was done.
* left_sample_block is passed without account of 'use_nulls' setting (columns will be converted to Nullable inside).
*/
BlockInputStreamPtr createStreamWithNonJoinedRows(const Block & left_sample_block, const AnalyzedJoin & join_params,
UInt64 max_block_size) const;
BlockInputStreamPtr createStreamWithNonJoinedRows(const Block & left_sample_block, UInt64 max_block_size) const override;
/// Number of keys in all built JOIN maps.
size_t getTotalRowCount() const;
size_t getTotalRowCount() const override;
/// Sum size in bytes of all buffers, used for JOIN maps and for all memory pools.
size_t getTotalByteCount() const;
@ -282,14 +274,19 @@ private:
friend class NonJoinedBlockInputStream;
friend class JoinBlockInputStream;
std::shared_ptr<AnalyzedJoin> table_join;
ASTTableJoin::Kind kind;
ASTTableJoin::Strictness strictness;
/// Names of key columns (columns for equi-JOIN) in "right" table (in the order they appear in USING clause).
const Names key_names_right;
/// Names of key columns in right-side table (in the order they appear in ON/USING clause). @note It could contain duplicates.
const Names & key_names_right;
/// Names right-side table keys that are needed in result (would be attached after joined columns).
const NameSet required_right_keys;
/// Substitute NULLs for non-JOINed rows.
bool use_nulls;
/// In case of LEFT and FULL joins, if use_nulls, convert right-side columns to Nullable.
bool nullable_right_side;
/// In case of RIGHT and FULL joins, if use_nulls, convert left-side columns to Nullable.
bool nullable_left_side;
/// Overwrite existing values when encountering the same key again
bool any_take_last_row;
@ -315,17 +312,14 @@ private:
/// Block with columns from the right-side table except key columns.
Block sample_block_with_columns_to_add;
/// Block with key columns in the same order they appear in the right-side table.
Block sample_block_with_keys;
/// Block with key columns in the same order they appear in the right-side table (duplicates appear once).
Block right_table_keys;
/// Block as it would appear in the BlockList
Block blocklist_sample;
Poco::Logger * log;
/// Limits for maximum map size.
SizeLimits limits;
Block totals;
/** Protect state for concurrent use in insertFromBlock and joinBlock.
@ -337,19 +331,19 @@ private:
void init(Type type_);
/** Set information about structure of right hand of JOIN (joined data).
*/
void setSampleBlock(const Block & block);
/** Take an inserted block and discard everything that does not need to be stored
* Example, remove the keys as they come from the LHS block, but do keep the ASOF timestamps
*/
void prepareBlockListStructure(Block & stored_block);
/// Throw an exception if blocks have different types of key columns.
void checkTypesOfKeys(const Block & block_left, const Names & key_names_left, const Block & block_right) const;
template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Maps>
void joinBlockImpl(
Block & block,
const Names & key_names_left,
const NamesAndTypesList & columns_added_by_join,
const Block & block_with_columns_to_add,
const Maps & maps) const;
@ -359,7 +353,4 @@ private:
void joinGetImpl(Block & block, const String & column_name, const Maps & maps) const;
};
using JoinPtr = std::shared_ptr<Join>;
using Joins = std::vector<JoinPtr>;
}

View File

@ -0,0 +1,392 @@
#include <Core/NamesAndTypes.h>
#include <Core/SortCursor.h>
#include <Columns/ColumnNullable.h>
#include <Interpreters/MergeJoin.h>
#include <Interpreters/AnalyzedJoin.h>
#include <Interpreters/sortBlock.h>
#include <Interpreters/join_common.h>
#include <DataStreams/materializeBlock.h>
#include <DataStreams/MergeSortingBlockInputStream.h>
namespace DB
{
namespace ErrorCodes
{
extern const int SET_SIZE_LIMIT_EXCEEDED;
extern const int NOT_IMPLEMENTED;
}
struct MergeJoinEqualRange
{
size_t left_start = 0;
size_t right_start = 0;
size_t left_length = 0;
size_t right_length = 0;
bool empty() const { return !left_length && !right_length; }
};
using Range = MergeJoinEqualRange;
class MergeJoinCursor
{
public:
MergeJoinCursor(const Block & block, const SortDescription & desc_)
: impl(SortCursorImpl(block, desc_))
{}
size_t position() const { return impl.pos; }
size_t end() const { return impl.rows; }
bool atEnd() const { return impl.pos >= impl.rows; }
void nextN(size_t num) { impl.pos += num; }
int compareAt(const MergeJoinCursor & rhs, size_t lhs_pos, size_t rhs_pos) const
{
int res = 0;
for (size_t i = 0; i < impl.sort_columns_size; ++i)
{
res = impl.sort_columns[i]->compareAt(lhs_pos, rhs_pos, *(rhs.impl.sort_columns[i]), 1);
if (res)
break;
}
return res;
}
bool sameNext(size_t lhs_pos) const
{
if (impl.isLast())
return false;
for (size_t i = 0; i < impl.sort_columns_size; ++i)
if (impl.sort_columns[i]->compareAt(lhs_pos, lhs_pos + 1, *(impl.sort_columns[i]), 1) != 0)
return false;
return true;
}
size_t getEqualLength()
{
if (atEnd())
return 0;
size_t pos = impl.pos;
for (; pos < impl.rows; ++pos)
if (!sameNext(pos))
break;
return pos - impl.pos + 1;
}
Range getNextEqualRange(MergeJoinCursor & rhs)
{
while (!atEnd() && !rhs.atEnd())
{
int cmp = compareAt(rhs, impl.pos, rhs.impl.pos);
if (cmp < 0)
impl.next();
if (cmp > 0)
rhs.impl.next();
if (!cmp)
{
Range range{impl.pos, rhs.impl.pos, 0, 0};
range.left_length = getEqualLength();
range.right_length = rhs.getEqualLength();
return range;
}
}
return Range{impl.pos, rhs.impl.pos, 0, 0};
}
private:
SortCursorImpl impl;
};
namespace
{
MutableColumns makeMutableColumns(const Block & block, size_t rows_to_reserve = 0)
{
MutableColumns columns;
columns.reserve(block.columns());
for (const auto & src_column : block)
{
columns.push_back(src_column.column->cloneEmpty());
columns.back()->reserve(rows_to_reserve);
}
return columns;
}
void makeSortAndMerge(const Names & keys, SortDescription & sort, SortDescription & merge)
{
NameSet unique_keys;
for (auto & key_name : keys)
{
merge.emplace_back(SortColumnDescription(key_name, 1, 1));
if (!unique_keys.count(key_name))
{
unique_keys.insert(key_name);
sort.emplace_back(SortColumnDescription(key_name, 1, 1));
}
}
}
void copyLeftRange(const Block & block, MutableColumns & columns, size_t start, size_t rows_to_add)
{
for (size_t i = 0; i < block.columns(); ++i)
{
const auto & src_column = block.getByPosition(i).column;
columns[i]->insertRangeFrom(*src_column, start, rows_to_add);
}
}
void copyRightRange(const Block & right_block, const Block & right_columns_to_add, MutableColumns & columns,
size_t row_position, size_t rows_to_add)
{
for (size_t i = 0; i < right_columns_to_add.columns(); ++i)
{
const auto & src_column = right_block.getByName(right_columns_to_add.getByPosition(i).name).column;
auto & dst_column = columns[i];
auto * dst_nullable = typeid_cast<ColumnNullable *>(dst_column.get());
if (dst_nullable && !isColumnNullable(*src_column))
dst_nullable->insertRangeFromNotNullable(*src_column, row_position, rows_to_add);
else
dst_column->insertRangeFrom(*src_column, row_position, rows_to_add);
}
}
void joinEqualsAnyLeft(const Block & right_block, const Block & right_columns_to_add, MutableColumns & right_columns, const Range & range)
{
copyRightRange(right_block, right_columns_to_add, right_columns, range.right_start, range.left_length);
}
void joinEquals(const Block & left_block, const Block & right_block, const Block & right_columns_to_add,
MutableColumns & left_columns, MutableColumns & right_columns, const Range & range, bool is_all)
{
size_t left_rows_to_add = range.left_length;
size_t right_rows_to_add = is_all ? range.right_length : 1;
size_t row_position = range.right_start;
for (size_t right_row = 0; right_row < right_rows_to_add; ++right_row, ++row_position)
{
copyLeftRange(left_block, left_columns, range.left_start, left_rows_to_add);
copyRightRange(right_block, right_columns_to_add, right_columns, row_position, left_rows_to_add);
}
}
void appendNulls(MutableColumns & right_columns, size_t rows_to_add)
{
for (auto & column : right_columns)
for (size_t i = 0; i < rows_to_add; ++i)
column->insertDefault();
}
void joinInequalsLeft(const Block & left_block, MutableColumns & left_columns, MutableColumns & right_columns,
size_t start, size_t end, bool copy_left)
{
if (end <= start)
return;
size_t rows_to_add = end - start;
if (copy_left)
copyLeftRange(left_block, left_columns, start, rows_to_add);
appendNulls(right_columns, rows_to_add);
}
}
MergeJoin::MergeJoin(std::shared_ptr<AnalyzedJoin> table_join_, const Block & right_sample_block)
: table_join(table_join_)
, nullable_right_side(table_join->forceNullableRight())
, is_all(table_join->strictness() == ASTTableJoin::Strictness::All)
, is_inner(isInner(table_join->kind()))
, is_left(isLeft(table_join->kind()))
{
if (!isLeft(table_join->kind()) && !isInner(table_join->kind()))
throw Exception("Partial merge supported for LEFT and INNER JOINs only", ErrorCodes::NOT_IMPLEMENTED);
JoinCommon::extractKeysForJoin(table_join->keyNamesRight(), right_sample_block, right_table_keys, right_columns_to_add);
const NameSet required_right_keys = table_join->requiredRightKeys();
for (const auto & column : right_table_keys)
if (required_right_keys.count(column.name))
right_columns_to_add.insert(ColumnWithTypeAndName{nullptr, column.type, column.name});
JoinCommon::removeLowCardinalityInplace(right_columns_to_add);
JoinCommon::createMissedColumns(right_columns_to_add);
if (nullable_right_side)
JoinCommon::convertColumnsToNullable(right_columns_to_add);
makeSortAndMerge(table_join->keyNamesLeft(), left_sort_description, left_merge_description);
makeSortAndMerge(table_join->keyNamesRight(), right_sort_description, right_merge_description);
}
void MergeJoin::setTotals(const Block & totals_block)
{
totals = totals_block;
mergeRightBlocks();
}
void MergeJoin::mergeRightBlocks()
{
const size_t max_merged_block_size = 128 * 1024 * 1024;
if (right_blocks.empty())
return;
Blocks unsorted_blocks;
unsorted_blocks.reserve(right_blocks.size());
for (const auto & block : right_blocks)
unsorted_blocks.push_back(block);
/// TODO: there should be no splitted keys by blocks for RIGHT|FULL JOIN
MergeSortingBlocksBlockInputStream stream(unsorted_blocks, right_sort_description, max_merged_block_size);
right_blocks.clear();
while (Block block = stream.read())
right_blocks.push_back(block);
}
bool MergeJoin::addJoinedBlock(const Block & src_block)
{
Block block = materializeBlock(src_block);
JoinCommon::removeLowCardinalityInplace(block);
sortBlock(block, right_sort_description);
std::unique_lock lock(rwlock);
right_blocks.push_back(block);
right_blocks_row_count += block.rows();
right_blocks_bytes += block.bytes();
return table_join->sizeLimits().check(right_blocks_row_count, right_blocks_bytes, "JOIN", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED);
}
void MergeJoin::joinBlock(Block & block)
{
JoinCommon::checkTypesOfKeys(block, table_join->keyNamesLeft(), right_table_keys, table_join->keyNamesRight());
materializeBlockInplace(block);
JoinCommon::removeLowCardinalityInplace(block);
sortBlock(block, left_sort_description);
std::shared_lock lock(rwlock);
size_t rows_to_reserve = is_left ? block.rows() : 0;
MutableColumns left_columns = makeMutableColumns(block, (is_all ? rows_to_reserve : 0));
MutableColumns right_columns = makeMutableColumns(right_columns_to_add, rows_to_reserve);
MergeJoinCursor left_cursor(block, left_merge_description);
size_t left_key_tail = 0;
if (is_left)
{
for (auto it = right_blocks.begin(); it != right_blocks.end(); ++it)
{
if (left_cursor.atEnd())
break;
leftJoin(left_cursor, block, *it, left_columns, right_columns, left_key_tail);
}
left_cursor.nextN(left_key_tail);
joinInequalsLeft(block, left_columns, right_columns, left_cursor.position(), left_cursor.end(), is_all);
//left_cursor.nextN(left_cursor.end() - left_cursor.position());
changeLeftColumns(block, std::move(left_columns));
addRightColumns(block, std::move(right_columns));
}
else if (is_inner)
{
for (auto it = right_blocks.begin(); it != right_blocks.end(); ++it)
{
if (left_cursor.atEnd())
break;
innerJoin(left_cursor, block, *it, left_columns, right_columns, left_key_tail);
}
left_cursor.nextN(left_key_tail);
changeLeftColumns(block, std::move(left_columns));
addRightColumns(block, std::move(right_columns));
}
}
void MergeJoin::leftJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block,
MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail)
{
MergeJoinCursor right_cursor(right_block, right_merge_description);
while (!left_cursor.atEnd() && !right_cursor.atEnd())
{
size_t left_position = left_cursor.position(); /// save inequal position
Range range = left_cursor.getNextEqualRange(right_cursor);
joinInequalsLeft(left_block, left_columns, right_columns, left_position, range.left_start, is_all);
if (range.empty())
break;
if (is_all)
joinEquals(left_block, right_block, right_columns_to_add, left_columns, right_columns, range, is_all);
else
joinEqualsAnyLeft(right_block, right_columns_to_add, right_columns, range);
right_cursor.nextN(range.right_length);
/// Do not run over last left keys for ALL JOIN (cause of possible duplicates in next right block)
if (is_all && right_cursor.atEnd())
{
left_key_tail = range.left_length;
break;
}
left_cursor.nextN(range.left_length);
}
}
void MergeJoin::innerJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block,
MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail)
{
MergeJoinCursor right_cursor(right_block, right_merge_description);
while (!left_cursor.atEnd() && !right_cursor.atEnd())
{
Range range = left_cursor.getNextEqualRange(right_cursor);
if (range.empty())
break;
joinEquals(left_block, right_block, right_columns_to_add, left_columns, right_columns, range, is_all);
right_cursor.nextN(range.right_length);
/// Do not run over last left keys for ALL JOIN (cause of possible duplicates in next right block)
if (is_all && right_cursor.atEnd())
{
left_key_tail = range.left_length;
break;
}
left_cursor.nextN(range.left_length);
}
}
void MergeJoin::changeLeftColumns(Block & block, MutableColumns && columns)
{
if (is_left && !is_all)
return;
block.setColumns(std::move(columns));
}
void MergeJoin::addRightColumns(Block & block, MutableColumns && right_columns)
{
for (size_t i = 0; i < right_columns_to_add.columns(); ++i)
{
const auto & column = right_columns_to_add.getByPosition(i);
block.insert(ColumnWithTypeAndName{std::move(right_columns[i]), column.type, column.name});
}
}
}

View File

@ -0,0 +1,57 @@
#pragma once
#include <memory>
#include <shared_mutex>
#include <Core/Block.h>
#include <Core/SortDescription.h>
#include <Interpreters/IJoin.h>
namespace DB
{
class AnalyzedJoin;
class MergeJoinCursor;
struct MergeJoinEqualRange;
class MergeJoin : public IJoin
{
public:
MergeJoin(std::shared_ptr<AnalyzedJoin> table_join_, const Block & right_sample_block);
bool addJoinedBlock(const Block & block) override;
void joinBlock(Block &) override;
void joinTotals(Block &) const override {}
void setTotals(const Block &) override;
size_t getTotalRowCount() const override { return right_blocks_row_count; }
private:
mutable std::shared_mutex rwlock;
std::shared_ptr<AnalyzedJoin> table_join;
SortDescription left_sort_description;
SortDescription right_sort_description;
SortDescription left_merge_description;
SortDescription right_merge_description;
Block right_table_keys;
Block right_columns_to_add;
BlocksList right_blocks;
Block totals;
size_t right_blocks_row_count = 0;
size_t right_blocks_bytes = 0;
const bool nullable_right_side;
const bool is_all;
const bool is_inner;
const bool is_left;
void changeLeftColumns(Block & block, MutableColumns && columns);
void addRightColumns(Block & block, MutableColumns && columns);
void mergeRightBlocks();
void leftJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block,
MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail);
void innerJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block,
MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail);
};
}

View File

@ -1,5 +1,7 @@
#include <Interpreters/SubqueryForSet.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/Join.h>
#include <Interpreters/MergeJoin.h>
#include <DataStreams/LazyBlockInputStream.h>
namespace DB
@ -31,4 +33,26 @@ void SubqueryForSet::renameColumns(Block & block)
}
}
void SubqueryForSet::setJoinActions(ExpressionActionsPtr actions)
{
actions->execute(sample_block);
joined_block_actions = actions;
}
bool SubqueryForSet::insertJoinedBlock(Block & block)
{
renameColumns(block);
if (joined_block_actions)
joined_block_actions->execute(block);
return join->addJoinedBlock(block);
}
void SubqueryForSet::setTotals()
{
if (join && source)
join->setTotals(source->getTotals());
}
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <Parsers/IAST.h>
#include <Interpreters/IJoin.h>
#include <Interpreters/PreparedSets.h>
#include <Interpreters/ExpressionActions.h>
@ -8,9 +9,6 @@
namespace DB
{
class Join;
using JoinPtr = std::shared_ptr<Join>;
class InterpreterSelectWithUnionQuery;
@ -25,6 +23,7 @@ struct SubqueryForSet
JoinPtr join;
/// Apply this actions to joined block.
ExpressionActionsPtr joined_block_actions;
Block sample_block; /// source->getHeader() + column renames
/// If set, put the result into the table.
/// This is a temporary table for transferring to remote servers for distributed query processing.
@ -33,12 +32,15 @@ struct SubqueryForSet
void makeSource(std::shared_ptr<InterpreterSelectWithUnionQuery> & interpreter,
NamesWithAliases && joined_block_aliases_);
Block renamedSampleBlock() const { return sample_block; }
void renameColumns(Block & block);
void setJoinActions(ExpressionActionsPtr actions);
bool insertJoinedBlock(Block & block);
void setTotals();
private:
NamesWithAliases joined_block_aliases; /// Rename column from joined block from this list.
Block sample_block; /// source->getHeader() + column renames
void renameColumns(Block & block);
};
/// ID of subquery -> what to do with it.

View File

@ -805,8 +805,7 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
SyntaxAnalyzerResult result;
result.storage = storage;
result.source_columns = source_columns_;
result.analyzed_join = std::make_shared<AnalyzedJoin>(); /// TODO: move to select_query logic
result.analyzed_join->join_use_nulls = settings.join_use_nulls;
result.analyzed_join = std::make_shared<AnalyzedJoin>(settings); /// TODO: move to select_query logic
collectSourceColumns(select_query, result.storage, result.source_columns);
NameSet source_columns_set = removeDuplicateColumns(result.source_columns);

View File

@ -0,0 +1,126 @@
#include <Interpreters/join_common.h>
#include <Columns/ColumnNullable.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataStreams/materializeBlock.h>
namespace DB
{
namespace ErrorCodes
{
extern const int TYPE_MISMATCH;
}
namespace JoinCommon
{
void convertColumnToNullable(ColumnWithTypeAndName & column)
{
if (column.type->isNullable() || !column.type->canBeInsideNullable())
return;
column.type = makeNullable(column.type);
if (column.column)
column.column = makeNullable(column.column);
}
void convertColumnsToNullable(Block & block, size_t starting_pos)
{
for (size_t i = starting_pos; i < block.columns(); ++i)
convertColumnToNullable(block.getByPosition(i));
}
ColumnRawPtrs temporaryMaterializeColumns(const Block & block, const Names & names, Columns & materialized)
{
ColumnRawPtrs ptrs;
ptrs.reserve(names.size());
materialized.reserve(names.size());
for (auto & column_name : names)
{
const auto & src_column = block.getByName(column_name).column;
materialized.emplace_back(recursiveRemoveLowCardinality(src_column->convertToFullColumnIfConst()));
ptrs.push_back(materialized.back().get());
}
return ptrs;
}
void removeLowCardinalityInplace(Block & block)
{
for (size_t i = 0; i < block.columns(); ++i)
{
auto & col = block.getByPosition(i);
col.column = recursiveRemoveLowCardinality(col.column);
col.type = recursiveRemoveLowCardinality(col.type);
}
}
ColumnRawPtrs extractKeysForJoin(const Names & key_names_right, const Block & right_sample_block,
Block & sample_block_with_keys, Block & sample_block_with_columns_to_add)
{
size_t keys_size = key_names_right.size();
ColumnRawPtrs key_columns(keys_size);
sample_block_with_columns_to_add = materializeBlock(right_sample_block);
for (size_t i = 0; i < keys_size; ++i)
{
const String & column_name = key_names_right[i];
/// there could be the same key names
if (sample_block_with_keys.has(column_name))
{
key_columns[i] = sample_block_with_keys.getByName(column_name).column.get();
continue;
}
auto & col = sample_block_with_columns_to_add.getByName(column_name);
col.column = recursiveRemoveLowCardinality(col.column);
col.type = recursiveRemoveLowCardinality(col.type);
/// Extract right keys with correct keys order.
sample_block_with_keys.insert(col);
sample_block_with_columns_to_add.erase(column_name);
key_columns[i] = sample_block_with_keys.getColumns().back().get();
/// We will join only keys, where all components are not NULL.
if (auto * nullable = checkAndGetColumn<ColumnNullable>(*key_columns[i]))
key_columns[i] = &nullable->getNestedColumn();
}
return key_columns;
}
void checkTypesOfKeys(const Block & block_left, const Names & key_names_left, const Block & block_right, const Names & key_names_right)
{
size_t keys_size = key_names_left.size();
for (size_t i = 0; i < keys_size; ++i)
{
DataTypePtr left_type = removeNullable(recursiveRemoveLowCardinality(block_left.getByName(key_names_left[i]).type));
DataTypePtr right_type = removeNullable(recursiveRemoveLowCardinality(block_right.getByName(key_names_right[i]).type));
if (!left_type->equals(*right_type))
throw Exception("Type mismatch of columns to JOIN by: "
+ key_names_left[i] + " " + left_type->getName() + " at left, "
+ key_names_right[i] + " " + right_type->getName() + " at right",
ErrorCodes::TYPE_MISMATCH);
}
}
void createMissedColumns(Block & block)
{
for (size_t i = 0; i < block.columns(); ++i)
{
auto & column = block.getByPosition(i);
if (!column.column)
column.column = column.type->createColumn();
}
}
}
}

View File

@ -0,0 +1,32 @@
#pragma once
#include <Interpreters/IJoin.h>
namespace DB
{
struct ColumnWithTypeAndName;
class Block;
class IColumn;
using ColumnRawPtrs = std::vector<const IColumn *>;
namespace JoinCommon
{
void convertColumnToNullable(ColumnWithTypeAndName & column);
void convertColumnsToNullable(Block & block, size_t starting_pos = 0);
ColumnRawPtrs temporaryMaterializeColumns(const Block & block, const Names & names, Columns & materialized);
void removeLowCardinalityInplace(Block & block);
/// Split key and other columns by keys name list
ColumnRawPtrs extractKeysForJoin(const Names & key_names_right, const Block & right_sample_block,
Block & sample_block_with_keys, Block & sample_block_with_columns_to_add);
/// Throw an exception if blocks have different types of key columns. Compare up to Nullability.
void checkTypesOfKeys(const Block & block_left, const Names & key_names_left, const Block & block_right, const Names & key_names_right);
void createMissedColumns(Block & block);
}
}

View File

@ -107,6 +107,9 @@ struct ASTTableJoin : public IAST
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
};
inline bool isLeft(ASTTableJoin::Kind kind) { return kind == ASTTableJoin::Kind::Left; }
inline bool isRight(ASTTableJoin::Kind kind) { return kind == ASTTableJoin::Kind::Right; }
inline bool isInner(ASTTableJoin::Kind kind) { return kind == ASTTableJoin::Kind::Inner; }
inline bool isFull(ASTTableJoin::Kind kind) { return kind == ASTTableJoin::Kind::Full; }
inline bool isCross(ASTTableJoin::Kind kind) { return kind == ASTTableJoin::Kind::Cross; }
inline bool isComma(ASTTableJoin::Kind kind) { return kind == ASTTableJoin::Kind::Comma; }

View File

@ -82,8 +82,7 @@ void CreatingSetsTransform::finishSubquery(SubqueryForSet & subquery)
head_rows = profile_info.rows;
if (subquery.join)
subquery.join->setTotals(subquery.source->getTotals());
subquery.setTotals();
if (head_rows != 0)
{
@ -175,12 +174,7 @@ void CreatingSetsTransform::work()
if (!done_with_join)
{
subquery.renameColumns(block);
if (subquery.joined_block_actions)
subquery.joined_block_actions->execute(block);
if (!subquery.join->insertFromBlock(block))
if (!subquery.insertJoinedBlock(block))
done_with_join = true;
}

View File

@ -8,6 +8,7 @@
#include <DataStreams/IBlockInputStream.h>
#include <DataTypes/NestedUtils.h>
#include <Interpreters/joinDispatch.h>
#include <Interpreters/AnalyzedJoin.h>
#include <Common/assert_cast.h>
#include <Poco/String.h> /// toLower
@ -49,8 +50,8 @@ StorageJoin::StorageJoin(
if (!getColumns().hasPhysical(key))
throw Exception{"Key column (" + key + ") does not exist in table declaration.", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE};
join = std::make_shared<Join>(key_names, use_nulls, limits, kind, strictness, overwrite);
join->setSampleBlock(getSampleBlock().sortColumns());
table_join = std::make_shared<AnalyzedJoin>(limits, use_nulls, kind, strictness, key_names);
join = std::make_shared<Join>(table_join, getSampleBlock().sortColumns(), overwrite);
restore();
}
@ -62,8 +63,7 @@ void StorageJoin::truncate(const ASTPtr &, const Context &, TableStructureWriteL
Poco::File(path + "tmp/").createDirectories();
increment = 0;
join = std::make_shared<Join>(key_names, use_nulls, limits, kind, strictness);
join->setSampleBlock(getSampleBlock().sortColumns());
join = std::make_shared<Join>(table_join, getSampleBlock().sortColumns());
}
@ -75,7 +75,7 @@ void StorageJoin::assertCompatible(ASTTableJoin::Kind kind_, ASTTableJoin::Stric
}
void StorageJoin::insertBlock(const Block & block) { join->insertFromBlock(block); }
void StorageJoin::insertBlock(const Block & block) { join->addJoinedBlock(block); }
size_t StorageJoin::getSize() const { return join->getTotalRowCount(); }
@ -209,10 +209,10 @@ public:
for (size_t i = 0; i < sample_block.columns(); ++i)
{
auto & [_, type, name] = sample_block.getByPosition(i);
if (parent.sample_block_with_keys.has(name))
if (parent.right_table_keys.has(name))
{
key_pos = i;
column_with_null[i] = parent.sample_block_with_keys.getByName(name).type->isNullable();
column_with_null[i] = parent.right_table_keys.getByName(name).type->isNullable();
}
else
{

View File

@ -9,8 +9,9 @@
namespace DB
{
class AnalyzedJoin;
class Join;
using JoinPtr = std::shared_ptr<Join>;
using HashJoinPtr = std::shared_ptr<Join>;
/** Allows you save the state for later use on the right side of the JOIN.
@ -29,7 +30,7 @@ public:
void truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) override;
/// Access the innards.
JoinPtr & getJoin() { return join; }
HashJoinPtr & getJoin() { return join; }
/// Verify that the data structure is suitable for implementing this type of JOIN.
void assertCompatible(ASTTableJoin::Kind kind_, ASTTableJoin::Strictness strictness_) const;
@ -50,7 +51,8 @@ private:
ASTTableJoin::Kind kind; /// LEFT | INNER ...
ASTTableJoin::Strictness strictness; /// ANY | ALL
JoinPtr join;
std::shared_ptr<AnalyzedJoin> table_join;
HashJoinPtr join;
void insertBlock(const Block & block) override;
size_t getSize() const override;

View File

@ -0,0 +1,218 @@
t join none using
0 0 0
-
0 0 0
-
-
t join none on
0 0 0 0
-
0 0 0 0
-
-
none join t using
none join t on
/none
t join none using
0 0 \N
-
0 0 \N
-
-
t join none on
0 0 \N \N
-
0 0 \N \N
-
-
none join t using
none join t on
/none
any left
0 0 0
1 10 0
2 20 2
3 30 0
4 40 4
-
0 0 0
1 10 0
2 20 0
3 30 0
4 40 0
-
0 0 0
1 10 0
2 20 2
3 30 0
4 40 4
-
0 0 0
1 10 0
2 20 0
3 30 0
4 40 0
all left
0 0 0 0
1 10 0 0
2 20 2 21
2 20 2 22
3 30 0 0
4 40 4 41
4 40 4 42
-
0 0 0 0
1 10 0 0
2 20 0 0
3 30 0 0
4 40 0 0
-
0 0 0 0
1 10 0 0
2 20 0 0
3 30 0 0
4 40 0 0
-
0 0 0 0
1 10 0 0
2 20 2 21
2 20 2 22
3 30 0 0
4 40 4 41
4 40 4 42
-
0 0 0 0
1 10 0 0
2 20 2 21
2 20 2 22
3 30 0 0
4 40 4 41
4 40 4 42
any inner
0 0 0
2 20 2
4 40 4
-
0 0 0
-
0 0 0
2 20 2
4 40 4
-
0 0 0
all inner
0 0 0 0
2 20 2 21
2 20 2 22
4 40 4 41
4 40 4 42
-
0 0 0 0
-
0 0 0 0
-
0 0 0 0
2 20 2 21
2 20 2 22
4 40 4 41
4 40 4 42
-
0 0 0 0
2 20 2 21
2 20 2 22
4 40 4 41
4 40 4 42
any left
0 0 0
1 10 \N
2 20 2
3 30 \N
4 40 4
-
0 0 0
1 10 \N
2 20 \N
3 30 \N
4 40 \N
-
0 0 0
1 10 \N
2 20 2
3 30 \N
4 40 4
-
0 0 0
1 10 \N
2 20 \N
3 30 \N
4 40 \N
all left
0 0 0 0
1 10 \N \N
2 20 2 21
2 20 2 22
3 30 \N \N
4 40 4 41
4 40 4 42
-
0 0 0 0
1 10 \N \N
2 20 \N \N
3 30 \N \N
4 40 \N \N
-
0 0 0 0
1 10 \N \N
2 20 \N \N
3 30 \N \N
4 40 \N \N
-
0 0 0 0
1 10 \N \N
2 20 2 21
2 20 2 22
3 30 \N \N
4 40 4 41
4 40 4 42
-
0 0 0 0
1 10 \N \N
2 20 2 21
2 20 2 22
3 30 \N \N
4 40 4 41
4 40 4 42
any inner
0 0 0
2 20 2
4 40 4
-
0 0 0
-
0 0 0
2 20 2
4 40 4
-
0 0 0
all inner
0 0 0 0
2 20 2 21
2 20 2 22
4 40 4 41
4 40 4 42
-
0 0 0 0
-
0 0 0 0
-
0 0 0 0
2 20 2 21
2 20 2 22
4 40 4 41
4 40 4 42
-
0 0 0 0
2 20 2 21
2 20 2 22
4 40 4 41
4 40 4 42

View File

@ -0,0 +1,164 @@
DROP TABLE IF EXISTS t0;
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;
CREATE TABLE t0 (x UInt32, y UInt64) engine = MergeTree ORDER BY (x,y);
CREATE TABLE t1 (x UInt32, y UInt64) engine = MergeTree ORDER BY (x,y);
CREATE TABLE t2 (x UInt32, y UInt64) engine = MergeTree ORDER BY (x,y);
INSERT INTO t1 (x, y) VALUES (0, 0);
SET partial_merge_join = 1;
SET any_join_distinct_right_table_keys = 1;
SELECT 't join none using';
SELECT * FROM t1 ANY LEFT JOIN t0 USING (x) ORDER BY x;
SELECT '-';
SELECT * FROM t1 LEFT JOIN t0 USING (x) ORDER BY x;
SELECT '-';
SELECT * FROM t1 ANY INNER JOIN t0 USING (x) ORDER BY x;
SELECT '-';
SELECT * FROM t1 INNER JOIN t0 USING (x) ORDER BY x;
SELECT 't join none on';
SELECT * FROM t1 ANY LEFT JOIN t0 ON t1.x = t0.x ORDER BY x;
SELECT '-';
SELECT * FROM t1 LEFT JOIN t0 ON t1.x = t0.x ORDER BY x;
SELECT '-';
SELECT * FROM t1 ANY INNER JOIN t0 ON t1.x = t0.x ORDER BY x;
SELECT '-';
SELECT * FROM t1 INNER JOIN t0 ON t1.x = t0.x ORDER BY x;
SELECT 'none join t using';
SELECT * FROM t0 ANY LEFT JOIN t1 USING (x);
SELECT * FROM t0 LEFT JOIN t1 USING (x);
SELECT * FROM t0 ANY INNER JOIN t1 USING (x);
SELECT * FROM t0 INNER JOIN t1 USING (x);
SELECT 'none join t on';
SELECT * FROM t0 ANY LEFT JOIN t1 ON t1.x = t0.x;
SELECT * FROM t0 LEFT JOIN t1 ON t1.x = t0.x;
SELECT * FROM t0 ANY INNER JOIN t1 ON t1.x = t0.x;
SELECT * FROM t0 INNER JOIN t1 ON t1.x = t0.x;
SELECT '/none';
SET join_use_nulls = 1;
SELECT 't join none using';
SELECT * FROM t1 ANY LEFT JOIN t0 USING (x) ORDER BY x;
SELECT '-';
SELECT * FROM t1 LEFT JOIN t0 USING (x) ORDER BY x;
SELECT '-';
SELECT * FROM t1 ANY INNER JOIN t0 USING (x) ORDER BY x;
SELECT '-';
SELECT * FROM t1 INNER JOIN t0 USING (x) ORDER BY x;
SELECT 't join none on';
SELECT * FROM t1 ANY LEFT JOIN t0 ON t1.x = t0.x ORDER BY x;
SELECT '-';
SELECT * FROM t1 LEFT JOIN t0 ON t1.x = t0.x ORDER BY x;
SELECT '-';
SELECT * FROM t1 ANY INNER JOIN t0 ON t1.x = t0.x ORDER BY x;
SELECT '-';
SELECT * FROM t1 INNER JOIN t0 ON t1.x = t0.x ORDER BY x;
SELECT 'none join t using';
SELECT * FROM t0 ANY LEFT JOIN t1 USING (x);
SELECT * FROM t0 LEFT JOIN t1 USING (x);
SELECT * FROM t0 ANY INNER JOIN t1 USING (x);
SELECT * FROM t0 INNER JOIN t1 USING (x);
SELECT 'none join t on';
SELECT * FROM t0 ANY LEFT JOIN t1 ON t1.x = t0.x;
SELECT * FROM t0 LEFT JOIN t1 ON t1.x = t0.x;
SELECT * FROM t0 ANY INNER JOIN t1 ON t1.x = t0.x;
SELECT * FROM t0 INNER JOIN t1 ON t1.x = t0.x;
SELECT '/none';
INSERT INTO t1 (x, y) VALUES (1, 10) (2, 20);
INSERT INTO t1 (x, y) VALUES (4, 40) (3, 30);
INSERT INTO t2 (x, y) VALUES (4, 41) (2, 21) (2, 22);
INSERT INTO t2 (x, y) VALUES (0, 0) (5, 50) (4, 42);
SET join_use_nulls = 0;
SELECT 'any left';
SELECT t1.*, t2.x FROM t1 ANY LEFT JOIN t2 USING (x) ORDER BY x;
SELECT '-';
SELECT t1.*, t2.x FROM t1 ANY LEFT JOIN t2 USING (x,y) ORDER BY x;
SELECT '-';
SELECT t1.*, t2.x FROM t1 ANY LEFT JOIN t2 USING (x) ORDER BY x;
SELECT '-';
SELECT t1.*, t2.x FROM t1 ANY LEFT JOIN t2 USING (x,y) ORDER BY x;
SELECT 'all left';
SELECT t1.*, t2.* FROM t1 LEFT JOIN t2 ON t1.x = t2.x ORDER BY x, t2.y;
SELECT '-';
SELECT t1.*, t2.* FROM t1 LEFT JOIN t2 ON t1.y = t2.y ORDER BY x;
SELECT '-';
SELECT t1.*, t2.* FROM t1 LEFT JOIN t2 ON t1.x = t2.x AND t1.y = t2.y ORDER BY x;
SELECT '-';
SELECT t1.*, t2.* FROM t1 LEFT JOIN t2 ON t1.x = t2.x AND toUInt32(intDiv(t1.y,10)) = t2.x ORDER BY x, t2.y;
SELECT '-';
SELECT t1.*, t2.* FROM t1 LEFT JOIN t2 ON t1.x = t2.x AND toUInt64(t1.x) = intDiv(t2.y,10) ORDER BY x, t2.y;
SELECT 'any inner';
SELECT t1.*, t2.x FROM t1 ANY INNER JOIN t2 USING (x) ORDER BY x;
SELECT '-';
SELECT t1.*, t2.x FROM t1 ANY INNER JOIN t2 USING (x,y) ORDER BY x;
SELECT '-';
SELECT t1.*, t2.x FROM t1 ANY INNER JOIN t2 USING (x) ORDER BY x;
SELECT '-';
SELECT t1.*, t2.x FROM t1 ANY INNER JOIN t2 USING (x,y) ORDER BY x;
SELECT 'all inner';
SELECT t1.*, t2.* FROM t1 INNER JOIN t2 ON t1.x = t2.x ORDER BY x, t2.y;
SELECT '-';
SELECT t1.*, t2.* FROM t1 INNER JOIN t2 ON t1.y = t2.y ORDER BY x;
SELECT '-';
SELECT t1.*, t2.* FROM t1 INNER JOIN t2 ON t1.x = t2.x AND t1.y = t2.y ORDER BY x;
SELECT '-';
SELECT t1.*, t2.* FROM t1 INNER JOIN t2 ON t1.x = t2.x AND toUInt32(intDiv(t1.y,10)) = t2.x ORDER BY x, t2.y;
SELECT '-';
SELECT t1.*, t2.* FROM t1 INNER JOIN t2 ON t1.x = t2.x AND toUInt64(t1.x) = intDiv(t2.y,10) ORDER BY x, t2.y;
SET join_use_nulls = 1;
SELECT 'any left';
SELECT t1.*, t2.x FROM t1 ANY LEFT JOIN t2 USING (x) ORDER BY x;
SELECT '-';
SELECT t1.*, t2.x FROM t1 ANY LEFT JOIN t2 USING (x,y) ORDER BY x;
SELECT '-';
SELECT t1.*, t2.x FROM t1 ANY LEFT JOIN t2 USING (x) ORDER BY x;
SELECT '-';
SELECT t1.*, t2.x FROM t1 ANY LEFT JOIN t2 USING (x,y) ORDER BY x;
SELECT 'all left';
SELECT t1.*, t2.* FROM t1 LEFT JOIN t2 ON t1.x = t2.x ORDER BY x, t2.y;
SELECT '-';
SELECT t1.*, t2.* FROM t1 LEFT JOIN t2 ON t1.y = t2.y ORDER BY x;
SELECT '-';
SELECT t1.*, t2.* FROM t1 LEFT JOIN t2 ON t1.x = t2.x AND t1.y = t2.y ORDER BY x;
SELECT '-';
SELECT t1.*, t2.* FROM t1 LEFT JOIN t2 ON t1.x = t2.x AND toUInt32(intDiv(t1.y,10)) = t2.x ORDER BY x, t2.y;
SELECT '-';
SELECT t1.*, t2.* FROM t1 LEFT JOIN t2 ON t1.x = t2.x AND toUInt64(t1.x) = intDiv(t2.y,10) ORDER BY x, t2.y;
SELECT 'any inner';
SELECT t1.*, t2.x FROM t1 ANY INNER JOIN t2 USING (x) ORDER BY x;
SELECT '-';
SELECT t1.*, t2.x FROM t1 ANY INNER JOIN t2 USING (x,y) ORDER BY x;
SELECT '-';
SELECT t1.*, t2.x FROM t1 ANY INNER JOIN t2 USING (x) ORDER BY x;
SELECT '-';
SELECT t1.*, t2.x FROM t1 ANY INNER JOIN t2 USING (x,y) ORDER BY x;
SELECT 'all inner';
SELECT t1.*, t2.* FROM t1 INNER JOIN t2 ON t1.x = t2.x ORDER BY x, t2.y;
SELECT '-';
SELECT t1.*, t2.* FROM t1 INNER JOIN t2 ON t1.y = t2.y ORDER BY x;
SELECT '-';
SELECT t1.*, t2.* FROM t1 INNER JOIN t2 ON t1.x = t2.x AND t1.y = t2.y ORDER BY x;
SELECT '-';
SELECT t1.*, t2.* FROM t1 INNER JOIN t2 ON t1.x = t2.x AND toUInt32(intDiv(t1.y,10)) = t2.x ORDER BY x, t2.y;
SELECT '-';
SELECT t1.*, t2.* FROM t1 INNER JOIN t2 ON t1.x = t2.x AND toUInt64(t1.x) = intDiv(t2.y,10) ORDER BY x, t2.y;
DROP TABLE t0;
DROP TABLE t1;
DROP TABLE t2;

View File

@ -0,0 +1,5 @@
1 1
2
3
4
5

View File

@ -0,0 +1,7 @@
set partial_merge_join = 1;
select s1.x, s2.x from (select 1 as x) s1 left join (select 1 as x) s2 using x;
select * from (select materialize(2) as x) s1 left join (select 2 as x) s2 using x;
select * from (select 3 as x) s1 left join (select materialize(3) as x) s2 using x;
select * from (select toLowCardinality(4) as x) s1 left join (select 4 as x) s2 using x;
select * from (select 5 as x) s1 left join (select toLowCardinality(5) as x) s2 using x;