Filesystem abstraction layer

This commit is contained in:
Alexander Burmak 2019-11-27 12:39:44 +03:00
parent ad2af03f98
commit 3e5ef56644
36 changed files with 1004 additions and 661 deletions

View File

@ -87,7 +87,7 @@ PenaltyBreakComment: 300
PenaltyBreakFirstLessLess: 120
PenaltyBreakString: 1000
PenaltyExcessCharacter: 1000000
PenaltyReturnTypeOnItsOwnLine: 60
PenaltyReturnTypeOnItsOwnLine: 1000
SpaceAfterCStyleCast: false
SpaceBeforeAssignmentOperators: true
SpaceBeforeParens: ControlStatements

View File

@ -168,6 +168,7 @@ add_object_library(clickhouse_compression src/Compression)
add_object_library(clickhouse_datastreams src/DataStreams)
add_object_library(clickhouse_datatypes src/DataTypes)
add_object_library(clickhouse_databases src/Databases)
add_object_library(clickhouse_disks src/Disks)
add_object_library(clickhouse_interpreters src/Interpreters)
add_object_library(clickhouse_interpreters_clusterproxy src/Interpreters/ClusterProxy)
add_object_library(clickhouse_columns src/Columns)

View File

@ -65,6 +65,7 @@
#include <Storages/registerStorages.h>
#include <Storages/StorageDistributed.h>
#include <Dictionaries/registerDictionaries.h>
#include <Disks/registerDisks.h>
#include <Databases/DatabaseMemory.h>
#include <Common/StatusFile.h>
@ -2410,6 +2411,7 @@ void ClusterCopierApp::mainImpl()
registerTableFunctions();
registerStorages();
registerDictionaries();
registerDisks();
static const std::string default_database = "_local";
context->addDatabase(default_database, std::make_shared<DatabaseMemory>(default_database));

View File

@ -32,6 +32,7 @@
#include <TableFunctions/registerTableFunctions.h>
#include <Storages/registerStorages.h>
#include <Dictionaries/registerDictionaries.h>
#include <Disks/registerDisks.h>
#include <boost/program_options/options_description.hpp>
#include <boost/program_options.hpp>
#include <common/argsToConfig.h>
@ -152,6 +153,7 @@ try
registerTableFunctions();
registerStorages();
registerDictionaries();
registerDisks();
/// Maybe useless
if (config().has("macros"))

View File

@ -50,6 +50,7 @@
#include <TableFunctions/registerTableFunctions.h>
#include <Storages/registerStorages.h>
#include <Dictionaries/registerDictionaries.h>
#include <Disks/registerDisks.h>
#include <Common/Config/ConfigReloader.h>
#include "HTTPHandlerFactory.h"
#include "MetricsTransmitter.h"
@ -187,6 +188,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
registerTableFunctions();
registerStorages();
registerDictionaries();
registerDisks();
CurrentMetrics::set(CurrentMetrics::Revision, ClickHouseRevision::get());
CurrentMetrics::set(CurrentMetrics::VersionInteger, ClickHouseRevision::getVersionInteger());

View File

@ -5,6 +5,7 @@ add_subdirectory (Core)
add_subdirectory (DataStreams)
add_subdirectory (DataTypes)
add_subdirectory (Dictionaries)
add_subdirectory (Disks)
add_subdirectory (Storages)
add_subdirectory (Parsers)
add_subdirectory (IO)

View File

@ -1,360 +0,0 @@
#pragma once
#include <mutex>
#include <sys/statvfs.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#if defined(__linux__)
#include <cstdio>
#include <mntent.h>
#endif
#include <memory>
#include <filesystem>
#include <boost/noncopyable.hpp>
#include <Poco/Util/AbstractConfiguration.h>
#include <common/logger_useful.h>
#include <Common/Exception.h>
#include <IO/WriteHelpers.h>
#include <Common/formatReadable.h>
#include <Common/CurrentMetrics.h>
namespace CurrentMetrics
{
extern const Metric DiskSpaceReservedForMerge;
}
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int CANNOT_STATVFS;
extern const int NOT_ENOUGH_SPACE;
extern const int NOT_IMPLEMENTED;
extern const int SYSTEM_ERROR;
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
extern const int UNKNOWN_POLICY;
extern const int UNKNOWN_DISK;
}
namespace DiskSpace
{
class Reservation;
using ReservationPtr = std::unique_ptr<Reservation>;
/// Returns mount point of filesystem where absoulte_path (must exist) is located
std::filesystem::path getMountPoint(std::filesystem::path absolute_path);
/// Returns name of filesystem mounted to mount_point
#if !defined(__linux__)
[[noreturn]]
#endif
std::string getFilesystemName([[maybe_unused]] const std::string & mount_point);
inline struct statvfs getStatVFS(const std::string & path)
{
struct statvfs fs;
if (statvfs(path.c_str(), &fs) != 0)
throwFromErrnoWithPath(
"Could not calculate available disk space (statvfs)", path, ErrorCodes::CANNOT_STATVFS);
return fs;
}
/**
* Provide interface for reservation
*/
class Space : public std::enable_shared_from_this<Space>
{
public:
virtual ReservationPtr reserve(UInt64 bytes) const = 0;
virtual const String & getName() const = 0;
virtual ~Space() = default;
};
using SpacePtr = std::shared_ptr<const Space>;
/** Disk - Smallest space unit.
* path - Path to space. Ends with /
* name - Unique key using for disk space reservation.
*/
class Disk : public Space
{
public:
friend class Reservation;
/// Snapshot of disk space state (free and total space)
class Stat
{
struct statvfs fs{};
UInt64 keep_free_space_bytes;
public:
explicit Stat(const Disk & disk)
{
if (statvfs(disk.path.c_str(), &fs) != 0)
throwFromErrno("Could not calculate available disk space (statvfs)", ErrorCodes::CANNOT_STATVFS);
keep_free_space_bytes = disk.keep_free_space_bytes;
}
/// Total space on disk using information from statvfs
UInt64 getTotalSpace() const;
/// Available space on disk using information from statvfs
UInt64 getAvailableSpace() const;
};
Disk(const String & name_, const String & path_, UInt64 keep_free_space_bytes_)
: name(name_)
, path(path_)
, keep_free_space_bytes(keep_free_space_bytes_)
{
if (path.back() != '/')
throw Exception("Disk path must ends with '/', but '" + path + "' doesn't.", ErrorCodes::LOGICAL_ERROR);
}
/// Reserves bytes on disk, if not possible returns nullptr.
ReservationPtr reserve(UInt64 bytes) const override;
/// Disk name from configuration;
const String & getName() const override { return name; }
/// Path on fs to disk
const String & getPath() const { return path; }
/// Path to clickhouse data directory on this disk
String getClickHouseDataPath() const { return path + "data/"; }
/// Amount of bytes which should be kept free on this disk
UInt64 getKeepingFreeSpace() const { return keep_free_space_bytes; }
/// Snapshot of disk space state (free and total space)
Stat getSpaceInformation() const { return Stat(*this); }
/// Total available space on disk
UInt64 getTotalSpace() const { return getSpaceInformation().getTotalSpace(); }
/// Space currently available on disk, take information from statvfs call
UInt64 getAvailableSpace() const { return getSpaceInformation().getAvailableSpace(); }
/// Currently available (prev method) minus already reserved space
UInt64 getUnreservedSpace() const;
private:
const String name;
const String path;
const UInt64 keep_free_space_bytes;
/// Used for reservation counters modification
static std::mutex mutex;
mutable UInt64 reserved_bytes = 0;
mutable UInt64 reservation_count = 0;
private:
/// Reserves bytes on disk, if not possible returns false
bool tryReserve(UInt64 bytes) const;
};
/// It is not possible to change disk runtime.
using DiskPtr = std::shared_ptr<const Disk>;
using Disks = std::vector<DiskPtr>;
/** Information about reserved size on concrete disk.
* Unreserve on destroy. Doesn't reserve bytes in constructor.
*/
class Reservation final : private boost::noncopyable
{
public:
Reservation(UInt64 size_, DiskPtr disk_ptr_)
: size(size_)
, metric_increment(CurrentMetrics::DiskSpaceReservedForMerge, size)
, disk_ptr(disk_ptr_)
{
}
/// Unreserves reserved space and decrement reservations count on disk
~Reservation();
/// Changes amount of reserved space. When new_size is greater than before,
/// availability of free space is not checked.
void update(UInt64 new_size);
/// Get reservation size
UInt64 getSize() const { return size; }
/// Get disk where reservation take place
const DiskPtr & getDisk() const { return disk_ptr; }
private:
UInt64 size;
CurrentMetrics::Increment metric_increment;
DiskPtr disk_ptr;
};
/// Parse .xml configuration and store information about disks
/// Mostly used for introspection.
class DiskSelector
{
public:
DiskSelector(const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix, const String & default_path);
/// Get disk by name
const DiskPtr & operator[](const String & name) const;
/// Get all disks name
const auto & getDisksMap() const { return disks; }
private:
std::map<String, DiskPtr> disks;
};
/**
* Disks group by some (user) criteria. For example,
* - Volume("slow_disks", [d1, d2], 100)
* - Volume("fast_disks", [d3, d4], 200)
* Cannot store parts larger than max_data_part_size.
*/
class Volume : public Space
{
friend class StoragePolicy;
public:
Volume(String name_, std::vector<DiskPtr> disks_, UInt64 max_data_part_size_)
: max_data_part_size(max_data_part_size_)
, disks(std::move(disks_))
, name(std::move(name_))
{
}
Volume(String name_, const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix, const DiskSelector & disk_selector);
/// Uses Round-robin to choose disk for reservation.
/// Returns valid reservation or nullptr if there is no space left on any disk.
ReservationPtr reserve(UInt64 bytes) const override;
/// Return biggest unreserved space across all disks
UInt64 getMaxUnreservedFreeSpace() const;
/// Volume name from config
const String & getName() const override { return name; }
/// Max size of reservation
UInt64 max_data_part_size = 0;
/// Disks in volume
Disks disks;
private:
mutable std::atomic<size_t> last_used = 0;
const String name;
};
using VolumePtr = std::shared_ptr<const Volume>;
using Volumes = std::vector<VolumePtr>;
/**
* Contains all information about volumes configuration for Storage.
* Can determine appropriate Volume and Disk for each reservation.
*/
class StoragePolicy : public Space
{
public:
StoragePolicy(String name_, const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix, const DiskSelector & disks);
StoragePolicy(String name_, Volumes volumes_, double move_factor_);
/// Returns disks ordered by volumes priority
Disks getDisks() const;
/// Returns any disk
/// Used when it's not important, for example for
/// mutations files
DiskPtr getAnyDisk() const;
DiskPtr getDiskByName(const String & disk_name) const;
/// Get free space from most free disk
UInt64 getMaxUnreservedFreeSpace() const;
const String & getName() const override { return name; }
/// Returns valid reservation or null
ReservationPtr reserve(UInt64 bytes) const override;
/// Reserve space on any volume with index > min_volume_index
ReservationPtr reserve(UInt64 bytes, size_t min_volume_index) const;
/// Find volume index, which contains disk
size_t getVolumeIndexByDisk(const DiskPtr & disk_ptr) const;
/// Reserves 0 bytes on disk with max available space
/// Do not use this function when it is possible to predict size.
ReservationPtr makeEmptyReservationOnLargestDisk() const;
const Volumes & getVolumes() const { return volumes; }
/// Returns number [0., 1.] -- fraction of free space on disk
/// which should be kept with help of background moves
double getMoveFactor() const { return move_factor; }
/// Get volume by index from storage_policy
VolumePtr getVolume(size_t i) const { return (i < volumes_names.size() ? volumes[i] : VolumePtr()); }
VolumePtr getVolumeByName(const String & volume_name) const
{
auto it = volumes_names.find(volume_name);
if (it == volumes_names.end())
return {};
return getVolume(it->second);
}
private:
Volumes volumes;
const String name;
std::map<String, size_t> volumes_names;
/// move_factor from interval [0., 1.]
/// We move something if disk from this policy
/// filled more than total_size * move_factor
double move_factor = 0.1; /// by default move factor is 10%
};
using StoragePolicyPtr = std::shared_ptr<const StoragePolicy>;
/// Parse .xml configuration and store information about policies
/// Mostly used for introspection.
class StoragePolicySelector
{
public:
StoragePolicySelector(const Poco::Util::AbstractConfiguration & config,
const String & config_prefix, const DiskSelector & disks);
/// Policy by name
const StoragePolicyPtr & operator[](const String & name) const;
/// All policies
const std::map<String, StoragePolicyPtr> & getPoliciesMap() const { return policies; }
private:
std::map<String, StoragePolicyPtr> policies;
};
}
}

View File

@ -10,7 +10,7 @@
#include <common/demangle.h>
#include <Common/config_version.h>
#include <Common/formatReadable.h>
#include <Common/DiskSpaceMonitor.h>
#include <Common/filesystemHelpers.h>
#include <filesystem>
namespace DB
@ -84,16 +84,16 @@ void getNoSpaceLeftInfoMessage(std::filesystem::path path, std::string & msg)
while (!std::filesystem::exists(path) && path.has_relative_path())
path = path.parent_path();
auto fs = DiskSpace::getStatVFS(path);
auto fs = getStatVFS(path);
msg += "\nTotal space: " + formatReadableSizeWithBinarySuffix(fs.f_blocks * fs.f_bsize)
+ "\nAvailable space: " + formatReadableSizeWithBinarySuffix(fs.f_bavail * fs.f_bsize)
+ "\nTotal inodes: " + formatReadableQuantity(fs.f_files)
+ "\nAvailable inodes: " + formatReadableQuantity(fs.f_favail);
auto mount_point = DiskSpace::getMountPoint(path).string();
auto mount_point = getMountPoint(path).string();
msg += "\nMount point: " + mount_point;
#if defined(__linux__)
msg += "\nFilesystem: " + DiskSpace::getFilesystemName(mount_point);
msg += "\nFilesystem: " + getFilesystemName(mount_point);
#endif
}

View File

@ -1,10 +1,21 @@
#include <Common/filesystemHelpers.h>
#include "filesystemHelpers.h"
#include <sys/stat.h>
#if defined(__linux__)
# include <cstdio>
# include <mntent.h>
#endif
#include <Poco/File.h>
#include <Poco/Path.h>
#include <Poco/Version.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int SYSTEM_ERROR;
}
bool enoughSpaceInDirectory(const std::string & path [[maybe_unused]], size_t data_size [[maybe_unused]])
{
@ -24,4 +35,59 @@ std::unique_ptr<TemporaryFile> createTemporaryFile(const std::string & path)
return std::make_unique<TemporaryFile>(path);
}
std::filesystem::path getMountPoint(std::filesystem::path absolute_path)
{
if (absolute_path.is_relative())
throw Exception("Path is relative. It's a bug.", ErrorCodes::LOGICAL_ERROR);
absolute_path = std::filesystem::canonical(absolute_path);
const auto get_device_id = [](const std::filesystem::path & p)
{
struct stat st;
if (stat(p.c_str(), &st))
throwFromErrnoWithPath("Cannot stat " + p.string(), p.string(), ErrorCodes::SYSTEM_ERROR);
return st.st_dev;
};
/// If /some/path/to/dir/ and /some/path/to/ have different device id,
/// then device which contains /some/path/to/dir/filename is mounted to /some/path/to/dir/
auto device_id = get_device_id(absolute_path);
while (absolute_path.has_relative_path())
{
auto parent = absolute_path.parent_path();
auto parent_device_id = get_device_id(parent);
if (device_id != parent_device_id)
return absolute_path;
absolute_path = parent;
device_id = parent_device_id;
}
return absolute_path;
}
/// Returns name of filesystem mounted to mount_point
#if !defined(__linux__)
[[noreturn]]
#endif
String getFilesystemName([[maybe_unused]] const String & mount_point)
{
#if defined(__linux__)
auto mounted_filesystems = setmntent("/etc/mtab", "r");
if (!mounted_filesystems)
throw DB::Exception("Cannot open /etc/mtab to get name of filesystem", ErrorCodes::SYSTEM_ERROR);
mntent fs_info;
constexpr size_t buf_size = 4096; /// The same as buffer used for getmntent in glibc. It can happen that it's not enough
char buf[buf_size];
while (getmntent_r(mounted_filesystems, &fs_info, buf, buf_size) && fs_info.mnt_dir != mount_point)
;
endmntent(mounted_filesystems);
if (fs_info.mnt_dir != mount_point)
throw DB::Exception("Cannot find name of filesystem by mount point " + mount_point, ErrorCodes::SYSTEM_ERROR);
return fs_info.mnt_fsname;
#else
throw DB::Exception("The function getFilesystemName is supported on Linux only", ErrorCodes::NOT_IMPLEMENTED);
#endif
}
}

View File

@ -1,16 +1,43 @@
#pragma once
#include <string>
#include <memory>
#include <Core/Types.h>
#include <Common/Exception.h>
#include <filesystem>
#include <memory>
#include <string>
#include <sys/statvfs.h>
#include <Poco/TemporaryFile.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_STATVFS;
}
using TemporaryFile = Poco::TemporaryFile;
bool enoughSpaceInDirectory(const std::string & path, size_t data_size);
std::unique_ptr<TemporaryFile> createTemporaryFile(const std::string & path);
/// Returns mount point of filesystem where absoulte_path (must exist) is located
std::filesystem::path getMountPoint(std::filesystem::path absolute_path);
/// Returns name of filesystem mounted to mount_point
#if !defined(__linux__)
[[noreturn]]
#endif
String
getFilesystemName([[maybe_unused]] const String & mount_point);
inline struct statvfs getStatVFS(const String & path)
{
struct statvfs fs;
if (statvfs(path.c_str(), &fs) != 0)
throwFromErrnoWithPath("Could not calculate available disk space (statvfs)", path, ErrorCodes::CANNOT_STATVFS);
return fs;
}
}

View File

View File

@ -0,0 +1,39 @@
#include "DiskFactory.h"
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
}
DiskFactory & DiskFactory::instance()
{
static DiskFactory factory;
return factory;
}
void DiskFactory::registerDisk(const String & disk_type, DB::DiskFactory::Creator creator)
{
if (!registry.emplace(disk_type, creator).second)
throw Exception("DiskFactory: the disk type '" + disk_type + "' is not unique", ErrorCodes::LOGICAL_ERROR);
}
DiskPtr DiskFactory::get(
const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
const Context & context) const
{
const auto disk_type = config.getString(config_prefix + ".type", "local");
const auto found = registry.find(disk_type);
if (found == registry.end())
throw Exception{"DiskFactory: the disk '" + name + "' has unknown disk type: " + disk_type, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG};
const auto & disk_creator = found->second;
return disk_creator(name, config, config_prefix, context);
}
}

View File

@ -0,0 +1,38 @@
#pragma once
#include <Core/Types.h>
#include <Disks/IDisk.h>
#include <functional>
#include <unordered_map>
#include <boost/noncopyable.hpp>
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
{
class Context;
class DiskFactory final : private boost::noncopyable
{
public:
using Creator = std::function<DiskPtr(
const String & name, const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
const Context & context)>;
static DiskFactory & instance();
void registerDisk(const String & disk_type, Creator creator);
DiskPtr get(
const String & name, const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
const Context & context) const;
private:
using DiskTypeRegistry = std::unordered_map<String, Creator>;
DiskTypeRegistry registry;
};
}

View File

@ -0,0 +1,194 @@
#include "DiskLocal.h"
#include "DiskFactory.h"
#include <Common/filesystemHelpers.h>
#include <Common/quoteString.h>
#include <Interpreters/Context.h>
namespace DB
{
std::mutex DiskLocal::mutex;
ReservationPtr DiskLocal::reserve(UInt64 bytes) const
{
if (!tryReserve(bytes))
return {};
return std::make_unique<DiskLocalReservation>(std::static_pointer_cast<const DiskLocal>(shared_from_this()), bytes);
}
bool DiskLocal::tryReserve(UInt64 bytes) const
{
std::lock_guard lock(mutex);
if (bytes == 0)
{
LOG_DEBUG(&Logger::get("DiskLocal"), "Reserving 0 bytes on disk " << backQuote(name));
++reservation_count;
return true;
}
auto available_space = getAvailableSpace();
UInt64 unreserved_space = available_space - std::min(available_space, reserved_bytes);
if (unreserved_space >= bytes)
{
LOG_DEBUG(
&Logger::get("DiskLocal"),
"Reserving " << formatReadableSizeWithBinarySuffix(bytes) << " on disk " << backQuote(name) << ", having unreserved "
<< formatReadableSizeWithBinarySuffix(unreserved_space) << ".");
++reservation_count;
reserved_bytes += bytes;
return true;
}
return false;
}
UInt64 DiskLocal::getTotalSpace() const
{
auto fs = getStatVFS(path);
UInt64 total_size = fs.f_blocks * fs.f_bsize;
if (total_size < keep_free_space_bytes)
return 0;
return total_size - keep_free_space_bytes;
}
UInt64 DiskLocal::getAvailableSpace() const
{
/// we use f_bavail, because part of b_free space is
/// available for superuser only and for system purposes
auto fs = getStatVFS(path);
UInt64 total_size = fs.f_bavail * fs.f_bsize;
if (total_size < keep_free_space_bytes)
return 0;
return total_size - keep_free_space_bytes;
}
UInt64 DiskLocal::getUnreservedSpace() const
{
std::lock_guard lock(mutex);
auto available_space = getAvailableSpace();
available_space -= std::min(available_space, reserved_bytes);
return available_space;
}
DiskFilePtr DiskLocal::file(const String & path_) const
{
return std::make_shared<DiskLocalFile>(std::static_pointer_cast<const DiskLocal>(shared_from_this()), path_);
}
DiskDirectoryIteratorImplPtr DiskLocalFile::iterateDirectory()
{
return std::make_unique<DiskLocalDirectoryIterator>(shared_from_this());
}
DiskLocalDirectoryIterator::DiskLocalDirectoryIterator(const DiskFilePtr & parent_) : parent(parent_), iter(parent_->fullPath())
{
updateCurrentFile();
}
void DiskLocalDirectoryIterator::next()
{
++iter;
updateCurrentFile();
}
void DiskLocalDirectoryIterator::updateCurrentFile()
{
current_file.reset();
if (iter != Poco::DirectoryIterator())
{
String path = parent->path() + iter.name();
current_file = std::make_shared<DiskLocalFile>(parent->disk(), path);
}
}
void DiskLocalReservation::update(UInt64 new_size)
{
std::lock_guard lock(DiskLocal::mutex);
auto disk_local = std::static_pointer_cast<const DiskLocal>(disk_ptr);
disk_local->reserved_bytes -= size;
size = new_size;
disk_local->reserved_bytes += size;
}
DiskLocalReservation::~DiskLocalReservation()
{
try
{
std::lock_guard lock(DiskLocal::mutex);
auto disk_local = std::static_pointer_cast<const DiskLocal>(disk_ptr);
if (disk_local->reserved_bytes < size)
{
disk_local->reserved_bytes = 0;
LOG_ERROR(&Logger::get("DiskLocal"), "Unbalanced reservations size for disk '" + disk_ptr->getName() + "'.");
}
else
{
disk_local->reserved_bytes -= size;
}
if (disk_local->reservation_count == 0)
LOG_ERROR(&Logger::get("DiskLocal"), "Unbalanced reservation count for disk '" + disk_ptr->getName() + "'.");
else
--disk_local->reservation_count;
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void registerDiskLocal(DiskFactory & factory)
{
auto creator = [](const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
const Context & context) -> DiskPtr {
String path = config.getString(config_prefix + ".path", "");
if (name == "default")
{
if (!path.empty())
throw Exception(
"\"default\" disk path should be provided in <path> not it <storage_configuration>",
ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
path = context.getPath();
}
else
{
if (path.empty())
throw Exception("Disk path can not be empty. Disk " + name, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
if (path.back() != '/')
throw Exception("Disk path must end with /. Disk " + name, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
}
bool has_space_ratio = config.has(config_prefix + ".keep_free_space_ratio");
if (config.has(config_prefix + ".keep_free_space_bytes") && has_space_ratio)
throw Exception(
"Only one of 'keep_free_space_bytes' and 'keep_free_space_ratio' can be specified",
ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
UInt64 keep_free_space_bytes = config.getUInt64(config_prefix + ".keep_free_space_bytes", 0);
if (has_space_ratio)
{
auto ratio = config.getDouble(config_prefix + ".keep_free_space_ratio");
if (ratio < 0 || ratio > 1)
throw Exception("'keep_free_space_ratio' have to be between 0 and 1", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
String tmp_path = path;
if (tmp_path.empty())
tmp_path = context.getPath();
// Create tmp disk for getting total disk space.
keep_free_space_bytes = static_cast<UInt64>(DiskLocal("tmp", tmp_path, 0).getTotalSpace() * ratio);
}
return std::make_shared<const DiskLocal>(name, path, keep_free_space_bytes);
};
factory.registerDisk("local", creator);
}
}

132
dbms/src/Disks/DiskLocal.h Normal file
View File

@ -0,0 +1,132 @@
#pragma once
#include <Disks/IDisk.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
#include <mutex>
#include <Poco/DirectoryIterator.h>
#include <Poco/File.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
class DiskLocalReservation;
class DiskLocal : public IDisk
{
public:
friend class DiskLocalReservation;
DiskLocal(const String & name_, const String & path_, UInt64 keep_free_space_bytes_)
: name(name_), path(path_), keep_free_space_bytes(keep_free_space_bytes_)
{
if (path.back() != '/')
throw Exception("Disk path must ends with '/', but '" + path + "' doesn't.", ErrorCodes::LOGICAL_ERROR);
}
const String & getName() const override { return name; }
const String & getPath() const override { return path; }
ReservationPtr reserve(UInt64 bytes) const override;
UInt64 getTotalSpace() const override;
UInt64 getAvailableSpace() const override;
UInt64 getUnreservedSpace() const override;
UInt64 getKeepingFreeSpace() const override { return keep_free_space_bytes; }
DiskFilePtr file(const String & path) const override;
private:
bool tryReserve(UInt64 bytes) const;
private:
const String name;
const String path;
const UInt64 keep_free_space_bytes;
/// Used for reservation counters modification
static std::mutex mutex;
mutable UInt64 reserved_bytes = 0;
mutable UInt64 reservation_count = 0;
};
using DiskLocalPtr = std::shared_ptr<const DiskLocal>;
class DiskLocalFile : public IDiskFile
{
public:
DiskLocalFile(const DiskPtr & disk_ptr_, const String & rel_path_)
: IDiskFile(disk_ptr_, rel_path_), file(disk_ptr->getPath() + rel_path)
{
}
bool exists() const override { return file.exists(); }
bool isDirectory() const override { return file.isDirectory(); }
void createDirectory() override { file.createDirectory(); }
void createDirectories() override { file.createDirectories(); }
void moveTo(const String & new_path) override { file.renameTo(new_path); }
void copyTo(const String & new_path) override { file.copyTo(new_path); }
std::unique_ptr<ReadBuffer> read() const override { return std::make_unique<ReadBufferFromFile>(file.path()); }
std::unique_ptr<WriteBuffer> write() override { return std::make_unique<WriteBufferFromFile>(file.path()); }
private:
DiskDirectoryIteratorImplPtr iterateDirectory() override;
private:
Poco::File file;
};
class DiskLocalDirectoryIterator : public IDiskDirectoryIteratorImpl
{
public:
explicit DiskLocalDirectoryIterator(const DiskFilePtr & parent_);
const String & name() const override { return iter.name(); }
const DiskFilePtr & get() const override { return current_file; }
void next() override;
bool isValid() const override { return bool(current_file); }
private:
void updateCurrentFile();
private:
DiskFilePtr parent;
Poco::DirectoryIterator iter;
DiskFilePtr current_file;
};
/**
* Information about reserved size on concrete local disk.
* Doesn't reserve bytes in constructor.
*/
class DiskLocalReservation : public IReservation
{
public:
DiskLocalReservation(const DiskLocalPtr & disk_, UInt64 size_) : IReservation(disk_, size_) {}
void update(UInt64 new_size) override;
~DiskLocalReservation() override;
};
}

View File

@ -1,174 +1,24 @@
#include <Common/DiskSpaceMonitor.h>
#include "DiskSpaceMonitor.h"
#include "DiskFactory.h"
#include "DiskLocal.h"
#include <Common/escapeForFileName.h>
#include <Common/quoteString.h>
#include <Interpreters/Context.h>
#include <set>
#include <Poco/File.h>
namespace DB
{
namespace DiskSpace
{
std::mutex Disk::mutex;
std::filesystem::path getMountPoint(std::filesystem::path absolute_path)
{
if (absolute_path.is_relative())
throw Exception("Path is relative. It's a bug.", ErrorCodes::LOGICAL_ERROR);
absolute_path = std::filesystem::canonical(absolute_path);
const auto get_device_id = [](const std::filesystem::path & p)
{
struct stat st;
if (stat(p.c_str(), &st))
throwFromErrnoWithPath("Cannot stat " + p.string(), p.string(), ErrorCodes::SYSTEM_ERROR);
return st.st_dev;
};
/// If /some/path/to/dir/ and /some/path/to/ have different device id,
/// then device which contains /some/path/to/dir/filename is mounted to /some/path/to/dir/
auto device_id = get_device_id(absolute_path);
while (absolute_path.has_relative_path())
{
auto parent = absolute_path.parent_path();
auto parent_device_id = get_device_id(parent);
if (device_id != parent_device_id)
return absolute_path;
absolute_path = parent;
device_id = parent_device_id;
}
return absolute_path;
}
/// Returns name of filesystem mounted to mount_point
#if !defined(__linux__)
[[noreturn]]
#endif
std::string getFilesystemName([[maybe_unused]] const std::string & mount_point)
{
#if defined(__linux__)
auto mounted_filesystems = setmntent("/etc/mtab", "r");
if (!mounted_filesystems)
throw DB::Exception("Cannot open /etc/mtab to get name of filesystem", ErrorCodes::SYSTEM_ERROR);
mntent fs_info;
constexpr size_t buf_size = 4096; /// The same as buffer used for getmntent in glibc. It can happen that it's not enough
char buf[buf_size];
while (getmntent_r(mounted_filesystems, &fs_info, buf, buf_size) && fs_info.mnt_dir != mount_point)
;
endmntent(mounted_filesystems);
if (fs_info.mnt_dir != mount_point)
throw DB::Exception("Cannot find name of filesystem by mount point " + mount_point, ErrorCodes::SYSTEM_ERROR);
return fs_info.mnt_fsname;
#else
throw DB::Exception("The function getFilesystemName is supported on Linux only", ErrorCodes::NOT_IMPLEMENTED);
#endif
}
ReservationPtr Disk::reserve(UInt64 bytes) const
{
if (!tryReserve(bytes))
return {};
return std::make_unique<Reservation>(bytes, std::static_pointer_cast<const Disk>(shared_from_this()));
}
bool Disk::tryReserve(UInt64 bytes) const
{
std::lock_guard lock(mutex);
if (bytes == 0)
{
LOG_DEBUG(&Logger::get("DiskSpaceMonitor"), "Reserving 0 bytes on disk " << backQuote(name));
++reservation_count;
return true;
}
auto available_space = getAvailableSpace();
UInt64 unreserved_space = available_space - std::min(available_space, reserved_bytes);
if (unreserved_space >= bytes)
{
LOG_DEBUG(
&Logger::get("DiskSpaceMonitor"),
"Reserving " << formatReadableSizeWithBinarySuffix(bytes) << " on disk " << backQuote(name)
<< ", having unreserved " << formatReadableSizeWithBinarySuffix(unreserved_space) << ".");
++reservation_count;
reserved_bytes += bytes;
return true;
}
return false;
}
UInt64 Disk::getUnreservedSpace() const
{
std::lock_guard lock(mutex);
auto available_space = getSpaceInformation().getAvailableSpace();
available_space -= std::min(available_space, reserved_bytes);
return available_space;
}
UInt64 Disk::Stat::getTotalSpace() const
{
UInt64 total_size = fs.f_blocks * fs.f_bsize;
if (total_size < keep_free_space_bytes)
return 0;
return total_size - keep_free_space_bytes;
}
UInt64 Disk::Stat::getAvailableSpace() const
{
/// we use f_bavail, because part of b_free space is
/// available for superuser only and for system purposes
UInt64 total_size = fs.f_bavail * fs.f_bsize;
if (total_size < keep_free_space_bytes)
return 0;
return total_size - keep_free_space_bytes;
}
Reservation::~Reservation()
{
try
{
std::lock_guard lock(Disk::mutex);
if (disk_ptr->reserved_bytes < size)
{
disk_ptr->reserved_bytes = 0;
LOG_ERROR(&Logger::get("DiskSpaceMonitor"), "Unbalanced reservations size for disk '" + disk_ptr->getName() + "'.");
}
else
{
disk_ptr->reserved_bytes -= size;
}
if (disk_ptr->reservation_count == 0)
LOG_ERROR(&Logger::get("DiskSpaceMonitor"), "Unbalanced reservation count for disk '" + disk_ptr->getName() + "'.");
else
--disk_ptr->reservation_count;
}
catch (...)
{
tryLogCurrentException("~DiskSpaceMonitor");
}
}
void Reservation::update(UInt64 new_size)
{
std::lock_guard lock(Disk::mutex);
disk_ptr->reserved_bytes -= size;
size = new_size;
disk_ptr->reserved_bytes += size;
}
DiskSelector::DiskSelector(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const String & default_path)
DiskSelector::DiskSelector(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const Context & context)
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
auto & factory = DiskFactory::instance();
constexpr auto default_disk_name = "default";
bool has_default_disk = false;
for (const auto & disk_name : keys)
@ -177,53 +27,15 @@ DiskSelector::DiskSelector(const Poco::Util::AbstractConfiguration & config, con
throw Exception("Disk name can contain only alphanumeric and '_' (" + disk_name + ")",
ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
if (disk_name == default_disk_name)
has_default_disk = true;
auto disk_config_prefix = config_prefix + "." + disk_name;
bool has_space_ratio = config.has(disk_config_prefix + ".keep_free_space_ratio");
if (config.has(disk_config_prefix + ".keep_free_space_bytes") && has_space_ratio)
throw Exception("Only one of 'keep_free_space_bytes' and 'keep_free_space_ratio' can be specified",
ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
UInt64 keep_free_space_bytes = config.getUInt64(disk_config_prefix + ".keep_free_space_bytes", 0);
String path;
if (config.has(disk_config_prefix + ".path"))
path = config.getString(disk_config_prefix + ".path");
if (has_space_ratio)
{
auto ratio = config.getDouble(disk_config_prefix + ".keep_free_space_ratio");
if (ratio < 0 || ratio > 1)
throw Exception("'keep_free_space_ratio' have to be between 0 and 1",
ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
String tmp_path = path;
if (tmp_path.empty())
tmp_path = default_path;
// Create tmp disk for getting total disk space.
keep_free_space_bytes = static_cast<UInt64>(Disk("tmp", tmp_path, 0).getTotalSpace() * ratio);
}
if (disk_name == default_disk_name)
{
has_default_disk = true;
if (!path.empty())
throw Exception("\"default\" disk path should be provided in <path> not it <storage_configuration>",
ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
disks.emplace(disk_name, std::make_shared<const Disk>(disk_name, default_path, keep_free_space_bytes));
}
else
{
if (path.empty())
throw Exception("Disk path can not be empty. Disk " + disk_name, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
if (path.back() != '/')
throw Exception("Disk path must end with /. Disk " + disk_name, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
disks.emplace(disk_name, std::make_shared<const Disk>(disk_name, path, keep_free_space_bytes));
}
disks.emplace(disk_name, factory.get(disk_name, config, disk_config_prefix, context));
}
if (!has_default_disk)
disks.emplace(default_disk_name, std::make_shared<const Disk>(default_disk_name, default_path, 0));
disks.emplace(default_disk_name, std::make_shared<const DiskLocal>(default_disk_name, context.getPath(), 0));
}
@ -239,7 +51,7 @@ const DiskPtr & DiskSelector::operator[](const String & name) const
Volume::Volume(
String name_,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const String & config_prefix,
const DiskSelector & disk_selector)
: name(std::move(name_))
{
@ -332,7 +144,7 @@ UInt64 Volume::getMaxUnreservedFreeSpace() const
StoragePolicy::StoragePolicy(
String name_,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const String & config_prefix,
const DiskSelector & disks)
: name(std::move(name_))
{
@ -537,5 +349,3 @@ const StoragePolicyPtr & StoragePolicySelector::operator[](const String & name)
}
}
}

View File

@ -0,0 +1,179 @@
#pragma once
#include <common/logger_useful.h>
#include <Common/CurrentMetrics.h>
#include <Common/Exception.h>
#include <Common/formatReadable.h>
#include <Disks/IDisk.h>
#include <IO/WriteHelpers.h>
#include <memory>
#include <mutex>
#include <unistd.h>
#include <boost/noncopyable.hpp>
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int NOT_ENOUGH_SPACE;
extern const int NOT_IMPLEMENTED;
extern const int SYSTEM_ERROR;
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
extern const int UNKNOWN_POLICY;
extern const int UNKNOWN_DISK;
}
/// Parse .xml configuration and store information about disks
/// Mostly used for introspection.
class DiskSelector
{
public:
DiskSelector(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const Context & context);
/// Get disk by name
const DiskPtr & operator[](const String & name) const;
/// Get all disks name
const auto & getDisksMap() const { return disks; }
private:
std::map<String, DiskPtr> disks;
};
/**
* Disks group by some (user) criteria. For example,
* - Volume("slow_disks", [d1, d2], 100)
* - Volume("fast_disks", [d3, d4], 200)
* Cannot store parts larger than max_data_part_size.
*/
class Volume : public Space
{
friend class StoragePolicy;
public:
Volume(String name_, std::vector<DiskPtr> disks_, UInt64 max_data_part_size_)
: max_data_part_size(max_data_part_size_), disks(std::move(disks_)), name(std::move(name_))
{
}
Volume(
String name_, const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const DiskSelector & disk_selector);
/// Uses Round-robin to choose disk for reservation.
/// Returns valid reservation or nullptr if there is no space left on any disk.
ReservationPtr reserve(UInt64 bytes) const override;
/// Return biggest unreserved space across all disks
UInt64 getMaxUnreservedFreeSpace() const;
/// Volume name from config
const String & getName() const override { return name; }
/// Max size of reservation
UInt64 max_data_part_size = 0;
/// Disks in volume
Disks disks;
private:
mutable std::atomic<size_t> last_used = 0;
const String name;
};
using VolumePtr = std::shared_ptr<const Volume>;
using Volumes = std::vector<VolumePtr>;
/**
* Contains all information about volumes configuration for Storage.
* Can determine appropriate Volume and Disk for each reservation.
*/
class StoragePolicy : public Space
{
public:
StoragePolicy(String name_, const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const DiskSelector & disks);
StoragePolicy(String name_, Volumes volumes_, double move_factor_);
/// Returns disks ordered by volumes priority
Disks getDisks() const;
/// Returns any disk
/// Used when it's not important, for example for
/// mutations files
DiskPtr getAnyDisk() const;
DiskPtr getDiskByName(const String & disk_name) const;
/// Get free space from most free disk
UInt64 getMaxUnreservedFreeSpace() const;
const String & getName() const override { return name; }
/// Returns valid reservation or null
ReservationPtr reserve(UInt64 bytes) const override;
/// Reserve space on any volume with index > min_volume_index
ReservationPtr reserve(UInt64 bytes, size_t min_volume_index) const;
/// Find volume index, which contains disk
size_t getVolumeIndexByDisk(const DiskPtr & disk_ptr) const;
/// Reserves 0 bytes on disk with max available space
/// Do not use this function when it is possible to predict size.
ReservationPtr makeEmptyReservationOnLargestDisk() const;
const Volumes & getVolumes() const { return volumes; }
/// Returns number [0., 1.] -- fraction of free space on disk
/// which should be kept with help of background moves
double getMoveFactor() const { return move_factor; }
/// Get volume by index from storage_policy
VolumePtr getVolume(size_t i) const { return (i < volumes_names.size() ? volumes[i] : VolumePtr()); }
VolumePtr getVolumeByName(const String & volume_name) const
{
auto it = volumes_names.find(volume_name);
if (it == volumes_names.end())
return {};
return getVolume(it->second);
}
private:
Volumes volumes;
const String name;
std::map<String, size_t> volumes_names;
/// move_factor from interval [0., 1.]
/// We move something if disk from this policy
/// filled more than total_size * move_factor
double move_factor = 0.1; /// by default move factor is 10%
};
using StoragePolicyPtr = std::shared_ptr<const StoragePolicy>;
/// Parse .xml configuration and store information about policies
/// Mostly used for introspection.
class StoragePolicySelector
{
public:
StoragePolicySelector(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const DiskSelector & disks);
/// Policy by name
const StoragePolicyPtr & operator[](const String & name) const;
/// All policies
const std::map<String, StoragePolicyPtr> & getPoliciesMap() const { return policies; }
private:
std::map<String, StoragePolicyPtr> policies;
};
}

193
dbms/src/Disks/IDisk.h Normal file
View File

@ -0,0 +1,193 @@
#pragma once
#include <Common/CurrentMetrics.h>
#include <Common/Exception.h>
#include <Core/Types.h>
#include <memory>
#include <utility>
#include <boost/noncopyable.hpp>
namespace CurrentMetrics
{
extern const Metric DiskSpaceReservedForMerge;
}
namespace DB
{
class IDiskFile;
using DiskFilePtr = std::shared_ptr<IDiskFile>;
class DiskDirectoryIterator;
class IDiskDirectoryIteratorImpl;
using DiskDirectoryIteratorImplPtr = std::unique_ptr<IDiskDirectoryIteratorImpl>;
class IReservation;
using ReservationPtr = std::unique_ptr<IReservation>;
class ReadBuffer;
class WriteBuffer;
/**
* Provide interface for reservation.
*/
class Space : public std::enable_shared_from_this<Space>
{
public:
virtual const String & getName() const = 0;
virtual ReservationPtr reserve(UInt64 bytes) const = 0;
virtual ~Space() = default;
};
using SpacePtr = std::shared_ptr<const Space>;
class IDisk : public Space
{
public:
virtual const String & getPath() const = 0;
/// Total available space on disk.
virtual UInt64 getTotalSpace() const = 0;
/// Space currently available on disk.
virtual UInt64 getAvailableSpace() const = 0;
/// Currently available (prev method) minus already reserved space.
virtual UInt64 getUnreservedSpace() const = 0;
/// Amount of bytes which should be kept free on this disk.
virtual UInt64 getKeepingFreeSpace() const { return 0; }
virtual DiskFilePtr file(const String & path) const = 0;
};
using DiskPtr = std::shared_ptr<const IDisk>;
using Disks = std::vector<DiskPtr>;
class IDiskFile : public std::enable_shared_from_this<IDiskFile>
{
public:
friend class DiskDirectoryIterator;
const DiskPtr & disk() const { return disk_ptr; }
const String & path() const { return rel_path; }
String fullPath() const { return disk_ptr->getPath() + rel_path; }
virtual bool exists() const = 0;
virtual bool isDirectory() const = 0;
virtual void createDirectory() = 0;
virtual void createDirectories() = 0;
virtual void moveTo(const String & new_path) = 0;
virtual void copyTo(const String & new_path) = 0;
virtual std::unique_ptr<ReadBuffer> read() const = 0;
virtual std::unique_ptr<WriteBuffer> write() = 0;
virtual ~IDiskFile() = default;
protected:
IDiskFile(const DiskPtr & disk_ptr_, const String & rel_path_) : disk_ptr(disk_ptr_), rel_path(rel_path_) {}
private:
virtual DiskDirectoryIteratorImplPtr iterateDirectory() = 0;
protected:
DiskPtr disk_ptr;
String rel_path;
};
class IDiskDirectoryIteratorImpl
{
public:
virtual const String & name() const = 0;
virtual const DiskFilePtr & get() const = 0;
virtual void next() = 0;
virtual bool isValid() const = 0;
virtual ~IDiskDirectoryIteratorImpl() = default;
};
class DiskDirectoryIterator final
{
public:
DiskDirectoryIterator() = default;
explicit DiskDirectoryIterator(const DiskFilePtr & file) : impl(file->iterateDirectory()) {}
String name() const { return impl->name(); }
DiskDirectoryIterator & operator++()
{
impl->next();
return *this;
}
const DiskFilePtr & operator*() const { return impl->get(); }
const DiskFilePtr & operator->() const { return impl->get(); }
bool operator==(const DiskDirectoryIterator & iterator) const
{
if (this == &iterator)
return true;
if (iterator.impl && iterator.impl->isValid())
return false;
if (impl && impl->isValid())
return false;
return true;
}
bool operator!=(const DiskDirectoryIterator & iterator) const { return !operator==(iterator); }
private:
DiskDirectoryIteratorImplPtr impl;
};
/**
* Information about reserved size on concrete disk.
*/
class IReservation
{
public:
/// Get reservation size.
UInt64 getSize() const { return size; }
/// Get disk where reservation take place.
const DiskPtr & getDisk() const { return disk_ptr; }
/// Changes amount of reserved space.
virtual void update(UInt64 new_size) = 0;
/// Unreserves reserved space.
virtual ~IReservation() = default;
protected:
explicit IReservation(const DiskPtr & disk_ptr_, UInt64 size_)
: disk_ptr(disk_ptr_), size(size_), metric_increment(CurrentMetrics::DiskSpaceReservedForMerge, size_)
{
}
protected:
DiskPtr disk_ptr;
UInt64 size;
CurrentMetrics::Increment metric_increment;
};
}

View File

@ -0,0 +1,15 @@
#include "DiskFactory.h"
namespace DB
{
void registerDiskLocal(DiskFactory & factory);
void registerDisks()
{
auto & factory = DiskFactory::instance();
registerDiskLocal(factory);
}
}

View File

@ -0,0 +1,8 @@
#pragma once
namespace DB
{
void registerDisks();
}

View File

@ -146,9 +146,9 @@ struct ContextShared
/// Rules for selecting the compression settings, depending on the size of the part.
mutable std::unique_ptr<CompressionCodecSelector> compression_codec_selector;
/// Storage disk chooser
mutable std::unique_ptr<DiskSpace::DiskSelector> merge_tree_disk_selector;
mutable std::unique_ptr<DiskSelector> merge_tree_disk_selector;
/// Storage policy chooser
mutable std::unique_ptr<DiskSpace::StoragePolicySelector> merge_tree_storage_policy_selector;
mutable std::unique_ptr<StoragePolicySelector> merge_tree_storage_policy_selector;
std::optional<MergeTreeSettings> merge_tree_settings; /// Settings of MergeTree* engines.
std::atomic_size_t max_table_size_to_drop = 50000000000lu; /// Protects MergeTree tables from accidental DROP (50GB by default)
@ -1782,7 +1782,7 @@ CompressionCodecPtr Context::chooseCompressionCodec(size_t part_size, double par
}
const DiskSpace::DiskPtr & Context::getDisk(const String & name) const
const DiskPtr & Context::getDisk(const String & name) const
{
auto lock = getLock();
@ -1792,7 +1792,7 @@ const DiskSpace::DiskPtr & Context::getDisk(const String & name) const
}
DiskSpace::DiskSelector & Context::getDiskSelector() const
DiskSelector & Context::getDiskSelector() const
{
auto lock = getLock();
@ -1801,13 +1801,13 @@ DiskSpace::DiskSelector & Context::getDiskSelector() const
constexpr auto config_name = "storage_configuration.disks";
auto & config = getConfigRef();
shared->merge_tree_disk_selector = std::make_unique<DiskSpace::DiskSelector>(config, config_name, getPath());
shared->merge_tree_disk_selector = std::make_unique<DiskSelector>(config, config_name, *this);
}
return *shared->merge_tree_disk_selector;
}
const DiskSpace::StoragePolicyPtr & Context::getStoragePolicy(const String & name) const
const StoragePolicyPtr & Context::getStoragePolicy(const String & name) const
{
auto lock = getLock();
@ -1817,7 +1817,7 @@ const DiskSpace::StoragePolicyPtr & Context::getStoragePolicy(const String & nam
}
DiskSpace::StoragePolicySelector & Context::getStoragePolicySelector() const
StoragePolicySelector & Context::getStoragePolicySelector() const
{
auto lock = getLock();
@ -1826,7 +1826,7 @@ DiskSpace::StoragePolicySelector & Context::getStoragePolicySelector() const
constexpr auto config_name = "storage_configuration.policies";
auto & config = getConfigRef();
shared->merge_tree_storage_policy_selector = std::make_unique<DiskSpace::StoragePolicySelector>(config, config_name, getDiskSelector());
shared->merge_tree_storage_policy_selector = std::make_unique<StoragePolicySelector>(config, config_name, getDiskSelector());
}
return *shared->merge_tree_storage_policy_selector;
}

View File

@ -13,7 +13,7 @@
#include <Common/ThreadPool.h>
#include "config_core.h"
#include <Storages/IStorage_fwd.h>
#include <Common/DiskSpaceMonitor.h>
#include <Disks/DiskSpaceMonitor.h>
#include <atomic>
#include <chrono>
#include <condition_variable>
@ -495,15 +495,16 @@ public:
/// Lets you select the compression codec according to the conditions described in the configuration file.
std::shared_ptr<ICompressionCodec> chooseCompressionCodec(size_t part_size, double part_size_ratio) const;
DiskSpace::DiskSelector & getDiskSelector() const;
DiskSelector & getDiskSelector() const;
/// Provides storage disks
const DiskSpace::DiskPtr & getDisk(const String & name) const;
const DiskPtr & getDisk(const String & name) const;
const DiskPtr & getDefaultDisk() const { return getDisk("default"); }
DiskSpace::StoragePolicySelector & getStoragePolicySelector() const;
StoragePolicySelector & getStoragePolicySelector() const;
/// Provides storage politics schemes
const DiskSpace::StoragePolicyPtr & getStoragePolicy(const String &name) const;
const StoragePolicyPtr & getStoragePolicy(const String &name) const;
/// Get the server uptime in seconds.
time_t getUptimeSeconds() const;

View File

@ -426,7 +426,7 @@ public:
virtual Names getSortingKeyColumns() const { return {}; }
/// Returns storage policy if storage supports it
virtual DiskSpace::StoragePolicyPtr getStoragePolicy() const { return {}; }
virtual StoragePolicyPtr getStoragePolicy() const { return {}; }
/** If it is possible to quickly determine exact number of rows in the table at this moment of time, then return it.
*/

View File

@ -226,7 +226,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
auto server_protocol_version = in.getResponseCookie("server_protocol_version", REPLICATION_PROTOCOL_VERSION_WITHOUT_PARTS_SIZE);
DiskSpace::ReservationPtr reservation;
ReservationPtr reservation;
if (server_protocol_version == REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE)
{
size_t sum_files_size;
@ -247,7 +247,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPart(
const String & replica_path,
bool to_detached,
const String & tmp_prefix_,
const DiskSpace::ReservationPtr reservation,
const ReservationPtr reservation,
PooledReadWriteBufferFromHTTP & in)
{

View File

@ -70,7 +70,7 @@ private:
const String & replica_path,
bool to_detached,
const String & tmp_prefix_,
const DiskSpace::ReservationPtr reservation,
const ReservationPtr reservation,
PooledReadWriteBufferFromHTTP & in);
MergeTreeData & data;

View File

@ -756,7 +756,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
LOG_DEBUG(log, "Loading data parts");
const auto settings = getSettings();
std::vector<std::pair<String, DiskSpace::DiskPtr>> part_names_with_disks;
std::vector<std::pair<String, DiskPtr>> part_names_with_disks;
Strings part_file_names;
Poco::DirectoryIterator end;
@ -1178,32 +1178,25 @@ void MergeTreeData::rename(
const String & /*new_path_to_db*/, const String & new_database_name,
const String & new_table_name, TableStructureWriteLockHolder &)
{
auto old_file_db_name = escapeForFileName(database_name);
auto new_file_db_name = escapeForFileName(new_database_name);
auto old_file_table_name = escapeForFileName(table_name);
auto new_file_table_name = escapeForFileName(new_table_name);
auto old_table_path = "data/" + escapeForFileName(database_name) + '/' + escapeForFileName(table_name) + '/';
auto new_db_path = "data/" + escapeForFileName(new_database_name) + '/';
auto new_table_path = new_db_path + escapeForFileName(new_table_name) + '/';
auto disks = storage_policy->getDisks();
for (const auto & disk : disks)
{
auto new_full_path = disk->getClickHouseDataPath() + new_file_db_name + '/' + new_file_table_name + '/';
auto new_table_file = disk->file(new_table_path);
if (Poco::File{new_full_path}.exists())
throw Exception{"Target path already exists: " + new_full_path, ErrorCodes::DIRECTORY_ALREADY_EXISTS};
if (new_table_file->exists())
throw Exception{"Target path already exists: " + new_table_file->fullPath(), ErrorCodes::DIRECTORY_ALREADY_EXISTS};
}
for (const auto & disk : disks)
{
auto full_path = disk->getClickHouseDataPath() + old_file_db_name + '/' + old_file_table_name + '/';
auto new_db_path = disk->getClickHouseDataPath() + new_file_db_name + '/';
disk->file(new_db_path)->createDirectory();
Poco::File db_file{new_db_path};
if (!db_file.exists())
db_file.createDirectory();
auto new_full_path = new_db_path + new_file_table_name + '/';
Poco::File{full_path}.renameTo(new_full_path);
disk->file(old_table_path)->moveTo(new_table_path);
}
global_context.dropCaches();
@ -2595,7 +2588,7 @@ MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_na
}
MergeTreeData::MutableDataPartPtr MergeTreeData::loadPartAndFixMetadata(const DiskSpace::DiskPtr & disk, const String & relative_path)
MergeTreeData::MutableDataPartPtr MergeTreeData::loadPartAndFixMetadata(const DiskPtr & disk, const String & relative_path)
{
MutableDataPartPtr part = std::make_shared<DataPart>(*this, disk, Poco::Path(relative_path).getFileName());
part->relative_path = relative_path;
@ -2753,7 +2746,7 @@ void MergeTreeData::movePartitionToDisk(const ASTPtr & partition, const String &
throw Exception(no_parts_to_move_message, ErrorCodes::UNKNOWN_DISK);
}
if (!movePartsToSpace(parts, std::static_pointer_cast<const DiskSpace::Space>(disk)))
if (!movePartsToSpace(parts, std::static_pointer_cast<const Space>(disk)))
throw Exception("Cannot move parts because moves are manually disabled.", ErrorCodes::ABORTED);
}
@ -2805,7 +2798,7 @@ void MergeTreeData::movePartitionToVolume(const ASTPtr & partition, const String
throw Exception(no_parts_to_move_message, ErrorCodes::UNKNOWN_DISK);
}
if (!movePartsToSpace(parts, std::static_pointer_cast<const DiskSpace::Space>(volume)))
if (!movePartsToSpace(parts, std::static_pointer_cast<const Space>(volume)))
throw Exception("Cannot move parts because moves are manually disabled.", ErrorCodes::ABORTED);
}
@ -3003,7 +2996,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
{
String source_dir = "detached/";
std::map<String, DiskSpace::DiskPtr> name_to_disk;
std::map<String, DiskPtr> name_to_disk;
/// Let's compose a list of parts that should be added.
if (attach_part)
{
@ -3020,7 +3013,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
ActiveDataPartSet active_parts(format_version);
const auto disks = storage_policy->getDisks();
for (const DiskSpace::DiskPtr & disk : disks)
for (const DiskPtr & disk : disks)
{
const auto full_path = getFullPathOnDisk(disk);
for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it)
@ -3076,7 +3069,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
return loaded_parts;
}
DiskSpace::ReservationPtr MergeTreeData::reserveSpace(UInt64 expected_size)
ReservationPtr MergeTreeData::reserveSpace(UInt64 expected_size)
{
constexpr UInt64 RESERVATION_MIN_ESTIMATION_SIZE = 1u * 1024u * 1024u; /// 1MB
@ -3296,16 +3289,16 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPartOnSameDisk(
return dst_data_part;
}
String MergeTreeData::getFullPathOnDisk(const DiskSpace::DiskPtr & disk) const
String MergeTreeData::getFullPathOnDisk(const DiskPtr & disk) const
{
return disk->getClickHouseDataPath() + escapeForFileName(database_name) + '/' + escapeForFileName(table_name) + '/';
return disk->getPath() + "data/" + escapeForFileName(database_name) + '/' + escapeForFileName(table_name) + '/';
}
DiskSpace::DiskPtr MergeTreeData::getDiskForPart(const String & part_name, const String & relative_path) const
DiskPtr MergeTreeData::getDiskForPart(const String & part_name, const String & relative_path) const
{
const auto disks = storage_policy->getDisks();
for (const DiskSpace::DiskPtr & disk : disks)
for (const DiskPtr & disk : disks)
{
const auto disk_path = getFullPathOnDisk(disk);
for (Poco::DirectoryIterator it = Poco::DirectoryIterator(disk_path + relative_path); it != Poco::DirectoryIterator(); ++it)
@ -3486,7 +3479,7 @@ bool MergeTreeData::selectPartsAndMove()
return moveParts(std::move(moving_tagger));
}
bool MergeTreeData::movePartsToSpace(const DataPartsVector & parts, DiskSpace::SpacePtr space)
bool MergeTreeData::movePartsToSpace(const DataPartsVector & parts, SpacePtr space)
{
if (parts_mover.moves_blocker.isCancelled())
return false;
@ -3524,7 +3517,7 @@ MergeTreeData::CurrentlyMovingPartsTagger MergeTreeData::selectPartsForMove()
return CurrentlyMovingPartsTagger(std::move(parts_to_move), *this);
}
MergeTreeData::CurrentlyMovingPartsTagger MergeTreeData::checkPartsForMove(const DataPartsVector & parts, DiskSpace::SpacePtr space)
MergeTreeData::CurrentlyMovingPartsTagger MergeTreeData::checkPartsForMove(const DataPartsVector & parts, SpacePtr space)
{
std::lock_guard moving_lock(moving_parts_mutex);

View File

@ -19,7 +19,7 @@
#include <Storages/IndicesDescription.h>
#include <Storages/MergeTree/MergeTreePartsMover.h>
#include <Interpreters/PartLog.h>
#include <Common/DiskSpaceMonitor.h>
#include <Disks/DiskSpaceMonitor.h>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
@ -360,7 +360,7 @@ public:
Names getColumnsRequiredForFinal() const override { return sorting_key_expr->getRequiredColumns(); }
Names getSortingKeyColumns() const override { return sorting_key_columns; }
DiskSpace::StoragePolicyPtr getStoragePolicy() const override { return storage_policy; }
StoragePolicyPtr getStoragePolicy() const override { return storage_policy; }
bool supportsPrewhere() const override { return true; }
bool supportsSampling() const override { return sample_by_ast != nullptr; }
@ -590,7 +590,7 @@ public:
bool hasAnyColumnTTL() const { return !ttl_entries_by_name.empty(); }
/// Check that the part is not broken and calculate the checksums for it if they are not present.
MutableDataPartPtr loadPartAndFixMetadata(const DiskSpace::DiskPtr & disk, const String & relative_path);
MutableDataPartPtr loadPartAndFixMetadata(const DiskPtr & disk, const String & relative_path);
void loadPartAndFixMetadata(MutableDataPartPtr part);
/** Create local backup (snapshot) for parts with specified prefix.
@ -658,27 +658,27 @@ public:
}
/// Get table path on disk
String getFullPathOnDisk(const DiskSpace::DiskPtr & disk) const;
String getFullPathOnDisk(const DiskPtr & disk) const;
/// Get disk for part. Looping through directories on FS because some parts maybe not in
/// active dataparts set (detached)
DiskSpace::DiskPtr getDiskForPart(const String & part_name, const String & relative_path = "") const;
DiskPtr getDiskForPart(const String & part_name, const String & relative_path = "") const;
/// Get full path for part. Uses getDiskForPart and returns the full path
String getFullPathForPart(const String & part_name, const String & relative_path = "") const;
Strings getDataPaths() const override;
using PathWithDisk = std::pair<String, DiskSpace::DiskPtr>;
using PathWithDisk = std::pair<String, DiskPtr>;
using PathsWithDisks = std::vector<PathWithDisk>;
PathsWithDisks getDataPathsWithDisks() const;
/// Reserves space at least 1MB
DiskSpace::ReservationPtr reserveSpace(UInt64 expected_size);
ReservationPtr reserveSpace(UInt64 expected_size);
/// Choose disk with max available free space
/// Reserves 0 bytes
DiskSpace::ReservationPtr makeEmptyReservationOnLargestDisk() { return storage_policy->makeEmptyReservationOnLargestDisk(); }
ReservationPtr makeEmptyReservationOnLargestDisk() { return storage_policy->makeEmptyReservationOnLargestDisk(); }
MergeTreeDataFormatVersion format_version;
@ -781,7 +781,7 @@ protected:
/// Use get and set to receive readonly versions.
MultiVersion<MergeTreeSettings> storage_settings;
DiskSpace::StoragePolicyPtr storage_policy;
StoragePolicyPtr storage_policy;
/// Work with data parts
@ -934,7 +934,7 @@ protected:
virtual bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const = 0;
/// Moves part to specified space, used in ALTER ... MOVE ... queries
bool movePartsToSpace(const DataPartsVector & parts, DiskSpace::SpacePtr space);
bool movePartsToSpace(const DataPartsVector & parts, SpacePtr space);
/// Selects parts for move and moves them, used in background process
bool selectPartsAndMove();
@ -960,7 +960,7 @@ private:
CurrentlyMovingPartsTagger selectPartsForMove();
/// Check selected parts for movements. Used by ALTER ... MOVE queries.
CurrentlyMovingPartsTagger checkPartsForMove(const DataPartsVector & parts, DiskSpace::SpacePtr space);
CurrentlyMovingPartsTagger checkPartsForMove(const DataPartsVector & parts, SpacePtr space);
};
}

View File

@ -3,7 +3,7 @@
#include <Storages/MergeTree/MergeTreeSequentialBlockInputStream.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/MergedColumnOnlyOutputStream.h>
#include <Common/DiskSpaceMonitor.h>
#include <Disks/DiskSpaceMonitor.h>
#include <Storages/MergeTree/SimpleMergeSelector.h>
#include <Storages/MergeTree/AllMergeSelector.h>
#include <Storages/MergeTree/TTLMergeSelector.h>
@ -533,7 +533,7 @@ public:
/// parts should be sorted.
MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart(
const FutureMergedMutatedPart & future_part, MergeList::Entry & merge_entry, TableStructureReadLockHolder &,
time_t time_of_merge, DiskSpace::Reservation * space_reservation, bool deduplicate, bool force_ttl)
time_t time_of_merge, const ReservationPtr & space_reservation, bool deduplicate, bool force_ttl)
{
static const String TMP_PREFIX = "tmp_merge_";
@ -907,7 +907,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
const std::vector<MutationCommand> & commands,
MergeListEntry & merge_entry,
const Context & context,
DiskSpace::Reservation * space_reservation,
const ReservationPtr & space_reservation,
TableStructureReadLockHolder & table_lock_holder)
{
auto check_not_cancelled = [&]()

View File

@ -97,14 +97,14 @@ public:
MergeTreeData::MutableDataPartPtr mergePartsToTemporaryPart(
const FutureMergedMutatedPart & future_part,
MergeListEntry & merge_entry, TableStructureReadLockHolder & table_lock_holder, time_t time_of_merge,
DiskSpace::Reservation * disk_reservation, bool deduplication, bool force_ttl);
const ReservationPtr & disk_reservation, bool deduplication, bool force_ttl);
/// Mutate a single data part with the specified commands. Will create and return a temporary part.
MergeTreeData::MutableDataPartPtr mutatePartToTemporaryPart(
const FutureMergedMutatedPart & future_part,
const std::vector<MutationCommand> & commands,
MergeListEntry & merge_entry, const Context & context,
DiskSpace::Reservation * disk_reservation,
const ReservationPtr & disk_reservation,
TableStructureReadLockHolder & table_lock_holder);
MergeTreeData::DataPartPtr renameMergedTemporaryPart(

View File

@ -145,7 +145,7 @@ void MergeTreeDataPart::MinMaxIndex::merge(const MinMaxIndex & other)
}
MergeTreeDataPart::MergeTreeDataPart(MergeTreeData & storage_, const DiskSpace::DiskPtr & disk_, const String & name_)
MergeTreeDataPart::MergeTreeDataPart(MergeTreeData & storage_, const DiskPtr & disk_, const String & name_)
: storage(storage_)
, disk(disk_)
, name(name_)
@ -156,7 +156,7 @@ MergeTreeDataPart::MergeTreeDataPart(MergeTreeData & storage_, const DiskSpace::
MergeTreeDataPart::MergeTreeDataPart(
const MergeTreeData & storage_,
const DiskSpace::DiskPtr & disk_,
const DiskPtr & disk_,
const String & name_,
const MergeTreePartInfo & info_)
: storage(storage_)
@ -545,7 +545,7 @@ void MergeTreeDataPart::makeCloneInDetached(const String & prefix) const
localBackup(src, dst, 0);
}
void MergeTreeDataPart::makeCloneOnDiskDetached(const DiskSpace::ReservationPtr & reservation) const
void MergeTreeDataPart::makeCloneOnDiskDetached(const ReservationPtr & reservation) const
{
auto & reserved_disk = reservation->getDisk();
if (reserved_disk->getName() == disk->getName())

View File

@ -31,9 +31,9 @@ struct MergeTreeDataPart
using Checksums = MergeTreeDataPartChecksums;
using Checksum = MergeTreeDataPartChecksums::Checksum;
MergeTreeDataPart(const MergeTreeData & storage_, const DiskSpace::DiskPtr & disk_, const String & name_, const MergeTreePartInfo & info_);
MergeTreeDataPart(const MergeTreeData & storage_, const DiskPtr & disk_, const String & name_, const MergeTreePartInfo & info_);
MergeTreeDataPart(MergeTreeData & storage_, const DiskSpace::DiskPtr & disk_, const String & name_);
MergeTreeDataPart(MergeTreeData & storage_, const DiskPtr & disk_, const String & name_);
/// Returns the name of a column with minimum compressed size (as returned by getColumnSize()).
/// If no checksums are present returns the name of the first physically existing column.
@ -72,7 +72,7 @@ struct MergeTreeDataPart
const MergeTreeData & storage;
DiskSpace::DiskPtr disk;
DiskPtr disk;
String name;
MergeTreePartInfo info;
@ -259,7 +259,7 @@ struct MergeTreeDataPart
void makeCloneInDetached(const String & prefix) const;
/// Makes full clone of part in detached/ on another disk
void makeCloneOnDiskDetached(const DiskSpace::ReservationPtr & reservation) const;
void makeCloneOnDiskDetached(const ReservationPtr & reservation) const;
/// Populates columns_to_size map (compressed size).
void accumulateColumnSizes(ColumnToSize & column_to_size) const;

View File

@ -81,7 +81,7 @@ bool MergeTreePartsMover::selectPartsForMove(
if (data_parts.empty())
return false;
std::unordered_map<DiskSpace::DiskPtr, LargestPartsWithRequiredSize> need_to_move;
std::unordered_map<DiskPtr, LargestPartsWithRequiredSize> need_to_move;
const auto & policy = data->getStoragePolicy();
const auto & volumes = policy->getVolumes();

View File

@ -1,11 +1,11 @@
#pragma once
#include <functional>
#include <vector>
#include <optional>
#include <vector>
#include <Disks/DiskSpaceMonitor.h>
#include <Storages/MergeTree/MergeTreeDataPart.h>
#include <Common/ActionBlocker.h>
#include <Common/DiskSpaceMonitor.h>
namespace DB
{
@ -16,9 +16,9 @@ namespace DB
struct MergeTreeMoveEntry
{
std::shared_ptr<const MergeTreeDataPart> part;
DiskSpace::ReservationPtr reserved_space;
ReservationPtr reserved_space;
MergeTreeMoveEntry(const std::shared_ptr<const MergeTreeDataPart> & part_, DiskSpace::ReservationPtr reservation_)
MergeTreeMoveEntry(const std::shared_ptr<const MergeTreeDataPart> & part_, ReservationPtr reservation_)
: part(part_), reserved_space(std::move(reservation_))
{
}

View File

@ -18,7 +18,7 @@
#include <Storages/AlterCommands.h>
#include <Storages/PartitionCommands.h>
#include <Storages/MergeTree/MergeTreeBlockOutputStream.h>
#include <Common/DiskSpaceMonitor.h>
#include <Disks/DiskSpaceMonitor.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/checkDataPart.h>
#include <Poco/DirectoryIterator.h>
@ -334,7 +334,7 @@ void StorageMergeTree::alter(
struct CurrentlyMergingPartsTagger
{
FutureMergedMutatedPart future_part;
DiskSpace::ReservationPtr reserved_space;
ReservationPtr reserved_space;
bool is_successful = false;
String exception_message;
@ -613,7 +613,7 @@ bool StorageMergeTree::merge(
new_part = merger_mutator.mergePartsToTemporaryPart(
future_part, *merge_entry, table_lock_holder, time(nullptr),
merging_tagger->reserved_space.get(), deduplicate, force_ttl);
merging_tagger->reserved_space, deduplicate, force_ttl);
merger_mutator.renameMergedTemporaryPart(new_part, future_part.parts, nullptr);
removeEmptyColumnsFromPart(new_part);
@ -734,7 +734,7 @@ bool StorageMergeTree::tryMutatePart()
try
{
new_part = merger_mutator.mutatePartToTemporaryPart(future_part, commands, *merge_entry, global_context,
tagger->reserved_space.get(), table_lock_holder);
tagger->reserved_space, table_lock_holder);
renameTempPartAndReplace(new_part);
tagger->is_successful = true;

View File

@ -12,7 +12,7 @@
#include <Storages/MergeTree/MergeTreePartsMover.h>
#include <Storages/MergeTree/MergeTreeMutationEntry.h>
#include <Storages/MergeTree/MergeTreeMutationStatus.h>
#include <Common/DiskSpaceMonitor.h>
#include <Disks/DiskSpaceMonitor.h>
#include <Storages/MergeTree/BackgroundProcessingPool.h>
#include <Common/SimpleIncrement.h>
#include <Core/BackgroundSchedulePool.h>

View File

@ -1,14 +1,14 @@
#include <Common/ZooKeeper/Types.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Disks/DiskSpaceMonitor.h>
#include <Common/FieldVisitors.h>
#include <Common/Macros.h>
#include <Common/formatReadable.h>
#include <Common/escapeForFileName.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/typeid_cast.h>
#include <Common/thread_local_rng.h>
#include <Common/ThreadPool.h>
#include <Common/DiskSpaceMonitor.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/Types.h>
#include <Common/escapeForFileName.h>
#include <Common/formatReadable.h>
#include <Common/thread_local_rng.h>
#include <Common/typeid_cast.h>
#include <Storages/AlterCommands.h>
#include <Storages/PartitionCommands.h>
@ -1006,7 +1006,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
size_t estimated_space_for_merge = MergeTreeDataMergerMutator::estimateNeededDiskSpace(parts);
/// Can throw an exception.
DiskSpace::ReservationPtr reserved_space = reserveSpace(estimated_space_for_merge);
ReservationPtr reserved_space = reserveSpace(estimated_space_for_merge);
auto table_lock = lockStructureForShare(false, RWLockImpl::NO_QUERY);
@ -1034,7 +1034,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
try
{
part = merger_mutator.mergePartsToTemporaryPart(
future_merged_part, *merge_entry, table_lock, entry.create_time, reserved_space.get(), entry.deduplicate, entry.force_ttl);
future_merged_part, *merge_entry, table_lock, entry.create_time, reserved_space, entry.deduplicate, entry.force_ttl);
merger_mutator.renameMergedTemporaryPart(part, parts, &transaction);
removeEmptyColumnsFromPart(part);
@ -1140,7 +1140,7 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
/// Can throw an exception.
/// Once we mutate part, we must reserve space on the same disk, because mutations can possibly create hardlinks.
DiskSpace::ReservationPtr reserved_space = source_part->disk->reserve(estimated_space_for_result);
ReservationPtr reserved_space = source_part->disk->reserve(estimated_space_for_result);
if (!reserved_space)
{
throw Exception("Cannot reserve " + formatReadableSizeWithBinarySuffix(estimated_space_for_result) + ", not enough space",
@ -1171,7 +1171,7 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
try
{
new_part = merger_mutator.mutatePartToTemporaryPart(future_mutated_part, commands, *merge_entry, global_context, reserved_space.get(), table_lock);
new_part = merger_mutator.mutatePartToTemporaryPart(future_mutated_part, commands, *merge_entry, global_context, reserved_space, table_lock);
renameTempPartAndReplace(new_part, nullptr, &transaction);
try