#include #include #include #include #include #include #include #include #include #include #include static DB::MySQLReplication::BinlogEventPtr parseSingleEventBody( DB::MySQLReplication::EventHeader & header, DB::ReadBuffer & payload, std::shared_ptr & last_table_map_event, bool exist_checksum) { DB::MySQLReplication::BinlogEventPtr event; DB::ReadBufferPtr limit_read_buffer = std::make_shared(payload, header.event_size - 19, /* trow_exception */ false, /* exact_limit */ std::nullopt); DB::ReadBufferPtr event_payload = std::make_shared(*limit_read_buffer, exist_checksum ? 4 : 0); switch (header.type) { case DB::MySQLReplication::FORMAT_DESCRIPTION_EVENT: { event = std::make_shared(std::move(header)); event->parseEvent(*event_payload); break; } case DB::MySQLReplication::ROTATE_EVENT: { event = std::make_shared(std::move(header)); event->parseEvent(*event_payload); break; } case DB::MySQLReplication::QUERY_EVENT: { event = std::make_shared(std::move(header)); event->parseEvent(*event_payload); auto query = std::static_pointer_cast(event); switch (query->typ) { case DB::MySQLReplication::QUERY_EVENT_MULTI_TXN_FLAG: case DB::MySQLReplication::QUERY_EVENT_XA: { event = std::make_shared(std::move(query->header)); break; } default: break; } break; } case DB::MySQLReplication::XID_EVENT: { event = std::make_shared(std::move(header)); event->parseEvent(*event_payload); break; } case DB::MySQLReplication::TABLE_MAP_EVENT: { DB::MySQLReplication::TableMapEventHeader map_event_header; map_event_header.parse(*event_payload); event = std::make_shared(std::move(header), map_event_header); event->parseEvent(*event_payload); last_table_map_event = std::static_pointer_cast(event); break; } case DB::MySQLReplication::WRITE_ROWS_EVENT_V1: case DB::MySQLReplication::WRITE_ROWS_EVENT_V2: { DB::MySQLReplication::RowsEventHeader rows_header(header.type); rows_header.parse(*event_payload); event = std::make_shared(last_table_map_event, std::move(header), rows_header); event->parseEvent(*event_payload); break; } case DB::MySQLReplication::DELETE_ROWS_EVENT_V1: case DB::MySQLReplication::DELETE_ROWS_EVENT_V2: { DB::MySQLReplication::RowsEventHeader rows_header(header.type); rows_header.parse(*event_payload); event = std::make_shared(last_table_map_event, std::move(header), rows_header); event->parseEvent(*event_payload); break; } case DB::MySQLReplication::UPDATE_ROWS_EVENT_V1: case DB::MySQLReplication::UPDATE_ROWS_EVENT_V2: { DB::MySQLReplication::RowsEventHeader rows_header(header.type); rows_header.parse(*event_payload); event = std::make_shared(last_table_map_event, std::move(header), rows_header); event->parseEvent(*event_payload); break; } case DB::MySQLReplication::GTID_EVENT: { event = std::make_shared(std::move(header)); event->parseEvent(*event_payload); break; } default: { event = std::make_shared(std::move(header)); event->parseEvent(*event_payload); break; } } return event; } static int checkBinLogFile(const std::string & bin_path, bool exist_checksum) { DB::ReadBufferFromFile in(bin_path); DB::assertString("\xfe\x62\x69\x6e", in); /// magic number DB::MySQLReplication::BinlogEventPtr last_event; std::shared_ptr last_header; std::shared_ptr table_map; try { while (!in.eof()) { last_header = std::make_shared(); last_header->parse(in); last_event = parseSingleEventBody(*last_header, in, table_map, exist_checksum); } } catch (...) { DB::WriteBufferFromOStream cerr(std::cerr); cerr << "Unable to parse MySQL binlog event. Code: " << DB::getCurrentExceptionCode() << ", Exception message: " << DB::getCurrentExceptionMessage(false) << '\n' << ", Previous event: " << '\n'; last_event->dump(cerr); cerr << '\n' << ", Event header: " << '\n'; last_header->dump(cerr); cerr << '\n'; return DB::getCurrentExceptionCode(); } DB::WriteBufferFromOStream cout(std::cout); cout << "Check passed. " << '\n' << "No exception was thrown." << '\n' << "The last binlog event: " << '\n'; last_event->dump(cout); cout << '\n'; return 0; } int main(int argc, char ** argv) { boost::program_options::options_description desc("Allowed options"); desc.add_options()("help,h", "Produce help message"); desc.add_options()("disable_checksum", "Disable checksums in binlog files."); boost::program_options::variables_map options; boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), options); if (options.count("help") || argc < 2) { std::cout << "Usage: " << argv[0] << " mysql_binlog_file" << std::endl; std::cout << desc << std::endl; return 1; } return checkBinLogFile(argv[argc - 1], !options.count("disable_checksum")); }