Fix for HDFS and Azure

avogar 2024-04-08 20:37:06 +00:00
parent 78dd23fd83
commit 8accf395fb
5 changed files with 44 additions and 7 deletions


@@ -1065,6 +1065,10 @@ SinkToStoragePtr StorageHDFS::write(const ASTPtr & query, const StorageMetadataP
     if (is_partitioned_implementation)
     {
+        String path = current_uri.substr(current_uri.find('/', current_uri.find("//") + 2));
+        if (PartitionedSink::replaceWildcards(path, "").find_first_of("*?{") != std::string::npos)
+            throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "URI '{}' contains globs, so the table is in readonly mode", uris.back());
         return std::make_shared<PartitionedHDFSSink>(
             partition_by_ast,
             current_uri,

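What the new HDFS check does, as a minimal standalone Python sketch (hypothetical helper name, not ClickHouse code): cut the path out of the write URI after the authority, substitute the {_partition_id} wildcard the way PartitionedSink::replaceWildcards does, and only then look for glob characters.

def hdfs_path_has_globs(uri: str) -> bool:
    # "hdfs://hdfs1:9000/data_*_{_partition_id}.csv" -> "/data_*_{_partition_id}.csv"
    path = uri[uri.find("/", uri.find("//") + 2):]
    # stands in for PartitionedSink::replaceWildcards(path, "")
    stripped = path.replace("{_partition_id}", "")
    return any(c in stripped for c in "*?{")

assert hdfs_path_has_globs("hdfs://hdfs1:9000/data_*_{_partition_id}.csv")
assert not hdfs_path_has_globs("hdfs://hdfs1:9000/data_{_partition_id}.csv")

A URI whose only wildcard is {_partition_id} therefore still writes normally; anything with *, ? or { left over after the replacement is rejected with DATABASE_ACCESS_DENIED.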

@@ -462,6 +462,13 @@ Poco::URI StorageAzureBlob::Configuration::getConnectionURL() const
     return Poco::URI(parsed_connection_string.BlobServiceUrl.GetAbsoluteUrl());
 }
+bool StorageAzureBlob::Configuration::withGlobsIgnorePartitionWildcard() const
+{
+    if (!withPartitionWildcard())
+        return withGlobs();
+    return PartitionedSink::replaceWildcards(getPath(), "").find_first_of("*?{") != std::string::npos;
+}
 StorageAzureBlob::StorageAzureBlob(
     const Configuration & configuration_,
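
An illustrative Python model of this helper (not the C++ source): without a partition wildcard it degenerates to withGlobs(), and with one it strips {_partition_id} before looking for glob characters.

PARTITION_ID_WILDCARD = "{_partition_id}"

def with_globs_ignore_partition_wildcard(path: str) -> bool:
    if PARTITION_ID_WILDCARD not in path:  # no partition wildcard: same as withGlobs()
        return any(c in path for c in "*?{")
    stripped = path.replace(PARTITION_ID_WILDCARD, "")
    return any(c in stripped for c in "*?{")

assert with_globs_ignore_partition_wildcard("test_data_*")                    # plain glob
assert with_globs_ignore_partition_wildcard("test_data_*_{_partition_id}")    # glob next to the partition wildcard
assert not with_globs_ignore_partition_wildcard("test_data_{_partition_id}")  # partition wildcard only
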
@@ -749,7 +756,7 @@ void StorageAzureBlob::read(
     size_t max_block_size,
     size_t num_streams)
 {
-    if (partition_by && configuration.withWildcard())
+    if (partition_by && configuration.withPartitionWildcard())
         throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Reading from a partitioned Azure storage is not implemented yet");
     auto this_ptr = std::static_pointer_cast<StorageAzureBlob>(shared_from_this());
@@ -836,12 +843,16 @@ void ReadFromAzureBlob::initializePipeline(QueryPipelineBuilder & pipeline, cons
 SinkToStoragePtr StorageAzureBlob::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /*async_insert*/)
 {
+    if (configuration.withGlobsIgnorePartitionWildcard())
+        throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED,
+                        "AzureBlobStorage key '{}' contains globs, so the table is in readonly mode", configuration.blob_path);
     auto sample_block = metadata_snapshot->getSampleBlock();
     auto chosen_compression_method = chooseCompressionMethod(configuration.blobs_paths.back(), configuration.compression_method);
     auto insert_query = std::dynamic_pointer_cast<ASTInsertQuery>(query);
     auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr;
-    bool is_partitioned_implementation = partition_by_ast && configuration.withWildcard();
+    bool is_partitioned_implementation = partition_by_ast && configuration.withPartitionWildcard();
     if (is_partitioned_implementation)
     {
@@ -857,10 +868,6 @@ SinkToStoragePtr StorageAzureBlob::write(const ASTPtr & query, const StorageMeta
     }
     else
     {
-        if (configuration.withGlobs())
-            throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED,
-                "AzureBlobStorage key '{}' contains globs, so the table is in readonly mode", configuration.blob_path);
         bool truncate_in_insert = local_context->getSettingsRef().azure_truncate_on_insert;
         if (!truncate_in_insert && object_storage->exists(StoredObject(configuration.blob_path)))

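Taken together, the write() changes move the glob check in front of the partitioned/non-partitioned split, so both branches are covered and the old withGlobs() check inside the else branch becomes redundant. A rough, self-contained Python outline of the resulting decision (not the C++ source; the sink names are paraphrased):

def plan_azure_write(blob_path: str, has_partition_by_ast: bool) -> str:
    partition_wildcard = "{_partition_id}"
    # withGlobsIgnorePartitionWildcard(): any glob other than the partition wildcard is fatal
    if any(c in blob_path.replace(partition_wildcard, "") for c in "*?{"):
        raise PermissionError("DATABASE_ACCESS_DENIED: key contains globs, table is in readonly mode")
    # withPartitionWildcard(): PARTITION BY plus {_partition_id} selects the partitioned sink
    if has_partition_by_ast and partition_wildcard in blob_path:
        return "partitioned sink (one blob per partition id)"
    return "plain single-blob sink"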

@@ -37,12 +37,14 @@ public:
         bool withGlobs() const { return blob_path.find_first_of("*?{") != std::string::npos; }
-        bool withWildcard() const
+        bool withPartitionWildcard() const
         {
             static const String PARTITION_ID_WILDCARD = "{_partition_id}";
             return blobs_paths.back().find(PARTITION_ID_WILDCARD) != String::npos;
         }
+        bool withGlobsIgnorePartitionWildcard() const;
         Poco::URI getConnectionURL() const;
         std::string connection_url;


@@ -1323,6 +1323,20 @@ def test_format_detection(cluster):
     assert result == expected_result
+def test_write_to_globbed_partitioned_path(cluster):
+    node = cluster.instances["node"]
+    storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
+    account_name = "devstoreaccount1"
+    account_key = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
+    error = azure_query(
+        node,
+        f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_data_*_{{_partition_id}}', '{account_name}', '{account_key}', 'CSV', 'auto', 'x UInt64') partition by 42 select 42",
+        expect_error="true",
+    )
+    assert "DATABASE_ACCESS_DENIED" in error
 def test_parallel_read(cluster):
     node = cluster.instances["node"]
     connection_string = cluster.env_variables["AZURITE_CONNECTION_STRING"]

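For contrast (an expectation based on the new check, not a test in this commit): the same INSERT with a key of 'test_data_{_partition_id}' and no '*' should still be accepted, since only the partition wildcard remains after replacement and the write goes through the partitioned sink.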

@@ -1116,6 +1116,16 @@ def test_format_detection(started_cluster):
     assert expected_result == result
+def test_write_to_globbed_partitioned_path(started_cluster):
+    node = started_cluster.instances["node1"]
+    error = node.query_and_get_error(
+        "insert into function hdfs('hdfs://hdfs1:9000/test_data_*_{_partition_id}.csv') partition by 42 select 42"
+    )
+    assert "DATABASE_ACCESS_DENIED" in error
 if __name__ == "__main__":
     cluster.start()
     input("Cluster created, press any key to destroy...")