ISSUES-14235 change string.rfind to string starts_with and add some tests

This commit is contained in:
BohuTANG 2020-09-02 08:31:51 +08:00
parent 6c37fea17e
commit 6dfab88156
3 changed files with 47 additions and 6 deletions

View File

@ -13,6 +13,7 @@ namespace DB
namespace ErrorCodes
{
extern const int UNKNOWN_EXCEPTION;
extern const int LOGICAL_ERROR;
}
namespace MySQLReplication
@ -103,19 +104,19 @@ namespace MySQLReplication
= header.event_size - EVENT_HEADER_LENGTH - 4 - 4 - 1 - 2 - 2 - status_len - schema_len - 1 - CHECKSUM_CRC32_SIGNATURE_LENGTH;
query.resize(len);
payload.readStrict(reinterpret_cast<char *>(query.data()), len);
if (query.rfind("BEGIN", 0) == 0 || query.rfind("COMMIT") == 0)
if (query.starts_with("BEGIN") || query.starts_with("COMMIT"))
{
typ = QUERY_EVENT_MULTI_TXN_FLAG;
}
else if (query.rfind("XA", 0) == 0)
else if (query.starts_with("XA"))
{
if (query.rfind("XA ROLLBACK", 0) == 0)
throw ReplicationError("ParseQueryEvent: Unsupported query event:" + query, ErrorCodes::UNKNOWN_EXCEPTION);
if (query.starts_with("XA ROLLBACK"))
throw ReplicationError("ParseQueryEvent: Unsupported query event:" + query, ErrorCodes::LOGICAL_ERROR);
typ = QUERY_EVENT_XA;
}
else if (query.rfind("SAVEPOINT", 0) == 0)
else if (query.starts_with("SAVEPOINT"))
{
throw ReplicationError("ParseQueryEvent: Unsupported query event:" + query, ErrorCodes::UNKNOWN_EXCEPTION);
throw ReplicationError("ParseQueryEvent: Unsupported query event:" + query, ErrorCodes::LOGICAL_ERROR);
}
}

View File

@ -1,4 +1,5 @@
import time
import pymysql.cursors
def check_query(clickhouse_node, query, result_set, retry_count=3, interval_seconds=3):
@ -321,3 +322,37 @@ def alter_rename_table_with_materialize_mysql_database(clickhouse_node, mysql_no
clickhouse_node.query("DROP DATABASE test_database")
mysql_node.query("DROP DATABASE test_database")
def query_event_with_empty_transaction(clickhouse_node, mysql_node, service_name):
mysql_node.query("CREATE DATABASE test_database")
mysql_node.query("RESET MASTER")
mysql_node.query("CREATE TABLE test_database.t1(a INT NOT NULL PRIMARY KEY, b VARCHAR(255) DEFAULT 'BEGIN')")
mysql_node.query("INSERT INTO test_database.t1(a) VALUES(1)")
clickhouse_node.query(
"CREATE DATABASE test_database ENGINE = MaterializeMySQL('{}:3306', 'test_database', 'root', 'clickhouse')".format(
service_name))
# Reject one empty GTID QUERY event with 'BEGIN' and 'COMMIT'
mysql_cursor = mysql_node.alloc_connection().cursor(pymysql.cursors.DictCursor)
mysql_cursor.execute("SHOW MASTER STATUS")
(uuid, seqs) = mysql_cursor.fetchall()[0]["Executed_Gtid_Set"].split(":")
(seq_begin, seq_end) = seqs.split("-")
assert int(seq_begin) == 1
assert int(seq_end) == 3
next_gtid = uuid + ":" + str(int(seq_end) + 1)
mysql_node.query("SET gtid_next='" + next_gtid + "'")
mysql_node.query("BEGIN")
mysql_node.query("COMMIT")
mysql_node.query("SET gtid_next='AUTOMATIC'")
# Reject one 'BEGIN' QUERY event and 'COMMIT' XID event.
mysql_node.query("/* start */ begin /* end */")
mysql_node.query("INSERT INTO test_database.t1(a) VALUES(2)")
mysql_node.query("/* start */ commit /* end */")
check_query(clickhouse_node, "SELECT * FROM test_database.t1 ORDER BY a FORMAT TSV",
"1\tBEGIN\n2\tBEGIN\n")
clickhouse_node.query("DROP DATABASE test_database")
mysql_node.query("DROP DATABASE test_database")

View File

@ -120,3 +120,8 @@ def test_materialize_database_ddl_with_mysql_8_0(started_cluster, started_mysql_
materialize_with_ddl.alter_rename_column_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql8_0")
materialize_with_ddl.alter_modify_column_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql8_0")
def test_materialize_database_ddl_with_empty_transaction_5_7(started_cluster, started_mysql_5_7):
materialize_with_ddl.query_event_with_empty_transaction(clickhouse_node, started_mysql_5_7, "mysql5_7")
def test_materialize_database_ddl_with_empty_transaction_8_0(started_cluster, started_mysql_8_0):
materialize_with_ddl.query_event_with_empty_transaction(clickhouse_node, started_mysql_8_0, "mysql8_0")