Don't store converting actions in TableJoin

This commit is contained in:
vdimir 2021-06-29 12:52:19 +03:00
parent ed8c156190
commit df873866c9
No known key found for this signature in database
GPG Key ID: F57B3E10A21DBB31
4 changed files with 43 additions and 63 deletions

View File

@ -832,14 +832,14 @@ bool SelectQueryExpressionAnalyzer::appendJoinLeftKeys(ExpressionActionsChain &
return true;
}
JoinPtr SelectQueryExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain)
JoinPtr SelectQueryExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, ActionsDAGPtr & converting_join_columns)
{
const ColumnsWithTypeAndName & left_sample_columns = chain.getLastStep().getResultColumns();
JoinPtr table_join = makeTableJoin(*syntax->ast_join, left_sample_columns);
JoinPtr table_join = makeTableJoin(*syntax->ast_join, left_sample_columns, converting_join_columns);
if (auto left_actions = syntax->analyzed_join->leftConvertingActions())
if (converting_join_columns)
{
chain.steps.push_back(std::make_unique<ExpressionActionsChain::ExpressionActionsStep>(left_actions));
chain.steps.push_back(std::make_unique<ExpressionActionsChain::ExpressionActionsStep>(converting_join_columns));
chain.addStep();
}
@ -871,10 +871,9 @@ static std::shared_ptr<IJoin> chooseJoinAlgorithm(std::shared_ptr<TableJoin> ana
return std::make_shared<JoinSwitcher>(analyzed_join, sample_block);
}
std::unique_ptr<QueryPlan> buildJoinedPlan(
static std::unique_ptr<QueryPlan> buildJoinedPlan(
ContextPtr context,
const ASTTablesInSelectQueryElement & join_element,
const ColumnsWithTypeAndName & left_sample_columns,
TableJoin & analyzed_join,
SelectQueryOptions query_options)
{
@ -918,40 +917,44 @@ std::unique_ptr<QueryPlan> buildJoinedPlan(
joined_actions_step->setStepDescription("Joined actions");
joined_plan->addStep(std::move(joined_actions_step));
const ColumnsWithTypeAndName & right_sample_columns = joined_plan->getCurrentDataStream().header.getColumnsWithTypeAndName();
analyzed_join.createConvertingActions(left_sample_columns, right_sample_columns);
if (auto right_actions = analyzed_join.rightConvertingActions())
{
auto converting_step = std::make_unique<ExpressionStep>(joined_plan->getCurrentDataStream(), right_actions);
converting_step->setStepDescription("Convert joined columns");
joined_plan->addStep(std::move(converting_step));
}
return joined_plan;
}
JoinPtr SelectQueryExpressionAnalyzer::makeTableJoin(
const ASTTablesInSelectQueryElement & join_element, const ColumnsWithTypeAndName & left_sample_columns)
const ASTTablesInSelectQueryElement & join_element,
const ColumnsWithTypeAndName & left_columns,
ActionsDAGPtr & left_convert_actions)
{
/// Two JOINs are not supported with the same subquery, but different USINGs.
if (joined_plan)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Table join was already created for query");
if (auto storage = syntax->analyzed_join->getStorageJoin())
ActionsDAGPtr right_convert_actions = nullptr;
const auto & analyzed_join = syntax->analyzed_join;
if (auto storage = analyzed_join->getStorageJoin())
{
syntax->analyzed_join->createConvertingActions(left_sample_columns, {});
return storage->getJoinLocked(syntax->analyzed_join);
std::tie(left_convert_actions, right_convert_actions) = analyzed_join->createConvertingActions(left_columns, {});
return storage->getJoinLocked(analyzed_join);
}
joined_plan = buildJoinedPlan(getContext(), join_element, left_sample_columns, *syntax->analyzed_join, query_options);
joined_plan = buildJoinedPlan(getContext(), join_element, *analyzed_join, query_options);
JoinPtr join = chooseJoinAlgorithm(syntax->analyzed_join, joined_plan->getCurrentDataStream().header, getContext());
const ColumnsWithTypeAndName & right_columns = joined_plan->getCurrentDataStream().header.getColumnsWithTypeAndName();
std::tie(left_convert_actions, right_convert_actions) = analyzed_join->createConvertingActions(left_columns, right_columns);
if (right_convert_actions)
{
auto converting_step = std::make_unique<ExpressionStep>(joined_plan->getCurrentDataStream(), right_convert_actions);
converting_step->setStepDescription("Convert joined columns");
joined_plan->addStep(std::move(converting_step));
}
JoinPtr join = chooseJoinAlgorithm(analyzed_join, joined_plan->getCurrentDataStream().header, getContext());
/// Do not make subquery for join over dictionary.
if (syntax->analyzed_join->getDictionaryReader())
if (analyzed_join->getDictionaryReader())
joined_plan.reset();
return join;
@ -1544,8 +1547,7 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
{
query_analyzer.appendJoinLeftKeys(chain, only_types || !first_stage);
before_join = chain.getLastActions();
join = query_analyzer.appendJoin(chain);
converting_join_columns = query_analyzer.analyzedJoin().leftConvertingActions();
join = query_analyzer.appendJoin(chain, converting_join_columns);
chain.addStep();
}

View File

@ -341,7 +341,8 @@ private:
JoinPtr makeTableJoin(
const ASTTablesInSelectQueryElement & join_element,
const ColumnsWithTypeAndName & left_sample_columns);
const ColumnsWithTypeAndName & left_columns,
ActionsDAGPtr & left_convert_actions);
const ASTSelectQuery * getAggregatingQuery() const;
@ -362,7 +363,8 @@ private:
/// Before aggregation:
ArrayJoinActionPtr appendArrayJoin(ExpressionActionsChain & chain, ActionsDAGPtr & before_array_join, bool only_types);
bool appendJoinLeftKeys(ExpressionActionsChain & chain, bool only_types);
JoinPtr appendJoin(ExpressionActionsChain & chain);
JoinPtr appendJoin(ExpressionActionsChain & chain, ActionsDAGPtr & converting_join_columns);
/// remove_filter is set in ExpressionActionsChain::finalize();
/// Columns in `additional_required_columns` will not be removed (they can be used for e.g. sampling or FINAL modifier).
ActionsDAGPtr appendPrewhere(ExpressionActionsChain & chain, bool only_types, const Names & additional_required_columns);

View File

@ -80,8 +80,6 @@ void TableJoin::resetCollected()
renames.clear();
left_type_map.clear();
right_type_map.clear();
left_converting_actions = nullptr;
right_converting_actions = nullptr;
}
void TableJoin::addUsingKey(const ASTPtr & ast)
@ -386,15 +384,15 @@ bool TableJoin::tryInitDictJoin(const Block & sample_block, ContextPtr context)
return true;
}
bool TableJoin::createConvertingActions(const ColumnsWithTypeAndName & left_sample_columns, const ColumnsWithTypeAndName & right_sample_columns)
std::pair<ActionsDAGPtr, ActionsDAGPtr>
TableJoin::createConvertingActions(const ColumnsWithTypeAndName & left_sample_columns, const ColumnsWithTypeAndName & right_sample_columns)
{
bool need_convert = false;
need_convert = inferJoinKeyCommonType(left_sample_columns, right_sample_columns, !isSpecialStorage());
inferJoinKeyCommonType(left_sample_columns, right_sample_columns, !isSpecialStorage());
left_converting_actions = applyKeyConvertToTable(left_sample_columns, left_type_map, key_names_left);
right_converting_actions = applyKeyConvertToTable(right_sample_columns, right_type_map, key_names_right);
auto left_converting_actions = applyKeyConvertToTable(left_sample_columns, left_type_map, key_names_left);
auto right_converting_actions = applyKeyConvertToTable(right_sample_columns, right_type_map, key_names_right);
return need_convert;
return {left_converting_actions, right_converting_actions};
}
template <typename LeftNamesAndTypes, typename RightNamesAndTypes>
@ -491,7 +489,6 @@ ActionsDAGPtr TableJoin::applyKeyConvertToTable(
return dag;
}
void TableJoin::setStorageJoin(std::shared_ptr<StorageJoin> storage)
{
if (right_storage_dictionary)
@ -506,11 +503,6 @@ void TableJoin::setStorageJoin(std::shared_ptr<StorageDictionary> storage)
right_storage_dictionary = storage;
}
std::shared_ptr<StorageJoin> TableJoin::getStorageJoin()
{
return right_storage_join;
}
String TableJoin::renamedRightColumnName(const String & name) const
{
if (const auto it = renames.find(name); it != renames.end())
@ -580,14 +572,4 @@ std::pair<String, String> TableJoin::joinConditionColumnNames() const
return res;
}
bool TableJoin::isSpecialStorage() const
{
return right_storage_dictionary || right_storage_join;
}
const DictionaryReader * TableJoin::getDictionaryReader() const
{
return dictionary_reader.get();
}
}

View File

@ -96,9 +96,6 @@ private:
NameToTypeMap left_type_map;
NameToTypeMap right_type_map;
ActionsDAGPtr left_converting_actions;
ActionsDAGPtr right_converting_actions;
/// Name -> original name. Names are the same as in columns_from_joined_table list.
std::unordered_map<String, String> original_names;
/// Original name -> name. Only renamed columns.
@ -205,11 +202,8 @@ public:
/// Calculate converting actions, rename key columns in required
/// For `USING` join we will convert key columns inplace and affect into types in the result table
/// For `JOIN ON` we will create new columns with converted keys to join by.
bool createConvertingActions(const ColumnsWithTypeAndName & left_sample_columns, const ColumnsWithTypeAndName & right_sample_columns);
/// Key columns should be converted before join.
ActionsDAGPtr leftConvertingActions() const { return left_converting_actions; }
ActionsDAGPtr rightConvertingActions() const { return right_converting_actions; }
std::pair<ActionsDAGPtr, ActionsDAGPtr>
createConvertingActions(const ColumnsWithTypeAndName & left_sample_columns, const ColumnsWithTypeAndName & right_sample_columns);
void setAsofInequality(ASOF::Inequality inequality) { asof_inequality = inequality; }
ASOF::Inequality getAsofInequality() { return asof_inequality; }
@ -240,12 +234,12 @@ public:
void setStorageJoin(std::shared_ptr<StorageJoin> storage);
void setStorageJoin(std::shared_ptr<StorageDictionary> storage);
std::shared_ptr<StorageJoin> getStorageJoin();
std::shared_ptr<StorageJoin> getStorageJoin() { return right_storage_join; }
bool tryInitDictJoin(const Block & sample_block, ContextPtr context);
bool isSpecialStorage() const;
const DictionaryReader * getDictionaryReader() const;
bool isSpecialStorage() const { return right_storage_dictionary || right_storage_join; }
const DictionaryReader * getDictionaryReader() const { return dictionary_reader.get(); }
};
}