diff --git a/programs/server/HTTPHandler.cpp b/programs/server/HTTPHandler.cpp index 6c01a28c5a5..1e2695709f9 100644 --- a/programs/server/HTTPHandler.cpp +++ b/programs/server/HTTPHandler.cpp @@ -20,7 +20,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/src/DataStreams/MergeSortingBlockInputStream.cpp b/src/DataStreams/MergeSortingBlockInputStream.cpp index 0ac919f7a98..5b26f4822b9 100644 --- a/src/DataStreams/MergeSortingBlockInputStream.cpp +++ b/src/DataStreams/MergeSortingBlockInputStream.cpp @@ -7,7 +7,7 @@ #include #include #include -#include +#include namespace ProfileEvents diff --git a/src/DataStreams/MergeSortingBlockInputStream.h b/src/DataStreams/MergeSortingBlockInputStream.h index 89b16688e9e..ee03f202be0 100644 --- a/src/DataStreams/MergeSortingBlockInputStream.h +++ b/src/DataStreams/MergeSortingBlockInputStream.h @@ -18,8 +18,8 @@ namespace DB struct TemporaryFileStream; -class Volume; -using VolumePtr = std::shared_ptr; +class IVolume; +using VolumePtr = std::shared_ptr; namespace ErrorCodes { diff --git a/src/Disks/DiskSelector.cpp b/src/Disks/DiskSelector.cpp new file mode 100644 index 00000000000..b464d8652d6 --- /dev/null +++ b/src/Disks/DiskSelector.cpp @@ -0,0 +1,116 @@ +#include "DiskLocal.h" +#include "DiskSelector.h" + +#include +#include +#include +#include + +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int EXCESSIVE_ELEMENT_IN_CONFIG; + extern const int UNKNOWN_DISK; +} + +DiskSelector::DiskSelector(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const Context & context) +{ + Poco::Util::AbstractConfiguration::Keys keys; + config.keys(config_prefix, keys); + + auto & factory = DiskFactory::instance(); + + constexpr auto default_disk_name = "default"; + bool has_default_disk = false; + for (const auto & disk_name : keys) + { + if (!std::all_of(disk_name.begin(), disk_name.end(), isWordCharASCII)) + throw Exception("Disk name can contain only alphanumeric and '_' (" + disk_name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); + + if (disk_name == default_disk_name) + has_default_disk = true; + + auto disk_config_prefix = config_prefix + "." + disk_name; + + disks.emplace(disk_name, factory.create(disk_name, config, disk_config_prefix, context)); + } + if (!has_default_disk) + disks.emplace(default_disk_name, std::make_shared(default_disk_name, context.getPath(), 0)); +} + + +DiskSelectorPtr DiskSelector::updateFromConfig( + const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const Context & context) const +{ + Poco::Util::AbstractConfiguration::Keys keys; + config.keys(config_prefix, keys); + + auto & factory = DiskFactory::instance(); + + std::shared_ptr result = std::make_shared(*this); + + constexpr auto default_disk_name = "default"; + std::set old_disks_minus_new_disks; + for (const auto & [disk_name, _] : result->disks) + { + old_disks_minus_new_disks.insert(disk_name); + } + + for (const auto & disk_name : keys) + { + if (!std::all_of(disk_name.begin(), disk_name.end(), isWordCharASCII)) + throw Exception("Disk name can contain only alphanumeric and '_' (" + disk_name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); + + if (result->disks.count(disk_name) == 0) + { + auto disk_config_prefix = config_prefix + "." + disk_name; + result->disks.emplace(disk_name, factory.create(disk_name, config, disk_config_prefix, context)); + } + else + { + old_disks_minus_new_disks.erase(disk_name); + + /// TODO: Ideally ClickHouse shall complain if disk has changed, but + /// implementing that may appear as not trivial task. + } + } + + old_disks_minus_new_disks.erase(default_disk_name); + + if (!old_disks_minus_new_disks.empty()) + { + WriteBufferFromOwnString warning; + if (old_disks_minus_new_disks.size() == 1) + writeString("Disk ", warning); + else + writeString("Disks ", warning); + + int index = 0; + for (const String & name : old_disks_minus_new_disks) + { + if (index++ > 0) + writeString(", ", warning); + writeBackQuotedString(name, warning); + } + + writeString(" disappeared from configuration, this change will be applied after restart of ClickHouse", warning); + LOG_WARNING(&Logger::get("DiskSelector"), warning.str()); + } + + return result; +} + + +DiskPtr DiskSelector::get(const String & name) const +{ + auto it = disks.find(name); + if (it == disks.end()) + throw Exception("Unknown disk " + name, ErrorCodes::UNKNOWN_DISK); + return it->second; +} + +} diff --git a/src/Disks/DiskSelector.h b/src/Disks/DiskSelector.h new file mode 100644 index 00000000000..430ba97c003 --- /dev/null +++ b/src/Disks/DiskSelector.h @@ -0,0 +1,37 @@ +#pragma once + +#include +#include +#include + +#include + +#include + +namespace DB +{ +class DiskSelector; +using DiskSelectorPtr = std::shared_ptr; + +/// Parse .xml configuration and store information about disks +/// Mostly used for introspection. +class DiskSelector +{ +public: + DiskSelector(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const Context & context); + DiskSelector(const DiskSelector & from) : disks(from.disks) { } + + DiskSelectorPtr + updateFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const Context & context) const; + + /// Get disk by name + DiskPtr get(const String & name) const; + + /// Get all disks with names + const auto & getDisksMap() const { return disks; } + +private: + std::map disks; +}; + +} diff --git a/src/Disks/IVolume.cpp b/src/Disks/IVolume.cpp new file mode 100644 index 00000000000..6a122a3e3b2 --- /dev/null +++ b/src/Disks/IVolume.cpp @@ -0,0 +1,43 @@ +#include "IVolume.h" + +#include +#include + +#include + +namespace DB +{ +namespace ErrorCodes +{ + extern const int EXCESSIVE_ELEMENT_IN_CONFIG; +} + +IVolume::IVolume( + String name_, const Poco::Util::AbstractConfiguration & config, const String & config_prefix, DiskSelectorPtr disk_selector) + : name(std::move(name_)) +{ + Poco::Util::AbstractConfiguration::Keys keys; + config.keys(config_prefix, keys); + + for (const auto & disk : keys) + { + if (startsWith(disk, "disk")) + { + auto disk_name = config.getString(config_prefix + "." + disk); + disks.push_back(disk_selector->get(disk_name)); + } + } + + if (disks.empty()) + throw Exception("Volume must contain at least one disk.", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); +} + +UInt64 IVolume::getMaxUnreservedFreeSpace() const +{ + UInt64 res = 0; + for (const auto & disk : disks) + res = std::max(res, disk->getUnreservedSpace()); + return res; +} + +} diff --git a/src/Disks/IVolume.h b/src/Disks/IVolume.h new file mode 100644 index 00000000000..504dded7696 --- /dev/null +++ b/src/Disks/IVolume.h @@ -0,0 +1,52 @@ +#pragma once + +#include +#include + +#include + +namespace DB +{ + +/** + * Disks group by some (user) criteria. For example, + * - VolumeJBOD("slow_disks", [d1, d2], 100) + * - VolumeJBOD("fast_disks", [d3, d4], 200) + * + * Here VolumeJBOD is one of implementations of IVolume. + * + * Different of implementations of this interface implement different reserve behaviour — + * VolumeJBOD reserves space on the next disk after the last used, other future implementations + * will reserve, for example, equal spaces on all disks. + */ +class IVolume : public Space +{ +public: + IVolume(String name_, Disks disks_): disks(std::move(disks_)), name(std::move(name_)) + { + } + + IVolume( + String name_, + const Poco::Util::AbstractConfiguration & config, + const String & config_prefix, + DiskSelectorPtr disk_selector + ); + + virtual ReservationPtr reserve(UInt64 bytes) override = 0; + + /// Volume name from config + const String & getName() const override { return name; } + + /// Return biggest unreserved space across all disks + UInt64 getMaxUnreservedFreeSpace() const; + + Disks disks; +protected: + const String name; +}; + +using VolumePtr = std::shared_ptr; +using Volumes = std::vector; + +} diff --git a/src/Disks/DiskSpaceMonitor.cpp b/src/Disks/StoragePolicy.cpp similarity index 55% rename from src/Disks/DiskSpaceMonitor.cpp rename to src/Disks/StoragePolicy.cpp index b7fe05b72ab..8518e1516db 100644 --- a/src/Disks/DiskSpaceMonitor.cpp +++ b/src/Disks/StoragePolicy.cpp @@ -1,4 +1,4 @@ -#include "DiskSpaceMonitor.h" +#include "StoragePolicy.h" #include "DiskFactory.h" #include "DiskLocal.h" @@ -23,204 +23,6 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } - -DiskSelector::DiskSelector(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const Context & context) -{ - Poco::Util::AbstractConfiguration::Keys keys; - config.keys(config_prefix, keys); - - auto & factory = DiskFactory::instance(); - - constexpr auto default_disk_name = "default"; - bool has_default_disk = false; - for (const auto & disk_name : keys) - { - if (!std::all_of(disk_name.begin(), disk_name.end(), isWordCharASCII)) - throw Exception("Disk name can contain only alphanumeric and '_' (" + disk_name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); - - if (disk_name == default_disk_name) - has_default_disk = true; - - auto disk_config_prefix = config_prefix + "." + disk_name; - - disks.emplace(disk_name, factory.create(disk_name, config, disk_config_prefix, context)); - } - if (!has_default_disk) - disks.emplace(default_disk_name, std::make_shared(default_disk_name, context.getPath(), 0)); -} - - -DiskSelectorPtr DiskSelector::updateFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const Context & context) const -{ - Poco::Util::AbstractConfiguration::Keys keys; - config.keys(config_prefix, keys); - - auto & factory = DiskFactory::instance(); - - std::shared_ptr result = std::make_shared(*this); - - constexpr auto default_disk_name = "default"; - std::set old_disks_minus_new_disks; - for (const auto & [disk_name, _] : result->disks) - { - old_disks_minus_new_disks.insert(disk_name); - } - - for (const auto & disk_name : keys) - { - if (!std::all_of(disk_name.begin(), disk_name.end(), isWordCharASCII)) - throw Exception("Disk name can contain only alphanumeric and '_' (" + disk_name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); - - if (result->disks.count(disk_name) == 0) - { - auto disk_config_prefix = config_prefix + "." + disk_name; - result->disks.emplace(disk_name, factory.create(disk_name, config, disk_config_prefix, context)); - } - else - { - old_disks_minus_new_disks.erase(disk_name); - - /// TODO: Ideally ClickHouse shall complain if disk has changed, but - /// implementing that may appear as not trivial task. - } - } - - old_disks_minus_new_disks.erase(default_disk_name); - - if (!old_disks_minus_new_disks.empty()) - { - WriteBufferFromOwnString warning; - if (old_disks_minus_new_disks.size() == 1) - writeString("Disk ", warning); - else - writeString("Disks ", warning); - - int index = 0; - for (const String & name : old_disks_minus_new_disks) - { - if (index++ > 0) - writeString(", ", warning); - writeBackQuotedString(name, warning); - } - - writeString(" disappeared from configuration, this change will be applied after restart of ClickHouse", warning); - LOG_WARNING(&Logger::get("DiskSelector"), warning.str()); - } - - return result; -} - - -DiskPtr DiskSelector::get(const String & name) const -{ - auto it = disks.find(name); - if (it == disks.end()) - throw Exception("Unknown disk " + name, ErrorCodes::UNKNOWN_DISK); - return it->second; -} - - -Volume::Volume( - String name_, - const Poco::Util::AbstractConfiguration & config, - const String & config_prefix, - DiskSelectorPtr disk_selector) - : name(std::move(name_)) -{ - Poco::Util::AbstractConfiguration::Keys keys; - config.keys(config_prefix, keys); - - Logger * logger = &Logger::get("StorageConfiguration"); - - for (const auto & disk : keys) - { - if (startsWith(disk, "disk")) - { - auto disk_name = config.getString(config_prefix + "." + disk); - disks.push_back(disk_selector->get(disk_name)); - } - } - - if (disks.empty()) - throw Exception("Volume must contain at least one disk.", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); - - auto has_max_bytes = config.has(config_prefix + ".max_data_part_size_bytes"); - auto has_max_ratio = config.has(config_prefix + ".max_data_part_size_ratio"); - if (has_max_bytes && has_max_ratio) - throw Exception( - "Only one of 'max_data_part_size_bytes' and 'max_data_part_size_ratio' should be specified.", - ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); - - if (has_max_bytes) - { - max_data_part_size = config.getUInt64(config_prefix + ".max_data_part_size_bytes", 0); - } - else if (has_max_ratio) - { - auto ratio = config.getDouble(config_prefix + ".max_data_part_size_ratio"); - if (ratio < 0) - throw Exception("'max_data_part_size_ratio' have to be not less then 0.", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); - UInt64 sum_size = 0; - std::vector sizes; - for (const auto & disk : disks) - { - sizes.push_back(disk->getTotalSpace()); - sum_size += sizes.back(); - } - max_data_part_size = static_cast(sum_size * ratio / disks.size()); - for (size_t i = 0; i < disks.size(); ++i) - if (sizes[i] < max_data_part_size) - LOG_WARNING( - logger, - "Disk " << backQuote(disks[i]->getName()) << " on volume " << backQuote(config_prefix) << " have not enough space (" - << formatReadableSizeWithBinarySuffix(sizes[i]) << ") for containing part the size of max_data_part_size (" - << formatReadableSizeWithBinarySuffix(max_data_part_size) << ")"); - } - static constexpr UInt64 MIN_PART_SIZE = 8u * 1024u * 1024u; - if (max_data_part_size != 0 && max_data_part_size < MIN_PART_SIZE) - LOG_WARNING( - logger, - "Volume " << backQuote(name) << " max_data_part_size is too low (" << formatReadableSizeWithBinarySuffix(max_data_part_size) - << " < " << formatReadableSizeWithBinarySuffix(MIN_PART_SIZE) << ")"); -} - -DiskPtr Volume::getNextDisk() -{ - size_t start_from = last_used.fetch_add(1u, std::memory_order_relaxed); - size_t index = start_from % disks.size(); - return disks[index]; -} - -ReservationPtr Volume::reserve(UInt64 bytes) -{ - /// This volume can not store files which size greater than max_data_part_size - - if (max_data_part_size != 0 && bytes > max_data_part_size) - return {}; - - size_t start_from = last_used.fetch_add(1u, std::memory_order_relaxed); - size_t disks_num = disks.size(); - for (size_t i = 0; i < disks_num; ++i) - { - size_t index = (start_from + i) % disks_num; - - auto reservation = disks[index]->reserve(bytes); - - if (reservation) - return reservation; - } - return {}; -} - - -UInt64 Volume::getMaxUnreservedFreeSpace() const -{ - UInt64 res = 0; - for (const auto & disk : disks) - res = std::max(res, disk->getUnreservedSpace()); - return res; -} - StoragePolicy::StoragePolicy( String name_, const Poco::Util::AbstractConfiguration & config, @@ -240,7 +42,7 @@ StoragePolicy::StoragePolicy( if (!std::all_of(attr_name.begin(), attr_name.end(), isWordCharASCII)) throw Exception( "Volume name can contain only alphanumeric and '_' (" + attr_name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); - volumes.push_back(std::make_shared(attr_name, config, volumes_prefix + "." + attr_name, disks)); + volumes.push_back(std::make_shared(attr_name, config, volumes_prefix + "." + attr_name, disks)); if (volumes_names.find(attr_name) != volumes_names.end()) throw Exception("Volumes names must be unique (" + attr_name + " duplicated)", ErrorCodes::UNKNOWN_POLICY); volumes_names[attr_name] = volumes.size() - 1; @@ -269,7 +71,7 @@ StoragePolicy::StoragePolicy( } -StoragePolicy::StoragePolicy(String name_, Volumes volumes_, double move_factor_) +StoragePolicy::StoragePolicy(String name_, VolumesJBOD volumes_, double move_factor_) : volumes(std::move(volumes_)), name(std::move(name_)), move_factor(move_factor_) { if (volumes.empty()) @@ -453,9 +255,9 @@ StoragePolicySelector::StoragePolicySelector( /// Add default policy if it's not specified explicetly if (policies.find(default_storage_policy_name) == policies.end()) { - auto default_volume = std::make_shared(default_volume_name, std::vector{disks->get(default_disk_name)}, 0); + auto default_volume = std::make_shared(default_volume_name, std::vector{disks->get(default_disk_name)}, 0); - auto default_policy = std::make_shared(default_storage_policy_name, Volumes{default_volume}, 0.0); + auto default_policy = std::make_shared(default_storage_policy_name, VolumesJBOD{default_volume}, 0.0); policies.emplace(default_storage_policy_name, default_policy); } } diff --git a/src/Disks/DiskSpaceMonitor.h b/src/Disks/StoragePolicy.h similarity index 56% rename from src/Disks/DiskSpaceMonitor.h rename to src/Disks/StoragePolicy.h index e3382dc03d1..a41a62f1223 100644 --- a/src/Disks/DiskSpaceMonitor.h +++ b/src/Disks/StoragePolicy.h @@ -1,6 +1,9 @@ #pragma once +#include #include +#include +#include #include #include #include @@ -17,82 +20,6 @@ namespace DB { -class DiskSelector; -using DiskSelectorPtr = std::shared_ptr; - -/// Parse .xml configuration and store information about disks -/// Mostly used for introspection. -class DiskSelector -{ -public: - DiskSelector(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const Context & context); - DiskSelector(const DiskSelector & from): disks(from.disks) {} - - DiskSelectorPtr updateFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const Context & context) const; - - /// Get disk by name - DiskPtr get(const String & name) const; - - /// Get all disks with names - const auto & getDisksMap() const { return disks; } - -private: - std::map disks; -}; - -/** - * Disks group by some (user) criteria. For example, - * - Volume("slow_disks", [d1, d2], 100) - * - Volume("fast_disks", [d3, d4], 200) - * Cannot store parts larger than max_data_part_size. - */ -class Volume : public Space -{ - friend class StoragePolicy; - -public: - Volume(String name_, std::vector disks_, UInt64 max_data_part_size_) - : max_data_part_size(max_data_part_size_), disks(std::move(disks_)), name(std::move(name_)) - { - } - - Volume( - String name_, - const Poco::Util::AbstractConfiguration & config, - const String & config_prefix, - DiskSelectorPtr disk_selector); - - /// Next disk (round-robin) - /// - /// - Used with policy for temporary data - /// - Ignores all limitations - /// - Shares last access with reserve() - DiskPtr getNextDisk(); - - /// Uses Round-robin to choose disk for reservation. - /// Returns valid reservation or nullptr if there is no space left on any disk. - ReservationPtr reserve(UInt64 bytes) override; - - /// Return biggest unreserved space across all disks - UInt64 getMaxUnreservedFreeSpace() const; - - /// Volume name from config - const String & getName() const override { return name; } - - /// Max size of reservation - UInt64 max_data_part_size = 0; - - /// Disks in volume - Disks disks; - -private: - mutable std::atomic last_used = 0; - const String name; -}; - -using VolumePtr = std::shared_ptr; -using Volumes = std::vector; - class StoragePolicy; using StoragePolicyPtr = std::shared_ptr; @@ -105,7 +32,7 @@ class StoragePolicy public: StoragePolicy(String name_, const Poco::Util::AbstractConfiguration & config, const String & config_prefix, DiskSelectorPtr disks); - StoragePolicy(String name_, Volumes volumes_, double move_factor_); + StoragePolicy(String name_, VolumesJBOD volumes_, double move_factor_); bool isDefaultPolicy() const; @@ -137,16 +64,16 @@ public: /// Do not use this function when it is possible to predict size. ReservationPtr makeEmptyReservationOnLargestDisk() const; - const Volumes & getVolumes() const { return volumes; } + const VolumesJBOD & getVolumes() const { return volumes; } /// Returns number [0., 1.] -- fraction of free space on disk /// which should be kept with help of background moves double getMoveFactor() const { return move_factor; } /// Get volume by index from storage_policy - VolumePtr getVolume(size_t i) const { return (i < volumes_names.size() ? volumes[i] : VolumePtr()); } + VolumeJBODPtr getVolume(size_t i) const { return (i < volumes_names.size() ? volumes[i] : VolumeJBODPtr()); } - VolumePtr getVolumeByName(const String & volume_name) const + VolumeJBODPtr getVolumeByName(const String & volume_name) const { auto it = volumes_names.find(volume_name); if (it == volumes_names.end()) @@ -158,7 +85,7 @@ public: void checkCompatibleWith(const StoragePolicyPtr & new_storage_policy) const; private: - Volumes volumes; + VolumesJBOD volumes; const String name; std::map volumes_names; diff --git a/src/Disks/VolumeJBOD.cpp b/src/Disks/VolumeJBOD.cpp new file mode 100644 index 00000000000..79320aee7f2 --- /dev/null +++ b/src/Disks/VolumeJBOD.cpp @@ -0,0 +1,93 @@ +#include "VolumeJBOD.h" + +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int EXCESSIVE_ELEMENT_IN_CONFIG; +} + +VolumeJBOD::VolumeJBOD( + String name_, + const Poco::Util::AbstractConfiguration & config, + const String & config_prefix, + DiskSelectorPtr disk_selector +) : IVolume(name_, config, config_prefix, disk_selector) +{ + Logger * logger = &Logger::get("StorageConfiguration"); + + auto has_max_bytes = config.has(config_prefix + ".max_data_part_size_bytes"); + auto has_max_ratio = config.has(config_prefix + ".max_data_part_size_ratio"); + if (has_max_bytes && has_max_ratio) + throw Exception( + "Only one of 'max_data_part_size_bytes' and 'max_data_part_size_ratio' should be specified.", + ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); + + if (has_max_bytes) + { + max_data_part_size = config.getUInt64(config_prefix + ".max_data_part_size_bytes", 0); + } + else if (has_max_ratio) + { + auto ratio = config.getDouble(config_prefix + ".max_data_part_size_ratio"); + if (ratio < 0) + throw Exception("'max_data_part_size_ratio' have to be not less then 0.", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); + UInt64 sum_size = 0; + std::vector sizes; + for (const auto & disk : disks) + { + sizes.push_back(disk->getTotalSpace()); + sum_size += sizes.back(); + } + max_data_part_size = static_cast(sum_size * ratio / disks.size()); + for (size_t i = 0; i < disks.size(); ++i) + if (sizes[i] < max_data_part_size) + LOG_WARNING( + logger, + "Disk " << backQuote(disks[i]->getName()) << " on volume " << backQuote(config_prefix) << " have not enough space (" + << formatReadableSizeWithBinarySuffix(sizes[i]) << ") for containing part the size of max_data_part_size (" + << formatReadableSizeWithBinarySuffix(max_data_part_size) << ")"); + } + static constexpr UInt64 MIN_PART_SIZE = 8u * 1024u * 1024u; + if (max_data_part_size != 0 && max_data_part_size < MIN_PART_SIZE) + LOG_WARNING( + logger, + "Volume " << backQuote(name) << " max_data_part_size is too low (" << formatReadableSizeWithBinarySuffix(max_data_part_size) + << " < " << formatReadableSizeWithBinarySuffix(MIN_PART_SIZE) << ")"); +} + +DiskPtr VolumeJBOD::getNextDisk() +{ + size_t start_from = last_used.fetch_add(1u, std::memory_order_relaxed); + size_t index = start_from % disks.size(); + return disks[index]; +} + +ReservationPtr VolumeJBOD::reserve(UInt64 bytes) +{ + /// This volume can not store files which size greater than max_data_part_size + + if (max_data_part_size != 0 && bytes > max_data_part_size) + return {}; + + size_t start_from = last_used.fetch_add(1u, std::memory_order_relaxed); + size_t disks_num = disks.size(); + for (size_t i = 0; i < disks_num; ++i) + { + size_t index = (start_from + i) % disks_num; + + auto reservation = disks[index]->reserve(bytes); + + if (reservation) + return reservation; + } + return {}; +} + +} diff --git a/src/Disks/VolumeJBOD.h b/src/Disks/VolumeJBOD.h new file mode 100644 index 00000000000..7399d3cf065 --- /dev/null +++ b/src/Disks/VolumeJBOD.h @@ -0,0 +1,48 @@ +#pragma once + +#include + +namespace DB +{ + +/** + * Implements something similar to JBOD (https://en.wikipedia.org/wiki/Non-RAID_drive_architectures#JBOD). + * When MergeTree engine wants to write part — it requests VolumeJBOD to reserve space on the next available + * disk and then writes new part to that disk. + */ +class VolumeJBOD : public IVolume +{ +public: + VolumeJBOD(String name_, Disks disks_, UInt64 max_data_part_size_) + : IVolume(name_, disks_), max_data_part_size(max_data_part_size_) + { + } + + VolumeJBOD( + String name_, + const Poco::Util::AbstractConfiguration & config, + const String & config_prefix, + DiskSelectorPtr disk_selector + ); + + /// Next disk (round-robin) + /// + /// - Used with policy for temporary data + /// - Ignores all limitations + /// - Shares last access with reserve() + DiskPtr getNextDisk(); + + /// Uses Round-robin to choose disk for reservation. + /// Returns valid reservation or nullptr if there is no space left on any disk. + ReservationPtr reserve(UInt64 bytes) override; + + /// Max size of reservation + UInt64 max_data_part_size = 0; +private: + mutable std::atomic last_used = 0; +}; + +using VolumeJBODPtr = std::shared_ptr; +using VolumesJBOD = std::vector; + +} diff --git a/src/Disks/ya.make b/src/Disks/ya.make index 353a0473e56..a14024e7af3 100644 --- a/src/Disks/ya.make +++ b/src/Disks/ya.make @@ -8,9 +8,12 @@ SRCS( DiskFactory.cpp DiskLocal.cpp DiskMemory.cpp - DiskSpaceMonitor.cpp + DiskSelector.cpp IDisk.cpp + IVolume.cpp registerDisks.cpp + StoragePolicy.cpp + VolumeJBOD.cpp ) END() diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp index 8a85ddb52f0..6066d364b7a 100644 --- a/src/Interpreters/Aggregator.cpp +++ b/src/Interpreters/Aggregator.cpp @@ -27,7 +27,7 @@ #include #include #include -#include +#include #if !defined(ARCADIA_BUILD) # include diff --git a/src/Interpreters/Aggregator.h b/src/Interpreters/Aggregator.h index dc833456e14..1fa0ff282d6 100644 --- a/src/Interpreters/Aggregator.h +++ b/src/Interpreters/Aggregator.h @@ -45,8 +45,8 @@ namespace ErrorCodes class IBlockOutputStream; -class Volume; -using VolumePtr = std::shared_ptr; +class VolumeJBOD; +using VolumeJBODPtr = std::shared_ptr; /** Different data structures that can be used for aggregation * For efficiency, the aggregation data itself is put into the pool. @@ -878,7 +878,7 @@ public: /// Return empty result when aggregating without keys on empty set. bool empty_result_for_aggregation_by_empty_set; - VolumePtr tmp_volume; + VolumeJBODPtr tmp_volume; /// Settings is used to determine cache size. No threads are created. size_t max_threads; @@ -891,7 +891,7 @@ public: size_t group_by_two_level_threshold_, size_t group_by_two_level_threshold_bytes_, size_t max_bytes_before_external_group_by_, bool empty_result_for_aggregation_by_empty_set_, - VolumePtr tmp_volume_, size_t max_threads_, + VolumeJBODPtr tmp_volume_, size_t max_threads_, size_t min_free_disk_space_) : src_header(src_header_), keys(keys_), aggregates(aggregates_), keys_size(keys.size()), aggregates_size(aggregates.size()), diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 59861631544..f5651a1ab77 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -311,7 +311,7 @@ struct ContextShared ConfigurationPtr config; /// Global configuration settings. String tmp_path; /// Path to the temporary files that occur when processing the request. - mutable VolumePtr tmp_volume; /// Volume for the the temporary files that occur when processing the request. + mutable VolumeJBODPtr tmp_volume; /// Volume for the the temporary files that occur when processing the request. mutable std::optional embedded_dictionaries; /// Metrica's dictionaries. Have lazy initialization. mutable std::optional external_dictionaries_loader; @@ -538,7 +538,7 @@ String Context::getDictionariesLibPath() const return shared->dictionaries_lib_path; } -VolumePtr Context::getTemporaryVolume() const +VolumeJBODPtr Context::getTemporaryVolume() const { auto lock = getLock(); return shared->tmp_volume; @@ -563,7 +563,7 @@ void Context::setPath(const String & path) shared->dictionaries_lib_path = shared->path + "dictionaries_lib/"; } -VolumePtr Context::setTemporaryStorage(const String & path, const String & policy_name) +VolumeJBODPtr Context::setTemporaryStorage(const String & path, const String & policy_name) { auto lock = getLock(); @@ -574,7 +574,7 @@ VolumePtr Context::setTemporaryStorage(const String & path, const String & polic shared->tmp_path += '/'; auto disk = std::make_shared("_tmp_default", shared->tmp_path, 0); - shared->tmp_volume = std::make_shared("_tmp_default", std::vector{disk}, 0); + shared->tmp_volume = std::make_shared("_tmp_default", std::vector{disk}, 0); } else { diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 86c6ebfa0f6..0d2b3cdb5af 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -102,8 +102,8 @@ using StoragePolicySelectorPtr = std::shared_ptr; class IOutputFormat; using OutputFormatPtr = std::shared_ptr; -class Volume; -using VolumePtr = std::shared_ptr; +class VolumeJBOD; +using VolumeJBODPtr = std::shared_ptr; struct NamedSession; @@ -221,14 +221,14 @@ public: String getUserFilesPath() const; String getDictionariesLibPath() const; - VolumePtr getTemporaryVolume() const; + VolumeJBODPtr getTemporaryVolume() const; void setPath(const String & path); void setFlagsPath(const String & path); void setUserFilesPath(const String & path); void setDictionariesLibPath(const String & path); - VolumePtr setTemporaryStorage(const String & path, const String & policy_name = ""); + VolumeJBODPtr setTemporaryStorage(const String & path, const String & policy_name = ""); using ConfigurationPtr = Poco::AutoPtr; diff --git a/src/Interpreters/SortedBlocksWriter.cpp b/src/Interpreters/SortedBlocksWriter.cpp index 625f7d1bcbe..385c67292ad 100644 --- a/src/Interpreters/SortedBlocksWriter.cpp +++ b/src/Interpreters/SortedBlocksWriter.cpp @@ -3,7 +3,7 @@ #include #include #include -#include +#include namespace DB { diff --git a/src/Interpreters/SortedBlocksWriter.h b/src/Interpreters/SortedBlocksWriter.h index 6a28e60e553..073c37c4ff9 100644 --- a/src/Interpreters/SortedBlocksWriter.h +++ b/src/Interpreters/SortedBlocksWriter.h @@ -16,8 +16,8 @@ class TableJoin; class MergeJoinCursor; struct MergeJoinEqualRange; -class Volume; -using VolumePtr = std::shared_ptr; +class VolumeJBOD; +using VolumeJBODPtr = std::shared_ptr; struct SortedBlocksWriter { @@ -57,7 +57,7 @@ struct SortedBlocksWriter std::mutex insert_mutex; std::condition_variable flush_condvar; const SizeLimits & size_limits; - VolumePtr volume; + VolumeJBODPtr volume; const Block & sample_block; const SortDescription & sort_description; Blocks & inserted_blocks; @@ -70,7 +70,7 @@ struct SortedBlocksWriter size_t flush_number = 0; size_t flush_inflight = 0; - SortedBlocksWriter(const SizeLimits & size_limits_, VolumePtr volume_, const Block & sample_block_, const SortDescription & description, + SortedBlocksWriter(const SizeLimits & size_limits_, VolumeJBODPtr volume_, const Block & sample_block_, const SortDescription & description, Blocks & blocks, size_t rows_in_block_, size_t num_files_to_merge_, const String & codec_) : size_limits(size_limits_) , volume(volume_) diff --git a/src/Interpreters/TableJoin.cpp b/src/Interpreters/TableJoin.cpp index eea1c576f38..02eb321fdd8 100644 --- a/src/Interpreters/TableJoin.cpp +++ b/src/Interpreters/TableJoin.cpp @@ -13,7 +13,7 @@ namespace DB { -TableJoin::TableJoin(const Settings & settings, VolumePtr tmp_volume_) +TableJoin::TableJoin(const Settings & settings, VolumeJBODPtr tmp_volume_) : size_limits(SizeLimits{settings.max_rows_in_join, settings.max_bytes_in_join, settings.join_overflow_mode}) , default_max_bytes(settings.default_max_bytes_in_join) , join_use_nulls(settings.join_use_nulls) diff --git a/src/Interpreters/TableJoin.h b/src/Interpreters/TableJoin.h index 61f6d122ec1..0eeb724ab7b 100644 --- a/src/Interpreters/TableJoin.h +++ b/src/Interpreters/TableJoin.h @@ -24,8 +24,8 @@ class DictionaryReader; struct Settings; -class Volume; -using VolumePtr = std::shared_ptr; +class VolumeJBOD; +using VolumeJBODPtr = std::shared_ptr; class TableJoin { @@ -70,11 +70,11 @@ class TableJoin /// Original name -> name. Only ranamed columns. std::unordered_map renames; - VolumePtr tmp_volume; + VolumeJBODPtr tmp_volume; public: TableJoin() = default; - TableJoin(const Settings &, VolumePtr tmp_volume); + TableJoin(const Settings &, VolumeJBODPtr tmp_volume); /// for StorageJoin TableJoin(SizeLimits limits, bool use_nulls, ASTTableJoin::Kind kind, ASTTableJoin::Strictness strictness, @@ -96,7 +96,7 @@ public: ASTTableJoin::Strictness strictness() const { return table_join.strictness; } bool sameStrictnessAndKind(ASTTableJoin::Strictness, ASTTableJoin::Kind) const; const SizeLimits & sizeLimits() const { return size_limits; } - VolumePtr getTemporaryVolume() { return tmp_volume; } + VolumeJBODPtr getTemporaryVolume() { return tmp_volume; } bool allowMergeJoin() const; bool allowDictJoin(const String & dict_key, const Block & sample_block, Names &, NamesAndTypesList &) const; bool preferMergeJoin() const { return join_algorithm == JoinAlgorithm::PREFER_PARTIAL_MERGE; } diff --git a/src/Processors/Transforms/MergeSortingTransform.cpp b/src/Processors/Transforms/MergeSortingTransform.cpp index ee1836a4a14..e2a84d3f1c2 100644 --- a/src/Processors/Transforms/MergeSortingTransform.cpp +++ b/src/Processors/Transforms/MergeSortingTransform.cpp @@ -8,7 +8,7 @@ #include #include #include -#include +#include namespace ProfileEvents diff --git a/src/Processors/Transforms/MergeSortingTransform.h b/src/Processors/Transforms/MergeSortingTransform.h index 09c2b182fc7..a8786e5a034 100644 --- a/src/Processors/Transforms/MergeSortingTransform.h +++ b/src/Processors/Transforms/MergeSortingTransform.h @@ -9,8 +9,8 @@ namespace DB { -class Volume; -using VolumePtr = std::shared_ptr; +class IVolume; +using VolumePtr = std::shared_ptr; class MergeSortingTransform : public SortingTransform { diff --git a/src/Processors/tests/processors_test_aggregation.cpp b/src/Processors/tests/processors_test_aggregation.cpp index 70e2f43b31a..af809fab9f2 100644 --- a/src/Processors/tests/processors_test_aggregation.cpp +++ b/src/Processors/tests/processors_test_aggregation.cpp @@ -27,7 +27,7 @@ #include #include #include -#include +#include #include #include #include @@ -194,7 +194,7 @@ try auto cur_path = Poco::Path().absolute().toString(); auto disk = std::make_shared("tmp", cur_path, 0); - auto tmp_volume = std::make_shared("tmp", std::vector{disk}, 0); + auto tmp_volume = std::make_shared("tmp", std::vector{disk}, 0); auto execute_one_stream = [&](String msg, size_t num_threads, bool two_level, bool external) { diff --git a/src/Processors/tests/processors_test_merge_sorting_transform.cpp b/src/Processors/tests/processors_test_merge_sorting_transform.cpp index 1a0c82f90d6..470bf79a174 100644 --- a/src/Processors/tests/processors_test_merge_sorting_transform.cpp +++ b/src/Processors/tests/processors_test_merge_sorting_transform.cpp @@ -1,7 +1,7 @@ #include #include -#include +#include #include #include @@ -129,7 +129,7 @@ try Logger::root().setLevel("trace"); auto disk = std::make_shared("tmp", ".", 0); - auto tmp_volume = std::make_shared("tmp", std::vector{disk}, 0); + auto tmp_volume = std::make_shared("tmp", std::vector{disk}, 0); auto execute_chain = [tmp_volume]( String msg, diff --git a/src/Storages/Distributed/DistributedBlockOutputStream.cpp b/src/Storages/Distributed/DistributedBlockOutputStream.cpp index e3061198beb..ae15552a6d9 100644 --- a/src/Storages/Distributed/DistributedBlockOutputStream.cpp +++ b/src/Storages/Distributed/DistributedBlockOutputStream.cpp @@ -1,7 +1,7 @@ #include #include #include -#include +#include #include #include diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 749a5c27708..b9aa7ba6d4a 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -19,7 +19,7 @@ #include #include #include -#include +#include #include #include diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 2ab43f8f56c..acdb7616175 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -3,7 +3,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/src/Storages/MergeTree/MergeTreePartsMover.h b/src/Storages/MergeTree/MergeTreePartsMover.h index 9e640d24a6b..95a20dc1f77 100644 --- a/src/Storages/MergeTree/MergeTreePartsMover.h +++ b/src/Storages/MergeTree/MergeTreePartsMover.h @@ -3,7 +3,7 @@ #include #include #include -#include +#include #include #include diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index cd6976b2d24..44707a43fa1 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -3,7 +3,7 @@ #include #include -#include +#include #include #include @@ -330,7 +330,7 @@ void StorageDistributed::createStorage() if (!path.ends_with('/')) path += '/'; auto disk = std::make_shared("default", path, 0); - volume = std::make_shared("default", std::vector{disk}, 0); + volume = std::make_shared("default", std::vector{disk}, 0); } else { diff --git a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h index 5e07e71f05d..125e1dee1e6 100644 --- a/src/Storages/StorageDistributed.h +++ b/src/Storages/StorageDistributed.h @@ -19,8 +19,8 @@ namespace DB class Context; class StorageDistributedDirectoryMonitor; -class Volume; -using VolumePtr = std::shared_ptr; +class VolumeJBOD; +using VolumeJBODPtr = std::shared_ptr; class ExpressionActions; using ExpressionActionsPtr = std::shared_ptr; @@ -176,7 +176,7 @@ protected: String storage_policy; String relative_data_path; /// Can be empty if relative_data_path is empty. In this case, a directory for the data to be sent is not created. - VolumePtr volume; + VolumeJBODPtr volume; struct ClusterNodeData { diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 07a8e21fc12..2c7685c3a5f 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -18,7 +18,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index b498a6f06fd..473177abf1e 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -12,7 +12,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 8f822e15fcd..7a5d4dbf2bf 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -26,7 +26,7 @@ #include #include -#include +#include #include diff --git a/src/Storages/System/StorageSystemTables.cpp b/src/Storages/System/StorageSystemTables.cpp index 9ffd350c4bf..d6951d8467f 100644 --- a/src/Storages/System/StorageSystemTables.cpp +++ b/src/Storages/System/StorageSystemTables.cpp @@ -15,7 +15,7 @@ #include #include #include -#include +#include #include #include #include