Merge pull request #58934 from ClickHouse/mv4

Refreshable materialized views improvements
This commit is contained in:
Michael Kolupaev 2024-08-16 17:16:28 +00:00 committed by GitHub
commit cb3468f6d1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
36 changed files with 887 additions and 212 deletions

View File

@ -17,7 +17,8 @@ Columns:
- `duration_ms` ([UInt64](../../sql-reference/data-types/int-uint.md)) — How long the last refresh attempt took.
- `next_refresh_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — Time at which the next refresh is scheduled to start.
- `remaining_dependencies` ([Array(String)](../../sql-reference/data-types/array.md)) — If the view has [refresh dependencies](../../sql-reference/statements/create/view.md#refresh-dependencies), this array contains the subset of those dependencies that are not satisfied for the current refresh yet. If `status = 'WaitingForDependencies'`, a refresh is ready to start as soon as these dependencies are fulfilled.
- `exception` ([String](../../sql-reference/data-types/string.md)) — if `last_refresh_result = 'Exception'`, i.e. the last refresh attempt failed, this column contains the corresponding error message and stack trace.
- `exception` ([String](../../sql-reference/data-types/string.md)) — if `last_refresh_result = 'Error'`, i.e. the last refresh attempt failed, this column contains the corresponding error message and stack trace.
- `retry` ([UInt64](../../sql-reference/data-types/int-uint.md)) — If nonzero, the current or next refresh is a retry (see `refresh_retries` refresh setting), and `retry` is the 1-based index of that retry.
- `refresh_count` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of successful refreshes since last server restart or table creation.
- `progress` ([Float64](../../sql-reference/data-types/float.md)) — Progress of the current refresh, between 0 and 1.
- `read_rows` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of rows read by the current refresh so far.

View File

@ -13,8 +13,8 @@ Creates a new view. Views can be [normal](#normal-view), [materialized](#materia
Syntax:
``` sql
CREATE [OR REPLACE] VIEW [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster_name]
[DEFINER = { user | CURRENT_USER }] [SQL SECURITY { DEFINER | INVOKER | NONE }]
CREATE [OR REPLACE] VIEW [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster_name]
[DEFINER = { user | CURRENT_USER }] [SQL SECURITY { DEFINER | INVOKER | NONE }]
AS SELECT ...
[COMMENT 'comment']
```
@ -55,8 +55,8 @@ SELECT * FROM view(column1=value1, column2=value2 ...)
## Materialized View
``` sql
CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db.]table_name [ON CLUSTER] [TO[db.]name] [ENGINE = engine] [POPULATE]
[DEFINER = { user | CURRENT_USER }] [SQL SECURITY { DEFINER | INVOKER | NONE }]
CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db.]table_name [ON CLUSTER] [TO[db.]name] [ENGINE = engine] [POPULATE]
[DEFINER = { user | CURRENT_USER }] [SQL SECURITY { DEFINER | INVOKER | NONE }]
AS SELECT ...
[COMMENT 'comment']
```
@ -92,7 +92,7 @@ Given that `POPULATE` works like `CREATE TABLE ... AS SELECT ...` it has limitat
- It is not supported with Replicated database
- It is not supported in ClickHouse cloud
Instead a separate `INSERT ... SELECT` can be used.
Instead a separate `INSERT ... SELECT` can be used.
:::
A `SELECT` query can contain `DISTINCT`, `GROUP BY`, `ORDER BY`, `LIMIT`. Note that the corresponding conversions are performed independently on each block of inserted data. For example, if `GROUP BY` is set, data is aggregated during insertion, but only within a single packet of inserted data. The data wont be further aggregated. The exception is when using an `ENGINE` that independently performs data aggregation, such as `SummingMergeTree`.
@ -110,7 +110,7 @@ To delete a view, use [DROP VIEW](../../../sql-reference/statements/drop.md#drop
`DEFINER` and `SQL SECURITY` allow you to specify which ClickHouse user to use when executing the view's underlying query.
`SQL SECURITY` has three legal values: `DEFINER`, `INVOKER`, or `NONE`. You can specify any existing user or `CURRENT_USER` in the `DEFINER` clause.
The following table will explain which rights are required for which user in order to select from view.
The following table will explain which rights are required for which user in order to select from view.
Note that regardless of the SQL security option, in every case it is still required to have `GRANT SELECT ON <view>` in order to read from it.
| SQL security option | View | Materialized View |
@ -130,7 +130,7 @@ If `DEFINER`/`SQL SECURITY` aren't specified, the default values are used:
If a view is attached without `DEFINER`/`SQL SECURITY` specified, the default value is `SQL SECURITY NONE` for the materialized view and `SQL SECURITY INVOKER` for the normal view.
To change SQL security for an existing view, use
To change SQL security for an existing view, use
```sql
ALTER TABLE MODIFY SQL SECURITY { DEFINER | INVOKER | NONE } [DEFINER = { user | CURRENT_USER }]
```
@ -161,6 +161,8 @@ CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db.]table_name
REFRESH EVERY|AFTER interval [OFFSET interval]
RANDOMIZE FOR interval
DEPENDS ON [db.]name [, [db.]name [, ...]]
SETTINGS name = value [, name = value [, ...]]
[APPEND]
[TO[db.]name] [(columns)] [ENGINE = engine] [EMPTY]
AS SELECT ...
[COMMENT 'comment']
@ -170,18 +172,23 @@ where `interval` is a sequence of simple intervals:
number SECOND|MINUTE|HOUR|DAY|WEEK|MONTH|YEAR
```
Periodically runs the corresponding query and stores its result in a table, atomically replacing the table's previous contents.
Periodically runs the corresponding query and stores its result in a table.
* If the query says `APPEND`, each refresh inserts rows into the table without deleting existing rows. The insert is not atomic, just like a regular INSERT SELECT.
* Otherwise each refresh atomically replaces the table's previous contents.
Differences from regular non-refreshable materialized views:
* No insert trigger. I.e. when new data is inserted into the table specified in SELECT, it's *not* automatically pushed to the refreshable materialized view. The periodic refresh runs the entire query and replaces the entire table.
* No insert trigger. I.e. when new data is inserted into the table specified in SELECT, it's *not* automatically pushed to the refreshable materialized view. The periodic refresh runs the entire query.
* No restrictions on the SELECT query. Table functions (e.g. `url()`), views, UNION, JOIN, are all allowed.
:::note
The settings in the `REFRESH ... SETTINGS` part of the query are refresh settings (e.g. `refresh_retries`), distinct from regular settings (e.g. `max_threads`). Regular settings can be specified using `SETTINGS` at the end of the query.
:::
:::note
Refreshable materialized views are a work in progress. Setting `allow_experimental_refreshable_materialized_view = 1` is required for creating one. Current limitations:
* not compatible with Replicated database or table engines
* It is not supported in ClickHouse Cloud
* require [Atomic database engine](../../../engines/database-engines/atomic.md),
* no retries for failed refresh - we just skip to the next scheduled refresh time,
* no limit on number of concurrent refreshes.
:::
@ -246,15 +253,22 @@ A few more examples:
`DEPENDS ON` only works between refreshable materialized views. Listing a regular table in the `DEPENDS ON` list will prevent the view from ever refreshing (dependencies can be removed with `ALTER`, see below).
:::
### Settings
Available refresh settings:
* `refresh_retries` - How many times to retry if refresh query fails with an exception. If all retries fail, skip to the next scheduled refresh time. 0 means no retries, -1 means infinite retries. Default: 0.
* `refresh_retry_initial_backoff_ms` - Delay before the first retry, if `refresh_retries` is not zero. Each subsequent retry doubles the delay, up to `refresh_retry_max_backoff_ms`. Default: 100 ms.
* `refresh_retry_max_backoff_ms` - Limit on the exponential growth of delay between refresh attempts. Default: 60000 ms (1 minute).
### Changing Refresh Parameters {#changing-refresh-parameters}
To change refresh parameters:
```
ALTER TABLE [db.]name MODIFY REFRESH EVERY|AFTER ... [RANDOMIZE FOR ...] [DEPENDS ON ...]
ALTER TABLE [db.]name MODIFY REFRESH EVERY|AFTER ... [RANDOMIZE FOR ...] [DEPENDS ON ...] [SETTINGS ...]
```
:::note
This replaces refresh schedule *and* dependencies. If the table had a `DEPENDS ON`, doing a `MODIFY REFRESH` without `DEPENDS ON` will remove the dependencies.
This replaces *all* refresh parameters at once: schedule, dependencies, settings, and APPEND-ness. E.g. if the table had a `DEPENDS ON`, doing a `MODIFY REFRESH` without `DEPENDS ON` will remove the dependencies.
:::
### Other operations
@ -263,6 +277,10 @@ The status of all refreshable materialized views is available in table [`system.
To manually stop, start, trigger, or cancel refreshes use [`SYSTEM STOP|START|REFRESH|CANCEL VIEW`](../system.md#refreshable-materialized-views).
:::note
Fun fact: the refresh query is allowed to read from the view that's being refreshed, seeing pre-refresh version of the data. This means you can implement Conway's game of life: https://pastila.nl/?00021a4b/d6156ff819c83d490ad2dcec05676865#O0LGWTO7maUQIA4AcGUtlA==
:::
## Window View [Experimental]
:::info

View File

@ -400,7 +400,7 @@ SYSTEM SYNC REPLICA [ON CLUSTER cluster_name] [db.]replicated_merge_tree_family_
After running this statement the `[db.]replicated_merge_tree_family_table_name` fetches commands from the common replicated log into its own replication queue, and then the query waits till the replica processes all of the fetched commands. The following modifiers are supported:
- If a `STRICT` modifier was specified then the query waits for the replication queue to become empty. The `STRICT` version may never succeed if new entries constantly appear in the replication queue.
- If a `LIGHTWEIGHT` modifier was specified then the query waits only for `GET_PART`, `ATTACH_PART`, `DROP_RANGE`, `REPLACE_RANGE` and `DROP_PART` entries to be processed.
- If a `LIGHTWEIGHT` modifier was specified then the query waits only for `GET_PART`, `ATTACH_PART`, `DROP_RANGE`, `REPLACE_RANGE` and `DROP_PART` entries to be processed.
Additionally, the LIGHTWEIGHT modifier supports an optional FROM 'srcReplicas' clause, where 'srcReplicas' is a comma-separated list of source replica names. This extension allows for more targeted synchronization by focusing only on replication tasks originating from the specified source replicas.
- If a `PULL` modifier was specified then the query pulls new replication queue entries from ZooKeeper, but does not wait for anything to be processed.
@ -526,6 +526,10 @@ Trigger an immediate out-of-schedule refresh of a given view.
SYSTEM REFRESH VIEW [db.]name
```
### REFRESH VIEW
Wait for the currently running refresh to complete. If the refresh fails, throws an exception. If no refresh is running, completes immediately, throwing an exception if previous refresh failed.
### STOP VIEW, STOP VIEWS
Disable periodic refreshing of the given view or all refreshable views. If a refresh is in progress, cancel it too.

View File

@ -608,6 +608,7 @@
M(727, UNEXPECTED_TABLE_ENGINE) \
M(728, UNEXPECTED_DATA_TYPE) \
M(729, ILLEGAL_TIME_SERIES_TAGS) \
M(730, REFRESH_FAILED) \
\
M(900, DISTRIBUTED_CACHE_ERROR) \
M(901, CANNOT_USE_DISTRIBUTED_CACHE) \

View File

@ -616,6 +616,7 @@ class IColumn;
M(Bool, throw_if_deduplication_in_dependent_materialized_views_enabled_with_async_insert, true, "Throw exception on INSERT query when the setting `deduplicate_blocks_in_dependent_materialized_views` is enabled along with `async_insert`. It guarantees correctness, because these features can't work together.", 0) \
M(Bool, materialized_views_ignore_errors, false, "Allows to ignore errors for MATERIALIZED VIEW, and deliver original block to the table regardless of MVs", 0) \
M(Bool, ignore_materialized_views_with_dropped_target_table, false, "Ignore MVs with dropped target table during pushing to views", 0) \
M(Bool, allow_materialized_view_with_bad_select, true, "Allow CREATE MATERIALIZED VIEW with SELECT query that references nonexistent tables or columns. It must still be syntactically valid. Doesn't apply to refreshable MVs. Doesn't apply if the MV schema needs to be inferred from the SELECT query (i.e. if the CREATE has no column list and no TO table). Can be used for creating MV before its source table.", 0) \
M(Bool, use_compact_format_in_distributed_parts_names, true, "Changes format of directories names for distributed table insert parts.", 0) \
M(Bool, validate_polygons, true, "Throw exception if polygon is invalid in function pointInPolygon (e.g. self-tangent, self-intersecting). If the setting is false, the function will accept invalid polygons but may silently return wrong result.", 0) \
M(UInt64, max_parser_depth, DBMS_DEFAULT_MAX_PARSER_DEPTH, "Maximum parser depth (recursion depth of recursive descend parser).", 0) \

View File

@ -84,6 +84,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"use_hive_partitioning", false, false, "Allows to use hive partitioning for File, URL, S3, AzureBlobStorage and HDFS engines."},
{"allow_experimental_kafka_offsets_storage_in_keeper", false, false, "Allow the usage of experimental Kafka storage engine that stores the committed offsets in ClickHouse Keeper"},
{"allow_archive_path_syntax", true, true, "Added new setting to allow disabling archive path syntax."},
{"allow_materialized_view_with_bad_select", true, true, "Support (but not enable yet) stricter validation in CREATE MATERIALIZED VIEW"},
{"query_cache_tag", "", "", "New setting for labeling query cache settings."},
{"allow_experimental_time_series_table", false, false, "Added new setting to allow the TimeSeries table engine"},
{"enable_analyzer", 1, 1, "Added an alias to a setting `allow_experimental_analyzer`."},

View File

@ -121,6 +121,7 @@ namespace ErrorCodes
extern const int SUPPORT_IS_DISABLED;
extern const int TOO_MANY_TABLES;
extern const int TOO_MANY_DATABASES;
extern const int THERE_IS_NO_COLUMN;
}
namespace fs = std::filesystem;
@ -847,6 +848,7 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::getTableProperti
}
properties.columns = ColumnsDescription(as_select_sample.getNamesAndTypesList());
properties.columns_inferred_from_select_query = true;
}
else if (create.as_table_function)
{
@ -936,6 +938,105 @@ void validateVirtualColumns(const IStorage & storage)
}
}
void InterpreterCreateQuery::validateMaterializedViewColumnsAndEngine(const ASTCreateQuery & create, const TableProperties & properties, const DatabasePtr & database)
{
/// This is not strict validation, just catches common errors that would make the view not work.
/// It's possible to circumvent these checks by ALTERing the view or target table after creation;
/// we should probably do some of these checks on ALTER as well.
NamesAndTypesList all_output_columns;
bool check_columns = false;
if (create.hasTargetTableID(ViewTarget::To))
{
if (StoragePtr to_table = DatabaseCatalog::instance().tryGetTable(
create.getTargetTableID(ViewTarget::To), getContext()))
{
all_output_columns = to_table->getInMemoryMetadataPtr()->getSampleBlock().getNamesAndTypesList();
check_columns = true;
}
}
else if (!properties.columns_inferred_from_select_query)
{
all_output_columns = properties.columns.getInsertable();
check_columns = true;
}
if (create.refresh_strategy && !create.refresh_strategy->append)
{
if (database && database->getEngineName() != "Atomic")
throw Exception(ErrorCodes::INCORRECT_QUERY,
"Refreshable materialized views (except with APPEND) only support Atomic database engine, but database {} has engine {}", create.getDatabase(), database->getEngineName());
}
Block input_block;
if (check_columns)
{
try
{
if (getContext()->getSettingsRef().allow_experimental_analyzer)
{
input_block = InterpreterSelectQueryAnalyzer::getSampleBlock(create.select->clone(), getContext());
}
else
{
input_block = InterpreterSelectWithUnionQuery(create.select->clone(),
getContext(),
SelectQueryOptions().analyze()).getSampleBlock();
}
}
catch (Exception &)
{
if (!getContext()->getSettingsRef().allow_materialized_view_with_bad_select)
throw;
check_columns = false;
}
}
if (check_columns)
{
std::unordered_map<std::string_view, DataTypePtr> output_types;
for (const NameAndTypePair & nt : all_output_columns)
output_types[nt.name] = nt.type;
ColumnsWithTypeAndName input_columns;
ColumnsWithTypeAndName output_columns;
for (const auto & input_column : input_block)
{
auto it = output_types.find(input_column.name);
if (it != output_types.end())
{
input_columns.push_back(input_column.cloneEmpty());
output_columns.push_back(ColumnWithTypeAndName(it->second->createColumn(), it->second, input_column.name));
}
else if (create.refresh_strategy)
{
/// Unrecognized columns produced by SELECT query are allowed by regular materialized
/// views, but not by refreshable ones. This is in part because it was easier to
/// implement, in part because refreshable views have less concern about ALTERing target
/// tables.
///
/// The motivating scenario for allowing this in regular MV is ALTERing the table+query.
/// Suppose the user removes a column from target table, then a minute later
/// correspondingly updates the view's query to not produce that column.
/// If MV didn't allow unrecognized columns then during that minute all INSERTs into the
/// source table would fail - unacceptable.
/// For refreshable views, during that minute refreshes will fail - acceptable.
throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "SELECT query outputs column with name '{}', which is not found in the target table. Use 'AS' to assign alias that matches a column name.", input_column.name);
}
}
if (input_columns.empty())
throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "None of the columns produced by the SELECT query are present in the target table. Use 'AS' to assign aliases that match column names.");
ActionsDAG::makeConvertingActions(
input_columns,
output_columns,
ActionsDAG::MatchColumnsMode::Position
);
}
}
namespace
{
void checkTemporaryTableEngineName(const String & name)
@ -1132,13 +1233,6 @@ void InterpreterCreateQuery::assertOrSetUUID(ASTCreateQuery & create, const Data
throw Exception(ErrorCodes::LOGICAL_ERROR, "Table UUID is not specified in DDL log");
}
if (create.refresh_strategy && database->getEngineName() != "Atomic")
throw Exception(ErrorCodes::INCORRECT_QUERY,
"Refreshable materialized view requires Atomic database engine, but database {} has engine {}", create.getDatabase(), database->getEngineName());
/// TODO: Support Replicated databases, only with Shared/ReplicatedMergeTree.
/// Figure out how to make the refreshed data appear all at once on other
/// replicas; maybe a replicated SYSTEM SYNC REPLICA query before the rename?
if (database->getUUID() != UUIDHelpers::Nil)
{
if (create.attach && !from_path && create.uuid == UUIDHelpers::Nil)
@ -1359,51 +1453,16 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
/// Set and retrieve list of columns, indices and constraints. Set table engine if needed. Rewrite query in canonical way.
TableProperties properties = getTablePropertiesAndNormalizeCreateQuery(create, mode);
/// Check type compatible for materialized dest table and select columns
if (create.is_materialized_view_with_external_target() && create.select && mode <= LoadingStrictnessLevel::CREATE)
{
if (StoragePtr to_table = DatabaseCatalog::instance().tryGetTable(create.getTargetTableID(ViewTarget::To), getContext()))
{
Block input_block;
if (getContext()->getSettingsRef().allow_experimental_analyzer)
{
input_block = InterpreterSelectQueryAnalyzer::getSampleBlock(create.select->clone(), getContext());
}
else
{
input_block = InterpreterSelectWithUnionQuery(create.select->clone(),
getContext(),
SelectQueryOptions().analyze()).getSampleBlock();
}
Block output_block = to_table->getInMemoryMetadataPtr()->getSampleBlock();
ColumnsWithTypeAndName input_columns;
ColumnsWithTypeAndName output_columns;
for (const auto & input_column : input_block)
{
if (const auto * output_column = output_block.findByName(input_column.name))
{
input_columns.push_back(input_column.cloneEmpty());
output_columns.push_back(output_column->cloneEmpty());
}
}
ActionsDAG::makeConvertingActions(
input_columns,
output_columns,
ActionsDAG::MatchColumnsMode::Position
);
}
}
DatabasePtr database;
bool need_add_to_database = !create.temporary;
// In case of an ON CLUSTER query, the database may not be present on the initiator node
if (need_add_to_database)
database = DatabaseCatalog::instance().tryGetDatabase(database_name);
/// Check type compatible for materialized dest table and select columns
if (create.select && create.is_materialized_view && mode <= LoadingStrictnessLevel::CREATE)
validateMaterializedViewColumnsAndEngine(create, properties, database);
bool allow_heavy_populate = getContext()->getSettingsRef().database_replicated_allow_heavy_create && create.is_populate;
if (!allow_heavy_populate && database && database->getEngineName() == "Replicated" && (create.select || create.is_populate))
{

View File

@ -90,6 +90,7 @@ private:
IndicesDescription indices;
ConstraintsDescription constraints;
ProjectionsDescription projections;
bool columns_inferred_from_select_query = false;
};
BlockIO createDatabase(ASTCreateQuery & create);
@ -98,6 +99,7 @@ private:
/// Calculate list of columns, constraints, indices, etc... of table. Rewrite query in canonical way.
TableProperties getTablePropertiesAndNormalizeCreateQuery(ASTCreateQuery & create, LoadingStrictnessLevel mode) const;
void validateTableStructure(const ASTCreateQuery & create, const TableProperties & properties) const;
void validateMaterializedViewColumnsAndEngine(const ASTCreateQuery & create, const TableProperties & properties, const DatabasePtr & database);
void setEngine(ASTCreateQuery & create) const;
AccessRightsElements getRequiredAccess() const;

View File

@ -666,6 +666,10 @@ BlockIO InterpreterSystemQuery::execute()
for (const auto & task : getRefreshTasks())
task->run();
break;
case Type::WAIT_VIEW:
for (const auto & task : getRefreshTasks())
task->wait();
break;
case Type::CANCEL_VIEW:
for (const auto & task : getRefreshTasks())
task->cancel();
@ -1409,6 +1413,7 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
break;
}
case Type::REFRESH_VIEW:
case Type::WAIT_VIEW:
case Type::START_VIEW:
case Type::START_VIEWS:
case Type::STOP_VIEW:

View File

@ -256,6 +256,8 @@ ASTPtr ASTCreateQuery::clone() const
res->set(res->dictionary, dictionary->clone());
}
if (refresh_strategy)
res->set(res->refresh_strategy, refresh_strategy->clone());
if (as_table_function)
res->set(res->as_table_function, as_table_function->clone());
if (comment)

View File

@ -20,7 +20,6 @@ ASTPtr ASTRefreshStrategy::clone() const
res->set(res->settings, settings->clone());
if (dependencies)
res->set(res->dependencies, dependencies->clone());
res->schedule_kind = schedule_kind;
return res;
}
@ -66,6 +65,8 @@ void ASTRefreshStrategy::formatImpl(
f_settings.ostr << (f_settings.hilite ? hilite_keyword : "") << " SETTINGS " << (f_settings.hilite ? hilite_none : "");
settings->formatImpl(f_settings, state, frame);
}
if (append)
f_settings.ostr << (f_settings.hilite ? hilite_keyword : "") << " APPEND" << (f_settings.hilite ? hilite_none : "");
}
}

View File

@ -24,6 +24,7 @@ public:
ASTTimeInterval * offset = nullptr;
ASTTimeInterval * spread = nullptr;
RefreshScheduleKind schedule_kind{RefreshScheduleKind::UNKNOWN};
bool append = false;
String getID(char) const override { return "Refresh strategy definition"; }

View File

@ -141,6 +141,19 @@ public:
QueryKind getQueryKind() const override { return QueryKind::Rename; }
void addElement(const String & from_db, const String & from_table, const String & to_db, const String & to_table)
{
auto identifier = [&](const String & name) -> ASTPtr
{
if (name.empty())
return nullptr;
ASTPtr ast = std::make_shared<ASTIdentifier>(name);
children.push_back(ast);
return ast;
};
elements.push_back(Element {.from = Table {.database = identifier(from_db), .table = identifier(from_table)}, .to = Table {.database = identifier(to_db), .table = identifier(to_table)}});
}
protected:
void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override
{

View File

@ -376,6 +376,7 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState & s
case Type::START_VIEW:
case Type::STOP_VIEW:
case Type::CANCEL_VIEW:
case Type::WAIT_VIEW:
{
settings.ostr << ' ';
print_database_table();

View File

@ -95,6 +95,7 @@ public:
START_CLEANUP,
RESET_COVERAGE,
REFRESH_VIEW,
WAIT_VIEW,
START_VIEW,
START_VIEWS,
STOP_VIEW,

View File

@ -96,6 +96,10 @@ bool ParserRefreshStrategy::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
return false;
refresh->set(refresh->settings, settings);
}
if (ParserKeyword{Keyword::APPEND}.ignore(pos, expected))
refresh->append = true;
node = refresh;
return true;
}

View File

@ -421,6 +421,7 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
break;
case Type::REFRESH_VIEW:
case Type::WAIT_VIEW:
case Type::START_VIEW:
case Type::STOP_VIEW:
case Type::CANCEL_VIEW:

View File

@ -103,7 +103,7 @@ public:
IStorage(const IStorage &) = delete;
IStorage & operator=(const IStorage &) = delete;
/// The main name of the table type (for example, StorageMergeTree).
/// The main name of the table type (e.g. Memory, MergeTree, CollapsingMergeTree).
virtual std::string getName() const = 0;
/// The name of the table.

View File

@ -10,7 +10,7 @@
namespace DB
{
enum class RefreshState : RefreshTaskStateUnderlying
enum class RefreshState
{
Disabled = 0,
Scheduled,
@ -18,11 +18,11 @@ enum class RefreshState : RefreshTaskStateUnderlying
Running,
};
enum class LastRefreshResult : RefreshTaskStateUnderlying
enum class LastRefreshResult
{
Unknown = 0,
Cancelled,
Exception,
Error,
Finished
};
@ -36,7 +36,8 @@ struct RefreshInfo
UInt64 last_attempt_duration_ms = 0;
UInt32 next_refresh_time = 0;
UInt64 refresh_count = 0;
String exception_message; // if last_refresh_result is Exception
UInt64 retry = 0;
String exception_message; // if last_refresh_result is Error
std::vector<StorageID> remaining_dependencies;
ProgressValues progress;
};

View File

@ -6,8 +6,10 @@ namespace DB
{
#define LIST_OF_REFRESH_SETTINGS(M, ALIAS) \
/// TODO: Add settings
/// M(UInt64, name, 42, "...", 0)
M(Int64, refresh_retries, 0, "How many times to retry refresh query if it fails. If all attempts fail, wait for the next refresh time according to schedule. 0 to disable retries. -1 for infinite retries.", 0) \
M(UInt64, refresh_retry_initial_backoff_ms, 100, "Delay before the first retry if refresh query fails (if refresh_retries setting is not zero). Each subsequent retry doubles the delay, up to refresh_retry_max_backoff_ms.", 0) \
M(UInt64, refresh_retry_max_backoff_ms, 60'000, "Limit on the exponential growth of delay between refresh attempts, if they keep failing and refresh_retries is positive.", 0) \
DECLARE_SETTINGS_TRAITS(RefreshSettingsTraits, LIST_OF_REFRESH_SETTINGS)

View File

@ -1,7 +1,5 @@
#include <Storages/MaterializedView/RefreshTask.h>
#include <Storages/StorageMaterializedView.h>
#include <Common/CurrentMetrics.h>
#include <Core/Settings.h>
#include <Interpreters/Context.h>
@ -11,6 +9,7 @@
#include <Parsers/ASTCreateQuery.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <QueryPipeline/ReadProgressCallback.h>
#include <Storages/StorageMaterializedView.h>
namespace CurrentMetrics
{
@ -24,37 +23,39 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int QUERY_WAS_CANCELLED;
extern const int REFRESH_FAILED;
}
RefreshTask::RefreshTask(
const ASTRefreshStrategy & strategy)
StorageMaterializedView * view_, const DB::ASTRefreshStrategy & strategy)
: log(getLogger("RefreshTask"))
, view(view_)
, refresh_schedule(strategy)
{}
, refresh_append(strategy.append)
{
if (strategy.settings != nullptr)
refresh_settings.applyChanges(strategy.settings->changes);
}
RefreshTaskHolder RefreshTask::create(
OwnedRefreshTask RefreshTask::create(
StorageMaterializedView * view,
ContextMutablePtr context,
const DB::ASTRefreshStrategy & strategy)
{
auto task = std::make_shared<RefreshTask>(strategy);
auto task = std::make_shared<RefreshTask>(view, strategy);
task->refresh_task = context->getSchedulePool().createTask("MaterializedViewRefresherTask",
[self = task->weak_from_this()]
{
if (auto t = self.lock())
t->refreshTask();
});
task->refresh_task = context->getSchedulePool().createTask("RefreshTask",
[self = task.get()] { self->refreshTask(); });
if (strategy.dependencies)
for (auto && dependency : strategy.dependencies->children)
task->initial_dependencies.emplace_back(dependency->as<const ASTTableIdentifier &>());
return task;
return OwnedRefreshTask(task);
}
void RefreshTask::initializeAndStart(std::shared_ptr<StorageMaterializedView> view)
void RefreshTask::initializeAndStart()
{
view_to_refresh = view;
if (view->getContext()->getSettingsRef().stop_refreshable_materialized_views_on_startup)
stop_requested = true;
view->getContext()->getRefreshSet().emplace(view->getStorageID(), initial_dependencies, shared_from_this());
@ -102,7 +103,11 @@ void RefreshTask::alterRefreshParams(const DB::ASTRefreshStrategy & new_strategy
if (arriveDependency(id) && !std::exchange(refresh_immediately, true))
refresh_task->schedule();
/// TODO: Update settings once we have them.
refresh_settings = {};
if (new_strategy.settings != nullptr)
refresh_settings.applyChanges(new_strategy.settings->changes);
refresh_append = new_strategy.append;
}
RefreshInfo RefreshTask::getInfo() const
@ -111,7 +116,7 @@ RefreshInfo RefreshTask::getInfo() const
auto res = info;
res.view_id = set_handle.getID();
res.remaining_dependencies.assign(remaining_dependencies.begin(), remaining_dependencies.end());
if (res.last_refresh_result != LastRefreshResult::Exception)
if (res.last_refresh_result != LastRefreshResult::Error)
res.exception_message.clear();
res.progress = progress.getValues();
return res;
@ -139,6 +144,8 @@ void RefreshTask::run()
std::lock_guard guard(mutex);
if (std::exchange(refresh_immediately, true))
return;
next_refresh_prescribed = std::chrono::floor<std::chrono::seconds>(currentTime());
next_refresh_actual = currentTime();
refresh_task->schedule();
}
@ -149,10 +156,22 @@ void RefreshTask::cancel()
refresh_task->schedule();
}
void RefreshTask::wait()
{
std::unique_lock lock(mutex);
refresh_cv.wait(lock, [&] { return info.state != RefreshState::Running && !refresh_immediately; });
if (info.last_refresh_result == LastRefreshResult::Error)
throw Exception(ErrorCodes::REFRESH_FAILED, "Refresh failed: {}", info.exception_message);
}
void RefreshTask::shutdown()
{
{
std::lock_guard guard(mutex);
if (view == nullptr)
return; // already shut down
stop_requested = true;
interruptExecution();
}
@ -166,6 +185,8 @@ void RefreshTask::shutdown()
/// (Also, RefreshSet holds a shared_ptr to us.)
std::lock_guard guard(mutex);
set_handle.reset();
view = nullptr;
}
void RefreshTask::notify(const StorageID & parent_id, std::chrono::sys_seconds parent_next_prescribed_time)
@ -232,6 +253,7 @@ void RefreshTask::refreshTask()
chassert(lock.owns_lock());
interrupt_execution.store(false);
refresh_cv.notify_all(); // we'll assign info.state before unlocking the mutex
if (stop_requested)
{
@ -243,7 +265,7 @@ void RefreshTask::refreshTask()
if (!refresh_immediately)
{
auto now = currentTime();
if (now >= next_refresh_with_spread)
if (now >= next_refresh_actual)
{
if (arriveTime())
refresh_immediately = true;
@ -256,7 +278,7 @@ void RefreshTask::refreshTask()
else
{
size_t delay_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
next_refresh_with_spread - now).count();
next_refresh_actual - now).count();
/// If we're in a test that fakes the clock, poll every 100ms.
if (fake_clock.load(std::memory_order_relaxed) != INT64_MIN)
@ -270,19 +292,9 @@ void RefreshTask::refreshTask()
/// Perform a refresh.
bool append = refresh_append;
refresh_immediately = false;
auto view = lockView();
if (!view)
{
/// The view was dropped. This RefreshTask should be destroyed soon too.
/// (Maybe this is unreachable.)
info.state = RefreshState::Disabled;
break;
}
info.state = RefreshState::Running;
CurrentMetrics::Increment metric_inc(CurrentMetrics::RefreshingViews);
lock.unlock();
@ -293,19 +305,13 @@ void RefreshTask::refreshTask()
try
{
executeRefreshUnlocked(view);
executeRefreshUnlocked(append);
refreshed = true;
}
catch (...)
{
if (!interrupt_execution.load())
{
PreformattedMessage message = getCurrentExceptionMessageAndPattern(true);
auto text = message.text;
message.text = fmt::format("Refresh view {} failed: {}", view->getStorageID().getFullTableName(), message.text);
LOG_ERROR(log, message);
exception = text;
}
exception = getCurrentExceptionMessage(true);
}
lock.lock();
@ -317,18 +323,18 @@ void RefreshTask::refreshTask()
if (exception)
{
info.last_refresh_result = LastRefreshResult::Exception;
info.last_refresh_result = LastRefreshResult::Error;
info.exception_message = *exception;
/// TODO: Do a few retries with exponential backoff.
advanceNextRefreshTime(now);
Int64 attempt_number = num_retries + 1;
scheduleRetryOrSkipToNextRefresh(now);
LOG_ERROR(log, "Refresh view {} failed (attempt {}/{}): {}", view->getStorageID().getFullTableName(), attempt_number, refresh_settings.refresh_retries + 1, *exception);
}
else if (!refreshed)
{
info.last_refresh_result = LastRefreshResult::Cancelled;
/// Make sure we don't just start another refresh immediately.
if (!stop_requested && now >= next_refresh_with_spread)
if (!stop_requested)
advanceNextRefreshTime(now);
}
else
@ -361,17 +367,18 @@ void RefreshTask::refreshTask()
}
}
void RefreshTask::executeRefreshUnlocked(std::shared_ptr<StorageMaterializedView> view)
void RefreshTask::executeRefreshUnlocked(bool append)
{
LOG_DEBUG(log, "Refreshing view {}", view->getStorageID().getFullTableName());
progress.reset();
/// Create a table.
auto [refresh_context, refresh_query] = view->prepareRefresh();
StorageID stale_table = StorageID::createEmpty();
ContextMutablePtr refresh_context = view->createRefreshContext();
std::optional<StorageID> table_to_drop;
try
{
/// Create a table.
auto refresh_query = view->prepareRefresh(append, refresh_context, table_to_drop);
/// Run the query.
{
CurrentThread::QueryScope query_scope(refresh_context); // create a thread group for the query
@ -429,37 +436,55 @@ void RefreshTask::executeRefreshUnlocked(std::shared_ptr<StorageMaterializedView
}
/// Exchange tables.
stale_table = view->exchangeTargetTable(refresh_query->table_id, refresh_context);
if (!append)
table_to_drop = view->exchangeTargetTable(refresh_query->table_id, refresh_context);
}
catch (...)
{
try
{
InterpreterDropQuery::executeDropQuery(
ASTDropQuery::Kind::Drop, view->getContext(), refresh_context, refresh_query->table_id, /*sync*/ false, /*ignore_sync_setting*/ true);
}
catch (...)
{
tryLogCurrentException(log, "Failed to drop temporary table after a failed refresh");
/// Let's ignore this and keep going, at risk of accumulating many trash tables if this keeps happening.
}
if (table_to_drop.has_value())
view->dropTempTable(table_to_drop.value(), refresh_context);
throw;
}
/// Drop the old table (outside the try-catch so we don't try to drop the other table if this fails).
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, view->getContext(), refresh_context, stale_table, /*sync*/ true, /*ignore_sync_setting*/ true);
if (table_to_drop.has_value())
view->dropTempTable(table_to_drop.value(), refresh_context);
}
void RefreshTask::advanceNextRefreshTime(std::chrono::system_clock::time_point now)
{
std::chrono::sys_seconds next = refresh_schedule.prescribeNext(next_refresh_prescribed, now);
next_refresh_prescribed = next;
next_refresh_with_spread = refresh_schedule.addRandomSpread(next);
next_refresh_actual = refresh_schedule.addRandomSpread(next);
auto secs = std::chrono::floor<std::chrono::seconds>(next_refresh_with_spread);
num_retries = 0;
info.retry = num_retries;
auto secs = std::chrono::floor<std::chrono::seconds>(next_refresh_actual);
info.next_refresh_time = UInt32(secs.time_since_epoch().count());
}
void RefreshTask::scheduleRetryOrSkipToNextRefresh(std::chrono::system_clock::time_point now)
{
if (refresh_settings.refresh_retries >= 0 && num_retries >= refresh_settings.refresh_retries)
{
advanceNextRefreshTime(now);
return;
}
num_retries += 1;
info.retry = num_retries;
UInt64 delay_ms;
UInt64 multiplier = UInt64(1) << std::min(num_retries - 1, Int64(62));
/// Overflow check: a*b <= c iff a <= c/b iff a <= floor(c/b).
if (refresh_settings.refresh_retry_initial_backoff_ms <= refresh_settings.refresh_retry_max_backoff_ms / multiplier)
delay_ms = refresh_settings.refresh_retry_initial_backoff_ms * multiplier;
else
delay_ms = refresh_settings.refresh_retry_max_backoff_ms;
next_refresh_actual = now + std::chrono::milliseconds(delay_ms);
}
bool RefreshTask::arriveDependency(const StorageID & parent)
{
remaining_dependencies.erase(parent);
@ -500,11 +525,6 @@ void RefreshTask::interruptExecution()
}
}
std::shared_ptr<StorageMaterializedView> RefreshTask::lockView()
{
return std::static_pointer_cast<StorageMaterializedView>(view_to_refresh.lock());
}
std::chrono::system_clock::time_point RefreshTask::currentTime() const
{
Int64 fake = fake_clock.load(std::memory_order::relaxed);

View File

@ -17,19 +17,21 @@ class PipelineExecutor;
class StorageMaterializedView;
class ASTRefreshStrategy;
struct OwnedRefreshTask;
class RefreshTask : public std::enable_shared_from_this<RefreshTask>
{
public:
/// Never call it manually, public for shared_ptr construction only
explicit RefreshTask(const ASTRefreshStrategy & strategy);
RefreshTask(StorageMaterializedView * view_, const ASTRefreshStrategy & strategy);
/// The only proper way to construct task
static RefreshTaskHolder create(
static OwnedRefreshTask create(
StorageMaterializedView * view,
ContextMutablePtr context,
const DB::ASTRefreshStrategy & strategy);
void initializeAndStart(std::shared_ptr<StorageMaterializedView> view);
void initializeAndStart(); // called at most once
/// Call when renaming the materialized view.
void rename(StorageID new_id);
@ -51,7 +53,14 @@ public:
/// Cancel task execution
void cancel();
/// Waits for the currently running refresh attempt to complete.
/// If the refresh fails, throws an exception.
/// If no refresh is running, completes immediately, throwing an exception if previous refresh failed.
void wait();
/// Permanently disable task scheduling and remove this table from RefreshSet.
/// Ok to call multiple times, but not in parallel.
/// Ok to call even if initializeAndStart() wasn't called or failed.
void shutdown();
/// Notify dependent task
@ -65,7 +74,7 @@ public:
private:
LoggerPtr log = nullptr;
std::weak_ptr<IStorage> view_to_refresh;
StorageMaterializedView * view;
/// Protects interrupt_execution and running_executor.
/// Can be locked while holding `mutex`.
@ -82,8 +91,10 @@ private:
mutable std::mutex mutex;
RefreshSchedule refresh_schedule;
RefreshSettings refresh_settings; // TODO: populate, use, update on alter
RefreshSettings refresh_settings;
std::vector<StorageID> initial_dependencies;
bool refresh_append;
RefreshSet::Handle set_handle;
/// StorageIDs of our dependencies that we're waiting for.
@ -112,7 +123,8 @@ private:
/// E.g. for REFRESH EVERY 1 DAY, yesterday's refresh of the dependency shouldn't trigger today's
/// refresh of the dependent even if it happened today (e.g. it was slow or had random spread > 1 day).
std::chrono::sys_seconds next_refresh_prescribed;
std::chrono::system_clock::time_point next_refresh_with_spread;
std::chrono::system_clock::time_point next_refresh_actual;
Int64 num_retries = 0;
/// Calls refreshTask() from background thread.
BackgroundSchedulePool::TaskHolder refresh_task;
@ -123,6 +135,7 @@ private:
/// Just for observability.
RefreshInfo info;
Progress progress;
std::condition_variable refresh_cv; // notified when info.state changes
/// The main loop of the refresh task. It examines the state, sees what needs to be
/// done and does it. If there's nothing to do at the moment, returns; it's then scheduled again,
@ -134,11 +147,14 @@ private:
/// Perform an actual refresh: create new table, run INSERT SELECT, exchange tables, drop old table.
/// Mutex must be unlocked. Called only from refresh_task.
void executeRefreshUnlocked(std::shared_ptr<StorageMaterializedView> view);
void executeRefreshUnlocked(bool append);
/// Assigns next_refresh_*
void advanceNextRefreshTime(std::chrono::system_clock::time_point now);
/// Either advances next_refresh_actual using exponential backoff or does advanceNextRefreshTime().
void scheduleRetryOrSkipToNextRefresh(std::chrono::system_clock::time_point now);
/// Returns true if all dependencies are fulfilled now. Refills remaining_dependencies in this case.
bool arriveDependency(const StorageID & parent);
bool arriveTime();
@ -146,9 +162,24 @@ private:
void interruptExecution();
std::shared_ptr<StorageMaterializedView> lockView();
std::chrono::system_clock::time_point currentTime() const;
};
/// Wrapper around shared_ptr<RefreshTask>, calls shutdown() in destructor.
struct OwnedRefreshTask
{
RefreshTaskHolder ptr;
OwnedRefreshTask() = default;
explicit OwnedRefreshTask(RefreshTaskHolder p) : ptr(std::move(p)) {}
OwnedRefreshTask(OwnedRefreshTask &&) = default;
OwnedRefreshTask & operator=(OwnedRefreshTask &&) = default;
~OwnedRefreshTask() { if (ptr) ptr->shutdown(); }
RefreshTask* operator->() const { return ptr.get(); }
RefreshTask& operator*() const { return *ptr; }
explicit operator bool() const { return ptr != nullptr; }
};
}

View File

@ -8,9 +8,7 @@ namespace DB
class RefreshTask;
using RefreshTaskStateUnderlying = UInt8;
using RefreshTaskHolder = std::shared_ptr<RefreshTask>;
using RefreshTaskObserver = std::weak_ptr<RefreshTask>;
using RefreshTaskList = std::list<RefreshTaskHolder>;
}

View File

@ -3,6 +3,8 @@
#include <Storages/MaterializedView/RefreshTask.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
@ -14,6 +16,7 @@
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/InterpreterRenameQuery.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Interpreters/getHeaderForProcessingStage.h>
#include <Interpreters/getTableExpressions.h>
@ -146,6 +149,13 @@ StorageMaterializedView::StorageMaterializedView(
if (point_to_itself_by_uuid || point_to_itself_by_name)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Materialized view {} cannot point to itself", table_id_.getFullTableName());
if (query.refresh_strategy)
{
fixed_uuid = false;
refresher = RefreshTask::create(this, getContext(), *query.refresh_strategy);
refresh_on_start = mode < LoadingStrictnessLevel::ATTACH && !query.is_create_empty;
}
if (!has_inner_table)
{
target_table_id = to_table_id;
@ -198,15 +208,6 @@ StorageMaterializedView::StorageMaterializedView(
target_table_id = DatabaseCatalog::instance().getTable({manual_create_query->getDatabase(), manual_create_query->getTable()}, getContext())->getStorageID();
}
if (query.refresh_strategy)
{
fixed_uuid = false;
refresher = RefreshTask::create(
getContext(),
*query.refresh_strategy);
refresh_on_start = mode < LoadingStrictnessLevel::ATTACH && !query.is_create_empty;
}
}
QueryProcessingStage::Enum StorageMaterializedView::getQueryProcessingStage(
@ -377,44 +378,71 @@ bool StorageMaterializedView::optimize(
return storage_ptr->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, cleanup, local_context);
}
std::tuple<ContextMutablePtr, std::shared_ptr<ASTInsertQuery>> StorageMaterializedView::prepareRefresh() const
ContextMutablePtr StorageMaterializedView::createRefreshContext() const
{
auto refresh_context = getInMemoryMetadataPtr()->getSQLSecurityOverriddenContext(getContext());
refresh_context->setQueryKind(ClientInfo::QueryKind::INITIAL_QUERY);
/// Generate a random query id.
refresh_context->setCurrentQueryId("");
/// TODO: Set view's definer as the current user in refresh_context, so that the correct user's
/// quotas and permissions apply for this query.
return refresh_context;
}
CurrentThread::QueryScope query_scope(refresh_context);
std::shared_ptr<ASTInsertQuery> StorageMaterializedView::prepareRefresh(bool append, ContextMutablePtr refresh_context, std::optional<StorageID> & out_temp_table_id) const
{
auto inner_table_id = getTargetTableId();
auto new_table_name = ".tmp" + generateInnerTableName(getStorageID());
StorageID target_table = inner_table_id;
auto db = DatabaseCatalog::instance().getDatabase(inner_table_id.database_name);
if (!append)
{
CurrentThread::QueryScope query_scope(refresh_context);
auto create_table_query = db->getCreateTableQuery(inner_table_id.table_name, getContext());
auto & create_query = create_table_query->as<ASTCreateQuery &>();
create_query.setTable(new_table_name);
create_query.setDatabase(db->getDatabaseName());
create_query.create_or_replace = true;
create_query.replace_table = true;
create_query.uuid = UUIDHelpers::Nil;
auto db = DatabaseCatalog::instance().getDatabase(inner_table_id.database_name);
String db_name = db->getDatabaseName();
auto new_table_name = ".tmp" + generateInnerTableName(getStorageID());
InterpreterCreateQuery create_interpreter(create_table_query, refresh_context);
create_interpreter.setInternal(true);
create_interpreter.execute();
auto create_table_query = db->getCreateTableQuery(inner_table_id.table_name, getContext());
auto & create_query = create_table_query->as<ASTCreateQuery &>();
create_query.setTable(new_table_name);
create_query.setDatabase(db->getDatabaseName());
create_query.create_or_replace = true;
create_query.replace_table = true;
create_query.uuid = UUIDHelpers::Nil;
StorageID fresh_table = DatabaseCatalog::instance().getTable({create_query.getDatabase(), create_query.getTable()}, getContext())->getStorageID();
InterpreterCreateQuery create_interpreter(create_table_query, refresh_context);
create_interpreter.setInternal(true);
create_interpreter.execute();
target_table = DatabaseCatalog::instance().getTable({db_name, new_table_name}, getContext())->getStorageID();
out_temp_table_id = target_table;
}
auto insert_query = std::make_shared<ASTInsertQuery>();
insert_query->select = getInMemoryMetadataPtr()->getSelectQuery().select_query;
insert_query->setTable(fresh_table.table_name);
insert_query->setDatabase(fresh_table.database_name);
insert_query->table_id = fresh_table;
insert_query->setTable(target_table.table_name);
insert_query->setDatabase(target_table.database_name);
insert_query->table_id = target_table;
return {refresh_context, insert_query};
Block header;
if (refresh_context->getSettingsRef().allow_experimental_analyzer)
header = InterpreterSelectQueryAnalyzer::getSampleBlock(insert_query->select, refresh_context);
else
header = InterpreterSelectWithUnionQuery(insert_query->select, refresh_context, SelectQueryOptions()).getSampleBlock();
auto columns = std::make_shared<ASTExpressionList>(',');
for (const String & name : header.getNames())
columns->children.push_back(std::make_shared<ASTIdentifier>(name));
insert_query->columns = std::move(columns);
return insert_query;
}
StorageID StorageMaterializedView::exchangeTargetTable(StorageID fresh_table, ContextPtr refresh_context)
{
/// Known problem: if the target table was ALTERed during refresh, this will effectively revert
/// the ALTER.
auto stale_table_id = getTargetTableId();
auto db = DatabaseCatalog::instance().getDatabase(stale_table_id.database_name);
@ -422,15 +450,40 @@ StorageID StorageMaterializedView::exchangeTargetTable(StorageID fresh_table, Co
CurrentThread::QueryScope query_scope(refresh_context);
target_db->renameTable(
refresh_context, fresh_table.table_name, *db, stale_table_id.table_name, /*exchange=*/true, /*dictionary=*/false);
auto rename_query = std::make_shared<ASTRenameQuery>();
rename_query->exchange = true;
rename_query->addElement(fresh_table.database_name, fresh_table.table_name, stale_table_id.database_name, stale_table_id.table_name);
InterpreterRenameQuery(rename_query, refresh_context).execute();
std::swap(stale_table_id.database_name, fresh_table.database_name);
std::swap(stale_table_id.table_name, fresh_table.table_name);
setTargetTableId(std::move(fresh_table));
return stale_table_id;
}
void StorageMaterializedView::dropTempTable(StorageID table_id, ContextMutablePtr refresh_context)
{
CurrentThread::QueryScope query_scope(refresh_context);
try
{
auto drop_query = std::make_shared<ASTDropQuery>();
drop_query->setDatabase(table_id.database_name);
drop_query->setTable(table_id.table_name);
drop_query->kind = ASTDropQuery::Kind::Drop;
drop_query->if_exists = true;
drop_query->sync = false;
InterpreterDropQuery(drop_query, refresh_context).execute();
}
catch (...)
{
tryLogCurrentException(&Poco::Logger::get("StorageMaterializedView"), "Failed to drop temporary table after refresh");
}
}
void StorageMaterializedView::alter(
const AlterCommands & params,
ContextPtr local_context,
@ -530,25 +583,11 @@ void StorageMaterializedView::renameInMemory(const StorageID & new_table_id)
{
auto new_target_table_name = generateInnerTableName(new_table_id);
ASTRenameQuery::Elements rename_elements;
assert(inner_table_id.database_name == old_table_id.database_name);
ASTRenameQuery::Element elem
{
ASTRenameQuery::Table
{
inner_table_id.database_name.empty() ? nullptr : std::make_shared<ASTIdentifier>(inner_table_id.database_name),
std::make_shared<ASTIdentifier>(inner_table_id.table_name)
},
ASTRenameQuery::Table
{
new_table_id.database_name.empty() ? nullptr : std::make_shared<ASTIdentifier>(new_table_id.database_name),
std::make_shared<ASTIdentifier>(new_target_table_name)
}
};
rename_elements.emplace_back(std::move(elem));
auto rename = std::make_shared<ASTRenameQuery>();
rename->addElement(inner_table_id.database_name, inner_table_id.table_name, new_table_id.database_name, new_target_table_name);
auto rename = std::make_shared<ASTRenameQuery>(std::move(rename_elements));
InterpreterRenameQuery(rename, getContext()).execute();
updateTargetTableId(new_table_id.database_name, new_target_table_name);
}
@ -576,7 +615,7 @@ void StorageMaterializedView::startup()
if (refresher)
{
refresher->initializeAndStart(std::static_pointer_cast<StorageMaterializedView>(shared_from_this()));
refresher->initializeAndStart();
if (refresh_on_start)
refresher->run();

View File

@ -5,7 +5,7 @@
#include <Storages/IStorage.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Storages/MaterializedView/RefreshTask_fwd.h>
#include <Storages/MaterializedView/RefreshTask.h>
namespace DB
{
@ -106,7 +106,7 @@ private:
/// Will be initialized in constructor
StorageID target_table_id = StorageID::createEmpty();
RefreshTaskHolder refresher;
OwnedRefreshTask refresher;
bool refresh_on_start = false;
bool has_inner_table = false;
@ -119,10 +119,14 @@ private:
void checkStatementCanBeForwarded() const;
/// Prepare to refresh a refreshable materialized view: create query context, create temporary
/// table, form the insert-select query.
std::tuple<ContextMutablePtr, std::shared_ptr<ASTInsertQuery>> prepareRefresh() const;
ContextMutablePtr createRefreshContext() const;
/// Prepare to refresh a refreshable materialized view: create temporary table and form the
/// insert-select query.
/// out_temp_table_id may be assigned before throwing an exception, in which case the caller
/// must drop the temp table before rethrowing.
std::shared_ptr<ASTInsertQuery> prepareRefresh(bool append, ContextMutablePtr refresh_context, std::optional<StorageID> & out_temp_table_id) const;
StorageID exchangeTargetTable(StorageID fresh_table, ContextPtr refresh_context);
void dropTempTable(StorageID table, ContextMutablePtr refresh_context);
void setTargetTableId(StorageID id);
void updateTargetTableId(std::optional<String> database_name, std::optional<String> table_name);

View File

@ -34,8 +34,9 @@ ColumnsDescription StorageSystemViewRefreshes::getColumnsDescription()
"If status = 'WaitingForDependencies', a refresh is ready to start as soon as these dependencies are fulfilled."
},
{"exception", std::make_shared<DataTypeString>(),
"if last_refresh_result = 'Exception', i.e. the last refresh attempt failed, this column contains the corresponding error message and stack trace."
"if last_refresh_result = 'Error', i.e. the last refresh attempt failed, this column contains the corresponding error message and stack trace."
},
{"retry", std::make_shared<DataTypeUInt64>(), "How many failed attempts there were so far, for the current refresh."},
{"refresh_count", std::make_shared<DataTypeUInt64>(), "Number of successful refreshes since last server restart or table creation."},
{"progress", std::make_shared<DataTypeFloat64>(), "Progress of the current refresh, between 0 and 1."},
{"elapsed", std::make_shared<DataTypeFloat64>(), "The amount of nanoseconds the current refresh took."},
@ -88,6 +89,7 @@ void StorageSystemViewRefreshes::fillData(
res_columns[i++]->insert(Array(deps));
res_columns[i++]->insert(refresh.exception_message);
res_columns[i++]->insert(refresh.retry);
res_columns[i++]->insert(refresh.refresh_count);
res_columns[i++]->insert(Float64(refresh.progress.read_rows) / refresh.progress.total_rows_to_read);
res_columns[i++]->insert(refresh.progress.elapsed_ns / 1e9);

View File

@ -11,13 +11,13 @@
<create_query>create table mt_4 (n UInt64, s String) engine = MergeTree order by tuple()</create_query>
<create_query>create materialized view mv_1 to mt_1 as
select number, toString(number) from main_table where number % 13 != 0</create_query>
select number as n, toString(number) as s from main_table where number % 13 != 0</create_query>
<create_query>create materialized view mv_2 to mt_2 as
select number, toString(number) from main_table where number % 13 != 1</create_query>
select number as n, toString(number) as s from main_table where number % 13 != 1</create_query>
<create_query>create materialized view mv_3 to mt_3 as
select number, toString(number) from main_table where number % 13 != 3</create_query>
select number as n, toString(number) as s from main_table where number % 13 != 3</create_query>
<create_query>create materialized view mv_4 to mt_4 as
select number, toString(number) from main_table where number % 13 != 4</create_query>
select number as n, toString(number) as s from main_table where number % 13 != 4</create_query>
<fill_query>SYSTEM STOP MERGES main_table</fill_query>
<fill_query>SYSTEM STOP MERGES mt_1</fill_query>

View File

@ -4,7 +4,7 @@ DROP TABLE IF EXISTS mat_view;
CREATE TABLE test1 (a LowCardinality(String)) ENGINE=MergeTree() ORDER BY a;
CREATE TABLE test2 (a UInt64) engine=MergeTree() ORDER BY a;
CREATE MATERIALIZED VIEW test_mv TO test2 AS SELECT toUInt64(a = 'test') FROM test1;
CREATE MATERIALIZED VIEW test_mv TO test2 AS SELECT toUInt64(a = 'test') AS a FROM test1;
DROP TABLE test_mv;
DROP TABLE test1;

View File

@ -35,7 +35,7 @@ arraySort(loading_dependencies_table), arraySort(loading_dependent_table) from s
$CLICKHOUSE_CLIENT -q "select '====='"
$CLICKHOUSE_CLIENT -q "alter table t add column x int default in(1, $CLICKHOUSE_DATABASE.s), drop column y"
$CLICKHOUSE_CLIENT -q "create materialized view mv to s as select n from t where n in (select n from join)"
$CLICKHOUSE_CLIENT -q "create materialized view mv to s as select n as x from t where n in (select n from join)"
$CLICKHOUSE_CLIENT -q "select table, arraySort(dependencies_table),
arraySort(loading_dependencies_table), arraySort(loading_dependent_table) from system.tables where database=currentDatabase() order by table"

View File

@ -3,7 +3,7 @@
CREATE TABLE landing (n Int64) engine=MergeTree order by n;
CREATE TABLE target (n Int64) engine=MergeTree order by n;
CREATE MATERIALIZED VIEW landing_to_target TO target AS
SELECT n + throwIf(n == 3333)
SELECT n + throwIf(n == 3333) AS n
FROM landing;
INSERT INTO landing SELECT * FROM numbers(10000); -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }

View File

@ -0,0 +1,32 @@
<1: created view> a [] 1
CREATE MATERIALIZED VIEW default.a\nREFRESH AFTER 2 SECOND\n(\n `x` UInt64\n)\nENGINE = Memory\nAS SELECT number AS x\nFROM numbers(2)\nUNION ALL\nSELECT rand64() AS x
<2: refreshed> 3 1 1
<3: time difference at least> 1000
<4: next refresh in> 2
<4.1: fake clock> Scheduled 2050-01-01 00:00:01 2050-01-01 00:00:03
<4.5: altered> Scheduled Finished 2052-01-01 00:00:00
CREATE MATERIALIZED VIEW default.a\nREFRESH EVERY 2 YEAR\n(\n `x` UInt64\n)\nENGINE = Memory\nAS SELECT x * 2 AS x\nFROM default.src
<5: no refresh> 3
<6: refreshed> 2
<7: refreshed> Scheduled Finished 2054-01-01 00:00:00
CREATE MATERIALIZED VIEW default.b\nREFRESH EVERY 2 YEAR DEPENDS ON default.a\n(\n `y` Int32\n)\nENGINE = MergeTree\nORDER BY y\nSETTINGS index_granularity = 8192\nAS SELECT x * 10 AS y\nFROM default.a
<7.5: created dependent> 2052-11-11 11:11:11
<8: refreshed> 20
<9: refreshed> a Scheduled Finished 2054-01-01 00:00:00
<9: refreshed> b Scheduled Finished 2054-01-01 00:00:00
<9.2: dropping> 0 2
<9.4: dropped> 0 2
<10: creating> a Scheduled [] 2054-01-01 00:00:00
<10: creating> b WaitingForDependencies ['default.a'] 2054-01-01 00:00:00
<11: chain-refreshed a> 4
<12: chain-refreshed b> 40
<13: chain-refreshed> a Scheduled [] Finished 2054-01-01 00:00:01 2056-01-01 00:00:00 1
<13: chain-refreshed> b Scheduled ['default.a'] Finished 2054-01-24 23:22:21 2056-01-01 00:00:00 1
<14: waiting for next cycle> a Scheduled [] 2058-01-01 00:00:00
<14: waiting for next cycle> b WaitingForDependencies ['default.a'] 2060-01-01 00:00:00
<15: chain-refreshed a> 6
<16: chain-refreshed b> 60
<17: chain-refreshed> a Scheduled 2062-01-01 00:00:00
<17: chain-refreshed> b Scheduled 2062-01-01 00:00:00
<18: removed dependency> b Scheduled [] 2062-03-03 03:03:03 2064-01-01 00:00:00 5
CREATE MATERIALIZED VIEW default.b\nREFRESH EVERY 2 YEAR\n(\n `y` Int32\n)\nENGINE = MergeTree\nORDER BY y\nSETTINGS index_granularity = 8192\nAS SELECT x * 10 AS y\nFROM default.a

View File

@ -0,0 +1,177 @@
#!/usr/bin/env bash
# Tags: atomic-database
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# Set session timezone to UTC to make all DateTime formatting and parsing use UTC, because refresh
# scheduling is done in UTC.
CLICKHOUSE_CLIENT="`echo "$CLICKHOUSE_CLIENT" | sed 's/--session_timezone[= ][^ ]*//g'`"
CLICKHOUSE_CLIENT="`echo "$CLICKHOUSE_CLIENT --allow_experimental_refreshable_materialized_view=1 --session_timezone Etc/UTC"`"
$CLICKHOUSE_CLIENT -nq "create view refreshes as select * from system.view_refreshes where database = '$CLICKHOUSE_DATABASE' order by view"
# Basic refreshing.
$CLICKHOUSE_CLIENT -nq "
create materialized view a
refresh after 2 second
engine Memory
empty
as select number as x from numbers(2) union all select rand64() as x;
select '<1: created view>', view, remaining_dependencies, exception, last_refresh_result in ('Unknown', 'Finished') from refreshes;
show create a;"
# Wait for any refresh. (xargs trims the string and turns \t and \n into spaces)
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes -- $LINENO" | xargs`" == 'Unknown' ]
do
sleep 0.5
done
start_time="`$CLICKHOUSE_CLIENT -nq "select reinterpret(now64(), 'Int64')"`"
# Check table contents.
$CLICKHOUSE_CLIENT -nq "select '<2: refreshed>', count(), sum(x=0), sum(x=1) from a"
# Wait for table contents to change.
res1="`$CLICKHOUSE_CLIENT -nq 'select * from a order by x format Values'`"
while :
do
res2="`$CLICKHOUSE_CLIENT -nq 'select * from a order by x format Values -- $LINENO'`"
[ "$res2" == "$res1" ] || break
sleep 0.5
done
# Wait for another change.
while :
do
res3="`$CLICKHOUSE_CLIENT -nq 'select * from a order by x format Values -- $LINENO'`"
[ "$res3" == "$res2" ] || break
sleep 0.5
done
# Check that the two changes were at least 1 second apart, in particular that we're not refreshing
# like crazy. This is potentially flaky, but we need at least one test that uses non-mocked timer
# to make sure the clock+timer code works at all. If it turns out flaky, increase refresh period above.
$CLICKHOUSE_CLIENT -nq "
select '<3: time difference at least>', min2(reinterpret(now64(), 'Int64') - $start_time, 1000);
select '<4: next refresh in>', next_refresh_time-last_refresh_time from refreshes;"
# Create a source table from which views will read.
$CLICKHOUSE_CLIENT -nq "
create table src (x Int8) engine Memory as select 1;"
# Switch to fake clock, change refresh schedule, change query.
$CLICKHOUSE_CLIENT -nq "
system test view a set fake time '2050-01-01 00:00:01';
system wait view a;
system refresh view a;
system wait view a;
select '<4.1: fake clock>', status, last_refresh_time, next_refresh_time from refreshes;
alter table a modify refresh every 2 year;
alter table a modify query select x*2 as x from src;
select '<4.5: altered>', status, last_refresh_result, next_refresh_time from refreshes;
show create a;"
# Advance time to trigger the refresh.
$CLICKHOUSE_CLIENT -nq "
select '<5: no refresh>', count() from a;
system test view a set fake time '2052-02-03 04:05:06';"
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_time from refreshes -- $LINENO" | xargs`" != '2052-02-03 04:05:06' ]
do
sleep 0.5
done
$CLICKHOUSE_CLIENT -nq "
select '<6: refreshed>', * from a;
select '<7: refreshed>', status, last_refresh_result, next_refresh_time from refreshes;"
# Create a dependent view, refresh it once.
$CLICKHOUSE_CLIENT -nq "
create materialized view b refresh every 2 year depends on a (y Int32) engine MergeTree order by y empty as select x*10 as y from a;
show create b;
system test view b set fake time '2052-11-11 11:11:11';
system refresh view b;
system wait view b;
select '<7.5: created dependent>', last_refresh_time from refreshes where view = 'b';"
# Next refresh shouldn't start until the dependency refreshes.
$CLICKHOUSE_CLIENT -nq "
select '<8: refreshed>', * from b;
select '<9: refreshed>', view, status, last_refresh_result, next_refresh_time from refreshes;
system test view b set fake time '2054-01-24 23:22:21';"
while [ "`$CLICKHOUSE_CLIENT -nq "select status, next_refresh_time from refreshes where view = 'b' -- $LINENO" | xargs`" != 'WaitingForDependencies 2054-01-01 00:00:00' ]
do
sleep 0.5
done
# Drop the source table, check that refresh fails and doesn't leave a temp table behind.
$CLICKHOUSE_CLIENT -nq "
select '<9.2: dropping>', countIf(name like '%tmp%'), countIf(name like '%.inner%') from system.tables where database = currentDatabase();
drop table src;
system refresh view a;"
$CLICKHOUSE_CLIENT -nq "system wait view a;" 2>/dev/null && echo "SYSTEM WAIT VIEW failed to fail at $LINENO"
$CLICKHOUSE_CLIENT -nq "
select '<9.4: dropped>', countIf(name like '%tmp%'), countIf(name like '%.inner%') from system.tables where database = currentDatabase();"
# Create the source table again, check that refresh succeeds (in particular that tables are looked
# up by name rather than uuid).
$CLICKHOUSE_CLIENT -nq "
select '<10: creating>', view, status, remaining_dependencies, next_refresh_time from refreshes;
create table src (x Int16) engine Memory as select 2;
system test view a set fake time '2054-01-01 00:00:01';"
while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes where view = 'b' -- $LINENO" | xargs`" != 'Scheduled' ]
do
sleep 0.5
done
# Both tables should've refreshed.
$CLICKHOUSE_CLIENT -nq "
select '<11: chain-refreshed a>', * from a;
select '<12: chain-refreshed b>', * from b;
select '<13: chain-refreshed>', view, status, remaining_dependencies, last_refresh_result, last_refresh_time, next_refresh_time, exception == '' from refreshes;"
# Make the dependent table run ahead by one refresh cycle, make sure it waits for the dependency to
# catch up to the same cycle.
$CLICKHOUSE_CLIENT -nq "
system test view b set fake time '2059-01-01 00:00:00';
system refresh view b;"
while [ "`$CLICKHOUSE_CLIENT -nq "select next_refresh_time from refreshes where view = 'b' -- $LINENO" | xargs`" != '2060-01-01 00:00:00' ]
do
sleep 0.5
done
$CLICKHOUSE_CLIENT -nq "
system test view b set fake time '2061-01-01 00:00:00';
system test view a set fake time '2057-01-01 00:00:00';"
while [ "`$CLICKHOUSE_CLIENT -nq "select status, next_refresh_time from refreshes -- $LINENO" | xargs`" != 'Scheduled 2058-01-01 00:00:00 WaitingForDependencies 2060-01-01 00:00:00' ]
do
sleep 0.5
done
$CLICKHOUSE_CLIENT -nq "
select '<14: waiting for next cycle>', view, status, remaining_dependencies, next_refresh_time from refreshes;
truncate src;
insert into src values (3);
system test view a set fake time '2060-02-02 02:02:02';"
while [ "`$CLICKHOUSE_CLIENT -nq "select next_refresh_time from refreshes where view = 'b' -- $LINENO" | xargs`" != '2062-01-01 00:00:00' ]
do
sleep 0.5
done
$CLICKHOUSE_CLIENT -nq "
select '<15: chain-refreshed a>', * from a;
select '<16: chain-refreshed b>', * from b;
select '<17: chain-refreshed>', view, status, next_refresh_time from refreshes;"
# Get to WaitingForDependencies state and remove the depencency.
$CLICKHOUSE_CLIENT -nq "
system test view b set fake time '2062-03-03 03:03:03'"
while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes where view = 'b' -- $LINENO" | xargs`" != 'WaitingForDependencies' ]
do
sleep 0.5
done
$CLICKHOUSE_CLIENT -nq "
alter table b modify refresh every 2 year"
while [ "`$CLICKHOUSE_CLIENT -nq "select status, last_refresh_time from refreshes where view = 'b' -- $LINENO" | xargs`" != 'Scheduled 2062-03-03 03:03:03' ]
do
sleep 0.5
done
$CLICKHOUSE_CLIENT -nq "
select '<18: removed dependency>', view, status, remaining_dependencies, last_refresh_time,next_refresh_time, refresh_count from refreshes where view = 'b';
show create b;"
$CLICKHOUSE_CLIENT -nq "
drop table src;
drop table a;
drop table b;
drop table refreshes;"

View File

@ -0,0 +1,30 @@
<19: exception> 1
<20: unexception> 1
<21: rename> 1
<22: rename> d Finished
<23: simple refresh> 1
<24: rename during refresh> 1
<25: rename during refresh> f Running
<27: cancelled> f Scheduled Cancelled
<28: drop during refresh> 0 0
CREATE MATERIALIZED VIEW default.g\nREFRESH EVERY 1 WEEK OFFSET 3 DAY 4 HOUR RANDOMIZE FOR 4 DAY 1 HOUR\n(\n `x` Int64\n)\nENGINE = Memory\nAS SELECT 42 AS x
<29: randomize> 1 1
CREATE MATERIALIZED VIEW default.h\nREFRESH EVERY 1 SECOND TO default.dest\n(\n `x` Int64\n)\nAS SELECT x * 10 AS x\nFROM default.src
<30: to existing table> 10
<31: to existing table> 10
<31: to existing table> 20
<31.5: will retry> Error 1
<31.6: did retry> 10
<32: empty> i Scheduled Unknown 0
<32: empty> j Scheduled Finished 0
<34: append> 10
<35: append> 10
<35: append> 20
<35: append> 30
<36: not append> 20
<36: not append> 30
<37: append chain> 100
<38: append chain> 100
<38: append chain> 100
<38: append chain> 200
creating MergeTree without ORDER BY failed, as expected

View File

@ -0,0 +1,222 @@
#!/usr/bin/env bash
# Tags: atomic-database
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# reset --log_comment
CLICKHOUSE_LOG_COMMENT=
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# Set session timezone to UTC to make all DateTime formatting and parsing use UTC, because refresh
# scheduling is done in UTC.
CLICKHOUSE_CLIENT="`echo "$CLICKHOUSE_CLIENT" | sed 's/--session_timezone[= ][^ ]*//g'`"
CLICKHOUSE_CLIENT="`echo "$CLICKHOUSE_CLIENT --allow_experimental_refreshable_materialized_view=1 --allow_materialized_view_with_bad_select=0 --session_timezone Etc/UTC"`"
$CLICKHOUSE_CLIENT -nq "create view refreshes as select * from system.view_refreshes where database = '$CLICKHOUSE_DATABASE' order by view"
# Select from a table that doesn't exist, get an exception.
$CLICKHOUSE_CLIENT -nq "
create table src (x Int8) engine Memory as select 1;
create materialized view c refresh every 1 second (x Int64) engine Memory empty as select * from src;
drop table src;"
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes where view = 'c' -- $LINENO" | xargs`" != 'Error' ]
do
sleep 0.5
done
# Check exception, create src, expect successful refresh.
$CLICKHOUSE_CLIENT -nq "
select '<19: exception>', exception ilike '%UNKNOWN_TABLE%' ? '1' : exception from refreshes where view = 'c';
create table src (x Int64) engine Memory as select 1;
system refresh view c;"
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes -- $LINENO" | xargs`" != 'Finished' ]
do
sleep 0.5
done
# Rename table.
$CLICKHOUSE_CLIENT -nq "
select '<20: unexception>', * from c;
rename table c to d;
select '<21: rename>', * from d;
select '<22: rename>', view, last_refresh_result from refreshes;"
# Do various things during a refresh.
# First make a nonempty view.
$CLICKHOUSE_CLIENT -nq "
drop table d;
truncate src;
insert into src values (1);
create materialized view e refresh every 1 second (x Int64) engine MergeTree order by x empty as select x + sleepEachRow(1) as x from src settings max_block_size = 1;"
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes -- $LINENO" | xargs`" != 'Finished' ]
do
sleep 0.5
done
# Stop refreshes.
$CLICKHOUSE_CLIENT -nq "
select '<23: simple refresh>', * from e;
system stop view e;"
while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes -- $LINENO" | xargs`" != 'Disabled' ]
do
sleep 0.5
done
# Make refreshes slow, wait for a slow refresh to start. (We stopped refreshes first to make sure
# we wait for a slow refresh, not a previous fast one.)
$CLICKHOUSE_CLIENT -nq "
insert into src select * from numbers(1000) settings max_block_size=1;
system start view e;"
while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes -- $LINENO" | xargs`" != 'Running' ]
do
sleep 0.5
done
# Rename.
$CLICKHOUSE_CLIENT -nq "
rename table e to f;
select '<24: rename during refresh>', * from f;
select '<25: rename during refresh>', view, status from refreshes where view = 'f';
alter table f modify refresh after 10 year;"
# Cancel.
$CLICKHOUSE_CLIENT -nq "
system cancel view f;"
while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes where view = 'f' -- $LINENO" | xargs`" != 'Scheduled' ]
do
sleep 0.5
done
# Check that another refresh doesn't immediately start after the cancelled one.
$CLICKHOUSE_CLIENT -nq "
select '<27: cancelled>', view, status, last_refresh_result from refreshes where view = 'f';
system refresh view f;"
while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes where view = 'f' -- $LINENO" | xargs`" != 'Running' ]
do
sleep 0.5
done
# Drop.
$CLICKHOUSE_CLIENT -nq "
drop table f;
select '<28: drop during refresh>', view, status from refreshes;
select '<28: drop during refresh>', countIf(name like '%tmp%'), countIf(name like '%.inner%') from system.tables where database = currentDatabase()"
# Try OFFSET and RANDOMIZE FOR.
$CLICKHOUSE_CLIENT -nq "
create materialized view g refresh every 1 week offset 3 day 4 hour randomize for 4 day 1 hour (x Int64) engine Memory empty as select 42 as x;
show create g;
system test view g set fake time '2050-02-03 15:30:13';"
while [ "`$CLICKHOUSE_CLIENT -nq "select next_refresh_time > '2049-01-01' from refreshes -- $LINENO" | xargs`" != '1' ]
do
sleep 0.5
done
$CLICKHOUSE_CLIENT -nq "
with '2050-02-10 04:00:00'::DateTime as expected
select '<29: randomize>', abs(next_refresh_time::Int64 - expected::Int64) <= 3600*(24*4+1), next_refresh_time != expected from refreshes;"
# Send data 'TO' an existing table.
$CLICKHOUSE_CLIENT -nq "
drop table g;
create table dest (x Int64) engine MergeTree order by x;
truncate src;
insert into src values (1);
create materialized view h refresh every 1 second to dest empty as select x*10 as x from src;
show create h;"
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes -- $LINENO" | xargs`" != 'Finished' ]
do
sleep 0.5
done
$CLICKHOUSE_CLIENT -nq "
select '<30: to existing table>', * from dest;
insert into src values (2);"
while [ "`$CLICKHOUSE_CLIENT -nq "select count() from dest -- $LINENO" | xargs`" != '2' ]
do
sleep 0.5
done
$CLICKHOUSE_CLIENT -nq "
select '<31: to existing table>', * from dest;
drop table dest;
drop table h;"
# Retries.
$CLICKHOUSE_CLIENT -nq "
create materialized view h2 refresh after 1 year settings refresh_retries = 10 (x Int64) engine Memory as select x*10 + throwIf(x % 2 == 0) as x from src;"
$CLICKHOUSE_CLIENT -nq "system wait view h2;" 2>/dev/null && echo "SYSTEM WAIT VIEW failed to fail at $LINENO"
$CLICKHOUSE_CLIENT -nq "
select '<31.5: will retry>', last_refresh_result, retry > 0 from refreshes;
create table src2 (x Int8) engine Memory;
insert into src2 values (1);
exchange tables src and src2;
drop table src2;"
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result, retry from refreshes -- $LINENO" | xargs`" != 'Finished 0' ]
do
sleep 0.5
done
$CLICKHOUSE_CLIENT -nq "
select '<31.6: did retry>', x from h2;
drop table h2"
# EMPTY
$CLICKHOUSE_CLIENT -nq "
create materialized view i refresh after 1 year engine Memory empty as select number as x from numbers(2);
create materialized view j refresh after 1 year engine Memory as select number as x from numbers(2);"
while [ "`$CLICKHOUSE_CLIENT -nq "select sum(last_success_time is null) from refreshes -- $LINENO" | xargs`" == '2' ]
do
sleep 0.5
done
$CLICKHOUSE_CLIENT -nq "
select '<32: empty>', view, status, last_refresh_result, retry from refreshes order by view;
drop table i;
drop table j;"
# APPEND
$CLICKHOUSE_CLIENT -nq "
create materialized view k refresh every 10 year append (x Int64) engine Memory empty as select x*10 as x from src;
select '<33: append>', * from k;
system refresh view k;
system wait view k;
select '<34: append>', * from k;
truncate table src;
insert into src values (2), (3);
system refresh view k;
system wait view k;
select '<35: append>', * from k order by x;"
# ALTER to non-APPEND
$CLICKHOUSE_CLIENT -nq "
alter table k modify refresh every 10 year;
system wait view k;
system refresh view k;
system wait view k;
select '<36: not append>', * from k order by x;
drop table k;
truncate table src;"
# APPEND + TO + regular materialized view reading from it.
$CLICKHOUSE_CLIENT -nq "
create table mid (x Int64) engine MergeTree order by x;
create materialized view l refresh every 10 year append to mid empty as select x*10 as x from src;
create materialized view m (x Int64) engine Memory as select x*10 as x from mid;
insert into src values (1);
system refresh view l;
system wait view l;
select '<37: append chain>', * from m;
insert into src values (2);
system refresh view l;
system wait view l;
select '<38: append chain>', * from m order by x;
drop table l;
drop table m;
drop table mid;"
# Failing to create inner table.
$CLICKHOUSE_CLIENT -nq "
create materialized view n refresh every 1 second (x Int64) engine MergeTree as select 1 as x from numbers(2);" 2>/dev/null || echo "creating MergeTree without ORDER BY failed, as expected"
$CLICKHOUSE_CLIENT -nq "
create materialized view n refresh every 1 second (x Int64) engine MergeTree order by x as select 1 as x from numbers(2);
drop table n;"
# Reading from table that doesn't exist yet.
$CLICKHOUSE_CLIENT -nq "
create materialized view o refresh every 1 second (x Int64) engine Memory as select x from nonexist; -- { serverError UNKNOWN_TABLE }
create materialized view o (x Int64) engine Memory as select x from nonexist; -- { serverError UNKNOWN_TABLE }
create materialized view o (x Int64) engine Memory as select x from nope.nonexist; -- { serverError UNKNOWN_DATABASE }
create materialized view o refresh every 1 second (x Int64) engine Memory as select x from nope.nonexist settings allow_materialized_view_with_bad_select = 1;
drop table o;"
$CLICKHOUSE_CLIENT -nq "
drop table refreshes;"

View File

@ -12,15 +12,15 @@ ${CLICKHOUSE_CLIENT} --query "CREATE DATABASE ${CLICKHOUSE_DATABASE}_db engine =
# Non-replicated engines are allowed
${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE TABLE ${CLICKHOUSE_DATABASE}_db.test (id UInt64) ENGINE = MergeTree() ORDER BY id AS SELECT 1"
${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE MATERIALIZED VIEW ${CLICKHOUSE_DATABASE}_db.test_mv (id UInt64) ENGINE = MergeTree() ORDER BY id POPULATE AS SELECT 1"
${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE MATERIALIZED VIEW ${CLICKHOUSE_DATABASE}_db.test_mv (id UInt64) ENGINE = MergeTree() ORDER BY id POPULATE AS SELECT 1 AS id"
# Replicated storafes are forbidden
${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE TABLE ${CLICKHOUSE_DATABASE}_db.test2 (id UInt64) ENGINE = ReplicatedMergeTree ORDER BY id AS SELECT 1" |& grep -cm1 "SUPPORT_IS_DISABLED"
${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE MATERIALIZED VIEW ${CLICKHOUSE_DATABASE}_db.test_mv2 (id UInt64) ENGINE = ReplicatedMergeTree ORDER BY id POPULATE AS SELECT 1" |& grep -cm1 "SUPPORT_IS_DISABLED"
${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE MATERIALIZED VIEW ${CLICKHOUSE_DATABASE}_db.test_mv2 (id UInt64) ENGINE = ReplicatedMergeTree ORDER BY id POPULATE AS SELECT 1 AS id" |& grep -cm1 "SUPPORT_IS_DISABLED"
# POPULATE is allowed with the special setting
${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE MATERIALIZED VIEW ${CLICKHOUSE_DATABASE}_db.test_mv2 (id UInt64) ENGINE = ReplicatedMergeTree ORDER BY id POPULATE AS SELECT 1" --database_replicated_allow_heavy_create=1
${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE MATERIALIZED VIEW ${CLICKHOUSE_DATABASE}_db.test_mv3 (id UInt64) ENGINE = ReplicatedMergeTree ORDER BY id POPULATE AS SELECT 1" --compatibility='24.6'
${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE MATERIALIZED VIEW ${CLICKHOUSE_DATABASE}_db.test_mv2 (id UInt64) ENGINE = ReplicatedMergeTree ORDER BY id POPULATE AS SELECT 1 AS id" --database_replicated_allow_heavy_create=1
${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE MATERIALIZED VIEW ${CLICKHOUSE_DATABASE}_db.test_mv3 (id UInt64) ENGINE = ReplicatedMergeTree ORDER BY id POPULATE AS SELECT 1 AS id" --compatibility='24.6'
# AS SELECT is forbidden even with the setting
${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE TABLE ${CLICKHOUSE_DATABASE}_db.test2 (id UInt64) ENGINE = ReplicatedMergeTree ORDER BY id AS SELECT 1" --database_replicated_allow_heavy_create=1 |& grep -cm1 "SUPPORT_IS_DISABLED"

View File

@ -2127,6 +2127,7 @@ namespace
namespaces
natively
nats
ness
nestjs
netloc
newjson