From aa5f207fd4a818f56d4cee593bd23d1414651cd1 Mon Sep 17 00:00:00 2001 From: Vladimir Chebotarev Date: Tue, 20 Oct 2020 18:10:24 +0300 Subject: [PATCH] Added `disable_merges` option for volumes in multi-disk configuration (#13956) Co-authored-by: Alexander Kazakov --- src/Common/ErrorCodes.cpp | 2 + src/Disks/DiskSelector.h | 7 +- src/Disks/IVolume.cpp | 4 +- src/Disks/IVolume.h | 6 + src/Disks/SingleDiskVolume.h | 2 +- src/Disks/StoragePolicy.cpp | 200 +++++++++++------ src/Disks/StoragePolicy.h | 25 ++- src/Disks/VolumeJBOD.cpp | 31 ++- src/Disks/VolumeJBOD.h | 33 ++- src/Disks/VolumeRAID1.h | 22 +- src/Disks/createVolume.cpp | 33 ++- src/Disks/createVolume.h | 8 + src/Interpreters/Context.cpp | 2 +- src/Interpreters/InterpreterSystemQuery.cpp | 10 +- src/Interpreters/InterpreterSystemQuery.h | 2 + src/Parsers/ASTSystemQuery.cpp | 15 +- src/Parsers/ASTSystemQuery.h | 2 + src/Parsers/ParserSystemQuery.cpp | 27 +++ src/Storages/MergeTree/DataPartsExchange.cpp | 4 +- src/Storages/MergeTree/IMergeTreeDataPart.cpp | 10 + src/Storages/MergeTree/IMergeTreeDataPart.h | 6 +- src/Storages/MergeTree/MergeSelector.h | 2 + src/Storages/MergeTree/MergeTreeData.cpp | 6 +- .../MergeTree/MergeTreeDataMergerMutator.cpp | 7 +- .../MergeTree/MergeTreePartsMover.cpp | 2 +- .../MergeTree/MergeTreeWriteAheadLog.cpp | 2 +- .../MergeTree/SimpleMergeSelector.cpp | 7 + src/Storages/MergeTree/TTLMergeSelector.cpp | 24 ++ src/Storages/MergeTree/TTLMergeSelector.h | 5 +- .../System/StorageSystemStoragePolicies.cpp | 4 + .../config.d/storage_configuration.xml | 12 + tests/integration/test_multiple_disks/test.py | 206 +++++++++++++++++- 32 files changed, 609 insertions(+), 119 deletions(-) diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index b841368f662..b14c090c848 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -510,6 +510,8 @@ namespace ErrorCodes extern const int ROW_AND_ROWS_TOGETHER = 544; extern const int FIRST_AND_NEXT_TOGETHER = 545; extern const int NO_ROW_DELIMITER = 546; + extern const int INVALID_RAID_TYPE = 547; + extern const int UNKNOWN_VOLUME = 548; extern const int KEEPER_EXCEPTION = 999; extern const int POCO_EXCEPTION = 1000; diff --git a/src/Disks/DiskSelector.h b/src/Disks/DiskSelector.h index 3f19dfba381..5d023fe1fbc 100644 --- a/src/Disks/DiskSelector.h +++ b/src/Disks/DiskSelector.h @@ -23,8 +23,11 @@ public: DiskSelector(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const Context & context); DiskSelector(const DiskSelector & from) : disks(from.disks) { } - DiskSelectorPtr - updateFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const Context & context) const; + DiskSelectorPtr updateFromConfig( + const Poco::Util::AbstractConfiguration & config, + const String & config_prefix, + const Context & context + ) const; /// Get disk by name DiskPtr get(const String & name) const; diff --git a/src/Disks/IVolume.cpp b/src/Disks/IVolume.cpp index 95f03826591..ac277d962ed 100644 --- a/src/Disks/IVolume.cpp +++ b/src/Disks/IVolume.cpp @@ -9,7 +9,7 @@ namespace DB { namespace ErrorCodes { - extern const int EXCESSIVE_ELEMENT_IN_CONFIG; + extern const int NO_ELEMENTS_IN_CONFIG; extern const int INCONSISTENT_RESERVATIONS; extern const int NO_RESERVATIONS_PROVIDED; extern const int UNKNOWN_VOLUME_TYPE; @@ -51,7 +51,7 @@ IVolume::IVolume( } if (disks.empty()) - throw Exception("Volume must contain at least one disk.", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); + throw Exception("Volume must contain at least one disk", ErrorCodes::NO_ELEMENTS_IN_CONFIG); } UInt64 IVolume::getMaxUnreservedFreeSpace() const diff --git a/src/Disks/IVolume.h b/src/Disks/IVolume.h index eaf3bf1dbd4..c040d9d58e1 100644 --- a/src/Disks/IVolume.h +++ b/src/Disks/IVolume.h @@ -64,6 +64,12 @@ public: virtual DiskPtr getDisk(size_t i) const { return disks[i]; } const Disks & getDisks() const { return disks; } + /// Returns effective value of whether merges are allowed on this volume (true) or not (false). + virtual bool areMergesAvoided() const { return false; } + + /// User setting for enabling and disabling merges on volume. + virtual void setAvoidMergesUserOverride(bool /*avoid*/) {} + protected: Disks disks; const String name; diff --git a/src/Disks/SingleDiskVolume.h b/src/Disks/SingleDiskVolume.h index c441d4c2dd2..bade6041ea0 100644 --- a/src/Disks/SingleDiskVolume.h +++ b/src/Disks/SingleDiskVolume.h @@ -8,7 +8,7 @@ namespace DB class SingleDiskVolume : public IVolume { public: - SingleDiskVolume(const String & name_, DiskPtr disk): IVolume(name_, {disk}) + SingleDiskVolume(const String & name_, DiskPtr disk, size_t max_data_part_size_ = 0): IVolume(name_, {disk}, max_data_part_size_) { } diff --git a/src/Disks/StoragePolicy.cpp b/src/Disks/StoragePolicy.cpp index 1aa20301bc0..8a71f4f7a2f 100644 --- a/src/Disks/StoragePolicy.cpp +++ b/src/Disks/StoragePolicy.cpp @@ -11,6 +11,13 @@ #include +namespace +{ + const auto DEFAULT_STORAGE_POLICY_NAME = "default"; + const auto DEFAULT_VOLUME_NAME = "default"; + const auto DEFAULT_DISK_NAME = "default"; +} + namespace DB { @@ -18,11 +25,14 @@ namespace ErrorCodes { extern const int BAD_ARGUMENTS; extern const int EXCESSIVE_ELEMENT_IN_CONFIG; + extern const int NO_ELEMENTS_IN_CONFIG; extern const int UNKNOWN_DISK; extern const int UNKNOWN_POLICY; + extern const int UNKNOWN_VOLUME; extern const int LOGICAL_ERROR; } + StoragePolicy::StoragePolicy( String name_, const Poco::Util::AbstractConfiguration & config, @@ -30,44 +40,42 @@ StoragePolicy::StoragePolicy( DiskSelectorPtr disks) : name(std::move(name_)) { - String volumes_prefix = config_prefix + ".volumes"; - if (!config.has(volumes_prefix)) - throw Exception("StoragePolicy must contain at least one volume (.volumes)", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); - Poco::Util::AbstractConfiguration::Keys keys; - config.keys(volumes_prefix, keys); + String volumes_prefix = config_prefix + ".volumes"; + + if (!config.has(volumes_prefix)) + { + if (name != DEFAULT_STORAGE_POLICY_NAME) + throw Exception("Storage policy " + backQuote(name) + " must contain at least one volume (.volumes)", ErrorCodes::NO_ELEMENTS_IN_CONFIG); + } + else + { + config.keys(volumes_prefix, keys); + } for (const auto & attr_name : keys) { if (!std::all_of(attr_name.begin(), attr_name.end(), isWordCharASCII)) throw Exception( - "Volume name can contain only alphanumeric and '_' (" + attr_name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); - volumes.push_back(std::make_shared(attr_name, config, volumes_prefix + "." + attr_name, disks)); - if (volumes_names.find(attr_name) != volumes_names.end()) - throw Exception("Volumes names must be unique (" + attr_name + " duplicated)", ErrorCodes::UNKNOWN_POLICY); - volumes_names[attr_name] = volumes.size() - 1; + "Volume name can contain only alphanumeric and '_' in storage policy " + backQuote(name) + " (" + attr_name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); + volumes.emplace_back(createVolumeFromConfig(attr_name, config, volumes_prefix + "." + attr_name, disks)); + } + + if (volumes.empty() && name == DEFAULT_STORAGE_POLICY_NAME) + { + auto default_volume = std::make_shared(DEFAULT_VOLUME_NAME, std::vector{disks->get(DEFAULT_DISK_NAME)}, 0, false); + volumes.emplace_back(std::move(default_volume)); } if (volumes.empty()) - throw Exception("StoragePolicy must contain at least one volume.", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); + throw Exception("Storage policy " + backQuote(name) + " must contain at least one volume.", ErrorCodes::NO_ELEMENTS_IN_CONFIG); - /// Check that disks are unique in Policy - std::set disk_names; - for (const auto & volume : volumes) - { - for (const auto & disk : volume->getDisks()) - { - if (disk_names.find(disk->getName()) != disk_names.end()) - throw Exception( - "Duplicate disk '" + disk->getName() + "' in storage policy '" + name + "'", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); - - disk_names.insert(disk->getName()); - } - } - - move_factor = config.getDouble(config_prefix + ".move_factor", 0.1); + const double default_move_factor = volumes.size() > 1 ? 0.1 : 0.0; + move_factor = config.getDouble(config_prefix + ".move_factor", default_move_factor); if (move_factor > 1) - throw Exception("Disk move factor have to be in [0., 1.] interval, but set to " + toString(move_factor), ErrorCodes::LOGICAL_ERROR); + throw Exception("Disk move factor have to be in [0., 1.] interval, but set to " + toString(move_factor) + " in storage policy " + backQuote(name), ErrorCodes::LOGICAL_ERROR); + + buildVolumeIndices(); } @@ -75,16 +83,43 @@ StoragePolicy::StoragePolicy(String name_, Volumes volumes_, double move_factor_ : volumes(std::move(volumes_)), name(std::move(name_)), move_factor(move_factor_) { if (volumes.empty()) - throw Exception("StoragePolicy must contain at least one Volume.", ErrorCodes::UNKNOWN_POLICY); + throw Exception("Storage policy " + backQuote(name) + " must contain at least one Volume.", ErrorCodes::NO_ELEMENTS_IN_CONFIG); if (move_factor > 1) - throw Exception("Disk move factor have to be in [0., 1.] interval, but set to " + toString(move_factor), ErrorCodes::LOGICAL_ERROR); + throw Exception("Disk move factor have to be in [0., 1.] interval, but set to " + toString(move_factor) + " in storage policy " + backQuote(name), ErrorCodes::LOGICAL_ERROR); - for (size_t i = 0; i < volumes.size(); ++i) + buildVolumeIndices(); +} + + +StoragePolicy::StoragePolicy(const StoragePolicy & storage_policy, + const Poco::Util::AbstractConfiguration & config, + const String & config_prefix, + DiskSelectorPtr disks) + : StoragePolicy(storage_policy.getName(), config, config_prefix, disks) +{ + for (auto & volume : volumes) { - if (volumes_names.find(volumes[i]->getName()) != volumes_names.end()) - throw Exception("Volumes names must be unique (" + volumes[i]->getName() + " duplicated).", ErrorCodes::UNKNOWN_POLICY); - volumes_names[volumes[i]->getName()] = i; + if (storage_policy.volume_index_by_volume_name.count(volume->getName()) > 0) + { + auto old_volume = storage_policy.getVolumeByName(volume->getName()); + try + { + auto new_volume = updateVolumeFromConfig(old_volume, config, config_prefix + ".volumes." + volume->getName(), disks); + volume = std::move(new_volume); + } + catch (Exception & e) + { + /// Default policies are allowed to be missed in configuration. + if (e.code() != ErrorCodes::NO_ELEMENTS_IN_CONFIG || storage_policy.getName() != DEFAULT_STORAGE_POLICY_NAME) + throw; + + Poco::Util::AbstractConfiguration::Keys keys; + config.keys(config_prefix, keys); + if (!keys.empty()) + throw; + } + } } } @@ -93,20 +128,20 @@ bool StoragePolicy::isDefaultPolicy() const { /// Guessing if this policy is default, not 100% correct though. - if (getName() != "default") + if (getName() != DEFAULT_STORAGE_POLICY_NAME) return false; if (volumes.size() != 1) return false; - if (volumes[0]->getName() != "default") + if (volumes[0]->getName() != DEFAULT_VOLUME_NAME) return false; const auto & disks = volumes[0]->getDisks(); if (disks.size() != 1) return false; - if (disks[0]->getName() != "default") + if (disks[0]->getName() != DEFAULT_DISK_NAME) return false; return true; @@ -128,10 +163,10 @@ DiskPtr StoragePolicy::getAnyDisk() const /// StoragePolicy must contain at least one Volume /// Volume must contain at least one Disk if (volumes.empty()) - throw Exception("StoragePolicy has no volumes. It's a bug.", ErrorCodes::LOGICAL_ERROR); + throw Exception("Storage policy " + backQuote(name) + " has no volumes. It's a bug.", ErrorCodes::LOGICAL_ERROR); if (volumes[0]->getDisks().empty()) - throw Exception("Volume '" + volumes[0]->getName() + "' has no disks. It's a bug.", ErrorCodes::LOGICAL_ERROR); + throw Exception("Volume " + backQuote(name) + "." + backQuote(volumes[0]->getName()) + " has no disks. It's a bug.", ErrorCodes::LOGICAL_ERROR); return volumes[0]->getDisks()[0]; } @@ -195,6 +230,24 @@ ReservationPtr StoragePolicy::makeEmptyReservationOnLargestDisk() const } +VolumePtr StoragePolicy::getVolume(size_t index) const +{ + if (index < volume_index_by_volume_name.size()) + return volumes[index]; + else + throw Exception("No volume with index " + std::to_string(index) + " in storage policy " + backQuote(name), ErrorCodes::UNKNOWN_VOLUME); +} + + +VolumePtr StoragePolicy::getVolumeByName(const String & volume_name) const +{ + auto it = volume_index_by_volume_name.find(volume_name); + if (it == volume_index_by_volume_name.end()) + throw Exception("No such volume " + backQuote(volume_name) + " in storage policy " + backQuote(name), ErrorCodes::UNKNOWN_VOLUME); + return getVolume(it->second); +} + + void StoragePolicy::checkCompatibleWith(const StoragePolicyPtr & new_storage_policy) const { std::unordered_set new_volume_names; @@ -204,7 +257,7 @@ void StoragePolicy::checkCompatibleWith(const StoragePolicyPtr & new_storage_pol for (const auto & volume : getVolumes()) { if (new_volume_names.count(volume->getName()) == 0) - throw Exception("New storage policy shall contain volumes of old one", ErrorCodes::BAD_ARGUMENTS); + throw Exception("New storage policy " + backQuote(name) + " shall contain volumes of old one", ErrorCodes::BAD_ARGUMENTS); std::unordered_set new_disk_names; for (const auto & disk : new_storage_policy->getVolumeByName(volume->getName())->getDisks()) @@ -212,21 +265,46 @@ void StoragePolicy::checkCompatibleWith(const StoragePolicyPtr & new_storage_pol for (const auto & disk : volume->getDisks()) if (new_disk_names.count(disk->getName()) == 0) - throw Exception("New storage policy shall contain disks of old one", ErrorCodes::BAD_ARGUMENTS); + throw Exception("New storage policy " + backQuote(name) + " shall contain disks of old one", ErrorCodes::BAD_ARGUMENTS); } } size_t StoragePolicy::getVolumeIndexByDisk(const DiskPtr & disk_ptr) const { - for (size_t i = 0; i < volumes.size(); ++i) + auto it = volume_index_by_disk_name.find(disk_ptr->getName()); + if (it != volume_index_by_disk_name.end()) + return it->second; + else + throw Exception("No disk " + backQuote(disk_ptr->getName()) + " in policy " + backQuote(name), ErrorCodes::UNKNOWN_DISK); +} + + +void StoragePolicy::buildVolumeIndices() +{ + for (size_t index = 0; index < volumes.size(); ++index) { - const auto & volume = volumes[i]; + const VolumePtr & volume = volumes[index]; + + if (volume_index_by_volume_name.find(volume->getName()) != volume_index_by_volume_name.end()) + throw Exception("Volume names must be unique in storage policy " + + backQuote(name) + " (" + backQuote(volume->getName()) + " is duplicated)" + , ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); + + volume_index_by_volume_name[volume->getName()] = index; + for (const auto & disk : volume->getDisks()) - if (disk->getName() == disk_ptr->getName()) - return i; + { + const String & disk_name = disk->getName(); + + if (volume_index_by_disk_name.find(disk_name) != volume_index_by_disk_name.end()) + throw Exception("Disk names must be unique in storage policy " + + backQuote(name) + " (" + backQuote(disk_name) + " is duplicated)" + , ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); + + volume_index_by_disk_name[disk_name] = index; + } } - throw Exception("No disk " + disk_ptr->getName() + " in policy " + name, ErrorCodes::UNKNOWN_DISK); } @@ -242,44 +320,40 @@ StoragePolicySelector::StoragePolicySelector( { if (!std::all_of(name.begin(), name.end(), isWordCharASCII)) throw Exception( - "StoragePolicy name can contain only alphanumeric and '_' (" + name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); + "Storage policy name can contain only alphanumeric and '_' (" + backQuote(name) + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); policies.emplace(name, std::make_shared(name, config, config_prefix + "." + name, disks)); LOG_INFO(&Poco::Logger::get("StoragePolicySelector"), "Storage policy {} loaded", backQuote(name)); } - constexpr auto default_storage_policy_name = "default"; - constexpr auto default_volume_name = "default"; - constexpr auto default_disk_name = "default"; - - /// Add default policy if it's not specified explicetly - if (policies.find(default_storage_policy_name) == policies.end()) + /// Add default policy if it isn't explicitly specified. + if (policies.find(DEFAULT_STORAGE_POLICY_NAME) == policies.end()) { - auto default_volume = std::make_shared(default_volume_name, std::vector{disks->get(default_disk_name)}, 0); - - auto default_policy = std::make_shared(default_storage_policy_name, Volumes{default_volume}, 0.0); - policies.emplace(default_storage_policy_name, default_policy); + auto default_policy = std::make_shared(DEFAULT_STORAGE_POLICY_NAME, config, config_prefix + "." + DEFAULT_STORAGE_POLICY_NAME, disks); + policies.emplace(DEFAULT_STORAGE_POLICY_NAME, std::move(default_policy)); } } StoragePolicySelectorPtr StoragePolicySelector::updateFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, DiskSelectorPtr disks) const { - Poco::Util::AbstractConfiguration::Keys keys; - config.keys(config_prefix, keys); - std::shared_ptr result = std::make_shared(config, config_prefix, disks); - constexpr auto default_storage_policy_name = "default"; - + /// First pass, check. for (const auto & [name, policy] : policies) { - if (name != default_storage_policy_name && result->policies.count(name) == 0) + if (result->policies.count(name) == 0) throw Exception("Storage policy " + backQuote(name) + " is missing in new configuration", ErrorCodes::BAD_ARGUMENTS); policy->checkCompatibleWith(result->policies[name]); } + /// Second pass, load. + for (const auto & [name, policy] : policies) + { + result->policies[name] = std::make_shared(*policy, config, config_prefix + "." + name, disks); + } + return result; } @@ -288,7 +362,7 @@ StoragePolicyPtr StoragePolicySelector::get(const String & name) const { auto it = policies.find(name); if (it == policies.end()) - throw Exception("Unknown StoragePolicy " + name, ErrorCodes::UNKNOWN_POLICY); + throw Exception("Unknown storage policy " + backQuote(name), ErrorCodes::UNKNOWN_POLICY); return it->second; } diff --git a/src/Disks/StoragePolicy.h b/src/Disks/StoragePolicy.h index 0e0795d8bf1..f4a4a0070b8 100644 --- a/src/Disks/StoragePolicy.h +++ b/src/Disks/StoragePolicy.h @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -36,6 +37,13 @@ public: StoragePolicy(String name_, Volumes volumes_, double move_factor_); + StoragePolicy( + const StoragePolicy & storage_policy, + const Poco::Util::AbstractConfiguration & config, + const String & config_prefix, + DiskSelectorPtr disks + ); + bool isDefaultPolicy() const; /// Returns disks ordered by volumes priority @@ -72,16 +80,10 @@ public: /// which should be kept with help of background moves double getMoveFactor() const { return move_factor; } - /// Get volume by index from storage_policy - VolumePtr getVolume(size_t i) const { return (i < volumes_names.size() ? volumes[i] : VolumePtr()); } + /// Get volume by index. + VolumePtr getVolume(size_t index) const; - VolumePtr getVolumeByName(const String & volume_name) const - { - auto it = volumes_names.find(volume_name); - if (it == volumes_names.end()) - return {}; - return getVolume(it->second); - } + VolumePtr getVolumeByName(const String & volume_name) const; /// Checks if storage policy can be replaced by another one. void checkCompatibleWith(const StoragePolicyPtr & new_storage_policy) const; @@ -89,12 +91,15 @@ public: private: Volumes volumes; const String name; - std::map volumes_names; + std::unordered_map volume_index_by_volume_name; + std::unordered_map volume_index_by_disk_name; /// move_factor from interval [0., 1.] /// We move something if disk from this policy /// filled more than total_size * move_factor double move_factor = 0.1; /// by default move factor is 10% + + void buildVolumeIndices(); }; diff --git a/src/Disks/VolumeJBOD.cpp b/src/Disks/VolumeJBOD.cpp index 3ac8a50acfb..e5905f77db3 100644 --- a/src/Disks/VolumeJBOD.cpp +++ b/src/Disks/VolumeJBOD.cpp @@ -56,11 +56,23 @@ VolumeJBOD::VolumeJBOD( /// Default value is 'true' due to backward compatibility. perform_ttl_move_on_insert = config.getBool(config_prefix + ".perform_ttl_move_on_insert", true); + + are_merges_avoided = config.getBool(config_prefix + ".prefer_not_to_merge", false); +} + +VolumeJBOD::VolumeJBOD(const VolumeJBOD & volume_jbod, + const Poco::Util::AbstractConfiguration & config, + const String & config_prefix, + DiskSelectorPtr disk_selector) + : VolumeJBOD(volume_jbod.name, config, config_prefix, disk_selector) +{ + are_merges_avoided_user_override = volume_jbod.are_merges_avoided_user_override.load(std::memory_order_relaxed); + last_used = volume_jbod.last_used.load(std::memory_order_relaxed); } DiskPtr VolumeJBOD::getDisk(size_t /* index */) const { - size_t start_from = last_used.fetch_add(1u, std::memory_order_relaxed); + size_t start_from = last_used.fetch_add(1u, std::memory_order_acq_rel); size_t index = start_from % disks.size(); return disks[index]; } @@ -73,7 +85,7 @@ ReservationPtr VolumeJBOD::reserve(UInt64 bytes) if (max_data_part_size != 0 && bytes > max_data_part_size) return {}; - size_t start_from = last_used.fetch_add(1u, std::memory_order_relaxed); + size_t start_from = last_used.fetch_add(1u, std::memory_order_acq_rel); size_t disks_num = disks.size(); for (size_t i = 0; i < disks_num; ++i) { @@ -87,4 +99,19 @@ ReservationPtr VolumeJBOD::reserve(UInt64 bytes) return {}; } +bool VolumeJBOD::areMergesAvoided() const +{ + auto are_merges_avoided_user_override_value = are_merges_avoided_user_override.load(std::memory_order_acquire); + if (are_merges_avoided_user_override_value) + return *are_merges_avoided_user_override_value; + else + return are_merges_avoided; +} + +void VolumeJBOD::setAvoidMergesUserOverride(bool avoid) +{ + are_merges_avoided_user_override.store(avoid, std::memory_order_release); +} + + } diff --git a/src/Disks/VolumeJBOD.h b/src/Disks/VolumeJBOD.h index 52eb2f00721..621125f1109 100644 --- a/src/Disks/VolumeJBOD.h +++ b/src/Disks/VolumeJBOD.h @@ -1,10 +1,19 @@ #pragma once +#include +#include + #include + namespace DB { +class VolumeJBOD; + +using VolumeJBODPtr = std::shared_ptr; +using VolumesJBOD = std::vector; + /** * Implements something similar to JBOD (https://en.wikipedia.org/wiki/Non-RAID_drive_architectures#JBOD). * When MergeTree engine wants to write part — it requests VolumeJBOD to reserve space on the next available @@ -13,8 +22,9 @@ namespace DB class VolumeJBOD : public IVolume { public: - VolumeJBOD(String name_, Disks disks_, UInt64 max_data_part_size_) + VolumeJBOD(String name_, Disks disks_, UInt64 max_data_part_size_, bool are_merges_avoided_) : IVolume(name_, disks_, max_data_part_size_) + , are_merges_avoided(are_merges_avoided_) { } @@ -25,6 +35,13 @@ public: DiskSelectorPtr disk_selector ); + VolumeJBOD( + const VolumeJBOD & volume_jbod, + const Poco::Util::AbstractConfiguration & config, + const String & config_prefix, + DiskSelectorPtr disk_selector + ); + VolumeType getType() const override { return VolumeType::JBOD; } /// Always returns next disk (round-robin), ignores argument. @@ -38,11 +55,19 @@ public: /// Returns valid reservation or nullptr if there is no space left on any disk. ReservationPtr reserve(UInt64 bytes) override; + bool areMergesAvoided() const override; + + void setAvoidMergesUserOverride(bool avoid) override; + + /// True if parts on this volume participate in merges according to configuration. + bool are_merges_avoided = true; + private: + /// Index of last used disk. mutable std::atomic last_used = 0; + + /// True if parts on this volume participate in merges according to START/STOP MERGES ON VOLUME. + std::atomic> are_merges_avoided_user_override{std::nullopt}; }; -using VolumeJBODPtr = std::shared_ptr; -using VolumesJBOD = std::vector; - } diff --git a/src/Disks/VolumeRAID1.h b/src/Disks/VolumeRAID1.h index 58cb5bd2623..f6f2d245a49 100644 --- a/src/Disks/VolumeRAID1.h +++ b/src/Disks/VolumeRAID1.h @@ -3,18 +3,23 @@ #include #include + namespace DB { -/// Volume which reserserves space on each underlying disk. +class VolumeRAID1; + +using VolumeRAID1Ptr = std::shared_ptr; + +/// Volume which reserves space on each underlying disk. /// /// NOTE: Just interface implementation, doesn't used in codebase, /// also not available for user. class VolumeRAID1 : public VolumeJBOD { public: - VolumeRAID1(String name_, Disks disks_, UInt64 max_data_part_size_) - : VolumeJBOD(name_, disks_, max_data_part_size_) + VolumeRAID1(String name_, Disks disks_, UInt64 max_data_part_size_, bool are_merges_avoided_in_config_) + : VolumeJBOD(name_, disks_, max_data_part_size_, are_merges_avoided_in_config_) { } @@ -27,11 +32,18 @@ public: { } + VolumeRAID1( + VolumeRAID1 & volume_raid1, + const Poco::Util::AbstractConfiguration & config, + const String & config_prefix, + DiskSelectorPtr disk_selector) + : VolumeJBOD(volume_raid1, config, config_prefix, disk_selector) + { + } + VolumeType getType() const override { return VolumeType::RAID1; } ReservationPtr reserve(UInt64 bytes) override; }; -using VolumeRAID1Ptr = std::shared_ptr; - } diff --git a/src/Disks/createVolume.cpp b/src/Disks/createVolume.cpp index 90ed333406e..a290a1d3db3 100644 --- a/src/Disks/createVolume.cpp +++ b/src/Disks/createVolume.cpp @@ -12,6 +12,7 @@ namespace DB namespace ErrorCodes { extern const int UNKNOWN_RAID_TYPE; + extern const int INVALID_RAID_TYPE; } VolumePtr createVolumeFromReservation(const ReservationPtr & reservation, VolumePtr other_volume) @@ -20,12 +21,12 @@ VolumePtr createVolumeFromReservation(const ReservationPtr & reservation, Volume { /// Since reservation on JBOD chooses one of disks and makes reservation there, volume /// for such type of reservation will be with one disk. - return std::make_shared(other_volume->getName(), reservation->getDisk()); + return std::make_shared(other_volume->getName(), reservation->getDisk(), other_volume->max_data_part_size); } if (other_volume->getType() == VolumeType::RAID1) { auto volume = std::dynamic_pointer_cast(other_volume); - return std::make_shared(volume->getName(), reservation->getDisks(), volume->max_data_part_size); + return std::make_shared(volume->getName(), reservation->getDisks(), volume->max_data_part_size, volume->are_merges_avoided); } return nullptr; } @@ -37,17 +38,31 @@ VolumePtr createVolumeFromConfig( DiskSelectorPtr disk_selector ) { - auto has_raid_type = config.has(config_prefix + ".raid_type"); - if (!has_raid_type) - { - return std::make_shared(name, config, config_prefix, disk_selector); - } - String raid_type = config.getString(config_prefix + ".raid_type"); + String raid_type = config.getString(config_prefix + ".raid_type", "JBOD"); if (raid_type == "JBOD") { return std::make_shared(name, config, config_prefix, disk_selector); } - throw Exception("Unknown raid type '" + raid_type + "'", ErrorCodes::UNKNOWN_RAID_TYPE); + throw Exception("Unknown RAID type '" + raid_type + "'", ErrorCodes::UNKNOWN_RAID_TYPE); +} + +VolumePtr updateVolumeFromConfig( + VolumePtr volume, + const Poco::Util::AbstractConfiguration & config, + const String & config_prefix, + DiskSelectorPtr & disk_selector +) +{ + String raid_type = config.getString(config_prefix + ".raid_type", "JBOD"); + if (raid_type == "JBOD") + { + VolumeJBODPtr volume_jbod = std::dynamic_pointer_cast(volume); + if (!volume_jbod) + throw Exception("Invalid RAID type '" + raid_type + "', shall be JBOD", ErrorCodes::INVALID_RAID_TYPE); + + return std::make_shared(*volume_jbod, config, config_prefix, disk_selector); + } + throw Exception("Unknown RAID type '" + raid_type + "'", ErrorCodes::UNKNOWN_RAID_TYPE); } } diff --git a/src/Disks/createVolume.h b/src/Disks/createVolume.h index 64f5e73181b..479501759d1 100644 --- a/src/Disks/createVolume.h +++ b/src/Disks/createVolume.h @@ -6,6 +6,7 @@ namespace DB { VolumePtr createVolumeFromReservation(const ReservationPtr & reservation, VolumePtr other_volume); + VolumePtr createVolumeFromConfig( String name_, const Poco::Util::AbstractConfiguration & config, @@ -13,4 +14,11 @@ VolumePtr createVolumeFromConfig( DiskSelectorPtr disk_selector ); +VolumePtr updateVolumeFromConfig( + VolumePtr volume, + const Poco::Util::AbstractConfiguration & config, + const String & config_prefix, + DiskSelectorPtr & disk_selector +); + } diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 4295972ef80..9c1f253f820 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -590,7 +590,7 @@ VolumePtr Context::setTemporaryStorage(const String & path, const String & polic shared->tmp_path += '/'; auto disk = std::make_shared("_tmp_default", shared->tmp_path, 0); - shared->tmp_volume = std::make_shared("_tmp_default", disk); + shared->tmp_volume = std::make_shared("_tmp_default", disk, 0); } else { diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index 4bfa84090c2..f0a8ce9064d 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -133,7 +133,11 @@ void InterpreterSystemQuery::startStopAction(StorageActionBlockType action_type, auto manager = context.getActionLocksManager(); manager->cleanExpired(); - if (table_id) + if (volume_ptr && action_type == ActionLocks::PartsMerge) + { + volume_ptr->setAvoidMergesUserOverride(!start); + } + else if (table_id) { context.checkAccess(getRequiredAccessType(action_type), table_id); if (start) @@ -199,6 +203,10 @@ BlockIO InterpreterSystemQuery::execute() if (!query.target_dictionary.empty() && !query.database.empty()) query.target_dictionary = query.database + "." + query.target_dictionary; + volume_ptr = {}; + if (!query.storage_policy.empty() && !query.volume.empty()) + volume_ptr = context.getStoragePolicy(query.storage_policy)->getVolumeByName(query.volume); + switch (query.type) { case Type::SHUTDOWN: diff --git a/src/Interpreters/InterpreterSystemQuery.h b/src/Interpreters/InterpreterSystemQuery.h index 8e3578dfb2f..6fd96c15a2e 100644 --- a/src/Interpreters/InterpreterSystemQuery.h +++ b/src/Interpreters/InterpreterSystemQuery.h @@ -5,6 +5,7 @@ #include #include #include +#include namespace Poco { class Logger; } @@ -44,6 +45,7 @@ private: Context & context; Poco::Logger * log = nullptr; StorageID table_id = StorageID::createEmpty(); /// Will be set up if query contains table name + VolumePtr volume_ptr; /// Tries to get a replicated table and restart it /// Returns pointer to a newly created table if the restart was successful diff --git a/src/Parsers/ASTSystemQuery.cpp b/src/Parsers/ASTSystemQuery.cpp index 5ed1757e1ce..9cbb6ae94f6 100644 --- a/src/Parsers/ASTSystemQuery.cpp +++ b/src/Parsers/ASTSystemQuery.cpp @@ -118,7 +118,8 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &, << (settings.hilite ? hilite_none : ""); }; - auto print_drop_replica = [&] { + auto print_drop_replica = [&] + { settings.ostr << " " << quoteString(replica); if (!table.empty()) { @@ -140,6 +141,16 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &, } }; + auto print_on_volume = [&] + { + settings.ostr << " ON VOLUME " + << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(storage_policy) + << (settings.hilite ? hilite_none : "") + << "." + << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(volume) + << (settings.hilite ? hilite_none : ""); + }; + if (!cluster.empty()) formatOnCluster(settings); @@ -160,6 +171,8 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &, { if (!table.empty()) print_database_table(); + else if (!volume.empty()) + print_on_volume(); } else if (type == Type::RESTART_REPLICA || type == Type::SYNC_REPLICA || type == Type::FLUSH_DISTRIBUTED) { diff --git a/src/Parsers/ASTSystemQuery.h b/src/Parsers/ASTSystemQuery.h index b2ffa706e19..f8f803e1c0c 100644 --- a/src/Parsers/ASTSystemQuery.h +++ b/src/Parsers/ASTSystemQuery.h @@ -65,6 +65,8 @@ public: String replica; String replica_zk_path; bool is_drop_whole_replica; + String storage_policy; + String volume; String getID(char) const override { return "SYSTEM query"; } diff --git a/src/Parsers/ParserSystemQuery.cpp b/src/Parsers/ParserSystemQuery.cpp index a98ca2d4922..296f4187e3a 100644 --- a/src/Parsers/ParserSystemQuery.cpp +++ b/src/Parsers/ParserSystemQuery.cpp @@ -129,6 +129,33 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & case Type::STOP_MERGES: case Type::START_MERGES: + { + String storage_policy_str; + String volume_str; + + if (ParserKeyword{"ON VOLUME"}.ignore(pos, expected)) + { + ASTPtr ast; + if (ParserIdentifier{}.parse(pos, ast, expected)) + storage_policy_str = ast->as().name; + else + return false; + + if (!ParserToken{TokenType::Dot}.ignore(pos, expected)) + return false; + + if (ParserIdentifier{}.parse(pos, ast, expected)) + volume_str = ast->as().name; + else + return false; + } + res->storage_policy = storage_policy_str; + res->volume = volume_str; + if (res->volume.empty() && res->storage_policy.empty()) + parseDatabaseAndTableName(pos, expected, res->database, res->table); + break; + } + case Type::STOP_TTL_MERGES: case Type::START_TTL_MERGES: case Type::STOP_MOVES: diff --git a/src/Storages/MergeTree/DataPartsExchange.cpp b/src/Storages/MergeTree/DataPartsExchange.cpp index 3da0e203f14..0e79404e59d 100644 --- a/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/src/Storages/MergeTree/DataPartsExchange.cpp @@ -311,7 +311,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory( NativeBlockInputStream block_in(in, 0); auto block = block_in.read(); - auto volume = std::make_shared("volume_" + part_name, reservation->getDisk()); + auto volume = std::make_shared("volume_" + part_name, reservation->getDisk(), 0); MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared(data, part_name, volume); @@ -408,7 +408,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk( assertEOF(in); - auto volume = std::make_shared("volume_" + part_name, disk); + auto volume = std::make_shared("volume_" + part_name, disk, 0); MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(part_name, volume, part_relative_path); new_data_part->is_temp = true; new_data_part->modification_time = time(nullptr); diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index 7538c194f95..27d306e1642 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -760,6 +760,16 @@ void IMergeTreeDataPart::loadColumns(bool require) column_name_to_position.emplace(column.name, pos++); } +bool IMergeTreeDataPart::shallParticipateInMerges(const StoragePolicyPtr & storage_policy) const +{ + /// `IMergeTreeDataPart::volume` describes space where current part belongs, and holds + /// `SingleDiskVolume` object which does not contain up-to-date settings of corresponding volume. + /// Therefore we shall obtain volume from storage policy. + auto volume_ptr = storage_policy->getVolume(storage_policy->getVolumeIndexByDisk(volume->getDisk())); + + return !volume_ptr->areMergesAvoided(); +} + UInt64 IMergeTreeDataPart::calculateTotalSizeOnDisk(const DiskPtr & disk_, const String & from) { if (disk_->isFile(from)) diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h index b45691a5ed6..202d9494247 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.h +++ b/src/Storages/MergeTree/IMergeTreeDataPart.h @@ -324,7 +324,11 @@ public: /// NOTE: Doesn't take column renames into account, if some column renames /// take place, you must take original name of column for this part from /// storage and pass it to this method. - virtual bool hasColumnFiles(const String & /* column */, const IDataType & /* type */) const{ return false; } + virtual bool hasColumnFiles(const String & /* column */, const IDataType & /* type */) const { return false; } + + /// Returns true if this part shall participate in merges according to + /// settings of given storage policy. + bool shallParticipateInMerges(const StoragePolicyPtr & storage_policy) const; /// Calculate the total size of the entire directory with all the files static UInt64 calculateTotalSizeOnDisk(const DiskPtr & disk_, const String & from); diff --git a/src/Storages/MergeTree/MergeSelector.h b/src/Storages/MergeTree/MergeSelector.h index fcdfcf5b890..5a92b4c5dd6 100644 --- a/src/Storages/MergeTree/MergeSelector.h +++ b/src/Storages/MergeTree/MergeSelector.h @@ -48,6 +48,8 @@ public: /// Part compression codec definition. ASTPtr compression_codec_desc; + + bool shall_participate_in_merges = true; }; /// Parts are belong to partitions. Only parts within same partition could be merged. diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index a0c1de756be..cd776a661ed 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -776,7 +776,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) if (!MergeTreePartInfo::tryParsePartName(part_name, &part_info, format_version)) return; - auto single_disk_volume = std::make_shared("volume_" + part_name, part_disk_ptr); + auto single_disk_volume = std::make_shared("volume_" + part_name, part_disk_ptr, 0); auto part = createPart(part_name, part_info, single_disk_volume, part_name); bool broken = false; @@ -2996,7 +2996,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const for (const auto & part_names : renamed_parts.old_and_new_names) { LOG_DEBUG(log, "Checking part {}", part_names.second); - auto single_disk_volume = std::make_shared("volume_" + part_names.first, name_to_disk[part_names.first]); + auto single_disk_volume = std::make_shared("volume_" + part_names.first, name_to_disk[part_names.first], 0); MutableDataPartPtr part = createPart(part_names.first, single_disk_volume, source_dir + part_names.second); loadPartAndFixMetadataImpl(part); loaded_parts.push_back(part); @@ -3409,7 +3409,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPartOnSameDisk( localBackup(disk, src_part_path, dst_part_path); disk->removeIfExists(dst_part_path + "/" + IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME); - auto single_disk_volume = std::make_shared(disk->getName(), disk); + auto single_disk_volume = std::make_shared(disk->getName(), disk, 0); auto dst_data_part = createPart(dst_part_name, dst_part_info, single_disk_volume, tmp_dst_part_name); dst_data_part->is_temp = true; diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index fb0a488700c..df42f164e34 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -226,6 +226,8 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge( IMergeSelector::PartsRanges parts_ranges; + StoragePolicyPtr storage_policy = data.getStoragePolicy(); + const String * prev_partition_id = nullptr; /// Previous part only in boundaries of partition frame const MergeTreeData::DataPartPtr * prev_part = nullptr; @@ -275,6 +277,7 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge( part_info.data = ∂ part_info.ttl_infos = &part->ttl_infos; part_info.compression_codec_desc = part->default_codec->getFullCodecDesc(); + part_info.shall_participate_in_merges = part->shallParticipateInMerges(storage_policy); parts_ranges.back().emplace_back(part_info); @@ -667,7 +670,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor merging_columns, merging_column_names); - auto single_disk_volume = std::make_shared("volume_" + future_part.name, disk); + auto single_disk_volume = std::make_shared("volume_" + future_part.name, disk, 0); MergeTreeData::MutableDataPartPtr new_data_part = data.createPart( future_part.name, future_part.type, @@ -1127,7 +1130,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor in->setProgressCallback(MergeProgressCallback(merge_entry, watch_prev_elapsed, stage_progress)); } - auto single_disk_volume = std::make_shared("volume_" + future_part.name, space_reservation->getDisk()); + auto single_disk_volume = std::make_shared("volume_" + future_part.name, space_reservation->getDisk(), 0); auto new_data_part = data.createPart( future_part.name, future_part.type, future_part.part_info, single_disk_volume, "tmp_mut_" + future_part.name); diff --git a/src/Storages/MergeTree/MergeTreePartsMover.cpp b/src/Storages/MergeTree/MergeTreePartsMover.cpp index 2420c1576d2..7b8c88b1bff 100644 --- a/src/Storages/MergeTree/MergeTreePartsMover.cpp +++ b/src/Storages/MergeTree/MergeTreePartsMover.cpp @@ -199,7 +199,7 @@ MergeTreeData::DataPartPtr MergeTreePartsMover::clonePart(const MergeTreeMoveEnt const String directory_to_move = "moving"; moving_part.part->makeCloneOnDisk(moving_part.reserved_space->getDisk(), directory_to_move); - auto single_disk_volume = std::make_shared("volume_" + moving_part.part->name, moving_part.reserved_space->getDisk()); + auto single_disk_volume = std::make_shared("volume_" + moving_part.part->name, moving_part.reserved_space->getDisk(), 0); MergeTreeData::MutableDataPartPtr cloned_part = data->createPart(moving_part.part->name, single_disk_volume, directory_to_move + '/' + moving_part.part->name); LOG_TRACE(log, "Part {} was cloned to {}", moving_part.part->name, cloned_part->getFullPath()); diff --git a/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp b/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp index 4c445735a97..1d133f73a7b 100644 --- a/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp +++ b/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp @@ -130,7 +130,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const Stor else if (action_type == ActionType::ADD_PART) { auto part_disk = storage.reserveSpace(0)->getDisk(); - auto single_disk_volume = std::make_shared("volume_" + part_name, disk); + auto single_disk_volume = std::make_shared("volume_" + part_name, disk, 0); part = storage.createPart( part_name, diff --git a/src/Storages/MergeTree/SimpleMergeSelector.cpp b/src/Storages/MergeTree/SimpleMergeSelector.cpp index cbb24d1494e..c8c38e73a33 100644 --- a/src/Storages/MergeTree/SimpleMergeSelector.cpp +++ b/src/Storages/MergeTree/SimpleMergeSelector.cpp @@ -1,4 +1,5 @@ #include + #include #include @@ -152,6 +153,9 @@ void selectWithinPartition( if (begin > 1000) break; + if (!parts[begin].shall_participate_in_merges) + continue; + size_t sum_size = parts[begin].size; size_t max_size = parts[begin].size; size_t min_age = parts[begin].age; @@ -161,6 +165,9 @@ void selectWithinPartition( if (settings.max_parts_to_merge_at_once && end - begin > settings.max_parts_to_merge_at_once) break; + if (!parts[end - 1].shall_participate_in_merges) + break; + size_t cur_size = parts[end - 1].size; size_t cur_age = parts[end - 1].age; diff --git a/src/Storages/MergeTree/TTLMergeSelector.cpp b/src/Storages/MergeTree/TTLMergeSelector.cpp index 7f76da085c9..fc7aa93e129 100644 --- a/src/Storages/MergeTree/TTLMergeSelector.cpp +++ b/src/Storages/MergeTree/TTLMergeSelector.cpp @@ -25,6 +25,7 @@ IMergeSelector::PartsRange ITTLMergeSelector::select( ssize_t partition_to_merge_index = -1; time_t partition_to_merge_min_ttl = 0; + /// Find most old TTL. for (size_t i = 0; i < parts_ranges.size(); ++i) { const auto & mergeable_parts_in_partition = parts_ranges[i]; @@ -56,6 +57,7 @@ IMergeSelector::PartsRange ITTLMergeSelector::select( Iterator best_end = best_begin + 1; size_t total_size = 0; + /// Find begin of range with most old TTL. while (true) { time_t ttl = getTTLForPart(*best_begin); @@ -63,6 +65,7 @@ IMergeSelector::PartsRange ITTLMergeSelector::select( if (!ttl || isTTLAlreadySatisfied(*best_begin) || ttl > current_time || (max_total_size_to_merge && total_size > max_total_size_to_merge)) { + /// This condition can not be satisfied on first iteration. ++best_begin; break; } @@ -74,6 +77,7 @@ IMergeSelector::PartsRange ITTLMergeSelector::select( --best_begin; } + /// Find end of range with most old TTL. while (best_end != best_partition.end()) { time_t ttl = getTTLForPart(*best_end); @@ -97,6 +101,19 @@ time_t TTLDeleteMergeSelector::getTTLForPart(const IMergeSelector::Part & part) return only_drop_parts ? part.ttl_infos->part_max_ttl : part.ttl_infos->part_min_ttl; } +bool TTLDeleteMergeSelector::isTTLAlreadySatisfied(const IMergeSelector::Part & part) const +{ + /// N.B. Satisfied TTL means that TTL is NOT expired. + /// return true -- this part can not be selected + /// return false -- this part can be selected + + /// Dropping whole part is an exception to `shall_participate_in_merges` logic. + if (only_drop_parts) + return false; + + return !part.shall_participate_in_merges; +} + time_t TTLRecompressMergeSelector::getTTLForPart(const IMergeSelector::Part & part) const { return part.ttl_infos->getMinimalMaxRecompressionTTL(); @@ -104,6 +121,13 @@ time_t TTLRecompressMergeSelector::getTTLForPart(const IMergeSelector::Part & pa bool TTLRecompressMergeSelector::isTTLAlreadySatisfied(const IMergeSelector::Part & part) const { + /// N.B. Satisfied TTL means that TTL is NOT expired. + /// return true -- this part can not be selected + /// return false -- this part can be selected + + if (!part.shall_participate_in_merges) + return true; + if (recompression_ttls.empty()) return false; diff --git a/src/Storages/MergeTree/TTLMergeSelector.h b/src/Storages/MergeTree/TTLMergeSelector.h index 73d364f28c7..c294687cdc5 100644 --- a/src/Storages/MergeTree/TTLMergeSelector.h +++ b/src/Storages/MergeTree/TTLMergeSelector.h @@ -64,10 +64,7 @@ public: /// Delete TTL should be checked only by TTL time, there are no other ways /// to satisfy it. - bool isTTLAlreadySatisfied(const IMergeSelector::Part &) const override - { - return false; - } + bool isTTLAlreadySatisfied(const IMergeSelector::Part &) const override; private: bool only_drop_parts; diff --git a/src/Storages/System/StorageSystemStoragePolicies.cpp b/src/Storages/System/StorageSystemStoragePolicies.cpp index 415e7ce2c78..86ce9de081c 100644 --- a/src/Storages/System/StorageSystemStoragePolicies.cpp +++ b/src/Storages/System/StorageSystemStoragePolicies.cpp @@ -29,6 +29,7 @@ StorageSystemStoragePolicies::StorageSystemStoragePolicies(const StorageID & tab {"volume_type", std::make_shared()}, {"max_data_part_size", std::make_shared()}, {"move_factor", std::make_shared()}, + {"prefer_not_to_merge", std::make_shared()} })); // TODO: Add string column with custom volume-type-specific options setInMemoryMetadata(storage_metadata); @@ -52,6 +53,7 @@ Pipe StorageSystemStoragePolicies::read( MutableColumnPtr col_volume_type = ColumnString::create(); MutableColumnPtr col_max_part_size = ColumnUInt64::create(); MutableColumnPtr col_move_factor = ColumnFloat32::create(); + MutableColumnPtr col_prefer_not_to_merge = ColumnUInt8::create(); for (const auto & [policy_name, policy_ptr] : context.getPoliciesMap()) { @@ -69,6 +71,7 @@ Pipe StorageSystemStoragePolicies::read( col_volume_type->insert(volumeTypeToString(volumes[i]->getType())); col_max_part_size->insert(volumes[i]->max_data_part_size); col_move_factor->insert(policy_ptr->getMoveFactor()); + col_prefer_not_to_merge->insert(volumes[i]->areMergesAvoided() ? 1 : 0); } } @@ -80,6 +83,7 @@ Pipe StorageSystemStoragePolicies::read( res_columns.emplace_back(std::move(col_volume_type)); res_columns.emplace_back(std::move(col_max_part_size)); res_columns.emplace_back(std::move(col_move_factor)); + res_columns.emplace_back(std::move(col_prefer_not_to_merge)); UInt64 num_rows = res_columns.at(0)->size(); Chunk chunk(std::move(res_columns), num_rows); diff --git a/tests/integration/test_multiple_disks/configs/config.d/storage_configuration.xml b/tests/integration/test_multiple_disks/configs/config.d/storage_configuration.xml index 9abbdd26650..c04106221f7 100644 --- a/tests/integration/test_multiple_disks/configs/config.d/storage_configuration.xml +++ b/tests/integration/test_multiple_disks/configs/config.d/storage_configuration.xml @@ -30,6 +30,18 @@ + + +
+ jbod1 +
+ + external + true + +
+
+ diff --git a/tests/integration/test_multiple_disks/test.py b/tests/integration/test_multiple_disks/test.py index 5058bcf368e..496b34f22f0 100644 --- a/tests/integration/test_multiple_disks/test.py +++ b/tests/integration/test_multiple_disks/test.py @@ -76,6 +76,7 @@ def test_system_tables(start_cluster): "volume_type": "JBOD", "max_data_part_size": "0", "move_factor": 0.1, + "prefer_not_to_merge": 0, }, { "policy_name": "small_jbod_with_external", @@ -85,6 +86,27 @@ def test_system_tables(start_cluster): "volume_type": "JBOD", "max_data_part_size": "0", "move_factor": 0.1, + "prefer_not_to_merge": 0, + }, + { + "policy_name": "small_jbod_with_external_no_merges", + "volume_name": "main", + "volume_priority": "1", + "disks": ["jbod1"], + "volume_type": "JBOD", + "max_data_part_size": "0", + "move_factor": 0.1, + "prefer_not_to_merge": 0, + }, + { + "policy_name": "small_jbod_with_external_no_merges", + "volume_name": "external", + "volume_priority": "2", + "disks": ["external"], + "volume_type": "JBOD", + "max_data_part_size": "0", + "move_factor": 0.1, + "prefer_not_to_merge": 1, }, { "policy_name": "one_more_small_jbod_with_external", @@ -94,6 +116,7 @@ def test_system_tables(start_cluster): "volume_type": "JBOD", "max_data_part_size": "0", "move_factor": 0.1, + "prefer_not_to_merge": 0, }, { "policy_name": "one_more_small_jbod_with_external", @@ -103,6 +126,7 @@ def test_system_tables(start_cluster): "volume_type": "JBOD", "max_data_part_size": "0", "move_factor": 0.1, + "prefer_not_to_merge": 0, }, { "policy_name": "jbods_with_external", @@ -112,6 +136,7 @@ def test_system_tables(start_cluster): "volume_type": "JBOD", "max_data_part_size": "10485760", "move_factor": 0.1, + "prefer_not_to_merge": 0, }, { "policy_name": "jbods_with_external", @@ -121,6 +146,7 @@ def test_system_tables(start_cluster): "volume_type": "JBOD", "max_data_part_size": "0", "move_factor": 0.1, + "prefer_not_to_merge": 0, }, { "policy_name": "moving_jbod_with_external", @@ -130,6 +156,7 @@ def test_system_tables(start_cluster): "volume_type": "JBOD", "max_data_part_size": "0", "move_factor": 0.7, + "prefer_not_to_merge": 0, }, { "policy_name": "moving_jbod_with_external", @@ -139,6 +166,7 @@ def test_system_tables(start_cluster): "volume_type": "JBOD", "max_data_part_size": "0", "move_factor": 0.7, + "prefer_not_to_merge": 0, }, { "policy_name": "default_disk_with_external", @@ -148,6 +176,7 @@ def test_system_tables(start_cluster): "volume_type": "JBOD", "max_data_part_size": "2097152", "move_factor": 0.1, + "prefer_not_to_merge": 0, }, { "policy_name": "default_disk_with_external", @@ -157,6 +186,7 @@ def test_system_tables(start_cluster): "volume_type": "JBOD", "max_data_part_size": "20971520", "move_factor": 0.1, + "prefer_not_to_merge": 0, }, { "policy_name": "special_warning_policy", @@ -166,6 +196,7 @@ def test_system_tables(start_cluster): "volume_type": "JBOD", "max_data_part_size": "0", "move_factor": 0.1, + "prefer_not_to_merge": 0, }, { "policy_name": "special_warning_policy", @@ -175,6 +206,7 @@ def test_system_tables(start_cluster): "volume_type": "JBOD", "max_data_part_size": "0", "move_factor": 0.1, + "prefer_not_to_merge": 0, }, { "policy_name": "special_warning_policy", @@ -184,6 +216,7 @@ def test_system_tables(start_cluster): "volume_type": "JBOD", "max_data_part_size": "1024", "move_factor": 0.1, + "prefer_not_to_merge": 0, }, { "policy_name": "special_warning_policy", @@ -193,6 +226,7 @@ def test_system_tables(start_cluster): "volume_type": "JBOD", "max_data_part_size": "1024000000", "move_factor": 0.1, + "prefer_not_to_merge": 0, }, ] @@ -306,6 +340,9 @@ def get_used_disks_for_table(node, table_name): table_name)).strip().split('\n') +def get_used_parts_for_table(node, table_name): + return node.query("SELECT name FROM system.parts WHERE table = '{}' AND active = 1 ORDER BY modification_time".format(table_name)).splitlines() + def test_no_warning_about_zero_max_data_part_size(start_cluster): def get_log(node): return node.exec_in_container(["bash", "-c", "cat /var/log/clickhouse-server/clickhouse-server.log"]) @@ -370,6 +407,8 @@ def test_round_robin(start_cluster, name, engine): ]) def test_max_data_part_size(start_cluster, name, engine): try: + assert int(*node1.query("""SELECT max_data_part_size FROM system.storage_policies WHERE policy_name = 'jbods_with_external' AND volume_name = 'main'""").splitlines()) == 10*1024*1024 + node1.query(""" CREATE TABLE {name} ( s1 String @@ -832,7 +871,7 @@ def test_concurrent_alter_move(start_cluster, name, engine): tasks.append(p.apply_async(optimize_table, (100,))) for task in tasks: - task.get(timeout=120) + task.get(timeout=240) assert node1.query("SELECT 1") == "1\n" assert node1.query("SELECT COUNT() FROM {}".format(name)) == "500\n" @@ -1263,8 +1302,7 @@ def test_move_while_merge(start_cluster): node1.query("INSERT INTO {name} VALUES (1)".format(name=name)) node1.query("INSERT INTO {name} VALUES (2)".format(name=name)) - parts = node1.query( - "SELECT name FROM system.parts WHERE table = '{name}' AND active = 1".format(name=name)).splitlines() + parts = get_used_parts_for_table(node1, name) assert len(parts) == 2 def optimize(): @@ -1329,7 +1367,10 @@ def test_move_across_policies_does_not_work(start_cluster): """.format(name=name)) node1.query("""INSERT INTO {name} VALUES (1)""".format(name=name)) - node1.query("""ALTER TABLE {name} MOVE PARTITION tuple() TO DISK 'jbod2'""".format(name=name)) + try: + node1.query("""ALTER TABLE {name} MOVE PARTITION tuple() TO DISK 'jbod2'""".format(name=name)) + except QueryRuntimeException: + """All parts of partition 'all' are already on disk 'jbod2'.""" with pytest.raises(QueryRuntimeException, match='.*because disk does not belong to storage policy.*'): node1.query("""ALTER TABLE {name}2 ATTACH PARTITION tuple() FROM {name}""".format(name=name)) @@ -1345,3 +1386,160 @@ def test_move_across_policies_does_not_work(start_cluster): finally: node1.query("DROP TABLE IF EXISTS {name}".format(name=name)) node1.query("DROP TABLE IF EXISTS {name}2".format(name=name)) + + +def _insert_merge_execute(node, name, policy, parts, cmds, parts_before_cmds, parts_after_cmds): + try: + node.query(""" + CREATE TABLE {name} ( + n Int64 + ) ENGINE = MergeTree + ORDER BY tuple() + PARTITION BY tuple() + TTL now()-1 TO VOLUME 'external' + SETTINGS storage_policy='{policy}' + """.format(name=name, policy=policy)) + + for i in range(parts): + node.query("""INSERT INTO {name} VALUES ({n})""".format(name=name, n=i)) + + disks = get_used_disks_for_table(node, name) + assert set(disks) == {"external"} + + node.query("""OPTIMIZE TABLE {name}""".format(name=name)) + + parts = get_used_parts_for_table(node, name) + assert len(parts) == parts_before_cmds + + for cmd in cmds: + node.query(cmd) + + node.query("""OPTIMIZE TABLE {name}""".format(name=name)) + + parts = get_used_parts_for_table(node, name) + assert len(parts) == parts_after_cmds + + finally: + node.query("DROP TABLE IF EXISTS {name}".format(name=name)) + + +def _check_merges_are_working(node, storage_policy, volume, shall_work): + try: + name = "_check_merges_are_working_{storage_policy}_{volume}".format(storage_policy=storage_policy, volume=volume) + + node.query(""" + CREATE TABLE {name} ( + n Int64 + ) ENGINE = MergeTree + ORDER BY tuple() + PARTITION BY tuple() + SETTINGS storage_policy='{storage_policy}' + """.format(name=name, storage_policy=storage_policy)) + + created_parts = 24 + + for i in range(created_parts): + node.query("""INSERT INTO {name} VALUES ({n})""".format(name=name, n=i)) + try: + node.query("""ALTER TABLE {name} MOVE PARTITION tuple() TO VOLUME '{volume}' """.format(name=name, volume=volume)) + except: + """Ignore 'nothing to move'.""" + + expected_disks = set(node.query(""" + SELECT disks FROM system.storage_policies ARRAY JOIN disks WHERE volume_name = '{volume_name}' + """.format(volume_name=volume)).splitlines()) + + disks = get_used_disks_for_table(node, name) + assert set(disks) <= expected_disks + + node.query("""OPTIMIZE TABLE {name} FINAL""".format(name=name)) + + parts = get_used_parts_for_table(node, name) + assert len(parts) == 1 if shall_work else created_parts + + finally: + node.query("DROP TABLE IF EXISTS {name}".format(name=name)) + + +def _get_prefer_not_to_merge_for_storage_policy(node, storage_policy): + return list(map(int, node.query("SELECT prefer_not_to_merge FROM system.storage_policies WHERE policy_name = '{}' ORDER BY volume_priority".format(storage_policy)).splitlines())) + + +def test_simple_merge_tree_merges_are_disabled(start_cluster): + _check_merges_are_working(node1, "small_jbod_with_external_no_merges", "external", False) + + +def test_no_merges_in_configuration_allow_from_query_without_reload(start_cluster): + try: + name = "test_no_merges_in_configuration_allow_from_query_without_reload" + policy = "small_jbod_with_external_no_merges" + node1.restart_clickhouse(kill=True) + assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 1] + _check_merges_are_working(node1, policy, "external", False) + + _insert_merge_execute(node1, name, policy, 2, [ + "SYSTEM START MERGES ON VOLUME {}.external".format(policy) + ], 2, 1) + assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 0] + _check_merges_are_working(node1, policy, "external", True) + + finally: + node1.query("SYSTEM STOP MERGES ON VOLUME {}.external".format(policy)) + + +def test_no_merges_in_configuration_allow_from_query_with_reload(start_cluster): + try: + name = "test_no_merges_in_configuration_allow_from_query_with_reload" + policy = "small_jbod_with_external_no_merges" + node1.restart_clickhouse(kill=True) + assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 1] + _check_merges_are_working(node1, policy, "external", False) + + _insert_merge_execute(node1, name, policy, 2, [ + "SYSTEM START MERGES ON VOLUME {}.external".format(policy), + "SYSTEM RELOAD CONFIG" + ], 2, 1) + assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 0] + _check_merges_are_working(node1, policy, "external", True) + + finally: + node1.query("SYSTEM STOP MERGES ON VOLUME {}.external".format(policy)) + + +def test_yes_merges_in_configuration_disallow_from_query_without_reload(start_cluster): + try: + name = "test_yes_merges_in_configuration_allow_from_query_without_reload" + policy = "small_jbod_with_external" + node1.restart_clickhouse(kill=True) + assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 0] + _check_merges_are_working(node1, policy, "external", True) + + _insert_merge_execute(node1, name, policy, 2, [ + "SYSTEM STOP MERGES ON VOLUME {}.external".format(policy), + "INSERT INTO {name} VALUES (2)".format(name=name) + ], 1, 2) + assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 1] + _check_merges_are_working(node1, policy, "external", False) + + finally: + node1.query("SYSTEM START MERGES ON VOLUME {}.external".format(policy)) + + +def test_yes_merges_in_configuration_disallow_from_query_with_reload(start_cluster): + try: + name = "test_yes_merges_in_configuration_allow_from_query_with_reload" + policy = "small_jbod_with_external" + node1.restart_clickhouse(kill=True) + assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 0] + _check_merges_are_working(node1, policy, "external", True) + + _insert_merge_execute(node1, name, policy, 2, [ + "SYSTEM STOP MERGES ON VOLUME {}.external".format(policy), + "INSERT INTO {name} VALUES (2)".format(name=name), + "SYSTEM RELOAD CONFIG" + ], 1, 2) + assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 1] + _check_merges_are_working(node1, policy, "external", False) + + finally: + node1.query("SYSTEM START MERGES ON VOLUME {}.external".format(policy))