MaterializedMySQL: Update GTID set at end of transaction
We used to update the set of seen GTIDs as soon as we saw a GTID_EVENT, which arrives before the transaction it announces. This mostly worked fine, but if the connection to MySQL was lost in the middle of a large transaction, we had already persisted the transaction's GTID as processed even though the transaction had only just started. When the connection was reestablished, the transaction was not processed again, which meant that only parts of it were applied. Fix this by updating the seen GTIDs at the end of the transaction instead.
Commit c41923c595, parent 13b2fdc23b
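The shape of the fix can be shown as a minimal self-contained sketch. Everything below (the EventType enum, the simplified Position, string-typed GTIDs) is a hypothetical stand-in for illustration, not ClickHouse's actual replication classes: the GTID announced by a GTID_EVENT is parked in a std::optional and merged into the set of seen GTIDs only when the event that ends the transaction arrives.

#include <iostream>
#include <optional>
#include <set>
#include <string>

// Hypothetical stand-ins for the binlog event stream; the real code works on
// ClickHouse's MySQLReplication event classes.
enum class EventType { GTID, QUERY_BEGIN, ROWS, QUERY_COMMIT };

struct Position
{
    std::set<std::string> seen_gtids; // stands in for the persisted GTID set

    void update(EventType type, const std::string & gtid = "")
    {
        switch (type)
        {
            case EventType::GTID:
                // Before the fix, the GTID was added to seen_gtids right here,
                // before any of the transaction's changes had been applied.
                pending_gtid = gtid;
                break;
            case EventType::QUERY_COMMIT:
                // After the fix, the GTID is recorded only once the
                // transaction-ending event has been processed.
                if (pending_gtid)
                {
                    seen_gtids.insert(*pending_gtid);
                    pending_gtid.reset();
                }
                break;
            default:
                break;
        }
    }

private:
    std::optional<std::string> pending_gtid;
};

int main()
{
    Position pos;
    pos.update(EventType::GTID, "3e11fa47-71ca-11e1-9e33-c80aa9429562:23");
    pos.update(EventType::QUERY_BEGIN);
    pos.update(EventType::ROWS);
    // A connection loss here would leave seen_gtids empty, so the server
    // would resend the transaction in full on reconnect.
    pos.update(EventType::QUERY_COMMIT);
    std::cout << "seen GTIDs: " << pos.seen_gtids.size() << '\n'; // prints 1
}

With this shape, a connection lost between the GTID_EVENT and the COMMIT leaves the GTID unrecorded, so the whole transaction is replayed from the start on reconnect.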
@@ -105,12 +105,16 @@ namespace MySQLReplication
         if (query.starts_with("BEGIN") || query.starts_with("COMMIT"))
         {
             typ = QUERY_EVENT_MULTI_TXN_FLAG;
+            if (!query.starts_with("COMMIT"))
+                transaction_complete = false;
         }
         else if (query.starts_with("XA"))
         {
             if (query.starts_with("XA ROLLBACK"))
                 throw ReplicationError("ParseQueryEvent: Unsupported query event:" + query, ErrorCodes::LOGICAL_ERROR);
             typ = QUERY_EVENT_XA;
+            if (!query.starts_with("XA COMMIT"))
+                transaction_complete = false;
         }
         else if (query.starts_with("SAVEPOINT"))
         {
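QueryEvent now records whether the statement it carries completes a transaction: BEGIN, and any XA statement other than XA COMMIT, clear transaction_complete, while COMMIT and XA COMMIT leave it at its default of true. Position::update consumes this flag to decide when it is safe to mark the transaction's GTID as seen: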
@@ -711,9 +715,26 @@ namespace MySQLReplication
     {
         switch (event->header.type)
         {
-            case FORMAT_DESCRIPTION_EVENT:
-            case QUERY_EVENT:
+            case FORMAT_DESCRIPTION_EVENT: {
+                binlog_pos = event->header.log_pos;
+                break;
+            }
+            case QUERY_EVENT: {
+                auto query = std::static_pointer_cast<QueryEvent>(event);
+                if (query->transaction_complete && pending_gtid)
+                {
+                    gtid_sets.update(*pending_gtid);
+                    pending_gtid.reset();
+                }
+                binlog_pos = event->header.log_pos;
+                break;
+            }
             case XID_EVENT: {
+                if (pending_gtid)
+                {
+                    gtid_sets.update(*pending_gtid);
+                    pending_gtid.reset();
+                }
                 binlog_pos = event->header.log_pos;
                 break;
             }
@@ -724,9 +745,11 @@ namespace MySQLReplication
                 break;
             }
             case GTID_EVENT: {
+                if (pending_gtid)
+                    gtid_sets.update(*pending_gtid);
                 auto gtid_event = std::static_pointer_cast<GTIDEvent>(event);
                 binlog_pos = event->header.log_pos;
-                gtid_sets.update(gtid_event->gtid);
+                pending_gtid = gtid_event->gtid;
                 break;
             }
             default:
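Taken together, Position::update now parks the GTID from each GTID_EVENT instead of recording it immediately, and flushes it into gtid_sets at the points where a transaction is known to have completed: a QUERY_EVENT whose transaction_complete flag is set (COMMIT or XA COMMIT), an XID_EVENT (the commit marker MySQL writes for transactions on transactional engines), or, as a fallback, the arrival of the next GTID_EVENT while one is still pending. The parked GTID lives in a new field declared in the header: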
@@ -383,6 +383,7 @@ namespace MySQLReplication
         String schema;
         String query;
         QueryType typ = QUERY_EVENT_DDL;
+        bool transaction_complete = true;

         QueryEvent(EventHeader && header_)
             : EventBase(std::move(header_)), thread_id(0), exec_time(0), schema_len(0), error_code(0), status_len(0)
@@ -536,6 +537,9 @@ namespace MySQLReplication
         void update(BinlogEventPtr event);
         void update(UInt64 binlog_pos_, const String & binlog_name_, const String & gtid_sets_);
         void dump(WriteBuffer & out) const;
+
+    private:
+        std::optional<GTID> pending_gtid;
     };

     class IFlavor : public MySQLProtocol::IMySQLReadPacket
@@ -980,3 +980,33 @@ def mysql_settings_test(clickhouse_node, mysql_node, service_name):
     clickhouse_node.query("DROP DATABASE test_database")
     mysql_node.query("DROP DATABASE test_database")

+def materialized_mysql_large_transaction(clickhouse_node, mysql_node, service_name):
+    mysql_node.query("DROP DATABASE IF EXISTS largetransaction")
+    clickhouse_node.query("DROP DATABASE IF EXISTS largetransaction")
+    mysql_node.query("CREATE DATABASE largetransaction")
+
+    mysql_node.query("CREATE TABLE largetransaction.test_table ("
+                     "`key` INT NOT NULL PRIMARY KEY AUTO_INCREMENT, "
+                     "`value` INT NOT NULL) ENGINE = InnoDB;")
+    num_rows = 200000
+    rows_per_insert = 5000
+    values = ",".join(["(1)" for _ in range(rows_per_insert)])
+    for i in range(num_rows//rows_per_insert):
+        mysql_node.query(f"INSERT INTO largetransaction.test_table (`value`) VALUES {values};")
+
+
+    clickhouse_node.query("CREATE DATABASE largetransaction ENGINE = MaterializedMySQL('{}:3306', 'largetransaction', 'root', 'clickhouse')".format(service_name))
+    check_query(clickhouse_node, "SELECT COUNT() FROM largetransaction.test_table", f"{num_rows}\n")
+
+    mysql_node.query("UPDATE largetransaction.test_table SET value = 2;")
+
+    # Attempt to restart clickhouse after it has started processing
+    # the transaction, but before it has completed it.
+    while int(clickhouse_node.query("SELECT COUNT() FROM largetransaction.test_table WHERE value = 2")) == 0:
+        time.sleep(0.2)
+    clickhouse_node.restart_clickhouse()
+
+    check_query(clickhouse_node, "SELECT COUNT() FROM largetransaction.test_table WHERE value = 2", f"{num_rows}\n")
+
+    clickhouse_node.query("DROP DATABASE largetransaction")
+    mysql_node.query("DROP DATABASE largetransaction")
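The test reproduces the failure mode described in the commit message: it fills a table with 200,000 rows, runs a single UPDATE over all of them so that the change arrives as one large transaction, restarts ClickHouse after the first replicated rows become visible but before the transaction has been fully applied, and then verifies that every row ends up updated. The test is wired up for both MySQL 8.0 and 5.7: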
@@ -237,3 +237,8 @@ def test_materialize_with_enum(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node):
 def test_mysql_settings(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node):
     materialize_with_ddl.mysql_settings_test(clickhouse_node, started_mysql_5_7, "mysql57")
     materialize_with_ddl.mysql_settings_test(clickhouse_node, started_mysql_8_0, "mysql80")
+
+@pytest.mark.parametrize(('clickhouse_node'), [pytest.param(node_db_ordinary, id="ordinary"), pytest.param(node_db_atomic, id="atomic")])
+def test_large_transaction(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node):
+    materialize_with_ddl.materialized_mysql_large_transaction(clickhouse_node, started_mysql_8_0, "mysql80")
+    materialize_with_ddl.materialized_mysql_large_transaction(clickhouse_node, started_mysql_5_7, "mysql57")