mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-15 10:52:30 +00:00
99 lines
2.5 KiB
C++
99 lines
2.5 KiB
C++
#include <Common/Throttler.h>
|
|
#include <Common/ProfileEvents.h>
|
|
#include <Common/Exception.h>
|
|
#include <Common/Stopwatch.h>
|
|
#include <IO/WriteHelpers.h>
|
|
|
|
namespace ProfileEvents
|
|
{
|
|
extern const Event ThrottlerSleepMicroseconds;
|
|
}
|
|
|
|
namespace DB
|
|
{
|
|
|
|
namespace ErrorCodes
|
|
{
|
|
extern const int LIMIT_EXCEEDED;
|
|
}
|
|
|
|
/// Just 10^9.
|
|
static constexpr auto NS = 1000000000UL;
|
|
|
|
Throttler::Throttler(size_t max_speed_, const std::shared_ptr<Throttler> & parent_)
|
|
: max_speed(max_speed_)
|
|
, max_burst(max_speed_ * default_burst_seconds)
|
|
, limit_exceeded_exception_message("")
|
|
, tokens(max_burst)
|
|
, parent(parent_)
|
|
{}
|
|
|
|
Throttler::Throttler(size_t max_speed_, size_t limit_, const char * limit_exceeded_exception_message_,
|
|
const std::shared_ptr<Throttler> & parent_)
|
|
: max_speed(max_speed_)
|
|
, max_burst(max_speed_ * default_burst_seconds)
|
|
, limit(limit_)
|
|
, limit_exceeded_exception_message(limit_exceeded_exception_message_)
|
|
, tokens(max_burst)
|
|
, parent(parent_)
|
|
{}
|
|
|
|
void Throttler::add(size_t amount)
|
|
{
|
|
// Values obtained under lock to be checked after release
|
|
size_t count_value;
|
|
double tokens_value;
|
|
{
|
|
std::lock_guard lock(mutex);
|
|
auto now = clock_gettime_ns_adjusted(prev_ns);
|
|
if (max_speed)
|
|
{
|
|
double delta_seconds = prev_ns ? static_cast<double>(now - prev_ns) / NS : 0;
|
|
tokens = std::min<double>(tokens + max_speed * delta_seconds - amount, max_burst);
|
|
}
|
|
count += amount;
|
|
count_value = count;
|
|
tokens_value = tokens;
|
|
prev_ns = now;
|
|
}
|
|
|
|
if (limit && count_value > limit)
|
|
throw Exception(limit_exceeded_exception_message + std::string(" Maximum: ") + toString(limit), ErrorCodes::LIMIT_EXCEEDED);
|
|
|
|
/// Wait unless there is positive amount of tokens - throttling
|
|
if (max_speed && tokens_value < 0)
|
|
{
|
|
int64_t sleep_time = static_cast<int64_t>(-tokens_value / max_speed * NS);
|
|
accumulated_sleep += sleep_time;
|
|
sleepForNanoseconds(sleep_time);
|
|
accumulated_sleep -= sleep_time;
|
|
ProfileEvents::increment(ProfileEvents::ThrottlerSleepMicroseconds, sleep_time / 1000UL);
|
|
}
|
|
|
|
if (parent)
|
|
parent->add(amount);
|
|
}
|
|
|
|
void Throttler::reset()
|
|
{
|
|
std::lock_guard lock(mutex);
|
|
|
|
count = 0;
|
|
tokens = max_burst;
|
|
prev_ns = 0;
|
|
// NOTE: do not zero `accumulated_sleep` to avoid races
|
|
}
|
|
|
|
bool Throttler::isThrottling() const
|
|
{
|
|
if (accumulated_sleep != 0)
|
|
return true;
|
|
|
|
if (parent)
|
|
return parent->isThrottling();
|
|
|
|
return false;
|
|
}
|
|
|
|
}
|