Add IStoragePolicy interface

This commit is contained in:
kreuzerkrieg 2021-01-23 17:20:15 +02:00
parent 27ddf78ba5
commit 29a2ef3089
14 changed files with 109 additions and 41 deletions

View File

@ -0,0 +1,62 @@
#pragma once
#include <memory>
#include <vector>
#include <common/types.h>
namespace DB
{
class IStoragePolicy;
using StoragePolicyPtr = std::shared_ptr<const IStoragePolicy>;
class IVolume;
using VolumePtr = std::shared_ptr<IVolume>;
using Volumes = std::vector<VolumePtr>;
class IDisk;
using DiskPtr = std::shared_ptr<IDisk>;
using Disks = std::vector<DiskPtr>;
class IReservation;
using ReservationPtr = std::unique_ptr<IReservation>;
using Reservations = std::vector<ReservationPtr>;
using String = std::string;
class IStoragePolicy
{
public:
virtual ~IStoragePolicy() = default;
virtual const String & getName() const = 0;
virtual const Volumes & getVolumes() const = 0;
/// Returns number [0., 1.] -- fraction of free space on disk
/// which should be kept with help of background moves
virtual double getMoveFactor() const = 0;
virtual bool isDefaultPolicy() const = 0;
/// Returns disks ordered by volumes priority
virtual Disks getDisks() const = 0;
/// Returns any disk
/// Used when it's not important, for example for
/// mutations files
virtual DiskPtr getAnyDisk() const = 0;
virtual DiskPtr getDiskByName(const String & disk_name) const = 0;
/// Get free space from most free disk
virtual UInt64 getMaxUnreservedFreeSpace() const = 0;
/// Reserves space on any volume with index > min_volume_index or returns nullptr
virtual ReservationPtr reserve(UInt64 bytes, size_t min_volume_index) const = 0;
/// Returns valid reservation or nullptr
virtual ReservationPtr reserve(UInt64 bytes) const = 0;
/// Reserves space on any volume or throws
virtual ReservationPtr reserveAndCheck(UInt64 bytes) const = 0;
/// Reserves 0 bytes on disk with max available space
/// Do not use this function when it is possible to predict size.
virtual ReservationPtr makeEmptyReservationOnLargestDisk() const = 0;
/// Get volume by index.
virtual VolumePtr getVolume(size_t index) const = 0;
virtual VolumePtr getVolumeByName(const String & volume_name) const = 0;
/// Checks if storage policy can be replaced by another one.
virtual void checkCompatibleWith(const StoragePolicyPtr & new_storage_policy) const = 0;
/// Find volume index, which contains disk
virtual size_t getVolumeIndexByDisk(const DiskPtr & disk_ptr) const = 0;
/// Check if we have any volume with stopped merges
virtual bool hasAnyVolumeWithDisabledMerges() const = 0;
virtual bool containsVolume(const String & volume_name) const = 0;
};
}

View File

@ -93,17 +93,17 @@ StoragePolicy::StoragePolicy(String name_, Volumes volumes_, double move_factor_
}
StoragePolicy::StoragePolicy(const StoragePolicy & storage_policy,
StoragePolicy::StoragePolicy(StoragePolicyPtr storage_policy,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
DiskSelectorPtr disks)
: StoragePolicy(storage_policy.getName(), config, config_prefix, disks)
: StoragePolicy(storage_policy->getName(), config, config_prefix, disks)
{
for (auto & volume : volumes)
{
if (storage_policy.volume_index_by_volume_name.count(volume->getName()) > 0)
if (storage_policy->containsVolume(volume->getName()))
{
auto old_volume = storage_policy.getVolumeByName(volume->getName());
auto old_volume = storage_policy->getVolumeByName(volume->getName());
try
{
auto new_volume = updateVolumeFromConfig(old_volume, config, config_prefix + ".volumes." + volume->getName(), disks);
@ -112,7 +112,7 @@ StoragePolicy::StoragePolicy(const StoragePolicy & storage_policy,
catch (Exception & e)
{
/// Default policies are allowed to be missed in configuration.
if (e.code() != ErrorCodes::NO_ELEMENTS_IN_CONFIG || storage_policy.getName() != DEFAULT_STORAGE_POLICY_NAME)
if (e.code() != ErrorCodes::NO_ELEMENTS_IN_CONFIG || storage_policy->getName() != DEFAULT_STORAGE_POLICY_NAME)
throw;
Poco::Util::AbstractConfiguration::Keys keys;
@ -331,6 +331,11 @@ bool StoragePolicy::hasAnyVolumeWithDisabledMerges() const
return false;
}
bool StoragePolicy::containsVolume(const String & volume_name) const
{
return volume_index_by_volume_name.contains(volume_name);
}
StoragePolicySelector::StoragePolicySelector(
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
@ -345,6 +350,13 @@ StoragePolicySelector::StoragePolicySelector(
throw Exception(
"Storage policy name can contain only alphanumeric and '_' (" + backQuote(name) + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
/*
* A customization point for StoragePolicy, here one can add his own policy, for example, based on policy's name
* if (name == "MyCustomPolicy")
* policies.emplace(name, std::make_shared<CustomPolicy>(name, config, config_prefix + "." + name, disks));
* else
*/
policies.emplace(name, std::make_shared<StoragePolicy>(name, config, config_prefix + "." + name, disks));
LOG_INFO(&Poco::Logger::get("StoragePolicySelector"), "Storage policy {} loaded", backQuote(name));
}
@ -374,7 +386,7 @@ StoragePolicySelectorPtr StoragePolicySelector::updateFromConfig(const Poco::Uti
/// Second pass, load.
for (const auto & [name, policy] : policies)
{
result->policies[name] = std::make_shared<StoragePolicy>(*policy, config, config_prefix + "." + name, disks);
result->policies[name] = std::make_shared<StoragePolicy>(policy, config, config_prefix + "." + name, disks);
}
return result;

View File

@ -1,5 +1,6 @@
#pragma once
#include <Disks/IStoragePolicy.h>
#include <Disks/DiskSelector.h>
#include <Disks/IDisk.h>
#include <Disks/IVolume.h>
@ -23,14 +24,11 @@
namespace DB
{
class StoragePolicy;
using StoragePolicyPtr = std::shared_ptr<const StoragePolicy>;
/**
* Contains all information about volumes configuration for Storage.
* Can determine appropriate Volume and Disk for each reservation.
*/
class StoragePolicy
class StoragePolicy : public IStoragePolicy
{
public:
StoragePolicy(String name_, const Poco::Util::AbstractConfiguration & config, const String & config_prefix, DiskSelectorPtr disks);
@ -38,62 +36,63 @@ public:
StoragePolicy(String name_, Volumes volumes_, double move_factor_);
StoragePolicy(
const StoragePolicy & storage_policy,
StoragePolicyPtr storage_policy,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
DiskSelectorPtr disks
);
bool isDefaultPolicy() const;
bool isDefaultPolicy() const override;
/// Returns disks ordered by volumes priority
Disks getDisks() const;
Disks getDisks() const override;
/// Returns any disk
/// Used when it's not important, for example for
/// mutations files
DiskPtr getAnyDisk() const;
DiskPtr getAnyDisk() const override;
DiskPtr getDiskByName(const String & disk_name) const;
DiskPtr getDiskByName(const String & disk_name) const override;
/// Get free space from most free disk
UInt64 getMaxUnreservedFreeSpace() const;
UInt64 getMaxUnreservedFreeSpace() const override;
const String & getName() const { return name; }
const String & getName() const override{ return name; }
/// Returns valid reservation or nullptr
ReservationPtr reserve(UInt64 bytes) const;
ReservationPtr reserve(UInt64 bytes) const override;
/// Reserves space on any volume or throws
ReservationPtr reserveAndCheck(UInt64 bytes) const;
ReservationPtr reserveAndCheck(UInt64 bytes) const override;
/// Reserves space on any volume with index > min_volume_index or returns nullptr
ReservationPtr reserve(UInt64 bytes, size_t min_volume_index) const;
ReservationPtr reserve(UInt64 bytes, size_t min_volume_index) const override;
/// Find volume index, which contains disk
size_t getVolumeIndexByDisk(const DiskPtr & disk_ptr) const;
size_t getVolumeIndexByDisk(const DiskPtr & disk_ptr) const override;
/// Reserves 0 bytes on disk with max available space
/// Do not use this function when it is possible to predict size.
ReservationPtr makeEmptyReservationOnLargestDisk() const;
ReservationPtr makeEmptyReservationOnLargestDisk() const override;
const Volumes & getVolumes() const { return volumes; }
const Volumes & getVolumes() const override{ return volumes; }
/// Returns number [0., 1.] -- fraction of free space on disk
/// which should be kept with help of background moves
double getMoveFactor() const { return move_factor; }
double getMoveFactor() const override{ return move_factor; }
/// Get volume by index.
VolumePtr getVolume(size_t index) const;
VolumePtr getVolume(size_t index) const override;
VolumePtr getVolumeByName(const String & volume_name) const;
VolumePtr getVolumeByName(const String & volume_name) const override;
/// Checks if storage policy can be replaced by another one.
void checkCompatibleWith(const StoragePolicyPtr & new_storage_policy) const;
void checkCompatibleWith(const StoragePolicyPtr & new_storage_policy) const override;
/// Check if we have any volume with stopped merges
bool hasAnyVolumeWithDisabledMerges() const;
bool hasAnyVolumeWithDisabledMerges() const override;
bool containsVolume(const String & volume_name) const override;
private:
Volumes volumes;
const String name;

View File

@ -19,7 +19,6 @@
#include <Common/assert_cast.h>
#include <AggregateFunctions/AggregateFunctionArray.h>
#include <AggregateFunctions/AggregateFunctionState.h>
#include <Disks/StoragePolicy.h>
#include <IO/Operators.h>

View File

@ -102,8 +102,8 @@ using DiskPtr = std::shared_ptr<IDisk>;
class DiskSelector;
using DiskSelectorPtr = std::shared_ptr<const DiskSelector>;
using DisksMap = std::map<String, DiskPtr>;
class StoragePolicy;
using StoragePolicyPtr = std::shared_ptr<const StoragePolicy>;
class IStoragePolicy;
using StoragePolicyPtr = std::shared_ptr<const IStoragePolicy>;
using StoragePoliciesMap = std::map<String, StoragePolicyPtr>;
class StoragePolicySelector;
using StoragePolicySelectorPtr = std::shared_ptr<const StoragePolicySelector>;

View File

@ -3,7 +3,7 @@
#include <DataStreams/MergingSortedBlockInputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/TemporaryFileStream.h>
#include <Disks/StoragePolicy.h>
#include <Disks/IVolume.h>
namespace DB
{

View File

@ -8,7 +8,7 @@
#include <Compression/CompressedWriteBuffer.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <Disks/StoragePolicy.h>
#include <Disks/IVolume.h>
namespace ProfileEvents

View File

@ -19,7 +19,7 @@
#include <common/getFQDNOrHostName.h>
#include <Common/setThreadName.h>
#include <Common/SettingsChanges.h>
#include <Disks/StoragePolicy.h>
#include <Disks/IVolume.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/ReadBufferFromIStream.h>

View File

@ -50,8 +50,8 @@ class Pipe;
class QueryPlan;
using QueryPlanPtr = std::unique_ptr<QueryPlan>;
class StoragePolicy;
using StoragePolicyPtr = std::shared_ptr<const StoragePolicy>;
class IStoragePolicy;
using StoragePolicyPtr = std::shared_ptr<const IStoragePolicy>;
struct StreamLocalLimits;
class EnabledQuota;

View File

@ -3,7 +3,6 @@
#include <Storages/MergeTree/MergeTreeSequentialSource.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/MergedColumnOnlyOutputStream.h>
#include <Disks/StoragePolicy.h>
#include <Storages/MergeTree/SimpleMergeSelector.h>
#include <Storages/MergeTree/AllMergeSelector.h>
#include <Storages/MergeTree/TTLMergeSelector.h>

View File

@ -1,7 +1,6 @@
#include <Storages/StorageDistributed.h>
#include <Databases/IDatabase.h>
#include <Disks/StoragePolicy.h>
#include <Disks/IDisk.h>
#include <DataTypes/DataTypeFactory.h>

View File

@ -22,7 +22,6 @@
#include <Storages/MergeTree/MergeTreeBlockOutputStream.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/MergeTree/PartitionPruner.h>
#include <Disks/StoragePolicy.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/checkDataPart.h>
#include <Processors/Pipe.h>

View File

@ -27,7 +27,6 @@
#include <Storages/MergeTree/ReplicatedMergeTreePartHeader.h>
#include <Storages/VirtualColumnUtils.h>
#include <Disks/StoragePolicy.h>
#include <Databases/IDatabase.h>

View File

@ -15,7 +15,7 @@
#include <Common/StringUtils/StringUtils.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <Disks/StoragePolicy.h>
#include <Disks/IStoragePolicy.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Pipe.h>
#include <DataTypes/DataTypeUUID.h>