Merge pull request #10373 from azat/dist-SELECT-optimization

Optimize queries with LIMIT/LIMIT BY/ORDER BY for distributed with GROUP BY sharding_key
This commit is contained in:
alexey-milovidov 2020-09-04 01:38:56 +03:00 committed by GitHub
commit 4f9df21d3e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 426 additions and 152 deletions

View File

@ -1290,6 +1290,47 @@ Possible values:
Default value: 0.
## distributed\_group\_by\_no\_merge {#distributed-group-by-no-merge}
Do not merge aggregation states from different servers for distributed query processing, you can use this in case it is for certain that there are different keys on different shards
Possible values:
- 0 — Disabled (final query processing is done on the initiator node).
- 1 - Do not merge aggregation states from different servers for distributed query processing (query completelly processed on the shard, initiator only proxy the data).
- 2 - Same as 1 but apply `ORDER BY` and `LIMIT` on the initiator (can be used for queries with `ORDER BY` and/or `LIMIT`).
**Example**
```sql
SELECT *
FROM remote('127.0.0.{2,3}', system.one)
GROUP BY dummy
LIMIT 1
SETTINGS distributed_group_by_no_merge = 1
FORMAT PrettyCompactMonoBlock
┌─dummy─┐
│ 0 │
│ 0 │
└───────┘
```
```sql
SELECT *
FROM remote('127.0.0.{2,3}', system.one)
GROUP BY dummy
LIMIT 1
SETTINGS distributed_group_by_no_merge = 2
FORMAT PrettyCompactMonoBlock
┌─dummy─┐
│ 0 │
└───────┘
```
Default value: 0
## optimize\_skip\_unused\_shards {#optimize-skip-unused-shards}
Enables or disables skipping of unused shards for [SELECT](../../sql-reference/statements/select/index.md) queries that have sharding key condition in `WHERE/PREWHERE` (assuming that the data is distributed by sharding key, otherwise does nothing).
@ -1337,6 +1378,40 @@ Possible values:
Default value: 0
## optimize\_distributed\_group\_by\_sharding\_key {#optimize-distributed-group-by-sharding-key}
Optimize `GROUP BY sharding_key` queries, by avoiding costly aggregation on the initiator server (which will reduce memory usage for the query on the initiator server).
The following types of queries are supported (and all combinations of them):
- `SELECT DISTINCT [..., ]sharding_key[, ...] FROM dist`
- `SELECT ... FROM dist GROUP BY sharding_key[, ...]`
- `SELECT ... FROM dist GROUP BY sharding_key[, ...] ORDER BY x`
- `SELECT ... FROM dist GROUP BY sharding_key[, ...] LIMIT 1`
- `SELECT ... FROM dist GROUP BY sharding_key[, ...] LIMIT 1 BY x`
The following types of queries are not supported (support for some of them may be added later):
- `SELECT ... GROUP BY sharding_key[, ...] WITH TOTALS`
- `SELECT ... GROUP BY sharding_key[, ...] WITH ROLLUP`
- `SELECT ... GROUP BY sharding_key[, ...] WITH CUBE`
- `SELECT ... GROUP BY sharding_key[, ...] SETTINGS extremes=1`
Possible values:
- 0 — Disabled.
- 1 — Enabled.
Default value: 0
See also:
- [distributed\_group\_by\_no\_merge](#distributed-group-by-no-merge)
- [optimize\_skip\_unused\_shards](#optimize-skip-unused-shards)
!!! note "Note"
Right now it requires `optimize_skip_unused_shards` (the reason behind this is that one day it may be enabled by default, and it will work correctly only if data was inserted via Distributed table, i.e. data is distributed according to sharding_key).
## optimize\_throw\_if\_noop {#setting-optimize_throw_if_noop}
Enables or disables throwing an exception if an [OPTIMIZE](../../sql-reference/statements/misc.md#misc_operations-optimize) query didnt perform a merge.

View File

@ -104,6 +104,8 @@ public:
query_processing_stage = QueryProcessingStage::FetchColumns;
else if (stage == "with_mergeable_state")
query_processing_stage = QueryProcessingStage::WithMergeableState;
else if (stage == "with_mergeable_state_after_aggregation")
query_processing_stage = QueryProcessingStage::WithMergeableStateAfterAggregation;
else
throw Exception("Unknown query processing stage: " + stage, ErrorCodes::BAD_ARGUMENTS);
@ -564,8 +566,8 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv)
desc.add_options()
("help", "produce help message")
("concurrency,c", value<unsigned>()->default_value(1), "number of parallel queries")
("delay,d", value<double>()->default_value(1), "delay between intermediate reports in seconds (set 0 to disable reports)")
("stage", value<std::string>()->default_value("complete"), "request query processing up to specified stage: complete,fetch_columns,with_mergeable_state")
("delay,d", value<double>()->default_value(1), "delay between intermediate reports in seconds (set 0 to disable reports)")
("stage", value<std::string>()->default_value("complete"), "request query processing up to specified stage: complete,fetch_columns,with_mergeable_state,with_mergeable_state_after_aggregation")
("iterations,i", value<size_t>()->default_value(0), "amount of queries to be executed")
("timelimit,t", value<double>()->default_value(0.), "stop launch of queries after specified time limit")
("randomize,r", value<bool>()->default_value(false), "randomize order of execution")

View File

@ -10,17 +10,36 @@ namespace DB
namespace QueryProcessingStage
{
/// Numbers matter - the later stage has a larger number.
///
/// It is part of Protocol ABI, add values only to the end.
/// Also keep in mind that the code may depends on the order of fields, so be double aware when you will add new values.
enum Enum
{
FetchColumns = 0, /// Only read/have been read the columns specified in the query.
WithMergeableState = 1, /// Until the stage where the results of processing on different servers can be combined.
Complete = 2, /// Completely.
/// Only read/have been read the columns specified in the query.
FetchColumns = 0,
/// Until the stage where the results of processing on different servers can be combined.
WithMergeableState = 1,
/// Completely.
Complete = 2,
/// Until the stage where the aggregate functions were calculated and finalized.
///
/// It is used for auto distributed_group_by_no_merge optimization for distributed engine.
/// (See comments in StorageDistributed).
WithMergeableStateAfterAggregation = 3,
MAX = 4,
};
inline const char * toString(UInt64 stage)
{
static const char * data[] = { "FetchColumns", "WithMergeableState", "Complete" };
return stage < 3
static const char * data[] =
{
"FetchColumns",
"WithMergeableState",
"Complete",
"WithMergeableStateAfterAggregation",
};
return stage < MAX
? data[stage]
: "Unknown stage";
}

View File

@ -107,8 +107,8 @@ class IColumn;
\
M(Bool, skip_unavailable_shards, false, "If 1, ClickHouse silently skips unavailable shards and nodes unresolvable through DNS. Shard is marked as unavailable when none of the replicas can be reached.", 0) \
\
M(Bool, distributed_group_by_no_merge, false, "Do not merge aggregation states from different servers for distributed query processing - in case it is for certain that there are different keys on different shards.", 0) \
M(UInt64, parallel_distributed_insert_select, 0, "Process distributed INSERT SELECT query in the same cluster on local tables on every shard, if 1 SELECT is executed on each shard, if 2 SELECT and INSERT is executed on each shard", 0) \
M(UInt64, distributed_group_by_no_merge, 0, "If 1, Do not merge aggregation states from different servers for distributed query processing - in case it is for certain that there are different keys on different shards. If 2 - same as 1 but also apply ORDER BY and LIMIT stages", 0) \
M(Bool, optimize_distributed_group_by_sharding_key, false, "Optimize GROUP BY sharding_key queries (by avodiing costly aggregation on the initiator server).", 0) \
M(Bool, optimize_skip_unused_shards, false, "Assumes that data is distributed by sharding_key. Optimization to skip unused shards if SELECT query filters by sharding_key.", 0) \
M(UInt64, force_optimize_skip_unused_shards, 0, "Throw an exception if unused shards cannot be skipped (1 - throw only if the table has the sharding key, 2 - always throw.", 0) \

View File

@ -553,6 +553,11 @@ Block InterpreterSelectQuery::getSampleBlockImpl()
return res;
}
if (options.to_stage == QueryProcessingStage::Enum::WithMergeableStateAfterAggregation)
{
return analysis_result.before_order_and_select->getSampleBlock();
}
return analysis_result.final_projection->getSampleBlock();
}
@ -740,6 +745,8 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
auto & expressions = analysis_result;
const auto & subqueries_for_sets = query_analyzer->getSubqueriesForSets();
bool intermediate_stage = false;
bool to_aggregation_stage = false;
bool from_aggregation_stage = false;
if (options.only_analyze)
{
@ -788,6 +795,14 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
options.to_stage == QueryProcessingStage::WithMergeableState)
intermediate_stage = true;
/// Support optimize_distributed_group_by_sharding_key
/// Is running on the initiating server during distributed processing?
if (from_stage == QueryProcessingStage::WithMergeableStateAfterAggregation)
from_aggregation_stage = true;
/// Is running on remote servers during distributed processing?
if (options.to_stage == QueryProcessingStage::WithMergeableStateAfterAggregation)
to_aggregation_stage = true;
if (storage && expressions.filter_info && expressions.prewhere_info)
throw Exception("PREWHERE is not supported if the table is filtered by row-level security expression", ErrorCodes::ILLEGAL_PREWHERE);
@ -848,6 +863,12 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
if (expressions.need_aggregate)
executeMergeAggregated(query_plan, aggregate_overflow_row, aggregate_final);
}
if (from_aggregation_stage)
{
if (intermediate_stage || expressions.first_stage || expressions.second_stage)
throw Exception("Query with after aggregation stage cannot have any other stages", ErrorCodes::LOGICAL_ERROR);
}
if (expressions.first_stage)
{
@ -939,9 +960,13 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
executeSubqueriesInSetsAndJoins(query_plan, subqueries_for_sets);
}
if (expressions.second_stage)
if (expressions.second_stage || from_aggregation_stage)
{
if (expressions.need_aggregate)
if (from_aggregation_stage)
{
/// No need to aggregate anything, since this was done on remote shards.
}
else if (expressions.need_aggregate)
{
/// If you need to combine aggregated results from multiple servers
if (!expressions.first_stage)
@ -994,7 +1019,8 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
* limiting the number of rows in each up to `offset + limit`.
*/
bool has_prelimit = false;
if (query.limitLength() && !query.limit_with_ties && !hasWithTotalsInAnySubqueryInFromClause(query) &&
if (!to_aggregation_stage &&
query.limitLength() && !query.limit_with_ties && !hasWithTotalsInAnySubqueryInFromClause(query) &&
!query.arrayJoinExpressionList() && !query.distinct && !expressions.hasLimitBy() && !settings.extremes)
{
executePreLimit(query_plan, false);
@ -1023,18 +1049,23 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
has_prelimit = true;
}
/** We must do projection after DISTINCT because projection may remove some columns.
*/
executeProjection(query_plan, expressions.final_projection);
/// Projection not be done on the shards, since then initiator will not find column in blocks.
/// (significant only for WithMergeableStateAfterAggregation).
if (!to_aggregation_stage)
{
/// We must do projection after DISTINCT because projection may remove some columns.
executeProjection(query_plan, expressions.final_projection);
}
/** Extremes are calculated before LIMIT, but after LIMIT BY. This is Ok.
*/
/// Extremes are calculated before LIMIT, but after LIMIT BY. This is Ok.
executeExtremes(query_plan);
if (!has_prelimit) /// Limit is no longer needed if there is prelimit.
/// Limit is no longer needed if there is prelimit.
if (!to_aggregation_stage && !has_prelimit)
executeLimit(query_plan);
executeOffset(query_plan);
if (!to_aggregation_stage)
executeOffset(query_plan);
}
}

View File

@ -56,12 +56,15 @@
#include <memory>
#include <filesystem>
#include <optional>
namespace
{
const UInt64 FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_HAS_SHARDING_KEY = 1;
const UInt64 FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_ALWAYS = 2;
const UInt64 DISTRIBUTED_GROUP_BY_NO_MERGE_AFTER_AGGREGATION = 2;
}
namespace DB
@ -242,22 +245,82 @@ void replaceConstantExpressions(
visitor.visit(node);
}
QueryProcessingStage::Enum getQueryProcessingStageImpl(const Context & context, QueryProcessingStage::Enum to_stage, const ClusterPtr & cluster)
/// Returns one of the following:
/// - QueryProcessingStage::Complete
/// - QueryProcessingStage::WithMergeableStateAfterAggregation
/// - none (in this case regular WithMergeableState should be used)
std::optional<QueryProcessingStage::Enum> getOptimizedQueryProcessingStage(const ASTPtr & query_ptr, bool extremes, const Block & sharding_key_block)
{
const Settings & settings = context.getSettingsRef();
const auto & select = query_ptr->as<ASTSelectQuery &>();
auto sharding_block_has = [&](const auto & exprs, size_t limit = SIZE_MAX) -> bool
{
size_t i = 0;
for (auto & expr : exprs)
{
++i;
if (i > limit)
break;
auto id = expr->template as<ASTIdentifier>();
if (!id)
return false;
/// TODO: if GROUP BY contains multiIf()/if() it should contain only columns from sharding_key
if (!sharding_key_block.has(id->name))
return false;
}
return true;
};
// GROUP BY qualifiers
// - TODO: WITH TOTALS can be implemented
// - TODO: WITH ROLLUP can be implemented (I guess)
if (select.group_by_with_totals || select.group_by_with_rollup || select.group_by_with_cube)
return {};
// TODO: extremes support can be implemented
if (extremes)
return {};
// DISTINCT
if (select.distinct)
{
if (!sharding_block_has(select.select()->children))
return {};
}
// GROUP BY
const ASTPtr group_by = select.groupBy();
if (!group_by)
{
if (!select.distinct)
return {};
}
else
{
if (!sharding_block_has(group_by->children, 1))
return {};
}
// ORDER BY
const ASTPtr order_by = select.orderBy();
if (order_by)
return QueryProcessingStage::WithMergeableStateAfterAggregation;
// LIMIT BY
// LIMIT
if (select.limitBy() || select.limitLength())
return QueryProcessingStage::WithMergeableStateAfterAggregation;
// Only simple SELECT FROM GROUP BY sharding_key can use Complete state.
return QueryProcessingStage::Complete;
}
size_t getClusterQueriedNodes(const Settings & settings, const ClusterPtr & cluster)
{
size_t num_local_shards = cluster->getLocalShardCount();
size_t num_remote_shards = cluster->getRemoteShardCount();
size_t result_size = (num_remote_shards * settings.max_parallel_replicas) + num_local_shards;
if (settings.distributed_group_by_no_merge)
return QueryProcessingStage::Complete;
/// Nested distributed query cannot return Complete stage,
/// since the parent query need to aggregate the results after.
if (to_stage == QueryProcessingStage::WithMergeableState)
return QueryProcessingStage::WithMergeableState;
return result_size == 1 ? QueryProcessingStage::Complete
: QueryProcessingStage::WithMergeableState;
return (num_remote_shards * settings.max_parallel_replicas) + num_local_shards;
}
}
@ -374,87 +437,23 @@ StoragePtr StorageDistributed::createWithOwnCluster(
return res;
}
bool StorageDistributed::canForceGroupByNoMerge(const Context &context, QueryProcessingStage::Enum to_stage, const ASTPtr & query_ptr) const
{
const auto & settings = context.getSettingsRef();
std::string reason;
if (settings.distributed_group_by_no_merge)
return true;
if (!settings.optimize_distributed_group_by_sharding_key)
return false;
/// Distributed-over-Distributed (see getQueryProcessingStageImpl())
if (to_stage == QueryProcessingStage::WithMergeableState)
return false;
if (!settings.optimize_skip_unused_shards)
return false;
if (!has_sharding_key)
return false;
const auto & select = query_ptr->as<ASTSelectQuery &>();
if (select.group_by_with_totals || select.group_by_with_rollup || select.group_by_with_cube)
return false;
// TODO: The following can be optimized too (but with some caveats, will be addressed later):
// - ORDER BY
// - LIMIT BY
// - LIMIT
if (select.orderBy())
return false;
if (select.limitBy() || select.limitLength())
return false;
if (select.distinct)
{
for (auto & expr : select.select()->children)
{
const auto * id = expr->as<ASTIdentifier>();
if (!id)
return false;
if (!sharding_key_expr->getSampleBlock().has(id->name))
return false;
}
reason = "DISTINCT " + backQuote(serializeAST(*select.select(), true));
}
const ASTPtr group_by = select.groupBy();
if (!group_by)
{
if (!select.distinct)
return false;
}
else
{
// injective functions are optimized out in optimizeGroupBy()
// hence all we need to check is that column in GROUP BY matches sharding expression
auto & group_exprs = group_by->children;
if (group_exprs.empty())
throw Exception("No ASTExpressionList in GROUP BY", ErrorCodes::LOGICAL_ERROR);
const auto * id = group_exprs[0]->as<ASTIdentifier>();
if (!id)
return false;
if (!sharding_key_expr->getSampleBlock().has(id->name))
return false;
reason = "GROUP BY " + backQuote(serializeAST(*group_by, true));
}
LOG_DEBUG(log, "Force distributed_group_by_no_merge for {} (injective)", reason);
return true;
}
QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Context &context, QueryProcessingStage::Enum to_stage, const ASTPtr & query_ptr) const
{
const auto & settings = context.getSettingsRef();
auto metadata_snapshot = getInMemoryMetadataPtr();
if (canForceGroupByNoMerge(context, to_stage, query_ptr))
return QueryProcessingStage::Complete;
if (settings.distributed_group_by_no_merge)
{
if (settings.distributed_group_by_no_merge == DISTRIBUTED_GROUP_BY_NO_MERGE_AFTER_AGGREGATION)
return QueryProcessingStage::WithMergeableStateAfterAggregation;
else
return QueryProcessingStage::Complete;
}
/// Nested distributed query cannot return Complete stage,
/// since the parent query need to aggregate the results after.
if (to_stage == QueryProcessingStage::WithMergeableState)
return QueryProcessingStage::WithMergeableState;
ClusterPtr cluster = getCluster();
if (settings.optimize_skip_unused_shards)
@ -464,7 +463,26 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Con
cluster = optimized_cluster;
}
return getQueryProcessingStageImpl(context, to_stage, cluster);
/// If there is only one node, the query can be fully processed by the
/// shard, initiator will work as a proxy only.
if (getClusterQueriedNodes(settings, cluster) == 1)
return QueryProcessingStage::Complete;
if (settings.optimize_skip_unused_shards &&
settings.optimize_distributed_group_by_sharding_key &&
has_sharding_key &&
sharding_key_is_deterministic)
{
Block sharding_key_block = sharding_key_expr->getSampleBlock();
auto stage = getOptimizedQueryProcessingStage(query_ptr, settings.extremes, sharding_key_block);
if (stage)
{
LOG_DEBUG(log, "Force processing stage to {}", QueryProcessingStage::toString(*stage));
return *stage;
}
}
return QueryProcessingStage::WithMergeableState;
}
Pipe StorageDistributed::read(

View File

@ -66,8 +66,6 @@ public:
bool isRemote() const override { return true; }
/// Return true if distributed_group_by_no_merge may be applied.
bool canForceGroupByNoMerge(const Context &, QueryProcessingStage::Enum to_stage, const ASTPtr &) const;
QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum to_stage, const ASTPtr &) const override;
Pipe read(

View File

@ -452,6 +452,8 @@ Block StorageMerge::getQueryHeader(
}
case QueryProcessingStage::WithMergeableState:
case QueryProcessingStage::Complete:
case QueryProcessingStage::WithMergeableStateAfterAggregation:
case QueryProcessingStage::MAX:
{
auto query = query_info.query->clone();
removeJoin(*query->as<ASTSelectQuery>());

View File

@ -1,6 +1,41 @@
distributed_group_by_no_merge=1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
distributed_group_by_no_merge=2
LIMIT
1 1 1
OFFSET
2 1 1
ALIAS
0
0
ORDER BY
1
1
0
0
ORDER BY LIMIT
1
LIMIT BY
0
1
LIMIT BY LIMIT
0
GROUP BY ORDER BY
1
1
1
1
GROUP BY w/ ALIAS
0
1
0
1
ORDER BY w/ ALIAS
0

View File

@ -1,2 +1,42 @@
SELECT count(), uniq(dummy) FROM remote('127.0.0.{2,3}', system.one) SETTINGS distributed_group_by_no_merge = 1;
SELECT count(), uniq(dummy) FROM remote('127.0.0.{2,3,4,5}', system.one) SETTINGS distributed_group_by_no_merge = 1;
SELECT 'distributed_group_by_no_merge=1';
SELECT count(), uniq(dummy) FROM remote('127.0.0.{2,3}', system.one) SETTINGS distributed_group_by_no_merge=1;
SELECT count(), uniq(dummy) FROM remote('127.0.0.{2,3,4,5}', system.one) SETTINGS distributed_group_by_no_merge=1;
SELECT count(), uniq(dummy) FROM remote('127.0.0.{2,3}', system.one) LIMIT 1 SETTINGS distributed_group_by_no_merge=1;
SELECT 'distributed_group_by_no_merge=2';
SET max_distributed_connections=1;
SET max_threads=1;
-- breaks any(_shard_num)
SET optimize_move_functions_out_of_any=0;
SELECT 'LIMIT';
SELECT any(_shard_num) shard_num, count(), uniq(dummy) FROM remote('127.0.0.{2,3}', system.one) LIMIT 1 SETTINGS distributed_group_by_no_merge=2;
SELECT 'OFFSET';
SELECT any(_shard_num) shard_num, count(), uniq(dummy) FROM remote('127.0.0.{2,3}', system.one) LIMIT 1, 1 SETTINGS distributed_group_by_no_merge=2;
SELECT 'ALIAS';
SELECT dummy AS d FROM remote('127.0.0.{2,3}', system.one) ORDER BY d SETTINGS distributed_group_by_no_merge=2;
DROP TABLE IF EXISTS data_00184;
CREATE TABLE data_00184 Engine=Memory() AS SELECT * FROM numbers(2);
SELECT 'ORDER BY';
SELECT number FROM remote('127.0.0.{2,3}', currentDatabase(), data_00184) ORDER BY number DESC SETTINGS distributed_group_by_no_merge=2;
SELECT 'ORDER BY LIMIT';
SELECT number FROM remote('127.0.0.{2,3}', currentDatabase(), data_00184) ORDER BY number DESC LIMIT 1 SETTINGS distributed_group_by_no_merge=2;
SELECT 'LIMIT BY';
SELECT number FROM remote('127.0.0.{2,3}', currentDatabase(), data_00184) LIMIT 1 BY number SETTINGS distributed_group_by_no_merge=2;
SELECT 'LIMIT BY LIMIT';
SELECT number FROM remote('127.0.0.{2,3}', currentDatabase(), data_00184) LIMIT 1 BY number LIMIT 1 SETTINGS distributed_group_by_no_merge=2;
SELECT 'GROUP BY ORDER BY';
SELECT uniq(number) u FROM remote('127.0.0.{2,3}', currentDatabase(), data_00184) GROUP BY number ORDER BY u DESC SETTINGS distributed_group_by_no_merge=2;
-- cover possible tricky issues
SELECT 'GROUP BY w/ ALIAS';
SELECT n FROM remote('127.0.0.{2,3}', currentDatabase(), data_00184) GROUP BY number AS n SETTINGS distributed_group_by_no_merge=2;
SELECT 'ORDER BY w/ ALIAS';
SELECT n FROM remote('127.0.0.{2,3}', currentDatabase(), data_00184) ORDER BY number AS n LIMIT 1 SETTINGS distributed_group_by_no_merge=2;
drop table data_00184;

View File

@ -26,8 +26,10 @@ GROUP BY number, 1
GROUP BY 1
4 0
GROUP BY number ORDER BY number DESC
2 1
2 0
1 1
1 1
1 0
1 0
GROUP BY toString(number)
1 0
1 1
@ -49,12 +51,17 @@ DISTINCT
0
1
HAVING
HAVING LIMIT
1 0
LIMIT
2 0
2 1
LIMIT BY
2 0
2 1
1 0
LIMIT OFFSET
1 1
WHERE LIMIT OFFSET
1 1
LIMIT BY 1
1 0
1 1
GROUP BY (Distributed-over-Distributed)
4 0
4 1
@ -67,24 +74,48 @@ GROUP BY (Distributed-over-Distributed) distributed_group_by_no_merge
1 1
1 0
1 1
extremes
1 0
1 1
1 0
1 1
GROUP BY (extemes)
2 0
2 1
1 0
1 1
WITH TOTALS
2 0
2 1
LIMIT (extemes)
2 0
2 0
2 0
GROUP BY WITH TOTALS
2 0
2 1
4 0
WITH ROLLUP
GROUP BY WITH ROLLUP
2 0
2 1
4 0
WITH CUBE
GROUP BY WITH CUBE
2 0
2 1
4 0
GROUP BY WITH TOTALS ORDER BY
2 0
2 1
4 0
GROUP BY WITH TOTALS ORDER BY LIMIT
2 0
4 0
GROUP BY WITH TOTALS LIMIT
2 0
4 0
GROUP BY sharding_key, ...
0 0
1 0
0 0
1 0
GROUP BY ..., sharding_key
0 0
1 0

View File

@ -54,26 +54,58 @@ select 'DISTINCT';
select DISTINCT number from dist_01247;
select 'HAVING';
select count() cnt, * from dist_01247 group by number having cnt < 0;
select count() cnt, * from dist_01247 group by number having cnt == 2;
select 'HAVING LIMIT';
select count() cnt, * from dist_01247 group by number having cnt == 1 limit 1;
select 'LIMIT';
select count(), * from dist_01247 group by number limit 1;
select 'LIMIT OFFSET';
select count(), * from dist_01247 group by number limit 1 offset 1;
-- this will emulate different data on for different shards
select 'WHERE LIMIT OFFSET';
select count(), * from dist_01247 where number = _shard_num-1 group by number limit 1 offset 1;
select 'LIMIT BY';
select count(), * from dist_01247 group by number limit 0 by number;
select count(), * from dist_01247 group by number limit 1 by number;
select 'LIMIT BY 1';
select count(), * from dist_01247 group by number order by number limit 1 by number;
select 'GROUP BY (Distributed-over-Distributed)';
select count(), * from cluster(test_cluster_two_shards, currentDatabase(), dist_01247) group by number;
select 'GROUP BY (Distributed-over-Distributed) distributed_group_by_no_merge';
select count(), * from cluster(test_cluster_two_shards, currentDatabase(), dist_01247) group by number settings distributed_group_by_no_merge=1;
select 'extremes';
select 'GROUP BY (extemes)';
select count(), * from dist_01247 group by number settings extremes=1;
select 'WITH TOTALS';
select 'LIMIT (extemes)';
select count(), * from dist_01247 group by number limit 1 settings extremes=1;
select 'GROUP BY WITH TOTALS';
select count(), * from dist_01247 group by number with totals;
select 'WITH ROLLUP';
select 'GROUP BY WITH ROLLUP';
select count(), * from dist_01247 group by number with rollup;
select 'WITH CUBE';
select 'GROUP BY WITH CUBE';
select count(), * from dist_01247 group by number with cube;
select 'GROUP BY WITH TOTALS ORDER BY';
select count(), * from dist_01247 group by number with totals order by number;
select 'GROUP BY WITH TOTALS ORDER BY LIMIT';
select count(), * from dist_01247 group by number with totals order by number limit 1;
select 'GROUP BY WITH TOTALS LIMIT';
select count(), * from dist_01247 group by number with totals limit 1;
-- GROUP BY (compound)
drop table if exists dist_01247;
drop table if exists data_01247;
create table data_01247 engine=Memory() as select number key, 0 value from numbers(2);
create table dist_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01247, key);
select 'GROUP BY sharding_key, ...';
select * from dist_01247 group by key, value;
select 'GROUP BY ..., sharding_key';
select * from dist_01247 group by value, key;
drop table dist_01247;
drop table data_01247;

View File

@ -1,17 +1,6 @@
Distributed(number)-over-Distributed(number)
1 0
1 1
1 0
1 1
1 0
1 1
1 0
1 1
Distributed(rand)-over-Distributed(number)
4 0
4 1
Distributed(rand)-over-Distributed(rand)
2 0
2 1
2 0
2 1

View File

@ -21,20 +21,21 @@ set optimize_skip_unused_shards=1;
select 'Distributed(number)-over-Distributed(number)';
create table dist_layer_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01247, number);
create table dist_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), dist_layer_01247, number);
select count(), * from dist_01247 group by number;
select count(), * from dist_01247 group by number order by number limit 1;
drop table if exists dist_01247;
drop table if exists dist_layer_01247;
select 'Distributed(rand)-over-Distributed(number)';
create table dist_layer_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01247, number);
create table dist_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), dist_layer_01247, rand());
select count(), * from dist_01247 group by number;
select count(), * from dist_01247 group by number order by number limit 1;
drop table if exists dist_01247;
drop table if exists dist_layer_01247;
select 'Distributed(rand)-over-Distributed(rand)';
create table dist_layer_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01247, rand());
create table dist_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), dist_layer_01247, number);
select count(), * from dist_01247 group by number;
select count(), * from dist_01247 group by number order by number limit 1;
drop table dist_01247;
drop table dist_layer_01247;

View File

@ -1,6 +1,7 @@
# Add testcase here to skip it in Arcadia CI (Yandex synchronization check)
# It is useful for tests with not supported features in Arcadia build
00105_shard_collations
00184_shard_distributed_group_by_no_merge
00436_convert_charset
00490_special_line_separators_and_characters_outside_of_bmp
00506_union_distributed
@ -101,8 +102,8 @@
01236_distributed_over_live_view_over_distributed
01236_graphite_mt
01237_live_view_over_distributed_with_subquery_select_table_alias
01247_dist_on_dist_group_by_sharding_key_optimization
01247_distributed_group_by_no_merge_GROUP_BY_injective_sharding_key
01247_optimize_distributed_group_by_sharding_key
01247_optimize_distributed_group_by_sharding_key_dist_on_dist
01251_dict_is_in_infinite_loop
01253_subquery_in_aggregate_function_JustStranger
01254_dict_create_without_db