Merge pull request #23434 from amosbird/preferglobal

Add prefer_global_in_and_join setting
This commit is contained in:
alexey-milovidov 2021-06-17 19:48:28 +03:00 committed by GitHub
commit c48402d997
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 181 additions and 23 deletions

View File

@ -251,6 +251,7 @@ class IColumn;
M(String, metrics_perf_events_list, "", "Comma separated list of perf metrics that will be measured throughout queries' execution. Empty means all events. See PerfEventInfo in sources for the available events.", 0) \
M(Float, opentelemetry_start_trace_probability, 0., "Probability to start an OpenTelemetry trace for an incoming query.", 0) \
M(Bool, prefer_column_name_to_alias, false, "Prefer using column names instead of aliases if possible.", 0) \
M(Bool, prefer_global_in_and_join, false, "If enabled, all IN/JOIN operators will be rewritten as GLOBAL IN/JOIN. It's useful when the to-be-joined tables are only available on the initiator and we need to always scatter their data on-the-fly during distributed processing with the GLOBAL keyword. It's also useful to reduce the need to access the external sources joining external tables.", 0) \
\
\
/** Limits during query execution are part of the settings. \

View File

@ -192,7 +192,9 @@ private:
/// GLOBAL IN
static void visit(ASTFunction & func, ASTPtr &, Data & data)
{
if (func.name == "globalIn" || func.name == "globalNotIn")
if ((data.getContext()->getSettingsRef().prefer_global_in_and_join
&& (func.name == "in" || func.name == "notIn" || func.name == "nullIn" || func.name == "notNullIn"))
|| func.name == "globalIn" || func.name == "globalNotIn" || func.name == "globalNullIn" || func.name == "globalNotNullIn")
{
ASTPtr & ast = func.arguments->children[1];
@ -201,8 +203,12 @@ private:
{
if (func.name == "globalIn")
func.name = "in";
else
else if (func.name == "globalNotIn")
func.name = "notIn";
else if (func.name == "globalNullIn")
func.name = "nullIn";
else if (func.name == "globalNotNullIn")
func.name = "notNullIn";
return;
}
@ -214,7 +220,9 @@ private:
/// GLOBAL JOIN
static void visit(ASTTablesInSelectQueryElement & table_elem, ASTPtr &, Data & data)
{
if (table_elem.table_join && table_elem.table_join->as<ASTTableJoin &>().locality == ASTTableJoin::Locality::Global)
if (table_elem.table_join
&& (table_elem.table_join->as<ASTTableJoin &>().locality == ASTTableJoin::Locality::Global
|| data.getContext()->getSettingsRef().prefer_global_in_and_join))
{
data.addExternalStorage(table_elem.table_expression, true);
data.has_global_subqueries = true;

View File

@ -70,13 +70,24 @@ private:
if (!storage || !checker.hasAtLeastTwoShards(*storage))
return;
if (distributed_product_mode == DistributedProductMode::DENY)
if (distributed_product_mode == DistributedProductMode::LOCAL)
{
throw Exception("Double-distributed IN/JOIN subqueries is denied (distributed_product_mode = 'deny')."
" You may rewrite query to use local tables in subqueries, or use GLOBAL keyword, or set distributed_product_mode to suitable value.",
ErrorCodes::DISTRIBUTED_IN_JOIN_SUBQUERY_DENIED);
/// Convert distributed table to corresponding remote table.
std::string database;
std::string table;
std::tie(database, table) = checker.getRemoteDatabaseAndTableName(*storage);
String alias = database_and_table->tryGetAlias();
if (alias.empty())
throw Exception("Distributed table should have an alias when distributed_product_mode set to local",
ErrorCodes::DISTRIBUTED_IN_JOIN_SUBQUERY_DENIED);
auto & identifier = database_and_table->as<ASTTableIdentifier &>();
renamed_tables.emplace_back(identifier.clone());
identifier.resetTable(database, table);
}
else if (distributed_product_mode == DistributedProductMode::GLOBAL)
else if (getContext()->getSettingsRef().prefer_global_in_and_join || distributed_product_mode == DistributedProductMode::GLOBAL)
{
if (function)
{
@ -98,22 +109,11 @@ private:
else
throw Exception("Logical error: unexpected AST node", ErrorCodes::LOGICAL_ERROR);
}
else if (distributed_product_mode == DistributedProductMode::LOCAL)
else if (distributed_product_mode == DistributedProductMode::DENY)
{
/// Convert distributed table to corresponding remote table.
std::string database;
std::string table;
std::tie(database, table) = checker.getRemoteDatabaseAndTableName(*storage);
String alias = database_and_table->tryGetAlias();
if (alias.empty())
throw Exception("Distributed table should have an alias when distributed_product_mode set to local",
ErrorCodes::DISTRIBUTED_IN_JOIN_SUBQUERY_DENIED);
auto & identifier = database_and_table->as<ASTTableIdentifier &>();
renamed_tables.emplace_back(identifier.clone());
identifier.resetTable(database, table);
throw Exception("Double-distributed IN/JOIN subqueries is denied (distributed_product_mode = 'deny')."
" You may rewrite query to use local tables in subqueries, or use GLOBAL keyword, or set distributed_product_mode to suitable value.",
ErrorCodes::DISTRIBUTED_IN_JOIN_SUBQUERY_DENIED);
}
else
throw Exception("InJoinSubqueriesPreprocessor: unexpected value of 'distributed_product_mode' setting",

View File

@ -0,0 +1,83 @@
-- { echo }
CREATE DATABASE IF NOT EXISTS test_01824;
USE test_01824;
DROP TABLE IF EXISTS t1_shard;
DROP TABLE IF EXISTS t2_shard;
DROP TABLE IF EXISTS t1_distr;
DROP TABLE IF EXISTS t2_distr;
create table t1_shard (id Int32) engine MergeTree order by id;
create table t2_shard (id Int32) engine MergeTree order by id;
create table t1_distr as t1_shard engine Distributed(test_cluster_two_shards_localhost, test_01824, t1_shard, id);
create table t2_distr as t2_shard engine Distributed(test_cluster_two_shards_localhost, test_01824, t2_shard, id);
insert into t1_shard values (42);
insert into t2_shard values (42);
SET prefer_global_in_and_join = 1;
select d0.id from t1_distr d0
join (
select d1.id
from t1_distr as d1
inner join t2_distr as d2 on d1.id = d2.id
where d1.id > 0
order by d1.id
) s0 using id;
42
42
42
42
42
42
42
42
explain syntax select d0.id from t1_distr d0
join (
select d1.id
from t1_distr as d1
inner join t2_distr as d2 on d1.id = d2.id
where d1.id > 0
order by d1.id
) s0 using id;
SELECT id
FROM t1_distr AS d0
GLOBAL ALL INNER JOIN
(
SELECT id
FROM t1_distr AS d1
ALL INNER JOIN t2_distr AS d2 ON id = d2.id
WHERE id > 0
ORDER BY id ASC
) AS s0 USING (id)
-- Force using local mode
set distributed_product_mode = 'local';
select d0.id from t1_distr d0
join (
select d1.id
from t1_distr as d1
inner join t2_distr as d2 on d1.id = d2.id
where d1.id > 0
order by d1.id
) s0 using id;
42
42
explain syntax select d0.id from t1_distr d0
join (
select d1.id
from t1_distr as d1
inner join t2_distr as d2 on d1.id = d2.id
where d1.id > 0
order by d1.id
) s0 using id;
SELECT id
FROM t1_distr AS d0
ALL INNER JOIN
(
SELECT id
FROM test_01824.t1_shard AS d1
ALL INNER JOIN test_01824.t2_shard AS d2 ON id = d2.id
WHERE id > 0
ORDER BY id ASC
) AS s0 USING (id)
DROP TABLE t1_shard;
DROP TABLE t2_shard;
DROP TABLE t1_distr;
DROP TABLE t2_distr;
DROP DATABASE test_01824;

View File

@ -0,0 +1,64 @@
-- { echo }
CREATE DATABASE IF NOT EXISTS test_01824;
USE test_01824;
DROP TABLE IF EXISTS t1_shard;
DROP TABLE IF EXISTS t2_shard;
DROP TABLE IF EXISTS t1_distr;
DROP TABLE IF EXISTS t2_distr;
create table t1_shard (id Int32) engine MergeTree order by id;
create table t2_shard (id Int32) engine MergeTree order by id;
create table t1_distr as t1_shard engine Distributed(test_cluster_two_shards_localhost, test_01824, t1_shard, id);
create table t2_distr as t2_shard engine Distributed(test_cluster_two_shards_localhost, test_01824, t2_shard, id);
insert into t1_shard values (42);
insert into t2_shard values (42);
SET prefer_global_in_and_join = 1;
select d0.id from t1_distr d0
join (
select d1.id
from t1_distr as d1
inner join t2_distr as d2 on d1.id = d2.id
where d1.id > 0
order by d1.id
) s0 using id;
explain syntax select d0.id from t1_distr d0
join (
select d1.id
from t1_distr as d1
inner join t2_distr as d2 on d1.id = d2.id
where d1.id > 0
order by d1.id
) s0 using id;
-- Force using local mode
set distributed_product_mode = 'local';
select d0.id from t1_distr d0
join (
select d1.id
from t1_distr as d1
inner join t2_distr as d2 on d1.id = d2.id
where d1.id > 0
order by d1.id
) s0 using id;
explain syntax select d0.id from t1_distr d0
join (
select d1.id
from t1_distr as d1
inner join t2_distr as d2 on d1.id = d2.id
where d1.id > 0
order by d1.id
) s0 using id;
DROP TABLE t1_shard;
DROP TABLE t2_shard;
DROP TABLE t1_distr;
DROP TABLE t2_distr;
DROP DATABASE test_01824;

View File

@ -243,3 +243,4 @@
01892_setting_limit_offset_distributed
01901_test_attach_partition_from
01910_view_dictionary
01824_prefer_global_in_and_join

View File

@ -830,6 +830,7 @@
"01850_dist_INSERT_preserve_error", // uses cluster with different static databases shard_0/shard_1
"01821_table_comment",
"01710_projection_fetch",
"01824_prefer_global_in_and_join",
"01870_modulo_partition_key",
"01870_buffer_flush", // creates database
"01889_postgresql_protocol_null_fields",