From bd18c1cbf89bd934d4659afc1e77afc6a15258b1 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Fri, 8 May 2020 15:36:48 +0800 Subject: [PATCH] ISSUES-4006 support dump all mysql data when create database --- src/Databases/DatabaseFactory.cpp | 5 +- .../MySQL/DatabaseMaterializeMySQL.cpp | 151 ++++++++++++++---- .../MySQL/DatabaseMaterializeMySQL.h | 14 ++ 3 files changed, 137 insertions(+), 33 deletions(-) diff --git a/src/Databases/DatabaseFactory.cpp b/src/Databases/DatabaseFactory.cpp index c6b79a24ff1..cf7e93a0e1f 100644 --- a/src/Databases/DatabaseFactory.cpp +++ b/src/Databases/DatabaseFactory.cpp @@ -19,6 +19,7 @@ #if USE_MYSQL # include +# include # include # include # include @@ -116,9 +117,9 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String const auto & [remote_host_name, remote_port] = parseAddress(host_name_and_port, 3306); auto mysql_pool = mysqlxx::Pool(mysql_database_name, remote_host_name, mysql_user_name, mysql_user_password, remote_port); - /*if (materializeMySQLDatabase(define->settings)) + if (materializeMySQLDatabase(engine_define->settings)) return std::make_shared( - context, database_name, metadata_path, define, mysql_database_name, std::move(mysql_pool));*/ + context, database_name, metadata_path, engine_define, mysql_database_name, std::move(mysql_pool)); return std::make_shared(context, database_name, metadata_path, engine_define, mysql_database_name, std::move(mysql_pool)); } diff --git a/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp b/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp index 9fdf2c037ea..288e99cd906 100644 --- a/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp +++ b/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp @@ -6,17 +6,29 @@ #include -#include -#include -#include -#include -#include -#include -#include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include namespace DB { +namespace ErrorCodes +{ + extern const int INCORRECT_QUERY; +} + static MasterStatusInfo fetchMasterStatus(const mysqlxx::PoolWithFailover::Entry & connection) { Block header @@ -69,38 +81,115 @@ DatabaseMaterializeMySQL::DatabaseMaterializeMySQL( /*, global_context(context.getGlobalContext()), metadata_path(metadata_path_)*/ , database_engine_define(database_engine_define_->clone()), mysql_database_name(mysql_database_name_), pool(std::move(pool_)) { + /// TODO: 做简单的check, 失败即报错 +} + +String DatabaseMaterializeMySQL::getCreateQuery(const mysqlxx::Pool::Entry & connection, const String & database, const String & table_name) +{ + Block show_create_table_header{ + {std::make_shared(), "Table"}, + {std::make_shared(), "Create Table"}, + }; + + MySQLBlockInputStream show_create_table( + connection, "SHOW CREATE TABLE " + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(table_name), + show_create_table_header, DEFAULT_BLOCK_SIZE); + + Block create_query_block = show_create_table.read(); + if (!create_query_block || create_query_block.rows() != 1) + throw Exception("LOGICAL ERROR mysql show create return more rows.", ErrorCodes::LOGICAL_ERROR); + + const auto & create_query = create_query_block.getByName("Create Table").column->getDataAt(0); + + MySQLParser::ParserCreateQuery p_create_query; + ASTPtr ast = parseQuery(p_create_query, create_query.data, create_query.data + create_query.size, "", 0, 0); + + WriteBufferFromOwnString out; + MySQLVisitor::CreateQueryConvertVisitor::Data data{.out = out}; + MySQLVisitor::CreateQueryConvertVisitor visitor(data); + visitor.visit(ast); + return out.str(); +} + +void DatabaseMaterializeMySQL::tryToExecuteQuery(const String & query_to_execute) +{ + ReadBufferFromString istr(query_to_execute); + String dummy_string; + WriteBufferFromString ostr(dummy_string); + try { - mysqlxx::PoolWithFailover::Entry connection = pool.get(); - - connection->query("FLUSH TABLES;").execute(); - connection->query("FLUSH TABLES WITH READ LOCK;").execute(); - - MasterStatusInfo master_status = fetchMasterStatus(connection); - connection->query("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ;").execute(); - connection->query("START TRANSACTION /*!40100 WITH CONSISTENT SNAPSHOT */;").execute(); - - std::vector tables_in_db = fetchTablesInDB(connection, mysql_database_name); - connection->query("UNLOCK TABLES;").execute(); - - for (const auto & dumping_table_name : tables_in_db) - { - /// TODO: 查询表结构, 根据不同的模式创建对应的表(暂时只支持多version即可) - connection->query("SHOW CREATE TABLE " + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(dumping_table_name)) - .execute(); - MySQLBlockInputStream input( - "SELECT * FROM " + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(dumping_table_name)); - copyData(input, output); - /// TODO: 查询所有数据写入对应表中(全量dump) - /// TODO: 启动slave, 监听事件 - } + Context context = global_context; + context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; + context.setCurrentQueryId(""); // generate random query_id + executeQuery(istr, ostr, false, context, {}); } catch (...) { + tryLogCurrentException(log, "Query " + query_to_execute + " wasn't finished successfully"); throw; } -} + LOG_DEBUG(log, "Executed query: " << query_to_execute); +} +void DatabaseMaterializeMySQL::dumpMySQLDatabase() +{ + mysqlxx::PoolWithFailover::Entry connection = pool.get(); + + connection->query("FLUSH TABLES;").execute(); + connection->query("FLUSH TABLES WITH READ LOCK;").execute(); + + MasterStatusInfo master_status = fetchMasterStatus(connection); + connection->query("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ;").execute(); + connection->query("START TRANSACTION /*!40100 WITH CONSISTENT SNAPSHOT */;").execute(); + + std::vector tables_in_db = fetchTablesInDB(connection, mysql_database_name); + connection->query("UNLOCK TABLES;").execute(); + + for (const auto & dumping_table_name : tables_in_db) + { + String query_prefix = "/* Dumping " + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(dumping_table_name) + " for " + + backQuoteIfNeed(database_name) + " Database */ "; + + tryToExecuteQuery(query_prefix + " DROP TABLE IF EXISTS " + backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(dumping_table_name)); + tryToExecuteQuery(query_prefix + getCreateQuery(connection, mysql_database_name, dumping_table_name)); + + Context context = global_context; + context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; + context.setCurrentQueryId(""); // generate random query_id + BlockIO streams = executeQuery(query_prefix + " INSERT INTO " + backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(dumping_table_name), context, true); + + if (!streams.out) + throw Exception("LOGICAL ERROR out stream is undefined.", ErrorCodes::LOGICAL_ERROR); + + MySQLBlockInputStream input( + connection, "SELECT * FROM " + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(dumping_table_name), + streams.out->getHeader(), DEFAULT_BLOCK_SIZE); + + copyData(input, *streams.out /*, is_quit*/); + /// TODO: 启动slave, 监听事件 + } +} +void DatabaseMaterializeMySQL::synchronization() +{ + setThreadName("MySQLDBSync"); + + try + { + std::unique_lock lock{sync_mutex}; + + /// Check database is exists in ClickHouse. + while (!sync_quit && !DatabaseCatalog::instance().isDatabaseExist(database_name)) + sync_cond.wait_for(lock, std::chrono::seconds(1)); + + /// 查找一下位点文件, 如果不存在需要清理目前的数据库, 然后dump全量数据. + dumpMySQLDatabase(); + } + catch(...) + { + tryLogCurrentException(log); + } +} } diff --git a/src/Databases/MySQL/DatabaseMaterializeMySQL.h b/src/Databases/MySQL/DatabaseMaterializeMySQL.h index 136df735c79..2d00e4888fb 100644 --- a/src/Databases/MySQL/DatabaseMaterializeMySQL.h +++ b/src/Databases/MySQL/DatabaseMaterializeMySQL.h @@ -9,6 +9,7 @@ #include #include #include +#include namespace DB { @@ -29,6 +30,19 @@ private: String mysql_database_name; mutable mysqlxx::Pool pool; + + void synchronization(); + + void dumpMySQLDatabase(); + + void tryToExecuteQuery(const String & query_to_execute); + + String getCreateQuery(const mysqlxx::Pool::Entry & connection, const String & database, const String & table_name); + + mutable std::mutex sync_mutex; + std::atomic sync_quit{false}; + std::condition_variable sync_cond; + ThreadFromGlobalPool thread{&DatabaseMaterializeMySQL::synchronization, this}; }; }