Basic throttler implementation

This commit is contained in:
alesapin 2021-05-26 23:37:44 +03:00
parent ee12652f8a
commit 42b2383e01
10 changed files with 133 additions and 33 deletions

View File

@ -83,6 +83,8 @@ class IColumn;
M(UInt64, background_schedule_pool_size, 16, "Number of threads performing background tasks for replicated tables, dns cache updates. Only has meaning at server startup.", 0) \
M(UInt64, background_message_broker_schedule_pool_size, 16, "Number of threads performing background tasks for message streaming. Only has meaning at server startup.", 0) \
M(UInt64, background_distributed_schedule_pool_size, 16, "Number of threads performing background tasks for distributed sends. Only has meaning at server startup.", 0) \
M(UInt64, max_replicated_fetches_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for replicated fetches. Zero means unlimited. Only has meaning at server startup.", 0) \
M(UInt64, max_replicated_sends_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for replicated sends. Zero means unlimited. Only has meaning at server startup.", 0) \
\
M(Milliseconds, distributed_directory_monitor_sleep_time_ms, 100, "Sleep time for StorageDistributed DirectoryMonitors, in case of any errors delay grows exponentially.", 0) \
M(Milliseconds, distributed_directory_monitor_max_sleep_time_ms, 30000, "Maximum sleep time for StorageDistributed DirectoryMonitors, it limits exponential growth too.", 0) \

View File

@ -1,4 +1,5 @@
#include <Common/Exception.h>
#include <Common/Throttler.h>
#include <IO/ReadBuffer.h>
#include <IO/WriteBuffer.h>
#include <IO/copyData.h>
@ -14,7 +15,7 @@ namespace ErrorCodes
namespace
{
void copyDataImpl(ReadBuffer & from, WriteBuffer & to, bool check_bytes, size_t bytes, const std::atomic<int> * is_cancelled)
void copyDataImpl(ReadBuffer & from, WriteBuffer & to, bool check_bytes, size_t bytes, const std::atomic<int> * is_cancelled, ThrottlerPtr throttler)
{
/// If read to the end of the buffer, eof() either fills the buffer with new data and moves the cursor to the beginning, or returns false.
while (bytes > 0 && !from.eof())
@ -27,13 +28,16 @@ void copyDataImpl(ReadBuffer & from, WriteBuffer & to, bool check_bytes, size_t
to.write(from.position(), count);
from.position() += count;
bytes -= count;
if (throttler)
throttler->add(count);
}
if (check_bytes && bytes > 0)
throw Exception("Attempt to read after EOF.", ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF);
}
void copyDataImpl(ReadBuffer & from, WriteBuffer & to, bool check_bytes, size_t bytes, std::function<void()> cancellation_hook)
void copyDataImpl(ReadBuffer & from, WriteBuffer & to, bool check_bytes, size_t bytes, std::function<void()> cancellation_hook, ThrottlerPtr throttler)
{
/// If read to the end of the buffer, eof() either fills the buffer with new data and moves the cursor to the beginning, or returns false.
while (bytes > 0 && !from.eof())
@ -46,6 +50,9 @@ void copyDataImpl(ReadBuffer & from, WriteBuffer & to, bool check_bytes, size_t
to.write(from.position(), count);
from.position() += count;
bytes -= count;
if (throttler)
throttler->add(count);
}
if (check_bytes && bytes > 0)
@ -56,32 +63,42 @@ void copyDataImpl(ReadBuffer & from, WriteBuffer & to, bool check_bytes, size_t
void copyData(ReadBuffer & from, WriteBuffer & to)
{
copyDataImpl(from, to, false, std::numeric_limits<size_t>::max(), nullptr);
copyDataImpl(from, to, false, std::numeric_limits<size_t>::max(), nullptr, nullptr);
}
void copyData(ReadBuffer & from, WriteBuffer & to, const std::atomic<int> & is_cancelled)
{
copyDataImpl(from, to, false, std::numeric_limits<size_t>::max(), &is_cancelled);
copyDataImpl(from, to, false, std::numeric_limits<size_t>::max(), &is_cancelled, nullptr);
}
void copyData(ReadBuffer & from, WriteBuffer & to, std::function<void()> cancellation_hook)
{
copyDataImpl(from, to, false, std::numeric_limits<size_t>::max(), cancellation_hook);
copyDataImpl(from, to, false, std::numeric_limits<size_t>::max(), cancellation_hook, nullptr);
}
void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes)
{
copyDataImpl(from, to, true, bytes, nullptr);
copyDataImpl(from, to, true, bytes, nullptr, nullptr);
}
void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, const std::atomic<int> & is_cancelled)
{
copyDataImpl(from, to, true, bytes, &is_cancelled);
copyDataImpl(from, to, true, bytes, &is_cancelled, nullptr);
}
void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, std::function<void()> cancellation_hook)
{
copyDataImpl(from, to, true, bytes, cancellation_hook);
copyDataImpl(from, to, true, bytes, cancellation_hook, nullptr);
}
void copyDataWithThrottler(ReadBuffer & from, WriteBuffer & to, const std::atomic<int> & is_cancelled, ThrottlerPtr throttler)
{
copyDataImpl(from, to, true, std::numeric_limits<size_t>::max(), &is_cancelled, throttler);
}
void copyDataWithThrottler(ReadBuffer & from, WriteBuffer & to, size_t bytes, const std::atomic<int> & is_cancelled, ThrottlerPtr throttler)
{
copyDataImpl(from, to, true, bytes, &is_cancelled, throttler);
}
}

View File

@ -9,6 +9,9 @@ namespace DB
class ReadBuffer;
class WriteBuffer;
class Throttler;
using ThrottlerPtr = std::shared_ptr<Throttler>;
/** Copies data from ReadBuffer to WriteBuffer, all that is.
@ -24,6 +27,9 @@ void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes);
void copyData(ReadBuffer & from, WriteBuffer & to, const std::atomic<int> & is_cancelled);
void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, const std::atomic<int> & is_cancelled);
void copyDataWithThrottler(ReadBuffer & from, WriteBuffer & to, const std::atomic<int> & is_cancelled, ThrottlerPtr throttler);
void copyDataWithThrottler(ReadBuffer & from, WriteBuffer & to, size_t bytes, const std::atomic<int> & is_cancelled, ThrottlerPtr throttler);
void copyData(ReadBuffer & from, WriteBuffer & to, std::function<void()> cancellation_hook);
void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, std::function<void()> cancellation_hook);

View File

@ -11,6 +11,7 @@
#include <Common/setThreadName.h>
#include <Common/Stopwatch.h>
#include <Common/formatReadable.h>
#include <Common/Throttler.h>
#include <Common/thread_local_rng.h>
#include <Coordination/KeeperStorageDispatcher.h>
#include <Compression/ICompressionCodec.h>
@ -360,6 +361,10 @@ struct ContextSharedPart
mutable std::optional<BackgroundSchedulePool> schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables)
mutable std::optional<BackgroundSchedulePool> distributed_schedule_pool; /// A thread pool that can run different jobs in background (used for distributed sends)
mutable std::optional<BackgroundSchedulePool> message_broker_schedule_pool; /// A thread pool that can run different jobs in background (used for message brokers, like RabbitMQ and Kafka)
mutable ThrottlerPtr replicated_fetches_throttler; /// A server-wide throttler for replicated fetches
mutable ThrottlerPtr replicated_sends_throttler; /// A server-wide throttler for replicated sends
MultiVersion<Macros> macros; /// Substitutions extracted from config.
std::unique_ptr<DDLWorker> ddl_worker; /// Process ddl commands from zk.
/// Rules for selecting the compression settings, depending on the size of the part.
@ -1625,6 +1630,26 @@ BackgroundSchedulePool & Context::getMessageBrokerSchedulePool() const
return *shared->message_broker_schedule_pool;
}
ThrottlerPtr Context::getReplicatedFetchesThrottler() const
{
auto lock = getLock();
if (!shared->replicated_fetches_throttler)
shared->replicated_fetches_throttler = std::make_shared<Throttler>(
settings.max_replicated_fetches_network_bandwidth_for_server);
return shared->replicated_fetches_throttler;
}
ThrottlerPtr Context::getReplicatedSendsThrottler() const
{
auto lock = getLock();
if (!shared->replicated_sends_throttler)
shared->replicated_sends_throttler = std::make_shared<Throttler>(
settings.max_replicated_sends_network_bandwidth_for_server);
return shared->replicated_fetches_throttler;
}
bool Context::hasDistributedDDL() const
{
return getConfigRef().has("distributed_ddl");

View File

@ -113,6 +113,9 @@ using VolumePtr = std::shared_ptr<IVolume>;
struct NamedSession;
struct BackgroundTaskSchedulingSettings;
class Throttler;
using ThrottlerPtr = std::shared_ptr<Throttler>;
class ZooKeeperMetadataTransaction;
using ZooKeeperMetadataTransactionPtr = std::shared_ptr<ZooKeeperMetadataTransaction>;
@ -657,6 +660,9 @@ public:
BackgroundSchedulePool & getMessageBrokerSchedulePool() const;
BackgroundSchedulePool & getDistributedSchedulePool() const;
ThrottlerPtr getReplicatedFetchesThrottler() const;
ThrottlerPtr getReplicatedSendsThrottler() const;
/// Has distributed_ddl configuration or not.
bool hasDistributedDDL() const;
void setDDLWorker(std::unique_ptr<DDLWorker> ddl_worker);

View File

@ -9,6 +9,7 @@
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/ReplicatedFetchList.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Common/CurrentMetrics.h>
#include <Common/NetException.h>
#include <IO/createReadBufferFromFileBase.h>
@ -86,6 +87,10 @@ struct ReplicatedFetchReadCallback
}
Service::Service(StorageReplicatedMergeTree & data_) :
data(data_), log(&Poco::Logger::get(data.getLogName() + " (Replicated PartsService)")) {}
std::string Service::getId(const std::string & node_id) const
{
return getEndpointId(node_id);
@ -243,6 +248,8 @@ void Service::sendPartFromMemory(
NativeBlockOutputStream block_out(out, 0, metadata_snapshot->getSampleBlock());
part->checksums.write(out);
block_out.write(part_in_memory->block);
data.getSendsThrottler()->add(part_in_memory->block.bytes());
}
MergeTreeData::DataPart::Checksums Service::sendPartFromDisk(
@ -298,7 +305,7 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDisk(
auto file_in = disk->readFile(path);
HashingWriteBuffer hashing_out(out);
copyData(*file_in, hashing_out, blocker.getCounter());
copyDataWithThrottler(*file_in, hashing_out, blocker.getCounter(), data.getSendsThrottler());
if (blocker.isCancelled())
throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED);
@ -354,7 +361,7 @@ void Service::sendPartS3Metadata(const MergeTreeData::DataPartPtr & part, WriteB
auto file_in = createReadBufferFromFileBase(metadata_file, 0, 0, 0, nullptr, DBMS_DEFAULT_BUFFER_SIZE);
HashingWriteBuffer hashing_out(out);
copyData(*file_in, hashing_out, blocker.getCounter());
copyDataWithThrottler(*file_in, hashing_out, blocker.getCounter(), data.getSendsThrottler());
if (blocker.isCancelled())
throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED);
@ -388,6 +395,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
const String & user,
const String & password,
const String & interserver_scheme,
ThrottlerPtr throttler,
bool to_detached,
const String & tmp_prefix_,
std::optional<CurrentlySubmergingEmergingTagger> * tagger_ptr,
@ -514,7 +522,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
try
{
return downloadPartToS3(part_name, replica_path, to_detached, tmp_prefix_, std::move(disks_s3), in);
return downloadPartToS3(part_name, replica_path, to_detached, tmp_prefix_, std::move(disks_s3), in, throttler);
}
catch (const Exception & e)
{
@ -522,7 +530,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
throw;
/// Try again but without S3 copy
return fetchPart(metadata_snapshot, context, part_name, replica_path, host, port, timeouts,
user, password, interserver_scheme, to_detached, tmp_prefix_, nullptr, false);
user, password, interserver_scheme, throttler, to_detached, tmp_prefix_, nullptr, false);
}
}
@ -585,8 +593,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
MergeTreeData::DataPart::Checksums checksums;
return part_type == "InMemory"
? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, context, std::move(reservation), in, projections)
: downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix_, sync, reservation->getDisk(), in, projections, checksums);
? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, context, std::move(reservation), in, projections, throttler)
: downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix_, sync, reservation->getDisk(), in, projections, checksums, throttler);
}
MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
@ -596,7 +604,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
ContextPtr context,
ReservationPtr reservation,
PooledReadWriteBufferFromHTTP & in,
size_t projections)
size_t projections,
ThrottlerPtr throttler)
{
auto volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, reservation->getDisk(), 0);
MergeTreeData::MutableDataPartPtr new_data_part =
@ -612,6 +621,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
NativeBlockInputStream block_in(in, 0);
auto block = block_in.read();
throttler->add(block.bytes());
MergeTreePartInfo new_part_info("all", 0, 0, 0);
MergeTreeData::MutableDataPartPtr new_projection_part =
@ -643,6 +653,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
NativeBlockInputStream block_in(in, 0);
auto block = block_in.read();
throttler->add(block.bytes());
new_data_part->uuid = part_uuid;
new_data_part->is_temp = true;
@ -666,7 +677,8 @@ void Fetcher::downloadBaseOrProjectionPartToDisk(
bool sync,
DiskPtr disk,
PooledReadWriteBufferFromHTTP & in,
MergeTreeData::DataPart::Checksums & checksums) const
MergeTreeData::DataPart::Checksums & checksums,
ThrottlerPtr throttler) const
{
size_t files;
readBinary(files, in);
@ -689,7 +701,7 @@ void Fetcher::downloadBaseOrProjectionPartToDisk(
auto file_out = disk->writeFile(part_download_path + file_name);
HashingWriteBuffer hashing_out(*file_out);
copyData(in, hashing_out, file_size, blocker.getCounter());
copyDataWithThrottler(in, hashing_out, file_size, blocker.getCounter(), throttler);
if (blocker.isCancelled())
{
@ -726,7 +738,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
DiskPtr disk,
PooledReadWriteBufferFromHTTP & in,
size_t projections,
MergeTreeData::DataPart::Checksums & checksums)
MergeTreeData::DataPart::Checksums & checksums,
ThrottlerPtr throttler)
{
static const String TMP_PREFIX = "tmp_fetch_";
String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;
@ -763,13 +776,13 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
MergeTreeData::DataPart::Checksums projection_checksum;
disk->createDirectories(part_download_path + projection_name + ".proj/");
downloadBaseOrProjectionPartToDisk(
replica_path, part_download_path + projection_name + ".proj/", sync, disk, in, projection_checksum);
replica_path, part_download_path + projection_name + ".proj/", sync, disk, in, projection_checksum, throttler);
checksums.addFile(
projection_name + ".proj", projection_checksum.getTotalSizeOnDisk(), projection_checksum.getTotalChecksumUInt128());
}
// Download the base part
downloadBaseOrProjectionPartToDisk(replica_path, part_download_path, sync, disk, in, checksums);
downloadBaseOrProjectionPartToDisk(replica_path, part_download_path, sync, disk, in, checksums, throttler);
assertEOF(in);
auto volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, disk, 0);
@ -787,8 +800,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToS3(
bool to_detached,
const String & tmp_prefix_,
const Disks & disks_s3,
PooledReadWriteBufferFromHTTP & in
)
PooledReadWriteBufferFromHTTP & in,
ThrottlerPtr throttler)
{
if (disks_s3.empty())
throw Exception("No S3 disks anymore", ErrorCodes::LOGICAL_ERROR);
@ -841,7 +854,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToS3(
HashingWriteBuffer hashing_out(*file_out);
copyData(in, hashing_out, file_size, blocker.getCounter());
copyDataWithThrottler(in, hashing_out, file_size, blocker.getCounter(), throttler);
if (blocker.isCancelled())
{

View File

@ -7,6 +7,7 @@
#include <IO/copyData.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <Common/Throttler.h>
namespace zkutil
@ -18,15 +19,17 @@ namespace zkutil
namespace DB
{
class StorageReplicatedMergeTree;
namespace DataPartsExchange
{
/** Service for sending parts from the table *MergeTree.
/** Service for sending parts from the table *ReplicatedMergeTree.
*/
class Service final : public InterserverIOEndpoint
{
public:
explicit Service(MergeTreeData & data_) : data(data_), log(&Poco::Logger::get(data.getLogName() + " (Replicated PartsService)")) {}
explicit Service(StorageReplicatedMergeTree & data_);
Service(const Service &) = delete;
Service & operator=(const Service &) = delete;
@ -51,7 +54,7 @@ private:
/// StorageReplicatedMergeTree::shutdown() waits for all parts exchange handlers to finish,
/// so Service will never access dangling reference to storage
MergeTreeData & data;
StorageReplicatedMergeTree & data;
Poco::Logger * log;
};
@ -74,6 +77,7 @@ public:
const String & user,
const String & password,
const String & interserver_scheme,
ThrottlerPtr throttler,
bool to_detached = false,
const String & tmp_prefix_ = "",
std::optional<CurrentlySubmergingEmergingTagger> * tagger_ptr = nullptr,
@ -90,7 +94,9 @@ private:
bool sync,
DiskPtr disk,
PooledReadWriteBufferFromHTTP & in,
MergeTreeData::DataPart::Checksums & checksums) const;
MergeTreeData::DataPart::Checksums & checksums,
ThrottlerPtr throttler) const;
MergeTreeData::MutableDataPartPtr downloadPartToDisk(
const String & part_name,
@ -101,7 +107,8 @@ private:
DiskPtr disk,
PooledReadWriteBufferFromHTTP & in,
size_t projections,
MergeTreeData::DataPart::Checksums & checksums);
MergeTreeData::DataPart::Checksums & checksums,
ThrottlerPtr throttler);
MergeTreeData::MutableDataPartPtr downloadPartToMemory(
const String & part_name,
@ -110,7 +117,8 @@ private:
ContextPtr context,
ReservationPtr reservation,
PooledReadWriteBufferFromHTTP & in,
size_t projections);
size_t projections,
ThrottlerPtr throttler);
MergeTreeData::MutableDataPartPtr downloadPartToS3(
const String & part_name,
@ -118,7 +126,8 @@ private:
bool to_detached,
const String & tmp_prefix_,
const Disks & disks_s3,
PooledReadWriteBufferFromHTTP & in);
PooledReadWriteBufferFromHTTP & in,
ThrottlerPtr throttler);
MergeTreeData & data;
Poco::Logger * log;

View File

@ -91,6 +91,8 @@ struct Settings;
M(Bool, replicated_can_become_leader, true, "If true, Replicated tables replicas on this node will try to acquire leadership.", 0) \
M(Seconds, zookeeper_session_expiration_check_period, 60, "ZooKeeper session expiration check period, in seconds.", 0) \
M(Bool, detach_old_local_parts_when_cloning_replica, 1, "Do not remove old local parts when repairing lost replica.", 0) \
M(UInt64, max_replicated_fetches_network_bandwidth, 0, "The maximum speed of data exchange over the network in bytes per second for replicated fetches. Zero means unlimited.", 0) \
M(UInt64, max_replicated_sends_network_bandwidth, 0, "The maximum speed of data exchange over the network in bytes per second for replicated sends. Zero means unlimited.", 0) \
\
/** Check delay of replicas settings. */ \
M(UInt64, min_relative_delay_to_measure, 120, "Calculate relative replica delay only if absolute delay is not less that this value.", 0) \

View File

@ -287,6 +287,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
, part_moves_between_shards_orchestrator(*this)
, allow_renaming(allow_renaming_)
, replicated_fetches_pool_size(getContext()->getSettingsRef().background_fetches_pool_size)
, replicated_fetches_throttler(std::make_shared<Throttler>(getSettings()->max_replicated_fetches_network_bandwidth, getContext()->getReplicatedFetchesThrottler()))
, replicated_sends_throttler(std::make_shared<Throttler>(getSettings()->max_replicated_sends_network_bandwidth, getContext()->getReplicatedSendsThrottler()))
{
queue_updating_task = getContext()->getSchedulePool().createTask(
getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::queueUpdatingTask)", [this]{ queueUpdatingTask(); });
@ -2501,7 +2503,8 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
part_desc->res_part = fetcher.fetchPart(
metadata_snapshot, getContext(), part_desc->found_new_part_name, source_replica_path,
address.host, address.replication_port, timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme, false, TMP_PREFIX + "fetch_");
address.host, address.replication_port, timeouts, credentials->getUser(), credentials->getPassword(),
interserver_scheme, replicated_fetches_throttler, false, TMP_PREFIX + "fetch_");
/// TODO: check columns_version of fetched part
@ -2618,7 +2621,8 @@ void StorageReplicatedMergeTree::executeClonePartFromShard(const LogEntry & entr
return fetcher.fetchPart(
metadata_snapshot, getContext(), entry.new_part_name, source_replica_path,
address.host, address.replication_port,
timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme, true);
timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme,
replicated_fetches_throttler, true);
};
part = get_part();
@ -4025,6 +4029,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
credentials->getUser(),
credentials->getPassword(),
interserver_scheme,
replicated_fetches_throttler,
to_detached,
"",
&tagger_ptr,
@ -4174,7 +4179,8 @@ bool StorageReplicatedMergeTree::fetchExistsPart(const String & part_name, const
return fetcher.fetchPart(
metadata_snapshot, getContext(), part_name, source_replica_path,
address.host, address.replication_port,
timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme, false, "", nullptr, true,
timeouts, credentials->getUser(), credentials->getPassword(),
interserver_scheme, replicated_fetches_throttler, false, "", nullptr, true,
replaced_disk);
};

View File

@ -26,6 +26,7 @@
#include <Interpreters/PartLog.h>
#include <Common/randomSeed.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/Throttler.h>
#include <Core/BackgroundSchedulePool.h>
#include <Processors/Pipe.h>
#include <Storages/MergeTree/BackgroundJobsExecutor.h>
@ -239,6 +240,16 @@ public:
/// Get best replica having this partition on S3
String getSharedDataReplica(const IMergeTreeDataPart & part) const;
ThrottlerPtr getFetchesThrottler() const
{
return replicated_fetches_throttler;
}
ThrottlerPtr getSendsThrottler() const
{
return replicated_sends_throttler;
}
private:
/// Get a sequential consistent view of current parts.
ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock getMaxAddedBlocks() const;
@ -363,6 +374,9 @@ private:
const size_t replicated_fetches_pool_size;
ThrottlerPtr replicated_fetches_throttler;
ThrottlerPtr replicated_sends_throttler;
template <class Func>
void foreachCommittedParts(Func && func, bool select_sequential_consistency) const;