diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index d63b9f9ccab..0f7d9099314 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -501,6 +501,7 @@ namespace ErrorCodes extern const int NO_RESERVATIONS_PROVIDED = 534; extern const int UNKNOWN_RAID_TYPE = 535; extern const int CANNOT_RESTORE_FROM_FIELD_DUMP = 536; + extern const int ILLEGAL_MYSQL_VARIABLE = 537; extern const int KEEPER_EXCEPTION = 999; extern const int POCO_EXCEPTION = 1000; diff --git a/src/Core/MySQLReplication.cpp b/src/Core/MySQLReplication.cpp index d31c4c0b128..6dd507ec409 100644 --- a/src/Core/MySQLReplication.cpp +++ b/src/Core/MySQLReplication.cpp @@ -2,6 +2,7 @@ #include #include +#include #include namespace DB @@ -398,7 +399,7 @@ namespace MySQLReplication case MYSQL_TYPE_TIMESTAMP: { UInt32 val = 0; payload.readStrict(reinterpret_cast(&val), 4); - row.push_back(Field{UInt64{val}}); + row.push_back(Field{val}); break; } case MYSQL_TYPE_TIME: { @@ -420,15 +421,10 @@ namespace MySQLReplication UInt32 i24 = 0; payload.readStrict(reinterpret_cast(&i24), 3); - String time_buff; - time_buff.resize(10); - sprintf( - time_buff.data(), - "%04d-%02d-%02d", - static_cast((i24 >> 9) & 0x7fff), - static_cast((i24 >> 5) & 0xf), - static_cast(i24 & 0x1f)); - row.push_back(Field{String{time_buff}}); + DayNum date_day_number = DateLUT::instance().makeDayNum( + static_cast((i24 >> 9) & 0x7fff), static_cast((i24 >> 5) & 0xf), static_cast(i24 & 0x1f)); + + row.push_back(Field(date_day_number.toUnderType())); break; } case MYSQL_TYPE_YEAR: { @@ -486,24 +482,20 @@ namespace MySQLReplication readBigEndianStrict(payload, reinterpret_cast(&val), 5); readTimeFractionalPart(payload, reinterpret_cast(&fsp), meta); - struct tm timeinfo; UInt32 year_month = readBits(val, 1, 17, 40); - timeinfo.tm_year = (year_month / 13) - 1900; - timeinfo.tm_mon = (year_month % 13) - 1; - timeinfo.tm_mday = readBits(val, 18, 5, 40); - timeinfo.tm_hour = readBits(val, 23, 5, 40); - timeinfo.tm_min = readBits(val, 28, 6, 40); - timeinfo.tm_sec = readBits(val, 34, 6, 40); + time_t date_time = DateLUT::instance().makeDateTime( + year_month / 13, year_month % 13, readBits(val, 18, 5, 40) + , readBits(val, 23, 5, 40), readBits(val, 28, 6, 40), readBits(val, 34, 6, 40) + ); - time_t time = mktime(&timeinfo); - row.push_back(Field{UInt64{static_cast(time)}}); + row.push_back(Field{UInt32(date_time)}); break; } case MYSQL_TYPE_TIMESTAMP2: { - UInt64 sec = 0, fsp = 0; + UInt32 sec = 0, fsp = 0; readBigEndianStrict(payload, reinterpret_cast(&sec), 4); readTimeFractionalPart(payload, reinterpret_cast(&fsp), meta); - row.push_back(Field{UInt64{sec}}); + row.push_back(Field{sec}); break; } case MYSQL_TYPE_NEWDECIMAL: { diff --git a/src/Databases/MySQL/MaterializeMetadata.cpp b/src/Databases/MySQL/MaterializeMetadata.cpp index 99b515ebea8..1879a1fb79d 100644 --- a/src/Databases/MySQL/MaterializeMetadata.cpp +++ b/src/Databases/MySQL/MaterializeMetadata.cpp @@ -80,16 +80,26 @@ void MaterializeMetadata::fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & c executed_gtid_set = (*master_status.getByPosition(4).column)[0].safeGet(); } -bool MaterializeMetadata::checkBinlogFileExists(mysqlxx::PoolWithFailover::Entry & connection) const +static Block getShowMasterLogHeader(const String & mysql_version) { - /// TODO: MySQL 5.7 - Block header{ + if (startsWith(mysql_version, "5.")) + { + return Block { + {std::make_shared(), "Log_name"}, + {std::make_shared(), "File_size"} + }; + } + + return Block { {std::make_shared(), "Log_name"}, {std::make_shared(), "File_size"}, {std::make_shared(), "Encrypted"} }; +} - MySQLBlockInputStream input(connection, "SHOW MASTER LOGS", header, DEFAULT_BLOCK_SIZE); +bool MaterializeMetadata::checkBinlogFileExists(mysqlxx::PoolWithFailover::Entry & connection, const String & mysql_version) const +{ + MySQLBlockInputStream input(connection, "SHOW MASTER LOGS", getShowMasterLogHeader(mysql_version), DEFAULT_BLOCK_SIZE); while (Block block = input.read()) { @@ -143,7 +153,9 @@ void MaterializeMetadata::transaction(const MySQLReplication::Position & positio commitMetadata(fun, persistent_tmp_path, persistent_path); } -MaterializeMetadata::MaterializeMetadata(mysqlxx::PoolWithFailover::Entry & connection, const String & path_, const String & database, bool & opened_transaction) +MaterializeMetadata::MaterializeMetadata( + mysqlxx::PoolWithFailover::Entry & connection, const String & path_, + const String & database, bool & opened_transaction, const String & mysql_version) : persistent_path(path_) { if (Poco::File(persistent_path).exists()) @@ -159,7 +171,7 @@ MaterializeMetadata::MaterializeMetadata(mysqlxx::PoolWithFailover::Entry & conn assertString("\nData Version:\t", in); readIntText(version, in); - if (checkBinlogFileExists(connection)) + if (checkBinlogFileExists(connection, mysql_version)) return; } diff --git a/src/Databases/MySQL/MaterializeMetadata.h b/src/Databases/MySQL/MaterializeMetadata.h index be6f83f8166..79769ef9f4a 100644 --- a/src/Databases/MySQL/MaterializeMetadata.h +++ b/src/Databases/MySQL/MaterializeMetadata.h @@ -23,11 +23,13 @@ struct MaterializeMetadata void fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & connection); - bool checkBinlogFileExists(mysqlxx::PoolWithFailover::Entry & connection) const; + bool checkBinlogFileExists(mysqlxx::PoolWithFailover::Entry & connection, const String & mysql_version) const; void transaction(const MySQLReplication::Position & position, const std::function & fun); - MaterializeMetadata(mysqlxx::PoolWithFailover::Entry & connection, const String & path, const String & database, bool & opened_transaction); + MaterializeMetadata( + mysqlxx::PoolWithFailover::Entry & connection, const String & path + , const String & database, bool & opened_transaction, const String & mysql_version); }; } diff --git a/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp b/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp index aa0adb3b5a0..d53ca15e4b4 100644 --- a/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp +++ b/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp @@ -30,6 +30,7 @@ namespace ErrorCodes { extern const int INCORRECT_QUERY; extern const int SYNTAX_ERROR; + extern const int ILLEGAL_MYSQL_VARIABLE; } static constexpr auto MYSQL_BACKGROUND_THREAD_NAME = "MySQLDBSync"; @@ -77,25 +78,80 @@ MaterializeMySQLSyncThread::~MaterializeMySQLSyncThread() } } +static String checkVariableAndGetVersion(const mysqlxx::Pool::Entry & connection) +{ + Block variables_header{ + {std::make_shared(), "Variable_name"}, + {std::make_shared(), "Value"} + }; + + const String & check_query = "SHOW VARIABLES WHERE " + "(variable_name = 'log_bin' AND upper(Value) = 'ON') " + "OR (variable_name = 'binlog_format' AND upper(Value) = 'ROW') " + "OR (variable_name = 'binlog_row_image' AND upper(Value) = 'FULL');"; + + MySQLBlockInputStream variables_input(connection, check_query, variables_header, DEFAULT_BLOCK_SIZE); + + Block variables_block = variables_input.read(); + if (!variables_block || variables_block.rows() != 3) + { + std::unordered_map variable_error_message{ + {"log_bin", "log_bin = 'ON'"}, + {"binlog_format", "binlog_format='ROW'"}, + {"binlog_row_image", "binlog_row_image='FULL'"} + }; + ColumnPtr variable_name_column = variables_block.getByName("Variable_name").column; + + for (size_t index = 0; index < variables_block.rows(); ++index) + { + const auto & error_message_it = variable_error_message.find(variable_name_column->getDataAt(index).toString()); + + if (error_message_it != variable_error_message.end()) + variable_error_message.erase(error_message_it); + } + + bool first = true; + std::stringstream error_message; + error_message << "Illegal MySQL variables, the MaterializeMySQL engine requires "; + for (const auto & [variable_name, variable_error_message] : variable_error_message) + { + error_message << (first ? "" : ", ") << variable_error_message; + + if (first) + first = false; + } + + throw Exception(error_message.str(), ErrorCodes::ILLEGAL_MYSQL_VARIABLE); + } + + Block version_header{{std::make_shared(), "version"}}; + MySQLBlockInputStream version_input(connection, "SELECT version() AS version;", version_header, DEFAULT_BLOCK_SIZE); + + Block version_block = version_input.read(); + if (!version_block || version_block.rows() != 1) + throw Exception("LOGICAL ERROR: cannot get mysql version.", ErrorCodes::LOGICAL_ERROR); + + return version_block.getByPosition(0).column->getDataAt(0).toString(); +} + MaterializeMySQLSyncThread::MaterializeMySQLSyncThread( const Context & context, const String & database_name_, const String & mysql_database_name_, mysqlxx::Pool && pool_, MySQLClient && client_, MaterializeMySQLSettings * settings_) : log(&Poco::Logger::get("MaterializeMySQLSyncThread")), global_context(context.getGlobalContext()), database_name(database_name_) , mysql_database_name(mysql_database_name_), pool(std::move(pool_)), client(std::move(client_)), settings(settings_) { - /// TODO: 做简单的check, 失败即报错 - /// binlog_format = ROW binlog_row_image = FULL + const auto & mysql_server_version = checkVariableAndGetVersion(pool.get()); query_prefix = "EXTERNAL DDL FROM MySQL(" + backQuoteIfNeed(database_name) + ", " + backQuoteIfNeed(mysql_database_name) + ") "; - startSynchronization(); + startSynchronization(mysql_server_version); } -void MaterializeMySQLSyncThread::synchronization() +void MaterializeMySQLSyncThread::synchronization(const String & mysql_version) { setThreadName(MYSQL_BACKGROUND_THREAD_NAME); try { - if (std::optional metadata = prepareSynchronized()) + if (std::optional metadata = prepareSynchronized(mysql_version)) { Stopwatch watch; Buffers buffers(database_name); @@ -150,10 +206,10 @@ void MaterializeMySQLSyncThread::stopSynchronization() } } -void MaterializeMySQLSyncThread::startSynchronization() +void MaterializeMySQLSyncThread::startSynchronization(const String & mysql_version) { /// TODO: reset exception. - background_thread_pool = std::make_unique([this]() { synchronization(); }); + background_thread_pool = std::make_unique([this, mysql_version = mysql_version]() { synchronization(mysql_version); }); } static inline void cleanOutdatedTables(const String & database_name, const Context & context) @@ -208,7 +264,7 @@ static inline UInt32 randomNumber() return dist6(rng); } -std::optional MaterializeMySQLSyncThread::prepareSynchronized() +std::optional MaterializeMySQLSyncThread::prepareSynchronized(const String & mysql_version) { std::unique_lock lock(sync_mutex); @@ -227,7 +283,9 @@ std::optional MaterializeMySQLSyncThread::prepareSynchroniz connection = pool.get(); opened_transaction = false; - MaterializeMetadata metadata(connection, getDatabase(database_name).getMetadataPath() + "/.metadata", mysql_database_name, opened_transaction); + MaterializeMetadata metadata( + connection, getDatabase(database_name).getMetadataPath() + "/.metadata", + mysql_database_name, opened_transaction, mysql_version); if (!metadata.need_dumping_tables.empty()) { diff --git a/src/Databases/MySQL/MaterializeMySQLSyncThread.h b/src/Databases/MySQL/MaterializeMySQLSyncThread.h index 76297a55f54..18a0bba7a25 100644 --- a/src/Databases/MySQL/MaterializeMySQLSyncThread.h +++ b/src/Databases/MySQL/MaterializeMySQLSyncThread.h @@ -32,7 +32,7 @@ public: void stopSynchronization(); - void startSynchronization(); + void startSynchronization(const String & mysql_version); static bool isMySQLSyncThread(); @@ -73,11 +73,11 @@ private: BufferAndSortingColumnsPtr getTableDataBuffer(const String & table, const Context & context); }; - void synchronization(); + void synchronization(const String & mysql_version); bool isCancelled() { return sync_quit.load(std::memory_order_relaxed); } - std::optional prepareSynchronized(); + std::optional prepareSynchronized(const String & mysql_version); void flushBuffersData(Buffers & buffers, MaterializeMetadata & metadata); diff --git a/tests/integration/test_materialize_mysql_database/composes/mysql_5_7_compose.yml b/tests/integration/test_materialize_mysql_database/composes/mysql_5_7_compose.yml new file mode 100644 index 00000000000..e69de29bb2d