From c66b60f00f851670a36789a45254a4c07ee85e47 Mon Sep 17 00:00:00 2001 From: HarryLeeIBM Date: Tue, 19 Sep 2023 07:12:14 -0700 Subject: [PATCH 1/7] Fix SimHash function issue for s390x --- src/Functions/FunctionsStringHash.cpp | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/Functions/FunctionsStringHash.cpp b/src/Functions/FunctionsStringHash.cpp index ff8ff2d2651..0bf6e39e651 100644 --- a/src/Functions/FunctionsStringHash.cpp +++ b/src/Functions/FunctionsStringHash.cpp @@ -18,6 +18,10 @@ #include "vec_crc32.h" #endif +#if defined(__s390x__) && __BYTE_ORDER__==__ORDER_BIG_ENDIAN__ +#include +#endif + namespace DB { @@ -43,7 +47,7 @@ struct Hash #elif (defined(__PPC64__) || defined(__powerpc64__)) && __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__ return crc32_ppc(crc, reinterpret_cast(&val), sizeof(val)); #elif defined(__s390x__) && __BYTE_ORDER__==__ORDER_BIG_ENDIAN__ - return s390x_crc32(crc, val); + return crc32c_le(static_cast(crc), reinterpret_cast(&val), sizeof(val)); #else throw Exception(ErrorCodes::NOT_IMPLEMENTED, "String hash is not implemented without sse4.2 support"); #endif @@ -58,7 +62,7 @@ struct Hash #elif (defined(__PPC64__) || defined(__powerpc64__)) && __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__ return crc32_ppc(crc, reinterpret_cast(&val), sizeof(val)); #elif defined(__s390x__) && __BYTE_ORDER__==__ORDER_BIG_ENDIAN__ - return s390x_crc32_u32(crc, val); + return crc32c_le(static_cast(crc), reinterpret_cast(&val), sizeof(val)); #else throw Exception(ErrorCodes::NOT_IMPLEMENTED, "String hash is not implemented without sse4.2 support"); #endif @@ -73,7 +77,7 @@ struct Hash #elif (defined(__PPC64__) || defined(__powerpc64__)) && __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__ return crc32_ppc(crc, reinterpret_cast(&val), sizeof(val)); #elif defined(__s390x__) && __BYTE_ORDER__==__ORDER_BIG_ENDIAN__ - return s390x_crc32_u16(crc, val); + return crc32c_le(static_cast(crc), reinterpret_cast(&val), sizeof(val)); #else throw Exception(ErrorCodes::NOT_IMPLEMENTED, "String hash is not implemented without sse4.2 support"); #endif @@ -88,7 +92,7 @@ struct Hash #elif (defined(__PPC64__) || defined(__powerpc64__)) && __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__ return crc32_ppc(crc, reinterpret_cast(&val), sizeof(val)); #elif defined(__s390x__) && __BYTE_ORDER__==__ORDER_BIG_ENDIAN__ - return s390x_crc32_u8(crc, val); + return crc32c_le(static_cast(crc), reinterpret_cast(&val), sizeof(val)); #else throw Exception(ErrorCodes::NOT_IMPLEMENTED, "String hash is not implemented without sse4.2 support"); #endif From 2805ebf2b259bf99382ebdc537139ff8e6a3973a Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Wed, 20 Sep 2023 11:08:02 +0000 Subject: [PATCH 2/7] Set correct size for signal pipe buffer --- src/Daemon/BaseDaemon.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Daemon/BaseDaemon.cpp b/src/Daemon/BaseDaemon.cpp index be323dc6786..8e01311dcb0 100644 --- a/src/Daemon/BaseDaemon.cpp +++ b/src/Daemon/BaseDaemon.cpp @@ -104,7 +104,8 @@ static const size_t signal_pipe_buf_size = + sizeof(ucontext_t*) + sizeof(StackTrace) + sizeof(UInt32) - + sizeof(void*); + + sizeof(void*) + + sizeof(UInt64); using signal_function = void(int, siginfo_t*, void*); From 1a9467535066f54dc0dcf0f8c0e75dcd6fb9509b Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Wed, 20 Sep 2023 13:32:01 +0000 Subject: [PATCH 3/7] Don't capture this in callback --- src/Server/KeeperTCPHandler.cpp | 2 +- src/Server/KeeperTCPHandler.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Server/KeeperTCPHandler.cpp b/src/Server/KeeperTCPHandler.cpp index 58d227a5ae5..84ed7388503 100644 --- a/src/Server/KeeperTCPHandler.cpp +++ b/src/Server/KeeperTCPHandler.cpp @@ -382,7 +382,7 @@ void KeeperTCPHandler::runImpl() } auto response_fd = poll_wrapper->getResponseFD(); - auto response_callback = [this, response_fd] (const Coordination::ZooKeeperResponsePtr & response) + auto response_callback = [responses = this->responses, response_fd](const Coordination::ZooKeeperResponsePtr & response) { if (!responses->push(response)) throw Exception(ErrorCodes::SYSTEM_ERROR, diff --git a/src/Server/KeeperTCPHandler.h b/src/Server/KeeperTCPHandler.h index ffdd50b805a..588cdf6305e 100644 --- a/src/Server/KeeperTCPHandler.h +++ b/src/Server/KeeperTCPHandler.h @@ -25,7 +25,7 @@ struct SocketInterruptablePollWrapper; using SocketInterruptablePollWrapperPtr = std::unique_ptr; using ThreadSafeResponseQueue = ConcurrentBoundedQueue; -using ThreadSafeResponseQueuePtr = std::unique_ptr; +using ThreadSafeResponseQueuePtr = std::shared_ptr; struct LastOp; using LastOpMultiVersion = MultiVersion; From 6dab5bf3a7571055d887ebe84d37fbb021fac9b4 Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Wed, 20 Sep 2023 15:55:12 +0200 Subject: [PATCH 4/7] Better --- src/Daemon/BaseDaemon.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Daemon/BaseDaemon.cpp b/src/Daemon/BaseDaemon.cpp index 8e01311dcb0..f64d4b365a9 100644 --- a/src/Daemon/BaseDaemon.cpp +++ b/src/Daemon/BaseDaemon.cpp @@ -103,9 +103,9 @@ static const size_t signal_pipe_buf_size = + sizeof(siginfo_t) + sizeof(ucontext_t*) + sizeof(StackTrace) + + sizeof(UInt64) + sizeof(UInt32) - + sizeof(void*) - + sizeof(UInt64); + + sizeof(void*); using signal_function = void(int, siginfo_t*, void*); From c706101891dc491fab08de3a62d959e2fd19d8e4 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Fri, 11 Aug 2023 14:29:37 +0200 Subject: [PATCH 5/7] Fix throttling of BACKUPs from/to S3 (in case native copy was not used) In some cases native copy is not possible, and such requests should be throttled. v0: copyS3FileNativeWithFallback v2: revert v0 and pass write_settings v3: pass read_settings to copyFile() Signed-off-by: Azat Khuzhin --- programs/disks/CommandCopy.cpp | 2 +- src/Backups/BackupIO_Disk.cpp | 4 +- src/Backups/BackupIO_S3.cpp | 2 + src/Disks/DiskEncrypted.cpp | 6 +-- src/Disks/DiskEncrypted.h | 2 +- src/Disks/DiskEncryptedTransaction.cpp | 4 +- src/Disks/DiskEncryptedTransaction.h | 2 +- src/Disks/DiskLocal.cpp | 6 +-- src/Disks/DiskLocal.h | 2 +- src/Disks/FakeDiskTransaction.h | 4 +- src/Disks/IDisk.cpp | 24 +++++------ src/Disks/IDisk.h | 7 ++-- src/Disks/IDiskTransaction.h | 6 ++- .../AzureBlobStorage/AzureObjectStorage.cpp | 2 + .../AzureBlobStorage/AzureObjectStorage.h | 2 + .../Cached/CachedObjectStorage.cpp | 12 ++++-- .../Cached/CachedObjectStorage.h | 4 ++ .../ObjectStorages/DiskObjectStorage.cpp | 7 ++-- src/Disks/ObjectStorages/DiskObjectStorage.h | 3 +- ...jectStorageRemoteMetadataRestoreHelper.cpp | 6 +-- ...ObjectStorageRemoteMetadataRestoreHelper.h | 4 +- .../DiskObjectStorageTransaction.cpp | 17 +++++--- .../DiskObjectStorageTransaction.h | 2 +- .../ObjectStorages/HDFS/HDFSObjectStorage.cpp | 6 ++- .../ObjectStorages/HDFS/HDFSObjectStorage.h | 2 + src/Disks/ObjectStorages/IObjectStorage.cpp | 8 ++-- src/Disks/ObjectStorages/IObjectStorage.h | 4 ++ .../Local/LocalObjectStorage.cpp | 10 +++-- .../ObjectStorages/Local/LocalObjectStorage.h | 2 + .../ObjectStorages/S3/S3ObjectStorage.cpp | 42 +++++++++++++++---- src/Disks/ObjectStorages/S3/S3ObjectStorage.h | 4 ++ .../ObjectStorages/Web/WebObjectStorage.cpp | 2 +- .../ObjectStorages/Web/WebObjectStorage.h | 2 + src/IO/S3/copyS3File.cpp | 10 +++-- src/IO/S3/copyS3File.h | 3 ++ .../MergeTree/DataPartStorageOnDiskBase.cpp | 9 ++-- .../MergeTree/DataPartStorageOnDiskBase.h | 4 +- src/Storages/MergeTree/IDataPartStorage.h | 4 +- src/Storages/MergeTree/IMergeTreeDataPart.cpp | 5 ++- src/Storages/MergeTree/IMergeTreeDataPart.h | 2 +- src/Storages/MergeTree/MergeTreeData.cpp | 20 +++++---- src/Storages/MergeTree/MergeTreeData.h | 5 ++- .../MergeTree/MergeTreePartsMover.cpp | 6 +-- src/Storages/MergeTree/MergeTreePartsMover.h | 2 +- src/Storages/MergeTree/MutateTask.cpp | 9 +++- src/Storages/MergeTree/localBackup.cpp | 25 +++++------ src/Storages/MergeTree/localBackup.h | 3 +- src/Storages/StorageMergeTree.cpp | 19 ++++++++- src/Storages/StorageReplicatedMergeTree.cpp | 35 ++++++++++++++-- .../02844_max_backup_bandwidth_s3.reference | 2 + .../02844_max_backup_bandwidth_s3.sh | 36 ++++++++++++++++ 51 files changed, 299 insertions(+), 112 deletions(-) create mode 100644 tests/queries/0_stateless/02844_max_backup_bandwidth_s3.reference create mode 100755 tests/queries/0_stateless/02844_max_backup_bandwidth_s3.sh diff --git a/programs/disks/CommandCopy.cpp b/programs/disks/CommandCopy.cpp index 4a7af1ced29..296fc708411 100644 --- a/programs/disks/CommandCopy.cpp +++ b/programs/disks/CommandCopy.cpp @@ -57,7 +57,7 @@ public: String relative_path_from = validatePathAndGetAsRelative(path_from); String relative_path_to = validatePathAndGetAsRelative(path_to); - disk_from->copyDirectoryContent(relative_path_from, disk_to, relative_path_to, /* settings= */ {}); + disk_from->copyDirectoryContent(relative_path_from, disk_to, relative_path_to, /* read_settings= */ {}, /* write_settings= */ {}); } }; } diff --git a/src/Backups/BackupIO_Disk.cpp b/src/Backups/BackupIO_Disk.cpp index 21b3afbddf8..1e260ad22d9 100644 --- a/src/Backups/BackupIO_Disk.cpp +++ b/src/Backups/BackupIO_Disk.cpp @@ -46,7 +46,7 @@ void BackupReaderDisk::copyFileToDisk(const String & path_in_backup, size_t file { /// Use more optimal way. LOG_TRACE(log, "Copying file {} from disk {} to disk {}", path_in_backup, disk->getName(), destination_disk->getName()); - disk->copyFile(root_path / path_in_backup, *destination_disk, destination_path, write_settings); + disk->copyFile(root_path / path_in_backup, *destination_disk, destination_path, read_settings, write_settings); return; /// copied! } } @@ -119,7 +119,7 @@ void BackupWriterDisk::copyFileFromDisk(const String & path_in_backup, DiskPtr s LOG_TRACE(log, "Copying file {} from disk {} to disk {}", src_path, src_disk->getName(), disk->getName()); auto dest_file_path = root_path / path_in_backup; disk->createDirectories(dest_file_path.parent_path()); - src_disk->copyFile(src_path, *disk, dest_file_path, write_settings); + src_disk->copyFile(src_path, *disk, dest_file_path, read_settings, write_settings); return; /// copied! } } diff --git a/src/Backups/BackupIO_S3.cpp b/src/Backups/BackupIO_S3.cpp index ef820784bdf..5b08683b157 100644 --- a/src/Backups/BackupIO_S3.cpp +++ b/src/Backups/BackupIO_S3.cpp @@ -170,6 +170,7 @@ void BackupReaderS3::copyFileToDisk(const String & path_in_backup, size_t file_s /* dest_bucket= */ blob_path[1], /* dest_key= */ blob_path[0], request_settings, + read_settings, object_attributes, threadPoolCallbackRunner(getBackupsIOThreadPool().get(), "BackupReaderS3"), /* for_disk_s3= */ true); @@ -230,6 +231,7 @@ void BackupWriterS3::copyFileFromDisk(const String & path_in_backup, DiskPtr src s3_uri.bucket, fs::path(s3_uri.key) / path_in_backup, request_settings, + read_settings, {}, threadPoolCallbackRunner(getBackupsIOThreadPool().get(), "BackupWriterS3")); return; /// copied! diff --git a/src/Disks/DiskEncrypted.cpp b/src/Disks/DiskEncrypted.cpp index ca7cbf443f2..7bc7c1c7dc4 100644 --- a/src/Disks/DiskEncrypted.cpp +++ b/src/Disks/DiskEncrypted.cpp @@ -324,7 +324,7 @@ ReservationPtr DiskEncrypted::reserve(UInt64 bytes) } -void DiskEncrypted::copyDirectoryContent(const String & from_dir, const std::shared_ptr & to_disk, const String & to_dir, const WriteSettings & settings) +void DiskEncrypted::copyDirectoryContent(const String & from_dir, const std::shared_ptr & to_disk, const String & to_dir, const ReadSettings & read_settings, const WriteSettings & write_settings) { /// Check if we can copy the file without deciphering. if (isSameDiskType(*this, *to_disk)) @@ -340,14 +340,14 @@ void DiskEncrypted::copyDirectoryContent(const String & from_dir, const std::sha auto wrapped_from_path = wrappedPath(from_dir); auto to_delegate = to_disk_enc->delegate; auto wrapped_to_path = to_disk_enc->wrappedPath(to_dir); - delegate->copyDirectoryContent(wrapped_from_path, to_delegate, wrapped_to_path, settings); + delegate->copyDirectoryContent(wrapped_from_path, to_delegate, wrapped_to_path, read_settings, write_settings); return; } } } /// Copy the file through buffers with deciphering. - IDisk::copyDirectoryContent(from_dir, to_disk, to_dir, settings); + IDisk::copyDirectoryContent(from_dir, to_disk, to_dir, read_settings, write_settings); } std::unique_ptr DiskEncrypted::readFile( diff --git a/src/Disks/DiskEncrypted.h b/src/Disks/DiskEncrypted.h index 2252e4f43f5..8b4461a8dee 100644 --- a/src/Disks/DiskEncrypted.h +++ b/src/Disks/DiskEncrypted.h @@ -112,7 +112,7 @@ public: delegate->listFiles(wrapped_path, file_names); } - void copyDirectoryContent(const String & from_dir, const std::shared_ptr & to_disk, const String & to_dir, const WriteSettings & settings) override; + void copyDirectoryContent(const String & from_dir, const std::shared_ptr & to_disk, const String & to_dir, const ReadSettings & read_settings, const WriteSettings & write_settings) override; std::unique_ptr readFile( const String & path, diff --git a/src/Disks/DiskEncryptedTransaction.cpp b/src/Disks/DiskEncryptedTransaction.cpp index 3fd2085f9cc..daeab7aae6c 100644 --- a/src/Disks/DiskEncryptedTransaction.cpp +++ b/src/Disks/DiskEncryptedTransaction.cpp @@ -53,11 +53,11 @@ String DiskEncryptedSettings::findKeyByFingerprint(UInt128 key_fingerprint, cons return it->second; } -void DiskEncryptedTransaction::copyFile(const std::string & from_file_path, const std::string & to_file_path, const WriteSettings & settings) +void DiskEncryptedTransaction::copyFile(const std::string & from_file_path, const std::string & to_file_path, const ReadSettings & read_settings, const WriteSettings & write_settings) { auto wrapped_from_path = wrappedPath(from_file_path); auto wrapped_to_path = wrappedPath(to_file_path); - delegate_transaction->copyFile(wrapped_from_path, wrapped_to_path, settings); + delegate_transaction->copyFile(wrapped_from_path, wrapped_to_path, read_settings, write_settings); } std::unique_ptr DiskEncryptedTransaction::writeFile( // NOLINT diff --git a/src/Disks/DiskEncryptedTransaction.h b/src/Disks/DiskEncryptedTransaction.h index 70ed1f469ef..6cb2941cc11 100644 --- a/src/Disks/DiskEncryptedTransaction.h +++ b/src/Disks/DiskEncryptedTransaction.h @@ -116,7 +116,7 @@ public: /// but it's impossible to implement correctly in transactions because other disk can /// use different metadata storage. /// TODO: maybe remove it at all, we don't want copies - void copyFile(const std::string & from_file_path, const std::string & to_file_path, const WriteSettings & settings) override; + void copyFile(const std::string & from_file_path, const std::string & to_file_path, const ReadSettings & read_settings, const WriteSettings & write_settings) override; /// Open the file for write and return WriteBufferFromFileBase object. std::unique_ptr writeFile( /// NOLINT diff --git a/src/Disks/DiskLocal.cpp b/src/Disks/DiskLocal.cpp index aaa22655f7b..c71f6f81de2 100644 --- a/src/Disks/DiskLocal.cpp +++ b/src/Disks/DiskLocal.cpp @@ -432,13 +432,13 @@ bool inline isSameDiskType(const IDisk & one, const IDisk & another) return typeid(one) == typeid(another); } -void DiskLocal::copyDirectoryContent(const String & from_dir, const std::shared_ptr & to_disk, const String & to_dir, const WriteSettings & settings) +void DiskLocal::copyDirectoryContent(const String & from_dir, const std::shared_ptr & to_disk, const String & to_dir, const ReadSettings & read_settings, const WriteSettings & write_settings) { /// If throttling was configured we cannot use copying directly. - if (isSameDiskType(*this, *to_disk) && !settings.local_throttler) + if (isSameDiskType(*this, *to_disk) && !read_settings.local_throttler && !write_settings.local_throttler) fs::copy(fs::path(disk_path) / from_dir, fs::path(to_disk->getPath()) / to_dir, fs::copy_options::recursive | fs::copy_options::overwrite_existing); /// Use more optimal way. else - IDisk::copyDirectoryContent(from_dir, to_disk, to_dir, settings); + IDisk::copyDirectoryContent(from_dir, to_disk, to_dir, read_settings, write_settings); } SyncGuardPtr DiskLocal::getDirectorySyncGuard(const String & path) const diff --git a/src/Disks/DiskLocal.h b/src/Disks/DiskLocal.h index 197f6bb9367..c52c192d824 100644 --- a/src/Disks/DiskLocal.h +++ b/src/Disks/DiskLocal.h @@ -65,7 +65,7 @@ public: void replaceFile(const String & from_path, const String & to_path) override; - void copyDirectoryContent(const String & from_dir, const std::shared_ptr & to_disk, const String & to_dir, const WriteSettings & settings) override; + void copyDirectoryContent(const String & from_dir, const std::shared_ptr & to_disk, const String & to_dir, const ReadSettings & read_settings, const WriteSettings & write_settings) override; void listFiles(const String & path, std::vector & file_names) const override; diff --git a/src/Disks/FakeDiskTransaction.h b/src/Disks/FakeDiskTransaction.h index 440ee6271e9..f83642eee56 100644 --- a/src/Disks/FakeDiskTransaction.h +++ b/src/Disks/FakeDiskTransaction.h @@ -54,9 +54,9 @@ public: disk.replaceFile(from_path, to_path); } - void copyFile(const std::string & from_file_path, const std::string & to_file_path, const WriteSettings & settings) override + void copyFile(const std::string & from_file_path, const std::string & to_file_path, const ReadSettings & read_settings, const WriteSettings & write_settings) override { - disk.copyFile(from_file_path, disk, to_file_path, settings); + disk.copyFile(from_file_path, disk, to_file_path, read_settings, write_settings); } std::unique_ptr writeFile( /// NOLINT diff --git a/src/Disks/IDisk.cpp b/src/Disks/IDisk.cpp index 5b9f1208622..1997ce06990 100644 --- a/src/Disks/IDisk.cpp +++ b/src/Disks/IDisk.cpp @@ -24,13 +24,13 @@ bool IDisk::isDirectoryEmpty(const String & path) const return !iterateDirectory(path)->isValid(); } -void IDisk::copyFile(const String & from_file_path, IDisk & to_disk, const String & to_file_path, const WriteSettings & settings) /// NOLINT +void IDisk::copyFile(const String & from_file_path, IDisk & to_disk, const String & to_file_path, const ReadSettings & read_settings, const WriteSettings & write_settings) /// NOLINT { LOG_DEBUG(&Poco::Logger::get("IDisk"), "Copying from {} (path: {}) {} to {} (path: {}) {}.", getName(), getPath(), from_file_path, to_disk.getName(), to_disk.getPath(), to_file_path); - auto in = readFile(from_file_path); - auto out = to_disk.writeFile(to_file_path, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, settings); + auto in = readFile(from_file_path, read_settings); + auto out = to_disk.writeFile(to_file_path, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, write_settings); copyData(*in, *out); out->finalize(); } @@ -80,7 +80,7 @@ UInt128 IDisk::getEncryptedFileIV(const String &) const using ResultsCollector = std::vector>; -void asyncCopy(IDisk & from_disk, String from_path, IDisk & to_disk, String to_path, ThreadPool & pool, ResultsCollector & results, bool copy_root_dir, const WriteSettings & settings) +void asyncCopy(IDisk & from_disk, String from_path, IDisk & to_disk, String to_path, ThreadPool & pool, ResultsCollector & results, bool copy_root_dir, const ReadSettings & read_settings, const WriteSettings & write_settings) { if (from_disk.isFile(from_path)) { @@ -88,7 +88,7 @@ void asyncCopy(IDisk & from_disk, String from_path, IDisk & to_disk, String to_p auto future = promise->get_future(); pool.scheduleOrThrowOnError( - [&from_disk, from_path, &to_disk, to_path, &settings, promise, thread_group = CurrentThread::getGroup()]() + [&from_disk, from_path, &to_disk, to_path, &read_settings, &write_settings, promise, thread_group = CurrentThread::getGroup()]() { try { @@ -97,7 +97,7 @@ void asyncCopy(IDisk & from_disk, String from_path, IDisk & to_disk, String to_p if (thread_group) CurrentThread::attachToGroup(thread_group); - from_disk.copyFile(from_path, to_disk, fs::path(to_path) / fileName(from_path), settings); + from_disk.copyFile(from_path, to_disk, fs::path(to_path) / fileName(from_path), read_settings, write_settings); promise->set_value(); } catch (...) @@ -119,19 +119,19 @@ void asyncCopy(IDisk & from_disk, String from_path, IDisk & to_disk, String to_p } for (auto it = from_disk.iterateDirectory(from_path); it->isValid(); it->next()) - asyncCopy(from_disk, it->path(), to_disk, dest, pool, results, true, settings); + asyncCopy(from_disk, it->path(), to_disk, dest, pool, results, true, read_settings, write_settings); } } -void IDisk::copyThroughBuffers(const String & from_path, const std::shared_ptr & to_disk, const String & to_path, bool copy_root_dir, WriteSettings settings) +void IDisk::copyThroughBuffers(const String & from_path, const std::shared_ptr & to_disk, const String & to_path, bool copy_root_dir, const ReadSettings & read_settings, WriteSettings write_settings) { ResultsCollector results; /// Disable parallel write. We already copy in parallel. /// Avoid high memory usage. See test_s3_zero_copy_ttl/test.py::test_move_and_s3_memory_usage - settings.s3_allow_parallel_part_upload = false; + write_settings.s3_allow_parallel_part_upload = false; - asyncCopy(*this, from_path, *to_disk, to_path, copying_thread_pool, results, copy_root_dir, settings); + asyncCopy(*this, from_path, *to_disk, to_path, copying_thread_pool, results, copy_root_dir, read_settings, write_settings); for (auto & result : results) result.wait(); @@ -140,12 +140,12 @@ void IDisk::copyThroughBuffers(const String & from_path, const std::shared_ptr & to_disk, const String & to_dir, const WriteSettings & settings) +void IDisk::copyDirectoryContent(const String & from_dir, const std::shared_ptr & to_disk, const String & to_dir, const ReadSettings & read_settings, const WriteSettings & write_settings) { if (!to_disk->exists(to_dir)) to_disk->createDirectories(to_dir); - copyThroughBuffers(from_dir, to_disk, to_dir, /* copy_root_dir= */ false, settings); + copyThroughBuffers(from_dir, to_disk, to_dir, /* copy_root_dir= */ false, read_settings, write_settings); } void IDisk::truncateFile(const String &, size_t) diff --git a/src/Disks/IDisk.h b/src/Disks/IDisk.h index fc4eaec428c..bfb418e1c5e 100644 --- a/src/Disks/IDisk.h +++ b/src/Disks/IDisk.h @@ -193,14 +193,15 @@ public: virtual void replaceFile(const String & from_path, const String & to_path) = 0; /// Recursively copy files from from_dir to to_dir. Create to_dir if not exists. - virtual void copyDirectoryContent(const String & from_dir, const std::shared_ptr & to_disk, const String & to_dir, const WriteSettings & settings); + virtual void copyDirectoryContent(const String & from_dir, const std::shared_ptr & to_disk, const String & to_dir, const ReadSettings & read_settings, const WriteSettings & write_settings); /// Copy file `from_file_path` to `to_file_path` located at `to_disk`. virtual void copyFile( /// NOLINT const String & from_file_path, IDisk & to_disk, const String & to_file_path, - const WriteSettings & settings = {}); + const ReadSettings & read_settings = {}, + const WriteSettings & write_settings = {}); /// List files at `path` and add their names to `file_names` virtual void listFiles(const String & path, std::vector & file_names) const = 0; @@ -470,7 +471,7 @@ protected: /// Base implementation of the function copy(). /// It just opens two files, reads data by portions from the first file, and writes it to the second one. /// A derived class may override copy() to provide a faster implementation. - void copyThroughBuffers(const String & from_path, const std::shared_ptr & to_disk, const String & to_path, bool copy_root_dir, WriteSettings settings); + void copyThroughBuffers(const String & from_path, const std::shared_ptr & to_disk, const String & to_path, bool copy_root_dir, const ReadSettings & read_settings, WriteSettings write_settings); virtual void checkAccessImpl(const String & path); diff --git a/src/Disks/IDiskTransaction.h b/src/Disks/IDiskTransaction.h index 9f18206a4ad..975c41cb70b 100644 --- a/src/Disks/IDiskTransaction.h +++ b/src/Disks/IDiskTransaction.h @@ -59,7 +59,11 @@ public: /// but it's impossible to implement correctly in transactions because other disk can /// use different metadata storage. /// TODO: maybe remove it at all, we don't want copies - virtual void copyFile(const std::string & from_file_path, const std::string & to_file_path, const WriteSettings & settings = {}) = 0; + virtual void copyFile( + const std::string & from_file_path, + const std::string & to_file_path, + const ReadSettings & read_settings = {}, + const WriteSettings & write_settings = {}) = 0; /// Open the file for write and return WriteBufferFromFileBase object. virtual std::unique_ptr writeFile( /// NOLINT diff --git a/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp b/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp index f76fbd45736..73be834c1bb 100644 --- a/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp +++ b/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp @@ -357,6 +357,8 @@ ObjectMetadata AzureObjectStorage::getObjectMetadata(const std::string & path) c void AzureObjectStorage::copyObject( /// NOLINT const StoredObject & object_from, const StoredObject & object_to, + const ReadSettings &, + const WriteSettings &, std::optional object_to_attributes) { auto client_ptr = client.get(); diff --git a/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h b/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h index b5f81cef235..5436860818c 100644 --- a/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h +++ b/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h @@ -100,6 +100,8 @@ public: void copyObject( /// NOLINT const StoredObject & object_from, const StoredObject & object_to, + const ReadSettings & read_settings, + const WriteSettings & write_settings, std::optional object_to_attributes = {}) override; void shutdown() override {} diff --git a/src/Disks/ObjectStorages/Cached/CachedObjectStorage.cpp b/src/Disks/ObjectStorages/Cached/CachedObjectStorage.cpp index 0da572a06ab..d94c26f27e8 100644 --- a/src/Disks/ObjectStorages/Cached/CachedObjectStorage.cpp +++ b/src/Disks/ObjectStorages/Cached/CachedObjectStorage.cpp @@ -160,16 +160,22 @@ void CachedObjectStorage::removeObjectsIfExist(const StoredObjects & objects) void CachedObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT const StoredObject & object_from, const StoredObject & object_to, + const ReadSettings & read_settings, + const WriteSettings & write_settings, IObjectStorage & object_storage_to, std::optional object_to_attributes) { - object_storage->copyObjectToAnotherObjectStorage(object_from, object_to, object_storage_to, object_to_attributes); + object_storage->copyObjectToAnotherObjectStorage(object_from, object_to, read_settings, write_settings, object_storage_to, object_to_attributes); } void CachedObjectStorage::copyObject( // NOLINT - const StoredObject & object_from, const StoredObject & object_to, std::optional object_to_attributes) + const StoredObject & object_from, + const StoredObject & object_to, + const ReadSettings & read_settings, + const WriteSettings & write_settings, + std::optional object_to_attributes) { - object_storage->copyObject(object_from, object_to, object_to_attributes); + object_storage->copyObject(object_from, object_to, read_settings, write_settings, object_to_attributes); } std::unique_ptr CachedObjectStorage::cloneObjectStorage( diff --git a/src/Disks/ObjectStorages/Cached/CachedObjectStorage.h b/src/Disks/ObjectStorages/Cached/CachedObjectStorage.h index 76f16c9d930..925abbc6932 100644 --- a/src/Disks/ObjectStorages/Cached/CachedObjectStorage.h +++ b/src/Disks/ObjectStorages/Cached/CachedObjectStorage.h @@ -57,11 +57,15 @@ public: void copyObject( /// NOLINT const StoredObject & object_from, const StoredObject & object_to, + const ReadSettings & read_settings, + const WriteSettings & write_settings, std::optional object_to_attributes = {}) override; void copyObjectToAnotherObjectStorage( /// NOLINT const StoredObject & object_from, const StoredObject & object_to, + const ReadSettings & read_settings, + const WriteSettings & write_settings, IObjectStorage & object_storage_to, std::optional object_to_attributes = {}) override; diff --git a/src/Disks/ObjectStorages/DiskObjectStorage.cpp b/src/Disks/ObjectStorages/DiskObjectStorage.cpp index 466a1d3d5dd..734482ae851 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorage.cpp +++ b/src/Disks/ObjectStorages/DiskObjectStorage.cpp @@ -68,7 +68,7 @@ DiskObjectStorage::DiskObjectStorage( , send_metadata(config.getBool(config_prefix + ".send_metadata", false)) , read_resource_name(config.getString(config_prefix + ".read_resource", "")) , write_resource_name(config.getString(config_prefix + ".write_resource", "")) - , metadata_helper(std::make_unique(this, ReadSettings{})) + , metadata_helper(std::make_unique(this, ReadSettings{}, WriteSettings{})) {} StoredObjects DiskObjectStorage::getStorageObjects(const String & local_path) const @@ -180,7 +180,8 @@ void DiskObjectStorage::copyFile( /// NOLINT const String & from_file_path, IDisk & to_disk, const String & to_file_path, - const WriteSettings & settings) + const ReadSettings & read_settings, + const WriteSettings & write_settings) { if (this == &to_disk) { @@ -192,7 +193,7 @@ void DiskObjectStorage::copyFile( /// NOLINT else { /// Copy through buffers - IDisk::copyFile(from_file_path, to_disk, to_file_path, settings); + IDisk::copyFile(from_file_path, to_disk, to_file_path, read_settings, write_settings); } } diff --git a/src/Disks/ObjectStorages/DiskObjectStorage.h b/src/Disks/ObjectStorages/DiskObjectStorage.h index 72103edd77e..ccd7e807513 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorage.h +++ b/src/Disks/ObjectStorages/DiskObjectStorage.h @@ -162,7 +162,8 @@ public: const String & from_file_path, IDisk & to_disk, const String & to_file_path, - const WriteSettings & settings = {}) override; + const ReadSettings & read_settings = {}, + const WriteSettings & write_settings = {}) override; void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context_, const String &, const DisksMap &) override; diff --git a/src/Disks/ObjectStorages/DiskObjectStorageRemoteMetadataRestoreHelper.cpp b/src/Disks/ObjectStorages/DiskObjectStorageRemoteMetadataRestoreHelper.cpp index bbcdd40d85f..91e15547068 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorageRemoteMetadataRestoreHelper.cpp +++ b/src/Disks/ObjectStorages/DiskObjectStorageRemoteMetadataRestoreHelper.cpp @@ -84,7 +84,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::saveSchemaVersion(const int & { StoredObject object{fs::path(disk->object_storage_root_path) / SCHEMA_VERSION_OBJECT}; - auto buf = disk->object_storage->writeObject(object, WriteMode::Rewrite); + auto buf = disk->object_storage->writeObject(object, WriteMode::Rewrite, /* attributes= */ {}, /* buf_size= */ DBMS_DEFAULT_BUFFER_SIZE, write_settings); writeIntText(version, *buf); buf->finalize(); @@ -93,7 +93,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::saveSchemaVersion(const int & void DiskObjectStorageRemoteMetadataRestoreHelper::updateObjectMetadata(const String & key, const ObjectAttributes & metadata) const { StoredObject object{key}; - disk->object_storage->copyObject(object, object, metadata); + disk->object_storage->copyObject(object, object, read_settings, write_settings, metadata); } void DiskObjectStorageRemoteMetadataRestoreHelper::migrateFileToRestorableSchema(const String & path) const @@ -434,7 +434,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::processRestoreFiles( /// Copy object if we restore to different bucket / path. if (source_object_storage->getObjectsNamespace() != disk->object_storage->getObjectsNamespace() || disk->object_storage_root_path != source_path) - source_object_storage->copyObjectToAnotherObjectStorage(object_from, object_to, *disk->object_storage); + source_object_storage->copyObjectToAnotherObjectStorage(object_from, object_to, read_settings, write_settings, *disk->object_storage); auto tx = disk->metadata_storage->createTransaction(); tx->addBlobToMetadata(path, relative_key, meta.size_bytes); diff --git a/src/Disks/ObjectStorages/DiskObjectStorageRemoteMetadataRestoreHelper.h b/src/Disks/ObjectStorages/DiskObjectStorageRemoteMetadataRestoreHelper.h index e7de4afcaf3..ee81e8a209e 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorageRemoteMetadataRestoreHelper.h +++ b/src/Disks/ObjectStorages/DiskObjectStorageRemoteMetadataRestoreHelper.h @@ -24,9 +24,10 @@ public: static constexpr UInt64 LATEST_REVISION = std::numeric_limits::max(); static constexpr UInt64 UNKNOWN_REVISION = 0; - DiskObjectStorageRemoteMetadataRestoreHelper(DiskObjectStorage * disk_, ReadSettings read_settings_) + DiskObjectStorageRemoteMetadataRestoreHelper(DiskObjectStorage * disk_, ReadSettings read_settings_, WriteSettings write_settings_) : disk(disk_) , read_settings(std::move(read_settings_)) + , write_settings(std::move(write_settings_)) , operation_log_suffix("-" + getFQDNOrHostName()) { } @@ -94,6 +95,7 @@ private: ObjectStoragePtr object_storage_from_another_namespace; ReadSettings read_settings; + WriteSettings write_settings; String operation_log_suffix; }; diff --git a/src/Disks/ObjectStorages/DiskObjectStorageTransaction.cpp b/src/Disks/ObjectStorages/DiskObjectStorageTransaction.cpp index fd01caacd25..99cbd234e08 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorageTransaction.cpp +++ b/src/Disks/ObjectStorages/DiskObjectStorageTransaction.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -474,6 +475,9 @@ struct WriteFileObjectStorageOperation final : public IDiskObjectStorageOperatio struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation { + ReadSettings read_settings; + WriteSettings write_settings; + /// Local paths std::string from_path; std::string to_path; @@ -483,9 +487,13 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation CopyFileObjectStorageOperation( IObjectStorage & object_storage_, IMetadataStorage & metadata_storage_, + const ReadSettings & read_settings_, + const WriteSettings & write_settings_, const std::string & from_path_, const std::string & to_path_) : IDiskObjectStorageOperation(object_storage_, metadata_storage_) + , read_settings(read_settings_) + , write_settings(write_settings_) , from_path(from_path_) , to_path(to_path_) {} @@ -505,7 +513,7 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation std::string blob_name = object_storage.generateBlobNameForPath(to_path); auto object_to = StoredObject(fs::path(metadata_storage.getObjectStorageRootPath()) / blob_name); - object_storage.copyObject(object_from, object_to); + object_storage.copyObject(object_from, object_to, read_settings, write_settings); tx->addBlobToMetadata(to_path, blob_name, object_from.bytes_size); @@ -810,13 +818,10 @@ void DiskObjectStorageTransaction::createFile(const std::string & path) })); } -void DiskObjectStorageTransaction::copyFile(const std::string & from_file_path, const std::string & to_file_path, const WriteSettings & settings) +void DiskObjectStorageTransaction::copyFile(const std::string & from_file_path, const std::string & to_file_path, const ReadSettings & read_settings, const WriteSettings & write_settings) { - /// NOTE: For native copy we can ignore throttling, so no need to use WriteSettings - UNUSED(settings); - operations_to_execute.emplace_back( - std::make_unique(object_storage, metadata_storage, from_file_path, to_file_path)); + std::make_unique(object_storage, metadata_storage, read_settings, write_settings, from_file_path, to_file_path)); } void DiskObjectStorageTransaction::commit() diff --git a/src/Disks/ObjectStorages/DiskObjectStorageTransaction.h b/src/Disks/ObjectStorages/DiskObjectStorageTransaction.h index 8ce10dad212..4b62a41e161 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorageTransaction.h +++ b/src/Disks/ObjectStorages/DiskObjectStorageTransaction.h @@ -86,7 +86,7 @@ public: void createFile(const String & path) override; - void copyFile(const std::string & from_file_path, const std::string & to_file_path, const WriteSettings & settings) override; + void copyFile(const std::string & from_file_path, const std::string & to_file_path, const ReadSettings & read_settings, const WriteSettings &) override; /// writeFile is a difficult function for transactions. /// Now it's almost noop because metadata added to transaction in finalize method diff --git a/src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.cpp b/src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.cpp index 60230ce2fb0..5eca98aa494 100644 --- a/src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.cpp +++ b/src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.cpp @@ -133,6 +133,8 @@ ObjectMetadata HDFSObjectStorage::getObjectMetadata(const std::string &) const void HDFSObjectStorage::copyObject( /// NOLINT const StoredObject & object_from, const StoredObject & object_to, + const ReadSettings & read_settings, + const WriteSettings & write_settings, std::optional object_to_attributes) { if (object_to_attributes.has_value()) @@ -140,8 +142,8 @@ void HDFSObjectStorage::copyObject( /// NOLINT ErrorCodes::UNSUPPORTED_METHOD, "HDFS API doesn't support custom attributes/metadata for stored objects"); - auto in = readObject(object_from); - auto out = writeObject(object_to, WriteMode::Rewrite); + auto in = readObject(object_from, read_settings); + auto out = writeObject(object_to, WriteMode::Rewrite, /* attributes= */ {}, /* buf_size= */ DBMS_DEFAULT_BUFFER_SIZE, write_settings); copyData(*in, *out); out->finalize(); } diff --git a/src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.h b/src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.h index a691b089b43..8d770c12d8f 100644 --- a/src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.h +++ b/src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.h @@ -98,6 +98,8 @@ public: void copyObject( /// NOLINT const StoredObject & object_from, const StoredObject & object_to, + const ReadSettings & read_settings, + const WriteSettings & write_settings, std::optional object_to_attributes = {}) override; void shutdown() override; diff --git a/src/Disks/ObjectStorages/IObjectStorage.cpp b/src/Disks/ObjectStorages/IObjectStorage.cpp index ea22294224c..3c77de8f5b7 100644 --- a/src/Disks/ObjectStorages/IObjectStorage.cpp +++ b/src/Disks/ObjectStorages/IObjectStorage.cpp @@ -62,14 +62,16 @@ ThreadPool & IObjectStorage::getThreadPoolWriter() void IObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT const StoredObject & object_from, const StoredObject & object_to, + const ReadSettings & read_settings, + const WriteSettings & write_settings, IObjectStorage & object_storage_to, std::optional object_to_attributes) { if (&object_storage_to == this) - copyObject(object_from, object_to, object_to_attributes); + copyObject(object_from, object_to, read_settings, write_settings, object_to_attributes); - auto in = readObject(object_from); - auto out = object_storage_to.writeObject(object_to, WriteMode::Rewrite); + auto in = readObject(object_from, read_settings); + auto out = object_storage_to.writeObject(object_to, WriteMode::Rewrite, /* attributes= */ {}, /* buf_size= */ DBMS_DEFAULT_BUFFER_SIZE, write_settings); copyData(*in, *out); out->finalize(); } diff --git a/src/Disks/ObjectStorages/IObjectStorage.h b/src/Disks/ObjectStorages/IObjectStorage.h index 32f9d1ba764..032795b380f 100644 --- a/src/Disks/ObjectStorages/IObjectStorage.h +++ b/src/Disks/ObjectStorages/IObjectStorage.h @@ -131,6 +131,8 @@ public: virtual void copyObject( /// NOLINT const StoredObject & object_from, const StoredObject & object_to, + const ReadSettings & read_settings, + const WriteSettings & write_settings, std::optional object_to_attributes = {}) = 0; /// Copy object to another instance of object storage @@ -139,6 +141,8 @@ public: virtual void copyObjectToAnotherObjectStorage( /// NOLINT const StoredObject & object_from, const StoredObject & object_to, + const ReadSettings & read_settings, + const WriteSettings & write_settings, IObjectStorage & object_storage_to, std::optional object_to_attributes = {}); diff --git a/src/Disks/ObjectStorages/Local/LocalObjectStorage.cpp b/src/Disks/ObjectStorages/Local/LocalObjectStorage.cpp index 69ccf309096..cc53df956c6 100644 --- a/src/Disks/ObjectStorages/Local/LocalObjectStorage.cpp +++ b/src/Disks/ObjectStorages/Local/LocalObjectStorage.cpp @@ -167,10 +167,14 @@ ObjectMetadata LocalObjectStorage::getObjectMetadata(const std::string & /* path } void LocalObjectStorage::copyObject( // NOLINT - const StoredObject & object_from, const StoredObject & object_to, std::optional /* object_to_attributes */) + const StoredObject & object_from, + const StoredObject & object_to, + const ReadSettings & read_settings, + const WriteSettings & write_settings, + std::optional /* object_to_attributes */) { - auto in = readObject(object_from); - auto out = writeObject(object_to, WriteMode::Rewrite); + auto in = readObject(object_from, read_settings); + auto out = writeObject(object_to, WriteMode::Rewrite, /* attributes= */ {}, /* buf_size= */ DBMS_DEFAULT_BUFFER_SIZE, write_settings); copyData(*in, *out); out->finalize(); } diff --git a/src/Disks/ObjectStorages/Local/LocalObjectStorage.h b/src/Disks/ObjectStorages/Local/LocalObjectStorage.h index 630320ab7f9..aa3a68731e4 100644 --- a/src/Disks/ObjectStorages/Local/LocalObjectStorage.h +++ b/src/Disks/ObjectStorages/Local/LocalObjectStorage.h @@ -57,6 +57,8 @@ public: void copyObject( /// NOLINT const StoredObject & object_from, const StoredObject & object_to, + const ReadSettings & read_settings, + const WriteSettings & write_settings, std::optional object_to_attributes = {}) override; void shutdown() override; diff --git a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp index 0d9670efebe..8f020e0d1ac 100644 --- a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp +++ b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp @@ -425,6 +425,8 @@ ObjectMetadata S3ObjectStorage::getObjectMetadata(const std::string & path) cons void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT const StoredObject & object_from, const StoredObject & object_to, + const ReadSettings & read_settings, + const WriteSettings & write_settings, IObjectStorage & object_storage_to, std::optional object_to_attributes) { @@ -435,24 +437,48 @@ void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT auto settings_ptr = s3_settings.get(); auto size = S3::getObjectSize(*clients_->client, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true); auto scheduler = threadPoolCallbackRunner(getThreadPoolWriter(), "S3ObjStor_copy"); - copyS3File(clients_->client, clients_->client_with_long_timeout, bucket, object_from.remote_path, 0, size, dest_s3->bucket, object_to.remote_path, - settings_ptr->request_settings, object_to_attributes, scheduler, /* for_disk_s3= */ true); + copyS3File(clients_->client, + clients_->client_with_long_timeout, + bucket, + object_from.remote_path, + 0, + size, + dest_s3->bucket, + object_to.remote_path, + settings_ptr->request_settings, + patchSettings(read_settings), + object_to_attributes, + scheduler, + /* for_disk_s3= */ true); } else - { - IObjectStorage::copyObjectToAnotherObjectStorage(object_from, object_to, object_storage_to, object_to_attributes); - } + IObjectStorage::copyObjectToAnotherObjectStorage(object_from, object_to, read_settings, write_settings, object_storage_to, object_to_attributes); } void S3ObjectStorage::copyObject( // NOLINT - const StoredObject & object_from, const StoredObject & object_to, std::optional object_to_attributes) + const StoredObject & object_from, + const StoredObject & object_to, + const ReadSettings & read_settings, + const WriteSettings &, + std::optional object_to_attributes) { auto clients_ = clients.get(); auto settings_ptr = s3_settings.get(); auto size = S3::getObjectSize(*clients_->client, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true); auto scheduler = threadPoolCallbackRunner(getThreadPoolWriter(), "S3ObjStor_copy"); - copyS3File(clients_->client, clients_->client_with_long_timeout, bucket, object_from.remote_path, 0, size, bucket, object_to.remote_path, - settings_ptr->request_settings, object_to_attributes, scheduler, /* for_disk_s3= */ true); + copyS3File(clients_->client, + clients_->client_with_long_timeout, + bucket, + object_from.remote_path, + 0, + size, + bucket, + object_to.remote_path, + settings_ptr->request_settings, + patchSettings(read_settings), + object_to_attributes, + scheduler, + /* for_disk_s3= */ true); } void S3ObjectStorage::setNewSettings(std::unique_ptr && s3_settings_) diff --git a/src/Disks/ObjectStorages/S3/S3ObjectStorage.h b/src/Disks/ObjectStorages/S3/S3ObjectStorage.h index 527b1479d89..6e516b39c88 100644 --- a/src/Disks/ObjectStorages/S3/S3ObjectStorage.h +++ b/src/Disks/ObjectStorages/S3/S3ObjectStorage.h @@ -135,11 +135,15 @@ public: void copyObject( /// NOLINT const StoredObject & object_from, const StoredObject & object_to, + const ReadSettings & read_settings, + const WriteSettings & write_settings, std::optional object_to_attributes = {}) override; void copyObjectToAnotherObjectStorage( /// NOLINT const StoredObject & object_from, const StoredObject & object_to, + const ReadSettings & read_settings, + const WriteSettings & write_settings, IObjectStorage & object_storage_to, std::optional object_to_attributes = {}) override; diff --git a/src/Disks/ObjectStorages/Web/WebObjectStorage.cpp b/src/Disks/ObjectStorages/Web/WebObjectStorage.cpp index 45b183c15f2..ea05012fb61 100644 --- a/src/Disks/ObjectStorages/Web/WebObjectStorage.cpp +++ b/src/Disks/ObjectStorages/Web/WebObjectStorage.cpp @@ -264,7 +264,7 @@ void WebObjectStorage::removeObjectsIfExist(const StoredObjects &) throwNotAllowed(); } -void WebObjectStorage::copyObject(const StoredObject &, const StoredObject &, std::optional) // NOLINT +void WebObjectStorage::copyObject(const StoredObject &, const StoredObject &, const ReadSettings &, const WriteSettings &, std::optional) // NOLINT { throwNotAllowed(); } diff --git a/src/Disks/ObjectStorages/Web/WebObjectStorage.h b/src/Disks/ObjectStorages/Web/WebObjectStorage.h index 1a21d94e230..089bdb99e71 100644 --- a/src/Disks/ObjectStorages/Web/WebObjectStorage.h +++ b/src/Disks/ObjectStorages/Web/WebObjectStorage.h @@ -68,6 +68,8 @@ public: void copyObject( /// NOLINT const StoredObject & object_from, const StoredObject & object_to, + const ReadSettings & read_settings, + const WriteSettings & write_settings, std::optional object_to_attributes = {}) override; void shutdown() override; diff --git a/src/IO/S3/copyS3File.cpp b/src/IO/S3/copyS3File.cpp index 002b8dde566..a16a1a41505 100644 --- a/src/IO/S3/copyS3File.cpp +++ b/src/IO/S3/copyS3File.cpp @@ -610,6 +610,7 @@ namespace const String & dest_bucket_, const String & dest_key_, const S3Settings::RequestSettings & request_settings_, + const ReadSettings & read_settings_, const std::optional> & object_metadata_, ThreadPoolCallbackRunner schedule_, bool for_disk_s3_) @@ -619,6 +620,7 @@ namespace , offset(src_offset_) , size(src_size_) , supports_multipart_copy(client_ptr_->supportsMultiPartCopy()) + , read_settings(read_settings_) { } @@ -639,12 +641,13 @@ namespace size_t offset; size_t size; bool supports_multipart_copy; + const ReadSettings read_settings; CreateReadBuffer getSourceObjectReadBuffer() { return [&] { - return std::make_unique(client_ptr, src_bucket, src_key, "", request_settings, Context::getGlobalContextInstance()->getReadSettings()); + return std::make_unique(client_ptr, src_bucket, src_key, "", request_settings, read_settings); }; } @@ -826,20 +829,21 @@ void copyS3File( const String & dest_bucket, const String & dest_key, const S3Settings::RequestSettings & settings, + const ReadSettings & read_settings, const std::optional> & object_metadata, ThreadPoolCallbackRunner schedule, bool for_disk_s3) { if (settings.allow_native_copy) { - CopyFileHelper helper{s3_client, s3_client_with_long_timeout, src_bucket, src_key, src_offset, src_size, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_s3}; + CopyFileHelper helper{s3_client, s3_client_with_long_timeout, src_bucket, src_key, src_offset, src_size, dest_bucket, dest_key, settings, read_settings, object_metadata, schedule, for_disk_s3}; helper.performCopy(); } else { auto create_read_buffer = [&] { - return std::make_unique(s3_client, src_bucket, src_key, "", settings, Context::getGlobalContextInstance()->getReadSettings()); + return std::make_unique(s3_client, src_bucket, src_key, "", settings, read_settings); }; copyDataToS3File(create_read_buffer, src_offset, src_size, s3_client, s3_client_with_long_timeout, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_s3); } diff --git a/src/IO/S3/copyS3File.h b/src/IO/S3/copyS3File.h index 3477f5a20ab..1bcbfd7735e 100644 --- a/src/IO/S3/copyS3File.h +++ b/src/IO/S3/copyS3File.h @@ -31,6 +31,8 @@ using CreateReadBuffer = std::function()>; /// CompleteMultipartUpload requests. These requests need longer timeout because S3 servers often /// block on them for multiple seconds without sending or receiving data from us (maybe the servers /// are copying data internally, or maybe throttling, idk). +/// +/// read_settings - is used for throttling in case of native copy is not possible void copyS3File( const std::shared_ptr & s3_client, const std::shared_ptr & s3_client_with_long_timeout, @@ -41,6 +43,7 @@ void copyS3File( const String & dest_bucket, const String & dest_key, const S3Settings::RequestSettings & settings, + const ReadSettings & read_settings, const std::optional> & object_metadata = std::nullopt, ThreadPoolCallbackRunner schedule_ = {}, bool for_disk_s3 = false); diff --git a/src/Storages/MergeTree/DataPartStorageOnDiskBase.cpp b/src/Storages/MergeTree/DataPartStorageOnDiskBase.cpp index 27d8991bd62..7fc8187aee5 100644 --- a/src/Storages/MergeTree/DataPartStorageOnDiskBase.cpp +++ b/src/Storages/MergeTree/DataPartStorageOnDiskBase.cpp @@ -416,7 +416,8 @@ void DataPartStorageOnDiskBase::backup( MutableDataPartStoragePtr DataPartStorageOnDiskBase::freeze( const std::string & to, const std::string & dir_path, - const WriteSettings & settings, + const ReadSettings & read_settings, + const WriteSettings & write_settings, std::function save_metadata_callback, const ClonePartParams & params) const { @@ -430,7 +431,8 @@ MutableDataPartStoragePtr DataPartStorageOnDiskBase::freeze( disk, getRelativePath(), fs::path(to) / dir_path, - settings, + read_settings, + write_settings, params.make_source_readonly, /* max_level= */ {}, params.copy_instead_of_hardlink, @@ -466,6 +468,7 @@ MutableDataPartStoragePtr DataPartStorageOnDiskBase::clonePart( const std::string & to, const std::string & dir_path, const DiskPtr & dst_disk, + const ReadSettings & read_settings, const WriteSettings & write_settings, Poco::Logger * log) const { @@ -482,7 +485,7 @@ MutableDataPartStoragePtr DataPartStorageOnDiskBase::clonePart( try { dst_disk->createDirectories(to); - src_disk->copyDirectoryContent(getRelativePath(), dst_disk, path_to_clone, write_settings); + src_disk->copyDirectoryContent(getRelativePath(), dst_disk, path_to_clone, read_settings, write_settings); } catch (...) { diff --git a/src/Storages/MergeTree/DataPartStorageOnDiskBase.h b/src/Storages/MergeTree/DataPartStorageOnDiskBase.h index 0adf048b56a..1826e84c28d 100644 --- a/src/Storages/MergeTree/DataPartStorageOnDiskBase.h +++ b/src/Storages/MergeTree/DataPartStorageOnDiskBase.h @@ -63,7 +63,8 @@ public: MutableDataPartStoragePtr freeze( const std::string & to, const std::string & dir_path, - const WriteSettings & settings, + const ReadSettings & read_settings, + const WriteSettings & write_settings, std::function save_metadata_callback, const ClonePartParams & params) const override; @@ -71,6 +72,7 @@ public: const std::string & to, const std::string & dir_path, const DiskPtr & dst_disk, + const ReadSettings & read_settings, const WriteSettings & write_settings, Poco::Logger * log) const override; diff --git a/src/Storages/MergeTree/IDataPartStorage.h b/src/Storages/MergeTree/IDataPartStorage.h index c76b17f3370..072cb29626e 100644 --- a/src/Storages/MergeTree/IDataPartStorage.h +++ b/src/Storages/MergeTree/IDataPartStorage.h @@ -250,7 +250,8 @@ public: virtual std::shared_ptr freeze( const std::string & to, const std::string & dir_path, - const WriteSettings & settings, + const ReadSettings & read_settings, + const WriteSettings & write_settings, std::function save_metadata_callback, const ClonePartParams & params) const = 0; @@ -259,6 +260,7 @@ public: const std::string & to, const std::string & dir_path, const DiskPtr & disk, + const ReadSettings & read_settings, const WriteSettings & write_settings, Poco::Logger * log) const = 0; diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index 54a169fc779..dc387496371 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -1793,12 +1793,13 @@ DataPartStoragePtr IMergeTreeDataPart::makeCloneInDetached(const String & prefix return getDataPartStorage().freeze( storage.relative_data_path, *maybe_path_in_detached, + Context::getGlobalContextInstance()->getReadSettings(), Context::getGlobalContextInstance()->getWriteSettings(), /* save_metadata_callback= */ {}, params); } -MutableDataPartStoragePtr IMergeTreeDataPart::makeCloneOnDisk(const DiskPtr & disk, const String & directory_name, const WriteSettings & write_settings) const +MutableDataPartStoragePtr IMergeTreeDataPart::makeCloneOnDisk(const DiskPtr & disk, const String & directory_name, const ReadSettings & read_settings, const WriteSettings & write_settings) const { assertOnDisk(); @@ -1808,7 +1809,7 @@ MutableDataPartStoragePtr IMergeTreeDataPart::makeCloneOnDisk(const DiskPtr & di throw Exception(ErrorCodes::LOGICAL_ERROR, "Can not clone data part {} to empty directory.", name); String path_to_clone = fs::path(storage.relative_data_path) / directory_name / ""; - return getDataPartStorage().clonePart(path_to_clone, getDataPartStorage().getPartDirectory(), disk, write_settings, storage.log); + return getDataPartStorage().clonePart(path_to_clone, getDataPartStorage().getPartDirectory(), disk, read_settings, write_settings, storage.log); } UInt64 IMergeTreeDataPart::getIndexSizeFromFile() const diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h index 02c838458f9..c30accbc1ba 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.h +++ b/src/Storages/MergeTree/IMergeTreeDataPart.h @@ -377,7 +377,7 @@ public: const DiskTransactionPtr & disk_transaction) const; /// Makes full clone of part in specified subdirectory (relative to storage data directory, e.g. "detached") on another disk - MutableDataPartStoragePtr makeCloneOnDisk(const DiskPtr & disk, const String & directory_name, const WriteSettings & write_settings) const; + MutableDataPartStoragePtr makeCloneOnDisk(const DiskPtr & disk, const String & directory_name, const ReadSettings & read_settings, const WriteSettings & write_settings) const; /// Checks that .bin and .mrk files exist. /// diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 14c9961f6c3..26d110f5510 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -4968,7 +4968,7 @@ void MergeTreeData::movePartitionToDisk(const ASTPtr & partition, const String & throw Exception(ErrorCodes::UNKNOWN_DISK, "All parts of partition '{}' are already on disk '{}'", partition_id, disk->getName()); } - MovePartsOutcome moves_outcome = movePartsToSpace(parts, std::static_pointer_cast(disk), local_context->getWriteSettings()); + MovePartsOutcome moves_outcome = movePartsToSpace(parts, std::static_pointer_cast(disk), local_context->getReadSettings(), local_context->getWriteSettings()); switch (moves_outcome) { case MovePartsOutcome::MovesAreCancelled: @@ -5031,7 +5031,7 @@ void MergeTreeData::movePartitionToVolume(const ASTPtr & partition, const String throw Exception(ErrorCodes::UNKNOWN_DISK, "All parts of partition '{}' are already on volume '{}'", partition_id, volume->getName()); } - MovePartsOutcome moves_outcome = movePartsToSpace(parts, std::static_pointer_cast(volume), local_context->getWriteSettings()); + MovePartsOutcome moves_outcome = movePartsToSpace(parts, std::static_pointer_cast(volume), local_context->getReadSettings(), local_context->getWriteSettings()); switch (moves_outcome) { case MovePartsOutcome::MovesAreCancelled: @@ -7488,6 +7488,7 @@ std::pair MergeTreeData::cloneAn const MergeTreePartInfo & dst_part_info, const StorageMetadataPtr & metadata_snapshot, const IDataPartStorage::ClonePartParams & params, + const ReadSettings & read_settings, const WriteSettings & write_settings) { /// Check that the storage policy contains the disk where the src_part is located. @@ -7545,6 +7546,7 @@ std::pair MergeTreeData::cloneAn auto dst_part_storage = src_part_storage->freeze( relative_data_path, tmp_dst_part_name, + read_settings, write_settings, /* save_metadata_callback= */ {}, params); @@ -7803,6 +7805,7 @@ PartitionCommandsResultInfo MergeTreeData::freezePartitionsByMatcher( auto new_storage = data_part_storage->freeze( backup_part_path, part->getDataPartStorage().getPartDirectory(), + local_context->getReadSettings(), local_context->getWriteSettings(), callback, params); @@ -8002,8 +8005,9 @@ bool MergeTreeData::scheduleDataMovingJob(BackgroundJobsAssignee & assignee) assignee.scheduleMoveTask(std::make_shared( [this, moving_tagger] () mutable { + ReadSettings read_settings = Context::getGlobalContextInstance()->getReadSettings(); WriteSettings write_settings = Context::getGlobalContextInstance()->getWriteSettings(); - return moveParts(moving_tagger, write_settings, /* wait_for_move_if_zero_copy= */ false) == MovePartsOutcome::PartsMoved; + return moveParts(moving_tagger, read_settings, write_settings, /* wait_for_move_if_zero_copy= */ false) == MovePartsOutcome::PartsMoved; }, moves_assignee_trigger, getStorageID())); return true; } @@ -8018,7 +8022,7 @@ bool MergeTreeData::areBackgroundMovesNeeded() const return policy->getVolumes().size() == 1 && policy->getVolumes()[0]->getDisks().size() > 1; } -MovePartsOutcome MergeTreeData::movePartsToSpace(const DataPartsVector & parts, SpacePtr space, const WriteSettings & write_settings) +MovePartsOutcome MergeTreeData::movePartsToSpace(const DataPartsVector & parts, SpacePtr space, const ReadSettings & read_settings, const WriteSettings & write_settings) { if (parts_mover.moves_blocker.isCancelled()) return MovePartsOutcome::MovesAreCancelled; @@ -8027,7 +8031,7 @@ MovePartsOutcome MergeTreeData::movePartsToSpace(const DataPartsVector & parts, if (moving_tagger->parts_to_move.empty()) return MovePartsOutcome::NothingToMove; - return moveParts(moving_tagger, write_settings, /* wait_for_move_if_zero_copy= */ true); + return moveParts(moving_tagger, read_settings, write_settings, /* wait_for_move_if_zero_copy= */ true); } MergeTreeData::CurrentlyMovingPartsTaggerPtr MergeTreeData::selectPartsForMove() @@ -8082,7 +8086,7 @@ MergeTreeData::CurrentlyMovingPartsTaggerPtr MergeTreeData::checkPartsForMove(co return std::make_shared(std::move(parts_to_move), *this); } -MovePartsOutcome MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagger, const WriteSettings & write_settings, bool wait_for_move_if_zero_copy) +MovePartsOutcome MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagger, const ReadSettings & read_settings, const WriteSettings & write_settings, bool wait_for_move_if_zero_copy) { LOG_INFO(log, "Got {} parts to move.", moving_tagger->parts_to_move.size()); @@ -8143,7 +8147,7 @@ MovePartsOutcome MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & { if (lock->isLocked()) { - cloned_part = parts_mover.clonePart(moving_part, write_settings); + cloned_part = parts_mover.clonePart(moving_part, read_settings, write_settings); parts_mover.swapClonedPart(cloned_part); break; } @@ -8170,7 +8174,7 @@ MovePartsOutcome MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & } else /// Ordinary move as it should be { - cloned_part = parts_mover.clonePart(moving_part, write_settings); + cloned_part = parts_mover.clonePart(moving_part, read_settings, write_settings); parts_mover.swapClonedPart(cloned_part); } write_part_log({}); diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 6f9779bde00..414fa493085 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -848,6 +848,7 @@ public: const MergeTreePartInfo & dst_part_info, const StorageMetadataPtr & metadata_snapshot, const IDataPartStorage::ClonePartParams & params, + const ReadSettings & read_settings, const WriteSettings & write_settings); virtual std::vector getMutationsStatus() const = 0; @@ -1340,7 +1341,7 @@ protected: /// MergeTree because they store mutations in different way. virtual std::map getAlterMutationCommandsForPart(const DataPartPtr & part) const = 0; /// Moves part to specified space, used in ALTER ... MOVE ... queries - MovePartsOutcome movePartsToSpace(const DataPartsVector & parts, SpacePtr space, const WriteSettings & write_settings); + MovePartsOutcome movePartsToSpace(const DataPartsVector & parts, SpacePtr space, const ReadSettings & read_settings, const WriteSettings & write_settings); struct PartBackupEntries { @@ -1494,7 +1495,7 @@ private: using CurrentlyMovingPartsTaggerPtr = std::shared_ptr; /// Move selected parts to corresponding disks - MovePartsOutcome moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagger, const WriteSettings & write_settings, bool wait_for_move_if_zero_copy); + MovePartsOutcome moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagger, const ReadSettings & read_settings, const WriteSettings & write_settings, bool wait_for_move_if_zero_copy); /// Select parts for move and disks for them. Used in background moving processes. CurrentlyMovingPartsTaggerPtr selectPartsForMove(); diff --git a/src/Storages/MergeTree/MergeTreePartsMover.cpp b/src/Storages/MergeTree/MergeTreePartsMover.cpp index 51e4cee19f8..f4dc6c8d042 100644 --- a/src/Storages/MergeTree/MergeTreePartsMover.cpp +++ b/src/Storages/MergeTree/MergeTreePartsMover.cpp @@ -208,7 +208,7 @@ bool MergeTreePartsMover::selectPartsForMove( return false; } -MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const MergeTreeMoveEntry & moving_part, const WriteSettings & write_settings) const +MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const MergeTreeMoveEntry & moving_part, const ReadSettings & read_settings, const WriteSettings & write_settings) const { if (moves_blocker.isCancelled()) throw Exception(ErrorCodes::ABORTED, "Cancelled moving parts."); @@ -249,12 +249,12 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me { LOG_INFO(log, "Part {} was not fetched, we are the first who move it to another disk, so we will copy it", part->name); cloned_part_storage = part->getDataPartStorage().clonePart( - path_to_clone, part->getDataPartStorage().getPartDirectory(), disk, write_settings, log); + path_to_clone, part->getDataPartStorage().getPartDirectory(), disk, read_settings, write_settings, log); } } else { - cloned_part_storage = part->makeCloneOnDisk(disk, MergeTreeData::MOVING_DIR_NAME, write_settings); + cloned_part_storage = part->makeCloneOnDisk(disk, MergeTreeData::MOVING_DIR_NAME, read_settings, write_settings); } MergeTreeDataPartBuilder builder(*data, part->name, cloned_part_storage); diff --git a/src/Storages/MergeTree/MergeTreePartsMover.h b/src/Storages/MergeTree/MergeTreePartsMover.h index 5dcc364a4e9..f172dade40e 100644 --- a/src/Storages/MergeTree/MergeTreePartsMover.h +++ b/src/Storages/MergeTree/MergeTreePartsMover.h @@ -65,7 +65,7 @@ public: const std::lock_guard & moving_parts_lock); /// Copies part to selected reservation in detached folder. Throws exception if part already exists. - TemporaryClonedPart clonePart(const MergeTreeMoveEntry & moving_part, const WriteSettings & write_settings) const; + TemporaryClonedPart clonePart(const MergeTreeMoveEntry & moving_part, const ReadSettings & read_settings, const WriteSettings & write_settings) const; /// Replaces cloned part from detached directory into active data parts set. /// Replacing part changes state to DeleteOnDestroy and will be removed from disk after destructor of diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 808ece8dc82..15ca2b65731 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -1845,7 +1845,14 @@ bool MutateTask::prepare() .txn = ctx->txn, .hardlinked_files = &ctx->hardlinked_files, .files_to_copy_instead_of_hardlinks = std::move(files_to_copy_instead_of_hardlinks), .keep_metadata_version = true }; - auto [part, lock] = ctx->data->cloneAndLoadDataPartOnSameDisk(ctx->source_part, prefix, ctx->future_part->part_info, ctx->metadata_snapshot, clone_params, ctx->context->getWriteSettings()); + auto [part, lock] = ctx->data->cloneAndLoadDataPartOnSameDisk( + ctx->source_part, + prefix, + ctx->future_part->part_info, + ctx->metadata_snapshot, + clone_params, + ctx->context->getReadSettings(), + ctx->context->getWriteSettings()); part->getDataPartStorage().beginTransaction(); ctx->temporary_directory_lock = std::move(lock); diff --git a/src/Storages/MergeTree/localBackup.cpp b/src/Storages/MergeTree/localBackup.cpp index 4c645a8628e..c84e13b167f 100644 --- a/src/Storages/MergeTree/localBackup.cpp +++ b/src/Storages/MergeTree/localBackup.cpp @@ -21,7 +21,8 @@ void localBackupImpl( IDiskTransaction * transaction, const String & source_path, const String & destination_path, - const WriteSettings & settings, + const ReadSettings & read_settings, + const WriteSettings & write_settings, bool make_source_readonly, size_t level, std::optional max_level, @@ -56,13 +57,9 @@ void localBackupImpl( if (copy_instead_of_hardlinks || files_to_copy_instead_of_hardlinks.contains(it->name())) { if (transaction) - { - transaction->copyFile(source, destination, settings); - } + transaction->copyFile(source, destination, read_settings, write_settings); else - { - disk->copyFile(source, *disk, destination, settings); - } + disk->copyFile(source, *disk, destination, read_settings, write_settings); } else { @@ -79,7 +76,8 @@ void localBackupImpl( transaction, source, destination, - settings, + read_settings, + write_settings, make_source_readonly, level + 1, max_level, @@ -129,7 +127,8 @@ void localBackup( const DiskPtr & disk, const String & source_path, const String & destination_path, - const WriteSettings & settings, + const ReadSettings & read_settings, + const WriteSettings & write_settings, bool make_source_readonly, std::optional max_level, bool copy_instead_of_hardlinks, @@ -160,7 +159,8 @@ void localBackup( disk_transaction.get(), source_path, destination_path, - settings, + read_settings, + write_settings, make_source_readonly, /* level= */ 0, max_level, @@ -170,7 +170,7 @@ void localBackup( else if (copy_instead_of_hardlinks) { CleanupOnFail cleanup([disk, destination_path]() { disk->removeRecursive(destination_path); }); - disk->copyDirectoryContent(source_path, disk, destination_path, settings); + disk->copyDirectoryContent(source_path, disk, destination_path, read_settings, write_settings); cleanup.success(); } else @@ -189,7 +189,8 @@ void localBackup( disk_transaction.get(), source_path, destination_path, - settings, + read_settings, + write_settings, make_source_readonly, /* level= */ 0, max_level, diff --git a/src/Storages/MergeTree/localBackup.h b/src/Storages/MergeTree/localBackup.h index d9b7f3e8b0c..3490db9726e 100644 --- a/src/Storages/MergeTree/localBackup.h +++ b/src/Storages/MergeTree/localBackup.h @@ -28,7 +28,8 @@ struct WriteSettings; const DiskPtr & disk, const String & source_path, const String & destination_path, - const WriteSettings & settings, + const ReadSettings & read_settings, + const WriteSettings & write_settings, bool make_source_readonly = true, std::optional max_level = {}, bool copy_instead_of_hardlinks = false, diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 22700712829..694ad9a49f8 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -2043,7 +2043,14 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con MergeTreePartInfo dst_part_info(partition_id, temp_index, temp_index, src_part->info.level); IDataPartStorage::ClonePartParams clone_params{.txn = local_context->getCurrentTransaction()}; - auto [dst_part, part_lock] = cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, my_metadata_snapshot, clone_params, local_context->getWriteSettings()); + auto [dst_part, part_lock] = cloneAndLoadDataPartOnSameDisk( + src_part, + TMP_PREFIX, + dst_part_info, + my_metadata_snapshot, + clone_params, + local_context->getReadSettings(), + local_context->getWriteSettings()); dst_parts.emplace_back(std::move(dst_part)); dst_parts_locks.emplace_back(std::move(part_lock)); } @@ -2142,7 +2149,15 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const MergeTreePartInfo dst_part_info(partition_id, temp_index, temp_index, src_part->info.level); IDataPartStorage::ClonePartParams clone_params{.txn = local_context->getCurrentTransaction()}; - auto [dst_part, part_lock] = dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, dest_metadata_snapshot, clone_params, local_context->getWriteSettings()); + auto [dst_part, part_lock] = dest_table_storage->cloneAndLoadDataPartOnSameDisk( + src_part, + TMP_PREFIX, + dst_part_info, + dest_metadata_snapshot, + clone_params, + local_context->getReadSettings(), + local_context->getWriteSettings() + ); dst_parts.emplace_back(std::move(dst_part)); dst_parts_locks.emplace_back(std::move(part_lock)); } diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 7c7e6dbd42c..276db7639b9 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -2473,7 +2473,13 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry) .metadata_version_to_write = metadata_snapshot->getMetadataVersion() }; auto [res_part, temporary_part_lock] = cloneAndLoadDataPartOnSameDisk( - part_desc->src_table_part, TMP_PREFIX + "clone_", part_desc->new_part_info, metadata_snapshot, clone_params, getContext()->getWriteSettings()); + part_desc->src_table_part, + TMP_PREFIX + "clone_", + part_desc->new_part_info, + metadata_snapshot, + clone_params, + getContext()->getReadSettings(), + getContext()->getWriteSettings()); part_desc->res_part = std::move(res_part); part_desc->temporary_part_lock = std::move(temporary_part_lock); } @@ -4568,7 +4574,14 @@ bool StorageReplicatedMergeTree::fetchPart( { chassert(!is_zero_copy_part(part_to_clone)); IDataPartStorage::ClonePartParams clone_params{ .keep_metadata_version = true }; - auto [cloned_part, lock] = cloneAndLoadDataPartOnSameDisk(part_to_clone, "tmp_clone_", part_info, metadata_snapshot, clone_params, getContext()->getWriteSettings()); + auto [cloned_part, lock] = cloneAndLoadDataPartOnSameDisk( + part_to_clone, + "tmp_clone_", + part_info, + metadata_snapshot, + clone_params, + getContext()->getReadSettings(), + getContext()->getWriteSettings()); part_directory_lock = std::move(lock); return cloned_part; }; @@ -7656,7 +7669,14 @@ void StorageReplicatedMergeTree::replacePartitionFrom( .copy_instead_of_hardlink = zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport(), .metadata_version_to_write = metadata_snapshot->getMetadataVersion() }; - auto [dst_part, part_lock] = cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, metadata_snapshot, clone_params, query_context->getWriteSettings()); + auto [dst_part, part_lock] = cloneAndLoadDataPartOnSameDisk( + src_part, + TMP_PREFIX, + dst_part_info, + metadata_snapshot, + clone_params, + query_context->getReadSettings(), + query_context->getWriteSettings()); src_parts.emplace_back(src_part); dst_parts.emplace_back(dst_part); dst_parts_locks.emplace_back(std::move(part_lock)); @@ -7896,7 +7916,14 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta .copy_instead_of_hardlink = zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport(), .metadata_version_to_write = dest_metadata_snapshot->getMetadataVersion() }; - auto [dst_part, dst_part_lock] = dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, dest_metadata_snapshot, clone_params, query_context->getWriteSettings()); + auto [dst_part, dst_part_lock] = dest_table_storage->cloneAndLoadDataPartOnSameDisk( + src_part, + TMP_PREFIX, + dst_part_info, + dest_metadata_snapshot, + clone_params, + query_context->getReadSettings(), + query_context->getWriteSettings()); src_parts.emplace_back(src_part); dst_parts.emplace_back(dst_part); diff --git a/tests/queries/0_stateless/02844_max_backup_bandwidth_s3.reference b/tests/queries/0_stateless/02844_max_backup_bandwidth_s3.reference new file mode 100644 index 00000000000..939eb45ce1b --- /dev/null +++ b/tests/queries/0_stateless/02844_max_backup_bandwidth_s3.reference @@ -0,0 +1,2 @@ +native_copy 0 +no_native_copy 1 diff --git a/tests/queries/0_stateless/02844_max_backup_bandwidth_s3.sh b/tests/queries/0_stateless/02844_max_backup_bandwidth_s3.sh new file mode 100755 index 00000000000..4650415c202 --- /dev/null +++ b/tests/queries/0_stateless/02844_max_backup_bandwidth_s3.sh @@ -0,0 +1,36 @@ +#!/usr/bin/env bash +# Tags: no-fasttest +# Tag: no-fasttest - requires S3 + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CUR_DIR"/../shell_config.sh + +$CLICKHOUSE_CLIENT -nm -q " + drop table if exists data; + create table data (key UInt64 CODEC(NONE)) engine=MergeTree() order by tuple() settings min_bytes_for_wide_part=1e9, disk='s3_disk'; + -- reading 1e6*8 bytes with 1M bandwith it should take (8-1)/1=7 seconds + insert into data select * from numbers(1e6); +" + +query_id=$(random_str 10) +$CLICKHOUSE_CLIENT --query_id "$query_id" -q "backup table data to S3(s3_conn, 'backups/$CLICKHOUSE_DATABASE/data/backup2') SETTINGS allow_s3_native_copy=1" --max_backup_bandwidth=1M > /dev/null +$CLICKHOUSE_CLIENT -nm -q " + SYSTEM FLUSH LOGS; + SELECT + 'native_copy', + query_duration_ms >= 7e3 + FROM system.query_log + WHERE current_database = '$CLICKHOUSE_DATABASE' AND query_id = '$query_id' AND type != 'QueryStart' +" + +query_id=$(random_str 10) +$CLICKHOUSE_CLIENT --query_id "$query_id" -q "backup table data to S3(s3_conn, 'backups/$CLICKHOUSE_DATABASE/data/backup3') SETTINGS allow_s3_native_copy=0" --max_backup_bandwidth=1M > /dev/null +$CLICKHOUSE_CLIENT -nm -q " + SYSTEM FLUSH LOGS; + SELECT + 'no_native_copy', + query_duration_ms >= 7e3 + FROM system.query_log + WHERE current_database = '$CLICKHOUSE_DATABASE' AND query_id = '$query_id' AND type != 'QueryStart' +" From 05a1c96258394b591e94601221bc0180305a27d6 Mon Sep 17 00:00:00 2001 From: Jordi Villar Date: Wed, 20 Sep 2023 23:00:25 +0200 Subject: [PATCH 6/7] Interval operator support plural literals --- src/Parsers/parseIntervalKind.cpp | 33 ++++++++++++------- ..._operator_support_plural_literal.reference | 16 +++++++++ ...terval_operator_support_plural_literal.sql | 16 +++++++++ 3 files changed, 54 insertions(+), 11 deletions(-) create mode 100644 tests/queries/0_stateless/02884_interval_operator_support_plural_literal.reference create mode 100644 tests/queries/0_stateless/02884_interval_operator_support_plural_literal.sql diff --git a/src/Parsers/parseIntervalKind.cpp b/src/Parsers/parseIntervalKind.cpp index 77c3178ae2b..fe052287083 100644 --- a/src/Parsers/parseIntervalKind.cpp +++ b/src/Parsers/parseIntervalKind.cpp @@ -7,77 +7,88 @@ namespace DB { bool parseIntervalKind(IParser::Pos & pos, Expected & expected, IntervalKind & result) { - if (ParserKeyword("NANOSECOND").ignore(pos, expected) || ParserKeyword("SQL_TSI_NANOSECOND").ignore(pos, expected) + if (ParserKeyword("NANOSECOND").ignore(pos, expected) || ParserKeyword("NANOSECONDS").ignore(pos, expected) + || ParserKeyword("SQL_TSI_NANOSECOND").ignore(pos, expected) || ParserKeyword("NS").ignore(pos, expected)) { result = IntervalKind::Nanosecond; return true; } - if (ParserKeyword("MICROSECOND").ignore(pos, expected) || ParserKeyword("SQL_TSI_MICROSECOND").ignore(pos, expected) + if (ParserKeyword("MICROSECOND").ignore(pos, expected) || ParserKeyword("MICROSECONDS").ignore(pos, expected) + || ParserKeyword("SQL_TSI_MICROSECOND").ignore(pos, expected) || ParserKeyword("MCS").ignore(pos, expected)) { result = IntervalKind::Microsecond; return true; } - if (ParserKeyword("MILLISECOND").ignore(pos, expected) || ParserKeyword("SQL_TSI_MILLISECOND").ignore(pos, expected) + if (ParserKeyword("MILLISECOND").ignore(pos, expected) || ParserKeyword("MILLISECONDS").ignore(pos, expected) + || ParserKeyword("SQL_TSI_MILLISECOND").ignore(pos, expected) || ParserKeyword("MS").ignore(pos, expected)) { result = IntervalKind::Millisecond; return true; } - if (ParserKeyword("SECOND").ignore(pos, expected) || ParserKeyword("SQL_TSI_SECOND").ignore(pos, expected) + if (ParserKeyword("SECOND").ignore(pos, expected) || ParserKeyword("SECONDS").ignore(pos, expected) + || ParserKeyword("SQL_TSI_SECOND").ignore(pos, expected) || ParserKeyword("SS").ignore(pos, expected) || ParserKeyword("S").ignore(pos, expected)) { result = IntervalKind::Second; return true; } - if (ParserKeyword("MINUTE").ignore(pos, expected) || ParserKeyword("SQL_TSI_MINUTE").ignore(pos, expected) + if (ParserKeyword("MINUTE").ignore(pos, expected) || ParserKeyword("MINUTES").ignore(pos, expected) + || ParserKeyword("SQL_TSI_MINUTE").ignore(pos, expected) || ParserKeyword("MI").ignore(pos, expected) || ParserKeyword("N").ignore(pos, expected)) { result = IntervalKind::Minute; return true; } - if (ParserKeyword("HOUR").ignore(pos, expected) || ParserKeyword("SQL_TSI_HOUR").ignore(pos, expected) + if (ParserKeyword("HOUR").ignore(pos, expected) || ParserKeyword("HOURS").ignore(pos, expected) + || ParserKeyword("SQL_TSI_HOUR").ignore(pos, expected) || ParserKeyword("HH").ignore(pos, expected) || ParserKeyword("H").ignore(pos, expected)) { result = IntervalKind::Hour; return true; } - if (ParserKeyword("DAY").ignore(pos, expected) || ParserKeyword("SQL_TSI_DAY").ignore(pos, expected) + if (ParserKeyword("DAY").ignore(pos, expected) || ParserKeyword("DAYS").ignore(pos, expected) + || ParserKeyword("SQL_TSI_DAY").ignore(pos, expected) || ParserKeyword("DD").ignore(pos, expected) || ParserKeyword("D").ignore(pos, expected)) { result = IntervalKind::Day; return true; } - if (ParserKeyword("WEEK").ignore(pos, expected) || ParserKeyword("SQL_TSI_WEEK").ignore(pos, expected) + if (ParserKeyword("WEEK").ignore(pos, expected) || ParserKeyword("WEEKS").ignore(pos, expected) + || ParserKeyword("SQL_TSI_WEEK").ignore(pos, expected) || ParserKeyword("WK").ignore(pos, expected) || ParserKeyword("WW").ignore(pos, expected)) { result = IntervalKind::Week; return true; } - if (ParserKeyword("MONTH").ignore(pos, expected) || ParserKeyword("SQL_TSI_MONTH").ignore(pos, expected) + if (ParserKeyword("MONTH").ignore(pos, expected) || ParserKeyword("MONTHS").ignore(pos, expected) + || ParserKeyword("SQL_TSI_MONTH").ignore(pos, expected) || ParserKeyword("MM").ignore(pos, expected) || ParserKeyword("M").ignore(pos, expected)) { result = IntervalKind::Month; return true; } - if (ParserKeyword("QUARTER").ignore(pos, expected) || ParserKeyword("SQL_TSI_QUARTER").ignore(pos, expected) + if (ParserKeyword("QUARTER").ignore(pos, expected) || ParserKeyword("QUARTERS").ignore(pos, expected) + || ParserKeyword("SQL_TSI_QUARTER").ignore(pos, expected) || ParserKeyword("QQ").ignore(pos, expected) || ParserKeyword("Q").ignore(pos, expected)) { result = IntervalKind::Quarter; return true; } - if (ParserKeyword("YEAR").ignore(pos, expected) || ParserKeyword("SQL_TSI_YEAR").ignore(pos, expected) + if (ParserKeyword("YEAR").ignore(pos, expected) || ParserKeyword("YEARS").ignore(pos, expected) + || ParserKeyword("SQL_TSI_YEAR").ignore(pos, expected) || ParserKeyword("YYYY").ignore(pos, expected) || ParserKeyword("YY").ignore(pos, expected)) { result = IntervalKind::Year; diff --git a/tests/queries/0_stateless/02884_interval_operator_support_plural_literal.reference b/tests/queries/0_stateless/02884_interval_operator_support_plural_literal.reference new file mode 100644 index 00000000000..4f1d0bdcd49 --- /dev/null +++ b/tests/queries/0_stateless/02884_interval_operator_support_plural_literal.reference @@ -0,0 +1,16 @@ +2 +2 +2 +2 +2 +2 +2 +2 +2 +2 +2 +2 +2009-02-14 01:31:30 +2009-02-14 01:31:30 +2009-02-15 23:31:30 +2009-02-15 23:31:30 diff --git a/tests/queries/0_stateless/02884_interval_operator_support_plural_literal.sql b/tests/queries/0_stateless/02884_interval_operator_support_plural_literal.sql new file mode 100644 index 00000000000..dd5fc9eca45 --- /dev/null +++ b/tests/queries/0_stateless/02884_interval_operator_support_plural_literal.sql @@ -0,0 +1,16 @@ +SELECT INTERVAL 2 year; +SELECT INTERVAL 2 years; +SELECT INTERVAL 2 month; +SELECT INTERVAL 2 months; +SELECT INTERVAL 2 week; +SELECT INTERVAL 2 weeks; +SELECT INTERVAL 2 day; +SELECT INTERVAL 2 days; +SELECT INTERVAL 2 hour; +SELECT INTERVAL 2 hours; +SELECT INTERVAL 2 minute; +SELECT INTERVAL 2 minutes; +SELECT DATE_ADD(hour, 2, toDateTime(1234567890, 'UTC')); +SELECT DATE_ADD(hours, 2, toDateTime(1234567890, 'UTC')); +SELECT DATE_ADD(toDateTime(1234567890, 'UTC'), INTERVAL 2 day); +SELECT DATE_ADD(toDateTime(1234567890, 'UTC'), INTERVAL 2 days); From 0518931bbd1e76e195fd798fa6d66a3a9f828cc3 Mon Sep 17 00:00:00 2001 From: Jordi Villar Date: Wed, 20 Sep 2023 23:16:36 +0200 Subject: [PATCH 7/7] Add more tests --- ...2884_interval_operator_support_plural_literal.reference | 7 +++++++ .../02884_interval_operator_support_plural_literal.sql | 7 +++++++ 2 files changed, 14 insertions(+) diff --git a/tests/queries/0_stateless/02884_interval_operator_support_plural_literal.reference b/tests/queries/0_stateless/02884_interval_operator_support_plural_literal.reference index 4f1d0bdcd49..9616b4c1415 100644 --- a/tests/queries/0_stateless/02884_interval_operator_support_plural_literal.reference +++ b/tests/queries/0_stateless/02884_interval_operator_support_plural_literal.reference @@ -10,7 +10,14 @@ 2 2 2 +2 +2 +2 +2 +2 +2 2009-02-14 01:31:30 2009-02-14 01:31:30 2009-02-15 23:31:30 2009-02-15 23:31:30 +2009-02-15 23:31:30 diff --git a/tests/queries/0_stateless/02884_interval_operator_support_plural_literal.sql b/tests/queries/0_stateless/02884_interval_operator_support_plural_literal.sql index dd5fc9eca45..41403cdf72e 100644 --- a/tests/queries/0_stateless/02884_interval_operator_support_plural_literal.sql +++ b/tests/queries/0_stateless/02884_interval_operator_support_plural_literal.sql @@ -1,16 +1,23 @@ SELECT INTERVAL 2 year; SELECT INTERVAL 2 years; +SELECT INTERVAL '2 years'; SELECT INTERVAL 2 month; SELECT INTERVAL 2 months; +SELECT INTERVAL '2 months'; SELECT INTERVAL 2 week; SELECT INTERVAL 2 weeks; +SELECT INTERVAL '2 weeks'; SELECT INTERVAL 2 day; SELECT INTERVAL 2 days; +SELECT INTERVAL '2 days'; SELECT INTERVAL 2 hour; SELECT INTERVAL 2 hours; +SELECT INTERVAL '2 hours'; SELECT INTERVAL 2 minute; SELECT INTERVAL 2 minutes; +SELECT INTERVAL '2 minutes'; SELECT DATE_ADD(hour, 2, toDateTime(1234567890, 'UTC')); SELECT DATE_ADD(hours, 2, toDateTime(1234567890, 'UTC')); SELECT DATE_ADD(toDateTime(1234567890, 'UTC'), INTERVAL 2 day); SELECT DATE_ADD(toDateTime(1234567890, 'UTC'), INTERVAL 2 days); +SELECT DATE_ADD(toDateTime(1234567890, 'UTC'), INTERVAL '2 days');