Fix incorrect free space accounting for least_used JBOD policy

Before least_used fails to detect when the disk started to have more
space, it works only when the disk starts to have less space.

The reason for this is that it uses priority_queue, and once the disk
goes at the bottom of the queue, free space will not be updated for it
until it will be selected again.

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
Azat Khuzhin 2023-10-25 22:56:24 +02:00
parent d6a09a6efa
commit c7f392500e
6 changed files with 99 additions and 8 deletions

View File

@ -866,6 +866,7 @@ Tags:
- `prefer_not_to_merge` — Disables merging of data parts on this volume. When this setting is enabled, merging data on this volume is not allowed. This allows controlling how ClickHouse works with slow disks.
- `perform_ttl_move_on_insert` — Disables TTL move on data part INSERT. By default (if enabled) if we insert a data part that already expired by the TTL move rule it immediately goes to a volume/disk declared in move rule. This can significantly slowdown insert in case if destination volume/disk is slow (e.g. S3). If disabled then already expired data part is written into a default volume and then right after moved to TTL volume.
- `load_balancing` - Policy for disk balancing, `round_robin` or `least_used`.
- `least_used_ttl_ms` - Configure timeout (in milliseconds) for the updating available space on all disks (`0` - update always, `-1` - never update, default is `60000`). Note, if the disk can be used by ClickHouse only and is not subject to a online filesystem resize/shrink you can use `-1`, in all other cases it is not recommended, since eventually it will lead to incorrect space distribution.
Configuration examples:

View File

@ -71,7 +71,8 @@ StoragePolicy::StoragePolicy(
/* max_data_part_size_= */ 0,
/* are_merges_avoided_= */ false,
/* perform_ttl_move_on_insert_= */ true,
VolumeLoadBalancing::ROUND_ROBIN);
VolumeLoadBalancing::ROUND_ROBIN,
/* least_used_ttl_ms_= */ 60'000);
volumes.emplace_back(std::move(default_volume));
}

View File

@ -76,6 +76,7 @@ VolumeJBOD::VolumeJBOD(
perform_ttl_move_on_insert = config.getBool(config_prefix + ".perform_ttl_move_on_insert", true);
are_merges_avoided = config.getBool(config_prefix + ".prefer_not_to_merge", false);
least_used_ttl_ms = config.getUInt64(config_prefix + ".least_used_ttl_ms", 60'000);
}
VolumeJBOD::VolumeJBOD(const VolumeJBOD & volume_jbod,
@ -101,6 +102,11 @@ DiskPtr VolumeJBOD::getDisk(size_t /* index */) const
case VolumeLoadBalancing::LEAST_USED:
{
std::lock_guard lock(mutex);
if (!least_used_ttl_ms || least_used_update_watch.elapsedMilliseconds() >= least_used_ttl_ms)
{
disks_by_size = LeastUsedDisksQueue(disks.begin(), disks.end());
least_used_update_watch.restart();
}
return disks_by_size.top().disk;
}
}
@ -135,11 +141,23 @@ ReservationPtr VolumeJBOD::reserve(UInt64 bytes)
{
std::lock_guard lock(mutex);
DiskWithSize disk = disks_by_size.top();
disks_by_size.pop();
ReservationPtr reservation;
if (!least_used_ttl_ms || least_used_update_watch.elapsedMilliseconds() >= least_used_ttl_ms)
{
disks_by_size = LeastUsedDisksQueue(disks.begin(), disks.end());
least_used_update_watch.restart();
ReservationPtr reservation = disk.reserve(bytes);
disks_by_size.push(disk);
DiskWithSize disk = disks_by_size.top();
reservation = disk.reserve(bytes);
}
else
{
DiskWithSize disk = disks_by_size.top();
disks_by_size.pop();
reservation = disk.reserve(bytes);
disks_by_size.push(disk);
}
return reservation;
}

View File

@ -5,6 +5,9 @@
#include <queue>
#include <Disks/IVolume.h>
#include <base/defines.h>
#include <base/types.h>
#include <Common/Stopwatch.h>
namespace DB
@ -23,9 +26,10 @@ using VolumesJBOD = std::vector<VolumeJBODPtr>;
class VolumeJBOD : public IVolume
{
public:
VolumeJBOD(String name_, Disks disks_, UInt64 max_data_part_size_, bool are_merges_avoided_, bool perform_ttl_move_on_insert_, VolumeLoadBalancing load_balancing_)
VolumeJBOD(String name_, Disks disks_, UInt64 max_data_part_size_, bool are_merges_avoided_, bool perform_ttl_move_on_insert_, VolumeLoadBalancing load_balancing_, UInt64 least_used_ttl_ms_)
: IVolume(name_, disks_, max_data_part_size_, perform_ttl_move_on_insert_, load_balancing_)
, are_merges_avoided(are_merges_avoided_)
, least_used_ttl_ms(least_used_ttl_ms_)
{
}
@ -70,7 +74,7 @@ private:
DiskPtr disk;
std::optional<UInt64> free_size = 0;
DiskWithSize(DiskPtr disk_)
explicit DiskWithSize(DiskPtr disk_)
: disk(disk_)
, free_size(disk->getUnreservedSpace())
{}
@ -97,7 +101,10 @@ private:
/// Index of last used disk, for load_balancing=round_robin
mutable std::atomic<size_t> last_used = 0;
/// Priority queue of disks sorted by size, for load_balancing=least_used
mutable std::priority_queue<DiskWithSize> disks_by_size;
using LeastUsedDisksQueue = std::priority_queue<DiskWithSize>;
mutable LeastUsedDisksQueue disks_by_size TSA_GUARDED_BY(mutex);
mutable Stopwatch least_used_update_watch TSA_GUARDED_BY(mutex);
UInt64 least_used_ttl_ms = 0;
/// True if parts on this volume participate in merges according to START/STOP MERGES ON VOLUME.
std::atomic<std::optional<bool>> are_merges_avoided_user_override{std::nullopt};

View File

@ -31,6 +31,7 @@
<disk>jbod3</disk>
<load_balancing>least_used</load_balancing>
<least_used_ttl_ms>0</least_used_ttl_ms>
</disks>
</volumes>
</jbod_least_used>

View File

@ -134,3 +134,66 @@ def test_jbod_load_balancing_least_used_next_disk(start_cluster):
]
finally:
node.query("DROP TABLE IF EXISTS data_least_used_next_disk SYNC")
def test_jbod_load_balancing_least_used_detect_background_changes(start_cluster):
def get_parts_on_disks():
parts = node.query(
"""
SELECT count(), disk_name
FROM system.parts
WHERE table = 'data_least_used_detect_background_changes'
GROUP BY disk_name
ORDER BY disk_name
"""
)
parts = [l.split("\t") for l in parts.strip().split("\n")]
return parts
try:
node.query(
"""
CREATE TABLE data_least_used_detect_background_changes (p UInt8)
ENGINE = MergeTree
ORDER BY tuple()
SETTINGS storage_policy = 'jbod_least_used';
SYSTEM STOP MERGES data_least_used_detect_background_changes;
"""
)
node.exec_in_container(["fallocate", "-l200M", "/jbod3/.test"])
node.query(
"""
INSERT INTO data_least_used_detect_background_changes SELECT * FROM numbers(10);
INSERT INTO data_least_used_detect_background_changes SELECT * FROM numbers(10);
INSERT INTO data_least_used_detect_background_changes SELECT * FROM numbers(10);
INSERT INTO data_least_used_detect_background_changes SELECT * FROM numbers(10);
"""
)
parts = get_parts_on_disks()
assert parts == [
["4", "jbod2"],
]
node.exec_in_container(["rm", "/jbod3/.test"])
node.query(
"""
INSERT INTO data_least_used_detect_background_changes SELECT * FROM numbers(10);
INSERT INTO data_least_used_detect_background_changes SELECT * FROM numbers(10);
INSERT INTO data_least_used_detect_background_changes SELECT * FROM numbers(10);
INSERT INTO data_least_used_detect_background_changes SELECT * FROM numbers(10);
"""
)
parts = get_parts_on_disks()
assert parts == [
# previous INSERT
["4", "jbod2"],
# this INSERT
["4", "jbod3"],
]
finally:
node.exec_in_container(["rm", "-f", "/jbod3/.test"])
node.query(
"DROP TABLE IF EXISTS data_least_used_detect_background_changes SYNC"
)