add max_remote_{read,write}_network_bandwidth_for_server settings

This commit is contained in:
Sergei Trifonov 2022-07-11 14:59:39 +02:00
parent 7fd37d5c54
commit 43779ec280
15 changed files with 133 additions and 29 deletions

View File

@@ -1,5 +1,7 @@
#pragma once
#include <Common/Throttler_fwd.h>
#include <mutex>
#include <memory>
#include <base/sleep.h>
@@ -57,7 +59,4 @@ private:
std::shared_ptr<Throttler> parent;
};
using ThrottlerPtr = std::shared_ptr<Throttler>;
}
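The interface relevant to this commit is small: consumers construct a Throttler with a bytes-per-second limit and report transferred bytes via add(bytes), which puts the caller to sleep when the measured rate exceeds the limit (the parent pointer above allows chaining limits). As a rough illustration of the idea only, not ClickHouse's actual implementation, a minimal average-rate throttler could look like this:

#include <chrono>
#include <cstddef>
#include <mutex>
#include <thread>

/// Minimal sketch of a bandwidth throttler: callers report bytes via add(),
/// and are put to sleep once the average rate exceeds max_speed bytes/sec.
class MinimalThrottler
{
public:
    explicit MinimalThrottler(size_t max_speed_) : max_speed(max_speed_) {}

    void add(size_t bytes)
    {
        std::chrono::duration<double> sleep_time{0};
        {
            std::lock_guard lock(mutex);
            total_bytes += bytes;
            if (max_speed > 0) /// 0 means unlimited, matching the settings' semantics
            {
                std::chrono::duration<double> elapsed = std::chrono::steady_clock::now() - start;
                std::chrono::duration<double> expected{static_cast<double>(total_bytes) / max_speed};
                if (expected > elapsed)
                    sleep_time = expected - elapsed;
            }
        }
        if (sleep_time.count() > 0)
            std::this_thread::sleep_for(sleep_time); /// sleep outside the lock
    }

private:
    std::mutex mutex;
    const size_t max_speed = 0;
    size_t total_bytes = 0;
    const std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();
};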

View File

@@ -0,0 +1,11 @@
#pragma once
#include <memory>
namespace DB
{
class Throttler;
using ThrottlerPtr = std::shared_ptr<Throttler>;
}
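This new Throttler_fwd.h is a standard forward-declaration header: anything that only needs to store a ThrottlerPtr can include it instead of the full Throttler.h (which drags in <base/sleep.h> and a mutex), keeping compile-time dependencies down. That is exactly how ReadSettings.h and WriteSettings.h use it later in this commit. A hypothetical consumer header needs nothing more than:

#pragma once

#include <Common/Throttler_fwd.h>

namespace DB
{

/// Hypothetical consumer, for illustration: storing a shared_ptr to an
/// incomplete type is fine; only code that calls Throttler methods
/// must include the full Throttler.h.
struct SomeIOSettings
{
    ThrottlerPtr throttler;
};

}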

View File

@@ -96,6 +96,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, replace_running_query, false, "Whether a running query with the same id as a new incoming one should be canceled.", 0) \
M(UInt64, max_replicated_fetches_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for replicated fetches. Zero means unlimited. Only has meaning at server startup.", 0) \
M(UInt64, max_replicated_sends_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for replicated sends. Zero means unlimited. Only has meaning at server startup.", 0) \
M(UInt64, max_remote_read_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for remote reads. Zero means unlimited. Only has meaning at server startup.", 0) \
M(UInt64, max_remote_write_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for remote writes. Zero means unlimited. Only has meaning at server startup.", 0) \
M(Bool, stream_like_engine_allow_direct_select, false, "Allow direct SELECT query for Kafka, RabbitMQ, FileLog, Redis Streams and NATS engines. In case there are attached materialized views, SELECT query is not allowed even if this setting is enabled.", 0) \
M(String, stream_like_engine_insert_queue, "", "When a stream-like engine reads from multiple queues, the user needs to select one queue to insert into when writing. Used by Redis Streams and NATS.", 0) \
\
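For readers unfamiliar with ClickHouse's settings machinery, the M(...) entries above form an X-macro list: each line records type, name, default value, documentation, and flags, and the list is expanded multiple times with different definitions of M (once to declare struct fields, once to register setting names, and so on). A stripped-down sketch of the mechanism, using hypothetical macros rather than ClickHouse's real ones:

#include <cstdint>

using UInt64 = uint64_t;

/// Hypothetical settings list in the same declarative style.
#define APPLY_FOR_EXAMPLE_SETTINGS(M) \
    M(UInt64, max_remote_read_network_bandwidth_for_server, 0) \
    M(UInt64, max_remote_write_network_bandwidth_for_server, 0)

struct ExampleSettings
{
    /// One expansion of the list declares a field per setting.
#define DECLARE_FIELD(TYPE, NAME, DEFAULT) TYPE NAME = DEFAULT;
    APPLY_FOR_EXAMPLE_SETTINGS(DECLARE_FIELD)
#undef DECLARE_FIELD
};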

View File

@@ -47,4 +47,28 @@ void IObjectStorage::copyObjectToAnotherObjectStorage(const std::string & object
out->finalize();
}
void IObjectStorage::applyRemoteThrottlingSettings(ContextPtr context)
{
std::unique_lock lock{throttlers_mutex};
read_throttler = context->getRemoteReadThrottler();
write_throttler = context->getRemoteWriteThrottler();
}
ReadSettings IObjectStorage::patchSettings(const ReadSettings & read_settings) const
{
std::unique_lock lock{throttlers_mutex};
ReadSettings settings{read_settings};
settings.throttler = read_throttler;
return settings;
}
WriteSettings IObjectStorage::patchSettings(const WriteSettings & write_settings) const
{
std::unique_lock lock{throttlers_mutex};
WriteSettings settings{write_settings};
settings.throttler = write_throttler;
return settings;
}
}
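These two helpers define the contract for subclasses: call applyRemoteThrottlingSettings() whenever configuration is (re)applied, and route every ReadSettings/WriteSettings through patchSettings() on the IO paths so the server-wide throttlers are attached transparently. A hypothetical subclass, with signatures abbreviated for illustration, would follow this pattern:

/// Hypothetical object storage, sketching the intended call pattern.
class MyObjectStorage : public IObjectStorage
{
public:
    void applyNewSettings(const Poco::Util::AbstractConfiguration & config,
                          const std::string & config_prefix, ContextPtr context) /* override */
    {
        /// ... apply storage-specific settings from `config` first ...
        applyRemoteThrottlingSettings(context); /// pick up the server-wide throttlers
    }

    std::unique_ptr<SeekableReadBuffer> readObject(const std::string & path,
                                                   const ReadSettings & read_settings) const
    {
        /// patchSettings() copies the caller's settings and attaches read_throttler.
        ReadSettings settings = patchSettings(read_settings);
        return openForRead(path, settings); /// hypothetical helper
    }
};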

View File

@@ -3,6 +3,7 @@
#include <filesystem>
#include <string>
#include <map>
#include <mutex>
#include <optional>
#include <Poco/Timestamp.h>
@@ -157,6 +158,19 @@ public:
protected:
FileCachePtr cache;
protected:
/// Should be called from the implementation of applyNewSettings()
void applyRemoteThrottlingSettings(ContextPtr context);
/// Should be used by implementations of the read* and write* methods
ReadSettings patchSettings(const ReadSettings & read_settings) const;
WriteSettings patchSettings(const WriteSettings & write_settings) const;
private:
mutable std::mutex throttlers_mutex;
ThrottlerPtr read_throttler;
ThrottlerPtr write_throttler;
};
using ObjectStoragePtr = std::unique_ptr<IObjectStorage>;
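A note on the thread-safety design here: throttlers_mutex is declared mutable because patchSettings() is const. Attaching a throttler does not change the logical state of the storage, but the lock is still needed so a concurrent applyNewSettings() cannot race with readers. Copying the shared_ptr under the lock keeps the critical section tiny; the caller then uses its copy lock-free. The pattern in isolation:

#include <memory>
#include <mutex>

struct Widget {};

class Holder
{
public:
    void set(std::shared_ptr<Widget> w)
    {
        std::lock_guard lock(mutex);
        widget = std::move(w);
    }

    /// A const method may lock a mutable mutex: locking is synchronization,
    /// not a logical modification of the object.
    std::shared_ptr<Widget> get() const
    {
        std::lock_guard lock(mutex);
        return widget; /// cheap shared_ptr copy; caller proceeds lock-free
    }

private:
    mutable std::mutex mutex;
    std::shared_ptr<Widget> widget;
};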

View File

@@ -114,15 +114,7 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
std::optional<size_t>,
std::optional<size_t>) const
{
ReadSettings disk_read_settings{read_settings};
if (cache)
{
if (IFileCache::isReadOnly())
disk_read_settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = true;
disk_read_settings.remote_fs_cache = cache;
}
ReadSettings disk_read_settings = patchSettings(read_settings);
auto settings_ptr = s3_settings.get();
@@ -153,19 +145,10 @@ std::unique_ptr<SeekableReadBuffer> S3ObjectStorage::readObject( /// NOLINT
std::optional<size_t>) const
{
auto settings_ptr = s3_settings.get();
ReadSettings disk_read_settings{read_settings};
if (cache)
{
if (IFileCache::isReadOnly())
disk_read_settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = true;
disk_read_settings.remote_fs_cache = cache;
}
ReadSettings disk_read_settings = patchSettings(read_settings);
return std::make_unique<ReadBufferFromS3>(client.get(), bucket, path, version_id, settings_ptr->s3_settings.max_single_read_retries, disk_read_settings);
}
std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLINT
const std::string & path,
WriteMode mode, // S3 doesn't support append, only rewrite
@@ -174,12 +157,14 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
size_t buf_size,
const WriteSettings & write_settings)
{
WriteSettings disk_write_settings = patchSettings(write_settings);
if (mode != WriteMode::Rewrite)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "S3 doesn't support append to files");
bool cache_on_write = cache
&& fs::path(path).extension() != ".tmp"
&& write_settings.enable_filesystem_cache_on_write_operations
&& disk_write_settings.enable_filesystem_cache_on_write_operations
&& FileCacheFactory::instance().getSettings(getCacheBasePath()).cache_on_write_operations;
auto settings_ptr = s3_settings.get();
@@ -189,7 +174,9 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
path,
settings_ptr->s3_settings,
attributes,
buf_size, threadPoolCallbackRunner(getThreadPoolWriter()),
buf_size,
threadPoolCallbackRunner(getThreadPoolWriter()),
disk_write_settings,
cache_on_write ? cache : nullptr);
return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(s3_buffer), std::move(finalize_callback), path);
@@ -457,6 +444,19 @@ void S3ObjectStorage::copyObject(const std::string & object_from, const std::str
copyObjectImpl(bucket, object_from, bucket, object_to, head, object_to_attributes);
}
ReadSettings S3ObjectStorage::patchSettings(const ReadSettings & read_settings) const
{
ReadSettings settings{read_settings};
if (cache)
{
if (IFileCache::isReadOnly())
settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = true;
settings.remote_fs_cache = cache;
}
return IObjectStorage::patchSettings(settings);
}
void S3ObjectStorage::setNewSettings(std::unique_ptr<S3ObjectStorageSettings> && s3_settings_)
{
s3_settings.set(std::move(s3_settings_));
@@ -489,6 +489,7 @@ void S3ObjectStorage::applyNewSettings(const Poco::Util::AbstractConfiguration &
{
s3_settings.set(getSettings(config, config_prefix, context));
client.set(getClient(config, config_prefix, context));
applyRemoteThrottlingSettings(context);
}
std::unique_ptr<IObjectStorage> S3ObjectStorage::cloneObjectStorage(const std::string & new_namespace, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context)
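Note how the S3 override layers on the base class: S3ObjectStorage::patchSettings() first applies the cache-related flags, then delegates to IObjectStorage::patchSettings(), which attaches the throttler. The overall flow introduced by this commit, sketched as comments (approximate, not literal code):

/// Server start or config reload:
///   S3ObjectStorage::applyNewSettings(config, prefix, context)
///     -> IObjectStorage::applyRemoteThrottlingSettings(context)
///        stores context->getRemoteReadThrottler() / getRemoteWriteThrottler()
///
/// Read path, per call:
///   S3ObjectStorage::readObject(path, read_settings, ...)
///     -> S3ObjectStorage::patchSettings(read_settings)   /// cache flags
///        -> IObjectStorage::patchSettings(...)           /// + throttler
///     -> ReadBufferFromS3(..., disk_read_settings)
///        -> read_settings.throttler->add(bytes) after each GET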

View File

@@ -122,6 +122,9 @@ public:
ContextPtr context) override;
private:
ReadSettings patchSettings(const ReadSettings & read_settings) const;
WriteSettings patchSettings(const WriteSettings & write_settings) const;
void setNewSettings(std::unique_ptr<S3ObjectStorageSettings> && s3_settings_);
void setNewClient(std::unique_ptr<Aws::S3::S3Client> && client_);

View File

@@ -5,12 +5,13 @@
#include <IO/ReadBufferFromIStream.h>
#include <IO/ReadBufferFromS3.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <aws/s3/model/HeadObjectRequest.h>
#include <Common/Stopwatch.h>
#include <Common/Throttler.h>
#include <Common/logger_useful.h>
#include <base/sleep.h>
@@ -300,6 +301,9 @@ std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
{
read_result = outcome.GetResultWithOwnership();
if (read_settings.throttler)
read_settings.throttler->add(read_result.GetContentLength());
size_t buffer_size = use_external_buffer ? 0 : read_settings.remote_fs_buffer_size;
return std::make_unique<ReadBufferFromIStream>(read_result.GetBody(), buffer_size);
}
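The accounting granularity here is per GET request: once the GetObject outcome arrives, the entire ContentLength is charged in a single add() call, so any throttling delay lands before subsequent requests rather than being spread across the body stream; a reasonable trade-off given the response is already in flight. For contrast, chunk-level accounting (a generic sketch, not what this commit does) would look like:

#include <cstddef>
#include <functional>

/// Illustrative chunk-level throttling: charge each chunk as it is read
/// instead of charging the whole object length up front.
template <typename Throttle>
size_t throttledRead(const std::function<size_t(char *, size_t)> & read_some,
                     char * buf, size_t size, Throttle & throttler)
{
    size_t total = 0;
    while (total < size)
    {
        size_t n = read_some(buf + total, size - total);
        if (n == 0)
            break;            /// end of stream
        throttler.add(n);     /// may sleep to keep the rate under the limit
        total += n;
    }
    return total;
}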

View File

@@ -4,6 +4,7 @@
#include <string>
#include <Core/Defines.h>
#include <Common/FileCache_fwd.h>
#include <Common/Throttler_fwd.h>
namespace DB
{
@@ -89,6 +90,9 @@ struct ReadSettings
FileCachePtr remote_fs_cache;
/// Bandwidth throttler to use during reading
ThrottlerPtr throttler;
size_t http_max_tries = 1;
size_t http_retry_initial_backoff_ms = 100;
size_t http_retry_max_backoff_ms = 1600;

View File

@@ -4,6 +4,7 @@
#include <Common/logger_useful.h>
#include <Common/IFileCache.h>
#include <Common/Throttler.h>
#include <IO/WriteBufferFromS3.h>
#include <IO/WriteHelpers.h>
@@ -61,6 +62,7 @@ WriteBufferFromS3::WriteBufferFromS3(
std::optional<std::map<String, String>> object_metadata_,
size_t buffer_size_,
ScheduleFunc schedule_,
const WriteSettings & write_settings_,
FileCachePtr cache_)
: BufferWithOwnMemory<WriteBuffer>(buffer_size_, nullptr, 0)
, bucket(bucket_)
@@ -70,6 +72,7 @@ WriteBufferFromS3::WriteBufferFromS3(
, s3_settings(s3_settings_)
, object_metadata(std::move(object_metadata_))
, schedule(std::move(schedule_))
, write_settings(write_settings_)
, cache(cache_)
{
allocateBuffer();
@@ -331,6 +334,8 @@ void WriteBufferFromS3::fillUploadRequest(Aws::S3::Model::UploadPartRequest & re
void WriteBufferFromS3::processUploadRequest(UploadPartTask & task)
{
auto outcome = client_ptr->UploadPart(task.req);
if (write_settings.throttler)
write_settings.throttler->add(task.req.GetContentLength());
if (outcome.IsSuccess())
{
@@ -460,9 +465,12 @@ void WriteBufferFromS3::fillPutRequest(Aws::S3::Model::PutObjectRequest & req)
void WriteBufferFromS3::processPutRequest(PutObjectTask & task)
{
size_t bytes = task.req.GetContentLength();
auto outcome = client_ptr->PutObject(task.req);
if (write_settings.throttler)
write_settings.throttler->add(bytes);
bool with_pool = static_cast<bool>(schedule);
if (outcome.IsSuccess())
LOG_TRACE(log, "Single part upload has completed. Bucket: {}, Key: {}, Object size: {}, WithPool: {}", bucket, key, task.req.GetContentLength(), with_pool);
else
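On the write side every completed UploadPart and every single-part PutObject is charged separately, so a large multipart upload is smoothed part by part instead of hitting the throttler with one huge charge at the end. A quick worked example with hypothetical numbers:

#include <cstdio>

int main()
{
    /// Hypothetical: a 1 GiB multipart upload in 64 MiB parts under
    /// max_remote_write_network_bandwidth_for_server = 100'000'000 (bytes/sec).
    const double part_bytes = 64.0 * 1024 * 1024;       /// one part
    const double total_bytes = 1024.0 * 1024 * 1024;    /// whole object
    const double limit = 100'000'000.0;                 /// bytes/sec
    std::printf("parts: %.0f, time budget per part: ~%.2f s, total: ~%.2f s\n",
                total_bytes / part_bytes, part_bytes / limit, total_bytes / limit);
    /// -> parts: 16, per part: ~0.67 s, total: ~10.74 s
}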

View File

@@ -16,6 +16,7 @@
#include <IO/BufferWithOwnMemory.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteSettings.h>
#include <Storages/StorageS3Settings.h>
#include <aws/core/utils/memory/stl/AWSStringStream.h>
@@ -55,6 +56,7 @@ public:
std::optional<std::map<String, String>> object_metadata_ = std::nullopt,
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
ScheduleFunc schedule_ = {},
const WriteSettings & write_settings_ = {},
FileCachePtr cache_ = nullptr);
~WriteBufferFromS3() override;
@@ -119,6 +121,8 @@ private:
Poco::Logger * log = &Poco::Logger::get("WriteBufferFromS3");
WriteSettings write_settings;
FileCachePtr cache;
size_t current_download_offset = 0;
std::optional<FileSegmentsHolder> file_segments_holder;
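An API-evolution detail: the new write_settings_ constructor parameter is slotted in before cache_ with a {} default, so existing call sites that stop at schedule_ keep compiling, while call sites that pass a cache must now spell out the write settings too. Hypothetical call sites (argument names are placeholders, argument lists abbreviated):

/// Old-style call site: still compiles, gets default (unthrottled) WriteSettings.
auto buf_plain = std::make_unique<WriteBufferFromS3>(
    client, bucket, key, s3_settings, std::nullopt,
    DBMS_DEFAULT_BUFFER_SIZE, schedule);

/// New-style call site: passes a throttler, and therefore also the cache explicitly.
auto buf_throttled = std::make_unique<WriteBufferFromS3>(
    client, bucket, key, s3_settings, std::nullopt,
    DBMS_DEFAULT_BUFFER_SIZE, schedule,
    WriteSettings{.throttler = context->getRemoteWriteThrottler()},
    cache);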

View File

@@ -1,5 +1,7 @@
#pragma once
#include <Common/Throttler_fwd.h>
namespace DB
{
@@ -7,6 +9,9 @@ namespace DB
struct WriteSettings
{
bool enable_filesystem_cache_on_write_operations = false;
/// Bandwidth throttler to use during writing
ThrottlerPtr throttler;
};
}

View File

@@ -228,8 +228,10 @@ struct ContextSharedPart
mutable std::unique_ptr<BackgroundSchedulePool> distributed_schedule_pool; /// A thread pool that can run different jobs in background (used for distributed sends)
mutable std::unique_ptr<BackgroundSchedulePool> message_broker_schedule_pool; /// A thread pool that can run different jobs in background (used for message brokers, like RabbitMQ and Kafka)
mutable ThrottlerPtr replicated_fetches_throttler; /// A server-wide throttler for replicated fetches
mutable ThrottlerPtr replicated_sends_throttler; /// A server-wide throttler for replicated sends
mutable ThrottlerPtr remote_read_throttler; /// A server-wide throttler for remote IO reads
mutable ThrottlerPtr remote_write_throttler; /// A server-wide throttler for remote IO writes
MultiVersion<Macros> macros; /// Substitutions extracted from config.
std::unique_ptr<DDLWorker> ddl_worker; /// Process ddl commands from zk.
@@ -1930,6 +1932,26 @@ ThrottlerPtr Context::getReplicatedSendsThrottler() const
return shared->replicated_sends_throttler;
}
ThrottlerPtr Context::getRemoteReadThrottler() const
{
auto lock = getLock();
if (!shared->remote_read_throttler)
shared->remote_read_throttler = std::make_shared<Throttler>(
settings.max_remote_read_network_bandwidth_for_server);
return shared->remote_read_throttler;
}
ThrottlerPtr Context::getRemoteWriteThrottler() const
{
auto lock = getLock();
if (!shared->remote_write_throttler)
shared->remote_write_throttler = std::make_shared<Throttler>(
settings.max_remote_write_network_bandwidth_for_server);
return shared->remote_write_throttler;
}
bool Context::hasDistributedDDL() const
{
return getConfigRef().has("distributed_ddl");
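Both getters create their throttler lazily under the global context lock and never re-read the setting afterwards, which is why the documentation strings say the limits "only have meaning at server startup": whatever value max_remote_{read,write}_network_bandwidth_for_server holds when the first remote read or write happens is baked into the shared Throttler for the lifetime of the server. Consumers simply share that one instance:

/// Illustrative consumer (context is a ContextPtr; surrounding code hypothetical):
ThrottlerPtr read_throttler = context->getRemoteReadThrottler();
ReadSettings settings;
settings.throttler = read_throttler;
/// Every reader holding this ThrottlerPtr draws from the same
/// server-wide bandwidth budget.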

View File

@@ -819,6 +819,8 @@ public:
ThrottlerPtr getReplicatedFetchesThrottler() const;
ThrottlerPtr getReplicatedSendsThrottler() const;
ThrottlerPtr getRemoteReadThrottler() const;
ThrottlerPtr getRemoteWriteThrottler() const;
/// Has distributed_ddl configuration or not.
bool hasDistributedDDL() const;

View File

@@ -587,7 +587,8 @@ public:
s3_configuration_.rw_settings,
std::nullopt,
DBMS_DEFAULT_BUFFER_SIZE,
threadPoolCallbackRunner(IOThreadPool::get())),
threadPoolCallbackRunner(IOThreadPool::get()),
WriteSettings{.throttler = context->getRemoteWriteThrottler()}),
compression_method,
3);
writer
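Finally, the table-engine call site: StorageS3's writer builds its WriteSettings with a C++20 designated initializer, setting only the throttler and leaving enable_filesystem_cache_on_write_operations at its default, so INSERTs into S3 tables and writes through S3-backed disks all draw from the same server-wide throttler obtained from the Context. The designated-initializer pattern in isolation, as a self-contained sketch:

#include <memory>

struct Throttler {};
using ThrottlerPtr = std::shared_ptr<Throttler>;

struct WriteSettings
{
    bool enable_filesystem_cache_on_write_operations = false;
    ThrottlerPtr throttler;
};

int main()
{
    auto throttler = std::make_shared<Throttler>();
    /// C++20 designated initializer: set only `throttler`,
    /// earlier members keep their defaults.
    WriteSettings settings{.throttler = throttler};
    return settings.enable_filesystem_cache_on_write_operations ? 1 : 0;
}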