Overhaul timestamp arithmetic

This commit is contained in:
Michael Kolupaev 2023-11-25 03:00:32 +00:00
parent 01369a0a8a
commit a7c369e14f
15 changed files with 411 additions and 555 deletions

View File

@ -0,0 +1,139 @@
#include <Common/CalendarTimeInterval.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
CalendarTimeInterval::CalendarTimeInterval(const CalendarTimeInterval::Intervals & intervals)
{
for (auto [kind, val] : intervals)
{
switch (kind.kind)
{
case IntervalKind::Nanosecond:
case IntervalKind::Microsecond:
case IntervalKind::Millisecond:
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Sub-second intervals are not supported here");
case IntervalKind::Second:
case IntervalKind::Minute:
case IntervalKind::Hour:
case IntervalKind::Day:
case IntervalKind::Week:
seconds += val * kind.toAvgSeconds();
break;
case IntervalKind::Month:
months += val;
break;
case IntervalKind::Quarter:
months += val * 3;
break;
case IntervalKind::Year:
months += val * 12;
break;
}
}
}
CalendarTimeInterval::Intervals CalendarTimeInterval::toIntervals() const
{
Intervals res;
auto greedy = [&](UInt64 x, std::initializer_list<std::pair<IntervalKind, UInt64>> kinds)
{
for (auto [kind, count] : kinds)
{
UInt64 k = x / count;
if (k == 0)
continue;
x -= k * count;
res.emplace_back(kind, k);
}
chassert(x == 0);
};
greedy(months, {{IntervalKind::Year, 12}, {IntervalKind::Month, 1}});
greedy(seconds, {{IntervalKind::Week, 3600*24*7}, {IntervalKind::Day, 3600*24}, {IntervalKind::Hour, 3600}, {IntervalKind::Minute, 60}, {IntervalKind::Second, 1}});
return res;
}
UInt64 CalendarTimeInterval::minSeconds() const
{
return 3600*24 * (365 * months/12 + 28 * months%12) + seconds;
}
UInt64 CalendarTimeInterval::maxSeconds() const
{
return 3600*24 * (366 * months/12 + 31 * months%12) + seconds;
}
void CalendarTimeInterval::assertSingleUnit() const
{
if (seconds && months)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Interval shouldn't contain both calendar units and clock units (e.g. months and days)");
}
void CalendarTimeInterval::assertPositive() const
{
if (!seconds && !months)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Interval must be positive");
}
/// Number of whole months between 1970-01-01 and `t`.
static Int64 toAbsoluteMonth(std::chrono::system_clock::time_point t)
{
std::chrono::year_month_day ymd(std::chrono::floor<std::chrono::days>(t));
return (Int64(int(ymd.year())) - 1970) * 12 + Int64(unsigned(ymd.month()) - 1);
}
static std::chrono::sys_seconds startOfAbsoluteMonth(Int64 absolute_month)
{
Int64 year = absolute_month >= 0 ? absolute_month/12 : -((-absolute_month+11)/12);
Int64 month = absolute_month - year*12;
chassert(month >= 0 && month < 12);
std::chrono::year_month_day ymd(
std::chrono::year(int(year + 1970)),
std::chrono::month(unsigned(month + 1)),
std::chrono::day(1));
return std::chrono::sys_days(ymd);
}
std::chrono::sys_seconds CalendarTimeInterval::advance(std::chrono::system_clock::time_point tp) const
{
auto t = std::chrono::sys_seconds(std::chrono::floor<std::chrono::seconds>(tp));
if (months)
{
auto m = toAbsoluteMonth(t);
auto s = t - startOfAbsoluteMonth(m);
t = startOfAbsoluteMonth(m + Int64(months)) + s;
}
return t + std::chrono::seconds(Int64(seconds));
}
std::chrono::sys_seconds CalendarTimeInterval::floor(std::chrono::system_clock::time_point tp) const
{
assertSingleUnit();
assertPositive();
if (months)
return startOfAbsoluteMonth(toAbsoluteMonth(tp) / months * months);
else
{
constexpr std::chrono::seconds epoch(-3600*24*3);
auto t = std::chrono::sys_seconds(std::chrono::floor<std::chrono::seconds>(tp));
/// We want to align with weeks, but 1970-01-01 is a Thursday, so align with 1969-12-29 instead.
return std::chrono::sys_seconds((t.time_since_epoch() - epoch) / seconds * seconds + epoch);
}
}
bool CalendarTimeInterval::operator==(const CalendarTimeInterval & rhs) const
{
return std::tie(months, seconds) == std::tie(rhs.months, rhs.seconds);
}
}

View File

@ -0,0 +1,64 @@
#pragma once
#include <Common/IntervalKind.h>
#include <chrono>
namespace DB
{
/// Represents a duration of calendar time, e.g.:
/// * 2 weeks + 5 minutes + and 21 seconds (aka 605121 seconds),
/// * 1 (calendar) month - not equivalent to any number of seconds!
/// * 3 years + 2 weeks (aka 36 months + 604800 seconds).
///
/// Be careful with calendar arithmetic: it's missing many familiar properties of numbers.
/// E.g. x + y - y is not always equal to x (October 31 + 1 month - 1 month = November 1).
struct CalendarTimeInterval
{
UInt64 seconds = 0;
UInt64 months = 0;
using Intervals = std::vector<std::pair<IntervalKind, UInt64>>;
CalendarTimeInterval() = default;
/// Year, Quarter, Month are converted to months.
/// Week, Day, Hour, Minute, Second are converted to seconds.
/// Millisecond, Microsecond, Nanosecond throw exception.
explicit CalendarTimeInterval(const Intervals & intervals);
/// E.g. for {36 months, 604801 seconds} returns {3 years, 2 weeks, 1 second}.
Intervals toIntervals() const;
/// Approximate shortest and longest duration in seconds. E.g. a month is [28, 31] days.
UInt64 minSeconds() const;
UInt64 maxSeconds() const;
/// Checks that the interval has only months or only seconds, throws otherwise.
void assertSingleUnit() const;
void assertPositive() const;
/// Add this interval to the timestamp. First months, then seconds.
/// Gets weird near month boundaries: October 31 + 1 month = December 1.
/// Gets weird with leap years: 2004-03-15 + 1 year = 2005-03-16,
/// 2004-12-31 + 1 year = 2006-01-01,
std::chrono::sys_seconds advance(std::chrono::system_clock::time_point t) const;
/// Rounds the timestamp down to the nearest timestamp "aligned" with this interval.
/// The interval must satisfy assertSingleUnit() and assertPositive().
/// * For months, rounds to the start of a month whose abosolute index is divisible by `months`.
/// The month index is 0-based starting from January 1970.
/// E.g. if the interval is 1 month, rounds down to the start of the month.
/// * For seconds, rounds to a timestamp x such that (x - December 29 1969 (Monday)) is divisible
/// by this interval.
/// E.g. if the interval is 1 week, rounds down to the start of the week.
///
/// Guarantees:
/// * advance(floor(x)) > x
/// * floor(advance(floor(x))) = advance(floor(x))
std::chrono::sys_seconds floor(std::chrono::system_clock::time_point t) const;
bool operator==(const CalendarTimeInterval & rhs) const;
};
}

View File

@ -10,12 +10,10 @@ ASTPtr ASTRefreshStrategy::clone() const
auto res = std::make_shared<ASTRefreshStrategy>(*this);
res->children.clear();
if (interval)
res->set(res->interval, interval->clone());
if (period)
res->set(res->period, period->clone());
if (periodic_offset)
res->set(res->periodic_offset, periodic_offset->clone());
if (offset)
res->set(res->offset, offset->clone());
if (spread)
res->set(res->spread, spread->clone());
if (settings)
@ -32,20 +30,20 @@ void ASTRefreshStrategy::formatImpl(
frame.need_parens = false;
f_settings.ostr << (f_settings.hilite ? hilite_keyword : "") << "REFRESH " << (f_settings.hilite ? hilite_none : "");
using enum ScheduleKind;
using enum RefreshScheduleKind;
switch (schedule_kind)
{
case AFTER:
f_settings.ostr << "AFTER " << (f_settings.hilite ? hilite_none : "");
interval->formatImpl(f_settings, state, frame);
period->formatImpl(f_settings, state, frame);
break;
case EVERY:
f_settings.ostr << "EVERY " << (f_settings.hilite ? hilite_none : "");
period->formatImpl(f_settings, state, frame);
if (periodic_offset)
if (offset)
{
f_settings.ostr << (f_settings.hilite ? hilite_keyword : "") << " OFFSET " << (f_settings.hilite ? hilite_none : "");
periodic_offset->formatImpl(f_settings, state, frame);
offset->formatImpl(f_settings, state, frame);
}
break;
default:

View File

@ -7,24 +7,23 @@
namespace DB
{
enum class RefreshScheduleKind : UInt8
{
UNKNOWN = 0,
AFTER,
EVERY
};
/// Strategy for MATERIALIZED VIEW ... REFRESH ..
class ASTRefreshStrategy : public IAST
{
public:
enum class ScheduleKind : UInt8
{
UNKNOWN = 0,
AFTER,
EVERY
};
ASTSetQuery * settings = nullptr;
ASTExpressionList * dependencies = nullptr;
ASTTimeInterval * interval = nullptr;
ASTTimePeriod * period = nullptr;
ASTTimeInterval * periodic_offset = nullptr;
ASTTimePeriod * spread = nullptr;
ScheduleKind schedule_kind{ScheduleKind::UNKNOWN};
ASTTimeInterval * period = nullptr;
ASTTimeInterval * offset = nullptr;
ASTTimeInterval * spread = nullptr;
RefreshScheduleKind schedule_kind{RefreshScheduleKind::UNKNOWN};
String getID(char) const override { return "Refresh strategy definition"; }

View File

@ -7,18 +7,6 @@
namespace DB
{
ASTPtr ASTTimePeriod::clone() const
{
return std::make_shared<ASTTimePeriod>(*this);
}
void ASTTimePeriod::formatImpl(const FormatSettings & f_settings, FormatState &, FormatStateStacked frame) const
{
frame.need_parens = false;
f_settings.ostr << value << ' ';
f_settings.ostr << (f_settings.hilite ? hilite_keyword : "") << kind.toKeyword() << (f_settings.hilite ? hilite_none : "");
}
ASTPtr ASTTimeInterval::clone() const
{
return std::make_shared<ASTTimeInterval>(*this);
@ -28,7 +16,7 @@ void ASTTimeInterval::formatImpl(const FormatSettings & f_settings, FormatState
{
frame.need_parens = false;
for (bool is_first = true; auto [kind, value] : kinds | std::views::reverse)
for (bool is_first = true; auto [kind, value] : interval.toIntervals())
{
if (!std::exchange(is_first, false))
f_settings.ostr << ' ';

View File

@ -2,31 +2,18 @@
#include <Parsers/IAST.h>
#include <Common/IntervalKind.h>
#include <Common/CalendarTimeInterval.h>
#include <map>
namespace DB
{
/// Simple periodic time interval like 10 SECOND
class ASTTimePeriod : public IAST
{
public:
UInt64 value{0};
IntervalKind kind{IntervalKind::Second};
String getID(char) const override { return "TimePeriod"; }
ASTPtr clone() const override;
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
};
/// Compound time interval like 1 YEAR 3 DAY 15 MINUTE
class ASTTimeInterval : public IAST
{
public:
std::map<IntervalKind, UInt64> kinds;
CalendarTimeInterval interval;
String getID(char) const override { return "TimeInterval"; }

View File

@ -11,41 +11,52 @@
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
bool ParserRefreshStrategy::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
auto refresh = std::make_shared<ASTRefreshStrategy>();
if (ParserKeyword{"AFTER"}.ignore(pos, expected))
{
refresh->schedule_kind = ASTRefreshStrategy::ScheduleKind::AFTER;
ASTPtr interval;
if (!ParserTimeInterval{}.parse(pos, interval, expected))
refresh->schedule_kind = RefreshScheduleKind::AFTER;
ASTPtr period;
if (!ParserTimeInterval{}.parse(pos, period, expected))
return false;
refresh->set(refresh->interval, interval);
refresh->set(refresh->period, period);
}
else if (ParserKeyword{"EVERY"}.ignore(pos, expected))
{
refresh->schedule_kind = ASTRefreshStrategy::ScheduleKind::EVERY;
refresh->schedule_kind = RefreshScheduleKind::EVERY;
ASTPtr period;
if (!ParserTimePeriod{}.parse(pos, period, expected))
if (!ParserTimeInterval{{.allow_mixing_calendar_and_clock_units = false}}.parse(pos, period, expected))
return false;
refresh->set(refresh->period, period);
if (ParserKeyword{"OFFSET"}.ignore(pos, expected))
{
ASTPtr periodic_offset;
if (!ParserTimeInterval{}.parse(pos, periodic_offset, expected))
if (!ParserTimeInterval{{.allow_zero = true}}.parse(pos, periodic_offset, expected))
return false;
refresh->set(refresh->periodic_offset, periodic_offset);
if (periodic_offset->as<ASTTimeInterval>()->interval.maxSeconds()
>= period->as<ASTTimeInterval>()->interval.minSeconds())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"OFFSET must be less than the period");
refresh->set(refresh->offset, periodic_offset);
}
}
if (refresh->schedule_kind == ASTRefreshStrategy::ScheduleKind::UNKNOWN)
if (refresh->schedule_kind == RefreshScheduleKind::UNKNOWN)
return false;
if (ParserKeyword{"RANDOMIZE FOR"}.ignore(pos, expected))
{
ASTPtr spread;
if (!ParserTimePeriod{}.parse(pos, spread, expected))
if (!ParserTimeInterval{{.allow_zero = true}}.parse(pos, spread, expected))
return false;
refresh->set(refresh->spread, spread);

View File

@ -14,64 +14,40 @@ namespace ErrorCodes
extern const int SYNTAX_ERROR;
}
namespace
{
struct ValKind
{
UInt64 val;
IntervalKind kind;
bool empty;
};
std::optional<ValKind> parseValKind(IParser::Pos & pos, Expected & expected)
{
ASTPtr value;
IntervalKind kind;
if (!ParserNumber{}.parse(pos, value, expected))
return ValKind{ .empty = true };
if (!parseIntervalKind(pos, expected, kind))
return {};
UInt64 val;
if (!value->as<ASTLiteral &>().value.tryGet(val))
throw Exception(ErrorCodes::SYNTAX_ERROR, "Time interval must be an integer");
return ValKind{ val, kind, false };
}
}
bool ParserTimePeriod::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
auto parsed = parseValKind(pos, expected);
if (!parsed || parsed->empty || parsed->val == 0)
return false;
auto time_period = std::make_shared<ASTTimePeriod>();
time_period->value = parsed->val;
time_period->kind = parsed->kind;
node = time_period;
return true;
}
ParserTimeInterval::ParserTimeInterval(Options opt) : options(opt) {}
ParserTimeInterval::ParserTimeInterval() = default;
bool ParserTimeInterval::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
auto time_interval = std::make_shared<ASTTimeInterval>();
auto parsed = parseValKind(pos, expected);
while (parsed && !parsed->empty)
CalendarTimeInterval::Intervals intervals;
while (true)
{
if (parsed->val == 0)
ASTPtr value;
IntervalKind kind;
if (!ParserNumber{}.parse(pos, value, expected))
break;
if (!parseIntervalKind(pos, expected, kind))
return false;
auto [it, inserted] = time_interval->kinds.emplace(parsed->kind, parsed->val);
if (!inserted)
return false;
parsed = parseValKind(pos, expected);
UInt64 val;
if (!value->as<ASTLiteral &>().value.tryGet(val))
throw Exception(ErrorCodes::SYNTAX_ERROR, "Time interval must be an integer");
intervals.emplace_back(kind, val);
}
if (!parsed || time_interval->kinds.empty())
if (intervals.empty())
return false;
CalendarTimeInterval interval(intervals);
if (!options.allow_zero)
interval.assertPositive();
if (!options.allow_mixing_calendar_and_clock_units)
interval.assertSingleUnit();
auto time_interval = std::make_shared<ASTTimeInterval>();
time_interval->interval = interval;
node = time_interval;
return true;
}

View File

@ -5,18 +5,22 @@
namespace DB
{
/// Parser for ASTTimePeriod
class ParserTimePeriod : public IParserBase
{
protected:
const char * getName() const override { return "time period"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
/// Parser for ASTTimeInterval
class ParserTimeInterval : public IParserBase
{
public:
struct Options
{
bool allow_mixing_calendar_and_clock_units = true;
bool allow_zero = false;
};
ParserTimeInterval();
explicit ParserTimeInterval(Options opt);
protected:
Options options;
const char * getName() const override { return "time interval"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};

View File

@ -0,0 +1,57 @@
#include <Storages/MaterializedView/RefreshSchedule.h>
#include <Common/thread_local_rng.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
RefreshSchedule::RefreshSchedule(const ASTRefreshStrategy & strategy)
{
kind = strategy.schedule_kind;
period = strategy.period->interval;
if (strategy.offset)
offset = strategy.offset->interval;
if (strategy.spread)
spread = strategy.spread->interval;
}
static std::chrono::sys_seconds advanceEvery(std::chrono::system_clock::time_point prev, CalendarTimeInterval period, CalendarTimeInterval offset)
{
auto period_start = period.floor(prev);
auto t = offset.advance(period_start);
if (t > prev)
return t;
t = offset.advance(period.advance(period_start));
chassert(t > prev);
return t;
}
std::chrono::sys_seconds RefreshSchedule::prescribeNext(
std::chrono::system_clock::time_point last_prescribed, std::chrono::system_clock::time_point now) const
{
if (kind == RefreshScheduleKind::AFTER)
return period.advance(now);
/// It's important to use prescribed instead of actual time here, otherwise we would do multiple
/// refreshes instead of one if the generated spread is negative and the the refresh completes
/// faster than the spread.
auto res = advanceEvery(last_prescribed, period, offset);
if (res < now)
res = advanceEvery(now, period, offset); // fell behind by a whole period, skip to current time
return res;
}
std::chrono::system_clock::time_point RefreshSchedule::addRandomSpread(std::chrono::sys_seconds prescribed_time) const
{
Int64 ms = Int64(spread.minSeconds() * 1000 / 2);
auto add = std::uniform_int_distribution(-ms, ms)(thread_local_rng);
return prescribed_time + std::chrono::milliseconds(add);
}
}

View File

@ -0,0 +1,28 @@
#pragma once
#include <Common/CalendarTimeInterval.h>
#include <Parsers/ASTRefreshStrategy.h>
#include <chrono>
namespace DB
{
class ASTRefreshStrategy;
struct RefreshSchedule
{
RefreshScheduleKind kind;
CalendarTimeInterval period;
CalendarTimeInterval offset;
CalendarTimeInterval spread;
explicit RefreshSchedule(const ASTRefreshStrategy & strategy);
/// Tells when to do the next refresh (without random spread).
std::chrono::sys_seconds prescribeNext(
std::chrono::system_clock::time_point last_prescribed, std::chrono::system_clock::time_point now) const;
std::chrono::system_clock::time_point addRandomSpread(std::chrono::sys_seconds prescribed_time) const;
};
}

View File

@ -17,24 +17,10 @@ namespace CurrentMetrics
namespace DB
{
namespace
{
std::uniform_int_distribution<Int64> makeSpreadDistribution(const ASTTimePeriod * spread)
{
if (!spread)
return std::uniform_int_distribution<Int64>(0, 0);
Int64 limit = spread->kind.toAvgSeconds() * spread->value / 2;
return std::uniform_int_distribution(-limit, limit);
}
}
RefreshTask::RefreshTask(
const ASTRefreshStrategy & strategy)
: log(&Poco::Logger::get("RefreshTask"))
, refresh_timer(strategy)
, refresh_spread{makeSpreadDistribution(strategy.spread)}
, refresh_schedule(strategy)
{}
RefreshTaskHolder RefreshTask::create(
@ -66,7 +52,7 @@ void RefreshTask::initializeAndStart(std::shared_ptr<StorageMaterializedView> vi
view_to_refresh = view;
/// TODO: Add a setting to stop views on startup, set `stop_requested = true` in that case.
populateDependencies();
calculateNextRefreshTime(std::chrono::system_clock::now());
advanceNextRefreshTime(std::chrono::system_clock::now());
refresh_task->schedule();
}
@ -139,7 +125,7 @@ void RefreshTask::shutdown()
set_entry.reset();
}
void RefreshTask::notify(const StorageID & parent_id, std::chrono::system_clock::time_point scheduled_time_without_spread, const RefreshTimer & parent_timer)
void RefreshTask::notify(const StorageID & parent_id, std::chrono::sys_seconds prescribed_time, const RefreshSchedule & parent_schedule)
{
std::lock_guard guard(mutex);
if (!set_entry)
@ -177,7 +163,7 @@ void RefreshTask::notify(const StorageID & parent_id, std::chrono::system_clock:
/// Only accept the dependency's refresh if its next refresh time is after ours.
/// This takes care of cases (1)-(4), and seems harmless in all other cases.
/// Might be mildly helpful in weird cases like REFRESH AFTER 3 HOUR depends on REFRESH AFTER 2 HOUR.
if (parent_timer.next(scheduled_time_without_spread) <= next_refresh_without_spread)
if (parent_schedule.prescribeNext(prescribed_time, std::chrono::system_clock::now()) <= next_refresh_prescribed)
return;
if (arriveDependency(parent_id) && !std::exchange(refresh_immediately, true))
@ -185,11 +171,13 @@ void RefreshTask::notify(const StorageID & parent_id, std::chrono::system_clock:
/// Decrease delay in case (5).
/// Maybe we should do it for all AFTER-AFTER dependencies, even if periods are different.
if (refresh_timer == parent_timer && refresh_timer.tryGetAfter())
if (refresh_schedule.kind == RefreshScheduleKind::AFTER &&
parent_schedule.kind == RefreshScheduleKind::AFTER &&
refresh_schedule.period == parent_schedule.period)
{
/// TODO: Implement this:
/// TODO: Implement this.
/// * Add setting max_after_delay_adjustment_pct
/// * Decrease both next_refresh_without_spread and next_refresh_with_spread,
/// * Decrease both next_refresh_prescribed and next_refresh_with_spread,
/// but only if they haven't already been decreased this way during current period
/// * refresh_task->schedule()
}
@ -217,11 +205,11 @@ void RefreshTask::refreshTask()
if (cancel_requested)
{
/// Advance to the next refresh time according to schedule.
/// Move on to the next refresh time according to schedule.
/// Otherwise we'd start another refresh immediately after canceling this one.
auto now = std::chrono::system_clock::now();
if (now >= next_refresh_with_spread)
calculateNextRefreshTime(std::chrono::system_clock::now());
advanceNextRefreshTime(now);
}
}
@ -277,7 +265,7 @@ void RefreshTask::refreshTask()
reportState(RefreshState::Running);
CurrentMetrics::Increment metric_inc(CurrentMetrics::RefreshingViews);
auto scheduled_time_without_spread = next_refresh_without_spread;
auto prescribed_time = next_refresh_prescribed;
lock.unlock();
@ -291,7 +279,7 @@ void RefreshTask::refreshTask()
finished = executeRefresh();
if (finished)
completeRefresh(view, LastTaskResult::Finished, scheduled_time_without_spread);
completeRefresh(view, LastTaskResult::Finished, prescribed_time);
}
catch (...)
{
@ -311,7 +299,7 @@ void RefreshTask::refreshTask()
{
auto now = std::chrono::system_clock::now();
reportLastRefreshTime(now);
calculateNextRefreshTime(now);
advanceNextRefreshTime(now);
}
}
}
@ -351,7 +339,7 @@ bool RefreshTask::executeRefresh()
return !not_finished;
}
void RefreshTask::completeRefresh(std::shared_ptr<StorageMaterializedView> view, LastTaskResult result, std::chrono::system_clock::time_point scheduled_time_without_spread)
void RefreshTask::completeRefresh(std::shared_ptr<StorageMaterializedView> view, LastTaskResult result, std::chrono::sys_seconds prescribed_time)
{
auto stale_table = view->exchangeTargetTable(refresh_query->table_id);
@ -359,7 +347,7 @@ void RefreshTask::completeRefresh(std::shared_ptr<StorageMaterializedView> view,
StorageID my_id = set_entry->getID();
auto dependents = context->getRefreshSet().getDependents(my_id);
for (const RefreshTaskHolder & dep_task : dependents)
dep_task->notify(my_id, scheduled_time_without_spread, refresh_timer);
dep_task->notify(my_id, prescribed_time, refresh_schedule);
auto drop_context = Context::createCopy(context);
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, drop_context, drop_context, stale_table, /*sync=*/true);
@ -396,28 +384,24 @@ void RefreshTask::cleanState()
refresh_query.reset();
}
void RefreshTask::calculateNextRefreshTime(std::chrono::system_clock::time_point now)
void RefreshTask::advanceNextRefreshTime(std::chrono::system_clock::time_point now)
{
/// TODO: Add a setting to randomize initial delay in case of AFTER, for the case when the server
/// is restarted more often than the refresh period.
/// TODO: Maybe do something like skip_update_after_seconds and skip_update_after_ratio.
/// Unclear if that's useful at all if the last refresh timestamp is not remembered across restarts.
/// Or maybe that should be checked in refreshTask(), just before starting a refresh.
/// Probably only useful after we have concurrency limits. Or maybe it's not useful even then?
/// It's important to use time without spread here, otherwise we would do multiple refreshes instead
/// of one, if the generated spread is negative and the first refresh completes faster than the spread.
std::chrono::sys_seconds next = refresh_timer.next(next_refresh_without_spread);
if (next < now)
next = refresh_timer.next(now); // fell behind, skip to current time
next_refresh_without_spread = next;
next_refresh_with_spread = next + std::chrono::seconds{refresh_spread(thread_local_rng)};
std::chrono::sys_seconds next = refresh_schedule.prescribeNext(next_refresh_prescribed, now);
next_refresh_prescribed = next;
next_refresh_with_spread = refresh_schedule.addRandomSpread(next);
reportNextRefreshTime(next_refresh_with_spread);
}
bool RefreshTask::arriveDependency(const StorageID & parent_table_or_timer)
bool RefreshTask::arriveDependency(const StorageID & parent)
{
remaining_dependencies.erase(parent_table_or_timer);
remaining_dependencies.erase(parent);
if (!remaining_dependencies.empty() || !time_arrived)
return false;
populateDependencies();

View File

@ -2,7 +2,7 @@
#include <Storages/MaterializedView/RefreshSet.h>
#include <Storages/MaterializedView/RefreshTask_fwd.h>
#include <Storages/MaterializedView/RefreshTimers.h>
#include <Storages/MaterializedView/RefreshSchedule.h>
#include <Processors/Executors/ManualPipelineExecutor.h>
@ -72,16 +72,14 @@ public:
void shutdown();
/// Notify dependent task
void notify(const StorageID & parent_id, std::chrono::system_clock::time_point scheduled_time_without_spread, const RefreshTimer & parent_timer);
void notify(const StorageID & parent_id, std::chrono::sys_seconds prescribed_time, const RefreshSchedule & parent_schedule);
private:
Poco::Logger * log = nullptr;
std::weak_ptr<IStorage> view_to_refresh;
RefreshSet::Entry set_entry;
/// Refresh schedule
RefreshTimer refresh_timer;
std::uniform_int_distribution<Int64> refresh_spread;
RefreshSchedule refresh_schedule;
/// Task execution. Non-empty iff a refresh is in progress (possibly paused).
/// Whoever unsets these should also call storeLastState().
@ -112,7 +110,20 @@ private:
std::atomic_bool interrupt_execution {false};
/// When to refresh next. Updated when a refresh is finished or canceled.
std::chrono::system_clock::time_point next_refresh_without_spread;
/// We maintain the distinction between:
/// * The "prescribed" time of the refresh, dictated by the refresh schedule.
/// E.g. for REFERSH EVERY 1 DAY, the prescribed time is always at the exact start of a day.
/// * Actual wall clock timestamps, e.g. when the refresh is scheduled to happen
/// (including random spread) or when a refresh completed.
/// The prescribed time is required for:
/// * Doing REFRESH EVERY correctly if the random spread came up negative, and a refresh completed
/// before the prescribed time. E.g. suppose a refresh was prescribed at 05:00, which was randomly
/// adjusted to 4:50, and the refresh completed at 4:55; we shouldn't schedule another refresh
/// at 5:00, so we should remember that the 4:50-4:55 refresh actually had prescribed time 5:00.
/// * Similarly, for dependencies between REFRESH EVERY tables, using actual time would be unreliable.
/// E.g. for REFRESH EVERY 1 DAY, yesterday's refresh of the dependency shouldn't trigger today's
/// refresh of the dependent even if it happened today (e.g. it was slow or had random spread > 1 day).
std::chrono::sys_seconds next_refresh_prescribed;
std::chrono::system_clock::time_point next_refresh_with_spread;
/// Calls refreshTask() from background thread.
@ -132,15 +143,15 @@ private:
/// Methods that do the actual work: creating/dropping internal table, executing the query.
void initializeRefresh(std::shared_ptr<const StorageMaterializedView> view);
bool executeRefresh();
void completeRefresh(std::shared_ptr<StorageMaterializedView> view, LastTaskResult result, std::chrono::system_clock::time_point scheduled_time_without_spread);
void completeRefresh(std::shared_ptr<StorageMaterializedView> view, LastTaskResult result, std::chrono::sys_seconds prescribed_time);
void cancelRefresh(LastTaskResult result);
void cleanState();
/// Assigns next_refresh_*
void calculateNextRefreshTime(std::chrono::system_clock::time_point now);
void advanceNextRefreshTime(std::chrono::system_clock::time_point now);
/// Returns true if all dependencies are fulfilled now. Refills remaining_dependencies in this case.
bool arriveDependency(const StorageID & parent_table_or_timer);
bool arriveDependency(const StorageID & parent);
bool arriveTime();
void populateDependencies();

View File

@ -1,302 +0,0 @@
#include <Storages/MaterializedView/RefreshTimers.h>
#include <Parsers/ASTRefreshStrategy.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
namespace
{
constexpr std::chrono::days ZERO_DAYS{0};
constexpr std::chrono::days ONE_DAY{1};
}
RefreshAfterTimer::RefreshAfterTimer(const ASTTimeInterval * time_interval)
{
if (time_interval)
{
for (auto && [kind, value] : time_interval->kinds)
setWithKind(kind, value);
}
}
std::chrono::sys_seconds RefreshAfterTimer::after(std::chrono::system_clock::time_point tp) const
{
auto tp_date = std::chrono::floor<std::chrono::days>(tp);
auto tp_time_offset = std::chrono::floor<std::chrono::seconds>(tp - tp_date);
std::chrono::year_month_day ymd(tp_date);
ymd += years;
ymd += months;
std::chrono::sys_days date = ymd;
date += weeks;
date += days;
auto result = std::chrono::time_point_cast<std::chrono::seconds>(date);
result += tp_time_offset;
result += hours;
result += minutes;
result += seconds;
return result;
}
void RefreshAfterTimer::setWithKind(IntervalKind kind, UInt64 val)
{
switch (kind)
{
case IntervalKind::Second:
seconds = std::chrono::seconds{val};
break;
case IntervalKind::Minute:
minutes = std::chrono::minutes{val};
break;
case IntervalKind::Hour:
hours = std::chrono::hours{val};
break;
case IntervalKind::Day:
days = std::chrono::days{val};
break;
case IntervalKind::Week:
weeks = std::chrono::weeks{val};
break;
case IntervalKind::Month:
months = std::chrono::months{val};
break;
case IntervalKind::Year:
years = std::chrono::years{val};
break;
default:
break;
}
}
bool RefreshAfterTimer::operator==(const RefreshAfterTimer & rhs) const
{
/// (Or maybe different implementations of standard library have different sizes of chrono types.
/// If so, feel free to just remove this assert.)
static_assert(sizeof(*this) == 40, "RefreshAfterTimer fields appear to have changed. Please update this operator==() here.");
return std::tie(seconds, minutes, hours, days, weeks, months, years) == std::tie(rhs.seconds, rhs.minutes, rhs.hours, rhs.days, rhs.weeks, rhs.months, rhs.years);
}
RefreshEveryTimer::RefreshEveryTimer(const ASTTimePeriod & time_period, const ASTTimeInterval * time_offset)
: offset(time_offset)
, value{static_cast<UInt32>(time_period.value)}
, kind{time_period.kind}
{
// TODO: validate invariants
}
std::chrono::sys_seconds RefreshEveryTimer::next(std::chrono::system_clock::time_point tp) const
{
if (value == 0)
return std::chrono::floor<std::chrono::seconds>(tp);
switch (kind)
{
case IntervalKind::Second:
return alignedToSeconds(tp);
case IntervalKind::Minute:
return alignedToMinutes(tp);
case IntervalKind::Hour:
return alignedToHours(tp);
case IntervalKind::Day:
return alignedToDays(tp);
case IntervalKind::Week:
return alignedToWeeks(tp);
case IntervalKind::Month:
return alignedToMonths(tp);
case IntervalKind::Year:
return alignedToYears(tp);
default:
return std::chrono::ceil<std::chrono::seconds>(tp);
}
}
std::chrono::sys_seconds RefreshEveryTimer::alignedToYears(std::chrono::system_clock::time_point tp) const
{
using namespace std::chrono_literals;
auto tp_days = std::chrono::floor<std::chrono::days>(tp);
std::chrono::year_month_day tp_ymd(tp_days);
auto normalize_years = [](std::chrono::year year) -> std::chrono::sys_days
{
return year / std::chrono::January / 1d;
};
auto prev_years = normalize_years(tp_ymd.year());
if (auto prev_time = offset.after(prev_years); prev_time > tp)
return prev_time;
auto next_years = normalize_years(std::chrono::year((int(tp_ymd.year()) / value + 1) * value));
return offset.after(next_years);
}
std::chrono::sys_seconds RefreshEveryTimer::alignedToMonths(std::chrono::system_clock::time_point tp) const
{
using namespace std::chrono_literals;
auto tp_days = std::chrono::floor<std::chrono::days>(tp);
std::chrono::year_month_day tp_ymd(tp_days);
auto normalize_months = [](const std::chrono::year_month_day & ymd, unsigned month_value) -> std::chrono::sys_days
{
return ymd.year() / std::chrono::month{month_value} / 1d;
};
auto prev_month_value = static_cast<unsigned>(tp_ymd.month()) / value * value;
auto prev_months = normalize_months(tp_ymd, prev_month_value);
if (auto prev_time = offset.after(prev_months); prev_time > tp)
return prev_time;
auto next_month_value = (static_cast<unsigned>(tp_ymd.month()) / value + 1) * value;
auto next_months = normalize_months(tp_ymd, next_month_value);
std::chrono::year_month_day next_ymd(next_months);
if (next_ymd.year() > tp_ymd.year())
return offset.after(normalize_months(next_ymd, value));
return offset.after(next_months);
}
std::chrono::sys_seconds RefreshEveryTimer::alignedToWeeks(std::chrono::system_clock::time_point tp) const
{
using namespace std::chrono_literals;
auto cpp_weekday = offset.getDays() + ONE_DAY;
std::chrono::weekday offset_weekday((cpp_weekday - std::chrono::floor<std::chrono::weeks>(cpp_weekday)).count());
auto tp_days = std::chrono::floor<std::chrono::days>(tp);
std::chrono::year_month_weekday tp_ymd(tp_days);
auto normalize_weeks = [offset_weekday](const std::chrono::year_month_weekday & ymd, unsigned week_value)
{
return std::chrono::sys_days(ymd.year() / ymd.month() / std::chrono::weekday{offset_weekday}[week_value]);
};
auto prev_week_value = tp_ymd.index() / value * value;
auto prev_days = normalize_weeks(tp_ymd, prev_week_value);
if (auto prev_time = offset.after(prev_days - offset.getDays()); prev_time > tp)
return prev_time;
auto next_day_value = (tp_ymd.index() / value + 1) * value;
auto next_days = normalize_weeks(tp_ymd, next_day_value);
std::chrono::year_month_weekday next_ymd(next_days);
if (next_ymd.year() > tp_ymd.year() || next_ymd.month() > tp_ymd.month())
return offset.after(normalize_weeks(next_ymd, value) - offset.getDays());
return offset.after(next_days);
}
std::chrono::sys_seconds RefreshEveryTimer::alignedToDays(std::chrono::system_clock::time_point tp) const
{
auto tp_days = std::chrono::floor<std::chrono::days>(tp);
std::chrono::year_month_day tp_ymd(tp_days);
auto normalize_days = [](const std::chrono::year_month_day & ymd, unsigned day_value) -> std::chrono::sys_days
{
return ymd.year() / ymd.month() / std::chrono::day{day_value};
};
auto prev_day_value = static_cast<unsigned>(tp_ymd.day()) / value * value;
auto prev_days = normalize_days(tp_ymd, prev_day_value);
if (auto prev_time = offset.after(prev_days); prev_time > tp)
return prev_time;
auto next_day_value = (static_cast<unsigned>(tp_ymd.day()) / value + 1) * value;
auto next_days = normalize_days(tp_ymd, next_day_value);
std::chrono::year_month_day next_ymd(next_days);
if (next_ymd.year() > tp_ymd.year() || next_ymd.month() > tp_ymd.month())
return offset.after(normalize_days(next_ymd, value));
return offset.after(next_days);
}
std::chrono::sys_seconds RefreshEveryTimer::alignedToHours(std::chrono::system_clock::time_point tp) const
{
using namespace std::chrono_literals;
auto tp_days = std::chrono::floor<std::chrono::days>(tp);
auto tp_hours = std::chrono::floor<std::chrono::hours>(tp - tp_days);
auto prev_hours = (tp_hours / value) * value;
if (auto prev_time = offset.after(tp_days + prev_hours); prev_time > tp)
return prev_time;
auto next_hours = (tp_hours / value + 1h) * value;
if (std::chrono::floor<std::chrono::days>(next_hours - 1h) > ZERO_DAYS)
return offset.after(tp_days + ONE_DAY + std::chrono::hours{value});
return offset.after(tp_days + next_hours);
}
std::chrono::sys_seconds RefreshEveryTimer::alignedToMinutes(std::chrono::system_clock::time_point tp) const
{
using namespace std::chrono_literals;
auto tp_hours = std::chrono::floor<std::chrono::hours>(tp);
auto tp_minutes = std::chrono::floor<std::chrono::minutes>(tp - tp_hours);
auto prev_minutes = (tp_minutes / value) * value;
if (auto prev_time = offset.after(tp_hours + prev_minutes); prev_time > tp)
return prev_time;
auto next_minutes = (tp_minutes / value + 1min) * value;
if (std::chrono::floor<std::chrono::hours>(next_minutes - 1min) > 0h)
return offset.after(tp_hours + 1h + std::chrono::minutes{value});
return offset.after(tp_hours + next_minutes);
}
std::chrono::sys_seconds RefreshEveryTimer::alignedToSeconds(std::chrono::system_clock::time_point tp) const
{
using namespace std::chrono_literals;
auto tp_minutes = std::chrono::floor<std::chrono::minutes>(tp);
auto tp_seconds = std::chrono::floor<std::chrono::seconds>(tp - tp_minutes);
auto next_seconds = (tp_seconds / value + 1s) * value;
if (std::chrono::floor<std::chrono::minutes>(next_seconds - 1s) > 0min)
return tp_minutes + 1min + std::chrono::seconds{value};
return tp_minutes + next_seconds;
}
bool RefreshEveryTimer::operator==(const RefreshEveryTimer & rhs) const
{
static_assert(sizeof(*this) == sizeof(offset) + 8, "RefreshEveryTimer fields appear to have changed. Please update this operator==() here.");
return std::tie(offset, value, kind) == std::tie(rhs.offset, rhs.value, rhs.kind);
}
std::variant<RefreshEveryTimer, RefreshAfterTimer> makeTimer(const ASTRefreshStrategy & strategy)
{
using enum ASTRefreshStrategy::ScheduleKind;
switch (strategy.schedule_kind)
{
case EVERY:
return RefreshEveryTimer{*strategy.period, strategy.interval};
case AFTER:
return RefreshAfterTimer{strategy.interval};
default:
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown refresh strategy kind");
}
}
RefreshTimer::RefreshTimer(const ASTRefreshStrategy & strategy) : timer(makeTimer(strategy)) {}
namespace
{
template <typename... Ts>
struct CombinedVisitor : Ts... { using Ts::operator()...; };
template <typename... Ts>
CombinedVisitor(Ts...) -> CombinedVisitor<Ts...>;
}
std::chrono::sys_seconds RefreshTimer::next(std::chrono::system_clock::time_point tp) const
{
CombinedVisitor visitor{
[tp](const RefreshAfterTimer & timer_) { return timer_.after(tp); },
[tp](const RefreshEveryTimer & timer_) { return timer_.next(tp); }};
auto r = std::visit<std::chrono::sys_seconds>(std::move(visitor), timer);
chassert(r > tp);
return r;
}
bool RefreshTimer::operator==(const RefreshTimer & rhs) const { return timer == rhs.timer; }
const RefreshAfterTimer * RefreshTimer::tryGetAfter() const { return std::get_if<RefreshAfterTimer>(&timer); }
const RefreshEveryTimer * RefreshTimer::tryGetEvery() const { return std::get_if<RefreshEveryTimer>(&timer); }
}

View File

@ -1,88 +0,0 @@
#pragma once
#include <Common/IntervalKind.h>
#include <chrono>
namespace DB
{
class ASTTimeInterval;
class ASTTimePeriod;
class ASTRefreshStrategy;
/// Schedule timer for MATERIALIZED VIEW ... REFRESH AFTER ... queries
class RefreshAfterTimer
{
public:
explicit RefreshAfterTimer(const ASTTimeInterval * time_interval);
std::chrono::sys_seconds after(std::chrono::system_clock::time_point tp) const;
std::chrono::seconds getSeconds() const { return seconds; }
std::chrono::minutes getMinutes() const { return minutes; }
std::chrono::hours getHours() const { return hours; }
std::chrono::days getDays() const { return days; }
std::chrono::weeks getWeeks() const { return weeks; }
std::chrono::months getMonths() const { return months; }
std::chrono::years getYears() const { return years; }
bool operator==(const RefreshAfterTimer & rhs) const;
private:
void setWithKind(IntervalKind kind, UInt64 val);
std::chrono::seconds seconds{0};
std::chrono::minutes minutes{0};
std::chrono::hours hours{0};
std::chrono::days days{0};
std::chrono::weeks weeks{0};
std::chrono::months months{0};
std::chrono::years years{0};
};
/// Schedule timer for MATERIALIZED VIEW ... REFRESH EVERY ... queries
class RefreshEveryTimer
{
public:
explicit RefreshEveryTimer(const ASTTimePeriod & time_period, const ASTTimeInterval * time_offset);
std::chrono::sys_seconds next(std::chrono::system_clock::time_point tp) const;
bool operator==(const RefreshEveryTimer & rhs) const;
private:
std::chrono::sys_seconds alignedToYears(std::chrono::system_clock::time_point tp) const;
std::chrono::sys_seconds alignedToMonths(std::chrono::system_clock::time_point tp) const;
std::chrono::sys_seconds alignedToWeeks(std::chrono::system_clock::time_point tp) const;
std::chrono::sys_seconds alignedToDays(std::chrono::system_clock::time_point tp) const;
std::chrono::sys_seconds alignedToHours(std::chrono::system_clock::time_point tp) const;
std::chrono::sys_seconds alignedToMinutes(std::chrono::system_clock::time_point tp) const;
std::chrono::sys_seconds alignedToSeconds(std::chrono::system_clock::time_point tp) const;
RefreshAfterTimer offset;
UInt32 value{0};
IntervalKind kind{IntervalKind::Second};
};
struct RefreshTimer
{
std::variant<RefreshEveryTimer, RefreshAfterTimer> timer;
explicit RefreshTimer(const ASTRefreshStrategy & strategy);
std::chrono::sys_seconds next(std::chrono::system_clock::time_point tp) const;
bool operator==(const RefreshTimer & rhs) const;
const RefreshAfterTimer * tryGetAfter() const;
const RefreshEveryTimer * tryGetEvery() const;
};
}