Fix metrics WriteBufferFromS3Bytes, WriteBufferFromS3Microseconds and WriteBufferFromS3RequestsErrors
Ref: https://github.com/ClickHouse/ClickHouse/pull/45188
This commit is contained in: parent 36c31e1d79, commit 7df4820af7
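For orientation, the hunks below all apply the same accounting pattern around each S3 write request: time the request, add the payload size and the elapsed microseconds to the byte and latency counters on success, and bump the error counter before the exception is rethrown on failure. The following is a minimal, self-contained sketch of that pattern; the Counters struct and do_put_object() are simplified stand-ins for illustration, not ClickHouse's ProfileEvents / Stopwatch machinery.

#include <atomic>
#include <chrono>
#include <cstdint>
#include <stdexcept>

// Simplified stand-ins for the three counters this commit wires up.
struct Counters
{
    std::atomic<uint64_t> bytes{0};           // cf. WriteBufferFromS3Bytes
    std::atomic<uint64_t> microseconds{0};    // cf. WriteBufferFromS3Microseconds
    std::atomic<uint64_t> request_errors{0};  // cf. WriteBufferFromS3RequestsErrors
};

Counters counters;

// Hypothetical stand-in for client_ptr->PutObject(request); returns success/failure.
bool do_put_object(const char * /*data*/, size_t /*size*/) { return true; }

void put_object_with_metrics(const char * data, size_t size)
{
    auto start = std::chrono::steady_clock::now();
    bool ok = do_put_object(data, size);  // only the request itself is timed
    auto elapsed_us = std::chrono::duration_cast<std::chrono::microseconds>(
                          std::chrono::steady_clock::now() - start)
                          .count();

    if (ok)
    {
        // Success: account the payload size and the time spent on the request.
        counters.bytes += size;
        counters.microseconds += static_cast<uint64_t>(elapsed_us);
    }
    else
    {
        // Failure: count the failed request before propagating the error.
        counters.request_errors += 1;
        throw std::runtime_error("PutObject failed");
    }
}

In the diff itself, this pattern is applied both to each multipart uploadPart() call and to the single-part PutObject path, and every error branch increments WriteBufferFromS3RequestsErrors before throwing.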
@@ -15,6 +15,10 @@
namespace ProfileEvents
{
    extern const Event WriteBufferFromS3Bytes;
    extern const Event WriteBufferFromS3Microseconds;
    extern const Event WriteBufferFromS3RequestsErrors;

    extern const Event S3CreateMultipartUpload;
    extern const Event S3CompleteMultipartUpload;
    extern const Event S3PutObject;
@@ -135,7 +139,10 @@ namespace
            LOG_TRACE(log, "Multipart upload has created. Bucket: {}, Key: {}, Upload id: {}", dest_bucket, dest_key, multipart_upload_id);
        }
        else
        {
            ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1);
            throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
        }
    }

    void completeMultipartUpload()
@@ -184,7 +191,7 @@ namespace
                LOG_INFO(log, "Multipart upload failed with NO_SUCH_KEY error for Bucket: {}, Key: {}, Upload_id: {}, Parts: {}, will retry", dest_bucket, dest_key, multipart_upload_id, part_tags.size());
                continue; /// will retry
            }

            ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1);
            throw S3Exception(
                outcome.GetError().GetErrorType(),
                "Message: {}, Key: {}, Bucket: {}, Tags: {}",
@@ -228,7 +235,12 @@ namespace
            size_t next_position = std::min(position + normal_part_size, end_position);
            size_t part_size = next_position - position; /// `part_size` is either `normal_part_size` or smaller if it's the final part.

            Stopwatch watch;
            uploadPart(part_number, position, part_size);
            watch.stop();

            ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Bytes, part_size);
            ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds());

            position = next_position;
        }
@@ -485,16 +497,21 @@ namespace
        if (for_disk_s3)
            ProfileEvents::increment(ProfileEvents::DiskS3PutObject);

        Stopwatch watch;
        auto outcome = client_ptr->PutObject(request);
        watch.stop();

        if (outcome.IsSuccess())
        {
            Int64 object_size = request.GetContentLength();
            ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Bytes, object_size);
            ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds());
            LOG_TRACE(
                log,
                "Single part upload has completed. Bucket: {}, Key: {}, Object size: {}",
                dest_bucket,
                dest_key,
                request.GetContentLength());
                object_size);
            break;
        }

@@ -523,7 +540,7 @@ namespace
                request.GetContentLength());
            continue; /// will retry
        }

        ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1);
        throw S3Exception(
            outcome.GetError().GetErrorType(),
            "Message: {}, Key: {}, Bucket: {}, Object size: {}",
@@ -567,6 +584,7 @@ namespace
        if (!outcome.IsSuccess())
        {
            abortMultipartUpload();
            ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1);
            throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
        }

@@ -1,5 +1,8 @@
from typing import Dict, Iterable

import pytest

from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV


cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
@@ -34,23 +37,116 @@ def new_backup_name():
    return f"backup{backup_id_counter}"


def check_backup_and_restore(storage_policy, backup_destination, size=1000):
def get_events(events_names: Iterable[str]) -> Dict[str, int]:
    _events = TSV(
        node.query(
            f"SELECT event, value FROM system.events WHERE event in {events_names} SETTINGS system_events_show_zero_values = 1;"
        )
    )
    return {
        event: int(value)
        for event, value in [line.split("\t") for line in _events.lines]
    }


def check_backup_and_restore(
    storage_policy, backup_destination, size=1000, backup_name=None, check_events=False
):
    s3_backup_events = (
        "WriteBufferFromS3Microseconds",
        "WriteBufferFromS3Bytes",
        "WriteBufferFromS3RequestsErrors",
    )
    s3_restore_events = (
        "ReadBufferFromS3Microseconds",
        "ReadBufferFromS3Bytes",
        "ReadBufferFromS3RequestsErrors",
    )

    node.query(
        f"""
        DROP TABLE IF EXISTS data SYNC;
        CREATE TABLE data (key Int, value String, array Array(String)) Engine=MergeTree() ORDER BY tuple() SETTINGS storage_policy='{storage_policy}';
        INSERT INTO data SELECT * FROM generateRandom('key Int, value String, array Array(String)') LIMIT {size};
        BACKUP TABLE data TO {backup_destination};
        RESTORE TABLE data AS data_restored FROM {backup_destination};
        SELECT throwIf(
            (SELECT count(), sum(sipHash64(*)) FROM data) !=
            (SELECT count(), sum(sipHash64(*)) FROM data_restored),
            'Data does not matched after BACKUP/RESTORE'
        );
        DROP TABLE data SYNC;
        DROP TABLE data_restored SYNC;
        OPTIMIZE TABLE data FINAL;
        """
    )
    try:
        events_before_backups = get_events(s3_backup_events)
        node.query(f"BACKUP TABLE data TO {backup_destination}")
        events_after_backups = get_events(s3_backup_events)
        events_before_restore = get_events(s3_restore_events)
        node.query(
            f"""
            RESTORE TABLE data AS data_restored FROM {backup_destination};
            """
        )
        events_after_restore = get_events(s3_restore_events)
        node.query(
            """
            SELECT throwIf(
                (SELECT count(), sum(sipHash64(*)) FROM data) !=
                (SELECT count(), sum(sipHash64(*)) FROM data_restored),
                'Data does not matched after BACKUP/RESTORE'
            );
            """
        )
        if check_events and backup_name:
            objects = node.cluster.minio_client.list_objects(
                "root", f"data/backups/multipart/{backup_name}/"
            )
            backup_meta_size = 0
            for obj in objects:
                if ".backup" in obj.object_name:
                    backup_meta_size = obj.size
                    break
            backup_total_size = int(
                node.query(
                    f"SELECT sum(total_size) FROM system.backups WHERE status = 'BACKUP_CREATED' AND name like '%{backup_name}%'"
                ).strip()
            )
            restore_total_size = int(
                node.query(
                    f"SELECT sum(total_size) FROM system.backups WHERE status = 'RESTORED' AND name like '%{backup_name}%'"
                ).strip()
            )
            # backup
            # NOTE: ~35 bytes is used by .lock file, so set up 100 bytes to avoid flaky test
            assert (
                abs(
                    backup_total_size
                    - (
                        events_after_backups["WriteBufferFromS3Bytes"]
                        - events_before_backups["WriteBufferFromS3Bytes"]
                        - backup_meta_size
                    )
                )
                < 100
            )
            assert (
                events_after_backups["WriteBufferFromS3Microseconds"]
                > events_before_backups["WriteBufferFromS3Microseconds"]
            )
            assert events_after_backups["WriteBufferFromS3RequestsErrors"] == 0
            # restore
            assert (
                events_after_restore["ReadBufferFromS3Bytes"]
                - events_before_restore["ReadBufferFromS3Bytes"]
                - backup_meta_size
                == restore_total_size
            )
            assert (
                events_after_restore["ReadBufferFromS3Microseconds"]
                > events_before_restore["ReadBufferFromS3Microseconds"]
            )
            assert events_after_restore["ReadBufferFromS3RequestsErrors"] == 0
    finally:
        node.query(
            """
            DROP TABLE data SYNC;
            DROP TABLE IF EXISTS data_restored SYNC;
            """
        )


def check_system_tables():
@@ -128,7 +224,13 @@ def test_backup_to_s3_multipart():
    storage_policy = "default"
    backup_name = new_backup_name()
    backup_destination = f"S3('http://minio1:9001/root/data/backups/multipart/{backup_name}', 'minio', 'minio123')"
    check_backup_and_restore(storage_policy, backup_destination, size=1000000)
    check_backup_and_restore(
        storage_policy,
        backup_destination,
        size=1000000,
        backup_name=backup_name,
        check_events=True,
    )
    assert node.contains_in_log(
        f"copyDataToS3File: Multipart upload has completed. Bucket: root, Key: data/backups/multipart/{backup_name}"
    )