diff --git a/src/Core/MySQL/MySQLReplication.cpp b/src/Core/MySQL/MySQLReplication.cpp index 481ab74028d..dd710b1991e 100644 --- a/src/Core/MySQL/MySQLReplication.cpp +++ b/src/Core/MySQL/MySQLReplication.cpp @@ -788,6 +788,13 @@ namespace MySQLReplication } } + void Position::update(UInt64 binlog_pos_, const String & binlog_name_, const String & gtid_sets_) + { + binlog_pos = binlog_pos_; + binlog_name = binlog_name_; + gtid_sets.parse(gtid_sets_); + } + void Position::dump(std::ostream & out) const { out << "\n=== Binlog Position ===" << std::endl; diff --git a/src/Core/MySQL/MySQLReplication.h b/src/Core/MySQL/MySQLReplication.h index 5046e42a118..e375bb38c92 100644 --- a/src/Core/MySQL/MySQLReplication.h +++ b/src/Core/MySQL/MySQLReplication.h @@ -473,9 +473,8 @@ namespace MySQLReplication String binlog_name; GTIDSets gtid_sets; - Position() : binlog_pos(0), binlog_name("") { } - Position(UInt64 binlog_pos_, const String & binlog_name_) : binlog_pos(binlog_pos_), binlog_name(binlog_name_) { } void update(BinlogEventPtr event); + void update(UInt64 binlog_pos_, const String & binlog_name_, const String & gtid_sets_); void dump(std::ostream & out) const; }; diff --git a/src/Databases/MySQL/MaterializeMetadata.cpp b/src/Databases/MySQL/MaterializeMetadata.cpp index 587ccd1aad1..74fd59dc98e 100644 --- a/src/Databases/MySQL/MaterializeMetadata.cpp +++ b/src/Databases/MySQL/MaterializeMetadata.cpp @@ -80,7 +80,7 @@ void MaterializeMetadata::fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & c if (!master_status || master_status.rows() != 1) throw Exception("Unable to get master status from MySQL.", ErrorCodes::LOGICAL_ERROR); - version = 1; + data_version = 1; binlog_file = (*master_status.getByPosition(0).column)[0].safeGet(); binlog_position = (*master_status.getByPosition(1).column)[0].safeGet(); binlog_do_db = (*master_status.getByPosition(2).column)[0].safeGet(); @@ -148,11 +148,11 @@ void MaterializeMetadata::transaction(const MySQLReplication::Position & positio WriteBufferFromFile out(persistent_tmp_path, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_TRUNC | O_CREAT | O_EXCL); /// TSV format metadata file. - writeString("Version:\t1", out); + writeString("Version:\t" + toString(meta_version), out); writeString("\nBinlog File:\t" + binlog_file, out); writeString("\nExecuted GTID:\t" + executed_gtid_set, out); writeString("\nBinlog Position:\t" + toString(binlog_position), out); - writeString("\nData Version:\t" + toString(version), out); + writeString("\nData Version:\t" + toString(data_version), out); out.next(); out.sync(); @@ -170,7 +170,7 @@ MaterializeMetadata::MaterializeMetadata( if (Poco::File(persistent_path).exists()) { ReadBufferFromFile in(persistent_path, DBMS_DEFAULT_BUFFER_SIZE); - assertString("Version:\t1", in); + assertString("Version:\t" + toString(meta_version), in); assertString("\nBinlog File:\t", in); readString(binlog_file, in); assertString("\nExecuted GTID:\t", in); @@ -178,7 +178,7 @@ MaterializeMetadata::MaterializeMetadata( assertString("\nBinlog Position:\t", in); readIntText(binlog_position, in); assertString("\nData Version:\t", in); - readIntText(version, in); + readIntText(data_version, in); if (checkBinlogFileExists(connection, mysql_version)) return; diff --git a/src/Databases/MySQL/MaterializeMetadata.h b/src/Databases/MySQL/MaterializeMetadata.h index b50e0d28334..c036ea77940 100644 --- a/src/Databases/MySQL/MaterializeMetadata.h +++ b/src/Databases/MySQL/MaterializeMetadata.h @@ -32,7 +32,8 @@ struct MaterializeMetadata String binlog_ignore_db; String executed_gtid_set; - size_t version = 1; + size_t data_version = 1; + size_t meta_version = 2; std::unordered_map need_dumping_tables; void fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & connection); diff --git a/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp b/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp index ebe93f527d3..f8d5e3d61aa 100644 --- a/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp +++ b/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp @@ -315,18 +315,29 @@ std::optional MaterializeMySQLSyncThread::prepareSynchroniz if (!metadata.need_dumping_tables.empty()) { - metadata.transaction(Position(metadata.binlog_position, metadata.binlog_file), [&]() + Position position; + position.update(metadata.binlog_position, metadata.binlog_file, metadata.executed_gtid_set); + + metadata.transaction(position, [&]() { cleanOutdatedTables(database_name, global_context); dumpDataForTables(connection, metadata, query_prefix, database_name, mysql_database_name, global_context, [this] { return isCancelled(); }); }); + + const auto & position_message = [&]() + { + std::stringstream ss; + position.dump(ss); + return ss.str(); + }; + LOG_INFO(log, "MySQL dump database position: \n {}", position_message()); } if (opened_transaction) connection->query("COMMIT").execute(); client.connect(); - client.startBinlogDump(randomNumber(), mysql_database_name, metadata.binlog_file, metadata.binlog_position); + client.startBinlogDumpGTID(randomNumber(), mysql_database_name, metadata.executed_gtid_set); return metadata; } catch (...) @@ -354,6 +365,14 @@ std::optional MaterializeMySQLSyncThread::prepareSynchroniz void MaterializeMySQLSyncThread::flushBuffersData(Buffers & buffers, MaterializeMetadata & metadata) { metadata.transaction(client.getPosition(), [&]() { buffers.commit(global_context); }); + + const auto & position_message = [&]() + { + std::stringstream ss; + client.getPosition().dump(ss); + return ss.str(); + }; + LOG_INFO(log, "MySQL executed position: \n {}", position_message()); } static inline void fillSignAndVersionColumnsData(Block & data, Int8 sign_value, UInt64 version_value, size_t fill_size) @@ -569,21 +588,21 @@ void MaterializeMySQLSyncThread::onEvent(Buffers & buffers, const BinlogEventPtr { WriteRowsEvent & write_rows_event = static_cast(*receive_event); Buffers::BufferAndSortingColumnsPtr buffer = buffers.getTableDataBuffer(write_rows_event.table, global_context); - size_t bytes = onWriteOrDeleteData<1>(write_rows_event.rows, buffer->first, ++metadata.version); + size_t bytes = onWriteOrDeleteData<1>(write_rows_event.rows, buffer->first, ++metadata.data_version); buffers.add(buffer->first.rows(), buffer->first.bytes(), write_rows_event.rows.size(), bytes); } else if (receive_event->type() == MYSQL_UPDATE_ROWS_EVENT) { UpdateRowsEvent & update_rows_event = static_cast(*receive_event); Buffers::BufferAndSortingColumnsPtr buffer = buffers.getTableDataBuffer(update_rows_event.table, global_context); - size_t bytes = onUpdateData(update_rows_event.rows, buffer->first, ++metadata.version, buffer->second); + size_t bytes = onUpdateData(update_rows_event.rows, buffer->first, ++metadata.data_version, buffer->second); buffers.add(buffer->first.rows(), buffer->first.bytes(), update_rows_event.rows.size(), bytes); } else if (receive_event->type() == MYSQL_DELETE_ROWS_EVENT) { DeleteRowsEvent & delete_rows_event = static_cast(*receive_event); Buffers::BufferAndSortingColumnsPtr buffer = buffers.getTableDataBuffer(delete_rows_event.table, global_context); - size_t bytes = onWriteOrDeleteData<-1>(delete_rows_event.rows, buffer->first, ++metadata.version); + size_t bytes = onWriteOrDeleteData<-1>(delete_rows_event.rows, buffer->first, ++metadata.data_version); buffers.add(buffer->first.rows(), buffer->first.bytes(), delete_rows_event.rows.size(), bytes); } else if (receive_event->type() == MYSQL_QUERY_EVENT)