diff --git a/dbms/tests/integration/test_ttl_move/test.py b/dbms/tests/integration/test_ttl_move/test.py
index 2312c0cdfad..ae76fe296e9 100644
--- a/dbms/tests/integration/test_ttl_move/test.py
+++ b/dbms/tests/integration/test_ttl_move/test.py
@@ -45,11 +45,13 @@ def get_used_disks_for_table(node, table_name):
     return node.query("select disk_name from system.parts where table == '{}' and active=1 order by modification_time".format(table_name)).strip().split('\n')
 
 
-@pytest.mark.parametrize("name,engine", [
-    ("mt_test_inserts_to_disk_work","MergeTree()"),
-    ("replicated_mt_test_inserts_to_disk_work","ReplicatedMergeTree('/clickhouse/replicated_test_inserts_to_disk_work', '1')",),
+@pytest.mark.parametrize("name,engine,positive", [
+    ("mt_test_inserts_to_disk_do_not_work","MergeTree()",0),
+    ("replicated_mt_test_inserts_to_disk_do_not_work","ReplicatedMergeTree('/clickhouse/replicated_test_inserts_to_disk_do_not_work', '1')",0),
+    ("mt_test_inserts_to_disk_work","MergeTree()",1),
+    ("replicated_mt_test_inserts_to_disk_work","ReplicatedMergeTree('/clickhouse/replicated_test_inserts_to_disk_work', '1')",1),
 ])
-def test_inserts_to_disk_work(started_cluster, name, engine):
+def test_inserts_to_disk_work(started_cluster, name, engine, positive):
     try:
         node1.query("""
             CREATE TABLE {name} (
@@ -63,11 +65,11 @@ def test_inserts_to_disk_work(started_cluster, name, engine):
 
         data = [] # 10MB in total
         for i in range(10):
-            data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time.time()-2))) # 1MB row
+            data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time.time()-2 if i > 0 or positive else time.time()+2))) # 1MB row
 
         node1.query("INSERT INTO {} (s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))
         used_disks = get_used_disks_for_table(node1, name)
-        assert set(used_disks) == {"external"}
+        assert set(used_disks) == {"external" if positive else "jbod1"}
 
         assert node1.query("SELECT count() FROM {}".format(name=name)).strip() == "10"
 
@@ -75,11 +77,13 @@
         node1.query("DROP TABLE IF EXISTS {}".format(name))
 
 
-@pytest.mark.parametrize("name,engine", [
-    ("mt_test_inserts_to_disk_do_not_work","MergeTree()"),
-    ("replicated_mt_test_inserts_to_disk_do_not_work","ReplicatedMergeTree('/clickhouse/replicated_test_inserts_to_disk_do_not_work', '1')",),
+@pytest.mark.parametrize("name,engine,positive", [
+    ("mt_test_moves_to_disk_do_not_work","MergeTree()",0),
+    ("replicated_mt_test_moves_to_disk_do_not_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_to_disk_do_not_work', '1')",0),
+    ("mt_test_moves_to_disk_work","MergeTree()",1),
+    ("replicated_mt_test_moves_to_disk_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_to_disk_work', '1')",1),
 ])
-def test_inserts_to_disk_do_not_work(started_cluster, name, engine):
+def test_moves_to_disk_work(started_cluster, name, engine, positive):
     try:
         node1.query("""
             CREATE TABLE {name} (
@@ -93,37 +97,7 @@ def test_inserts_to_disk_do_not_work(started_cluster, name, engine):
 
         data = [] # 10MB in total
         for i in range(10):
-            data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time.time()-2 if i > 0 else time.time()+2))) # 1MB row
-
-        node1.query("INSERT INTO {} VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))
-        used_disks = get_used_disks_for_table(node1, name)
-        assert set(used_disks) == {"jbod1"}
-
-        assert node1.query("SELECT count() FROM {}".format(name=name)).strip() == "10"
-
-    finally:
-        node1.query("DROP TABLE IF EXISTS {}".format(name))
-
-
-@pytest.mark.parametrize("name,engine", [
-    ("mt_test_moves_to_disk_work","MergeTree()"),
-    ("replicated_mt_test_moves_to_disk_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_to_disk_work', '1')",),
-])
-def test_moves_to_disk_work(started_cluster, name, engine):
-    try:
-        node1.query("""
-            CREATE TABLE {name} (
-                s1 String,
-                d1 DateTime
-            ) ENGINE = {engine}
-            ORDER BY tuple()
-            TTL d1 TO DISK 'external'
-            SETTINGS storage_policy='small_jbod_with_external'
-        """.format(name=name, engine=engine))
-
-        data = [] # 10MB in total
-        for i in range(10):
-            data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time.time()+2))) # 1MB row
+            data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time.time()+2 if i > 0 or positive else time.time()+6))) # 1MB row
 
         node1.query("INSERT INTO {} (s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))
         used_disks = get_used_disks_for_table(node1, name)
@@ -131,41 +105,7 @@
 
         time.sleep(4)
         used_disks = get_used_disks_for_table(node1, name)
-        assert set(used_disks) == {"external"}
-
-        assert node1.query("SELECT count() FROM {}".format(name=name)).strip() == "10"
-
-    finally:
-        node1.query("DROP TABLE IF EXISTS {}".format(name))
-
-
-@pytest.mark.parametrize("name,engine", [
-    ("mt_test_moves_to_disk_do_not_work","MergeTree()"),
-    ("replicated_mt_test_moves_to_disk_do_not_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_to_disk_do_not_work', '1')",),
-])
-def test_moves_to_disk_do_not_work(started_cluster, name, engine):
-    try:
-        node1.query("""
-            CREATE TABLE {name} (
-                s1 String,
-                d1 DateTime
-            ) ENGINE = {engine}
-            ORDER BY tuple()
-            TTL d1 TO DISK 'external'
-            SETTINGS storage_policy='small_jbod_with_external'
-        """.format(name=name, engine=engine, time=time.time()+2))
-
-        data = [] # 10MB in total
-        for i in range(10):
-            data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time.time()+2 if i > 0 else time.time()+6))) # 1MB row
-
-        node1.query("INSERT INTO {} (s1) VALUES {}".format(name, ",".join(["('" + x + "')" for x in data])))
-        used_disks = get_used_disks_for_table(node1, name)
-        assert set(used_disks) == {"jbod1"}
-
-        time.sleep(4)
-        used_disks = get_used_disks_for_table(node1, name)
-        assert set(used_disks) == {"jbod1"}
+        assert set(used_disks) == {"external" if positive else "jbod1"}
 
         assert node1.query("SELECT count() FROM {}".format(name=name)).strip() == "10"
 
@@ -300,11 +240,13 @@ def test_moves_to_disk_eventually_work(started_cluster, name, engine):
         node1.query("DROP TABLE IF EXISTS {}".format(name))
 
 
-@pytest.mark.parametrize("name,engine", [
-    ("mt_test_merges_to_disk_work","MergeTree()"),
-    ("replicated_mt_test_merges_to_disk_work","ReplicatedMergeTree('/clickhouse/replicated_test_merges_to_disk_work', '1')",),
+@pytest.mark.parametrize("name,engine,positive", [
+    ("mt_test_merges_to_disk_do_not_work","MergeTree()",0),
+    ("replicated_mt_test_merges_to_disk_do_not_work","ReplicatedMergeTree('/clickhouse/replicated_test_merges_to_disk_do_not_work', '1')",0),
+    ("mt_test_merges_to_disk_work","MergeTree()",1),
+    ("replicated_mt_test_merges_to_disk_work","ReplicatedMergeTree('/clickhouse/replicated_test_merges_to_disk_work', '1')",1),
 ])
-def test_merges_to_disk_work(started_cluster, name, engine):
+def test_merges_to_disk_work(started_cluster, name, engine, positive):
     try:
         node1.query("""
             CREATE TABLE {name} (
@@ -322,7 +264,7 @@ def test_merges_to_disk_work(started_cluster, name, engine):
         for _ in range(2):
             data = [] # 16MB in total
             for i in range(8):
-                data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time.time()+2))) # 1MB row
+                data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time.time()+2 if i > 0 or positive else time.time()+7))) # 1MB row
 
             node1.query("INSERT INTO {} (s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))
 
@@ -336,7 +278,7 @@ def test_merges_to_disk_work(started_cluster, name, engine):
 
         time.sleep(1)
         used_disks = get_used_disks_for_table(node1, name)
-        assert set(used_disks) == {"external"}
+        assert set(used_disks) == {"external" if positive else "jbod1"}
         assert "1" == node1.query("SELECT count() FROM system.parts WHERE table = '{}' AND active = 1".format(name)).strip()
 
         assert node1.query("SELECT count() FROM {}".format(name=name)).strip() == "16"
@@ -405,11 +347,13 @@ def test_merges_to_full_disk_work(started_cluster, name, engine):
         node1.query("DROP TABLE IF EXISTS {}".format(name_temp))
         node1.query("DROP TABLE IF EXISTS {}".format(name))
 
-@pytest.mark.parametrize("name,engine", [
-    ("mt_test_moves_after_merges_work","MergeTree()"),
-    ("replicated_mt_test_moves_after_merges_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_after_merges_work', '1')",),
+@pytest.mark.parametrize("name,engine,positive", [
+    ("mt_test_moves_after_merges_do_not_work","MergeTree()",0),
+    ("replicated_mt_test_moves_after_merges_do_not_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_after_merges_do_not_work', '1')",0),
+    ("mt_test_moves_after_merges_work","MergeTree()",1),
+    ("replicated_mt_test_moves_after_merges_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_after_merges_work', '1')",1),
 ])
-def test_moves_after_merges_work(started_cluster, name, engine):
+def test_moves_after_merges_work(started_cluster, name, engine, positive):
     try:
         node1.query("""
             CREATE TABLE {name} (
@@ -424,7 +368,7 @@ def test_moves_after_merges_work(started_cluster, name, engine):
         for _ in range(2):
             data = [] # 16MB in total
             for i in range(8):
-                data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time.time()+2))) # 1MB row
+                data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time.time()+2 if i > 0 or positive else time.time()+6))) # 1MB row
 
             node1.query("INSERT INTO {} (s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))
 
@@ -436,95 +380,9 @@
 
         time.sleep(4)
         used_disks = get_used_disks_for_table(node1, name)
-        assert set(used_disks) == {"external"}
+        assert set(used_disks) == {"external" if positive else "jbod1"}
 
         assert node1.query("SELECT count() FROM {}".format(name=name)).strip() == "16"
 
     finally:
         node1.query("DROP TABLE IF EXISTS {}".format(name))
-
-
-@pytest.mark.parametrize("name,engine", [
-    ("mt_test_merges_to_disk_do_not_work","MergeTree()"),
-    ("replicated_mt_test_merges_to_disk_do_not_work","ReplicatedMergeTree('/clickhouse/replicated_test_merges_to_disk_do_not_work', '1')",),
-])
-def test_merges_to_disk_do_not_work(started_cluster, name, engine):
-    try:
-        node1.query("""
-            CREATE TABLE {name} (
-                s1 String,
-                d1 DateTime
-            ) ENGINE = {engine}
-            ORDER BY tuple()
-            TTL d1 TO DISK 'external'
-            SETTINGS storage_policy='small_jbod_with_external'
-        """.format(name=name, engine=engine))
-
-        node1.query("SYSTEM STOP MERGES {}".format(name))
-        node1.query("SYSTEM STOP MOVES {}".format(name))
-
-        for _ in range(2):
-            data = [] # 16MB in total
-            for i in range(8):
-                data.append(("'{}'".format(get_random_string(1024 * 1024)), 'toDateTime({})'.format(time.time()+2 if i > 0 else time.time()+7))) # 1MB row
-
-            node1.query("INSERT INTO {} (s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))
-
-        time.sleep(4)
-        used_disks = get_used_disks_for_table(node1, name)
-        assert set(used_disks) == {"jbod1"}
-        assert "2" == node1.query("SELECT count() FROM system.parts WHERE table = '{}' AND active = 1".format(name)).strip()
-
-        node1.query("SYSTEM START MERGES {}".format(name))
-        node1.query("OPTIMIZE TABLE {}".format(name))
-
-        time.sleep(1)
-        used_disks = get_used_disks_for_table(node1, name)
-        assert set(used_disks) == {"jbod1"}
-        assert "1" == node1.query("SELECT count() FROM system.parts WHERE table = '{}' AND active = 1".format(name)).strip()
-
-        assert node1.query("SELECT count() FROM {}".format(name=name)).strip() == "16"
-
-    finally:
-        node1.query("DROP TABLE IF EXISTS {}".format(name))
-
-
-@pytest.mark.parametrize("name,engine", [
-    ("mt_test_moves_after_merges_do_not_work","MergeTree()"),
-    ("replicated_mt_test_moves_after_merges_do_not_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_after_merges_do_not_work', '1')",),
-])
-def test_moves_after_merges_do_not_work(started_cluster, name, engine):
-    try:
-        node1.query("""
-            CREATE TABLE {name} (
-                s1 String,
-                d1 DateTime
-            ) ENGINE = {engine}
-            ORDER BY tuple()
-            TTL d1 TO DISK 'external'
-            SETTINGS storage_policy='small_jbod_with_external'
-        """.format(name=name, engine=engine, time=time.time()+2))
-
-        for _ in range(2):
-            data = [] # 16MB in total
-            for i in range(8):
-                data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time.time()+2 if i > 0 else time.time()+6))) # 1MB row
-
-            node1.query("INSERT INTO {} (s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))
-
-        node1.query("OPTIMIZE TABLE {}".format(name))
-        used_disks = get_used_disks_for_table(node1, name)
-        assert set(used_disks) == {"jbod1"}
-        assert "1" == node1.query("SELECT count() FROM system.parts WHERE table = '{}' AND active = 1".format(name)).strip()
-
-        time.sleep(4)
-
-        used_disks = get_used_disks_for_table(node1, name)
-        assert set(used_disks) == {"jbod1"}
-
-        assert node1.query("SELECT count() FROM {}".format(name=name)).strip() == "16"
-
-    finally:
-        node1.query("DROP TABLE IF EXISTS {}".format(name))
-
-# FIXME refactor _do_not tests into main ones