From 81998cb1afc0f616ac34bd76ecb990929826dafe Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Mon, 11 May 2020 12:15:06 +0800 Subject: [PATCH] Support DATETIME/TIMESTAMP type --- src/Core/MySQLClient.cpp | 13 +- src/Core/MySQLClient.h | 5 +- src/Core/MySQLReplication.cpp | 591 ++++++++++++++++-------------- src/Core/MySQLReplication.h | 237 ++++++++++-- src/Core/tests/mysql_protocol.cpp | 87 +++-- 5 files changed, 610 insertions(+), 323 deletions(-) diff --git a/src/Core/MySQLClient.cpp b/src/Core/MySQLClient.cpp index b7166701e99..8cbe645f4f9 100644 --- a/src/Core/MySQLClient.cpp +++ b/src/Core/MySQLClient.cpp @@ -126,21 +126,30 @@ bool MySQLClient::ping() return writeCommand(Command::COM_PING, ""); } -bool MySQLClient::startBinlogDump(UInt32 slave_id, String binlog_file_name, UInt64 binlog_pos) +bool MySQLClient::startBinlogDump(UInt32 slave_id, String replicate_db, String binlog_file_name, UInt64 binlog_pos) { + /// Set binlog checksum to CRC32. String checksum = "CRC32"; if (!writeCommand(Command::COM_QUERY, "SET @master_binlog_checksum = '" + checksum + "'")) { return false; } - /// 30s. + /// Set heartbeat 30s. UInt64 period_ns = (30 * 1e9); if (!writeCommand(Command::COM_QUERY, "SET @master_heartbeat_period = " + std::to_string(period_ns))) { return false; } + /// Set replication filter to master + /// This requires MySQL version >=5.6, so results are not checked here. + writeCommand(Command::COM_QUERY, "CHANGE REPLICATION FILTER REPLICATE_DO_DB = (" + replicate_db + ")"); + + /// Set Filter rule to replication. + replication.setReplicateDatabase(replicate_db); + + // Register slave. if (!registerSlaveOnMaster(slave_id)) { return false; diff --git a/src/Core/MySQLClient.h b/src/Core/MySQLClient.h index 55d4c834899..06ec81e6adb 100644 --- a/src/Core/MySQLClient.h +++ b/src/Core/MySQLClient.h @@ -32,8 +32,9 @@ public: void disconnect(); bool ping(); String error(); - bool startBinlogDump(UInt32 slave_id, String binlog_file_name, UInt64 binlog_pos); - BinlogEventPtr readOneBinlogEvent() ; + + bool startBinlogDump(UInt32 slave_id, String replicate_db, String binlog_file_name, UInt64 binlog_pos); + BinlogEventPtr readOneBinlogEvent(); Position getPosition() const { return replication.getPosition(); } private: diff --git a/src/Core/MySQLReplication.cpp b/src/Core/MySQLReplication.cpp index 15880f75810..7d87443c0bb 100644 --- a/src/Core/MySQLReplication.cpp +++ b/src/Core/MySQLReplication.cpp @@ -14,118 +14,6 @@ namespace MySQLReplication { using namespace MySQLProtocol; - String ToString(BinlogChecksumAlg type) - { - switch (type) - { - case BINLOG_CHECKSUM_ALG_OFF: - return "BINLOG_CHECKSUM_ALG_OFF"; - case BINLOG_CHECKSUM_ALG_CRC32: - return "BINLOG_CHECKSUM_ALG_CRC32"; - case BINLOG_CHECKSUM_ALG_ENUM_END: - return "BINLOG_CHECKSUM_ALG_ENUM_END"; - case BINLOG_CHECKSUM_ALG_UNDEF: - return "BINLOG_CHECKSUM_ALG_UNDEF"; - } - return std::string("Unknown checksum alg: ") + std::to_string(static_cast(type)); - } - - String ToString(EventType type) - { - switch (type) - { - case START_EVENT_V3: - return "StartEventV3"; - case QUERY_EVENT: - return "QueryEvent"; - case STOP_EVENT: - return "StopEvent"; - case ROTATE_EVENT: - return "RotateEvent"; - case INT_VAR_EVENT: - return "IntVarEvent"; - case LOAD_EVENT: - return "LoadEvent"; - case SLAVE_EVENT: - return "SlaveEvent"; - case CREATE_FILE_EVENT: - return "CreateFileEvent"; - case APPEND_BLOCK_EVENT: - return "AppendBlockEvent"; - case EXEC_LOAD_EVENT: - return "ExecLoadEvent"; - case DELETE_FILE_EVENT: - return "DeleteFileEvent"; - case NEW_LOAD_EVENT: - return "NewLoadEvent"; - case RAND_EVENT: - return "RandEvent"; - case USER_VAR_EVENT: - return "UserVarEvent"; - case FORMAT_DESCRIPTION_EVENT: - return "FormatDescriptionEvent"; - case XID_EVENT: - return "XIDEvent"; - case BEGIN_LOAD_QUERY_EVENT: - return "BeginLoadQueryEvent"; - case EXECUTE_LOAD_QUERY_EVENT: - return "ExecuteLoadQueryEvent"; - case TABLE_MAP_EVENT: - return "TableMapEvent"; - case WRITE_ROWS_EVENT_V0: - return "WriteRowsEventV0"; - case UPDATE_ROWS_EVENT_V0: - return "UpdateRowsEventV0"; - case DELETE_ROWS_EVENT_V0: - return "DeleteRowsEventV0"; - case WRITE_ROWS_EVENT_V1: - return "WriteRowsEventV1"; - case UPDATE_ROWS_EVENT_V1: - return "UpdateRowsEventV1"; - case DELETE_ROWS_EVENT_V1: - return "DeleteRowsEventV1"; - case INCIDENT_EVENT: - return "IncidentEvent"; - case HEARTBEAT_EVENT: - return "HeartbeatEvent"; - case IGNORABLE_EVENT: - return "IgnorableEvent"; - case ROWS_QUERY_EVENT: - return "RowsQueryEvent"; - case WRITE_ROWS_EVENT_V2: - return "WriteRowsEventV2"; - case UPDATE_ROWS_EVENT_V2: - return "UpdateRowsEventV2"; - case DELETE_ROWS_EVENT_V2: - return "DeleteRowsEventV2"; - case GTID_EVENT: - return "GTIDEvent"; - case ANONYMOUS_GTID_EVENT: - return "AnonymousGTIDEvent"; - case PREVIOUS_GTIDS_EVENT: - return "PreviousGTIDsEvent"; - case TRANSACTION_CONTEXT_EVENT: - return "TransactionContextEvent"; - case VIEW_CHANGE_EVENT: - return "ViewChangeEvent"; - case XA_PREPARE_LOG_EVENT: - return "XAPrepareLogEvent"; - case MARIA_ANNOTATE_ROWS_EVENT: - return "MariaAnnotateRowsEvent"; - case MARIA_BINLOG_CHECKPOINT_EVENT: - return "MariaBinlogCheckpointEvent"; - case MARIA_GTID_EVENT: - return "MariaGTIDEvent"; - case MARIA_GTID_LIST_EVENT: - return "MariaGTIDListEvent"; - case MARIA_START_ENCRYPTION_EVENT: - return "MariaStartEncryptionEvent"; - default: - break; - } - return std::string("Unknown event: ") + std::to_string(static_cast(type)); - } - /// https://dev.mysql.com/doc/internals/en/binlog-event-header.html void EventHeader::parse(ReadBuffer & payload) { @@ -139,7 +27,7 @@ namespace MySQLReplication void EventHeader::print() const { - std::cerr << "\n=== " << ToString(this->type) << " ===" << std::endl; + std::cerr << "\n=== " << to_string(this->type) << " ===" << std::endl; std::cerr << "Timestamp: " << this->timestamp << std::endl; std::cerr << "Event Type: " << this->type << std::endl; std::cerr << "Server ID: " << this->server_id << std::endl; @@ -353,19 +241,16 @@ namespace MySQLReplication payload.ignore(extra_data_len - 2); number_columns = readLengthEncodedNumber(payload); - size_t columns_bitmap_size = (number_columns + 8) / 7; + size_t columns_bitmap_size = (number_columns + 7) / 8; switch (header.type) { case UPDATE_ROWS_EVENT_V1: case UPDATE_ROWS_EVENT_V2: - columns_present_bitmap1.resize(columns_bitmap_size); - columns_present_bitmap2.resize(columns_bitmap_size); - payload.readStrict(reinterpret_cast(columns_present_bitmap1.data()), columns_bitmap_size); - payload.readStrict(reinterpret_cast(columns_present_bitmap2.data()), columns_bitmap_size); + readBitmap(payload, columns_present_bitmap1, columns_bitmap_size); + readBitmap(payload, columns_present_bitmap2, columns_bitmap_size); break; default: - columns_present_bitmap1.resize(columns_bitmap_size); - payload.readStrict(reinterpret_cast(columns_present_bitmap1.data()), columns_bitmap_size); + readBitmap(payload, columns_present_bitmap1, columns_bitmap_size); break; } @@ -379,186 +264,339 @@ namespace MySQLReplication } } - void RowsEvent::parseRow(ReadBuffer & payload, String bitmap) + void RowsEvent::parseRow(ReadBuffer & payload, Bitmap & bitmap) { + Tuple row; UInt32 field_type = 0; UInt32 field_len = 0; + UInt32 null_index = 0; - size_t columns_null_bitmap_size = (number_columns + 8) / 7; - String columns_null_bitmap; - columns_null_bitmap.resize(columns_null_bitmap_size); - payload.readStrict(reinterpret_cast(columns_null_bitmap.data()), columns_null_bitmap_size); + UInt32 re_count = 0; + for (auto i = 0U; i < number_columns; i++) + { + if (bitmap[i]) + re_count++; + } + re_count = (re_count + 7) / 8; + boost::dynamic_bitset<> columns_null_set; + readBitmap(payload, columns_null_set, re_count); - - Tuple row; for (auto i = 0U; i < number_columns; i++) { /// Column not presents. - if (!check_string_bit(bitmap, i)) - { + if (!bitmap[i]) continue; - } - /// NULL column. - if (check_string_bit(columns_null_bitmap, i)) + if (columns_null_set[null_index]) { row.push_back(Field{Null{}}); - continue; } - - field_type = table_map->column_type[i]; - auto meta = table_map->column_meta[i]; - if (field_type == MYSQL_TYPE_STRING) + else { - if (meta >= 256) + field_type = table_map->column_type[i]; + auto meta = table_map->column_meta[i]; + if (field_type == MYSQL_TYPE_STRING) { - UInt32 byte0 = meta >> 8; - UInt32 byte1 = meta & 0xff; - if ((byte0 & 0x30) != 0x30) + if (meta >= 256) { - field_len = byte1 | (((byte0 & 0x30) ^ 0x30) << 4); - field_type = byte0 | 0x30; - } - else - { - switch (byte0) + UInt32 byte0 = meta >> 8; + UInt32 byte1 = meta & 0xff; + if ((byte0 & 0x30) != 0x30) { - case MYSQL_TYPE_SET: - case MYSQL_TYPE_ENUM: - case MYSQL_TYPE_STRING: - field_type = byte0; - field_len = byte1; - break; - default: - throw ReplicationError("ParseRow: Illegal event", ErrorCodes::UNKNOWN_EXCEPTION); + field_len = byte1 | (((byte0 & 0x30) ^ 0x30) << 4); + field_type = byte0 | 0x30; + } + else + { + switch (byte0) + { + case MYSQL_TYPE_SET: + case MYSQL_TYPE_ENUM: + case MYSQL_TYPE_STRING: + field_type = byte0; + field_len = byte1; + break; + default: + throw ReplicationError("ParseRow: Unhandled binlog event", ErrorCodes::UNKNOWN_EXCEPTION); + } } } + else + { + field_len = meta; + } } - else + + switch (field_type) { - field_len = meta; - } - } + case MYSQL_TYPE_TINY: { + UInt8 val = 0; + payload.readStrict(reinterpret_cast(&val), 1); + row.push_back(Field{UInt8{val}}); + break; + } + case MYSQL_TYPE_SHORT: { + UInt16 val = 0; + payload.readStrict(reinterpret_cast(&val), 2); + row.push_back(Field{UInt16{val}}); + break; + } + case MYSQL_TYPE_INT24: { + Int32 val = 0; + payload.readStrict(reinterpret_cast(&val), 3); + row.push_back(Field{Int32{val}}); + break; + } + case MYSQL_TYPE_LONG: { + UInt32 val = 0; + payload.readStrict(reinterpret_cast(&val), 4); + row.push_back(Field{UInt32{val}}); + break; + } + case MYSQL_TYPE_LONGLONG: { + UInt64 val = 0; + payload.readStrict(reinterpret_cast(&val), 8); + row.push_back(Field{UInt64{val}}); + break; + } + case MYSQL_TYPE_FLOAT: { + Float64 val = 0; + payload.readStrict(reinterpret_cast(&val), 4); + row.push_back(Field{Float64{val}}); + break; + } + case MYSQL_TYPE_DOUBLE: { + Float64 val = 0; + payload.readStrict(reinterpret_cast(&val), 8); + row.push_back(Field{Float64{val}}); + break; + } + case MYSQL_TYPE_TIMESTAMP: { + UInt32 val = 0; + payload.readStrict(reinterpret_cast(&val), 4); - switch (field_type) - { - case MYSQL_TYPE_TINY: { - Int8 val = 0; - payload.readStrict(reinterpret_cast(&val), 1); - row.push_back(Field{Int8{val}}); - break; - } - case MYSQL_TYPE_SHORT: { - Int16 val = 0; - payload.readStrict(reinterpret_cast(&val), 2); - row.push_back(Field{Int16{val}}); - break; - } - case MYSQL_TYPE_INT24: { - Int32 val = 0; - payload.readStrict(reinterpret_cast(&val), 3); - row.push_back(Field{Int32{val}}); - break; - } - case MYSQL_TYPE_LONG: { - Int32 val = 0; - payload.readStrict(reinterpret_cast(&val), 4); - row.push_back(Field{Int32{val}}); - break; - } - case MYSQL_TYPE_LONGLONG: { - Int64 val = 0; - payload.readStrict(reinterpret_cast(&val), 8); - row.push_back(Field{Int64{val}}); - break; - } - case MYSQL_TYPE_FLOAT: { - Float32 val = 0; - payload.readStrict(reinterpret_cast(&val), 4); - row.push_back(Field{Float32{val}}); - break; - } - case MYSQL_TYPE_DOUBLE: { - Float64 val = 0; - payload.readStrict(reinterpret_cast(&val), 8); - row.push_back(Field{Float64{val}}); - break; - } - case MYSQL_TYPE_TIMESTAMP: { - UInt32 val = 0; - payload.readStrict(reinterpret_cast(&val), 4); - row.push_back(Field{UInt32{val}}); - break; - } - case MYSQL_TYPE_VARCHAR: - case MYSQL_TYPE_VAR_STRING: { - uint32_t size = 0; - if (meta < 256) - { - payload.readStrict(reinterpret_cast(&size), 1); - } - else - { - payload.readStrict(reinterpret_cast(&size), 2); + time_t time = time_t(val); + std::tm * gtm = std::gmtime(&time); + char buffer[32]; + std::strftime(buffer, 32, "%Y-%m-%d %H:%M:%S", gtm); + row.push_back(Field{String{buffer}}); + break; } + case MYSQL_TYPE_TIME: { + UInt32 i24 = 0; + payload.readStrict(reinterpret_cast(&i24), 3); - String val; - val.resize(size); - payload.readStrict(reinterpret_cast(val.data()), size); - row.push_back(Field{String{val}}); - break; - } - case MYSQL_TYPE_STRING: { - UInt32 size = 0; - if (field_len < 256) - { - payload.readStrict(reinterpret_cast(&size), 1); - } - else - { - payload.readStrict(reinterpret_cast(&size), 2); + String time_buff; + time_buff.resize(8); + sprintf( + time_buff.data(), + "%02d:%02d:%02d", + static_cast(i24 / 10000), + static_cast(i24 % 10000) / 100, + static_cast(i24 % 100)); + row.push_back(Field{String{time_buff}}); + break; } + case MYSQL_TYPE_DATE: { + UInt32 i24 = 0; + payload.readStrict(reinterpret_cast(&i24), 3); - String val; - val.resize(size); - payload.readStrict(reinterpret_cast(val.data()), size); - row.push_back(Field{String{val}}); - break; - } - case MYSQL_TYPE_GEOMETRY: - case MYSQL_TYPE_BLOB: { - UInt32 size = 0; - switch (meta) - { - case 1: { + String time_buff; + time_buff.resize(10); + sprintf( + time_buff.data(), + "%04d-%02d-%02d", + static_cast((i24 >> 9) & 0x7fff), + static_cast((i24 >> 5) & 0xf), + static_cast(i24 & 0x1f)); + row.push_back(Field{String{time_buff}}); + break; + } + case MYSQL_TYPE_YEAR: { + Int32 val = 0; + payload.readStrict(reinterpret_cast(&val), 1); + row.push_back(Field{Int32{val + 1900}}); + break; + } + case MYSQL_TYPE_TIME2: { + UInt32 val = 0, frac_part = 0; + char sign = 0x22; + + readBigEndianStrict(payload, reinterpret_cast(&val), 3); + if (readBits(val, 0, 1, 24) == 0) + { + sign = '-'; + val = ~val + 1; + } + UInt32 hour = readBits(val, 2, 10, 24); + UInt32 minute = readBits(val, 12, 6, 24); + UInt32 second = readBits(val, 18, 6, 24); + readTimeFractionalPart(payload, reinterpret_cast(&frac_part), meta); + + if (frac_part != 0) + { + String time_buff; + time_buff.resize(16); + sprintf( + time_buff.data(), + "%c%02d:%02d:%02d.%06d", + static_cast(sign), + static_cast(hour), + static_cast(minute), + static_cast(second), + static_cast(frac_part)); + row.push_back(Field{String{time_buff}}); + } + else + { + String time_buff; + time_buff.resize(9); + sprintf( + time_buff.data(), + "%c%02d:%02d:%02d", + static_cast(sign), + static_cast(hour), + static_cast(minute), + static_cast(second)); + row.push_back(Field{String{time_buff}}); + } + break; + } + case MYSQL_TYPE_DATETIME2: { + Int64 val = 0, fsp = 0; + readBigEndianStrict(payload, reinterpret_cast(&val), 5); + readTimeFractionalPart(payload, reinterpret_cast(&fsp), meta); + + struct tm timeinfo; + UInt32 year_month = readBits(val, 1, 17, 40); + timeinfo.tm_year = year_month / 13 - 1900; + timeinfo.tm_mon = year_month % 13; + timeinfo.tm_mday = readBits(val, 18, 5, 40); + timeinfo.tm_hour = readBits(val, 23, 5, 40); + timeinfo.tm_min = readBits(val, 28, 6, 40); + timeinfo.tm_sec = readBits(val, 34, 6, 40); + + time_t time = mktime(&timeinfo); + std::tm * gtm = std::gmtime(&time); + char buffer[32]; + std::strftime(buffer, 32, "%Y-%m-%d %H:%M:%S", gtm); + row.push_back(Field{String{buffer}}); + break; + } + case MYSQL_TYPE_TIMESTAMP2: { + UInt32 sec = 0, subsec = 0, whole_part = 0; + readBigEndianStrict(payload, reinterpret_cast(&sec), 4); + readTimeFractionalPart(payload, reinterpret_cast(&sec), meta); + + whole_part = (sec + subsec / 1e6); + time_t time = time_t(whole_part); + std::tm * gtm = std::gmtime(&time); + char buffer[32]; + std::strftime(buffer, 32, "%Y-%m-%d %H:%M:%S", gtm); + row.push_back(Field{String{buffer}}); + break; + } + case MYSQL_TYPE_ENUM: { + Int32 val = 0; + Int32 len = (meta & 0xff); + switch (len) + { + case 1: { + payload.readStrict(reinterpret_cast(&val), 1); + break; + } + case 2: { + payload.readStrict(reinterpret_cast(&val), 2); + break; + } + default: + break; + } + row.push_back(Field{Int32{val}}); + break; + } + case MYSQL_TYPE_VARCHAR: + case MYSQL_TYPE_VAR_STRING: { + uint32_t size = 0; + if (meta < 256) + { payload.readStrict(reinterpret_cast(&size), 1); - break; } - case 2: { + else + { payload.readStrict(reinterpret_cast(&size), 2); - break; } - case 3: { - payload.readStrict(reinterpret_cast(&size), 3); - break; - } - case 4: { - payload.readStrict(reinterpret_cast(&size), 4); - break; - } - default: - break; - } - String val; - val.resize(size); - payload.readStrict(reinterpret_cast(val.data()), size); - row.push_back(Field{String{val}}); - break; + String val; + val.resize(size); + payload.readStrict(reinterpret_cast(val.data()), size); + row.push_back(Field{String{val}}); + break; + } + case MYSQL_TYPE_STRING: { + UInt32 size = 0; + if (field_len < 256) + { + payload.readStrict(reinterpret_cast(&size), 1); + } + else + { + payload.readStrict(reinterpret_cast(&size), 2); + } + + String val; + val.resize(size); + payload.readStrict(reinterpret_cast(val.data()), size); + row.push_back(Field{String{val}}); + break; + } + case MYSQL_TYPE_GEOMETRY: + case MYSQL_TYPE_BLOB: { + UInt32 size = 0; + switch (meta) + { + case 1: { + payload.readStrict(reinterpret_cast(&size), 1); + break; + } + case 2: { + payload.readStrict(reinterpret_cast(&size), 2); + break; + } + case 3: { + payload.readStrict(reinterpret_cast(&size), 3); + break; + } + case 4: { + payload.readStrict(reinterpret_cast(&size), 4); + break; + } + default: + break; + } + + String val; + val.resize(size); + payload.readStrict(reinterpret_cast(val.data()), size); + row.push_back(Field{String{val}}); + break; + } + case MYSQL_TYPE_JSON: { + UInt32 size = 0; + payload.readStrict(reinterpret_cast(&size), meta); + + String val; + val.resize(size); + payload.readStrict(reinterpret_cast(val.data()), size); + row.push_back(Field{String{val}}); + break; + } + default: + throw ReplicationError( + "ParseRow: Unhandled MySQL field type:" + std::to_string(field_type), ErrorCodes::UNKNOWN_EXCEPTION); } - default: - throw ReplicationError("ParseRow: Unhandled field type:" + std::to_string(field_type), ErrorCodes::UNKNOWN_EXCEPTION); } + null_index++; } rows.push_back(row); } @@ -589,6 +627,8 @@ namespace MySQLReplication std::cerr << "[DryRun Event]" << std::endl; } + void MySQLFlavor::setReplicateDatabase(String db) { replicate_do_db = std::move(db); } + void MySQLFlavor::readPayloadImpl(ReadBuffer & payload) { UInt16 header = static_cast(*payload.position()); @@ -630,6 +670,8 @@ namespace MySQLReplication event->parseEvent(payload); if (event->header.event_size > QUERY_EVENT_BEGIN_LENGTH) position.updateLogPos(event->header.log_pos); + else + event = std::make_shared(); break; } case XID_EVENT: { @@ -650,21 +692,30 @@ namespace MySQLReplication } case WRITE_ROWS_EVENT_V1: case WRITE_ROWS_EVENT_V2: { - event = std::make_shared(table_map); + if (do_replicate()) + event = std::make_shared(table_map); + else + event = std::make_shared(); event->parseHeader(payload); event->parseEvent(payload); break; } case DELETE_ROWS_EVENT_V1: case DELETE_ROWS_EVENT_V2: { - event = std::make_shared(table_map); + if (do_replicate()) + event = std::make_shared(table_map); + else + event = std::make_shared(); event->parseHeader(payload); event->parseEvent(payload); break; } case UPDATE_ROWS_EVENT_V1: case UPDATE_ROWS_EVENT_V2: { - event = std::make_shared(table_map); + if (do_replicate()) + event = std::make_shared(table_map); + else + event = std::make_shared(); event->parseHeader(payload); event->parseEvent(payload); break; diff --git a/src/Core/MySQLReplication.h b/src/Core/MySQLReplication.h index a10d3779ada..1c33e2918a6 100644 --- a/src/Core/MySQLReplication.h +++ b/src/Core/MySQLReplication.h @@ -6,6 +6,7 @@ #include #include +#include /// Implementation of MySQL replication protocol. /// Works only on little-endian architecture. @@ -18,13 +19,80 @@ namespace MySQLReplication static const int EVENT_HEADER_LENGTH = 19; static const int CHECKSUM_CRC32_SIGNATURE_LENGTH = 4; static const int QUERY_EVENT_BEGIN_LENGTH = 74; - static const int ROWS_HEADER_LEN_V2 = 10; + + using Bitmap = boost::dynamic_bitset<>; + + inline UInt64 readBits(UInt64 val, UInt8 start, UInt8 size, UInt8 length) + { + val = val >> (length - (start + size)); + return val & (UInt64(1 << size) - 1); + } + + inline void readBigEndianStrict(ReadBuffer & payload, char * to, size_t n) + { + payload.readStrict(to, n); + char *start = to, *end = to + n; + std::reverse(start, end); + } + + inline void readTimeFractionalPart(ReadBuffer & payload, char * to, UInt16 meta) + { + switch (meta) + { + case 1: + case 2: { + readBigEndianStrict(payload, to, 1); + break; + } + case 3: + case 4: { + readBigEndianStrict(payload, to, 2); + break; + } + case 5: + case 6: { + readBigEndianStrict(payload, to, 3); + break; + } + default: + break; + } + } + + inline void readBitmap(ReadBuffer & payload, Bitmap & bitmap, size_t bitmap_size) + { + String byte_buffer; + byte_buffer.resize(bitmap_size); + payload.readStrict(reinterpret_cast(byte_buffer.data()), bitmap_size); + bitmap.resize(bitmap_size * 8, false); + for (size_t i = 0; i < bitmap_size; ++i) + { + uint8_t tmp = byte_buffer[i]; + boost::dynamic_bitset<>::size_type bit = i * 8; + if (tmp == 0) + continue; + if ((tmp & 0x01) != 0) + bitmap.set(bit); + if ((tmp & 0x02) != 0) + bitmap.set(bit + 1); + if ((tmp & 0x04) != 0) + bitmap.set(bit + 2); + if ((tmp & 0x08) != 0) + bitmap.set(bit + 3); + if ((tmp & 0x10) != 0) + bitmap.set(bit + 4); + if ((tmp & 0x20) != 0) + bitmap.set(bit + 5); + if ((tmp & 0x40) != 0) + bitmap.set(bit + 6); + if ((tmp & 0x80) != 0) + bitmap.set(bit + 7); + } + } class EventBase; using BinlogEventPtr = std::shared_ptr; - inline bool check_string_bit(String s, int k) { return (s[(k / 8)] & (1 << (k % 8))) != 0; } - enum BinlogChecksumAlg { BINLOG_CHECKSUM_ALG_OFF = 0, @@ -32,7 +100,22 @@ namespace MySQLReplication BINLOG_CHECKSUM_ALG_ENUM_END, BINLOG_CHECKSUM_ALG_UNDEF = 255 }; - String ToString(BinlogChecksumAlg type); + + inline String to_string(BinlogChecksumAlg type) + { + switch (type) + { + case BINLOG_CHECKSUM_ALG_OFF: + return "BINLOG_CHECKSUM_ALG_OFF"; + case BINLOG_CHECKSUM_ALG_CRC32: + return "BINLOG_CHECKSUM_ALG_CRC32"; + case BINLOG_CHECKSUM_ALG_ENUM_END: + return "BINLOG_CHECKSUM_ALG_ENUM_END"; + case BINLOG_CHECKSUM_ALG_UNDEF: + return "BINLOG_CHECKSUM_ALG_UNDEF"; + } + return std::string("Unknown checksum alg: ") + std::to_string(static_cast(type)); + } /// http://dev.mysql.com/doc/internals/en/binlog-event-type.html enum EventType @@ -84,7 +167,111 @@ namespace MySQLReplication MARIA_GTID_LIST_EVENT = 163, MARIA_START_ENCRYPTION_EVENT = 164, }; - String ToString(EventType type); + + inline String to_string(EventType type) + { + switch (type) + { + case START_EVENT_V3: + return "StartEventV3"; + case QUERY_EVENT: + return "QueryEvent"; + case STOP_EVENT: + return "StopEvent"; + case ROTATE_EVENT: + return "RotateEvent"; + case INT_VAR_EVENT: + return "IntVarEvent"; + case LOAD_EVENT: + return "LoadEvent"; + case SLAVE_EVENT: + return "SlaveEvent"; + case CREATE_FILE_EVENT: + return "CreateFileEvent"; + case APPEND_BLOCK_EVENT: + return "AppendBlockEvent"; + case EXEC_LOAD_EVENT: + return "ExecLoadEvent"; + case DELETE_FILE_EVENT: + return "DeleteFileEvent"; + case NEW_LOAD_EVENT: + return "NewLoadEvent"; + case RAND_EVENT: + return "RandEvent"; + case USER_VAR_EVENT: + return "UserVarEvent"; + case FORMAT_DESCRIPTION_EVENT: + return "FormatDescriptionEvent"; + case XID_EVENT: + return "XIDEvent"; + case BEGIN_LOAD_QUERY_EVENT: + return "BeginLoadQueryEvent"; + case EXECUTE_LOAD_QUERY_EVENT: + return "ExecuteLoadQueryEvent"; + case TABLE_MAP_EVENT: + return "TableMapEvent"; + case WRITE_ROWS_EVENT_V0: + return "WriteRowsEventV0"; + case UPDATE_ROWS_EVENT_V0: + return "UpdateRowsEventV0"; + case DELETE_ROWS_EVENT_V0: + return "DeleteRowsEventV0"; + case WRITE_ROWS_EVENT_V1: + return "WriteRowsEventV1"; + case UPDATE_ROWS_EVENT_V1: + return "UpdateRowsEventV1"; + case DELETE_ROWS_EVENT_V1: + return "DeleteRowsEventV1"; + case INCIDENT_EVENT: + return "IncidentEvent"; + case HEARTBEAT_EVENT: + return "HeartbeatEvent"; + case IGNORABLE_EVENT: + return "IgnorableEvent"; + case ROWS_QUERY_EVENT: + return "RowsQueryEvent"; + case WRITE_ROWS_EVENT_V2: + return "WriteRowsEventV2"; + case UPDATE_ROWS_EVENT_V2: + return "UpdateRowsEventV2"; + case DELETE_ROWS_EVENT_V2: + return "DeleteRowsEventV2"; + case GTID_EVENT: + return "GTIDEvent"; + case ANONYMOUS_GTID_EVENT: + return "AnonymousGTIDEvent"; + case PREVIOUS_GTIDS_EVENT: + return "PreviousGTIDsEvent"; + case TRANSACTION_CONTEXT_EVENT: + return "TransactionContextEvent"; + case VIEW_CHANGE_EVENT: + return "ViewChangeEvent"; + case XA_PREPARE_LOG_EVENT: + return "XAPrepareLogEvent"; + case MARIA_ANNOTATE_ROWS_EVENT: + return "MariaAnnotateRowsEvent"; + case MARIA_BINLOG_CHECKPOINT_EVENT: + return "MariaBinlogCheckpointEvent"; + case MARIA_GTID_EVENT: + return "MariaGTIDEvent"; + case MARIA_GTID_LIST_EVENT: + return "MariaGTIDListEvent"; + case MARIA_START_ENCRYPTION_EVENT: + return "MariaStartEncryptionEvent"; + default: + break; + } + return std::string("Unknown event: ") + std::to_string(static_cast(type)); + } + + enum MySQLEventType + { + MYSQL_UNHANDLED_EVENT = 0, + MYSQL_QUERY_EVENT = 1, + MYSQL_WRITE_ROWS_EVENT = 2, + MYSQL_UPDATE_ROWS_EVENT = 3, + MYSQL_DELETE_ROWS_EVENT = 4, + }; class ReplicationError : public DB::Exception { @@ -114,10 +301,8 @@ namespace MySQLReplication virtual ~EventBase() = default; virtual void print() const = 0; virtual void parseHeader(ReadBuffer & payload) { header.parse(payload); } - virtual void parseEvent(ReadBuffer & payload) { parseImpl(payload); } - - EventType type() const { return header.type; } + virtual MySQLEventType type() const { return MYSQL_UNHANDLED_EVENT; } protected: virtual void parseImpl(ReadBuffer & payload) = 0; @@ -125,7 +310,7 @@ namespace MySQLReplication class FormatDescriptionEvent : public EventBase { - public: + protected: UInt16 binlog_version; String server_version; UInt32 create_timestamp; @@ -133,8 +318,6 @@ namespace MySQLReplication String event_type_header_length; void print() const override; - - protected: void parseImpl(ReadBuffer & payload) override; private: @@ -166,6 +349,7 @@ namespace MySQLReplication String query; void print() const override; + MySQLEventType type() const override { return MYSQL_QUERY_EVENT; } protected: void parseImpl(ReadBuffer & payload) override; @@ -173,12 +357,10 @@ namespace MySQLReplication class XIDEvent : public EventBase { - public: + protected: UInt64 xid; void print() const override; - - protected: void parseImpl(ReadBuffer & payload) override; }; @@ -206,26 +388,29 @@ namespace MySQLReplication class RowsEvent : public EventBase { public: - UInt64 table_id; - UInt16 flags; - UInt16 extra_data_len; UInt32 number_columns; String schema; String table; - String columns_present_bitmap1; - String columns_present_bitmap2; std::vector rows; - RowsEvent(std::shared_ptr table_map_) : table_map(table_map_) + RowsEvent(std::shared_ptr table_map_) + : number_columns(0), table_id(0), flags(0), extra_data_len(0), table_map(table_map_) { schema = table_map->schema; table = table_map->table; } + void print() const override; protected: + UInt64 table_id; + UInt16 flags; + UInt16 extra_data_len; + Bitmap columns_present_bitmap1; + Bitmap columns_present_bitmap2; + void parseImpl(ReadBuffer & payload) override; - void parseRow(ReadBuffer & payload, String bitmap); + void parseRow(ReadBuffer & payload, Bitmap & bitmap); private: std::shared_ptr table_map; @@ -235,18 +420,21 @@ namespace MySQLReplication { public: WriteRowsEvent(std::shared_ptr table_map_) : RowsEvent(table_map_) { } + MySQLEventType type() const override { return MYSQL_WRITE_ROWS_EVENT; } }; class DeleteRowsEvent : public RowsEvent { public: DeleteRowsEvent(std::shared_ptr table_map_) : RowsEvent(table_map_) { } + MySQLEventType type() const override { return MYSQL_DELETE_ROWS_EVENT; } }; class UpdateRowsEvent : public RowsEvent { public: UpdateRowsEvent(std::shared_ptr table_map_) : RowsEvent(table_map_) { } + MySQLEventType type() const override { return MYSQL_UPDATE_ROWS_EVENT; } }; class DryRunEvent : public EventBase @@ -274,6 +462,7 @@ namespace MySQLReplication virtual String getName() const = 0; virtual Position getPosition() const = 0; virtual BinlogEventPtr readOneEvent() = 0; + virtual void setReplicateDatabase(String db) = 0; virtual ~IFlavor() = default; }; @@ -282,14 +471,18 @@ namespace MySQLReplication public: BinlogEventPtr event; + void readPayloadImpl(ReadBuffer & payload) override; + void setReplicateDatabase(String db) override; String getName() const override { return "MySQL"; } Position getPosition() const override { return position; } - void readPayloadImpl(ReadBuffer & payload) override; BinlogEventPtr readOneEvent() override { return event; } private: Position position; + String replicate_do_db; std::shared_ptr table_map; + + inline bool do_replicate() { return (replicate_do_db.empty() || (table_map->schema == replicate_do_db)); } }; } diff --git a/src/Core/tests/mysql_protocol.cpp b/src/Core/tests/mysql_protocol.cpp index 1c6b61b742e..7b081f50602 100644 --- a/src/Core/tests/mysql_protocol.cpp +++ b/src/Core/tests/mysql_protocol.cpp @@ -162,35 +162,68 @@ int main(int, char **) } { - UInt32 slave_id = 9004; - MySQLClient slave("127.0.0.1", 9001, "default", "123"); - if (!slave.connect()) + try { - std::cerr << "Connect Error: " << slave.error() << std::endl; + UInt32 slave_id = 9004; + MySQLClient slave("127.0.0.1", 9001, "default", "123"); + if (!slave.connect()) + { + std::cerr << "Connect Error: " << slave.error() << std::endl; + return 1; + } + + if (!slave.startBinlogDump(slave_id, "", "", 4)) + { + std::cerr << "Connect Error: " << slave.error() << std::endl; + return 1; + } + + while (true) + { + auto event = slave.readOneBinlogEvent(); + switch (event->type()) + { + case MYSQL_QUERY_EVENT: { + auto binlogEvent = std::dynamic_pointer_cast(event); + binlogEvent->print(); + + Position pos = slave.getPosition(); + std::cerr << "Binlog Name: " << pos.binlog_name << ", Pos: " << pos.binlog_pos << std::endl; + break; + } + case MYSQL_WRITE_ROWS_EVENT: { + auto binlogEvent = std::dynamic_pointer_cast(event); + binlogEvent->print(); + + Position pos = slave.getPosition(); + std::cerr << "Binlog Name: " << pos.binlog_name << ", Pos: " << pos.binlog_pos << std::endl; + break; + } + case MYSQL_UPDATE_ROWS_EVENT: { + auto binlogEvent = std::dynamic_pointer_cast(event); + binlogEvent->print(); + + Position pos = slave.getPosition(); + std::cerr << "Binlog Name: " << pos.binlog_name << ", Pos: " << pos.binlog_pos << std::endl; + break; + } + case MYSQL_DELETE_ROWS_EVENT: { + auto binlogEvent = std::dynamic_pointer_cast(event); + binlogEvent->print(); + + Position pos = slave.getPosition(); + std::cerr << "Binlog Name: " << pos.binlog_name << ", Pos: " << pos.binlog_pos << std::endl; + break; + } + default: + break; + } + } + } + catch (const Exception & ex) + { + std::cerr << "Error: " << ex.message() << std::endl; return 1; } - - if (!slave.ping()) - { - std::cerr << "Connect Error: " << slave.error() << std::endl; - return 1; - } - - if (!slave.startBinlogDump(slave_id, "", 4)) - { - std::cerr << "Connect Error: " << slave.error() << std::endl; - assert(0); - } - - while (true) - { - auto event = slave.readOneBinlogEvent(); - ASSERT(event != nullptr) - event->print(); - std::cerr << "Binlog Name: " << slave.getPosition().binlog_name << std::endl; - std::cerr << "Binlog Pos: " << slave.getPosition().binlog_pos << std::endl; - } } - - return 0; }