This commit is contained in:
Nikita Taranov 2024-11-13 15:26:07 +01:00
parent d0405135a7
commit 04e80e675a
8 changed files with 66 additions and 20 deletions

View File

@ -3369,6 +3369,8 @@ UInt64 calculateCacheKey(const DB::ASTPtr & select_query)
SipHash hash;
hash.update(select.tables()->getTreeHash(/*ignore_aliases=*/true));
if (const auto prewhere = select.prewhere())
hash.update(prewhere->getTreeHash(/*ignore_aliases=*/true));
if (const auto where = select.where())
hash.update(where->getTreeHash(/*ignore_aliases=*/true));
if (const auto group_by = select.groupBy())

View File

@ -11,10 +11,12 @@
#include <Interpreters/PreparedSets.h>
#include <Interpreters/TableJoin.h>
#include <Interpreters/createBlockSelector.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/DumpASTNode.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/IAST_fwd.h>
#include <Parsers/parseQuery.h>
#include <Storages/SelectQueryInfo.h>
#include <Common/CurrentThread.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
@ -341,16 +343,29 @@ Blocks ConcurrentHashJoin::dispatchBlock(const Strings & key_columns_names, cons
return result;
}
UInt64 calculateCacheKey(std::shared_ptr<TableJoin> & table_join, const QueryTreeNodePtr & right_table_expression)
UInt64 calculateCacheKey(
std::shared_ptr<TableJoin> & table_join, const QueryTreeNodePtr & right_table_expression, const SelectQueryInfo & select_query_info)
{
const auto * select = select_query_info.query->as<DB::ASTSelectQuery>();
if (!select)
return 0;
IQueryTreeNode::HashState hash;
if (const auto prewhere = select->prewhere())
hash.update(prewhere->getTreeHash(/*ignore_aliases=*/true));
if (const auto where = select->where())
hash.update(where->getTreeHash(/*ignore_aliases=*/true));
chassert(right_table_expression);
hash.update(right_table_expression->getTreeHash());
chassert(table_join && table_join->oneDisjunct());
const auto keys
= NameOrderedSet{table_join->getClauses().at(0).key_names_right.begin(), table_join->getClauses().at(0).key_names_right.end()};
for (const auto & name : keys)
hash.update(name);
return hash.get64();
}
}

View File

@ -1,13 +1,11 @@
#pragma once
#include <condition_variable>
#include <memory>
#include <optional>
#include <Analyzer/IQueryTreeNode.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/HashTablesStatistics.h>
#include <Interpreters/HashJoin/HashJoin.h>
#include <Interpreters/HashTablesStatistics.h>
#include <Interpreters/IJoin.h>
#include <base/defines.h>
#include <base/types.h>
@ -17,6 +15,8 @@
namespace DB
{
struct SelectQueryInfo;
/**
* Can run addBlockToJoin() parallelly to speedup the join process. On test, it almose linear speedup by
* the degree of parallelism.
@ -82,5 +82,6 @@ private:
Blocks dispatchBlock(const Strings & key_columns_names, const Block & from_block);
};
UInt64 calculateCacheKey(std::shared_ptr<TableJoin> & table_join, const QueryTreeNodePtr & right_table_expression);
UInt64 calculateCacheKey(
std::shared_ptr<TableJoin> & table_join, const QueryTreeNodePtr & right_table_expression, const SelectQueryInfo & select_query_info);
}

View File

@ -2,8 +2,8 @@
#include <Core/Settings.h>
#include <Common/scope_guard_safe.h>
#include <Core/ParallelReplicasMode.h>
#include <Common/scope_guard_safe.h>
#include <Columns/ColumnAggregateFunction.h>
@ -1241,11 +1241,13 @@ void joinCastPlanColumnsToNullable(QueryPlan & plan_to_add_cast, PlannerContextP
plan_to_add_cast.addStep(std::move(cast_join_columns_step));
}
JoinTreeQueryPlan buildQueryPlanForJoinNode(const QueryTreeNodePtr & join_table_expression,
JoinTreeQueryPlan buildQueryPlanForJoinNode(
const QueryTreeNodePtr & join_table_expression,
JoinTreeQueryPlan left_join_tree_query_plan,
JoinTreeQueryPlan right_join_tree_query_plan,
const ColumnIdentifierSet & outer_scope_columns,
PlannerContextPtr & planner_context)
PlannerContextPtr & planner_context,
const SelectQueryInfo & select_query_info)
{
auto & join_node = join_table_expression->as<JoinNode &>();
if (left_join_tree_query_plan.from_stage != QueryProcessingStage::FetchColumns)
@ -1528,7 +1530,8 @@ JoinTreeQueryPlan buildQueryPlanForJoinNode(const QueryTreeNodePtr & join_table_
}
const Block & right_header = right_plan.getCurrentHeader();
auto join_algorithm = chooseJoinAlgorithm(table_join, join_node.getRightTableExpression(), left_header, right_header, planner_context);
auto join_algorithm = chooseJoinAlgorithm(
table_join, join_node.getRightTableExpression(), left_header, right_header, planner_context, select_query_info);
auto result_plan = QueryPlan();
@ -1883,11 +1886,13 @@ JoinTreeQueryPlan buildJoinTreeQueryPlan(const QueryTreeNodePtr & query_node,
auto left_query_plan = std::move(query_plans_stack.back());
query_plans_stack.pop_back();
query_plans_stack.push_back(buildQueryPlanForJoinNode(table_expression,
query_plans_stack.push_back(buildQueryPlanForJoinNode(
table_expression,
std::move(left_query_plan),
std::move(right_query_plan),
table_expressions_outer_scope_columns[i],
planner_context));
planner_context,
select_query_info));
}
else
{

View File

@ -789,12 +789,14 @@ std::shared_ptr<DirectKeyValueJoin> tryDirectJoin(const std::shared_ptr<TableJoi
}
}
static std::shared_ptr<IJoin> tryCreateJoin(JoinAlgorithm algorithm,
static std::shared_ptr<IJoin> tryCreateJoin(
JoinAlgorithm algorithm,
std::shared_ptr<TableJoin> & table_join,
const QueryTreeNodePtr & right_table_expression,
const Block & left_table_expression_header,
const Block & right_table_expression_header,
const PlannerContextPtr & planner_context)
const PlannerContextPtr & planner_context,
const SelectQueryInfo & select_query_info)
{
if (table_join->kind() == JoinKind::Paste)
return std::make_shared<PasteJoin>(table_join, right_table_expression_header);
@ -824,7 +826,7 @@ static std::shared_ptr<IJoin> tryCreateJoin(JoinAlgorithm algorithm,
{
const auto & settings = query_context->getSettingsRef();
StatsCollectingParams params{
calculateCacheKey(table_join, right_table_expression),
calculateCacheKey(table_join, right_table_expression, select_query_info),
settings[Setting::collect_hash_table_stats_during_joins],
query_context->getServerSettings()[ServerSetting::max_entries_for_hash_table_stats],
settings[Setting::max_size_to_preallocate_for_joins]};
@ -866,11 +868,13 @@ static std::shared_ptr<IJoin> tryCreateJoin(JoinAlgorithm algorithm,
return nullptr;
}
std::shared_ptr<IJoin> chooseJoinAlgorithm(std::shared_ptr<TableJoin> & table_join,
std::shared_ptr<IJoin> chooseJoinAlgorithm(
std::shared_ptr<TableJoin> & table_join,
const QueryTreeNodePtr & right_table_expression,
const Block & left_table_expression_header,
const Block & right_table_expression_header,
const PlannerContextPtr & planner_context)
const PlannerContextPtr & planner_context,
const SelectQueryInfo & select_query_info)
{
if (table_join->getMixedJoinExpression()
&& !table_join->isEnabledAlgorithm(JoinAlgorithm::HASH)
@ -926,7 +930,14 @@ std::shared_ptr<IJoin> chooseJoinAlgorithm(std::shared_ptr<TableJoin> & table_jo
for (auto algorithm : table_join->getEnabledJoinAlgorithms())
{
auto join = tryCreateJoin(algorithm, table_join, right_table_expression, left_table_expression_header, right_table_expression_header, planner_context);
auto join = tryCreateJoin(
algorithm,
table_join,
right_table_expression,
left_table_expression_header,
right_table_expression_header,
planner_context,
select_query_info);
if (join)
return join;
}

View File

@ -12,6 +12,8 @@
namespace DB
{
struct SelectQueryInfo;
/** Join clause represent single JOIN ON section clause.
* Join clause consists of JOIN keys and conditions.
*
@ -218,10 +220,11 @@ std::optional<bool> tryExtractConstantFromJoinNode(const QueryTreeNodePtr & join
* Table join structure can be modified during JOIN algorithm choosing for special JOIN algorithms.
* For example JOIN with Dictionary engine, or JOIN with JOIN engine.
*/
std::shared_ptr<IJoin> chooseJoinAlgorithm(std::shared_ptr<TableJoin> & table_join,
std::shared_ptr<IJoin> chooseJoinAlgorithm(
std::shared_ptr<TableJoin> & table_join,
const QueryTreeNodePtr & right_table_expression,
const Block & left_table_expression_header,
const Block & right_table_expression_header,
const PlannerContextPtr & planner_context);
const PlannerContextPtr & planner_context,
const SelectQueryInfo & select_query_info);
}

View File

@ -20,7 +20,9 @@ $CLICKHOUSE_CLIENT -q "
INSERT INTO t2 SELECT number, number FROM numbers_mt(1e6);
"
# list of query_id-s that expected to be executed without preallocation
queries_without_preallocation=()
# list of query_id-s that expected to be executed with preallocation
queries_with_preallocation=()
run_new_query() {
@ -51,6 +53,9 @@ $CLICKHOUSE_CLIENT "${opts[@]}" --query_id="$query_id" -q "SELECT * FROM t1 AS x
# now t1 is the right table
run_new_query "SELECT * FROM t2 AS x INNER JOIN t1 AS y ON x.a = y.a"
run_new_query "SELECT * FROM t1 AS x INNER JOIN t2 AS y ON x.a = y.a WHERE a < 200_000"
run_new_query "SELECT * FROM t1 AS x INNER JOIN t2 AS y ON x.a = y.a WHERE a >= 200_000"
##################################
$CLICKHOUSE_CLIENT -q "SYSTEM FLUSH LOGS"