Merge pull request #44921 from ClickHouse/fix-flaky-test-5

Fix flaky test `test_lost_part`
This commit is contained in:
Ilya Yatsishin 2023-01-05 10:28:07 +01:00 committed by GitHub
commit 5d0f3e7819
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -26,9 +26,7 @@ def start_cluster():
def remove_part_from_disk(node, table, part_name):
part_path = node.query(
"SELECT path FROM system.parts WHERE table = '{}' and name = '{}'".format(
table, part_name
)
f"SELECT path FROM system.parts WHERE table = '{table}' and name = '{part_name}'"
).strip()
if not part_path:
raise Exception("Part " + part_name + "doesn't exist")
@ -38,29 +36,30 @@ def remove_part_from_disk(node, table, part_name):
def test_lost_part_same_replica(start_cluster):
node1.query("DROP TABLE IF EXISTS mt0 SYNC")
node2.query("DROP TABLE IF EXISTS mt0 SYNC")
for node in [node1, node2]:
node.query(
"CREATE TABLE mt0 (id UInt64, date Date) ENGINE ReplicatedMergeTree('/clickhouse/tables/t', '{}') ORDER BY tuple() PARTITION BY date "
"SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1".format(
node.name
)
f"CREATE TABLE mt0 (id UInt64, date Date) ENGINE ReplicatedMergeTree('/clickhouse/tables/t', '{node.name}') ORDER BY tuple() PARTITION BY date "
"SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1"
)
node1.query("SYSTEM STOP MERGES mt0")
node2.query("SYSTEM STOP REPLICATION QUEUES")
for i in range(5):
node1.query("INSERT INTO mt0 VALUES ({}, toDate('2020-10-01'))".format(i))
node1.query(f"INSERT INTO mt0 VALUES ({i}, toDate('2020-10-01'))")
for i in range(20):
parts_to_merge = node1.query(
"SELECT parts_to_merge FROM system.replication_queue"
"SELECT parts_to_merge FROM system.replication_queue WHERE table='mt0' AND length(parts_to_merge) > 0"
)
if parts_to_merge:
parts_list = list(sorted(ast.literal_eval(parts_to_merge)))
print("Got parts list", parts_list)
if len(parts_list) < 3:
raise Exception("Got too small parts list {}".format(parts_list))
raise Exception(f"Got too small parts list {parts_list}")
break
time.sleep(1)
@ -90,9 +89,7 @@ def test_lost_part_same_replica(start_cluster):
assert node1.contains_in_log(
"Created empty part"
), "Seems like empty part {} is not created or log message changed".format(
victim_part_from_the_middle
)
), f"Seems like empty part {victim_part_from_the_middle} is not created or log message changed"
assert node1.query("SELECT COUNT() FROM mt0") == "4\n"
@ -101,25 +98,29 @@ def test_lost_part_same_replica(start_cluster):
assert_eq_with_retry(node2, "SELECT COUNT() FROM mt0", "4")
assert_eq_with_retry(node2, "SELECT COUNT() FROM system.replication_queue", "0")
node1.query("DROP TABLE IF EXISTS mt0 SYNC")
node2.query("DROP TABLE IF EXISTS mt0 SYNC")
def test_lost_part_other_replica(start_cluster):
node1.query("DROP TABLE IF EXISTS mt1 SYNC")
node2.query("DROP TABLE IF EXISTS mt1 SYNC")
for node in [node1, node2]:
node.query(
"CREATE TABLE mt1 (id UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/t1', '{}') ORDER BY tuple() "
"SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1".format(
node.name
)
f"CREATE TABLE mt1 (id UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/t1', '{node.name}') ORDER BY tuple() "
"SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1"
)
node1.query("SYSTEM STOP MERGES mt1")
node2.query("SYSTEM STOP REPLICATION QUEUES")
for i in range(5):
node1.query("INSERT INTO mt1 VALUES ({})".format(i))
node1.query(f"INSERT INTO mt1 VALUES ({i})")
for i in range(20):
parts_to_merge = node1.query(
"SELECT parts_to_merge FROM system.replication_queue"
"SELECT parts_to_merge FROM system.replication_queue WHERE table='mt1' AND length(parts_to_merge) > 0"
)
if parts_to_merge:
parts_list = list(sorted(ast.literal_eval(parts_to_merge)))
@ -166,28 +167,34 @@ def test_lost_part_other_replica(start_cluster):
assert_eq_with_retry(node1, "SELECT COUNT() FROM mt1", "4")
assert_eq_with_retry(node1, "SELECT COUNT() FROM system.replication_queue", "0")
node1.query("DROP TABLE IF EXISTS mt1 SYNC")
node2.query("DROP TABLE IF EXISTS mt1 SYNC")
def test_lost_part_mutation(start_cluster):
node1.query("DROP TABLE IF EXISTS mt2 SYNC")
node2.query("DROP TABLE IF EXISTS mt2 SYNC")
for node in [node1, node2]:
node.query(
"CREATE TABLE mt2 (id UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/t2', '{}') ORDER BY tuple() "
"SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1".format(
node.name
)
f"CREATE TABLE mt2 (id UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/t2', '{node.name}') ORDER BY tuple() "
"SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1"
)
node1.query("SYSTEM STOP MERGES mt2")
node2.query("SYSTEM STOP REPLICATION QUEUES")
for i in range(2):
node1.query("INSERT INTO mt2 VALUES ({})".format(i))
node1.query(f"INSERT INTO mt2 VALUES ({i})")
node1.query(
"ALTER TABLE mt2 UPDATE id = 777 WHERE 1", settings={"mutations_sync": "0"}
)
for i in range(20):
parts_to_mutate = node1.query("SELECT count() FROM system.replication_queue")
parts_to_mutate = node1.query(
"SELECT count() FROM system.replication_queue WHERE table='mt2'"
)
# two mutations for both replicas
if int(parts_to_mutate) == 4:
break
@ -223,21 +230,25 @@ def test_lost_part_mutation(start_cluster):
assert_eq_with_retry(node2, "SELECT SUM(id) FROM mt2", "777")
assert_eq_with_retry(node2, "SELECT COUNT() FROM system.replication_queue", "0")
node1.query("DROP TABLE IF EXISTS mt2 SYNC")
node2.query("DROP TABLE IF EXISTS mt2 SYNC")
def test_lost_last_part(start_cluster):
node1.query("DROP TABLE IF EXISTS mt3 SYNC")
node2.query("DROP TABLE IF EXISTS mt3 SYNC")
for node in [node1, node2]:
node.query(
"CREATE TABLE mt3 (id UInt64, p String) ENGINE ReplicatedMergeTree('/clickhouse/tables/t3', '{}') "
"ORDER BY tuple() PARTITION BY p SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1".format(
node.name
)
f"CREATE TABLE mt3 (id UInt64, p String) ENGINE ReplicatedMergeTree('/clickhouse/tables/t3', '{node.name}') "
"ORDER BY tuple() PARTITION BY p SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1"
)
node1.query("SYSTEM STOP MERGES mt3")
node2.query("SYSTEM STOP REPLICATION QUEUES")
for i in range(1):
node1.query("INSERT INTO mt3 VALUES ({}, 'x')".format(i))
node1.query(f"INSERT INTO mt3 VALUES ({i}, 'x')")
# actually not important
node1.query(
@ -245,7 +256,7 @@ def test_lost_last_part(start_cluster):
)
partition_id = node1.query("select partitionId('x')").strip()
remove_part_from_disk(node1, "mt3", "{}_0_0_0".format(partition_id))
remove_part_from_disk(node1, "mt3", f"{partition_id}_0_0_0")
# other way to detect broken parts
node1.query("CHECK TABLE mt3")
@ -253,7 +264,9 @@ def test_lost_last_part(start_cluster):
node1.query("SYSTEM START MERGES mt3")
for i in range(10):
result = node1.query("SELECT count() FROM system.replication_queue")
result = node1.query(
"SELECT count() FROM system.replication_queue WHERE table='mt3'"
)
assert int(result) <= 2, "Have a lot of entries in queue {}".format(
node1.query("SELECT * FROM system.replication_queue FORMAT Vertical")
)
@ -269,7 +282,10 @@ def test_lost_last_part(start_cluster):
else:
assert False, "Don't have required messages in node1 log"
node1.query("ALTER TABLE mt3 DROP PARTITION ID '{}'".format(partition_id))
node1.query(f"ALTER TABLE mt3 DROP PARTITION ID '{partition_id}'")
assert_eq_with_retry(node1, "SELECT COUNT() FROM mt3", "0")
assert_eq_with_retry(node1, "SELECT COUNT() FROM system.replication_queue", "0")
node1.query("DROP TABLE IF EXISTS mt3 SYNC")
node2.query("DROP TABLE IF EXISTS mt3 SYNC")