Respect settings truncate_on_insert/create_new_file_on_insert in s3/hdfs/azure engines

avogar committed on 2024-04-08 20:18:47 +00:00
parent ce4ab06d03
commit ef518946f3
6 changed files with 244 additions and 84 deletions
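For context, these are the semantics the settings enforce when the target object already exists, sketched as ClickHouse SQL against the s3 table function; the endpoint, bucket and file name are placeholders, and the commit below makes partitioned writes (PARTITION BY) in the s3/hdfs/azure engines follow the same rules, as exercised by the new integration tests at the end of the diff:

-- Assume bucket/data.csv already exists from an earlier INSERT.

-- Overwrite the existing object in place:
INSERT INTO TABLE FUNCTION s3('http://minio:9000/bucket/data.csv', 'CSV', 'x UInt64')
SELECT 42 SETTINGS s3_truncate_on_insert = 1;

-- With both settings disabled, the INSERT is rejected with BAD_ARGUMENTS:
INSERT INTO TABLE FUNCTION s3('http://minio:9000/bucket/data.csv', 'CSV', 'x UInt64')
SELECT 43 SETTINGS s3_truncate_on_insert = 0, s3_create_new_file_on_insert = 0;

-- Write a new object data.1.csv (the numeric suffix goes before the extension) instead:
INSERT INTO TABLE FUNCTION s3('http://minio:9000/bucket/data.csv', 'CSV', 'x UInt64')
SELECT 44 SETTINGS s3_truncate_on_insert = 0, s3_create_new_file_on_insert = 1;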

View File

@@ -871,6 +871,40 @@ private:
bool cancelled = false;
};
namespace
{
std::optional<String> checkFileExistsAndCreateNewKeyIfNeeded(const ContextPtr & context, const String & uri, size_t sequence_number)
{
const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(uri);
HDFSBuilderWrapper builder = createHDFSBuilder(uri_without_path + "/", context->getGlobalContext()->getConfigRef());
HDFSFSPtr fs = createHDFSFS(builder.get());
if (context->getSettingsRef().hdfs_truncate_on_insert || hdfsExists(fs.get(), path_from_uri.c_str()))
return std::nullopt;
if (context->getSettingsRef().hdfs_create_new_file_on_insert)
{
auto pos = uri.find_first_of('.', uri.find_last_of('/'));
String new_uri;
do
{
new_uri = uri.substr(0, pos) + "." + std::to_string(sequence_number) + (pos == std::string::npos ? "" : uri.substr(pos));
++sequence_number;
}
while (!hdfsExists(fs.get(), new_uri.c_str()));
return new_uri;
}
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"File with path {} already exists. If you want to overwrite it, enable setting hdfs_truncate_on_insert, "
"if you want to create new file on each insert, enable setting hdfs_create_new_file_on_insert",
path_from_uri);
}
}
class PartitionedHDFSSink : public PartitionedSink
{
public:
@@ -894,6 +928,8 @@ public:
{
auto path = PartitionedSink::replaceWildcards(uri, partition_id);
PartitionedSink::validatePartitionKey(path, true);
if (auto new_path = checkFileExistsAndCreateNewKeyIfNeeded(context, path, 1))
path = *new_path;
return std::make_shared<HDFSSink>(path, format, sample_block, context, compression_method);
}
@@ -1078,34 +1114,10 @@ SinkToStoragePtr StorageHDFS::write(const ASTPtr & query, const StorageMetadataP
if (is_path_with_globs)
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "URI '{}' contains globs, so the table is in readonly mode", uris.back());
const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(current_uri);
HDFSBuilderWrapper builder = createHDFSBuilder(uri_without_path + "/", context_->getGlobalContext()->getConfigRef());
HDFSFSPtr fs = createHDFSFS(builder.get());
bool truncate_on_insert = context_->getSettingsRef().hdfs_truncate_on_insert;
if (!truncate_on_insert && !hdfsExists(fs.get(), path_from_uri.c_str()))
if (auto new_uri = checkFileExistsAndCreateNewKeyIfNeeded(context_, current_uri, uris.size()))
{
if (context_->getSettingsRef().hdfs_create_new_file_on_insert)
{
auto pos = uris[0].find_first_of('.', uris[0].find_last_of('/'));
size_t index = uris.size();
String new_uri;
do
{
new_uri = uris[0].substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : uris[0].substr(pos));
++index;
}
while (!hdfsExists(fs.get(), new_uri.c_str()));
uris.push_back(new_uri);
current_uri = new_uri;
}
else
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"File with path {} already exists. If you want to overwrite it, enable setting hdfs_truncate_on_insert, "
"if you want to create new file on each insert, enable setting hdfs_create_new_file_on_insert",
path_from_uri);
uris.push_back(*new_uri);
current_uri = *new_uri;
}
return std::make_shared<HDFSSink>(current_uri,

View File

@@ -633,6 +633,36 @@ private:
std::mutex cancel_mutex;
};
namespace
{
std::optional<String> checkFileExistsAndCreateNewKeyIfNeeded(const ContextPtr & context, AzureObjectStorage * object_storage, const String & path, size_t sequence_number)
{
if (context->getSettingsRef().azure_truncate_on_insert || !object_storage->exists(StoredObject(path)))
return std::nullopt;
if (context->getSettingsRef().azure_create_new_file_on_insert)
{
auto pos = path.find_first_of('.');
String new_path;
do
{
new_path = path.substr(0, pos) + "." + std::to_string(sequence_number) + (pos == std::string::npos ? "" : path.substr(pos));
++sequence_number;
}
while (object_storage->exists(StoredObject(new_path)));
return new_path;
}
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Object with key {} already exists. "
"If you want to overwrite it, enable setting azure_truncate_on_insert, if you "
"want to create a new file on each insert, enable setting azure_create_new_file_on_insert",
path);
}
}
class PartitionedStorageAzureBlobSink : public PartitionedSink, WithContext
{
public:
@@ -659,6 +689,8 @@ public:
{
auto partition_key = replaceWildcards(blob, partition_id);
validateKey(partition_key);
if (auto new_path = checkFileExistsAndCreateNewKeyIfNeeded(getContext(), object_storage, partition_key, 1))
partition_key = *new_path;
return std::make_shared<StorageAzureBlobSink>(
format,
@@ -861,37 +893,8 @@ SinkToStoragePtr StorageAzureBlob::write(const ASTPtr & query, const StorageMeta
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED,
"AzureBlobStorage key '{}' contains globs, so the table is in readonly mode", configuration.blob_path);
bool truncate_in_insert = local_context->getSettingsRef().azure_truncate_on_insert;
if (!truncate_in_insert && object_storage->exists(StoredObject(configuration.blob_path)))
{
if (local_context->getSettingsRef().azure_create_new_file_on_insert)
{
size_t index = configuration.blobs_paths.size();
const auto & first_key = configuration.blobs_paths[0];
auto pos = first_key.find_first_of('.');
String new_key;
do
{
new_key = first_key.substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : first_key.substr(pos));
++index;
}
while (object_storage->exists(StoredObject(new_key)));
configuration.blobs_paths.push_back(new_key);
}
else
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Object in bucket {} with key {} already exists. "
"If you want to overwrite it, enable setting azure_truncate_on_insert, if you "
"want to create a new file on each insert, enable setting azure_create_new_file_on_insert",
configuration.container, configuration.blobs_paths.back());
}
}
if (auto new_path = checkFileExistsAndCreateNewKeyIfNeeded(local_context, object_storage.get(), configuration.blobs_paths.back(), configuration.blobs_paths.size()))
configuration.blobs_paths.push_back(*new_path);
return std::make_shared<StorageAzureBlobSink>(
configuration.format,

View File

@@ -955,6 +955,36 @@ private:
std::mutex cancel_mutex;
};
namespace
{
std::optional<String> checkFileExistsAndCreateNewKeyIfNeeded(const ContextPtr & context, const StorageS3::Configuration & configuration, const String & key, size_t sequence_number)
{
if (context->getSettingsRef().s3_truncate_on_insert || !S3::objectExists(*configuration.client, configuration.url.bucket, key, configuration.url.version_id, configuration.request_settings))
return std::nullopt;
if (context->getSettingsRef().s3_create_new_file_on_insert)
{
auto pos = key.find_first_of('.');
String new_key;
do
{
new_key = key.substr(0, pos) + "." + std::to_string(sequence_number) + (pos == std::string::npos ? "" : key.substr(pos));
++sequence_number;
}
while (S3::objectExists(*configuration.client, configuration.url.bucket, new_key, configuration.url.version_id, configuration.request_settings));
return new_key;
}
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Object in bucket {} with key {} already exists. "
"If you want to overwrite it, enable setting s3_truncate_on_insert, if you "
"want to create a new file on each insert, enable setting s3_create_new_file_on_insert",
configuration.url.bucket, key);
}
}
class PartitionedStorageS3Sink : public PartitionedSink, WithContext
{
@@ -988,6 +1018,9 @@ public:
auto partition_key = replaceWildcards(key, partition_id);
validateKey(partition_key);
if (auto new_key = checkFileExistsAndCreateNewKeyIfNeeded(getContext(), configuration, partition_key, 1))
partition_key = *new_key;
return std::make_shared<StorageS3Sink>(
format,
sample_block,
@@ -1274,36 +1307,16 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
if (query_configuration.withGlobs())
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED,
"S3 key '{}' contains globs, so the table is in readonly mode", query_configuration.url.key);
bool truncate_in_insert = local_context->getSettingsRef().s3_truncate_on_insert;
if (!truncate_in_insert && S3::objectExists(*query_configuration.client, query_configuration.url.bucket, query_configuration.keys.back(), query_configuration.url.version_id, query_configuration.request_settings))
if (auto new_key = checkFileExistsAndCreateNewKeyIfNeeded(local_context, configuration, query_configuration.keys.back(), query_configuration.keys.size()))
{
if (local_context->getSettingsRef().s3_create_new_file_on_insert)
{
size_t index = query_configuration.keys.size();
const auto & first_key = query_configuration.keys[0];
auto pos = first_key.find_first_of('.');
String new_key;
do
{
new_key = first_key.substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : first_key.substr(pos));
++index;
}
while (S3::objectExists(*query_configuration.client, query_configuration.url.bucket, new_key, query_configuration.url.version_id, query_configuration.request_settings));
query_configuration.keys.push_back(new_key);
configuration.keys.push_back(new_key);
}
else
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Object in bucket {} with key {} already exists. "
"If you want to overwrite it, enable setting s3_truncate_on_insert, if you "
"want to create a new file on each insert, enable setting s3_create_new_file_on_insert",
query_configuration.url.bucket, query_configuration.keys.back());
}
query_configuration.keys.push_back(*new_key);
configuration.keys.push_back(*new_key);
}
return std::make_shared<StorageS3Sink>(

View File

@@ -1343,3 +1343,54 @@ def test_parallel_read(cluster):
)
assert int(res) == 10000
assert_logs_contain_with_retry(node, "AzureBlobStorage readBigAt read bytes")
def test_respect_object_existence_on_partitioned_write(cluster):
node = cluster.instances["node"]
storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
account_name = "devstoreaccount1"
account_key = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_partitioned_write42.csv', '{account_name}', '{account_key}') select 42 settings azure_truncate_on_insert=1",
)
result = azure_query(
node,
f"select * from azureBlobStorage('{storage_account_url}', 'cont', 'test_partitioned_write42.csv', '{account_name}', '{account_key}')",
)
assert int(result) == 42
error = azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_partitioned_write{{_partition_id}}.csv', '{account_name}', '{account_key}') partition by 42 select 42 settings azure_truncate_on_insert=0",
expect_error="true",
)
assert "BAD_ARGUMENTS" in error
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_partitioned_write{{_partition_id}}.csv', '{account_name}', '{account_key}') partition by 42 select 43 settings azure_truncate_on_insert=1",
)
result = azure_query(
node,
f"select * from azureBlobStorage('{storage_account_url}', 'cont', 'test_partitioned_write42.csv', '{account_name}', '{account_key}')",
)
assert int(result) == 43
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_partitioned_write{{_partition_id}}.csv', '{account_name}', '{account_key}') partition by 42 select 44 settings azure_truncate_on_insert=0, azure_create_new_file_on_insert=1",
)
result = azure_query(
node,
f"select * from azureBlobStorage('{storage_account_url}', 'cont', 'test_partitioned_write42.1.csv', '{account_name}', '{account_key}')",
)
assert int(result) == 44

View File

@@ -1116,6 +1116,46 @@ def test_format_detection(started_cluster):
assert expected_result == result
def test_respect_object_existence_on_partitioned_write(started_cluster):
node = started_cluster.instances["node1"]
node.query(
"insert into function hdfs('hdfs://hdfs1:9000/test_partitioned_write42.csv', CSV) select 42 settings hdfs_truncate_on_insert=1"
)
result = node.query(
f"select * from hdfs('hdfs://hdfs1:9000/test_partitioned_write42.csv', CSV)"
)
assert int(result) == 42
error = node.query_and_get_error(
f"insert into table function hdfs('hdfs://hdfs1:9000/test_partitioned_write{{_partition_id}}.csv', CSV) partition by 42 select 42 settings hdfs_truncate_on_insert=0"
)
assert "BAD_ARGUMENTS" in error
node.query(
f"insert into table function hdfs('hdfs://hdfs1:9000/test_partitioned_write{{_partition_id}}.csv', CSV) partition by 42 select 43 settings hdfs_truncate_on_insert=1"
)
result = node.query(
f"select * from hdfs('hdfs://hdfs1:9000/test_partitioned_write42.csv', CSV)"
)
assert int(result) == 43
node.query(
f"insert into table function hdfs('hdfs://hdfs1:9000/test_partitioned_write{{_partition_id}}.csv', CSV) partition by 42 select 44 settings hdfs_truncate_on_insert=0, hdfs_create_new_file_on_insert=1"
)
result = node.query(
f"select * from hdfs('hdfs://hdfs1:9000/test_partitioned_write42.1.csv', CSV)"
)
assert int(result) == 44
if __name__ == "__main__":
cluster.start()
input("Cluster created, press any key to destroy...")

View File

@@ -2244,3 +2244,44 @@ def test_s3_format_detection(started_cluster):
)
assert result == expected_result
def test_respect_object_existence_on_partitioned_write(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
instance.query(
f"insert into table function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_partitioned_write42.csv', 'CSV', 'x UInt64') select 42 settings s3_truncate_on_insert=1"
)
result = instance.query(
f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_partitioned_write42.csv')"
)
assert int(result) == 42
error = instance.query_and_get_error(
f"insert into table function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_partitioned_write{{_partition_id}}.csv', 'CSV', 'x UInt64') partition by 42 select 42 settings s3_truncate_on_insert=0"
)
assert "BAD_ARGUMENTS" in error
instance.query(
f"insert into table function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_partitioned_write{{_partition_id}}.csv', 'CSV', 'x UInt64') partition by 42 select 43 settings s3_truncate_on_insert=1"
)
result = instance.query(
f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_partitioned_write42.csv')"
)
assert int(result) == 43
instance.query(
f"insert into table function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_partitioned_write{{_partition_id}}.csv', 'CSV', 'x UInt64') partition by 42 select 44 settings s3_truncate_on_insert=0, s3_create_new_file_on_insert=1"
)
result = instance.query(
f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_partitioned_write42.1.csv')"
)
assert int(result) == 44