Fix flaky test_ttl_move

- replace sleep() with waiting for a message in the log (see the sketch below)
- because of the previous point, use unique table names (the flaky checker runs each test multiple times)
- increase sleep timeouts in some places
Azat Khuzhin 2020-11-29 19:44:02 +03:00
parent 7c2a04d77d
commit ea64ec390f
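
A rough sketch of the log-waiting pattern from the first bullet. The test itself uses assert_logs_contain_with_retry from helpers.test_tools (wrapped by wait_parts_mover below); the helper sketched here, its parameters, and the contains_in_log() method are assumptions for illustration only, not the actual implementation:

import time

def wait_for_log_message(node, pattern, retry_count=40, sleep_time=0.5):
    # Poll the server log instead of doing a blind sleep(); retry_count=40
    # mirrors the value passed at the call sites in this diff (an assumption).
    for _ in range(retry_count):
        # contains_in_log() is assumed to search the instance's server log
        if node.contains_in_log(pattern):
            return
        time.sleep(sleep_time)
    raise AssertionError(f"'{pattern}' not found in log")

# Usage, instead of time.sleep(1):
#   wait_for_log_message(node1, f'default.{table}.*Removed part from old location')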


@@ -3,11 +3,16 @@ import string
import threading
import time
from multiprocessing.dummy import Pool
from helpers.test_tools import assert_logs_contain_with_retry
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
# FIXME: each sleep(1) is a time bomb; not only does this cause false positives,
# it also makes the test unreliable (i.e. assertions may be wrong due to timing issues).
# It seems that some SYSTEM query should be added to wait for those things instead of sleep.
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1',
@@ -58,6 +63,14 @@ def check_used_disks_with_retry(node, table_name, expected_disks, retries):
time.sleep(0.5)
return False
# Use a unique table name for the flaky checker, which runs tests multiple times
def unique_table_name(base_name):
return f'{base_name}_{int(time.time())}'
def wait_parts_mover(node, table, *args, **kwargs):
# wait for MergeTreePartsMover
assert_logs_contain_with_retry(node, f'default.{table}.*Removed part from old location', *args, **kwargs)
@pytest.mark.parametrize("name,engine,alter", [
("mt_test_rule_with_invalid_destination", "MergeTree()", 0),
@@ -68,6 +81,8 @@ def check_used_disks_with_retry(node, table_name, expected_disks, retries):
"ReplicatedMergeTree('/clickhouse/replicated_test_rule_with_invalid_destination', '1')", 1),
])
def test_rule_with_invalid_destination(started_cluster, name, engine, alter):
name = unique_table_name(name)
try:
def get_command(x, policy):
x = x or ""
@@ -129,6 +144,8 @@ def test_rule_with_invalid_destination(started_cluster, name, engine, alter):
"ReplicatedMergeTree('/clickhouse/replicated_test_inserts_to_disk_work', '1')", 1),
])
def test_inserts_to_disk_work(started_cluster, name, engine, positive):
name = unique_table_name(name)
try:
node1.query("""
CREATE TABLE {name} (
@@ -164,6 +181,8 @@ def test_inserts_to_disk_work(started_cluster, name, engine, positive):
"ReplicatedMergeTree('/clickhouse/test_moves_work_after_storage_policy_change', '1')"),
])
def test_moves_work_after_storage_policy_change(started_cluster, name, engine):
name = unique_table_name(name)
try:
node1.query("""
CREATE TABLE {name} (
@@ -184,10 +203,6 @@ def test_moves_work_after_storage_policy_change(started_cluster, name, engine):
wait_expire_1 = 12
wait_expire_2 = 4
time_1 = time.time() + wait_expire_1
time_2 = time.time() + wait_expire_1 + wait_expire_2
wait_expire_1_thread = threading.Thread(target=time.sleep, args=(wait_expire_1,))
wait_expire_1_thread.start()
data = [] # 10MB in total
for i in range(10):
@@ -197,8 +212,7 @@ def test_moves_work_after_storage_policy_change(started_cluster, name, engine):
used_disks = get_used_disks_for_table(node1, name)
assert set(used_disks) == {"jbod1"}
wait_expire_1_thread.join()
time.sleep(wait_expire_2 / 2)
wait_parts_mover(node1, name, retry_count=40)
used_disks = get_used_disks_for_table(node1, name)
assert set(used_disks) == {"external"}
@@ -218,6 +232,8 @@ def test_moves_work_after_storage_policy_change(started_cluster, name, engine):
"ReplicatedMergeTree('/clickhouse/replicated_test_moves_to_disk_work', '1')", 1),
])
def test_moves_to_disk_work(started_cluster, name, engine, positive):
name = unique_table_name(name)
try:
node1.query("""
CREATE TABLE {name} (
@@ -230,7 +246,7 @@ def test_moves_to_disk_work(started_cluster, name, engine, positive):
""".format(name=name, engine=engine))
wait_expire_1 = 12
wait_expire_2 = 4
wait_expire_2 = 20
time_1 = time.time() + wait_expire_1
time_2 = time.time() + wait_expire_1 + wait_expire_2
@@ -264,6 +280,8 @@ def test_moves_to_disk_work(started_cluster, name, engine, positive):
"ReplicatedMergeTree('/clickhouse/replicated_test_moves_to_volume_work', '1')"),
])
def test_moves_to_volume_work(started_cluster, name, engine):
name = unique_table_name(name)
try:
node1.query("""
CREATE TABLE {name} (
@@ -280,9 +298,6 @@ def test_moves_to_volume_work(started_cluster, name, engine):
wait_expire_1 = 10
time_1 = time.time() + wait_expire_1
wait_expire_1_thread = threading.Thread(target=time.sleep, args=(wait_expire_1,))
wait_expire_1_thread.start()
for p in range(2):
data = [] # 10MB in total
for i in range(5):
@@ -295,8 +310,7 @@ def test_moves_to_volume_work(started_cluster, name, engine):
used_disks = get_used_disks_for_table(node1, name)
assert set(used_disks) == {'jbod1', 'jbod2'}
wait_expire_1_thread.join()
time.sleep(1)
wait_parts_mover(node1, name, retry_count=40)
used_disks = get_used_disks_for_table(node1, name)
assert set(used_disks) == {"external"}
@@ -316,6 +330,8 @@ def test_moves_to_volume_work(started_cluster, name, engine):
"ReplicatedMergeTree('/clickhouse/replicated_test_inserts_to_volume_work', '1')", 1),
])
def test_inserts_to_volume_work(started_cluster, name, engine, positive):
name = unique_table_name(name)
try:
node1.query("""
CREATE TABLE {name} (
@@ -355,6 +371,8 @@ def test_inserts_to_volume_work(started_cluster, name, engine, positive):
"ReplicatedMergeTree('/clickhouse/replicated_test_moves_to_disk_eventually_work', '1')"),
])
def test_moves_to_disk_eventually_work(started_cluster, name, engine):
name = unique_table_name(name)
try:
name_temp = name + "_temp"
@@ -395,7 +413,8 @@ def test_moves_to_disk_eventually_work(started_cluster, name, engine):
node1.query("DROP TABLE {} NO DELAY".format(name_temp))
time.sleep(2)
wait_parts_mover(node1, name)
used_disks = get_used_disks_for_table(node1, name)
assert set(used_disks) == {"jbod2"}
@@ -407,7 +426,7 @@ def test_moves_to_disk_eventually_work(started_cluster, name, engine):
def test_replicated_download_ttl_info(started_cluster):
name = "test_replicated_ttl_info"
name = unique_table_name("test_replicated_ttl_info")
engine = "ReplicatedMergeTree('/clickhouse/test_replicated_download_ttl_info', '{replica}')"
try:
for i, node in enumerate((node1, node2), start=1):
@@ -426,6 +445,7 @@ def test_replicated_download_ttl_info(started_cluster):
node2.query("INSERT INTO {} (s1, d1) VALUES (randomPrintableASCII(1024*1024), toDateTime({}))".format(name, time.time() - 100))
assert set(get_used_disks_for_table(node2, name)) == {"external"}
time.sleep(1)
assert node1.query("SELECT count() FROM {}".format(name)).splitlines() == ["1"]
@@ -448,6 +468,8 @@ def test_replicated_download_ttl_info(started_cluster):
"ReplicatedMergeTree('/clickhouse/replicated_test_merges_to_disk_work', '1')", 1),
])
def test_merges_to_disk_work(started_cluster, name, engine, positive):
name = unique_table_name(name)
try:
node1.query("""
CREATE TABLE {name} (
@@ -463,7 +485,7 @@ def test_merges_to_disk_work(started_cluster, name, engine, positive):
node1.query("SYSTEM STOP MOVES {}".format(name))
wait_expire_1 = 16
wait_expire_2 = 4
wait_expire_2 = 20
time_1 = time.time() + wait_expire_1
time_2 = time.time() + wait_expire_1 + wait_expire_2
@@ -490,7 +512,6 @@ def test_merges_to_disk_work(started_cluster, name, engine, positive):
node1.query("SYSTEM START MERGES {}".format(name))
node1.query("OPTIMIZE TABLE {}".format(name))
time.sleep(1)
used_disks = get_used_disks_for_table(node1, name)
assert set(used_disks) == {"external" if positive else "jbod1"}
assert "1" == node1.query(
@@ -508,6 +529,8 @@ def test_merges_to_disk_work(started_cluster, name, engine, positive):
"ReplicatedMergeTree('/clickhouse/replicated_test_merges_with_full_disk_work', '1')"),
])
def test_merges_with_full_disk_work(started_cluster, name, engine):
name = unique_table_name(name)
try:
name_temp = name + "_temp"
@@ -581,6 +604,8 @@ def test_merges_with_full_disk_work(started_cluster, name, engine):
"ReplicatedMergeTree('/clickhouse/replicated_test_moves_after_merges_work', '1')", 1),
])
def test_moves_after_merges_work(started_cluster, name, engine, positive):
name = unique_table_name(name)
try:
node1.query("""
CREATE TABLE {name} (
@@ -593,7 +618,7 @@ def test_moves_after_merges_work(started_cluster, name, engine, positive):
""".format(name=name, engine=engine))
wait_expire_1 = 16
wait_expire_2 = 4
wait_expire_2 = 20
time_1 = time.time() + wait_expire_1
time_2 = time.time() + wait_expire_1 + wait_expire_2
@@ -610,7 +635,6 @@ def test_moves_after_merges_work(started_cluster, name, engine, positive):
"INSERT INTO {} (s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))
node1.query("OPTIMIZE TABLE {}".format(name))
time.sleep(1)
used_disks = get_used_disks_for_table(node1, name)
assert set(used_disks) == {"jbod1"}
@@ -644,6 +668,8 @@ def test_moves_after_merges_work(started_cluster, name, engine, positive):
"ReplicatedMergeTree('/clickhouse/replicated_test_moves_after_alter_work', '1')", 1, "TO DISK 'external'"),
])
def test_ttls_do_not_work_after_alter(started_cluster, name, engine, positive, bar):
name = unique_table_name(name)
try:
node1.query("""
CREATE TABLE {name} (
@@ -683,6 +709,8 @@ def test_ttls_do_not_work_after_alter(started_cluster, name, engine, positive, bar):
"ReplicatedMergeTree('/clickhouse/test_materialize_ttl_in_partition', '1')"),
])
def test_materialize_ttl_in_partition(started_cluster, name, engine):
name = unique_table_name(name)
try:
node1.query("""
CREATE TABLE {name} (
@@ -702,8 +730,6 @@ def test_materialize_ttl_in_partition(started_cluster, name, engine):
node1.query(
"INSERT INTO {} (p1, s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))
time.sleep(0.5)
used_disks = get_used_disks_for_table(node1, name)
assert set(used_disks) == {"jbod1"}
@@ -713,7 +739,7 @@ def test_materialize_ttl_in_partition(started_cluster, name, engine):
d1 TO DISK 'external' SETTINGS materialize_ttl_after_modify = 0
""".format(name=name))
time.sleep(0.5)
time.sleep(3)
used_disks = get_used_disks_for_table(node1, name)
assert set(used_disks) == {"jbod1"}
@@ -728,7 +754,7 @@ def test_materialize_ttl_in_partition(started_cluster, name, engine):
MATERIALIZE TTL IN PARTITION 4
""".format(name=name))
time.sleep(0.5)
time.sleep(3)
used_disks_sets = []
for i in range(len(data)):
@@ -751,6 +777,8 @@ def test_materialize_ttl_in_partition(started_cluster, name, engine):
"ReplicatedMergeTree('/clickhouse/replicated_test_alter_multiple_ttls_negative', '1')", False),
])
def test_alter_multiple_ttls(started_cluster, name, engine, positive):
name = unique_table_name(name)
"""Copyright 2019, Altinity LTD
Licensed under the Apache License, Version 2.0 (the "License");
@@ -845,6 +873,8 @@ limitations under the License."""
"ReplicatedMergeTree('/clickhouse/concurrently_altering_ttl_replicated_mt', '1')",),
])
def test_concurrent_alter_with_ttl_move(started_cluster, name, engine):
name = unique_table_name(name)
try:
node1.query("""
CREATE TABLE {name} (
@@ -951,6 +981,8 @@ def test_concurrent_alter_with_ttl_move(started_cluster, name, engine):
("test_double_move_while_select_positive", 1),
])
def test_double_move_while_select(started_cluster, name, positive):
name = unique_table_name(name)
try:
node1.query("""
CREATE TABLE {name} (
@@ -990,7 +1022,7 @@ def test_double_move_while_select(started_cluster, name, positive):
node1.query(
"INSERT INTO {name} VALUES (4, randomPrintableASCII(9*1024*1024))".format(name=name))
time.sleep(1)
wait_parts_mover(node1, name, retry_count=40)
# If SELECT locked old part on external, move shall fail.
assert node1.query(
@@ -1014,6 +1046,8 @@ def test_double_move_while_select(started_cluster, name, positive):
"ReplicatedMergeTree('/clickhouse/replicated_test_alter_with_merge_work', '1')", 1),
])
def test_alter_with_merge_work(started_cluster, name, engine, positive):
name = unique_table_name(name)
"""Copyright 2019, Altinity LTD
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -1103,6 +1137,8 @@ def test_disabled_ttl_move_on_insert(started_cluster, name, dest_type, engine):
("replicated_mt_test_disabled_ttl_move_on_insert_work", "VOLUME", "ReplicatedMergeTree('/clickhouse/replicated_test_disabled_ttl_move_on_insert_work', '1')"),
])
def test_disabled_ttl_move_on_insert(started_cluster, name, dest_type, engine):
name = unique_table_name(name)
try:
node1.query("""
CREATE TABLE {name} (