From 67aa268bbbd280be2f3414d9d05da660d3b8201e Mon Sep 17 00:00:00 2001
From: Vladimir Chebotarev
Date: Sat, 30 Nov 2019 22:22:01 +0300
Subject: [PATCH] Finally fixed tests and logic for extended TTL syntax.

---
 dbms/src/Storages/MergeTree/MergeTreeData.cpp |  22 +++-
 dbms/src/Storages/MergeTree/MergeTreeData.h   |   4 +
 .../MergeTree/MergeTreePartsMover.cpp         |   2 +-
 dbms/src/Storages/StorageMergeTree.cpp        |   4 +-
 .../config.d/storage_configuration.xml        |  12 ++
 dbms/tests/integration/test_ttl_move/test.py  | 124 +++++++++++++-----
 6 files changed, 128 insertions(+), 40 deletions(-)

diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp
index a05910e313c..4d659acc15a 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp
@@ -3157,6 +3157,17 @@ DiskSpace::ReservationPtr MergeTreeData::reserveSpacePreferringMoveDestination(U
 {
     expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
 
+    DiskSpace::ReservationPtr reservation = tryReserveSpacePreferringMoveDestination(expected_size, ttl_infos, time_of_move);
+
+    return returnReservationOrThrowError(expected_size, std::move(reservation));
+}
+
+DiskSpace::ReservationPtr MergeTreeData::tryReserveSpacePreferringMoveDestination(UInt64 expected_size,
+        const MergeTreeDataPart::TTLInfos & ttl_infos,
+        time_t time_of_move) const
+{
+    expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
+
     DiskSpace::ReservationPtr reservation;
 
     auto ttl_entry = selectMoveDestination(ttl_infos, time_of_move);
@@ -3184,18 +3195,25 @@ DiskSpace::ReservationPtr MergeTreeData::reserveSpacePreferringMoveDestination(U
 
     reservation = storage_policy->reserve(expected_size);
 
-    return returnReservationOrThrowError(expected_size, std::move(reservation));
+    return reservation;
 }
 
 DiskSpace::ReservationPtr MergeTreeData::reserveSpaceInSpecificSpace(UInt64 expected_size, DiskSpace::SpacePtr space) const
 {
     expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
 
-    auto reservation = space->reserve(expected_size);
+    auto reservation = tryReserveSpaceInSpecificSpace(expected_size, space);
 
     return returnReservationOrThrowError(expected_size, std::move(reservation));
 }
 
+DiskSpace::ReservationPtr MergeTreeData::tryReserveSpaceInSpecificSpace(UInt64 expected_size, DiskSpace::SpacePtr space) const
+{
+    expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
+
+    return space->reserve(expected_size);
+}
+
 DiskSpace::SpacePtr MergeTreeData::TTLEntry::getDestination(const DiskSpace::StoragePolicyPtr & storage_policy) const
 {
     if (destination_type == PartDestinationType::VOLUME)
diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.h b/dbms/src/Storages/MergeTree/MergeTreeData.h
index eb52e4cea4a..47093da2ccc 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeData.h
+++ b/dbms/src/Storages/MergeTree/MergeTreeData.h
@@ -679,7 +679,11 @@ public:
     DiskSpace::ReservationPtr reserveSpacePreferringMoveDestination(UInt64 expected_size,
                                                                     const MergeTreeDataPart::TTLInfos & ttl_infos,
                                                                     time_t time_of_move) const;
+    DiskSpace::ReservationPtr tryReserveSpacePreferringMoveDestination(UInt64 expected_size,
+                                                                       const MergeTreeDataPart::TTLInfos & ttl_infos,
+                                                                       time_t time_of_move) const;
     DiskSpace::ReservationPtr reserveSpaceInSpecificSpace(UInt64 expected_size, DiskSpace::SpacePtr space) const;
+    DiskSpace::ReservationPtr tryReserveSpaceInSpecificSpace(UInt64 expected_size, DiskSpace::SpacePtr space) const;
 
     /// Choose disk with max available free space
     /// Reserves 0 bytes
diff --git a/dbms/src/Storages/MergeTree/MergeTreePartsMover.cpp b/dbms/src/Storages/MergeTree/MergeTreePartsMover.cpp
index 60a6cbdd3f2..de987acac72 100644
--- a/dbms/src/Storages/MergeTree/MergeTreePartsMover.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreePartsMover.cpp
@@ -130,7 +130,7 @@ bool MergeTreePartsMover::selectPartsForMove(
         {
             auto destination = ttl_entry_ptr->getDestination(policy);
             if (destination && !ttl_entry_ptr->isPartInDestination(policy, *part))
-                reservation = part->storage.reserveSpaceInSpecificSpace(part->bytes_on_disk, ttl_entry_ptr->getDestination(policy));
+                reservation = part->storage.tryReserveSpaceInSpecificSpace(part->bytes_on_disk, ttl_entry_ptr->getDestination(policy));
         }
 
         if (reservation)
diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp
index a8c91a58c60..4e4bea7b023 100644
--- a/dbms/src/Storages/StorageMergeTree.cpp
+++ b/dbms/src/Storages/StorageMergeTree.cpp
@@ -350,7 +350,7 @@ public:
         /// if we mutate part, than we should reserve space on the same disk, because mutations possible can create hardlinks
         if (is_mutation)
-            reserved_space = storage.reserveSpaceInSpecificSpace(total_size, future_part_.parts[0]->disk);
+            reserved_space = storage.tryReserveSpaceInSpecificSpace(total_size, future_part_.parts[0]->disk);
         else
         {
             MergeTreeDataPart::TTLInfos ttl_infos;
@@ -358,7 +358,7 @@
             {
                 ttl_infos.update(part_ptr->ttl_infos);
             }
-            reserved_space = storage.reserveSpacePreferringMoveDestination(total_size, ttl_infos, time(nullptr));
+            reserved_space = storage.tryReserveSpacePreferringMoveDestination(total_size, ttl_infos, time(nullptr));
         }
         if (!reserved_space)
         {
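Aside (not part of the patch): the C++ change above applies one pattern throughout. Each throwing reserveSpace* method is split into a non-throwing tryReserveSpace* core plus a thin wrapper that still throws, so background merges, mutations and TTL moves can treat a failed reservation as an ordinary outcome rather than an exception. A minimal Python sketch of that control flow; every name here is illustrative, not ClickHouse API:

```python
# Sketch of the reserve/tryReserve split. The 1MB floor mirrors what
# RESERVATION_MIN_ESTIMATION_SIZE appears to do above (value assumed).
RESERVATION_MIN_ESTIMATION_SIZE = 1024 * 1024


class NotEnoughSpace(Exception):
    pass


def try_reserve(space, expected_size):
    """Non-throwing variant: returns a reservation or None."""
    expected_size = max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size)
    return space.reserve(expected_size)  # assumed to return None when full


def reserve(space, expected_size):
    """Throwing wrapper for callers that cannot proceed without the space."""
    reservation = try_reserve(space, expected_size)
    if reservation is None:
        raise NotEnoughSpace("cannot reserve {} bytes".format(expected_size))
    return reservation
```

The call sites change accordingly: the part mover and the merge/mutate task in StorageMergeTree.cpp now use the try variants and handle an empty reservation themselves (the pre-existing `if (!reserved_space)` branch), so a full destination disk no longer surfaces as an exception at the reservation point.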
diff --git a/dbms/tests/integration/test_ttl_move/configs/config.d/storage_configuration.xml b/dbms/tests/integration/test_ttl_move/configs/config.d/storage_configuration.xml
index 454b78ec216..b48de85007a 100644
--- a/dbms/tests/integration/test_ttl_move/configs/config.d/storage_configuration.xml
+++ b/dbms/tests/integration/test_ttl_move/configs/config.d/storage_configuration.xml
@@ -28,6 +28,18 @@
                 </external>
             </volumes>
         </small_jbod_with_external>
+        <jbods_with_external>
+            <volumes>
+                <main>
+                    <disk>jbod1</disk>
+                    <disk>jbod2</disk>
+                </main>
+                <external>
+                    <disk>external</disk>
+                </external>
+            </volumes>
+        </jbods_with_external>
+
     </policies>
 </storage_configuration>
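Aside (not part of the patch): the XML tags in this hunk were mangled in transit; the disk names (jbod1, jbod2, external) survived, and the policy name `jbods_with_external` is taken from the test below that now references it, while the exact tag layout is a reconstruction. When debugging, it helps to confirm the node actually loaded the policy. Something along these lines should work against `system.storage_policies` (column names as I recall them for this ClickHouse generation; verify before relying on them):

```python
def print_policy(node, policy_name):
    """Print the volume -> disks layout of one storage policy."""
    print(node.query(
        "SELECT volume_name, disks FROM system.storage_policies "
        "WHERE policy_name = '{}' ORDER BY volume_priority".format(policy_name)))

# Expected for the policy added above:
#   main      ['jbod1','jbod2']
#   external  ['external']
```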
diff --git a/dbms/tests/integration/test_ttl_move/test.py b/dbms/tests/integration/test_ttl_move/test.py
index 85d9eb894af..f35c5409841 100644
--- a/dbms/tests/integration/test_ttl_move/test.py
+++ b/dbms/tests/integration/test_ttl_move/test.py
@@ -3,6 +3,7 @@ import pytest
 import random
 import re
 import string
+import threading
 import time
 from multiprocessing.dummy import Pool
 from helpers.client import QueryRuntimeException
@@ -38,7 +39,11 @@ def started_cluster():
 
 
 def get_random_string(length):
-    return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(length))
+    symbols = bytes(string.ascii_uppercase + string.digits)
+    result_list = bytearray([0])*length
+    for i in range(length):
+        result_list[i] = random.choice(symbols)
+    return str(result_list)
 
 
 def get_used_disks_for_table(node, table_name):
@@ -65,7 +70,7 @@ def test_inserts_to_disk_work(started_cluster, name, engine, positive):
 
         data = [] # 10MB in total
         for i in range(10):
-            data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time.time()-2 if i > 0 or positive else time.time()+2))) # 1MB row
+            data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time.time()-1 if i > 0 or positive else time.time()+300))) # 1MB row
 
         node1.query("INSERT INTO {} (s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))
 
         used_disks = get_used_disks_for_table(node1, name)
@@ -95,15 +100,25 @@ def test_moves_to_disk_work(started_cluster, name, engine, positive):
             SETTINGS storage_policy='small_jbod_with_external'
         """.format(name=name, engine=engine))
 
+        wait_expire_1 = 6
+        wait_expire_2 = 4
+        time_1 = time.time() + wait_expire_1
+        time_2 = time.time() + wait_expire_1 + wait_expire_2
+
+        wait_expire_1_thread = threading.Thread(target=time.sleep, args=(wait_expire_1,))
+        wait_expire_1_thread.start()
+
         data = [] # 10MB in total
         for i in range(10):
-            data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time.time()+2 if i > 0 or positive else time.time()+6))) # 1MB row
+            data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time_1 if i > 0 or positive else time_2))) # 1MB row
 
         node1.query("INSERT INTO {} (s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))
 
         used_disks = get_used_disks_for_table(node1, name)
         assert set(used_disks) == {"jbod1"}
 
-        time.sleep(4)
+        wait_expire_1_thread.join()
+        time.sleep(wait_expire_2/2)
+
         used_disks = get_used_disks_for_table(node1, name)
         assert set(used_disks) == {"external" if positive else "jbod1"}
@@ -115,7 +130,7 @@ def test_moves_to_disk_work(started_cluster, name, engine, positive):
 
 @pytest.mark.parametrize("name,engine", [
     ("mt_test_moves_to_volume_work","MergeTree()"),
-    ("replicated_mt_test_moves_to_volume_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_to_volume_work', '1')",),
+    ("replicated_mt_test_moves_to_volume_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_to_volume_work', '1')"),
 ])
 def test_moves_to_volume_work(started_cluster, name, engine):
     try:
@@ -127,21 +142,29 @@ def test_moves_to_volume_work(started_cluster, name, engine):
             ) ENGINE = {engine}
             ORDER BY tuple()
             PARTITION BY p1
-            TTL d1 TO VOLUME 'main'
-            SETTINGS storage_policy='external_with_jbods'
+            TTL d1 TO VOLUME 'external'
+            SETTINGS storage_policy='jbods_with_external'
         """.format(name=name, engine=engine))
 
+        wait_expire_1 = 10
+        time_1 = time.time() + wait_expire_1
+
+        wait_expire_1_thread = threading.Thread(target=time.sleep, args=(wait_expire_1,))
+        wait_expire_1_thread.start()
+
         for p in range(2):
             data = [] # 20MB in total
             for i in range(10):
-                data.append((str(p), "'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time.time()+2))) # 1MB row
+                data.append((str(p), "'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time_1))) # 1MB row
 
             node1.query("INSERT INTO {} (p1, s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))
 
         used_disks = get_used_disks_for_table(node1, name)
         assert set(used_disks) == {'jbod1', 'jbod2'}
 
-        time.sleep(4)
+        wait_expire_1_thread.join()
+        time.sleep(1)
+
         used_disks = get_used_disks_for_table(node1, name)
         assert set(used_disks) == {"external"}
@@ -151,11 +174,13 @@ def test_moves_to_volume_work(started_cluster, name, engine):
         node1.query("DROP TABLE IF EXISTS {}".format(name))
 
 
-@pytest.mark.parametrize("name,engine", [
-    ("mt_test_inserts_to_volume_work","MergeTree()"),
-    ("replicated_mt_test_inserts_to_volume_work","ReplicatedMergeTree('/clickhouse/replicated_test_inserts_to_volume_work', '1')",),
+@pytest.mark.parametrize("name,engine,positive", [
+    ("mt_test_inserts_to_volume_do_not_work","MergeTree()",0),
+    ("replicated_mt_test_inserts_to_volume_do_not_work","ReplicatedMergeTree('/clickhouse/replicated_test_inserts_to_volume_do_not_work', '1')",0),
+    ("mt_test_inserts_to_volume_work","MergeTree()",1),
+    ("replicated_mt_test_inserts_to_volume_work","ReplicatedMergeTree('/clickhouse/replicated_test_inserts_to_volume_work', '1')",1),
 ])
-def test_inserts_to_volume_work(started_cluster, name, engine):
+def test_inserts_to_volume_work(started_cluster, name, engine, positive):
     try:
         node1.query("""
             CREATE TABLE {name} (
                 p1 Int64,
                 s1 String,
@@ -165,19 +190,21 @@
                 d1 DateTime
             ) ENGINE = {engine}
             ORDER BY tuple()
             PARTITION BY p1
-            TTL d1 TO VOLUME 'main'
-            SETTINGS storage_policy='external_with_jbods'
+            TTL d1 TO VOLUME 'external'
+            SETTINGS storage_policy='small_jbod_with_external'
         """.format(name=name, engine=engine))
 
+        node1.query("SYSTEM STOP MOVES {name}".format(name=name))
+
         for p in range(2):
             data = [] # 20MB in total
             for i in range(10):
-                data.append((str(p), "'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time.time()-2))) # 1MB row
+                data.append((str(p), "'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time.time()-1 if i > 0 or positive else time.time()+300))) # 1MB row
 
             node1.query("INSERT INTO {} (p1, s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))
 
         used_disks = get_used_disks_for_table(node1, name)
-        assert set(used_disks) == {"external"}
+        assert set(used_disks) == {"external" if positive else "jbod1"}
 
         assert node1.query("SELECT count() FROM {name}".format(name=name)).strip() == "20"
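Aside (not part of the patch): the `threading.Thread(target=time.sleep, ...)` idiom that recurs through these tests is worth spelling out. The TTL timestamps (`time_1`, `time_2`) and the sleeper thread are both anchored to the moment before the multi-megabyte inserts begin, so a later `join()` blocks exactly until the TTL deadline, however long the inserts took; the flat `time.sleep(4)` it replaces raced against insert duration. A self-contained sketch of the idiom, with a helper name of my own:

```python
import threading
import time


def start_expiry_timer(seconds):
    """Begin counting toward a TTL deadline now; join() the result to wait it out."""
    timer = threading.Thread(target=time.sleep, args=(seconds,))
    timer.start()
    return timer


deadline = time.time() + 6     # rows will carry TTL d1 = now + 6s
timer = start_expiry_timer(6)  # the clock starts before the slow work
time.sleep(2)                  # stand-in for several seconds of inserts
timer.join()                   # returns once the original 6s have elapsed
```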
@@ -187,7 +214,7 @@
 
 @pytest.mark.parametrize("name,engine", [
     ("mt_test_moves_to_disk_eventually_work","MergeTree()"),
-    ("replicated_mt_test_moves_to_disk_eventually_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_to_disk_eventually_work', '1')",),
+    ("replicated_mt_test_moves_to_disk_eventually_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_to_disk_eventually_work', '1')"),
 ])
 def test_moves_to_disk_eventually_work(started_cluster, name, engine):
     try:
@@ -221,7 +248,7 @@ def test_moves_to_disk_eventually_work(started_cluster, name, engine):
 
         data = [] # 10MB in total
         for i in range(10):
-            data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time.time()-2))) # 1MB row
+            data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time.time()-1))) # 1MB row
 
         node1.query("INSERT INTO {} (s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))
 
         used_disks = get_used_disks_for_table(node1, name)
@@ -261,18 +288,28 @@ def test_merges_to_disk_work(started_cluster, name, engine, positive):
         node1.query("SYSTEM STOP MERGES {}".format(name))
         node1.query("SYSTEM STOP MOVES {}".format(name))
 
+        wait_expire_1 = 10
+        wait_expire_2 = 4
+        time_1 = time.time() + wait_expire_1
+        time_2 = time.time() + wait_expire_1 + wait_expire_2
+
+        wait_expire_1_thread = threading.Thread(target=time.sleep, args=(wait_expire_1,))
+        wait_expire_1_thread.start()
+
         for _ in range(2):
             data = [] # 16MB in total
             for i in range(8):
-                data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time.time()+2 if i > 0 or positive else time.time()+7))) # 1MB row
+                data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time_1 if i > 0 or positive else time_2))) # 1MB row
 
             node1.query("INSERT INTO {} (s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))
 
-        time.sleep(4)
         used_disks = get_used_disks_for_table(node1, name)
         assert set(used_disks) == {"jbod1"}
 
         assert "2" == node1.query("SELECT count() FROM system.parts WHERE table = '{}' AND active = 1".format(name)).strip()
 
+        wait_expire_1_thread.join()
+        time.sleep(wait_expire_2/2)
+
         node1.query("SYSTEM START MERGES {}".format(name))
         node1.query("OPTIMIZE TABLE {}".format(name))
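Aside (not part of the patch): `test_merges_to_disk_work` above leans on the `SYSTEM STOP MERGES` / `SYSTEM START MERGES` pair, freezing background merges while the two parts are written and their TTL expires, then re-enabling them so the explicit `OPTIMIZE TABLE` performs the merge at a point where the move destination is already decided. The skeleton, with a hypothetical table name:

```python
node1.query("SYSTEM STOP MERGES some_table")   # parts stay exactly as inserted
# ... INSERT two parts, wait for their TTL to expire ...
node1.query("SYSTEM START MERGES some_table")
node1.query("OPTIMIZE TABLE some_table")       # force the merge now
```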
@@ -288,10 +325,10 @@
 
 @pytest.mark.parametrize("name,engine", [
-    ("mt_test_merges_to_full_disk_work","MergeTree()"),
-    ("replicated_mt_test_merges_to_full_disk_work","ReplicatedMergeTree('/clickhouse/replicated_test_merges_to_full_disk_work', '1')",),
+    ("mt_test_merges_with_full_disk_work","MergeTree()"),
+    ("replicated_mt_test_merges_with_full_disk_work","ReplicatedMergeTree('/clickhouse/replicated_test_merges_with_full_disk_work', '1')"),
 ])
-def test_merges_to_full_disk_work(started_cluster, name, engine):
+def test_merges_with_full_disk_work(started_cluster, name, engine):
     try:
         name_temp = name + "_temp"
 
@@ -317,36 +354,42 @@
                 d1 DateTime
             ) ENGINE = {engine}
             ORDER BY tuple()
-            TTL d1 TO DISK 'external'
-            SETTINGS storage_policy='small_jbod_with_external'
+            TTL d1 TO DISK 'jbod2'
+            SETTINGS storage_policy='jbod1_with_jbod2'
         """.format(name=name, engine=engine))
 
-        node1.query("SYSTEM STOP MOVES {}".format(name))
+        wait_expire_1 = 10
+        time_1 = time.time() + wait_expire_1
+
+        wait_expire_1_thread = threading.Thread(target=time.sleep, args=(wait_expire_1,))
+        wait_expire_1_thread.start()
 
         for _ in range(2):
-            data = [] # 16MB in total
-            for i in range(8):
-                data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time.time()+2))) # 1MB row
-
+            data = [] # 12MB in total
+            for i in range(6):
+                data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time_1))) # 1MB row
             node1.query("INSERT INTO {} (s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))
 
         used_disks = get_used_disks_for_table(node1, name)
         assert set(used_disks) == {"jbod1"}
 
         assert "2" == node1.query("SELECT count() FROM system.parts WHERE table = '{}' AND active = 1".format(name)).strip()
 
-        time.sleep(4)
+        wait_expire_1_thread.join()
+        node1.query("OPTIMIZE TABLE {}".format(name))
+        time.sleep(1)
 
         used_disks = get_used_disks_for_table(node1, name)
         assert set(used_disks) == {"jbod1"} # Merged to the same disk against the rule.
 
         assert "1" == node1.query("SELECT count() FROM system.parts WHERE table = '{}' AND active = 1".format(name)).strip()
-        assert node1.query("SELECT count() FROM {name}".format(name=name)).strip() == "16"
+        assert node1.query("SELECT count() FROM {name}".format(name=name)).strip() == "12"
 
     finally:
         node1.query("DROP TABLE IF EXISTS {}".format(name_temp))
         node1.query("DROP TABLE IF EXISTS {}".format(name))
+
 
 @pytest.mark.parametrize("name,engine,positive", [
     ("mt_test_moves_after_merges_do_not_work","MergeTree()",0),
     ("replicated_mt_test_moves_after_merges_do_not_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_after_merges_do_not_work', '1')",0),
     ("mt_test_moves_after_merges_work","MergeTree()",1),
     ("replicated_mt_test_moves_after_merges_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_after_merges_work', '1')",1),
 ])
 def test_moves_after_merges_work(started_cluster, name, engine, positive):
     try:
@@ -365,19 +408,30 @@ def test_moves_after_merges_work(started_cluster, name, engine, positive):
         node1.query("""
             CREATE TABLE {name} (
                 s1 String,
                 d1 DateTime
             ) ENGINE = {engine}
             ORDER BY tuple()
             TTL d1 TO DISK 'external'
             SETTINGS storage_policy='small_jbod_with_external'
         """.format(name=name, engine=engine))
 
+        wait_expire_1 = 10
+        wait_expire_2 = 4
+        time_1 = time.time() + wait_expire_1
+        time_2 = time.time() + wait_expire_1 + wait_expire_2
+
+        wait_expire_1_thread = threading.Thread(target=time.sleep, args=(wait_expire_1,))
+        wait_expire_1_thread.start()
+
         for _ in range(2):
             data = [] # 16MB in total
             for i in range(8):
-                data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time.time()+2 if i > 0 or positive else time.time()+6))) # 1MB row
+                data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time_1 if i > 0 or positive else time_2))) # 1MB row
 
             node1.query("INSERT INTO {} (s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))
 
         node1.query("OPTIMIZE TABLE {}".format(name))
 
+        time.sleep(1)
+
         used_disks = get_used_disks_for_table(node1, name)
         assert set(used_disks) == {"jbod1"}
 
         assert "1" == node1.query("SELECT count() FROM system.parts WHERE table = '{}' AND active = 1".format(name)).strip()
 
-        time.sleep(4)
+        wait_expire_1_thread.join()
+        time.sleep(wait_expire_2/2)
 
         used_disks = get_used_disks_for_table(node1, name)
         assert set(used_disks) == {"external" if positive else "jbod1"}
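Aside (not part of the patch): every assertion in these tests goes through `get_used_disks_for_table`, whose body falls outside the hunks shown. From the call sites it plausibly reduces to a query like the following over `system.parts`; this is a reconstruction for reference, not the file's actual code:

```python
def get_used_disks_for_table(node, table_name):
    """Return the disk of each active part of the table, oldest first."""
    return node.query(
        "SELECT disk_name FROM system.parts"
        " WHERE table = '{}' AND active = 1"
        " ORDER BY modification_time".format(table_name)
    ).strip().split("\n")
```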