From 27e2fe2778b809e117682f130fb36fa704dbcedf Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Thu, 23 Apr 2020 22:08:01 +0800 Subject: [PATCH] add register slave command --- src/Core/MySQLClient.cpp | 34 +++++++++++++----------- src/Core/MySQLClient.h | 3 +-- src/Core/MySQLProtocol.h | 44 +++++++++++++++++++++++++++++++ src/Core/tests/mysql_protocol.cpp | 8 +----- 4 files changed, 64 insertions(+), 25 deletions(-) diff --git a/src/Core/MySQLClient.cpp b/src/Core/MySQLClient.cpp index 4ea75206ddb..76421172c42 100644 --- a/src/Core/MySQLClient.cpp +++ b/src/Core/MySQLClient.cpp @@ -2,6 +2,7 @@ namespace DB { +using namespace ReplicationProtocol; using namespace MySQLProtocol::Authentication; namespace ErrorCodes @@ -72,6 +73,22 @@ bool MySQLClient::handshake() return (packet_response.getType() != PACKET_ERR); } + +bool MySQLClient::register_slave(UInt32 server_id) +{ + RegisterSlave register_slave(server_id); + packet_sender->sendPacket(register_slave, true); + + PacketResponse packet_response(client_capability_flags); + packet_sender->receivePacket(packet_response); + packet_sender->resetSequenceId(); + if (packet_response.getType() == PACKET_ERR) + { + last_error = packet_response.err.error_message; + } + return (packet_response.getType() != PACKET_ERR); +} + bool MySQLClient::ping() { return writeCommand(Command::COM_PING, ""); @@ -82,11 +99,6 @@ bool MySQLClient::initdb(String db) return writeCommand(Command::COM_INIT_DB, db); } -bool MySQLClient::query(String q) -{ - return writeCommand(Command::COM_QUERY, q); -} - String MySQLClient::error() { return last_error; @@ -107,7 +119,7 @@ bool MySQLClient::writeCommand(char command, String query) last_error = packet_response.err.error_message; break; case PACKET_OK: - ret = readColumns(packet_response.column_length); + ret = true; break; default: break; @@ -115,14 +127,4 @@ bool MySQLClient::writeCommand(char command, String query) packet_sender->resetSequenceId(); return ret; } - -bool MySQLClient::readColumns(int column_length) -{ - for (auto i = 0; i < column_length; i++) - { - ColumnDefinition cd; - packet_sender->receivePacket(cd); - } - return true; -} } diff --git a/src/Core/MySQLClient.h b/src/Core/MySQLClient.h index 43edfbe963d..16b4b01c4ae 100644 --- a/src/Core/MySQLClient.h +++ b/src/Core/MySQLClient.h @@ -24,7 +24,7 @@ public: void disconnect(); bool ping(); bool initdb(String db); - bool query(String q); + bool register_slave(UInt32 server_id); String error(); private: @@ -50,7 +50,6 @@ private: std::shared_ptr packet_sender; bool handshake(); - bool readColumns(int column_length); bool writeCommand(char command, String query); }; diff --git a/src/Core/MySQLProtocol.h b/src/Core/MySQLProtocol.h index 237929ed068..982273f6c14 100644 --- a/src/Core/MySQLProtocol.h +++ b/src/Core/MySQLProtocol.h @@ -102,6 +102,7 @@ enum Command COM_TIME = 0xf, COM_DELAYED_INSERT = 0x10, COM_CHANGE_USER = 0x11, + COM_REGISTER_SLAVE = 0x15, COM_RESET_CONNECTION = 0x1f, COM_DAEMON = 0x1d }; @@ -1434,5 +1435,48 @@ private: } +namespace ReplicationProtocol +{ + /// https://dev.mysql.com/doc/internals/en/com-register-slave.html + class RegisterSlave : public WritePacket + { + public: + UInt8 header = COM_REGISTER_SLAVE; + UInt32 server_id; + String slaves_hostname; + String slaves_users; + String slaves_password; + size_t slaves_mysql_port; + UInt32 replication_rank; + UInt32 master_id; + + RegisterSlave(UInt32 server_id_) + : server_id(server_id_) + , slaves_mysql_port(0x00) + , replication_rank(0x00) + , master_id(0x00) + { + } + + void writePayloadImpl(WriteBuffer & buffer) const override + { + buffer.write(header); + buffer.write(reinterpret_cast(&server_id), 4); + writeLengthEncodedString(slaves_hostname, buffer); + writeLengthEncodedString(slaves_users, buffer); + writeLengthEncodedString(slaves_password, buffer); + buffer.write(reinterpret_cast(&slaves_mysql_port), 2); + buffer.write(reinterpret_cast(&replication_rank), 4); + buffer.write(reinterpret_cast(&master_id), 4); + } + + protected: + size_t getPayloadSize() const override + { + return 1 + 4 + getLengthEncodedStringSize(slaves_hostname) + getLengthEncodedStringSize(slaves_users) + + getLengthEncodedStringSize(slaves_password) + 2 + 4 + 4; + } + }; +} } } diff --git a/src/Core/tests/mysql_protocol.cpp b/src/Core/tests/mysql_protocol.cpp index 68f9f68d8d3..fd4c12d68c8 100644 --- a/src/Core/tests/mysql_protocol.cpp +++ b/src/Core/tests/mysql_protocol.cpp @@ -175,13 +175,7 @@ int main(int, char **) return 1; } - if (!client1.initdb("default")) - { - std::cerr << "Connect Error: " << client1.error() << std::endl; - return 1; - } - - if (!client1.query("select 1")) + if (!client1.register_slave(123)) { std::cerr << "Connect Error: " << client1.error() << std::endl; return 1;