From 42b2383e0192afefbaf42da937e293af9c238f99 Mon Sep 17 00:00:00 2001
From: alesapin
Date: Wed, 26 May 2021 23:37:44 +0300
Subject: [PATCH] Basic throttler implementation

---
 src/Core/Settings.h                          |  2 +
 src/IO/copyData.cpp                          | 33 +++++++++++----
 src/IO/copyData.h                            |  6 +++
 src/Interpreters/Context.cpp                 | 25 ++++++++++++
 src/Interpreters/Context.h                   |  6 +++
 src/Storages/MergeTree/DataPartsExchange.cpp | 43 +++++++++++++-------
 src/Storages/MergeTree/DataPartsExchange.h   | 23 +++++++----
 src/Storages/MergeTree/MergeTreeSettings.h   |  2 +
 src/Storages/StorageReplicatedMergeTree.cpp  | 12 ++++--
 src/Storages/StorageReplicatedMergeTree.h    | 14 +++++++
 10 files changed, 133 insertions(+), 33 deletions(-)

diff --git a/src/Core/Settings.h b/src/Core/Settings.h
index d759e87dfbc..1a723df2d48 100644
--- a/src/Core/Settings.h
+++ b/src/Core/Settings.h
@@ -83,6 +83,8 @@ class IColumn;
     M(UInt64, background_schedule_pool_size, 16, "Number of threads performing background tasks for replicated tables, dns cache updates. Only has meaning at server startup.", 0) \
     M(UInt64, background_message_broker_schedule_pool_size, 16, "Number of threads performing background tasks for message streaming. Only has meaning at server startup.", 0) \
     M(UInt64, background_distributed_schedule_pool_size, 16, "Number of threads performing background tasks for distributed sends. Only has meaning at server startup.", 0) \
+    M(UInt64, max_replicated_fetches_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for replicated fetches. Zero means unlimited. Only has meaning at server startup.", 0) \
+    M(UInt64, max_replicated_sends_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for replicated sends. Zero means unlimited. Only has meaning at server startup.", 0) \
     \
     M(Milliseconds, distributed_directory_monitor_sleep_time_ms, 100, "Sleep time for StorageDistributed DirectoryMonitors, in case of any errors delay grows exponentially.", 0) \
     M(Milliseconds, distributed_directory_monitor_max_sleep_time_ms, 30000, "Maximum sleep time for StorageDistributed DirectoryMonitors, it limits exponential growth too.", 0) \
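Both settings feed a server-wide Throttler instance, created lazily in the Context.cpp hunk further down. The pacing idea behind Throttler::add(), account transferred bytes and sleep whenever the transfer runs ahead of the configured bytes-per-second budget, can be sketched in a few lines. The class below is an illustration of that technique only, not the actual Common/Throttler.h implementation (the real class additionally supports a parent throttler and hard limits):

    // Standalone sketch of the pacing idea behind a byte-rate throttler.
    #include <chrono>
    #include <cstddef>
    #include <mutex>
    #include <thread>

    class SimpleThrottler
    {
    public:
        explicit SimpleThrottler(size_t max_speed_) : max_speed(max_speed_) {}

        /// Account `amount` transferred bytes; sleep if the transfer is ahead of budget.
        void add(size_t amount)
        {
            std::lock_guard<std::mutex> lock(mutex);
            auto now = std::chrono::steady_clock::now();
            if (count == 0)
                start = now;
            count += amount;

            if (max_speed == 0)  /// Zero means unlimited: only account the bytes.
                return;

            double elapsed = std::chrono::duration<double>(now - start).count();
            double expected = static_cast<double>(count) / max_speed;  /// Seconds this much data should take.
            if (expected > elapsed)
                std::this_thread::sleep_for(std::chrono::duration<double>(expected - elapsed));
        }

    private:
        std::mutex mutex;
        const size_t max_speed;  /// Bytes per second, 0 = unlimited.
        size_t count = 0;        /// Bytes accounted so far.
        std::chrono::steady_clock::time_point start;
    };

With a zero limit the add() call returns after a cheap counter update, which is why "Zero means unlimited" is a safe default on the hot path.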
diff --git a/src/IO/copyData.cpp b/src/IO/copyData.cpp
index c653af761d4..428741f2a47 100644
--- a/src/IO/copyData.cpp
+++ b/src/IO/copyData.cpp
@@ -1,4 +1,5 @@
 #include <Common/Exception.h>
+#include <Common/Throttler.h>
 #include <IO/ReadBuffer.h>
 #include <IO/WriteBuffer.h>
 #include <IO/copyData.h>
@@ -14,7 +15,7 @@ namespace ErrorCodes
 namespace
 {
 
-void copyDataImpl(ReadBuffer & from, WriteBuffer & to, bool check_bytes, size_t bytes, const std::atomic<int> * is_cancelled)
+void copyDataImpl(ReadBuffer & from, WriteBuffer & to, bool check_bytes, size_t bytes, const std::atomic<int> * is_cancelled, ThrottlerPtr throttler)
 {
     /// If read to the end of the buffer, eof() either fills the buffer with new data and moves the cursor to the beginning, or returns false.
     while (bytes > 0 && !from.eof())
@@ -27,13 +28,16 @@ void copyDataImpl(ReadBuffer & from, WriteBuffer & to, bool check_bytes, size_t
         to.write(from.position(), count);
         from.position() += count;
         bytes -= count;
+
+        if (throttler)
+            throttler->add(count);
     }
 
     if (check_bytes && bytes > 0)
         throw Exception("Attempt to read after EOF.", ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF);
 }
 
-void copyDataImpl(ReadBuffer & from, WriteBuffer & to, bool check_bytes, size_t bytes, std::function<void()> cancellation_hook)
+void copyDataImpl(ReadBuffer & from, WriteBuffer & to, bool check_bytes, size_t bytes, std::function<void()> cancellation_hook, ThrottlerPtr throttler)
 {
     /// If read to the end of the buffer, eof() either fills the buffer with new data and moves the cursor to the beginning, or returns false.
     while (bytes > 0 && !from.eof())
@@ -46,6 +50,9 @@ void copyDataImpl(ReadBuffer & from, WriteBuffer & to, bool check_bytes, size_t
         to.write(from.position(), count);
         from.position() += count;
         bytes -= count;
+
+        if (throttler)
+            throttler->add(count);
     }
 
     if (check_bytes && bytes > 0)
@@ -56,32 +63,42 @@ void copyDataImpl(ReadBuffer & from, WriteBuffer & to, bool check_bytes, size_t
 
 void copyData(ReadBuffer & from, WriteBuffer & to)
 {
-    copyDataImpl(from, to, false, std::numeric_limits<size_t>::max(), nullptr);
+    copyDataImpl(from, to, false, std::numeric_limits<size_t>::max(), nullptr, nullptr);
 }
 
 void copyData(ReadBuffer & from, WriteBuffer & to, const std::atomic<int> & is_cancelled)
 {
-    copyDataImpl(from, to, false, std::numeric_limits<size_t>::max(), &is_cancelled);
+    copyDataImpl(from, to, false, std::numeric_limits<size_t>::max(), &is_cancelled, nullptr);
 }
 
 void copyData(ReadBuffer & from, WriteBuffer & to, std::function<void()> cancellation_hook)
 {
-    copyDataImpl(from, to, false, std::numeric_limits<size_t>::max(), cancellation_hook);
+    copyDataImpl(from, to, false, std::numeric_limits<size_t>::max(), cancellation_hook, nullptr);
 }
 
 void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes)
 {
-    copyDataImpl(from, to, true, bytes, nullptr);
+    copyDataImpl(from, to, true, bytes, nullptr, nullptr);
 }
 
 void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, const std::atomic<int> & is_cancelled)
 {
-    copyDataImpl(from, to, true, bytes, &is_cancelled);
+    copyDataImpl(from, to, true, bytes, &is_cancelled, nullptr);
 }
 
 void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, std::function<void()> cancellation_hook)
 {
-    copyDataImpl(from, to, true, bytes, cancellation_hook);
+    copyDataImpl(from, to, true, bytes, cancellation_hook, nullptr);
+}
+
+void copyDataWithThrottler(ReadBuffer & from, WriteBuffer & to, const std::atomic<int> & is_cancelled, ThrottlerPtr throttler)
+{
+    copyDataImpl(from, to, false, std::numeric_limits<size_t>::max(), &is_cancelled, throttler);
+}
+
+void copyDataWithThrottler(ReadBuffer & from, WriteBuffer & to, size_t bytes, const std::atomic<int> & is_cancelled, ThrottlerPtr throttler)
+{
+    copyDataImpl(from, to, true, bytes, &is_cancelled, throttler);
+}
 
 }
diff --git a/src/IO/copyData.h b/src/IO/copyData.h
index f888a039e9e..56244167861 100644
--- a/src/IO/copyData.h
+++ b/src/IO/copyData.h
@@ -9,6 +9,9 @@ namespace DB
 
 class ReadBuffer;
 class WriteBuffer;
+class Throttler;
+
+using ThrottlerPtr = std::shared_ptr<Throttler>;
 
 
 /** Copies data from ReadBuffer to WriteBuffer, all that is.
@@ -24,6 +27,9 @@ void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes);
 void copyData(ReadBuffer & from, WriteBuffer & to, const std::atomic<int> & is_cancelled);
 void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, const std::atomic<int> & is_cancelled);
 
+void copyDataWithThrottler(ReadBuffer & from, WriteBuffer & to, const std::atomic<int> & is_cancelled, ThrottlerPtr throttler);
+void copyDataWithThrottler(ReadBuffer & from, WriteBuffer & to, size_t bytes, const std::atomic<int> & is_cancelled, ThrottlerPtr throttler);
+
 void copyData(ReadBuffer & from, WriteBuffer & to, std::function<void()> cancellation_hook);
 void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, std::function<void()> cancellation_hook);
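The two copyDataWithThrottler() overloads mirror the existing copyData() signatures, with the throttler threaded through into the copy loop; the unbounded overload passes check_bytes = false, matching the other unbounded copyData() variants. A hypothetical call site (the helper name and the 10 MB/s constant are invented for the example, the rest is the API added above):

    #include <atomic>
    #include <cstddef>
    #include <memory>
    #include <Common/Throttler.h>
    #include <IO/copyData.h>

    /// Illustrative helper: copy `bytes` from `in` to `out` at no more than
    /// 10 MB/s, honouring the usual cancellation flag.
    void copyWithBandwidthCap(DB::ReadBuffer & in, DB::WriteBuffer & out, size_t bytes,
                              const std::atomic<int> & is_cancelled)
    {
        auto throttler = std::make_shared<DB::Throttler>(10 * 1024 * 1024);  /// bytes per second
        DB::copyDataWithThrottler(in, out, bytes, is_cancelled, throttler);
    }

Existing callers are untouched: the plain copyData() overloads now simply forward a nullptr throttler, keeping the unthrottled behaviour.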
diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp
index 7d9c2c27ee2..f1cf89dfaf6 100644
--- a/src/Interpreters/Context.cpp
+++ b/src/Interpreters/Context.cpp
@@ -11,6 +11,7 @@
 #include <Common/setThreadName.h>
 #include <Common/Stopwatch.h>
 #include <Common/formatReadable.h>
+#include <Common/Throttler.h>
 #include <Common/thread_local_rng.h>
 #include <Common/FieldVisitors.h>
 #include <Common/Macros.h>
@@ -360,6 +361,10 @@ struct ContextSharedPart
     mutable std::optional<BackgroundSchedulePool> schedule_pool;                /// A thread pool that can run different jobs in background (used in replicated tables)
     mutable std::optional<BackgroundSchedulePool> distributed_schedule_pool;    /// A thread pool that can run different jobs in background (used for distributed sends)
     mutable std::optional<BackgroundSchedulePool> message_broker_schedule_pool; /// A thread pool that can run different jobs in background (used for message brokers, like RabbitMQ and Kafka)
+
+    mutable ThrottlerPtr replicated_fetches_throttler;    /// A server-wide throttler for replicated fetches
+    mutable ThrottlerPtr replicated_sends_throttler;      /// A server-wide throttler for replicated sends
+
     MultiVersion<Macros> macros;                          /// Substitutions extracted from config.
     std::unique_ptr<DDLWorker> ddl_worker;                /// Process ddl commands from zk.
     /// Rules for selecting the compression settings, depending on the size of the part.
@@ -1625,6 +1630,26 @@ BackgroundSchedulePool & Context::getMessageBrokerSchedulePool() const
     return *shared->message_broker_schedule_pool;
 }
 
+ThrottlerPtr Context::getReplicatedFetchesThrottler() const
+{
+    auto lock = getLock();
+    if (!shared->replicated_fetches_throttler)
+        shared->replicated_fetches_throttler = std::make_shared<Throttler>(
+            settings.max_replicated_fetches_network_bandwidth_for_server);
+
+    return shared->replicated_fetches_throttler;
+}
+
+ThrottlerPtr Context::getReplicatedSendsThrottler() const
+{
+    auto lock = getLock();
+    if (!shared->replicated_sends_throttler)
+        shared->replicated_sends_throttler = std::make_shared<Throttler>(
+            settings.max_replicated_sends_network_bandwidth_for_server);
+
+    return shared->replicated_sends_throttler;
+}
+
 bool Context::hasDistributedDDL() const
 {
     return getConfigRef().has("distributed_ddl");
diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h
index 5089d2c0288..4fca9639a50 100644
--- a/src/Interpreters/Context.h
+++ b/src/Interpreters/Context.h
@@ -113,6 +113,9 @@ using VolumePtr = std::shared_ptr<IVolume>;
 struct NamedSession;
 struct BackgroundTaskSchedulingSettings;
 
+class Throttler;
+using ThrottlerPtr = std::shared_ptr<Throttler>;
+
 class ZooKeeperMetadataTransaction;
 using ZooKeeperMetadataTransactionPtr = std::shared_ptr<ZooKeeperMetadataTransaction>;
 
@@ -657,6 +660,9 @@ public:
     BackgroundSchedulePool & getMessageBrokerSchedulePool() const;
     BackgroundSchedulePool & getDistributedSchedulePool() const;
 
+    ThrottlerPtr getReplicatedFetchesThrottler() const;
+    ThrottlerPtr getReplicatedSendsThrottler() const;
+
     /// Has distributed_ddl configuration or not.
     bool hasDistributedDDL() const;
 
     void setDDLWorker(std::unique_ptr<DDLWorker> ddl_worker);
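Both getters create their throttler lazily under the Context lock and then keep handing out the same shared instance, so every replicated table ends up feeding one server-wide limiter and the *_for_server settings cap aggregate traffic. A sketch of the intended sharing (assuming a ContextPtr named context):

    #include <cassert>

    auto a = context->getReplicatedFetchesThrottler();
    auto b = context->getReplicatedFetchesThrottler();
    assert(a == b);  /// One server-wide limiter, shared by every fetch.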
diff --git a/src/Storages/MergeTree/DataPartsExchange.cpp b/src/Storages/MergeTree/DataPartsExchange.cpp
index 5ccb2ca8c27..9eeac69978f 100644
--- a/src/Storages/MergeTree/DataPartsExchange.cpp
+++ b/src/Storages/MergeTree/DataPartsExchange.cpp
@@ -9,6 +9,7 @@
 #include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
 #include <Storages/MergeTree/MergedBlockOutputStream.h>
 #include <Storages/MergeTree/ReplicatedFetchList.h>
+#include <Storages/StorageReplicatedMergeTree.h>
 #include <Common/CurrentMetrics.h>
 #include <Common/NetException.h>
 #include <IO/createReadBufferFromFileBase.h>
@@ -86,6 +87,10 @@ struct ReplicatedFetchReadCallback
 
 }
 
+
+Service::Service(StorageReplicatedMergeTree & data_) :
+    data(data_), log(&Poco::Logger::get(data.getLogName() + " (Replicated PartsService)")) {}
+
 std::string Service::getId(const std::string & node_id) const
 {
     return getEndpointId(node_id);
@@ -243,6 +248,8 @@ void Service::sendPartFromMemory(
     NativeBlockOutputStream block_out(out, 0, metadata_snapshot->getSampleBlock());
     part->checksums.write(out);
     block_out.write(part_in_memory->block);
+
+    data.getSendsThrottler()->add(part_in_memory->block.bytes());
 }
 
 MergeTreeData::DataPart::Checksums Service::sendPartFromDisk(
@@ -298,7 +305,7 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDisk(
 
         auto file_in = disk->readFile(path);
         HashingWriteBuffer hashing_out(out);
-        copyData(*file_in, hashing_out, blocker.getCounter());
+        copyDataWithThrottler(*file_in, hashing_out, blocker.getCounter(), data.getSendsThrottler());
 
         if (blocker.isCancelled())
             throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED);
@@ -354,7 +361,7 @@ void Service::sendPartS3Metadata(const MergeTreeData::DataPartPtr & part, WriteBuffer & out)
 
         auto file_in = createReadBufferFromFileBase(metadata_file, 0, 0, 0, nullptr, DBMS_DEFAULT_BUFFER_SIZE);
         HashingWriteBuffer hashing_out(out);
-        copyData(*file_in, hashing_out, blocker.getCounter());
+        copyDataWithThrottler(*file_in, hashing_out, blocker.getCounter(), data.getSendsThrottler());
 
         if (blocker.isCancelled())
             throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED);
@@ -388,6 +395,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
     const String & user,
     const String & password,
     const String & interserver_scheme,
+    ThrottlerPtr throttler,
     bool to_detached,
     const String & tmp_prefix_,
     std::optional<CurrentlySubmergingEmergingTagger> * tagger_ptr,
@@ -514,7 +522,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
 
         try
         {
-            return downloadPartToS3(part_name, replica_path, to_detached, tmp_prefix_, std::move(disks_s3), in);
+            return downloadPartToS3(part_name, replica_path, to_detached, tmp_prefix_, std::move(disks_s3), in, throttler);
         }
         catch (const Exception & e)
         {
@@ -522,7 +530,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
                 throw;
             /// Try again but without S3 copy
             return fetchPart(metadata_snapshot, context, part_name, replica_path, host, port, timeouts,
-                user, password, interserver_scheme, to_detached, tmp_prefix_, nullptr, false);
+                user, password, interserver_scheme, throttler, to_detached, tmp_prefix_, nullptr, false);
         }
     }
 
@@ -585,8 +593,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
 
     MergeTreeData::DataPart::Checksums checksums;
     return part_type == "InMemory"
-        ? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, context, std::move(reservation), in, projections)
-        : downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix_, sync, reservation->getDisk(), in, projections, checksums);
+        ? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, context, std::move(reservation), in, projections, throttler)
+        : downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix_, sync, reservation->getDisk(), in, projections, checksums, throttler);
 }
 
 MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
@@ -596,7 +604,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
     ContextPtr context,
     ReservationPtr reservation,
     PooledReadWriteBufferFromHTTP & in,
-    size_t projections)
+    size_t projections,
+    ThrottlerPtr throttler)
 {
     auto volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, reservation->getDisk(), 0);
     MergeTreeData::MutableDataPartPtr new_data_part =
@@ -612,6 +621,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
 
             NativeBlockInputStream block_in(in, 0);
             auto block = block_in.read();
+            throttler->add(block.bytes());
 
             MergeTreePartInfo new_part_info("all", 0, 0, 0);
             MergeTreeData::MutableDataPartPtr new_projection_part =
@@ -643,6 +653,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
 
     NativeBlockInputStream block_in(in, 0);
     auto block = block_in.read();
+    throttler->add(block.bytes());
 
     new_data_part->uuid = part_uuid;
     new_data_part->is_temp = true;
@@ -666,7 +677,8 @@ void Fetcher::downloadBaseOrProjectionPartToDisk(
     bool sync,
     DiskPtr disk,
     PooledReadWriteBufferFromHTTP & in,
-    MergeTreeData::DataPart::Checksums & checksums) const
+    MergeTreeData::DataPart::Checksums & checksums,
+    ThrottlerPtr throttler) const
 {
     size_t files;
     readBinary(files, in);
@@ -689,7 +701,7 @@ void Fetcher::downloadBaseOrProjectionPartToDisk(
 
         auto file_out = disk->writeFile(part_download_path + file_name);
         HashingWriteBuffer hashing_out(*file_out);
-        copyData(in, hashing_out, file_size, blocker.getCounter());
+        copyDataWithThrottler(in, hashing_out, file_size, blocker.getCounter(), throttler);
 
         if (blocker.isCancelled())
         {
@@ -726,7 +738,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
     DiskPtr disk,
     PooledReadWriteBufferFromHTTP & in,
     size_t projections,
-    MergeTreeData::DataPart::Checksums & checksums)
+    MergeTreeData::DataPart::Checksums & checksums,
+    ThrottlerPtr throttler)
 {
     static const String TMP_PREFIX = "tmp_fetch_";
     String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;
@@ -763,13 +776,13 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
             MergeTreeData::DataPart::Checksums projection_checksum;
             disk->createDirectories(part_download_path + projection_name + ".proj/");
             downloadBaseOrProjectionPartToDisk(
-                replica_path, part_download_path + projection_name + ".proj/", sync, disk, in, projection_checksum);
+                replica_path, part_download_path + projection_name + ".proj/", sync, disk, in, projection_checksum, throttler);
             checksums.addFile(
                 projection_name + ".proj", projection_checksum.getTotalSizeOnDisk(), projection_checksum.getTotalChecksumUInt128());
         }
 
         // Download the base part
-        downloadBaseOrProjectionPartToDisk(replica_path, part_download_path, sync, disk, in, checksums);
+        downloadBaseOrProjectionPartToDisk(replica_path, part_download_path, sync, disk, in, checksums, throttler);
 
         assertEOF(in);
         auto volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, disk, 0);
@@ -787,8 +800,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToS3(
     bool to_detached,
     const String & tmp_prefix_,
     const Disks & disks_s3,
-    PooledReadWriteBufferFromHTTP & in
-    )
+    PooledReadWriteBufferFromHTTP & in,
+    ThrottlerPtr throttler)
 {
     if (disks_s3.empty())
         throw Exception("No S3 disks anymore", ErrorCodes::LOGICAL_ERROR);
@@ -841,7 +854,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToS3(
 
             HashingWriteBuffer hashing_out(*file_out);
 
-            copyData(in, hashing_out, file_size, blocker.getCounter());
+            copyDataWithThrottler(in, hashing_out, file_size, blocker.getCounter(), throttler);
 
             if (blocker.isCancelled())
             {
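Two accounting patterns appear in this file. Streaming paths (sendPartFromDisk, sendPartS3Metadata and the download loops) charge the throttler chunk by chunk inside the copy loop, while the in-memory part paths serialize a whole block at once and charge its size after the fact. Side by side, both lines taken from the hunks above:

    /// Streaming: charged incrementally, chunk by chunk, inside copyDataImpl().
    copyDataWithThrottler(*file_in, hashing_out, blocker.getCounter(), data.getSendsThrottler());

    /// In-memory parts: one block, charged once with its full byte size.
    data.getSendsThrottler()->add(part_in_memory->block.bytes());

An after-the-fact add() cannot slow down the block that was just transferred, but it delays the next one, so a sustained stream of in-memory parts still converges to the configured rate.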
diff --git a/src/Storages/MergeTree/DataPartsExchange.h b/src/Storages/MergeTree/DataPartsExchange.h
index f59942ef7f4..eb776c33f0f 100644
--- a/src/Storages/MergeTree/DataPartsExchange.h
+++ b/src/Storages/MergeTree/DataPartsExchange.h
@@ -7,6 +7,7 @@
 #include <IO/HashingWriteBuffer.h>
 #include <IO/copyData.h>
 #include <IO/ConnectionTimeouts.h>
+#include <Common/Throttler.h>
 
 
 namespace zkutil
@@ -18,15 +19,17 @@ namespace zkutil
 namespace DB
 {
 
+class StorageReplicatedMergeTree;
+
 namespace DataPartsExchange
 {
 
-/** Service for sending parts from the table *MergeTree.
+/** Service for sending parts from the table *ReplicatedMergeTree.
   */
 class Service final : public InterserverIOEndpoint
 {
 public:
-    explicit Service(MergeTreeData & data_) : data(data_), log(&Poco::Logger::get(data.getLogName() + " (Replicated PartsService)")) {}
+    explicit Service(StorageReplicatedMergeTree & data_);
 
     Service(const Service &) = delete;
     Service & operator=(const Service &) = delete;
@@ -51,7 +54,7 @@ private:
 
     /// StorageReplicatedMergeTree::shutdown() waits for all parts exchange handlers to finish,
     /// so Service will never access dangling reference to storage
-    MergeTreeData & data;
+    StorageReplicatedMergeTree & data;
     Poco::Logger * log;
 };
 
@@ -74,6 +77,7 @@ public:
         const String & user,
         const String & password,
         const String & interserver_scheme,
+        ThrottlerPtr throttler,
         bool to_detached = false,
         const String & tmp_prefix_ = "",
         std::optional<CurrentlySubmergingEmergingTagger> * tagger_ptr = nullptr,
@@ -90,7 +94,9 @@ private:
         bool sync,
        DiskPtr disk,
         PooledReadWriteBufferFromHTTP & in,
-        MergeTreeData::DataPart::Checksums & checksums) const;
+        MergeTreeData::DataPart::Checksums & checksums,
+        ThrottlerPtr throttler) const;
+
 
     MergeTreeData::MutableDataPartPtr downloadPartToDisk(
         const String & part_name,
@@ -101,7 +107,8 @@ private:
         DiskPtr disk,
         PooledReadWriteBufferFromHTTP & in,
         size_t projections,
-        MergeTreeData::DataPart::Checksums & checksums);
+        MergeTreeData::DataPart::Checksums & checksums,
+        ThrottlerPtr throttler);
 
     MergeTreeData::MutableDataPartPtr downloadPartToMemory(
         const String & part_name,
@@ -110,7 +117,8 @@ private:
         ContextPtr context,
         ReservationPtr reservation,
         PooledReadWriteBufferFromHTTP & in,
-        size_t projections);
+        size_t projections,
+        ThrottlerPtr throttler);
 
     MergeTreeData::MutableDataPartPtr downloadPartToS3(
         const String & part_name,
@@ -118,7 +126,8 @@ private:
         bool to_detached,
         const String & tmp_prefix_,
         const Disks & disks_s3,
-        PooledReadWriteBufferFromHTTP & in);
+        PooledReadWriteBufferFromHTTP & in,
+        ThrottlerPtr throttler);
 
     MergeTreeData & data;
     Poco::Logger * log;
diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h
index 6e43d0fad77..da953eb0f47 100644
--- a/src/Storages/MergeTree/MergeTreeSettings.h
+++ b/src/Storages/MergeTree/MergeTreeSettings.h
@@ -91,6 +91,8 @@ struct Settings;
     M(Bool, replicated_can_become_leader, true, "If true, Replicated tables replicas on this node will try to acquire leadership.", 0) \
     M(Seconds, zookeeper_session_expiration_check_period, 60, "ZooKeeper session expiration check period, in seconds.", 0) \
     M(Bool, detach_old_local_parts_when_cloning_replica, 1, "Do not remove old local parts when repairing lost replica.", 0) \
+    M(UInt64, max_replicated_fetches_network_bandwidth, 0, "The maximum speed of data exchange over the network in bytes per second for replicated fetches. Zero means unlimited.", 0) \
+    M(UInt64, max_replicated_sends_network_bandwidth, 0, "The maximum speed of data exchange over the network in bytes per second for replicated sends. Zero means unlimited.", 0) \
     \
     /** Check delay of replicas settings. */ \
     M(UInt64, min_relative_delay_to_measure, 120, "Calculate relative replica delay only if absolute delay is not less that this value.", 0) \
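These per-table settings complement rather than replace the server-wide *_for_server settings: as the StorageReplicatedMergeTree.cpp hunk below shows, each table constructs its own Throttler with the server-wide throttler as its parent, so a transfer has to fit under both budgets. Roughly, mirroring the constructor change below (the variable name here is illustrative):

    /// Per-table limiter chained to the server-wide limiter.
    auto table_fetches_throttler = std::make_shared<Throttler>(
        getSettings()->max_replicated_fetches_network_bandwidth,  /// per-table cap, 0 = unlimited
        getContext()->getReplicatedFetchesThrottler());           /// parent: server-wide cap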
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index b3da3e2287b..d776a51b06d 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -287,6 +287,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
     , part_moves_between_shards_orchestrator(*this)
     , allow_renaming(allow_renaming_)
     , replicated_fetches_pool_size(getContext()->getSettingsRef().background_fetches_pool_size)
+    , replicated_fetches_throttler(std::make_shared<Throttler>(getSettings()->max_replicated_fetches_network_bandwidth, getContext()->getReplicatedFetchesThrottler()))
+    , replicated_sends_throttler(std::make_shared<Throttler>(getSettings()->max_replicated_sends_network_bandwidth, getContext()->getReplicatedSendsThrottler()))
 {
     queue_updating_task = getContext()->getSchedulePool().createTask(
         getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::queueUpdatingTask)", [this]{ queueUpdatingTask(); });
@@ -2501,7 +2503,8 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
 
             part_desc->res_part = fetcher.fetchPart(
                 metadata_snapshot, getContext(), part_desc->found_new_part_name, source_replica_path,
-                address.host, address.replication_port, timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme, false, TMP_PREFIX + "fetch_");
+                address.host, address.replication_port, timeouts, credentials->getUser(), credentials->getPassword(),
+                interserver_scheme, replicated_fetches_throttler, false, TMP_PREFIX + "fetch_");
 
             /// TODO: check columns_version of fetched part
 
@@ -2618,7 +2621,8 @@ void StorageReplicatedMergeTree::executeClonePartFromShard(const LogEntry & entry)
             return fetcher.fetchPart(
                 metadata_snapshot, getContext(), entry.new_part_name, source_replica_path,
                 address.host, address.replication_port,
-                timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme, true);
+                timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme,
+                replicated_fetches_throttler, true);
         };
 
         part = get_part();
@@ -4025,6 +4029,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const StorageMetadataPtr & metadata_snapshot,
             credentials->getUser(),
             credentials->getPassword(),
             interserver_scheme,
+            replicated_fetches_throttler,
             to_detached,
             "",
             &tagger_ptr,
@@ -4174,7 +4179,8 @@ bool StorageReplicatedMergeTree::fetchExistsPart(const String & part_name, const
         return fetcher.fetchPart(
             metadata_snapshot, getContext(), part_name, source_replica_path,
             address.host, address.replication_port,
-            timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme, false, "", nullptr, true,
+            timeouts, credentials->getUser(), credentials->getPassword(),
+            interserver_scheme, replicated_fetches_throttler, false, "", nullptr, true,
             replaced_disk);
     };
diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h
index 4e697f2d1f2..93f86e8ae7e 100644
--- a/src/Storages/StorageReplicatedMergeTree.h
+++ b/src/Storages/StorageReplicatedMergeTree.h
@@ -26,6 +26,7 @@
 #include <Interpreters/PartLog.h>
 #include <Common/randomSeed.h>
 #include <Common/ZooKeeper/ZooKeeper.h>
+#include <Common/Throttler.h>
 #include <Core/BackgroundSchedulePool.h>
 #include <Processors/Pipe.h>
 #include <Storages/MergeTree/DataPartsExchange.h>
@@ -239,6 +240,16 @@ public:
     /// Get best replica having this partition on S3
     String getSharedDataReplica(const IMergeTreeDataPart & part) const;
 
+    ThrottlerPtr getFetchesThrottler() const
+    {
+        return replicated_fetches_throttler;
+    }
+
+    ThrottlerPtr getSendsThrottler() const
+    {
+        return replicated_sends_throttler;
+    }
+
 private:
     /// Get a sequential consistent view of current parts.
     ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock getMaxAddedBlocks() const;
@@ -363,6 +374,9 @@ private:
 
     const size_t replicated_fetches_pool_size;
 
+    ThrottlerPtr replicated_fetches_throttler;
+    ThrottlerPtr replicated_sends_throttler;
+
     template <class Func>
     void foreachCommittedParts(Func && func, bool select_sequential_consistency) const;