Rework SELECT from Distributed query stages optimization

Before this patch it wasn't possible to optimize a simple SELECT * FROM
dist ORDER BY (w/o GROUP BY and DISTINCT) to the more optimal stage
(QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit),
since that code was guarded by
allow_nondeterministic_optimize_skip_unused_shards. Rework it and make
this optimization possible.

Also, distributed_push_down_limit is now respected by
optimize_distributed_group_by_sharding_key.

Next step will be to enable distributed_push_down_limit by default.
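
For example, with this patch a query like the following can be processed
on the shards up to WithMergeableStateAfterAggregationAndLimit (a sketch;
dist and key stand for any Distributed table and one of its columns):

-- each shard sorts and applies the LIMIT locally; the initiator then
-- only merge-sorts the per-shard streams and applies the final LIMIT
SELECT * FROM dist ORDER BY key LIMIT 10
SETTINGS distributed_push_down_limit=1;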

v2: fix detection of aggregates
Azat Khuzhin 2021-07-17 16:31:06 +03:00
parent bb6d030fb8
commit 2fb95d9ee0
6 changed files with 109 additions and 126 deletions


@@ -284,78 +284,6 @@ void replaceConstantExpressions(
visitor.visit(node);
}
/// This is the implementation of optimize_distributed_group_by_sharding_key.
/// It returns up to which stage the query can be processed on a shard, which
/// is one of the following:
/// - QueryProcessingStage::Complete
/// - QueryProcessingStage::WithMergeableStateAfterAggregation
/// - QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit
/// - none (in this case regular WithMergeableState should be used)
std::optional<QueryProcessingStage::Enum> getOptimizedQueryProcessingStage(const SelectQueryInfo & query_info, bool extremes, const Names & sharding_key_columns)
{
const auto & select = query_info.query->as<ASTSelectQuery &>();
auto sharding_block_has = [&](const auto & exprs) -> bool
{
std::unordered_set<std::string> expr_columns;
for (auto & expr : exprs)
{
auto id = expr->template as<ASTIdentifier>();
if (!id)
continue;
expr_columns.emplace(id->name());
}
for (const auto & column : sharding_key_columns)
{
if (!expr_columns.contains(column))
return false;
}
return true;
};
// GROUP BY qualifiers
// - TODO: WITH TOTALS can be implemented
// - TODO: WITH ROLLUP can be implemented (I guess)
if (select.group_by_with_totals || select.group_by_with_rollup || select.group_by_with_cube)
return {};
// Window functions are not supported.
if (query_info.has_window)
return {};
// TODO: extremes support can be implemented
if (extremes)
return {};
// DISTINCT
if (select.distinct)
{
if (!sharding_block_has(select.select()->children))
return {};
}
// GROUP BY
const ASTPtr group_by = select.groupBy();
if (!query_info.syntax_analyzer_result->aggregates.empty() && (!group_by || !sharding_block_has(group_by->children)))
return {};
// ORDER BY
const ASTPtr order_by = select.orderBy();
if (order_by)
return QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit;
// LIMIT BY
// LIMIT
// OFFSET
if (select.limitBy() || select.limitLength() || select.limitOffset())
return QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit;
// Only simple SELECT FROM GROUP BY sharding_key can use Complete state.
return QueryProcessingStage::Complete;
}
size_t getClusterQueriedNodes(const Settings & settings, const ClusterPtr & cluster)
{
size_t num_local_shards = cluster->getLocalShardCount();
@@ -523,9 +451,6 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(
}
}
if (settings.distributed_push_down_limit)
return QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit;
/// Nested distributed query cannot return Complete stage,
/// since the parent query needs to aggregate the results after.
if (to_stage == QueryProcessingStage::WithMergeableState)
@@ -536,22 +461,89 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(
if (getClusterQueriedNodes(settings, cluster) == 1)
return QueryProcessingStage::Complete;
if (settings.optimize_skip_unused_shards &&
settings.optimize_distributed_group_by_sharding_key &&
has_sharding_key &&
(settings.allow_nondeterministic_optimize_skip_unused_shards || sharding_key_is_deterministic))
{
auto stage = getOptimizedQueryProcessingStage(query_info, settings.extremes, sharding_key_expr->getRequiredColumns());
if (stage)
{
LOG_DEBUG(log, "Force processing stage to {}", QueryProcessingStage::toString(*stage));
return *stage;
}
}
auto optimized_stage = getOptimizedQueryProcessingStage(query_info, settings);
if (optimized_stage)
return *optimized_stage;
return QueryProcessingStage::WithMergeableState;
}
std::optional<QueryProcessingStage::Enum> StorageDistributed::getOptimizedQueryProcessingStage(const SelectQueryInfo & query_info, const Settings & settings) const
{
bool optimize_sharding_key_aggregation =
settings.optimize_skip_unused_shards &&
settings.optimize_distributed_group_by_sharding_key &&
has_sharding_key &&
(settings.allow_nondeterministic_optimize_skip_unused_shards || sharding_key_is_deterministic);
QueryProcessingStage::Enum default_stage = QueryProcessingStage::WithMergeableStateAfterAggregation;
if (settings.distributed_push_down_limit)
default_stage = QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit;
const auto & select = query_info.query->as<ASTSelectQuery &>();
auto expr_contains_sharding_key = [&](const auto & exprs) -> bool
{
std::unordered_set<std::string> expr_columns;
for (auto & expr : exprs)
{
auto id = expr->template as<ASTIdentifier>();
if (!id)
continue;
expr_columns.emplace(id->name());
}
for (const auto & column : sharding_key_expr->getRequiredColumns())
{
if (!expr_columns.contains(column))
return false;
}
return true;
};
// GROUP BY qualifiers
// - TODO: WITH TOTALS can be implemented
// - TODO: WITH ROLLUP can be implemented (I guess)
if (select.group_by_with_totals || select.group_by_with_rollup || select.group_by_with_cube)
return {};
// Window functions are not supported.
if (query_info.has_window)
return {};
// TODO: extremes support can be implemented
if (settings.extremes)
return {};
// DISTINCT
if (select.distinct)
{
if (!optimize_sharding_key_aggregation || !expr_contains_sharding_key(select.select()->children))
return {};
}
// GROUP BY
const ASTPtr group_by = select.groupBy();
if (!query_info.syntax_analyzer_result->aggregates.empty() || group_by)
{
if (!optimize_sharding_key_aggregation || !group_by || !expr_contains_sharding_key(group_by->children))
return {};
}
// ORDER BY
const ASTPtr order_by = select.orderBy();
if (order_by)
return default_stage;
// LIMIT BY
// LIMIT
// OFFSET
if (select.limitBy() || select.limitLength() || select.limitOffset())
return default_stage;
// Only simple SELECT FROM GROUP BY sharding_key can use Complete state.
return QueryProcessingStage::Complete;
}
Pipe StorageDistributed::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,

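To illustrate the stage selection implemented above (a sketch; dist is
assumed to be sharded by key, value is a non-sharding-key column, and the
sharding-key cases assume the settings behind
optimize_sharding_key_aggregation are enabled):

-- no GROUP BY/DISTINCT/ORDER BY/LIMIT: each shard can finish the query
SELECT * FROM dist WHERE key > 0;          -- Complete
-- ORDER BY: shards pre-sort (and pre-limit, with distributed_push_down_limit=1)
SELECT * FROM dist ORDER BY key LIMIT 10;  -- WithMergeableStateAfterAggregation[AndLimit]
-- GROUP BY the sharding key: groups never span shards
SELECT count() FROM dist GROUP BY key;     -- Complete
-- GROUP BY anything else cannot be optimized
SELECT count() FROM dist GROUP BY value;   -- empty optional -> WithMergeableState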

@@ -176,6 +176,24 @@ private:
ClusterPtr
skipUnusedShards(ClusterPtr cluster, const ASTPtr & query_ptr, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) const;
/// This method returns optimal query processing stage.
///
/// Here is the list of stages (from the less optimal to more optimal):
/// - WithMergeableState
/// - WithMergeableStateAfterAggregation
/// - WithMergeableStateAfterAggregationAndLimit
/// - Complete
///
/// Some simple queries w/o GROUP BY/DISTINCT can use a more optimal stage.
///
/// Also, in case of optimize_distributed_group_by_sharding_key=1, queries
/// with GROUP BY/DISTINCT over the sharding_key can use a more optimal stage
/// (see also optimize_skip_unused_shards/allow_nondeterministic_optimize_skip_unused_shards).
///
/// @return QueryProcessingStage or empty std::optional
/// (in this case regular WithMergeableState should be used)
std::optional<QueryProcessingStage::Enum> getOptimizedQueryProcessingStage(const SelectQueryInfo & query_info, const Settings & settings) const;
size_t getRandomShardIndex(const Cluster::ShardsInfo & shards);
const DistributedSettings & getDistributedSettingsRef() const { return distributed_settings; }


@@ -57,7 +57,9 @@ LIMIT
1 0
LIMIT OFFSET
1 1
OFFSET
OFFSET distributed_push_down_limit=0
1 1
OFFSET distributed_push_down_limit=1
1 1
1 0
1 1


@@ -60,8 +60,10 @@ select 'LIMIT';
select count(), * from dist_01247 group by number limit 1;
select 'LIMIT OFFSET';
select count(), * from dist_01247 group by number limit 1 offset 1;
select 'OFFSET';
select count(), * from dist_01247 group by number offset 1;
select 'OFFSET distributed_push_down_limit=0';
select count(), * from dist_01247 group by number offset 1 settings distributed_push_down_limit=0;
select 'OFFSET distributed_push_down_limit=1';
select count(), * from dist_01247 group by number offset 1 settings distributed_push_down_limit=1;
-- this will emulate different data for different shards
select 'WHERE LIMIT OFFSET';
select count(), * from dist_01247 where number = _shard_num-1 group by number order by number limit 1 offset 1;

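The two OFFSET cases above differ because with distributed_push_down_limit=1
the shards process the query up to the limit/offset stage themselves.
Roughly, each shard executes something like the following on its local data
before the initiator merges the results (a sketch; data_local is a stand-in
for the shard-local table, the actual rewritten query is internal):

-- per-shard execution with distributed_push_down_limit=1: the OFFSET is
-- applied locally, so the merged result differs from the =0 case above
select count(), * from data_local group by number offset 1;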

@@ -12,18 +12,6 @@ distributed_push_down_limit=1
8
9
40 40
auto-distributed_push_down_limit
0
1
2
3
4
5
6
7
8
9
40 40
distributed_push_down_limit=1 with OFFSET
97
96


@@ -86,9 +86,11 @@ function test_distributed_push_down_limit_0()
function test_distributed_push_down_limit_1()
{
local args=(
"remote('127.{2,3}', $CLICKHOUSE_DATABASE, data_01814)"
"remote('127.{2,3}', $CLICKHOUSE_DATABASE, data_01814, key)"
0 # offset
--distributed_push_down_limit 1
--optimize_skip_unused_shards 1
--optimize_distributed_group_by_sharding_key 1
)
test_distributed_push_down_limit_with_query_log "${args[@]}"
}
@@ -97,22 +99,11 @@ function test_distributed_push_down_limit_1_offset()
{
local settings_and_opts=(
--distributed_push_down_limit 1
)
$CLICKHOUSE_CLIENT "${settings_and_opts[@]}" -q "select * from remote('127.{2,3}', $CLICKHOUSE_DATABASE, data_01814) group by key order by key desc limit 5, 10"
}
function test_auto_distributed_push_down_limit()
{
local args=(
dist_01814
0 # offset
--optimize_skip_unused_shards 1
--optimize_distributed_group_by_sharding_key 1
--prefer_localhost_replica 0
--distributed_push_down_limit 0
)
test_distributed_push_down_limit_with_query_log "${args[@]}"
$CLICKHOUSE_CLIENT "${settings_and_opts[@]}" -q "select * from remote('127.{2,3}', $CLICKHOUSE_DATABASE, data_01814, key) group by key order by key desc limit 5, 10"
}
function main()
@@ -151,16 +142,6 @@ function main()
done
echo "$out"
echo 'auto-distributed_push_down_limit'
for ((i = 0; i < max_tries; ++i)); do
out=$(test_auto_distributed_push_down_limit)
out_lines=( $out )
if [[ ${#out_lines[@]} -gt 2 ]] && [[ ${out_lines[-1]} = 40 ]] && [[ ${out_lines[-2]} = 40 ]]; then
break
fi
done
echo "$out"
echo 'distributed_push_down_limit=1 with OFFSET'
test_distributed_push_down_limit_1_offset
}
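
test_distributed_push_down_limit_with_query_log (defined earlier in this
file, outside the hunks shown) presumably inspects the query log to confirm
the pushed-down LIMIT; a manual check along the same lines could look like
this (a sketch; log contents vary by version and settings):

-- on a shard: inspect the non-initial (remote) queries that were received
SYSTEM FLUSH LOGS;
select query
from system.query_log
where type = 'QueryFinish'
  and is_initial_query = 0
  and query like '%data_01814%LIMIT%';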