Disabling parallel replicas per shard will be done separately

This commit is contained in:
Igor Nikonov 2023-09-18 15:27:55 +00:00
parent 13bc294174
commit e1019ba3c4

View File

@ -236,49 +236,29 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
scalars["_shard_num"]
= Block{{DataTypeUInt32().createColumnConst(1, shard.shard_info.shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}};
ContextPtr execution_context = context;
if (context->canUseParallelReplicas())
{
if (shard.shard_info.getAllNodeCount() > 1)
if (context->getSettingsRef().cluster_for_parallel_replicas.changed)
{
if (context->getSettingsRef().cluster_for_parallel_replicas.changed)
{
const String cluster_for_parallel_replicas = context->getSettingsRef().cluster_for_parallel_replicas;
if (cluster_for_parallel_replicas != cluster_name)
LOG_INFO(
log,
"cluster_for_parallel_replicas has been set for the query but has no effect: {}. Distributed table cluster is "
"used: {}",
cluster_for_parallel_replicas,
cluster_name);
}
LOG_TRACE(&Poco::Logger::get("ReadFromRemote"), "Setting `cluster_for_parallel_replicas` to {}", cluster_name);
context->setSetting("cluster_for_parallel_replicas", cluster_name);
const String cluster_for_parallel_replicas = context->getSettingsRef().cluster_for_parallel_replicas;
if (cluster_for_parallel_replicas != cluster_name)
LOG_INFO(
log,
"cluster_for_parallel_replicas has been set for the query but has no effect: {}. Distributed table cluster is "
"used: {}",
cluster_for_parallel_replicas,
cluster_name);
}
else
{
ContextMutablePtr tmp = Context::createCopy(context);
tmp->setSetting("allow_experimental_parallel_reading_from_replicas", Field{0});
execution_context = tmp;
LOG_TRACE(
&Poco::Logger::get("ReadFromRemote"),
"Parallel reading from replicas is disabled for shard. Not enough nodes: shard={} cluster={} nodes={}",
shard.shard_info.shard_num,
cluster_name,
shard.shard_info.getAllNodeCount());
}
LOG_TRACE(log, "Setting `cluster_for_parallel_replicas` to {}", cluster_name);
context->setSetting("cluster_for_parallel_replicas", cluster_name);
}
std::shared_ptr<RemoteQueryExecutor> remote_query_executor;
remote_query_executor = std::make_shared<RemoteQueryExecutor>(
shard.shard_info.pool, query_string, output_stream->header, execution_context, throttler, scalars, external_tables, stage);
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
shard.shard_info.pool, query_string, output_stream->header, context, throttler, scalars, external_tables, stage);
remote_query_executor->setLogger(log);
if (execution_context->canUseParallelReplicas())
if (context->canUseParallelReplicas())
{
// when doing parallel reading from replicas (ParallelReplicasMode::READ_TASKS) on a shard:
// establish a connection to a replica on the shard, the replica will instantiate coordinator to manage parallel reading from replicas on the shard.
@ -294,7 +274,8 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
if (!table_func_ptr)
remote_query_executor->setMainTable(shard.main_table ? shard.main_table : main_table);
pipes.emplace_back(createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending));
pipes.emplace_back(
createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending));
addConvertingActions(pipes.back(), output_stream->header);
}