From f2dca656f9307f90c17384d75dec440964f2d955 Mon Sep 17 00:00:00 2001 From: Pavel Kovalenko Date: Thu, 19 Mar 2020 19:37:55 +0300 Subject: [PATCH] MergeTree full support for S3 (#9646) * IMergeDataPart full S3 support. * MergeTreeData full S3 support. * Compilation fixes. * Mutations and merges S3 support. * Fixed removing data part. * MergeTree for S3 integration tests and fixes. * Code style issues. * Enable AWS logging. * Fixed hardlink creation for DiskLocal. * Fixed localBackup.cpp compilation. * Fixed attaching partition. * Get rid of extra methods in IDisk. * Fixed storage config reloading. * More tests with table manipulations. * Remove unused error codes. * Move localBackup to MergeTree folder. * Minor fixes. --- dbms/src/Disks/DiskLocal.cpp | 29 +- dbms/src/Disks/DiskLocal.h | 8 + dbms/src/Disks/DiskMemory.cpp | 17 ++ dbms/src/Disks/DiskMemory.h | 8 +- dbms/src/Disks/DiskS3.cpp | 34 ++- dbms/src/Disks/DiskS3.h | 8 +- dbms/src/Disks/IDisk.cpp | 37 +++ dbms/src/Disks/IDisk.h | 19 ++ dbms/src/Interpreters/Context.cpp | 13 +- .../Storages/MergeTree/IMergeTreeDataPart.cpp | 63 ++-- dbms/src/Storages/MergeTree/MergeTreeData.cpp | 283 ++++++++---------- dbms/src/Storages/MergeTree/MergeTreeData.h | 16 +- .../MergeTree/MergeTreeDataMergerMutator.cpp | 52 ++-- .../Storages/MergeTree/MergeTreeSettings.h | 1 - dbms/src/Storages/MergeTree/checkDataPart.cpp | 1 - .../MergeTree}/localBackup.cpp | 37 +-- .../MergeTree}/localBackup.h | 6 +- dbms/src/Storages/StorageMergeTree.cpp | 5 +- .../config.d/bg_processing_pool_conf.xml | 5 + .../configs/config.d/storage_conf.xml | 28 ++ .../configs/config.d/users.xml | 6 + .../test_merge_tree_s3/configs/config.xml | 20 -- .../test_merge_tree_s3/configs/users.xml | 23 -- .../integration/test_merge_tree_s3/test.py | 211 +++++++++++-- .../convert-month-partitioned-parts/main.cpp | 12 +- 25 files changed, 599 insertions(+), 343 deletions(-) rename dbms/src/{Common => Storages/MergeTree}/localBackup.cpp (51%) rename dbms/src/{Common => Storages/MergeTree}/localBackup.h (80%) create mode 100644 dbms/tests/integration/test_merge_tree_s3/configs/config.d/bg_processing_pool_conf.xml create mode 100644 dbms/tests/integration/test_merge_tree_s3/configs/config.d/storage_conf.xml create mode 100644 dbms/tests/integration/test_merge_tree_s3/configs/config.d/users.xml delete mode 100644 dbms/tests/integration/test_merge_tree_s3/configs/users.xml diff --git a/dbms/src/Disks/DiskLocal.cpp b/dbms/src/Disks/DiskLocal.cpp index 418ce966955..a1c2641e2f3 100644 --- a/dbms/src/Disks/DiskLocal.cpp +++ b/dbms/src/Disks/DiskLocal.cpp @@ -1,4 +1,5 @@ #include "DiskLocal.h" +#include #include "DiskFactory.h" #include @@ -11,7 +12,6 @@ namespace DB { - namespace ErrorCodes { extern const int UNKNOWN_ELEMENT_IN_CONFIG; @@ -254,6 +254,33 @@ Poco::Timestamp DiskLocal::getLastModified(const String & path) return Poco::File(disk_path + path).getLastModified(); } +void DiskLocal::createHardLink(const String & src_path, const String & dst_path) +{ + DB::createHardLink(disk_path + src_path, disk_path + dst_path); +} + +void DiskLocal::createFile(const String & path) +{ + Poco::File(disk_path + path).createFile(); +} + +void DiskLocal::setReadOnly(const String & path) +{ + Poco::File(disk_path + path).setReadOnly(true); +} + +bool inline isSameDiskType(const IDisk & one, const IDisk & another) +{ + return typeid(one) == typeid(another); +} + +void DiskLocal::copy(const String & from_path, const std::shared_ptr & to_disk, const String & to_path) +{ + if (isSameDiskType(*this, *to_disk)) + 
Poco::File(disk_path + from_path).copyTo(to_disk->getPath() + to_path); /// Use more optimal way. + else + IDisk::copy(from_path, to_disk, to_path); /// Copy files through buffers. +} void DiskLocalReservation::update(UInt64 new_size) { diff --git a/dbms/src/Disks/DiskLocal.h b/dbms/src/Disks/DiskLocal.h index 77c86fa1f3e..61a3994b655 100644 --- a/dbms/src/Disks/DiskLocal.h +++ b/dbms/src/Disks/DiskLocal.h @@ -61,12 +61,16 @@ public: DiskDirectoryIteratorPtr iterateDirectory(const String & path) override; + void createFile(const String & path) override; + void moveFile(const String & from_path, const String & to_path) override; void replaceFile(const String & from_path, const String & to_path) override; void copyFile(const String & from_path, const String & to_path) override; + void copy(const String & from_path, const std::shared_ptr & to_disk, const String & to_path) override; + void listFiles(const String & path, std::vector & file_names) override; std::unique_ptr readFile( @@ -91,6 +95,10 @@ public: Poco::Timestamp getLastModified(const String & path) override; + void setReadOnly(const String & path) override; + + void createHardLink(const String & src_path, const String & dst_path) override; + private: bool tryReserve(UInt64 bytes); diff --git a/dbms/src/Disks/DiskMemory.cpp b/dbms/src/Disks/DiskMemory.cpp index 15b2b2152b1..6ae2af63485 100644 --- a/dbms/src/Disks/DiskMemory.cpp +++ b/dbms/src/Disks/DiskMemory.cpp @@ -386,10 +386,27 @@ void DiskMemory::removeRecursive(const String & path) void DiskMemory::listFiles(const String & path, std::vector & file_names) { + std::lock_guard lock(mutex); + for (auto it = iterateDirectory(path); it->isValid(); it->next()) file_names.push_back(it->name()); } +void DiskMemory::createHardLink(const String &, const String &) +{ + throw Exception("Method createHardLink is not implemented for memory disks", ErrorCodes::NOT_IMPLEMENTED); +} + +void DiskMemory::createFile(const String &) +{ + throw Exception("Method createFile is not implemented for memory disks", ErrorCodes::NOT_IMPLEMENTED); +} + +void DiskMemory::setReadOnly(const String &) +{ + throw Exception("Method setReadOnly is not implemented for memory disks", ErrorCodes::NOT_IMPLEMENTED); +} + using DiskMemoryPtr = std::shared_ptr; diff --git a/dbms/src/Disks/DiskMemory.h b/dbms/src/Disks/DiskMemory.h index 8ddb5307c41..b0c1d30c61d 100644 --- a/dbms/src/Disks/DiskMemory.h +++ b/dbms/src/Disks/DiskMemory.h @@ -54,6 +54,8 @@ public: DiskDirectoryIteratorPtr iterateDirectory(const String & path) override; + void createFile(const String & path) override; + void moveFile(const String & from_path, const String & to_path) override; void replaceFile(const String & from_path, const String & to_path) override; @@ -80,10 +82,14 @@ public: void removeRecursive(const String & path) override; - void setLastModified(const String &, const Poco::Timestamp &) override { } + void setLastModified(const String &, const Poco::Timestamp &) override {} Poco::Timestamp getLastModified(const String &) override { return Poco::Timestamp(); } + void setReadOnly(const String & path) override; + + void createHardLink(const String & src_path, const String & dst_path) override; + private: void createDirectoriesImpl(const String & path); void replaceFileImpl(const String & from_path, const String & to_path); diff --git a/dbms/src/Disks/DiskS3.cpp b/dbms/src/Disks/DiskS3.cpp index d3712631a58..8bd5c0f074d 100644 --- a/dbms/src/Disks/DiskS3.cpp +++ b/dbms/src/Disks/DiskS3.cpp @@ -303,7 +303,12 @@ namespace finalized = true; } - 
void sync() override { metadata.save(true); } + void sync() override + { + if (finalized) + metadata.save(true); + } + std::string getFileName() const override { return metadata.metadata_file_path; } private: @@ -480,14 +485,12 @@ void DiskS3::copyFile(const String & from_path, const String & to_path) Metadata from(metadata_path + from_path); Metadata to(metadata_path + to_path, true); - for (UInt32 i = 0; i < from.s3_objects_count; ++i) + for (const auto & [path, size] : from.s3_objects) { - auto path = from.s3_objects[i].first; - auto size = from.s3_objects[i].second; auto new_path = s3_root_path + getRandomName(); Aws::S3::Model::CopyObjectRequest req; + req.SetCopySource(bucket + "/" + path); req.SetBucket(bucket); - req.SetCopySource(path); req.SetKey(new_path); throwIfError(client->CopyObject(req)); @@ -621,6 +624,27 @@ Poco::Timestamp DiskS3::getLastModified(const String & path) return Poco::File(metadata_path + path).getLastModified(); } +void DiskS3::createHardLink(const String & src_path, const String & dst_path) +{ + /** + * TODO: Replace with optimal implementation: + * Store links into a list in metadata file. + * Hardlink creation is adding new link to list and just metadata file copy. + */ + copyFile(src_path, dst_path); +} + +void DiskS3::createFile(const String & path) +{ + /// Create empty metadata file. + Metadata metadata(metadata_path + path, true); + metadata.save(); +} + +void DiskS3::setReadOnly(const String & path) +{ + Poco::File(metadata_path + path).setReadOnly(true); +} DiskS3Reservation::~DiskS3Reservation() { diff --git a/dbms/src/Disks/DiskS3.h b/dbms/src/Disks/DiskS3.h index 10c7f015f77..1b61ed1cde3 100644 --- a/dbms/src/Disks/DiskS3.h +++ b/dbms/src/Disks/DiskS3.h @@ -31,7 +31,7 @@ public: const String & getName() const override { return name; } - const String & getPath() const override { return s3_root_path; } + const String & getPath() const override { return metadata_path; } ReservationPtr reserve(UInt64 bytes) override; @@ -87,10 +87,16 @@ public: void removeRecursive(const String & path) override; + void createHardLink(const String & src_path, const String & dst_path) override; + void setLastModified(const String & path, const Poco::Timestamp & timestamp) override; Poco::Timestamp getLastModified(const String & path) override; + void createFile(const String & path) override; + + void setReadOnly(const String & path) override; + private: bool tryReserve(UInt64 bytes); diff --git a/dbms/src/Disks/IDisk.cpp b/dbms/src/Disks/IDisk.cpp index 48b080e1704..36ab2a49573 100644 --- a/dbms/src/Disks/IDisk.cpp +++ b/dbms/src/Disks/IDisk.cpp @@ -1,4 +1,9 @@ #include "IDisk.h" +#include +#include +#include +#include +#include namespace DB { @@ -7,4 +12,36 @@ bool IDisk::isDirectoryEmpty(const String & path) { return !iterateDirectory(path)->isValid(); } + +void copyFile(IDisk & from_disk, const String & from_path, IDisk & to_disk, const String & to_path) +{ + LOG_DEBUG( + &Poco::Logger::get("IDisk"), + "Copying from " << from_disk.getName() << " " << from_path << " to " << to_disk.getName() << " " << to_path); + + auto in = from_disk.readFile(from_path); + auto out = to_disk.writeFile(to_path); + copyData(*in, *out); +} + +void IDisk::copy(const String & from_path, const std::shared_ptr & to_disk, const String & to_path) +{ + if (isFile(from_path)) + { + DB::copyFile(*this, from_path, *to_disk, to_path + fileName(from_path)); + } + else + { + Poco::Path path(from_path); + const String & dir_name = path.directory(path.depth() - 1); + const String dest = to_path + 
dir_name + "/"; + to_disk->createDirectories(dest); + + for (auto it = iterateDirectory(from_path); it->isValid(); it->next()) + { + copy(it->path(), to_disk, dest); + } + } +} + } diff --git a/dbms/src/Disks/IDisk.h b/dbms/src/Disks/IDisk.h index 877c6f84706..7d0b429720e 100644 --- a/dbms/src/Disks/IDisk.h +++ b/dbms/src/Disks/IDisk.h @@ -111,6 +111,9 @@ public: /// Return `true` if the specified directory is empty. bool isDirectoryEmpty(const String & path); + /// Create empty file at `path`. + virtual void createFile(const String & path) = 0; + /// Move the file from `from_path` to `to_path`. /// If a file with `to_path` path already exists, an exception will be thrown . virtual void moveFile(const String & from_path, const String & to_path) = 0; @@ -122,6 +125,9 @@ public: /// Copy the file from `from_path` to `to_path`. virtual void copyFile(const String & from_path, const String & to_path) = 0; + /// Recursively copy data containing at `from_path` to `to_path` located at `to_disk`. + virtual void copy(const String & from_path, const std::shared_ptr & to_disk, const String & to_path); + /// List files at `path` and add their names to `file_names` virtual void listFiles(const String & path, std::vector & file_names) = 0; @@ -147,11 +153,24 @@ public: /// Remove file or directory with all children. Use with extra caution. Throws exception if file doesn't exists. virtual void removeRecursive(const String & path) = 0; + /// Remove file or directory if it exists. + void removeIfExists(const String & path) + { + if (exists(path)) + remove(path); + } + /// Set last modified time to file or directory at `path`. virtual void setLastModified(const String & path, const Poco::Timestamp & timestamp) = 0; /// Get last modified time of file or directory at `path`. virtual Poco::Timestamp getLastModified(const String & path) = 0; + + /// Set file at `path` as read-only. + virtual void setReadOnly(const String & path) = 0; + + /// Create hardlink from `src_path` to `dst_path`. 
+ virtual void createHardLink(const String & src_path, const String & dst_path) = 0; }; using DiskPtr = std::shared_ptr; diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index 74340027dec..ab9b4a2c31b 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -1321,7 +1321,18 @@ BackgroundProcessingPool & Context::getBackgroundPool() { auto lock = getLock(); if (!shared->background_pool) - shared->background_pool.emplace(settings.background_pool_size); + { + BackgroundProcessingPool::PoolSettings pool_settings; + auto & config = getConfigRef(); + pool_settings.thread_sleep_seconds = config.getDouble("background_processing_pool_thread_sleep_seconds", 10); + pool_settings.thread_sleep_seconds_random_part = config.getDouble("background_processing_pool_thread_sleep_seconds_random_part", 1.0); + pool_settings.thread_sleep_seconds_if_nothing_to_do = config.getDouble("background_processing_pool_thread_sleep_seconds_if_nothing_to_do", 0.1); + pool_settings.task_sleep_seconds_when_no_work_min = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_min", 10); + pool_settings.task_sleep_seconds_when_no_work_max = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_max", 600); + pool_settings.task_sleep_seconds_when_no_work_multiplier = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_multiplier", 1.1); + pool_settings.task_sleep_seconds_when_no_work_random_part = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_random_part", 1.0); + shared->background_pool.emplace(settings.background_pool_size, pool_settings); + } return *shared->background_pool; } diff --git a/dbms/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/dbms/src/Storages/MergeTree/IMergeTreeDataPart.cpp index b5c8f16b7e5..dedda5b5159 100644 --- a/dbms/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/dbms/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -3,16 +3,13 @@ #include #include #include -#include #include #include #include #include -#include -#include +#include #include #include -#include #include #include @@ -30,7 +27,6 @@ namespace ErrorCodes extern const int NOT_FOUND_EXPECTED_DATA_PART; extern const int BAD_SIZE_OF_FILE_IN_DATA_PART; extern const int BAD_TTL_FILE; - extern const int CANNOT_UNLINK; extern const int NOT_IMPLEMENTED; } @@ -251,7 +247,7 @@ void IMergeTreeDataPart::removeIfNeeded() if (is_temp) { - String file_name = Poco::Path(relative_path).getFileName(); + String file_name = fileName(relative_path); if (file_name.empty()) throw Exception("relative_path " + relative_path + " of part " + name + " is invalid or not set", ErrorCodes::LOGICAL_ERROR); @@ -699,33 +695,33 @@ void IMergeTreeDataPart::remove() const * And a race condition can happen that will lead to "File not found" error here. */ - String from_ = storage.relative_data_path + relative_path; - String to_ = storage.relative_data_path + "delete_tmp_" + name; + String from = storage.relative_data_path + relative_path; + String to = storage.relative_data_path + "delete_tmp_" + name; // TODO directory delete_tmp_ is never removed if server crashes before returning from this function - if (disk->exists(to_)) + if (disk->exists(to)) { - LOG_WARNING(storage.log, "Directory " << fullPath(disk, to_) << " (to which part must be renamed before removing) already exists." + LOG_WARNING(storage.log, "Directory " << fullPath(disk, to) << " (to which part must be renamed before removing) already exists." 
" Most likely this is due to unclean restart. Removing it."); try { - disk->removeRecursive(to_); + disk->removeRecursive(to); } catch (...) { - LOG_ERROR(storage.log, "Cannot recursively remove directory " << fullPath(disk, to_) << ". Exception: " << getCurrentExceptionMessage(false)); + LOG_ERROR(storage.log, "Cannot recursively remove directory " << fullPath(disk, to) << ". Exception: " << getCurrentExceptionMessage(false)); throw; } } try { - disk->moveFile(from_, to_); + disk->moveFile(from, to); } catch (const Poco::FileNotFoundException &) { - LOG_ERROR(storage.log, "Directory " << fullPath(disk, to_) << " (part to remove) doesn't exist or one of nested files has gone." + LOG_ERROR(storage.log, "Directory " << fullPath(disk, to) << " (part to remove) doesn't exist or one of nested files has gone." " Most likely this is due to manual removing. This should be discouraged. Ignoring."); return; @@ -741,37 +737,25 @@ void IMergeTreeDataPart::remove() const #endif std::shared_lock lock(columns_lock); - /// TODO: IDisk doesn't support `unlink()` and `rmdir()` functionality. - auto to = fullPath(disk, to_); - for (const auto & [file, _] : checksums.files) - { - String path_to_remove = to + "/" + file; - if (0 != unlink(path_to_remove.c_str())) - throwFromErrnoWithPath("Cannot unlink file " + path_to_remove, path_to_remove, ErrorCodes::CANNOT_UNLINK); - } + disk->remove(to + "/" + file); #if !__clang__ # pragma GCC diagnostic pop #endif for (const auto & file : {"checksums.txt", "columns.txt"}) - { - String path_to_remove = to + "/" + file; - if (0 != unlink(path_to_remove.c_str())) - throwFromErrnoWithPath("Cannot unlink file " + path_to_remove, path_to_remove, ErrorCodes::CANNOT_UNLINK); - } + disk->remove(to + "/" + file); - if (0 != rmdir(to.c_str())) - throwFromErrnoWithPath("Cannot rmdir file " + to, to, ErrorCodes::CANNOT_UNLINK); + disk->remove(to); } catch (...) { /// Recursive directory removal does many excessive "stat" syscalls under the hood. - LOG_ERROR(storage.log, "Cannot quickly remove directory " << fullPath(disk, to_) << " by removing files; fallback to recursive removal. Reason: " + LOG_ERROR(storage.log, "Cannot quickly remove directory " << fullPath(disk, to) << " by removing files; fallback to recursive removal. Reason: " << getCurrentExceptionMessage(false)); - disk->removeRecursive(to_ + "/"); + disk->removeRecursive(to + "/"); } } @@ -791,7 +775,7 @@ String IMergeTreeDataPart::getRelativePathForDetachedPart(const String & prefix) { res = "detached/" + (prefix.empty() ? "" : prefix + "_") + name + (try_no ? "_try" + DB::toString(try_no) : ""); - if (!Poco::File(storage.getFullPathOnDisk(disk) + res).exists()) + if (!disk->exists(getFullRelativePath() + res)) return res; LOG_WARNING(storage.log, "Directory " << res << " (to detach to) already exists." 
@@ -812,10 +796,8 @@ void IMergeTreeDataPart::makeCloneInDetached(const String & prefix) const assertOnDisk(); LOG_INFO(storage.log, "Detaching " << relative_path); - Poco::Path src(getFullPath()); - Poco::Path dst(storage.getFullPathOnDisk(disk) + getRelativePathForDetachedPart(prefix)); /// Backup is not recursive (max_level is 0), so do not copy inner directories - localBackup(src, dst, 0); + localBackup(disk, getFullRelativePath(), storage.relative_data_path + getRelativePathForDetachedPart(prefix), 0); } void IMergeTreeDataPart::makeCloneOnDiskDetached(const ReservationPtr & reservation) const @@ -825,14 +807,13 @@ void IMergeTreeDataPart::makeCloneOnDiskDetached(const ReservationPtr & reservat if (reserved_disk->getName() == disk->getName()) throw Exception("Can not clone data part " + name + " to same disk " + disk->getName(), ErrorCodes::LOGICAL_ERROR); - String path_to_clone = storage.getFullPathOnDisk(reserved_disk) + "detached/"; + String path_to_clone = storage.relative_data_path + "detached/"; - if (Poco::File(path_to_clone + relative_path).exists()) - throw Exception("Path " + path_to_clone + relative_path + " already exists. Can not clone ", ErrorCodes::DIRECTORY_ALREADY_EXISTS); - Poco::File(path_to_clone).createDirectory(); + if (reserved_disk->exists(path_to_clone + relative_path)) + throw Exception("Path " + fullPath(reserved_disk, path_to_clone + relative_path) + " already exists. Can not clone ", ErrorCodes::DIRECTORY_ALREADY_EXISTS); + reserved_disk->createDirectory(path_to_clone); - Poco::File cloning_directory(getFullPath()); - cloning_directory.copyTo(path_to_clone); + disk->copy(getFullRelativePath(), reserved_disk, path_to_clone); } void IMergeTreeDataPart::checkConsistencyBase() const diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index adb106205de..2a22ca2dbd8 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -1,50 +1,49 @@ -#include -#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include +#include #include +#include +#include #include #include #include #include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include #include +#include +#include #include #include -#include -#include #include -#include -#include #include @@ -859,7 +858,6 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) const auto settings = getSettings(); std::vector> part_names_with_disks; Strings part_file_names; - Poco::DirectoryIterator end; auto disks = getStoragePolicy()->getDisks(); @@ -1318,7 +1316,7 @@ void MergeTreeData::rename( for (const auto & disk : disks) { - auto new_table_path_parent = Poco::Path(new_table_path).makeParent().toString(); + auto new_table_path_parent = parentPath(new_table_path); disk->createDirectory(new_table_path_parent); disk->moveDirectory(relative_data_path, new_table_path); } @@ -1713,8 +1711,8 @@ void MergeTreeData::alterDataPart( size_t num_files_to_modify = transaction->rename_map.size(); size_t num_files_to_remove = 0; - 
for (const auto & from_to : transaction->rename_map) - if (from_to.second.empty()) + for (const auto & [from, to] : transaction->rename_map) + if (to.empty()) ++num_files_to_remove; if (!skip_sanity_checks @@ -1732,18 +1730,18 @@ void MergeTreeData::alterDataPart( << ") files ("; bool first = true; - for (const auto & from_to : transaction->rename_map) + for (const auto & [from, to] : transaction->rename_map) { if (!first) exception_message << ", "; if (forbidden_because_of_modify) { - exception_message << "from " << backQuote(from_to.first) << " to " << backQuote(from_to.second); + exception_message << "from " << backQuote(from) << " to " << backQuote(to); first = false; } - else if (from_to.second.empty()) + else if (to.empty()) { - exception_message << backQuote(from_to.first); + exception_message << backQuote(from); first = false; } } @@ -1813,28 +1811,28 @@ void MergeTreeData::alterDataPart( /// Update the checksums. DataPart::Checksums new_checksums = part->checksums; - for (const auto & it : transaction->rename_map) + for (const auto & [from, to] : transaction->rename_map) { - if (it.second.empty()) - new_checksums.files.erase(it.first); + if (to.empty()) + new_checksums.files.erase(from); else - new_checksums.files[it.second] = add_checksums.files[it.first]; + new_checksums.files[to] = add_checksums.files[from]; } /// Write the checksums to the temporary file. if (!part->checksums.empty()) { transaction->new_checksums = new_checksums; - WriteBufferFromFile checksums_file(part->getFullPath() + "checksums.txt.tmp", 4096); - new_checksums.write(checksums_file); + auto checksums_file = part->disk->writeFile(part->getFullRelativePath() + "checksums.txt.tmp", 4096); + new_checksums.write(*checksums_file); transaction->rename_map["checksums.txt.tmp"] = "checksums.txt"; } /// Write the new column list to the temporary file. { transaction->new_columns = new_columns.filter(part->getColumns().getNames()); - WriteBufferFromFile columns_file(part->getFullPath() + "columns.txt.tmp", 4096); - transaction->new_columns.writeText(columns_file); + auto columns_file = part->disk->writeFile(part->getFullRelativePath() + "columns.txt.tmp", 4096); + transaction->new_columns.writeText(*columns_file); transaction->rename_map["columns.txt.tmp"] = "columns.txt"; } } @@ -1863,16 +1861,16 @@ void MergeTreeData::changeSettings( for (const String & disk_name : all_diff_disk_names) { - const auto & path = getFullPathOnDisk(new_storage_policy->getDiskByName(disk_name)); - if (Poco::File(path).exists()) + auto disk = new_storage_policy->getDiskByName(disk_name); + if (disk->exists(relative_data_path)) throw Exception("New storage policy contain disks which already contain data of a table with the same name", ErrorCodes::LOGICAL_ERROR); } for (const String & disk_name : all_diff_disk_names) { - const auto & path = getFullPathOnDisk(new_storage_policy->getDiskByName(disk_name)); - Poco::File(path).createDirectories(); - Poco::File(path + "detached").createDirectory(); + auto disk = new_storage_policy->getDiskByName(disk_name); + disk->createDirectories(relative_data_path); + disk->createDirectories(relative_data_path + "detached"); } /// FIXME how would that be done while reloading configuration??? 
} @@ -1939,7 +1937,8 @@ void MergeTreeData::AlterDataPartTransaction::commit() { std::unique_lock lock(data_part->columns_lock); - String path = data_part->getFullPath(); + auto disk = data_part->disk; + String path = data_part->getFullRelativePath(); /// NOTE: checking that a file exists before renaming or deleting it /// is justified by the fact that, when converting an ordinary column @@ -1947,19 +1946,18 @@ void MergeTreeData::AlterDataPartTransaction::commit() /// before, i.e. they do not have older versions. /// 1) Rename the old files. - for (const auto & from_to : rename_map) + for (const auto & [from, to] : rename_map) { - String name = from_to.second.empty() ? from_to.first : from_to.second; - Poco::File file{path + name}; - if (file.exists()) - file.renameTo(path + name + ".tmp2"); + String name = to.empty() ? from : to; + if (disk->exists(path + name)) + disk->moveFile(path + name, path + name + ".tmp2"); } /// 2) Move new files in the place of old and update the metadata in memory. - for (const auto & from_to : rename_map) + for (const auto & [from, to] : rename_map) { - if (!from_to.second.empty()) - Poco::File{path + from_to.first}.renameTo(path + from_to.second); + if (!to.empty()) + disk->moveFile(path + from, path + to); } auto & mutable_part = const_cast(*data_part); @@ -1967,12 +1965,10 @@ void MergeTreeData::AlterDataPartTransaction::commit() mutable_part.setColumns(new_columns); /// 3) Delete the old files and drop required columns (DROP COLUMN) - for (const auto & from_to : rename_map) + for (const auto & [from, to] : rename_map) { - String name = from_to.second.empty() ? from_to.first : from_to.second; - Poco::File file{path + name + ".tmp2"}; - if (file.exists()) - file.remove(); + String name = to.empty() ? from : to; + disk->removeIfExists(path + name + ".tmp2"); } mutable_part.bytes_on_disk = new_checksums.getTotalSizeOnDisk(); @@ -2002,20 +1998,18 @@ MergeTreeData::AlterDataPartTransaction::~AlterDataPartTransaction() { LOG_WARNING(data_part->storage.log, "Aborting ALTER of part " << data_part->relative_path); - String path = data_part->getFullPath(); - for (const auto & from_to : rename_map) + String path = data_part->getFullRelativePath(); + for (const auto & [from, to] : rename_map) { - if (!from_to.second.empty()) + if (!to.empty()) { try { - Poco::File file(path + from_to.first); - if (file.exists()) - file.remove(); + data_part->disk->removeIfExists(path + from); } catch (Poco::Exception & e) { - LOG_WARNING(data_part->storage.log, "Can't remove " << path + from_to.first << ": " << e.displayText()); + LOG_WARNING(data_part->storage.log, "Can't remove " << fullPath(data_part->disk, path + from) << ": " << e.displayText()); } } } @@ -2029,14 +2023,13 @@ MergeTreeData::AlterDataPartTransaction::~AlterDataPartTransaction() void MergeTreeData::PartsTemporaryRename::addPart(const String & old_name, const String & new_name) { old_and_new_names.push_back({old_name, new_name}); - const auto paths = storage.getDataPaths(); - for (const auto & full_path : paths) + for (const auto & [path, disk] : storage.getRelativeDataPathsWithDisks()) { - for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it) + for (auto it = disk->iterateDirectory(path + source_dir); it->isValid(); it->next()) { - if (it.name() == old_name) + if (it->name() == old_name) { - old_part_name_to_full_path[old_name] = full_path; + old_part_name_to_path_and_disk[old_name] = {path, disk}; break; } } @@ -2050,11 +2043,12 @@ void 
MergeTreeData::PartsTemporaryRename::tryRenameAll() { try { - const auto & names = old_and_new_names[i]; - if (names.first.empty() || names.second.empty()) + const auto & [old_name, new_name] = old_and_new_names[i]; + if (old_name.empty() || new_name.empty()) throw DB::Exception("Empty part name. Most likely it's a bug.", ErrorCodes::INCORRECT_FILE_NAME); - const auto full_path = old_part_name_to_full_path[names.first] + source_dir; /// old_name - Poco::File(full_path + names.first).renameTo(full_path + names.second); + const auto & [path, disk] = old_part_name_to_path_and_disk[old_name]; + const auto full_path = path + source_dir; /// for old_name + disk->moveFile(full_path + old_name, full_path + new_name); } catch (...) { @@ -2070,15 +2064,16 @@ MergeTreeData::PartsTemporaryRename::~PartsTemporaryRename() // TODO what if server had crashed before this destructor was called? if (!renamed) return; - for (const auto & names : old_and_new_names) + for (const auto & [old_name, new_name] : old_and_new_names) { - if (names.first.empty()) + if (old_name.empty()) continue; try { - const auto full_path = old_part_name_to_full_path[names.first] + source_dir; /// old_name - Poco::File(full_path + names.second).renameTo(full_path + names.first); + const auto & [path, disk] = old_part_name_to_path_and_disk[old_name]; + const auto full_path = path + source_dir; /// for old_name + disk->moveFile(full_path + new_name, full_path + old_name); } catch (...) { @@ -2690,14 +2685,15 @@ void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy) auto part_it = data_parts_indexes.insert(part_copy).first; modifyPartState(part_it, DataPartState::Committed); - Poco::Path marker_path(Poco::Path(original_active_part->getFullPath()), DELETE_ON_DESTROY_MARKER_PATH); + auto disk = original_active_part->disk; + String marker_path = original_active_part->getFullRelativePath() + DELETE_ON_DESTROY_MARKER_PATH; try { - Poco::File(marker_path).createFile(); + disk->createFile(marker_path); } catch (Poco::Exception & e) { - LOG_ERROR(log, e.what() << " (while creating DeleteOnDestroy marker: " + backQuote(marker_path.toString()) + ")"); + LOG_ERROR(log, e.what() << " (while creating DeleteOnDestroy marker: " + backQuote(fullPath(disk, marker_path)) + ")"); } return; } @@ -2754,15 +2750,16 @@ MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_na static void loadPartAndFixMetadataImpl(MergeTreeData::MutableDataPartPtr part) { - String full_part_path = part->getFullPath(); + auto disk = part->disk; + String full_part_path = part->getFullRelativePath(); /// Earlier the list of columns was written incorrectly. Delete it and re-create. /// But in compact parts we can't get list of columns without this file. - if (isWidePart(part) && Poco::File(full_part_path + "columns.txt").exists()) - Poco::File(full_part_path + "columns.txt").remove(); + if (isWidePart(part)) + disk->removeIfExists(full_part_path + "columns.txt"); part->loadColumnsChecksumsIndexes(false, true); - part->modification_time = Poco::File(full_part_path).getLastModified().epochTime(); + part->modification_time = disk->getLastModified(full_part_path).epochTime(); /// If the checksums file is not present, calculate the checksums and write them to disk. /// Check the data while we are at it. 
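Note on the hunk that follows: it reroutes the checksums rewrite through the IDisk interface while keeping the usual write-to-"checksums.txt.tmp"-then-rename pattern, so a reader never observes a half-written checksums file. A minimal standalone sketch of that pattern (plain std::filesystem, not ClickHouse's IDisk; writeFileAtomically is a hypothetical helper name, not part of this patch):

#include <filesystem>
#include <fstream>
#include <string>

// Write the new contents to a ".tmp" sibling first, then rename over the
// final name. rename() replaces the destination atomically on POSIX.
void writeFileAtomically(const std::filesystem::path & path, const std::string & contents)
{
    const std::filesystem::path tmp_path = path.string() + ".tmp";
    {
        std::ofstream out(tmp_path, std::ios::binary | std::ios::trunc);
        out.write(contents.data(), static_cast<std::streamsize>(contents.size()));
    }   // stream is flushed and closed here; a production version would also fsync
    std::filesystem::rename(tmp_path, path);  // atomically replace the old file
}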
@@ -2770,11 +2767,11 @@ static void loadPartAndFixMetadataImpl(MergeTreeData::MutableDataPartPtr part) { part->checksums = checkDataPart(part, false); { - WriteBufferFromFile out(full_part_path + "checksums.txt.tmp", 4096); - part->checksums.write(out); + auto out = disk->writeFile(full_part_path + "checksums.txt.tmp", 4096); + part->checksums.write(*out); } - Poco::File(full_part_path + "checksums.txt.tmp").renameTo(full_part_path + "checksums.txt"); + disk->moveFile(full_part_path + "checksums.txt.tmp", full_part_path + "checksums.txt"); } } @@ -3097,15 +3094,14 @@ MergeTreeData::getDetachedParts() const { std::vector res; - for (const auto & [path, disk] : getDataPathsWithDisks()) + for (const auto & [path, disk] : getRelativeDataPathsWithDisks()) { - for (Poco::DirectoryIterator it(path + "detached"); - it != Poco::DirectoryIterator(); ++it) + for (auto it = disk->iterateDirectory(path + "detached"); it->isValid(); it->next()) { res.emplace_back(); auto & part = res.back(); - DetachedPartInfo::tryParseDetachedPartName(it.name(), part, format_version); + DetachedPartInfo::tryParseDetachedPartName(it->name(), part, format_version); part.disk = disk->getName(); } } @@ -3117,9 +3113,9 @@ void MergeTreeData::validateDetachedPartName(const String & name) const if (name.find('/') != std::string::npos || name == "." || name == "..") throw DB::Exception("Invalid part name '" + name + "'", ErrorCodes::INCORRECT_FILE_NAME); - String full_path = getFullPathForPart(name, "detached/"); + auto full_path = getFullRelativePathForPart(name, "detached/"); - if (full_path.empty() || !Poco::File(full_path + name).exists()) + if (!full_path) throw DB::Exception("Detached part \"" + name + "\" not found" , ErrorCodes::BAD_DATA_PART_NAME); if (startsWith(name, "attaching_") || startsWith(name, "deleting_")) @@ -3154,7 +3150,8 @@ void MergeTreeData::dropDetached(const ASTPtr & partition, bool part, const Cont for (auto & [old_name, new_name] : renamed_parts.old_and_new_names) { - Poco::File(renamed_parts.old_part_name_to_full_path[old_name] + "detached/" + new_name).remove(true); + const auto & [path, disk] = renamed_parts.old_part_name_to_path_and_disk[old_name]; + disk->removeRecursive(path + "detached/" + new_name + "/"); LOG_DEBUG(log, "Dropped detached part " << old_name); old_name.clear(); } @@ -3182,12 +3179,11 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const ActiveDataPartSet active_parts(format_version); const auto disks = getStoragePolicy()->getDisks(); - for (const DiskPtr & disk : disks) + for (auto & disk : disks) { - const auto full_path = getFullPathOnDisk(disk); - for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it) + for (auto it = disk->iterateDirectory(relative_data_path + source_dir); it->isValid(); it->next()) { - const String & name = it.name(); + const String & name = it->name(); MergeTreePartInfo part_info; // TODO what if name contains "_tryN" suffix? /// Parts with prefix in name (e.g. attaching_1_3_3_0, deleting_1_3_3_0) will be ignored @@ -3208,10 +3204,8 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const String containing_part = active_parts.getContainingPart(name); if (!containing_part.empty() && containing_part != name) { - auto full_path = getFullPathOnDisk(disk); // TODO maybe use PartsTemporaryRename here? 
- Poco::File(full_path + source_dir + name) - .renameTo(full_path + source_dir + "inactive_" + name); + disk->moveDirectory(relative_data_path + source_dir + name, relative_data_path + source_dir + "inactive_" + name); } else renamed_parts.addPart(name, "attaching_" + name); @@ -3576,22 +3570,22 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPartOnSameDisk( String tmp_dst_part_name = tmp_part_prefix + dst_part_name; auto reservation = reserveSpace(src_part->bytes_on_disk, src_part->disk); - String dst_part_path = getFullPathOnDisk(reservation->getDisk()); - Poco::Path dst_part_absolute_path = Poco::Path(dst_part_path + tmp_dst_part_name).absolute(); - Poco::Path src_part_absolute_path = Poco::Path(src_part->getFullPath()).absolute(); + auto disk = reservation->getDisk(); + String src_part_path = src_part->getFullRelativePath(); + String dst_part_path = relative_data_path + tmp_dst_part_name; - if (Poco::File(dst_part_absolute_path).exists()) - throw Exception("Part in " + dst_part_absolute_path.toString() + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS); + if (disk->exists(dst_part_path)) + throw Exception("Part in " + fullPath(disk, dst_part_path) + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS); - LOG_DEBUG(log, "Cloning part " << src_part_absolute_path.toString() << " to " << dst_part_absolute_path.toString()); - localBackup(src_part_absolute_path, dst_part_absolute_path); + LOG_DEBUG(log, "Cloning part " << fullPath(disk, src_part_path) << " to " << fullPath(disk, dst_part_path)); + localBackup(disk, src_part_path, dst_part_path); auto dst_data_part = createPart(dst_part_name, dst_part_info, reservation->getDisk(), tmp_dst_part_name); dst_data_part->is_temp = true; dst_data_part->loadColumnsChecksumsIndexes(require_part_metadata, true); - dst_data_part->modification_time = Poco::File(dst_part_absolute_path).getLastModified().epochTime(); + dst_data_part->modification_time = disk->getLastModified(dst_part_path).epochTime(); return dst_data_part; } @@ -3601,26 +3595,25 @@ String MergeTreeData::getFullPathOnDisk(const DiskPtr & disk) const } -DiskPtr MergeTreeData::getDiskForPart(const String & part_name, const String & relative_path) const +DiskPtr MergeTreeData::getDiskForPart(const String & part_name, const String & additional_path) const { const auto disks = getStoragePolicy()->getDisks(); + for (const DiskPtr & disk : disks) - { - const auto disk_path = getFullPathOnDisk(disk); - for (Poco::DirectoryIterator it = Poco::DirectoryIterator(disk_path + relative_path); it != Poco::DirectoryIterator(); ++it) - if (it.name() == part_name) + for (auto it = disk->iterateDirectory(relative_data_path + additional_path); it->isValid(); it->next()) + if (it->name() == part_name) return disk; - } + return nullptr; } -String MergeTreeData::getFullPathForPart(const String & part_name, const String & relative_path) const +std::optional MergeTreeData::getFullRelativePathForPart(const String & part_name, const String & additional_path) const { - auto disk = getDiskForPart(part_name, relative_path); + auto disk = getDiskForPart(part_name, additional_path); if (disk) - return getFullPathOnDisk(disk) + relative_path; - return ""; + return relative_data_path + additional_path; + return {}; } Strings MergeTreeData::getDataPaths() const @@ -3632,15 +3625,6 @@ Strings MergeTreeData::getDataPaths() const return res; } -MergeTreeData::PathsWithDisks MergeTreeData::getDataPathsWithDisks() const -{ - PathsWithDisks res; - auto disks = getStoragePolicy()->getDisks(); - for 
(const auto & disk : disks) - res.emplace_back(getFullPathOnDisk(disk), disk); - return res; -} - MergeTreeData::PathsWithDisks MergeTreeData::getRelativeDataPathsWithDisks() const { PathsWithDisks res; @@ -3657,6 +3641,8 @@ void MergeTreeData::freezePartitionsByMatcher(MatcherFn matcher, const String & Poco::File(default_shadow_path).createDirectories(); auto increment = Increment(default_shadow_path + "increment.txt").get(true); + const String shadow_path = "shadow/"; + /// Acquire a snapshot of active data parts to prevent removing while doing backup. const auto data_parts = getDataParts(); @@ -3666,9 +3652,8 @@ void MergeTreeData::freezePartitionsByMatcher(MatcherFn matcher, const String & if (!matcher(part)) continue; - String shadow_path = part->disk->getPath() + "shadow/"; + part->disk->createDirectories(shadow_path); - Poco::File(shadow_path).createDirectories(); String backup_path = shadow_path + (!with_name.empty() ? escapeForFileName(with_name) @@ -3677,11 +3662,8 @@ void MergeTreeData::freezePartitionsByMatcher(MatcherFn matcher, const String & LOG_DEBUG(log, "Freezing part " << part->name << " snapshot will be placed at " + backup_path); - String part_absolute_path = Poco::Path(part->getFullPath()).absolute().toString(); - String backup_part_absolute_path = backup_path - + relative_data_path - + part->relative_path; - localBackup(part_absolute_path, backup_part_absolute_path); + String backup_part_path = backup_path + relative_data_path + part->relative_path; + localBackup(part->disk, part->getFullRelativePath(), backup_part_path); part->is_frozen.store(true, std::memory_order_relaxed); ++parts_processed; } @@ -3853,11 +3835,10 @@ MergeTreeData::CurrentlyMovingPartsTagger MergeTreeData::checkPartsForMove(const throw Exception("Move is not possible. Not enough space on '" + space->getName() + "'", ErrorCodes::NOT_ENOUGH_SPACE); auto reserved_disk = reservation->getDisk(); - String path_to_clone = getFullPathOnDisk(reserved_disk); - if (Poco::File(path_to_clone + part->name).exists()) + if (reserved_disk->exists(relative_data_path + part->name)) throw Exception( - "Move is not possible: " + path_to_clone + part->name + " already exists", + "Move is not possible: " + fullPath(reserved_disk, relative_data_path + part->name) + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS); if (currently_moving_parts.count(part) || partIsAssignedToBackgroundOperation(part)) diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.h b/dbms/src/Storages/MergeTree/MergeTreeData.h index 079fb316ffd..0606b2d9cec 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.h +++ b/dbms/src/Storages/MergeTree/MergeTreeData.h @@ -263,6 +263,7 @@ public: }; using AlterDataPartTransactionPtr = std::unique_ptr; + using PathWithDisk = std::pair; struct PartsTemporaryRename : private boost::noncopyable { @@ -285,7 +286,7 @@ public: const MergeTreeData & storage; const String source_dir; std::vector> old_and_new_names; - std::unordered_map old_part_name_to_full_path; + std::unordered_map old_part_name_to_path_and_disk; bool renamed = false; }; @@ -670,18 +671,17 @@ public: /// Get table path on disk String getFullPathOnDisk(const DiskPtr & disk) const; - /// Get disk for part. Looping through directories on FS because some parts maybe not in - /// active dataparts set (detached) - DiskPtr getDiskForPart(const String & part_name, const String & relative_path = "") const; + /// Get disk where part is located. + /// `additional_path` can be set if part is not located directly in table data path (e.g. 
'detached/') + DiskPtr getDiskForPart(const String & part_name, const String & additional_path = "") const; - /// Get full path for part. Uses getDiskForPart and returns the full path - String getFullPathForPart(const String & part_name, const String & relative_path = "") const; + /// Get full path for part. Uses getDiskForPart and returns the full relative path. + /// `additional_path` can be set if part is not located directly in table data path (e.g. 'detached/') + std::optional getFullRelativePathForPart(const String & part_name, const String & additional_path = "") const; Strings getDataPaths() const override; - using PathWithDisk = std::pair; using PathsWithDisks = std::vector; - PathsWithDisks getDataPathsWithDisks() const; PathsWithDisks getRelativeDataPathsWithDisks() const; /// Reserves space at least 1MB. diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index cffc654ed55..9911796e2d5 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -26,9 +26,6 @@ #include #include #include -#include -#include -#include #include #include #include @@ -576,10 +573,11 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor << parts.front()->name << " to " << parts.back()->name << " into " << TMP_PREFIX + future_part.name + " with type " + future_part.type.toString()); - String part_path = data.getFullPathOnDisk(space_reservation->getDisk()); + auto disk = space_reservation->getDisk(); + String part_path = data.relative_data_path; String new_part_tmp_path = part_path + TMP_PREFIX + future_part.name + "/"; - if (Poco::File(new_part_tmp_path).exists()) - throw Exception("Directory " + new_part_tmp_path + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS); + if (disk->exists(new_part_tmp_path)) + throw Exception("Directory " + fullPath(disk, new_part_tmp_path) + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS); MergeTreeData::DataPart::ColumnToSize merged_column_to_size; @@ -598,7 +596,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor future_part.name, future_part.type, future_part.part_info, - space_reservation->getDisk(), + disk, TMP_PREFIX + future_part.name); new_data_part->setColumns(all_columns); @@ -633,16 +631,17 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor merge_entry->total_size_bytes_compressed, static_cast (merge_entry->total_size_bytes_compressed) / data.getTotalActiveSizeInBytes()); + /// TODO: Should it go through IDisk interface? 
String rows_sources_file_path; - std::unique_ptr rows_sources_uncompressed_write_buf; + std::unique_ptr rows_sources_uncompressed_write_buf; std::unique_ptr rows_sources_write_buf; std::optional column_sizes; if (merge_alg == MergeAlgorithm::Vertical) { - Poco::File(new_part_tmp_path).createDirectories(); + disk->createDirectories(new_part_tmp_path); rows_sources_file_path = new_part_tmp_path + "rows_sources"; - rows_sources_uncompressed_write_buf = std::make_unique(rows_sources_file_path); + rows_sources_uncompressed_write_buf = disk->writeFile(rows_sources_file_path); rows_sources_write_buf = std::make_unique(*rows_sources_uncompressed_write_buf); for (const MergeTreeData::DataPartPtr & part : parts) @@ -832,6 +831,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor rows_sources_write_buf->next(); rows_sources_uncompressed_write_buf->next(); + /// Ensure data has written to disk. + rows_sources_uncompressed_write_buf->finalize(); size_t rows_sources_count = rows_sources_write_buf->count(); /// In special case, when there is only one source part, and no rows were skipped, we may have @@ -842,7 +843,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor + ") differs from number of bytes written to rows_sources file (" + toString(rows_sources_count) + "). It is a bug.", ErrorCodes::LOGICAL_ERROR); - CompressedReadBufferFromFile rows_sources_read_buf(rows_sources_file_path, 0, 0, 0); + CompressedReadBufferFromFile rows_sources_read_buf(disk->readFile(rows_sources_file_path)); IMergedBlockOutputStream::WrittenOffsetColumns written_offset_columns; for (size_t column_num = 0, gathering_column_names_size = gathering_column_names.size(); @@ -909,7 +910,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor merge_entry->progress.store(progress_before + column_sizes->columnWeight(column_name), std::memory_order_relaxed); } - Poco::File(rows_sources_file_path).remove(); + disk->remove(rows_sources_file_path); } for (const auto & part : parts) @@ -1018,7 +1019,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor new_data_part->index_granularity_info = source_part->index_granularity_info; new_data_part->setColumns(getColumnsForNewDataPart(source_part, updated_header, all_columns)); - String new_part_tmp_path = new_data_part->getFullPath(); + auto disk = new_data_part->disk; + String new_part_tmp_path = new_data_part->getFullRelativePath(); /// Note: this is done before creating input streams, because otherwise data.data_parts_mutex /// (which is locked in data.getTotalActiveSizeInBytes()) is locked after part->columns_lock @@ -1029,7 +1031,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor source_part->bytes_on_disk, static_cast(source_part->bytes_on_disk) / data.getTotalActiveSizeInBytes()); - Poco::File(new_part_tmp_path).createDirectories(); + disk->createDirectories(new_part_tmp_path); /// Don't change granularity type while mutating subset of columns auto mrk_extension = source_part->index_granularity_info.is_adaptive ? 
getAdaptiveMrkExtension(new_data_part->getType()) @@ -1125,17 +1127,15 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor if (need_remove_expired_values) files_to_skip.insert("ttl.txt"); - Poco::DirectoryIterator dir_end; /// Create hardlinks for unchanged files - for (Poco::DirectoryIterator dir_it(source_part->getFullPath()); dir_it != dir_end; ++dir_it) + for (auto it = disk->iterateDirectory(source_part->getFullRelativePath()); it->isValid(); it->next()) { - if (files_to_skip.count(dir_it.name()) || files_to_remove.count(dir_it.name())) + if (files_to_skip.count(it->name()) || files_to_remove.count(it->name())) continue; - Poco::Path destination(new_part_tmp_path); - destination.append(dir_it.name()); + String destination = new_part_tmp_path + "/" + it->name(); - createHardLink(dir_it.path().toString(), destination.toString()); + disk->createHardLink(it->path(), destination); } merge_entry->columns_written = all_columns.size() - updated_header.columns(); @@ -1181,8 +1181,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor if (need_remove_expired_values) { /// Write a file with ttl infos in json format. - WriteBufferFromFile out_ttl(new_part_tmp_path + "ttl.txt", 4096); - HashingWriteBuffer out_hashing(out_ttl); + auto out_ttl = disk->writeFile(new_part_tmp_path + "ttl.txt", 4096); + HashingWriteBuffer out_hashing(*out_ttl); new_data_part->ttl_infos.write(out_hashing); new_data_part->checksums.files["ttl.txt"].file_size = out_hashing.count(); new_data_part->checksums.files["ttl.txt"].file_hash = out_hashing.getHash(); @@ -1193,15 +1193,15 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor new_data_part->checksums.files.erase(removed_file); { /// Write file with checksums. - WriteBufferFromFile out_checksums(new_part_tmp_path + "checksums.txt", 4096); - new_data_part->checksums.write(out_checksums); + auto out_checksums = disk->writeFile(new_part_tmp_path + "checksums.txt", 4096); + new_data_part->checksums.write(*out_checksums); } /// close fd { /// Write a file with a description of columns. - WriteBufferFromFile out_columns(new_part_tmp_path + "columns.txt", 4096); - new_data_part->getColumns().writeText(out_columns); + auto out_columns = disk->writeFile(new_part_tmp_path + "columns.txt", 4096); + new_data_part->getColumns().writeText(*out_columns); } /// close new_data_part->rows_count = source_part->rows_count; diff --git a/dbms/src/Storages/MergeTree/MergeTreeSettings.h b/dbms/src/Storages/MergeTree/MergeTreeSettings.h index 7d53f161620..bbd1fd6cbeb 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeSettings.h +++ b/dbms/src/Storages/MergeTree/MergeTreeSettings.h @@ -42,7 +42,6 @@ struct MergeTreeSettings : public SettingsCollection M(SettingUInt64, number_of_free_entries_in_pool_to_execute_mutation, 10, "When there is less than specified number of free entries in pool, do not execute part mutations. This is to leave free threads for regular merges and avoid \"Too many parts\"", 0) \ M(SettingSeconds, old_parts_lifetime, 8 * 60, "How many seconds to keep obsolete parts.", 0) \ M(SettingSeconds, temporary_directories_lifetime, 86400, "How many seconds to keep tmp_-directories.", 0) \ - M(SettingBool, disable_background_merges, false, "Disable background merges.", 0) \ \ /** Inserts settings. 
*/ \ M(SettingUInt64, parts_to_delay_insert, 150, "If table contains at least that many active parts in single partition, artificially slow down insert into table.", 0) \ diff --git a/dbms/src/Storages/MergeTree/checkDataPart.cpp b/dbms/src/Storages/MergeTree/checkDataPart.cpp index 6195facc914..6da051d04ac 100644 --- a/dbms/src/Storages/MergeTree/checkDataPart.cpp +++ b/dbms/src/Storages/MergeTree/checkDataPart.cpp @@ -99,7 +99,6 @@ IMergeTreeDataPart::Checksums checkDataPart( throw Exception("Unknown type in part " + path, ErrorCodes::UNKNOWN_PART_TYPE); } - Poco::DirectoryIterator dir_end; for (auto it = disk->iterateDirectory(path); it->isValid(); it->next()) { const String & file_name = it->name(); diff --git a/dbms/src/Common/localBackup.cpp b/dbms/src/Storages/MergeTree/localBackup.cpp similarity index 51% rename from dbms/src/Common/localBackup.cpp rename to dbms/src/Storages/MergeTree/localBackup.cpp index 2e042351a90..7d7dacaeaf1 100644 --- a/dbms/src/Common/localBackup.cpp +++ b/dbms/src/Storages/MergeTree/localBackup.cpp @@ -1,13 +1,8 @@ #include "localBackup.h" -#include #include -#include -#include -#include #include -#include -#include +#include namespace DB @@ -20,7 +15,7 @@ namespace ErrorCodes } -static void localBackupImpl(const Poco::Path & source_path, const Poco::Path & destination_path, size_t level, +static void localBackupImpl(const DiskPtr & disk, const String & source_path, const String & destination_path, size_t level, std::optional max_level) { if (max_level && level > *max_level) @@ -29,34 +24,30 @@ static void localBackupImpl(const Poco::Path & source_path, const Poco::Path & d if (level >= 1000) throw DB::Exception("Too deep recursion", DB::ErrorCodes::TOO_DEEP_RECURSION); - Poco::File(destination_path).createDirectories(); + disk->createDirectories(destination_path); - Poco::DirectoryIterator dir_end; - for (Poco::DirectoryIterator dir_it(source_path); dir_it != dir_end; ++dir_it) + for (auto it = disk->iterateDirectory(source_path); it->isValid(); it->next()) { - Poco::Path source = dir_it.path(); - Poco::Path destination = destination_path; - destination.append(dir_it.name()); + auto source = it->path(); + auto destination = destination_path + "/" + it->name(); - if (!dir_it->isDirectory()) + if (!disk->isDirectory(source)) { - dir_it->setReadOnly(); - - createHardLink(source.toString(), destination.toString()); + disk->setReadOnly(source); + disk->createHardLink(source, destination); } else { - localBackupImpl(source, destination, level + 1, max_level); + localBackupImpl(disk, source, destination, level + 1, max_level); } } } -void localBackup(const Poco::Path & source_path, const Poco::Path & destination_path, std::optional max_level) +void localBackup(const DiskPtr & disk, const String & source_path, const String & destination_path, std::optional max_level) { - if (Poco::File(destination_path).exists() - && Poco::DirectoryIterator(destination_path) != Poco::DirectoryIterator()) + if (disk->exists(destination_path) && !disk->isDirectoryEmpty(destination_path)) { - throw DB::Exception("Directory " + destination_path.toString() + " already exists and is not empty.", DB::ErrorCodes::DIRECTORY_ALREADY_EXISTS); + throw DB::Exception("Directory " + fullPath(disk, destination_path) + " already exists and is not empty.", DB::ErrorCodes::DIRECTORY_ALREADY_EXISTS); } size_t try_no = 0; @@ -70,7 +61,7 @@ void localBackup(const Poco::Path & source_path, const Poco::Path & destination_ { try { - localBackupImpl(source_path, destination_path, 0, max_level); + 
localBackupImpl(disk, source_path, destination_path, 0, max_level); } catch (const DB::ErrnoException & e) { diff --git a/dbms/src/Common/localBackup.h b/dbms/src/Storages/MergeTree/localBackup.h similarity index 80% rename from dbms/src/Common/localBackup.h rename to dbms/src/Storages/MergeTree/localBackup.h index e3ea32614ee..3c9d92fa9da 100644 --- a/dbms/src/Common/localBackup.h +++ b/dbms/src/Storages/MergeTree/localBackup.h @@ -1,8 +1,8 @@ #pragma once #include - -namespace Poco { class Path; } +#include +#include namespace DB { @@ -20,6 +20,6 @@ namespace DB * If max_level is specified, than only files which depth relative source_path less or equal max_level will be copied. * So, if max_level=0 than only direct file child are copied. */ -void localBackup(const Poco::Path & source_path, const Poco::Path & destination_path, std::optional max_level = {}); +void localBackup(const DiskPtr & disk, const String & source_path, const String & destination_path, std::optional max_level = {}); } diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index aa4566fef37..d62ff1ca5cd 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -21,8 +21,6 @@ #include #include #include -#include -#include #include #include #include @@ -95,8 +93,7 @@ void StorageMergeTree::startup() /// NOTE background task will also do the above cleanups periodically. time_after_previous_cleanup.restart(); - if (!getSettings()->disable_background_merges) - merging_mutating_task_handle = global_context.getBackgroundPool().addTask([this] { return mergeMutateTask(); }); + merging_mutating_task_handle = global_context.getBackgroundPool().addTask([this] { return mergeMutateTask(); }); if (areBackgroundMovesNeeded()) moving_task_handle = global_context.getBackgroundMovePool().addTask([this] { return movePartsTask(); }); } diff --git a/dbms/tests/integration/test_merge_tree_s3/configs/config.d/bg_processing_pool_conf.xml b/dbms/tests/integration/test_merge_tree_s3/configs/config.d/bg_processing_pool_conf.xml new file mode 100644 index 00000000000..a756c4434ea --- /dev/null +++ b/dbms/tests/integration/test_merge_tree_s3/configs/config.d/bg_processing_pool_conf.xml @@ -0,0 +1,5 @@ + + 0.5 + 0.5 + 0.5 + diff --git a/dbms/tests/integration/test_merge_tree_s3/configs/config.d/storage_conf.xml b/dbms/tests/integration/test_merge_tree_s3/configs/config.d/storage_conf.xml new file mode 100644 index 00000000000..5b292446c6b --- /dev/null +++ b/dbms/tests/integration/test_merge_tree_s3/configs/config.d/storage_conf.xml @@ -0,0 +1,28 @@ + + + + + s3 + http://minio1:9001/root/data/ + minio + minio123 + + + local + / + + + + + +
diff --git a/dbms/tests/integration/test_merge_tree_s3/configs/config.d/bg_processing_pool_conf.xml b/dbms/tests/integration/test_merge_tree_s3/configs/config.d/bg_processing_pool_conf.xml
new file mode 100644
index 00000000000..a756c4434ea
--- /dev/null
+++ b/dbms/tests/integration/test_merge_tree_s3/configs/config.d/bg_processing_pool_conf.xml
@@ -0,0 +1,5 @@
+<yandex>
+    <background_processing_pool_thread_sleep_seconds>0.5</background_processing_pool_thread_sleep_seconds>
+    <background_processing_pool_task_sleep_seconds_when_no_work_min>0.5</background_processing_pool_task_sleep_seconds_when_no_work_min>
+    <background_processing_pool_task_sleep_seconds_when_no_work_max>0.5</background_processing_pool_task_sleep_seconds_when_no_work_max>
+</yandex>
diff --git a/dbms/tests/integration/test_merge_tree_s3/configs/config.d/storage_conf.xml b/dbms/tests/integration/test_merge_tree_s3/configs/config.d/storage_conf.xml
new file mode 100644
index 00000000000..5b292446c6b
--- /dev/null
+++ b/dbms/tests/integration/test_merge_tree_s3/configs/config.d/storage_conf.xml
@@ -0,0 +1,28 @@
+<yandex>
+    <storage_configuration>
+        <disks>
+            <s3>
+                <type>s3</type>
+                <endpoint>http://minio1:9001/root/data/</endpoint>
+                <access_key_id>minio</access_key_id>
+                <secret_access_key>minio123</secret_access_key>
+            </s3>
+            <hdd>
+                <type>local</type>
+                <path>/</path>
+            </hdd>
+        </disks>
+        <policies>
+            <s3>
+                <volumes>
+                    <main>
+                        <disk>s3</disk>
+                    </main>
+                    <external>
+                        <disk>hdd</disk>
+                    </external>
+                </volumes>
+            </s3>
+        </policies>
+    </storage_configuration>
+</yandex>
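With the storage configuration above, a MergeTree table opts into the S3-backed policy via the storage_policy table setting; its parts are created on the 'main' volume (disk 's3') and can later be relocated to the 'external' volume (disk 'hdd') declared in the same policy. A minimal sketch, assuming the same `node` fixture as in the tests below (the table name policy_demo is illustrative only):

    node.query(
        "CREATE TABLE policy_demo (dt Date, id Int64) "
        "ENGINE = MergeTree() ORDER BY id "
        "SETTINGS storage_policy = 's3'"
    )
    node.query("INSERT INTO policy_demo VALUES ('2020-01-03', 1)")
    # system.parts shows which disk each active part currently lives on.
    print(node.query("SELECT name, disk_name FROM system.parts WHERE table = 'policy_demo' AND active"))
    # tuple() addresses the single partition of an unpartitioned table.
    node.query("ALTER TABLE policy_demo MOVE PARTITION tuple() TO DISK 'hdd'")
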
diff --git a/dbms/tests/integration/test_merge_tree_s3/configs/config.d/users.xml b/dbms/tests/integration/test_merge_tree_s3/configs/config.d/users.xml
new file mode 100644
index 00000000000..a13b24b278d
--- /dev/null
+++ b/dbms/tests/integration/test_merge_tree_s3/configs/config.d/users.xml
@@ -0,0 +1,6 @@
+<yandex>
+    <profiles>
+        <default>
+        </default>
+    </profiles>
+</yandex>
diff --git a/dbms/tests/integration/test_merge_tree_s3/configs/config.xml b/dbms/tests/integration/test_merge_tree_s3/configs/config.xml
index 63b4d951eb7..24b7344df3a 100644
--- a/dbms/tests/integration/test_merge_tree_s3/configs/config.xml
+++ b/dbms/tests/integration/test_merge_tree_s3/configs/config.xml
@@ -1,25 +1,5 @@
 <?xml version="1.0"?>
 <yandex>
-    <logger>
-        <level>trace</level>
-        <log>/var/log/clickhouse-server/clickhouse-server.log</log>
-        <errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
-        <size>1000M</size>
-        <count>10</count>
-    </logger>
-
-    <storage_configuration>
-        <disks>
-            <default>
-                <type>s3</type>
-                <endpoint>http://minio1:9001/root/data/</endpoint>
-                <access_key_id>minio</access_key_id>
-                <secret_access_key>minio123</secret_access_key>
-            </default>
-        </disks>
-    </storage_configuration>
-
     <tcp_port>9000</tcp_port>
     <listen_host>127.0.0.1</listen_host>
diff --git a/dbms/tests/integration/test_merge_tree_s3/configs/users.xml b/dbms/tests/integration/test_merge_tree_s3/configs/users.xml
deleted file mode 100644
index 6061af8e33d..00000000000
--- a/dbms/tests/integration/test_merge_tree_s3/configs/users.xml
+++ /dev/null
@@ -1,23 +0,0 @@
-<?xml version="1.0"?>
-<yandex>
-    <profiles>
-        <default>
-        </default>
-    </profiles>
-
-    <users>
-        <default>
-            <password></password>
-            <networks incl="networks" replace="replace">
-                <ip>::/0</ip>
-            </networks>
-            <profile>default</profile>
-            <quota>default</quota>
-        </default>
-    </users>
-
-    <quotas>
-        <default>
-        </default>
-    </quotas>
-</yandex>
diff --git a/dbms/tests/integration/test_merge_tree_s3/test.py b/dbms/tests/integration/test_merge_tree_s3/test.py
index c79745642a0..631d69911ff 100644
--- a/dbms/tests/integration/test_merge_tree_s3/test.py
+++ b/dbms/tests/integration/test_merge_tree_s3/test.py
@@ -1,6 +1,7 @@
 import logging
 import random
 import string
+import time
 
 import pytest
 from helpers.cluster import ClickHouseCluster
@@ -36,56 +37,212 @@ def cluster():
         cluster.shutdown()
 
 
+FILES_OVERHEAD = 1
+FILES_OVERHEAD_PER_COLUMN = 2  # Data file (.bin) and mark file (.mrk2)
+FILES_OVERHEAD_PER_PART_WIDE = FILES_OVERHEAD_PER_COLUMN * 3 + 2 + 6  # 3 columns + min_max skip index (2) + 6 service files (checksums, columns, count, primary.idx, partition.dat, minmax idx)
+FILES_OVERHEAD_PER_PART_COMPACT = 10  # Single data/mark file pair + skip index + service files
+
+
 def random_string(length):
     letters = string.ascii_letters
     return ''.join(random.choice(letters) for i in range(length))
 
-def generate_values(date_str, count):
-    data = [[date_str, i, random_string(10)] for i in range(count)]
+def generate_values(date_str, count, sign=1):
+    data = [[date_str, sign * (i + 1), random_string(10)] for i in range(count)]
     data.sort(key=lambda tup: tup[1])
     return ",".join(["('{}',{},'{}')".format(x, y, z) for x, y, z in data])
 
-@pytest.mark.parametrize(
-    "min_rows_for_wide_part,files_overhead,files_per_part",
-    [
-        (0, 1, 14),
-        (8192, 1, 10)
-    ]
-)
-def test_log_family_s3(cluster, min_rows_for_wide_part, files_overhead, files_per_part):
+def create_table(cluster, additional_settings=None):
     node = cluster.instances["node"]
-    minio = cluster.minio_client
 
-    node.query(
-        """
+    create_table_statement = """
         CREATE TABLE s3_test(
             dt Date,
-            id UInt64,
+            id Int64,
             data String,
             INDEX min_max (id) TYPE minmax GRANULARITY 3
         ) ENGINE=MergeTree()
         PARTITION BY dt
         ORDER BY (dt, id)
-        SETTINGS disable_background_merges='true', index_granularity=512, min_rows_for_wide_part={}
+        SETTINGS
+            storage_policy='s3',
+            old_parts_lifetime=0, index_granularity=512
         """
-        .format(min_rows_for_wide_part)
-    )
-    assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 1
 
-    values1 = generate_values('2020-01-03', 4096)
-    node.query("INSERT INTO s3_test VALUES {}".format(values1))
-    assert node.query("SELECT * FROM s3_test ORDER BY dt, id FORMAT Values") == values1
-    assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == files_overhead + files_per_part
+    if additional_settings:
+        create_table_statement += ","
+        create_table_statement += additional_settings
 
-    values2 = generate_values('2020-01-04', 4096)
-    node.query("INSERT INTO s3_test VALUES {}".format(values2))
-    assert node.query("SELECT * FROM s3_test ORDER BY dt, id FORMAT Values") == values1 + "," + values2
-    assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == files_overhead + 2 * files_per_part
+    node.query(create_table_statement)
 
-    assert node.query("SELECT count(*) FROM s3_test where id = 0 FORMAT Values") == "(2)"
+
+@pytest.fixture(autouse=True)
+def drop_table(cluster):
+    yield
+    node = cluster.instances["node"]
+    minio = cluster.minio_client
     node.query("DROP TABLE s3_test")
     assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 0
 
+
+@pytest.mark.parametrize(
+    "min_rows_for_wide_part,files_per_part",
+    [
+        (0, FILES_OVERHEAD_PER_PART_WIDE),
+        (8192, FILES_OVERHEAD_PER_PART_COMPACT)
+    ]
+)
+def test_simple_insert_select(cluster, min_rows_for_wide_part, files_per_part):
+    create_table(cluster, "min_rows_for_wide_part={}".format(min_rows_for_wide_part))
+
+    node = cluster.instances["node"]
+    minio = cluster.minio_client
+
+    values1 = generate_values('2020-01-03', 4096)
+    node.query("INSERT INTO s3_test VALUES {}".format(values1))
+    assert node.query("SELECT * FROM s3_test ORDER BY dt, id FORMAT Values") == values1
+    assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD + files_per_part
+
+    values2 = generate_values('2020-01-04', 4096)
+    node.query("INSERT INTO s3_test VALUES {}".format(values2))
+    assert node.query("SELECT * FROM s3_test ORDER BY dt, id FORMAT Values") == values1 + "," + values2
+    assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD + files_per_part * 2
+
+    assert node.query("SELECT count(*) FROM s3_test WHERE id = 1 FORMAT Values") == "(2)"
+
+
+@pytest.mark.parametrize(
+    "merge_vertical", [False, True]
+)
+def test_insert_same_partition_and_merge(cluster, merge_vertical):
+    settings = None
+    if merge_vertical:
+        settings = """
+            vertical_merge_algorithm_min_rows_to_activate=0,
+            vertical_merge_algorithm_min_columns_to_activate=0
+        """
+    create_table(cluster, settings)
+
+    node = cluster.instances["node"]
+    minio = cluster.minio_client
+
+    node.query("SYSTEM STOP MERGES s3_test")
+    node.query("INSERT INTO s3_test VALUES {}".format(generate_values('2020-01-03', 1024)))
+    node.query("INSERT INTO s3_test VALUES {}".format(generate_values('2020-01-03', 2048)))
+    node.query("INSERT INTO s3_test VALUES {}".format(generate_values('2020-01-03', 4096)))
+    node.query("INSERT INTO s3_test VALUES {}".format(generate_values('2020-01-03', 1024, -1)))
+    node.query("INSERT INTO s3_test VALUES {}".format(generate_values('2020-01-03', 2048, -1)))
+    node.query("INSERT INTO s3_test VALUES {}".format(generate_values('2020-01-03', 4096, -1)))
+    assert node.query("SELECT sum(id) FROM s3_test FORMAT Values") == "(0)"
+    assert node.query("SELECT count(distinct(id)) FROM s3_test FORMAT Values") == "(8192)"
+    assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD_PER_PART_WIDE * 6 + FILES_OVERHEAD
+
+    node.query("SYSTEM START MERGES s3_test")
+    # Wait for merges and old parts deletion
+    time.sleep(3)
+
+    assert node.query("SELECT sum(id) FROM s3_test FORMAT Values") == "(0)"
+    assert node.query("SELECT count(distinct(id)) FROM s3_test FORMAT Values") == "(8192)"
+    assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD_PER_PART_WIDE + FILES_OVERHEAD
+
+
+def test_alter_table_columns(cluster):
+    create_table(cluster)
+
+    node = cluster.instances["node"]
+    minio = cluster.minio_client
+
+    node.query("INSERT INTO s3_test VALUES {}".format(generate_values('2020-01-03', 4096)))
+    node.query("INSERT INTO s3_test VALUES {}".format(generate_values('2020-01-03', 4096, -1)))
+
+    node.query("ALTER TABLE s3_test ADD COLUMN col1 UInt64 DEFAULT 1")
+    # To ensure parts have merged
+    node.query("OPTIMIZE TABLE s3_test")
+
+    # Wait for merges, mutations and old parts deletion
+    time.sleep(3)
+
+    assert node.query("SELECT sum(col1) FROM s3_test FORMAT Values") == "(8192)"
+    assert node.query("SELECT sum(col1) FROM s3_test WHERE id > 0 FORMAT Values") == "(4096)"
+    assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + FILES_OVERHEAD_PER_COLUMN
+
+    node.query("ALTER TABLE s3_test MODIFY COLUMN col1 String")
+    assert node.query("SELECT distinct(col1) FROM s3_test FORMAT Values") == "('1')"
+    assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + FILES_OVERHEAD_PER_COLUMN
+
+    node.query("ALTER TABLE s3_test DROP COLUMN col1")
+    assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE
+
+
+def test_attach_detach_partition(cluster):
+    create_table(cluster)
+
+    node = cluster.instances["node"]
+    minio = cluster.minio_client
+
+    node.query("INSERT INTO s3_test VALUES {}".format(generate_values('2020-01-03', 4096)))
+    node.query("INSERT INTO s3_test VALUES {}".format(generate_values('2020-01-04', 4096)))
+    assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(8192)"
+    assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
+
+    node.query("ALTER TABLE s3_test DETACH PARTITION '2020-01-03'")
+    assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(4096)"
+    assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
+
+    node.query("ALTER TABLE s3_test ATTACH PARTITION '2020-01-03'")
+    assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(8192)"
+    assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
+
+    node.query("ALTER TABLE s3_test DROP PARTITION '2020-01-03'")
+    assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(4096)"
+    assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE
+
+    node.query("ALTER TABLE s3_test DETACH PARTITION '2020-01-04'")
+    node.query("ALTER TABLE s3_test DROP DETACHED PARTITION '2020-01-04'", settings={"allow_drop_detached": 1})
+    assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(0)"
+    assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD
+
+
+def test_move_partition(cluster):
+    create_table(cluster)
+
+    node = cluster.instances["node"]
+    minio = cluster.minio_client
+
+    node.query("INSERT INTO s3_test VALUES {}".format(generate_values('2020-01-03', 4096)))
+    node.query("INSERT INTO s3_test VALUES {}".format(generate_values('2020-01-04', 4096)))
+    assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(8192)"
+    assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
+
+    node.query("ALTER TABLE s3_test MOVE PARTITION '2020-01-04' TO DISK 'hdd'")
+    assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(8192)"
"(8192)" + assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + + +def test_table_manipulations(cluster): + create_table(cluster) + + node = cluster.instances["node"] + minio = cluster.minio_client + + node.query("INSERT INTO s3_test VALUES {}".format(generate_values('2020-01-03', 4096))) + node.query("INSERT INTO s3_test VALUES {}".format(generate_values('2020-01-04', 4096))) + + node.query("RENAME TABLE s3_test TO s3_renamed") + assert node.query("SELECT count(*) FROM s3_renamed FORMAT Values") == "(8192)" + assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE*2 + node.query("RENAME TABLE s3_renamed TO s3_test") + + # TODO: Doesn't work with min_max index. + #assert node.query("SET check_query_single_value_result='false'; CHECK TABLE s3_test FORMAT Values") == "(1)" + + node.query("DETACH TABLE s3_test") + node.query("ATTACH TABLE s3_test") + assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(8192)" + assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE*2 + + node.query("TRUNCATE TABLE s3_test") + assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(0)" + assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD diff --git a/utils/convert-month-partitioned-parts/main.cpp b/utils/convert-month-partitioned-parts/main.cpp index 8f1ca05dd32..51ea87d35b9 100644 --- a/utils/convert-month-partitioned-parts/main.cpp +++ b/utils/convert-month-partitioned-parts/main.cpp @@ -1,13 +1,13 @@ -#include -#include #include +#include #include #include -#include -#include -#include #include #include +#include +#include +#include +#include #include #include @@ -73,7 +73,7 @@ void run(String part_path, String date_column, String dest_path) { /// If the file is already deleted, do nothing. } - localBackup(old_part_path, new_tmp_part_path, {}); + localBackup(disk, old_part_path.toString(), new_tmp_part_path.toString(), {}); WriteBufferFromFile count_out(new_tmp_part_path_str + "count.txt", 4096); HashingWriteBuffer count_out_hashing(count_out);