Skip unavailable replicas in parallel distributed insert select (#58931)

* Update StorageDistributed.cpp

* Update StorageDistributed.cpp

* Update StorageDistributed.cpp

* add a test

* Update 01099_parallel_distributed_insert_select.sql
This commit is contained in:
Alexander Tokmakov 2024-01-22 15:34:46 +01:00 committed by GitHub
parent 0791c75315
commit c2202ff347
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 143 additions and 14 deletions

View File

@ -1140,22 +1140,20 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStor
ContextMutablePtr query_context = Context::createCopy(local_context); ContextMutablePtr query_context = Context::createCopy(local_context);
query_context->increaseDistributedDepth(); query_context->increaseDistributedDepth();
/// Here we take addresses from destination cluster and assume source table exists on these nodes const auto & current_settings = query_context->getSettingsRef();
for (const auto & replicas : getCluster()->getShardsAddresses()) auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
{
/// There will be only one replica, because we consider each replica as a shard
for (const auto & node : replicas)
{
auto connection = std::make_shared<Connection>(
node.host_name, node.port, query_context->getGlobalContext()->getCurrentDatabase(),
node.user, node.password, ssh::SSHKey(), node.quota_key, node.cluster, node.cluster_secret,
"ParallelInsertSelectInititiator",
node.compression,
node.secure
);
/// Here we take addresses from destination cluster and assume source table exists on these nodes
for (const auto & replicas : getCluster()->getShardsInfo())
{
/// Skip unavailable hosts if necessary
auto try_results = replicas.pool->getMany(timeouts, current_settings, PoolMode::GET_MANY, /*async_callback*/ {}, /*skip_unavailable_endpoints*/ true);
/// There will be only one replica, because we consider each replica as a shard
for (const auto & try_result : try_results)
{
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>( auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
connection, std::vector<IConnectionPool::Entry>{try_result},
new_query_str, new_query_str,
Block{}, Block{},
query_context, query_context,

View File

@ -17,6 +17,12 @@ distributed
0 4 0 4
1 4 1 4
2 4 2 4
test_cluster_1_shard_3_replicas_1_unavailable
distributed
local
test_cluster_1_shard_3_replicas_1_unavailable with storageCluster
distributed
local
parallel_distributed_insert_select=2 parallel_distributed_insert_select=2
test_shard_localhost test_shard_localhost
0 0
@ -35,3 +41,21 @@ local
0 2 0 2
1 2 1 2
2 2 2 2
test_cluster_1_shard_3_replicas_1_unavailable
distributed
0 1
1 1
2 1
local
0 1
1 1
2 1
test_cluster_1_shard_3_replicas_1_unavailable with storageCluster
distributed
1 1
2 1
3 1
local
1 1
2 1
3 1

View File

@ -87,6 +87,60 @@ DROP TABLE local_01099_b;
DROP TABLE distributed_01099_a; DROP TABLE distributed_01099_a;
DROP TABLE distributed_01099_b; DROP TABLE distributed_01099_b;
--- test_cluster_1_shard_3_replicas_1_unavailable
-- Scenario: INSERT ... SELECT from one Distributed table into another, both defined over the
-- cluster 'test_cluster_1_shard_3_replicas_1_unavailable'.
-- NOTE(review): judging by the cluster name, one of its replicas is unreachable; the cluster
-- definition is not part of this script - confirm against the test server config.
SELECT 'test_cluster_1_shard_3_replicas_1_unavailable';
CREATE TABLE local_01099_a (number UInt64) ENGINE = MergeTree() ORDER BY number;
CREATE TABLE local_01099_b (number UInt64) ENGINE = MergeTree() ORDER BY number;
CREATE TABLE distributed_01099_a AS local_01099_a ENGINE = Distributed('test_cluster_1_shard_3_replicas_1_unavailable', currentDatabase(), local_01099_a, rand());
CREATE TABLE distributed_01099_b AS local_01099_b ENGINE = Distributed('test_cluster_1_shard_3_replicas_1_unavailable', currentDatabase(), local_01099_b, rand());
-- Stop background sends for the destination Distributed table before inserting into it.
SYSTEM STOP DISTRIBUTED SENDS distributed_01099_b;
SET prefer_localhost_replica=0; -- to require distributed send for local replica too
-- Seed the source local table with three rows, then copy through the Distributed tables.
INSERT INTO local_01099_a SELECT number from system.numbers limit 3;
INSERT INTO distributed_01099_b SELECT * from distributed_01099_a;
SET prefer_localhost_replica=1;
-- Distributed sends for distributed_01099_b are still stopped at this point.
-- NOTE(review): this spot used to say the sends "are not required, since insert is done into
-- local table (since parallel_distributed_insert_select=2)". In this part of the script that
-- setting is switched to 2 only by the SET that follows this block, and the expected output for
-- this section lists no rows under 'distributed' and 'local' - presumably the inserted data stays
-- queued while sends are stopped. Confirm which mode is active here.
SELECT 'distributed';
SELECT number, count(number) FROM distributed_01099_b group by number order by number;
SELECT 'local';
SELECT number, count(number) FROM local_01099_b group by number order by number;
DROP TABLE local_01099_a;
DROP TABLE local_01099_b;
-- NOTE(review): log level is lowered to 'fatal' only around the Distributed drops, presumably to
-- keep server log messages out of the compared test output - confirm.
SET send_logs_level='fatal';
DROP TABLE distributed_01099_a;
DROP TABLE distributed_01099_b;
SET send_logs_level='warning';
--- test_cluster_1_shard_3_replicas_1_unavailable with storageCluster
-- Scenario: same destination setup, but the SELECT side is the urlCluster table function
-- instead of a Distributed table.
SELECT 'test_cluster_1_shard_3_replicas_1_unavailable with storageCluster';
CREATE TABLE local_01099_b (number UInt64) ENGINE = MergeTree() ORDER BY number;
CREATE TABLE distributed_01099_b AS local_01099_b ENGINE = Distributed('test_cluster_1_shard_3_replicas_1_unavailable', currentDatabase(), local_01099_b, rand());
SYSTEM STOP DISTRIBUTED SENDS distributed_01099_b;
SET prefer_localhost_replica=0; -- to require distributed send for local replica too
-- Log level is changed to 'error' for the INSERT only and restored to 'warning' right after.
SET send_logs_level='error';
-- NOTE(review): the source is read over HTTP from localhost:8123; the '{1,2,3}' part of the URL
-- presumably expands into three separate queries (select 1 / 2 / 3) - confirm.
INSERT INTO distributed_01099_b SELECT * FROM urlCluster('test_cluster_two_shards', 'http://localhost:8123/?query=select+{1,2,3}+format+TSV', 'TSV', 's String');
SET send_logs_level='warning';
SET prefer_localhost_replica=1;
-- Distributed sends for distributed_01099_b are still stopped at this point.
-- NOTE(review): as in the scenario above, the former remark about
-- parallel_distributed_insert_select=2 does not match this part of the script: the expected
-- output for this section lists no rows under 'distributed' and 'local'. Confirm.
SELECT 'distributed';
SELECT number, count(number) FROM distributed_01099_b group by number order by number;
SELECT 'local';
SELECT number, count(number) FROM local_01099_b group by number order by number;
DROP TABLE local_01099_b;
-- Same log-level handling around the Distributed drop as in the scenario above.
SET send_logs_level='fatal';
DROP TABLE distributed_01099_b;
SET send_logs_level='warning';
SET parallel_distributed_insert_select=2; SET parallel_distributed_insert_select=2;
SELECT 'parallel_distributed_insert_select=2'; SELECT 'parallel_distributed_insert_select=2';
@ -164,3 +218,56 @@ DROP TABLE local_01099_a;
DROP TABLE local_01099_b; DROP TABLE local_01099_b;
DROP TABLE distributed_01099_a; DROP TABLE distributed_01099_a;
DROP TABLE distributed_01099_b; DROP TABLE distributed_01099_b;
--- test_cluster_1_shard_3_replicas_1_unavailable
-- Scenario: INSERT ... SELECT from one Distributed table into another, both defined over the
-- cluster 'test_cluster_1_shard_3_replicas_1_unavailable'.
-- NOTE(review): this copy sits after the script's SET parallel_distributed_insert_select=2;
-- the statements between that SET and this block are not all visible - confirm the setting is
-- still 2 here.
SELECT 'test_cluster_1_shard_3_replicas_1_unavailable';
CREATE TABLE local_01099_a (number UInt64) ENGINE = MergeTree() ORDER BY number;
CREATE TABLE local_01099_b (number UInt64) ENGINE = MergeTree() ORDER BY number;
CREATE TABLE distributed_01099_a AS local_01099_a ENGINE = Distributed('test_cluster_1_shard_3_replicas_1_unavailable', currentDatabase(), local_01099_a, rand());
CREATE TABLE distributed_01099_b AS local_01099_b ENGINE = Distributed('test_cluster_1_shard_3_replicas_1_unavailable', currentDatabase(), local_01099_b, rand());
-- Stop background sends for the destination Distributed table before inserting into it.
SYSTEM STOP DISTRIBUTED SENDS distributed_01099_b;
SET prefer_localhost_replica=0; -- to require distributed send for local replica too
-- Seed the source local table with three rows, then copy through the Distributed tables.
INSERT INTO local_01099_a SELECT number from system.numbers limit 3;
INSERT INTO distributed_01099_b SELECT * from distributed_01099_a;
SET prefer_localhost_replica=1;
-- distributed sends disabled, but they are not required, since insert is done into local table.
-- (since parallel_distributed_insert_select=2)
-- The expected output for this section lists rows 0, 1, 2 with count 1 for both queries below.
SELECT 'distributed';
SELECT number, count(number) FROM distributed_01099_b group by number order by number;
SELECT 'local';
SELECT number, count(number) FROM local_01099_b group by number order by number;
DROP TABLE local_01099_a;
DROP TABLE local_01099_b;
-- NOTE(review): unlike the storageCluster scenario below, these Distributed drops are not
-- wrapped in send_logs_level changes - confirm that is intentional.
DROP TABLE distributed_01099_a;
DROP TABLE distributed_01099_b;
--- test_cluster_1_shard_3_replicas_1_unavailable with storageCluster
-- Scenario: same destination setup, but the SELECT side is the urlCluster table function
-- instead of a Distributed table.
SELECT 'test_cluster_1_shard_3_replicas_1_unavailable with storageCluster';
CREATE TABLE local_01099_b (number UInt64) ENGINE = MergeTree() ORDER BY number;
CREATE TABLE distributed_01099_b AS local_01099_b ENGINE = Distributed('test_cluster_1_shard_3_replicas_1_unavailable', currentDatabase(), local_01099_b, rand());
SYSTEM STOP DISTRIBUTED SENDS distributed_01099_b;
SET prefer_localhost_replica=0; -- to require distributed send for local replica too
-- Log level is changed to 'error' for the INSERT only and restored to 'warning' right after.
SET send_logs_level='error';
-- NOTE(review): the source is read over HTTP from localhost:8123; the '{1,2,3}' part of the URL
-- presumably expands into three separate queries (select 1 / 2 / 3) - confirm.
INSERT INTO distributed_01099_b SELECT * FROM urlCluster('test_cluster_two_shards', 'http://localhost:8123/?query=select+{1,2,3}+format+TSV', 'TSV', 's String');
SET send_logs_level='warning';
SET prefer_localhost_replica=1;
-- distributed sends disabled, but they are not required, since insert is done into local table.
-- (since parallel_distributed_insert_select=2)
-- The expected output for this section lists rows 1, 2, 3 with count 1 for both queries below.
SELECT 'distributed';
SELECT number, count(number) FROM distributed_01099_b group by number order by number;
SELECT 'local';
SELECT number, count(number) FROM local_01099_b group by number order by number;
DROP TABLE local_01099_b;
-- NOTE(review): log level is lowered to 'fatal' only around the Distributed drop, presumably to
-- keep server log messages out of the compared test output - confirm.
SET send_logs_level='fatal';
DROP TABLE distributed_01099_b;
SET send_logs_level='warning';