Add DEMICAL type

This commit is contained in:
BohuTANG 2020-05-12 21:22:55 +08:00 committed by zhang2014
parent 5219a34b32
commit 5c23583cda
2 changed files with 96 additions and 7 deletions

View File

@ -1,6 +1,7 @@
#include "MySQLReplication.h"
#include <DataTypes/DataTypeString.h>
#include <IO/ReadBufferFromString.h>
#include <Common/FieldVisitors.h>
namespace DB
@ -335,6 +336,12 @@ namespace MySQLReplication
}
}
/// Types that do not used in the binlog event:
/// MYSQL_TYPE_ENUM
/// MYSQL_TYPE_SET
/// MYSQL_TYPE_TINY_BLOB
/// MYSQL_TYPE_MEDIUM_BLOB
/// MYSQL_TYPE_LONG_BLOB
switch (field_type)
{
case MYSQL_TYPE_TINY: {
@ -490,6 +497,88 @@ namespace MySQLReplication
row.push_back(Field{UInt64{sec}});
break;
}
case MYSQL_TYPE_NEWDECIMAL: {
Int8 digits_per_integer = 9;
Int8 precision = meta >> 8;
Int8 decimals = meta & 0xff;
const char compressed_byte_map[] = {0, 1, 1, 2, 2, 3, 3, 4, 4, 4};
Int8 integral = (precision - decimals);
UInt32 uncompressed_integers = integral / digits_per_integer;
UInt32 uncompressed_decimals = decimals / digits_per_integer;
UInt32 compressed_integers = integral - (uncompressed_integers * digits_per_integer);
UInt32 compressed_decimals = decimals - (uncompressed_decimals * digits_per_integer);
String buff;
UInt32 bytes_to_read = uncompressed_integers * 4 + compressed_byte_map[compressed_integers]
+ uncompressed_decimals * 4 + compressed_byte_map[compressed_decimals];
buff.resize(bytes_to_read);
payload.readStrict(reinterpret_cast<char *>(buff.data()), bytes_to_read);
String format;
format.resize(0);
bool is_negative = ((buff[0] & 0x80) == 0);
if (is_negative)
{
format += "-";
}
buff[0] ^= 0x80;
ReadBufferFromString reader(buff);
/// Compressed part.
if (compressed_integers != 0)
{
Int64 val = 0;
UInt32 to_read = compressed_byte_map[compressed_integers];
readBigEndianStrict(reader, reinterpret_cast<char *>(&val), to_read);
format += std::to_string(val);
}
for (auto k = 0; k < uncompressed_integers; k++)
{
UInt32 val = 0;
readBigEndianStrict(reader, reinterpret_cast<char *>(&val), 4);
format += std::to_string(val);
}
format += ".";
for (auto k = 0; k < uncompressed_decimals; k++)
{
UInt32 val = 0;
reader.readStrict(reinterpret_cast<char *>(&val), 4);
format += std::to_string(val);
}
/// Compressed part.
if (compressed_decimals != 0)
{
Int64 val = 0;
String compressed_buff;
UInt32 to_read = compressed_byte_map[compressed_decimals];
switch (to_read)
{
case 1: {
reader.readStrict(reinterpret_cast<char *>(&val), 1);
break;
}
case 2: {
readBigEndianStrict(reader, reinterpret_cast<char *>(&val), 2);
break;
}
case 3: {
readBigEndianStrict(reader, reinterpret_cast<char *>(&val), 3);
break;
}
case 4: {
readBigEndianStrict(reader, reinterpret_cast<char *>(&val), 4);
break;
}
}
format += std::to_string(val);
}
row.push_back(Field{String{format}});
break;
}
case MYSQL_TYPE_ENUM: {
Int32 val = 0;
Int32 len = (meta & 0xff);
@ -653,7 +742,7 @@ namespace MySQLReplication
event->parseHeader(payload);
event->parseEvent(payload);
auto rotate = std::dynamic_pointer_cast<RotateEvent>(event);
auto rotate = std::static_pointer_cast<RotateEvent>(event);
position.updateLogPos(event->header.log_pos);
position.updateLogName(rotate->next_binlog);
break;
@ -663,7 +752,7 @@ namespace MySQLReplication
event->parseHeader(payload);
event->parseEvent(payload);
auto query = std::dynamic_pointer_cast<QueryEvent>(event);
auto query = std::static_pointer_cast<QueryEvent>(event);
switch (query->typ)
{
case BEGIN:
@ -688,7 +777,7 @@ namespace MySQLReplication
event->parseHeader(payload);
event->parseEvent(payload);
table_map = std::dynamic_pointer_cast<TableMapEvent>(event);
table_map = std::static_pointer_cast<TableMapEvent>(event);
position.updateLogPos(event->header.log_pos);
break;
}

View File

@ -184,7 +184,7 @@ int main(int, char **)
switch (event->type())
{
case MYSQL_QUERY_EVENT: {
auto binlogEvent = std::dynamic_pointer_cast<QueryEvent>(event);
auto binlogEvent = std::static_pointer_cast<QueryEvent>(event);
binlogEvent->print();
Position pos = slave.getPosition();
@ -192,7 +192,7 @@ int main(int, char **)
break;
}
case MYSQL_WRITE_ROWS_EVENT: {
auto binlogEvent = std::dynamic_pointer_cast<WriteRowsEvent>(event);
auto binlogEvent = std::static_pointer_cast<WriteRowsEvent>(event);
binlogEvent->print();
Position pos = slave.getPosition();
@ -200,7 +200,7 @@ int main(int, char **)
break;
}
case MYSQL_UPDATE_ROWS_EVENT: {
auto binlogEvent = std::dynamic_pointer_cast<UpdateRowsEvent>(event);
auto binlogEvent = std::static_pointer_cast<UpdateRowsEvent>(event);
binlogEvent->print();
Position pos = slave.getPosition();
@ -208,7 +208,7 @@ int main(int, char **)
break;
}
case MYSQL_DELETE_ROWS_EVENT: {
auto binlogEvent = std::dynamic_pointer_cast<DeleteRowsEvent>(event);
auto binlogEvent = std::static_pointer_cast<DeleteRowsEvent>(event);
binlogEvent->print();
Position pos = slave.getPosition();