Propagate setting cluster_for_parallel_replicas to shards

This commit is contained in:
Igor Nikonov 2023-08-14 22:50:46 +00:00
parent 709287fbdc
commit c94994afcf
8 changed files with 61 additions and 17 deletions

View File

@ -273,6 +273,8 @@ public:
/// Are distributed DDL Queries (ON CLUSTER Clause) allowed for this cluster
bool areDistributedDDLQueriesAllowed() const { return allow_distributed_ddl_queries; }
String getName() const { return name; }
private:
SlotToShard slot_to_shard;

View File

@ -234,7 +234,8 @@ void executeQuery(
std::move(external_tables),
log,
shards,
query_info.storage_limits);
query_info.storage_limits,
query_info.getCluster()->getName());
read_from_remote->setStepDescription("Read from remote replica");
plan->addStep(std::move(read_from_remote));
@ -266,14 +267,16 @@ void executeQueryWithParallelReplicas(
const StorageID & main_table,
const ASTPtr & table_func_ptr,
SelectStreamFactory & stream_factory,
const ASTPtr & query_ast, ContextPtr context, const SelectQueryInfo & query_info,
const ASTPtr & query_ast,
ContextPtr context,
const SelectQueryInfo & query_info,
const ClusterPtr & not_optimized_cluster)
{
const auto & settings = context->getSettingsRef();
auto new_context = Context::createCopy(context);
auto scalars = new_context->hasQueryContext() ? new_context->getQueryContext()->getScalars() : Scalars{};
Int64 shard_num = 0; /// shard_num is 1-based, so 0 - no shard specified
UInt64 shard_num = 0; /// shard_num is 1-based, so 0 - no shard specified
auto it = scalars.find("_shard_num");
if (it != scalars.end())
{

View File

@ -1,6 +1,7 @@
#include <Processors/QueryPlan/ReadFromRemote.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/DistributedCreateLocalPlan.h>
@ -103,7 +104,8 @@ ReadFromRemote::ReadFromRemote(
Tables external_tables_,
Poco::Logger * log_,
UInt32 shard_count_,
std::shared_ptr<const StorageLimitsList> storage_limits_)
std::shared_ptr<const StorageLimitsList> storage_limits_,
const String & cluster_name_)
: ISourceStep(DataStream{.header = std::move(header_)})
, shards(std::move(shards_))
, stage(stage_)
@ -116,6 +118,7 @@ ReadFromRemote::ReadFromRemote(
, storage_limits(std::move(storage_limits_))
, log(log_)
, shard_count(shard_count_)
, cluster_name(cluster_name_)
{
}
@ -234,6 +237,16 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
scalars["_shard_num"]
= Block{{DataTypeUInt32().createColumnConst(1, shard.shard_info.shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}};
if (context->getParallelReplicasMode() == Context::ParallelReplicasMode::READ_TASKS)
{
String cluster_for_parallel_replicas = cluster_name;
LOG_DEBUG(&Poco::Logger::get(__FUNCTION__), "_cluster_for_parallel_replicas: {}", cluster_for_parallel_replicas);
scalars["_cluster_for_parallel_replicas"] = Block{
{DataTypeString().createColumnConst(1, cluster_for_parallel_replicas),
std::make_shared<DataTypeString>(),
"_cluster_for_parallel_replicas"}};
}
std::shared_ptr<RemoteQueryExecutor> remote_query_executor;
remote_query_executor = std::make_shared<RemoteQueryExecutor>(
@ -242,6 +255,7 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
remote_query_executor->setLogger(log);
if (context->getParallelReplicasMode() == Context::ParallelReplicasMode::READ_TASKS)
{
// when doing parallel reading from replicas (ParallelReplicasMode::READ_TASKS) on a shard:
// establish a connection to a replica on the shard, the replica will instantiate coordinator to manage parallel reading from replicas on the shard.
// The coordinator will return query result from the shard.
@ -249,6 +263,7 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
// Using PoolMode::GET_MANY for this mode will(can) lead to instantiation of several coordinators (depends on max_parallel_replicas setting)
// each will execute parallel reading from replicas, so the query result will be multiplied by the number of created coordinators
remote_query_executor->setPoolMode(PoolMode::GET_ONE);
}
else
remote_query_executor->setPoolMode(PoolMode::GET_MANY);

View File

@ -35,7 +35,8 @@ public:
Tables external_tables_,
Poco::Logger * log_,
UInt32 shard_count_,
std::shared_ptr<const StorageLimitsList> storage_limits_);
std::shared_ptr<const StorageLimitsList> storage_limits_,
const String & cluster_name_);
String getName() const override { return "ReadFromRemote"; }
@ -55,8 +56,9 @@ private:
Tables external_tables;
std::shared_ptr<const StorageLimitsList> storage_limits;
Poco::Logger * log;
UInt32 shard_count;
String cluster_name;
void addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard);
void addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard);
};

View File

@ -220,7 +220,18 @@ void StorageMergeTree::read(
local_context, query_info.query,
table_id.database_name, table_id.table_name, /*remote_table_function_ptr*/nullptr);
auto cluster = local_context->getCluster(local_context->getSettingsRef().cluster_for_parallel_replicas);
auto scalars = local_context->hasQueryContext() ? local_context->getQueryContext()->getScalars() : Scalars{};
String cluster_for_parallel_replicas = local_context->getSettingsRef().cluster_for_parallel_replicas;
{
auto it = scalars.find("_cluster_for_parallel_replicas");
if (it != scalars.end())
{
const Block & block = it->second;
cluster_for_parallel_replicas = block.getColumns()[0]->getDataAt(0).toString();
}
}
LOG_DEBUG(&Poco::Logger::get("StorageMergeTree::read"), "_cluster_for_parallel_replicas: {}", cluster_for_parallel_replicas);
auto cluster = local_context->getCluster(cluster_for_parallel_replicas);
Block header;

View File

@ -5153,7 +5153,18 @@ void StorageReplicatedMergeTree::readParallelReplicasImpl(
{
auto table_id = getStorageID();
auto parallel_replicas_cluster = local_context->getCluster(local_context->getSettingsRef().cluster_for_parallel_replicas);
auto scalars = local_context->hasQueryContext() ? local_context->getQueryContext()->getScalars() : Scalars{};
String cluster_for_parallel_replicas = local_context->getSettingsRef().cluster_for_parallel_replicas;
{
auto it = scalars.find("_cluster_for_parallel_replicas");
if (it != scalars.end())
{
const Block & block = it->second;
cluster_for_parallel_replicas = block.getColumns()[0]->getDataAt(0).toString();
}
}
LOG_DEBUG(&Poco::Logger::get(__FUNCTION__), "_cluster_for_parallel_replicas: {}", cluster_for_parallel_replicas);
auto parallel_replicas_cluster = local_context->getCluster(cluster_for_parallel_replicas);
ASTPtr modified_query_ast;
Block header;

View File

@ -105,10 +105,10 @@ def test_parallel_replicas_over_distributed(start_cluster, cluster):
expected_result = f"6001\t-1999\t1999\t0\n"
# w/o parallel replicas
assert (
node.query(f"SELECT count(), min(key), max(key), sum(key) FROM {table_name}_d")
== expected_result
)
# assert (
# node.query(f"SELECT count(), min(key), max(key), sum(key) FROM {table_name}_d")
# == expected_result
# )
# parallel replicas
assert (
@ -119,7 +119,7 @@ def test_parallel_replicas_over_distributed(start_cluster, cluster):
"prefer_localhost_replica": 0,
"max_parallel_replicas": 4,
"use_hedged_requests": 0,
"cluster_for_parallel_replicas": cluster,
# "cluster_for_parallel_replicas": cluster,
},
)
== expected_result

View File

@ -14,13 +14,13 @@ insert into test select *, today() from numbers(100);
SELECT count(), min(id), max(id), avg(id)
FROM test_d
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 3, prefer_localhost_replica = 0, parallel_replicas_for_non_replicated_merge_tree=1, use_hedged_requests=0, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost';
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 3, prefer_localhost_replica = 0, parallel_replicas_for_non_replicated_merge_tree=1, use_hedged_requests=0; --, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost';
insert into test select *, today() from numbers(100);
SELECT count(), min(id), max(id), avg(id)
FROM test_d
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 3, prefer_localhost_replica = 0, parallel_replicas_for_non_replicated_merge_tree=1, use_hedged_requests=0, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost';
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 3, prefer_localhost_replica = 0, parallel_replicas_for_non_replicated_merge_tree=1, use_hedged_requests=0; --, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost';
-- 2 shards
@ -38,10 +38,10 @@ insert into test2 select *, today() from numbers(100);
SELECT count(), min(id), max(id), avg(id)
FROM test2_d
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 3, prefer_localhost_replica = 0, parallel_replicas_for_non_replicated_merge_tree=1, use_hedged_requests=0, cluster_for_parallel_replicas = 'test_cluster_two_shard_three_replicas_localhost';
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 3, prefer_localhost_replica = 0, parallel_replicas_for_non_replicated_merge_tree=1, use_hedged_requests=0; --, cluster_for_parallel_replicas = 'test_cluster_two_shard_three_replicas_localhost';
insert into test2 select *, today() from numbers(100);
SELECT count(), min(id), max(id), avg(id)
FROM test2_d
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 3, prefer_localhost_replica = 0, parallel_replicas_for_non_replicated_merge_tree=1, use_hedged_requests=0, cluster_for_parallel_replicas = 'test_cluster_two_shard_three_replicas_localhost';
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 3, prefer_localhost_replica = 0, parallel_replicas_for_non_replicated_merge_tree=1, use_hedged_requests=0; --, cluster_for_parallel_replicas = 'test_cluster_two_shard_three_replicas_localhost';