make it possible to rerun test_storage_delta and test_checking_s3_blobs_paranoid

Michael Stetsyuk 2024-07-31 14:44:54 +01:00
parent 8b52d7b711
commit 31c142a96d
2 changed files with 31 additions and 0 deletions
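
Both files get the same rerun-safety treatment: table setup becomes idempotent (DROP TABLE IF EXISTS before CREATE TABLE) and each Delta Lake test cleans up the local files and Spark tables it produced. A minimal sketch of the pattern, with names taken from the diff below (illustrative, not the literal change):

    import glob
    import os

    def remove_local_directory_contents(local_path):
        # delete what a previous run left directly under local_path
        # (without recursive=True, "**" matches a single level, like "*")
        for local_file in glob.glob(local_path + "/**"):
            os.unlink(local_file)

    # in each test:
    #   setup    -> DROP TABLE IF EXISTS <table>; CREATE TABLE <table> ...
    #   teardown -> remove_local_directory_contents(f"/{TABLE_NAME}")
    #               spark.sql(f"DROP TABLE {TABLE_NAME}")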

@@ -61,6 +61,7 @@ def test_upload_after_check_works(cluster, broken_s3):
    node.query(
        """
        DROP TABLE IF EXISTS s3_upload_after_check_works;
        CREATE TABLE s3_upload_after_check_works (
            id Int64,
            data String
@@ -631,6 +632,7 @@ def test_no_key_found_disk(cluster, broken_s3):
    node.query(
        """
        DROP TABLE IF EXISTS no_key_found_disk;
        CREATE TABLE no_key_found_disk (
            id Int64
        ) ENGINE=MergeTree()

@@ -52,6 +52,11 @@ def get_spark():
    return builder.master("local").getOrCreate()


def remove_local_directory_contents(local_path):
    for local_file in glob.glob(local_path + "/**"):
        os.unlink(local_file)


@pytest.fixture(scope="module")
def started_cluster():
    try:
@@ -169,6 +174,9 @@ def test_single_log_file(started_cluster):
        inserted_data
    )
    os.unlink(parquet_data_path)
    remove_local_directory_contents(f"/{TABLE_NAME}")


def test_partition_by(started_cluster):
    instance = started_cluster.instances["node1"]
@@ -191,6 +199,7 @@ def test_partition_by(started_cluster):
    create_delta_table(instance, TABLE_NAME)
    assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 10
    remove_local_directory_contents(f"/{TABLE_NAME}")


def test_checkpoint(started_cluster):
    instance = started_cluster.instances["node1"]
@@ -266,6 +275,9 @@ def test_checkpoint(started_cluster):
        ).strip()
    )
    remove_local_directory_contents(f"/{TABLE_NAME}")
    spark.sql(f"DROP TABLE {TABLE_NAME}")


def test_multiple_log_files(started_cluster):
    instance = started_cluster.instances["node1"]
@@ -304,6 +316,8 @@ def test_multiple_log_files(started_cluster):
        "SELECT number, toString(number + 1) FROM numbers(200)"
    )
    remove_local_directory_contents(f"/{TABLE_NAME}")


def test_metadata(started_cluster):
    instance = started_cluster.instances["node1"]
@@ -337,6 +351,9 @@ def test_metadata(started_cluster):
    create_delta_table(instance, TABLE_NAME)
    assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
    os.unlink(parquet_data_path)
    remove_local_directory_contents(f"/{TABLE_NAME}")


def test_types(started_cluster):
    TABLE_NAME = "test_types"
@@ -409,6 +426,9 @@ def test_types(started_cluster):
        ]
    )
    remove_local_directory_contents(f"/{result_file}")
    spark.sql(f"DROP TABLE {TABLE_NAME}")


def test_restart_broken(started_cluster):
    instance = started_cluster.instances["node1"]
@@ -470,6 +490,9 @@ def test_restart_broken(started_cluster):
    assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
    os.unlink(parquet_data_path)
    remove_local_directory_contents(f"/{TABLE_NAME}")


def test_restart_broken_table_function(started_cluster):
    instance = started_cluster.instances["node1"]
@@ -524,6 +547,9 @@ def test_restart_broken_table_function(started_cluster):
    assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
    os.unlink(parquet_data_path)
    remove_local_directory_contents(f"/{TABLE_NAME}")


def test_partition_columns(started_cluster):
    instance = started_cluster.instances["node1"]
@@ -721,3 +747,6 @@ SELECT * FROM deltaLake('http://{started_cluster.minio_ip}:{started_cluster.mini
        )
        == 1
    )
    remove_local_directory_contents(f"/{TABLE_NAME}")
    spark.sql(f"DROP TABLE {TABLE_NAME}")