Merge branch 'master' of github.com:ClickHouse/ClickHouse into dynamic-json-distinct-aggregate-functions

This commit is contained in:
avogar 2024-08-16 22:38:52 +00:00
commit 4bf9197849
48 changed files with 1114 additions and 308 deletions

View File

@ -17,7 +17,8 @@ Columns:
- `duration_ms` ([UInt64](../../sql-reference/data-types/int-uint.md)) — How long the last refresh attempt took. - `duration_ms` ([UInt64](../../sql-reference/data-types/int-uint.md)) — How long the last refresh attempt took.
- `next_refresh_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — Time at which the next refresh is scheduled to start. - `next_refresh_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — Time at which the next refresh is scheduled to start.
- `remaining_dependencies` ([Array(String)](../../sql-reference/data-types/array.md)) — If the view has [refresh dependencies](../../sql-reference/statements/create/view.md#refresh-dependencies), this array contains the subset of those dependencies that are not satisfied for the current refresh yet. If `status = 'WaitingForDependencies'`, a refresh is ready to start as soon as these dependencies are fulfilled. - `remaining_dependencies` ([Array(String)](../../sql-reference/data-types/array.md)) — If the view has [refresh dependencies](../../sql-reference/statements/create/view.md#refresh-dependencies), this array contains the subset of those dependencies that are not satisfied for the current refresh yet. If `status = 'WaitingForDependencies'`, a refresh is ready to start as soon as these dependencies are fulfilled.
- `exception` ([String](../../sql-reference/data-types/string.md)) — if `last_refresh_result = 'Exception'`, i.e. the last refresh attempt failed, this column contains the corresponding error message and stack trace. - `exception` ([String](../../sql-reference/data-types/string.md)) — if `last_refresh_result = 'Error'`, i.e. the last refresh attempt failed, this column contains the corresponding error message and stack trace.
- `retry` ([UInt64](../../sql-reference/data-types/int-uint.md)) — If nonzero, the current or next refresh is a retry (see `refresh_retries` refresh setting), and `retry` is the 1-based index of that retry.
- `refresh_count` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of successful refreshes since last server restart or table creation. - `refresh_count` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of successful refreshes since last server restart or table creation.
- `progress` ([Float64](../../sql-reference/data-types/float.md)) — Progress of the current refresh, between 0 and 1. - `progress` ([Float64](../../sql-reference/data-types/float.md)) — Progress of the current refresh, between 0 and 1.
- `read_rows` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of rows read by the current refresh so far. - `read_rows` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of rows read by the current refresh so far.

View File

@ -13,8 +13,8 @@ Creates a new view. Views can be [normal](#normal-view), [materialized](#materia
Syntax: Syntax:
``` sql ``` sql
CREATE [OR REPLACE] VIEW [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster_name] CREATE [OR REPLACE] VIEW [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster_name]
[DEFINER = { user | CURRENT_USER }] [SQL SECURITY { DEFINER | INVOKER | NONE }] [DEFINER = { user | CURRENT_USER }] [SQL SECURITY { DEFINER | INVOKER | NONE }]
AS SELECT ... AS SELECT ...
[COMMENT 'comment'] [COMMENT 'comment']
``` ```
@ -55,8 +55,8 @@ SELECT * FROM view(column1=value1, column2=value2 ...)
## Materialized View ## Materialized View
``` sql ``` sql
CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db.]table_name [ON CLUSTER] [TO[db.]name] [ENGINE = engine] [POPULATE] CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db.]table_name [ON CLUSTER] [TO[db.]name] [ENGINE = engine] [POPULATE]
[DEFINER = { user | CURRENT_USER }] [SQL SECURITY { DEFINER | INVOKER | NONE }] [DEFINER = { user | CURRENT_USER }] [SQL SECURITY { DEFINER | INVOKER | NONE }]
AS SELECT ... AS SELECT ...
[COMMENT 'comment'] [COMMENT 'comment']
``` ```
@ -92,7 +92,7 @@ Given that `POPULATE` works like `CREATE TABLE ... AS SELECT ...` it has limitat
- It is not supported with Replicated database - It is not supported with Replicated database
- It is not supported in ClickHouse cloud - It is not supported in ClickHouse cloud
Instead a separate `INSERT ... SELECT` can be used. Instead a separate `INSERT ... SELECT` can be used.
::: :::
A `SELECT` query can contain `DISTINCT`, `GROUP BY`, `ORDER BY`, `LIMIT`. Note that the corresponding conversions are performed independently on each block of inserted data. For example, if `GROUP BY` is set, data is aggregated during insertion, but only within a single packet of inserted data. The data wont be further aggregated. The exception is when using an `ENGINE` that independently performs data aggregation, such as `SummingMergeTree`. A `SELECT` query can contain `DISTINCT`, `GROUP BY`, `ORDER BY`, `LIMIT`. Note that the corresponding conversions are performed independently on each block of inserted data. For example, if `GROUP BY` is set, data is aggregated during insertion, but only within a single packet of inserted data. The data wont be further aggregated. The exception is when using an `ENGINE` that independently performs data aggregation, such as `SummingMergeTree`.
@ -110,7 +110,7 @@ To delete a view, use [DROP VIEW](../../../sql-reference/statements/drop.md#drop
`DEFINER` and `SQL SECURITY` allow you to specify which ClickHouse user to use when executing the view's underlying query. `DEFINER` and `SQL SECURITY` allow you to specify which ClickHouse user to use when executing the view's underlying query.
`SQL SECURITY` has three legal values: `DEFINER`, `INVOKER`, or `NONE`. You can specify any existing user or `CURRENT_USER` in the `DEFINER` clause. `SQL SECURITY` has three legal values: `DEFINER`, `INVOKER`, or `NONE`. You can specify any existing user or `CURRENT_USER` in the `DEFINER` clause.
The following table will explain which rights are required for which user in order to select from view. The following table will explain which rights are required for which user in order to select from view.
Note that regardless of the SQL security option, in every case it is still required to have `GRANT SELECT ON <view>` in order to read from it. Note that regardless of the SQL security option, in every case it is still required to have `GRANT SELECT ON <view>` in order to read from it.
| SQL security option | View | Materialized View | | SQL security option | View | Materialized View |
@ -130,7 +130,7 @@ If `DEFINER`/`SQL SECURITY` aren't specified, the default values are used:
If a view is attached without `DEFINER`/`SQL SECURITY` specified, the default value is `SQL SECURITY NONE` for the materialized view and `SQL SECURITY INVOKER` for the normal view. If a view is attached without `DEFINER`/`SQL SECURITY` specified, the default value is `SQL SECURITY NONE` for the materialized view and `SQL SECURITY INVOKER` for the normal view.
To change SQL security for an existing view, use To change SQL security for an existing view, use
```sql ```sql
ALTER TABLE MODIFY SQL SECURITY { DEFINER | INVOKER | NONE } [DEFINER = { user | CURRENT_USER }] ALTER TABLE MODIFY SQL SECURITY { DEFINER | INVOKER | NONE } [DEFINER = { user | CURRENT_USER }]
``` ```
@ -161,6 +161,8 @@ CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db.]table_name
REFRESH EVERY|AFTER interval [OFFSET interval] REFRESH EVERY|AFTER interval [OFFSET interval]
RANDOMIZE FOR interval RANDOMIZE FOR interval
DEPENDS ON [db.]name [, [db.]name [, ...]] DEPENDS ON [db.]name [, [db.]name [, ...]]
SETTINGS name = value [, name = value [, ...]]
[APPEND]
[TO[db.]name] [(columns)] [ENGINE = engine] [EMPTY] [TO[db.]name] [(columns)] [ENGINE = engine] [EMPTY]
AS SELECT ... AS SELECT ...
[COMMENT 'comment'] [COMMENT 'comment']
@ -170,18 +172,23 @@ where `interval` is a sequence of simple intervals:
number SECOND|MINUTE|HOUR|DAY|WEEK|MONTH|YEAR number SECOND|MINUTE|HOUR|DAY|WEEK|MONTH|YEAR
``` ```
Periodically runs the corresponding query and stores its result in a table, atomically replacing the table's previous contents. Periodically runs the corresponding query and stores its result in a table.
* If the query says `APPEND`, each refresh inserts rows into the table without deleting existing rows. The insert is not atomic, just like a regular INSERT SELECT.
* Otherwise each refresh atomically replaces the table's previous contents.
Differences from regular non-refreshable materialized views: Differences from regular non-refreshable materialized views:
* No insert trigger. I.e. when new data is inserted into the table specified in SELECT, it's *not* automatically pushed to the refreshable materialized view. The periodic refresh runs the entire query and replaces the entire table. * No insert trigger. I.e. when new data is inserted into the table specified in SELECT, it's *not* automatically pushed to the refreshable materialized view. The periodic refresh runs the entire query.
* No restrictions on the SELECT query. Table functions (e.g. `url()`), views, UNION, JOIN, are all allowed. * No restrictions on the SELECT query. Table functions (e.g. `url()`), views, UNION, JOIN, are all allowed.
:::note
The settings in the `REFRESH ... SETTINGS` part of the query are refresh settings (e.g. `refresh_retries`), distinct from regular settings (e.g. `max_threads`). Regular settings can be specified using `SETTINGS` at the end of the query.
:::
:::note :::note
Refreshable materialized views are a work in progress. Setting `allow_experimental_refreshable_materialized_view = 1` is required for creating one. Current limitations: Refreshable materialized views are a work in progress. Setting `allow_experimental_refreshable_materialized_view = 1` is required for creating one. Current limitations:
* not compatible with Replicated database or table engines * not compatible with Replicated database or table engines
* It is not supported in ClickHouse Cloud * It is not supported in ClickHouse Cloud
* require [Atomic database engine](../../../engines/database-engines/atomic.md), * require [Atomic database engine](../../../engines/database-engines/atomic.md),
* no retries for failed refresh - we just skip to the next scheduled refresh time,
* no limit on number of concurrent refreshes. * no limit on number of concurrent refreshes.
::: :::
@ -246,15 +253,22 @@ A few more examples:
`DEPENDS ON` only works between refreshable materialized views. Listing a regular table in the `DEPENDS ON` list will prevent the view from ever refreshing (dependencies can be removed with `ALTER`, see below). `DEPENDS ON` only works between refreshable materialized views. Listing a regular table in the `DEPENDS ON` list will prevent the view from ever refreshing (dependencies can be removed with `ALTER`, see below).
::: :::
### Settings
Available refresh settings:
* `refresh_retries` - How many times to retry if refresh query fails with an exception. If all retries fail, skip to the next scheduled refresh time. 0 means no retries, -1 means infinite retries. Default: 0.
* `refresh_retry_initial_backoff_ms` - Delay before the first retry, if `refresh_retries` is not zero. Each subsequent retry doubles the delay, up to `refresh_retry_max_backoff_ms`. Default: 100 ms.
* `refresh_retry_max_backoff_ms` - Limit on the exponential growth of delay between refresh attempts. Default: 60000 ms (1 minute).
### Changing Refresh Parameters {#changing-refresh-parameters} ### Changing Refresh Parameters {#changing-refresh-parameters}
To change refresh parameters: To change refresh parameters:
``` ```
ALTER TABLE [db.]name MODIFY REFRESH EVERY|AFTER ... [RANDOMIZE FOR ...] [DEPENDS ON ...] ALTER TABLE [db.]name MODIFY REFRESH EVERY|AFTER ... [RANDOMIZE FOR ...] [DEPENDS ON ...] [SETTINGS ...]
``` ```
:::note :::note
This replaces refresh schedule *and* dependencies. If the table had a `DEPENDS ON`, doing a `MODIFY REFRESH` without `DEPENDS ON` will remove the dependencies. This replaces *all* refresh parameters at once: schedule, dependencies, settings, and APPEND-ness. E.g. if the table had a `DEPENDS ON`, doing a `MODIFY REFRESH` without `DEPENDS ON` will remove the dependencies.
::: :::
### Other operations ### Other operations
@ -263,6 +277,10 @@ The status of all refreshable materialized views is available in table [`system.
To manually stop, start, trigger, or cancel refreshes use [`SYSTEM STOP|START|REFRESH|CANCEL VIEW`](../system.md#refreshable-materialized-views). To manually stop, start, trigger, or cancel refreshes use [`SYSTEM STOP|START|REFRESH|CANCEL VIEW`](../system.md#refreshable-materialized-views).
:::note
Fun fact: the refresh query is allowed to read from the view that's being refreshed, seeing pre-refresh version of the data. This means you can implement Conway's game of life: https://pastila.nl/?00021a4b/d6156ff819c83d490ad2dcec05676865#O0LGWTO7maUQIA4AcGUtlA==
:::
## Window View [Experimental] ## Window View [Experimental]
:::info :::info

View File

@ -400,7 +400,7 @@ SYSTEM SYNC REPLICA [ON CLUSTER cluster_name] [db.]replicated_merge_tree_family_
After running this statement the `[db.]replicated_merge_tree_family_table_name` fetches commands from the common replicated log into its own replication queue, and then the query waits till the replica processes all of the fetched commands. The following modifiers are supported: After running this statement the `[db.]replicated_merge_tree_family_table_name` fetches commands from the common replicated log into its own replication queue, and then the query waits till the replica processes all of the fetched commands. The following modifiers are supported:
- If a `STRICT` modifier was specified then the query waits for the replication queue to become empty. The `STRICT` version may never succeed if new entries constantly appear in the replication queue. - If a `STRICT` modifier was specified then the query waits for the replication queue to become empty. The `STRICT` version may never succeed if new entries constantly appear in the replication queue.
- If a `LIGHTWEIGHT` modifier was specified then the query waits only for `GET_PART`, `ATTACH_PART`, `DROP_RANGE`, `REPLACE_RANGE` and `DROP_PART` entries to be processed. - If a `LIGHTWEIGHT` modifier was specified then the query waits only for `GET_PART`, `ATTACH_PART`, `DROP_RANGE`, `REPLACE_RANGE` and `DROP_PART` entries to be processed.
Additionally, the LIGHTWEIGHT modifier supports an optional FROM 'srcReplicas' clause, where 'srcReplicas' is a comma-separated list of source replica names. This extension allows for more targeted synchronization by focusing only on replication tasks originating from the specified source replicas. Additionally, the LIGHTWEIGHT modifier supports an optional FROM 'srcReplicas' clause, where 'srcReplicas' is a comma-separated list of source replica names. This extension allows for more targeted synchronization by focusing only on replication tasks originating from the specified source replicas.
- If a `PULL` modifier was specified then the query pulls new replication queue entries from ZooKeeper, but does not wait for anything to be processed. - If a `PULL` modifier was specified then the query pulls new replication queue entries from ZooKeeper, but does not wait for anything to be processed.
@ -526,6 +526,10 @@ Trigger an immediate out-of-schedule refresh of a given view.
SYSTEM REFRESH VIEW [db.]name SYSTEM REFRESH VIEW [db.]name
``` ```
### REFRESH VIEW
Wait for the currently running refresh to complete. If the refresh fails, throws an exception. If no refresh is running, completes immediately, throwing an exception if previous refresh failed.
### STOP VIEW, STOP VIEWS ### STOP VIEW, STOP VIEWS
Disable periodic refreshing of the given view or all refreshable views. If a refresh is in progress, cancel it too. Disable periodic refreshing of the given view or all refreshable views. If a refresh is in progress, cancel it too.

View File

@ -608,6 +608,7 @@
M(727, UNEXPECTED_TABLE_ENGINE) \ M(727, UNEXPECTED_TABLE_ENGINE) \
M(728, UNEXPECTED_DATA_TYPE) \ M(728, UNEXPECTED_DATA_TYPE) \
M(729, ILLEGAL_TIME_SERIES_TAGS) \ M(729, ILLEGAL_TIME_SERIES_TAGS) \
M(730, REFRESH_FAILED) \
\ \
M(900, DISTRIBUTED_CACHE_ERROR) \ M(900, DISTRIBUTED_CACHE_ERROR) \
M(901, CANNOT_USE_DISTRIBUTED_CACHE) \ M(901, CANNOT_USE_DISTRIBUTED_CACHE) \

View File

@ -248,8 +248,31 @@ void StackTrace::forEachFrame(
auto dwarf_it = dwarfs.try_emplace(object->name, object->elf).first; auto dwarf_it = dwarfs.try_emplace(object->name, object->elf).first;
DB::Dwarf::LocationInfo location; DB::Dwarf::LocationInfo location;
if (dwarf_it->second.findAddress( uintptr_t adjusted_addr = uintptr_t(current_frame.physical_addr);
uintptr_t(current_frame.physical_addr), location, mode, inline_frames)) if (i > 0)
{
/// For non-innermost stack frames, the address points to the *next* instruction
/// after the `call` instruction. But we want the line number and inline function
/// information for the `call` instruction. So subtract 1 from the address.
/// Caveats:
/// * The `call` instruction can be longer than 1 byte, so addr-1 is in the middle
/// of the instruction. That's ok for debug info lookup: address ranges in debug
/// info cover the whole instruction.
/// * If the stack trace unwound out of a signal handler, the stack frame just
/// outside the signal didn't do a function call. It was interrupted by signal.
/// There's no `call` instruction, and decrementing the address is incorrect.
/// We may get incorrect line number and inlined functions in this case.
/// Unfortunate.
/// Note that libunwind, when producing this stack trace, knows whether this
/// frame is interrupted by signal or not. We could propagate this information
/// from libunwind to here and avoid subtracting 1 in this case, but currently
/// we don't do this.
/// But we don't do the decrement for findSymbol below (because `call` is
/// ~never the last instruction of a function), so the function name should be
/// correct for both pre-signal frames and regular frames.
adjusted_addr -= 1;
}
if (dwarf_it->second.findAddress(adjusted_addr, location, mode, inline_frames))
{ {
current_frame.file = location.file.toString(); current_frame.file = location.file.toString();
current_frame.line = location.line; current_frame.line = location.line;

View File

@ -616,6 +616,7 @@ class IColumn;
M(Bool, throw_if_deduplication_in_dependent_materialized_views_enabled_with_async_insert, true, "Throw exception on INSERT query when the setting `deduplicate_blocks_in_dependent_materialized_views` is enabled along with `async_insert`. It guarantees correctness, because these features can't work together.", 0) \ M(Bool, throw_if_deduplication_in_dependent_materialized_views_enabled_with_async_insert, true, "Throw exception on INSERT query when the setting `deduplicate_blocks_in_dependent_materialized_views` is enabled along with `async_insert`. It guarantees correctness, because these features can't work together.", 0) \
M(Bool, materialized_views_ignore_errors, false, "Allows to ignore errors for MATERIALIZED VIEW, and deliver original block to the table regardless of MVs", 0) \ M(Bool, materialized_views_ignore_errors, false, "Allows to ignore errors for MATERIALIZED VIEW, and deliver original block to the table regardless of MVs", 0) \
M(Bool, ignore_materialized_views_with_dropped_target_table, false, "Ignore MVs with dropped target table during pushing to views", 0) \ M(Bool, ignore_materialized_views_with_dropped_target_table, false, "Ignore MVs with dropped target table during pushing to views", 0) \
M(Bool, allow_materialized_view_with_bad_select, true, "Allow CREATE MATERIALIZED VIEW with SELECT query that references nonexistent tables or columns. It must still be syntactically valid. Doesn't apply to refreshable MVs. Doesn't apply if the MV schema needs to be inferred from the SELECT query (i.e. if the CREATE has no column list and no TO table). Can be used for creating MV before its source table.", 0) \
M(Bool, use_compact_format_in_distributed_parts_names, true, "Changes format of directories names for distributed table insert parts.", 0) \ M(Bool, use_compact_format_in_distributed_parts_names, true, "Changes format of directories names for distributed table insert parts.", 0) \
M(Bool, validate_polygons, true, "Throw exception if polygon is invalid in function pointInPolygon (e.g. self-tangent, self-intersecting). If the setting is false, the function will accept invalid polygons but may silently return wrong result.", 0) \ M(Bool, validate_polygons, true, "Throw exception if polygon is invalid in function pointInPolygon (e.g. self-tangent, self-intersecting). If the setting is false, the function will accept invalid polygons but may silently return wrong result.", 0) \
M(UInt64, max_parser_depth, DBMS_DEFAULT_MAX_PARSER_DEPTH, "Maximum parser depth (recursion depth of recursive descend parser).", 0) \ M(UInt64, max_parser_depth, DBMS_DEFAULT_MAX_PARSER_DEPTH, "Maximum parser depth (recursion depth of recursive descend parser).", 0) \

View File

@ -84,6 +84,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"use_hive_partitioning", false, false, "Allows to use hive partitioning for File, URL, S3, AzureBlobStorage and HDFS engines."}, {"use_hive_partitioning", false, false, "Allows to use hive partitioning for File, URL, S3, AzureBlobStorage and HDFS engines."},
{"allow_experimental_kafka_offsets_storage_in_keeper", false, false, "Allow the usage of experimental Kafka storage engine that stores the committed offsets in ClickHouse Keeper"}, {"allow_experimental_kafka_offsets_storage_in_keeper", false, false, "Allow the usage of experimental Kafka storage engine that stores the committed offsets in ClickHouse Keeper"},
{"allow_archive_path_syntax", true, true, "Added new setting to allow disabling archive path syntax."}, {"allow_archive_path_syntax", true, true, "Added new setting to allow disabling archive path syntax."},
{"allow_materialized_view_with_bad_select", true, true, "Support (but not enable yet) stricter validation in CREATE MATERIALIZED VIEW"},
{"query_cache_tag", "", "", "New setting for labeling query cache settings."}, {"query_cache_tag", "", "", "New setting for labeling query cache settings."},
{"allow_experimental_time_series_table", false, false, "Added new setting to allow the TimeSeries table engine"}, {"allow_experimental_time_series_table", false, false, "Added new setting to allow the TimeSeries table engine"},
{"enable_analyzer", 1, 1, "Added an alias to a setting `allow_experimental_analyzer`."}, {"enable_analyzer", 1, 1, "Added an alias to a setting `allow_experimental_analyzer`."},

View File

@ -180,8 +180,8 @@ ReturnType parseDateTimeBestEffortImpl(
} }
else if (num_digits == 10 && !year && !has_time) else if (num_digits == 10 && !year && !has_time)
{ {
if (strict && month) if (strict)
return on_error(ErrorCodes::CANNOT_PARSE_DATETIME, "Cannot read DateTime: month component is duplicated"); return on_error(ErrorCodes::CANNOT_PARSE_DATETIME, "Strict best effort parsing doesn't allow timestamps");
/// This is unix timestamp. /// This is unix timestamp.
readDecimalNumber<10>(res, digits); readDecimalNumber<10>(res, digits);
@ -189,8 +189,8 @@ ReturnType parseDateTimeBestEffortImpl(
} }
else if (num_digits == 9 && !year && !has_time) else if (num_digits == 9 && !year && !has_time)
{ {
if (strict && month) if (strict)
return on_error(ErrorCodes::CANNOT_PARSE_DATETIME, "Cannot read DateTime: month component is duplicated"); return on_error(ErrorCodes::CANNOT_PARSE_DATETIME, "Strict best effort parsing doesn't allow timestamps");
/// This is unix timestamp. /// This is unix timestamp.
readDecimalNumber<9>(res, digits); readDecimalNumber<9>(res, digits);
@ -198,8 +198,8 @@ ReturnType parseDateTimeBestEffortImpl(
} }
else if (num_digits == 14 && !year && !has_time) else if (num_digits == 14 && !year && !has_time)
{ {
if (strict && month) if (strict)
return on_error(ErrorCodes::CANNOT_PARSE_DATETIME, "Cannot read DateTime: month component is duplicated"); return on_error(ErrorCodes::CANNOT_PARSE_DATETIME, "Strict best effort parsing doesn't allow date times without separators");
/// This is YYYYMMDDhhmmss /// This is YYYYMMDDhhmmss
readDecimalNumber<4>(year, digits); readDecimalNumber<4>(year, digits);
@ -212,8 +212,8 @@ ReturnType parseDateTimeBestEffortImpl(
} }
else if (num_digits == 8 && !year) else if (num_digits == 8 && !year)
{ {
if (strict && month) if (strict)
return on_error(ErrorCodes::CANNOT_PARSE_DATETIME, "Cannot read DateTime: month component is duplicated"); return on_error(ErrorCodes::CANNOT_PARSE_DATETIME, "Strict best effort parsing doesn't allow date times without separators");
/// This is YYYYMMDD /// This is YYYYMMDD
readDecimalNumber<4>(year, digits); readDecimalNumber<4>(year, digits);
@ -222,6 +222,9 @@ ReturnType parseDateTimeBestEffortImpl(
} }
else if (num_digits == 6) else if (num_digits == 6)
{ {
if (strict)
return on_error(ErrorCodes::CANNOT_PARSE_DATETIME, "Strict best effort parsing doesn't allow date times without separators");
/// This is YYYYMM or hhmmss /// This is YYYYMM or hhmmss
if (!year && !month) if (!year && !month)
{ {
@ -350,7 +353,7 @@ ReturnType parseDateTimeBestEffortImpl(
if (month && !day_of_month) if (month && !day_of_month)
day_of_month = hour_or_day_of_month_or_month; day_of_month = hour_or_day_of_month_or_month;
} }
else if (checkChar('/', in) || checkChar('.', in) || checkChar('-', in)) else if ((!in.eof() && isSymbolIn(*in.position(), allowed_date_delimiters)) && (checkChar('/', in) || checkChar('.', in) || checkChar('-', in)))
{ {
if (day_of_month) if (day_of_month)
return on_error(ErrorCodes::CANNOT_PARSE_DATETIME, "Cannot read DateTime: day of month is duplicated"); return on_error(ErrorCodes::CANNOT_PARSE_DATETIME, "Cannot read DateTime: day of month is duplicated");
@ -399,7 +402,7 @@ ReturnType parseDateTimeBestEffortImpl(
if (month > 12) if (month > 12)
std::swap(month, day_of_month); std::swap(month, day_of_month);
if (checkChar('/', in) || checkChar('.', in) || checkChar('-', in)) if ((!in.eof() && isSymbolIn(*in.position(), allowed_date_delimiters)) && (checkChar('/', in) || checkChar('.', in) || checkChar('-', in)))
{ {
if (year) if (year)
return on_error(ErrorCodes::CANNOT_PARSE_DATETIME, "Cannot read DateTime: year component is duplicated"); return on_error(ErrorCodes::CANNOT_PARSE_DATETIME, "Cannot read DateTime: year component is duplicated");
@ -593,8 +596,8 @@ ReturnType parseDateTimeBestEffortImpl(
else else
return on_error(ErrorCodes::CANNOT_PARSE_DATETIME, "Cannot read DateTime: unexpected word"); return on_error(ErrorCodes::CANNOT_PARSE_DATETIME, "Cannot read DateTime: unexpected word");
// while (!in.eof() && isAlphaASCII(*in.position())) while (!in.eof() && isAlphaASCII(*in.position()))
// ++in.position(); ++in.position();
/// For RFC 2822 /// For RFC 2822
if (has_day_of_week) if (has_day_of_week)

View File

@ -121,6 +121,7 @@ namespace ErrorCodes
extern const int SUPPORT_IS_DISABLED; extern const int SUPPORT_IS_DISABLED;
extern const int TOO_MANY_TABLES; extern const int TOO_MANY_TABLES;
extern const int TOO_MANY_DATABASES; extern const int TOO_MANY_DATABASES;
extern const int THERE_IS_NO_COLUMN;
} }
namespace fs = std::filesystem; namespace fs = std::filesystem;
@ -847,6 +848,7 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::getTableProperti
} }
properties.columns = ColumnsDescription(as_select_sample.getNamesAndTypesList()); properties.columns = ColumnsDescription(as_select_sample.getNamesAndTypesList());
properties.columns_inferred_from_select_query = true;
} }
else if (create.as_table_function) else if (create.as_table_function)
{ {
@ -936,6 +938,105 @@ void validateVirtualColumns(const IStorage & storage)
} }
} }
void InterpreterCreateQuery::validateMaterializedViewColumnsAndEngine(const ASTCreateQuery & create, const TableProperties & properties, const DatabasePtr & database)
{
/// This is not strict validation, just catches common errors that would make the view not work.
/// It's possible to circumvent these checks by ALTERing the view or target table after creation;
/// we should probably do some of these checks on ALTER as well.
NamesAndTypesList all_output_columns;
bool check_columns = false;
if (create.hasTargetTableID(ViewTarget::To))
{
if (StoragePtr to_table = DatabaseCatalog::instance().tryGetTable(
create.getTargetTableID(ViewTarget::To), getContext()))
{
all_output_columns = to_table->getInMemoryMetadataPtr()->getSampleBlock().getNamesAndTypesList();
check_columns = true;
}
}
else if (!properties.columns_inferred_from_select_query)
{
all_output_columns = properties.columns.getInsertable();
check_columns = true;
}
if (create.refresh_strategy && !create.refresh_strategy->append)
{
if (database && database->getEngineName() != "Atomic")
throw Exception(ErrorCodes::INCORRECT_QUERY,
"Refreshable materialized views (except with APPEND) only support Atomic database engine, but database {} has engine {}", create.getDatabase(), database->getEngineName());
}
Block input_block;
if (check_columns)
{
try
{
if (getContext()->getSettingsRef().allow_experimental_analyzer)
{
input_block = InterpreterSelectQueryAnalyzer::getSampleBlock(create.select->clone(), getContext());
}
else
{
input_block = InterpreterSelectWithUnionQuery(create.select->clone(),
getContext(),
SelectQueryOptions().analyze()).getSampleBlock();
}
}
catch (Exception &)
{
if (!getContext()->getSettingsRef().allow_materialized_view_with_bad_select)
throw;
check_columns = false;
}
}
if (check_columns)
{
std::unordered_map<std::string_view, DataTypePtr> output_types;
for (const NameAndTypePair & nt : all_output_columns)
output_types[nt.name] = nt.type;
ColumnsWithTypeAndName input_columns;
ColumnsWithTypeAndName output_columns;
for (const auto & input_column : input_block)
{
auto it = output_types.find(input_column.name);
if (it != output_types.end())
{
input_columns.push_back(input_column.cloneEmpty());
output_columns.push_back(ColumnWithTypeAndName(it->second->createColumn(), it->second, input_column.name));
}
else if (create.refresh_strategy)
{
/// Unrecognized columns produced by SELECT query are allowed by regular materialized
/// views, but not by refreshable ones. This is in part because it was easier to
/// implement, in part because refreshable views have less concern about ALTERing target
/// tables.
///
/// The motivating scenario for allowing this in regular MV is ALTERing the table+query.
/// Suppose the user removes a column from target table, then a minute later
/// correspondingly updates the view's query to not produce that column.
/// If MV didn't allow unrecognized columns then during that minute all INSERTs into the
/// source table would fail - unacceptable.
/// For refreshable views, during that minute refreshes will fail - acceptable.
throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "SELECT query outputs column with name '{}', which is not found in the target table. Use 'AS' to assign alias that matches a column name.", input_column.name);
}
}
if (input_columns.empty())
throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "None of the columns produced by the SELECT query are present in the target table. Use 'AS' to assign aliases that match column names.");
ActionsDAG::makeConvertingActions(
input_columns,
output_columns,
ActionsDAG::MatchColumnsMode::Position
);
}
}
namespace namespace
{ {
void checkTemporaryTableEngineName(const String & name) void checkTemporaryTableEngineName(const String & name)
@ -1132,13 +1233,6 @@ void InterpreterCreateQuery::assertOrSetUUID(ASTCreateQuery & create, const Data
throw Exception(ErrorCodes::LOGICAL_ERROR, "Table UUID is not specified in DDL log"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Table UUID is not specified in DDL log");
} }
if (create.refresh_strategy && database->getEngineName() != "Atomic")
throw Exception(ErrorCodes::INCORRECT_QUERY,
"Refreshable materialized view requires Atomic database engine, but database {} has engine {}", create.getDatabase(), database->getEngineName());
/// TODO: Support Replicated databases, only with Shared/ReplicatedMergeTree.
/// Figure out how to make the refreshed data appear all at once on other
/// replicas; maybe a replicated SYSTEM SYNC REPLICA query before the rename?
if (database->getUUID() != UUIDHelpers::Nil) if (database->getUUID() != UUIDHelpers::Nil)
{ {
if (create.attach && !from_path && create.uuid == UUIDHelpers::Nil) if (create.attach && !from_path && create.uuid == UUIDHelpers::Nil)
@ -1359,51 +1453,16 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
/// Set and retrieve list of columns, indices and constraints. Set table engine if needed. Rewrite query in canonical way. /// Set and retrieve list of columns, indices and constraints. Set table engine if needed. Rewrite query in canonical way.
TableProperties properties = getTablePropertiesAndNormalizeCreateQuery(create, mode); TableProperties properties = getTablePropertiesAndNormalizeCreateQuery(create, mode);
/// Check type compatible for materialized dest table and select columns
if (create.is_materialized_view_with_external_target() && create.select && mode <= LoadingStrictnessLevel::CREATE)
{
if (StoragePtr to_table = DatabaseCatalog::instance().tryGetTable(create.getTargetTableID(ViewTarget::To), getContext()))
{
Block input_block;
if (getContext()->getSettingsRef().allow_experimental_analyzer)
{
input_block = InterpreterSelectQueryAnalyzer::getSampleBlock(create.select->clone(), getContext());
}
else
{
input_block = InterpreterSelectWithUnionQuery(create.select->clone(),
getContext(),
SelectQueryOptions().analyze()).getSampleBlock();
}
Block output_block = to_table->getInMemoryMetadataPtr()->getSampleBlock();
ColumnsWithTypeAndName input_columns;
ColumnsWithTypeAndName output_columns;
for (const auto & input_column : input_block)
{
if (const auto * output_column = output_block.findByName(input_column.name))
{
input_columns.push_back(input_column.cloneEmpty());
output_columns.push_back(output_column->cloneEmpty());
}
}
ActionsDAG::makeConvertingActions(
input_columns,
output_columns,
ActionsDAG::MatchColumnsMode::Position
);
}
}
DatabasePtr database; DatabasePtr database;
bool need_add_to_database = !create.temporary; bool need_add_to_database = !create.temporary;
// In case of an ON CLUSTER query, the database may not be present on the initiator node // In case of an ON CLUSTER query, the database may not be present on the initiator node
if (need_add_to_database) if (need_add_to_database)
database = DatabaseCatalog::instance().tryGetDatabase(database_name); database = DatabaseCatalog::instance().tryGetDatabase(database_name);
/// Check type compatible for materialized dest table and select columns
if (create.select && create.is_materialized_view && mode <= LoadingStrictnessLevel::CREATE)
validateMaterializedViewColumnsAndEngine(create, properties, database);
bool allow_heavy_populate = getContext()->getSettingsRef().database_replicated_allow_heavy_create && create.is_populate; bool allow_heavy_populate = getContext()->getSettingsRef().database_replicated_allow_heavy_create && create.is_populate;
if (!allow_heavy_populate && database && database->getEngineName() == "Replicated" && (create.select || create.is_populate)) if (!allow_heavy_populate && database && database->getEngineName() == "Replicated" && (create.select || create.is_populate))
{ {

View File

@ -90,6 +90,7 @@ private:
IndicesDescription indices; IndicesDescription indices;
ConstraintsDescription constraints; ConstraintsDescription constraints;
ProjectionsDescription projections; ProjectionsDescription projections;
bool columns_inferred_from_select_query = false;
}; };
BlockIO createDatabase(ASTCreateQuery & create); BlockIO createDatabase(ASTCreateQuery & create);
@ -98,6 +99,7 @@ private:
/// Calculate list of columns, constraints, indices, etc... of table. Rewrite query in canonical way. /// Calculate list of columns, constraints, indices, etc... of table. Rewrite query in canonical way.
TableProperties getTablePropertiesAndNormalizeCreateQuery(ASTCreateQuery & create, LoadingStrictnessLevel mode) const; TableProperties getTablePropertiesAndNormalizeCreateQuery(ASTCreateQuery & create, LoadingStrictnessLevel mode) const;
void validateTableStructure(const ASTCreateQuery & create, const TableProperties & properties) const; void validateTableStructure(const ASTCreateQuery & create, const TableProperties & properties) const;
void validateMaterializedViewColumnsAndEngine(const ASTCreateQuery & create, const TableProperties & properties, const DatabasePtr & database);
void setEngine(ASTCreateQuery & create) const; void setEngine(ASTCreateQuery & create) const;
AccessRightsElements getRequiredAccess() const; AccessRightsElements getRequiredAccess() const;

View File

@ -666,6 +666,10 @@ BlockIO InterpreterSystemQuery::execute()
for (const auto & task : getRefreshTasks()) for (const auto & task : getRefreshTasks())
task->run(); task->run();
break; break;
case Type::WAIT_VIEW:
for (const auto & task : getRefreshTasks())
task->wait();
break;
case Type::CANCEL_VIEW: case Type::CANCEL_VIEW:
for (const auto & task : getRefreshTasks()) for (const auto & task : getRefreshTasks())
task->cancel(); task->cancel();
@ -1409,6 +1413,7 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
break; break;
} }
case Type::REFRESH_VIEW: case Type::REFRESH_VIEW:
case Type::WAIT_VIEW:
case Type::START_VIEW: case Type::START_VIEW:
case Type::START_VIEWS: case Type::START_VIEWS:
case Type::STOP_VIEW: case Type::STOP_VIEW:

View File

@ -256,6 +256,8 @@ ASTPtr ASTCreateQuery::clone() const
res->set(res->dictionary, dictionary->clone()); res->set(res->dictionary, dictionary->clone());
} }
if (refresh_strategy)
res->set(res->refresh_strategy, refresh_strategy->clone());
if (as_table_function) if (as_table_function)
res->set(res->as_table_function, as_table_function->clone()); res->set(res->as_table_function, as_table_function->clone());
if (comment) if (comment)

View File

@ -20,7 +20,6 @@ ASTPtr ASTRefreshStrategy::clone() const
res->set(res->settings, settings->clone()); res->set(res->settings, settings->clone());
if (dependencies) if (dependencies)
res->set(res->dependencies, dependencies->clone()); res->set(res->dependencies, dependencies->clone());
res->schedule_kind = schedule_kind;
return res; return res;
} }
@ -66,6 +65,8 @@ void ASTRefreshStrategy::formatImpl(
f_settings.ostr << (f_settings.hilite ? hilite_keyword : "") << " SETTINGS " << (f_settings.hilite ? hilite_none : ""); f_settings.ostr << (f_settings.hilite ? hilite_keyword : "") << " SETTINGS " << (f_settings.hilite ? hilite_none : "");
settings->formatImpl(f_settings, state, frame); settings->formatImpl(f_settings, state, frame);
} }
if (append)
f_settings.ostr << (f_settings.hilite ? hilite_keyword : "") << " APPEND" << (f_settings.hilite ? hilite_none : "");
} }
} }

View File

@ -24,6 +24,7 @@ public:
ASTTimeInterval * offset = nullptr; ASTTimeInterval * offset = nullptr;
ASTTimeInterval * spread = nullptr; ASTTimeInterval * spread = nullptr;
RefreshScheduleKind schedule_kind{RefreshScheduleKind::UNKNOWN}; RefreshScheduleKind schedule_kind{RefreshScheduleKind::UNKNOWN};
bool append = false;
String getID(char) const override { return "Refresh strategy definition"; } String getID(char) const override { return "Refresh strategy definition"; }

View File

@ -141,6 +141,19 @@ public:
QueryKind getQueryKind() const override { return QueryKind::Rename; } QueryKind getQueryKind() const override { return QueryKind::Rename; }
void addElement(const String & from_db, const String & from_table, const String & to_db, const String & to_table)
{
auto identifier = [&](const String & name) -> ASTPtr
{
if (name.empty())
return nullptr;
ASTPtr ast = std::make_shared<ASTIdentifier>(name);
children.push_back(ast);
return ast;
};
elements.push_back(Element {.from = Table {.database = identifier(from_db), .table = identifier(from_table)}, .to = Table {.database = identifier(to_db), .table = identifier(to_table)}});
}
protected: protected:
void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override
{ {

View File

@ -376,6 +376,7 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState & s
case Type::START_VIEW: case Type::START_VIEW:
case Type::STOP_VIEW: case Type::STOP_VIEW:
case Type::CANCEL_VIEW: case Type::CANCEL_VIEW:
case Type::WAIT_VIEW:
{ {
settings.ostr << ' '; settings.ostr << ' ';
print_database_table(); print_database_table();

View File

@ -95,6 +95,7 @@ public:
START_CLEANUP, START_CLEANUP,
RESET_COVERAGE, RESET_COVERAGE,
REFRESH_VIEW, REFRESH_VIEW,
WAIT_VIEW,
START_VIEW, START_VIEW,
START_VIEWS, START_VIEWS,
STOP_VIEW, STOP_VIEW,

View File

@ -2811,8 +2811,8 @@ Action ParserExpressionImpl::tryParseOperator(Layers & layers, IParser::Pos & po
if (op.type == OperatorType::TupleElement) if (op.type == OperatorType::TupleElement)
{ {
ASTPtr tmp; ASTPtr tmp;
if (asterisk_parser.parse(pos, tmp, expected) || if (asterisk_parser.parse(pos, tmp, expected)
columns_matcher_parser.parse(pos, tmp, expected)) || columns_matcher_parser.parse(pos, tmp, expected))
{ {
if (auto * asterisk = tmp->as<ASTAsterisk>()) if (auto * asterisk = tmp->as<ASTAsterisk>())
{ {
@ -2833,6 +2833,17 @@ Action ParserExpressionImpl::tryParseOperator(Layers & layers, IParser::Pos & po
layers.back()->pushOperand(std::move(tmp)); layers.back()->pushOperand(std::move(tmp));
return Action::OPERATOR; return Action::OPERATOR;
} }
/// If it is an identifier,
/// replace it with literal, because an expression `expr().elem`
/// should be transformed to `tupleElement(expr(), 'elem')` for query analysis,
/// otherwise the identifier `elem` will not be found.
if (ParserIdentifier().parse(pos, tmp, expected))
{
layers.back()->pushOperator(op);
layers.back()->pushOperand(std::make_shared<ASTLiteral>(tmp->as<ASTIdentifier>()->name()));
return Action::OPERATOR;
}
} }
/// isNull & isNotNull are postfix unary operators /// isNull & isNotNull are postfix unary operators
@ -2863,7 +2874,7 @@ Action ParserExpressionImpl::tryParseOperator(Layers & layers, IParser::Pos & po
layers.push_back(std::make_unique<ArrayElementLayer>()); layers.push_back(std::make_unique<ArrayElementLayer>());
if (op.type == OperatorType::StartBetween || op.type == OperatorType::StartNotBetween) if (op.type == OperatorType::StartBetween || op.type == OperatorType::StartNotBetween)
layers.back()->between_counter++; ++layers.back()->between_counter;
return Action::OPERAND; return Action::OPERAND;
} }

View File

@ -96,6 +96,10 @@ bool ParserRefreshStrategy::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
return false; return false;
refresh->set(refresh->settings, settings); refresh->set(refresh->settings, settings);
} }
if (ParserKeyword{Keyword::APPEND}.ignore(pos, expected))
refresh->append = true;
node = refresh; node = refresh;
return true; return true;
} }

View File

@ -421,6 +421,7 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
break; break;
case Type::REFRESH_VIEW: case Type::REFRESH_VIEW:
case Type::WAIT_VIEW:
case Type::START_VIEW: case Type::START_VIEW:
case Type::STOP_VIEW: case Type::STOP_VIEW:
case Type::CANCEL_VIEW: case Type::CANCEL_VIEW:

View File

@ -103,7 +103,7 @@ public:
IStorage(const IStorage &) = delete; IStorage(const IStorage &) = delete;
IStorage & operator=(const IStorage &) = delete; IStorage & operator=(const IStorage &) = delete;
/// The main name of the table type (for example, StorageMergeTree). /// The main name of the table type (e.g. Memory, MergeTree, CollapsingMergeTree).
virtual std::string getName() const = 0; virtual std::string getName() const = 0;
/// The name of the table. /// The name of the table.

View File

@ -10,7 +10,7 @@
namespace DB namespace DB
{ {
enum class RefreshState : RefreshTaskStateUnderlying enum class RefreshState
{ {
Disabled = 0, Disabled = 0,
Scheduled, Scheduled,
@ -18,11 +18,11 @@ enum class RefreshState : RefreshTaskStateUnderlying
Running, Running,
}; };
enum class LastRefreshResult : RefreshTaskStateUnderlying enum class LastRefreshResult
{ {
Unknown = 0, Unknown = 0,
Cancelled, Cancelled,
Exception, Error,
Finished Finished
}; };
@ -36,7 +36,8 @@ struct RefreshInfo
UInt64 last_attempt_duration_ms = 0; UInt64 last_attempt_duration_ms = 0;
UInt32 next_refresh_time = 0; UInt32 next_refresh_time = 0;
UInt64 refresh_count = 0; UInt64 refresh_count = 0;
String exception_message; // if last_refresh_result is Exception UInt64 retry = 0;
String exception_message; // if last_refresh_result is Error
std::vector<StorageID> remaining_dependencies; std::vector<StorageID> remaining_dependencies;
ProgressValues progress; ProgressValues progress;
}; };

View File

@ -6,8 +6,10 @@ namespace DB
{ {
#define LIST_OF_REFRESH_SETTINGS(M, ALIAS) \ #define LIST_OF_REFRESH_SETTINGS(M, ALIAS) \
/// TODO: Add settings M(Int64, refresh_retries, 0, "How many times to retry refresh query if it fails. If all attempts fail, wait for the next refresh time according to schedule. 0 to disable retries. -1 for infinite retries.", 0) \
/// M(UInt64, name, 42, "...", 0) M(UInt64, refresh_retry_initial_backoff_ms, 100, "Delay before the first retry if refresh query fails (if refresh_retries setting is not zero). Each subsequent retry doubles the delay, up to refresh_retry_max_backoff_ms.", 0) \
M(UInt64, refresh_retry_max_backoff_ms, 60'000, "Limit on the exponential growth of delay between refresh attempts, if they keep failing and refresh_retries is positive.", 0) \
DECLARE_SETTINGS_TRAITS(RefreshSettingsTraits, LIST_OF_REFRESH_SETTINGS) DECLARE_SETTINGS_TRAITS(RefreshSettingsTraits, LIST_OF_REFRESH_SETTINGS)

View File

@ -1,7 +1,5 @@
#include <Storages/MaterializedView/RefreshTask.h> #include <Storages/MaterializedView/RefreshTask.h>
#include <Storages/StorageMaterializedView.h>
#include <Common/CurrentMetrics.h> #include <Common/CurrentMetrics.h>
#include <Core/Settings.h> #include <Core/Settings.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
@ -11,6 +9,7 @@
#include <Parsers/ASTCreateQuery.h> #include <Parsers/ASTCreateQuery.h>
#include <Processors/Executors/PipelineExecutor.h> #include <Processors/Executors/PipelineExecutor.h>
#include <QueryPipeline/ReadProgressCallback.h> #include <QueryPipeline/ReadProgressCallback.h>
#include <Storages/StorageMaterializedView.h>
namespace CurrentMetrics namespace CurrentMetrics
{ {
@ -24,37 +23,39 @@ namespace ErrorCodes
{ {
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
extern const int QUERY_WAS_CANCELLED; extern const int QUERY_WAS_CANCELLED;
extern const int REFRESH_FAILED;
} }
RefreshTask::RefreshTask( RefreshTask::RefreshTask(
const ASTRefreshStrategy & strategy) StorageMaterializedView * view_, const DB::ASTRefreshStrategy & strategy)
: log(getLogger("RefreshTask")) : log(getLogger("RefreshTask"))
, view(view_)
, refresh_schedule(strategy) , refresh_schedule(strategy)
{} , refresh_append(strategy.append)
{
if (strategy.settings != nullptr)
refresh_settings.applyChanges(strategy.settings->changes);
}
RefreshTaskHolder RefreshTask::create( OwnedRefreshTask RefreshTask::create(
StorageMaterializedView * view,
ContextMutablePtr context, ContextMutablePtr context,
const DB::ASTRefreshStrategy & strategy) const DB::ASTRefreshStrategy & strategy)
{ {
auto task = std::make_shared<RefreshTask>(strategy); auto task = std::make_shared<RefreshTask>(view, strategy);
task->refresh_task = context->getSchedulePool().createTask("MaterializedViewRefresherTask", task->refresh_task = context->getSchedulePool().createTask("RefreshTask",
[self = task->weak_from_this()] [self = task.get()] { self->refreshTask(); });
{
if (auto t = self.lock())
t->refreshTask();
});
if (strategy.dependencies) if (strategy.dependencies)
for (auto && dependency : strategy.dependencies->children) for (auto && dependency : strategy.dependencies->children)
task->initial_dependencies.emplace_back(dependency->as<const ASTTableIdentifier &>()); task->initial_dependencies.emplace_back(dependency->as<const ASTTableIdentifier &>());
return task; return OwnedRefreshTask(task);
} }
void RefreshTask::initializeAndStart(std::shared_ptr<StorageMaterializedView> view) void RefreshTask::initializeAndStart()
{ {
view_to_refresh = view;
if (view->getContext()->getSettingsRef().stop_refreshable_materialized_views_on_startup) if (view->getContext()->getSettingsRef().stop_refreshable_materialized_views_on_startup)
stop_requested = true; stop_requested = true;
view->getContext()->getRefreshSet().emplace(view->getStorageID(), initial_dependencies, shared_from_this()); view->getContext()->getRefreshSet().emplace(view->getStorageID(), initial_dependencies, shared_from_this());
@ -102,7 +103,11 @@ void RefreshTask::alterRefreshParams(const DB::ASTRefreshStrategy & new_strategy
if (arriveDependency(id) && !std::exchange(refresh_immediately, true)) if (arriveDependency(id) && !std::exchange(refresh_immediately, true))
refresh_task->schedule(); refresh_task->schedule();
/// TODO: Update settings once we have them. refresh_settings = {};
if (new_strategy.settings != nullptr)
refresh_settings.applyChanges(new_strategy.settings->changes);
refresh_append = new_strategy.append;
} }
RefreshInfo RefreshTask::getInfo() const RefreshInfo RefreshTask::getInfo() const
@ -111,7 +116,7 @@ RefreshInfo RefreshTask::getInfo() const
auto res = info; auto res = info;
res.view_id = set_handle.getID(); res.view_id = set_handle.getID();
res.remaining_dependencies.assign(remaining_dependencies.begin(), remaining_dependencies.end()); res.remaining_dependencies.assign(remaining_dependencies.begin(), remaining_dependencies.end());
if (res.last_refresh_result != LastRefreshResult::Exception) if (res.last_refresh_result != LastRefreshResult::Error)
res.exception_message.clear(); res.exception_message.clear();
res.progress = progress.getValues(); res.progress = progress.getValues();
return res; return res;
@ -139,6 +144,8 @@ void RefreshTask::run()
std::lock_guard guard(mutex); std::lock_guard guard(mutex);
if (std::exchange(refresh_immediately, true)) if (std::exchange(refresh_immediately, true))
return; return;
next_refresh_prescribed = std::chrono::floor<std::chrono::seconds>(currentTime());
next_refresh_actual = currentTime();
refresh_task->schedule(); refresh_task->schedule();
} }
@ -149,10 +156,22 @@ void RefreshTask::cancel()
refresh_task->schedule(); refresh_task->schedule();
} }
void RefreshTask::wait()
{
std::unique_lock lock(mutex);
refresh_cv.wait(lock, [&] { return info.state != RefreshState::Running && !refresh_immediately; });
if (info.last_refresh_result == LastRefreshResult::Error)
throw Exception(ErrorCodes::REFRESH_FAILED, "Refresh failed: {}", info.exception_message);
}
void RefreshTask::shutdown() void RefreshTask::shutdown()
{ {
{ {
std::lock_guard guard(mutex); std::lock_guard guard(mutex);
if (view == nullptr)
return; // already shut down
stop_requested = true; stop_requested = true;
interruptExecution(); interruptExecution();
} }
@ -166,6 +185,8 @@ void RefreshTask::shutdown()
/// (Also, RefreshSet holds a shared_ptr to us.) /// (Also, RefreshSet holds a shared_ptr to us.)
std::lock_guard guard(mutex); std::lock_guard guard(mutex);
set_handle.reset(); set_handle.reset();
view = nullptr;
} }
void RefreshTask::notify(const StorageID & parent_id, std::chrono::sys_seconds parent_next_prescribed_time) void RefreshTask::notify(const StorageID & parent_id, std::chrono::sys_seconds parent_next_prescribed_time)
@ -232,6 +253,7 @@ void RefreshTask::refreshTask()
chassert(lock.owns_lock()); chassert(lock.owns_lock());
interrupt_execution.store(false); interrupt_execution.store(false);
refresh_cv.notify_all(); // we'll assign info.state before unlocking the mutex
if (stop_requested) if (stop_requested)
{ {
@ -243,7 +265,7 @@ void RefreshTask::refreshTask()
if (!refresh_immediately) if (!refresh_immediately)
{ {
auto now = currentTime(); auto now = currentTime();
if (now >= next_refresh_with_spread) if (now >= next_refresh_actual)
{ {
if (arriveTime()) if (arriveTime())
refresh_immediately = true; refresh_immediately = true;
@ -256,7 +278,7 @@ void RefreshTask::refreshTask()
else else
{ {
size_t delay_ms = std::chrono::duration_cast<std::chrono::milliseconds>( size_t delay_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
next_refresh_with_spread - now).count(); next_refresh_actual - now).count();
/// If we're in a test that fakes the clock, poll every 100ms. /// If we're in a test that fakes the clock, poll every 100ms.
if (fake_clock.load(std::memory_order_relaxed) != INT64_MIN) if (fake_clock.load(std::memory_order_relaxed) != INT64_MIN)
@ -270,19 +292,9 @@ void RefreshTask::refreshTask()
/// Perform a refresh. /// Perform a refresh.
bool append = refresh_append;
refresh_immediately = false; refresh_immediately = false;
auto view = lockView();
if (!view)
{
/// The view was dropped. This RefreshTask should be destroyed soon too.
/// (Maybe this is unreachable.)
info.state = RefreshState::Disabled;
break;
}
info.state = RefreshState::Running; info.state = RefreshState::Running;
CurrentMetrics::Increment metric_inc(CurrentMetrics::RefreshingViews); CurrentMetrics::Increment metric_inc(CurrentMetrics::RefreshingViews);
lock.unlock(); lock.unlock();
@ -293,19 +305,13 @@ void RefreshTask::refreshTask()
try try
{ {
executeRefreshUnlocked(view); executeRefreshUnlocked(append);
refreshed = true; refreshed = true;
} }
catch (...) catch (...)
{ {
if (!interrupt_execution.load()) if (!interrupt_execution.load())
{ exception = getCurrentExceptionMessage(true);
PreformattedMessage message = getCurrentExceptionMessageAndPattern(true);
auto text = message.text;
message.text = fmt::format("Refresh view {} failed: {}", view->getStorageID().getFullTableName(), message.text);
LOG_ERROR(log, message);
exception = text;
}
} }
lock.lock(); lock.lock();
@ -317,18 +323,18 @@ void RefreshTask::refreshTask()
if (exception) if (exception)
{ {
info.last_refresh_result = LastRefreshResult::Exception; info.last_refresh_result = LastRefreshResult::Error;
info.exception_message = *exception; info.exception_message = *exception;
Int64 attempt_number = num_retries + 1;
/// TODO: Do a few retries with exponential backoff. scheduleRetryOrSkipToNextRefresh(now);
advanceNextRefreshTime(now); LOG_ERROR(log, "Refresh view {} failed (attempt {}/{}): {}", view->getStorageID().getFullTableName(), attempt_number, refresh_settings.refresh_retries + 1, *exception);
} }
else if (!refreshed) else if (!refreshed)
{ {
info.last_refresh_result = LastRefreshResult::Cancelled; info.last_refresh_result = LastRefreshResult::Cancelled;
/// Make sure we don't just start another refresh immediately. /// Make sure we don't just start another refresh immediately.
if (!stop_requested && now >= next_refresh_with_spread) if (!stop_requested)
advanceNextRefreshTime(now); advanceNextRefreshTime(now);
} }
else else
@ -361,17 +367,18 @@ void RefreshTask::refreshTask()
} }
} }
void RefreshTask::executeRefreshUnlocked(std::shared_ptr<StorageMaterializedView> view) void RefreshTask::executeRefreshUnlocked(bool append)
{ {
LOG_DEBUG(log, "Refreshing view {}", view->getStorageID().getFullTableName()); LOG_DEBUG(log, "Refreshing view {}", view->getStorageID().getFullTableName());
progress.reset(); progress.reset();
/// Create a table. ContextMutablePtr refresh_context = view->createRefreshContext();
auto [refresh_context, refresh_query] = view->prepareRefresh(); std::optional<StorageID> table_to_drop;
StorageID stale_table = StorageID::createEmpty();
try try
{ {
/// Create a table.
auto refresh_query = view->prepareRefresh(append, refresh_context, table_to_drop);
/// Run the query. /// Run the query.
{ {
CurrentThread::QueryScope query_scope(refresh_context); // create a thread group for the query CurrentThread::QueryScope query_scope(refresh_context); // create a thread group for the query
@ -429,37 +436,55 @@ void RefreshTask::executeRefreshUnlocked(std::shared_ptr<StorageMaterializedView
} }
/// Exchange tables. /// Exchange tables.
stale_table = view->exchangeTargetTable(refresh_query->table_id, refresh_context); if (!append)
table_to_drop = view->exchangeTargetTable(refresh_query->table_id, refresh_context);
} }
catch (...) catch (...)
{ {
try if (table_to_drop.has_value())
{ view->dropTempTable(table_to_drop.value(), refresh_context);
InterpreterDropQuery::executeDropQuery(
ASTDropQuery::Kind::Drop, view->getContext(), refresh_context, refresh_query->table_id, /*sync*/ false, /*ignore_sync_setting*/ true);
}
catch (...)
{
tryLogCurrentException(log, "Failed to drop temporary table after a failed refresh");
/// Let's ignore this and keep going, at risk of accumulating many trash tables if this keeps happening.
}
throw; throw;
} }
/// Drop the old table (outside the try-catch so we don't try to drop the other table if this fails). if (table_to_drop.has_value())
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, view->getContext(), refresh_context, stale_table, /*sync*/ true, /*ignore_sync_setting*/ true); view->dropTempTable(table_to_drop.value(), refresh_context);
} }
void RefreshTask::advanceNextRefreshTime(std::chrono::system_clock::time_point now) void RefreshTask::advanceNextRefreshTime(std::chrono::system_clock::time_point now)
{ {
std::chrono::sys_seconds next = refresh_schedule.prescribeNext(next_refresh_prescribed, now); std::chrono::sys_seconds next = refresh_schedule.prescribeNext(next_refresh_prescribed, now);
next_refresh_prescribed = next; next_refresh_prescribed = next;
next_refresh_with_spread = refresh_schedule.addRandomSpread(next); next_refresh_actual = refresh_schedule.addRandomSpread(next);
auto secs = std::chrono::floor<std::chrono::seconds>(next_refresh_with_spread); num_retries = 0;
info.retry = num_retries;
auto secs = std::chrono::floor<std::chrono::seconds>(next_refresh_actual);
info.next_refresh_time = UInt32(secs.time_since_epoch().count()); info.next_refresh_time = UInt32(secs.time_since_epoch().count());
} }
void RefreshTask::scheduleRetryOrSkipToNextRefresh(std::chrono::system_clock::time_point now)
{
if (refresh_settings.refresh_retries >= 0 && num_retries >= refresh_settings.refresh_retries)
{
advanceNextRefreshTime(now);
return;
}
num_retries += 1;
info.retry = num_retries;
UInt64 delay_ms;
UInt64 multiplier = UInt64(1) << std::min(num_retries - 1, Int64(62));
/// Overflow check: a*b <= c iff a <= c/b iff a <= floor(c/b).
if (refresh_settings.refresh_retry_initial_backoff_ms <= refresh_settings.refresh_retry_max_backoff_ms / multiplier)
delay_ms = refresh_settings.refresh_retry_initial_backoff_ms * multiplier;
else
delay_ms = refresh_settings.refresh_retry_max_backoff_ms;
next_refresh_actual = now + std::chrono::milliseconds(delay_ms);
}
bool RefreshTask::arriveDependency(const StorageID & parent) bool RefreshTask::arriveDependency(const StorageID & parent)
{ {
remaining_dependencies.erase(parent); remaining_dependencies.erase(parent);
@ -500,11 +525,6 @@ void RefreshTask::interruptExecution()
} }
} }
std::shared_ptr<StorageMaterializedView> RefreshTask::lockView()
{
return std::static_pointer_cast<StorageMaterializedView>(view_to_refresh.lock());
}
std::chrono::system_clock::time_point RefreshTask::currentTime() const std::chrono::system_clock::time_point RefreshTask::currentTime() const
{ {
Int64 fake = fake_clock.load(std::memory_order::relaxed); Int64 fake = fake_clock.load(std::memory_order::relaxed);

View File

@ -17,19 +17,21 @@ class PipelineExecutor;
class StorageMaterializedView; class StorageMaterializedView;
class ASTRefreshStrategy; class ASTRefreshStrategy;
struct OwnedRefreshTask;
class RefreshTask : public std::enable_shared_from_this<RefreshTask> class RefreshTask : public std::enable_shared_from_this<RefreshTask>
{ {
public: public:
/// Never call it manually, public for shared_ptr construction only /// Never call it manually, public for shared_ptr construction only
explicit RefreshTask(const ASTRefreshStrategy & strategy); RefreshTask(StorageMaterializedView * view_, const ASTRefreshStrategy & strategy);
/// The only proper way to construct task /// The only proper way to construct task
static RefreshTaskHolder create( static OwnedRefreshTask create(
StorageMaterializedView * view,
ContextMutablePtr context, ContextMutablePtr context,
const DB::ASTRefreshStrategy & strategy); const DB::ASTRefreshStrategy & strategy);
void initializeAndStart(std::shared_ptr<StorageMaterializedView> view); void initializeAndStart(); // called at most once
/// Call when renaming the materialized view. /// Call when renaming the materialized view.
void rename(StorageID new_id); void rename(StorageID new_id);
@ -51,7 +53,14 @@ public:
/// Cancel task execution /// Cancel task execution
void cancel(); void cancel();
/// Waits for the currently running refresh attempt to complete.
/// If the refresh fails, throws an exception.
/// If no refresh is running, completes immediately, throwing an exception if previous refresh failed.
void wait();
/// Permanently disable task scheduling and remove this table from RefreshSet. /// Permanently disable task scheduling and remove this table from RefreshSet.
/// Ok to call multiple times, but not in parallel.
/// Ok to call even if initializeAndStart() wasn't called or failed.
void shutdown(); void shutdown();
/// Notify dependent task /// Notify dependent task
@ -65,7 +74,7 @@ public:
private: private:
LoggerPtr log = nullptr; LoggerPtr log = nullptr;
std::weak_ptr<IStorage> view_to_refresh; StorageMaterializedView * view;
/// Protects interrupt_execution and running_executor. /// Protects interrupt_execution and running_executor.
/// Can be locked while holding `mutex`. /// Can be locked while holding `mutex`.
@ -82,8 +91,10 @@ private:
mutable std::mutex mutex; mutable std::mutex mutex;
RefreshSchedule refresh_schedule; RefreshSchedule refresh_schedule;
RefreshSettings refresh_settings; // TODO: populate, use, update on alter RefreshSettings refresh_settings;
std::vector<StorageID> initial_dependencies; std::vector<StorageID> initial_dependencies;
bool refresh_append;
RefreshSet::Handle set_handle; RefreshSet::Handle set_handle;
/// StorageIDs of our dependencies that we're waiting for. /// StorageIDs of our dependencies that we're waiting for.
@ -112,7 +123,8 @@ private:
/// E.g. for REFRESH EVERY 1 DAY, yesterday's refresh of the dependency shouldn't trigger today's /// E.g. for REFRESH EVERY 1 DAY, yesterday's refresh of the dependency shouldn't trigger today's
/// refresh of the dependent even if it happened today (e.g. it was slow or had random spread > 1 day). /// refresh of the dependent even if it happened today (e.g. it was slow or had random spread > 1 day).
std::chrono::sys_seconds next_refresh_prescribed; std::chrono::sys_seconds next_refresh_prescribed;
std::chrono::system_clock::time_point next_refresh_with_spread; std::chrono::system_clock::time_point next_refresh_actual;
Int64 num_retries = 0;
/// Calls refreshTask() from background thread. /// Calls refreshTask() from background thread.
BackgroundSchedulePool::TaskHolder refresh_task; BackgroundSchedulePool::TaskHolder refresh_task;
@ -123,6 +135,7 @@ private:
/// Just for observability. /// Just for observability.
RefreshInfo info; RefreshInfo info;
Progress progress; Progress progress;
std::condition_variable refresh_cv; // notified when info.state changes
/// The main loop of the refresh task. It examines the state, sees what needs to be /// The main loop of the refresh task. It examines the state, sees what needs to be
/// done and does it. If there's nothing to do at the moment, returns; it's then scheduled again, /// done and does it. If there's nothing to do at the moment, returns; it's then scheduled again,
@ -134,11 +147,14 @@ private:
/// Perform an actual refresh: create new table, run INSERT SELECT, exchange tables, drop old table. /// Perform an actual refresh: create new table, run INSERT SELECT, exchange tables, drop old table.
/// Mutex must be unlocked. Called only from refresh_task. /// Mutex must be unlocked. Called only from refresh_task.
void executeRefreshUnlocked(std::shared_ptr<StorageMaterializedView> view); void executeRefreshUnlocked(bool append);
/// Assigns next_refresh_* /// Assigns next_refresh_*
void advanceNextRefreshTime(std::chrono::system_clock::time_point now); void advanceNextRefreshTime(std::chrono::system_clock::time_point now);
/// Either advances next_refresh_actual using exponential backoff or does advanceNextRefreshTime().
void scheduleRetryOrSkipToNextRefresh(std::chrono::system_clock::time_point now);
/// Returns true if all dependencies are fulfilled now. Refills remaining_dependencies in this case. /// Returns true if all dependencies are fulfilled now. Refills remaining_dependencies in this case.
bool arriveDependency(const StorageID & parent); bool arriveDependency(const StorageID & parent);
bool arriveTime(); bool arriveTime();
@ -146,9 +162,24 @@ private:
void interruptExecution(); void interruptExecution();
std::shared_ptr<StorageMaterializedView> lockView();
std::chrono::system_clock::time_point currentTime() const; std::chrono::system_clock::time_point currentTime() const;
}; };
/// Wrapper around shared_ptr<RefreshTask>, calls shutdown() in destructor.
struct OwnedRefreshTask
{
RefreshTaskHolder ptr;
OwnedRefreshTask() = default;
explicit OwnedRefreshTask(RefreshTaskHolder p) : ptr(std::move(p)) {}
OwnedRefreshTask(OwnedRefreshTask &&) = default;
OwnedRefreshTask & operator=(OwnedRefreshTask &&) = default;
~OwnedRefreshTask() { if (ptr) ptr->shutdown(); }
RefreshTask* operator->() const { return ptr.get(); }
RefreshTask& operator*() const { return *ptr; }
explicit operator bool() const { return ptr != nullptr; }
};
} }

View File

@ -8,9 +8,7 @@ namespace DB
class RefreshTask; class RefreshTask;
using RefreshTaskStateUnderlying = UInt8;
using RefreshTaskHolder = std::shared_ptr<RefreshTask>; using RefreshTaskHolder = std::shared_ptr<RefreshTask>;
using RefreshTaskObserver = std::weak_ptr<RefreshTask>;
using RefreshTaskList = std::list<RefreshTaskHolder>; using RefreshTaskList = std::list<RefreshTaskHolder>;
} }

View File

@ -519,11 +519,20 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::execute()
bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl() bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl()
{ {
Block block; Stopwatch watch(CLOCK_MONOTONIC_COARSE);
if (!ctx->is_cancelled() && (global_ctx->merging_executor->pull(block))) UInt64 step_time_ms = global_ctx->data->getSettings()->background_task_preferred_step_execution_time_ms.totalMilliseconds();
{
global_ctx->rows_written += block.rows();
do
{
Block block;
if (ctx->is_cancelled() || !global_ctx->merging_executor->pull(block))
{
finalize();
return false;
}
global_ctx->rows_written += block.rows();
const_cast<MergedBlockOutputStream &>(*global_ctx->to).write(block); const_cast<MergedBlockOutputStream &>(*global_ctx->to).write(block);
UInt64 result_rows = 0; UInt64 result_rows = 0;
@ -543,11 +552,14 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl()
global_ctx->space_reservation->update(static_cast<size_t>((1. - progress) * ctx->initial_reservation)); global_ctx->space_reservation->update(static_cast<size_t>((1. - progress) * ctx->initial_reservation));
} }
} while (watch.elapsedMilliseconds() < step_time_ms);
/// Need execute again /// Need execute again
return true; return true;
} }
void MergeTask::ExecuteAndFinalizeHorizontalPart::finalize() const
{
global_ctx->merging_executor.reset(); global_ctx->merging_executor.reset();
global_ctx->merged_pipeline.reset(); global_ctx->merged_pipeline.reset();
@ -557,14 +569,10 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl()
if (ctx->need_remove_expired_values && global_ctx->ttl_merges_blocker->isCancelled()) if (ctx->need_remove_expired_values && global_ctx->ttl_merges_blocker->isCancelled())
throw Exception(ErrorCodes::ABORTED, "Cancelled merging parts with expired TTL"); throw Exception(ErrorCodes::ABORTED, "Cancelled merging parts with expired TTL");
const auto data_settings = global_ctx->data->getSettings();
const size_t sum_compressed_bytes_upper_bound = global_ctx->merge_list_element_ptr->total_size_bytes_compressed; const size_t sum_compressed_bytes_upper_bound = global_ctx->merge_list_element_ptr->total_size_bytes_compressed;
ctx->need_sync = needSyncPart(ctx->sum_input_rows_upper_bound, sum_compressed_bytes_upper_bound, *data_settings); ctx->need_sync = needSyncPart(ctx->sum_input_rows_upper_bound, sum_compressed_bytes_upper_bound, *global_ctx->data->getSettings());
return false;
} }
bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const
{ {
/// No need to execute this part if it is horizontal merge. /// No need to execute this part if it is horizontal merge.
@ -741,17 +749,24 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
bool MergeTask::VerticalMergeStage::executeVerticalMergeForOneColumn() const bool MergeTask::VerticalMergeStage::executeVerticalMergeForOneColumn() const
{ {
Block block; Stopwatch watch(CLOCK_MONOTONIC_COARSE);
if (!global_ctx->merges_blocker->isCancelled() && !global_ctx->merge_list_element_ptr->is_cancelled.load(std::memory_order_relaxed) UInt64 step_time_ms = global_ctx->data->getSettings()->background_task_preferred_step_execution_time_ms.totalMilliseconds();
&& ctx->executor->pull(block))
do
{ {
Block block;
if (global_ctx->merges_blocker->isCancelled()
|| global_ctx->merge_list_element_ptr->is_cancelled.load(std::memory_order_relaxed)
|| !ctx->executor->pull(block))
return false;
ctx->column_elems_written += block.rows(); ctx->column_elems_written += block.rows();
ctx->column_to->write(block); ctx->column_to->write(block);
} while (watch.elapsedMilliseconds() < step_time_ms);
/// Need execute again /// Need execute again
return true; return true;
}
return false;
} }

View File

@ -254,6 +254,7 @@ private:
bool prepare(); bool prepare();
bool executeImpl(); bool executeImpl();
void finalize() const;
/// NOTE: Using pointer-to-member instead of std::function and lambda makes stacktraces much more concise and readable /// NOTE: Using pointer-to-member instead of std::function and lambda makes stacktraces much more concise and readable
using ExecuteAndFinalizeHorizontalPartSubtasks = std::array<bool(ExecuteAndFinalizeHorizontalPart::*)(), 2>; using ExecuteAndFinalizeHorizontalPartSubtasks = std::array<bool(ExecuteAndFinalizeHorizontalPart::*)(), 2>;

View File

@ -84,6 +84,7 @@ struct Settings;
M(Bool, exclude_deleted_rows_for_part_size_in_merge, false, "Use an estimated source part size (excluding lightweight deleted rows) when selecting parts to merge", 0) \ M(Bool, exclude_deleted_rows_for_part_size_in_merge, false, "Use an estimated source part size (excluding lightweight deleted rows) when selecting parts to merge", 0) \
M(String, merge_workload, "", "Name of workload to be used to access resources for merges", 0) \ M(String, merge_workload, "", "Name of workload to be used to access resources for merges", 0) \
M(String, mutation_workload, "", "Name of workload to be used to access resources for mutations", 0) \ M(String, mutation_workload, "", "Name of workload to be used to access resources for mutations", 0) \
M(Milliseconds, background_task_preferred_step_execution_time_ms, 50, "Target time to execution of one step of merge or mutation. Can be exceeded if one step takes longer time", 0) \
\ \
/** Inserts settings. */ \ /** Inserts settings. */ \
M(UInt64, parts_to_delay_insert, 1000, "If table contains at least that many active parts in single partition, artificially slow down insert into table. Disabled if set to 0", 0) \ M(UInt64, parts_to_delay_insert, 1000, "If table contains at least that many active parts in single partition, artificially slow down insert into table. Disabled if set to 0", 0) \

View File

@ -1257,6 +1257,8 @@ public:
private: private:
void prepare(); void prepare();
bool mutateOriginalPartAndPrepareProjections(); bool mutateOriginalPartAndPrepareProjections();
void writeTempProjectionPart(size_t projection_idx, Chunk chunk);
void finalizeTempProjections();
bool iterateThroughAllProjections(); bool iterateThroughAllProjections();
void constructTaskForProjectionPartsMerge(); void constructTaskForProjectionPartsMerge();
void finalize(); void finalize();
@ -1307,10 +1309,22 @@ void PartMergerWriter::prepare()
bool PartMergerWriter::mutateOriginalPartAndPrepareProjections() bool PartMergerWriter::mutateOriginalPartAndPrepareProjections()
{ {
Block cur_block; Stopwatch watch(CLOCK_MONOTONIC_COARSE);
Block projection_header; UInt64 step_time_ms = ctx->data->getSettings()->background_task_preferred_step_execution_time_ms.totalMilliseconds();
if (MutationHelpers::checkOperationIsNotCanceled(*ctx->merges_blocker, ctx->mutate_entry) && ctx->mutating_executor->pull(cur_block))
do
{ {
Block cur_block;
Block projection_header;
MutationHelpers::checkOperationIsNotCanceled(*ctx->merges_blocker, ctx->mutate_entry);
if (!ctx->mutating_executor->pull(cur_block))
{
finalizeTempProjections();
return false;
}
if (ctx->minmax_idx) if (ctx->minmax_idx)
ctx->minmax_idx->update(cur_block, MergeTreeData::getMinMaxColumnsNames(ctx->metadata_snapshot->getPartitionKey())); ctx->minmax_idx->update(cur_block, MergeTreeData::getMinMaxColumnsNames(ctx->metadata_snapshot->getPartitionKey()));
@ -1322,46 +1336,56 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections()
for (size_t i = 0, size = ctx->projections_to_build.size(); i < size; ++i) for (size_t i = 0, size = ctx->projections_to_build.size(); i < size; ++i)
{ {
const auto & projection = *ctx->projections_to_build[i]; Chunk squashed_chunk;
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::MutateTaskProjectionsCalculationMicroseconds);
Block block_to_squash = projection.calculate(cur_block, ctx->context);
projection_squashes[i].setHeader(block_to_squash.cloneEmpty());
Chunk squashed_chunk = Squashing::squash(projection_squashes[i].add({block_to_squash.getColumns(), block_to_squash.rows()}));
if (squashed_chunk)
{ {
auto result = projection_squashes[i].getHeader().cloneWithColumns(squashed_chunk.detachColumns()); ProfileEventTimeIncrement<Microseconds> projection_watch(ProfileEvents::MutateTaskProjectionsCalculationMicroseconds);
auto tmp_part = MergeTreeDataWriter::writeTempProjectionPart( Block block_to_squash = ctx->projections_to_build[i]->calculate(cur_block, ctx->context);
*ctx->data, ctx->log, result, projection, ctx->new_data_part.get(), ++block_num);
tmp_part.finalize(); projection_squashes[i].setHeader(block_to_squash.cloneEmpty());
tmp_part.part->getDataPartStorage().commitTransaction(); squashed_chunk = Squashing::squash(projection_squashes[i].add({block_to_squash.getColumns(), block_to_squash.rows()}));
projection_parts[projection.name].emplace_back(std::move(tmp_part.part));
} }
if (squashed_chunk)
writeTempProjectionPart(i, std::move(squashed_chunk));
} }
(*ctx->mutate_entry)->rows_written += cur_block.rows(); (*ctx->mutate_entry)->rows_written += cur_block.rows();
(*ctx->mutate_entry)->bytes_written_uncompressed += cur_block.bytes(); (*ctx->mutate_entry)->bytes_written_uncompressed += cur_block.bytes();
} while (watch.elapsedMilliseconds() < step_time_ms);
/// Need execute again /// Need execute again
return true; return true;
} }
void PartMergerWriter::writeTempProjectionPart(size_t projection_idx, Chunk chunk)
{
const auto & projection = *ctx->projections_to_build[projection_idx];
const auto & projection_plan = projection_squashes[projection_idx];
auto result = projection_plan.getHeader().cloneWithColumns(chunk.detachColumns());
auto tmp_part = MergeTreeDataWriter::writeTempProjectionPart(
*ctx->data,
ctx->log,
result,
projection,
ctx->new_data_part.get(),
++block_num);
tmp_part.finalize();
tmp_part.part->getDataPartStorage().commitTransaction();
projection_parts[projection.name].emplace_back(std::move(tmp_part.part));
}
void PartMergerWriter::finalizeTempProjections()
{
// Write the last block // Write the last block
for (size_t i = 0, size = ctx->projections_to_build.size(); i < size; ++i) for (size_t i = 0, size = ctx->projections_to_build.size(); i < size; ++i)
{ {
const auto & projection = *ctx->projections_to_build[i]; auto squashed_chunk = Squashing::squash(projection_squashes[i].flush());
auto & projection_squash_plan = projection_squashes[i];
auto squashed_chunk = Squashing::squash(projection_squash_plan.flush());
if (squashed_chunk) if (squashed_chunk)
{ writeTempProjectionPart(i, std::move(squashed_chunk));
auto result = projection_squash_plan.getHeader().cloneWithColumns(squashed_chunk.detachColumns());
auto temp_part = MergeTreeDataWriter::writeTempProjectionPart(
*ctx->data, ctx->log, result, projection, ctx->new_data_part.get(), ++block_num);
temp_part.finalize();
temp_part.part->getDataPartStorage().commitTransaction();
projection_parts[projection.name].emplace_back(std::move(temp_part.part));
}
} }
projection_parts_iterator = std::make_move_iterator(projection_parts.begin()); projection_parts_iterator = std::make_move_iterator(projection_parts.begin());
@ -1369,12 +1393,8 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections()
/// Maybe there are no projections ? /// Maybe there are no projections ?
if (projection_parts_iterator != std::make_move_iterator(projection_parts.end())) if (projection_parts_iterator != std::make_move_iterator(projection_parts.end()))
constructTaskForProjectionPartsMerge(); constructTaskForProjectionPartsMerge();
/// Let's move on to the next stage
return false;
} }
void PartMergerWriter::constructTaskForProjectionPartsMerge() void PartMergerWriter::constructTaskForProjectionPartsMerge()
{ {
auto && [name, parts] = *projection_parts_iterator; auto && [name, parts] = *projection_parts_iterator;

View File

@ -3,6 +3,8 @@
#include <Storages/MaterializedView/RefreshTask.h> #include <Storages/MaterializedView/RefreshTask.h>
#include <Parsers/ASTSelectWithUnionQuery.h> #include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTCreateQuery.h> #include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h> #include <Parsers/ASTFunction.h>
@ -14,6 +16,7 @@
#include <Interpreters/InterpreterInsertQuery.h> #include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/InterpreterRenameQuery.h> #include <Interpreters/InterpreterRenameQuery.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h> #include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Interpreters/getHeaderForProcessingStage.h> #include <Interpreters/getHeaderForProcessingStage.h>
#include <Interpreters/getTableExpressions.h> #include <Interpreters/getTableExpressions.h>
@ -146,6 +149,13 @@ StorageMaterializedView::StorageMaterializedView(
if (point_to_itself_by_uuid || point_to_itself_by_name) if (point_to_itself_by_uuid || point_to_itself_by_name)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Materialized view {} cannot point to itself", table_id_.getFullTableName()); throw Exception(ErrorCodes::BAD_ARGUMENTS, "Materialized view {} cannot point to itself", table_id_.getFullTableName());
if (query.refresh_strategy)
{
fixed_uuid = false;
refresher = RefreshTask::create(this, getContext(), *query.refresh_strategy);
refresh_on_start = mode < LoadingStrictnessLevel::ATTACH && !query.is_create_empty;
}
if (!has_inner_table) if (!has_inner_table)
{ {
target_table_id = to_table_id; target_table_id = to_table_id;
@ -198,15 +208,6 @@ StorageMaterializedView::StorageMaterializedView(
target_table_id = DatabaseCatalog::instance().getTable({manual_create_query->getDatabase(), manual_create_query->getTable()}, getContext())->getStorageID(); target_table_id = DatabaseCatalog::instance().getTable({manual_create_query->getDatabase(), manual_create_query->getTable()}, getContext())->getStorageID();
} }
if (query.refresh_strategy)
{
fixed_uuid = false;
refresher = RefreshTask::create(
getContext(),
*query.refresh_strategy);
refresh_on_start = mode < LoadingStrictnessLevel::ATTACH && !query.is_create_empty;
}
} }
QueryProcessingStage::Enum StorageMaterializedView::getQueryProcessingStage( QueryProcessingStage::Enum StorageMaterializedView::getQueryProcessingStage(
@ -377,44 +378,71 @@ bool StorageMaterializedView::optimize(
return storage_ptr->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, cleanup, local_context); return storage_ptr->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, cleanup, local_context);
} }
std::tuple<ContextMutablePtr, std::shared_ptr<ASTInsertQuery>> StorageMaterializedView::prepareRefresh() const ContextMutablePtr StorageMaterializedView::createRefreshContext() const
{ {
auto refresh_context = getInMemoryMetadataPtr()->getSQLSecurityOverriddenContext(getContext()); auto refresh_context = getInMemoryMetadataPtr()->getSQLSecurityOverriddenContext(getContext());
refresh_context->setQueryKind(ClientInfo::QueryKind::INITIAL_QUERY);
/// Generate a random query id. /// Generate a random query id.
refresh_context->setCurrentQueryId(""); refresh_context->setCurrentQueryId("");
/// TODO: Set view's definer as the current user in refresh_context, so that the correct user's
/// quotas and permissions apply for this query.
return refresh_context;
}
CurrentThread::QueryScope query_scope(refresh_context); std::shared_ptr<ASTInsertQuery> StorageMaterializedView::prepareRefresh(bool append, ContextMutablePtr refresh_context, std::optional<StorageID> & out_temp_table_id) const
{
auto inner_table_id = getTargetTableId(); auto inner_table_id = getTargetTableId();
auto new_table_name = ".tmp" + generateInnerTableName(getStorageID()); StorageID target_table = inner_table_id;
auto db = DatabaseCatalog::instance().getDatabase(inner_table_id.database_name); if (!append)
{
CurrentThread::QueryScope query_scope(refresh_context);
auto create_table_query = db->getCreateTableQuery(inner_table_id.table_name, getContext()); auto db = DatabaseCatalog::instance().getDatabase(inner_table_id.database_name);
auto & create_query = create_table_query->as<ASTCreateQuery &>(); String db_name = db->getDatabaseName();
create_query.setTable(new_table_name); auto new_table_name = ".tmp" + generateInnerTableName(getStorageID());
create_query.setDatabase(db->getDatabaseName());
create_query.create_or_replace = true;
create_query.replace_table = true;
create_query.uuid = UUIDHelpers::Nil;
InterpreterCreateQuery create_interpreter(create_table_query, refresh_context); auto create_table_query = db->getCreateTableQuery(inner_table_id.table_name, getContext());
create_interpreter.setInternal(true); auto & create_query = create_table_query->as<ASTCreateQuery &>();
create_interpreter.execute(); create_query.setTable(new_table_name);
create_query.setDatabase(db->getDatabaseName());
create_query.create_or_replace = true;
create_query.replace_table = true;
create_query.uuid = UUIDHelpers::Nil;
StorageID fresh_table = DatabaseCatalog::instance().getTable({create_query.getDatabase(), create_query.getTable()}, getContext())->getStorageID(); InterpreterCreateQuery create_interpreter(create_table_query, refresh_context);
create_interpreter.setInternal(true);
create_interpreter.execute();
target_table = DatabaseCatalog::instance().getTable({db_name, new_table_name}, getContext())->getStorageID();
out_temp_table_id = target_table;
}
auto insert_query = std::make_shared<ASTInsertQuery>(); auto insert_query = std::make_shared<ASTInsertQuery>();
insert_query->select = getInMemoryMetadataPtr()->getSelectQuery().select_query; insert_query->select = getInMemoryMetadataPtr()->getSelectQuery().select_query;
insert_query->setTable(fresh_table.table_name); insert_query->setTable(target_table.table_name);
insert_query->setDatabase(fresh_table.database_name); insert_query->setDatabase(target_table.database_name);
insert_query->table_id = fresh_table; insert_query->table_id = target_table;
return {refresh_context, insert_query}; Block header;
if (refresh_context->getSettingsRef().allow_experimental_analyzer)
header = InterpreterSelectQueryAnalyzer::getSampleBlock(insert_query->select, refresh_context);
else
header = InterpreterSelectWithUnionQuery(insert_query->select, refresh_context, SelectQueryOptions()).getSampleBlock();
auto columns = std::make_shared<ASTExpressionList>(',');
for (const String & name : header.getNames())
columns->children.push_back(std::make_shared<ASTIdentifier>(name));
insert_query->columns = std::move(columns);
return insert_query;
} }
StorageID StorageMaterializedView::exchangeTargetTable(StorageID fresh_table, ContextPtr refresh_context) StorageID StorageMaterializedView::exchangeTargetTable(StorageID fresh_table, ContextPtr refresh_context)
{ {
/// Known problem: if the target table was ALTERed during refresh, this will effectively revert
/// the ALTER.
auto stale_table_id = getTargetTableId(); auto stale_table_id = getTargetTableId();
auto db = DatabaseCatalog::instance().getDatabase(stale_table_id.database_name); auto db = DatabaseCatalog::instance().getDatabase(stale_table_id.database_name);
@ -422,15 +450,40 @@ StorageID StorageMaterializedView::exchangeTargetTable(StorageID fresh_table, Co
CurrentThread::QueryScope query_scope(refresh_context); CurrentThread::QueryScope query_scope(refresh_context);
target_db->renameTable( auto rename_query = std::make_shared<ASTRenameQuery>();
refresh_context, fresh_table.table_name, *db, stale_table_id.table_name, /*exchange=*/true, /*dictionary=*/false); rename_query->exchange = true;
rename_query->addElement(fresh_table.database_name, fresh_table.table_name, stale_table_id.database_name, stale_table_id.table_name);
InterpreterRenameQuery(rename_query, refresh_context).execute();
std::swap(stale_table_id.database_name, fresh_table.database_name); std::swap(stale_table_id.database_name, fresh_table.database_name);
std::swap(stale_table_id.table_name, fresh_table.table_name); std::swap(stale_table_id.table_name, fresh_table.table_name);
setTargetTableId(std::move(fresh_table)); setTargetTableId(std::move(fresh_table));
return stale_table_id; return stale_table_id;
} }
void StorageMaterializedView::dropTempTable(StorageID table_id, ContextMutablePtr refresh_context)
{
CurrentThread::QueryScope query_scope(refresh_context);
try
{
auto drop_query = std::make_shared<ASTDropQuery>();
drop_query->setDatabase(table_id.database_name);
drop_query->setTable(table_id.table_name);
drop_query->kind = ASTDropQuery::Kind::Drop;
drop_query->if_exists = true;
drop_query->sync = false;
InterpreterDropQuery(drop_query, refresh_context).execute();
}
catch (...)
{
tryLogCurrentException(&Poco::Logger::get("StorageMaterializedView"), "Failed to drop temporary table after refresh");
}
}
void StorageMaterializedView::alter( void StorageMaterializedView::alter(
const AlterCommands & params, const AlterCommands & params,
ContextPtr local_context, ContextPtr local_context,
@ -530,25 +583,11 @@ void StorageMaterializedView::renameInMemory(const StorageID & new_table_id)
{ {
auto new_target_table_name = generateInnerTableName(new_table_id); auto new_target_table_name = generateInnerTableName(new_table_id);
ASTRenameQuery::Elements rename_elements;
assert(inner_table_id.database_name == old_table_id.database_name); assert(inner_table_id.database_name == old_table_id.database_name);
ASTRenameQuery::Element elem auto rename = std::make_shared<ASTRenameQuery>();
{ rename->addElement(inner_table_id.database_name, inner_table_id.table_name, new_table_id.database_name, new_target_table_name);
ASTRenameQuery::Table
{
inner_table_id.database_name.empty() ? nullptr : std::make_shared<ASTIdentifier>(inner_table_id.database_name),
std::make_shared<ASTIdentifier>(inner_table_id.table_name)
},
ASTRenameQuery::Table
{
new_table_id.database_name.empty() ? nullptr : std::make_shared<ASTIdentifier>(new_table_id.database_name),
std::make_shared<ASTIdentifier>(new_target_table_name)
}
};
rename_elements.emplace_back(std::move(elem));
auto rename = std::make_shared<ASTRenameQuery>(std::move(rename_elements));
InterpreterRenameQuery(rename, getContext()).execute(); InterpreterRenameQuery(rename, getContext()).execute();
updateTargetTableId(new_table_id.database_name, new_target_table_name); updateTargetTableId(new_table_id.database_name, new_target_table_name);
} }
@ -576,7 +615,7 @@ void StorageMaterializedView::startup()
if (refresher) if (refresher)
{ {
refresher->initializeAndStart(std::static_pointer_cast<StorageMaterializedView>(shared_from_this())); refresher->initializeAndStart();
if (refresh_on_start) if (refresh_on_start)
refresher->run(); refresher->run();

View File

@ -5,7 +5,7 @@
#include <Storages/IStorage.h> #include <Storages/IStorage.h>
#include <Storages/StorageInMemoryMetadata.h> #include <Storages/StorageInMemoryMetadata.h>
#include <Storages/MaterializedView/RefreshTask_fwd.h> #include <Storages/MaterializedView/RefreshTask.h>
namespace DB namespace DB
{ {
@ -106,7 +106,7 @@ private:
/// Will be initialized in constructor /// Will be initialized in constructor
StorageID target_table_id = StorageID::createEmpty(); StorageID target_table_id = StorageID::createEmpty();
RefreshTaskHolder refresher; OwnedRefreshTask refresher;
bool refresh_on_start = false; bool refresh_on_start = false;
bool has_inner_table = false; bool has_inner_table = false;
@ -119,10 +119,14 @@ private:
void checkStatementCanBeForwarded() const; void checkStatementCanBeForwarded() const;
/// Prepare to refresh a refreshable materialized view: create query context, create temporary ContextMutablePtr createRefreshContext() const;
/// table, form the insert-select query. /// Prepare to refresh a refreshable materialized view: create temporary table and form the
std::tuple<ContextMutablePtr, std::shared_ptr<ASTInsertQuery>> prepareRefresh() const; /// insert-select query.
/// out_temp_table_id may be assigned before throwing an exception, in which case the caller
/// must drop the temp table before rethrowing.
std::shared_ptr<ASTInsertQuery> prepareRefresh(bool append, ContextMutablePtr refresh_context, std::optional<StorageID> & out_temp_table_id) const;
StorageID exchangeTargetTable(StorageID fresh_table, ContextPtr refresh_context); StorageID exchangeTargetTable(StorageID fresh_table, ContextPtr refresh_context);
void dropTempTable(StorageID table, ContextMutablePtr refresh_context);
void setTargetTableId(StorageID id); void setTargetTableId(StorageID id);
void updateTargetTableId(std::optional<String> database_name, std::optional<String> table_name); void updateTargetTableId(std::optional<String> database_name, std::optional<String> table_name);

View File

@ -800,7 +800,8 @@ void StorageReplicatedMergeTree::createNewZooKeeperNodes()
{ {
auto res = future.get(); auto res = future.get();
if (res.error != Coordination::Error::ZOK && res.error != Coordination::Error::ZNODEEXISTS) if (res.error != Coordination::Error::ZOK && res.error != Coordination::Error::ZNODEEXISTS)
throw Coordination::Exception(res.error, "Failed to create new nodes {} at {}", res.path_created, zookeeper_path); throw Coordination::Exception(res.error, "Failed to create new nodes {} at {} with error {}",
res.path_created, zookeeper_path, Coordination::errorMessage(res.error));
} }
} }

View File

@ -34,8 +34,9 @@ ColumnsDescription StorageSystemViewRefreshes::getColumnsDescription()
"If status = 'WaitingForDependencies', a refresh is ready to start as soon as these dependencies are fulfilled." "If status = 'WaitingForDependencies', a refresh is ready to start as soon as these dependencies are fulfilled."
}, },
{"exception", std::make_shared<DataTypeString>(), {"exception", std::make_shared<DataTypeString>(),
"if last_refresh_result = 'Exception', i.e. the last refresh attempt failed, this column contains the corresponding error message and stack trace." "if last_refresh_result = 'Error', i.e. the last refresh attempt failed, this column contains the corresponding error message and stack trace."
}, },
{"retry", std::make_shared<DataTypeUInt64>(), "How many failed attempts there were so far, for the current refresh."},
{"refresh_count", std::make_shared<DataTypeUInt64>(), "Number of successful refreshes since last server restart or table creation."}, {"refresh_count", std::make_shared<DataTypeUInt64>(), "Number of successful refreshes since last server restart or table creation."},
{"progress", std::make_shared<DataTypeFloat64>(), "Progress of the current refresh, between 0 and 1."}, {"progress", std::make_shared<DataTypeFloat64>(), "Progress of the current refresh, between 0 and 1."},
{"elapsed", std::make_shared<DataTypeFloat64>(), "The amount of nanoseconds the current refresh took."}, {"elapsed", std::make_shared<DataTypeFloat64>(), "The amount of nanoseconds the current refresh took."},
@ -88,6 +89,7 @@ void StorageSystemViewRefreshes::fillData(
res_columns[i++]->insert(Array(deps)); res_columns[i++]->insert(Array(deps));
res_columns[i++]->insert(refresh.exception_message); res_columns[i++]->insert(refresh.exception_message);
res_columns[i++]->insert(refresh.retry);
res_columns[i++]->insert(refresh.refresh_count); res_columns[i++]->insert(refresh.refresh_count);
res_columns[i++]->insert(Float64(refresh.progress.read_rows) / refresh.progress.total_rows_to_read); res_columns[i++]->insert(Float64(refresh.progress.read_rows) / refresh.progress.total_rows_to_read);
res_columns[i++]->insert(refresh.progress.elapsed_ns / 1e9); res_columns[i++]->insert(refresh.progress.elapsed_ns / 1e9);

View File

@ -11,13 +11,13 @@
<create_query>create table mt_4 (n UInt64, s String) engine = MergeTree order by tuple()</create_query> <create_query>create table mt_4 (n UInt64, s String) engine = MergeTree order by tuple()</create_query>
<create_query>create materialized view mv_1 to mt_1 as <create_query>create materialized view mv_1 to mt_1 as
select number, toString(number) from main_table where number % 13 != 0</create_query> select number as n, toString(number) as s from main_table where number % 13 != 0</create_query>
<create_query>create materialized view mv_2 to mt_2 as <create_query>create materialized view mv_2 to mt_2 as
select number, toString(number) from main_table where number % 13 != 1</create_query> select number as n, toString(number) as s from main_table where number % 13 != 1</create_query>
<create_query>create materialized view mv_3 to mt_3 as <create_query>create materialized view mv_3 to mt_3 as
select number, toString(number) from main_table where number % 13 != 3</create_query> select number as n, toString(number) as s from main_table where number % 13 != 3</create_query>
<create_query>create materialized view mv_4 to mt_4 as <create_query>create materialized view mv_4 to mt_4 as
select number, toString(number) from main_table where number % 13 != 4</create_query> select number as n, toString(number) as s from main_table where number % 13 != 4</create_query>
<fill_query>SYSTEM STOP MERGES main_table</fill_query> <fill_query>SYSTEM STOP MERGES main_table</fill_query>
<fill_query>SYSTEM STOP MERGES mt_1</fill_query> <fill_query>SYSTEM STOP MERGES mt_1</fill_query>

View File

@ -4,7 +4,7 @@ DROP TABLE IF EXISTS mat_view;
CREATE TABLE test1 (a LowCardinality(String)) ENGINE=MergeTree() ORDER BY a; CREATE TABLE test1 (a LowCardinality(String)) ENGINE=MergeTree() ORDER BY a;
CREATE TABLE test2 (a UInt64) engine=MergeTree() ORDER BY a; CREATE TABLE test2 (a UInt64) engine=MergeTree() ORDER BY a;
CREATE MATERIALIZED VIEW test_mv TO test2 AS SELECT toUInt64(a = 'test') FROM test1; CREATE MATERIALIZED VIEW test_mv TO test2 AS SELECT toUInt64(a = 'test') AS a FROM test1;
DROP TABLE test_mv; DROP TABLE test_mv;
DROP TABLE test1; DROP TABLE test1;

View File

@ -35,7 +35,7 @@ arraySort(loading_dependencies_table), arraySort(loading_dependent_table) from s
$CLICKHOUSE_CLIENT -q "select '====='" $CLICKHOUSE_CLIENT -q "select '====='"
$CLICKHOUSE_CLIENT -q "alter table t add column x int default in(1, $CLICKHOUSE_DATABASE.s), drop column y" $CLICKHOUSE_CLIENT -q "alter table t add column x int default in(1, $CLICKHOUSE_DATABASE.s), drop column y"
$CLICKHOUSE_CLIENT -q "create materialized view mv to s as select n from t where n in (select n from join)" $CLICKHOUSE_CLIENT -q "create materialized view mv to s as select n as x from t where n in (select n from join)"
$CLICKHOUSE_CLIENT -q "select table, arraySort(dependencies_table), $CLICKHOUSE_CLIENT -q "select table, arraySort(dependencies_table),
arraySort(loading_dependencies_table), arraySort(loading_dependent_table) from system.tables where database=currentDatabase() order by table" arraySort(loading_dependencies_table), arraySort(loading_dependent_table) from system.tables where database=currentDatabase() order by table"

View File

@ -3,7 +3,7 @@
CREATE TABLE landing (n Int64) engine=MergeTree order by n; CREATE TABLE landing (n Int64) engine=MergeTree order by n;
CREATE TABLE target (n Int64) engine=MergeTree order by n; CREATE TABLE target (n Int64) engine=MergeTree order by n;
CREATE MATERIALIZED VIEW landing_to_target TO target AS CREATE MATERIALIZED VIEW landing_to_target TO target AS
SELECT n + throwIf(n == 3333) SELECT n + throwIf(n == 3333) AS n
FROM landing; FROM landing;
INSERT INTO landing SELECT * FROM numbers(10000); -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO } INSERT INTO landing SELECT * FROM numbers(10000); -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }

View File

@ -0,0 +1,32 @@
<1: created view> a [] 1
CREATE MATERIALIZED VIEW default.a\nREFRESH AFTER 2 SECOND\n(\n `x` UInt64\n)\nENGINE = Memory\nAS SELECT number AS x\nFROM numbers(2)\nUNION ALL\nSELECT rand64() AS x
<2: refreshed> 3 1 1
<3: time difference at least> 1000
<4: next refresh in> 2
<4.1: fake clock> Scheduled 2050-01-01 00:00:01 2050-01-01 00:00:03
<4.5: altered> Scheduled Finished 2052-01-01 00:00:00
CREATE MATERIALIZED VIEW default.a\nREFRESH EVERY 2 YEAR\n(\n `x` UInt64\n)\nENGINE = Memory\nAS SELECT x * 2 AS x\nFROM default.src
<5: no refresh> 3
<6: refreshed> 2
<7: refreshed> Scheduled Finished 2054-01-01 00:00:00
CREATE MATERIALIZED VIEW default.b\nREFRESH EVERY 2 YEAR DEPENDS ON default.a\n(\n `y` Int32\n)\nENGINE = MergeTree\nORDER BY y\nSETTINGS index_granularity = 8192\nAS SELECT x * 10 AS y\nFROM default.a
<7.5: created dependent> 2052-11-11 11:11:11
<8: refreshed> 20
<9: refreshed> a Scheduled Finished 2054-01-01 00:00:00
<9: refreshed> b Scheduled Finished 2054-01-01 00:00:00
<9.2: dropping> 0 2
<9.4: dropped> 0 2
<10: creating> a Scheduled [] 2054-01-01 00:00:00
<10: creating> b WaitingForDependencies ['default.a'] 2054-01-01 00:00:00
<11: chain-refreshed a> 4
<12: chain-refreshed b> 40
<13: chain-refreshed> a Scheduled [] Finished 2054-01-01 00:00:01 2056-01-01 00:00:00 1
<13: chain-refreshed> b Scheduled ['default.a'] Finished 2054-01-24 23:22:21 2056-01-01 00:00:00 1
<14: waiting for next cycle> a Scheduled [] 2058-01-01 00:00:00
<14: waiting for next cycle> b WaitingForDependencies ['default.a'] 2060-01-01 00:00:00
<15: chain-refreshed a> 6
<16: chain-refreshed b> 60
<17: chain-refreshed> a Scheduled 2062-01-01 00:00:00
<17: chain-refreshed> b Scheduled 2062-01-01 00:00:00
<18: removed dependency> b Scheduled [] 2062-03-03 03:03:03 2064-01-01 00:00:00 5
CREATE MATERIALIZED VIEW default.b\nREFRESH EVERY 2 YEAR\n(\n `y` Int32\n)\nENGINE = MergeTree\nORDER BY y\nSETTINGS index_granularity = 8192\nAS SELECT x * 10 AS y\nFROM default.a

View File

@ -0,0 +1,177 @@
#!/usr/bin/env bash
# Tags: atomic-database
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# Set session timezone to UTC to make all DateTime formatting and parsing use UTC, because refresh
# scheduling is done in UTC.
CLICKHOUSE_CLIENT="`echo "$CLICKHOUSE_CLIENT" | sed 's/--session_timezone[= ][^ ]*//g'`"
CLICKHOUSE_CLIENT="`echo "$CLICKHOUSE_CLIENT --allow_experimental_refreshable_materialized_view=1 --session_timezone Etc/UTC"`"
$CLICKHOUSE_CLIENT -nq "create view refreshes as select * from system.view_refreshes where database = '$CLICKHOUSE_DATABASE' order by view"
# Basic refreshing.
$CLICKHOUSE_CLIENT -nq "
create materialized view a
refresh after 2 second
engine Memory
empty
as select number as x from numbers(2) union all select rand64() as x;
select '<1: created view>', view, remaining_dependencies, exception, last_refresh_result in ('Unknown', 'Finished') from refreshes;
show create a;"
# Wait for any refresh. (xargs trims the string and turns \t and \n into spaces)
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes -- $LINENO" | xargs`" == 'Unknown' ]
do
sleep 0.5
done
start_time="`$CLICKHOUSE_CLIENT -nq "select reinterpret(now64(), 'Int64')"`"
# Check table contents.
$CLICKHOUSE_CLIENT -nq "select '<2: refreshed>', count(), sum(x=0), sum(x=1) from a"
# Wait for table contents to change.
res1="`$CLICKHOUSE_CLIENT -nq 'select * from a order by x format Values'`"
while :
do
res2="`$CLICKHOUSE_CLIENT -nq 'select * from a order by x format Values -- $LINENO'`"
[ "$res2" == "$res1" ] || break
sleep 0.5
done
# Wait for another change.
while :
do
res3="`$CLICKHOUSE_CLIENT -nq 'select * from a order by x format Values -- $LINENO'`"
[ "$res3" == "$res2" ] || break
sleep 0.5
done
# Check that the two changes were at least 1 second apart, in particular that we're not refreshing
# like crazy. This is potentially flaky, but we need at least one test that uses non-mocked timer
# to make sure the clock+timer code works at all. If it turns out flaky, increase refresh period above.
$CLICKHOUSE_CLIENT -nq "
select '<3: time difference at least>', min2(reinterpret(now64(), 'Int64') - $start_time, 1000);
select '<4: next refresh in>', next_refresh_time-last_refresh_time from refreshes;"
# Create a source table from which views will read.
$CLICKHOUSE_CLIENT -nq "
create table src (x Int8) engine Memory as select 1;"
# Switch to fake clock, change refresh schedule, change query.
$CLICKHOUSE_CLIENT -nq "
system test view a set fake time '2050-01-01 00:00:01';
system wait view a;
system refresh view a;
system wait view a;
select '<4.1: fake clock>', status, last_refresh_time, next_refresh_time from refreshes;
alter table a modify refresh every 2 year;
alter table a modify query select x*2 as x from src;
select '<4.5: altered>', status, last_refresh_result, next_refresh_time from refreshes;
show create a;"
# Advance time to trigger the refresh.
$CLICKHOUSE_CLIENT -nq "
select '<5: no refresh>', count() from a;
system test view a set fake time '2052-02-03 04:05:06';"
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_time from refreshes -- $LINENO" | xargs`" != '2052-02-03 04:05:06' ]
do
sleep 0.5
done
$CLICKHOUSE_CLIENT -nq "
select '<6: refreshed>', * from a;
select '<7: refreshed>', status, last_refresh_result, next_refresh_time from refreshes;"
# Create a dependent view, refresh it once.
$CLICKHOUSE_CLIENT -nq "
create materialized view b refresh every 2 year depends on a (y Int32) engine MergeTree order by y empty as select x*10 as y from a;
show create b;
system test view b set fake time '2052-11-11 11:11:11';
system refresh view b;
system wait view b;
select '<7.5: created dependent>', last_refresh_time from refreshes where view = 'b';"
# Next refresh shouldn't start until the dependency refreshes.
$CLICKHOUSE_CLIENT -nq "
select '<8: refreshed>', * from b;
select '<9: refreshed>', view, status, last_refresh_result, next_refresh_time from refreshes;
system test view b set fake time '2054-01-24 23:22:21';"
while [ "`$CLICKHOUSE_CLIENT -nq "select status, next_refresh_time from refreshes where view = 'b' -- $LINENO" | xargs`" != 'WaitingForDependencies 2054-01-01 00:00:00' ]
do
sleep 0.5
done
# Drop the source table, check that refresh fails and doesn't leave a temp table behind.
$CLICKHOUSE_CLIENT -nq "
select '<9.2: dropping>', countIf(name like '%tmp%'), countIf(name like '%.inner%') from system.tables where database = currentDatabase();
drop table src;
system refresh view a;"
$CLICKHOUSE_CLIENT -nq "system wait view a;" 2>/dev/null && echo "SYSTEM WAIT VIEW failed to fail at $LINENO"
$CLICKHOUSE_CLIENT -nq "
select '<9.4: dropped>', countIf(name like '%tmp%'), countIf(name like '%.inner%') from system.tables where database = currentDatabase();"
# Create the source table again, check that refresh succeeds (in particular that tables are looked
# up by name rather than uuid).
$CLICKHOUSE_CLIENT -nq "
select '<10: creating>', view, status, remaining_dependencies, next_refresh_time from refreshes;
create table src (x Int16) engine Memory as select 2;
system test view a set fake time '2054-01-01 00:00:01';"
while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes where view = 'b' -- $LINENO" | xargs`" != 'Scheduled' ]
do
sleep 0.5
done
# Both tables should've refreshed.
$CLICKHOUSE_CLIENT -nq "
select '<11: chain-refreshed a>', * from a;
select '<12: chain-refreshed b>', * from b;
select '<13: chain-refreshed>', view, status, remaining_dependencies, last_refresh_result, last_refresh_time, next_refresh_time, exception == '' from refreshes;"
# Make the dependent table run ahead by one refresh cycle, make sure it waits for the dependency to
# catch up to the same cycle.
$CLICKHOUSE_CLIENT -nq "
system test view b set fake time '2059-01-01 00:00:00';
system refresh view b;"
while [ "`$CLICKHOUSE_CLIENT -nq "select next_refresh_time from refreshes where view = 'b' -- $LINENO" | xargs`" != '2060-01-01 00:00:00' ]
do
sleep 0.5
done
$CLICKHOUSE_CLIENT -nq "
system test view b set fake time '2061-01-01 00:00:00';
system test view a set fake time '2057-01-01 00:00:00';"
while [ "`$CLICKHOUSE_CLIENT -nq "select status, next_refresh_time from refreshes -- $LINENO" | xargs`" != 'Scheduled 2058-01-01 00:00:00 WaitingForDependencies 2060-01-01 00:00:00' ]
do
sleep 0.5
done
$CLICKHOUSE_CLIENT -nq "
select '<14: waiting for next cycle>', view, status, remaining_dependencies, next_refresh_time from refreshes;
truncate src;
insert into src values (3);
system test view a set fake time '2060-02-02 02:02:02';"
while [ "`$CLICKHOUSE_CLIENT -nq "select next_refresh_time from refreshes where view = 'b' -- $LINENO" | xargs`" != '2062-01-01 00:00:00' ]
do
sleep 0.5
done
$CLICKHOUSE_CLIENT -nq "
select '<15: chain-refreshed a>', * from a;
select '<16: chain-refreshed b>', * from b;
select '<17: chain-refreshed>', view, status, next_refresh_time from refreshes;"
# Get to WaitingForDependencies state and remove the depencency.
$CLICKHOUSE_CLIENT -nq "
system test view b set fake time '2062-03-03 03:03:03'"
while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes where view = 'b' -- $LINENO" | xargs`" != 'WaitingForDependencies' ]
do
sleep 0.5
done
$CLICKHOUSE_CLIENT -nq "
alter table b modify refresh every 2 year"
while [ "`$CLICKHOUSE_CLIENT -nq "select status, last_refresh_time from refreshes where view = 'b' -- $LINENO" | xargs`" != 'Scheduled 2062-03-03 03:03:03' ]
do
sleep 0.5
done
$CLICKHOUSE_CLIENT -nq "
select '<18: removed dependency>', view, status, remaining_dependencies, last_refresh_time,next_refresh_time, refresh_count from refreshes where view = 'b';
show create b;"
$CLICKHOUSE_CLIENT -nq "
drop table src;
drop table a;
drop table b;
drop table refreshes;"

View File

@ -0,0 +1,30 @@
<19: exception> 1
<20: unexception> 1
<21: rename> 1
<22: rename> d Finished
<23: simple refresh> 1
<24: rename during refresh> 1
<25: rename during refresh> f Running
<27: cancelled> f Scheduled Cancelled
<28: drop during refresh> 0 0
CREATE MATERIALIZED VIEW default.g\nREFRESH EVERY 1 WEEK OFFSET 3 DAY 4 HOUR RANDOMIZE FOR 4 DAY 1 HOUR\n(\n `x` Int64\n)\nENGINE = Memory\nAS SELECT 42 AS x
<29: randomize> 1 1
CREATE MATERIALIZED VIEW default.h\nREFRESH EVERY 1 SECOND TO default.dest\n(\n `x` Int64\n)\nAS SELECT x * 10 AS x\nFROM default.src
<30: to existing table> 10
<31: to existing table> 10
<31: to existing table> 20
<31.5: will retry> Error 1
<31.6: did retry> 10
<32: empty> i Scheduled Unknown 0
<32: empty> j Scheduled Finished 0
<34: append> 10
<35: append> 10
<35: append> 20
<35: append> 30
<36: not append> 20
<36: not append> 30
<37: append chain> 100
<38: append chain> 100
<38: append chain> 100
<38: append chain> 200
creating MergeTree without ORDER BY failed, as expected

View File

@ -0,0 +1,222 @@
#!/usr/bin/env bash
# Tags: atomic-database
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# reset --log_comment
CLICKHOUSE_LOG_COMMENT=
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# Set session timezone to UTC to make all DateTime formatting and parsing use UTC, because refresh
# scheduling is done in UTC.
CLICKHOUSE_CLIENT="`echo "$CLICKHOUSE_CLIENT" | sed 's/--session_timezone[= ][^ ]*//g'`"
CLICKHOUSE_CLIENT="`echo "$CLICKHOUSE_CLIENT --allow_experimental_refreshable_materialized_view=1 --allow_materialized_view_with_bad_select=0 --session_timezone Etc/UTC"`"
$CLICKHOUSE_CLIENT -nq "create view refreshes as select * from system.view_refreshes where database = '$CLICKHOUSE_DATABASE' order by view"
# Select from a table that doesn't exist, get an exception.
$CLICKHOUSE_CLIENT -nq "
create table src (x Int8) engine Memory as select 1;
create materialized view c refresh every 1 second (x Int64) engine Memory empty as select * from src;
drop table src;"
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes where view = 'c' -- $LINENO" | xargs`" != 'Error' ]
do
sleep 0.5
done
# Check exception, create src, expect successful refresh.
$CLICKHOUSE_CLIENT -nq "
select '<19: exception>', exception ilike '%UNKNOWN_TABLE%' ? '1' : exception from refreshes where view = 'c';
create table src (x Int64) engine Memory as select 1;
system refresh view c;"
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes -- $LINENO" | xargs`" != 'Finished' ]
do
sleep 0.5
done
# Rename table.
$CLICKHOUSE_CLIENT -nq "
select '<20: unexception>', * from c;
rename table c to d;
select '<21: rename>', * from d;
select '<22: rename>', view, last_refresh_result from refreshes;"
# Do various things during a refresh.
# First make a nonempty view.
$CLICKHOUSE_CLIENT -nq "
drop table d;
truncate src;
insert into src values (1);
create materialized view e refresh every 1 second (x Int64) engine MergeTree order by x empty as select x + sleepEachRow(1) as x from src settings max_block_size = 1;"
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes -- $LINENO" | xargs`" != 'Finished' ]
do
sleep 0.5
done
# Stop refreshes.
$CLICKHOUSE_CLIENT -nq "
select '<23: simple refresh>', * from e;
system stop view e;"
while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes -- $LINENO" | xargs`" != 'Disabled' ]
do
sleep 0.5
done
# Make refreshes slow, wait for a slow refresh to start. (We stopped refreshes first to make sure
# we wait for a slow refresh, not a previous fast one.)
$CLICKHOUSE_CLIENT -nq "
insert into src select * from numbers(1000) settings max_block_size=1;
system start view e;"
while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes -- $LINENO" | xargs`" != 'Running' ]
do
sleep 0.5
done
# Rename.
$CLICKHOUSE_CLIENT -nq "
rename table e to f;
select '<24: rename during refresh>', * from f;
select '<25: rename during refresh>', view, status from refreshes where view = 'f';
alter table f modify refresh after 10 year;"
# Cancel.
$CLICKHOUSE_CLIENT -nq "
system cancel view f;"
while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes where view = 'f' -- $LINENO" | xargs`" != 'Scheduled' ]
do
sleep 0.5
done
# Check that another refresh doesn't immediately start after the cancelled one.
$CLICKHOUSE_CLIENT -nq "
select '<27: cancelled>', view, status, last_refresh_result from refreshes where view = 'f';
system refresh view f;"
while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes where view = 'f' -- $LINENO" | xargs`" != 'Running' ]
do
sleep 0.5
done
# Drop.
$CLICKHOUSE_CLIENT -nq "
drop table f;
select '<28: drop during refresh>', view, status from refreshes;
select '<28: drop during refresh>', countIf(name like '%tmp%'), countIf(name like '%.inner%') from system.tables where database = currentDatabase()"
# Try OFFSET and RANDOMIZE FOR.
$CLICKHOUSE_CLIENT -nq "
create materialized view g refresh every 1 week offset 3 day 4 hour randomize for 4 day 1 hour (x Int64) engine Memory empty as select 42 as x;
show create g;
system test view g set fake time '2050-02-03 15:30:13';"
while [ "`$CLICKHOUSE_CLIENT -nq "select next_refresh_time > '2049-01-01' from refreshes -- $LINENO" | xargs`" != '1' ]
do
sleep 0.5
done
$CLICKHOUSE_CLIENT -nq "
with '2050-02-10 04:00:00'::DateTime as expected
select '<29: randomize>', abs(next_refresh_time::Int64 - expected::Int64) <= 3600*(24*4+1), next_refresh_time != expected from refreshes;"
# Send data 'TO' an existing table.
$CLICKHOUSE_CLIENT -nq "
drop table g;
create table dest (x Int64) engine MergeTree order by x;
truncate src;
insert into src values (1);
create materialized view h refresh every 1 second to dest empty as select x*10 as x from src;
show create h;"
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes -- $LINENO" | xargs`" != 'Finished' ]
do
sleep 0.5
done
$CLICKHOUSE_CLIENT -nq "
select '<30: to existing table>', * from dest;
insert into src values (2);"
while [ "`$CLICKHOUSE_CLIENT -nq "select count() from dest -- $LINENO" | xargs`" != '2' ]
do
sleep 0.5
done
$CLICKHOUSE_CLIENT -nq "
select '<31: to existing table>', * from dest;
drop table dest;
drop table h;"
# Retries.
$CLICKHOUSE_CLIENT -nq "
create materialized view h2 refresh after 1 year settings refresh_retries = 10 (x Int64) engine Memory as select x*10 + throwIf(x % 2 == 0) as x from src;"
$CLICKHOUSE_CLIENT -nq "system wait view h2;" 2>/dev/null && echo "SYSTEM WAIT VIEW failed to fail at $LINENO"
$CLICKHOUSE_CLIENT -nq "
select '<31.5: will retry>', last_refresh_result, retry > 0 from refreshes;
create table src2 (x Int8) engine Memory;
insert into src2 values (1);
exchange tables src and src2;
drop table src2;"
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result, retry from refreshes -- $LINENO" | xargs`" != 'Finished 0' ]
do
sleep 0.5
done
$CLICKHOUSE_CLIENT -nq "
select '<31.6: did retry>', x from h2;
drop table h2"
# EMPTY
$CLICKHOUSE_CLIENT -nq "
create materialized view i refresh after 1 year engine Memory empty as select number as x from numbers(2);
create materialized view j refresh after 1 year engine Memory as select number as x from numbers(2);"
while [ "`$CLICKHOUSE_CLIENT -nq "select sum(last_success_time is null) from refreshes -- $LINENO" | xargs`" == '2' ]
do
sleep 0.5
done
$CLICKHOUSE_CLIENT -nq "
select '<32: empty>', view, status, last_refresh_result, retry from refreshes order by view;
drop table i;
drop table j;"
# APPEND
$CLICKHOUSE_CLIENT -nq "
create materialized view k refresh every 10 year append (x Int64) engine Memory empty as select x*10 as x from src;
select '<33: append>', * from k;
system refresh view k;
system wait view k;
select '<34: append>', * from k;
truncate table src;
insert into src values (2), (3);
system refresh view k;
system wait view k;
select '<35: append>', * from k order by x;"
# ALTER to non-APPEND
$CLICKHOUSE_CLIENT -nq "
alter table k modify refresh every 10 year;
system wait view k;
system refresh view k;
system wait view k;
select '<36: not append>', * from k order by x;
drop table k;
truncate table src;"
# APPEND + TO + regular materialized view reading from it.
$CLICKHOUSE_CLIENT -nq "
create table mid (x Int64) engine MergeTree order by x;
create materialized view l refresh every 10 year append to mid empty as select x*10 as x from src;
create materialized view m (x Int64) engine Memory as select x*10 as x from mid;
insert into src values (1);
system refresh view l;
system wait view l;
select '<37: append chain>', * from m;
insert into src values (2);
system refresh view l;
system wait view l;
select '<38: append chain>', * from m order by x;
drop table l;
drop table m;
drop table mid;"
# Failing to create inner table.
$CLICKHOUSE_CLIENT -nq "
create materialized view n refresh every 1 second (x Int64) engine MergeTree as select 1 as x from numbers(2);" 2>/dev/null || echo "creating MergeTree without ORDER BY failed, as expected"
$CLICKHOUSE_CLIENT -nq "
create materialized view n refresh every 1 second (x Int64) engine MergeTree order by x as select 1 as x from numbers(2);
drop table n;"
# Reading from table that doesn't exist yet.
$CLICKHOUSE_CLIENT -nq "
create materialized view o refresh every 1 second (x Int64) engine Memory as select x from nonexist; -- { serverError UNKNOWN_TABLE }
create materialized view o (x Int64) engine Memory as select x from nonexist; -- { serverError UNKNOWN_TABLE }
create materialized view o (x Int64) engine Memory as select x from nope.nonexist; -- { serverError UNKNOWN_DATABASE }
create materialized view o refresh every 1 second (x Int64) engine Memory as select x from nope.nonexist settings allow_materialized_view_with_bad_select = 1;
drop table o;"
$CLICKHOUSE_CLIENT -nq "
drop table refreshes;"

View File

@ -12,15 +12,15 @@ ${CLICKHOUSE_CLIENT} --query "CREATE DATABASE ${CLICKHOUSE_DATABASE}_db engine =
# Non-replicated engines are allowed # Non-replicated engines are allowed
${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE TABLE ${CLICKHOUSE_DATABASE}_db.test (id UInt64) ENGINE = MergeTree() ORDER BY id AS SELECT 1" ${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE TABLE ${CLICKHOUSE_DATABASE}_db.test (id UInt64) ENGINE = MergeTree() ORDER BY id AS SELECT 1"
${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE MATERIALIZED VIEW ${CLICKHOUSE_DATABASE}_db.test_mv (id UInt64) ENGINE = MergeTree() ORDER BY id POPULATE AS SELECT 1" ${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE MATERIALIZED VIEW ${CLICKHOUSE_DATABASE}_db.test_mv (id UInt64) ENGINE = MergeTree() ORDER BY id POPULATE AS SELECT 1 AS id"
# Replicated storafes are forbidden # Replicated storafes are forbidden
${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE TABLE ${CLICKHOUSE_DATABASE}_db.test2 (id UInt64) ENGINE = ReplicatedMergeTree ORDER BY id AS SELECT 1" |& grep -cm1 "SUPPORT_IS_DISABLED" ${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE TABLE ${CLICKHOUSE_DATABASE}_db.test2 (id UInt64) ENGINE = ReplicatedMergeTree ORDER BY id AS SELECT 1" |& grep -cm1 "SUPPORT_IS_DISABLED"
${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE MATERIALIZED VIEW ${CLICKHOUSE_DATABASE}_db.test_mv2 (id UInt64) ENGINE = ReplicatedMergeTree ORDER BY id POPULATE AS SELECT 1" |& grep -cm1 "SUPPORT_IS_DISABLED" ${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE MATERIALIZED VIEW ${CLICKHOUSE_DATABASE}_db.test_mv2 (id UInt64) ENGINE = ReplicatedMergeTree ORDER BY id POPULATE AS SELECT 1 AS id" |& grep -cm1 "SUPPORT_IS_DISABLED"
# POPULATE is allowed with the special setting # POPULATE is allowed with the special setting
${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE MATERIALIZED VIEW ${CLICKHOUSE_DATABASE}_db.test_mv2 (id UInt64) ENGINE = ReplicatedMergeTree ORDER BY id POPULATE AS SELECT 1" --database_replicated_allow_heavy_create=1 ${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE MATERIALIZED VIEW ${CLICKHOUSE_DATABASE}_db.test_mv2 (id UInt64) ENGINE = ReplicatedMergeTree ORDER BY id POPULATE AS SELECT 1 AS id" --database_replicated_allow_heavy_create=1
${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE MATERIALIZED VIEW ${CLICKHOUSE_DATABASE}_db.test_mv3 (id UInt64) ENGINE = ReplicatedMergeTree ORDER BY id POPULATE AS SELECT 1" --compatibility='24.6' ${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE MATERIALIZED VIEW ${CLICKHOUSE_DATABASE}_db.test_mv3 (id UInt64) ENGINE = ReplicatedMergeTree ORDER BY id POPULATE AS SELECT 1 AS id" --compatibility='24.6'
# AS SELECT is forbidden even with the setting # AS SELECT is forbidden even with the setting
${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE TABLE ${CLICKHOUSE_DATABASE}_db.test2 (id UInt64) ENGINE = ReplicatedMergeTree ORDER BY id AS SELECT 1" --database_replicated_allow_heavy_create=1 |& grep -cm1 "SUPPORT_IS_DISABLED" ${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE TABLE ${CLICKHOUSE_DATABASE}_db.test2 (id UInt64) ENGINE = ReplicatedMergeTree ORDER BY id AS SELECT 1" --database_replicated_allow_heavy_create=1 |& grep -cm1 "SUPPORT_IS_DISABLED"

View File

@ -45,34 +45,34 @@ DateTime/DateTime64 best effort
2000-01-01 00:00:00 DateTime 2000-01-01 00:00:00 DateTime
2000-01-01 01:00:00 DateTime 2000-01-01 01:00:00 DateTime
2000-01-01 01:00:00.000000000 DateTime64(9) 2000-01-01 01:00:00.000000000 DateTime64(9)
2017-01-01 22:02:03 DateTime 02/01/17 010203 MSK String
2017-01-01 22:02:03.000000000 DateTime64(9) 02/01/17 010203.000 MSK String
2017-01-01 21:02:03 DateTime 02/01/17 010203 MSK+0100 String
2017-01-01 21:02:03.000000000 DateTime64(9) 02/01/17 010203.000 MSK+0100 String
2017-01-01 22:02:03 DateTime 02/01/17 010203 UTC+0300 String
2017-01-01 22:02:03.000000000 DateTime64(9) 02/01/17 010203.000 UTC+0300 String
2017-01-02 01:02:03 DateTime 02/01/17 010203Z String
2017-01-02 01:02:03.000000000 DateTime64(9) 02/01/17 010203.000Z String
1970-01-02 01:02:03 DateTime 02/01/1970 010203Z String
1970-01-02 01:02:03.000000000 DateTime64(9) 02/01/1970 010203.000Z String
1970-01-02 01:02:03 DateTime 02/01/70 010203Z String
1970-01-02 01:02:03.000000000 DateTime64(9) 02/01/70 010203.000Z String
2018-02-11 03:40:50 DateTime 2018-02-11 03:40:50 DateTime
2018-02-11 03:40:50.000000000 DateTime64(9) 2018-02-11 03:40:50.000000000 DateTime64(9)
2000-04-17 01:02:03 DateTime 2000-04-17 01:02:03 DateTime
2000-04-17 01:02:03.000000000 DateTime64(9) 2000-04-17 01:02:03.000000000 DateTime64(9)
1970-01-02 01:00:00 DateTime 19700102 01:00:00 String
1970-01-02 01:00:00.000000000 DateTime64(9) 19700102 01:00:00.000 String
1970-01-02 01:02:03 DateTime 19700102010203Z String
1970-01-02 01:02:03.000000000 DateTime64(9) 19700102010203Z.000 String
1970-01-02 01:02:03 DateTime 1970/01/02 010203Z String
1970-01-02 01:02:03.000000000 DateTime64(9) 1970/01/02 010203.000Z String
2015-12-31 20:00:00 DateTime 2015-12-31 20:00:00 DateTime
2015-12-31 20:00:00 DateTime 2015-12-31 20:00:00 DateTime
2016-01-01 00:00:00 DateTime 2016-01-01 00:00:00 DateTime
2016-01-01 00:00:00 DateTime 2016-01-01 00:00:00 DateTime
2017-01-01 22:02:03 DateTime 201701 02 010203 UTC+0300 String
2017-01-01 22:02:03.000000000 DateTime64(9) 201701 02 010203.000 UTC+0300 String
2017-01-02 03:04:05 DateTime 2017-01-02 03:04:05 DateTime
2017-01-02 03:04:05.000000000 DateTime64(9) 2017-01-02 03:04:05.000000000 DateTime64(9)
2017-01-02 03:04:05 DateTime 2017-01-02 03:04:05 DateTime
@ -117,8 +117,8 @@ DateTime/DateTime64 best effort
2017-01-02 03:04:05.000000000 DateTime64(9) 2017-01-02 03:04:05.000000000 DateTime64(9)
2017-04-01 11:22:33 DateTime 2017-04-01 11:22:33 DateTime
2017-04-01 11:22:33.000000000 DateTime64(9) 2017-04-01 11:22:33.000000000 DateTime64(9)
2017-04-01 22:02:03 DateTime 2017 Apr 02 010203 UTC+0300 String
2017-04-01 22:02:03.000000000 DateTime64(9) 2017 Apr 02 010203.000 UTC+0300 String
2017-04-01 22:02:03 DateTime 2017-04-01 22:02:03 DateTime
2017-04-01 22:02:03.000000000 DateTime64(9) 2017-04-01 22:02:03.000000000 DateTime64(9)
2017-04-02 01:02:03 DateTime 2017-04-02 01:02:03 DateTime
@ -143,8 +143,8 @@ DateTime/DateTime64 best effort
2017-04-01 21:02:03.000000000 DateTime64(9) 2017-04-01 21:02:03.000000000 DateTime64(9)
2017-04-02 01:02:03 DateTime 2017-04-02 01:02:03 DateTime
2017-04-02 01:02:03.000000000 DateTime64(9) 2017-04-02 01:02:03.000000000 DateTime64(9)
2017-01-01 22:02:03 DateTime 2017 Jan 02 010203 UTC+0300 String
2017-01-01 22:02:03.000000000 DateTime64(9) 2017 Jan 02 010203.000 UTC+0300 String
2017-04-25 01:02:03 DateTime 2017-04-25 01:02:03 DateTime
2017-04-25 01:02:03.000000000 DateTime64(9) 2017-04-25 01:02:03.000000000 DateTime64(9)
2017-04-25 01:02:03 DateTime 2017-04-25 01:02:03 DateTime
@ -231,6 +231,25 @@ Mar01012020010101 String
Mar 01012020010101 String Mar 01012020010101 String
Mar01012020010101.000 String Mar01012020010101.000 String
Mar 0101202001010101.000 String Mar 0101202001010101.000 String
Sun String
Sun1 String
Sun 1 String
Sun01 String
Sun 01 String
Sun2020 String
Sun 2020 String
Sun012020 String
Sun 012020 String
Sun01012020 String
Sun 01012020 String
Sun0101202001 String
Sun 0101202001 String
Sun010120200101 String
Sun 010120200101 String
Sun01012020010101 String
Sun 01012020010101 String
Sun01012020010101.000 String
Sun 0101202001010101.000 String
2000 01 01 01:00:00 String 2000 01 01 01:00:00 String
2000 01 01 01:00:00.000 String 2000 01 01 01:00:00.000 String
2000a01a01 01:00:00 String 2000a01a01 01:00:00 String
@ -251,3 +270,4 @@ Mar 2000 00:00:00.000 String
2000 00:00:00.000 String 2000 00:00:00.000 String
Mar 2000-01-01 00:00:00 String Mar 2000-01-01 00:00:00 String
Mar 2000-01-01 00:00:00.000 String Mar 2000-01-01 00:00:00.000 String
1.7.10 String

View File

@ -245,6 +245,25 @@ select x, toTypeName(x) from format(JSONEachRow, '{"x" : "Mar01012020010101"}');
select x, toTypeName(x) from format(JSONEachRow, '{"x" : "Mar 01012020010101"}'); select x, toTypeName(x) from format(JSONEachRow, '{"x" : "Mar 01012020010101"}');
select x, toTypeName(x) from format(JSONEachRow, '{"x" : "Mar01012020010101.000"}'); select x, toTypeName(x) from format(JSONEachRow, '{"x" : "Mar01012020010101.000"}');
select x, toTypeName(x) from format(JSONEachRow, '{"x" : "Mar 0101202001010101.000"}'); select x, toTypeName(x) from format(JSONEachRow, '{"x" : "Mar 0101202001010101.000"}');
select x, toTypeName(x) from format(JSONEachRow, '{"x" : "Sun"}');
select x, toTypeName(x) from format(JSONEachRow, '{"x" : "Sun1"}');
select x, toTypeName(x) from format(JSONEachRow, '{"x" : "Sun 1"}');
select x, toTypeName(x) from format(JSONEachRow, '{"x" : "Sun01"}');
select x, toTypeName(x) from format(JSONEachRow, '{"x" : "Sun 01"}');
select x, toTypeName(x) from format(JSONEachRow, '{"x" : "Sun2020"}');
select x, toTypeName(x) from format(JSONEachRow, '{"x" : "Sun 2020"}');
select x, toTypeName(x) from format(JSONEachRow, '{"x" : "Sun012020"}');
select x, toTypeName(x) from format(JSONEachRow, '{"x" : "Sun 012020"}');
select x, toTypeName(x) from format(JSONEachRow, '{"x" : "Sun01012020"}');
select x, toTypeName(x) from format(JSONEachRow, '{"x" : "Sun 01012020"}');
select x, toTypeName(x) from format(JSONEachRow, '{"x" : "Sun0101202001"}');
select x, toTypeName(x) from format(JSONEachRow, '{"x" : "Sun 0101202001"}');
select x, toTypeName(x) from format(JSONEachRow, '{"x" : "Sun010120200101"}');
select x, toTypeName(x) from format(JSONEachRow, '{"x" : "Sun 010120200101"}');
select x, toTypeName(x) from format(JSONEachRow, '{"x" : "Sun01012020010101"}');
select x, toTypeName(x) from format(JSONEachRow, '{"x" : "Sun 01012020010101"}');
select x, toTypeName(x) from format(JSONEachRow, '{"x" : "Sun01012020010101.000"}');
select x, toTypeName(x) from format(JSONEachRow, '{"x" : "Sun 0101202001010101.000"}');
select x, toTypeName(x) from format(JSONEachRow, '{"x" : "2000 01 01 01:00:00"}'); select x, toTypeName(x) from format(JSONEachRow, '{"x" : "2000 01 01 01:00:00"}');
select x, toTypeName(x) from format(JSONEachRow, '{"x" : "2000 01 01 01:00:00.000"}'); select x, toTypeName(x) from format(JSONEachRow, '{"x" : "2000 01 01 01:00:00.000"}');
select x, toTypeName(x) from format(JSONEachRow, '{"x" : "2000a01a01 01:00:00"}'); select x, toTypeName(x) from format(JSONEachRow, '{"x" : "2000a01a01 01:00:00"}');
@ -265,5 +284,5 @@ select x, toTypeName(x) from format(JSONEachRow, '{"x" : "2000 00:00:00"}');
select x, toTypeName(x) from format(JSONEachRow, '{"x" : "2000 00:00:00.000"}'); select x, toTypeName(x) from format(JSONEachRow, '{"x" : "2000 00:00:00.000"}');
select x, toTypeName(x) from format(JSONEachRow, '{"x" : "Mar 2000-01-01 00:00:00"}'); select x, toTypeName(x) from format(JSONEachRow, '{"x" : "Mar 2000-01-01 00:00:00"}');
select x, toTypeName(x) from format(JSONEachRow, '{"x" : "Mar 2000-01-01 00:00:00.000"}'); select x, toTypeName(x) from format(JSONEachRow, '{"x" : "Mar 2000-01-01 00:00:00.000"}');
select x, toTypeName(x) from format(JSONEachRow, '{"x" : "1.7.10"}');

View File

@ -0,0 +1,4 @@
([('wtf')]) [('wtf')] wtf
([('wtf')]) [('wtf')] wtf
Hello
('Hello') Hello Hello Hello

View File

@ -0,0 +1,13 @@
SET enable_analyzer = 1;
SELECT JSONExtract('{"hello":[{"world":"wtf"}]}', 'Tuple(hello Array(Tuple(world String)))') AS x,
x.hello, x.hello[1].world;
SELECT JSONExtract('{"hello":[{" wow ":"wtf"}]}', 'Tuple(hello Array(Tuple(` wow ` String)))') AS x,
x.hello, x.hello[1].` wow `;
SELECT JSONExtract('{"hello":[{" wow ":"wtf"}]}', 'Tuple(hello Array(Tuple(` wow ` String)))') AS x,
x.hello, x.hello[1].`wow`; -- { serverError NOT_FOUND_COLUMN_IN_BLOCK }
SELECT ('Hello' AS world,).world;
SELECT ('Hello' AS world,) AS t, t.world, (t).world, identity(t).world;

View File

@ -2132,6 +2132,7 @@ namespace
namespaces namespaces
natively natively
nats nats
ness
nestjs nestjs
netloc netloc
newjson newjson