Add test and logging.

Vitaly Baranov 2022-11-01 00:01:27 +01:00
parent 914ab51992
commit b9f2f17331
6 changed files with 64 additions and 24 deletions

@@ -126,6 +126,7 @@ BackupWriterS3::BackupWriterS3(
, max_single_read_retries(context_->getSettingsRef().s3_max_single_read_retries)
, read_settings(context_->getReadSettings())
, rw_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString()).rw_settings)
, log(&Poco::Logger::get("BackupWriterS3"))
{
rw_settings.updateFromSettingsIfEmpty(context_->getSettingsRef());
}
@@ -146,9 +147,12 @@ void BackupWriterS3::copyObjectImpl(
const String & src_key,
const String & dst_bucket,
const String & dst_key,
std::optional<Aws::S3::Model::HeadObjectResult> head,
std::optional<ObjectAttributes> metadata) const
const Aws::S3::Model::HeadObjectResult & head,
const std::optional<ObjectAttributes> & metadata) const
{
size_t size = head.GetContentLength();
LOG_TRACE(log, "Copying {} bytes using single-operation copy", size);
Aws::S3::Model::CopyObjectRequest request;
request.SetCopySource(src_bucket + "/" + src_key);
request.SetBucket(dst_bucket);
@@ -186,13 +190,11 @@ void BackupWriterS3::copyObjectMultipartImpl(
const String & src_key,
const String & dst_bucket,
const String & dst_key,
std::optional<Aws::S3::Model::HeadObjectResult> head,
std::optional<ObjectAttributes> metadata) const
const Aws::S3::Model::HeadObjectResult & head,
const std::optional<ObjectAttributes> & metadata) const
{
if (!head)
head = requestObjectHeadData(src_bucket, src_key).GetResult();
size_t size = head->GetContentLength();
size_t size = head.GetContentLength();
LOG_TRACE(log, "Copying {} bytes using multipart upload copy", size);
String multipart_upload_id;
@@ -289,15 +291,14 @@ void BackupWriterS3::copyFileNative(DiskPtr from_disk, const String & file_name_
auto file_path = fs::path(s3_uri.key) / file_name_to;
auto head = requestObjectHeadData(source_bucket, objects[0].absolute_path).GetResult();
static constexpr int64_t multipart_upload_threashold = 5UL * 1024 * 1024 * 1024;
if (head.GetContentLength() >= multipart_upload_threashold)
if (static_cast<size_t>(head.GetContentLength()) < rw_settings.max_single_operation_copy_size)
{
copyObjectMultipartImpl(
copyObjectImpl(
source_bucket, objects[0].absolute_path, s3_uri.bucket, file_path, head);
}
else
{
copyObjectImpl(
copyObjectMultipartImpl(
source_bucket, objects[0].absolute_path, s3_uri.bucket, file_path, head);
}
}
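
Editor's note: the hunk above flips the dispatch in copyFileNative so that the configurable max_single_operation_copy_size (defaulting to 5 GiB, the same value as the previously hard-coded threshold) decides which native copy path is taken. A minimal standalone sketch of that decision, using hypothetical names rather than the real BackupWriterS3 members:

#include <cstdint>
#include <iostream>

/// 5 GiB: S3's limit for copying an object with a single CopyObject call,
/// and the default for max_single_operation_copy_size introduced in this commit.
constexpr uint64_t DEFAULT_MAX_SINGLE_OPERATION_COPY_SIZE = 5ULL * 1024 * 1024 * 1024;

/// Hypothetical stand-in for the dispatch in BackupWriterS3::copyFileNative:
/// small objects take the single-operation path, large ones the multipart path.
void copyNative(uint64_t object_size,
                uint64_t max_single_operation_copy_size = DEFAULT_MAX_SINGLE_OPERATION_COPY_SIZE)
{
    if (object_size < max_single_operation_copy_size)
        std::cout << "Copying " << object_size << " bytes using single-operation copy\n";
    else
        std::cout << "Copying " << object_size << " bytes using multipart upload copy\n";
}

int main()
{
    copyNative(1024);                       /// -> single-operation copy
    copyNative(6ULL * 1024 * 1024 * 1024);  /// -> multipart upload copy
}

The two messages mirror the LOG_TRACE lines added to copyObjectImpl and copyObjectMultipartImpl, which the integration test below greps for.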

@@ -61,7 +61,6 @@ public:
void copyFileNative(DiskPtr from_disk, const String & file_name_from, const String & file_name_to) override;
private:
Aws::S3::Model::HeadObjectOutcome requestObjectHeadData(const std::string & bucket_from, const std::string & key) const;
void copyObjectImpl(
@@ -69,22 +68,23 @@ private:
const String & src_key,
const String & dst_bucket,
const String & dst_key,
std::optional<Aws::S3::Model::HeadObjectResult> head = std::nullopt,
std::optional<ObjectAttributes> metadata = std::nullopt) const;
const Aws::S3::Model::HeadObjectResult & head,
const std::optional<ObjectAttributes> & metadata = std::nullopt) const;
void copyObjectMultipartImpl(
const String & src_bucket,
const String & src_key,
const String & dst_bucket,
const String & dst_key,
std::optional<Aws::S3::Model::HeadObjectResult> head = std::nullopt,
std::optional<ObjectAttributes> metadata = std::nullopt) const;
const Aws::S3::Model::HeadObjectResult & head,
const std::optional<ObjectAttributes> & metadata = std::nullopt) const;
S3::URI s3_uri;
std::shared_ptr<Aws::S3::S3Client> client;
UInt64 max_single_read_retries;
ReadSettings read_settings;
S3Settings::ReadWriteSettings rw_settings;
Poco::Logger * log;
};
}

@@ -5,13 +5,20 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/Exception.h>
#include <Interpreters/Context.h>
#include <base/unit.h>
#include <boost/algorithm/string/predicate.hpp>
namespace DB
{
namespace
{
/// An object up to 5 GB can be copied in a single atomic operation.
constexpr UInt64 DEFAULT_MAX_SINGLE_OPERATION_COPY_SIZE = 5_GiB;
}
void StorageS3Settings::loadFromConfig(const String & config_elem, const Poco::Util::AbstractConfiguration & config, const Settings & settings)
{
std::lock_guard lock(mutex);
@@ -53,6 +60,7 @@ void StorageS3Settings::loadFromConfig(const String & config_elem, const Poco::U
rw_settings.upload_part_size_multiply_factor = get_uint_for_key(key, "upload_part_size_multiply_factor", true, settings.s3_upload_part_size_multiply_factor);
rw_settings.upload_part_size_multiply_parts_count_threshold = get_uint_for_key(key, "upload_part_size_multiply_parts_count_threshold", true, settings.s3_upload_part_size_multiply_parts_count_threshold);
rw_settings.max_single_part_upload_size = get_uint_for_key(key, "max_single_part_upload_size", true, settings.s3_max_single_part_upload_size);
rw_settings.max_single_operation_copy_size = get_uint_for_key(key, "max_single_operation_copy_size", true, DEFAULT_MAX_SINGLE_OPERATION_COPY_SIZE);
rw_settings.max_connections = get_uint_for_key(key, "max_connections", true, settings.s3_max_connections);
rw_settings.check_objects_after_upload = get_bool_for_key(key, "check_objects_after_upload", true, false);
@@ -101,6 +109,8 @@ void S3Settings::ReadWriteSettings::updateFromSettingsIfEmpty(const Settings & s
upload_part_size_multiply_parts_count_threshold = settings.s3_upload_part_size_multiply_parts_count_threshold;
if (!max_single_part_upload_size)
max_single_part_upload_size = settings.s3_max_single_part_upload_size;
if (!max_single_operation_copy_size)
max_single_operation_copy_size = DEFAULT_MAX_SINGLE_OPERATION_COPY_SIZE;
if (!max_connections)
max_connections = settings.s3_max_connections;
if (!max_unexpected_write_error_retries)
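
Editor's note: the addition to updateFromSettingsIfEmpty follows the file's existing fill-only-if-unset convention: a value of 0 means "not configured", so the 5 GiB default is applied only when nothing has set the field. A tiny sketch of that pattern, with hypothetical names (not the real ReadWriteSettings struct):

#include <cassert>
#include <cstdint>

/// Sketch of the "update if empty" convention: zero is treated as "unset",
/// so defaults never override an explicitly configured value.
struct CopySettings
{
    uint64_t max_single_operation_copy_size = 0;

    void updateIfEmpty(uint64_t default_value)
    {
        if (!max_single_operation_copy_size)
            max_single_operation_copy_size = default_value;
    }
};

int main()
{
    CopySettings from_config{1};   /// explicitly configured (e.g. the test config below)
    CopySettings untouched{};      /// nothing configured

    from_config.updateIfEmpty(5ULL << 30);
    untouched.updateIfEmpty(5ULL << 30);

    assert(from_config.max_single_operation_copy_size == 1);
    assert(untouched.max_single_operation_copy_size == (5ULL << 30));
}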

@@ -30,6 +30,7 @@ struct S3Settings
size_t upload_part_size_multiply_factor = 0;
size_t upload_part_size_multiply_parts_count_threshold = 0;
size_t max_single_part_upload_size = 0;
size_t max_single_operation_copy_size = 0;
size_t max_connections = 0;
bool check_objects_after_upload = false;
size_t max_unexpected_write_error_retries = 0;
@@ -44,6 +45,7 @@ struct S3Settings
&& upload_part_size_multiply_factor == other.upload_part_size_multiply_factor
&& upload_part_size_multiply_parts_count_threshold == other.upload_part_size_multiply_parts_count_threshold
&& max_single_part_upload_size == other.max_single_part_upload_size
&& max_single_operation_copy_size == other.max_single_operation_copy_size
&& max_connections == other.max_connections
&& check_objects_after_upload == other.check_objects_after_upload
&& max_unexpected_write_error_retries == other.max_unexpected_write_error_retries;

@@ -0,0 +1,12 @@
<clickhouse>
<s3>
<multipart_upload_copy>
<endpoint>http://minio1:9001/root/data/backups/multipart_upload_copy/</endpoint>
<!-- We set max_single_operation_copy_size=1 here so multipart upload copy will always be chosen for that test. -->
<max_single_operation_copy_size>1</max_single_operation_copy_size>
<min_upload_part_size>5242880</min_upload_part_size>
<upload_part_size_multiply_parts_count_threshold>3</upload_part_size_multiply_parts_count_threshold>
<upload_part_size_multiply_factor>2</upload_part_size_multiply_factor>
</multipart_upload_copy>
</s3>
</clickhouse>
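
Editor's note: with max_single_operation_copy_size set to 1 byte, the size check in copyFileNative fails for every non-empty object, so backups to this endpoint always exercise the multipart upload copy path and produce the "multipart upload copy" log line asserted in the test below. Illustrated with standalone, hypothetical values:

#include <cassert>
#include <cstdint>

int main()
{
    uint64_t max_single_operation_copy_size = 1;   /// value from the test config above
    uint64_t object_size = 5 * 1024 * 1024;        /// any non-empty backup object

    /// Same comparison shape as in BackupWriterS3::copyFileNative: with a
    /// 1-byte threshold the single-operation branch is never taken.
    bool use_single_operation_copy = object_size < max_single_operation_copy_size;
    assert(!use_single_operation_copy);
}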

@@ -4,7 +4,11 @@ from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node",
main_configs=["configs/disk_s3.xml", "configs/named_collection_s3_backups.xml"],
main_configs=[
"configs/disk_s3.xml",
"configs/named_collection_s3_backups.xml",
"configs/s3_settings.xml",
],
with_minio=True,
)
@@ -27,17 +31,17 @@ def new_backup_name():
return f"backup{backup_id_counter}"
def check_backup_and_restore(storage_policy, backup_destination):
def check_backup_and_restore(storage_policy, backup_destination, size=1000):
node.query(
f"""
DROP TABLE IF EXISTS data NO DELAY;
CREATE TABLE data (key Int, value String, array Array(String)) Engine=MergeTree() ORDER BY tuple() SETTINGS storage_policy='{storage_policy}';
INSERT INTO data SELECT * FROM generateRandom('key Int, value String, array Array(String)') LIMIT 1000;
INSERT INTO data SELECT * FROM generateRandom('key Int, value String, array Array(String)') LIMIT {size};
BACKUP TABLE data TO {backup_destination};
RESTORE TABLE data AS data_restored FROM {backup_destination};
SELECT throwIf(
(SELECT groupArray(tuple(*)) FROM data) !=
(SELECT groupArray(tuple(*)) FROM data_restored),
(SELECT count(), sum(sipHash64(*)) FROM data) !=
(SELECT count(), sum(sipHash64(*)) FROM data_restored),
'Data does not matched after BACKUP/RESTORE'
);
DROP TABLE data NO DELAY;
@@ -106,9 +110,10 @@ def test_backup_to_s3_native_copy():
)
check_backup_and_restore(storage_policy, backup_destination)
assert node.contains_in_log("using native copy")
assert node.contains_in_log("single-operation copy")
def test_backup_to_s3_other_bucket_native_copy():
def test_backup_to_s3_native_copy_other_bucket():
storage_policy = "policy_s3_other_bucket"
backup_name = new_backup_name()
backup_destination = (
@@ -116,3 +121,13 @@ def test_backup_to_s3_other_bucket_native_copy():
)
check_backup_and_restore(storage_policy, backup_destination)
assert node.contains_in_log("using native copy")
assert node.contains_in_log("single-operation copy")
def test_backup_to_s3_native_copy_multipart_upload():
storage_policy = "policy_s3"
backup_name = new_backup_name()
backup_destination = f"S3('http://minio1:9001/root/data/backups/multipart_upload_copy/{backup_name}', 'minio', 'minio123')"
check_backup_and_restore(storage_policy, backup_destination, size=1000000)
assert node.contains_in_log("using native copy")
assert node.contains_in_log("multipart upload copy")