Merge pull request #19608 from kreuzerkrieg/Add_IStoragePolicy_interface

Add IStoragePolicy interface
This commit is contained in:
Maksim Kita 2021-02-02 11:03:20 +03:00 committed by GitHub
commit d0151de4bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 109 additions and 41 deletions

View File

@ -0,0 +1,62 @@
#pragma once
#include <memory>
#include <vector>
#include <common/types.h>
namespace DB
{
class IStoragePolicy;
using StoragePolicyPtr = std::shared_ptr<const IStoragePolicy>;
class IVolume;
using VolumePtr = std::shared_ptr<IVolume>;
using Volumes = std::vector<VolumePtr>;
class IDisk;
using DiskPtr = std::shared_ptr<IDisk>;
using Disks = std::vector<DiskPtr>;
class IReservation;
using ReservationPtr = std::unique_ptr<IReservation>;
using Reservations = std::vector<ReservationPtr>;
using String = std::string;
class IStoragePolicy
{
public:
virtual ~IStoragePolicy() = default;
virtual const String & getName() const = 0;
virtual const Volumes & getVolumes() const = 0;
/// Returns number [0., 1.] -- fraction of free space on disk
/// which should be kept with help of background moves
virtual double getMoveFactor() const = 0;
virtual bool isDefaultPolicy() const = 0;
/// Returns disks ordered by volumes priority
virtual Disks getDisks() const = 0;
/// Returns any disk
/// Used when it's not important, for example for
/// mutations files
virtual DiskPtr getAnyDisk() const = 0;
virtual DiskPtr getDiskByName(const String & disk_name) const = 0;
/// Get free space from most free disk
virtual UInt64 getMaxUnreservedFreeSpace() const = 0;
/// Reserves space on any volume with index > min_volume_index or returns nullptr
virtual ReservationPtr reserve(UInt64 bytes, size_t min_volume_index) const = 0;
/// Returns valid reservation or nullptr
virtual ReservationPtr reserve(UInt64 bytes) const = 0;
/// Reserves space on any volume or throws
virtual ReservationPtr reserveAndCheck(UInt64 bytes) const = 0;
/// Reserves 0 bytes on disk with max available space
/// Do not use this function when it is possible to predict size.
virtual ReservationPtr makeEmptyReservationOnLargestDisk() const = 0;
/// Get volume by index.
virtual VolumePtr getVolume(size_t index) const = 0;
virtual VolumePtr getVolumeByName(const String & volume_name) const = 0;
/// Checks if storage policy can be replaced by another one.
virtual void checkCompatibleWith(const StoragePolicyPtr & new_storage_policy) const = 0;
/// Find volume index, which contains disk
virtual size_t getVolumeIndexByDisk(const DiskPtr & disk_ptr) const = 0;
/// Check if we have any volume with stopped merges
virtual bool hasAnyVolumeWithDisabledMerges() const = 0;
virtual bool containsVolume(const String & volume_name) const = 0;
};
}

View File

@ -93,17 +93,17 @@ StoragePolicy::StoragePolicy(String name_, Volumes volumes_, double move_factor_
} }
StoragePolicy::StoragePolicy(const StoragePolicy & storage_policy, StoragePolicy::StoragePolicy(StoragePolicyPtr storage_policy,
const Poco::Util::AbstractConfiguration & config, const Poco::Util::AbstractConfiguration & config,
const String & config_prefix, const String & config_prefix,
DiskSelectorPtr disks) DiskSelectorPtr disks)
: StoragePolicy(storage_policy.getName(), config, config_prefix, disks) : StoragePolicy(storage_policy->getName(), config, config_prefix, disks)
{ {
for (auto & volume : volumes) for (auto & volume : volumes)
{ {
if (storage_policy.volume_index_by_volume_name.count(volume->getName()) > 0) if (storage_policy->containsVolume(volume->getName()))
{ {
auto old_volume = storage_policy.getVolumeByName(volume->getName()); auto old_volume = storage_policy->getVolumeByName(volume->getName());
try try
{ {
auto new_volume = updateVolumeFromConfig(old_volume, config, config_prefix + ".volumes." + volume->getName(), disks); auto new_volume = updateVolumeFromConfig(old_volume, config, config_prefix + ".volumes." + volume->getName(), disks);
@ -112,7 +112,7 @@ StoragePolicy::StoragePolicy(const StoragePolicy & storage_policy,
catch (Exception & e) catch (Exception & e)
{ {
/// Default policies are allowed to be missed in configuration. /// Default policies are allowed to be missed in configuration.
if (e.code() != ErrorCodes::NO_ELEMENTS_IN_CONFIG || storage_policy.getName() != DEFAULT_STORAGE_POLICY_NAME) if (e.code() != ErrorCodes::NO_ELEMENTS_IN_CONFIG || storage_policy->getName() != DEFAULT_STORAGE_POLICY_NAME)
throw; throw;
Poco::Util::AbstractConfiguration::Keys keys; Poco::Util::AbstractConfiguration::Keys keys;
@ -331,6 +331,11 @@ bool StoragePolicy::hasAnyVolumeWithDisabledMerges() const
return false; return false;
} }
bool StoragePolicy::containsVolume(const String & volume_name) const
{
return volume_index_by_volume_name.contains(volume_name);
}
StoragePolicySelector::StoragePolicySelector( StoragePolicySelector::StoragePolicySelector(
const Poco::Util::AbstractConfiguration & config, const Poco::Util::AbstractConfiguration & config,
const String & config_prefix, const String & config_prefix,
@ -345,6 +350,13 @@ StoragePolicySelector::StoragePolicySelector(
throw Exception( throw Exception(
"Storage policy name can contain only alphanumeric and '_' (" + backQuote(name) + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); "Storage policy name can contain only alphanumeric and '_' (" + backQuote(name) + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
/*
* A customization point for StoragePolicy, here one can add his own policy, for example, based on policy's name
* if (name == "MyCustomPolicy")
* policies.emplace(name, std::make_shared<CustomPolicy>(name, config, config_prefix + "." + name, disks));
* else
*/
policies.emplace(name, std::make_shared<StoragePolicy>(name, config, config_prefix + "." + name, disks)); policies.emplace(name, std::make_shared<StoragePolicy>(name, config, config_prefix + "." + name, disks));
LOG_INFO(&Poco::Logger::get("StoragePolicySelector"), "Storage policy {} loaded", backQuote(name)); LOG_INFO(&Poco::Logger::get("StoragePolicySelector"), "Storage policy {} loaded", backQuote(name));
} }
@ -374,7 +386,7 @@ StoragePolicySelectorPtr StoragePolicySelector::updateFromConfig(const Poco::Uti
/// Second pass, load. /// Second pass, load.
for (const auto & [name, policy] : policies) for (const auto & [name, policy] : policies)
{ {
result->policies[name] = std::make_shared<StoragePolicy>(*policy, config, config_prefix + "." + name, disks); result->policies[name] = std::make_shared<StoragePolicy>(policy, config, config_prefix + "." + name, disks);
} }
return result; return result;

View File

@ -1,5 +1,6 @@
#pragma once #pragma once
#include <Disks/IStoragePolicy.h>
#include <Disks/DiskSelector.h> #include <Disks/DiskSelector.h>
#include <Disks/IDisk.h> #include <Disks/IDisk.h>
#include <Disks/IVolume.h> #include <Disks/IVolume.h>
@ -23,14 +24,11 @@
namespace DB namespace DB
{ {
class StoragePolicy;
using StoragePolicyPtr = std::shared_ptr<const StoragePolicy>;
/** /**
* Contains all information about volumes configuration for Storage. * Contains all information about volumes configuration for Storage.
* Can determine appropriate Volume and Disk for each reservation. * Can determine appropriate Volume and Disk for each reservation.
*/ */
class StoragePolicy class StoragePolicy : public IStoragePolicy
{ {
public: public:
StoragePolicy(String name_, const Poco::Util::AbstractConfiguration & config, const String & config_prefix, DiskSelectorPtr disks); StoragePolicy(String name_, const Poco::Util::AbstractConfiguration & config, const String & config_prefix, DiskSelectorPtr disks);
@ -38,62 +36,63 @@ public:
StoragePolicy(String name_, Volumes volumes_, double move_factor_); StoragePolicy(String name_, Volumes volumes_, double move_factor_);
StoragePolicy( StoragePolicy(
const StoragePolicy & storage_policy, StoragePolicyPtr storage_policy,
const Poco::Util::AbstractConfiguration & config, const Poco::Util::AbstractConfiguration & config,
const String & config_prefix, const String & config_prefix,
DiskSelectorPtr disks DiskSelectorPtr disks
); );
bool isDefaultPolicy() const; bool isDefaultPolicy() const override;
/// Returns disks ordered by volumes priority /// Returns disks ordered by volumes priority
Disks getDisks() const; Disks getDisks() const override;
/// Returns any disk /// Returns any disk
/// Used when it's not important, for example for /// Used when it's not important, for example for
/// mutations files /// mutations files
DiskPtr getAnyDisk() const; DiskPtr getAnyDisk() const override;
DiskPtr getDiskByName(const String & disk_name) const; DiskPtr getDiskByName(const String & disk_name) const override;
/// Get free space from most free disk /// Get free space from most free disk
UInt64 getMaxUnreservedFreeSpace() const; UInt64 getMaxUnreservedFreeSpace() const override;
const String & getName() const { return name; } const String & getName() const override{ return name; }
/// Returns valid reservation or nullptr /// Returns valid reservation or nullptr
ReservationPtr reserve(UInt64 bytes) const; ReservationPtr reserve(UInt64 bytes) const override;
/// Reserves space on any volume or throws /// Reserves space on any volume or throws
ReservationPtr reserveAndCheck(UInt64 bytes) const; ReservationPtr reserveAndCheck(UInt64 bytes) const override;
/// Reserves space on any volume with index > min_volume_index or returns nullptr /// Reserves space on any volume with index > min_volume_index or returns nullptr
ReservationPtr reserve(UInt64 bytes, size_t min_volume_index) const; ReservationPtr reserve(UInt64 bytes, size_t min_volume_index) const override;
/// Find volume index, which contains disk /// Find volume index, which contains disk
size_t getVolumeIndexByDisk(const DiskPtr & disk_ptr) const; size_t getVolumeIndexByDisk(const DiskPtr & disk_ptr) const override;
/// Reserves 0 bytes on disk with max available space /// Reserves 0 bytes on disk with max available space
/// Do not use this function when it is possible to predict size. /// Do not use this function when it is possible to predict size.
ReservationPtr makeEmptyReservationOnLargestDisk() const; ReservationPtr makeEmptyReservationOnLargestDisk() const override;
const Volumes & getVolumes() const { return volumes; } const Volumes & getVolumes() const override{ return volumes; }
/// Returns number [0., 1.] -- fraction of free space on disk /// Returns number [0., 1.] -- fraction of free space on disk
/// which should be kept with help of background moves /// which should be kept with help of background moves
double getMoveFactor() const { return move_factor; } double getMoveFactor() const override{ return move_factor; }
/// Get volume by index. /// Get volume by index.
VolumePtr getVolume(size_t index) const; VolumePtr getVolume(size_t index) const override;
VolumePtr getVolumeByName(const String & volume_name) const; VolumePtr getVolumeByName(const String & volume_name) const override;
/// Checks if storage policy can be replaced by another one. /// Checks if storage policy can be replaced by another one.
void checkCompatibleWith(const StoragePolicyPtr & new_storage_policy) const; void checkCompatibleWith(const StoragePolicyPtr & new_storage_policy) const override;
/// Check if we have any volume with stopped merges /// Check if we have any volume with stopped merges
bool hasAnyVolumeWithDisabledMerges() const; bool hasAnyVolumeWithDisabledMerges() const override;
bool containsVolume(const String & volume_name) const override;
private: private:
Volumes volumes; Volumes volumes;
const String name; const String name;

View File

@ -19,7 +19,6 @@
#include <Common/assert_cast.h> #include <Common/assert_cast.h>
#include <AggregateFunctions/AggregateFunctionArray.h> #include <AggregateFunctions/AggregateFunctionArray.h>
#include <AggregateFunctions/AggregateFunctionState.h> #include <AggregateFunctions/AggregateFunctionState.h>
#include <Disks/StoragePolicy.h>
#include <IO/Operators.h> #include <IO/Operators.h>

View File

@ -102,8 +102,8 @@ using DiskPtr = std::shared_ptr<IDisk>;
class DiskSelector; class DiskSelector;
using DiskSelectorPtr = std::shared_ptr<const DiskSelector>; using DiskSelectorPtr = std::shared_ptr<const DiskSelector>;
using DisksMap = std::map<String, DiskPtr>; using DisksMap = std::map<String, DiskPtr>;
class StoragePolicy; class IStoragePolicy;
using StoragePolicyPtr = std::shared_ptr<const StoragePolicy>; using StoragePolicyPtr = std::shared_ptr<const IStoragePolicy>;
using StoragePoliciesMap = std::map<String, StoragePolicyPtr>; using StoragePoliciesMap = std::map<String, StoragePolicyPtr>;
class StoragePolicySelector; class StoragePolicySelector;
using StoragePolicySelectorPtr = std::shared_ptr<const StoragePolicySelector>; using StoragePolicySelectorPtr = std::shared_ptr<const StoragePolicySelector>;

View File

@ -4,7 +4,7 @@
#include <DataStreams/OneBlockInputStream.h> #include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/TemporaryFileStream.h> #include <DataStreams/TemporaryFileStream.h>
#include <DataStreams/materializeBlock.h> #include <DataStreams/materializeBlock.h>
#include <Disks/StoragePolicy.h> #include <Disks/IVolume.h>
namespace DB namespace DB
{ {

View File

@ -8,7 +8,7 @@
#include <Compression/CompressedWriteBuffer.h> #include <Compression/CompressedWriteBuffer.h>
#include <DataStreams/NativeBlockInputStream.h> #include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h> #include <DataStreams/NativeBlockOutputStream.h>
#include <Disks/StoragePolicy.h> #include <Disks/IVolume.h>
namespace ProfileEvents namespace ProfileEvents

View File

@ -19,7 +19,7 @@
#include <common/getFQDNOrHostName.h> #include <common/getFQDNOrHostName.h>
#include <Common/setThreadName.h> #include <Common/setThreadName.h>
#include <Common/SettingsChanges.h> #include <Common/SettingsChanges.h>
#include <Disks/StoragePolicy.h> #include <Disks/IVolume.h>
#include <Compression/CompressedReadBuffer.h> #include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h> #include <Compression/CompressedWriteBuffer.h>
#include <IO/ReadBufferFromIStream.h> #include <IO/ReadBufferFromIStream.h>

View File

@ -50,8 +50,8 @@ class Pipe;
class QueryPlan; class QueryPlan;
using QueryPlanPtr = std::unique_ptr<QueryPlan>; using QueryPlanPtr = std::unique_ptr<QueryPlan>;
class StoragePolicy; class IStoragePolicy;
using StoragePolicyPtr = std::shared_ptr<const StoragePolicy>; using StoragePolicyPtr = std::shared_ptr<const IStoragePolicy>;
struct StreamLocalLimits; struct StreamLocalLimits;
class EnabledQuota; class EnabledQuota;

View File

@ -3,7 +3,6 @@
#include <Storages/MergeTree/MergeTreeSequentialSource.h> #include <Storages/MergeTree/MergeTreeSequentialSource.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h> #include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/MergedColumnOnlyOutputStream.h> #include <Storages/MergeTree/MergedColumnOnlyOutputStream.h>
#include <Disks/StoragePolicy.h>
#include <Storages/MergeTree/SimpleMergeSelector.h> #include <Storages/MergeTree/SimpleMergeSelector.h>
#include <Storages/MergeTree/AllMergeSelector.h> #include <Storages/MergeTree/AllMergeSelector.h>
#include <Storages/MergeTree/TTLMergeSelector.h> #include <Storages/MergeTree/TTLMergeSelector.h>

View File

@ -1,7 +1,6 @@
#include <Storages/StorageDistributed.h> #include <Storages/StorageDistributed.h>
#include <Databases/IDatabase.h> #include <Databases/IDatabase.h>
#include <Disks/StoragePolicy.h>
#include <Disks/IDisk.h> #include <Disks/IDisk.h>
#include <DataTypes/DataTypeFactory.h> #include <DataTypes/DataTypeFactory.h>

View File

@ -22,7 +22,6 @@
#include <Storages/MergeTree/MergeTreeBlockOutputStream.h> #include <Storages/MergeTree/MergeTreeBlockOutputStream.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h> #include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/MergeTree/PartitionPruner.h> #include <Storages/MergeTree/PartitionPruner.h>
#include <Disks/StoragePolicy.h>
#include <Storages/MergeTree/MergeList.h> #include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/checkDataPart.h> #include <Storages/MergeTree/checkDataPart.h>
#include <Processors/Pipe.h> #include <Processors/Pipe.h>

View File

@ -27,7 +27,6 @@
#include <Storages/MergeTree/ReplicatedMergeTreePartHeader.h> #include <Storages/MergeTree/ReplicatedMergeTreePartHeader.h>
#include <Storages/VirtualColumnUtils.h> #include <Storages/VirtualColumnUtils.h>
#include <Disks/StoragePolicy.h>
#include <Databases/IDatabase.h> #include <Databases/IDatabase.h>

View File

@ -15,7 +15,7 @@
#include <Common/StringUtils/StringUtils.h> #include <Common/StringUtils/StringUtils.h>
#include <DataTypes/DataTypesNumber.h> #include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h> #include <DataTypes/DataTypeArray.h>
#include <Disks/StoragePolicy.h> #include <Disks/IStoragePolicy.h>
#include <Processors/Sources/SourceWithProgress.h> #include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Pipe.h> #include <Processors/Pipe.h>
#include <DataTypes/DataTypeUUID.h> #include <DataTypes/DataTypeUUID.h>