diff --git a/docs/en/operations/settings/settings.md b/docs/en/operations/settings/settings.md index f93a8b680b5..8b1f64d0613 100644 --- a/docs/en/operations/settings/settings.md +++ b/docs/en/operations/settings/settings.md @@ -1398,6 +1398,17 @@ Possible values: Default value: 0 +## allow_nondeterministic_optimize_skip_unused_shards {#allow-nondeterministic-optimize-skip-unused-shards} + +Allow nondeterministic (like `rand` or `dictGet`, since later has some caveats with updates) functions in sharding key. + +Possible values: + +- 0 — Disallowed. +- 1 — Allowed. + +Default value: 0 + ## optimize_skip_unused_shards_nesting {#optimize-skip-unused-shards-nesting} Controls [`optimize_skip_unused_shards`](#optimize-skip-unused-shards) (hence still requires [`optimize_skip_unused_shards`](#optimize-skip-unused-shards)) depends on the nesting level of the distributed query (case when you have `Distributed` table that look into another `Distributed` table). diff --git a/src/Core/Settings.h b/src/Core/Settings.h index d73098ca6e0..68ac9fc9a5f 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -111,6 +111,7 @@ class IColumn; M(UInt64, distributed_group_by_no_merge, 0, "If 1, Do not merge aggregation states from different servers for distributed query processing - in case it is for certain that there are different keys on different shards. If 2 - same as 1 but also apply ORDER BY and LIMIT stages", 0) \ M(Bool, optimize_distributed_group_by_sharding_key, false, "Optimize GROUP BY sharding_key queries (by avodiing costly aggregation on the initiator server).", 0) \ M(Bool, optimize_skip_unused_shards, false, "Assumes that data is distributed by sharding_key. Optimization to skip unused shards if SELECT query filters by sharding_key.", 0) \ + M(Bool, allow_nondeterministic_optimize_skip_unused_shards, false, "Allow non-deterministic functions (includes dictGet) in sharding_key for optimize_skip_unused_shards", 0) \ M(UInt64, force_optimize_skip_unused_shards, 0, "Throw an exception if unused shards cannot be skipped (1 - throw only if the table has the sharding key, 2 - always throw.", 0) \ M(UInt64, optimize_skip_unused_shards_nesting, 0, "Same as optimize_skip_unused_shards, but accept nesting level until which it will work.", 0) \ M(UInt64, force_optimize_skip_unused_shards_nesting, 0, "Same as force_optimize_skip_unused_shards, but accept nesting level until which it will work.", 0) \ diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index afac3f97d68..b858239d637 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -454,7 +454,7 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Con if (settings.optimize_skip_unused_shards && settings.optimize_distributed_group_by_sharding_key && has_sharding_key && - sharding_key_is_deterministic) + (settings.allow_nondeterministic_optimize_skip_unused_shards || sharding_key_is_deterministic)) { Block sharding_key_block = sharding_key_expr->getSampleBlock(); auto stage = getOptimizedQueryProcessingStage(query_ptr, settings.extremes, sharding_key_block); @@ -710,7 +710,9 @@ ClusterPtr StorageDistributed::getOptimizedCluster(const Context & context, cons ClusterPtr cluster = getCluster(); const Settings & settings = context.getSettingsRef(); - if (has_sharding_key && sharding_key_is_deterministic) + bool sharding_key_is_usable = settings.allow_nondeterministic_optimize_skip_unused_shards || sharding_key_is_deterministic; + + if (has_sharding_key && sharding_key_is_usable) { ClusterPtr optimized = skipUnusedShards(cluster, query_ptr, metadata_snapshot, context); if (optimized) @@ -723,7 +725,7 @@ ClusterPtr StorageDistributed::getOptimizedCluster(const Context & context, cons std::stringstream exception_message; if (!has_sharding_key) exception_message << "No sharding key"; - else if (!sharding_key_is_deterministic) + else if (!sharding_key_is_usable) exception_message << "Sharding key is not deterministic"; else exception_message << "Sharding key " << sharding_key_column_name << " is not used"; diff --git a/tests/queries/0_stateless/01528_allow_nondeterministic_optimize_skip_unused_shards.reference b/tests/queries/0_stateless/01528_allow_nondeterministic_optimize_skip_unused_shards.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/01528_allow_nondeterministic_optimize_skip_unused_shards.sql b/tests/queries/0_stateless/01528_allow_nondeterministic_optimize_skip_unused_shards.sql new file mode 100644 index 00000000000..b0bfb2aae3f --- /dev/null +++ b/tests/queries/0_stateless/01528_allow_nondeterministic_optimize_skip_unused_shards.sql @@ -0,0 +1,7 @@ +drop table if exists dist_01528; +create table dist_01528 as system.one engine=Distributed('test_cluster_two_shards', system, one, rand()+dummy); + +set optimize_skip_unused_shards=1; +set force_optimize_skip_unused_shards=1; +select * from dist_01528 where dummy = 2; -- { serverError 507; } +select * from dist_01528 where dummy = 2 settings allow_nondeterministic_optimize_skip_unused_shards=1;