diff --git a/docs/en/operations/system-tables/view_refreshes.md b/docs/en/operations/system-tables/view_refreshes.md index 12377507b39..e792e0d095d 100644 --- a/docs/en/operations/system-tables/view_refreshes.md +++ b/docs/en/operations/system-tables/view_refreshes.md @@ -17,7 +17,8 @@ Columns: - `duration_ms` ([UInt64](../../sql-reference/data-types/int-uint.md)) — How long the last refresh attempt took. - `next_refresh_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — Time at which the next refresh is scheduled to start. - `remaining_dependencies` ([Array(String)](../../sql-reference/data-types/array.md)) — If the view has [refresh dependencies](../../sql-reference/statements/create/view.md#refresh-dependencies), this array contains the subset of those dependencies that are not satisfied for the current refresh yet. If `status = 'WaitingForDependencies'`, a refresh is ready to start as soon as these dependencies are fulfilled. -- `exception` ([String](../../sql-reference/data-types/string.md)) — if `last_refresh_result = 'Exception'`, i.e. the last refresh attempt failed, this column contains the corresponding error message and stack trace. +- `exception` ([String](../../sql-reference/data-types/string.md)) — if `last_refresh_result = 'Error'`, i.e. the last refresh attempt failed, this column contains the corresponding error message and stack trace. +- `retry` ([UInt64](../../sql-reference/data-types/int-uint.md)) — If nonzero, the current or next refresh is a retry (see `refresh_retries` refresh setting), and `retry` is the 1-based index of that retry. - `refresh_count` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of successful refreshes since last server restart or table creation. - `progress` ([Float64](../../sql-reference/data-types/float.md)) — Progress of the current refresh, between 0 and 1. - `read_rows` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of rows read by the current refresh so far. diff --git a/docs/en/sql-reference/statements/create/view.md b/docs/en/sql-reference/statements/create/view.md index 2931f7020fb..45e7a41e8a2 100644 --- a/docs/en/sql-reference/statements/create/view.md +++ b/docs/en/sql-reference/statements/create/view.md @@ -13,8 +13,8 @@ Creates a new view. Views can be [normal](#normal-view), [materialized](#materia Syntax: ``` sql -CREATE [OR REPLACE] VIEW [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster_name] -[DEFINER = { user | CURRENT_USER }] [SQL SECURITY { DEFINER | INVOKER | NONE }] +CREATE [OR REPLACE] VIEW [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster_name] +[DEFINER = { user | CURRENT_USER }] [SQL SECURITY { DEFINER | INVOKER | NONE }] AS SELECT ... [COMMENT 'comment'] ``` @@ -55,8 +55,8 @@ SELECT * FROM view(column1=value1, column2=value2 ...) ## Materialized View ``` sql -CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db.]table_name [ON CLUSTER] [TO[db.]name] [ENGINE = engine] [POPULATE] -[DEFINER = { user | CURRENT_USER }] [SQL SECURITY { DEFINER | INVOKER | NONE }] +CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db.]table_name [ON CLUSTER] [TO[db.]name] [ENGINE = engine] [POPULATE] +[DEFINER = { user | CURRENT_USER }] [SQL SECURITY { DEFINER | INVOKER | NONE }] AS SELECT ... [COMMENT 'comment'] ``` @@ -92,7 +92,7 @@ Given that `POPULATE` works like `CREATE TABLE ... AS SELECT ...` it has limitat - It is not supported with Replicated database - It is not supported in ClickHouse cloud -Instead a separate `INSERT ... SELECT` can be used. +Instead a separate `INSERT ... SELECT` can be used. ::: A `SELECT` query can contain `DISTINCT`, `GROUP BY`, `ORDER BY`, `LIMIT`. Note that the corresponding conversions are performed independently on each block of inserted data. For example, if `GROUP BY` is set, data is aggregated during insertion, but only within a single packet of inserted data. The data won’t be further aggregated. The exception is when using an `ENGINE` that independently performs data aggregation, such as `SummingMergeTree`. @@ -110,7 +110,7 @@ To delete a view, use [DROP VIEW](../../../sql-reference/statements/drop.md#drop `DEFINER` and `SQL SECURITY` allow you to specify which ClickHouse user to use when executing the view's underlying query. `SQL SECURITY` has three legal values: `DEFINER`, `INVOKER`, or `NONE`. You can specify any existing user or `CURRENT_USER` in the `DEFINER` clause. -The following table will explain which rights are required for which user in order to select from view. +The following table will explain which rights are required for which user in order to select from view. Note that regardless of the SQL security option, in every case it is still required to have `GRANT SELECT ON ` in order to read from it. | SQL security option | View | Materialized View | @@ -130,7 +130,7 @@ If `DEFINER`/`SQL SECURITY` aren't specified, the default values are used: If a view is attached without `DEFINER`/`SQL SECURITY` specified, the default value is `SQL SECURITY NONE` for the materialized view and `SQL SECURITY INVOKER` for the normal view. -To change SQL security for an existing view, use +To change SQL security for an existing view, use ```sql ALTER TABLE MODIFY SQL SECURITY { DEFINER | INVOKER | NONE } [DEFINER = { user | CURRENT_USER }] ``` @@ -161,6 +161,8 @@ CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db.]table_name REFRESH EVERY|AFTER interval [OFFSET interval] RANDOMIZE FOR interval DEPENDS ON [db.]name [, [db.]name [, ...]] +SETTINGS name = value [, name = value [, ...]] +[APPEND] [TO[db.]name] [(columns)] [ENGINE = engine] [EMPTY] AS SELECT ... [COMMENT 'comment'] @@ -170,18 +172,23 @@ where `interval` is a sequence of simple intervals: number SECOND|MINUTE|HOUR|DAY|WEEK|MONTH|YEAR ``` -Periodically runs the corresponding query and stores its result in a table, atomically replacing the table's previous contents. +Periodically runs the corresponding query and stores its result in a table. + * If the query says `APPEND`, each refresh inserts rows into the table without deleting existing rows. The insert is not atomic, just like a regular INSERT SELECT. + * Otherwise each refresh atomically replaces the table's previous contents. Differences from regular non-refreshable materialized views: - * No insert trigger. I.e. when new data is inserted into the table specified in SELECT, it's *not* automatically pushed to the refreshable materialized view. The periodic refresh runs the entire query and replaces the entire table. + * No insert trigger. I.e. when new data is inserted into the table specified in SELECT, it's *not* automatically pushed to the refreshable materialized view. The periodic refresh runs the entire query. * No restrictions on the SELECT query. Table functions (e.g. `url()`), views, UNION, JOIN, are all allowed. +:::note +The settings in the `REFRESH ... SETTINGS` part of the query are refresh settings (e.g. `refresh_retries`), distinct from regular settings (e.g. `max_threads`). Regular settings can be specified using `SETTINGS` at the end of the query. +::: + :::note Refreshable materialized views are a work in progress. Setting `allow_experimental_refreshable_materialized_view = 1` is required for creating one. Current limitations: * not compatible with Replicated database or table engines * It is not supported in ClickHouse Cloud * require [Atomic database engine](../../../engines/database-engines/atomic.md), - * no retries for failed refresh - we just skip to the next scheduled refresh time, * no limit on number of concurrent refreshes. ::: @@ -246,15 +253,22 @@ A few more examples: `DEPENDS ON` only works between refreshable materialized views. Listing a regular table in the `DEPENDS ON` list will prevent the view from ever refreshing (dependencies can be removed with `ALTER`, see below). ::: +### Settings + +Available refresh settings: + * `refresh_retries` - How many times to retry if refresh query fails with an exception. If all retries fail, skip to the next scheduled refresh time. 0 means no retries, -1 means infinite retries. Default: 0. + * `refresh_retry_initial_backoff_ms` - Delay before the first retry, if `refresh_retries` is not zero. Each subsequent retry doubles the delay, up to `refresh_retry_max_backoff_ms`. Default: 100 ms. + * `refresh_retry_max_backoff_ms` - Limit on the exponential growth of delay between refresh attempts. Default: 60000 ms (1 minute). + ### Changing Refresh Parameters {#changing-refresh-parameters} To change refresh parameters: ``` -ALTER TABLE [db.]name MODIFY REFRESH EVERY|AFTER ... [RANDOMIZE FOR ...] [DEPENDS ON ...] +ALTER TABLE [db.]name MODIFY REFRESH EVERY|AFTER ... [RANDOMIZE FOR ...] [DEPENDS ON ...] [SETTINGS ...] ``` :::note -This replaces refresh schedule *and* dependencies. If the table had a `DEPENDS ON`, doing a `MODIFY REFRESH` without `DEPENDS ON` will remove the dependencies. +This replaces *all* refresh parameters at once: schedule, dependencies, settings, and APPEND-ness. E.g. if the table had a `DEPENDS ON`, doing a `MODIFY REFRESH` without `DEPENDS ON` will remove the dependencies. ::: ### Other operations @@ -263,6 +277,10 @@ The status of all refreshable materialized views is available in table [`system. To manually stop, start, trigger, or cancel refreshes use [`SYSTEM STOP|START|REFRESH|CANCEL VIEW`](../system.md#refreshable-materialized-views). +:::note +Fun fact: the refresh query is allowed to read from the view that's being refreshed, seeing pre-refresh version of the data. This means you can implement Conway's game of life: https://pastila.nl/?00021a4b/d6156ff819c83d490ad2dcec05676865#O0LGWTO7maUQIA4AcGUtlA== +::: + ## Window View [Experimental] :::info diff --git a/docs/en/sql-reference/statements/system.md b/docs/en/sql-reference/statements/system.md index 35f2f15dd80..3ebcf617491 100644 --- a/docs/en/sql-reference/statements/system.md +++ b/docs/en/sql-reference/statements/system.md @@ -400,7 +400,7 @@ SYSTEM SYNC REPLICA [ON CLUSTER cluster_name] [db.]replicated_merge_tree_family_ After running this statement the `[db.]replicated_merge_tree_family_table_name` fetches commands from the common replicated log into its own replication queue, and then the query waits till the replica processes all of the fetched commands. The following modifiers are supported: - If a `STRICT` modifier was specified then the query waits for the replication queue to become empty. The `STRICT` version may never succeed if new entries constantly appear in the replication queue. - - If a `LIGHTWEIGHT` modifier was specified then the query waits only for `GET_PART`, `ATTACH_PART`, `DROP_RANGE`, `REPLACE_RANGE` and `DROP_PART` entries to be processed. + - If a `LIGHTWEIGHT` modifier was specified then the query waits only for `GET_PART`, `ATTACH_PART`, `DROP_RANGE`, `REPLACE_RANGE` and `DROP_PART` entries to be processed. Additionally, the LIGHTWEIGHT modifier supports an optional FROM 'srcReplicas' clause, where 'srcReplicas' is a comma-separated list of source replica names. This extension allows for more targeted synchronization by focusing only on replication tasks originating from the specified source replicas. - If a `PULL` modifier was specified then the query pulls new replication queue entries from ZooKeeper, but does not wait for anything to be processed. @@ -526,6 +526,10 @@ Trigger an immediate out-of-schedule refresh of a given view. SYSTEM REFRESH VIEW [db.]name ``` +### REFRESH VIEW + +Wait for the currently running refresh to complete. If the refresh fails, throws an exception. If no refresh is running, completes immediately, throwing an exception if previous refresh failed. + ### STOP VIEW, STOP VIEWS Disable periodic refreshing of the given view or all refreshable views. If a refresh is in progress, cancel it too. diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index 44a1cd071cb..1055b3d34db 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -608,6 +608,7 @@ M(727, UNEXPECTED_TABLE_ENGINE) \ M(728, UNEXPECTED_DATA_TYPE) \ M(729, ILLEGAL_TIME_SERIES_TAGS) \ + M(730, REFRESH_FAILED) \ \ M(900, DISTRIBUTED_CACHE_ERROR) \ M(901, CANNOT_USE_DISTRIBUTED_CACHE) \ diff --git a/src/Core/Settings.h b/src/Core/Settings.h index dfcff052740..0d84ad9022a 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -616,6 +616,7 @@ class IColumn; M(Bool, throw_if_deduplication_in_dependent_materialized_views_enabled_with_async_insert, true, "Throw exception on INSERT query when the setting `deduplicate_blocks_in_dependent_materialized_views` is enabled along with `async_insert`. It guarantees correctness, because these features can't work together.", 0) \ M(Bool, materialized_views_ignore_errors, false, "Allows to ignore errors for MATERIALIZED VIEW, and deliver original block to the table regardless of MVs", 0) \ M(Bool, ignore_materialized_views_with_dropped_target_table, false, "Ignore MVs with dropped target table during pushing to views", 0) \ + M(Bool, allow_materialized_view_with_bad_select, true, "Allow CREATE MATERIALIZED VIEW with SELECT query that references nonexistent tables or columns. It must still be syntactically valid. Doesn't apply to refreshable MVs. Doesn't apply if the MV schema needs to be inferred from the SELECT query (i.e. if the CREATE has no column list and no TO table). Can be used for creating MV before its source table.", 0) \ M(Bool, use_compact_format_in_distributed_parts_names, true, "Changes format of directories names for distributed table insert parts.", 0) \ M(Bool, validate_polygons, true, "Throw exception if polygon is invalid in function pointInPolygon (e.g. self-tangent, self-intersecting). If the setting is false, the function will accept invalid polygons but may silently return wrong result.", 0) \ M(UInt64, max_parser_depth, DBMS_DEFAULT_MAX_PARSER_DEPTH, "Maximum parser depth (recursion depth of recursive descend parser).", 0) \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 806b09d8b9d..2415323b4a0 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -84,6 +84,7 @@ static std::initializer_listgetInMemoryMetadataPtr()->getSampleBlock().getNamesAndTypesList(); + check_columns = true; + } + } + else if (!properties.columns_inferred_from_select_query) + { + all_output_columns = properties.columns.getInsertable(); + check_columns = true; + } + + if (create.refresh_strategy && !create.refresh_strategy->append) + { + if (database && database->getEngineName() != "Atomic") + throw Exception(ErrorCodes::INCORRECT_QUERY, + "Refreshable materialized views (except with APPEND) only support Atomic database engine, but database {} has engine {}", create.getDatabase(), database->getEngineName()); + } + + Block input_block; + + if (check_columns) + { + try + { + if (getContext()->getSettingsRef().allow_experimental_analyzer) + { + input_block = InterpreterSelectQueryAnalyzer::getSampleBlock(create.select->clone(), getContext()); + } + else + { + input_block = InterpreterSelectWithUnionQuery(create.select->clone(), + getContext(), + SelectQueryOptions().analyze()).getSampleBlock(); + } + } + catch (Exception &) + { + if (!getContext()->getSettingsRef().allow_materialized_view_with_bad_select) + throw; + check_columns = false; + } + } + + if (check_columns) + { + std::unordered_map output_types; + for (const NameAndTypePair & nt : all_output_columns) + output_types[nt.name] = nt.type; + + ColumnsWithTypeAndName input_columns; + ColumnsWithTypeAndName output_columns; + for (const auto & input_column : input_block) + { + auto it = output_types.find(input_column.name); + if (it != output_types.end()) + { + input_columns.push_back(input_column.cloneEmpty()); + output_columns.push_back(ColumnWithTypeAndName(it->second->createColumn(), it->second, input_column.name)); + } + else if (create.refresh_strategy) + { + /// Unrecognized columns produced by SELECT query are allowed by regular materialized + /// views, but not by refreshable ones. This is in part because it was easier to + /// implement, in part because refreshable views have less concern about ALTERing target + /// tables. + /// + /// The motivating scenario for allowing this in regular MV is ALTERing the table+query. + /// Suppose the user removes a column from target table, then a minute later + /// correspondingly updates the view's query to not produce that column. + /// If MV didn't allow unrecognized columns then during that minute all INSERTs into the + /// source table would fail - unacceptable. + /// For refreshable views, during that minute refreshes will fail - acceptable. + throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "SELECT query outputs column with name '{}', which is not found in the target table. Use 'AS' to assign alias that matches a column name.", input_column.name); + } + } + + if (input_columns.empty()) + throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "None of the columns produced by the SELECT query are present in the target table. Use 'AS' to assign aliases that match column names."); + + ActionsDAG::makeConvertingActions( + input_columns, + output_columns, + ActionsDAG::MatchColumnsMode::Position + ); + } +} + namespace { void checkTemporaryTableEngineName(const String & name) @@ -1132,13 +1233,6 @@ void InterpreterCreateQuery::assertOrSetUUID(ASTCreateQuery & create, const Data throw Exception(ErrorCodes::LOGICAL_ERROR, "Table UUID is not specified in DDL log"); } - if (create.refresh_strategy && database->getEngineName() != "Atomic") - throw Exception(ErrorCodes::INCORRECT_QUERY, - "Refreshable materialized view requires Atomic database engine, but database {} has engine {}", create.getDatabase(), database->getEngineName()); - /// TODO: Support Replicated databases, only with Shared/ReplicatedMergeTree. - /// Figure out how to make the refreshed data appear all at once on other - /// replicas; maybe a replicated SYSTEM SYNC REPLICA query before the rename? - if (database->getUUID() != UUIDHelpers::Nil) { if (create.attach && !from_path && create.uuid == UUIDHelpers::Nil) @@ -1359,51 +1453,16 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create) /// Set and retrieve list of columns, indices and constraints. Set table engine if needed. Rewrite query in canonical way. TableProperties properties = getTablePropertiesAndNormalizeCreateQuery(create, mode); - /// Check type compatible for materialized dest table and select columns - if (create.is_materialized_view_with_external_target() && create.select && mode <= LoadingStrictnessLevel::CREATE) - { - if (StoragePtr to_table = DatabaseCatalog::instance().tryGetTable(create.getTargetTableID(ViewTarget::To), getContext())) - { - Block input_block; - - if (getContext()->getSettingsRef().allow_experimental_analyzer) - { - input_block = InterpreterSelectQueryAnalyzer::getSampleBlock(create.select->clone(), getContext()); - } - else - { - input_block = InterpreterSelectWithUnionQuery(create.select->clone(), - getContext(), - SelectQueryOptions().analyze()).getSampleBlock(); - } - - Block output_block = to_table->getInMemoryMetadataPtr()->getSampleBlock(); - - ColumnsWithTypeAndName input_columns; - ColumnsWithTypeAndName output_columns; - for (const auto & input_column : input_block) - { - if (const auto * output_column = output_block.findByName(input_column.name)) - { - input_columns.push_back(input_column.cloneEmpty()); - output_columns.push_back(output_column->cloneEmpty()); - } - } - - ActionsDAG::makeConvertingActions( - input_columns, - output_columns, - ActionsDAG::MatchColumnsMode::Position - ); - } - } - DatabasePtr database; bool need_add_to_database = !create.temporary; // In case of an ON CLUSTER query, the database may not be present on the initiator node if (need_add_to_database) database = DatabaseCatalog::instance().tryGetDatabase(database_name); + /// Check type compatible for materialized dest table and select columns + if (create.select && create.is_materialized_view && mode <= LoadingStrictnessLevel::CREATE) + validateMaterializedViewColumnsAndEngine(create, properties, database); + bool allow_heavy_populate = getContext()->getSettingsRef().database_replicated_allow_heavy_create && create.is_populate; if (!allow_heavy_populate && database && database->getEngineName() == "Replicated" && (create.select || create.is_populate)) { diff --git a/src/Interpreters/InterpreterCreateQuery.h b/src/Interpreters/InterpreterCreateQuery.h index 3982ea2cabc..5047c372c71 100644 --- a/src/Interpreters/InterpreterCreateQuery.h +++ b/src/Interpreters/InterpreterCreateQuery.h @@ -90,6 +90,7 @@ private: IndicesDescription indices; ConstraintsDescription constraints; ProjectionsDescription projections; + bool columns_inferred_from_select_query = false; }; BlockIO createDatabase(ASTCreateQuery & create); @@ -98,6 +99,7 @@ private: /// Calculate list of columns, constraints, indices, etc... of table. Rewrite query in canonical way. TableProperties getTablePropertiesAndNormalizeCreateQuery(ASTCreateQuery & create, LoadingStrictnessLevel mode) const; void validateTableStructure(const ASTCreateQuery & create, const TableProperties & properties) const; + void validateMaterializedViewColumnsAndEngine(const ASTCreateQuery & create, const TableProperties & properties, const DatabasePtr & database); void setEngine(ASTCreateQuery & create) const; AccessRightsElements getRequiredAccess() const; diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index 1cd55a0020c..21c8b44b374 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -666,6 +666,10 @@ BlockIO InterpreterSystemQuery::execute() for (const auto & task : getRefreshTasks()) task->run(); break; + case Type::WAIT_VIEW: + for (const auto & task : getRefreshTasks()) + task->wait(); + break; case Type::CANCEL_VIEW: for (const auto & task : getRefreshTasks()) task->cancel(); @@ -1409,6 +1413,7 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster() break; } case Type::REFRESH_VIEW: + case Type::WAIT_VIEW: case Type::START_VIEW: case Type::START_VIEWS: case Type::STOP_VIEW: diff --git a/src/Parsers/ASTCreateQuery.cpp b/src/Parsers/ASTCreateQuery.cpp index 359e93ab269..d7f5b8f9702 100644 --- a/src/Parsers/ASTCreateQuery.cpp +++ b/src/Parsers/ASTCreateQuery.cpp @@ -256,6 +256,8 @@ ASTPtr ASTCreateQuery::clone() const res->set(res->dictionary, dictionary->clone()); } + if (refresh_strategy) + res->set(res->refresh_strategy, refresh_strategy->clone()); if (as_table_function) res->set(res->as_table_function, as_table_function->clone()); if (comment) diff --git a/src/Parsers/ASTRefreshStrategy.cpp b/src/Parsers/ASTRefreshStrategy.cpp index 2e0c6ee4638..d10c1b4e7f5 100644 --- a/src/Parsers/ASTRefreshStrategy.cpp +++ b/src/Parsers/ASTRefreshStrategy.cpp @@ -20,7 +20,6 @@ ASTPtr ASTRefreshStrategy::clone() const res->set(res->settings, settings->clone()); if (dependencies) res->set(res->dependencies, dependencies->clone()); - res->schedule_kind = schedule_kind; return res; } @@ -66,6 +65,8 @@ void ASTRefreshStrategy::formatImpl( f_settings.ostr << (f_settings.hilite ? hilite_keyword : "") << " SETTINGS " << (f_settings.hilite ? hilite_none : ""); settings->formatImpl(f_settings, state, frame); } + if (append) + f_settings.ostr << (f_settings.hilite ? hilite_keyword : "") << " APPEND" << (f_settings.hilite ? hilite_none : ""); } } diff --git a/src/Parsers/ASTRefreshStrategy.h b/src/Parsers/ASTRefreshStrategy.h index ca248b76b40..bb5ac97c054 100644 --- a/src/Parsers/ASTRefreshStrategy.h +++ b/src/Parsers/ASTRefreshStrategy.h @@ -24,6 +24,7 @@ public: ASTTimeInterval * offset = nullptr; ASTTimeInterval * spread = nullptr; RefreshScheduleKind schedule_kind{RefreshScheduleKind::UNKNOWN}; + bool append = false; String getID(char) const override { return "Refresh strategy definition"; } diff --git a/src/Parsers/ASTRenameQuery.h b/src/Parsers/ASTRenameQuery.h index d51c382f374..39fc4f787ec 100644 --- a/src/Parsers/ASTRenameQuery.h +++ b/src/Parsers/ASTRenameQuery.h @@ -141,6 +141,19 @@ public: QueryKind getQueryKind() const override { return QueryKind::Rename; } + void addElement(const String & from_db, const String & from_table, const String & to_db, const String & to_table) + { + auto identifier = [&](const String & name) -> ASTPtr + { + if (name.empty()) + return nullptr; + ASTPtr ast = std::make_shared(name); + children.push_back(ast); + return ast; + }; + elements.push_back(Element {.from = Table {.database = identifier(from_db), .table = identifier(from_table)}, .to = Table {.database = identifier(to_db), .table = identifier(to_table)}}); + } + protected: void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override { diff --git a/src/Parsers/ASTSystemQuery.cpp b/src/Parsers/ASTSystemQuery.cpp index 7780544d5c2..b5e5e0f208d 100644 --- a/src/Parsers/ASTSystemQuery.cpp +++ b/src/Parsers/ASTSystemQuery.cpp @@ -376,6 +376,7 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState & s case Type::START_VIEW: case Type::STOP_VIEW: case Type::CANCEL_VIEW: + case Type::WAIT_VIEW: { settings.ostr << ' '; print_database_table(); diff --git a/src/Parsers/ASTSystemQuery.h b/src/Parsers/ASTSystemQuery.h index 167e724dcee..59de90b1d8e 100644 --- a/src/Parsers/ASTSystemQuery.h +++ b/src/Parsers/ASTSystemQuery.h @@ -95,6 +95,7 @@ public: START_CLEANUP, RESET_COVERAGE, REFRESH_VIEW, + WAIT_VIEW, START_VIEW, START_VIEWS, STOP_VIEW, diff --git a/src/Parsers/ParserRefreshStrategy.cpp b/src/Parsers/ParserRefreshStrategy.cpp index e7912293d85..4f3b7c66558 100644 --- a/src/Parsers/ParserRefreshStrategy.cpp +++ b/src/Parsers/ParserRefreshStrategy.cpp @@ -96,6 +96,10 @@ bool ParserRefreshStrategy::parseImpl(Pos & pos, ASTPtr & node, Expected & expec return false; refresh->set(refresh->settings, settings); } + + if (ParserKeyword{Keyword::APPEND}.ignore(pos, expected)) + refresh->append = true; + node = refresh; return true; } diff --git a/src/Parsers/ParserSystemQuery.cpp b/src/Parsers/ParserSystemQuery.cpp index 81b64ab47c6..efabbbfa479 100644 --- a/src/Parsers/ParserSystemQuery.cpp +++ b/src/Parsers/ParserSystemQuery.cpp @@ -421,6 +421,7 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & break; case Type::REFRESH_VIEW: + case Type::WAIT_VIEW: case Type::START_VIEW: case Type::STOP_VIEW: case Type::CANCEL_VIEW: diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index 0477a08b0d2..6de7e60285f 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -103,7 +103,7 @@ public: IStorage(const IStorage &) = delete; IStorage & operator=(const IStorage &) = delete; - /// The main name of the table type (for example, StorageMergeTree). + /// The main name of the table type (e.g. Memory, MergeTree, CollapsingMergeTree). virtual std::string getName() const = 0; /// The name of the table. diff --git a/src/Storages/MaterializedView/RefreshSet.h b/src/Storages/MaterializedView/RefreshSet.h index 7fb583fd316..6141a69996a 100644 --- a/src/Storages/MaterializedView/RefreshSet.h +++ b/src/Storages/MaterializedView/RefreshSet.h @@ -10,7 +10,7 @@ namespace DB { -enum class RefreshState : RefreshTaskStateUnderlying +enum class RefreshState { Disabled = 0, Scheduled, @@ -18,11 +18,11 @@ enum class RefreshState : RefreshTaskStateUnderlying Running, }; -enum class LastRefreshResult : RefreshTaskStateUnderlying +enum class LastRefreshResult { Unknown = 0, Cancelled, - Exception, + Error, Finished }; @@ -36,7 +36,8 @@ struct RefreshInfo UInt64 last_attempt_duration_ms = 0; UInt32 next_refresh_time = 0; UInt64 refresh_count = 0; - String exception_message; // if last_refresh_result is Exception + UInt64 retry = 0; + String exception_message; // if last_refresh_result is Error std::vector remaining_dependencies; ProgressValues progress; }; diff --git a/src/Storages/MaterializedView/RefreshSettings.h b/src/Storages/MaterializedView/RefreshSettings.h index 814c7e52b32..23676538788 100644 --- a/src/Storages/MaterializedView/RefreshSettings.h +++ b/src/Storages/MaterializedView/RefreshSettings.h @@ -6,8 +6,10 @@ namespace DB { #define LIST_OF_REFRESH_SETTINGS(M, ALIAS) \ - /// TODO: Add settings - /// M(UInt64, name, 42, "...", 0) + M(Int64, refresh_retries, 0, "How many times to retry refresh query if it fails. If all attempts fail, wait for the next refresh time according to schedule. 0 to disable retries. -1 for infinite retries.", 0) \ + M(UInt64, refresh_retry_initial_backoff_ms, 100, "Delay before the first retry if refresh query fails (if refresh_retries setting is not zero). Each subsequent retry doubles the delay, up to refresh_retry_max_backoff_ms.", 0) \ + M(UInt64, refresh_retry_max_backoff_ms, 60'000, "Limit on the exponential growth of delay between refresh attempts, if they keep failing and refresh_retries is positive.", 0) \ + DECLARE_SETTINGS_TRAITS(RefreshSettingsTraits, LIST_OF_REFRESH_SETTINGS) diff --git a/src/Storages/MaterializedView/RefreshTask.cpp b/src/Storages/MaterializedView/RefreshTask.cpp index 0837eaf97fd..ed5a6652288 100644 --- a/src/Storages/MaterializedView/RefreshTask.cpp +++ b/src/Storages/MaterializedView/RefreshTask.cpp @@ -1,7 +1,5 @@ #include -#include - #include #include #include @@ -11,6 +9,7 @@ #include #include #include +#include namespace CurrentMetrics { @@ -24,37 +23,39 @@ namespace ErrorCodes { extern const int LOGICAL_ERROR; extern const int QUERY_WAS_CANCELLED; + extern const int REFRESH_FAILED; } RefreshTask::RefreshTask( - const ASTRefreshStrategy & strategy) + StorageMaterializedView * view_, const DB::ASTRefreshStrategy & strategy) : log(getLogger("RefreshTask")) + , view(view_) , refresh_schedule(strategy) -{} + , refresh_append(strategy.append) +{ + if (strategy.settings != nullptr) + refresh_settings.applyChanges(strategy.settings->changes); +} -RefreshTaskHolder RefreshTask::create( +OwnedRefreshTask RefreshTask::create( + StorageMaterializedView * view, ContextMutablePtr context, const DB::ASTRefreshStrategy & strategy) { - auto task = std::make_shared(strategy); + auto task = std::make_shared(view, strategy); - task->refresh_task = context->getSchedulePool().createTask("MaterializedViewRefresherTask", - [self = task->weak_from_this()] - { - if (auto t = self.lock()) - t->refreshTask(); - }); + task->refresh_task = context->getSchedulePool().createTask("RefreshTask", + [self = task.get()] { self->refreshTask(); }); if (strategy.dependencies) for (auto && dependency : strategy.dependencies->children) task->initial_dependencies.emplace_back(dependency->as()); - return task; + return OwnedRefreshTask(task); } -void RefreshTask::initializeAndStart(std::shared_ptr view) +void RefreshTask::initializeAndStart() { - view_to_refresh = view; if (view->getContext()->getSettingsRef().stop_refreshable_materialized_views_on_startup) stop_requested = true; view->getContext()->getRefreshSet().emplace(view->getStorageID(), initial_dependencies, shared_from_this()); @@ -102,7 +103,11 @@ void RefreshTask::alterRefreshParams(const DB::ASTRefreshStrategy & new_strategy if (arriveDependency(id) && !std::exchange(refresh_immediately, true)) refresh_task->schedule(); - /// TODO: Update settings once we have them. + refresh_settings = {}; + if (new_strategy.settings != nullptr) + refresh_settings.applyChanges(new_strategy.settings->changes); + + refresh_append = new_strategy.append; } RefreshInfo RefreshTask::getInfo() const @@ -111,7 +116,7 @@ RefreshInfo RefreshTask::getInfo() const auto res = info; res.view_id = set_handle.getID(); res.remaining_dependencies.assign(remaining_dependencies.begin(), remaining_dependencies.end()); - if (res.last_refresh_result != LastRefreshResult::Exception) + if (res.last_refresh_result != LastRefreshResult::Error) res.exception_message.clear(); res.progress = progress.getValues(); return res; @@ -139,6 +144,8 @@ void RefreshTask::run() std::lock_guard guard(mutex); if (std::exchange(refresh_immediately, true)) return; + next_refresh_prescribed = std::chrono::floor(currentTime()); + next_refresh_actual = currentTime(); refresh_task->schedule(); } @@ -149,10 +156,22 @@ void RefreshTask::cancel() refresh_task->schedule(); } +void RefreshTask::wait() +{ + std::unique_lock lock(mutex); + refresh_cv.wait(lock, [&] { return info.state != RefreshState::Running && !refresh_immediately; }); + if (info.last_refresh_result == LastRefreshResult::Error) + throw Exception(ErrorCodes::REFRESH_FAILED, "Refresh failed: {}", info.exception_message); +} + void RefreshTask::shutdown() { { std::lock_guard guard(mutex); + + if (view == nullptr) + return; // already shut down + stop_requested = true; interruptExecution(); } @@ -166,6 +185,8 @@ void RefreshTask::shutdown() /// (Also, RefreshSet holds a shared_ptr to us.) std::lock_guard guard(mutex); set_handle.reset(); + + view = nullptr; } void RefreshTask::notify(const StorageID & parent_id, std::chrono::sys_seconds parent_next_prescribed_time) @@ -232,6 +253,7 @@ void RefreshTask::refreshTask() chassert(lock.owns_lock()); interrupt_execution.store(false); + refresh_cv.notify_all(); // we'll assign info.state before unlocking the mutex if (stop_requested) { @@ -243,7 +265,7 @@ void RefreshTask::refreshTask() if (!refresh_immediately) { auto now = currentTime(); - if (now >= next_refresh_with_spread) + if (now >= next_refresh_actual) { if (arriveTime()) refresh_immediately = true; @@ -256,7 +278,7 @@ void RefreshTask::refreshTask() else { size_t delay_ms = std::chrono::duration_cast( - next_refresh_with_spread - now).count(); + next_refresh_actual - now).count(); /// If we're in a test that fakes the clock, poll every 100ms. if (fake_clock.load(std::memory_order_relaxed) != INT64_MIN) @@ -270,19 +292,9 @@ void RefreshTask::refreshTask() /// Perform a refresh. + bool append = refresh_append; refresh_immediately = false; - - auto view = lockView(); - if (!view) - { - /// The view was dropped. This RefreshTask should be destroyed soon too. - /// (Maybe this is unreachable.) - info.state = RefreshState::Disabled; - break; - } - info.state = RefreshState::Running; - CurrentMetrics::Increment metric_inc(CurrentMetrics::RefreshingViews); lock.unlock(); @@ -293,19 +305,13 @@ void RefreshTask::refreshTask() try { - executeRefreshUnlocked(view); + executeRefreshUnlocked(append); refreshed = true; } catch (...) { if (!interrupt_execution.load()) - { - PreformattedMessage message = getCurrentExceptionMessageAndPattern(true); - auto text = message.text; - message.text = fmt::format("Refresh view {} failed: {}", view->getStorageID().getFullTableName(), message.text); - LOG_ERROR(log, message); - exception = text; - } + exception = getCurrentExceptionMessage(true); } lock.lock(); @@ -317,18 +323,18 @@ void RefreshTask::refreshTask() if (exception) { - info.last_refresh_result = LastRefreshResult::Exception; + info.last_refresh_result = LastRefreshResult::Error; info.exception_message = *exception; - - /// TODO: Do a few retries with exponential backoff. - advanceNextRefreshTime(now); + Int64 attempt_number = num_retries + 1; + scheduleRetryOrSkipToNextRefresh(now); + LOG_ERROR(log, "Refresh view {} failed (attempt {}/{}): {}", view->getStorageID().getFullTableName(), attempt_number, refresh_settings.refresh_retries + 1, *exception); } else if (!refreshed) { info.last_refresh_result = LastRefreshResult::Cancelled; /// Make sure we don't just start another refresh immediately. - if (!stop_requested && now >= next_refresh_with_spread) + if (!stop_requested) advanceNextRefreshTime(now); } else @@ -361,17 +367,18 @@ void RefreshTask::refreshTask() } } -void RefreshTask::executeRefreshUnlocked(std::shared_ptr view) +void RefreshTask::executeRefreshUnlocked(bool append) { LOG_DEBUG(log, "Refreshing view {}", view->getStorageID().getFullTableName()); progress.reset(); - /// Create a table. - auto [refresh_context, refresh_query] = view->prepareRefresh(); - - StorageID stale_table = StorageID::createEmpty(); + ContextMutablePtr refresh_context = view->createRefreshContext(); + std::optional table_to_drop; try { + /// Create a table. + auto refresh_query = view->prepareRefresh(append, refresh_context, table_to_drop); + /// Run the query. { CurrentThread::QueryScope query_scope(refresh_context); // create a thread group for the query @@ -429,37 +436,55 @@ void RefreshTask::executeRefreshUnlocked(std::shared_ptrexchangeTargetTable(refresh_query->table_id, refresh_context); + if (!append) + table_to_drop = view->exchangeTargetTable(refresh_query->table_id, refresh_context); } catch (...) { - try - { - InterpreterDropQuery::executeDropQuery( - ASTDropQuery::Kind::Drop, view->getContext(), refresh_context, refresh_query->table_id, /*sync*/ false, /*ignore_sync_setting*/ true); - } - catch (...) - { - tryLogCurrentException(log, "Failed to drop temporary table after a failed refresh"); - /// Let's ignore this and keep going, at risk of accumulating many trash tables if this keeps happening. - } + if (table_to_drop.has_value()) + view->dropTempTable(table_to_drop.value(), refresh_context); throw; } - /// Drop the old table (outside the try-catch so we don't try to drop the other table if this fails). - InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, view->getContext(), refresh_context, stale_table, /*sync*/ true, /*ignore_sync_setting*/ true); + if (table_to_drop.has_value()) + view->dropTempTable(table_to_drop.value(), refresh_context); } void RefreshTask::advanceNextRefreshTime(std::chrono::system_clock::time_point now) { std::chrono::sys_seconds next = refresh_schedule.prescribeNext(next_refresh_prescribed, now); next_refresh_prescribed = next; - next_refresh_with_spread = refresh_schedule.addRandomSpread(next); + next_refresh_actual = refresh_schedule.addRandomSpread(next); - auto secs = std::chrono::floor(next_refresh_with_spread); + num_retries = 0; + info.retry = num_retries; + + auto secs = std::chrono::floor(next_refresh_actual); info.next_refresh_time = UInt32(secs.time_since_epoch().count()); } +void RefreshTask::scheduleRetryOrSkipToNextRefresh(std::chrono::system_clock::time_point now) +{ + if (refresh_settings.refresh_retries >= 0 && num_retries >= refresh_settings.refresh_retries) + { + advanceNextRefreshTime(now); + return; + } + + num_retries += 1; + info.retry = num_retries; + + UInt64 delay_ms; + UInt64 multiplier = UInt64(1) << std::min(num_retries - 1, Int64(62)); + /// Overflow check: a*b <= c iff a <= c/b iff a <= floor(c/b). + if (refresh_settings.refresh_retry_initial_backoff_ms <= refresh_settings.refresh_retry_max_backoff_ms / multiplier) + delay_ms = refresh_settings.refresh_retry_initial_backoff_ms * multiplier; + else + delay_ms = refresh_settings.refresh_retry_max_backoff_ms; + + next_refresh_actual = now + std::chrono::milliseconds(delay_ms); +} + bool RefreshTask::arriveDependency(const StorageID & parent) { remaining_dependencies.erase(parent); @@ -500,11 +525,6 @@ void RefreshTask::interruptExecution() } } -std::shared_ptr RefreshTask::lockView() -{ - return std::static_pointer_cast(view_to_refresh.lock()); -} - std::chrono::system_clock::time_point RefreshTask::currentTime() const { Int64 fake = fake_clock.load(std::memory_order::relaxed); diff --git a/src/Storages/MaterializedView/RefreshTask.h b/src/Storages/MaterializedView/RefreshTask.h index 623493f6aec..ad9d949e18e 100644 --- a/src/Storages/MaterializedView/RefreshTask.h +++ b/src/Storages/MaterializedView/RefreshTask.h @@ -17,19 +17,21 @@ class PipelineExecutor; class StorageMaterializedView; class ASTRefreshStrategy; +struct OwnedRefreshTask; class RefreshTask : public std::enable_shared_from_this { public: /// Never call it manually, public for shared_ptr construction only - explicit RefreshTask(const ASTRefreshStrategy & strategy); + RefreshTask(StorageMaterializedView * view_, const ASTRefreshStrategy & strategy); /// The only proper way to construct task - static RefreshTaskHolder create( + static OwnedRefreshTask create( + StorageMaterializedView * view, ContextMutablePtr context, const DB::ASTRefreshStrategy & strategy); - void initializeAndStart(std::shared_ptr view); + void initializeAndStart(); // called at most once /// Call when renaming the materialized view. void rename(StorageID new_id); @@ -51,7 +53,14 @@ public: /// Cancel task execution void cancel(); + /// Waits for the currently running refresh attempt to complete. + /// If the refresh fails, throws an exception. + /// If no refresh is running, completes immediately, throwing an exception if previous refresh failed. + void wait(); + /// Permanently disable task scheduling and remove this table from RefreshSet. + /// Ok to call multiple times, but not in parallel. + /// Ok to call even if initializeAndStart() wasn't called or failed. void shutdown(); /// Notify dependent task @@ -65,7 +74,7 @@ public: private: LoggerPtr log = nullptr; - std::weak_ptr view_to_refresh; + StorageMaterializedView * view; /// Protects interrupt_execution and running_executor. /// Can be locked while holding `mutex`. @@ -82,8 +91,10 @@ private: mutable std::mutex mutex; RefreshSchedule refresh_schedule; - RefreshSettings refresh_settings; // TODO: populate, use, update on alter + RefreshSettings refresh_settings; std::vector initial_dependencies; + bool refresh_append; + RefreshSet::Handle set_handle; /// StorageIDs of our dependencies that we're waiting for. @@ -112,7 +123,8 @@ private: /// E.g. for REFRESH EVERY 1 DAY, yesterday's refresh of the dependency shouldn't trigger today's /// refresh of the dependent even if it happened today (e.g. it was slow or had random spread > 1 day). std::chrono::sys_seconds next_refresh_prescribed; - std::chrono::system_clock::time_point next_refresh_with_spread; + std::chrono::system_clock::time_point next_refresh_actual; + Int64 num_retries = 0; /// Calls refreshTask() from background thread. BackgroundSchedulePool::TaskHolder refresh_task; @@ -123,6 +135,7 @@ private: /// Just for observability. RefreshInfo info; Progress progress; + std::condition_variable refresh_cv; // notified when info.state changes /// The main loop of the refresh task. It examines the state, sees what needs to be /// done and does it. If there's nothing to do at the moment, returns; it's then scheduled again, @@ -134,11 +147,14 @@ private: /// Perform an actual refresh: create new table, run INSERT SELECT, exchange tables, drop old table. /// Mutex must be unlocked. Called only from refresh_task. - void executeRefreshUnlocked(std::shared_ptr view); + void executeRefreshUnlocked(bool append); /// Assigns next_refresh_* void advanceNextRefreshTime(std::chrono::system_clock::time_point now); + /// Either advances next_refresh_actual using exponential backoff or does advanceNextRefreshTime(). + void scheduleRetryOrSkipToNextRefresh(std::chrono::system_clock::time_point now); + /// Returns true if all dependencies are fulfilled now. Refills remaining_dependencies in this case. bool arriveDependency(const StorageID & parent); bool arriveTime(); @@ -146,9 +162,24 @@ private: void interruptExecution(); - std::shared_ptr lockView(); - std::chrono::system_clock::time_point currentTime() const; }; +/// Wrapper around shared_ptr, calls shutdown() in destructor. +struct OwnedRefreshTask +{ + RefreshTaskHolder ptr; + + OwnedRefreshTask() = default; + explicit OwnedRefreshTask(RefreshTaskHolder p) : ptr(std::move(p)) {} + OwnedRefreshTask(OwnedRefreshTask &&) = default; + OwnedRefreshTask & operator=(OwnedRefreshTask &&) = default; + + ~OwnedRefreshTask() { if (ptr) ptr->shutdown(); } + + RefreshTask* operator->() const { return ptr.get(); } + RefreshTask& operator*() const { return *ptr; } + explicit operator bool() const { return ptr != nullptr; } +}; + } diff --git a/src/Storages/MaterializedView/RefreshTask_fwd.h b/src/Storages/MaterializedView/RefreshTask_fwd.h index 9a0a122381e..ff17c839dc5 100644 --- a/src/Storages/MaterializedView/RefreshTask_fwd.h +++ b/src/Storages/MaterializedView/RefreshTask_fwd.h @@ -8,9 +8,7 @@ namespace DB class RefreshTask; -using RefreshTaskStateUnderlying = UInt8; using RefreshTaskHolder = std::shared_ptr; -using RefreshTaskObserver = std::weak_ptr; using RefreshTaskList = std::list; } diff --git a/src/Storages/StorageMaterializedView.cpp b/src/Storages/StorageMaterializedView.cpp index 4c6c2fff209..e1256032493 100644 --- a/src/Storages/StorageMaterializedView.cpp +++ b/src/Storages/StorageMaterializedView.cpp @@ -3,6 +3,8 @@ #include #include +#include +#include #include #include @@ -14,6 +16,7 @@ #include #include #include +#include #include #include @@ -146,6 +149,13 @@ StorageMaterializedView::StorageMaterializedView( if (point_to_itself_by_uuid || point_to_itself_by_name) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Materialized view {} cannot point to itself", table_id_.getFullTableName()); + if (query.refresh_strategy) + { + fixed_uuid = false; + refresher = RefreshTask::create(this, getContext(), *query.refresh_strategy); + refresh_on_start = mode < LoadingStrictnessLevel::ATTACH && !query.is_create_empty; + } + if (!has_inner_table) { target_table_id = to_table_id; @@ -198,15 +208,6 @@ StorageMaterializedView::StorageMaterializedView( target_table_id = DatabaseCatalog::instance().getTable({manual_create_query->getDatabase(), manual_create_query->getTable()}, getContext())->getStorageID(); } - - if (query.refresh_strategy) - { - fixed_uuid = false; - refresher = RefreshTask::create( - getContext(), - *query.refresh_strategy); - refresh_on_start = mode < LoadingStrictnessLevel::ATTACH && !query.is_create_empty; - } } QueryProcessingStage::Enum StorageMaterializedView::getQueryProcessingStage( @@ -377,44 +378,71 @@ bool StorageMaterializedView::optimize( return storage_ptr->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, cleanup, local_context); } -std::tuple> StorageMaterializedView::prepareRefresh() const +ContextMutablePtr StorageMaterializedView::createRefreshContext() const { auto refresh_context = getInMemoryMetadataPtr()->getSQLSecurityOverriddenContext(getContext()); + refresh_context->setQueryKind(ClientInfo::QueryKind::INITIAL_QUERY); /// Generate a random query id. refresh_context->setCurrentQueryId(""); + /// TODO: Set view's definer as the current user in refresh_context, so that the correct user's + /// quotas and permissions apply for this query. + return refresh_context; +} - CurrentThread::QueryScope query_scope(refresh_context); - +std::shared_ptr StorageMaterializedView::prepareRefresh(bool append, ContextMutablePtr refresh_context, std::optional & out_temp_table_id) const +{ auto inner_table_id = getTargetTableId(); - auto new_table_name = ".tmp" + generateInnerTableName(getStorageID()); + StorageID target_table = inner_table_id; - auto db = DatabaseCatalog::instance().getDatabase(inner_table_id.database_name); + if (!append) + { + CurrentThread::QueryScope query_scope(refresh_context); - auto create_table_query = db->getCreateTableQuery(inner_table_id.table_name, getContext()); - auto & create_query = create_table_query->as(); - create_query.setTable(new_table_name); - create_query.setDatabase(db->getDatabaseName()); - create_query.create_or_replace = true; - create_query.replace_table = true; - create_query.uuid = UUIDHelpers::Nil; + auto db = DatabaseCatalog::instance().getDatabase(inner_table_id.database_name); + String db_name = db->getDatabaseName(); + auto new_table_name = ".tmp" + generateInnerTableName(getStorageID()); - InterpreterCreateQuery create_interpreter(create_table_query, refresh_context); - create_interpreter.setInternal(true); - create_interpreter.execute(); + auto create_table_query = db->getCreateTableQuery(inner_table_id.table_name, getContext()); + auto & create_query = create_table_query->as(); + create_query.setTable(new_table_name); + create_query.setDatabase(db->getDatabaseName()); + create_query.create_or_replace = true; + create_query.replace_table = true; + create_query.uuid = UUIDHelpers::Nil; - StorageID fresh_table = DatabaseCatalog::instance().getTable({create_query.getDatabase(), create_query.getTable()}, getContext())->getStorageID(); + InterpreterCreateQuery create_interpreter(create_table_query, refresh_context); + create_interpreter.setInternal(true); + create_interpreter.execute(); + + target_table = DatabaseCatalog::instance().getTable({db_name, new_table_name}, getContext())->getStorageID(); + out_temp_table_id = target_table; + } auto insert_query = std::make_shared(); insert_query->select = getInMemoryMetadataPtr()->getSelectQuery().select_query; - insert_query->setTable(fresh_table.table_name); - insert_query->setDatabase(fresh_table.database_name); - insert_query->table_id = fresh_table; + insert_query->setTable(target_table.table_name); + insert_query->setDatabase(target_table.database_name); + insert_query->table_id = target_table; - return {refresh_context, insert_query}; + Block header; + if (refresh_context->getSettingsRef().allow_experimental_analyzer) + header = InterpreterSelectQueryAnalyzer::getSampleBlock(insert_query->select, refresh_context); + else + header = InterpreterSelectWithUnionQuery(insert_query->select, refresh_context, SelectQueryOptions()).getSampleBlock(); + + auto columns = std::make_shared(','); + for (const String & name : header.getNames()) + columns->children.push_back(std::make_shared(name)); + insert_query->columns = std::move(columns); + + return insert_query; } StorageID StorageMaterializedView::exchangeTargetTable(StorageID fresh_table, ContextPtr refresh_context) { + /// Known problem: if the target table was ALTERed during refresh, this will effectively revert + /// the ALTER. + auto stale_table_id = getTargetTableId(); auto db = DatabaseCatalog::instance().getDatabase(stale_table_id.database_name); @@ -422,15 +450,40 @@ StorageID StorageMaterializedView::exchangeTargetTable(StorageID fresh_table, Co CurrentThread::QueryScope query_scope(refresh_context); - target_db->renameTable( - refresh_context, fresh_table.table_name, *db, stale_table_id.table_name, /*exchange=*/true, /*dictionary=*/false); + auto rename_query = std::make_shared(); + rename_query->exchange = true; + rename_query->addElement(fresh_table.database_name, fresh_table.table_name, stale_table_id.database_name, stale_table_id.table_name); + + InterpreterRenameQuery(rename_query, refresh_context).execute(); std::swap(stale_table_id.database_name, fresh_table.database_name); std::swap(stale_table_id.table_name, fresh_table.table_name); + setTargetTableId(std::move(fresh_table)); return stale_table_id; } +void StorageMaterializedView::dropTempTable(StorageID table_id, ContextMutablePtr refresh_context) +{ + CurrentThread::QueryScope query_scope(refresh_context); + + try + { + auto drop_query = std::make_shared(); + drop_query->setDatabase(table_id.database_name); + drop_query->setTable(table_id.table_name); + drop_query->kind = ASTDropQuery::Kind::Drop; + drop_query->if_exists = true; + drop_query->sync = false; + + InterpreterDropQuery(drop_query, refresh_context).execute(); + } + catch (...) + { + tryLogCurrentException(&Poco::Logger::get("StorageMaterializedView"), "Failed to drop temporary table after refresh"); + } +} + void StorageMaterializedView::alter( const AlterCommands & params, ContextPtr local_context, @@ -530,25 +583,11 @@ void StorageMaterializedView::renameInMemory(const StorageID & new_table_id) { auto new_target_table_name = generateInnerTableName(new_table_id); - ASTRenameQuery::Elements rename_elements; assert(inner_table_id.database_name == old_table_id.database_name); - ASTRenameQuery::Element elem - { - ASTRenameQuery::Table - { - inner_table_id.database_name.empty() ? nullptr : std::make_shared(inner_table_id.database_name), - std::make_shared(inner_table_id.table_name) - }, - ASTRenameQuery::Table - { - new_table_id.database_name.empty() ? nullptr : std::make_shared(new_table_id.database_name), - std::make_shared(new_target_table_name) - } - }; - rename_elements.emplace_back(std::move(elem)); + auto rename = std::make_shared(); + rename->addElement(inner_table_id.database_name, inner_table_id.table_name, new_table_id.database_name, new_target_table_name); - auto rename = std::make_shared(std::move(rename_elements)); InterpreterRenameQuery(rename, getContext()).execute(); updateTargetTableId(new_table_id.database_name, new_target_table_name); } @@ -576,7 +615,7 @@ void StorageMaterializedView::startup() if (refresher) { - refresher->initializeAndStart(std::static_pointer_cast(shared_from_this())); + refresher->initializeAndStart(); if (refresh_on_start) refresher->run(); diff --git a/src/Storages/StorageMaterializedView.h b/src/Storages/StorageMaterializedView.h index 5ecd2ec3819..a09ee07b3f6 100644 --- a/src/Storages/StorageMaterializedView.h +++ b/src/Storages/StorageMaterializedView.h @@ -5,7 +5,7 @@ #include #include -#include +#include namespace DB { @@ -106,7 +106,7 @@ private: /// Will be initialized in constructor StorageID target_table_id = StorageID::createEmpty(); - RefreshTaskHolder refresher; + OwnedRefreshTask refresher; bool refresh_on_start = false; bool has_inner_table = false; @@ -119,10 +119,14 @@ private: void checkStatementCanBeForwarded() const; - /// Prepare to refresh a refreshable materialized view: create query context, create temporary - /// table, form the insert-select query. - std::tuple> prepareRefresh() const; + ContextMutablePtr createRefreshContext() const; + /// Prepare to refresh a refreshable materialized view: create temporary table and form the + /// insert-select query. + /// out_temp_table_id may be assigned before throwing an exception, in which case the caller + /// must drop the temp table before rethrowing. + std::shared_ptr prepareRefresh(bool append, ContextMutablePtr refresh_context, std::optional & out_temp_table_id) const; StorageID exchangeTargetTable(StorageID fresh_table, ContextPtr refresh_context); + void dropTempTable(StorageID table, ContextMutablePtr refresh_context); void setTargetTableId(StorageID id); void updateTargetTableId(std::optional database_name, std::optional table_name); diff --git a/src/Storages/System/StorageSystemViewRefreshes.cpp b/src/Storages/System/StorageSystemViewRefreshes.cpp index 061201017a7..6e0dab1468d 100644 --- a/src/Storages/System/StorageSystemViewRefreshes.cpp +++ b/src/Storages/System/StorageSystemViewRefreshes.cpp @@ -34,8 +34,9 @@ ColumnsDescription StorageSystemViewRefreshes::getColumnsDescription() "If status = 'WaitingForDependencies', a refresh is ready to start as soon as these dependencies are fulfilled." }, {"exception", std::make_shared(), - "if last_refresh_result = 'Exception', i.e. the last refresh attempt failed, this column contains the corresponding error message and stack trace." + "if last_refresh_result = 'Error', i.e. the last refresh attempt failed, this column contains the corresponding error message and stack trace." }, + {"retry", std::make_shared(), "How many failed attempts there were so far, for the current refresh."}, {"refresh_count", std::make_shared(), "Number of successful refreshes since last server restart or table creation."}, {"progress", std::make_shared(), "Progress of the current refresh, between 0 and 1."}, {"elapsed", std::make_shared(), "The amount of nanoseconds the current refresh took."}, @@ -88,6 +89,7 @@ void StorageSystemViewRefreshes::fillData( res_columns[i++]->insert(Array(deps)); res_columns[i++]->insert(refresh.exception_message); + res_columns[i++]->insert(refresh.retry); res_columns[i++]->insert(refresh.refresh_count); res_columns[i++]->insert(Float64(refresh.progress.read_rows) / refresh.progress.total_rows_to_read); res_columns[i++]->insert(refresh.progress.elapsed_ns / 1e9); diff --git a/tests/performance/parallel_mv.xml b/tests/performance/parallel_mv.xml index 5b856740a19..0bf5ed1be09 100644 --- a/tests/performance/parallel_mv.xml +++ b/tests/performance/parallel_mv.xml @@ -11,13 +11,13 @@ create table mt_4 (n UInt64, s String) engine = MergeTree order by tuple() create materialized view mv_1 to mt_1 as - select number, toString(number) from main_table where number % 13 != 0 + select number as n, toString(number) as s from main_table where number % 13 != 0 create materialized view mv_2 to mt_2 as - select number, toString(number) from main_table where number % 13 != 1 + select number as n, toString(number) as s from main_table where number % 13 != 1 create materialized view mv_3 to mt_3 as - select number, toString(number) from main_table where number % 13 != 3 + select number as n, toString(number) as s from main_table where number % 13 != 3 create materialized view mv_4 to mt_4 as - select number, toString(number) from main_table where number % 13 != 4 + select number as n, toString(number) as s from main_table where number % 13 != 4 SYSTEM STOP MERGES main_table SYSTEM STOP MERGES mt_1 diff --git a/tests/queries/0_stateless/00982_low_cardinality_setting_in_mv.sql b/tests/queries/0_stateless/00982_low_cardinality_setting_in_mv.sql index 7192642bcde..e545dec90b7 100644 --- a/tests/queries/0_stateless/00982_low_cardinality_setting_in_mv.sql +++ b/tests/queries/0_stateless/00982_low_cardinality_setting_in_mv.sql @@ -4,7 +4,7 @@ DROP TABLE IF EXISTS mat_view; CREATE TABLE test1 (a LowCardinality(String)) ENGINE=MergeTree() ORDER BY a; CREATE TABLE test2 (a UInt64) engine=MergeTree() ORDER BY a; -CREATE MATERIALIZED VIEW test_mv TO test2 AS SELECT toUInt64(a = 'test') FROM test1; +CREATE MATERIALIZED VIEW test_mv TO test2 AS SELECT toUInt64(a = 'test') AS a FROM test1; DROP TABLE test_mv; DROP TABLE test1; diff --git a/tests/queries/0_stateless/01160_table_dependencies.sh b/tests/queries/0_stateless/01160_table_dependencies.sh index acb6522e9e2..b72acf62610 100755 --- a/tests/queries/0_stateless/01160_table_dependencies.sh +++ b/tests/queries/0_stateless/01160_table_dependencies.sh @@ -35,7 +35,7 @@ arraySort(loading_dependencies_table), arraySort(loading_dependent_table) from s $CLICKHOUSE_CLIENT -q "select '====='" $CLICKHOUSE_CLIENT -q "alter table t add column x int default in(1, $CLICKHOUSE_DATABASE.s), drop column y" -$CLICKHOUSE_CLIENT -q "create materialized view mv to s as select n from t where n in (select n from join)" +$CLICKHOUSE_CLIENT -q "create materialized view mv to s as select n as x from t where n in (select n from join)" $CLICKHOUSE_CLIENT -q "select table, arraySort(dependencies_table), arraySort(loading_dependencies_table), arraySort(loading_dependent_table) from system.tables where database=currentDatabase() order by table" diff --git a/tests/queries/0_stateless/02345_implicit_transaction.sql b/tests/queries/0_stateless/02345_implicit_transaction.sql index ee2e0a07c3e..9496de71e13 100644 --- a/tests/queries/0_stateless/02345_implicit_transaction.sql +++ b/tests/queries/0_stateless/02345_implicit_transaction.sql @@ -3,7 +3,7 @@ CREATE TABLE landing (n Int64) engine=MergeTree order by n; CREATE TABLE target (n Int64) engine=MergeTree order by n; CREATE MATERIALIZED VIEW landing_to_target TO target AS - SELECT n + throwIf(n == 3333) + SELECT n + throwIf(n == 3333) AS n FROM landing; INSERT INTO landing SELECT * FROM numbers(10000); -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO } diff --git a/tests/queries/0_stateless/02932_refreshable_materialized_views_1.reference b/tests/queries/0_stateless/02932_refreshable_materialized_views_1.reference new file mode 100644 index 00000000000..bfc6add90a7 --- /dev/null +++ b/tests/queries/0_stateless/02932_refreshable_materialized_views_1.reference @@ -0,0 +1,32 @@ +<1: created view> a [] 1 +CREATE MATERIALIZED VIEW default.a\nREFRESH AFTER 2 SECOND\n(\n `x` UInt64\n)\nENGINE = Memory\nAS SELECT number AS x\nFROM numbers(2)\nUNION ALL\nSELECT rand64() AS x +<2: refreshed> 3 1 1 +<3: time difference at least> 1000 +<4: next refresh in> 2 +<4.1: fake clock> Scheduled 2050-01-01 00:00:01 2050-01-01 00:00:03 +<4.5: altered> Scheduled Finished 2052-01-01 00:00:00 +CREATE MATERIALIZED VIEW default.a\nREFRESH EVERY 2 YEAR\n(\n `x` UInt64\n)\nENGINE = Memory\nAS SELECT x * 2 AS x\nFROM default.src +<5: no refresh> 3 +<6: refreshed> 2 +<7: refreshed> Scheduled Finished 2054-01-01 00:00:00 +CREATE MATERIALIZED VIEW default.b\nREFRESH EVERY 2 YEAR DEPENDS ON default.a\n(\n `y` Int32\n)\nENGINE = MergeTree\nORDER BY y\nSETTINGS index_granularity = 8192\nAS SELECT x * 10 AS y\nFROM default.a +<7.5: created dependent> 2052-11-11 11:11:11 +<8: refreshed> 20 +<9: refreshed> a Scheduled Finished 2054-01-01 00:00:00 +<9: refreshed> b Scheduled Finished 2054-01-01 00:00:00 +<9.2: dropping> 0 2 +<9.4: dropped> 0 2 +<10: creating> a Scheduled [] 2054-01-01 00:00:00 +<10: creating> b WaitingForDependencies ['default.a'] 2054-01-01 00:00:00 +<11: chain-refreshed a> 4 +<12: chain-refreshed b> 40 +<13: chain-refreshed> a Scheduled [] Finished 2054-01-01 00:00:01 2056-01-01 00:00:00 1 +<13: chain-refreshed> b Scheduled ['default.a'] Finished 2054-01-24 23:22:21 2056-01-01 00:00:00 1 +<14: waiting for next cycle> a Scheduled [] 2058-01-01 00:00:00 +<14: waiting for next cycle> b WaitingForDependencies ['default.a'] 2060-01-01 00:00:00 +<15: chain-refreshed a> 6 +<16: chain-refreshed b> 60 +<17: chain-refreshed> a Scheduled 2062-01-01 00:00:00 +<17: chain-refreshed> b Scheduled 2062-01-01 00:00:00 +<18: removed dependency> b Scheduled [] 2062-03-03 03:03:03 2064-01-01 00:00:00 5 +CREATE MATERIALIZED VIEW default.b\nREFRESH EVERY 2 YEAR\n(\n `y` Int32\n)\nENGINE = MergeTree\nORDER BY y\nSETTINGS index_granularity = 8192\nAS SELECT x * 10 AS y\nFROM default.a diff --git a/tests/queries/0_stateless/02932_refreshable_materialized_views_1.sh b/tests/queries/0_stateless/02932_refreshable_materialized_views_1.sh new file mode 100755 index 00000000000..2b92a113e91 --- /dev/null +++ b/tests/queries/0_stateless/02932_refreshable_materialized_views_1.sh @@ -0,0 +1,177 @@ +#!/usr/bin/env bash +# Tags: atomic-database + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CUR_DIR"/../shell_config.sh + +# Set session timezone to UTC to make all DateTime formatting and parsing use UTC, because refresh +# scheduling is done in UTC. +CLICKHOUSE_CLIENT="`echo "$CLICKHOUSE_CLIENT" | sed 's/--session_timezone[= ][^ ]*//g'`" +CLICKHOUSE_CLIENT="`echo "$CLICKHOUSE_CLIENT --allow_experimental_refreshable_materialized_view=1 --session_timezone Etc/UTC"`" + +$CLICKHOUSE_CLIENT -nq "create view refreshes as select * from system.view_refreshes where database = '$CLICKHOUSE_DATABASE' order by view" + + +# Basic refreshing. +$CLICKHOUSE_CLIENT -nq " + create materialized view a + refresh after 2 second + engine Memory + empty + as select number as x from numbers(2) union all select rand64() as x; + select '<1: created view>', view, remaining_dependencies, exception, last_refresh_result in ('Unknown', 'Finished') from refreshes; + show create a;" +# Wait for any refresh. (xargs trims the string and turns \t and \n into spaces) +while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes -- $LINENO" | xargs`" == 'Unknown' ] +do + sleep 0.5 +done +start_time="`$CLICKHOUSE_CLIENT -nq "select reinterpret(now64(), 'Int64')"`" +# Check table contents. +$CLICKHOUSE_CLIENT -nq "select '<2: refreshed>', count(), sum(x=0), sum(x=1) from a" +# Wait for table contents to change. +res1="`$CLICKHOUSE_CLIENT -nq 'select * from a order by x format Values'`" +while : +do + res2="`$CLICKHOUSE_CLIENT -nq 'select * from a order by x format Values -- $LINENO'`" + [ "$res2" == "$res1" ] || break + sleep 0.5 +done +# Wait for another change. +while : +do + res3="`$CLICKHOUSE_CLIENT -nq 'select * from a order by x format Values -- $LINENO'`" + [ "$res3" == "$res2" ] || break + sleep 0.5 +done +# Check that the two changes were at least 1 second apart, in particular that we're not refreshing +# like crazy. This is potentially flaky, but we need at least one test that uses non-mocked timer +# to make sure the clock+timer code works at all. If it turns out flaky, increase refresh period above. +$CLICKHOUSE_CLIENT -nq " + select '<3: time difference at least>', min2(reinterpret(now64(), 'Int64') - $start_time, 1000); + select '<4: next refresh in>', next_refresh_time-last_refresh_time from refreshes;" + +# Create a source table from which views will read. +$CLICKHOUSE_CLIENT -nq " + create table src (x Int8) engine Memory as select 1;" + +# Switch to fake clock, change refresh schedule, change query. +$CLICKHOUSE_CLIENT -nq " + system test view a set fake time '2050-01-01 00:00:01'; + system wait view a; + system refresh view a; + system wait view a; + select '<4.1: fake clock>', status, last_refresh_time, next_refresh_time from refreshes; + alter table a modify refresh every 2 year; + alter table a modify query select x*2 as x from src; + select '<4.5: altered>', status, last_refresh_result, next_refresh_time from refreshes; + show create a;" +# Advance time to trigger the refresh. +$CLICKHOUSE_CLIENT -nq " + select '<5: no refresh>', count() from a; + system test view a set fake time '2052-02-03 04:05:06';" +while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_time from refreshes -- $LINENO" | xargs`" != '2052-02-03 04:05:06' ] +do + sleep 0.5 +done +$CLICKHOUSE_CLIENT -nq " + select '<6: refreshed>', * from a; + select '<7: refreshed>', status, last_refresh_result, next_refresh_time from refreshes;" + +# Create a dependent view, refresh it once. +$CLICKHOUSE_CLIENT -nq " + create materialized view b refresh every 2 year depends on a (y Int32) engine MergeTree order by y empty as select x*10 as y from a; + show create b; + system test view b set fake time '2052-11-11 11:11:11'; + system refresh view b; + system wait view b; + select '<7.5: created dependent>', last_refresh_time from refreshes where view = 'b';" +# Next refresh shouldn't start until the dependency refreshes. +$CLICKHOUSE_CLIENT -nq " + select '<8: refreshed>', * from b; + select '<9: refreshed>', view, status, last_refresh_result, next_refresh_time from refreshes; + system test view b set fake time '2054-01-24 23:22:21';" +while [ "`$CLICKHOUSE_CLIENT -nq "select status, next_refresh_time from refreshes where view = 'b' -- $LINENO" | xargs`" != 'WaitingForDependencies 2054-01-01 00:00:00' ] +do + sleep 0.5 +done + +# Drop the source table, check that refresh fails and doesn't leave a temp table behind. +$CLICKHOUSE_CLIENT -nq " + select '<9.2: dropping>', countIf(name like '%tmp%'), countIf(name like '%.inner%') from system.tables where database = currentDatabase(); + drop table src; + system refresh view a;" +$CLICKHOUSE_CLIENT -nq "system wait view a;" 2>/dev/null && echo "SYSTEM WAIT VIEW failed to fail at $LINENO" +$CLICKHOUSE_CLIENT -nq " + select '<9.4: dropped>', countIf(name like '%tmp%'), countIf(name like '%.inner%') from system.tables where database = currentDatabase();" + +# Create the source table again, check that refresh succeeds (in particular that tables are looked +# up by name rather than uuid). +$CLICKHOUSE_CLIENT -nq " + select '<10: creating>', view, status, remaining_dependencies, next_refresh_time from refreshes; + create table src (x Int16) engine Memory as select 2; + system test view a set fake time '2054-01-01 00:00:01';" +while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes where view = 'b' -- $LINENO" | xargs`" != 'Scheduled' ] +do + sleep 0.5 +done +# Both tables should've refreshed. +$CLICKHOUSE_CLIENT -nq " + select '<11: chain-refreshed a>', * from a; + select '<12: chain-refreshed b>', * from b; + select '<13: chain-refreshed>', view, status, remaining_dependencies, last_refresh_result, last_refresh_time, next_refresh_time, exception == '' from refreshes;" + +# Make the dependent table run ahead by one refresh cycle, make sure it waits for the dependency to +# catch up to the same cycle. +$CLICKHOUSE_CLIENT -nq " + system test view b set fake time '2059-01-01 00:00:00'; + system refresh view b;" +while [ "`$CLICKHOUSE_CLIENT -nq "select next_refresh_time from refreshes where view = 'b' -- $LINENO" | xargs`" != '2060-01-01 00:00:00' ] +do + sleep 0.5 +done +$CLICKHOUSE_CLIENT -nq " + system test view b set fake time '2061-01-01 00:00:00'; + system test view a set fake time '2057-01-01 00:00:00';" +while [ "`$CLICKHOUSE_CLIENT -nq "select status, next_refresh_time from refreshes -- $LINENO" | xargs`" != 'Scheduled 2058-01-01 00:00:00 WaitingForDependencies 2060-01-01 00:00:00' ] +do + sleep 0.5 +done + +$CLICKHOUSE_CLIENT -nq " + select '<14: waiting for next cycle>', view, status, remaining_dependencies, next_refresh_time from refreshes; + truncate src; + insert into src values (3); + system test view a set fake time '2060-02-02 02:02:02';" +while [ "`$CLICKHOUSE_CLIENT -nq "select next_refresh_time from refreshes where view = 'b' -- $LINENO" | xargs`" != '2062-01-01 00:00:00' ] +do + sleep 0.5 +done +$CLICKHOUSE_CLIENT -nq " + select '<15: chain-refreshed a>', * from a; + select '<16: chain-refreshed b>', * from b; + select '<17: chain-refreshed>', view, status, next_refresh_time from refreshes;" + +# Get to WaitingForDependencies state and remove the depencency. +$CLICKHOUSE_CLIENT -nq " + system test view b set fake time '2062-03-03 03:03:03'" +while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes where view = 'b' -- $LINENO" | xargs`" != 'WaitingForDependencies' ] +do + sleep 0.5 +done +$CLICKHOUSE_CLIENT -nq " + alter table b modify refresh every 2 year" +while [ "`$CLICKHOUSE_CLIENT -nq "select status, last_refresh_time from refreshes where view = 'b' -- $LINENO" | xargs`" != 'Scheduled 2062-03-03 03:03:03' ] +do + sleep 0.5 +done +$CLICKHOUSE_CLIENT -nq " + select '<18: removed dependency>', view, status, remaining_dependencies, last_refresh_time,next_refresh_time, refresh_count from refreshes where view = 'b'; + show create b;" + +$CLICKHOUSE_CLIENT -nq " + drop table src; + drop table a; + drop table b; + drop table refreshes;" diff --git a/tests/queries/0_stateless/02932_refreshable_materialized_views_2.reference b/tests/queries/0_stateless/02932_refreshable_materialized_views_2.reference new file mode 100644 index 00000000000..cdaad32de0a --- /dev/null +++ b/tests/queries/0_stateless/02932_refreshable_materialized_views_2.reference @@ -0,0 +1,30 @@ +<19: exception> 1 +<20: unexception> 1 +<21: rename> 1 +<22: rename> d Finished +<23: simple refresh> 1 +<24: rename during refresh> 1 +<25: rename during refresh> f Running +<27: cancelled> f Scheduled Cancelled +<28: drop during refresh> 0 0 +CREATE MATERIALIZED VIEW default.g\nREFRESH EVERY 1 WEEK OFFSET 3 DAY 4 HOUR RANDOMIZE FOR 4 DAY 1 HOUR\n(\n `x` Int64\n)\nENGINE = Memory\nAS SELECT 42 AS x +<29: randomize> 1 1 +CREATE MATERIALIZED VIEW default.h\nREFRESH EVERY 1 SECOND TO default.dest\n(\n `x` Int64\n)\nAS SELECT x * 10 AS x\nFROM default.src +<30: to existing table> 10 +<31: to existing table> 10 +<31: to existing table> 20 +<31.5: will retry> Error 1 +<31.6: did retry> 10 +<32: empty> i Scheduled Unknown 0 +<32: empty> j Scheduled Finished 0 +<34: append> 10 +<35: append> 10 +<35: append> 20 +<35: append> 30 +<36: not append> 20 +<36: not append> 30 +<37: append chain> 100 +<38: append chain> 100 +<38: append chain> 100 +<38: append chain> 200 +creating MergeTree without ORDER BY failed, as expected diff --git a/tests/queries/0_stateless/02932_refreshable_materialized_views_2.sh b/tests/queries/0_stateless/02932_refreshable_materialized_views_2.sh new file mode 100755 index 00000000000..50a905576d5 --- /dev/null +++ b/tests/queries/0_stateless/02932_refreshable_materialized_views_2.sh @@ -0,0 +1,222 @@ +#!/usr/bin/env bash +# Tags: atomic-database + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# reset --log_comment +CLICKHOUSE_LOG_COMMENT= +# shellcheck source=../shell_config.sh +. "$CUR_DIR"/../shell_config.sh + +# Set session timezone to UTC to make all DateTime formatting and parsing use UTC, because refresh +# scheduling is done in UTC. +CLICKHOUSE_CLIENT="`echo "$CLICKHOUSE_CLIENT" | sed 's/--session_timezone[= ][^ ]*//g'`" +CLICKHOUSE_CLIENT="`echo "$CLICKHOUSE_CLIENT --allow_experimental_refreshable_materialized_view=1 --allow_materialized_view_with_bad_select=0 --session_timezone Etc/UTC"`" + +$CLICKHOUSE_CLIENT -nq "create view refreshes as select * from system.view_refreshes where database = '$CLICKHOUSE_DATABASE' order by view" + + +# Select from a table that doesn't exist, get an exception. +$CLICKHOUSE_CLIENT -nq " + create table src (x Int8) engine Memory as select 1; + create materialized view c refresh every 1 second (x Int64) engine Memory empty as select * from src; + drop table src;" +while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes where view = 'c' -- $LINENO" | xargs`" != 'Error' ] +do + sleep 0.5 +done +# Check exception, create src, expect successful refresh. +$CLICKHOUSE_CLIENT -nq " + select '<19: exception>', exception ilike '%UNKNOWN_TABLE%' ? '1' : exception from refreshes where view = 'c'; + create table src (x Int64) engine Memory as select 1; + system refresh view c;" +while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes -- $LINENO" | xargs`" != 'Finished' ] +do + sleep 0.5 +done +# Rename table. +$CLICKHOUSE_CLIENT -nq " + select '<20: unexception>', * from c; + rename table c to d; + select '<21: rename>', * from d; + select '<22: rename>', view, last_refresh_result from refreshes;" + +# Do various things during a refresh. +# First make a nonempty view. +$CLICKHOUSE_CLIENT -nq " + drop table d; + truncate src; + insert into src values (1); + create materialized view e refresh every 1 second (x Int64) engine MergeTree order by x empty as select x + sleepEachRow(1) as x from src settings max_block_size = 1;" +while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes -- $LINENO" | xargs`" != 'Finished' ] +do + sleep 0.5 +done +# Stop refreshes. +$CLICKHOUSE_CLIENT -nq " + select '<23: simple refresh>', * from e; + system stop view e;" +while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes -- $LINENO" | xargs`" != 'Disabled' ] +do + sleep 0.5 +done +# Make refreshes slow, wait for a slow refresh to start. (We stopped refreshes first to make sure +# we wait for a slow refresh, not a previous fast one.) +$CLICKHOUSE_CLIENT -nq " + insert into src select * from numbers(1000) settings max_block_size=1; + system start view e;" +while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes -- $LINENO" | xargs`" != 'Running' ] +do + sleep 0.5 +done +# Rename. +$CLICKHOUSE_CLIENT -nq " + rename table e to f; + select '<24: rename during refresh>', * from f; + select '<25: rename during refresh>', view, status from refreshes where view = 'f'; + alter table f modify refresh after 10 year;" + +# Cancel. +$CLICKHOUSE_CLIENT -nq " + system cancel view f;" +while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes where view = 'f' -- $LINENO" | xargs`" != 'Scheduled' ] +do + sleep 0.5 +done +# Check that another refresh doesn't immediately start after the cancelled one. +$CLICKHOUSE_CLIENT -nq " + select '<27: cancelled>', view, status, last_refresh_result from refreshes where view = 'f'; + system refresh view f;" +while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes where view = 'f' -- $LINENO" | xargs`" != 'Running' ] +do + sleep 0.5 +done +# Drop. +$CLICKHOUSE_CLIENT -nq " + drop table f; + select '<28: drop during refresh>', view, status from refreshes; + select '<28: drop during refresh>', countIf(name like '%tmp%'), countIf(name like '%.inner%') from system.tables where database = currentDatabase()" + +# Try OFFSET and RANDOMIZE FOR. +$CLICKHOUSE_CLIENT -nq " + create materialized view g refresh every 1 week offset 3 day 4 hour randomize for 4 day 1 hour (x Int64) engine Memory empty as select 42 as x; + show create g; + system test view g set fake time '2050-02-03 15:30:13';" +while [ "`$CLICKHOUSE_CLIENT -nq "select next_refresh_time > '2049-01-01' from refreshes -- $LINENO" | xargs`" != '1' ] +do + sleep 0.5 +done +$CLICKHOUSE_CLIENT -nq " + with '2050-02-10 04:00:00'::DateTime as expected + select '<29: randomize>', abs(next_refresh_time::Int64 - expected::Int64) <= 3600*(24*4+1), next_refresh_time != expected from refreshes;" + +# Send data 'TO' an existing table. +$CLICKHOUSE_CLIENT -nq " + drop table g; + create table dest (x Int64) engine MergeTree order by x; + truncate src; + insert into src values (1); + create materialized view h refresh every 1 second to dest empty as select x*10 as x from src; + show create h;" +while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes -- $LINENO" | xargs`" != 'Finished' ] +do + sleep 0.5 +done +$CLICKHOUSE_CLIENT -nq " + select '<30: to existing table>', * from dest; + insert into src values (2);" +while [ "`$CLICKHOUSE_CLIENT -nq "select count() from dest -- $LINENO" | xargs`" != '2' ] +do + sleep 0.5 +done +$CLICKHOUSE_CLIENT -nq " + select '<31: to existing table>', * from dest; + drop table dest; + drop table h;" + +# Retries. +$CLICKHOUSE_CLIENT -nq " + create materialized view h2 refresh after 1 year settings refresh_retries = 10 (x Int64) engine Memory as select x*10 + throwIf(x % 2 == 0) as x from src;" +$CLICKHOUSE_CLIENT -nq "system wait view h2;" 2>/dev/null && echo "SYSTEM WAIT VIEW failed to fail at $LINENO" +$CLICKHOUSE_CLIENT -nq " + select '<31.5: will retry>', last_refresh_result, retry > 0 from refreshes; + create table src2 (x Int8) engine Memory; + insert into src2 values (1); + exchange tables src and src2; + drop table src2;" +while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result, retry from refreshes -- $LINENO" | xargs`" != 'Finished 0' ] +do + sleep 0.5 +done +$CLICKHOUSE_CLIENT -nq " + select '<31.6: did retry>', x from h2; + drop table h2" + +# EMPTY +$CLICKHOUSE_CLIENT -nq " + create materialized view i refresh after 1 year engine Memory empty as select number as x from numbers(2); + create materialized view j refresh after 1 year engine Memory as select number as x from numbers(2);" +while [ "`$CLICKHOUSE_CLIENT -nq "select sum(last_success_time is null) from refreshes -- $LINENO" | xargs`" == '2' ] +do + sleep 0.5 +done +$CLICKHOUSE_CLIENT -nq " + select '<32: empty>', view, status, last_refresh_result, retry from refreshes order by view; + drop table i; + drop table j;" + +# APPEND +$CLICKHOUSE_CLIENT -nq " + create materialized view k refresh every 10 year append (x Int64) engine Memory empty as select x*10 as x from src; + select '<33: append>', * from k; + system refresh view k; + system wait view k; + select '<34: append>', * from k; + truncate table src; + insert into src values (2), (3); + system refresh view k; + system wait view k; + select '<35: append>', * from k order by x;" +# ALTER to non-APPEND +$CLICKHOUSE_CLIENT -nq " + alter table k modify refresh every 10 year; + system wait view k; + system refresh view k; + system wait view k; + select '<36: not append>', * from k order by x; + drop table k; + truncate table src;" + +# APPEND + TO + regular materialized view reading from it. +$CLICKHOUSE_CLIENT -nq " + create table mid (x Int64) engine MergeTree order by x; + create materialized view l refresh every 10 year append to mid empty as select x*10 as x from src; + create materialized view m (x Int64) engine Memory as select x*10 as x from mid; + insert into src values (1); + system refresh view l; + system wait view l; + select '<37: append chain>', * from m; + insert into src values (2); + system refresh view l; + system wait view l; + select '<38: append chain>', * from m order by x; + drop table l; + drop table m; + drop table mid;" + +# Failing to create inner table. +$CLICKHOUSE_CLIENT -nq " + create materialized view n refresh every 1 second (x Int64) engine MergeTree as select 1 as x from numbers(2);" 2>/dev/null || echo "creating MergeTree without ORDER BY failed, as expected" +$CLICKHOUSE_CLIENT -nq " + create materialized view n refresh every 1 second (x Int64) engine MergeTree order by x as select 1 as x from numbers(2); + drop table n;" + +# Reading from table that doesn't exist yet. +$CLICKHOUSE_CLIENT -nq " + create materialized view o refresh every 1 second (x Int64) engine Memory as select x from nonexist; -- { serverError UNKNOWN_TABLE } + create materialized view o (x Int64) engine Memory as select x from nonexist; -- { serverError UNKNOWN_TABLE } + create materialized view o (x Int64) engine Memory as select x from nope.nonexist; -- { serverError UNKNOWN_DATABASE } + create materialized view o refresh every 1 second (x Int64) engine Memory as select x from nope.nonexist settings allow_materialized_view_with_bad_select = 1; + drop table o;" + +$CLICKHOUSE_CLIENT -nq " + drop table refreshes;" diff --git a/tests/queries/0_stateless/02933_replicated_database_forbid_create_as_select.sh b/tests/queries/0_stateless/02933_replicated_database_forbid_create_as_select.sh index b587549cb60..2b78746ae2c 100755 --- a/tests/queries/0_stateless/02933_replicated_database_forbid_create_as_select.sh +++ b/tests/queries/0_stateless/02933_replicated_database_forbid_create_as_select.sh @@ -12,15 +12,15 @@ ${CLICKHOUSE_CLIENT} --query "CREATE DATABASE ${CLICKHOUSE_DATABASE}_db engine = # Non-replicated engines are allowed ${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE TABLE ${CLICKHOUSE_DATABASE}_db.test (id UInt64) ENGINE = MergeTree() ORDER BY id AS SELECT 1" -${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE MATERIALIZED VIEW ${CLICKHOUSE_DATABASE}_db.test_mv (id UInt64) ENGINE = MergeTree() ORDER BY id POPULATE AS SELECT 1" +${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE MATERIALIZED VIEW ${CLICKHOUSE_DATABASE}_db.test_mv (id UInt64) ENGINE = MergeTree() ORDER BY id POPULATE AS SELECT 1 AS id" # Replicated storafes are forbidden ${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE TABLE ${CLICKHOUSE_DATABASE}_db.test2 (id UInt64) ENGINE = ReplicatedMergeTree ORDER BY id AS SELECT 1" |& grep -cm1 "SUPPORT_IS_DISABLED" -${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE MATERIALIZED VIEW ${CLICKHOUSE_DATABASE}_db.test_mv2 (id UInt64) ENGINE = ReplicatedMergeTree ORDER BY id POPULATE AS SELECT 1" |& grep -cm1 "SUPPORT_IS_DISABLED" +${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE MATERIALIZED VIEW ${CLICKHOUSE_DATABASE}_db.test_mv2 (id UInt64) ENGINE = ReplicatedMergeTree ORDER BY id POPULATE AS SELECT 1 AS id" |& grep -cm1 "SUPPORT_IS_DISABLED" # POPULATE is allowed with the special setting -${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE MATERIALIZED VIEW ${CLICKHOUSE_DATABASE}_db.test_mv2 (id UInt64) ENGINE = ReplicatedMergeTree ORDER BY id POPULATE AS SELECT 1" --database_replicated_allow_heavy_create=1 -${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE MATERIALIZED VIEW ${CLICKHOUSE_DATABASE}_db.test_mv3 (id UInt64) ENGINE = ReplicatedMergeTree ORDER BY id POPULATE AS SELECT 1" --compatibility='24.6' +${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE MATERIALIZED VIEW ${CLICKHOUSE_DATABASE}_db.test_mv2 (id UInt64) ENGINE = ReplicatedMergeTree ORDER BY id POPULATE AS SELECT 1 AS id" --database_replicated_allow_heavy_create=1 +${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE MATERIALIZED VIEW ${CLICKHOUSE_DATABASE}_db.test_mv3 (id UInt64) ENGINE = ReplicatedMergeTree ORDER BY id POPULATE AS SELECT 1 AS id" --compatibility='24.6' # AS SELECT is forbidden even with the setting ${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE TABLE ${CLICKHOUSE_DATABASE}_db.test2 (id UInt64) ENGINE = ReplicatedMergeTree ORDER BY id AS SELECT 1" --database_replicated_allow_heavy_create=1 |& grep -cm1 "SUPPORT_IS_DISABLED" diff --git a/utils/check-style/aspell-ignore/en/aspell-dict.txt b/utils/check-style/aspell-ignore/en/aspell-dict.txt index fd836d93143..4a8ef449e72 100644 --- a/utils/check-style/aspell-ignore/en/aspell-dict.txt +++ b/utils/check-style/aspell-ignore/en/aspell-dict.txt @@ -2127,6 +2127,7 @@ namespace namespaces natively nats +ness nestjs netloc newjson