Add per-query network throttling

Controlled with:
- max_remote_read_network_bandwidth
- max_remote_write_network_bandwidth

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
Azat Khuzhin 2023-03-30 19:00:34 +02:00
parent abdb682048
commit b3406beeb7
4 changed files with 32 additions and 4 deletions

View File

@ -75,10 +75,10 @@
M(S3GetRequestThrottlerSleepMicroseconds, "Total time a query was sleeping to conform S3 GET and SELECT request throttling.") \ M(S3GetRequestThrottlerSleepMicroseconds, "Total time a query was sleeping to conform S3 GET and SELECT request throttling.") \
M(S3PutRequestThrottlerCount, "Number of S3 PUT, COPY, POST and LIST requests passed through throttler.") \ M(S3PutRequestThrottlerCount, "Number of S3 PUT, COPY, POST and LIST requests passed through throttler.") \
M(S3PutRequestThrottlerSleepMicroseconds, "Total time a query was sleeping to conform S3 PUT, COPY, POST and LIST request throttling.") \ M(S3PutRequestThrottlerSleepMicroseconds, "Total time a query was sleeping to conform S3 PUT, COPY, POST and LIST request throttling.") \
M(RemoteReadThrottlerBytes, "Bytes passed through 'max_remote_read_network_bandwidth_for_server' throttler.") \ M(RemoteReadThrottlerBytes, "Bytes passed through 'max_remote_read_network_bandwidth_for_server'/'max_remote_read_network_bandwidth' throttler.") \
M(RemoteReadThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_remote_read_network_bandwidth_for_server' throttling.") \ M(RemoteReadThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_remote_read_network_bandwidth_for_server'/'max_remote_read_network_bandwidth' throttling.") \
M(RemoteWriteThrottlerBytes, "Bytes passed through 'max_remote_write_network_bandwidth_for_server' throttler.") \ M(RemoteWriteThrottlerBytes, "Bytes passed through 'max_remote_write_network_bandwidth_for_server'/'max_remote_write_network_bandwidth' throttler.") \
M(RemoteWriteThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_remote_write_network_bandwidth_for_server' throttling.") \ M(RemoteWriteThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_remote_write_network_bandwidth_for_server'/'max_remote_write_network_bandwidth' throttling.") \
M(ThrottlerSleepMicroseconds, "Total time a query was sleeping to conform all throttling settings.") \ M(ThrottlerSleepMicroseconds, "Total time a query was sleeping to conform all throttling settings.") \
\ \
M(QueryMaskingRulesMatch, "Number of times query masking rules was successfully matched.") \ M(QueryMaskingRulesMatch, "Number of times query masking rules was successfully matched.") \

View File

@ -100,6 +100,8 @@ class IColumn;
M(Bool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.", 0) \ M(Bool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.", 0) \
M(UInt64, max_replicated_fetches_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for replicated fetches. Zero means unlimited. Only has meaning at server startup.", 0) \ M(UInt64, max_replicated_fetches_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for replicated fetches. Zero means unlimited. Only has meaning at server startup.", 0) \
M(UInt64, max_replicated_sends_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for replicated sends. Zero means unlimited. Only has meaning at server startup.", 0) \ M(UInt64, max_replicated_sends_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for replicated sends. Zero means unlimited. Only has meaning at server startup.", 0) \
M(UInt64, max_remote_read_network_bandwidth, 0, "The maximum speed of data exchange over the network in bytes per second for read.", 0) \
M(UInt64, max_remote_write_network_bandwidth, 0, "The maximum speed of data exchange over the network in bytes per second for write.", 0) \
M(Bool, stream_like_engine_allow_direct_select, false, "Allow direct SELECT query for Kafka, RabbitMQ, FileLog, Redis Streams and NATS engines. In case there are attached materialized views, SELECT query is not allowed even if this setting is enabled.", 0) \ M(Bool, stream_like_engine_allow_direct_select, false, "Allow direct SELECT query for Kafka, RabbitMQ, FileLog, Redis Streams and NATS engines. In case there are attached materialized views, SELECT query is not allowed even if this setting is enabled.", 0) \
M(String, stream_like_engine_insert_queue, "", "When stream like engine reads from multiple queues, user will need to select one queue to insert into when writing. Used by Redis Streams and NATS.", 0) \ M(String, stream_like_engine_insert_queue, "", "When stream like engine reads from multiple queues, user will need to select one queue to insert into when writing. Used by Redis Streams and NATS.", 0) \
\ \

View File

@ -1767,6 +1767,12 @@ ContextMutablePtr Context::getBufferContext() const
void Context::makeQueryContext() void Context::makeQueryContext()
{ {
query_context = shared_from_this(); query_context = shared_from_this();
/// Create throttlers, to inherit the ThrottlePtr in the context copies.
{
getRemoteReadThrottler();
getRemoteWriteThrottler();
}
} }
void Context::makeSessionContext() void Context::makeSessionContext()
@ -2345,6 +2351,14 @@ ThrottlerPtr Context::getRemoteReadThrottler() const
throttler = shared->remote_read_throttler; throttler = shared->remote_read_throttler;
} }
if (query_settings.max_remote_read_network_bandwidth)
{
auto lock = getLock();
if (!remote_read_query_throttler)
remote_read_query_throttler = std::make_shared<Throttler>(query_settings.max_remote_read_network_bandwidth, throttler);
throttler = remote_read_query_throttler;
}
return throttler; return throttler;
} }
@ -2365,6 +2379,14 @@ ThrottlerPtr Context::getRemoteWriteThrottler() const
throttler = shared->remote_write_throttler; throttler = shared->remote_write_throttler;
} }
if (query_settings.max_remote_write_network_bandwidth)
{
auto lock = getLock();
if (!remote_write_query_throttler)
remote_write_query_throttler = std::make_shared<Throttler>(query_settings.max_remote_write_network_bandwidth, throttler);
throttler = remote_write_query_throttler;
}
return throttler; return throttler;
} }

View File

@ -1153,6 +1153,10 @@ public:
ThrottlerPtr getRemoteReadThrottler() const; ThrottlerPtr getRemoteReadThrottler() const;
ThrottlerPtr getRemoteWriteThrottler() const; ThrottlerPtr getRemoteWriteThrottler() const;
private:
mutable ThrottlerPtr remote_read_query_throttler; /// A query-wide throttler for remote IO reads
mutable ThrottlerPtr remote_write_query_throttler; /// A query-wide throttler for remote IO writes
}; };
struct HTTPContext : public IHTTPContext struct HTTPContext : public IHTTPContext