fix skip_unavailable_shards does not work using hdfsCluster table function

This commit is contained in:
xiedeyantu 2022-11-15 13:25:15 +08:00
parent 6e01b2b2a1
commit ec6698395e
3 changed files with 65 additions and 24 deletions

View File

@ -99,32 +99,24 @@ Pipe StorageHDFSCluster::read(
addColumnsStructureToQueryWithClusterEngine(
query_to_send, StorageDictionary::generateNamesAndTypesDescription(storage_snapshot->metadata->getColumns().getAll()), 3, getName());
for (const auto & replicas : cluster->getShardsAddresses())
const auto & current_settings = context->getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
for (const auto & shard_info : cluster->getShardsInfo())
{
/// There will be only one replica, because we consider each replica as a shard
for (const auto & node : replicas)
auto try_results = shard_info.pool->getMany(timeouts, &current_settings, PoolMode::GET_MANY);
for (auto & try_result : try_results)
{
auto connection = std::make_shared<Connection>(
node.host_name, node.port, context->getGlobalContext()->getCurrentDatabase(),
node.user, node.password, node.quota_key, node.cluster, node.cluster_secret,
"HDFSClusterInititiator",
node.compression,
node.secure
);
/// For unknown reason global context is passed to IStorage::read() method
/// So, task_identifier is passed as constructor argument. It is more obvious.
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
connection,
queryToString(query_to_send),
header,
context,
/*throttler=*/nullptr,
scalars,
Tables(),
processed_stage,
RemoteQueryExecutor::Extension{.task_iterator = callback});
shard_info.pool,
std::vector<IConnectionPool::Entry>{try_result},
queryToString(query_to_send),
header,
context,
/*throttler=*/nullptr,
scalars,
Tables(),
processed_stage,
RemoteQueryExecutor::Extension{.task_iterator = callback});
pipes.emplace_back(std::make_shared<RemoteSource>(remote_query_executor, add_agg_info, false));
}

View File

@ -0,0 +1,18 @@
<clickhouse>
<remote_servers>
<cluster_non_existent_port>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node1</host>
<port>19000</port>
</replica>
</shard>
</cluster_non_existent_port>
</remote_servers>
</clickhouse>

View File

@ -9,7 +9,7 @@ from pyhdfs import HdfsClient
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1",
main_configs=["configs/macro.xml", "configs/schema_cache.xml"],
main_configs=["configs/macro.xml", "configs/schema_cache.xml", "configs/cluster.xml"],
with_hdfs=True,
)
@ -783,6 +783,37 @@ def test_schema_inference_cache(started_cluster):
check_cache_misses(node1, files, 4)
def test_test_hdfsCluster_skip_unavailable_shards(started_cluster):
node = started_cluster.instances["node1"]
result = node.query(
"""
SELECT count(*) FROM hdfsCluster(
'cluster_non_existent_port',
'hdfs://hdfs1:9000/test_hdfsCluster/file*',
'TSV',
'id UInt32')
SETTINGS skip_unavailable_shards = 1
"""
)
assert result == "3\n"
def test_test_hdfsCluster_unskip_unavailable_shards(started_cluster):
node = started_cluster.instances["node1"]
error = node.query_and_get_error(
"""
SELECT count(*) FROM hdfsCluster(
'cluster_non_existent_port',
'hdfs://hdfs1:9000/test_hdfsCluster/file*',
'TSV',
'id UInt32')
"""
)
assert "NETWORK_ERROR" in error
if __name__ == "__main__":
cluster.start()
input("Cluster created, press any key to destroy...")