Fix handling of TABLE_MAP_EVENT from MySQL

The MySQL replication code assumed that row update events would be
preceded by a single TABLE_MAP_EVENT.  However, if a single SQL
statement modifies rows in multiple tables, MySQL will first send
table map events for all involved tables, and then row update events.

Depending on circumstances, this could cause an exception when the row
update was processed, cause the update to be incorrectly dropped, or
cause the update to be applied to the wrong table.
This commit is contained in:
Haavard Kvaalen 2021-02-04 11:37:12 +01:00
parent c13759dd85
commit e6f1ce48fe
4 changed files with 86 additions and 16 deletions

View File

@ -136,6 +136,7 @@ namespace MySQLReplication
out << "XID: " << this->xid << '\n'; out << "XID: " << this->xid << '\n';
} }
/// https://dev.mysql.com/doc/internals/en/table-map-event.html
void TableMapEvent::parseImpl(ReadBuffer & payload) void TableMapEvent::parseImpl(ReadBuffer & payload)
{ {
payload.readStrict(reinterpret_cast<char *>(&table_id), 6); payload.readStrict(reinterpret_cast<char *>(&table_id), 6);
@ -257,15 +258,19 @@ namespace MySQLReplication
out << "Null Bitmap: " << bitmap_str << '\n'; out << "Null Bitmap: " << bitmap_str << '\n';
} }
void RowsEvent::parseImpl(ReadBuffer & payload) void RowsEventHeader::parse(ReadBuffer & payload)
{ {
payload.readStrict(reinterpret_cast<char *>(&table_id), 6); payload.readStrict(reinterpret_cast<char *>(&table_id), 6);
payload.readStrict(reinterpret_cast<char *>(&flags), 2); payload.readStrict(reinterpret_cast<char *>(&flags), 2);
UInt16 extra_data_len;
/// This extra_data_len contains the 2 bytes length. /// This extra_data_len contains the 2 bytes length.
payload.readStrict(reinterpret_cast<char *>(&extra_data_len), 2); payload.readStrict(reinterpret_cast<char *>(&extra_data_len), 2);
payload.ignore(extra_data_len - 2); payload.ignore(extra_data_len - 2);
}
void RowsEvent::parseImpl(ReadBuffer & payload)
{
number_columns = readLengthEncodedNumber(payload); number_columns = readLengthEncodedNumber(payload);
size_t columns_bitmap_size = (number_columns + 7) / 8; size_t columns_bitmap_size = (number_columns + 7) / 8;
switch (header.type) switch (header.type)
@ -795,37 +800,50 @@ namespace MySQLReplication
{ {
event = std::make_shared<TableMapEvent>(std::move(event_header)); event = std::make_shared<TableMapEvent>(std::move(event_header));
event->parseEvent(event_payload); event->parseEvent(event_payload);
table_map = std::static_pointer_cast<TableMapEvent>(event); auto table_map = std::static_pointer_cast<TableMapEvent>(event);
table_maps[table_map->table_id] = table_map;
break; break;
} }
case WRITE_ROWS_EVENT_V1: case WRITE_ROWS_EVENT_V1:
case WRITE_ROWS_EVENT_V2: { case WRITE_ROWS_EVENT_V2: {
if (doReplicate()) RowsEventHeader rows_header(event_header.type);
event = std::make_shared<WriteRowsEvent>(table_map, std::move(event_header)); rows_header.parse(event_payload);
if (doReplicate(rows_header.table_id))
event = std::make_shared<WriteRowsEvent>(table_maps.at(rows_header.table_id), std::move(event_header), rows_header);
else else
event = std::make_shared<DryRunEvent>(std::move(event_header)); event = std::make_shared<DryRunEvent>(std::move(event_header));
event->parseEvent(event_payload); event->parseEvent(event_payload);
if (rows_header.flags & ROWS_END_OF_STATEMENT)
table_maps.clear();
break; break;
} }
case DELETE_ROWS_EVENT_V1: case DELETE_ROWS_EVENT_V1:
case DELETE_ROWS_EVENT_V2: { case DELETE_ROWS_EVENT_V2: {
if (doReplicate()) RowsEventHeader rows_header(event_header.type);
event = std::make_shared<DeleteRowsEvent>(table_map, std::move(event_header)); rows_header.parse(event_payload);
if (doReplicate(rows_header.table_id))
event = std::make_shared<DeleteRowsEvent>(table_maps.at(rows_header.table_id), std::move(event_header), rows_header);
else else
event = std::make_shared<DryRunEvent>(std::move(event_header)); event = std::make_shared<DryRunEvent>(std::move(event_header));
event->parseEvent(event_payload); event->parseEvent(event_payload);
if (rows_header.flags & ROWS_END_OF_STATEMENT)
table_maps.clear();
break; break;
} }
case UPDATE_ROWS_EVENT_V1: case UPDATE_ROWS_EVENT_V1:
case UPDATE_ROWS_EVENT_V2: { case UPDATE_ROWS_EVENT_V2: {
if (doReplicate()) RowsEventHeader rows_header(event_header.type);
event = std::make_shared<UpdateRowsEvent>(table_map, std::move(event_header)); rows_header.parse(event_payload);
if (doReplicate(rows_header.table_id))
event = std::make_shared<UpdateRowsEvent>(table_maps.at(rows_header.table_id), std::move(event_header), rows_header);
else else
event = std::make_shared<DryRunEvent>(std::move(event_header)); event = std::make_shared<DryRunEvent>(std::move(event_header));
event->parseEvent(event_payload); event->parseEvent(event_payload);
if (rows_header.flags & ROWS_END_OF_STATEMENT)
table_maps.clear();
break; break;
} }
case GTID_EVENT: case GTID_EVENT:
@ -843,6 +861,18 @@ namespace MySQLReplication
} }
} }
} }
/// Tells whether a rows event that refers to `table_id` should be replicated.
/// Returns false when `replicate_do_db` is empty or when the id is the "dummy event" id;
/// otherwise returns whether the schema of the mapped table equals `replicate_do_db`.
/// NOTE(review): an empty `replicate_do_db` turns replication off for every table here — confirm this is intended.
bool MySQLFlavor::doReplicate(UInt64 table_id)
{
    /// Table id used by the special "dummy event".
    constexpr UInt64 dummy_event_table_id = 0x00ffffff;

    const bool never_replicated = replicate_do_db.empty() || table_id == dummy_event_table_id;
    if (never_replicated)
        return false;

    /// at() throws if no table map has been stored for this id.
    const auto & mapped_table = table_maps.at(table_id);
    return mapped_table->schema == replicate_do_db;
}
} }
} }

View File

@ -430,6 +430,22 @@ namespace MySQLReplication
void parseMeta(String meta); void parseMeta(String meta);
}; };
/// Bit flags tested against the 2-byte `flags` field of a rows event header.
enum RowsEventFlags
{
    /// Lowest bit. NOTE(review): presumably marks the last rows event of a statement — confirm against the MySQL docs.
    ROWS_END_OF_STATEMENT = 0x01,
};
/// Leading fields of a rows event (event type, table id, flags), kept apart from the row data
/// so they can be read before the event object itself is constructed.
class RowsEventHeader
{
public:
    EventType type;
    /// Both fields start at zero and are filled in by parse().
    UInt64 table_id = 0;
    UInt16 flags = 0;

    RowsEventHeader(EventType type_) : type(type_) {}

    /// Reads the header fields from `payload`.
    void parse(ReadBuffer & payload);
};
class RowsEvent : public EventBase class RowsEvent : public EventBase
{ {
public: public:
@ -438,9 +454,11 @@ namespace MySQLReplication
String table; String table;
std::vector<Field> rows; std::vector<Field> rows;
RowsEvent(std::shared_ptr<TableMapEvent> table_map_, EventHeader && header_) RowsEvent(std::shared_ptr<TableMapEvent> table_map_, EventHeader && header_, const RowsEventHeader & rows_header)
: EventBase(std::move(header_)), number_columns(0), table_id(0), flags(0), extra_data_len(0), table_map(table_map_) : EventBase(std::move(header_)), number_columns(0), table_map(table_map_)
{ {
table_id = rows_header.table_id;
flags = rows_header.flags;
schema = table_map->schema; schema = table_map->schema;
table = table_map->table; table = table_map->table;
} }
@ -450,7 +468,6 @@ namespace MySQLReplication
protected: protected:
UInt64 table_id; UInt64 table_id;
UInt16 flags; UInt16 flags;
UInt16 extra_data_len;
Bitmap columns_present_bitmap1; Bitmap columns_present_bitmap1;
Bitmap columns_present_bitmap2; Bitmap columns_present_bitmap2;
@ -464,21 +481,24 @@ namespace MySQLReplication
class WriteRowsEvent : public RowsEvent class WriteRowsEvent : public RowsEvent
{ {
public: public:
WriteRowsEvent(std::shared_ptr<TableMapEvent> table_map_, EventHeader && header_) : RowsEvent(table_map_, std::move(header_)) {} WriteRowsEvent(std::shared_ptr<TableMapEvent> table_map_, EventHeader && header_, const RowsEventHeader & rows_header)
: RowsEvent(table_map_, std::move(header_), rows_header) {}
MySQLEventType type() const override { return MYSQL_WRITE_ROWS_EVENT; } MySQLEventType type() const override { return MYSQL_WRITE_ROWS_EVENT; }
}; };
class DeleteRowsEvent : public RowsEvent class DeleteRowsEvent : public RowsEvent
{ {
public: public:
DeleteRowsEvent(std::shared_ptr<TableMapEvent> table_map_, EventHeader && header_) : RowsEvent(table_map_, std::move(header_)) {} DeleteRowsEvent(std::shared_ptr<TableMapEvent> table_map_, EventHeader && header_, const RowsEventHeader & rows_header)
: RowsEvent(table_map_, std::move(header_), rows_header) {}
MySQLEventType type() const override { return MYSQL_DELETE_ROWS_EVENT; } MySQLEventType type() const override { return MYSQL_DELETE_ROWS_EVENT; }
}; };
class UpdateRowsEvent : public RowsEvent class UpdateRowsEvent : public RowsEvent
{ {
public: public:
UpdateRowsEvent(std::shared_ptr<TableMapEvent> table_map_, EventHeader && header_) : RowsEvent(table_map_, std::move(header_)) {} UpdateRowsEvent(std::shared_ptr<TableMapEvent> table_map_, EventHeader && header_, const RowsEventHeader & rows_header)
: RowsEvent(table_map_, std::move(header_), rows_header) {}
MySQLEventType type() const override { return MYSQL_UPDATE_ROWS_EVENT; } MySQLEventType type() const override { return MYSQL_UPDATE_ROWS_EVENT; }
}; };
@ -546,10 +566,10 @@ namespace MySQLReplication
Position position; Position position;
BinlogEventPtr event; BinlogEventPtr event;
String replicate_do_db; String replicate_do_db;
std::shared_ptr<TableMapEvent> table_map; std::map<UInt64, std::shared_ptr<TableMapEvent> > table_maps;
size_t checksum_signature_length = 4; size_t checksum_signature_length = 4;
inline bool doReplicate() { return (replicate_do_db.empty() || table_map->schema == replicate_do_db); } bool doReplicate(UInt64 table_id);
}; };
} }

View File

@ -757,3 +757,18 @@ def system_parts_test(clickhouse_node, mysql_node, service_name):
check_active_parts(2) check_active_parts(2)
clickhouse_node.query("OPTIMIZE TABLE system_parts_test.test") clickhouse_node.query("OPTIMIZE TABLE system_parts_test.test")
check_active_parts(1) check_active_parts(1)
def multi_table_update_test(clickhouse_node, mysql_node, service_name):
    """Exercise a single UPDATE statement that modifies rows in two MySQL tables.

    Creates database ``multi_table_update`` with tables ``a`` and ``b`` on MySQL,
    creates the MaterializeMySQL database on ClickHouse pointing at
    ``service_name``:3306, runs one UPDATE spanning both tables, and passes the
    expected contents of each table to ``check_query``.
    """
    # Drop leftovers on both servers, MySQL first.
    for node in (mysql_node, clickhouse_node):
        node.query("DROP DATABASE IF EXISTS multi_table_update")

    mysql_setup = (
        "CREATE DATABASE multi_table_update",
        "CREATE TABLE multi_table_update.a (id INT(11) NOT NULL PRIMARY KEY, value VARCHAR(255))",
        "CREATE TABLE multi_table_update.b (id INT(11) NOT NULL PRIMARY KEY, othervalue VARCHAR(255))",
        "INSERT INTO multi_table_update.a VALUES(1, 'foo')",
        "INSERT INTO multi_table_update.b VALUES(1, 'bar')",
    )
    for statement in mysql_setup:
        mysql_node.query(statement)

    create_replica = "CREATE DATABASE multi_table_update ENGINE = MaterializeMySQL('{}:3306', 'multi_table_update', 'root', 'clickhouse')".format(service_name)
    clickhouse_node.query(create_replica)
    check_query(clickhouse_node, "SHOW TABLES FROM multi_table_update", "a\nb\n")

    # One statement that touches both tables at once.
    mysql_node.query("UPDATE multi_table_update.a, multi_table_update.b SET value='baz', othervalue='quux' where a.id=b.id")

    expected_contents = (
        ("SELECT * FROM multi_table_update.a", "1\tbaz\n"),
        ("SELECT * FROM multi_table_update.b", "1\tquux\n"),
    )
    for select_query, expected in expected_contents:
        check_query(clickhouse_node, select_query, expected)

View File

@ -237,3 +237,8 @@ def test_utf8mb4(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhou
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_ordinary])
def test_system_parts_table(started_cluster, started_mysql_8_0, clickhouse_node):
    """Run the system_parts scenario against the MySQL 8.0 fixture."""
    # NOTE(review): both parametrize entries are node_db_ordinary — confirm the duplicate is intended.
    mysql_service = "mysql8_0"
    materialize_with_ddl.system_parts_test(clickhouse_node, started_mysql_8_0, mysql_service)
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_ordinary])
def test_multi_table_update(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node):
    """Run the multi-table UPDATE scenario against the MySQL 5.7 fixture, then the MySQL 8.0 fixture."""
    # NOTE(review): both parametrize entries are node_db_ordinary — confirm the duplicate is intended.
    mysql_targets = (
        (started_mysql_5_7, "mysql1"),
        (started_mysql_8_0, "mysql8_0"),
    )
    for mysql_node, service_name in mysql_targets:
        materialize_with_ddl.multi_table_update_test(clickhouse_node, mysql_node, service_name)