This commit is contained in:
feng lv 2020-11-01 13:54:07 +00:00
parent 7bfd5d9e8d
commit 382fff9009
11 changed files with 238 additions and 365 deletions

View File

@ -0,0 +1,39 @@
#pragma once
#include <Interpreters/Context.h>
#include <Interpreters/IInterpreter.h>
#include <Interpreters/SelectQueryOptions.h>
#include <Parsers/IAST_fwd.h>
namespace DB
{
class IInterpreterUnionOrSelectQuery : public IInterpreter
{
public:
IInterpreterUnionOrSelectQuery(const ASTPtr & query_ptr_, const Context & context_, const SelectQueryOptions & options_)
: query_ptr(query_ptr_)
, context(std::make_shared<Context>(context_))
, options(options_)
, max_streams(context->getSettingsRef().max_threads)
{
}
virtual void buildQueryPlan(QueryPlan & query_plan) = 0;
virtual void ignoreWithTotals() = 0;
virtual ~IInterpreterUnionOrSelectQuery() override = default;
Block getSampleBlock() { return result_header; }
size_t getMaxStreams() const { return max_streams; }
protected:
ASTPtr query_ptr;
std::shared_ptr<Context> context;
Block result_header;
SelectQueryOptions options;
size_t max_streams = 1;
};
}

View File

@ -119,7 +119,6 @@ String InterpreterSelectQuery::generateFilterActions(
ParserExpression expr_parser; ParserExpression expr_parser;
expr_list->children.push_back(parseQuery(expr_parser, column_str, 0, context->getSettingsRef().max_parser_depth)); expr_list->children.push_back(parseQuery(expr_parser, column_str, 0, context->getSettingsRef().max_parser_depth));
} }
select_ast->setExpression(ASTSelectQuery::Expression::TABLES, std::make_shared<ASTTablesInSelectQuery>()); select_ast->setExpression(ASTSelectQuery::Expression::TABLES, std::make_shared<ASTTablesInSelectQuery>());
auto tables = select_ast->tables(); auto tables = select_ast->tables();
auto tables_elem = std::make_shared<ASTTablesInSelectQueryElement>(); auto tables_elem = std::make_shared<ASTTablesInSelectQueryElement>();
@ -215,10 +214,8 @@ InterpreterSelectQuery::InterpreterSelectQuery(
const SelectQueryOptions & options_, const SelectQueryOptions & options_,
const Names & required_result_column_names, const Names & required_result_column_names,
const StorageMetadataPtr & metadata_snapshot_) const StorageMetadataPtr & metadata_snapshot_)
: options(options_)
/// NOTE: the query almost always should be cloned because it will be modified during analysis. /// NOTE: the query almost always should be cloned because it will be modified during analysis.
, query_ptr(options.modify_inplace ? query_ptr_ : query_ptr_->clone()) : IInterpreterUnionOrSelectQuery(options_.modify_inplace ? query_ptr_ : query_ptr_->clone(), context_, options_)
, context(std::make_shared<Context>(context_))
, storage(storage_) , storage(storage_)
, input(input_) , input(input_)
, input_pipe(std::move(input_pipe_)) , input_pipe(std::move(input_pipe_))
@ -464,12 +461,6 @@ InterpreterSelectQuery::InterpreterSelectQuery(
sanitizeBlock(result_header, true); sanitizeBlock(result_header, true);
} }
Block InterpreterSelectQuery::getSampleBlock()
{
return result_header;
}
void InterpreterSelectQuery::buildQueryPlan(QueryPlan & query_plan) void InterpreterSelectQuery::buildQueryPlan(QueryPlan & query_plan)
{ {
executeImpl(query_plan, input, std::move(input_pipe)); executeImpl(query_plan, input, std::move(input_pipe));

View File

@ -3,16 +3,15 @@
#include <memory> #include <memory>
#include <Core/QueryProcessingStage.h> #include <Core/QueryProcessingStage.h>
#include <Parsers/ASTSelectQuery.h>
#include <DataStreams/IBlockStream_fwd.h> #include <DataStreams/IBlockStream_fwd.h>
#include <Interpreters/ExpressionActions.h> #include <Interpreters/ExpressionActions.h>
#include <Interpreters/ExpressionAnalyzer.h> #include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/IInterpreter.h> #include <Interpreters/IInterpreterUnionOrSelectQuery.h>
#include <Interpreters/SelectQueryOptions.h> #include <Interpreters/StorageID.h>
#include <Parsers/ASTSelectQuery.h>
#include <Storages/ReadInOrderOptimizer.h>
#include <Storages/SelectQueryInfo.h> #include <Storages/SelectQueryInfo.h>
#include <Storages/TableLockHolder.h> #include <Storages/TableLockHolder.h>
#include <Storages/ReadInOrderOptimizer.h>
#include <Interpreters/StorageID.h>
#include <Columns/FilterDescription.h> #include <Columns/FilterDescription.h>
@ -32,7 +31,7 @@ using TreeRewriterResultPtr = std::shared_ptr<const TreeRewriterResult>;
/** Interprets the SELECT query. Returns the stream of blocks with the results of the query before `to_stage` stage. /** Interprets the SELECT query. Returns the stream of blocks with the results of the query before `to_stage` stage.
*/ */
class InterpreterSelectQuery : public IInterpreter class InterpreterSelectQuery : public IInterpreterUnionOrSelectQuery
{ {
public: public:
/** /**
@ -79,18 +78,12 @@ public:
BlockIO execute() override; BlockIO execute() override;
/// Builds QueryPlan for current query. /// Builds QueryPlan for current query.
void buildQueryPlan(QueryPlan & query_plan); virtual void buildQueryPlan(QueryPlan & query_plan) override;
bool ignoreLimits() const override { return options.ignore_limits; } bool ignoreLimits() const override { return options.ignore_limits; }
bool ignoreQuota() const override { return options.ignore_quota; } bool ignoreQuota() const override { return options.ignore_quota; }
Block getSampleBlock(); virtual void ignoreWithTotals() override;
void ignoreWithTotals();
ASTPtr getQuery() const { return query_ptr; }
size_t getMaxStreams() const { return max_streams; }
const SelectQueryInfo & getQueryInfo() const { return query_info; } const SelectQueryInfo & getQueryInfo() const { return query_info; }
@ -158,9 +151,6 @@ private:
*/ */
void initSettings(); void initSettings();
SelectQueryOptions options;
ASTPtr query_ptr;
std::shared_ptr<Context> context;
TreeRewriterResultPtr syntax_analyzer_result; TreeRewriterResultPtr syntax_analyzer_result;
std::unique_ptr<SelectQueryExpressionAnalyzer> query_analyzer; std::unique_ptr<SelectQueryExpressionAnalyzer> query_analyzer;
SelectQueryInfo query_info; SelectQueryInfo query_info;
@ -172,15 +162,10 @@ private:
QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns; QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns;
/// How many streams we ask for storage to produce, and in how many threads we will do further processing.
size_t max_streams = 1;
/// List of columns to read to execute the query. /// List of columns to read to execute the query.
Names required_columns; Names required_columns;
/// Structure of query source (table, subquery, etc). /// Structure of query source (table, subquery, etc).
Block source_header; Block source_header;
/// Structure of query result.
Block result_header;
/// The subquery interpreter, if the subquery /// The subquery interpreter, if the subquery
std::unique_ptr<InterpreterSelectWithUnionQuery> interpreter_subquery; std::unique_ptr<InterpreterSelectWithUnionQuery> interpreter_subquery;

View File

@ -10,8 +10,6 @@
#include <Processors/QueryPlan/UnionStep.h> #include <Processors/QueryPlan/UnionStep.h>
#include <Processors/QueryPlan/DistinctStep.h> #include <Processors/QueryPlan/DistinctStep.h>
#include <Interpreters/InDepthNodeVisitor.h>
namespace DB namespace DB
{ {
@ -22,206 +20,40 @@ namespace ErrorCodes
extern const int EXPECTED_ALL_OR_DISTINCT; extern const int EXPECTED_ALL_OR_DISTINCT;
} }
/// Visitor payload that normalizes the union modes of every ASTSelectWithUnionQuery it is given:
///  1. each Unspecified mode is replaced by the default mode (or an exception is thrown if there is none);
///  2. every mode in front of the last DISTINCT is rewritten to ALL.
struct CustomizeUnionModeRewrite
{
    using TypeToVisit = ASTSelectWithUnionQuery;

    const UnionMode & default_union_mode;

    void visit(ASTSelectWithUnionQuery & union_select, ASTPtr &)
    {
        const size_t select_count = union_select.list_of_selects->children.size();

        if (select_count == 0)
            throw Exception("Logical error: no children in ASTSelectWithUnionQuery", ErrorCodes::LOGICAL_ERROR);

        /// A single SELECT has no union modes to rewrite.
        if (select_count == 1)
            return;

        auto & modes = union_select.union_modes;

        for (auto & current_mode : modes)
        {
            if (current_mode != ASTSelectWithUnionQuery::Mode::Unspecified)
                continue;

            if (default_union_mode == UnionMode::ALL)
                current_mode = ASTSelectWithUnionQuery::Mode::ALL;
            else if (default_union_mode == UnionMode::DISTINCT)
                current_mode = ASTSelectWithUnionQuery::Mode::DISTINCT;
            else
                throw Exception(
                    "Expected ALL or DISTINCT in SelectWithUnion query, because setting (union_default_mode) is empty",
                    DB::ErrorCodes::EXPECTED_ALL_OR_DISTINCT);
        }

        /// Optimization: the last UNION DISTINCT already deduplicates everything before it,
        /// so all earlier modes can become UNION ALL. At most one DISTINCT remains in the sequence.
        auto pos = modes.rbegin();
        while (pos != modes.rend() && *pos != ASTSelectWithUnionQuery::Mode::DISTINCT)
            ++pos;

        if (pos != modes.rend())
        {
            /// Skip the DISTINCT itself, then overwrite everything that precedes it.
            for (++pos; pos != modes.rend(); ++pos)
                *pos = ASTSelectWithUnionQuery::Mode::ALL;
        }
    }
};
using CustomizeUnionQueryOptimizeVisitor = InDepthNodeVisitor<OneTypeMatcher<CustomizeUnionModeRewrite>, true>;
/// Recursively builds the query plan for this node of the nested-interpreter tree.
///  - LEAF node: delegates to the wrapped interpreter (throws LOGICAL_ERROR if it is not set).
///  - Node with num_distinct_union == 0: unites the plans of all children with one UnionStep.
///  - Otherwise: unites the first `num_distinct_union` children, puts a DistinctStep on top,
///    and, if more children remain, unites that result with the remaining children.
/// `header` is passed to every UnionStep, and its column names are the DISTINCT columns.
QueryPlan NestedInterpreter::buildQueryPlan(const std::shared_ptr<Context> & context, const Block & header)
{
QueryPlan res;
if (type == Type::LEAF)
{
if (interpreter)
{
interpreter->buildQueryPlan(res);
return res;
}
else
throw Exception("Interpreter is not initialized.", ErrorCodes::LOGICAL_ERROR);
}
/// No DISTINCT in this chain: a plain union of all children.
if (num_distinct_union == 0)
{
std::vector<std::unique_ptr<QueryPlan>> plans(children.size());
DataStreams data_streams(children.size());
for (size_t i = 0; i < children.size(); ++i)
{
plans[i] = std::make_unique<QueryPlan>(children[i]->buildQueryPlan(context, header));
data_streams[i] = plans[i]->getCurrentDataStream();
}
auto max_threads = context->getSettingsRef().max_threads;
auto union_step = std::make_unique<UnionStep>(std::move(data_streams), header, max_threads);
res.unitePlans(std::move(union_step), std::move(plans));
return res;
}
/// The first `num_distinct_union` children need a DISTINCT transform after they are united.
else
{
QueryPlan distinct_query_plan;
std::vector<std::unique_ptr<QueryPlan>> plans(num_distinct_union);
DataStreams data_streams(num_distinct_union);
for (size_t i = 0; i < num_distinct_union; ++i)
{
plans[i] = std::make_unique<QueryPlan>(children[i]->buildQueryPlan(context, header));
data_streams[i] = plans[i]->getCurrentDataStream();
}
auto max_threads = context->getSettingsRef().max_threads;
auto union_step = std::make_unique<UnionStep>(std::move(data_streams), header, max_threads);
distinct_query_plan.unitePlans(std::move(union_step), std::move(plans));
/// Add the DISTINCT transform on top of the united plan.
const Settings & settings = context->getSettingsRef();
SizeLimits limits(settings.max_rows_in_distinct, settings.max_bytes_in_distinct, settings.distinct_overflow_mode);
auto distinct_step
= std::make_unique<DistinctStep>(distinct_query_plan.getCurrentDataStream(), limits, 0, header.getNames(), false);
distinct_query_plan.addStep(std::move(distinct_step));
/// Every child is covered by the DISTINCT, so there is nothing left to unite.
if (num_distinct_union == children.size())
{
return distinct_query_plan;
}
/// Build the final UNION step: slot 0 is the DISTINCT plan, slots 1.. are the remaining children.
std::vector<std::unique_ptr<QueryPlan>> final_plans(children.size() - num_distinct_union + 1);
DataStreams final_data_streams(children.size() - num_distinct_union + 1);
final_plans[0] = std::make_unique<QueryPlan>(std::move(distinct_query_plan));
final_data_streams[0] = final_plans[0]->getCurrentDataStream();
for (size_t i = 1; i < children.size() - num_distinct_union + 1; ++i)
{
/// Slot i corresponds to child (num_distinct_union + i - 1), i.e. the first child after the DISTINCT group.
final_plans[i] = std::make_unique<QueryPlan>(children[num_distinct_union + i - 1]->buildQueryPlan(context, header));
final_data_streams[i] = final_plans[i]->getCurrentDataStream();
}
auto final_union_step = std::make_unique<UnionStep>(std::move(final_data_streams), header, max_threads);
res.unitePlans(std::move(final_union_step), std::move(final_plans));
return res;
}
}
void NestedInterpreter::ignoreWithTotals()
{
if (type == Type::LEAF)
{
if (interpreter)
interpreter->ignoreWithTotals();
else
{
throw Exception("Interpreter is not initialized.", ErrorCodes::LOGICAL_ERROR);
}
return;
}
for (auto & child : children)
{
child->ignoreWithTotals();
}
}
InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery( InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
const ASTPtr & query_ptr_, const ASTPtr & query_ptr_, const Context & context_, const SelectQueryOptions & options_, const Names & required_result_column_names)
const Context & context_, : IInterpreterUnionOrSelectQuery(query_ptr_, context_, options_)
const SelectQueryOptions & options_,
const Names & required_result_column_names)
: options(options_),
query_ptr(query_ptr_),
context(std::make_shared<Context>(context_)),
max_streams(context->getSettingsRef().max_threads)
{ {
std::cout << "\n\n In InterpreterSelectWithUnionQuery\n\n";
const auto & ast = query_ptr->as<ASTSelectWithUnionQuery &>(); const auto & ast = query_ptr->as<ASTSelectWithUnionQuery &>();
std::cout << "\n\n before throw\n\n";
if (!ast.flatten_nodes_list)
std::cout << "\n\n flatten_nodes_list is null\n\n";
size_t total_num_selects = ast.flatten_nodes_list->children.size();
std::cout << "\n\n after get num throw\n\n";
if (!total_num_selects)
throw Exception("Logical error: no children in ASTSelectWithUnionQuery", ErrorCodes::LOGICAL_ERROR);
std::cout << "\n\n after throw\n\n";
/// Rewrite ast with settings.union_default_mode size_t num_children = ast.list_of_selects->children.size();
const auto & settings = context->getSettingsRef(); if (!num_children)
CustomizeUnionQueryOptimizeVisitor::Data data_union_mode{settings.union_default_mode}; throw Exception("Logical error: no children in ASTSelectWithUnionQuery", ErrorCodes::LOGICAL_ERROR);
CustomizeUnionQueryOptimizeVisitor(data_union_mode).visit(query_ptr);
/// We first build nested interpreters for each select query, then using this nested interpreters to build Tree Structured nested interpreter. /// We first build nested interpreters for each select query, then using this nested interpreters to build Tree Structured nested interpreter.
/// Note that we pass 'required_result_column_names' to first SELECT. /// Note that we pass 'required_result_column_names' to first SELECT.
/// And for the rest, we pass names at the corresponding positions of 'required_result_column_names' in the result of first SELECT, /// And for the rest, we pass names at the corresponding positions of 'required_result_column_names' in the result of first SELECT,
/// because names could be different. /// because names could be different.
std::vector<std::shared_ptr<InterpreterSelectQuery>> interpreters; nested_interpreters.reserve(num_children);
interpreters.reserve(total_num_selects); std::vector<Names> required_result_column_names_for_other_selects(num_children);
std::vector<Names> required_result_column_names_for_other_selects(total_num_selects);
if (!required_result_column_names.empty() && total_num_selects > 1) if (!required_result_column_names.empty() && num_children > 1)
{ {
/// Result header if there are no filtering by 'required_result_column_names'. /// Result header if there are no filtering by 'required_result_column_names'.
/// We use it to determine positions of 'required_result_column_names' in SELECT clause. /// We use it to determine positions of 'required_result_column_names' in SELECT clause.
Block full_result_header Block full_result_header = getCurrentChildResultHeader(ast.list_of_selects->children.at(0), required_result_column_names);
= InterpreterSelectQuery(ast.flatten_nodes_list->children.at(0), *context, options.copy().analyze().noModify())
.getSampleBlock();
std::vector<size_t> positions_of_required_result_columns(required_result_column_names.size()); std::vector<size_t> positions_of_required_result_columns(required_result_column_names.size());
for (size_t required_result_num = 0, size = required_result_column_names.size(); required_result_num < size; ++required_result_num) for (size_t required_result_num = 0, size = required_result_column_names.size(); required_result_num < size; ++required_result_num)
positions_of_required_result_columns[required_result_num] = full_result_header.getPositionByName(required_result_column_names[required_result_num]); positions_of_required_result_columns[required_result_num] = full_result_header.getPositionByName(required_result_column_names[required_result_num]);
for (size_t query_num = 1; query_num < total_num_selects; ++query_num) for (size_t query_num = 1; query_num < num_children; ++query_num)
{ {
Block full_result_header_for_current_select Block full_result_header_for_current_select
= InterpreterSelectQuery(ast.flatten_nodes_list->children.at(query_num), *context, options.copy().analyze().noModify()) = getCurrentChildResultHeader(ast.list_of_selects->children.at(query_num), required_result_column_names);
.getSampleBlock();
if (full_result_header_for_current_select.columns() != full_result_header.columns()) if (full_result_header_for_current_select.columns() != full_result_header.columns())
throw Exception("Different number of columns in UNION ALL elements:\n" throw Exception("Different number of columns in UNION ALL elements:\n"
@ -236,26 +68,26 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
} }
} }
for (size_t query_num = 0; query_num < total_num_selects; ++query_num) for (size_t query_num = 0; query_num < num_children; ++query_num)
{ {
const Names & current_required_result_column_names const Names & current_required_result_column_names
= query_num == 0 ? required_result_column_names : required_result_column_names_for_other_selects[query_num]; = query_num == 0 ? required_result_column_names : required_result_column_names_for_other_selects[query_num];
interpreters.emplace_back(std::make_shared<InterpreterSelectQuery>( nested_interpreters.emplace_back(
ast.flatten_nodes_list->children.at(query_num), *context, options, current_required_result_column_names)); buildCurrentChildInterpreter(ast.list_of_selects->children.at(query_num), current_required_result_column_names));
} }
/// Determine structure of the result. /// Determine structure of the result.
if (total_num_selects == 1) if (num_children == 1)
{ {
result_header = interpreters.front()->getSampleBlock(); result_header = nested_interpreters.front()->getSampleBlock();
} }
else else
{ {
Blocks headers(total_num_selects); Blocks headers(num_children);
for (size_t query_num = 0; query_num < total_num_selects; ++query_num) for (size_t query_num = 0; query_num < num_children; ++query_num)
headers[query_num] = interpreters[query_num]->getSampleBlock(); headers[query_num] = nested_interpreters[query_num]->getSampleBlock();
result_header = getCommonHeaderForUnion(headers); result_header = getCommonHeaderForUnion(headers);
} }
@ -263,7 +95,7 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
/// InterpreterSelectWithUnionQuery ignores limits if all nested interpreters ignore limits. /// InterpreterSelectWithUnionQuery ignores limits if all nested interpreters ignore limits.
bool all_nested_ignore_limits = true; bool all_nested_ignore_limits = true;
bool all_nested_ignore_quota = true; bool all_nested_ignore_quota = true;
for (auto & interpreter : interpreters) for (auto & interpreter : nested_interpreters)
{ {
if (!interpreter->ignoreLimits()) if (!interpreter->ignoreLimits())
all_nested_ignore_limits = false; all_nested_ignore_limits = false;
@ -273,44 +105,6 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
options.ignore_limits |= all_nested_ignore_limits; options.ignore_limits |= all_nested_ignore_limits;
options.ignore_quota |= all_nested_ignore_quota; options.ignore_quota |= all_nested_ignore_quota;
int index = 0;
buildNestedTreeInterpreter(query_ptr, nested_interpreter, interpreters, index);
}
/// We build a Tree Structured nested interpreters to build QueryPlan later
/// The structure of build nested interpreters is same as AST Tree
void InterpreterSelectWithUnionQuery::buildNestedTreeInterpreter(
const ASTPtr & ast_ptr,
std::shared_ptr<NestedInterpreter> nested_interpreter_,
std::vector<std::shared_ptr<InterpreterSelectQuery>> & interpreters,
int & index)
{
std::cout << "\n\n in build \n\n";
if (auto inner_union = ast_ptr->as<ASTSelectWithUnionQuery>())
{
auto internal_intepreter = std::make_shared<NestedInterpreter>();
const auto & union_modes = inner_union->union_modes;
for (auto rit = union_modes.rbegin(); rit != union_modes.rend(); ++rit)
{
if (*rit == ASTSelectWithUnionQuery::Mode::DISTINCT)
{
internal_intepreter->num_distinct_union = union_modes.rend() - rit + 1;
break;
}
}
nested_interpreter_->children.push_back(internal_intepreter);
for (auto & child : inner_union->list_of_selects->children)
buildNestedTreeInterpreter(child, internal_intepreter, interpreters, index);
return;
}
auto leaf_interpreter = std::make_shared<NestedInterpreter>();
leaf_interpreter->type = NestedInterpreter::Type::LEAF;
leaf_interpreter->interpreter = interpreters[index++];
nested_interpreter_->children.push_back(leaf_interpreter);
} }
Block InterpreterSelectWithUnionQuery::getCommonHeaderForUnion(const Blocks & headers) Block InterpreterSelectWithUnionQuery::getCommonHeaderForUnion(const Blocks & headers)
@ -343,33 +137,167 @@ Block InterpreterSelectWithUnionQuery::getCommonHeaderForUnion(const Blocks & he
return common_header; return common_header;
} }
/// Returns the sample block of one child of the UNION list.
/// The child is interpreted with a copy of the options that has analyze() and noModify() applied.
/// A nested ASTSelectWithUnionQuery is interpreted by InterpreterSelectWithUnionQuery and receives
/// `required_result_column_names`; any other child is interpreted by InterpreterSelectQuery
/// without the required column names.
Block InterpreterSelectWithUnionQuery::getCurrentChildResultHeader(const ASTPtr & ast_ptr_, const Names & required_result_column_names)
{
    const bool is_nested_union = ast_ptr_->as<ASTSelectWithUnionQuery>() != nullptr;

    if (!is_nested_union)
        return InterpreterSelectQuery(ast_ptr_, *context, options.copy().analyze().noModify()).getSampleBlock();

    return InterpreterSelectWithUnionQuery(ast_ptr_, *context, options.copy().analyze().noModify(), required_result_column_names)
        .getSampleBlock();
}
/// Creates the interpreter for one child of the UNION list: InterpreterSelectWithUnionQuery
/// for a nested ASTSelectWithUnionQuery, InterpreterSelectQuery for anything else.
/// Both receive this interpreter's context and options and the given required column names.
std::unique_ptr<IInterpreterUnionOrSelectQuery>
InterpreterSelectWithUnionQuery::buildCurrentChildInterpreter(const ASTPtr & ast_ptr_, const Names & current_required_result_column_names)
{
    const bool is_nested_union = ast_ptr_->as<ASTSelectWithUnionQuery>() != nullptr;

    if (is_nested_union)
        return std::make_unique<InterpreterSelectWithUnionQuery>(ast_ptr_, *context, options, current_required_result_column_names);

    return std::make_unique<InterpreterSelectQuery>(ast_ptr_, *context, options, current_required_result_column_names);
}
InterpreterSelectWithUnionQuery::~InterpreterSelectWithUnionQuery() = default; InterpreterSelectWithUnionQuery::~InterpreterSelectWithUnionQuery() = default;
Block InterpreterSelectWithUnionQuery::getSampleBlock(const ASTPtr & query_ptr_, const Context & context_)
Block InterpreterSelectWithUnionQuery::getSampleBlock()
{ {
return result_header; auto & cache = context_.getSampleBlockCache();
}
Block InterpreterSelectWithUnionQuery::getSampleBlock(
const ASTPtr & query_ptr,
const Context & context)
{
auto & cache = context.getSampleBlockCache();
/// Using query string because query_ptr changes for every internal SELECT /// Using query string because query_ptr changes for every internal SELECT
auto key = queryToString(query_ptr); auto key = queryToString(query_ptr_);
if (cache.find(key) != cache.end()) if (cache.find(key) != cache.end())
{ {
return cache[key]; return cache[key];
} }
return cache[key] = InterpreterSelectWithUnionQuery(query_ptr, context, SelectQueryOptions().analyze()).getSampleBlock(); return cache[key] = InterpreterSelectWithUnionQuery(query_ptr_, context_, SelectQueryOptions().analyze()).getSampleBlock();
}
/// Normalizes `union_modes` of the query in place and returns how many leading SELECTs
/// must go through a DISTINCT transform after they are united (0 if none).
///
///  1. Every Unspecified mode is replaced according to the `union_default_mode` setting;
///     EXPECTED_ALL_OR_DISTINCT is thrown if the setting is neither ALL nor DISTINCT.
///  2. All modes in front of the last DISTINCT are rewritten to ALL.
///
/// Throws LOGICAL_ERROR if the query has no SELECTs.
size_t InterpreterSelectWithUnionQuery::optimizeUnionList()
{
    /// Declared as size_t to match the return type: it receives an iterator difference,
    /// which must not be squeezed through `int` on the way.
    size_t union_distinct_num = 0;

    auto union_default_mode = context->getSettingsRef().union_default_mode;
    auto & ast = query_ptr->as<ASTSelectWithUnionQuery &>();

    const size_t num_selects = ast.list_of_selects->children.size();

    if (!num_selects)
        throw Exception("Logical error: no children in ASTSelectWithUnionQuery", ErrorCodes::LOGICAL_ERROR);

    if (num_selects > 1)
    {
        for (auto & mode : ast.union_modes)
        {
            if (mode == ASTSelectWithUnionQuery::Mode::Unspecified)
            {
                if (union_default_mode == UnionMode::ALL)
                    mode = ASTSelectWithUnionQuery::Mode::ALL;
                else if (union_default_mode == UnionMode::DISTINCT)
                    mode = ASTSelectWithUnionQuery::Mode::DISTINCT;
                else
                    throw Exception(
                        "Expected ALL or DISTINCT in SelectWithUnion query, because setting (union_default_mode) is empty",
                        DB::ErrorCodes::EXPECTED_ALL_OR_DISTINCT);
            }
        }

        /// Optimize: if there is UNION DISTINCT, all previous UNION DISTINCT can be rewritten to UNION ALL.
        /// Therefore we have at most one UNION DISTINCT in a sequence.
        for (auto rit = ast.union_modes.rbegin(); rit != ast.union_modes.rend(); ++rit)
        {
            if (*rit == ASTSelectWithUnionQuery::Mode::DISTINCT)
            {
                /// Number of streams that need a DISTINCT transform after unite:
                /// (rend - rit) modes up to and including this one join one more SELECT than that.
                union_distinct_num = static_cast<size_t>(ast.union_modes.rend() - rit) + 1;

                /// `rit` is advanced past the DISTINCT here; that is safe only because of the `break` below.
                for (auto mode_to_modify = ++rit; mode_to_modify != ast.union_modes.rend(); ++mode_to_modify)
                    *mode_to_modify = ASTSelectWithUnionQuery::Mode::ALL;
                break;
            }
        }
    }

    return union_distinct_num;
}
/// Builds the query plan for the UNION list held by this interpreter.
/// optimizeUnionList() supplies `num_distinct_union`, the number of leading nested interpreters
/// whose united output must go through a DISTINCT step:
///  - a single nested interpreter builds its plan directly into `query_plan`;
///  - num_distinct_union == 0: all nested plans are united by one UnionStep;
///  - otherwise the first `num_distinct_union` plans are united and followed by a DistinctStep,
///    and, if any nested interpreters remain, that result is united with their plans.
void InterpreterSelectWithUnionQuery::buildQueryPlan(QueryPlan & query_plan) void InterpreterSelectWithUnionQuery::buildQueryPlan(QueryPlan & query_plan)
{ {
query_plan = nested_interpreter->buildQueryPlan(context, result_header); auto num_distinct_union = optimizeUnionList();
size_t num_plans = nested_interpreters.size();
/// Skip union for single interpreter.
if (num_plans == 1)
{
nested_interpreters.front()->buildQueryPlan(query_plan);
return;
}
/// None of the UNION streams in the chain needs a DISTINCT transform.
if (num_distinct_union == 0)
{
std::vector<std::unique_ptr<QueryPlan>> plans(num_plans);
DataStreams data_streams(num_plans);
for (size_t i = 0; i < num_plans; ++i)
{
plans[i] = std::make_unique<QueryPlan>();
nested_interpreters[i]->buildQueryPlan(*plans[i]);
data_streams[i] = plans[i]->getCurrentDataStream();
}
auto max_threads = context->getSettingsRef().max_threads;
auto union_step = std::make_unique<UnionStep>(std::move(data_streams), result_header, max_threads);
query_plan.unitePlans(std::move(union_step), std::move(plans));
}
/// The first `num_distinct_union` UNION streams need a DISTINCT transform after they are united.
else
{
QueryPlan distinct_query_plan;
std::vector<std::unique_ptr<QueryPlan>> plans(num_distinct_union);
DataStreams data_streams(num_distinct_union);
for (size_t i = 0; i < num_distinct_union; ++i)
{
plans[i] = std::make_unique<QueryPlan>();
nested_interpreters[i]->buildQueryPlan(*plans[i]);
data_streams[i] = plans[i]->getCurrentDataStream();
}
auto max_threads = context->getSettingsRef().max_threads;
auto union_step = std::make_unique<UnionStep>(std::move(data_streams), result_header, max_threads);
distinct_query_plan.unitePlans(std::move(union_step), std::move(plans));
/// Add the DISTINCT transform; the distinct columns are all columns of the result header.
const Settings & settings = context->getSettingsRef();
SizeLimits limits(settings.max_rows_in_distinct, settings.max_bytes_in_distinct, settings.distinct_overflow_mode);
auto distinct_step
= std::make_unique<DistinctStep>(distinct_query_plan.getCurrentDataStream(), limits, 0, result_header.getNames(), false);
distinct_query_plan.addStep(std::move(distinct_step));
/// No other UNION streams after the DISTINCT stream.
if (num_plans == num_distinct_union)
{
query_plan = std::move(distinct_query_plan);
return;
}
/// Build the final UNION step: slot 0 is the DISTINCT plan, slots 1.. are the remaining nested plans.
std::vector<std::unique_ptr<QueryPlan>> final_plans(num_plans - num_distinct_union + 1);
DataStreams final_data_streams(num_plans - num_distinct_union + 1);
final_plans[0] = std::make_unique<QueryPlan>(std::move(distinct_query_plan));
final_data_streams[0] = final_plans[0]->getCurrentDataStream();
for (size_t i = 1; i < num_plans - num_distinct_union + 1; ++i)
{
final_plans[i] = std::make_unique<QueryPlan>();
/// Slot i corresponds to nested interpreter (num_distinct_union + i - 1), the first one after the DISTINCT group.
nested_interpreters[num_distinct_union + i - 1]->buildQueryPlan(*final_plans[i]);
final_data_streams[i] = final_plans[i]->getCurrentDataStream();
}
auto final_union_step = std::make_unique<UnionStep>(std::move(final_data_streams), result_header, max_threads);
query_plan.unitePlans(std::move(final_union_step), std::move(final_plans));
}
} }
BlockIO InterpreterSelectWithUnionQuery::execute() BlockIO InterpreterSelectWithUnionQuery::execute()
@ -390,7 +318,8 @@ BlockIO InterpreterSelectWithUnionQuery::execute()
void InterpreterSelectWithUnionQuery::ignoreWithTotals() void InterpreterSelectWithUnionQuery::ignoreWithTotals()
{ {
nested_interpreter->ignoreWithTotals(); for (auto & interpreter : nested_interpreters)
interpreter->ignoreWithTotals();
} }
} }

View File

@ -1,9 +1,7 @@
#pragma once #pragma once
#include <Core/QueryProcessingStage.h> #include <Core/QueryProcessingStage.h>
#include <Interpreters/IInterpreter.h> #include <Interpreters/IInterpreterUnionOrSelectQuery.h>
#include <Interpreters/SelectQueryOptions.h>
#include <Parsers/IAST_fwd.h>
#include <Parsers/ASTSelectWithUnionQuery.h> #include <Parsers/ASTSelectWithUnionQuery.h>
namespace DB namespace DB
@ -13,27 +11,13 @@ class Context;
class InterpreterSelectQuery; class InterpreterSelectQuery;
class QueryPlan; class QueryPlan;
/// Node of a tree of interpreters.
/// A LEAF node uses `interpreter`; an INTERNAL node uses `children` and `num_distinct_union`.
struct NestedInterpreter
{
    enum class Type
    {
        LEAF,
        INTERNAL
    };

    ~NestedInterpreter() = default;

    /// Nodes are INTERNAL unless explicitly marked as LEAF.
    Type type = Type::INTERNAL;

    /// Child nodes.
    std::vector<std::shared_ptr<NestedInterpreter>> children;

    /// Interpreter of the SELECT query.
    std::shared_ptr<InterpreterSelectQuery> interpreter;

    /// How many leading children must go through a DISTINCT transform after being united (0 = none).
    size_t num_distinct_union = 0;

    /// Builds the query plan of the subtree rooted at this node.
    QueryPlan buildQueryPlan(const std::shared_ptr<Context> & context, const Block & header);

    void ignoreWithTotals();
};
/** Interprets one or multiple SELECT queries inside UNION/UNION ALL/UNION DISTINCT chain. /** Interprets one or multiple SELECT queries inside UNION/UNION ALL/UNION DISTINCT chain.
*/ */
class InterpreterSelectWithUnionQuery : public IInterpreter class InterpreterSelectWithUnionQuery : public IInterpreterUnionOrSelectQuery
{ {
public: public:
using IInterpreterUnionOrSelectQuery::getSampleBlock;
InterpreterSelectWithUnionQuery( InterpreterSelectWithUnionQuery(
const ASTPtr & query_ptr_, const ASTPtr & query_ptr_,
const Context & context_, const Context & context_,
@ -43,41 +27,30 @@ public:
~InterpreterSelectWithUnionQuery() override; ~InterpreterSelectWithUnionQuery() override;
/// Builds QueryPlan for current query. /// Builds QueryPlan for current query.
void buildQueryPlan(QueryPlan & query_plan); virtual void buildQueryPlan(QueryPlan & query_plan) override;
BlockIO execute() override; BlockIO execute() override;
bool ignoreLimits() const override { return options.ignore_limits; } bool ignoreLimits() const override { return options.ignore_limits; }
bool ignoreQuota() const override { return options.ignore_quota; } bool ignoreQuota() const override { return options.ignore_quota; }
Block getSampleBlock();
static Block getSampleBlock( static Block getSampleBlock(
const ASTPtr & query_ptr_, const ASTPtr & query_ptr_,
const Context & context_); const Context & context_);
void ignoreWithTotals(); virtual void ignoreWithTotals() override;
ASTPtr getQuery() const { return query_ptr; }
private: private:
SelectQueryOptions options; std::vector<std::unique_ptr<IInterpreterUnionOrSelectQuery>> nested_interpreters;
ASTPtr query_ptr;
std::shared_ptr<Context> context;
std::shared_ptr<NestedInterpreter> nested_interpreter;
Block result_header;
size_t max_streams = 1;
static Block getCommonHeaderForUnion(const Blocks & headers); static Block getCommonHeaderForUnion(const Blocks & headers);
static void buildNestedTreeInterpreter( Block getCurrentChildResultHeader(const ASTPtr & ast_ptr_, const Names & required_result_column_names);
const ASTPtr & ast_ptr,
std::shared_ptr<NestedInterpreter> nested_interpreter_, std::unique_ptr<IInterpreterUnionOrSelectQuery>
std::vector<std::shared_ptr<InterpreterSelectQuery>> & interpreters, buildCurrentChildInterpreter(const ASTPtr & ast_ptr_, const Names & current_required_result_column_names);
int & index);
size_t optimizeUnionList();
}; };
} }

View File

@ -70,12 +70,10 @@ namespace ErrorCodes
static void checkASTSizeLimits(const IAST & ast, const Settings & settings) static void checkASTSizeLimits(const IAST & ast, const Settings & settings)
{ {
std::cout << "\n\n before check limits";
if (settings.max_ast_depth) if (settings.max_ast_depth)
ast.checkDepth(settings.max_ast_depth); ast.checkDepth(settings.max_ast_depth);
if (settings.max_ast_elements) if (settings.max_ast_elements)
ast.checkSize(settings.max_ast_elements); ast.checkSize(settings.max_ast_elements);
std::cout << "\n\n after check limits";
} }

View File

@ -15,8 +15,7 @@ ASTPtr ASTSelectWithUnionQuery::clone() const
res->list_of_selects = list_of_selects->clone(); res->list_of_selects = list_of_selects->clone();
res->children.push_back(res->list_of_selects); res->children.push_back(res->list_of_selects);
res->union_modes.insert(res->union_modes.begin(), union_modes.begin(), union_modes.end()); res->union_modes = union_modes;
res->flatten_nodes_list = flatten_nodes_list->clone();
cloneOutputOptions(*res); cloneOutputOptions(*res);
return res; return res;
@ -25,10 +24,8 @@ ASTPtr ASTSelectWithUnionQuery::clone() const
void ASTSelectWithUnionQuery::formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const void ASTSelectWithUnionQuery::formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{ {
std::cout << "\n\nin format \n\n";
std::string indent_str = settings.one_line ? "" : std::string(4 * frame.indent, ' '); std::string indent_str = settings.one_line ? "" : std::string(4 * frame.indent, ' ');
#if 0
auto mode_to_str = [&](auto mode) auto mode_to_str = [&](auto mode)
{ {
if (mode == Mode::Unspecified) if (mode == Mode::Unspecified)
@ -38,18 +35,16 @@ void ASTSelectWithUnionQuery::formatQueryImpl(const FormatSettings & settings, F
else else
return "DISTINCT"; return "DISTINCT";
}; };
#endif
for (ASTs::const_iterator it = flatten_nodes_list->children.begin(); it != flatten_nodes_list->children.end(); ++it) for (ASTs::const_iterator it = list_of_selects->children.begin(); it != list_of_selects->children.end(); ++it)
{ {
if (it != list_of_selects->children.begin()) if (it != list_of_selects->children.begin())
settings.ostr << settings.nl_or_ws << indent_str << (settings.hilite ? hilite_keyword : "") << "UNION " settings.ostr << settings.nl_or_ws << indent_str << (settings.hilite ? hilite_keyword : "") << "UNION "
// << mode_to_str(union_modes[it - list_of_selects->children.begin() - 1]) << (settings.hilite ? hilite_none : "") << mode_to_str(union_modes[it - list_of_selects->children.begin() - 1]) << (settings.hilite ? hilite_none : "")
<< settings.nl_or_ws; << settings.nl_or_ws;
(*it)->formatImpl(settings, state, frame); (*it)->formatImpl(settings, state, frame);
} }
std::cout << "\n\nafter format \n\n";
} }
} }

View File

@ -28,9 +28,6 @@ public:
Modes union_modes; Modes union_modes;
ASTPtr list_of_selects; ASTPtr list_of_selects;
/// we need flatten_nodes to help build nested_interpreter
ASTPtr flatten_nodes_list;
}; };
} }

View File

@ -76,18 +76,13 @@ void IAST::updateTreeHashImpl(SipHash & hash_state) const
size_t IAST::checkDepthImpl(size_t max_depth, size_t level) const size_t IAST::checkDepthImpl(size_t max_depth, size_t level) const
{ {
std::cout << "\n\n in check depth impl\n\n";
std::cout << "\nchildren.size = " << children.size() << "\n\n";
size_t res = level + 1; size_t res = level + 1;
for (const auto & child : children) for (const auto & child : children)
{ {
std::cout << "\n in for\n\n";
if (level >= max_depth) if (level >= max_depth)
throw Exception("AST is too deep. Maximum: " + toString(max_depth), ErrorCodes::TOO_DEEP_AST); throw Exception("AST is too deep. Maximum: " + toString(max_depth), ErrorCodes::TOO_DEEP_AST);
res = std::max(res, child->checkDepthImpl(max_depth, level + 1)); res = std::max(res, child->checkDepthImpl(max_depth, level + 1));
std::cout << "\n after for\n\n";
} }
std::cout << "\n\n after impl\n\n";
return res; return res;
} }

View File

@ -11,7 +11,6 @@
#include <set> #include <set>
#include <sstream> #include <sstream>
#include <iostream>
class SipHash; class SipHash;
@ -92,7 +91,6 @@ public:
*/ */
size_t checkDepth(size_t max_depth) const size_t checkDepth(size_t max_depth) const
{ {
std::cout << "\n in check depth\n\n";
return checkDepthImpl(max_depth, 0); return checkDepthImpl(max_depth, 0);
} }

View File

@ -1,5 +1,3 @@
#include <list>
#include <memory>
#include <Parsers/ExpressionListParsers.h> #include <Parsers/ExpressionListParsers.h>
#include <Parsers/ParserSelectWithUnionQuery.h> #include <Parsers/ParserSelectWithUnionQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h> #include <Parsers/ASTSelectWithUnionQuery.h>
@ -10,24 +8,9 @@
namespace DB namespace DB
{ {
static void getSelectsFromUnionListNode(ASTPtr & ast_select, ASTs & selects)
{
if (auto * inner_union = ast_select->as<ASTSelectWithUnionQuery>())
{
for (auto & child : inner_union->list_of_selects->children)
{
getSelectsFromUnionListNode(child, selects);
}
return;
}
selects.push_back(std::move(ast_select));
}
bool ParserSelectWithUnionQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) bool ParserSelectWithUnionQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{ {
std::cout << "\n\n in ParserSelectWithUnionQuery\n\n";
ASTPtr list_node; ASTPtr list_node;
ParserUnionList parser( ParserUnionList parser(
@ -42,27 +25,17 @@ bool ParserSelectWithUnionQuery::parseImpl(Pos & pos, ASTPtr & node, Expected &
auto select_with_union_query = std::make_shared<ASTSelectWithUnionQuery>(); auto select_with_union_query = std::make_shared<ASTSelectWithUnionQuery>();
node = select_with_union_query; node = select_with_union_query;
select_with_union_query->list_of_selects = std::make_shared<ASTExpressionList>(); select_with_union_query->list_of_selects = list_node;
select_with_union_query->children.push_back(select_with_union_query->list_of_selects); select_with_union_query->children.push_back(select_with_union_query->list_of_selects);
select_with_union_query->list_of_selects->children.insert(
select_with_union_query->list_of_selects->children.begin(), list_node->children.begin(), list_node->children.end());
select_with_union_query->union_modes = parser.getUnionModes(); select_with_union_query->union_modes = parser.getUnionModes();
/// NOTE: We cannot simply flatten inner union query now, since we may have different union mode in query, /// NOTE: We cannot flatten inner union query now, since we may have different union mode in query,
/// so flattening may change its semantics. For example: /// so flattening may change its semantics. For example:
/// flatten `SELECT 1 UNION (SELECT 1 UNION ALL SELECT 1)` -> `SELECT 1 UNION SELECT 1 UNION ALL SELECT 1` /// flatten `SELECT 1 UNION (SELECT 1 UNION ALL SELECT 1)` -> `SELECT 1 UNION SELECT 1 UNION ALL SELECT 1`
/// We can use a non-flatten AST to help build QueryPlan in InterpreterSelectWithUnionQuery
select_with_union_query->flatten_nodes_list = std::make_shared<ASTExpressionList>();
for (auto & child : list_node->children)
{
getSelectsFromUnionListNode(child, select_with_union_query->flatten_nodes_list->children);
}
std::cout << "\n\n after ParserSelectWithUnionQuery\n\n";
std::cout << "\n\n flatten_nodes.size =" << select_with_union_query->flatten_nodes_list->children.size() << "\n\n";
return true; return true;
} }
} }