Throw if can't connect to any participating replicas

zoomxi 2024-07-22 17:57:30 +08:00
parent 846922aeae
commit 8246614f5e
4 changed files with 43 additions and 3 deletions

View File

@@ -21,7 +21,7 @@
 #include <Client/ConnectionPoolWithFailover.h>
 #include <QueryPipeline/QueryPipelineBuilder.h>
 #include <Parsers/ASTFunction.h>
+#include <Storages/MergeTree/ParallelReplicasReadingCoordinator.h>
 #include <boost/algorithm/string/join.hpp>

 namespace DB
@@ -429,6 +429,7 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
     {
         shuffled_pool = shard.pool->getShuffledPools(current_settings);
         shuffled_pool.resize(max_replicas_to_use);
+        coordinator->adjustParticipatingReplicasCount(max_replicas_to_use);
     }
     else
     {
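For context, the hunk above resizes the shuffled connection pool down to max_replicas_to_use and then tells the coordinator how many replicas actually take part in the query. Below is a minimal standalone sketch of that truncation step, using a plain std::vector and std::shuffle rather than ClickHouse's pool types; only max_replicas_to_use and the adjustParticipatingReplicasCount call come from the diff, the rest is illustrative.

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <random>
#include <string>
#include <vector>

int main()
{
    // Stand-in for the shard's replica pool; in ClickHouse this is a pool of connections.
    std::vector<std::string> shuffled_pool = {"replica1", "replica2", "replica3"};
    std::shuffle(shuffled_pool.begin(), shuffled_pool.end(), std::mt19937{std::random_device{}()});

    // Only this many replicas will be contacted (cf. max_parallel_replicas).
    const size_t max_replicas_to_use = 2;
    shuffled_pool.resize(std::min(max_replicas_to_use, shuffled_pool.size()));

    // In the real code this is the point where coordinator->adjustParticipatingReplicasCount(max_replicas_to_use)
    // is called, so the coordinator does not wait for replicas that were never contacted.
    std::cout << "participating replicas: " << shuffled_pool.size() << '\n';
    return 0;
}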

View File

@@ -1031,7 +1031,7 @@ void ParallelReplicasReadingCoordinator::markReplicaAsUnavailable(size_t replica
     if (!pimpl)
     {
         unavailable_nodes_registered_before_initialization.push_back(replica_number);
-        if (unavailable_nodes_registered_before_initialization.size() == replicas_count)
+        if (unavailable_nodes_registered_before_initialization.size() == participating_replicas_count)
             throw Exception(ErrorCodes::ALL_CONNECTION_TRIES_FAILED, "Can't connect to any replica chosen for query execution");
     }
     else
@@ -1061,7 +1061,7 @@ void ParallelReplicasReadingCoordinator::initialize(CoordinationMode mode)
 }

 ParallelReplicasReadingCoordinator::ParallelReplicasReadingCoordinator(size_t replicas_count_, size_t mark_segment_size_)
-    : replicas_count(replicas_count_), mark_segment_size(mark_segment_size_)
+    : replicas_count(replicas_count_), participating_replicas_count(replicas_count_), mark_segment_size(mark_segment_size_)
 {
 }

View File

@@ -30,11 +30,15 @@ public:
     /// needed to report total rows to read
     void setProgressCallback(ProgressCallback callback);

+    /// Participating replicas count may be less than replicas count in a shard.
+    void adjustParticipatingReplicasCount(size_t count) { participating_replicas_count = count; }
+
 private:
     void initialize(CoordinationMode mode);

     std::mutex mutex;
     const size_t replicas_count{0};
+    size_t participating_replicas_count{0};
     size_t mark_segment_size{0};
     std::unique_ptr<ImplInterface> pimpl;
     ProgressCallback progress_callback; // store the callback only to bypass it to coordinator implementation
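Taken together, the coordinator changes above amount to a simple counting rule: remember how many replicas were actually chosen for the query, and fail fast once every one of them has been reported unavailable before the coordinator is initialized. The following is a minimal standalone sketch of that rule, not the real ParallelReplicasReadingCoordinator: it swaps DB::Exception for std::runtime_error and drops the mutex/pimpl machinery, while the member and method names mirror the diff.

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <stdexcept>
#include <vector>

class CoordinatorSketch
{
public:
    explicit CoordinatorSketch(size_t replicas_count_)
        : replicas_count(replicas_count_)
        , participating_replicas_count(replicas_count_) /// by default every replica of the shard participates
    {
    }

    /// Called by the initiator when fewer replicas are used than the shard has,
    /// e.g. after the shuffled pool was resized to max_replicas_to_use.
    void adjustParticipatingReplicasCount(size_t count)
    {
        participating_replicas_count = std::min(count, replicas_count);
    }

    /// Before initialization, unavailable replicas are only registered. Once all
    /// participating replicas are unavailable, the query cannot make progress,
    /// so throw instead of silently hanging.
    void markReplicaAsUnavailable(size_t replica_number)
    {
        unavailable_before_initialization.push_back(replica_number);
        if (unavailable_before_initialization.size() == participating_replicas_count)
            throw std::runtime_error("Can't connect to any replica chosen for query execution");
    }

private:
    const size_t replicas_count;         /// replicas in the shard
    size_t participating_replicas_count; /// replicas actually chosen for this query (<= replicas_count)
    std::vector<size_t> unavailable_before_initialization;
};

int main()
{
    /// A shard with 3 replicas, of which only 2 participate (e.g. max_parallel_replicas = 2).
    CoordinatorSketch coordinator(3);
    coordinator.adjustParticipatingReplicasCount(2);

    try
    {
        coordinator.markReplicaAsUnavailable(0);
        coordinator.markReplicaAsUnavailable(1); /// second of two participating replicas -> throws
    }
    catch (const std::runtime_error & e)
    {
        std::cout << "expected failure: " << e.what() << '\n';
    }
    return 0;
}

With the old comparison against replicas_count, the check could never fire when fewer replicas were contacted than the shard contains, which is the situation the new tests below exercise.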

View File

@@ -48,3 +48,38 @@ def test_skip_all_replicas(start_cluster, skip_unavailable_shards):
                 "skip_unavailable_shards": skip_unavailable_shards,
             },
         )
+
+
+@pytest.mark.parametrize("skip_unavailable_shards", [1, 0])
+def test_skip_all_participating_replicas1(start_cluster, skip_unavailable_shards):
+    cluster_name = "test_1_shard_3_unavaliable_replicas"
+    table_name = "tt1"
+    create_tables(cluster_name, table_name)
+    with pytest.raises(QueryRuntimeException):
+        initiator.query(
+            f"SELECT key, count() FROM {table_name} GROUP BY key ORDER BY key",
+            settings={
+                "allow_experimental_parallel_reading_from_replicas": 2,
+                "max_parallel_replicas": 3,
+                "cluster_for_parallel_replicas": cluster_name,
+                "skip_unavailable_shards": skip_unavailable_shards,
+                "parallel_replicas_min_number_of_rows_per_replica": 500,
+            },
+        )
+
+
+@pytest.mark.parametrize("skip_unavailable_shards", [1, 0])
+def test_skip_all_participating_replicas2(start_cluster, skip_unavailable_shards):
+    cluster_name = "test_1_shard_3_unavaliable_replicas"
+    table_name = "tt2"
+    create_tables(cluster_name, table_name)
+    with pytest.raises(QueryRuntimeException):
+        initiator.query(
+            f"SELECT key, count() FROM {table_name} GROUP BY key ORDER BY key",
+            settings={
+                "allow_experimental_parallel_reading_from_replicas": 2,
+                "max_parallel_replicas": 2,
+                "cluster_for_parallel_replicas": cluster_name,
+                "skip_unavailable_shards": skip_unavailable_shards,
+            },
+        )