Try fix MaterializeMySQL SYNC with modify binlog_checksum

This commit is contained in:
zhang2014 2020-11-16 21:46:36 +08:00
parent 11f56186db
commit 6ef93ac73c
14 changed files with 126 additions and 39 deletions

View File

@ -6,6 +6,7 @@
#include <Core/MySQL/PacketsProtocolText.h>
#include <Core/MySQL/PacketsReplication.h>
#include <Core/MySQL/MySQLReplication.h>
#include <Poco/String.h>
namespace DB
{
@ -132,11 +133,19 @@ void MySQLClient::ping()
writeCommand(Command::COM_PING, "");
}
void MySQLClient::startBinlogDumpGTID(UInt32 slave_id, String replicate_db, String gtid_str)
void MySQLClient::setBinlogChecksum(const String & binlog_checksum)
{
/// Set binlog checksum to CRC32.
String checksum = "CRC32";
writeCommand(Command::COM_QUERY, "SET @master_binlog_checksum = '" + checksum + "'");
replication.setChecksumSignatureLength(Poco::toUpper(binlog_checksum) == "NONE" ? 0 : 4);
}
void MySQLClient::startBinlogDumpGTID(UInt32 slave_id, String replicate_db, String gtid_str, const String & binlog_checksum)
{
/// Maybe CRC32 or NONE. mysqlbinlog.cc use NONE, see its below comments:
/// Make a notice to the server that this client is checksum-aware.
/// It does not need the first fake Rotate necessary checksummed.
writeCommand(Command::COM_QUERY, "SET @master_binlog_checksum = 'CRC32'");
setBinlogChecksum(binlog_checksum);
/// Set heartbeat 1s.
UInt64 period_ns = (1 * 1e9);

View File

@ -29,10 +29,12 @@ public:
void disconnect();
void ping();
void setBinlogChecksum(const String & binlog_checksum);
/// Start replication stream by GTID.
/// replicate_db: replication database schema, events from other databases will be ignored.
/// gtid: executed gtid sets format like 'hhhhhhhh-hhhh-hhhh-hhhh-hhhhhhhhhhhh:x-y'.
void startBinlogDumpGTID(UInt32 slave_id, String replicate_db, String gtid);
void startBinlogDumpGTID(UInt32 slave_id, String replicate_db, String gtid, const String & binlog_checksum);
BinlogEventPtr readOneBinlogEvent(UInt64 milliseconds = 0);
Position getPosition() const { return replication.getPosition(); }

View File

@ -57,7 +57,6 @@ namespace MySQLReplication
payload.readStrict(reinterpret_cast<char *>(&create_timestamp), 4);
payload.readStrict(reinterpret_cast<char *>(&event_header_length), 1);
assert(event_header_length == EVENT_HEADER_LENGTH);
readStringUntilEOF(event_type_header_length, payload);
}
@ -745,7 +744,7 @@ namespace MySQLReplication
// skip the generic response packets header flag.
payload.ignore(1);
MySQLBinlogEventReadBuffer event_payload(payload);
MySQLBinlogEventReadBuffer event_payload(payload, checksum_signature_length);
EventHeader event_header;
event_header.parse(event_payload);

View File

@ -526,6 +526,8 @@ namespace MySQLReplication
virtual BinlogEventPtr readOneEvent() = 0;
virtual void setReplicateDatabase(String db) = 0;
virtual void setGTIDSets(GTIDSets sets) = 0;
virtual void setChecksumSignatureLength(size_t checksum_signature_length_) = 0;
virtual ~IFlavor() override = default;
};
@ -538,12 +540,14 @@ namespace MySQLReplication
BinlogEventPtr readOneEvent() override { return event; }
void setReplicateDatabase(String db) override { replicate_do_db = std::move(db); }
void setGTIDSets(GTIDSets sets) override { position.gtid_sets = std::move(sets); }
void setChecksumSignatureLength(size_t checksum_signature_length_) override { checksum_signature_length = checksum_signature_length_; }
private:
Position position;
BinlogEventPtr event;
String replicate_do_db;
std::shared_ptr<TableMapEvent> table_map;
size_t checksum_signature_length = 4;
inline bool do_replicate() { return (replicate_do_db.empty() || table_map->schema == replicate_do_db); }
};

View File

@ -304,7 +304,8 @@ int main(int argc, char ** argv)
"user", boost::program_options::value<std::string>()->default_value("root"), "master user")(
"password", boost::program_options::value<std::string>()->required(), "master password")(
"gtid", boost::program_options::value<std::string>()->default_value(""), "executed GTID sets")(
"db", boost::program_options::value<std::string>()->required(), "replicate do db");
"db", boost::program_options::value<std::string>()->required(), "replicate do db")(
"binlog_checksum", boost::program_options::value<std::string>()->default_value("CRC32"), "master binlog_checksum");
boost::program_options::variables_map options;
boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), options);
@ -319,6 +320,7 @@ int main(int argc, char ** argv)
auto master_password = options.at("password").as<DB::String>();
auto gtid_sets = options.at("gtid").as<DB::String>();
auto replicate_db = options.at("db").as<DB::String>();
auto binlog_checksum = options.at("binlog_checksum").as<String>();
std::cerr << "Master Host: " << host << ", Port: " << port << ", User: " << master_user << ", Password: " << master_password
<< ", Replicate DB: " << replicate_db << ", GTID: " << gtid_sets << std::endl;
@ -328,7 +330,7 @@ int main(int argc, char ** argv)
/// Connect to the master.
slave.connect();
slave.startBinlogDumpGTID(slave_id, replicate_db, gtid_sets);
slave.startBinlogDumpGTID(slave_id, replicate_db, gtid_sets, binlog_checksum);
WriteBufferFromOStream cerr(std::cerr);

View File

@ -88,6 +88,29 @@ void MaterializeMetadata::fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & c
executed_gtid_set = (*master_status.getByPosition(4).column)[0].safeGet<String>();
}
void MaterializeMetadata::fetchMasterVariablesValue(const mysqlxx::PoolWithFailover::Entry & connection)
{
Block variables_header{
{std::make_shared<DataTypeString>(), "Variable_name"},
{std::make_shared<DataTypeString>(), "Value"}
};
const String & fetch_query = "SHOW VARIABLES WHERE Variable_name = 'binlog_checksum'";
MySQLBlockInputStream variables_input(connection, fetch_query, variables_header, DEFAULT_BLOCK_SIZE);
while (Block variables_block = variables_input.read())
{
ColumnPtr variables_name = variables_block.getByName("Variable_name").column;
ColumnPtr variables_value = variables_block.getByName("Value").column;
for (size_t index = 0; index < variables_block.rows(); ++index)
{
if (variables_name->getDataAt(index) == "binlog_checksum")
binlog_checksum = variables_value->getDataAt(index).toString();
}
}
}
static Block getShowMasterLogHeader(const String & mysql_version)
{
if (startsWith(mysql_version, "5."))
@ -193,6 +216,7 @@ MaterializeMetadata::MaterializeMetadata(
locked_tables = true;
fetchMasterStatus(connection);
fetchMasterVariablesValue(connection);
connection->query("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ;").execute();
connection->query("START TRANSACTION /*!40100 WITH CONSISTENT SNAPSHOT */;").execute();

View File

@ -34,10 +34,13 @@ struct MaterializeMetadata
size_t data_version = 1;
size_t meta_version = 2;
String binlog_checksum = "CRC32";
std::unordered_map<String, String> need_dumping_tables;
void fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & connection);
void fetchMasterVariablesValue(const mysqlxx::PoolWithFailover::Entry & connection);
bool checkBinlogFileExists(mysqlxx::PoolWithFailover::Entry & connection, const String & mysql_version) const;
void transaction(const MySQLReplication::Position & position, const std::function<void()> & fun);

View File

@ -340,7 +340,7 @@ std::optional<MaterializeMetadata> MaterializeMySQLSyncThread::prepareSynchroniz
connection->query("COMMIT").execute();
client.connect();
client.startBinlogDumpGTID(randomNumber(), mysql_database_name, metadata.executed_gtid_set);
client.startBinlogDumpGTID(randomNumber(), mysql_database_name, metadata.executed_gtid_set, metadata.binlog_checksum);
return metadata;
}
catch (...)
@ -624,16 +624,27 @@ void MaterializeMySQLSyncThread::onEvent(Buffers & buffers, const BinlogEventPtr
metadata.transaction(position_before_ddl, [&]() { buffers.commit(global_context); });
metadata.transaction(client.getPosition(),[&](){ executeDDLAtomic(query_event); });
}
else if (receive_event->header.type != HEARTBEAT_EVENT)
else
{
const auto & dump_event_message = [&]()
/// MYSQL_UNHANDLED_EVENT
if (receive_event->header.type == ROTATE_EVENT)
{
WriteBufferFromOwnString buf;
receive_event->dump(buf);
return buf.str();
};
/// Some behaviors(such as changing the value of "binlog_checksum") rotate the binlog file.
/// To ensure that the synchronization continues, we need to handle these events
metadata.fetchMasterVariablesValue(pool.get());
client.setBinlogChecksum(metadata.binlog_checksum);
}
else if (receive_event->header.type != HEARTBEAT_EVENT)
{
const auto & dump_event_message = [&]()
{
WriteBufferFromOwnString buf;
receive_event->dump(buf);
return buf.str();
};
LOG_DEBUG(log, "Skip MySQL event: \n {}", dump_event_message());
LOG_DEBUG(log, "Skip MySQL event: \n {}", dump_event_message());
}
}
}

View File

@ -4,9 +4,12 @@
namespace DB
{
MySQLBinlogEventReadBuffer::MySQLBinlogEventReadBuffer(ReadBuffer & in_)
: ReadBuffer(nullptr, 0, 0), in(in_)
MySQLBinlogEventReadBuffer::MySQLBinlogEventReadBuffer(ReadBuffer & in_, size_t checksum_signature_length_)
: ReadBuffer(nullptr, 0, 0), in(in_), checksum_signature_length(checksum_signature_length_)
{
if (checksum_signature_length)
checksum_buf = new char[checksum_signature_length];
nextIfAtEnd();
}
@ -20,15 +23,15 @@ bool MySQLBinlogEventReadBuffer::nextImpl()
if (checksum_buff_size == checksum_buff_limit)
{
if (likely(in.available() > CHECKSUM_CRC32_SIGNATURE_LENGTH))
if (likely(in.available() > checksum_signature_length))
{
working_buffer = ReadBuffer::Buffer(in.position(), in.buffer().end() - CHECKSUM_CRC32_SIGNATURE_LENGTH);
working_buffer = ReadBuffer::Buffer(in.position(), in.buffer().end() - checksum_signature_length);
in.ignore(working_buffer.size());
return true;
}
in.readStrict(checksum_buf, CHECKSUM_CRC32_SIGNATURE_LENGTH);
checksum_buff_size = checksum_buff_limit = CHECKSUM_CRC32_SIGNATURE_LENGTH;
in.readStrict(checksum_buf, checksum_signature_length);
checksum_buff_size = checksum_buff_limit = checksum_signature_length;
}
else
{
@ -36,17 +39,17 @@ bool MySQLBinlogEventReadBuffer::nextImpl()
checksum_buf[index] = checksum_buf[checksum_buff_limit + index];
checksum_buff_size -= checksum_buff_limit;
size_t read_bytes = CHECKSUM_CRC32_SIGNATURE_LENGTH - checksum_buff_size;
in.readStrict(checksum_buf + checksum_buff_size, read_bytes); /// Minimum CHECKSUM_CRC32_SIGNATURE_LENGTH bytes
checksum_buff_size = checksum_buff_limit = CHECKSUM_CRC32_SIGNATURE_LENGTH;
size_t read_bytes = checksum_signature_length - checksum_buff_size;
in.readStrict(checksum_buf + checksum_buff_size, read_bytes); /// Minimum checksum_signature_length bytes
checksum_buff_size = checksum_buff_limit = checksum_signature_length;
}
if (in.eof())
return false;
if (in.available() < CHECKSUM_CRC32_SIGNATURE_LENGTH)
if (in.available() < checksum_signature_length)
{
size_t left_move_size = CHECKSUM_CRC32_SIGNATURE_LENGTH - in.available();
size_t left_move_size = checksum_signature_length - in.available();
checksum_buff_limit = checksum_buff_size - left_move_size;
}
@ -60,6 +63,9 @@ MySQLBinlogEventReadBuffer::~MySQLBinlogEventReadBuffer()
{
/// ignore last 4 bytes
nextIfAtEnd();
if (checksum_signature_length)
delete checksum_buf;
}
catch (...)
{

View File

@ -8,19 +8,19 @@ namespace DB
class MySQLBinlogEventReadBuffer : public ReadBuffer
{
protected:
static const size_t CHECKSUM_CRC32_SIGNATURE_LENGTH = 4;
ReadBuffer & in;
size_t checksum_signature_length;
size_t checksum_buff_size = 0;
size_t checksum_buff_limit = 0;
char checksum_buf[CHECKSUM_CRC32_SIGNATURE_LENGTH];
char * checksum_buf = nullptr;
bool nextImpl() override;
public:
~MySQLBinlogEventReadBuffer() override;
MySQLBinlogEventReadBuffer(ReadBuffer & in_);
MySQLBinlogEventReadBuffer(ReadBuffer & in_, size_t checksum_signature_length_);
};

View File

@ -13,7 +13,7 @@ TEST(MySQLBinlogEventReadBuffer, CheckBoundary)
std::vector<char> memory_data(index, 0x01);
ReadBufferFromMemory nested_in(memory_data.data(), index);
EXPECT_THROW({ MySQLBinlogEventReadBuffer binlog_in(nested_in); }, Exception);
EXPECT_THROW({ MySQLBinlogEventReadBuffer binlog_in(nested_in, 4); }, Exception);
}
}
@ -23,7 +23,7 @@ TEST(MySQLBinlogEventReadBuffer, NiceBufferSize)
std::vector<char> memory_data(6, 0x01);
ReadBufferFromMemory nested_in(memory_data.data(), 6);
MySQLBinlogEventReadBuffer binlog_in(nested_in);
MySQLBinlogEventReadBuffer binlog_in(nested_in, 4);
binlog_in.readStrict(res, 2);
ASSERT_EQ(res[0], 0x01);
ASSERT_EQ(res[1], 0x01);
@ -46,7 +46,7 @@ TEST(MySQLBinlogEventReadBuffer, BadBufferSizes)
}
ConcatReadBuffer concat_buffer(nested_buffers);
MySQLBinlogEventReadBuffer binlog_in(concat_buffer);
MySQLBinlogEventReadBuffer binlog_in(concat_buffer, 4);
binlog_in.readStrict(res, 4);
for (const auto & res_byte : res)
@ -71,7 +71,7 @@ TEST(MySQLBinlogEventReadBuffer, NiceAndBadBufferSizes)
}
ConcatReadBuffer concat_buffer(nested_buffers);
MySQLBinlogEventReadBuffer binlog_in(concat_buffer);
MySQLBinlogEventReadBuffer binlog_in(concat_buffer, 4);
binlog_in.readStrict(res, 12);
for (const auto & res_byte : res)

View File

@ -432,6 +432,7 @@ def query_event_with_empty_transaction(clickhouse_node, mysql_node, service_name
clickhouse_node.query("DROP DATABASE test_database")
mysql_node.query("DROP DATABASE test_database")
def select_without_columns(clickhouse_node, mysql_node, service_name):
mysql_node.query("CREATE DATABASE db")
mysql_node.query("CREATE TABLE db.t (a INT PRIMARY KEY, b INT)")
@ -461,3 +462,23 @@ def select_without_columns(clickhouse_node, mysql_node, service_name):
clickhouse_node.query("DROP VIEW v")
clickhouse_node.query("DROP DATABASE db")
mysql_node.query("DROP DATABASE db")
def insert_with_modify_binlog_checksum(clickhouse_node, mysql_node, service_name):
mysql_node.query("CREATE DATABASE test_checksum")
mysql_node.query("CREATE TABLE test_checksum.t (a INT PRIMARY KEY, b varchar(200))")
clickhouse_node.query("CREATE DATABASE test_checksum ENGINE = MaterializeMySQL('{}:3306', 'test_checksum', 'root', 'clickhouse')".format(service_name))
check_query(clickhouse_node, "SHOW TABLES FROM test_checksum FORMAT TSV", "t\n")
mysql_node.query("INSERT INTO test_checksum.t VALUES(1, '1111')")
check_query(clickhouse_node, "SELECT * FROM test_checksum ORDER BY a FORMAT TSV", "1\t1111\n")
mysql_node.query("SET GLOBAL binlog_checksum=NONE")
mysql_node.query("INSERT INTO test_checksum.t VALUES(2, '2222')")
check_query(clickhouse_node, "SELECT * FROM test_checksum ORDER BY a FORMAT TSV", "1\t1111\n2\t2222\n")
mysql_node.query("SET GLOBAL binlog_checksum=CRC32")
mysql_node.query("INSERT INTO test_checksum.t VALUES(3, '3333')")
check_query(clickhouse_node, "SELECT * FROM test_checksum ORDER BY a FORMAT TSV", "1\t1111\n2\t2222\n3\t3333\n")
clickhouse_node.query("DROP DATABASE test_checksum")
mysql_node.query("DROP DATABASE test_checksum")

View File

@ -151,5 +151,14 @@ def test_materialize_database_ddl_with_empty_transaction_8_0(started_cluster, st
def test_select_without_columns_5_7(started_cluster, started_mysql_5_7):
materialize_with_ddl.select_without_columns(clickhouse_node, started_mysql_5_7, "mysql1")
def test_select_without_columns_8_0(started_cluster, started_mysql_8_0):
materialize_with_ddl.select_without_columns(clickhouse_node, started_mysql_8_0, "mysql8_0")
def test_insert_with_modify_binlog_checksum_5_7(started_cluster, started_mysql_5_7):
materialize_with_ddl.insert_with_modify_binlog_checksum(clickhouse_node, started_mysql_5_7, "mysql1")
def test_insert_with_modify_binlog_checksum_8_0(started_cluster, started_mysql_5_7):
materialize_with_ddl.insert_with_modify_binlog_checksum(clickhouse_node, started_mysql_5_7, "mysql1")

View File

@ -18,10 +18,7 @@ static DB::MySQLReplication::BinlogEventPtr parseSingleEventBody(
{
DB::MySQLReplication::BinlogEventPtr event;
DB::ReadBufferPtr limit_read_buffer = std::make_shared<DB::LimitReadBuffer>(payload, header.event_size - 19, false);
DB::ReadBufferPtr event_payload = limit_read_buffer;
if (exist_checksum)
event_payload = std::make_shared<DB::MySQLBinlogEventReadBuffer>(*limit_read_buffer);
DB::ReadBufferPtr event_payload = std::make_shared<DB::MySQLBinlogEventReadBuffer>(*limit_read_buffer, exist_checksum ? 4 : 0);
switch (header.type)
{