mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
Fix pushdown for join with external table
Fix possible incorrect result for queries joining and filtering table external engine (like PostgreSQL), due to too agressive filter pushdown. Since now, conditions from where section won't be send to external database in case of outer join with external table.
This commit is contained in:
parent
27d7fd05a9
commit
7781067ba1
@ -419,7 +419,7 @@ String transformQueryForExternalDatabase(
|
||||
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "No column names for query '{}' to external table '{}.{}'",
|
||||
query_info.query_tree->formatASTForErrorMessage(), database, table);
|
||||
|
||||
auto clone_query = getASTForExternalDatabaseFromQueryTree(query_info.query_tree);
|
||||
auto clone_query = getASTForExternalDatabaseFromQueryTree(query_info.query_tree, query_info.table_expression);
|
||||
|
||||
return transformQueryForExternalDatabaseImpl(
|
||||
clone_query,
|
||||
|
@ -3,6 +3,7 @@
|
||||
#include <Storages/transformQueryForExternalDatabaseAnalyzer.h>
|
||||
|
||||
#include <Parsers/ASTSelectWithUnionQuery.h>
|
||||
#include <Parsers/ASTSelectQuery.h>
|
||||
#include <Analyzer/InDepthQueryTreeVisitor.h>
|
||||
|
||||
#include <Columns/ColumnConst.h>
|
||||
@ -10,7 +11,7 @@
|
||||
#include <Analyzer/QueryNode.h>
|
||||
#include <Analyzer/ConstantNode.h>
|
||||
#include <Analyzer/ConstantValue.h>
|
||||
|
||||
#include <Analyzer/JoinNode.h>
|
||||
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
|
||||
@ -20,6 +21,7 @@ namespace DB
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int UNSUPPORTED_METHOD;
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
namespace
|
||||
@ -55,7 +57,7 @@ public:
|
||||
|
||||
}
|
||||
|
||||
ASTPtr getASTForExternalDatabaseFromQueryTree(const QueryTreeNodePtr & query_tree)
|
||||
ASTPtr getASTForExternalDatabaseFromQueryTree(const QueryTreeNodePtr & query_tree, const QueryTreeNodePtr & table_expression)
|
||||
{
|
||||
auto new_tree = query_tree->clone();
|
||||
|
||||
@ -63,6 +65,21 @@ ASTPtr getASTForExternalDatabaseFromQueryTree(const QueryTreeNodePtr & query_tre
|
||||
visitor.visit(new_tree);
|
||||
const auto * query_node = new_tree->as<QueryNode>();
|
||||
|
||||
const auto & join_tree = query_node->getJoinTree();
|
||||
bool allow_where = true;
|
||||
if (const auto * join_node = join_tree->as<JoinNode>())
|
||||
{
|
||||
if (join_node->getStrictness() != JoinStrictness::All)
|
||||
allow_where = false;
|
||||
|
||||
if (join_node->getKind() == JoinKind::Left)
|
||||
allow_where = join_node->getLeftTableExpression()->isEqual(*table_expression);
|
||||
else if (join_node->getKind() == JoinKind::Right)
|
||||
allow_where = join_node->getRightTableExpression()->isEqual(*table_expression);
|
||||
else
|
||||
allow_where = (join_node->getKind() == JoinKind::Inner);
|
||||
}
|
||||
|
||||
auto query_node_ast = query_node->toAST({ .add_cast_for_constants = false, .fully_qualified_identifiers = false });
|
||||
const IAST * ast = query_node_ast.get();
|
||||
|
||||
@ -76,7 +93,13 @@ ASTPtr getASTForExternalDatabaseFromQueryTree(const QueryTreeNodePtr & query_tre
|
||||
if (union_ast->list_of_selects->children.size() != 1)
|
||||
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "QueryNode AST is not a single ASTSelectQuery, got {}", union_ast->list_of_selects->children.size());
|
||||
|
||||
return union_ast->list_of_selects->children.at(0);
|
||||
ASTPtr select_query = union_ast->list_of_selects->children.at(0);
|
||||
auto * select_query_typed = select_query->as<ASTSelectQuery>();
|
||||
if (!select_query_typed)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected ASTSelectQuery, got {}", select_query ? select_query->formatForErrorMessage() : "nullptr");
|
||||
if (!allow_where)
|
||||
select_query_typed->setExpression(ASTSelectQuery::Expression::WHERE, nullptr);
|
||||
return select_query;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -6,6 +6,6 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
ASTPtr getASTForExternalDatabaseFromQueryTree(const QueryTreeNodePtr & query_tree);
|
||||
ASTPtr getASTForExternalDatabaseFromQueryTree(const QueryTreeNodePtr & query_tree, const QueryTreeNodePtr & table_expression);
|
||||
|
||||
}
|
||||
|
@ -834,6 +834,60 @@ def test_literal_escaping(started_cluster):
|
||||
cursor.execute(f"DROP TABLE escaping")
|
||||
|
||||
|
||||
def test_filter_pushdown(started_cluster):
|
||||
cursor = started_cluster.postgres_conn.cursor()
|
||||
cursor.execute("CREATE SCHEMA test_filter_pushdown")
|
||||
cursor.execute(
|
||||
"CREATE TABLE test_filter_pushdown.test_table (id integer, value integer)"
|
||||
)
|
||||
cursor.execute(
|
||||
"INSERT INTO test_filter_pushdown.test_table VALUES (1, 10), (1, 110), (2, 0), (3, 33), (4, 0)"
|
||||
)
|
||||
|
||||
node1.query(
|
||||
"""
|
||||
CREATE TABLE test_filter_pushdown_pg_table (id UInt32, value UInt32)
|
||||
ENGINE PostgreSQL('postgres1:5432', 'postgres', 'test_table', 'postgres', 'mysecretpassword', 'test_filter_pushdown');
|
||||
"""
|
||||
)
|
||||
|
||||
node1.query(
|
||||
"""
|
||||
CREATE TABLE test_filter_pushdown_local_table (id UInt32, value UInt32) ENGINE Memory AS SELECT * FROM test_filter_pushdown_pg_table
|
||||
"""
|
||||
)
|
||||
|
||||
node1.query(
|
||||
"CREATE TABLE ch_table (id UInt32, pg_id UInt32) ENGINE MergeTree ORDER BY id"
|
||||
)
|
||||
node1.query("INSERT INTO ch_table VALUES (1, 1), (2, 2), (3, 1), (4, 2), (5, 999)")
|
||||
|
||||
def compare_results(query, **kwargs):
|
||||
result1 = node1.query(
|
||||
query.format(pg_table="test_filter_pushdown_pg_table", **kwargs)
|
||||
)
|
||||
result2 = node1.query(
|
||||
query.format(pg_table="test_filter_pushdown_local_table", **kwargs)
|
||||
)
|
||||
assert result1 == result2
|
||||
|
||||
for kind in ["INNER", "LEFT", "RIGHT", "FULL"]:
|
||||
for value in [0, 10]:
|
||||
compare_results(
|
||||
"SELECT * FROM ch_table {kind} JOIN {pg_table} as p ON ch_table.pg_id = p.id WHERE value = {value} ORDER BY ALL",
|
||||
kind=kind,
|
||||
value=value,
|
||||
)
|
||||
|
||||
compare_results(
|
||||
"SELECT * FROM {pg_table} as p {kind} JOIN ch_table ON ch_table.pg_id = p.id WHERE value = {value} ORDER BY ALL",
|
||||
kind=kind,
|
||||
value=value,
|
||||
)
|
||||
|
||||
cursor.execute("DROP SCHEMA test_filter_pushdown CASCADE")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
cluster.start()
|
||||
input("Cluster created, press any key to destroy...")
|
||||
|
Loading…
Reference in New Issue
Block a user