From 218b1f9c293a9b4320cee1ec13a255b1a4b37a75 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Thu, 30 Mar 2023 19:06:49 +0200 Subject: [PATCH] Add ability to throttle BACKUPs on per-server/backup basis Server settings: - backup_read_bandwidth_for_server - backup_write_bandwidth_for_server Query settings: - backup_read_bandwidth - backup_write_bandwidth Signed-off-by: Azat Khuzhin --- src/Backups/BackupEntryFromAppendOnlyFile.cpp | 3 +- src/Backups/BackupEntryFromAppendOnlyFile.h | 1 + src/Backups/BackupEntryFromImmutableFile.cpp | 10 +++- src/Backups/BackupEntryFromImmutableFile.h | 3 + src/Backups/BackupIO.cpp | 5 +- src/Backups/BackupIO.h | 4 +- src/Backups/BackupIO_Disk.cpp | 6 +- src/Backups/BackupIO_File.cpp | 2 +- src/Backups/BackupIO_S3.cpp | 6 +- src/Backups/BackupIO_S3.h | 2 +- src/Backups/BackupImpl.cpp | 21 ++++--- src/Backups/BackupImpl.h | 3 +- src/Core/ServerSettings.h | 2 + src/Core/Settings.h | 2 + src/Disks/IDisk.cpp | 5 +- src/Disks/IDisk.h | 4 +- src/Interpreters/Context.cpp | 59 +++++++++++++++++++ src/Interpreters/Context.h | 9 +++ .../MergeTree/DataPartStorageOnDiskBase.cpp | 4 +- .../MergeTree/DataPartStorageOnDiskBase.h | 1 + src/Storages/MergeTree/IDataPartStorage.h | 1 + src/Storages/MergeTree/MergeTreeData.cpp | 16 +---- src/Storages/MergeTree/MergeTreeData.h | 3 - src/Storages/StorageLog.cpp | 9 ++- src/Storages/StorageMemory.cpp | 21 +++++-- src/Storages/StorageStripeLog.cpp | 9 ++- tests/config/config.d/backups.xml | 6 ++ tests/config/install.sh | 1 + .../02704_backup_read_bandwidth.reference | 1 + .../02704_backup_read_bandwidth.sh | 25 ++++++++ .../02704_backup_write_bandwidth.reference | 1 + .../02704_backup_write_bandwidth.sh | 25 ++++++++ 32 files changed, 217 insertions(+), 53 deletions(-) create mode 100644 tests/config/config.d/backups.xml create mode 100644 tests/queries/0_stateless/02704_backup_read_bandwidth.reference create mode 100755 tests/queries/0_stateless/02704_backup_read_bandwidth.sh create mode 100644 
tests/queries/0_stateless/02704_backup_write_bandwidth.reference create mode 100755 tests/queries/0_stateless/02704_backup_write_bandwidth.sh diff --git a/src/Backups/BackupEntryFromAppendOnlyFile.cpp b/src/Backups/BackupEntryFromAppendOnlyFile.cpp index 9bab101bc35..5384a69d890 100644 --- a/src/Backups/BackupEntryFromAppendOnlyFile.cpp +++ b/src/Backups/BackupEntryFromAppendOnlyFile.cpp @@ -8,10 +8,11 @@ namespace DB BackupEntryFromAppendOnlyFile::BackupEntryFromAppendOnlyFile( const DiskPtr & disk_, const String & file_path_, + const ReadSettings & settings_, const std::optional & file_size_, const std::optional & checksum_, const std::shared_ptr & temporary_file_) - : BackupEntryFromImmutableFile(disk_, file_path_, file_size_, checksum_, temporary_file_) + : BackupEntryFromImmutableFile(disk_, file_path_, settings_, file_size_, checksum_, temporary_file_) , limit(BackupEntryFromImmutableFile::getSize()) { } diff --git a/src/Backups/BackupEntryFromAppendOnlyFile.h b/src/Backups/BackupEntryFromAppendOnlyFile.h index c6055b86268..b0cee38c6be 100644 --- a/src/Backups/BackupEntryFromAppendOnlyFile.h +++ b/src/Backups/BackupEntryFromAppendOnlyFile.h @@ -16,6 +16,7 @@ public: BackupEntryFromAppendOnlyFile( const DiskPtr & disk_, const String & file_path_, + const ReadSettings & settings_, const std::optional & file_size_ = {}, const std::optional & checksum_ = {}, const std::shared_ptr & temporary_file_ = {}); diff --git a/src/Backups/BackupEntryFromImmutableFile.cpp b/src/Backups/BackupEntryFromImmutableFile.cpp index 86b9c13fb9a..48783a3bb63 100644 --- a/src/Backups/BackupEntryFromImmutableFile.cpp +++ b/src/Backups/BackupEntryFromImmutableFile.cpp @@ -11,10 +11,16 @@ namespace DB BackupEntryFromImmutableFile::BackupEntryFromImmutableFile( const DiskPtr & disk_, const String & file_path_, + const ReadSettings & settings_, const std::optional & file_size_, const std::optional & checksum_, const std::shared_ptr & temporary_file_) - : disk(disk_), file_path(file_path_), 
file_size(file_size_), checksum(checksum_), temporary_file_on_disk(temporary_file_) + : disk(disk_) + , file_path(file_path_) + , settings(settings_) + , file_size(file_size_) + , checksum(checksum_) + , temporary_file_on_disk(temporary_file_) { } @@ -30,7 +36,7 @@ UInt64 BackupEntryFromImmutableFile::getSize() const std::unique_ptr BackupEntryFromImmutableFile::getReadBuffer() const { - return disk->readFile(file_path); + return disk->readFile(file_path, settings); } diff --git a/src/Backups/BackupEntryFromImmutableFile.h b/src/Backups/BackupEntryFromImmutableFile.h index 99241c691cb..66f1fade294 100644 --- a/src/Backups/BackupEntryFromImmutableFile.h +++ b/src/Backups/BackupEntryFromImmutableFile.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include @@ -19,6 +20,7 @@ public: BackupEntryFromImmutableFile( const DiskPtr & disk_, const String & file_path_, + const ReadSettings & settings_, const std::optional & file_size_ = {}, const std::optional & checksum_ = {}, const std::shared_ptr & temporary_file_ = {}); @@ -37,6 +39,7 @@ public: private: const DiskPtr disk; const String file_path; + ReadSettings settings; mutable std::optional file_size TSA_GUARDED_BY(get_file_size_mutex); mutable std::mutex get_file_size_mutex; const std::optional checksum; diff --git a/src/Backups/BackupIO.cpp b/src/Backups/BackupIO.cpp index cc252c2f1bd..a1d854f6b7c 100644 --- a/src/Backups/BackupIO.cpp +++ b/src/Backups/BackupIO.cpp @@ -22,13 +22,14 @@ void IBackupReader::copyFileToDisk(const String & file_name, size_t size, DiskPt write_buffer->finalize(); } -void IBackupWriter::copyDataToFile(const CreateReadBufferFunction & create_read_buffer, UInt64 offset, UInt64 size, const String & dest_file_name) +void IBackupWriter::copyDataToFile(const CreateReadBufferFunction & create_read_buffer, UInt64 offset, UInt64 size, const String & dest_file_name, ThrottlerPtr throttler) { auto read_buffer = create_read_buffer(); if (offset) read_buffer->seek(offset, SEEK_SET); auto 
write_buffer = writeFile(dest_file_name); - copyData(*read_buffer, *write_buffer, size); + std::atomic cancelled; + copyDataWithThrottler(*read_buffer, *write_buffer, size, cancelled, throttler); write_buffer->finalize(); } diff --git a/src/Backups/BackupIO.h b/src/Backups/BackupIO.h index cf3d29ee51e..d6a7eb0dd74 100644 --- a/src/Backups/BackupIO.h +++ b/src/Backups/BackupIO.h @@ -3,6 +3,7 @@ #include #include #include +#include namespace DB { @@ -36,8 +37,9 @@ public: virtual void removeFile(const String & file_name) = 0; virtual void removeFiles(const Strings & file_names) = 0; virtual DataSourceDescription getDataSourceDescription() const = 0; - virtual void copyDataToFile(const CreateReadBufferFunction & create_read_buffer, UInt64 offset, UInt64 size, const String & dest_file_name); + virtual void copyDataToFile(const CreateReadBufferFunction & create_read_buffer, UInt64 offset, UInt64 size, const String & dest_file_name, ThrottlerPtr throttler); virtual bool supportNativeCopy(DataSourceDescription /* data_source_description */) const { return false; } + // Ignore throttling, copyDataToFile() should be used if throttling was requested. 
virtual void copyFileNative(DiskPtr src_disk, const String & src_file_name, UInt64 src_offset, UInt64 src_size, const String & dest_file_name); }; diff --git a/src/Backups/BackupIO_Disk.cpp b/src/Backups/BackupIO_Disk.cpp index cc6076541d0..41f28965420 100644 --- a/src/Backups/BackupIO_Disk.cpp +++ b/src/Backups/BackupIO_Disk.cpp @@ -50,7 +50,9 @@ void BackupReaderDisk::copyFileToDisk(const String & file_name, size_t size, Dis } -BackupWriterDisk::BackupWriterDisk(const DiskPtr & disk_, const String & path_) : disk(disk_), path(path_) +BackupWriterDisk::BackupWriterDisk(const DiskPtr & disk_, const String & path_) + : disk(disk_) + , path(path_) { } @@ -130,7 +132,7 @@ void BackupWriterDisk::copyFileNative(DiskPtr src_disk, const String & src_file_ if ((src_offset != 0) || (src_size != src_disk->getFileSize(src_file_name))) { auto create_read_buffer = [src_disk, src_file_name] { return src_disk->readFile(src_file_name); }; - copyDataToFile(create_read_buffer, src_offset, src_size, dest_file_name); + copyDataToFile(create_read_buffer, src_offset, src_size, dest_file_name, /* throttler= */ {}); return; } diff --git a/src/Backups/BackupIO_File.cpp b/src/Backups/BackupIO_File.cpp index 5bf6d54928d..e754275197b 100644 --- a/src/Backups/BackupIO_File.cpp +++ b/src/Backups/BackupIO_File.cpp @@ -155,7 +155,7 @@ void BackupWriterFile::copyFileNative(DiskPtr src_disk, const String & src_file_ if ((src_offset != 0) || (src_size != fs::file_size(abs_source_path))) { auto create_read_buffer = [abs_source_path] { return createReadBufferFromFileBase(abs_source_path, {}); }; - copyDataToFile(create_read_buffer, src_offset, src_size, dest_file_name); + copyDataToFile(create_read_buffer, src_offset, src_size, dest_file_name, /* throttler= */ {}); return; } diff --git a/src/Backups/BackupIO_S3.cpp b/src/Backups/BackupIO_S3.cpp index f7d518b064d..f703fa4615c 100644 --- a/src/Backups/BackupIO_S3.cpp +++ b/src/Backups/BackupIO_S3.cpp @@ -190,7 +190,7 @@ void 
BackupWriterS3::copyFileNative(DiskPtr src_disk, const String & src_file_na if (objects.size() > 1) { auto create_read_buffer = [src_disk, src_file_name] { return src_disk->readFile(src_file_name); }; - copyDataToFile(create_read_buffer, src_offset, src_size, dest_file_name); + copyDataToFile(create_read_buffer, src_offset, src_size, dest_file_name, /* throttler= */ {}); } else { @@ -203,8 +203,10 @@ void BackupWriterS3::copyFileNative(DiskPtr src_disk, const String & src_file_na } void BackupWriterS3::copyDataToFile( - const CreateReadBufferFunction & create_read_buffer, UInt64 offset, UInt64 size, const String & dest_file_name) + const CreateReadBufferFunction & create_read_buffer, UInt64 offset, UInt64 size, const String & dest_file_name, ThrottlerPtr throttler) { + // FIXME: throttling is not applied to S3 copies yet, the throttler argument is ignored here. + (void)throttler; copyDataToS3File(create_read_buffer, offset, size, client, s3_uri.bucket, fs::path(s3_uri.key) / dest_file_name, request_settings, {}, threadPoolCallbackRunner(BackupsIOThreadPool::get(), "BackupWriterS3")); } diff --git a/src/Backups/BackupIO_S3.h b/src/Backups/BackupIO_S3.h index 94e61248428..16663c01340 100644 --- a/src/Backups/BackupIO_S3.h +++ b/src/Backups/BackupIO_S3.h @@ -46,7 +46,7 @@ public: bool fileContentsEqual(const String & file_name, const String & expected_file_contents) override; std::unique_ptr writeFile(const String & file_name) override; - void copyDataToFile(const CreateReadBufferFunction & create_read_buffer, UInt64 offset, UInt64 size, const String & dest_file_name) override; + void copyDataToFile(const CreateReadBufferFunction & create_read_buffer, UInt64 offset, UInt64 size, const String & dest_file_name, ThrottlerPtr throttler) override; void removeFile(const String & file_name) override; void removeFiles(const Strings & file_names) override; diff --git a/src/Backups/BackupImpl.cpp b/src/Backups/BackupImpl.cpp index 0ab1bf7f997..c15c1fd9db7 100644 --- a/src/Backups/BackupImpl.cpp +++ b/src/Backups/BackupImpl.cpp @@ -81,7 +81,8 @@ 
BackupImpl::BackupImpl( const std::optional & base_backup_info_, std::shared_ptr reader_, const ContextPtr & context_) - : backup_name_for_logging(backup_name_for_logging_) + : context(context_) + , backup_name_for_logging(backup_name_for_logging_) , use_archive(!archive_params_.archive_name.empty()) , archive_params(archive_params_) , open_mode(OpenMode::READ) @@ -90,7 +91,7 @@ BackupImpl::BackupImpl( , version(INITIAL_BACKUP_VERSION) , base_backup_info(base_backup_info_) { - open(context_); + open(); } @@ -104,7 +105,8 @@ BackupImpl::BackupImpl( const std::shared_ptr & coordination_, const std::optional & backup_uuid_, bool deduplicate_files_) - : backup_name_for_logging(backup_name_for_logging_) + : context(context_) + , backup_name_for_logging(backup_name_for_logging_) , use_archive(!archive_params_.archive_name.empty()) , archive_params(archive_params_) , open_mode(OpenMode::WRITE) @@ -117,7 +119,7 @@ BackupImpl::BackupImpl( , deduplicate_files(deduplicate_files_) , log(&Poco::Logger::get("BackupImpl")) { - open(context_); + open(); } @@ -133,7 +135,7 @@ BackupImpl::~BackupImpl() } } -void BackupImpl::open(const ContextPtr & context) +void BackupImpl::open() { std::lock_guard lock{mutex}; @@ -832,9 +834,11 @@ void BackupImpl::writeFile(const BackupFileInfo & info, BackupEntryPtr entry) auto writer_description = writer->getDataSourceDescription(); auto reader_description = entry->getDataSourceDescription(); + bool has_throttler = context->getBackupsReadThrottler() || context->getBackupsWriteThrottler(); + /// We need to copy whole file without archive, we can do it faster /// if source and destination are compatible - if (!use_archive && writer->supportNativeCopy(reader_description)) + if (!use_archive && !has_throttler && writer->supportNativeCopy(reader_description)) { /// Should be much faster than writing data through server. 
LOG_TRACE(log, "Will copy file {} using native copy", info.data_file_name); @@ -860,7 +864,8 @@ void BackupImpl::writeFile(const BackupFileInfo & info, BackupEntryPtr entry) auto read_buffer = entry->getReadBuffer(); if (info.base_size != 0) read_buffer->seek(info.base_size, SEEK_SET); - copyData(*read_buffer, *out); + std::atomic cancelled; + copyDataWithThrottler(*read_buffer, *out, cancelled, context->getBackupsWriteThrottler()); out->finalize(); } else @@ -869,7 +874,7 @@ void BackupImpl::writeFile(const BackupFileInfo & info, BackupEntryPtr entry) auto create_read_buffer = [entry] { return entry->getReadBuffer(); }; /// NOTE: `mutex` must be unlocked here otherwise writing will be in one thread maximum and hence slow. - writer->copyDataToFile(create_read_buffer, info.base_size, info.size - info.base_size, info.data_file_name); + writer->copyDataToFile(create_read_buffer, info.base_size, info.size - info.base_size, info.data_file_name, context->getBackupsWriteThrottler()); } } diff --git a/src/Backups/BackupImpl.h b/src/Backups/BackupImpl.h index bf94926c46c..a70f16f411c 100644 --- a/src/Backups/BackupImpl.h +++ b/src/Backups/BackupImpl.h @@ -85,7 +85,7 @@ public: bool supportsWritingInMultipleThreads() const override { return !use_archive; } private: - void open(const ContextPtr & context); + void open(); void close(); void openArchive(); @@ -109,6 +109,7 @@ private: /// Calculates and sets `compressed_size`. 
void setCompressedSize(); + ContextPtr context; const String backup_name_for_logging; const bool use_archive; const ArchiveParams archive_params; diff --git a/src/Core/ServerSettings.h b/src/Core/ServerSettings.h index 36c5a6c6be8..56d79f73b09 100644 --- a/src/Core/ServerSettings.h +++ b/src/Core/ServerSettings.h @@ -29,6 +29,8 @@ namespace DB M(UInt64, max_backups_io_thread_pool_free_size, 0, "Max free size for backups IO thread pool.", 0) \ M(UInt64, backups_io_thread_pool_queue_size, 0, "Queue size for backups IO thread pool.", 0) \ M(UInt64, backup_threads, 16, "The maximum number of threads to execute BACKUP requests.", 0) \ + M(UInt64, backup_read_bandwidth_for_server, 0, "The maximum read speed in bytes per second for all backups on server. Zero means unlimited.", 0) \ + M(UInt64, backup_write_bandwidth_for_server, 0, "The maximum write speed in bytes per second for all backups on server. Zero means unlimited.", 0) \ M(UInt64, restore_threads, 16, "The maximum number of threads to execute RESTORE requests.", 0) \ M(Int32, max_connections, 1024, "Max server connections.", 0) \ M(UInt32, asynchronous_metrics_update_period_s, 1, "Period in seconds for updating asynchronous metrics.", 0) \ diff --git a/src/Core/Settings.h b/src/Core/Settings.h index f1127916ca5..4c4f65c6e5f 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -424,6 +424,8 @@ class IColumn; M(UInt64, backup_restore_keeper_fault_injection_seed, 0, "0 - random seed, otherwise the setting value", 0) \ M(UInt64, backup_restore_keeper_value_max_size, 1048576, "Maximum size of data of a [Zoo]Keeper's node during backup", 0) \ M(UInt64, backup_restore_batch_size_for_keeper_multiread, 10000, "Maximum size of batch for multiread request to [Zoo]Keeper during backup or restore", 0) \ + M(UInt64, backup_read_bandwidth, 0, "The maximum read speed in bytes per second for particular backup on server. 
Zero means unlimited.", 0) \ + M(UInt64, backup_write_bandwidth, 0, "The maximum write speed in bytes per second for particular backup on server. Zero means unlimited.", 0) \ \ M(Bool, log_profile_events, true, "Log query performance statistics into the query_log, query_thread_log and query_views_log.", 0) \ M(Bool, log_query_settings, true, "Log query settings into the query_log.", 0) \ diff --git a/src/Disks/IDisk.cpp b/src/Disks/IDisk.cpp index 4969cc7c700..e966633a43f 100644 --- a/src/Disks/IDisk.cpp +++ b/src/Disks/IDisk.cpp @@ -27,14 +27,15 @@ bool IDisk::isDirectoryEmpty(const String & path) const return !iterateDirectory(path)->isValid(); } -void IDisk::copyFile(const String & from_file_path, IDisk & to_disk, const String & to_file_path, const WriteSettings & settings) /// NOLINT +void IDisk::copyFile(const String & from_file_path, IDisk & to_disk, const String & to_file_path, const WriteSettings & settings, ThrottlerPtr throttler) /// NOLINT { LOG_DEBUG(&Poco::Logger::get("IDisk"), "Copying from {} (path: {}) {} to {} (path: {}) {}.", getName(), getPath(), from_file_path, to_disk.getName(), to_disk.getPath(), to_file_path); auto in = readFile(from_file_path); auto out = to_disk.writeFile(to_file_path, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, settings); - copyData(*in, *out); + std::atomic cancelled; + copyDataWithThrottler(*in, *out, cancelled, throttler); out->finalize(); } diff --git a/src/Disks/IDisk.h b/src/Disks/IDisk.h index 4e488bbb39a..37b1a41b3cc 100644 --- a/src/Disks/IDisk.h +++ b/src/Disks/IDisk.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -190,7 +191,8 @@ public: const String & from_file_path, IDisk & to_disk, const String & to_file_path, - const WriteSettings & settings = {}); + const WriteSettings & settings = {}, + ThrottlerPtr throttler = {}); /// List files at `path` and add their names to `file_names` virtual void listFiles(const String & path, std::vector & file_names) const = 0; diff --git 
a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index f45f2ddc3a1..920f5b89e8c 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -284,6 +284,9 @@ struct ContextSharedPart : boost::noncopyable mutable ThrottlerPtr local_read_throttler; /// A server-wide throttler for local IO reads mutable ThrottlerPtr local_write_throttler; /// A server-wide throttler for local IO writes + mutable ThrottlerPtr backups_read_server_throttler; /// A server-wide throttler for backups reads + mutable ThrottlerPtr backups_write_server_throttler; /// A server-wide throttler for backups writes + MultiVersion macros; /// Substitutions extracted from config. std::unique_ptr ddl_worker; /// Process ddl commands from zk. /// Rules for selecting the compression settings, depending on the size of the part. @@ -2447,6 +2450,54 @@ ThrottlerPtr Context::getLocalWriteThrottler() const return throttler; } +ThrottlerPtr Context::getBackupsReadThrottler() const +{ + ThrottlerPtr throttler; + + if (shared->server_settings.backup_read_bandwidth_for_server) + { + auto lock = getLock(); + if (!shared->backups_read_server_throttler) + shared->backups_read_server_throttler = std::make_shared(shared->server_settings.backup_read_bandwidth_for_server); + throttler = shared->backups_read_server_throttler; + } + + const auto & query_settings = getSettingsRef(); + if (query_settings.backup_read_bandwidth) + { + auto lock = getLock(); + if (!backups_read_query_throttler) + backups_read_query_throttler = std::make_shared(query_settings.backup_read_bandwidth, throttler); + throttler = backups_read_query_throttler; + } + + return throttler; +} + +ThrottlerPtr Context::getBackupsWriteThrottler() const +{ + ThrottlerPtr throttler; + + if (shared->server_settings.backup_write_bandwidth_for_server) + { + auto lock = getLock(); + if (!shared->backups_write_server_throttler) + shared->backups_write_server_throttler = 
std::make_shared(shared->server_settings.backup_write_bandwidth_for_server); + throttler = shared->backups_write_server_throttler; + } + + const auto & query_settings = getSettingsRef(); + if (query_settings.backup_write_bandwidth) + { + auto lock = getLock(); + if (!backups_write_query_throttler) + backups_write_query_throttler = std::make_shared(query_settings.backup_write_bandwidth, throttler); + throttler = backups_write_query_throttler; + } + + return throttler; +} + bool Context::hasDistributedDDL() const { return getConfigRef().has("distributed_ddl"); @@ -4167,6 +4218,14 @@ ReadSettings Context::getReadSettings() const return res; } +ReadSettings Context::getBackupReadSettings() const +{ + ReadSettings settings = getReadSettings(); + settings.remote_throttler = getBackupsReadThrottler(); + settings.local_throttler = getBackupsReadThrottler(); + return settings; +} + WriteSettings Context::getWriteSettings() const { WriteSettings res; diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 2418d830b2a..c1d64b6813e 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -1109,6 +1109,9 @@ public: /** Get settings for reading from filesystem. */ ReadSettings getReadSettings() const; + /** Get settings for reading from filesystem for BACKUPs. */ + ReadSettings getBackupReadSettings() const; + /** Get settings for writing to filesystem. 
*/ WriteSettings getWriteSettings() const; @@ -1157,12 +1160,18 @@ public: ThrottlerPtr getLocalReadThrottler() const; ThrottlerPtr getLocalWriteThrottler() const; + ThrottlerPtr getBackupsReadThrottler() const; + ThrottlerPtr getBackupsWriteThrottler() const; + private: mutable ThrottlerPtr remote_read_query_throttler; /// A query-wide throttler for remote IO reads mutable ThrottlerPtr remote_write_query_throttler; /// A query-wide throttler for remote IO writes mutable ThrottlerPtr local_read_query_throttler; /// A query-wide throttler for local IO reads mutable ThrottlerPtr local_write_query_throttler; /// A query-wide throttler for local IO writes + + mutable ThrottlerPtr backups_read_query_throttler; /// A query-wide throttler for backups reads + mutable ThrottlerPtr backups_write_query_throttler; /// A query-wide throttler for backups writes }; struct HTTPContext : public IHTTPContext diff --git a/src/Storages/MergeTree/DataPartStorageOnDiskBase.cpp b/src/Storages/MergeTree/DataPartStorageOnDiskBase.cpp index 175df9b6e28..9b601d9f3af 100644 --- a/src/Storages/MergeTree/DataPartStorageOnDiskBase.cpp +++ b/src/Storages/MergeTree/DataPartStorageOnDiskBase.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -311,6 +312,7 @@ DataPartStorageOnDiskBase::getReplicatedFilesDescriptionForRemoteDisk(const Name } void DataPartStorageOnDiskBase::backup( + const ReadSettings & read_settings, const MergeTreeDataPartChecksums & checksums, const NameSet & files_without_checksums, const String & path_in_backup, @@ -386,7 +388,7 @@ void DataPartStorageOnDiskBase::backup( backup_entries.emplace_back( filepath_in_backup, - std::make_unique(disk, filepath_on_disk, file_size, file_hash, temp_dir_owner)); + std::make_unique(disk, filepath_on_disk, read_settings, file_size, file_hash, temp_dir_owner)); } } diff --git a/src/Storages/MergeTree/DataPartStorageOnDiskBase.h b/src/Storages/MergeTree/DataPartStorageOnDiskBase.h index 
7c408dcf381..11806e25a1e 100644 --- a/src/Storages/MergeTree/DataPartStorageOnDiskBase.h +++ b/src/Storages/MergeTree/DataPartStorageOnDiskBase.h @@ -49,6 +49,7 @@ public: ReplicatedFilesDescription getReplicatedFilesDescriptionForRemoteDisk(const NameSet & file_names) const override; void backup( + const ReadSettings & read_settings, const MergeTreeDataPartChecksums & checksums, const NameSet & files_without_checksums, const String & path_in_backup, diff --git a/src/Storages/MergeTree/IDataPartStorage.h b/src/Storages/MergeTree/IDataPartStorage.h index f92784cb0da..4d7212eb17b 100644 --- a/src/Storages/MergeTree/IDataPartStorage.h +++ b/src/Storages/MergeTree/IDataPartStorage.h @@ -197,6 +197,7 @@ public: /// Also creates a new tmp_dir for internal disk (if disk is mentioned the first time). using TemporaryFilesOnDisks = std::map>; virtual void backup( + const ReadSettings & read_settings, const MergeTreeDataPartChecksums & checksums, const NameSet & files_without_checksums, const String & path_in_backup, diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 66c52e6e24c..694f7b8d86e 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -4869,24 +4869,12 @@ Pipe MergeTreeData::alterPartition( } -void MergeTreeData::backupData(BackupEntriesCollector & backup_entries_collector, const String & data_path_in_backup, const std::optional & partitions) -{ - auto local_context = backup_entries_collector.getContext(); - - DataPartsVector data_parts; - if (partitions) - data_parts = getVisibleDataPartsVectorInPartitions(local_context, getPartitionIDsFromQuery(*partitions, local_context)); - else - data_parts = getVisibleDataPartsVector(local_context); - - backup_entries_collector.addBackupEntries(backupParts(data_parts, data_path_in_backup, local_context)); -} - BackupEntries MergeTreeData::backupParts(const DataPartsVector & data_parts, const String & data_path_in_backup, const 
ContextPtr & local_context) { BackupEntries backup_entries; std::map> temp_dirs; TableLockHolder table_lock; + ReadSettings read_settings = local_context->getBackupReadSettings(); for (const auto & part : data_parts) { @@ -4916,6 +4904,7 @@ BackupEntries MergeTreeData::backupParts(const DataPartsVector & data_parts, con BackupEntries backup_entries_from_part; part->getDataPartStorage().backup( + read_settings, part->checksums, part->getFileNamesWithoutChecksums(), data_path_in_backup, @@ -4927,6 +4916,7 @@ BackupEntries MergeTreeData::backupParts(const DataPartsVector & data_parts, con for (const auto & [projection_name, projection_part] : projection_parts) { projection_part->getDataPartStorage().backup( + read_settings, projection_part->checksums, projection_part->getFileNamesWithoutChecksums(), fs::path{data_path_in_backup} / part->name, diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 0be932ccdaf..8312efa216d 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -751,9 +751,6 @@ public: ContextPtr context, TableLockHolder & table_lock_holder); - /// Makes backup entries to backup the data of the storage. - void backupData(BackupEntriesCollector & backup_entries_collector, const String & data_path_in_backup, const std::optional & partitions) override; - /// Extract data from the backup and put it to the storage. 
void restoreDataFromBackup(RestorerFromBackup & restorer, const String & data_path_in_backup, const std::optional & partitions) override; diff --git a/src/Storages/StorageLog.cpp b/src/Storages/StorageLog.cpp index 772ed34b7a9..8264d67aaba 100644 --- a/src/Storages/StorageLog.cpp +++ b/src/Storages/StorageLog.cpp @@ -926,7 +926,10 @@ std::optional StorageLog::totalBytes(const Settings &) const void StorageLog::backupData(BackupEntriesCollector & backup_entries_collector, const String & data_path_in_backup, const std::optional & /* partitions */) { - auto lock_timeout = getLockTimeout(backup_entries_collector.getContext()); + auto local_context = backup_entries_collector.getContext(); + ReadSettings read_settings = local_context->getBackupReadSettings(); + + auto lock_timeout = getLockTimeout(local_context); loadMarks(lock_timeout); ReadLock lock{rwlock, lock_timeout}; @@ -951,7 +954,7 @@ void StorageLog::backupData(BackupEntriesCollector & backup_entries_collector, c backup_entries_collector.addBackupEntry( data_path_in_backup_fs / data_file_name, std::make_unique( - disk, hardlink_file_path, file_checker.getFileSize(data_file.path), std::nullopt, temp_dir_owner)); + disk, hardlink_file_path, read_settings, file_checker.getFileSize(data_file.path), std::nullopt, temp_dir_owner)); } /// __marks.mrk @@ -964,7 +967,7 @@ void StorageLog::backupData(BackupEntriesCollector & backup_entries_collector, c backup_entries_collector.addBackupEntry( data_path_in_backup_fs / marks_file_name, std::make_unique( - disk, hardlink_file_path, file_checker.getFileSize(marks_file_path), std::nullopt, temp_dir_owner)); + disk, hardlink_file_path, read_settings, file_checker.getFileSize(marks_file_path), std::nullopt, temp_dir_owner)); } /// sizes.json diff --git a/src/Storages/StorageMemory.cpp b/src/Storages/StorageMemory.cpp index 11688582877..c568178a469 100644 --- a/src/Storages/StorageMemory.cpp +++ b/src/Storages/StorageMemory.cpp @@ -290,12 +290,14 @@ namespace { public: 
MemoryBackup( + ContextPtr context_, const StorageMetadataPtr & metadata_snapshot_, const std::shared_ptr blocks_, const String & data_path_in_backup, const DiskPtr & temp_disk_, UInt64 max_compress_block_size_) - : metadata_snapshot(metadata_snapshot_) + : context(context_) + , metadata_snapshot(metadata_snapshot_) , blocks(blocks_) , temp_disk(temp_disk_) , max_compress_block_size(max_compress_block_size_) @@ -326,6 +328,8 @@ namespace BackupEntries generate() override { + ReadSettings read_settings = context->getBackupReadSettings(); + BackupEntries backup_entries; backup_entries.resize(file_paths.size()); @@ -342,7 +346,7 @@ namespace NativeWriter block_out{data_out, 0, metadata_snapshot->getSampleBlock(), false, &index}; for (const auto & block : *blocks) block_out.write(block); - backup_entries[data_bin_pos] = {file_paths[data_bin_pos], std::make_shared(temp_disk, data_file_path)}; + backup_entries[data_bin_pos] = {file_paths[data_bin_pos], std::make_shared(temp_disk, data_file_path, read_settings)}; } /// Writing index.mrk @@ -351,7 +355,7 @@ namespace auto index_mrk_out_compressed = temp_disk->writeFile(index_mrk_path); CompressedWriteBuffer index_mrk_out{*index_mrk_out_compressed}; index.write(index_mrk_out); - backup_entries[index_mrk_pos] = {file_paths[index_mrk_pos], std::make_shared(temp_disk, index_mrk_path)}; + backup_entries[index_mrk_pos] = {file_paths[index_mrk_pos], std::make_shared(temp_disk, index_mrk_path, read_settings)}; } /// Writing columns.txt @@ -389,6 +393,7 @@ namespace return backup_entries; } + ContextPtr context; StorageMetadataPtr metadata_snapshot; std::shared_ptr blocks; DiskPtr temp_disk; @@ -403,9 +408,13 @@ void StorageMemory::backupData(BackupEntriesCollector & backup_entries_collector { auto temp_disk = backup_entries_collector.getContext()->getTemporaryVolume()->getDisk(0); auto max_compress_block_size = backup_entries_collector.getContext()->getSettingsRef().max_compress_block_size; - 
backup_entries_collector.addBackupEntries( - std::make_shared(getInMemoryMetadataPtr(), data.get(), data_path_in_backup, temp_disk, max_compress_block_size) - ->getBackupEntries()); + backup_entries_collector.addBackupEntries(std::make_shared( + backup_entries_collector.getContext(), + getInMemoryMetadataPtr(), + data.get(), + data_path_in_backup, + temp_disk, + max_compress_block_size)->getBackupEntries()); } void StorageMemory::restoreDataFromBackup(RestorerFromBackup & restorer, const String & data_path_in_backup, const std::optional & /* partitions */) diff --git a/src/Storages/StorageStripeLog.cpp b/src/Storages/StorageStripeLog.cpp index 30585250be2..d54725b8b39 100644 --- a/src/Storages/StorageStripeLog.cpp +++ b/src/Storages/StorageStripeLog.cpp @@ -527,7 +527,10 @@ std::optional StorageStripeLog::totalBytes(const Settings &) const void StorageStripeLog::backupData(BackupEntriesCollector & backup_entries_collector, const String & data_path_in_backup, const std::optional & /* partitions */) { - auto lock_timeout = getLockTimeout(backup_entries_collector.getContext()); + auto local_context = backup_entries_collector.getContext(); + ReadSettings read_settings = local_context->getBackupReadSettings(); + + auto lock_timeout = getLockTimeout(local_context); loadIndices(lock_timeout); ReadLock lock{rwlock, lock_timeout}; @@ -551,7 +554,7 @@ void StorageStripeLog::backupData(BackupEntriesCollector & backup_entries_collec backup_entries_collector.addBackupEntry( data_path_in_backup_fs / data_file_name, std::make_unique( - disk, hardlink_file_path, file_checker.getFileSize(data_file_path), std::nullopt, temp_dir_owner)); + disk, hardlink_file_path, read_settings, file_checker.getFileSize(data_file_path), std::nullopt, temp_dir_owner)); } /// index.mrk @@ -563,7 +566,7 @@ void StorageStripeLog::backupData(BackupEntriesCollector & backup_entries_collec backup_entries_collector.addBackupEntry( data_path_in_backup_fs / index_file_name, std::make_unique( - disk, 
hardlink_file_path, file_checker.getFileSize(index_file_path), std::nullopt, temp_dir_owner)); + disk, hardlink_file_path, read_settings, file_checker.getFileSize(index_file_path), std::nullopt, temp_dir_owner)); } /// sizes.json diff --git a/tests/config/config.d/backups.xml b/tests/config/config.d/backups.xml new file mode 100644 index 00000000000..48f7a256233 --- /dev/null +++ b/tests/config/config.d/backups.xml @@ -0,0 +1,6 @@ + + + default + /backups + + diff --git a/tests/config/install.sh b/tests/config/install.sh index 44eab0e4db0..77e8a8460ad 100755 --- a/tests/config/install.sh +++ b/tests/config/install.sh @@ -56,6 +56,7 @@ ln -sf $SRC_PATH/config.d/display_name.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/reverse_dns_query_function.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/compressed_marks_and_index.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/disable_s3_env_credentials.xml $DEST_SERVER_PATH/config.d/ +ln -sf $SRC_PATH/config.d/backups.xml $DEST_SERVER_PATH/config.d/ # Not supported with fasttest. if [ "${DEST_SERVER_PATH}" = "/etc/clickhouse-server" ] diff --git a/tests/queries/0_stateless/02704_backup_read_bandwidth.reference b/tests/queries/0_stateless/02704_backup_read_bandwidth.reference new file mode 100644 index 00000000000..9972842f982 --- /dev/null +++ b/tests/queries/0_stateless/02704_backup_read_bandwidth.reference @@ -0,0 +1 @@ +1 1 diff --git a/tests/queries/0_stateless/02704_backup_read_bandwidth.sh b/tests/queries/0_stateless/02704_backup_read_bandwidth.sh new file mode 100755 index 00000000000..9c2708af614 --- /dev/null +++ b/tests/queries/0_stateless/02704_backup_read_bandwidth.sh @@ -0,0 +1,25 @@ +#!/usr/bin/env bash +# Tags: no-s3-storage, no-random-settings, no-random-merge-tree-settings + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +.
"$CUR_DIR"/../shell_config.sh + +$CLICKHOUSE_CLIENT -nm -q " + drop table if exists data; + create table data (key UInt64 CODEC(NONE)) engine=MergeTree() order by tuple() settings min_bytes_for_wide_part=1e9, min_bytes_for_compact_part=0; +" + +# reading 1e6*8 bytes with 1M bandwidth it should take (8-1)/1=7 seconds +$CLICKHOUSE_CLIENT -q "insert into data select * from numbers(1e6)" + +query_id=$(random_str 10) +$CLICKHOUSE_CLIENT --query_id "$query_id" -q "backup table data to Disk('default', 'backups/$CLICKHOUSE_DATABASE/data/backup1')" --backup_read_bandwidth=1M > /dev/null +$CLICKHOUSE_CLIENT -nm -q " + SYSTEM FLUSH LOGS; + SELECT + query_duration_ms >= 7e3, + ProfileEvents['ReadBufferFromFileDescriptorReadBytes'] > 8e6 + FROM system.query_log + WHERE current_database = '$CLICKHOUSE_DATABASE' AND query_id = '$query_id' AND type != 'QueryStart' +" diff --git a/tests/queries/0_stateless/02704_backup_write_bandwidth.reference b/tests/queries/0_stateless/02704_backup_write_bandwidth.reference new file mode 100644 index 00000000000..9972842f982 --- /dev/null +++ b/tests/queries/0_stateless/02704_backup_write_bandwidth.reference @@ -0,0 +1 @@ +1 1 diff --git a/tests/queries/0_stateless/02704_backup_write_bandwidth.sh b/tests/queries/0_stateless/02704_backup_write_bandwidth.sh new file mode 100755 index 00000000000..163bc955fb3 --- /dev/null +++ b/tests/queries/0_stateless/02704_backup_write_bandwidth.sh @@ -0,0 +1,25 @@ +#!/usr/bin/env bash +# Tags: no-s3-storage, no-random-settings, no-random-merge-tree-settings + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +.
"$CUR_DIR"/../shell_config.sh + +$CLICKHOUSE_CLIENT -nm -q " + drop table if exists data; + create table data (key UInt64 CODEC(NONE)) engine=MergeTree() order by tuple() settings min_bytes_for_wide_part=1e9, min_bytes_for_compact_part=0; +" + +# 1e6*8 bytes with 1M bandwidth it should take (8-1)/1=7 seconds +$CLICKHOUSE_CLIENT -q "insert into data select * from numbers(1e6)" + +query_id=$(random_str 10) +$CLICKHOUSE_CLIENT --query_id "$query_id" -q "backup table data to Disk('default', 'backups/$CLICKHOUSE_DATABASE/data/backup1')" --backup_write_bandwidth=1M > /dev/null +$CLICKHOUSE_CLIENT -nm -q " + SYSTEM FLUSH LOGS; + SELECT + query_duration_ms >= 7e3, + ProfileEvents['ReadBufferFromFileDescriptorReadBytes'] > 8e6 + FROM system.query_log + WHERE current_database = '$CLICKHOUSE_DATABASE' AND query_id = '$query_id' AND type != 'QueryStart' +"