Use SYSTEM SYNC DATABASE REPLICA to make code better.

This commit is contained in:
Vitaly Baranov 2022-05-03 16:59:41 +02:00
parent 828f45f078
commit 484c2c9c4a
2 changed files with 18 additions and 19 deletions

View File

@ -334,12 +334,7 @@ namespace
void getDatabase()
{
database = DatabaseCatalog::instance().getDatabase(table_name.first);
const auto * replicated_db = typeid_cast<DatabaseReplicated *>(database.get());
if (replicated_db)
{
is_replicated_database = true;
replicated_database_zk_path = replicated_db->getZooKeeperPath();
}
replicated_database = typeid_cast<std::shared_ptr<DatabaseReplicated>>(database);
}
void createStorage()
@ -357,9 +352,9 @@ namespace
/// IRestoreCoordination::startCreatingTableInReplicatedDB() and then for other nodes this function returns false which means
/// this table is already being created by some other node.
bool wait_instead_of_creating = false;
if (is_replicated_database)
if (replicated_database)
wait_instead_of_creating = !restore_coordination->startCreatingTableInReplicatedDB(
restore_settings->host_id, table_name.first, replicated_database_zk_path, table_name.second);
restore_settings->host_id, table_name.first, replicated_database->getZooKeeperPath(), table_name.second);
if (wait_instead_of_creating)
{
@ -375,34 +370,39 @@ namespace
}
catch (...)
{
if (is_replicated_database)
if (replicated_database)
{
restore_coordination->finishCreatingTableInReplicatedDB(
restore_settings->host_id,
table_name.first,
replicated_database_zk_path,
replicated_database->getZooKeeperPath(),
table_name.second,
getCurrentExceptionMessage(false));
}
throw;
}
if (is_replicated_database)
if (replicated_database)
restore_coordination->finishCreatingTableInReplicatedDB(
restore_settings->host_id, table_name.first, replicated_database_zk_path, table_name.second);
restore_settings->host_id, table_name.first, replicated_database->getZooKeeperPath(), table_name.second);
}
}
void waitForReplicatedDatabaseToSyncTable()
{
if (!is_replicated_database)
if (!replicated_database)
return;
restore_coordination->waitForCreatingTableInReplicatedDB(table_name.first, replicated_database_zk_path, table_name.second);
restore_coordination->waitForCreatingTableInReplicatedDB(table_name.first, replicated_database->getZooKeeperPath(), table_name.second);
/// The table `table_name` was created on other host, must be in the replicated database's queue,
/// we have to wait until the replicated database syncs that.
bool replicated_database_synced = false;
auto start_time = std::chrono::steady_clock::now();
bool use_timeout = (timeout_for_restoring_metadata.count() > 0);
while (!database->isTableExist(table_name.second, context))
{
if (use_timeout && (std::chrono::steady_clock::now() - start_time) >= timeout_for_restoring_metadata)
if (replicated_database_synced || (use_timeout && (std::chrono::steady_clock::now() - start_time) >= timeout_for_restoring_metadata))
{
throw Exception(
ErrorCodes::CANNOT_RESTORE_TABLE,
@ -412,7 +412,7 @@ namespace
table_name.first,
to_string(timeout_for_restoring_metadata));
}
sleepForMilliseconds(50);
replicated_database_synced = replicated_database->waitForReplicaToProcessAllEntries(50);
}
}
@ -542,8 +542,7 @@ namespace
std::shared_ptr<IRestoreCoordination> restore_coordination;
std::chrono::seconds timeout_for_restoring_metadata;
DatabasePtr database;
bool is_replicated_database = false;
String replicated_database_zk_path;
std::shared_ptr<DatabaseReplicated> replicated_database;
StoragePtr storage;
ASTPtr storage_create_query;
bool has_data = false;

View File

@ -104,7 +104,7 @@ def test_replicated_database():
"CREATE TABLE mydb.tbl(x UInt8, y String) ENGINE=ReplicatedMergeTree ORDER BY x"
)
assert_eq_with_retry(node2, "EXISTS mydb.tbl", "1\n")
node2.query("SYSTEM SYNC DATABASE REPLICA mydb")
node1.query("INSERT INTO mydb.tbl VALUES (1, 'Don''t')")
node2.query("INSERT INTO mydb.tbl VALUES (2, 'count')")