Fix filter pushdown for full_sorting_merge join

This commit is contained in:
vdimir 2023-08-22 15:06:50 +00:00
parent 1822012180
commit b23773380c
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862
6 changed files with 43 additions and 4 deletions

View File

@ -2191,8 +2191,8 @@ ActionsDAGPtr ActionsDAG::cloneActionsForFilterPushDown(
/// Replace predicate result to constant 1.
Node node;
node.type = ActionType::COLUMN;
node.result_name = std::move(predicate->result_name);
node.result_type = std::move(predicate->result_type);
node.result_name = predicate->result_name;
node.result_type = predicate->result_type;
node.column = node.result_type->createColumnConst(0, 1);
if (predicate->type != ActionType::INPUT)

View File

@ -201,5 +201,9 @@ void CreateSetAndFilterOnTheFlyStep::updateOutputStream()
output_stream = createOutputStream(input_streams.front(), input_streams.front().header, getDataStreamTraits());
}
bool CreateSetAndFilterOnTheFlyStep::isColumnPartOfSetKey(const String & column_name) const
{
return std::find(column_names.begin(), column_names.end(), column_name) != column_names.end();
}
}

View File

@ -35,6 +35,8 @@ public:
SetWithStatePtr getSet() const { return own_set; }
bool isColumnPartOfSetKey(const String & column_name) const;
/// Set for another stream.
void setFiltering(SetWithStatePtr filtering_set_) { filtering_set = filtering_set_; }

View File

@ -428,8 +428,15 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes
return updated_steps;
}
if (auto updated_steps = simplePushDownOverStep<CreateSetAndFilterOnTheFlyStep>(parent_node, nodes, child))
return updated_steps;
if (const auto * join_filter_set_step = typeid_cast<CreateSetAndFilterOnTheFlyStep *>(child.get()))
{
const auto & filter_column_name = assert_cast<const FilterStep *>(parent_node->step.get())->getFilterColumnName();
bool can_remove_filter = !join_filter_set_step->isColumnPartOfSetKey(filter_column_name);
Names allowed_inputs = child->getOutputStream().header.getNames();
if (auto updated_steps = tryAddNewFilterStep(parent_node, nodes, allowed_inputs, can_remove_filter))
return updated_steps;
}
if (auto * union_step = typeid_cast<UnionStep *>(child.get()))
{

View File

@ -0,0 +1,8 @@
1
1
1
1 1
1
1
1
1 1

View File

@ -0,0 +1,18 @@
DROP TABLE IF EXISTS t1;
CREATE TABLE t1 (key UInt8) ENGINE = Memory;
INSERT INTO t1 VALUES (1),(2);
SET join_algorithm = 'full_sorting_merge';
SELECT key FROM ( SELECT key FROM t1 ) AS t1 JOIN ( SELECT key FROM t1 ) AS t2 ON t1.key = t2.key WHERE key;
SELECT key FROM ( SELECT 1 AS key ) AS t1 JOIN ( SELECT 1 AS key ) AS t2 ON t1.key = t2.key WHERE key;
SELECT * FROM ( SELECT 1 AS key GROUP BY NULL ) AS t1 INNER JOIN (SELECT 1 AS key) AS t2 ON t1.key = t2.key WHERE t1.key ORDER BY key;
SET max_rows_in_set_to_optimize_join = 0;
SELECT key FROM ( SELECT key FROM t1 ) AS t1 JOIN ( SELECT key FROM t1 ) AS t2 ON t1.key = t2.key WHERE key;
SELECT key FROM ( SELECT 1 AS key ) AS t1 JOIN ( SELECT 1 AS key ) AS t2 ON t1.key = t2.key WHERE key;
SELECT * FROM ( SELECT 1 AS key GROUP BY NULL ) AS t1 INNER JOIN (SELECT 1 AS key) AS t2 ON t1.key = t2.key WHERE t1.key ORDER BY key;
DROP TABLE IF EXISTS t1;