some more ExpressionAnalyzer refactoring

This commit is contained in:
chertus 2019-09-04 19:20:02 +03:00
parent 904cfed046
commit b854c945ad
9 changed files with 61 additions and 70 deletions

View File

@ -16,4 +16,7 @@ using NameOrderedSet = std::set<std::string>;
using NameToNameMap = std::unordered_map<std::string, std::string>;
using NameToNameSetMap = std::unordered_map<std::string, NameSet>;
/// A pair of column names. AnalyzedJoin::getNamesWithAliases fills it as {original_name, name}:
/// `first` is the name requested from the joined subquery, `second` is the name the column is renamed to.
using NameWithAlias = std::pair<std::string, std::string>;
/// A list of NameWithAlias pairs.
using NamesWithAliases = std::vector<NameWithAlias>;
}

View File

@ -93,14 +93,14 @@ NameSet AnalyzedJoin::getOriginalColumnsSet() const
return out;
}
/// NOTE(review): this span comes from a rendered diff without +/- markers, so the pre-change and the
/// post-change variant of each modified line appear next to each other (presumably the old line first).
/// Both variants look up every name from `required_columns` in `original_names` and collect the matches:
/// getOriginalColumnsMap copies the found entry into a map as is, getNamesWithAliases appends the swapped pair.
std::unordered_map<String, String> AnalyzedJoin::getOriginalColumnsMap(const NameSet & required_columns) const
NamesWithAliases AnalyzedJoin::getNamesWithAliases(const NameSet & required_columns) const
{
std::unordered_map<String, String> out;
NamesWithAliases out;
for (const auto & column : required_columns)
{
auto it = original_names.find(column);
/// Names that are absent from `original_names` are skipped.
if (it != original_names.end())
out.insert(*it);
out.emplace_back(it->second, it->first); /// {original_name, name}
}
return out;
}
@ -129,15 +129,15 @@ Names AnalyzedJoin::requiredJoinedNames() const
return Names(required_columns_set.begin(), required_columns_set.end());
}
/// NOTE(review): this span comes from a rendered diff without +/- markers; lines of appendRequiredColumns
/// and of getRequiredColumns are interleaved, and the text alone does not show which loop belongs to which variant.
/// appendRequiredColumns adds names to the caller's `required_columns` set in place;
/// getRequiredColumns starts from `action_required_columns` and returns getNamesWithAliases() of the resulting set.
void AnalyzedJoin::appendRequiredColumns(const Block & sample, NameSet & required_columns) const
NamesWithAliases AnalyzedJoin::getRequiredColumns(const Block & sample, const Names & action_required_columns) const
{
for (auto & column : key_names_right)
/// Local set seeded with the names passed in `action_required_columns`.
NameSet required_columns(action_required_columns.begin(), action_required_columns.end());
for (auto & column : requiredJoinedNames())
/// Only names that the sample block does not already have are added.
if (!sample.has(column))
required_columns.insert(column);
for (auto & column : columns_added_by_join)
if (!sample.has(column.name))
required_columns.insert(column.name);
/// Converts the collected set into {original_name, name} pairs.
return getNamesWithAliases(required_columns);
}
void AnalyzedJoin::addJoinedColumn(const NameAndTypePair & joined_column)

View File

@ -64,12 +64,12 @@ public:
NameSet getQualifiedColumnsSet() const;
NameSet getOriginalColumnsSet() const;
std::unordered_map<String, String> getOriginalColumnsMap(const NameSet & required_columns) const;
NamesWithAliases getNamesWithAliases(const NameSet & required_columns) const;
NamesWithAliases getRequiredColumns(const Block & sample, const Names & action_columns) const;
void deduplicateAndQualifyColumnNames(const NameSet & left_table_columns, const String & right_table_prefix);
size_t rightKeyInclusion(const String & name) const;
void appendRequiredColumns(const Block & sample, NameSet & required_columns) const;
void addJoinedColumn(const NameAndTypePair & joined_column);
void addJoinedColumnsAndCorrectNullability(Block & sample_block) const;

View File

@ -20,9 +20,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
using NameWithAlias = std::pair<std::string, std::string>;
using NamesWithAliases = std::vector<NameWithAlias>;
class AnalyzedJoin;
class IPreparedFunction;

View File

@ -418,8 +418,7 @@ bool SelectQueryExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, b
if (!ast_join)
return false;
SubqueryForSet & subquery_for_set = getSubqueryForJoin(*ast_join);
syntax->analyzed_join->setHashJoin(subquery_for_set.join);
makeTableJoin(*ast_join);
initChain(chain, sourceColumns());
ExpressionActionsChain::Step & step = chain.steps.back();
@ -457,7 +456,7 @@ static JoinPtr tryGetStorageJoin(const ASTTablesInSelectQueryElement & join_elem
return {};
}
SubqueryForSet & SelectQueryExpressionAnalyzer::getSubqueryForJoin(const ASTTablesInSelectQueryElement & join_element)
void SelectQueryExpressionAnalyzer::makeTableJoin(const ASTTablesInSelectQueryElement & join_element)
{
/// Two JOINs are not supported with the same subquery, but different USINGs.
auto join_hash = join_element.getTreeHash();
@ -470,55 +469,45 @@ SubqueryForSet & SelectQueryExpressionAnalyzer::getSubqueryForJoin(const ASTTabl
subquery_for_set.join = tryGetStorageJoin(join_element, context);
if (!subquery_for_set.join)
makeHashJoin(join_element, subquery_for_set);
return subquery_for_set;
}
void SelectQueryExpressionAnalyzer::makeHashJoin(const ASTTablesInSelectQueryElement & join_element,
SubqueryForSet & subquery_for_set) const
{
{
/// Actions which need to be calculated on joined block.
ExpressionActionsPtr joined_block_actions = createJoinedBlockActions();
/** For GLOBAL JOINs (in the case, for example, of the push method for executing GLOBAL subqueries), the following occurs
* - in the addExternalStorage function, the JOIN (SELECT ...) subquery is replaced with JOIN _data1,
* in the subquery_for_set object this subquery is exposed as source and the temporary table _data1 as the `table`.
* - this function shows the expression JOIN _data1.
*/
if (!subquery_for_set.source)
{
ASTPtr table;
auto & table_to_join = join_element.table_expression->as<ASTTableExpression &>();
if (table_to_join.subquery)
table = table_to_join.subquery;
else if (table_to_join.table_function)
table = table_to_join.table_function;
else if (table_to_join.database_and_table_name)
table = table_to_join.database_and_table_name;
Names action_columns = joined_block_actions->getRequiredColumns();
NameSet required_columns(action_columns.begin(), action_columns.end());
analyzedJoin().appendRequiredColumns(joined_block_actions->getSampleBlock(), required_columns);
auto original_map = analyzedJoin().getOriginalColumnsMap(required_columns);
Names original_columns;
for (auto & pr : original_map)
original_columns.push_back(pr.second);
auto interpreter = interpretSubquery(table, context, subquery_depth, original_columns);
subquery_for_set.makeSource(interpreter, original_map);
}
makeSubqueryForJoin(join_element, joined_block_actions, subquery_for_set);
/// Test actions on sample block (early error detection)
Block sample_block = subquery_for_set.renamedSampleBlock();
joined_block_actions->execute(sample_block);
/// TODO You do not need to set this up when JOIN is only needed on remote servers.
subquery_for_set.join = analyzedJoin().makeHashJoin(sample_block, settings.size_limits_for_join);
subquery_for_set.joined_block_actions = joined_block_actions;
}
syntax->analyzed_join->setHashJoin(subquery_for_set.join);
}
void SelectQueryExpressionAnalyzer::makeSubqueryForJoin(const ASTTablesInSelectQueryElement & join_element,
const ExpressionActionsPtr & joined_block_actions,
SubqueryForSet & subquery_for_set) const
{
/** For GLOBAL JOINs (in the case, for example, of the push method for executing GLOBAL subqueries), the following occurs
* - in the addExternalStorage function, the JOIN (SELECT ...) subquery is replaced with JOIN _data1,
* in the subquery_for_set object this subquery is exposed as source and the temporary table _data1 as the `table`.
* - this function shows the expression JOIN _data1.
*/
NamesWithAliases required_columns_with_aliases =
analyzedJoin().getRequiredColumns(joined_block_actions->getSampleBlock(), joined_block_actions->getRequiredColumns());
Names original_columns;
for (auto & pr : required_columns_with_aliases)
original_columns.push_back(pr.first);
auto interpreter = interpretSubquery(join_element.table_expression, context, subquery_depth, original_columns);
subquery_for_set.makeSource(interpreter, std::move(required_columns_with_aliases));
}
ExpressionActionsPtr SelectQueryExpressionAnalyzer::createJoinedBlockActions() const

View File

@ -219,9 +219,10 @@ private:
*/
void tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_or_table_name);
SubqueryForSet & getSubqueryForJoin(const ASTTablesInSelectQueryElement & join_element);
void makeTableJoin(const ASTTablesInSelectQueryElement & join_element);
void makeSubqueryForJoin(const ASTTablesInSelectQueryElement & join_element, const ExpressionActionsPtr & joined_block_actions,
SubqueryForSet & subquery_for_set) const;
ExpressionActionsPtr createJoinedBlockActions() const;
void makeHashJoin(const ASTTablesInSelectQueryElement & join_element, SubqueryForSet & subquery_for_set) const;
const ASTSelectQuery * getAggregatingQuery() const;
};

View File

@ -6,26 +6,14 @@ namespace DB
{
/// NOTE(review): this span comes from a rendered diff without +/- markers; the variant taking a
/// name -> origin map and the variant taking NamesWithAliases are interleaved line by line.
/// Sets up `source` from the given interpreter and fills `sample_block` with the header of that source,
/// with columns renamed according to `joined_block_aliases`.
void SubqueryForSet::makeSource(std::shared_ptr<InterpreterSelectWithUnionQuery> & interpreter,
const std::unordered_map<String, String> & name_to_origin)
NamesWithAliases && joined_block_aliases_)
{
/// Takes over the passed list of name pairs.
joined_block_aliases = std::move(joined_block_aliases_);
/// The callable captures its own copy of the interpreter pointer and returns the input stream of interpreter->execute().
source = std::make_shared<LazyBlockInputStream>(interpreter->getSampleBlock(),
[interpreter]() mutable { return interpreter->execute().in; });
/// Map-based variant: each (key, value) entry is stored as the pair {value, key}.
for (const auto & names : name_to_origin)
joined_block_aliases.emplace_back(names.second, names.first);
sample_block = source->getHeader();
/// Inline renaming: a column found under the name `first` is erased and inserted again under the name `second`;
/// pairs whose `first` name is not present in the block are ignored.
for (const auto & name_with_alias : joined_block_aliases)
{
if (sample_block.has(name_with_alias.first))
{
auto pos = sample_block.getPositionByName(name_with_alias.first);
auto column = sample_block.getByPosition(pos);
sample_block.erase(pos);
column.name = name_with_alias.second;
sample_block.insert(std::move(column));
}
}
/// NOTE(review): renameColumns() presumably performs the same renaming as the loop above — its body is not visible here.
renameColumns(sample_block);
}
void SubqueryForSet::renameColumns(Block & block)

View File

@ -31,7 +31,7 @@ struct SubqueryForSet
StoragePtr table;
void makeSource(std::shared_ptr<InterpreterSelectWithUnionQuery> & interpreter,
const std::unordered_map<String, String> & name_to_origin);
NamesWithAliases && joined_block_aliases_);
Block renamedSampleBlock() const { return sample_block; }
void renameColumns(Block & block);

View File

@ -18,6 +18,19 @@ namespace DB
std::shared_ptr<InterpreterSelectWithUnionQuery> interpretSubquery(
const ASTPtr & table_expression, const Context & context, size_t subquery_depth, const Names & required_source_columns)
{
if (auto * expr = table_expression->as<ASTTableExpression>())
{
ASTPtr table;
if (expr->subquery)
table = expr->subquery;
else if (expr->table_function)
table = expr->table_function;
else if (expr->database_and_table_name)
table = expr->database_and_table_name;
return interpretSubquery(table, context, subquery_depth, required_source_columns);
}
/// Subquery or table name. The name of the table is similar to the subquery `SELECT * FROM t`.
const auto * subquery = table_expression->as<ASTSubquery>();
const auto * function = table_expression->as<ASTFunction>();