Merge pull request #21512 from azat/optimize_skip_unused_shards_limit

Add optimize_skip_unused_shards_limit
This commit is contained in:
alexey-milovidov 2021-03-29 21:13:18 +03:00 committed by GitHub
commit 9d84f3113a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 93 additions and 11 deletions

View File

@ -1514,6 +1514,14 @@ FORMAT PrettyCompactMonoBlock
Default value: 0
## optimize_skip_unused_shards_limit {#optimize-skip-unused-shards-limit}
Limit for number of sharding key values, turns off `optimize_skip_unused_shards` if the limit is reached.
Too many values may require significant amount for processing, while the benefit is doubtful, since if you have huge number of values in `IN (...)`, then most likely the query will be sent to all shards anyway.
Default value: 1000
## optimize_skip_unused_shards {#optimize-skip-unused-shards}
Enables or disables skipping of unused shards for [SELECT](../../sql-reference/statements/select/index.md) queries that have sharding key condition in `WHERE/PREWHERE` (assuming that the data is distributed by sharding key, otherwise does nothing).

View File

@ -116,6 +116,7 @@ class IColumn;
M(UInt64, parallel_distributed_insert_select, 0, "Process distributed INSERT SELECT query in the same cluster on local tables on every shard, if 1 SELECT is executed on each shard, if 2 SELECT and INSERT is executed on each shard", 0) \
M(UInt64, distributed_group_by_no_merge, 0, "If 1, Do not merge aggregation states from different servers for distributed query processing - in case it is for certain that there are different keys on different shards. If 2 - same as 1 but also apply ORDER BY and LIMIT stages", 0) \
M(Bool, optimize_distributed_group_by_sharding_key, false, "Optimize GROUP BY sharding_key queries (by avoiding costly aggregation on the initiator server).", 0) \
M(UInt64, optimize_skip_unused_shards_limit, 1000, "Limit for number of sharding key values, turns off optimize_skip_unused_shards if the limit is reached", 0) \
M(Bool, optimize_skip_unused_shards, false, "Assumes that data is distributed by sharding_key. Optimization to skip unused shards if SELECT query filters by sharding_key.", 0) \
M(Bool, allow_nondeterministic_optimize_skip_unused_shards, false, "Allow non-deterministic functions (includes dictGet) in sharding_key for optimize_skip_unused_shards", 0) \
M(UInt64, force_optimize_skip_unused_shards, 0, "Throw an exception if unused shards cannot be skipped (1 - throw only if the table has the sharding key, 2 - always throw.", 0) \

View File

@ -166,9 +166,9 @@ namespace
return result;
}
Disjunction analyzeFunction(const ASTFunction * fn, const ExpressionActionsPtr & expr)
Disjunction analyzeFunction(const ASTFunction * fn, const ExpressionActionsPtr & expr, size_t & limit)
{
if (!fn)
if (!fn || !limit)
{
return {};
}
@ -182,6 +182,7 @@ namespace
const auto * identifier = left->as<ASTIdentifier>() ? left->as<ASTIdentifier>() : right->as<ASTIdentifier>();
const auto * literal = left->as<ASTLiteral>() ? left->as<ASTLiteral>() : right->as<ASTLiteral>();
--limit;
return analyzeEquals(identifier, literal, expr);
}
else if (fn->name == "in")
@ -192,6 +193,19 @@ namespace
Disjunction result;
auto add_dnf = [&](const auto &dnf)
{
if (dnf.size() > limit)
{
result.clear();
return false;
}
result.insert(result.end(), dnf.begin(), dnf.end());
limit -= dnf.size();
return true;
};
if (const auto * tuple_func = right->as<ASTFunction>(); tuple_func && tuple_func->name == "tuple")
{
const auto * tuple_elements = tuple_func->children.front()->as<ASTExpressionList>();
@ -205,7 +219,10 @@ namespace
return {};
}
result.insert(result.end(), dnf.begin(), dnf.end());
if (!add_dnf(dnf))
{
return {};
}
}
}
else if (const auto * tuple_literal = right->as<ASTLiteral>();
@ -221,7 +238,10 @@ namespace
return {};
}
result.insert(result.end(), dnf.begin(), dnf.end());
if (!add_dnf(dnf))
{
return {};
}
}
}
else
@ -244,13 +264,14 @@ namespace
for (const auto & arg : args->children)
{
const auto dnf = analyzeFunction(arg->as<ASTFunction>(), expr);
const auto dnf = analyzeFunction(arg->as<ASTFunction>(), expr, limit);
if (dnf.empty())
{
return {};
}
/// limit accounted in analyzeFunction()
result.insert(result.end(), dnf.begin(), dnf.end());
}
@ -269,13 +290,14 @@ namespace
for (const auto & arg : args->children)
{
const auto dnf = analyzeFunction(arg->as<ASTFunction>(), expr);
const auto dnf = analyzeFunction(arg->as<ASTFunction>(), expr, limit);
if (dnf.empty())
{
continue;
}
/// limit accounted in analyzeFunction()
result = andDNF(result, dnf);
}
@ -286,15 +308,15 @@ namespace
}
}
std::optional<Blocks> evaluateExpressionOverConstantCondition(const ASTPtr & node, const ExpressionActionsPtr & target_expr)
std::optional<Blocks> evaluateExpressionOverConstantCondition(const ASTPtr & node, const ExpressionActionsPtr & target_expr, size_t & limit)
{
Blocks result;
if (const auto * fn = node->as<ASTFunction>())
{
const auto dnf = analyzeFunction(fn, target_expr);
const auto dnf = analyzeFunction(fn, target_expr, limit);
if (dnf.empty())
if (dnf.empty() || !limit)
{
return {};
}

View File

@ -46,10 +46,11 @@ ASTPtr evaluateConstantExpressionForDatabaseName(const ASTPtr & node, const Cont
/** Try to fold condition to countable set of constant values.
* @param node a condition that we try to fold.
* @param target_expr expression evaluated over a set of constants.
* @param limit limit for number of values
* @return optional blocks each with a single row and a single column for target expression,
* or empty blocks if condition is always false,
* or nothing if condition can't be folded to a set of constants.
*/
std::optional<Blocks> evaluateExpressionOverConstantCondition(const ASTPtr & node, const ExpressionActionsPtr & target_expr);
std::optional<Blocks> evaluateExpressionOverConstantCondition(const ASTPtr & node, const ExpressionActionsPtr & target_expr, size_t & limit);
}

View File

@ -907,7 +907,24 @@ ClusterPtr StorageDistributed::skipUnusedShards(
}
replaceConstantExpressions(condition_ast, context, metadata_snapshot->getColumns().getAll(), shared_from_this(), metadata_snapshot);
const auto blocks = evaluateExpressionOverConstantCondition(condition_ast, sharding_key_expr);
size_t limit = context.getSettingsRef().optimize_skip_unused_shards_limit;
if (!limit || limit > SSIZE_MAX)
{
throw Exception("optimize_skip_unused_shards_limit out of range (0, {}]", ErrorCodes::ARGUMENT_OUT_OF_BOUND, SSIZE_MAX);
}
// To interpret limit==0 as limit is reached
++limit;
const auto blocks = evaluateExpressionOverConstantCondition(condition_ast, sharding_key_expr, limit);
if (!limit)
{
LOG_TRACE(log,
"Number of values for sharding key exceeds optimize_skip_unused_shards_limit={}, "
"try to increase it, but note that this may increase query processing time.",
context.getSettingsRef().optimize_skip_unused_shards_limit);
return nullptr;
}
// Can't get definite answer if we can skip any shards
if (!blocks)

View File

@ -0,0 +1,33 @@
drop table if exists dist_01757;
create table dist_01757 as system.one engine=Distributed(test_cluster_two_shards, system, one, dummy);
set optimize_skip_unused_shards=1;
set force_optimize_skip_unused_shards=2;
-- in
select * from dist_01757 where dummy in (0,) format Null;
select * from dist_01757 where dummy in (0, 1) format Null settings optimize_skip_unused_shards_limit=2;
-- in negative
select * from dist_01757 where dummy in (0, 1) settings optimize_skip_unused_shards_limit=1; -- { serverError 507 }
-- or negative
select * from dist_01757 where dummy = 0 or dummy = 1 settings optimize_skip_unused_shards_limit=1; -- { serverError 507 }
-- or
select * from dist_01757 where dummy = 0 or dummy = 1 format Null settings optimize_skip_unused_shards_limit=2;
-- and negative
select * from dist_01757 where dummy = 0 and dummy = 1 settings optimize_skip_unused_shards_limit=1; -- { serverError 507 }
select * from dist_01757 where dummy = 0 and dummy = 2 and dummy = 3 settings optimize_skip_unused_shards_limit=1; -- { serverError 507 }
select * from dist_01757 where dummy = 0 and dummy = 2 and dummy = 3 settings optimize_skip_unused_shards_limit=2; -- { serverError 507 }
-- and
select * from dist_01757 where dummy = 0 and dummy = 1 settings optimize_skip_unused_shards_limit=2;
select * from dist_01757 where dummy = 0 and dummy = 1 and dummy = 3 settings optimize_skip_unused_shards_limit=3;
-- ARGUMENT_OUT_OF_BOUND error
select * from dist_01757 where dummy in (0, 1) settings optimize_skip_unused_shards_limit=0; -- { serverError 69 }
select * from dist_01757 where dummy in (0, 1) settings optimize_skip_unused_shards_limit=9223372036854775808; -- { serverError 69 }
drop table dist_01757;