From 20b4eed9a1eed9db7bdc2ba6a3e9c655ada23b8c Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Mon, 27 Apr 2020 00:56:54 +0300 Subject: [PATCH 1/5] Disable GROUP BY sharding_key optimization for WITH ROLLUP/CUBE/TOTALS --- src/Storages/StorageDistributed.cpp | 14 ++++++++----- ..._GROUP_BY_injective_sharding_key.reference | 21 +++++++++++++++++++ ..._merge_GROUP_BY_injective_sharding_key.sql | 11 ++++++++++ 3 files changed, 41 insertions(+), 5 deletions(-) diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 9c557ad5c8a..72c364905db 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -399,8 +399,17 @@ bool StorageDistributed::canForceGroupByNoMerge(const Context &context, QueryPro const auto & select = query_ptr->as(); + if (select.group_by_with_totals || select.group_by_with_rollup || select.group_by_with_cube) + return false; + + // TODO: The following can be optimized too (but with some caveats, will be addressed later): + // - ORDER BY + // - LIMIT BY + // - LIMIT if (select.orderBy()) return false; + if (select.limitBy() || select.limitLength()) + return false; if (select.distinct) { @@ -416,11 +425,6 @@ bool StorageDistributed::canForceGroupByNoMerge(const Context &context, QueryPro reason = "DISTINCT " + backQuote(serializeAST(*select.select(), true)); } - // This can use distributed_group_by_no_merge but in this case limit stage - // should be done later (which is not the case right now). - if (select.limitBy() || select.limitLength()) - return false; - const ASTPtr group_by = select.groupBy(); if (!group_by) { diff --git a/tests/queries/0_stateless/01247_distributed_group_by_no_merge_GROUP_BY_injective_sharding_key.reference b/tests/queries/0_stateless/01247_distributed_group_by_no_merge_GROUP_BY_injective_sharding_key.reference index b8cf0042ed3..c299e3be7a0 100644 --- a/tests/queries/0_stateless/01247_distributed_group_by_no_merge_GROUP_BY_injective_sharding_key.reference +++ b/tests/queries/0_stateless/01247_distributed_group_by_no_merge_GROUP_BY_injective_sharding_key.reference @@ -67,3 +67,24 @@ GROUP BY (Distributed-over-Distributed) distributed_group_by_no_merge 1 1 1 0 1 1 +extremes +1 0 +1 1 +1 0 +1 1 + +1 0 +1 1 +WITH TOTALS +2 0 +2 1 + +4 0 +WITH ROLLUP +2 0 +2 1 +4 0 +WITH CUBE +2 0 +2 1 +4 0 diff --git a/tests/queries/0_stateless/01247_distributed_group_by_no_merge_GROUP_BY_injective_sharding_key.sql b/tests/queries/0_stateless/01247_distributed_group_by_no_merge_GROUP_BY_injective_sharding_key.sql index 56345e23094..2a4f0e0601d 100644 --- a/tests/queries/0_stateless/01247_distributed_group_by_no_merge_GROUP_BY_injective_sharding_key.sql +++ b/tests/queries/0_stateless/01247_distributed_group_by_no_merge_GROUP_BY_injective_sharding_key.sql @@ -1,3 +1,5 @@ +-- TODO: correct testing with real unique shards + drop table if exists dist_01247; drop table if exists data_01247; @@ -60,3 +62,12 @@ select 'GROUP BY (Distributed-over-Distributed)'; select count(), * from cluster(test_cluster_two_shards, currentDatabase(), dist_01247) group by number; select 'GROUP BY (Distributed-over-Distributed) distributed_group_by_no_merge'; select count(), * from cluster(test_cluster_two_shards, currentDatabase(), dist_01247) group by number settings distributed_group_by_no_merge=1; + +select 'extremes'; +select count(), * from dist_01247 group by number settings extremes=1; +select 'WITH TOTALS'; +select count(), * from dist_01247 group by number with totals; +select 'WITH ROLLUP'; +select count(), * from dist_01247 group by number with rollup; +select 'WITH CUBE'; +select count(), * from dist_01247 group by number with cube; From 4e09d812eff4e7d25385073c9affe659e23a684d Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Mon, 27 Apr 2020 00:59:56 +0300 Subject: [PATCH 2/5] Add a test for GROUP BY sharding_key optimization over Distributed-on-Distributed --- ...oup_by_sharding_key_optimization.reference | 17 ++++++++++ ...ist_group_by_sharding_key_optimization.sql | 34 +++++++++++++++++++ 2 files changed, 51 insertions(+) create mode 100644 tests/queries/0_stateless/01247_dist_on_dist_group_by_sharding_key_optimization.reference create mode 100644 tests/queries/0_stateless/01247_dist_on_dist_group_by_sharding_key_optimization.sql diff --git a/tests/queries/0_stateless/01247_dist_on_dist_group_by_sharding_key_optimization.reference b/tests/queries/0_stateless/01247_dist_on_dist_group_by_sharding_key_optimization.reference new file mode 100644 index 00000000000..9fc7f7bfcd7 --- /dev/null +++ b/tests/queries/0_stateless/01247_dist_on_dist_group_by_sharding_key_optimization.reference @@ -0,0 +1,17 @@ +Distributed(number)-over-Distributed(number) +1 0 +1 1 +1 0 +1 1 +1 0 +1 1 +1 0 +1 1 +Distributed(rand)-over-Distributed(number) +4 0 +4 1 +Distributed(rand)-over-Distributed(rand) +2 0 +2 1 +2 0 +2 1 diff --git a/tests/queries/0_stateless/01247_dist_on_dist_group_by_sharding_key_optimization.sql b/tests/queries/0_stateless/01247_dist_on_dist_group_by_sharding_key_optimization.sql new file mode 100644 index 00000000000..dff47118c9c --- /dev/null +++ b/tests/queries/0_stateless/01247_dist_on_dist_group_by_sharding_key_optimization.sql @@ -0,0 +1,34 @@ +-- TODO: correct testing with real unique shards + +drop table if exists dist_01247; +drop table if exists dist_layer_01247; +drop table if exists data_01247; + +create table data_01247 as system.numbers engine=Memory(); +-- since data is not inserted via distributed it will have duplicates +-- (and this is how we ensure that this optimization will work) +insert into data_01247 select * from system.numbers limit 2; + +set max_distributed_connections=1; +set optimize_skip_unused_shards=1; + +select 'Distributed(number)-over-Distributed(number)'; +create table dist_layer_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01247, number); +create table dist_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), dist_layer_01247, number); +select count(), * from dist_01247 group by number; +drop table if exists dist_01247; +drop table if exists dist_layer_01247; + +select 'Distributed(rand)-over-Distributed(number)'; +create table dist_layer_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01247, number); +create table dist_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), dist_layer_01247, rand()); +select count(), * from dist_01247 group by number; +drop table if exists dist_01247; +drop table if exists dist_layer_01247; + +select 'Distributed(rand)-over-Distributed(rand)'; +create table dist_layer_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01247, rand()); +create table dist_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), dist_layer_01247, number); +select count(), * from dist_01247 group by number; +drop table if exists dist_01247; +drop table if exists dist_layer_01247; From 038235684ddcc3abf2ba0ab1b39a3ec12e949cd2 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Wed, 29 Apr 2020 00:07:30 +0300 Subject: [PATCH 3/5] Add optimize_distributed_group_by_sharding_key and disable it by default I know at least one way to fool that optimization, by using as sharding key something like `if(col1>0, col1, col2)` (although this is not common sharding key I would say, but can be useful if this will work correctly), so let's disable it by default. --- src/Core/Settings.h | 1 + src/Storages/StorageDistributed.cpp | 3 +++ .../01247_dist_on_dist_group_by_sharding_key_optimization.sql | 2 ++ ...buted_group_by_no_merge_GROUP_BY_injective_sharding_key.sql | 2 ++ 4 files changed, 8 insertions(+) diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 7b5b130ee72..651b519dba7 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -116,6 +116,7 @@ struct Settings : public SettingsCollection M(SettingBool, distributed_group_by_no_merge, false, "Do not merge aggregation states from different servers for distributed query processing - in case it is for certain that there are different keys on different shards.", 0) \ M(SettingBool, parallel_distributed_insert_select, false, "If true, distributed insert select query in the same cluster will be processed on local tables on every shard", 0) \ M(SettingBool, optimize_skip_unused_shards, false, "Assumes that data is distributed by sharding_key. Optimization to skip unused shards if SELECT query filters by sharding_key.", 0) \ + M(SettingBool, optimize_distributed_group_by_sharding_key, false, "Optimize GROUP BY sharding_key queries (by avodiing costly aggregation on the initiator server).", 0) \ M(SettingUInt64, force_optimize_skip_unused_shards, 0, "Throw an exception if unused shards cannot be skipped (1 - throw only if the table has the sharding key, 2 - always throw.", 0) \ M(SettingBool, force_optimize_skip_unused_shards_no_nested, false, "Do not apply force_optimize_skip_unused_shards for nested Distributed tables.", 0) \ \ diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 72c364905db..221b0df467c 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -389,6 +389,9 @@ bool StorageDistributed::canForceGroupByNoMerge(const Context &context, QueryPro if (settings.distributed_group_by_no_merge) return true; + if (!settings.optimize_distributed_group_by_sharding_key) + return false; + /// Distributed-over-Distributed (see getQueryProcessingStageImpl()) if (to_stage == QueryProcessingStage::WithMergeableState) return false; diff --git a/tests/queries/0_stateless/01247_dist_on_dist_group_by_sharding_key_optimization.sql b/tests/queries/0_stateless/01247_dist_on_dist_group_by_sharding_key_optimization.sql index dff47118c9c..dc2941e63fd 100644 --- a/tests/queries/0_stateless/01247_dist_on_dist_group_by_sharding_key_optimization.sql +++ b/tests/queries/0_stateless/01247_dist_on_dist_group_by_sharding_key_optimization.sql @@ -1,5 +1,7 @@ -- TODO: correct testing with real unique shards +set optimize_distributed_group_by_sharding_key=1; + drop table if exists dist_01247; drop table if exists dist_layer_01247; drop table if exists data_01247; diff --git a/tests/queries/0_stateless/01247_distributed_group_by_no_merge_GROUP_BY_injective_sharding_key.sql b/tests/queries/0_stateless/01247_distributed_group_by_no_merge_GROUP_BY_injective_sharding_key.sql index 2a4f0e0601d..bca3e4830d2 100644 --- a/tests/queries/0_stateless/01247_distributed_group_by_no_merge_GROUP_BY_injective_sharding_key.sql +++ b/tests/queries/0_stateless/01247_distributed_group_by_no_merge_GROUP_BY_injective_sharding_key.sql @@ -1,5 +1,7 @@ -- TODO: correct testing with real unique shards +set optimize_distributed_group_by_sharding_key=1; + drop table if exists dist_01247; drop table if exists data_01247; From faea8afa22b862f8ef0b6d7458bd135d4dae5415 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Thu, 30 Apr 2020 10:13:19 +0300 Subject: [PATCH 4/5] Revert behaviour with duplicates in 01213_optimize_skip_unused_shards_DISTINCT Since now optimization is done only under optimize_distributed_group_by_sharding_key. --- .../01213_optimize_skip_unused_shards_DISTINCT.reference | 2 -- .../0_stateless/01213_optimize_skip_unused_shards_DISTINCT.sql | 1 - 2 files changed, 3 deletions(-) diff --git a/tests/queries/0_stateless/01213_optimize_skip_unused_shards_DISTINCT.reference b/tests/queries/0_stateless/01213_optimize_skip_unused_shards_DISTINCT.reference index eac8957dd47..4ade9cd9c5d 100644 --- a/tests/queries/0_stateless/01213_optimize_skip_unused_shards_DISTINCT.reference +++ b/tests/queries/0_stateless/01213_optimize_skip_unused_shards_DISTINCT.reference @@ -6,5 +6,3 @@ optimize_skip_unused_shards optimize_skip_unused_shards lack of WHERE 0 1 -0 -1 diff --git a/tests/queries/0_stateless/01213_optimize_skip_unused_shards_DISTINCT.sql b/tests/queries/0_stateless/01213_optimize_skip_unused_shards_DISTINCT.sql index 67ba1cda870..5b45bea9046 100644 --- a/tests/queries/0_stateless/01213_optimize_skip_unused_shards_DISTINCT.sql +++ b/tests/queries/0_stateless/01213_optimize_skip_unused_shards_DISTINCT.sql @@ -12,7 +12,6 @@ SELECT DISTINCT id FROM dist_01213 WHERE id = 1 SETTINGS distributed_group_by_no SELECT 'optimize_skip_unused_shards'; SELECT DISTINCT id FROM dist_01213 WHERE id = 1 SETTINGS optimize_skip_unused_shards=1; -- check that querying all shards is ok --- (there will be duplicates, since the INSERT was done via local table) SELECT 'optimize_skip_unused_shards lack of WHERE'; SELECT DISTINCT id FROM dist_01213 SETTINGS optimize_skip_unused_shards=1; From 170a341c1f7000ebe09e74c5803ba1e973f282df Mon Sep 17 00:00:00 2001 From: alexey-milovidov Date: Mon, 11 May 2020 06:40:07 +0300 Subject: [PATCH 5/5] Trigger CI --- .../01247_dist_on_dist_group_by_sharding_key_optimization.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/queries/0_stateless/01247_dist_on_dist_group_by_sharding_key_optimization.sql b/tests/queries/0_stateless/01247_dist_on_dist_group_by_sharding_key_optimization.sql index dc2941e63fd..7cf171c8c73 100644 --- a/tests/queries/0_stateless/01247_dist_on_dist_group_by_sharding_key_optimization.sql +++ b/tests/queries/0_stateless/01247_dist_on_dist_group_by_sharding_key_optimization.sql @@ -32,5 +32,5 @@ select 'Distributed(rand)-over-Distributed(rand)'; create table dist_layer_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01247, rand()); create table dist_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), dist_layer_01247, number); select count(), * from dist_01247 group by number; -drop table if exists dist_01247; -drop table if exists dist_layer_01247; +drop table dist_01247; +drop table dist_layer_01247;