From e391ce813d53bc65cee18d86d4af2dd73cc91f74 Mon Sep 17 00:00:00 2001 From: alesapin Date: Wed, 14 Aug 2019 18:20:52 +0300 Subject: [PATCH] refactoring --- .../MergeTree => Common}/DiskSpaceMonitor.cpp | 216 +++++++- dbms/src/Common/DiskSpaceMonitor.h | 371 +++++++++++++ dbms/src/Common/Exception.cpp | 8 +- dbms/src/Interpreters/Context.h | 2 +- .../src/Storages/MergeTree/DiskSpaceMonitor.h | 503 ------------------ dbms/src/Storages/MergeTree/MergeTreeData.cpp | 9 +- dbms/src/Storages/MergeTree/MergeTreeData.h | 2 +- .../MergeTree/MergeTreeDataMergerMutator.cpp | 7 +- .../MergeTree/MergeTreeDataMergerMutator.h | 2 +- dbms/src/Storages/StorageMergeTree.cpp | 2 +- dbms/src/Storages/StorageMergeTree.h | 2 +- .../Storages/System/StorageSystemDisks.cpp | 18 +- dbms/src/Storages/System/StorageSystemDisks.h | 12 +- 13 files changed, 606 insertions(+), 548 deletions(-) rename dbms/src/{Storages/MergeTree => Common}/DiskSpaceMonitor.cpp (64%) create mode 100644 dbms/src/Common/DiskSpaceMonitor.h delete mode 100644 dbms/src/Storages/MergeTree/DiskSpaceMonitor.h diff --git a/dbms/src/Storages/MergeTree/DiskSpaceMonitor.cpp b/dbms/src/Common/DiskSpaceMonitor.cpp similarity index 64% rename from dbms/src/Storages/MergeTree/DiskSpaceMonitor.cpp rename to dbms/src/Common/DiskSpaceMonitor.cpp index e7f89400264..c31c76a4ded 100644 --- a/dbms/src/Storages/MergeTree/DiskSpaceMonitor.cpp +++ b/dbms/src/Common/DiskSpaceMonitor.cpp @@ -1,4 +1,4 @@ -#include +#include #include @@ -11,14 +11,152 @@ namespace DB namespace DiskSpace { + std::mutex Disk::mutex; +std::filesystem::path getMountPoint(std::filesystem::path absolute_path) +{ + if (absolute_path.is_relative()) + throw Exception("Path is relative. It's a bug.", ErrorCodes::LOGICAL_ERROR); + + absolute_path = std::filesystem::canonical(absolute_path); + + const auto get_device_id = [](const std::filesystem::path & p) + { + struct stat st; + if (stat(p.c_str(), &st)) + throwFromErrnoWithPath("Cannot stat " + p.string(), p.string(), ErrorCodes::SYSTEM_ERROR); + return st.st_dev; + }; + + /// If /some/path/to/dir/ and /some/path/to/ have different device id, + /// then device which contains /some/path/to/dir/filename is mounted to /some/path/to/dir/ + auto device_id = get_device_id(absolute_path); + while (absolute_path.has_relative_path()) + { + auto parent = absolute_path.parent_path(); + auto parent_device_id = get_device_id(parent); + if (device_id != parent_device_id) + return absolute_path; + absolute_path = parent; + device_id = parent_device_id; + } + + return absolute_path; +} + + /// Returns name of filesystem mounted to mount_point +#if !defined(__linux__) +[[noreturn]] +#endif +std::string getFilesystemName([[maybe_unused]] const std::string & mount_point) +{ +#if defined(__linux__) + auto mounted_filesystems = setmntent("/etc/mtab", "r"); + if (!mounted_filesystems) + throw DB::Exception("Cannot open /etc/mtab to get name of filesystem", ErrorCodes::SYSTEM_ERROR); + mntent fs_info; + constexpr size_t buf_size = 4096; /// The same as buffer used for getmntent in glibc. It can happen that it's not enough + char buf[buf_size]; + while (getmntent_r(mounted_filesystems, &fs_info, buf, buf_size) && fs_info.mnt_dir != mount_point) + ; + endmntent(mounted_filesystems); + if (fs_info.mnt_dir != mount_point) + throw DB::Exception("Cannot find name of filesystem by mount point " + mount_point, ErrorCodes::SYSTEM_ERROR); + return fs_info.mnt_fsname; +#else + throw DB::Exception("Supported on linux only", ErrorCodes::NOT_IMPLEMENTED); +#endif +} + ReservationPtr Disk::reserve(UInt64 bytes) const { return std::make_unique(bytes, std::static_pointer_cast(shared_from_this())); } +bool Disk::tryReserve(UInt64 bytes) const +{ + auto available_space = getAvailableSpace(); + std::lock_guard lock(mutex); + if (bytes == 0) + { + LOG_DEBUG(&Logger::get("DiskSpaceMonitor"), "Reserve 0 bytes on disk " << name); + ++reservation_count; + return true; + } + available_space -= std::min(available_space, reserved_bytes); + LOG_DEBUG(&Logger::get("DiskSpaceMonitor"), "Unreserved " << available_space << " , to reserve " << bytes << " on disk " << name); + if (available_space >= bytes) + { + ++reservation_count; + reserved_bytes += bytes; + return true; + } + return false; +} + +UInt64 Disk::getUnreservedSpace() const +{ + auto available_space = getSpaceInformation().getAvailableSpace(); + std::lock_guard lock(mutex); + available_space -= std::min(available_space, reserved_bytes); + return available_space; +} + +UInt64 Disk::Stat::getTotalSpace() const +{ + UInt64 total_size = fs.f_blocks * fs.f_bsize; + if (total_size < keep_free_space_bytes) + return 0; + return total_size - keep_free_space_bytes; +} + +UInt64 Disk::Stat::getAvailableSpace() const +{ + UInt64 total_size = fs.f_bfree * fs.f_bsize; + if (total_size < keep_free_space_bytes) + return 0; + return total_size - keep_free_space_bytes; +} + +Reservation::~Reservation() +{ + try + { + std::lock_guard lock(Disk::mutex); + if (disk_ptr->reserved_bytes < size) + { + disk_ptr->reserved_bytes = 0; + LOG_ERROR(&Logger::get("DiskSpaceMonitor"), "Unbalanced reservations size; it's a bug"); + } + else + { + disk_ptr->reserved_bytes -= size; + } + + if (disk_ptr->reservation_count == 0) + { + LOG_ERROR(&Logger::get("DiskSpaceMonitor"), "Unbalanced reservation count; it's a bug"); + } + else + { + --disk_ptr->reservation_count; + } + } + catch (...) + { + tryLogCurrentException("~DiskSpaceMonitor"); + } +} + +void Reservation::update(UInt64 new_size) +{ + std::lock_guard lock(Disk::mutex); + disk_ptr->reserved_bytes -= size; + size = new_size; + disk_ptr->reserved_bytes += size; +} DiskSelector::DiskSelector(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, String default_path) { @@ -202,6 +340,24 @@ UInt64 Volume::getMaxUnreservedFreeSpace() const } +Volume & Volume::operator=(Volume && other) noexcept +{ + disks = std::move(other.disks); + max_data_part_size = other.max_data_part_size; + last_used.store(0, std::memory_order_relaxed); + name = std::move(other.name); + return *this; +} + +Volume & Volume::operator=(const Volume & other) +{ + disks = other.disks; + max_data_part_size = other.max_data_part_size; + last_used.store(0, std::memory_order_relaxed); + name = other.name; + return *this; +} + StoragePolicy::StoragePolicy(String name_, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const DiskSelector & disks) : name(std::move(name_)) { @@ -242,6 +398,23 @@ StoragePolicy::StoragePolicy(String name_, const Poco::Util::AbstractConfigurati } +StoragePolicy::StoragePolicy(String name_, Volumes volumes_, double move_factor_) + : volumes(std::move(volumes_)), + name(std::move(name_)), + move_factor(move_factor_) +{ + if (volumes.empty()) + throw Exception("StoragePolicy must contain at least one Volume", ErrorCodes::UNKNOWN_POLICY); + + for (size_t i = 0; i != volumes.size(); ++i) + { + if (volumes_names.find(volumes[i]->getName()) != volumes_names.end()) + throw Exception("Volumes names must be unique (" + volumes[i]->getName() + " duplicated)", ErrorCodes::UNKNOWN_POLICY); + volumes_names[volumes[i]->getName()] = i; + } +} + + Disks StoragePolicy::getDisks() const { Disks res; @@ -328,30 +501,51 @@ ReservationPtr StoragePolicy::reserveOnMaxDiskWithoutReservation() const } -StoragePolicySelector::StoragePolicySelector(const Poco::Util::AbstractConfiguration & config, const String& config_prefix, const DiskSelector & disks) +size_t StoragePolicy::getVolumePriorityByDisk(const DiskPtr & disk_ptr) const +{ + for (size_t i = 0; i != volumes.size(); ++i) + { + const auto & volume = volumes[i]; + for (auto && disk : volume->disks) + { + if (disk->getName() == disk_ptr->getName()) + return i; + } + } + throw Exception("No disk " + disk_ptr->getName() + " in Policy " + name, ErrorCodes::UNKNOWN_DISK); +} + + +StoragePolicySelector::StoragePolicySelector( + const Poco::Util::AbstractConfiguration & config, + const String & config_prefix, + const DiskSelector & disks) { Poco::Util::AbstractConfiguration::Keys keys; config.keys(config_prefix, keys); - Logger * logger = &Logger::get("StoragePolicySelector"); - for (const auto & name : keys) { if (!std::all_of(name.begin(), name.end(), isWordCharASCII)) - throw Exception("StoragePolicy name can contain only alphanumeric and '_' (" + name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); + throw Exception("StoragePolicy name can contain only alphanumeric and '_' (" + name + ")", + ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); + policies.emplace(name, std::make_shared(name, config, config_prefix + "." + name, disks)); - LOG_INFO(logger, "Storage policy " << name << " loaded"); + LOG_INFO(&Logger::get("StoragePolicySelector"), "Storage policy " << name << " loaded"); } constexpr auto default_storage_policy_name = "default"; constexpr auto default_volume_name = "default"; constexpr auto default_disk_name = "default"; if (policies.find(default_storage_policy_name) == policies.end()) - policies.emplace(default_storage_policy_name, - std::make_shared(default_storage_policy_name, - Volumes{std::make_shared(default_volume_name, std::vector{disks[default_disk_name]}, - std::numeric_limits::max())}, - 0.0)); + { + auto default_volume = std::make_shared( + default_volume_name, + std::vector{disks[default_disk_name]}, + std::numeric_limits::max()); + auto default_policy = std::make_shared(default_storage_policy_name, Volumes{default_volume}, 0.0); + policies.emplace(default_storage_policy_name, default_policy); + } } diff --git a/dbms/src/Common/DiskSpaceMonitor.h b/dbms/src/Common/DiskSpaceMonitor.h new file mode 100644 index 00000000000..40b0410205c --- /dev/null +++ b/dbms/src/Common/DiskSpaceMonitor.h @@ -0,0 +1,371 @@ +#pragma once + +#include +#include +#include +#include +#include +#if defined(__linux__) +#include +#include +#endif +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace CurrentMetrics +{ + extern const Metric DiskSpaceReservedForMerge; +} + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; + extern const int CANNOT_STATVFS; + extern const int NOT_ENOUGH_SPACE; + extern const int SYSTEM_ERROR; + extern const int UNKNOWN_ELEMENT_IN_CONFIG; + extern const int EXCESSIVE_ELEMENT_IN_CONFIG; + extern const int UNKNOWN_POLICY; + extern const int UNKNOWN_DISK; +} + +namespace DiskSpace +{ + + +class Reservation; +using ReservationPtr = std::unique_ptr; + +/// Returns mount point of filesystem where absoulte_path (must exist) is located +std::filesystem::path getMountPoint(std::filesystem::path absolute_path); + +/// Returns name of filesystem mounted to mount_point +#if !defined(__linux__) +[[noreturn]] +#endif +std::string getFilesystemName([[maybe_unused]] const std::string & mount_point); + +inline struct statvfs getStatVFS(const std::string & path) +{ + struct statvfs fs; + if (statvfs(path.c_str(), &fs) != 0) + throwFromErrnoWithPath( + "Could not calculate available disk space (statvfs)", path, ErrorCodes::CANNOT_STATVFS); + return fs; +} + +/** + * Provide interface for reservation + */ +class Space : public std::enable_shared_from_this +{ +public: + Space() = default; + + virtual ~Space() = default; + Space(const Space &) = default; + Space(Space &&) = default; + Space& operator=(const Space &) = default; + Space& operator=(Space &&) = default; + + virtual ReservationPtr reserve(UInt64 bytes) const = 0; + + virtual const String & getName() const = 0; +}; + +using SpacePtr = std::shared_ptr; + + +/** Disk - Smallest space unit. + * path - Path to space. Ends with / + * name - Unique key using for disk space reservation. + */ +class Disk : public Space +{ +public: + friend class Reservation; + + class Stat + { + struct statvfs fs = {}; + UInt64 keep_free_space_bytes; + + public: + explicit Stat(const Disk & disk) + { + if (statvfs(disk.path.c_str(), &fs) != 0) + throwFromErrno("Could not calculate available disk space (statvfs)", ErrorCodes::CANNOT_STATVFS); + keep_free_space_bytes = disk.keep_free_space_bytes; + } + + UInt64 getTotalSpace() const; + + UInt64 getAvailableSpace() const; + }; + + Disk(String name_, String path_, UInt64 keep_free_space_bytes_) + : name(std::move(name_)), + path(std::move(path_)), + keep_free_space_bytes(keep_free_space_bytes_) + { + if (name.back() != '/') + throw Exception("Disk path must ends with '/'", ErrorCodes::LOGICAL_ERROR); + } + + ~Disk() override = default; + + ReservationPtr reserve(UInt64 bytes) const override; + + bool tryReserve(UInt64 bytes) const; + + const String & getName() const override + { + return name; + } + + const String & getPath() const + { + return path; + } + + UInt64 getKeepingFreeSpace() const + { + return keep_free_space_bytes; + } + + auto getSpaceInformation() const + { + return Stat(*this); + } + + UInt64 getTotalSpace() const + { + return getSpaceInformation().getTotalSpace(); + } + + UInt64 getAvailableSpace() const + { + return getSpaceInformation().getAvailableSpace(); + } + + UInt64 getUnreservedSpace() const; + +private: + String name; + String path; + UInt64 keep_free_space_bytes; + + /// Real reservation data + static std::mutex mutex; + mutable UInt64 reserved_bytes = 0; + mutable UInt64 reservation_count = 0; +}; + +/// It is not possible to change disk runtime. +using DiskPtr = std::shared_ptr; +using Disks = std::vector; + + +/** + * Contain information about disk and size of reservation + * Unreserve on destroy. + */ +class Reservation final : private boost::noncopyable +{ +public: + Reservation(UInt64 size_, DiskPtr disk_ptr_) + : size(size_) + , metric_increment(CurrentMetrics::DiskSpaceReservedForMerge, size) + , disk_ptr(std::move(disk_ptr_)) + , valid(disk_ptr->tryReserve(size)) + { + } + + ~Reservation(); + + /// Change amount of reserved space. When new_size is greater than before, + /// availability of free space is not checked. + void update(UInt64 new_size); + + UInt64 getSize() const + { + return size; + } + + const DiskPtr & getDisk() const + { + return disk_ptr; + } + + bool isValid() const { return valid; } + +private: + UInt64 size; + CurrentMetrics::Increment metric_increment; + DiskPtr disk_ptr; + bool valid = false; +}; + + +class DiskSelector +{ +public: + DiskSelector(const Poco::Util::AbstractConfiguration & config, + const std::string & config_prefix, String default_path); + + const DiskPtr & operator[](const String & name) const; + + bool has(const String & name) const; + + void add(const DiskPtr & disk); + + const auto & getDisksMap() const { return disks; } + +private: + std::map disks; +}; + + +/** Volume. + * Contain set of "equivalent" disks + */ +class Volume : public Space +{ + friend class StoragePolicy; + +public: + Volume(String name_, std::vector disks_, UInt64 max_data_part_size_) + : max_data_part_size(max_data_part_size_) + , disks(std::move(disks_)) + , name(std::move(name_)) + { + } + + Volume(String name_, const Poco::Util::AbstractConfiguration & config, + const std::string & config_prefix, const DiskSelector & disk_selector); + + Volume(const Volume & other) + : Space(other) + , max_data_part_size(other.max_data_part_size) + , disks(other.disks) + , name(other.name) + { + } + + Volume(Volume && other) noexcept + : max_data_part_size(other.max_data_part_size) + , disks(std::move(other.disks)) + , name(std::move(other.name)) + { + } + + Volume & operator=(const Volume & other); + + Volume & operator=(Volume && other) noexcept; + + /// Returns valid reservation or null + ReservationPtr reserve(UInt64 bytes) const override; + + UInt64 getMaxUnreservedFreeSpace() const; + + const String & getName() const override { return name; } + + UInt64 max_data_part_size = std::numeric_limits::max(); + + Disks disks; + +private: + mutable std::atomic last_used = 0; + String name; +}; + +using VolumePtr = std::shared_ptr; +using Volumes = std::vector; + + +/** + * Contain ordered set of Volumes + */ +class StoragePolicy : public Space +{ +public: + + StoragePolicy(String name_, const Poco::Util::AbstractConfiguration & config, + const std::string & config_prefix, const DiskSelector & disks); + + StoragePolicy(String name_, Volumes volumes_, double move_factor_); + + /// Returns disks ordered by volumes priority + Disks getDisks() const; + + DiskPtr getAnyDisk() const; + + DiskPtr getDiskByName(const String & disk_name) const; + + UInt64 getMaxUnreservedFreeSpace() const; + + const String & getName() const override { return name; } + + /// Returns valid reservation or null + ReservationPtr reserve(UInt64 bytes) const override; + + /// Reserve space on any volume with priority > min_volume_index + ReservationPtr reserve(UInt64 bytes, size_t min_volume_index) const; + + size_t getVolumePriorityByDisk(const DiskPtr & disk_ptr) const; + + /// Reserves 0 bytes on disk with max available space + /// Do not use this function when it is possible to predict size!!! + ReservationPtr reserveOnMaxDiskWithoutReservation() const; + + const auto & getVolumes() const { return volumes; } + + auto getMoveFactor() const { return move_factor; } + + VolumePtr getVolume(size_t i) const { return (i < volumes_names.size() ? volumes[i] : VolumePtr()); } + + VolumePtr getVolumeByName(const String & volume_name) const + { + auto it = volumes_names.find(volume_name); + if (it == volumes_names.end()) + return {}; + return getVolume(it->second); + } + +private: + Volumes volumes; + String name; + std::map volumes_names; + double move_factor; +}; + +using StoragePolicyPtr = std::shared_ptr; + +class StoragePolicySelector +{ +public: + StoragePolicySelector(const Poco::Util::AbstractConfiguration & config, + const String & config_prefix, const DiskSelector & disks); + + const StoragePolicyPtr & operator[](const String & name) const; + + const auto & getPoliciesMap() const { return policies; } + +private: + std::map policies; +}; + +} + +} diff --git a/dbms/src/Common/Exception.cpp b/dbms/src/Common/Exception.cpp index 411d4678ed3..e49600a789e 100644 --- a/dbms/src/Common/Exception.cpp +++ b/dbms/src/Common/Exception.cpp @@ -10,7 +10,7 @@ #include #include #include -#include +#include #include namespace DB @@ -84,16 +84,16 @@ void getNoSpaceLeftInfoMessage(std::filesystem::path path, std::string & msg) while (!std::filesystem::exists(path) && path.has_relative_path()) path = path.parent_path(); - auto fs = DiskSpace::Disk::getStatVFS(path); + auto fs = DiskSpace::getStatVFS(path); msg += "\nTotal space: " + formatReadableSizeWithBinarySuffix(fs.f_blocks * fs.f_bsize) + "\nAvailable space: " + formatReadableSizeWithBinarySuffix(fs.f_bavail * fs.f_bsize) + "\nTotal inodes: " + formatReadableQuantity(fs.f_files) + "\nAvailable inodes: " + formatReadableQuantity(fs.f_favail); - auto mount_point = DiskSpace::Reservation::getMountPoint(path).string(); + auto mount_point = DiskSpace::getMountPoint(path).string(); msg += "\nMount point: " + mount_point; #if defined(__linux__) - msg += "\nFilesystem: " + DiskSpace::Reservation::getFilesystemName(mount_point); + msg += "\nFilesystem: " + DiskSpace::getFilesystemName(mount_point); #endif } diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index b6157b64ff4..e5db63467d2 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -12,7 +12,7 @@ #include #include "config_core.h" #include -#include +#include #include #include #include diff --git a/dbms/src/Storages/MergeTree/DiskSpaceMonitor.h b/dbms/src/Storages/MergeTree/DiskSpaceMonitor.h deleted file mode 100644 index 5aa4fed43f7..00000000000 --- a/dbms/src/Storages/MergeTree/DiskSpaceMonitor.h +++ /dev/null @@ -1,503 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#if defined(__linux__) -#include -#include -#endif -#include -#include -#include -#include -#include -#include -#include -#include -#include - - -namespace CurrentMetrics -{ - extern const Metric DiskSpaceReservedForMerge; -} - -namespace DB -{ - -namespace ErrorCodes -{ - extern const int CANNOT_STATVFS; - extern const int NOT_ENOUGH_SPACE; - extern const int SYSTEM_ERROR; - extern const int UNKNOWN_ELEMENT_IN_CONFIG; - extern const int EXCESSIVE_ELEMENT_IN_CONFIG; - extern const int UNKNOWN_POLICY; - extern const int UNKNOWN_DISK; -} - -namespace DiskSpace -{ - - -class Reservation; -using ReservationPtr = std::unique_ptr; - - -/** Space. - * Provide interface for reservation - */ -class Space : public std::enable_shared_from_this -{ -public: - Space() = default; - - virtual ~Space() = default; - Space(const Space &) = default; - Space(Space &&) = default; - Space& operator=(const Space &) = default; - Space& operator=(Space &&) = default; - - virtual ReservationPtr reserve(UInt64 bytes) const = 0; - - virtual const String & getName() const = 0; -}; - -using SpacePtr = std::shared_ptr; - - -/** Disk - Smallest space unit. - * path - Path to space. Ends with / - * name - Unique key using for disk space reservation. - */ -class Disk : public Space -{ -public: - friend class Reservation; - - class Stat - { - struct statvfs fs = {}; - UInt64 keep_free_space_bytes; - - public: - explicit Stat(const Disk & disk) - { - if (statvfs(disk.path.c_str(), &fs) != 0) - throwFromErrno("Could not calculate available disk space (statvfs)", ErrorCodes::CANNOT_STATVFS); - keep_free_space_bytes = disk.keep_free_space_bytes; - } - - UInt64 getTotalSpace() - { - UInt64 size = fs.f_blocks * fs.f_bsize; - if (size < keep_free_space_bytes) - return 0; - return size - keep_free_space_bytes; - } - - UInt64 getAvailableSpace() - { - UInt64 size = fs.f_bfree * fs.f_bsize; - if (size < keep_free_space_bytes) - return 0; - return size - keep_free_space_bytes; - } - }; - - Disk(String name_, String path_, UInt64 keep_free_space_bytes_) - : name(std::move(name_)), - path(std::move(path_)), - keep_free_space_bytes(keep_free_space_bytes_) - { - } - - ~Disk() override = default; - - ReservationPtr reserve(UInt64 bytes) const override; - - bool try_reserve(UInt64 bytes) const - { - auto available_space = getAvailableSpace(); - std::lock_guard lock(mutex); - if (bytes == 0) - { - LOG_DEBUG(&Logger::get("DiskSpaceMonitor"), "Reserve 0 bytes on disk " << name); - ++reservation_count; - return true; - } - available_space -= std::min(available_space, reserved_bytes); - LOG_DEBUG(&Logger::get("DiskSpaceMonitor"), "Unreserved " << available_space << " , to reserve " << bytes << " on disk " << name); - if (available_space >= bytes) - { - ++reservation_count; - reserved_bytes += bytes; - return true; - } - return false; - } - - const String & getName() const override - { - return name; - } - - const String & getPath() const - { - return path; - } - - inline static struct statvfs getStatVFS(const std::string & path) - { - struct statvfs fs; - if (statvfs(path.c_str(), &fs) != 0) - throwFromErrnoWithPath("Could not calculate available disk space (statvfs)", path, - ErrorCodes::CANNOT_STATVFS); - return fs; - } - - UInt64 getKeepingFreeSpace() const - { - return keep_free_space_bytes; - } - - auto getSpaceInformation() const - { - return Stat(*this); - } - - UInt64 getTotalSpace() const - { - return getSpaceInformation().getTotalSpace(); - } - - UInt64 getAvailableSpace() const - { - return getSpaceInformation().getAvailableSpace(); - } - - UInt64 getUnreservedSpace() const - { - auto available_space = getSpaceInformation().getAvailableSpace(); - std::lock_guard lock(mutex); - available_space -= std::min(available_space, reserved_bytes); - return available_space; - } - -private: - String name; - String path; - UInt64 keep_free_space_bytes; - - /// Real reservation data - static std::mutex mutex; - mutable UInt64 reserved_bytes = 0; - mutable UInt64 reservation_count = 0; -}; - -/// It is not possible to change disk runtime. -using DiskPtr = std::shared_ptr; -using Disks = std::vector; - - -/** Reservationcontain - * Contain information about disk and size of reservation - * Unreserve on destroy - */ -class Reservation : private boost::noncopyable -{ -public: - Reservation(UInt64 size_, DiskPtr disk_ptr_) - : size(size_), metric_increment(CurrentMetrics::DiskSpaceReservedForMerge, size), disk_ptr(std::move(disk_ptr_)), valid(disk_ptr->try_reserve(size)) - { - } - - ~Reservation() - { - try - { - std::lock_guard lock(Disk::mutex); - if (disk_ptr->reserved_bytes < size) - { - disk_ptr->reserved_bytes = 0; - LOG_ERROR(&Logger::get("DiskSpaceMonitor"), "Unbalanced reservations size; it's a bug"); - } - else - { - disk_ptr->reserved_bytes -= size; - } - - if (disk_ptr->reservation_count == 0) - { - LOG_ERROR(&Logger::get("DiskSpaceMonitor"), "Unbalanced reservation count; it's a bug"); - } - else - { - --disk_ptr->reservation_count; - } - } - catch (...) - { - tryLogCurrentException("~DiskSpaceMonitor"); - } - } - - /// Change amount of reserved space. When new_size is greater than before, availability of free space is not checked. - void update(UInt64 new_size) - { - std::lock_guard lock(Disk::mutex); - disk_ptr->reserved_bytes -= size; - size = new_size; - disk_ptr->reserved_bytes += size; - } - - UInt64 getSize() const - { - return size; - } - - /// Returns mount point of filesystem where absoulte_path (must exist) is located - static std::filesystem::path getMountPoint(std::filesystem::path absolute_path) - { - if (absolute_path.is_relative()) - throw Exception("Path is relative. It's a bug.", ErrorCodes::LOGICAL_ERROR); - - absolute_path = std::filesystem::canonical(absolute_path); - - const auto get_device_id = [](const std::filesystem::path & p) - { - struct stat st; - if (stat(p.c_str(), &st)) - throwFromErrnoWithPath("Cannot stat " + p.string(), p.string(), ErrorCodes::SYSTEM_ERROR); - return st.st_dev; - }; - - /// If /some/path/to/dir/ and /some/path/to/ have different device id, - /// then device which contains /some/path/to/dir/filename is mounted to /some/path/to/dir/ - auto device_id = get_device_id(absolute_path); - while (absolute_path.has_relative_path()) - { - auto parent = absolute_path.parent_path(); - auto parent_device_id = get_device_id(parent); - if (device_id != parent_device_id) - return absolute_path; - absolute_path = parent; - device_id = parent_device_id; - } - - return absolute_path; - } - - /// Returns name of filesystem mounted to mount_point -#if !defined(__linux__) -[[noreturn]] -#endif - static std::string getFilesystemName([[maybe_unused]] const std::string & mount_point) - { -#if defined(__linux__) - auto mounted_filesystems = setmntent("/etc/mtab", "r"); - if (!mounted_filesystems) - throw DB::Exception("Cannot open /etc/mtab to get name of filesystem", ErrorCodes::SYSTEM_ERROR); - mntent fs_info; - constexpr size_t buf_size = 4096; /// The same as buffer used for getmntent in glibc. It can happen that it's not enough - char buf[buf_size]; - while (getmntent_r(mounted_filesystems, &fs_info, buf, buf_size) && fs_info.mnt_dir != mount_point) - ; - endmntent(mounted_filesystems); - if (fs_info.mnt_dir != mount_point) - throw DB::Exception("Cannot find name of filesystem by mount point " + mount_point, ErrorCodes::SYSTEM_ERROR); - return fs_info.mnt_fsname; -#else - throw DB::Exception("Supported on linux only", ErrorCodes::NOT_IMPLEMENTED); -#endif - } - - const DiskPtr & getDisk() const - { - return disk_ptr; - } - - bool isValid() const { return valid; } - -private: - UInt64 size; - CurrentMetrics::Increment metric_increment; - DiskPtr disk_ptr; - bool valid = false; -}; - - -class DiskSelector -{ -public: - DiskSelector(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, String default_path); - - const DiskPtr & operator[](const String & name) const; - - bool has(const String & name) const; - - void add(const DiskPtr & disk); - - const auto & getDisksMap() const { return disks; } - -private: - std::map disks; -}; - - -/** Volume. - * Contain set of "equivalent" disks - */ -class Volume : public Space -{ - friend class StoragePolicy; - -public: - Volume(String name_, std::vector disks_, UInt64 max_data_part_size_) - : max_data_part_size(max_data_part_size_), disks(std::move(disks_)), name(std::move(name_)) { } - - Volume(String name_, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const DiskSelector & disk_selector); - - Volume(const Volume & other) : Space(other), max_data_part_size(other.max_data_part_size), disks(other.disks), name(other.name) { } - - Volume & operator=(const Volume & other) - { - disks = other.disks; - max_data_part_size = other.max_data_part_size; - last_used.store(0, std::memory_order_relaxed); - name = other.name; - return *this; - } - - Volume(Volume && other) noexcept - : max_data_part_size(other.max_data_part_size), disks(std::move(other.disks)), name(std::move(other.name)) { } - - Volume & operator=(Volume && other) noexcept - { - disks = std::move(other.disks); - max_data_part_size = other.max_data_part_size; - last_used.store(0, std::memory_order_relaxed); - name = std::move(other.name); - return *this; - } - - /// Returns valid reservation or null - ReservationPtr reserve(UInt64 bytes) const override; - - UInt64 getMaxUnreservedFreeSpace() const; - - const String & getName() const override { return name; } - - UInt64 max_data_part_size = std::numeric_limits::max(); - - Disks disks; - -private: - mutable std::atomic last_used = 0; - String name; -}; - -using VolumePtr = std::shared_ptr; -using Volumes = std::vector; - - -/** Policy. - * Contain ordered set of Volumes - */ -class StoragePolicy : public Space -{ -public: - - StoragePolicy(String name_, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, - const DiskSelector & disks); - - StoragePolicy(String name_, Volumes volumes_, double move_factor_) : volumes(std::move(volumes_)), name(std::move(name_)), move_factor(move_factor_) - { - if (volumes.empty()) - throw Exception("StoragePolicy must contain at least one Volume", ErrorCodes::UNKNOWN_POLICY); - - for (size_t i = 0; i != volumes.size(); ++i) - { - if (volumes_names.find(volumes[i]->getName()) != volumes_names.end()) - throw Exception("Volumes names must be unique (" + volumes[i]->getName() + " duplicated)", ErrorCodes::UNKNOWN_POLICY); - volumes_names[volumes[i]->getName()] = i; - } - } - - /// Returns disks ordered by volumes priority - Disks getDisks() const; - - DiskPtr getAnyDisk() const; - - DiskPtr getDiskByName(const String & disk_name) const; - - UInt64 getMaxUnreservedFreeSpace() const; - - const String & getName() const override { return name; } - - /// Returns valid reservation or null - ReservationPtr reserve(UInt64 bytes) const override; - - /// Reserve space on any volume with priority > min_volume_index - ReservationPtr reserve(UInt64 bytes, size_t min_volume_index) const; - - auto getVolumePriorityByDisk(const DiskPtr & disk_ptr) const - { - for (size_t i = 0; i != volumes.size(); ++i) - { - const auto & volume = volumes[i]; - for (auto && disk : volume->disks) - { - if (disk->getName() == disk_ptr->getName()) - return i; - } - } - throw Exception("No disk " + disk_ptr->getName() + " in Policy " + name, ErrorCodes::UNKNOWN_DISK); - } - - /// Reserves 0 bytes on disk with max available space - /// Do not use this function when it is possible to predict size!!! - ReservationPtr reserveOnMaxDiskWithoutReservation() const; - - const auto & getVolumes() const { return volumes; } - - auto getMoveFactor() const { return move_factor; } - - VolumePtr getVolume(size_t i) const { return (i < volumes_names.size() ? volumes[i] : VolumePtr()); } - - VolumePtr getVolumeByName(const String & volume_name) const - { - auto it = volumes_names.find(volume_name); - if (it == volumes_names.end()) - return {}; - return getVolume(it->second); - } - -private: - Volumes volumes; - String name; - std::map volumes_names; - double move_factor; -}; - -using StoragePolicyPtr = std::shared_ptr; - -class StoragePolicySelector -{ -public: - StoragePolicySelector(const Poco::Util::AbstractConfiguration & config, const String& config_prefix, const DiskSelector & disks); - - const StoragePolicyPtr & operator[](const String & name) const; - - const auto & getPoliciesMap() const { return policies; } - -private: - std::map policies; -}; - -} - -} diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index 1181a6f6393..25da0694287 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -2585,19 +2585,16 @@ void MergeTreeData::movePartitionToSpace(MergeTreeData::DataPartPtr part, DiskSp if (Poco::File(path_to_clone + part->name).exists()) throw Exception("Move is not possible: " + path_to_clone + part->name + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS); - LOG_DEBUG(log, "Cloning part " << part->getFullPath() << " to " << getFullPathOnDisk(reservation->getDisk())); part->makeCloneOnDiskDetached(reservation); - MergeTreeData::MutableDataPartPtr copied_part = std::make_shared(*this, - reservation->getDisk(), - part->name); + MergeTreeData::MutableDataPartPtr copied_part = + std::make_shared(*this, reservation->getDisk(), part->name); + copied_part->relative_path = "detached/" + part->name; copied_part->loadColumnsChecksumsIndexes(require_part_metadata, true); - if (Poco::File(path_to_clone + part->name).exists()) - throw Exception("Move is not possible: " + path_to_clone + part->name + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS); copied_part->renameTo(part->name); auto old_active_part = swapActivePart(copied_part); diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.h b/dbms/src/Storages/MergeTree/MergeTreeData.h index f903daebace..f6fba3502bf 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.h +++ b/dbms/src/Storages/MergeTree/MergeTreeData.h @@ -16,7 +16,7 @@ #include #include #include -#include +#include #include #include diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 3bafd66227b..632caa1a070 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -3,7 +3,7 @@ #include #include #include -#include +#include #include #include #include @@ -315,9 +315,7 @@ bool MergeTreeDataMergerMutator::selectPartsToMove( MergeTreeData::DataPartsVector data_parts = data.getDataPartsVector(); if (data_parts.empty()) - { return false; - } std::unordered_map> need_to_move; const auto & policy = data.getStoragePolicy(); @@ -330,7 +328,8 @@ bool MergeTreeDataMergerMutator::selectPartsToMove( } /// Do not check last volume - for (size_t i = 0; i != volumes.size() - 1; ++i) { + for (size_t i = 0; i != volumes.size() - 1; ++i) + { for (const auto & disk : volumes[i]->disks) { auto space_information = disk->getSpaceInformation(); diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h index cf6cd6312cb..b52a57f8c6f 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h @@ -1,7 +1,7 @@ #pragma once #include -#include +#include #include #include #include diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index 1483b570231..3d1ecf753db 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -17,7 +17,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/dbms/src/Storages/StorageMergeTree.h b/dbms/src/Storages/StorageMergeTree.h index b67ef456b2f..4fb6078eefa 100644 --- a/dbms/src/Storages/StorageMergeTree.h +++ b/dbms/src/Storages/StorageMergeTree.h @@ -10,7 +10,7 @@ #include #include #include -#include +#include #include #include diff --git a/dbms/src/Storages/System/StorageSystemDisks.cpp b/dbms/src/Storages/System/StorageSystemDisks.cpp index f0c4077bb01..df45705c742 100644 --- a/dbms/src/Storages/System/StorageSystemDisks.cpp +++ b/dbms/src/Storages/System/StorageSystemDisks.cpp @@ -10,7 +10,7 @@ namespace ErrorCodes StorageSystemDisks::StorageSystemDisks(const std::string & name_) - : name(name_) + : name(name_) { setColumns(ColumnsDescription( { @@ -23,12 +23,12 @@ StorageSystemDisks::StorageSystemDisks(const std::string & name_) } BlockInputStreams StorageSystemDisks::read( - const Names & column_names, - const SelectQueryInfo & /*query_info*/, - const Context & context, - QueryProcessingStage::Enum /*processed_stage*/, - const size_t /*max_block_size*/, - const unsigned /*num_streams*/) + const Names & column_names, + const SelectQueryInfo & /*query_info*/, + const Context & context, + QueryProcessingStage::Enum /*processed_stage*/, + const size_t /*max_block_size*/, + const unsigned /*num_streams*/) { check(column_names); @@ -40,9 +40,9 @@ BlockInputStreams StorageSystemDisks::read( const auto & disk_selector = context.getDiskSelector(); - for (const auto & [name, disk_ptr] : disk_selector.getDisksMap()) + for (const auto & [disk_name, disk_ptr] : disk_selector.getDisksMap()) { - col_name_mut->insert(name); + col_name_mut->insert(disk_name); col_path_mut->insert(disk_ptr->getPath()); col_free_mut->insert(disk_ptr->getAvailableSpace()); col_total_mut->insert(disk_ptr->getTotalSpace()); diff --git a/dbms/src/Storages/System/StorageSystemDisks.h b/dbms/src/Storages/System/StorageSystemDisks.h index 0cb99c559b0..531a0ad81fd 100644 --- a/dbms/src/Storages/System/StorageSystemDisks.h +++ b/dbms/src/Storages/System/StorageSystemDisks.h @@ -22,12 +22,12 @@ public: std::string getDatabaseName() const override { return "system"; } BlockInputStreams read( - const Names & column_names, - const SelectQueryInfo & query_info, - const Context & context, - QueryProcessingStage::Enum processed_stage, - size_t max_block_size, - unsigned num_streams) override; + const Names & column_names, + const SelectQueryInfo & query_info, + const Context & context, + QueryProcessingStage::Enum processed_stage, + size_t max_block_size, + unsigned num_streams) override; private: const std::string name;