ISSUES-4006 add some check before create materialize mysql database

This commit is contained in:
zhang2014 2020-07-17 14:39:17 +08:00
parent 95b8afb8eb
commit 74f354d840
7 changed files with 106 additions and 41 deletions

View File

@ -501,6 +501,7 @@ namespace ErrorCodes
extern const int NO_RESERVATIONS_PROVIDED = 534;
extern const int UNKNOWN_RAID_TYPE = 535;
extern const int CANNOT_RESTORE_FROM_FIELD_DUMP = 536;
extern const int ILLEGAL_MYSQL_VARIABLE = 537;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -2,6 +2,7 @@
#include <DataTypes/DataTypeString.h>
#include <IO/ReadBufferFromString.h>
#include <Common/DateLUT.h>
#include <Common/FieldVisitors.h>
namespace DB
@ -398,7 +399,7 @@ namespace MySQLReplication
case MYSQL_TYPE_TIMESTAMP: {
UInt32 val = 0;
payload.readStrict(reinterpret_cast<char *>(&val), 4);
row.push_back(Field{UInt64{val}});
row.push_back(Field{val});
break;
}
case MYSQL_TYPE_TIME: {
@ -420,15 +421,10 @@ namespace MySQLReplication
UInt32 i24 = 0;
payload.readStrict(reinterpret_cast<char *>(&i24), 3);
String time_buff;
time_buff.resize(10);
sprintf(
time_buff.data(),
"%04d-%02d-%02d",
static_cast<int>((i24 >> 9) & 0x7fff),
static_cast<int>((i24 >> 5) & 0xf),
static_cast<int>(i24 & 0x1f));
row.push_back(Field{String{time_buff}});
DayNum date_day_number = DateLUT::instance().makeDayNum(
static_cast<int>((i24 >> 9) & 0x7fff), static_cast<int>((i24 >> 5) & 0xf), static_cast<int>(i24 & 0x1f));
row.push_back(Field(date_day_number.toUnderType()));
break;
}
case MYSQL_TYPE_YEAR: {
@ -486,24 +482,20 @@ namespace MySQLReplication
readBigEndianStrict(payload, reinterpret_cast<char *>(&val), 5);
readTimeFractionalPart(payload, reinterpret_cast<char *>(&fsp), meta);
struct tm timeinfo;
UInt32 year_month = readBits(val, 1, 17, 40);
timeinfo.tm_year = (year_month / 13) - 1900;
timeinfo.tm_mon = (year_month % 13) - 1;
timeinfo.tm_mday = readBits(val, 18, 5, 40);
timeinfo.tm_hour = readBits(val, 23, 5, 40);
timeinfo.tm_min = readBits(val, 28, 6, 40);
timeinfo.tm_sec = readBits(val, 34, 6, 40);
time_t date_time = DateLUT::instance().makeDateTime(
year_month / 13, year_month % 13, readBits(val, 18, 5, 40)
, readBits(val, 23, 5, 40), readBits(val, 28, 6, 40), readBits(val, 34, 6, 40)
);
time_t time = mktime(&timeinfo);
row.push_back(Field{UInt64{static_cast<UInt32>(time)}});
row.push_back(Field{UInt32(date_time)});
break;
}
case MYSQL_TYPE_TIMESTAMP2: {
UInt64 sec = 0, fsp = 0;
UInt32 sec = 0, fsp = 0;
readBigEndianStrict(payload, reinterpret_cast<char *>(&sec), 4);
readTimeFractionalPart(payload, reinterpret_cast<char *>(&fsp), meta);
row.push_back(Field{UInt64{sec}});
row.push_back(Field{sec});
break;
}
case MYSQL_TYPE_NEWDECIMAL: {

View File

@ -80,16 +80,26 @@ void MaterializeMetadata::fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & c
executed_gtid_set = (*master_status.getByPosition(4).column)[0].safeGet<String>();
}
bool MaterializeMetadata::checkBinlogFileExists(mysqlxx::PoolWithFailover::Entry & connection) const
static Block getShowMasterLogHeader(const String & mysql_version)
{
/// TODO: MySQL 5.7
Block header{
if (startsWith(mysql_version, "5."))
{
return Block {
{std::make_shared<DataTypeString>(), "Log_name"},
{std::make_shared<DataTypeUInt64>(), "File_size"}
};
}
return Block {
{std::make_shared<DataTypeString>(), "Log_name"},
{std::make_shared<DataTypeUInt64>(), "File_size"},
{std::make_shared<DataTypeString>(), "Encrypted"}
};
}
MySQLBlockInputStream input(connection, "SHOW MASTER LOGS", header, DEFAULT_BLOCK_SIZE);
bool MaterializeMetadata::checkBinlogFileExists(mysqlxx::PoolWithFailover::Entry & connection, const String & mysql_version) const
{
MySQLBlockInputStream input(connection, "SHOW MASTER LOGS", getShowMasterLogHeader(mysql_version), DEFAULT_BLOCK_SIZE);
while (Block block = input.read())
{
@ -143,7 +153,9 @@ void MaterializeMetadata::transaction(const MySQLReplication::Position & positio
commitMetadata(fun, persistent_tmp_path, persistent_path);
}
MaterializeMetadata::MaterializeMetadata(mysqlxx::PoolWithFailover::Entry & connection, const String & path_, const String & database, bool & opened_transaction)
MaterializeMetadata::MaterializeMetadata(
mysqlxx::PoolWithFailover::Entry & connection, const String & path_,
const String & database, bool & opened_transaction, const String & mysql_version)
: persistent_path(path_)
{
if (Poco::File(persistent_path).exists())
@ -159,7 +171,7 @@ MaterializeMetadata::MaterializeMetadata(mysqlxx::PoolWithFailover::Entry & conn
assertString("\nData Version:\t", in);
readIntText(version, in);
if (checkBinlogFileExists(connection))
if (checkBinlogFileExists(connection, mysql_version))
return;
}

View File

@ -23,11 +23,13 @@ struct MaterializeMetadata
void fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & connection);
bool checkBinlogFileExists(mysqlxx::PoolWithFailover::Entry & connection) const;
bool checkBinlogFileExists(mysqlxx::PoolWithFailover::Entry & connection, const String & mysql_version) const;
void transaction(const MySQLReplication::Position & position, const std::function<void()> & fun);
MaterializeMetadata(mysqlxx::PoolWithFailover::Entry & connection, const String & path, const String & database, bool & opened_transaction);
MaterializeMetadata(
mysqlxx::PoolWithFailover::Entry & connection, const String & path
, const String & database, bool & opened_transaction, const String & mysql_version);
};
}

View File

@ -30,6 +30,7 @@ namespace ErrorCodes
{
extern const int INCORRECT_QUERY;
extern const int SYNTAX_ERROR;
extern const int ILLEGAL_MYSQL_VARIABLE;
}
static constexpr auto MYSQL_BACKGROUND_THREAD_NAME = "MySQLDBSync";
@ -77,25 +78,80 @@ MaterializeMySQLSyncThread::~MaterializeMySQLSyncThread()
}
}
static String checkVariableAndGetVersion(const mysqlxx::Pool::Entry & connection)
{
Block variables_header{
{std::make_shared<DataTypeString>(), "Variable_name"},
{std::make_shared<DataTypeString>(), "Value"}
};
const String & check_query = "SHOW VARIABLES WHERE "
"(variable_name = 'log_bin' AND upper(Value) = 'ON') "
"OR (variable_name = 'binlog_format' AND upper(Value) = 'ROW') "
"OR (variable_name = 'binlog_row_image' AND upper(Value) = 'FULL');";
MySQLBlockInputStream variables_input(connection, check_query, variables_header, DEFAULT_BLOCK_SIZE);
Block variables_block = variables_input.read();
if (!variables_block || variables_block.rows() != 3)
{
std::unordered_map<String, String> variable_error_message{
{"log_bin", "log_bin = 'ON'"},
{"binlog_format", "binlog_format='ROW'"},
{"binlog_row_image", "binlog_row_image='FULL'"}
};
ColumnPtr variable_name_column = variables_block.getByName("Variable_name").column;
for (size_t index = 0; index < variables_block.rows(); ++index)
{
const auto & error_message_it = variable_error_message.find(variable_name_column->getDataAt(index).toString());
if (error_message_it != variable_error_message.end())
variable_error_message.erase(error_message_it);
}
bool first = true;
std::stringstream error_message;
error_message << "Illegal MySQL variables, the MaterializeMySQL engine requires ";
for (const auto & [variable_name, variable_error_message] : variable_error_message)
{
error_message << (first ? "" : ", ") << variable_error_message;
if (first)
first = false;
}
throw Exception(error_message.str(), ErrorCodes::ILLEGAL_MYSQL_VARIABLE);
}
Block version_header{{std::make_shared<DataTypeString>(), "version"}};
MySQLBlockInputStream version_input(connection, "SELECT version() AS version;", version_header, DEFAULT_BLOCK_SIZE);
Block version_block = version_input.read();
if (!version_block || version_block.rows() != 1)
throw Exception("LOGICAL ERROR: cannot get mysql version.", ErrorCodes::LOGICAL_ERROR);
return version_block.getByPosition(0).column->getDataAt(0).toString();
}
MaterializeMySQLSyncThread::MaterializeMySQLSyncThread(
const Context & context, const String & database_name_, const String & mysql_database_name_,
mysqlxx::Pool && pool_, MySQLClient && client_, MaterializeMySQLSettings * settings_)
: log(&Poco::Logger::get("MaterializeMySQLSyncThread")), global_context(context.getGlobalContext()), database_name(database_name_)
, mysql_database_name(mysql_database_name_), pool(std::move(pool_)), client(std::move(client_)), settings(settings_)
{
/// TODO: 做简单的check, 失败即报错
/// binlog_format = ROW binlog_row_image = FULL
const auto & mysql_server_version = checkVariableAndGetVersion(pool.get());
query_prefix = "EXTERNAL DDL FROM MySQL(" + backQuoteIfNeed(database_name) + ", " + backQuoteIfNeed(mysql_database_name) + ") ";
startSynchronization();
startSynchronization(mysql_server_version);
}
void MaterializeMySQLSyncThread::synchronization()
void MaterializeMySQLSyncThread::synchronization(const String & mysql_version)
{
setThreadName(MYSQL_BACKGROUND_THREAD_NAME);
try
{
if (std::optional<MaterializeMetadata> metadata = prepareSynchronized())
if (std::optional<MaterializeMetadata> metadata = prepareSynchronized(mysql_version))
{
Stopwatch watch;
Buffers buffers(database_name);
@ -150,10 +206,10 @@ void MaterializeMySQLSyncThread::stopSynchronization()
}
}
void MaterializeMySQLSyncThread::startSynchronization()
void MaterializeMySQLSyncThread::startSynchronization(const String & mysql_version)
{
/// TODO: reset exception.
background_thread_pool = std::make_unique<ThreadFromGlobalPool>([this]() { synchronization(); });
background_thread_pool = std::make_unique<ThreadFromGlobalPool>([this, mysql_version = mysql_version]() { synchronization(mysql_version); });
}
static inline void cleanOutdatedTables(const String & database_name, const Context & context)
@ -208,7 +264,7 @@ static inline UInt32 randomNumber()
return dist6(rng);
}
std::optional<MaterializeMetadata> MaterializeMySQLSyncThread::prepareSynchronized()
std::optional<MaterializeMetadata> MaterializeMySQLSyncThread::prepareSynchronized(const String & mysql_version)
{
std::unique_lock<std::mutex> lock(sync_mutex);
@ -227,7 +283,9 @@ std::optional<MaterializeMetadata> MaterializeMySQLSyncThread::prepareSynchroniz
connection = pool.get();
opened_transaction = false;
MaterializeMetadata metadata(connection, getDatabase(database_name).getMetadataPath() + "/.metadata", mysql_database_name, opened_transaction);
MaterializeMetadata metadata(
connection, getDatabase(database_name).getMetadataPath() + "/.metadata",
mysql_database_name, opened_transaction, mysql_version);
if (!metadata.need_dumping_tables.empty())
{

View File

@ -32,7 +32,7 @@ public:
void stopSynchronization();
void startSynchronization();
void startSynchronization(const String & mysql_version);
static bool isMySQLSyncThread();
@ -73,11 +73,11 @@ private:
BufferAndSortingColumnsPtr getTableDataBuffer(const String & table, const Context & context);
};
void synchronization();
void synchronization(const String & mysql_version);
bool isCancelled() { return sync_quit.load(std::memory_order_relaxed); }
std::optional<MaterializeMetadata> prepareSynchronized();
std::optional<MaterializeMetadata> prepareSynchronized(const String & mysql_version);
void flushBuffersData(Buffers & buffers, MaterializeMetadata & metadata);