Finally fixed tests and logic for extended TTL syntax.

This commit is contained in:
Vladimir Chebotarev 2019-11-30 22:22:01 +03:00
parent f83b28a1c1
commit 67aa268bbb
6 changed files with 128 additions and 40 deletions

View File

@ -3157,6 +3157,17 @@ DiskSpace::ReservationPtr MergeTreeData::reserveSpacePreferringMoveDestination(U
{
expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
DiskSpace::ReservationPtr reservation = tryReserveSpacePreferringMoveDestination(expected_size, ttl_infos, time_of_move);
return returnReservationOrThrowError(expected_size, std::move(reservation));
}
DiskSpace::ReservationPtr MergeTreeData::tryReserveSpacePreferringMoveDestination(UInt64 expected_size,
const MergeTreeDataPart::TTLInfos & ttl_infos,
time_t time_of_move) const
{
expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
DiskSpace::ReservationPtr reservation;
auto ttl_entry = selectMoveDestination(ttl_infos, time_of_move);
@ -3184,18 +3195,25 @@ DiskSpace::ReservationPtr MergeTreeData::reserveSpacePreferringMoveDestination(U
reservation = storage_policy->reserve(expected_size);
return returnReservationOrThrowError(expected_size, std::move(reservation));
return reservation;
}
DiskSpace::ReservationPtr MergeTreeData::reserveSpaceInSpecificSpace(UInt64 expected_size, DiskSpace::SpacePtr space) const
{
expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
auto reservation = space->reserve(expected_size);
auto reservation = tryReserveSpaceInSpecificSpace(expected_size, space);
return returnReservationOrThrowError(expected_size, std::move(reservation));
}
/// Non-throwing counterpart of reserveSpaceInSpecificSpace: attempts to reserve
/// expected_size bytes (clamped up to RESERVATION_MIN_ESTIMATION_SIZE) on the
/// given space and returns whatever space->reserve() yields directly — callers
/// check the returned ReservationPtr for null/emptiness instead of relying on
/// returnReservationOrThrowError (presumably a null reservation signals failure;
/// confirm against DiskSpace::Space::reserve()).
DiskSpace::ReservationPtr MergeTreeData::tryReserveSpaceInSpecificSpace(UInt64 expected_size, DiskSpace::SpacePtr space) const
{
expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
return space->reserve(expected_size);
}
DiskSpace::SpacePtr MergeTreeData::TTLEntry::getDestination(const DiskSpace::StoragePolicyPtr & storage_policy) const
{
if (destination_type == PartDestinationType::VOLUME)

View File

@ -679,7 +679,11 @@ public:
DiskSpace::ReservationPtr reserveSpacePreferringMoveDestination(UInt64 expected_size,
const MergeTreeDataPart::TTLInfos & ttl_infos,
time_t time_of_move) const;
DiskSpace::ReservationPtr tryReserveSpacePreferringMoveDestination(UInt64 expected_size,
const MergeTreeDataPart::TTLInfos & ttl_infos,
time_t time_of_move) const;
DiskSpace::ReservationPtr reserveSpaceInSpecificSpace(UInt64 expected_size, DiskSpace::SpacePtr space) const;
DiskSpace::ReservationPtr tryReserveSpaceInSpecificSpace(UInt64 expected_size, DiskSpace::SpacePtr space) const;
/// Choose disk with max available free space
/// Reserves 0 bytes

View File

@ -130,7 +130,7 @@ bool MergeTreePartsMover::selectPartsForMove(
{
auto destination = ttl_entry_ptr->getDestination(policy);
if (destination && !ttl_entry_ptr->isPartInDestination(policy, *part))
reservation = part->storage.reserveSpaceInSpecificSpace(part->bytes_on_disk, ttl_entry_ptr->getDestination(policy));
reservation = part->storage.tryReserveSpaceInSpecificSpace(part->bytes_on_disk, ttl_entry_ptr->getDestination(policy));
}
if (reservation)

View File

@ -350,7 +350,7 @@ public:
/// if we mutate part, than we should reserve space on the same disk, because mutations possible can create hardlinks
if (is_mutation)
reserved_space = storage.reserveSpaceInSpecificSpace(total_size, future_part_.parts[0]->disk);
reserved_space = storage.tryReserveSpaceInSpecificSpace(total_size, future_part_.parts[0]->disk);
else
{
MergeTreeDataPart::TTLInfos ttl_infos;
@ -358,7 +358,7 @@ public:
{
ttl_infos.update(part_ptr->ttl_infos);
}
reserved_space = storage.reserveSpacePreferringMoveDestination(total_size, ttl_infos, time(nullptr));
reserved_space = storage.tryReserveSpacePreferringMoveDestination(total_size, ttl_infos, time(nullptr));
}
if (!reserved_space)
{

View File

@ -28,6 +28,18 @@
</volumes>
</external_with_jbods>
<jbods_with_external>
<volumes>
<main>
<disk>jbod1</disk>
<disk>jbod2</disk>
</main>
<external>
<disk>external</disk>
</external>
</volumes>
</jbods_with_external>
<small_jbod_with_external>
<volumes>
<main>

View File

@ -3,6 +3,7 @@ import pytest
import random
import re
import string
import threading
import time
from multiprocessing.dummy import Pool
from helpers.client import QueryRuntimeException
@ -38,7 +39,11 @@ def started_cluster():
def get_random_string(length):
return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(length))
symbols = bytes(string.ascii_uppercase + string.digits)
result_list = bytearray([0])*length
for i in range(length):
result_list[i] = random.choice(symbols)
return str(result_list)
def get_used_disks_for_table(node, table_name):
@ -65,7 +70,7 @@ def test_inserts_to_disk_work(started_cluster, name, engine, positive):
data = [] # 10MB in total
for i in range(10):
data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time.time()-2 if i > 0 or positive else time.time()+2))) # 1MB row
data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time.time()-1 if i > 0 or positive else time.time()+300))) # 1MB row
node1.query("INSERT INTO {} (s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))
used_disks = get_used_disks_for_table(node1, name)
@ -95,15 +100,25 @@ def test_moves_to_disk_work(started_cluster, name, engine, positive):
SETTINGS storage_policy='small_jbod_with_external'
""".format(name=name, engine=engine))
wait_expire_1 = 6
wait_expire_2 = 4
time_1 = time.time() + wait_expire_1
time_2 = time.time() + wait_expire_1 + wait_expire_2
wait_expire_1_thread = threading.Thread(target=time.sleep, args=(wait_expire_1,))
wait_expire_1_thread.start()
data = [] # 10MB in total
for i in range(10):
data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time.time()+2 if i > 0 or positive else time.time()+6))) # 1MB row
data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time_1 if i > 0 or positive else time_2))) # 1MB row
node1.query("INSERT INTO {} (s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))
used_disks = get_used_disks_for_table(node1, name)
assert set(used_disks) == {"jbod1"}
time.sleep(4)
wait_expire_1_thread.join()
time.sleep(wait_expire_2/2)
used_disks = get_used_disks_for_table(node1, name)
assert set(used_disks) == {"external" if positive else "jbod1"}
@ -115,7 +130,7 @@ def test_moves_to_disk_work(started_cluster, name, engine, positive):
@pytest.mark.parametrize("name,engine", [
("mt_test_moves_to_volume_work","MergeTree()"),
("replicated_mt_test_moves_to_volume_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_to_volume_work', '1')",),
("replicated_mt_test_moves_to_volume_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_to_volume_work', '1')"),
])
def test_moves_to_volume_work(started_cluster, name, engine):
try:
@ -127,21 +142,29 @@ def test_moves_to_volume_work(started_cluster, name, engine):
) ENGINE = {engine}
ORDER BY tuple()
PARTITION BY p1
TTL d1 TO VOLUME 'main'
SETTINGS storage_policy='external_with_jbods'
TTL d1 TO VOLUME 'external'
SETTINGS storage_policy='jbods_with_external'
""".format(name=name, engine=engine))
wait_expire_1 = 10
time_1 = time.time() + wait_expire_1
wait_expire_1_thread = threading.Thread(target=time.sleep, args=(wait_expire_1,))
wait_expire_1_thread.start()
for p in range(2):
data = [] # 20MB in total
for i in range(10):
data.append((str(p), "'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time.time()+2))) # 1MB row
data.append((str(p), "'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time_1))) # 1MB row
node1.query("INSERT INTO {} (p1, s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))
used_disks = get_used_disks_for_table(node1, name)
assert set(used_disks) == {'jbod1', 'jbod2'}
time.sleep(4)
wait_expire_1_thread.join()
time.sleep(1)
used_disks = get_used_disks_for_table(node1, name)
assert set(used_disks) == {"external"}
@ -151,11 +174,13 @@ def test_moves_to_volume_work(started_cluster, name, engine):
node1.query("DROP TABLE IF EXISTS {}".format(name))
@pytest.mark.parametrize("name,engine", [
("mt_test_inserts_to_volume_work","MergeTree()"),
("replicated_mt_test_inserts_to_volume_work","ReplicatedMergeTree('/clickhouse/replicated_test_inserts_to_volume_work', '1')",),
@pytest.mark.parametrize("name,engine,positive", [
("mt_test_inserts_to_volume_do_not_work","MergeTree()",0),
("replicated_mt_test_inserts_to_volume_do_not_work","ReplicatedMergeTree('/clickhouse/replicated_test_inserts_to_volume_do_not_work', '1')",0),
("mt_test_inserts_to_volume_work","MergeTree()",1),
("replicated_mt_test_inserts_to_volume_work","ReplicatedMergeTree('/clickhouse/replicated_test_inserts_to_volume_work', '1')",1),
])
def test_inserts_to_volume_work(started_cluster, name, engine):
def test_inserts_to_volume_work(started_cluster, name, engine, positive):
try:
node1.query("""
CREATE TABLE {name} (
@ -165,19 +190,21 @@ def test_inserts_to_volume_work(started_cluster, name, engine):
) ENGINE = {engine}
ORDER BY tuple()
PARTITION BY p1
TTL d1 TO VOLUME 'main'
SETTINGS storage_policy='external_with_jbods'
TTL d1 TO VOLUME 'external'
SETTINGS storage_policy='small_jbod_with_external'
""".format(name=name, engine=engine))
node1.query("SYSTEM STOP MOVES {name}".format(name=name))
for p in range(2):
data = [] # 20MB in total
for i in range(10):
data.append((str(p), "'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time.time()-2))) # 1MB row
data.append((str(p), "'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time.time()-1 if i > 0 or positive else time.time()+300))) # 1MB row
node1.query("INSERT INTO {} (p1, s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))
used_disks = get_used_disks_for_table(node1, name)
assert set(used_disks) == {"external"}
assert set(used_disks) == {"external" if positive else "jbod1"}
assert node1.query("SELECT count() FROM {name}".format(name=name)).strip() == "20"
@ -187,7 +214,7 @@ def test_inserts_to_volume_work(started_cluster, name, engine):
@pytest.mark.parametrize("name,engine", [
("mt_test_moves_to_disk_eventually_work","MergeTree()"),
("replicated_mt_test_moves_to_disk_eventually_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_to_disk_eventually_work', '1')",),
("replicated_mt_test_moves_to_disk_eventually_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_to_disk_eventually_work', '1')"),
])
def test_moves_to_disk_eventually_work(started_cluster, name, engine):
try:
@ -221,7 +248,7 @@ def test_moves_to_disk_eventually_work(started_cluster, name, engine):
data = [] # 10MB in total
for i in range(10):
data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time.time()-2))) # 1MB row
data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time.time()-1))) # 1MB row
node1.query("INSERT INTO {} (s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))
used_disks = get_used_disks_for_table(node1, name)
@ -261,18 +288,28 @@ def test_merges_to_disk_work(started_cluster, name, engine, positive):
node1.query("SYSTEM STOP MERGES {}".format(name))
node1.query("SYSTEM STOP MOVES {}".format(name))
wait_expire_1 = 10
wait_expire_2 = 4
time_1 = time.time() + wait_expire_1
time_2 = time.time() + wait_expire_1 + wait_expire_2
wait_expire_1_thread = threading.Thread(target=time.sleep, args=(wait_expire_1,))
wait_expire_1_thread.start()
for _ in range(2):
data = [] # 16MB in total
for i in range(8):
data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time.time()+2 if i > 0 or positive else time.time()+7))) # 1MB row
data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time_1 if i > 0 or positive else time_2))) # 1MB row
node1.query("INSERT INTO {} (s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))
time.sleep(4)
used_disks = get_used_disks_for_table(node1, name)
assert set(used_disks) == {"jbod1"}
assert "2" == node1.query("SELECT count() FROM system.parts WHERE table = '{}' AND active = 1".format(name)).strip()
wait_expire_1_thread.join()
time.sleep(wait_expire_2/2)
node1.query("SYSTEM START MERGES {}".format(name))
node1.query("OPTIMIZE TABLE {}".format(name))
@ -288,10 +325,10 @@ def test_merges_to_disk_work(started_cluster, name, engine, positive):
@pytest.mark.parametrize("name,engine", [
("mt_test_merges_to_full_disk_work","MergeTree()"),
("replicated_mt_test_merges_to_full_disk_work","ReplicatedMergeTree('/clickhouse/replicated_test_merges_to_full_disk_work', '1')",),
("mt_test_merges_with_full_disk_work","MergeTree()"),
("replicated_mt_test_merges_with_full_disk_work","ReplicatedMergeTree('/clickhouse/replicated_test_merges_with_full_disk_work', '1')"),
])
def test_merges_to_full_disk_work(started_cluster, name, engine):
def test_merges_with_full_disk_work(started_cluster, name, engine):
try:
name_temp = name + "_temp"
@ -317,36 +354,42 @@ def test_merges_to_full_disk_work(started_cluster, name, engine):
d1 DateTime
) ENGINE = {engine}
ORDER BY tuple()
TTL d1 TO DISK 'external'
SETTINGS storage_policy='small_jbod_with_external'
TTL d1 TO DISK 'jbod2'
SETTINGS storage_policy='jbod1_with_jbod2'
""".format(name=name, engine=engine))
node1.query("SYSTEM STOP MOVES {}".format(name))
wait_expire_1 = 10
time_1 = time.time() + wait_expire_1
wait_expire_1_thread = threading.Thread(target=time.sleep, args=(wait_expire_1,))
wait_expire_1_thread.start()
for _ in range(2):
data = [] # 16MB in total
for i in range(8):
data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time.time()+2))) # 1MB row
data = [] # 12MB in total
for i in range(6):
data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time_1))) # 1MB row
node1.query("INSERT INTO {} (s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))
used_disks = get_used_disks_for_table(node1, name)
assert set(used_disks) == {"jbod1"}
assert "2" == node1.query("SELECT count() FROM system.parts WHERE table = '{}' AND active = 1".format(name)).strip()
time.sleep(4)
wait_expire_1_thread.join()
node1.query("OPTIMIZE TABLE {}".format(name))
time.sleep(1)
used_disks = get_used_disks_for_table(node1, name)
assert set(used_disks) == {"jbod1"} # Merged to the same disk against the rule.
assert "1" == node1.query("SELECT count() FROM system.parts WHERE table = '{}' AND active = 1".format(name)).strip()
assert node1.query("SELECT count() FROM {name}".format(name=name)).strip() == "16"
assert node1.query("SELECT count() FROM {name}".format(name=name)).strip() == "12"
finally:
node1.query("DROP TABLE IF EXISTS {}".format(name_temp))
node1.query("DROP TABLE IF EXISTS {}".format(name))
@pytest.mark.parametrize("name,engine,positive", [
("mt_test_moves_after_merges_do_not_work","MergeTree()",0),
("replicated_mt_test_moves_after_merges_do_not_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_after_merges_do_not_work', '1')",0),
@ -365,19 +408,30 @@ def test_moves_after_merges_work(started_cluster, name, engine, positive):
SETTINGS storage_policy='small_jbod_with_external'
""".format(name=name, engine=engine))
wait_expire_1 = 10
wait_expire_2 = 4
time_1 = time.time() + wait_expire_1
time_2 = time.time() + wait_expire_1 + wait_expire_2
wait_expire_1_thread = threading.Thread(target=time.sleep, args=(wait_expire_1,))
wait_expire_1_thread.start()
for _ in range(2):
data = [] # 16MB in total
for i in range(8):
data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time.time()+2 if i > 0 or positive else time.time()+6))) # 1MB row
data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time_1 if i > 0 or positive else time_2))) # 1MB row
node1.query("INSERT INTO {} (s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))
node1.query("OPTIMIZE TABLE {}".format(name))
time.sleep(1)
used_disks = get_used_disks_for_table(node1, name)
assert set(used_disks) == {"jbod1"}
assert "1" == node1.query("SELECT count() FROM system.parts WHERE table = '{}' AND active = 1".format(name)).strip()
time.sleep(4)
wait_expire_1_thread.join()
time.sleep(wait_expire_2/2)
used_disks = get_used_disks_for_table(node1, name)
assert set(used_disks) == {"external" if positive else "jbod1"}