Address review comments

This commit is contained in:
Joshua Hildred 2024-06-03 10:12:24 -07:00
parent 677b2de5ed
commit bde8d19f45
2 changed files with 39 additions and 9 deletions

View File

@ -203,7 +203,7 @@ class IColumn;
M(String, parallel_replicas_custom_key, "", "Custom key assigning work to replicas when parallel replicas are used.", 0) \ M(String, parallel_replicas_custom_key, "", "Custom key assigning work to replicas when parallel replicas are used.", 0) \
M(ParallelReplicasCustomKeyFilterType, parallel_replicas_custom_key_filter_type, ParallelReplicasCustomKeyFilterType::DEFAULT, "Type of filter to use with custom key for parallel replicas. default - use modulo operation on the custom key, range - use range filter on custom key using all possible values for the value type of custom key.", 0) \ M(ParallelReplicasCustomKeyFilterType, parallel_replicas_custom_key_filter_type, ParallelReplicasCustomKeyFilterType::DEFAULT, "Type of filter to use with custom key for parallel replicas. default - use modulo operation on the custom key, range - use range filter on custom key using all possible values for the value type of custom key.", 0) \
M(UInt64, parallel_replicas_custom_key_range_lower, 0, "Lower bound for the universe that the parallel replicas custom range filter is calculated over", 0) \ M(UInt64, parallel_replicas_custom_key_range_lower, 0, "Lower bound for the universe that the parallel replicas custom range filter is calculated over", 0) \
M(UInt64, parallel_replicas_custom_key_range_upper, std::numeric_limits<UInt32>::max(), "Upper bound for the universe that the parallel replicas custom range filter is calculated over", 0) \ M(UInt64, parallel_replicas_custom_key_range_upper, 0, "Upper bound for the universe that the parallel replicas custom range filter is calculated over", 0) \
\ \
M(String, cluster_for_parallel_replicas, "", "Cluster for a shard in which current server is located", 0) \ M(String, cluster_for_parallel_replicas, "", "Cluster for a shard in which current server is located", 0) \
M(UInt64, allow_experimental_parallel_reading_from_replicas, 0, "Use all the replicas from a shard for SELECT query execution. Reading is parallelized and coordinated dynamically. 0 - disabled, 1 - enabled, silently disable them in case of failure, 2 - enabled, throw an exception in case of failure", 0) \ M(UInt64, allow_experimental_parallel_reading_from_replicas, 0, "Use all the replicas from a shard for SELECT query execution. Reading is parallelized and coordinated dynamically. 0 - disabled, 1 - enabled, silently disable them in case of failure, 2 - enabled, throw an exception in case of failure", 0) \

View File

@ -47,20 +47,53 @@ ASTPtr getCustomKeyFilterForParallelReplica(
using RelativeSize = boost::rational<ASTSampleRatio::BigNum>; using RelativeSize = boost::rational<ASTSampleRatio::BigNum>;
RelativeSize range_upper = RelativeSize(filter.range_upper) + RelativeSize(1); RelativeSize range_upper = filter.range_upper > 0 ? RelativeSize(filter.range_upper) + RelativeSize(1)
: RelativeSize(std::numeric_limits<UInt32>::max()) + RelativeSize(1);
RelativeSize range_lower = RelativeSize(filter.range_lower); RelativeSize range_lower = RelativeSize(filter.range_lower);
DataTypePtr custom_key_column_type = custom_key_description.data_types[0]; DataTypePtr custom_key_column_type = custom_key_description.data_types[0];
if (custom_key_description.data_types.size() == 1) if (custom_key_description.data_types.size() == 1)
{ {
if (typeid_cast<const DataTypeUInt64 *>(custom_key_column_type.get())) if (typeid_cast<const DataTypeUInt64 *>(custom_key_column_type.get()))
range_upper = std::min(RelativeSize(std::numeric_limits<UInt64>::max()) + RelativeSize(1), range_upper); {
range_upper = filter.range_upper > 0 ? RelativeSize(filter.range_upper) + RelativeSize(1)
: RelativeSize(std::numeric_limits<UInt64>::max()) + RelativeSize(1);
if (range_upper > RelativeSize(std::numeric_limits<UInt64>::max()) + RelativeSize(1))
throw Exception(
ErrorCodes::INVALID_SETTING_VALUE,
"Invalid custom key range upper bound: {}. Value must be smaller than custom key column type (UInt64) max value",
range_upper);
}
else if (typeid_cast<const DataTypeUInt32 *>(custom_key_column_type.get())) else if (typeid_cast<const DataTypeUInt32 *>(custom_key_column_type.get()))
range_upper = std::min(RelativeSize(std::numeric_limits<UInt32>::max()) + RelativeSize(1), range_upper); {
range_upper = filter.range_upper > 0 ? RelativeSize(filter.range_upper) + RelativeSize(1)
: RelativeSize(std::numeric_limits<UInt32>::max()) + RelativeSize(1);
if (range_upper > RelativeSize(std::numeric_limits<UInt32>::max()) + RelativeSize(1))
throw Exception(
ErrorCodes::INVALID_SETTING_VALUE,
"Invalid custom key range upper bound: {}. Value must be smaller than custom key column type (UInt32) max value",
range_upper);
}
else if (typeid_cast<const DataTypeUInt16 *>(custom_key_column_type.get())) else if (typeid_cast<const DataTypeUInt16 *>(custom_key_column_type.get()))
range_upper = std::min(RelativeSize(std::numeric_limits<UInt16>::max()) + RelativeSize(1), range_upper); {
range_upper = filter.range_upper > 0 ? RelativeSize(filter.range_upper) + RelativeSize(1)
: RelativeSize(std::numeric_limits<UInt16>::max()) + RelativeSize(1);
if (range_upper > RelativeSize(std::numeric_limits<UInt16>::max()) + RelativeSize(1))
throw Exception(
ErrorCodes::INVALID_SETTING_VALUE,
"Invalid custom key range upper bound: {}. Value must be smaller than custom key column type (UInt16) max value",
range_upper);
}
else if (typeid_cast<const DataTypeUInt8 *>(custom_key_column_type.get())) else if (typeid_cast<const DataTypeUInt8 *>(custom_key_column_type.get()))
range_upper = std::min(RelativeSize(std::numeric_limits<UInt8>::max()) + RelativeSize(1), range_upper); {
range_upper = filter.range_upper > 0 ? RelativeSize(filter.range_upper) + RelativeSize(1)
: RelativeSize(std::numeric_limits<UInt8>::max()) + RelativeSize(1);
if (range_upper > RelativeSize(std::numeric_limits<UInt8>::max()) + RelativeSize(1))
throw Exception(
ErrorCodes::INVALID_SETTING_VALUE,
"Invalid custom key range upper bound: {}. Value must be smaller than custom key column type (UInt8) max value",
range_upper);
}
} }
if (range_upper == RelativeSize(0)) if (range_upper == RelativeSize(0))
@ -69,9 +102,6 @@ ASTPtr getCustomKeyFilterForParallelReplica(
"Invalid custom key column type: {}. Must be one unsigned integer type", "Invalid custom key column type: {}. Must be one unsigned integer type",
custom_key_column_type->getName()); custom_key_column_type->getName());
if (range_lower < 0)
throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "Invalid custom key filter range: Range lower bound must be a positive");
if (range_lower >= range_upper) if (range_lower >= range_upper)
throw Exception( throw Exception(
ErrorCodes::INVALID_SETTING_VALUE, ErrorCodes::INVALID_SETTING_VALUE,