Merge pull request #10777 from NanoBjorn/refactor-reservations

VolumePtr instead of DiskPtr in MergeTreeData*
This commit is contained in:
alesapin 2020-05-19 13:27:11 +03:00 committed by GitHub
commit a416813597
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
49 changed files with 341 additions and 230 deletions

View File

@ -380,7 +380,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
std::string tmp_path = config().getString("tmp_path", path + "tmp/");
std::string tmp_policy = config().getString("tmp_policy", "");
const VolumePtr & volume = global_context->setTemporaryStorage(tmp_path, tmp_policy);
for (const DiskPtr & disk : volume->disks)
for (const DiskPtr & disk : volume->getDisks())
setupTmpPath(log, disk->getPath());
}

View File

@ -496,6 +496,8 @@ namespace ErrorCodes
extern const int OPENCL_ERROR = 522;
extern const int UNKNOWN_ROW_POLICY = 523;
extern const int ALTER_OF_COLUMN_IS_FORBIDDEN = 524;
extern const int INCORRECT_DISK_INDEX = 525;
extern const int UNKNOWN_VOLUME_TYPE = 526;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -12,11 +12,13 @@
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
extern const int PATH_ACCESS_DENIED;
extern const int INCORRECT_DISK_INDEX;
}
std::mutex DiskLocal::reservation_mutex;
@ -34,7 +36,9 @@ public:
UInt64 getSize() const override { return size; }
DiskPtr getDisk() const override { return disk; }
DiskPtr getDisk(size_t i) const override;
Disks getDisks() const override { return {disk}; }
void update(UInt64 new_size) override;
@ -282,6 +286,15 @@ void DiskLocal::copy(const String & from_path, const std::shared_ptr<IDisk> & to
IDisk::copy(from_path, to_disk, to_path); /// Copy files through buffers.
}
DiskPtr DiskLocalReservation::getDisk(size_t i) const
{
if (i != 0)
{
throw Exception("Can't use i != 0 with single disk reservation", ErrorCodes::INCORRECT_DISK_INDEX);
}
return disk;
}
void DiskLocalReservation::update(UInt64 new_size)
{
std::lock_guard lock(DiskLocal::reservation_mutex);

View File

@ -55,7 +55,7 @@ DiskSelectorPtr DiskSelector::updateFromConfig(
constexpr auto default_disk_name = "default";
std::set<String> old_disks_minus_new_disks;
for (const auto & [disk_name, _] : result->disks)
for (const auto & [disk_name, _] : result->getDisksMap())
{
old_disks_minus_new_disks.insert(disk_name);
}
@ -65,10 +65,10 @@ DiskSelectorPtr DiskSelector::updateFromConfig(
if (!std::all_of(disk_name.begin(), disk_name.end(), isWordCharASCII))
throw Exception("Disk name can contain only alphanumeric and '_' (" + disk_name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
if (result->disks.count(disk_name) == 0)
if (result->getDisksMap().count(disk_name) == 0)
{
auto disk_config_prefix = config_prefix + "." + disk_name;
result->disks.emplace(disk_name, factory.create(disk_name, config, disk_config_prefix, context));
result->addToDiskMap(disk_name, factory.create(disk_name, config, disk_config_prefix, context));
}
else
{

View File

@ -29,6 +29,10 @@ public:
/// Get all disks with names
const auto & getDisksMap() const { return disks; }
void addToDiskMap(String name, DiskPtr disk)
{
disks.emplace(name, disk);
}
private:
std::map<String, DiskPtr> disks;

View File

@ -206,8 +206,11 @@ public:
/// Get reservation size.
virtual UInt64 getSize() const = 0;
/// Get disk where reservation take place.
virtual DiskPtr getDisk() const = 0;
/// Get i-th disk where reservation take place.
virtual DiskPtr getDisk(size_t i = 0) const = 0;
/// Get all disks, used in reservation
virtual Disks getDisks() const = 0;
/// Changes amount of reserved space.
virtual void update(UInt64 new_size) = 0;

View File

@ -8,6 +8,17 @@
namespace DB
{
enum class VolumeType
{
JBOD,
SINGLE_DISK,
UNKNOWN
};
class IVolume;
using VolumePtr = std::shared_ptr<IVolume>;
using Volumes = std::vector<VolumePtr>;
/**
* Disks group by some (user) criteria. For example,
* - VolumeJBOD("slow_disks", [d1, d2], 100)
@ -22,7 +33,7 @@ namespace DB
class IVolume : public Space
{
public:
IVolume(String name_, Disks disks_): disks(std::move(disks_)), name(std::move(name_))
IVolume(String name_, Disks disks_): disks(std::move(disks_)), name(name_)
{
}
@ -37,16 +48,17 @@ public:
/// Volume name from config
const String & getName() const override { return name; }
virtual VolumeType getType() const = 0;
/// Return biggest unreserved space across all disks
UInt64 getMaxUnreservedFreeSpace() const;
Disks disks;
DiskPtr getDisk(size_t i = 0) const { return disks[i]; }
const Disks & getDisks() const { return disks; }
protected:
Disks disks;
const String name;
};
using VolumePtr = std::shared_ptr<IVolume>;
using Volumes = std::vector<VolumePtr>;
}

View File

@ -28,6 +28,7 @@ namespace ErrorCodes
extern const int FILE_ALREADY_EXISTS;
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int UNKNOWN_FORMAT;
extern const int INCORRECT_DISK_INDEX;
}
namespace
@ -369,7 +370,16 @@ public:
UInt64 getSize() const override { return size; }
DiskPtr getDisk() const override { return disk; }
DiskPtr getDisk(size_t i) const override
{
if (i != 0)
{
throw Exception("Can't use i != 0 with single disk reservation", ErrorCodes::INCORRECT_DISK_INDEX);
}
return disk;
}
Disks getDisks() const override { return {disk}; }
void update(UInt64 new_size) override
{

View File

@ -0,0 +1,6 @@
#include <Disks/SingleDiskVolume.h>
namespace DB
{
}

View File

@ -0,0 +1,26 @@
#pragma once
#include <Disks/IVolume.h>
namespace DB
{
class SingleDiskVolume : public IVolume
{
public:
SingleDiskVolume(const String & name_, DiskPtr disk): IVolume(name_, {disk})
{
}
ReservationPtr reserve(UInt64 bytes) override
{
return disks[0]->reserve(bytes);
}
VolumeType getType() const override { return VolumeType::SINGLE_DISK; }
};
using VolumeSingleDiskPtr = std::shared_ptr<SingleDiskVolume>;
}

View File

@ -55,7 +55,7 @@ StoragePolicy::StoragePolicy(
std::set<String> disk_names;
for (const auto & volume : volumes)
{
for (const auto & disk : volume->disks)
for (const auto & disk : volume->getDisks())
{
if (disk_names.find(disk->getName()) != disk_names.end())
throw Exception(
@ -102,7 +102,7 @@ bool StoragePolicy::isDefaultPolicy() const
if (volumes[0]->getName() != "default")
return false;
const auto & disks = volumes[0]->disks;
const auto & disks = volumes[0]->getDisks();
if (disks.size() != 1)
return false;
@ -117,7 +117,7 @@ Disks StoragePolicy::getDisks() const
{
Disks res;
for (const auto & volume : volumes)
for (const auto & disk : volume->disks)
for (const auto & disk : volume->getDisks())
res.push_back(disk);
return res;
}
@ -130,17 +130,17 @@ DiskPtr StoragePolicy::getAnyDisk() const
if (volumes.empty())
throw Exception("StoragePolicy has no volumes. It's a bug.", ErrorCodes::LOGICAL_ERROR);
if (volumes[0]->disks.empty())
if (volumes[0]->getDisks().empty())
throw Exception("Volume '" + volumes[0]->getName() + "' has no disks. It's a bug.", ErrorCodes::LOGICAL_ERROR);
return volumes[0]->disks[0];
return volumes[0]->getDisks()[0];
}
DiskPtr StoragePolicy::getDiskByName(const String & disk_name) const
{
for (auto && volume : volumes)
for (auto && disk : volume->disks)
for (auto && disk : volume->getDisks())
if (disk->getName() == disk_name)
return disk;
return {};
@ -181,7 +181,7 @@ ReservationPtr StoragePolicy::makeEmptyReservationOnLargestDisk() const
DiskPtr max_disk;
for (const auto & volume : volumes)
{
for (const auto & disk : volume->disks)
for (const auto & disk : volume->getDisks())
{
auto avail_space = disk->getAvailableSpace();
if (avail_space > max_space)
@ -207,10 +207,10 @@ void StoragePolicy::checkCompatibleWith(const StoragePolicyPtr & new_storage_pol
throw Exception("New storage policy shall contain volumes of old one", ErrorCodes::LOGICAL_ERROR);
std::unordered_set<String> new_disk_names;
for (const auto & disk : new_storage_policy->getVolumeByName(volume->getName())->disks)
for (const auto & disk : new_storage_policy->getVolumeByName(volume->getName())->getDisks())
new_disk_names.insert(disk->getName());
for (const auto & disk : volume->disks)
for (const auto & disk : volume->getDisks())
if (new_disk_names.count(disk->getName()) == 0)
throw Exception("New storage policy shall contain disks of old one", ErrorCodes::LOGICAL_ERROR);
}
@ -222,7 +222,7 @@ size_t StoragePolicy::getVolumeIndexByDisk(const DiskPtr & disk_ptr) const
for (size_t i = 0; i < volumes.size(); ++i)
{
const auto & volume = volumes[i];
for (const auto & disk : volume->disks)
for (const auto & disk : volume->getDisks())
if (disk->getName() == disk_ptr->getName())
return i;
}

View File

@ -4,6 +4,7 @@
#include <Disks/IDisk.h>
#include <Disks/IVolume.h>
#include <Disks/VolumeJBOD.h>
#include <Disks/SingleDiskVolume.h>
#include <IO/WriteHelpers.h>
#include <Common/CurrentMetrics.h>
#include <Common/Exception.h>

View File

@ -25,6 +25,8 @@ public:
DiskSelectorPtr disk_selector
);
VolumeType getType() const override { return VolumeType::JBOD; }
/// Next disk (round-robin)
///
/// - Used with policy for temporary data

View File

@ -0,0 +1,17 @@
#include "createVolume.h"
namespace DB
{
VolumePtr createVolumeFromReservation(const ReservationPtr & reservation, VolumePtr other_volume)
{
if (other_volume->getType() == VolumeType::JBOD || other_volume->getType() == VolumeType::SINGLE_DISK)
{
/// Since reservation on JBOD chosies one of disks and makes reservation there, volume
/// for such type of reservation will be with one disk.
return std::make_shared<SingleDiskVolume>(other_volume->getName(), reservation->getDisk());
}
return nullptr;
}
}

12
src/Disks/createVolume.h Normal file
View File

@ -0,0 +1,12 @@
#pragma once
#include <Disks/IVolume.h>
#include <Disks/VolumeJBOD.h>
#include <Disks/SingleDiskVolume.h>
namespace DB
{
VolumePtr createVolumeFromReservation(const ReservationPtr & reservation, VolumePtr other_volume);
}

View File

@ -5,6 +5,7 @@ PEERDIR(
)
SRCS(
createVolume.cpp
DiskFactory.cpp
DiskLocal.cpp
DiskMemory.cpp
@ -12,6 +13,7 @@ SRCS(
IDisk.cpp
IVolume.cpp
registerDisks.cpp
SingleDiskVolume.cpp
StoragePolicy.cpp
VolumeJBOD.cpp
)

View File

@ -586,7 +586,7 @@ VolumeJBODPtr Context::setTemporaryStorage(const String & path, const String & p
shared->tmp_volume = tmp_policy->getVolume(0);
}
if (shared->tmp_volume->disks.empty())
if (shared->tmp_volume->getDisks().empty())
throw Exception("No disks volume for temporary files", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
return shared->tmp_volume;

View File

@ -1,4 +1,5 @@
#include <Storages/MergeTree/DataPartsExchange.h>
#include <Disks/createVolume.h>
#include <Common/CurrentMetrics.h>
#include <Common/NetException.h>
#include <IO/HTTPCommon.h>
@ -115,7 +116,7 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
{
String file_name = it.first;
auto disk = part->disk;
auto disk = part->volume->getDisk();
String path = part->getFullRelativePath() + file_name;
UInt64 size = disk->getFileSize(path);
@ -316,7 +317,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPart(
assertEOF(in);
MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(part_name, reservation->getDisk(), part_relative_path);
auto volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, disk);
MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(part_name, volume, part_relative_path);
new_data_part->is_temp = true;
new_data_part->modification_time = time(nullptr);
new_data_part->loadColumnsChecksumsIndexes(true, false);

View File

@ -137,11 +137,11 @@ void IMergeTreeDataPart::MinMaxIndex::merge(const MinMaxIndex & other)
IMergeTreeDataPart::IMergeTreeDataPart(
MergeTreeData & storage_, const String & name_, const DiskPtr & disk_, const std::optional<String> & relative_path_, Type part_type_)
MergeTreeData & storage_, const String & name_, const VolumePtr & volume_, const std::optional<String> & relative_path_, Type part_type_)
: storage(storage_)
, name(name_)
, info(MergeTreePartInfo::fromPartName(name_, storage.format_version))
, disk(disk_)
, volume(volume_)
, relative_path(relative_path_.value_or(name_))
, index_granularity_info(storage_, part_type_)
, part_type(part_type_)
@ -152,13 +152,13 @@ IMergeTreeDataPart::IMergeTreeDataPart(
const MergeTreeData & storage_,
const String & name_,
const MergeTreePartInfo & info_,
const DiskPtr & disk_,
const VolumePtr & volume_,
const std::optional<String> & relative_path_,
Type part_type_)
: storage(storage_)
, name(name_)
, info(info_)
, disk(disk_)
, volume(volume_)
, relative_path(relative_path_.value_or(name_))
, index_granularity_info(storage_, part_type_)
, part_type(part_type_)
@ -245,7 +245,7 @@ void IMergeTreeDataPart::removeIfNeeded()
{
auto path = getFullRelativePath();
if (!disk->exists(path))
if (!volume->getDisk()->exists(path))
return;
if (is_temp)
@ -392,7 +392,7 @@ String IMergeTreeDataPart::getFullPath() const
if (relative_path.empty())
throw Exception("Part relative_path cannot be empty. It's bug.", ErrorCodes::LOGICAL_ERROR);
return storage.getFullPathOnDisk(disk) + relative_path + "/";
return storage.getFullPathOnDisk(volume->getDisk()) + relative_path + "/";
}
String IMergeTreeDataPart::getFullRelativePath() const
@ -452,7 +452,7 @@ void IMergeTreeDataPart::loadIndex()
}
String index_path = getFullRelativePath() + "primary.idx";
auto index_file = openForReading(disk, index_path);
auto index_file = openForReading(volume->getDisk(), index_path);
for (size_t i = 0; i < index_granularity.getMarksCount(); ++i) //-V756
for (size_t j = 0; j < key_size; ++j)
@ -468,7 +468,7 @@ void IMergeTreeDataPart::loadIndex()
}
if (!index_file->eof())
throw Exception("Index file " + fullPath(disk, index_path) + " is unexpectedly long", ErrorCodes::EXPECTED_END_OF_FILE);
throw Exception("Index file " + fullPath(volume->getDisk(), index_path) + " is unexpectedly long", ErrorCodes::EXPECTED_END_OF_FILE);
index.assign(std::make_move_iterator(loaded_index.begin()), std::make_move_iterator(loaded_index.end()));
}
@ -489,9 +489,9 @@ void IMergeTreeDataPart::loadPartitionAndMinMaxIndex()
else
{
String path = getFullRelativePath();
partition.load(storage, disk, path);
partition.load(storage, volume->getDisk(), path);
if (!isEmpty())
minmax_idx.load(storage, disk, path);
minmax_idx.load(storage, volume->getDisk(), path);
}
String calculated_partition_id = partition.getID(storage.partition_key_sample);
@ -505,23 +505,23 @@ void IMergeTreeDataPart::loadPartitionAndMinMaxIndex()
void IMergeTreeDataPart::loadChecksums(bool require)
{
String path = getFullRelativePath() + "checksums.txt";
if (disk->exists(path))
if (volume->getDisk()->exists(path))
{
auto buf = openForReading(disk, path);
auto buf = openForReading(volume->getDisk(), path);
if (checksums.read(*buf))
{
assertEOF(*buf);
bytes_on_disk = checksums.getTotalSizeOnDisk();
}
else
bytes_on_disk = calculateTotalSizeOnDisk(disk, getFullRelativePath());
bytes_on_disk = calculateTotalSizeOnDisk(volume->getDisk(), getFullRelativePath());
}
else
{
if (require)
throw Exception("No checksums.txt in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
bytes_on_disk = calculateTotalSizeOnDisk(disk, getFullRelativePath());
bytes_on_disk = calculateTotalSizeOnDisk(volume->getDisk(), getFullRelativePath());
}
}
@ -534,10 +534,10 @@ void IMergeTreeDataPart::loadRowsCount()
}
else if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING || part_type == Type::COMPACT)
{
if (!disk->exists(path))
if (!volume->getDisk()->exists(path))
throw Exception("No count.txt in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
auto buf = openForReading(disk, path);
auto buf = openForReading(volume->getDisk(), path);
readIntText(rows_count, *buf);
assertEOF(*buf);
}
@ -582,9 +582,9 @@ void IMergeTreeDataPart::loadRowsCount()
void IMergeTreeDataPart::loadTTLInfos()
{
String path = getFullRelativePath() + "ttl.txt";
if (disk->exists(path))
if (volume->getDisk()->exists(path))
{
auto in = openForReading(disk, path);
auto in = openForReading(volume->getDisk(), path);
assertString("ttl format version: ", *in);
size_t format_version;
readText(format_version, *in);
@ -609,7 +609,7 @@ void IMergeTreeDataPart::loadTTLInfos()
void IMergeTreeDataPart::loadColumns(bool require)
{
String path = getFullRelativePath() + "columns.txt";
if (!disk->exists(path))
if (!volume->getDisk()->exists(path))
{
/// We can get list of columns only from columns.txt in compact parts.
if (require || part_type == Type::COMPACT)
@ -617,21 +617,21 @@ void IMergeTreeDataPart::loadColumns(bool require)
/// If there is no file with a list of columns, write it down.
for (const NameAndTypePair & column : storage.getColumns().getAllPhysical())
if (disk->exists(getFullRelativePath() + getFileNameForColumn(column) + ".bin"))
if (volume->getDisk()->exists(getFullRelativePath() + getFileNameForColumn(column) + ".bin"))
columns.push_back(column);
if (columns.empty())
throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
{
auto buf = disk->writeFile(path + ".tmp", 4096);
auto buf = volume->getDisk()->writeFile(path + ".tmp", 4096);
columns.writeText(*buf);
}
disk->moveFile(path + ".tmp", path);
volume->getDisk()->moveFile(path + ".tmp", path);
}
else
{
columns.readText(*disk->readFile(path));
columns.readText(*volume->getDisk()->readFile(path));
}
size_t pos = 0;
@ -659,29 +659,29 @@ void IMergeTreeDataPart::renameTo(const String & new_relative_path, bool remove_
String from = getFullRelativePath();
String to = storage.relative_data_path + new_relative_path + "/";
if (!disk->exists(from))
throw Exception("Part directory " + fullPath(disk, from) + " doesn't exist. Most likely it is logical error.", ErrorCodes::FILE_DOESNT_EXIST);
if (!volume->getDisk()->exists(from))
throw Exception("Part directory " + fullPath(volume->getDisk(), from) + " doesn't exist. Most likely it is logical error.", ErrorCodes::FILE_DOESNT_EXIST);
if (disk->exists(to))
if (volume->getDisk()->exists(to))
{
if (remove_new_dir_if_exists)
{
Names files;
disk->listFiles(to, files);
volume->getDisk()->listFiles(to, files);
LOG_WARNING(storage.log, "Part directory " << fullPath(disk, to) << " already exists"
LOG_WARNING(storage.log, "Part directory " << fullPath(volume->getDisk(), to) << " already exists"
<< " and contains " << files.size() << " files. Removing it.");
disk->removeRecursive(to);
volume->getDisk()->removeRecursive(to);
}
else
{
throw Exception("Part directory " + fullPath(disk, to) + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
throw Exception("Part directory " + fullPath(volume->getDisk(), to) + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
}
}
disk->setLastModified(from, Poco::Timestamp::fromEpochTime(time(nullptr)));
disk->moveFile(from, to);
volume->getDisk()->setLastModified(from, Poco::Timestamp::fromEpochTime(time(nullptr)));
volume->getDisk()->moveFile(from, to);
relative_path = new_relative_path;
}
@ -710,29 +710,29 @@ void IMergeTreeDataPart::remove() const
String to = storage.relative_data_path + "delete_tmp_" + name;
// TODO directory delete_tmp_<name> is never removed if server crashes before returning from this function
if (disk->exists(to))
if (volume->getDisk()->exists(to))
{
LOG_WARNING(storage.log, "Directory " << fullPath(disk, to) << " (to which part must be renamed before removing) already exists."
LOG_WARNING(storage.log, "Directory " << fullPath(volume->getDisk(), to) << " (to which part must be renamed before removing) already exists."
" Most likely this is due to unclean restart. Removing it.");
try
{
disk->removeRecursive(to + "/");
volume->getDisk()->removeRecursive(to + "/");
}
catch (...)
{
LOG_ERROR(storage.log, "Cannot recursively remove directory " << fullPath(disk, to) << ". Exception: " << getCurrentExceptionMessage(false));
LOG_ERROR(storage.log, "Cannot recursively remove directory " << fullPath(volume->getDisk(), to) << ". Exception: " << getCurrentExceptionMessage(false));
throw;
}
}
try
{
disk->moveFile(from, to);
volume->getDisk()->moveFile(from, to);
}
catch (const Poco::FileNotFoundException &)
{
LOG_ERROR(storage.log, "Directory " << fullPath(disk, to) << " (part to remove) doesn't exist or one of nested files has gone."
LOG_ERROR(storage.log, "Directory " << fullPath(volume->getDisk(), to) << " (part to remove) doesn't exist or one of nested files has gone."
" Most likely this is due to manual removing. This should be discouraged. Ignoring.");
return;
@ -741,7 +741,7 @@ void IMergeTreeDataPart::remove() const
if (checksums.empty())
{
/// If the part is not completely written, we cannot use fast path by listing files.
disk->removeRecursive(to + "/");
volume->getDisk()->removeRecursive(to + "/");
}
else
{
@ -754,25 +754,25 @@ void IMergeTreeDataPart::remove() const
# pragma GCC diagnostic ignored "-Wunused-variable"
#endif
for (const auto & [file, _] : checksums.files)
disk->remove(to + "/" + file);
volume->getDisk()->remove(to + "/" + file);
#if !__clang__
# pragma GCC diagnostic pop
#endif
for (const auto & file : {"checksums.txt", "columns.txt"})
disk->remove(to + "/" + file);
disk->removeIfExists(to + "/" + DELETE_ON_DESTROY_MARKER_PATH);
volume->getDisk()->remove(to + "/" + file);
volume->getDisk()->removeIfExists(to + "/" + DELETE_ON_DESTROY_MARKER_PATH);
disk->remove(to);
volume->getDisk()->remove(to);
}
catch (...)
{
/// Recursive directory removal does many excessive "stat" syscalls under the hood.
LOG_ERROR(storage.log, "Cannot quickly remove directory " << fullPath(disk, to) << " by removing files; fallback to recursive removal. Reason: "
LOG_ERROR(storage.log, "Cannot quickly remove directory " << fullPath(volume->getDisk(), to) << " by removing files; fallback to recursive removal. Reason: "
<< getCurrentExceptionMessage(false));
disk->removeRecursive(to + "/");
volume->getDisk()->removeRecursive(to + "/");
}
}
}
@ -793,7 +793,7 @@ String IMergeTreeDataPart::getRelativePathForDetachedPart(const String & prefix)
{
res = "detached/" + (prefix.empty() ? "" : prefix + "_") + name + (try_no ? "_try" + DB::toString(try_no) : "");
if (!disk->exists(getFullRelativePath() + res))
if (!volume->getDisk()->exists(getFullRelativePath() + res))
return res;
LOG_WARNING(storage.log, "Directory " << res << " (to detach to) already exists."
@ -817,16 +817,16 @@ void IMergeTreeDataPart::makeCloneInDetached(const String & prefix) const
String destination_path = storage.relative_data_path + getRelativePathForDetachedPart(prefix);
/// Backup is not recursive (max_level is 0), so do not copy inner directories
localBackup(disk, getFullRelativePath(), destination_path, 0);
disk->removeIfExists(destination_path + "/" + DELETE_ON_DESTROY_MARKER_PATH);
localBackup(volume->getDisk(), getFullRelativePath(), destination_path, 0);
volume->getDisk()->removeIfExists(destination_path + "/" + DELETE_ON_DESTROY_MARKER_PATH);
}
void IMergeTreeDataPart::makeCloneOnDiskDetached(const ReservationPtr & reservation) const
{
assertOnDisk();
auto reserved_disk = reservation->getDisk();
if (reserved_disk->getName() == disk->getName())
throw Exception("Can not clone data part " + name + " to same disk " + disk->getName(), ErrorCodes::LOGICAL_ERROR);
if (reserved_disk->getName() == volume->getDisk()->getName())
throw Exception("Can not clone data part " + name + " to same disk " + volume->getDisk()->getName(), ErrorCodes::LOGICAL_ERROR);
String path_to_clone = storage.relative_data_path + "detached/";
@ -834,8 +834,8 @@ void IMergeTreeDataPart::makeCloneOnDiskDetached(const ReservationPtr & reservat
throw Exception("Path " + fullPath(reserved_disk, path_to_clone + relative_path) + " already exists. Can not clone ", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
reserved_disk->createDirectory(path_to_clone);
disk->copy(getFullRelativePath(), reserved_disk, path_to_clone);
disk->removeIfExists(path_to_clone + "/" + DELETE_ON_DESTROY_MARKER_PATH);
volume->getDisk()->copy(getFullRelativePath(), reserved_disk, path_to_clone);
volume->getDisk()->removeIfExists(path_to_clone + "/" + DELETE_ON_DESTROY_MARKER_PATH);
}
void IMergeTreeDataPart::checkConsistencyBase() const
@ -865,7 +865,7 @@ void IMergeTreeDataPart::checkConsistencyBase() const
}
}
checksums.checkSizes(disk, path);
checksums.checkSizes(volume->getDisk(), path);
}
else
{
@ -879,17 +879,17 @@ void IMergeTreeDataPart::checkConsistencyBase() const
/// Check that the primary key index is not empty.
if (!storage.primary_key_columns.empty())
check_file_not_empty(disk, path + "primary.idx");
check_file_not_empty(volume->getDisk(), path + "primary.idx");
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
check_file_not_empty(disk, path + "count.txt");
check_file_not_empty(volume->getDisk(), path + "count.txt");
if (storage.partition_key_expr)
check_file_not_empty(disk, path + "partition.dat");
check_file_not_empty(volume->getDisk(), path + "partition.dat");
for (const String & col_name : storage.minmax_idx_columns)
check_file_not_empty(disk, path + "minmax_" + escapeForFileName(col_name) + ".idx");
check_file_not_empty(volume->getDisk(), path + "minmax_" + escapeForFileName(col_name) + ".idx");
}
}
}

View File

@ -31,6 +31,9 @@ struct FutureMergedMutatedPart;
class IReservation;
using ReservationPtr = std::unique_ptr<IReservation>;
class IVolume;
using VolumePtr = std::shared_ptr<IVolume>;
class IMergeTreeReader;
class IMergeTreeDataPartWriter;
@ -60,14 +63,14 @@ public:
const MergeTreeData & storage_,
const String & name_,
const MergeTreePartInfo & info_,
const DiskPtr & disk,
const VolumePtr & volume,
const std::optional<String> & relative_path,
Type part_type_);
IMergeTreeDataPart(
MergeTreeData & storage_,
const String & name_,
const DiskPtr & disk,
const VolumePtr & volume,
const std::optional<String> & relative_path,
Type part_type_);
@ -155,7 +158,7 @@ public:
String name;
MergeTreePartInfo info;
DiskPtr disk;
VolumePtr volume;
mutable String relative_path;
MergeTreeIndexGranularityInfo index_granularity_info;

View File

@ -63,18 +63,16 @@ void IMergeTreeDataPartWriter::Stream::addToChecksums(MergeTreeData::DataPart::C
IMergeTreeDataPartWriter::IMergeTreeDataPartWriter(
DiskPtr disk_,
const String & part_path_,
const MergeTreeData & storage_,
const MergeTreeData::DataPartPtr & data_part_,
const NamesAndTypesList & columns_list_,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc_,
const String & marks_file_extension_,
const CompressionCodecPtr & default_codec_,
const MergeTreeWriterSettings & settings_,
const MergeTreeIndexGranularity & index_granularity_)
: disk(std::move(disk_))
, part_path(part_path_)
, storage(storage_)
: data_part(data_part_)
, part_path(data_part_->getFullRelativePath())
, storage(data_part_->storage)
, columns_list(columns_list_)
, marks_file_extension(marks_file_extension_)
, index_granularity(index_granularity_)
@ -87,6 +85,7 @@ IMergeTreeDataPartWriter::IMergeTreeDataPartWriter(
if (settings.blocks_are_granules_size && !index_granularity.empty())
throw Exception("Can't take information about index granularity from blocks, when non empty index_granularity array specified", ErrorCodes::LOGICAL_ERROR);
auto disk = data_part->volume->getDisk();
if (!disk->exists(part_path))
disk->createDirectories(part_path);
}
@ -165,7 +164,7 @@ void IMergeTreeDataPartWriter::initPrimaryIndex()
{
if (storage.hasPrimaryKey())
{
index_file_stream = disk->writeFile(part_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
index_file_stream = data_part->volume->getDisk()->writeFile(part_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
index_stream = std::make_unique<HashingWriteBuffer>(*index_file_stream);
}
@ -180,7 +179,7 @@ void IMergeTreeDataPartWriter::initSkipIndices()
skip_indices_streams.emplace_back(
std::make_unique<IMergeTreeDataPartWriter::Stream>(
stream_name,
disk,
data_part->volume->getDisk(),
part_path + stream_name, INDEX_FILE_EXTENSION,
part_path + stream_name, marks_file_extension,
default_codec, settings.max_compress_block_size,

View File

@ -61,9 +61,7 @@ public:
using StreamPtr = std::unique_ptr<Stream>;
IMergeTreeDataPartWriter(
DiskPtr disk,
const String & part_path,
const MergeTreeData & storage,
const MergeTreeData::DataPartPtr & data_part,
const NamesAndTypesList & columns_list,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
const String & marks_file_extension,
@ -118,7 +116,7 @@ protected:
using SerializationState = IDataType::SerializeBinaryBulkStatePtr;
using SerializationStates = std::unordered_map<String, SerializationState>;
DiskPtr disk;
MergeTreeData::DataPartPtr data_part;
String part_path;
const MergeTreeData & storage;
NamesAndTypesList columns_list;

View File

@ -9,7 +9,7 @@ namespace DB
IMergedBlockOutputStream::IMergedBlockOutputStream(
const MergeTreeDataPartPtr & data_part)
: storage(data_part->storage)
, disk(data_part->disk)
, volume(data_part->volume)
, part_path(data_part->getFullRelativePath())
{
}
@ -82,7 +82,7 @@ NameSet IMergedBlockOutputStream::removeEmptyColumnsFromPart(
{
if (checksums.files.count(removed_file))
{
data_part->disk->remove(data_part->getFullRelativePath() + removed_file);
data_part->volume->getDisk()->remove(data_part->getFullRelativePath() + removed_file);
checksums.files.erase(removed_file);
}
}

View File

@ -37,7 +37,7 @@ protected:
protected:
const MergeTreeData & storage;
DiskPtr disk;
VolumePtr volume;
String part_path;
static Block getBlockAndPermute(const Block & block, const Names & names, const IColumn::Permutation * permutation);

View File

@ -929,7 +929,8 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
if (!MergeTreePartInfo::tryParsePartName(part_name, &part_info, format_version))
return;
auto part = createPart(part_name, part_info, part_disk_ptr, part_name);
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, part_disk_ptr);
auto part = createPart(part_name, part_info, single_disk_volume, part_name);
bool broken = false;
String part_path = relative_data_path + "/" + part_name;
@ -1552,12 +1553,12 @@ MergeTreeDataPartType MergeTreeData::choosePartType(size_t bytes_uncompressed, s
MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(const String & name,
MergeTreeDataPartType type, const MergeTreePartInfo & part_info,
const DiskPtr & disk, const String & relative_path) const
const VolumePtr & volume, const String & relative_path) const
{
if (type == MergeTreeDataPartType::COMPACT)
return std::make_shared<MergeTreeDataPartCompact>(*this, name, part_info, disk, relative_path);
return std::make_shared<MergeTreeDataPartCompact>(*this, name, part_info, volume, relative_path);
else if (type == MergeTreeDataPartType::WIDE)
return std::make_shared<MergeTreeDataPartWide>(*this, name, part_info, disk, relative_path);
return std::make_shared<MergeTreeDataPartWide>(*this, name, part_info, volume, relative_path);
else
throw Exception("Unknown type in part " + relative_path, ErrorCodes::UNKNOWN_PART_TYPE);
}
@ -1575,18 +1576,18 @@ static MergeTreeDataPartType getPartTypeFromMarkExtension(const String & mrk_ext
}
MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(
const String & name, const DiskPtr & disk, const String & relative_path) const
const String & name, const VolumePtr & volume, const String & relative_path) const
{
return createPart(name, MergeTreePartInfo::fromPartName(name, format_version), disk, relative_path);
return createPart(name, MergeTreePartInfo::fromPartName(name, format_version), volume, relative_path);
}
MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(
const String & name, const MergeTreePartInfo & part_info,
const DiskPtr & disk, const String & relative_path) const
const VolumePtr & volume, const String & relative_path) const
{
MergeTreeDataPartType type;
auto full_path = relative_data_path + relative_path + "/";
auto mrk_ext = MergeTreeIndexGranularityInfo::getMarksExtensionFromFilesystem(disk, full_path);
auto mrk_ext = MergeTreeIndexGranularityInfo::getMarksExtensionFromFilesystem(volume->getDisk(), full_path);
if (mrk_ext)
type = getPartTypeFromMarkExtension(*mrk_ext);
@ -1596,7 +1597,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(
type = choosePartType(0, 0);
}
return createPart(name, type, part_info, disk, relative_path);
return createPart(name, type, part_info, volume, relative_path);
}
void MergeTreeData::changeSettings(
@ -2314,7 +2315,7 @@ void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy)
auto part_it = data_parts_indexes.insert(part_copy).first;
modifyPartState(part_it, DataPartState::Committed);
auto disk = original_active_part->disk;
auto disk = original_active_part->volume->getDisk();
String marker_path = original_active_part->getFullRelativePath() + DELETE_ON_DESTROY_MARKER_PATH;
try
{
@ -2379,7 +2380,7 @@ MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_na
static void loadPartAndFixMetadataImpl(MergeTreeData::MutableDataPartPtr part)
{
auto disk = part->disk;
auto disk = part->volume->getDisk();
String full_part_path = part->getFullRelativePath();
/// Earlier the list of columns was written incorrectly. Delete it and re-create.
@ -2404,9 +2405,9 @@ static void loadPartAndFixMetadataImpl(MergeTreeData::MutableDataPartPtr part)
}
}
MergeTreeData::MutableDataPartPtr MergeTreeData::loadPartAndFixMetadata(const DiskPtr & disk, const String & relative_path) const
MergeTreeData::MutableDataPartPtr MergeTreeData::loadPartAndFixMetadata(const VolumePtr & volume, const String & relative_path) const
{
MutableDataPartPtr part = createPart(Poco::Path(relative_path).getFileName(), disk, relative_path);
MutableDataPartPtr part = createPart(Poco::Path(relative_path).getFileName(), volume, relative_path);
loadPartAndFixMetadataImpl(part);
return part;
}
@ -2519,7 +2520,7 @@ void MergeTreeData::movePartitionToDisk(const ASTPtr & partition, const String &
parts.erase(std::remove_if(parts.begin(), parts.end(), [&](auto part_ptr)
{
return part_ptr->disk->getName() == disk->getName();
return part_ptr->volume->getDisk()->getName() == disk->getName();
}), parts.end());
if (parts.empty())
@ -2570,9 +2571,9 @@ void MergeTreeData::movePartitionToVolume(const ASTPtr & partition, const String
parts.erase(std::remove_if(parts.begin(), parts.end(), [&](auto part_ptr)
{
for (const auto & disk : volume->disks)
for (const auto & disk : volume->getDisks())
{
if (part_ptr->disk->getName() == disk->getName())
if (part_ptr->volume->getDisk()->getName() == disk->getName())
{
return true;
}
@ -2848,7 +2849,8 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
for (const auto & part_names : renamed_parts.old_and_new_names)
{
LOG_DEBUG(log, "Checking part " << part_names.second);
MutableDataPartPtr part = createPart(part_names.first, name_to_disk[part_names.first], source_dir + part_names.second);
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_names.first, name_to_disk[part_names.first]);
MutableDataPartPtr part = createPart(part_names.first, single_disk_volume, source_dir + part_names.second);
loadPartAndFixMetadataImpl(part);
loaded_parts.push_back(part);
}
@ -2962,12 +2964,12 @@ bool MergeTreeData::TTLEntry::isPartInDestination(StoragePolicyPtr policy, const
{
if (destination_type == PartDestinationType::VOLUME)
{
for (const auto & disk : policy->getVolumeByName(destination_name)->disks)
if (disk->getName() == part.disk->getName())
for (const auto & disk : policy->getVolumeByName(destination_name)->getDisks())
if (disk->getName() == part.volume->getDisk()->getName())
return true;
}
else if (destination_type == PartDestinationType::DISK)
return policy->getDiskByName(destination_name)->getName() == part.disk->getName();
return policy->getDiskByName(destination_name)->getName() == part.volume->getDisk()->getName();
return false;
}
@ -3181,7 +3183,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPartOnSameDisk(
bool does_storage_policy_allow_same_disk = false;
for (const DiskPtr & disk : getStoragePolicy()->getDisks())
{
if (disk->getName() == src_part->disk->getName())
if (disk->getName() == src_part->volume->getDisk()->getName())
{
does_storage_policy_allow_same_disk = true;
break;
@ -3194,7 +3196,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPartOnSameDisk(
String dst_part_name = src_part->getNewName(dst_part_info);
String tmp_dst_part_name = tmp_part_prefix + dst_part_name;
auto reservation = reserveSpace(src_part->getBytesOnDisk(), src_part->disk);
auto reservation = reserveSpace(src_part->getBytesOnDisk(), src_part->volume->getDisk());
auto disk = reservation->getDisk();
String src_part_path = src_part->getFullRelativePath();
String dst_part_path = relative_data_path + tmp_dst_part_name;
@ -3206,7 +3208,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPartOnSameDisk(
localBackup(disk, src_part_path, dst_part_path);
disk->removeIfExists(dst_part_path + "/" + DELETE_ON_DESTROY_MARKER_PATH);
auto dst_data_part = createPart(dst_part_name, dst_part_info, reservation->getDisk(), tmp_dst_part_name);
auto single_disk_volume = std::make_shared<SingleDiskVolume>(disk->getName(), disk);
auto dst_data_part = createPart(dst_part_name, dst_part_info, single_disk_volume, tmp_dst_part_name);
dst_data_part->is_temp = true;
@ -3278,7 +3281,7 @@ void MergeTreeData::freezePartitionsByMatcher(MatcherFn matcher, const String &
if (!matcher(part))
continue;
part->disk->createDirectories(shadow_path);
part->volume->getDisk()->createDirectories(shadow_path);
String backup_path = shadow_path
+ (!with_name.empty()
@ -3289,8 +3292,8 @@ void MergeTreeData::freezePartitionsByMatcher(MatcherFn matcher, const String &
LOG_DEBUG(log, "Freezing part " << part->name << " snapshot will be placed at " + backup_path);
String backup_part_path = backup_path + relative_data_path + part->relative_path;
localBackup(part->disk, part->getFullRelativePath(), backup_part_path);
part->disk->removeIfExists(backup_part_path + "/" + DELETE_ON_DESTROY_MARKER_PATH);
localBackup(part->volume->getDisk(), part->getFullRelativePath(), backup_part_path);
part->volume->getDisk()->removeIfExists(backup_part_path + "/" + DELETE_ON_DESTROY_MARKER_PATH);
part->is_frozen.store(true, std::memory_order_relaxed);
++parts_processed;
@ -3411,7 +3414,7 @@ bool MergeTreeData::areBackgroundMovesNeeded() const
if (policy->getVolumes().size() > 1)
return true;
return policy->getVolumes().size() == 1 && policy->getVolumes()[0]->disks.size() > 1 && !move_ttl_entries.empty();
return policy->getVolumes().size() == 1 && policy->getVolumes()[0]->getDisks().size() > 1 && !move_ttl_entries.empty();
}
bool MergeTreeData::movePartsToSpace(const DataPartsVector & parts, SpacePtr space)

View File

@ -194,14 +194,14 @@ public:
/// After this method setColumns must be called
MutableDataPartPtr createPart(const String & name,
MergeTreeDataPartType type, const MergeTreePartInfo & part_info,
const DiskPtr & disk, const String & relative_path) const;
const VolumePtr & volume, const String & relative_path) const;
/// After this methods 'loadColumnsChecksumsIndexes' must be called
MutableDataPartPtr createPart(const String & name,
const DiskPtr & disk, const String & relative_path) const;
const VolumePtr & volume, const String & relative_path) const;
MutableDataPartPtr createPart(const String & name, const MergeTreePartInfo & part_info,
const DiskPtr & disk, const String & relative_path) const;
const VolumePtr & volume, const String & relative_path) const;
/// Auxiliary object to add a set of parts into the working set in two steps:
/// * First, as PreCommitted parts (the parts are ready, but not yet in the active set).
@ -539,7 +539,7 @@ public:
bool hasAnyTTL() const override { return hasRowsTTL() || hasAnyMoveTTL() || hasAnyColumnTTL(); }
/// Check that the part is not broken and calculate the checksums for it if they are not present.
MutableDataPartPtr loadPartAndFixMetadata(const DiskPtr & disk, const String & relative_path) const;
MutableDataPartPtr loadPartAndFixMetadata(const VolumePtr & volume, const String & relative_path) const;
/** Create local backup (snapshot) for parts with specified prefix.
* Backup is created in directory clickhouse_dir/shadow/i/, where i - incremental number,

View File

@ -610,11 +610,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
all_columns, data.sorting_key_expr, data.skip_indices,
data.merging_params, gathering_columns, gathering_column_names, merging_columns, merging_column_names);
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + future_part.name, disk);
MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(
future_part.name,
future_part.type,
future_part.part_info,
disk,
single_disk_volume,
TMP_PREFIX + future_part.name);
new_data_part->setColumns(all_columns);
@ -1028,8 +1029,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
in->setProgressCallback(MergeProgressCallback(merge_entry, watch_prev_elapsed, stage_progress));
}
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + future_part.name, space_reservation->getDisk());
auto new_data_part = data.createPart(
future_part.name, future_part.type, future_part.part_info, space_reservation->getDisk(), "tmp_mut_" + future_part.name);
future_part.name, future_part.type, future_part.part_info, single_disk_volume, "tmp_mut_" + future_part.name);
new_data_part->is_temp = true;
new_data_part->ttl_infos = source_part->ttl_infos;
@ -1039,7 +1041,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
new_data_part->setColumns(getColumnsForNewDataPart(source_part, updated_header, all_columns, for_file_renames));
new_data_part->partition.assign(source_part->partition);
auto disk = new_data_part->disk;
auto disk = new_data_part->volume->getDisk();
String new_part_tmp_path = new_data_part->getFullRelativePath();
/// Note: this is done before creating input streams, because otherwise data.data_parts_mutex
@ -1644,7 +1646,7 @@ void MergeTreeDataMergerMutator::finalizeMutatedPart(
MergeTreeData::MutableDataPartPtr new_data_part,
bool need_remove_expired_values)
{
auto disk = new_data_part->disk;
auto disk = new_data_part->volume->getDisk();
if (need_remove_expired_values)
{
/// Write a file with ttl infos in json format.
@ -1674,7 +1676,7 @@ void MergeTreeDataMergerMutator::finalizeMutatedPart(
new_data_part->minmax_idx = source_part->minmax_idx;
new_data_part->modification_time = time(nullptr);
new_data_part->setBytesOnDisk(
MergeTreeData::DataPart::calculateTotalSizeOnDisk(new_data_part->disk, new_data_part->getFullRelativePath()));
MergeTreeData::DataPart::calculateTotalSizeOnDisk(new_data_part->volume->getDisk(), new_data_part->getFullRelativePath()));
new_data_part->calculateColumnsSizesOnDisk();
}

View File

@ -20,9 +20,9 @@ namespace ErrorCodes
MergeTreeDataPartCompact::MergeTreeDataPartCompact(
MergeTreeData & storage_,
const String & name_,
const DiskPtr & disk_,
const VolumePtr & volume_,
const std::optional<String> & relative_path_)
: IMergeTreeDataPart(storage_, name_, disk_, relative_path_, Type::COMPACT)
: IMergeTreeDataPart(storage_, name_, volume_, relative_path_, Type::COMPACT)
{
}
@ -30,9 +30,9 @@ MergeTreeDataPartCompact::MergeTreeDataPartCompact(
const MergeTreeData & storage_,
const String & name_,
const MergeTreePartInfo & info_,
const DiskPtr & disk_,
const VolumePtr & volume_,
const std::optional<String> & relative_path_)
: IMergeTreeDataPart(storage_, name_, info_, disk_, relative_path_, Type::COMPACT)
: IMergeTreeDataPart(storage_, name_, info_, volume_, relative_path_, Type::COMPACT)
{
}
@ -68,7 +68,7 @@ IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartCompact::getWriter(
{ return *getColumnPosition(lhs.name) < *getColumnPosition(rhs.name); });
return std::make_unique<MergeTreeDataPartWriterCompact>(
disk, getFullRelativePath(), storage, ordered_columns_list, indices_to_recalc,
shared_from_this(), ordered_columns_list, indices_to_recalc,
index_granularity_info.marks_file_extension,
default_codec, writer_settings, computed_index_granularity);
}
@ -99,12 +99,12 @@ void MergeTreeDataPartCompact::loadIndexGranularity()
throw Exception("MergeTreeDataPartCompact cannot be created with non-adaptive granulary.", ErrorCodes::NOT_IMPLEMENTED);
auto marks_file_path = index_granularity_info.getMarksFilePath(full_path + "data");
if (!disk->exists(marks_file_path))
throw Exception("Marks file '" + fullPath(disk, marks_file_path) + "' doesn't exist", ErrorCodes::NO_FILE_IN_DATA_PART);
if (!volume->getDisk()->exists(marks_file_path))
throw Exception("Marks file '" + fullPath(volume->getDisk(), marks_file_path) + "' doesn't exist", ErrorCodes::NO_FILE_IN_DATA_PART);
size_t marks_file_size = disk->getFileSize(marks_file_path);
size_t marks_file_size = volume->getDisk()->getFileSize(marks_file_path);
auto buffer = disk->readFile(marks_file_path, marks_file_size);
auto buffer = volume->getDisk()->readFile(marks_file_path, marks_file_size);
while (!buffer->eof())
{
/// Skip offsets for columns
@ -146,9 +146,9 @@ void MergeTreeDataPartCompact::checkConsistency(bool require_part_metadata) cons
if (require_part_metadata)
{
if (!checksums.files.count(mrk_file_name))
throw Exception("No marks file checksum for column in part " + fullPath(disk, path), ErrorCodes::NO_FILE_IN_DATA_PART);
throw Exception("No marks file checksum for column in part " + fullPath(volume->getDisk(), path), ErrorCodes::NO_FILE_IN_DATA_PART);
if (!checksums.files.count(DATA_FILE_NAME_WITH_EXTENSION))
throw Exception("No data file checksum for in part " + fullPath(disk, path), ErrorCodes::NO_FILE_IN_DATA_PART);
throw Exception("No data file checksum for in part " + fullPath(volume->getDisk(), path), ErrorCodes::NO_FILE_IN_DATA_PART);
}
}
else
@ -156,24 +156,24 @@ void MergeTreeDataPartCompact::checkConsistency(bool require_part_metadata) cons
{
/// count.txt should be present even in non custom-partitioned parts
auto file_path = path + "count.txt";
if (!disk->exists(file_path) || disk->getFileSize(file_path) == 0)
throw Exception("Part " + path + " is broken: " + fullPath(disk, file_path) + " is empty", ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
if (!volume->getDisk()->exists(file_path) || volume->getDisk()->getFileSize(file_path) == 0)
throw Exception("Part " + path + " is broken: " + fullPath(volume->getDisk(), file_path) + " is empty", ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
}
/// Check that marks are nonempty and have the consistent size with columns number.
auto mrk_file_path = path + mrk_file_name;
if (disk->exists(mrk_file_name))
if (volume->getDisk()->exists(mrk_file_name))
{
UInt64 file_size = disk->getFileSize(mrk_file_name);
UInt64 file_size = volume->getDisk()->getFileSize(mrk_file_name);
if (!file_size)
throw Exception("Part " + path + " is broken: " + fullPath(disk, mrk_file_name) + " is empty.",
throw Exception("Part " + path + " is broken: " + fullPath(volume->getDisk(), mrk_file_name) + " is empty.",
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
UInt64 expected_file_size = index_granularity_info.getMarkSizeInBytes(columns.size()) * index_granularity.getMarksCount();
if (expected_file_size != file_size)
throw Exception(
"Part " + path + " is broken: bad size of marks file '" + fullPath(disk, mrk_file_name) + "': " + std::to_string(file_size) + ", must be: " + std::to_string(expected_file_size),
"Part " + path + " is broken: bad size of marks file '" + fullPath(volume->getDisk(), mrk_file_name) + "': " + std::to_string(file_size) + ", must be: " + std::to_string(expected_file_size),
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
}
}

View File

@ -26,13 +26,13 @@ public:
const MergeTreeData & storage_,
const String & name_,
const MergeTreePartInfo & info_,
const DiskPtr & disk_,
const VolumePtr & volume_,
const std::optional<String> & relative_path_ = {});
MergeTreeDataPartCompact(
MergeTreeData & storage_,
const String & name_,
const DiskPtr & disk_,
const VolumePtr & volume_,
const std::optional<String> & relative_path_ = {});
MergeTreeReaderPtr getReader(

View File

@ -19,9 +19,9 @@ namespace ErrorCodes
MergeTreeDataPartWide::MergeTreeDataPartWide(
MergeTreeData & storage_,
const String & name_,
const DiskPtr & disk_,
const VolumePtr & volume_,
const std::optional<String> & relative_path_)
: IMergeTreeDataPart(storage_, name_, disk_, relative_path_, Type::WIDE)
: IMergeTreeDataPart(storage_, name_, volume_, relative_path_, Type::WIDE)
{
}
@ -29,9 +29,9 @@ MergeTreeDataPartWide::MergeTreeDataPartWide(
const MergeTreeData & storage_,
const String & name_,
const MergeTreePartInfo & info_,
const DiskPtr & disk_,
const VolumePtr & volume_,
const std::optional<String> & relative_path_)
: IMergeTreeDataPart(storage_, name_, info_, disk_, relative_path_, Type::WIDE)
: IMergeTreeDataPart(storage_, name_, info_, volume_, relative_path_, Type::WIDE)
{
}
@ -59,7 +59,7 @@ IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartWide::getWriter(
const MergeTreeIndexGranularity & computed_index_granularity) const
{
return std::make_unique<MergeTreeDataPartWriterWide>(
disk, getFullRelativePath(), storage, columns_list, indices_to_recalc,
shared_from_this(), columns_list, indices_to_recalc,
index_granularity_info.marks_file_extension,
default_codec, writer_settings, computed_index_granularity);
}
@ -99,7 +99,7 @@ ColumnSize MergeTreeDataPartWide::getColumnSizeImpl(
void MergeTreeDataPartWide::loadIndexGranularity()
{
String full_path = getFullRelativePath();
index_granularity_info.changeGranularityIfRequired(disk, full_path);
index_granularity_info.changeGranularityIfRequired(volume->getDisk(), full_path);
if (columns.empty())
@ -107,10 +107,10 @@ void MergeTreeDataPartWide::loadIndexGranularity()
/// We can use any column, it doesn't matter
std::string marks_file_path = index_granularity_info.getMarksFilePath(full_path + getFileNameForColumn(columns.front()));
if (!disk->exists(marks_file_path))
throw Exception("Marks file '" + fullPath(disk, marks_file_path) + "' doesn't exist", ErrorCodes::NO_FILE_IN_DATA_PART);
if (!volume->getDisk()->exists(marks_file_path))
throw Exception("Marks file '" + fullPath(volume->getDisk(), marks_file_path) + "' doesn't exist", ErrorCodes::NO_FILE_IN_DATA_PART);
size_t marks_file_size = disk->getFileSize(marks_file_path);
size_t marks_file_size = volume->getDisk()->getFileSize(marks_file_path);
if (!index_granularity_info.is_adaptive)
{
@ -119,7 +119,7 @@ void MergeTreeDataPartWide::loadIndexGranularity()
}
else
{
auto buffer = disk->readFile(marks_file_path, marks_file_size);
auto buffer = volume->getDisk()->readFile(marks_file_path, marks_file_size);
while (!buffer->eof())
{
buffer->seek(sizeof(size_t) * 2, SEEK_CUR); /// skip offset_in_compressed file and offset_in_decompressed_block
@ -129,7 +129,7 @@ void MergeTreeDataPartWide::loadIndexGranularity()
}
if (index_granularity.getMarksCount() * index_granularity_info.getMarkSizeInBytes() != marks_file_size)
throw Exception("Cannot read all marks from file " + fullPath(disk, marks_file_path), ErrorCodes::CANNOT_READ_ALL_DATA);
throw Exception("Cannot read all marks from file " + fullPath(volume->getDisk(), marks_file_path), ErrorCodes::CANNOT_READ_ALL_DATA);
}
index_granularity.setInitialized();
@ -158,10 +158,10 @@ void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const
String mrk_file_name = file_name + index_granularity_info.marks_file_extension;
String bin_file_name = file_name + ".bin";
if (!checksums.files.count(mrk_file_name))
throw Exception("No " + mrk_file_name + " file checksum for column " + name_type.name + " in part " + fullPath(disk, path),
throw Exception("No " + mrk_file_name + " file checksum for column " + name_type.name + " in part " + fullPath(volume->getDisk(), path),
ErrorCodes::NO_FILE_IN_DATA_PART);
if (!checksums.files.count(bin_file_name))
throw Exception("No " + bin_file_name + " file checksum for column " + name_type.name + " in part " + fullPath(disk, path),
throw Exception("No " + bin_file_name + " file checksum for column " + name_type.name + " in part " + fullPath(volume->getDisk(), path),
ErrorCodes::NO_FILE_IN_DATA_PART);
}, stream_path);
}
@ -179,12 +179,12 @@ void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const
auto file_path = path + IDataType::getFileNameForStream(name_type.name, substream_path) + index_granularity_info.marks_file_extension;
/// Missing file is Ok for case when new column was added.
if (disk->exists(file_path))
if (volume->getDisk()->exists(file_path))
{
UInt64 file_size = disk->getFileSize(file_path);
UInt64 file_size = volume->getDisk()->getFileSize(file_path);
if (!file_size)
throw Exception("Part " + path + " is broken: " + fullPath(disk, file_path) + " is empty.",
throw Exception("Part " + path + " is broken: " + fullPath(volume->getDisk(), file_path) + " is empty.",
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
if (!marks_size)

View File

@ -19,13 +19,13 @@ public:
const MergeTreeData & storage_,
const String & name_,
const MergeTreePartInfo & info_,
const DiskPtr & disk,
const VolumePtr & volume,
const std::optional<String> & relative_path = {});
MergeTreeDataPartWide(
MergeTreeData & storage_,
const String & name_,
const DiskPtr & disk,
const VolumePtr & volume,
const std::optional<String> & relative_path = {});
MergeTreeReaderPtr getReader(

View File

@ -6,26 +6,23 @@ namespace DB
MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
DiskPtr disk_,
const String & part_path_,
const MergeTreeData & storage_,
const MergeTreeData::DataPartPtr & data_part_,
const NamesAndTypesList & columns_list_,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc_,
const String & marks_file_extension_,
const CompressionCodecPtr & default_codec_,
const MergeTreeWriterSettings & settings_,
const MergeTreeIndexGranularity & index_granularity_)
: IMergeTreeDataPartWriter(disk_, part_path_,
storage_, columns_list_,
indices_to_recalc_, marks_file_extension_,
default_codec_, settings_, index_granularity_)
: IMergeTreeDataPartWriter(data_part_, columns_list_,
indices_to_recalc_, marks_file_extension_,
default_codec_, settings_, index_granularity_)
{
using DataPart = MergeTreeDataPartCompact;
String data_file_name = DataPart::DATA_FILE_NAME;
stream = std::make_unique<Stream>(
data_file_name,
disk_,
data_part->volume->getDisk(),
part_path + data_file_name, DataPart::DATA_FILE_EXTENSION,
part_path + data_file_name, marks_file_extension,
default_codec,

View File

@ -8,9 +8,7 @@ class MergeTreeDataPartWriterCompact : public IMergeTreeDataPartWriter
{
public:
MergeTreeDataPartWriterCompact(
DiskPtr disk,
const String & part_path,
const MergeTreeData & storage,
const MergeTreeData::DataPartPtr & data_part,
const NamesAndTypesList & columns_list,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
const String & marks_file_extension,

View File

@ -13,18 +13,16 @@ namespace
}
MergeTreeDataPartWriterWide::MergeTreeDataPartWriterWide(
DiskPtr disk_,
const String & part_path_,
const MergeTreeData & storage_,
const MergeTreeData::DataPartPtr & data_part_,
const NamesAndTypesList & columns_list_,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc_,
const String & marks_file_extension_,
const CompressionCodecPtr & default_codec_,
const MergeTreeWriterSettings & settings_,
const MergeTreeIndexGranularity & index_granularity_)
: IMergeTreeDataPartWriter(disk_, part_path_,
storage_, columns_list_, indices_to_recalc_,
marks_file_extension_, default_codec_, settings_, index_granularity_)
: IMergeTreeDataPartWriter(data_part_, columns_list_,
indices_to_recalc_, marks_file_extension_,
default_codec_, settings_, index_granularity_)
{
const auto & columns = storage.getColumns();
for (const auto & it : columns_list)
@ -46,7 +44,7 @@ void MergeTreeDataPartWriterWide::addStreams(
column_streams[stream_name] = std::make_unique<Stream>(
stream_name,
disk,
data_part->volume->getDisk(),
part_path + stream_name, DATA_FILE_EXTENSION,
part_path + stream_name, marks_file_extension,
effective_codec,

View File

@ -11,9 +11,7 @@ public:
using ColumnToSize = std::map<std::string, UInt64>;
MergeTreeDataPartWriterWide(
DiskPtr disk,
const String & part_path,
const MergeTreeData & storage,
const MergeTreeData::DataPartPtr & data_part,
const NamesAndTypesList & columns_list,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
const String & marks_file_extension,

View File

@ -1380,7 +1380,7 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
const MarkRanges & ranges,
const Settings & settings) const
{
if (!part->disk->exists(part->getFullRelativePath() + index->getFileName() + ".idx"))
if (!part->volume->getDisk()->exists(part->getFullRelativePath() + index->getFileName() + ".idx"))
{
LOG_DEBUG(log, "File for index " << backQuote(index->name) << " does not exist. Skipping it.");
return ranges;

View File

@ -3,6 +3,7 @@
#include <Columns/ColumnConst.h>
#include <Common/HashTable/HashMap.h>
#include <Common/Exception.h>
#include <Disks/createVolume.h>
#include <Interpreters/AggregationCommon.h>
#include <IO/HashingWriteBuffer.h>
#include <DataTypes/DataTypeDateTime.h>
@ -231,12 +232,13 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
NamesAndTypesList columns = data.getColumns().getAllPhysical().filter(block.getNames());
ReservationPtr reservation = data.reserveSpacePreferringTTLRules(expected_size, move_ttl_infos, time(nullptr));
VolumePtr volume = data.getStoragePolicy()->getVolume(0);
auto new_data_part = data.createPart(
part_name,
data.choosePartType(expected_size, block.rows()),
new_part_info,
reservation->getDisk(),
createVolumeFromReservation(reservation, volume),
TMP_PREFIX + part_name);
new_data_part->setColumns(columns);
@ -247,13 +249,13 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
/// The name could be non-unique in case of stale files from previous runs.
String full_path = new_data_part->getFullRelativePath();
if (new_data_part->disk->exists(full_path))
if (new_data_part->volume->getDisk()->exists(full_path))
{
LOG_WARNING(log, "Removing old temporary directory " + fullPath(new_data_part->disk, full_path));
new_data_part->disk->removeRecursive(full_path);
LOG_WARNING(log, "Removing old temporary directory " + fullPath(new_data_part->volume->getDisk(), full_path));
new_data_part->volume->getDisk()->removeRecursive(full_path);
}
new_data_part->disk->createDirectories(full_path);
new_data_part->volume->getDisk()->createDirectories(full_path);
/// If we need to calculate some columns to sort.
if (data.hasSortingKey() || data.hasSkipIndices())

View File

@ -7,7 +7,7 @@ namespace DB
MergeTreeIndexReader::MergeTreeIndexReader(
MergeTreeIndexPtr index_, MergeTreeData::DataPartPtr part_, size_t marks_count_, const MarkRanges & all_mark_ranges_)
: index(index_), stream(
part_->disk,
part_->volume->getDisk(),
part_->getFullRelativePath() + index->getFileName(), ".idx", marks_count_,
all_mark_ranges_,
MergeTreeReaderSettings{}, nullptr, nullptr,

View File

@ -108,7 +108,7 @@ bool MergeTreePartsMover::selectPartsForMove(
/// Do not check last volume
for (size_t i = 0; i != volumes.size() - 1; ++i)
{
for (const auto & disk : volumes[i]->disks)
for (const auto & disk : volumes[i]->getDisks())
{
UInt64 required_maximum_available_space = disk->getTotalSpace() * policy->getMoveFactor();
UInt64 unreserved_space = disk->getUnreservedSpace();
@ -129,7 +129,7 @@ bool MergeTreePartsMover::selectPartsForMove(
continue;
auto ttl_entry = part->storage.selectTTLEntryForTTLInfos(part->ttl_infos, time_of_move);
auto to_insert = need_to_move.find(part->disk);
auto to_insert = need_to_move.find(part->volume->getDisk());
ReservationPtr reservation;
if (ttl_entry)
{
@ -196,8 +196,9 @@ MergeTreeData::DataPartPtr MergeTreePartsMover::clonePart(const MergeTreeMoveEnt
LOG_TRACE(log, "Cloning part " << moving_part.part->name);
moving_part.part->makeCloneOnDiskDetached(moving_part.reserved_space);
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + moving_part.part->name, moving_part.reserved_space->getDisk());
MergeTreeData::MutableDataPartPtr cloned_part =
data->createPart(moving_part.part->name, moving_part.reserved_space->getDisk(), "detached/" + moving_part.part->name);
data->createPart(moving_part.part->name, single_disk_volume, "detached/" + moving_part.part->name);
LOG_TRACE(log, "Part " << moving_part.part->name << " was cloned to " << cloned_part->getFullPath());
cloned_part->loadColumnsChecksumsIndexes(true, true);

View File

@ -28,7 +28,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
uncompressed_cache_, mark_cache_, std::move(mark_ranges_),
std::move(settings_), std::move(avg_value_size_hints_))
, marks_loader(
data_part->disk,
data_part->volume->getDisk(),
mark_cache,
data_part->index_granularity_info.getMarksFilePath(data_part->getFullRelativePath() + MergeTreeDataPartCompact::DATA_FILE_NAME),
data_part->getMarksCount(), data_part->index_granularity_info,
@ -40,10 +40,10 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
if (uncompressed_cache)
{
auto buffer = std::make_unique<CachedCompressedReadBuffer>(
fullPath(data_part->disk, full_data_path),
fullPath(data_part->volume->getDisk(), full_data_path),
[this, full_data_path, buffer_size]()
{
return data_part->disk->readFile(
return data_part->volume->getDisk()->readFile(
full_data_path,
buffer_size,
0,
@ -62,7 +62,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
{
auto buffer =
std::make_unique<CompressedReadBufferFromFile>(
data_part->disk->readFile(full_data_path, buffer_size, 0, settings.min_bytes_to_use_direct_io, 0));
data_part->volume->getDisk()->readFile(full_data_path, buffer_size, 0, settings.min_bytes_to_use_direct_io, 0));
if (profile_callback_)
buffer->setProfileCallback(profile_callback_, clock_type_);

View File

@ -176,7 +176,7 @@ void MergeTreeReaderWide::addStreams(const String & name, const IDataType & type
return;
streams.emplace(stream_name, std::make_unique<MergeTreeReaderStream>(
data_part->disk, data_part->getFullRelativePath() + stream_name, DATA_FILE_EXTENSION,
data_part->volume->getDisk(), data_part->getFullRelativePath() + stream_name, DATA_FILE_EXTENSION,
data_part->getMarksCount(), all_mark_ranges, settings, mark_cache,
uncompressed_cache, data_part->getFileSizeOrZero(stream_name + DATA_FILE_EXTENSION),
&data_part->index_granularity_info,

View File

@ -49,7 +49,7 @@ MergedBlockOutputStream::MergedBlockOutputStream(
}
}
disk->createDirectories(part_path);
volume->getDisk()->createDirectories(part_path);
writer = data_part->getWriter(columns_list, skip_indices, default_codec, writer_settings);
writer->initPrimaryIndex();
@ -99,14 +99,14 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING || isCompactPart(new_part))
{
new_part->partition.store(storage, disk, part_path, checksums);
new_part->partition.store(storage, volume->getDisk(), part_path, checksums);
if (new_part->minmax_idx.initialized)
new_part->minmax_idx.store(storage, disk, part_path, checksums);
new_part->minmax_idx.store(storage, volume->getDisk(), part_path, checksums);
else if (rows_count)
throw Exception("MinMax index was not initialized for new non-empty part " + new_part->name
+ ". It is a bug.", ErrorCodes::LOGICAL_ERROR);
auto count_out = disk->writeFile(part_path + "count.txt", 4096);
auto count_out = volume->getDisk()->writeFile(part_path + "count.txt", 4096);
HashingWriteBuffer count_out_hashing(*count_out);
writeIntText(rows_count, count_out_hashing);
count_out_hashing.next();
@ -117,7 +117,7 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
if (!new_part->ttl_infos.empty())
{
/// Write a file with ttl infos in json format.
auto out = disk->writeFile(part_path + "ttl.txt", 4096);
auto out = volume->getDisk()->writeFile(part_path + "ttl.txt", 4096);
HashingWriteBuffer out_hashing(*out);
new_part->ttl_infos.write(out_hashing);
checksums.files["ttl.txt"].file_size = out_hashing.count();
@ -128,13 +128,13 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
{
/// Write a file with a description of columns.
auto out = disk->writeFile(part_path + "columns.txt", 4096);
auto out = volume->getDisk()->writeFile(part_path + "columns.txt", 4096);
part_columns.writeText(*out);
}
{
/// Write file with checksums.
auto out = disk->writeFile(part_path + "checksums.txt", 4096);
auto out = volume->getDisk()->writeFile(part_path + "checksums.txt", 4096);
checksums.write(*out);
}

View File

@ -167,7 +167,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
std::function<bool()> is_cancelled)
{
return checkDataPart(
data_part->disk,
data_part->volume->getDisk(),
data_part->getFullRelativePath(),
data_part->getColumns(),
data_part->getType(),

View File

@ -579,7 +579,7 @@ void StorageDistributed::startup()
if (!volume)
return;
for (const DiskPtr & disk : volume->disks)
for (const DiskPtr & disk : volume->getDisks())
createDirectoryMonitors(disk->getPath());
for (const String & path : getDataPaths())
@ -607,7 +607,7 @@ Strings StorageDistributed::getDataPaths() const
if (relative_data_path.empty())
return paths;
for (const DiskPtr & disk : volume->disks)
for (const DiskPtr & disk : volume->getDisks())
paths.push_back(disk->getPath() + relative_data_path);
return paths;
@ -811,7 +811,7 @@ void StorageDistributed::rename(const String & new_path_to_table_data, const Sto
}
void StorageDistributed::renameOnDisk(const String & new_path_to_table_data)
{
for (const DiskPtr & disk : volume->disks)
for (const DiskPtr & disk : volume->getDisks())
{
const String path(disk->getPath());
auto new_path = path + new_path_to_table_data;

View File

@ -289,7 +289,7 @@ public:
/// if we mutate part, than we should reserve space on the same disk, because mutations possible can create hardlinks
if (is_mutation)
reserved_space = storage.tryReserveSpace(total_size, future_part_.parts[0]->disk);
reserved_space = storage.tryReserveSpace(total_size, future_part_.parts[0]->volume);
else
{
IMergeTreeDataPart::TTLInfos ttl_infos;
@ -297,7 +297,7 @@ public:
for (auto & part_ptr : future_part_.parts)
{
ttl_infos.update(part_ptr->ttl_infos);
max_volume_index = std::max(max_volume_index, storage.getStoragePolicy()->getVolumeIndexByDisk(part_ptr->disk));
max_volume_index = std::max(max_volume_index, storage.getStoragePolicy()->getVolumeIndexByDisk(part_ptr->volume->getDisk()));
}
reserved_space = storage.tryReserveSpacePreferringTTLRules(total_size, ttl_infos, time(nullptr), max_volume_index);
@ -1250,7 +1250,7 @@ CheckResults StorageMergeTree::checkData(const ASTPtr & query, const Context & c
for (auto & part : data_parts)
{
auto disk = part->disk;
auto disk = part->volume->getDisk();
String part_path = part->getFullRelativePath();
/// If the checksums file is not present, calculate the checksums and write them to disk.
String checksums_path = part_path + "checksums.txt";

View File

@ -1050,7 +1050,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
for (auto & part_ptr : parts)
{
ttl_infos.update(part_ptr->ttl_infos);
max_volume_index = std::max(max_volume_index, getStoragePolicy()->getVolumeIndexByDisk(part_ptr->disk));
max_volume_index = std::max(max_volume_index, getStoragePolicy()->getVolumeIndexByDisk(part_ptr->volume->getDisk()));
}
ReservationPtr reserved_space = reserveSpacePreferringTTLRules(estimated_space_for_merge,
ttl_infos, time(nullptr), max_volume_index);
@ -1189,7 +1189,7 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
/// Once we mutate part, we must reserve space on the same disk, because mutations can possibly create hardlinks.
/// Can throw an exception.
ReservationPtr reserved_space = reserveSpace(estimated_space_for_result, source_part->disk);
ReservationPtr reserved_space = reserveSpace(estimated_space_for_result, source_part->volume);
auto table_lock = lockStructureForShare(
false, RWLockImpl::NO_QUERY, storage_settings_ptr->lock_acquire_timeout_for_background_operations);

View File

@ -111,7 +111,7 @@ void StorageSystemParts::processNextStorage(MutableColumns & columns_, const Sto
columns_[i++]->insert(info.database);
columns_[i++]->insert(info.table);
columns_[i++]->insert(info.engine);
columns_[i++]->insert(part->disk->getName());
columns_[i++]->insert(part->volume->getDisk()->getName());
columns_[i++]->insert(part->getFullPath());
if (has_state_column)

View File

@ -138,7 +138,7 @@ void StorageSystemPartsColumns::processNextStorage(MutableColumns & columns_, co
columns_[j++]->insert(info.database);
columns_[j++]->insert(info.table);
columns_[j++]->insert(info.engine);
columns_[j++]->insert(part->disk->getName());
columns_[j++]->insert(part->volume->getDisk()->getName());
columns_[j++]->insert(part->getFullPath());
columns_[j++]->insert(column.name);

View File

@ -55,8 +55,8 @@ Pipes StorageSystemStoragePolicies::read(
col_volume_name->insert(volumes[i]->getName());
col_priority->insert(i + 1);
Array disks;
disks.reserve(volumes[i]->disks.size());
for (const auto & disk_ptr : volumes[i]->disks)
disks.reserve(volumes[i]->getDisks().size());
for (const auto & disk_ptr : volumes[i]->getDisks())
disks.push_back(disk_ptr->getName());
col_disks->insert(disks);
col_max_part_size->insert(volumes[i]->max_data_part_size);