ClickHouse/src/Common/Throttler.h

137 lines
3.5 KiB
C++
Raw Normal View History

2015-02-10 20:50:01 +00:00
#pragma once
#include <mutex>
#include <memory>
#include <Common/Stopwatch.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
2019-07-10 20:47:39 +00:00
#include <common/sleep.h>
#include <IO/WriteHelpers.h>
2015-02-10 20:50:01 +00:00
namespace ProfileEvents
{
extern const Event ThrottlerSleepMicroseconds;
}
namespace DB
{
namespace ErrorCodes
{
extern const int LIMIT_EXCEEDED;
}
/** Allows you to limit the speed of something (in entities per second) using sleep.
2017-05-07 20:25:26 +00:00
* Specifics of work:
* - only the average speed is considered, from the moment of the first call of `add` function;
* if there were periods with low speed, then during some time after them, the speed will be higher;
*
* Also allows you to set a limit on the maximum number of entities. If exceeded, an exception will be thrown.
2015-02-10 20:50:01 +00:00
*/
class Throttler
{
public:
2019-08-03 11:02:40 +00:00
Throttler(size_t max_speed_, const std::shared_ptr<Throttler> & parent_ = nullptr)
: max_speed(max_speed_), limit_exceeded_exception_message(""), parent(parent_) {}
Throttler(size_t max_speed_, size_t limit_, const char * limit_exceeded_exception_message_,
2019-08-03 11:02:40 +00:00
const std::shared_ptr<Throttler> & parent_ = nullptr)
: max_speed(max_speed_), limit(limit_), limit_exceeded_exception_message(limit_exceeded_exception_message_), parent(parent_) {}
void add(const size_t amount)
{
size_t new_count;
UInt64 elapsed_ns = 0;
{
2018-03-09 22:11:42 +00:00
std::lock_guard lock(mutex);
if (max_speed)
{
if (0 == count)
{
watch.start();
elapsed_ns = 0;
}
else
elapsed_ns = watch.elapsed();
}
count += amount;
new_count = count;
}
if (limit && new_count > limit)
throw Exception(limit_exceeded_exception_message + std::string(" Maximum: ") + toString(limit), ErrorCodes::LIMIT_EXCEEDED);
if (max_speed)
{
/// How much time to wait for the average speed to become `max_speed`.
UInt64 desired_ns = new_count * 1000000000 / max_speed;
if (desired_ns > elapsed_ns)
{
UInt64 sleep_ns = desired_ns - elapsed_ns;
2021-05-27 12:54:47 +00:00
accumulated_sleep += sleep_ns;
2019-07-10 20:47:39 +00:00
sleepForNanoseconds(sleep_ns);
2021-05-27 12:54:47 +00:00
accumulated_sleep -= sleep_ns;
ProfileEvents::increment(ProfileEvents::ThrottlerSleepMicroseconds, sleep_ns / 1000UL);
}
}
if (parent)
parent->add(amount);
}
2015-02-10 20:50:01 +00:00
/// Not thread safe
void setParent(const std::shared_ptr<Throttler> & parent_)
{
parent = parent_;
}
2018-03-09 22:11:42 +00:00
void reset()
{
std::lock_guard lock(mutex);
count = 0;
watch.reset();
2021-05-27 12:54:47 +00:00
accumulated_sleep = 0;
}
/// Is throttler already accumulated some sleep time and throttling.
bool isThrottling() const
{
if (accumulated_sleep != 0)
return true;
if (parent)
return parent->isThrottling();
return false;
2018-03-09 22:11:42 +00:00
}
2015-02-10 20:50:01 +00:00
private:
size_t count = 0;
2018-03-09 22:11:42 +00:00
const size_t max_speed = 0;
2019-02-10 15:17:45 +00:00
const UInt64 limit = 0; /// 0 - not limited.
const char * limit_exceeded_exception_message = nullptr;
Stopwatch watch {CLOCK_MONOTONIC_COARSE};
std::mutex mutex;
2021-05-27 12:54:47 +00:00
std::atomic<UInt64> accumulated_sleep{0};
/// Used to implement a hierarchy of throttlers
std::shared_ptr<Throttler> parent;
2015-02-10 20:50:01 +00:00
};
using ThrottlerPtr = std::shared_ptr<Throttler>;
}