diff --git a/docs/en/operations/settings/settings.md b/docs/en/operations/settings/settings.md index ffaf53085c4..0dbd349525d 100644 --- a/docs/en/operations/settings/settings.md +++ b/docs/en/operations/settings/settings.md @@ -1590,6 +1590,23 @@ Possible values: Default value: `default`. +## parallel_replicas_custom_key_range_lower {#parallel_replicas_custom_key_range_lower} + +Allows the filter type `range` to split the work evenly between replicas based on the custom range `[parallel_replicas_custom_key_range_lower, INT_MAX]`. + +When used in conjunction with [parallel_replicas_custom_key_range_upper](#parallel_replicas_custom_key_range_upper), it lets the filter evenly split the work over replicas for the range `[parallel_replicas_custom_key_range_lower, parallel_replicas_custom_key_range_upper]`. + +Note: This setting will not cause any additional data to be filtered during query processing; rather, it changes the points at which the range filter breaks up the range `[0, INT_MAX]` when parallelizing the query. + +## parallel_replicas_custom_key_range_upper {#parallel_replicas_custom_key_range_upper} + + +Allows the filter type `range` to split the work evenly between replicas based on the custom range `[0, parallel_replicas_custom_key_range_upper]`. + +When used in conjunction with [parallel_replicas_custom_key_range_lower](#parallel_replicas_custom_key_range_lower), it lets the filter evenly split the work over replicas for the range `[parallel_replicas_custom_key_range_lower, parallel_replicas_custom_key_range_upper]`. + +Note: This setting will not cause any additional data to be filtered during query processing; rather, it changes the points at which the range filter breaks up the range `[0, INT_MAX]` when parallelizing the query. + ## allow_experimental_parallel_reading_from_replicas Enables or disables sending SELECT queries to all replicas of a table (up to `max_parallel_replicas`). Reading is parallelized and coordinated dynamically. 
It will work for any kind of MergeTree table. diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 27ce54c03a7..d6e4f0ae92b 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -202,6 +202,8 @@ class IColumn; M(UInt64, parallel_replica_offset, 0, "This is internal setting that should not be used directly and represents an implementation detail of the 'parallel replicas' mode. This setting will be automatically set up by the initiator server for distributed queries to the index of the replica participating in query processing among parallel replicas.", 0) \ M(String, parallel_replicas_custom_key, "", "Custom key assigning work to replicas when parallel replicas are used.", 0) \ M(ParallelReplicasCustomKeyFilterType, parallel_replicas_custom_key_filter_type, ParallelReplicasCustomKeyFilterType::DEFAULT, "Type of filter to use with custom key for parallel replicas. default - use modulo operation on the custom key, range - use range filter on custom key using all possible values for the value type of custom key.", 0) \ + M(UInt64, parallel_replicas_custom_key_range_lower, 0, "Lower bound for the universe that the parallel replicas custom range filter is calculated over", 0) \ + M(UInt64, parallel_replicas_custom_key_range_upper, std::numeric_limits::max(), "Upper bound for the universe that the parallel replicas custom range filter is calculated over", 0) \ \ M(String, cluster_for_parallel_replicas, "", "Cluster for a shard in which current server is located", 0) \ M(UInt64, allow_experimental_parallel_reading_from_replicas, 0, "Use all the replicas from a shard for SELECT query execution. Reading is parallelized and coordinated dynamically. 
0 - disabled, 1 - enabled, silently disable them in case of failure, 2 - enabled, throw an exception in case of failure", 0) \ diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 09f987a1c24..a418a4c9729 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -578,7 +578,9 @@ InterpreterSelectQuery::InterpreterSelectQuery( settings.parallel_replicas_count, settings.parallel_replica_offset, std::move(custom_key_ast), - settings.parallel_replicas_custom_key_filter_type, + {settings.parallel_replicas_custom_key_filter_type, + settings.parallel_replicas_custom_key_range_lower, + settings.parallel_replicas_custom_key_range_upper}, storage->getInMemoryMetadataPtr()->columns, context); } diff --git a/src/Interpreters/getCustomKeyFilterForParallelReplicas.cpp b/src/Interpreters/getCustomKeyFilterForParallelReplicas.cpp index d78b6ab0c4d..31669efb698 100644 --- a/src/Interpreters/getCustomKeyFilterForParallelReplicas.cpp +++ b/src/Interpreters/getCustomKeyFilterForParallelReplicas.cpp @@ -7,7 +7,6 @@ #include -#include #include @@ -18,18 +17,19 @@ namespace DB namespace ErrorCodes { extern const int ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER; + extern const int INVALID_SETTING_VALUE; } ASTPtr getCustomKeyFilterForParallelReplica( size_t replicas_count, size_t replica_num, ASTPtr custom_key_ast, - ParallelReplicasCustomKeyFilterType filter_type, + ParallelReplicasCustomKeyFilter filter, const ColumnsDescription & columns, const ContextPtr & context) { chassert(replicas_count > 1); - if (filter_type == ParallelReplicasCustomKeyFilterType::DEFAULT) + if (filter.filter_type == ParallelReplicasCustomKeyFilterType::DEFAULT) { // first we do modulo with replica count auto modulo_function = makeASTFunction("positiveModulo", custom_key_ast, std::make_shared(replicas_count)); @@ -40,35 +40,48 @@ ASTPtr getCustomKeyFilterForParallelReplica( return equals_function; } - 
assert(filter_type == ParallelReplicasCustomKeyFilterType::RANGE); + assert(filter.filter_type == ParallelReplicasCustomKeyFilterType::RANGE); KeyDescription custom_key_description = KeyDescription::getKeyFromAST(custom_key_ast, columns, context); using RelativeSize = boost::rational; - RelativeSize size_of_universum = 0; + // get + RelativeSize range_upper = RelativeSize(filter.range_upper) + RelativeSize(1); + RelativeSize range_lower = RelativeSize(filter.range_lower); DataTypePtr custom_key_column_type = custom_key_description.data_types[0]; - size_of_universum = RelativeSize(std::numeric_limits::max()) + RelativeSize(1); if (custom_key_description.data_types.size() == 1) { if (typeid_cast(custom_key_column_type.get())) - size_of_universum = RelativeSize(std::numeric_limits::max()) + RelativeSize(1); + range_upper = std::min(RelativeSize(std::numeric_limits::max()) + RelativeSize(1), range_upper); else if (typeid_cast(custom_key_column_type.get())) - size_of_universum = RelativeSize(std::numeric_limits::max()) + RelativeSize(1); + range_upper = std::min(RelativeSize(std::numeric_limits::max()) + RelativeSize(1), range_upper); else if (typeid_cast(custom_key_column_type.get())) - size_of_universum = RelativeSize(std::numeric_limits::max()) + RelativeSize(1); + range_upper = std::min(RelativeSize(std::numeric_limits::max()) + RelativeSize(1), range_upper); else if (typeid_cast(custom_key_column_type.get())) - size_of_universum = RelativeSize(std::numeric_limits::max()) + RelativeSize(1); + range_upper = std::min(RelativeSize(std::numeric_limits::max()) + RelativeSize(1), range_upper); } - if (size_of_universum == RelativeSize(0)) + if (range_upper == RelativeSize(0)) throw Exception( ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER, "Invalid custom key column type: {}. 
Must be one unsigned integer type", custom_key_column_type->getName()); + if (range_lower < 0) + throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "Invalid custom key filter range: Range Min must be a postive"); + + if (range_lower > range_upper) + throw Exception( + ErrorCodes::INVALID_SETTING_VALUE, + "Invalid custom key filter range: Range max {} must be larger than range min {}", + range_lower, + range_upper); + + RelativeSize size_of_universum = range_upper - range_lower; + RelativeSize relative_range_size = RelativeSize(1) / replicas_count; RelativeSize relative_range_offset = relative_range_size * RelativeSize(replica_num); @@ -76,16 +89,16 @@ ASTPtr getCustomKeyFilterForParallelReplica( bool has_lower_limit = false; bool has_upper_limit = false; - RelativeSize lower_limit_rational = relative_range_offset * size_of_universum; - RelativeSize upper_limit_rational = (relative_range_offset + relative_range_size) * size_of_universum; + RelativeSize lower_limit_rational = range_lower + relative_range_offset * size_of_universum; + RelativeSize upper_limit_rational = range_lower + (relative_range_offset + relative_range_size) * size_of_universum; UInt64 lower = boost::rational_cast(lower_limit_rational); UInt64 upper = boost::rational_cast(upper_limit_rational); - if (lower > 0) + if (lower > range_lower) has_lower_limit = true; - if (upper_limit_rational < size_of_universum) + if (upper < range_upper) has_upper_limit = true; assert(has_lower_limit || has_upper_limit); diff --git a/src/Interpreters/getCustomKeyFilterForParallelReplicas.h b/src/Interpreters/getCustomKeyFilterForParallelReplicas.h index 1506c1992c0..36198be8e51 100644 --- a/src/Interpreters/getCustomKeyFilterForParallelReplicas.h +++ b/src/Interpreters/getCustomKeyFilterForParallelReplicas.h @@ -6,16 +6,24 @@ #include #include #include +#include namespace DB { +struct ParallelReplicasCustomKeyFilter +{ + ParallelReplicasCustomKeyFilterType filter_type; + UInt64 range_lower; + UInt64 range_upper; +}; 
+ /// Get AST for filter created from custom_key /// replica_num is the number of the replica for which we are generating filter starting from 0 ASTPtr getCustomKeyFilterForParallelReplica( size_t replicas_count, size_t replica_num, ASTPtr custom_key_ast, - ParallelReplicasCustomKeyFilterType filter_type, + ParallelReplicasCustomKeyFilter filter, const ColumnsDescription & columns, const ContextPtr & context); diff --git a/src/Planner/PlannerJoinTree.cpp b/src/Planner/PlannerJoinTree.cpp index 83b6f4f2c26..b092f3d0c26 100644 --- a/src/Planner/PlannerJoinTree.cpp +++ b/src/Planner/PlannerJoinTree.cpp @@ -501,7 +501,9 @@ FilterDAGInfo buildCustomKeyFilterIfNeeded(const StoragePtr & storage, settings.parallel_replicas_count, settings.parallel_replica_offset, std::move(custom_key_ast), - settings.parallel_replicas_custom_key_filter_type, + {settings.parallel_replicas_custom_key_filter_type, + settings.parallel_replicas_custom_key_range_lower, + settings.parallel_replicas_custom_key_range_upper}, storage->getInMemoryMetadataPtr()->columns, query_context); diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 9c58468c4a4..5048ef4788e 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -904,11 +904,13 @@ void StorageDistributed::read( [my_custom_key_ast = std::move(custom_key_ast), column_description = this->getInMemoryMetadataPtr()->columns, custom_key_type = settings.parallel_replicas_custom_key_filter_type.value, + custom_key_range_lower = settings.parallel_replicas_custom_key_range_lower.value, + custom_key_range_upper = settings.parallel_replicas_custom_key_range_upper.value, context = local_context, replica_count = modified_query_info.getCluster()->getShardsInfo().front().per_replica_pools.size()](uint64_t replica_num) -> ASTPtr { return getCustomKeyFilterForParallelReplica( - replica_count, replica_num - 1, my_custom_key_ast, custom_key_type, column_description, context); + 
replica_count, replica_num - 1, my_custom_key_ast, {custom_key_type, custom_key_range_lower, custom_key_range_upper}, column_description, context); }; } } diff --git a/tests/queries/0_stateless/03164_parallel_replicas_range_filter_min_max.reference b/tests/queries/0_stateless/03164_parallel_replicas_range_filter_min_max.reference new file mode 100644 index 00000000000..5ba3f6bc471 --- /dev/null +++ b/tests/queries/0_stateless/03164_parallel_replicas_range_filter_min_max.reference @@ -0,0 +1,7 @@ +10 +10 +10 +10 +10 +10 +10 diff --git a/tests/queries/0_stateless/03164_parallel_replicas_range_filter_min_max.sql b/tests/queries/0_stateless/03164_parallel_replicas_range_filter_min_max.sql new file mode 100644 index 00000000000..c9588d931a8 --- /dev/null +++ b/tests/queries/0_stateless/03164_parallel_replicas_range_filter_min_max.sql @@ -0,0 +1,57 @@ +DROP TABLE IF EXISTS range_filter_custom_range_test; + +CREATE TABLE range_filter_custom_range_test (k Int64) ENGINE=MergeTree ORDER BY k; + +INSERT INTO range_filter_custom_range_test SELECT number + 5 from numbers(10); + +SELECT count() FROM +(SELECT * +FROM cluster(test_cluster_one_shard_three_replicas_localhost, currentDatabase(), range_filter_custom_range_test) +SETTINGS prefer_localhost_replica = 0, max_parallel_replicas = 3, distributed_group_by_no_merge=0, +parallel_replicas_custom_key = 'k', parallel_replicas_custom_key_filter_type = 'range', +parallel_replicas_custom_key_range_lower = 5, parallel_replicas_custom_key_range_upper=15); + +SELECT count() FROM +(SELECT * +FROM cluster(test_cluster_one_shard_three_replicas_localhost, currentDatabase(), range_filter_custom_range_test) +SETTINGS prefer_localhost_replica = 0, max_parallel_replicas = 3, distributed_group_by_no_merge=0, +parallel_replicas_custom_key = 'k', parallel_replicas_custom_key_filter_type = 'range', +parallel_replicas_custom_key_range_lower = 4, parallel_replicas_custom_key_range_upper=14); + +SELECT count() FROM +(SELECT * +FROM 
cluster(test_cluster_one_shard_three_replicas_localhost, currentDatabase(), range_filter_custom_range_test) +SETTINGS prefer_localhost_replica = 0, max_parallel_replicas = 3, distributed_group_by_no_merge=0, +parallel_replicas_custom_key = 'k', parallel_replicas_custom_key_filter_type = 'range', +parallel_replicas_custom_key_range_lower = 6, parallel_replicas_custom_key_range_upper=17); + + +SELECT count() FROM +(SELECT * +FROM cluster(test_cluster_one_shard_three_replicas_localhost, currentDatabase(), range_filter_custom_range_test) +SETTINGS prefer_localhost_replica = 0, max_parallel_replicas = 3, distributed_group_by_no_merge=0, +parallel_replicas_custom_key = 'k', parallel_replicas_custom_key_filter_type = 'range', +parallel_replicas_custom_key_range_lower = 0, parallel_replicas_custom_key_range_upper=15); + +SELECT count() FROM +(SELECT * +FROM cluster(test_cluster_one_shard_three_replicas_localhost, currentDatabase(), range_filter_custom_range_test) +SETTINGS prefer_localhost_replica = 0, max_parallel_replicas = 3, distributed_group_by_no_merge=0, +parallel_replicas_custom_key = 'k', parallel_replicas_custom_key_filter_type = 'range', +parallel_replicas_custom_key_range_lower = 15, parallel_replicas_custom_key_range_upper=25); + +SELECT count() FROM +(SELECT * +FROM cluster(test_cluster_one_shard_three_replicas_localhost, currentDatabase(), range_filter_custom_range_test) +SETTINGS prefer_localhost_replica = 0, max_parallel_replicas = 3, distributed_group_by_no_merge=0, +parallel_replicas_custom_key = 'k', parallel_replicas_custom_key_filter_type = 'range', +parallel_replicas_custom_key_range_lower = 0, parallel_replicas_custom_key_range_upper=5); + +SELECT count() FROM +(SELECT * +FROM cluster(test_cluster_one_shard_three_replicas_localhost, currentDatabase(), range_filter_custom_range_test) +SETTINGS prefer_localhost_replica = 0, max_parallel_replicas = 3, distributed_group_by_no_merge=0, +parallel_replicas_custom_key = 'k', 
parallel_replicas_custom_key_filter_type = 'range', +parallel_replicas_custom_key_range_lower = 500, parallel_replicas_custom_key_range_upper=10000); + +DROP TABLE range_filter_custom_range_test; \ No newline at end of file