This commit is contained in:
Michael Kolupaev 2023-12-05 00:01:38 +00:00
parent 0fc7535eba
commit f0417d0ec3
15 changed files with 358 additions and 58 deletions

View File

@ -16,6 +16,7 @@ Columns:
- `next_refresh_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — Time at which the next refresh is scheduled to start. - `next_refresh_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — Time at which the next refresh is scheduled to start.
- `remaining_dependencies` ([Array(String)](../../sql-reference/data-types/array.md)) — If the view has [refresh dependencies](../../sql-reference/statements/create/view.md#refresh-dependencies), this array contains the subset of those dependencies that are not satisfied for the current refresh yet. If `status = 'WaitingForDependencies'`, a refresh is ready to start as soon as these dependencies are fulfilled. - `remaining_dependencies` ([Array(String)](../../sql-reference/data-types/array.md)) — If the view has [refresh dependencies](../../sql-reference/statements/create/view.md#refresh-dependencies), this array contains the subset of those dependencies that are not satisfied for the current refresh yet. If `status = 'WaitingForDependencies'`, a refresh is ready to start as soon as these dependencies are fulfilled.
- `exception` ([String](../../sql-reference/data-types/string.md)) — if `last_refresh_result = 'Exception'`, i.e. the last refresh attempt failed, this column contains the corresponding error message and stack trace. - `exception` ([String](../../sql-reference/data-types/string.md)) — if `last_refresh_result = 'Exception'`, i.e. the last refresh attempt failed, this column contains the corresponding error message and stack trace.
- `refresh_count` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of successful refreshes since last server restart or table creation.
- `progress` ([Float64](../../sql-reference/data-types/float.md)) — Progress of the current refresh, between 0 and 1. - `progress` ([Float64](../../sql-reference/data-types/float.md)) — Progress of the current refresh, between 0 and 1.
- `read_rows` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of rows read by the current refresh so far. - `read_rows` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of rows read by the current refresh so far.
- `total_rows` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Estimated total number of rows that need to be read by the current refresh. - `total_rows` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Estimated total number of rows that need to be read by the current refresh.

View File

@ -130,7 +130,7 @@ Currently refreshable materialized views require [Atomic database engine](../../
Example refresh schedules: Example refresh schedules:
```sql ```sql
REFRESH EVERY 1 DAY -- every day, at midnight REFRESH EVERY 1 DAY -- every day, at midnight (UTC)
REFRESH EVERY 1 MONTH -- on 1st day of every month, at midnight REFRESH EVERY 1 MONTH -- on 1st day of every month, at midnight
REFRESH EVERY 1 MONTH OFFSET 5 DAY 2 HOUR -- on 6th day of every month, at 2:00 am REFRESH EVERY 1 MONTH OFFSET 5 DAY 2 HOUR -- on 6th day of every month, at 2:00 am
REFRESH EVERY 2 WEEK OFFSET 5 DAY 15 HOUR 10 MINUTE -- every other Saturday, at 3:10 pm REFRESH EVERY 2 WEEK OFFSET 5 DAY 15 HOUR 10 MINUTE -- every other Saturday, at 3:10 pm
@ -181,6 +181,10 @@ A few more examples:
* `REFRESH AFTER 1 HOUR` depends on `REFRESH AFTER 1 HOUR`<br/> * `REFRESH AFTER 1 HOUR` depends on `REFRESH AFTER 1 HOUR`<br/>
Currently this is not recommended. Currently this is not recommended.
:::note
`DEPENDS ON` only works between refreshable materialized views. Listing a regular table in the `DEPENDS ON` list will prevent the view from ever refreshing (dependencies can be reomoved with `ALTER`, see below).
:::
### Changing Refresh Parameters {#changing-refresh-parameters} ### Changing Refresh Parameters {#changing-refresh-parameters}
To change refresh parameters: To change refresh parameters:

View File

@ -136,4 +136,9 @@ bool CalendarTimeInterval::operator==(const CalendarTimeInterval & rhs) const
return std::tie(months, seconds) == std::tie(rhs.months, rhs.seconds); return std::tie(months, seconds) == std::tie(rhs.months, rhs.seconds);
} }
bool CalendarTimeInterval::operator!=(const CalendarTimeInterval & rhs) const
{
return !(*this == rhs);
}
} }

View File

@ -57,6 +57,7 @@ struct CalendarTimeInterval
std::chrono::sys_seconds floor(std::chrono::system_clock::time_point t) const; std::chrono::sys_seconds floor(std::chrono::system_clock::time_point t) const;
bool operator==(const CalendarTimeInterval & rhs) const; bool operator==(const CalendarTimeInterval & rhs) const;
bool operator!=(const CalendarTimeInterval & rhs) const;
}; };
} }

View File

@ -65,6 +65,11 @@ void applyMetadataChangesToCreateQuery(const ASTPtr & query, const StorageInMemo
query->replace(ast_create_query.select, metadata.select.select_query); query->replace(ast_create_query.select, metadata.select.select_query);
} }
if (metadata.refresh)
{
query->replace(ast_create_query.refresh_strategy, metadata.refresh);
}
/// MaterializedView, Dictionary are types of CREATE query without storage. /// MaterializedView, Dictionary are types of CREATE query without storage.
if (ast_create_query.storage) if (ast_create_query.storage)
{ {

View File

@ -38,7 +38,7 @@ ManualPipelineExecutor::~ManualPipelineExecutor()
} }
catch (...) catch (...)
{ {
tryLogCurrentException("ManualPipelineExecutor"); tryLogCurrentException(__PRETTY_FUNCTION__);
} }
} }

View File

@ -742,7 +742,7 @@ void AlterCommand::apply(StorageInMemoryMetadata & metadata, ContextPtr context)
} }
else if (type == MODIFY_REFRESH) else if (type == MODIFY_REFRESH)
{ {
metadata.refresh = metadata.refresh->clone(); metadata.refresh = refresh->clone();
} }
else if (type == MODIFY_SETTING) else if (type == MODIFY_SETTING)
{ {

View File

@ -18,7 +18,7 @@ RefreshSchedule::RefreshSchedule(const ASTRefreshStrategy & strategy)
bool RefreshSchedule::operator!=(const RefreshSchedule & rhs) const bool RefreshSchedule::operator!=(const RefreshSchedule & rhs) const
{ {
static_assert(sizeof(*this) == 7*8, "If fields were added or removed in RefreshSchedule, please update this comparator."); static_assert(sizeof(*this) == 7*8, "If fields were added or removed in RefreshSchedule, please update this comparator.");
return std::tie(kind, period, offset, spread) == std::tie(rhs.kind, rhs.period, rhs.offset, rhs.spread); return std::tie(kind, period, offset, spread) != std::tie(rhs.kind, rhs.period, rhs.offset, rhs.spread);
} }
static std::chrono::sys_seconds advanceEvery(std::chrono::system_clock::time_point prev, CalendarTimeInterval period, CalendarTimeInterval offset) static std::chrono::sys_seconds advanceEvery(std::chrono::system_clock::time_point prev, CalendarTimeInterval period, CalendarTimeInterval offset)

View File

@ -73,7 +73,7 @@ void RefreshSet::Handle::reset()
RefreshSet::RefreshSet() = default; RefreshSet::RefreshSet() = default;
RefreshSet::Handle RefreshSet::emplace(StorageID id, std::vector<StorageID> dependencies, RefreshTaskHolder task) RefreshSet::Handle RefreshSet::emplace(StorageID id, const std::vector<StorageID> & dependencies, RefreshTaskHolder task)
{ {
std::lock_guard guard(mutex); std::lock_guard guard(mutex);
auto [it, is_inserted] = tasks.emplace(id, task); auto [it, is_inserted] = tasks.emplace(id, task);

View File

@ -35,6 +35,7 @@ struct RefreshInfo
LastRefreshResult last_refresh_result = LastRefreshResult::Unknown; LastRefreshResult last_refresh_result = LastRefreshResult::Unknown;
std::optional<UInt32> last_refresh_time; std::optional<UInt32> last_refresh_time;
UInt32 next_refresh_time = 0; UInt32 next_refresh_time = 0;
UInt64 refresh_count = 0;
String exception_message; // if last_refresh_result is Exception String exception_message; // if last_refresh_result is Exception
std::vector<StorageID> remaining_dependencies; std::vector<StorageID> remaining_dependencies;
ProgressValues progress; ProgressValues progress;
@ -80,7 +81,7 @@ public:
RefreshSet(); RefreshSet();
Handle emplace(StorageID id, std::vector<StorageID> dependencies, RefreshTaskHolder task); Handle emplace(StorageID id, const std::vector<StorageID> & dependencies, RefreshTaskHolder task);
RefreshTaskHolder getTask(const StorageID & id) const; RefreshTaskHolder getTask(const StorageID & id) const;

View File

@ -73,11 +73,10 @@ void RefreshTask::alterRefreshParams(const DB::ASTRefreshStrategy & new_strategy
for (auto && dependency : new_strategy.dependencies->children) for (auto && dependency : new_strategy.dependencies->children)
deps.emplace_back(dependency->as<const ASTTableIdentifier &>()); deps.emplace_back(dependency->as<const ASTTableIdentifier &>());
refresh_schedule = new_schedule;
/// Reschedule next refresh. /// Reschedule next refresh.
if (new_schedule != refresh_schedule) if (new_schedule != refresh_schedule)
{ {
refresh_schedule = new_schedule;
next_refresh_prescribed = {}; next_refresh_prescribed = {};
advanceNextRefreshTime(currentTime()); advanceNextRefreshTime(currentTime());
refresh_task->schedule(); refresh_task->schedule();
@ -93,7 +92,8 @@ void RefreshTask::alterRefreshParams(const DB::ASTRefreshStrategy & new_strategy
if (!deps_set.contains(id)) if (!deps_set.contains(id))
removed_deps.push_back(id); removed_deps.push_back(id);
for (const auto & id : removed_deps) for (const auto & id : removed_deps)
arriveDependency(id); if (arriveDependency(id) && !std::exchange(refresh_immediately, true))
refresh_task->schedule();
/// TODO: Update settings once we have them. /// TODO: Update settings once we have them.
} }
@ -180,7 +180,7 @@ void RefreshTask::shutdown()
set_handle.reset(); set_handle.reset();
} }
void RefreshTask::notify(const StorageID & parent_id, std::chrono::sys_seconds prescribed_time, const RefreshSchedule & parent_schedule) void RefreshTask::notify(const StorageID & parent_id, std::chrono::sys_seconds parent_next_prescribed_time, const RefreshSchedule & parent_schedule)
{ {
std::lock_guard guard(mutex); std::lock_guard guard(mutex);
if (!set_handle) if (!set_handle)
@ -218,7 +218,7 @@ void RefreshTask::notify(const StorageID & parent_id, std::chrono::sys_seconds p
/// Only accept the dependency's refresh if its next refresh time is after ours. /// Only accept the dependency's refresh if its next refresh time is after ours.
/// This takes care of cases (1)-(4), and seems harmless in all other cases. /// This takes care of cases (1)-(4), and seems harmless in all other cases.
/// Might be mildly helpful in weird cases like REFRESH AFTER 3 HOUR depends on REFRESH AFTER 2 HOUR. /// Might be mildly helpful in weird cases like REFRESH AFTER 3 HOUR depends on REFRESH AFTER 2 HOUR.
if (parent_schedule.prescribeNext(prescribed_time, currentTime()) <= next_refresh_prescribed) if (parent_next_prescribed_time <= next_refresh_prescribed)
return; return;
if (arriveDependency(parent_id) && !std::exchange(refresh_immediately, true)) if (arriveDependency(parent_id) && !std::exchange(refresh_immediately, true))
@ -334,7 +334,6 @@ void RefreshTask::refreshTask()
info.state = RefreshState::Running; info.state = RefreshState::Running;
CurrentMetrics::Increment metric_inc(CurrentMetrics::RefreshingViews); CurrentMetrics::Increment metric_inc(CurrentMetrics::RefreshingViews);
auto prescribed_time = next_refresh_prescribed;
lock.unlock(); lock.unlock();
@ -348,9 +347,7 @@ void RefreshTask::refreshTask()
finished = executeRefreshUnlocked(); finished = executeRefreshUnlocked();
if (finished) if (finished)
completeRefreshUnlocked(view, prescribed_time); completeRefreshUnlocked(view);
lock.lock();
} }
catch (...) catch (...)
{ {
@ -360,7 +357,7 @@ void RefreshTask::refreshTask()
LOG_ERROR(log, message); LOG_ERROR(log, message);
/// Don't leave a trash table. /// Don't leave a trash table.
if (!finished && refresh_context) if (refresh_context)
cancelRefreshUnlocked(); cancelRefreshUnlocked();
lock.lock(); lock.lock();
@ -369,19 +366,32 @@ void RefreshTask::refreshTask()
info.exception_message = text; info.exception_message = text;
/// TODO: Do a few retries with exponential backoff. /// TODO: Do a few retries with exponential backoff.
if (!finished) advanceNextRefreshTime(currentTime());
advanceNextRefreshTime(currentTime());
}
chassert(lock.owns_lock());
if (finished) continue;
{
auto now = currentTime();
auto secs = std::chrono::floor<std::chrono::seconds>(now);
info.last_refresh_time = UInt32(secs.time_since_epoch().count());
info.last_refresh_result = LastRefreshResult::Finished;
advanceNextRefreshTime(now);
} }
lock.lock();
if (!finished)
continue;
auto now = currentTime();
auto secs = std::chrono::floor<std::chrono::seconds>(now);
info.last_refresh_time = UInt32(secs.time_since_epoch().count());
info.last_refresh_result = LastRefreshResult::Finished;
info.refresh_count += 1;
advanceNextRefreshTime(now);
auto next_time = next_refresh_prescribed;
auto refresh_schedule_copy = refresh_schedule;
lock.unlock();
StorageID my_id = view->getStorageID();
auto dependents = view->getContext()->getRefreshSet().getDependents(my_id);
for (const RefreshTaskHolder & dep_task : dependents)
dep_task->notify(my_id, next_time, refresh_schedule_copy);
lock.lock();
} }
} }
catch (...) catch (...)
@ -427,24 +437,13 @@ bool RefreshTask::executeRefreshUnlocked()
return finished; return finished;
} }
void RefreshTask::completeRefreshUnlocked(std::shared_ptr<StorageMaterializedView> view, std::chrono::sys_seconds prescribed_time) void RefreshTask::completeRefreshUnlocked(std::shared_ptr<StorageMaterializedView> view)
{ {
auto stale_table = view->exchangeTargetTable(refresh_query->table_id, refresh_context); auto stale_table = view->exchangeTargetTable(refresh_query->table_id, refresh_context);
auto refresh_context_ptr_copy = refresh_context; auto refresh_context_ptr_copy = refresh_context;
cleanStateUnlocked(); cleanStateUnlocked();
std::unique_lock lock(mutex); InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, view->getContext(), refresh_context_ptr_copy, stale_table, /*sync*/ true, /*ignore_sync_setting*/ true);
auto refresh_schedule_copy = refresh_schedule;
lock.unlock();
auto global_context = view->getContext();
StorageID my_id = view->getStorageID();
auto dependents = global_context->getRefreshSet().getDependents(my_id);
for (const RefreshTaskHolder & dep_task : dependents)
dep_task->notify(my_id, prescribed_time, refresh_schedule_copy);
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, global_context, refresh_context_ptr_copy, stale_table, /*sync*/ true, /*ignore_sync_setting*/ true);
} }
void RefreshTask::cancelRefreshUnlocked() void RefreshTask::cancelRefreshUnlocked()

View File

@ -62,7 +62,7 @@ public:
void shutdown(); void shutdown();
/// Notify dependent task /// Notify dependent task
void notify(const StorageID & parent_id, std::chrono::sys_seconds prescribed_time, const RefreshSchedule & parent_schedule); void notify(const StorageID & parent_id, std::chrono::sys_seconds parent_next_prescribed_time, const RefreshSchedule & parent_schedule);
void setFakeTime(std::optional<Int64> t); void setFakeTime(std::optional<Int64> t);
@ -145,7 +145,7 @@ private:
void initializeRefreshUnlocked(std::shared_ptr<const StorageMaterializedView> view); void initializeRefreshUnlocked(std::shared_ptr<const StorageMaterializedView> view);
bool executeRefreshUnlocked(); bool executeRefreshUnlocked();
/// Whoever calls complete/cancelRefreshUnlocked() should also assign info.last_refresh_result. /// Whoever calls complete/cancelRefreshUnlocked() should also assign info.last_refresh_result.
void completeRefreshUnlocked(std::shared_ptr<StorageMaterializedView> view, std::chrono::sys_seconds prescribed_time); void completeRefreshUnlocked(std::shared_ptr<StorageMaterializedView> view);
void cancelRefreshUnlocked(); void cancelRefreshUnlocked();
void cleanStateUnlocked(); void cleanStateUnlocked();

View File

@ -24,6 +24,7 @@ NamesAndTypesList StorageSystemViewRefreshes::getNamesAndTypes()
{"next_refresh_time", std::make_shared<DataTypeDateTime>()}, {"next_refresh_time", std::make_shared<DataTypeDateTime>()},
{"remaining_dependencies", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())}, {"remaining_dependencies", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
{"exception", std::make_shared<DataTypeString>()}, {"exception", std::make_shared<DataTypeString>()},
{"refresh_count", std::make_shared<DataTypeUInt64>()},
{"progress", std::make_shared<DataTypeFloat64>()}, {"progress", std::make_shared<DataTypeFloat64>()},
{"elapsed", std::make_shared<DataTypeFloat64>()}, {"elapsed", std::make_shared<DataTypeFloat64>()},
{"read_rows", std::make_shared<DataTypeUInt64>()}, {"read_rows", std::make_shared<DataTypeUInt64>()},
@ -68,6 +69,7 @@ void StorageSystemViewRefreshes::fillData(
res_columns[i++]->insert(Array(deps)); res_columns[i++]->insert(Array(deps));
res_columns[i++]->insert(refresh.exception_message); res_columns[i++]->insert(refresh.exception_message);
res_columns[i++]->insert(refresh.refresh_count);
res_columns[i++]->insert(Float64(refresh.progress.read_rows) / refresh.progress.total_rows_to_read); res_columns[i++]->insert(Float64(refresh.progress.read_rows) / refresh.progress.total_rows_to_read);
res_columns[i++]->insert(refresh.progress.elapsed_ns / 1e9); res_columns[i++]->insert(refresh.progress.elapsed_ns / 1e9);
res_columns[i++]->insert(refresh.progress.read_rows); res_columns[i++]->insert(refresh.progress.read_rows);

View File

@ -1,5 +1,42 @@
a [] 1 <1: created view> a [] 1
CREATE MATERIALIZED VIEW <db>a\nREFRESH AFTER 1 SECOND\n(\n `x` UInt64\n)\nENGINE = Memory AS\nSELECT number\nFROM numbers(2)\nUNION ALL\nSELECT rand64() CREATE MATERIALIZED VIEW default.a\nREFRESH AFTER 1 SECOND\n(\n `x` UInt64\n)\nENGINE = Memory AS\nSELECT number AS x\nFROM numbers(2)\nUNION ALL\nSELECT rand64() AS x
3 1 1 <2: refreshed> 3 1 1
500 <3: time difference at least> 500
1 <4: next refresh in> 1
CREATE MATERIALIZED VIEW default.a\nREFRESH EVERY 2 YEAR\n(\n `x` Int16\n)\nENGINE = Memory AS\nSELECT x * 2 AS x\nFROM default.src
<5: no refresh> 3
<6: refreshed> 2
<7: refreshed> Scheduled Finished 2054-01-01 00:00:00
CREATE MATERIALIZED VIEW default.b\nREFRESH EVERY 2 YEAR DEPENDS ON default.a\n(\n `y` Int32\n)\nENGINE = MergeTree\nORDER BY y\nSETTINGS index_granularity = 8192 AS\nSELECT x * 10 AS y\nFROM default.a
<8: refreshed> 20
<9: refreshed> a Scheduled Finished 2054-01-01 00:00:00
<9: refreshed> b Scheduled Finished 2054-01-01 00:00:00
<10: waiting> a Scheduled [] 2054-01-01 00:00:00
<10: waiting> b WaitingForDependencies ['default.a'] 2054-01-01 00:00:00
<11: chain-refreshed a> 4
<12: chain-refreshed b> 40
<13: chain-refreshed> a Scheduled [] Finished 2054-01-01 00:00:01 2056-01-01 00:00:00
<13: chain-refreshed> b Scheduled ['default.a'] Finished 2054-01-24 23:22:21 2056-01-01 00:00:00
<14: waiting for next cycle> a Scheduled [] 2058-01-01 00:00:00
<14: waiting for next cycle> b WaitingForDependencies ['default.a'] 2060-01-01 00:00:00
<15: chain-refreshed a> 6
<16: chain-refreshed b> 60
<17: chain-refreshed> a Scheduled 2062-01-01 00:00:00
<17: chain-refreshed> b Scheduled 2062-01-01 00:00:00
<18: removed dependency> b Scheduled [] 2062-03-03 03:03:03 2064-01-01 00:00:00 5
CREATE MATERIALIZED VIEW default.b\nREFRESH EVERY 2 YEAR\n(\n `y` Int32\n)\nENGINE = MergeTree\nORDER BY y\nSETTINGS index_granularity = 8192 AS\nSELECT x * 10 AS y\nFROM default.a
<19: exception> 1
<20: unexception> 1
<21: rename> 1
<22: rename> d Finished
<23: simple refresh> 1
<24: rename during refresh> 1
<25: rename during refresh> f Running
<26: paused+resumed> 1
<27: canceled> f Scheduled
CREATE MATERIALIZED VIEW default.g\nREFRESH EVERY 1 WEEK OFFSET 3 DAY 4 HOUR RANDOMIZE FOR 4 DAY 1 HOUR\n(\n `x` Int64\n)\nENGINE = Memory AS\nSELECT 42
<29: randomize> 1 1
CREATE MATERIALIZED VIEW default.h\nREFRESH EVERY 1 SECOND TO default.dest\n(\n `x` Int64\n) AS\nSELECT x * 10 AS x\nFROM default.src
<30: to existing table> 10
<31: to existing table> 10
<31: to existing table> 20

View File

@ -7,7 +7,7 @@ CLICKHOUSE_LOG_COMMENT=
# shellcheck source=../shell_config.sh # shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh . "$CUR_DIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -nq "drop table if exists refreshes; drop table if exists a;" CLICKHOUSE_CLIENT="`echo "$CLICKHOUSE_CLIENT" | sed 's/--session_timezone[= ][^ ]*//g'`"
$CLICKHOUSE_CLIENT -nq "create view refreshes as select * from system.view_refreshes where database = '$CLICKHOUSE_DATABASE' order by view" $CLICKHOUSE_CLIENT -nq "create view refreshes as select * from system.view_refreshes where database = '$CLICKHOUSE_DATABASE' order by view"
@ -17,16 +17,16 @@ $CLICKHOUSE_CLIENT -nq "
create materialized view a create materialized view a
refresh after 1 second (x UInt64) refresh after 1 second (x UInt64)
engine Memory engine Memory
as select number from numbers(2) union all select rand64()" as select number as x from numbers(2) union all select rand64() as x"
$CLICKHOUSE_CLIENT -nq "select view, remaining_dependencies, exception, last_refresh_result in ('Unknown', 'Finished') from refreshes"; $CLICKHOUSE_CLIENT -nq "select '<1: created view>', view, remaining_dependencies, exception, last_refresh_result in ('Unknown', 'Finished') from refreshes";
$CLICKHOUSE_CLIENT -nq "show create a" | sed "s/$CLICKHOUSE_DATABASE./<db>/" $CLICKHOUSE_CLIENT -nq "show create a"
# Wait for any refresh. # Wait for any refresh. (xargs trims the string and turns \t and \n into spaces)
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes" | xargs`" == 'Unknown' ] while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes" | xargs`" == 'Unknown' ]
do do
sleep 0.1 sleep 0.1
done done
# Check table contents. # Check table contents.
$CLICKHOUSE_CLIENT -nq "select count(), sum(x=0), sum(x=1) from a" $CLICKHOUSE_CLIENT -nq "select '<2: refreshed>', count(), sum(x=0), sum(x=1) from a"
# Wait for table contents to change. # Wait for table contents to change.
res1="`$CLICKHOUSE_CLIENT -nq 'select * from a order by x format Values'`" res1="`$CLICKHOUSE_CLIENT -nq 'select * from a order by x format Values'`"
while : while :
@ -35,7 +35,7 @@ do
[ "$res2" == "$res1" ] || break [ "$res2" == "$res1" ] || break
sleep 0.1 sleep 0.1
done done
time2="`$CLICKHOUSE_CLIENT -nq \"select reinterpret(now64(), 'Int64')\"`" time2="`$CLICKHOUSE_CLIENT -nq "select reinterpret(now64(), 'Int64')"`"
# Wait for another change. # Wait for another change.
while : while :
do do
@ -43,12 +43,257 @@ do
[ "$res3" == "$res2" ] || break [ "$res3" == "$res2" ] || break
sleep 0.1 sleep 0.1
done done
time3="`$CLICKHOUSE_CLIENT -nq \"select reinterpret(now64(), 'Int64')\"`"
# Check that the two changes were at least 500ms apart, in particular that we're not refreshing # Check that the two changes were at least 500ms apart, in particular that we're not refreshing
# like crazy. This is potentially flaky, but we need at least one test that uses non-mocked timer # like crazy. This is potentially flaky, but we need at least one test that uses non-mocked timer
# to make sure the clock+timer code works at all. If it turns out flaky, increase refresh period above. # to make sure the clock+timer code works at all. If it turns out flaky, increase refresh period above.
$CLICKHOUSE_CLIENT -nq " $CLICKHOUSE_CLIENT -nq "
select min2($time3-$time2, 500); select '<3: time difference at least>', min2(reinterpret(now64(), 'Int64') - $time2, 500);
select next_refresh_time-last_refresh_time from refreshes;" select '<4: next refresh in>', next_refresh_time-last_refresh_time from refreshes;"
$CLICKHOUSE_CLIENT -nq "drop table refreshes; drop table a;" # Create a source table from which views will read.
$CLICKHOUSE_CLIENT -nq "
create table src (x Int8) engine Memory as select 1"
# Switch to fake clock, change refresh schedule, change query.
$CLICKHOUSE_CLIENT -nq "
system test view a set fake time '2050-01-01 00:00:01';
alter table a modify refresh every 2 year;
alter table a modify query select x*2 as x from src;"
$CLICKHOUSE_CLIENT -nq "show create a"
while [ "`$CLICKHOUSE_CLIENT -nq "select status, next_refresh_time from refreshes" | xargs`" != 'Scheduled 2052-01-01 00:00:00' ]
do
sleep 0.1
done
# Advance time to trigger the refresh.
$CLICKHOUSE_CLIENT -nq "
select '<5: no refresh>', count() from a;
system test view a set fake time '2052-02-03 04:05:06';"
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_time from refreshes" | xargs`" != '2052-02-03 04:05:06' ]
do
sleep 0.1
done
$CLICKHOUSE_CLIENT -nq "
select '<6: refreshed>', * from a;
select '<7: refreshed>', status, last_refresh_result, next_refresh_time from refreshes;"
# Create a dependent view, refresh it once.
$CLICKHOUSE_CLIENT -nq "
create materialized view b refresh every 2 year depends on a (y Int32) engine MergeTree order by y as select x*10 as y from a;
show create b;
system test view b set fake time '2052-11-11 11:11:11';
system refresh view b;"
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_time from refreshes where view = 'b'" | xargs`" != '2052-11-11 11:11:11' ]
do
sleep 0.1
done
# Next refresh shouldn't start until the dependency refreshes.
$CLICKHOUSE_CLIENT -nq "
select '<8: refreshed>', * from b;
select '<9: refreshed>', view, status, last_refresh_result, next_refresh_time from refreshes;
system test view b set fake time '2054-01-24 23:22:21';"
while [ "`$CLICKHOUSE_CLIENT -nq "select status, next_refresh_time from refreshes where view = 'b'" | xargs`" != 'WaitingForDependencies 2054-01-01 00:00:00' ]
do
sleep 0.1
done
# Update source table (by dropping and re-creating it - to test that tables are looked up by name
# rather than uuid), kick off refresh of the dependency.
$CLICKHOUSE_CLIENT -nq "
select '<10: waiting>', view, status, remaining_dependencies, next_refresh_time from refreshes;
drop table src;
create table src (x Int16) engine Memory as select 2;
system test view a set fake time '2054-01-01 00:00:01';"
while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes where view = 'b'" | xargs`" != 'Scheduled' ]
do
sleep 0.1
done
# Both tables should've refreshed.
$CLICKHOUSE_CLIENT -nq "
select '<11: chain-refreshed a>', * from a;
select '<12: chain-refreshed b>', * from b;
select '<13: chain-refreshed>', view, status, remaining_dependencies, last_refresh_result, last_refresh_time, next_refresh_time, exception from refreshes;"
# Make the dependent table run ahead by one refresh cycle, make sure it waits for the dependency to
# catch up to the same cycle.
$CLICKHOUSE_CLIENT -nq "
system test view b set fake time '2059-01-01 00:00:00';
system refresh view b;"
while [ "`$CLICKHOUSE_CLIENT -nq "select next_refresh_time from refreshes where view = 'b'" | xargs`" != '2060-01-01 00:00:00' ]
do
sleep 0.1
done
$CLICKHOUSE_CLIENT -nq "
system test view b set fake time '2061-01-01 00:00:00';
system test view a set fake time '2057-01-01 00:00:00';"
while [ "`$CLICKHOUSE_CLIENT -nq "select status, next_refresh_time from refreshes" | xargs`" != 'Scheduled 2058-01-01 00:00:00 WaitingForDependencies 2060-01-01 00:00:00' ]
do
sleep 0.1
done
sleep 1
$CLICKHOUSE_CLIENT -nq "
select '<14: waiting for next cycle>', view, status, remaining_dependencies, next_refresh_time from refreshes;
truncate src;
insert into src values (3);
system test view a set fake time '2060-02-02 02:02:02';"
while [ "`$CLICKHOUSE_CLIENT -nq "select next_refresh_time from refreshes where view = 'b'" | xargs`" != '2062-01-01 00:00:00' ]
do
sleep 0.1
done
$CLICKHOUSE_CLIENT -nq "
select '<15: chain-refreshed a>', * from a;
select '<16: chain-refreshed b>', * from b;
select '<17: chain-refreshed>', view, status, next_refresh_time from refreshes;"
# Get to WaitingForDependencies state and remove the depencency.
$CLICKHOUSE_CLIENT -nq "
system test view b set fake time '2062-03-03 03:03:03'"
while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes where view = 'b'" | xargs`" != 'WaitingForDependencies' ]
do
sleep 0.1
done
$CLICKHOUSE_CLIENT -nq "
alter table b modify refresh every 2 year"
while [ "`$CLICKHOUSE_CLIENT -nq "select status, last_refresh_time from refreshes where view = 'b'" | xargs`" != 'Scheduled 2062-03-03 03:03:03' ]
do
sleep 0.1
done
$CLICKHOUSE_CLIENT -nq "
select '<18: removed dependency>', view, status, remaining_dependencies, last_refresh_time,next_refresh_time, refresh_count from refreshes where view = 'b';
show create b;"
# Select from a table that doesn't exist, get an exception.
$CLICKHOUSE_CLIENT -nq "
drop table a;
drop table b;
create materialized view c refresh every 1 second (x Int64) engine Memory as select * from src;
drop table src;"
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes" | xargs`" != 'Exception' ]
do
sleep 0.1
done
# Check exception, create src, expect successful refresh.
$CLICKHOUSE_CLIENT -nq "
select '<19: exception>', exception ilike '%table%src%exist%UNKNOWN_TABLE%' from refreshes;
create table src (x Int64) engine Memory as select 1;
system refresh view c;"
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes" | xargs`" != 'Finished' ]
do
sleep 0.1
done
# Rename table.
$CLICKHOUSE_CLIENT -nq "
select '<20: unexception>', * from c;
rename table c to d;
select '<21: rename>', * from d;
select '<22: rename>', view, last_refresh_result from refreshes;"
# Do various things during a refresh.
# First make a nonempty view.
$CLICKHOUSE_CLIENT -nq "
drop table d;
truncate src;
insert into src values (1)
create materialized view e refresh every 1 second (x Int64) engine MergeTree order by x as select x + sleepEachRow(1) as x from src settings max_block_size = 1;"
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes" | xargs`" != 'Finished' ]
do
sleep 0.1
done
# Stop refreshes.
$CLICKHOUSE_CLIENT -nq "
select '<23: simple refresh>', * from e;
system stop view e;"
while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes" | xargs`" != 'Disabled' ]
do
sleep 0.1
done
# Make refreshes slow, make for a slow refresh to start. (We stopped refreshes first to make sure
# we wait for a slow refresh, not a previous fast one.)
$CLICKHOUSE_CLIENT -nq "
insert into src select * from numbers(1000) settings max_block_size=1;
system start view e;"
while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes" | xargs`" != 'Running' ]
do
sleep 0.1
done
# Rename.
$CLICKHOUSE_CLIENT -nq "
rename table e to f;
select '<24: rename during refresh>', * from f;
select '<25: rename during refresh>', view, status from refreshes;
alter table f modify refresh after 10 year;"
sleep 2 # make it likely that at least one row was processed
# Pause.
rows_before_pause="`$CLICKHOUSE_CLIENT -nq "select read_rows from refreshes" | xargs`"
$CLICKHOUSE_CLIENT -nq "
system pause view f;"
while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes" | xargs`" != 'Paused' ]
do
sleep 0.1
done
# Resume.
$CLICKHOUSE_CLIENT -nq "
system resume view f;"
while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes" | xargs`" != 'Running' ]
do
sleep 0.1
done
$CLICKHOUSE_CLIENT -nq "
select '<26: paused+resumed>', read_rows >= $rows_before_pause from refreshes"
# Cancel.
$CLICKHOUSE_CLIENT -nq "
system cancel view f;"
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes" | xargs`" != 'Canceled' ]
do
sleep 0.1
done
# Check that another refresh doesn't immediately start after the canceled one.
sleep 1
$CLICKHOUSE_CLIENT -nq "
select '<27: canceled>', view, status from refreshes;
system refresh view f;"
while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes" | xargs`" != 'Running' ]
do
sleep 0.1
done
# Drop.
$CLICKHOUSE_CLIENT -nq "
drop table f;
select '<28: drop during refresh>', view, status from refreshes;"
# Try OFFSET and RANDOMIZE FOR.
$CLICKHOUSE_CLIENT -nq "
create materialized view g refresh every 1 week offset 3 day 4 hour randomize for 4 day 1 hour (x Int64) engine Memory as select 42;
show create g;
system test view g set fake time '2050-02-03 15:30:13';"
while [ "`$CLICKHOUSE_CLIENT -nq "select next_refresh_time > '2049-01-01' from refreshes" | xargs`" != '1' ]
do
sleep 0.1
done
$CLICKHOUSE_CLIENT -nq "
with '2050-02-10 04:00:00'::DateTime as expected
select '<29: randomize>', abs(next_refresh_time::Int64 - expected::Int64) <= 3600*(24*4+1), next_refresh_time != expected from refreshes;"
# Send data 'TO' an existing table.
$CLICKHOUSE_CLIENT -nq "
drop table g;
create table dest (x Int64) engine MergeTree order by x;
truncate src;
insert into src values (1);
create materialized view h refresh every 1 second to dest as select x*10 as x from src;
show create h;"
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes" | xargs`" != 'Finished' ]
do
sleep 0.1
done
$CLICKHOUSE_CLIENT -nq "
select '<30: to existing table>', * from dest;
insert into src values (2);"
while [ "`$CLICKHOUSE_CLIENT -nq "select count() from dest" | xargs`" != '2' ]
do
sleep 0.1
done
$CLICKHOUSE_CLIENT -nq "
select '<31: to existing table>', * from dest;
drop table dest;
drop table src;
drop table h;
drop table refreshes;"