Merge pull request #49157 from CheSema/fixing-test-merge-tree-s3-2

Clearing S3 between tests in a robust way
Commit 50099cad17 by Sema Checherinda, 2023-05-02 11:08:40 +02:00, committed by GitHub.
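Because the rendered diff below interleaves removed and added lines without +/- markers, here is the cleanup layer introduced by this commit reassembled in one readable piece. This is a readability sketch, not a verbatim excerpt: imports and indentation are restored, and the cluster fixture plus the FILES_OVERHEAD* constants are defined elsewhere in the test module.

import logging
import time

import pytest

def list_objects(cluster, path="data/", hint="list_objects"):
    # Each listing is logged with a caller-supplied hint so a failed
    # object-count assertion can be traced to the step that produced it.
    minio = cluster.minio_client
    objects = list(minio.list_objects(cluster.minio_bucket, path, recursive=True))
    logging.info(f"{hint} ({len(objects)}): {[x.object_name for x in objects]}")
    return objects

def wait_for_delete_s3_objects(cluster, expected, timeout=30):
    # Object removal is asynchronous: poll for up to `timeout` seconds,
    # then assert on the final count.
    while timeout > 0:
        if len(list_objects(cluster, "data/")) == expected:
            return
        timeout -= 1
        time.sleep(1)
    assert len(list_objects(cluster, "data/")) == expected

def remove_all_s3_objects(cluster):
    minio = cluster.minio_client
    for obj in list_objects(cluster, "data/"):
        minio.remove_object(cluster.minio_bucket, obj.object_name)

@pytest.fixture(autouse=True, scope="function")
def clear_minio(cluster):
    try:
        # ClickHouse writes a few service objects at startup; 10 seconds
        # absorbs the race with those writes.
        wait_for_delete_s3_objects(cluster, 0, timeout=10)
    except:
        # Force-remove leftovers so one failing test cannot cascade.
        remove_all_s3_objects(cluster)
    yield

def check_no_objects_after_drop(cluster, table_name="s3_test", node_name="node"):
    # timeout=0 skips polling: after DROP ... NO DELAY the bucket is
    # expected to be empty immediately.
    node = cluster.instances[node_name]
    node.query(f"DROP TABLE IF EXISTS {table_name} NO DELAY")
    wait_for_delete_s3_objects(cluster, 0, timeout=0)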

@@ -101,44 +101,45 @@ def run_s3_mocks(cluster):
)
def list_objects(cluster, path="data/"):
def list_objects(cluster, path="data/", hint="list_objects"):
minio = cluster.minio_client
objects = list(minio.list_objects(cluster.minio_bucket, path, recursive=True))
logging.info(f"list_objects ({len(objects)}): {[x.object_name for x in objects]}")
logging.info(f"{hint} ({len(objects)}): {[x.object_name for x in objects]}")
return objects
def wait_for_delete_s3_objects(cluster, expected, timeout=30):
minio = cluster.minio_client
while timeout > 0:
if (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== expected
):
if len(list_objects(cluster, "data/")) == expected:
return
timeout -= 1
time.sleep(1)
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== expected
)
assert len(list_objects(cluster, "data/")) == expected
@pytest.fixture(autouse=True)
@pytest.mark.parametrize("node_name", ["node"])
def drop_table(cluster, node_name):
yield
node = cluster.instances[node_name]
def remove_all_s3_objects(cluster):
minio = cluster.minio_client
for obj in list_objects(cluster, "data/"):
minio.remove_object(cluster.minio_bucket, obj.object_name)
node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
@pytest.fixture(autouse=True, scope="function")
def clear_minio(cluster):
try:
wait_for_delete_s3_objects(cluster, 0)
finally:
# ClickHouse does some writes to S3 at startup, e.g. the file data/clickhouse_access_check_{server_uuid}.
# Set the timeout here to 10 sec to resolve the race with that file's creation.
wait_for_delete_s3_objects(cluster, 0, timeout=10)
except:
# Remove extra objects to prevent cascading test failures
for obj in list_objects(cluster, "data/"):
minio.remove_object(cluster.minio_bucket, obj.object_name)
remove_all_s3_objects(cluster)
yield
def check_no_objects_after_drop(cluster, table_name="s3_test", node_name="node"):
node = cluster.instances[node_name]
node.query(f"DROP TABLE IF EXISTS {table_name} NO DELAY")
wait_for_delete_s3_objects(cluster, 0, timeout=0)
@pytest.mark.parametrize(
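In the tests that follow, the polling helper is used with timeouts scaled to how much background work is pending; a brief usage sketch (the values mirror calls visible in this diff):

# After a merge, old parts are removed in the background; allow up to 45 s.
wait_for_delete_s3_objects(
    cluster, FILES_OVERHEAD_PER_PART_WIDE + FILES_OVERHEAD, timeout=45
)

# The default 30 s timeout is enough for ordinary cleanup.
wait_for_delete_s3_objects(cluster, FILES_OVERHEAD)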
@@ -158,10 +159,7 @@ def test_simple_insert_select(
values1 = generate_values("2020-01-03", 4096)
node.query("INSERT INTO s3_test VALUES {}".format(values1))
assert node.query("SELECT * FROM s3_test order by dt, id FORMAT Values") == values1
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD + files_per_part
)
assert len(list_objects(cluster, "data/")) == FILES_OVERHEAD + files_per_part
values2 = generate_values("2020-01-04", 4096)
node.query("INSERT INTO s3_test VALUES {}".format(values2))
@@ -169,15 +167,14 @@ def test_simple_insert_select(
node.query("SELECT * FROM s3_test ORDER BY dt, id FORMAT Values")
== values1 + "," + values2
)
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD + files_per_part * 2
)
assert len(list_objects(cluster, "data/")) == FILES_OVERHEAD + files_per_part * 2
assert (
node.query("SELECT count(*) FROM s3_test where id = 1 FORMAT Values") == "(2)"
)
check_no_objects_after_drop(cluster)
@pytest.mark.parametrize("merge_vertical,node_name", [(True, "node"), (False, "node")])
def test_insert_same_partition_and_merge(cluster, merge_vertical, node_name):
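The expected object counts in these assertions are plain arithmetic over per-part overhead constants defined near the top of the test file (not shown in this diff). A hypothetical illustration with assumed constant values; only the formula is taken from the tests:

# Assumed values for illustration; the real constants live in test.py.
FILES_OVERHEAD = 1                 # service objects for the table itself
FILES_OVERHEAD_PER_PART_WIDE = 14  # objects per wide MergeTree part

def expected_objects(parts):
    # e.g. two inserts into different partitions -> two wide parts
    return FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * parts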
@@ -188,7 +185,6 @@ def test_insert_same_partition_and_merge(cluster, merge_vertical, node_name):
node = cluster.instances[node_name]
create_table(node, "s3_test", **settings)
minio = cluster.minio_client
node.query("SYSTEM STOP MERGES s3_test")
node.query(
@@ -214,7 +210,7 @@ def test_insert_same_partition_and_merge(cluster, merge_vertical, node_name):
node.query("SELECT count(distinct(id)) FROM s3_test FORMAT Values") == "(8192)"
)
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list_objects(cluster, "data/"))
== FILES_OVERHEAD_PER_PART_WIDE * 6 + FILES_OVERHEAD
)
@@ -242,6 +238,8 @@ def test_insert_same_partition_and_merge(cluster, merge_vertical, node_name):
cluster, FILES_OVERHEAD_PER_PART_WIDE + FILES_OVERHEAD, timeout=45
)
check_no_objects_after_drop(cluster)
@pytest.mark.parametrize("node_name", ["node"])
def test_alter_table_columns(cluster, node_name):
@@ -287,12 +285,13 @@ def test_alter_table_columns(cluster, node_name):
cluster, FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + 2
)
check_no_objects_after_drop(cluster)
@pytest.mark.parametrize("node_name", ["node"])
def test_attach_detach_partition(cluster, node_name):
node = cluster.instances[node_name]
create_table(node, "s3_test")
minio = cluster.minio_client
node.query(
"INSERT INTO s3_test VALUES {}".format(generate_values("2020-01-03", 4096))
@@ -355,12 +354,13 @@ def test_attach_detach_partition(cluster, node_name):
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 0
)
check_no_objects_after_drop(cluster)
@pytest.mark.parametrize("node_name", ["node"])
def test_move_partition_to_another_disk(cluster, node_name):
node = cluster.instances[node_name]
create_table(node, "s3_test")
minio = cluster.minio_client
node.query(
"INSERT INTO s3_test VALUES {}".format(generate_values("2020-01-03", 4096))
@@ -370,30 +370,31 @@ def test_move_partition_to_another_disk(cluster, node_name):
)
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(8192)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list_objects(cluster, "data/"))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
node.query("ALTER TABLE s3_test MOVE PARTITION '2020-01-04' TO DISK 'hdd'")
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(8192)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list_objects(cluster, "data/"))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE
)
node.query("ALTER TABLE s3_test MOVE PARTITION '2020-01-04' TO DISK 's3'")
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(8192)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list_objects(cluster, "data/"))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
check_no_objects_after_drop(cluster)
@pytest.mark.parametrize("node_name", ["node"])
def test_table_manipulations(cluster, node_name):
node = cluster.instances[node_name]
create_table(node, "s3_test")
minio = cluster.minio_client
node.query(
"INSERT INTO s3_test VALUES {}".format(generate_values("2020-01-03", 4096))
@@ -405,9 +406,10 @@ def test_table_manipulations(cluster, node_name):
node.query("RENAME TABLE s3_test TO s3_renamed")
assert node.query("SELECT count(*) FROM s3_renamed FORMAT Values") == "(8192)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list_objects(cluster, "data/"))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
node.query("RENAME TABLE s3_renamed TO s3_test")
assert node.query("CHECK TABLE s3_test FORMAT Values") == "(1)"
@@ -416,7 +418,7 @@ def test_table_manipulations(cluster, node_name):
node.query("ATTACH TABLE s3_test")
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(8192)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list_objects(cluster, "data/"))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
@@ -424,17 +426,15 @@ def test_table_manipulations(cluster, node_name):
wait_for_delete_empty_parts(node, "s3_test")
wait_for_delete_inactive_parts(node, "s3_test")
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(0)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD
)
assert len(list_objects(cluster, "data/")) == FILES_OVERHEAD
check_no_objects_after_drop(cluster)
@pytest.mark.parametrize("node_name", ["node"])
def test_move_replace_partition_to_another_table(cluster, node_name):
node = cluster.instances[node_name]
create_table(node, "s3_test")
minio = cluster.minio_client
node.query(
"INSERT INTO s3_test VALUES {}".format(generate_values("2020-01-03", 4096))
@@ -451,12 +451,10 @@ def test_move_replace_partition_to_another_table(cluster, node_name):
assert node.query("SELECT sum(id) FROM s3_test FORMAT Values") == "(0)"
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(16384)"
s3_objects = list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True))
for obj in s3_objects:
print("Object at start", obj.object_name)
assert len(s3_objects) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 4
assert (
len(list_objects(cluster, "data/", "Objects at start"))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 4
)
create_table(node, "s3_clone")
node.query("ALTER TABLE s3_test MOVE PARTITION '2020-01-03' TO TABLE s3_clone")
@@ -465,10 +463,8 @@ def test_move_replace_partition_to_another_table(cluster, node_name):
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(8192)"
assert node.query("SELECT sum(id) FROM s3_clone FORMAT Values") == "(0)"
assert node.query("SELECT count(*) FROM s3_clone FORMAT Values") == "(8192)"
s3_objects = list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True))
for obj in s3_objects:
print("Object after move partition", obj.object_name)
list_objects(cluster, "data/", "Object after move partition")
# Number of objects in S3 should be unchanged.
wait_for_delete_s3_objects(
cluster,
@@ -486,10 +482,8 @@ def test_move_replace_partition_to_another_table(cluster, node_name):
)
assert node.query("SELECT sum(id) FROM s3_test FORMAT Values") == "(0)"
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(16384)"
s3_objects = list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True))
for obj in s3_objects:
print("Object after insert", obj.object_name)
list_objects(cluster, "data/", "Object after insert")
wait_for_delete_s3_objects(
cluster,
FILES_OVERHEAD * 2
@@ -515,12 +509,8 @@ def test_move_replace_partition_to_another_table(cluster, node_name):
node.query("DROP TABLE s3_clone NO DELAY")
assert node.query("SELECT sum(id) FROM s3_test FORMAT Values") == "(0)"
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(16384)"
s3_objects = list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True))
for obj in s3_objects:
print("Object after drop", obj.object_name)
# Data should remain in S3
list_objects(cluster, "data/", "Object after drop")
wait_for_delete_s3_objects(
cluster,
FILES_OVERHEAD
@@ -530,10 +520,7 @@ def test_move_replace_partition_to_another_table(cluster, node_name):
node.query("ALTER TABLE s3_test FREEZE")
# The number of S3 objects should be unchanged.
s3_objects = list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True))
for obj in s3_objects:
print("Object after freeze", obj.object_name)
list_objects(cluster, "data/", "Object after freeze")
wait_for_delete_s3_objects(
cluster,
FILES_OVERHEAD
@@ -548,15 +535,13 @@ def test_move_replace_partition_to_another_table(cluster, node_name):
cluster, FILES_OVERHEAD_PER_PART_WIDE * 4 - FILES_OVERHEAD_METADATA_VERSION * 4
)
for obj in list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)):
minio.remove_object(cluster.minio_bucket, obj.object_name)
remove_all_s3_objects(cluster)
@pytest.mark.parametrize("node_name", ["node"])
def test_freeze_unfreeze(cluster, node_name):
node = cluster.instances[node_name]
create_table(node, "s3_test")
minio = cluster.minio_client
node.query(
"INSERT INTO s3_test VALUES {}".format(generate_values("2020-01-03", 4096))
@@ -571,7 +556,7 @@ def test_freeze_unfreeze(cluster, node_name):
wait_for_delete_empty_parts(node, "s3_test")
wait_for_delete_inactive_parts(node, "s3_test")
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list_objects(cluster, "data/"))
== FILES_OVERHEAD
+ (FILES_OVERHEAD_PER_PART_WIDE - FILES_OVERHEAD_METADATA_VERSION) * 2
)
@@ -583,13 +568,10 @@ def test_freeze_unfreeze(cluster, node_name):
# Unfreeze all partitions from backup2.
node.query("ALTER TABLE s3_test UNFREEZE WITH NAME 'backup2'")
# Data should be removed from S3.
wait_for_delete_s3_objects(cluster, FILES_OVERHEAD)
# Data should be removed from S3.
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD
)
check_no_objects_after_drop(cluster)
@pytest.mark.parametrize("node_name", ["node"])
@@ -597,7 +579,6 @@ def test_freeze_system_unfreeze(cluster, node_name):
node = cluster.instances[node_name]
create_table(node, "s3_test")
create_table(node, "s3_test_removed")
minio = cluster.minio_client
node.query(
"INSERT INTO s3_test VALUES {}".format(generate_values("2020-01-04", 4096))
@@ -613,7 +594,7 @@ def test_freeze_system_unfreeze(cluster, node_name):
wait_for_delete_inactive_parts(node, "s3_test")
node.query("DROP TABLE s3_test_removed NO DELAY")
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list_objects(cluster, "data/"))
== FILES_OVERHEAD
+ (FILES_OVERHEAD_PER_PART_WIDE - FILES_OVERHEAD_METADATA_VERSION) * 2
)
@@ -621,13 +602,10 @@ def test_freeze_system_unfreeze(cluster, node_name):
# Unfreeze all data from backup3.
node.query("SYSTEM UNFREEZE WITH NAME 'backup3'")
# Data should be removed from S3.
wait_for_delete_s3_objects(cluster, FILES_OVERHEAD)
# Data should be removed from S3.
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD
)
check_no_objects_after_drop(cluster)
@pytest.mark.parametrize("node_name", ["node"])
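For context on the two freeze tests above: a frozen backup keeps references to a part's S3 objects, so the bucket can only drain back to FILES_OVERHEAD once the backup is unfrozen and the asynchronous deletes complete. A sketch of the sequence; the UNFREEZE statement and the wait appear in the diff, while the FREEZE and TRUNCATE lines are reconstructed context:

node.query("ALTER TABLE s3_test FREEZE WITH NAME 'backup2'")    # objects pinned by the backup
node.query("TRUNCATE TABLE s3_test")                            # parts gone, frozen objects remain
node.query("ALTER TABLE s3_test UNFREEZE WITH NAME 'backup2'")  # backup released
wait_for_delete_s3_objects(cluster, FILES_OVERHEAD)             # bucket drains asynchronously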
@@ -673,6 +651,8 @@ def test_s3_disk_apply_new_settings(cluster, node_name):
# There should be 3 times more S3 requests because multi-part upload mode uses 3 requests to upload an object.
assert get_s3_requests() - s3_requests_before == s3_requests_to_write_partition * 3
check_no_objects_after_drop(cluster)
@pytest.mark.parametrize("node_name", ["node"])
def test_s3_no_delete_objects(cluster, node_name):
@@ -681,6 +661,7 @@ def test_s3_no_delete_objects(cluster, node_name):
node, "s3_test_no_delete_objects", storage_policy="no_delete_objects_s3"
)
node.query("DROP TABLE s3_test_no_delete_objects SYNC")
remove_all_s3_objects(cluster)
@pytest.mark.parametrize("node_name", ["node"])
@@ -695,6 +676,7 @@ def test_s3_disk_reads_on_unstable_connection(cluster, node_name):
assert node.query("SELECT sum(id) FROM s3_test").splitlines() == [
"40499995500000"
]
check_no_objects_after_drop(cluster)
@pytest.mark.parametrize("node_name", ["node"])
@@ -704,14 +686,13 @@ def test_lazy_seek_optimization_for_async_read(cluster, node_name):
node.query(
"CREATE TABLE s3_test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3';"
)
node.query("SYSTEM STOP MERGES s3_test")
node.query(
"INSERT INTO s3_test SELECT * FROM generateRandom('key UInt32, value String') LIMIT 10000000"
)
node.query("SELECT * FROM s3_test WHERE value LIKE '%abc%' ORDER BY value LIMIT 10")
node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
minio = cluster.minio_client
for obj in list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)):
minio.remove_object(cluster.minio_bucket, obj.object_name)
check_no_objects_after_drop(cluster)
@pytest.mark.parametrize("node_name", ["node_with_limited_disk"])
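The hunk above and the one below each add SYSTEM STOP MERGES before a heavy insert, presumably so background merges cannot create and delete parts while the test runs, which would make the final object-count check nondeterministic. A condensed, assumed composite of the pattern:

node.query(
    "CREATE TABLE s3_test (key UInt32, value String) "
    "Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3'"
)
node.query("SYSTEM STOP MERGES s3_test")  # keep the part set stable
node.query(
    "INSERT INTO s3_test SELECT * FROM generateRandom('key UInt32, value String') LIMIT 10000000"
)
# ... run the reads under test ...
check_no_objects_after_drop(cluster)  # bucket must be empty after DROP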
@@ -721,6 +702,7 @@ def test_cache_with_full_disk_space(cluster, node_name):
node.query(
"CREATE TABLE s3_test (key UInt32, value String) Engine=MergeTree() ORDER BY value SETTINGS storage_policy='s3_with_cache_and_jbod';"
)
node.query("SYSTEM STOP MERGES s3_test")
node.query(
"INSERT INTO s3_test SELECT number, toString(number) FROM numbers(100000000)"
)
@@ -739,7 +721,7 @@ def test_cache_with_full_disk_space(cluster, node_name):
assert node.contains_in_log(
"Insert into cache is skipped due to insufficient disk space"
)
node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
check_no_objects_after_drop(cluster, node_name=node_name)
@pytest.mark.parametrize("node_name", ["node"])
@@ -764,6 +746,7 @@ def test_store_cleanup_disk_s3(cluster, node_name):
"CREATE TABLE s3_test UUID '00000000-1000-4000-8000-000000000001' (n UInt64) Engine=MergeTree() ORDER BY n SETTINGS storage_policy='s3';"
)
node.query("INSERT INTO s3_test SELECT 1")
check_no_objects_after_drop(cluster)
@pytest.mark.parametrize("node_name", ["node"])
@@ -840,3 +823,5 @@ def test_cache_setting_compatibility(cluster, node_name):
node.query("SELECT * FROM s3_test FORMAT Null")
assert not node.contains_in_log("No such file or directory: Cache info:")
check_no_objects_after_drop(cluster)