diff --git a/src/Processors/QueryPlan/ReadFromRemote.cpp b/src/Processors/QueryPlan/ReadFromRemote.cpp index 884bc85aaaf..f8d718b481d 100644 --- a/src/Processors/QueryPlan/ReadFromRemote.cpp +++ b/src/Processors/QueryPlan/ReadFromRemote.cpp @@ -236,49 +236,29 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact scalars["_shard_num"] = Block{{DataTypeUInt32().createColumnConst(1, shard.shard_info.shard_num), std::make_shared(), "_shard_num"}}; - ContextPtr execution_context = context; if (context->canUseParallelReplicas()) { - if (shard.shard_info.getAllNodeCount() > 1) + if (context->getSettingsRef().cluster_for_parallel_replicas.changed) { - if (context->getSettingsRef().cluster_for_parallel_replicas.changed) - { - const String cluster_for_parallel_replicas = context->getSettingsRef().cluster_for_parallel_replicas; - if (cluster_for_parallel_replicas != cluster_name) - LOG_INFO( - log, - "cluster_for_parallel_replicas has been set for the query but has no effect: {}. Distributed table cluster is " - "used: {}", - cluster_for_parallel_replicas, - cluster_name); - } - - LOG_TRACE(&Poco::Logger::get("ReadFromRemote"), "Setting `cluster_for_parallel_replicas` to {}", cluster_name); - context->setSetting("cluster_for_parallel_replicas", cluster_name); + const String cluster_for_parallel_replicas = context->getSettingsRef().cluster_for_parallel_replicas; + if (cluster_for_parallel_replicas != cluster_name) + LOG_INFO( + log, + "cluster_for_parallel_replicas has been set for the query but has no effect: {}. Distributed table cluster is " + "used: {}", + cluster_for_parallel_replicas, + cluster_name); } - else - { - ContextMutablePtr tmp = Context::createCopy(context); - tmp->setSetting("allow_experimental_parallel_reading_from_replicas", Field{0}); - execution_context = tmp; - LOG_TRACE( - &Poco::Logger::get("ReadFromRemote"), - "Parallel reading from replicas is disabled for shard. Not enough nodes: shard={} cluster={} nodes={}", - shard.shard_info.shard_num, - cluster_name, - shard.shard_info.getAllNodeCount()); - } + LOG_TRACE(log, "Setting `cluster_for_parallel_replicas` to {}", cluster_name); + context->setSetting("cluster_for_parallel_replicas", cluster_name); } - std::shared_ptr remote_query_executor; - - remote_query_executor = std::make_shared( - shard.shard_info.pool, query_string, output_stream->header, execution_context, throttler, scalars, external_tables, stage); + auto remote_query_executor = std::make_shared( + shard.shard_info.pool, query_string, output_stream->header, context, throttler, scalars, external_tables, stage); remote_query_executor->setLogger(log); - if (execution_context->canUseParallelReplicas()) + if (context->canUseParallelReplicas()) { // when doing parallel reading from replicas (ParallelReplicasMode::READ_TASKS) on a shard: // establish a connection to a replica on the shard, the replica will instantiate coordinator to manage parallel reading from replicas on the shard. @@ -294,7 +274,8 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact if (!table_func_ptr) remote_query_executor->setMainTable(shard.main_table ? shard.main_table : main_table); - pipes.emplace_back(createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending)); + pipes.emplace_back( + createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending)); addConvertingActions(pipes.back(), output_stream->header); }