Support settings, test truncate

Alexander Sapin 2023-06-05 14:46:52 +02:00
parent 2866bac089
commit bc8ee56a19
4 changed files with 53 additions and 6 deletions

@@ -81,7 +81,9 @@ class IColumn;
M(UInt64, s3_upload_part_size_multiply_parts_count_threshold, 500, "Each time this number of parts has been uploaded to S3, s3_min_upload_part_size is multiplied by s3_upload_part_size_multiply_factor.", 0) \
M(UInt64, s3_max_inflight_parts_for_one_file, 20, "The maximum number of concurrently loaded parts in a multipart upload request. 0 means unlimited.", 0) \
M(UInt64, s3_max_single_part_upload_size, 32*1024*1024, "The maximum size of object to upload using singlepart upload to S3.", 0) \
M(UInt64, azure_max_single_part_upload_size, 100*1024*1024, "The maximum size of object to upload using singlepart upload to Azure blob storage.", 0) \
M(UInt64, s3_max_single_read_retries, 4, "The maximum number of retries during single S3 read.", 0) \
M(UInt64, azure_max_single_read_retries, 4, "The maximum number of retries during single Azure blob storage read.", 0) \
M(UInt64, s3_max_unexpected_write_error_retries, 4, "The maximum number of retries in case of unexpected errors during S3 write.", 0) \
M(UInt64, s3_max_redirects, 10, "Max number of S3 redirects hops allowed.", 0) \
M(UInt64, s3_max_connections, 1024, "The maximum number of connections per server.", 0) \
@@ -90,8 +92,11 @@ class IColumn;
M(UInt64, s3_max_put_rps, 0, "Limit on S3 PUT request per second rate before throttling. Zero means unlimited.", 0) \
M(UInt64, s3_max_put_burst, 0, "Max number of requests that can be issued simultaneously before hitting request per second limit. By default (0) equals to `s3_max_put_rps`", 0) \
M(UInt64, s3_list_object_keys_size, 1000, "Maximum number of files that could be returned in batch by ListObject request", 0) \
M(UInt64, azure_list_object_keys_size, 1000, "Maximum number of files that could be returned in batch by ListObject request", 0) \
M(Bool, s3_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables.", 0) \
M(Bool, azure_truncate_on_insert, false, "Enables or disables truncate before insert in azure engine tables.", 0) \
M(Bool, s3_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in s3 engine tables", 0) \
M(Bool, azure_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in azure engine tables", 0) \
M(Bool, s3_check_objects_after_upload, false, "Check each uploaded object to s3 with head request to be sure that upload was successful", 0) \
M(Bool, s3_allow_parallel_part_upload, true, "Use multiple threads for s3 multipart upload. It may lead to slightly higher memory usage", 0) \
M(Bool, s3_throw_on_zero_files_match, false, "Throw an error, when ListObjects request cannot match any files", 0) \
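
Like their s3_* counterparts, the new azure_* settings live in the query context, so they can be overridden per statement with a SETTINGS clause. A minimal sketch in the idiom of the integration tests at the end of this commit, assuming the azure_query / get_azure_file_content helpers and the azure_conf2 named collection from that test module (the table name and values are illustrative, not part of this commit):

def test_query_level_settings_sketch(cluster):
    # Hypothetical test, not part of this commit: pass the new azure_* settings per query.
    node = cluster.instances["node"]
    azure_query(node, "CREATE TABLE test_settings_sketch (key UInt64, data String) Engine = Azure(azure_conf2, container='cont', blob_path='test_settings_sketch.csv', format='CSV')")
    azure_query(node, "INSERT INTO test_settings_sketch VALUES (1, 'a')")
    # azure_truncate_on_insert lets the second INSERT overwrite the existing blob.
    azure_query(node, "INSERT INTO test_settings_sketch SETTINGS azure_truncate_on_insert = 1 VALUES (2, 'b')")
    assert get_azure_file_content("test_settings_sketch.csv") == '2,"b"\n'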

@@ -255,9 +255,15 @@ void registerStorageAzure(StorageFactory & factory)
if (args.storage_def->partition_by)
partition_by = args.storage_def->partition_by->clone();
const auto & context_settings = args.getContext()->getSettingsRef();
auto settings = std::make_unique<AzureObjectStorageSettings>();
settings->max_single_part_upload_size = context_settings.azure_max_single_part_upload_size;
settings->max_single_read_retries = context_settings.azure_max_single_read_retries;
settings->list_object_keys_size = static_cast<int32_t>(context_settings.azure_list_object_keys_size);
return std::make_shared<StorageAzure>(
std::move(configuration),
std::make_unique<AzureObjectStorage>("AzureStorage", std::move(client), std::make_unique<AzureObjectStorageSettings>()),
std::make_unique<AzureObjectStorage>("AzureStorage", std::move(client), std::move(settings)),
args.getContext(),
args.table_id,
args.columns,
@@ -395,7 +401,6 @@ StorageAzure::StorageAzure(
void StorageAzure::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &)
{
if (configuration.withGlobs())
{
throw Exception(
@@ -577,12 +582,12 @@ SinkToStoragePtr StorageAzure::write(const ASTPtr & query, const StorageMetadata
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED,
"Azure key '{}' contains globs, so the table is in readonly mode", configuration.blob_path);
bool truncate_in_insert = local_context->getSettingsRef().s3_truncate_on_insert;
bool truncate_in_insert = local_context->getSettingsRef().azure_truncate_on_insert;
if (!truncate_in_insert && object_storage->exists(StoredObject(configuration.blob_path)))
{
if (local_context->getSettingsRef().s3_create_new_file_on_insert)
if (local_context->getSettingsRef().azure_create_new_file_on_insert)
{
size_t index = configuration.blobs_paths.size();
const auto & first_key = configuration.blobs_paths[0];
@@ -603,8 +608,8 @@ SinkToStoragePtr StorageAzure::write(const ASTPtr & query, const StorageMetadata
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Object in bucket {} with key {} already exists. "
"If you want to overwrite it, enable setting s3_truncate_on_insert, if you "
"want to create a new file on each insert, enable setting s3_create_new_file_on_insert",
"If you want to overwrite it, enable setting azure_truncate_on_insert, if you "
"want to create a new file on each insert, enable setting azure_create_new_file_on_insert",
configuration.container, configuration.blobs_paths.back());
}
}
@@ -630,6 +635,26 @@ bool StorageAzure::supportsPartitionBy() const
return true;
}
bool StorageAzure::supportsSubcolumns() const
{
return FormatFactory::instance().checkIfFormatSupportsSubcolumns(configuration.format);
}
bool StorageAzure::supportsSubsetOfColumns() const
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format);
}
bool StorageAzure::prefersLargeBlocks() const
{
return FormatFactory::instance().checkIfOutputFormatPrefersLargeBlocks(configuration.format);
}
bool StorageAzure::parallelizeOutputAfterReading(ContextPtr context) const
{
return FormatFactory::instance().checkParallelizeOutputAfterReading(configuration.format, context);
}
}
#endif
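
The write() path above gives three behaviors for an INSERT into an already existing, non-globbed blob: fail with the BAD_ARGUMENTS error quoted above (the default), overwrite it (azure_truncate_on_insert), or write to a fresh blob (azure_create_new_file_on_insert). A sketch of the default, failing case in the style of the integration tests below (the table name is illustrative, not part of this commit):

def test_insert_into_existing_blob_fails_sketch(cluster):
    # Hypothetical test, not part of this commit: with neither azure_truncate_on_insert
    # nor azure_create_new_file_on_insert enabled, a second INSERT must be rejected.
    node = cluster.instances["node"]
    azure_query(node, "CREATE TABLE test_exists_sketch (key UInt64, data String) Engine = Azure(azure_conf2, container='cont', blob_path='test_exists_sketch.csv', format='CSV')")
    azure_query(node, "INSERT INTO test_exists_sketch VALUES (1, 'a')")
    with pytest.raises(Exception):
        azure_query(node, "INSERT INTO test_exists_sketch VALUES (2, 'b')")
    # The original blob is left untouched.
    assert get_azure_file_content("test_exists_sketch.csv") == '1,"a"\n'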

@@ -103,6 +103,14 @@ public:
bool supportsPartitionBy() const override;
bool supportsSubcolumns() const override;
bool supportsSubsetOfColumns() const override;
bool prefersLargeBlocks() const override;
bool parallelizeOutputAfterReading(ContextPtr context) const override;
static SchemaCache & getSchemaCache(const ContextPtr & ctx);
private:

@@ -134,3 +134,12 @@ def test_partition_by_const_column(cluster):
azure_query(node, f"CREATE TABLE test_partitioned_const_write ({table_format}) Engine = Azure('http://azurite1:10000/devstoreaccount1', 'cont', '{filename}', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV') PARTITION BY {partition_by}")
azure_query(node, f"INSERT INTO test_partitioned_const_write VALUES {values}")
assert values_csv == get_azure_file_content("test_88.csv")
def test_truncate(cluster):
node = cluster.instances["node"]
azure_query(node, "CREATE TABLE test_truncate (key UInt64, data String) Engine = Azure(azure_conf2, container='cont', blob_path='test_truncate.csv', format='CSV')")
azure_query(node, "INSERT INTO test_truncate VALUES (1, 'a')")
assert get_azure_file_content('test_truncate.csv') == '1,"a"\n'
azure_query(node, "TRUNCATE TABLE test_truncate")
with pytest.raises(Exception):
print(get_azure_file_content('test_truncate.csv'))
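
A natural follow-up, sketched here but not part of this commit, would cover azure_create_new_file_on_insert: the second INSERT should succeed and the original blob should keep its content. The name of the newly generated blob is not asserted, since the naming scheme is derived from the first key in the code above and is not shown in this diff:

def test_create_new_file_on_insert_sketch(cluster):
    # Hypothetical test: with azure_create_new_file_on_insert enabled, a second INSERT
    # goes to a new blob instead of failing or overwriting the existing one.
    node = cluster.instances["node"]
    azure_query(node, "CREATE TABLE test_new_file_sketch (key UInt64, data String) Engine = Azure(azure_conf2, container='cont', blob_path='test_new_file_sketch.csv', format='CSV')")
    azure_query(node, "INSERT INTO test_new_file_sketch VALUES (1, 'a')")
    azure_query(node, "INSERT INTO test_new_file_sketch SETTINGS azure_create_new_file_on_insert = 1 VALUES (2, 'b')")
    # The first blob keeps its original content; the second row lands in a new blob.
    assert get_azure_file_content("test_new_file_sketch.csv") == '1,"a"\n'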