From e6f1ce48fe50c83a88f8c8feb985c1a0ea6180d3 Mon Sep 17 00:00:00 2001 From: Haavard Kvaalen Date: Thu, 4 Feb 2021 11:37:12 +0100 Subject: [PATCH] Fix handling of TABLE_MAP_EVENT from MySQL The MySQL replication code assumed that row update events would be preceded by a single TABLE_MAP_EVENT. However, if a single SQL statement modifies rows in multiple tables, MySQL will first send table map events for all involved tables, and then row update events. Depending on circumstances, this could lead to an exception when the row update was processed, to the update being incorrectly dropped, or to the update being applied to the wrong table. --- src/Core/MySQL/MySQLReplication.cpp | 46 +++++++++++++++---- src/Core/MySQL/MySQLReplication.h | 36 +++++++++++---- .../materialize_with_ddl.py | 15 ++++++ .../test_materialize_mysql_database/test.py | 5 ++ 4 files changed, 86 insertions(+), 16 deletions(-) diff --git a/src/Core/MySQL/MySQLReplication.cpp b/src/Core/MySQL/MySQLReplication.cpp index b86d6447e26..8e1e0cd7646 100644 --- a/src/Core/MySQL/MySQLReplication.cpp +++ b/src/Core/MySQL/MySQLReplication.cpp @@ -136,6 +136,7 @@ namespace MySQLReplication out << "XID: " << this->xid << '\n'; } + /// https://dev.mysql.com/doc/internals/en/table-map-event.html void TableMapEvent::parseImpl(ReadBuffer & payload) { payload.readStrict(reinterpret_cast(&table_id), 6); @@ -257,15 +258,19 @@ namespace MySQLReplication out << "Null Bitmap: " << bitmap_str << '\n'; } - void RowsEvent::parseImpl(ReadBuffer & payload) + void RowsEventHeader::parse(ReadBuffer & payload) { payload.readStrict(reinterpret_cast(&table_id), 6); payload.readStrict(reinterpret_cast(&flags), 2); + UInt16 extra_data_len; /// This extra_data_len contains the 2 bytes length.
payload.readStrict(reinterpret_cast(&extra_data_len), 2); payload.ignore(extra_data_len - 2); + } + void RowsEvent::parseImpl(ReadBuffer & payload) + { number_columns = readLengthEncodedNumber(payload); size_t columns_bitmap_size = (number_columns + 7) / 8; switch (header.type) @@ -795,37 +800,50 @@ namespace MySQLReplication { event = std::make_shared(std::move(event_header)); event->parseEvent(event_payload); - table_map = std::static_pointer_cast(event); + auto table_map = std::static_pointer_cast(event); + table_maps[table_map->table_id] = table_map; break; } case WRITE_ROWS_EVENT_V1: case WRITE_ROWS_EVENT_V2: { - if (doReplicate()) - event = std::make_shared(table_map, std::move(event_header)); + RowsEventHeader rows_header(event_header.type); + rows_header.parse(event_payload); + if (doReplicate(rows_header.table_id)) + event = std::make_shared(table_maps.at(rows_header.table_id), std::move(event_header), rows_header); else event = std::make_shared(std::move(event_header)); event->parseEvent(event_payload); + if (rows_header.flags & ROWS_END_OF_STATEMENT) + table_maps.clear(); break; } case DELETE_ROWS_EVENT_V1: case DELETE_ROWS_EVENT_V2: { - if (doReplicate()) - event = std::make_shared(table_map, std::move(event_header)); + RowsEventHeader rows_header(event_header.type); + rows_header.parse(event_payload); + if (doReplicate(rows_header.table_id)) + event = std::make_shared(table_maps.at(rows_header.table_id), std::move(event_header), rows_header); else event = std::make_shared(std::move(event_header)); event->parseEvent(event_payload); + if (rows_header.flags & ROWS_END_OF_STATEMENT) + table_maps.clear(); break; } case UPDATE_ROWS_EVENT_V1: case UPDATE_ROWS_EVENT_V2: { - if (doReplicate()) - event = std::make_shared(table_map, std::move(event_header)); + RowsEventHeader rows_header(event_header.type); + rows_header.parse(event_payload); + if (doReplicate(rows_header.table_id)) + event = std::make_shared(table_maps.at(rows_header.table_id), 
std::move(event_header), rows_header); else event = std::make_shared(std::move(event_header)); event->parseEvent(event_payload); + if (rows_header.flags & ROWS_END_OF_STATEMENT) + table_maps.clear(); break; } case GTID_EVENT: @@ -843,6 +861,18 @@ namespace MySQLReplication } } } + + bool MySQLFlavor::doReplicate(UInt64 table_id) + { + if (replicate_do_db.empty()) + return false; + if (table_id == 0x00ffffff) { + // Special "dummy event" + return false; + } + auto table_map = table_maps.at(table_id); + return table_map->schema == replicate_do_db; + } } } diff --git a/src/Core/MySQL/MySQLReplication.h b/src/Core/MySQL/MySQLReplication.h index 7c7604cad58..ae8dc80f673 100644 --- a/src/Core/MySQL/MySQLReplication.h +++ b/src/Core/MySQL/MySQLReplication.h @@ -430,6 +430,22 @@ namespace MySQLReplication void parseMeta(String meta); }; + enum RowsEventFlags + { + ROWS_END_OF_STATEMENT = 1 + }; + + class RowsEventHeader + { + public: + EventType type; + UInt64 table_id; + UInt16 flags; + + RowsEventHeader(EventType type_) : type(type_), table_id(0), flags(0) {}; + void parse(ReadBuffer & payload); + }; + class RowsEvent : public EventBase { public: @@ -438,9 +454,11 @@ namespace MySQLReplication String table; std::vector rows; - RowsEvent(std::shared_ptr table_map_, EventHeader && header_) - : EventBase(std::move(header_)), number_columns(0), table_id(0), flags(0), extra_data_len(0), table_map(table_map_) + RowsEvent(std::shared_ptr table_map_, EventHeader && header_, const RowsEventHeader & rows_header) + : EventBase(std::move(header_)), number_columns(0), table_map(table_map_) { + table_id = rows_header.table_id; + flags = rows_header.flags; schema = table_map->schema; table = table_map->table; } @@ -450,7 +468,6 @@ namespace MySQLReplication protected: UInt64 table_id; UInt16 flags; - UInt16 extra_data_len; Bitmap columns_present_bitmap1; Bitmap columns_present_bitmap2; @@ -464,21 +481,24 @@ namespace MySQLReplication class WriteRowsEvent : public RowsEvent { public: - 
WriteRowsEvent(std::shared_ptr table_map_, EventHeader && header_) : RowsEvent(table_map_, std::move(header_)) {} + WriteRowsEvent(std::shared_ptr table_map_, EventHeader && header_, const RowsEventHeader & rows_header) + : RowsEvent(table_map_, std::move(header_), rows_header) {} MySQLEventType type() const override { return MYSQL_WRITE_ROWS_EVENT; } }; class DeleteRowsEvent : public RowsEvent { public: - DeleteRowsEvent(std::shared_ptr table_map_, EventHeader && header_) : RowsEvent(table_map_, std::move(header_)) {} + DeleteRowsEvent(std::shared_ptr table_map_, EventHeader && header_, const RowsEventHeader & rows_header) + : RowsEvent(table_map_, std::move(header_), rows_header) {} MySQLEventType type() const override { return MYSQL_DELETE_ROWS_EVENT; } }; class UpdateRowsEvent : public RowsEvent { public: - UpdateRowsEvent(std::shared_ptr table_map_, EventHeader && header_) : RowsEvent(table_map_, std::move(header_)) {} + UpdateRowsEvent(std::shared_ptr table_map_, EventHeader && header_, const RowsEventHeader & rows_header) + : RowsEvent(table_map_, std::move(header_), rows_header) {} MySQLEventType type() const override { return MYSQL_UPDATE_ROWS_EVENT; } }; @@ -546,10 +566,10 @@ namespace MySQLReplication Position position; BinlogEventPtr event; String replicate_do_db; - std::shared_ptr table_map; + std::map > table_maps; size_t checksum_signature_length = 4; - inline bool doReplicate() { return (replicate_do_db.empty() || table_map->schema == replicate_do_db); } + bool doReplicate(UInt64 table_id); }; } diff --git a/tests/integration/test_materialize_mysql_database/materialize_with_ddl.py b/tests/integration/test_materialize_mysql_database/materialize_with_ddl.py index 38ff8fd752b..b7f432d963b 100644 --- a/tests/integration/test_materialize_mysql_database/materialize_with_ddl.py +++ b/tests/integration/test_materialize_mysql_database/materialize_with_ddl.py @@ -757,3 +757,18 @@ def system_parts_test(clickhouse_node, mysql_node, service_name): 
check_active_parts(2) clickhouse_node.query("OPTIMIZE TABLE system_parts_test.test") check_active_parts(1) + +def multi_table_update_test(clickhouse_node, mysql_node, service_name): + mysql_node.query("DROP DATABASE IF EXISTS multi_table_update") + clickhouse_node.query("DROP DATABASE IF EXISTS multi_table_update") + mysql_node.query("CREATE DATABASE multi_table_update") + mysql_node.query("CREATE TABLE multi_table_update.a (id INT(11) NOT NULL PRIMARY KEY, value VARCHAR(255))") + mysql_node.query("CREATE TABLE multi_table_update.b (id INT(11) NOT NULL PRIMARY KEY, othervalue VARCHAR(255))") + mysql_node.query("INSERT INTO multi_table_update.a VALUES(1, 'foo')") + mysql_node.query("INSERT INTO multi_table_update.b VALUES(1, 'bar')") + clickhouse_node.query("CREATE DATABASE multi_table_update ENGINE = MaterializeMySQL('{}:3306', 'multi_table_update', 'root', 'clickhouse')".format(service_name)) + check_query(clickhouse_node, "SHOW TABLES FROM multi_table_update", "a\nb\n") + mysql_node.query("UPDATE multi_table_update.a, multi_table_update.b SET value='baz', othervalue='quux' where a.id=b.id") + + check_query(clickhouse_node, "SELECT * FROM multi_table_update.a", "1\tbaz\n") + check_query(clickhouse_node, "SELECT * FROM multi_table_update.b", "1\tquux\n") diff --git a/tests/integration/test_materialize_mysql_database/test.py b/tests/integration/test_materialize_mysql_database/test.py index 8cd2f7def07..32316901dce 100644 --- a/tests/integration/test_materialize_mysql_database/test.py +++ b/tests/integration/test_materialize_mysql_database/test.py @@ -237,3 +237,8 @@ def test_utf8mb4(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhou @pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_ordinary]) def test_system_parts_table(started_cluster, started_mysql_8_0, clickhouse_node): materialize_with_ddl.system_parts_test(clickhouse_node, started_mysql_8_0, "mysql8_0") + +@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, 
node_db_ordinary]) +def test_multi_table_update(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node): + materialize_with_ddl.multi_table_update_test(clickhouse_node, started_mysql_5_7, "mysql1") + materialize_with_ddl.multi_table_update_test(clickhouse_node, started_mysql_8_0, "mysql8_0")