create coordinator after the number of replicas to use for the query is determined

zoomxi 2024-07-24 09:57:00 +08:00
parent dee5790b22
commit d460fa9f36
6 changed files with 8 additions and 32 deletions
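
In short: ParallelReplicasReadingCoordinator used to be constructed up front in executeQueryWithParallelReplicas, sized with the shard's full node count and only later shrunk via adjustParticipatingReplicasCount(). After this commit it is constructed inside ReadFromParallelRemoteReplicasStep::initializePipeline(), once max_replicas_to_use is known, so the coordinator's replicas_count always equals the number of replicas actually chosen for the query. A toy, self-contained C++ sketch of that ordering (simplified names; Coordinator and the constants in main are illustrative, not the real ClickHouse classes):

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <vector>

// Illustrative stand-in for ParallelReplicasReadingCoordinator.
struct Coordinator
{
    explicit Coordinator(std::size_t replicas_count_) : replicas_count(replicas_count_) {}

    // Fails the query once every chosen replica has been reported unavailable.
    void markReplicaAsUnavailable(std::size_t replica_number)
    {
        unavailable.push_back(replica_number);
        if (unavailable.size() == replicas_count)
            throw std::runtime_error("Can't connect to any replica chosen for query execution");
    }

    const std::size_t replicas_count;
    std::vector<std::size_t> unavailable;
};

int main()
{
    const std::size_t all_nodes_in_shard = 3;     // e.g. a 3-replica shard
    const std::size_t max_parallel_replicas = 2;  // the query will only use 2 of them
    const std::size_t max_replicas_to_use = std::min(all_nodes_in_shard, max_parallel_replicas);

    // New ordering: the coordinator is created only after max_replicas_to_use is known.
    auto coordinator = std::make_shared<Coordinator>(max_replicas_to_use);

    try
    {
        // Every participating replica turns out to be unavailable.
        for (std::size_t i = 0; i < max_replicas_to_use; ++i)
            coordinator->markReplicaAsUnavailable(i);
    }
    catch (const std::exception & e)
    {
        // With the pre-commit ordering, a coordinator sized from the shard's full node
        // count (3) could keep waiting for a third failure that never arrives when only
        // 2 replicas participate, so this error might never be raised.
        std::cout << "query fails as expected: " << e.what() << '\n';
    }
    return 0;
}

The parametrized integration test at the end of this commit checks the same property: with all replicas of the shard unavailable, the query must fail for any max_parallel_replicas value, including values larger than the shard size.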

@@ -517,14 +517,11 @@ void executeQueryWithParallelReplicas(
             "`cluster_for_parallel_replicas` setting refers to cluster with several shards. Expected a cluster with one shard");
     }
-    auto coordinator = std::make_shared<ParallelReplicasReadingCoordinator>(
-        new_cluster->getShardsInfo().begin()->getAllNodeCount(), settings.parallel_replicas_mark_segment_size);
     auto external_tables = new_context->getExternalTables();
     auto read_from_remote = std::make_unique<ReadFromParallelRemoteReplicasStep>(
         query_ast,
         new_cluster,
         storage_id,
-        std::move(coordinator),
         header,
         processed_stage,
         new_context,

@@ -362,7 +362,6 @@ ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep(
     ASTPtr query_ast_,
     ClusterPtr cluster_,
     const StorageID & storage_id_,
-    ParallelReplicasReadingCoordinatorPtr coordinator_,
     Block header_,
     QueryProcessingStage::Enum stage_,
     ContextMutablePtr context_,
@@ -375,7 +374,6 @@ ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep(
     , cluster(cluster_)
     , query_ast(query_ast_)
     , storage_id(storage_id_)
-    , coordinator(std::move(coordinator_))
     , stage(std::move(stage_))
     , context(context_)
     , throttler(throttler_)
@@ -429,7 +427,6 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
     {
         shuffled_pool = shard.pool->getShuffledPools(current_settings);
         shuffled_pool.resize(max_replicas_to_use);
-        coordinator->adjustParticipatingReplicasCount(max_replicas_to_use);
     }
     else
     {
@@ -439,6 +436,9 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
         shuffled_pool = shard.pool->getShuffledPools(current_settings, priority_func);
     }
+    coordinator
+        = std::make_shared<ParallelReplicasReadingCoordinator>(max_replicas_to_use, current_settings.parallel_replicas_mark_segment_size);
     for (size_t i=0; i < max_replicas_to_use; ++i)
     {
         IConnections::ReplicaInfo replica_info

@@ -70,7 +70,6 @@ public:
         ASTPtr query_ast_,
         ClusterPtr cluster_,
         const StorageID & storage_id_,
-        ParallelReplicasReadingCoordinatorPtr coordinator_,
         Block header_,
         QueryProcessingStage::Enum stage_,
         ContextMutablePtr context_,

@@ -1031,7 +1031,7 @@ void ParallelReplicasReadingCoordinator::markReplicaAsUnavailable(size_t replica
     if (!pimpl)
     {
         unavailable_nodes_registered_before_initialization.push_back(replica_number);
-        if (unavailable_nodes_registered_before_initialization.size() == participating_replicas_count)
+        if (unavailable_nodes_registered_before_initialization.size() == replicas_count)
             throw Exception(ErrorCodes::ALL_CONNECTION_TRIES_FAILED, "Can't connect to any replica chosen for query execution");
     }
     else
@@ -1061,7 +1061,7 @@ void ParallelReplicasReadingCoordinator::initialize(CoordinationMode mode)
 }
 ParallelReplicasReadingCoordinator::ParallelReplicasReadingCoordinator(size_t replicas_count_, size_t mark_segment_size_)
-    : replicas_count(replicas_count_), participating_replicas_count(replicas_count_), mark_segment_size(mark_segment_size_)
+    : replicas_count(replicas_count_), mark_segment_size(mark_segment_size_)
 {
 }

@@ -30,15 +30,11 @@ public:
     /// needed to report total rows to read
     void setProgressCallback(ProgressCallback callback);
-    /// Participating replicas count may be less than replicas count in a shard.
-    void adjustParticipatingReplicasCount(size_t count) { participating_replicas_count = count; }
 private:
     void initialize(CoordinationMode mode);
     std::mutex mutex;
     const size_t replicas_count{0};
-    size_t participating_replicas_count{0};
     size_t mark_segment_size{0};
     std::unique_ptr<ImplInterface> pimpl;
     ProgressCallback progress_callback; // store the callback only to bypass it to coordinator implementation

@@ -33,7 +33,8 @@ def create_tables(cluster, table_name):
 @pytest.mark.parametrize("skip_unavailable_shards", [1, 0])
-def test_skip_all_replicas(start_cluster, skip_unavailable_shards):
+@pytest.mark.parametrize("max_parallel_replicas", [2, 3, 100])
+def test_skip_all_replicas(start_cluster, skip_unavailable_shards, max_parallel_replicas):
     cluster_name = "test_1_shard_3_unavaliable_replicas"
     table_name = "tt"
     create_tables(cluster_name, table_name)
@@ -43,25 +44,8 @@ def test_skip_all_replicas(start_cluster, skip_unavailable_shards):
             f"SELECT key, count() FROM {table_name} GROUP BY key ORDER BY key",
             settings={
                 "allow_experimental_parallel_reading_from_replicas": 2,
-                "max_parallel_replicas": 3,
+                "max_parallel_replicas": max_parallel_replicas,
                 "cluster_for_parallel_replicas": cluster_name,
                 "skip_unavailable_shards": skip_unavailable_shards,
             },
         )
-@pytest.mark.parametrize("max_parallel_replicas", [2, 3, 100])
-def test_skip_all_participating_replicas(start_cluster, max_parallel_replicas):
-    cluster_name = "test_1_shard_3_unavaliable_replicas"
-    table_name = "tt1"
-    create_tables(cluster_name, table_name)
-    with pytest.raises(QueryRuntimeException):
-        initiator.query(
-            f"SELECT key, count() FROM {table_name} GROUP BY key ORDER BY key",
-            settings={
-                "allow_experimental_parallel_reading_from_replicas": 2,
-                "max_parallel_replicas": max_parallel_replicas,
-                "cluster_for_parallel_replicas": cluster_name,
-                "skip_unavailable_shards": 1,
-            },
-        )