ISSUES-4006 split replication packets

This commit is contained in:
zhang2014 2020-08-13 20:07:02 +08:00
parent 4061254cb1
commit 34f4c8972e
16 changed files with 207 additions and 193 deletions

View File

@ -5,6 +5,8 @@
#include <Access/User.h>
#include <Access/AccessControlManager.h>
#include <Common/OpenSSLHelpers.h>
#include <ext/scope_guard.h>
namespace DB
@ -15,7 +17,6 @@ namespace ErrorCodes
extern const int OPENSSL_ERROR;
extern const int UNKNOWN_EXCEPTION;
extern const int MYSQL_CLIENT_INSUFFICIENT_CAPABILITIES;
}
namespace MySQLProtocol

View File

@ -63,6 +63,32 @@ enum ColumnType
MYSQL_TYPE_GEOMETRY = 0xff
};
enum Command
{
COM_SLEEP = 0x0,
COM_QUIT = 0x1,
COM_INIT_DB = 0x2,
COM_QUERY = 0x3,
COM_FIELD_LIST = 0x4,
COM_CREATE_DB = 0x5,
COM_DROP_DB = 0x6,
COM_REFRESH = 0x7,
COM_SHUTDOWN = 0x8,
COM_STATISTICS = 0x9,
COM_PROCESS_INFO = 0xa,
COM_CONNECT = 0xb,
COM_PROCESS_KILL = 0xc,
COM_DEBUG = 0xd,
COM_PING = 0xe,
COM_TIME = 0xf,
COM_DELAYED_INSERT = 0x10,
COM_CHANGE_USER = 0x11,
COM_BINLOG_DUMP = 0x12,
COM_REGISTER_SLAVE = 0x15,
COM_RESET_CONNECTION = 0x1f,
COM_DAEMON = 0x1d
};
class ResultSetRow : public IMySQLWritePacket
{
protected:

View File

@ -0,0 +1,71 @@
#include <Core/MySQL/PacketsReplication.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Core/MySQL/PacketsProtocolText.h>
namespace DB
{
namespace MySQLProtocol
{
namespace Replication
{
RegisterSlave::RegisterSlave(UInt32 server_id_)
: server_id(server_id_), slaves_mysql_port(0x00), replication_rank(0x00), master_id(0x00)
{
}
size_t RegisterSlave::getPayloadSize() const
{
return 1 + 4 + getLengthEncodedStringSize(slaves_hostname) + getLengthEncodedStringSize(slaves_users)
+ getLengthEncodedStringSize(slaves_password) + 2 + 4 + 4;
}
void RegisterSlave::writePayloadImpl(WriteBuffer & buffer) const
{
buffer.write(ProtocolText::COM_REGISTER_SLAVE);
buffer.write(reinterpret_cast<const char *>(&server_id), 4);
writeLengthEncodedString(slaves_hostname, buffer);
writeLengthEncodedString(slaves_users, buffer);
writeLengthEncodedString(slaves_password, buffer);
buffer.write(reinterpret_cast<const char *>(&slaves_mysql_port), 2);
buffer.write(reinterpret_cast<const char *>(&replication_rank), 4);
buffer.write(reinterpret_cast<const char *>(&master_id), 4);
}
BinlogDump::BinlogDump(UInt32 binlog_pos_, String binlog_file_name_, UInt32 server_id_)
: binlog_pos(binlog_pos_), flags(0x00), server_id(server_id_), binlog_file_name(std::move(binlog_file_name_))
{
}
size_t BinlogDump::getPayloadSize() const
{
return 1 + 4 + 2 + 4 + binlog_file_name.size() + 1;
}
void BinlogDump::writePayloadImpl(WriteBuffer & buffer) const
{
buffer.write(ProtocolText::COM_BINLOG_DUMP);
buffer.write(reinterpret_cast<const char *>(&binlog_pos), 4);
buffer.write(reinterpret_cast<const char *>(&flags), 2);
buffer.write(reinterpret_cast<const char *>(&server_id), 4);
buffer.write(binlog_file_name.data(), binlog_file_name.length());
buffer.write(0x00);
}
}
}
}
namespace DB::MySQLProtocol
{
extern const size_t MAX_PACKET_LENGTH = (1 << 24) - 1; // 16 mb
}

View File

@ -0,0 +1,61 @@
#pragma once
#include <Core/MySQL/PacketPayloadReadBuffer.h>
#include <Core/MySQL/PacketPayloadWriteBuffer.h>
#include <Core/MySQL/PacketEndpoint.h>
/// Implementation of MySQL wire protocol.
/// Works only on little-endian architecture.
namespace DB
{
namespace MySQLProtocol
{
namespace Replication
{
/// https://dev.mysql.com/doc/internals/en/com-register-slave.html
class RegisterSlave : public IMySQLWritePacket
{
public:
UInt32 server_id;
String slaves_hostname;
String slaves_users;
String slaves_password;
size_t slaves_mysql_port;
UInt32 replication_rank;
UInt32 master_id;
protected:
size_t getPayloadSize() const override;
void writePayloadImpl(WriteBuffer & buffer) const override;
public:
RegisterSlave(UInt32 server_id_);
};
/// https://dev.mysql.com/doc/internals/en/com-binlog-dump.html
class BinlogDump : public IMySQLWritePacket
{
public:
UInt32 binlog_pos;
UInt16 flags;
UInt32 server_id;
String binlog_file_name;
protected:
size_t getPayloadSize() const override;
void writePayloadImpl(WriteBuffer & buffer) const override;
public:
BinlogDump(UInt32 binlog_pos_, String binlog_file_name_, UInt32 server_id_);
};
}
}
}

View File

@ -1,10 +1,17 @@
#include "MySQLClient.h"
#include <Core/MySQL/Authentication.h>
#include <Core/MySQL/PacketsGeneric.h>
#include <Core/MySQL/PacketsConnection.h>
#include <Core/MySQL/PacketsProtocolText.h>
#include <Core/MySQL/PacketsReplication.h>
#include <Core/MySQLReplication.h>
namespace DB
{
using namespace Generic;
using namespace Replication;
using namespace ProtocolText;
using namespace Authentication;
using namespace ConnectionPhase;
@ -67,7 +74,7 @@ void MySQLClient::handshake()
packet_sender->receivePacket(handshake);
if (handshake.auth_plugin_name != mysql_native_password)
{
throw MySQLClientError(
throw Exception(
"Only support " + mysql_native_password + " auth plugin name, but got " + handshake.auth_plugin_name,
ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
}
@ -84,9 +91,9 @@ void MySQLClient::handshake()
packet_sender->resetSequenceId();
if (packet_response.getType() == PACKET_ERR)
throw MySQLClientError(packet_response.err.error_message, ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
throw Exception(packet_response.err.error_message, ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
else if (packet_response.getType() == PACKET_AUTH_SWITCH)
throw MySQLClientError("Access denied for user " + user, ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
throw Exception("Access denied for user " + user, ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
}
void MySQLClient::writeCommand(char command, String query)
@ -99,7 +106,7 @@ void MySQLClient::writeCommand(char command, String query)
switch (packet_response.getType())
{
case PACKET_ERR:
throw MySQLClientError(packet_response.err.error_message, ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
throw Exception(packet_response.err.error_message, ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
case PACKET_OK:
break;
default:
@ -117,7 +124,7 @@ void MySQLClient::registerSlaveOnMaster(UInt32 slave_id)
packet_sender->receivePacket(packet_response);
packet_sender->resetSequenceId();
if (packet_response.getType() == PACKET_ERR)
throw MySQLClientError(packet_response.err.error_message, ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
throw Exception(packet_response.err.error_message, ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
}
void MySQLClient::ping()

View File

@ -1,7 +1,6 @@
#pragma once
#include <Core/MySQLProtocol.h>
#include <Core/MySQLReplication.h>
#include <Core/Types.h>
#include <Core/MySQLReplication.h>
#include <IO/ReadBufferFromPocoSocket.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromPocoSocket.h>
@ -11,19 +10,15 @@
#include <Common/DNSResolver.h>
#include <Common/Exception.h>
#include <Common/NetException.h>
#include <Core/MySQL/IMySQLWritePacket.h>
namespace DB
{
using namespace MySQLProtocol;
using namespace MySQLReplication;
class MySQLClientError : public DB::Exception
{
public:
using Exception::Exception;
};
class MySQLClient
{
public:

View File

@ -1,14 +0,0 @@
#include "MySQLProtocol.h"
#include <IO/WriteBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
#include <common/logger_useful.h>
#include <random>
#include <sstream>
namespace DB::MySQLProtocol
{
extern const size_t MAX_PACKET_LENGTH = (1 << 24) - 1; // 16 mb
}

View File

@ -1,159 +0,0 @@
#pragma once
#include <ext/scope_guard.h>
#include <random>
#include <sstream>
#include <Common/MemoryTracker.h>
#include <Common/OpenSSLHelpers.h>
#include <Common/PODArray.h>
#include <Core/Types.h>
#include <Interpreters/Context.h>
#include <Access/AccessControlManager.h>
#include <Access/User.h>
#include <IO/copyData.h>
#include <IO/LimitReadBuffer.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/ReadBufferFromPocoSocket.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteBufferFromPocoSocket.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <Poco/Net/StreamSocket.h>
#include <Poco/RandomStream.h>
#include <Poco/SHA1Engine.h>
#include <Core/MySQL/PacketsGeneric.h>
#include <Core/MySQL/PacketPayloadReadBuffer.h>
#include <Core/MySQL/PacketPayloadWriteBuffer.h>
#include <Core/MySQL/PacketEndpoint.h>
#include <Core/MySQL/PacketsConnection.h>
#include <Core/MySQL/PacketsProtocolText.h>
#include <Core/MySQL/Authentication.h>
#if !defined(ARCADIA_BUILD)
# include "config_core.h"
#endif
#if USE_SSL
# include <openssl/pem.h>
# include <openssl/rsa.h>
#endif
/// Implementation of MySQL wire protocol.
/// Works only on little-endian architecture.
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int CANNOT_WRITE_AFTER_END_OF_BUFFER;
extern const int UNKNOWN_PACKET_FROM_CLIENT;
extern const int MYSQL_CLIENT_INSUFFICIENT_CAPABILITIES;
extern const int OPENSSL_ERROR;
extern const int UNKNOWN_EXCEPTION;
}
namespace MySQLProtocol
{
//const size_t MAX_PACKET_LENGTH = (1 << 24) - 1; // 16 mb
const size_t PACKET_HEADER_SIZE = 4;
const size_t SSL_REQUEST_PAYLOAD_SIZE = 32;
enum Command
{
COM_SLEEP = 0x0,
COM_QUIT = 0x1,
COM_INIT_DB = 0x2,
COM_QUERY = 0x3,
COM_FIELD_LIST = 0x4,
COM_CREATE_DB = 0x5,
COM_DROP_DB = 0x6,
COM_REFRESH = 0x7,
COM_SHUTDOWN = 0x8,
COM_STATISTICS = 0x9,
COM_PROCESS_INFO = 0xa,
COM_CONNECT = 0xb,
COM_PROCESS_KILL = 0xc,
COM_DEBUG = 0xd,
COM_PING = 0xe,
COM_TIME = 0xf,
COM_DELAYED_INSERT = 0x10,
COM_CHANGE_USER = 0x11,
COM_BINLOG_DUMP = 0x12,
COM_REGISTER_SLAVE = 0x15,
COM_RESET_CONNECTION = 0x1f,
COM_DAEMON = 0x1d
};
namespace Replication
{
/// https://dev.mysql.com/doc/internals/en/com-register-slave.html
class RegisterSlave : public IMySQLWritePacket
{
public:
UInt8 header = COM_REGISTER_SLAVE;
UInt32 server_id;
String slaves_hostname;
String slaves_users;
String slaves_password;
size_t slaves_mysql_port;
UInt32 replication_rank;
UInt32 master_id;
RegisterSlave(UInt32 server_id_) : server_id(server_id_), slaves_mysql_port(0x00), replication_rank(0x00), master_id(0x00) { }
void writePayloadImpl(WriteBuffer & buffer) const override
{
buffer.write(header);
buffer.write(reinterpret_cast<const char *>(&server_id), 4);
writeLengthEncodedString(slaves_hostname, buffer);
writeLengthEncodedString(slaves_users, buffer);
writeLengthEncodedString(slaves_password, buffer);
buffer.write(reinterpret_cast<const char *>(&slaves_mysql_port), 2);
buffer.write(reinterpret_cast<const char *>(&replication_rank), 4);
buffer.write(reinterpret_cast<const char *>(&master_id), 4);
}
protected:
size_t getPayloadSize() const override
{
return 1 + 4 + getLengthEncodedStringSize(slaves_hostname) + getLengthEncodedStringSize(slaves_users)
+ getLengthEncodedStringSize(slaves_password) + 2 + 4 + 4;
}
};
/// https://dev.mysql.com/doc/internals/en/com-binlog-dump.html
class BinlogDump : public IMySQLWritePacket
{
public:
UInt8 header = COM_BINLOG_DUMP;
UInt32 binlog_pos;
UInt16 flags;
UInt32 server_id;
String binlog_file_name;
BinlogDump(UInt32 binlog_pos_, String binlog_file_name_, UInt32 server_id_)
: binlog_pos(binlog_pos_), flags(0x00), server_id(server_id_), binlog_file_name(std::move(binlog_file_name_))
{
}
void writePayloadImpl(WriteBuffer & buffer) const override
{
buffer.write(header);
buffer.write(reinterpret_cast<const char *>(&binlog_pos), 4);
buffer.write(reinterpret_cast<const char *>(&flags), 2);
buffer.write(reinterpret_cast<const char *>(&server_id), 4);
buffer.write(binlog_file_name.data(), binlog_file_name.length());
buffer.write(0x00);
}
protected:
size_t getPayloadSize() const override { return 1 + 4 + 2 + 4 + binlog_file_name.size() + 1; }
};
}
}
}

View File

@ -4,6 +4,8 @@
#include <IO/ReadBufferFromString.h>
#include <common/DateLUT.h>
#include <Common/FieldVisitors.h>
#include <Core/MySQL/PacketsGeneric.h>
#include <Core/MySQL/PacketsProtocolText.h>
namespace DB
{

View File

@ -1,6 +1,6 @@
#pragma once
#include <Core/Field.h>
#include <Core/MySQLProtocol.h>
#include <Core/MySQL/PacketsReplication.h>
#include <Core/Types.h>
#include <IO/ReadBuffer.h>
#include <IO/WriteBuffer.h>

View File

@ -1,7 +1,11 @@
#include <string>
#include <Core/MySQLClient.h>
#include <Core/MySQLProtocol.h>
#include <Core/MySQL/Authentication.h>
#include <Core/MySQL/PacketsGeneric.h>
#include <Core/MySQL/PacketsConnection.h>
#include <Core/MySQL/PacketsProtocolText.h>
#include <Core/MySQL/PacketsReplication.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>

View File

@ -6,6 +6,7 @@
# include <Databases/MySQL/DatabaseMaterializeMySQL.h>
# include <Interpreters/Context.h>
# include <Databases/DatabaseOrdinary.h>
# include <Databases/MySQL/DatabaseMaterializeTablesIterator.h>
# include <Databases/MySQL/MaterializeMySQLSyncThread.h>

View File

@ -1,5 +1,4 @@
#include <Processors/Formats/Impl/MySQLOutputFormat.h>
#include <Core/MySQLProtocol.h>
#include <Interpreters/ProcessList.h>
#include <Formats/FormatFactory.h>
#include <Interpreters/Context.h>

View File

@ -3,7 +3,10 @@
#include <Processors/Formats/IRowOutputFormat.h>
#include <Core/Block.h>
#include <Core/MySQLProtocol.h>
#include <Core/MySQL/Authentication.h>
#include <Core/MySQL/PacketsGeneric.h>
#include <Core/MySQL/PacketsConnection.h>
#include <Core/MySQL/PacketsProtocolText.h>
#include <Formats/FormatSettings.h>
namespace DB

View File

@ -5,16 +5,25 @@
#include <Columns/ColumnVector.h>
#include <Common/NetException.h>
#include <Common/OpenSSLHelpers.h>
#include <Core/MySQLProtocol.h>
#include <Core/MySQL/Authentication.h>
#include <Core/MySQL/PacketsGeneric.h>
#include <Core/MySQL/PacketsConnection.h>
#include <Core/MySQL/PacketsProtocolText.h>
#include <Core/NamesAndTypes.h>
#include <DataStreams/copyData.h>
#include <Interpreters/executeQuery.h>
#include <IO/copyData.h>
#include <IO/LimitReadBuffer.h>
#include <IO/ReadBufferFromPocoSocket.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromPocoSocket.h>
#include <IO/WriteBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <Storages/IStorage.h>
#include <boost/algorithm/string/replace.hpp>
#include <regex>
#include <Access/User.h>
#include <Access/AccessControlManager.h>
#if !defined(ARCADIA_BUILD)
# include <Common/config_version.h>
@ -25,6 +34,7 @@
# include <Poco/Crypto/RSAKey.h>
# include <Poco/Net/SSLManager.h>
# include <Poco/Net/SecureStreamSocket.h>
#endif
namespace DB
@ -48,6 +58,10 @@ namespace ErrorCodes
extern const int SUPPORT_IS_DISABLED;
}
static const size_t PACKET_HEADER_SIZE = 4;
static const size_t SSL_REQUEST_PAYLOAD_SIZE = 32;
static String selectEmptyReplacementQuery(const String & query);
static String showTableStatusReplacementQuery(const String & query);
static String killConnectionIdReplacementQuery(const String & query);

View File

@ -3,7 +3,10 @@
#include <Poco/Net/TCPServerConnection.h>
#include <common/getFQDNOrHostName.h>
#include <Common/CurrentMetrics.h>
#include <Core/MySQLProtocol.h>
#include <Core/MySQL/Authentication.h>
#include <Core/MySQL/PacketsGeneric.h>
#include <Core/MySQL/PacketsConnection.h>
#include <Core/MySQL/PacketsProtocolText.h>
#include "IServer.h"
#if !defined(ARCADIA_BUILD)