This commit is contained in:
Igor Nikonov 2023-11-30 21:39:48 +00:00
parent a1219d0efd
commit 45ebe78d32
4 changed files with 20 additions and 17 deletions

View File

@ -605,13 +605,13 @@ void Cluster::addShard(const Settings & settings, Addresses addresses, bool trea
slot_to_shard.insert(std::end(slot_to_shard), weight, shards_info.size());
shards_info.push_back({
.insert_path_for_internal_replication = std::move(insert_paths),
.shard_num = current_shard_num,
.weight = weight,
.local_addresses = std::move(shard_local_addresses),
.pool = std::move(shard_pool),
.per_replica_pools = std::move(all_replicas_pools),
.has_internal_replication = internal_replication,
std::move(insert_paths),
current_shard_num,
weight,
std::move(shard_local_addresses),
std::move(shard_pool),
std::move(all_replicas_pools),
internal_replication
});
}

View File

@ -139,7 +139,7 @@ void SelectStreamFactory::createForShard(
.shard_info = shard_info,
.lazy = lazy,
.local_delay = local_delay,
.shard_filter_generator = shard_filter_generator,
.shard_filter_generator = std::move(shard_filter_generator),
});
};

View File

@ -258,7 +258,7 @@ void executeQuery(
remote_shards,
static_cast<UInt32>(shards),
parallel_replicas_enabled,
shard_filter_generator);
std::move(shard_filter_generator));
}
if (!remote_shards.empty())

View File

@ -881,16 +881,19 @@ void StorageDistributed::read(
{
if (auto custom_key_ast = parseCustomKeyForTable(settings.parallel_replicas_custom_key, *local_context))
{
chassert(query_info.getCluster()->getShardsInfo().size() == 1);
additional_shard_filter_generator
= [my_custom_key_ast = std::move(custom_key_ast),
column_description = this->getInMemoryMetadataPtr()->columns,
custom_key_type = settings.parallel_replicas_custom_key_filter_type.value,
context = local_context,
replica_count = query_info.getCluster()->getShardsInfo().front().per_replica_pools.size()](uint64_t replica_num) -> ASTPtr
additional_shard_filter_generator =
[&,
my_custom_key_ast = std::move(custom_key_ast),
replica_count = query_info.getCluster()->getShardsInfo().front().per_replica_pools.size()](uint64_t replica_num) -> ASTPtr
{
return getCustomKeyFilterForParallelReplica(
replica_count, replica_num - 1, my_custom_key_ast, custom_key_type, column_description, context);
replica_count,
replica_num - 1,
my_custom_key_ast,
settings.parallel_replicas_custom_key_filter_type,
this->getInMemoryMetadataPtr()->columns,
local_context);
};
}
}