ClickHouse/dbms/include/DB/Common/Throttler.h
2015-09-25 15:52:47 +03:00

81 lines
2.4 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#pragma once
#include <mutex>
#include <memory>
#include <statdaemons/Stopwatch.h>
#include <DB/Core/Exception.h>
#include <DB/IO/WriteHelpers.h>
namespace DB
{
/** Позволяет ограничить скорость чего либо (в штуках в секунду) с помощью sleep.
* Особенности работы:
* - считается только средняя скорость, от момента первого вызова функции add;
* если были периоды с низкой скоростью, то в течение промежутка времени после них, скорость будет выше;
*
* Также позволяет задать ограничение на максимальное количество в штуках. При превышении кидается исключение.
*/
class Throttler
{
public:
Throttler(size_t max_speed_, size_t limit_, const char * limit_exceeded_exception_message_)
: max_speed(max_speed_), limit(limit_), limit_exceeded_exception_message(limit_exceeded_exception_message_) {}
void add(size_t amount)
{
size_t new_count;
UInt64 elapsed_ns = 0;
{
std::lock_guard<std::mutex> lock(mutex);
if (max_speed)
{
if (0 == count)
{
watch.start();
elapsed_ns = 0;
}
else
elapsed_ns = watch.elapsed();
}
count += amount;
new_count = count;
}
if (limit && new_count > limit)
throw Exception(limit_exceeded_exception_message + std::string(" Maximum: ") + toString(limit), ErrorCodes::LIMIT_EXCEEDED);
if (max_speed)
{
/// Сколько должно было бы пройти времени, если бы скорость была равна max_speed.
UInt64 desired_ns = new_count * 1000000000 / max_speed;
if (desired_ns > elapsed_ns)
{
UInt64 sleep_ns = desired_ns - elapsed_ns;
timespec sleep_ts;
sleep_ts.tv_sec = sleep_ns / 1000000000;
sleep_ts.tv_nsec = sleep_ns % 1000000000;
nanosleep(&sleep_ts, nullptr); /// NOTE Завершается раньше в случае сигнала. Это считается нормальным.
}
}
}
private:
size_t max_speed = 0;
size_t count = 0;
size_t limit = 0; /// 0 - не ограничено.
const char * limit_exceeded_exception_message = nullptr;
Stopwatch watch {CLOCK_MONOTONIC_COARSE};
std::mutex mutex;
};
typedef std::shared_ptr<Throttler> ThrottlerPtr;
}