Tests: add RMV tests

This commit is contained in:
Nikita Fomichev 2024-08-10 08:11:35 +02:00
parent 2f04d537cf
commit 83f3738b62
5 changed files with 745 additions and 1041 deletions

View File

@ -282,7 +282,7 @@ This replaces *all* refresh parameters at once: schedule, dependencies, settings
The status of all refreshable materialized views is available in table [`system.view_refreshes`](../../../operations/system-tables/view_refreshes.md). In particular, it contains refresh progress (if running), last and next refresh time, exception message if a refresh failed.
To manually stop, start, trigger, or cancel refreshes use [`SYSTEM STOP|START|REFRESH|CANCEL VIEW`](../system.md#refreshable-materialized-views).
To manually stop, start, trigger, or cancel refreshes use [`SYSTEM STOP|START|REFRESH|WAIT|CANCEL VIEW`](../system.md#refreshable-materialized-views).
To wait for a refresh to complete, use [`SYSTEM WAIT VIEW`](../system.md#refreshable-materialized-views). This is particularly useful for waiting for the initial refresh after creating a view.

View File

@ -182,3 +182,16 @@ def csv_compare(result, expected):
mismatch.append("+[%d]=%s" % (i, csv_result.lines[i]))
return "\n".join(mismatch)
def wait_condition(func, condition, max_attempts=10, delay=0.1):
    """
    Poll ``func`` until ``condition`` accepts its result.

    ``func`` is called with no arguments up to ``max_attempts`` times. The
    first result for which ``condition(result)`` is truthy is returned.
    Between unsuccessful attempts the call sleeps for ``delay`` seconds; no
    sleep happens after the final attempt.

    Raises ``Exception`` when every attempt was rejected.
    """
    tries = 0
    while tries < max_attempts:
        outcome = func()
        if condition(outcome):
            return outcome
        tries += 1
        if tries >= max_attempts:
            # Attempt budget is spent: skip the pointless trailing sleep.
            break
        time.sleep(delay)
    raise Exception(f"Function did not satisfy condition after {max_attempts} attempts")

View File

@ -16,11 +16,8 @@ def relative_offset(unit, value):
return rd.relativedelta(hours=value)
elif unit == "DAY":
return rd.relativedelta(days=value)
# elif unit == "WEEK":
# return rd.relativedelta(days=7 * value)
elif unit == "WEEK":
return rd.relativedelta(weeks=7 * value)
elif unit == "MONTH":
return rd.relativedelta(months=value)
elif unit == "YEAR":
@ -42,7 +39,7 @@ def group_and_sort(parts, reverse=False):
return sorted_parts
def get_next_refresh_time(schedule, current_time: datetime):
def get_next_refresh_time(schedule, current_time: datetime, first_week=False):
parts = schedule.split()
randomize_offset = timedelta()
@ -65,6 +62,7 @@ def get_next_refresh_time(schedule, current_time: datetime):
parts = parts[:offset_index]
week_in_primary = False
if parts[0] == "EVERY":
parts = group_and_sort(parts[1:])
for part in parts:
@ -88,9 +86,10 @@ def get_next_refresh_time(schedule, current_time: datetime):
hour=0, minute=0, second=0, microsecond=0
) + rd.relativedelta(days=value)
elif unit == "WEEK":
week_in_primary = True
current_time = current_time.replace(
hour=0, minute=0, second=0, microsecond=0
) + rd.relativedelta(weekday=0, weeks=value)
) + rd.relativedelta(weekday=0, weeks=0 if first_week else value)
elif unit == "MONTH":
current_time = current_time.replace(
day=1, hour=0, minute=0, second=0, microsecond=0
@ -103,12 +102,16 @@ def get_next_refresh_time(schedule, current_time: datetime):
current_time += offset
if randomize_offset:
half_offset = (current_time + randomize_offset - current_time) / 2
return (
current_time - half_offset,
current_time + half_offset,
)
return {
"type": "randomize",
"time": (
current_time - half_offset,
current_time + half_offset,
),
"week": week_in_primary,
}
return current_time
return {"type": "regular", "time": current_time, "week": week_in_primary}
elif parts[0] == "AFTER":
parts = group_and_sort(parts[1:], reverse=True)
@ -126,6 +129,7 @@ def get_next_refresh_time(schedule, current_time: datetime):
elif unit == "DAY":
interval += rd.relativedelta(days=value)
elif unit == "WEEK":
week_in_primary = True
interval += rd.relativedelta(weeks=value)
elif unit == "MONTH":
interval += rd.relativedelta(months=value)
@ -135,11 +139,65 @@ def get_next_refresh_time(schedule, current_time: datetime):
current_time += interval
if randomize_offset:
half_offset = (current_time + randomize_offset - current_time) / 2
return (
current_time - half_offset,
# current_time,
current_time + half_offset,
)
return {
"type": "randomize",
"time": (
current_time - half_offset,
current_time + half_offset,
),
"week": week_in_primary,
}
return current_time
return {"type": "regular", "time": current_time, "week": week_in_primary}
raise ValueError("Invalid refresh schedule")
def compare_dates(
    date1: str | datetime,
    date2: dict,
    first_week=False,
) -> bool:
    """
    Check an observed refresh time against a predicted one.

    ``date1`` is the observed time (a datetime or a formatted string).
    ``date2`` is a dict with the keys read here:

    * ``"time"`` - the expected datetime, or a ``(start, end)`` pair when
      ``"type"`` is ``"randomize"``;
    * ``"type"`` - ``"randomize"`` selects the pair form of ``"time"``;
    * ``"week"`` - truthy when the week-specific handling below may apply.

    Special handling of weeks for the first refresh (``first_week=True``):
    ``EVERY 1 WEEK`` is meant as "every Monday". That makes the schedule
    independent of the view's creation time and keeps consecutive refreshes
    exactly one week apart, but the first refresh can then land anywhere
    between 0 and 1 week after creation, and the schedule is not aligned
    with months or years. ``EVERY 2 WEEK`` behaves the same way and also
    falls on Mondays; whether it is the even- or the odd-numbered Mondays
    is an arbitrary choice. Because of that, on the first refresh the
    expected time shifted by 0, 1 or 2 weeks is accepted.

    Returns ``True`` when the dates match.

    Raises:
        ValueError: on the first-week path, when none of the shifted
            candidates matches.
        AssertionError: on the regular path, when the dates do not match.
    """
    if not (date2["week"] and first_week):
        expected = date2["time"]
        # Explicit raise instead of a bare ``assert``: it is not stripped
        # under ``-O`` and the failure reports both values.
        if not compare_dates_(date1, expected):
            raise AssertionError(
                f"{date1!r} does not match expected refresh time {expected!r}"
            )
        return True

    expected = date2["time"]
    is_window = date2["type"] == "randomize"
    for weeks_ahead in (0, 1, 2):
        shift = rd.relativedelta(weeks=weeks_ahead)
        if is_window:
            candidate = (expected[0] + shift, expected[1] + shift)
        else:
            candidate = expected + shift
        if compare_dates_(date1, candidate):
            return True
    raise ValueError("Invalid week")
def compare_dates_(
date1: str | datetime,
date2: str | datetime | tuple[datetime],
inaccuracy=timedelta(minutes=10),
format_str="%Y-%m-%d %H:%M:%S",
) -> bool:
"""
Compares two dates with small inaccuracy.
"""
if isinstance(date1, str):
date1 = datetime.strptime(date1, format_str)
if isinstance(date2, str):
date2 = datetime.strptime(date2, format_str)
if isinstance(date2, datetime):
return abs(date1 - date2) <= inaccuracy
else:
return date2[0] - inaccuracy <= date1 <= date2[1] + inaccuracy

File diff suppressed because it is too large Load Diff

View File

@ -3,79 +3,87 @@ from datetime import datetime
from test_refreshable_mat_view.schedule_model import get_next_refresh_time
def get_next_refresh_time_(*args, **kwargs):
    """Call ``get_next_refresh_time`` and return only the ``"time"`` entry of its result."""
    prediction = get_next_refresh_time(*args, **kwargs)
    return prediction["time"]
def test_refresh_schedules():
    """
    Check the schedule model against hand-computed refresh times.

    Every expectation is relative to ``t``. ``get_next_refresh_time_`` is
    used so that only the ``"time"`` entry of the model's result is compared.
    For RANDOMIZE schedules that entry is a ``(start, end)`` pair centred on
    the scheduled time, i.e. half of the randomize interval on either side.
    """
    t = datetime(2000, 1, 1, 1, 1, 1)

    # EVERY
    assert get_next_refresh_time_("EVERY 1 SECOND", t) == datetime(2000, 1, 1, 1, 1, 2)
    assert get_next_refresh_time_("EVERY 1 MINUTE", t) == datetime(
        2000,
        1,
        1,
        1,
        2,
    )
    assert get_next_refresh_time_("EVERY 1 HOUR", t) == datetime(
        2000,
        1,
        1,
        2,
    )
    assert get_next_refresh_time_("EVERY 1 DAY", t) == datetime(2000, 1, 2)
    assert get_next_refresh_time_("EVERY 1 WEEK", t) == datetime(2000, 1, 10)
    assert get_next_refresh_time_("EVERY 2 WEEK", t) == datetime(2000, 1, 17)
    assert get_next_refresh_time_("EVERY 1 MONTH", t) == datetime(2000, 2, 1)
    assert get_next_refresh_time_("EVERY 1 YEAR", t) == datetime(2001, 1, 1)

    assert get_next_refresh_time_("EVERY 3 YEAR 4 MONTH 10 DAY", t) == datetime(
        2003, 5, 11
    )

    # OFFSET
    assert get_next_refresh_time_(
        "EVERY 1 MONTH OFFSET 5 DAY 2 HOUR 30 MINUTE 15 SECOND", t
    ) == datetime(2000, 2, 6, 2, 30, 15)
    assert get_next_refresh_time_(
        "EVERY 1 YEAR 2 MONTH OFFSET 5 DAY 2 HOUR 30 MINUTE 15 SECOND", t
    ) == datetime(2001, 3, 6, 2, 30, 15)

    assert get_next_refresh_time_(
        "EVERY 2 WEEK OFFSET 5 DAY 15 HOUR 10 MINUTE", t
    ) == datetime(2000, 1, 22, 15, 10)

    # AFTER
    assert get_next_refresh_time_("AFTER 30 SECOND", t) == datetime(
        2000, 1, 1, 1, 1, 31
    )
    assert get_next_refresh_time_("AFTER 30 MINUTE", t) == datetime(
        2000, 1, 1, 1, 31, 1
    )
    assert get_next_refresh_time_("AFTER 2 HOUR", t) == datetime(2000, 1, 1, 3, 1, 1)
    assert get_next_refresh_time_("AFTER 2 DAY", t) == datetime(2000, 1, 3, 1, 1, 1)
    assert get_next_refresh_time_("AFTER 2 WEEK", t) == datetime(2000, 1, 15, 1, 1, 1)
    assert get_next_refresh_time_("AFTER 2 MONTH", t) == datetime(2000, 3, 1, 1, 1, 1)
    assert get_next_refresh_time_("AFTER 2 YEAR", t) == datetime(2002, 1, 1, 1, 1, 1)

    # Interval parts may be given in any order.
    assert get_next_refresh_time_("AFTER 2 YEAR 1 MONTH", t) == datetime(
        2002, 2, 1, 1, 1, 1
    )
    assert get_next_refresh_time_("AFTER 1 MONTH 2 YEAR", t) == datetime(
        2002, 2, 1, 1, 1, 1
    )

    # RANDOMIZE
    next_refresh = get_next_refresh_time_(
        "EVERY 1 DAY OFFSET 2 HOUR RANDOMIZE FOR 1 HOUR", t
    )
    assert next_refresh == (datetime(2000, 1, 2, 1, 30), datetime(2000, 1, 2, 2, 30))

    next_refresh = get_next_refresh_time_(
        "EVERY 2 MONTH 3 DAY 5 HOUR OFFSET 3 HOUR 20 SECOND RANDOMIZE FOR 3 DAY 1 HOUR",
        t,
    )
    assert next_refresh == (
        datetime(2000, 3, 2, 19, 30, 20),
        datetime(2000, 3, 5, 20, 30, 20),
    )

    assert get_next_refresh_time_("AFTER 2 MONTH 3 DAY RANDOMIZE FOR 1 DAY", t) == (
        datetime(2000, 3, 3, 13, 1, 1),
        datetime(2000, 3, 4, 13, 1, 1),
    )