This commit is contained in:
Nikolai Kochetov 2022-04-07 11:58:38 +00:00
parent 5a1392a8e3
commit 5cbec37907
6 changed files with 107 additions and 18 deletions

View File

@ -2,52 +2,65 @@
#include <Storages/MergeTree/MergeTreeDataPartChecksum.h>
#include <Disks/IVolume.h>
#include <IO/WriteBufferFromFileBase.h>
#include <base/logger_useful.h>
#include <Disks/IStoragePolicy.h>
namespace DB
{
DataPartStorageOnDisk::DataPartStorageOnDisk(VolumePtr volume_, std::string root_path_, std::string relative_root_path_)
: volume(std::move(volume_)), root_path(std::move(root_path_)), relative_root_path(std::move(relative_root_path_))
namespace ErrorCodes
{
extern const int FILE_DOESNT_EXIST;
extern const int DIRECTORY_ALREADY_EXISTS;
}
DataPartStorageOnDisk::DataPartStorageOnDisk(VolumePtr volume_, std::string root_path_)
: volume(std::move(volume_)), root_path(std::move(root_path_))
{
}
std::string DataPartStorageOnDisk::getFullPath() const
{
return fs::path(volume->getDisk()->getPath()) / root_path;
}
std::unique_ptr<ReadBufferFromFileBase> DataPartStorageOnDisk::readFile(
const std::string & path,
const ReadSettings & settings,
std::optional<size_t> read_hint,
std::optional<size_t> file_size) const
{
return volume->getDisk()->readFile(fs::path(relative_root_path) / path, settings, read_hint, file_size);
return volume->getDisk()->readFile(fs::path(root_path) / path, settings, read_hint, file_size);
}
bool DataPartStorageOnDisk::exists(const std::string & path) const
{
return volume->getDisk()->exists(fs::path(relative_root_path) / path);
return volume->getDisk()->exists(fs::path(root_path) / path);
}
bool DataPartStorageOnDisk::exists() const
{
return volume->getDisk()->exists(relative_root_path);
return volume->getDisk()->exists(root_path);
}
size_t DataPartStorageOnDisk::getFileSize(const String & path) const
{
return volume->getDisk()->getFileSize(fs::path(relative_root_path) / path);
return volume->getDisk()->getFileSize(fs::path(root_path) / path);
}
DiskDirectoryIteratorPtr DataPartStorageOnDisk::iterate() const
{
return volume->getDisk()->iterateDirectory(relative_root_path);
return volume->getDisk()->iterateDirectory(root_path);
}
DiskDirectoryIteratorPtr DataPartStorageOnDisk::iterateDirectory(const String & path) const
{
return volume->getDisk()->iterateDirectory(fs::path(relative_root_path) / path);
return volume->getDisk()->iterateDirectory(fs::path(root_path) / path);
}
DataPartStoragePtr DataPartStorageOnDisk::getProjection(const std::string & name) const
{
return std::make_shared<DataPartStorageOnDisk>(volume, fs::path(relative_root_path) / name);
return std::make_shared<DataPartStorageOnDisk>(volume, fs::path(root_path) / name);
}
static UInt64 calculateTotalSizeOnDiskImpl(const DiskPtr & disk, const String & from)
@ -64,12 +77,12 @@ static UInt64 calculateTotalSizeOnDiskImpl(const DiskPtr & disk, const String &
UInt64 DataPartStorageOnDisk::calculateTotalSizeOnDisk() const
{
return calculateTotalSizeOnDiskImpl(volume->getDisk(), relative_root_path);
return calculateTotalSizeOnDiskImpl(volume->getDisk(), root_path);
}
void DataPartStorageOnDisk::writeChecksums(MergeTreeDataPartChecksums & checksums) const
{
std::string path = fs::path(relative_root_path) / "checksums.txt";
std::string path = fs::path(root_path) / "checksums.txt";
{
auto out = volume->getDisk()->writeFile(path + ".tmp", 4096);
@ -81,7 +94,7 @@ void DataPartStorageOnDisk::writeChecksums(MergeTreeDataPartChecksums & checksum
void DataPartStorageOnDisk::writeColumns(NamesAndTypesList & columns) const
{
std::string path = fs::path(relative_root_path) / "columns.txt";
std::string path = fs::path(root_path) / "columns.txt";
{
auto buf = volume->getDisk()->writeFile(path + ".tmp", 4096);
@ -91,6 +104,55 @@ void DataPartStorageOnDisk::writeColumns(NamesAndTypesList & columns) const
volume->getDisk()->moveFile(path + ".tmp", path);
}
void DataPartStorageOnDisk::rename(const String & new_relative_path, Poco::Logger * log, bool remove_new_dir_if_exists, bool fsync)
{
if (!volume->getDisk()->exists(root_path))
throw Exception("Part directory " + fullPath(volume->getDisk(), root_path) + " doesn't exist. Most likely it is a logical error.", ErrorCodes::FILE_DOESNT_EXIST);
/// Why?
String to = fs::path(new_relative_path) / "";
if (volume->getDisk()->exists(to))
{
if (remove_new_dir_if_exists)
{
Names files;
volume->getDisk()->listFiles(to, files);
LOG_WARNING(log, "Part directory {} already exists and contains {} files. Removing it.", fullPath(volume->getDisk(), to), files.size());
volume->getDisk()->removeRecursive(to);
}
else
{
throw Exception("Part directory " + fullPath(volume->getDisk(), to) + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
}
}
// metadata_manager->deleteAll(true);
// metadata_manager->assertAllDeleted(true);
/// Why?
volume->getDisk()->setLastModified(root_path, Poco::Timestamp::fromEpochTime(time(nullptr)));
volume->getDisk()->moveDirectory(root_path, to);
root_path = new_relative_path;
// metadata_manager->updateAll(true);
SyncGuardPtr sync_guard;
if (fsync)
sync_guard = volume->getDisk()->getDirectorySyncGuard(root_path);
}
bool DataPartStorageOnDisk::shallParticipateInMerges(const IStoragePolicy & storage_policy) const
{
/// `IMergeTreeDataPart::volume` describes space where current part belongs, and holds
/// `SingleDiskVolume` object which does not contain up-to-date settings of corresponding volume.
/// Therefore we shall obtain volume from storage policy.
auto volume_ptr = storage_policy.getVolume(storage_policy.getVolumeIndexByDisk(volume->getDisk()));
return !volume_ptr->areMergesAvoided();
}
std::string DataPartStorageOnDisk::getName() const
{
return volume->getDisk()->getName();

View File

@ -13,7 +13,7 @@ using VolumePtr = std::shared_ptr<IVolume>;
class DataPartStorageOnDisk final : public IDataPartStorage
{
public:
explicit DataPartStorageOnDisk(VolumePtr volume_, std::string root_path_, std::string relative_root_path_);
explicit DataPartStorageOnDisk(VolumePtr volume_, std::string root_path_);
std::unique_ptr<ReadBufferFromFileBase> readFile(
const std::string & path,
@ -29,14 +29,18 @@ public:
DiskDirectoryIteratorPtr iterate() const override;
DiskDirectoryIteratorPtr iterateDirectory(const std::string & path) const override;
std::string getFullPath() const override { return root_path; }
std::string getFullRelativePath() const override { return relative_root_path; }
std::string getFullPath() const override;
std::string getFullRelativePath() const override { return root_path; }
UInt64 calculateTotalSizeOnDisk() const override;
void writeChecksums(MergeTreeDataPartChecksums & checksums) const override;
void writeColumns(NamesAndTypesList & columns) const override;
bool shallParticipateInMerges(const IStoragePolicy &) const;
void rename(const String & new_relative_path, Poco::Logger * log, bool remove_new_dir_if_exists, bool fsync) override;
std::string getName() const override;
DataPartStoragePtr getProjection(const std::string & name) const override;
@ -44,7 +48,6 @@ public:
private:
VolumePtr volume;
std::string root_path;
std::string relative_root_path;
};
}

View File

@ -15,6 +15,8 @@ using DiskDirectoryIteratorPtr = std::unique_ptr<IDiskDirectoryIterator>;
struct MergeTreeDataPartChecksums;
class IStoragePolicy;
/// This is an abstraction of storage for data part files.
/// Generally, it contains read-only methods from IDisk.
class IDataPartStorage
@ -47,6 +49,11 @@ public:
virtual void writeChecksums(MergeTreeDataPartChecksums & checksums) const = 0;
virtual void writeColumns(NamesAndTypesList & columns) const = 0;
/// A leak of abstraction
virtual bool shallParticipateInMerges(const IStoragePolicy &) const { return true; }
virtual void rename(const String & new_relative_path, Poco::Logger * log, bool remove_new_dir_if_exists, bool fsync);
/// Disk name
virtual std::string getName() const = 0;

View File

@ -1248,8 +1248,15 @@ try
{
assertOnDisk();
String from = getFullRelativePath();
String to = fs::path(storage.relative_data_path) / (parent_part ? parent_part->relative_path : "") / new_relative_path / "";
if (parent_part)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Move is not supported for projection parts: moving form {} to {}",
data_part_storage->getFullPath(), new_relative_path);
String to = fs::path(storage.relative_data_path) / new_relative_path / "";
data_part_storage->move(to);
if (!volume->getDisk()->exists(from))
throw Exception("Part directory " + fullPath(volume->getDisk(), from) + " doesn't exist. Most likely it is a logical error.", ErrorCodes::FILE_DOESNT_EXIST);
@ -1278,6 +1285,8 @@ try
relative_path = new_relative_path;
metadata_manager->updateAll(true);
metadata_manager->move(from, to);
SyncGuardPtr sync_guard;
if (storage.getSettings()->fsync_part_directory)
sync_guard = volume->getDisk()->getDirectorySyncGuard(to);

View File

@ -47,6 +47,8 @@ public:
/// If include_projection is true, also update metadatas in projection parts.
virtual void updateAll(bool include_projection) = 0;
virtual void move(const String & from, const String & to) = 0;
/// Check all metadatas in part.
virtual std::unordered_map<String, uint128> check() const = 0;

View File

@ -34,6 +34,8 @@ public:
/// Need to be called after part directory is renamed.
void updateAll(bool include_projection) override;
void move(const String & from, const String & to) override;
/// Check if all metadatas in part from RocksDB cache are up to date.
std::unordered_map<String, uint128> check() const override;
@ -49,6 +51,10 @@ private:
void getKeysAndCheckSums(Strings & keys, std::vector<uint128> & checksums) const;
void deleteAllImpl(const String & path, bool include_projection);
void assertAllDeletedImpl(const String & path, bool include_projection) const;
void updateAllImpl(const String & path, bool include_projection);
MergeTreeMetadataCachePtr cache;
};