diff --git a/src/Core/MySQLReplication.cpp b/src/Core/MySQLReplication.cpp index 4aead1974f8..1914c96b3bf 100644 --- a/src/Core/MySQLReplication.cpp +++ b/src/Core/MySQLReplication.cpp @@ -1,6 +1,7 @@ #include "MySQLReplication.h" #include +#include #include namespace DB @@ -335,6 +336,12 @@ namespace MySQLReplication } } + /// Types that do not used in the binlog event: + /// MYSQL_TYPE_ENUM + /// MYSQL_TYPE_SET + /// MYSQL_TYPE_TINY_BLOB + /// MYSQL_TYPE_MEDIUM_BLOB + /// MYSQL_TYPE_LONG_BLOB switch (field_type) { case MYSQL_TYPE_TINY: { @@ -490,6 +497,88 @@ namespace MySQLReplication row.push_back(Field{UInt64{sec}}); break; } + case MYSQL_TYPE_NEWDECIMAL: { + Int8 digits_per_integer = 9; + Int8 precision = meta >> 8; + Int8 decimals = meta & 0xff; + const char compressed_byte_map[] = {0, 1, 1, 2, 2, 3, 3, 4, 4, 4}; + + Int8 integral = (precision - decimals); + UInt32 uncompressed_integers = integral / digits_per_integer; + UInt32 uncompressed_decimals = decimals / digits_per_integer; + UInt32 compressed_integers = integral - (uncompressed_integers * digits_per_integer); + UInt32 compressed_decimals = decimals - (uncompressed_decimals * digits_per_integer); + + String buff; + UInt32 bytes_to_read = uncompressed_integers * 4 + compressed_byte_map[compressed_integers] + + uncompressed_decimals * 4 + compressed_byte_map[compressed_decimals]; + buff.resize(bytes_to_read); + payload.readStrict(reinterpret_cast(buff.data()), bytes_to_read); + + String format; + format.resize(0); + + bool is_negative = ((buff[0] & 0x80) == 0); + if (is_negative) + { + format += "-"; + } + buff[0] ^= 0x80; + + ReadBufferFromString reader(buff); + /// Compressed part. + if (compressed_integers != 0) + { + Int64 val = 0; + UInt32 to_read = compressed_byte_map[compressed_integers]; + readBigEndianStrict(reader, reinterpret_cast(&val), to_read); + format += std::to_string(val); + } + + for (auto k = 0; k < uncompressed_integers; k++) + { + UInt32 val = 0; + readBigEndianStrict(reader, reinterpret_cast(&val), 4); + format += std::to_string(val); + } + format += "."; + for (auto k = 0; k < uncompressed_decimals; k++) + { + UInt32 val = 0; + reader.readStrict(reinterpret_cast(&val), 4); + format += std::to_string(val); + } + + /// Compressed part. + if (compressed_decimals != 0) + { + Int64 val = 0; + String compressed_buff; + UInt32 to_read = compressed_byte_map[compressed_decimals]; + switch (to_read) + { + case 1: { + reader.readStrict(reinterpret_cast(&val), 1); + break; + } + case 2: { + readBigEndianStrict(reader, reinterpret_cast(&val), 2); + break; + } + case 3: { + readBigEndianStrict(reader, reinterpret_cast(&val), 3); + break; + } + case 4: { + readBigEndianStrict(reader, reinterpret_cast(&val), 4); + break; + } + } + format += std::to_string(val); + } + row.push_back(Field{String{format}}); + break; + } case MYSQL_TYPE_ENUM: { Int32 val = 0; Int32 len = (meta & 0xff); @@ -653,7 +742,7 @@ namespace MySQLReplication event->parseHeader(payload); event->parseEvent(payload); - auto rotate = std::dynamic_pointer_cast(event); + auto rotate = std::static_pointer_cast(event); position.updateLogPos(event->header.log_pos); position.updateLogName(rotate->next_binlog); break; @@ -663,7 +752,7 @@ namespace MySQLReplication event->parseHeader(payload); event->parseEvent(payload); - auto query = std::dynamic_pointer_cast(event); + auto query = std::static_pointer_cast(event); switch (query->typ) { case BEGIN: @@ -688,7 +777,7 @@ namespace MySQLReplication event->parseHeader(payload); event->parseEvent(payload); - table_map = std::dynamic_pointer_cast(event); + table_map = std::static_pointer_cast(event); position.updateLogPos(event->header.log_pos); break; } diff --git a/src/Core/tests/mysql_protocol.cpp b/src/Core/tests/mysql_protocol.cpp index 7b081f50602..89cd07d260e 100644 --- a/src/Core/tests/mysql_protocol.cpp +++ b/src/Core/tests/mysql_protocol.cpp @@ -184,7 +184,7 @@ int main(int, char **) switch (event->type()) { case MYSQL_QUERY_EVENT: { - auto binlogEvent = std::dynamic_pointer_cast(event); + auto binlogEvent = std::static_pointer_cast(event); binlogEvent->print(); Position pos = slave.getPosition(); @@ -192,7 +192,7 @@ int main(int, char **) break; } case MYSQL_WRITE_ROWS_EVENT: { - auto binlogEvent = std::dynamic_pointer_cast(event); + auto binlogEvent = std::static_pointer_cast(event); binlogEvent->print(); Position pos = slave.getPosition(); @@ -200,7 +200,7 @@ int main(int, char **) break; } case MYSQL_UPDATE_ROWS_EVENT: { - auto binlogEvent = std::dynamic_pointer_cast(event); + auto binlogEvent = std::static_pointer_cast(event); binlogEvent->print(); Position pos = slave.getPosition(); @@ -208,7 +208,7 @@ int main(int, char **) break; } case MYSQL_DELETE_ROWS_EVENT: { - auto binlogEvent = std::dynamic_pointer_cast(event); + auto binlogEvent = std::static_pointer_cast(event); binlogEvent->print(); Position pos = slave.getPosition();