ISSUES-4006 adapted GTID replication to Materialize MySQL

This commit is contained in:
BohuTANG 2020-08-17 22:12:41 +08:00
parent c3561cd7e8
commit 8c9236fefd
5 changed files with 39 additions and 13 deletions

View File

@ -788,6 +788,13 @@ namespace MySQLReplication
}
}
void Position::update(UInt64 binlog_pos_, const String & binlog_name_, const String & gtid_sets_)
{
binlog_pos = binlog_pos_;
binlog_name = binlog_name_;
gtid_sets.parse(gtid_sets_);
}
void Position::dump(std::ostream & out) const
{
out << "\n=== Binlog Position ===" << std::endl;

View File

@ -473,9 +473,8 @@ namespace MySQLReplication
String binlog_name;
GTIDSets gtid_sets;
Position() : binlog_pos(0), binlog_name("") { }
Position(UInt64 binlog_pos_, const String & binlog_name_) : binlog_pos(binlog_pos_), binlog_name(binlog_name_) { }
void update(BinlogEventPtr event);
void update(UInt64 binlog_pos_, const String & binlog_name_, const String & gtid_sets_);
void dump(std::ostream & out) const;
};

View File

@ -80,7 +80,7 @@ void MaterializeMetadata::fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & c
if (!master_status || master_status.rows() != 1)
throw Exception("Unable to get master status from MySQL.", ErrorCodes::LOGICAL_ERROR);
version = 1;
data_version = 1;
binlog_file = (*master_status.getByPosition(0).column)[0].safeGet<String>();
binlog_position = (*master_status.getByPosition(1).column)[0].safeGet<UInt64>();
binlog_do_db = (*master_status.getByPosition(2).column)[0].safeGet<String>();
@ -148,11 +148,11 @@ void MaterializeMetadata::transaction(const MySQLReplication::Position & positio
WriteBufferFromFile out(persistent_tmp_path, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_TRUNC | O_CREAT | O_EXCL);
/// TSV format metadata file.
writeString("Version:\t1", out);
writeString("Version:\t" + toString(meta_version), out);
writeString("\nBinlog File:\t" + binlog_file, out);
writeString("\nExecuted GTID:\t" + executed_gtid_set, out);
writeString("\nBinlog Position:\t" + toString(binlog_position), out);
writeString("\nData Version:\t" + toString(version), out);
writeString("\nData Version:\t" + toString(data_version), out);
out.next();
out.sync();
@ -170,7 +170,7 @@ MaterializeMetadata::MaterializeMetadata(
if (Poco::File(persistent_path).exists())
{
ReadBufferFromFile in(persistent_path, DBMS_DEFAULT_BUFFER_SIZE);
assertString("Version:\t1", in);
assertString("Version:\t" + toString(meta_version), in);
assertString("\nBinlog File:\t", in);
readString(binlog_file, in);
assertString("\nExecuted GTID:\t", in);
@ -178,7 +178,7 @@ MaterializeMetadata::MaterializeMetadata(
assertString("\nBinlog Position:\t", in);
readIntText(binlog_position, in);
assertString("\nData Version:\t", in);
readIntText(version, in);
readIntText(data_version, in);
if (checkBinlogFileExists(connection, mysql_version))
return;

View File

@ -32,7 +32,8 @@ struct MaterializeMetadata
String binlog_ignore_db;
String executed_gtid_set;
size_t version = 1;
size_t data_version = 1;
size_t meta_version = 2;
std::unordered_map<String, String> need_dumping_tables;
void fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & connection);

View File

@ -315,18 +315,29 @@ std::optional<MaterializeMetadata> MaterializeMySQLSyncThread::prepareSynchroniz
if (!metadata.need_dumping_tables.empty())
{
metadata.transaction(Position(metadata.binlog_position, metadata.binlog_file), [&]()
Position position;
position.update(metadata.binlog_position, metadata.binlog_file, metadata.executed_gtid_set);
metadata.transaction(position, [&]()
{
cleanOutdatedTables(database_name, global_context);
dumpDataForTables(connection, metadata, query_prefix, database_name, mysql_database_name, global_context, [this] { return isCancelled(); });
});
const auto & position_message = [&]()
{
std::stringstream ss;
position.dump(ss);
return ss.str();
};
LOG_INFO(log, "MySQL dump database position: \n {}", position_message());
}
if (opened_transaction)
connection->query("COMMIT").execute();
client.connect();
client.startBinlogDump(randomNumber(), mysql_database_name, metadata.binlog_file, metadata.binlog_position);
client.startBinlogDumpGTID(randomNumber(), mysql_database_name, metadata.executed_gtid_set);
return metadata;
}
catch (...)
@ -354,6 +365,14 @@ std::optional<MaterializeMetadata> MaterializeMySQLSyncThread::prepareSynchroniz
void MaterializeMySQLSyncThread::flushBuffersData(Buffers & buffers, MaterializeMetadata & metadata)
{
metadata.transaction(client.getPosition(), [&]() { buffers.commit(global_context); });
const auto & position_message = [&]()
{
std::stringstream ss;
client.getPosition().dump(ss);
return ss.str();
};
LOG_INFO(log, "MySQL executed position: \n {}", position_message());
}
static inline void fillSignAndVersionColumnsData(Block & data, Int8 sign_value, UInt64 version_value, size_t fill_size)
@ -569,21 +588,21 @@ void MaterializeMySQLSyncThread::onEvent(Buffers & buffers, const BinlogEventPtr
{
WriteRowsEvent & write_rows_event = static_cast<WriteRowsEvent &>(*receive_event);
Buffers::BufferAndSortingColumnsPtr buffer = buffers.getTableDataBuffer(write_rows_event.table, global_context);
size_t bytes = onWriteOrDeleteData<1>(write_rows_event.rows, buffer->first, ++metadata.version);
size_t bytes = onWriteOrDeleteData<1>(write_rows_event.rows, buffer->first, ++metadata.data_version);
buffers.add(buffer->first.rows(), buffer->first.bytes(), write_rows_event.rows.size(), bytes);
}
else if (receive_event->type() == MYSQL_UPDATE_ROWS_EVENT)
{
UpdateRowsEvent & update_rows_event = static_cast<UpdateRowsEvent &>(*receive_event);
Buffers::BufferAndSortingColumnsPtr buffer = buffers.getTableDataBuffer(update_rows_event.table, global_context);
size_t bytes = onUpdateData(update_rows_event.rows, buffer->first, ++metadata.version, buffer->second);
size_t bytes = onUpdateData(update_rows_event.rows, buffer->first, ++metadata.data_version, buffer->second);
buffers.add(buffer->first.rows(), buffer->first.bytes(), update_rows_event.rows.size(), bytes);
}
else if (receive_event->type() == MYSQL_DELETE_ROWS_EVENT)
{
DeleteRowsEvent & delete_rows_event = static_cast<DeleteRowsEvent &>(*receive_event);
Buffers::BufferAndSortingColumnsPtr buffer = buffers.getTableDataBuffer(delete_rows_event.table, global_context);
size_t bytes = onWriteOrDeleteData<-1>(delete_rows_event.rows, buffer->first, ++metadata.version);
size_t bytes = onWriteOrDeleteData<-1>(delete_rows_event.rows, buffer->first, ++metadata.data_version);
buffers.add(buffer->first.rows(), buffer->first.bytes(), delete_rows_event.rows.size(), bytes);
}
else if (receive_event->type() == MYSQL_QUERY_EVENT)