From 882a773264e0f911d9937a36b3700e94749d2dd2 Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Fri, 15 May 2020 14:54:26 +0800 Subject: [PATCH] Support database replicate filter --- src/Core/MySQLClient.cpp | 7 +++-- src/Core/MySQLReplication.cpp | 43 ++++++++++++++++++++++--------- src/Core/MySQLReplication.h | 5 ++++ src/Core/tests/mysql_protocol.cpp | 3 +-- 4 files changed, 40 insertions(+), 18 deletions(-) diff --git a/src/Core/MySQLClient.cpp b/src/Core/MySQLClient.cpp index 4b420068571..3400229855a 100644 --- a/src/Core/MySQLClient.cpp +++ b/src/Core/MySQLClient.cpp @@ -124,13 +124,12 @@ void MySQLClient::startBinlogDump(UInt32 slave_id, String replicate_db, String b UInt64 period_ns = (30 * 1e9); writeCommand(Command::COM_QUERY, "SET @master_heartbeat_period = " + std::to_string(period_ns)); - /// Set replication filter to master - /// This requires MySQL version >=5.6, so results are not checked here. - writeCommand(Command::COM_QUERY, "CHANGE REPLICATION FILTER REPLICATE_DO_DB = (" + replicate_db + ")"); - // Register slave. registerSlaveOnMaster(slave_id); + /// Set Filter rule to replication. + replication.setReplicateDatabase(replicate_db); + binlog_pos = binlog_pos < 4 ? 4 : binlog_pos; BinlogDump binlog_dump(binlog_pos, binlog_file_name, slave_id); packet_sender->sendPacket(binlog_dump, true); diff --git a/src/Core/MySQLReplication.cpp b/src/Core/MySQLReplication.cpp index a2e081823b3..decc31f3140 100644 --- a/src/Core/MySQLReplication.cpp +++ b/src/Core/MySQLReplication.cpp @@ -758,15 +758,23 @@ namespace MySQLReplication event->parseEvent(payload); auto query = std::static_pointer_cast(event); - switch (query->typ) + if (query->schema == replicate_do_db) { - case BEGIN: - case XA: { - event = std::make_shared(); - break; + switch (query->typ) + { + case BEGIN: + case XA: { + event = std::make_shared(); + break; + } + default: + position.updateLogPos(event->header.log_pos); } - default: - position.updateLogPos(event->header.log_pos); + } + else + { + event = std::make_shared(); + position.updateLogPos(event->header.log_pos); } break; } @@ -781,28 +789,39 @@ namespace MySQLReplication event = std::make_shared(); event->parseHeader(payload); event->parseEvent(payload); - - table_map = std::static_pointer_cast(event); position.updateLogPos(event->header.log_pos); + table_map = std::static_pointer_cast(event); break; } case WRITE_ROWS_EVENT_V1: case WRITE_ROWS_EVENT_V2: { - event = std::make_shared(table_map); + if (do_replicate()) + event = std::make_shared(table_map); + else + event = std::make_shared(); + event->parseHeader(payload); event->parseEvent(payload); break; } case DELETE_ROWS_EVENT_V1: case DELETE_ROWS_EVENT_V2: { - event = std::make_shared(table_map); + if (do_replicate()) + event = std::make_shared(table_map); + else + event = std::make_shared(); + event->parseHeader(payload); event->parseEvent(payload); break; } case UPDATE_ROWS_EVENT_V1: case UPDATE_ROWS_EVENT_V2: { - event = std::make_shared(table_map); + if (do_replicate()) + event = std::make_shared(table_map); + else + event = std::make_shared(); + event->parseHeader(payload); event->parseEvent(payload); break; diff --git a/src/Core/MySQLReplication.h b/src/Core/MySQLReplication.h index df0eb36e395..4537afb51a3 100644 --- a/src/Core/MySQLReplication.h +++ b/src/Core/MySQLReplication.h @@ -470,6 +470,7 @@ namespace MySQLReplication virtual String getName() const = 0; virtual Position getPosition() const = 0; virtual BinlogEventPtr readOneEvent() = 0; + virtual void setReplicateDatabase(String db) = 0; virtual ~IFlavor() = default; }; @@ -480,11 +481,15 @@ namespace MySQLReplication String getName() const override { return "MySQL"; } Position getPosition() const override { return position; } BinlogEventPtr readOneEvent() override { return event; } + void setReplicateDatabase(String db) override { replicate_do_db = std::move(db); } private: Position position; BinlogEventPtr event; + String replicate_do_db; std::shared_ptr table_map; + + inline bool do_replicate() { return (replicate_do_db.empty() || table_map->schema == replicate_do_db); } }; } diff --git a/src/Core/tests/mysql_protocol.cpp b/src/Core/tests/mysql_protocol.cpp index 931884a1cee..9564bdb544c 100644 --- a/src/Core/tests/mysql_protocol.cpp +++ b/src/Core/tests/mysql_protocol.cpp @@ -171,7 +171,7 @@ int main(int, char **) slave.connect(); /// start to dump binlog. - slave.startBinlogDump(slave_id, "", "", 4); + slave.startBinlogDump(slave_id, "dbtest", "", 4); /// Read one binlog event on by one. while (true) @@ -212,7 +212,6 @@ int main(int, char **) break; } default: - event->dump(); break; } }