Merge pull request #36686 from azat/disk-least-used

Implement least_used load balancing algorithm for disks inside volume
This commit is contained in:
Sergei Trifonov 2022-05-20 09:16:07 +02:00 committed by GitHub
commit 2450ab9079
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 412 additions and 51 deletions

View File

@ -669,6 +669,7 @@ Storage policies configuration markup:
<volume_name_1>
<disk>disk_name_from_disks_configuration</disk>
<max_data_part_size_bytes>1073741824</max_data_part_size_bytes>
<load_balancing>round_robin</load_balancing>
</volume_name_1>
<volume_name_2>
<!-- configuration -->
@ -695,6 +696,8 @@ Tags:
- `max_data_part_size_bytes` — the maximum size of a part that can be stored on any of the volumes disks. If the a size of a merged part estimated to be bigger than `max_data_part_size_bytes` then this part will be written to a next volume. Basically this feature allows to keep new/small parts on a hot (SSD) volume and move them to a cold (HDD) volume when they reach large size. Do not use this setting if your policy has only one volume.
- `move_factor` — when the amount of available space gets lower than this factor, data automatically starts to move on the next volume if any (by default, 0.1). ClickHouse sorts existing parts by size from largest to smallest (in descending order) and selects parts with the total size that is sufficient to meet the `move_factor` condition. If the total size of all parts is insufficient, all parts will be moved.
- `prefer_not_to_merge` — Disables merging of data parts on this volume. When this setting is enabled, merging data on this volume is not allowed. This allows controlling how ClickHouse works with slow disks.
- `perform_ttl_move_on_insert` — Disables TTL move on data part INSERT. By default if we insert a data part that already expired by the TTL move rule it immediately goes to a volume/disk declared in move rule. This can significantly slowdown insert in case if destination volume/disk is slow (e.g. S3).
- `load_balancing` - Policy for disk balancing, `round_robin` or `least_used`.
Cofiguration examples:
@ -724,7 +727,7 @@ Cofiguration examples:
<move_factor>0.2</move_factor>
</moving_from_ssd_to_hdd>
<small_jbod_with_external_no_merges>
<small_jbod_with_external_no_merges>
<volumes>
<main>
<disk>jbod1</disk>

View File

@ -365,6 +365,59 @@
<!-- Path to data directory, with trailing slash. -->
<path>/var/lib/clickhouse/</path>
<!-- Multi-disk configuration example: -->
<!--
<storage_configuration>
<disks>
<default>
<keep_free_space_bytes>0</keep_free_space_bytes>
</default>
<data>
<path>/data/</path>
<keep_free_space_bytes>0</keep_free_space_bytes>
</data>
<s3>
<type>s3</type>
<endpoint>http://path/to/endpoint</endpoint>
<access_key_id>your_access_key_id</access_key_id>
<secret_access_key>your_secret_access_key</secret_access_key>
</s3>
<blob_storage_disk>
<type>azure_blob_storage</type>
<storage_account_url>http://account.blob.core.windows.net</storage_account_url>
<container_name>container</container_name>
<account_name>account</account_name>
<account_key>pass123</account_key>
<metadata_path>/var/lib/clickhouse/disks/blob_storage_disk/</metadata_path>
<cache_enabled>true</cache_enabled>
<cache_path>/var/lib/clickhouse/disks/blob_storage_disk/cache/</cache_path>
<skip_access_check>false</skip_access_check>
</blob_storage_disk>
</disks>
<policies>
<all>
<volumes>
<main>
<disk>default</disk>
<disk>data</disk>
<disk>s3</disk>
<disk>blob_storage_disk</disk>
<max_data_part_size_bytes></max_data_part_size_bytes>
<max_data_part_size_ratio></max_data_part_size_ratio>
<perform_ttl_move_on_insert>true</perform_ttl_move_on_insert>
<prefer_not_to_merge>false</prefer_not_to_merge>
<load_balancing>round_robin</load_balancing>
</main>
</volumes>
<move_factor>0.2</move_factor>
</all>
</policies>
</storage_configuration>
-->
<!-- Path to temporary data for processing hard queries. -->
<tmp_path>/var/lib/clickhouse/tmp/</tmp_path>

View File

@ -96,6 +96,7 @@ class ReservationDelegate : public IReservation
public:
ReservationDelegate(ReservationPtr delegate_, DiskPtr wrapper_) : delegate(std::move(delegate_)), wrapper(wrapper_) { }
UInt64 getSize() const override { return delegate->getSize(); }
UInt64 getUnreservedSpace() const override { return delegate->getUnreservedSpace(); }
DiskPtr getDisk(size_t) const override { return wrapper; }
Disks getDisks() const override { return {wrapper}; }
void update(UInt64 new_size) override { delegate->update(new_size); }

View File

@ -182,6 +182,7 @@ public:
}
UInt64 getSize() const override { return reservation->getSize(); }
UInt64 getUnreservedSpace() const override { return reservation->getUnreservedSpace(); }
DiskPtr getDisk(size_t i) const override
{

View File

@ -112,12 +112,15 @@ std::optional<size_t> fileSizeSafe(const fs::path & path)
class DiskLocalReservation : public IReservation
{
public:
DiskLocalReservation(const DiskLocalPtr & disk_, UInt64 size_)
: disk(disk_), size(size_), metric_increment(CurrentMetrics::DiskSpaceReservedForMerge, size_)
{
}
DiskLocalReservation(const DiskLocalPtr & disk_, UInt64 size_, UInt64 unreserved_space_)
: disk(disk_)
, size(size_)
, unreserved_space(unreserved_space_)
, metric_increment(CurrentMetrics::DiskSpaceReservedForMerge, size_)
{}
UInt64 getSize() const override { return size; }
UInt64 getUnreservedSpace() const override { return unreserved_space; }
DiskPtr getDisk(size_t i) const override
{
@ -165,6 +168,7 @@ public:
private:
DiskLocalPtr disk;
UInt64 size;
UInt64 unreserved_space;
CurrentMetrics::Increment metric_increment;
};
@ -201,32 +205,38 @@ private:
ReservationPtr DiskLocal::reserve(UInt64 bytes)
{
if (!tryReserve(bytes))
auto unreserved_space = tryReserve(bytes);
if (!unreserved_space.has_value())
return {};
return std::make_unique<DiskLocalReservation>(std::static_pointer_cast<DiskLocal>(shared_from_this()), bytes);
return std::make_unique<DiskLocalReservation>(
std::static_pointer_cast<DiskLocal>(shared_from_this()),
bytes, unreserved_space.value());
}
bool DiskLocal::tryReserve(UInt64 bytes)
std::optional<UInt64> DiskLocal::tryReserve(UInt64 bytes)
{
std::lock_guard lock(DiskLocal::reservation_mutex);
UInt64 available_space = getAvailableSpace();
UInt64 unreserved_space = available_space - std::min(available_space, reserved_bytes);
if (bytes == 0)
{
LOG_DEBUG(log, "Reserving 0 bytes on disk {}", backQuote(name));
++reservation_count;
return true;
return {unreserved_space};
}
auto available_space = getAvailableSpace();
UInt64 unreserved_space = available_space - std::min(available_space, reserved_bytes);
if (unreserved_space >= bytes)
{
LOG_DEBUG(log, "Reserving {} on disk {}, having unreserved {}.",
ReadableSize(bytes), backQuote(name), ReadableSize(unreserved_space));
++reservation_count;
reserved_bytes += bytes;
return true;
return {unreserved_space - bytes};
}
return false;
return {};
}
static UInt64 getTotalSpaceByName(const String & name, const String & disk_path, UInt64 keep_free_space_bytes)

View File

@ -121,7 +121,7 @@ public:
bool canWrite() const noexcept;
private:
bool tryReserve(UInt64 bytes);
std::optional<UInt64> tryReserve(UInt64 bytes);
/// Setup disk for healthy check. Returns true if it's read-write, false if read-only.
/// Throw exception if it's not possible to setup necessary files and directories.

View File

@ -392,6 +392,10 @@ public:
/// Get reservation size.
virtual UInt64 getSize() const = 0;
/// Space available for reservation
/// (with this reservation already take into account).
virtual UInt64 getUnreservedSpace() const = 0;
/// Get i-th disk where reservation take place.
virtual DiskPtr getDisk(size_t i = 0) const = 0; /// NOLINT

View File

@ -637,34 +637,40 @@ void IDiskRemote::createHardLink(const String & src_path, const String & dst_pat
ReservationPtr IDiskRemote::reserve(UInt64 bytes)
{
if (!tryReserve(bytes))
auto unreserved_space = tryReserve(bytes);
if (!unreserved_space.has_value())
return {};
return std::make_unique<DiskRemoteReservation>(std::static_pointer_cast<IDiskRemote>(shared_from_this()), bytes);
return std::make_unique<DiskRemoteReservation>(
std::static_pointer_cast<IDiskRemote>(shared_from_this()),
bytes, unreserved_space.value());
}
bool IDiskRemote::tryReserve(UInt64 bytes)
std::optional<UInt64> IDiskRemote::tryReserve(UInt64 bytes)
{
std::lock_guard lock(reservation_mutex);
auto available_space = getAvailableSpace();
UInt64 unreserved_space = available_space - std::min(available_space, reserved_bytes);
if (bytes == 0)
{
LOG_TRACE(log, "Reserving 0 bytes on remote_fs disk {}", backQuote(name));
++reservation_count;
return true;
return {unreserved_space};
}
auto available_space = getAvailableSpace();
UInt64 unreserved_space = available_space - std::min(available_space, reserved_bytes);
if (unreserved_space >= bytes)
{
LOG_TRACE(log, "Reserving {} on disk {}, having unreserved {}.",
ReadableSize(bytes), backQuote(name), ReadableSize(unreserved_space));
++reservation_count;
reserved_bytes += bytes;
return true;
return {unreserved_space - bytes};
}
return false;
return {};
}
String IDiskRemote::getUniqueId(const String & path) const

View File

@ -177,7 +177,7 @@ private:
void removeMetadataRecursive(const String & path, std::unordered_map<String, std::vector<String>> & paths_to_remove);
bool tryReserve(UInt64 bytes);
std::optional<UInt64> tryReserve(UInt64 bytes);
UInt64 reserved_bytes = 0;
UInt64 reservation_count = 0;
@ -250,13 +250,18 @@ private:
class DiskRemoteReservation final : public IReservation
{
public:
DiskRemoteReservation(const RemoteDiskPtr & disk_, UInt64 size_)
: disk(disk_), size(size_), metric_increment(CurrentMetrics::DiskSpaceReservedForMerge, size_)
DiskRemoteReservation(const RemoteDiskPtr & disk_, UInt64 size_, UInt64 unreserved_space_)
: disk(disk_)
, size(size_)
, unreserved_space(unreserved_space_)
, metric_increment(CurrentMetrics::DiskSpaceReservedForMerge, size_)
{
}
UInt64 getSize() const override { return size; }
UInt64 getUnreservedSpace() const override { return unreserved_space; }
DiskPtr getDisk(size_t i) const override;
Disks getDisks() const override { return {disk}; }
@ -268,6 +273,7 @@ public:
private:
RemoteDiskPtr disk;
UInt64 size;
UInt64 unreserved_space;
CurrentMetrics::Increment metric_increment;
};

View File

@ -7,17 +7,31 @@
namespace DB
{
namespace ErrorCodes
{
extern const int NO_ELEMENTS_IN_CONFIG;
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
}
VolumeLoadBalancing parseVolumeLoadBalancing(const String & config)
{
if (config == "round_robin")
return VolumeLoadBalancing::ROUND_ROBIN;
if (config == "least_used")
return VolumeLoadBalancing::LEAST_USED;
throw Exception(ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG, "'{}' is not valid load_balancing value", config);
}
IVolume::IVolume(
String name_,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
DiskSelectorPtr disk_selector)
: name(std::move(name_))
, load_balancing(parseVolumeLoadBalancing(config.getString(config_prefix + ".load_balancing", "round_robin")))
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);

View File

@ -11,11 +11,18 @@ namespace DB
enum class VolumeType
{
JBOD,
RAID1,
SINGLE_DISK,
UNKNOWN
};
enum class VolumeLoadBalancing
{
ROUND_ROBIN,
LEAST_USED,
};
VolumeLoadBalancing parseVolumeLoadBalancing(const String & config);
class IVolume;
using VolumePtr = std::shared_ptr<IVolume>;
using Volumes = std::vector<VolumePtr>;
@ -34,11 +41,19 @@ using Volumes = std::vector<VolumePtr>;
class IVolume : public Space
{
public:
IVolume(String name_, Disks disks_, size_t max_data_part_size_ = 0, bool perform_ttl_move_on_insert_ = true)
/// This constructor is only for:
/// - SingleDiskVolume
/// From createVolumeFromReservation().
IVolume(String name_,
Disks disks_,
size_t max_data_part_size_ = 0,
bool perform_ttl_move_on_insert_ = true,
VolumeLoadBalancing load_balancing_ = VolumeLoadBalancing::ROUND_ROBIN)
: disks(std::move(disks_))
, name(name_)
, max_data_part_size(max_data_part_size_)
, perform_ttl_move_on_insert(perform_ttl_move_on_insert_)
, load_balancing(load_balancing_)
{
}
@ -79,6 +94,10 @@ public:
/// Should a new data part be synchronously moved to a volume according to ttl on insert
/// or move this part in background task asynchronously after insert.
bool perform_ttl_move_on_insert = true;
/// Load balancing, one of:
/// - ROUND_ROBIN
/// - LEAST_USED
const VolumeLoadBalancing load_balancing;
};
}

View File

@ -1,6 +0,0 @@
#include <Disks/SingleDiskVolume.h>
namespace DB
{
}

View File

@ -63,7 +63,12 @@ StoragePolicy::StoragePolicy(
if (volumes.empty() && name == DEFAULT_STORAGE_POLICY_NAME)
{
auto default_volume = std::make_shared<VolumeJBOD>(DEFAULT_VOLUME_NAME, std::vector<DiskPtr>{disks->get(DEFAULT_DISK_NAME)}, 0, false);
auto default_volume = std::make_shared<VolumeJBOD>(DEFAULT_VOLUME_NAME,
std::vector<DiskPtr>{disks->get(DEFAULT_DISK_NAME)},
/* max_data_part_size_= */ 0,
/* are_merges_avoided_= */ false,
/* perform_ttl_move_on_insert_= */ true,
VolumeLoadBalancing::ROUND_ROBIN);
volumes.emplace_back(std::move(default_volume));
}

View File

@ -19,15 +19,18 @@ VolumeJBOD::VolumeJBOD(
const String & config_prefix,
DiskSelectorPtr disk_selector)
: IVolume(name_, config, config_prefix, disk_selector)
, disks_by_size(disks.begin(), disks.end())
{
Poco::Logger * logger = &Poco::Logger::get("StorageConfiguration");
auto has_max_bytes = config.has(config_prefix + ".max_data_part_size_bytes");
auto has_max_ratio = config.has(config_prefix + ".max_data_part_size_ratio");
if (has_max_bytes && has_max_ratio)
{
throw Exception(
"Only one of 'max_data_part_size_bytes' and 'max_data_part_size_ratio' should be specified.",
ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
}
if (has_max_bytes)
{
@ -47,12 +50,20 @@ VolumeJBOD::VolumeJBOD(
}
max_data_part_size = static_cast<decltype(max_data_part_size)>(sum_size * ratio / disks.size());
for (size_t i = 0; i < disks.size(); ++i)
{
if (sizes[i] < max_data_part_size)
LOG_WARNING(logger, "Disk {} on volume {} have not enough space ({}) for containing part the size of max_data_part_size ({})", backQuote(disks[i]->getName()), backQuote(config_prefix), ReadableSize(sizes[i]), ReadableSize(max_data_part_size));
{
LOG_WARNING(logger, "Disk {} on volume {} have not enough space ({}) for containing part the size of max_data_part_size ({})",
backQuote(disks[i]->getName()), backQuote(config_prefix), ReadableSize(sizes[i]), ReadableSize(max_data_part_size));
}
}
}
static constexpr UInt64 MIN_PART_SIZE = 8u * 1024u * 1024u;
if (max_data_part_size != 0 && max_data_part_size < MIN_PART_SIZE)
LOG_WARNING(logger, "Volume {} max_data_part_size is too low ({} < {})", backQuote(name), ReadableSize(max_data_part_size), ReadableSize(MIN_PART_SIZE));
{
LOG_WARNING(logger, "Volume {} max_data_part_size is too low ({} < {})",
backQuote(name), ReadableSize(max_data_part_size), ReadableSize(MIN_PART_SIZE));
}
/// Default value is 'true' due to backward compatibility.
perform_ttl_move_on_insert = config.getBool(config_prefix + ".perform_ttl_move_on_insert", true);
@ -72,31 +83,61 @@ VolumeJBOD::VolumeJBOD(const VolumeJBOD & volume_jbod,
DiskPtr VolumeJBOD::getDisk(size_t /* index */) const
{
size_t start_from = last_used.fetch_add(1u, std::memory_order_acq_rel);
size_t index = start_from % disks.size();
return disks[index];
switch (load_balancing)
{
case VolumeLoadBalancing::ROUND_ROBIN:
{
size_t start_from = last_used.fetch_add(1u, std::memory_order_acq_rel);
size_t index = start_from % disks.size();
return disks[index];
}
case VolumeLoadBalancing::LEAST_USED:
{
std::lock_guard lock(mutex);
return disks_by_size.top().disk;
}
}
__builtin_unreachable();
}
ReservationPtr VolumeJBOD::reserve(UInt64 bytes)
{
/// This volume can not store data which size is greater than `max_data_part_size`
/// to ensure that parts of size greater than that go to another volume(s).
if (max_data_part_size != 0 && bytes > max_data_part_size)
return {};
size_t start_from = last_used.fetch_add(1u, std::memory_order_acq_rel);
size_t disks_num = disks.size();
for (size_t i = 0; i < disks_num; ++i)
switch (load_balancing)
{
size_t index = (start_from + i) % disks_num;
case VolumeLoadBalancing::ROUND_ROBIN:
{
size_t start_from = last_used.fetch_add(1u, std::memory_order_acq_rel);
size_t disks_num = disks.size();
for (size_t i = 0; i < disks_num; ++i)
{
size_t index = (start_from + i) % disks_num;
auto reservation = disks[index]->reserve(bytes);
auto reservation = disks[index]->reserve(bytes);
if (reservation)
return reservation;
}
return {};
}
case VolumeLoadBalancing::LEAST_USED:
{
std::lock_guard lock(mutex);
DiskWithSize disk = disks_by_size.top();
disks_by_size.pop();
ReservationPtr reservation = disk.reserve(bytes);
disks_by_size.push(disk);
if (reservation)
return reservation;
}
}
return {};
__builtin_unreachable();
}
bool VolumeJBOD::areMergesAvoided() const

View File

@ -22,8 +22,8 @@ using VolumesJBOD = std::vector<VolumeJBODPtr>;
class VolumeJBOD : public IVolume
{
public:
VolumeJBOD(String name_, Disks disks_, UInt64 max_data_part_size_, bool are_merges_avoided_)
: IVolume(name_, disks_, max_data_part_size_)
VolumeJBOD(String name_, Disks disks_, UInt64 max_data_part_size_, bool are_merges_avoided_, bool perform_ttl_move_on_insert_, VolumeLoadBalancing load_balancing_)
: IVolume(name_, disks_, max_data_part_size_, perform_ttl_move_on_insert_, load_balancing_)
, are_merges_avoided(are_merges_avoided_)
{
}
@ -44,7 +44,8 @@ public:
VolumeType getType() const override { return VolumeType::JBOD; }
/// Always returns next disk (round-robin), ignores argument.
/// Returns disk based on the load balancing algorithm (round-robin, or least-used),
/// ignores @index argument.
///
/// - Used with policy for temporary data
/// - Ignores all limitations
@ -63,8 +64,36 @@ public:
bool are_merges_avoided = true;
private:
/// Index of last used disk.
struct DiskWithSize
{
DiskPtr disk;
uint64_t free_size = 0;
DiskWithSize(DiskPtr disk_)
: disk(disk_)
, free_size(disk->getUnreservedSpace())
{}
bool operator<(const DiskWithSize & rhs) const
{
return free_size < rhs.free_size;
}
ReservationPtr reserve(uint64_t bytes)
{
ReservationPtr reservation = disk->reserve(bytes);
/// Not just subtract bytes, but update the value,
/// since some reservations may be done directly via IDisk, or not by ClickHouse.
free_size = reservation->getUnreservedSpace();
return reservation;
}
};
mutable std::mutex mutex;
/// Index of last used disk, for load_balancing=round_robin
mutable std::atomic<size_t> last_used = 0;
/// Priority queue of disks sorted by size, for load_balancing=least_used
mutable std::priority_queue<DiskWithSize> disks_by_size;
/// True if parts on this volume participate in merges according to START/STOP MERGES ON VOLUME.
std::atomic<std::optional<bool>> are_merges_avoided_user_override{std::nullopt};

View File

@ -0,0 +1,39 @@
<clickhouse>
<storage_configuration>
<disks>
<jbod1>
<path>/jbod1/</path>
</jbod1>
<jbod2>
<path>/jbod2/</path>
</jbod2>
<jbod3>
<path>/jbod3/</path>
</jbod3>
</disks>
<policies>
<jbod_round_robin>
<volumes>
<disks>
<disk>jbod1</disk>
<disk>jbod2</disk>
<disk>jbod3</disk>
<!-- <load_balancing>round_robin</load_balancing> -->
</disks>
</volumes>
</jbod_round_robin>
<jbod_least_used>
<volumes>
<disks>
<disk>jbod1</disk>
<disk>jbod2</disk>
<disk>jbod3</disk>
<load_balancing>least_used</load_balancing>
</disks>
</volumes>
</jbod_least_used>
</policies>
</storage_configuration>
</clickhouse>

View File

@ -0,0 +1,136 @@
# pylint: disable=unused-argument
# pylint: disable=redefined-outer-name
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node",
main_configs=[
"configs/config.d/storage_configuration.xml",
],
tmpfs=[
"/jbod1:size=100M",
"/jbod2:size=200M",
"/jbod3:size=300M",
],
)
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_jbod_load_balancing_round_robin(start_cluster):
try:
node.query(
"""
CREATE TABLE data_round_robin (p UInt8)
ENGINE = MergeTree
ORDER BY tuple()
SETTINGS storage_policy = 'jbod_round_robin';
SYSTEM STOP MERGES data_round_robin;
INSERT INTO data_round_robin SELECT * FROM numbers(10);
INSERT INTO data_round_robin SELECT * FROM numbers(10);
INSERT INTO data_round_robin SELECT * FROM numbers(10);
INSERT INTO data_round_robin SELECT * FROM numbers(10);
"""
)
parts = node.query(
"""
SELECT count(), disk_name
FROM system.parts
WHERE table = 'data_round_robin'
GROUP BY disk_name
ORDER BY disk_name
"""
)
parts = [l.split("\t") for l in parts.strip().split("\n")]
assert parts == [
["2", "jbod1"],
["1", "jbod2"],
["1", "jbod3"],
]
finally:
node.query("DROP TABLE IF EXISTS data_round_robin SYNC")
def test_jbod_load_balancing_least_used(start_cluster):
try:
node.query(
"""
CREATE TABLE data_least_used (p UInt8)
ENGINE = MergeTree
ORDER BY tuple()
SETTINGS storage_policy = 'jbod_least_used';
SYSTEM STOP MERGES data_least_used;
INSERT INTO data_least_used SELECT * FROM numbers(10);
INSERT INTO data_least_used SELECT * FROM numbers(10);
INSERT INTO data_least_used SELECT * FROM numbers(10);
INSERT INTO data_least_used SELECT * FROM numbers(10);
"""
)
parts = node.query(
"""
SELECT count(), disk_name
FROM system.parts
WHERE table = 'data_least_used'
GROUP BY disk_name
ORDER BY disk_name
"""
)
parts = [l.split("\t") for l in parts.strip().split("\n")]
assert parts == [
["4", "jbod3"],
]
finally:
node.query("DROP TABLE IF EXISTS data_least_used SYNC")
def test_jbod_load_balancing_least_used_next_disk(start_cluster):
try:
node.query(
"""
CREATE TABLE data_least_used_next_disk
(
s String CODEC(NONE)
)
ENGINE = MergeTree
ORDER BY tuple()
SETTINGS storage_policy = 'jbod_least_used';
SYSTEM STOP MERGES data_least_used_next_disk;
-- 100MiB each part, 3 parts in total
INSERT INTO data_least_used_next_disk SELECT repeat('a', 100) FROM numbers(3e6) SETTINGS max_block_size='1Mi';
"""
)
parts = node.query(
"""
SELECT count(), disk_name
FROM system.parts
WHERE table = 'data_least_used_next_disk'
GROUP BY disk_name
ORDER BY disk_name
"""
)
parts = [l.split("\t") for l in parts.strip().split("\n")]
assert parts == [
["1", "jbod2"],
["2", "jbod3"],
]
finally:
node.query("DROP TABLE IF EXISTS data_least_used_next_disk SYNC")