Fix: the skip_unavailable_shards setting does not take effect when using the s3Cluster table function

This commit is contained in:
xiedeyantu 2022-11-12 00:03:36 +08:00
parent c003c90341
commit 5504f3af9b
3 changed files with 59 additions and 23 deletions

View File

@ -117,32 +117,24 @@ Pipe StorageS3Cluster::read(
addColumnsStructureToQueryWithClusterEngine(
query_to_send, StorageDictionary::generateNamesAndTypesDescription(storage_snapshot->metadata->getColumns().getAll()), 5, getName());
for (const auto & replicas : cluster->getShardsAddresses())
const auto & current_settings = context->getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
for (const auto & shard_info : cluster->getShardsInfo())
{
/// There will be only one replica, because we consider each replica as a shard
for (const auto & node : replicas)
auto try_results = shard_info.pool->getMany(timeouts, &current_settings, PoolMode::GET_MANY);
for (auto & try_result : try_results)
{
auto connection = std::make_shared<Connection>(
node.host_name, node.port, context->getGlobalContext()->getCurrentDatabase(),
node.user, node.password, node.quota_key, node.cluster, node.cluster_secret,
"S3ClusterInititiator",
node.compression,
node.secure
);
/// For unknown reason global context is passed to IStorage::read() method
/// So, task_identifier is passed as constructor argument. It is more obvious.
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
connection,
queryToString(query_to_send),
header,
context,
/*throttler=*/nullptr,
scalars,
Tables(),
processed_stage,
RemoteQueryExecutor::Extension{.task_iterator = callback});
shard_info.pool,
std::vector<IConnectionPool::Entry>{try_result},
queryToString(query_to_send),
header,
context,
/*throttler=*/nullptr,
scalars,
Tables(),
processed_stage,
RemoteQueryExecutor::Extension{.task_iterator = callback});
pipes.emplace_back(std::make_shared<RemoteSource>(remote_query_executor, add_agg_info, false));
}

View File

@ -20,6 +20,21 @@
</shard>
</cluster_simple>
<cluster_non_existent_port>
<shard>
<replica>
<host>s0_0_0</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>s0_0_0</host>
<port>19000</port>
</replica>
</shard>
</cluster_non_existent_port>
</remote_servers>
<macros>
<default_cluster_macro>cluster_simple</default_cluster_macro>

View File

@ -195,3 +195,32 @@ def test_ambiguous_join(started_cluster):
"""
)
assert "AMBIGUOUS_COLUMN_NAME" not in result
def test_skip_unavailable_shards(started_cluster):
    # The 'cluster_non_existent_port' cluster contains one live shard and one
    # shard on an unreachable port. With skip_unavailable_shards = 1 the dead
    # shard must be silently ignored and the query should still succeed,
    # returning the row count read from the live shard only.
    instance = started_cluster.instances["s0_0_0"]
    query_result = instance.query(
        """
        SELECT count(*) from s3Cluster(
            'cluster_non_existent_port',
            'http://minio1:9001/root/data/clickhouse/part1.csv',
            'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
        SETTINGS skip_unavailable_shards = 1
        """
    )
    assert query_result == "10\n"
def test_unskip_unavailable_shards(started_cluster):
    # Counterpart of test_skip_unavailable_shards: without the
    # skip_unavailable_shards setting, hitting the shard on the unreachable
    # port must make the whole query fail with a NETWORK_ERROR.
    instance = started_cluster.instances["s0_0_0"]
    error_text = instance.query_and_get_error(
        """
        SELECT count(*) from s3Cluster(
            'cluster_non_existent_port',
            'http://minio1:9001/root/data/clickhouse/part1.csv',
            'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
        """
    )
    assert "NETWORK_ERROR" in error_text