Merge pull request #16105 from azat/allow_nondeterministic_optimize_skip_unused_shards

Add allow_nondeterministic_optimize_skip_unused_shards
This commit is contained in:
alexey-milovidov 2020-10-21 00:16:26 +03:00 committed by GitHub
commit 8084ce75cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 24 additions and 3 deletions

View File

@ -1406,6 +1406,17 @@ Possible values:
Default value: 0 Default value: 0
## allow_nondeterministic_optimize_skip_unused_shards {#allow-nondeterministic-optimize-skip-unused-shards}
Allow nondeterministic (like `rand` or `dictGet`, since later has some caveats with updates) functions in sharding key.
Possible values:
- 0 — Disallowed.
- 1 — Allowed.
Default value: 0
## optimize_skip_unused_shards_nesting {#optimize-skip-unused-shards-nesting} ## optimize_skip_unused_shards_nesting {#optimize-skip-unused-shards-nesting}
Controls [`optimize_skip_unused_shards`](#optimize-skip-unused-shards) (hence still requires [`optimize_skip_unused_shards`](#optimize-skip-unused-shards)) depends on the nesting level of the distributed query (case when you have `Distributed` table that look into another `Distributed` table). Controls [`optimize_skip_unused_shards`](#optimize-skip-unused-shards) (hence still requires [`optimize_skip_unused_shards`](#optimize-skip-unused-shards)) depends on the nesting level of the distributed query (case when you have `Distributed` table that look into another `Distributed` table).

View File

@ -111,6 +111,7 @@ class IColumn;
M(UInt64, distributed_group_by_no_merge, 0, "If 1, Do not merge aggregation states from different servers for distributed query processing - in case it is for certain that there are different keys on different shards. If 2 - same as 1 but also apply ORDER BY and LIMIT stages", 0) \ M(UInt64, distributed_group_by_no_merge, 0, "If 1, Do not merge aggregation states from different servers for distributed query processing - in case it is for certain that there are different keys on different shards. If 2 - same as 1 but also apply ORDER BY and LIMIT stages", 0) \
M(Bool, optimize_distributed_group_by_sharding_key, false, "Optimize GROUP BY sharding_key queries (by avodiing costly aggregation on the initiator server).", 0) \ M(Bool, optimize_distributed_group_by_sharding_key, false, "Optimize GROUP BY sharding_key queries (by avodiing costly aggregation on the initiator server).", 0) \
M(Bool, optimize_skip_unused_shards, false, "Assumes that data is distributed by sharding_key. Optimization to skip unused shards if SELECT query filters by sharding_key.", 0) \ M(Bool, optimize_skip_unused_shards, false, "Assumes that data is distributed by sharding_key. Optimization to skip unused shards if SELECT query filters by sharding_key.", 0) \
M(Bool, allow_nondeterministic_optimize_skip_unused_shards, false, "Allow non-deterministic functions (includes dictGet) in sharding_key for optimize_skip_unused_shards", 0) \
M(UInt64, force_optimize_skip_unused_shards, 0, "Throw an exception if unused shards cannot be skipped (1 - throw only if the table has the sharding key, 2 - always throw.", 0) \ M(UInt64, force_optimize_skip_unused_shards, 0, "Throw an exception if unused shards cannot be skipped (1 - throw only if the table has the sharding key, 2 - always throw.", 0) \
M(UInt64, optimize_skip_unused_shards_nesting, 0, "Same as optimize_skip_unused_shards, but accept nesting level until which it will work.", 0) \ M(UInt64, optimize_skip_unused_shards_nesting, 0, "Same as optimize_skip_unused_shards, but accept nesting level until which it will work.", 0) \
M(UInt64, force_optimize_skip_unused_shards_nesting, 0, "Same as force_optimize_skip_unused_shards, but accept nesting level until which it will work.", 0) \ M(UInt64, force_optimize_skip_unused_shards_nesting, 0, "Same as force_optimize_skip_unused_shards, but accept nesting level until which it will work.", 0) \

View File

@ -454,7 +454,7 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Con
if (settings.optimize_skip_unused_shards && if (settings.optimize_skip_unused_shards &&
settings.optimize_distributed_group_by_sharding_key && settings.optimize_distributed_group_by_sharding_key &&
has_sharding_key && has_sharding_key &&
sharding_key_is_deterministic) (settings.allow_nondeterministic_optimize_skip_unused_shards || sharding_key_is_deterministic))
{ {
Block sharding_key_block = sharding_key_expr->getSampleBlock(); Block sharding_key_block = sharding_key_expr->getSampleBlock();
auto stage = getOptimizedQueryProcessingStage(query_ptr, settings.extremes, sharding_key_block); auto stage = getOptimizedQueryProcessingStage(query_ptr, settings.extremes, sharding_key_block);
@ -710,7 +710,9 @@ ClusterPtr StorageDistributed::getOptimizedCluster(const Context & context, cons
ClusterPtr cluster = getCluster(); ClusterPtr cluster = getCluster();
const Settings & settings = context.getSettingsRef(); const Settings & settings = context.getSettingsRef();
if (has_sharding_key && sharding_key_is_deterministic) bool sharding_key_is_usable = settings.allow_nondeterministic_optimize_skip_unused_shards || sharding_key_is_deterministic;
if (has_sharding_key && sharding_key_is_usable)
{ {
ClusterPtr optimized = skipUnusedShards(cluster, query_ptr, metadata_snapshot, context); ClusterPtr optimized = skipUnusedShards(cluster, query_ptr, metadata_snapshot, context);
if (optimized) if (optimized)
@ -723,7 +725,7 @@ ClusterPtr StorageDistributed::getOptimizedCluster(const Context & context, cons
std::stringstream exception_message; std::stringstream exception_message;
if (!has_sharding_key) if (!has_sharding_key)
exception_message << "No sharding key"; exception_message << "No sharding key";
else if (!sharding_key_is_deterministic) else if (!sharding_key_is_usable)
exception_message << "Sharding key is not deterministic"; exception_message << "Sharding key is not deterministic";
else else
exception_message << "Sharding key " << sharding_key_column_name << " is not used"; exception_message << "Sharding key " << sharding_key_column_name << " is not used";

View File

@ -0,0 +1,7 @@
drop table if exists dist_01528;
create table dist_01528 as system.one engine=Distributed('test_cluster_two_shards', system, one, rand()+dummy);
set optimize_skip_unused_shards=1;
set force_optimize_skip_unused_shards=1;
select * from dist_01528 where dummy = 2; -- { serverError 507; }
select * from dist_01528 where dummy = 2 settings allow_nondeterministic_optimize_skip_unused_shards=1;