Support DATETIME/TIMESTAMP type

This commit is contained in:
BohuTANG 2020-05-11 12:15:06 +08:00 committed by zhang2014
parent a36ef50f92
commit 81998cb1af
5 changed files with 610 additions and 323 deletions

View File

@ -126,21 +126,30 @@ bool MySQLClient::ping()
return writeCommand(Command::COM_PING, "");
}
bool MySQLClient::startBinlogDump(UInt32 slave_id, String binlog_file_name, UInt64 binlog_pos)
bool MySQLClient::startBinlogDump(UInt32 slave_id, String replicate_db, String binlog_file_name, UInt64 binlog_pos)
{
/// Set binlog checksum to CRC32.
String checksum = "CRC32";
if (!writeCommand(Command::COM_QUERY, "SET @master_binlog_checksum = '" + checksum + "'"))
{
return false;
}
/// 30s.
/// Set heartbeat 30s.
UInt64 period_ns = (30 * 1e9);
if (!writeCommand(Command::COM_QUERY, "SET @master_heartbeat_period = " + std::to_string(period_ns)))
{
return false;
}
/// Set replication filter to master
/// This requires MySQL version >=5.6, so results are not checked here.
writeCommand(Command::COM_QUERY, "CHANGE REPLICATION FILTER REPLICATE_DO_DB = (" + replicate_db + ")");
/// Set Filter rule to replication.
replication.setReplicateDatabase(replicate_db);
// Register slave.
if (!registerSlaveOnMaster(slave_id))
{
return false;

View File

@ -32,8 +32,9 @@ public:
void disconnect();
bool ping();
String error();
bool startBinlogDump(UInt32 slave_id, String binlog_file_name, UInt64 binlog_pos);
BinlogEventPtr readOneBinlogEvent() ;
bool startBinlogDump(UInt32 slave_id, String replicate_db, String binlog_file_name, UInt64 binlog_pos);
BinlogEventPtr readOneBinlogEvent();
Position getPosition() const { return replication.getPosition(); }
private:

View File

@ -14,118 +14,6 @@ namespace MySQLReplication
{
using namespace MySQLProtocol;
String ToString(BinlogChecksumAlg type)
{
switch (type)
{
case BINLOG_CHECKSUM_ALG_OFF:
return "BINLOG_CHECKSUM_ALG_OFF";
case BINLOG_CHECKSUM_ALG_CRC32:
return "BINLOG_CHECKSUM_ALG_CRC32";
case BINLOG_CHECKSUM_ALG_ENUM_END:
return "BINLOG_CHECKSUM_ALG_ENUM_END";
case BINLOG_CHECKSUM_ALG_UNDEF:
return "BINLOG_CHECKSUM_ALG_UNDEF";
}
return std::string("Unknown checksum alg: ") + std::to_string(static_cast<int>(type));
}
String ToString(EventType type)
{
switch (type)
{
case START_EVENT_V3:
return "StartEventV3";
case QUERY_EVENT:
return "QueryEvent";
case STOP_EVENT:
return "StopEvent";
case ROTATE_EVENT:
return "RotateEvent";
case INT_VAR_EVENT:
return "IntVarEvent";
case LOAD_EVENT:
return "LoadEvent";
case SLAVE_EVENT:
return "SlaveEvent";
case CREATE_FILE_EVENT:
return "CreateFileEvent";
case APPEND_BLOCK_EVENT:
return "AppendBlockEvent";
case EXEC_LOAD_EVENT:
return "ExecLoadEvent";
case DELETE_FILE_EVENT:
return "DeleteFileEvent";
case NEW_LOAD_EVENT:
return "NewLoadEvent";
case RAND_EVENT:
return "RandEvent";
case USER_VAR_EVENT:
return "UserVarEvent";
case FORMAT_DESCRIPTION_EVENT:
return "FormatDescriptionEvent";
case XID_EVENT:
return "XIDEvent";
case BEGIN_LOAD_QUERY_EVENT:
return "BeginLoadQueryEvent";
case EXECUTE_LOAD_QUERY_EVENT:
return "ExecuteLoadQueryEvent";
case TABLE_MAP_EVENT:
return "TableMapEvent";
case WRITE_ROWS_EVENT_V0:
return "WriteRowsEventV0";
case UPDATE_ROWS_EVENT_V0:
return "UpdateRowsEventV0";
case DELETE_ROWS_EVENT_V0:
return "DeleteRowsEventV0";
case WRITE_ROWS_EVENT_V1:
return "WriteRowsEventV1";
case UPDATE_ROWS_EVENT_V1:
return "UpdateRowsEventV1";
case DELETE_ROWS_EVENT_V1:
return "DeleteRowsEventV1";
case INCIDENT_EVENT:
return "IncidentEvent";
case HEARTBEAT_EVENT:
return "HeartbeatEvent";
case IGNORABLE_EVENT:
return "IgnorableEvent";
case ROWS_QUERY_EVENT:
return "RowsQueryEvent";
case WRITE_ROWS_EVENT_V2:
return "WriteRowsEventV2";
case UPDATE_ROWS_EVENT_V2:
return "UpdateRowsEventV2";
case DELETE_ROWS_EVENT_V2:
return "DeleteRowsEventV2";
case GTID_EVENT:
return "GTIDEvent";
case ANONYMOUS_GTID_EVENT:
return "AnonymousGTIDEvent";
case PREVIOUS_GTIDS_EVENT:
return "PreviousGTIDsEvent";
case TRANSACTION_CONTEXT_EVENT:
return "TransactionContextEvent";
case VIEW_CHANGE_EVENT:
return "ViewChangeEvent";
case XA_PREPARE_LOG_EVENT:
return "XAPrepareLogEvent";
case MARIA_ANNOTATE_ROWS_EVENT:
return "MariaAnnotateRowsEvent";
case MARIA_BINLOG_CHECKPOINT_EVENT:
return "MariaBinlogCheckpointEvent";
case MARIA_GTID_EVENT:
return "MariaGTIDEvent";
case MARIA_GTID_LIST_EVENT:
return "MariaGTIDListEvent";
case MARIA_START_ENCRYPTION_EVENT:
return "MariaStartEncryptionEvent";
default:
break;
}
return std::string("Unknown event: ") + std::to_string(static_cast<int>(type));
}
/// https://dev.mysql.com/doc/internals/en/binlog-event-header.html
void EventHeader::parse(ReadBuffer & payload)
{
@ -139,7 +27,7 @@ namespace MySQLReplication
void EventHeader::print() const
{
std::cerr << "\n=== " << ToString(this->type) << " ===" << std::endl;
std::cerr << "\n=== " << to_string(this->type) << " ===" << std::endl;
std::cerr << "Timestamp: " << this->timestamp << std::endl;
std::cerr << "Event Type: " << this->type << std::endl;
std::cerr << "Server ID: " << this->server_id << std::endl;
@ -353,19 +241,16 @@ namespace MySQLReplication
payload.ignore(extra_data_len - 2);
number_columns = readLengthEncodedNumber(payload);
size_t columns_bitmap_size = (number_columns + 8) / 7;
size_t columns_bitmap_size = (number_columns + 7) / 8;
switch (header.type)
{
case UPDATE_ROWS_EVENT_V1:
case UPDATE_ROWS_EVENT_V2:
columns_present_bitmap1.resize(columns_bitmap_size);
columns_present_bitmap2.resize(columns_bitmap_size);
payload.readStrict(reinterpret_cast<char *>(columns_present_bitmap1.data()), columns_bitmap_size);
payload.readStrict(reinterpret_cast<char *>(columns_present_bitmap2.data()), columns_bitmap_size);
readBitmap(payload, columns_present_bitmap1, columns_bitmap_size);
readBitmap(payload, columns_present_bitmap2, columns_bitmap_size);
break;
default:
columns_present_bitmap1.resize(columns_bitmap_size);
payload.readStrict(reinterpret_cast<char *>(columns_present_bitmap1.data()), columns_bitmap_size);
readBitmap(payload, columns_present_bitmap1, columns_bitmap_size);
break;
}
@ -379,186 +264,339 @@ namespace MySQLReplication
}
}
void RowsEvent::parseRow(ReadBuffer & payload, String bitmap)
void RowsEvent::parseRow(ReadBuffer & payload, Bitmap & bitmap)
{
Tuple row;
UInt32 field_type = 0;
UInt32 field_len = 0;
UInt32 null_index = 0;
size_t columns_null_bitmap_size = (number_columns + 8) / 7;
String columns_null_bitmap;
columns_null_bitmap.resize(columns_null_bitmap_size);
payload.readStrict(reinterpret_cast<char *>(columns_null_bitmap.data()), columns_null_bitmap_size);
UInt32 re_count = 0;
for (auto i = 0U; i < number_columns; i++)
{
if (bitmap[i])
re_count++;
}
re_count = (re_count + 7) / 8;
boost::dynamic_bitset<> columns_null_set;
readBitmap(payload, columns_null_set, re_count);
Tuple row;
for (auto i = 0U; i < number_columns; i++)
{
/// Column not presents.
if (!check_string_bit(bitmap, i))
{
if (!bitmap[i])
continue;
}
/// NULL column.
if (check_string_bit(columns_null_bitmap, i))
if (columns_null_set[null_index])
{
row.push_back(Field{Null{}});
continue;
}
field_type = table_map->column_type[i];
auto meta = table_map->column_meta[i];
if (field_type == MYSQL_TYPE_STRING)
else
{
if (meta >= 256)
field_type = table_map->column_type[i];
auto meta = table_map->column_meta[i];
if (field_type == MYSQL_TYPE_STRING)
{
UInt32 byte0 = meta >> 8;
UInt32 byte1 = meta & 0xff;
if ((byte0 & 0x30) != 0x30)
if (meta >= 256)
{
field_len = byte1 | (((byte0 & 0x30) ^ 0x30) << 4);
field_type = byte0 | 0x30;
}
else
{
switch (byte0)
UInt32 byte0 = meta >> 8;
UInt32 byte1 = meta & 0xff;
if ((byte0 & 0x30) != 0x30)
{
case MYSQL_TYPE_SET:
case MYSQL_TYPE_ENUM:
case MYSQL_TYPE_STRING:
field_type = byte0;
field_len = byte1;
break;
default:
throw ReplicationError("ParseRow: Illegal event", ErrorCodes::UNKNOWN_EXCEPTION);
field_len = byte1 | (((byte0 & 0x30) ^ 0x30) << 4);
field_type = byte0 | 0x30;
}
else
{
switch (byte0)
{
case MYSQL_TYPE_SET:
case MYSQL_TYPE_ENUM:
case MYSQL_TYPE_STRING:
field_type = byte0;
field_len = byte1;
break;
default:
throw ReplicationError("ParseRow: Unhandled binlog event", ErrorCodes::UNKNOWN_EXCEPTION);
}
}
}
else
{
field_len = meta;
}
}
else
switch (field_type)
{
field_len = meta;
}
}
case MYSQL_TYPE_TINY: {
UInt8 val = 0;
payload.readStrict(reinterpret_cast<char *>(&val), 1);
row.push_back(Field{UInt8{val}});
break;
}
case MYSQL_TYPE_SHORT: {
UInt16 val = 0;
payload.readStrict(reinterpret_cast<char *>(&val), 2);
row.push_back(Field{UInt16{val}});
break;
}
case MYSQL_TYPE_INT24: {
Int32 val = 0;
payload.readStrict(reinterpret_cast<char *>(&val), 3);
row.push_back(Field{Int32{val}});
break;
}
case MYSQL_TYPE_LONG: {
UInt32 val = 0;
payload.readStrict(reinterpret_cast<char *>(&val), 4);
row.push_back(Field{UInt32{val}});
break;
}
case MYSQL_TYPE_LONGLONG: {
UInt64 val = 0;
payload.readStrict(reinterpret_cast<char *>(&val), 8);
row.push_back(Field{UInt64{val}});
break;
}
case MYSQL_TYPE_FLOAT: {
Float64 val = 0;
payload.readStrict(reinterpret_cast<char *>(&val), 4);
row.push_back(Field{Float64{val}});
break;
}
case MYSQL_TYPE_DOUBLE: {
Float64 val = 0;
payload.readStrict(reinterpret_cast<char *>(&val), 8);
row.push_back(Field{Float64{val}});
break;
}
case MYSQL_TYPE_TIMESTAMP: {
UInt32 val = 0;
payload.readStrict(reinterpret_cast<char *>(&val), 4);
switch (field_type)
{
case MYSQL_TYPE_TINY: {
Int8 val = 0;
payload.readStrict(reinterpret_cast<char *>(&val), 1);
row.push_back(Field{Int8{val}});
break;
}
case MYSQL_TYPE_SHORT: {
Int16 val = 0;
payload.readStrict(reinterpret_cast<char *>(&val), 2);
row.push_back(Field{Int16{val}});
break;
}
case MYSQL_TYPE_INT24: {
Int32 val = 0;
payload.readStrict(reinterpret_cast<char *>(&val), 3);
row.push_back(Field{Int32{val}});
break;
}
case MYSQL_TYPE_LONG: {
Int32 val = 0;
payload.readStrict(reinterpret_cast<char *>(&val), 4);
row.push_back(Field{Int32{val}});
break;
}
case MYSQL_TYPE_LONGLONG: {
Int64 val = 0;
payload.readStrict(reinterpret_cast<char *>(&val), 8);
row.push_back(Field{Int64{val}});
break;
}
case MYSQL_TYPE_FLOAT: {
Float32 val = 0;
payload.readStrict(reinterpret_cast<char *>(&val), 4);
row.push_back(Field{Float32{val}});
break;
}
case MYSQL_TYPE_DOUBLE: {
Float64 val = 0;
payload.readStrict(reinterpret_cast<char *>(&val), 8);
row.push_back(Field{Float64{val}});
break;
}
case MYSQL_TYPE_TIMESTAMP: {
UInt32 val = 0;
payload.readStrict(reinterpret_cast<char *>(&val), 4);
row.push_back(Field{UInt32{val}});
break;
}
case MYSQL_TYPE_VARCHAR:
case MYSQL_TYPE_VAR_STRING: {
uint32_t size = 0;
if (meta < 256)
{
payload.readStrict(reinterpret_cast<char *>(&size), 1);
}
else
{
payload.readStrict(reinterpret_cast<char *>(&size), 2);
time_t time = time_t(val);
std::tm * gtm = std::gmtime(&time);
char buffer[32];
std::strftime(buffer, 32, "%Y-%m-%d %H:%M:%S", gtm);
row.push_back(Field{String{buffer}});
break;
}
case MYSQL_TYPE_TIME: {
UInt32 i24 = 0;
payload.readStrict(reinterpret_cast<char *>(&i24), 3);
String val;
val.resize(size);
payload.readStrict(reinterpret_cast<char *>(val.data()), size);
row.push_back(Field{String{val}});
break;
}
case MYSQL_TYPE_STRING: {
UInt32 size = 0;
if (field_len < 256)
{
payload.readStrict(reinterpret_cast<char *>(&size), 1);
}
else
{
payload.readStrict(reinterpret_cast<char *>(&size), 2);
String time_buff;
time_buff.resize(8);
sprintf(
time_buff.data(),
"%02d:%02d:%02d",
static_cast<int>(i24 / 10000),
static_cast<int>(i24 % 10000) / 100,
static_cast<int>(i24 % 100));
row.push_back(Field{String{time_buff}});
break;
}
case MYSQL_TYPE_DATE: {
UInt32 i24 = 0;
payload.readStrict(reinterpret_cast<char *>(&i24), 3);
String val;
val.resize(size);
payload.readStrict(reinterpret_cast<char *>(val.data()), size);
row.push_back(Field{String{val}});
break;
}
case MYSQL_TYPE_GEOMETRY:
case MYSQL_TYPE_BLOB: {
UInt32 size = 0;
switch (meta)
{
case 1: {
String time_buff;
time_buff.resize(10);
sprintf(
time_buff.data(),
"%04d-%02d-%02d",
static_cast<int>((i24 >> 9) & 0x7fff),
static_cast<int>((i24 >> 5) & 0xf),
static_cast<int>(i24 & 0x1f));
row.push_back(Field{String{time_buff}});
break;
}
case MYSQL_TYPE_YEAR: {
Int32 val = 0;
payload.readStrict(reinterpret_cast<char *>(&val), 1);
row.push_back(Field{Int32{val + 1900}});
break;
}
case MYSQL_TYPE_TIME2: {
UInt32 val = 0, frac_part = 0;
char sign = 0x22;
readBigEndianStrict(payload, reinterpret_cast<char *>(&val), 3);
if (readBits(val, 0, 1, 24) == 0)
{
sign = '-';
val = ~val + 1;
}
UInt32 hour = readBits(val, 2, 10, 24);
UInt32 minute = readBits(val, 12, 6, 24);
UInt32 second = readBits(val, 18, 6, 24);
readTimeFractionalPart(payload, reinterpret_cast<char *>(&frac_part), meta);
if (frac_part != 0)
{
String time_buff;
time_buff.resize(16);
sprintf(
time_buff.data(),
"%c%02d:%02d:%02d.%06d",
static_cast<char>(sign),
static_cast<int>(hour),
static_cast<int>(minute),
static_cast<int>(second),
static_cast<int>(frac_part));
row.push_back(Field{String{time_buff}});
}
else
{
String time_buff;
time_buff.resize(9);
sprintf(
time_buff.data(),
"%c%02d:%02d:%02d",
static_cast<char>(sign),
static_cast<int>(hour),
static_cast<int>(minute),
static_cast<int>(second));
row.push_back(Field{String{time_buff}});
}
break;
}
case MYSQL_TYPE_DATETIME2: {
Int64 val = 0, fsp = 0;
readBigEndianStrict(payload, reinterpret_cast<char *>(&val), 5);
readTimeFractionalPart(payload, reinterpret_cast<char *>(&fsp), meta);
struct tm timeinfo;
UInt32 year_month = readBits(val, 1, 17, 40);
timeinfo.tm_year = year_month / 13 - 1900;
timeinfo.tm_mon = year_month % 13;
timeinfo.tm_mday = readBits(val, 18, 5, 40);
timeinfo.tm_hour = readBits(val, 23, 5, 40);
timeinfo.tm_min = readBits(val, 28, 6, 40);
timeinfo.tm_sec = readBits(val, 34, 6, 40);
time_t time = mktime(&timeinfo);
std::tm * gtm = std::gmtime(&time);
char buffer[32];
std::strftime(buffer, 32, "%Y-%m-%d %H:%M:%S", gtm);
row.push_back(Field{String{buffer}});
break;
}
case MYSQL_TYPE_TIMESTAMP2: {
UInt32 sec = 0, subsec = 0, whole_part = 0;
readBigEndianStrict(payload, reinterpret_cast<char *>(&sec), 4);
readTimeFractionalPart(payload, reinterpret_cast<char *>(&sec), meta);
whole_part = (sec + subsec / 1e6);
time_t time = time_t(whole_part);
std::tm * gtm = std::gmtime(&time);
char buffer[32];
std::strftime(buffer, 32, "%Y-%m-%d %H:%M:%S", gtm);
row.push_back(Field{String{buffer}});
break;
}
case MYSQL_TYPE_ENUM: {
Int32 val = 0;
Int32 len = (meta & 0xff);
switch (len)
{
case 1: {
payload.readStrict(reinterpret_cast<char *>(&val), 1);
break;
}
case 2: {
payload.readStrict(reinterpret_cast<char *>(&val), 2);
break;
}
default:
break;
}
row.push_back(Field{Int32{val}});
break;
}
case MYSQL_TYPE_VARCHAR:
case MYSQL_TYPE_VAR_STRING: {
uint32_t size = 0;
if (meta < 256)
{
payload.readStrict(reinterpret_cast<char *>(&size), 1);
break;
}
case 2: {
else
{
payload.readStrict(reinterpret_cast<char *>(&size), 2);
break;
}
case 3: {
payload.readStrict(reinterpret_cast<char *>(&size), 3);
break;
}
case 4: {
payload.readStrict(reinterpret_cast<char *>(&size), 4);
break;
}
default:
break;
}
String val;
val.resize(size);
payload.readStrict(reinterpret_cast<char *>(val.data()), size);
row.push_back(Field{String{val}});
break;
String val;
val.resize(size);
payload.readStrict(reinterpret_cast<char *>(val.data()), size);
row.push_back(Field{String{val}});
break;
}
case MYSQL_TYPE_STRING: {
UInt32 size = 0;
if (field_len < 256)
{
payload.readStrict(reinterpret_cast<char *>(&size), 1);
}
else
{
payload.readStrict(reinterpret_cast<char *>(&size), 2);
}
String val;
val.resize(size);
payload.readStrict(reinterpret_cast<char *>(val.data()), size);
row.push_back(Field{String{val}});
break;
}
case MYSQL_TYPE_GEOMETRY:
case MYSQL_TYPE_BLOB: {
UInt32 size = 0;
switch (meta)
{
case 1: {
payload.readStrict(reinterpret_cast<char *>(&size), 1);
break;
}
case 2: {
payload.readStrict(reinterpret_cast<char *>(&size), 2);
break;
}
case 3: {
payload.readStrict(reinterpret_cast<char *>(&size), 3);
break;
}
case 4: {
payload.readStrict(reinterpret_cast<char *>(&size), 4);
break;
}
default:
break;
}
String val;
val.resize(size);
payload.readStrict(reinterpret_cast<char *>(val.data()), size);
row.push_back(Field{String{val}});
break;
}
case MYSQL_TYPE_JSON: {
UInt32 size = 0;
payload.readStrict(reinterpret_cast<char *>(&size), meta);
String val;
val.resize(size);
payload.readStrict(reinterpret_cast<char *>(val.data()), size);
row.push_back(Field{String{val}});
break;
}
default:
throw ReplicationError(
"ParseRow: Unhandled MySQL field type:" + std::to_string(field_type), ErrorCodes::UNKNOWN_EXCEPTION);
}
default:
throw ReplicationError("ParseRow: Unhandled field type:" + std::to_string(field_type), ErrorCodes::UNKNOWN_EXCEPTION);
}
null_index++;
}
rows.push_back(row);
}
@ -589,6 +627,8 @@ namespace MySQLReplication
std::cerr << "[DryRun Event]" << std::endl;
}
void MySQLFlavor::setReplicateDatabase(String db) { replicate_do_db = std::move(db); }
void MySQLFlavor::readPayloadImpl(ReadBuffer & payload)
{
UInt16 header = static_cast<unsigned char>(*payload.position());
@ -630,6 +670,8 @@ namespace MySQLReplication
event->parseEvent(payload);
if (event->header.event_size > QUERY_EVENT_BEGIN_LENGTH)
position.updateLogPos(event->header.log_pos);
else
event = std::make_shared<DryRunEvent>();
break;
}
case XID_EVENT: {
@ -650,21 +692,30 @@ namespace MySQLReplication
}
case WRITE_ROWS_EVENT_V1:
case WRITE_ROWS_EVENT_V2: {
event = std::make_shared<WriteRowsEvent>(table_map);
if (do_replicate())
event = std::make_shared<WriteRowsEvent>(table_map);
else
event = std::make_shared<DryRunEvent>();
event->parseHeader(payload);
event->parseEvent(payload);
break;
}
case DELETE_ROWS_EVENT_V1:
case DELETE_ROWS_EVENT_V2: {
event = std::make_shared<DeleteRowsEvent>(table_map);
if (do_replicate())
event = std::make_shared<DeleteRowsEvent>(table_map);
else
event = std::make_shared<DryRunEvent>();
event->parseHeader(payload);
event->parseEvent(payload);
break;
}
case UPDATE_ROWS_EVENT_V1:
case UPDATE_ROWS_EVENT_V2: {
event = std::make_shared<UpdateRowsEvent>(table_map);
if (do_replicate())
event = std::make_shared<UpdateRowsEvent>(table_map);
else
event = std::make_shared<DryRunEvent>();
event->parseHeader(payload);
event->parseEvent(payload);
break;

View File

@ -6,6 +6,7 @@
#include <IO/WriteBuffer.h>
#include <map>
#include <boost/dynamic_bitset.hpp>
/// Implementation of MySQL replication protocol.
/// Works only on little-endian architecture.
@ -18,13 +19,80 @@ namespace MySQLReplication
static const int EVENT_HEADER_LENGTH = 19;
static const int CHECKSUM_CRC32_SIGNATURE_LENGTH = 4;
static const int QUERY_EVENT_BEGIN_LENGTH = 74;
static const int ROWS_HEADER_LEN_V2 = 10;
using Bitmap = boost::dynamic_bitset<>;
inline UInt64 readBits(UInt64 val, UInt8 start, UInt8 size, UInt8 length)
{
val = val >> (length - (start + size));
return val & (UInt64(1 << size) - 1);
}
inline void readBigEndianStrict(ReadBuffer & payload, char * to, size_t n)
{
payload.readStrict(to, n);
char *start = to, *end = to + n;
std::reverse(start, end);
}
inline void readTimeFractionalPart(ReadBuffer & payload, char * to, UInt16 meta)
{
switch (meta)
{
case 1:
case 2: {
readBigEndianStrict(payload, to, 1);
break;
}
case 3:
case 4: {
readBigEndianStrict(payload, to, 2);
break;
}
case 5:
case 6: {
readBigEndianStrict(payload, to, 3);
break;
}
default:
break;
}
}
inline void readBitmap(ReadBuffer & payload, Bitmap & bitmap, size_t bitmap_size)
{
String byte_buffer;
byte_buffer.resize(bitmap_size);
payload.readStrict(reinterpret_cast<char *>(byte_buffer.data()), bitmap_size);
bitmap.resize(bitmap_size * 8, false);
for (size_t i = 0; i < bitmap_size; ++i)
{
uint8_t tmp = byte_buffer[i];
boost::dynamic_bitset<>::size_type bit = i * 8;
if (tmp == 0)
continue;
if ((tmp & 0x01) != 0)
bitmap.set(bit);
if ((tmp & 0x02) != 0)
bitmap.set(bit + 1);
if ((tmp & 0x04) != 0)
bitmap.set(bit + 2);
if ((tmp & 0x08) != 0)
bitmap.set(bit + 3);
if ((tmp & 0x10) != 0)
bitmap.set(bit + 4);
if ((tmp & 0x20) != 0)
bitmap.set(bit + 5);
if ((tmp & 0x40) != 0)
bitmap.set(bit + 6);
if ((tmp & 0x80) != 0)
bitmap.set(bit + 7);
}
}
class EventBase;
using BinlogEventPtr = std::shared_ptr<EventBase>;
inline bool check_string_bit(String s, int k) { return (s[(k / 8)] & (1 << (k % 8))) != 0; }
enum BinlogChecksumAlg
{
BINLOG_CHECKSUM_ALG_OFF = 0,
@ -32,7 +100,22 @@ namespace MySQLReplication
BINLOG_CHECKSUM_ALG_ENUM_END,
BINLOG_CHECKSUM_ALG_UNDEF = 255
};
String ToString(BinlogChecksumAlg type);
inline String to_string(BinlogChecksumAlg type)
{
switch (type)
{
case BINLOG_CHECKSUM_ALG_OFF:
return "BINLOG_CHECKSUM_ALG_OFF";
case BINLOG_CHECKSUM_ALG_CRC32:
return "BINLOG_CHECKSUM_ALG_CRC32";
case BINLOG_CHECKSUM_ALG_ENUM_END:
return "BINLOG_CHECKSUM_ALG_ENUM_END";
case BINLOG_CHECKSUM_ALG_UNDEF:
return "BINLOG_CHECKSUM_ALG_UNDEF";
}
return std::string("Unknown checksum alg: ") + std::to_string(static_cast<int>(type));
}
/// http://dev.mysql.com/doc/internals/en/binlog-event-type.html
enum EventType
@ -84,7 +167,111 @@ namespace MySQLReplication
MARIA_GTID_LIST_EVENT = 163,
MARIA_START_ENCRYPTION_EVENT = 164,
};
String ToString(EventType type);
inline String to_string(EventType type)
{
switch (type)
{
case START_EVENT_V3:
return "StartEventV3";
case QUERY_EVENT:
return "QueryEvent";
case STOP_EVENT:
return "StopEvent";
case ROTATE_EVENT:
return "RotateEvent";
case INT_VAR_EVENT:
return "IntVarEvent";
case LOAD_EVENT:
return "LoadEvent";
case SLAVE_EVENT:
return "SlaveEvent";
case CREATE_FILE_EVENT:
return "CreateFileEvent";
case APPEND_BLOCK_EVENT:
return "AppendBlockEvent";
case EXEC_LOAD_EVENT:
return "ExecLoadEvent";
case DELETE_FILE_EVENT:
return "DeleteFileEvent";
case NEW_LOAD_EVENT:
return "NewLoadEvent";
case RAND_EVENT:
return "RandEvent";
case USER_VAR_EVENT:
return "UserVarEvent";
case FORMAT_DESCRIPTION_EVENT:
return "FormatDescriptionEvent";
case XID_EVENT:
return "XIDEvent";
case BEGIN_LOAD_QUERY_EVENT:
return "BeginLoadQueryEvent";
case EXECUTE_LOAD_QUERY_EVENT:
return "ExecuteLoadQueryEvent";
case TABLE_MAP_EVENT:
return "TableMapEvent";
case WRITE_ROWS_EVENT_V0:
return "WriteRowsEventV0";
case UPDATE_ROWS_EVENT_V0:
return "UpdateRowsEventV0";
case DELETE_ROWS_EVENT_V0:
return "DeleteRowsEventV0";
case WRITE_ROWS_EVENT_V1:
return "WriteRowsEventV1";
case UPDATE_ROWS_EVENT_V1:
return "UpdateRowsEventV1";
case DELETE_ROWS_EVENT_V1:
return "DeleteRowsEventV1";
case INCIDENT_EVENT:
return "IncidentEvent";
case HEARTBEAT_EVENT:
return "HeartbeatEvent";
case IGNORABLE_EVENT:
return "IgnorableEvent";
case ROWS_QUERY_EVENT:
return "RowsQueryEvent";
case WRITE_ROWS_EVENT_V2:
return "WriteRowsEventV2";
case UPDATE_ROWS_EVENT_V2:
return "UpdateRowsEventV2";
case DELETE_ROWS_EVENT_V2:
return "DeleteRowsEventV2";
case GTID_EVENT:
return "GTIDEvent";
case ANONYMOUS_GTID_EVENT:
return "AnonymousGTIDEvent";
case PREVIOUS_GTIDS_EVENT:
return "PreviousGTIDsEvent";
case TRANSACTION_CONTEXT_EVENT:
return "TransactionContextEvent";
case VIEW_CHANGE_EVENT:
return "ViewChangeEvent";
case XA_PREPARE_LOG_EVENT:
return "XAPrepareLogEvent";
case MARIA_ANNOTATE_ROWS_EVENT:
return "MariaAnnotateRowsEvent";
case MARIA_BINLOG_CHECKPOINT_EVENT:
return "MariaBinlogCheckpointEvent";
case MARIA_GTID_EVENT:
return "MariaGTIDEvent";
case MARIA_GTID_LIST_EVENT:
return "MariaGTIDListEvent";
case MARIA_START_ENCRYPTION_EVENT:
return "MariaStartEncryptionEvent";
default:
break;
}
return std::string("Unknown event: ") + std::to_string(static_cast<int>(type));
}
enum MySQLEventType
{
MYSQL_UNHANDLED_EVENT = 0,
MYSQL_QUERY_EVENT = 1,
MYSQL_WRITE_ROWS_EVENT = 2,
MYSQL_UPDATE_ROWS_EVENT = 3,
MYSQL_DELETE_ROWS_EVENT = 4,
};
class ReplicationError : public DB::Exception
{
@ -114,10 +301,8 @@ namespace MySQLReplication
virtual ~EventBase() = default;
virtual void print() const = 0;
virtual void parseHeader(ReadBuffer & payload) { header.parse(payload); }
virtual void parseEvent(ReadBuffer & payload) { parseImpl(payload); }
EventType type() const { return header.type; }
virtual MySQLEventType type() const { return MYSQL_UNHANDLED_EVENT; }
protected:
virtual void parseImpl(ReadBuffer & payload) = 0;
@ -125,7 +310,7 @@ namespace MySQLReplication
class FormatDescriptionEvent : public EventBase
{
public:
protected:
UInt16 binlog_version;
String server_version;
UInt32 create_timestamp;
@ -133,8 +318,6 @@ namespace MySQLReplication
String event_type_header_length;
void print() const override;
protected:
void parseImpl(ReadBuffer & payload) override;
private:
@ -166,6 +349,7 @@ namespace MySQLReplication
String query;
void print() const override;
MySQLEventType type() const override { return MYSQL_QUERY_EVENT; }
protected:
void parseImpl(ReadBuffer & payload) override;
@ -173,12 +357,10 @@ namespace MySQLReplication
class XIDEvent : public EventBase
{
public:
protected:
UInt64 xid;
void print() const override;
protected:
void parseImpl(ReadBuffer & payload) override;
};
@ -206,26 +388,29 @@ namespace MySQLReplication
class RowsEvent : public EventBase
{
public:
UInt64 table_id;
UInt16 flags;
UInt16 extra_data_len;
UInt32 number_columns;
String schema;
String table;
String columns_present_bitmap1;
String columns_present_bitmap2;
std::vector<Field> rows;
RowsEvent(std::shared_ptr<TableMapEvent> table_map_) : table_map(table_map_)
RowsEvent(std::shared_ptr<TableMapEvent> table_map_)
: number_columns(0), table_id(0), flags(0), extra_data_len(0), table_map(table_map_)
{
schema = table_map->schema;
table = table_map->table;
}
void print() const override;
protected:
UInt64 table_id;
UInt16 flags;
UInt16 extra_data_len;
Bitmap columns_present_bitmap1;
Bitmap columns_present_bitmap2;
void parseImpl(ReadBuffer & payload) override;
void parseRow(ReadBuffer & payload, String bitmap);
void parseRow(ReadBuffer & payload, Bitmap & bitmap);
private:
std::shared_ptr<TableMapEvent> table_map;
@ -235,18 +420,21 @@ namespace MySQLReplication
{
public:
WriteRowsEvent(std::shared_ptr<TableMapEvent> table_map_) : RowsEvent(table_map_) { }
MySQLEventType type() const override { return MYSQL_WRITE_ROWS_EVENT; }
};
class DeleteRowsEvent : public RowsEvent
{
public:
DeleteRowsEvent(std::shared_ptr<TableMapEvent> table_map_) : RowsEvent(table_map_) { }
MySQLEventType type() const override { return MYSQL_DELETE_ROWS_EVENT; }
};
class UpdateRowsEvent : public RowsEvent
{
public:
UpdateRowsEvent(std::shared_ptr<TableMapEvent> table_map_) : RowsEvent(table_map_) { }
MySQLEventType type() const override { return MYSQL_UPDATE_ROWS_EVENT; }
};
class DryRunEvent : public EventBase
@ -274,6 +462,7 @@ namespace MySQLReplication
virtual String getName() const = 0;
virtual Position getPosition() const = 0;
virtual BinlogEventPtr readOneEvent() = 0;
virtual void setReplicateDatabase(String db) = 0;
virtual ~IFlavor() = default;
};
@ -282,14 +471,18 @@ namespace MySQLReplication
public:
BinlogEventPtr event;
void readPayloadImpl(ReadBuffer & payload) override;
void setReplicateDatabase(String db) override;
String getName() const override { return "MySQL"; }
Position getPosition() const override { return position; }
void readPayloadImpl(ReadBuffer & payload) override;
BinlogEventPtr readOneEvent() override { return event; }
private:
Position position;
String replicate_do_db;
std::shared_ptr<TableMapEvent> table_map;
inline bool do_replicate() { return (replicate_do_db.empty() || (table_map->schema == replicate_do_db)); }
};
}

View File

@ -162,35 +162,68 @@ int main(int, char **)
}
{
UInt32 slave_id = 9004;
MySQLClient slave("127.0.0.1", 9001, "default", "123");
if (!slave.connect())
try
{
std::cerr << "Connect Error: " << slave.error() << std::endl;
UInt32 slave_id = 9004;
MySQLClient slave("127.0.0.1", 9001, "default", "123");
if (!slave.connect())
{
std::cerr << "Connect Error: " << slave.error() << std::endl;
return 1;
}
if (!slave.startBinlogDump(slave_id, "", "", 4))
{
std::cerr << "Connect Error: " << slave.error() << std::endl;
return 1;
}
while (true)
{
auto event = slave.readOneBinlogEvent();
switch (event->type())
{
case MYSQL_QUERY_EVENT: {
auto binlogEvent = std::dynamic_pointer_cast<QueryEvent>(event);
binlogEvent->print();
Position pos = slave.getPosition();
std::cerr << "Binlog Name: " << pos.binlog_name << ", Pos: " << pos.binlog_pos << std::endl;
break;
}
case MYSQL_WRITE_ROWS_EVENT: {
auto binlogEvent = std::dynamic_pointer_cast<WriteRowsEvent>(event);
binlogEvent->print();
Position pos = slave.getPosition();
std::cerr << "Binlog Name: " << pos.binlog_name << ", Pos: " << pos.binlog_pos << std::endl;
break;
}
case MYSQL_UPDATE_ROWS_EVENT: {
auto binlogEvent = std::dynamic_pointer_cast<UpdateRowsEvent>(event);
binlogEvent->print();
Position pos = slave.getPosition();
std::cerr << "Binlog Name: " << pos.binlog_name << ", Pos: " << pos.binlog_pos << std::endl;
break;
}
case MYSQL_DELETE_ROWS_EVENT: {
auto binlogEvent = std::dynamic_pointer_cast<DeleteRowsEvent>(event);
binlogEvent->print();
Position pos = slave.getPosition();
std::cerr << "Binlog Name: " << pos.binlog_name << ", Pos: " << pos.binlog_pos << std::endl;
break;
}
default:
break;
}
}
}
catch (const Exception & ex)
{
std::cerr << "Error: " << ex.message() << std::endl;
return 1;
}
if (!slave.ping())
{
std::cerr << "Connect Error: " << slave.error() << std::endl;
return 1;
}
if (!slave.startBinlogDump(slave_id, "", 4))
{
std::cerr << "Connect Error: " << slave.error() << std::endl;
assert(0);
}
while (true)
{
auto event = slave.readOneBinlogEvent();
ASSERT(event != nullptr)
event->print();
std::cerr << "Binlog Name: " << slave.getPosition().binlog_name << std::endl;
std::cerr << "Binlog Pos: " << slave.getPosition().binlog_pos << std::endl;
}
}
return 0;
}