ISSUES-4006 some refactor for query convert

This commit is contained in:
zhang2014 2020-06-29 12:34:49 +08:00
parent 3afffaf303
commit 5fb92359d2
9 changed files with 49 additions and 84 deletions

View File

@ -1,6 +1,7 @@
#include <Databases/MySQL/DatabaseMaterializeMySQL.h>
#include <Databases/DatabaseOrdinary.h>
#include <Databases/MySQL/MaterializeMySQLSyncThread.h>
#include <Databases/MySQL/DatabaseMaterializeTablesIterator.h>
#include <Parsers/ASTCreateQuery.h>
#include <Storages/StorageMaterializeMySQL.h>
@ -10,8 +11,6 @@
namespace DB
{
static constexpr auto MYSQL_BACKGROUND_THREAD_NAME = "MySQLDBSync";
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
@ -121,7 +120,7 @@ time_t DatabaseMaterializeMySQL::getObjectMetadataModificationTime(const String
void DatabaseMaterializeMySQL::createTable(const Context & context, const String & name, const StoragePtr & table, const ASTPtr & query)
{
if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
if (!MaterializeMySQLSyncThread::isMySQLSyncThread())
throw Exception("MySQL database in locality_data mode does not support create table.", ErrorCodes::NOT_IMPLEMENTED);
getNestedDatabase()->createTable(context, name, table, query);
@ -129,7 +128,7 @@ void DatabaseMaterializeMySQL::createTable(const Context & context, const String
void DatabaseMaterializeMySQL::dropTable(const Context & context, const String & name, bool no_delay)
{
if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
if (!MaterializeMySQLSyncThread::isMySQLSyncThread())
throw Exception("MySQL database in locality_data mode does not support drop table.", ErrorCodes::NOT_IMPLEMENTED);
getNestedDatabase()->dropTable(context, name, no_delay);
@ -137,7 +136,7 @@ void DatabaseMaterializeMySQL::dropTable(const Context & context, const String &
void DatabaseMaterializeMySQL::attachTable(const String & name, const StoragePtr & table, const String & relative_table_path)
{
if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
if (!MaterializeMySQLSyncThread::isMySQLSyncThread())
throw Exception("MySQL database in locality_data mode does not support attach table.", ErrorCodes::NOT_IMPLEMENTED);
getNestedDatabase()->attachTable(name, table, relative_table_path);
@ -145,7 +144,7 @@ void DatabaseMaterializeMySQL::attachTable(const String & name, const StoragePtr
StoragePtr DatabaseMaterializeMySQL::detachTable(const String & name)
{
if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
if (!MaterializeMySQLSyncThread::isMySQLSyncThread())
throw Exception("MySQL database in locality_data mode does not support detach table.", ErrorCodes::NOT_IMPLEMENTED);
return getNestedDatabase()->detachTable(name);
@ -153,7 +152,7 @@ StoragePtr DatabaseMaterializeMySQL::detachTable(const String & name)
void DatabaseMaterializeMySQL::renameTable(const Context & context, const String & name, IDatabase & to_database, const String & to_name, bool exchange)
{
if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
if (!MaterializeMySQLSyncThread::isMySQLSyncThread())
throw Exception("MySQL database in locality_data mode does not support rename table.", ErrorCodes::NOT_IMPLEMENTED);
getNestedDatabase()->renameTable(context, name, to_database, to_name, exchange);
@ -161,7 +160,7 @@ void DatabaseMaterializeMySQL::renameTable(const Context & context, const String
void DatabaseMaterializeMySQL::alterTable(const Context & context, const StorageID & table_id, const StorageInMemoryMetadata & metadata)
{
if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
if (!MaterializeMySQLSyncThread::isMySQLSyncThread())
throw Exception("MySQL database in locality_data mode does not support alter table.", ErrorCodes::NOT_IMPLEMENTED);
getNestedDatabase()->alterTable(context, table_id, metadata);
@ -184,7 +183,7 @@ bool DatabaseMaterializeMySQL::isTableExist(const String & name, const Context &
StoragePtr DatabaseMaterializeMySQL::tryGetTable(const String & name, const Context & context) const
{
if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
if (!MaterializeMySQLSyncThread::isMySQLSyncThread())
return std::make_shared<StorageMaterializeMySQL>(getNestedDatabase()->tryGetTable(name, context));
return getNestedDatabase()->tryGetTable(name, context);
@ -192,7 +191,7 @@ StoragePtr DatabaseMaterializeMySQL::tryGetTable(const String & name, const Cont
DatabaseTablesIteratorPtr DatabaseMaterializeMySQL::getTablesIterator(const Context & context, const FilterByNameFunction & filter_by_table_name)
{
if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
if (!MaterializeMySQLSyncThread::isMySQLSyncThread())
{
DatabaseTablesIteratorPtr iterator = getNestedDatabase()->getTablesIterator(context, filter_by_table_name);
return std::make_unique<DatabaseMaterializeTablesIterator>(std::move(iterator));

View File

@ -34,6 +34,8 @@ namespace ErrorCodes
extern const int INCORRECT_QUERY;
}
static constexpr auto MYSQL_BACKGROUND_THREAD_NAME = "MySQLDBSync";
static BlockIO tryToExecuteQuery(const String & query_to_execute, const Context & context_, const String & comment)
{
try
@ -98,7 +100,7 @@ MaterializeMySQLSyncThread::MaterializeMySQLSyncThread(
void MaterializeMySQLSyncThread::synchronization()
{
setThreadName("MySQLDBSync");
setThreadName(MYSQL_BACKGROUND_THREAD_NAME);
try
{
@ -376,6 +378,10 @@ void MaterializeMySQLSyncThread::onEvent(Buffers & buffers, const BinlogEventPtr
/// TODO: 直接使用Interpreter执行即可
}
}
bool MaterializeMySQLSyncThread::isMySQLSyncThread()
{
return getThreadName() == MYSQL_BACKGROUND_THREAD_NAME;
}
void MaterializeMySQLSyncThread::Buffers::add(size_t block_rows, size_t block_bytes, size_t written_rows, size_t written_bytes)
{

View File

@ -33,6 +33,8 @@ public:
void startSynchronization();
static bool isMySQLSyncThread();
private:
Poco::Logger * log;
const Context & global_context;

View File

@ -7,7 +7,6 @@
#include <Parsers/MySQL/ASTDeclareOption.h>
#include <Parsers/queryToString.h>
#include <Poco/String.h>
#include <Common/quoteString.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeNullable.h>
@ -28,55 +27,6 @@ namespace ErrorCodes
namespace MySQLVisitor
{
static String convertDataType(const String & type_name, const ASTPtr & arguments, bool is_unsigned)
{
if (type_name == "TINYINT")
return is_unsigned ? "UInt8" : "Int8";
else if (type_name == "BOOL" || type_name == "BOOLEAN")
return "UInt8";
else if (type_name == "SMALLINT")
return is_unsigned ? "UInt16" : "Int16";
else if (type_name == "INT" || type_name == "MEDIUMINT" || type_name == "INTEGER")
return is_unsigned ? "UInt32" : "Int32";
else if (type_name == "BIGINT")
return is_unsigned ? "UInt64" : "Int64";
else if (type_name == "FLOAT")
return "Float32";
else if (type_name == "DOUBLE" || type_name == "PRECISION" || type_name == "REAL")
return "Float64";
else if (type_name == "DECIMAL" || type_name == "DEC" || type_name == "NUMERIC" || type_name == "FIXED")
{
if (!arguments)
return "Decimal(10, 0)";
else if (arguments->children.size() == 1)
return "Decimal(" + queryToString(arguments) + ", 0)";
else if (arguments->children.size() == 2)
return "Decimal(" + queryToString(arguments) + ")";
else
throw Exception("Decimal data type family must have exactly two arguments: precision and scale", ErrorCodes::UNKNOWN_TYPE);
}
if (type_name == "DATE")
return "Date";
else if (type_name == "DATETIME" || type_name == "TIMESTAMP")
return "DateTime";
else if (type_name == "TIME")
return "DateTime64";
else if (type_name == "YEAR")
return "Int16";
if (type_name == "BINARY")
return arguments ? "FixedString(" + queryToString(arguments) + ")" : "FixedString(1)";
return "String";
}
static String convertDataType(const String & type_name, const ASTPtr & arguments, bool is_unsigned, bool is_nullable)
{
return (is_nullable ? "Nullable(" : "") + convertDataType(type_name, arguments, is_unsigned) + (is_nullable ? ")" : "");
}
void CreateQueryMatcher::visit(ASTPtr & ast, Data & data)
{
if (auto * t = ast->as<MySQLParser::ASTCreateQuery>())
@ -143,14 +93,8 @@ void CreateQueryMatcher::visit(const MySQLParser::ASTDeclareColumn & declare_col
}
}
if (ASTFunction * function = declare_column.data_type->as<ASTFunction>())
data.columns_name_and_type.emplace_back(declare_column.name,
DataTypeFactory::instance().get(convertDataType(Poco::toUpper(function->name), function->arguments, is_unsigned, is_nullable)));
else if (ASTIdentifier * identifier = declare_column.data_type->as<ASTIdentifier>())
data.columns_name_and_type.emplace_back(declare_column.name,
DataTypeFactory::instance().get(convertDataType(Poco::toUpper(identifier->name), ASTPtr{}, is_unsigned, is_nullable)));
else
throw Exception("Unsupported MySQL data type " + queryToString(declare_column.data_type) + ".", ErrorCodes::NOT_IMPLEMENTED);
data.columns_name_and_type.emplace_back(declare_column.name, DataTypeFactory::instance().get(
(is_nullable ? "Nullable(" : "") + queryToString(declare_column.data_type) + (is_nullable ? ")" : "")));
}
void CreateQueryMatcher::visit(const MySQLParser::ASTDeclarePartitionOptions & declare_partition_options, const ASTPtr &, Data & data)

View File

@ -0,0 +1,5 @@
//
// Created by coswde on 2020/6/29.
//
#include "InterpreterMySQLCreateQuery.h"

View File

@ -0,0 +1,13 @@
#pragma once
namespace DB
{
namespace MySQLInterpreter
{
}
}

View File

@ -1,10 +1,11 @@
#include <Parsers/MySQL/tryParseMySQLQuery.h>
#include <Parsers/MySQL/ParserMySQLQuery.h>
#include <Parsers/ParserDropQuery.h>
#include <Parsers/ParserRenameQuery.h>
#include <Parsers/MySQL/ASTCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Databases/MySQL/MaterializeMySQLSyncThread.h>
namespace DB
{
@ -12,8 +13,11 @@ namespace DB
namespace MySQLParser
{
bool ParserQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & expected)
bool ParserMySQLQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & expected)
{
if (!MaterializeMySQLSyncThread::isMySQLSyncThread())
return false;
ParserDropQuery p_drop_query;
ParserRenameQuery p_rename_query;
ParserCreateQuery p_create_query;
@ -25,13 +29,4 @@ bool ParserQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & expect
}
ASTPtr tryParseMySQLQuery(const std::string & query, size_t max_query_size, size_t max_parser_depth)
{
std::string error_message;
const char * pos = query.data();
MySQLParser::ParserQuery p_query;
return tryParseQuery(p_query, pos, query.data() + query.size(), error_message, false, "", false, max_query_size, max_parser_depth);
}
}

View File

@ -9,7 +9,7 @@ namespace DB
namespace MySQLParser
{
class ParserQuery : public IParserBase
class ParserMySQLQuery : public IParserBase
{
protected:
const char * getName() const override { return "MySQL Query"; }
@ -19,6 +19,4 @@ protected:
}
ASTPtr tryParseMySQLQuery(const std::string & query, size_t max_query_size, size_t max_parser_depth);
}

View File

@ -17,6 +17,7 @@
#include <Parsers/ParserCreateSettingsProfileQuery.h>
#include <Parsers/ParserDropAccessEntityQuery.h>
#include <Parsers/ParserGrantQuery.h>
#include <Parsers/MySQL/ParserMySQLQuery.h>
namespace DB
@ -38,6 +39,7 @@ bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserDropAccessEntityQuery drop_access_entity_p;
ParserGrantQuery grant_p;
ParserSetRoleQuery set_role_p;
MySQLParser::ParserMySQLQuery mysql_query_p;
bool res = query_with_output_p.parse(pos, node, expected)
|| insert_p.parse(pos, node, expected)
@ -51,7 +53,8 @@ bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
|| create_row_policy_p.parse(pos, node, expected)
|| create_settings_profile_p.parse(pos, node, expected)
|| drop_access_entity_p.parse(pos, node, expected)
|| grant_p.parse(pos, node, expected);
|| grant_p.parse(pos, node, expected)
|| mysql_query_p.parse(pos, node, expected);
return res;
}