Better test

This commit is contained in:
zhangxiao871 2022-01-28 18:00:52 +08:00
parent 78e2794ada
commit 7b51e0b8b6

View File

@ -1070,72 +1070,6 @@ def test_concurrent_alter_modify(start_cluster, name, engine):
def test_simple_replication_and_moves(start_cluster):
try:
for i, node in enumerate([node1, node2]):
node.query_with_retry("""
CREATE TABLE IF NOT EXISTS replicated_table_for_moves (
s1 String
) ENGINE = ReplicatedMergeTree('/clickhouse/replicated_table_for_moves', '{}')
ORDER BY tuple()
SETTINGS storage_policy='moving_jbod_with_external', old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=2
""".format(i + 1))
def insert(num):
for i in range(num):
node = random.choice([node1, node2])
data = [] # 1MB in total
for i in range(2):
data.append(get_random_string(512 * 1024)) # 500KB value
node.query_with_retry("INSERT INTO replicated_table_for_moves VALUES {}".format(
','.join(["('" + x + "')" for x in data])))
def optimize(num):
for i in range(num):
node = random.choice([node1, node2])
node.query_with_retry("OPTIMIZE TABLE replicated_table_for_moves FINAL")
p = Pool(60)
tasks = []
tasks.append(p.apply_async(insert, (20,)))
tasks.append(p.apply_async(optimize, (20,)))
for task in tasks:
task.get(timeout=60)
node1.query_with_retry("SYSTEM SYNC REPLICA replicated_table_for_moves", timeout=5)
node2.query_with_retry("SYSTEM SYNC REPLICA replicated_table_for_moves", timeout=5)
node1.query("SELECT COUNT() FROM replicated_table_for_moves") == "40\n"
node2.query("SELECT COUNT() FROM replicated_table_for_moves") == "40\n"
data = [] # 1MB in total
for i in range(2):
data.append(get_random_string(512 * 1024)) # 500KB value
time.sleep(3) # wait until old parts will be deleted
node1.query("SYSTEM STOP MERGES")
node2.query("SYSTEM STOP MERGES")
node1.query_with_retry(
"INSERT INTO replicated_table_for_moves VALUES {}".format(','.join(["('" + x + "')" for x in data])))
node2.query_with_retry(
"INSERT INTO replicated_table_for_moves VALUES {}".format(','.join(["('" + x + "')" for x in data])))
time.sleep(3) # nothing was moved
disks1 = get_used_disks_for_table(node1, "replicated_table_for_moves")
disks2 = get_used_disks_for_table(node2, "replicated_table_for_moves")
node1.query("SYSTEM START MERGES")
node2.query("SYSTEM START MERGES")
set(disks1) == set(["jbod1", "external"])
set(disks2) == set(["jbod1", "external"])
finally:
for node in [node1, node2]:
node.query("DROP TABLE IF EXISTS replicated_table_for_moves SYNC")
def test_simple_replication_and_moves_on_cluster(start_cluster):
try:
for i, node in enumerate([node1, node2]):
node.query_with_retry("""