SelectQueryOptions v3 (no inheritance)

This commit is contained in:
chertus 2019-03-18 15:05:51 +03:00
parent b6e2697101
commit bd559f8db8
8 changed files with 61 additions and 64 deletions

View File

@ -52,7 +52,7 @@ BlockInputStreamPtr InterpreterExplainQuery::executeImpl()
else if (ast.getKind() == ASTExplainQuery::AnalyzedSyntax)
{
InterpreterSelectWithUnionQuery interpreter(ast.children.at(0), context,
modify(analyze(SelectQueryOptions(QueryProcessingStage::FetchColumns))));
SelectQueryOptions(QueryProcessingStage::FetchColumns).analyze().modify());
interpreter.getQuery()->format(IAST::FormatSettings(ss, false));
}

View File

@ -89,7 +89,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
const Context & context_,
const BlockInputStreamPtr & input_,
const SelectQueryOptions & options)
: InterpreterSelectQuery(query_ptr_, context_, input_, nullptr, noSubquery(options))
: InterpreterSelectQuery(query_ptr_, context_, input_, nullptr, options.copy().noSubquery())
{}
InterpreterSelectQuery::InterpreterSelectQuery(
@ -97,7 +97,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
const Context & context_,
const StoragePtr & storage_,
const SelectQueryOptions & options)
: InterpreterSelectQuery(query_ptr_, context_, nullptr, storage_, noSubquery(options))
: InterpreterSelectQuery(query_ptr_, context_, nullptr, storage_, options.copy().noSubquery())
{}
InterpreterSelectQuery::~InterpreterSelectQuery() = default;
@ -123,11 +123,11 @@ InterpreterSelectQuery::InterpreterSelectQuery(
const Context & context_,
const BlockInputStreamPtr & input_,
const StoragePtr & storage_,
const SelectQueryOptions & options,
const SelectQueryOptions & options_,
const Names & required_result_column_names)
: SelectQueryOptions(options)
: options(options_)
/// NOTE: the query almost always should be cloned because it will be modified during analysis.
, query_ptr(modify_inplace ? query_ptr_ : query_ptr_->clone())
, query_ptr(options.modify_inplace ? query_ptr_ : query_ptr_->clone())
, context(context_)
, storage(storage_)
, input(input_)
@ -136,7 +136,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
initSettings();
const Settings & settings = context.getSettingsRef();
if (settings.max_subquery_depth && subquery_depth > settings.max_subquery_depth)
if (settings.max_subquery_depth && options.subquery_depth > settings.max_subquery_depth)
throw Exception("Too deep subqueries. Maximum: " + settings.max_subquery_depth.toString(),
ErrorCodes::TOO_DEEP_SUBQUERIES);
@ -174,7 +174,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
{
/// Read from subquery.
interpreter_subquery = std::make_unique<InterpreterSelectWithUnionQuery>(
table_expression, getSubqueryContext(context), subqueryOptions(QueryProcessingStage::Complete), required_columns);
table_expression, getSubqueryContext(context), options.subquery(), required_columns);
source_header = interpreter_subquery->getSampleBlock();
}
@ -200,13 +200,14 @@ InterpreterSelectQuery::InterpreterSelectQuery(
if (storage)
table_lock = storage->lockStructureForShare(false, context.getCurrentQueryId());
syntax_analyzer_result = SyntaxAnalyzer(context, subquery_depth).analyze(
syntax_analyzer_result = SyntaxAnalyzer(context, options.subquery_depth).analyze(
query_ptr, source_header.getNamesAndTypesList(), required_result_column_names, storage);
query_analyzer = std::make_unique<ExpressionAnalyzer>(
query_ptr, syntax_analyzer_result, context, NamesAndTypesList(),
NameSet(required_result_column_names.begin(), required_result_column_names.end()), subquery_depth, !only_analyze);
NameSet(required_result_column_names.begin(), required_result_column_names.end()),
options.subquery_depth, !options.only_analyze);
if (!only_analyze)
if (!options.only_analyze)
{
if (query.sample_size() && (input || !storage || !storage->supportsSampling()))
throw Exception("Illegal SAMPLE: table doesn't support sampling", ErrorCodes::SAMPLING_NOT_SUPPORTED);
@ -223,7 +224,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
context.addExternalTable(it.first, it.second);
}
if (!only_analyze || modify_inplace)
if (!options.only_analyze || options.modify_inplace)
{
if (query_analyzer->isRewriteSubqueriesPredicate())
{
@ -232,7 +233,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
interpreter_subquery = std::make_unique<InterpreterSelectWithUnionQuery>(
table_expression,
getSubqueryContext(context),
subqueryOptions(QueryProcessingStage::Complete),
options.subquery(),
required_columns);
}
}
@ -286,7 +287,7 @@ Block InterpreterSelectQuery::getSampleBlock()
BlockIO InterpreterSelectQuery::execute()
{
Pipeline pipeline;
executeImpl(pipeline, input, only_analyze);
executeImpl(pipeline, input, options.only_analyze);
executeUnion(pipeline);
BlockIO res;
@ -297,7 +298,7 @@ BlockIO InterpreterSelectQuery::execute()
BlockInputStreams InterpreterSelectQuery::executeWithMultipleStreams()
{
Pipeline pipeline;
executeImpl(pipeline, input, only_analyze);
executeImpl(pipeline, input, options.only_analyze);
return pipeline.streams;
}
@ -307,10 +308,10 @@ InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpression
/// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing.
res.first_stage = from_stage < QueryProcessingStage::WithMergeableState
&& to_stage >= QueryProcessingStage::WithMergeableState;
&& options.to_stage >= QueryProcessingStage::WithMergeableState;
/// Do I need to execute the second part of the pipeline - running on the initiating server during distributed processing.
res.second_stage = from_stage <= QueryProcessingStage::WithMergeableState
&& to_stage > QueryProcessingStage::WithMergeableState;
&& options.to_stage > QueryProcessingStage::WithMergeableState;
/** First we compose a chain of actions and remember the necessary steps from it.
* Regardless of from_stage and to_stage, we will compose a complete sequence of actions to perform optimization and
@ -535,16 +536,16 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt
expressions = analyzeExpressions(from_stage, false);
if (from_stage == QueryProcessingStage::WithMergeableState &&
to_stage == QueryProcessingStage::WithMergeableState)
options.to_stage == QueryProcessingStage::WithMergeableState)
throw Exception("Distributed on Distributed is not supported", ErrorCodes::NOT_IMPLEMENTED);
/** Read the data from Storage. from_stage - to what stage the request was completed in Storage. */
executeFetchColumns(from_stage, pipeline, expressions.prewhere_info, expressions.columns_to_remove_after_prewhere);
LOG_TRACE(log, QueryProcessingStage::toString(from_stage) << " -> " << QueryProcessingStage::toString(to_stage));
LOG_TRACE(log, QueryProcessingStage::toString(from_stage) << " -> " << QueryProcessingStage::toString(options.to_stage));
}
if (to_stage > QueryProcessingStage::FetchColumns)
if (options.to_stage > QueryProcessingStage::FetchColumns)
{
/// Do I need to aggregate in a separate row rows that have not passed max_rows_to_group_by.
bool aggregate_overflow_row =
@ -557,7 +558,7 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt
/// Do I need to immediately finalize the aggregate functions after the aggregation?
bool aggregate_final =
expressions.need_aggregate &&
to_stage > QueryProcessingStage::WithMergeableState &&
options.to_stage > QueryProcessingStage::WithMergeableState &&
!query.group_by_with_totals && !query.group_by_with_rollup && !query.group_by_with_cube;
if (expressions.first_stage)
@ -920,7 +921,7 @@ void InterpreterSelectQuery::executeFetchColumns(
/// Limitation on the number of columns to read.
/// It's not applied in 'only_analyze' mode, because the query could be analyzed without removal of unnecessary columns.
if (!only_analyze && settings.max_columns_to_read && required_columns.size() > settings.max_columns_to_read)
if (!options.only_analyze && settings.max_columns_to_read && required_columns.size() > settings.max_columns_to_read)
throw Exception("Limit for number of columns to read exceeded. "
"Requested: " + toString(required_columns.size())
+ ", maximum: " + settings.max_columns_to_read.toString(),
@ -982,7 +983,8 @@ void InterpreterSelectQuery::executeFetchColumns(
throw Exception("Subquery expected", ErrorCodes::LOGICAL_ERROR);
interpreter_subquery = std::make_unique<InterpreterSelectWithUnionQuery>(
subquery, getSubqueryContext(context), noModify(subqueryOptions(QueryProcessingStage::Complete)), required_columns);
subquery, getSubqueryContext(context),
options.copy().subquery().noModify(), required_columns);
if (query_analyzer->hasAggregation())
interpreter_subquery->ignoreWithTotals();
@ -1039,7 +1041,7 @@ void InterpreterSelectQuery::executeFetchColumns(
* additionally on each remote server, because these limits are checked per block of data processed,
* and remote servers may process way more blocks of data than are received by initiator.
*/
if (to_stage == QueryProcessingStage::Complete)
if (options.to_stage == QueryProcessingStage::Complete)
{
limits.min_execution_speed = settings.min_execution_speed;
limits.max_execution_speed = settings.max_execution_speed;
@ -1054,7 +1056,7 @@ void InterpreterSelectQuery::executeFetchColumns(
{
stream->setLimits(limits);
if (to_stage == QueryProcessingStage::Complete)
if (options.to_stage == QueryProcessingStage::Complete)
stream->setQuota(quota);
});
}

View File

@ -7,8 +7,8 @@
#include <DataStreams/IBlockInputStream.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/IInterpreter.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/IInterpreter.h>
#include <Interpreters/SelectQueryOptions.h>
#include <Storages/SelectQueryInfo.h>
@ -27,7 +27,7 @@ using SyntaxAnalyzerResultPtr = std::shared_ptr<const SyntaxAnalyzerResult>;
/** Interprets the SELECT query. Returns the stream of blocks with the results of the query before `to_stage` stage.
*/
class InterpreterSelectQuery : public IInterpreter, private SelectQueryOptions
class InterpreterSelectQuery : public IInterpreter
{
public:
/**
@ -207,6 +207,7 @@ private:
*/
void initSettings();
const SelectQueryOptions options;
ASTPtr query_ptr;
Context context;
NamesAndTypesList source_columns;

View File

@ -26,9 +26,9 @@ namespace ErrorCodes
InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
const ASTPtr & query_ptr_,
const Context & context_,
const SelectQueryOptions & options,
const SelectQueryOptions & options_,
const Names & required_result_column_names)
: SelectQueryOptions(options),
: options(options_),
query_ptr(query_ptr_),
context(context_)
{
@ -53,7 +53,7 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
/// We use it to determine positions of 'required_result_column_names' in SELECT clause.
Block full_result_header = InterpreterSelectQuery(
ast.list_of_selects->children.at(0), context, analyze(noModify(queryOptions()))).getSampleBlock();
ast.list_of_selects->children.at(0), context, options.copy().analyze().noModify()).getSampleBlock();
std::vector<size_t> positions_of_required_result_columns(required_result_column_names.size());
for (size_t required_result_num = 0, size = required_result_column_names.size(); required_result_num < size; ++required_result_num)
@ -62,7 +62,7 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
for (size_t query_num = 1; query_num < num_selects; ++query_num)
{
Block full_result_header_for_current_select = InterpreterSelectQuery(
ast.list_of_selects->children.at(query_num), context, analyze(noModify(queryOptions()))).getSampleBlock();
ast.list_of_selects->children.at(query_num), context, options.copy().analyze().noModify()).getSampleBlock();
if (full_result_header_for_current_select.columns() != full_result_header.columns())
throw Exception("Different number of columns in UNION ALL elements:\n"
@ -85,7 +85,7 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
nested_interpreters.emplace_back(std::make_unique<InterpreterSelectQuery>(
ast.list_of_selects->children.at(query_num),
context,
queryOptions(),
options,
current_required_result_column_names));
}
@ -172,7 +172,7 @@ Block InterpreterSelectWithUnionQuery::getSampleBlock(
return cache[key];
}
return cache[key] = InterpreterSelectWithUnionQuery(query_ptr, context, analyze(SelectQueryOptions())).getSampleBlock();
return cache[key] = InterpreterSelectWithUnionQuery(query_ptr, context, SelectQueryOptions().analyze()).getSampleBlock();
}

View File

@ -14,7 +14,7 @@ class InterpreterSelectQuery;
/** Interprets one or multiple SELECT queries inside UNION ALL chain.
*/
class InterpreterSelectWithUnionQuery : public IInterpreter, private SelectQueryOptions
class InterpreterSelectWithUnionQuery : public IInterpreter
{
public:
InterpreterSelectWithUnionQuery(
@ -41,6 +41,7 @@ public:
ASTPtr getQuery() const { return query_ptr; }
private:
const SelectQueryOptions options;
ASTPtr query_ptr;
Context context;

View File

@ -367,8 +367,7 @@ void MutationsInterpreter::prepare(bool dry_run)
select->children.push_back(where_expression);
}
interpreter_select = std::make_unique<InterpreterSelectQuery>(select, context, storage,
dry_run ? analyze(SelectQueryOptions()) : SelectQueryOptions());
interpreter_select = std::make_unique<InterpreterSelectQuery>(select, context, storage, SelectQueryOptions().analyze(dry_run));
is_prepared = true;
}

View File

@ -20,9 +20,13 @@ namespace DB
* is_subquery
* - there could be some specific for subqueries. Ex. there's no need to pass duplicated columns in results, cause of indirect results.
*/
class SelectQueryOptions
struct SelectQueryOptions
{
public:
QueryProcessingStage::Enum to_stage;
size_t subquery_depth;
bool only_analyze;
bool modify_inplace;
SelectQueryOptions(QueryProcessingStage::Enum stage = QueryProcessingStage::Complete, size_t depth = 0)
: to_stage(stage)
, subquery_depth(depth)
@ -30,45 +34,35 @@ public:
, modify_inplace(false)
{}
const SelectQueryOptions & queryOptions() const { return *this; }
SelectQueryOptions copy() const { return *this; }
SelectQueryOptions subqueryOptions(QueryProcessingStage::Enum stage) const
SelectQueryOptions subquery() const
{
SelectQueryOptions out = *this;
out.to_stage = stage;
out.to_stage = QueryProcessingStage::Complete;
++out.subquery_depth;
return out;
}
friend SelectQueryOptions analyze(const SelectQueryOptions & src, bool value = true)
SelectQueryOptions & analyze(bool value = true)
{
SelectQueryOptions out = src;
out.only_analyze = value;
return out;
only_analyze = value;
return *this;
}
friend SelectQueryOptions modify(const SelectQueryOptions & src, bool value = true)
SelectQueryOptions & modify(bool value = true)
{
SelectQueryOptions out = src;
out.modify_inplace = value;
return out;
modify_inplace = value;
return *this;
}
friend SelectQueryOptions noSubquery(const SelectQueryOptions & src)
SelectQueryOptions & noModify() { return modify(false); }
SelectQueryOptions & noSubquery()
{
SelectQueryOptions out = src;
out.subquery_depth = 0;
return out;
subquery_depth = 0;
return *this;
}
friend SelectQueryOptions noModify(const SelectQueryOptions & src) { return modify(src, false); }
friend bool isSubquery(const SelectQueryOptions & opt) { return opt.subquery_depth; }
protected:
QueryProcessingStage::Enum to_stage;
size_t subquery_depth;
bool only_analyze;
bool modify_inplace;
};
}

View File

@ -274,7 +274,7 @@ BlockInputStreams StorageMerge::createSourceStreams(const SelectQueryInfo & quer
if (!storage)
return BlockInputStreams{
InterpreterSelectQuery(modified_query_info.query, modified_context, std::make_shared<OneBlockInputStream>(header),
analyze(SelectQueryOptions(processed_stage))).execute().in};
SelectQueryOptions(processed_stage).analyze()).execute().in};
BlockInputStreams source_streams;
@ -429,7 +429,7 @@ Block StorageMerge::getQueryHeader(
case QueryProcessingStage::Complete:
return materializeBlock(InterpreterSelectQuery(
query_info.query, context, std::make_shared<OneBlockInputStream>(getSampleBlockForColumns(column_names)),
analyze(SelectQueryOptions(processed_stage))).getSampleBlock());
SelectQueryOptions(processed_stage).analyze()).getSampleBlock());
}
throw Exception("Logical Error: unknown processed stage.", ErrorCodes::LOGICAL_ERROR);
}