This commit is contained in:
Antonio Andelic 2023-01-19 09:20:40 +00:00
parent 1c0a3e38c0
commit 7a75144ce3
4 changed files with 92 additions and 143 deletions

View File

@ -653,9 +653,9 @@ void Cluster::initMisc()
}
}
std::unique_ptr<Cluster> Cluster::getClusterWithReplicasAsShards(const Settings & settings) const
std::unique_ptr<Cluster> Cluster::getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard) const
{
return std::unique_ptr<Cluster>{ new Cluster(ReplicasAsShardsTag{}, *this, settings)};
return std::unique_ptr<Cluster>{ new Cluster(ReplicasAsShardsTag{}, *this, settings, max_replicas_from_shard)};
}
std::unique_ptr<Cluster> Cluster::getClusterWithSingleShard(size_t index) const
@ -668,7 +668,7 @@ std::unique_ptr<Cluster> Cluster::getClusterWithMultipleShards(const std::vector
return std::unique_ptr<Cluster>{ new Cluster(SubclusterTag{}, *this, indices) };
}
Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Settings & settings)
Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard)
{
if (from.addresses_with_failover.empty())
throw Exception("Cluster is empty", ErrorCodes::LOGICAL_ERROR);
@ -678,6 +678,7 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
for (size_t shard_index : collections::range(0, from.shards_info.size()))
{
const auto & replicas = from.addresses_with_failover[shard_index];
size_t replicas_used = 0;
for (const auto & address : replicas)
{
if (!unique_hosts.emplace(address.host_name, address.port).second)
@ -685,6 +686,7 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
ShardInfo info;
info.shard_num = ++shard_num;
++replicas_used;
if (address.is_local)
info.local_addresses.push_back(address);
@ -711,6 +713,9 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
addresses_with_failover.emplace_back(Addresses{address});
shards_info.emplace_back(std::move(info));
if (max_replicas_from_shard && replicas_used == max_replicas_from_shard)
break;
}
}

View File

@ -250,7 +250,7 @@ public:
std::unique_ptr<Cluster> getClusterWithMultipleShards(const std::vector<size_t> & indices) const;
/// Get a new Cluster that contains all servers (all shards with all replicas) from existing cluster as independent shards.
std::unique_ptr<Cluster> getClusterWithReplicasAsShards(const Settings & settings) const;
std::unique_ptr<Cluster> getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard = 0) const;
/// Returns false if cluster configuration doesn't allow to use it for cross-replication.
/// NOTE: true does not mean, that it's actually a cross-replication cluster.
@ -271,7 +271,7 @@ private:
/// For getClusterWithReplicasAsShards implementation
struct ReplicasAsShardsTag {};
Cluster(ReplicasAsShardsTag, const Cluster & from, const Settings & settings);
Cluster(ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard);
/// Inter-server secret
String secret;

View File

@ -191,18 +191,7 @@ void executeQuery(
auto where_expression = select_query.where();
if (where_expression)
{
ASTPtr args = std::make_shared<ASTExpressionList>();
args->children.push_back(where_expression);
args->children.push_back(shard_filter);
auto and_function = std::make_shared<ASTFunction>();
and_function->name = "and";
and_function->arguments = args;
and_function->children.push_back(and_function->arguments);
shard_filter = std::move(and_function);
}
shard_filter = makeASTFunction("and", where_expression, shard_filter);
select_query.setExpression(ASTSelectQuery::Expression::WHERE, std::move(shard_filter));
}

View File

@ -445,10 +445,6 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(
ClusterPtr cluster = getCluster();
// if it's custom_key we will turn replicas into shards and filter specific data on each of them
if (settings.max_parallel_replicas > 1 && cluster->getShardCount() == 1 && settings.parallel_replicas_mode == ParallelReplicasMode::CUSTOM_KEY)
cluster = cluster->getClusterWithReplicasAsShards(settings);
query_info.cluster = cluster;
size_t nodes = getClusterQueriedNodes(settings, cluster);
@ -758,15 +754,19 @@ void StorageDistributed::read(
bool parallel_replicas = settings.max_parallel_replicas > 1 && settings.allow_experimental_parallel_reading_from_replicas
&& !settings.use_hedged_requests && settings.parallel_replicas_mode == ParallelReplicasMode::READ_TASKS;
auto shard_count = query_info.getCluster()->getShardCount();
ClusterProxy::AdditionalShardFilterGenerator additional_shard_filter_generator;
if (settings.max_parallel_replicas > 1
&& settings.parallel_replicas_mode == ParallelReplicasMode::CUSTOM_KEY)
&& settings.parallel_replicas_mode == ParallelReplicasMode::CUSTOM_KEY
&& getCluster()->getShardCount() == 1)
{
LOG_INFO(log, "Single shard cluster used with custom_key, transforming replicas into shards");
query_info.cluster = getCluster()->getClusterWithReplicasAsShards(settings, settings.max_parallel_replicas);
query_info.optimized_cluster = nullptr; // it's a single shard cluster so nothing could've been optimized
const std::string_view custom_key = settings.parallel_replicas_custom_key.value;
assert(!custom_key.empty());
if (custom_key.empty())
throw Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Parallel replicas mode set to 'custom_key' but 'parallel_replicas_custom_key' has no value");
ParserExpression parser;
auto custom_key_ast = parseQuery(
@ -777,135 +777,90 @@ void StorageDistributed::read(
settings.max_query_size,
settings.max_parser_depth);
additional_shard_filter_generator = [&](uint64_t shard_num) -> ASTPtr
additional_shard_filter_generator = [&, custom_key_ast = std::move(custom_key_ast), shard_count = query_info.cluster->getShardCount()](uint64_t shard_num) mutable -> ASTPtr
{
ParserExpression parser;
auto custom_key_ast = parseQuery(
parser, settings.parallel_replicas_custom_key.value.data(), settings.parallel_replicas_custom_key.value.data() + settings.parallel_replicas_custom_key.value.size(),
"parallel replicas custom key", settings.max_query_size, settings.max_parser_depth);
ASTPtr shard_filter = nullptr ;
if (settings.parallel_replicas_custom_key_filter_type == ParallelReplicasCustomKeyFilterType::DEFAULT)
{
// first we do modulo with replica count
ASTPtr args = std::make_shared<ASTExpressionList>();
args->children.push_back(custom_key_ast);
args->children.push_back(std::make_shared<ASTLiteral>(shard_count));
auto modulo_function = std::make_shared<ASTFunction>();
modulo_function->name = "positiveModulo";
modulo_function->arguments = args;
modulo_function->children.push_back(modulo_function->arguments);
auto modulo_function = makeASTFunction("positiveModulo", custom_key_ast, std::make_shared<ASTLiteral>(shard_count));
/// then we compare result to the current replica number (offset)
args = std::make_shared<ASTExpressionList>();
args->children.push_back(modulo_function);
args->children.push_back(std::make_shared<ASTLiteral>(shard_num - 1));
auto equals_function = std::make_shared<ASTFunction>();
equals_function->name = "equals";
equals_function->arguments = args;
equals_function->children.push_back(equals_function->arguments);
auto equals_function = makeASTFunction("equals", std::move(modulo_function), std::make_shared<ASTLiteral>(shard_num - 1));
return equals_function;
}
else
assert(settings.parallel_replicas_custom_key_filter_type == ParallelReplicasCustomKeyFilterType::RANGE);
KeyDescription custom_key_description = KeyDescription::getKeyFromAST(custom_key_ast, getInMemoryMetadataPtr()->columns, local_context);
using RelativeSize = boost::rational<ASTSampleRatio::BigNum>;
RelativeSize size_of_universum = 0;
DataTypePtr custom_key_column_type = custom_key_description.data_types[0];
size_of_universum = RelativeSize(std::numeric_limits<UInt32>::max()) + RelativeSize(1);
if (custom_key_description.data_types.size() == 1)
{
assert(settings.parallel_replicas_custom_key_filter_type == ParallelReplicasCustomKeyFilterType::RANGE);
KeyDescription custom_key_description = KeyDescription::getKeyFromAST(custom_key_ast, getInMemoryMetadataPtr()->columns, local_context);
using RelativeSize = boost::rational<ASTSampleRatio::BigNum>;
RelativeSize size_of_universum = 0;
DataTypePtr custom_key_column_type = custom_key_description.data_types[0];
size_of_universum = RelativeSize(std::numeric_limits<UInt32>::max()) + RelativeSize(1);
if (custom_key_description.data_types.size() == 1)
{
if (typeid_cast<const DataTypeUInt64 *>(custom_key_column_type.get()))
size_of_universum = RelativeSize(std::numeric_limits<UInt64>::max()) + RelativeSize(1);
else if (typeid_cast<const DataTypeUInt32 *>(custom_key_column_type.get()))
size_of_universum = RelativeSize(std::numeric_limits<UInt32>::max()) + RelativeSize(1);
else if (typeid_cast<const DataTypeUInt16 *>(custom_key_column_type.get()))
size_of_universum = RelativeSize(std::numeric_limits<UInt16>::max()) + RelativeSize(1);
else if (typeid_cast<const DataTypeUInt8 *>(custom_key_column_type.get()))
size_of_universum = RelativeSize(std::numeric_limits<UInt8>::max()) + RelativeSize(1);
}
if (size_of_universum == RelativeSize(0))
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Invalid custom key column type: {}. Must be one unsigned integer type", custom_key_column_type->getName());
RelativeSize relative_range_size = RelativeSize(1) / query_info.getCluster()->getShardCount();
RelativeSize relative_range_offset = relative_range_size * RelativeSize(shard_num - 1);
/// Calculate the half-interval of `[lower, upper)` column values.
bool has_lower_limit = false;
bool has_upper_limit = false;
RelativeSize lower_limit_rational = relative_range_offset * size_of_universum;
RelativeSize upper_limit_rational = (relative_range_offset + relative_range_size) * size_of_universum;
UInt64 lower = boost::rational_cast<ASTSampleRatio::BigNum>(lower_limit_rational);
UInt64 upper = boost::rational_cast<ASTSampleRatio::BigNum>(upper_limit_rational);
if (lower > 0)
has_lower_limit = true;
if (upper_limit_rational < size_of_universum)
has_upper_limit = true;
assert(has_lower_limit || has_upper_limit);
/// Let's add the conditions to cut off something else when the index is scanned again and when the request is processed.
std::shared_ptr<ASTFunction> lower_function;
std::shared_ptr<ASTFunction> upper_function;
if (has_lower_limit)
{
ASTPtr args = std::make_shared<ASTExpressionList>();
args->children.push_back(custom_key_ast);
args->children.push_back(std::make_shared<ASTLiteral>(lower));
lower_function = std::make_shared<ASTFunction>();
lower_function->name = "greaterOrEquals";
lower_function->arguments = args;
lower_function->children.push_back(lower_function->arguments);
if (!has_upper_limit)
return lower_function;
}
if (has_upper_limit)
{
ASTPtr args = std::make_shared<ASTExpressionList>();
args->children.push_back(custom_key_ast);
args->children.push_back(std::make_shared<ASTLiteral>(upper));
upper_function = std::make_shared<ASTFunction>();
upper_function->name = "less";
upper_function->arguments = args;
upper_function->children.push_back(upper_function->arguments);
if (!has_lower_limit)
return upper_function;
}
assert(has_lower_limit && has_upper_limit);
ASTPtr args = std::make_shared<ASTExpressionList>();
args->children.push_back(lower_function);
args->children.push_back(upper_function);
auto and_function = std::make_shared<ASTFunction>();
and_function->name = "and";
and_function->arguments = args;
and_function->children.push_back(and_function->arguments);
return and_function;
if (typeid_cast<const DataTypeUInt64 *>(custom_key_column_type.get()))
size_of_universum = RelativeSize(std::numeric_limits<UInt64>::max()) + RelativeSize(1);
else if (typeid_cast<const DataTypeUInt32 *>(custom_key_column_type.get()))
size_of_universum = RelativeSize(std::numeric_limits<UInt32>::max()) + RelativeSize(1);
else if (typeid_cast<const DataTypeUInt16 *>(custom_key_column_type.get()))
size_of_universum = RelativeSize(std::numeric_limits<UInt16>::max()) + RelativeSize(1);
else if (typeid_cast<const DataTypeUInt8 *>(custom_key_column_type.get()))
size_of_universum = RelativeSize(std::numeric_limits<UInt8>::max()) + RelativeSize(1);
}
if (size_of_universum == RelativeSize(0))
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Invalid custom key column type: {}. Must be one unsigned integer type", custom_key_column_type->getName());
RelativeSize relative_range_size = RelativeSize(1) / shard_count;
RelativeSize relative_range_offset = relative_range_size * RelativeSize(shard_num - 1);
/// Calculate the half-interval of `[lower, upper)` column values.
bool has_lower_limit = false;
bool has_upper_limit = false;
RelativeSize lower_limit_rational = relative_range_offset * size_of_universum;
RelativeSize upper_limit_rational = (relative_range_offset + relative_range_size) * size_of_universum;
UInt64 lower = boost::rational_cast<ASTSampleRatio::BigNum>(lower_limit_rational);
UInt64 upper = boost::rational_cast<ASTSampleRatio::BigNum>(upper_limit_rational);
if (lower > 0)
has_lower_limit = true;
if (upper_limit_rational < size_of_universum)
has_upper_limit = true;
assert(has_lower_limit || has_upper_limit);
/// Let's add the conditions to cut off something else when the index is scanned again and when the request is processed.
std::shared_ptr<ASTFunction> lower_function;
std::shared_ptr<ASTFunction> upper_function;
if (has_lower_limit)
{
lower_function = makeASTFunction("greaterOrEquals", custom_key_ast, std::make_shared<ASTLiteral>(lower));
if (!has_upper_limit)
return lower_function;
}
if (has_upper_limit)
{
upper_function = makeASTFunction("less", custom_key_ast, std::make_shared<ASTLiteral>(upper));
if (!has_lower_limit)
return upper_function;
}
assert(upper_function && lower_function);
return makeASTFunction("and", std::move(lower_function), std::move(upper_function));
};
}