Merge pull request #29837 from havardk/gtidupdate

MaterializedMySQL: Update GTID set at end of transaction
commit 198adc7ecd
Kseniia Sumarokova, 2021-10-12 08:01:18 +03:00 (committed by GitHub)
4 changed files with 67 additions and 4 deletions
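
Why the change: until now Position::update added a transaction's GTID to gtid_sets as soon as the GTID_EVENT arrived, i.e. at the start of the transaction. If the server was restarted while a large transaction was still being applied, the persisted position already claimed the transaction was done, and the remainder was silently skipped on resume. With this patch the GTID is parked in pending_gtid and folded into gtid_sets only once the transaction is known to be complete: on a COMMIT / XA COMMIT query event, on an XID_EVENT, or implicitly when the next GTID_EVENT arrives. A minimal, self-contained sketch of the idea (simplified stand-in types with GTIDs as plain strings, not the actual ClickHouse classes):

#include <cassert>
#include <optional>
#include <set>
#include <string>

// Simplified stand-in for MySQLReplication::Position.
struct PositionSketch
{
    std::set<std::string> gtid_sets;          // GTIDs of fully committed transactions
    std::optional<std::string> pending_gtid;  // seen, but transaction not finished yet

    void onGtidEvent(const std::string & gtid)
    {
        // A new GTID_EVENT starts the next transaction, which implicitly
        // completes the previous one.
        flushPending();
        pending_gtid = gtid;  // park it; do not mark it applied yet
    }

    void onTransactionEnd()  // COMMIT / XA COMMIT query event, or XID_EVENT
    {
        flushPending();
    }

    void flushPending()
    {
        if (pending_gtid)
        {
            gtid_sets.insert(*pending_gtid);
            pending_gtid.reset();
        }
    }
};

int main()
{
    PositionSketch pos;
    pos.onGtidEvent("uuid:42");
    // If the process died here, "uuid:42" would not be in the persisted set,
    // so the interrupted transaction would be fetched and replayed on restart.
    assert(pos.gtid_sets.count("uuid:42") == 0);
    pos.onTransactionEnd();
    assert(pos.gtid_sets.count("uuid:42") == 1);
}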

src/Core/MySQL/MySQLReplication.cpp

@@ -105,12 +105,16 @@ namespace MySQLReplication
         if (query.starts_with("BEGIN") || query.starts_with("COMMIT"))
         {
             typ = QUERY_EVENT_MULTI_TXN_FLAG;
+            if (!query.starts_with("COMMIT"))
+                transaction_complete = false;
         }
         else if (query.starts_with("XA"))
         {
             if (query.starts_with("XA ROLLBACK"))
                 throw ReplicationError("ParseQueryEvent: Unsupported query event:" + query, ErrorCodes::LOGICAL_ERROR);
             typ = QUERY_EVENT_XA;
+            if (!query.starts_with("XA COMMIT"))
+                transaction_complete = false;
         }
         else if (query.starts_with("SAVEPOINT"))
         {
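
transaction_complete defaults to true (see the header hunk below), so the parser only clears it for statements that open or continue a transaction: BEGIN, and any XA statement other than XA COMMIT (XA ROLLBACK is rejected with an exception before this check is reached). A restatement of that rule as a standalone predicate, a hypothetical helper that is not part of the patch:

#include <cassert>
#include <string>

static bool startsWith(const std::string & s, const std::string & prefix)
{
    return s.rfind(prefix, 0) == 0;
}

// Mirrors the ParseQueryEvent logic above: only BEGIN and non-commit
// XA statements leave the transaction open. XA ROLLBACK never gets here.
bool isTransactionComplete(const std::string & query)
{
    if (startsWith(query, "BEGIN"))
        return false;
    if (startsWith(query, "XA") && !startsWith(query, "XA COMMIT"))
        return false;
    return true;  // COMMIT, XA COMMIT, DDL, and everything else
}

int main()
{
    assert(!isTransactionComplete("BEGIN"));
    assert(isTransactionComplete("COMMIT"));
    assert(!isTransactionComplete("XA START 'x1'"));
    assert(isTransactionComplete("XA COMMIT 'x1'"));
}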
@@ -711,9 +715,26 @@ namespace MySQLReplication
     {
         switch (event->header.type)
         {
-            case FORMAT_DESCRIPTION_EVENT:
-            case QUERY_EVENT:
+            case FORMAT_DESCRIPTION_EVENT: {
+                binlog_pos = event->header.log_pos;
+                break;
+            }
+            case QUERY_EVENT: {
+                auto query = std::static_pointer_cast<QueryEvent>(event);
+                if (query->transaction_complete && pending_gtid)
+                {
+                    gtid_sets.update(*pending_gtid);
+                    pending_gtid.reset();
+                }
+                binlog_pos = event->header.log_pos;
+                break;
+            }
             case XID_EVENT: {
+                if (pending_gtid)
+                {
+                    gtid_sets.update(*pending_gtid);
+                    pending_gtid.reset();
+                }
                 binlog_pos = event->header.log_pos;
                 break;
             }
@@ -724,9 +745,11 @@ namespace MySQLReplication
                 break;
             }
             case GTID_EVENT: {
+                if (pending_gtid)
+                    gtid_sets.update(*pending_gtid);
                 auto gtid_event = std::static_pointer_cast<GTIDEvent>(event);
                 binlog_pos = event->header.log_pos;
-                gtid_sets.update(gtid_event->gtid);
+                pending_gtid = gtid_event->gtid;
                 break;
             }
             default:
@@ -792,6 +815,7 @@ namespace MySQLReplication
                 {
                     event = std::make_shared<QueryEvent>(std::move(event_header));
                     event->parseEvent(event_payload);
+                    position.update(event);

                    auto query = std::static_pointer_cast<QueryEvent>(event);
                    switch (query->typ)
@@ -803,7 +827,7 @@ namespace MySQLReplication
                        break;
                    }
                    default:
-                        position.update(event);
+                        break;
                }
                break;
            }
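
The last two hunks move position.update(event) out of the default branch: previously only query types falling through to default updated the position, so BEGIN/COMMIT and XA statements never reached Position::update and the pending GTID could not be flushed on COMMIT. Now the position is updated for every QueryEvent before the type dispatch. A simplified sketch of the old and new control flow (hypothetical types, not the real dispatcher):

#include <iostream>
#include <string>

enum class QueryType { DDL, MULTI_TXN_FLAG, XA };

// Stand-in for Position::update(event).
void updatePosition(const std::string & query)
{
    std::cout << "position saw: " << query << "\n";
}

// Old flow: BEGIN/COMMIT and XA events were swallowed before the
// position ever saw them.
void onQueryEventOld(const std::string & query, QueryType typ)
{
    switch (typ)
    {
        case QueryType::MULTI_TXN_FLAG:
        case QueryType::XA:
            break;  // position never updated
        default:
            updatePosition(query);
    }
}

// New flow: update the position first, unconditionally.
void onQueryEventNew(const std::string & query, QueryType typ)
{
    updatePosition(query);
    switch (typ)
    {
        default:
            break;  // type-specific handling unchanged otherwise
    }
}

int main()
{
    onQueryEventOld("COMMIT", QueryType::MULTI_TXN_FLAG);  // prints nothing
    onQueryEventNew("COMMIT", QueryType::MULTI_TXN_FLAG);  // prints "position saw: COMMIT"
}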

src/Core/MySQL/MySQLReplication.h

@@ -383,6 +383,7 @@ namespace MySQLReplication
        String schema;
        String query;
        QueryType typ = QUERY_EVENT_DDL;
+        bool transaction_complete = true;

        QueryEvent(EventHeader && header_)
            : EventBase(std::move(header_)), thread_id(0), exec_time(0), schema_len(0), error_code(0), status_len(0)
@@ -536,6 +537,9 @@ namespace MySQLReplication
        void update(BinlogEventPtr event);
        void update(UInt64 binlog_pos_, const String & binlog_name_, const String & gtid_sets_);
        void dump(WriteBuffer & out) const;
+
+    private:
+        std::optional<GTID> pending_gtid;
    };

    class IFlavor : public MySQLProtocol::IMySQLReadPacket
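
Note that pending_gtid is a private, in-memory member: nothing in the patch serializes it, so the persisted replication position presumably still carries only gtid_sets. That asymmetry is what makes the restart in the new test below safe: a GTID that was merely pending does not survive a restart, and the interrupted transaction is fetched and replayed. A small illustration under that assumption (simplified types, plain strings instead of the real GTID structures):

#include <iostream>
#include <optional>
#include <string>

struct PositionSketch
{
    std::string gtid_sets = "uuid:1-41";                  // committed transactions
    std::optional<std::string> pending_gtid = "uuid:42";  // still in flight

    void dump(std::ostream & out) const
    {
        out << gtid_sets << "\n";  // pending_gtid deliberately omitted
    }
};

int main()
{
    PositionSketch pos;
    pos.dump(std::cout);  // prints "uuid:1-41": transaction 42 will be replayed
}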

tests/integration/test_materialized_mysql_database/materialize_with_ddl.py

@@ -980,3 +980,33 @@ def mysql_settings_test(clickhouse_node, mysql_node, service_name):
     clickhouse_node.query("DROP DATABASE test_database")
     mysql_node.query("DROP DATABASE test_database")
+
+
+def materialized_mysql_large_transaction(clickhouse_node, mysql_node, service_name):
+    mysql_node.query("DROP DATABASE IF EXISTS largetransaction")
+    clickhouse_node.query("DROP DATABASE IF EXISTS largetransaction")
+
+    mysql_node.query("CREATE DATABASE largetransaction")
+    mysql_node.query("CREATE TABLE largetransaction.test_table ("
+                     "`key` INT NOT NULL PRIMARY KEY AUTO_INCREMENT, "
+                     "`value` INT NOT NULL) ENGINE = InnoDB;")
+    num_rows = 200000
+    rows_per_insert = 5000
+    values = ",".join(["(1)" for _ in range(rows_per_insert)])
+    for i in range(num_rows//rows_per_insert):
+        mysql_node.query(f"INSERT INTO largetransaction.test_table (`value`) VALUES {values};")
+
+    clickhouse_node.query("CREATE DATABASE largetransaction ENGINE = MaterializedMySQL('{}:3306', 'largetransaction', 'root', 'clickhouse')".format(service_name))
+    check_query(clickhouse_node, "SELECT COUNT() FROM largetransaction.test_table", f"{num_rows}\n")
+
+    mysql_node.query("UPDATE largetransaction.test_table SET value = 2;")
+
+    # Attempt to restart clickhouse after it has started processing
+    # the transaction, but before it has completed it.
+    while int(clickhouse_node.query("SELECT COUNT() FROM largetransaction.test_table WHERE value = 2")) == 0:
+        time.sleep(0.2)
+    clickhouse_node.restart_clickhouse()
+
+    check_query(clickhouse_node, "SELECT COUNT() FROM largetransaction.test_table WHERE value = 2", f"{num_rows}\n")
+    clickhouse_node.query("DROP DATABASE largetransaction")
+    mysql_node.query("DROP DATABASE largetransaction")

tests/integration/test_materialized_mysql_database/test.py

@@ -237,3 +237,8 @@ def test_materialize_with_enum(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node):
 def test_mysql_settings(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node):
     materialize_with_ddl.mysql_settings_test(clickhouse_node, started_mysql_5_7, "mysql57")
     materialize_with_ddl.mysql_settings_test(clickhouse_node, started_mysql_8_0, "mysql80")
+
+@pytest.mark.parametrize(('clickhouse_node'), [pytest.param(node_db_ordinary, id="ordinary"), pytest.param(node_db_atomic, id="atomic")])
+def test_large_transaction(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node):
+    materialize_with_ddl.materialized_mysql_large_transaction(clickhouse_node, started_mysql_8_0, "mysql80")
+    materialize_with_ddl.materialized_mysql_large_transaction(clickhouse_node, started_mysql_5_7, "mysql57")