mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-26 09:32:01 +00:00
Merge pull request #45373 from ClickHouse/throttler-metrics
Add detailed profile events for throttling
This commit is contained in:
commit
91fb1eab8c
@ -64,7 +64,20 @@
|
||||
M(NetworkSendElapsedMicroseconds, "Total time spent waiting for data to send to network or sending data to network. Only ClickHouse-related network interaction is included, not by 3rd party libraries..") \
|
||||
M(NetworkReceiveBytes, "Total number of bytes received from network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \
|
||||
M(NetworkSendBytes, "Total number of bytes send to network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \
|
||||
M(ThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_network_bandwidth' and other throttling settings.") \
|
||||
\
|
||||
M(DiskS3GetRequestThrottlerCount, "Number of DiskS3 GET and SELECT requests passed through throttler.") \
|
||||
M(DiskS3GetRequestThrottlerSleepMicroseconds, "Total time a query was sleeping to conform DiskS3 GET and SELECT request throttling.") \
|
||||
M(DiskS3PutRequestThrottlerCount, "Number of DiskS3 PUT, COPY, POST and LIST requests passed through throttler.") \
|
||||
M(DiskS3PutRequestThrottlerSleepMicroseconds, "Total time a query was sleeping to conform DiskS3 PUT, COPY, POST and LIST request throttling.") \
|
||||
M(S3GetRequestThrottlerCount, "Number of S3 GET and SELECT requests passed through throttler.") \
|
||||
M(S3GetRequestThrottlerSleepMicroseconds, "Total time a query was sleeping to conform S3 GET and SELECT request throttling.") \
|
||||
M(S3PutRequestThrottlerCount, "Number of S3 PUT, COPY, POST and LIST requests passed through throttler.") \
|
||||
M(S3PutRequestThrottlerSleepMicroseconds, "Total time a query was sleeping to conform S3 PUT, COPY, POST and LIST request throttling.") \
|
||||
M(RemoteReadThrottlerBytes, "Bytes passed through 'max_remote_read_network_bandwidth_for_server' throttler.") \
|
||||
M(RemoteReadThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_remote_read_network_bandwidth_for_server' throttling.") \
|
||||
M(RemoteWriteThrottlerBytes, "Bytes passed through 'max_remote_write_network_bandwidth_for_server' throttler.") \
|
||||
M(RemoteWriteThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_remote_write_network_bandwidth_for_server' throttling.") \
|
||||
M(ThrottlerSleepMicroseconds, "Total time a query was sleeping to conform all throttling settings.") \
|
||||
\
|
||||
M(QueryMaskingRulesMatch, "Number of times query masking rules was successfully matched.") \
|
||||
\
|
||||
|
@ -38,7 +38,7 @@ Throttler::Throttler(size_t max_speed_, size_t limit_, const char * limit_exceed
|
||||
, parent(parent_)
|
||||
{}
|
||||
|
||||
void Throttler::add(size_t amount)
|
||||
UInt64 Throttler::add(size_t amount)
|
||||
{
|
||||
// Values obtained under lock to be checked after release
|
||||
size_t count_value;
|
||||
@ -61,9 +61,10 @@ void Throttler::add(size_t amount)
|
||||
throw Exception(limit_exceeded_exception_message + std::string(" Maximum: ") + toString(limit), ErrorCodes::LIMIT_EXCEEDED);
|
||||
|
||||
/// Wait unless there is positive amount of tokens - throttling
|
||||
Int64 sleep_time = 0;
|
||||
if (max_speed && tokens_value < 0)
|
||||
{
|
||||
int64_t sleep_time = static_cast<int64_t>(-tokens_value / max_speed * NS);
|
||||
sleep_time = static_cast<Int64>(-tokens_value / max_speed * NS);
|
||||
accumulated_sleep += sleep_time;
|
||||
sleepForNanoseconds(sleep_time);
|
||||
accumulated_sleep -= sleep_time;
|
||||
@ -71,7 +72,9 @@ void Throttler::add(size_t amount)
|
||||
}
|
||||
|
||||
if (parent)
|
||||
parent->add(amount);
|
||||
sleep_time += parent->add(amount);
|
||||
|
||||
return static_cast<UInt64>(sleep_time);
|
||||
}
|
||||
|
||||
void Throttler::reset()
|
||||
|
@ -1,10 +1,12 @@
|
||||
#pragma once
|
||||
|
||||
#include <Common/Throttler_fwd.h>
|
||||
#include <Common/ProfileEvents.h>
|
||||
|
||||
#include <mutex>
|
||||
#include <memory>
|
||||
#include <base/sleep.h>
|
||||
#include <base/types.h>
|
||||
#include <atomic>
|
||||
|
||||
namespace DB
|
||||
@ -32,7 +34,16 @@ public:
|
||||
const std::shared_ptr<Throttler> & parent_ = nullptr);
|
||||
|
||||
/// Use `amount` tokens, sleeps if required or throws exception on limit overflow.
|
||||
void add(size_t amount);
|
||||
/// Returns duration of sleep in microseconds (to distinguish sleeping on different kinds of throttlers for metrics)
|
||||
UInt64 add(size_t amount);
|
||||
|
||||
UInt64 add(size_t amount, ProfileEvents::Event event_amount, ProfileEvents::Event event_sleep_us)
|
||||
{
|
||||
UInt64 sleep_us = add(amount);
|
||||
ProfileEvents::increment(event_amount, amount);
|
||||
ProfileEvents::increment(event_sleep_us, sleep_us);
|
||||
return sleep_us;
|
||||
}
|
||||
|
||||
/// Not thread safe
|
||||
void setParent(const std::shared_ptr<Throttler> & parent_)
|
||||
@ -50,12 +61,12 @@ private:
|
||||
size_t count{0};
|
||||
const size_t max_speed{0}; /// in tokens per second.
|
||||
const size_t max_burst{0}; /// in tokens.
|
||||
const uint64_t limit{0}; /// 0 - not limited.
|
||||
const UInt64 limit{0}; /// 0 - not limited.
|
||||
const char * limit_exceeded_exception_message = nullptr;
|
||||
std::mutex mutex;
|
||||
std::atomic<uint64_t> accumulated_sleep{0}; // Accumulated sleep time over all waiting threads
|
||||
std::atomic<UInt64> accumulated_sleep{0}; // Accumulated sleep time over all waiting threads
|
||||
double tokens{0}; /// Amount of tokens available in token bucket. Updated in `add` method.
|
||||
uint64_t prev_ns{0}; /// Previous `add` call time (in nanoseconds).
|
||||
UInt64 prev_ns{0}; /// Previous `add` call time (in nanoseconds).
|
||||
|
||||
/// Used to implement a hierarchy of throttlers
|
||||
std::shared_ptr<Throttler> parent;
|
||||
|
@ -7,8 +7,15 @@
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/Throttler.h>
|
||||
#include <base/sleep.h>
|
||||
#include <Common/ProfileEvents.h>
|
||||
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
extern const Event RemoteReadThrottlerBytes;
|
||||
extern const Event RemoteReadThrottlerSleepMicroseconds;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
@ -91,7 +98,7 @@ bool ReadBufferFromAzureBlobStorage::nextImpl()
|
||||
{
|
||||
bytes_read = data_stream->ReadToCount(reinterpret_cast<uint8_t *>(data_ptr), to_read_bytes);
|
||||
if (read_settings.remote_throttler)
|
||||
read_settings.remote_throttler->add(bytes_read);
|
||||
read_settings.remote_throttler->add(bytes_read, ProfileEvents::RemoteReadThrottlerBytes, ProfileEvents::RemoteReadThrottlerSleepMicroseconds);
|
||||
break;
|
||||
}
|
||||
catch (const Azure::Storage::StorageException & e)
|
||||
|
@ -8,6 +8,12 @@
|
||||
#include <Common/Throttler.h>
|
||||
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
extern const Event RemoteWriteThrottlerBytes;
|
||||
extern const Event RemoteWriteThrottlerSleepMicroseconds;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
@ -119,7 +125,7 @@ void WriteBufferFromAzureBlobStorage::nextImpl()
|
||||
uploadBlock(tmp_buffer->data(), tmp_buffer->size());
|
||||
|
||||
if (write_settings.remote_throttler)
|
||||
write_settings.remote_throttler->add(size_to_upload);
|
||||
write_settings.remote_throttler->add(size_to_upload, ProfileEvents::RemoteWriteThrottlerBytes, ProfileEvents::RemoteWriteThrottlerSleepMicroseconds);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -26,6 +26,8 @@ namespace ProfileEvents
|
||||
extern const Event ReadBufferSeekCancelConnection;
|
||||
extern const Event S3GetObject;
|
||||
extern const Event DiskS3GetObject;
|
||||
extern const Event RemoteReadThrottlerBytes;
|
||||
extern const Event RemoteReadThrottlerSleepMicroseconds;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
@ -186,7 +188,7 @@ bool ReadBufferFromS3::nextImpl()
|
||||
ProfileEvents::increment(ProfileEvents::ReadBufferFromS3Bytes, working_buffer.size());
|
||||
offset += working_buffer.size();
|
||||
if (read_settings.remote_throttler)
|
||||
read_settings.remote_throttler->add(working_buffer.size());
|
||||
read_settings.remote_throttler->add(working_buffer.size(), ProfileEvents::RemoteReadThrottlerBytes, ProfileEvents::RemoteReadThrottlerSleepMicroseconds);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
@ -56,6 +56,16 @@ namespace ProfileEvents
|
||||
extern const Event DiskS3WriteRequestsErrors;
|
||||
extern const Event DiskS3WriteRequestsThrottling;
|
||||
extern const Event DiskS3WriteRequestsRedirects;
|
||||
|
||||
extern const Event S3GetRequestThrottlerCount;
|
||||
extern const Event S3GetRequestThrottlerSleepMicroseconds;
|
||||
extern const Event S3PutRequestThrottlerCount;
|
||||
extern const Event S3PutRequestThrottlerSleepMicroseconds;
|
||||
|
||||
extern const Event DiskS3GetRequestThrottlerCount;
|
||||
extern const Event DiskS3GetRequestThrottlerSleepMicroseconds;
|
||||
extern const Event DiskS3PutRequestThrottlerCount;
|
||||
extern const Event DiskS3PutRequestThrottlerSleepMicroseconds;
|
||||
}
|
||||
|
||||
namespace CurrentMetrics
|
||||
@ -257,13 +267,27 @@ void PocoHTTPClient::makeRequestInternal(
|
||||
case Aws::Http::HttpMethod::HTTP_GET:
|
||||
case Aws::Http::HttpMethod::HTTP_HEAD:
|
||||
if (get_request_throttler)
|
||||
get_request_throttler->add(1);
|
||||
{
|
||||
UInt64 sleep_us = get_request_throttler->add(1, ProfileEvents::S3GetRequestThrottlerCount, ProfileEvents::S3GetRequestThrottlerSleepMicroseconds);
|
||||
if (for_disk_s3)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::DiskS3GetRequestThrottlerCount);
|
||||
ProfileEvents::increment(ProfileEvents::DiskS3GetRequestThrottlerSleepMicroseconds, sleep_us);
|
||||
}
|
||||
}
|
||||
break;
|
||||
case Aws::Http::HttpMethod::HTTP_PUT:
|
||||
case Aws::Http::HttpMethod::HTTP_POST:
|
||||
case Aws::Http::HttpMethod::HTTP_PATCH:
|
||||
if (put_request_throttler)
|
||||
put_request_throttler->add(1);
|
||||
{
|
||||
UInt64 sleep_us = put_request_throttler->add(1, ProfileEvents::S3PutRequestThrottlerCount, ProfileEvents::S3PutRequestThrottlerSleepMicroseconds);
|
||||
if (for_disk_s3)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::DiskS3PutRequestThrottlerCount);
|
||||
ProfileEvents::increment(ProfileEvents::DiskS3PutRequestThrottlerSleepMicroseconds, sleep_us);
|
||||
}
|
||||
}
|
||||
break;
|
||||
case Aws::Http::HttpMethod::HTTP_DELETE:
|
||||
break; // Not throttled
|
||||
|
@ -39,6 +39,9 @@ namespace ProfileEvents
|
||||
extern const Event DiskS3CompleteMultipartUpload;
|
||||
extern const Event DiskS3UploadPart;
|
||||
extern const Event DiskS3PutObject;
|
||||
|
||||
extern const Event RemoteWriteThrottlerBytes;
|
||||
extern const Event RemoteWriteThrottlerSleepMicroseconds;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
@ -108,7 +111,7 @@ void WriteBufferFromS3::nextImpl()
|
||||
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Bytes, offset());
|
||||
last_part_size += offset();
|
||||
if (write_settings.remote_throttler)
|
||||
write_settings.remote_throttler->add(offset());
|
||||
write_settings.remote_throttler->add(offset(), ProfileEvents::RemoteWriteThrottlerBytes, ProfileEvents::RemoteWriteThrottlerSleepMicroseconds);
|
||||
|
||||
/// Data size exceeds singlepart upload threshold, need to use multipart upload.
|
||||
if (multipart_upload_id.empty() && last_part_size > settings.max_single_part_upload_size)
|
||||
|
@ -8,6 +8,12 @@
|
||||
#include <mutex>
|
||||
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
extern const Event RemoteReadThrottlerBytes;
|
||||
extern const Event RemoteReadThrottlerSleepMicroseconds;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
@ -103,7 +109,7 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
|
||||
working_buffer.resize(bytes_read);
|
||||
file_offset += bytes_read;
|
||||
if (read_settings.remote_throttler)
|
||||
read_settings.remote_throttler->add(bytes_read);
|
||||
read_settings.remote_throttler->add(bytes_read, ProfileEvents::RemoteReadThrottlerBytes, ProfileEvents::RemoteReadThrottlerSleepMicroseconds);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -8,6 +8,13 @@
|
||||
#include <Common/safe_cast.h>
|
||||
#include <hdfs/hdfs.h>
|
||||
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
extern const Event RemoteWriteThrottlerBytes;
|
||||
extern const Event RemoteWriteThrottlerSleepMicroseconds;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
@ -18,7 +25,6 @@ extern const int CANNOT_OPEN_FILE;
|
||||
extern const int CANNOT_FSYNC;
|
||||
}
|
||||
|
||||
|
||||
struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
|
||||
{
|
||||
std::string hdfs_uri;
|
||||
@ -59,13 +65,13 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
|
||||
int write(const char * start, size_t size) const
|
||||
{
|
||||
int bytes_written = hdfsWrite(fs.get(), fout, start, safe_cast<int>(size));
|
||||
if (write_settings.remote_throttler)
|
||||
write_settings.remote_throttler->add(bytes_written);
|
||||
|
||||
if (bytes_written < 0)
|
||||
throw Exception("Fail to write HDFS file: " + hdfs_uri + " " + std::string(hdfsGetLastError()),
|
||||
ErrorCodes::NETWORK_ERROR);
|
||||
|
||||
if (write_settings.remote_throttler)
|
||||
write_settings.remote_throttler->add(bytes_written, ProfileEvents::RemoteWriteThrottlerBytes, ProfileEvents::RemoteWriteThrottlerSleepMicroseconds);
|
||||
|
||||
return bytes_written;
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user