Review fixes. Add setting max_optimizations_to_apply.

This commit is contained in:
Nikolai Kochetov 2021-02-26 19:29:56 +03:00
parent 00e0dbc3e5
commit d328bfa41f
24 changed files with 92 additions and 35 deletions

View File

@ -535,12 +535,13 @@
M(566, CANNOT_RMDIR) \
M(567, DUPLICATED_PART_UUIDS) \
M(568, RAFT_ERROR) \
M(569, TOO_MANY_QUERY_PLAN_OPTIMIZATIONS) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \
M(1001, STD_EXCEPTION) \
M(1002, UNKNOWN_EXCEPTION) \
M(1003, INVALID_SHARD_ID)
M(1003, INVALID_SHARD_ID) \
/* See END */

View File

@ -431,6 +431,7 @@ class IColumn;
M(UnionMode, union_default_mode, UnionMode::Unspecified, "Set default Union Mode in SelectWithUnion query. Possible values: empty string, 'ALL', 'DISTINCT'. If empty, query without Union Mode will throw exception.", 0) \
M(Bool, optimize_aggregators_of_group_by_keys, true, "Eliminates min/max/any/anyLast aggregators of GROUP BY keys in SELECT section", 0) \
M(Bool, optimize_group_by_function_keys, true, "Eliminates functions of other keys in GROUP BY section", 0) \
M(UInt64, query_plan_max_optimizations_to_apply, 10000, "Limit the total number of optimizations applied to query plan. If zero, ignored. If limit reached, throw exception", 0) \
// End of COMMON_SETTINGS
// Please add settings related to formats into the FORMAT_FACTORY_SETTINGS below.

View File

@ -1215,7 +1215,7 @@ ActionsDAG::SplitResult ActionsDAG::splitActionsForFilter(const std::string & co
namespace
{
struct ConjinctionNodes
struct ConjunctionNodes
{
std::vector<ActionsDAG::Node *> allowed;
std::vector<ActionsDAG::Node *> rejected;
@ -1225,9 +1225,9 @@ struct ConjinctionNodes
/// Assuming predicate is a conjunction (probably, trivial).
/// Find separate conjunctions nodes. Split nodes into allowed and rejected sets.
/// Allowed predicate is a predicate which can be calculated using only nodes from allowed_nodes set.
ConjinctionNodes getConjinctionNodes(ActionsDAG::Node * predicate, std::unordered_set<const ActionsDAG::Node *> allowed_nodes)
ConjunctionNodes getConjunctionNodes(ActionsDAG::Node * predicate, std::unordered_set<const ActionsDAG::Node *> allowed_nodes)
{
ConjinctionNodes conjunction;
ConjunctionNodes conjunction;
std::unordered_set<ActionsDAG::Node *> allowed;
std::unordered_set<ActionsDAG::Node *> rejected;
@ -1299,6 +1299,7 @@ ConjinctionNodes getConjinctionNodes(ActionsDAG::Node * predicate, std::unordere
if (conjunction.allowed.empty())
{
/// If nothing was added to conjunction, check if it is trivial.
if (allowed_nodes.count(predicate))
conjunction.allowed.push_back(predicate);
}
@ -1450,7 +1451,7 @@ ActionsDAGPtr ActionsDAG::splitActionsForFilter(const std::string & filter_name,
}
}
auto conjunction = getConjinctionNodes(predicate, allowed_nodes);
auto conjunction = getConjunctionNodes(predicate, allowed_nodes);
auto actions = cloneActionsForConjunction(conjunction.allowed);
if (!actions)
return nullptr;

View File

@ -152,6 +152,9 @@ public:
}
};
/// NOTE: std::list is an implementation detail.
/// It allows to add and remove new nodes inplace without reallocation.
/// Raw pointers to nodes remain valid.
using Nodes = std::list<Node>;
using Inputs = std::vector<Node *>;

View File

@ -284,7 +284,7 @@ void SelectStreamFactory::createForShard(
if (try_results.empty() || local_delay < max_remote_delay)
{
auto plan = createLocalPlan(modified_query_ast, header, context, stage);
return QueryPipeline::getPipe(std::move(*plan->buildQueryPipeline()));
return QueryPipeline::getPipe(std::move(*plan->buildQueryPipeline(QueryPlanOptimizationSettings(context.getSettingsRef()))));
}
else
{

View File

@ -117,7 +117,7 @@ struct QueryPlanSettings
{
QueryPlan::ExplainPlanOptions query_plan_options;
/// Apply query plan optimisations.
/// Apply query plan optimizations.
bool optimize = true;
constexpr static char name[] = "PLAN";
@ -251,7 +251,7 @@ BlockInputStreamPtr InterpreterExplainQuery::executeImpl()
interpreter.buildQueryPlan(plan);
if (settings.optimize)
plan.optimize();
plan.optimize(QueryPlanOptimizationSettings(context.getSettingsRef()));
plan.explainPlan(buf, settings.query_plan_options);
}
@ -265,7 +265,7 @@ BlockInputStreamPtr InterpreterExplainQuery::executeImpl()
InterpreterSelectWithUnionQuery interpreter(ast.getExplainedQuery(), context, SelectQueryOptions());
interpreter.buildQueryPlan(plan);
auto pipeline = plan.buildQueryPipeline();
auto pipeline = plan.buildQueryPipeline(QueryPlanOptimizationSettings(context.getSettingsRef()));
if (settings.graph)
{

View File

@ -548,7 +548,7 @@ BlockIO InterpreterSelectQuery::execute()
buildQueryPlan(query_plan);
res.pipeline = std::move(*query_plan.buildQueryPipeline());
res.pipeline = std::move(*query_plan.buildQueryPipeline(QueryPlanOptimizationSettings(context->getSettingsRef())));
return res;
}

View File

@ -413,7 +413,7 @@ BlockIO InterpreterSelectWithUnionQuery::execute()
QueryPlan query_plan;
buildQueryPlan(query_plan);
auto pipeline = query_plan.buildQueryPipeline();
auto pipeline = query_plan.buildQueryPipeline(QueryPlanOptimizationSettings(context->getSettingsRef()));
res.pipeline = std::move(*pipeline);
res.pipeline.addInterpreterContext(context);

View File

@ -756,7 +756,7 @@ QueryPipelinePtr MutationsInterpreter::addStreamsForLaterStages(const std::vecto
}
}
auto pipeline = plan.buildQueryPipeline();
auto pipeline = plan.buildQueryPipeline(QueryPlanOptimizationSettings(context.getSettingsRef()));
pipeline->addSimpleTransform([&](const Block & header)
{
return std::make_shared<MaterializingTransform>(header);

View File

@ -9,7 +9,7 @@ namespace QueryPlanOptimizations
{
/// This is the main function which optimizes the whole QueryPlan tree.
void optimizeTree(QueryPlan::Node & root, QueryPlan::Nodes & nodes);
void optimizeTree(const QueryPlanOptimizationSettings & settings, QueryPlan::Node & root, QueryPlan::Nodes & nodes);
/// Optimization is a function applied to QueryPlan::Node.
/// It can read and update subtree of specified node.

View File

@ -0,0 +1,12 @@
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Core/Settings.h>
namespace DB
{
QueryPlanOptimizationSettings::QueryPlanOptimizationSettings(const Settings & settings)
{
max_optimizations_to_apply = settings.query_plan_max_optimizations_to_apply;
}
}

View File

@ -0,0 +1,20 @@
#pragma once
#include <cstddef>
namespace DB
{
struct Settings;
struct QueryPlanOptimizationSettings
{
QueryPlanOptimizationSettings() = delete;
explicit QueryPlanOptimizationSettings(const Settings & settings);
/// If not zero, throw if too many optimizations were applied to query plan.
/// It helps to avoid infinite optimization loop.
size_t max_optimizations_to_apply = 0;
};
}

View File

@ -1,10 +1,20 @@
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Common/Exception.h>
#include <stack>
namespace DB::QueryPlanOptimizations
namespace DB
{
void optimizeTree(QueryPlan::Node & root, QueryPlan::Nodes & nodes)
namespace ErrorCodes
{
extern const int TOO_MANY_QUERY_PLAN_OPTIMIZATIONS;
}
namespace QueryPlanOptimizations
{
void optimizeTree(const QueryPlanOptimizationSettings & settings, QueryPlan::Node & root, QueryPlan::Nodes & nodes)
{
const auto & optimizations = getOptimizations();
@ -23,7 +33,7 @@ void optimizeTree(QueryPlan::Node & root, QueryPlan::Nodes & nodes)
std::stack<Frame> stack;
stack.push(Frame{.node = &root});
size_t max_optimizations_to_apply = 0;
size_t max_optimizations_to_apply = settings.max_optimizations_to_apply;
size_t total_applied_optimizations = 0;
while (!stack.empty())
@ -58,7 +68,9 @@ void optimizeTree(QueryPlan::Node & root, QueryPlan::Nodes & nodes)
continue;
if (max_optimizations_to_apply && max_optimizations_to_apply < total_applied_optimizations)
continue;
throw Exception(ErrorCodes::TOO_MANY_QUERY_PLAN_OPTIMIZATIONS,
"Too many optimizations applied to query plan. Current limit {}",
max_optimizations_to_apply);
/// Try to apply optimization.
auto update_depth = optimization.apply(frame.node, nodes);
@ -81,3 +93,4 @@ void optimizeTree(QueryPlan::Node & root, QueryPlan::Nodes & nodes)
}
}
}

View File

@ -130,10 +130,10 @@ void QueryPlan::addStep(QueryPlanStepPtr step)
" input expected", ErrorCodes::LOGICAL_ERROR);
}
QueryPipelinePtr QueryPlan::buildQueryPipeline()
QueryPipelinePtr QueryPlan::buildQueryPipeline(const QueryPlanOptimizationSettings & optimization_settings)
{
checkInitialized();
optimize();
optimize(optimization_settings);
struct Frame
{
@ -177,7 +177,7 @@ QueryPipelinePtr QueryPlan::buildQueryPipeline()
return last_pipeline;
}
Pipe QueryPlan::convertToPipe()
Pipe QueryPlan::convertToPipe(const QueryPlanOptimizationSettings & optimization_settings)
{
if (!isInitialized())
return {};
@ -185,7 +185,7 @@ Pipe QueryPlan::convertToPipe()
if (isCompleted())
throw Exception("Cannot convert completed QueryPlan to Pipe", ErrorCodes::LOGICAL_ERROR);
return QueryPipeline::getPipe(std::move(*buildQueryPipeline()));
return QueryPipeline::getPipe(std::move(*buildQueryPipeline(optimization_settings)));
}
void QueryPlan::addInterpreterContext(std::shared_ptr<Context> context)
@ -333,9 +333,9 @@ void QueryPlan::explainPipeline(WriteBuffer & buffer, const ExplainPipelineOptio
}
}
void QueryPlan::optimize()
void QueryPlan::optimize(const QueryPlanOptimizationSettings & optimization_settings)
{
QueryPlanOptimizations::optimizeTree(*root, nodes);
QueryPlanOptimizations::optimizeTree(optimization_settings, *root, nodes);
}
}

View File

@ -5,6 +5,7 @@
#include <set>
#include <Core/Names.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
namespace DB
{
@ -27,7 +28,7 @@ class Pipe;
/// A tree of query steps.
/// The goal of QueryPlan is to build QueryPipeline.
/// QueryPlan let delay pipeline creation which is helpful for pipeline-level optimisations.
/// QueryPlan let delay pipeline creation which is helpful for pipeline-level optimizations.
class QueryPlan
{
public:
@ -43,12 +44,12 @@ public:
bool isCompleted() const; /// Tree is not empty and root hasOutputStream()
const DataStream & getCurrentDataStream() const; /// Checks that (isInitialized() && !isCompleted())
void optimize();
void optimize(const QueryPlanOptimizationSettings & optimization_settings);
QueryPipelinePtr buildQueryPipeline();
QueryPipelinePtr buildQueryPipeline(const QueryPlanOptimizationSettings & optimization_settings);
/// If initialized, build pipeline and convert to pipe. Otherwise, return empty pipe.
Pipe convertToPipe();
Pipe convertToPipe(const QueryPlanOptimizationSettings & optimization_settings);
struct ExplainPlanOptions
{

View File

@ -33,7 +33,7 @@ public:
std::move(*MergeTreeDataSelectExecutor(part->storage)
.readFromParts({part}, column_names, metadata_snapshot, query_info, context, max_block_size, num_streams));
return query_plan.convertToPipe();
return query_plan.convertToPipe(QueryPlanOptimizationSettings(context.getSettingsRef()));
}

View File

@ -166,7 +166,7 @@ Pipe StorageBuffer::read(
{
QueryPlan plan;
read(plan, column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
return plan.convertToPipe();
return plan.convertToPipe(QueryPlanOptimizationSettings(context.getSettingsRef()));
}
void StorageBuffer::read(

View File

@ -501,7 +501,7 @@ Pipe StorageDistributed::read(
{
QueryPlan plan;
read(plan, column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
return plan.convertToPipe();
return plan.convertToPipe(QueryPlanOptimizationSettings(context.getSettingsRef()));
}
void StorageDistributed::read(

View File

@ -126,7 +126,7 @@ Pipe StorageMaterializedView::read(
{
QueryPlan plan;
read(plan, column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
return plan.convertToPipe();
return plan.convertToPipe(QueryPlanOptimizationSettings(context.getSettingsRef()));
}
void StorageMaterializedView::read(

View File

@ -198,7 +198,7 @@ Pipe StorageMergeTree::read(
{
QueryPlan plan;
read(plan, column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
return plan.convertToPipe();
return plan.convertToPipe(QueryPlanOptimizationSettings(context.getSettingsRef()));
}
std::optional<UInt64> StorageMergeTree::totalRows(const Settings &) const

View File

@ -3809,7 +3809,7 @@ Pipe StorageReplicatedMergeTree::read(
{
QueryPlan plan;
read(plan, column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
return plan.convertToPipe();
return plan.convertToPipe(QueryPlanOptimizationSettings(context.getSettingsRef()));
}

View File

@ -59,7 +59,7 @@ Pipe StorageView::read(
{
QueryPlan plan;
read(plan, column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
return plan.convertToPipe();
return plan.convertToPipe(QueryPlanOptimizationSettings(context.getSettingsRef()));
}
void StorageView::read(

View File

@ -1,3 +1,5 @@
Too many optimizations applied to query plan
Too many optimizations applied to query plan
> sipHash should be calculated after filtration
FUNCTION sipHash64
Filter column: equals

View File

@ -4,6 +4,9 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "select x + 1 from (select y + 2 as x from (select dummy + 3 as y)) settings query_plan_max_optimizations_to_apply = 1" 2>&1 |
grep -o "Too many optimizations applied to query plan"
echo "> sipHash should be calculated after filtration"
$CLICKHOUSE_CLIENT -q "explain actions = 1 select sum(x), sum(y) from (select sipHash64(number) as x, bitAnd(number, 1024) as y from numbers_mt(1000000000) limit 1000000000) where y = 0" | grep -o "FUNCTION sipHash64\|Filter column: equals"
echo "> sorting steps should know about limit"
@ -146,4 +149,4 @@ $CLICKHOUSE_CLIENT -q "
$CLICKHOUSE_CLIENT -q "
select * from (
select y, sum(x) from (select number as x, number % 4 as y from numbers(10)) group by y with totals
) where y != 2"
) where y != 2"