mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
Merge pull request #32575 from vitlibar/improve-quota-end-of-interval-calculation
Improve quota's end-of-interval calculations.
This commit is contained in:
commit
8f9b0b4ed0
@ -2,6 +2,7 @@
|
||||
#include <Access/QuotaUsage.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/quoteString.h>
|
||||
#include <Common/thread_local_rng.h>
|
||||
#include <base/chrono_io.h>
|
||||
#include <base/range.h>
|
||||
#include <boost/smart_ptr/make_shared.hpp>
|
||||
@ -15,6 +16,7 @@ namespace ErrorCodes
|
||||
extern const int QUOTA_EXPIRED;
|
||||
}
|
||||
|
||||
|
||||
struct EnabledQuota::Impl
|
||||
{
|
||||
[[noreturn]] static void throwQuotaExceed(
|
||||
@ -35,52 +37,6 @@ struct EnabledQuota::Impl
|
||||
}
|
||||
|
||||
|
||||
/// Returns the end of the current interval. If the passed `current_time` is greater than that end,
|
||||
/// the function automatically recalculates the interval's end by adding the interval's duration
|
||||
/// one or more times until the interval's end is greater than `current_time`.
|
||||
/// If that recalculation occurs the function also resets amounts of resources used and sets the variable
|
||||
/// `counters_were_reset`.
|
||||
static std::chrono::system_clock::time_point getEndOfInterval(
|
||||
const Interval & interval, std::chrono::system_clock::time_point current_time, bool & counters_were_reset)
|
||||
{
|
||||
auto & end_of_interval = interval.end_of_interval;
|
||||
auto end_loaded = end_of_interval.load();
|
||||
auto end = std::chrono::system_clock::time_point{end_loaded};
|
||||
if (current_time < end)
|
||||
{
|
||||
counters_were_reset = false;
|
||||
return end;
|
||||
}
|
||||
|
||||
bool need_reset_counters = false;
|
||||
|
||||
do
|
||||
{
|
||||
/// Calculate the end of the next interval:
|
||||
/// | X |
|
||||
/// end current_time next_end = end + duration * n
|
||||
/// where n is an integer number, n >= 1.
|
||||
const auto duration = interval.duration;
|
||||
UInt64 n = static_cast<UInt64>((current_time - end + duration) / duration);
|
||||
end = end + duration * n;
|
||||
if (end_of_interval.compare_exchange_strong(end_loaded, end.time_since_epoch()))
|
||||
{
|
||||
need_reset_counters = true;
|
||||
break;
|
||||
}
|
||||
end = std::chrono::system_clock::time_point{end_loaded};
|
||||
}
|
||||
while (current_time >= end);
|
||||
|
||||
if (need_reset_counters)
|
||||
{
|
||||
boost::range::fill(interval.used, 0);
|
||||
counters_were_reset = true;
|
||||
}
|
||||
return end;
|
||||
}
|
||||
|
||||
|
||||
static void used(
|
||||
const String & user_name,
|
||||
const Intervals & intervals,
|
||||
@ -89,33 +45,22 @@ struct EnabledQuota::Impl
|
||||
std::chrono::system_clock::time_point current_time,
|
||||
bool check_exceeded)
|
||||
{
|
||||
auto quota_type_i = static_cast<size_t>(quota_type);
|
||||
for (const auto & interval : intervals.intervals)
|
||||
{
|
||||
if (!interval.end_of_interval.load().count())
|
||||
{
|
||||
/// We need to calculate end of the interval if it hasn't been calculated before.
|
||||
bool dummy;
|
||||
getEndOfInterval(interval, current_time, dummy);
|
||||
}
|
||||
|
||||
auto quota_type_i = static_cast<size_t>(quota_type);
|
||||
QuotaValue used = (interval.used[quota_type_i] += value);
|
||||
QuotaValue max = interval.max[quota_type_i];
|
||||
|
||||
if (!max)
|
||||
continue;
|
||||
|
||||
if (used > max)
|
||||
{
|
||||
bool counters_were_reset = false;
|
||||
auto end_of_interval = getEndOfInterval(interval, current_time, counters_were_reset);
|
||||
auto end_of_interval = interval.getEndOfInterval(current_time, counters_were_reset);
|
||||
if (counters_were_reset)
|
||||
{
|
||||
used = (interval.used[quota_type_i] += value);
|
||||
if ((used > max) && check_exceeded)
|
||||
throwQuotaExceed(user_name, intervals.quota_name, quota_type, used, max, interval.duration, end_of_interval);
|
||||
}
|
||||
else if (check_exceeded)
|
||||
|
||||
if (check_exceeded && (used > max))
|
||||
throwQuotaExceed(user_name, intervals.quota_name, quota_type, used, max, interval.duration, end_of_interval);
|
||||
}
|
||||
}
|
||||
@ -130,23 +75,15 @@ struct EnabledQuota::Impl
|
||||
auto quota_type_i = static_cast<size_t>(quota_type);
|
||||
for (const auto & interval : intervals.intervals)
|
||||
{
|
||||
if (!interval.end_of_interval.load().count())
|
||||
{
|
||||
/// We need to calculate end of the interval if it hasn't been calculated before.
|
||||
bool dummy;
|
||||
getEndOfInterval(interval, current_time, dummy);
|
||||
}
|
||||
|
||||
QuotaValue used = interval.used[quota_type_i];
|
||||
QuotaValue max = interval.max[quota_type_i];
|
||||
|
||||
if (!max)
|
||||
continue;
|
||||
|
||||
if (used > max)
|
||||
{
|
||||
bool counters_were_reset = false;
|
||||
std::chrono::system_clock::time_point end_of_interval = getEndOfInterval(interval, current_time, counters_were_reset);
|
||||
auto end_of_interval = interval.getEndOfInterval(current_time, counters_were_reset);
|
||||
if (!counters_were_reset)
|
||||
throwQuotaExceed(user_name, intervals.quota_name, quota_type, used, max, interval.duration, end_of_interval);
|
||||
}
|
||||
@ -161,17 +98,32 @@ struct EnabledQuota::Impl
|
||||
for (auto quota_type : collections::range(QuotaType::MAX))
|
||||
checkExceeded(user_name, intervals, quota_type, current_time);
|
||||
}
|
||||
|
||||
static std::chrono::system_clock::duration randomDuration(std::chrono::seconds max)
|
||||
{
|
||||
auto count = std::chrono::duration_cast<std::chrono::system_clock::duration>(max).count();
|
||||
std::uniform_int_distribution<Int64> distribution{0, count - 1};
|
||||
return std::chrono::system_clock::duration(distribution(thread_local_rng));
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
EnabledQuota::Interval::Interval()
|
||||
EnabledQuota::Interval::Interval(std::chrono::seconds duration_, bool randomize_interval_, std::chrono::system_clock::time_point current_time_)
|
||||
: duration(duration_) , randomize_interval(randomize_interval_)
|
||||
{
|
||||
std::chrono::system_clock::time_point initial_end{};
|
||||
if (randomize_interval_)
|
||||
initial_end += Impl::randomDuration(duration_);
|
||||
end_of_interval = initial_end.time_since_epoch();
|
||||
|
||||
for (auto quota_type : collections::range(QuotaType::MAX))
|
||||
{
|
||||
auto quota_type_i = static_cast<size_t>(quota_type);
|
||||
used[quota_type_i].store(0);
|
||||
max[quota_type_i] = 0;
|
||||
}
|
||||
|
||||
getEndOfInterval(current_time_); /// Force updating the end of the interval for the first time.
|
||||
}
|
||||
|
||||
|
||||
@ -193,6 +145,55 @@ EnabledQuota::Interval & EnabledQuota::Interval::operator =(const Interval & src
|
||||
}
|
||||
|
||||
|
||||
/// Returns the end of the current interval. If the passed `current_time` is greater than that end,
|
||||
/// the function automatically recalculates the interval's end by adding the interval's duration
|
||||
/// one or more times until the interval's end is greater than `current_time`.
|
||||
/// If that recalculation occurs the function also resets amounts of resources used and sets the variable
|
||||
/// `counters_were_reset`.
|
||||
std::chrono::system_clock::time_point EnabledQuota::Interval::getEndOfInterval(std::chrono::system_clock::time_point current_time) const
|
||||
{
|
||||
bool counters_were_reset;
|
||||
return getEndOfInterval(current_time, counters_were_reset);
|
||||
}
|
||||
|
||||
std::chrono::system_clock::time_point EnabledQuota::Interval::getEndOfInterval(std::chrono::system_clock::time_point current_time, bool & counters_were_reset) const
|
||||
{
|
||||
auto end_loaded = end_of_interval.load();
|
||||
auto end = std::chrono::system_clock::time_point{end_loaded};
|
||||
if (current_time < end)
|
||||
{
|
||||
counters_were_reset = false;
|
||||
return end;
|
||||
}
|
||||
|
||||
bool need_reset_counters = false;
|
||||
|
||||
do
|
||||
{
|
||||
/// Calculate the end of the next interval:
|
||||
/// | X |
|
||||
/// end current_time next_end = end + duration * n
|
||||
/// where n is an integer number, n >= 1.
|
||||
UInt64 n = static_cast<UInt64>((current_time - end + duration) / duration);
|
||||
end = end + duration * n;
|
||||
if (end_of_interval.compare_exchange_strong(end_loaded, end.time_since_epoch()))
|
||||
{
|
||||
need_reset_counters = true;
|
||||
break;
|
||||
}
|
||||
end = std::chrono::system_clock::time_point{end_loaded};
|
||||
}
|
||||
while (current_time >= end);
|
||||
|
||||
if (need_reset_counters)
|
||||
{
|
||||
boost::range::fill(used, 0);
|
||||
counters_were_reset = true;
|
||||
}
|
||||
return end;
|
||||
}
|
||||
|
||||
|
||||
std::optional<QuotaUsage> EnabledQuota::Intervals::getUsage(std::chrono::system_clock::time_point current_time) const
|
||||
{
|
||||
if (!quota_id)
|
||||
@ -208,8 +209,7 @@ std::optional<QuotaUsage> EnabledQuota::Intervals::getUsage(std::chrono::system_
|
||||
auto & out = usage.intervals.back();
|
||||
out.duration = in.duration;
|
||||
out.randomize_interval = in.randomize_interval;
|
||||
bool counters_were_reset = false;
|
||||
out.end_of_interval = Impl::getEndOfInterval(in, current_time, counters_were_reset);
|
||||
out.end_of_interval = in.getEndOfInterval(current_time);
|
||||
for (auto quota_type : collections::range(QuotaType::MAX))
|
||||
{
|
||||
auto quota_type_i = static_cast<size_t>(quota_type);
|
||||
|
@ -73,9 +73,13 @@ private:
|
||||
bool randomize_interval = false;
|
||||
mutable std::atomic<std::chrono::system_clock::duration> end_of_interval;
|
||||
|
||||
Interval();
|
||||
Interval(std::chrono::seconds duration_, bool randomize_interval_, std::chrono::system_clock::time_point current_time_);
|
||||
|
||||
Interval(const Interval & src) { *this = src; }
|
||||
Interval & operator =(const Interval & src);
|
||||
|
||||
std::chrono::system_clock::time_point getEndOfInterval(std::chrono::system_clock::time_point current_time) const;
|
||||
std::chrono::system_clock::time_point getEndOfInterval(std::chrono::system_clock::time_point current_time, bool & counters_were_reset) const;
|
||||
};
|
||||
|
||||
struct Intervals
|
||||
|
@ -4,7 +4,6 @@
|
||||
#include <Access/QuotaUsage.h>
|
||||
#include <Access/AccessControl.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/thread_local_rng.h>
|
||||
#include <base/range.h>
|
||||
#include <boost/range/adaptor/map.hpp>
|
||||
#include <boost/range/algorithm/copy.hpp>
|
||||
@ -22,17 +21,6 @@ namespace ErrorCodes
|
||||
}
|
||||
|
||||
|
||||
namespace
|
||||
{
|
||||
std::chrono::system_clock::duration randomDuration(std::chrono::seconds max)
|
||||
{
|
||||
auto count = std::chrono::duration_cast<std::chrono::system_clock::duration>(max).count();
|
||||
std::uniform_int_distribution<Int64> distribution{0, count - 1};
|
||||
return std::chrono::system_clock::duration(distribution(thread_local_rng));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void QuotaCache::QuotaInfo::setQuota(const QuotaPtr & quota_, const UUID & quota_id_)
|
||||
{
|
||||
quota = quota_;
|
||||
@ -94,18 +82,21 @@ boost::shared_ptr<const EnabledQuota::Intervals> QuotaCache::QuotaInfo::getOrBui
|
||||
auto it = key_to_intervals.find(key);
|
||||
if (it != key_to_intervals.end())
|
||||
return it->second;
|
||||
return rebuildIntervals(key);
|
||||
return rebuildIntervals(key, std::chrono::system_clock::now());
|
||||
}
|
||||
|
||||
|
||||
void QuotaCache::QuotaInfo::rebuildAllIntervals()
|
||||
{
|
||||
if (key_to_intervals.empty())
|
||||
return;
|
||||
auto current_time = std::chrono::system_clock::now();
|
||||
for (const String & key : key_to_intervals | boost::adaptors::map_keys)
|
||||
rebuildIntervals(key);
|
||||
rebuildIntervals(key, current_time);
|
||||
}
|
||||
|
||||
|
||||
boost::shared_ptr<const EnabledQuota::Intervals> QuotaCache::QuotaInfo::rebuildIntervals(const String & key)
|
||||
boost::shared_ptr<const EnabledQuota::Intervals> QuotaCache::QuotaInfo::rebuildIntervals(const String & key, std::chrono::system_clock::time_point current_time)
|
||||
{
|
||||
auto new_intervals = boost::make_shared<Intervals>();
|
||||
new_intervals->quota_name = quota->getName();
|
||||
@ -115,14 +106,8 @@ boost::shared_ptr<const EnabledQuota::Intervals> QuotaCache::QuotaInfo::rebuildI
|
||||
intervals.reserve(quota->all_limits.size());
|
||||
for (const auto & limits : quota->all_limits)
|
||||
{
|
||||
intervals.emplace_back();
|
||||
intervals.emplace_back(limits.duration, limits.randomize_interval, current_time);
|
||||
auto & interval = intervals.back();
|
||||
interval.duration = limits.duration;
|
||||
std::chrono::system_clock::time_point end_of_interval{};
|
||||
interval.randomize_interval = limits.randomize_interval;
|
||||
if (limits.randomize_interval)
|
||||
end_of_interval += randomDuration(limits.duration);
|
||||
interval.end_of_interval = end_of_interval.time_since_epoch();
|
||||
for (auto quota_type : collections::range(QuotaType::MAX))
|
||||
{
|
||||
auto quota_type_i = static_cast<size_t>(quota_type);
|
||||
|
@ -43,7 +43,7 @@ private:
|
||||
|
||||
String calculateKey(const EnabledQuota & enabled_quota) const;
|
||||
boost::shared_ptr<const Intervals> getOrBuildIntervals(const String & key);
|
||||
boost::shared_ptr<const Intervals> rebuildIntervals(const String & key);
|
||||
boost::shared_ptr<const Intervals> rebuildIntervals(const String & key, std::chrono::system_clock::time_point current_time);
|
||||
void rebuildAllIntervals();
|
||||
|
||||
QuotaPtr quota;
|
||||
|
Loading…
Reference in New Issue
Block a user