diff --git a/tests/integration/test_alter_moving_garbage/test.py b/tests/integration/test_alter_moving_garbage/test.py index dc3f6c35ead..330df3ac490 100644 --- a/tests/integration/test_alter_moving_garbage/test.py +++ b/tests/integration/test_alter_moving_garbage/test.py @@ -11,6 +11,7 @@ from helpers.cluster import ClickHouseCluster # two replicas in remote_servers.xml REPLICA_COUNT = 2 + @pytest.fixture(scope="module") def cluster(): try: @@ -82,7 +83,7 @@ def test_create_table( additional_settings = {} - # different names for logs readability + # Different names for logs readability table_name = "test_table" if allow_remote_fs_zero_copy_replication: table_name = "test_table_zero_copy" @@ -99,17 +100,7 @@ def test_create_table( f"INSERT INTO {table_name} SELECT toDate('{partition}'), number as id, toString(sipHash64(number, {i})) FROM numbers(10_000)" ) - def check_count(): - if replicated_engine: - return random.choice(nodes).query_with_retry( - f"SELECT countDistinct(dt, data) FROM clusterAllReplicas(test_cluster, default.{table_name}) WHERE id % 100 = 0" - ) - else: - return random.choice(nodes).query( - f"SELECT countDistinct(dt, data) FROM {table_name} WHERE id % 100 = 0" - ) - - assert check_count() == "1000\n" + # Run ALTER in parallel with moving parts stop_alter = False @@ -118,9 +109,14 @@ def test_create_table( for d in range(1, 100): if stop_alter: break - # I managed to reproduce issue with DELETE, but it can be any other lightweight mutation - # Do not delete rows with id % 100 = 0, because they are used in check_count to check that data is not corrupted + + # Some lightweight mutation should change moving part before it is swapped, then we will have to cleanup it. + # Messages `Failed to swap {}. Active part doesn't exist` should appear in logs. + # + # I managed to reproduce issue with DELETE (`ALTER TABLE {table_name} ADD/DROP COLUMN` also works on real s3 instead of minio) + # Note: do not delete rows with id % 100 = 0, because they are used in `check_count` to use them in check that data is not corrupted random.choice(nodes).query(f"DELETE FROM {table_name} WHERE id % 100 = {d}") + time.sleep(0.1) alter_thread = threading.Thread(target=alter) @@ -143,4 +139,17 @@ def test_create_table( stop_alter = True alter_thread.join() - assert check_count() == "1000\n" + # Check that no data was lost + + data_digest = None + if replicated_engine: + # We don't know what data was replicated, so we need to check all replicas and take unique values + data_digest = random.choice(nodes).query_with_retry( + f"SELECT countDistinct(dt, data) FROM clusterAllReplicas(test_cluster, default.{table_name}) WHERE id % 100 == 0" + ) + else: + data_digest = random.choice(nodes).query( + f"SELECT countDistinct(dt, data) FROM {table_name} WHERE id % 100 == 0" + ) + + assert data_digest == "1000\n"