ISSUES-4006 fix restart failure

This commit is contained in:
zhang2014 2020-07-13 17:10:37 +08:00
parent 094ce895da
commit cee16292a6
2 changed files with 13 additions and 12 deletions

View File

@ -82,6 +82,7 @@ void MaterializeMetadata::fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & c
bool MaterializeMetadata::checkBinlogFileExists(mysqlxx::PoolWithFailover::Entry & connection)
{
/// TODO: MySQL 5.7
Block header{
{std::make_shared<DataTypeString>(), "Log_name"},
{std::make_shared<DataTypeUInt64>(), "File_size"},
@ -128,11 +129,11 @@ void MaterializeMetadata::transaction(const MySQLReplication::Position & positio
WriteBufferFromFile out(persistent_tmp_path, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_TRUNC | O_CREAT | O_EXCL);
/// TSV format metadata file.
writeString("Version:\t1\n", out);
writeString("Binlog File:\t" + binlog_file + "\n", out);
writeString("Executed GTID:\t" + executed_gtid_set + "\n", out);
writeString("Binlog Position:\t" + toString(binlog_position) + "\n", out);
writeString("Data Version:\t" + toString(version) + "\n", out);
writeString("Version:\t1", out);
writeString("\nBinlog File:\t" + binlog_file, out);
writeString("\nExecuted GTID:\t" + executed_gtid_set, out);
writeString("\nBinlog Position:\t" + toString(binlog_position), out);
writeString("\nData Version:\t" + toString(version), out);
out.next();
out.sync();
@ -148,14 +149,14 @@ MaterializeMetadata::MaterializeMetadata(mysqlxx::PoolWithFailover::Entry & conn
if (Poco::File(persistent_path).exists())
{
ReadBufferFromFile in(persistent_path, DBMS_DEFAULT_BUFFER_SIZE);
assertString("Version:\t1\n", in);
assertString("Binlog File:\t", in);
assertString("Version:\t1", in);
assertString("\nBinlog File:\t", in);
readString(binlog_file, in);
assertString("Executed GTID:\t", in);
assertString("\nExecuted GTID:\t", in);
readString(executed_gtid_set, in);
assertString("Binlog Position:\t", in);
assertString("\nBinlog Position:\t", in);
readIntText(binlog_position, in);
assertString("Data Version:\t", in);
assertString("\nData Version:\t", in);
readIntText(version, in);
if (checkBinlogFileExists(connection))

View File

@ -92,6 +92,7 @@ MaterializeMySQLSyncThread::MaterializeMySQLSyncThread(
, mysql_database_name(mysql_database_name_), pool(std::move(pool_)), client(std::move(client_)), settings(settings_)
{
/// TODO: 做简单的check, 失败即报错
/// binlog_format = ROW binlog_row_image = FULL
query_prefix = "EXTERNAL DDL FROM MySQL(" + backQuoteIfNeed(database_name) + ", " + backQuoteIfNeed(mysql_database_name) + ") ";
startSynchronization();
}
@ -138,7 +139,6 @@ void MaterializeMySQLSyncThread::synchronization()
}
catch (...)
{
/// TODO: set
tryLogCurrentException(log);
getDatabase(database_name).setException(std::current_exception());
}
@ -169,7 +169,7 @@ static inline BlockOutputStreamPtr getTableOutput(const String & database_name,
BlockIO res = tryToExecuteQuery("INSERT INTO " + backQuoteIfNeed(table_name) + " VALUES", context, database_name, comment);
if (!res.out)
throw Exception("LOGICAL ERROR:", ErrorCodes::LOGICAL_ERROR);
throw Exception("LOGICAL ERROR: It is a bug.", ErrorCodes::LOGICAL_ERROR);
return res.out;
}