add register slave command

This commit is contained in:
BohuTANG 2020-04-23 22:08:01 +08:00 committed by zhang2014
parent 4a21fce889
commit 27e2fe2778
4 changed files with 64 additions and 25 deletions

View File

@ -2,6 +2,7 @@
namespace DB
{
using namespace ReplicationProtocol;
using namespace MySQLProtocol::Authentication;
namespace ErrorCodes
@ -72,6 +73,22 @@ bool MySQLClient::handshake()
return (packet_response.getType() != PACKET_ERR);
}
bool MySQLClient::register_slave(UInt32 server_id)
{
RegisterSlave register_slave(server_id);
packet_sender->sendPacket<RegisterSlave>(register_slave, true);
PacketResponse packet_response(client_capability_flags);
packet_sender->receivePacket(packet_response);
packet_sender->resetSequenceId();
if (packet_response.getType() == PACKET_ERR)
{
last_error = packet_response.err.error_message;
}
return (packet_response.getType() != PACKET_ERR);
}
bool MySQLClient::ping()
{
return writeCommand(Command::COM_PING, "");
@ -82,11 +99,6 @@ bool MySQLClient::initdb(String db)
return writeCommand(Command::COM_INIT_DB, db);
}
bool MySQLClient::query(String q)
{
return writeCommand(Command::COM_QUERY, q);
}
String MySQLClient::error()
{
return last_error;
@ -107,7 +119,7 @@ bool MySQLClient::writeCommand(char command, String query)
last_error = packet_response.err.error_message;
break;
case PACKET_OK:
ret = readColumns(packet_response.column_length);
ret = true;
break;
default:
break;
@ -115,14 +127,4 @@ bool MySQLClient::writeCommand(char command, String query)
packet_sender->resetSequenceId();
return ret;
}
bool MySQLClient::readColumns(int column_length)
{
for (auto i = 0; i < column_length; i++)
{
ColumnDefinition cd;
packet_sender->receivePacket(cd);
}
return true;
}
}

View File

@ -24,7 +24,7 @@ public:
void disconnect();
bool ping();
bool initdb(String db);
bool query(String q);
bool register_slave(UInt32 server_id);
String error();
private:
@ -50,7 +50,6 @@ private:
std::shared_ptr<PacketSender> packet_sender;
bool handshake();
bool readColumns(int column_length);
bool writeCommand(char command, String query);
};

View File

@ -102,6 +102,7 @@ enum Command
COM_TIME = 0xf,
COM_DELAYED_INSERT = 0x10,
COM_CHANGE_USER = 0x11,
COM_REGISTER_SLAVE = 0x15,
COM_RESET_CONNECTION = 0x1f,
COM_DAEMON = 0x1d
};
@ -1434,5 +1435,48 @@ private:
}
namespace ReplicationProtocol
{
/// https://dev.mysql.com/doc/internals/en/com-register-slave.html
class RegisterSlave : public WritePacket
{
public:
UInt8 header = COM_REGISTER_SLAVE;
UInt32 server_id;
String slaves_hostname;
String slaves_users;
String slaves_password;
size_t slaves_mysql_port;
UInt32 replication_rank;
UInt32 master_id;
RegisterSlave(UInt32 server_id_)
: server_id(server_id_)
, slaves_mysql_port(0x00)
, replication_rank(0x00)
, master_id(0x00)
{
}
void writePayloadImpl(WriteBuffer & buffer) const override
{
buffer.write(header);
buffer.write(reinterpret_cast<const char *>(&server_id), 4);
writeLengthEncodedString(slaves_hostname, buffer);
writeLengthEncodedString(slaves_users, buffer);
writeLengthEncodedString(slaves_password, buffer);
buffer.write(reinterpret_cast<const char *>(&slaves_mysql_port), 2);
buffer.write(reinterpret_cast<const char *>(&replication_rank), 4);
buffer.write(reinterpret_cast<const char *>(&master_id), 4);
}
protected:
size_t getPayloadSize() const override
{
return 1 + 4 + getLengthEncodedStringSize(slaves_hostname) + getLengthEncodedStringSize(slaves_users)
+ getLengthEncodedStringSize(slaves_password) + 2 + 4 + 4;
}
};
}
}
}

View File

@ -175,13 +175,7 @@ int main(int, char **)
return 1;
}
if (!client1.initdb("default"))
{
std::cerr << "Connect Error: " << client1.error() << std::endl;
return 1;
}
if (!client1.query("select 1"))
if (!client1.register_slave(123))
{
std::cerr << "Connect Error: " << client1.error() << std::endl;
return 1;