mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-21 09:10:48 +00:00
Process prewhere clause in "skip unused shards" optimization (#6521)
* Process prewhere clause in optimize_skip_unused_shards * Better diagnostics and logging
This commit is contained in:
parent
f0a161787a
commit
fe5cd47068
@ -176,6 +176,22 @@ IColumn::Selector createSelector(const ClusterPtr cluster, const ColumnWithTypeA
|
|||||||
throw Exception{"Sharding key expression does not evaluate to an integer type", ErrorCodes::TYPE_MISMATCH};
|
throw Exception{"Sharding key expression does not evaluate to an integer type", ErrorCodes::TYPE_MISMATCH};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::string makeFormattedListOfShards(const ClusterPtr & cluster)
|
||||||
|
{
|
||||||
|
std::ostringstream os;
|
||||||
|
|
||||||
|
bool head = true;
|
||||||
|
os << "[";
|
||||||
|
for (const auto & shard_info : cluster->getShardsInfo())
|
||||||
|
{
|
||||||
|
(head ? os : os << ", ") << shard_info.shard_num;
|
||||||
|
head = false;
|
||||||
|
}
|
||||||
|
os << "]";
|
||||||
|
|
||||||
|
return os.str();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -312,10 +328,23 @@ BlockInputStreams StorageDistributed::read(
|
|||||||
|
|
||||||
if (settings.optimize_skip_unused_shards)
|
if (settings.optimize_skip_unused_shards)
|
||||||
{
|
{
|
||||||
auto smaller_cluster = skipUnusedShards(cluster, query_info);
|
if (has_sharding_key)
|
||||||
|
{
|
||||||
|
auto smaller_cluster = skipUnusedShards(cluster, query_info);
|
||||||
|
|
||||||
if (smaller_cluster)
|
if (smaller_cluster)
|
||||||
cluster = smaller_cluster;
|
{
|
||||||
|
cluster = smaller_cluster;
|
||||||
|
LOG_DEBUG(log, "Reading from " << database_name << "." << table_name << ": "
|
||||||
|
"Skipping irrelevant shards - the query will be sent to the following shards of the cluster (shard numbers): "
|
||||||
|
" " << makeFormattedListOfShards(cluster));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
LOG_DEBUG(log, "Reading from " << database_name << "." << table_name << ": "
|
||||||
|
"Unable to figure out irrelevant shards from WHERE/PREWHERE clauses - the query will be sent to all shards of the cluster");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return ClusterProxy::executeQuery(
|
return ClusterProxy::executeQuery(
|
||||||
@ -488,15 +517,32 @@ void StorageDistributed::ClusterNodeData::shutdownAndDropAllData()
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Returns a new cluster with fewer shards if constant folding for `sharding_key_expr` is possible
|
/// Returns a new cluster with fewer shards if constant folding for `sharding_key_expr` is possible
|
||||||
/// using constraints from "WHERE" condition, otherwise returns `nullptr`
|
/// using constraints from "PREWHERE" and "WHERE" conditions, otherwise returns `nullptr`
|
||||||
ClusterPtr StorageDistributed::skipUnusedShards(ClusterPtr cluster, const SelectQueryInfo & query_info)
|
ClusterPtr StorageDistributed::skipUnusedShards(ClusterPtr cluster, const SelectQueryInfo & query_info)
|
||||||
{
|
{
|
||||||
|
if (!has_sharding_key)
|
||||||
|
{
|
||||||
|
throw Exception("Internal error: cannot determine shards of a distributed table if no sharding expression is supplied", ErrorCodes::LOGICAL_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
const auto & select = query_info.query->as<ASTSelectQuery &>();
|
const auto & select = query_info.query->as<ASTSelectQuery &>();
|
||||||
|
|
||||||
if (!select.where() || !sharding_key_expr)
|
if (!select.prewhere() && !select.where())
|
||||||
|
{
|
||||||
return nullptr;
|
return nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
const auto & blocks = evaluateExpressionOverConstantCondition(select.where(), sharding_key_expr);
|
ASTPtr condition_ast;
|
||||||
|
if (select.prewhere() && select.where())
|
||||||
|
{
|
||||||
|
condition_ast = makeASTFunction("and", select.prewhere()->clone(), select.where()->clone());
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
condition_ast = select.prewhere() ? select.prewhere()->clone() : select.where()->clone();
|
||||||
|
}
|
||||||
|
|
||||||
|
const auto blocks = evaluateExpressionOverConstantCondition(condition_ast, sharding_key_expr);
|
||||||
|
|
||||||
// Can't get definite answer if we can skip any shards
|
// Can't get definite answer if we can skip any shards
|
||||||
if (!blocks)
|
if (!blocks)
|
||||||
|
@ -0,0 +1,15 @@
|
|||||||
|
OK
|
||||||
|
OK
|
||||||
|
1
|
||||||
|
OK
|
||||||
|
0
|
||||||
|
1
|
||||||
|
4
|
||||||
|
4
|
||||||
|
2
|
||||||
|
4
|
||||||
|
OK
|
||||||
|
OK
|
||||||
|
OK
|
||||||
|
OK
|
||||||
|
OK
|
@ -0,0 +1,100 @@
|
|||||||
|
#!/usr/bin/env bash

# Functional test for the `optimize_skip_unused_shards` setting with PREWHERE conditions.
#
# A Distributed table is created over the `test_unavailable_shard` cluster, so any query
# that has to contact every shard fails with "All connection tries failed". A query only
# succeeds when the PREWHERE/WHERE conditions let the server skip the unavailable shard.
# Each step prints either the query result or OK/FAIL; the output is compared against
# the accompanying reference file.

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# Quoted so that a checkout path containing spaces does not break sourcing.
. "$CURDIR"/../shell_config.sh

${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS distributed_00754;"
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS mergetree_00754;"

${CLICKHOUSE_CLIENT} --query "
CREATE TABLE mergetree_00754 (a Int64, b Int64, c String) ENGINE = MergeTree ORDER BY (a, b);
"
${CLICKHOUSE_CLIENT} --query "
CREATE TABLE distributed_00754 AS mergetree_00754
ENGINE = Distributed(test_unavailable_shard, ${CLICKHOUSE_DATABASE}, mergetree_00754, jumpConsistentHash(a+b, 2));
"

${CLICKHOUSE_CLIENT} --query "INSERT INTO mergetree_00754 VALUES (0, 0, 'Hello');"
${CLICKHOUSE_CLIENT} --query "INSERT INTO mergetree_00754 VALUES (1, 0, 'World');"
${CLICKHOUSE_CLIENT} --query "INSERT INTO mergetree_00754 VALUES (0, 1, 'Hello');"
${CLICKHOUSE_CLIENT} --query "INSERT INTO mergetree_00754 VALUES (1, 1, 'World');"

# Should fail because the second shard is unavailable
${CLICKHOUSE_CLIENT} --query "SELECT count(*) FROM distributed_00754;" 2>&1 \
| grep -F -q "All connection tries failed" && echo 'OK' || echo 'FAIL'

# Should fail without setting `optimize_skip_unused_shards` = 1
${CLICKHOUSE_CLIENT} --query "SELECT count(*) FROM distributed_00754 PREWHERE a = 0 AND b = 0;" 2>&1 \
| grep -F -q "All connection tries failed" && echo 'OK' || echo 'FAIL'

# Should pass now
${CLICKHOUSE_CLIENT} -n --query="
SET optimize_skip_unused_shards = 1;
SELECT count(*) FROM distributed_00754 PREWHERE a = 0 AND b = 0;
"


# Should still fail because of matching unavailable shard
${CLICKHOUSE_CLIENT} -n --query="
SET optimize_skip_unused_shards = 1;
SELECT count(*) FROM distributed_00754 PREWHERE a = 2 AND b = 2;
" 2>&1 | grep -F -q "All connection tries failed" && echo 'OK' || echo 'FAIL'

# Try more complex expressions for constant folding - all should pass.

${CLICKHOUSE_CLIENT} -n --query="
SET optimize_skip_unused_shards = 1;
SELECT count(*) FROM distributed_00754 PREWHERE a = 1 AND a = 0 WHERE b = 0;
"

${CLICKHOUSE_CLIENT} -n --query="
SET optimize_skip_unused_shards = 1;
SELECT count(*) FROM distributed_00754 PREWHERE a = 1 WHERE b = 1 AND length(c) = 5;
"

${CLICKHOUSE_CLIENT} -n --query="
SET optimize_skip_unused_shards = 1;
SELECT count(*) FROM distributed_00754 PREWHERE a IN (0, 1) AND b IN (0, 1) WHERE c LIKE '%l%';
"

${CLICKHOUSE_CLIENT} -n --query="
SET optimize_skip_unused_shards = 1;
SELECT count(*) FROM distributed_00754 PREWHERE a IN (0, 1) WHERE b IN (0, 1) AND c LIKE '%l%';
"

${CLICKHOUSE_CLIENT} -n --query="
SET optimize_skip_unused_shards = 1;
SELECT count(*) FROM distributed_00754 PREWHERE a = 0 AND b = 0 OR a = 1 AND b = 1 WHERE c LIKE '%l%';
"

${CLICKHOUSE_CLIENT} -n --query="
SET optimize_skip_unused_shards = 1;
SELECT count(*) FROM distributed_00754 PREWHERE (a = 0 OR a = 1) WHERE (b = 0 OR b = 1);
"

# These should fail.

${CLICKHOUSE_CLIENT} -n --query="
SET optimize_skip_unused_shards = 1;
SELECT count(*) FROM distributed_00754 PREWHERE a = 0 AND b <= 1;
" 2>&1 | grep -F -q "All connection tries failed" && echo 'OK' || echo 'FAIL'

${CLICKHOUSE_CLIENT} -n --query="
SET optimize_skip_unused_shards = 1;
SELECT count(*) FROM distributed_00754 PREWHERE a = 0 WHERE c LIKE '%l%';
" 2>&1 | grep -F -q "All connection tries failed" && echo 'OK' || echo 'FAIL'

${CLICKHOUSE_CLIENT} -n --query="
SET optimize_skip_unused_shards = 1;
SELECT count(*) FROM distributed_00754 PREWHERE a = 0 OR a = 1 AND b = 0;
" 2>&1 | grep -F -q "All connection tries failed" && echo 'OK' || echo 'FAIL'

${CLICKHOUSE_CLIENT} -n --query="
SET optimize_skip_unused_shards = 1;
SELECT count(*) FROM distributed_00754 PREWHERE a = 0 AND b = 0 OR a = 2 AND b = 2;
" 2>&1 | grep -F -q "All connection tries failed" && echo 'OK' || echo 'FAIL'

${CLICKHOUSE_CLIENT} -n --query="
SET optimize_skip_unused_shards = 1;
SELECT count(*) FROM distributed_00754 PREWHERE a = 0 AND b = 0 OR c LIKE '%l%';
" 2>&1 | grep -F -q "All connection tries failed" && echo 'OK' || echo 'FAIL'
|
Loading…
Reference in New Issue
Block a user