From 7df4820af7f84f850433e80e970e0763194310b5 Mon Sep 17 00:00:00 2001
From: AVMusorin
Date: Tue, 16 May 2023 21:42:56 +0200
Subject: [PATCH] Fix metrics WriteBufferFromS3Bytes,
 WriteBufferFromS3Microseconds and WriteBufferFromS3RequestsErrors

Ref: https://github.com/ClickHouse/ClickHouse/pull/45188
---
 src/IO/S3/copyS3File.cpp                      |  24 +++-
 .../test_backup_restore_s3/test.py            | 124 ++++++++++++++++--
 2 files changed, 134 insertions(+), 14 deletions(-)

diff --git a/src/IO/S3/copyS3File.cpp b/src/IO/S3/copyS3File.cpp
index 3a2fd513392..20490ef9a19 100644
--- a/src/IO/S3/copyS3File.cpp
+++ b/src/IO/S3/copyS3File.cpp
@@ -15,6 +15,10 @@
 
 namespace ProfileEvents
 {
+    extern const Event WriteBufferFromS3Bytes;
+    extern const Event WriteBufferFromS3Microseconds;
+    extern const Event WriteBufferFromS3RequestsErrors;
+
     extern const Event S3CreateMultipartUpload;
     extern const Event S3CompleteMultipartUpload;
     extern const Event S3PutObject;
@@ -135,7 +139,10 @@ namespace
                 LOG_TRACE(log, "Multipart upload has created. Bucket: {}, Key: {}, Upload id: {}", dest_bucket, dest_key, multipart_upload_id);
             }
             else
+            {
+                ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1);
                 throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
+            }
         }
 
         void completeMultipartUpload()
@@ -184,7 +191,7 @@ namespace
                     LOG_INFO(log, "Multipart upload failed with NO_SUCH_KEY error for Bucket: {}, Key: {}, Upload_id: {}, Parts: {}, will retry", dest_bucket, dest_key, multipart_upload_id, part_tags.size());
                     continue; /// will retry
                 }
-
+                ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1);
                 throw S3Exception(
                     outcome.GetError().GetErrorType(),
                     "Message: {}, Key: {}, Bucket: {}, Tags: {}",
@@ -228,7 +235,12 @@ namespace
                 size_t next_position = std::min(position + normal_part_size, end_position);
                 size_t part_size = next_position - position; /// `part_size` is either `normal_part_size` or smaller if it's the final part.
 
+                Stopwatch watch;
                 uploadPart(part_number, position, part_size);
+                watch.stop();
+
+                ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Bytes, part_size);
+                ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds());
 
                 position = next_position;
             }
@@ -485,16 +497,21 @@ namespace
                 if (for_disk_s3)
                     ProfileEvents::increment(ProfileEvents::DiskS3PutObject);
 
+                Stopwatch watch;
                 auto outcome = client_ptr->PutObject(request);
+                watch.stop();
 
                 if (outcome.IsSuccess())
                 {
+                    Int64 object_size = request.GetContentLength();
+                    ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Bytes, object_size);
+                    ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds());
                     LOG_TRACE(
                         log,
                         "Single part upload has completed. Bucket: {}, Key: {}, Object size: {}",
                         dest_bucket,
                         dest_key,
-                        request.GetContentLength());
+                        object_size);
                     break;
                 }
 
@@ -523,7 +540,7 @@ namespace
                         request.GetContentLength());
                     continue; /// will retry
                 }
-
+                ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1);
                 throw S3Exception(
                     outcome.GetError().GetErrorType(),
                     "Message: {}, Key: {}, Bucket: {}, Object size: {}",
@@ -567,6 +584,7 @@ namespace
             if (!outcome.IsSuccess())
             {
                 abortMultipartUpload();
+                ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1);
                 throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
             }
 
diff --git a/tests/integration/test_backup_restore_s3/test.py b/tests/integration/test_backup_restore_s3/test.py
index 2f60575b634..0285500d044 100644
--- a/tests/integration/test_backup_restore_s3/test.py
+++ b/tests/integration/test_backup_restore_s3/test.py
@@ -1,5 +1,8 @@
+from typing import Dict, Iterable
 import pytest
 from helpers.cluster import ClickHouseCluster
+from helpers.test_tools import TSV
+
 
 cluster = ClickHouseCluster(__file__)
 node = cluster.add_instance(
@@ -34,23 +37,116 @@ def new_backup_name():
     return f"backup{backup_id_counter}"
 
 
-def check_backup_and_restore(storage_policy, backup_destination, size=1000):
+def get_events(events_names: Iterable[str]) -> Dict[str, int]:
+    _events = TSV(
+        node.query(
+            f"SELECT event, value FROM system.events WHERE event in {events_names} SETTINGS system_events_show_zero_values = 1;"
+        )
+    )
+    return {
+        event: int(value)
+        for event, value in [line.split("\t") for line in _events.lines]
+    }
+
+
+def check_backup_and_restore(
+    storage_policy, backup_destination, size=1000, backup_name=None, check_events=False
+):
+    s3_backup_events = (
+        "WriteBufferFromS3Microseconds",
+        "WriteBufferFromS3Bytes",
+        "WriteBufferFromS3RequestsErrors",
+    )
+    s3_restore_events = (
+        "ReadBufferFromS3Microseconds",
+        "ReadBufferFromS3Bytes",
+        "ReadBufferFromS3RequestsErrors",
+    )
+
     node.query(
         f"""
     DROP TABLE IF EXISTS data SYNC;
     CREATE TABLE data (key Int, value String, array Array(String)) Engine=MergeTree() ORDER BY tuple() SETTINGS storage_policy='{storage_policy}';
     INSERT INTO data SELECT * FROM generateRandom('key Int, value String, array Array(String)') LIMIT {size};
-    BACKUP TABLE data TO {backup_destination};
-    RESTORE TABLE data AS data_restored FROM {backup_destination};
-    SELECT throwIf(
-        (SELECT count(), sum(sipHash64(*)) FROM data) !=
-        (SELECT count(), sum(sipHash64(*)) FROM data_restored),
-        'Data does not matched after BACKUP/RESTORE'
-    );
-    DROP TABLE data SYNC;
-    DROP TABLE data_restored SYNC;
+    OPTIMIZE TABLE data FINAL;
     """
     )
+    try:
+        events_before_backups = get_events(s3_backup_events)
+        node.query(f"BACKUP TABLE data TO {backup_destination}")
+        events_after_backups = get_events(s3_backup_events)
+        events_before_restore = get_events(s3_restore_events)
+        node.query(
+            f"""
+            RESTORE TABLE data AS data_restored FROM {backup_destination};
+            """
+        )
+        events_after_restore = get_events(s3_restore_events)
+        node.query(
+            """
+            SELECT throwIf(
+                (SELECT count(), sum(sipHash64(*)) FROM data) !=
+                (SELECT count(), sum(sipHash64(*)) FROM data_restored),
+                'Data does not matched after BACKUP/RESTORE'
+            );
+            """
+        )
+        if check_events and backup_name:
+            objects = node.cluster.minio_client.list_objects(
+                "root", f"data/backups/multipart/{backup_name}/"
+            )
+            backup_meta_size = 0
+            for obj in objects:
+                if ".backup" in obj.object_name:
+                    backup_meta_size = obj.size
+                    break
+            backup_total_size = int(
+                node.query(
+                    f"SELECT sum(total_size) FROM system.backups WHERE status = 'BACKUP_CREATED' AND name like '%{backup_name}%'"
+                ).strip()
+            )
+            restore_total_size = int(
+                node.query(
+                    f"SELECT sum(total_size) FROM system.backups WHERE status = 'RESTORED' AND name like '%{backup_name}%'"
+                ).strip()
+            )
+            # backup
+            # NOTE: ~35 bytes is used by .lock file, so set up 100 bytes to avoid flaky test
+            assert (
+                abs(
+                    backup_total_size
+                    - (
+                        events_after_backups["WriteBufferFromS3Bytes"]
+                        - events_before_backups["WriteBufferFromS3Bytes"]
+                        - backup_meta_size
+                    )
+                )
+                < 100
+            )
+            assert (
+                events_after_backups["WriteBufferFromS3Microseconds"]
+                > events_before_backups["WriteBufferFromS3Microseconds"]
+            )
+            assert events_after_backups["WriteBufferFromS3RequestsErrors"] == 0
+            # restore
+            assert (
+                events_after_restore["ReadBufferFromS3Bytes"]
+                - events_before_restore["ReadBufferFromS3Bytes"]
+                - backup_meta_size
+                == restore_total_size
+            )
+            assert (
+                events_after_restore["ReadBufferFromS3Microseconds"]
+                > events_before_restore["ReadBufferFromS3Microseconds"]
+            )
+            assert events_after_restore["ReadBufferFromS3RequestsErrors"] == 0
+    finally:
+        node.query(
+            """
+            DROP TABLE data SYNC;
+            DROP TABLE IF EXISTS data_restored SYNC;
+            """
+        )
 
 
 def check_system_tables():
@@ -128,7 +224,13 @@ def test_backup_to_s3_multipart():
     storage_policy = "default"
     backup_name = new_backup_name()
     backup_destination = f"S3('http://minio1:9001/root/data/backups/multipart/{backup_name}', 'minio', 'minio123')"
-    check_backup_and_restore(storage_policy, backup_destination, size=1000000)
+    check_backup_and_restore(
+        storage_policy,
+        backup_destination,
+        size=1000000,
+        backup_name=backup_name,
+        check_events=True,
+    )
     assert node.contains_in_log(
         f"copyDataToS3File: Multipart upload has completed. Bucket: root, Key: data/backups/multipart/{backup_name}"
     )