From 5504f3af9b7624beadce5c4a97b099066a10dfcb Mon Sep 17 00:00:00 2001 From: xiedeyantu Date: Sat, 12 Nov 2022 00:03:36 +0800 Subject: [PATCH] fix skip_unavailable_shards does not work using s3Cluster table function --- src/Storages/StorageS3Cluster.cpp | 38 ++++++++----------- .../test_s3_cluster/configs/cluster.xml | 15 ++++++++ tests/integration/test_s3_cluster/test.py | 29 ++++++++++++++ 3 files changed, 59 insertions(+), 23 deletions(-) diff --git a/src/Storages/StorageS3Cluster.cpp b/src/Storages/StorageS3Cluster.cpp index 3b8c8b1cb92..350e942f972 100644 --- a/src/Storages/StorageS3Cluster.cpp +++ b/src/Storages/StorageS3Cluster.cpp @@ -117,32 +117,24 @@ Pipe StorageS3Cluster::read( addColumnsStructureToQueryWithClusterEngine( query_to_send, StorageDictionary::generateNamesAndTypesDescription(storage_snapshot->metadata->getColumns().getAll()), 5, getName()); - for (const auto & replicas : cluster->getShardsAddresses()) + const auto & current_settings = context->getSettingsRef(); + auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings); + for (const auto & shard_info : cluster->getShardsInfo()) { - /// There will be only one replica, because we consider each replica as a shard - for (const auto & node : replicas) + auto try_results = shard_info.pool->getMany(timeouts, &current_settings, PoolMode::GET_MANY); + for (auto & try_result : try_results) { - auto connection = std::make_shared<Connection>( - node.host_name, node.port, context->getGlobalContext()->getCurrentDatabase(), - node.user, node.password, node.quota_key, node.cluster, node.cluster_secret, - "S3ClusterInititiator", - node.compression, - node.secure - ); - - - /// For unknown reason global context is passed to IStorage::read() method /// So, task_identifier is passed as constructor argument. It is more obvious. 
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>( - connection, - queryToString(query_to_send), - header, - context, - /*throttler=*/nullptr, - scalars, - Tables(), - processed_stage, - RemoteQueryExecutor::Extension{.task_iterator = callback}); + shard_info.pool, + std::vector<IConnectionPool::Entry>{try_result}, + queryToString(query_to_send), + header, + context, + /*throttler=*/nullptr, + scalars, + Tables(), + processed_stage, + RemoteQueryExecutor::Extension{.task_iterator = callback}); pipes.emplace_back(std::make_shared<RemoteSource>(remote_query_executor, add_agg_info, false)); } diff --git a/tests/integration/test_s3_cluster/configs/cluster.xml b/tests/integration/test_s3_cluster/configs/cluster.xml index 18f15763633..3059340cfe4 100644 --- a/tests/integration/test_s3_cluster/configs/cluster.xml +++ b/tests/integration/test_s3_cluster/configs/cluster.xml @@ -20,6 +20,21 @@ + <cluster_non_existent_port> + <shard> + <replica> + <host>s0_0_0</host> + <port>9000</port> + </replica> + </shard> + <shard> + <replica> + <host>s0_0_0</host> + <port>19000</port> + </replica> + </shard> + </cluster_non_existent_port> cluster_simple diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index 2cbb36fcf06..8e082f7d86a 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -195,3 +195,32 @@ def test_ambiguous_join(started_cluster): """ ) assert "AMBIGUOUS_COLUMN_NAME" not in result + + +def test_skip_unavailable_shards(started_cluster): + node = started_cluster.instances["s0_0_0"] + result = node.query( + """ + SELECT count(*) from s3Cluster( + 'cluster_non_existent_port', + 'http://minio1:9001/root/data/clickhouse/part1.csv', + 'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') + SETTINGS skip_unavailable_shards = 1 + """ + ) + + assert result == "10\n" + + +def test_unskip_unavailable_shards(started_cluster): + node = started_cluster.instances["s0_0_0"] + error = node.query_and_get_error( + """ + SELECT count(*) from s3Cluster( + 'cluster_non_existent_port', + 'http://minio1:9001/root/data/clickhouse/part1.csv', + 'minio', 'minio123', 'CSV', 
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') + """ + ) + + assert "NETWORK_ERROR" in error