Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-22 15:42:02 +00:00)
Fix flaky test test_lost_part
This commit is contained in:
parent df363f444e
commit 2993cf3cba
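The tests in this hunk rely on a two-replica cluster fixture (node1, node2, start_cluster) and on helpers such as assert_eq_with_retry that are defined earlier in the same test file, outside this diff. Below is a minimal sketch of that surrounding setup, assuming the usual ClickHouse integration-test layout; the config file name is illustrative, not taken from this commit.

import ast
import random
import time

import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry

cluster = ClickHouseCluster(__file__)
# Two replicas of the same ReplicatedMergeTree tables, coordinated via ZooKeeper.
node1 = cluster.add_instance("node1", main_configs=["configs/merge_tree.xml"], with_zookeeper=True)
node2 = cluster.add_instance("node2", main_configs=["configs/merge_tree.xml"], with_zookeeper=True)


@pytest.fixture(scope="module")
def start_cluster():
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()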
@@ -39,257 +39,261 @@ def test_lost_part_same_replica(start_cluster):
def test_lost_part_same_replica(start_cluster):
    node1.query("DROP TABLE IF EXISTS mt0 SYNC")
    node2.query("DROP TABLE IF EXISTS mt0 SYNC")

    try:
        for node in [node1, node2]:
            node.query(
                f"CREATE TABLE mt0 (id UInt64, date Date) ENGINE ReplicatedMergeTree('/clickhouse/tables/t', '{node.name}') ORDER BY tuple() PARTITION BY date "
                "SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1, cleanup_thread_preferred_points_per_iteration=0,"
                "merge_selecting_sleep_ms=100, max_merge_selecting_sleep_ms=1000"
            )

        node1.query("SYSTEM STOP MERGES mt0")
        node2.query("SYSTEM STOP REPLICATION QUEUES")

        for i in range(5):
            node1.query(f"INSERT INTO mt0 VALUES ({i}, toDate('2020-10-01'))")

        for i in range(20):
            parts_to_merge = node1.query(
                "SELECT parts_to_merge FROM system.replication_queue WHERE table='mt0' AND length(parts_to_merge) > 0"
            )
            if parts_to_merge:
                parts_list = list(sorted(ast.literal_eval(parts_to_merge)))
                print("Got parts list", parts_list)
                if len(parts_list) < 3:
                    raise Exception(f"Got too small parts list {parts_list}")
                break
            time.sleep(1)

        victim_part_from_the_middle = random.choice(parts_list[1:-1])
        print("Will corrupt part", victim_part_from_the_middle)

        remove_part_from_disk(node1, "mt0", victim_part_from_the_middle)

        node1.query("DETACH TABLE mt0")

        node1.query("ATTACH TABLE mt0")

        node1.query("SYSTEM START MERGES mt0")
        res, err = node1.query_and_get_answer_with_error("SYSTEM SYNC REPLICA mt0")
        print("result: ", res)
        print("error: ", err)

        for i in range(10):
            result = node1.query("SELECT count() FROM system.replication_queue")
            if int(result) == 0:
                break
            time.sleep(1)
        else:
            assert False, "Still have something in replication queue:\n" + node1.query(
                "SELECT count() FROM system.replication_queue FORMAT Vertical"
            )

        assert node1.contains_in_log(
            "Created empty part"
        ), f"Seems like empty part {victim_part_from_the_middle} is not created or log message changed"

        assert node1.query("SELECT COUNT() FROM mt0") == "4\n"

        node2.query("SYSTEM START REPLICATION QUEUES")

        assert_eq_with_retry(node2, "SELECT COUNT() FROM mt0", "4")
        assert_eq_with_retry(node2, "SELECT COUNT() FROM system.replication_queue", "0")
    finally:
        node1.query("DROP TABLE IF EXISTS mt0 SYNC")
        node2.query("DROP TABLE IF EXISTS mt0 SYNC")

def test_lost_part_other_replica(start_cluster):
    node1.query("DROP TABLE IF EXISTS mt1 SYNC")
    node2.query("DROP TABLE IF EXISTS mt1 SYNC")

    try:
        for node in [node1, node2]:
            node.query(
                f"CREATE TABLE mt1 (id UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/t1', '{node.name}') ORDER BY tuple() "
                "SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1, cleanup_thread_preferred_points_per_iteration=0,"
                "merge_selecting_sleep_ms=100, max_merge_selecting_sleep_ms=1000"
            )

        node1.query("SYSTEM STOP MERGES mt1")
        node2.query("SYSTEM STOP REPLICATION QUEUES")

        for i in range(5):
            node1.query(f"INSERT INTO mt1 VALUES ({i})")

        for i in range(20):
            parts_to_merge = node1.query(
                "SELECT parts_to_merge FROM system.replication_queue WHERE table='mt1' AND length(parts_to_merge) > 0"
            )
            if parts_to_merge:
                parts_list = list(sorted(ast.literal_eval(parts_to_merge)))
                print("Got parts list", parts_list)
                if len(parts_list) < 3:
                    raise Exception("Got too small parts list {}".format(parts_list))
                break
            time.sleep(1)

        victim_part_from_the_middle = random.choice(parts_list[1:-1])
        print("Will corrupt part", victim_part_from_the_middle)

        remove_part_from_disk(node1, "mt1", victim_part_from_the_middle)

        # other way to detect broken parts
        node1.query("CHECK TABLE mt1")

        node2.query("SYSTEM START REPLICATION QUEUES")
        res, err = node1.query_and_get_answer_with_error("SYSTEM SYNC REPLICA mt1")
        print("result: ", res)
        print("error: ", err)

        for i in range(10):
            result = node2.query("SELECT count() FROM system.replication_queue")
            if int(result) == 0:
                break
            time.sleep(1)
        else:
            assert False, "Still have something in replication queue:\n" + node2.query(
                "SELECT * FROM system.replication_queue FORMAT Vertical"
            )

        assert node1.contains_in_log(
            "Created empty part"
        ), "Seems like empty part {} is not created or log message changed".format(
            victim_part_from_the_middle
        )

        assert_eq_with_retry(node2, "SELECT COUNT() FROM mt1", "4")
        assert_eq_with_retry(node2, "SELECT COUNT() FROM system.replication_queue", "0")

        node1.query("SYSTEM START MERGES mt1")

        assert_eq_with_retry(node1, "SELECT COUNT() FROM mt1", "4")
        assert_eq_with_retry(node1, "SELECT COUNT() FROM system.replication_queue", "0")
    finally:
        node1.query("DROP TABLE IF EXISTS mt1 SYNC")
        node2.query("DROP TABLE IF EXISTS mt1 SYNC")

def test_lost_part_mutation(start_cluster):
    node1.query("DROP TABLE IF EXISTS mt2 SYNC")
    node2.query("DROP TABLE IF EXISTS mt2 SYNC")

    try:
        for node in [node1, node2]:
            node.query(
                f"CREATE TABLE mt2 (id UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/t2', '{node.name}') ORDER BY tuple() "
                "SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1, cleanup_thread_preferred_points_per_iteration=0,"
                "merge_selecting_sleep_ms=100, max_merge_selecting_sleep_ms=1000"
            )

        node1.query("SYSTEM STOP MERGES mt2")
        node2.query("SYSTEM STOP REPLICATION QUEUES")

        for i in range(2):
            node1.query(f"INSERT INTO mt2 VALUES ({i})")

        node1.query(
            "ALTER TABLE mt2 UPDATE id = 777 WHERE 1", settings={"mutations_sync": "0"}
        )

        for i in range(20):
            parts_to_mutate = node1.query(
                "SELECT count() FROM system.replication_queue WHERE table='mt2'"
            )
            # two mutations for both replicas
            if int(parts_to_mutate) == 4:
                break
            time.sleep(1)

        remove_part_from_disk(node1, "mt2", "all_1_1_0")

        # other way to detect broken parts
        node1.query("CHECK TABLE mt2")

        node1.query("SYSTEM START MERGES mt2")
        res, err = node1.query_and_get_answer_with_error("SYSTEM SYNC REPLICA mt2")
        print("result: ", res)
        print("error: ", err)

        for i in range(10):
            result = node1.query("SELECT count() FROM system.replication_queue")
            if int(result) == 0:
                break
            time.sleep(1)
        else:
            assert False, "Still have something in replication queue:\n" + node1.query(
                "SELECT * FROM system.replication_queue FORMAT Vertical"
            )

        assert_eq_with_retry(node1, "SELECT COUNT() FROM mt2", "1")
        assert_eq_with_retry(node1, "SELECT SUM(id) FROM mt2", "777")
        assert_eq_with_retry(node1, "SELECT COUNT() FROM system.replication_queue", "0")

        node2.query("SYSTEM START REPLICATION QUEUES")

        assert_eq_with_retry(node2, "SELECT COUNT() FROM mt2", "1")
        assert_eq_with_retry(node2, "SELECT SUM(id) FROM mt2", "777")
        assert_eq_with_retry(node2, "SELECT COUNT() FROM system.replication_queue", "0")
    finally:
        node1.query("DROP TABLE IF EXISTS mt2 SYNC")
        node2.query("DROP TABLE IF EXISTS mt2 SYNC")

def test_lost_last_part(start_cluster):
    node1.query("DROP TABLE IF EXISTS mt3 SYNC")
    node2.query("DROP TABLE IF EXISTS mt3 SYNC")

    try:
        for node in [node1, node2]:
            node.query(
                f"CREATE TABLE mt3 (id UInt64, p String) ENGINE ReplicatedMergeTree('/clickhouse/tables/t3', '{node.name}') "
                "ORDER BY tuple() PARTITION BY p SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1, cleanup_thread_preferred_points_per_iteration=0,"
                "merge_selecting_sleep_ms=100, max_merge_selecting_sleep_ms=1000"
            )

        node1.query("SYSTEM STOP MERGES mt3")
        node2.query("SYSTEM STOP REPLICATION QUEUES")

        for i in range(1):
            node1.query(f"INSERT INTO mt3 VALUES ({i}, 'x')")

        # actually not important
        node1.query(
            "ALTER TABLE mt3 UPDATE id = 777 WHERE 1", settings={"mutations_sync": "0"}
        )

        partition_id = node1.query("select partitionId('x')").strip()
        remove_part_from_disk(node1, "mt3", f"{partition_id}_0_0_0")

        # other way to detect broken parts
        node1.query("CHECK TABLE mt3")

        node1.query("SYSTEM START MERGES mt3")

        for i in range(10):
            result = node1.query(
                "SELECT count() FROM system.replication_queue WHERE table='mt3'"
            )
            assert int(result) <= 2, "Have a lot of entries in queue {}".format(
                node1.query("SELECT * FROM system.replication_queue FORMAT Vertical")
            )
            if node1.contains_in_log("Cannot create empty part") and node1.contains_in_log(
                "DROP/DETACH PARTITION"
            ):
                break
            if node1.contains_in_log(
                "Created empty part 8b8f0fede53df97513a9fb4cb19dc1e4_0_0_0 "
            ):
                break
            time.sleep(1)
        else:
            assert False, "Don't have required messages in node1 log"

        node1.query(f"ALTER TABLE mt3 DROP PARTITION ID '{partition_id}'")

        assert_eq_with_retry(node1, "SELECT COUNT() FROM mt3", "0")
        assert_eq_with_retry(node1, "SELECT COUNT() FROM system.replication_queue", "0")
    finally:
        node1.query("DROP TABLE IF EXISTS mt3 SYNC")
        node2.query("DROP TABLE IF EXISTS mt3 SYNC")
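For context, remove_part_from_disk is a helper defined near the top of the same test file and is not part of this hunk. The sketch below shows roughly what it does, assuming the usual implementation in this test; the exact query text and container flags are assumptions, not taken from this commit.

def remove_part_from_disk(node, table, part_name):
    # Find where the part lives on disk and delete its files behind ClickHouse's back,
    # simulating a "lost" part that the replica later has to recover from.
    part_path = node.query(
        f"SELECT path FROM system.parts WHERE table = '{table}' AND name = '{part_name}'"
    ).strip()
    if not part_path:
        raise Exception(f"Part {part_name} doesn't exist")
    node.exec_in_container(
        ["bash", "-c", f"rm -r {part_path}/*"], privileged=True
    )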