Added disable_merges option for volumes in multi-disk configuration (#13956)

Co-authored-by: Alexander Kazakov <Akazz@users.noreply.github.com>
This commit is contained in:
Vladimir Chebotarev 2020-10-20 18:10:24 +03:00 committed by GitHub
parent b4f0e08369
commit aa5f207fd4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 609 additions and 119 deletions

View File

@ -510,6 +510,8 @@ namespace ErrorCodes
extern const int ROW_AND_ROWS_TOGETHER = 544;
extern const int FIRST_AND_NEXT_TOGETHER = 545;
extern const int NO_ROW_DELIMITER = 546;
extern const int INVALID_RAID_TYPE = 547;
extern const int UNKNOWN_VOLUME = 548;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -23,8 +23,11 @@ public:
DiskSelector(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const Context & context);
DiskSelector(const DiskSelector & from) : disks(from.disks) { }
DiskSelectorPtr
updateFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const Context & context) const;
DiskSelectorPtr updateFromConfig(
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
const Context & context
) const;
/// Get disk by name
DiskPtr get(const String & name) const;

View File

@ -9,7 +9,7 @@ namespace DB
{
namespace ErrorCodes
{
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
extern const int NO_ELEMENTS_IN_CONFIG;
extern const int INCONSISTENT_RESERVATIONS;
extern const int NO_RESERVATIONS_PROVIDED;
extern const int UNKNOWN_VOLUME_TYPE;
@ -51,7 +51,7 @@ IVolume::IVolume(
}
if (disks.empty())
throw Exception("Volume must contain at least one disk.", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
throw Exception("Volume must contain at least one disk", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
}
UInt64 IVolume::getMaxUnreservedFreeSpace() const

View File

@ -64,6 +64,12 @@ public:
virtual DiskPtr getDisk(size_t i) const { return disks[i]; }
const Disks & getDisks() const { return disks; }
/// Returns effective value of whether merges are allowed on this volume (true) or not (false).
virtual bool areMergesAvoided() const { return false; }
/// User setting for enabling and disabling merges on volume.
virtual void setAvoidMergesUserOverride(bool /*avoid*/) {}
protected:
Disks disks;
const String name;

View File

@ -8,7 +8,7 @@ namespace DB
class SingleDiskVolume : public IVolume
{
public:
SingleDiskVolume(const String & name_, DiskPtr disk): IVolume(name_, {disk})
SingleDiskVolume(const String & name_, DiskPtr disk, size_t max_data_part_size_ = 0): IVolume(name_, {disk}, max_data_part_size_)
{
}

View File

@ -11,6 +11,13 @@
#include <Poco/File.h>
namespace
{
const auto DEFAULT_STORAGE_POLICY_NAME = "default";
const auto DEFAULT_VOLUME_NAME = "default";
const auto DEFAULT_DISK_NAME = "default";
}
namespace DB
{
@ -18,11 +25,14 @@ namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
extern const int NO_ELEMENTS_IN_CONFIG;
extern const int UNKNOWN_DISK;
extern const int UNKNOWN_POLICY;
extern const int UNKNOWN_VOLUME;
extern const int LOGICAL_ERROR;
}
StoragePolicy::StoragePolicy(
String name_,
const Poco::Util::AbstractConfiguration & config,
@ -30,44 +40,42 @@ StoragePolicy::StoragePolicy(
DiskSelectorPtr disks)
: name(std::move(name_))
{
String volumes_prefix = config_prefix + ".volumes";
if (!config.has(volumes_prefix))
throw Exception("StoragePolicy must contain at least one volume (.volumes)", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
Poco::Util::AbstractConfiguration::Keys keys;
String volumes_prefix = config_prefix + ".volumes";
if (!config.has(volumes_prefix))
{
if (name != DEFAULT_STORAGE_POLICY_NAME)
throw Exception("Storage policy " + backQuote(name) + " must contain at least one volume (.volumes)", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
}
else
{
config.keys(volumes_prefix, keys);
}
for (const auto & attr_name : keys)
{
if (!std::all_of(attr_name.begin(), attr_name.end(), isWordCharASCII))
throw Exception(
"Volume name can contain only alphanumeric and '_' (" + attr_name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
volumes.push_back(std::make_shared<VolumeJBOD>(attr_name, config, volumes_prefix + "." + attr_name, disks));
if (volumes_names.find(attr_name) != volumes_names.end())
throw Exception("Volumes names must be unique (" + attr_name + " duplicated)", ErrorCodes::UNKNOWN_POLICY);
volumes_names[attr_name] = volumes.size() - 1;
"Volume name can contain only alphanumeric and '_' in storage policy " + backQuote(name) + " (" + attr_name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
volumes.emplace_back(createVolumeFromConfig(attr_name, config, volumes_prefix + "." + attr_name, disks));
}
if (volumes.empty() && name == DEFAULT_STORAGE_POLICY_NAME)
{
auto default_volume = std::make_shared<VolumeJBOD>(DEFAULT_VOLUME_NAME, std::vector<DiskPtr>{disks->get(DEFAULT_DISK_NAME)}, 0, false);
volumes.emplace_back(std::move(default_volume));
}
if (volumes.empty())
throw Exception("StoragePolicy must contain at least one volume.", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
throw Exception("Storage policy " + backQuote(name) + " must contain at least one volume.", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
/// Check that disks are unique in Policy
std::set<String> disk_names;
for (const auto & volume : volumes)
{
for (const auto & disk : volume->getDisks())
{
if (disk_names.find(disk->getName()) != disk_names.end())
throw Exception(
"Duplicate disk '" + disk->getName() + "' in storage policy '" + name + "'", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
disk_names.insert(disk->getName());
}
}
move_factor = config.getDouble(config_prefix + ".move_factor", 0.1);
const double default_move_factor = volumes.size() > 1 ? 0.1 : 0.0;
move_factor = config.getDouble(config_prefix + ".move_factor", default_move_factor);
if (move_factor > 1)
throw Exception("Disk move factor have to be in [0., 1.] interval, but set to " + toString(move_factor), ErrorCodes::LOGICAL_ERROR);
throw Exception("Disk move factor have to be in [0., 1.] interval, but set to " + toString(move_factor) + " in storage policy " + backQuote(name), ErrorCodes::LOGICAL_ERROR);
buildVolumeIndices();
}
@ -75,16 +83,43 @@ StoragePolicy::StoragePolicy(String name_, Volumes volumes_, double move_factor_
: volumes(std::move(volumes_)), name(std::move(name_)), move_factor(move_factor_)
{
if (volumes.empty())
throw Exception("StoragePolicy must contain at least one Volume.", ErrorCodes::UNKNOWN_POLICY);
throw Exception("Storage policy " + backQuote(name) + " must contain at least one Volume.", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
if (move_factor > 1)
throw Exception("Disk move factor have to be in [0., 1.] interval, but set to " + toString(move_factor), ErrorCodes::LOGICAL_ERROR);
throw Exception("Disk move factor have to be in [0., 1.] interval, but set to " + toString(move_factor) + " in storage policy " + backQuote(name), ErrorCodes::LOGICAL_ERROR);
for (size_t i = 0; i < volumes.size(); ++i)
buildVolumeIndices();
}
StoragePolicy::StoragePolicy(const StoragePolicy & storage_policy,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
DiskSelectorPtr disks)
: StoragePolicy(storage_policy.getName(), config, config_prefix, disks)
{
if (volumes_names.find(volumes[i]->getName()) != volumes_names.end())
throw Exception("Volumes names must be unique (" + volumes[i]->getName() + " duplicated).", ErrorCodes::UNKNOWN_POLICY);
volumes_names[volumes[i]->getName()] = i;
for (auto & volume : volumes)
{
if (storage_policy.volume_index_by_volume_name.count(volume->getName()) > 0)
{
auto old_volume = storage_policy.getVolumeByName(volume->getName());
try
{
auto new_volume = updateVolumeFromConfig(old_volume, config, config_prefix + ".volumes." + volume->getName(), disks);
volume = std::move(new_volume);
}
catch (Exception & e)
{
/// Default policies are allowed to be missed in configuration.
if (e.code() != ErrorCodes::NO_ELEMENTS_IN_CONFIG || storage_policy.getName() != DEFAULT_STORAGE_POLICY_NAME)
throw;
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
if (!keys.empty())
throw;
}
}
}
}
@ -93,20 +128,20 @@ bool StoragePolicy::isDefaultPolicy() const
{
/// Guessing if this policy is default, not 100% correct though.
if (getName() != "default")
if (getName() != DEFAULT_STORAGE_POLICY_NAME)
return false;
if (volumes.size() != 1)
return false;
if (volumes[0]->getName() != "default")
if (volumes[0]->getName() != DEFAULT_VOLUME_NAME)
return false;
const auto & disks = volumes[0]->getDisks();
if (disks.size() != 1)
return false;
if (disks[0]->getName() != "default")
if (disks[0]->getName() != DEFAULT_DISK_NAME)
return false;
return true;
@ -128,10 +163,10 @@ DiskPtr StoragePolicy::getAnyDisk() const
/// StoragePolicy must contain at least one Volume
/// Volume must contain at least one Disk
if (volumes.empty())
throw Exception("StoragePolicy has no volumes. It's a bug.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Storage policy " + backQuote(name) + " has no volumes. It's a bug.", ErrorCodes::LOGICAL_ERROR);
if (volumes[0]->getDisks().empty())
throw Exception("Volume '" + volumes[0]->getName() + "' has no disks. It's a bug.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Volume " + backQuote(name) + "." + backQuote(volumes[0]->getName()) + " has no disks. It's a bug.", ErrorCodes::LOGICAL_ERROR);
return volumes[0]->getDisks()[0];
}
@ -195,6 +230,24 @@ ReservationPtr StoragePolicy::makeEmptyReservationOnLargestDisk() const
}
VolumePtr StoragePolicy::getVolume(size_t index) const
{
if (index < volume_index_by_volume_name.size())
return volumes[index];
else
throw Exception("No volume with index " + std::to_string(index) + " in storage policy " + backQuote(name), ErrorCodes::UNKNOWN_VOLUME);
}
VolumePtr StoragePolicy::getVolumeByName(const String & volume_name) const
{
auto it = volume_index_by_volume_name.find(volume_name);
if (it == volume_index_by_volume_name.end())
throw Exception("No such volume " + backQuote(volume_name) + " in storage policy " + backQuote(name), ErrorCodes::UNKNOWN_VOLUME);
return getVolume(it->second);
}
void StoragePolicy::checkCompatibleWith(const StoragePolicyPtr & new_storage_policy) const
{
std::unordered_set<String> new_volume_names;
@ -204,7 +257,7 @@ void StoragePolicy::checkCompatibleWith(const StoragePolicyPtr & new_storage_pol
for (const auto & volume : getVolumes())
{
if (new_volume_names.count(volume->getName()) == 0)
throw Exception("New storage policy shall contain volumes of old one", ErrorCodes::BAD_ARGUMENTS);
throw Exception("New storage policy " + backQuote(name) + " shall contain volumes of old one", ErrorCodes::BAD_ARGUMENTS);
std::unordered_set<String> new_disk_names;
for (const auto & disk : new_storage_policy->getVolumeByName(volume->getName())->getDisks())
@ -212,21 +265,46 @@ void StoragePolicy::checkCompatibleWith(const StoragePolicyPtr & new_storage_pol
for (const auto & disk : volume->getDisks())
if (new_disk_names.count(disk->getName()) == 0)
throw Exception("New storage policy shall contain disks of old one", ErrorCodes::BAD_ARGUMENTS);
throw Exception("New storage policy " + backQuote(name) + " shall contain disks of old one", ErrorCodes::BAD_ARGUMENTS);
}
}
size_t StoragePolicy::getVolumeIndexByDisk(const DiskPtr & disk_ptr) const
{
for (size_t i = 0; i < volumes.size(); ++i)
{
const auto & volume = volumes[i];
for (const auto & disk : volume->getDisks())
if (disk->getName() == disk_ptr->getName())
return i;
auto it = volume_index_by_disk_name.find(disk_ptr->getName());
if (it != volume_index_by_disk_name.end())
return it->second;
else
throw Exception("No disk " + backQuote(disk_ptr->getName()) + " in policy " + backQuote(name), ErrorCodes::UNKNOWN_DISK);
}
void StoragePolicy::buildVolumeIndices()
{
for (size_t index = 0; index < volumes.size(); ++index)
{
const VolumePtr & volume = volumes[index];
if (volume_index_by_volume_name.find(volume->getName()) != volume_index_by_volume_name.end())
throw Exception("Volume names must be unique in storage policy "
+ backQuote(name) + " (" + backQuote(volume->getName()) + " is duplicated)"
, ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
volume_index_by_volume_name[volume->getName()] = index;
for (const auto & disk : volume->getDisks())
{
const String & disk_name = disk->getName();
if (volume_index_by_disk_name.find(disk_name) != volume_index_by_disk_name.end())
throw Exception("Disk names must be unique in storage policy "
+ backQuote(name) + " (" + backQuote(disk_name) + " is duplicated)"
, ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
volume_index_by_disk_name[disk_name] = index;
}
}
throw Exception("No disk " + disk_ptr->getName() + " in policy " + name, ErrorCodes::UNKNOWN_DISK);
}
@ -242,44 +320,40 @@ StoragePolicySelector::StoragePolicySelector(
{
if (!std::all_of(name.begin(), name.end(), isWordCharASCII))
throw Exception(
"StoragePolicy name can contain only alphanumeric and '_' (" + name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
"Storage policy name can contain only alphanumeric and '_' (" + backQuote(name) + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
policies.emplace(name, std::make_shared<StoragePolicy>(name, config, config_prefix + "." + name, disks));
LOG_INFO(&Poco::Logger::get("StoragePolicySelector"), "Storage policy {} loaded", backQuote(name));
}
constexpr auto default_storage_policy_name = "default";
constexpr auto default_volume_name = "default";
constexpr auto default_disk_name = "default";
/// Add default policy if it's not specified explicetly
if (policies.find(default_storage_policy_name) == policies.end())
/// Add default policy if it isn't explicitly specified.
if (policies.find(DEFAULT_STORAGE_POLICY_NAME) == policies.end())
{
auto default_volume = std::make_shared<VolumeJBOD>(default_volume_name, std::vector<DiskPtr>{disks->get(default_disk_name)}, 0);
auto default_policy = std::make_shared<StoragePolicy>(default_storage_policy_name, Volumes{default_volume}, 0.0);
policies.emplace(default_storage_policy_name, default_policy);
auto default_policy = std::make_shared<StoragePolicy>(DEFAULT_STORAGE_POLICY_NAME, config, config_prefix + "." + DEFAULT_STORAGE_POLICY_NAME, disks);
policies.emplace(DEFAULT_STORAGE_POLICY_NAME, std::move(default_policy));
}
}
StoragePolicySelectorPtr StoragePolicySelector::updateFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, DiskSelectorPtr disks) const
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
std::shared_ptr<StoragePolicySelector> result = std::make_shared<StoragePolicySelector>(config, config_prefix, disks);
constexpr auto default_storage_policy_name = "default";
/// First pass, check.
for (const auto & [name, policy] : policies)
{
if (name != default_storage_policy_name && result->policies.count(name) == 0)
if (result->policies.count(name) == 0)
throw Exception("Storage policy " + backQuote(name) + " is missing in new configuration", ErrorCodes::BAD_ARGUMENTS);
policy->checkCompatibleWith(result->policies[name]);
}
/// Second pass, load.
for (const auto & [name, policy] : policies)
{
result->policies[name] = std::make_shared<StoragePolicy>(*policy, config, config_prefix + "." + name, disks);
}
return result;
}
@ -288,7 +362,7 @@ StoragePolicyPtr StoragePolicySelector::get(const String & name) const
{
auto it = policies.find(name);
if (it == policies.end())
throw Exception("Unknown StoragePolicy " + name, ErrorCodes::UNKNOWN_POLICY);
throw Exception("Unknown storage policy " + backQuote(name), ErrorCodes::UNKNOWN_POLICY);
return it->second;
}

View File

@ -14,6 +14,7 @@
#include <memory>
#include <mutex>
#include <unordered_map>
#include <unistd.h>
#include <boost/noncopyable.hpp>
#include <Poco/Util/AbstractConfiguration.h>
@ -36,6 +37,13 @@ public:
StoragePolicy(String name_, Volumes volumes_, double move_factor_);
StoragePolicy(
const StoragePolicy & storage_policy,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
DiskSelectorPtr disks
);
bool isDefaultPolicy() const;
/// Returns disks ordered by volumes priority
@ -72,16 +80,10 @@ public:
/// which should be kept with help of background moves
double getMoveFactor() const { return move_factor; }
/// Get volume by index from storage_policy
VolumePtr getVolume(size_t i) const { return (i < volumes_names.size() ? volumes[i] : VolumePtr()); }
/// Get volume by index.
VolumePtr getVolume(size_t index) const;
VolumePtr getVolumeByName(const String & volume_name) const
{
auto it = volumes_names.find(volume_name);
if (it == volumes_names.end())
return {};
return getVolume(it->second);
}
VolumePtr getVolumeByName(const String & volume_name) const;
/// Checks if storage policy can be replaced by another one.
void checkCompatibleWith(const StoragePolicyPtr & new_storage_policy) const;
@ -89,12 +91,15 @@ public:
private:
Volumes volumes;
const String name;
std::map<String, size_t> volumes_names;
std::unordered_map<String, size_t> volume_index_by_volume_name;
std::unordered_map<String, size_t> volume_index_by_disk_name;
/// move_factor from interval [0., 1.]
/// We move something if disk from this policy
/// filled more than total_size * move_factor
double move_factor = 0.1; /// by default move factor is 10%
void buildVolumeIndices();
};

View File

@ -56,11 +56,23 @@ VolumeJBOD::VolumeJBOD(
/// Default value is 'true' due to backward compatibility.
perform_ttl_move_on_insert = config.getBool(config_prefix + ".perform_ttl_move_on_insert", true);
are_merges_avoided = config.getBool(config_prefix + ".prefer_not_to_merge", false);
}
VolumeJBOD::VolumeJBOD(const VolumeJBOD & volume_jbod,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
DiskSelectorPtr disk_selector)
: VolumeJBOD(volume_jbod.name, config, config_prefix, disk_selector)
{
are_merges_avoided_user_override = volume_jbod.are_merges_avoided_user_override.load(std::memory_order_relaxed);
last_used = volume_jbod.last_used.load(std::memory_order_relaxed);
}
DiskPtr VolumeJBOD::getDisk(size_t /* index */) const
{
size_t start_from = last_used.fetch_add(1u, std::memory_order_relaxed);
size_t start_from = last_used.fetch_add(1u, std::memory_order_acq_rel);
size_t index = start_from % disks.size();
return disks[index];
}
@ -73,7 +85,7 @@ ReservationPtr VolumeJBOD::reserve(UInt64 bytes)
if (max_data_part_size != 0 && bytes > max_data_part_size)
return {};
size_t start_from = last_used.fetch_add(1u, std::memory_order_relaxed);
size_t start_from = last_used.fetch_add(1u, std::memory_order_acq_rel);
size_t disks_num = disks.size();
for (size_t i = 0; i < disks_num; ++i)
{
@ -87,4 +99,19 @@ ReservationPtr VolumeJBOD::reserve(UInt64 bytes)
return {};
}
bool VolumeJBOD::areMergesAvoided() const
{
auto are_merges_avoided_user_override_value = are_merges_avoided_user_override.load(std::memory_order_acquire);
if (are_merges_avoided_user_override_value)
return *are_merges_avoided_user_override_value;
else
return are_merges_avoided;
}
void VolumeJBOD::setAvoidMergesUserOverride(bool avoid)
{
are_merges_avoided_user_override.store(avoid, std::memory_order_release);
}
}

View File

@ -1,10 +1,19 @@
#pragma once
#include <memory>
#include <optional>
#include <Disks/IVolume.h>
namespace DB
{
class VolumeJBOD;
using VolumeJBODPtr = std::shared_ptr<VolumeJBOD>;
using VolumesJBOD = std::vector<VolumeJBODPtr>;
/**
* Implements something similar to JBOD (https://en.wikipedia.org/wiki/Non-RAID_drive_architectures#JBOD).
* When MergeTree engine wants to write part it requests VolumeJBOD to reserve space on the next available
@ -13,8 +22,9 @@ namespace DB
class VolumeJBOD : public IVolume
{
public:
VolumeJBOD(String name_, Disks disks_, UInt64 max_data_part_size_)
VolumeJBOD(String name_, Disks disks_, UInt64 max_data_part_size_, bool are_merges_avoided_)
: IVolume(name_, disks_, max_data_part_size_)
, are_merges_avoided(are_merges_avoided_)
{
}
@ -25,6 +35,13 @@ public:
DiskSelectorPtr disk_selector
);
VolumeJBOD(
const VolumeJBOD & volume_jbod,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
DiskSelectorPtr disk_selector
);
VolumeType getType() const override { return VolumeType::JBOD; }
/// Always returns next disk (round-robin), ignores argument.
@ -38,11 +55,19 @@ public:
/// Returns valid reservation or nullptr if there is no space left on any disk.
ReservationPtr reserve(UInt64 bytes) override;
bool areMergesAvoided() const override;
void setAvoidMergesUserOverride(bool avoid) override;
/// True if parts on this volume participate in merges according to configuration.
bool are_merges_avoided = true;
private:
/// Index of last used disk.
mutable std::atomic<size_t> last_used = 0;
/// True if parts on this volume participate in merges according to START/STOP MERGES ON VOLUME.
std::atomic<std::optional<bool>> are_merges_avoided_user_override{std::nullopt};
};
using VolumeJBODPtr = std::shared_ptr<VolumeJBOD>;
using VolumesJBOD = std::vector<VolumeJBODPtr>;
}

View File

@ -3,18 +3,23 @@
#include <Disks/createVolume.h>
#include <Disks/VolumeJBOD.h>
namespace DB
{
/// Volume which reserserves space on each underlying disk.
class VolumeRAID1;
using VolumeRAID1Ptr = std::shared_ptr<VolumeRAID1>;
/// Volume which reserves space on each underlying disk.
///
/// NOTE: Just interface implementation, doesn't used in codebase,
/// also not available for user.
class VolumeRAID1 : public VolumeJBOD
{
public:
VolumeRAID1(String name_, Disks disks_, UInt64 max_data_part_size_)
: VolumeJBOD(name_, disks_, max_data_part_size_)
VolumeRAID1(String name_, Disks disks_, UInt64 max_data_part_size_, bool are_merges_avoided_in_config_)
: VolumeJBOD(name_, disks_, max_data_part_size_, are_merges_avoided_in_config_)
{
}
@ -27,11 +32,18 @@ public:
{
}
VolumeRAID1(
VolumeRAID1 & volume_raid1,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
DiskSelectorPtr disk_selector)
: VolumeJBOD(volume_raid1, config, config_prefix, disk_selector)
{
}
VolumeType getType() const override { return VolumeType::RAID1; }
ReservationPtr reserve(UInt64 bytes) override;
};
using VolumeRAID1Ptr = std::shared_ptr<VolumeRAID1>;
}

View File

@ -12,6 +12,7 @@ namespace DB
namespace ErrorCodes
{
extern const int UNKNOWN_RAID_TYPE;
extern const int INVALID_RAID_TYPE;
}
VolumePtr createVolumeFromReservation(const ReservationPtr & reservation, VolumePtr other_volume)
@ -20,12 +21,12 @@ VolumePtr createVolumeFromReservation(const ReservationPtr & reservation, Volume
{
/// Since reservation on JBOD chooses one of disks and makes reservation there, volume
/// for such type of reservation will be with one disk.
return std::make_shared<SingleDiskVolume>(other_volume->getName(), reservation->getDisk());
return std::make_shared<SingleDiskVolume>(other_volume->getName(), reservation->getDisk(), other_volume->max_data_part_size);
}
if (other_volume->getType() == VolumeType::RAID1)
{
auto volume = std::dynamic_pointer_cast<VolumeRAID1>(other_volume);
return std::make_shared<VolumeRAID1>(volume->getName(), reservation->getDisks(), volume->max_data_part_size);
return std::make_shared<VolumeRAID1>(volume->getName(), reservation->getDisks(), volume->max_data_part_size, volume->are_merges_avoided);
}
return nullptr;
}
@ -37,17 +38,31 @@ VolumePtr createVolumeFromConfig(
DiskSelectorPtr disk_selector
)
{
auto has_raid_type = config.has(config_prefix + ".raid_type");
if (!has_raid_type)
{
return std::make_shared<VolumeJBOD>(name, config, config_prefix, disk_selector);
}
String raid_type = config.getString(config_prefix + ".raid_type");
String raid_type = config.getString(config_prefix + ".raid_type", "JBOD");
if (raid_type == "JBOD")
{
return std::make_shared<VolumeJBOD>(name, config, config_prefix, disk_selector);
}
throw Exception("Unknown raid type '" + raid_type + "'", ErrorCodes::UNKNOWN_RAID_TYPE);
throw Exception("Unknown RAID type '" + raid_type + "'", ErrorCodes::UNKNOWN_RAID_TYPE);
}
VolumePtr updateVolumeFromConfig(
VolumePtr volume,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
DiskSelectorPtr & disk_selector
)
{
String raid_type = config.getString(config_prefix + ".raid_type", "JBOD");
if (raid_type == "JBOD")
{
VolumeJBODPtr volume_jbod = std::dynamic_pointer_cast<VolumeJBOD>(volume);
if (!volume_jbod)
throw Exception("Invalid RAID type '" + raid_type + "', shall be JBOD", ErrorCodes::INVALID_RAID_TYPE);
return std::make_shared<VolumeJBOD>(*volume_jbod, config, config_prefix, disk_selector);
}
throw Exception("Unknown RAID type '" + raid_type + "'", ErrorCodes::UNKNOWN_RAID_TYPE);
}
}

View File

@ -6,6 +6,7 @@ namespace DB
{
VolumePtr createVolumeFromReservation(const ReservationPtr & reservation, VolumePtr other_volume);
VolumePtr createVolumeFromConfig(
String name_,
const Poco::Util::AbstractConfiguration & config,
@ -13,4 +14,11 @@ VolumePtr createVolumeFromConfig(
DiskSelectorPtr disk_selector
);
VolumePtr updateVolumeFromConfig(
VolumePtr volume,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
DiskSelectorPtr & disk_selector
);
}

View File

@ -590,7 +590,7 @@ VolumePtr Context::setTemporaryStorage(const String & path, const String & polic
shared->tmp_path += '/';
auto disk = std::make_shared<DiskLocal>("_tmp_default", shared->tmp_path, 0);
shared->tmp_volume = std::make_shared<SingleDiskVolume>("_tmp_default", disk);
shared->tmp_volume = std::make_shared<SingleDiskVolume>("_tmp_default", disk, 0);
}
else
{

View File

@ -133,7 +133,11 @@ void InterpreterSystemQuery::startStopAction(StorageActionBlockType action_type,
auto manager = context.getActionLocksManager();
manager->cleanExpired();
if (table_id)
if (volume_ptr && action_type == ActionLocks::PartsMerge)
{
volume_ptr->setAvoidMergesUserOverride(!start);
}
else if (table_id)
{
context.checkAccess(getRequiredAccessType(action_type), table_id);
if (start)
@ -199,6 +203,10 @@ BlockIO InterpreterSystemQuery::execute()
if (!query.target_dictionary.empty() && !query.database.empty())
query.target_dictionary = query.database + "." + query.target_dictionary;
volume_ptr = {};
if (!query.storage_policy.empty() && !query.volume.empty())
volume_ptr = context.getStoragePolicy(query.storage_policy)->getVolumeByName(query.volume);
switch (query.type)
{
case Type::SHUTDOWN:

View File

@ -5,6 +5,7 @@
#include <Storages/IStorage_fwd.h>
#include <Interpreters/StorageID.h>
#include <Common/ActionLock.h>
#include <Disks/IVolume.h>
namespace Poco { class Logger; }
@ -44,6 +45,7 @@ private:
Context & context;
Poco::Logger * log = nullptr;
StorageID table_id = StorageID::createEmpty(); /// Will be set up if query contains table name
VolumePtr volume_ptr;
/// Tries to get a replicated table and restart it
/// Returns pointer to a newly created table if the restart was successful

View File

@ -118,7 +118,8 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
<< (settings.hilite ? hilite_none : "");
};
auto print_drop_replica = [&] {
auto print_drop_replica = [&]
{
settings.ostr << " " << quoteString(replica);
if (!table.empty())
{
@ -140,6 +141,16 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
}
};
auto print_on_volume = [&]
{
settings.ostr << " ON VOLUME "
<< (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(storage_policy)
<< (settings.hilite ? hilite_none : "")
<< "."
<< (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(volume)
<< (settings.hilite ? hilite_none : "");
};
if (!cluster.empty())
formatOnCluster(settings);
@ -160,6 +171,8 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
{
if (!table.empty())
print_database_table();
else if (!volume.empty())
print_on_volume();
}
else if (type == Type::RESTART_REPLICA || type == Type::SYNC_REPLICA || type == Type::FLUSH_DISTRIBUTED)
{

View File

@ -65,6 +65,8 @@ public:
String replica;
String replica_zk_path;
bool is_drop_whole_replica;
String storage_policy;
String volume;
String getID(char) const override { return "SYSTEM query"; }

View File

@ -129,6 +129,33 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
case Type::STOP_MERGES:
case Type::START_MERGES:
{
String storage_policy_str;
String volume_str;
if (ParserKeyword{"ON VOLUME"}.ignore(pos, expected))
{
ASTPtr ast;
if (ParserIdentifier{}.parse(pos, ast, expected))
storage_policy_str = ast->as<ASTIdentifier &>().name;
else
return false;
if (!ParserToken{TokenType::Dot}.ignore(pos, expected))
return false;
if (ParserIdentifier{}.parse(pos, ast, expected))
volume_str = ast->as<ASTIdentifier &>().name;
else
return false;
}
res->storage_policy = storage_policy_str;
res->volume = volume_str;
if (res->volume.empty() && res->storage_policy.empty())
parseDatabaseAndTableName(pos, expected, res->database, res->table);
break;
}
case Type::STOP_TTL_MERGES:
case Type::START_TTL_MERGES:
case Type::STOP_MOVES:

View File

@ -311,7 +311,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
NativeBlockInputStream block_in(in, 0);
auto block = block_in.read();
auto volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, reservation->getDisk());
auto volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, reservation->getDisk(), 0);
MergeTreeData::MutableDataPartPtr new_data_part =
std::make_shared<MergeTreeDataPartInMemory>(data, part_name, volume);
@ -408,7 +408,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
assertEOF(in);
auto volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, disk);
auto volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, disk, 0);
MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(part_name, volume, part_relative_path);
new_data_part->is_temp = true;
new_data_part->modification_time = time(nullptr);

View File

@ -760,6 +760,16 @@ void IMergeTreeDataPart::loadColumns(bool require)
column_name_to_position.emplace(column.name, pos++);
}
bool IMergeTreeDataPart::shallParticipateInMerges(const StoragePolicyPtr & storage_policy) const
{
/// `IMergeTreeDataPart::volume` describes space where current part belongs, and holds
/// `SingleDiskVolume` object which does not contain up-to-date settings of corresponding volume.
/// Therefore we shall obtain volume from storage policy.
auto volume_ptr = storage_policy->getVolume(storage_policy->getVolumeIndexByDisk(volume->getDisk()));
return !volume_ptr->areMergesAvoided();
}
UInt64 IMergeTreeDataPart::calculateTotalSizeOnDisk(const DiskPtr & disk_, const String & from)
{
if (disk_->isFile(from))

View File

@ -326,6 +326,10 @@ public:
/// storage and pass it to this method.
virtual bool hasColumnFiles(const String & /* column */, const IDataType & /* type */) const { return false; }
/// Returns true if this part shall participate in merges according to
/// settings of given storage policy.
bool shallParticipateInMerges(const StoragePolicyPtr & storage_policy) const;
/// Calculate the total size of the entire directory with all the files
static UInt64 calculateTotalSizeOnDisk(const DiskPtr & disk_, const String & from);
void calculateColumnsSizesOnDisk();

View File

@ -48,6 +48,8 @@ public:
/// Part compression codec definition.
ASTPtr compression_codec_desc;
bool shall_participate_in_merges = true;
};
/// Parts are belong to partitions. Only parts within same partition could be merged.

View File

@ -776,7 +776,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
if (!MergeTreePartInfo::tryParsePartName(part_name, &part_info, format_version))
return;
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, part_disk_ptr);
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, part_disk_ptr, 0);
auto part = createPart(part_name, part_info, single_disk_volume, part_name);
bool broken = false;
@ -2996,7 +2996,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
for (const auto & part_names : renamed_parts.old_and_new_names)
{
LOG_DEBUG(log, "Checking part {}", part_names.second);
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_names.first, name_to_disk[part_names.first]);
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_names.first, name_to_disk[part_names.first], 0);
MutableDataPartPtr part = createPart(part_names.first, single_disk_volume, source_dir + part_names.second);
loadPartAndFixMetadataImpl(part);
loaded_parts.push_back(part);
@ -3409,7 +3409,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPartOnSameDisk(
localBackup(disk, src_part_path, dst_part_path);
disk->removeIfExists(dst_part_path + "/" + IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME);
auto single_disk_volume = std::make_shared<SingleDiskVolume>(disk->getName(), disk);
auto single_disk_volume = std::make_shared<SingleDiskVolume>(disk->getName(), disk, 0);
auto dst_data_part = createPart(dst_part_name, dst_part_info, single_disk_volume, tmp_dst_part_name);
dst_data_part->is_temp = true;

View File

@ -226,6 +226,8 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge(
IMergeSelector::PartsRanges parts_ranges;
StoragePolicyPtr storage_policy = data.getStoragePolicy();
const String * prev_partition_id = nullptr;
/// Previous part only in boundaries of partition frame
const MergeTreeData::DataPartPtr * prev_part = nullptr;
@ -275,6 +277,7 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge(
part_info.data = &part;
part_info.ttl_infos = &part->ttl_infos;
part_info.compression_codec_desc = part->default_codec->getFullCodecDesc();
part_info.shall_participate_in_merges = part->shallParticipateInMerges(storage_policy);
parts_ranges.back().emplace_back(part_info);
@ -667,7 +670,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
merging_columns,
merging_column_names);
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + future_part.name, disk);
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + future_part.name, disk, 0);
MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(
future_part.name,
future_part.type,
@ -1127,7 +1130,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
in->setProgressCallback(MergeProgressCallback(merge_entry, watch_prev_elapsed, stage_progress));
}
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + future_part.name, space_reservation->getDisk());
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + future_part.name, space_reservation->getDisk(), 0);
auto new_data_part = data.createPart(
future_part.name, future_part.type, future_part.part_info, single_disk_volume, "tmp_mut_" + future_part.name);

View File

@ -199,7 +199,7 @@ MergeTreeData::DataPartPtr MergeTreePartsMover::clonePart(const MergeTreeMoveEnt
const String directory_to_move = "moving";
moving_part.part->makeCloneOnDisk(moving_part.reserved_space->getDisk(), directory_to_move);
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + moving_part.part->name, moving_part.reserved_space->getDisk());
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + moving_part.part->name, moving_part.reserved_space->getDisk(), 0);
MergeTreeData::MutableDataPartPtr cloned_part =
data->createPart(moving_part.part->name, single_disk_volume, directory_to_move + '/' + moving_part.part->name);
LOG_TRACE(log, "Part {} was cloned to {}", moving_part.part->name, cloned_part->getFullPath());

View File

@ -130,7 +130,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const Stor
else if (action_type == ActionType::ADD_PART)
{
auto part_disk = storage.reserveSpace(0)->getDisk();
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, disk);
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, disk, 0);
part = storage.createPart(
part_name,

View File

@ -1,4 +1,5 @@
#include <Storages/MergeTree/SimpleMergeSelector.h>
#include <Common/interpolate.h>
#include <cmath>
@ -152,6 +153,9 @@ void selectWithinPartition(
if (begin > 1000)
break;
if (!parts[begin].shall_participate_in_merges)
continue;
size_t sum_size = parts[begin].size;
size_t max_size = parts[begin].size;
size_t min_age = parts[begin].age;
@ -161,6 +165,9 @@ void selectWithinPartition(
if (settings.max_parts_to_merge_at_once && end - begin > settings.max_parts_to_merge_at_once)
break;
if (!parts[end - 1].shall_participate_in_merges)
break;
size_t cur_size = parts[end - 1].size;
size_t cur_age = parts[end - 1].age;

View File

@ -25,6 +25,7 @@ IMergeSelector::PartsRange ITTLMergeSelector::select(
ssize_t partition_to_merge_index = -1;
time_t partition_to_merge_min_ttl = 0;
/// Find most old TTL.
for (size_t i = 0; i < parts_ranges.size(); ++i)
{
const auto & mergeable_parts_in_partition = parts_ranges[i];
@ -56,6 +57,7 @@ IMergeSelector::PartsRange ITTLMergeSelector::select(
Iterator best_end = best_begin + 1;
size_t total_size = 0;
/// Find begin of range with most old TTL.
while (true)
{
time_t ttl = getTTLForPart(*best_begin);
@ -63,6 +65,7 @@ IMergeSelector::PartsRange ITTLMergeSelector::select(
if (!ttl || isTTLAlreadySatisfied(*best_begin) || ttl > current_time
|| (max_total_size_to_merge && total_size > max_total_size_to_merge))
{
/// This condition can not be satisfied on first iteration.
++best_begin;
break;
}
@ -74,6 +77,7 @@ IMergeSelector::PartsRange ITTLMergeSelector::select(
--best_begin;
}
/// Find end of range with most old TTL.
while (best_end != best_partition.end())
{
time_t ttl = getTTLForPart(*best_end);
@ -97,6 +101,19 @@ time_t TTLDeleteMergeSelector::getTTLForPart(const IMergeSelector::Part & part)
return only_drop_parts ? part.ttl_infos->part_max_ttl : part.ttl_infos->part_min_ttl;
}
bool TTLDeleteMergeSelector::isTTLAlreadySatisfied(const IMergeSelector::Part & part) const
{
/// N.B. Satisfied TTL means that TTL is NOT expired.
/// return true -- this part can not be selected
/// return false -- this part can be selected
/// Dropping whole part is an exception to `shall_participate_in_merges` logic.
if (only_drop_parts)
return false;
return !part.shall_participate_in_merges;
}
time_t TTLRecompressMergeSelector::getTTLForPart(const IMergeSelector::Part & part) const
{
return part.ttl_infos->getMinimalMaxRecompressionTTL();
@ -104,6 +121,13 @@ time_t TTLRecompressMergeSelector::getTTLForPart(const IMergeSelector::Part & pa
bool TTLRecompressMergeSelector::isTTLAlreadySatisfied(const IMergeSelector::Part & part) const
{
/// N.B. Satisfied TTL means that TTL is NOT expired.
/// return true -- this part can not be selected
/// return false -- this part can be selected
if (!part.shall_participate_in_merges)
return true;
if (recompression_ttls.empty())
return false;

View File

@ -64,10 +64,7 @@ public:
/// Delete TTL should be checked only by TTL time, there are no other ways
/// to satisfy it.
bool isTTLAlreadySatisfied(const IMergeSelector::Part &) const override
{
return false;
}
bool isTTLAlreadySatisfied(const IMergeSelector::Part &) const override;
private:
bool only_drop_parts;

View File

@ -29,6 +29,7 @@ StorageSystemStoragePolicies::StorageSystemStoragePolicies(const StorageID & tab
{"volume_type", std::make_shared<DataTypeString>()},
{"max_data_part_size", std::make_shared<DataTypeUInt64>()},
{"move_factor", std::make_shared<DataTypeFloat32>()},
{"prefer_not_to_merge", std::make_shared<DataTypeUInt8>()}
}));
// TODO: Add string column with custom volume-type-specific options
setInMemoryMetadata(storage_metadata);
@ -52,6 +53,7 @@ Pipe StorageSystemStoragePolicies::read(
MutableColumnPtr col_volume_type = ColumnString::create();
MutableColumnPtr col_max_part_size = ColumnUInt64::create();
MutableColumnPtr col_move_factor = ColumnFloat32::create();
MutableColumnPtr col_prefer_not_to_merge = ColumnUInt8::create();
for (const auto & [policy_name, policy_ptr] : context.getPoliciesMap())
{
@ -69,6 +71,7 @@ Pipe StorageSystemStoragePolicies::read(
col_volume_type->insert(volumeTypeToString(volumes[i]->getType()));
col_max_part_size->insert(volumes[i]->max_data_part_size);
col_move_factor->insert(policy_ptr->getMoveFactor());
col_prefer_not_to_merge->insert(volumes[i]->areMergesAvoided() ? 1 : 0);
}
}
@ -80,6 +83,7 @@ Pipe StorageSystemStoragePolicies::read(
res_columns.emplace_back(std::move(col_volume_type));
res_columns.emplace_back(std::move(col_max_part_size));
res_columns.emplace_back(std::move(col_move_factor));
res_columns.emplace_back(std::move(col_prefer_not_to_merge));
UInt64 num_rows = res_columns.at(0)->size();
Chunk chunk(std::move(res_columns), num_rows);

View File

@ -30,6 +30,18 @@
</volumes>
</small_jbod_with_external>
<small_jbod_with_external_no_merges>
<volumes>
<main>
<disk>jbod1</disk>
</main>
<external>
<disk>external</disk>
<prefer_not_to_merge>true</prefer_not_to_merge>
</external>
</volumes>
</small_jbod_with_external_no_merges>
<one_more_small_jbod_with_external>
<volumes>
<m>

View File

@ -76,6 +76,7 @@ def test_system_tables(start_cluster):
"volume_type": "JBOD",
"max_data_part_size": "0",
"move_factor": 0.1,
"prefer_not_to_merge": 0,
},
{
"policy_name": "small_jbod_with_external",
@ -85,6 +86,27 @@ def test_system_tables(start_cluster):
"volume_type": "JBOD",
"max_data_part_size": "0",
"move_factor": 0.1,
"prefer_not_to_merge": 0,
},
{
"policy_name": "small_jbod_with_external_no_merges",
"volume_name": "main",
"volume_priority": "1",
"disks": ["jbod1"],
"volume_type": "JBOD",
"max_data_part_size": "0",
"move_factor": 0.1,
"prefer_not_to_merge": 0,
},
{
"policy_name": "small_jbod_with_external_no_merges",
"volume_name": "external",
"volume_priority": "2",
"disks": ["external"],
"volume_type": "JBOD",
"max_data_part_size": "0",
"move_factor": 0.1,
"prefer_not_to_merge": 1,
},
{
"policy_name": "one_more_small_jbod_with_external",
@ -94,6 +116,7 @@ def test_system_tables(start_cluster):
"volume_type": "JBOD",
"max_data_part_size": "0",
"move_factor": 0.1,
"prefer_not_to_merge": 0,
},
{
"policy_name": "one_more_small_jbod_with_external",
@ -103,6 +126,7 @@ def test_system_tables(start_cluster):
"volume_type": "JBOD",
"max_data_part_size": "0",
"move_factor": 0.1,
"prefer_not_to_merge": 0,
},
{
"policy_name": "jbods_with_external",
@ -112,6 +136,7 @@ def test_system_tables(start_cluster):
"volume_type": "JBOD",
"max_data_part_size": "10485760",
"move_factor": 0.1,
"prefer_not_to_merge": 0,
},
{
"policy_name": "jbods_with_external",
@ -121,6 +146,7 @@ def test_system_tables(start_cluster):
"volume_type": "JBOD",
"max_data_part_size": "0",
"move_factor": 0.1,
"prefer_not_to_merge": 0,
},
{
"policy_name": "moving_jbod_with_external",
@ -130,6 +156,7 @@ def test_system_tables(start_cluster):
"volume_type": "JBOD",
"max_data_part_size": "0",
"move_factor": 0.7,
"prefer_not_to_merge": 0,
},
{
"policy_name": "moving_jbod_with_external",
@ -139,6 +166,7 @@ def test_system_tables(start_cluster):
"volume_type": "JBOD",
"max_data_part_size": "0",
"move_factor": 0.7,
"prefer_not_to_merge": 0,
},
{
"policy_name": "default_disk_with_external",
@ -148,6 +176,7 @@ def test_system_tables(start_cluster):
"volume_type": "JBOD",
"max_data_part_size": "2097152",
"move_factor": 0.1,
"prefer_not_to_merge": 0,
},
{
"policy_name": "default_disk_with_external",
@ -157,6 +186,7 @@ def test_system_tables(start_cluster):
"volume_type": "JBOD",
"max_data_part_size": "20971520",
"move_factor": 0.1,
"prefer_not_to_merge": 0,
},
{
"policy_name": "special_warning_policy",
@ -166,6 +196,7 @@ def test_system_tables(start_cluster):
"volume_type": "JBOD",
"max_data_part_size": "0",
"move_factor": 0.1,
"prefer_not_to_merge": 0,
},
{
"policy_name": "special_warning_policy",
@ -175,6 +206,7 @@ def test_system_tables(start_cluster):
"volume_type": "JBOD",
"max_data_part_size": "0",
"move_factor": 0.1,
"prefer_not_to_merge": 0,
},
{
"policy_name": "special_warning_policy",
@ -184,6 +216,7 @@ def test_system_tables(start_cluster):
"volume_type": "JBOD",
"max_data_part_size": "1024",
"move_factor": 0.1,
"prefer_not_to_merge": 0,
},
{
"policy_name": "special_warning_policy",
@ -193,6 +226,7 @@ def test_system_tables(start_cluster):
"volume_type": "JBOD",
"max_data_part_size": "1024000000",
"move_factor": 0.1,
"prefer_not_to_merge": 0,
},
]
@ -306,6 +340,9 @@ def get_used_disks_for_table(node, table_name):
table_name)).strip().split('\n')
def get_used_parts_for_table(node, table_name):
return node.query("SELECT name FROM system.parts WHERE table = '{}' AND active = 1 ORDER BY modification_time".format(table_name)).splitlines()
def test_no_warning_about_zero_max_data_part_size(start_cluster):
def get_log(node):
return node.exec_in_container(["bash", "-c", "cat /var/log/clickhouse-server/clickhouse-server.log"])
@ -370,6 +407,8 @@ def test_round_robin(start_cluster, name, engine):
])
def test_max_data_part_size(start_cluster, name, engine):
try:
assert int(*node1.query("""SELECT max_data_part_size FROM system.storage_policies WHERE policy_name = 'jbods_with_external' AND volume_name = 'main'""").splitlines()) == 10*1024*1024
node1.query("""
CREATE TABLE {name} (
s1 String
@ -832,7 +871,7 @@ def test_concurrent_alter_move(start_cluster, name, engine):
tasks.append(p.apply_async(optimize_table, (100,)))
for task in tasks:
task.get(timeout=120)
task.get(timeout=240)
assert node1.query("SELECT 1") == "1\n"
assert node1.query("SELECT COUNT() FROM {}".format(name)) == "500\n"
@ -1263,8 +1302,7 @@ def test_move_while_merge(start_cluster):
node1.query("INSERT INTO {name} VALUES (1)".format(name=name))
node1.query("INSERT INTO {name} VALUES (2)".format(name=name))
parts = node1.query(
"SELECT name FROM system.parts WHERE table = '{name}' AND active = 1".format(name=name)).splitlines()
parts = get_used_parts_for_table(node1, name)
assert len(parts) == 2
def optimize():
@ -1329,7 +1367,10 @@ def test_move_across_policies_does_not_work(start_cluster):
""".format(name=name))
node1.query("""INSERT INTO {name} VALUES (1)""".format(name=name))
try:
node1.query("""ALTER TABLE {name} MOVE PARTITION tuple() TO DISK 'jbod2'""".format(name=name))
except QueryRuntimeException:
"""All parts of partition 'all' are already on disk 'jbod2'."""
with pytest.raises(QueryRuntimeException, match='.*because disk does not belong to storage policy.*'):
node1.query("""ALTER TABLE {name}2 ATTACH PARTITION tuple() FROM {name}""".format(name=name))
@ -1345,3 +1386,160 @@ def test_move_across_policies_does_not_work(start_cluster):
finally:
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
node1.query("DROP TABLE IF EXISTS {name}2".format(name=name))
def _insert_merge_execute(node, name, policy, parts, cmds, parts_before_cmds, parts_after_cmds):
try:
node.query("""
CREATE TABLE {name} (
n Int64
) ENGINE = MergeTree
ORDER BY tuple()
PARTITION BY tuple()
TTL now()-1 TO VOLUME 'external'
SETTINGS storage_policy='{policy}'
""".format(name=name, policy=policy))
for i in range(parts):
node.query("""INSERT INTO {name} VALUES ({n})""".format(name=name, n=i))
disks = get_used_disks_for_table(node, name)
assert set(disks) == {"external"}
node.query("""OPTIMIZE TABLE {name}""".format(name=name))
parts = get_used_parts_for_table(node, name)
assert len(parts) == parts_before_cmds
for cmd in cmds:
node.query(cmd)
node.query("""OPTIMIZE TABLE {name}""".format(name=name))
parts = get_used_parts_for_table(node, name)
assert len(parts) == parts_after_cmds
finally:
node.query("DROP TABLE IF EXISTS {name}".format(name=name))
def _check_merges_are_working(node, storage_policy, volume, shall_work):
try:
name = "_check_merges_are_working_{storage_policy}_{volume}".format(storage_policy=storage_policy, volume=volume)
node.query("""
CREATE TABLE {name} (
n Int64
) ENGINE = MergeTree
ORDER BY tuple()
PARTITION BY tuple()
SETTINGS storage_policy='{storage_policy}'
""".format(name=name, storage_policy=storage_policy))
created_parts = 24
for i in range(created_parts):
node.query("""INSERT INTO {name} VALUES ({n})""".format(name=name, n=i))
try:
node.query("""ALTER TABLE {name} MOVE PARTITION tuple() TO VOLUME '{volume}' """.format(name=name, volume=volume))
except:
"""Ignore 'nothing to move'."""
expected_disks = set(node.query("""
SELECT disks FROM system.storage_policies ARRAY JOIN disks WHERE volume_name = '{volume_name}'
""".format(volume_name=volume)).splitlines())
disks = get_used_disks_for_table(node, name)
assert set(disks) <= expected_disks
node.query("""OPTIMIZE TABLE {name} FINAL""".format(name=name))
parts = get_used_parts_for_table(node, name)
assert len(parts) == 1 if shall_work else created_parts
finally:
node.query("DROP TABLE IF EXISTS {name}".format(name=name))
def _get_prefer_not_to_merge_for_storage_policy(node, storage_policy):
return list(map(int, node.query("SELECT prefer_not_to_merge FROM system.storage_policies WHERE policy_name = '{}' ORDER BY volume_priority".format(storage_policy)).splitlines()))
def test_simple_merge_tree_merges_are_disabled(start_cluster):
_check_merges_are_working(node1, "small_jbod_with_external_no_merges", "external", False)
def test_no_merges_in_configuration_allow_from_query_without_reload(start_cluster):
try:
name = "test_no_merges_in_configuration_allow_from_query_without_reload"
policy = "small_jbod_with_external_no_merges"
node1.restart_clickhouse(kill=True)
assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 1]
_check_merges_are_working(node1, policy, "external", False)
_insert_merge_execute(node1, name, policy, 2, [
"SYSTEM START MERGES ON VOLUME {}.external".format(policy)
], 2, 1)
assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 0]
_check_merges_are_working(node1, policy, "external", True)
finally:
node1.query("SYSTEM STOP MERGES ON VOLUME {}.external".format(policy))
def test_no_merges_in_configuration_allow_from_query_with_reload(start_cluster):
try:
name = "test_no_merges_in_configuration_allow_from_query_with_reload"
policy = "small_jbod_with_external_no_merges"
node1.restart_clickhouse(kill=True)
assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 1]
_check_merges_are_working(node1, policy, "external", False)
_insert_merge_execute(node1, name, policy, 2, [
"SYSTEM START MERGES ON VOLUME {}.external".format(policy),
"SYSTEM RELOAD CONFIG"
], 2, 1)
assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 0]
_check_merges_are_working(node1, policy, "external", True)
finally:
node1.query("SYSTEM STOP MERGES ON VOLUME {}.external".format(policy))
def test_yes_merges_in_configuration_disallow_from_query_without_reload(start_cluster):
try:
name = "test_yes_merges_in_configuration_allow_from_query_without_reload"
policy = "small_jbod_with_external"
node1.restart_clickhouse(kill=True)
assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 0]
_check_merges_are_working(node1, policy, "external", True)
_insert_merge_execute(node1, name, policy, 2, [
"SYSTEM STOP MERGES ON VOLUME {}.external".format(policy),
"INSERT INTO {name} VALUES (2)".format(name=name)
], 1, 2)
assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 1]
_check_merges_are_working(node1, policy, "external", False)
finally:
node1.query("SYSTEM START MERGES ON VOLUME {}.external".format(policy))
def test_yes_merges_in_configuration_disallow_from_query_with_reload(start_cluster):
try:
name = "test_yes_merges_in_configuration_allow_from_query_with_reload"
policy = "small_jbod_with_external"
node1.restart_clickhouse(kill=True)
assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 0]
_check_merges_are_working(node1, policy, "external", True)
_insert_merge_execute(node1, name, policy, 2, [
"SYSTEM STOP MERGES ON VOLUME {}.external".format(policy),
"INSERT INTO {name} VALUES (2)".format(name=name),
"SYSTEM RELOAD CONFIG"
], 1, 2)
assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 1]
_check_merges_are_working(node1, policy, "external", False)
finally:
node1.query("SYSTEM START MERGES ON VOLUME {}.external".format(policy))