Add optimize_skip_unused_shards_limit

Limit for number of sharding key values, turns off
optimize_skip_unused_shards if the limit is reached
This commit is contained in:
Azat Khuzhin 2021-03-07 22:23:45 +03:00
parent 40a99b7431
commit 16f4c02d42
5 changed files with 30 additions and 0 deletions

View File

@ -1514,6 +1514,14 @@ FORMAT PrettyCompactMonoBlock
Default value: 0
## optimize_skip_unused_shards_limit {#optimize-skip-unused-shards-limit}
Limit for number of sharding key values, turns off `optimize_skip_unused_shards` if the limit is reached.
Too many values may require significant amount for processing, while the benefit is doubtful, since if you have huge number of values in `IN (...)`, then most likely the query will be sent to all shards anyway.
Default value: 1000
## optimize_skip_unused_shards {#optimize-skip-unused-shards}
Enables or disables skipping of unused shards for [SELECT](../../sql-reference/statements/select/index.md) queries that have sharding key condition in `WHERE/PREWHERE` (assuming that the data is distributed by sharding key, otherwise does nothing).

View File

@ -116,6 +116,7 @@ class IColumn;
M(UInt64, parallel_distributed_insert_select, 0, "Process distributed INSERT SELECT query in the same cluster on local tables on every shard, if 1 SELECT is executed on each shard, if 2 SELECT and INSERT is executed on each shard", 0) \
M(UInt64, distributed_group_by_no_merge, 0, "If 1, Do not merge aggregation states from different servers for distributed query processing - in case it is for certain that there are different keys on different shards. If 2 - same as 1 but also apply ORDER BY and LIMIT stages", 0) \
M(Bool, optimize_distributed_group_by_sharding_key, false, "Optimize GROUP BY sharding_key queries (by avoiding costly aggregation on the initiator server).", 0) \
M(UInt64, optimize_skip_unused_shards_limit, 1000, "Limit for number of sharding key values, turns off optimize_skip_unused_shards if the limit is reached", 0) \
M(Bool, optimize_skip_unused_shards, false, "Assumes that data is distributed by sharding_key. Optimization to skip unused shards if SELECT query filters by sharding_key.", 0) \
M(Bool, allow_nondeterministic_optimize_skip_unused_shards, false, "Allow non-deterministic functions (includes dictGet) in sharding_key for optimize_skip_unused_shards", 0) \
M(UInt64, force_optimize_skip_unused_shards, 0, "Throw an exception if unused shards cannot be skipped (1 - throw only if the table has the sharding key, 2 - always throw.", 0) \

View File

@ -915,6 +915,11 @@ ClusterPtr StorageDistributed::skipUnusedShards(
return nullptr;
}
// Too huge number of values
size_t limit = context.getSettingsRef().optimize_skip_unused_shards_limit;
if (limit && blocks->size() > limit)
return nullptr;
std::set<int> shards;
for (const auto & block : *blocks)

View File

@ -0,0 +1,16 @@
drop table if exists dist_01757;
create table dist_01757 as system.one engine=Distributed(test_cluster_two_shards, system, one, dummy);
set optimize_skip_unused_shards=1;
set force_optimize_skip_unused_shards=2;
select * from dist_01757 where dummy in (0,) format Null;
select * from dist_01757 where dummy in (0, 1) format Null settings optimize_skip_unused_shards_limit=2;
select * from dist_01757 where dummy in (0, 1) format Null settings optimize_skip_unused_shards_limit=0;
select * from dist_01757 where dummy in (0, 1) format Null settings optimize_skip_unused_shards_limit=1; -- { serverError 507 }
select * from dist_01757 where dummy = 0 or dummy = 1 format Null settings optimize_skip_unused_shards_limit=1; -- { serverError 507 }
select * from dist_01757 where dummy = 0 and dummy = 1 format Null settings optimize_skip_unused_shards_limit=1;
drop table dist_01757;