Fix test.

This commit is contained in:
Nikolai Kochetov 2018-07-31 14:31:18 +03:00
parent f6b9c46d24
commit 9359ae7650
3 changed files with 111 additions and 51 deletions

View File

@ -125,7 +125,21 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
if (!done_with_join)
{
subquery.joined_block_actions->execute(block);
if (subquery.joined_block_actions)
subquery.joined_block_actions->execute(block);
for (const auto & name_with_alias : subquery.joined_block_aliases)
{
if (block.has(name_with_alias.first))
{
auto pos = block.getPositionByName(name_with_alias.first);
auto column = block.getByPosition(pos);
block.erase(pos);
column.name = name_with_alias.second;
block.insert(std::move(column));
}
}
if (!subquery.join->insertFromBlock(block))
done_with_join = true;
}

View File

@ -453,10 +453,7 @@ void ExpressionAnalyzer::translateQualifiedNamesImpl(ASTPtr & ast, const std::ve
/// In case if column from the joined table are in source columns, change it's name to qualified.
if (best_table_pos && source_columns.contains(ast->getColumnName()))
{
analyzed_join.duplicate_columns_from_joined_table.insert(ast->getColumnName());
tables[best_table_pos].makeQualifiedName(ast);
}
}
}
else if (typeid_cast<ASTQualifiedAsterisk *>(ast.get()))
@ -1636,8 +1633,8 @@ void ExpressionAnalyzer::makeSetsForIndexImpl(const ASTPtr & node, const Block &
{
NamesAndTypesList temp_columns = source_columns;
temp_columns.insert(temp_columns.end(), array_join_columns.begin(), array_join_columns.end());
temp_columns.insert(temp_columns.end(), analyzed_join.columns_added_by_join.begin(),
analyzed_join.columns_added_by_join.end());
for (const auto & joined_column : analyzed_join.columns_added_by_join)
temp_columns.push_back(joined_column.name_and_type);
ExpressionActionsPtr temp_actions = std::make_shared<ExpressionActions>(temp_columns, settings);
getRootActions(func->arguments->children.at(0), true, false, temp_actions);
@ -2542,22 +2539,27 @@ void ExpressionAnalyzer::addJoinAction(ExpressionActionsPtr & actions, bool only
{
if (only_types)
actions->add(ExpressionAction::ordinaryJoin(nullptr, analyzed_join.key_names_left,
analyzed_join.columns_added_by_join));
analyzed_join.getColumnsAddedByJoin()));
else
for (auto & subquery_for_set : subqueries_for_sets)
if (subquery_for_set.second.join)
actions->add(ExpressionAction::ordinaryJoin(subquery_for_set.second.join, analyzed_join.key_names_left,
analyzed_join.columns_added_by_join));
analyzed_join.getColumnsAddedByJoin()));
}
void ExpressionAnalyzer::AnalyzedJoin::createJoinedBlockActions(
const ASTTablesInSelectQueryElement & join,
const Context & context)
void ExpressionAnalyzer::AnalyzedJoin::createJoinedBlockActions(const ASTSelectQuery * select_query,
const Context & context)
{
const auto & join_params = static_cast<const ASTTableJoin &>(*join.table_join);
const auto & table_to_join = static_cast<const ASTTableExpression &>(*join.table_expression);
const auto joined_table_name = getTableNameWithAliasFromTableExpression(table_to_join, context);
if (!select_query)
return;
const ASTTablesInSelectQueryElement * join = select_query->join();
if (!join)
return;
const auto & join_params = static_cast<const ASTTableJoin &>(*join->table_join);
/// Create custom expression list with join keys from right table.
auto expression_list = std::make_shared<ASTExpressionList>();
@ -2567,18 +2569,34 @@ void ExpressionAnalyzer::AnalyzedJoin::createJoinedBlockActions(
for (const auto & join_right_key : key_asts_right)
children.emplace_back(join_right_key);
/// Alias duplicating columns.
for (const auto & name : duplicate_columns_from_joined_table)
{
auto identifier = std::make_shared<ASTIdentifier>(name);
identifier->setAlias(joined_table_name.getQualifiedNamePrefix() + name);
children.emplace_back(std::move(identifier));
}
NameSet required_columns_set(key_names_right.begin(), key_names_right.end());
for (const auto & joined_column : columns_added_by_join)
required_columns_set.insert(joined_column.original_name);
ExpressionAnalyzer analyzer(expression_list, context, nullptr, columns_from_joined_table, key_names_right);
required_columns_set.insert(key_names_right.begin(), key_names_right.end());
required_columns_from_joined_table.insert(required_columns_from_joined_table.end(),
required_columns_set.begin(), required_columns_set.end());
ExpressionAnalyzer analyzer(expression_list, context, nullptr, columns_from_joined_table, required_columns_from_joined_table);
joined_block_actions = analyzer.getActions(false);
for (const auto & column_required_from_actions : joined_block_actions->getRequiredColumns())
if (!required_columns_set.count(column_required_from_actions))
required_columns_from_joined_table.push_back(column_required_from_actions);
}
NamesAndTypesList ExpressionAnalyzer::AnalyzedJoin::getColumnsAddedByJoin() const
{
NamesAndTypesList result;
for (const auto & joined_column : columns_added_by_join)
result.push_back(joined_column.name_and_type);
return result;
}
bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_types)
{
assertSelect();
@ -2644,29 +2662,35 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty
else
table = table_to_join.subquery;
auto required_source_columns = analyzed_join.joined_block_actions->getRequiredColumns();
NameSet required_source_columns_set(required_source_columns.begin(), required_source_columns.end());
for (const auto & right_key : analyzed_join.key_names_right)
required_source_columns_set.insert(right_key);
for (auto & type_name : analyzed_join.columns_added_by_join)
required_source_columns_set.insert(type_name.name);
required_source_columns.clear();
required_source_columns.reserve(required_source_columns_set.size());
required_source_columns.insert(required_source_columns.end(),
required_source_columns_set.begin(), required_source_columns_set.end());
auto interpreter = interpretSubquery(table, context, subquery_depth, required_source_columns);
auto interpreter = interpretSubquery(table, context, subquery_depth, analyzed_join.required_columns_from_joined_table);
subquery_for_set.source = std::make_shared<LazyBlockInputStream>(
interpreter->getSampleBlock(),
[interpreter]() mutable { return interpreter->execute().in; });
}
/// TODO You do not need to set this up when JOIN is only needed on remote servers.
/// Alias duplicating columns.
for (const auto & joined_column : analyzed_join.columns_added_by_join)
{
const auto & qualified_name = joined_column.name_and_type.name;
if (joined_column.original_name != qualified_name)
subquery_for_set.joined_block_aliases.emplace_back(joined_column.original_name, qualified_name);
}
auto sample_block = subquery_for_set.source->getHeader();
analyzed_join.joined_block_actions->execute(sample_block);
for (const auto & name_with_alias : subquery_for_set.joined_block_aliases)
{
if (sample_block.has(name_with_alias.first))
{
auto pos = sample_block.getPositionByName(name_with_alias.first);
auto column = sample_block.getByPosition(pos);
sample_block.erase(pos);
column.name = name_with_alias.second;
sample_block.insert(std::move(column));
}
}
/// TODO You do not need to set this up when JOIN is only needed on remote servers.
subquery_for_set.join = join;
subquery_for_set.join->setSampleBlock(sample_block);
subquery_for_set.joined_block_actions = analyzed_join.joined_block_actions;
@ -2958,12 +2982,14 @@ void ExpressionAnalyzer::collectUsedColumns()
for (auto it = analyzed_join.columns_added_by_join.begin(); it != analyzed_join.columns_added_by_join.end();)
{
if (required_joined_columns.count(it->name))
if (required_joined_columns.count(it->name_and_type.name))
++it;
else
analyzed_join.columns_added_by_join.erase(it++);
}
analyzed_join.createJoinedBlockActions(select_query, context);
/// Some columns from right join key may be used in query. This columns will be appended to block during join.
for (const auto & right_key_name : analyzed_join.key_names_right)
if (required_joined_columns.count(right_key_name))
@ -3222,9 +3248,6 @@ void ExpressionAnalyzer::collectJoinedColumns(NameSet & joined_columns)
else if (table_join.on_expression)
collectJoinedColumnsFromJoinOnExpr();
analyzed_join.createJoinedBlockActions(*node, context);
analyzed_join.joined_block_actions->execute(nested_result_sample);
/// When we use JOIN ON syntax, non_joined_columns are columns from join_key_names_left,
/// because even if a column from join_key_names_right, we may need to join it if it has different name.
/// If we use USING syntax, join_key_names_left and join_key_names_right are almost the same, but we need to use
@ -3237,14 +3260,20 @@ void ExpressionAnalyzer::collectJoinedColumns(NameSet & joined_columns)
const auto & col = nested_result_sample.safeGetByPosition(i);
if (not_joined_columns.end() == std::find(not_joined_columns.begin(), not_joined_columns.end(), col.name))
{
if (joined_columns.count(col.name)) /// Duplicate columns in the subquery for JOIN do not make sense.
auto name = col.name;
/// Change name for duplicate column form joined table.
if (source_columns.contains(name))
name = joined_table_name.getQualifiedNamePrefix() + name;
if (joined_columns.count(name)) /// Duplicate columns in the subquery for JOIN do not make sense.
continue;
joined_columns.insert(col.name);
joined_columns.insert(name);
bool make_nullable = settings.join_use_nulls && (table_join.kind == ASTTableJoin::Kind::Left ||
table_join.kind == ASTTableJoin::Kind::Full);
analyzed_join.columns_added_by_join.emplace_back(col.name, make_nullable ? makeNullable(col.type) : col.type);
auto type = make_nullable ? makeNullable(col.type) : col.type;
analyzed_join.columns_added_by_join.emplace_back(NameAndTypePair(name, std::move(type)), col.name);
}
}
}

View File

@ -52,7 +52,10 @@ struct SubqueryForSet
/// If set, build it from result.
SetPtr set;
JoinPtr join;
/// Apply this actions to joined block.
ExpressionActionsPtr joined_block_actions;
/// Rename column from joined block from this list.
NamesWithAliases joined_block_aliases;
/// If set, put the result into the table.
/// This is a temporary table for transferring to remote servers for distributed query processing.
@ -243,19 +246,33 @@ private:
ASTs key_asts_left;
ASTs key_asts_right;
/// This columns will be renamed to alias.column or database.table.column
NameSet duplicate_columns_from_joined_table;
/// All columns which can be read from right table.
struct JoinedColumn
{
/// Column will be joined to block.
NameAndTypePair name_and_type;
/// original column name from joined source.
String original_name;
JoinedColumn(const NameAndTypePair & name_and_type_, const String & original_name_)
: name_and_type(name_and_type_), original_name(original_name_) {}
};
using JoinedColumnsList = std::list<JoinedColumn>;
/// All columns which can be read from joined table.
NamesAndTypesList columns_from_joined_table;
/// Columns which will be used in query to the joined query.
Names required_columns_from_joined_table;
/// Columns which will be added to block, possible including some columns from right join key.
NamesAndTypesList columns_added_by_join;
JoinedColumnsList columns_added_by_join;
/// Such columns will be copied from left join keys during join.
NameSet columns_added_by_join_from_right_keys;
/// Actions which need to be calculated on joined block.
ExpressionActionsPtr joined_block_actions;
void createJoinedBlockActions(const ASTTablesInSelectQueryElement & join,
const Context & context);
void createJoinedBlockActions(const ASTSelectQuery * select_query, const Context & context);
NamesAndTypesList getColumnsAddedByJoin() const;
};
AnalyzedJoin analyzed_join;