diff --git a/src/Storages/MergeTree/DataPartStorageOnDisk.cpp b/src/Storages/MergeTree/DataPartStorageOnDisk.cpp index 073e8c2c6e0..0ec9acaa0c6 100644 --- a/src/Storages/MergeTree/DataPartStorageOnDisk.cpp +++ b/src/Storages/MergeTree/DataPartStorageOnDisk.cpp @@ -2,52 +2,65 @@ #include #include #include +#include +#include namespace DB { -DataPartStorageOnDisk::DataPartStorageOnDisk(VolumePtr volume_, std::string root_path_, std::string relative_root_path_) - : volume(std::move(volume_)), root_path(std::move(root_path_)), relative_root_path(std::move(relative_root_path_)) +namespace ErrorCodes +{ + extern const int FILE_DOESNT_EXIST; + extern const int DIRECTORY_ALREADY_EXISTS; +} + +DataPartStorageOnDisk::DataPartStorageOnDisk(VolumePtr volume_, std::string root_path_) + : volume(std::move(volume_)), root_path(std::move(root_path_)) { } +std::string DataPartStorageOnDisk::getFullPath() const +{ + return fs::path(volume->getDisk()->getPath()) / root_path; +} + std::unique_ptr DataPartStorageOnDisk::readFile( const std::string & path, const ReadSettings & settings, std::optional read_hint, std::optional file_size) const { - return volume->getDisk()->readFile(fs::path(relative_root_path) / path, settings, read_hint, file_size); + return volume->getDisk()->readFile(fs::path(root_path) / path, settings, read_hint, file_size); } bool DataPartStorageOnDisk::exists(const std::string & path) const { - return volume->getDisk()->exists(fs::path(relative_root_path) / path); + return volume->getDisk()->exists(fs::path(root_path) / path); } bool DataPartStorageOnDisk::exists() const { - return volume->getDisk()->exists(relative_root_path); + return volume->getDisk()->exists(root_path); } size_t DataPartStorageOnDisk::getFileSize(const String & path) const { - return volume->getDisk()->getFileSize(fs::path(relative_root_path) / path); + return volume->getDisk()->getFileSize(fs::path(root_path) / path); } DiskDirectoryIteratorPtr DataPartStorageOnDisk::iterate() const { - return volume->getDisk()->iterateDirectory(relative_root_path); + return volume->getDisk()->iterateDirectory(root_path); } DiskDirectoryIteratorPtr DataPartStorageOnDisk::iterateDirectory(const String & path) const { - return volume->getDisk()->iterateDirectory(fs::path(relative_root_path) / path); + return volume->getDisk()->iterateDirectory(fs::path(root_path) / path); } DataPartStoragePtr DataPartStorageOnDisk::getProjection(const std::string & name) const { - return std::make_shared(volume, fs::path(relative_root_path) / name); + return std::make_shared(volume, fs::path(root_path) / name); } static UInt64 calculateTotalSizeOnDiskImpl(const DiskPtr & disk, const String & from) @@ -64,12 +77,12 @@ static UInt64 calculateTotalSizeOnDiskImpl(const DiskPtr & disk, const String & UInt64 DataPartStorageOnDisk::calculateTotalSizeOnDisk() const { - return calculateTotalSizeOnDiskImpl(volume->getDisk(), relative_root_path); + return calculateTotalSizeOnDiskImpl(volume->getDisk(), root_path); } void DataPartStorageOnDisk::writeChecksums(MergeTreeDataPartChecksums & checksums) const { - std::string path = fs::path(relative_root_path) / "checksums.txt"; + std::string path = fs::path(root_path) / "checksums.txt"; { auto out = volume->getDisk()->writeFile(path + ".tmp", 4096); @@ -81,7 +94,7 @@ void DataPartStorageOnDisk::writeChecksums(MergeTreeDataPartChecksums & checksum void DataPartStorageOnDisk::writeColumns(NamesAndTypesList & columns) const { - std::string path = fs::path(relative_root_path) / "columns.txt"; + std::string path = fs::path(root_path) / "columns.txt"; { auto buf = volume->getDisk()->writeFile(path + ".tmp", 4096); @@ -91,6 +104,55 @@ void DataPartStorageOnDisk::writeColumns(NamesAndTypesList & columns) const volume->getDisk()->moveFile(path + ".tmp", path); } +void DataPartStorageOnDisk::rename(const String & new_relative_path, Poco::Logger * log, bool remove_new_dir_if_exists, bool fsync) +{ + if (!volume->getDisk()->exists(root_path)) + throw Exception("Part directory " + fullPath(volume->getDisk(), root_path) + " doesn't exist. Most likely it is a logical error.", ErrorCodes::FILE_DOESNT_EXIST); + + /// Why? + String to = fs::path(new_relative_path) / ""; + + if (volume->getDisk()->exists(to)) + { + if (remove_new_dir_if_exists) + { + Names files; + volume->getDisk()->listFiles(to, files); + + LOG_WARNING(log, "Part directory {} already exists and contains {} files. Removing it.", fullPath(volume->getDisk(), to), files.size()); + + volume->getDisk()->removeRecursive(to); + } + else + { + throw Exception("Part directory " + fullPath(volume->getDisk(), to) + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS); + } + } + + // metadata_manager->deleteAll(true); + // metadata_manager->assertAllDeleted(true); + + /// Why? + volume->getDisk()->setLastModified(root_path, Poco::Timestamp::fromEpochTime(time(nullptr))); + volume->getDisk()->moveDirectory(root_path, to); + root_path = new_relative_path; + // metadata_manager->updateAll(true); + + SyncGuardPtr sync_guard; + if (fsync) + sync_guard = volume->getDisk()->getDirectorySyncGuard(root_path); +} + +bool DataPartStorageOnDisk::shallParticipateInMerges(const IStoragePolicy & storage_policy) const +{ + /// `IMergeTreeDataPart::volume` describes space where current part belongs, and holds + /// `SingleDiskVolume` object which does not contain up-to-date settings of corresponding volume. + /// Therefore we shall obtain volume from storage policy. + auto volume_ptr = storage_policy.getVolume(storage_policy.getVolumeIndexByDisk(volume->getDisk())); + + return !volume_ptr->areMergesAvoided(); +} + std::string DataPartStorageOnDisk::getName() const { return volume->getDisk()->getName(); diff --git a/src/Storages/MergeTree/DataPartStorageOnDisk.h b/src/Storages/MergeTree/DataPartStorageOnDisk.h index 474833500c0..69858440fe1 100644 --- a/src/Storages/MergeTree/DataPartStorageOnDisk.h +++ b/src/Storages/MergeTree/DataPartStorageOnDisk.h @@ -13,7 +13,7 @@ using VolumePtr = std::shared_ptr; class DataPartStorageOnDisk final : public IDataPartStorage { public: - explicit DataPartStorageOnDisk(VolumePtr volume_, std::string root_path_, std::string relative_root_path_); + explicit DataPartStorageOnDisk(VolumePtr volume_, std::string root_path_); std::unique_ptr readFile( const std::string & path, @@ -29,14 +29,18 @@ public: DiskDirectoryIteratorPtr iterate() const override; DiskDirectoryIteratorPtr iterateDirectory(const std::string & path) const override; - std::string getFullPath() const override { return root_path; } - std::string getFullRelativePath() const override { return relative_root_path; } + std::string getFullPath() const override; + std::string getFullRelativePath() const override { return root_path; } UInt64 calculateTotalSizeOnDisk() const override; void writeChecksums(MergeTreeDataPartChecksums & checksums) const override; void writeColumns(NamesAndTypesList & columns) const override; + bool shallParticipateInMerges(const IStoragePolicy &) const; + + void rename(const String & new_relative_path, Poco::Logger * log, bool remove_new_dir_if_exists, bool fsync) override; + std::string getName() const override; DataPartStoragePtr getProjection(const std::string & name) const override; @@ -44,7 +48,6 @@ public: private: VolumePtr volume; std::string root_path; - std::string relative_root_path; }; } diff --git a/src/Storages/MergeTree/IDataPartStorage.h b/src/Storages/MergeTree/IDataPartStorage.h index 1bf5c890ee8..5af628971bf 100644 --- a/src/Storages/MergeTree/IDataPartStorage.h +++ b/src/Storages/MergeTree/IDataPartStorage.h @@ -15,6 +15,8 @@ using DiskDirectoryIteratorPtr = std::unique_ptr; struct MergeTreeDataPartChecksums; +class IStoragePolicy; + /// This is an abstraction of storage for data part files. /// Generally, it contains read-only methods from IDisk. class IDataPartStorage @@ -47,6 +49,11 @@ public: virtual void writeChecksums(MergeTreeDataPartChecksums & checksums) const = 0; virtual void writeColumns(NamesAndTypesList & columns) const = 0; + /// A leak of abstraction + virtual bool shallParticipateInMerges(const IStoragePolicy &) const { return true; } + + virtual void rename(const String & new_relative_path, Poco::Logger * log, bool remove_new_dir_if_exists, bool fsync); + /// Disk name virtual std::string getName() const = 0; diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index 5c944d284d6..fc66f547565 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -1248,8 +1248,15 @@ try { assertOnDisk(); - String from = getFullRelativePath(); - String to = fs::path(storage.relative_data_path) / (parent_part ? parent_part->relative_path : "") / new_relative_path / ""; + if (parent_part) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Move is not supported for projection parts: moving form {} to {}", + data_part_storage->getFullPath(), new_relative_path); + + String to = fs::path(storage.relative_data_path) / new_relative_path / ""; + + data_part_storage->move(to); if (!volume->getDisk()->exists(from)) throw Exception("Part directory " + fullPath(volume->getDisk(), from) + " doesn't exist. Most likely it is a logical error.", ErrorCodes::FILE_DOESNT_EXIST); @@ -1278,6 +1285,8 @@ try relative_path = new_relative_path; metadata_manager->updateAll(true); + metadata_manager->move(from, to); + SyncGuardPtr sync_guard; if (storage.getSettings()->fsync_part_directory) sync_guard = volume->getDisk()->getDirectorySyncGuard(to); diff --git a/src/Storages/MergeTree/IPartMetadataManager.h b/src/Storages/MergeTree/IPartMetadataManager.h index 876000de412..68bdada3464 100644 --- a/src/Storages/MergeTree/IPartMetadataManager.h +++ b/src/Storages/MergeTree/IPartMetadataManager.h @@ -47,6 +47,8 @@ public: /// If include_projection is true, also update metadatas in projection parts. virtual void updateAll(bool include_projection) = 0; + virtual void move(const String & from, const String & to) = 0; + /// Check all metadatas in part. virtual std::unordered_map check() const = 0; diff --git a/src/Storages/MergeTree/PartMetadataManagerWithCache.h b/src/Storages/MergeTree/PartMetadataManagerWithCache.h index 06e7a85ba2b..856ea991611 100644 --- a/src/Storages/MergeTree/PartMetadataManagerWithCache.h +++ b/src/Storages/MergeTree/PartMetadataManagerWithCache.h @@ -34,6 +34,8 @@ public: /// Need to be called after part directory is renamed. void updateAll(bool include_projection) override; + void move(const String & from, const String & to) override; + /// Check if all metadatas in part from RocksDB cache are up to date. std::unordered_map check() const override; @@ -49,6 +51,10 @@ private: void getKeysAndCheckSums(Strings & keys, std::vector & checksums) const; + void deleteAllImpl(const String & path, bool include_projection); + void assertAllDeletedImpl(const String & path, bool include_projection) const; + void updateAllImpl(const String & path, bool include_projection); + MergeTreeMetadataCachePtr cache; };