Optimize queries with LIMIT/LIMIT BY/ORDER BY for Distributed tables with GROUP BY sharding_key

The previous set of QueryProcessingStage values did not allow this.
But now that WithMergeableStateAfterAggregation has been introduced, the
following queries can also be optimized under
optimize_distributed_group_by_sharding_key (sketched below):
- GROUP BY sharding_key LIMIT
- GROUP BY sharding_key LIMIT BY
- GROUP BY sharding_key ORDER BY
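
For illustration, a minimal sketch (assuming a hypothetical Distributed
table `dist` sharded by the column `key`):

    SELECT count() FROM dist GROUP BY key LIMIT 1
    SELECT count() FROM dist GROUP BY key LIMIT 1 BY key
    SELECT count() FROM dist GROUP BY key ORDER BY key

Since all rows for a given `key` live on exactly one shard, every shard can
finish its aggregation on its own, and the initiator only has to apply the
final ORDER BY / LIMIT BY / LIMIT over the already-aggregated streams.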

The following are still not supported:
- WITH TOTALS (looks like it can be supported)
- WITH ROLLUP (looks like it can be supported)
- WITH CUBE
- SETTINGS extremes=1 (looks like it can be supported)
They will be implemented separately (example below).
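
For example (same hypothetical `dist` as above), a query such as

    SELECT count() FROM dist GROUP BY key WITH TOTALS

still takes the regular WithMergeableState path, since the TOTALS row would
have to be combined across all shards on the initiator.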

vX: fixes
v2: fix WITH *
v3: fix extremes
v4: fix LIMIT OFFSET (and make a little bit cleaner)
v5: fix HAVING
v6: fix ORDER BY
v7: rebase against 20.7
v8: move out WithMergeableStateAfterAggregation
v9: add optimize_distributed_group_by_sharding_key into test names
Azat Khuzhin 2020-04-23 00:44:22 +03:00
parent 4043be3121
commit 10b4f3b41f
8 changed files with 189 additions and 133 deletions


@@ -795,6 +795,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
options.to_stage == QueryProcessingStage::WithMergeableState)
intermediate_stage = true;
/// Support optimize_distributed_group_by_sharding_key
/// Is running on the initiating server during distributed processing?
if (from_stage == QueryProcessingStage::WithMergeableStateAfterAggregation)
from_aggregation_stage = true;
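/// i.e. the shards have already completed their aggregation; the initiator
/// can skip GROUP BY and only apply the remaining ORDER BY / LIMIT BY /
/// LIMIT steps to the received streams (a summary of the intent here).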


@@ -56,6 +56,7 @@
#include <memory>
#include <filesystem>
#include <optional>
namespace
@@ -242,22 +243,81 @@ void replaceConstantExpressions(
visitor.visit(node);
}
QueryProcessingStage::Enum getQueryProcessingStageImpl(const Context & context, QueryProcessingStage::Enum to_stage, const ClusterPtr & cluster)
/// Returns one of the following:
/// - QueryProcessingStage::Complete
/// - QueryProcessingStage::WithMergeableStateAfterAggregation
/// - none (in this case regular WithMergeableState should be used)
std::optional<QueryProcessingStage::Enum> getOptimizedQueryProcessingStage(const ASTPtr & query_ptr, bool extremes, const Block & sharding_key_block)
{
const Settings & settings = context.getSettingsRef();
const auto & select = query_ptr->as<ASTSelectQuery &>();
auto sharding_block_has = [&](const auto & exprs, size_t limit = SIZE_MAX) -> bool
{
size_t i = 0;
for (auto & expr : exprs)
{
if (++i > limit)
break;
auto id = expr->template as<ASTIdentifier>();
if (!id)
return false;
/// TODO: if GROUP BY contains multiIf()/if() it should contain only columns from sharding_key
if (!sharding_key_block.has(id->name))
return false;
}
return true;
};
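/// Example (hypothetical schema): for a table sharded by `key`,
/// sharding_key_block contains the column `key`, so
/// sharding_block_has({key, value}, /* limit= */ 1) is true (only the first
/// expression is checked), while sharding_block_has({value, key}, 1) is false.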
// GROUP BY qualifiers
// - TODO: WITH TOTALS can be implemented
// - TODO: WITH ROLLUP can be implemented (I guess)
if (select.group_by_with_totals || select.group_by_with_rollup || select.group_by_with_cube)
return {};
// TODO: extremes support can be implemented
if (extremes)
return {};
// DISTINCT
if (select.distinct)
{
if (!sharding_block_has(select.select()->children))
return {};
}
// GROUP BY
const ASTPtr group_by = select.groupBy();
if (!group_by)
{
if (!select.distinct)
return {};
}
else
{
if (!sharding_block_has(group_by->children, 1))
return {};
}
// ORDER BY
const ASTPtr order_by = select.orderBy();
if (order_by)
return QueryProcessingStage::WithMergeableStateAfterAggregation;
// LIMIT BY
// LIMIT
if (select.limitBy() || select.limitLength())
return QueryProcessingStage::WithMergeableStateAfterAggregation;
// Only simple SELECT FROM GROUP BY sharding_key can use Complete state.
return QueryProcessingStage::Complete;
}
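/// Illustration of the mapping above (hypothetical `dist` sharded by `key`):
///   SELECT count() FROM dist GROUP BY key               -> Complete
///   SELECT count() FROM dist GROUP BY key ORDER BY key  -> WithMergeableStateAfterAggregation
///   SELECT count() FROM dist GROUP BY key LIMIT 1       -> WithMergeableStateAfterAggregation
///   SELECT count() FROM dist GROUP BY key WITH TOTALS   -> {} (regular WithMergeableState)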
size_t getClusterQueriedNodes(const Settings & settings, const ClusterPtr & cluster)
{
size_t num_local_shards = cluster->getLocalShardCount();
size_t num_remote_shards = cluster->getRemoteShardCount();
size_t result_size = (num_remote_shards * settings.max_parallel_replicas) + num_local_shards;
if (settings.distributed_group_by_no_merge)
return QueryProcessingStage::Complete;
/// Nested distributed query cannot return Complete stage,
/// since the parent query need to aggregate the results after.
if (to_stage == QueryProcessingStage::WithMergeableState)
return QueryProcessingStage::WithMergeableState;
return result_size == 1 ? QueryProcessingStage::Complete
: QueryProcessingStage::WithMergeableState;
return (num_remote_shards * settings.max_parallel_replicas) + num_local_shards;
}
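/// Worked example: 2 remote shards, no local shard, max_parallel_replicas = 1
/// gives 2 * 1 + 0 = 2 queried nodes; a cluster resolving to a single shard
/// gives 1, which lets the query run with the Complete stage on that shard.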
}
@@ -374,88 +434,19 @@ StoragePtr StorageDistributed::createWithOwnCluster(
return res;
}
bool StorageDistributed::canForceGroupByNoMerge(const Context &context, QueryProcessingStage::Enum to_stage, const ASTPtr & query_ptr) const
{
const auto & settings = context.getSettingsRef();
std::string reason;
if (settings.distributed_group_by_no_merge)
return true;
if (!settings.optimize_distributed_group_by_sharding_key)
return false;
/// Distributed-over-Distributed (see getQueryProcessingStageImpl())
if (to_stage == QueryProcessingStage::WithMergeableState)
return false;
if (!settings.optimize_skip_unused_shards)
return false;
if (!has_sharding_key)
return false;
const auto & select = query_ptr->as<ASTSelectQuery &>();
if (select.group_by_with_totals || select.group_by_with_rollup || select.group_by_with_cube)
return false;
// TODO: The following can be optimized too (but with some caveats, will be addressed later):
// - ORDER BY
// - LIMIT BY
// - LIMIT
if (select.orderBy())
return false;
if (select.limitBy() || select.limitLength())
return false;
if (select.distinct)
{
for (auto & expr : select.select()->children)
{
const auto * id = expr->as<ASTIdentifier>();
if (!id)
return false;
if (!sharding_key_expr->getSampleBlock().has(id->name))
return false;
}
reason = "DISTINCT " + backQuote(serializeAST(*select.select(), true));
}
const ASTPtr group_by = select.groupBy();
if (!group_by)
{
if (!select.distinct)
return false;
}
else
{
// injective functions are optimized out in optimizeGroupBy()
// hence all we need to check is that column in GROUP BY matches sharding expression
auto & group_exprs = group_by->children;
if (group_exprs.empty())
throw Exception("No ASTExpressionList in GROUP BY", ErrorCodes::LOGICAL_ERROR);
const auto * id = group_exprs[0]->as<ASTIdentifier>();
if (!id)
return false;
if (!sharding_key_expr->getSampleBlock().has(id->name))
return false;
reason = "GROUP BY " + backQuote(serializeAST(*group_by, true));
}
LOG_DEBUG(log, "Force distributed_group_by_no_merge for {} (injective)", reason);
return true;
}
QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Context &context, QueryProcessingStage::Enum to_stage, const ASTPtr & query_ptr) const
{
const auto & settings = context.getSettingsRef();
auto metadata_snapshot = getInMemoryMetadataPtr();
if (canForceGroupByNoMerge(context, to_stage, query_ptr))
if (settings.distributed_group_by_no_merge)
return QueryProcessingStage::Complete;
/// Nested distributed query cannot return Complete stage,
/// since the parent query need to aggregate the results after.
if (to_stage == QueryProcessingStage::WithMergeableState)
return QueryProcessingStage::WithMergeableState;
ClusterPtr cluster = getCluster();
if (settings.optimize_skip_unused_shards)
{
@@ -464,7 +455,26 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Con
cluster = optimized_cluster;
}
return getQueryProcessingStageImpl(context, to_stage, cluster);
/// If there is only one node, the query can be fully processed by the
/// shard, initiator will work as a proxy only.
if (getClusterQueriedNodes(settings, cluster) == 1)
return QueryProcessingStage::Complete;
if (settings.optimize_skip_unused_shards &&
settings.optimize_distributed_group_by_sharding_key &&
has_sharding_key &&
sharding_key_is_deterministic)
{
Block sharding_key_block = sharding_key_expr->getSampleBlock();
auto stage = getOptimizedQueryProcessingStage(query_ptr, settings.extremes, sharding_key_block);
if (stage)
{
LOG_DEBUG(log, "Force processing stage to {}", QueryProcessingStage::toString(*stage));
return *stage;
}
}
return QueryProcessingStage::WithMergeableState;
}
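/// A hypothetical initiator-side session for the optimized path (the table
/// must also be created with a deterministic sharding key):
///   SET optimize_skip_unused_shards = 1;
///   SET optimize_distributed_group_by_sharding_key = 1;
///   SELECT count() FROM dist GROUP BY key LIMIT 1;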
Pipe StorageDistributed::read(


@@ -66,8 +66,6 @@ public:
bool isRemote() const override { return true; }
/// Return true if distributed_group_by_no_merge may be applied.
bool canForceGroupByNoMerge(const Context &, QueryProcessingStage::Enum to_stage, const ASTPtr &) const;
QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum to_stage, const ASTPtr &) const override;
Pipe read(


@@ -26,8 +26,10 @@ GROUP BY number, 1
GROUP BY 1
4 0
GROUP BY number ORDER BY number DESC
2 1
2 0
1 1
1 1
1 0
1 0
GROUP BY toString(number)
1 0
1 1
@@ -49,12 +51,17 @@ DISTINCT
0
1
HAVING
HAVING LIMIT
1 0
LIMIT
2 0
2 1
LIMIT BY
2 0
2 1
1 0
LIMIT OFFSET
1 1
WHERE LIMIT OFFSET
1 1
LIMIT BY 1
1 0
1 1
GROUP BY (Distributed-over-Distributed)
4 0
4 1
@@ -67,24 +74,48 @@ GROUP BY (Distributed-over-Distributed) distributed_group_by_no_merge
1 1
1 0
1 1
extremes
1 0
1 1
1 0
1 1
GROUP BY (extremes)
2 0
2 1
1 0
1 1
WITH TOTALS
2 0
2 1
LIMIT (extremes)
2 0
2 0
2 0
GROUP BY WITH TOTALS
2 0
2 1
4 0
WITH ROLLUP
GROUP BY WITH ROLLUP
2 0
2 1
4 0
WITH CUBE
GROUP BY WITH CUBE
2 0
2 1
4 0
GROUP BY WITH TOTALS ORDER BY
2 0
2 1
4 0
GROUP BY WITH TOTALS ORDER BY LIMIT
2 0
4 0
GROUP BY WITH TOTALS LIMIT
2 0
4 0
GROUP BY sharding_key, ...
0 0
1 0
0 0
1 0
GROUP BY ..., sharding_key
0 0
1 0


@@ -54,26 +54,55 @@ select 'DISTINCT';
select DISTINCT number from dist_01247;
select 'HAVING';
select count() cnt, * from dist_01247 group by number having cnt < 0;
select count() cnt, * from dist_01247 group by number having cnt == 2;
select 'HAVING LIMIT';
select count() cnt, * from dist_01247 group by number having cnt == 1 limit 1;
select 'LIMIT';
select count(), * from dist_01247 group by number limit 1;
select 'LIMIT OFFSET';
select count(), * from dist_01247 group by number limit 1 offset 1;
-- this will emulate different data on different shards
select 'WHERE LIMIT OFFSET';
select count(), * from dist_01247 where number = _shard_num-1 group by number limit 1 offset 1;
select 'LIMIT BY';
select count(), * from dist_01247 group by number limit 0 by number;
select count(), * from dist_01247 group by number limit 1 by number;
select 'LIMIT BY 1';
select count(), * from dist_01247 group by number order by number limit 1 by number;
select 'GROUP BY (Distributed-over-Distributed)';
select count(), * from cluster(test_cluster_two_shards, currentDatabase(), dist_01247) group by number;
select 'GROUP BY (Distributed-over-Distributed) distributed_group_by_no_merge';
select count(), * from cluster(test_cluster_two_shards, currentDatabase(), dist_01247) group by number settings distributed_group_by_no_merge=1;
select 'extremes';
select 'GROUP BY (extremes)';
select count(), * from dist_01247 group by number settings extremes=1;
select 'WITH TOTALS';
select 'LIMIT (extremes)';
select count(), * from dist_01247 group by number limit 1 settings extremes=1;
select 'GROUP BY WITH TOTALS';
select count(), * from dist_01247 group by number with totals;
select 'WITH ROLLUP';
select 'GROUP BY WITH ROLLUP';
select count(), * from dist_01247 group by number with rollup;
select 'WITH CUBE';
select 'GROUP BY WITH CUBE';
select count(), * from dist_01247 group by number with cube;
select 'GROUP BY WITH TOTALS ORDER BY';
select count(), * from dist_01247 group by number with totals order by number;
select 'GROUP BY WITH TOTALS ORDER BY LIMIT';
select count(), * from dist_01247 group by number with totals order by number limit 1;
select 'GROUP BY WITH TOTALS LIMIT';
select count(), * from dist_01247 group by number with totals limit 1;
-- GROUP BY (compound)
drop table if exists dist_01247;
drop table if exists data_01247;
create table data_01247 engine=Memory() as select number key, 0 value from numbers(2);
create table dist_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01247, key);
select 'GROUP BY sharding_key, ...';
select * from dist_01247 group by key, value;
select 'GROUP BY ..., sharding_key';
select * from dist_01247 group by value, key;
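-- Note: sharding_block_has() is called with limit=1 for GROUP BY, so only
-- the first GROUP BY expression is matched against the sharding key:
-- `group by key, value` is optimized (each shard returns its groups as-is),
-- while `group by value, key` falls back to merging on the initiator.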


@@ -1,17 +1,6 @@
Distributed(number)-over-Distributed(number)
1 0
1 1
1 0
1 1
1 0
1 1
1 0
1 1
Distributed(rand)-over-Distributed(number)
4 0
4 1
Distributed(rand)-over-Distributed(rand)
2 0
2 1
2 0
2 1


@@ -21,20 +21,18 @@ set optimize_skip_unused_shards=1;
select 'Distributed(number)-over-Distributed(number)';
create table dist_layer_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01247, number);
create table dist_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), dist_layer_01247, number);
select count(), * from dist_01247 group by number;
select count(), * from dist_01247 group by number order by number limit 1;
drop table if exists dist_01247;
drop table if exists dist_layer_01247;
select 'Distributed(rand)-over-Distributed(number)';
create table dist_layer_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01247, number);
create table dist_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), dist_layer_01247, rand());
select count(), * from dist_01247 group by number;
select count(), * from dist_01247 group by number order by number limit 1;
drop table if exists dist_01247;
drop table if exists dist_layer_01247;
select 'Distributed(rand)-over-Distributed(rand)';
create table dist_layer_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01247, rand());
create table dist_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), dist_layer_01247, number);
select count(), * from dist_01247 group by number;
select count(), * from dist_01247 group by number order by number limit 1;
drop table dist_01247;
drop table dist_layer_01247;
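-- Note: a nested Distributed query is requested with WithMergeableState by
-- the outer layer, and getQueryProcessingStage() does not upgrade that stage,
-- so the optimization can apply at most once in a dist-on-dist chain.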


@@ -101,8 +101,8 @@
01236_distributed_over_live_view_over_distributed
01236_graphite_mt
01237_live_view_over_distributed_with_subquery_select_table_alias
01247_dist_on_dist_group_by_sharding_key_optimization
01247_distributed_group_by_no_merge_GROUP_BY_injective_sharding_key
01247_optimize_distributed_group_by_sharding_key
01247_optimize_distributed_group_by_sharding_key_dist_on_dist
01251_dict_is_in_infinite_loop
01253_subquery_in_aggregate_function_JustStranger
01254_dict_create_without_db