Add settings to allow parallel replicas custom key

with range filter to use a custom range
This commit is contained in:
Joshua Hildred 2024-05-27 10:16:28 -07:00
parent bb469e0d45
commit a7230e3c6c
9 changed files with 129 additions and 19 deletions

View File

@ -1590,6 +1590,23 @@ Possible values:
Default value: `default`.
## parallel_replicas_custom_key_range_lower {#parallel_replicas_custom_key_range_lower}
Allows the filter type `range` to split the work evenly between replicas based the custom range `[parallel_replicas_custom_key_range_lower, INT_MAX]`.
When used in conjuction with [parallel_replicas_custom_key_range_upper](#parallel_replicas_custom_key_range_upper), it lets the filter evenly split the work over replicas for the range `[parallel_replicas_custom_key_range_lower, parallel_replicas_custom_key_range_upper]`.
Note: This setting will not cause any additional data to be filtered during query processing, rather it changes the points at which the range filter breaks up the range `[0, INT_MAX]` when parallelizing the query.
## parallel_replicas_custom_key_range_upper {#parallel_replicas_custom_key_range_upper}
Allows the filter type `range` to split the work evenly between replicas based the custom range `[0, parallel_replicas_custom_key_range_upper]`.
When used in conjuction with [parallel_replicas_custom_key_range_lower](#parallel_replicas_custom_key_range_lower), it lets the filter evenly split the work over replicas for the range `[parallel_replicas_custom_key_range_lower, parallel_replicas_custom_key_range_upper]`.
Note: This setting will not cause any additional data to be filtered during query processing, rather it changes the points at which the range filter breaks up the range `[0, INT_MAX]` when parallelizing the query.
## allow_experimental_parallel_reading_from_replicas
Enables or disables sending SELECT queries to all replicas of a table (up to `max_parallel_replicas`). Reading is parallelized and coordinated dynamically. It will work for any kind of MergeTree table.

View File

@ -202,6 +202,8 @@ class IColumn;
M(UInt64, parallel_replica_offset, 0, "This is internal setting that should not be used directly and represents an implementation detail of the 'parallel replicas' mode. This setting will be automatically set up by the initiator server for distributed queries to the index of the replica participating in query processing among parallel replicas.", 0) \
M(String, parallel_replicas_custom_key, "", "Custom key assigning work to replicas when parallel replicas are used.", 0) \
M(ParallelReplicasCustomKeyFilterType, parallel_replicas_custom_key_filter_type, ParallelReplicasCustomKeyFilterType::DEFAULT, "Type of filter to use with custom key for parallel replicas. default - use modulo operation on the custom key, range - use range filter on custom key using all possible values for the value type of custom key.", 0) \
M(UInt64, parallel_replicas_custom_key_range_lower, 0, "Lower bound for the universe that the parallel replicas custom range filter is calculated over", 0) \
M(UInt64, parallel_replicas_custom_key_range_upper, std::numeric_limits<UInt64>::max(), "Upper bound for the universe that the parallel replicas custom range filter is calculated over", 0) \
\
M(String, cluster_for_parallel_replicas, "", "Cluster for a shard in which current server is located", 0) \
M(UInt64, allow_experimental_parallel_reading_from_replicas, 0, "Use all the replicas from a shard for SELECT query execution. Reading is parallelized and coordinated dynamically. 0 - disabled, 1 - enabled, silently disable them in case of failure, 2 - enabled, throw an exception in case of failure", 0) \

View File

@ -578,7 +578,9 @@ InterpreterSelectQuery::InterpreterSelectQuery(
settings.parallel_replicas_count,
settings.parallel_replica_offset,
std::move(custom_key_ast),
settings.parallel_replicas_custom_key_filter_type,
{settings.parallel_replicas_custom_key_filter_type,
settings.parallel_replicas_custom_key_range_lower,
settings.parallel_replicas_custom_key_range_upper},
storage->getInMemoryMetadataPtr()->columns,
context);
}

View File

@ -7,7 +7,6 @@
#include <Interpreters/Context.h>
#include <DataTypes/DataTypesNumber.h>
#include <boost/rational.hpp>
@ -18,18 +17,19 @@ namespace DB
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER;
extern const int INVALID_SETTING_VALUE;
}
ASTPtr getCustomKeyFilterForParallelReplica(
size_t replicas_count,
size_t replica_num,
ASTPtr custom_key_ast,
ParallelReplicasCustomKeyFilterType filter_type,
ParallelReplicasCustomKeyFilter filter,
const ColumnsDescription & columns,
const ContextPtr & context)
{
chassert(replicas_count > 1);
if (filter_type == ParallelReplicasCustomKeyFilterType::DEFAULT)
if (filter.filter_type == ParallelReplicasCustomKeyFilterType::DEFAULT)
{
// first we do modulo with replica count
auto modulo_function = makeASTFunction("positiveModulo", custom_key_ast, std::make_shared<ASTLiteral>(replicas_count));
@ -40,35 +40,48 @@ ASTPtr getCustomKeyFilterForParallelReplica(
return equals_function;
}
assert(filter_type == ParallelReplicasCustomKeyFilterType::RANGE);
assert(filter.filter_type == ParallelReplicasCustomKeyFilterType::RANGE);
KeyDescription custom_key_description
= KeyDescription::getKeyFromAST(custom_key_ast, columns, context);
using RelativeSize = boost::rational<ASTSampleRatio::BigNum>;
RelativeSize size_of_universum = 0;
// get
RelativeSize range_upper = RelativeSize(filter.range_upper) + RelativeSize(1);
RelativeSize range_lower = RelativeSize(filter.range_lower);
DataTypePtr custom_key_column_type = custom_key_description.data_types[0];
size_of_universum = RelativeSize(std::numeric_limits<UInt32>::max()) + RelativeSize(1);
if (custom_key_description.data_types.size() == 1)
{
if (typeid_cast<const DataTypeUInt64 *>(custom_key_column_type.get()))
size_of_universum = RelativeSize(std::numeric_limits<UInt64>::max()) + RelativeSize(1);
range_upper = std::min(RelativeSize(std::numeric_limits<UInt64>::max()) + RelativeSize(1), range_upper);
else if (typeid_cast<const DataTypeUInt32 *>(custom_key_column_type.get()))
size_of_universum = RelativeSize(std::numeric_limits<UInt32>::max()) + RelativeSize(1);
range_upper = std::min(RelativeSize(std::numeric_limits<UInt32>::max()) + RelativeSize(1), range_upper);
else if (typeid_cast<const DataTypeUInt16 *>(custom_key_column_type.get()))
size_of_universum = RelativeSize(std::numeric_limits<UInt16>::max()) + RelativeSize(1);
range_upper = std::min(RelativeSize(std::numeric_limits<UInt16>::max()) + RelativeSize(1), range_upper);
else if (typeid_cast<const DataTypeUInt8 *>(custom_key_column_type.get()))
size_of_universum = RelativeSize(std::numeric_limits<UInt8>::max()) + RelativeSize(1);
range_upper = std::min(RelativeSize(std::numeric_limits<UInt8>::max()) + RelativeSize(1), range_upper);
}
if (size_of_universum == RelativeSize(0))
if (range_upper == RelativeSize(0))
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER,
"Invalid custom key column type: {}. Must be one unsigned integer type",
custom_key_column_type->getName());
if (range_lower < 0)
throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "Invalid custom key filter range: Range Min must be a postive");
if (range_lower > range_upper)
throw Exception(
ErrorCodes::INVALID_SETTING_VALUE,
"Invalid custom key filter range: Range max {} must be larger than range min {}",
range_lower,
range_upper);
RelativeSize size_of_universum = range_upper - range_lower;
RelativeSize relative_range_size = RelativeSize(1) / replicas_count;
RelativeSize relative_range_offset = relative_range_size * RelativeSize(replica_num);
@ -76,16 +89,16 @@ ASTPtr getCustomKeyFilterForParallelReplica(
bool has_lower_limit = false;
bool has_upper_limit = false;
RelativeSize lower_limit_rational = relative_range_offset * size_of_universum;
RelativeSize upper_limit_rational = (relative_range_offset + relative_range_size) * size_of_universum;
RelativeSize lower_limit_rational = range_lower + relative_range_offset * size_of_universum;
RelativeSize upper_limit_rational = range_lower + (relative_range_offset + relative_range_size) * size_of_universum;
UInt64 lower = boost::rational_cast<ASTSampleRatio::BigNum>(lower_limit_rational);
UInt64 upper = boost::rational_cast<ASTSampleRatio::BigNum>(upper_limit_rational);
if (lower > 0)
if (lower > range_lower)
has_lower_limit = true;
if (upper_limit_rational < size_of_universum)
if (upper < range_upper)
has_upper_limit = true;
assert(has_lower_limit || has_upper_limit);

View File

@ -6,16 +6,24 @@
#include <Storages/IStorage.h>
#include <Core/SettingsEnums.h>
#include <Interpreters/DatabaseAndTableWithAlias.h>
#include <DataTypes/DataTypesNumber.h>
namespace DB
{
struct ParallelReplicasCustomKeyFilter
{
ParallelReplicasCustomKeyFilterType filter_type;
UInt64 range_lower;
UInt64 range_upper;
};
/// Get AST for filter created from custom_key
/// replica_num is the number of the replica for which we are generating filter starting from 0
ASTPtr getCustomKeyFilterForParallelReplica(
size_t replicas_count,
size_t replica_num,
ASTPtr custom_key_ast,
ParallelReplicasCustomKeyFilterType filter_type,
ParallelReplicasCustomKeyFilter filter,
const ColumnsDescription & columns,
const ContextPtr & context);

View File

@ -501,7 +501,9 @@ FilterDAGInfo buildCustomKeyFilterIfNeeded(const StoragePtr & storage,
settings.parallel_replicas_count,
settings.parallel_replica_offset,
std::move(custom_key_ast),
settings.parallel_replicas_custom_key_filter_type,
{settings.parallel_replicas_custom_key_filter_type,
settings.parallel_replicas_custom_key_range_lower,
settings.parallel_replicas_custom_key_range_upper},
storage->getInMemoryMetadataPtr()->columns,
query_context);

View File

@ -904,11 +904,13 @@ void StorageDistributed::read(
[my_custom_key_ast = std::move(custom_key_ast),
column_description = this->getInMemoryMetadataPtr()->columns,
custom_key_type = settings.parallel_replicas_custom_key_filter_type.value,
custom_key_range_lower = settings.parallel_replicas_custom_key_range_lower.value,
custom_key_range_upper = settings.parallel_replicas_custom_key_range_upper.value,
context = local_context,
replica_count = modified_query_info.getCluster()->getShardsInfo().front().per_replica_pools.size()](uint64_t replica_num) -> ASTPtr
{
return getCustomKeyFilterForParallelReplica(
replica_count, replica_num - 1, my_custom_key_ast, custom_key_type, column_description, context);
replica_count, replica_num - 1, my_custom_key_ast, {custom_key_type, custom_key_range_lower, custom_key_range_upper}, column_description, context);
};
}
}

View File

@ -0,0 +1,7 @@
10
10
10
10
10
10
10

View File

@ -0,0 +1,57 @@
DROP TABLE IF EXISTS range_filter_custom_range_test;
CREATE TABLE range_filter_custom_range_test (k Int64) ENGINE=MergeTree ORDER BY k;
INSERT INTO range_filter_custom_range_test SELECT number + 5 from numbers(10);
SELECT count() FROM
(SELECT *
FROM cluster(test_cluster_one_shard_three_replicas_localhost, currentDatabase(), range_filter_custom_range_test)
SETTINGS prefer_localhost_replica = 0, max_parallel_replicas = 3, distributed_group_by_no_merge=0,
parallel_replicas_custom_key = 'k', parallel_replicas_custom_key_filter_type = 'range',
parallel_replicas_custom_key_range_lower = 5, parallel_replicas_custom_key_range_upper=15);
SELECT count() FROM
(SELECT *
FROM cluster(test_cluster_one_shard_three_replicas_localhost, currentDatabase(), range_filter_custom_range_test)
SETTINGS prefer_localhost_replica = 0, max_parallel_replicas = 3, distributed_group_by_no_merge=0,
parallel_replicas_custom_key = 'k', parallel_replicas_custom_key_filter_type = 'range',
parallel_replicas_custom_key_range_lower = 4, parallel_replicas_custom_key_range_upper=14);
SELECT count() FROM
(SELECT *
FROM cluster(test_cluster_one_shard_three_replicas_localhost, currentDatabase(), range_filter_custom_range_test)
SETTINGS prefer_localhost_replica = 0, max_parallel_replicas = 3, distributed_group_by_no_merge=0,
parallel_replicas_custom_key = 'k', parallel_replicas_custom_key_filter_type = 'range',
parallel_replicas_custom_key_range_lower = 6, parallel_replicas_custom_key_range_upper=17);
SELECT count() FROM
(SELECT *
FROM cluster(test_cluster_one_shard_three_replicas_localhost, currentDatabase(), range_filter_custom_range_test)
SETTINGS prefer_localhost_replica = 0, max_parallel_replicas = 3, distributed_group_by_no_merge=0,
parallel_replicas_custom_key = 'k', parallel_replicas_custom_key_filter_type = 'range',
parallel_replicas_custom_key_range_lower = 0, parallel_replicas_custom_key_range_upper=15);
SELECT count() FROM
(SELECT *
FROM cluster(test_cluster_one_shard_three_replicas_localhost, currentDatabase(), range_filter_custom_range_test)
SETTINGS prefer_localhost_replica = 0, max_parallel_replicas = 3, distributed_group_by_no_merge=0,
parallel_replicas_custom_key = 'k', parallel_replicas_custom_key_filter_type = 'range',
parallel_replicas_custom_key_range_lower = 15, parallel_replicas_custom_key_range_upper=25);
SELECT count() FROM
(SELECT *
FROM cluster(test_cluster_one_shard_three_replicas_localhost, currentDatabase(), range_filter_custom_range_test)
SETTINGS prefer_localhost_replica = 0, max_parallel_replicas = 3, distributed_group_by_no_merge=0,
parallel_replicas_custom_key = 'k', parallel_replicas_custom_key_filter_type = 'range',
parallel_replicas_custom_key_range_lower = 0, parallel_replicas_custom_key_range_upper=5);
SELECT count() FROM
(SELECT *
FROM cluster(test_cluster_one_shard_three_replicas_localhost, currentDatabase(), range_filter_custom_range_test)
SETTINGS prefer_localhost_replica = 0, max_parallel_replicas = 3, distributed_group_by_no_merge=0,
parallel_replicas_custom_key = 'k', parallel_replicas_custom_key_filter_type = 'range',
parallel_replicas_custom_key_range_lower = 500, parallel_replicas_custom_key_range_upper=10000);
DROP TABLE range_filter_custom_range_test;