From 7ea2eee98af4457e2c5db973ad38186cb2a721d9 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Tue, 21 Jul 2020 17:56:55 +0800 Subject: [PATCH] ISSUES-4006 add some check & add comment --- src/Common/ErrorCodes.cpp | 2 ++ .../MySQL/DatabaseMaterializeMySQL.cpp | 12 +++++++--- .../MySQL/DatabaseMaterializeMySQL.h | 4 ++++ .../MySQL/DatabaseMaterializeTablesIterator.h | 6 +++++ src/Databases/MySQL/MaterializeMetadata.h | 8 +++++++ .../MySQL/MaterializeMySQLSettings.h | 2 +- .../MySQL/MaterializeMySQLSyncThread.cpp | 10 ++++---- .../MySQL/MaterializeMySQLSyncThread.h | 15 +++++++++++- .../InterpreterExternalDDLQuery.cpp | 11 ++++++--- src/Parsers/ParserExternalDDLQuery.cpp | 24 +++++++++++++++++++ 10 files changed, 81 insertions(+), 13 deletions(-) diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index 0f7d9099314..bd05b346a9f 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -502,6 +502,8 @@ namespace ErrorCodes extern const int UNKNOWN_RAID_TYPE = 535; extern const int CANNOT_RESTORE_FROM_FIELD_DUMP = 536; extern const int ILLEGAL_MYSQL_VARIABLE = 537; + extern const int ILLEGAL_MYSQL_VARIABLE = 538; + extern const int MYSQL_SYNTAX_ERROR = 539; extern const int KEEPER_EXCEPTION = 999; extern const int POCO_EXCEPTION = 1000; diff --git a/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp b/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp index db657edefed..18b88f9a8f7 100644 --- a/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp +++ b/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp @@ -69,9 +69,8 @@ void DatabaseMaterializeMySQL::loadStoredObjects(Context & context, bool has_for { try { - LOG_DEBUG(log, "Loading MySQL nested database stored objects."); getNestedDatabase()->loadStoredObjects(context, has_force_restore_data_flag); - LOG_DEBUG(log, "Loaded MySQL nested database stored objects."); + materialize_thread.startSynchronization(); } catch (...) { @@ -220,7 +219,14 @@ bool DatabaseMaterializeMySQL::isTableExist(const String & name, const Context & StoragePtr DatabaseMaterializeMySQL::tryGetTable(const String & name, const Context & context) const { if (!MaterializeMySQLSyncThread::isMySQLSyncThread()) - return std::make_shared(getNestedDatabase()->tryGetTable(name, context)); + { + StoragePtr nested_storage = getNestedDatabase()->tryGetTable(name, context); + + if (!nested_storage) + return {}; + + return std::make_shared(std::move(nested_storage)); + } return getNestedDatabase()->tryGetTable(name, context); } diff --git a/src/Databases/MySQL/DatabaseMaterializeMySQL.h b/src/Databases/MySQL/DatabaseMaterializeMySQL.h index 78f8e293224..8b35a23378a 100644 --- a/src/Databases/MySQL/DatabaseMaterializeMySQL.h +++ b/src/Databases/MySQL/DatabaseMaterializeMySQL.h @@ -13,6 +13,10 @@ namespace DB { +/** Real-time pull table structure and data from remote MySQL + * + * All table structure and data will be written to the local file system + */ class DatabaseMaterializeMySQL : public IDatabase { public: diff --git a/src/Databases/MySQL/DatabaseMaterializeTablesIterator.h b/src/Databases/MySQL/DatabaseMaterializeTablesIterator.h index bc46e4da46e..3bad8f9c8bb 100644 --- a/src/Databases/MySQL/DatabaseMaterializeTablesIterator.h +++ b/src/Databases/MySQL/DatabaseMaterializeTablesIterator.h @@ -6,6 +6,12 @@ namespace DB { +/** MaterializeMySQL database table iterator + * + * The iterator returns different storage engine types depending on the visitor. + * When MySQLSync thread accesses, it always returns MergeTree + * Other cases always convert MergeTree to StorageMaterializeMySQL + */ class DatabaseMaterializeTablesIterator final : public IDatabaseTablesIterator { public: diff --git a/src/Databases/MySQL/MaterializeMetadata.h b/src/Databases/MySQL/MaterializeMetadata.h index 79769ef9f4a..c377a99a27c 100644 --- a/src/Databases/MySQL/MaterializeMetadata.h +++ b/src/Databases/MySQL/MaterializeMetadata.h @@ -8,6 +8,14 @@ namespace DB { +/** Materialize database engine metadata + * + * Record data version and current snapshot of MySQL, including: + * binlog_file - currently executing binlog_file + * binlog_position - position of the currently executing binlog file + * executed_gtid_set - currently executing gtid + * need_dumping_tables - Table structure snapshot at the current moment(Only when database first created or executed binlog file is deleted) + */ struct MaterializeMetadata { const String persistent_path; diff --git a/src/Databases/MySQL/MaterializeMySQLSettings.h b/src/Databases/MySQL/MaterializeMySQLSettings.h index 6f64865afac..de6a1ae1ede 100644 --- a/src/Databases/MySQL/MaterializeMySQLSettings.h +++ b/src/Databases/MySQL/MaterializeMySQLSettings.h @@ -8,7 +8,7 @@ namespace DB class ASTStorage; -/** Settings for the MySQL Database engine(materialize mode). +/** Settings for the MaterializeMySQL database engine. * Could be loaded from a CREATE DATABASE query (SETTINGS clause). */ struct MaterializeMySQLSettings : public SettingsCollection diff --git a/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp b/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp index fc105ce177d..4ec6bc320b5 100644 --- a/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp +++ b/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp @@ -144,9 +144,7 @@ MaterializeMySQLSyncThread::MaterializeMySQLSyncThread( : log(&Poco::Logger::get("MaterializeMySQLSyncThread")), global_context(context.getGlobalContext()), database_name(database_name_) , mysql_database_name(mysql_database_name_), pool(std::move(pool_)), client(std::move(client_)), settings(settings_) { - const auto & mysql_server_version = checkVariableAndGetVersion(pool.get()); query_prefix = "EXTERNAL DDL FROM MySQL(" + backQuoteIfNeed(database_name) + ", " + backQuoteIfNeed(mysql_database_name) + ") "; - startSynchronization(mysql_server_version); } void MaterializeMySQLSyncThread::synchronization(const String & mysql_version) @@ -210,10 +208,12 @@ void MaterializeMySQLSyncThread::stopSynchronization() } } -void MaterializeMySQLSyncThread::startSynchronization(const String & mysql_version) +void MaterializeMySQLSyncThread::startSynchronization() { - /// TODO: reset exception. - background_thread_pool = std::make_unique([this, mysql_version = mysql_version]() { synchronization(mysql_version); }); + const auto & mysql_server_version = checkVariableAndGetVersion(pool.get()); + + background_thread_pool = std::make_unique( + [this, mysql_server_version = mysql_server_version]() { synchronization(mysql_server_version); }); } static inline void cleanOutdatedTables(const String & database_name, const Context & context) diff --git a/src/Databases/MySQL/MaterializeMySQLSyncThread.h b/src/Databases/MySQL/MaterializeMySQLSyncThread.h index 4d4dbc28624..f244c24ab9a 100644 --- a/src/Databases/MySQL/MaterializeMySQLSyncThread.h +++ b/src/Databases/MySQL/MaterializeMySQLSyncThread.h @@ -21,6 +21,19 @@ namespace DB { +/** MySQL table structure and data synchronization thread + * + * When catch exception, it always exits immediately. + * In this case, you need to execute DETACH DATABASE and ATTACH DATABASE after manual processing + * + * The whole work of the thread includes synchronous full data and real-time pull incremental data + * + * synchronous full data: + * We will synchronize the full data when the database is first create or not found binlog file in MySQL after restart. + * + * real-time pull incremental data: + * We will pull the binlog event of MySQL to parse and execute when the full data synchronization is completed. + */ class MaterializeMySQLSyncThread { public: @@ -32,7 +45,7 @@ public: void stopSynchronization(); - void startSynchronization(const String & mysql_version); + void startSynchronization(); static bool isMySQLSyncThread(); diff --git a/src/Interpreters/InterpreterExternalDDLQuery.cpp b/src/Interpreters/InterpreterExternalDDLQuery.cpp index 2e9c594812b..8f1c9da22b7 100644 --- a/src/Interpreters/InterpreterExternalDDLQuery.cpp +++ b/src/Interpreters/InterpreterExternalDDLQuery.cpp @@ -11,9 +11,10 @@ #include #ifdef USE_MYSQL +# include +# include # include # include -# include #endif namespace DB @@ -21,6 +22,7 @@ namespace DB namespace ErrorCodes { + extern const int SYNTAX_ERROR; extern const int BAD_ARGUMENTS; } @@ -33,6 +35,9 @@ BlockIO InterpreterExternalDDLQuery::execute() { const ASTExternalDDLQuery & external_ddl_query = query->as(); + if (context.getClientInfo().query_kind != ClientInfo::QueryKind::SECONDARY_QUERY) + throw Exception("Cannot parse and execute EXTERNAL DDL FROM.", ErrorCodes::SYNTAX_ERROR); + if (external_ddl_query.from->name == "MySQL") { #ifdef USE_MYSQL @@ -53,11 +58,11 @@ BlockIO InterpreterExternalDDLQuery::execute() else if (external_ddl_query.external_ddl->as()) return MySQLInterpreter::InterpreterMySQLAlterQuery( external_ddl_query.external_ddl, context, getIdentifierName(arguments[0]), - getIdentifierName(arguments[1])) .execute(); + getIdentifierName(arguments[1])).execute(); else if (external_ddl_query.external_ddl->as()) return MySQLInterpreter::InterpreterMySQLCreateQuery( external_ddl_query.external_ddl, context, getIdentifierName(arguments[0]), - getIdentifierName(arguments[1])) .execute(); + getIdentifierName(arguments[1])).execute(); } return BlockIO(); diff --git a/src/Parsers/ParserExternalDDLQuery.cpp b/src/Parsers/ParserExternalDDLQuery.cpp index 20020da22bd..b6ada39a800 100644 --- a/src/Parsers/ParserExternalDDLQuery.cpp +++ b/src/Parsers/ParserExternalDDLQuery.cpp @@ -17,6 +17,13 @@ namespace DB { +#ifdef USE_MYSQL +namespace ErrorCodes +{ + extern const int MYSQL_SYNTAX_ERROR; +} +#endif + bool ParserExternalDDLQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & expected) { ParserFunction p_function; @@ -49,6 +56,23 @@ bool ParserExternalDDLQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expect if (external_ddl_query->external_ddl) external_ddl_query->children.push_back(external_ddl_query->external_ddl); + + if (!res) + { + /// Syntax error is ignored, so we need to convert the error code for parsing failure + + if (ParserKeyword("ALTER TABLE").ignore(pos)) + throw Exception("Cannot parse MySQL alter query.", ErrorCodes::MYSQL_SYNTAX_ERROR); + + if (ParserKeyword("RENAME TABLE").ignore(pos)) + throw Exception("Cannot parse MySQL rename query.", ErrorCodes::MYSQL_SYNTAX_ERROR); + + if (ParserKeyword("DROP TABLE").ignore(pos) || ParserKeyword("TRUNCATE").ignore(pos)) + throw Exception("Cannot parse MySQL drop query.", ErrorCodes::MYSQL_SYNTAX_ERROR); + + if (ParserKeyword("CREATE TABLE").ignore(pos) || ParserKeyword("CREATE TEMPORARY TABLE").ignore(pos)) + throw Exception("Cannot parse MySQL create query.", ErrorCodes::MYSQL_SYNTAX_ERROR); + } #endif }