Merge pull request #10666 from NanoBjorn/refactor-volume

Volumes and storages refactoring
This commit is contained in:
alexey-milovidov 2020-05-05 16:25:26 +03:00 committed by GitHub
commit 114f09cfca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 454 additions and 333 deletions

View File

@ -20,7 +20,7 @@
#include <Common/CurrentThread.h>
#include <Common/setThreadName.h>
#include <Common/SettingsChanges.h>
#include <Disks/DiskSpaceMonitor.h>
#include <Disks/StoragePolicy.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/ReadBufferFromIStream.h>

View File

@ -7,7 +7,7 @@
#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Interpreters/sortBlock.h>
#include <Disks/DiskSpaceMonitor.h>
#include <Disks/StoragePolicy.h>
namespace ProfileEvents

View File

@ -18,8 +18,8 @@ namespace DB
struct TemporaryFileStream;
class Volume;
using VolumePtr = std::shared_ptr<Volume>;
class IVolume;
using VolumePtr = std::shared_ptr<IVolume>;
namespace ErrorCodes
{

116
src/Disks/DiskSelector.cpp Normal file
View File

@ -0,0 +1,116 @@
#include "DiskLocal.h"
#include "DiskSelector.h"
#include <IO/WriteHelpers.h>
#include <Common/escapeForFileName.h>
#include <Common/quoteString.h>
#include <common/logger_useful.h>
#include <set>
namespace DB
{
namespace ErrorCodes
{
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
extern const int UNKNOWN_DISK;
}
DiskSelector::DiskSelector(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const Context & context)
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
auto & factory = DiskFactory::instance();
constexpr auto default_disk_name = "default";
bool has_default_disk = false;
for (const auto & disk_name : keys)
{
if (!std::all_of(disk_name.begin(), disk_name.end(), isWordCharASCII))
throw Exception("Disk name can contain only alphanumeric and '_' (" + disk_name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
if (disk_name == default_disk_name)
has_default_disk = true;
auto disk_config_prefix = config_prefix + "." + disk_name;
disks.emplace(disk_name, factory.create(disk_name, config, disk_config_prefix, context));
}
if (!has_default_disk)
disks.emplace(default_disk_name, std::make_shared<DiskLocal>(default_disk_name, context.getPath(), 0));
}
DiskSelectorPtr DiskSelector::updateFromConfig(
const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const Context & context) const
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
auto & factory = DiskFactory::instance();
std::shared_ptr<DiskSelector> result = std::make_shared<DiskSelector>(*this);
constexpr auto default_disk_name = "default";
std::set<String> old_disks_minus_new_disks;
for (const auto & [disk_name, _] : result->disks)
{
old_disks_minus_new_disks.insert(disk_name);
}
for (const auto & disk_name : keys)
{
if (!std::all_of(disk_name.begin(), disk_name.end(), isWordCharASCII))
throw Exception("Disk name can contain only alphanumeric and '_' (" + disk_name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
if (result->disks.count(disk_name) == 0)
{
auto disk_config_prefix = config_prefix + "." + disk_name;
result->disks.emplace(disk_name, factory.create(disk_name, config, disk_config_prefix, context));
}
else
{
old_disks_minus_new_disks.erase(disk_name);
/// TODO: Ideally ClickHouse shall complain if disk has changed, but
/// implementing that may appear as not trivial task.
}
}
old_disks_minus_new_disks.erase(default_disk_name);
if (!old_disks_minus_new_disks.empty())
{
WriteBufferFromOwnString warning;
if (old_disks_minus_new_disks.size() == 1)
writeString("Disk ", warning);
else
writeString("Disks ", warning);
int index = 0;
for (const String & name : old_disks_minus_new_disks)
{
if (index++ > 0)
writeString(", ", warning);
writeBackQuotedString(name, warning);
}
writeString(" disappeared from configuration, this change will be applied after restart of ClickHouse", warning);
LOG_WARNING(&Logger::get("DiskSelector"), warning.str());
}
return result;
}
DiskPtr DiskSelector::get(const String & name) const
{
auto it = disks.find(name);
if (it == disks.end())
throw Exception("Unknown disk " + name, ErrorCodes::UNKNOWN_DISK);
return it->second;
}
}

37
src/Disks/DiskSelector.h Normal file
View File

@ -0,0 +1,37 @@
#pragma once
#include <Interpreters/Context.h>
#include <Disks/DiskFactory.h>
#include <Disks/IDisk.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <map>
namespace DB
{
class DiskSelector;
using DiskSelectorPtr = std::shared_ptr<const DiskSelector>;
/// Parse .xml configuration and store information about disks
/// Mostly used for introspection.
class DiskSelector
{
public:
DiskSelector(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const Context & context);
DiskSelector(const DiskSelector & from) : disks(from.disks) { }
DiskSelectorPtr
updateFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const Context & context) const;
/// Get disk by name
DiskPtr get(const String & name) const;
/// Get all disks with names
const auto & getDisksMap() const { return disks; }
private:
std::map<String, DiskPtr> disks;
};
}

43
src/Disks/IVolume.cpp Normal file
View File

@ -0,0 +1,43 @@
#include "IVolume.h"
#include <Common/StringUtils/StringUtils.h>
#include <Common/quoteString.h>
#include <memory>
namespace DB
{
namespace ErrorCodes
{
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
}
IVolume::IVolume(
String name_, const Poco::Util::AbstractConfiguration & config, const String & config_prefix, DiskSelectorPtr disk_selector)
: name(std::move(name_))
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
for (const auto & disk : keys)
{
if (startsWith(disk, "disk"))
{
auto disk_name = config.getString(config_prefix + "." + disk);
disks.push_back(disk_selector->get(disk_name));
}
}
if (disks.empty())
throw Exception("Volume must contain at least one disk.", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
}
UInt64 IVolume::getMaxUnreservedFreeSpace() const
{
UInt64 res = 0;
for (const auto & disk : disks)
res = std::max(res, disk->getUnreservedSpace());
return res;
}
}

52
src/Disks/IVolume.h Normal file
View File

@ -0,0 +1,52 @@
#pragma once
#include <Disks/IDisk.h>
#include <Disks/DiskSelector.h>
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
{
/**
* Disks group by some (user) criteria. For example,
* - VolumeJBOD("slow_disks", [d1, d2], 100)
* - VolumeJBOD("fast_disks", [d3, d4], 200)
*
* Here VolumeJBOD is one of implementations of IVolume.
*
* Different of implementations of this interface implement different reserve behaviour
* VolumeJBOD reserves space on the next disk after the last used, other future implementations
* will reserve, for example, equal spaces on all disks.
*/
class IVolume : public Space
{
public:
IVolume(String name_, Disks disks_): disks(std::move(disks_)), name(std::move(name_))
{
}
IVolume(
String name_,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
DiskSelectorPtr disk_selector
);
virtual ReservationPtr reserve(UInt64 bytes) override = 0;
/// Volume name from config
const String & getName() const override { return name; }
/// Return biggest unreserved space across all disks
UInt64 getMaxUnreservedFreeSpace() const;
Disks disks;
protected:
const String name;
};
using VolumePtr = std::shared_ptr<IVolume>;
using Volumes = std::vector<VolumePtr>;
}

View File

@ -1,4 +1,4 @@
#include "DiskSpaceMonitor.h"
#include "StoragePolicy.h"
#include "DiskFactory.h"
#include "DiskLocal.h"
@ -23,204 +23,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
DiskSelector::DiskSelector(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const Context & context)
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
auto & factory = DiskFactory::instance();
constexpr auto default_disk_name = "default";
bool has_default_disk = false;
for (const auto & disk_name : keys)
{
if (!std::all_of(disk_name.begin(), disk_name.end(), isWordCharASCII))
throw Exception("Disk name can contain only alphanumeric and '_' (" + disk_name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
if (disk_name == default_disk_name)
has_default_disk = true;
auto disk_config_prefix = config_prefix + "." + disk_name;
disks.emplace(disk_name, factory.create(disk_name, config, disk_config_prefix, context));
}
if (!has_default_disk)
disks.emplace(default_disk_name, std::make_shared<DiskLocal>(default_disk_name, context.getPath(), 0));
}
DiskSelectorPtr DiskSelector::updateFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const Context & context) const
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
auto & factory = DiskFactory::instance();
std::shared_ptr<DiskSelector> result = std::make_shared<DiskSelector>(*this);
constexpr auto default_disk_name = "default";
std::set<String> old_disks_minus_new_disks;
for (const auto & [disk_name, _] : result->disks)
{
old_disks_minus_new_disks.insert(disk_name);
}
for (const auto & disk_name : keys)
{
if (!std::all_of(disk_name.begin(), disk_name.end(), isWordCharASCII))
throw Exception("Disk name can contain only alphanumeric and '_' (" + disk_name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
if (result->disks.count(disk_name) == 0)
{
auto disk_config_prefix = config_prefix + "." + disk_name;
result->disks.emplace(disk_name, factory.create(disk_name, config, disk_config_prefix, context));
}
else
{
old_disks_minus_new_disks.erase(disk_name);
/// TODO: Ideally ClickHouse shall complain if disk has changed, but
/// implementing that may appear as not trivial task.
}
}
old_disks_minus_new_disks.erase(default_disk_name);
if (!old_disks_minus_new_disks.empty())
{
WriteBufferFromOwnString warning;
if (old_disks_minus_new_disks.size() == 1)
writeString("Disk ", warning);
else
writeString("Disks ", warning);
int index = 0;
for (const String & name : old_disks_minus_new_disks)
{
if (index++ > 0)
writeString(", ", warning);
writeBackQuotedString(name, warning);
}
writeString(" disappeared from configuration, this change will be applied after restart of ClickHouse", warning);
LOG_WARNING(&Logger::get("DiskSelector"), warning.str());
}
return result;
}
DiskPtr DiskSelector::get(const String & name) const
{
auto it = disks.find(name);
if (it == disks.end())
throw Exception("Unknown disk " + name, ErrorCodes::UNKNOWN_DISK);
return it->second;
}
Volume::Volume(
String name_,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
DiskSelectorPtr disk_selector)
: name(std::move(name_))
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
Logger * logger = &Logger::get("StorageConfiguration");
for (const auto & disk : keys)
{
if (startsWith(disk, "disk"))
{
auto disk_name = config.getString(config_prefix + "." + disk);
disks.push_back(disk_selector->get(disk_name));
}
}
if (disks.empty())
throw Exception("Volume must contain at least one disk.", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
auto has_max_bytes = config.has(config_prefix + ".max_data_part_size_bytes");
auto has_max_ratio = config.has(config_prefix + ".max_data_part_size_ratio");
if (has_max_bytes && has_max_ratio)
throw Exception(
"Only one of 'max_data_part_size_bytes' and 'max_data_part_size_ratio' should be specified.",
ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
if (has_max_bytes)
{
max_data_part_size = config.getUInt64(config_prefix + ".max_data_part_size_bytes", 0);
}
else if (has_max_ratio)
{
auto ratio = config.getDouble(config_prefix + ".max_data_part_size_ratio");
if (ratio < 0)
throw Exception("'max_data_part_size_ratio' have to be not less then 0.", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
UInt64 sum_size = 0;
std::vector<UInt64> sizes;
for (const auto & disk : disks)
{
sizes.push_back(disk->getTotalSpace());
sum_size += sizes.back();
}
max_data_part_size = static_cast<decltype(max_data_part_size)>(sum_size * ratio / disks.size());
for (size_t i = 0; i < disks.size(); ++i)
if (sizes[i] < max_data_part_size)
LOG_WARNING(
logger,
"Disk " << backQuote(disks[i]->getName()) << " on volume " << backQuote(config_prefix) << " have not enough space ("
<< formatReadableSizeWithBinarySuffix(sizes[i]) << ") for containing part the size of max_data_part_size ("
<< formatReadableSizeWithBinarySuffix(max_data_part_size) << ")");
}
static constexpr UInt64 MIN_PART_SIZE = 8u * 1024u * 1024u;
if (max_data_part_size != 0 && max_data_part_size < MIN_PART_SIZE)
LOG_WARNING(
logger,
"Volume " << backQuote(name) << " max_data_part_size is too low (" << formatReadableSizeWithBinarySuffix(max_data_part_size)
<< " < " << formatReadableSizeWithBinarySuffix(MIN_PART_SIZE) << ")");
}
DiskPtr Volume::getNextDisk()
{
size_t start_from = last_used.fetch_add(1u, std::memory_order_relaxed);
size_t index = start_from % disks.size();
return disks[index];
}
ReservationPtr Volume::reserve(UInt64 bytes)
{
/// This volume can not store files which size greater than max_data_part_size
if (max_data_part_size != 0 && bytes > max_data_part_size)
return {};
size_t start_from = last_used.fetch_add(1u, std::memory_order_relaxed);
size_t disks_num = disks.size();
for (size_t i = 0; i < disks_num; ++i)
{
size_t index = (start_from + i) % disks_num;
auto reservation = disks[index]->reserve(bytes);
if (reservation)
return reservation;
}
return {};
}
UInt64 Volume::getMaxUnreservedFreeSpace() const
{
UInt64 res = 0;
for (const auto & disk : disks)
res = std::max(res, disk->getUnreservedSpace());
return res;
}
StoragePolicy::StoragePolicy(
String name_,
const Poco::Util::AbstractConfiguration & config,
@ -240,7 +42,7 @@ StoragePolicy::StoragePolicy(
if (!std::all_of(attr_name.begin(), attr_name.end(), isWordCharASCII))
throw Exception(
"Volume name can contain only alphanumeric and '_' (" + attr_name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
volumes.push_back(std::make_shared<Volume>(attr_name, config, volumes_prefix + "." + attr_name, disks));
volumes.push_back(std::make_shared<VolumeJBOD>(attr_name, config, volumes_prefix + "." + attr_name, disks));
if (volumes_names.find(attr_name) != volumes_names.end())
throw Exception("Volumes names must be unique (" + attr_name + " duplicated)", ErrorCodes::UNKNOWN_POLICY);
volumes_names[attr_name] = volumes.size() - 1;
@ -269,7 +71,7 @@ StoragePolicy::StoragePolicy(
}
StoragePolicy::StoragePolicy(String name_, Volumes volumes_, double move_factor_)
StoragePolicy::StoragePolicy(String name_, VolumesJBOD volumes_, double move_factor_)
: volumes(std::move(volumes_)), name(std::move(name_)), move_factor(move_factor_)
{
if (volumes.empty())
@ -453,9 +255,9 @@ StoragePolicySelector::StoragePolicySelector(
/// Add default policy if it's not specified explicetly
if (policies.find(default_storage_policy_name) == policies.end())
{
auto default_volume = std::make_shared<Volume>(default_volume_name, std::vector<DiskPtr>{disks->get(default_disk_name)}, 0);
auto default_volume = std::make_shared<VolumeJBOD>(default_volume_name, std::vector<DiskPtr>{disks->get(default_disk_name)}, 0);
auto default_policy = std::make_shared<StoragePolicy>(default_storage_policy_name, Volumes{default_volume}, 0.0);
auto default_policy = std::make_shared<StoragePolicy>(default_storage_policy_name, VolumesJBOD{default_volume}, 0.0);
policies.emplace(default_storage_policy_name, default_policy);
}
}

View File

@ -1,6 +1,9 @@
#pragma once
#include <Disks/DiskSelector.h>
#include <Disks/IDisk.h>
#include <Disks/IVolume.h>
#include <Disks/VolumeJBOD.h>
#include <IO/WriteHelpers.h>
#include <Common/CurrentMetrics.h>
#include <Common/Exception.h>
@ -17,82 +20,6 @@
namespace DB
{
class DiskSelector;
using DiskSelectorPtr = std::shared_ptr<const DiskSelector>;
/// Parse .xml configuration and store information about disks
/// Mostly used for introspection.
class DiskSelector
{
public:
DiskSelector(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const Context & context);
DiskSelector(const DiskSelector & from): disks(from.disks) {}
DiskSelectorPtr updateFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const Context & context) const;
/// Get disk by name
DiskPtr get(const String & name) const;
/// Get all disks with names
const auto & getDisksMap() const { return disks; }
private:
std::map<String, DiskPtr> disks;
};
/**
* Disks group by some (user) criteria. For example,
* - Volume("slow_disks", [d1, d2], 100)
* - Volume("fast_disks", [d3, d4], 200)
* Cannot store parts larger than max_data_part_size.
*/
class Volume : public Space
{
friend class StoragePolicy;
public:
Volume(String name_, std::vector<DiskPtr> disks_, UInt64 max_data_part_size_)
: max_data_part_size(max_data_part_size_), disks(std::move(disks_)), name(std::move(name_))
{
}
Volume(
String name_,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
DiskSelectorPtr disk_selector);
/// Next disk (round-robin)
///
/// - Used with policy for temporary data
/// - Ignores all limitations
/// - Shares last access with reserve()
DiskPtr getNextDisk();
/// Uses Round-robin to choose disk for reservation.
/// Returns valid reservation or nullptr if there is no space left on any disk.
ReservationPtr reserve(UInt64 bytes) override;
/// Return biggest unreserved space across all disks
UInt64 getMaxUnreservedFreeSpace() const;
/// Volume name from config
const String & getName() const override { return name; }
/// Max size of reservation
UInt64 max_data_part_size = 0;
/// Disks in volume
Disks disks;
private:
mutable std::atomic<size_t> last_used = 0;
const String name;
};
using VolumePtr = std::shared_ptr<Volume>;
using Volumes = std::vector<VolumePtr>;
class StoragePolicy;
using StoragePolicyPtr = std::shared_ptr<const StoragePolicy>;
@ -105,7 +32,7 @@ class StoragePolicy
public:
StoragePolicy(String name_, const Poco::Util::AbstractConfiguration & config, const String & config_prefix, DiskSelectorPtr disks);
StoragePolicy(String name_, Volumes volumes_, double move_factor_);
StoragePolicy(String name_, VolumesJBOD volumes_, double move_factor_);
bool isDefaultPolicy() const;
@ -137,16 +64,16 @@ public:
/// Do not use this function when it is possible to predict size.
ReservationPtr makeEmptyReservationOnLargestDisk() const;
const Volumes & getVolumes() const { return volumes; }
const VolumesJBOD & getVolumes() const { return volumes; }
/// Returns number [0., 1.] -- fraction of free space on disk
/// which should be kept with help of background moves
double getMoveFactor() const { return move_factor; }
/// Get volume by index from storage_policy
VolumePtr getVolume(size_t i) const { return (i < volumes_names.size() ? volumes[i] : VolumePtr()); }
VolumeJBODPtr getVolume(size_t i) const { return (i < volumes_names.size() ? volumes[i] : VolumeJBODPtr()); }
VolumePtr getVolumeByName(const String & volume_name) const
VolumeJBODPtr getVolumeByName(const String & volume_name) const
{
auto it = volumes_names.find(volume_name);
if (it == volumes_names.end())
@ -158,7 +85,7 @@ public:
void checkCompatibleWith(const StoragePolicyPtr & new_storage_policy) const;
private:
Volumes volumes;
VolumesJBOD volumes;
const String name;
std::map<String, size_t> volumes_names;

93
src/Disks/VolumeJBOD.cpp Normal file
View File

@ -0,0 +1,93 @@
#include "VolumeJBOD.h"
#include <Common/StringUtils/StringUtils.h>
#include <Common/formatReadable.h>
#include <Common/quoteString.h>
#include <common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
}
VolumeJBOD::VolumeJBOD(
String name_,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
DiskSelectorPtr disk_selector
) : IVolume(name_, config, config_prefix, disk_selector)
{
Logger * logger = &Logger::get("StorageConfiguration");
auto has_max_bytes = config.has(config_prefix + ".max_data_part_size_bytes");
auto has_max_ratio = config.has(config_prefix + ".max_data_part_size_ratio");
if (has_max_bytes && has_max_ratio)
throw Exception(
"Only one of 'max_data_part_size_bytes' and 'max_data_part_size_ratio' should be specified.",
ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
if (has_max_bytes)
{
max_data_part_size = config.getUInt64(config_prefix + ".max_data_part_size_bytes", 0);
}
else if (has_max_ratio)
{
auto ratio = config.getDouble(config_prefix + ".max_data_part_size_ratio");
if (ratio < 0)
throw Exception("'max_data_part_size_ratio' have to be not less then 0.", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
UInt64 sum_size = 0;
std::vector<UInt64> sizes;
for (const auto & disk : disks)
{
sizes.push_back(disk->getTotalSpace());
sum_size += sizes.back();
}
max_data_part_size = static_cast<decltype(max_data_part_size)>(sum_size * ratio / disks.size());
for (size_t i = 0; i < disks.size(); ++i)
if (sizes[i] < max_data_part_size)
LOG_WARNING(
logger,
"Disk " << backQuote(disks[i]->getName()) << " on volume " << backQuote(config_prefix) << " have not enough space ("
<< formatReadableSizeWithBinarySuffix(sizes[i]) << ") for containing part the size of max_data_part_size ("
<< formatReadableSizeWithBinarySuffix(max_data_part_size) << ")");
}
static constexpr UInt64 MIN_PART_SIZE = 8u * 1024u * 1024u;
if (max_data_part_size != 0 && max_data_part_size < MIN_PART_SIZE)
LOG_WARNING(
logger,
"Volume " << backQuote(name) << " max_data_part_size is too low (" << formatReadableSizeWithBinarySuffix(max_data_part_size)
<< " < " << formatReadableSizeWithBinarySuffix(MIN_PART_SIZE) << ")");
}
DiskPtr VolumeJBOD::getNextDisk()
{
size_t start_from = last_used.fetch_add(1u, std::memory_order_relaxed);
size_t index = start_from % disks.size();
return disks[index];
}
ReservationPtr VolumeJBOD::reserve(UInt64 bytes)
{
/// This volume can not store files which size greater than max_data_part_size
if (max_data_part_size != 0 && bytes > max_data_part_size)
return {};
size_t start_from = last_used.fetch_add(1u, std::memory_order_relaxed);
size_t disks_num = disks.size();
for (size_t i = 0; i < disks_num; ++i)
{
size_t index = (start_from + i) % disks_num;
auto reservation = disks[index]->reserve(bytes);
if (reservation)
return reservation;
}
return {};
}
}

48
src/Disks/VolumeJBOD.h Normal file
View File

@ -0,0 +1,48 @@
#pragma once
#include <Disks/IVolume.h>
namespace DB
{
/**
* Implements something similar to JBOD (https://en.wikipedia.org/wiki/Non-RAID_drive_architectures#JBOD).
* When MergeTree engine wants to write part it requests VolumeJBOD to reserve space on the next available
* disk and then writes new part to that disk.
*/
class VolumeJBOD : public IVolume
{
public:
VolumeJBOD(String name_, Disks disks_, UInt64 max_data_part_size_)
: IVolume(name_, disks_), max_data_part_size(max_data_part_size_)
{
}
VolumeJBOD(
String name_,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
DiskSelectorPtr disk_selector
);
/// Next disk (round-robin)
///
/// - Used with policy for temporary data
/// - Ignores all limitations
/// - Shares last access with reserve()
DiskPtr getNextDisk();
/// Uses Round-robin to choose disk for reservation.
/// Returns valid reservation or nullptr if there is no space left on any disk.
ReservationPtr reserve(UInt64 bytes) override;
/// Max size of reservation
UInt64 max_data_part_size = 0;
private:
mutable std::atomic<size_t> last_used = 0;
};
using VolumeJBODPtr = std::shared_ptr<VolumeJBOD>;
using VolumesJBOD = std::vector<VolumeJBODPtr>;
}

View File

@ -8,9 +8,12 @@ SRCS(
DiskFactory.cpp
DiskLocal.cpp
DiskMemory.cpp
DiskSpaceMonitor.cpp
DiskSelector.cpp
IDisk.cpp
IVolume.cpp
registerDisks.cpp
StoragePolicy.cpp
VolumeJBOD.cpp
)
END()

View File

@ -27,7 +27,7 @@
#include <common/demangle.h>
#include <AggregateFunctions/AggregateFunctionArray.h>
#include <AggregateFunctions/AggregateFunctionState.h>
#include <Disks/DiskSpaceMonitor.h>
#include <Disks/StoragePolicy.h>
#if !defined(ARCADIA_BUILD)
# include <common/config_common.h>

View File

@ -45,8 +45,8 @@ namespace ErrorCodes
class IBlockOutputStream;
class Volume;
using VolumePtr = std::shared_ptr<Volume>;
class VolumeJBOD;
using VolumeJBODPtr = std::shared_ptr<VolumeJBOD>;
/** Different data structures that can be used for aggregation
* For efficiency, the aggregation data itself is put into the pool.
@ -878,7 +878,7 @@ public:
/// Return empty result when aggregating without keys on empty set.
bool empty_result_for_aggregation_by_empty_set;
VolumePtr tmp_volume;
VolumeJBODPtr tmp_volume;
/// Settings is used to determine cache size. No threads are created.
size_t max_threads;
@ -891,7 +891,7 @@ public:
size_t group_by_two_level_threshold_, size_t group_by_two_level_threshold_bytes_,
size_t max_bytes_before_external_group_by_,
bool empty_result_for_aggregation_by_empty_set_,
VolumePtr tmp_volume_, size_t max_threads_,
VolumeJBODPtr tmp_volume_, size_t max_threads_,
size_t min_free_disk_space_)
: src_header(src_header_),
keys(keys_), aggregates(aggregates_), keys_size(keys.size()), aggregates_size(aggregates.size()),

View File

@ -311,7 +311,7 @@ struct ContextShared
ConfigurationPtr config; /// Global configuration settings.
String tmp_path; /// Path to the temporary files that occur when processing the request.
mutable VolumePtr tmp_volume; /// Volume for the the temporary files that occur when processing the request.
mutable VolumeJBODPtr tmp_volume; /// Volume for the the temporary files that occur when processing the request.
mutable std::optional<EmbeddedDictionaries> embedded_dictionaries; /// Metrica's dictionaries. Have lazy initialization.
mutable std::optional<ExternalDictionariesLoader> external_dictionaries_loader;
@ -538,7 +538,7 @@ String Context::getDictionariesLibPath() const
return shared->dictionaries_lib_path;
}
VolumePtr Context::getTemporaryVolume() const
VolumeJBODPtr Context::getTemporaryVolume() const
{
auto lock = getLock();
return shared->tmp_volume;
@ -563,7 +563,7 @@ void Context::setPath(const String & path)
shared->dictionaries_lib_path = shared->path + "dictionaries_lib/";
}
VolumePtr Context::setTemporaryStorage(const String & path, const String & policy_name)
VolumeJBODPtr Context::setTemporaryStorage(const String & path, const String & policy_name)
{
auto lock = getLock();
@ -574,7 +574,7 @@ VolumePtr Context::setTemporaryStorage(const String & path, const String & polic
shared->tmp_path += '/';
auto disk = std::make_shared<DiskLocal>("_tmp_default", shared->tmp_path, 0);
shared->tmp_volume = std::make_shared<Volume>("_tmp_default", std::vector<DiskPtr>{disk}, 0);
shared->tmp_volume = std::make_shared<VolumeJBOD>("_tmp_default", std::vector<DiskPtr>{disk}, 0);
}
else
{

View File

@ -102,8 +102,8 @@ using StoragePolicySelectorPtr = std::shared_ptr<const StoragePolicySelector>;
class IOutputFormat;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
class Volume;
using VolumePtr = std::shared_ptr<Volume>;
class VolumeJBOD;
using VolumeJBODPtr = std::shared_ptr<VolumeJBOD>;
struct NamedSession;
@ -221,14 +221,14 @@ public:
String getUserFilesPath() const;
String getDictionariesLibPath() const;
VolumePtr getTemporaryVolume() const;
VolumeJBODPtr getTemporaryVolume() const;
void setPath(const String & path);
void setFlagsPath(const String & path);
void setUserFilesPath(const String & path);
void setDictionariesLibPath(const String & path);
VolumePtr setTemporaryStorage(const String & path, const String & policy_name = "");
VolumeJBODPtr setTemporaryStorage(const String & path, const String & policy_name = "");
using ConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;

View File

@ -3,7 +3,7 @@
#include <DataStreams/MergingSortedBlockInputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/TemporaryFileStream.h>
#include <Disks/DiskSpaceMonitor.h>
#include <Disks/StoragePolicy.h>
namespace DB
{

View File

@ -16,8 +16,8 @@ class TableJoin;
class MergeJoinCursor;
struct MergeJoinEqualRange;
class Volume;
using VolumePtr = std::shared_ptr<Volume>;
class VolumeJBOD;
using VolumeJBODPtr = std::shared_ptr<VolumeJBOD>;
struct SortedBlocksWriter
{
@ -57,7 +57,7 @@ struct SortedBlocksWriter
std::mutex insert_mutex;
std::condition_variable flush_condvar;
const SizeLimits & size_limits;
VolumePtr volume;
VolumeJBODPtr volume;
const Block & sample_block;
const SortDescription & sort_description;
Blocks & inserted_blocks;
@ -70,7 +70,7 @@ struct SortedBlocksWriter
size_t flush_number = 0;
size_t flush_inflight = 0;
SortedBlocksWriter(const SizeLimits & size_limits_, VolumePtr volume_, const Block & sample_block_, const SortDescription & description,
SortedBlocksWriter(const SizeLimits & size_limits_, VolumeJBODPtr volume_, const Block & sample_block_, const SortDescription & description,
Blocks & blocks, size_t rows_in_block_, size_t num_files_to_merge_, const String & codec_)
: size_limits(size_limits_)
, volume(volume_)

View File

@ -13,7 +13,7 @@
namespace DB
{
TableJoin::TableJoin(const Settings & settings, VolumePtr tmp_volume_)
TableJoin::TableJoin(const Settings & settings, VolumeJBODPtr tmp_volume_)
: size_limits(SizeLimits{settings.max_rows_in_join, settings.max_bytes_in_join, settings.join_overflow_mode})
, default_max_bytes(settings.default_max_bytes_in_join)
, join_use_nulls(settings.join_use_nulls)

View File

@ -24,8 +24,8 @@ class DictionaryReader;
struct Settings;
class Volume;
using VolumePtr = std::shared_ptr<Volume>;
class VolumeJBOD;
using VolumeJBODPtr = std::shared_ptr<VolumeJBOD>;
class TableJoin
{
@ -70,11 +70,11 @@ class TableJoin
/// Original name -> name. Only ranamed columns.
std::unordered_map<String, String> renames;
VolumePtr tmp_volume;
VolumeJBODPtr tmp_volume;
public:
TableJoin() = default;
TableJoin(const Settings &, VolumePtr tmp_volume);
TableJoin(const Settings &, VolumeJBODPtr tmp_volume);
/// for StorageJoin
TableJoin(SizeLimits limits, bool use_nulls, ASTTableJoin::Kind kind, ASTTableJoin::Strictness strictness,
@ -96,7 +96,7 @@ public:
ASTTableJoin::Strictness strictness() const { return table_join.strictness; }
bool sameStrictnessAndKind(ASTTableJoin::Strictness, ASTTableJoin::Kind) const;
const SizeLimits & sizeLimits() const { return size_limits; }
VolumePtr getTemporaryVolume() { return tmp_volume; }
VolumeJBODPtr getTemporaryVolume() { return tmp_volume; }
bool allowMergeJoin() const;
bool allowDictJoin(const String & dict_key, const Block & sample_block, Names &, NamesAndTypesList &) const;
bool preferMergeJoin() const { return join_algorithm == JoinAlgorithm::PREFER_PARTIAL_MERGE; }

View File

@ -8,7 +8,7 @@
#include <Compression/CompressedWriteBuffer.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <Disks/DiskSpaceMonitor.h>
#include <Disks/StoragePolicy.h>
namespace ProfileEvents

View File

@ -9,8 +9,8 @@
namespace DB
{
class Volume;
using VolumePtr = std::shared_ptr<Volume>;
class IVolume;
using VolumePtr = std::shared_ptr<IVolume>;
class MergeSortingTransform : public SortingTransform
{

View File

@ -27,7 +27,7 @@
#include <Processors/Transforms/MergingAggregatedTransform.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
#include <Disks/DiskSpaceMonitor.h>
#include <Disks/StoragePolicy.h>
#include <Disks/DiskLocal.h>
#include <Poco/ConsoleChannel.h>
#include <Poco/AutoPtr.h>
@ -194,7 +194,7 @@ try
auto cur_path = Poco::Path().absolute().toString();
auto disk = std::make_shared<DiskLocal>("tmp", cur_path, 0);
auto tmp_volume = std::make_shared<Volume>("tmp", std::vector<DiskPtr>{disk}, 0);
auto tmp_volume = std::make_shared<VolumeJBOD>("tmp", std::vector<DiskPtr>{disk}, 0);
auto execute_one_stream = [&](String msg, size_t num_threads, bool two_level, bool external)
{

View File

@ -1,7 +1,7 @@
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypesNumber.h>
#include <Disks/DiskSpaceMonitor.h>
#include <Disks/StoragePolicy.h>
#include <Disks/DiskLocal.h>
#include <Processors/IProcessor.h>
@ -129,7 +129,7 @@ try
Logger::root().setLevel("trace");
auto disk = std::make_shared<DiskLocal>("tmp", ".", 0);
auto tmp_volume = std::make_shared<Volume>("tmp", std::vector<DiskPtr>{disk}, 0);
auto tmp_volume = std::make_shared<VolumeJBOD>("tmp", std::vector<DiskPtr>{disk}, 0);
auto execute_chain = [tmp_volume](
String msg,

View File

@ -1,7 +1,7 @@
#include <Storages/Distributed/DistributedBlockOutputStream.h>
#include <Storages/Distributed/DirectoryMonitor.h>
#include <Storages/StorageDistributed.h>
#include <Disks/DiskSpaceMonitor.h>
#include <Disks/StoragePolicy.h>
#include <Parsers/formatAST.h>
#include <Parsers/queryToString.h>

View File

@ -19,7 +19,7 @@
#include <Storages/IndicesDescription.h>
#include <Storages/MergeTree/MergeTreePartsMover.h>
#include <Interpreters/PartLog.h>
#include <Disks/DiskSpaceMonitor.h>
#include <Disks/StoragePolicy.h>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>

View File

@ -3,7 +3,7 @@
#include <Storages/MergeTree/MergeTreeSequentialSource.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/MergedColumnOnlyOutputStream.h>
#include <Disks/DiskSpaceMonitor.h>
#include <Disks/StoragePolicy.h>
#include <Storages/MergeTree/SimpleMergeSelector.h>
#include <Storages/MergeTree/AllMergeSelector.h>
#include <Storages/MergeTree/TTLMergeSelector.h>

View File

@ -3,7 +3,7 @@
#include <functional>
#include <optional>
#include <vector>
#include <Disks/DiskSpaceMonitor.h>
#include <Disks/StoragePolicy.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Common/ActionBlocker.h>

View File

@ -3,7 +3,7 @@
#include <DataStreams/OneBlockInputStream.h>
#include <Databases/IDatabase.h>
#include <Disks/DiskSpaceMonitor.h>
#include <Disks/StoragePolicy.h>
#include <Disks/DiskLocal.h>
#include <DataTypes/DataTypeFactory.h>
@ -330,7 +330,7 @@ void StorageDistributed::createStorage()
if (!path.ends_with('/'))
path += '/';
auto disk = std::make_shared<DiskLocal>("default", path, 0);
volume = std::make_shared<Volume>("default", std::vector<DiskPtr>{disk}, 0);
volume = std::make_shared<VolumeJBOD>("default", std::vector<DiskPtr>{disk}, 0);
}
else
{

View File

@ -19,8 +19,8 @@ namespace DB
class Context;
class StorageDistributedDirectoryMonitor;
class Volume;
using VolumePtr = std::shared_ptr<Volume>;
class VolumeJBOD;
using VolumeJBODPtr = std::shared_ptr<VolumeJBOD>;
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
@ -176,7 +176,7 @@ protected:
String storage_policy;
String relative_data_path;
/// Can be empty if relative_data_path is empty. In this case, a directory for the data to be sent is not created.
VolumePtr volume;
VolumeJBODPtr volume;
struct ClusterNodeData
{

View File

@ -18,7 +18,7 @@
#include <Storages/AlterCommands.h>
#include <Storages/PartitionCommands.h>
#include <Storages/MergeTree/MergeTreeBlockOutputStream.h>
#include <Disks/DiskSpaceMonitor.h>
#include <Disks/StoragePolicy.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/checkDataPart.h>
#include <optional>

View File

@ -12,7 +12,7 @@
#include <Storages/MergeTree/MergeTreePartsMover.h>
#include <Storages/MergeTree/MergeTreeMutationEntry.h>
#include <Storages/MergeTree/MergeTreeMutationStatus.h>
#include <Disks/DiskSpaceMonitor.h>
#include <Disks/StoragePolicy.h>
#include <Storages/MergeTree/BackgroundProcessingPool.h>
#include <Common/SimpleIncrement.h>
#include <Core/BackgroundSchedulePool.h>

View File

@ -26,7 +26,7 @@
#include <Storages/MergeTree/ReplicatedMergeTreePartHeader.h>
#include <Storages/VirtualColumnUtils.h>
#include <Disks/DiskSpaceMonitor.h>
#include <Disks/StoragePolicy.h>
#include <Databases/IDatabase.h>

View File

@ -15,7 +15,7 @@
#include <Common/StringUtils/StringUtils.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <Disks/DiskSpaceMonitor.h>
#include <Disks/StoragePolicy.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Pipe.h>
#include <DataTypes/DataTypeUUID.h>