This commit is contained in:
Igor Nikonov 2023-08-16 23:31:20 +00:00
parent 21ef1f1d1c
commit 2e4d346e44
2 changed files with 3 additions and 9 deletions

View File

@ -296,7 +296,7 @@ void executeQueryWithParallelReplicas(
else
new_cluster = not_optimized_cluster->getClusterWithReplicasAsShards(settings);
auto all_replicas_count = std::min(static_cast<size_t>(settings.max_parallel_replicas), new_cluster->getShardCount());
auto all_replicas_count = std::min(static_cast<size_t>(settings.max_parallel_replicas), not_optimized_cluster->getShardCount());
auto coordinator = std::make_shared<ParallelReplicasReadingCoordinator>(all_replicas_count);
/// This is a little bit weird, but we construct an "empty" coordinator without

View File

@ -243,16 +243,10 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
{
const String cluster_for_parallel_replicas = context->getSettingsRef().cluster_for_parallel_replicas;
if (cluster_for_parallel_replicas != cluster_name)
{
LOG_INFO(log, "cluster_for_parallel_replicas was set for the query but has no effect: {}. Distributed table cluster is used: {}", cluster_for_parallel_replicas, cluster_name);
}
LOG_INFO(log, "cluster_for_parallel_replicas has been set for the query but has no effect: {}. Distributed table cluster is used: {}",
cluster_for_parallel_replicas, cluster_name);
}
context->setSetting("cluster_for_parallel_replicas", cluster_name);
/// the cluster is defined by Distributed table and passed to shards via `_cluster_for_parallel_replicas` scalar value
// scalars["_cluster_for_parallel_replicas"] =
// Block{{DataTypeString().createColumnConst(1, cluster_name), std::make_shared<DataTypeString>(), "_cluster_for_parallel_replicas"}};
}
std::shared_ptr<RemoteQueryExecutor> remote_query_executor;