diff --git a/src/Core/MySQLClient.cpp b/src/Core/MySQLClient.cpp index 4f3186fd2f7..5905de11dd2 100644 --- a/src/Core/MySQLClient.cpp +++ b/src/Core/MySQLClient.cpp @@ -17,6 +17,13 @@ MySQLClient::MySQLClient(const String & host_, UInt16 port_, const String & user client_capability_flags = CLIENT_PROTOCOL_41 | CLIENT_PLUGIN_AUTH | CLIENT_SECURE_CONNECTION; } +/// Moves only the connection settings out of `other`; no other member is taken over from it. +MySQLClient::MySQLClient(MySQLClient && other) + : host(std::move(other.host)), port(other.port), user(std::move(other.user)), password(std::move(other.password)) + , client_capability_flags(other.client_capability_flags) +{ +} + void MySQLClient::connect() { if (connected) diff --git a/src/Core/MySQLClient.h b/src/Core/MySQLClient.h index 726707dd129..5c42e5c5d34 100644 --- a/src/Core/MySQLClient.h +++ b/src/Core/MySQLClient.h @@ -28,6 +28,8 @@ class MySQLClient { public: MySQLClient(const String & host_, UInt16 port_, const String & user_, const String & password_); + MySQLClient(MySQLClient && other); + void connect(); void disconnect(); void ping(); diff --git a/src/DataStreams/AddingVersionsBlockOutputStream.cpp b/src/DataStreams/AddingVersionsBlockOutputStream.cpp new file mode 100644 index 00000000000..2adc1488e98 --- /dev/null +++ b/src/DataStreams/AddingVersionsBlockOutputStream.cpp @@ -0,0 +1,55 @@ +#include + +#include +#include + +namespace DB +{ + +void AddingVersionsBlockOutputStream::writePrefix() +{ + output->writePrefix(); +} + +void AddingVersionsBlockOutputStream::writeSuffix() +{ + output->writeSuffix(); +} + +void AddingVersionsBlockOutputStream::flush() +{ + output->flush(); +} + +void AddingVersionsBlockOutputStream::write(const Block & block) +{ + /// create_version and delete_version are always in the last place + Block res; + size_t rows = block.rows(); + + for (size_t index = 0; index < block.columns(); ++index) + res.insert(block.getByPosition(index)); + + DataTypePtr data_type = std::make_shared(); + + auto create_version = ColumnUInt64::create(rows); + for (size_t index = 0; index < rows; ++index) + 
create_version->getData()[index] = ++version; + + Block header = output->getHeader(); + res.insert(ColumnWithTypeAndName(create_version->getPtr(), data_type, header.getByPosition(header.columns() - 2).name)); + res.insert(ColumnWithTypeAndName(data_type->createColumnConstWithDefaultValue(rows)->convertToFullColumnIfConst(), data_type, header.getByPosition(header.columns() - 1).name)); + output->write(res); +} +Block AddingVersionsBlockOutputStream::getHeader() const +{ + Block res; + Block header = output->getHeader(); + + for (size_t index = 0; index < header.columns() - 2; ++index) + res.insert(header.getByPosition(index)); + + return res; +} + +} diff --git a/src/DataStreams/AddingVersionsBlockOutputStream.h b/src/DataStreams/AddingVersionsBlockOutputStream.h new file mode 100644 index 00000000000..cd6eb84e9bc --- /dev/null +++ b/src/DataStreams/AddingVersionsBlockOutputStream.h @@ -0,0 +1,30 @@ +#pragma once + +#include + +namespace DB +{ + +class AddingVersionsBlockOutputStream : public IBlockOutputStream +{ +public: + AddingVersionsBlockOutputStream(size_t & version_, const BlockOutputStreamPtr & output_) + : version(version_), output(output_) + { + } + + Block getHeader() const override; + + void write(const Block & block) override; + + void flush() override; + + void writePrefix() override; + void writeSuffix() override; + +private: + size_t & version; + BlockOutputStreamPtr output; +}; + +} diff --git a/src/Databases/DatabaseFactory.cpp b/src/Databases/DatabaseFactory.cpp index abfbfbd8878..87a8e6d775a 100644 --- a/src/Databases/DatabaseFactory.cpp +++ b/src/Databases/DatabaseFactory.cpp @@ -23,6 +23,7 @@ # include # include # include +# include #endif namespace DB @@ -118,8 +119,12 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String auto mysql_pool = mysqlxx::Pool(mysql_database_name, remote_host_name, mysql_user_name, mysql_user_password, remote_port); if (materializeMySQLDatabase(engine_define->settings)) + { + 
MySQLClient client(remote_host_name, remote_port, mysql_user_name, mysql_user_password); + return std::make_shared( - context, database_name, metadata_path, engine_define, mysql_database_name, std::move(mysql_pool)); + context, database_name, metadata_path, engine_define, mysql_database_name, std::move(mysql_pool), std::move(client)); + } return std::make_shared(context, database_name, metadata_path, engine_define, mysql_database_name, std::move(mysql_pool)); } diff --git a/src/Databases/MySQL/DataBuffers.cpp b/src/Databases/MySQL/DataBuffers.cpp new file mode 100644 index 00000000000..8659eadb8bb --- /dev/null +++ b/src/Databases/MySQL/DataBuffers.cpp @@ -0,0 +1,135 @@ +#include + +#include +#include + +namespace DB +{ + +DataBuffers::DataBuffers(size_t & version_, IDatabase * database_, const std::function &)> & flush_function_) + : version(version_), database(database_), flush_function(flush_function_) +{ + + /// TODO: periodic flush +} + +void DataBuffers::flush() +{ + flush_function(buffers); + buffers.clear(); +} + +void DataBuffers::writeData(const std::string & table_name, const std::vector & rows_data) +{ + Block & to = getTableBuffer(table_name, rows_data.size()); + for (size_t column = 0; column < to.columns() - 2; ++column) + { + /// normally columns + MutableColumnPtr col_to = (*std::move(to.getByPosition(column).column)).mutate(); + + for (size_t index = 0; index < rows_data.size(); ++index) + col_to->insert(DB::get(rows_data[index])[column]); + } + + Field new_version(UInt64(++version)); + MutableColumnPtr create_version_column = (*std::move(to.getByPosition(to.columns() - 2)).column).mutate(); + MutableColumnPtr delete_version_column = (*std::move(to.getByPosition(to.columns() - 1)).column).mutate(); + + delete_version_column->insertManyDefaults(rows_data.size()); + + for (size_t index = 0; index < rows_data.size(); ++index) + create_version_column->insert(new_version); +} + +void DataBuffers::updateData(const String & table_name, const std::vector & 
rows_data) +{ + if (rows_data.size() % 2 != 0) + throw Exception("LOGICAL ERROR: ", ErrorCodes::LOGICAL_ERROR); + + Block & to = getTableBuffer(table_name, rows_data.size()); + for (size_t column = 0; column < to.columns() - 2; ++column) + { + /// normally columns + MutableColumnPtr col_to = (*std::move(to.getByPosition(column).column)).mutate(); + + for (size_t index = 0; index < rows_data.size(); ++index) + col_to->insert(DB::get(rows_data[index])[column]); + } + + Field new_version(UInt64(++version)); + MutableColumnPtr create_version_column = (*std::move(to.getByPosition(to.columns() - 2)).column).mutate(); + MutableColumnPtr delete_version_column = (*std::move(to.getByPosition(to.columns() - 1)).column).mutate(); + + for (size_t index = 0; index < rows_data.size(); ++index) + { + if (index % 2 == 0) + { + create_version_column->insertDefault(); + delete_version_column->insert(new_version); + } + else + { + delete_version_column->insertDefault(); + create_version_column->insert(new_version); + } + } +} + +void DataBuffers::deleteData(const String & table_name, const std::vector & rows_data) +{ + Block & to = getTableBuffer(table_name, rows_data.size()); + for (size_t column = 0; column < to.columns() - 2; ++column) + { + /// normally columns + MutableColumnPtr col_to = (*std::move(to.getByPosition(column).column)).mutate(); + + for (size_t index = 0; index < rows_data.size(); ++index) + col_to->insert(DB::get(rows_data[index])[column]); + } + + Field new_version(UInt64(++version)); + MutableColumnPtr create_version_column = (*std::move(to.getByPosition(to.columns() - 2)).column).mutate(); + MutableColumnPtr delete_version_column = (*std::move(to.getByPosition(to.columns() - 1)).column).mutate(); + + create_version_column->insertManyDefaults(rows_data.size()); + + for (size_t index = 0; index < rows_data.size(); ++index) + delete_version_column->insert(new_version); +} + +Block & DataBuffers::getTableBuffer(const String & table_name, size_t write_size) +{ + if 
(buffers.find(table_name) == buffers.end()) + { + StoragePtr write_storage = database->tryGetTable(table_name); + if (!write_storage) + throw Exception("Cannot buffer data for unknown table " + table_name, ErrorCodes::LOGICAL_ERROR); + buffers[table_name] = write_storage->getSampleBlockNonMaterialized(); + } + else if (buffers[table_name].rows() + write_size > 8192) /// TODO: settings + { + /// flush() clears every buffer, so re-enter to rebuild this table's block from the storage sample. + flush(); + return getTableBuffer(table_name, write_size); + } + return buffers[table_name]; +} + +[[noreturn]] void DataBuffers::scheduleFlush() +{ + while (1) + { + try + { + flush(); /// NOTE(review): runs without taking `mutex` while the writer methods fill the buffers - confirm thread-safety. + } + catch (...) + { + tryLogCurrentException(""); + } + /// Sleep after a failed flush as well, otherwise a persistent error turns into a busy loop. + sleepForSeconds(1); + } +} + +} diff --git a/src/Databases/MySQL/DataBuffers.h b/src/Databases/MySQL/DataBuffers.h new file mode 100644 index 00000000000..4e69eb438d7 --- /dev/null +++ b/src/Databases/MySQL/DataBuffers.h @@ -0,0 +1,42 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +class DataBuffers : private boost::noncopyable +{ +public: + DataBuffers(size_t & version_, IDatabase * database_, const std::function &)> & flush_function_); + + void flush(); + + void writeData(const std::string & table_name, const std::vector & rows_data); + + void updateData(const std::string & table_name, const std::vector & rows_data); + + void deleteData(const std::string & table_name, const std::vector & rows_data); + + +private: + size_t & version; + IDatabase * database; + std::function &)> flush_function; + + mutable std::mutex mutex; + std::unordered_map buffers; + + [[noreturn]] void scheduleFlush(); + + Block & getTableBuffer(const String & table_name, size_t write_size); + + ThreadFromGlobalPool thread{&DataBuffers::scheduleFlush, this}; +}; + +} diff --git a/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp b/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp index 508cdd5a861..63676e74f30 100644 --- a/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp +++ b/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp @@ -6,21 
+6,25 @@ #include +# include +# include +# include # include # include -# include +# include +# include # include +# include # include -# include # include # include -# include +# include # include # include # include -# include # include # include +# include namespace DB { @@ -32,26 +36,25 @@ namespace ErrorCodes DatabaseMaterializeMySQL::DatabaseMaterializeMySQL( const Context & context, const String & database_name_, const String & metadata_path_, - const ASTStorage * database_engine_define_, const String & mysql_database_name_, mysqlxx::Pool && pool_) - : DatabaseOrdinary(database_name_, metadata_path_, context) - /*, global_context(context.getGlobalContext()), metadata_path(metadata_path_)*/ - , database_engine_define(database_engine_define_->clone()), mysql_database_name(mysql_database_name_), pool(std::move(pool_)) + const ASTStorage * database_engine_define_, const String & mysql_database_name_, mysqlxx::Pool && pool_, MySQLClient && client_) + : IDatabase(database_name_), global_context(context.getGlobalContext()), metadata_path(metadata_path_) + , database_engine_define(database_engine_define_->clone()), mysql_database_name(mysql_database_name_) + , nested_database(std::make_shared(database_name_, metadata_path, global_context)) + , pool(std::move(pool_)), client(std::move(client_)), log(&Logger::get("DatabaseMaterializeMySQL")) { /// TODO: 做简单的check, 失败即报错 } -void DatabaseMaterializeMySQL::tryToExecuteQuery(const String & query_to_execute) +BlockIO DatabaseMaterializeMySQL::tryToExecuteQuery(const String & query_to_execute, const String & comment) { - ReadBufferFromString istr(query_to_execute); - String dummy_string; - WriteBufferFromString ostr(dummy_string); + String query_prefix = "/*" + comment + " for " + backQuoteIfNeed(database_name) + " Database */ "; try { Context context = global_context; context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; context.setCurrentQueryId(""); // generate random query_id - executeQuery(istr, 
ostr, false, context, {}); + return executeQuery(query_prefix + query_to_execute, context, true); } catch (...) { @@ -62,7 +65,7 @@ void DatabaseMaterializeMySQL::tryToExecuteQuery(const String & query_to_execute LOG_DEBUG(log, "Executed query: " << query_to_execute); } -String DatabaseMaterializeMySQL::getCreateQuery(const mysqlxx::Pool::Entry & connection, const String & database, const String & table_name) +String DatabaseMaterializeMySQL::getCreateQuery(const mysqlxx::Pool::Entry & connection, const String & table_name) { Block show_create_table_header{ {std::make_shared(), "Table"}, @@ -77,57 +80,82 @@ String DatabaseMaterializeMySQL::getCreateQuery(const mysqlxx::Pool::Entry & con if (!create_query_block || create_query_block.rows() != 1) throw Exception("LOGICAL ERROR mysql show create return more rows.", ErrorCodes::LOGICAL_ERROR); - const auto & create_query = create_query_block.getByName("Create Table").column->getDataAt(0); - - MySQLParser::ParserCreateQuery p_create_query; - ASTPtr ast = parseQuery(p_create_query, create_query.data, create_query.data + create_query.size, "", 0, 0); - - if (!ast || !ast->as()) - throw Exception("LOGICAL ERROR: ast cannot cast to MySQLParser::ASTCreateQuery.", ErrorCodes::LOGICAL_ERROR); - - WriteBufferFromOwnString out; - ast->as()->database = database; - MySQLVisitor::CreateQueryConvertVisitor::Data data{.out = out, .context = global_context}; - MySQLVisitor::CreateQueryConvertVisitor visitor(data); - visitor.visit(ast); - return out.str(); + return create_query_block.getByName("Create Table").column->getDataAt(0).toString(); } -void DatabaseMaterializeMySQL::dumpMySQLDatabase(const std::function & is_cancelled) +BlockOutputStreamPtr DatabaseMaterializeMySQL::getTableOutput(const String & table_name, bool fill_version) { - mysqlxx::PoolWithFailover::Entry connection = pool.get(); + Context context = global_context; + context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; + 
context.setCurrentQueryId(""); // generate random query_id - MasterStatusInfo info(connection, mysql_database_name); + StoragePtr write_storage = nested_database->tryGetTable(table_name); + auto table_lock = write_storage->lockStructureForShare(true, context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout); - for (const auto & dumping_table_name : info.need_dumping_tables) + BlockOutputStreamPtr output = write_storage->write(ASTPtr{}, global_context); + output->addTableLock(table_lock); + + return fill_version ? std::make_shared(version, output) : output; +} + +void DatabaseMaterializeMySQL::dumpDataForTables(mysqlxx::Pool::Entry & connection, const std::vector & tables_name, const std::function & is_cancelled) +{ + std::unordered_map tables_create_query; + for (size_t index = 0; index < tables_name.size() && !is_cancelled(); ++index) + tables_create_query[tables_name[index]] = getCreateQuery(connection, tables_name[index]); + + auto iterator = tables_create_query.begin(); + for (; iterator != tables_create_query.end() && !is_cancelled(); ++iterator) { - if (is_cancelled()) - return; + const auto & table_name = iterator->first; + MySQLTableStruct table_struct = visitCreateQuery(iterator->second, global_context, database_name); + String comment = String("Dumping ") + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(table_name); + tryToExecuteQuery(toCreateQuery(table_struct, global_context), comment); - const auto & table_name = backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(dumping_table_name); - String query_prefix = "/* Dumping " + backQuoteIfNeed(mysql_database_name) + "." 
+ backQuoteIfNeed(dumping_table_name) + " for " - + backQuoteIfNeed(database_name) + " Database */ "; - - tryToExecuteQuery(query_prefix + " DROP TABLE IF EXISTS " + table_name); - tryToExecuteQuery(query_prefix + getCreateQuery(connection, database_name, dumping_table_name)); - - Context context = global_context; - context.setCurrentQueryId(""); // generate random query_id - context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; - BlockIO streams = executeQuery(query_prefix + " INSERT INTO " + table_name + " VALUES", context, true); - - if (!streams.out) - throw Exception("LOGICAL ERROR out stream is undefined.", ErrorCodes::LOGICAL_ERROR); - - MySQLBlockInputStream input( - connection, "SELECT * FROM " + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(dumping_table_name), - streams.out->getHeader(), DEFAULT_BLOCK_SIZE); - - copyData(input, *streams.out, is_cancelled); - /// TODO: 启动slave, 监听事件 + BlockOutputStreamPtr out = getTableOutput(table_name, true); + MySQLBlockInputStream input(connection, "SELECT * FROM " + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(table_name), out->getHeader(), DEFAULT_BLOCK_SIZE); + copyData(input, *out, is_cancelled); } } -void DatabaseMaterializeMySQL::synchronization() + +MasterStatusInfo DatabaseMaterializeMySQL::prepareSynchronized(std::unique_lock & lock) +{ + while(!sync_quit.load(std::memory_order_seq_cst)) + { + try + { + LOG_DEBUG(log, "Checking " + database_name + " database status."); + while (!sync_quit && !DatabaseCatalog::instance().isDatabaseExist(database_name)) + sync_cond.wait_for(lock, std::chrono::seconds(1)); + + LOG_DEBUG(log, database_name + " database status is OK."); + + mysqlxx::PoolWithFailover::Entry connection = pool.get(); + MasterStatusInfo master_info(connection, getMetadataPath() + "/.master_status", mysql_database_name); + + if (!master_info.need_dumping_tables.empty()) + { + /// TODO: drop all existing table structures; this may need to take into account queries that are still using these tables. 
+ dumpDataForTables(connection, master_info.need_dumping_tables, [&]() { return sync_quit.load(std::memory_order_seq_cst); }); + master_info.finishDump(); + } + + client.connect(); + client.startBinlogDump(std::rand(), mysql_database_name, master_info.binlog_file, master_info.binlog_position); + return master_info; + } + catch(...) + { + tryLogCurrentException(log, "Prepare MySQL Synchronized exception and retry"); + + sleepForSeconds(1); + } + } + + throw Exception("Synchronization of " + database_name + " database was cancelled before MySQL replication could be prepared.", ErrorCodes::LOGICAL_ERROR); +} + +void DatabaseMaterializeMySQL::synchronized() { setThreadName("MySQLDBSync"); @@ -135,18 +163,55 @@ void DatabaseMaterializeMySQL::synchronization() { std::unique_lock lock{sync_mutex}; - LOG_DEBUG(log, "Checking " + database_name + " database status."); - while (!sync_quit && !DatabaseCatalog::instance().isDatabaseExist(database_name)) - sync_cond.wait_for(lock, std::chrono::seconds(1)); + MasterStatusInfo master_status = prepareSynchronized(lock); - LOG_DEBUG(log, database_name + " database status is OK."); - - Poco::File dumped_flag(getMetadataPath() + "/dumped.flag"); - - if (!dumped_flag.exists()) + DataBuffers buffers(version, this, [&](const std::unordered_map & tables_data) { - dumpMySQLDatabase([&]() { return sync_quit.load(std::memory_order_seq_cst); }); - dumped_flag.createFile(); + master_status.transaction(client.getPosition(), [&]() /// At least once, There is only one possible reference: https://github.com/ClickHouse/ClickHouse/pull/8467 + { + for (const auto & [table_name, data] : tables_data) + { + if (!sync_quit.load(std::memory_order_seq_cst)) + { + LOG_DEBUG(log, "Prepare to flush data."); + BlockOutputStreamPtr output = getTableOutput(table_name, false); + output->writePrefix(); + output->write(data); + output->writeSuffix(); + output->flush(); + LOG_DEBUG(log, "Finish data flush."); + } + } + }); + }); + + while (!sync_quit.load(std::memory_order_seq_cst)) + { + const auto & event = client.readOneBinlogEvent(); + + if (event->type() == 
MYSQL_WRITE_ROWS_EVENT) + { + WriteRowsEvent & write_rows_event = static_cast(*event); + write_rows_event.dump(); + buffers.writeData(write_rows_event.table, write_rows_event.rows); + } + else if (event->type() == MYSQL_UPDATE_ROWS_EVENT) + { + UpdateRowsEvent & update_rows_event = static_cast(*event); + update_rows_event.dump(); + buffers.updateData(update_rows_event.table, update_rows_event.rows); + } + else if (event->type() == MYSQL_DELETE_ROWS_EVENT) + { + DeleteRowsEvent & delete_rows_event = static_cast(*event); + delete_rows_event.dump(); + buffers.deleteData(delete_rows_event.table, delete_rows_event.rows); + } + else if (event->type() == MYSQL_QUERY_EVENT) + { + /// TODO: recognise the query and check whether it is a supported DDL; if so, flush the currently buffered data immediately and then execute the DDL. + buffers.flush(); + } } } catch(...) diff --git a/src/Databases/MySQL/DatabaseMaterializeMySQL.h b/src/Databases/MySQL/DatabaseMaterializeMySQL.h index b4fabdb20eb..b6b27d33cae 100644 --- a/src/Databases/MySQL/DatabaseMaterializeMySQL.h +++ b/src/Databases/MySQL/DatabaseMaterializeMySQL.h @@ -3,46 +3,165 @@ #include "config_core.h" #if USE_MYSQL -#include -#include -#include -#include -#include -#include -#include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include namespace DB { -class DatabaseMaterializeMySQL : public DatabaseOrdinary +class DatabaseMaterializeMySQL : public IDatabase { public: DatabaseMaterializeMySQL( const Context & context, const String & database_name_, const String & metadata_path_, - const ASTStorage * database_engine_define_, const String & mysql_database_name_, mysqlxx::Pool && pool_); + const ASTStorage * database_engine_define_, const String & mysql_database_name_, mysqlxx::Pool && pool_, + MySQLClient && client_); String getEngineName() const override { return "MySQL"; } + void shutdown() override { nested_database->shutdown(); } + + bool empty() const override { return nested_database->empty(); } + + String getDataPath() const override { 
return nested_database->getDataPath(); } + + String getTableDataPath(const String & string) const override { return nested_database->getTableDataPath(string); } + + String getTableDataPath(const ASTCreateQuery & query) const override { return nested_database->getTableDataPath(query); } + + UUID tryGetTableUUID(const String & string) const override { return nested_database->tryGetTableUUID(string); } + + bool isDictionaryExist(const String & string) const override { return nested_database->isDictionaryExist(string); } + + DatabaseDictionariesIteratorPtr getDictionariesIterator(const FilterByNameFunction & filter_by_dictionary_name) override + { + return nested_database->getDictionariesIterator(filter_by_dictionary_name); + } + + void createTable(const Context & context, const String & string, const StoragePtr & ptr, const ASTPtr & astPtr) override + { + nested_database->createTable(context, string, ptr, astPtr); + } + + void createDictionary(const Context & context, const String & string, const ASTPtr & ptr) override + { + nested_database->createDictionary(context, string, ptr); + } + + void dropTable(const Context & context, const String & string, bool no_delay) override + { + nested_database->dropTable(context, string, no_delay); + } + + void removeDictionary(const Context & context, const String & string) override { nested_database->removeDictionary(context, string); } + + void attachTable(const String & string, const StoragePtr & ptr, const String & relative_table_path) override + { + nested_database->attachTable(string, ptr, relative_table_path); + } + + void attachDictionary(const String & string, const DictionaryAttachInfo & info) override { nested_database->attachDictionary(string, info); } + + StoragePtr detachTable(const String & string) override { return nested_database->detachTable(string); } + + void detachDictionary(const String & string) override { nested_database->detachDictionary(string); } + + void renameTable(const Context & context, const String 
& string, IDatabase & database, const String & string1, bool b) override + { + nested_database->renameTable(context, string, database, string1, b); + } + + void alterTable(const Context & context, const StorageID & id, const StorageInMemoryMetadata & metadata) override + { + nested_database->alterTable(context, id, metadata); + } + + time_t getObjectMetadataModificationTime(const String & string) const override + { + return nested_database->getObjectMetadataModificationTime(string); + } + + Poco::AutoPtr getDictionaryConfiguration(const String & string) const override + { + return nested_database->getDictionaryConfiguration(string); + } + + String getMetadataPath() const override { return nested_database->getMetadataPath(); } + + String getObjectMetadataPath(const String & string) const override { return nested_database->getObjectMetadataPath(string); } + + bool shouldBeEmptyOnDetach() const override { return nested_database->shouldBeEmptyOnDetach(); } + + void drop(const Context & context) override { nested_database->drop(context); } + + bool isTableExist(const String & name) const override { return nested_database->isTableExist(name); } + + StoragePtr tryGetTable(const String & name) const override { return nested_database->tryGetTable(name); } + + void loadStoredObjects(Context & context, bool b) override + { + try + { + LOG_DEBUG(log, "Loading MySQL nested database stored objects."); + nested_database->loadStoredObjects(context, b); + LOG_DEBUG(log, "Loaded MySQL nested database stored objects."); + } + catch (...) 
+ { + tryLogCurrentException(log, "Cannot load MySQL nested database stored objects."); + throw; + } + } + + ASTPtr getCreateDatabaseQuery() const override + { + const auto & create_query = std::make_shared(); + create_query->database = database_name; + create_query->set(create_query->storage, database_engine_define); + return create_query; + } + + DatabaseTablesIteratorPtr getTablesIterator(const FilterByNameFunction & filter_by_table_name) override { return nested_database->getTablesIterator(filter_by_table_name); } + private: -// const Context & global_context; -// String metadata_path; + const Context & global_context; + String metadata_path; ASTPtr database_engine_define; String mysql_database_name; + DatabasePtr nested_database; + size_t version{0}; mutable mysqlxx::Pool pool; + mutable MySQLClient client; - void synchronization(); + Poco::Logger * log; - void tryToExecuteQuery(const String & query_to_execute); + void synchronized(); - void dumpMySQLDatabase(const std::function & is_cancelled); + MasterStatusInfo prepareSynchronized(std::unique_lock & lock); - String getCreateQuery(const mysqlxx::Pool::Entry & connection, const String & database, const String & table_name); + BlockOutputStreamPtr getTableOutput(const String & table_name, bool fill_version); + + BlockIO tryToExecuteQuery(const String & query_to_execute, const String & comment); + + String getCreateQuery(const mysqlxx::Pool::Entry & connection, const String & table_name); + + void dumpDataForTables(mysqlxx::Pool::Entry & connection, const std::vector & tables_name, const std::function & is_cancelled); mutable std::mutex sync_mutex; std::atomic sync_quit{false}; std::condition_variable sync_cond; - ThreadFromGlobalPool thread{&DatabaseMaterializeMySQL::synchronization, this}; + ThreadFromGlobalPool thread{&DatabaseMaterializeMySQL::synchronized, this}; }; } diff --git a/src/Databases/MySQL/MasterStatusInfo.cpp b/src/Databases/MySQL/MasterStatusInfo.cpp index 3a1a6c3d7dd..687a028ebdc 100644 --- 
a/src/Databases/MySQL/MasterStatusInfo.cpp +++ b/src/Databases/MySQL/MasterStatusInfo.cpp @@ -4,15 +4,13 @@ #include #include #include +#include +#include +#include +#include namespace DB { -/*MasterStatusInfo::MasterStatusInfo( - String binlog_file_, UInt64 binlog_position_, String binlog_do_db_, String binlog_ignore_db_, String executed_gtid_set_) - : binlog_file(binlog_file_), binlog_position(binlog_position_), binlog_do_db(binlog_do_db_), binlog_ignore_db(binlog_ignore_db_), - executed_gtid_set(executed_gtid_set_) -{ -}*/ static std::vector fetchTablesInDB(const mysqlxx::PoolWithFailover::Entry & connection, const std::string & database) { @@ -31,42 +29,15 @@ static std::vector fetchTablesInDB(const mysqlxx::PoolWithFailover::Entr return tables_in_db; } - -MasterStatusInfo::MasterStatusInfo(mysqlxx::PoolWithFailover::Entry & connection, const String & database) -{ - bool locked_tables = false; - - try - { - connection->query("FLUSH TABLES;").execute(); - connection->query("FLUSH TABLES WITH READ LOCK;").execute(); - - locked_tables = true; - fetchMasterStatus(connection); - connection->query("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ;").execute(); - connection->query("START TRANSACTION /*!40100 WITH CONSISTENT SNAPSHOT */;").execute(); - - need_dumping_tables = fetchTablesInDB(connection, database); - connection->query("UNLOCK TABLES;").execute(); - } - catch (...) 
- { - if (locked_tables) - connection->query("UNLOCK TABLES;").execute(); - - throw; - } -} void MasterStatusInfo::fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & connection) { - Block header - { - {std::make_shared(), "File"}, - {std::make_shared(), "Position"}, - {std::make_shared(), "Binlog_Do_DB"}, - {std::make_shared(), "Binlog_Ignore_DB"}, - {std::make_shared(), "Executed_Gtid_Set"}, - }; + Block header{ + {std::make_shared(), "File"}, + {std::make_shared(), "Position"}, + {std::make_shared(), "Binlog_Do_DB"}, + {std::make_shared(), "Binlog_Ignore_DB"}, + {std::make_shared(), "Executed_Gtid_Set"}, + }; MySQLBlockInputStream input(connection, "SHOW MASTER STATUS;", header, DEFAULT_BLOCK_SIZE); Block master_status = input.read(); @@ -81,4 +52,99 @@ void MasterStatusInfo::fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & conn executed_gtid_set = (*master_status.getByPosition(4).column)[0].safeGet(); } +bool MasterStatusInfo::checkBinlogFileExists(mysqlxx::PoolWithFailover::Entry & connection) +{ + Block header{ + {std::make_shared(), "Log_name"}, + {std::make_shared(), "File_size"}, + {std::make_shared(), "Encrypted"} + }; + + MySQLBlockInputStream input(connection, "SHOW MASTER LOGS", header, DEFAULT_BLOCK_SIZE); + + while (Block block = input.read()) + { + for (size_t index = 0; index < block.rows(); ++index) + { + const auto & log_name = (*block.getByPosition(0).column)[index].safeGet(); + if (log_name == binlog_file) + return true; + } + } + return false; +} +void MasterStatusInfo::finishDump() +{ + WriteBufferFromFile out(persistent_path); + out << "Version:\t1\n" + << "Binlog File:\t" << binlog_file << "\nBinlog Position:\t" << binlog_position << "\nBinlog Do DB:\t" << binlog_do_db + << "\nBinlog Ignore DB:\t" << binlog_ignore_db << "\nExecuted GTID SET:\t" << executed_gtid_set; + + out.next(); + out.sync(); +} + +void MasterStatusInfo::transaction(const MySQLReplication::Position & position, const std::function & fun) +{ + binlog_file = 
position.binlog_name; + binlog_position = position.binlog_pos; + + { + Poco::File temp_file(persistent_path + ".temp"); + if (temp_file.exists()) + temp_file.remove(); + } + + WriteBufferFromFile out(persistent_path + ".temp"); + out << "Version:\t1\n" + << "Binlog File:\t" << binlog_file << "\nBinlog Position:\t" << binlog_position << "\nBinlog Do DB:\t" << binlog_do_db + << "\nBinlog Ignore DB:\t" << binlog_ignore_db << "\nExecuted GTID SET:\t" << executed_gtid_set; + out.next(); + out.sync(); + + fun(); + Poco::File(persistent_path + ".temp").renameTo(persistent_path); +} + +MasterStatusInfo::MasterStatusInfo(mysqlxx::PoolWithFailover::Entry & connection, const String & path_, const String & database) + : persistent_path(path_) +{ + if (Poco::File(persistent_path).exists()) + { + ReadBufferFromFile in(persistent_path); + in >> "Version:\t1\n" >> "Binlog File:\t" >> binlog_file >> "\nBinlog Position:\t" >> binlog_position >> "\nBinlog Do DB:\t" + >> binlog_do_db >> "\nBinlog Ignore DB:\t" >> binlog_ignore_db >> "\nExecuted GTID SET:\t" >> executed_gtid_set; + + if (checkBinlogFileExists(connection)) + { + /// The persisted binlog file is still listed by the master: keep the saved position, need_dumping_tables stays empty. + return; + } + } + + bool locked_tables = false; + + try + { + connection->query("FLUSH TABLES;").execute(); + connection->query("FLUSH TABLES WITH READ LOCK;").execute(); + + locked_tables = true; + fetchMasterStatus(connection); + connection->query("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ;").execute(); + connection->query("START TRANSACTION /*!40100 WITH CONSISTENT SNAPSHOT */;").execute(); + + need_dumping_tables = fetchTablesInDB(connection, database); + connection->query("UNLOCK TABLES;").execute(); + /// TODO: fetch the CREATE TABLE statements, parse them and build the table structure (column list, primary key, unique indexes, partition key) + } + catch (...) 
+ { + if (locked_tables) + connection->query("UNLOCK TABLES;").execute(); + + throw; + } +} + } diff --git a/src/Databases/MySQL/MasterStatusInfo.h b/src/Databases/MySQL/MasterStatusInfo.h index ebc2a8c8d12..d18caf4b5fe 100644 --- a/src/Databases/MySQL/MasterStatusInfo.h +++ b/src/Databases/MySQL/MasterStatusInfo.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include @@ -9,6 +10,8 @@ namespace DB struct MasterStatusInfo { + const String persistent_path; + String binlog_file; UInt64 binlog_position; String binlog_do_db; @@ -17,10 +20,16 @@ struct MasterStatusInfo std::vector need_dumping_tables; - MasterStatusInfo(mysqlxx::PoolWithFailover::Entry & connection, const String & database); + void finishDump(); void fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & connection); + bool checkBinlogFileExists(mysqlxx::PoolWithFailover::Entry & connection); + + void transaction(const MySQLReplication::Position & position, const std::function & fun); + + MasterStatusInfo(mysqlxx::PoolWithFailover::Entry & connection, const String & path, const String & database); + }; diff --git a/src/Databases/MySQL/queryConvert.cpp b/src/Databases/MySQL/queryConvert.cpp new file mode 100644 index 00000000000..decbf8f3cb6 --- /dev/null +++ b/src/Databases/MySQL/queryConvert.cpp @@ -0,0 +1,152 @@ +#include + +#include +#include + +namespace DB +{ + +ASTPtr getFormattedOrderByExpression(const MySQLTableStruct & table_struct) +{ + if (table_struct.primary_keys.empty()) + return makeASTFunction("tuple"); + + /// TODO: support unique key & key + const auto function = std::make_shared(); + function->name = "tuple"; + function->arguments = std::make_shared(); + function->children.push_back(function->arguments); + function->arguments->children = table_struct.primary_keys; + return function; +} + +template +Field choiceBetterRangeSize(TType min, TType max, size_t max_ranges, size_t min_size_pre_range) +{ + UInt64 interval = UInt64(max) - min; + size_t calc_rows_pre_range = 
std::ceil(interval / double(max_ranges)); + size_t rows_pre_range = std::max(min_size_pre_range, calc_rows_pre_range); + + if (rows_pre_range >= interval) + return Null(); + + return rows_pre_range > std::numeric_limits::max() ? Field(UInt64(rows_pre_range)) : Field(TType(rows_pre_range)); +} + +ASTPtr getFormattedPartitionByExpression(const MySQLTableStruct & table_struct, const Context & context, size_t max_ranges, size_t min_rows_pre_range) +{ + ASTPtr partition_columns = std::make_shared(); + + if (!table_struct.partition_keys.empty()) + partition_columns->children = table_struct.partition_keys; + else if (!table_struct.primary_keys.empty()) + { + ASTPtr expr_list = std::make_shared(); + expr_list->children = table_struct.primary_keys; + + auto syntax = SyntaxAnalyzer(context).analyze(expr_list, table_struct.columns_name_and_type); + auto index_expr = ExpressionAnalyzer(expr_list, syntax, context).getActions(false); + const NamesAndTypesList & required_names_and_types = index_expr->getRequiredColumnsWithTypes(); + + const auto & addPartitionColumn = [&](const String & column_name, const DataTypePtr & type, Field better_pre_range_size) + { + partition_columns->children.emplace_back(std::make_shared(column_name)); + + if (type->isNullable()) + partition_columns->children.back() = makeASTFunction("assumeNotNull", partition_columns->children.back()); + + if (!better_pre_range_size.isNull()) + partition_columns->children.back() + = makeASTFunction("divide", partition_columns->children.back(), std::make_shared(better_pre_range_size)); + }; + + for (const auto & required_name_and_type : required_names_and_types) + { + DataTypePtr assume_not_null = required_name_and_type.type; + if (assume_not_null->isNullable()) + assume_not_null = (static_cast(*assume_not_null)).getNestedType(); + + WhichDataType which(assume_not_null); + if (which.isInt8()) + addPartitionColumn(required_name_and_type.name, required_name_and_type.type, choiceBetterRangeSize( + 
std::numeric_limits::min(), std::numeric_limits::max(), max_ranges, min_rows_pre_range)); + else if (which.isInt16()) + addPartitionColumn(required_name_and_type.name, required_name_and_type.type, choiceBetterRangeSize( + std::numeric_limits::min(), std::numeric_limits::max(), max_ranges, min_rows_pre_range)); + else if (which.isInt32()) + addPartitionColumn(required_name_and_type.name, required_name_and_type.type, choiceBetterRangeSize( + std::numeric_limits::min(), std::numeric_limits::max(), max_ranges, min_rows_pre_range)); + else if (which.isInt64()) + addPartitionColumn(required_name_and_type.name, required_name_and_type.type, choiceBetterRangeSize( + std::numeric_limits::min(), std::numeric_limits::max(), max_ranges, min_rows_pre_range)); + else if (which.isUInt8()) + addPartitionColumn(required_name_and_type.name, required_name_and_type.type, choiceBetterRangeSize( + std::numeric_limits::min(), std::numeric_limits::max(), max_ranges, min_rows_pre_range)); + else if (which.isUInt16()) + addPartitionColumn(required_name_and_type.name, required_name_and_type.type, choiceBetterRangeSize( + std::numeric_limits::min(), std::numeric_limits::max(), max_ranges, min_rows_pre_range)); + else if (which.isUInt32()) + addPartitionColumn(required_name_and_type.name, required_name_and_type.type, choiceBetterRangeSize( + std::numeric_limits::min(), std::numeric_limits::max(), max_ranges, min_rows_pre_range)); + else if (which.isUInt64()) + addPartitionColumn(required_name_and_type.name, required_name_and_type.type, choiceBetterRangeSize( + std::numeric_limits::min(), std::numeric_limits::max(), max_ranges, min_rows_pre_range)); + else if (which.isDateOrDateTime()) + { + partition_columns->children.emplace_back(std::make_shared(required_name_and_type.name)); + + if (required_name_and_type.type->isNullable()) + partition_columns->children.back() = makeASTFunction("assumeNotNull", partition_columns->children.back()); + + partition_columns->children.back() = 
makeASTFunction("toYYYYMM", partition_columns->children.back()); + } + } + } + + const auto function = std::make_shared(); + function->name = "tuple"; + function->arguments = partition_columns; + function->children.push_back(function->arguments); + return function; +} + +String getUniqueColumnName(NamesAndTypesList columns_name_and_type, const String & prefix) +{ + const auto & is_unique = [&](const String & column_name) + { + for (const auto & column_name_and_type : columns_name_and_type) + { + if (column_name_and_type.name == column_name) + return false; + } + + return true; + }; + + if (is_unique(prefix)) + return prefix; + + for (size_t index = 0; ; ++index) + { + const String & cur_name = prefix + "_" + toString(index); + if (is_unique(cur_name)) + return cur_name; + } +} + +String toCreateQuery(const MySQLTableStruct & table_struct, const Context & context) +{ + /// TODO: settings + String create_version = getUniqueColumnName(table_struct.columns_name_and_type, "_create_version"); + String delete_version = getUniqueColumnName(table_struct.columns_name_and_type, "_delete_version"); + WriteBufferFromOwnString out; + out << "CREATE TABLE " << (table_struct.if_not_exists ? "IF NOT EXISTS" : "") + << backQuoteIfNeed(table_struct.database_name) + "." 
<< backQuoteIfNeed(table_struct.table_name) << "(" + << queryToString(InterpreterCreateQuery::formatColumns(table_struct.columns_name_and_type)) + << ", " << create_version << " UInt64, " << delete_version << " UInt64" << ") ENGINE = MergeTree()" + << " PARTITION BY " << queryToString(getFormattedPartitionByExpression(table_struct, context, 1000, 50000)) + << " ORDER BY " << queryToString(getFormattedOrderByExpression(table_struct)); + return out.str(); +} + +} diff --git a/src/Databases/MySQL/queryConvert.h b/src/Databases/MySQL/queryConvert.h new file mode 100644 index 00000000000..eb5ae5b2648 --- /dev/null +++ b/src/Databases/MySQL/queryConvert.h @@ -0,0 +1,19 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +String toCreateQuery(const MySQLTableStruct & table_struct, const Context & context); + +} diff --git a/src/Interpreters/MySQL/CreateQueryConvertVisitor.cpp b/src/Interpreters/MySQL/CreateQueryVisitor.cpp similarity index 55% rename from src/Interpreters/MySQL/CreateQueryConvertVisitor.cpp rename to src/Interpreters/MySQL/CreateQueryVisitor.cpp index c40cbaf846a..c41b51d54dc 100644 --- a/src/Interpreters/MySQL/CreateQueryConvertVisitor.cpp +++ b/src/Interpreters/MySQL/CreateQueryVisitor.cpp @@ -1,13 +1,13 @@ -#include -#include #include -#include -#include -#include +#include #include +#include +#include #include #include +#include #include +#include #include #include @@ -94,14 +94,9 @@ void CreateQueryMatcher::visit(const MySQLParser::ASTCreateQuery & create, const if (create.partition_options) visit(*create.partition_options->as(), create.partition_options, data); - auto expression_list = std::make_shared(); - expression_list->children = data.primary_keys; - - data.out << "CREATE TABLE " << (create.if_not_exists ? "IF NOT EXISTS" : "") - << (create.database.empty() ? 
"" : backQuoteIfNeed(create.database) + ".") << backQuoteIfNeed(create.table) - << "(" << queryToString(InterpreterCreateQuery::formatColumns(data.columns_name_and_type)) << ") ENGINE = MergeTree()" - " PARTITION BY " << queryToString(data.getFormattedPartitionByExpression()) - << " ORDER BY " << queryToString(data.getFormattedOrderByExpression()); + data.table_name = create.table; + data.database_name = create.database; + data.if_not_exists = create.if_not_exists; } void CreateQueryMatcher::visit(const MySQLParser::ASTDeclareIndex & declare_index, const ASTPtr &, Data & data) @@ -201,111 +196,39 @@ void CreateQueryMatcher::Data::addPartitionKey(const ASTPtr & partition_key) partition_keys.emplace_back(partition_key); } -ASTPtr CreateQueryMatcher::Data::getFormattedOrderByExpression() +} + +bool MySQLTableStruct::operator==(const MySQLTableStruct & other) const { - if (primary_keys.empty()) - return makeASTFunction("tuple"); + const auto & this_expression = std::make_shared(); + this_expression->children.insert(this_expression->children.begin(), primary_keys.begin(), primary_keys.end()); + this_expression->children.insert(this_expression->children.begin(), partition_keys.begin(), partition_keys.end()); - /// TODO: support unique key & key - const auto function = std::make_shared(); - function->name = "tuple"; - function->arguments = std::make_shared(); - function->children.push_back(function->arguments); - function->arguments->children = primary_keys; + const auto & other_expression = std::make_shared(); + other_expression->children.insert(other_expression->children.begin(), other.primary_keys.begin(), other.primary_keys.end()); + other_expression->children.insert(other_expression->children.begin(), other.partition_keys.begin(), other.partition_keys.end()); - return function; + return queryToString(this_expression) == queryToString(other_expression) && columns_name_and_type == other.columns_name_and_type; } -template -Field choiceBetterRangeSize(TType min, TType 
max, size_t max_ranges, size_t min_size_pre_range) +MySQLTableStruct visitCreateQuery(ASTPtr & create_query, const Context & context, const std::string & new_database) { - UInt64 interval = UInt64(max) - min; - size_t calc_rows_pre_range = std::ceil(interval / double(max_ranges)); - size_t rows_pre_range = std::max(min_size_pre_range, calc_rows_pre_range); - - if (rows_pre_range >= interval) - return Null(); - - return rows_pre_range > std::numeric_limits::max() ? Field(UInt64(rows_pre_range)) : Field(TType(rows_pre_range)); + create_query->as()->database = new_database; + MySQLVisitor::CreateQueryVisitor::Data table_struct(context); + MySQLVisitor::CreateQueryVisitor visitor(table_struct); + visitor.visit(create_query); + return std::move(table_struct); } -ASTPtr CreateQueryMatcher::Data::getFormattedPartitionByExpression() +MySQLTableStruct visitCreateQuery(const String & create_query, const Context & context, const std::string & new_database) { - ASTPtr partition_columns = std::make_shared(); + MySQLParser::ParserCreateQuery p_create_query; + ASTPtr ast_create_query = parseQuery(p_create_query, create_query.data(), create_query.data() + create_query.size(), "", 0, 0); - if (!partition_keys.empty()) - partition_columns->children = partition_keys; - else if (!primary_keys.empty()) - { - ASTPtr expr_list = std::make_shared(); - expr_list->children = primary_keys; - - auto syntax = SyntaxAnalyzer(context).analyze(expr_list, columns_name_and_type); - auto index_expr = ExpressionAnalyzer(expr_list, syntax, context).getActions(false); - const NamesAndTypesList & required_names_and_types = index_expr->getRequiredColumnsWithTypes(); - - const auto & addPartitionColumn = [&](const String & column_name, const DataTypePtr & type, Field better_pre_range_size) - { - partition_columns->children.emplace_back(std::make_shared(column_name)); - - if (type->isNullable()) - partition_columns->children.back() = makeASTFunction("assumeNotNull", partition_columns->children.back()); - - 
if (!better_pre_range_size.isNull()) - partition_columns->children.back() - = makeASTFunction("divide", partition_columns->children.back(), std::make_shared(better_pre_range_size)); - }; - - for (const auto & required_name_and_type : required_names_and_types) - { - DataTypePtr assume_not_null = required_name_and_type.type; - if (assume_not_null->isNullable()) - assume_not_null = (static_cast(*assume_not_null)).getNestedType(); - - WhichDataType which(assume_not_null); - if (which.isInt8()) - addPartitionColumn(required_name_and_type.name, required_name_and_type.type, choiceBetterRangeSize( - std::numeric_limits::min(), std::numeric_limits::max(), max_ranges, min_rows_pre_range)); - else if (which.isInt16()) - addPartitionColumn(required_name_and_type.name, required_name_and_type.type, choiceBetterRangeSize( - std::numeric_limits::min(), std::numeric_limits::max(), max_ranges, min_rows_pre_range)); - else if (which.isInt32()) - addPartitionColumn(required_name_and_type.name, required_name_and_type.type, choiceBetterRangeSize( - std::numeric_limits::min(), std::numeric_limits::max(), max_ranges, min_rows_pre_range)); - else if (which.isInt64()) - addPartitionColumn(required_name_and_type.name, required_name_and_type.type, choiceBetterRangeSize( - std::numeric_limits::min(), std::numeric_limits::max(), max_ranges, min_rows_pre_range)); - else if (which.isUInt8()) - addPartitionColumn(required_name_and_type.name, required_name_and_type.type, choiceBetterRangeSize( - std::numeric_limits::min(), std::numeric_limits::max(), max_ranges, min_rows_pre_range)); - else if (which.isUInt16()) - addPartitionColumn(required_name_and_type.name, required_name_and_type.type, choiceBetterRangeSize( - std::numeric_limits::min(), std::numeric_limits::max(), max_ranges, min_rows_pre_range)); - else if (which.isUInt32()) - addPartitionColumn(required_name_and_type.name, required_name_and_type.type, choiceBetterRangeSize( - std::numeric_limits::min(), std::numeric_limits::max(), 
max_ranges, min_rows_pre_range)); - else if (which.isUInt64()) - addPartitionColumn(required_name_and_type.name, required_name_and_type.type, choiceBetterRangeSize( - std::numeric_limits::min(), std::numeric_limits::max(), max_ranges, min_rows_pre_range)); - else if (which.isDateOrDateTime()) - { - partition_columns->children.emplace_back(std::make_shared(required_name_and_type.name)); - - if (required_name_and_type.type->isNullable()) - partition_columns->children.back() = makeASTFunction("assumeNotNull", partition_columns->children.back()); - - partition_columns->children.back() = makeASTFunction("toYYYYMM", partition_columns->children.back()); - } - } - } - - const auto function = std::make_shared(); - function->name = "tuple"; - function->arguments = partition_columns; - function->children.push_back(function->arguments); - return function; -} + if (!ast_create_query || !ast_create_query->as()) + throw Exception("LOGICAL ERROR: ast cannot cast to MySQLParser::ASTCreateQuery.", ErrorCodes::LOGICAL_ERROR); + return visitCreateQuery(ast_create_query, context, new_database); } } diff --git a/src/Interpreters/MySQL/CreateQueryConvertVisitor.h b/src/Interpreters/MySQL/CreateQueryVisitor.h similarity index 60% rename from src/Interpreters/MySQL/CreateQueryConvertVisitor.h rename to src/Interpreters/MySQL/CreateQueryVisitor.h index 10fcb0e7b7e..2a57ab817c1 100644 --- a/src/Interpreters/MySQL/CreateQueryConvertVisitor.h +++ b/src/Interpreters/MySQL/CreateQueryVisitor.h @@ -2,15 +2,34 @@ #include #include -#include #include -#include +#include #include +#include #include +#include namespace DB { +struct MySQLTableStruct +{ + bool if_not_exists; + String table_name; + String database_name; + ASTs primary_keys; + ASTs partition_keys; + NamesAndTypesList columns_name_and_type; + + MySQLTableStruct() {} + + MySQLTableStruct(const ASTs & primary_keys_, const ASTs & partition_keys_, const NamesAndTypesList & columns_name_and_type_) + : primary_keys(primary_keys_), 
partition_keys(partition_keys_), columns_name_and_type(columns_name_and_type_) + {} + + bool operator==(const MySQLTableStruct & other) const; +}; + namespace MySQLVisitor { @@ -20,25 +39,17 @@ class CreateQueryMatcher public: using Visitor = InDepthNodeVisitor; - struct Data + struct Data : public MySQLTableStruct { - /// SETTINGS - WriteBuffer & out; const Context & context; size_t max_ranges; size_t min_rows_pre_range; - ASTs primary_keys; - ASTs partition_keys; - NamesAndTypesList columns_name_and_type; + Data(const Context & context_) : MySQLTableStruct(), context(context_) {} void addPrimaryKey(const ASTPtr & primary_key); void addPartitionKey(const ASTPtr & partition_key); - - ASTPtr getFormattedOrderByExpression(); - - ASTPtr getFormattedPartitionByExpression(); }; static void visit(ASTPtr & ast, Data & data); @@ -56,10 +67,12 @@ private: static void visit(const MySQLParser::ASTDeclarePartitionOptions & declare_partition_options, const ASTPtr &, Data & data); }; -using CreateQueryConvertVisitor = CreateQueryMatcher::Visitor; +using CreateQueryVisitor = CreateQueryMatcher::Visitor; } +MySQLTableStruct visitCreateQuery(ASTPtr & create_query, const Context & context, const std::string & new_database); +MySQLTableStruct visitCreateQuery(const String & create_query, const Context & context, const std::string & new_database); } diff --git a/src/Interpreters/MySQL/tests/gtest_create_query_convert_visitor.cpp b/src/Interpreters/MySQL/tests/gtest_create_query_convert_visitor.cpp index beed50b1450..d44c513fea6 100644 --- a/src/Interpreters/MySQL/tests/gtest_create_query_convert_visitor.cpp +++ b/src/Interpreters/MySQL/tests/gtest_create_query_convert_visitor.cpp @@ -1,150 +1,187 @@ -#include -#include +#include #include -#include #include -#include +#include +#include +#include +#include +#include using namespace DB; using namespace MySQLParser; using namespace MySQLVisitor; +static DataTypePtr getType(const String & data_type) +{ + return 
DataTypeFactory::instance().get(data_type); +} + static ContextShared * contextShared() { static SharedContextHolder shared = Context::createShared(); return shared.get(); } -static String convert(const String & input) +static MySQLTableStruct visitQuery(const String & query) { ParserCreateQuery p_create_query; - ASTPtr ast = parseQuery(p_create_query, input.data(), input.data() + input.size(), "", 0, 0); + ASTPtr ast = parseQuery(p_create_query, query.data(), query.data() + query.size(), "", 0, 0); - WriteBufferFromOwnString out; - CreateQueryConvertVisitor::Data data{ - .out = out, .context = Context::createGlobal(contextShared()), .max_ranges = 1000, .min_rows_pre_range = 1000000}; - CreateQueryConvertVisitor visitor(data); + CreateQueryVisitor::Data data(Context::createGlobal(contextShared())); + data.max_ranges = 1000; + data.min_rows_pre_range = 1000000; + CreateQueryVisitor visitor(data); visitor.visit(ast); - return out.str(); + return std::move(data); } -TEST(CreateQueryConvert, TestConvertNumberColumnsType) +TEST(CreateQueryVisitor, TestWithNumberColumnsType) { EXPECT_EQ( - convert("CREATE TABLE test(a tinyint, b SMALLINT, c MEDIUMINT, d INT, e INTEGER, f BIGINT, g DECIMAL, h DEC, i NUMERIC, j FIXED, k " - "FLOAT, l DOUBLE, m DOUBLE PRECISION, n REAL)"), - "CREATE TABLE test(`a` Nullable(Int8), `b` Nullable(Int16), `c` Nullable(Int32), `d` Nullable(Int32), `e` Nullable(Int32), `f` " - "Nullable(Int64), `g` Nullable(Decimal(10, 0)), `h` Nullable(Decimal(10, 0)), `i` Nullable(Decimal(10, 0)), `j` " - "Nullable(Decimal(10, 0)), `k` Nullable(Float32), `l` Nullable(Float64), `m` Nullable(Float64), `n` Nullable(Float64)) ENGINE = " - "MergeTree() PARTITION BY tuple() ORDER BY tuple()"); + visitQuery("CREATE TABLE test(a tinyint, b SMALLINT, c MEDIUMINT, d INT, e INTEGER, f BIGINT, g DECIMAL, h DEC, i NUMERIC, j " + "FIXED, k FLOAT, l DOUBLE, m DOUBLE PRECISION, n REAL)"), + MySQLTableStruct(ASTs{}, ASTs{}, {{"a", getType("Nullable(Int8)")}, {"b", 
getType("Nullable(Int16)")} + , {"c", getType("Nullable(Int32)")}, {"d", getType("Nullable(Int32)")}, {"e", getType("Nullable(Int32)")} + , {"f", getType("Nullable(Int64)")}, {"g", getType("Nullable(Decimal(10, 0))")}, {"h", getType("Nullable(Decimal(10, 0))")} + , {"i", getType("Nullable(Decimal(10, 0))")}, {"j", getType("Nullable(Decimal(10, 0))")} + , {"k", getType("Nullable(Float32)")}, {"l", getType("Nullable(Float64)")}, {"m", getType("Nullable(Float64)")} + , {"n", getType("Nullable(Float64)")}} + ) + ); EXPECT_EQ( - convert("CREATE TABLE test(a tinyint(1), b SMALLINT(1), c MEDIUMINT(1), d INT(1), e INTEGER(1), f BIGINT(1), g DECIMAL(1), h " - "DEC(2, 1), i NUMERIC(4, 3), j FIXED(6, 5), k FLOAT(1), l DOUBLE(1, 2), m DOUBLE PRECISION(3, 4), n REAL(5, 6))"), - "CREATE TABLE test(`a` Nullable(Int8), `b` Nullable(Int16), `c` Nullable(Int32), `d` Nullable(Int32), `e` Nullable(Int32), `f` " - "Nullable(Int64), `g` Nullable(Decimal(1, 0)), `h` Nullable(Decimal(2, 1)), `i` Nullable(Decimal(4, 3)), `j` Nullable(Decimal(6, " - "5)), `k` Nullable(Float32), `l` Nullable(Float64), `m` Nullable(Float64), `n` Nullable(Float64)) ENGINE = MergeTree() PARTITION " - "BY tuple() ORDER BY tuple()"); + visitQuery("CREATE TABLE test(a tinyint(1), b SMALLINT(1), c MEDIUMINT(1), d INT(1), e INTEGER(1), f BIGINT(1), g DECIMAL(1), h " + "DEC(2, 1), i NUMERIC(4, 3), j FIXED(6, 5), k FLOAT(1), l DOUBLE(1, 2), m DOUBLE PRECISION(3, 4), n REAL(5, 6))"), + MySQLTableStruct(ASTs{}, ASTs{}, {{"a", getType("Nullable(Int8)")}, {"b", getType("Nullable(Int16)")} + , {"c", getType("Nullable(Int32)")}, {"d", getType("Nullable(Int32)")}, {"e", getType("Nullable(Int32)")} + , {"f", getType("Nullable(Int64)")}, {"g", getType("Nullable(Decimal(1, 0))")}, {"h", getType("Nullable(Decimal(2, 1))")} + , {"i", getType("Nullable(Decimal(4, 3))")}, {"j", getType("Nullable(Decimal(6, 5))")} + , {"k", getType("Nullable(Float32)")}, {"l", getType("Nullable(Float64)")}, {"m", getType("Nullable(Float64)")} + , 
{"n", getType("Nullable(Float64)")}} + ) + ); /// UNSIGNED EXPECT_EQ( - convert("CREATE TABLE test(a tinyint UNSIGNED, b SMALLINT(1) UNSIGNED, c MEDIUMINT(1) UNSIGNED, d INT(1) UNSIGNED, e INTEGER(1), f " - "BIGINT(1) UNSIGNED, g DECIMAL(1) UNSIGNED, h DEC(2, 1) UNSIGNED, i NUMERIC(4, 3) UNSIGNED, j FIXED(6, 5) UNSIGNED, k FLOAT(1) " - "UNSIGNED, l DOUBLE(1, 2) UNSIGNED, m DOUBLE PRECISION(3, 4) UNSIGNED, n REAL(5, 6) UNSIGNED)"), - "CREATE TABLE test(`a` Nullable(UInt8), `b` Nullable(UInt16), `c` Nullable(UInt32), `d` Nullable(UInt32), `e` Nullable(Int32), `f` " - "Nullable(UInt64), `g` Nullable(Decimal(1, 0)), `h` Nullable(Decimal(2, 1)), `i` Nullable(Decimal(4, 3)), `j` Nullable(Decimal(6, " - "5)), `k` Nullable(Float32), `l` Nullable(Float64), `m` Nullable(Float64), `n` Nullable(Float64)) ENGINE = MergeTree() PARTITION " - "BY tuple() ORDER BY tuple()"); + visitQuery("CREATE TABLE test(a tinyint UNSIGNED, b SMALLINT(1) UNSIGNED, c MEDIUMINT(1) UNSIGNED, d INT(1) UNSIGNED, e INTEGER(1), f " + "BIGINT(1) UNSIGNED, g DECIMAL(1) UNSIGNED, h DEC(2, 1) UNSIGNED, i NUMERIC(4, 3) UNSIGNED, j FIXED(6, 5) UNSIGNED, k FLOAT(1) " + "UNSIGNED, l DOUBLE(1, 2) UNSIGNED, m DOUBLE PRECISION(3, 4) UNSIGNED, n REAL(5, 6) UNSIGNED)"), + MySQLTableStruct(ASTs{}, ASTs{}, {{"a", getType("Nullable(UInt8)")}, {"b", getType("Nullable(UInt16)")} + , {"c", getType("Nullable(UInt32)")}, {"d", getType("Nullable(UInt32)")}, {"e", getType("Nullable(Int32)")} + , {"f", getType("Nullable(UInt64)")}, {"g", getType("Nullable(Decimal(1, 0))")}, {"h", getType("Nullable(Decimal(2, 1))")} + , {"i", getType("Nullable(Decimal(4, 3))")}, {"j", getType("Nullable(Decimal(6, 5))")} + , {"k", getType("Nullable(Float32)")}, {"l", getType("Nullable(Float64)")}, {"m", getType("Nullable(Float64)")} + , {"n", getType("Nullable(Float64)")}} + ) + ); /// NOT NULL EXPECT_EQ( - convert("CREATE TABLE test(a tinyint NOT NULL, b SMALLINT(1) NOT NULL, c MEDIUMINT(1) NOT NULL, d INT(1) NOT NULL, e INTEGER(1), f " - 
"BIGINT(1) NOT NULL, g DECIMAL(1) NOT NULL, h DEC(2, 1) NOT NULL, i NUMERIC(4, 3) NOT NULL, j FIXED(6, 5) NOT NULL, k FLOAT(1) NOT " - "NULL, l DOUBLE(1, 2) NOT NULL, m DOUBLE PRECISION(3, 4) NOT NULL, n REAL(5, 6) NOT NULL)"), - "CREATE TABLE test(`a` Int8, `b` Int16, `c` Int32, `d` Int32, `e` Nullable(Int32), `f` Int64, `g` Decimal(1, 0), `h` Decimal(2, " - "1), `i` Decimal(4, 3), `j` Decimal(6, 5), `k` Float32, `l` Float64, `m` Float64, `n` Float64) ENGINE = MergeTree() PARTITION BY " - "tuple() ORDER BY tuple()"); + visitQuery("CREATE TABLE test(a tinyint NOT NULL, b SMALLINT(1) NOT NULL, c MEDIUMINT(1) NOT NULL, d INT(1) NOT NULL, e INTEGER(1), f " + "BIGINT(1) NOT NULL, g DECIMAL(1) NOT NULL, h DEC(2, 1) NOT NULL, i NUMERIC(4, 3) NOT NULL, j FIXED(6, 5) NOT NULL, k FLOAT(1) NOT " + "NULL, l DOUBLE(1, 2) NOT NULL, m DOUBLE PRECISION(3, 4) NOT NULL, n REAL(5, 6) NOT NULL)"), + MySQLTableStruct(ASTs{}, ASTs{}, {{"a", getType("Int8")}, {"b", getType("Int16")} + , {"c", getType("Int32")}, {"d", getType("Int32")}, {"e", getType("Nullable(Int32)")} + , {"f", getType("Int64")}, {"g", getType("Decimal(1, 0)")}, {"h", getType("Decimal(2, 1)")} + , {"i", getType("Decimal(4, 3)")}, {"j", getType("Decimal(6, 5)")} + , {"k", getType("Float32")}, {"l", getType("Float64")}, {"m", getType("Float64")}, {"n", getType("Float64")}} + ) + ); /// ZEROFILL EXPECT_EQ( - convert("CREATE TABLE test(a tinyint ZEROFILL, b SMALLINT(1) ZEROFILL, c MEDIUMINT(1) ZEROFILL, d INT(1) ZEROFILL, e INTEGER(1), f " - "BIGINT(1) ZEROFILL, g DECIMAL(1) ZEROFILL, h DEC(2, 1) ZEROFILL, i NUMERIC(4, 3) ZEROFILL, j FIXED(6, 5) ZEROFILL, k FLOAT(1) " - "ZEROFILL, l DOUBLE(1, 2) ZEROFILL, m DOUBLE PRECISION(3, 4) ZEROFILL, n REAL(5, 6) ZEROFILL)"), - "CREATE TABLE test(`a` Nullable(UInt8), `b` Nullable(UInt16), `c` Nullable(UInt32), `d` Nullable(UInt32), `e` Nullable(Int32), `f` " - "Nullable(UInt64), `g` Nullable(Decimal(1, 0)), `h` Nullable(Decimal(2, 1)), `i` Nullable(Decimal(4, 3)), `j` 
Nullable(Decimal(6, " - "5)), `k` Nullable(Float32), `l` Nullable(Float64), `m` Nullable(Float64), `n` Nullable(Float64)) ENGINE = MergeTree() PARTITION " - "BY tuple() ORDER BY tuple()"); + visitQuery("CREATE TABLE test(a tinyint ZEROFILL, b SMALLINT(1) ZEROFILL, c MEDIUMINT(1) ZEROFILL, d INT(1) ZEROFILL, e INTEGER(1), f " + "BIGINT(1) ZEROFILL, g DECIMAL(1) ZEROFILL, h DEC(2, 1) ZEROFILL, i NUMERIC(4, 3) ZEROFILL, j FIXED(6, 5) ZEROFILL, k FLOAT(1) " + "ZEROFILL, l DOUBLE(1, 2) ZEROFILL, m DOUBLE PRECISION(3, 4) ZEROFILL, n REAL(5, 6) ZEROFILL)"), + MySQLTableStruct(ASTs{}, ASTs{}, {{"a", getType("Nullable(UInt8)")}, {"b", getType("Nullable(UInt16)")} + , {"c", getType("Nullable(UInt32)")}, {"d", getType("Nullable(UInt32)")}, {"e", getType("Nullable(Int32)")} + , {"f", getType("Nullable(UInt64)")}, {"g", getType("Nullable(Decimal(1, 0))")}, {"h", getType("Nullable(Decimal(2, 1))")} + , {"i", getType("Nullable(Decimal(4, 3))")}, {"j", getType("Nullable(Decimal(6, 5))")} + , {"k", getType("Nullable(Float32)")}, {"l", getType("Nullable(Float64)")}, {"m", getType("Nullable(Float64)")} + , {"n", getType("Nullable(Float64)")}} + ) + ); } -TEST(CreateQueryConvert, TestConvertDateTimesColumnsType) +TEST(CreateQueryVisitor, TestWithDateTimesColumnsType) { EXPECT_EQ( - convert("CREATE TABLE test(a DATE, b DATETIME, c TIMESTAMP, d TIME, e year)"), - "CREATE TABLE test(`a` Nullable(Date), `b` Nullable(DateTime), `c` Nullable(DateTime), `d` Nullable(DateTime64(3)), `e` " - "Nullable(Int16)) ENGINE = MergeTree() PARTITION BY tuple() ORDER BY tuple()"); + visitQuery("CREATE TABLE test(a DATE, b DATETIME, c TIMESTAMP, d TIME, e year)"), + MySQLTableStruct(ASTs{}, ASTs{}, {{"a", getType("Nullable(Date)")}, {"b", getType("Nullable(DateTime)")} + , {"c", getType("Nullable(DateTime)")}, {"d", getType("Nullable(DateTime64(3))")}, {"e", getType("Nullable(Int16)")} } + ) + ); EXPECT_EQ( - convert("CREATE TABLE test(a DATE, b DATETIME(1), c TIMESTAMP(1), d TIME(1), e year(4))"), - 
"CREATE TABLE test(`a` Nullable(Date), `b` Nullable(DateTime), `c` Nullable(DateTime), `d` Nullable(DateTime64(3)), `e` " - "Nullable(Int16)) ENGINE = MergeTree() PARTITION BY tuple() ORDER BY tuple()"); + visitQuery("CREATE TABLE test(a DATE, b DATETIME(1), c TIMESTAMP(1), d TIME(1), e year(4))"), + MySQLTableStruct(ASTs{}, ASTs{}, {{"a", getType("Nullable(Date)")}, {"b", getType("Nullable(DateTime)")} + , {"c", getType("Nullable(DateTime)")}, {"d", getType("Nullable(DateTime64(3))")}, {"e", getType("Nullable(Int16)")} } + ) + ); EXPECT_EQ( - convert( - "CREATE TABLE test(a DATE NOT NULL, b DATETIME(1) NOT NULL, c TIMESTAMP(1) NOT NULL, d TIME(1) NOT NULL, e year(4) NOT NULL)"), - "CREATE TABLE test(`a` Date, `b` DateTime, `c` DateTime, `d` DateTime64(3), `e` Int16) ENGINE = MergeTree() PARTITION BY tuple() " - "ORDER BY tuple()"); + visitQuery("CREATE TABLE test(a DATE NOT NULL, b DATETIME(1) NOT NULL, c TIMESTAMP(1) NOT NULL, d TIME(1) NOT NULL, e year(4) NOT NULL)"), + MySQLTableStruct(ASTs{}, ASTs{}, {{"a", getType("Date")}, {"b", getType("DateTime")} , {"c", getType("DateTime")}, {"d", getType("DateTime64")}, {"e", getType("Int16")} } + ) + ); } -TEST(CreateQueryConvert, TestConvertParitionOptions) +TEST(CreateQueryVisitor, TestWithParitionOptions) { EXPECT_EQ( - convert("CREATE TABLE test(a DATE NOT NULL) PARTITION BY HASH a"), - "CREATE TABLE test(`a` Date) ENGINE = MergeTree() PARTITION BY tuple(a) ORDER BY tuple()"); + visitQuery("CREATE TABLE test(a DATE NOT NULL) PARTITION BY HASH a"), + MySQLTableStruct(ASTs{}, ASTs{std::make_shared("a")}, {{"a", getType("Date")}})); EXPECT_EQ( - convert("CREATE TABLE test(a DATE NOT NULL) PARTITION BY LINEAR HASH a"), - "CREATE TABLE test(`a` Date) ENGINE = MergeTree() PARTITION BY tuple(a) ORDER BY tuple()"); + visitQuery("CREATE TABLE test(a DATE NOT NULL) PARTITION BY LINEAR HASH a"), + MySQLTableStruct(ASTs{}, ASTs{std::make_shared("a")}, {{"a", getType("Date")}})); EXPECT_EQ( - convert("CREATE TABLE test(a DATE 
NOT NULL) PARTITION BY RANGE(a)"), - "CREATE TABLE test(`a` Date) ENGINE = MergeTree() PARTITION BY tuple(a) ORDER BY tuple()"); + visitQuery("CREATE TABLE test(a DATE NOT NULL) PARTITION BY RANGE(a)"), + MySQLTableStruct(ASTs{}, ASTs{std::make_shared("a")}, {{"a", getType("Date")}})); EXPECT_EQ( - convert("CREATE TABLE test(a DATE NOT NULL, b INT) PARTITION BY RANGE COLUMNS(a, b)"), - "CREATE TABLE test(`a` Date, `b` Nullable(Int32)) ENGINE = MergeTree() PARTITION BY (a, b) ORDER BY tuple()"); + visitQuery("CREATE TABLE test(a DATE NOT NULL, b INT) PARTITION BY RANGE COLUMNS(a, b)"), + MySQLTableStruct(ASTs{}, ASTs{std::make_shared("a"), std::make_shared("b")}, {{"a", getType("Date")}, {"b", getType("Nullable(Int32)")}})); EXPECT_EQ( - convert("CREATE TABLE test(a DATE NOT NULL) PARTITION BY LIST(a)"), - "CREATE TABLE test(`a` Date) ENGINE = MergeTree() PARTITION BY tuple(a) ORDER BY tuple()"); + visitQuery("CREATE TABLE test(a DATE NOT NULL) PARTITION BY LIST(a)"), + MySQLTableStruct(ASTs{}, ASTs{std::make_shared("a")}, {{"a", getType("Date")}})); EXPECT_EQ( - convert("CREATE TABLE test(a DATE NOT NULL, b INT) PARTITION BY LIST COLUMNS(a, b)"), - "CREATE TABLE test(`a` Date, `b` Nullable(Int32)) ENGINE = MergeTree() PARTITION BY (a, b) ORDER BY tuple()"); + visitQuery("CREATE TABLE test(a DATE NOT NULL, b INT) PARTITION BY LIST COLUMNS(a, b)"), + MySQLTableStruct(ASTs{}, ASTs{std::make_shared("a"), std::make_shared("b")}, + {{"a", getType("Date")}, {"b", getType("Nullable(Int32)")}})); } -TEST(CreateQueryConvert, TestConvertPrimaryToPartitionBy) +TEST(CreateQueryVisitor, TestWithPrimaryToPartitionBy) { - EXPECT_EQ(convert("CREATE TABLE test(a DATE NOT NULL PRIMARY KEY)"), - "CREATE TABLE test(`a` Date) ENGINE = MergeTree() PARTITION BY tuple(toYYYYMM(a)) ORDER BY tuple(a)"); - - EXPECT_EQ(convert("CREATE TABLE test(a DATETIME NOT NULL PRIMARY KEY)"), - "CREATE TABLE test(`a` DateTime) ENGINE = MergeTree() PARTITION BY tuple(toYYYYMM(a)) ORDER BY tuple(a)"); - - 
EXPECT_EQ(convert("CREATE TABLE test(a TINYINT NOT NULL PRIMARY KEY)"), - "CREATE TABLE test(`a` Int8) ENGINE = MergeTree() PARTITION BY tuple(a) ORDER BY tuple(a)"); - - EXPECT_EQ(convert("CREATE TABLE test(a SMALLINT NOT NULL PRIMARY KEY)"), - "CREATE TABLE test(`a` Int16) ENGINE = MergeTree() PARTITION BY tuple(a) ORDER BY tuple(a)"); - - EXPECT_EQ(convert("CREATE TABLE test(a INT NOT NULL PRIMARY KEY)"), - "CREATE TABLE test(`a` Int32) ENGINE = MergeTree() PARTITION BY tuple(a / 4294968) ORDER BY tuple(a)"); - - EXPECT_EQ(convert("CREATE TABLE test(a BIGINT NOT NULL PRIMARY KEY)"), - "CREATE TABLE test(`a` Int64) ENGINE = MergeTree() PARTITION BY tuple(a / 18446744073709552) ORDER BY tuple(a)"); + EXPECT_EQ( + visitQuery("CREATE TABLE test(a DATE NOT NULL PRIMARY KEY)"), + MySQLTableStruct(ASTs{std::make_shared("a")}, ASTs{}, {{"a", getType("Date")}})); EXPECT_EQ( - convert("CREATE TABLE test(a BIGINT PRIMARY KEY)"), - "CREATE TABLE test(`a` Nullable(Int64)) ENGINE = MergeTree() PARTITION BY tuple(assumeNotNull(a) / 18446744073709552) ORDER BY " - "tuple(a)"); + visitQuery("CREATE TABLE test(a DATETIME NOT NULL PRIMARY KEY)"), + MySQLTableStruct(ASTs{std::make_shared("a")}, ASTs{}, {{"a", getType("DateTime")}})); + + EXPECT_EQ( + visitQuery("CREATE TABLE test(a TINYINT NOT NULL PRIMARY KEY)"), + MySQLTableStruct(ASTs{std::make_shared("a")}, ASTs{}, {{"a", getType("Int8")}})); + + EXPECT_EQ( + visitQuery("CREATE TABLE test(a SMALLINT NOT NULL PRIMARY KEY)"), + MySQLTableStruct(ASTs{std::make_shared("a")}, ASTs{}, {{"a", getType("Int16")}})); + + EXPECT_EQ( + visitQuery("CREATE TABLE test(a INT NOT NULL PRIMARY KEY)"), + MySQLTableStruct(ASTs{std::make_shared("a")}, ASTs{}, {{"a", getType("Int32")}})); + + EXPECT_EQ( + visitQuery("CREATE TABLE test(a BIGINT NOT NULL PRIMARY KEY)"), + MySQLTableStruct(ASTs{std::make_shared("a")}, ASTs{}, {{"a", getType("Int64")}})); + + EXPECT_EQ( + visitQuery("CREATE TABLE test(a BIGINT PRIMARY KEY)"), + 
MySQLTableStruct(ASTs{std::make_shared("a")}, ASTs{}, {{"a", getType("Nullable(Int64)")}})); }