diff --git a/.gitignore b/.gitignore
index 585a4074767..9816f1cbb6c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -248,3 +248,6 @@ website/package-lock.json
 
 # Ignore files for locally disabled tests
 /dbms/tests/queries/**/*.disabled
+
+# cquery cache
+/.cquery-cache
diff --git a/dbms/programs/server/config.xml b/dbms/programs/server/config.xml
index 514a081eaca..108e64e3387 100644
--- a/dbms/programs/server/config.xml
+++ b/dbms/programs/server/config.xml
@@ -187,6 +187,20 @@
+        <test_unavailable_shard>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>1</port>
+                </replica>
+            </shard>
+        </test_unavailable_shard>
diff --git a/dbms/src/Core/Block.h b/dbms/src/Core/Block.h
index a3198a0fb74..d8efc939ecd 100644
--- a/dbms/src/Core/Block.h
+++ b/dbms/src/Core/Block.h
@@ -97,8 +97,8 @@ public:
     /// Approximate number of allocated bytes in memory - for profiling and limits.
     size_t allocatedBytes() const;
 
-    operator bool() const { return !data.empty(); }
-    bool operator!() const { return data.empty(); }
+    operator bool() const { return !!columns(); }
+    bool operator!() const { return !this->operator bool(); }
 
     /** Get a list of column names separated by commas. */
     std::string dumpNames() const;
diff --git a/dbms/src/Interpreters/Cluster.cpp b/dbms/src/Interpreters/Cluster.cpp
index 0cd6dde2625..db81bc58061 100644
--- a/dbms/src/Interpreters/Cluster.cpp
+++ b/dbms/src/Interpreters/Cluster.cpp
@@ -397,14 +397,24 @@ void Cluster::initMisc()
 
 std::unique_ptr<Cluster> Cluster::getClusterWithSingleShard(size_t index) const
 {
-    return std::unique_ptr<Cluster>{ new Cluster(*this, index) };
+    return std::unique_ptr<Cluster>{ new Cluster(*this, {index}) };
 }
 
-Cluster::Cluster(const Cluster & from, size_t index)
-    : shards_info{from.shards_info[index]}
+std::unique_ptr<Cluster> Cluster::getClusterWithMultipleShards(const std::vector<size_t> & indices) const
 {
-    if (!from.addresses_with_failover.empty())
-        addresses_with_failover.emplace_back(from.addresses_with_failover[index]);
+    return std::unique_ptr<Cluster>{ new Cluster(*this, indices) };
+}
+
+Cluster::Cluster(const Cluster & from, const std::vector<size_t> & indices)
+    : shards_info{}
+{
+    for (size_t index : indices)
+    {
+        shards_info.emplace_back(from.shards_info.at(index));
+
+        if (!from.addresses_with_failover.empty())
+            addresses_with_failover.emplace_back(from.addresses_with_failover.at(index));
+    }
 
     initMisc();
 }
diff --git a/dbms/src/Interpreters/Cluster.h b/dbms/src/Interpreters/Cluster.h
index 2ef8b889160..f998ad8f912 100644
--- a/dbms/src/Interpreters/Cluster.h
+++ b/dbms/src/Interpreters/Cluster.h
@@ -143,6 +143,9 @@ public:
     /// Get a subcluster consisting of one shard - index by count (from 0) of the shard of this cluster.
     std::unique_ptr<Cluster> getClusterWithSingleShard(size_t index) const;
 
+    /// Get a subcluster consisting of one or multiple shards - indexes by count (from 0) of the shards of this cluster.
+    std::unique_ptr<Cluster> getClusterWithMultipleShards(const std::vector<size_t> & indices) const;
+
 private:
     using SlotToShard = std::vector<UInt64>;
     SlotToShard slot_to_shard;
@@ -153,8 +156,8 @@ public:
 private:
     void initMisc();
 
-    /// For getClusterWithSingleShard implementation.
-    Cluster(const Cluster & from, size_t index);
+    /// For getClusterWithMultipleShards implementation.
+    Cluster(const Cluster & from, const std::vector<size_t> & indices);
 
     String hash_of_addresses;
     /// Description of the cluster shards.
diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h
index fc8ea2c4630..c325cc2f845 100644
--- a/dbms/src/Interpreters/Settings.h
+++ b/dbms/src/Interpreters/Settings.h
@@ -89,6 +89,7 @@ struct Settings
     M(SettingBool, skip_unavailable_shards, false, "Silently skip unavailable shards.") \
     \
     M(SettingBool, distributed_group_by_no_merge, false, "Do not merge aggregation states from different servers for distributed query processing - in case it is for certain that there are different keys on different shards.") \
+    M(SettingBool, distributed_optimize_skip_select_on_unused_shards, false, "Assumes that data is distributed by sharding_key. Optimization to skip unused shards if SELECT query filters by sharding_key.") \
     \
     M(SettingUInt64, merge_tree_min_rows_for_concurrent_read, (20 * 8192), "If at least as many lines are read from one file, the reading can be parallelized.") \
     M(SettingUInt64, merge_tree_min_rows_for_seek, 0, "You can skip reading more than that number of rows at the price of one seek per file.") \
diff --git a/dbms/src/Interpreters/evaluateConstantExpression.cpp b/dbms/src/Interpreters/evaluateConstantExpression.cpp
index 769f45f9c31..29753a4c637 100644
--- a/dbms/src/Interpreters/evaluateConstantExpression.cpp
+++ b/dbms/src/Interpreters/evaluateConstantExpression.cpp
@@ -1,18 +1,20 @@
-#include
+
 #include
 #include
-#include
-#include
-#include
-#include
+#include
 #include
 #include
-#include
-#include
+#include
 #include
-#include
-#include
+#include
+#include
+#include
+#include
+#include
+#include
 #include
+#include
 
 namespace DB
@@ -77,4 +79,236 @@ ASTPtr evaluateConstantExpressionOrIdentifierAsLiteral(const ASTPtr & node, const Context & context)
     return evaluateConstantExpressionAsLiteral(node, context);
 }
 
+namespace
+{
+    using Conjunction = ColumnsWithTypeAndName;
+    using Disjunction = std::vector<Conjunction>;
+
+    Disjunction analyzeEquals(const ASTIdentifier * identifier, const ASTLiteral * literal, const ExpressionActionsPtr & expr)
+    {
+        if (!identifier || !literal)
+        {
+            return {};
+        }
+
+        for (const auto & name_and_type : expr->getRequiredColumnsWithTypes())
+        {
+            const auto & name = name_and_type.name;
+            const auto & type = name_and_type.type;
+
+            if (name == identifier->name)
+            {
+                ColumnWithTypeAndName column;
+                // FIXME: what to do if field is not convertible?
+                column.column = type->createColumnConst(1, convertFieldToType(literal->value, *type));
+                column.name = name;
+                column.type = type;
+                return {{std::move(column)}};
+            }
+        }
+
+        return {};
+    }
+
+    Disjunction andDNF(const Disjunction & left, const Disjunction & right)
+    {
+        if (left.empty())
+        {
+            return right;
+        }
+
+        Disjunction result;
+
+        for (const auto & conjunct1 : left)
+        {
+            for (const auto & conjunct2 : right)
+            {
+                Conjunction new_conjunct{conjunct1};
+                new_conjunct.insert(new_conjunct.end(), conjunct2.begin(), conjunct2.end());
+                result.emplace_back(new_conjunct);
+            }
+        }
+
+        return result;
+    }
+
+    Disjunction analyzeFunction(const ASTFunction * fn, const ExpressionActionsPtr & expr)
+    {
+        if (!fn)
+        {
+            return {};
+        }
+
+        // TODO: enumerate all possible function names!
+
+        if (fn->name == "equals")
+        {
+            const auto * left = fn->arguments->children.front().get();
+            const auto * right = fn->arguments->children.back().get();
+            const auto * identifier = typeid_cast<const ASTIdentifier *>(left) ? typeid_cast<const ASTIdentifier *>(left)
+                                                                               : typeid_cast<const ASTIdentifier *>(right);
+            const auto * literal = typeid_cast<const ASTLiteral *>(left) ? typeid_cast<const ASTLiteral *>(left)
+                                                                         : typeid_cast<const ASTLiteral *>(right);
+
+            return analyzeEquals(identifier, literal, expr);
+        }
+        else if (fn->name == "in")
+        {
+            const auto * left = fn->arguments->children.front().get();
+            const auto * right = fn->arguments->children.back().get();
+            const auto * identifier = typeid_cast<const ASTIdentifier *>(left);
+            const auto * inner_fn = typeid_cast<const ASTFunction *>(right);
+
+            if (!inner_fn)
+            {
+                return {};
+            }
+
+            const auto * tuple = typeid_cast<const ASTExpressionList *>(inner_fn->children.front().get());
+
+            if (!tuple)
+            {
+                return {};
+            }
+
+            Disjunction result;
+
+            for (const auto & child : tuple->children)
+            {
+                const auto * literal = typeid_cast<const ASTLiteral *>(child.get());
+                const auto dnf = analyzeEquals(identifier, literal, expr);
+
+                if (dnf.empty())
+                {
+                    return {};
+                }
+
+                result.insert(result.end(), dnf.begin(), dnf.end());
+            }
+
+            return result;
+        }
+        else if (fn->name == "or")
+        {
+            const auto * args = typeid_cast<const ASTExpressionList *>(fn->children.front().get());
+
+            if (!args)
+            {
+                return {};
+            }
+
+            Disjunction result;
+
+            for (const auto & arg : args->children)
+            {
+                const auto dnf = analyzeFunction(typeid_cast<const ASTFunction *>(arg.get()), expr);
+
+                if (dnf.empty())
+                {
+                    return {};
+                }
+
+                result.insert(result.end(), dnf.begin(), dnf.end());
+            }
+
+            return result;
+        }
+        else if (fn->name == "and")
+        {
+            const auto * args = typeid_cast<const ASTExpressionList *>(fn->children.front().get());
+
+            if (!args)
+            {
+                return {};
+            }
+
+            Disjunction result;
+
+            for (const auto & arg : args->children)
+            {
+                const auto dnf = analyzeFunction(typeid_cast<const ASTFunction *>(arg.get()), expr);
+
+                if (dnf.empty())
+                {
+                    continue;
+                }
+
+                result = andDNF(result, dnf);
+            }
+
+            return result;
+        }
+
+        return {};
+    }
+}
+
+std::optional<Blocks> evaluateExpressionOverConstantCondition(const ASTPtr & node, const ExpressionActionsPtr & target_expr)
+{
+    Blocks result;
+
+    // TODO: `node` may be always-false literal.
+
+    if (const auto fn = typeid_cast<const ASTFunction *>(node.get()))
+    {
+        const auto dnf = analyzeFunction(fn, target_expr);
+
+        if (dnf.empty())
+        {
+            return {};
+        }
+
+        auto hasRequiredColumns = [&target_expr](const Block & block) -> bool
+        {
+            for (const auto & name : target_expr->getRequiredColumns())
+            {
+                bool hasColumn = false;
+                for (const auto & column_name : block.getNames())
+                {
+                    if (column_name == name)
+                    {
+                        hasColumn = true;
+                        break;
+                    }
+                }
+
+                if (!hasColumn)
+                    return false;
+            }
+
+            return true;
+        };
+
+        for (const auto & conjunct : dnf)
+        {
+            Block block(conjunct);
+
+            // Block should contain all required columns from `target_expr`
+            if (!hasRequiredColumns(block))
+            {
+                return {};
+            }
+
+            target_expr->execute(block);
+
+            if (block.rows() == 1)
+            {
+                result.push_back(block);
+            }
+            else if (block.rows() == 0)
+            {
+                // filter out cases like "WHERE a = 1 AND a = 2"
+                continue;
+            }
+            else
+            {
+                // FIXME: shouldn't happen
+                return {};
+            }
+        }
+    }
+
+    return {result};
+}
+
 }
diff --git a/dbms/src/Interpreters/evaluateConstantExpression.h b/dbms/src/Interpreters/evaluateConstantExpression.h
index c35b7177622..a901612040b 100644
--- a/dbms/src/Interpreters/evaluateConstantExpression.h
+++ b/dbms/src/Interpreters/evaluateConstantExpression.h
@@ -1,17 +1,22 @@
 #pragma once
 
-#include
+#include
 #include
 #include
 #include
+#include
+#include
+
 namespace DB
 {
 
 class Context;
+class ExpressionActions;
 class IDataType;
+using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
 
 /** Evaluate constant expression and its type.
  * Used in rare cases - for elements of set for IN, for data to INSERT.
@@ -20,17 +25,24 @@ class IDataType;
 
 std::pair<Field, std::shared_ptr<const IDataType>> evaluateConstantExpression(const ASTPtr & node, const Context & context);
 
-/** Evaluate constant expression
- * and returns ASTLiteral with its value.
+/** Evaluate constant expression and returns ASTLiteral with its value.
  */
 ASTPtr evaluateConstantExpressionAsLiteral(const ASTPtr & node, const Context & context);
 
-/** Evaluate constant expression
- * and returns ASTLiteral with its value.
+/** Evaluate constant expression and returns ASTLiteral with its value.
  * Also, if AST is identifier, then return string literal with its name.
  * Useful in places where some name may be specified as identifier, or as result of a constant expression.
  */
 ASTPtr evaluateConstantExpressionOrIdentifierAsLiteral(const ASTPtr & node, const Context & context);
 
+/** Try to fold condition to countable set of constant values.
+ * @param condition a condition that we try to fold.
+ * @param target_expr expression evaluated over a set of constants.
+ * @return optional blocks each with a single row and a single column for target expression,
+ *         or empty blocks if condition is always false,
+ *         or nothing if condition can't be folded to a set of constants.
+ */
+std::optional<Blocks> evaluateExpressionOverConstantCondition(const ASTPtr & condition, const ExpressionActionsPtr & target_expr);
+
 }
diff --git a/dbms/src/Storages/MergeTree/KeyCondition.cpp b/dbms/src/Storages/MergeTree/KeyCondition.cpp
index 9484bd8c3cc..31a4e08707f 100644
--- a/dbms/src/Storages/MergeTree/KeyCondition.cpp
+++ b/dbms/src/Storages/MergeTree/KeyCondition.cpp
@@ -313,7 +313,7 @@ bool KeyCondition::addCondition(const String & column, const Range & range)
     return true;
 }
 
-/** Computes value of constant expression and it data type.
+/** Computes value of constant expression and its data type.
  * Returns false, if expression isn't constant.
  */
 static bool getConstant(const ASTPtr & expr, Block & block_with_constants, Field & out_value, DataTypePtr & out_type)
diff --git a/dbms/src/Storages/MergeTree/KeyCondition.h b/dbms/src/Storages/MergeTree/KeyCondition.h
index d025f70bf09..1d700ad80d9 100644
--- a/dbms/src/Storages/MergeTree/KeyCondition.h
+++ b/dbms/src/Storages/MergeTree/KeyCondition.h
@@ -253,7 +253,7 @@ public:
     /// Get the maximum number of the key element used in the condition.
     size_t getMaxKeyColumn() const;
 
-    /// Impose an additional condition: the value in the column column must be in the `range` range.
+    /// Impose an additional condition: the value in the column `column` must be in the range `range`.
     /// Returns whether there is such a column in the key.
     bool addCondition(const String & column, const Range & range);
diff --git a/dbms/src/Storages/StorageDistributed.cpp b/dbms/src/Storages/StorageDistributed.cpp
index 9feeeb5bcf9..0429db5cef6 100644
--- a/dbms/src/Storages/StorageDistributed.cpp
+++ b/dbms/src/Storages/StorageDistributed.cpp
@@ -1,38 +1,41 @@
+#include
+
 #include
 #include
 #include
 #include
+#include
-#include
-#include
 #include
+#include
 #include
 #include
 #include
 #include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
 #include
+#include
 #include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
-#include
+#include
+#include
+#include
 #include
 #include
+#include
 #include
-#include
 #include
-#include
-#include
 #include
 #include
@@ -58,6 +61,7 @@ namespace ErrorCodes
     extern const int INFINITE_LOOP;
     extern const int TYPE_MISMATCH;
     extern const int NO_SUCH_COLUMN_IN_TABLE;
+    extern const int TOO_MANY_ROWS;
 }
@@ -133,6 +137,29 @@ void initializeFileNamesIncrement(const std::string & path, SimpleIncrement & increment)
     increment.set(getMaximumFileNumber(path));
 }
 
+/// the same as DistributedBlockOutputStream::createSelector, should it be static?
+IColumn::Selector createSelector(const ClusterPtr cluster, const ColumnWithTypeAndName & result)
+{
+    const auto & slot_to_shard = cluster->getSlotToShard();
+
+#define CREATE_FOR_TYPE(TYPE) \
+    if (typeid_cast<const DataType ## TYPE *>(result.type.get())) \
+        return createBlockSelector<TYPE>(*result.column, slot_to_shard);
+
+    CREATE_FOR_TYPE(UInt8)
+    CREATE_FOR_TYPE(UInt16)
+    CREATE_FOR_TYPE(UInt32)
+    CREATE_FOR_TYPE(UInt64)
+    CREATE_FOR_TYPE(Int8)
+    CREATE_FOR_TYPE(Int16)
+    CREATE_FOR_TYPE(Int32)
+    CREATE_FOR_TYPE(Int64)
+
+#undef CREATE_FOR_TYPE
+
+    throw Exception{"Sharding key expression does not evaluate to an integer type", ErrorCodes::TYPE_MISMATCH};
+}
+
 }
@@ -267,6 +294,14 @@ BlockInputStreams StorageDistributed::read(
         : ClusterProxy::SelectStreamFactory(
             header, processed_stage, QualifiedTableName{remote_database, remote_table}, context.getExternalTables());
 
+    if (settings.distributed_optimize_skip_select_on_unused_shards)
+    {
+        auto smaller_cluster = skipUnusedShards(cluster, query_info);
+
+        if (smaller_cluster)
+            cluster = smaller_cluster;
+    }
+
     return ClusterProxy::executeQuery(
         select_stream_factory, cluster, modified_query_ast, context, settings);
 }
@@ -425,6 +460,41 @@ void StorageDistributed::ClusterNodeData::shutdownAndDropAllData()
     directory_monitor->shutdownAndDropAllData();
 }
 
+/// Returns a new cluster with fewer shards if constant folding for `sharding_key_expr` is possible
+/// using constraints from "WHERE" condition, otherwise returns `nullptr`
+ClusterPtr StorageDistributed::skipUnusedShards(ClusterPtr cluster, const SelectQueryInfo & query_info)
+{
+    const auto & select = typeid_cast<const ASTSelectQuery &>(*query_info.query);
+
+    if (!select.where_expression)
+    {
+        return nullptr;
+    }
+
+    const auto & blocks = evaluateExpressionOverConstantCondition(select.where_expression, sharding_key_expr);
+
+    // Can't get definite answer if we can skip any shards
+    if (!blocks)
+    {
+        return nullptr;
+    }
+
+    std::set<size_t> shards;
+
+    for (const auto & block : *blocks)
+    {
+        if (!block.has(sharding_key_column_name))
+            throw Exception("sharding_key_expr should evaluate as a single row", ErrorCodes::TOO_MANY_ROWS);
+
+        const auto result = block.getByName(sharding_key_column_name);
+        const auto selector = createSelector(cluster, result);
+
+        shards.insert(selector.begin(), selector.end());
+    }
+
+    return cluster->getClusterWithMultipleShards({shards.begin(), shards.end()});
+}
+
 void registerStorageDistributed(StorageFactory & factory)
 {
diff --git a/dbms/src/Storages/StorageDistributed.h b/dbms/src/Storages/StorageDistributed.h
index 1ae53f5637c..e14d9f7081f 100644
--- a/dbms/src/Storages/StorageDistributed.h
+++ b/dbms/src/Storages/StorageDistributed.h
@@ -166,6 +166,8 @@ protected:
         const ASTPtr & sharding_key_,
         const String & data_path_,
         bool attach);
+
+    ClusterPtr skipUnusedShards(ClusterPtr cluster, const SelectQueryInfo & query_info);
 };
 
 }
diff --git a/dbms/tests/queries/0_stateless/00754_distributed_optimize_skip_select_on_unused_shards.reference b/dbms/tests/queries/0_stateless/00754_distributed_optimize_skip_select_on_unused_shards.reference
new file mode 100644
index 00000000000..add8c239ade
--- /dev/null
+++ b/dbms/tests/queries/0_stateless/00754_distributed_optimize_skip_select_on_unused_shards.reference
@@ -0,0 +1,16 @@
+OK
+OK
+1
+OK
+0
+4
+2
+1
+1
+1
+4
+OK
+OK
+OK
+OK
+OK
diff --git a/dbms/tests/queries/0_stateless/00754_distributed_optimize_skip_select_on_unused_shards.sh b/dbms/tests/queries/0_stateless/00754_distributed_optimize_skip_select_on_unused_shards.sh
new file mode 100755
index 00000000000..92af8677058
--- /dev/null
+++ b/dbms/tests/queries/0_stateless/00754_distributed_optimize_skip_select_on_unused_shards.sh
@@ -0,0 +1,105 @@
+#!/usr/bin/env bash
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+. $CURDIR/../shell_config.sh
+
+${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test.mergetree;"
+${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test.distributed;"
+
+${CLICKHOUSE_CLIENT} --query "CREATE TABLE test.mergetree (a Int64, b Int64, c Int64) ENGINE = MergeTree ORDER BY (a, b);"
+${CLICKHOUSE_CLIENT} --query "CREATE TABLE test.distributed AS test.mergetree ENGINE = Distributed(test_unavailable_shard, test, mergetree, jumpConsistentHash(a+b, 2));"
+
+${CLICKHOUSE_CLIENT} --query "INSERT INTO test.mergetree VALUES (0, 0, 0);"
+${CLICKHOUSE_CLIENT} --query "INSERT INTO test.mergetree VALUES (1, 0, 0);"
+${CLICKHOUSE_CLIENT} --query "INSERT INTO test.mergetree VALUES (0, 1, 1);"
+${CLICKHOUSE_CLIENT} --query "INSERT INTO test.mergetree VALUES (1, 1, 1);"
+
+# Should fail because second shard is unavailable
+${CLICKHOUSE_CLIENT} --query "SELECT count(*) FROM test.distributed;" 2>&1 \
+| fgrep -q "All connection tries failed" && echo 'OK' || echo 'FAIL'
+
+# Should fail without setting `distributed_optimize_skip_select_on_unused_shards`
+${CLICKHOUSE_CLIENT} --query "SELECT count(*) FROM test.distributed WHERE a = 0 AND b = 0;" 2>&1 \
+| fgrep -q "All connection tries failed" && echo 'OK' || echo 'FAIL'
+
+# Should pass now
+${CLICKHOUSE_CLIENT} -n --query="
+    SET distributed_optimize_skip_select_on_unused_shards = 1;
+    SELECT count(*) FROM test.distributed WHERE a = 0 AND b = 0;
+"
+
+# Should still fail because of matching unavailable shard
+${CLICKHOUSE_CLIENT} -n --query="
+    SET distributed_optimize_skip_select_on_unused_shards = 1;
+    SELECT count(*) FROM test.distributed WHERE a = 2 AND b = 2;
+" 2>&1 \
+| fgrep -q "All connection tries failed" && echo 'OK' || echo 'FAIL'
+
+# Try more complex expressions for constant folding - all should pass.
+
+${CLICKHOUSE_CLIENT} -n --query="
+    SET distributed_optimize_skip_select_on_unused_shards = 1;
+    SELECT count(*) FROM test.distributed WHERE a = 1 AND a = 0 AND b = 0;
+"
+
+${CLICKHOUSE_CLIENT} -n --query="
+    SET distributed_optimize_skip_select_on_unused_shards = 1;
+    SELECT count(*) FROM test.distributed WHERE a IN (0, 1) AND b IN (0, 1);
+"
+
+${CLICKHOUSE_CLIENT} -n --query="
+    SET distributed_optimize_skip_select_on_unused_shards = 1;
+    SELECT count(*) FROM test.distributed WHERE a = 0 AND b = 0 OR a = 1 AND b = 1;
+"
+
+# TODO: should pass one day.
+#${CLICKHOUSE_CLIENT} -n --query="
+#    SET distributed_optimize_skip_select_on_unused_shards = 1;
+#    SELECT count(*) FROM test.distributed WHERE a = 0 AND b >= 0 AND b <= 1;
+#"
+
+${CLICKHOUSE_CLIENT} -n --query="
+    SET distributed_optimize_skip_select_on_unused_shards = 1;
+    SELECT count(*) FROM test.distributed WHERE a = 0 AND b = 0 AND c = 0;
+"
+
+${CLICKHOUSE_CLIENT} -n --query="
+    SET distributed_optimize_skip_select_on_unused_shards = 1;
+    SELECT count(*) FROM test.distributed WHERE a = 0 AND b = 0 AND c != 10;
+"
+
+${CLICKHOUSE_CLIENT} -n --query="
+    SET distributed_optimize_skip_select_on_unused_shards = 1;
+    SELECT count(*) FROM test.distributed WHERE a = 0 AND b = 0 AND (a+b)*b != 12;
+"
+
+${CLICKHOUSE_CLIENT} -n --query="
+    SET distributed_optimize_skip_select_on_unused_shards = 1;
+    SELECT count(*) FROM test.distributed WHERE (a = 0 OR a = 1) AND (b = 0 OR b = 1);
+"
+
+# These ones should fail.
+
+${CLICKHOUSE_CLIENT} -n --query="
+    SET distributed_optimize_skip_select_on_unused_shards = 1;
+    SELECT count(*) FROM test.distributed WHERE a = 0 AND b <= 1;
+" 2>&1 \
+| fgrep -q "All connection tries failed" && echo 'OK' || echo 'FAIL'
+
+${CLICKHOUSE_CLIENT} -n --query="
+    SET distributed_optimize_skip_select_on_unused_shards = 1;
+    SELECT count(*) FROM test.distributed WHERE a = 0 AND c = 0;
+" 2>&1 \
+| fgrep -q "All connection tries failed" && echo 'OK' || echo 'FAIL'
+
+${CLICKHOUSE_CLIENT} -n --query="
+    SET distributed_optimize_skip_select_on_unused_shards = 1;
+    SELECT count(*) FROM test.distributed WHERE a = 0 OR a = 1 AND b = 0;
+" 2>&1 \
+| fgrep -q "All connection tries failed" && echo 'OK' || echo 'FAIL'
+
+${CLICKHOUSE_CLIENT} -n --query="
+    SET distributed_optimize_skip_select_on_unused_shards = 1;
+    SELECT count(*) FROM test.distributed WHERE a = 0 AND b = 0 OR a = 2 AND b = 2;
+" 2>&1 \
+| fgrep -q "All connection tries failed" && echo 'OK' || echo 'FAIL'
+
+${CLICKHOUSE_CLIENT} -n --query="
+    SET distributed_optimize_skip_select_on_unused_shards = 1;
+    SELECT count(*) FROM test.distributed WHERE a = 0 AND b = 0 OR c = 0;
+" 2>&1 \
+| fgrep -q "All connection tries failed" && echo 'OK' || echo 'FAIL'
diff --git a/dbms/tests/server-test.xml b/dbms/tests/server-test.xml
index 82b76f62fa4..c20d34cce3f 100644
--- a/dbms/tests/server-test.xml
+++ b/dbms/tests/server-test.xml
@@ -53,6 +53,20 @@
     <timezone>Europe/Moscow</timezone>
+        <test_unavailable_shard>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>59000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>1</port>
+                </replica>
+            </shard>
+        </test_unavailable_shard>