Addressed code review comments

This commit is contained in:
Alexander Burmak 2019-12-03 16:37:40 +03:00
parent 8104395dd2
commit edd11abad1
8 changed files with 163 additions and 256 deletions

View File

@ -29,8 +29,7 @@ std::filesystem::path getMountPoint(std::filesystem::path absolute_path);
#if !defined(__linux__)
[[noreturn]]
#endif
String
getFilesystemName([[maybe_unused]] const String & mount_point);
String getFilesystemName([[maybe_unused]] const String & mount_point);
inline struct statvfs getStatVFS(const String & path)
{

View File

@ -10,14 +10,14 @@ namespace DB
{
std::mutex DiskLocal::mutex;
ReservationPtr DiskLocal::reserve(UInt64 bytes) const
ReservationPtr DiskLocal::reserve(UInt64 bytes)
{
if (!tryReserve(bytes))
return {};
return std::make_unique<DiskLocalReservation>(std::static_pointer_cast<const DiskLocal>(shared_from_this()), bytes);
return std::make_unique<DiskLocalReservation>(std::static_pointer_cast<DiskLocal>(shared_from_this()), bytes);
}
bool DiskLocal::tryReserve(UInt64 bytes) const
bool DiskLocal::tryReserve(UInt64 bytes)
{
std::lock_guard lock(mutex);
if (bytes == 0)
@ -44,7 +44,7 @@ bool DiskLocal::tryReserve(UInt64 bytes) const
UInt64 DiskLocal::getTotalSpace() const
{
auto fs = getStatVFS(path);
auto fs = getStatVFS(disk_path);
UInt64 total_size = fs.f_blocks * fs.f_bsize;
if (total_size < keep_free_space_bytes)
return 0;
@ -55,7 +55,7 @@ UInt64 DiskLocal::getAvailableSpace() const
{
/// we use f_bavail, because part of b_free space is
/// available for superuser only and for system purposes
auto fs = getStatVFS(path);
auto fs = getStatVFS(disk_path);
UInt64 total_size = fs.f_bavail * fs.f_bsize;
if (total_size < keep_free_space_bytes)
return 0;
@ -70,93 +70,63 @@ UInt64 DiskLocal::getUnreservedSpace() const
return available_space;
}
DiskFilePtr DiskLocal::file(const String & path_) const
bool DiskLocal::exists(const String & path) const
{
return std::make_shared<DiskLocalFile>(std::static_pointer_cast<const DiskLocal>(shared_from_this()), path_);
return Poco::File(disk_path + path).exists();
}
DiskLocalFile::DiskLocalFile(const DiskPtr & disk_ptr_, const String & rel_path_)
: IDiskFile(disk_ptr_, rel_path_), file(disk_ptr->getPath() + rel_path)
bool DiskLocal::isFile(const String & path) const
{
return Poco::File(disk_path + path).isFile();
}
bool DiskLocalFile::exists() const
bool DiskLocal::isDirectory(const String & path) const
{
return file.exists();
return Poco::File(disk_path + path).isDirectory();
}
bool DiskLocalFile::isDirectory() const
void DiskLocal::createDirectory(const String & path)
{
return file.isDirectory();
Poco::File(disk_path + path).createDirectory();
}
void DiskLocalFile::createDirectory()
void DiskLocal::createDirectories(const String & path)
{
file.createDirectory();
Poco::File(disk_path + path).createDirectories();
}
void DiskLocalFile::createDirectories()
DiskDirectoryIteratorPtr DiskLocal::iterateDirectory(const String & path)
{
file.createDirectories();
return std::make_unique<DiskLocalDirectoryIterator>(disk_path + path);
}
void DiskLocalFile::moveTo(const String & new_path)
void DiskLocal::moveFile(const String & from_path, const String & to_path)
{
file.renameTo(disk_ptr->getPath() + new_path);
Poco::File(disk_path + from_path).renameTo(disk_path + to_path);
}
void DiskLocalFile::copyTo(const String & new_path)
void DiskLocal::copyFile(const String & from_path, const String & to_path)
{
file.copyTo(disk_ptr->getPath() + new_path);
Poco::File(disk_path + from_path).copyTo(disk_path + to_path);
}
std::unique_ptr<ReadBuffer> DiskLocalFile::read() const
std::unique_ptr<ReadBuffer> DiskLocal::readFile(const String & path) const
{
return std::make_unique<ReadBufferFromFile>(file.path());
return std::make_unique<ReadBufferFromFile>(disk_path + path);
}
std::unique_ptr<WriteBuffer> DiskLocalFile::write()
std::unique_ptr<WriteBuffer> DiskLocal::writeFile(const String & path)
{
return std::make_unique<WriteBufferFromFile>(file.path());
}
DiskDirectoryIteratorImplPtr DiskLocalFile::iterateDirectory()
{
return std::make_unique<DiskLocalDirectoryIterator>(shared_from_this());
}
DiskLocalDirectoryIterator::DiskLocalDirectoryIterator(const DiskFilePtr & parent_) : parent(parent_), iter(parent_->fullPath())
{
updateCurrentFile();
}
void DiskLocalDirectoryIterator::next()
{
++iter;
updateCurrentFile();
}
void DiskLocalDirectoryIterator::updateCurrentFile()
{
current_file.reset();
if (iter != Poco::DirectoryIterator())
{
String path = parent->path() + iter.name();
current_file = std::make_shared<DiskLocalFile>(parent->disk(), path);
}
return std::make_unique<WriteBufferFromFile>(disk_path + path);
}
void DiskLocalReservation::update(UInt64 new_size)
{
std::lock_guard lock(DiskLocal::mutex);
auto disk_local = std::static_pointer_cast<const DiskLocal>(disk_ptr);
disk_local->reserved_bytes -= size;
disk->reserved_bytes -= size;
size = new_size;
disk_local->reserved_bytes += size;
disk->reserved_bytes += size;
}
DiskLocalReservation::~DiskLocalReservation()
@ -164,21 +134,20 @@ DiskLocalReservation::~DiskLocalReservation()
try
{
std::lock_guard lock(DiskLocal::mutex);
auto disk_local = std::static_pointer_cast<const DiskLocal>(disk_ptr);
if (disk_local->reserved_bytes < size)
if (disk->reserved_bytes < size)
{
disk_local->reserved_bytes = 0;
LOG_ERROR(&Logger::get("DiskLocal"), "Unbalanced reservations size for disk '" + disk_ptr->getName() + "'.");
disk->reserved_bytes = 0;
LOG_ERROR(&Logger::get("DiskLocal"), "Unbalanced reservations size for disk '" + disk->getName() + "'.");
}
else
{
disk_local->reserved_bytes -= size;
disk->reserved_bytes -= size;
}
if (disk_local->reservation_count == 0)
LOG_ERROR(&Logger::get("DiskLocal"), "Unbalanced reservation count for disk '" + disk_ptr->getName() + "'.");
if (disk->reservation_count == 0)
LOG_ERROR(&Logger::get("DiskLocal"), "Unbalanced reservation count for disk '" + disk->getName() + "'.");
else
--disk_local->reservation_count;
--disk->reservation_count;
}
catch (...)
{
@ -232,7 +201,7 @@ void registerDiskLocal(DiskFactory & factory)
keep_free_space_bytes = static_cast<UInt64>(DiskLocal("tmp", tmp_path, 0).getTotalSpace() * ratio);
}
return std::make_shared<const DiskLocal>(name, path, keep_free_space_bytes);
return std::make_shared<DiskLocal>(name, path, keep_free_space_bytes);
};
factory.registerDiskType("local", creator);
}

View File

@ -23,17 +23,17 @@ public:
friend class DiskLocalReservation;
DiskLocal(const String & name_, const String & path_, UInt64 keep_free_space_bytes_)
: name(name_), path(path_), keep_free_space_bytes(keep_free_space_bytes_)
: name(name_), disk_path(path_), keep_free_space_bytes(keep_free_space_bytes_)
{
if (path.back() != '/')
throw Exception("Disk path must ends with '/', but '" + path + "' doesn't.", ErrorCodes::LOGICAL_ERROR);
if (disk_path.back() != '/')
throw Exception("Disk disk_path must ends with '/', but '" + disk_path + "' doesn't.", ErrorCodes::LOGICAL_ERROR);
}
const String & getName() const override { return name; }
const String & getPath() const override { return path; }
const String & getPath() const override { return disk_path; }
ReservationPtr reserve(UInt64 bytes) const override;
ReservationPtr reserve(UInt64 bytes) override;
UInt64 getTotalSpace() const override;
@ -43,73 +43,58 @@ public:
UInt64 getKeepingFreeSpace() const override { return keep_free_space_bytes; }
DiskFilePtr file(const String & path) const override;
bool exists(const String & path) const override;
bool isFile(const String & path) const override;
bool isDirectory(const String & path) const override;
void createDirectory(const String & path) override;
void createDirectories(const String & path) override;
DiskDirectoryIteratorPtr iterateDirectory(const String & path) override;
void moveFile(const String & from_path, const String & to_path) override;
void copyFile(const String & from_path, const String & to_path) override;
std::unique_ptr<ReadBuffer> readFile(const String & path) const override;
std::unique_ptr<WriteBuffer> writeFile(const String & path) override;
bool supportsAtomicMove() const override { return true; }
private:
bool tryReserve(UInt64 bytes) const;
bool tryReserve(UInt64 bytes);
private:
const String name;
const String path;
const String disk_path;
const UInt64 keep_free_space_bytes;
/// Used for reservation counters modification
static std::mutex mutex;
mutable UInt64 reserved_bytes = 0;
mutable UInt64 reservation_count = 0;
UInt64 reserved_bytes = 0;
UInt64 reservation_count = 0;
};
using DiskLocalPtr = std::shared_ptr<const DiskLocal>;
using DiskLocalPtr = std::shared_ptr<DiskLocal>;
class DiskLocalFile : public IDiskFile
class DiskLocalDirectoryIterator : public IDiskDirectoryIterator
{
public:
DiskLocalFile(const DiskPtr & disk_ptr_, const String & rel_path_);
explicit DiskLocalDirectoryIterator(const String & path) : iter(path) {}
bool exists() const override;
void next() override { ++iter; }
bool isDirectory() const override;
bool isValid() const override { return iter != Poco::DirectoryIterator(); }
void createDirectory() override;
void createDirectories() override;
void moveTo(const String & new_path) override;
void copyTo(const String & new_path) override;
std::unique_ptr<ReadBuffer> read() const override;
std::unique_ptr<WriteBuffer> write() override;
String name() const override { return iter.name(); }
private:
DiskDirectoryIteratorImplPtr iterateDirectory() override;
private:
Poco::File file;
};
class DiskLocalDirectoryIterator : public IDiskDirectoryIteratorImpl
{
public:
explicit DiskLocalDirectoryIterator(const DiskFilePtr & parent_);
const String & name() const override { return iter.name(); }
const DiskFilePtr & get() const override { return current_file; }
void next() override;
bool isValid() const override { return bool(current_file); }
private:
void updateCurrentFile();
private:
DiskFilePtr parent;
Poco::DirectoryIterator iter;
DiskFilePtr current_file;
};
/**
@ -119,11 +104,23 @@ private:
class DiskLocalReservation : public IReservation
{
public:
DiskLocalReservation(const DiskLocalPtr & disk_, UInt64 size_) : IReservation(disk_, size_) {}
DiskLocalReservation(const DiskLocalPtr & disk_, UInt64 size_)
: disk(disk_), size(size_), metric_increment(CurrentMetrics::DiskSpaceReservedForMerge, size_)
{
}
UInt64 getSize() const override { return size; }
DiskPtr getDisk() const override { return disk; }
void update(UInt64 new_size) override;
~DiskLocalReservation() override;
private:
DiskLocalPtr disk;
UInt64 size;
CurrentMetrics::Increment metric_increment;
};
}

View File

@ -34,7 +34,7 @@ DiskSelector::DiskSelector(const Poco::Util::AbstractConfiguration & config, con
disks.emplace(disk_name, factory.create(disk_name, config, disk_config_prefix, context));
}
if (!has_default_disk)
disks.emplace(default_disk_name, std::make_shared<const DiskLocal>(default_disk_name, context.getPath(), 0));
disks.emplace(default_disk_name, std::make_shared<DiskLocal>(default_disk_name, context.getPath(), 0));
}
@ -112,7 +112,7 @@ Volume::Volume(
}
ReservationPtr Volume::reserve(UInt64 expected_size) const
ReservationPtr Volume::reserve(UInt64 expected_size)
{
/// This volume can not store files which size greater than max_data_part_size

View File

@ -69,7 +69,7 @@ public:
/// Uses Round-robin to choose disk for reservation.
/// Returns valid reservation or nullptr if there is no space left on any disk.
ReservationPtr reserve(UInt64 bytes) const override;
ReservationPtr reserve(UInt64 bytes) override;
/// Return biggest unreserved space across all disks
UInt64 getMaxUnreservedFreeSpace() const;
@ -88,7 +88,7 @@ private:
const String name;
};
using VolumePtr = std::shared_ptr<const Volume>;
using VolumePtr = std::shared_ptr<Volume>;
using Volumes = std::vector<VolumePtr>;
@ -96,7 +96,7 @@ using Volumes = std::vector<VolumePtr>;
* Contains all information about volumes configuration for Storage.
* Can determine appropriate Volume and Disk for each reservation.
*/
class StoragePolicy : public Space
class StoragePolicy
{
public:
StoragePolicy(String name_, const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const DiskSelector & disks);
@ -116,10 +116,10 @@ public:
/// Get free space from most free disk
UInt64 getMaxUnreservedFreeSpace() const;
const String & getName() const override { return name; }
const String & getName() const { return name; }
/// Returns valid reservation or null
ReservationPtr reserve(UInt64 bytes) const override;
ReservationPtr reserve(UInt64 bytes) const;
/// Reserve space on any volume with index > min_volume_index
ReservationPtr reserve(UInt64 bytes, size_t min_volume_index) const;

View File

@ -16,12 +16,8 @@ extern const Metric DiskSpaceReservedForMerge;
namespace DB
{
class IDiskFile;
using DiskFilePtr = std::shared_ptr<IDiskFile>;
class DiskDirectoryIterator;
class IDiskDirectoryIteratorImpl;
using DiskDirectoryIteratorImplPtr = std::unique_ptr<IDiskDirectoryIteratorImpl>;
class IDiskDirectoryIterator;
using DiskDirectoryIteratorPtr = std::unique_ptr<IDiskDirectoryIterator>;
class IReservation;
using ReservationPtr = std::unique_ptr<IReservation>;
@ -35,14 +31,16 @@ class WriteBuffer;
class Space : public std::enable_shared_from_this<Space>
{
public:
/// Return the name of the space object.
virtual const String & getName() const = 0;
virtual ReservationPtr reserve(UInt64 bytes) const = 0;
/// Reserve the specified number of bytes.
virtual ReservationPtr reserve(UInt64 bytes) = 0;
virtual ~Space() = default;
};
using SpacePtr = std::shared_ptr<const Space>;
using SpacePtr = std::shared_ptr<Space>;
/**
* A unit of storage persisting data and metadata.
@ -54,126 +52,77 @@ using SpacePtr = std::shared_ptr<const Space>;
class IDisk : public Space
{
public:
/// Root path for all files stored on the disk.
/// It's not required to be a local filesystem path.
virtual const String & getPath() const = 0;
/// Total available space on disk.
/// Total available space on the disk.
virtual UInt64 getTotalSpace() const = 0;
/// Space currently available on disk.
/// Space currently available on the disk.
virtual UInt64 getAvailableSpace() const = 0;
/// Currently available (prev method) minus already reserved space.
/// Space available for reservation (available space minus reserved space).
virtual UInt64 getUnreservedSpace() const = 0;
/// Amount of bytes which should be kept free on this disk.
/// Amount of bytes which should be kept free on the disk.
virtual UInt64 getKeepingFreeSpace() const { return 0; }
virtual DiskFilePtr file(const String & path) const = 0;
/// Return `true` if the specified file exists.
virtual bool exists(const String & path) const = 0;
/// Return `true` if the specified file exists and it's a regular file (not a directory or special file type).
virtual bool isFile(const String & path) const = 0;
/// Return `true` if the specified file exists and it's a directory.
virtual bool isDirectory(const String & path) const = 0;
/// Create directory.
virtual void createDirectory(const String & path) = 0;
/// Create directory and all parent directories if necessary.
virtual void createDirectories(const String & path) = 0;
/// Return iterator to the contents of the specified directory.
virtual DiskDirectoryIteratorPtr iterateDirectory(const String & path) = 0;
/// Move the file from `from_path` to `to_path`.
virtual void moveFile(const String & from_path, const String & to_path) = 0;
/// Copy the file from `from_path` to `to_path`.
virtual void copyFile(const String & from_path, const String & to_path) = 0;
/// Open the file for read and return ReadBuffer object.
virtual std::unique_ptr<ReadBuffer> readFile(const String & path) const = 0;
/// Open the file for write and return WriteBuffer object.
virtual std::unique_ptr<WriteBuffer> writeFile(const String & path) = 0;
/// Return `true` if underlying storage supports atomic move of files (rename).
virtual bool supportsAtomicMove() const { return false; }
};
using DiskPtr = std::shared_ptr<const IDisk>;
using DiskPtr = std::shared_ptr<IDisk>;
using Disks = std::vector<DiskPtr>;
class IDiskFile : public std::enable_shared_from_this<IDiskFile>
/**
* Iterator of directory contents on particular disk.
*/
class IDiskDirectoryIterator
{
public:
friend class DiskDirectoryIterator;
/// Return disk which the file belongs to.
const DiskPtr & disk() const { return disk_ptr; }
const String & path() const { return rel_path; }
String fullPath() const { return disk_ptr->getPath() + rel_path; }
/// Returns true if the file exists.
virtual bool exists() const = 0;
/// Returns true if the file is a directory.
virtual bool isDirectory() const = 0;
/// Creates a directory.
virtual void createDirectory() = 0;
/// Creates a directory and all parent directories if necessary.
virtual void createDirectories() = 0;
virtual void moveTo(const String & new_path) = 0;
virtual void copyTo(const String & new_path) = 0;
/// Open the file for read and returns ReadBuffer object.
virtual std::unique_ptr<ReadBuffer> read() const = 0;
/// Open the file for write and returns WriteBuffer object.
virtual std::unique_ptr<WriteBuffer> write() = 0;
virtual ~IDiskFile() = default;
protected:
IDiskFile(const DiskPtr & disk_ptr_, const String & rel_path_) : disk_ptr(disk_ptr_), rel_path(rel_path_) {}
private:
virtual DiskDirectoryIteratorImplPtr iterateDirectory() = 0;
protected:
DiskPtr disk_ptr;
String rel_path;
};
class IDiskDirectoryIteratorImpl
{
public:
virtual const String & name() const = 0;
virtual const DiskFilePtr & get() const = 0;
/// Iterate to the next file.
virtual void next() = 0;
/// Return `true` if the iterator points to a valid element.
virtual bool isValid() const = 0;
virtual ~IDiskDirectoryIteratorImpl() = default;
/// Name of the file that the iterator currently points to.
virtual String name() const = 0;
virtual ~IDiskDirectoryIterator() = default;
};
class DiskDirectoryIterator final
{
public:
DiskDirectoryIterator() = default;
explicit DiskDirectoryIterator(const DiskFilePtr & file) : impl(file->iterateDirectory()) {}
String name() const { return impl->name(); }
DiskDirectoryIterator & operator++()
{
impl->next();
return *this;
}
const DiskFilePtr & operator*() const { return impl->get(); }
const DiskFilePtr & operator->() const { return impl->get(); }
bool operator==(const DiskDirectoryIterator & iterator) const
{
if (this == &iterator)
return true;
if (iterator.impl && iterator.impl->isValid())
return false;
if (impl && impl->isValid())
return false;
return true;
}
bool operator!=(const DiskDirectoryIterator & iterator) const { return !operator==(iterator); }
private:
DiskDirectoryIteratorImplPtr impl;
};
/**
* Information about reserved size on particular disk.
*/
@ -181,27 +130,22 @@ class IReservation
{
public:
/// Get reservation size.
UInt64 getSize() const { return size; }
virtual UInt64 getSize() const = 0;
/// Get disk where reservation take place.
const DiskPtr & getDisk() const { return disk_ptr; }
virtual DiskPtr getDisk() const = 0;
/// Changes amount of reserved space.
virtual void update(UInt64 new_size) = 0;
/// Unreserves reserved space.
virtual ~IReservation() = default;
protected:
explicit IReservation(const DiskPtr & disk_ptr_, UInt64 size_)
: disk_ptr(disk_ptr_), size(size_), metric_increment(CurrentMetrics::DiskSpaceReservedForMerge, size_)
{
}
protected:
DiskPtr disk_ptr;
UInt64 size;
CurrentMetrics::Increment metric_increment;
};
/// Return full path to a file on disk.
inline String fullPath(const DiskPtr & disk, const String & path)
{
return disk->getPath() + path;
}
}

View File

@ -1173,17 +1173,15 @@ void MergeTreeData::rename(
for (const auto & disk : disks)
{
auto new_table_file = disk->file(new_table_path);
if (new_table_file->exists())
throw Exception{"Target path already exists: " + new_table_file->fullPath(), ErrorCodes::DIRECTORY_ALREADY_EXISTS};
if (disk->exists(new_table_path))
throw Exception{"Target path already exists: " + fullPath(disk, new_table_path), ErrorCodes::DIRECTORY_ALREADY_EXISTS};
}
for (const auto & disk : disks)
{
disk->file(new_db_path)->createDirectory();
disk->createDirectory(new_db_path);
disk->file(old_table_path)->moveTo(new_table_path);
disk->moveFile(old_table_path, new_table_path);
}
global_context.dropCaches();
@ -2733,7 +2731,7 @@ void MergeTreeData::movePartitionToDisk(const ASTPtr & partition, const String &
throw Exception(no_parts_to_move_message, ErrorCodes::UNKNOWN_DISK);
}
if (!movePartsToSpace(parts, std::static_pointer_cast<const Space>(disk)))
if (!movePartsToSpace(parts, std::static_pointer_cast<Space>(disk)))
throw Exception("Cannot move parts because moves are manually disabled.", ErrorCodes::ABORTED);
}
@ -2785,7 +2783,7 @@ void MergeTreeData::movePartitionToVolume(const ASTPtr & partition, const String
throw Exception(no_parts_to_move_message, ErrorCodes::UNKNOWN_DISK);
}
if (!movePartsToSpace(parts, std::static_pointer_cast<const Space>(volume)))
if (!movePartsToSpace(parts, std::static_pointer_cast<Space>(volume)))
throw Exception("Cannot move parts because moves are manually disabled.", ErrorCodes::ABORTED);
}
@ -3520,7 +3518,7 @@ MergeTreeData::CurrentlyMovingPartsTagger MergeTreeData::checkPartsForMove(const
if (!reservation)
throw Exception("Move is not possible. Not enough space on '" + space->getName() + "'", ErrorCodes::NOT_ENOUGH_SPACE);
auto & reserved_disk = reservation->getDisk();
auto reserved_disk = reservation->getDisk();
String path_to_clone = getFullPathOnDisk(reserved_disk);
if (Poco::File(path_to_clone + part->name).exists())

View File

@ -547,7 +547,7 @@ void MergeTreeDataPart::makeCloneInDetached(const String & prefix) const
void MergeTreeDataPart::makeCloneOnDiskDetached(const ReservationPtr & reservation) const
{
auto & reserved_disk = reservation->getDisk();
auto reserved_disk = reservation->getDisk();
if (reserved_disk->getName() == disk->getName())
throw Exception("Can not clone data part " + name + " to same disk " + disk->getName(), ErrorCodes::LOGICAL_ERROR);