diff --git a/src/Core/MySQLClient.cpp b/src/Core/MySQLClient.cpp index 109e8e1a7da..5cd71377a68 100644 --- a/src/Core/MySQLClient.cpp +++ b/src/Core/MySQLClient.cpp @@ -152,7 +152,8 @@ void MySQLClient::startBinlogDump(UInt32 slave_id, String replicate_db, String b try { packet_sender->receivePacket(replication); - events.push(std::make_pair(replication.readOneEvent(), replication.getPosition())); + auto receive_event = replication.readOneEvent(); + events.push(std::make_pair(receive_event, replication.getPosition())); } catch(...) { diff --git a/src/Core/MySQLReplication.cpp b/src/Core/MySQLReplication.cpp index e73a920ac88..d31c4c0b128 100644 --- a/src/Core/MySQLReplication.cpp +++ b/src/Core/MySQLReplication.cpp @@ -757,24 +757,17 @@ namespace MySQLReplication event->parseEvent(payload); auto query = std::static_pointer_cast(event); - if (query->schema == replicate_do_db) + switch (query->typ) { - switch (query->typ) - { - case BEGIN: - case XA: { - event = std::make_shared(); - break; - } - default: - position.updateLogPos(event->header.log_pos); + case BEGIN: + case XA: { + event = std::make_shared(); + break; } + default: + position.updateLogPos(event->header.log_pos); } - else - { - event = std::make_shared(); - position.updateLogPos(event->header.log_pos); - } + break; } case XID_EVENT: { diff --git a/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp b/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp index f96461af427..d629cab977a 100644 --- a/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp +++ b/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp @@ -7,22 +7,20 @@ #include # include +# include +# include +# include # include -# include -# include # include +# include +# include # include # include # include # include # include # include -# include # include -# include -# include -# include -# include namespace DB { @@ -35,18 +33,21 @@ namespace ErrorCodes static constexpr auto MYSQL_BACKGROUND_THREAD_NAME = "MySQLDBSync"; +template static BlockIO tryToExecuteQuery(const String & query_to_execute, const Context & context_, const String & database, const String & comment) { try { - LOG_DEBUG(&Poco::Logger::get("MaterializeMySQLSyncThread(" + database + ")"), "Try execute query: " + query_to_execute); - Context context(context_); CurrentThread::QueryScope query_scope(context); context.setCurrentDatabase(database); context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; context.setCurrentQueryId(""); // generate random query_id - return executeQuery("/*" + comment + "*/ " + query_to_execute, context, true); + + if constexpr (execute_ddl) + return executeMySQLDDLQuery("/*" + comment + "*/ " + query_to_execute, context, true); + else + return executeQuery("/*" + comment + "*/ " + query_to_execute, context, true); } catch (...) { @@ -55,8 +56,6 @@ static BlockIO tryToExecuteQuery(const String & query_to_execute, const Context "Query " + query_to_execute + " wasn't finished successfully"); throw; } - - LOG_DEBUG(&Poco::Logger::get("MaterializeMySQLSyncThread(" + database + ")"), "Executed query: " + query_to_execute); } static inline DatabaseMaterializeMySQL & getDatabase(const String & database_name) @@ -97,6 +96,7 @@ MaterializeMySQLSyncThread::MaterializeMySQLSyncThread( , mysql_database_name(mysql_database_name_), pool(std::move(pool_)), client(std::move(client_)), settings(settings_) { /// TODO: 做简单的check, 失败即报错 + query_prefix = "EXTERNAL DDL FROM MySQL(" + backQuoteIfNeed(database_name) + ", " + backQuoteIfNeed(mysql_database_name) + ")"; startSynchronization(); } @@ -162,14 +162,15 @@ static inline void cleanOutdatedTables(const String & database_name, const Conte for (auto iterator = clean_database->getTablesIterator(context); iterator->isValid(); iterator->next()) { String table_name = backQuoteIfNeed(iterator->name()); - String comment = String("Clean ") + table_name + " for dump mysql."; - tryToExecuteQuery("DROP TABLE " + table_name, context, backQuoteIfNeed(database_name), comment); + String comment = "Materialize MySQL step 1: execute MySQL DDL for dump data"; + tryToExecuteQuery("DROP TABLE " + table_name, context, database_name, comment); } } static inline BlockOutputStreamPtr getTableOutput(const String & database_name, const String & table_name, const Context & context) { - BlockIO res = tryToExecuteQuery("INSERT INTO " + backQuoteIfNeed(table_name) + " VALUES", context, database_name, ""); + String comment = "Materialize MySQL step 1: execute dump data"; + BlockIO res = tryToExecuteQuery("INSERT INTO " + backQuoteIfNeed(table_name) + " VALUES", context, database_name, comment); if (!res.out) throw Exception("LOGICAL ERROR:", ErrorCodes::LOGICAL_ERROR); @@ -186,7 +187,7 @@ static inline void dumpDataForTables( for (; iterator != master_info.need_dumping_tables.end() && !is_cancelled(); ++iterator) { const auto & table_name = iterator->first; - String comment = String("Dumping ") + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(table_name); + String comment = "Materialize MySQL step 1: execute MySQL DDL for dump data"; tryToExecuteQuery(iterator->second, context, database_name, comment); /// create table. BlockOutputStreamPtr out = std::make_shared(master_info.version, getTableOutput(database_name, table_name, context)); @@ -395,7 +396,8 @@ void MaterializeMySQLSyncThread::onEvent(Buffers & buffers, const BinlogEventPtr try { - tryToExecuteQuery(query_event.query, global_context, database_name, ""); + String comment = "Materialize MySQL step 2: execute MySQL DDL for sync data"; + tryToExecuteQuery(query_prefix + query_event.query, global_context, query_event.schema, comment); } catch (Exception & exception) { diff --git a/src/Databases/MySQL/MaterializeMySQLSyncThread.h b/src/Databases/MySQL/MaterializeMySQLSyncThread.h index 006147a6cd7..19a9f318398 100644 --- a/src/Databases/MySQL/MaterializeMySQLSyncThread.h +++ b/src/Databases/MySQL/MaterializeMySQLSyncThread.h @@ -44,6 +44,7 @@ private: mutable mysqlxx::Pool pool; mutable MySQLClient client; MaterializeMySQLSettings * settings; + String query_prefix; struct Buffers { diff --git a/src/Interpreters/InterpreterFactory.cpp b/src/Interpreters/InterpreterFactory.cpp index e934e7a5afc..4878fcdc9f5 100644 --- a/src/Interpreters/InterpreterFactory.cpp +++ b/src/Interpreters/InterpreterFactory.cpp @@ -30,20 +30,22 @@ #include #include +#include #include #include #include -#include -#include #include +#include #include #include +#include #include -#include #include #include #include +#include #include +#include #include #include #include @@ -55,17 +57,15 @@ #include #include #include +#include #include #include -#include #include #include #include #include #include -#include -#include -#include +#include #include @@ -246,12 +246,6 @@ std::unique_ptr InterpreterFactory::get(ASTPtr & query, Context & } else { - if (MaterializeMySQLSyncThread::isMySQLSyncThread()) - { - if (query->as()) - return std::make_unique(query, context); - } - throw Exception("Unknown type of query: " + query->getID(), ErrorCodes::UNKNOWN_TYPE_OF_QUERY); } } diff --git a/src/Interpreters/MySQL/InterpreterMySQLCreateQuery.h b/src/Interpreters/MySQL/InterpreterMySQLCreateQuery.h deleted file mode 100644 index 22f71501e81..00000000000 --- a/src/Interpreters/MySQL/InterpreterMySQLCreateQuery.h +++ /dev/null @@ -1,30 +0,0 @@ -#pragma once - -#include -#include - -namespace DB -{ - -namespace MySQLInterpreter -{ - -/** - */ -class InterpreterMySQLCreateQuery : public IInterpreter -{ -public: - InterpreterMySQLCreateQuery(const ASTPtr & query_ptr_, Context & context_); - - BlockIO execute() override; - -private: - ASTPtr query_ptr; - Context & context; - - ASTPtr getRewrittenQuery(); -}; - -} - -} diff --git a/src/Interpreters/MySQL/InterpreterMySQLCreateQuery.cpp b/src/Interpreters/MySQL/MySQLInterpreterFactory.cpp similarity index 93% rename from src/Interpreters/MySQL/InterpreterMySQLCreateQuery.cpp rename to src/Interpreters/MySQL/MySQLInterpreterFactory.cpp index 3d273d32772..8bf783eadb8 100644 --- a/src/Interpreters/MySQL/InterpreterMySQLCreateQuery.cpp +++ b/src/Interpreters/MySQL/MySQLInterpreterFactory.cpp @@ -1,4 +1,4 @@ -#include +#include #include #include @@ -13,10 +13,10 @@ #include #include #include -#include -#include #include #include +#include +#include namespace DB { @@ -31,14 +31,12 @@ namespace ErrorCodes namespace MySQLInterpreter { -InterpreterMySQLCreateQuery::InterpreterMySQLCreateQuery(const ASTPtr & query_ptr_, Context & context_) - : query_ptr(query_ptr_), context(context_) +std::unique_ptr MySQLInterpreterFactory::get(ASTPtr & query, Context & context, QueryProcessingStage::Enum) { -} + if (query->as()) + return std::make_unique(query, context); -BlockIO InterpreterMySQLCreateQuery::execute() -{ - return InterpreterCreateQuery(getRewrittenQuery(), context).execute(); + return std::unique_ptr(); } static inline NamesAndTypesList getColumnsList(ASTExpressionList * columns_define) @@ -235,10 +233,14 @@ static ASTPtr getOrderByPolicy( return order_by_expression; } -ASTPtr InterpreterMySQLCreateQuery::getRewrittenQuery() +void InterpreterCreateImpl::validate(const InterpreterCreateImpl::TQuery &, const Context &) +{ + +} + +ASTPtr InterpreterCreateImpl::getRewrittenQuery(const InterpreterCreateImpl::TQuery & create_query, const Context & context) { auto rewritten_query = std::make_shared(); - const auto & create_query = query_ptr->as(); /// This is dangerous, because the like table may not exists in ClickHouse if (create_query.like_table) @@ -252,8 +254,10 @@ ASTPtr InterpreterMySQLCreateQuery::getRewrittenQuery() NamesAndTypesList columns_name_and_type = getColumnsList(create_defines->columns); const auto & [primary_keys, unique_keys, keys, increment_columns] = getKeys(create_defines->columns, create_defines->indices, context, columns_name_and_type); + const auto & database_name = context.resolveDatabase(create_query.database); + if (primary_keys.empty()) - throw Exception("The " + backQuoteIfNeed(create_query.database) + "." + backQuoteIfNeed(create_query.table) + throw Exception("The " + backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(create_query.table) + " cannot be materialized, because there is no primary keys.", ErrorCodes::NOT_IMPLEMENTED); auto columns = std::make_shared(); diff --git a/src/Interpreters/MySQL/MySQLInterpreterFactory.h b/src/Interpreters/MySQL/MySQLInterpreterFactory.h new file mode 100644 index 00000000000..ccc13ebe1b0 --- /dev/null +++ b/src/Interpreters/MySQL/MySQLInterpreterFactory.h @@ -0,0 +1,59 @@ +#pragma once + +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace MySQLInterpreter +{ + +class MySQLInterpreterFactory +{ +public: + static std::unique_ptr get( + ASTPtr & query, Context & context, QueryProcessingStage::Enum stage = QueryProcessingStage::Complete); +}; + +struct InterpreterCreateImpl +{ + using TQuery = MySQLParser::ASTCreateQuery; + + static void validate(const TQuery & query, const Context & context); + + static ASTPtr getRewrittenQuery(const TQuery & query, const Context & context); +}; + +template +class InterpreterMySQLQuery : public IInterpreter +{ +public: + InterpreterMySQLQuery(const ASTPtr & query_ptr_, Context & context_) : query_ptr(query_ptr_), context(context_) {} + + BlockIO execute() override + { + const typename InterpreterImpl::TQuery & query = query_ptr->as(); + + InterpreterImpl::validate(query, context); + ASTPtr rewritten_query = InterpreterImpl::getRewrittenQuery(query, context); + + if (rewritten_query) + return executeQuery("/* Rewritten MySQL DDL Query */ " + queryToString(rewritten_query), context, true); + + return BlockIO{}; + } + +private: + ASTPtr query_ptr; + Context & context; +}; + +using InterpreterMySQLCreateQuery = InterpreterMySQLQuery; + +} + +} diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index 87b5da991a9..e5566304bac 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include @@ -35,6 +36,7 @@ #include #include #include +#include #include #include @@ -237,6 +239,7 @@ static void setQuerySpecificSettings(ASTPtr & ast, Context & context) } } +template static std::tuple executeQueryImpl( const char * begin, const char * end, @@ -258,7 +261,7 @@ static std::tuple executeQueryImpl( const Settings & settings = context.getSettingsRef(); - ParserQuery parser(end, settings.enable_debug_queries); + TParser parser(end, settings.enable_debug_queries); ASTPtr ast; const char * query_end; @@ -363,7 +366,7 @@ static std::tuple executeQueryImpl( /// reset Input callbacks if query is not INSERT SELECT context.resetInputCallbacks(); - auto interpreter = InterpreterFactory::get(ast, context, stage); + auto interpreter = TInterpreterFactory::get(ast, context, stage); std::shared_ptr quota; if (!interpreter->ignoreQuota()) @@ -854,4 +857,15 @@ void executeQuery( streams.onFinish(); } +BlockIO executeMySQLDDLQuery( + const String & query, Context & context, bool internal, QueryProcessingStage::Enum stage, bool may_have_embedded_data +) +{ + ASTPtr ast; + BlockIO streams; + std::tie(ast, streams) = executeQueryImpl( + query.data(), query.data() + query.size(), context, internal, stage, !may_have_embedded_data, nullptr); + return streams; +} + } diff --git a/src/Interpreters/executeQuery.h b/src/Interpreters/executeQuery.h index 2850bb3baf4..d3f52c6cea5 100644 --- a/src/Interpreters/executeQuery.h +++ b/src/Interpreters/executeQuery.h @@ -55,4 +55,12 @@ BlockIO executeQuery( bool allow_processors /// If can use processors pipeline ); +BlockIO executeMySQLDDLQuery( + const String & query, /// Query text without INSERT data. The latter must be written to BlockIO::out. + Context & context, /// DB, tables, data types, storage engines, functions, aggregate functions... + bool internal = false, /// If true, this query is caused by another query and thus needn't be registered in the ProcessList. + QueryProcessingStage::Enum stage = QueryProcessingStage::Complete, /// To which stage the query must be executed. + bool may_have_embedded_data = false /// If insert query may have embedded data +); + } diff --git a/src/Parsers/MySQL/ASTAlterQuery.cpp b/src/Parsers/MySQL/ASTAlterQuery.cpp new file mode 100644 index 00000000000..3ce0c62463c --- /dev/null +++ b/src/Parsers/MySQL/ASTAlterQuery.cpp @@ -0,0 +1,76 @@ +#include + +#include +#include +#include + +namespace DB +{ + +namespace MySQLParser +{ + +ASTPtr ASTAlterQuery::clone() const +{ + auto res = std::make_shared(*this); + res->children.clear(); + + if (command_list) + res->set(res->command_list, command_list->clone()); + + return res; +} + +void ASTAlterQuery::formatImpl(const IAST::FormatSettings & settings, IAST::FormatState & state, IAST::FormatStateStacked frame) const +{ + frame.need_parens = false; + + std::string indent_str = settings.one_line ? "" : std::string(4u * frame.indent, ' '); + + settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "ALTER TABLE " << (settings.hilite ? hilite_none : ""); + + if (!table.empty()) + { + if (!database.empty()) + { + settings.ostr << indent_str << backQuoteIfNeed(database); + settings.ostr << "."; + } + settings.ostr << indent_str << backQuoteIfNeed(table); + } + + settings.ostr << settings.nl_or_ws; + FormatStateStacked frame_nested = frame; + frame_nested.need_parens = false; + ++frame_nested.indent; + static_cast(command_list)->formatImpl(settings, state, frame_nested); +} + +bool ParserAlterQuery::parseImpl(IParser::Pos & pos, ASTPtr & /*node*/, Expected & expected) +{ + ParserKeyword k_add("ADD"); + ParserKeyword k_alter_table("ALTER TABLE"); + + ASTPtr table; + + if (!k_alter_table.ignore(pos, expected)) + return false; + + if (!ParserCompoundIdentifier(false).parse(pos, table, expected)) + return false; + + if (k_add.ignore(pos, expected)) + { + ASTPtr declare_index; + ParserDeclareIndex p_index; + + /// TODO: add column + if (!p_index.parse(pos, declare_index, expected)) + return false; + } + return false; + +} +} + +} diff --git a/src/Parsers/MySQL/ASTAlterQuery.h b/src/Parsers/MySQL/ASTAlterQuery.h new file mode 100644 index 00000000000..6692b96ee76 --- /dev/null +++ b/src/Parsers/MySQL/ASTAlterQuery.h @@ -0,0 +1,75 @@ +#pragma once + +#include +#include +#include +#include + +namespace DB +{ + +namespace MySQLParser +{ + +class ASTAlterCommand : public IAST +{ +public: + enum Type + { + ADD_COLUMN, + ADD_INDEX, + }; + + /// For ADD INDEX + ASTDeclareIndex * add_index; + + /// For ADD COLUMN + ASTDeclareColumn * add_column; + +}; + +class ASTAlterCommandList : public IAST +{ +public: + std::vector commands; + + void add(const ASTPtr & command) + { + commands.push_back(command->as()); + children.push_back(command); + } + + String getID(char) const override { return "AlterCommandList"; } + + ASTPtr clone() const override; + +protected: + void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override; +}; + +class ASTAlterQuery : public IAST +{ +public: + String database; + String table; + + ASTAlterCommandList * command_list = nullptr; + + ASTPtr clone() const override; + + String getID(char delim) const override { return "AlterQuery" + (delim + database) + delim + table; } +protected: + void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override; +}; + +class ParserAlterQuery : public IParserBase +{ +protected: + const char * getName() const override { return "alter query"; } + + bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override; +}; + +} + +} diff --git a/src/Parsers/MySQL/ParserMySQLQuery.cpp b/src/Parsers/MySQL/ParserMySQLQuery.cpp index 3d565224c88..7bab50fe264 100644 --- a/src/Parsers/MySQL/ParserMySQLQuery.cpp +++ b/src/Parsers/MySQL/ParserMySQLQuery.cpp @@ -1,6 +1,5 @@ #include -#include #include #include #include @@ -16,9 +15,6 @@ namespace MySQLParser bool ParserMySQLQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & expected) { - if (getThreadName() != "MySQLDBSync") - return false; - ParserDropQuery p_drop_query; ParserRenameQuery p_rename_query; ParserCreateQuery p_create_query; diff --git a/src/Parsers/MySQL/ParserMySQLQuery.h b/src/Parsers/MySQL/ParserMySQLQuery.h index aa52cd88a6e..6d467781aed 100644 --- a/src/Parsers/MySQL/ParserMySQLQuery.h +++ b/src/Parsers/MySQL/ParserMySQLQuery.h @@ -15,6 +15,9 @@ protected: const char * getName() const override { return "MySQL Query"; } bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override; + +public: + ParserMySQLQuery(const char *, bool) {} }; } diff --git a/src/Parsers/ParserExternalDDLQuery.cpp b/src/Parsers/ParserExternalDDLQuery.cpp new file mode 100644 index 00000000000..e0b59913e69 --- /dev/null +++ b/src/Parsers/ParserExternalDDLQuery.cpp @@ -0,0 +1,5 @@ +// +// Created by coswde on 2020/7/6. +// + +#include "ParserExternalDDLQuery.h" diff --git a/src/Parsers/ParserExternalDDLQuery.h b/src/Parsers/ParserExternalDDLQuery.h new file mode 100644 index 00000000000..2cab5853127 --- /dev/null +++ b/src/Parsers/ParserExternalDDLQuery.h @@ -0,0 +1,6 @@ +#pragma once + +namespace DB +{ + +} diff --git a/src/Parsers/ParserQuery.cpp b/src/Parsers/ParserQuery.cpp index 49897b9865d..144c309927b 100644 --- a/src/Parsers/ParserQuery.cpp +++ b/src/Parsers/ParserQuery.cpp @@ -17,7 +17,6 @@ #include #include #include -#include namespace DB @@ -39,7 +38,6 @@ bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) ParserDropAccessEntityQuery drop_access_entity_p; ParserGrantQuery grant_p; ParserSetRoleQuery set_role_p; - MySQLParser::ParserMySQLQuery mysql_query_p; bool res = query_with_output_p.parse(pos, node, expected) || insert_p.parse(pos, node, expected) @@ -53,8 +51,7 @@ bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) || create_row_policy_p.parse(pos, node, expected) || create_settings_profile_p.parse(pos, node, expected) || drop_access_entity_p.parse(pos, node, expected) - || grant_p.parse(pos, node, expected) - || mysql_query_p.parse(pos, node, expected); + || grant_p.parse(pos, node, expected); return res; }