Add the ability to throttle local IO on a per-query/per-server basis

Server settings:
- max_local_read_bandwidth_for_server
- max_local_write_bandwidth_for_server

Query settings:
- max_local_read_bandwidth
- max_local_write_bandwidth

This is preparation for adding the ability to throttle BACKUPs.

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
Azat Khuzhin 2023-03-30 19:02:28 +02:00
parent b3406beeb7
commit a25dd1d348
26 changed files with 263 additions and 28 deletions

View File

@ -79,6 +79,10 @@
M(RemoteReadThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_remote_read_network_bandwidth_for_server'/'max_remote_read_network_bandwidth' throttling.") \ M(RemoteReadThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_remote_read_network_bandwidth_for_server'/'max_remote_read_network_bandwidth' throttling.") \
M(RemoteWriteThrottlerBytes, "Bytes passed through 'max_remote_write_network_bandwidth_for_server'/'max_remote_write_network_bandwidth' throttler.") \ M(RemoteWriteThrottlerBytes, "Bytes passed through 'max_remote_write_network_bandwidth_for_server'/'max_remote_write_network_bandwidth' throttler.") \
M(RemoteWriteThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_remote_write_network_bandwidth_for_server'/'max_remote_write_network_bandwidth' throttling.") \ M(RemoteWriteThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_remote_write_network_bandwidth_for_server'/'max_remote_write_network_bandwidth' throttling.") \
M(LocalReadThrottlerBytes, "Bytes passed through 'max_local_read_bandwidth_for_server'/'max_local_read_bandwidth' throttler.") \
M(LocalReadThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_local_read_bandwidth_for_server'/'max_local_read_bandwidth' throttling.") \
M(LocalWriteThrottlerBytes, "Bytes passed through 'max_local_write_bandwidth_for_server'/'max_local_write_bandwidth' throttler.") \
M(LocalWriteThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_local_write_bandwidth_for_server'/'max_local_write_bandwidth' throttling.") \
M(ThrottlerSleepMicroseconds, "Total time a query was sleeping to conform all throttling settings.") \ M(ThrottlerSleepMicroseconds, "Total time a query was sleeping to conform all throttling settings.") \
\ \
M(QueryMaskingRulesMatch, "Number of times query masking rules was successfully matched.") \ M(QueryMaskingRulesMatch, "Number of times query masking rules was successfully matched.") \

View File

@ -23,6 +23,8 @@ namespace DB
M(UInt64, io_thread_pool_queue_size, 10000, "Queue size for IO thread pool.", 0) \ M(UInt64, io_thread_pool_queue_size, 10000, "Queue size for IO thread pool.", 0) \
M(UInt64, max_remote_read_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for read. Zero means unlimited.", 0) \ M(UInt64, max_remote_read_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for read. Zero means unlimited.", 0) \
M(UInt64, max_remote_write_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for write. Zero means unlimited.", 0) \ M(UInt64, max_remote_write_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for write. Zero means unlimited.", 0) \
M(UInt64, max_local_read_bandwidth_for_server, 0, "The maximum speed of local reads in bytes per second. Zero means unlimited.", 0) \
M(UInt64, max_local_write_bandwidth_for_server, 0, "The maximum speed of local writes in bytes per second. Zero means unlimited.", 0) \
M(UInt64, max_backups_io_thread_pool_size, 1000, "The maximum number of threads that would be used for IO operations for BACKUP queries", 0) \ M(UInt64, max_backups_io_thread_pool_size, 1000, "The maximum number of threads that would be used for IO operations for BACKUP queries", 0) \
M(UInt64, max_backups_io_thread_pool_free_size, 0, "Max free size for backups IO thread pool.", 0) \ M(UInt64, max_backups_io_thread_pool_free_size, 0, "Max free size for backups IO thread pool.", 0) \
M(UInt64, backups_io_thread_pool_queue_size, 0, "Queue size for backups IO thread pool.", 0) \ M(UInt64, backups_io_thread_pool_queue_size, 0, "Queue size for backups IO thread pool.", 0) \

View File

@ -102,6 +102,8 @@ class IColumn;
M(UInt64, max_replicated_sends_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for replicated sends. Zero means unlimited. Only has meaning at server startup.", 0) \ M(UInt64, max_replicated_sends_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for replicated sends. Zero means unlimited. Only has meaning at server startup.", 0) \
M(UInt64, max_remote_read_network_bandwidth, 0, "The maximum speed of data exchange over the network in bytes per second for read.", 0) \ M(UInt64, max_remote_read_network_bandwidth, 0, "The maximum speed of data exchange over the network in bytes per second for read.", 0) \
M(UInt64, max_remote_write_network_bandwidth, 0, "The maximum speed of data exchange over the network in bytes per second for write.", 0) \ M(UInt64, max_remote_write_network_bandwidth, 0, "The maximum speed of data exchange over the network in bytes per second for write.", 0) \
M(UInt64, max_local_read_bandwidth, 0, "The maximum speed of local reads in bytes per second.", 0) \
M(UInt64, max_local_write_bandwidth, 0, "The maximum speed of local writes in bytes per second.", 0) \
M(Bool, stream_like_engine_allow_direct_select, false, "Allow direct SELECT query for Kafka, RabbitMQ, FileLog, Redis Streams and NATS engines. In case there are attached materialized views, SELECT query is not allowed even if this setting is enabled.", 0) \ M(Bool, stream_like_engine_allow_direct_select, false, "Allow direct SELECT query for Kafka, RabbitMQ, FileLog, Redis Streams and NATS engines. In case there are attached materialized views, SELECT query is not allowed even if this setting is enabled.", 0) \
M(String, stream_like_engine_insert_queue, "", "When stream like engine reads from multiple queues, user will need to select one queue to insert into when writing. Used by Redis Streams and NATS.", 0) \ M(String, stream_like_engine_insert_queue, "", "When stream like engine reads from multiple queues, user will need to select one queue to insert into when writing. Used by Redis Streams and NATS.", 0) \
\ \

View File

@ -1,4 +1,5 @@
#include "DiskLocal.h" #include "DiskLocal.h"
#include <Common/Throttler_fwd.h>
#include <Common/createHardLink.h> #include <Common/createHardLink.h>
#include "DiskFactory.h" #include "DiskFactory.h"
@ -367,10 +368,11 @@ std::unique_ptr<ReadBufferFromFileBase> DiskLocal::readFile(const String & path,
} }
std::unique_ptr<WriteBufferFromFileBase> std::unique_ptr<WriteBufferFromFileBase>
DiskLocal::writeFile(const String & path, size_t buf_size, WriteMode mode, const WriteSettings &) DiskLocal::writeFile(const String & path, size_t buf_size, WriteMode mode, const WriteSettings & settings)
{ {
int flags = (mode == WriteMode::Append) ? (O_APPEND | O_CREAT | O_WRONLY) : -1; int flags = (mode == WriteMode::Append) ? (O_APPEND | O_CREAT | O_WRONLY) : -1;
return std::make_unique<WriteBufferFromFile>(fs::path(disk_path) / path, buf_size, flags); return std::make_unique<WriteBufferFromFile>(
fs::path(disk_path) / path, buf_size, flags, settings.local_throttler);
} }
void DiskLocal::removeFile(const String & path) void DiskLocal::removeFile(const String & path)

View File

@ -76,11 +76,25 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
if (settings.local_fs_method == LocalFSReadMethod::read) if (settings.local_fs_method == LocalFSReadMethod::read)
{ {
res = std::make_unique<ReadBufferFromFile>(filename, buffer_size, actual_flags, existing_memory, buffer_alignment, file_size); res = std::make_unique<ReadBufferFromFile>(
filename,
buffer_size,
actual_flags,
existing_memory,
buffer_alignment,
file_size,
settings.local_throttler);
} }
else if (settings.local_fs_method == LocalFSReadMethod::pread || settings.local_fs_method == LocalFSReadMethod::mmap) else if (settings.local_fs_method == LocalFSReadMethod::pread || settings.local_fs_method == LocalFSReadMethod::mmap)
{ {
res = std::make_unique<ReadBufferFromFilePReadWithDescriptorsCache>(filename, buffer_size, actual_flags, existing_memory, buffer_alignment, file_size); res = std::make_unique<ReadBufferFromFilePReadWithDescriptorsCache>(
filename,
buffer_size,
actual_flags,
existing_memory,
buffer_alignment,
file_size,
settings.local_throttler);
} }
else if (settings.local_fs_method == LocalFSReadMethod::io_uring) else if (settings.local_fs_method == LocalFSReadMethod::io_uring)
{ {
@ -90,7 +104,15 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "io_uring is not supported by this system"); throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "io_uring is not supported by this system");
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>( res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
*reader, settings.priority, filename, buffer_size, actual_flags, existing_memory, buffer_alignment, file_size); *reader,
settings.priority,
filename,
buffer_size,
actual_flags,
existing_memory,
buffer_alignment,
file_size,
settings.local_throttler);
#else #else
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Read method io_uring is only supported in Linux"); throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Read method io_uring is only supported in Linux");
#endif #endif
@ -103,7 +125,15 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
auto & reader = context->getThreadPoolReader(Context::FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER); auto & reader = context->getThreadPoolReader(Context::FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER);
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>( res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
reader, settings.priority, filename, buffer_size, actual_flags, existing_memory, buffer_alignment, file_size); reader,
settings.priority,
filename,
buffer_size,
actual_flags,
existing_memory,
buffer_alignment,
file_size,
settings.local_throttler);
} }
else if (settings.local_fs_method == LocalFSReadMethod::pread_threadpool) else if (settings.local_fs_method == LocalFSReadMethod::pread_threadpool)
{ {
@ -113,7 +143,15 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
auto & reader = context->getThreadPoolReader(Context::FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER); auto & reader = context->getThreadPoolReader(Context::FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER);
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>( res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
reader, settings.priority, filename, buffer_size, actual_flags, existing_memory, buffer_alignment, file_size); reader,
settings.priority,
filename,
buffer_size,
actual_flags,
existing_memory,
buffer_alignment,
file_size,
settings.local_throttler);
} }
else else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown read method"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown read method");

View File

@ -1,5 +1,6 @@
#pragma once #pragma once
#include <Common/Throttler_fwd.h>
#include <IO/AsynchronousReadBufferFromFileDescriptor.h> #include <IO/AsynchronousReadBufferFromFileDescriptor.h>
#include <IO/OpenedFileCache.h> #include <IO/OpenedFileCache.h>
@ -7,6 +8,7 @@
namespace DB namespace DB
{ {
/* NOTE: Unused */
class AsynchronousReadBufferFromFile : public AsynchronousReadBufferFromFileDescriptor class AsynchronousReadBufferFromFile : public AsynchronousReadBufferFromFileDescriptor
{ {
protected: protected:
@ -62,8 +64,9 @@ public:
int flags = -1, int flags = -1,
char * existing_memory = nullptr, char * existing_memory = nullptr,
size_t alignment = 0, size_t alignment = 0,
std::optional<size_t> file_size_ = std::nullopt) std::optional<size_t> file_size_ = std::nullopt,
: AsynchronousReadBufferFromFileDescriptor(reader_, priority_, -1, buf_size, existing_memory, alignment, file_size_) ThrottlerPtr throttler_ = {})
: AsynchronousReadBufferFromFileDescriptor(reader_, priority_, -1, buf_size, existing_memory, alignment, file_size_, throttler_)
, file_name(file_name_) , file_name(file_name_)
{ {
file = OpenedFileCache::instance().get(file_name, flags); file = OpenedFileCache::instance().get(file_name, flags);

View File

@ -5,14 +5,17 @@
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/CurrentMetrics.h> #include <Common/CurrentMetrics.h>
#include <Common/Throttler.h>
#include <Common/filesystemHelpers.h>
#include <IO/AsynchronousReadBufferFromFileDescriptor.h> #include <IO/AsynchronousReadBufferFromFileDescriptor.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <Common/filesystemHelpers.h>
namespace ProfileEvents namespace ProfileEvents
{ {
extern const Event AsynchronousReadWaitMicroseconds; extern const Event AsynchronousReadWaitMicroseconds;
extern const Event LocalReadThrottlerBytes;
extern const Event LocalReadThrottlerSleepMicroseconds;
} }
namespace CurrentMetrics namespace CurrentMetrics
@ -92,6 +95,8 @@ bool AsynchronousReadBufferFromFileDescriptor::nextImpl()
assert(offset <= size); assert(offset <= size);
size_t bytes_read = size - offset; size_t bytes_read = size - offset;
if (throttler)
throttler->add(bytes_read, ProfileEvents::LocalReadThrottlerBytes, ProfileEvents::LocalReadThrottlerSleepMicroseconds);
if (bytes_read) if (bytes_read)
{ {
@ -117,6 +122,8 @@ bool AsynchronousReadBufferFromFileDescriptor::nextImpl()
assert(offset <= size); assert(offset <= size);
size_t bytes_read = size - offset; size_t bytes_read = size - offset;
if (throttler)
throttler->add(bytes_read, ProfileEvents::LocalReadThrottlerBytes, ProfileEvents::LocalReadThrottlerSleepMicroseconds);
if (bytes_read) if (bytes_read)
{ {
@ -149,12 +156,14 @@ AsynchronousReadBufferFromFileDescriptor::AsynchronousReadBufferFromFileDescript
size_t buf_size, size_t buf_size,
char * existing_memory, char * existing_memory,
size_t alignment, size_t alignment,
std::optional<size_t> file_size_) std::optional<size_t> file_size_,
ThrottlerPtr throttler_)
: ReadBufferFromFileBase(buf_size, existing_memory, alignment, file_size_) : ReadBufferFromFileBase(buf_size, existing_memory, alignment, file_size_)
, reader(reader_) , reader(reader_)
, base_priority(priority_) , base_priority(priority_)
, required_alignment(alignment) , required_alignment(alignment)
, fd(fd_) , fd(fd_)
, throttler(throttler_)
{ {
if (required_alignment > buf_size) if (required_alignment > buf_size)
throw Exception( throw Exception(

View File

@ -3,6 +3,7 @@
#include <IO/ReadBufferFromFileBase.h> #include <IO/ReadBufferFromFileBase.h>
#include <IO/AsynchronousReader.h> #include <IO/AsynchronousReader.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Common/Throttler_fwd.h>
#include <optional> #include <optional>
#include <unistd.h> #include <unistd.h>
@ -26,6 +27,7 @@ protected:
size_t file_offset_of_buffer_end = 0; /// What offset in file corresponds to working_buffer.end(). size_t file_offset_of_buffer_end = 0; /// What offset in file corresponds to working_buffer.end().
size_t bytes_to_ignore = 0; /// How many bytes should we ignore upon a new read request. size_t bytes_to_ignore = 0; /// How many bytes should we ignore upon a new read request.
int fd; int fd;
ThrottlerPtr throttler;
bool nextImpl() override; bool nextImpl() override;
@ -42,7 +44,8 @@ public:
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr, char * existing_memory = nullptr,
size_t alignment = 0, size_t alignment = 0,
std::optional<size_t> file_size_ = std::nullopt); std::optional<size_t> file_size_ = std::nullopt,
ThrottlerPtr throttler_ = {});
~AsynchronousReadBufferFromFileDescriptor() override; ~AsynchronousReadBufferFromFileDescriptor() override;

View File

@ -30,8 +30,10 @@ ReadBufferFromFile::ReadBufferFromFile(
int flags, int flags,
char * existing_memory, char * existing_memory,
size_t alignment, size_t alignment,
std::optional<size_t> file_size_) std::optional<size_t> file_size_,
: ReadBufferFromFileDescriptor(-1, buf_size, existing_memory, alignment, file_size_), file_name(file_name_) ThrottlerPtr throttler_)
: ReadBufferFromFileDescriptor(-1, buf_size, existing_memory, alignment, file_size_, throttler_)
, file_name(file_name_)
{ {
ProfileEvents::increment(ProfileEvents::FileOpen); ProfileEvents::increment(ProfileEvents::FileOpen);
@ -61,8 +63,9 @@ ReadBufferFromFile::ReadBufferFromFile(
size_t buf_size, size_t buf_size,
char * existing_memory, char * existing_memory,
size_t alignment, size_t alignment,
std::optional<size_t> file_size_) std::optional<size_t> file_size_,
: ReadBufferFromFileDescriptor(fd_, buf_size, existing_memory, alignment, file_size_) ThrottlerPtr throttler_)
: ReadBufferFromFileDescriptor(fd_, buf_size, existing_memory, alignment, file_size_, throttler_)
, file_name(original_file_name.empty() ? "(fd = " + toString(fd_) + ")" : original_file_name) , file_name(original_file_name.empty() ? "(fd = " + toString(fd_) + ")" : original_file_name)
{ {
fd_ = -1; fd_ = -1;

View File

@ -29,7 +29,8 @@ public:
int flags = -1, int flags = -1,
char * existing_memory = nullptr, char * existing_memory = nullptr,
size_t alignment = 0, size_t alignment = 0,
std::optional<size_t> file_size_ = std::nullopt); std::optional<size_t> file_size_ = std::nullopt,
ThrottlerPtr throttler = {});
/// Use pre-opened file descriptor. /// Use pre-opened file descriptor.
explicit ReadBufferFromFile( explicit ReadBufferFromFile(
@ -38,7 +39,8 @@ public:
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr, char * existing_memory = nullptr,
size_t alignment = 0, size_t alignment = 0,
std::optional<size_t> file_size_ = std::nullopt); std::optional<size_t> file_size_ = std::nullopt,
ThrottlerPtr throttler = {});
~ReadBufferFromFile() override; ~ReadBufferFromFile() override;
@ -88,8 +90,9 @@ public:
int flags = -1, int flags = -1,
char * existing_memory = nullptr, char * existing_memory = nullptr,
size_t alignment = 0, size_t alignment = 0,
std::optional<size_t> file_size_ = std::nullopt) std::optional<size_t> file_size_ = std::nullopt,
: ReadBufferFromFileDescriptorPRead(-1, buf_size, existing_memory, alignment, file_size_) ThrottlerPtr throttler_ = {})
: ReadBufferFromFileDescriptorPRead(-1, buf_size, existing_memory, alignment, file_size_, throttler_)
, file_name(file_name_) , file_name(file_name_)
{ {
file = OpenedFileCache::instance().get(file_name, flags); file = OpenedFileCache::instance().get(file_name, flags);

View File

@ -5,6 +5,7 @@
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/CurrentMetrics.h> #include <Common/CurrentMetrics.h>
#include <Common/Throttler.h>
#include <IO/ReadBufferFromFileDescriptor.h> #include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <Common/filesystemHelpers.h> #include <Common/filesystemHelpers.h>
@ -21,6 +22,8 @@ namespace ProfileEvents
extern const Event ReadBufferFromFileDescriptorReadBytes; extern const Event ReadBufferFromFileDescriptorReadBytes;
extern const Event DiskReadElapsedMicroseconds; extern const Event DiskReadElapsedMicroseconds;
extern const Event Seek; extern const Event Seek;
extern const Event LocalReadThrottlerBytes;
extern const Event LocalReadThrottlerSleepMicroseconds;
} }
namespace CurrentMetrics namespace CurrentMetrics
@ -82,7 +85,12 @@ bool ReadBufferFromFileDescriptor::nextImpl()
} }
if (res > 0) if (res > 0)
{
bytes_read += res; bytes_read += res;
if (throttler)
throttler->add(res, ProfileEvents::LocalReadThrottlerBytes, ProfileEvents::LocalReadThrottlerSleepMicroseconds);
}
/// It reports real time spent including the time spent while thread was preempted doing nothing. /// It reports real time spent including the time spent while thread was preempted doing nothing.
/// And it is Ok for the purpose of this watch (it is used to lower the number of threads to read from tables). /// And it is Ok for the purpose of this watch (it is used to lower the number of threads to read from tables).

View File

@ -2,6 +2,7 @@
#include <IO/ReadBufferFromFileBase.h> #include <IO/ReadBufferFromFileBase.h>
#include <Interpreters/Context_fwd.h> #include <Interpreters/Context_fwd.h>
#include <Common/Throttler_fwd.h>
#include <unistd.h> #include <unistd.h>
@ -21,6 +22,8 @@ protected:
int fd; int fd;
ThrottlerPtr throttler;
bool nextImpl() override; bool nextImpl() override;
void prefetch(int64_t priority) override; void prefetch(int64_t priority) override;
@ -33,10 +36,12 @@ public:
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr, char * existing_memory = nullptr,
size_t alignment = 0, size_t alignment = 0,
std::optional<size_t> file_size_ = std::nullopt) std::optional<size_t> file_size_ = std::nullopt,
ThrottlerPtr throttler_ = {})
: ReadBufferFromFileBase(buf_size, existing_memory, alignment, file_size_) : ReadBufferFromFileBase(buf_size, existing_memory, alignment, file_size_)
, required_alignment(alignment) , required_alignment(alignment)
, fd(fd_) , fd(fd_)
, throttler(throttler_)
{ {
} }
@ -78,8 +83,9 @@ public:
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr, char * existing_memory = nullptr,
size_t alignment = 0, size_t alignment = 0,
std::optional<size_t> file_size_ = std::nullopt) std::optional<size_t> file_size_ = std::nullopt,
: ReadBufferFromFileDescriptor(fd_, buf_size, existing_memory, alignment, file_size_) ThrottlerPtr throttler_ = {})
: ReadBufferFromFileDescriptor(fd_, buf_size, existing_memory, alignment, file_size_, throttler_)
{ {
use_pread = true; use_pread = true;
} }

View File

@ -81,7 +81,7 @@ struct ReadSettings
size_t mmap_threshold = 0; size_t mmap_threshold = 0;
MMappedFileCache * mmap_cache = nullptr; MMappedFileCache * mmap_cache = nullptr;
/// For 'pread_threadpool' method. Lower is more priority. /// For 'pread_threadpool'/'io_uring' method. Lower is more priority.
size_t priority = 0; size_t priority = 0;
bool load_marks_asynchronously = true; bool load_marks_asynchronously = true;
@ -109,6 +109,7 @@ struct ReadSettings
/// Bandwidth throttler to use during reading /// Bandwidth throttler to use during reading
ThrottlerPtr remote_throttler; ThrottlerPtr remote_throttler;
ThrottlerPtr local_throttler;
// Resource to be used during reading // Resource to be used during reading
ResourceLink resource_link; ResourceLink resource_link;

View File

@ -29,10 +29,11 @@ WriteBufferFromFile::WriteBufferFromFile(
const std::string & file_name_, const std::string & file_name_,
size_t buf_size, size_t buf_size,
int flags, int flags,
ThrottlerPtr throttler_,
mode_t mode, mode_t mode,
char * existing_memory, char * existing_memory,
size_t alignment) size_t alignment)
: WriteBufferFromFileDescriptor(-1, buf_size, existing_memory, alignment, file_name_) : WriteBufferFromFileDescriptor(-1, buf_size, existing_memory, throttler_, alignment, file_name_)
{ {
ProfileEvents::increment(ProfileEvents::FileOpen); ProfileEvents::increment(ProfileEvents::FileOpen);
@ -63,9 +64,10 @@ WriteBufferFromFile::WriteBufferFromFile(
int & fd_, int & fd_,
const std::string & original_file_name, const std::string & original_file_name,
size_t buf_size, size_t buf_size,
ThrottlerPtr throttler_,
char * existing_memory, char * existing_memory,
size_t alignment) size_t alignment)
: WriteBufferFromFileDescriptor(fd_, buf_size, existing_memory, alignment, original_file_name) : WriteBufferFromFileDescriptor(fd_, buf_size, existing_memory, throttler_, alignment, original_file_name)
{ {
fd_ = -1; fd_ = -1;
} }

View File

@ -3,6 +3,7 @@
#include <sys/types.h> #include <sys/types.h>
#include <Common/CurrentMetrics.h> #include <Common/CurrentMetrics.h>
#include <Common/Throttler_fwd.h>
#include <IO/WriteBufferFromFileDescriptor.h> #include <IO/WriteBufferFromFileDescriptor.h>
@ -32,6 +33,7 @@ public:
const std::string & file_name_, const std::string & file_name_,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
int flags = -1, int flags = -1,
ThrottlerPtr throttler_ = {},
mode_t mode = 0666, mode_t mode = 0666,
char * existing_memory = nullptr, char * existing_memory = nullptr,
size_t alignment = 0); size_t alignment = 0);
@ -41,6 +43,7 @@ public:
int & fd, /// Will be set to -1 if constructor didn't throw and ownership of file descriptor is passed to the object. int & fd, /// Will be set to -1 if constructor didn't throw and ownership of file descriptor is passed to the object.
const std::string & original_file_name = {}, const std::string & original_file_name = {},
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
ThrottlerPtr throttler_ = {},
char * existing_memory = nullptr, char * existing_memory = nullptr,
size_t alignment = 0); size_t alignment = 0);

View File

@ -3,6 +3,7 @@
#include <cassert> #include <cassert>
#include <sys/stat.h> #include <sys/stat.h>
#include <Common/Throttler.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/ProfileEvents.h> #include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h> #include <Common/CurrentMetrics.h>
@ -20,6 +21,8 @@ namespace ProfileEvents
extern const Event DiskWriteElapsedMicroseconds; extern const Event DiskWriteElapsedMicroseconds;
extern const Event FileSync; extern const Event FileSync;
extern const Event FileSyncElapsedMicroseconds; extern const Event FileSyncElapsedMicroseconds;
extern const Event LocalWriteThrottlerBytes;
extern const Event LocalWriteThrottlerSleepMicroseconds;
} }
namespace CurrentMetrics namespace CurrentMetrics
@ -71,7 +74,11 @@ void WriteBufferFromFileDescriptor::nextImpl()
} }
if (res > 0) if (res > 0)
{
bytes_written += res; bytes_written += res;
if (throttler)
throttler->add(res, ProfileEvents::LocalWriteThrottlerBytes, ProfileEvents::LocalWriteThrottlerSleepMicroseconds);
}
} }
ProfileEvents::increment(ProfileEvents::DiskWriteElapsedMicroseconds, watch.elapsedMicroseconds()); ProfileEvents::increment(ProfileEvents::DiskWriteElapsedMicroseconds, watch.elapsedMicroseconds());
@ -85,10 +92,12 @@ WriteBufferFromFileDescriptor::WriteBufferFromFileDescriptor(
int fd_, int fd_,
size_t buf_size, size_t buf_size,
char * existing_memory, char * existing_memory,
ThrottlerPtr throttler_,
size_t alignment, size_t alignment,
std::string file_name_) std::string file_name_)
: WriteBufferFromFileBase(buf_size, existing_memory, alignment) : WriteBufferFromFileBase(buf_size, existing_memory, alignment)
, fd(fd_) , fd(fd_)
, throttler(throttler_)
, file_name(std::move(file_name_)) , file_name(std::move(file_name_))
{ {
} }

View File

@ -1,6 +1,7 @@
#pragma once #pragma once
#include <IO/WriteBufferFromFileBase.h> #include <IO/WriteBufferFromFileBase.h>
#include <Common/Throttler_fwd.h>
namespace DB namespace DB
@ -15,6 +16,7 @@ public:
int fd_ = -1, int fd_ = -1,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr, char * existing_memory = nullptr,
ThrottlerPtr throttler_ = {},
size_t alignment = 0, size_t alignment = 0,
std::string file_name_ = ""); std::string file_name_ = "");
@ -49,6 +51,7 @@ protected:
void nextImpl() override; void nextImpl() override;
int fd; int fd;
ThrottlerPtr throttler;
/// If file has name contains filename, otherwise contains string "(fd=...)" /// If file has name contains filename, otherwise contains string "(fd=...)"
std::string file_name; std::string file_name;

View File

@ -14,7 +14,7 @@ namespace ErrorCodes
WriteBufferFromTemporaryFile::WriteBufferFromTemporaryFile(std::unique_ptr<PocoTemporaryFile> && tmp_file_) WriteBufferFromTemporaryFile::WriteBufferFromTemporaryFile(std::unique_ptr<PocoTemporaryFile> && tmp_file_)
: WriteBufferFromFile(tmp_file_->path(), DBMS_DEFAULT_BUFFER_SIZE, O_RDWR | O_TRUNC | O_CREAT, 0600), tmp_file(std::move(tmp_file_)) : WriteBufferFromFile(tmp_file_->path(), DBMS_DEFAULT_BUFFER_SIZE, O_RDWR | O_TRUNC | O_CREAT, /* throttler= */ {}, 0600), tmp_file(std::move(tmp_file_))
{} {}

View File

@ -11,6 +11,7 @@ struct WriteSettings
{ {
/// Bandwidth throttler to use during writing /// Bandwidth throttler to use during writing
ThrottlerPtr remote_throttler; ThrottlerPtr remote_throttler;
ThrottlerPtr local_throttler;
// Resource to be used during reading // Resource to be used during reading
ResourceLink resource_link; ResourceLink resource_link;

View File

@ -46,7 +46,7 @@ int main(int, char **)
/// Write to file with O_DIRECT, read as usual. /// Write to file with O_DIRECT, read as usual.
{ {
WriteBufferFromFile wb("test2", BUF_SIZE, O_WRONLY | O_CREAT | O_TRUNC | O_DIRECT, 0666, nullptr, page_size); WriteBufferFromFile wb("test2", BUF_SIZE, O_WRONLY | O_CREAT | O_TRUNC | O_DIRECT, /* throttler= */ {}, 0666, nullptr, page_size);
for (size_t i = 0; i < N; ++i) for (size_t i = 0; i < N; ++i)
writeStringBinary(test, wb); writeStringBinary(test, wb);

View File

@ -281,6 +281,9 @@ struct ContextSharedPart : boost::noncopyable
mutable ThrottlerPtr remote_read_throttler; /// A server-wide throttler for remote IO reads mutable ThrottlerPtr remote_read_throttler; /// A server-wide throttler for remote IO reads
mutable ThrottlerPtr remote_write_throttler; /// A server-wide throttler for remote IO writes mutable ThrottlerPtr remote_write_throttler; /// A server-wide throttler for remote IO writes
mutable ThrottlerPtr local_read_throttler; /// A server-wide throttler for local IO reads
mutable ThrottlerPtr local_write_throttler; /// A server-wide throttler for local IO writes
MultiVersion<Macros> macros; /// Substitutions extracted from config. MultiVersion<Macros> macros; /// Substitutions extracted from config.
std::unique_ptr<DDLWorker> ddl_worker; /// Process ddl commands from zk. std::unique_ptr<DDLWorker> ddl_worker; /// Process ddl commands from zk.
/// Rules for selecting the compression settings, depending on the size of the part. /// Rules for selecting the compression settings, depending on the size of the part.
@ -1772,6 +1775,12 @@ void Context::makeQueryContext()
{ {
getRemoteReadThrottler(); getRemoteReadThrottler();
getRemoteWriteThrottler(); getRemoteWriteThrottler();
getLocalReadThrottler();
getLocalWriteThrottler();
getBackupsReadThrottler();
getBackupsWriteThrottler();
} }
} }
@ -2390,6 +2399,54 @@ ThrottlerPtr Context::getRemoteWriteThrottler() const
return throttler; return throttler;
} }
/// Returns the throttler that local reads issued through this context should use.
///
/// Two limits are consulted:
///  - the server setting `max_local_read_bandwidth_for_server`, backed by a throttler stored in the shared context part;
///  - the query setting `max_local_read_bandwidth`, backed by a throttler stored in this context.
/// Each throttler is created on first use, under the context lock.
/// The result is the query-level throttler when the query limit is non-zero, otherwise the
/// server-level one, otherwise an empty pointer.
ThrottlerPtr Context::getLocalReadThrottler() const
{
    /// Stays empty when neither limit is configured.
    ThrottlerPtr result;

    if (shared->server_settings.max_local_read_bandwidth_for_server)
    {
        auto guard = getLock();
        auto & server_throttler = shared->local_read_throttler;
        if (!server_throttler)
            server_throttler = std::make_shared<Throttler>(shared->server_settings.max_local_read_bandwidth_for_server);
        result = server_throttler;
    }

    const auto & current_settings = getSettingsRef();
    if (current_settings.max_local_read_bandwidth)
    {
        auto guard = getLock();
        /// The server-level throttler (possibly empty) is handed to the query-level one as its
        /// second constructor argument - presumably the parent throttler; confirm against Throttler.
        if (!local_read_query_throttler)
            local_read_query_throttler = std::make_shared<Throttler>(current_settings.max_local_read_bandwidth, result);
        result = local_read_query_throttler;
    }

    return result;
}
/// Returns the throttler that local writes issued through this context should use.
///
/// Two limits are consulted:
///  - the server setting `max_local_write_bandwidth_for_server`, backed by a throttler stored in the shared context part;
///  - the query setting `max_local_write_bandwidth`, backed by a throttler stored in this context.
/// Each throttler is created on first use, under the context lock.
/// The result is the query-level throttler when the query limit is non-zero, otherwise the
/// server-level one, otherwise an empty pointer.
ThrottlerPtr Context::getLocalWriteThrottler() const
{
    /// Stays empty when neither limit is configured.
    ThrottlerPtr result;

    if (shared->server_settings.max_local_write_bandwidth_for_server)
    {
        auto guard = getLock();
        auto & server_throttler = shared->local_write_throttler;
        if (!server_throttler)
            server_throttler = std::make_shared<Throttler>(shared->server_settings.max_local_write_bandwidth_for_server);
        result = server_throttler;
    }

    const auto & current_settings = getSettingsRef();
    if (current_settings.max_local_write_bandwidth)
    {
        auto guard = getLock();
        /// The server-level throttler (possibly empty) is handed to the query-level one as its
        /// second constructor argument - presumably the parent throttler; confirm against Throttler.
        if (!local_write_query_throttler)
            local_write_query_throttler = std::make_shared<Throttler>(current_settings.max_local_write_bandwidth, result);
        result = local_write_query_throttler;
    }

    return result;
}
bool Context::hasDistributedDDL() const bool Context::hasDistributedDDL() const
{ {
return getConfigRef().has("distributed_ddl"); return getConfigRef().has("distributed_ddl");
@ -4098,6 +4155,7 @@ ReadSettings Context::getReadSettings() const
res.priority = settings.read_priority; res.priority = settings.read_priority;
res.remote_throttler = getRemoteReadThrottler(); res.remote_throttler = getRemoteReadThrottler();
res.local_throttler = getLocalReadThrottler();
res.http_max_tries = settings.http_max_tries; res.http_max_tries = settings.http_max_tries;
res.http_retry_initial_backoff_ms = settings.http_retry_initial_backoff_ms; res.http_retry_initial_backoff_ms = settings.http_retry_initial_backoff_ms;
@ -4120,6 +4178,7 @@ WriteSettings Context::getWriteSettings() const
res.s3_allow_parallel_part_upload = settings.s3_allow_parallel_part_upload; res.s3_allow_parallel_part_upload = settings.s3_allow_parallel_part_upload;
res.remote_throttler = getRemoteWriteThrottler(); res.remote_throttler = getRemoteWriteThrottler();
res.local_throttler = getLocalWriteThrottler();
return res; return res;
} }

View File

@ -1154,9 +1154,15 @@ public:
ThrottlerPtr getRemoteReadThrottler() const; ThrottlerPtr getRemoteReadThrottler() const;
ThrottlerPtr getRemoteWriteThrottler() const; ThrottlerPtr getRemoteWriteThrottler() const;
ThrottlerPtr getLocalReadThrottler() const;
ThrottlerPtr getLocalWriteThrottler() const;
private: private:
mutable ThrottlerPtr remote_read_query_throttler; /// A query-wide throttler for remote IO reads mutable ThrottlerPtr remote_read_query_throttler; /// A query-wide throttler for remote IO reads
mutable ThrottlerPtr remote_write_query_throttler; /// A query-wide throttler for remote IO writes mutable ThrottlerPtr remote_write_query_throttler; /// A query-wide throttler for remote IO writes
mutable ThrottlerPtr local_read_query_throttler; /// A query-wide throttler for local IO reads
mutable ThrottlerPtr local_write_query_throttler; /// A query-wide throttler for local IO writes
}; };
struct HTTPContext : public IHTTPContext struct HTTPContext : public IHTTPContext

View File

@ -0,0 +1,3 @@
read 1 1 1 1
pread 1 1 1 1
pread_threadpool 1 1 1 1

View File

@ -0,0 +1,39 @@
#!/usr/bin/env bash
# Tags: no-s3-storage, no-random-settings, no-random-merge-tree-settings
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# Checks that the query-level setting max_local_read_bandwidth throttles local reads
# for every local_filesystem_read_method listed below.
# (Re)create the table that is read back under throttling.
$CLICKHOUSE_CLIENT -nm -q "
drop table if exists data;
create table data (key UInt64 CODEC(NONE)) engine=MergeTree() order by tuple() settings min_bytes_for_wide_part=1e9, min_bytes_for_compact_part=0;
"
# Insert 1e6 UInt64 values (1e6*8 bytes); reading them back with 1M bandwidth should take (8-1)/1=7 seconds
$CLICKHOUSE_CLIENT -q "insert into data select * from numbers(1e6)"
# Read methods to exercise; the commented-out entries are skipped for the reasons given in the NOTEs.
read_methods=(
read
pread
pread_threadpool
# NOTE: io_uring doing all IO from one thread, that is not attached to the query
# io_uring
# NOTE: mmap cannot be throttled
# mmap
)
for read_method in "${read_methods[@]}"; do
# Unique query_id so that this exact query can be found in system.query_log afterwards.
# random_str is not defined in this script; presumably it is provided by shell_config.sh.
query_id=$(random_str 10)
# The throttled read itself; the result rows are discarded (format Null).
$CLICKHOUSE_CLIENT --query_id "$query_id" -q "select * from data format Null settings max_local_read_bandwidth='1M', local_filesystem_read_method='$read_method'"
# Flush the logs and print one row for the query: the read method followed by four checks
# (query duration >= 7e3 ms, ReadBufferFromFileDescriptorReadBytes > 8e6,
# LocalReadThrottlerBytes > 8e6, LocalReadThrottlerSleepMicroseconds > 7e6).
$CLICKHOUSE_CLIENT -nm -q "
SYSTEM FLUSH LOGS;
SELECT
'$read_method',
query_duration_ms >= 7e3,
ProfileEvents['ReadBufferFromFileDescriptorReadBytes'] > 8e6,
ProfileEvents['LocalReadThrottlerBytes'] > 8e6,
ProfileEvents['LocalReadThrottlerSleepMicroseconds'] > 7e6
FROM system.query_log
WHERE current_database = '$CLICKHOUSE_DATABASE' AND query_id = '$query_id' AND type != 'QueryStart'
"
done

View File

@ -0,0 +1 @@
1 1 1 1

View File

@ -0,0 +1,25 @@
#!/usr/bin/env bash
# Tags: no-s3-storage
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# Checks that the query-level setting max_local_write_bandwidth throttles local writes of an INSERT.
# (Re)create the table that is written to under throttling.
$CLICKHOUSE_CLIENT -nm -q "
drop table if exists data;
create table data (key UInt64 CODEC(NONE)) engine=MergeTree() order by tuple() settings min_bytes_for_wide_part=1e9, min_bytes_for_compact_part=0;
"
# Unique query_id so that the INSERT can be found in system.query_log afterwards.
# random_str is not defined in this script; presumably it is provided by shell_config.sh.
query_id=$(random_str 10)
# writing 1e6*8 bytes with 1M bandwidth should take (8-1)/1=7 seconds
$CLICKHOUSE_CLIENT --query_id "$query_id" -q "insert into data select * from numbers(1e6) settings max_local_write_bandwidth='1M'"
# Flush the logs and print one row with four checks for the INSERT
# (query duration >= 7e3 ms, WriteBufferFromFileDescriptorWriteBytes > 8e6,
# LocalWriteThrottlerBytes > 8e6, LocalWriteThrottlerSleepMicroseconds > 7e6).
$CLICKHOUSE_CLIENT -nm -q "
SYSTEM FLUSH LOGS;
SELECT
query_duration_ms >= 7e3,
ProfileEvents['WriteBufferFromFileDescriptorWriteBytes'] > 8e6,
ProfileEvents['LocalWriteThrottlerBytes'] > 8e6,
ProfileEvents['LocalWriteThrottlerSleepMicroseconds'] > 7e6
FROM system.query_log
WHERE current_database = '$CLICKHOUSE_DATABASE' AND query_id = '$query_id' AND type != 'QueryStart'
"