Fix materialized view (MV) recovery in Replicated database

This commit is contained in:
Alexander Tokmakov 2021-03-08 20:26:38 +03:00
parent e8987f799e
commit 2484781070
10 changed files with 76 additions and 23 deletions

View File

@ -557,12 +557,14 @@ ASTPtr DatabaseReplicated::parseQueryFromMetadataInZooKeeper(const String & node
auto ast = parseQuery(parser, query, description, 0, global_context.getSettingsRef().max_parser_depth); auto ast = parseQuery(parser, query, description, 0, global_context.getSettingsRef().max_parser_depth);
auto & create = ast->as<ASTCreateQuery &>(); auto & create = ast->as<ASTCreateQuery &>();
if (create.uuid == UUIDHelpers::Nil || create.table != TABLE_WITH_UUID_NAME_PLACEHOLDER || ! create.database.empty()) if (create.uuid == UUIDHelpers::Nil || create.table != TABLE_WITH_UUID_NAME_PLACEHOLDER || !create.database.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got unexpected query from {}: {}", node_name, query); throw Exception(ErrorCodes::LOGICAL_ERROR, "Got unexpected query from {}: {}", node_name, query);
bool is_materialized_view_with_inner_table = create.is_materialized_view && create.to_table_id.empty();
create.database = getDatabaseName(); create.database = getDatabaseName();
create.table = unescapeForFileName(node_name); create.table = unescapeForFileName(node_name);
create.attach = false; create.attach = is_materialized_view_with_inner_table;
return ast; return ast;
} }
@ -598,7 +600,7 @@ void DatabaseReplicated::shutdown()
void DatabaseReplicated::dropTable(const Context & context, const String & table_name, bool no_delay) void DatabaseReplicated::dropTable(const Context & context, const String & table_name, bool no_delay)
{ {
auto txn = context.getZooKeeperMetadataTransaction(); auto txn = context.getZooKeeperMetadataTransaction();
assert(!ddl_worker->isCurrentlyActive() || txn); assert(!ddl_worker->isCurrentlyActive() || txn || startsWith(table_name, ".inner_id."));
if (txn && txn->isInitialQuery()) if (txn && txn->isInitialQuery())
{ {
String metadata_zk_path = zookeeper_path + "/metadata/" + escapeForFileName(table_name); String metadata_zk_path = zookeeper_path + "/metadata/" + escapeForFileName(table_name);

View File

@ -712,6 +712,19 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
} }
} }
/// Fills in the UUIDs of a CREATE query that are still unset (Nil):
/// the UUID of the table itself and, for a materialized view that has no
/// explicit destination table (to_table_id), the UUID of its inner table.
/// UUIDs that are already set are left untouched.
static void generateUUIDForTable(ASTCreateQuery & create)
{
    const bool table_uuid_missing = (create.uuid == UUIDHelpers::Nil);
    if (table_uuid_missing)
        create.uuid = UUIDHelpers::generateV4();

    /// Only a materialized view can have an inner table.
    if (!create.is_materialized_view)
        return;

    /// When the destination table is specified, the MV does not create an inner table.
    if (create.to_table_id)
        return;

    /// The MV will create an inner table. Generate its UUID here, so that it is the same
    /// on all hosts if the query is ON CLUSTER or the database engine is Replicated.
    if (create.to_inner_uuid == UUIDHelpers::Nil)
        create.to_inner_uuid = UUIDHelpers::generateV4();
}
void InterpreterCreateQuery::assertOrSetUUID(ASTCreateQuery & create, const DatabasePtr & database) const void InterpreterCreateQuery::assertOrSetUUID(ASTCreateQuery & create, const DatabasePtr & database) const
{ {
const auto * kind = create.is_dictionary ? "Dictionary" : "Table"; const auto * kind = create.is_dictionary ? "Dictionary" : "Table";
@ -743,18 +756,19 @@ void InterpreterCreateQuery::assertOrSetUUID(ASTCreateQuery & create, const Data
kind_upper, create.table); kind_upper, create.table);
} }
if (create.uuid == UUIDHelpers::Nil) generateUUIDForTable(create);
create.uuid = UUIDHelpers::generateV4();
} }
else else
{ {
bool is_on_cluster = context.getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY; bool is_on_cluster = context.getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY;
if (create.uuid != UUIDHelpers::Nil && !is_on_cluster) bool has_uuid = create.uuid != UUIDHelpers::Nil || create.to_inner_uuid != UUIDHelpers::Nil;
if (has_uuid && !is_on_cluster)
throw Exception(ErrorCodes::INCORRECT_QUERY, throw Exception(ErrorCodes::INCORRECT_QUERY,
"{} UUID specified, but engine of database {} is not Atomic", kind, create.database); "{} UUID specified, but engine of database {} is not Atomic", kind, create.database);
/// Ignore UUID if it's ON CLUSTER query /// Ignore UUID if it's ON CLUSTER query
create.uuid = UUIDHelpers::Nil; create.uuid = UUIDHelpers::Nil;
create.to_inner_uuid = UUIDHelpers::Nil;
} }
if (create.replace_table) if (create.replace_table)
@ -1134,8 +1148,7 @@ void InterpreterCreateQuery::prepareOnClusterQuery(ASTCreateQuery & create, cons
/// For CREATE query generate UUID on initiator, so it will be the same on all hosts. /// For CREATE query generate UUID on initiator, so it will be the same on all hosts.
/// It will be ignored if database does not support UUIDs. /// It will be ignored if database does not support UUIDs.
if (create.uuid == UUIDHelpers::Nil) generateUUIDForTable(create);
create.uuid = UUIDHelpers::generateV4();
/// For cross-replication cluster we cannot use UUID in replica path. /// For cross-replication cluster we cannot use UUID in replica path.
String cluster_name_expanded = context.getMacros()->expand(cluster_name); String cluster_name_expanded = context.getMacros()->expand(cluster_name);

View File

@ -82,6 +82,7 @@ BlockInputStreamPtr InterpreterShowCreateQuery::executeImpl()
{ {
auto & create = create_query->as<ASTCreateQuery &>(); auto & create = create_query->as<ASTCreateQuery &>();
create.uuid = UUIDHelpers::Nil; create.uuid = UUIDHelpers::Nil;
create.to_inner_uuid = UUIDHelpers::Nil;
} }
WriteBufferFromOwnString buf; WriteBufferFromOwnString buf;

View File

@ -297,12 +297,20 @@ void ASTCreateQuery::formatQueryImpl(const FormatSettings & settings, FormatStat
if (to_table_id) if (to_table_id)
{ {
assert(is_materialized_view && to_inner_uuid == UUIDHelpers::Nil);
settings.ostr settings.ostr
<< (settings.hilite ? hilite_keyword : "") << " TO " << (settings.hilite ? hilite_none : "") << (settings.hilite ? hilite_keyword : "") << " TO " << (settings.hilite ? hilite_none : "")
<< (!to_table_id.database_name.empty() ? backQuoteIfNeed(to_table_id.database_name) + "." : "") << (!to_table_id.database_name.empty() ? backQuoteIfNeed(to_table_id.database_name) + "." : "")
<< backQuoteIfNeed(to_table_id.table_name); << backQuoteIfNeed(to_table_id.table_name);
} }
if (to_inner_uuid != UUIDHelpers::Nil)
{
assert(is_materialized_view && !to_table_id);
settings.ostr << (settings.hilite ? hilite_keyword : "") << " TO INNER UUID " << (settings.hilite ? hilite_none : "")
<< quoteString(toString(to_inner_uuid));
}
if (!as_table.empty()) if (!as_table.empty())
{ {
settings.ostr settings.ostr

View File

@ -66,6 +66,7 @@ public:
ASTExpressionList * tables = nullptr; ASTExpressionList * tables = nullptr;
StorageID to_table_id = StorageID::createEmpty(); /// For CREATE MATERIALIZED VIEW mv TO table. StorageID to_table_id = StorageID::createEmpty(); /// For CREATE MATERIALIZED VIEW mv TO table.
UUID to_inner_uuid = UUIDHelpers::Nil; /// For materialized view with inner table
ASTStorage * storage = nullptr; ASTStorage * storage = nullptr;
String as_database; String as_database;
String as_table; String as_table;

View File

@ -780,6 +780,7 @@ bool ParserCreateViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
ASTPtr table; ASTPtr table;
ASTPtr to_table; ASTPtr to_table;
ASTPtr to_inner_uuid;
ASTPtr columns_list; ASTPtr columns_list;
ASTPtr storage; ASTPtr storage;
ASTPtr as_database; ASTPtr as_database;
@ -830,9 +831,16 @@ bool ParserCreateViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
return false; return false;
} }
// TO [db.]table
if (ParserKeyword{"TO"}.ignore(pos, expected)) if (ParserKeyword{"TO INNER UUID"}.ignore(pos, expected))
{ {
ParserLiteral literal_p;
if (!literal_p.parse(pos, to_inner_uuid, expected))
return false;
}
else if (ParserKeyword{"TO"}.ignore(pos, expected))
{
// TO [db.]table
if (!table_name_p.parse(pos, to_table, expected)) if (!table_name_p.parse(pos, to_table, expected))
return false; return false;
} }
@ -883,6 +891,8 @@ bool ParserCreateViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
if (to_table) if (to_table)
query->to_table_id = getTableIdentifier(to_table); query->to_table_id = getTableIdentifier(to_table);
if (to_inner_uuid)
query->to_inner_uuid = parseFromString<UUID>(to_inner_uuid->as<ASTLiteral>()->value.get<String>());
query->set(query->columns_list, columns_list); query->set(query->columns_list, columns_list);
query->set(query->storage, storage); query->set(query->storage, storage);

View File

@ -84,7 +84,7 @@ StorageMaterializedView::StorageMaterializedView(
else if (attach_) else if (attach_)
{ {
/// If there is an ATTACH request, then the internal table must already be created. /// If there is an ATTACH request, then the internal table must already be created.
target_table_id = StorageID(getStorageID().database_name, generateInnerTableName(getStorageID())); target_table_id = StorageID(getStorageID().database_name, generateInnerTableName(getStorageID()), query.to_inner_uuid);
} }
else else
{ {
@ -93,6 +93,7 @@ StorageMaterializedView::StorageMaterializedView(
auto manual_create_query = std::make_shared<ASTCreateQuery>(); auto manual_create_query = std::make_shared<ASTCreateQuery>();
manual_create_query->database = getStorageID().database_name; manual_create_query->database = getStorageID().database_name;
manual_create_query->table = generateInnerTableName(getStorageID()); manual_create_query->table = generateInnerTableName(getStorageID());
manual_create_query->uuid = query.to_inner_uuid;
auto new_columns_list = std::make_shared<ASTColumns>(); auto new_columns_list = std::make_shared<ASTColumns>();
new_columns_list->set(new_columns_list->columns, query.columns_list->columns->ptr()); new_columns_list->set(new_columns_list->columns, query.columns_list->columns->ptr());

View File

@ -359,6 +359,7 @@ protected:
{ {
auto & create = ast->as<ASTCreateQuery &>(); auto & create = ast->as<ASTCreateQuery &>();
create.uuid = UUIDHelpers::Nil; create.uuid = UUIDHelpers::Nil;
create.to_inner_uuid = UUIDHelpers::Nil;
} }
if (columns_mask[src_index++]) if (columns_mask[src_index++])

View File

@ -2,6 +2,7 @@
<profiles> <profiles>
<default> <default>
<allow_experimental_database_replicated>1</allow_experimental_database_replicated> <allow_experimental_database_replicated>1</allow_experimental_database_replicated>
<allow_experimental_alter_materialized_view_structure>1</allow_experimental_alter_materialized_view_structure>
</default> </default>
</profiles> </profiles>
<users> <users>

View File

@ -198,8 +198,14 @@ def test_recover_staled_replica(started_cluster):
dummy_node.query("CREATE TABLE recover.rmt2 (n int) ENGINE=ReplicatedMergeTree order by n", settings=settings) dummy_node.query("CREATE TABLE recover.rmt2 (n int) ENGINE=ReplicatedMergeTree order by n", settings=settings)
main_node.query("CREATE TABLE recover.rmt3 (n int) ENGINE=ReplicatedMergeTree order by n", settings=settings) main_node.query("CREATE TABLE recover.rmt3 (n int) ENGINE=ReplicatedMergeTree order by n", settings=settings)
dummy_node.query("CREATE TABLE recover.rmt5 (n int) ENGINE=ReplicatedMergeTree order by n", settings=settings) dummy_node.query("CREATE TABLE recover.rmt5 (n int) ENGINE=ReplicatedMergeTree order by n", settings=settings)
main_node.query("CREATE DICTIONARY recover.d1 (n int DEFAULT 0, m int DEFAULT 1) PRIMARY KEY n SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'rmt1' PASSWORD '' DB 'recover')) LIFETIME(MIN 1 MAX 10) LAYOUT(FLAT())") main_node.query("CREATE MATERIALIZED VIEW recover.mv1 (n int) ENGINE=ReplicatedMergeTree order by n AS SELECT n FROM recover.rmt1", settings=settings)
dummy_node.query("CREATE DICTIONARY recover.d2 (n int DEFAULT 0, m int DEFAULT 1) PRIMARY KEY n SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'rmt2' PASSWORD '' DB 'recover')) LIFETIME(MIN 1 MAX 10) LAYOUT(FLAT())") dummy_node.query("CREATE MATERIALIZED VIEW recover.mv2 (n int) ENGINE=ReplicatedMergeTree order by n AS SELECT n FROM recover.rmt2", settings=settings)
main_node.query("CREATE DICTIONARY recover.d1 (n int DEFAULT 0, m int DEFAULT 1) PRIMARY KEY n "
"SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'rmt1' PASSWORD '' DB 'recover')) "
"LIFETIME(MIN 1 MAX 10) LAYOUT(FLAT())")
dummy_node.query("CREATE DICTIONARY recover.d2 (n int DEFAULT 0, m int DEFAULT 1) PRIMARY KEY n "
"SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'rmt2' PASSWORD '' DB 'recover')) "
"LIFETIME(MIN 1 MAX 10) LAYOUT(FLAT())")
for table in ['t1', 't2', 'mt1', 'mt2', 'rmt1', 'rmt2', 'rmt3', 'rmt5']: for table in ['t1', 't2', 'mt1', 'mt2', 'rmt1', 'rmt2', 'rmt3', 'rmt5']:
main_node.query("INSERT INTO recover.{} VALUES (42)".format(table)) main_node.query("INSERT INTO recover.{} VALUES (42)".format(table))
@ -217,35 +223,44 @@ def test_recover_staled_replica(started_cluster):
main_node.query("RENAME TABLE recover.rmt3 TO recover.rmt4", settings=settings) main_node.query("RENAME TABLE recover.rmt3 TO recover.rmt4", settings=settings)
main_node.query("DROP TABLE recover.rmt5", settings=settings) main_node.query("DROP TABLE recover.rmt5", settings=settings)
main_node.query("DROP DICTIONARY recover.d2", settings=settings) main_node.query("DROP DICTIONARY recover.d2", settings=settings)
main_node.query("CREATE DICTIONARY recover.d2 (n int DEFAULT 0, m int DEFAULT 1) PRIMARY KEY n SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'rmt1' PASSWORD '' DB 'recover')) LIFETIME(MIN 1 MAX 10) LAYOUT(FLAT());", settings=settings) main_node.query("CREATE DICTIONARY recover.d2 (n int DEFAULT 0, m int DEFAULT 1) PRIMARY KEY n "
"SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'rmt1' PASSWORD '' DB 'recover')) "
"LIFETIME(MIN 1 MAX 10) LAYOUT(FLAT());", settings=settings)
inner_table = ".inner_id." + dummy_node.query("SELECT uuid FROM system.tables WHERE database='recover' AND name='mv1'").strip()
main_node.query("ALTER TABLE recover.`{}` MODIFY COLUMN n int DEFAULT 42".format(inner_table), settings=settings)
main_node.query("ALTER TABLE recover.mv1 MODIFY QUERY SELECT m FROM recover.rmt1".format(inner_table), settings=settings)
main_node.query("RENAME TABLE recover.mv2 TO recover.mv3".format(inner_table), settings=settings)
main_node.query("CREATE TABLE recover.tmp AS recover.m1", settings=settings) main_node.query("CREATE TABLE recover.tmp AS recover.m1", settings=settings)
main_node.query("DROP TABLE recover.tmp", settings=settings) main_node.query("DROP TABLE recover.tmp", settings=settings)
main_node.query("CREATE TABLE recover.tmp AS recover.m1", settings=settings) main_node.query("CREATE TABLE recover.tmp AS recover.m1", settings=settings)
main_node.query("DROP TABLE recover.tmp", settings=settings) main_node.query("DROP TABLE recover.tmp", settings=settings)
main_node.query("CREATE TABLE recover.tmp AS recover.m1", settings=settings) main_node.query("CREATE TABLE recover.tmp AS recover.m1", settings=settings)
main_node.query("DROP TABLE recover.tmp", settings=settings)
main_node.query("CREATE TABLE recover.tmp AS recover.m1", settings=settings)
assert main_node.query("SELECT name FROM system.tables WHERE database='recover' ORDER BY name") == "d1\nd2\nm1\nmt1\nmt2\nrmt1\nrmt2\nrmt4\nt2\ntmp\n" assert main_node.query("SELECT name FROM system.tables WHERE database='recover' AND name NOT LIKE '.inner_id.%' ORDER BY name") == \
query = "SELECT name, uuid, create_table_query FROM system.tables WHERE database='recover' ORDER BY name" "d1\nd2\nm1\nmt1\nmt2\nmv1\nmv3\nrmt1\nrmt2\nrmt4\nt2\ntmp\n"
query = "SELECT name, uuid, create_table_query FROM system.tables WHERE database='recover' AND name NOT LIKE '.inner_id.%' " \
"ORDER BY name SETTINGS show_table_uuid_in_table_create_query_if_not_nil=1"
expected = main_node.query(query) expected = main_node.query(query)
assert_eq_with_retry(dummy_node, query, expected) assert_eq_with_retry(dummy_node, query, expected)
assert main_node.query("SELECT count() FROM system.tables WHERE database='recover' AND name LIKE '.inner_id.%'") == "2\n"
assert dummy_node.query("SELECT count() FROM system.tables WHERE database='recover' AND name LIKE '.inner_id.%'") == "2\n"
for table in ['m1', 't2', 'mt1', 'mt2', 'rmt1', 'rmt2', 'rmt4', 'd1', 'd2']: for table in ['m1', 't2', 'mt1', 'mt2', 'rmt1', 'rmt2', 'rmt4', 'd1', 'd2', 'mv1', 'mv3']:
assert main_node.query("SELECT (*,).1 FROM recover.{}".format(table)) == "42\n" assert main_node.query("SELECT (*,).1 FROM recover.{}".format(table)) == "42\n"
for table in ['t2', 'rmt1', 'rmt2', 'rmt4', 'd1', 'd2', 'mt2']: for table in ['t2', 'rmt1', 'rmt2', 'rmt4', 'd1', 'd2', 'mt2', 'mv1', 'mv3']:
assert dummy_node.query("SELECT (*,).1 FROM recover.{}".format(table)) == "42\n" assert dummy_node.query("SELECT (*,).1 FROM recover.{}".format(table)) == "42\n"
for table in ['m1', 'mt1']: for table in ['m1', 'mt1']:
assert dummy_node.query("SELECT count() FROM recover.{}".format(table)) == "0\n" assert dummy_node.query("SELECT count() FROM recover.{}".format(table)) == "0\n"
assert dummy_node.query("SELECT count() FROM system.tables WHERE database='recover_broken_tables'") == "2\n" assert dummy_node.query("SELECT count() FROM system.tables WHERE database='recover_broken_tables'") == "2\n"
table = dummy_node.query("SHOW TABLES FROM recover_broken_tables LIKE 'mt1_26_%'").strip() table = dummy_node.query("SHOW TABLES FROM recover_broken_tables LIKE 'mt1_29_%'").strip()
assert dummy_node.query("SELECT (*,).1 FROM recover_broken_tables.{}".format(table)) == "42\n" assert dummy_node.query("SELECT (*,).1 FROM recover_broken_tables.{}".format(table)) == "42\n"
table = dummy_node.query("SHOW TABLES FROM recover_broken_tables LIKE 'rmt5_26_%'").strip() table = dummy_node.query("SHOW TABLES FROM recover_broken_tables LIKE 'rmt5_29_%'").strip()
assert dummy_node.query("SELECT (*,).1 FROM recover_broken_tables.{}".format(table)) == "42\n" assert dummy_node.query("SELECT (*,).1 FROM recover_broken_tables.{}".format(table)) == "42\n"
expected = "Cleaned 4 outdated objects: dropped 1 dictionaries and 1 tables, moved 2 tables" expected = "Cleaned 6 outdated objects: dropped 1 dictionaries and 3 tables, moved 2 tables"
assert_logs_contain(dummy_node, expected) assert_logs_contain(dummy_node, expected)
dummy_node.query("DROP TABLE recover.tmp") dummy_node.query("DROP TABLE recover.tmp")