Merge pull request #35796 from vdimir/full-sorting-merge-join

This commit is contained in:
Vladimir C 2022-07-07 19:16:49 +02:00 committed by GitHub
commit db838f1343
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
47 changed files with 3126 additions and 92 deletions

View File

@ -28,18 +28,20 @@ def get_options(i, backward_compatibility_check):
if i % 2 == 1:
options.append(" --database=test_{}".format(i))
if i % 5 == 1:
if i % 3 == 1:
client_options.append("join_use_nulls=1")
if i % 15 == 1:
client_options.append("join_algorithm='parallel_hash'")
if i % 15 == 6:
client_options.append("join_algorithm='partial_merge'")
if i % 15 == 11:
client_options.append("join_algorithm='auto'")
client_options.append("max_rows_in_join=1000")
if i % 2 == 1:
join_alg_num = i // 2
if join_alg_num % 4 == 0:
client_options.append("join_algorithm='parallel_hash'")
if join_alg_num % 4 == 1:
client_options.append("join_algorithm='partial_merge'")
if join_alg_num % 4 == 2:
client_options.append("join_algorithm='full_sorting_merge'")
if join_alg_num % 4 == 3:
client_options.append("join_algorithm='auto'")
client_options.append('max_rows_in_join=1000')
if i == 13:
client_options.append("memory_tracker_fault_probability=0.001")

View File

@ -36,7 +36,8 @@ IMPLEMENT_SETTING_MULTI_ENUM(JoinAlgorithm, ErrorCodes::UNKNOWN_JOIN,
{"partial_merge", JoinAlgorithm::PARTIAL_MERGE},
{"prefer_partial_merge", JoinAlgorithm::PREFER_PARTIAL_MERGE},
{"parallel_hash", JoinAlgorithm::PARALLEL_HASH},
{"direct", JoinAlgorithm::DIRECT}})
{"direct", JoinAlgorithm::DIRECT},
{"full_sorting_merge", JoinAlgorithm::FULL_SORTING_MERGE}})
IMPLEMENT_SETTING_ENUM(TotalsMode, ErrorCodes::UNKNOWN_TOTALS_MODE,

View File

@ -44,6 +44,7 @@ enum class JoinAlgorithm
PREFER_PARTIAL_MERGE,
PARALLEL_HASH,
DIRECT,
FULL_SORTING_MERGE,
};
DECLARE_SETTING_MULTI_ENUM(JoinAlgorithm)

View File

@ -142,9 +142,12 @@ struct SortCursorImpl
bool isLast() const { return pos + 1 >= rows; }
bool isLast(size_t size) const { return pos + size >= rows; }
bool isValid() const { return pos < rows; }
void next() { ++pos; }
void next(size_t size) { pos += size; }
size_t getSize() const { return rows; }
size_t rowsLeft() const { return rows - pos; }
/// Prevent using pos instead of getRow()
private:

View File

@ -29,6 +29,7 @@
#include <Interpreters/DirectJoin.h>
#include <Interpreters/Set.h>
#include <Interpreters/TableJoin.h>
#include <Interpreters/FullSortingMergeJoin.h>
#include <Processors/QueryPlan/ExpressionStep.h>
@ -1046,10 +1047,13 @@ bool SelectQueryExpressionAnalyzer::appendJoinLeftKeys(ExpressionActionsChain &
return true;
}
JoinPtr SelectQueryExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, ActionsDAGPtr & converting_join_columns)
JoinPtr SelectQueryExpressionAnalyzer::appendJoin(
ExpressionActionsChain & chain,
ActionsDAGPtr & converting_join_columns)
{
const ColumnsWithTypeAndName & left_sample_columns = chain.getLastStep().getResultColumns();
JoinPtr table_join = makeTableJoin(*syntax->ast_join, left_sample_columns, converting_join_columns);
JoinPtr join = makeJoin(*syntax->ast_join, left_sample_columns, converting_join_columns);
if (converting_join_columns)
{
@ -1059,9 +1063,9 @@ JoinPtr SelectQueryExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain
ExpressionActionsChain::Step & step = chain.lastStep(columns_after_array_join);
chain.steps.push_back(std::make_unique<ExpressionActionsChain::JoinStep>(
syntax->analyzed_join, table_join, step.getResultColumns()));
syntax->analyzed_join, join, step.getResultColumns()));
chain.addStep();
return table_join;
return join;
}
static ActionsDAGPtr createJoinedBlockActions(ContextPtr context, const TableJoin & analyzed_join)
@ -1087,7 +1091,17 @@ static std::shared_ptr<IJoin> chooseJoinAlgorithm(std::shared_ptr<TableJoin> ana
return std::make_shared<HashJoin>(analyzed_join, right_sample_block);
}
else if (analyzed_join->forceMergeJoin() || (analyzed_join->preferMergeJoin() && allow_merge_join))
{
return std::make_shared<MergeJoin>(analyzed_join, right_sample_block);
}
else if (analyzed_join->forceFullSortingMergeJoin())
{
if (analyzed_join->getClauses().size() != 1)
throw Exception("Full sorting merge join is supported only for single-condition joins", ErrorCodes::NOT_IMPLEMENTED);
if (analyzed_join->isSpecialStorage())
throw Exception("Full sorting merge join is not supported for special storage", ErrorCodes::NOT_IMPLEMENTED);
return std::make_shared<FullSortingMergeJoin>(analyzed_join, right_sample_block);
}
return std::make_shared<JoinSwitcher>(analyzed_join, right_sample_block);
}
@ -1190,7 +1204,7 @@ std::shared_ptr<DirectKeyValueJoin> tryKeyValueJoin(std::shared_ptr<TableJoin> a
return std::make_shared<DirectKeyValueJoin>(analyzed_join, right_sample_block, storage);
}
JoinPtr SelectQueryExpressionAnalyzer::makeTableJoin(
JoinPtr SelectQueryExpressionAnalyzer::makeJoin(
const ASTTablesInSelectQueryElement & join_element,
const ColumnsWithTypeAndName & left_columns,
ActionsDAGPtr & left_convert_actions)

View File

@ -375,7 +375,7 @@ private:
NameSet required_result_columns;
SelectQueryOptions query_options;
JoinPtr makeTableJoin(
JoinPtr makeJoin(
const ASTTablesInSelectQueryElement & join_element,
const ColumnsWithTypeAndName & left_columns,
ActionsDAGPtr & left_convert_actions);

View File

@ -0,0 +1,106 @@
#pragma once
#include <Interpreters/IJoin.h>
#include <Interpreters/TableJoin.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Poco/Logger.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED;
extern const int TYPE_MISMATCH;
}
/// Placeholder IJoin implementation selected for the `full_sorting_merge` join algorithm.
/// It does not join any data itself: it reports a Y-shaped pipeline (see pipelineType()),
/// validates key types and provides the result header. The actual joining is expected to be
/// performed by the merging transform of that pipeline (see MergeJoinTransform).
class FullSortingMergeJoin : public IJoin
{
public:
    /// @param table_join_ Analyzed join description (clauses, keys, settings).
    /// @param right_sample_block_ Header of the right-hand side of the join.
    explicit FullSortingMergeJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block_)
        : table_join(table_join_)
        , right_sample_block(right_sample_block_)
    {
        LOG_TRACE(&Poco::Logger::get("FullSortingMergeJoin"), "Will use full sorting merge join");
    }

    const TableJoin & getTableJoin() const override { return *table_join; }

    /// The right side is never accumulated in this object, so calling this is a logical error.
    bool addJoinedBlock(const Block & /* block */, bool /* check_limits */) override
    {
        throw Exception(ErrorCodes::LOGICAL_ERROR, "FullSortingMergeJoin::addJoinedBlock should not be called");
    }

    /// Validates that the join can be executed by this algorithm for the given left header.
    /// Throws:
    ///  - LOGICAL_ERROR if the join does not consist of exactly one clause;
    ///  - NOT_IMPLEMENTED for USING combined with join_use_nulls;
    ///  - TYPE_MISMATCH if types of corresponding left/right key columns differ.
    void checkTypesOfKeys(const Block & left_block) const override
    {
        /// Note: the check is on the number of join clauses; a single clause may still contain several keys.
        if (table_join->getClauses().size() != 1)
            throw Exception(ErrorCodes::LOGICAL_ERROR, "FullSortingMergeJoin supports only one join key");

        /// Key column can change nullability and it's not handled on type conversion stage, so algorithm should be aware of it
        if (table_join->hasUsing() && table_join->joinUseNulls())
            throw Exception(ErrorCodes::NOT_IMPLEMENTED, "FullSortingMergeJoin doesn't support USING with join_use_nulls");

        const auto & onexpr = table_join->getOnlyClause();
        for (size_t i = 0; i < onexpr.key_names_left.size(); ++i)
        {
            DataTypePtr left_type = left_block.getByName(onexpr.key_names_left[i]).type;
            DataTypePtr right_type = right_sample_block.getByName(onexpr.key_names_right[i]).type;

            /// For USING the types must match exactly; for ON the comparison ignores Nullable wrappers.
            bool type_equals
                = table_join->hasUsing() ? left_type->equals(*right_type) : removeNullable(left_type)->equals(*removeNullable(right_type));

            /// Even slightly different types should be converted on previous pipeline steps.
            /// If we still have some differences, we can't join, because the algorithm expects strict type equality.
            if (!type_equals)
            {
                throw Exception(
                    ErrorCodes::TYPE_MISMATCH,
                    "Type mismatch of columns to JOIN by: {} :: {} at left, {} :: {} at right",
                    onexpr.key_names_left[i], left_type->getName(),
                    onexpr.key_names_right[i], right_type->getName());
            }
        }
    }

    /// Used just to get result header: appends the columns of the right sample block
    /// and replaces `block` with an empty, materialized clone of the result.
    void joinBlock(Block & block, std::shared_ptr<ExtraBlock> & /* not_processed */) override
    {
        for (const auto & col : right_sample_block)
            block.insert(col);
        block = materializeBlock(block).cloneEmpty();
    }

    void setTotals(const Block & block) override { totals = block; }
    const Block & getTotals() const override { return totals; }

    /// No data is stored in this object, so size queries are a logical error.
    size_t getTotalRowCount() const override
    {
        throw Exception(ErrorCodes::LOGICAL_ERROR, "FullSortingMergeJoin::getTotalRowCount should not be called");
    }

    size_t getTotalByteCount() const override
    {
        throw Exception(ErrorCodes::LOGICAL_ERROR, "FullSortingMergeJoin::getTotalByteCount should not be called");
    }

    bool alwaysReturnsEmptySet() const override { return false; }

    /// Not supported by this class; calling it is a logical error.
    std::shared_ptr<NotJoinedBlocks>
    getNonJoinedBlocks(const Block & /* left_sample_block */, const Block & /* result_sample_block */, UInt64 /* max_block_size */) const override
    {
        throw Exception(ErrorCodes::LOGICAL_ERROR, "FullSortingMergeJoin::getNonJoinedBlocks should not be called");
    }

    /// Left and right streams have the same priority and are processed simultaneously
    JoinPipelineType pipelineType() const override { return JoinPipelineType::YShaped; }

private:
    std::shared_ptr<TableJoin> table_join;
    Block right_sample_block;
    Block totals;
};
}

View File

@ -173,6 +173,17 @@ public:
bool isFilled() const override { return from_storage_join || data->type == Type::DICT; }
JoinPipelineType pipelineType() const override
{
    /// When the join is backed by a storage join or a dictionary, the right side is already filled:
    /// nothing has to be read from the right stream, the left stream is simply joined with it.
    /// Otherwise the default pipeline is used: the right stream is processed first, then the left one.
    const bool right_side_prefilled = from_storage_join || data->type == Type::DICT;
    return right_side_prefilled ? JoinPipelineType::FilledRight : JoinPipelineType::FillRightFirst;
}
/** For RIGHT and FULL JOINs.
* A stream that will contain default values from left table, joined with rows from right table, that was not joined before.
* Use only after all calls to joinBlock was done.

View File

@ -6,16 +6,40 @@
#include <Core/Names.h>
#include <Core/Block.h>
#include <Columns/IColumn.h>
#include <Common/Exception.h>
namespace DB
{
class Block;
struct ExtraBlock;
using ExtraBlockPtr = std::shared_ptr<ExtraBlock>;
class TableJoin;
class NotJoinedBlocks;
/// Describes how the left and right inputs of a JOIN are arranged in the query pipeline.
enum class JoinPipelineType
{
/*
* The right stream is processed first; once the join data structures are ready,
* the left stream is processed using them.
* The pipeline is not sorted.
*/
FillRightFirst,
/*
* Only the left stream is processed. The right side is already filled.
*/
FilledRight,
/*
* The pipeline is created from the left and right streams, which are processed by a merging transform.
* Left and right streams have the same priority and are processed simultaneously.
* The pipelines are sorted.
*/
YShaped,
};
class IJoin
{
public:
@ -48,7 +72,8 @@ public:
/// StorageJoin/Dictionary is already filled. No need to call addJoinedBlock.
/// Different query plan is used for such joins.
virtual bool isFilled() const { return false; }
virtual bool isFilled() const { return pipelineType() == JoinPipelineType::FilledRight; }
virtual JoinPipelineType pipelineType() const { return JoinPipelineType::FillRightFirst; }
// Whether FillingRightJoinSideTransform can be run in parallel for this join
virtual bool supportParallelJoin() const { return false; }
@ -60,6 +85,7 @@ private:
Block totals;
};
using JoinPtr = std::shared_ptr<IJoin>;
}

View File

@ -83,9 +83,10 @@
#include <Common/checkStackSize.h>
#include <Core/ColumnNumbers.h>
#include <Interpreters/Aggregator.h>
#include <Interpreters/IJoin.h>
#include <QueryPipeline/SizeLimits.h>
#include <base/map.h>
#include <Common/scope_guard_safe.h>
#include <memory>
namespace DB
@ -1349,6 +1350,35 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
if (!joined_plan)
throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no joined plan for query");
/// Helper: appends a SortingStep to `plan` that orders the stream by `key_names`
/// (one sort column per key, in the given order), with no LIMIT applied.
/// Sorting limits and external-sort thresholds are taken from the query settings.
auto add_sorting = [&settings, this] (QueryPlan & plan, const Names & key_names)
{
SortDescription order_descr;
order_descr.reserve(key_names.size());
for (const auto & key_name : key_names)
order_descr.emplace_back(key_name);
auto sorting_step = std::make_unique<SortingStep>(
plan.getCurrentDataStream(),
order_descr,
settings.max_block_size,
0 /* LIMIT */,
SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode),
settings.max_bytes_before_remerge_sort,
settings.remerge_sort_lowered_memory_bytes_ratio,
settings.max_bytes_before_external_sort,
this->context->getTemporaryVolume(),
settings.min_free_disk_space_for_temporary_data);
sorting_step->setStepDescription("Sort before JOIN");
plan.addStep(std::move(sorting_step));
};
/// For a Y-shaped join pipeline both inputs are sorted before the JOIN step:
/// the left plan by the left keys and the joined (right) plan by the right keys of the only join clause.
if (expressions.join->pipelineType() == JoinPipelineType::YShaped)
{
const auto & join_clause = expressions.join->getTableJoin().getOnlyClause();
add_sorting(query_plan, join_clause.key_names_left);
add_sorting(*joined_plan, join_clause.key_names_right);
}
QueryPlanStepPtr join_step = std::make_unique<JoinStep>(
query_plan.getCurrentDataStream(),
joined_plan->getCurrentDataStream(),
@ -1357,7 +1387,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
max_streams,
analysis_result.optimize_read_in_order);
join_step->setStepDescription("JOIN");
join_step->setStepDescription(fmt::format("JOIN {}", expressions.join->pipelineType()));
std::vector<QueryPlanPtr> plans;
plans.emplace_back(std::make_unique<QueryPlan>(std::move(query_plan)));
plans.emplace_back(std::move(joined_plan));
@ -1996,7 +2026,8 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, context);
/** Optimization - if not specified DISTINCT, WHERE, GROUP, HAVING, ORDER, LIMIT BY, WITH TIES but LIMIT is specified, and limit + offset < max_block_size,
/** Optimization - if not specified DISTINCT, WHERE, GROUP, HAVING, ORDER, JOIN, LIMIT BY, WITH TIES
* but LIMIT is specified, and limit + offset < max_block_size,
* then as the block size we will use limit + offset (not to read more from the table than requested),
* and also set the number of threads to 1.
*/
@ -2008,13 +2039,14 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
&& !query.having()
&& !query.orderBy()
&& !query.limitBy()
&& query.limitLength()
&& !query.join()
&& !query_analyzer->hasAggregation()
&& !query_analyzer->hasWindow()
&& query.limitLength()
&& limit_length <= std::numeric_limits<UInt64>::max() - limit_offset
&& limit_length + limit_offset < max_block_size)
{
max_block_size = std::max(UInt64{1}, limit_length + limit_offset);
max_block_size = std::max<UInt64>(1, limit_length + limit_offset);
max_threads_execute_query = max_streams = 1;
}

View File

@ -363,7 +363,7 @@ void TableJoin::addJoinedColumnsAndCorrectTypesImpl(TColumns & left_columns, boo
* For `JOIN ON expr1 == expr2` we will infer common type later in makeTableJoin,
* when part of plan built and types of expression will be known.
*/
inferJoinKeyCommonType(left_columns, columns_from_joined_table, !isSpecialStorage());
inferJoinKeyCommonType(left_columns, columns_from_joined_table, !isSpecialStorage(), forceFullSortingMergeJoin());
if (auto it = left_type_map.find(col.name); it != left_type_map.end())
{
@ -507,14 +507,18 @@ static void renameIfNeeded(String & name, const NameToNameMap & renames)
}
std::pair<ActionsDAGPtr, ActionsDAGPtr>
TableJoin::createConvertingActions(const ColumnsWithTypeAndName & left_sample_columns, const ColumnsWithTypeAndName & right_sample_columns)
TableJoin::createConvertingActions(
const ColumnsWithTypeAndName & left_sample_columns,
const ColumnsWithTypeAndName & right_sample_columns)
{
inferJoinKeyCommonType(left_sample_columns, right_sample_columns, !isSpecialStorage());
inferJoinKeyCommonType(left_sample_columns, right_sample_columns, !isSpecialStorage(), forceFullSortingMergeJoin());
NameToNameMap left_key_column_rename;
NameToNameMap right_key_column_rename;
auto left_converting_actions = applyKeyConvertToTable(left_sample_columns, left_type_map, left_key_column_rename, forceNullableLeft());
auto right_converting_actions = applyKeyConvertToTable(right_sample_columns, right_type_map, right_key_column_rename, forceNullableRight());
auto left_converting_actions = applyKeyConvertToTable(
left_sample_columns, left_type_map, left_key_column_rename, forceNullableLeft());
auto right_converting_actions = applyKeyConvertToTable(
right_sample_columns, right_type_map, right_key_column_rename, forceNullableRight());
{
auto log_actions = [](const String & side, const ActionsDAGPtr & dag)
@ -536,7 +540,18 @@ TableJoin::createConvertingActions(const ColumnsWithTypeAndName & left_sample_co
else
{
LOG_DEBUG(&Poco::Logger::get("TableJoin"), "{} JOIN converting actions: empty", side);
return;
}
auto format_cols = [](const auto & cols) -> std::string
{
std::vector<std::string> str_cols;
str_cols.reserve(cols.size());
for (const auto & col : cols)
str_cols.push_back(fmt::format("'{}': {}", col.name, col.type->getName()));
return fmt::format("[{}]", fmt::join(str_cols, ", "));
};
LOG_DEBUG(&Poco::Logger::get("TableJoin"), "{} JOIN converting actions: {} -> {}",
side, format_cols(dag->getRequiredColumns()), format_cols(dag->getResultColumns()));
};
log_actions("Left", left_converting_actions);
log_actions("Right", right_converting_actions);
@ -553,7 +568,7 @@ TableJoin::createConvertingActions(const ColumnsWithTypeAndName & left_sample_co
}
template <typename LeftNamesAndTypes, typename RightNamesAndTypes>
void TableJoin::inferJoinKeyCommonType(const LeftNamesAndTypes & left, const RightNamesAndTypes & right, bool allow_right)
void TableJoin::inferJoinKeyCommonType(const LeftNamesAndTypes & left, const RightNamesAndTypes & right, bool allow_right, bool strict)
{
if (!left_type_map.empty() || !right_type_map.empty())
return;
@ -578,39 +593,42 @@ void TableJoin::inferJoinKeyCommonType(const LeftNamesAndTypes & left, const Rig
forAllKeys(clauses, [&](const auto & left_key_name, const auto & right_key_name)
{
auto ltype = left_types.find(left_key_name);
auto rtype = right_types.find(right_key_name);
if (ltype == left_types.end() || rtype == right_types.end())
auto ltypeit = left_types.find(left_key_name);
auto rtypeit = right_types.find(right_key_name);
if (ltypeit == left_types.end() || rtypeit == right_types.end())
{
/// Name mismatch, give up
left_type_map.clear();
right_type_map.clear();
return false;
}
const auto & ltype = ltypeit->second;
const auto & rtype = rtypeit->second;
if (JoinCommon::typesEqualUpToNullability(ltype->second, rtype->second))
bool type_equals = strict ? ltype->equals(*rtype) : JoinCommon::typesEqualUpToNullability(ltype, rtype);
if (type_equals)
return true;
DataTypePtr common_type;
try
{
/// TODO(vdimir): use getMostSubtype if possible
common_type = DB::getLeastSupertype(DataTypes{ltype->second, rtype->second});
common_type = DB::getLeastSupertype(DataTypes{ltype, rtype});
}
catch (DB::Exception & ex)
{
throw DB::Exception(ErrorCodes::TYPE_MISMATCH,
"Can't infer common type for joined columns: {}: {} at left, {}: {} at right ({})",
left_key_name, ltype->second->getName(),
right_key_name, rtype->second->getName(),
"Can't infer common type for joined columns: {}: {} at left, {}: {} at right. {}",
left_key_name, ltype->getName(),
right_key_name, rtype->getName(),
ex.message());
}
bool right_side_changed = !common_type->equals(*rtype->second);
if (right_side_changed && !allow_right)
if (!allow_right && !common_type->equals(*rtype))
{
throw DB::Exception(ErrorCodes::TYPE_MISMATCH,
"Can't change type for right table: {}: {} -> {}",
right_key_name, rtype->second->getName(), common_type->getName());
"Can't change type for right table: {}: {} -> {}.",
right_key_name, rtype->getName(), common_type->getName());
}
left_type_map[left_key_name] = right_type_map[right_key_name] = common_type;
@ -646,10 +664,18 @@ static ActionsDAGPtr changeKeyTypes(const ColumnsWithTypeAndName & cols_src,
if (!has_some_to_do)
return nullptr;
return ActionsDAG::makeConvertingActions(cols_src, cols_dst, ActionsDAG::MatchColumnsMode::Name, true, add_new_cols, &key_column_rename);
return ActionsDAG::makeConvertingActions(
/* source= */ cols_src,
/* result= */ cols_dst,
/* mode= */ ActionsDAG::MatchColumnsMode::Name,
/* ignore_constant_values= */ true,
/* add_casted_columns= */ add_new_cols,
/* new_names= */ &key_column_rename);
}
static ActionsDAGPtr changeTypesToNullable(const ColumnsWithTypeAndName & cols_src, const NameSet & exception_cols)
static ActionsDAGPtr changeTypesToNullable(
const ColumnsWithTypeAndName & cols_src,
const NameSet & exception_cols)
{
ColumnsWithTypeAndName cols_dst = cols_src;
bool has_some_to_do = false;
@ -664,7 +690,14 @@ static ActionsDAGPtr changeTypesToNullable(const ColumnsWithTypeAndName & cols_s
if (!has_some_to_do)
return nullptr;
return ActionsDAG::makeConvertingActions(cols_src, cols_dst, ActionsDAG::MatchColumnsMode::Name, true, false, nullptr);
return ActionsDAG::makeConvertingActions(
/* source= */ cols_src,
/* result= */ cols_dst,
/* mode= */ ActionsDAG::MatchColumnsMode::Name,
/* ignore_constant_values= */ true,
/* add_casted_columns= */ false,
/* new_names= */ nullptr);
}
ActionsDAGPtr TableJoin::applyKeyConvertToTable(
@ -679,7 +712,7 @@ ActionsDAGPtr TableJoin::applyKeyConvertToTable(
/// Create DAG to make columns nullable if needed
if (make_nullable)
{
/// Do not need to make nullable temporary columns that would be used only as join keys, but now shown to user
/// Do not need to make nullable temporary columns that would be used only as join keys, but is not visible to user
NameSet cols_not_nullable;
for (const auto & t : key_column_rename)
cols_not_nullable.insert(t.second);
@ -788,7 +821,6 @@ void TableJoin::assertHasOneOnExpr() const
text.push_back(onexpr.formatDebug());
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Expected to have only one join clause, got {}: [{}], query: '{}'",
clauses.size(), fmt::join(text, " | "), queryToString(table_join));
}
}

View File

@ -156,7 +156,8 @@ private:
/// Create converting actions and change key column names if required
ActionsDAGPtr applyKeyConvertToTable(
const ColumnsWithTypeAndName & cols_src, const NameToTypeMap & type_mapping, NameToNameMap & key_column_rename,
const ColumnsWithTypeAndName & cols_src, const NameToTypeMap & type_mapping,
NameToNameMap & key_column_rename,
bool make_nullable) const;
void addKey(const String & left_name, const String & right_name, const ASTPtr & left_ast, const ASTPtr & right_ast = nullptr);
@ -165,7 +166,7 @@ private:
/// Calculates common supertypes for corresponding join key columns.
template <typename LeftNamesAndTypes, typename RightNamesAndTypes>
void inferJoinKeyCommonType(const LeftNamesAndTypes & left, const RightNamesAndTypes & right, bool allow_right);
void inferJoinKeyCommonType(const LeftNamesAndTypes & left, const RightNamesAndTypes & right, bool allow_right, bool strict);
NamesAndTypesList correctedColumnsAddedByJoin() const;
@ -199,6 +200,10 @@ public:
bool preferMergeJoin() const { return join_algorithm == MultiEnum<JoinAlgorithm>(JoinAlgorithm::PREFER_PARTIAL_MERGE); }
bool forceMergeJoin() const { return join_algorithm == MultiEnum<JoinAlgorithm>(JoinAlgorithm::PARTIAL_MERGE); }
bool allowParallelHashJoin() const;
bool forceFullSortingMergeJoin() const { return !isSpecialStorage() && join_algorithm.isSet(JoinAlgorithm::FULL_SORTING_MERGE); }
bool forceHashJoin() const
{
/// HashJoin always used for DictJoin
@ -206,8 +211,8 @@ public:
|| join_algorithm == MultiEnum<JoinAlgorithm>(JoinAlgorithm::HASH)
|| join_algorithm == MultiEnum<JoinAlgorithm>(JoinAlgorithm::PARALLEL_HASH);
}
bool allowParallelHashJoin() const;
bool joinUseNulls() const { return join_use_nulls; }
bool forceNullableRight() const { return join_use_nulls && isLeftOrFull(table_join.kind); }
bool forceNullableLeft() const { return join_use_nulls && isRightOrFull(table_join.kind); }
size_t defaultMaxBytes() const { return default_max_bytes; }
@ -275,7 +280,9 @@ public:
/// For `USING` join we will convert key columns inplace and affect into types in the result table
/// For `JOIN ON` we will create new columns with converted keys to join by.
std::pair<ActionsDAGPtr, ActionsDAGPtr>
createConvertingActions(const ColumnsWithTypeAndName & left_sample_columns, const ColumnsWithTypeAndName & right_sample_columns);
createConvertingActions(
const ColumnsWithTypeAndName & left_sample_columns,
const ColumnsWithTypeAndName & right_sample_columns);
void setAsofInequality(ASOF::Inequality inequality) { asof_inequality = inequality; }
ASOF::Inequality getAsofInequality() { return asof_inequality; }

View File

@ -1,4 +1,5 @@
#include <algorithm>
#include <memory>
#include <Core/Settings.h>
#include <Core/NamesAndTypes.h>
@ -28,6 +29,7 @@
#include <Interpreters/PredicateExpressionsOptimizer.h>
#include <Interpreters/RewriteOrderByVisitor.hpp>
#include <Parsers/IAST_fwd.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
@ -635,9 +637,8 @@ void setJoinStrictness(ASTSelectQuery & select_query, JoinStrictness join_defaul
}
else
{
if (table_join.strictness == ASTTableJoin::Strictness::Any)
if (table_join.kind == ASTTableJoin::Kind::Full)
throw Exception("ANY FULL JOINs are not implemented.", ErrorCodes::NOT_IMPLEMENTED);
if (table_join.strictness == ASTTableJoin::Strictness::Any && table_join.kind == ASTTableJoin::Kind::Full)
throw Exception("ANY FULL JOINs are not implemented", ErrorCodes::NOT_IMPLEMENTED);
}
out_table_join = table_join;

View File

@ -440,7 +440,7 @@ void checkTypesOfKeys(const Block & block_left, const Names & key_names_left,
{
throw DB::Exception(
ErrorCodes::TYPE_MISMATCH,
"Type mismatch of columns to JOIN by: {} {} at left, {} {} at right",
"Type mismatch of columns to JOIN by: {} :: {} at left, {} :: {} at right",
key_names_left[i], left_type->getName(),
key_names_right[i], right_type->getName());
}

View File

@ -113,16 +113,15 @@ struct ASTTableJoin : public IAST
void updateTreeHashImpl(SipHash & hash_state) const override;
};
inline bool isLeft(ASTTableJoin::Kind kind) { return kind == ASTTableJoin::Kind::Left; }
inline bool isRight(ASTTableJoin::Kind kind) { return kind == ASTTableJoin::Kind::Right; }
inline bool isInner(ASTTableJoin::Kind kind) { return kind == ASTTableJoin::Kind::Inner; }
inline bool isFull(ASTTableJoin::Kind kind) { return kind == ASTTableJoin::Kind::Full; }
inline bool isCrossOrComma(ASTTableJoin::Kind kind) { return kind == ASTTableJoin::Kind::Comma || kind == ASTTableJoin::Kind::Cross; }
inline bool isRightOrFull(ASTTableJoin::Kind kind) { return kind == ASTTableJoin::Kind::Right || kind == ASTTableJoin::Kind::Full; }
inline bool isLeftOrFull(ASTTableJoin::Kind kind) { return kind == ASTTableJoin::Kind::Left || kind == ASTTableJoin::Kind::Full; }
inline bool isInnerOrRight(ASTTableJoin::Kind kind) { return kind == ASTTableJoin::Kind::Inner || kind == ASTTableJoin::Kind::Right; }
inline bool isInnerOrLeft(ASTTableJoin::Kind kind) { return kind == ASTTableJoin::Kind::Inner || kind == ASTTableJoin::Kind::Left; }
inline constexpr bool isLeft(ASTTableJoin::Kind kind) { return kind == ASTTableJoin::Kind::Left; }
inline constexpr bool isRight(ASTTableJoin::Kind kind) { return kind == ASTTableJoin::Kind::Right; }
inline constexpr bool isInner(ASTTableJoin::Kind kind) { return kind == ASTTableJoin::Kind::Inner; }
inline constexpr bool isFull(ASTTableJoin::Kind kind) { return kind == ASTTableJoin::Kind::Full; }
inline constexpr bool isCrossOrComma(ASTTableJoin::Kind kind) { return kind == ASTTableJoin::Kind::Comma || kind == ASTTableJoin::Kind::Cross; }
inline constexpr bool isRightOrFull(ASTTableJoin::Kind kind) { return kind == ASTTableJoin::Kind::Right || kind == ASTTableJoin::Kind::Full; }
inline constexpr bool isLeftOrFull(ASTTableJoin::Kind kind) { return kind == ASTTableJoin::Kind::Left || kind == ASTTableJoin::Kind::Full; }
inline constexpr bool isInnerOrRight(ASTTableJoin::Kind kind) { return kind == ASTTableJoin::Kind::Inner || kind == ASTTableJoin::Kind::Right; }
inline constexpr bool isInnerOrLeft(ASTTableJoin::Kind kind) { return kind == ASTTableJoin::Kind::Inner || kind == ASTTableJoin::Kind::Left; }
/// Specification of ARRAY JOIN.
struct ASTArrayJoin : public IAST

View File

@ -21,6 +21,25 @@ IMergingTransformBase::IMergingTransformBase(
{
}
/// Builds the list of input ports: one port per header, in the same order as `blocks`.
static InputPorts createPorts(const Blocks & blocks)
{
    InputPorts input_ports;

    for (const auto & header : blocks)
    {
        input_ports.emplace_back(header);
    }

    return input_ports;
}
/// Constructs the transform from an explicit list of input headers:
/// one input port is created per element of `input_headers` (via createPorts),
/// and a single output port is created with `output_header`.
/// `have_all_inputs_` and `limit_hint_` are stored as is.
IMergingTransformBase::IMergingTransformBase(
const Blocks & input_headers,
const Block & output_header,
bool have_all_inputs_,
UInt64 limit_hint_)
: IProcessor(createPorts(input_headers), {output_header})
, have_all_inputs(have_all_inputs_)
, limit_hint(limit_hint_)
{
}
void IMergingTransformBase::onNewInput()
{
throw Exception("onNewInput is not implemented for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
@ -170,6 +189,10 @@ IProcessor::Status IMergingTransformBase::prepare()
state.has_input = true;
}
else
{
state.no_data = true;
}
state.need_data = false;
}

View File

@ -19,6 +19,12 @@ public:
bool have_all_inputs_,
UInt64 limit_hint_);
IMergingTransformBase(
const Blocks & input_headers,
const Block & output_header,
bool have_all_inputs_,
UInt64 limit_hint_);
OutputPort & getOutputPort() { return outputs.front(); }
/// Methods to add additional input port. It is possible to do only before the first call of `prepare`.
@ -40,6 +46,7 @@ protected:
bool has_input = false;
bool is_finished = false;
bool need_data = false;
bool no_data = false;
size_t next_input_to_read = 0;
IMergingAlgorithm::Inputs init_chunks;
@ -82,6 +89,20 @@ public:
{
}
/// Constructor taking a separate header for every input.
/// `input_headers`, `output_header`, `have_all_inputs_` and `limit_hint_` are passed to the base class.
/// `empty_chunk_on_finish_` is stored in `empty_chunk_on_finish` (checked in work() before
/// passing an empty input to the algorithm when there is no data).
/// The remaining `args` are forwarded to the constructor of the merging algorithm.
template <typename ... Args>
IMergingTransform(
const Blocks & input_headers,
const Block & output_header,
bool have_all_inputs_,
UInt64 limit_hint_,
bool empty_chunk_on_finish_,
Args && ... args)
: IMergingTransformBase(input_headers, output_header, have_all_inputs_, limit_hint_)
, empty_chunk_on_finish(empty_chunk_on_finish_)
, algorithm(std::forward<Args>(args) ...)
{
}
void work() override
{
if (!state.init_chunks.empty())
@ -94,6 +115,12 @@ public:
algorithm.consume(state.input_chunk, state.next_input_to_read);
state.has_input = false;
}
else if (state.no_data && empty_chunk_on_finish)
{
IMergingAlgorithm::Input current_input;
algorithm.consume(current_input, state.next_input_to_read);
state.no_data = false;
}
IMergingAlgorithm::Status status = algorithm.merge();
@ -118,6 +145,9 @@ public:
}
protected:
/// Call `consume` with empty chunk when there is no more data.
bool empty_chunk_on_finish = false;
Algorithm algorithm;
/// Profile info.

View File

@ -10,8 +10,11 @@ namespace ErrorCodes
void connect(OutputPort & output, InputPort & input)
{
if (input.state || output.state)
throw Exception("Port is already connected", ErrorCodes::LOGICAL_ERROR);
if (input.state)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Port is already connected, (header: [{}])", input.header.dumpStructure());
if (output.state)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Port is already connected, (header: [{}])", output.header.dumpStructure());
auto out_name = output.getProcessor().getName();
auto in_name = input.getProcessor().getName();

View File

@ -2,6 +2,7 @@
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/Transforms/JoiningTransform.h>
#include <Interpreters/IJoin.h>
#include <Common/typeid_cast.h>
namespace DB
{
@ -32,7 +33,13 @@ QueryPipelineBuilderPtr JoinStep::updatePipeline(QueryPipelineBuilders pipelines
if (pipelines.size() != 2)
throw Exception(ErrorCodes::LOGICAL_ERROR, "JoinStep expect two input steps");
return QueryPipelineBuilder::joinPipelines(std::move(pipelines[0]), std::move(pipelines[1]), join, max_block_size, max_streams, keep_left_read_in_order, &processors);
if (join->pipelineType() == JoinPipelineType::YShaped)
return QueryPipelineBuilder::joinPipelinesYShaped(
std::move(pipelines[0]), std::move(pipelines[1]),
join, output_stream->header,
max_block_size, &processors);
return QueryPipelineBuilder::joinPipelinesRightLeft(std::move(pipelines[0]), std::move(pipelines[1]), join, max_block_size, max_streams, keep_left_read_in_order, &processors);
}
void JoinStep::describePipeline(FormatSettings & settings) const

View File

@ -0,0 +1,865 @@
#include <cassert>
#include <cstddef>
#include <limits>
#include <memory>
#include <optional>
#include <type_traits>
#include <vector>
#include <base/defines.h>
#include <base/types.h>
#include <Common/logger_useful.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/IColumn.h>
#include <Core/SortCursor.h>
#include <Core/SortDescription.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/TableJoin.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Processors/Transforms/MergeJoinTransform.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int LOGICAL_ERROR;
}
using JoinKind = ASTTableJoin::Kind;
namespace
{
/// Creates a FullMergeJoinCursor over the materialized `block`.
/// The cursor's sort description gets one entry per name in `columns`, in the same order.
FullMergeJoinCursorPtr createCursor(const Block & block, const Names & columns)
{
    SortDescription sort_description;
    sort_description.reserve(columns.size());

    for (const auto & key_name : columns)
        sort_description.emplace_back(key_name);

    return std::make_unique<FullMergeJoinCursor>(materializeBlock(block), sort_description);
}
/// Compares row `lhs_pos` of `left_column` with row `rhs_pos` of `right_column`, where either side may be a ColumnNullable.
/// The template flags only select which nullable checks are compiled in; every check still tests the real column type at runtime.
/// Join semantics for NULL: two rows that would otherwise compare equal are reported as different when the left one is NULL.
template <bool has_left_nulls, bool has_right_nulls>
int nullableCompareAt(const IColumn & left_column, const IColumn & right_column, size_t lhs_pos, size_t rhs_pos, int null_direction_hint = 1)
{
if constexpr (has_left_nulls && has_right_nulls)
{
const auto * left_nullable = checkAndGetColumn<ColumnNullable>(left_column);
const auto * right_nullable = checkAndGetColumn<ColumnNullable>(right_column);
/// Both sides are nullable: compare the nullable columns directly.
if (left_nullable && right_nullable)
{
int res = left_column.compareAt(lhs_pos, rhs_pos, right_column, null_direction_hint);
if (res)
return res;
/// NULL != NULL case: the rows compared equal, but a NULL key must not match anything
if (left_column.isNullAt(lhs_pos))
return null_direction_hint;
return 0;
}
}
if constexpr (has_left_nulls)
{
/// Only the left side is nullable: a NULL yields `null_direction_hint`, otherwise compare the nested column.
if (const auto * left_nullable = checkAndGetColumn<ColumnNullable>(left_column))
{
if (left_column.isNullAt(lhs_pos))
return null_direction_hint;
return left_nullable->getNestedColumn().compareAt(lhs_pos, rhs_pos, right_column, null_direction_hint);
}
}
if constexpr (has_right_nulls)
{
/// Only the right side is nullable: mirror of the case above, with the sign flipped for NULL.
if (const auto * right_nullable = checkAndGetColumn<ColumnNullable>(right_column))
{
if (right_column.isNullAt(rhs_pos))
return -null_direction_hint;
return left_column.compareAt(lhs_pos, rhs_pos, right_nullable->getNestedColumn(), null_direction_hint);
}
}
/// Neither side is nullable.
return left_column.compareAt(lhs_pos, rhs_pos, right_column, null_direction_hint);
}
/// Compares row `lpos` of `lhs` with row `rpos` of `rhs` column by column over the sort columns.
/// Returns the first non-zero per-column result, or 0 when all sort columns compare equal.
int ALWAYS_INLINE compareCursors(const SortCursorImpl & lhs, size_t lpos,
                                 const SortCursorImpl & rhs, size_t rpos)
{
    for (size_t key = 0; key < lhs.sort_columns_size; ++key)
    {
        const auto & left_key = *lhs.sort_columns[key];
        const auto & right_key = *rhs.sort_columns[key];

        /// TODO(@vdimir): use nullableCompareAt only if there's nullable columns
        if (int res = nullableCompareAt<true, true>(left_key, right_key, lpos, rpos); res != 0)
            return res;
    }
    return 0;
}
/// Compares the current rows of the two cursors.
int ALWAYS_INLINE compareCursors(const SortCursorImpl & lhs, const SortCursorImpl & rhs)
{
    const size_t left_row = lhs.getRow();
    const size_t right_row = rhs.getRow();
    return compareCursors(lhs, left_row, rhs, right_row);
}
/// True when the last row of `lhs` is less than the current row of `rhs`.
bool ALWAYS_INLINE totallyLess(SortCursorImpl & lhs, SortCursorImpl & rhs)
{
    const size_t last_left_row = lhs.rows - 1;
    return compareCursors(lhs, last_left_row, rhs, rhs.getRow()) < 0;
}
/// Returns -1 if `lhs` is totally less than `rhs`, 1 if `rhs` is totally less than `lhs`, and 0 otherwise.
int ALWAYS_INLINE totallyCompare(SortCursorImpl & lhs, SortCursorImpl & rhs)
{
    if (totallyLess(lhs, rhs))
        return -1;
    return totallyLess(rhs, lhs) ? 1 : 0;
}
/// Builds a new column whose i-th row is `column[indices[i]]`.
/// An index that is not a valid row of `column` produces a default value instead.
ColumnPtr indexColumn(const ColumnPtr & column, const PaddedPODArray<UInt64> & indices)
{
    auto gathered = column->cloneEmpty();
    gathered->reserve(indices.size());

    for (size_t row : indices)
    {
        /// rows where default value should be inserted have index == size
        const bool needs_default = row >= column->size();
        if (needs_default)
            gathered->insertDefault();
        else
            gathered->insertFrom(*column, row);
    }
    return gathered;
}
/// Applies indexColumn with the same `indices` to every column of `columns`, preserving column order.
Columns indexColumns(const Columns & columns, const PaddedPODArray<UInt64> & indices)
{
    Columns gathered;
    gathered.reserve(columns.size());

    for (size_t col_idx = 0; col_idx < columns.size(); ++col_idx)
        gathered.emplace_back(indexColumn(columns[col_idx], indices));

    return gathered;
}
/// Checks whether the row at `pos_opt` (the cursor's current row when not given) and the row right after it
/// compare equal in every sort column. The caller must make sure the following row exists.
bool sameNext(const SortCursorImpl & impl, std::optional<size_t> pos_opt = {})
{
    const size_t row = pos_opt ? *pos_opt : impl.getRow();

    for (size_t key = 0; key < impl.sort_columns_size; ++key)
    {
        const auto & key_column = *impl.sort_columns[key];
        const auto cmp = key_column.compareAt(row, row + 1, key_column, impl.desc[key].nulls_direction);
        if (cmp != 0)
            return false;
    }
    return true;
}
/// Moves the cursor past all consecutive rows that share the key of the current row.
/// Returns the number of rows skipped; if the chunk ends first, rows up to the end of the chunk are counted.
size_t nextDistinct(SortCursorImpl & impl)
{
    assert(impl.isValid());
    const size_t first_row = impl.getRow();

    /// Walk to the last row of the run of equal keys ...
    for (; !impl.isLast() && sameNext(impl); impl.next())
    {
    }
    /// ... and step over it.
    impl.next();

    const size_t end_row = impl.isValid() ? impl.getRow() : impl.rows;
    return end_row - first_row;
}
/// Returns a new column that contains row 0 of `column` repeated `num` times.
ColumnPtr replicateRow(const IColumn & column, size_t num)
{
    MutableColumnPtr replicated = column.cloneEmpty();
    replicated->insertManyFrom(column, /* position = */ 0, num);
    return replicated;
}
/// Appends one column of `size` rows to `result_chunk` for every column in `cols`:
///  - an empty source column becomes `size` default values,
///  - a single-row source column has that row repeated `size` times,
///  - any other source column contributes rows [start, start + size).
template <typename TColumns>
void copyColumnsResized(const TColumns & cols, size_t start, size_t size, Chunk & result_chunk)
{
    for (const auto & source : cols)
    {
        if (source->empty())
        {
            /// add defaults
            result_chunk.addColumn(source->cloneResized(size));
            continue;
        }

        if (source->size() == 1)
        {
            /// copy same row n times
            result_chunk.addColumn(replicateRow(*source, size));
            continue;
        }

        /// cut column
        assert(start + size <= source->size());
        result_chunk.addColumn(source->cut(start, size));
    }
}
/// Builds a chunk from the columns of `lhs` followed by the columns of `rhs`,
/// each resized with copyColumnsResized using the same `start` and `num_rows`.
Chunk copyChunkResized(const Chunk & lhs, const Chunk & rhs, size_t start, size_t num_rows)
{
    Chunk combined;

    const auto & left_columns = lhs.getColumns();
    copyColumnsResized(left_columns, start, num_rows, combined);

    const auto & right_columns = rhs.getColumns();
    copyColumnsResized(right_columns, start, num_rows, combined);

    return combined;
}
/// Returns a one-row chunk built from `chunk` at position `pos` via copyColumnsResized.
Chunk getRowFromChunk(const Chunk & chunk, size_t pos)
{
    Chunk single_row;
    const auto & source_columns = chunk.getColumns();
    copyColumnsResized(source_columns, pos, /* size = */ 1, single_row);
    return single_row;
}
/// Appends the consecutive indices start, start + 1, ..., end - 1 to `left_map`.
void inline addRange(PaddedPODArray<UInt64> & left_map, size_t start, size_t end)
{
    assert(end > start);

    size_t row = start;
    while (row < end)
    {
        left_map.push_back(row);
        ++row;
    }
}
/// Appends the index `idx` to `left_map` `num` times.
void inline addMany(PaddedPODArray<UInt64> & left_map, size_t idx, size_t num)
{
    for (size_t remaining = num; remaining > 0; --remaining)
        left_map.push_back(idx);
}
}
/// Read-only access to the chunk the cursor currently holds.
const Chunk & FullMergeJoinCursor::getCurrent() const
{
    const Chunk & held_chunk = current_chunk;
    return held_chunk;
}
/// Resets the sort cursor to a default-constructed one and hands the held chunk over to the caller.
Chunk FullMergeJoinCursor::detach()
{
    cursor = SortCursorImpl();

    Chunk released = std::move(current_chunk);
    return released;
}
/// Installs the next chunk of the input and points the sort cursor at it.
/// A chunk that evaluates to false marks the end of the input: the cursor is detached and no more chunks are expected.
void FullMergeJoinCursor::setChunk(Chunk && chunk)
{
    assert(!recieved_all_blocks);
    assert(!cursor.isValid());

    if (chunk)
    {
        current_chunk = std::move(chunk);
        cursor = SortCursorImpl(sample_block, current_chunk.getColumns(), desc);
        return;
    }

    recieved_all_blocks = true;
    detach();
}
/// True once the end of the input was signalled and the cursor has no valid row left.
bool FullMergeJoinCursor::fullyCompleted() const
{
    if (cursor.isValid())
        return false;
    return recieved_all_blocks;
}
/// Validates the join (exactly two inputs, ANY/ALL strictness, INNER/LEFT/RIGHT/FULL kind, no ON filter conditions),
/// creates one cursor per input over its key columns and translates the left-to-right key remap
/// from column names to column positions in the input headers.
MergeJoinAlgorithm::MergeJoinAlgorithm(
    JoinPtr table_join_,
    const Blocks & input_headers,
    size_t max_block_size_)
    : table_join(table_join_)
    , max_block_size(max_block_size_)
    , log(&Poco::Logger::get("MergeJoinAlgorithm"))
{
    if (input_headers.size() != 2)
        throw Exception("MergeJoinAlgorithm requires exactly two inputs", ErrorCodes::LOGICAL_ERROR);

    const auto & join_info = table_join->getTableJoin();

    auto strictness = join_info.strictness();
    const bool strictness_supported
        = strictness == ASTTableJoin::Strictness::Any || strictness == ASTTableJoin::Strictness::All;
    if (!strictness_supported)
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "MergeJoinAlgorithm is not implemented for strictness {}", strictness);

    auto kind = join_info.kind();
    const bool kind_supported = isInner(kind) || isLeft(kind) || isRight(kind) || isFull(kind);
    if (!kind_supported)
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "MergeJoinAlgorithm is not implemented for kind {}", kind);

    const auto & join_on = join_info.getOnlyClause();
    if (join_on.on_filter_condition_left || join_on.on_filter_condition_right)
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "MergeJoinAlgorithm does not support ON filter conditions");

    const auto & left_header = input_headers[0];
    const auto & right_header = input_headers[1];

    cursors.push_back(createCursor(left_header, join_on.key_names_left));
    cursors.push_back(createCursor(right_header, join_on.key_names_right));

    for (const auto & [left_key, right_key] : join_info.leftToRightKeyRemap())
    {
        const size_t left_position = left_header.getPositionByName(left_key);
        const size_t right_position = right_header.getPositionByName(right_key);
        left_to_right_key_remap[left_position] = right_position;
    }
}
/// Replaces every constant column of a non-empty chunk with its full (materialized) form; the row count is kept.
static void prepareChunk(Chunk & chunk)
{
    if (chunk)
    {
        auto row_count = chunk.getNumRows();
        auto full_columns = chunk.detachColumns();

        for (auto & col : full_columns)
            col = col->convertToFullColumnIfConst();

        chunk.setColumns(std::move(full_columns), row_count);
    }
}
/// Takes the first chunk of each of the two inputs.
/// For every input it remembers a zero-row copy of the chunk's columns in `sample_chunks`
/// (used later as a template for result columns) and then feeds the chunk to the cursor via `consume`.
/// Throws LOGICAL_ERROR unless exactly two inputs are given.
void MergeJoinAlgorithm::initialize(Inputs inputs)
{
    if (inputs.size() != 2)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Two inputs are required, got {}", inputs.size());

    LOG_DEBUG(log, "Initialize, number of inputs: {}", inputs.size());
    for (size_t i = 0; i < inputs.size(); ++i)
    {
        assert(inputs[i].chunk.getNumColumns() == cursors[i]->sampleBlock().columns());

        /// Constant columns are materialized first, so the sample columns are non-constant as well
        prepareChunk(inputs[i].chunk);

        /// start = 0, size = 0: only the structure of the columns is copied
        copyColumnsResized(inputs[i].chunk.getColumns(), 0, 0, sample_chunks.emplace_back());

        consume(inputs[i], i);
    }
}
/// Accepts the next chunk of input `source_num`: updates the block/row statistics,
/// materializes constant columns and passes the chunk to the corresponding cursor.
/// Inputs with `skip_last_row` or `permutation` set are rejected with NOT_IMPLEMENTED.
void MergeJoinAlgorithm::consume(Input & input, size_t source_num)
{
    if (input.skip_last_row)
        throw Exception("skip_last_row is not supported", ErrorCodes::NOT_IMPLEMENTED);

    if (input.permutation)
        throw Exception("permutation is not supported", ErrorCodes::NOT_IMPLEMENTED);

    /// Chunks that evaluate to false are not counted
    if (input.chunk)
    {
        ++stat.num_blocks[source_num];
        stat.num_rows[source_num] += input.chunk.getNumRows();
    }

    prepareChunk(input.chunk);

    auto & target_cursor = *cursors[source_num];
    target_cursor.setChunk(std::move(input.chunk));
}
/// Join step for ALL strictness over the current chunks of both cursors.
/// Fills `left_map` / `right_map` with row indices into the current left / right chunk:
/// result row i is built from left row left_map[i] and right row right_map[i].
/// An index equal to the number of rows of the chunk stands for "no row on that side"
/// (see indexColumn, which inserts a default for such indices).
/// When a run of equal keys cannot be emitted directly, the ranges are handed over to `state` and the function returns early.
template <JoinKind kind>
struct AllJoinImpl
{
constexpr static bool enabled = isInner(kind) || isLeft(kind) || isRight(kind) || isFull(kind);
static void join(FullMergeJoinCursor & left_cursor,
FullMergeJoinCursor & right_cursor,
size_t max_block_size,
PaddedPODArray<UInt64> & left_map,
PaddedPODArray<UInt64> & right_map,
std::unique_ptr<AllJoinState> & state)
{
right_map.clear();
right_map.reserve(max_block_size);
left_map.clear();
left_map.reserve(max_block_size);
size_t rpos = std::numeric_limits<size_t>::max();
size_t lpos = std::numeric_limits<size_t>::max();
int cmp = 0;
assert(left_cursor->isValid() && right_cursor->isValid());
while (left_cursor->isValid() && right_cursor->isValid())
{
/// Remember where the current key starts on both sides before the cursors are advanced
lpos = left_cursor->getRow();
rpos = right_cursor->getRow();
cmp = compareCursors(left_cursor.cursor, right_cursor.cursor);
if (cmp == 0)
{
/// Same key on both sides: lnum left rows have to be combined with rnum right rows
size_t lnum = nextDistinct(left_cursor.cursor);
size_t rnum = nextDistinct(right_cursor.cursor);
bool all_fit_in_block = std::max(left_map.size(), right_map.size()) + lnum * rnum <= max_block_size;
/// If a cursor ran to the end of its chunk, the run of equal keys may continue in the next chunk
bool have_all_ranges = left_cursor.cursor.isValid() && right_cursor.cursor.isValid();
if (all_fit_in_block && have_all_ranges)
{
/// fast path if all joined rows fit in one block
for (size_t i = 0; i < rnum; ++i)
{
addRange(left_map, lpos, left_cursor.cursor.getRow());
addMany(right_map, rpos + i, lnum);
}
}
else
{
/// Slow path: store both ranges (with copies of the chunks) in the state and stop;
/// rows collected in the maps so far are still valid for the caller
assert(state == nullptr);
state = std::make_unique<AllJoinState>(left_cursor.cursor, lpos, right_cursor.cursor, rpos);
state->addRange(0, left_cursor.getCurrent().clone(), lpos, lnum);
state->addRange(1, right_cursor.getCurrent().clone(), rpos, rnum);
return;
}
}
else if (cmp < 0)
{
/// Left key has no match: skip it, and for LEFT/FULL emit the left rows paired with "no row" on the right
size_t num = nextDistinct(left_cursor.cursor);
if constexpr (isLeftOrFull(kind))
{
right_map.resize_fill(right_map.size() + num, right_cursor->rows);
for (size_t i = lpos; i < left_cursor->getRow(); ++i)
left_map.push_back(i);
}
}
else
{
/// Right key has no match: mirror of the branch above for RIGHT/FULL
size_t num = nextDistinct(right_cursor.cursor);
if constexpr (isRightOrFull(kind))
{
left_map.resize_fill(left_map.size() + num, left_cursor->rows);
for (size_t i = rpos; i < right_cursor->getRow(); ++i)
right_map.push_back(i);
}
}
}
}
};
/// Calls `Impl<K>::join(args...)` for the compile-time kind K that equals the runtime `kind`.
/// Kinds for which `Impl<K>::enabled` is false, and kinds other than Inner/Left/Right/Full, raise NOT_IMPLEMENTED.
template <template<JoinKind> class Impl, typename ... Args>
void dispatchKind(JoinKind kind, Args && ... args)
{
    if (Impl<JoinKind::Inner>::enabled && kind == JoinKind::Inner)
    {
        Impl<JoinKind::Inner>::join(std::forward<Args>(args)...);
        return;
    }

    if (Impl<JoinKind::Left>::enabled && kind == JoinKind::Left)
    {
        Impl<JoinKind::Left>::join(std::forward<Args>(args)...);
        return;
    }

    if (Impl<JoinKind::Right>::enabled && kind == JoinKind::Right)
    {
        Impl<JoinKind::Right>::join(std::forward<Args>(args)...);
        return;
    }

    if (Impl<JoinKind::Full>::enabled && kind == JoinKind::Full)
    {
        Impl<JoinKind::Full>::join(std::forward<Args>(args)...);
        return;
    }

    throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported join kind: \"{}\"", kind);
}
/// Continues an ALL join for a key whose rows did not fit the fast path of AllJoinImpl.
/// Returns:
///  - a Status built from an input index when that input has to deliver its next chunk first,
///  - a Status with a result chunk when accumulated rows were joined,
///  - an empty optional when there is no pending state (or nothing was produced).
std::optional<MergeJoinAlgorithm::Status> MergeJoinAlgorithm::handleAllJoinState()
{
/// All accumulated ranges were already emitted by previous calls
if (all_join_state && all_join_state->finished())
{
all_join_state.reset();
}
if (all_join_state)
{
/// Accumulate blocks with same key in all_join_state
for (size_t i = 0; i < cursors.size(); ++i)
{
if (cursors[i]->cursor.isValid() && all_join_state->keys[i].equals(cursors[i]->cursor))
{
size_t pos = cursors[i]->cursor.getRow();
size_t num = nextDistinct(cursors[i]->cursor);
all_join_state->addRange(i, cursors[i]->getCurrent().clone(), pos, num);
}
}
/// A cursor that ran out of rows but is not finished may still bring more rows with the same key
for (size_t i = 0; i < cursors.size(); ++i)
{
if (!cursors[i]->cursor.isValid() && !cursors[i]->fullyCompleted())
{
return Status(i);
}
}
/// If current position is valid, then we've found new key, can join accumulated data
stat.max_blocks_loaded = std::max(stat.max_blocks_loaded, all_join_state->blocksStored());
/// join all rows with current key
/// Result columns: left columns first, then right columns, cloned from the sample chunks
MutableColumns result_cols;
for (size_t i = 0; i < 2; ++i)
{
for (const auto & col : sample_chunks[i].getColumns())
result_cols.push_back(col->cloneEmpty());
}
size_t total_rows = 0;
/// Each iteration pairs one whole left range with a single right row;
/// the limit is checked before an iteration, so the result may exceed max_block_size
while (total_rows < max_block_size)
{
const auto & left_range = all_join_state->getLeft();
const auto & right_range = all_join_state->getRight();
total_rows += left_range.length;
size_t i = 0;
/// Copy left block
for (const auto & col : left_range.chunk.getColumns())
result_cols[i++]->insertRangeFrom(*col, left_range.begin, left_range.length);
/// And replicate current right column
for (const auto & col : right_range.chunk.getColumns())
result_cols[i++]->insertManyFrom(*col, right_range.current, left_range.length);
bool valid = all_join_state->next();
if (!valid)
break;
}
if (total_rows)
return Status(Chunk(std::move(result_cols), total_rows));
}
return {};
}
/// Runs one ALL-join step on the current chunks and builds the result chunk from the produced index maps.
/// The result has the left columns first, followed by the right columns.
MergeJoinAlgorithm::Status MergeJoinAlgorithm::allJoin(JoinKind kind)
{
/// idx_map[0] / idx_map[1]: row indices into the current left / right chunk, one pair per result row
PaddedPODArray<UInt64> idx_map[2];
dispatchKind<AllJoinImpl>(kind, *cursors[0], *cursors[1], max_block_size, idx_map[0], idx_map[1], all_join_state);
assert(idx_map[0].size() == idx_map[1].size());
Chunk result;
Columns rcols = indexColumns(cursors[1]->getCurrent().getColumns(), idx_map[1]);
Columns lcols;
if (!left_to_right_key_remap.empty())
{
/// If we have remapped columns, then we need to get values from right columns instead of defaults
const auto & indices = idx_map[0];
const auto & left_src = cursors[0]->getCurrent().getColumns();
for (size_t col_idx = 0; col_idx < left_src.size(); ++col_idx)
{
const auto & col = left_src[col_idx];
auto new_col = col->cloneEmpty();
new_col->reserve(indices.size());
for (size_t i = 0; i < indices.size(); ++i)
{
if (indices[i] < col->size())
{
new_col->insertFrom(*col, indices[i]);
}
else
{
/// No left row for this result row: take the value of the mapped right column
/// from the already built result row `i`, or a default if the column is not remapped
if (auto it = left_to_right_key_remap.find(col_idx); it != left_to_right_key_remap.end())
new_col->insertFrom(*rcols[it->second], i);
else
new_col->insertDefault();
}
}
lcols.push_back(std::move(new_col));
}
}
else
{
lcols = indexColumns(cursors[0]->getCurrent().getColumns(), idx_map[0]);
}
for (auto & col : lcols)
result.addColumn(std::move(col));
for (auto & col : rcols)
result.addColumn(std::move(col));
return Status(std::move(result));
}
/// Join step for ANY strictness over the current chunks of both cursors (INNER, LEFT and RIGHT only).
/// Fills `left_map` / `right_map` with row indices into the current left / right chunk.
/// For LEFT only `right_map` is filled and for RIGHT only `left_map` is filled;
/// the caller treats an empty map as the identity mapping.
/// An index equal to the number of rows of the chunk stands for "no row on that side".
/// On exit `state` remembers the last key of every chunk that was fully consumed,
/// so that rows with the same key in the next chunk can be handled by the caller.
template <JoinKind kind>
struct AnyJoinImpl
{
constexpr static bool enabled = isInner(kind) || isLeft(kind) || isRight(kind);
static void join(FullMergeJoinCursor & left_cursor,
FullMergeJoinCursor & right_cursor,
PaddedPODArray<UInt64> & left_map,
PaddedPODArray<UInt64> & right_map,
AnyJoinState & state)
{
assert(enabled);
/// Upper bound for the number of result rows, used only to reserve memory
size_t num_rows = isLeft(kind) ? left_cursor->rowsLeft() :
isRight(kind) ? right_cursor->rowsLeft() :
std::min(left_cursor->rowsLeft(), right_cursor->rowsLeft());
if constexpr (isLeft(kind) || isInner(kind))
right_map.reserve(num_rows);
if constexpr (isRight(kind) || isInner(kind))
left_map.reserve(num_rows);
size_t rpos = std::numeric_limits<size_t>::max();
size_t lpos = std::numeric_limits<size_t>::max();
assert(left_cursor->isValid() && right_cursor->isValid());
int cmp = 0;
while (left_cursor->isValid() && right_cursor->isValid())
{
lpos = left_cursor->getRow();
rpos = right_cursor->getRow();
cmp = compareCursors(left_cursor.cursor, right_cursor.cursor);
if (cmp == 0)
{
/// LEFT: every left row with this key is paired with the first right row of the key;
/// the right cursor is not advanced here
if constexpr (isLeftOrFull(kind))
{
size_t lnum = nextDistinct(left_cursor.cursor);
right_map.resize_fill(right_map.size() + lnum, rpos);
}
/// RIGHT: mirror of the case above
if constexpr (isRightOrFull(kind))
{
size_t rnum = nextDistinct(right_cursor.cursor);
left_map.resize_fill(left_map.size() + rnum, lpos);
}
/// INNER: exactly one result row per key
if constexpr (isInner(kind))
{
nextDistinct(left_cursor.cursor);
nextDistinct(right_cursor.cursor);
left_map.emplace_back(lpos);
right_map.emplace_back(rpos);
}
}
else if (cmp < 0)
{
/// Left key without a match: skipped, LEFT emits it with "no row" on the right
size_t num = nextDistinct(left_cursor.cursor);
if constexpr (isLeftOrFull(kind))
right_map.resize_fill(right_map.size() + num, right_cursor->rows);
}
else
{
/// Right key without a match: skipped, RIGHT emits it with "no row" on the left
size_t num = nextDistinct(right_cursor.cursor);
if constexpr (isRightOrFull(kind))
left_map.resize_fill(left_map.size() + num, left_cursor->rows);
}
}
/// Remember index of last joined row to propagate it to next block
state.setValue({});
if (!left_cursor->isValid())
{
state.set(0, left_cursor.cursor);
/// The last left key had a match: keep the matched right row for left rows with the same key in the next chunk
if (cmp == 0 && isLeft(kind))
state.setValue(getRowFromChunk(right_cursor.getCurrent(), rpos));
}
if (!right_cursor->isValid())
{
state.set(1, right_cursor.cursor);
if (cmp == 0 && isRight(kind))
state.setValue(getRowFromChunk(left_cursor.getCurrent(), lpos));
}
}
};
/// Handles rows at the beginning of a new chunk that have the same key as the last row of the previous chunk
/// (the key remembered in `any_join_state`). Such rows are skipped; for LEFT (input 0) and RIGHT (input 1) joins
/// they are emitted, paired either with the remembered row of the other table or with defaults.
/// Returns an empty optional when there is nothing to emit.
std::optional<MergeJoinAlgorithm::Status> MergeJoinAlgorithm::handleAnyJoinState()
{
if (any_join_state.empty())
return {};
auto kind = table_join->getTableJoin().kind();
Chunk result;
for (size_t source_num = 0; source_num < 2; ++source_num)
{
auto & current = *cursors[source_num];
auto & state = any_join_state;
if (any_join_state.keys[source_num].equals(current.cursor))
{
/// Skip all rows that continue the remembered key
size_t start_pos = current->getRow();
size_t length = nextDistinct(current.cursor);
if (length && isLeft(kind) && source_num == 0)
{
/// state.value holds the row of the right table joined with this key, if there was one
if (state.value)
result = copyChunkResized(current.getCurrent(), state.value, start_pos, length);
else
result = createBlockWithDefaults(source_num, start_pos, length);
}
if (length && isRight(kind) && source_num == 1)
{
if (state.value)
result = copyChunkResized(state.value, current.getCurrent(), start_pos, length);
else
result = createBlockWithDefaults(source_num, start_pos, length);
}
/// We've found row with other key, no need to skip more rows with current key
if (current->isValid())
{
state.keys[source_num].reset();
}
}
else
{
/// The new chunk starts with a different key: forget the remembered one
any_join_state.keys[source_num].reset();
}
}
if (result)
return Status(std::move(result));
return {};
}
/// Runs one ANY-join step: first finishes rows that continue the key of the previous chunk,
/// then asks for the next chunk of an exhausted input (Status built from the input index),
/// otherwise joins the current chunks and builds the result chunk from the index maps.
MergeJoinAlgorithm::Status MergeJoinAlgorithm::anyJoin(JoinKind kind)
{
if (auto result = handleAnyJoinState())
return std::move(*result);
auto & current_left = cursors[0]->cursor;
if (!current_left.isValid())
return Status(0);
auto & current_right = cursors[1]->cursor;
if (!current_right.isValid())
return Status(1);
/// join doesn't build result block, but returns indices where result rows should be placed
PaddedPODArray<UInt64> idx_map[2];
/// Positions before the join: the start of the range used for an identity mapping below
size_t prev_pos[] = {current_left.getRow(), current_right.getRow()};
dispatchKind<AnyJoinImpl>(kind, *cursors[0], *cursors[1], idx_map[0], idx_map[1], any_join_state);
assert(idx_map[0].empty() || idx_map[1].empty() || idx_map[0].size() == idx_map[1].size());
size_t num_result_rows = std::max(idx_map[0].size(), idx_map[1].size());
/// build result block from indices
Chunk result;
for (size_t source_num = 0; source_num < 2; ++source_num)
{
/// empty map means identity mapping
if (!idx_map[source_num].empty())
{
for (const auto & col : cursors[source_num]->getCurrent().getColumns())
{
result.addColumn(indexColumn(col, idx_map[source_num]));
}
}
else
{
/// Take the consecutive rows starting at the position the cursor had before the join
for (const auto & col : cursors[source_num]->getCurrent().getColumns())
{
result.addColumn(col->cut(prev_pos[source_num], num_result_rows));
}
}
}
return Status(std::move(result));
}
/// if `source_num == 0` get data from left cursor and fill defaults at right
/// otherwise - vice versa
/// Rows [start, start + num_rows) of the chosen side are used. The other side is taken from its empty sample chunk,
/// which copyColumnsResized expands to `num_rows` default values.
Chunk MergeJoinAlgorithm::createBlockWithDefaults(size_t source_num, size_t start, size_t num_rows) const
{
ColumnRawPtrs cols;
{
const auto & columns_left = source_num == 0 ? cursors[0]->getCurrent().getColumns() : sample_chunks[0].getColumns();
const auto & columns_right = source_num == 1 ? cursors[1]->getCurrent().getColumns() : sample_chunks[1].getColumns();
for (size_t i = 0; i < columns_left.size(); ++i)
{
/// When data comes from the right side, a remapped left column takes the values of its right counterpart
/// instead of defaults
if (auto it = left_to_right_key_remap.find(i); source_num == 0 || it == left_to_right_key_remap.end())
{
cols.push_back(columns_left[i].get());
}
else
{
cols.push_back(columns_right[it->second].get());
}
}
for (const auto & col : columns_right)
{
cols.push_back(col.get());
}
}
Chunk result_chunk;
copyColumnsResized(cols, start, num_rows, result_chunk);
return result_chunk;
}
/// Emits all remaining rows of input `source_num` (from the cursor's current row to the end of the chunk)
/// with defaults on the other side, then detaches the cursor, i.e. flushes it.
Chunk MergeJoinAlgorithm::createBlockWithDefaults(size_t source_num)
{
    auto & source = *cursors[source_num];

    const size_t first_row = source.cursor.getRow();
    const size_t rows_left = source.cursor.rowsLeft();

    Chunk result_chunk = createBlockWithDefaults(source_num, first_row, rows_left);
    source.detach();
    return result_chunk;
}
/// One step of the merge join. The order of the checks matters:
///  1. request data for an input whose cursor is exhausted but not finished (Status built from the input index;
///     presumably this tells the caller which input to read next);
///  2. continue a pending ALL-join state;
///  3. if an input is finished, flush the other one with defaults (LEFT/RIGHT/FULL) or finish;
///  4. if the current chunks do not overlap by key, emit or drop the lesser one as a whole;
///  5. otherwise join the current chunks according to the strictness.
IMergingAlgorithm::Status MergeJoinAlgorithm::merge()
{
auto kind = table_join->getTableJoin().kind();
if (!cursors[0]->cursor.isValid() && !cursors[0]->fullyCompleted())
return Status(0);
if (!cursors[1]->cursor.isValid() && !cursors[1]->fullyCompleted())
return Status(1);
if (auto result = handleAllJoinState())
return std::move(*result);
if (cursors[0]->fullyCompleted() || cursors[1]->fullyCompleted())
{
/// Rows of the unfinished side cannot have a match anymore
if (!cursors[0]->fullyCompleted() && isLeftOrFull(kind))
return Status(createBlockWithDefaults(0));
if (!cursors[1]->fullyCompleted() && isRightOrFull(kind))
return Status(createBlockWithDefaults(1));
/// NOTE(review): the second argument is presumably the "finished" flag of Status - confirm against IMergingAlgorithm
return Status({}, true);
}
/// check if blocks are not intersecting at all
if (int cmp = totallyCompare(cursors[0]->cursor, cursors[1]->cursor); cmp != 0)
{
if (cmp < 0)
{
/// The rest of the left chunk is less than the current right row: emit it with defaults or drop it
if (isLeftOrFull(kind))
return Status(createBlockWithDefaults(0));
cursors[0]->detach();
return Status(0);
}
if (cmp > 0)
{
/// The rest of the right chunk is less than the current left row
if (isRightOrFull(kind))
return Status(createBlockWithDefaults(1));
cursors[1]->detach();
return Status(1);
}
}
auto strictness = table_join->getTableJoin().strictness();
if (strictness == ASTTableJoin::Strictness::Any)
return anyJoin(kind);
if (strictness == ASTTableJoin::Strictness::All)
return allJoin(kind);
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported strictness '{}'", strictness);
}
/// Sets up the merging transform over the two join inputs.
/// The arguments after `empty_chunk_on_finish_` are forwarded to the MergeJoinAlgorithm constructor.
MergeJoinTransform::MergeJoinTransform(
    JoinPtr table_join,
    const Blocks & input_headers,
    const Block & output_header,
    size_t max_block_size,
    UInt64 limit_hint_)
    : IMergingTransform<MergeJoinAlgorithm>(
        input_headers, output_header,
        /* have_all_inputs_= */ true, limit_hint_,
        /* empty_chunk_on_finish_= */ true,
        table_join, input_headers, max_block_size)
    , log(&Poco::Logger::get("MergeJoinTransform"))
{
    LOG_TRACE(log, "Use MergeJoinTransform");
}
/// Writes the final statistics line; `force = true` bypasses the rate limit of logElapsed.
void MergeJoinTransform::onFinish()
{
    const double elapsed_seconds = total_stopwatch.elapsedSeconds();
    algorithm.logElapsed(elapsed_seconds, /* force = */ true);
}
}

View File

@ -0,0 +1,315 @@
#pragma once
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <optional>
#include <unordered_map>
#include <utility>
#include <vector>
#include <boost/core/noncopyable.hpp>
#include <Common/PODArray.h>
#include <Core/SortCursor.h>
#include <Core/SortDescription.h>
#include <IO/ReadBuffer.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Processors/Chunk.h>
#include <Processors/Merges/Algorithms/IMergingAlgorithm.h>
#include <Processors/Merges/IMergingTransform.h>
namespace Poco { class Logger; }
namespace DB
{
class IJoin;
using JoinPtr = std::shared_ptr<IJoin>;
class FullMergeJoinCursor;
using FullMergeJoinCursorPtr = std::unique_ptr<FullMergeJoinCursor>;
/// Used instead of storing previous block.
/// Holds a copy of the join key of a single row: one single-row column per sort column of the cursor.
struct JoinKeyRow
{
    std::vector<ColumnPtr> row;

    JoinKeyRow() = default;

    /// Copies the key values of row `pos` from the sort columns of `impl_`.
    explicit JoinKeyRow(const SortCursorImpl & impl_, size_t pos)
    {
        row.reserve(impl_.sort_columns.size());
        for (const auto & key_column : impl_.sort_columns)
        {
            auto single_value = key_column->cloneEmpty();
            single_value->insertFrom(*key_column, pos);
            row.push_back(std::move(single_value));
        }
    }

    /// Forgets the stored key; `equals` returns false afterwards.
    void reset() { row.clear(); }

    /// True when a key is stored and it equals the key of the current row of `impl` in every sort column.
    bool equals(const SortCursorImpl & impl) const
    {
        if (row.empty())
            return false;

        assert(row.size() == impl.sort_columns_size);

        for (size_t key = 0; key < impl.sort_columns_size; ++key)
        {
            const auto & stored = *row[key];
            if (stored.compareAt(0, impl.getRow(), *impl.sort_columns[key], impl.desc[key].nulls_direction) != 0)
                return false;
        }
        return true;
    }
};
/// Remembers previous key if it was joined in previous block
class AnyJoinState : boost::noncopyable
{
public:
    AnyJoinState() = default;

    /// Stores the key of the last row of `cursor` as the current key of input `source_num`.
    void set(size_t source_num, const SortCursorImpl & cursor)
    {
        assert(cursor.rows);
        const size_t last_row = cursor.rows - 1;
        keys[source_num] = JoinKeyRow(cursor, last_row);
    }

    void setValue(Chunk value_)
    {
        value = std::move(value_);
    }

    /// True when no key is remembered for either input.
    bool empty() const
    {
        const bool no_left_key = keys[0].row.empty();
        return no_left_key && keys[1].row.empty();
    }

    /// current keys
    JoinKeyRow keys[2];

    /// for LEFT/RIGHT join use previously joined row from other table.
    Chunk value;
};
/// Accumulate blocks with same key and cross-join them.
/// Stores the ranges of rows with the current key for both inputs and iterates over
/// pairs (left range, right row): for every left range all rows of all right ranges are visited.
class AllJoinState : boost::noncopyable
{
public:
/// Rows [begin, begin + length) of `chunk`; `current` is the iteration position inside the range.
struct Range
{
Range() = default;
explicit Range(Chunk chunk_, size_t begin_, size_t length_)
: begin(begin_)
, length(length_)
, current(begin_)
, chunk(std::move(chunk_))
{
assert(length > 0 && begin + length <= chunk.getNumRows());
}
size_t begin;
size_t length;
size_t current;
Chunk chunk;
};
/// Remembers the key at row `lpos` of the left cursor and at row `rpos` of the right cursor.
AllJoinState(const SortCursorImpl & lcursor, size_t lpos,
const SortCursorImpl & rcursor, size_t rpos)
: keys{JoinKeyRow(lcursor, lpos), JoinKeyRow(rcursor, rpos)}
{
}
/// Adds rows [begin, begin + length) of `chunk` for the left (source_num == 0) or right (otherwise) input.
void addRange(size_t source_num, Chunk chunk, size_t begin, size_t length)
{
if (source_num == 0)
left.emplace_back(std::move(chunk), begin, length);
else
right.emplace_back(std::move(chunk), begin, length);
}
/// Moves to the next (left range, right row) pair. Returns false when all pairs were visited.
bool next()
{
/// advance right to one row, when right finished, advance left to next block
assert(!left.empty() && !right.empty());
if (finished())
return false;
bool has_next_right = nextRight();
if (has_next_right)
return true;
return nextLeft();
}
bool finished() const { return lidx >= left.size(); }
size_t blocksStored() const { return left.size() + right.size(); }
/// Current left range; valid only while !finished()
const Range & getLeft() const { return left[lidx]; }
/// Current right range; its `current` member is the right row to join with
const Range & getRight() const { return right[ridx]; }
/// Left and right types can be different because of nullable
JoinKeyRow keys[2];
private:
/// Switches to the next left range. Returns false if there is none.
bool nextLeft()
{
lidx += 1;
return lidx < left.size();
}
/// Switches to the next right row. Returns false after the last row of the last right range,
/// in which case the right side is rewound to its first row.
bool nextRight()
{
/// cycle through right rows
right[ridx].current += 1;
if (right[ridx].current >= right[ridx].begin + right[ridx].length)
{
/// reset current row index to the beginning, because range will be accessed again
right[ridx].current = right[ridx].begin;
ridx += 1;
if (ridx >= right.size())
{
ridx = 0;
return false;
}
}
return true;
}
std::vector<Range> left;
std::vector<Range> right;
size_t lidx = 0;
size_t ridx = 0;
};
/*
 * Wrapper for SortCursorImpl.
 * Owns the current chunk of one join input together with the sort cursor that iterates over it.
 */
class FullMergeJoinCursor : boost::noncopyable
{
public:
    /// Keeps an empty clone of `sample_block_` and a copy of the sort description.
    explicit FullMergeJoinCursor(const Block & sample_block_, const SortDescription & description_)
        : sample_block(sample_block_.cloneEmpty())
        , desc(description_)
    {
    }

    /// Feeds the next chunk (see the definition for the end-of-input convention).
    void setChunk(Chunk && chunk);

    /// Gives the current chunk away and resets the cursor.
    Chunk detach();

    const Chunk & getCurrent() const;

    bool fullyCompleted() const;

    const Block & sampleBlock() const { return sample_block; }

    /// Shortcuts to the underlying sort cursor
    SortCursorImpl * operator-> () { return &cursor; }
    const SortCursorImpl * operator-> () const { return &cursor; }

    SortCursorImpl cursor;

private:
    Block sample_block;
    SortDescription desc;

    Chunk current_chunk;
    bool recieved_all_blocks = false;
};
/*
 * This class is used to join chunks from two sorted streams.
 * It is used in MergeJoinTransform.
 */
class MergeJoinAlgorithm final : public IMergingAlgorithm
{
public:
    explicit MergeJoinAlgorithm(JoinPtr table_join, const Blocks & input_headers, size_t max_block_size_);

    virtual void initialize(Inputs inputs) override;
    virtual void consume(Input & input, size_t source_num) override;
    virtual Status merge() override;

    /// Writes the collected statistics to the trace log.
    /// `seconds` is the total elapsed time; unless `force` is set, nothing is logged
    /// when less than ten seconds have passed since the previous log line.
    void logElapsed(double seconds, bool force)
    {
        /// Do not log more frequently than once per ten seconds
        if (seconds - stat.last_log_seconds < 10 && !force)
            return;

        LOG_TRACE(log,
            "Finished processing in {} seconds"
            ", left: {} blocks, {} rows; right: {} blocks, {} rows"
            ", max blocks loaded to memory: {}",
            seconds, stat.num_blocks[0], stat.num_rows[0], stat.num_blocks[1], stat.num_rows[1],
            stat.max_blocks_loaded);
        stat.last_log_seconds = seconds;
    }

private:
    std::optional<Status> handleAnyJoinState();
    Status anyJoin(ASTTableJoin::Kind kind);

    std::optional<Status> handleAllJoinState();
    Status allJoin(ASTTableJoin::Kind kind);

    Chunk createBlockWithDefaults(size_t source_num);
    Chunk createBlockWithDefaults(size_t source_num, size_t start, size_t num_rows) const;

    /// For `USING` join key columns should have values from right side instead of defaults
    std::unordered_map<size_t, size_t> left_to_right_key_remap;

    /// One cursor per input: index 0 is the left input, index 1 is the right one
    std::vector<FullMergeJoinCursorPtr> cursors;
    /// Zero-row chunks with the column structure of each input
    std::vector<Chunk> sample_chunks;

    /// Keep some state to make connection between data in different blocks
    AnyJoinState any_join_state;
    std::unique_ptr<AllJoinState> all_join_state;

    JoinPtr table_join;

    size_t max_block_size;

    struct Statistic
    {
        size_t num_blocks[2] = {0, 0};
        size_t num_rows[2] = {0, 0};

        size_t max_blocks_loaded = 0;

        /// Elapsed time at which the statistics were logged last
        double last_log_seconds = 0;
    };

    Statistic stat;

    Poco::Logger * log;
};
/// Processor that joins two sorted inputs with MergeJoinAlgorithm.
class MergeJoinTransform final : public IMergingTransform<MergeJoinAlgorithm>
{
    using Base = IMergingTransform<MergeJoinAlgorithm>;

public:
    MergeJoinTransform(
        JoinPtr table_join,
        const Blocks & input_headers,
        const Block & output_header,
        size_t max_block_size,
        UInt64 limit_hint = 0);

    String getName() const override { return "MergeJoinTransform"; }

protected:
    void onFinish() override;

    void work() override
    {
        /// Periodic report: not forced, so logElapsed can apply its rate limit.
        /// Forcing here would write a log line on every single step; the forced report is done in onFinish.
        algorithm.logElapsed(total_stopwatch.elapsedSeconds(), false);
        Base::work();
    }

    Poco::Logger * log;
};
}

View File

@ -1,3 +1,4 @@
#include <vector>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/QueryPlan/ExpressionStep.h>
@ -9,6 +10,8 @@
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
#include <Processors/Transforms/JoiningTransform.h>
#include <Processors/Transforms/MergeJoinTransform.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Transforms/PartialSortingTransform.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
@ -16,8 +19,10 @@
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/IJoin.h>
#include <Interpreters/TableJoin.h>
#include <Common/typeid_cast.h>
#include <Common/CurrentThread.h>
#include "Core/SortDescription.h"
#include <Processors/DelayedPortsProcessor.h>
#include <Processors/RowsBeforeLimitCounter.h>
#include <Processors/Sources/RemoteSource.h>
@ -28,6 +33,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED;
}
void QueryPipelineBuilder::checkInitialized()
@ -298,7 +304,60 @@ QueryPipelineBuilder QueryPipelineBuilder::unitePipelines(
return pipeline;
}
std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelines(
/// Merges two single-output pipelines through `transform`.
/// The output of `left` is connected to the first input of `transform` and the output of `right` to the last one;
/// the result is `left`, extended with `transform` and all processors of `right`, with the output of `transform`
/// as its output port.
/// `collected_processors` (may be null) receives only the newly added `transform`.
/// Throws LOGICAL_ERROR if `transform` does not have exactly one output.
QueryPipelineBuilderPtr QueryPipelineBuilder::mergePipelines(
    QueryPipelineBuilderPtr left,
    QueryPipelineBuilderPtr right,
    ProcessorPtr transform,
    Processors * collected_processors)
{
    if (transform->getOutputs().size() != 1)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Merge transform must have exactly 1 output, got {}", transform->getOutputs().size());

    connect(*left->pipe.output_ports.front(), transform->getInputs().front());
    connect(*right->pipe.output_ports.front(), transform->getInputs().back());

    if (collected_processors)
        collected_processors->emplace_back(transform);

    left->pipe.output_ports.front() = &transform->getOutputs().front();
    left->pipe.processors.emplace_back(transform);

    left->pipe.processors.insert(left->pipe.processors.end(), right->pipe.processors.begin(), right->pipe.processors.end());

    /// The processors of `right` now live in `left`, so the resources they rely on must move along with them;
    /// otherwise they are released as soon as `right` is destroyed at the end of this function.
    /// NOTE(review): relies on the holder's move assignment appending to the resources already held by `left` - confirm.
    left->pipe.holder = std::move(right->pipe.holder);

    left->pipe.header = left->pipe.output_ports.front()->getHeader();
    left->pipe.max_parallel_streams = std::max(left->pipe.max_parallel_streams, right->pipe.max_parallel_streams);
    return left;
}
/// Joins two pipelines with a MergeJoinTransform that reads both of them as inputs.
/// Both pipelines must be initialized, not completed, have exactly one output port and no totals.
std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesYShaped(
    std::unique_ptr<QueryPipelineBuilder> left,
    std::unique_ptr<QueryPipelineBuilder> right,
    JoinPtr join,
    const Block & out_header,
    size_t max_block_size,
    Processors * collected_processors)
{
    left->checkInitializedAndNotCompleted();
    right->checkInitializedAndNotCompleted();

    left->pipe.dropExtremes();
    right->pipe.dropExtremes();

    const bool single_output_each = left->pipe.output_ports.size() == 1 && right->pipe.output_ports.size() == 1;
    if (!single_output_each)
        throw Exception("Join is supported only for pipelines with one output port", ErrorCodes::LOGICAL_ERROR);

    if (left->hasTotals() || right->hasTotals())
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Current join algorithm is supported only for pipelines without totals");

    Blocks inputs = {left->getHeader(), right->getHeader()};
    auto joining = std::make_shared<MergeJoinTransform>(join, inputs, out_header, max_block_size);

    return mergePipelines(std::move(left), std::move(right), std::move(joining), collected_processors);
}
std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesRightLeft(
std::unique_ptr<QueryPipelineBuilder> left,
std::unique_ptr<QueryPipelineBuilder> right,
JoinPtr join,

View File

@ -29,6 +29,10 @@ struct ExpressionActionsSettings;
class IJoin;
using JoinPtr = std::shared_ptr<IJoin>;
class TableJoin;
class QueryPipelineBuilder;
using QueryPipelineBuilderPtr = std::unique_ptr<QueryPipelineBuilder>;
class QueryPipelineBuilder
{
@ -97,9 +101,16 @@ public:
size_t max_threads_limit = 0,
Processors * collected_processors = nullptr);
/// Connect the first output port of `left` and of `right` to the first and the last input
/// of `transform` respectively, and return `left` extended with `transform` and with the
/// processors of `right`. `transform` must have exactly one output.
/// If collector is used, only `transform` is added to it.
static QueryPipelineBuilderPtr mergePipelines(
QueryPipelineBuilderPtr left,
QueryPipelineBuilderPtr right,
ProcessorPtr transform,
Processors * collected_processors);
/// Join two pipelines together using JoinPtr.
/// If collector is used, it will collect only newly-added processors, but not processors from pipelines.
static std::unique_ptr<QueryPipelineBuilder> joinPipelines(
/// Process right stream to fill JoinPtr and then process left pipeline using it
static std::unique_ptr<QueryPipelineBuilder> joinPipelinesRightLeft(
std::unique_ptr<QueryPipelineBuilder> left,
std::unique_ptr<QueryPipelineBuilder> right,
JoinPtr join,
@ -108,6 +119,15 @@ public:
bool keep_left_read_in_order,
Processors * collected_processors = nullptr);
/// Join two independent pipelines, processing them simultaneously.
/// Both pipelines are fed into a single MergeJoinTransform that produces `out_header`.
/// Each pipeline must have exactly one output port (LOGICAL_ERROR otherwise)
/// and must not have totals (NOT_IMPLEMENTED otherwise). Extremes are dropped.
static std::unique_ptr<QueryPipelineBuilder> joinPipelinesYShaped(
std::unique_ptr<QueryPipelineBuilder> left,
std::unique_ptr<QueryPipelineBuilder> right,
JoinPtr table_join,
const Block & out_header,
size_t max_block_size,
Processors * collected_processors = nullptr);
/// Add other pipeline and execute it before current one.
/// Pipeline must have empty header, it should not generate any chunk.
/// This is used for CreatingSets.

View File

@ -1,3 +1,4 @@
{% for join_algorithm in ['partial_merge', 'full_sorting_merge'] -%}
0
0
0
@ -39,3 +40,4 @@
0 1
1 2
2 \N
{% endfor -%}

View File

@ -1,4 +1,6 @@
set join_algorithm = 'partial_merge';
{% for join_algorithm in ['partial_merge', 'full_sorting_merge'] -%}
set join_algorithm = '{{ join_algorithm }}';
select * from (select dummy as val from system.one) s1 any left join (select dummy as val from system.one) s2 using val;
select * from (select toLowCardinality(dummy) as val from system.one) s1 any left join (select dummy as val from system.one) s2 using val;
@ -28,3 +30,5 @@ select * from (select toLowCardinality(number) as l from system.numbers limit 3)
select * from (select toLowCardinality(toNullable(number)) as l from system.numbers limit 3) s1 any left join (select toLowCardinality(number) as r from system.numbers limit 3) s2 on l + 1 = r * 1;
select * from (select toLowCardinality(number) as l from system.numbers limit 3) s1 any left join (select toLowCardinality(toNullable(number)) as r from system.numbers limit 3) s2 on l + 1 = r * 1;
select * from (select toLowCardinality(toNullable(number)) as l from system.numbers limit 3) s1 any left join (select toLowCardinality(toNullable(number)) as r from system.numbers limit 3) s2 on l + 1 = r * 1;
{% endfor -%}

View File

@ -5,3 +5,11 @@ select * from (select materialize(2) as x) s1 left join (select 2 as x) s2 using
select * from (select 3 as x) s1 left join (select materialize(3) as x) s2 using x;
select * from (select toLowCardinality(4) as x) s1 left join (select 4 as x) s2 using x;
select * from (select 5 as x) s1 left join (select toLowCardinality(5) as x) s2 using x;
SET join_algorithm = 'full_sorting_merge';
select s1.x, s2.x from (select 1 as x) s1 left join (select 1 as x) s2 using x;
select * from (select materialize(2) as x) s1 left join (select 2 as x) s2 using x;
select * from (select 3 as x) s1 left join (select materialize(3) as x) s2 using x;
select * from (select toLowCardinality(4) as x) s1 left join (select 4 as x) s2 using x;
select * from (select 5 as x) s1 left join (select toLowCardinality(5) as x) s2 using x;

View File

@ -1,3 +1,5 @@
{% for join_algorithm in ['hash', 'full_sorting_merge'] -%}
join_algorithm: {{ join_algorithm }}
any left
0 a1 0
1 a2 0
@ -24,3 +26,4 @@ any right (rev)
2 a3 2 b1
3 a4 0
4 a5 4 b3
{% endfor -%}

View File

@ -10,6 +10,11 @@ INSERT INTO t2 (x, s) VALUES (2, 'b1'), (4, 'b3'), (5, 'b6');
SET join_use_nulls = 0;
SET any_join_distinct_right_table_keys = 0;
{% for join_algorithm in ['hash', 'full_sorting_merge'] -%}
SET join_algorithm = '{{ join_algorithm }}';
SELECT 'join_algorithm: {{ join_algorithm }}';
SELECT 'any left';
SELECT t1.*, t2.* FROM t1 ANY LEFT JOIN t2 USING(x) ORDER BY t1.x, t2.x;
@ -28,5 +33,7 @@ SELECT t1.*, t2.* FROM t1 ANY RIGHT JOIN t2 USING(x) ORDER BY t1.x, t2.x;
SELECT 'any right (rev)';
SELECT t1.*, t2.* FROM t2 ANY RIGHT JOIN t1 USING(x) ORDER BY t1.x, t2.x;
{% endfor -%}
DROP TABLE t1;
DROP TABLE t2;

View File

@ -1,40 +1,40 @@
{% for join_type in ['hash', 'partial_merge'] -%}
{% for join_type in ['hash', 'full_sorting_merge', 'partial_merge'] -%}
=== {{ join_type }} ===
= full =
= full using =
1 1
2 2
-1 1
1 \N
1 257
1 -1
= left =
= left using =
1 1
2 2
= right =
= right using =
1 1
-1 1
1 \N
1 257
1 -1
= inner =
= inner using =
1 1
= full =
= full on =
1 1 1 1
2 2 0 \N
0 0 -1 1
0 0 1 \N
0 0 1 257
0 0 1 -1
= left =
= left on =
1 1 1 1
2 2 0 \N
= right =
= right on =
1 1 1 1
0 0 -1 1
0 0 1 \N
0 0 1 257
0 0 1 -1
= inner =
= inner on =
1 1 1 1
= agg =
5 260

View File

@ -6,28 +6,28 @@ CREATE TABLE t_ab2 (id Nullable(Int32), a Int16, b Nullable(Int64)) ENGINE = Tin
INSERT INTO t_ab1 VALUES (0, 1, 1), (1, 2, 2);
INSERT INTO t_ab2 VALUES (2, -1, 1), (3, 1, NULL), (4, 1, 257), (5, 1, -1), (6, 1, 1);
{% for join_type in ['hash', 'partial_merge'] -%}
{% for join_type in ['hash', 'full_sorting_merge', 'partial_merge'] -%}
SELECT '=== {{ join_type }} ===';
SET join_algorithm = '{{ join_type }}';
SELECT '= full =';
SELECT '= full using =';
SELECT a, b FROM t_ab1 FULL JOIN t_ab2 USING (a, b) ORDER BY ifNull(t_ab1.id, t_ab2.id);
SELECT '= left =';
SELECT '= left using =';
SELECT a, b FROM t_ab1 LEFT JOIN t_ab2 USING (a, b) ORDER BY ifNull(t_ab1.id, t_ab2.id);
SELECT '= right =';
SELECT '= right using =';
SELECT a, b FROM t_ab1 RIGHT JOIN t_ab2 USING (a, b) ORDER BY ifNull(t_ab1.id, t_ab2.id);
SELECT '= inner =';
SELECT '= inner using =';
SELECT a, b FROM t_ab1 INNER JOIN t_ab2 USING (a, b) ORDER BY ifNull(t_ab1.id, t_ab2.id);
SELECT '= full =';
SELECT '= full on =';
SELECT a, b, t_ab2.a, t_ab2.b FROM t_ab1 FULL JOIN t_ab2 ON (t_ab1.a == t_ab2.a AND t_ab1.b == t_ab2.b) ORDER BY ifNull(t_ab1.id, t_ab2.id);
SELECT '= left =';
SELECT '= left on =';
SELECT a, b, t_ab2.a, t_ab2.b FROM t_ab1 LEFT JOIN t_ab2 ON (t_ab1.a == t_ab2.a AND t_ab1.b == t_ab2.b) ORDER BY ifNull(t_ab1.id, t_ab2.id);
SELECT '= right =';
SELECT '= right on =';
SELECT a, b, t_ab2.a, t_ab2.b FROM t_ab1 RIGHT JOIN t_ab2 ON (t_ab1.a == t_ab2.a AND t_ab1.b == t_ab2.b) ORDER BY ifNull(t_ab1.id, t_ab2.id);
SELECT '= inner =';
SELECT '= inner on =';
SELECT a, b, t_ab2.a, t_ab2.b FROM t_ab1 INNER JOIN t_ab2 ON (t_ab1.a == t_ab2.a AND t_ab1.b == t_ab2.b) ORDER BY ifNull(t_ab1.id, t_ab2.id);
SELECT '= agg =';

View File

@ -400,6 +400,207 @@
1
1
1
=== full_sorting_merge ===
= full =
-4 0 196
-3 0 197
-2 0 198
-1 0 199
0 0 200
1 101 201
2 102 202
3 103 203
4 104 204
5 105 205
6 106 \N
7 107 \N
8 108 \N
9 109 \N
10 110 \N
= left =
1 101 201
2 102 202
3 103 203
4 104 204
5 105 205
6 106 \N
7 107 \N
8 108 \N
9 109 \N
10 110 \N
= right =
-4 0 196
-3 0 197
-2 0 198
-1 0 199
0 0 200
1 101 201
2 102 202
3 103 203
4 104 204
5 105 205
= inner =
1 101 201
2 102 202
3 103 203
4 104 204
5 105 205
= full =
0 0 -4
0 0 -3
0 0 -2
0 0 -1
0 0 0
1 1 1
2 2 2
3 3 3
4 4 4
5 5 5
6 6 0
7 7 0
8 8 0
9 9 0
10 10 0
= left =
1 1 1
2 2 2
3 3 3
4 4 4
5 5 5
6 6 0
7 7 0
8 8 0
9 9 0
10 10 0
= right =
0 0 -4
0 0 -3
0 0 -2
0 0 -1
0 0 0
1 1 1
2 2 2
3 3 3
4 4 4
5 5 5
= inner =
1 1 1
2 2 2
3 3 3
4 4 4
5 5 5
= join on =
= full =
0 0 -4 196
0 0 -3 197
0 0 -2 198
0 0 -1 199
0 0 0 200
1 101 1 201
2 102 2 202
3 103 3 203
4 104 4 204
5 105 5 205
6 106 0 \N
7 107 0 \N
8 108 0 \N
9 109 0 \N
10 110 0 \N
= left =
1 101 1 201
2 102 2 202
3 103 3 203
4 104 4 204
5 105 5 205
6 106 0 \N
7 107 0 \N
8 108 0 \N
9 109 0 \N
10 110 0 \N
= right =
0 0 -4 196
0 0 -3 197
0 0 -2 198
0 0 -1 199
0 0 0 200
1 101 1 201
2 102 2 202
3 103 3 203
4 104 4 204
5 105 5 205
= inner =
1 101 1 201
2 102 2 202
3 103 3 203
4 104 4 204
5 105 5 205
= full =
0 0 -4 196
0 0 -3 197
0 0 -2 198
0 0 -1 199
0 0 0 200
1 101 1 201
2 102 2 202
3 103 3 203
4 104 4 204
5 105 5 205
6 106 0 \N
7 107 0 \N
8 108 0 \N
9 109 0 \N
10 110 0 \N
= left =
1 101 1 201
2 102 2 202
3 103 3 203
4 104 4 204
5 105 5 205
6 106 0 \N
7 107 0 \N
8 108 0 \N
9 109 0 \N
10 110 0 \N
= right =
0 0 -4 196
0 0 -3 197
0 0 -2 198
0 0 -1 199
0 0 0 200
1 101 1 201
2 102 2 202
3 103 3 203
4 104 4 204
5 105 5 205
= inner =
1 101 1 201
2 102 2 202
3 103 3 203
4 104 4 204
5 105 5 205
= agg =
1
1
1
1
1
1
0 -10 0
1 55 1055
0 0 -10 0 990
1 55 15 1055 1015
= types =
1
1
1
1
1
1
1
1
1
1
1
=== auto ===
= full =
-4 0 196

View File

@ -9,7 +9,7 @@ CREATE TABLE t2 (a Int16, b Nullable(Int64)) ENGINE = TinyLog;
INSERT INTO t1 SELECT number as a, 100 + number as b FROM system.numbers LIMIT 1, 10;
INSERT INTO t2 SELECT number - 5 as a, 200 + number - 5 as b FROM system.numbers LIMIT 1, 10;
{% for join_type in ['hash', 'partial_merge', 'auto'] -%}
{% for join_type in ['hash', 'partial_merge', 'full_sorting_merge', 'auto'] -%}
SELECT '=== {{ join_type }} ===';
SET join_algorithm = '{{ join_type }}';
@ -99,7 +99,6 @@ SELECT '=== join use nulls ===';
SET join_use_nulls = 1;
SELECT '= full =';
SELECT a, b, t2.b FROM t1 FULL JOIN t2 USING (a) ORDER BY (a);
SELECT '= left =';

View File

@ -43,8 +43,10 @@ SELECT * FROM t1 RIGHT JOIN t2 ON NULL ORDER BY t1.id NULLS FIRST, t2.id SETTING
SELECT '- full -';
SELECT * FROM t1 FULL JOIN t2 ON NULL ORDER BY t1.id NULLS FIRST, t2.id SETTINGS join_use_nulls = 1;
SELECT * FROM t1 JOIN t2 ON 1 = 1 SETTINGS join_algorithm = 'full_sorting_merge'; -- { serverError 48 }
SELECT * FROM t1 JOIN t2 ON 1 = 1 SETTINGS join_algorithm = 'partial_merge'; -- { serverError 48 }
SELECT * FROM t1 JOIN t2 ON 1 = 1 SETTINGS join_algorithm = 'auto'; -- { serverError 48 }
SELECT * FROM t1 JOIN t2 ON NULL SETTINGS join_algorithm = 'full_sorting_merge'; -- { serverError 48 }
SELECT * FROM t1 JOIN t2 ON NULL SETTINGS join_algorithm = 'partial_merge'; -- { serverError 48 }
SELECT * FROM t1 LEFT JOIN t2 ON NULL SETTINGS join_algorithm = 'partial_merge'; -- { serverError 48 }
SELECT * FROM t1 RIGHT JOIN t2 ON NULL SETTINGS join_algorithm = 'auto'; -- { serverError 48 }

View File

@ -0,0 +1 @@
ANY INNER

View File

@ -0,0 +1,327 @@
{% set table_size = 15 -%}
{% for block_size in range(1, table_size + 1) -%}
ALL INNER USING | bs = {{ block_size }}
4 0 0
5 0 0
6 0 0
8 0 0
9 0 0
11 0 0
11 0 0
12 0 0
13 0 0
13 0 0
13 0 0
14 0 0
14 0 0
ALL INNER | bs = {{ block_size }}
4 4 0 0
5 5 0 0
6 6 0 0
8 8 0 0
9 9 0 0
11 11 0 0
11 11 0 0
12 12 0 0
13 13 0 0
13 13 0 0
13 13 0 0
14 14 0 0
14 14 0 0
ALL LEFT | bs = {{ block_size }}
1 0 val7 1
2 0 val3 1
2 0 val5 1
4 4 val0 0
5 5 val12 0
6 6 val11 0
8 8 val4 0
9 9 val10 0
10 0 val1 1
10 0 val14 1
11 11 val6 0
11 11 val8 0
12 12 val2 0
13 13 val13 0
13 13 val13 0
13 13 val13 0
14 14 val9 0
14 14 val9 0
ALL RIGHT | bs = {{ block_size }}
4 4 0 val10
5 5 0 val6
6 6 0 val8
8 8 0 val1
9 9 0 val5
11 11 0 val11
11 11 0 val11
12 12 0 val0
13 13 0 val2
13 13 0 val4
13 13 0 val9
14 14 0 val3
14 14 0 val7
ALL INNER | bs = {{ block_size }} | copmosite key
2 2 2 2 2 2 0 0
2 2 2 2 2 2 0 0
ALL LEFT | bs = {{ block_size }} | copmosite key
1 1 2 0 0 \N val14 1
1 1 2 0 0 \N val3 1
1 2 1 0 0 \N val7 1
1 2 2 0 0 \N val1 1
1 2 2 0 0 \N val13 1
1 2 2 0 0 \N val4 1
1 2 2 0 0 \N val8 1
1 \N 1 0 0 \N val0 1
1 \N 1 0 0 \N val5 1
1 \N 2 0 0 \N val10 1
2 2 1 0 0 \N val11 1
2 2 1 0 0 \N val2 1
2 2 1 0 0 \N val6 1
2 2 2 2 2 2 val12 0
2 2 2 2 2 2 val9 0
ALL RIGHT | bs = {{ block_size }} | copmosite key
0 \N 0 1 1 1 1 val2
0 \N 0 1 1 1 1 val7
0 \N 0 1 1 2 1 val5
0 \N 0 1 1 \N 1 val0
0 \N 0 1 1 \N 1 val6
0 \N 0 2 1 1 1 val1
0 \N 0 2 1 1 1 val11
0 \N 0 2 1 2 1 val10
0 \N 0 2 1 2 1 val8
0 \N 0 2 1 \N 1 val3
0 \N 0 2 2 \N 1 val9
2 2 2 2 2 2 0 val4
2 2 2 2 2 2 0 val4
ANY INNER USING | bs = {{ block_size }}
4 0 0
5 0 0
6 0 0
8 0 0
9 0 0
11 0 0
12 0 0
13 0 0
14 0 0
ANY INNER | bs = {{ block_size }}
4 4 0 0
5 5 0 0
6 6 0 0
8 8 0 0
9 9 0 0
11 11 0 0
12 12 0 0
13 13 0 0
14 14 0 0
ANY LEFT | bs = {{ block_size }}
1 0 val7 1
2 0 val3 1
2 0 val5 1
4 4 val0 0
5 5 val12 0
6 6 val11 0
8 8 val4 0
9 9 val10 0
10 0 val1 1
10 0 val14 1
11 11 val6 0
11 11 val8 0
12 12 val2 0
13 13 val13 0
14 14 val9 0
ANY RIGHT | bs = {{ block_size }}
4 4 0 val10
5 5 0 val6
6 6 0 val8
8 8 0 val1
9 9 0 val5
11 11 0 val11
12 12 0 val0
13 13 0 val2
13 13 0 val4
13 13 0 val9
14 14 0 val3
14 14 0 val7
ANY INNER | bs = {{ block_size }} | copmosite key
2 2 2 2 2 2 0 0
ANY LEFT | bs = {{ block_size }} | copmosite key
1 1 2 0 0 \N val14 1
1 1 2 0 0 \N val3 1
1 2 1 0 0 \N val7 1
1 2 2 0 0 \N val1 1
1 2 2 0 0 \N val13 1
1 2 2 0 0 \N val4 1
1 2 2 0 0 \N val8 1
1 \N 1 0 0 \N val0 1
1 \N 1 0 0 \N val5 1
1 \N 2 0 0 \N val10 1
2 2 1 0 0 \N val11 1
2 2 1 0 0 \N val2 1
2 2 1 0 0 \N val6 1
2 2 2 2 2 2 val12 0
2 2 2 2 2 2 val9 0
ANY RIGHT | bs = {{ block_size }} | copmosite key
0 \N 0 1 1 1 1 val2
0 \N 0 1 1 1 1 val7
0 \N 0 1 1 2 1 val5
0 \N 0 1 1 \N 1 val0
0 \N 0 1 1 \N 1 val6
0 \N 0 2 1 1 1 val1
0 \N 0 2 1 1 1 val11
0 \N 0 2 1 2 1 val10
0 \N 0 2 1 2 1 val8
0 \N 0 2 1 \N 1 val3
0 \N 0 2 2 \N 1 val9
2 2 2 2 2 2 0 val4
{% endfor -%}
ALL INNER | join_use_nulls = 1
4 4 0 0
5 5 0 0
6 6 0 0
8 8 0 0
9 9 0 0
11 11 0 0
11 11 0 0
12 12 0 0
13 13 0 0
13 13 0 0
13 13 0 0
14 14 0 0
14 14 0 0
ALL LEFT | join_use_nulls = 1
1 \N val7 1
2 \N val3 1
2 \N val5 1
4 4 val0 0
5 5 val12 0
6 6 val11 0
8 8 val4 0
9 9 val10 0
10 \N val1 1
10 \N val14 1
11 11 val6 0
11 11 val8 0
12 12 val2 0
13 13 val13 0
13 13 val13 0
13 13 val13 0
14 14 val9 0
14 14 val9 0
ALL RIGHT | join_use_nulls = 1
4 4 0 val10
5 5 0 val6
6 6 0 val8
8 8 0 val1
9 9 0 val5
11 11 0 val11
11 11 0 val11
12 12 0 val0
13 13 0 val2
13 13 0 val4
13 13 0 val9
14 14 0 val3
14 14 0 val7
ALL INNER | join_use_nulls = 1 | copmosite key
2 2 2 2 2 2 0 0
2 2 2 2 2 2 0 0
ALL LEFT | join_use_nulls = 1 | copmosite key
1 1 2 \N \N \N val14 \N
1 1 2 \N \N \N val3 \N
1 2 1 \N \N \N val7 \N
1 2 2 \N \N \N val1 \N
1 2 2 \N \N \N val13 \N
1 2 2 \N \N \N val4 \N
1 2 2 \N \N \N val8 \N
1 \N 1 \N \N \N val0 \N
1 \N 1 \N \N \N val5 \N
1 \N 2 \N \N \N val10 \N
2 2 1 \N \N \N val11 \N
2 2 1 \N \N \N val2 \N
2 2 1 \N \N \N val6 \N
2 2 2 2 2 2 val12 0
2 2 2 2 2 2 val9 0
ALL RIGHT | join_use_nulls = 1 | copmosite key
2 2 2 2 2 2 0 val4
2 2 2 2 2 2 0 val4
\N \N \N 1 1 1 \N val2
\N \N \N 1 1 1 \N val7
\N \N \N 1 1 2 \N val5
\N \N \N 1 1 \N \N val0
\N \N \N 1 1 \N \N val6
\N \N \N 2 1 1 \N val1
\N \N \N 2 1 1 \N val11
\N \N \N 2 1 2 \N val10
\N \N \N 2 1 2 \N val8
\N \N \N 2 1 \N \N val3
\N \N \N 2 2 \N \N val9
ANY INNER | join_use_nulls = 1
4 4 0 0
5 5 0 0
6 6 0 0
8 8 0 0
9 9 0 0
11 11 0 0
12 12 0 0
13 13 0 0
14 14 0 0
ANY LEFT | join_use_nulls = 1
1 \N val7 1
2 \N val3 1
2 \N val5 1
4 4 val0 0
5 5 val12 0
6 6 val11 0
8 8 val4 0
9 9 val10 0
10 \N val1 1
10 \N val14 1
11 11 val6 0
11 11 val8 0
12 12 val2 0
13 13 val13 0
14 14 val9 0
ANY RIGHT | join_use_nulls = 1
4 4 0 val10
5 5 0 val6
6 6 0 val8
8 8 0 val1
9 9 0 val5
11 11 0 val11
12 12 0 val0
13 13 0 val2
13 13 0 val4
13 13 0 val9
14 14 0 val3
14 14 0 val7
ANY INNER | join_use_nulls = 1 | copmosite key
2 2 2 2 2 2 0 0
ANY LEFT | join_use_nulls = 1 | copmosite key
1 1 2 \N \N \N val14 \N
1 1 2 \N \N \N val3 \N
1 2 1 \N \N \N val7 \N
1 2 2 \N \N \N val1 \N
1 2 2 \N \N \N val13 \N
1 2 2 \N \N \N val4 \N
1 2 2 \N \N \N val8 \N
1 \N 1 \N \N \N val0 \N
1 \N 1 \N \N \N val5 \N
1 \N 2 \N \N \N val10 \N
2 2 1 \N \N \N val11 \N
2 2 1 \N \N \N val2 \N
2 2 1 \N \N \N val6 \N
2 2 2 2 2 2 val12 0
2 2 2 2 2 2 val9 0
ANY RIGHT | join_use_nulls = 1 | copmosite key
2 2 2 2 2 2 0 val4
\N \N \N 1 1 1 \N val2
\N \N \N 1 1 1 \N val7
\N \N \N 1 1 2 \N val5
\N \N \N 1 1 \N \N val0
\N \N \N 1 1 \N \N val6
\N \N \N 2 1 1 \N val1
\N \N \N 2 1 1 \N val11
\N \N \N 2 1 2 \N val10
\N \N \N 2 1 2 \N val8
\N \N \N 2 1 \N \N val3
\N \N \N 2 2 \N \N val9

View File

@ -0,0 +1,137 @@
-- Tags: long
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;
CREATE TABLE t1 (key UInt32, key1 UInt32, key2 Nullable(UInt32), key3 UInt32, s String) engine = TinyLog;
CREATE TABLE t2 (key UInt32, key1 UInt32, key2 UInt32, key3 Nullable(UInt32), s String) engine = TinyLog;
{% set table_size = 15 %}
INSERT INTO t1
SELECT
sipHash64(number, 't1') % {{ table_size }} + 1 as key,
sipHash64(number, 'x') % 2 + 1 as key1,
if(number % 5 == 0, NULL, sipHash64(number, 'y') % 2 + 1) as key2,
sipHash64(number, 'z') % 2 + 1 as key3,
'val' || toString(number) as s
FROM numbers_mt({{ table_size }});
INSERT INTO t2
SELECT
sipHash64(number, 't2') % {{ table_size }} + 1 as key,
sipHash64(number, 'a') % 2 + 1 as key1,
sipHash64(number, 'b') % 2 + 1 as key2,
if(number % 3 == 0, NULL, sipHash64(number, 'c') % 2 + 1) as key3,
'val' || toString(number) as s
FROM numbers_mt({{ table_size - 3 }});
SET join_algorithm = 'full_sorting_merge';
{% for block_size in range(1, table_size + 1) -%}
{% for kind in ['ALL', 'ANY'] -%}
SET max_block_size = {{ block_size }};
SELECT '{{ kind }} INNER USING | bs = {{ block_size }}';
SELECT key, empty(t1.s), empty(t2.s) FROM t1
{{ kind }} INNER JOIN t2
USING (key)
ORDER BY t1.key, t2.key
;
SELECT '{{ kind }} INNER | bs = {{ block_size }}';
SELECT t1.key, t2.key, empty(t1.s), empty(t2.s) FROM t1
{{ kind }} INNER JOIN t2
ON t1.key == t2.key
ORDER BY t1.key, t2.key
;
SELECT '{{ kind }} LEFT | bs = {{ block_size }}';
SELECT t1.key, t2.key, t1.s, empty(t2.s) FROM t1
{{ kind }} LEFT JOIN t2
ON t1.key == t2.key
ORDER BY t1.key, t2.key, t1.s
;
SELECT '{{ kind }} RIGHT | bs = {{ block_size }}';
SELECT t1.key, t2.key, empty(t1.s), t2.s FROM t1
{{ kind }} RIGHT JOIN t2
ON t1.key == t2.key
ORDER BY t1.key, t2.key, t2.s
;
SELECT '{{ kind }} INNER | bs = {{ block_size }} | copmosite key';
SELECT t1.key1, t1.key2, t1.key3, t2.key1, t2.key2, t2.key3, empty(t1.s), empty(t2.s) FROM t1
{{ kind }} INNER JOIN t2
ON t1.key1 == t2.key1 AND t1.key2 == t2.key2 AND t1.key3 == t2.key3 AND t1.key1 == t2.key3
ORDER BY t1.key1, t1.key2, t1.key3, t2.key1, t2.key2, t2.key3
;
SELECT '{{ kind }} LEFT | bs = {{ block_size }} | copmosite key';
SELECT t1.key1, t1.key2, t1.key3, t2.key1, t2.key2, t2.key3, t1.s, empty(t2.s) FROM t1
{{ kind }} LEFT JOIN t2
ON t1.key1 == t2.key1 AND t1.key2 == t2.key2 AND t1.key3 == t2.key3 AND t1.key1 == t2.key3
ORDER BY t1.key1, t1.key2, t1.key3, t2.key1, t2.key2, t2.key3, t1.s
;
SELECT '{{ kind }} RIGHT | bs = {{ block_size }} | copmosite key';
SELECT t1.key1, t1.key2, t1.key3, t2.key1, t2.key2, t2.key3, empty(t1.s), t2.s FROM t1
{{ kind }} RIGHT JOIN t2
ON t1.key1 == t2.key1 AND t1.key2 == t2.key2 AND t1.key3 == t2.key3 AND t1.key1 == t2.key3
ORDER BY t1.key1, t1.key2, t1.key3, t2.key1, t2.key2, t2.key3, t2.s
;
{% endfor -%}
{% endfor -%}
{% for kind in ['ALL', 'ANY'] -%}
SET join_use_nulls = 1;
SELECT '{{ kind }} INNER | join_use_nulls = 1';
SELECT t1.key, t2.key, isNull(t1.s), isNull(t2.s) FROM t1
{{ kind }} INNER JOIN t2
ON t1.key == t2.key
ORDER BY t1.key, t2.key
;
SELECT '{{ kind }} LEFT | join_use_nulls = 1';
SELECT t1.key, t2.key, t1.s, isNull(t2.s) FROM t1
{{ kind }} LEFT JOIN t2
ON t1.key == t2.key
ORDER BY t1.key, t2.key, t1.s
;
SELECT '{{ kind }} RIGHT | join_use_nulls = 1';
SELECT t1.key, t2.key, isNull(t1.s), t2.s FROM t1
{{ kind }} RIGHT JOIN t2
ON t1.key == t2.key
ORDER BY t1.key, t2.key, t2.s
;
SELECT '{{ kind }} INNER | join_use_nulls = 1 | copmosite key';
SELECT t1.key1, t1.key2, t1.key3, t2.key1, t2.key2, t2.key3, empty(t1.s), empty(t2.s) FROM t1
{{ kind }} INNER JOIN t2
ON t1.key1 == t2.key1 AND t1.key2 == t2.key2 AND t1.key3 == t2.key3 AND t1.key1 == t2.key3
ORDER BY t1.key1, t1.key2, t1.key3, t2.key1, t2.key2, t2.key3
;
SELECT '{{ kind }} LEFT | join_use_nulls = 1 | copmosite key';
SELECT t1.key1, t1.key2, t1.key3, t2.key1, t2.key2, t2.key3, t1.s, empty(t2.s) FROM t1
{{ kind }} LEFT JOIN t2
ON t1.key1 == t2.key1 AND t1.key2 == t2.key2 AND t1.key3 == t2.key3 AND t1.key1 == t2.key3
ORDER BY t1.key1, t1.key2, t1.key3, t2.key1, t2.key2, t2.key3, t1.s
;
SELECT '{{ kind }} RIGHT | join_use_nulls = 1 | copmosite key';
SELECT t1.key1, t1.key2, t1.key3, t2.key1, t2.key2, t2.key3, empty(t1.s), t2.s FROM t1
{{ kind }} RIGHT JOIN t2
ON t1.key1 == t2.key1 AND t1.key2 == t2.key2 AND t1.key3 == t2.key3 AND t1.key1 == t2.key3
ORDER BY t1.key1, t1.key2, t1.key3, t2.key1, t2.key2, t2.key3, t2.s
;
{% endfor -%}
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;

View File

@ -0,0 +1,550 @@
{% for block_size in range(1, 11) -%}
t1 ALL INNER JOIN t2 | bs = {{ block_size }}
1 1 4 5
1 1 4 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
3 3 4 4
t1 ALL LEFT JOIN t2 | bs = {{ block_size }}
1 1 val1 5
1 1 val1 5
2 2 val21 5
2 2 val21 5
2 2 val21 5
2 2 val21 5
2 2 val21 5
2 2 val21 5
2 2 val21 5
2 2 val22 5
2 2 val22 5
2 2 val22 5
2 2 val22 5
2 2 val22 5
2 2 val22 5
2 2 val22 5
2 2 val23 5
2 2 val23 5
2 2 val23 5
2 2 val23 5
2 2 val23 5
2 2 val23 5
2 2 val23 5
2 2 val24 5
2 2 val24 5
2 2 val24 5
2 2 val24 5
2 2 val24 5
2 2 val24 5
2 2 val24 5
2 2 val25 5
2 2 val25 5
2 2 val25 5
2 2 val25 5
2 2 val25 5
2 2 val25 5
2 2 val25 5
2 2 val26 5
2 2 val26 5
2 2 val26 5
2 2 val26 5
2 2 val26 5
2 2 val26 5
2 2 val26 5
2 2 val27 5
2 2 val27 5
2 2 val27 5
2 2 val27 5
2 2 val27 5
2 2 val27 5
2 2 val27 5
3 3 val3 4
t1 ALL RIGHT JOIN t2 | bs = {{ block_size }}
1 1 4 val11
1 1 4 val12
2 2 5 val22
2 2 5 val22
2 2 5 val22
2 2 5 val22
2 2 5 val22
2 2 5 val22
2 2 5 val22
2 2 5 val23
2 2 5 val23
2 2 5 val23
2 2 5 val23
2 2 5 val23
2 2 5 val23
2 2 5 val23
2 2 5 val24
2 2 5 val24
2 2 5 val24
2 2 5 val24
2 2 5 val24
2 2 5 val24
2 2 5 val24
2 2 5 val25
2 2 5 val25
2 2 5 val25
2 2 5 val25
2 2 5 val25
2 2 5 val25
2 2 5 val25
2 2 5 val26
2 2 5 val26
2 2 5 val26
2 2 5 val26
2 2 5 val26
2 2 5 val26
2 2 5 val26
2 2 5 val27
2 2 5 val27
2 2 5 val27
2 2 5 val27
2 2 5 val27
2 2 5 val27
2 2 5 val27
2 2 5 val28
2 2 5 val28
2 2 5 val28
2 2 5 val28
2 2 5 val28
2 2 5 val28
2 2 5 val28
3 3 4 val3
t1 ANY INNER JOIN t2 | bs = {{ block_size }}
1 1 4 5
2 2 5 5
3 3 4 4
t1 ANY LEFT JOIN t2 | bs = {{ block_size }}
1 1 val1 5
2 2 val21 5
2 2 val22 5
2 2 val23 5
2 2 val24 5
2 2 val25 5
2 2 val26 5
2 2 val27 5
3 3 val3 4
t1 ANY RIGHT JOIN t2 | bs = {{ block_size }}
1 1 4 val11
1 1 4 val12
2 2 5 val22
2 2 5 val23
2 2 5 val24
2 2 5 val25
2 2 5 val26
2 2 5 val27
2 2 5 val28
3 3 4 val3
t1 ALL FULL JOIN t2 | bs = {{ block_size }}
1 1 4 5
1 1 4 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
2 2 5 5
3 3 4 4
t1 ALL FULL JOIN USING t2 | bs = {{ block_size }}
1 4 5
1 4 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
2 5 5
3 4 4
t1 ALL INNER JOIN tn2 | bs = {{ block_size }}
1 1 4 5
1 1 4 5
3 3 4 4
t1 ALL LEFT JOIN tn2 | bs = {{ block_size }}
1 1 val1 5
1 1 val1 5
2 \N val21 0
2 \N val22 0
2 \N val23 0
2 \N val24 0
2 \N val25 0
2 \N val26 0
2 \N val27 0
3 3 val3 4
t1 ALL RIGHT JOIN tn2 | bs = {{ block_size }}
0 \N 0 val22
0 \N 0 val23
0 \N 0 val24
0 \N 0 val25
0 \N 0 val26
0 \N 0 val27
0 \N 0 val28
1 1 4 val11
1 1 4 val12
3 3 4 val3
t1 ANY INNER JOIN tn2 | bs = {{ block_size }}
1 1 4 5
3 3 4 4
t1 ANY LEFT JOIN tn2 | bs = {{ block_size }}
1 1 val1 5
2 \N val21 0
2 \N val22 0
2 \N val23 0
2 \N val24 0
2 \N val25 0
2 \N val26 0
2 \N val27 0
3 3 val3 4
t1 ANY RIGHT JOIN tn2 | bs = {{ block_size }}
0 \N 0 val22
0 \N 0 val23
0 \N 0 val24
0 \N 0 val25
0 \N 0 val26
0 \N 0 val27
0 \N 0 val28
1 1 4 val11
1 1 4 val12
3 3 4 val3
t1 ALL FULL JOIN tn2 | bs = {{ block_size }}
0 \N 0 5
0 \N 0 5
0 \N 0 5
0 \N 0 5
0 \N 0 5
0 \N 0 5
0 \N 0 5
1 1 4 5
1 1 4 5
2 \N 5 0
2 \N 5 0
2 \N 5 0
2 \N 5 0
2 \N 5 0
2 \N 5 0
2 \N 5 0
3 3 4 4
t1 ALL FULL JOIN USING tn2 | bs = {{ block_size }}
1 4 5
1 4 5
2 5 0
2 5 0
2 5 0
2 5 0
2 5 0
2 5 0
2 5 0
3 4 4
\N 0 5
\N 0 5
\N 0 5
\N 0 5
\N 0 5
\N 0 5
\N 0 5
tn1 ALL INNER JOIN t2 | bs = {{ block_size }}
1 1 4 5
1 1 4 5
3 3 4 4
tn1 ALL LEFT JOIN t2 | bs = {{ block_size }}
1 1 val1 5
1 1 val1 5
3 3 val3 4
\N 0 val21 0
\N 0 val22 0
\N 0 val23 0
\N 0 val24 0
\N 0 val25 0
\N 0 val26 0
\N 0 val27 0
tn1 ALL RIGHT JOIN t2 | bs = {{ block_size }}
1 1 4 val11
1 1 4 val12
3 3 4 val3
\N 2 0 val22
\N 2 0 val23
\N 2 0 val24
\N 2 0 val25
\N 2 0 val26
\N 2 0 val27
\N 2 0 val28
tn1 ANY INNER JOIN t2 | bs = {{ block_size }}
1 1 4 5
3 3 4 4
tn1 ANY LEFT JOIN t2 | bs = {{ block_size }}
1 1 val1 5
3 3 val3 4
\N 0 val21 0
\N 0 val22 0
\N 0 val23 0
\N 0 val24 0
\N 0 val25 0
\N 0 val26 0
\N 0 val27 0
tn1 ANY RIGHT JOIN t2 | bs = {{ block_size }}
1 1 4 val11
1 1 4 val12
3 3 4 val3
\N 2 0 val22
\N 2 0 val23
\N 2 0 val24
\N 2 0 val25
\N 2 0 val26
\N 2 0 val27
\N 2 0 val28
tn1 ALL FULL JOIN t2 | bs = {{ block_size }}
1 1 4 5
1 1 4 5
3 3 4 4
\N 0 5 0
\N 0 5 0
\N 0 5 0
\N 0 5 0
\N 0 5 0
\N 0 5 0
\N 0 5 0
\N 2 0 5
\N 2 0 5
\N 2 0 5
\N 2 0 5
\N 2 0 5
\N 2 0 5
\N 2 0 5
tn1 ALL FULL JOIN USING t2 | bs = {{ block_size }}
1 4 5
1 4 5
2 0 5
2 0 5
2 0 5
2 0 5
2 0 5
2 0 5
2 0 5
3 4 4
\N 5 0
\N 5 0
\N 5 0
\N 5 0
\N 5 0
\N 5 0
\N 5 0
tn1 ALL INNER JOIN tn2 | bs = {{ block_size }}
1 1 4 5
1 1 4 5
3 3 4 4
tn1 ALL LEFT JOIN tn2 | bs = {{ block_size }}
1 1 val1 5
1 1 val1 5
3 3 val3 4
\N \N val21 0
\N \N val22 0
\N \N val23 0
\N \N val24 0
\N \N val25 0
\N \N val26 0
\N \N val27 0
tn1 ALL RIGHT JOIN tn2 | bs = {{ block_size }}
1 1 4 val11
1 1 4 val12
3 3 4 val3
\N \N 0 val22
\N \N 0 val23
\N \N 0 val24
\N \N 0 val25
\N \N 0 val26
\N \N 0 val27
\N \N 0 val28
tn1 ANY INNER JOIN tn2 | bs = {{ block_size }}
1 1 4 5
3 3 4 4
tn1 ANY LEFT JOIN tn2 | bs = {{ block_size }}
1 1 val1 5
3 3 val3 4
\N \N val21 0
\N \N val22 0
\N \N val23 0
\N \N val24 0
\N \N val25 0
\N \N val26 0
\N \N val27 0
tn1 ANY RIGHT JOIN tn2 | bs = {{ block_size }}
1 1 4 val11
1 1 4 val12
3 3 4 val3
\N \N 0 val22
\N \N 0 val23
\N \N 0 val24
\N \N 0 val25
\N \N 0 val26
\N \N 0 val27
\N \N 0 val28
tn1 ALL FULL JOIN tn2 | bs = {{ block_size }}
1 1 4 5
1 1 4 5
3 3 4 4
\N \N 0 5
\N \N 0 5
\N \N 0 5
\N \N 0 5
\N \N 0 5
\N \N 0 5
\N \N 0 5
\N \N 5 0
\N \N 5 0
\N \N 5 0
\N \N 5 0
\N \N 5 0
\N \N 5 0
\N \N 5 0
tn1 ALL FULL JOIN USING tn2 | bs = {{ block_size }}
1 4 5
1 4 5
3 4 4
\N 0 5
\N 0 5
\N 0 5
\N 0 5
\N 0 5
\N 0 5
\N 0 5
\N 5 0
\N 5 0
\N 5 0
\N 5 0
\N 5 0
\N 5 0
\N 5 0
{% endfor -%}

View File

@ -0,0 +1,49 @@
-- Tags: long
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;
DROP TABLE IF EXISTS tn1;
DROP TABLE IF EXISTS tn2;
CREATE TABLE t1 (key UInt32, s String) engine = TinyLog;
CREATE TABLE tn1 (key Nullable(UInt32), s String) engine = TinyLog;
CREATE TABLE t2 (key UInt32, s String) engine = TinyLog;
CREATE TABLE tn2 (key Nullable(UInt32), s String) engine = TinyLog;
INSERT INTO t1 VALUES (1, 'val1'), (2, 'val21'), (2, 'val22'), (2, 'val23'), (2, 'val24'), (2, 'val25'), (2, 'val26'), (2, 'val27'), (3, 'val3');
INSERT INTO tn1 VALUES (1, 'val1'), (NULL, 'val21'), (NULL, 'val22'), (NULL, 'val23'), (NULL, 'val24'), (NULL, 'val25'), (NULL, 'val26'), (NULL, 'val27'), (3, 'val3');
INSERT INTO t2 VALUES (1, 'val11'), (1, 'val12'), (2, 'val22'), (2, 'val23'), (2, 'val24'), (2, 'val25'), (2, 'val26'), (2, 'val27'), (2, 'val28'), (3, 'val3');
INSERT INTO tn2 VALUES (1, 'val11'), (1, 'val12'), (NULL, 'val22'), (NULL, 'val23'), (NULL, 'val24'), (NULL, 'val25'), (NULL, 'val26'), (NULL, 'val27'), (NULL, 'val28'), (3, 'val3');
SET join_algorithm = 'full_sorting_merge';
{% for block_size in range(1, 11) -%}
SET max_block_size = {{ block_size }};
{% for t1, t2 in [('t1', 't2'), ('t1', 'tn2'), ('tn1', 't2'), ('tn1', 'tn2')] -%}
{% for kind in ['ALL', 'ANY'] -%}
SELECT '{{ t1 }} {{ kind }} INNER JOIN {{ t2 }} | bs = {{ block_size }}';
SELECT t1.key, t2.key, length(t1.s), length(t2.s) FROM {{ t1 }} AS t1 {{ kind }} INNER JOIN {{ t2 }} AS t2 ON t1.key == t2.key ORDER BY t1.key, t2.key;
SELECT '{{ t1 }} {{ kind }} LEFT JOIN {{ t2 }} | bs = {{ block_size }}';
SELECT t1.key, t2.key, t1.s, length(t2.s) FROM {{ t1 }} AS t1 {{ kind }} LEFT JOIN {{ t2 }} AS t2 ON t1.key == t2.key ORDER BY t1.key, t2.key, t1.s;
SELECT '{{ t1 }} {{ kind }} RIGHT JOIN {{ t2 }} | bs = {{ block_size }}';
SELECT t1.key, t2.key, length(t1.s), t2.s FROM {{ t1 }} AS t1 {{ kind }} RIGHT JOIN {{ t2 }} AS t2 ON t1.key == t2.key ORDER BY t1.key, t2.key, t2.s;
{% endfor -%}
-- FULL JOIN is checked with ALL strictness only. The strictness is written explicitly
-- because the `kind` loop variable is not defined after its loop has ended.
SELECT '{{ t1 }} ALL FULL JOIN {{ t2 }} | bs = {{ block_size }}';
SELECT t1.key, t2.key, length(t1.s), length(t2.s) FROM {{ t1 }} AS t1 ALL FULL JOIN {{ t2 }} AS t2 ON t1.key == t2.key ORDER BY t1.key, t2.key, length(t1.s), length(t2.s);
SELECT '{{ t1 }} ALL FULL JOIN USING {{ t2 }} | bs = {{ block_size }}';
SELECT key, length(t1.s), length(t2.s) FROM {{ t1 }} AS t1 ALL FULL JOIN {{ t2 }} AS t2 USING (key) ORDER BY key, length(t1.s), length(t2.s);
{% endfor -%}
{% endfor -%}
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;
DROP TABLE IF EXISTS tn1;
DROP TABLE IF EXISTS tn2;

View File

@ -0,0 +1,48 @@
ALL INNER
500353531835 500353531835 1000342 1000342 1000342
ALL LEFT
50195752660639 500353531835 10369589 10369589 1000342
ALL RIGHT
500353531835 684008812186 1367170 1000342 1367170
ALL INNER
500353531835 500353531835 1000342 1000342 1000342
ALL LEFT
50195752660639 500353531835 10369589 10369589 1000342
ALL RIGHT
500353531835 684008812186 1367170 1000342 1367170
ALL INNER
500353531835 500353531835 1000342 1000342 1000342
ALL LEFT
50195752660639 500353531835 10369589 10369589 1000342
ALL RIGHT
500353531835 684008812186 1367170 1000342 1367170
ALL INNER
500353531835 500353531835 1000342 1000342 1000342
ALL LEFT
50195752660639 500353531835 10369589 10369589 1000342
ALL RIGHT
500353531835 684008812186 1367170 1000342 1367170
ANY INNER
199622811843 199622811843 399458 399458 399458
ANY LEFT
50010619420459 315220291655 10000000 10000000 630753
ANY RIGHT
316611844056 500267124407 1000000 633172 1000000
ANY INNER
199622811843 199622811843 399458 399458 399458
ANY LEFT
50010619420459 315220291655 10000000 10000000 630753
ANY RIGHT
316611844056 500267124407 1000000 633172 1000000
ANY INNER
199622811843 199622811843 399458 399458 399458
ANY LEFT
50010619420459 315220291655 10000000 10000000 630753
ANY RIGHT
316611844056 500267124407 1000000 633172 1000000
ANY INNER
199622811843 199622811843 399458 399458 399458
ANY LEFT
50010619420459 315220291655 10000000 10000000 630753
ANY RIGHT
316611844056 500267124407 1000000 633172 1000000

View File

@ -0,0 +1,51 @@
-- Tags: long
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;
-- Stress test for join_algorithm = 'full_sorting_merge' on two large MergeTree tables.
-- Every join kind is executed with several max_block_size values and only aggregates
-- of the joined rows are printed; a correct join yields the same aggregates
-- regardless of the block size.
CREATE TABLE t1 (key UInt32, s String) ENGINE = MergeTree ORDER BY key;
CREATE TABLE t2 (key UInt32, s String) ENGINE = MergeTree ORDER BY key;
{% set ltable_size = 10000000 -%}
{% set rtable_size = 1000000 -%}
-- Keys are a hash of the row number folded into 1..table size, so a key is never 0
-- and keys may repeat. Different hash seeds ('x' and 'y') are used for the two tables.
INSERT INTO t1
SELECT
sipHash64(number, 'x') % {{ ltable_size }} + 1 as key,
'val' || toString(number) as s
FROM numbers_mt({{ ltable_size }})
;
INSERT INTO t2
SELECT
sipHash64(number, 'y') % {{ rtable_size }} + 1 as key,
'val' || toString(number) as s
FROM numbers_mt({{ rtable_size }})
;
SET join_algorithm = 'full_sorting_merge';
-- Block sizes: three fixed values plus one picked at random from 32001..65535 on every run.
-- Because stored keys are never 0, countIf(key != 0) counts the rows in which that side
-- actually carries a key; an unmatched side (default 0 or NULL) is not counted.
{% for kind in ['ALL', 'ANY'] -%}
{% for block_size in [32001, 65505, 65536, range(32001, 65536) | random] %}
SET max_block_size = {{ block_size }};
SELECT '{{ kind }} INNER';
SELECT sum(t1.key), sum(t2.key), count(), countIf(t1.key != 0), countIf(t2.key != 0) FROM t1
{{ kind }} INNER JOIN t2
ON t1.key == t2.key
;
SELECT '{{ kind }} LEFT';
SELECT sum(t1.key), sum(t2.key), count(), countIf(t1.key != 0), countIf(t2.key != 0) FROM t1
{{ kind }} LEFT JOIN t2
ON t1.key == t2.key
;
SELECT '{{ kind }} RIGHT';
SELECT sum(t1.key), sum(t2.key), count(), countIf(t1.key != 0), countIf(t2.key != 0) FROM t1
{{ kind }} RIGHT JOIN t2
ON t1.key == t2.key
;
{% endfor -%}
{% endfor -%}
-- Clean up: the tables hold millions of rows and should not outlive the test.
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;

View File

@ -0,0 +1,36 @@
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;
-- Checks that join shapes not handled by the 'full_sorting_merge' algorithm are rejected:
-- every SELECT below is expected to fail with NOT_IMPLEMENTED (see the hint after each query).
CREATE TABLE t1 (key UInt32, val UInt32) ENGINE = Memory;
INSERT INTO t1 VALUES (1, 1);
CREATE TABLE t2 (key UInt32, val UInt32) ENGINE = Memory;
INSERT INTO t2 VALUES (1, 2);
SET join_algorithm = 'full_sorting_merge';
-- ON clause with an extra single-table condition besides the key equality (right side, then left side).
SELECT * FROM t1 JOIN t2 ON t1.key = t2.key AND t2.key > 0; -- { serverError NOT_IMPLEMENTED }
SELECT * FROM t1 JOIN t2 ON t1.key = t2.key AND t1.key > 0; -- { serverError NOT_IMPLEMENTED }
-- ON clause with several key equalities combined by OR.
SELECT * FROM t1 JOIN t2 ON t1.key = t2.key OR t1.val = t2.key; -- { serverError NOT_IMPLEMENTED }
-- ANTI, SEMI and ASOF joins.
SELECT * FROM t1 ANTI JOIN t2 ON t1.key = t2.key; -- { serverError NOT_IMPLEMENTED }
SELECT * FROM t1 SEMI JOIN t2 ON t1.key = t2.key; -- { serverError NOT_IMPLEMENTED }
SELECT * FROM t1 ASOF JOIN t2 ON t1.key = t2.key AND t1.val > t2.val; -- { serverError NOT_IMPLEMENTED }
-- ANY join with the any_join_distinct_right_table_keys setting enabled.
SELECT * FROM t1 ANY JOIN t2 ON t1.key = t2.key SETTINGS any_join_distinct_right_table_keys = 1; -- { serverError NOT_IMPLEMENTED }
-- Join with join_use_nulls enabled.
SELECT * FROM t1 JOIN t2 USING (key) SETTINGS join_use_nulls = 1; -- { serverError NOT_IMPLEMENTED }
-- Subqueries using WITH TOTALS: on both sides, on the right side only, on the left side only.
SELECT * FROM ( SELECT key, sum(val) AS val FROM t1 GROUP BY key WITH TOTALS ) as t1
JOIN ( SELECT key, sum(val) AS val FROM t2 GROUP BY key WITH TOTALS ) as t2 ON t1.key = t2.key; -- { serverError NOT_IMPLEMENTED }
SELECT * FROM t1 JOIN ( SELECT key, sum(val) AS val FROM t2 GROUP BY key WITH TOTALS ) as t2 ON t1.key = t2.key; -- { serverError NOT_IMPLEMENTED }
SELECT * FROM ( SELECT key, sum(val) AS val FROM t1 GROUP BY key WITH TOTALS ) as t1 JOIN t2 ON t1.key = t2.key; -- { serverError NOT_IMPLEMENTED }
-- Clean up the tables created by this test.
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;

View File

@ -0,0 +1,12 @@
1 1
1
1
1
\N
1 1
-1 0
\N 4294967295
a a
a a
a a
a a

View File

@ -0,0 +1,21 @@
-- Smoke test for join_algorithm = 'full_sorting_merge' on single-row subqueries,
-- covering ON / USING syntax and key columns whose types differ between the two sides.
SET join_algorithm = 'full_sorting_merge';
-- Same key type on both sides: ON syntax, then USING syntax.
SELECT * FROM (SELECT 1 as key) AS t1 JOIN (SELECT 1 as key) t2 ON t1.key = t2.key;
SELECT * FROM (SELECT 1 as key) AS t1 JOIN (SELECT 1 as key) t2 USING key;
-- FULL JOIN ... USING where only the right key is Nullable: equal values, then a NULL on the right.
SELECT * FROM (SELECT 1 :: UInt32 as key) AS t1 FULL JOIN (SELECT 1 :: Nullable(UInt32) as key) t2 USING (key);
SELECT * FROM (SELECT 1 :: UInt32 as key) AS t1 FULL JOIN (SELECT NULL :: Nullable(UInt32) as key) t2 USING (key);
-- Signed vs unsigned keys. In the second query -1 (Int32) and 4294967295 (UInt32) share a
-- 32-bit pattern; NOTE(review): presumably they must be treated as different keys - confirm against the reference file.
SELECT * FROM (SELECT 1 :: Int32 as key) AS t1 JOIN (SELECT 1 :: UInt32 as key) t2 ON t1.key = t2.key;
SELECT * FROM (SELECT -1 :: Nullable(Int32) as key) AS t1 FULL JOIN (SELECT 4294967295 :: UInt32 as key) t2 ON t1.key = t2.key;
-- String keys with different combinations of LowCardinality and Nullable wrappers on the two sides.
SELECT * FROM (SELECT 'a' :: LowCardinality(String) AS key) AS t1 JOIN (SELECT 'a' :: String AS key) AS t2 ON t1.key = t2.key;
SELECT * FROM (SELECT 'a' :: LowCardinality(Nullable(String)) AS key) AS t1 JOIN (SELECT 'a' :: String AS key) AS t2 ON t1.key = t2.key;
SELECT * FROM (SELECT 'a' :: LowCardinality(Nullable(String)) AS key) AS t1 JOIN (SELECT 'a' :: Nullable(String) AS key) AS t2 ON t1.key = t2.key;
SELECT * FROM (SELECT 'a' :: LowCardinality(String) AS key) AS t1 JOIN (SELECT 'a' :: LowCardinality(String) AS key) AS t2 ON t1.key = t2.key;

View File

@ -1,3 +1,5 @@
{% for join_algorithm in ['hash', 'parallel_hash', 'full_sorting_merge'] -%}
--- {{ join_algorithm }} ---
2014-03-17 1406958 265108
2014-03-19 1405797 261624
2014-03-18 1383658 258723
@ -22,6 +24,7 @@ mail.ru 87949 22225
best.ru 58537 55
korablitz.ru 51844 0
hurpass.com 49671 1251
{% if join_algorithm != 'full_sorting_merge' -%}
37292 0 35642
92887 252214 0
7842 196036 0
@ -287,6 +290,7 @@ hurpass.com 49671 1251
4 4 2
6 6 3
8 8 4
{% endif -%}
0 0 0
0 0 1
1 1 2
@ -297,3 +301,4 @@ hurpass.com 49671 1251
3 3 7
4 4 8
4 4 9
{% endfor -%}

View File

@ -1,4 +1,8 @@
SET join_algorithm='parallel_hash';
{% for join_algorithm in ['hash', 'parallel_hash', 'full_sorting_merge'] -%}
SELECT '--- {{ join_algorithm }} ---';
SET join_algorithm = '{{ join_algorithm }}';
SELECT
EventDate,
@ -65,8 +69,9 @@ ORDER BY hits DESC
LIMIT 10
SETTINGS joined_subquery_requires_alias = 0;
SELECT CounterID FROM test.visits ARRAY JOIN Goals.ID WHERE CounterID = 942285 ORDER BY CounterID;
{% if join_algorithm != 'full_sorting_merge' -%}
SELECT CounterID FROM test.visits ARRAY JOIN Goals.ID WHERE CounterID = 942285 ORDER BY CounterID;
SELECT
CounterID,
@ -194,6 +199,8 @@ ANY INNER JOIN
USING k ORDER BY joined
SETTINGS any_join_distinct_right_table_keys = 1;
{% endif -%}
SELECT a.*, b.* FROM
(
SELECT number AS k FROM system.numbers LIMIT 10
@ -203,3 +210,5 @@ ALL INNER JOIN
SELECT intDiv(number, 2) AS k, number AS joined FROM system.numbers LIMIT 10
) AS b
USING k ORDER BY joined;
{% endfor %}