Add method to metadata class to start replication

Previously this was done by the constructor.  This makes it
possible to restart replication with an existing metadata object.
This commit is contained in:
Haavard Kvaalen 2021-02-15 10:07:05 +01:00
parent e6711675a1
commit 8e95d6e174
4 changed files with 37 additions and 26 deletions

View File

@ -159,6 +159,9 @@ static void checkSyncUserPriv(const mysqlxx::PoolWithFailover::Entry & connectio
bool MaterializeMetadata::checkBinlogFileExists(const mysqlxx::PoolWithFailover::Entry & connection) const
{
if (binlog_file.empty())
return false;
Block logs_header {
{std::make_shared<DataTypeString>(), "Log_name"},
{std::make_shared<DataTypeUInt64>(), "File_size"}
@ -219,13 +222,8 @@ void MaterializeMetadata::transaction(const MySQLReplication::Position & positio
commitMetadata(std::move(fun), persistent_tmp_path, persistent_path);
}
MaterializeMetadata::MaterializeMetadata(
mysqlxx::PoolWithFailover::Entry & connection, const String & path_,
const String & database, bool & opened_transaction)
: persistent_path(path_)
MaterializeMetadata::MaterializeMetadata(const String & path_) : persistent_path(path_)
{
checkSyncUserPriv(connection);
if (Poco::File(persistent_path).exists())
{
ReadBufferFromFile in(persistent_path, DBMS_DEFAULT_BUFFER_SIZE);
@ -239,9 +237,17 @@ MaterializeMetadata::MaterializeMetadata(
assertString("\nData Version:\t", in);
readIntText(data_version, in);
if (checkBinlogFileExists(connection))
return;
}
}
void MaterializeMetadata::startReplication(
mysqlxx::PoolWithFailover::Entry & connection, const String & database,
bool & opened_transaction, std::unordered_map<String, String> & need_dumping_tables)
{
checkSyncUserPriv(connection);
if (checkBinlogFileExists(connection))
return;
bool locked_tables = false;

View File

@ -35,7 +35,6 @@ struct MaterializeMetadata
size_t data_version = 1;
size_t meta_version = 2;
String binlog_checksum = "CRC32";
std::unordered_map<String, String> need_dumping_tables;
void fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & connection);
@ -45,9 +44,13 @@ struct MaterializeMetadata
void transaction(const MySQLReplication::Position & position, const std::function<void()> & fun);
MaterializeMetadata(
mysqlxx::PoolWithFailover::Entry & connection, const String & path
, const String & database, bool & opened_transaction);
void startReplication(
mysqlxx::PoolWithFailover::Entry & connection,
const String & database,
bool & opened_transaction,
std::unordered_map<String, String> & need_dumping_tables);
MaterializeMetadata(const String & path_);
};
}

View File

@ -155,7 +155,9 @@ void MaterializeMySQLSyncThread::synchronization()
try
{
if (std::optional<MaterializeMetadata> metadata = prepareSynchronized())
MaterializeMetadata metadata(
DatabaseCatalog::instance().getDatabase(database_name)->getMetadataPath() + "/.metadata");
if (prepareSynchronized(metadata))
{
Stopwatch watch;
Buffers buffers(database_name);
@ -168,7 +170,7 @@ void MaterializeMySQLSyncThread::synchronization()
{
if (binlog_event)
onEvent(buffers, binlog_event, *metadata);
onEvent(buffers, binlog_event, metadata);
if (watch.elapsedMilliseconds() > max_flush_time || buffers.checkThresholds(
settings->max_rows_in_buffer, settings->max_bytes_in_buffer,
@ -178,7 +180,7 @@ void MaterializeMySQLSyncThread::synchronization()
watch.restart();
if (!buffers.data.empty())
flushBuffersData(buffers, *metadata);
flushBuffersData(buffers, metadata);
}
}
}
@ -281,12 +283,12 @@ static inline BlockOutputStreamPtr getTableOutput(const String & database_name,
}
static inline void dumpDataForTables(
mysqlxx::Pool::Entry & connection, MaterializeMetadata & master_info,
mysqlxx::Pool::Entry & connection, const std::unordered_map<String, String> & need_dumping_tables,
const String & query_prefix, const String & database_name, const String & mysql_database_name,
const Context & context, const std::function<bool()> & is_cancelled)
{
auto iterator = master_info.need_dumping_tables.begin();
for (; iterator != master_info.need_dumping_tables.end() && !is_cancelled(); ++iterator)
auto iterator = need_dumping_tables.begin();
for (; iterator != need_dumping_tables.end() && !is_cancelled(); ++iterator)
{
try
{
@ -325,7 +327,7 @@ static inline UInt32 randomNumber()
return dist6(rng);
}
std::optional<MaterializeMetadata> MaterializeMySQLSyncThread::prepareSynchronized()
bool MaterializeMySQLSyncThread::prepareSynchronized(MaterializeMetadata & metadata)
{
bool opened_transaction = false;
mysqlxx::PoolWithFailover::Entry connection;
@ -338,10 +340,10 @@ std::optional<MaterializeMetadata> MaterializeMySQLSyncThread::prepareSynchroniz
opened_transaction = false;
checkMySQLVariables(connection);
MaterializeMetadata metadata(
connection, DatabaseCatalog::instance().getDatabase(database_name)->getMetadataPath() + "/.metadata", mysql_database_name, opened_transaction);
std::unordered_map<String, String> need_dumping_tables;
metadata.startReplication(connection, mysql_database_name, opened_transaction, need_dumping_tables);
if (!metadata.need_dumping_tables.empty())
if (!need_dumping_tables.empty())
{
Position position;
position.update(metadata.binlog_position, metadata.binlog_file, metadata.executed_gtid_set);
@ -349,7 +351,7 @@ std::optional<MaterializeMetadata> MaterializeMySQLSyncThread::prepareSynchroniz
metadata.transaction(position, [&]()
{
cleanOutdatedTables(database_name, global_context);
dumpDataForTables(connection, metadata, query_prefix, database_name, mysql_database_name, global_context, [this] { return isCancelled(); });
dumpDataForTables(connection, need_dumping_tables, query_prefix, database_name, mysql_database_name, global_context, [this] { return isCancelled(); });
});
const auto & position_message = [&]()
@ -368,7 +370,7 @@ std::optional<MaterializeMetadata> MaterializeMySQLSyncThread::prepareSynchroniz
client.startBinlogDumpGTID(randomNumber(), mysql_database_name, metadata.executed_gtid_set, metadata.binlog_checksum);
setSynchronizationThreadException(nullptr);
return metadata;
return true;
}
catch (...)
{
@ -390,7 +392,7 @@ std::optional<MaterializeMetadata> MaterializeMySQLSyncThread::prepareSynchroniz
}
}
return {};
return false;
}
void MaterializeMySQLSyncThread::flushBuffersData(Buffers & buffers, MaterializeMetadata & metadata)

View File

@ -100,7 +100,7 @@ private:
bool isCancelled() { return sync_quit.load(std::memory_order_relaxed); }
std::optional<MaterializeMetadata> prepareSynchronized();
bool prepareSynchronized(MaterializeMetadata & metadata);
void flushBuffersData(Buffers & buffers, MaterializeMetadata & metadata);