diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 3b91e26cd4f..a2e131dd0b8 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -251,6 +251,7 @@ class IColumn; M(String, metrics_perf_events_list, "", "Comma separated list of perf metrics that will be measured throughout queries' execution. Empty means all events. See PerfEventInfo in sources for the available events.", 0) \ M(Float, opentelemetry_start_trace_probability, 0., "Probability to start an OpenTelemetry trace for an incoming query.", 0) \ M(Bool, prefer_column_name_to_alias, false, "Prefer using column names instead of aliases if possible.", 0) \ + M(Bool, prefer_global_in_and_join, false, "If enabled, all IN/JOIN operators will be rewritten as GLOBAL IN/JOIN. It's useful when the to-be-joined tables are only available on the initiator and we need to always scatter their data on-the-fly during distributed processing with the GLOBAL keyword. It's also useful to reduce the need to access the external sources joining external tables.", 0) \ \ \ /** Limits during query execution are part of the settings. \ diff --git a/src/Interpreters/GlobalSubqueriesVisitor.h b/src/Interpreters/GlobalSubqueriesVisitor.h index d3efb935fea..5d92f4f8b6f 100644 --- a/src/Interpreters/GlobalSubqueriesVisitor.h +++ b/src/Interpreters/GlobalSubqueriesVisitor.h @@ -192,7 +192,9 @@ private: /// GLOBAL IN static void visit(ASTFunction & func, ASTPtr &, Data & data) { - if (func.name == "globalIn" || func.name == "globalNotIn") + if ((data.getContext()->getSettingsRef().prefer_global_in_and_join + && (func.name == "in" || func.name == "notIn" || func.name == "nullIn" || func.name == "notNullIn")) + || func.name == "globalIn" || func.name == "globalNotIn" || func.name == "globalNullIn" || func.name == "globalNotNullIn") { ASTPtr & ast = func.arguments->children[1]; @@ -201,8 +203,12 @@ private: { if (func.name == "globalIn") func.name = "in"; - else + else if (func.name == "globalNotIn") func.name = "notIn"; + else if (func.name == "globalNullIn") + func.name = "nullIn"; + else if (func.name == "globalNotNullIn") + func.name = "notNullIn"; return; } @@ -214,7 +220,9 @@ private: /// GLOBAL JOIN static void visit(ASTTablesInSelectQueryElement & table_elem, ASTPtr &, Data & data) { - if (table_elem.table_join && table_elem.table_join->as().locality == ASTTableJoin::Locality::Global) + if (table_elem.table_join + && (table_elem.table_join->as().locality == ASTTableJoin::Locality::Global + || data.getContext()->getSettingsRef().prefer_global_in_and_join)) { data.addExternalStorage(table_elem.table_expression, true); data.has_global_subqueries = true; diff --git a/src/Interpreters/InJoinSubqueriesPreprocessor.cpp b/src/Interpreters/InJoinSubqueriesPreprocessor.cpp index 3c1e9c19db7..1fad674f256 100644 --- a/src/Interpreters/InJoinSubqueriesPreprocessor.cpp +++ b/src/Interpreters/InJoinSubqueriesPreprocessor.cpp @@ -70,13 +70,24 @@ private: if (!storage || !checker.hasAtLeastTwoShards(*storage)) return; - if (distributed_product_mode == DistributedProductMode::DENY) + if (distributed_product_mode == DistributedProductMode::LOCAL) { - throw Exception("Double-distributed IN/JOIN subqueries is denied (distributed_product_mode = 'deny')." - " You may rewrite query to use local tables in subqueries, or use GLOBAL keyword, or set distributed_product_mode to suitable value.", - ErrorCodes::DISTRIBUTED_IN_JOIN_SUBQUERY_DENIED); + /// Convert distributed table to corresponding remote table. + + std::string database; + std::string table; + std::tie(database, table) = checker.getRemoteDatabaseAndTableName(*storage); + + String alias = database_and_table->tryGetAlias(); + if (alias.empty()) + throw Exception("Distributed table should have an alias when distributed_product_mode set to local", + ErrorCodes::DISTRIBUTED_IN_JOIN_SUBQUERY_DENIED); + + auto & identifier = database_and_table->as(); + renamed_tables.emplace_back(identifier.clone()); + identifier.resetTable(database, table); } - else if (distributed_product_mode == DistributedProductMode::GLOBAL) + else if (getContext()->getSettingsRef().prefer_global_in_and_join || distributed_product_mode == DistributedProductMode::GLOBAL) { if (function) { @@ -98,22 +109,11 @@ private: else throw Exception("Logical error: unexpected AST node", ErrorCodes::LOGICAL_ERROR); } - else if (distributed_product_mode == DistributedProductMode::LOCAL) + else if (distributed_product_mode == DistributedProductMode::DENY) { - /// Convert distributed table to corresponding remote table. - - std::string database; - std::string table; - std::tie(database, table) = checker.getRemoteDatabaseAndTableName(*storage); - - String alias = database_and_table->tryGetAlias(); - if (alias.empty()) - throw Exception("Distributed table should have an alias when distributed_product_mode set to local", - ErrorCodes::DISTRIBUTED_IN_JOIN_SUBQUERY_DENIED); - - auto & identifier = database_and_table->as(); - renamed_tables.emplace_back(identifier.clone()); - identifier.resetTable(database, table); + throw Exception("Double-distributed IN/JOIN subqueries is denied (distributed_product_mode = 'deny')." + " You may rewrite query to use local tables in subqueries, or use GLOBAL keyword, or set distributed_product_mode to suitable value.", + ErrorCodes::DISTRIBUTED_IN_JOIN_SUBQUERY_DENIED); } else throw Exception("InJoinSubqueriesPreprocessor: unexpected value of 'distributed_product_mode' setting", diff --git a/tests/queries/0_stateless/01824_prefer_global_in_and_join.reference b/tests/queries/0_stateless/01824_prefer_global_in_and_join.reference new file mode 100644 index 00000000000..195630268b6 --- /dev/null +++ b/tests/queries/0_stateless/01824_prefer_global_in_and_join.reference @@ -0,0 +1,83 @@ +-- { echo } +CREATE DATABASE IF NOT EXISTS test_01824; +USE test_01824; +DROP TABLE IF EXISTS t1_shard; +DROP TABLE IF EXISTS t2_shard; +DROP TABLE IF EXISTS t1_distr; +DROP TABLE IF EXISTS t2_distr; +create table t1_shard (id Int32) engine MergeTree order by id; +create table t2_shard (id Int32) engine MergeTree order by id; +create table t1_distr as t1_shard engine Distributed(test_cluster_two_shards_localhost, test_01824, t1_shard, id); +create table t2_distr as t2_shard engine Distributed(test_cluster_two_shards_localhost, test_01824, t2_shard, id); +insert into t1_shard values (42); +insert into t2_shard values (42); +SET prefer_global_in_and_join = 1; +select d0.id from t1_distr d0 +join ( + select d1.id + from t1_distr as d1 + inner join t2_distr as d2 on d1.id = d2.id + where d1.id > 0 + order by d1.id +) s0 using id; +42 +42 +42 +42 +42 +42 +42 +42 +explain syntax select d0.id from t1_distr d0 +join ( + select d1.id + from t1_distr as d1 + inner join t2_distr as d2 on d1.id = d2.id + where d1.id > 0 + order by d1.id +) s0 using id; +SELECT id +FROM t1_distr AS d0 +GLOBAL ALL INNER JOIN +( + SELECT id + FROM t1_distr AS d1 + ALL INNER JOIN t2_distr AS d2 ON id = d2.id + WHERE id > 0 + ORDER BY id ASC +) AS s0 USING (id) +-- Force using local mode +set distributed_product_mode = 'local'; +select d0.id from t1_distr d0 +join ( + select d1.id + from t1_distr as d1 + inner join t2_distr as d2 on d1.id = d2.id + where d1.id > 0 + order by d1.id +) s0 using id; +42 +42 +explain syntax select d0.id from t1_distr d0 +join ( + select d1.id + from t1_distr as d1 + inner join t2_distr as d2 on d1.id = d2.id + where d1.id > 0 + order by d1.id +) s0 using id; +SELECT id +FROM t1_distr AS d0 +ALL INNER JOIN +( + SELECT id + FROM test_01824.t1_shard AS d1 + ALL INNER JOIN test_01824.t2_shard AS d2 ON id = d2.id + WHERE id > 0 + ORDER BY id ASC +) AS s0 USING (id) +DROP TABLE t1_shard; +DROP TABLE t2_shard; +DROP TABLE t1_distr; +DROP TABLE t2_distr; +DROP DATABASE test_01824; diff --git a/tests/queries/0_stateless/01824_prefer_global_in_and_join.sql b/tests/queries/0_stateless/01824_prefer_global_in_and_join.sql new file mode 100644 index 00000000000..7f163f9ac7b --- /dev/null +++ b/tests/queries/0_stateless/01824_prefer_global_in_and_join.sql @@ -0,0 +1,64 @@ +-- { echo } +CREATE DATABASE IF NOT EXISTS test_01824; +USE test_01824; + +DROP TABLE IF EXISTS t1_shard; +DROP TABLE IF EXISTS t2_shard; +DROP TABLE IF EXISTS t1_distr; +DROP TABLE IF EXISTS t2_distr; + +create table t1_shard (id Int32) engine MergeTree order by id; +create table t2_shard (id Int32) engine MergeTree order by id; + +create table t1_distr as t1_shard engine Distributed(test_cluster_two_shards_localhost, test_01824, t1_shard, id); +create table t2_distr as t2_shard engine Distributed(test_cluster_two_shards_localhost, test_01824, t2_shard, id); + +insert into t1_shard values (42); +insert into t2_shard values (42); + +SET prefer_global_in_and_join = 1; + +select d0.id from t1_distr d0 +join ( + select d1.id + from t1_distr as d1 + inner join t2_distr as d2 on d1.id = d2.id + where d1.id > 0 + order by d1.id +) s0 using id; + +explain syntax select d0.id from t1_distr d0 +join ( + select d1.id + from t1_distr as d1 + inner join t2_distr as d2 on d1.id = d2.id + where d1.id > 0 + order by d1.id +) s0 using id; + +-- Force using local mode +set distributed_product_mode = 'local'; + +select d0.id from t1_distr d0 +join ( + select d1.id + from t1_distr as d1 + inner join t2_distr as d2 on d1.id = d2.id + where d1.id > 0 + order by d1.id +) s0 using id; + +explain syntax select d0.id from t1_distr d0 +join ( + select d1.id + from t1_distr as d1 + inner join t2_distr as d2 on d1.id = d2.id + where d1.id > 0 + order by d1.id +) s0 using id; + +DROP TABLE t1_shard; +DROP TABLE t2_shard; +DROP TABLE t1_distr; +DROP TABLE t2_distr; +DROP DATABASE test_01824; diff --git a/tests/queries/0_stateless/arcadia_skip_list.txt b/tests/queries/0_stateless/arcadia_skip_list.txt index 494917d0794..69bb176f7fb 100644 --- a/tests/queries/0_stateless/arcadia_skip_list.txt +++ b/tests/queries/0_stateless/arcadia_skip_list.txt @@ -243,3 +243,4 @@ 01892_setting_limit_offset_distributed 01901_test_attach_partition_from 01910_view_dictionary +01824_prefer_global_in_and_join diff --git a/tests/queries/skip_list.json b/tests/queries/skip_list.json index 5b7965812c9..45d3dbf56d6 100644 --- a/tests/queries/skip_list.json +++ b/tests/queries/skip_list.json @@ -830,6 +830,7 @@ "01850_dist_INSERT_preserve_error", // uses cluster with different static databases shard_0/shard_1 "01821_table_comment", "01710_projection_fetch", + "01824_prefer_global_in_and_join", "01870_modulo_partition_key", "01870_buffer_flush", // creates database "01889_postgresql_protocol_null_fields",