mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
Do only merging of sorted blocks on initiator with distributed_group_by_no_merge=2
When distributed_group_by_no_merge=2 is used (or when optimize_distributed_group_by_sharding_key takes place), remote servers will do full ORDER BY, so initiator can skip this step and do only merge of ordered blocks.
This commit is contained in:
parent
b854a7b7f8
commit
af660140c3
@ -1103,9 +1103,15 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
|
||||
/** If there is an ORDER BY for distributed query processing,
|
||||
* but there is no aggregation, then on the remote servers ORDER BY was made
|
||||
* - therefore, we merge the sorted streams from remote servers.
|
||||
*
|
||||
* Also in case of remote servers was process the query up to WithMergeableStateAfterAggregation
|
||||
* (distributed_group_by_no_merge=2 or optimize_distributed_group_by_sharding_key=1 takes place),
|
||||
* then merge the sorted streams is enough, since remote servers already did full ORDER BY.
|
||||
*/
|
||||
|
||||
if (!expressions.first_stage && !expressions.need_aggregate && !(query.group_by_with_totals && !aggregate_final))
|
||||
if (from_aggregation_stage)
|
||||
executeMergeSorted(query_plan, "for ORDER BY");
|
||||
else if (!expressions.first_stage && !expressions.need_aggregate && !(query.group_by_with_totals && !aggregate_final))
|
||||
executeMergeSorted(query_plan, "for ORDER BY");
|
||||
else /// Otherwise, just sort.
|
||||
executeOrder(query_plan, query_info.input_order_info);
|
||||
|
@ -0,0 +1,20 @@
|
||||
0
|
||||
0
|
||||
0
|
||||
0
|
||||
0
|
||||
0
|
||||
0
|
||||
0
|
||||
0
|
||||
0
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
@ -0,0 +1,20 @@
|
||||
drop table if exists data_01730;
|
||||
|
||||
-- does not use 127.1 due to prefer_localhost_replica
|
||||
|
||||
select * from remote('127.{2..11}', view(select * from numbers(1e6))) group by number order by number limit 20 settings distributed_group_by_no_merge=0, max_memory_usage='100Mi'; -- { serverError 241 }
|
||||
-- no memory limit error, because with distributed_group_by_no_merge=2 remote servers will do ORDER BY and will cut to the LIMIT
|
||||
select * from remote('127.{2..11}', view(select * from numbers(1e6))) group by number order by number limit 20 settings distributed_group_by_no_merge=2, max_memory_usage='100Mi';
|
||||
|
||||
-- since the MergingSortedTransform will start processing only when all ports (remotes) will have some data,
|
||||
-- and the query with GROUP BY on remote servers will first do GROUP BY and then send the block,
|
||||
-- so the initiator will first receive all blocks from remotes and only after start merging,
|
||||
-- and will hit the memory limit.
|
||||
select * from remote('127.{2..11}', view(select * from numbers(1e6))) group by number order by number limit 1e6 settings distributed_group_by_no_merge=2, max_memory_usage='100Mi'; -- { serverError 241 }
|
||||
|
||||
-- with optimize_aggregation_in_order=1 remote servers will produce blocks more frequently,
|
||||
-- since they don't need to wait until the aggregation will be finished,
|
||||
-- and so the query will not hit the memory limit error.
|
||||
create table data_01730 engine=MergeTree() order by key as select number key from numbers(1e6);
|
||||
select * from remote('127.{2..11}', currentDatabase(), data_01730) group by key order by key limit 1e6 settings distributed_group_by_no_merge=2, max_memory_usage='100Mi', optimize_aggregation_in_order=1 format Null;
|
||||
drop table data_01730;
|
Loading…
Reference in New Issue
Block a user