diff --git a/src/Databases/DatabaseReplicated.cpp b/src/Databases/DatabaseReplicated.cpp index 12cff3407d3..74933b149e6 100644 --- a/src/Databases/DatabaseReplicated.cpp +++ b/src/Databases/DatabaseReplicated.cpp @@ -557,12 +557,14 @@ ASTPtr DatabaseReplicated::parseQueryFromMetadataInZooKeeper(const String & node auto ast = parseQuery(parser, query, description, 0, global_context.getSettingsRef().max_parser_depth); auto & create = ast->as(); - if (create.uuid == UUIDHelpers::Nil || create.table != TABLE_WITH_UUID_NAME_PLACEHOLDER || ! create.database.empty()) + if (create.uuid == UUIDHelpers::Nil || create.table != TABLE_WITH_UUID_NAME_PLACEHOLDER || !create.database.empty()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Got unexpected query from {}: {}", node_name, query); + bool is_materialized_view_with_inner_table = create.is_materialized_view && create.to_table_id.empty(); + create.database = getDatabaseName(); create.table = unescapeForFileName(node_name); - create.attach = false; + create.attach = is_materialized_view_with_inner_table; return ast; } @@ -598,7 +600,7 @@ void DatabaseReplicated::shutdown() void DatabaseReplicated::dropTable(const Context & context, const String & table_name, bool no_delay) { auto txn = context.getZooKeeperMetadataTransaction(); - assert(!ddl_worker->isCurrentlyActive() || txn); + assert(!ddl_worker->isCurrentlyActive() || txn || startsWith(table_name, ".inner_id.")); if (txn && txn->isInitialQuery()) { String metadata_zk_path = zookeeper_path + "/metadata/" + escapeForFileName(table_name); diff --git a/src/Interpreters/InterpreterCreateQuery.cpp b/src/Interpreters/InterpreterCreateQuery.cpp index d1af86e7b11..7c7403f678b 100644 --- a/src/Interpreters/InterpreterCreateQuery.cpp +++ b/src/Interpreters/InterpreterCreateQuery.cpp @@ -712,6 +712,19 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const } } +static void generateUUIDForTable(ASTCreateQuery & create) +{ + if (create.uuid == UUIDHelpers::Nil) + create.uuid = UUIDHelpers::generateV4(); + + /// If destination table (to_table_id) is not specified for materialized view, + /// then MV will create inner table. We should generate UUID of inner table here, + /// so it will be the same on all hosts if query in ON CLUSTER or database engine is Replicated. + bool need_uuid_for_inner_table = create.is_materialized_view && !create.to_table_id; + if (need_uuid_for_inner_table && create.to_inner_uuid == UUIDHelpers::Nil) + create.to_inner_uuid = UUIDHelpers::generateV4(); +} + void InterpreterCreateQuery::assertOrSetUUID(ASTCreateQuery & create, const DatabasePtr & database) const { const auto * kind = create.is_dictionary ? "Dictionary" : "Table"; @@ -743,18 +756,19 @@ void InterpreterCreateQuery::assertOrSetUUID(ASTCreateQuery & create, const Data kind_upper, create.table); } - if (create.uuid == UUIDHelpers::Nil) - create.uuid = UUIDHelpers::generateV4(); + generateUUIDForTable(create); } else { bool is_on_cluster = context.getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY; - if (create.uuid != UUIDHelpers::Nil && !is_on_cluster) + bool has_uuid = create.uuid != UUIDHelpers::Nil || create.to_inner_uuid != UUIDHelpers::Nil; + if (has_uuid && !is_on_cluster) throw Exception(ErrorCodes::INCORRECT_QUERY, "{} UUID specified, but engine of database {} is not Atomic", kind, create.database); /// Ignore UUID if it's ON CLUSTER query create.uuid = UUIDHelpers::Nil; + create.to_inner_uuid = UUIDHelpers::Nil; } if (create.replace_table) @@ -1134,8 +1148,7 @@ void InterpreterCreateQuery::prepareOnClusterQuery(ASTCreateQuery & create, cons /// For CREATE query generate UUID on initiator, so it will be the same on all hosts. /// It will be ignored if database does not support UUIDs. - if (create.uuid == UUIDHelpers::Nil) - create.uuid = UUIDHelpers::generateV4(); + generateUUIDForTable(create); /// For cross-replication cluster we cannot use UUID in replica path. String cluster_name_expanded = context.getMacros()->expand(cluster_name); diff --git a/src/Interpreters/InterpreterShowCreateQuery.cpp b/src/Interpreters/InterpreterShowCreateQuery.cpp index 10c8339c135..a853c4b9a9b 100644 --- a/src/Interpreters/InterpreterShowCreateQuery.cpp +++ b/src/Interpreters/InterpreterShowCreateQuery.cpp @@ -82,6 +82,7 @@ BlockInputStreamPtr InterpreterShowCreateQuery::executeImpl() { auto & create = create_query->as(); create.uuid = UUIDHelpers::Nil; + create.to_inner_uuid = UUIDHelpers::Nil; } WriteBufferFromOwnString buf; diff --git a/src/Parsers/ASTCreateQuery.cpp b/src/Parsers/ASTCreateQuery.cpp index 2af0d2d4a45..1192fcc6ebd 100644 --- a/src/Parsers/ASTCreateQuery.cpp +++ b/src/Parsers/ASTCreateQuery.cpp @@ -297,12 +297,20 @@ void ASTCreateQuery::formatQueryImpl(const FormatSettings & settings, FormatStat if (to_table_id) { + assert(is_materialized_view && to_inner_uuid == UUIDHelpers::Nil); settings.ostr << (settings.hilite ? hilite_keyword : "") << " TO " << (settings.hilite ? hilite_none : "") << (!to_table_id.database_name.empty() ? backQuoteIfNeed(to_table_id.database_name) + "." : "") << backQuoteIfNeed(to_table_id.table_name); } + if (to_inner_uuid != UUIDHelpers::Nil) + { + assert(is_materialized_view && !to_table_id); + settings.ostr << (settings.hilite ? hilite_keyword : "") << " TO INNER UUID " << (settings.hilite ? hilite_none : "") + << quoteString(toString(to_inner_uuid)); + } + if (!as_table.empty()) { settings.ostr diff --git a/src/Parsers/ASTCreateQuery.h b/src/Parsers/ASTCreateQuery.h index c9a6251cb94..d6d5c22240c 100644 --- a/src/Parsers/ASTCreateQuery.h +++ b/src/Parsers/ASTCreateQuery.h @@ -66,6 +66,7 @@ public: ASTExpressionList * tables = nullptr; StorageID to_table_id = StorageID::createEmpty(); /// For CREATE MATERIALIZED VIEW mv TO table. + UUID to_inner_uuid = UUIDHelpers::Nil; /// For materialized view with inner table ASTStorage * storage = nullptr; String as_database; String as_table; diff --git a/src/Parsers/ParserCreateQuery.cpp b/src/Parsers/ParserCreateQuery.cpp index 4cef79fdf42..bfd51b7633d 100644 --- a/src/Parsers/ParserCreateQuery.cpp +++ b/src/Parsers/ParserCreateQuery.cpp @@ -780,6 +780,7 @@ bool ParserCreateViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expec ASTPtr table; ASTPtr to_table; + ASTPtr to_inner_uuid; ASTPtr columns_list; ASTPtr storage; ASTPtr as_database; @@ -830,9 +831,16 @@ bool ParserCreateViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expec return false; } - // TO [db.]table - if (ParserKeyword{"TO"}.ignore(pos, expected)) + + if (ParserKeyword{"TO INNER UUID"}.ignore(pos, expected)) { + ParserLiteral literal_p; + if (!literal_p.parse(pos, to_inner_uuid, expected)) + return false; + } + else if (ParserKeyword{"TO"}.ignore(pos, expected)) + { + // TO [db.]table if (!table_name_p.parse(pos, to_table, expected)) return false; } @@ -883,6 +891,8 @@ bool ParserCreateViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expec if (to_table) query->to_table_id = getTableIdentifier(to_table); + if (to_inner_uuid) + query->to_inner_uuid = parseFromString(to_inner_uuid->as()->value.get()); query->set(query->columns_list, columns_list); query->set(query->storage, storage); diff --git a/src/Storages/StorageMaterializedView.cpp b/src/Storages/StorageMaterializedView.cpp index d2cb418ce97..f915f76bbfc 100644 --- a/src/Storages/StorageMaterializedView.cpp +++ b/src/Storages/StorageMaterializedView.cpp @@ -84,7 +84,7 @@ StorageMaterializedView::StorageMaterializedView( else if (attach_) { /// If there is an ATTACH request, then the internal table must already be created. - target_table_id = StorageID(getStorageID().database_name, generateInnerTableName(getStorageID())); + target_table_id = StorageID(getStorageID().database_name, generateInnerTableName(getStorageID()), query.to_inner_uuid); } else { @@ -93,6 +93,7 @@ StorageMaterializedView::StorageMaterializedView( auto manual_create_query = std::make_shared(); manual_create_query->database = getStorageID().database_name; manual_create_query->table = generateInnerTableName(getStorageID()); + manual_create_query->uuid = query.to_inner_uuid; auto new_columns_list = std::make_shared(); new_columns_list->set(new_columns_list->columns, query.columns_list->columns->ptr()); diff --git a/src/Storages/System/StorageSystemTables.cpp b/src/Storages/System/StorageSystemTables.cpp index 132ed234323..fffe909715f 100644 --- a/src/Storages/System/StorageSystemTables.cpp +++ b/src/Storages/System/StorageSystemTables.cpp @@ -359,6 +359,7 @@ protected: { auto & create = ast->as(); create.uuid = UUIDHelpers::Nil; + create.to_inner_uuid = UUIDHelpers::Nil; } if (columns_mask[src_index++]) diff --git a/tests/integration/test_replicated_database/configs/settings.xml b/tests/integration/test_replicated_database/configs/settings.xml index e0f7e8691e6..7f45502e20d 100644 --- a/tests/integration/test_replicated_database/configs/settings.xml +++ b/tests/integration/test_replicated_database/configs/settings.xml @@ -2,6 +2,7 @@ 1 + 1 diff --git a/tests/integration/test_replicated_database/test.py b/tests/integration/test_replicated_database/test.py index 99e7d6077f8..487f2b5a8c1 100644 --- a/tests/integration/test_replicated_database/test.py +++ b/tests/integration/test_replicated_database/test.py @@ -198,8 +198,14 @@ def test_recover_staled_replica(started_cluster): dummy_node.query("CREATE TABLE recover.rmt2 (n int) ENGINE=ReplicatedMergeTree order by n", settings=settings) main_node.query("CREATE TABLE recover.rmt3 (n int) ENGINE=ReplicatedMergeTree order by n", settings=settings) dummy_node.query("CREATE TABLE recover.rmt5 (n int) ENGINE=ReplicatedMergeTree order by n", settings=settings) - main_node.query("CREATE DICTIONARY recover.d1 (n int DEFAULT 0, m int DEFAULT 1) PRIMARY KEY n SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'rmt1' PASSWORD '' DB 'recover')) LIFETIME(MIN 1 MAX 10) LAYOUT(FLAT())") - dummy_node.query("CREATE DICTIONARY recover.d2 (n int DEFAULT 0, m int DEFAULT 1) PRIMARY KEY n SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'rmt2' PASSWORD '' DB 'recover')) LIFETIME(MIN 1 MAX 10) LAYOUT(FLAT())") + main_node.query("CREATE MATERIALIZED VIEW recover.mv1 (n int) ENGINE=ReplicatedMergeTree order by n AS SELECT n FROM recover.rmt1", settings=settings) + dummy_node.query("CREATE MATERIALIZED VIEW recover.mv2 (n int) ENGINE=ReplicatedMergeTree order by n AS SELECT n FROM recover.rmt2", settings=settings) + main_node.query("CREATE DICTIONARY recover.d1 (n int DEFAULT 0, m int DEFAULT 1) PRIMARY KEY n " + "SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'rmt1' PASSWORD '' DB 'recover')) " + "LIFETIME(MIN 1 MAX 10) LAYOUT(FLAT())") + dummy_node.query("CREATE DICTIONARY recover.d2 (n int DEFAULT 0, m int DEFAULT 1) PRIMARY KEY n " + "SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'rmt2' PASSWORD '' DB 'recover')) " + "LIFETIME(MIN 1 MAX 10) LAYOUT(FLAT())") for table in ['t1', 't2', 'mt1', 'mt2', 'rmt1', 'rmt2', 'rmt3', 'rmt5']: main_node.query("INSERT INTO recover.{} VALUES (42)".format(table)) @@ -217,35 +223,44 @@ def test_recover_staled_replica(started_cluster): main_node.query("RENAME TABLE recover.rmt3 TO recover.rmt4", settings=settings) main_node.query("DROP TABLE recover.rmt5", settings=settings) main_node.query("DROP DICTIONARY recover.d2", settings=settings) - main_node.query("CREATE DICTIONARY recover.d2 (n int DEFAULT 0, m int DEFAULT 1) PRIMARY KEY n SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'rmt1' PASSWORD '' DB 'recover')) LIFETIME(MIN 1 MAX 10) LAYOUT(FLAT());", settings=settings) + main_node.query("CREATE DICTIONARY recover.d2 (n int DEFAULT 0, m int DEFAULT 1) PRIMARY KEY n " + "SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'rmt1' PASSWORD '' DB 'recover')) " + "LIFETIME(MIN 1 MAX 10) LAYOUT(FLAT());", settings=settings) + + inner_table = ".inner_id." + dummy_node.query("SELECT uuid FROM system.tables WHERE database='recover' AND name='mv1'").strip() + main_node.query("ALTER TABLE recover.`{}` MODIFY COLUMN n int DEFAULT 42".format(inner_table), settings=settings) + main_node.query("ALTER TABLE recover.mv1 MODIFY QUERY SELECT m FROM recover.rmt1".format(inner_table), settings=settings) + main_node.query("RENAME TABLE recover.mv2 TO recover.mv3".format(inner_table), settings=settings) main_node.query("CREATE TABLE recover.tmp AS recover.m1", settings=settings) main_node.query("DROP TABLE recover.tmp", settings=settings) main_node.query("CREATE TABLE recover.tmp AS recover.m1", settings=settings) main_node.query("DROP TABLE recover.tmp", settings=settings) main_node.query("CREATE TABLE recover.tmp AS recover.m1", settings=settings) - main_node.query("DROP TABLE recover.tmp", settings=settings) - main_node.query("CREATE TABLE recover.tmp AS recover.m1", settings=settings) - assert main_node.query("SELECT name FROM system.tables WHERE database='recover' ORDER BY name") == "d1\nd2\nm1\nmt1\nmt2\nrmt1\nrmt2\nrmt4\nt2\ntmp\n" - query = "SELECT name, uuid, create_table_query FROM system.tables WHERE database='recover' ORDER BY name" + assert main_node.query("SELECT name FROM system.tables WHERE database='recover' AND name NOT LIKE '.inner_id.%' ORDER BY name") == \ + "d1\nd2\nm1\nmt1\nmt2\nmv1\nmv3\nrmt1\nrmt2\nrmt4\nt2\ntmp\n" + query = "SELECT name, uuid, create_table_query FROM system.tables WHERE database='recover' AND name NOT LIKE '.inner_id.%' " \ + "ORDER BY name SETTINGS show_table_uuid_in_table_create_query_if_not_nil=1" expected = main_node.query(query) assert_eq_with_retry(dummy_node, query, expected) + assert main_node.query("SELECT count() FROM system.tables WHERE database='recover' AND name LIKE '.inner_id.%'") == "2\n" + assert dummy_node.query("SELECT count() FROM system.tables WHERE database='recover' AND name LIKE '.inner_id.%'") == "2\n" - for table in ['m1', 't2', 'mt1', 'mt2', 'rmt1', 'rmt2', 'rmt4', 'd1', 'd2']: + for table in ['m1', 't2', 'mt1', 'mt2', 'rmt1', 'rmt2', 'rmt4', 'd1', 'd2', 'mv1', 'mv3']: assert main_node.query("SELECT (*,).1 FROM recover.{}".format(table)) == "42\n" - for table in ['t2', 'rmt1', 'rmt2', 'rmt4', 'd1', 'd2', 'mt2']: + for table in ['t2', 'rmt1', 'rmt2', 'rmt4', 'd1', 'd2', 'mt2', 'mv1', 'mv3']: assert dummy_node.query("SELECT (*,).1 FROM recover.{}".format(table)) == "42\n" for table in ['m1', 'mt1']: assert dummy_node.query("SELECT count() FROM recover.{}".format(table)) == "0\n" assert dummy_node.query("SELECT count() FROM system.tables WHERE database='recover_broken_tables'") == "2\n" - table = dummy_node.query("SHOW TABLES FROM recover_broken_tables LIKE 'mt1_26_%'").strip() + table = dummy_node.query("SHOW TABLES FROM recover_broken_tables LIKE 'mt1_29_%'").strip() assert dummy_node.query("SELECT (*,).1 FROM recover_broken_tables.{}".format(table)) == "42\n" - table = dummy_node.query("SHOW TABLES FROM recover_broken_tables LIKE 'rmt5_26_%'").strip() + table = dummy_node.query("SHOW TABLES FROM recover_broken_tables LIKE 'rmt5_29_%'").strip() assert dummy_node.query("SELECT (*,).1 FROM recover_broken_tables.{}".format(table)) == "42\n" - expected = "Cleaned 4 outdated objects: dropped 1 dictionaries and 1 tables, moved 2 tables" + expected = "Cleaned 6 outdated objects: dropped 1 dictionaries and 3 tables, moved 2 tables" assert_logs_contain(dummy_node, expected) dummy_node.query("DROP TABLE recover.tmp")