From cee16292a689bb60ec1e20cd86c164bf12f613ce Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Mon, 13 Jul 2020 17:10:37 +0800 Subject: [PATCH] ISSUES-4006 fix restart failure --- src/Databases/MySQL/MaterializeMetadata.cpp | 21 ++++++++++--------- .../MySQL/MaterializeMySQLSyncThread.cpp | 4 ++-- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/src/Databases/MySQL/MaterializeMetadata.cpp b/src/Databases/MySQL/MaterializeMetadata.cpp index 73f7b1c3870..d85b671dfd7 100644 --- a/src/Databases/MySQL/MaterializeMetadata.cpp +++ b/src/Databases/MySQL/MaterializeMetadata.cpp @@ -82,6 +82,7 @@ void MaterializeMetadata::fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & c bool MaterializeMetadata::checkBinlogFileExists(mysqlxx::PoolWithFailover::Entry & connection) { + /// TODO: MySQL 5.7 Block header{ {std::make_shared(), "Log_name"}, {std::make_shared(), "File_size"}, @@ -128,11 +129,11 @@ void MaterializeMetadata::transaction(const MySQLReplication::Position & positio WriteBufferFromFile out(persistent_tmp_path, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_TRUNC | O_CREAT | O_EXCL); /// TSV format metadata file. - writeString("Version:\t1\n", out); - writeString("Binlog File:\t" + binlog_file + "\n", out); - writeString("Executed GTID:\t" + executed_gtid_set + "\n", out); - writeString("Binlog Position:\t" + toString(binlog_position) + "\n", out); - writeString("Data Version:\t" + toString(version) + "\n", out); + writeString("Version:\t1", out); + writeString("\nBinlog File:\t" + binlog_file, out); + writeString("\nExecuted GTID:\t" + executed_gtid_set, out); + writeString("\nBinlog Position:\t" + toString(binlog_position), out); + writeString("\nData Version:\t" + toString(version), out); out.next(); out.sync(); @@ -148,14 +149,14 @@ MaterializeMetadata::MaterializeMetadata(mysqlxx::PoolWithFailover::Entry & conn if (Poco::File(persistent_path).exists()) { ReadBufferFromFile in(persistent_path, DBMS_DEFAULT_BUFFER_SIZE); - assertString("Version:\t1\n", in); - assertString("Binlog File:\t", in); + assertString("Version:\t1", in); + assertString("\nBinlog File:\t", in); readString(binlog_file, in); - assertString("Executed GTID:\t", in); + assertString("\nExecuted GTID:\t", in); readString(executed_gtid_set, in); - assertString("Binlog Position:\t", in); + assertString("\nBinlog Position:\t", in); readIntText(binlog_position, in); - assertString("Data Version:\t", in); + assertString("\nData Version:\t", in); readIntText(version, in); if (checkBinlogFileExists(connection)) diff --git a/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp b/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp index 2f648256514..35cd9113b5e 100644 --- a/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp +++ b/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp @@ -92,6 +92,7 @@ MaterializeMySQLSyncThread::MaterializeMySQLSyncThread( , mysql_database_name(mysql_database_name_), pool(std::move(pool_)), client(std::move(client_)), settings(settings_) { /// TODO: 做简单的check, 失败即报错 + /// binlog_format = ROW binlog_row_image = FULL query_prefix = "EXTERNAL DDL FROM MySQL(" + backQuoteIfNeed(database_name) + ", " + backQuoteIfNeed(mysql_database_name) + ") "; startSynchronization(); } @@ -138,7 +139,6 @@ void MaterializeMySQLSyncThread::synchronization() } catch (...) { - /// TODO: set tryLogCurrentException(log); getDatabase(database_name).setException(std::current_exception()); } @@ -169,7 +169,7 @@ static inline BlockOutputStreamPtr getTableOutput(const String & database_name, BlockIO res = tryToExecuteQuery("INSERT INTO " + backQuoteIfNeed(table_name) + " VALUES", context, database_name, comment); if (!res.out) - throw Exception("LOGICAL ERROR:", ErrorCodes::LOGICAL_ERROR); + throw Exception("LOGICAL ERROR: It is a bug.", ErrorCodes::LOGICAL_ERROR); return res.out; }