mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
Force WithMergeableStateAfterAggregation via distributed_group_by_no_merge (convert to UInt64)
Possible values: - 1 - Do not merge aggregation states from different servers for distributed query processing - in case it is for certain that there are different keys on different shards. - 2 - same as 1 but also apply ORDER BY and LIMIT stages
This commit is contained in:
parent
10b4f3b41f
commit
fffeeeba06
@ -1290,6 +1290,47 @@ Possible values:
|
||||
|
||||
Default value: 0.
|
||||
|
||||
## distributed\_group\_by\_no\_merge {#distributed-group-by-no-merge}
|
||||
|
||||
Do not merge aggregation states from different servers for distributed query processing. You can use this when it is certain that there are different keys on different shards.
|
||||
|
||||
Possible values:
|
||||
|
||||
- 0 — Disabled (final query processing is done on the initiator node).
|
||||
- 1 — Do not merge aggregation states from different servers for distributed query processing (the query is completely processed on the shard; the initiator only proxies the data).
|
||||
- 2 — Same as 1, but apply `ORDER BY` and `LIMIT` on the initiator (can be used for queries with `ORDER BY` and/or `LIMIT`).
|
||||
|
||||
**Example**
|
||||
|
||||
```sql
|
||||
SELECT *
|
||||
FROM remote('127.0.0.{2,3}', system.one)
|
||||
GROUP BY dummy
|
||||
LIMIT 1
|
||||
SETTINGS distributed_group_by_no_merge = 1
|
||||
FORMAT PrettyCompactMonoBlock
|
||||
|
||||
┌─dummy─┐
|
||||
│ 0 │
|
||||
│ 0 │
|
||||
└───────┘
|
||||
```
|
||||
|
||||
```sql
|
||||
SELECT *
|
||||
FROM remote('127.0.0.{2,3}', system.one)
|
||||
GROUP BY dummy
|
||||
LIMIT 1
|
||||
SETTINGS distributed_group_by_no_merge = 2
|
||||
FORMAT PrettyCompactMonoBlock
|
||||
|
||||
┌─dummy─┐
|
||||
│ 0 │
|
||||
└───────┘
|
||||
```
|
||||
|
||||
Default value: 0.
|
||||
|
||||
## optimize\_skip\_unused\_shards {#optimize-skip-unused-shards}
|
||||
|
||||
Enables or disables skipping of unused shards for [SELECT](../../sql-reference/statements/select/index.md) queries that have a sharding key condition in `WHERE/PREWHERE` (assuming that the data is distributed by the sharding key; otherwise it does nothing).
|
||||
|
@ -107,8 +107,8 @@ class IColumn;
|
||||
\
|
||||
M(Bool, skip_unavailable_shards, false, "If 1, ClickHouse silently skips unavailable shards and nodes unresolvable through DNS. Shard is marked as unavailable when none of the replicas can be reached.", 0) \
|
||||
\
|
||||
M(Bool, distributed_group_by_no_merge, false, "Do not merge aggregation states from different servers for distributed query processing - in case it is for certain that there are different keys on different shards.", 0) \
|
||||
M(UInt64, parallel_distributed_insert_select, 0, "Process distributed INSERT SELECT query in the same cluster on local tables on every shard, if 1 SELECT is executed on each shard, if 2 SELECT and INSERT is executed on each shard", 0) \
|
||||
M(UInt64, distributed_group_by_no_merge, 0, "If 1, Do not merge aggregation states from different servers for distributed query processing - in case it is for certain that there are different keys on different shards. If 2 - same as 1 but also apply ORDER BY and LIMIT stages", 0) \
|
||||
M(Bool, optimize_distributed_group_by_sharding_key, false, "Optimize GROUP BY sharding_key queries (by avodiing costly aggregation on the initiator server).", 0) \
|
||||
M(Bool, optimize_skip_unused_shards, false, "Assumes that data is distributed by sharding_key. Optimization to skip unused shards if SELECT query filters by sharding_key.", 0) \
|
||||
M(UInt64, force_optimize_skip_unused_shards, 0, "Throw an exception if unused shards cannot be skipped (1 - throw only if the table has the sharding key, 2 - always throw.", 0) \
|
||||
|
@ -63,6 +63,8 @@ namespace
|
||||
{
|
||||
const UInt64 FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_HAS_SHARDING_KEY = 1;
|
||||
const UInt64 FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_ALWAYS = 2;
|
||||
|
||||
const UInt64 DISTRIBUTED_GROUP_BY_NO_MERGE_AFTER_AGGREGATION = 2;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
@ -440,7 +442,12 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Con
|
||||
auto metadata_snapshot = getInMemoryMetadataPtr();
|
||||
|
||||
if (settings.distributed_group_by_no_merge)
|
||||
return QueryProcessingStage::Complete;
|
||||
{
|
||||
if (settings.distributed_group_by_no_merge == DISTRIBUTED_GROUP_BY_NO_MERGE_AFTER_AGGREGATION)
|
||||
return QueryProcessingStage::WithMergeableStateAfterAggregation;
|
||||
else
|
||||
return QueryProcessingStage::Complete;
|
||||
}
|
||||
|
||||
/// Nested distributed query cannot return Complete stage,
|
||||
/// since the parent query needs to aggregate the results afterwards.
|
||||
|
Loading…
Reference in New Issue
Block a user