diff --git a/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp b/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp index 178fca8bbe4..508cdd5a861 100644 --- a/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp +++ b/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp @@ -18,6 +18,7 @@ # include # include # include +# include # include # include @@ -29,51 +30,6 @@ namespace ErrorCodes extern const int INCORRECT_QUERY; } -static MasterStatusInfo fetchMasterStatus(const mysqlxx::PoolWithFailover::Entry & connection) -{ - Block header - { - {std::make_shared(), "File"}, - {std::make_shared(), "Position"}, - {std::make_shared(), "Binlog_Do_DB"}, - {std::make_shared(), "Binlog_Ignore_DB"}, - {std::make_shared(), "Executed_Gtid_Set"}, - }; - - MySQLBlockInputStream input(connection, "SHOW MASTER STATUS;", header, DEFAULT_BLOCK_SIZE); - Block master_status = input.read(); - - if (!master_status || master_status.rows() != 1) - throw Exception("Unable to get master status from MySQL.", ErrorCodes::LOGICAL_ERROR); - - return MasterStatusInfo - { - (*master_status.getByPosition(0).column)[0].safeGet(), - (*master_status.getByPosition(1).column)[0].safeGet(), - (*master_status.getByPosition(2).column)[0].safeGet(), - (*master_status.getByPosition(3).column)[0].safeGet(), - (*master_status.getByPosition(4).column)[0].safeGet() - }; -} - -static std::vector fetchTablesInDB(const mysqlxx::PoolWithFailover::Entry & connection, const std::string & database) -{ - Block header{{std::make_shared(), "table_name"}}; - String query = "SELECT TABLE_NAME AS table_name FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = " + quoteString(database); - - std::vector tables_in_db; - MySQLBlockInputStream input(connection, query, header, DEFAULT_BLOCK_SIZE); - - while (Block block = input.read()) - { - tables_in_db.reserve(tables_in_db.size() + block.rows()); - for (size_t index = 0; index < block.rows(); ++index) - tables_in_db.emplace_back((*block.getByPosition(0).column)[index].safeGet()); - } - - return tables_in_db; -} - DatabaseMaterializeMySQL::DatabaseMaterializeMySQL( const Context & context, const String & database_name_, const String & metadata_path_, const ASTStorage * database_engine_define_, const String & mysql_database_name_, mysqlxx::Pool && pool_) @@ -84,6 +40,28 @@ DatabaseMaterializeMySQL::DatabaseMaterializeMySQL( /// TODO: 做简单的check, 失败即报错 } +void DatabaseMaterializeMySQL::tryToExecuteQuery(const String & query_to_execute) +{ + ReadBufferFromString istr(query_to_execute); + String dummy_string; + WriteBufferFromString ostr(dummy_string); + + try + { + Context context = global_context; + context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; + context.setCurrentQueryId(""); // generate random query_id + executeQuery(istr, ostr, false, context, {}); + } + catch (...) + { + tryLogCurrentException(log, "Query " + query_to_execute + " wasn't finished successfully"); + throw; + } + + LOG_DEBUG(log, "Executed query: " << query_to_execute); +} + String DatabaseMaterializeMySQL::getCreateQuery(const mysqlxx::Pool::Entry & connection, const String & database, const String & table_name) { Block show_create_table_header{ @@ -115,53 +93,28 @@ String DatabaseMaterializeMySQL::getCreateQuery(const mysqlxx::Pool::Entry & con return out.str(); } -void DatabaseMaterializeMySQL::tryToExecuteQuery(const String & query_to_execute) -{ - ReadBufferFromString istr(query_to_execute); - String dummy_string; - WriteBufferFromString ostr(dummy_string); - - try - { - Context context = global_context; - context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; - context.setCurrentQueryId(""); // generate random query_id - executeQuery(istr, ostr, false, context, {}); - } - catch (...) - { - tryLogCurrentException(log, "Query " + query_to_execute + " wasn't finished successfully"); - throw; - } - - LOG_DEBUG(log, "Executed query: " << query_to_execute); -} -void DatabaseMaterializeMySQL::dumpMySQLDatabase() +void DatabaseMaterializeMySQL::dumpMySQLDatabase(const std::function & is_cancelled) { mysqlxx::PoolWithFailover::Entry connection = pool.get(); - connection->query("FLUSH TABLES;").execute(); - connection->query("FLUSH TABLES WITH READ LOCK;").execute(); + MasterStatusInfo info(connection, mysql_database_name); - MasterStatusInfo master_status = fetchMasterStatus(connection); - connection->query("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ;").execute(); - connection->query("START TRANSACTION /*!40100 WITH CONSISTENT SNAPSHOT */;").execute(); - - std::vector tables_in_db = fetchTablesInDB(connection, mysql_database_name); - connection->query("UNLOCK TABLES;").execute(); - - for (const auto & dumping_table_name : tables_in_db) + for (const auto & dumping_table_name : info.need_dumping_tables) { + if (is_cancelled()) + return; + + const auto & table_name = backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(dumping_table_name); String query_prefix = "/* Dumping " + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(dumping_table_name) + " for " + backQuoteIfNeed(database_name) + " Database */ "; - tryToExecuteQuery(query_prefix + " DROP TABLE IF EXISTS " + backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(dumping_table_name)); + tryToExecuteQuery(query_prefix + " DROP TABLE IF EXISTS " + table_name); tryToExecuteQuery(query_prefix + getCreateQuery(connection, database_name, dumping_table_name)); Context context = global_context; - context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; context.setCurrentQueryId(""); // generate random query_id - BlockIO streams = executeQuery( query_prefix + " INSERT INTO " + backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(dumping_table_name) + " VALUES", context, true); + context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; + BlockIO streams = executeQuery(query_prefix + " INSERT INTO " + table_name + " VALUES", context, true); if (!streams.out) throw Exception("LOGICAL ERROR out stream is undefined.", ErrorCodes::LOGICAL_ERROR); @@ -170,7 +123,7 @@ void DatabaseMaterializeMySQL::dumpMySQLDatabase() connection, "SELECT * FROM " + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(dumping_table_name), streams.out->getHeader(), DEFAULT_BLOCK_SIZE); - copyData(input, *streams.out /*, is_quit*/); + copyData(input, *streams.out, is_cancelled); /// TODO: 启动slave, 监听事件 } } @@ -187,8 +140,14 @@ void DatabaseMaterializeMySQL::synchronization() sync_cond.wait_for(lock, std::chrono::seconds(1)); LOG_DEBUG(log, database_name + " database status is OK."); - /// 查找一下位点文件, 如果不存在需要清理目前的数据库, 然后dump全量数据. - dumpMySQLDatabase(); + + Poco::File dumped_flag(getMetadataPath() + "/dumped.flag"); + + if (!dumped_flag.exists()) + { + dumpMySQLDatabase([&]() { return sync_quit.load(std::memory_order_seq_cst); }); + dumped_flag.createFile(); + } } catch(...) { diff --git a/src/Databases/MySQL/DatabaseMaterializeMySQL.h b/src/Databases/MySQL/DatabaseMaterializeMySQL.h index 2d00e4888fb..b4fabdb20eb 100644 --- a/src/Databases/MySQL/DatabaseMaterializeMySQL.h +++ b/src/Databases/MySQL/DatabaseMaterializeMySQL.h @@ -33,10 +33,10 @@ private: void synchronization(); - void dumpMySQLDatabase(); - void tryToExecuteQuery(const String & query_to_execute); + void dumpMySQLDatabase(const std::function & is_cancelled); + String getCreateQuery(const mysqlxx::Pool::Entry & connection, const String & database, const String & table_name); mutable std::mutex sync_mutex; diff --git a/src/Databases/MySQL/MasterStatusInfo.cpp b/src/Databases/MySQL/MasterStatusInfo.cpp index 007f611520f..3a1a6c3d7dd 100644 --- a/src/Databases/MySQL/MasterStatusInfo.cpp +++ b/src/Databases/MySQL/MasterStatusInfo.cpp @@ -1,12 +1,84 @@ +#include +#include +#include #include +#include +#include namespace DB { -MasterStatusInfo::MasterStatusInfo( +/*MasterStatusInfo::MasterStatusInfo( String binlog_file_, UInt64 binlog_position_, String binlog_do_db_, String binlog_ignore_db_, String executed_gtid_set_) : binlog_file(binlog_file_), binlog_position(binlog_position_), binlog_do_db(binlog_do_db_), binlog_ignore_db(binlog_ignore_db_), executed_gtid_set(executed_gtid_set_) { +}*/ + +static std::vector fetchTablesInDB(const mysqlxx::PoolWithFailover::Entry & connection, const std::string & database) +{ + Block header{{std::make_shared(), "table_name"}}; + String query = "SELECT TABLE_NAME AS table_name FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = " + quoteString(database); + + std::vector tables_in_db; + MySQLBlockInputStream input(connection, query, header, DEFAULT_BLOCK_SIZE); + + while (Block block = input.read()) + { + tables_in_db.reserve(tables_in_db.size() + block.rows()); + for (size_t index = 0; index < block.rows(); ++index) + tables_in_db.emplace_back((*block.getByPosition(0).column)[index].safeGet()); + } + + return tables_in_db; +} + +MasterStatusInfo::MasterStatusInfo(mysqlxx::PoolWithFailover::Entry & connection, const String & database) +{ + bool locked_tables = false; + + try + { + connection->query("FLUSH TABLES;").execute(); + connection->query("FLUSH TABLES WITH READ LOCK;").execute(); + + locked_tables = true; + fetchMasterStatus(connection); + connection->query("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ;").execute(); + connection->query("START TRANSACTION /*!40100 WITH CONSISTENT SNAPSHOT */;").execute(); + + need_dumping_tables = fetchTablesInDB(connection, database); + connection->query("UNLOCK TABLES;").execute(); + } + catch (...) + { + if (locked_tables) + connection->query("UNLOCK TABLES;").execute(); + + throw; + } +} +void MasterStatusInfo::fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & connection) +{ + Block header + { + {std::make_shared(), "File"}, + {std::make_shared(), "Position"}, + {std::make_shared(), "Binlog_Do_DB"}, + {std::make_shared(), "Binlog_Ignore_DB"}, + {std::make_shared(), "Executed_Gtid_Set"}, + }; + + MySQLBlockInputStream input(connection, "SHOW MASTER STATUS;", header, DEFAULT_BLOCK_SIZE); + Block master_status = input.read(); + + if (!master_status || master_status.rows() != 1) + throw Exception("Unable to get master status from MySQL.", ErrorCodes::LOGICAL_ERROR); + + binlog_file = (*master_status.getByPosition(0).column)[0].safeGet(); + binlog_position = (*master_status.getByPosition(1).column)[0].safeGet(); + binlog_do_db = (*master_status.getByPosition(2).column)[0].safeGet(); + binlog_ignore_db = (*master_status.getByPosition(3).column)[0].safeGet(); + executed_gtid_set = (*master_status.getByPosition(4).column)[0].safeGet(); } } diff --git a/src/Databases/MySQL/MasterStatusInfo.h b/src/Databases/MySQL/MasterStatusInfo.h index ba953ceb1dc..ebc2a8c8d12 100644 --- a/src/Databases/MySQL/MasterStatusInfo.h +++ b/src/Databases/MySQL/MasterStatusInfo.h @@ -2,6 +2,7 @@ #include #include +#include namespace DB { @@ -14,8 +15,11 @@ struct MasterStatusInfo String binlog_ignore_db; String executed_gtid_set; - MasterStatusInfo( - String binlog_file_, UInt64 binlog_position_, String binlog_do_db_, String binlog_ignore_db_, String executed_gtid_set_); + std::vector need_dumping_tables; + + MasterStatusInfo(mysqlxx::PoolWithFailover::Entry & connection, const String & database); + + void fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & connection); }; diff --git a/src/Interpreters/MySQL/CreateQueryConvertVisitor.cpp b/src/Interpreters/MySQL/CreateQueryConvertVisitor.cpp index c52588cab64..c40cbaf846a 100644 --- a/src/Interpreters/MySQL/CreateQueryConvertVisitor.cpp +++ b/src/Interpreters/MySQL/CreateQueryConvertVisitor.cpp @@ -83,7 +83,7 @@ void CreateQueryMatcher::visit(ASTPtr & ast, Data & data) visit(*t, ast, data); } -void CreateQueryMatcher::visit(MySQLParser::ASTCreateQuery & create, ASTPtr &, Data & data) +void CreateQueryMatcher::visit(const MySQLParser::ASTCreateQuery & create, const ASTPtr &, Data & data) { if (create.like_table) throw Exception("Cannot convert create like statement to ClickHouse SQL", ErrorCodes::NOT_IMPLEMENTED); @@ -104,7 +104,13 @@ void CreateQueryMatcher::visit(MySQLParser::ASTCreateQuery & create, ASTPtr &, D << " ORDER BY " << queryToString(data.getFormattedOrderByExpression()); } -void CreateQueryMatcher::visit(MySQLParser::ASTCreateDefines & create_defines, ASTPtr &, Data & data) +void CreateQueryMatcher::visit(const MySQLParser::ASTDeclareIndex & declare_index, const ASTPtr &, Data & data) +{ + if (startsWith(declare_index.index_type, "PRIMARY_KEY_")) + data.addPrimaryKey(declare_index.index_columns); +} + +void CreateQueryMatcher::visit(const MySQLParser::ASTCreateDefines & create_defines, const ASTPtr &, Data & data) { if (!create_defines.columns || create_defines.columns->children.empty()) throw Exception("Missing definition of columns.", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED); @@ -119,13 +125,7 @@ void CreateQueryMatcher::visit(MySQLParser::ASTCreateDefines & create_defines, A visit(*column->as(), column, data); } -void CreateQueryMatcher::visit(const MySQLParser::ASTDeclareIndex & declare_index, ASTPtr &, Data & data) -{ - if (startsWith(declare_index.index_type, "PRIMARY_KEY_")) - data.addPrimaryKey(declare_index.index_columns); -} - -void CreateQueryMatcher::visit(const MySQLParser::ASTDeclareColumn & declare_column, ASTPtr &, Data & data) +void CreateQueryMatcher::visit(const MySQLParser::ASTDeclareColumn & declare_column, const ASTPtr &, Data & data) { if (!declare_column.data_type) throw Exception("Missing type in definition of column.", ErrorCodes::UNKNOWN_TYPE); @@ -158,7 +158,7 @@ void CreateQueryMatcher::visit(const MySQLParser::ASTDeclareColumn & declare_col throw Exception("Unsupported MySQL data type " + queryToString(declare_column.data_type) + ".", ErrorCodes::NOT_IMPLEMENTED); } -void CreateQueryMatcher::visit(const MySQLParser::ASTDeclarePartitionOptions & declare_partition_options, ASTPtr &, Data & data) +void CreateQueryMatcher::visit(const MySQLParser::ASTDeclarePartitionOptions & declare_partition_options, const ASTPtr &, Data & data) { data.addPartitionKey(declare_partition_options.partition_expression); } diff --git a/src/Interpreters/MySQL/CreateQueryConvertVisitor.h b/src/Interpreters/MySQL/CreateQueryConvertVisitor.h index c4c31f54b88..10fcb0e7b7e 100644 --- a/src/Interpreters/MySQL/CreateQueryConvertVisitor.h +++ b/src/Interpreters/MySQL/CreateQueryConvertVisitor.h @@ -45,29 +45,21 @@ public: static bool needChildVisit(ASTPtr &, const ASTPtr &) { return false; } private: - static void visit(MySQLParser::ASTCreateQuery & create, ASTPtr & ast, Data &); + static void visit(const MySQLParser::ASTCreateQuery & create, const ASTPtr &, Data & data); - static void visit(MySQLParser::ASTCreateDefines & create_defines, ASTPtr & ast, Data & data); + static void visit(const MySQLParser::ASTDeclareIndex & declare_index, const ASTPtr &, Data & data); - static void visit(const MySQLParser::ASTDeclareIndex & declare_index, ASTPtr & ast, Data & data); + static void visit(const MySQLParser::ASTCreateDefines & create_defines, const ASTPtr &, Data & data); - static void visit(const MySQLParser::ASTDeclareColumn & declare_column, ASTPtr & ast, Data & data); - - static void visit(const MySQLParser::ASTDeclarePartitionOptions & declare_partition_options, ASTPtr & ast, Data & data); - -// static void visitPartitionBy(MySQLParser::ASTCreateQuery & create, ASTPtr & ast, Data & data); - -// static void visitPartitionBy(MySQLParser::ASTCreateDefines & create_defines, ASTPtr & ast, Data & data); - -// static void visitPartitionBy(const MySQLParser::ASTDeclarePartitionOptions & partition_options, ASTPtr & ast, Data & data); - -// static void visitColumns(const ASTFunction & declare_column, ASTPtr & ast, Data & data); -// static void visit(ASTTableJoin & join, const ASTPtr & ast, Data &); + static void visit(const MySQLParser::ASTDeclareColumn & declare_column, const ASTPtr &, Data & data); + static void visit(const MySQLParser::ASTDeclarePartitionOptions & declare_partition_options, const ASTPtr &, Data & data); }; using CreateQueryConvertVisitor = CreateQueryMatcher::Visitor; } + + }