Fix review comment

This commit is contained in:
zhang2014 2020-09-17 14:21:38 +08:00
parent 9246e77b05
commit 0c81a8777e
2 changed files with 46 additions and 39 deletions

View File

@ -15,6 +15,7 @@ namespace ErrorCodes
{
extern const int UNKNOWN_EXCEPTION;
extern const int LOGICAL_ERROR;
extern const int ATTEMPT_TO_READ_AFTER_EOF;
}
namespace MySQLReplication
@ -800,6 +801,9 @@ namespace MySQLReplication
void MySQLFlavor::readPayloadImpl(ReadBuffer & payload)
{
if (payload.eof())
throw Exception("Attempt to read after EOF.", ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF);
UInt16 header = static_cast<unsigned char>(*payload.position());
switch (header)
{
@ -810,33 +814,33 @@ namespace MySQLReplication
err.readPayloadWithUnpacked(payload);
throw ReplicationError(err.error_message, ErrorCodes::UNKNOWN_EXCEPTION);
}
// skip the header flag.
// skip the generic response packets header flag.
payload.ignore(1);
MySQLBinlogEventReadBuffer event_payload(payload);
EventType event_type = static_cast<EventType>(*(event_payload.position() + 4));
switch (event_type)
EventHeader event_header;
event_header.parse(event_payload);
switch (event_header.type)
{
case FORMAT_DESCRIPTION_EVENT:
{
event = std::make_shared<FormatDescriptionEvent>();
event->parseHeader(event_payload);
event = std::make_shared<FormatDescriptionEvent>(std::move(event_header));
event->parseEvent(event_payload);
position.update(event);
break;
}
case ROTATE_EVENT:
{
event = std::make_shared<RotateEvent>();
event->parseHeader(event_payload);
event = std::make_shared<RotateEvent>(std::move(event_header));
event->parseEvent(event_payload);
position.update(event);
break;
}
case QUERY_EVENT:
{
event = std::make_shared<QueryEvent>();
event->parseHeader(event_payload);
event = std::make_shared<QueryEvent>(std::move(event_header));
event->parseEvent(event_payload);
auto query = std::static_pointer_cast<QueryEvent>(event);
@ -845,7 +849,7 @@ namespace MySQLReplication
case QUERY_EVENT_MULTI_TXN_FLAG:
case QUERY_EVENT_XA:
{
event = std::make_shared<DryRunEvent>();
event = std::make_shared<DryRunEvent>(std::move(query->header));
break;
}
default:
@ -855,16 +859,14 @@ namespace MySQLReplication
}
case XID_EVENT:
{
event = std::make_shared<XIDEvent>();
event->parseHeader(event_payload);
event = std::make_shared<XIDEvent>(std::move(event_header));
event->parseEvent(event_payload);
position.update(event);
break;
}
case TABLE_MAP_EVENT:
{
event = std::make_shared<TableMapEvent>();
event->parseHeader(event_payload);
event = std::make_shared<TableMapEvent>(std::move(event_header));
event->parseEvent(event_payload);
table_map = std::static_pointer_cast<TableMapEvent>(event);
break;
@ -873,11 +875,10 @@ namespace MySQLReplication
case WRITE_ROWS_EVENT_V2:
{
if (do_replicate())
event = std::make_shared<WriteRowsEvent>(table_map);
event = std::make_shared<WriteRowsEvent>(table_map, std::move(event_header));
else
event = std::make_shared<DryRunEvent>();
event = std::make_shared<DryRunEvent>(std::move(event_header));
event->parseHeader(event_payload);
event->parseEvent(event_payload);
break;
}
@ -885,11 +886,10 @@ namespace MySQLReplication
case DELETE_ROWS_EVENT_V2:
{
if (do_replicate())
event = std::make_shared<DeleteRowsEvent>(table_map);
event = std::make_shared<DeleteRowsEvent>(table_map, std::move(event_header));
else
event = std::make_shared<DryRunEvent>();
event = std::make_shared<DryRunEvent>(std::move(event_header));
event->parseHeader(event_payload);
event->parseEvent(event_payload);
break;
}
@ -897,26 +897,23 @@ namespace MySQLReplication
case UPDATE_ROWS_EVENT_V2:
{
if (do_replicate())
event = std::make_shared<UpdateRowsEvent>(table_map);
event = std::make_shared<UpdateRowsEvent>(table_map, std::move(event_header));
else
event = std::make_shared<DryRunEvent>();
event = std::make_shared<DryRunEvent>(std::move(event_header));
event->parseHeader(event_payload);
event->parseEvent(event_payload);
break;
}
case GTID_EVENT:
{
event = std::make_shared<GTIDEvent>();
event->parseHeader(event_payload);
event = std::make_shared<GTIDEvent>(std::move(event_header));
event->parseEvent(event_payload);
position.update(event);
break;
}
default:
{
event = std::make_shared<DryRunEvent>();
event->parseHeader(event_payload);
event = std::make_shared<DryRunEvent>(std::move(event_header));
event->parseEvent(event_payload);
break;
}

View File

@ -300,9 +300,10 @@ namespace MySQLReplication
public:
EventHeader header;
EventBase(EventHeader && header_) : header(std::move(header_)) {}
virtual ~EventBase() = default;
virtual void dump(std::ostream & out) const = 0;
virtual void parseHeader(ReadBuffer & payload) { header.parse(payload); }
virtual void parseEvent(ReadBuffer & payload) { parseImpl(payload); }
virtual MySQLEventType type() const { return MYSQL_UNHANDLED_EVENT; }
@ -313,7 +314,10 @@ namespace MySQLReplication
class FormatDescriptionEvent : public EventBase
{
public:
FormatDescriptionEvent() : binlog_version(0), create_timestamp(0), event_header_length(0) { }
FormatDescriptionEvent(EventHeader && header_)
: EventBase(std::move(header_)), binlog_version(0), create_timestamp(0), event_header_length(0)
{
}
protected:
UInt16 binlog_version;
@ -335,7 +339,7 @@ namespace MySQLReplication
UInt64 position;
String next_binlog;
RotateEvent() : position(0) { }
RotateEvent(EventHeader && header_) : EventBase(std::move(header_)), position(0) {}
void dump(std::ostream & out) const override;
protected:
@ -362,7 +366,11 @@ namespace MySQLReplication
String query;
QueryType typ = QUERY_EVENT_DDL;
QueryEvent() : thread_id(0), exec_time(0), schema_len(0), error_code(0), status_len(0) { }
QueryEvent(EventHeader && header_)
: EventBase(std::move(header_)), thread_id(0), exec_time(0), schema_len(0), error_code(0), status_len(0)
{
}
void dump(std::ostream & out) const override;
MySQLEventType type() const override { return MYSQL_QUERY_EVENT; }
@ -373,7 +381,7 @@ namespace MySQLReplication
class XIDEvent : public EventBase
{
public:
XIDEvent() : xid(0) { }
XIDEvent(EventHeader && header_) : EventBase(std::move(header_)), xid(0) {}
protected:
UInt64 xid;
@ -396,7 +404,7 @@ namespace MySQLReplication
std::vector<UInt16> column_meta;
Bitmap null_bitmap;
TableMapEvent() : table_id(0), flags(0), schema_len(0), table_len(0), column_count(0) { }
TableMapEvent(EventHeader && header_) : EventBase(std::move(header_)), table_id(0), flags(0), schema_len(0), table_len(0), column_count(0) {}
void dump(std::ostream & out) const override;
protected:
@ -412,8 +420,8 @@ namespace MySQLReplication
String table;
std::vector<Field> rows;
RowsEvent(std::shared_ptr<TableMapEvent> table_map_)
: number_columns(0), table_id(0), flags(0), extra_data_len(0), table_map(table_map_)
RowsEvent(std::shared_ptr<TableMapEvent> table_map_, EventHeader && header_)
: EventBase(std::move(header_)), number_columns(0), table_id(0), flags(0), extra_data_len(0), table_map(table_map_)
{
schema = table_map->schema;
table = table_map->table;
@ -438,21 +446,21 @@ namespace MySQLReplication
class WriteRowsEvent : public RowsEvent
{
public:
WriteRowsEvent(std::shared_ptr<TableMapEvent> table_map_) : RowsEvent(table_map_) { }
WriteRowsEvent(std::shared_ptr<TableMapEvent> table_map_, EventHeader && header_) : RowsEvent(table_map_, std::move(header_)) {}
MySQLEventType type() const override { return MYSQL_WRITE_ROWS_EVENT; }
};
class DeleteRowsEvent : public RowsEvent
{
public:
DeleteRowsEvent(std::shared_ptr<TableMapEvent> table_map_) : RowsEvent(table_map_) { }
DeleteRowsEvent(std::shared_ptr<TableMapEvent> table_map_, EventHeader && header_) : RowsEvent(table_map_, std::move(header_)) {}
MySQLEventType type() const override { return MYSQL_DELETE_ROWS_EVENT; }
};
class UpdateRowsEvent : public RowsEvent
{
public:
UpdateRowsEvent(std::shared_ptr<TableMapEvent> table_map_) : RowsEvent(table_map_) { }
UpdateRowsEvent(std::shared_ptr<TableMapEvent> table_map_, EventHeader && header_) : RowsEvent(table_map_, std::move(header_)) {}
MySQLEventType type() const override { return MYSQL_UPDATE_ROWS_EVENT; }
};
@ -462,7 +470,7 @@ namespace MySQLReplication
UInt8 commit_flag;
GTID gtid;
GTIDEvent() : commit_flag(0) { }
GTIDEvent(EventHeader && header_) : EventBase(std::move(header_)), commit_flag(0) {}
void dump(std::ostream & out) const override;
protected:
@ -471,6 +479,8 @@ namespace MySQLReplication
class DryRunEvent : public EventBase
{
public:
DryRunEvent(EventHeader && header_) : EventBase(std::move(header_)) {}
void dump(std::ostream & out) const override;
protected: