Merge pull request #22087 from ClickHouse/better-filter-push-down

Better filter push down
This commit is contained in:
Nikolai Kochetov 2021-04-09 10:22:17 +03:00 committed by GitHub
commit 28ca191102
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 377 additions and 186 deletions

View File

@ -454,7 +454,11 @@ class IColumn;
M(UnionMode, union_default_mode, UnionMode::Unspecified, "Set default Union Mode in SelectWithUnion query. Possible values: empty string, 'ALL', 'DISTINCT'. If empty, query without Union Mode will throw exception.", 0) \
M(Bool, optimize_aggregators_of_group_by_keys, true, "Eliminates min/max/any/anyLast aggregators of GROUP BY keys in SELECT section", 0) \
M(Bool, optimize_group_by_function_keys, true, "Eliminates functions of other keys in GROUP BY section", 0) \
\
M(Bool, query_plan_enable_optimizations, true, "Apply optimizations to query plan", 0) \
M(UInt64, query_plan_max_optimizations_to_apply, 10000, "Limit the total number of optimizations applied to query plan. If zero, ignored. If limit reached, throw exception", 0) \
M(Bool, query_plan_filter_push_down, true, "Allow to push down filter by predicate query plan step", 0) \
\
M(Bool, database_replicated_ddl_output, true, "Obsolete setting, does nothing. Will be removed after 2021-09-08", 0) \
// End of COMMON_SETTINGS

View File

@ -156,8 +156,7 @@ void executeQuery(
for (auto & plan : plans)
input_streams.emplace_back(plan->getCurrentDataStream());
auto header = input_streams.front().header;
auto union_step = std::make_unique<UnionStep>(std::move(input_streams), header);
auto union_step = std::make_unique<UnionStep>(std::move(input_streams));
query_plan.unitePlans(std::move(union_step), std::move(plans));
}

View File

@ -134,6 +134,8 @@ class HashJoin : public IJoin
public:
HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block, bool any_take_last_row_ = false);
const TableJoin & getTableJoin() const override { return *table_join; }
/** Add block of data from right hand of JOIN to the map.
* Returns false, if some limit was exceeded and you should not insert more data.
*/

View File

@ -14,11 +14,15 @@ class Block;
struct ExtraBlock;
using ExtraBlockPtr = std::shared_ptr<ExtraBlock>;
class TableJoin;
class IJoin
{
public:
virtual ~IJoin() = default;
virtual const TableJoin & getTableJoin() const = 0;
/// Add block of data from right hand of JOIN.
/// @returns false, if some limit was exceeded and you should not insert more data.
virtual bool addJoinedBlock(const Block & block, bool check_limits = true) = 0;

View File

@ -37,7 +37,6 @@
#include <Interpreters/replaceAliasColumnsInQuery.h>
#include <Processors/Pipe.h>
#include <Processors/QueryPlan/AddingDelayedSourceStep.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPlan/ArrayJoinStep.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
@ -1081,26 +1080,14 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
if (expressions.hasJoin())
{
JoinPtr join = expressions.join;
QueryPlanStepPtr join_step = std::make_unique<JoinStep>(
query_plan.getCurrentDataStream(),
expressions.join);
expressions.join,
expressions.join_has_delayed_stream,
settings.max_block_size);
join_step->setStepDescription("JOIN");
query_plan.addStep(std::move(join_step));
if (expressions.join_has_delayed_stream)
{
const Block & join_result_sample = query_plan.getCurrentDataStream().header;
auto stream = std::make_shared<LazyNonJoinedBlockInputStream>(*join, join_result_sample, settings.max_block_size);
auto source = std::make_shared<SourceFromInputStream>(std::move(stream));
auto add_non_joined_rows_step = std::make_unique<AddingDelayedSourceStep>(
query_plan.getCurrentDataStream(), std::move(source));
add_non_joined_rows_step->setStepDescription("Add non-joined rows after JOIN");
query_plan.addStep(std::move(add_non_joined_rows_step));
}
}
if (expressions.hasWhere())

View File

@ -6,6 +6,7 @@
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/queryToString.h>
#include <Processors/QueryPlan/DistinctStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/UnionStep.h>
@ -251,11 +252,23 @@ void InterpreterSelectWithUnionQuery::buildQueryPlan(QueryPlan & query_plan)
{
plans[i] = std::make_unique<QueryPlan>();
nested_interpreters[i]->buildQueryPlan(*plans[i]);
if (!blocksHaveEqualStructure(plans[i]->getCurrentDataStream().header, result_header))
{
auto actions_dag = ActionsDAG::makeConvertingActions(
plans[i]->getCurrentDataStream().header.getColumnsWithTypeAndName(),
result_header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Position);
auto converting_step = std::make_unique<ExpressionStep>(plans[i]->getCurrentDataStream(), std::move(actions_dag));
converting_step->setStepDescription("Conversion before UNION");
plans[i]->addStep(std::move(converting_step));
}
data_streams[i] = plans[i]->getCurrentDataStream();
}
auto max_threads = context->getSettingsRef().max_threads;
auto union_step = std::make_unique<UnionStep>(std::move(data_streams), result_header, max_threads);
auto union_step = std::make_unique<UnionStep>(std::move(data_streams), max_threads);
query_plan.unitePlans(std::move(union_step), std::move(plans));

View File

@ -19,6 +19,8 @@ class JoinSwitcher : public IJoin
public:
JoinSwitcher(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block_);
const TableJoin & getTableJoin() const override { return *table_join; }
/// Add block of data from right hand of JOIN into current join object.
/// If the join-in-memory memory limit is exceeded, switches to join-on-disk and continues with it.
/// @returns false, if join-on-disk disk limit exceeded

View File

@ -23,6 +23,7 @@ class MergeJoin : public IJoin
public:
MergeJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block);
const TableJoin & getTableJoin() const override { return *table_join; }
bool addJoinedBlock(const Block & block, bool check_limits) override;
void joinBlock(Block &, ExtraBlockPtr & not_processed) override;
void joinTotals(Block &) const override;

View File

@ -8,9 +8,35 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
/// Builds the list of input ports passed to the IProcessor base of DelayedPortsProcessor.
///
/// When `assert_main_ports_empty` is false, all `num_ports` ports share `header`.
/// Otherwise only the ports whose index is listed in `delayed_ports` get `header`;
/// every other (main) port is created with an empty Block as its header.
/// `delayed_ports` is taken by value because it is sorted locally.
InputPorts createInputPorts(
    const Block & header,
    size_t num_ports,
    IProcessor::PortNumbers delayed_ports,
    bool assert_main_ports_empty)
{
    if (!assert_main_ports_empty)
        return InputPorts(num_ports, header);

    /// Sorted order lets us walk the delayed port numbers in lockstep with the port index.
    std::sort(delayed_ports.begin(), delayed_ports.end());

    InputPorts ports;
    auto delayed_it = delayed_ports.begin();

    for (size_t port_idx = 0; port_idx < num_ports; ++port_idx)
    {
        const bool is_delayed = delayed_it != delayed_ports.end() && port_idx == *delayed_it;

        if (!is_delayed)
        {
            /// Main port: nothing is expected to arrive here, so it gets an empty header.
            ports.emplace_back(Block());
            continue;
        }

        ports.emplace_back(header);
        ++delayed_it;
    }

    return ports;
}
DelayedPortsProcessor::DelayedPortsProcessor(
const Block & header, size_t num_ports, const PortNumbers & delayed_ports, bool assert_main_ports_empty)
: IProcessor(InputPorts(num_ports, header),
: IProcessor(createInputPorts(header, num_ports, delayed_ports, assert_main_ports_empty),
OutputPorts((assert_main_ports_empty ? delayed_ports.size() : num_ports), header))
, num_delayed_ports(delayed_ports.size())
{

View File

@ -252,10 +252,10 @@ static Pipes removeEmptyPipes(Pipes pipes)
Pipe Pipe::unitePipes(Pipes pipes)
{
return Pipe::unitePipes(std::move(pipes), nullptr);
return Pipe::unitePipes(std::move(pipes), nullptr, false);
}
Pipe Pipe::unitePipes(Pipes pipes, Processors * collected_processors)
Pipe Pipe::unitePipes(Pipes pipes, Processors * collected_processors, bool allow_empty_header)
{
Pipe res;
@ -275,12 +275,25 @@ Pipe Pipe::unitePipes(Pipes pipes, Processors * collected_processors)
OutputPortRawPtrs totals;
OutputPortRawPtrs extremes;
res.header = pipes.front().header;
res.collected_processors = collected_processors;
res.header = pipes.front().header;
if (allow_empty_header && !res.header)
{
for (const auto & pipe : pipes)
{
if (const auto & header = pipe.getHeader())
{
res.header = header;
break;
}
}
}
for (auto & pipe : pipes)
{
assertBlocksHaveEqualStructure(res.header, pipe.header, "Pipe::unitePipes");
if (!allow_empty_header || pipe.header)
assertBlocksHaveEqualStructure(res.header, pipe.header, "Pipe::unitePipes");
res.processors.insert(res.processors.end(), pipe.processors.begin(), pipe.processors.end());
res.output_ports.insert(res.output_ports.end(), pipe.output_ports.begin(), pipe.output_ports.end());

View File

@ -155,7 +155,7 @@ private:
/// These methods are for QueryPipeline. It is allowed to complete the graph only there.
/// So, we may be sure that Pipe always has output port if not empty.
bool isCompleted() const { return !empty() && output_ports.empty(); }
static Pipe unitePipes(Pipes pipes, Processors * collected_processors);
static Pipe unitePipes(Pipes pipes, Processors * collected_processors, bool allow_empty_header);
void setSinks(const Pipe::ProcessorGetterWithStreamKind & getter);
void setOutputFormat(ProcessorPtr output);

View File

@ -211,11 +211,14 @@ void QueryPipeline::setOutputFormat(ProcessorPtr output)
QueryPipeline QueryPipeline::unitePipelines(
std::vector<std::unique_ptr<QueryPipeline>> pipelines,
const Block & common_header,
const ExpressionActionsSettings & settings,
size_t max_threads_limit,
Processors * collected_processors)
{
if (pipelines.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot unite an empty set of pipelines");
Block common_header = pipelines.front()->getHeader();
/// Should we limit the number of threads for united pipeline. True if all pipelines have max_threads != 0.
/// If true, result max_threads will be sum(max_threads).
/// Note: it may be greater than settings.max_threads, so we should apply this limit again.
@ -229,19 +232,7 @@ QueryPipeline QueryPipeline::unitePipelines(
pipeline.checkInitialized();
pipeline.pipe.collected_processors = collected_processors;
if (!pipeline.isCompleted())
{
auto actions_dag = ActionsDAG::makeConvertingActions(
pipeline.getHeader().getColumnsWithTypeAndName(),
common_header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Position);
auto actions = std::make_shared<ExpressionActions>(actions_dag, settings);
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, actions);
});
}
assertBlocksHaveEqualStructure(pipeline.getHeader(), common_header, "QueryPipeline::unitePipelines");
pipes.emplace_back(std::move(pipeline.pipe));
@ -255,7 +246,7 @@ QueryPipeline QueryPipeline::unitePipelines(
}
QueryPipeline pipeline;
pipeline.init(Pipe::unitePipes(std::move(pipes), collected_processors));
pipeline.init(Pipe::unitePipes(std::move(pipes), collected_processors, false));
if (will_limit_max_threads)
{
@ -289,7 +280,9 @@ void QueryPipeline::addCreatingSetsTransform(const Block & res_header, SubqueryF
void QueryPipeline::addPipelineBefore(QueryPipeline pipeline)
{
checkInitializedAndNotCompleted();
assertBlocksHaveEqualStructure(getHeader(), pipeline.getHeader(), "QueryPipeline");
if (pipeline.getHeader())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline for CreatingSets should have empty header. Got: {}",
pipeline.getHeader().dumpStructure());
IProcessor::PortNumbers delayed_streams(pipe.numOutputPorts());
for (size_t i = 0; i < delayed_streams.size(); ++i)
@ -300,7 +293,7 @@ void QueryPipeline::addPipelineBefore(QueryPipeline pipeline)
Pipes pipes;
pipes.emplace_back(std::move(pipe));
pipes.emplace_back(QueryPipeline::getPipe(std::move(pipeline)));
pipe = Pipe::unitePipes(std::move(pipes), collected_processors);
pipe = Pipe::unitePipes(std::move(pipes), collected_processors, true);
auto processor = std::make_shared<DelayedPortsProcessor>(getHeader(), pipe.numOutputPorts(), delayed_streams, true);
addTransform(std::move(processor));

View File

@ -90,13 +90,12 @@ public:
/// If collector is used, it will collect only newly-added processors, but not processors from pipelines.
static QueryPipeline unitePipelines(
std::vector<std::unique_ptr<QueryPipeline>> pipelines,
const Block & common_header,
const ExpressionActionsSettings & settings,
size_t max_threads_limit = 0,
Processors * collected_processors = nullptr);
/// Add other pipeline and execute it before current one.
/// Pipeline must have same header.
/// Pipeline must have empty header, it should not generate any chunk.
/// This is used for CreatingSets.
void addPipelineBefore(QueryPipeline pipeline);
void addCreatingSetsTransform(const Block & res_header, SubqueryForSet subquery_for_set, const SizeLimits & limits, const Context & context);

View File

@ -1,42 +0,0 @@
#include <Processors/QueryPlan/AddingDelayedSourceStep.h>
#include <Processors/QueryPipeline.h>
namespace DB
{
/// Traits used by AddingDelayedSourceStep when constructing its ITransformingStep base.
/// Every flag listed below is set to false: the step makes no preservation guarantees
/// (distinct columns, single stream, number of streams, sorting, number of rows).
static ITransformingStep::Traits getTraits()
{
return ITransformingStep::Traits
{
/// First group of flags: properties of the data stream(s).
{
.preserves_distinct_columns = false,
.returns_single_stream = false,
.preserves_number_of_streams = false,
.preserves_sorting = false,
},
/// Second group of flags: properties of the transformation itself.
{
.preserves_number_of_rows = false, /// New rows are added from delayed stream
}
};
}
/// Constructs the step.
/// The input stream's header is forwarded unchanged as the second ITransformingStep argument
/// (presumably the output header, i.e. the step does not change the block structure).
/// `source_` is moved into the `source` member and later attached to the pipeline as a delayed stream.
AddingDelayedSourceStep::AddingDelayedSourceStep(
const DataStream & input_stream_,
ProcessorPtr source_)
: ITransformingStep(input_stream_, input_stream_.header, getTraits())
, source(std::move(source_))
{
}
/// Attaches the stored source to `pipeline` as a delayed stream.
/// The build settings argument is unused.
void AddingDelayedSourceStep::transformPipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &)
{
/// Mark the source processor as belonging to this query plan step.
source->setQueryPlanStep(this);
pipeline.addDelayedStream(source);
/// Now, after adding delayed stream, it has implicit dependency on other port.
/// Here we add resize processor to remove this dependency.
/// Otherwise, if we add MergeSorting + MergingSorted transform to pipeline, we could get `Pipeline stuck`
/// The resize keeps the number of streams unchanged (it is resized to its current stream count).
pipeline.resize(pipeline.getNumStreams(), true);
}
}

View File

@ -1,28 +0,0 @@
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <DataStreams/SizeLimits.h>
namespace DB
{
class IProcessor;
using ProcessorPtr = std::shared_ptr<IProcessor>;
/// Adds another source to pipeline. Data from this source will be read after data from all other sources.
/// NOTE: this step is needed because of non-joined data from JOIN. Remove this step after adding JoinStep.
class AddingDelayedSourceStep : public ITransformingStep
{
public:
/// @param input_stream_ Stream this step is applied to.
/// @param source_ Processor that will be added to the pipeline as the delayed source.
AddingDelayedSourceStep(
const DataStream & input_stream_,
ProcessorPtr source_);
/// Name of the step.
String getName() const override { return "AddingDelayedSource"; }
/// Adds `source` to the pipeline as a delayed stream.
void transformPipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &) override;
private:
/// The delayed source processor handed over in the constructor.
ProcessorPtr source;
};
}

View File

@ -30,12 +30,11 @@ static ITransformingStep::Traits getTraits()
CreatingSetStep::CreatingSetStep(
const DataStream & input_stream_,
Block header,
String description_,
SubqueryForSet subquery_for_set_,
SizeLimits network_transfer_limits_,
const Context & context_)
: ITransformingStep(input_stream_, header, getTraits())
: ITransformingStep(input_stream_, Block{}, getTraits())
, description(std::move(description_))
, subquery_for_set(std::move(subquery_for_set_))
, network_transfer_limits(std::move(network_transfer_limits_))
@ -70,10 +69,12 @@ CreatingSetsStep::CreatingSetsStep(DataStreams input_streams_)
output_stream = input_streams.front();
for (size_t i = 1; i < input_streams.size(); ++i)
assertBlocksHaveEqualStructure(output_stream->header, input_streams[i].header, "CreatingSets");
if (input_streams[i].header)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Creating set input must have empty header. Got: {}",
input_streams[i].header.dumpStructure());
}
QueryPipelinePtr CreatingSetsStep::updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings & settings)
QueryPipelinePtr CreatingSetsStep::updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings &)
{
if (pipelines.empty())
throw Exception("CreatingSetsStep cannot be created with no inputs", ErrorCodes::LOGICAL_ERROR);
@ -82,14 +83,13 @@ QueryPipelinePtr CreatingSetsStep::updatePipeline(QueryPipelines pipelines, cons
if (pipelines.size() == 1)
return main_pipeline;
std::swap(pipelines.front(), pipelines.back());
pipelines.pop_back();
pipelines.erase(pipelines.begin());
QueryPipeline delayed_pipeline;
if (pipelines.size() > 1)
{
QueryPipelineProcessorsCollector collector(delayed_pipeline, this);
delayed_pipeline = QueryPipeline::unitePipelines(std::move(pipelines), output_stream->header, settings.getActionsSettings());
delayed_pipeline = QueryPipeline::unitePipelines(std::move(pipelines));
processors = collector.detachProcessors();
}
else
@ -129,7 +129,6 @@ void addCreatingSetsStep(
auto creating_set = std::make_unique<CreatingSetStep>(
plan->getCurrentDataStream(),
input_streams.front().header,
std::move(description),
std::move(set),
limits,

View File

@ -12,7 +12,6 @@ class CreatingSetStep : public ITransformingStep
public:
CreatingSetStep(
const DataStream & input_stream_,
Block header,
String description_,
SubqueryForSet subquery_for_set_,
SizeLimits network_transfer_limits_,
@ -38,7 +37,7 @@ public:
String getName() const override { return "CreatingSets"; }
QueryPipelinePtr updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings & settings) override;
QueryPipelinePtr updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings &) override;
void describePipeline(FormatSettings & settings) const override;

View File

@ -4,6 +4,8 @@
#include <Processors/Transforms/JoiningTransform.h>
#include <Interpreters/ExpressionActions.h>
#include <IO/Operators.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Interpreters/JoinSwitcher.h>
namespace DB
{
@ -108,12 +110,14 @@ void ExpressionStep::describeActions(FormatSettings & settings) const
settings.out << '\n';
}
JoinStep::JoinStep(const DataStream & input_stream_, JoinPtr join_)
JoinStep::JoinStep(const DataStream & input_stream_, JoinPtr join_, bool has_non_joined_rows_, size_t max_block_size_)
: ITransformingStep(
input_stream_,
Transform::transformHeader(input_stream_.header, join_),
getJoinTraits())
, join(std::move(join_))
, has_non_joined_rows(has_non_joined_rows_)
, max_block_size(max_block_size_)
{
}
@ -132,6 +136,21 @@ void JoinStep::transformPipeline(QueryPipeline & pipeline, const BuildQueryPipel
bool on_totals = stream_type == QueryPipeline::StreamType::Totals;
return std::make_shared<Transform>(header, join, on_totals, add_default_totals);
});
if (has_non_joined_rows)
{
const Block & join_result_sample = pipeline.getHeader();
auto stream = std::make_shared<LazyNonJoinedBlockInputStream>(*join, join_result_sample, max_block_size);
auto source = std::make_shared<SourceFromInputStream>(std::move(stream));
source->setQueryPlanStep(this);
pipeline.addDelayedStream(source);
/// Now, after adding delayed stream, it has implicit dependency on other port.
/// Here we add resize processor to remove this dependency.
/// Otherwise, if we add MergeSorting + MergingSorted transform to pipeline, we could get `Pipeline stuck`
pipeline.resize(pipeline.getNumStreams(), true);
}
}
}

View File

@ -40,13 +40,17 @@ class JoinStep : public ITransformingStep
public:
using Transform = JoiningTransform;
explicit JoinStep(const DataStream & input_stream_, JoinPtr join_);
explicit JoinStep(const DataStream & input_stream_, JoinPtr join_, bool has_non_joined_rows_, size_t max_block_size_);
String getName() const override { return "Join"; }
void transformPipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &) override;
const JoinPtr & getJoin() const { return join; }
private:
JoinPtr join;
bool has_non_joined_rows;
size_t max_block_size;
};
}

View File

@ -1,5 +1,6 @@
#pragma once
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <array>
namespace DB
@ -23,6 +24,7 @@ struct Optimization
using Function = size_t (*)(QueryPlan::Node *, QueryPlan::Nodes &);
const Function apply = nullptr;
const char * name;
const bool QueryPlanOptimizationSettings::* const is_enabled;
};
/// Move ARRAY JOIN up if possible.
@ -46,11 +48,11 @@ inline const auto & getOptimizations()
{
static const std::array<Optimization, 5> optimizations =
{{
{tryLiftUpArrayJoin, "liftUpArrayJoin"},
{tryPushDownLimit, "pushDownLimit"},
{trySplitFilter, "splitFilter"},
{tryMergeExpressions, "mergeExpressions"},
{tryPushDownFilter, "pushDownFilter"},
{tryLiftUpArrayJoin, "liftUpArrayJoin", &QueryPlanOptimizationSettings::optimize_plan},
{tryPushDownLimit, "pushDownLimit", &QueryPlanOptimizationSettings::optimize_plan},
{trySplitFilter, "splitFilter", &QueryPlanOptimizationSettings::optimize_plan},
{tryMergeExpressions, "mergeExpressions", &QueryPlanOptimizationSettings::optimize_plan},
{tryPushDownFilter, "pushDownFilter", &QueryPlanOptimizationSettings::filter_push_down},
}};
return optimizations;

View File

@ -8,7 +8,9 @@ namespace DB
/// Builds QueryPlanOptimizationSettings from the corresponding `query_plan_*` settings:
///   query_plan_enable_optimizations        -> optimize_plan
///   query_plan_filter_push_down            -> filter_push_down
///   query_plan_max_optimizations_to_apply  -> max_optimizations_to_apply
QueryPlanOptimizationSettings QueryPlanOptimizationSettings::fromSettings(const Settings & from)
{
    QueryPlanOptimizationSettings result;

    /// Switches that enable or disable optimizations.
    result.optimize_plan = from.query_plan_enable_optimizations;
    result.filter_push_down = from.query_plan_filter_push_down;

    /// Limit on the total number of applied optimizations.
    result.max_optimizations_to_apply = from.query_plan_max_optimizations_to_apply;

    return result;
}

View File

@ -14,6 +14,12 @@ struct QueryPlanOptimizationSettings
/// It helps to avoid infinite optimization loop.
size_t max_optimizations_to_apply = 0;
/// If disabled, no optimization applied.
bool optimize_plan = true;
/// If filter push down optimization is enabled.
bool filter_push_down = true;
static QueryPlanOptimizationSettings fromSettings(const Settings & from);
static QueryPlanOptimizationSettings fromContext(const Context & from);
};

View File

@ -4,6 +4,7 @@
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/ArrayJoinStep.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <Processors/QueryPlan/CubeStep.h>
#include <Processors/QueryPlan/FinishSortingStep.h>
#include <Processors/QueryPlan/MergeSortingStep.h>
@ -11,8 +12,10 @@
#include <Processors/QueryPlan/PartialSortingStep.h>
#include <Processors/QueryPlan/TotalsHavingStep.h>
#include <Processors/QueryPlan/DistinctStep.h>
#include <Processors/QueryPlan/UnionStep.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/ArrayJoinAction.h>
#include <Interpreters/TableJoin.h>
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeAggregateFunction.h>
@ -82,7 +85,7 @@ static size_t tryAddNewFilterStep(
return 3;
}
static Names getAggregatinKeys(const Aggregator::Params & params)
static Names getAggregatingKeys(const Aggregator::Params & params)
{
Names keys;
keys.reserve(params.keys.size());
@ -112,17 +115,36 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes
if (auto * aggregating = typeid_cast<AggregatingStep *>(child.get()))
{
const auto & params = aggregating->getParams();
Names keys = getAggregatinKeys(params);
Names keys = getAggregatingKeys(params);
if (auto updated_steps = tryAddNewFilterStep(parent_node, nodes, keys))
return updated_steps;
}
if (typeid_cast<CreatingSetsStep *>(child.get()))
{
/// CreatingSets does not change header.
/// We can push down filter and update header.
/// - Something
/// Filter - CreatingSets - CreatingSet
/// - CreatingSet
auto input_streams = child->getInputStreams();
input_streams.front() = filter->getOutputStream();
child = std::make_unique<CreatingSetsStep>(input_streams);
std::swap(parent, child);
std::swap(parent_node->children, child_node->children);
std::swap(parent_node->children.front(), child_node->children.front());
/// - Filter - Something
/// CreatingSets - CreatingSet
/// - CreatingSet
return 2;
}
if (auto * totals_having = typeid_cast<TotalsHavingStep *>(child.get()))
{
/// If totals step has HAVING expression, skip it for now.
/// TODO:
/// We can merge HAING expression with current filer.
/// We can merge HAVING expression with current filter.
/// Also, we can push down part of HAVING which depend only on aggregation keys.
if (totals_having->getActions())
return 0;
@ -168,6 +190,36 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes
return updated_steps;
}
if (auto * join = typeid_cast<JoinStep *>(child.get()))
{
const auto & table_join = join->getJoin()->getTableJoin();
/// Push down is for left table only. We need to update JoinStep for push down into right.
/// Only inner and left join are supported. Other types may generate default values for left table keys.
/// So, if we push down a condition like `key != 0`, not all rows may be filtered.
if (table_join.kind() == ASTTableJoin::Kind::Inner || table_join.kind() == ASTTableJoin::Kind::Left)
{
const auto & left_header = join->getInputStreams().front().header;
const auto & res_header = join->getOutputStream().header;
Names allowed_keys;
for (const auto & name : table_join.keyNamesLeft())
{
/// Skip key if it is renamed.
/// I don't know if it is possible. Just in case.
if (!left_header.has(name) || !res_header.has(name))
continue;
/// Skip if type is changed. Push down expression expects equal types.
if (!left_header.getByName(name).type->equals(*res_header.getByName(name).type))
continue;
allowed_keys.push_back(name);
}
if (auto updated_steps = tryAddNewFilterStep(parent_node, nodes, allowed_keys))
return updated_steps;
}
}
/// TODO.
/// We can filter earlier if expression does not depend on WITH FILL columns.
/// But we cannot just push down condition, because other column may be filled with defaults.
@ -193,6 +245,48 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes
return updated_steps;
}
if (auto * union_step = typeid_cast<UnionStep *>(child.get()))
{
/// Union does not change header.
/// We can push down filter and update header.
auto union_input_streams = child->getInputStreams();
for (auto & input_stream : union_input_streams)
input_stream.header = filter->getOutputStream().header;
/// - Something
/// Filter - Union - Something
/// - Something
child = std::make_unique<UnionStep>(union_input_streams, union_step->getMaxThreads());
std::swap(parent, child);
std::swap(parent_node->children, child_node->children);
std::swap(parent_node->children.front(), child_node->children.front());
/// - Filter - Something
/// Union - Something
/// - Something
for (size_t i = 1; i < parent_node->children.size(); ++i)
{
auto & filter_node = nodes.emplace_back();
filter_node.children.push_back(parent_node->children[i]);
parent_node->children[i] = &filter_node;
filter_node.step = std::make_unique<FilterStep>(
filter_node.children.front()->step->getOutputStream(),
filter->getExpression()->clone(),
filter->getFilterColumnName(),
filter->removesFilterColumn());
}
/// - Filter - Something
/// Union - Filter - Something
/// - Filter - Something
return 3;
}
return 0;
}

View File

@ -16,6 +16,9 @@ namespace QueryPlanOptimizations
void optimizeTree(const QueryPlanOptimizationSettings & settings, QueryPlan::Node & root, QueryPlan::Nodes & nodes)
{
if (!settings.optimize_plan)
return;
const auto & optimizations = getOptimizations();
struct Frame
@ -63,6 +66,9 @@ void optimizeTree(const QueryPlanOptimizationSettings & settings, QueryPlan::Nod
/// Apply all optimizations.
for (const auto & optimization : optimizations)
{
if (!(settings.*(optimization.is_enabled)))
continue;
/// Just in case, skip optimization if it is not initialized.
if (!optimization.apply)
continue;

View File

@ -6,8 +6,25 @@
namespace DB
{
UnionStep::UnionStep(DataStreams input_streams_, Block result_header, size_t max_threads_)
: header(std::move(result_header))
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// Checks that all input streams of a UnionStep have the same header and returns it.
///
/// The header of the first stream is taken as the reference; every stream (including the first)
/// is compared against it with assertBlocksHaveEqualStructure.
/// Throws LOGICAL_ERROR when `input_streams` is empty.
static Block checkHeaders(const DataStreams & input_streams)
{
    if (input_streams.empty())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot unite an empty set of query plan steps");

    Block common_header = input_streams.front().header;

    for (auto it = input_streams.begin(); it != input_streams.end(); ++it)
        assertBlocksHaveEqualStructure(it->header, common_header, "UnionStep");

    return common_header;
}
UnionStep::UnionStep(DataStreams input_streams_, size_t max_threads_)
: header(checkHeaders(input_streams_))
, max_threads(max_threads_)
{
input_streams = std::move(input_streams_);
@ -18,7 +35,7 @@ UnionStep::UnionStep(DataStreams input_streams_, Block result_header, size_t max
output_stream = DataStream{.header = header};
}
QueryPipelinePtr UnionStep::updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings & settings)
QueryPipelinePtr UnionStep::updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings &)
{
auto pipeline = std::make_unique<QueryPipeline>();
QueryPipelineProcessorsCollector collector(*pipeline, this);
@ -30,7 +47,7 @@ QueryPipelinePtr UnionStep::updatePipeline(QueryPipelines pipelines, const Build
return pipeline;
}
*pipeline = QueryPipeline::unitePipelines(std::move(pipelines), output_stream->header, settings.getActionsSettings(), max_threads);
*pipeline = QueryPipeline::unitePipelines(std::move(pipelines), max_threads);
processors = collector.detachProcessors();
return pipeline;

View File

@ -9,14 +9,16 @@ class UnionStep : public IQueryPlanStep
{
public:
/// max_threads is used to limit the number of threads for result pipeline.
UnionStep(DataStreams input_streams_, Block result_header, size_t max_threads_ = 0);
explicit UnionStep(DataStreams input_streams_, size_t max_threads_ = 0);
String getName() const override { return "Union"; }
QueryPipelinePtr updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings & settings) override;
QueryPipelinePtr updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings &) override;
void describePipeline(FormatSettings & settings) const override;
size_t getMaxThreads() const { return max_threads; }
private:
Block header;
size_t max_threads;

View File

@ -93,7 +93,6 @@ SRCS(
Pipe.cpp
Port.cpp
QueryPipeline.cpp
QueryPlan/AddingDelayedSourceStep.cpp
QueryPlan/AggregatingStep.cpp
QueryPlan/ArrayJoinStep.cpp
QueryPlan/BuildQueryPipelineSettings.cpp

View File

@ -1360,8 +1360,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
for (const auto & plan : plans)
input_streams.emplace_back(plan->getCurrentDataStream());
const auto & common_header = plans.front()->getCurrentDataStream().header;
auto union_step = std::make_unique<UnionStep>(std::move(input_streams), common_header);
auto union_step = std::make_unique<UnionStep>(std::move(input_streams));
auto plan = std::make_unique<QueryPlan>();
plan->unitePlans(std::move(union_step), std::move(plans));

View File

@ -423,7 +423,7 @@ void StorageBuffer::read(
plans.emplace_back(std::make_unique<QueryPlan>(std::move(buffers_plan)));
query_plan = QueryPlan();
auto union_step = std::make_unique<UnionStep>(std::move(input_streams), result_header);
auto union_step = std::make_unique<UnionStep>(std::move(input_streams));
union_step->setStepDescription("Unite sources from Buffer table");
query_plan.unitePlans(std::move(union_step), std::move(plans));
}

View File

@ -697,7 +697,7 @@ QueryPipelinePtr StorageDistributed::distributedWrite(const ASTInsertQuery & que
}
return std::make_unique<QueryPipeline>(
QueryPipeline::unitePipelines(std::move(pipelines), {}, ExpressionActionsSettings::fromContext(context)));
QueryPipeline::unitePipelines(std::move(pipelines)));
}

View File

@ -13,12 +13,12 @@
<query short='1' tag='ANY LEFT IN'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042)</query>
<query tag='INNER'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64 WHERE i32 = 20042</query>
<query tag='INNER KEY'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042</query>
<query tag='INNER KEY'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042 settings query_plan_filter_push_down = 0</query>
<query tag='INNER ON'>SELECT COUNT() FROM ints l INNER JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042</query>
<query tag='INNER IN'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042)</query>
<query tag='LEFT'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64 WHERE i32 = 20042</query>
<query tag='LEFT KEY'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042</query>
<query tag='LEFT KEY'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042 settings query_plan_filter_push_down = 0</query>
<query tag='LEFT ON'>SELECT COUNT() FROM ints l LEFT JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042</query>
<query tag='LEFT IN'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042)</query>

View File

@ -3,53 +3,54 @@
<settings>
<join_algorithm>partial_merge</join_algorithm>
<query_plan_filter_push_down>0</query_plan_filter_push_down>
</settings>
<fill_query>INSERT INTO ints SELECT number AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(10000)</fill_query>
<fill_query>INSERT INTO ints SELECT 10000 + number % 1000 AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(10000)</fill_query>
<fill_query>INSERT INTO ints SELECT 20000 + number % 100 AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(10000)</fill_query>
<fill_query>INSERT INTO ints SELECT 30000 + number % 10 AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(10000)</fill_query>
<fill_query>INSERT INTO ints SELECT 40000 + number % 1 AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(10000)</fill_query>
<fill_query>INSERT INTO ints SELECT number AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(10000) settings query_plan_filter_push_down = 0</fill_query>
<fill_query>INSERT INTO ints SELECT 10000 + number % 1000 AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(10000) settings query_plan_filter_push_down = 0</fill_query>
<fill_query>INSERT INTO ints SELECT 20000 + number % 100 AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(10000) settings query_plan_filter_push_down = 0</fill_query>
<fill_query>INSERT INTO ints SELECT 30000 + number % 10 AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(10000) settings query_plan_filter_push_down = 0</fill_query>
<fill_query>INSERT INTO ints SELECT 40000 + number % 1 AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(10000) settings query_plan_filter_push_down = 0</fill_query>
<query short='1' tag='ANY LEFT'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64 WHERE i32 = 20042</query>
<query short='1' tag='ANY LEFT KEY'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042</query>
<query short='1' tag='ANY LEFT ON'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042</query>
<query short='1' tag='ANY LEFT IN'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042)</query>
<query short='1' tag='ANY LEFT'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64 WHERE i32 = 20042 settings query_plan_filter_push_down = 0</query>
<query short='1' tag='ANY LEFT KEY'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042 settings query_plan_filter_push_down = 0</query>
<query short='1' tag='ANY LEFT ON'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042 settings query_plan_filter_push_down = 0</query>
<query short='1' tag='ANY LEFT IN'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042) settings query_plan_filter_push_down = 0</query>
<query tag='INNER'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64 WHERE i32 = 20042</query>
<query tag='INNER KEY'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042</query>
<query tag='INNER ON'>SELECT COUNT() FROM ints l INNER JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042</query>
<query tag='INNER IN'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042)</query>
<query tag='INNER'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64 WHERE i32 = 20042 settings query_plan_filter_push_down = 0</query>
<query tag='INNER KEY'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042 settings query_plan_filter_push_down = 0</query>
<query tag='INNER ON'>SELECT COUNT() FROM ints l INNER JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042 settings query_plan_filter_push_down = 0</query>
<query tag='INNER IN'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042) settings query_plan_filter_push_down = 0</query>
<query tag='LEFT'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64 WHERE i32 = 20042</query>
<query tag='LEFT KEY'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042</query>
<query tag='LEFT ON'>SELECT COUNT() FROM ints l LEFT JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042</query>
<query tag='LEFT IN'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042)</query>
<query tag='LEFT'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64 WHERE i32 = 20042 settings query_plan_filter_push_down = 0</query>
<query tag='LEFT KEY'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042 settings query_plan_filter_push_down = 0</query>
<query tag='LEFT ON'>SELECT COUNT() FROM ints l LEFT JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042 settings query_plan_filter_push_down = 0</query>
<query tag='LEFT IN'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042) settings query_plan_filter_push_down = 0</query>
<query short='1' tag='ANY LEFT (noopt)'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64 WHERE i32 = 20042 SETTINGS partial_merge_join_optimizations = 0</query>
<query short='1' tag='ANY LEFT KEY (noopt)'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042 SETTINGS partial_merge_join_optimizations = 0</query>
<query short='1' tag='ANY LEFT ON (noopt)'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042 SETTINGS partial_merge_join_optimizations = 0</query>
<query short='1' tag='ANY LEFT IN (noopt)'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042) SETTINGS partial_merge_join_optimizations = 0</query>
<query short='1' tag='ANY LEFT (noopt)'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64 WHERE i32 = 20042 SETTINGS partial_merge_join_optimizations = 0, query_plan_filter_push_down = 0</query>
<query short='1' tag='ANY LEFT KEY (noopt)'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042 SETTINGS partial_merge_join_optimizations = 0, query_plan_filter_push_down = 0</query>
<query short='1' tag='ANY LEFT ON (noopt)'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042 SETTINGS partial_merge_join_optimizations = 0, query_plan_filter_push_down = 0</query>
<query short='1' tag='ANY LEFT IN (noopt)'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042) SETTINGS partial_merge_join_optimizations = 0, query_plan_filter_push_down = 0</query>
<query tag='INNER (noopt)'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64 WHERE i32 = 20042 SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='INNER KEY (noopt)'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042 SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='INNER ON (noopt)'>SELECT COUNT() FROM ints l INNER JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042 SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='INNER IN (noopt)'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042) SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='INNER (noopt)'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64 WHERE i32 = 20042 SETTINGS partial_merge_join_optimizations = 0, query_plan_filter_push_down = 0</query>
<query tag='INNER KEY (noopt)'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042 SETTINGS partial_merge_join_optimizations = 0, query_plan_filter_push_down = 0</query>
<query tag='INNER ON (noopt)'>SELECT COUNT() FROM ints l INNER JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042 SETTINGS partial_merge_join_optimizations = 0, query_plan_filter_push_down = 0</query>
<query tag='INNER IN (noopt)'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042) SETTINGS partial_merge_join_optimizations = 0, query_plan_filter_push_down = 0</query>
<query tag='LEFT (noopt)'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64 WHERE i32 = 20042 SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='LEFT KEY (noopt)'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042 SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='LEFT ON (noopt)'>SELECT COUNT() FROM ints l LEFT JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042 SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='LEFT IN (noopt)'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042) SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='LEFT (noopt)'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64 WHERE i32 = 20042 SETTINGS partial_merge_join_optimizations = 0, query_plan_filter_push_down = 0</query>
<query tag='LEFT KEY (noopt)'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042 SETTINGS partial_merge_join_optimizations = 0, query_plan_filter_push_down = 0</query>
<query tag='LEFT ON (noopt)'>SELECT COUNT() FROM ints l LEFT JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042 SETTINGS partial_merge_join_optimizations = 0, query_plan_filter_push_down = 0</query>
<query tag='LEFT IN (noopt)'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042) SETTINGS partial_merge_join_optimizations = 0, query_plan_filter_push_down = 0</query>
<query tag='RIGHT'>SELECT COUNT() FROM ints l RIGHT JOIN ints r USING i64 WHERE i32 = 20042</query>
<query tag='RIGHT KEY'>SELECT COUNT() FROM ints l RIGHT JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042</query>
<query tag='RIGHT ON'>SELECT COUNT() FROM ints l RIGHT JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042</query>
<query tag='RIGHT IN'>SELECT COUNT() FROM ints l RIGHT JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042)</query>
<query tag='RIGHT'>SELECT COUNT() FROM ints l RIGHT JOIN ints r USING i64 WHERE i32 = 20042 settings query_plan_filter_push_down = 0</query>
<query tag='RIGHT KEY'>SELECT COUNT() FROM ints l RIGHT JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042 settings query_plan_filter_push_down = 0</query>
<query tag='RIGHT ON'>SELECT COUNT() FROM ints l RIGHT JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042 settings query_plan_filter_push_down = 0</query>
<query tag='RIGHT IN'>SELECT COUNT() FROM ints l RIGHT JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042) settings query_plan_filter_push_down = 0</query>
<query tag='FULL'>SELECT COUNT() FROM ints l FULL JOIN ints r USING i64 WHERE i32 = 20042</query>
<query tag='FULL KEY'>SELECT COUNT() FROM ints l FULL JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042</query>
<query tag='FULL ON'>SELECT COUNT() FROM ints l FULL JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042</query>
<query tag='FULL IN'>SELECT COUNT() FROM ints l FULL JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042)</query>
<query tag='FULL'>SELECT COUNT() FROM ints l FULL JOIN ints r USING i64 WHERE i32 = 20042 settings query_plan_filter_push_down = 0</query>
<query tag='FULL KEY'>SELECT COUNT() FROM ints l FULL JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042 settings query_plan_filter_push_down = 0</query>
<query tag='FULL ON'>SELECT COUNT() FROM ints l FULL JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042 settings query_plan_filter_push_down = 0</query>
<query tag='FULL IN'>SELECT COUNT() FROM ints l FULL JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042) settings query_plan_filter_push_down = 0</query>
<drop_query>DROP TABLE IF EXISTS ints</drop_query>
</test>

View File

@ -123,3 +123,26 @@ Filter column: notEquals(y, 2)
3 10
0 37
> filter is pushed down before CreatingSets
CreatingSets
Filter
Filter
1
3
> one condition of filter is pushed down before LEFT JOIN
Join
Filter column: notEquals(number, 1)
Join
0 0
3 3
> one condition of filter is pushed down before INNER JOIN
Join
Filter column: notEquals(number, 1)
Join
3 3
> filter is pushed down before UNION
Union
Filter
Filter
2 3
2 3

View File

@ -150,3 +150,49 @@ $CLICKHOUSE_CLIENT -q "
select * from (
select y, sum(x) from (select number as x, number % 4 as y from numbers(10)) group by y with totals
) where y != 2"
# Case: outer filter (number != 2) over a subquery that itself filters with IN (subquery).
# 1st command: EXPLAIN the query and keep only the "CreatingSets" / "Filter" tokens (grep -o),
#              so that only the order of those tokens in the plan is printed.
# 2nd command: run the same query to print the resulting rows.
# NOTE(review): enable_optimize_predicate_expression=0 presumably turns off the AST-level predicate
# rewrite so that only the query-plan push-down is exercised - confirm.
echo "> filter is pushed down before CreatingSets"
$CLICKHOUSE_CLIENT -q "
explain select number from (
select number from numbers(5) where number in (select 1 + number from numbers(3))
) where number != 2 settings enable_optimize_predicate_expression=0" |
grep -o "CreatingSets\|Filter"
$CLICKHOUSE_CLIENT -q "
select number from (
select number from numbers(5) where number in (select 1 + number from numbers(3))
) where number != 2 settings enable_optimize_predicate_expression=0"
# Case: WHERE has two conjuncts over an ANY LEFT JOIN: `a != 1` (a is the left side's number)
# and `b != 2` (b comes from the right subquery).
# 1st command: EXPLAIN with actions = 1, keeping only "Join" tokens and the exact text
#              "Filter column: notEquals(number, 1)" so their relative order is printed.
# 2nd command: run the same query to print the resulting rows.
# NOTE(review): enable_optimize_predicate_expression = 0 presumably turns off the AST-level
# predicate rewrite so that only the query-plan push-down is exercised - confirm.
echo "> one condition of filter is pushed down before LEFT JOIN"
$CLICKHOUSE_CLIENT -q "
explain actions = 1
select number as a, r.b from numbers(4) as l any left join (
select number + 2 as b from numbers(3)
) as r on a = r.b where a != 1 and b != 2 settings enable_optimize_predicate_expression = 0" |
grep -o "Join\|Filter column: notEquals(number, 1)"
$CLICKHOUSE_CLIENT -q "
select number as a, r.b from numbers(4) as l any left join (
select number + 2 as b from numbers(3)
) as r on a = r.b where a != 1 and b != 2 settings enable_optimize_predicate_expression = 0"
# Case: same query shape as the LEFT JOIN case, but with ANY INNER JOIN. WHERE has two conjuncts:
# `a != 1` (a is the left side's number) and `b != 2` (b comes from the right subquery).
# 1st command: EXPLAIN with actions = 1, keeping only "Join" tokens and the exact text
#              "Filter column: notEquals(number, 1)" so their relative order is printed.
# 2nd command: run the same query to print the resulting rows.
# NOTE(review): enable_optimize_predicate_expression = 0 presumably turns off the AST-level
# predicate rewrite so that only the query-plan push-down is exercised - confirm.
echo "> one condition of filter is pushed down before INNER JOIN"
$CLICKHOUSE_CLIENT -q "
explain actions = 1
select number as a, r.b from numbers(4) as l any inner join (
select number + 2 as b from numbers(3)
) as r on a = r.b where a != 1 and b != 2 settings enable_optimize_predicate_expression = 0" |
grep -o "Join\|Filter column: notEquals(number, 1)"
$CLICKHOUSE_CLIENT -q "
select number as a, r.b from numbers(4) as l any inner join (
select number + 2 as b from numbers(3)
) as r on a = r.b where a != 1 and b != 2 settings enable_optimize_predicate_expression = 0"
# Case: outer filter (a != 1) over a UNION ALL of two selects.
# Both branches compute number + 1 and number + 2, but the second branch swaps the aliases
# (number + 1 as b, number + 2 as a), so the branches' column names differ by position.
# 1st command: EXPLAIN the query and keep only the "Union" / "Filter" tokens (grep -o),
#              so that only the order of those tokens in the plan is printed.
# 2nd command: run the same query to print the resulting rows.
# NOTE(review): enable_optimize_predicate_expression = 0 presumably turns off the AST-level
# predicate rewrite so that only the query-plan push-down is exercised - confirm.
echo "> filter is pushed down before UNION"
$CLICKHOUSE_CLIENT -q "
explain select a, b from (
select number + 1 as a, number + 2 as b from numbers(2) union all select number + 1 as b, number + 2 as a from numbers(2)
) where a != 1 settings enable_optimize_predicate_expression = 0" |
grep -o "Union\|Filter"
$CLICKHOUSE_CLIENT -q "
select a, b from (
select number + 1 as a, number + 2 as b from numbers(2) union all select number + 1 as b, number + 2 as a from numbers(2)
) where a != 1 settings enable_optimize_predicate_expression = 0"