Support database replicate filter

This commit is contained in:
BohuTANG 2020-05-15 14:54:26 +08:00 committed by zhang2014
parent d2aa1f9a47
commit 882a773264
4 changed files with 40 additions and 18 deletions

View File

@ -124,13 +124,12 @@ void MySQLClient::startBinlogDump(UInt32 slave_id, String replicate_db, String b
UInt64 period_ns = (30 * 1e9);
writeCommand(Command::COM_QUERY, "SET @master_heartbeat_period = " + std::to_string(period_ns));
/// Set replication filter to master
/// This requires MySQL version >=5.6, so results are not checked here.
writeCommand(Command::COM_QUERY, "CHANGE REPLICATION FILTER REPLICATE_DO_DB = (" + replicate_db + ")");
// Register slave.
registerSlaveOnMaster(slave_id);
/// Set Filter rule to replication.
replication.setReplicateDatabase(replicate_db);
binlog_pos = binlog_pos < 4 ? 4 : binlog_pos;
BinlogDump binlog_dump(binlog_pos, binlog_file_name, slave_id);
packet_sender->sendPacket<BinlogDump>(binlog_dump, true);

View File

@ -758,6 +758,8 @@ namespace MySQLReplication
event->parseEvent(payload);
auto query = std::static_pointer_cast<QueryEvent>(event);
if (query->schema == replicate_do_db)
{
switch (query->typ)
{
case BEGIN:
@ -768,6 +770,12 @@ namespace MySQLReplication
default:
position.updateLogPos(event->header.log_pos);
}
}
else
{
event = std::make_shared<DryRunEvent>();
position.updateLogPos(event->header.log_pos);
}
break;
}
case XID_EVENT: {
@ -781,28 +789,39 @@ namespace MySQLReplication
event = std::make_shared<TableMapEvent>();
event->parseHeader(payload);
event->parseEvent(payload);
table_map = std::static_pointer_cast<TableMapEvent>(event);
position.updateLogPos(event->header.log_pos);
table_map = std::static_pointer_cast<TableMapEvent>(event);
break;
}
case WRITE_ROWS_EVENT_V1:
case WRITE_ROWS_EVENT_V2: {
if (do_replicate())
event = std::make_shared<WriteRowsEvent>(table_map);
else
event = std::make_shared<DryRunEvent>();
event->parseHeader(payload);
event->parseEvent(payload);
break;
}
case DELETE_ROWS_EVENT_V1:
case DELETE_ROWS_EVENT_V2: {
if (do_replicate())
event = std::make_shared<DeleteRowsEvent>(table_map);
else
event = std::make_shared<DryRunEvent>();
event->parseHeader(payload);
event->parseEvent(payload);
break;
}
case UPDATE_ROWS_EVENT_V1:
case UPDATE_ROWS_EVENT_V2: {
if (do_replicate())
event = std::make_shared<UpdateRowsEvent>(table_map);
else
event = std::make_shared<DryRunEvent>();
event->parseHeader(payload);
event->parseEvent(payload);
break;

View File

@ -470,6 +470,7 @@ namespace MySQLReplication
virtual String getName() const = 0;
virtual Position getPosition() const = 0;
virtual BinlogEventPtr readOneEvent() = 0;
virtual void setReplicateDatabase(String db) = 0;
virtual ~IFlavor() = default;
};
@ -480,11 +481,15 @@ namespace MySQLReplication
String getName() const override { return "MySQL"; }
Position getPosition() const override { return position; }
BinlogEventPtr readOneEvent() override { return event; }
void setReplicateDatabase(String db) override { replicate_do_db = std::move(db); }
private:
Position position;
BinlogEventPtr event;
String replicate_do_db;
std::shared_ptr<TableMapEvent> table_map;
inline bool do_replicate() { return (replicate_do_db.empty() || table_map->schema == replicate_do_db); }
};
}

View File

@ -171,7 +171,7 @@ int main(int, char **)
slave.connect();
/// start to dump binlog.
slave.startBinlogDump(slave_id, "", "", 4);
slave.startBinlogDump(slave_id, "dbtest", "", 4);
/// Read one binlog event on by one.
while (true)
@ -212,7 +212,6 @@ int main(int, char **)
break;
}
default:
event->dump();
break;
}
}