From fe5cd470681010e86445f3660ac8095d811deb1b Mon Sep 17 00:00:00 2001
From: Alexander Kazakov
Date: Mon, 19 Aug 2019 23:28:24 +0300
Subject: [PATCH] Process prewhere clause in "skip unused shards" optimization (#6521)

* Process prewhere clause in optimize_skip_unused_shards

* Better diagnostics and logging
---
Reviewer notes (text in this region is commentary, not part of the commit):

- Line breaks and indentation were rebuilt from a whitespace-collapsed copy
  of this patch. Hunk line counts were re-checked against every @@ header
  and against the diffstat below; exact indentation of context lines is
  assumed to be the usual 4 spaces - TODO confirm against the pre-image.
- The context line `query_info.query->as<ASTSelectQuery &>()` had lost its
  template argument (it read `->as()`, which cannot compile). It is restored
  here as `<ASTSelectQuery &>` - TODO confirm against blob e862d27fdaa.
- The author's e-mail address on the From: line is missing as well and has
  not been reconstructed.
- Summary of the change as visible in the hunks:
  * makeFormattedListOfShards() renders the shard numbers of a cluster as
    "[n1, n2, ...]" for log output.
  * StorageDistributed::read() calls skipUnusedShards() only when
    has_sharding_key is set, and logs whether shards were skipped.
  * skipUnusedShards() throws LOGICAL_ERROR without a sharding key, returns
    nullptr when the query has neither PREWHERE nor WHERE, and otherwise
    evaluates the sharding key over PREWHERE, WHERE, or "and"(PREWHERE, WHERE).
- NOTE(review): several test commands below end in `2>&1 \ | fgrep ...` on a
  single line; the backslash escapes the following space, which the shell
  passes to the client as an extra argument. Left unchanged here - consider
  dropping the backslash in a follow-up.

 dbms/src/Storages/StorageDistributed.cpp      |  58 ++++++++--
 ...t_on_unused_shards_with_prewhere.reference |  15 +++
 ...p_select_on_unused_shards_with_prewhere.sh | 100 ++++++++++++++++++
 3 files changed, 167 insertions(+), 6 deletions(-)
 create mode 100644 dbms/tests/queries/0_stateless/00754_distributed_optimize_skip_select_on_unused_shards_with_prewhere.reference
 create mode 100755 dbms/tests/queries/0_stateless/00754_distributed_optimize_skip_select_on_unused_shards_with_prewhere.sh

diff --git a/dbms/src/Storages/StorageDistributed.cpp b/dbms/src/Storages/StorageDistributed.cpp
index e862d27fdaa..49c2ebc7286 100644
--- a/dbms/src/Storages/StorageDistributed.cpp
+++ b/dbms/src/Storages/StorageDistributed.cpp
@@ -176,6 +176,22 @@ IColumn::Selector createSelector(const ClusterPtr cluster, const ColumnWithTypeA
     throw Exception{"Sharding key expression does not evaluate to an integer type", ErrorCodes::TYPE_MISMATCH};
 }
 
+std::string makeFormattedListOfShards(const ClusterPtr & cluster)
+{
+    std::ostringstream os;
+
+    bool head = true;
+    os << "[";
+    for (const auto & shard_info : cluster->getShardsInfo())
+    {
+        (head ? os : os << ", ") << shard_info.shard_num;
+        head = false;
+    }
+    os << "]";
+
+    return os.str();
+}
+
 }
 
 
@@ -312,10 +328,23 @@ BlockInputStreams StorageDistributed::read(
 
     if (settings.optimize_skip_unused_shards)
     {
-        auto smaller_cluster = skipUnusedShards(cluster, query_info);
+        if (has_sharding_key)
+        {
+            auto smaller_cluster = skipUnusedShards(cluster, query_info);
 
-        if (smaller_cluster)
-            cluster = smaller_cluster;
+            if (smaller_cluster)
+            {
+                cluster = smaller_cluster;
+                LOG_DEBUG(log, "Reading from " << database_name << "." << table_name << ": "
+                               "Skipping irrelevant shards - the query will be sent to the following shards of the cluster (shard numbers): "
+                               " " << makeFormattedListOfShards(cluster));
+            }
+            else
+            {
+                LOG_DEBUG(log, "Reading from " << database_name << "." << table_name << ": "
+                               "Unable to figure out irrelevant shards from WHERE/PREWHERE clauses - the query will be sent to all shards of the cluster");
+            }
+        }
     }
 
     return ClusterProxy::executeQuery(
@@ -488,15 +517,32 @@ void StorageDistributed::ClusterNodeData::shutdownAndDropAllData()
 }
 
 /// Returns a new cluster with fewer shards if constant folding for `sharding_key_expr` is possible
-/// using constraints from "WHERE" condition, otherwise returns `nullptr`
+/// using constraints from "PREWHERE" and "WHERE" conditions, otherwise returns `nullptr`
 ClusterPtr StorageDistributed::skipUnusedShards(ClusterPtr cluster, const SelectQueryInfo & query_info)
 {
+    if (!has_sharding_key)
+    {
+        throw Exception("Internal error: cannot determine shards of a distributed table if no sharding expression is supplied", ErrorCodes::LOGICAL_ERROR);
+    }
+
     const auto & select = query_info.query->as<ASTSelectQuery &>();
 
-    if (!select.where() || !sharding_key_expr)
+    if (!select.prewhere() && !select.where())
+    {
         return nullptr;
+    }
 
-    const auto & blocks = evaluateExpressionOverConstantCondition(select.where(), sharding_key_expr);
+    ASTPtr condition_ast;
+    if (select.prewhere() && select.where())
+    {
+        condition_ast = makeASTFunction("and", select.prewhere()->clone(), select.where()->clone());
+    }
+    else
+    {
+        condition_ast = select.prewhere() ? select.prewhere()->clone() : select.where()->clone();
+    }
+
+    const auto blocks = evaluateExpressionOverConstantCondition(condition_ast, sharding_key_expr);
 
     // Can't get definite answer if we can skip any shards
     if (!blocks)
diff --git a/dbms/tests/queries/0_stateless/00754_distributed_optimize_skip_select_on_unused_shards_with_prewhere.reference b/dbms/tests/queries/0_stateless/00754_distributed_optimize_skip_select_on_unused_shards_with_prewhere.reference
new file mode 100644
index 00000000000..4c66ccfd2a2
--- /dev/null
+++ b/dbms/tests/queries/0_stateless/00754_distributed_optimize_skip_select_on_unused_shards_with_prewhere.reference
@@ -0,0 +1,15 @@
+OK
+OK
+1
+OK
+0
+1
+4
+4
+2
+4
+OK
+OK
+OK
+OK
+OK
diff --git a/dbms/tests/queries/0_stateless/00754_distributed_optimize_skip_select_on_unused_shards_with_prewhere.sh b/dbms/tests/queries/0_stateless/00754_distributed_optimize_skip_select_on_unused_shards_with_prewhere.sh
new file mode 100755
index 00000000000..c8ae239063a
--- /dev/null
+++ b/dbms/tests/queries/0_stateless/00754_distributed_optimize_skip_select_on_unused_shards_with_prewhere.sh
@@ -0,0 +1,100 @@
+#!/usr/bin/env bash
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+. $CURDIR/../shell_config.sh
+
+${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS distributed_00754;"
+${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS mergetree_00754;"
+
+${CLICKHOUSE_CLIENT} --query "
+    CREATE TABLE mergetree_00754 (a Int64, b Int64, c String) ENGINE = MergeTree ORDER BY (a, b);
+"
+${CLICKHOUSE_CLIENT} --query "
+    CREATE TABLE distributed_00754 AS mergetree_00754
+    ENGINE = Distributed(test_unavailable_shard, ${CLICKHOUSE_DATABASE}, mergetree_00754, jumpConsistentHash(a+b, 2));
+"
+
+${CLICKHOUSE_CLIENT} --query "INSERT INTO mergetree_00754 VALUES (0, 0, 'Hello');"
+${CLICKHOUSE_CLIENT} --query "INSERT INTO mergetree_00754 VALUES (1, 0, 'World');"
+${CLICKHOUSE_CLIENT} --query "INSERT INTO mergetree_00754 VALUES (0, 1, 'Hello');"
+${CLICKHOUSE_CLIENT} --query "INSERT INTO mergetree_00754 VALUES (1, 1, 'World');"
+
+# Should fail because the second shard is unavailable
+${CLICKHOUSE_CLIENT} --query "SELECT count(*) FROM distributed_00754;" 2>&1 \
+| fgrep -q "All connection tries failed" && echo 'OK' || echo 'FAIL'
+
+# Should fail without setting `optimize_skip_unused_shards` = 1
+${CLICKHOUSE_CLIENT} --query "SELECT count(*) FROM distributed_00754 PREWHERE a = 0 AND b = 0;" 2>&1 \
+| fgrep -q "All connection tries failed" && echo 'OK' || echo 'FAIL'
+
+# Should pass now
+${CLICKHOUSE_CLIENT} -n --query="
+    SET optimize_skip_unused_shards = 1;
+    SELECT count(*) FROM distributed_00754 PREWHERE a = 0 AND b = 0;
+"
+
+
+# Should still fail because of matching unavailable shard
+${CLICKHOUSE_CLIENT} -n --query="
+    SET optimize_skip_unused_shards = 1;
+    SELECT count(*) FROM distributed_00754 PREWHERE a = 2 AND b = 2;
+" 2>&1 \ | fgrep -q "All connection tries failed" && echo 'OK' || echo 'FAIL'
+
+# Try more complex expressions for constant folding - all should pass.
+
+${CLICKHOUSE_CLIENT} -n --query="
+    SET optimize_skip_unused_shards = 1;
+    SELECT count(*) FROM distributed_00754 PREWHERE a = 1 AND a = 0 WHERE b = 0;
+"
+
+${CLICKHOUSE_CLIENT} -n --query="
+    SET optimize_skip_unused_shards = 1;
+    SELECT count(*) FROM distributed_00754 PREWHERE a = 1 WHERE b = 1 AND length(c) = 5;
+"
+
+${CLICKHOUSE_CLIENT} -n --query="
+    SET optimize_skip_unused_shards = 1;
+    SELECT count(*) FROM distributed_00754 PREWHERE a IN (0, 1) AND b IN (0, 1) WHERE c LIKE '%l%';
+"
+
+${CLICKHOUSE_CLIENT} -n --query="
+    SET optimize_skip_unused_shards = 1;
+    SELECT count(*) FROM distributed_00754 PREWHERE a IN (0, 1) WHERE b IN (0, 1) AND c LIKE '%l%';
+"
+
+${CLICKHOUSE_CLIENT} -n --query="
+    SET optimize_skip_unused_shards = 1;
+    SELECT count(*) FROM distributed_00754 PREWHERE a = 0 AND b = 0 OR a = 1 AND b = 1 WHERE c LIKE '%l%';
+"
+
+${CLICKHOUSE_CLIENT} -n --query="
+    SET optimize_skip_unused_shards = 1;
+    SELECT count(*) FROM distributed_00754 PREWHERE (a = 0 OR a = 1) WHERE (b = 0 OR b = 1);
+"
+
+# These should fail.
+
+${CLICKHOUSE_CLIENT} -n --query="
+    SET optimize_skip_unused_shards = 1;
+    SELECT count(*) FROM distributed_00754 PREWHERE a = 0 AND b <= 1;
+" 2>&1 \ | fgrep -q "All connection tries failed" && echo 'OK' || echo 'FAIL'
+
+${CLICKHOUSE_CLIENT} -n --query="
+    SET optimize_skip_unused_shards = 1;
+    SELECT count(*) FROM distributed_00754 PREWHERE a = 0 WHERE c LIKE '%l%';
+" 2>&1 \ | fgrep -q "All connection tries failed" && echo 'OK' || echo 'FAIL'
+
+${CLICKHOUSE_CLIENT} -n --query="
+    SET optimize_skip_unused_shards = 1;
+    SELECT count(*) FROM distributed_00754 PREWHERE a = 0 OR a = 1 AND b = 0;
+" 2>&1 \ | fgrep -q "All connection tries failed" && echo 'OK' || echo 'FAIL'
+
+${CLICKHOUSE_CLIENT} -n --query="
+    SET optimize_skip_unused_shards = 1;
+    SELECT count(*) FROM distributed_00754 PREWHERE a = 0 AND b = 0 OR a = 2 AND b = 2;
+" 2>&1 \ | fgrep -q "All connection tries failed" && echo 'OK' || echo 'FAIL'
+
+${CLICKHOUSE_CLIENT} -n --query="
+    SET optimize_skip_unused_shards = 1;
+    SELECT count(*) FROM distributed_00754 PREWHERE a = 0 AND b = 0 OR c LIKE '%l%';
+" 2>&1 \ | fgrep -q "All connection tries failed" && echo 'OK' || echo 'FAIL'