ISSUES-4006 unlock table on exception

This commit is contained in:
zhang2014 2020-05-12 20:28:46 +08:00
parent 86c6be4cd9
commit 7203c6e186
6 changed files with 140 additions and 113 deletions

View File

@ -18,6 +18,7 @@
# include <Interpreters/executeQuery.h>
# include <Parsers/MySQL/ASTCreateQuery.h>
# include <Parsers/parseQuery.h>
# include <Poco/File.h>
# include <Common/quoteString.h>
# include <Common/setThreadName.h>
@ -29,51 +30,6 @@ namespace ErrorCodes
extern const int INCORRECT_QUERY;
}
static MasterStatusInfo fetchMasterStatus(const mysqlxx::PoolWithFailover::Entry & connection)
{
Block header
{
{std::make_shared<DataTypeString>(), "File"},
{std::make_shared<DataTypeUInt64>(), "Position"},
{std::make_shared<DataTypeString>(), "Binlog_Do_DB"},
{std::make_shared<DataTypeString>(), "Binlog_Ignore_DB"},
{std::make_shared<DataTypeString>(), "Executed_Gtid_Set"},
};
MySQLBlockInputStream input(connection, "SHOW MASTER STATUS;", header, DEFAULT_BLOCK_SIZE);
Block master_status = input.read();
if (!master_status || master_status.rows() != 1)
throw Exception("Unable to get master status from MySQL.", ErrorCodes::LOGICAL_ERROR);
return MasterStatusInfo
{
(*master_status.getByPosition(0).column)[0].safeGet<String>(),
(*master_status.getByPosition(1).column)[0].safeGet<UInt64>(),
(*master_status.getByPosition(2).column)[0].safeGet<String>(),
(*master_status.getByPosition(3).column)[0].safeGet<String>(),
(*master_status.getByPosition(4).column)[0].safeGet<String>()
};
}
static std::vector<String> fetchTablesInDB(const mysqlxx::PoolWithFailover::Entry & connection, const std::string & database)
{
Block header{{std::make_shared<DataTypeString>(), "table_name"}};
String query = "SELECT TABLE_NAME AS table_name FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = " + quoteString(database);
std::vector<String> tables_in_db;
MySQLBlockInputStream input(connection, query, header, DEFAULT_BLOCK_SIZE);
while (Block block = input.read())
{
tables_in_db.reserve(tables_in_db.size() + block.rows());
for (size_t index = 0; index < block.rows(); ++index)
tables_in_db.emplace_back((*block.getByPosition(0).column)[index].safeGet<String>());
}
return tables_in_db;
}
DatabaseMaterializeMySQL::DatabaseMaterializeMySQL(
const Context & context, const String & database_name_, const String & metadata_path_,
const ASTStorage * database_engine_define_, const String & mysql_database_name_, mysqlxx::Pool && pool_)
@ -84,6 +40,28 @@ DatabaseMaterializeMySQL::DatabaseMaterializeMySQL(
/// TODO: 做简单的check, 失败即报错
}
void DatabaseMaterializeMySQL::tryToExecuteQuery(const String & query_to_execute)
{
ReadBufferFromString istr(query_to_execute);
String dummy_string;
WriteBufferFromString ostr(dummy_string);
try
{
Context context = global_context;
context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
context.setCurrentQueryId(""); // generate random query_id
executeQuery(istr, ostr, false, context, {});
}
catch (...)
{
tryLogCurrentException(log, "Query " + query_to_execute + " wasn't finished successfully");
throw;
}
LOG_DEBUG(log, "Executed query: " << query_to_execute);
}
String DatabaseMaterializeMySQL::getCreateQuery(const mysqlxx::Pool::Entry & connection, const String & database, const String & table_name)
{
Block show_create_table_header{
@ -115,53 +93,28 @@ String DatabaseMaterializeMySQL::getCreateQuery(const mysqlxx::Pool::Entry & con
return out.str();
}
void DatabaseMaterializeMySQL::tryToExecuteQuery(const String & query_to_execute)
{
ReadBufferFromString istr(query_to_execute);
String dummy_string;
WriteBufferFromString ostr(dummy_string);
try
{
Context context = global_context;
context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
context.setCurrentQueryId(""); // generate random query_id
executeQuery(istr, ostr, false, context, {});
}
catch (...)
{
tryLogCurrentException(log, "Query " + query_to_execute + " wasn't finished successfully");
throw;
}
LOG_DEBUG(log, "Executed query: " << query_to_execute);
}
void DatabaseMaterializeMySQL::dumpMySQLDatabase()
void DatabaseMaterializeMySQL::dumpMySQLDatabase(const std::function<bool()> & is_cancelled)
{
mysqlxx::PoolWithFailover::Entry connection = pool.get();
connection->query("FLUSH TABLES;").execute();
connection->query("FLUSH TABLES WITH READ LOCK;").execute();
MasterStatusInfo info(connection, mysql_database_name);
MasterStatusInfo master_status = fetchMasterStatus(connection);
connection->query("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ;").execute();
connection->query("START TRANSACTION /*!40100 WITH CONSISTENT SNAPSHOT */;").execute();
std::vector<String> tables_in_db = fetchTablesInDB(connection, mysql_database_name);
connection->query("UNLOCK TABLES;").execute();
for (const auto & dumping_table_name : tables_in_db)
for (const auto & dumping_table_name : info.need_dumping_tables)
{
if (is_cancelled())
return;
const auto & table_name = backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(dumping_table_name);
String query_prefix = "/* Dumping " + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(dumping_table_name) + " for "
+ backQuoteIfNeed(database_name) + " Database */ ";
tryToExecuteQuery(query_prefix + " DROP TABLE IF EXISTS " + backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(dumping_table_name));
tryToExecuteQuery(query_prefix + " DROP TABLE IF EXISTS " + table_name);
tryToExecuteQuery(query_prefix + getCreateQuery(connection, database_name, dumping_table_name));
Context context = global_context;
context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
context.setCurrentQueryId(""); // generate random query_id
BlockIO streams = executeQuery( query_prefix + " INSERT INTO " + backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(dumping_table_name) + " VALUES", context, true);
context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
BlockIO streams = executeQuery(query_prefix + " INSERT INTO " + table_name + " VALUES", context, true);
if (!streams.out)
throw Exception("LOGICAL ERROR out stream is undefined.", ErrorCodes::LOGICAL_ERROR);
@ -170,7 +123,7 @@ void DatabaseMaterializeMySQL::dumpMySQLDatabase()
connection, "SELECT * FROM " + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(dumping_table_name),
streams.out->getHeader(), DEFAULT_BLOCK_SIZE);
copyData(input, *streams.out /*, is_quit*/);
copyData(input, *streams.out, is_cancelled);
/// TODO: 启动slave, 监听事件
}
}
@ -187,8 +140,14 @@ void DatabaseMaterializeMySQL::synchronization()
sync_cond.wait_for(lock, std::chrono::seconds(1));
LOG_DEBUG(log, database_name + " database status is OK.");
/// 查找一下位点文件, 如果不存在需要清理目前的数据库, 然后dump全量数据.
dumpMySQLDatabase();
Poco::File dumped_flag(getMetadataPath() + "/dumped.flag");
if (!dumped_flag.exists())
{
dumpMySQLDatabase([&]() { return sync_quit.load(std::memory_order_seq_cst); });
dumped_flag.createFile();
}
}
catch(...)
{

View File

@ -33,10 +33,10 @@ private:
void synchronization();
void dumpMySQLDatabase();
void tryToExecuteQuery(const String & query_to_execute);
void dumpMySQLDatabase(const std::function<bool()> & is_cancelled);
String getCreateQuery(const mysqlxx::Pool::Entry & connection, const String & database, const String & table_name);
mutable std::mutex sync_mutex;

View File

@ -1,12 +1,84 @@
#include <Core/Block.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Databases/MySQL/MasterStatusInfo.h>
#include <Formats/MySQLBlockInputStream.h>
#include <Common/quoteString.h>
namespace DB
{
MasterStatusInfo::MasterStatusInfo(
/*MasterStatusInfo::MasterStatusInfo(
String binlog_file_, UInt64 binlog_position_, String binlog_do_db_, String binlog_ignore_db_, String executed_gtid_set_)
: binlog_file(binlog_file_), binlog_position(binlog_position_), binlog_do_db(binlog_do_db_), binlog_ignore_db(binlog_ignore_db_),
executed_gtid_set(executed_gtid_set_)
{
}*/
static std::vector<String> fetchTablesInDB(const mysqlxx::PoolWithFailover::Entry & connection, const std::string & database)
{
Block header{{std::make_shared<DataTypeString>(), "table_name"}};
String query = "SELECT TABLE_NAME AS table_name FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = " + quoteString(database);
std::vector<String> tables_in_db;
MySQLBlockInputStream input(connection, query, header, DEFAULT_BLOCK_SIZE);
while (Block block = input.read())
{
tables_in_db.reserve(tables_in_db.size() + block.rows());
for (size_t index = 0; index < block.rows(); ++index)
tables_in_db.emplace_back((*block.getByPosition(0).column)[index].safeGet<String>());
}
return tables_in_db;
}
MasterStatusInfo::MasterStatusInfo(mysqlxx::PoolWithFailover::Entry & connection, const String & database)
{
bool locked_tables = false;
try
{
connection->query("FLUSH TABLES;").execute();
connection->query("FLUSH TABLES WITH READ LOCK;").execute();
locked_tables = true;
fetchMasterStatus(connection);
connection->query("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ;").execute();
connection->query("START TRANSACTION /*!40100 WITH CONSISTENT SNAPSHOT */;").execute();
need_dumping_tables = fetchTablesInDB(connection, database);
connection->query("UNLOCK TABLES;").execute();
}
catch (...)
{
if (locked_tables)
connection->query("UNLOCK TABLES;").execute();
throw;
}
}
void MasterStatusInfo::fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & connection)
{
Block header
{
{std::make_shared<DataTypeString>(), "File"},
{std::make_shared<DataTypeUInt64>(), "Position"},
{std::make_shared<DataTypeString>(), "Binlog_Do_DB"},
{std::make_shared<DataTypeString>(), "Binlog_Ignore_DB"},
{std::make_shared<DataTypeString>(), "Executed_Gtid_Set"},
};
MySQLBlockInputStream input(connection, "SHOW MASTER STATUS;", header, DEFAULT_BLOCK_SIZE);
Block master_status = input.read();
if (!master_status || master_status.rows() != 1)
throw Exception("Unable to get master status from MySQL.", ErrorCodes::LOGICAL_ERROR);
binlog_file = (*master_status.getByPosition(0).column)[0].safeGet<String>();
binlog_position = (*master_status.getByPosition(1).column)[0].safeGet<UInt64>();
binlog_do_db = (*master_status.getByPosition(2).column)[0].safeGet<String>();
binlog_ignore_db = (*master_status.getByPosition(3).column)[0].safeGet<String>();
executed_gtid_set = (*master_status.getByPosition(4).column)[0].safeGet<String>();
}
}

View File

@ -2,6 +2,7 @@
#include <Core/Types.h>
#include <mysqlxx/Connection.h>
#include <mysqlxx/PoolWithFailover.h>
namespace DB
{
@ -14,8 +15,11 @@ struct MasterStatusInfo
String binlog_ignore_db;
String executed_gtid_set;
MasterStatusInfo(
String binlog_file_, UInt64 binlog_position_, String binlog_do_db_, String binlog_ignore_db_, String executed_gtid_set_);
std::vector<String> need_dumping_tables;
MasterStatusInfo(mysqlxx::PoolWithFailover::Entry & connection, const String & database);
void fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & connection);
};

View File

@ -83,7 +83,7 @@ void CreateQueryMatcher::visit(ASTPtr & ast, Data & data)
visit(*t, ast, data);
}
void CreateQueryMatcher::visit(MySQLParser::ASTCreateQuery & create, ASTPtr &, Data & data)
void CreateQueryMatcher::visit(const MySQLParser::ASTCreateQuery & create, const ASTPtr &, Data & data)
{
if (create.like_table)
throw Exception("Cannot convert create like statement to ClickHouse SQL", ErrorCodes::NOT_IMPLEMENTED);
@ -104,7 +104,13 @@ void CreateQueryMatcher::visit(MySQLParser::ASTCreateQuery & create, ASTPtr &, D
<< " ORDER BY " << queryToString(data.getFormattedOrderByExpression());
}
void CreateQueryMatcher::visit(MySQLParser::ASTCreateDefines & create_defines, ASTPtr &, Data & data)
void CreateQueryMatcher::visit(const MySQLParser::ASTDeclareIndex & declare_index, const ASTPtr &, Data & data)
{
if (startsWith(declare_index.index_type, "PRIMARY_KEY_"))
data.addPrimaryKey(declare_index.index_columns);
}
void CreateQueryMatcher::visit(const MySQLParser::ASTCreateDefines & create_defines, const ASTPtr &, Data & data)
{
if (!create_defines.columns || create_defines.columns->children.empty())
throw Exception("Missing definition of columns.", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
@ -119,13 +125,7 @@ void CreateQueryMatcher::visit(MySQLParser::ASTCreateDefines & create_defines, A
visit(*column->as<MySQLParser::ASTDeclareColumn>(), column, data);
}
void CreateQueryMatcher::visit(const MySQLParser::ASTDeclareIndex & declare_index, ASTPtr &, Data & data)
{
if (startsWith(declare_index.index_type, "PRIMARY_KEY_"))
data.addPrimaryKey(declare_index.index_columns);
}
void CreateQueryMatcher::visit(const MySQLParser::ASTDeclareColumn & declare_column, ASTPtr &, Data & data)
void CreateQueryMatcher::visit(const MySQLParser::ASTDeclareColumn & declare_column, const ASTPtr &, Data & data)
{
if (!declare_column.data_type)
throw Exception("Missing type in definition of column.", ErrorCodes::UNKNOWN_TYPE);
@ -158,7 +158,7 @@ void CreateQueryMatcher::visit(const MySQLParser::ASTDeclareColumn & declare_col
throw Exception("Unsupported MySQL data type " + queryToString(declare_column.data_type) + ".", ErrorCodes::NOT_IMPLEMENTED);
}
void CreateQueryMatcher::visit(const MySQLParser::ASTDeclarePartitionOptions & declare_partition_options, ASTPtr &, Data & data)
void CreateQueryMatcher::visit(const MySQLParser::ASTDeclarePartitionOptions & declare_partition_options, const ASTPtr &, Data & data)
{
data.addPartitionKey(declare_partition_options.partition_expression);
}

View File

@ -45,29 +45,21 @@ public:
static bool needChildVisit(ASTPtr &, const ASTPtr &) { return false; }
private:
static void visit(MySQLParser::ASTCreateQuery & create, ASTPtr & ast, Data &);
static void visit(const MySQLParser::ASTCreateQuery & create, const ASTPtr &, Data & data);
static void visit(MySQLParser::ASTCreateDefines & create_defines, ASTPtr & ast, Data & data);
static void visit(const MySQLParser::ASTDeclareIndex & declare_index, const ASTPtr &, Data & data);
static void visit(const MySQLParser::ASTDeclareIndex & declare_index, ASTPtr & ast, Data & data);
static void visit(const MySQLParser::ASTCreateDefines & create_defines, const ASTPtr &, Data & data);
static void visit(const MySQLParser::ASTDeclareColumn & declare_column, ASTPtr & ast, Data & data);
static void visit(const MySQLParser::ASTDeclarePartitionOptions & declare_partition_options, ASTPtr & ast, Data & data);
// static void visitPartitionBy(MySQLParser::ASTCreateQuery & create, ASTPtr & ast, Data & data);
// static void visitPartitionBy(MySQLParser::ASTCreateDefines & create_defines, ASTPtr & ast, Data & data);
// static void visitPartitionBy(const MySQLParser::ASTDeclarePartitionOptions & partition_options, ASTPtr & ast, Data & data);
// static void visitColumns(const ASTFunction & declare_column, ASTPtr & ast, Data & data);
// static void visit(ASTTableJoin & join, const ASTPtr & ast, Data &);
static void visit(const MySQLParser::ASTDeclareColumn & declare_column, const ASTPtr &, Data & data);
static void visit(const MySQLParser::ASTDeclarePartitionOptions & declare_partition_options, const ASTPtr &, Data & data);
};
using CreateQueryConvertVisitor = CreateQueryMatcher::Visitor;
}
}