Merge pull request #56946 from ClickHouse/mv

Refreshable materialized views (takeover)
This commit is contained in:
Alexey Milovidov 2023-12-28 09:29:44 +01:00 committed by GitHub
commit a2faa65b08
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
68 changed files with 2738 additions and 84 deletions

View File

@ -0,0 +1,43 @@
---
slug: /en/operations/system-tables/view_refreshes
---
# view_refreshes
Information about [Refreshable Materialized Views](../../sql-reference/statements/create/view.md#refreshable-materialized-view). Contains all refreshable materialized views, regardless of whether there's a refresh in progress or not.
Columns:
- `database` ([String](../../sql-reference/data-types/string.md)) — The name of the database the table is in.
- `view` ([String](../../sql-reference/data-types/string.md)) — Table name.
- `status` ([String](../../sql-reference/data-types/string.md)) — Current state of the refresh.
- `last_refresh_result` ([String](../../sql-reference/data-types/string.md)) — Outcome of the latest refresh attempt.
- `last_refresh_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — Time of the last refresh attempt. `NULL` if no refresh attempts happened since server startup or table creation.
- `last_success_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — Time of the last successful refresh. `NULL` if no successful refreshes happened since server startup or table creation.
- `duration_ms` ([UInt64](../../sql-reference/data-types/int-uint.md)) — How long the last refresh attempt took.
- `next_refresh_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — Time at which the next refresh is scheduled to start.
- `remaining_dependencies` ([Array(String)](../../sql-reference/data-types/array.md)) — If the view has [refresh dependencies](../../sql-reference/statements/create/view.md#refresh-dependencies), this array contains the subset of those dependencies that are not satisfied for the current refresh yet. If `status = 'WaitingForDependencies'`, a refresh is ready to start as soon as these dependencies are fulfilled.
- `exception` ([String](../../sql-reference/data-types/string.md)) — if `last_refresh_result = 'Exception'`, i.e. the last refresh attempt failed, this column contains the corresponding error message and stack trace.
- `refresh_count` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of successful refreshes since last server restart or table creation.
- `progress` ([Float64](../../sql-reference/data-types/float.md)) — Progress of the current refresh, between 0 and 1.
- `read_rows` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of rows read by the current refresh so far.
- `total_rows` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Estimated total number of rows that need to be read by the current refresh.
(There are additional columns related to current refresh progress, but they are currently unreliable.)
**Example**
```sql
SELECT
database,
view,
status,
last_refresh_result,
last_refresh_time,
next_refresh_time
FROM system.view_refreshes
┌─database─┬─view───────────────────────┬─status────┬─last_refresh_result─┬───last_refresh_time─┬───next_refresh_time─┐
│ default │ hello_documentation_reader │ Scheduled │ Finished │ 2023-12-01 01:24:00 │ 2023-12-01 01:25:00 │
└──────────┴────────────────────────────┴───────────┴─────────────────────┴─────────────────────┴─────────────────────┘
```

View File

@ -6,28 +6,28 @@ sidebar_label: VIEW
# ALTER TABLE … MODIFY QUERY Statement
You can modify `SELECT` query that was specified when a [materialized view](../create/view.md#materialized) was created with the `ALTER TABLE … MODIFY QUERY` statement without interrupting ingestion process.
You can modify `SELECT` query that was specified when a [materialized view](../create/view.md#materialized) was created with the `ALTER TABLE … MODIFY QUERY` statement without interrupting ingestion process.
The `allow_experimental_alter_materialized_view_structure` setting must be enabled.
The `allow_experimental_alter_materialized_view_structure` setting must be enabled.
This command is created to change materialized view created with `TO [db.]name` clause. It does not change the structure of the underling storage table and it does not change the columns' definition of the materialized view, because of this the application of this command is very limited for materialized views are created without `TO [db.]name` clause.
**Example with TO table**
```sql
CREATE TABLE events (ts DateTime, event_type String)
CREATE TABLE events (ts DateTime, event_type String)
ENGINE = MergeTree ORDER BY (event_type, ts);
CREATE TABLE events_by_day (ts DateTime, event_type String, events_cnt UInt64)
CREATE TABLE events_by_day (ts DateTime, event_type String, events_cnt UInt64)
ENGINE = SummingMergeTree ORDER BY (event_type, ts);
CREATE MATERIALIZED VIEW mv TO events_by_day AS
CREATE MATERIALIZED VIEW mv TO events_by_day AS
SELECT toStartOfDay(ts) ts, event_type, count() events_cnt
FROM events
GROUP BY ts, event_type;
GROUP BY ts, event_type;
INSERT INTO events
SELECT Date '2020-01-01' + interval number * 900 second,
INSERT INTO events
SELECT Date '2020-01-01' + interval number * 900 second,
['imp', 'click'][number%2+1]
FROM numbers(100);
@ -43,23 +43,23 @@ ORDER BY ts, event_type;
│ 2020-01-02 00:00:00 │ imp │ 2 │
└─────────────────────┴────────────┴─────────────────┘
-- Let's add the new measurment `cost`
-- Let's add the new measurment `cost`
-- and the new dimension `browser`.
ALTER TABLE events
ALTER TABLE events
ADD COLUMN browser String,
ADD COLUMN cost Float64;
-- Column do not have to match in a materialized view and TO
-- (destination table), so the next alter does not break insertion.
ALTER TABLE events_by_day
ALTER TABLE events_by_day
ADD COLUMN cost Float64,
ADD COLUMN browser String after event_type,
MODIFY ORDER BY (event_type, ts, browser);
INSERT INTO events
SELECT Date '2020-01-02' + interval number * 900 second,
INSERT INTO events
SELECT Date '2020-01-02' + interval number * 900 second,
['imp', 'click'][number%2+1],
['firefox', 'safary', 'chrome'][number%3+1],
10/(number+1)%33
@ -82,16 +82,16 @@ ORDER BY ts, event_type;
└─────────────────────┴────────────┴─────────┴────────────┴──────┘
SET allow_experimental_alter_materialized_view_structure=1;
ALTER TABLE mv MODIFY QUERY
ALTER TABLE mv MODIFY QUERY
SELECT toStartOfDay(ts) ts, event_type, browser,
count() events_cnt,
sum(cost) cost
FROM events
GROUP BY ts, event_type, browser;
INSERT INTO events
SELECT Date '2020-01-03' + interval number * 900 second,
INSERT INTO events
SELECT Date '2020-01-03' + interval number * 900 second,
['imp', 'click'][number%2+1],
['firefox', 'safary', 'chrome'][number%3+1],
10/(number+1)%33
@ -138,7 +138,7 @@ PRIMARY KEY (event_type, ts)
ORDER BY (event_type, ts, browser)
SETTINGS index_granularity = 8192
-- !!! The columns' definition is unchanged but it does not matter, we are not quering
-- !!! The columns' definition is unchanged but it does not matter, we are not quering
-- MATERIALIZED VIEW, we are quering TO (storage) table.
-- SELECT section is updated.
@ -169,7 +169,7 @@ The application is very limited because you can only change the `SELECT` section
```sql
CREATE TABLE src_table (`a` UInt32) ENGINE = MergeTree ORDER BY a;
CREATE MATERIALIZED VIEW mv (`a` UInt32) ENGINE = MergeTree ORDER BY a AS SELECT a FROM src_table;
CREATE MATERIALIZED VIEW mv (`a` UInt32) ENGINE = MergeTree ORDER BY a AS SELECT a FROM src_table;
INSERT INTO src_table (a) VALUES (1), (2);
SELECT * FROM mv;
```
@ -199,3 +199,7 @@ SELECT * FROM mv;
## ALTER LIVE VIEW Statement
`ALTER LIVE VIEW ... REFRESH` statement refreshes a [Live view](../create/view.md#live-view). See [Force Live View Refresh](../create/view.md#live-view-alter-refresh).
## ALTER TABLE … MODIFY REFRESH Statement
`ALTER TABLE ... MODIFY REFRESH` statement changes refresh parameters of a [Refreshable Materialized View](../create/view.md#refreshable-materialized-view). See [Changing Refresh Parameters](../create/view.md#changing-refresh-parameters).

View File

@ -37,6 +37,7 @@ SELECT a, b, c FROM (SELECT ...)
```
## Parameterized View
Parametrized views are similar to normal views, but can be created with parameters which are not resolved immediately. These views can be used with table functions, which specify the name of the view as function name and the parameter values as its arguments.
``` sql
@ -66,7 +67,7 @@ When creating a materialized view with `TO [db].[table]`, you can't also use `PO
A materialized view is implemented as follows: when inserting data to the table specified in `SELECT`, part of the inserted data is converted by this `SELECT` query, and the result is inserted in the view.
:::note
:::note
Materialized views in ClickHouse use **column names** instead of column order during insertion into destination table. If some column names are not present in the `SELECT` query result, ClickHouse uses a default value, even if the column is not [Nullable](../../data-types/nullable.md). A safe practice would be to add aliases for every column when using Materialized views.
Materialized views in ClickHouse are implemented more like insert triggers. If theres some aggregation in the view query, its applied only to the batch of freshly inserted data. Any changes to existing data of source table (like update, delete, drop partition, etc.) does not change the materialized view.
@ -96,9 +97,116 @@ This feature is deprecated and will be removed in the future.
For your convenience, the old documentation is located [here](https://pastila.nl/?00f32652/fdf07272a7b54bda7e13b919264e449f.md)
## Refreshable Materialized View {#refreshable-materialized-view}
```sql
CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db.]table_name
REFRESH EVERY|AFTER interval [OFFSET interval]
RANDOMIZE FOR interval
DEPENDS ON [db.]name [, [db.]name [, ...]]
[TO[db.]name] [(columns)] [ENGINE = engine] [EMPTY]
AS SELECT ...
```
where `interval` is a sequence of simple intervals:
```sql
number SECOND|MINUTE|HOUR|DAY|WEEK|MONTH|YEAR
```
Periodically runs the corresponding query and stores its result in a table, atomically replacing the table's previous contents.
Differences from regular non-refreshable materialized views:
* No insert trigger. I.e. when new data is inserted into the table specified in SELECT, it's *not* automatically pushed to the refreshable materialized view. The periodic refresh runs the entire query and replaces the entire table.
* No restrictions on the SELECT query. Table functions (e.g. `url()`), views, UNION, JOIN, are all allowed.
:::note
Refreshable materialized views are a work in progress. Setting `allow_experimental_refreshable_materialized_view = 1` is required for creating one. Current limitations:
* not compatible with Replicated database or table engines,
* require [Atomic database engine](../../../engines/database-engines/atomic.md),
* no retries for failed refresh - we just skip to the next scheduled refresh time,
* no limit on number of concurrent refreshes.
:::
### Refresh Schedule
Example refresh schedules:
```sql
REFRESH EVERY 1 DAY -- every day, at midnight (UTC)
REFRESH EVERY 1 MONTH -- on 1st day of every month, at midnight
REFRESH EVERY 1 MONTH OFFSET 5 DAY 2 HOUR -- on 6th day of every month, at 2:00 am
REFRESH EVERY 2 WEEK OFFSET 5 DAY 15 HOUR 10 MINUTE -- every other Saturday, at 3:10 pm
REFRESH EVERY 30 MINUTE -- at 00:00, 00:30, 01:00, 01:30, etc
REFRESH AFTER 30 MINUTE -- 30 minutes after the previous refresh completes, no alignment with time of day
-- REFRESH AFTER 1 HOUR OFFSET 1 MINUTE -- syntax errror, OFFSET is not allowed with AFTER
```
`RANDOMIZE FOR` randomly adjusts the time of each refresh, e.g.:
```sql
REFRESH EVERY 1 DAY OFFSET 2 HOUR RANDOMIZE FOR 1 HOUR -- every day at random time between 01:30 and 02:30
```
At most one refresh may be running at a time, for a given view. E.g. if a view with `REFRESH EVERY 1 MINUTE` takes 2 minutes to refresh, it'll just be refreshing every 2 minutes. If it then becomes faster and starts refreshing in 10 seconds, it'll go back to refreshing every minute. (In particular, it won't refresh every 10 seconds to catch up with a backlog of missed refreshes - there's no such backlog.)
Additionally, a refresh is started immediately after the materialized view is created, unless `EMPTY` is specified in the `CREATE` query. If `EMPTY` is specified, the first refresh happens according to schedule.
### Dependencies {#refresh-dependencies}
`DEPENDS ON` synchronizes refreshes of different tables. By way of example, suppose there's a chain of two refreshable materialized views:
```sql
CREATE MATERIALIZED VIEW source REFRESH EVERY 1 DAY AS SELECT * FROM url(...)
CREATE MATERIALIZED VIEW destination REFRESH EVERY 1 DAY AS SELECT ... FROM source
```
Without `DEPENDS ON`, both views will start a refresh at midnight, and `destination` typically will see yesterday's data in `source`. If we add dependency:
```
CREATE MATERIALIZED VIEW destination REFRESH EVERY 1 DAY DEPENDS ON source AS SELECT ... FROM source
```
then `destination`'s refresh will start only after `source`'s refresh finished for that day, so `destination` will be based on fresh data.
Alternatively, the same result can be achieved with:
```
CREATE MATERIALIZED VIEW destination REFRESH AFTER 1 HOUR DEPENDS ON source AS SELECT ... FROM source
```
where `1 HOUR` can be any duration less than `source`'s refresh period. The dependent table won't be refreshed more frequently than any of its dependencies. This is a valid way to set up a chain of refreshable views without specifying the real refresh period more than once.
A few more examples:
* `REFRESH EVERY 1 DAY OFFSET 10 MINUTE` (`destination`) depends on `REFRESH EVERY 1 DAY` (`source`)<br/>
If `source` refresh takes more than 10 minutes, `destination` will wait for it.
* `REFRESH EVERY 1 DAY OFFSET 1 HOUR` depends on `REFRESH EVERY 1 DAY OFFSET 23 HOUR`<br/>
Similar to the above, even though the corresponding refreshes happen on different calendar days.
`destination`'s refresh on day X+1 will wait for `source`'s refresh on day X (if it takes more than 2 hours).
* `REFRESH EVERY 2 HOUR` depends on `REFRESH EVERY 1 HOUR`<br/>
The 2 HOUR refresh happens after the 1 HOUR refresh for every other hour, e.g. after the midnight
refresh, then after the 2am refresh, etc.
* `REFRESH EVERY 1 MINUTE` depends on `REFRESH EVERY 2 HOUR`<br/>
`REFRESH AFTER 1 MINUTE` depends on `REFRESH EVERY 2 HOUR`<br/>
`REFRESH AFTER 1 MINUTE` depends on `REFRESH AFTER 2 HOUR`<br/>
`destination` is refreshed once after every `source` refresh, i.e. every 2 hours. The `1 MINUTE` is effectively ignored.
* `REFRESH AFTER 1 HOUR` depends on `REFRESH AFTER 1 HOUR`<br/>
Currently this is not recommended.
:::note
`DEPENDS ON` only works between refreshable materialized views. Listing a regular table in the `DEPENDS ON` list will prevent the view from ever refreshing (dependencies can be removed with `ALTER`, see below).
:::
### Changing Refresh Parameters {#changing-refresh-parameters}
To change refresh parameters:
```
ALTER TABLE [db.]name MODIFY REFRESH EVERY|AFTER ... [RANDOMIZE FOR ...] [DEPENDS ON ...]
```
:::note
This replaces refresh schedule *and* dependencies. If the table had a `DEPENDS ON`, doing a `MODIFY REFRESH` without `DEPENDS ON` will remove the dependencies.
:::
### Other operations
The status of all refreshable materialized views is available in table [`system.view_refreshes`](../../../operations/system-tables/view_refreshes.md). In particular, it contains refresh progress (if running), last and next refresh time, exception message if a refresh failed.
To manually stop, start, trigger, or cancel refreshes use [`SYSTEM STOP|START|REFRESH|CANCEL VIEW`](../system.md#refreshable-materialized-views).
## Window View [Experimental]
:::info
:::info
This is an experimental feature that may change in backwards-incompatible ways in the future releases. Enable usage of window views and `WATCH` query using [allow_experimental_window_view](../../../operations/settings/settings.md#allow-experimental-window-view) setting. Input the command `set allow_experimental_window_view = 1`.
:::

View File

@ -449,7 +449,7 @@ SYSTEM SYNC FILE CACHE [ON CLUSTER cluster_name]
```
### SYSTEM STOP LISTEN
## SYSTEM STOP LISTEN
Closes the socket and gracefully terminates the existing connections to the server on the specified port with the specified protocol.
@ -464,7 +464,7 @@ SYSTEM STOP LISTEN [ON CLUSTER cluster_name] [QUERIES ALL | QUERIES DEFAULT | QU
- If `QUERIES DEFAULT [EXCEPT .. [,..]]` modifier is specified, all default protocols are stopped, unless specified with `EXCEPT` clause.
- If `QUERIES CUSTOM [EXCEPT .. [,..]]` modifier is specified, all custom protocols are stopped, unless specified with `EXCEPT` clause.
### SYSTEM START LISTEN
## SYSTEM START LISTEN
Allows new connections to be established on the specified protocols.
@ -473,3 +473,47 @@ However, if the server on the specified port and protocol was not stopped using
```sql
SYSTEM START LISTEN [ON CLUSTER cluster_name] [QUERIES ALL | QUERIES DEFAULT | QUERIES CUSTOM | TCP | TCP WITH PROXY | TCP SECURE | HTTP | HTTPS | MYSQL | GRPC | POSTGRESQL | PROMETHEUS | CUSTOM 'protocol']
```
## Managing Refreshable Materialized Views {#refreshable-materialized-views}
Commands to control background tasks performed by [Refreshable Materialized Views](../../sql-reference/statements/create/view.md#refreshable-materialized-view)
Keep an eye on [`system.view_refreshes`](../../operations/system-tables/view_refreshes.md) while using them.
### SYSTEM REFRESH VIEW
Trigger an immediate out-of-schedule refresh of a given view.
```sql
SYSTEM REFRESH VIEW [db.]name
```
### SYSTEM STOP VIEW, SYSTEM STOP VIEWS
Disable periodic refreshing of the given view or all refreshable views. If a refresh is in progress, cancel it too.
```sql
SYSTEM STOP VIEW [db.]name
```
```sql
SYSTEM STOP VIEWS
```
### SYSTEM START VIEW, SYSTEM START VIEWS
Enable periodic refreshing for the given view or all refreshable views. No immediate refresh is triggered.
```sql
SYSTEM START VIEW [db.]name
```
```sql
SYSTEM START VIEWS
```
### SYSTEM CANCEL VIEW
If there's a refresh in progress for the given view, interrupt and cancel it. Otherwise do nothing.
```sql
SYSTEM CANCEL VIEW [db.]name
```

View File

@ -82,7 +82,8 @@ enum class AccessType
\
M(ALTER_VIEW_REFRESH, "ALTER LIVE VIEW REFRESH, REFRESH VIEW", VIEW, ALTER_VIEW) \
M(ALTER_VIEW_MODIFY_QUERY, "ALTER TABLE MODIFY QUERY", VIEW, ALTER_VIEW) \
M(ALTER_VIEW, "", GROUP, ALTER) /* allows to execute ALTER VIEW REFRESH, ALTER VIEW MODIFY QUERY;
M(ALTER_VIEW_MODIFY_REFRESH, "ALTER TABLE MODIFY QUERY", VIEW, ALTER_VIEW) \
M(ALTER_VIEW, "", GROUP, ALTER) /* allows to execute ALTER VIEW REFRESH, ALTER VIEW MODIFY QUERY, ALTER VIEW MODIFY REFRESH;
implicitly enabled by the grant ALTER_TABLE */\
\
M(ALTER, "", GROUP, ALL) /* allows to execute ALTER {TABLE|LIVE VIEW} */\
@ -177,6 +178,7 @@ enum class AccessType
M(SYSTEM_MOVES, "SYSTEM STOP MOVES, SYSTEM START MOVES, STOP MOVES, START MOVES", TABLE, SYSTEM) \
M(SYSTEM_PULLING_REPLICATION_LOG, "SYSTEM STOP PULLING REPLICATION LOG, SYSTEM START PULLING REPLICATION LOG", TABLE, SYSTEM) \
M(SYSTEM_CLEANUP, "SYSTEM STOP CLEANUP, SYSTEM START CLEANUP", TABLE, SYSTEM) \
M(SYSTEM_VIEWS, "SYSTEM REFRESH VIEW, SYSTEM START VIEWS, SYSTEM STOP VIEWS, SYSTEM START VIEW, SYSTEM STOP VIEW, SYSTEM CANCEL VIEW, REFRESH VIEW, START VIEWS, STOP VIEWS, START VIEW, STOP VIEW, CANCEL VIEW", VIEW, SYSTEM) \
M(SYSTEM_DISTRIBUTED_SENDS, "SYSTEM STOP DISTRIBUTED SENDS, SYSTEM START DISTRIBUTED SENDS, STOP DISTRIBUTED SENDS, START DISTRIBUTED SENDS", TABLE, SYSTEM_SENDS) \
M(SYSTEM_REPLICATED_SENDS, "SYSTEM STOP REPLICATED SENDS, SYSTEM START REPLICATED SENDS, STOP REPLICATED SENDS, START REPLICATED SENDS", TABLE, SYSTEM_SENDS) \
M(SYSTEM_SENDS, "SYSTEM STOP SENDS, SYSTEM START SENDS, STOP SENDS, START SENDS", GROUP, SYSTEM) \

View File

@ -51,7 +51,7 @@ TEST(AccessRights, Union)
"CREATE DICTIONARY, DROP DATABASE, DROP TABLE, DROP VIEW, DROP DICTIONARY, UNDROP TABLE, "
"TRUNCATE, OPTIMIZE, BACKUP, CREATE ROW POLICY, ALTER ROW POLICY, DROP ROW POLICY, "
"SHOW ROW POLICIES, SYSTEM MERGES, SYSTEM TTL MERGES, SYSTEM FETCHES, "
"SYSTEM MOVES, SYSTEM PULLING REPLICATION LOG, SYSTEM CLEANUP, SYSTEM SENDS, SYSTEM REPLICATION QUEUES, "
"SYSTEM MOVES, SYSTEM PULLING REPLICATION LOG, SYSTEM CLEANUP, SYSTEM VIEWS, SYSTEM SENDS, SYSTEM REPLICATION QUEUES, "
"SYSTEM DROP REPLICA, SYSTEM SYNC REPLICA, SYSTEM RESTART REPLICA, "
"SYSTEM RESTORE REPLICA, SYSTEM WAIT LOADING PARTS, SYSTEM SYNC DATABASE REPLICA, SYSTEM FLUSH DISTRIBUTED, dictGet ON db1.*, GRANT NAMED COLLECTION ADMIN ON db1");
}

View File

@ -226,6 +226,7 @@ add_object_library(clickhouse_storages_statistics Storages/Statistics)
add_object_library(clickhouse_storages_liveview Storages/LiveView)
add_object_library(clickhouse_storages_windowview Storages/WindowView)
add_object_library(clickhouse_storages_s3queue Storages/S3Queue)
add_object_library(clickhouse_storages_materializedview Storages/MaterializedView)
add_object_library(clickhouse_client Client)
add_object_library(clickhouse_bridge BridgeHelper)
add_object_library(clickhouse_server Server)

View File

@ -0,0 +1,144 @@
#include <Common/CalendarTimeInterval.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
CalendarTimeInterval::CalendarTimeInterval(const CalendarTimeInterval::Intervals & intervals)
{
for (auto [kind, val] : intervals)
{
switch (kind.kind)
{
case IntervalKind::Nanosecond:
case IntervalKind::Microsecond:
case IntervalKind::Millisecond:
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Sub-second intervals are not supported here");
case IntervalKind::Second:
case IntervalKind::Minute:
case IntervalKind::Hour:
case IntervalKind::Day:
case IntervalKind::Week:
seconds += val * kind.toAvgSeconds();
break;
case IntervalKind::Month:
months += val;
break;
case IntervalKind::Quarter:
months += val * 3;
break;
case IntervalKind::Year:
months += val * 12;
break;
}
}
}
CalendarTimeInterval::Intervals CalendarTimeInterval::toIntervals() const
{
Intervals res;
auto greedy = [&](UInt64 x, std::initializer_list<std::pair<IntervalKind, UInt64>> kinds)
{
for (auto [kind, count] : kinds)
{
UInt64 k = x / count;
if (k == 0)
continue;
x -= k * count;
res.emplace_back(kind, k);
}
chassert(x == 0);
};
greedy(months, {{IntervalKind::Year, 12}, {IntervalKind::Month, 1}});
greedy(seconds, {{IntervalKind::Week, 3600*24*7}, {IntervalKind::Day, 3600*24}, {IntervalKind::Hour, 3600}, {IntervalKind::Minute, 60}, {IntervalKind::Second, 1}});
return res;
}
UInt64 CalendarTimeInterval::minSeconds() const
{
return 3600*24 * (months/12 * 365 + months%12 * 28) + seconds;
}
UInt64 CalendarTimeInterval::maxSeconds() const
{
return 3600*24 * (months/12 * 366 + months%12 * 31) + seconds;
}
void CalendarTimeInterval::assertSingleUnit() const
{
if (seconds && months)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Interval shouldn't contain both calendar units and clock units (e.g. months and days)");
}
void CalendarTimeInterval::assertPositive() const
{
if (!seconds && !months)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Interval must be positive");
}
/// Number of whole months between 1970-01-01 and `t`.
static Int64 toAbsoluteMonth(std::chrono::system_clock::time_point t)
{
std::chrono::year_month_day ymd(std::chrono::floor<std::chrono::days>(t));
return (Int64(int(ymd.year())) - 1970) * 12 + Int64(unsigned(ymd.month()) - 1);
}
static std::chrono::sys_seconds startOfAbsoluteMonth(Int64 absolute_month)
{
Int64 year = absolute_month >= 0 ? absolute_month/12 : -((-absolute_month+11)/12);
Int64 month = absolute_month - year*12;
chassert(month >= 0 && month < 12);
std::chrono::year_month_day ymd(
std::chrono::year(int(year + 1970)),
std::chrono::month(unsigned(month + 1)),
std::chrono::day(1));
return std::chrono::sys_days(ymd);
}
std::chrono::sys_seconds CalendarTimeInterval::advance(std::chrono::system_clock::time_point tp) const
{
auto t = std::chrono::sys_seconds(std::chrono::floor<std::chrono::seconds>(tp));
if (months)
{
auto m = toAbsoluteMonth(t);
auto s = t - startOfAbsoluteMonth(m);
t = startOfAbsoluteMonth(m + Int64(months)) + s;
}
return t + std::chrono::seconds(Int64(seconds));
}
std::chrono::sys_seconds CalendarTimeInterval::floor(std::chrono::system_clock::time_point tp) const
{
assertSingleUnit();
assertPositive();
if (months)
return startOfAbsoluteMonth(toAbsoluteMonth(tp) / months * months);
else
{
constexpr std::chrono::seconds epoch(-3600*24*3);
auto t = std::chrono::sys_seconds(std::chrono::floor<std::chrono::seconds>(tp));
/// We want to align with weeks, but 1970-01-01 is a Thursday, so align with 1969-12-29 instead.
return std::chrono::sys_seconds((t.time_since_epoch() - epoch) / seconds * seconds + epoch);
}
}
bool CalendarTimeInterval::operator==(const CalendarTimeInterval & rhs) const
{
return std::tie(months, seconds) == std::tie(rhs.months, rhs.seconds);
}
bool CalendarTimeInterval::operator!=(const CalendarTimeInterval & rhs) const
{
return !(*this == rhs);
}
}

View File

@ -0,0 +1,63 @@
#pragma once
#include <Common/IntervalKind.h>
#include <chrono>
namespace DB
{
/// Represents a duration of calendar time, e.g.:
/// * 2 weeks + 5 minutes + and 21 seconds (aka 605121 seconds),
/// * 1 (calendar) month - not equivalent to any number of seconds!
/// * 3 years + 2 weeks (aka 36 months + 604800 seconds).
///
/// Be careful with calendar arithmetic: it's missing many familiar properties of numbers.
/// E.g. x + y - y is not always equal to x (October 31 + 1 month - 1 month = November 1).
struct CalendarTimeInterval
{
UInt64 seconds = 0;
UInt64 months = 0;
using Intervals = std::vector<std::pair<IntervalKind, UInt64>>;
CalendarTimeInterval() = default;
/// Year, Quarter, Month are converted to months.
/// Week, Day, Hour, Minute, Second are converted to seconds.
/// Millisecond, Microsecond, Nanosecond throw exception.
explicit CalendarTimeInterval(const Intervals & intervals);
/// E.g. for {36 months, 604801 seconds} returns {3 years, 2 weeks, 1 second}.
Intervals toIntervals() const;
/// Approximate shortest and longest duration in seconds. E.g. a month is [28, 31] days.
UInt64 minSeconds() const;
UInt64 maxSeconds() const;
/// Checks that the interval has only months or only seconds, throws otherwise.
void assertSingleUnit() const;
void assertPositive() const;
/// Add this interval to the timestamp. First months, then seconds.
/// Gets weird near month boundaries: October 31 + 1 month = December 1.
std::chrono::sys_seconds advance(std::chrono::system_clock::time_point t) const;
/// Rounds the timestamp down to the nearest timestamp "aligned" with this interval.
/// The interval must satisfy assertSingleUnit() and assertPositive().
/// * For months, rounds to the start of a month whose abosolute index is divisible by `months`.
/// The month index is 0-based starting from January 1970.
/// E.g. if the interval is 1 month, rounds down to the start of the month.
/// * For seconds, rounds to a timestamp x such that (x - December 29 1969 (Monday)) is divisible
/// by this interval.
/// E.g. if the interval is 1 week, rounds down to the start of the week (Monday).
///
/// Guarantees:
/// * advance(floor(x)) > x
/// * floor(advance(floor(x))) = advance(floor(x))
std::chrono::sys_seconds floor(std::chrono::system_clock::time_point t) const;
bool operator==(const CalendarTimeInterval & rhs) const;
bool operator!=(const CalendarTimeInterval & rhs) const;
};
}

View File

@ -253,6 +253,8 @@
M(MergeTreeAllRangesAnnouncementsSent, "The current number of announcement being sent in flight from the remote server to the initiator server about the set of data parts (for MergeTree tables). Measured on the remote server side.") \
M(CreatedTimersInQueryProfiler, "Number of Created thread local timers in QueryProfiler") \
M(ActiveTimersInQueryProfiler, "Number of Active thread local timers in QueryProfiler") \
M(RefreshableViews, "Number materialized views with periodic refreshing (REFRESH)") \
M(RefreshingViews, "Number of materialized views currently executing a refresh") \
#ifdef APPLY_FOR_EXTERNAL_METRICS
#define APPLY_FOR_METRICS(M) APPLY_FOR_BUILTIN_METRICS(M) APPLY_FOR_EXTERNAL_METRICS(M)

View File

@ -71,6 +71,8 @@ struct IntervalKind
/// Returns false if the conversion did not succeed.
/// For example, `IntervalKind::tryParseString('second', result)` returns `result` equals `IntervalKind::Kind::Second`.
static bool tryParseString(const std::string & kind, IntervalKind::Kind & result);
auto operator<=>(const IntervalKind & other) const { return kind <=> other.kind; }
};
/// NOLINTNEXTLINE

View File

@ -31,7 +31,7 @@ bool BackgroundSchedulePoolTaskInfo::schedule()
return true;
}
bool BackgroundSchedulePoolTaskInfo::scheduleAfter(size_t milliseconds, bool overwrite)
bool BackgroundSchedulePoolTaskInfo::scheduleAfter(size_t milliseconds, bool overwrite, bool only_if_scheduled)
{
std::lock_guard lock(schedule_mutex);
@ -39,6 +39,8 @@ bool BackgroundSchedulePoolTaskInfo::scheduleAfter(size_t milliseconds, bool ove
return false;
if (delayed && !overwrite)
return false;
if (!delayed && only_if_scheduled)
return false;
pool.scheduleDelayedTask(shared_from_this(), milliseconds, lock);
return true;

View File

@ -106,8 +106,10 @@ public:
bool schedule();
/// Schedule for execution after specified delay.
/// If overwrite is set then the task will be re-scheduled (if it was already scheduled, i.e. delayed == true).
bool scheduleAfter(size_t milliseconds, bool overwrite = true);
/// If overwrite is set, and the task is already scheduled with a delay (delayed == true),
/// the task will be re-scheduled with the new delay.
/// If only_if_scheduled is set, don't do anything unless the task is already scheduled with a delay.
bool scheduleAfter(size_t milliseconds, bool overwrite = true, bool only_if_scheduled = false);
/// Further attempts to schedule become no-op. Will wait till the end of the current execution of the task.
void deactivate();

View File

@ -584,6 +584,8 @@ class IColumn;
M(Bool, enable_early_constant_folding, true, "Enable query optimization where we analyze function and subqueries results and rewrite query if there're constants there", 0) \
M(Bool, deduplicate_blocks_in_dependent_materialized_views, false, "Should deduplicate blocks for materialized views if the block is not a duplicate for the table. Use true to always deduplicate in dependent tables.", 0) \
M(Bool, materialized_views_ignore_errors, false, "Allows to ignore errors for MATERIALIZED VIEW, and deliver original block to the table regardless of MVs", 0) \
M(Bool, allow_experimental_refreshable_materialized_view, false, "Allow refreshable materialized views (CREATE MATERIALIZED VIEW <name> REFRESH ...).", 0) \
M(Bool, stop_refreshable_materialized_views_on_startup, false, "On server startup, prevent scheduling of refreshable materialized views, as if with SYSTEM STOP VIEWS. You can manually start them with SYSTEM START VIEWS or SYSTEM START VIEW <name> afterwards. Also applies to newly created views. Has no effect on non-refreshable materialized views.", 0) \
M(Bool, use_compact_format_in_distributed_parts_names, true, "Changes format of directories names for distributed table insert parts.", 0) \
M(Bool, validate_polygons, true, "Throw exception if polygon is invalid in function pointInPolygon (e.g. self-tangent, self-intersecting). If the setting is false, the function will accept invalid polygons but may silently return wrong result.", 0) \
M(UInt64, max_parser_depth, DBMS_DEFAULT_MAX_PARSER_DEPTH, "Maximum parser depth (recursion depth of recursive descend parser).", 0) \

View File

@ -65,6 +65,11 @@ void applyMetadataChangesToCreateQuery(const ASTPtr & query, const StorageInMemo
query->replace(ast_create_query.select, metadata.select.select_query);
}
if (metadata.refresh)
{
query->replace(ast_create_query.refresh_strategy, metadata.refresh);
}
/// MaterializedView, Dictionary are types of CREATE query without storage.
if (ast_create_query.storage)
{

View File

@ -60,7 +60,7 @@ public:
/// Removes all dependencies of "table_id", returns those dependencies.
std::vector<StorageID> removeDependencies(const StorageID & table_id, bool remove_isolated_tables = false);
/// Removes a table from the graph and removes all references to in from the graph (both from its dependencies and dependents).
/// Removes a table from the graph and removes all references to it from the graph (both from its dependencies and dependents).
bool removeTable(const StorageID & table_id);
/// Removes tables from the graph by a specified filter.

View File

@ -18,6 +18,7 @@ namespace ActionLocks
extern const StorageActionBlockType PartsMove = 7;
extern const StorageActionBlockType PullReplicationLog = 8;
extern const StorageActionBlockType Cleanup = 9;
extern const StorageActionBlockType ViewRefresh = 10;
}

View File

@ -5,6 +5,7 @@
#include <Parsers/ASTQueryWithTableAndOutput.h>
#include <Parsers/ASTRenameQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTRefreshStrategy.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSubquery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
@ -87,6 +88,12 @@ public:
visit(child);
}
void visit(ASTRefreshStrategy & refresh) const
{
ASTPtr unused;
visit(refresh, unused);
}
private:
ContextPtr context;
@ -229,6 +236,13 @@ private:
}
}
void visit(ASTRefreshStrategy & refresh, ASTPtr &) const
{
if (refresh.dependencies)
for (auto & table : refresh.dependencies->children)
tryVisit<ASTTableIdentifier>(table);
}
void visitChildren(IAST & ast) const
{
for (auto & child : ast.children)

View File

@ -95,6 +95,7 @@
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Storages/MergeTree/BackgroundJobsAssignee.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
#include <Storages/MaterializedView/RefreshSet.h>
#include <Interpreters/SynonymsExtensions.h>
#include <Interpreters/Lemmatizers.h>
#include <Interpreters/ClusterDiscovery.h>
@ -289,6 +290,7 @@ struct ContextSharedPart : boost::noncopyable
MergeList merge_list; /// The list of executable merge (for (Replicated)?MergeTree)
MovesList moves_list; /// The list of executing moves (for (Replicated)?MergeTree)
ReplicatedFetchList replicated_fetch_list;
RefreshSet refresh_set; /// The list of active refreshes (for MaterializedView)
ConfigurationPtr users_config TSA_GUARDED_BY(mutex); /// Config with the users, profiles and quotas sections.
InterserverIOHandler interserver_io_handler; /// Handler for interserver communication.
@ -825,6 +827,8 @@ MovesList & Context::getMovesList() { return shared->moves_list; }
const MovesList & Context::getMovesList() const { return shared->moves_list; }
ReplicatedFetchList & Context::getReplicatedFetchList() { return shared->replicated_fetch_list; }
const ReplicatedFetchList & Context::getReplicatedFetchList() const { return shared->replicated_fetch_list; }
RefreshSet & Context::getRefreshSet() { return shared->refresh_set; }
const RefreshSet & Context::getRefreshSet() const { return shared->refresh_set; }
String Context::resolveDatabase(const String & database_name) const
{

View File

@ -74,6 +74,7 @@ class BackgroundSchedulePool;
class MergeList;
class MovesList;
class ReplicatedFetchList;
class RefreshSet;
class Cluster;
class Compiler;
class MarkCache;
@ -922,6 +923,9 @@ public:
ReplicatedFetchList & getReplicatedFetchList();
const ReplicatedFetchList & getReplicatedFetchList() const;
RefreshSet & getRefreshSet();
const RefreshSet & getRefreshSet() const;
/// If the current session is expired at the time of the call, synchronously creates and returns a new session with the startNewSession() call.
/// If no ZooKeeper configured, throws an exception.
std::shared_ptr<zkutil::ZooKeeper> getZooKeeper() const;

View File

@ -460,6 +460,11 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccessForCommand(const AS
required_access.emplace_back(AccessType::ALTER_VIEW_MODIFY_QUERY, database, table);
break;
}
case ASTAlterCommand::MODIFY_REFRESH:
{
required_access.emplace_back(AccessType::ALTER_VIEW_MODIFY_REFRESH, database, table);
break;
}
case ASTAlterCommand::LIVE_VIEW_REFRESH:
{
required_access.emplace_back(AccessType::ALTER_VIEW_REFRESH, database, table);

View File

@ -1089,6 +1089,13 @@ void InterpreterCreateQuery::assertOrSetUUID(ASTCreateQuery & create, const Data
"{} UUID specified, but engine of database {} is not Atomic", kind, create.getDatabase());
}
if (create.refresh_strategy && database->getEngineName() != "Atomic")
throw Exception(ErrorCodes::INCORRECT_QUERY,
"Refreshable materialized view requires Atomic database engine, but database {} has engine {}", create.getDatabase(), database->getEngineName());
/// TODO: Support Replicated databases, only with Shared/ReplicatedMergeTree.
/// Figure out how to make the refreshed data appear all at once on other
/// replicas; maybe a replicated SYSTEM SYNC REPLICA query before the rename?
/// The database doesn't support UUID so we'll ignore it. The UUID could be set here because of either
/// a) the initiator of `ON CLUSTER` query generated it to ensure the same UUIDs are used on different hosts; or
/// b) `RESTORE from backup` query generated it to ensure the same UUIDs are used on different hosts.
@ -1210,6 +1217,16 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
visitor.visit(*create.select);
}
if (create.refresh_strategy)
{
if (!getContext()->getSettingsRef().allow_experimental_refreshable_materialized_view)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"Refreshable materialized views are experimental. Enable allow_experimental_refreshable_materialized_view to use.");
AddDefaultDatabaseVisitor visitor(getContext(), current_database);
visitor.visit(*create.refresh_strategy);
}
if (create.columns_list)
{
AddDefaultDatabaseVisitor visitor(getContext(), current_database);

View File

@ -54,6 +54,7 @@
#include <Storages/StorageS3.h>
#include <Storages/StorageURL.h>
#include <Storages/StorageAzureBlob.h>
#include <Storages/MaterializedView/RefreshTask.h>
#include <Storages/HDFS/StorageHDFS.h>
#include <Storages/System/StorageSystemFilesystemCache.h>
#include <Parsers/ASTSystemQuery.h>
@ -108,6 +109,7 @@ namespace ActionLocks
extern const StorageActionBlockType PartsMove;
extern const StorageActionBlockType PullReplicationLog;
extern const StorageActionBlockType Cleanup;
extern const StorageActionBlockType ViewRefresh;
}
@ -165,6 +167,8 @@ AccessType getRequiredAccessType(StorageActionBlockType action_type)
return AccessType::SYSTEM_PULLING_REPLICATION_LOG;
else if (action_type == ActionLocks::Cleanup)
return AccessType::SYSTEM_CLEANUP;
else if (action_type == ActionLocks::ViewRefresh)
return AccessType::SYSTEM_VIEWS;
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown action type: {}", std::to_string(action_type));
}
@ -605,6 +609,23 @@ BlockIO InterpreterSystemQuery::execute()
case Type::START_CLEANUP:
startStopAction(ActionLocks::Cleanup, true);
break;
case Type::START_VIEW:
case Type::START_VIEWS:
startStopAction(ActionLocks::ViewRefresh, true);
break;
case Type::STOP_VIEW:
case Type::STOP_VIEWS:
startStopAction(ActionLocks::ViewRefresh, false);
break;
case Type::REFRESH_VIEW:
getRefreshTask()->run();
break;
case Type::CANCEL_VIEW:
getRefreshTask()->cancel();
break;
case Type::TEST_VIEW:
getRefreshTask()->setFakeTime(query.fake_time_for_view);
break;
case Type::DROP_REPLICA:
dropReplica(query);
break;
@ -1092,6 +1113,17 @@ void InterpreterSystemQuery::flushDistributed(ASTSystemQuery &)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "SYSTEM RESTART DISK is not supported");
}
RefreshTaskHolder InterpreterSystemQuery::getRefreshTask()
{
auto ctx = getContext();
ctx->checkAccess(AccessType::SYSTEM_VIEWS);
auto task = ctx->getRefreshSet().getTask(table_id);
if (!task)
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "Refreshable view {} doesn't exist", table_id.getNameForLogs());
return task;
}
AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster() const
{
@ -1241,6 +1273,20 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
required_access.emplace_back(AccessType::SYSTEM_REPLICATION_QUEUES, query.getDatabase(), query.getTable());
break;
}
case Type::REFRESH_VIEW:
case Type::START_VIEW:
case Type::START_VIEWS:
case Type::STOP_VIEW:
case Type::STOP_VIEWS:
case Type::CANCEL_VIEW:
case Type::TEST_VIEW:
{
if (!query.table)
required_access.emplace_back(AccessType::SYSTEM_VIEWS);
else
required_access.emplace_back(AccessType::SYSTEM_VIEWS, query.getDatabase(), query.getTable());
break;
}
case Type::DROP_REPLICA:
case Type::DROP_DATABASE_REPLICA:
{

View File

@ -3,6 +3,7 @@
#include <Interpreters/IInterpreter.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/MaterializedView/RefreshTask_fwd.h>
#include <Interpreters/StorageID.h>
#include <Common/ActionLock.h>
#include <Disks/IVolume.h>
@ -72,6 +73,8 @@ private:
void flushDistributed(ASTSystemQuery & query);
[[noreturn]] void restartDisk(String & name);
RefreshTaskHolder getRefreshTask();
AccessRightsElements getRequiredAccessForDDLOnCluster() const;
void startStopAction(StorageActionBlockType action_type, bool start);
};

View File

@ -453,6 +453,12 @@ void ASTAlterCommand::formatImpl(const FormatSettings & settings, FormatState &
<< (settings.hilite ? hilite_none : "");
select->formatImpl(settings, state, frame);
}
else if (type == ASTAlterCommand::MODIFY_REFRESH)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << "MODIFY REFRESH " << settings.nl_or_ws
<< (settings.hilite ? hilite_none : "");
refresh->formatImpl(settings, state, frame);
}
else if (type == ASTAlterCommand::LIVE_VIEW_REFRESH)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << "REFRESH " << (settings.hilite ? hilite_none : "");

View File

@ -40,6 +40,7 @@ public:
MODIFY_SETTING,
RESET_SETTING,
MODIFY_QUERY,
MODIFY_REFRESH,
REMOVE_TTL,
REMOVE_SAMPLE_BY,
@ -166,6 +167,9 @@ public:
*/
ASTPtr values;
/// For MODIFY REFRESH
ASTPtr refresh;
bool detach = false; /// true for DETACH PARTITION
bool part = false; /// true for ATTACH PART, DROP DETACHED PART and MOVE

View File

@ -2,7 +2,6 @@
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Common/quoteString.h>
#include <Interpreters/StorageID.h>
#include <IO/Operators.h>
@ -340,6 +339,12 @@ void ASTCreateQuery::formatQueryImpl(const FormatSettings & settings, FormatStat
formatOnCluster(settings);
}
if (refresh_strategy)
{
settings.ostr << settings.nl_or_ws;
refresh_strategy->formatImpl(settings, state, frame);
}
if (to_table_id)
{
assert((is_materialized_view || is_window_view) && to_inner_uuid == UUIDHelpers::Nil);

View File

@ -5,6 +5,7 @@
#include <Parsers/ASTDictionary.h>
#include <Parsers/ASTDictionaryAttributeDeclaration.h>
#include <Parsers/ASTTableOverrides.h>
#include <Parsers/ASTRefreshStrategy.h>
#include <Interpreters/StorageID.h>
namespace DB
@ -116,6 +117,7 @@ public:
ASTExpressionList * dictionary_attributes_list = nullptr; /// attributes of
ASTDictionary * dictionary = nullptr; /// dictionary definition (layout, primary key, etc.)
ASTRefreshStrategy * refresh_strategy = nullptr; // For CREATE MATERIALIZED VIEW ... REFRESH ...
std::optional<UInt64> live_view_periodic_refresh; /// For CREATE LIVE VIEW ... WITH [PERIODIC] REFRESH ...
bool is_watermark_strictly_ascending{false}; /// STRICTLY ASCENDING WATERMARK STRATEGY FOR WINDOW VIEW

View File

@ -0,0 +1,71 @@
#include <Parsers/ASTRefreshStrategy.h>
#include <IO/Operators.h>
namespace DB
{
ASTPtr ASTRefreshStrategy::clone() const
{
auto res = std::make_shared<ASTRefreshStrategy>(*this);
res->children.clear();
if (period)
res->set(res->period, period->clone());
if (offset)
res->set(res->offset, offset->clone());
if (spread)
res->set(res->spread, spread->clone());
if (settings)
res->set(res->settings, settings->clone());
if (dependencies)
res->set(res->dependencies, dependencies->clone());
res->schedule_kind = schedule_kind;
return res;
}
void ASTRefreshStrategy::formatImpl(
const IAST::FormatSettings & f_settings, IAST::FormatState & state, IAST::FormatStateStacked frame) const
{
frame.need_parens = false;
f_settings.ostr << (f_settings.hilite ? hilite_keyword : "") << "REFRESH " << (f_settings.hilite ? hilite_none : "");
using enum RefreshScheduleKind;
switch (schedule_kind)
{
case AFTER:
f_settings.ostr << "AFTER " << (f_settings.hilite ? hilite_none : "");
period->formatImpl(f_settings, state, frame);
break;
case EVERY:
f_settings.ostr << "EVERY " << (f_settings.hilite ? hilite_none : "");
period->formatImpl(f_settings, state, frame);
if (offset)
{
f_settings.ostr << (f_settings.hilite ? hilite_keyword : "") << " OFFSET " << (f_settings.hilite ? hilite_none : "");
offset->formatImpl(f_settings, state, frame);
}
break;
default:
f_settings.ostr << (f_settings.hilite ? hilite_none : "");
break;
}
if (spread)
{
f_settings.ostr << (f_settings.hilite ? hilite_keyword : "") << " RANDOMIZE FOR " << (f_settings.hilite ? hilite_none : "");
spread->formatImpl(f_settings, state, frame);
}
if (dependencies)
{
f_settings.ostr << (f_settings.hilite ? hilite_keyword : "") << " DEPENDS ON " << (f_settings.hilite ? hilite_none : "");
dependencies->formatImpl(f_settings, state, frame);
}
if (settings)
{
f_settings.ostr << (f_settings.hilite ? hilite_keyword : "") << " SETTINGS " << (f_settings.hilite ? hilite_none : "");
settings->formatImpl(f_settings, state, frame);
}
}
}

View File

@ -0,0 +1,35 @@
#pragma once
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTTimeInterval.h>
namespace DB
{
enum class RefreshScheduleKind : UInt8
{
UNKNOWN = 0,
AFTER,
EVERY
};
/// Strategy for MATERIALIZED VIEW ... REFRESH ..
class ASTRefreshStrategy : public IAST
{
public:
ASTSetQuery * settings = nullptr;
ASTExpressionList * dependencies = nullptr;
ASTTimeInterval * period = nullptr;
ASTTimeInterval * offset = nullptr;
ASTTimeInterval * spread = nullptr;
RefreshScheduleKind schedule_kind{RefreshScheduleKind::UNKNOWN};
String getID(char) const override { return "Refresh strategy definition"; }
ASTPtr clone() const override;
void formatImpl(const FormatSettings & s, FormatState & state, FormatStateStacked frame) const override;
};
}

View File

@ -90,6 +90,13 @@ public:
STOP_CLEANUP,
START_CLEANUP,
RESET_COVERAGE,
REFRESH_VIEW,
START_VIEW,
START_VIEWS,
STOP_VIEW,
STOP_VIEWS,
CANCEL_VIEW,
TEST_VIEW,
END
};
@ -133,6 +140,10 @@ public:
ServerType server_type;
/// For SYSTEM TEST VIEW <name> (SET FAKE TIME <time> | UNSET FAKE TIME).
/// Unix time.
std::optional<Int64> fake_time_for_view;
String getID(char) const override { return "SYSTEM query"; }
ASTPtr clone() const override

View File

@ -0,0 +1,28 @@
#include <Parsers/ASTTimeInterval.h>
#include <IO/Operators.h>
#include <ranges>
namespace DB
{
ASTPtr ASTTimeInterval::clone() const
{
return std::make_shared<ASTTimeInterval>(*this);
}
void ASTTimeInterval::formatImpl(const FormatSettings & f_settings, FormatState &, FormatStateStacked frame) const
{
frame.need_parens = false;
for (bool is_first = true; auto [kind, value] : interval.toIntervals())
{
if (!std::exchange(is_first, false))
f_settings.ostr << ' ';
f_settings.ostr << value << ' ';
f_settings.ostr << (f_settings.hilite ? hilite_keyword : "") << kind.toKeyword() << (f_settings.hilite ? hilite_none : "");
}
}
}

View File

@ -0,0 +1,24 @@
#pragma once
#include <Parsers/IAST.h>
#include <Common/CalendarTimeInterval.h>
#include <map>
namespace DB
{
/// Compound time interval like 1 YEAR 3 DAY 15 MINUTE
class ASTTimeInterval : public IAST
{
public:
CalendarTimeInterval interval;
String getID(char) const override { return "TimeInterval"; }
ASTPtr clone() const override;
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
};
}

View File

@ -5,6 +5,7 @@
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/ParserPartition.h>
#include <Parsers/ParserRefreshStrategy.h>
#include <Parsers/ParserSelectWithUnionQuery.h>
#include <Parsers/ParserSetQuery.h>
#include <Parsers/ASTIdentifier.h>
@ -38,6 +39,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
ParserKeyword s_modify_setting("MODIFY SETTING");
ParserKeyword s_reset_setting("RESET SETTING");
ParserKeyword s_modify_query("MODIFY QUERY");
ParserKeyword s_modify_refresh("MODIFY REFRESH");
ParserKeyword s_add_index("ADD INDEX");
ParserKeyword s_drop_index("DROP INDEX");
@ -133,6 +135,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
/* allow_empty = */ false);
ParserNameList values_p;
ParserSelectWithUnionQuery select_p;
ParserRefreshStrategy refresh_p;
ParserTTLExpressionList parser_ttl_list;
switch (alter_object)
@ -817,6 +820,12 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
return false;
command->type = ASTAlterCommand::MODIFY_QUERY;
}
else if (s_modify_refresh.ignore(pos, expected))
{
if (!refresh_p.parse(pos, command->refresh, expected))
return false;
command->type = ASTAlterCommand::MODIFY_REFRESH;
}
else if (s_modify_comment.ignore(pos, expected))
{
if (!parser_string_literal.parse(pos, command->comment, expected))

View File

@ -20,6 +20,7 @@
#include <Parsers/ParserProjectionSelectQuery.h>
#include <Parsers/ParserSelectWithUnionQuery.h>
#include <Parsers/ParserSetQuery.h>
#include <Parsers/ParserRefreshStrategy.h>
#include <Common/typeid_cast.h>
#include <Parsers/ASTColumnDeclaration.h>
@ -1390,6 +1391,7 @@ bool ParserCreateViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
ASTPtr as_database;
ASTPtr as_table;
ASTPtr select;
ASTPtr refresh_strategy;
String cluster_str;
bool attach = false;
@ -1436,6 +1438,15 @@ bool ParserCreateViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
return false;
}
if (ParserKeyword{"REFRESH"}.ignore(pos, expected))
{
// REFRESH only with materialized views
if (!is_materialized_view)
return false;
if (!ParserRefreshStrategy{}.parse(pos, refresh_strategy, expected))
return false;
}
if (is_materialized_view && ParserKeyword{"TO INNER UUID"}.ignore(pos, expected))
{
ParserStringLiteral literal_p;
@ -1459,34 +1470,42 @@ bool ParserCreateViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
return false;
}
if (is_materialized_view && !to_table)
if (is_materialized_view)
{
/// Internal ENGINE for MATERIALIZED VIEW must be specified.
/// Actually check it in Interpreter as default_table_engine can be set
storage_p.parse(pos, storage, expected);
if (!to_table)
{
/// Internal ENGINE for MATERIALIZED VIEW must be specified.
/// Actually check it in Interpreter as default_table_engine can be set
storage_p.parse(pos, storage, expected);
if (s_populate.ignore(pos, expected))
is_populate = true;
else if (s_empty.ignore(pos, expected))
is_create_empty = true;
if (s_populate.ignore(pos, expected))
is_populate = true;
else if (s_empty.ignore(pos, expected))
is_create_empty = true;
if (ParserKeyword{"TO"}.ignore(pos, expected))
throw Exception(
ErrorCodes::SYNTAX_ERROR, "When creating a materialized view you can't declare both 'ENGINE' and 'TO [db].[table]'");
}
else
{
if (storage_p.ignore(pos, expected))
throw Exception(
ErrorCodes::SYNTAX_ERROR, "When creating a materialized view you can't declare both 'TO [db].[table]' and 'ENGINE'");
if (ParserKeyword{"TO"}.ignore(pos, expected))
throw Exception(
ErrorCodes::SYNTAX_ERROR, "When creating a materialized view you can't declare both 'ENGINE' and 'TO [db].[table]'");
}
else
{
if (storage_p.ignore(pos, expected))
throw Exception(
ErrorCodes::SYNTAX_ERROR, "When creating a materialized view you can't declare both 'TO [db].[table]' and 'ENGINE'");
if (s_populate.ignore(pos, expected))
throw Exception(
ErrorCodes::SYNTAX_ERROR, "When creating a materialized view you can't declare both 'TO [db].[table]' and 'POPULATE'");
if (s_populate.ignore(pos, expected))
throw Exception(
ErrorCodes::SYNTAX_ERROR, "When creating a materialized view you can't declare both 'TO [db].[table]' and 'POPULATE'");
if (s_empty.ignore(pos, expected))
throw Exception(
ErrorCodes::SYNTAX_ERROR, "When creating a materialized view you can't declare both 'TO [db].[table]' and 'EMPTY'");
if (s_empty.ignore(pos, expected))
{
if (!refresh_strategy)
throw Exception(
ErrorCodes::SYNTAX_ERROR, "When creating a materialized view you can't declare both 'TO [db].[table]' and 'EMPTY'");
is_create_empty = true;
}
}
}
/// AS SELECT ...
@ -1527,6 +1546,8 @@ bool ParserCreateViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
query->set(query->columns_list, columns_list);
query->set(query->storage, storage);
if (refresh_strategy)
query->set(query->refresh_strategy, refresh_strategy);
if (comment)
query->set(query->comment, comment);
@ -1535,7 +1556,6 @@ bool ParserCreateViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
query->set(query->select, select);
return true;
}
bool ParserCreateNamedCollectionQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)

View File

@ -0,0 +1,96 @@
#include <Parsers/ParserRefreshStrategy.h>
#include <Parsers/ASTRefreshStrategy.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/ParserSetQuery.h>
#include <Parsers/ParserTimeInterval.h>
#include <Parsers/CommonParsers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
bool ParserRefreshStrategy::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
auto refresh = std::make_shared<ASTRefreshStrategy>();
if (ParserKeyword{"AFTER"}.ignore(pos, expected))
{
refresh->schedule_kind = RefreshScheduleKind::AFTER;
ASTPtr period;
if (!ParserTimeInterval{}.parse(pos, period, expected))
return false;
refresh->set(refresh->period, period);
}
else if (ParserKeyword{"EVERY"}.ignore(pos, expected))
{
refresh->schedule_kind = RefreshScheduleKind::EVERY;
ASTPtr period;
if (!ParserTimeInterval{{.allow_mixing_calendar_and_clock_units = false}}.parse(pos, period, expected))
return false;
refresh->set(refresh->period, period);
if (ParserKeyword{"OFFSET"}.ignore(pos, expected))
{
ASTPtr periodic_offset;
if (!ParserTimeInterval{{.allow_zero = true}}.parse(pos, periodic_offset, expected))
return false;
if (periodic_offset->as<ASTTimeInterval>()->interval.maxSeconds()
>= period->as<ASTTimeInterval>()->interval.minSeconds())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"OFFSET must be less than the period");
refresh->set(refresh->offset, periodic_offset);
}
}
if (refresh->schedule_kind == RefreshScheduleKind::UNKNOWN)
return false;
if (ParserKeyword{"RANDOMIZE FOR"}.ignore(pos, expected))
{
ASTPtr spread;
if (!ParserTimeInterval{{.allow_zero = true}}.parse(pos, spread, expected))
return false;
refresh->set(refresh->spread, spread);
}
if (ParserKeyword{"DEPENDS ON"}.ignore(pos, expected))
{
if (refresh->schedule_kind == RefreshScheduleKind::AFTER)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"DEPENDS ON is allowed only for REFRESH EVERY, not REFRESH AFTER");
ASTPtr dependencies;
auto list_parser = ParserList{
std::make_unique<ParserCompoundIdentifier>(
/*table_name_with_optional_uuid_*/ true, /*allow_query_parameter_*/ false),
std::make_unique<ParserToken>(TokenType::Comma),
/*allow_empty*/ false};
if (!list_parser.parse(pos, dependencies, expected))
return false;
refresh->set(refresh->dependencies, dependencies);
}
// Refresh SETTINGS
if (ParserKeyword{"SETTINGS"}.ignore(pos, expected))
{
/// Settings are written like SET query, so parse them with ParserSetQuery
ASTPtr settings;
if (!ParserSetQuery{true}.parse(pos, settings, expected))
return false;
refresh->set(refresh->settings, settings);
}
node = refresh;
return true;
}
}

View File

@ -0,0 +1,16 @@
#pragma once
#include <Parsers/IParserBase.h>
namespace DB
{
/// Parser for ASTRefreshStrategy
class ParserRefreshStrategy : public IParserBase
{
protected:
const char * getName() const override { return "refresh strategy"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
}

View File

@ -5,6 +5,8 @@
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/parseDatabaseAndTableName.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <magic_enum.hpp>
@ -388,6 +390,40 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
parseDatabaseAndTableAsAST(pos, expected, res->database, res->table);
break;
case Type::REFRESH_VIEW:
case Type::START_VIEW:
case Type::STOP_VIEW:
case Type::CANCEL_VIEW:
if (!parseDatabaseAndTableAsAST(pos, expected, res->database, res->table))
return false;
break;
case Type::START_VIEWS:
case Type::STOP_VIEWS:
break;
case Type::TEST_VIEW:
{
if (!parseDatabaseAndTableAsAST(pos, expected, res->database, res->table))
return false;
if (ParserKeyword{"UNSET FAKE TIME"}.ignore(pos, expected))
break;
if (!ParserKeyword{"SET FAKE TIME"}.ignore(pos, expected))
return false;
ASTPtr ast;
if (!ParserStringLiteral{}.parse(pos, ast, expected))
return false;
String time_str = ast->as<ASTLiteral &>().value.get<const String &>();
ReadBufferFromString buf(time_str);
time_t time;
readDateTimeText(time, buf);
res->fake_time_for_view = Int64(time);
break;
}
case Type::SUSPEND:
{
if (!parseQueryWithOnCluster(res, pos, expected))

View File

@ -0,0 +1,55 @@
#include <Parsers/ParserTimeInterval.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/parseIntervalKind.h>
#include <Parsers/ASTTimeInterval.h>
#include <Parsers/ASTLiteral.h>
namespace DB
{
namespace ErrorCodes
{
extern const int SYNTAX_ERROR;
}
ParserTimeInterval::ParserTimeInterval(Options opt) : options(opt) {}
ParserTimeInterval::ParserTimeInterval() = default;
bool ParserTimeInterval::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
CalendarTimeInterval::Intervals intervals;
while (true)
{
ASTPtr value;
IntervalKind kind;
if (!ParserNumber{}.parse(pos, value, expected))
break;
if (!parseIntervalKind(pos, expected, kind))
return false;
UInt64 val;
if (!value->as<ASTLiteral &>().value.tryGet(val))
throw Exception(ErrorCodes::SYNTAX_ERROR, "Time interval must be an integer");
intervals.emplace_back(kind, val);
}
if (intervals.empty())
return false;
CalendarTimeInterval interval(intervals);
if (!options.allow_zero)
interval.assertPositive();
if (!options.allow_mixing_calendar_and_clock_units)
interval.assertSingleUnit();
auto time_interval = std::make_shared<ASTTimeInterval>();
time_interval->interval = interval;
node = time_interval;
return true;
}
}

View File

@ -0,0 +1,28 @@
#pragma once
#include <Parsers/IParserBase.h>
namespace DB
{
/// Parser for ASTTimeInterval
class ParserTimeInterval : public IParserBase
{
public:
struct Options
{
bool allow_mixing_calendar_and_clock_units = true;
bool allow_zero = false;
};
ParserTimeInterval();
explicit ParserTimeInterval(Options opt);
protected:
Options options;
const char * getName() const override { return "time interval"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
}

View File

@ -167,6 +167,7 @@ private:
friend class PushingAsyncPipelineExecutor;
friend class PullingAsyncPipelineExecutor;
friend class CompletedPipelineExecutor;
friend class RefreshTask;
friend class QueryPipelineBuilder;
};

View File

@ -402,6 +402,14 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
command.select = command_ast->select;
return command;
}
else if (command_ast->type == ASTAlterCommand::MODIFY_REFRESH)
{
AlterCommand command;
command.ast = command_ast->clone();
command.type = AlterCommand::MODIFY_REFRESH;
command.refresh = command_ast->refresh;
return command;
}
else if (command_ast->type == ASTAlterCommand::RENAME_COLUMN)
{
AlterCommand command;
@ -715,7 +723,7 @@ void AlterCommand::apply(StorageInMemoryMetadata & metadata, ContextPtr context)
}
else if (type == MODIFY_QUERY)
{
metadata.select = SelectQueryDescription::getSelectQueryFromASTForMatView(select, context);
metadata.select = SelectQueryDescription::getSelectQueryFromASTForMatView(select, metadata.refresh != nullptr, context);
Block as_select_sample;
if (context->getSettingsRef().allow_experimental_analyzer)
@ -732,6 +740,10 @@ void AlterCommand::apply(StorageInMemoryMetadata & metadata, ContextPtr context)
metadata.columns = ColumnsDescription(as_select_sample.getNamesAndTypesList());
}
else if (type == MODIFY_REFRESH)
{
metadata.refresh = refresh->clone();
}
else if (type == MODIFY_SETTING)
{
auto & settings_from_storage = metadata.settings_changes->as<ASTSetQuery &>().changes;

View File

@ -44,6 +44,7 @@ struct AlterCommand
MODIFY_SETTING,
RESET_SETTING,
MODIFY_QUERY,
MODIFY_REFRESH,
RENAME_COLUMN,
REMOVE_TTL,
MODIFY_DATABASE_SETTING,
@ -145,6 +146,9 @@ struct AlterCommand
/// For MODIFY_QUERY
ASTPtr select = nullptr;
/// For MODIFY_REFRESH
ASTPtr refresh = nullptr;
/// Target column name
String rename_to;

View File

@ -0,0 +1,58 @@
#include <Storages/MaterializedView/RefreshSchedule.h>
#include <Common/thread_local_rng.h>
namespace DB
{
RefreshSchedule::RefreshSchedule(const ASTRefreshStrategy & strategy)
{
kind = strategy.schedule_kind;
period = strategy.period->interval;
if (strategy.offset)
offset = strategy.offset->interval;
if (strategy.spread)
spread = strategy.spread->interval;
}
bool RefreshSchedule::operator!=(const RefreshSchedule & rhs) const
{
static_assert(sizeof(*this) == 7*8, "If fields were added or removed in RefreshSchedule, please update this comparator.");
return std::tie(kind, period, offset, spread) != std::tie(rhs.kind, rhs.period, rhs.offset, rhs.spread);
}
static std::chrono::sys_seconds advanceEvery(std::chrono::system_clock::time_point prev, CalendarTimeInterval period, CalendarTimeInterval offset)
{
auto period_start = period.floor(prev);
auto t = offset.advance(period_start);
if (t > prev)
return t;
t = offset.advance(period.advance(period_start));
chassert(t > prev);
return t;
}
std::chrono::sys_seconds RefreshSchedule::prescribeNext(
std::chrono::system_clock::time_point last_prescribed, std::chrono::system_clock::time_point now) const
{
if (kind == RefreshScheduleKind::AFTER)
return period.advance(now);
/// It's important to use prescribed instead of actual time here, otherwise we would do multiple
/// refreshes instead of one if the generated spread is negative and the the refresh completes
/// faster than the spread.
auto res = advanceEvery(last_prescribed, period, offset);
if (res < now)
res = advanceEvery(now, period, offset); // fell behind by a whole period, skip to current time
return res;
}
std::chrono::system_clock::time_point RefreshSchedule::addRandomSpread(std::chrono::sys_seconds prescribed_time) const
{
Int64 ms = Int64(spread.minSeconds() * 1000 / 2);
auto add = std::uniform_int_distribution(-ms, ms)(thread_local_rng);
return prescribed_time + std::chrono::milliseconds(add);
}
}

View File

@ -0,0 +1,29 @@
#pragma once
#include <Common/CalendarTimeInterval.h>
#include <Parsers/ASTRefreshStrategy.h>
#include <chrono>
namespace DB
{
class ASTRefreshStrategy;
struct RefreshSchedule
{
RefreshScheduleKind kind;
CalendarTimeInterval period;
CalendarTimeInterval offset;
CalendarTimeInterval spread;
explicit RefreshSchedule(const ASTRefreshStrategy & strategy);
bool operator!=(const RefreshSchedule & rhs) const;
/// Tells when to do the next refresh (without random spread).
std::chrono::sys_seconds prescribeNext(
std::chrono::system_clock::time_point last_prescribed, std::chrono::system_clock::time_point now) const;
std::chrono::system_clock::time_point addRandomSpread(std::chrono::sys_seconds prescribed_time) const;
};
}

View File

@ -0,0 +1,141 @@
#include <Storages/MaterializedView/RefreshSet.h>
#include <Storages/MaterializedView/RefreshTask.h>
namespace CurrentMetrics
{
extern const Metric RefreshableViews;
}
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
RefreshSet::Handle::Handle(Handle && other) noexcept
{
*this = std::move(other);
}
RefreshSet::Handle & RefreshSet::Handle::operator=(Handle && other) noexcept
{
if (this == &other)
return *this;
reset();
parent_set = std::exchange(other.parent_set, nullptr);
id = std::move(other.id);
dependencies = std::move(other.dependencies);
metric_increment = std::move(other.metric_increment);
return *this;
}
RefreshSet::Handle::~Handle()
{
reset();
}
void RefreshSet::Handle::rename(StorageID new_id)
{
std::lock_guard lock(parent_set->mutex);
parent_set->removeDependenciesLocked(id, dependencies);
auto it = parent_set->tasks.find(id);
auto task = it->second;
parent_set->tasks.erase(it);
id = new_id;
parent_set->tasks.emplace(id, task);
parent_set->addDependenciesLocked(id, dependencies);
}
void RefreshSet::Handle::changeDependencies(std::vector<StorageID> deps)
{
std::lock_guard lock(parent_set->mutex);
parent_set->removeDependenciesLocked(id, dependencies);
dependencies = std::move(deps);
parent_set->addDependenciesLocked(id, dependencies);
}
void RefreshSet::Handle::reset()
{
if (!parent_set)
return;
{
std::lock_guard lock(parent_set->mutex);
parent_set->removeDependenciesLocked(id, dependencies);
parent_set->tasks.erase(id);
}
parent_set = nullptr;
metric_increment.reset();
}
RefreshSet::RefreshSet() = default;
RefreshSet::Handle RefreshSet::emplace(StorageID id, const std::vector<StorageID> & dependencies, RefreshTaskHolder task)
{
std::lock_guard guard(mutex);
auto [it, is_inserted] = tasks.emplace(id, task);
if (!is_inserted)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Refresh set entry already exists for table {}", id.getFullTableName());
addDependenciesLocked(id, dependencies);
return Handle(this, id, dependencies);
}
void RefreshSet::addDependenciesLocked(const StorageID & id, const std::vector<StorageID> & dependencies)
{
for (const StorageID & dep : dependencies)
dependents[dep].insert(id);
}
void RefreshSet::removeDependenciesLocked(const StorageID & id, const std::vector<StorageID> & dependencies)
{
for (const StorageID & dep : dependencies)
{
auto & set = dependents[dep];
set.erase(id);
if (set.empty())
dependents.erase(dep);
}
}
RefreshTaskHolder RefreshSet::getTask(const StorageID & id) const
{
std::lock_guard lock(mutex);
if (auto task = tasks.find(id); task != tasks.end())
return task->second;
return nullptr;
}
RefreshSet::InfoContainer RefreshSet::getInfo() const
{
std::unique_lock lock(mutex);
auto tasks_copy = tasks;
lock.unlock();
InfoContainer res;
for (auto [id, task] : tasks_copy)
res.push_back(task->getInfo());
return res;
}
std::vector<RefreshTaskHolder> RefreshSet::getDependents(const StorageID & id) const
{
std::lock_guard lock(mutex);
std::vector<RefreshTaskHolder> res;
auto it = dependents.find(id);
if (it == dependents.end())
return {};
for (const StorageID & dep_id : it->second)
if (auto task = tasks.find(dep_id); task != tasks.end())
res.push_back(task->second);
return res;
}
RefreshSet::Handle::Handle(RefreshSet * parent_set_, StorageID id_, std::vector<StorageID> dependencies_)
: parent_set(parent_set_), id(std::move(id_)), dependencies(std::move(dependencies_))
, metric_increment(CurrentMetrics::Increment(CurrentMetrics::RefreshableViews)) {}
}

View File

@ -0,0 +1,109 @@
#pragma once
#include <Parsers/ASTIdentifier.h>
#include <Storages/IStorage.h>
#include <Storages/MaterializedView/RefreshTask_fwd.h>
#include <Common/CurrentMetrics.h>
namespace DB
{
using DatabaseAndTableNameSet = std::unordered_set<StorageID, StorageID::DatabaseAndTableNameHash, StorageID::DatabaseAndTableNameEqual>;
enum class RefreshState : RefreshTaskStateUnderlying
{
Disabled = 0,
Scheduled,
WaitingForDependencies,
Running,
};
enum class LastRefreshResult : RefreshTaskStateUnderlying
{
Unknown = 0,
Cancelled,
Exception,
Finished
};
struct RefreshInfo
{
StorageID view_id = StorageID::createEmpty();
RefreshState state = RefreshState::Scheduled;
LastRefreshResult last_refresh_result = LastRefreshResult::Unknown;
std::optional<UInt32> last_attempt_time;
std::optional<UInt32> last_success_time;
UInt64 last_attempt_duration_ms = 0;
UInt32 next_refresh_time = 0;
UInt64 refresh_count = 0;
String exception_message; // if last_refresh_result is Exception
std::vector<StorageID> remaining_dependencies;
ProgressValues progress;
};
/// Set of refreshable views
class RefreshSet
{
public:
/// RAII thing that unregisters a task and its dependencies in destructor.
/// Storage IDs must be unique. Not thread safe.
class Handle
{
friend class RefreshSet;
public:
Handle() = default;
Handle(Handle &&) noexcept;
Handle & operator=(Handle &&) noexcept;
~Handle();
void rename(StorageID new_id);
void changeDependencies(std::vector<StorageID> deps);
void reset();
explicit operator bool() const { return parent_set != nullptr; }
const StorageID & getID() const { return id; }
const std::vector<StorageID> & getDependencies() const { return dependencies; }
private:
RefreshSet * parent_set = nullptr;
StorageID id = StorageID::createEmpty();
std::vector<StorageID> dependencies;
std::optional<CurrentMetrics::Increment> metric_increment;
Handle(RefreshSet * parent_set_, StorageID id_, std::vector<StorageID> dependencies_);
};
using InfoContainer = std::vector<RefreshInfo>;
RefreshSet();
Handle emplace(StorageID id, const std::vector<StorageID> & dependencies, RefreshTaskHolder task);
RefreshTaskHolder getTask(const StorageID & id) const;
InfoContainer getInfo() const;
/// Get tasks that depend on the given one.
std::vector<RefreshTaskHolder> getDependents(const StorageID & id) const;
private:
using TaskMap = std::unordered_map<StorageID, RefreshTaskHolder, StorageID::DatabaseAndTableNameHash, StorageID::DatabaseAndTableNameEqual>;
using DependentsMap = std::unordered_map<StorageID, DatabaseAndTableNameSet, StorageID::DatabaseAndTableNameHash, StorageID::DatabaseAndTableNameEqual>;
/// Protects the two maps below, not locked for any nontrivial operations (e.g. operations that
/// block or lock other mutexes).
mutable std::mutex mutex;
TaskMap tasks;
DependentsMap dependents;
void addDependenciesLocked(const StorageID & id, const std::vector<StorageID> & dependencies);
void removeDependenciesLocked(const StorageID & id, const std::vector<StorageID> & dependencies);
};
}

View File

@ -0,0 +1,8 @@
#include <Storages/MaterializedView/RefreshSettings.h>
namespace DB
{
IMPLEMENT_SETTINGS_TRAITS(RefreshSettingsTraits, LIST_OF_REFRESH_SETTINGS)
}

View File

@ -0,0 +1,16 @@
#pragma once
#include <Core/BaseSettings.h>
namespace DB
{
#define LIST_OF_REFRESH_SETTINGS(M, ALIAS) \
/// TODO: Add settings
/// M(UInt64, name, 42, "...", 0)
DECLARE_SETTINGS_TRAITS(RefreshSettingsTraits, LIST_OF_REFRESH_SETTINGS)
struct RefreshSettings : public BaseSettings<RefreshSettingsTraits> {};
}

View File

@ -0,0 +1,504 @@
#include <Storages/MaterializedView/RefreshTask.h>
#include <Storages/StorageMaterializedView.h>
#include <Common/CurrentMetrics.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/InterpreterDropQuery.h>
#include <Interpreters/ProcessList.h>
#include <Parsers/ASTCreateQuery.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <QueryPipeline/ReadProgressCallback.h>
namespace CurrentMetrics
{
extern const Metric RefreshingViews;
}
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int QUERY_WAS_CANCELLED;
}
RefreshTask::RefreshTask(
const ASTRefreshStrategy & strategy)
: log(&Poco::Logger::get("RefreshTask"))
, refresh_schedule(strategy)
{}
RefreshTaskHolder RefreshTask::create(
const StorageMaterializedView & view,
ContextMutablePtr context,
const DB::ASTRefreshStrategy & strategy)
{
auto task = std::make_shared<RefreshTask>(strategy);
task->refresh_task = context->getSchedulePool().createTask("MaterializedViewRefresherTask",
[self = task->weak_from_this()]
{
if (auto t = self.lock())
t->refreshTask();
});
std::vector<StorageID> deps;
if (strategy.dependencies)
for (auto && dependency : strategy.dependencies->children)
deps.emplace_back(dependency->as<const ASTTableIdentifier &>());
task->set_handle = context->getRefreshSet().emplace(view.getStorageID(), deps, task);
return task;
}
void RefreshTask::initializeAndStart(std::shared_ptr<StorageMaterializedView> view)
{
view_to_refresh = view;
if (view->getContext()->getSettingsRef().stop_refreshable_materialized_views_on_startup)
stop_requested = true;
populateDependencies();
advanceNextRefreshTime(currentTime());
refresh_task->schedule();
}
void RefreshTask::rename(StorageID new_id)
{
std::lock_guard guard(mutex);
set_handle.rename(new_id);
}
void RefreshTask::alterRefreshParams(const DB::ASTRefreshStrategy & new_strategy)
{
std::lock_guard guard(mutex);
RefreshSchedule new_schedule(new_strategy);
std::vector<StorageID> deps;
if (new_strategy.dependencies)
for (auto && dependency : new_strategy.dependencies->children)
deps.emplace_back(dependency->as<const ASTTableIdentifier &>());
/// Reschedule next refresh.
if (new_schedule != refresh_schedule)
{
refresh_schedule = new_schedule;
next_refresh_prescribed = {};
advanceNextRefreshTime(currentTime());
refresh_task->schedule();
}
/// Update dependency graph.
set_handle.changeDependencies(deps);
/// Mark removed dependencies as satisfied.
DatabaseAndTableNameSet deps_set(deps.begin(), deps.end());
std::vector<StorageID> removed_deps;
for (const auto & id : remaining_dependencies)
if (!deps_set.contains(id))
removed_deps.push_back(id);
for (const auto & id : removed_deps)
if (arriveDependency(id) && !std::exchange(refresh_immediately, true))
refresh_task->schedule();
/// TODO: Update settings once we have them.
}
RefreshInfo RefreshTask::getInfo() const
{
std::lock_guard guard(mutex);
auto res = info;
res.view_id = set_handle.getID();
res.remaining_dependencies.assign(remaining_dependencies.begin(), remaining_dependencies.end());
if (res.last_refresh_result != LastRefreshResult::Exception)
res.exception_message.clear();
res.progress = progress.getValues();
return res;
}
void RefreshTask::start()
{
std::lock_guard guard(mutex);
if (!std::exchange(stop_requested, false))
return;
refresh_task->schedule();
}
void RefreshTask::stop()
{
std::lock_guard guard(mutex);
if (std::exchange(stop_requested, true))
return;
interruptExecution();
refresh_task->schedule();
}
void RefreshTask::run()
{
std::lock_guard guard(mutex);
if (std::exchange(refresh_immediately, true))
return;
refresh_task->schedule();
}
void RefreshTask::cancel()
{
std::lock_guard guard(mutex);
interruptExecution();
refresh_task->schedule();
}
void RefreshTask::shutdown()
{
{
std::lock_guard guard(mutex);
stop_requested = true;
interruptExecution();
}
/// Wait for the task to return and prevent it from being scheduled in future.
refresh_task->deactivate();
/// Remove from RefreshSet on DROP, without waiting for the IStorage to be destroyed.
/// This matters because a table may get dropped and immediately created again with the same name,
/// while the old table's IStorage still exists (pinned by ongoing queries).
/// (Also, RefreshSet holds a shared_ptr to us.)
std::lock_guard guard(mutex);
set_handle.reset();
}
void RefreshTask::notify(const StorageID & parent_id, std::chrono::sys_seconds parent_next_prescribed_time)
{
std::lock_guard guard(mutex);
if (!set_handle)
return; // we've shut down
/// In the general case, it's not clear what the meaning of dependencies should be.
/// E.g. what behavior would the user want/expect in the following cases?:
/// * REFRESH EVERY 3 HOUR depends on REFRESH EVERY 2 HOUR
/// * REFRESH AFTER 3 HOUR depends on REFRESH AFTER 2 HOUR
/// * REFRESH AFTER 3 HOUR depends on REFRESH EVERY 1 DAY
/// I don't know.
///
/// Cases that are important to support well include:
/// (1) REFRESH EVERY 1 DAY depends on REFRESH EVERY 1 DAY
/// Here the second refresh should start only after the first refresh completed *for the same day*.
/// Yesterday's refresh of the dependency shouldn't trigger today's refresh of the dependent,
/// even if it completed today.
/// (2) REFRESH EVERY 1 DAY OFFSET 2 HOUR depends on REFRESH EVERY 1 DAY OFFSET 1 HOUR
/// (3) REFRESH EVERY 1 DAY OFFSET 1 HOUR depends on REFRESH EVERY 1 DAY OFFSET 23 HOUR
/// Here the dependency's refresh on day X should trigger dependent's refresh on day X+1.
/// (4) REFRESH EVERY 2 HOUR depends on REFRESH EVERY 1 HOUR
/// The 2 HOUR refresh should happen after the 1 HOUR refresh for every other hour, e.g.
/// after the 2pm refresh, then after the 4pm refresh, etc.
///
/// We currently don't allow dependencies in REFRESH AFTER case, because its unclear how to define
/// it in a non-confusing way. Consider view y that depends on view x, both with
/// REFRESH AFTER 1 hour. The user's intention is probably to make y always refresh immediately
/// after x. But suppose y takes slightly longer to refresh than x. If we don't do anything
/// special, x's refresh schedule will run ahead, and the DEPENDS ON will have pretty much no
/// effect - confusing! As a dirty way to prevent this, we could just decrease refresh period by,
/// say, 50%, if the view has dependencies at all. But that still sounds more confusing than useful.
/// Or we could say that we only refresh y if x refreshes less than 10% of 1 HOUR ago, so in our
/// scenario y would be refreshing every 2 hours instead of 1 hour sometimes.
/// Only accept the dependency's refresh if its next refresh time is after ours.
/// This takes care of cases (1)-(4).
if (parent_next_prescribed_time <= next_refresh_prescribed)
return;
if (arriveDependency(parent_id) && !std::exchange(refresh_immediately, true))
refresh_task->schedule();
}
void RefreshTask::setFakeTime(std::optional<Int64> t)
{
std::unique_lock lock(mutex);
fake_clock.store(t.value_or(INT64_MIN), std::memory_order_relaxed);
/// Reschedule task with shorter delay if currently scheduled.
refresh_task->scheduleAfter(100, /*overwrite*/ true, /*only_if_scheduled*/ true);
}
void RefreshTask::refreshTask()
{
try
{
std::unique_lock lock(mutex);
/// Whoever breaks out of this loop should assign info.state first.
while (true)
{
chassert(lock.owns_lock());
interrupt_execution.store(false);
if (stop_requested)
{
/// Exit the task and wait for the user to start or resume, which will schedule the task again.
info.state = RefreshState::Disabled;
break;
}
if (!refresh_immediately)
{
auto now = currentTime();
if (now >= next_refresh_with_spread)
{
if (arriveTime())
refresh_immediately = true;
else
{
info.state = RefreshState::WaitingForDependencies;
break;
}
}
else
{
size_t delay_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
next_refresh_with_spread - now).count();
/// If we're in a test that fakes the clock, poll every 100ms.
if (fake_clock.load(std::memory_order_relaxed) != INT64_MIN)
delay_ms = 100;
refresh_task->scheduleAfter(delay_ms);
info.state = RefreshState::Scheduled;
break;
}
}
/// Perform a refresh.
refresh_immediately = false;
auto view = lockView();
if (!view)
{
/// The view was dropped. This RefreshTask should be destroyed soon too.
/// (Maybe this is unreachable.)
info.state = RefreshState::Disabled;
break;
}
info.state = RefreshState::Running;
CurrentMetrics::Increment metric_inc(CurrentMetrics::RefreshingViews);
lock.unlock();
bool refreshed = false;
std::optional<String> exception;
auto start_time = std::chrono::steady_clock::now();
try
{
executeRefreshUnlocked(view);
refreshed = true;
}
catch (...)
{
if (!interrupt_execution.load())
{
PreformattedMessage message = getCurrentExceptionMessageAndPattern(true);
auto text = message.text;
message.text = fmt::format("Refresh failed: {}", message.text);
LOG_ERROR(log, message);
exception = text;
}
}
lock.lock();
auto now = currentTime();
auto secs = std::chrono::floor<std::chrono::seconds>(now);
info.last_attempt_time = UInt32(secs.time_since_epoch().count());
info.last_attempt_duration_ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time).count();
if (exception)
{
info.last_refresh_result = LastRefreshResult::Exception;
info.exception_message = *exception;
/// TODO: Do a few retries with exponential backoff.
advanceNextRefreshTime(now);
}
else if (!refreshed)
{
info.last_refresh_result = LastRefreshResult::Cancelled;
/// Make sure we don't just start another refresh immediately.
if (!stop_requested && now >= next_refresh_with_spread)
advanceNextRefreshTime(now);
}
else
{
info.last_refresh_result = LastRefreshResult::Finished;
info.last_success_time = info.last_attempt_time;
info.refresh_count += 1;
advanceNextRefreshTime(now);
auto next_time = next_refresh_prescribed;
lock.unlock();
StorageID my_id = view->getStorageID();
auto dependents = view->getContext()->getRefreshSet().getDependents(my_id);
for (const RefreshTaskHolder & dep_task : dependents)
dep_task->notify(my_id, next_time);
lock.lock();
}
}
}
catch (...)
{
std::unique_lock lock(mutex);
stop_requested = true;
tryLogCurrentException(log,
"Unexpected exception in refresh scheduling, please investigate. The view will be stopped.");
#ifdef ABORT_ON_LOGICAL_ERROR
abortOnFailedAssertion("Unexpected exception in refresh scheduling");
#endif
}
}
void RefreshTask::executeRefreshUnlocked(std::shared_ptr<StorageMaterializedView> view)
{
LOG_DEBUG(log, "Refreshing view {}", view->getStorageID().getFullTableName());
progress.reset();
/// Create a table.
auto [refresh_context, refresh_query] = view->prepareRefresh();
StorageID stale_table = StorageID::createEmpty();
try
{
/// Run the query.
{
CurrentThread::QueryScope query_scope(refresh_context); // create a thread group for the query
BlockIO block_io = InterpreterInsertQuery(refresh_query, refresh_context).execute();
QueryPipeline & pipeline = block_io.pipeline;
pipeline.setProgressCallback([this](const Progress & prog)
{
/// TODO: Investigate why most fields are not populated. Change columns in system.view_refreshes as needed, update documentation (docs/en/operations/system-tables/view_refreshes.md).
progress.incrementPiecewiseAtomically(prog);
});
/// Add the query to system.processes and allow it to be killed with KILL QUERY.
String query_for_logging = refresh_query->formatForLogging(
refresh_context->getSettingsRef().log_queries_cut_to_length);
block_io.process_list_entry = refresh_context->getProcessList().insert(
query_for_logging, refresh_query.get(), refresh_context, Stopwatch{CLOCK_MONOTONIC}.getStart());
pipeline.setProcessListElement(block_io.process_list_entry->getQueryStatus());
refresh_context->setProcessListElement(block_io.process_list_entry->getQueryStatus());
if (!pipeline.completed())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline for view refresh must be completed");
PipelineExecutor executor(pipeline.processors, pipeline.process_list_element);
executor.setReadProgressCallback(pipeline.getReadProgressCallback());
{
std::unique_lock exec_lock(executor_mutex);
if (interrupt_execution.load())
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Refresh cancelled");
running_executor = &executor;
}
SCOPE_EXIT({
std::unique_lock exec_lock(executor_mutex);
running_executor = nullptr;
});
executor.execute(pipeline.getNumThreads(), pipeline.getConcurrencyControl());
}
/// (Just in case PipelineExecutor can somehow return without exception but with incomplete
/// results if cancelled at the wrong moment.)
if (interrupt_execution.load())
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Refresh cancelled");
/// Exchange tables.
stale_table = view->exchangeTargetTable(refresh_query->table_id, refresh_context);
}
catch (...)
{
try
{
InterpreterDropQuery::executeDropQuery(
ASTDropQuery::Kind::Drop, view->getContext(), refresh_context, refresh_query->table_id, /*sync*/ false, /*ignore_sync_setting*/ true);
}
catch (...)
{
tryLogCurrentException(log, "Failed to drop temporary table after a failed refresh");
/// Let's ignore this and keep going, at risk of accumulating many trash tables if this keeps happening.
}
throw;
}
/// Drop the old table (outside the try-catch so we don't try to drop the other table if this fails).
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, view->getContext(), refresh_context, stale_table, /*sync*/ true, /*ignore_sync_setting*/ true);
}
void RefreshTask::advanceNextRefreshTime(std::chrono::system_clock::time_point now)
{
std::chrono::sys_seconds next = refresh_schedule.prescribeNext(next_refresh_prescribed, now);
next_refresh_prescribed = next;
next_refresh_with_spread = refresh_schedule.addRandomSpread(next);
auto secs = std::chrono::floor<std::chrono::seconds>(next_refresh_with_spread);
info.next_refresh_time = UInt32(secs.time_since_epoch().count());
}
bool RefreshTask::arriveDependency(const StorageID & parent)
{
remaining_dependencies.erase(parent);
if (!remaining_dependencies.empty() || !time_arrived)
return false;
populateDependencies();
return true;
}
bool RefreshTask::arriveTime()
{
time_arrived = true;
if (!remaining_dependencies.empty() || !time_arrived)
return false;
populateDependencies();
return true;
}
void RefreshTask::populateDependencies()
{
chassert(remaining_dependencies.empty());
auto deps = set_handle.getDependencies();
remaining_dependencies.insert(deps.begin(), deps.end());
time_arrived = false;
}
void RefreshTask::interruptExecution()
{
chassert(!mutex.try_lock());
std::unique_lock lock(executor_mutex);
if (interrupt_execution.exchange(true))
return;
if (running_executor)
running_executor->cancel();
}
std::shared_ptr<StorageMaterializedView> RefreshTask::lockView()
{
return std::static_pointer_cast<StorageMaterializedView>(view_to_refresh.lock());
}
std::chrono::system_clock::time_point RefreshTask::currentTime() const
{
Int64 fake = fake_clock.load(std::memory_order::relaxed);
if (fake == INT64_MIN)
return std::chrono::system_clock::now();
else
return std::chrono::system_clock::time_point(std::chrono::seconds(fake));
}
}

View File

@ -0,0 +1,150 @@
#pragma once
#include <Storages/MaterializedView/RefreshSet.h>
#include <Storages/MaterializedView/RefreshTask_fwd.h>
#include <Storages/MaterializedView/RefreshSchedule.h>
#include <Storages/MaterializedView/RefreshSettings.h>
#include <Core/BackgroundSchedulePool.h>
#include <random>
namespace DB
{
class PipelineExecutor;
class StorageMaterializedView;
class ASTRefreshStrategy;
class RefreshTask : public std::enable_shared_from_this<RefreshTask>
{
public:
/// Never call it manually, public for shared_ptr construction only
explicit RefreshTask(const ASTRefreshStrategy & strategy);
/// The only proper way to construct task
static RefreshTaskHolder create(
const StorageMaterializedView & view,
ContextMutablePtr context,
const DB::ASTRefreshStrategy & strategy);
void initializeAndStart(std::shared_ptr<StorageMaterializedView> view);
/// Call when renaming the materialized view.
void rename(StorageID new_id);
/// Call when changing refresh params (ALTER MODIFY REFRESH).
void alterRefreshParams(const DB::ASTRefreshStrategy & new_strategy);
RefreshInfo getInfo() const;
/// Enable task scheduling
void start();
/// Disable task scheduling
void stop();
/// Schedule task immediately
void run();
/// Cancel task execution
void cancel();
/// Permanently disable task scheduling and remove this table from RefreshSet.
void shutdown();
/// Notify dependent task
void notify(const StorageID & parent_id, std::chrono::sys_seconds parent_next_prescribed_time);
/// For tests
void setFakeTime(std::optional<Int64> t);
private:
Poco::Logger * log = nullptr;
std::weak_ptr<IStorage> view_to_refresh;
/// Protects interrupt_execution and running_executor.
/// Can be locked while holding `mutex`.
std::mutex executor_mutex;
/// If there's a refresh in progress, it can be aborted by setting this flag and cancel()ling
/// this executor. Refresh task will then reconsider what to do, re-checking `stop_requested`,
/// `cancel_requested`, etc.
std::atomic_bool interrupt_execution {false};
PipelineExecutor * running_executor = nullptr;
/// Protects all fields below.
/// Never locked for blocking operations (e.g. creating or dropping the internal table).
/// Can't be locked while holding `executor_mutex`.
mutable std::mutex mutex;
RefreshSchedule refresh_schedule;
RefreshSettings refresh_settings; // TODO: populate, use, update on alter
RefreshSet::Handle set_handle;
/// StorageIDs of our dependencies that we're waiting for.
DatabaseAndTableNameSet remaining_dependencies;
bool time_arrived = false;
/// Refreshes are stopped (e.g. by SYSTEM STOP VIEW).
bool stop_requested = false;
/// If true, we should start a refresh right away. All refreshes go through this flag.
bool refresh_immediately = false;
/// When to refresh next. Updated when a refresh is finished or cancelled.
/// We maintain the distinction between:
/// * The "prescribed" time of the refresh, dictated by the refresh schedule.
/// E.g. for REFERSH EVERY 1 DAY, the prescribed time is always at the exact start of a day.
/// * Actual wall clock timestamps, e.g. when the refresh is scheduled to happen
/// (including random spread) or when a refresh completed.
/// The prescribed time is required for:
/// * Doing REFRESH EVERY correctly if the random spread came up negative, and a refresh completed
/// before the prescribed time. E.g. suppose a refresh was prescribed at 05:00, which was randomly
/// adjusted to 4:50, and the refresh completed at 4:55; we shouldn't schedule another refresh
/// at 5:00, so we should remember that the 4:50-4:55 refresh actually had prescribed time 5:00.
/// * Similarly, for dependencies between REFRESH EVERY tables, using actual time would be unreliable.
/// E.g. for REFRESH EVERY 1 DAY, yesterday's refresh of the dependency shouldn't trigger today's
/// refresh of the dependent even if it happened today (e.g. it was slow or had random spread > 1 day).
std::chrono::sys_seconds next_refresh_prescribed;
std::chrono::system_clock::time_point next_refresh_with_spread;
/// Calls refreshTask() from background thread.
BackgroundSchedulePool::TaskHolder refresh_task;
/// Used in tests. If not INT64_MIN, we pretend that this is the current time, instead of calling system_clock::now().
std::atomic<Int64> fake_clock {INT64_MIN};
/// Just for observability.
RefreshInfo info;
Progress progress;
/// The main loop of the refresh task. It examines the state, sees what needs to be
/// done and does it. If there's nothing to do at the moment, returns; it's then scheduled again,
/// when needed, by public methods or by timer.
///
/// Public methods just provide inputs for the refreshTask()'s decisions
/// (e.g. stop_requested, cancel_requested), they don't do anything significant themselves.
void refreshTask();
/// Perform an actual refresh: create new table, run INSERT SELECT, exchange tables, drop old table.
/// Mutex must be unlocked. Called only from refresh_task.
void executeRefreshUnlocked(std::shared_ptr<StorageMaterializedView> view);
/// Assigns next_refresh_*
void advanceNextRefreshTime(std::chrono::system_clock::time_point now);
/// Returns true if all dependencies are fulfilled now. Refills remaining_dependencies in this case.
bool arriveDependency(const StorageID & parent);
bool arriveTime();
void populateDependencies();
void interruptExecution();
std::shared_ptr<StorageMaterializedView> lockView();
std::chrono::system_clock::time_point currentTime() const;
};
}

View File

@ -0,0 +1,15 @@
#pragma once
#include <base/types.h>
#include <memory>
namespace DB
{
class RefreshTask;
using RefreshTaskStateUnderlying = UInt8;
using RefreshTaskHolder = std::shared_ptr<RefreshTask>;
using RefreshTaskObserver = std::weak_ptr<RefreshTask>;
}

View File

@ -3044,6 +3044,9 @@ void MergeTreeData::checkAlterIsPossible(const AlterCommands & commands, Context
if (command.type == AlterCommand::MODIFY_QUERY)
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"ALTER MODIFY QUERY is not supported by MergeTree engines family");
if (command.type == AlterCommand::MODIFY_REFRESH)
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"ALTER MODIFY REFRESH is not supported by MergeTree engines family");
if (command.type == AlterCommand::MODIFY_ORDER_BY && !is_custom_partitioned)
{

View File

@ -113,8 +113,17 @@ static bool isSingleSelect(const ASTPtr & select, ASTPtr & res)
return isSingleSelect(new_inner_query, res);
}
SelectQueryDescription SelectQueryDescription::getSelectQueryFromASTForMatView(const ASTPtr & select, ContextPtr context)
SelectQueryDescription SelectQueryDescription::getSelectQueryFromASTForMatView(const ASTPtr & select, bool refreshable, ContextPtr context)
{
SelectQueryDescription result;
result.select_query = select->as<ASTSelectWithUnionQuery &>().clone();
/// Skip all the checks, none of them apply to refreshable views.
/// Don't assign select_table_id. This way no materialized view dependency gets registered,
/// so data doesn't get pushed to the refreshable view on source table inserts.
if (refreshable)
return result;
ASTPtr new_inner_query;
if (!isSingleSelect(select, new_inner_query))
@ -123,9 +132,7 @@ SelectQueryDescription SelectQueryDescription::getSelectQueryFromASTForMatView(c
auto & select_query = new_inner_query->as<ASTSelectQuery &>();
checkAllowedQueries(select_query);
SelectQueryDescription result;
result.select_table_id = extractDependentTableFromSelectQuery(select_query, context);
result.select_query = select->as<ASTSelectWithUnionQuery &>().clone();
result.inner_query = new_inner_query->clone();
return result;

View File

@ -9,7 +9,7 @@ namespace DB
/// Select query for different view in storages
struct SelectQueryDescription
{
/// Table id for select query
/// Table id for select query. Only for non-refreshable materialized views.
StorageID select_table_id = StorageID::createEmpty();
/// Select query itself (ASTSelectWithUnionQuery)
ASTPtr select_query;
@ -18,7 +18,7 @@ struct SelectQueryDescription
/// Parse description from select query for materialized view. Also
/// validates query.
static SelectQueryDescription getSelectQueryFromASTForMatView(const ASTPtr & select, ContextPtr context);
static SelectQueryDescription getSelectQueryFromASTForMatView(const ASTPtr & select, bool refreshable, ContextPtr context);
SelectQueryDescription() = default;
SelectQueryDescription(const SelectQueryDescription & other);

View File

@ -40,6 +40,7 @@ StorageInMemoryMetadata::StorageInMemoryMetadata(const StorageInMemoryMetadata &
, table_ttl(other.table_ttl)
, settings_changes(other.settings_changes ? other.settings_changes->clone() : nullptr)
, select(other.select)
, refresh(other.refresh ? other.refresh->clone() : nullptr)
, comment(other.comment)
, metadata_version(other.metadata_version)
{
@ -69,6 +70,7 @@ StorageInMemoryMetadata & StorageInMemoryMetadata::operator=(const StorageInMemo
else
settings_changes.reset();
select = other.select;
refresh = other.refresh ? other.refresh->clone() : nullptr;
comment = other.comment;
metadata_version = other.metadata_version;
return *this;
@ -124,6 +126,11 @@ void StorageInMemoryMetadata::setSelectQuery(const SelectQueryDescription & sele
select = select_;
}
void StorageInMemoryMetadata::setRefresh(ASTPtr refresh_)
{
refresh = refresh_;
}
void StorageInMemoryMetadata::setMetadataVersion(int32_t metadata_version_)
{
metadata_version = metadata_version_;

View File

@ -9,6 +9,7 @@
#include <Storages/KeyDescription.h>
#include <Storages/SelectQueryDescription.h>
#include <Storages/TTLDescription.h>
#include <Storages/MaterializedView/RefreshSchedule.h>
#include <Common/MultiVersion.h>
@ -47,6 +48,8 @@ struct StorageInMemoryMetadata
ASTPtr settings_changes;
/// SELECT QUERY. Supported for MaterializedView and View (have to support LiveView).
SelectQueryDescription select;
/// Materialized view REFRESH parameters.
ASTPtr refresh;
String comment;
@ -94,6 +97,9 @@ struct StorageInMemoryMetadata
/// Set SELECT query for (Materialized)View
void setSelectQuery(const SelectQueryDescription & select_);
/// Set refresh parameters for materialized view (REFRESH ... [DEPENDS ON ...] [SETTINGS ...]).
void setRefresh(ASTPtr refresh_);
/// Set version of metadata.
void setMetadataVersion(int32_t metadata_version_);
/// Get copy of current metadata with metadata_version_

View File

@ -1,5 +1,7 @@
#include <Storages/StorageMaterializedView.h>
#include <Storages/MaterializedView/RefreshTask.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTCreateQuery.h>
@ -7,6 +9,7 @@
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/InterpreterDropQuery.h>
#include <Interpreters/InterpreterRenameQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/getTableExpressions.h>
#include <Interpreters/getHeaderForProcessingStage.h>
#include <Access/Common/AccessFlags.h>
@ -35,7 +38,11 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
extern const int NOT_IMPLEMENTED;
extern const int INCORRECT_QUERY;
extern const int QUERY_IS_NOT_SUPPORTED_IN_MATERIALIZED_VIEW;
}
namespace ActionLocks
{
extern const StorageActionBlockType ViewRefresh;
}
static inline String generateInnerTableName(const StorageID & view_id)
@ -79,13 +86,12 @@ StorageMaterializedView::StorageMaterializedView(
"You must specify where to save results of a MaterializedView query: "
"either ENGINE or an existing table in a TO clause");
if (query.select->list_of_selects->children.size() != 1)
throw Exception(ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_MATERIALIZED_VIEW, "UNION is not supported for MATERIALIZED VIEW");
auto select = SelectQueryDescription::getSelectQueryFromASTForMatView(query.select->clone(), local_context);
auto select = SelectQueryDescription::getSelectQueryFromASTForMatView(query.select->clone(), query.refresh_strategy != nullptr, local_context);
storage_metadata.setSelectQuery(select);
if (!comment.empty())
storage_metadata.setComment(comment);
if (query.refresh_strategy)
storage_metadata.setRefresh(query.refresh_strategy->clone());
setInMemoryMetadata(storage_metadata);
@ -126,6 +132,15 @@ StorageMaterializedView::StorageMaterializedView(
target_table_id = DatabaseCatalog::instance().getTable({manual_create_query->getDatabase(), manual_create_query->getTable()}, getContext())->getStorageID();
}
if (query.refresh_strategy)
{
refresher = RefreshTask::create(
*this,
getContext(),
*query.refresh_strategy);
refresh_on_start = !attach_ && !query.is_create_empty;
}
}
QueryProcessingStage::Enum StorageMaterializedView::getQueryProcessingStage(
@ -236,23 +251,24 @@ void StorageMaterializedView::dropInnerTableIfAny(bool sync, ContextPtr local_co
/// See the comment in StorageMaterializedView::drop.
/// DDL queries with StorageMaterializedView are fundamentally broken.
/// Best-effort to make them work: the inner table name is almost always less than the MV name (so it's safe to lock DDLGuard)
bool may_lock_ddl_guard = getStorageID().getQualifiedName() < target_table_id.getQualifiedName();
auto inner_table_id = getTargetTableId();
bool may_lock_ddl_guard = getStorageID().getQualifiedName() < inner_table_id.getQualifiedName();
if (has_inner_table && tryGetTargetTable())
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, getContext(), local_context, target_table_id,
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, getContext(), local_context, inner_table_id,
sync, /* ignore_sync_setting */ true, may_lock_ddl_guard);
}
void StorageMaterializedView::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr local_context, TableExclusiveLockHolder &)
{
if (has_inner_table)
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Truncate, getContext(), local_context, target_table_id, true);
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Truncate, getContext(), local_context, getTargetTableId(), true);
}
void StorageMaterializedView::checkStatementCanBeForwarded() const
{
if (!has_inner_table)
throw Exception(ErrorCodes::INCORRECT_QUERY, "MATERIALIZED VIEW targets existing table {}. "
"Execute the statement directly on it.", target_table_id.getNameForLogs());
"Execute the statement directly on it.", getTargetTableId().getNameForLogs());
}
bool StorageMaterializedView::optimize(
@ -267,7 +283,61 @@ bool StorageMaterializedView::optimize(
checkStatementCanBeForwarded();
auto storage_ptr = getTargetTable();
auto metadata_snapshot = storage_ptr->getInMemoryMetadataPtr();
return getTargetTable()->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, local_context);
return storage_ptr->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, local_context);
}
std::tuple<ContextMutablePtr, std::shared_ptr<ASTInsertQuery>> StorageMaterializedView::prepareRefresh() const
{
auto refresh_context = Context::createCopy(getContext());
/// Generate a random query id.
refresh_context->setCurrentQueryId("");
CurrentThread::QueryScope query_scope(refresh_context);
auto inner_table_id = getTargetTableId();
auto new_table_name = ".tmp" + generateInnerTableName(getStorageID());
auto db = DatabaseCatalog::instance().getDatabase(inner_table_id.database_name);
auto create_table_query = db->getCreateTableQuery(inner_table_id.table_name, getContext());
auto & create_query = create_table_query->as<ASTCreateQuery &>();
create_query.setTable(new_table_name);
create_query.setDatabase(db->getDatabaseName());
create_query.create_or_replace = true;
create_query.replace_table = true;
create_query.uuid = UUIDHelpers::Nil;
InterpreterCreateQuery create_interpreter(create_table_query, refresh_context);
create_interpreter.setInternal(true);
create_interpreter.execute();
StorageID fresh_table = DatabaseCatalog::instance().getTable({create_query.getDatabase(), create_query.getTable()}, getContext())->getStorageID();
auto insert_query = std::make_shared<ASTInsertQuery>();
insert_query->select = getInMemoryMetadataPtr()->getSelectQuery().select_query;
insert_query->setTable(fresh_table.table_name);
insert_query->setDatabase(fresh_table.database_name);
insert_query->table_id = fresh_table;
return {refresh_context, insert_query};
}
StorageID StorageMaterializedView::exchangeTargetTable(StorageID fresh_table, ContextPtr refresh_context)
{
auto stale_table_id = getTargetTableId();
auto db = DatabaseCatalog::instance().getDatabase(stale_table_id.database_name);
auto target_db = DatabaseCatalog::instance().getDatabase(fresh_table.database_name);
CurrentThread::QueryScope query_scope(refresh_context);
target_db->renameTable(
refresh_context, fresh_table.table_name, *db, stale_table_id.table_name, /*exchange=*/true, /*dictionary=*/false);
std::swap(stale_table_id.database_name, fresh_table.database_name);
std::swap(stale_table_id.table_name, fresh_table.table_name);
setTargetTableId(std::move(fresh_table));
return stale_table_id;
}
void StorageMaterializedView::alter(
@ -289,6 +359,9 @@ void StorageMaterializedView::alter(
DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(local_context, table_id, new_metadata);
setInMemoryMetadata(new_metadata);
if (refresher)
refresher->alterRefreshParams(new_metadata.refresh->as<const ASTRefreshStrategy &>());
}
@ -296,9 +369,14 @@ void StorageMaterializedView::checkAlterIsPossible(const AlterCommands & command
{
for (const auto & command : commands)
{
if (!command.isCommentAlter() && command.type != AlterCommand::MODIFY_QUERY)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Alter of type '{}' is not supported by storage {}",
command.type, getName());
if (command.isCommentAlter())
continue;
if (command.type == AlterCommand::MODIFY_QUERY)
continue;
if (command.type == AlterCommand::MODIFY_REFRESH && refresher)
continue;
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Alter of type '{}' is not supported by storage {}",
command.type, getName());
}
}
@ -332,6 +410,7 @@ void StorageMaterializedView::mutate(const MutationCommands & commands, ContextP
void StorageMaterializedView::renameInMemory(const StorageID & new_table_id)
{
auto old_table_id = getStorageID();
auto inner_table_id = getTargetTableId();
auto metadata_snapshot = getInMemoryMetadataPtr();
bool from_atomic_to_atomic_database = old_table_id.hasUUID() && new_table_id.hasUUID();
@ -340,14 +419,14 @@ void StorageMaterializedView::renameInMemory(const StorageID & new_table_id)
auto new_target_table_name = generateInnerTableName(new_table_id);
auto rename = std::make_shared<ASTRenameQuery>();
assert(target_table_id.database_name == old_table_id.database_name);
assert(inner_table_id.database_name == old_table_id.database_name);
ASTRenameQuery::Element elem
{
ASTRenameQuery::Table
{
target_table_id.database_name.empty() ? nullptr : std::make_shared<ASTIdentifier>(target_table_id.database_name),
std::make_shared<ASTIdentifier>(target_table_id.table_name)
inner_table_id.database_name.empty() ? nullptr : std::make_shared<ASTIdentifier>(inner_table_id.database_name),
std::make_shared<ASTIdentifier>(inner_table_id.table_name)
},
ASTRenameQuery::Table
{
@ -358,19 +437,21 @@ void StorageMaterializedView::renameInMemory(const StorageID & new_table_id)
rename->elements.emplace_back(std::move(elem));
InterpreterRenameQuery(rename, getContext()).execute();
target_table_id.database_name = new_table_id.database_name;
target_table_id.table_name = new_target_table_name;
updateTargetTableId(new_table_id.database_name, new_target_table_name);
}
IStorage::renameInMemory(new_table_id);
if (from_atomic_to_atomic_database && has_inner_table)
{
assert(target_table_id.database_name == old_table_id.database_name);
target_table_id.database_name = new_table_id.database_name;
assert(inner_table_id.database_name == old_table_id.database_name);
updateTargetTableId(new_table_id.database_name, std::nullopt);
}
const auto & select_query = metadata_snapshot->getSelectQuery();
// TODO Actually we don't need to update dependency if MV has UUID, but then db and table name will be outdated
DatabaseCatalog::instance().updateViewDependency(select_query.select_table_id, old_table_id, select_query.select_table_id, getStorageID());
if (refresher)
refresher->rename(new_table_id);
}
void StorageMaterializedView::startup()
@ -379,10 +460,21 @@ void StorageMaterializedView::startup()
const auto & select_query = metadata_snapshot->getSelectQuery();
if (!select_query.select_table_id.empty())
DatabaseCatalog::instance().addViewDependency(select_query.select_table_id, getStorageID());
if (refresher)
{
refresher->initializeAndStart(std::static_pointer_cast<StorageMaterializedView>(shared_from_this()));
if (refresh_on_start)
refresher->run();
}
}
void StorageMaterializedView::shutdown(bool)
{
if (refresher)
refresher->shutdown();
auto metadata_snapshot = getInMemoryMetadataPtr();
const auto & select_query = metadata_snapshot->getSelectQuery();
/// Make sure the dependency is removed after DETACH TABLE
@ -393,13 +485,13 @@ void StorageMaterializedView::shutdown(bool)
StoragePtr StorageMaterializedView::getTargetTable() const
{
checkStackSize();
return DatabaseCatalog::instance().getTable(target_table_id, getContext());
return DatabaseCatalog::instance().getTable(getTargetTableId(), getContext());
}
StoragePtr StorageMaterializedView::tryGetTargetTable() const
{
checkStackSize();
return DatabaseCatalog::instance().tryGetTable(target_table_id, getContext());
return DatabaseCatalog::instance().tryGetTable(getTargetTableId(), getContext());
}
NamesAndTypesList StorageMaterializedView::getVirtuals() const
@ -472,6 +564,8 @@ std::optional<UInt64> StorageMaterializedView::totalBytesUncompressed(const Sett
ActionLock StorageMaterializedView::getActionLock(StorageActionBlockType type)
{
if (type == ActionLocks::ViewRefresh && refresher)
refresher->stop();
if (has_inner_table)
{
if (auto target_table = tryGetTargetTable())
@ -487,6 +581,33 @@ bool StorageMaterializedView::isRemote() const
return false;
}
void StorageMaterializedView::onActionLockRemove(StorageActionBlockType action_type)
{
if (action_type == ActionLocks::ViewRefresh && refresher)
refresher->start();
}
DB::StorageID StorageMaterializedView::getTargetTableId() const
{
std::lock_guard guard(target_table_id_mutex);
return target_table_id;
}
void StorageMaterializedView::setTargetTableId(DB::StorageID id)
{
std::lock_guard guard(target_table_id_mutex);
target_table_id = std::move(id);
}
void StorageMaterializedView::updateTargetTableId(std::optional<String> database_name, std::optional<String> table_name)
{
std::lock_guard guard(target_table_id_mutex);
if (database_name)
target_table_id.database_name = *std::move(database_name);
if (table_name)
target_table_id.table_name = *std::move(table_name);
}
void registerStorageMaterializedView(StorageFactory & factory)
{
factory.registerStorage("MaterializedView", [](const StorageFactory::Arguments & args)

View File

@ -5,6 +5,7 @@
#include <Storages/IStorage.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Storages/MaterializedView/RefreshTask_fwd.h>
namespace DB
{
@ -76,6 +77,7 @@ public:
NamesAndTypesList getVirtuals() const override;
ActionLock getActionLock(StorageActionBlockType type) override;
void onActionLockRemove(StorageActionBlockType action_type) override;
void read(
QueryPlan & query_plan,
@ -98,12 +100,27 @@ public:
std::optional<UInt64> totalBytesUncompressed(const Settings & settings) const override;
private:
mutable std::mutex target_table_id_mutex;
/// Will be initialized in constructor
StorageID target_table_id = StorageID::createEmpty();
RefreshTaskHolder refresher;
bool refresh_on_start = false;
bool has_inner_table = false;
friend class RefreshTask;
void checkStatementCanBeForwarded() const;
/// Prepare to refresh a refreshable materialized view: create query context, create temporary
/// table, form the insert-select query.
std::tuple<ContextMutablePtr, std::shared_ptr<ASTInsertQuery>> prepareRefresh() const;
StorageID exchangeTargetTable(StorageID fresh_table, ContextPtr refresh_context);
StorageID getTargetTableId() const;
void setTargetTableId(StorageID id);
void updateTargetTableId(std::optional<String> database_name, std::optional<String> table_name);
};
}

View File

@ -0,0 +1,94 @@
#include <Storages/System/StorageSystemViewRefreshes.h>
#include <Access/ContextAccess.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Context.h>
#include <Storages/MaterializedView/RefreshSet.h>
namespace DB
{
NamesAndTypesList StorageSystemViewRefreshes::getNamesAndTypes()
{
return {
{"database", std::make_shared<DataTypeString>()},
{"view", std::make_shared<DataTypeString>()},
{"status", std::make_shared<DataTypeString>()},
{"last_refresh_result", std::make_shared<DataTypeString>()},
{"last_refresh_time", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>())},
{"last_success_time", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>())},
{"duration_ms", std::make_shared<DataTypeUInt64>()},
{"next_refresh_time", std::make_shared<DataTypeDateTime>()},
{"remaining_dependencies", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
{"exception", std::make_shared<DataTypeString>()},
{"refresh_count", std::make_shared<DataTypeUInt64>()},
{"progress", std::make_shared<DataTypeFloat64>()},
{"elapsed", std::make_shared<DataTypeFloat64>()},
{"read_rows", std::make_shared<DataTypeUInt64>()},
{"read_bytes", std::make_shared<DataTypeUInt64>()},
{"total_rows", std::make_shared<DataTypeUInt64>()},
{"total_bytes", std::make_shared<DataTypeUInt64>()},
{"written_rows", std::make_shared<DataTypeUInt64>()},
{"written_bytes", std::make_shared<DataTypeUInt64>()},
{"result_rows", std::make_shared<DataTypeUInt64>()},
{"result_bytes", std::make_shared<DataTypeUInt64>()},
};
}
void StorageSystemViewRefreshes::fillData(
MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
{
auto access = context->getAccess();
auto valid_access = AccessType::SHOW_TABLES;
bool check_access_for_tables = !access->isGranted(valid_access);
for (const auto & refresh : context->getRefreshSet().getInfo())
{
if (check_access_for_tables && !access->isGranted(valid_access, refresh.view_id.getDatabaseName(), refresh.view_id.getTableName()))
continue;
std::size_t i = 0;
res_columns[i++]->insert(refresh.view_id.getDatabaseName());
res_columns[i++]->insert(refresh.view_id.getTableName());
res_columns[i++]->insert(toString(refresh.state));
res_columns[i++]->insert(toString(refresh.last_refresh_result));
if (refresh.last_attempt_time.has_value())
res_columns[i++]->insert(refresh.last_attempt_time.value());
else
res_columns[i++]->insertDefault(); // NULL
if (refresh.last_success_time.has_value())
res_columns[i++]->insert(refresh.last_success_time.value());
else
res_columns[i++]->insertDefault(); // NULL
res_columns[i++]->insert(refresh.last_attempt_duration_ms);
res_columns[i++]->insert(refresh.next_refresh_time);
Array deps;
for (const StorageID & id : refresh.remaining_dependencies)
deps.push_back(id.getFullTableName());
res_columns[i++]->insert(Array(deps));
res_columns[i++]->insert(refresh.exception_message);
res_columns[i++]->insert(refresh.refresh_count);
res_columns[i++]->insert(Float64(refresh.progress.read_rows) / refresh.progress.total_rows_to_read);
res_columns[i++]->insert(refresh.progress.elapsed_ns / 1e9);
res_columns[i++]->insert(refresh.progress.read_rows);
res_columns[i++]->insert(refresh.progress.read_bytes);
res_columns[i++]->insert(refresh.progress.total_rows_to_read);
res_columns[i++]->insert(refresh.progress.total_bytes_to_read);
res_columns[i++]->insert(refresh.progress.written_rows);
res_columns[i++]->insert(refresh.progress.written_bytes);
res_columns[i++]->insert(refresh.progress.result_rows);
res_columns[i++]->insert(refresh.progress.result_bytes);
}
}
}

View File

@ -0,0 +1,27 @@
#pragma once
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <Storages/System/IStorageSystemOneBlock.h>
namespace DB
{
class StorageSystemViewRefreshes final : public IStorageSystemOneBlock<StorageSystemViewRefreshes>
{
public:
std::string getName() const override { return "SystemViewRefreshes"; }
static NamesAndTypesList getNamesAndTypes();
protected:
using IStorageSystemOneBlock::IStorageSystemOneBlock;
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
};
}

View File

@ -87,6 +87,7 @@
#include <Storages/System/StorageSystemScheduler.h>
#include <Storages/System/StorageSystemS3Queue.h>
#include <Storages/System/StorageSystemDashboards.h>
#include <Storages/System/StorageSystemViewRefreshes.h>
#if defined(__ELF__) && !defined(OS_FREEBSD)
#include <Storages/System/StorageSystemSymbols.h>
@ -209,6 +210,7 @@ void attachSystemTablesServer(ContextPtr context, IDatabase & system_database, b
attach<StorageSystemJemallocBins>(context, system_database, "jemalloc_bins");
attach<StorageSystemS3Queue>(context, system_database, "s3queue");
attach<StorageSystemDashboards>(context, system_database, "dashboards");
attach<StorageSystemViewRefreshes>(context, system_database, "view_refreshes");
if (has_zookeeper)
{

View File

@ -531,6 +531,7 @@ class FailureReason(enum.Enum):
FAST_ONLY = "running fast tests only"
NO_LONG = "not running long tests"
REPLICATED_DB = "replicated-database"
NON_ATOMIC_DB = "database engine not Atomic"
S3_STORAGE = "s3-storage"
BUILD = "not running for current build"
NO_PARALLEL_REPLICAS = "smth in not supported with parallel replicas"
@ -997,6 +998,13 @@ class TestCase:
elif tags and ("no-replicated-database" in tags) and args.replicated_database:
return FailureReason.REPLICATED_DB
elif (
tags
and ("atomic-database" in tags)
and (args.replicated_database or args.db_engine not in (None, "Atomic"))
):
return FailureReason.NON_ATOMIC_DB
elif tags and ("no-s3-storage" in tags) and args.s3_storage:
return FailureReason.S3_STORAGE
elif (

View File

@ -188,7 +188,7 @@ def test_grant_all_on_table():
instance.query("SHOW GRANTS FOR B")
== "GRANT SHOW TABLES, SHOW COLUMNS, SHOW DICTIONARIES, SELECT, INSERT, ALTER TABLE, ALTER VIEW, CREATE TABLE, CREATE VIEW, CREATE DICTIONARY, "
"DROP TABLE, DROP VIEW, DROP DICTIONARY, UNDROP TABLE, TRUNCATE, OPTIMIZE, BACKUP, CREATE ROW POLICY, ALTER ROW POLICY, DROP ROW POLICY, SHOW ROW POLICIES, "
"SYSTEM MERGES, SYSTEM TTL MERGES, SYSTEM FETCHES, SYSTEM MOVES, SYSTEM PULLING REPLICATION LOG, SYSTEM CLEANUP, SYSTEM SENDS, SYSTEM REPLICATION QUEUES, SYSTEM DROP REPLICA, SYSTEM SYNC REPLICA, "
"SYSTEM MERGES, SYSTEM TTL MERGES, SYSTEM FETCHES, SYSTEM MOVES, SYSTEM PULLING REPLICATION LOG, SYSTEM CLEANUP, SYSTEM VIEWS, SYSTEM SENDS, SYSTEM REPLICATION QUEUES, SYSTEM DROP REPLICA, SYSTEM SYNC REPLICA, "
"SYSTEM RESTART REPLICA, SYSTEM RESTORE REPLICA, SYSTEM WAIT LOADING PARTS, SYSTEM FLUSH DISTRIBUTED, dictGet ON test.table TO B\n"
)
instance.query("REVOKE ALL ON test.table FROM B", user="A")

View File

@ -48,6 +48,7 @@ ALTER TABLE [] \N ALTER
ALTER DATABASE [] \N ALTER
ALTER VIEW REFRESH ['ALTER LIVE VIEW REFRESH','REFRESH VIEW'] VIEW ALTER VIEW
ALTER VIEW MODIFY QUERY ['ALTER TABLE MODIFY QUERY'] VIEW ALTER VIEW
ALTER VIEW MODIFY REFRESH ['ALTER TABLE MODIFY QUERY'] VIEW ALTER VIEW
ALTER VIEW [] \N ALTER
ALTER [] \N ALL
CREATE DATABASE [] DATABASE CREATE
@ -127,6 +128,7 @@ SYSTEM FETCHES ['SYSTEM STOP FETCHES','SYSTEM START FETCHES','STOP FETCHES','STA
SYSTEM MOVES ['SYSTEM STOP MOVES','SYSTEM START MOVES','STOP MOVES','START MOVES'] TABLE SYSTEM
SYSTEM PULLING REPLICATION LOG ['SYSTEM STOP PULLING REPLICATION LOG','SYSTEM START PULLING REPLICATION LOG'] TABLE SYSTEM
SYSTEM CLEANUP ['SYSTEM STOP CLEANUP','SYSTEM START CLEANUP'] TABLE SYSTEM
SYSTEM VIEWS ['SYSTEM REFRESH VIEW','SYSTEM START VIEWS','SYSTEM STOP VIEWS','SYSTEM START VIEW','SYSTEM STOP VIEW','SYSTEM CANCEL VIEW','REFRESH VIEW','START VIEWS','STOP VIEWS','START VIEW','STOP VIEW','CANCEL VIEW'] VIEW SYSTEM
SYSTEM DISTRIBUTED SENDS ['SYSTEM STOP DISTRIBUTED SENDS','SYSTEM START DISTRIBUTED SENDS','STOP DISTRIBUTED SENDS','START DISTRIBUTED SENDS'] TABLE SYSTEM SENDS
SYSTEM REPLICATED SENDS ['SYSTEM STOP REPLICATED SENDS','SYSTEM START REPLICATED SENDS','STOP REPLICATED SENDS','START REPLICATED SENDS'] TABLE SYSTEM SENDS
SYSTEM SENDS ['SYSTEM STOP SENDS','SYSTEM START SENDS','STOP SENDS','START SENDS'] \N SYSTEM

View File

@ -0,0 +1,44 @@
<1: created view> a [] 1
CREATE MATERIALIZED VIEW default.a\nREFRESH AFTER 1 SECOND\n(\n `x` UInt64\n)\nENGINE = Memory AS\nSELECT number AS x\nFROM numbers(2)\nUNION ALL\nSELECT rand64() AS x
<2: refreshed> 3 1 1
<3: time difference at least> 500
<4: next refresh in> 1
<4.5: altered> Scheduled Finished 2052-01-01 00:00:00
CREATE MATERIALIZED VIEW default.a\nREFRESH EVERY 2 YEAR\n(\n `x` Int16\n)\nENGINE = Memory AS\nSELECT x * 2 AS x\nFROM default.src
<5: no refresh> 3
<6: refreshed> 2
<7: refreshed> Scheduled Finished 2054-01-01 00:00:00
CREATE MATERIALIZED VIEW default.b\nREFRESH EVERY 2 YEAR DEPENDS ON default.a\n(\n `y` Int32\n)\nENGINE = MergeTree\nORDER BY y\nSETTINGS index_granularity = 8192 AS\nSELECT x * 10 AS y\nFROM default.a
<8: refreshed> 20
<9: refreshed> a Scheduled Finished 2054-01-01 00:00:00
<9: refreshed> b Scheduled Finished 2054-01-01 00:00:00
<10: waiting> a Scheduled [] 2054-01-01 00:00:00
<10: waiting> b WaitingForDependencies ['default.a'] 2054-01-01 00:00:00
<11: chain-refreshed a> 4
<12: chain-refreshed b> 40
<13: chain-refreshed> a Scheduled [] Finished 2054-01-01 00:00:01 2056-01-01 00:00:00
<13: chain-refreshed> b Scheduled ['default.a'] Finished 2054-01-24 23:22:21 2056-01-01 00:00:00
<14: waiting for next cycle> a Scheduled [] 2058-01-01 00:00:00
<14: waiting for next cycle> b WaitingForDependencies ['default.a'] 2060-01-01 00:00:00
<15: chain-refreshed a> 6
<16: chain-refreshed b> 60
<17: chain-refreshed> a Scheduled 2062-01-01 00:00:00
<17: chain-refreshed> b Scheduled 2062-01-01 00:00:00
<18: removed dependency> b Scheduled [] 2062-03-03 03:03:03 2064-01-01 00:00:00 5
CREATE MATERIALIZED VIEW default.b\nREFRESH EVERY 2 YEAR\n(\n `y` Int32\n)\nENGINE = MergeTree\nORDER BY y\nSETTINGS index_granularity = 8192 AS\nSELECT x * 10 AS y\nFROM default.a
<19: exception> 1
<20: unexception> 1
<21: rename> 1
<22: rename> d Finished
<23: simple refresh> 1
<24: rename during refresh> 1
<25: rename during refresh> f Running
<27: cancelled> f Scheduled
CREATE MATERIALIZED VIEW default.g\nREFRESH EVERY 1 WEEK OFFSET 3 DAY 4 HOUR RANDOMIZE FOR 4 DAY 1 HOUR\n(\n `x` Int64\n)\nENGINE = Memory AS\nSELECT 42
<29: randomize> 1 1
CREATE MATERIALIZED VIEW default.h\nREFRESH EVERY 1 SECOND TO default.dest\n(\n `x` Int64\n) AS\nSELECT x * 10 AS x\nFROM default.src
<30: to existing table> 10
<31: to existing table> 10
<31: to existing table> 20
<32: empty> i Scheduled Unknown
<32: empty> j Scheduled Finished

View File

@ -0,0 +1,303 @@
#!/usr/bin/env bash
# Tags: atomic-database
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# reset --log_comment
CLICKHOUSE_LOG_COMMENT=
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# Set session timezone to UTC to make all DateTime formatting and parsing use UTC, because refresh
# scheduling is done in UTC.
CLICKHOUSE_CLIENT="`echo "$CLICKHOUSE_CLIENT" | sed 's/--session_timezone[= ][^ ]*//g'`"
CLICKHOUSE_CLIENT="`echo "$CLICKHOUSE_CLIENT --allow_experimental_refreshable_materialized_view=1 --session_timezone Etc/UTC"`"
$CLICKHOUSE_CLIENT -nq "create view refreshes as select * from system.view_refreshes where database = '$CLICKHOUSE_DATABASE' order by view"
# Basic refreshing.
$CLICKHOUSE_CLIENT -nq "
create materialized view a
refresh after 1 second
engine Memory
empty
as select number as x from numbers(2) union all select rand64() as x"
$CLICKHOUSE_CLIENT -nq "select '<1: created view>', view, remaining_dependencies, exception, last_refresh_result in ('Unknown', 'Finished') from refreshes";
$CLICKHOUSE_CLIENT -nq "show create a"
# Wait for any refresh. (xargs trims the string and turns \t and \n into spaces)
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes -- $LINENO" | xargs`" == 'Unknown' ]
do
sleep 0.1
done
# Check table contents.
$CLICKHOUSE_CLIENT -nq "select '<2: refreshed>', count(), sum(x=0), sum(x=1) from a"
# Wait for table contents to change.
res1="`$CLICKHOUSE_CLIENT -nq 'select * from a order by x format Values'`"
while :
do
res2="`$CLICKHOUSE_CLIENT -nq 'select * from a order by x format Values -- $LINENO'`"
[ "$res2" == "$res1" ] || break
sleep 0.1
done
time2="`$CLICKHOUSE_CLIENT -nq "select reinterpret(now64(), 'Int64')"`"
# Wait for another change.
while :
do
res3="`$CLICKHOUSE_CLIENT -nq 'select * from a order by x format Values -- $LINENO'`"
[ "$res3" == "$res2" ] || break
sleep 0.1
done
# Check that the two changes were at least 500ms apart, in particular that we're not refreshing
# like crazy. This is potentially flaky, but we need at least one test that uses non-mocked timer
# to make sure the clock+timer code works at all. If it turns out flaky, increase refresh period above.
$CLICKHOUSE_CLIENT -nq "
select '<3: time difference at least>', min2(reinterpret(now64(), 'Int64') - $time2, 500);
select '<4: next refresh in>', next_refresh_time-last_refresh_time from refreshes;"
# Create a source table from which views will read.
$CLICKHOUSE_CLIENT -nq "
create table src (x Int8) engine Memory as select 1"
# Switch to fake clock, change refresh schedule, change query.
$CLICKHOUSE_CLIENT -nq "
system test view a set fake time '2050-01-01 00:00:01';"
while [ "`$CLICKHOUSE_CLIENT -nq "select status, last_refresh_time, next_refresh_time from refreshes -- $LINENO" | xargs`" != 'Scheduled 2050-01-01 00:00:01 2050-01-01 00:00:02' ]
do
sleep 0.1
done
$CLICKHOUSE_CLIENT -nq "
alter table a modify refresh every 2 year;
alter table a modify query select x*2 as x from src;
select '<4.5: altered>', status, last_refresh_result, next_refresh_time from refreshes;
show create a;"
# Advance time to trigger the refresh.
$CLICKHOUSE_CLIENT -nq "
select '<5: no refresh>', count() from a;
system test view a set fake time '2052-02-03 04:05:06';"
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_time from refreshes -- $LINENO" | xargs`" != '2052-02-03 04:05:06' ]
do
sleep 0.1
done
$CLICKHOUSE_CLIENT -nq "
select '<6: refreshed>', * from a;
select '<7: refreshed>', status, last_refresh_result, next_refresh_time from refreshes;"
# Create a dependent view, refresh it once.
$CLICKHOUSE_CLIENT -nq "
create materialized view b refresh every 2 year depends on a (y Int32) engine MergeTree order by y empty as select x*10 as y from a;
show create b;
system test view b set fake time '2052-11-11 11:11:11';
system refresh view b;"
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_time from refreshes where view = 'b' -- $LINENO" | xargs`" != '2052-11-11 11:11:11' ]
do
sleep 0.1
done
# Next refresh shouldn't start until the dependency refreshes.
$CLICKHOUSE_CLIENT -nq "
select '<8: refreshed>', * from b;
select '<9: refreshed>', view, status, last_refresh_result, next_refresh_time from refreshes;
system test view b set fake time '2054-01-24 23:22:21';"
while [ "`$CLICKHOUSE_CLIENT -nq "select status, next_refresh_time from refreshes where view = 'b' -- $LINENO" | xargs`" != 'WaitingForDependencies 2054-01-01 00:00:00' ]
do
sleep 0.1
done
# Update source table (by dropping and re-creating it - to test that tables are looked up by name
# rather than uuid), kick off refresh of the dependency.
$CLICKHOUSE_CLIENT -nq "
select '<10: waiting>', view, status, remaining_dependencies, next_refresh_time from refreshes;
drop table src;
create table src (x Int16) engine Memory as select 2;
system test view a set fake time '2054-01-01 00:00:01';"
while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes where view = 'b' -- $LINENO" | xargs`" != 'Scheduled' ]
do
sleep 0.1
done
# Both tables should've refreshed.
$CLICKHOUSE_CLIENT -nq "
select '<11: chain-refreshed a>', * from a;
select '<12: chain-refreshed b>', * from b;
select '<13: chain-refreshed>', view, status, remaining_dependencies, last_refresh_result, last_refresh_time, next_refresh_time, exception from refreshes;"
# Make the dependent table run ahead by one refresh cycle, make sure it waits for the dependency to
# catch up to the same cycle.
$CLICKHOUSE_CLIENT -nq "
system test view b set fake time '2059-01-01 00:00:00';
system refresh view b;"
while [ "`$CLICKHOUSE_CLIENT -nq "select next_refresh_time from refreshes where view = 'b' -- $LINENO" | xargs`" != '2060-01-01 00:00:00' ]
do
sleep 0.1
done
$CLICKHOUSE_CLIENT -nq "
system test view b set fake time '2061-01-01 00:00:00';
system test view a set fake time '2057-01-01 00:00:00';"
while [ "`$CLICKHOUSE_CLIENT -nq "select status, next_refresh_time from refreshes -- $LINENO" | xargs`" != 'Scheduled 2058-01-01 00:00:00 WaitingForDependencies 2060-01-01 00:00:00' ]
do
sleep 0.1
done
sleep 1
$CLICKHOUSE_CLIENT -nq "
select '<14: waiting for next cycle>', view, status, remaining_dependencies, next_refresh_time from refreshes;
truncate src;
insert into src values (3);
system test view a set fake time '2060-02-02 02:02:02';"
while [ "`$CLICKHOUSE_CLIENT -nq "select next_refresh_time from refreshes where view = 'b' -- $LINENO" | xargs`" != '2062-01-01 00:00:00' ]
do
sleep 0.1
done
$CLICKHOUSE_CLIENT -nq "
select '<15: chain-refreshed a>', * from a;
select '<16: chain-refreshed b>', * from b;
select '<17: chain-refreshed>', view, status, next_refresh_time from refreshes;"
# Get to WaitingForDependencies state and remove the depencency.
$CLICKHOUSE_CLIENT -nq "
system test view b set fake time '2062-03-03 03:03:03'"
while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes where view = 'b' -- $LINENO" | xargs`" != 'WaitingForDependencies' ]
do
sleep 0.1
done
$CLICKHOUSE_CLIENT -nq "
alter table b modify refresh every 2 year"
while [ "`$CLICKHOUSE_CLIENT -nq "select status, last_refresh_time from refreshes where view = 'b' -- $LINENO" | xargs`" != 'Scheduled 2062-03-03 03:03:03' ]
do
sleep 0.1
done
$CLICKHOUSE_CLIENT -nq "
select '<18: removed dependency>', view, status, remaining_dependencies, last_refresh_time,next_refresh_time, refresh_count from refreshes where view = 'b';
show create b;"
# Select from a table that doesn't exist, get an exception.
$CLICKHOUSE_CLIENT -nq "
drop table a;
drop table b;
create materialized view c refresh every 1 second (x Int64) engine Memory empty as select * from src;
drop table src;"
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes -- $LINENO" | xargs`" != 'Exception' ]
do
sleep 0.1
done
# Check exception, create src, expect successful refresh.
$CLICKHOUSE_CLIENT -nq "
select '<19: exception>', exception ilike '%UNKNOWN_TABLE%' from refreshes;
create table src (x Int64) engine Memory as select 1;
system refresh view c;"
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes -- $LINENO" | xargs`" != 'Finished' ]
do
sleep 0.1
done
# Rename table.
$CLICKHOUSE_CLIENT -nq "
select '<20: unexception>', * from c;
rename table c to d;
select '<21: rename>', * from d;
select '<22: rename>', view, last_refresh_result from refreshes;"
# Do various things during a refresh.
# First make a nonempty view.
$CLICKHOUSE_CLIENT -nq "
drop table d;
truncate src;
insert into src values (1)
create materialized view e refresh every 1 second (x Int64) engine MergeTree order by x empty as select x + sleepEachRow(1) as x from src settings max_block_size = 1;"
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes -- $LINENO" | xargs`" != 'Finished' ]
do
sleep 0.1
done
# Stop refreshes.
$CLICKHOUSE_CLIENT -nq "
select '<23: simple refresh>', * from e;
system stop view e;"
while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes -- $LINENO" | xargs`" != 'Disabled' ]
do
sleep 0.1
done
# Make refreshes slow, wait for a slow refresh to start. (We stopped refreshes first to make sure
# we wait for a slow refresh, not a previous fast one.)
$CLICKHOUSE_CLIENT -nq "
insert into src select * from numbers(1000) settings max_block_size=1;
system start view e;"
while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes -- $LINENO" | xargs`" != 'Running' ]
do
sleep 0.1
done
# Rename.
$CLICKHOUSE_CLIENT -nq "
rename table e to f;
select '<24: rename during refresh>', * from f;
select '<25: rename during refresh>', view, status from refreshes;
alter table f modify refresh after 10 year;"
sleep 2 # make it likely that at least one row was processed
# Cancel.
$CLICKHOUSE_CLIENT -nq "
system cancel view f;"
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes -- $LINENO" | xargs`" != 'Cancelled' ]
do
sleep 0.1
done
# Check that another refresh doesn't immediately start after the cancelled one.
sleep 1
$CLICKHOUSE_CLIENT -nq "
select '<27: cancelled>', view, status from refreshes;
system refresh view f;"
while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes -- $LINENO" | xargs`" != 'Running' ]
do
sleep 0.1
done
# Drop.
$CLICKHOUSE_CLIENT -nq "
drop table f;
select '<28: drop during refresh>', view, status from refreshes;"
# Try OFFSET and RANDOMIZE FOR.
$CLICKHOUSE_CLIENT -nq "
create materialized view g refresh every 1 week offset 3 day 4 hour randomize for 4 day 1 hour (x Int64) engine Memory empty as select 42;
show create g;
system test view g set fake time '2050-02-03 15:30:13';"
while [ "`$CLICKHOUSE_CLIENT -nq "select next_refresh_time > '2049-01-01' from refreshes -- $LINENO" | xargs`" != '1' ]
do
sleep 0.1
done
$CLICKHOUSE_CLIENT -nq "
with '2050-02-10 04:00:00'::DateTime as expected
select '<29: randomize>', abs(next_refresh_time::Int64 - expected::Int64) <= 3600*(24*4+1), next_refresh_time != expected from refreshes;"
# Send data 'TO' an existing table.
$CLICKHOUSE_CLIENT -nq "
drop table g;
create table dest (x Int64) engine MergeTree order by x;
truncate src;
insert into src values (1);
create materialized view h refresh every 1 second to dest empty as select x*10 as x from src;
show create h;"
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes -- $LINENO" | xargs`" != 'Finished' ]
do
sleep 0.1
done
$CLICKHOUSE_CLIENT -nq "
select '<30: to existing table>', * from dest;
insert into src values (2);"
while [ "`$CLICKHOUSE_CLIENT -nq "select count() from dest -- $LINENO" | xargs`" != '2' ]
do
sleep 0.1
done
$CLICKHOUSE_CLIENT -nq "
select '<31: to existing table>', * from dest;
drop table dest;
drop table src;
drop table h;"
# EMPTY
$CLICKHOUSE_CLIENT -nq "
create materialized view i refresh after 1 year engine Memory empty as select number as x from numbers(2);
create materialized view j refresh after 1 year engine Memory as select number as x from numbers(2)"
while [ "`$CLICKHOUSE_CLIENT -nq "select sum(last_success_time is null) from refreshes -- $LINENO" | xargs`" == '2' ]
do
sleep 0.1
done
$CLICKHOUSE_CLIENT -nq "
select '<32: empty>', view, status, last_refresh_result from refreshes order by view;
drop table i;
drop table j"
$CLICKHOUSE_CLIENT -nq "
drop table refreshes;"

View File

@ -1,4 +1,4 @@
personal_ws-1.1 en 2646
personal_ws-1.1 en 2657
AArch
ACLs
ALTERs
@ -753,6 +753,7 @@ Redash
Reddit
Refactorings
ReferenceKeyed
Refreshable
RegexpTree
RemoteRead
ReplacingMergeTree
@ -2152,6 +2153,7 @@ reddit
redis
redisstreams
refcounter
refreshable
regexpExtract
regexpQuoteMeta
regionHierarchy