ISSUES-4006 refactor interpreter

This commit is contained in:
zhang2014 2020-07-01 14:42:22 +08:00
parent 1583f67681
commit 6562597064
17 changed files with 302 additions and 98 deletions

View File

@ -152,7 +152,8 @@ void MySQLClient::startBinlogDump(UInt32 slave_id, String replicate_db, String b
try
{
packet_sender->receivePacket(replication);
events.push(std::make_pair(replication.readOneEvent(), replication.getPosition()));
auto receive_event = replication.readOneEvent();
events.push(std::make_pair(receive_event, replication.getPosition()));
}
catch(...)
{

View File

@ -757,24 +757,17 @@ namespace MySQLReplication
event->parseEvent(payload);
auto query = std::static_pointer_cast<QueryEvent>(event);
if (query->schema == replicate_do_db)
switch (query->typ)
{
switch (query->typ)
{
case BEGIN:
case XA: {
event = std::make_shared<DryRunEvent>();
break;
}
default:
position.updateLogPos(event->header.log_pos);
case BEGIN:
case XA: {
event = std::make_shared<DryRunEvent>();
break;
}
default:
position.updateLogPos(event->header.log_pos);
}
else
{
event = std::make_shared<DryRunEvent>();
position.updateLogPos(event->header.log_pos);
}
break;
}
case XID_EVENT: {

View File

@ -7,22 +7,20 @@
#include <Databases/MySQL/MaterializeMySQLSyncThread.h>
# include <cstdlib>
# include <common/sleep.h>
# include <Common/quoteString.h>
# include <Common/setThreadName.h>
# include <Columns/ColumnTuple.h>
# include <DataStreams/AddingVersionsBlockOutputStream.h>
# include <DataStreams/OneBlockInputStream.h>
# include <DataStreams/copyData.h>
# include <DataStreams/OneBlockInputStream.h>
# include <DataStreams/AddingVersionsBlockOutputStream.h>
# include <Databases/MySQL/MaterializeMetadata.h>
# include <Databases/MySQL/DatabaseMaterializeMySQL.h>
# include <Formats/MySQLBlockInputStream.h>
# include <IO/ReadBufferFromString.h>
# include <Interpreters/Context.h>
# include <Interpreters/executeQuery.h>
# include <Parsers/parseQuery.h>
# include <Storages/StorageMergeTree.h>
# include <Common/CurrentMetrics.h>
# include <Common/quoteString.h>
# include <Common/setThreadName.h>
# include <common/sleep.h>
namespace DB
{
@ -35,18 +33,21 @@ namespace ErrorCodes
static constexpr auto MYSQL_BACKGROUND_THREAD_NAME = "MySQLDBSync";
template <bool execute_ddl = true>
static BlockIO tryToExecuteQuery(const String & query_to_execute, const Context & context_, const String & database, const String & comment)
{
try
{
LOG_DEBUG(&Poco::Logger::get("MaterializeMySQLSyncThread(" + database + ")"), "Try execute query: " + query_to_execute);
Context context(context_);
CurrentThread::QueryScope query_scope(context);
context.setCurrentDatabase(database);
context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
context.setCurrentQueryId(""); // generate random query_id
return executeQuery("/*" + comment + "*/ " + query_to_execute, context, true);
if constexpr (execute_ddl)
return executeMySQLDDLQuery("/*" + comment + "*/ " + query_to_execute, context, true);
else
return executeQuery("/*" + comment + "*/ " + query_to_execute, context, true);
}
catch (...)
{
@ -55,8 +56,6 @@ static BlockIO tryToExecuteQuery(const String & query_to_execute, const Context
"Query " + query_to_execute + " wasn't finished successfully");
throw;
}
LOG_DEBUG(&Poco::Logger::get("MaterializeMySQLSyncThread(" + database + ")"), "Executed query: " + query_to_execute);
}
static inline DatabaseMaterializeMySQL & getDatabase(const String & database_name)
@ -97,6 +96,7 @@ MaterializeMySQLSyncThread::MaterializeMySQLSyncThread(
, mysql_database_name(mysql_database_name_), pool(std::move(pool_)), client(std::move(client_)), settings(settings_)
{
/// TODO: 做简单的check, 失败即报错
query_prefix = "EXTERNAL DDL FROM MySQL(" + backQuoteIfNeed(database_name) + ", " + backQuoteIfNeed(mysql_database_name) + ")";
startSynchronization();
}
@ -162,14 +162,15 @@ static inline void cleanOutdatedTables(const String & database_name, const Conte
for (auto iterator = clean_database->getTablesIterator(context); iterator->isValid(); iterator->next())
{
String table_name = backQuoteIfNeed(iterator->name());
String comment = String("Clean ") + table_name + " for dump mysql.";
tryToExecuteQuery("DROP TABLE " + table_name, context, backQuoteIfNeed(database_name), comment);
String comment = "Materialize MySQL step 1: execute MySQL DDL for dump data";
tryToExecuteQuery("DROP TABLE " + table_name, context, database_name, comment);
}
}
static inline BlockOutputStreamPtr getTableOutput(const String & database_name, const String & table_name, const Context & context)
{
BlockIO res = tryToExecuteQuery("INSERT INTO " + backQuoteIfNeed(table_name) + " VALUES", context, database_name, "");
String comment = "Materialize MySQL step 1: execute dump data";
BlockIO res = tryToExecuteQuery<false>("INSERT INTO " + backQuoteIfNeed(table_name) + " VALUES", context, database_name, comment);
if (!res.out)
throw Exception("LOGICAL ERROR:", ErrorCodes::LOGICAL_ERROR);
@ -186,7 +187,7 @@ static inline void dumpDataForTables(
for (; iterator != master_info.need_dumping_tables.end() && !is_cancelled(); ++iterator)
{
const auto & table_name = iterator->first;
String comment = String("Dumping ") + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(table_name);
String comment = "Materialize MySQL step 1: execute MySQL DDL for dump data";
tryToExecuteQuery(iterator->second, context, database_name, comment); /// create table.
BlockOutputStreamPtr out = std::make_shared<AddingVersionsBlockOutputStream>(master_info.version, getTableOutput(database_name, table_name, context));
@ -395,7 +396,8 @@ void MaterializeMySQLSyncThread::onEvent(Buffers & buffers, const BinlogEventPtr
try
{
tryToExecuteQuery(query_event.query, global_context, database_name, "");
String comment = "Materialize MySQL step 2: execute MySQL DDL for sync data";
tryToExecuteQuery(query_prefix + query_event.query, global_context, query_event.schema, comment);
}
catch (Exception & exception)
{

View File

@ -44,6 +44,7 @@ private:
mutable mysqlxx::Pool pool;
mutable MySQLClient client;
MaterializeMySQLSettings * settings;
String query_prefix;
struct Buffers
{

View File

@ -30,20 +30,22 @@
#include <Parsers/ASTGrantQuery.h>
#include <Parsers/MySQL/ASTCreateQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterAlterQuery.h>
#include <Interpreters/InterpreterCheckQuery.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/InterpreterCreateUserQuery.h>
#include <Interpreters/InterpreterCreateRoleQuery.h>
#include <Interpreters/InterpreterCreateQuotaQuery.h>
#include <Interpreters/InterpreterCreateRoleQuery.h>
#include <Interpreters/InterpreterCreateRowPolicyQuery.h>
#include <Interpreters/InterpreterCreateSettingsProfileQuery.h>
#include <Interpreters/InterpreterCreateUserQuery.h>
#include <Interpreters/InterpreterDescribeQuery.h>
#include <Interpreters/InterpreterExplainQuery.h>
#include <Interpreters/InterpreterDropAccessEntityQuery.h>
#include <Interpreters/InterpreterDropQuery.h>
#include <Interpreters/InterpreterExistsQuery.h>
#include <Interpreters/InterpreterExplainQuery.h>
#include <Interpreters/InterpreterFactory.h>
#include <Interpreters/InterpreterGrantQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/InterpreterKillQueryQuery.h>
#include <Interpreters/InterpreterOptimizeQuery.h>
@ -55,17 +57,15 @@
#include <Interpreters/InterpreterShowAccessEntitiesQuery.h>
#include <Interpreters/InterpreterShowAccessQuery.h>
#include <Interpreters/InterpreterShowCreateAccessEntityQuery.h>
#include <Interpreters/InterpreterShowCreateQuery.h>
#include <Interpreters/InterpreterShowGrantsQuery.h>
#include <Interpreters/InterpreterShowPrivilegesQuery.h>
#include <Interpreters/InterpreterShowCreateQuery.h>
#include <Interpreters/InterpreterShowProcesslistQuery.h>
#include <Interpreters/InterpreterShowTablesQuery.h>
#include <Interpreters/InterpreterSystemQuery.h>
#include <Interpreters/InterpreterUseQuery.h>
#include <Interpreters/InterpreterWatchQuery.h>
#include <Interpreters/InterpreterGrantQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/MySQL/InterpreterMySQLCreateQuery.h>
#include <Interpreters/MySQL/MySQLInterpreterFactory.h>
#include <Parsers/ASTSystemQuery.h>
@ -246,12 +246,6 @@ std::unique_ptr<IInterpreter> InterpreterFactory::get(ASTPtr & query, Context &
}
else
{
if (MaterializeMySQLSyncThread::isMySQLSyncThread())
{
if (query->as<MySQLParser::ASTCreateQuery>())
return std::make_unique<MySQLInterpreter::InterpreterMySQLCreateQuery>(query, context);
}
throw Exception("Unknown type of query: " + query->getID(), ErrorCodes::UNKNOWN_TYPE_OF_QUERY);
}
}

View File

@ -1,30 +0,0 @@
#pragma once
#include <Parsers/IAST_fwd.h>
#include <Interpreters/IInterpreter.h>
namespace DB
{
namespace MySQLInterpreter
{
/**
*/
class InterpreterMySQLCreateQuery : public IInterpreter
{
public:
InterpreterMySQLCreateQuery(const ASTPtr & query_ptr_, Context & context_);
BlockIO execute() override;
private:
ASTPtr query_ptr;
Context & context;
ASTPtr getRewrittenQuery();
};
}
}

View File

@ -1,4 +1,4 @@
#include <Interpreters/MySQL/InterpreterMySQLCreateQuery.h>
#include <Interpreters/MySQL/MySQLInterpreterFactory.h>
#include <Parsers/IAST.h>
#include <Parsers/ASTLiteral.h>
@ -13,10 +13,10 @@
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeNullable.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/executeQuery.h>
#include <Parsers/MySQL/ASTDeclareIndex.h>
#include <Common/quoteString.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterCreateQuery.h>
namespace DB
{
@ -31,14 +31,12 @@ namespace ErrorCodes
namespace MySQLInterpreter
{
InterpreterMySQLCreateQuery::InterpreterMySQLCreateQuery(const ASTPtr & query_ptr_, Context & context_)
: query_ptr(query_ptr_), context(context_)
std::unique_ptr<IInterpreter> MySQLInterpreterFactory::get(ASTPtr & query, Context & context, QueryProcessingStage::Enum)
{
}
if (query->as<MySQLParser::ASTCreateQuery>())
return std::make_unique<InterpreterMySQLCreateQuery>(query, context);
BlockIO InterpreterMySQLCreateQuery::execute()
{
return InterpreterCreateQuery(getRewrittenQuery(), context).execute();
return std::unique_ptr<IInterpreter>();
}
static inline NamesAndTypesList getColumnsList(ASTExpressionList * columns_define)
@ -235,10 +233,14 @@ static ASTPtr getOrderByPolicy(
return order_by_expression;
}
ASTPtr InterpreterMySQLCreateQuery::getRewrittenQuery()
void InterpreterCreateImpl::validate(const InterpreterCreateImpl::TQuery &, const Context &)
{
}
ASTPtr InterpreterCreateImpl::getRewrittenQuery(const InterpreterCreateImpl::TQuery & create_query, const Context & context)
{
auto rewritten_query = std::make_shared<ASTCreateQuery>();
const auto & create_query = query_ptr->as<MySQLParser::ASTCreateQuery &>();
/// This is dangerous, because the like table may not exists in ClickHouse
if (create_query.like_table)
@ -252,8 +254,10 @@ ASTPtr InterpreterMySQLCreateQuery::getRewrittenQuery()
NamesAndTypesList columns_name_and_type = getColumnsList(create_defines->columns);
const auto & [primary_keys, unique_keys, keys, increment_columns] = getKeys(create_defines->columns, create_defines->indices, context, columns_name_and_type);
const auto & database_name = context.resolveDatabase(create_query.database);
if (primary_keys.empty())
throw Exception("The " + backQuoteIfNeed(create_query.database) + "." + backQuoteIfNeed(create_query.table)
throw Exception("The " + backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(create_query.table)
+ " cannot be materialized, because there is no primary keys.", ErrorCodes::NOT_IMPLEMENTED);
auto columns = std::make_shared<ASTColumns>();

View File

@ -0,0 +1,59 @@
#pragma once
#include <Parsers/IAST_fwd.h>
#include <Parsers/queryToString.h>
#include <Interpreters/IInterpreter.h>
#include <Interpreters/executeQuery.h>
#include <Parsers/MySQL/ASTCreateQuery.h>
namespace DB
{
namespace MySQLInterpreter
{
class MySQLInterpreterFactory
{
public:
static std::unique_ptr<IInterpreter> get(
ASTPtr & query, Context & context, QueryProcessingStage::Enum stage = QueryProcessingStage::Complete);
};
struct InterpreterCreateImpl
{
using TQuery = MySQLParser::ASTCreateQuery;
static void validate(const TQuery & query, const Context & context);
static ASTPtr getRewrittenQuery(const TQuery & query, const Context & context);
};
template <typename InterpreterImpl>
class InterpreterMySQLQuery : public IInterpreter
{
public:
InterpreterMySQLQuery(const ASTPtr & query_ptr_, Context & context_) : query_ptr(query_ptr_), context(context_) {}
BlockIO execute() override
{
const typename InterpreterImpl::TQuery & query = query_ptr->as<typename InterpreterImpl::TQuery &>();
InterpreterImpl::validate(query, context);
ASTPtr rewritten_query = InterpreterImpl::getRewrittenQuery(query, context);
if (rewritten_query)
return executeQuery("/* Rewritten MySQL DDL Query */ " + queryToString(rewritten_query), context, true);
return BlockIO{};
}
private:
ASTPtr query_ptr;
Context & context;
};
using InterpreterMySQLCreateQuery = InterpreterMySQLQuery<InterpreterCreateImpl>;
}
}

View File

@ -24,6 +24,7 @@
#include <Parsers/queryToString.h>
#include <Parsers/ASTWatchQuery.h>
#include <Parsers/Lexer.h>
#include <Parsers/MySQL/ParserMySQLQuery.h>
#include <Storages/StorageInput.h>
@ -35,6 +36,7 @@
#include <Interpreters/ReplaceQueryParameterVisitor.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/MySQL/MySQLInterpreterFactory.h>
#include <Common/ProfileEvents.h>
#include <Interpreters/DNSCacheUpdater.h>
@ -237,6 +239,7 @@ static void setQuerySpecificSettings(ASTPtr & ast, Context & context)
}
}
template <typename TInterpreterFactory = InterpreterFactory, typename TParser = ParserQuery>
static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
const char * begin,
const char * end,
@ -258,7 +261,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
const Settings & settings = context.getSettingsRef();
ParserQuery parser(end, settings.enable_debug_queries);
TParser parser(end, settings.enable_debug_queries);
ASTPtr ast;
const char * query_end;
@ -363,7 +366,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
/// reset Input callbacks if query is not INSERT SELECT
context.resetInputCallbacks();
auto interpreter = InterpreterFactory::get(ast, context, stage);
auto interpreter = TInterpreterFactory::get(ast, context, stage);
std::shared_ptr<const EnabledQuota> quota;
if (!interpreter->ignoreQuota())
@ -854,4 +857,15 @@ void executeQuery(
streams.onFinish();
}
BlockIO executeMySQLDDLQuery(
const String & query, Context & context, bool internal, QueryProcessingStage::Enum stage, bool may_have_embedded_data
)
{
ASTPtr ast;
BlockIO streams;
std::tie(ast, streams) = executeQueryImpl<MySQLInterpreter::MySQLInterpreterFactory, MySQLParser::ParserMySQLQuery>(
query.data(), query.data() + query.size(), context, internal, stage, !may_have_embedded_data, nullptr);
return streams;
}
}

View File

@ -55,4 +55,12 @@ BlockIO executeQuery(
bool allow_processors /// If can use processors pipeline
);
BlockIO executeMySQLDDLQuery(
const String & query, /// Query text without INSERT data. The latter must be written to BlockIO::out.
Context & context, /// DB, tables, data types, storage engines, functions, aggregate functions...
bool internal = false, /// If true, this query is caused by another query and thus needn't be registered in the ProcessList.
QueryProcessingStage::Enum stage = QueryProcessingStage::Complete, /// To which stage the query must be executed.
bool may_have_embedded_data = false /// If insert query may have embedded data
);
}

View File

@ -0,0 +1,76 @@
#include <Parsers/MySQL/ASTAlterQuery.h>
#include <Parsers/CommonParsers.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Common/quoteString.h>
namespace DB
{
namespace MySQLParser
{
ASTPtr ASTAlterQuery::clone() const
{
auto res = std::make_shared<ASTAlterQuery>(*this);
res->children.clear();
if (command_list)
res->set(res->command_list, command_list->clone());
return res;
}
void ASTAlterQuery::formatImpl(const IAST::FormatSettings & settings, IAST::FormatState & state, IAST::FormatStateStacked frame) const
{
frame.need_parens = false;
std::string indent_str = settings.one_line ? "" : std::string(4u * frame.indent, ' ');
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "ALTER TABLE " << (settings.hilite ? hilite_none : "");
if (!table.empty())
{
if (!database.empty())
{
settings.ostr << indent_str << backQuoteIfNeed(database);
settings.ostr << ".";
}
settings.ostr << indent_str << backQuoteIfNeed(table);
}
settings.ostr << settings.nl_or_ws;
FormatStateStacked frame_nested = frame;
frame_nested.need_parens = false;
++frame_nested.indent;
static_cast<IAST *>(command_list)->formatImpl(settings, state, frame_nested);
}
bool ParserAlterQuery::parseImpl(IParser::Pos & pos, ASTPtr & /*node*/, Expected & expected)
{
ParserKeyword k_add("ADD");
ParserKeyword k_alter_table("ALTER TABLE");
ASTPtr table;
if (!k_alter_table.ignore(pos, expected))
return false;
if (!ParserCompoundIdentifier(false).parse(pos, table, expected))
return false;
if (k_add.ignore(pos, expected))
{
ASTPtr declare_index;
ParserDeclareIndex p_index;
/// TODO: add column
if (!p_index.parse(pos, declare_index, expected))
return false;
}
return false;
}
}
}

View File

@ -0,0 +1,75 @@
#pragma once
#include <Parsers/IParserBase.h>
#include <Parsers/MySQL/ASTDeclareIndex.h>
#include <Parsers/MySQL/ASTDeclareColumn.h>
#include <Parsers/MySQL/ASTDeclareTableOptions.h>
namespace DB
{
namespace MySQLParser
{
class ASTAlterCommand : public IAST
{
public:
enum Type
{
ADD_COLUMN,
ADD_INDEX,
};
/// For ADD INDEX
ASTDeclareIndex * add_index;
/// For ADD COLUMN
ASTDeclareColumn * add_column;
};
class ASTAlterCommandList : public IAST
{
public:
std::vector<ASTAlterCommand *> commands;
void add(const ASTPtr & command)
{
commands.push_back(command->as<ASTAlterCommand>());
children.push_back(command);
}
String getID(char) const override { return "AlterCommandList"; }
ASTPtr clone() const override;
protected:
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
};
class ASTAlterQuery : public IAST
{
public:
String database;
String table;
ASTAlterCommandList * command_list = nullptr;
ASTPtr clone() const override;
String getID(char delim) const override { return "AlterQuery" + (delim + database) + delim + table; }
protected:
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
};
class ParserAlterQuery : public IParserBase
{
protected:
const char * getName() const override { return "alter query"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
}
}

View File

@ -1,6 +1,5 @@
#include <Parsers/MySQL/ParserMySQLQuery.h>
#include <Common/setThreadName.h>
#include <Parsers/ParserDropQuery.h>
#include <Parsers/ParserRenameQuery.h>
#include <Parsers/MySQL/ASTCreateQuery.h>
@ -16,9 +15,6 @@ namespace MySQLParser
bool ParserMySQLQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & expected)
{
if (getThreadName() != "MySQLDBSync")
return false;
ParserDropQuery p_drop_query;
ParserRenameQuery p_rename_query;
ParserCreateQuery p_create_query;

View File

@ -15,6 +15,9 @@ protected:
const char * getName() const override { return "MySQL Query"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
public:
ParserMySQLQuery(const char *, bool) {}
};
}

View File

@ -0,0 +1,5 @@
//
// Created by coswde on 2020/7/6.
//
#include "ParserExternalDDLQuery.h"

View File

@ -0,0 +1,6 @@
#pragma once
namespace DB
{
}

View File

@ -17,7 +17,6 @@
#include <Parsers/ParserCreateSettingsProfileQuery.h>
#include <Parsers/ParserDropAccessEntityQuery.h>
#include <Parsers/ParserGrantQuery.h>
#include <Parsers/MySQL/ParserMySQLQuery.h>
namespace DB
@ -39,7 +38,6 @@ bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserDropAccessEntityQuery drop_access_entity_p;
ParserGrantQuery grant_p;
ParserSetRoleQuery set_role_p;
MySQLParser::ParserMySQLQuery mysql_query_p;
bool res = query_with_output_p.parse(pos, node, expected)
|| insert_p.parse(pos, node, expected)
@ -53,8 +51,7 @@ bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
|| create_row_policy_p.parse(pos, node, expected)
|| create_settings_profile_p.parse(pos, node, expected)
|| drop_access_entity_p.parse(pos, node, expected)
|| grant_p.parse(pos, node, expected)
|| mysql_query_p.parse(pos, node, expected);
|| grant_p.parse(pos, node, expected);
return res;
}