Merge pull request #14852 from zhang2014/fix/mysql_protocol_parser

Fix parse MySQL binlog event for MaterializeMySQL
This commit is contained in:
tavplubix 2020-09-17 19:17:30 +03:00 committed by GitHub
commit 5a890d7377
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 268 additions and 74 deletions

View File

@ -2,6 +2,7 @@
#include <DataTypes/DataTypeString.h>
#include <IO/ReadBufferFromString.h>
#include <IO/MySQLBinlogEventReadBuffer.h>
#include <IO/ReadHelpers.h>
#include <common/DateLUT.h>
#include <Common/FieldVisitors.h>
@ -14,6 +15,7 @@ namespace ErrorCodes
{
extern const int UNKNOWN_EXCEPTION;
extern const int LOGICAL_ERROR;
extern const int ATTEMPT_TO_READ_AFTER_EOF;
}
namespace MySQLReplication
@ -49,14 +51,13 @@ namespace MySQLReplication
{
payload.readStrict(reinterpret_cast<char *>(&binlog_version), 2);
assert(binlog_version == EVENT_VERSION_V4);
server_version.resize(50);
payload.readStrict(reinterpret_cast<char *>(server_version.data()), 50);
payload.readStrict(reinterpret_cast<char *>(&create_timestamp), 4);
payload.readStrict(reinterpret_cast<char *>(&event_header_length), 1);
assert(event_header_length == EVENT_HEADER_LENGTH);
size_t len = header.event_size - (2 + 50 + 4 + 1 + EVENT_HEADER_LENGTH) - 1;
event_type_header_length.resize(len);
payload.readStrict(reinterpret_cast<char *>(event_type_header_length.data()), len);
readStringUntilEOF(event_type_header_length, payload);
}
void FormatDescriptionEvent::dump(std::ostream & out) const
@ -72,9 +73,7 @@ namespace MySQLReplication
void RotateEvent::parseImpl(ReadBuffer & payload)
{
payload.readStrict(reinterpret_cast<char *>(&position), 8);
size_t len = header.event_size - EVENT_HEADER_LENGTH - 8 - CHECKSUM_CRC32_SIGNATURE_LENGTH;
next_binlog.resize(len);
payload.readStrict(reinterpret_cast<char *>(next_binlog.data()), len);
readStringUntilEOF(next_binlog, payload);
}
void RotateEvent::dump(std::ostream & out) const
@ -100,9 +99,7 @@ namespace MySQLReplication
payload.readStrict(reinterpret_cast<char *>(schema.data()), schema_len);
payload.ignore(1);
size_t len = payload.available() - CHECKSUM_CRC32_SIGNATURE_LENGTH;
query.resize(len);
payload.readStrict(reinterpret_cast<char *>(query.data()), len);
readStringUntilEOF(query, payload);
if (query.starts_with("BEGIN") || query.starts_with("COMMIT"))
{
typ = QUERY_EVENT_MULTI_TXN_FLAG;
@ -285,7 +282,7 @@ namespace MySQLReplication
break;
}
while (payload.available() > CHECKSUM_CRC32_SIGNATURE_LENGTH)
while (!payload.eof())
{
parseRow(payload, columns_present_bitmap1);
if (header.type == UPDATE_ROWS_EVENT_V1 || header.type == UPDATE_ROWS_EVENT_V2)
@ -738,7 +735,7 @@ namespace MySQLReplication
payload.readStrict(reinterpret_cast<char *>(&gtid.seq_no), 8);
/// Skip others.
payload.ignore(payload.available() - CHECKSUM_CRC32_SIGNATURE_LENGTH);
payload.ignoreAll();
}
void GTIDEvent::dump(std::ostream & out) const
@ -751,7 +748,7 @@ namespace MySQLReplication
out << "GTID Next: " << gtid_next << std::endl;
}
void DryRunEvent::parseImpl(ReadBuffer & payload) { payload.ignore(header.event_size - EVENT_HEADER_LENGTH); }
void DryRunEvent::parseImpl(ReadBuffer & payload) { payload.ignoreAll(); }
void DryRunEvent::dump(std::ostream & out) const
{
@ -804,6 +801,9 @@ namespace MySQLReplication
void MySQLFlavor::readPayloadImpl(ReadBuffer & payload)
{
if (payload.eof())
throw Exception("Attempt to read after EOF.", ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF);
UInt16 header = static_cast<unsigned char>(*payload.position());
switch (header)
{
@ -814,37 +814,42 @@ namespace MySQLReplication
err.readPayloadWithUnpacked(payload);
throw ReplicationError(err.error_message, ErrorCodes::UNKNOWN_EXCEPTION);
}
// skip the header flag.
// skip the generic response packets header flag.
payload.ignore(1);
EventType event_type = static_cast<EventType>(*(payload.position() + 4));
switch (event_type)
MySQLBinlogEventReadBuffer event_payload(payload);
EventHeader event_header;
event_header.parse(event_payload);
switch (event_header.type)
{
case FORMAT_DESCRIPTION_EVENT: {
event = std::make_shared<FormatDescriptionEvent>();
event->parseHeader(payload);
event->parseEvent(payload);
case FORMAT_DESCRIPTION_EVENT:
{
event = std::make_shared<FormatDescriptionEvent>(std::move(event_header));
event->parseEvent(event_payload);
position.update(event);
break;
}
case ROTATE_EVENT: {
event = std::make_shared<RotateEvent>();
event->parseHeader(payload);
event->parseEvent(payload);
case ROTATE_EVENT:
{
event = std::make_shared<RotateEvent>(std::move(event_header));
event->parseEvent(event_payload);
position.update(event);
break;
}
case QUERY_EVENT: {
event = std::make_shared<QueryEvent>();
event->parseHeader(payload);
event->parseEvent(payload);
case QUERY_EVENT:
{
event = std::make_shared<QueryEvent>(std::move(event_header));
event->parseEvent(event_payload);
auto query = std::static_pointer_cast<QueryEvent>(event);
switch (query->typ)
{
case QUERY_EVENT_MULTI_TXN_FLAG:
case QUERY_EVENT_XA: {
event = std::make_shared<DryRunEvent>();
case QUERY_EVENT_XA:
{
event = std::make_shared<DryRunEvent>(std::move(query->header));
break;
}
default:
@ -852,68 +857,67 @@ namespace MySQLReplication
}
break;
}
case XID_EVENT: {
event = std::make_shared<XIDEvent>();
event->parseHeader(payload);
event->parseEvent(payload);
case XID_EVENT:
{
event = std::make_shared<XIDEvent>(std::move(event_header));
event->parseEvent(event_payload);
position.update(event);
break;
}
case TABLE_MAP_EVENT: {
event = std::make_shared<TableMapEvent>();
event->parseHeader(payload);
event->parseEvent(payload);
case TABLE_MAP_EVENT:
{
event = std::make_shared<TableMapEvent>(std::move(event_header));
event->parseEvent(event_payload);
table_map = std::static_pointer_cast<TableMapEvent>(event);
break;
}
case WRITE_ROWS_EVENT_V1:
case WRITE_ROWS_EVENT_V2: {
case WRITE_ROWS_EVENT_V2:
{
if (do_replicate())
event = std::make_shared<WriteRowsEvent>(table_map);
event = std::make_shared<WriteRowsEvent>(table_map, std::move(event_header));
else
event = std::make_shared<DryRunEvent>();
event = std::make_shared<DryRunEvent>(std::move(event_header));
event->parseHeader(payload);
event->parseEvent(payload);
event->parseEvent(event_payload);
break;
}
case DELETE_ROWS_EVENT_V1:
case DELETE_ROWS_EVENT_V2: {
case DELETE_ROWS_EVENT_V2:
{
if (do_replicate())
event = std::make_shared<DeleteRowsEvent>(table_map);
event = std::make_shared<DeleteRowsEvent>(table_map, std::move(event_header));
else
event = std::make_shared<DryRunEvent>();
event = std::make_shared<DryRunEvent>(std::move(event_header));
event->parseHeader(payload);
event->parseEvent(payload);
event->parseEvent(event_payload);
break;
}
case UPDATE_ROWS_EVENT_V1:
case UPDATE_ROWS_EVENT_V2: {
case UPDATE_ROWS_EVENT_V2:
{
if (do_replicate())
event = std::make_shared<UpdateRowsEvent>(table_map);
event = std::make_shared<UpdateRowsEvent>(table_map, std::move(event_header));
else
event = std::make_shared<DryRunEvent>();
event = std::make_shared<DryRunEvent>(std::move(event_header));
event->parseHeader(payload);
event->parseEvent(payload);
event->parseEvent(event_payload);
break;
}
case GTID_EVENT: {
event = std::make_shared<GTIDEvent>();
event->parseHeader(payload);
event->parseEvent(payload);
case GTID_EVENT:
{
event = std::make_shared<GTIDEvent>(std::move(event_header));
event->parseEvent(event_payload);
position.update(event);
break;
}
default: {
event = std::make_shared<DryRunEvent>();
event->parseHeader(payload);
event->parseEvent(payload);
default:
{
event = std::make_shared<DryRunEvent>(std::move(event_header));
event->parseEvent(event_payload);
break;
}
}
payload.ignoreAll();
}
}

View File

@ -19,7 +19,6 @@ namespace MySQLReplication
{
static const int EVENT_VERSION_V4 = 4;
static const int EVENT_HEADER_LENGTH = 19;
static const int CHECKSUM_CRC32_SIGNATURE_LENGTH = 4;
using Bitmap = boost::dynamic_bitset<>;
@ -301,9 +300,10 @@ namespace MySQLReplication
public:
EventHeader header;
EventBase(EventHeader && header_) : header(std::move(header_)) {}
virtual ~EventBase() = default;
virtual void dump(std::ostream & out) const = 0;
virtual void parseHeader(ReadBuffer & payload) { header.parse(payload); }
virtual void parseEvent(ReadBuffer & payload) { parseImpl(payload); }
virtual MySQLEventType type() const { return MYSQL_UNHANDLED_EVENT; }
@ -314,7 +314,10 @@ namespace MySQLReplication
class FormatDescriptionEvent : public EventBase
{
public:
FormatDescriptionEvent() : binlog_version(0), create_timestamp(0), event_header_length(0) { }
FormatDescriptionEvent(EventHeader && header_)
: EventBase(std::move(header_)), binlog_version(0), create_timestamp(0), event_header_length(0)
{
}
protected:
UInt16 binlog_version;
@ -336,7 +339,7 @@ namespace MySQLReplication
UInt64 position;
String next_binlog;
RotateEvent() : position(0) { }
RotateEvent(EventHeader && header_) : EventBase(std::move(header_)), position(0) {}
void dump(std::ostream & out) const override;
protected:
@ -363,7 +366,11 @@ namespace MySQLReplication
String query;
QueryType typ = QUERY_EVENT_DDL;
QueryEvent() : thread_id(0), exec_time(0), schema_len(0), error_code(0), status_len(0) { }
QueryEvent(EventHeader && header_)
: EventBase(std::move(header_)), thread_id(0), exec_time(0), schema_len(0), error_code(0), status_len(0)
{
}
void dump(std::ostream & out) const override;
MySQLEventType type() const override { return MYSQL_QUERY_EVENT; }
@ -374,7 +381,7 @@ namespace MySQLReplication
class XIDEvent : public EventBase
{
public:
XIDEvent() : xid(0) { }
XIDEvent(EventHeader && header_) : EventBase(std::move(header_)), xid(0) {}
protected:
UInt64 xid;
@ -397,7 +404,7 @@ namespace MySQLReplication
std::vector<UInt16> column_meta;
Bitmap null_bitmap;
TableMapEvent() : table_id(0), flags(0), schema_len(0), table_len(0), column_count(0) { }
TableMapEvent(EventHeader && header_) : EventBase(std::move(header_)), table_id(0), flags(0), schema_len(0), table_len(0), column_count(0) {}
void dump(std::ostream & out) const override;
protected:
@ -413,8 +420,8 @@ namespace MySQLReplication
String table;
std::vector<Field> rows;
RowsEvent(std::shared_ptr<TableMapEvent> table_map_)
: number_columns(0), table_id(0), flags(0), extra_data_len(0), table_map(table_map_)
RowsEvent(std::shared_ptr<TableMapEvent> table_map_, EventHeader && header_)
: EventBase(std::move(header_)), number_columns(0), table_id(0), flags(0), extra_data_len(0), table_map(table_map_)
{
schema = table_map->schema;
table = table_map->table;
@ -439,21 +446,21 @@ namespace MySQLReplication
class WriteRowsEvent : public RowsEvent
{
public:
WriteRowsEvent(std::shared_ptr<TableMapEvent> table_map_) : RowsEvent(table_map_) { }
WriteRowsEvent(std::shared_ptr<TableMapEvent> table_map_, EventHeader && header_) : RowsEvent(table_map_, std::move(header_)) {}
MySQLEventType type() const override { return MYSQL_WRITE_ROWS_EVENT; }
};
class DeleteRowsEvent : public RowsEvent
{
public:
DeleteRowsEvent(std::shared_ptr<TableMapEvent> table_map_) : RowsEvent(table_map_) { }
DeleteRowsEvent(std::shared_ptr<TableMapEvent> table_map_, EventHeader && header_) : RowsEvent(table_map_, std::move(header_)) {}
MySQLEventType type() const override { return MYSQL_DELETE_ROWS_EVENT; }
};
class UpdateRowsEvent : public RowsEvent
{
public:
UpdateRowsEvent(std::shared_ptr<TableMapEvent> table_map_) : RowsEvent(table_map_) { }
UpdateRowsEvent(std::shared_ptr<TableMapEvent> table_map_, EventHeader && header_) : RowsEvent(table_map_, std::move(header_)) {}
MySQLEventType type() const override { return MYSQL_UPDATE_ROWS_EVENT; }
};
@ -463,7 +470,7 @@ namespace MySQLReplication
UInt8 commit_flag;
GTID gtid;
GTIDEvent() : commit_flag(0) { }
GTIDEvent(EventHeader && header_) : EventBase(std::move(header_)), commit_flag(0) {}
void dump(std::ostream & out) const override;
protected:
@ -472,6 +479,8 @@ namespace MySQLReplication
class DryRunEvent : public EventBase
{
public:
DryRunEvent(EventHeader && header_) : EventBase(std::move(header_)) {}
void dump(std::ostream & out) const override;
protected:

View File

@ -0,0 +1,70 @@
#include <IO/MySQLBinlogEventReadBuffer.h>
namespace DB
{
/// Wraps `in_` by reference (the source buffer is not owned and must outlive this object).
/// The base ReadBuffer is created with a null pointer and zero sizes, i.e. without memory of its own,
/// and nextIfAtEnd() is invoked right away so that nextImpl() can set up the first working buffer.
/// Because of that eager call, constructing the wrapper may already read from `in_` and may throw.
MySQLBinlogEventReadBuffer::MySQLBinlogEventReadBuffer(ReadBuffer & in_)
    : ReadBuffer(nullptr, 0, 0)
    , in(in_)
{
    nextIfAtEnd();
}
/// Publishes the next readable range of the wrapped buffer `in` through `working_buffer`, while always
/// holding back the last CHECKSUM_CRC32_SIGNATURE_LENGTH bytes of `in` so the reader never sees them.
///
/// State used across calls:
///  - checksum_buf        small staging area of CHECKSUM_CRC32_SIGNATURE_LENGTH bytes copied out of `in`;
///  - checksum_buff_size  number of bytes currently stored in checksum_buf;
///  - checksum_buff_limit number of those bytes that were published to the reader via working_buffer.
///    When both are equal, every staged byte has been handed out (or nothing is staged).
///
/// Returns true when working_buffer was (re)pointed at data, false when `in` is exhausted.
bool MySQLBinlogEventReadBuffer::nextImpl()
{
/// Nothing to refill while the current working buffer still reports pending data.
if (hasPendingData())
return true;
/// The source has nothing left at all.
if (in.eof())
return false;
if (checksum_buff_size == checksum_buff_limit)
{
/// No withheld bytes are parked in checksum_buf.
if (likely(in.available() > CHECKSUM_CRC32_SIGNATURE_LENGTH))
{
/// Fast path: point working_buffer directly at the memory of `in` (no copy), leaving out the last
/// CHECKSUM_CRC32_SIGNATURE_LENGTH bytes of its current buffer, then advance `in` past the published range.
working_buffer = ReadBuffer::Buffer(in.position(), in.buffer().end() - CHECKSUM_CRC32_SIGNATURE_LENGTH);
in.ignore(working_buffer.size());
return true;
}
/// Slow path: `in` currently offers no more than CHECKSUM_CRC32_SIGNATURE_LENGTH bytes, so stage exactly
/// that many bytes in checksum_buf. readStrict is expected to throw if `in` cannot supply them — TODO confirm.
in.readStrict(checksum_buf, CHECKSUM_CRC32_SIGNATURE_LENGTH);
checksum_buff_size = checksum_buff_limit = CHECKSUM_CRC32_SIGNATURE_LENGTH;
}
else
{
/// Some staged bytes were withheld last time: move the unpublished tail [limit, size) to the front ...
for (size_t index = 0; index < checksum_buff_size - checksum_buff_limit; ++index)
checksum_buf[index] = checksum_buf[checksum_buff_limit + index];
/// ... and top the staging area back up to CHECKSUM_CRC32_SIGNATURE_LENGTH bytes from `in`.
checksum_buff_size -= checksum_buff_limit;
size_t read_bytes = CHECKSUM_CRC32_SIGNATURE_LENGTH - checksum_buff_size;
in.readStrict(checksum_buf + checksum_buff_size, read_bytes); /// Minimum CHECKSUM_CRC32_SIGNATURE_LENGTH bytes
checksum_buff_size = checksum_buff_limit = CHECKSUM_CRC32_SIGNATURE_LENGTH;
}
/// If the source ended right after the staged bytes, those bytes were the trailing ones: publish nothing.
if (in.eof())
return false;
if (in.available() < CHECKSUM_CRC32_SIGNATURE_LENGTH)
{
/// Fewer than CHECKSUM_CRC32_SIGNATURE_LENGTH bytes are visible in `in`, so part of the staged data may
/// still belong to the trailing bytes. Publish only in.available() staged bytes and keep the rest back.
size_t left_move_size = CHECKSUM_CRC32_SIGNATURE_LENGTH - in.available();
checksum_buff_limit = checksum_buff_size - left_move_size;
}
/// Hand out the publishable prefix of the staging area.
working_buffer = ReadBuffer::Buffer(checksum_buf, checksum_buf + checksum_buff_limit);
return true;
}
/// On destruction, gives nextImpl() one more chance to run if the reader stopped at the end of the
/// working buffer; the purpose is to pull the withheld trailing 4 bytes out of `in`.
/// Any exception is logged instead of propagated, since it must not escape a destructor.
MySQLBinlogEventReadBuffer::~MySQLBinlogEventReadBuffer()
{
    try
    {
        /// ignore last 4 bytes
        nextIfAtEnd();
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}
}

View File

@ -0,0 +1,28 @@
#pragma once
#include <IO/ReadBuffer.h>
namespace DB
{
/// Read buffer adaptor over another ReadBuffer that hides the last
/// CHECKSUM_CRC32_SIGNATURE_LENGTH bytes of the wrapped stream from the reader.
/// The wrapped buffer is held by reference and must outlive this object.
class MySQLBinlogEventReadBuffer : public ReadBuffer
{
protected:
    /// Number of trailing bytes of `in` that are never exposed through this buffer.
    /// constexpr (implicitly inline) so that ODR-uses do not require an out-of-class definition.
    static constexpr size_t CHECKSUM_CRC32_SIGNATURE_LENGTH = 4;

    /// Wrapped source buffer (not owned).
    ReadBuffer & in;

    /// Number of bytes currently staged in checksum_buf.
    size_t checksum_buff_size = 0;
    /// Number of staged bytes that have been published to the reader.
    size_t checksum_buff_limit = 0;
    /// Staging area for bytes that may turn out to be the hidden trailing bytes.
    char checksum_buf[CHECKSUM_CRC32_SIGNATURE_LENGTH];

    bool nextImpl() override;

public:
    ~MySQLBinlogEventReadBuffer() override;

    /// explicit: construction reads from `in_` and destruction may consume more of it,
    /// so an accidental implicit conversion from ReadBuffer & must not be possible.
    explicit MySQLBinlogEventReadBuffer(ReadBuffer & in_);
};
}

View File

@ -0,0 +1,82 @@
#include <gtest/gtest.h>
#include <Common/Exception.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/MySQLBinlogEventReadBuffer.h>
using namespace DB;
/// Sources of 1, 2 and 3 bytes are each expected to make construction of the wrapper throw.
TEST(MySQLBinlogEventReadBuffer, CheckBoundary)
{
    for (size_t source_size = 1; source_size < 4; ++source_size)
    {
        std::vector<char> source_bytes(source_size, 0x01);
        ReadBufferFromMemory source(source_bytes.data(), source_size);

        EXPECT_THROW({ MySQLBinlogEventReadBuffer event_in(source); }, Exception);
    }
}
/// One contiguous 6-byte source: exactly 2 bytes must be readable, after which the wrapper reports EOF.
TEST(MySQLBinlogEventReadBuffer, NiceBufferSize)
{
    char payload[2];

    std::vector<char> source_bytes(6, 0x01);
    ReadBufferFromMemory source(source_bytes.data(), 6);
    MySQLBinlogEventReadBuffer event_in(source);

    event_in.readStrict(payload, 2);

    ASSERT_EQ(payload[0], 0x01);
    ASSERT_EQ(payload[1], 0x01);
    ASSERT_TRUE(event_in.eof());
}
/// The source is a concatenation of chunks of 2, 1, 2 and 3 bytes (8 bytes in total), none of which is
/// larger than the hidden trailer. Exactly 4 bytes must be readable before the wrapper reports EOF.
TEST(MySQLBinlogEventReadBuffer, BadBufferSizes)
{
    char payload[4];
    std::vector<ReadBufferPtr> owned_readers;
    std::vector<ReadBuffer *> raw_readers;
    std::vector<std::shared_ptr<std::vector<char>>> chunks;
    std::vector<size_t> chunk_sizes = {2, 1, 2, 3};

    for (const size_t chunk_size : chunk_sizes)
    {
        /// Keep both the bytes and the reader alive for the whole test; the concat buffer only gets raw pointers.
        auto chunk = std::make_shared<std::vector<char>>(chunk_size, 0x01);
        auto chunk_reader = std::make_shared<ReadBufferFromMemory>(chunk->data(), chunk_size);

        chunks.emplace_back(std::move(chunk));
        raw_readers.emplace_back(chunk_reader.get());
        owned_readers.emplace_back(std::move(chunk_reader));
    }

    ConcatReadBuffer concatenated(raw_readers);
    MySQLBinlogEventReadBuffer event_in(concatenated);

    event_in.readStrict(payload, 4);

    for (size_t i = 0; i < sizeof(payload); ++i)
        ASSERT_EQ(payload[i], 0x01);

    ASSERT_TRUE(event_in.eof());
}
/// The source is a concatenation of chunks of 6, 1, 3 and 6 bytes (16 bytes in total), mixing chunks
/// larger and smaller than the hidden trailer. Exactly 12 bytes must be readable before EOF.
TEST(MySQLBinlogEventReadBuffer, NiceAndBadBufferSizes)
{
    char payload[12];
    std::vector<ReadBufferPtr> owned_readers;
    std::vector<ReadBuffer *> raw_readers;
    std::vector<std::shared_ptr<std::vector<char>>> chunks;
    std::vector<size_t> chunk_sizes = {6, 1, 3, 6};

    for (const size_t chunk_size : chunk_sizes)
    {
        /// Keep both the bytes and the reader alive for the whole test; the concat buffer only gets raw pointers.
        auto chunk = std::make_shared<std::vector<char>>(chunk_size, 0x01);
        auto chunk_reader = std::make_shared<ReadBufferFromMemory>(chunk->data(), chunk_size);

        chunks.emplace_back(std::move(chunk));
        raw_readers.emplace_back(chunk_reader.get());
        owned_readers.emplace_back(std::move(chunk_reader));
    }

    ConcatReadBuffer concatenated(raw_readers);
    MySQLBinlogEventReadBuffer event_in(concatenated);

    event_in.readStrict(payload, 12);

    for (size_t i = 0; i < sizeof(payload); ++i)
        ASSERT_EQ(payload[i], 0x01);

    ASSERT_TRUE(event_in.eof());
}

View File

@ -28,6 +28,7 @@ SRCS(
MemoryReadWriteBuffer.cpp
MMapReadBufferFromFile.cpp
MMapReadBufferFromFileDescriptor.cpp
MySQLBinlogEventReadBuffer.cpp
MySQLPacketPayloadReadBuffer.cpp
MySQLPacketPayloadWriteBuffer.cpp
NullWriteBuffer.cpp