ISSUES-4006 add some check & add comment

This commit is contained in:
zhang2014 2020-07-21 17:56:55 +08:00
parent c173bd391f
commit 7ea2eee98a
10 changed files with 81 additions and 13 deletions

View File

@ -502,6 +502,8 @@ namespace ErrorCodes
extern const int UNKNOWN_RAID_TYPE = 535;
extern const int CANNOT_RESTORE_FROM_FIELD_DUMP = 536;
extern const int ILLEGAL_MYSQL_VARIABLE = 537;
extern const int ILLEGAL_MYSQL_VARIABLE = 538;
extern const int MYSQL_SYNTAX_ERROR = 539;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -69,9 +69,8 @@ void DatabaseMaterializeMySQL::loadStoredObjects(Context & context, bool has_for
{
try
{
LOG_DEBUG(log, "Loading MySQL nested database stored objects.");
getNestedDatabase()->loadStoredObjects(context, has_force_restore_data_flag);
LOG_DEBUG(log, "Loaded MySQL nested database stored objects.");
materialize_thread.startSynchronization();
}
catch (...)
{
@ -220,7 +219,14 @@ bool DatabaseMaterializeMySQL::isTableExist(const String & name, const Context &
StoragePtr DatabaseMaterializeMySQL::tryGetTable(const String & name, const Context & context) const
{
if (!MaterializeMySQLSyncThread::isMySQLSyncThread())
return std::make_shared<StorageMaterializeMySQL>(getNestedDatabase()->tryGetTable(name, context));
{
StoragePtr nested_storage = getNestedDatabase()->tryGetTable(name, context);
if (!nested_storage)
return {};
return std::make_shared<StorageMaterializeMySQL>(std::move(nested_storage));
}
return getNestedDatabase()->tryGetTable(name, context);
}

View File

@ -13,6 +13,10 @@
namespace DB
{
/** Real-time pull table structure and data from remote MySQL
*
* All table structure and data will be written to the local file system
*/
class DatabaseMaterializeMySQL : public IDatabase
{
public:

View File

@ -6,6 +6,12 @@
namespace DB
{
/** MaterializeMySQL database table iterator
*
* The iterator returns different storage engine types depending on the visitor.
* When MySQLSync thread accesses, it always returns MergeTree
* Other cases always convert MergeTree to StorageMaterializeMySQL
*/
class DatabaseMaterializeTablesIterator final : public IDatabaseTablesIterator
{
public:

View File

@ -8,6 +8,14 @@
namespace DB
{
/** Materialize database engine metadata
*
* Record data version and current snapshot of MySQL, including:
* binlog_file - currently executing binlog_file
* binlog_position - position of the currently executing binlog file
* executed_gtid_set - currently executing gtid
* need_dumping_tables - Table structure snapshot at the current moment(Only when database first created or executed binlog file is deleted)
*/
struct MaterializeMetadata
{
const String persistent_path;

View File

@ -8,7 +8,7 @@ namespace DB
class ASTStorage;
/** Settings for the MySQL Database engine(materialize mode).
/** Settings for the MaterializeMySQL database engine.
* Could be loaded from a CREATE DATABASE query (SETTINGS clause).
*/
struct MaterializeMySQLSettings : public SettingsCollection<MaterializeMySQLSettings>

View File

@ -144,9 +144,7 @@ MaterializeMySQLSyncThread::MaterializeMySQLSyncThread(
: log(&Poco::Logger::get("MaterializeMySQLSyncThread")), global_context(context.getGlobalContext()), database_name(database_name_)
, mysql_database_name(mysql_database_name_), pool(std::move(pool_)), client(std::move(client_)), settings(settings_)
{
const auto & mysql_server_version = checkVariableAndGetVersion(pool.get());
query_prefix = "EXTERNAL DDL FROM MySQL(" + backQuoteIfNeed(database_name) + ", " + backQuoteIfNeed(mysql_database_name) + ") ";
startSynchronization(mysql_server_version);
}
void MaterializeMySQLSyncThread::synchronization(const String & mysql_version)
@ -210,10 +208,12 @@ void MaterializeMySQLSyncThread::stopSynchronization()
}
}
void MaterializeMySQLSyncThread::startSynchronization(const String & mysql_version)
void MaterializeMySQLSyncThread::startSynchronization()
{
/// TODO: reset exception.
background_thread_pool = std::make_unique<ThreadFromGlobalPool>([this, mysql_version = mysql_version]() { synchronization(mysql_version); });
const auto & mysql_server_version = checkVariableAndGetVersion(pool.get());
background_thread_pool = std::make_unique<ThreadFromGlobalPool>(
[this, mysql_server_version = mysql_server_version]() { synchronization(mysql_server_version); });
}
static inline void cleanOutdatedTables(const String & database_name, const Context & context)

View File

@ -21,6 +21,19 @@
namespace DB
{
/** MySQL table structure and data synchronization thread
*
* When catch exception, it always exits immediately.
* In this case, you need to execute DETACH DATABASE and ATTACH DATABASE after manual processing
*
* The whole work of the thread includes synchronous full data and real-time pull incremental data
*
* synchronous full data:
* We will synchronize the full data when the database is first create or not found binlog file in MySQL after restart.
*
* real-time pull incremental data:
* We will pull the binlog event of MySQL to parse and execute when the full data synchronization is completed.
*/
class MaterializeMySQLSyncThread
{
public:
@ -32,7 +45,7 @@ public:
void stopSynchronization();
void startSynchronization(const String & mysql_version);
void startSynchronization();
static bool isMySQLSyncThread();

View File

@ -11,9 +11,10 @@
#include <Parsers/ASTExternalDDLQuery.h>
#ifdef USE_MYSQL
# include <Interpreters/Context.h>
# include <Interpreters/MySQL/InterpretersMySQLDDLQuery.h>
# include <Parsers/MySQL/ASTAlterQuery.h>
# include <Parsers/MySQL/ASTCreateQuery.h>
# include <Interpreters/MySQL/InterpretersMySQLDDLQuery.h>
#endif
namespace DB
@ -21,6 +22,7 @@ namespace DB
namespace ErrorCodes
{
extern const int SYNTAX_ERROR;
extern const int BAD_ARGUMENTS;
}
@ -33,6 +35,9 @@ BlockIO InterpreterExternalDDLQuery::execute()
{
const ASTExternalDDLQuery & external_ddl_query = query->as<ASTExternalDDLQuery &>();
if (context.getClientInfo().query_kind != ClientInfo::QueryKind::SECONDARY_QUERY)
throw Exception("Cannot parse and execute EXTERNAL DDL FROM.", ErrorCodes::SYNTAX_ERROR);
if (external_ddl_query.from->name == "MySQL")
{
#ifdef USE_MYSQL
@ -53,11 +58,11 @@ BlockIO InterpreterExternalDDLQuery::execute()
else if (external_ddl_query.external_ddl->as<MySQLParser::ASTAlterQuery>())
return MySQLInterpreter::InterpreterMySQLAlterQuery(
external_ddl_query.external_ddl, context, getIdentifierName(arguments[0]),
getIdentifierName(arguments[1])) .execute();
getIdentifierName(arguments[1])).execute();
else if (external_ddl_query.external_ddl->as<MySQLParser::ASTCreateQuery>())
return MySQLInterpreter::InterpreterMySQLCreateQuery(
external_ddl_query.external_ddl, context, getIdentifierName(arguments[0]),
getIdentifierName(arguments[1])) .execute();
getIdentifierName(arguments[1])).execute();
}
return BlockIO();

View File

@ -17,6 +17,13 @@
namespace DB
{
#ifdef USE_MYSQL
namespace ErrorCodes
{
extern const int MYSQL_SYNTAX_ERROR;
}
#endif
bool ParserExternalDDLQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & expected)
{
ParserFunction p_function;
@ -49,6 +56,23 @@ bool ParserExternalDDLQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expect
if (external_ddl_query->external_ddl)
external_ddl_query->children.push_back(external_ddl_query->external_ddl);
if (!res)
{
/// Syntax error is ignored, so we need to convert the error code for parsing failure
if (ParserKeyword("ALTER TABLE").ignore(pos))
throw Exception("Cannot parse MySQL alter query.", ErrorCodes::MYSQL_SYNTAX_ERROR);
if (ParserKeyword("RENAME TABLE").ignore(pos))
throw Exception("Cannot parse MySQL rename query.", ErrorCodes::MYSQL_SYNTAX_ERROR);
if (ParserKeyword("DROP TABLE").ignore(pos) || ParserKeyword("TRUNCATE").ignore(pos))
throw Exception("Cannot parse MySQL drop query.", ErrorCodes::MYSQL_SYNTAX_ERROR);
if (ParserKeyword("CREATE TABLE").ignore(pos) || ParserKeyword("CREATE TEMPORARY TABLE").ignore(pos))
throw Exception("Cannot parse MySQL create query.", ErrorCodes::MYSQL_SYNTAX_ERROR);
}
#endif
}