ISSUES-4006 support dump all mysql data when create database

This commit is contained in:
zhang2014 2020-05-08 15:36:48 +08:00
parent 5d569f56a7
commit bd18c1cbf8
3 changed files with 137 additions and 33 deletions

View File

@ -19,6 +19,7 @@
#if USE_MYSQL
# include <Databases/MySQL/DatabaseConnectionMySQL.h>
# include <Databases/MySQL/DatabaseMaterializeMySQL.h>
# include <Interpreters/evaluateConstantExpression.h>
# include <Common/parseAddress.h>
# include <mysqlxx/Pool.h>
@ -116,9 +117,9 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
const auto & [remote_host_name, remote_port] = parseAddress(host_name_and_port, 3306);
auto mysql_pool = mysqlxx::Pool(mysql_database_name, remote_host_name, mysql_user_name, mysql_user_password, remote_port);
/*if (materializeMySQLDatabase(define->settings))
if (materializeMySQLDatabase(engine_define->settings))
return std::make_shared<DatabaseMaterializeMySQL>(
context, database_name, metadata_path, define, mysql_database_name, std::move(mysql_pool));*/
context, database_name, metadata_path, engine_define, mysql_database_name, std::move(mysql_pool));
return std::make_shared<DatabaseConnectionMySQL>(context, database_name, metadata_path, engine_define, mysql_database_name, std::move(mysql_pool));
}

View File

@ -6,17 +6,29 @@
#include <Databases/MySQL/DatabaseMaterializeMySQL.h>
#include <Common/quoteString.h>
#include <Interpreters/Context.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Formats/MySQLBlockInputStream.h>
#include <Databases/MySQL/MasterStatusInfo.h>
#include <IO/Operators.h>
# include <DataStreams/copyData.h>
# include <DataTypes/DataTypeString.h>
# include <DataTypes/DataTypesNumber.h>
# include <Databases/MySQL/MasterStatusInfo.h>
# include <Formats/MySQLBlockInputStream.h>
# include <IO/Operators.h>
# include <IO/ReadBufferFromString.h>
# include <Interpreters/Context.h>
# include <Interpreters/MySQL/CreateQueryConvertVisitor.h>
# include <Interpreters/executeQuery.h>
# include <Parsers/MySQL/ASTCreateQuery.h>
# include <Parsers/parseQuery.h>
# include <Common/quoteString.h>
# include <Common/setThreadName.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_QUERY;
}
static MasterStatusInfo fetchMasterStatus(const mysqlxx::PoolWithFailover::Entry & connection)
{
Block header
@ -69,38 +81,115 @@ DatabaseMaterializeMySQL::DatabaseMaterializeMySQL(
/*, global_context(context.getGlobalContext()), metadata_path(metadata_path_)*/
, database_engine_define(database_engine_define_->clone()), mysql_database_name(mysql_database_name_), pool(std::move(pool_))
{
/// TODO: 做简单的check, 失败即报错
}
String DatabaseMaterializeMySQL::getCreateQuery(const mysqlxx::Pool::Entry & connection, const String & database, const String & table_name)
{
Block show_create_table_header{
{std::make_shared<DataTypeString>(), "Table"},
{std::make_shared<DataTypeUInt64>(), "Create Table"},
};
MySQLBlockInputStream show_create_table(
connection, "SHOW CREATE TABLE " + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(table_name),
show_create_table_header, DEFAULT_BLOCK_SIZE);
Block create_query_block = show_create_table.read();
if (!create_query_block || create_query_block.rows() != 1)
throw Exception("LOGICAL ERROR mysql show create return more rows.", ErrorCodes::LOGICAL_ERROR);
const auto & create_query = create_query_block.getByName("Create Table").column->getDataAt(0);
MySQLParser::ParserCreateQuery p_create_query;
ASTPtr ast = parseQuery(p_create_query, create_query.data, create_query.data + create_query.size, "", 0, 0);
WriteBufferFromOwnString out;
MySQLVisitor::CreateQueryConvertVisitor::Data data{.out = out};
MySQLVisitor::CreateQueryConvertVisitor visitor(data);
visitor.visit(ast);
return out.str();
}
void DatabaseMaterializeMySQL::tryToExecuteQuery(const String & query_to_execute)
{
ReadBufferFromString istr(query_to_execute);
String dummy_string;
WriteBufferFromString ostr(dummy_string);
try
{
mysqlxx::PoolWithFailover::Entry connection = pool.get();
connection->query("FLUSH TABLES;").execute();
connection->query("FLUSH TABLES WITH READ LOCK;").execute();
MasterStatusInfo master_status = fetchMasterStatus(connection);
connection->query("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ;").execute();
connection->query("START TRANSACTION /*!40100 WITH CONSISTENT SNAPSHOT */;").execute();
std::vector<String> tables_in_db = fetchTablesInDB(connection, mysql_database_name);
connection->query("UNLOCK TABLES;").execute();
for (const auto & dumping_table_name : tables_in_db)
{
/// TODO: 查询表结构, 根据不同的模式创建对应的表(暂时只支持多version即可)
connection->query("SHOW CREATE TABLE " + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(dumping_table_name))
.execute();
MySQLBlockInputStream input(
"SELECT * FROM " + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(dumping_table_name));
copyData(input, output);
/// TODO: 查询所有数据写入对应表中(全量dump)
/// TODO: 启动slave, 监听事件
}
Context context = global_context;
context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
context.setCurrentQueryId(""); // generate random query_id
executeQuery(istr, ostr, false, context, {});
}
catch (...)
{
tryLogCurrentException(log, "Query " + query_to_execute + " wasn't finished successfully");
throw;
}
}
LOG_DEBUG(log, "Executed query: " << query_to_execute);
}
void DatabaseMaterializeMySQL::dumpMySQLDatabase()
{
mysqlxx::PoolWithFailover::Entry connection = pool.get();
connection->query("FLUSH TABLES;").execute();
connection->query("FLUSH TABLES WITH READ LOCK;").execute();
MasterStatusInfo master_status = fetchMasterStatus(connection);
connection->query("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ;").execute();
connection->query("START TRANSACTION /*!40100 WITH CONSISTENT SNAPSHOT */;").execute();
std::vector<String> tables_in_db = fetchTablesInDB(connection, mysql_database_name);
connection->query("UNLOCK TABLES;").execute();
for (const auto & dumping_table_name : tables_in_db)
{
String query_prefix = "/* Dumping " + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(dumping_table_name) + " for "
+ backQuoteIfNeed(database_name) + " Database */ ";
tryToExecuteQuery(query_prefix + " DROP TABLE IF EXISTS " + backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(dumping_table_name));
tryToExecuteQuery(query_prefix + getCreateQuery(connection, mysql_database_name, dumping_table_name));
Context context = global_context;
context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
context.setCurrentQueryId(""); // generate random query_id
BlockIO streams = executeQuery(query_prefix + " INSERT INTO " + backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(dumping_table_name), context, true);
if (!streams.out)
throw Exception("LOGICAL ERROR out stream is undefined.", ErrorCodes::LOGICAL_ERROR);
MySQLBlockInputStream input(
connection, "SELECT * FROM " + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(dumping_table_name),
streams.out->getHeader(), DEFAULT_BLOCK_SIZE);
copyData(input, *streams.out /*, is_quit*/);
/// TODO: 启动slave, 监听事件
}
}
void DatabaseMaterializeMySQL::synchronization()
{
setThreadName("MySQLDBSync");
try
{
std::unique_lock<std::mutex> lock{sync_mutex};
/// Check database is exists in ClickHouse.
while (!sync_quit && !DatabaseCatalog::instance().isDatabaseExist(database_name))
sync_cond.wait_for(lock, std::chrono::seconds(1));
/// 查找一下位点文件, 如果不存在需要清理目前的数据库, 然后dump全量数据.
dumpMySQLDatabase();
}
catch(...)
{
tryLogCurrentException(log);
}
}
}

View File

@ -9,6 +9,7 @@
#include <Databases/DatabaseOrdinary.h>
#include <Parsers/ASTCreateQuery.h>
#include <mysqlxx/Pool.h>
#include <mutex>
namespace DB
{
@ -29,6 +30,19 @@ private:
String mysql_database_name;
mutable mysqlxx::Pool pool;
void synchronization();
void dumpMySQLDatabase();
void tryToExecuteQuery(const String & query_to_execute);
String getCreateQuery(const mysqlxx::Pool::Entry & connection, const String & database, const String & table_name);
mutable std::mutex sync_mutex;
std::atomic<bool> sync_quit{false};
std::condition_variable sync_cond;
ThreadFromGlobalPool thread{&DatabaseMaterializeMySQL::synchronization, this};
};
}