Refactoring: extract SubqueryForSet to own files

This commit is contained in:
chertus 2019-01-30 15:01:00 +03:00
parent b52bc2466d
commit d6450bc488
7 changed files with 111 additions and 68 deletions

View File

@ -120,17 +120,7 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
if (!done_with_join)
{
for (const auto & name_with_alias : subquery.joined_block_aliases)
{
if (block.has(name_with_alias.first))
{
auto pos = block.getPositionByName(name_with_alias.first);
auto column = block.getByPosition(pos);
block.erase(pos);
column.name = name_with_alias.second;
block.insert(std::move(column));
}
}
subquery.renameColumns(block);
if (subquery.joined_block_actions)
subquery.joined_block_actions->execute(block);

View File

@ -3,6 +3,7 @@
#include <Parsers/IAST.h>
#include <Interpreters/PreparedSets.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/SubqueryForSet.h>
namespace DB
@ -11,32 +12,6 @@ namespace DB
class Context;
class ASTFunction;
class Join;
using JoinPtr = std::shared_ptr<Join>;
/// Information on what to do when executing a subquery in the [GLOBAL] IN/JOIN section.
struct SubqueryForSet
{
/// The source is obtained using the InterpreterSelectQuery subquery.
BlockInputStreamPtr source;
/// If set, build it from result.
SetPtr set;
JoinPtr join;
/// Apply this actions to joined block.
ExpressionActionsPtr joined_block_actions;
/// Rename column from joined block from this list.
NamesWithAliases joined_block_aliases;
/// If set, put the result into the table.
/// This is a temporary table for transferring to remote servers for distributed query processing.
StoragePtr table;
};
/// ID of subquery -> what to do with it.
using SubqueriesForSets = std::unordered_map<String, SubqueryForSet>;
/// The case of an explicit enumeration of values.
SetPtr makeExplicitSet(
const ASTFunction * node, const Block & sample_block, bool create_ordered_set,

View File

@ -22,7 +22,6 @@
#include <Columns/IColumn.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/InJoinSubqueriesPreprocessor.h>
@ -39,7 +38,6 @@
#include <Storages/StorageMemory.h>
#include <Storages/StorageJoin.h>
#include <DataStreams/LazyBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <Dictionaries/IDictionary.h>
@ -569,9 +567,6 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty
if (!subquery_for_set.join)
{
JoinPtr join = std::make_shared<Join>(analyzedJoin().key_names_right, settings.join_use_nulls,
settings.size_limits_for_join, join_params.kind, join_params.strictness);
/** For GLOBAL JOINs (in the case, for example, of the push method for executing GLOBAL subqueries), the following occurs
* - in the addExternalStorage function, the JOIN (SELECT ...) subquery is replaced with JOIN _data1,
* in the subquery_for_set object this subquery is exposed as source and the temporary table _data1 as the `table`.
@ -588,39 +583,23 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty
else if (table_to_join.database_and_table_name)
table = table_to_join.database_and_table_name;
const JoinedColumnsList & columns_from_joined_table = analyzedJoin().columns_from_joined_table;
Names original_columns;
for (const auto & column : analyzedJoin().columns_from_joined_table)
for (const auto & column : columns_from_joined_table)
if (required_columns_from_joined_table.count(column.name_and_type.name))
original_columns.emplace_back(column.original_name);
auto interpreter = interpretSubquery(table, context, subquery_depth, original_columns);
subquery_for_set.source = std::make_shared<LazyBlockInputStream>(
interpreter->getSampleBlock(),
[interpreter]() mutable { return interpreter->execute().in; });
}
/// Alias duplicating columns as qualified.
for (const auto & column : analyzedJoin().columns_from_joined_table)
if (required_columns_from_joined_table.count(column.name_and_type.name))
subquery_for_set.joined_block_aliases.emplace_back(column.original_name, column.name_and_type.name);
auto sample_block = subquery_for_set.source->getHeader();
for (const auto & name_with_alias : subquery_for_set.joined_block_aliases)
{
if (sample_block.has(name_with_alias.first))
{
auto pos = sample_block.getPositionByName(name_with_alias.first);
auto column = sample_block.getByPosition(pos);
sample_block.erase(pos);
column.name = name_with_alias.second;
sample_block.insert(std::move(column));
}
subquery_for_set.makeSource(interpreter, columns_from_joined_table, required_columns_from_joined_table);
}
Block sample_block = subquery_for_set.renamedSampleBlock();
joined_block_actions->execute(sample_block);
/// TODO You do not need to set this up when JOIN is only needed on remote servers.
subquery_for_set.join = join;
subquery_for_set.join = std::make_shared<Join>(analyzedJoin().key_names_right, settings.join_use_nulls,
settings.size_limits_for_join, join_params.kind, join_params.strictness);
subquery_for_set.join->setSampleBlock(sample_block);
subquery_for_set.joined_block_actions = joined_block_actions;
}

View File

@ -379,8 +379,9 @@ InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpression
if (query_analyzer->appendJoin(chain, dry_run || !res.first_stage))
{
res.has_join = true;
res.before_join = chain.getLastActions();
if (!res.hasJoin())
throw Exception("No expected JOIN", ErrorCodes::LOGICAL_ERROR);
chain.addStep();
}
@ -547,7 +548,7 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt
if (expressions.first_stage)
{
if (expressions.has_join)
if (expressions.hasJoin())
{
const ASTTableJoin & join = static_cast<const ASTTableJoin &>(*query.join()->table_join);
if (join.kind == ASTTableJoin::Kind::Full || join.kind == ASTTableJoin::Kind::Right)

View File

@ -132,7 +132,7 @@ private:
struct AnalysisResult
{
bool has_join = false;
bool hasJoin() const { return before_join.get(); }
bool has_where = false;
bool need_aggregate = false;
bool has_having = false;

View File

@ -0,0 +1,49 @@
#include <Interpreters/SubqueryForSet.h>
#include <Interpreters/AnalyzedJoin.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <DataStreams/LazyBlockInputStream.h>
namespace DB
{
void SubqueryForSet::makeSource(std::shared_ptr<InterpreterSelectWithUnionQuery> & interpreter,
const std::list<JoinedColumn> & columns_from_joined_table,
const NameSet & required_columns_from_joined_table)
{
source = std::make_shared<LazyBlockInputStream>(interpreter->getSampleBlock(),
[interpreter]() mutable { return interpreter->execute().in; });
for (const auto & column : columns_from_joined_table)
if (required_columns_from_joined_table.count(column.name_and_type.name))
joined_block_aliases.emplace_back(column.original_name, column.name_and_type.name);
sample_block = source->getHeader();
for (const auto & name_with_alias : joined_block_aliases)
{
if (sample_block.has(name_with_alias.first))
{
auto pos = sample_block.getPositionByName(name_with_alias.first);
auto column = sample_block.getByPosition(pos);
sample_block.erase(pos);
column.name = name_with_alias.second;
sample_block.insert(std::move(column));
}
}
}
void SubqueryForSet::renameColumns(Block & block)
{
for (const auto & name_with_alias : joined_block_aliases)
{
if (block.has(name_with_alias.first))
{
auto pos = block.getPositionByName(name_with_alias.first);
auto column = block.getByPosition(pos);
block.erase(pos);
column.name = name_with_alias.second;
block.insert(std::move(column));
}
}
}
}

View File

@ -0,0 +1,49 @@
#pragma once
#include <Parsers/IAST.h>
#include <Interpreters/PreparedSets.h>
#include <Interpreters/ExpressionActions.h>
namespace DB
{
class Join;
using JoinPtr = std::shared_ptr<Join>;
class InterpreterSelectWithUnionQuery;
struct JoinedColumn;
/// Information on what to do when executing a subquery in the [GLOBAL] IN/JOIN section.
struct SubqueryForSet
{
/// The source is obtained using the InterpreterSelectQuery subquery.
BlockInputStreamPtr source;
/// If set, build it from result.
SetPtr set;
JoinPtr join;
/// Apply this actions to joined block.
ExpressionActionsPtr joined_block_actions;
/// If set, put the result into the table.
/// This is a temporary table for transferring to remote servers for distributed query processing.
StoragePtr table;
void makeSource(std::shared_ptr<InterpreterSelectWithUnionQuery> & interpreter,
const std::list<JoinedColumn> & columns_from_joined_table,
const NameSet & required_columns_from_joined_table);
Block renamedSampleBlock() const { return sample_block; }
void renameColumns(Block & block);
private:
NamesWithAliases joined_block_aliases; /// Rename column from joined block from this list.
Block sample_block; /// source->getHeader() + column renames
};
/// ID of subquery -> what to do with it.
using SubqueriesForSets = std::unordered_map<String, SubqueryForSet>;
}