Implement least_used load balancing algorithm for disks inside volume

v2: rebase on top removed raid1
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
Azat Khuzhin 2022-04-26 17:58:09 +03:00
parent 4bc849b9f0
commit ba26b3cf4c
10 changed files with 290 additions and 19 deletions

View File

@ -669,6 +669,7 @@ Storage policies configuration markup:
<volume_name_1>
<disk>disk_name_from_disks_configuration</disk>
<max_data_part_size_bytes>1073741824</max_data_part_size_bytes>
<load_balancing>round_robin</load_balancing>
</volume_name_1>
<volume_name_2>
<!-- configuration -->
@ -696,6 +697,7 @@ Tags:
- `move_factor` — when the amount of available space gets lower than this factor, data automatically starts to move on the next volume if any (by default, 0.1). ClickHouse sorts existing parts by size from largest to smallest (in descending order) and selects parts with the total size that is sufficient to meet the `move_factor` condition. If the total size of all parts is insufficient, all parts will be moved.
- `prefer_not_to_merge` — Disables merging of data parts on this volume. When this setting is enabled, merging data on this volume is not allowed. This allows controlling how ClickHouse works with slow disks.
- `perform_ttl_move_on_insert` — Disables TTL move on data part INSERT. By default if we insert a data part that already expired by the TTL move rule it immediately goes to a volume/disk declared in move rule. This can significantly slowdown insert in case if destination volume/disk is slow (e.g. S3).
- `load_balancing` — Policy for disk balancing, `round_robin` or `least_used` (by default, `round_robin`).
Configuration examples:

View File

@ -408,6 +408,7 @@
<max_data_part_size_ratio></max_data_part_size_ratio>
<perform_ttl_move_on_insert>true</perform_ttl_move_on_insert>
<prefer_not_to_merge>false</prefer_not_to_merge>
<load_balancing>round_robin</load_balancing>
</main>
</volumes>
<move_factor>0.2</move_factor>

View File

@ -7,17 +7,31 @@
namespace DB
{
/// Error codes referenced in this file. They are only declared here (extern);
/// the definitions are not part of this file.
namespace ErrorCodes
{
extern const int NO_ELEMENTS_IN_CONFIG;
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
}
/// Converts the textual value of the volume `load_balancing` setting into VolumeLoadBalancing.
///
/// Accepted values are "round_robin" and "least_used" (exact match).
/// Any other value results in an exception with code EXCESSIVE_ELEMENT_IN_CONFIG.
VolumeLoadBalancing parseVolumeLoadBalancing(const String & config)
{
    const bool is_round_robin = (config == "round_robin");
    const bool is_least_used = (config == "least_used");

    if (!is_round_robin && !is_least_used)
        throw Exception(ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG, "'{}' is not valid load_balancing value", config);

    return is_round_robin ? VolumeLoadBalancing::ROUND_ROBIN : VolumeLoadBalancing::LEAST_USED;
}
IVolume::IVolume(
String name_,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
DiskSelectorPtr disk_selector)
: name(std::move(name_))
, load_balancing(parseVolumeLoadBalancing(config.getString(config_prefix + ".load_balancing", "round_robin")))
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);

View File

@ -15,6 +15,14 @@ enum class VolumeType
UNKNOWN
};
/// Strategy used to choose a disk inside a volume.
enum class VolumeLoadBalancing
{
    /// Disks are taken in turn, one after another.
    ROUND_ROBIN = 0,
    /// The disk with the largest unreserved space is taken.
    LEAST_USED = 1,
};
VolumeLoadBalancing parseVolumeLoadBalancing(const String & config);
class IVolume;
using VolumePtr = std::shared_ptr<IVolume>;
using Volumes = std::vector<VolumePtr>;
@ -33,11 +41,19 @@ using Volumes = std::vector<VolumePtr>;
class IVolume : public Space
{
public:
IVolume(String name_, Disks disks_, size_t max_data_part_size_ = 0, bool perform_ttl_move_on_insert_ = true)
/// This constructor is only for:
/// - SingleDiskVolume
/// From createVolumeFromReservation().
IVolume(String name_,
Disks disks_,
size_t max_data_part_size_ = 0,
bool perform_ttl_move_on_insert_ = true,
VolumeLoadBalancing load_balancing_ = VolumeLoadBalancing::ROUND_ROBIN)
: disks(std::move(disks_))
, name(name_)
, max_data_part_size(max_data_part_size_)
, perform_ttl_move_on_insert(perform_ttl_move_on_insert_)
, load_balancing(load_balancing_)
{
}
@ -78,6 +94,10 @@ public:
/// Should a new data part be synchronously moved to a volume according to ttl on insert
/// or move this part in background task asynchronously after insert.
bool perform_ttl_move_on_insert = true;
/// Load balancing, one of:
/// - ROUND_ROBIN
/// - LEAST_USED
const VolumeLoadBalancing load_balancing;
};
}

View File

@ -63,7 +63,12 @@ StoragePolicy::StoragePolicy(
if (volumes.empty() && name == DEFAULT_STORAGE_POLICY_NAME)
{
auto default_volume = std::make_shared<VolumeJBOD>(DEFAULT_VOLUME_NAME, std::vector<DiskPtr>{disks->get(DEFAULT_DISK_NAME)}, 0, false);
auto default_volume = std::make_shared<VolumeJBOD>(DEFAULT_VOLUME_NAME,
std::vector<DiskPtr>{disks->get(DEFAULT_DISK_NAME)},
/* max_data_part_size_= */ 0,
/* are_merges_avoided_= */ false,
/* perform_ttl_move_on_insert_= */ true,
VolumeLoadBalancing::ROUND_ROBIN);
volumes.emplace_back(std::move(default_volume));
}

View File

@ -19,15 +19,18 @@ VolumeJBOD::VolumeJBOD(
const String & config_prefix,
DiskSelectorPtr disk_selector)
: IVolume(name_, config, config_prefix, disk_selector)
, disks_by_size(disks.begin(), disks.end())
{
Poco::Logger * logger = &Poco::Logger::get("StorageConfiguration");
auto has_max_bytes = config.has(config_prefix + ".max_data_part_size_bytes");
auto has_max_ratio = config.has(config_prefix + ".max_data_part_size_ratio");
if (has_max_bytes && has_max_ratio)
{
throw Exception(
"Only one of 'max_data_part_size_bytes' and 'max_data_part_size_ratio' should be specified.",
ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
}
if (has_max_bytes)
{
@ -47,12 +50,20 @@ VolumeJBOD::VolumeJBOD(
}
max_data_part_size = static_cast<decltype(max_data_part_size)>(sum_size * ratio / disks.size());
for (size_t i = 0; i < disks.size(); ++i)
{
if (sizes[i] < max_data_part_size)
LOG_WARNING(logger, "Disk {} on volume {} have not enough space ({}) for containing part the size of max_data_part_size ({})", backQuote(disks[i]->getName()), backQuote(config_prefix), ReadableSize(sizes[i]), ReadableSize(max_data_part_size));
{
LOG_WARNING(logger, "Disk {} on volume {} have not enough space ({}) for containing part the size of max_data_part_size ({})",
backQuote(disks[i]->getName()), backQuote(config_prefix), ReadableSize(sizes[i]), ReadableSize(max_data_part_size));
}
}
}
static constexpr UInt64 MIN_PART_SIZE = 8u * 1024u * 1024u;
if (max_data_part_size != 0 && max_data_part_size < MIN_PART_SIZE)
LOG_WARNING(logger, "Volume {} max_data_part_size is too low ({} < {})", backQuote(name), ReadableSize(max_data_part_size), ReadableSize(MIN_PART_SIZE));
{
LOG_WARNING(logger, "Volume {} max_data_part_size is too low ({} < {})",
backQuote(name), ReadableSize(max_data_part_size), ReadableSize(MIN_PART_SIZE));
}
/// Default value is 'true' due to backward compatibility.
perform_ttl_move_on_insert = config.getBool(config_prefix + ".perform_ttl_move_on_insert", true);
@ -72,31 +83,61 @@ VolumeJBOD::VolumeJBOD(const VolumeJBOD & volume_jbod,
/// Returns a disk chosen by the volume's load balancing mode; the index argument is not used.
DiskPtr VolumeJBOD::getDisk(size_t /* index */) const
{
size_t start_from = last_used.fetch_add(1u, std::memory_order_acq_rel);
size_t index = start_from % disks.size();
return disks[index];
switch (load_balancing)
{
case VolumeLoadBalancing::ROUND_ROBIN:
{
/// Take the next value of the shared counter and wrap it around the number of disks.
size_t start_from = last_used.fetch_add(1u, std::memory_order_acq_rel);
size_t index = start_from % disks.size();
return disks[index];
}
case VolumeLoadBalancing::LEAST_USED:
{
/// The queue is only read here (top() without pop()), while holding the mutex.
std::lock_guard lock(mutex);
return disks_by_size.top();
}
}
/// Both enumerators are handled above and each case returns.
__builtin_unreachable();
}
/// Tries to reserve `bytes` on one of the volume's disks, choosing the disk according to
/// the load balancing mode. Returns an empty reservation when nothing could be reserved.
ReservationPtr VolumeJBOD::reserve(UInt64 bytes)
{
/// This volume can not store data which size is greater than `max_data_part_size`
/// to ensure that parts of size greater than that go to another volume(s).
if (max_data_part_size != 0 && bytes > max_data_part_size)
return {};
size_t start_from = last_used.fetch_add(1u, std::memory_order_acq_rel);
size_t disks_num = disks.size();
for (size_t i = 0; i < disks_num; ++i)
switch (load_balancing)
{
size_t index = (start_from + i) % disks_num;
case VolumeLoadBalancing::ROUND_ROBIN:
{
/// Start from the next counter value and probe every disk once, wrapping around.
size_t start_from = last_used.fetch_add(1u, std::memory_order_acq_rel);
size_t disks_num = disks.size();
for (size_t i = 0; i < disks_num; ++i)
{
size_t index = (start_from + i) % disks_num;
auto reservation = disks[index]->reserve(bytes);
auto reservation = disks[index]->reserve(bytes);
if (reservation)
return reservation;
}
return {};
}
case VolumeLoadBalancing::LEAST_USED:
{
std::lock_guard lock(mutex);
/// Reserve on the disk currently at the top of the queue, then pop and push it back
/// so that it is re-inserted according to the comparator.
/// NOTE(review): only the top disk is tried; there is no fallback to the other disks
/// when this reservation fails.
DiskPtr disk = disks_by_size.top();
ReservationPtr reservation = disk->reserve(bytes);
disks_by_size.pop();
disks_by_size.push(disk);
if (reservation)
return reservation;
}
}
return {};
__builtin_unreachable();
}
bool VolumeJBOD::areMergesAvoided() const

View File

@ -22,8 +22,8 @@ using VolumesJBOD = std::vector<VolumeJBODPtr>;
class VolumeJBOD : public IVolume
{
public:
VolumeJBOD(String name_, Disks disks_, UInt64 max_data_part_size_, bool are_merges_avoided_)
: IVolume(name_, disks_, max_data_part_size_)
VolumeJBOD(String name_, Disks disks_, UInt64 max_data_part_size_, bool are_merges_avoided_, bool perform_ttl_move_on_insert_, VolumeLoadBalancing load_balancing_)
: IVolume(name_, disks_, max_data_part_size_, perform_ttl_move_on_insert_, load_balancing_)
, are_merges_avoided(are_merges_avoided_)
{
}
@ -44,7 +44,8 @@ public:
VolumeType getType() const override { return VolumeType::JBOD; }
/// Always returns next disk (round-robin), ignores argument.
/// Returns disk based on the load balancing algorithm (round-robin, or least-used),
/// ignores @index argument.
///
/// - Used with policy for temporary data
/// - Ignores all limitations
@ -63,8 +64,20 @@ public:
bool are_merges_avoided = true;
private:
/// Index of last used disk.
/// Ordering for the disks priority queue: a disk compares "less" when it has less unreserved space,
/// so the disk with the largest unreserved space ends up on top.
struct DiskBySize
{
    bool operator()(const DiskPtr & lhs, const DiskPtr & rhs) const
    {
        /// TODO: avoid asking the disks for their unreserved space on every comparison.
        /// NOTE(review): the values are read live, so the relative order of disks already
        /// stored in the queue may become stale - confirm this is acceptable.
        const auto lhs_unreserved = lhs->getUnreservedSpace();
        const auto rhs_unreserved = rhs->getUnreservedSpace();
        return lhs_unreserved < rhs_unreserved;
    }
};
mutable std::mutex mutex;
/// Index of last used disk, for load_balancing=round_robin
mutable std::atomic<size_t> last_used = 0;
/// Priority queue of disks sorted by size, for load_balancing=least_used
mutable std::priority_queue<DiskPtr, std::vector<DiskPtr>, DiskBySize> disks_by_size;
/// True if parts on this volume participate in merges according to START/STOP MERGES ON VOLUME.
std::atomic<std::optional<bool>> are_merges_avoided_user_override{std::nullopt};

View File

@ -0,0 +1,39 @@
<!-- Storage configuration for the JBOD load balancing test:
     three local disks and two policies that use the same disks
     and differ only in the load_balancing setting of their volume. -->
<clickhouse>
<storage_configuration>
<disks>
<jbod1>
<path>/jbod1/</path>
</jbod1>
<jbod2>
<path>/jbod2/</path>
</jbod2>
<jbod3>
<path>/jbod3/</path>
</jbod3>
</disks>
<policies>
<!-- Policy with the default load balancing (round_robin). -->
<jbod_round_robin>
<volumes>
<disks>
<disk>jbod1</disk>
<disk>jbod2</disk>
<disk>jbod3</disk>
<!-- Left commented out: round_robin is the value used when the element is absent. -->
<!-- <load_balancing>round_robin</load_balancing> -->
</disks>
</volumes>
</jbod_round_robin>
<!-- Policy that explicitly selects least_used load balancing. -->
<jbod_least_used>
<volumes>
<disks>
<disk>jbod1</disk>
<disk>jbod2</disk>
<disk>jbod3</disk>
<load_balancing>least_used</load_balancing>
</disks>
</volumes>
</jbod_least_used>
</policies>
</storage_configuration>
</clickhouse>

View File

@ -0,0 +1,136 @@
# pylint: disable=unused-argument
# pylint: disable=redefined-outer-name
import pytest
from helpers.cluster import ClickHouseCluster
# Single-node cluster for the JBOD load balancing tests.
cluster = ClickHouseCluster(__file__)
# The tmpfs mounts correspond to the disk paths in storage_configuration.xml
# and have different sizes: jbod1 (100M) < jbod2 (200M) < jbod3 (300M).
node = cluster.add_instance(
"node",
main_configs=[
"configs/config.d/storage_configuration.xml",
],
tmpfs=[
"/jbod1:size=100M",
"/jbod2:size=200M",
"/jbod3:size=300M",
],
)
@pytest.fixture(scope="module")
def start_cluster():
    """Start the cluster once per module and shut it down when the module is done.

    Yields the module-level ``cluster`` object; shutdown runs even if startup
    or a test raises.
    """
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()
def test_jbod_load_balancing_round_robin(start_cluster):
    """Four inserts under the jbod_round_robin policy must be spread over all three disks."""
    expected_distribution = [
        ["2", "jbod1"],
        ["1", "jbod2"],
        ["1", "jbod3"],
    ]
    try:
        # Merges are stopped so that every INSERT stays a separate part.
        node.query(
            """
CREATE TABLE data_round_robin (p UInt8)
ENGINE = MergeTree
ORDER BY tuple()
SETTINGS storage_policy = 'jbod_round_robin';
SYSTEM STOP MERGES data_round_robin;
INSERT INTO data_round_robin SELECT * FROM numbers(10);
INSERT INTO data_round_robin SELECT * FROM numbers(10);
INSERT INTO data_round_robin SELECT * FROM numbers(10);
INSERT INTO data_round_robin SELECT * FROM numbers(10);
"""
        )
        raw_output = node.query(
            """
SELECT count(), disk_name
FROM system.parts
WHERE table = 'data_round_robin'
GROUP BY disk_name
ORDER BY disk_name
"""
        )
        # One "<count>\t<disk_name>" row per disk.
        rows = raw_output.strip().split("\n")
        distribution = [row.split("\t") for row in rows]
        assert distribution == expected_distribution
    finally:
        node.query("DROP TABLE IF EXISTS data_round_robin SYNC")
def test_jbod_load_balancing_least_used(start_cluster):
    """Four small inserts under the jbod_least_used policy must all land on jbod3."""
    expected_distribution = [
        ["4", "jbod3"],
    ]
    try:
        # Merges are stopped so that every INSERT stays a separate part.
        node.query(
            """
CREATE TABLE data_least_used (p UInt8)
ENGINE = MergeTree
ORDER BY tuple()
SETTINGS storage_policy = 'jbod_least_used';
SYSTEM STOP MERGES data_least_used;
INSERT INTO data_least_used SELECT * FROM numbers(10);
INSERT INTO data_least_used SELECT * FROM numbers(10);
INSERT INTO data_least_used SELECT * FROM numbers(10);
INSERT INTO data_least_used SELECT * FROM numbers(10);
"""
        )
        raw_output = node.query(
            """
SELECT count(), disk_name
FROM system.parts
WHERE table = 'data_least_used'
GROUP BY disk_name
ORDER BY disk_name
"""
        )
        # One "<count>\t<disk_name>" row per disk.
        rows = raw_output.strip().split("\n")
        distribution = [row.split("\t") for row in rows]
        assert distribution == expected_distribution
    finally:
        node.query("DROP TABLE IF EXISTS data_least_used SYNC")
def test_jbod_load_balancing_least_used_next_disk(start_cluster):
    """Large parts under the jbod_least_used policy must end up as two on jbod3 and one on jbod2."""
    expected_distribution = [
        ["1", "jbod2"],
        ["2", "jbod3"],
    ]
    try:
        # A single INSERT that is split into several large parts; merges are stopped
        # so the parts stay where they were written.
        node.query(
            """
CREATE TABLE data_least_used_next_disk
(
s String CODEC(NONE)
)
ENGINE = MergeTree
ORDER BY tuple()
SETTINGS storage_policy = 'jbod_least_used';
SYSTEM STOP MERGES data_least_used_next_disk;
-- 100MiB each part, 3 parts in total
INSERT INTO data_least_used_next_disk SELECT repeat('a', 100) FROM numbers(3e6) SETTINGS max_block_size='1Mi';
"""
        )
        raw_output = node.query(
            """
SELECT count(), disk_name
FROM system.parts
WHERE table = 'data_least_used_next_disk'
GROUP BY disk_name
ORDER BY disk_name
"""
        )
        # One "<count>\t<disk_name>" row per disk.
        rows = raw_output.strip().split("\n")
        distribution = [row.split("\t") for row in rows]
        assert distribution == expected_distribution
    finally:
        node.query("DROP TABLE IF EXISTS data_least_used_next_disk SYNC")