diff --git a/src/Core/MySQL/IMySQLReadPacket.cpp b/src/Core/MySQL/IMySQLReadPacket.cpp index 8fc8855c8a4..5f6bbc7bceb 100644 --- a/src/Core/MySQL/IMySQLReadPacket.cpp +++ b/src/Core/MySQL/IMySQLReadPacket.cpp @@ -50,21 +50,22 @@ uint64_t readLengthEncodedNumber(ReadBuffer & buffer) uint64_t buf = 0; buffer.readStrict(c); auto cc = static_cast(c); - if (cc < 0xfc) + switch (cc) { - return cc; - } - else if (cc < 0xfd) - { - buffer.readStrict(reinterpret_cast(&buf), 2); - } - else if (cc < 0xfe) - { - buffer.readStrict(reinterpret_cast(&buf), 3); - } - else - { - buffer.readStrict(reinterpret_cast(&buf), 8); + /// NULL + case 0xfb: + break; + case 0xfc: + buffer.readStrict(reinterpret_cast(&buf), 2); + break; + case 0xfd: + buffer.readStrict(reinterpret_cast(&buf), 3); + break; + case 0xfe: + buffer.readStrict(reinterpret_cast(&buf), 8); + break; + default: + return cc; } return buf; } diff --git a/src/Core/MySQL/MySQLReplication.cpp b/src/Core/MySQL/MySQLReplication.cpp index 42d077260f8..e7f113ba7af 100644 --- a/src/Core/MySQL/MySQLReplication.cpp +++ b/src/Core/MySQL/MySQLReplication.cpp @@ -171,7 +171,7 @@ namespace MySQLReplication /// Ignore MySQL 8.0 optional metadata fields. /// https://mysqlhighavailability.com/more-metadata-is-written-into-binary-log/ - payload.ignore(payload.available() - CHECKSUM_CRC32_SIGNATURE_LENGTH); + payload.ignoreAll(); } /// Types that do not used in the binlog event: @@ -221,6 +221,7 @@ namespace MySQLReplication } case MYSQL_TYPE_NEWDECIMAL: case MYSQL_TYPE_STRING: { + /// Big-Endian auto b0 = UInt16(meta[pos] << 8); auto b1 = UInt8(meta[pos + 1]); column_meta.emplace_back(UInt16(b0 + b1)); @@ -231,6 +232,7 @@ namespace MySQLReplication case MYSQL_TYPE_BIT: case MYSQL_TYPE_VARCHAR: case MYSQL_TYPE_VAR_STRING: { + /// Little-Endian auto b0 = UInt8(meta[pos]); auto b1 = UInt16(meta[pos + 1] << 8); column_meta.emplace_back(UInt16(b0 + b1)); @@ -911,7 +913,7 @@ namespace MySQLReplication break; } } - payload.tryIgnore(CHECKSUM_CRC32_SIGNATURE_LENGTH); + payload.ignoreAll(); } } diff --git a/src/Core/tests/mysql_protocol.cpp b/src/Core/tests/mysql_protocol.cpp index acae8603c40..6cad095fc85 100644 --- a/src/Core/tests/mysql_protocol.cpp +++ b/src/Core/tests/mysql_protocol.cpp @@ -283,6 +283,7 @@ int main(int argc, char ** argv) } { + /// mysql_protocol --host=172.17.0.3 --user=root --password=123 --db=sbtest try { boost::program_options::options_description desc("Allowed options"); diff --git a/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp b/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp index 851ea351876..465a7cb912a 100644 --- a/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp +++ b/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp @@ -195,6 +195,7 @@ void MaterializeMySQLSyncThread::synchronization(const String & mysql_version) } catch (...) { + client.disconnect(); tryLogCurrentException(log); getDatabase(database_name).setException(std::current_exception()); } @@ -206,6 +207,7 @@ void MaterializeMySQLSyncThread::stopSynchronization() { sync_quit = true; background_thread_pool->join(); + client.disconnect(); } }