Merge pull request #10516 from azat/dist-GROUP_BY-sharding_key-fixes

Disable GROUP BY sharding_key optimization by default (and fix for WITH ROLLUP/CUBE/TOTALS)
commit 33d491edf3 by alexey-milovidov, 2020-05-11 12:03:27 +03:00, committed by GitHub
8 changed files with 100 additions and 8 deletions

@@ -116,6 +116,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, distributed_group_by_no_merge, false, "Do not merge aggregation states from different servers for distributed query processing - in case it is for certain that there are different keys on different shards.", 0) \
M(SettingBool, parallel_distributed_insert_select, false, "If true, distributed insert select query in the same cluster will be processed on local tables on every shard", 0) \
M(SettingBool, optimize_skip_unused_shards, false, "Assumes that data is distributed by sharding_key. Optimization to skip unused shards if SELECT query filters by sharding_key.", 0) \
M(SettingBool, optimize_distributed_group_by_sharding_key, false, "Optimize GROUP BY sharding_key queries (by avoiding costly aggregation on the initiator server).", 0) \
M(SettingUInt64, force_optimize_skip_unused_shards, 0, "Throw an exception if unused shards cannot be skipped (1 - throw only if the table has the sharding key, 2 - always throw).", 0) \
M(SettingBool, force_optimize_skip_unused_shards_no_nested, false, "Do not apply force_optimize_skip_unused_shards for nested Distributed tables.", 0) \
\
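The setting ships disabled, so the shortcut has to be requested explicitly. A minimal sketch of enabling it for a single query, assuming a Distributed table dist_table sharded by user_id (illustrative names, not part of this change):

-- Opt in per query; the shortcut applies when the GROUP BY key is the sharding key.
SELECT user_id, count()
FROM dist_table
GROUP BY user_id
SETTINGS optimize_distributed_group_by_sharding_key = 1;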

@@ -375,6 +375,9 @@ bool StorageDistributed::canForceGroupByNoMerge(const Context &context, QueryPro
if (settings.distributed_group_by_no_merge)
return true;
if (!settings.optimize_distributed_group_by_sharding_key)
return false;
/// Distributed-over-Distributed (see getQueryProcessingStageImpl())
if (to_stage == QueryProcessingStage::WithMergeableState)
return false;
@@ -385,8 +388,17 @@ bool StorageDistributed::canForceGroupByNoMerge(const Context &context, QueryPro
const auto & select = query_ptr->as<ASTSelectQuery &>();
if (select.group_by_with_totals || select.group_by_with_rollup || select.group_by_with_cube)
return false;
// TODO: The following can be optimized too (but with some caveats, will be addressed later):
// - ORDER BY
// - LIMIT BY
// - LIMIT
if (select.orderBy())
return false;
if (select.limitBy() || select.limitLength())
return false;
if (select.distinct)
{
@@ -402,11 +414,6 @@ bool StorageDistributed::canForceGroupByNoMerge(const Context &context, QueryPro
reason = "DISTINCT " + backQuote(serializeAST(*select.select(), true));
}
// This can use distributed_group_by_no_merge but in this case limit stage
// should be done later (which is not the case right now).
if (select.limitBy() || select.limitLength())
return false;
const ASTPtr group_by = select.groupBy();
if (!group_by)
{
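The checks above decide which query shapes may skip the merge on the initiator. A sketch of the distinction, again using the illustrative dist_table sharded by user_id:

-- Eligible: plain GROUP BY on the sharding key, no ORDER BY / LIMIT / LIMIT BY and no
-- WITH TOTALS / ROLLUP / CUBE, so each shard can return its final groups as-is.
SELECT user_id, count() FROM dist_table GROUP BY user_id
SETTINGS optimize_distributed_group_by_sharding_key = 1;

-- Not eligible after this change: WITH TOTALS (likewise ROLLUP, CUBE, ORDER BY, LIMIT)
-- falls back to merging aggregation states on the initiator.
SELECT user_id, count() FROM dist_table GROUP BY user_id WITH TOTALS
SETTINGS optimize_distributed_group_by_sharding_key = 1;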

@@ -6,5 +6,3 @@ optimize_skip_unused_shards
optimize_skip_unused_shards lack of WHERE
0
1
0
1

@@ -12,7 +12,6 @@ SELECT DISTINCT id FROM dist_01213 WHERE id = 1 SETTINGS distributed_group_by_no
SELECT 'optimize_skip_unused_shards';
SELECT DISTINCT id FROM dist_01213 WHERE id = 1 SETTINGS optimize_skip_unused_shards=1;
-- check that querying all shards is ok
-- (there will be duplicates, since the INSERT was done via local table)
SELECT 'optimize_skip_unused_shards lack of WHERE';
SELECT DISTINCT id FROM dist_01213 SETTINGS optimize_skip_unused_shards=1;
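With the optimization now off by default, the initiator merges the DISTINCT results again, which is why the duplicated 0/1 rows are gone from the .reference above. Re-enabling it per query should restore the per-shard behaviour; a sketch, assuming dist_01213 is sharded by id as the test setup implies:

-- Each shard returns its own DISTINCT set, so rows inserted via the local table show up once per shard.
SELECT DISTINCT id FROM dist_01213
SETTINGS optimize_skip_unused_shards = 1, optimize_distributed_group_by_sharding_key = 1;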

@@ -0,0 +1,17 @@
Distributed(number)-over-Distributed(number)
1 0
1 1
1 0
1 1
1 0
1 1
1 0
1 1
Distributed(rand)-over-Distributed(number)
4 0
4 1
Distributed(rand)-over-Distributed(rand)
2 0
2 1
2 0
2 1

@@ -0,0 +1,36 @@
-- TODO: correct testing with real unique shards
set optimize_distributed_group_by_sharding_key=1;
drop table if exists dist_01247;
drop table if exists dist_layer_01247;
drop table if exists data_01247;
create table data_01247 as system.numbers engine=Memory();
-- since data is not inserted via distributed it will have duplicates
-- (and this is how we ensure that this optimization will work)
insert into data_01247 select * from system.numbers limit 2;
set max_distributed_connections=1;
set optimize_skip_unused_shards=1;
select 'Distributed(number)-over-Distributed(number)';
create table dist_layer_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01247, number);
create table dist_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), dist_layer_01247, number);
select count(), * from dist_01247 group by number;
drop table if exists dist_01247;
drop table if exists dist_layer_01247;
select 'Distributed(rand)-over-Distributed(number)';
create table dist_layer_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01247, number);
create table dist_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), dist_layer_01247, rand());
select count(), * from dist_01247 group by number;
drop table if exists dist_01247;
drop table if exists dist_layer_01247;
select 'Distributed(rand)-over-Distributed(rand)';
create table dist_layer_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01247, rand());
create table dist_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), dist_layer_01247, number);
select count(), * from dist_01247 group by number;
drop table dist_01247;
drop table dist_layer_01247;

@@ -67,3 +67,24 @@ GROUP BY (Distributed-over-Distributed) distributed_group_by_no_merge
1 1
1 0
1 1
extremes
1 0
1 1
1 0
1 1
1 0
1 1
WITH TOTALS
2 0
2 1
4 0
WITH ROLLUP
2 0
2 1
4 0
WITH CUBE
2 0
2 1
4 0
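The repeated 1 0 / 1 1 pairs under extremes are the unmerged per-shard rows (the shortcut still applies there), while WITH TOTALS / ROLLUP / CUBE now fall back to a normal merge on the initiator: one 2 0 / 2 1 group set plus the 4 0 total row. The merged shape can also be forced for a plain GROUP BY by switching the setting off per query; a sketch against the test table:

select count(), * from dist_01247 group by number settings optimize_distributed_group_by_sharding_key=0; -- merged on the initiator, same 2 0 / 2 1 shape as under WITH TOTALS above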

@@ -1,3 +1,7 @@
-- TODO: correct testing with real unique shards
set optimize_distributed_group_by_sharding_key=1;
drop table if exists dist_01247;
drop table if exists data_01247;
@@ -60,3 +64,12 @@ select 'GROUP BY (Distributed-over-Distributed)';
select count(), * from cluster(test_cluster_two_shards, currentDatabase(), dist_01247) group by number;
select 'GROUP BY (Distributed-over-Distributed) distributed_group_by_no_merge';
select count(), * from cluster(test_cluster_two_shards, currentDatabase(), dist_01247) group by number settings distributed_group_by_no_merge=1;
select 'extremes';
select count(), * from dist_01247 group by number settings extremes=1;
select 'WITH TOTALS';
select count(), * from dist_01247 group by number with totals;
select 'WITH ROLLUP';
select count(), * from dist_01247 group by number with rollup;
select 'WITH CUBE';
select count(), * from dist_01247 group by number with cube;