Merge pull request #46765 from zk-kiger/improve_cluster_table_funcion_skip_unavailable_shards

add skip_unavailable_shards setting for table function cluster
This commit is contained in:
Alexey Milovidov 2023-04-27 14:54:18 +03:00 committed by GitHub
commit aac037f51c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 30 additions and 12 deletions

View File

@ -102,7 +102,8 @@ Pipe StorageHDFSCluster::read(
addColumnsStructureToQueryWithClusterEngine(
query_to_send, StorageDictionary::generateNamesAndTypesDescription(storage_snapshot->metadata->getColumns().getAll()), 3, getName());
const auto & current_settings = context->getSettingsRef();
auto new_context = IStorageCluster::updateSettingsForTableFunctionCluster(context, context->getSettingsRef());
const auto & current_settings = new_context->getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
for (const auto & shard_info : cluster->getShardsInfo())
{
@ -113,7 +114,7 @@ Pipe StorageHDFSCluster::read(
std::vector<IConnectionPool::Entry>{try_result},
queryToString(query_to_send),
header,
context,
new_context,
/*throttler=*/nullptr,
scalars,
Tables(),

View File

@ -23,6 +23,18 @@ public:
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(ASTPtr query, ContextPtr context) const = 0;
bool isRemote() const override { return true; }
/// Return a copy of `context` whose settings force skip_unavailable_shards=1.
/// Cluster table functions (s3Cluster, hdfsCluster, ...) must always tolerate
/// unreachable shards, regardless of what the caller's settings say.
static ContextPtr updateSettingsForTableFunctionCluster(ContextPtr context, const Settings & settings)
{
    /// Start from the caller's settings and override only the one flag.
    Settings adjusted_settings = settings;
    adjusted_settings.skip_unavailable_shards = true;

    auto context_copy = Context::createCopy(context);
    context_copy->setSettings(adjusted_settings);
    return context_copy;
}
};

View File

@ -140,7 +140,8 @@ Pipe StorageS3Cluster::read(
/* only_replace_in_join_= */true);
visitor.visit(query_to_send);
const auto & current_settings = context->getSettingsRef();
auto new_context = IStorageCluster::updateSettingsForTableFunctionCluster(context, context->getSettingsRef());
const auto & current_settings = new_context->getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
for (const auto & shard_info : cluster->getShardsInfo())
{
@ -151,7 +152,7 @@ Pipe StorageS3Cluster::read(
std::vector<IConnectionPool::Entry>{try_result},
queryToString(query_to_send),
sample_block,
context,
new_context,
/*throttler=*/nullptr,
scalars,
Tables(),

View File

@ -247,9 +247,10 @@ def test_skip_unavailable_shards(started_cluster):
assert result == "10\n"
def test_unskip_unavailable_shards(started_cluster):
def test_unset_skip_unavailable_shards(started_cluster):
# Although skip_unavailable_shards is not set, cluster table functions should always skip unavailable shards.
node = started_cluster.instances["s0_0_0"]
error = node.query_and_get_error(
result = node.query(
"""
SELECT count(*) from s3Cluster(
'cluster_non_existent_port',
@ -258,7 +259,7 @@ def test_unskip_unavailable_shards(started_cluster):
"""
)
assert "NETWORK_ERROR" in error
assert result == "10\n"
def test_distributed_insert_select_with_replicated(started_cluster):

View File

@ -788,6 +788,7 @@ def test_schema_inference_cache(started_cluster):
def test_hdfsCluster_skip_unavailable_shards(started_cluster):
# Although skip_unavailable_shards is not set, cluster table functions should always skip unavailable shards.
hdfs_api = started_cluster.hdfs_api
node = started_cluster.instances["node1"]
data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
@ -801,16 +802,18 @@ def test_hdfsCluster_skip_unavailable_shards(started_cluster):
)
def test_hdfsCluster_unset_skip_unavailable_shards(started_cluster):
    # Although skip_unavailable_shards is not set, cluster table functions
    # should always skip unavailable shards, so the query against
    # 'cluster_non_existent_port' (which contains an unreachable shard)
    # must succeed and return the data from the reachable shard.
    hdfs_api = started_cluster.hdfs_api
    node = started_cluster.instances["node1"]
    data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
    hdfs_api.write_data("/unskip_unavailable_shards", data)

    # NOTE(review): read back the same path the data was written to
    # ('/unskip_unavailable_shards'), and use the local variable `node`
    # (`node1` is not defined in this function).
    assert (
        node.query(
            "select * from hdfsCluster('cluster_non_existent_port', 'hdfs://hdfs1:9000/unskip_unavailable_shards', 'TSV', 'id UInt64, text String, number Float64')"
        )
        == data
    )
if __name__ == "__main__":