diff --git a/dbms/src/Common/convertMySQLDataType.cpp b/dbms/src/Common/convertMySQLDataType.cpp new file mode 100644 index 00000000000..0db9b8ab2ef --- /dev/null +++ b/dbms/src/Common/convertMySQLDataType.cpp @@ -0,0 +1,65 @@ +#include + +namespace DB +{ + +ASTPtr dataTypeConvertToQuery(const DataTypePtr & data_type) +{ + WhichDataType which(data_type); + + if (!which.isNullable()) + return std::make_shared(data_type->getName()); + + return makeASTFunction("Nullable", dataTypeConvertToQuery(typeid_cast(data_type.get())->getNestedType())); +} + +DataTypePtr convertMySQLDataType(const String & mysql_data_type, bool is_nullable, bool is_unsigned, size_t length) +{ + DataTypePtr res; + if (mysql_data_type == "tinyint") + { + if (is_unsigned) + res = std::make_shared(); + else + res = std::make_shared(); + } + else if (mysql_data_type == "smallint") + { + if (is_unsigned) + res = std::make_shared(); + else + res = std::make_shared(); + } + else if (mysql_data_type == "int" || mysql_data_type == "mediumint") + { + if (is_unsigned) + res = std::make_shared(); + else + res = std::make_shared(); + } + else if (mysql_data_type == "bigint") + { + if (is_unsigned) + res = std::make_shared(); + else + res = std::make_shared(); + } + else if (mysql_data_type == "float") + res = std::make_shared(); + else if (mysql_data_type == "double") + res = std::make_shared(); + else if (mysql_data_type == "date") + res = std::make_shared(); + else if (mysql_data_type == "datetime" || mysql_data_type == "timestamp") + res = std::make_shared(); + else if (mysql_data_type == "binary") + res = std::make_shared(length); + else + /// Also String is fallback for all unknown types. + res = std::make_shared(); + if (is_nullable) + res = std::make_shared(res); + return res; +} + +} diff --git a/dbms/src/Common/convertMySQLDataType.h b/dbms/src/Common/convertMySQLDataType.h new file mode 100644 index 00000000000..8cff8880353 --- /dev/null +++ b/dbms/src/Common/convertMySQLDataType.h @@ -0,0 +1,26 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +/// Convert data type to query. for example +/// DataTypeUInt8 -> ASTIdentifier(UInt8) +/// DataTypeNullable(DataTypeUInt8) -> ASTFunction(ASTIdentifier(UInt8)) +ASTPtr dataTypeConvertToQuery(const DataTypePtr & data_type); + +/// Convert MySQL type to ClickHouse data type. +DataTypePtr convertMySQLDataType(const String & mysql_data_type, bool is_nullable, bool is_unsigned, size_t length); + +} diff --git a/dbms/src/Databases/DatabaseFactory.cpp b/dbms/src/Databases/DatabaseFactory.cpp index 0b5f8c0643f..4c998e772b9 100644 --- a/dbms/src/Databases/DatabaseFactory.cpp +++ b/dbms/src/Databases/DatabaseFactory.cpp @@ -2,22 +2,44 @@ #include #include #include +#include +#include +#include +#include +#include + +#if USE_MYSQL + +#include + +#endif + namespace DB { namespace ErrorCodes { - extern const int UNKNOWN_DATABASE_ENGINE; +extern const int BAD_ARGUMENTS; +extern const int UNKNOWN_DATABASE_ENGINE; } - DatabasePtr DatabaseFactory::get( - const String & engine_name, const String & database_name, const String & metadata_path, + const ASTStorage * engine_define, Context & context) { + String engine_name = engine_define->engine->name; + + if (engine_name != "MySQL" && engine_define->engine->arguments) + throw Exception("Database engine " + engine_name + " cannot have arguments", ErrorCodes::BAD_ARGUMENTS); + + if (engine_define->engine->parameters || engine_define->partition_by || engine_define->primary_key || engine_define->order_by || + engine_define->sample_by || engine_define->settings) + throw Exception("Database engine " + engine_name + " cannot have parameters, primary_key, order_by, sample_by, settings", + ErrorCodes::UNKNOWN_ELEMENT_IN_AST); + if (engine_name == "Ordinary") return std::make_shared(database_name, metadata_path, context); else if (engine_name == "Memory") @@ -25,6 +47,29 @@ DatabasePtr DatabaseFactory::get( else if (engine_name == "Dictionary") return std::make_shared(database_name); +#if USE_MYSQL + + else if (engine_name == "MySQL") + { + const ASTFunction * engine = engine_define->engine; + const auto & arguments = engine->arguments->children; + + if (arguments.size() != 4) + throw Exception("MySQL Database require mysql_hostname, mysql_database_name, mysql_username, mysql_password arguments.", + ErrorCodes::BAD_ARGUMENTS); + + const auto & mysql_host_name = arguments[0]->as()->value.safeGet(); + const auto & mysql_database_name = arguments[1]->as()->value.safeGet(); + const auto & mysql_user_name = arguments[2]->as()->value.safeGet(); + const auto & mysql_user_password = arguments[3]->as()->value.safeGet(); + + auto parsed_host_port = parseAddress(mysql_host_name, 3306); + return std::make_shared(context, database_name, parsed_host_port.first, parsed_host_port.second, mysql_database_name, + mysql_user_name, mysql_user_password); + } + +#endif + throw Exception("Unknown database engine: " + engine_name, ErrorCodes::UNKNOWN_DATABASE_ENGINE); } diff --git a/dbms/src/Databases/DatabaseFactory.h b/dbms/src/Databases/DatabaseFactory.h index bb912ca377b..0fab3e2307a 100644 --- a/dbms/src/Databases/DatabaseFactory.h +++ b/dbms/src/Databases/DatabaseFactory.h @@ -3,17 +3,18 @@ #include #include - namespace DB { +class ASTStorage; + class DatabaseFactory { public: static DatabasePtr get( - const String & engine_name, const String & database_name, const String & metadata_path, + const ASTStorage * engine_define, Context & context); }; diff --git a/dbms/src/Databases/DatabaseMySQL.cpp b/dbms/src/Databases/DatabaseMySQL.cpp new file mode 100644 index 00000000000..f5b2e2aec19 --- /dev/null +++ b/dbms/src/Databases/DatabaseMySQL.cpp @@ -0,0 +1,397 @@ +#include + +#if USE_MYSQL + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int UNKNOWN_TABLE; +} + +static constexpr const std::chrono::seconds cleaner_sleep_time{30}; + +String toQueryStringWithQuote(const std::vector & quote_list) +{ + WriteBufferFromOwnString quote_list_query; + quote_list_query << "("; + + for (size_t index = 0; index < quote_list.size(); ++index) + { + if (index) + quote_list_query << ","; + + quote_list_query << quote << quote_list[index]; + } + + quote_list_query << ")"; + return quote_list_query.str(); +} + +DatabaseMySQL::DatabaseMySQL( + const Context & context_, const String & database_name_, const String & mysql_host_name_, const UInt16 & mysql_port_, + const String & mysql_database_name_, const String & mysql_user_name_, const String & mysql_user_password_) + : global_context(context_), database_name(database_name_), mysql_host_name(mysql_host_name_), mysql_port(mysql_port_), + mysql_database_name(mysql_database_name_), mysql_user_name(mysql_user_name_), mysql_user_password(mysql_user_password_), + mysql_pool(mysql_database_name, mysql_host_name, mysql_user_name, mysql_user_password, mysql_port) +{ +} + +bool DatabaseMySQL::empty(const Context &) const +{ + std::lock_guard lock(mutex); + + fetchTablesIntoLocalCache(); + + return local_tables_cache.empty(); +} + +DatabaseIteratorPtr DatabaseMySQL::getIterator(const Context &) +{ + Tables tables; + std::lock_guard lock(mutex); + + fetchTablesIntoLocalCache(); + + for (const auto & local_table : local_tables_cache) + tables[local_table.first] = local_table.second.storage; + + return std::make_unique(tables); +} + +bool DatabaseMySQL::isTableExist(const Context & context, const String & name) const +{ + return bool(tryGetTable(context, name)); +} + +StoragePtr DatabaseMySQL::tryGetTable(const Context &, const String & mysql_table_name) const +{ + std::lock_guard lock(mutex); + + fetchTablesIntoLocalCache(); + + if (local_tables_cache.find(mysql_table_name) != local_tables_cache.end()) + return local_tables_cache[mysql_table_name].storage; + + return StoragePtr{}; +} + +ASTPtr DatabaseMySQL::tryGetCreateTableQuery(const Context &, const String & table_name) const +{ + std::lock_guard lock(mutex); + + fetchTablesIntoLocalCache(); + + if (local_tables_cache.find(table_name) == local_tables_cache.end()) + throw Exception("MySQL table " + mysql_database_name + "." + table_name + " doesn't exist..", ErrorCodes::UNKNOWN_TABLE); + + return local_tables_cache[table_name].create_table_query; +} + +time_t DatabaseMySQL::getTableMetadataModificationTime(const Context &, const String & table_name) +{ + std::lock_guard lock(mutex); + + fetchTablesIntoLocalCache(); + + if (local_tables_cache.find(table_name) == local_tables_cache.end()) + throw Exception("MySQL table " + mysql_database_name + "." + table_name + " doesn't exist..", ErrorCodes::UNKNOWN_TABLE); + + return time_t(local_tables_cache[table_name].modification_time); +} + +ASTPtr DatabaseMySQL::getCreateDatabaseQuery(const Context &) const +{ + const auto & create_query = std::make_shared(); + create_query->database = database_name; + + const auto & storage = std::make_shared(); + storage->set(storage->engine, makeASTFunction("MySQL", + std::make_shared(mysql_host_name + ":" + toString(mysql_port)), std::make_shared(mysql_database_name), + std::make_shared(mysql_user_name), std::make_shared(mysql_user_password))); + + create_query->set(create_query->storage, storage); + return create_query; +} + +void DatabaseMySQL::fetchTablesIntoLocalCache() const +{ + const auto & tables_with_modification_time = fetchTablesWithModificationTime(); + + destroyLocalCacheExtraTables(tables_with_modification_time); + fetchLatestTablesStructureIntoCache(tables_with_modification_time); +} + +void DatabaseMySQL::destroyLocalCacheExtraTables(const std::map & tables_with_modification_time) const +{ + for (auto iterator = local_tables_cache.begin(); iterator != local_tables_cache.end();) + { + if (tables_with_modification_time.find(iterator->first) != tables_with_modification_time.end()) + ++iterator; + else + { + outdated_tables.emplace_back(iterator->second.storage); + iterator = local_tables_cache.erase(iterator); + } + } +} + +void DatabaseMySQL::fetchLatestTablesStructureIntoCache(const std::map &tables_modification_time) const +{ + std::vector wait_update_tables_name; + for (const auto & table_modification_time : tables_modification_time) + { + const auto & it = local_tables_cache.find(table_modification_time.first); + + /// Outdated or new table structures + if (it == local_tables_cache.end() || table_modification_time.second > it->second.modification_time) + wait_update_tables_name.emplace_back(table_modification_time.first); + } + + std::map tables_and_columns = fetchTablesColumnsList(wait_update_tables_name); + + for (const auto & table_and_columns : tables_and_columns) + { + const auto & table_name = table_and_columns.first; + const auto & columns_name_and_type = table_and_columns.second; + const auto & table_modification_time = tables_modification_time.at(table_name); + + const auto & iterator = local_tables_cache.find(table_name); + if (iterator != local_tables_cache.end()) + { + outdated_tables.emplace_back(iterator->second.storage); + local_tables_cache.erase(iterator); + } + + local_tables_cache[table_name] = createStorageInfo(table_name, columns_name_and_type, table_modification_time); + } +} + +static ASTPtr getTableColumnsCreateQuery(const NamesAndTypesList & names_and_types_list) +{ + const auto & table_columns_list_ast = std::make_shared(); + const auto & columns_expression_list = std::make_shared(); + + for (const auto & table_column_name_and_type : names_and_types_list) + { + const auto & column_declaration = std::make_shared(); + column_declaration->name = table_column_name_and_type.name; + column_declaration->type = dataTypeConvertToQuery(table_column_name_and_type.type); + columns_expression_list->children.emplace_back(column_declaration); + } + + table_columns_list_ast->set(table_columns_list_ast->columns, columns_expression_list); + return table_columns_list_ast; +} + +static ASTPtr getTableStorageCreateQuery( + const String & host_name, const UInt16 & port, + const String & database_name, const String & table_name, + const String & user_name, const String & password) +{ + const auto & table_storage = std::make_shared(); + const auto & storage_engine = std::make_shared(); + + storage_engine->name = "MySQL"; + storage_engine->arguments = std::make_shared(); + storage_engine->children.push_back(storage_engine->arguments); + + storage_engine->arguments->children = { + std::make_shared(host_name + ":" + toString(port)), + std::make_shared(database_name), std::make_shared(table_name), + std::make_shared(user_name), std::make_shared(password) + }; + + + table_storage->set(table_storage->engine, storage_engine); + return table_storage; +} + +DatabaseMySQL::MySQLStorageInfo DatabaseMySQL::createStorageInfo( + const String & table_name, const NamesAndTypesList & columns_name_and_type, const UInt64 & table_modification_time) const +{ + const auto & mysql_table = StorageMySQL::create( + table_name, std::move(mysql_pool), mysql_database_name, table_name, + false, "", ColumnsDescription{columns_name_and_type}, global_context); + + const auto & create_table_query = std::make_shared(); + + create_table_query->table = table_name; + create_table_query->database = database_name; + create_table_query->set(create_table_query->columns_list, getTableColumnsCreateQuery(columns_name_and_type)); + create_table_query->set(create_table_query->storage, getTableStorageCreateQuery( + mysql_host_name, mysql_port, mysql_database_name, table_name, mysql_user_name, mysql_user_password)); + + MySQLStorageInfo storage_info; + storage_info.storage = mysql_table; + storage_info.create_table_query = create_table_query; + storage_info.modification_time = table_modification_time; + + return storage_info; +} + +std::map DatabaseMySQL::fetchTablesWithModificationTime() const +{ + Block tables_status_sample_block + { + { std::make_shared(), "table_name" }, + { std::make_shared(), "modification_time" }, + }; + + WriteBufferFromOwnString query; + query << "SELECT" + " TABLE_NAME AS table_name, " + " CREATE_TIME AS modification_time " + " FROM INFORMATION_SCHEMA.TABLES " + " WHERE TABLE_SCHEMA = " << quote << mysql_database_name; + + std::map tables_with_modification_time; + MySQLBlockInputStream result(mysql_pool.Get(), query.str(), tables_status_sample_block, DEFAULT_BLOCK_SIZE); + + while (Block block = result.read()) + { + size_t rows = block.rows(); + for (size_t index = 0; index < rows; ++index) + { + String table_name = (*block.getByPosition(0).column)[index].safeGet(); + tables_with_modification_time[table_name] = (*block.getByPosition(1).column)[index].safeGet(); + } + } + + return tables_with_modification_time; +} + +std::map DatabaseMySQL::fetchTablesColumnsList(const std::vector & tables_name) const +{ + std::map tables_and_columns; + + if (tables_name.empty()) + return tables_and_columns; + + Block tables_columns_sample_block + { + { std::make_shared(), "table_name" }, + { std::make_shared(), "column_name" }, + { std::make_shared(), "column_type" }, + { std::make_shared(), "is_nullable" }, + { std::make_shared(), "is_unsigned" }, + { std::make_shared(), "length" }, + }; + + WriteBufferFromOwnString query; + query << "SELECT " + " TABLE_NAME AS table_name," + " COLUMN_NAME AS column_name," + " DATA_TYPE AS column_type," + " IS_NULLABLE = 'YES' AS is_nullable," + " COLUMN_TYPE LIKE '%unsigned' AS is_unsigned," + " CHARACTER_MAXIMUM_LENGTH AS length" + " FROM INFORMATION_SCHEMA.COLUMNS" + " WHERE TABLE_SCHEMA = " << quote << mysql_database_name + << " AND TABLE_NAME IN " << toQueryStringWithQuote(tables_name) << " ORDER BY ORDINAL_POSITION"; + + const auto & external_table_functions_use_nulls = global_context.getSettings().external_table_functions_use_nulls; + MySQLBlockInputStream result(mysql_pool.Get(), query.str(), tables_columns_sample_block, DEFAULT_BLOCK_SIZE); + while (Block block = result.read()) + { + size_t rows = block.rows(); + for (size_t i = 0; i < rows; ++i) + { + String table_name = (*block.getByPosition(0).column)[i].safeGet(); + tables_and_columns[table_name].emplace_back((*block.getByPosition(1).column)[i].safeGet(), + convertMySQLDataType( + (*block.getByPosition(2).column)[i].safeGet(), + (*block.getByPosition(3).column)[i].safeGet() && + external_table_functions_use_nulls, + (*block.getByPosition(4).column)[i].safeGet(), + (*block.getByPosition(5).column)[i].safeGet())); + } + } + return tables_and_columns; +} + +void DatabaseMySQL::shutdown() +{ + std::map tables_snapshot; + { + std::lock_guard lock(mutex); + tables_snapshot = local_tables_cache; + } + + for (const auto & table_snapshot : tables_snapshot) + table_snapshot.second.storage->shutdown(); + + std::lock_guard lock(mutex); + local_tables_cache.clear(); +} + +void DatabaseMySQL::cleanOutdatedTables() +{ + setThreadName("MySQLDBCleaner"); + + std::unique_lock lock{mutex}; + + while (!quit.load(std::memory_order_relaxed)) + { + for (auto iterator = outdated_tables.begin(); iterator != outdated_tables.end();) + { + if (!iterator->unique()) + ++iterator; + else + { + const auto table_lock = (*iterator)->lockAlterIntention(RWLockImpl::NO_QUERY); + + (*iterator)->shutdown(); + (*iterator)->is_dropped = true; + iterator = outdated_tables.erase(iterator); + } + } + + cond.wait_for(lock, cleaner_sleep_time); + } +} + +DatabaseMySQL::~DatabaseMySQL() +{ + try + { + if (!quit) + { + { + quit = true; + std::lock_guard lock{mutex}; + } + cond.notify_one(); + thread.join(); + } + + shutdown(); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } +} + +} + +#endif diff --git a/dbms/src/Databases/DatabaseMySQL.h b/dbms/src/Databases/DatabaseMySQL.h new file mode 100644 index 00000000000..3e89b395208 --- /dev/null +++ b/dbms/src/Databases/DatabaseMySQL.h @@ -0,0 +1,125 @@ +#pragma once + +#include + +#if USE_MYSQL + +#include +#include + +namespace DB +{ + +/** Real-time access to table list and table structure from remote MySQL + * It doesn't make any manipulations with filesystem. + * All tables are created by calling code after real-time pull-out structure from remote MySQL + */ +class DatabaseMySQL : public IDatabase +{ +public: + ~DatabaseMySQL() override; + + DatabaseMySQL(const Context & context_, const String & database_name_, const String & mysql_host_name_, const UInt16 & mysql_port_, + const String & mysql_database_name_, const String & mysql_user_name_, const String & mysql_user_password_); + + String getEngineName() const override { return "MySQL"; } + + String getDatabaseName() const override { return database_name; } + + bool empty(const Context & context) const override; + + DatabaseIteratorPtr getIterator(const Context & context) override; + + ASTPtr getCreateDatabaseQuery(const Context & context) const override; + + bool isTableExist(const Context & context, const String & name) const override; + + StoragePtr tryGetTable(const Context & context, const String & name) const override; + + ASTPtr tryGetCreateTableQuery(const Context & context, const String & name) const override; + + time_t getTableMetadataModificationTime(const Context & context, const String & name) override; + + void shutdown() override; + + StoragePtr detachTable(const String &) override + { + throw Exception("MySQL database engine does not support detach table.", ErrorCodes::NOT_IMPLEMENTED); + } + + void loadTables(Context &, ThreadPool *, bool) override + { + /// do nothing + } + + void removeTable(const Context &, const String &) override + { + throw Exception("MySQL database engine does not support remove table.", ErrorCodes::NOT_IMPLEMENTED); + } + + void attachTable(const String &, const StoragePtr &) override + { + throw Exception("MySQL database engine does not support attach table.", ErrorCodes::NOT_IMPLEMENTED); + } + + void renameTable(const Context &, const String &, IDatabase &, const String &) override + { + throw Exception("MySQL database engine does not support rename table.", ErrorCodes::NOT_IMPLEMENTED); + } + + void createTable(const Context &, const String &, const StoragePtr &, const ASTPtr &) override + { + throw Exception("MySQL database engine does not support create table.", ErrorCodes::NOT_IMPLEMENTED); + } + + void alterTable(const Context &, const String &, const ColumnsDescription &, const IndicesDescription &, const ASTModifier &) override + { + throw Exception("MySQL database engine does not support alter table.", ErrorCodes::NOT_IMPLEMENTED); + } + +private: + struct MySQLStorageInfo + { + StoragePtr storage; + UInt64 modification_time; + ASTPtr create_table_query; + }; + + const Context global_context; + const String database_name; + const String mysql_host_name; + const UInt16 mysql_port; + const String mysql_database_name; + const String mysql_user_name; + const String mysql_user_password; + + mutable std::mutex mutex; + std::atomic quit{false}; + std::condition_variable cond; + + mutable mysqlxx::Pool mysql_pool; + mutable std::vector outdated_tables; + mutable std::map local_tables_cache; + + + void cleanOutdatedTables(); + + void fetchTablesIntoLocalCache() const; + + std::map fetchTablesWithModificationTime() const; + + DatabaseMySQL::MySQLStorageInfo createStorageInfo( + const String & table_name, const NamesAndTypesList & columns_name_and_type, const UInt64 & table_modification_time) const; + + std::map fetchTablesColumnsList(const std::vector & tables_name) const; + + void destroyLocalCacheExtraTables(const std::map & tables_with_modification_time) const; + + void fetchLatestTablesStructureIntoCache(const std::map & tables_modification_time) const; + + ThreadFromGlobalPool thread{&DatabaseMySQL::cleanOutdatedTables, this}; +}; + +} + +#endif diff --git a/dbms/src/Interpreters/InterpreterCreateQuery.cpp b/dbms/src/Interpreters/InterpreterCreateQuery.cpp index b6cdf35774f..973023cd4b2 100644 --- a/dbms/src/Interpreters/InterpreterCreateQuery.cpp +++ b/dbms/src/Interpreters/InterpreterCreateQuery.cpp @@ -110,9 +110,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create) const ASTStorage & storage = *create.storage; const ASTFunction & engine = *storage.engine; /// Currently, there are no database engines, that support any arguments. - if (engine.arguments || engine.parameters || storage.partition_by || storage.primary_key - || storage.order_by || storage.sample_by || storage.settings || - (create.columns_list && create.columns_list->indices && !create.columns_list->indices->children.empty())) + if ((create.columns_list && create.columns_list->indices && !create.columns_list->indices->children.empty())) { std::stringstream ostr; formatAST(storage, ostr, false, false); @@ -129,7 +127,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create) String metadata_path = path + "metadata/" + database_name_escaped + "/"; Poco::File(metadata_path).createDirectory(); - DatabasePtr database = DatabaseFactory::get(database_engine_name, database_name, metadata_path, context); + DatabasePtr database = DatabaseFactory::get(database_name, metadata_path, create.storage, context); /// Will write file with database metadata, if needed. String metadata_file_tmp_path = path + "metadata/" + database_name_escaped + ".sql.tmp"; diff --git a/dbms/src/TableFunctions/TableFunctionMySQL.cpp b/dbms/src/TableFunctions/TableFunctionMySQL.cpp index e335af45bcb..3a737740c22 100644 --- a/dbms/src/TableFunctions/TableFunctionMySQL.cpp +++ b/dbms/src/TableFunctions/TableFunctionMySQL.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -32,56 +33,7 @@ namespace ErrorCodes { extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int BAD_ARGUMENTS; -} - - -DataTypePtr getDataType(const String & mysql_data_type, bool is_nullable, bool is_unsigned, size_t length) -{ - DataTypePtr res; - if (mysql_data_type == "tinyint") - { - if (is_unsigned) - res = std::make_shared(); - else - res = std::make_shared(); - } - else if (mysql_data_type == "smallint") - { - if (is_unsigned) - res = std::make_shared(); - else - res = std::make_shared(); - } - else if (mysql_data_type == "int" || mysql_data_type == "mediumint") - { - if (is_unsigned) - res = std::make_shared(); - else - res = std::make_shared(); - } - else if (mysql_data_type == "bigint") - { - if (is_unsigned) - res = std::make_shared(); - else - res = std::make_shared(); - } - else if (mysql_data_type == "float") - res = std::make_shared(); - else if (mysql_data_type == "double") - res = std::make_shared(); - else if (mysql_data_type == "date") - res = std::make_shared(); - else if (mysql_data_type == "datetime" || mysql_data_type == "timestamp") - res = std::make_shared(); - else if (mysql_data_type == "binary") - res = std::make_shared(length); - else - /// Also String is fallback for all unknown types. - res = std::make_shared(); - if (is_nullable) - res = std::make_shared(res); - return res; + extern const int UNKNOWN_TABLE; } @@ -147,16 +99,15 @@ StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & ast_function, const Co << " AND TABLE_NAME = " << quote << table_name << " ORDER BY ORDINAL_POSITION"; - MySQLBlockInputStream result(pool.Get(), query.str(), sample_block, DEFAULT_BLOCK_SIZE); - NamesAndTypesList columns; + MySQLBlockInputStream result(pool.Get(), query.str(), sample_block, DEFAULT_BLOCK_SIZE); while (Block block = result.read()) { size_t rows = block.rows(); for (size_t i = 0; i < rows; ++i) columns.emplace_back( (*block.getByPosition(0).column)[i].safeGet(), - getDataType( + convertMySQLDataType( (*block.getByPosition(1).column)[i].safeGet(), (*block.getByPosition(2).column)[i].safeGet() && context.getSettings().external_table_functions_use_nulls, (*block.getByPosition(3).column)[i].safeGet(), @@ -164,6 +115,9 @@ StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & ast_function, const Co } + if (columns.empty()) + throw Exception("MySQL table `" + database_name + "`.`" + table_name + "` doesn't exist.", ErrorCodes::UNKNOWN_TABLE); + auto res = StorageMySQL::create( table_name, std::move(pool), diff --git a/dbms/tests/integration/test_mysql_database_engine/__init__.py b/dbms/tests/integration/test_mysql_database_engine/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/integration/test_mysql_database_engine/configs/remote_servers.xml b/dbms/tests/integration/test_mysql_database_engine/configs/remote_servers.xml new file mode 100644 index 00000000000..de8e5865f12 --- /dev/null +++ b/dbms/tests/integration/test_mysql_database_engine/configs/remote_servers.xml @@ -0,0 +1,12 @@ + + + + + + node1 + 9000 + + + + + diff --git a/dbms/tests/integration/test_mysql_database_engine/test.py b/dbms/tests/integration/test_mysql_database_engine/test.py new file mode 100644 index 00000000000..d89e7646414 --- /dev/null +++ b/dbms/tests/integration/test_mysql_database_engine/test.py @@ -0,0 +1,113 @@ +from contextlib import contextmanager + +import time +import pytest + +## sudo -H pip install PyMySQL +import pymysql.cursors + +from helpers.cluster import ClickHouseCluster + +cluster = ClickHouseCluster(__file__) + +node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_mysql = True) +create_table_sql_template = """ + CREATE TABLE `clickhouse`.`{}` ( + `id` int(11) NOT NULL, + `name` varchar(50) NOT NULL, + `age` int NOT NULL default 0, + `money` int NOT NULL default 0, + PRIMARY KEY (`id`)) ENGINE=InnoDB; + """ + +drop_table_sql_template = "DROP TABLE `clickhouse`.`{}`" + +add_column_sql_template = "ALTER TABLE `clickhouse`.`{}` ADD COLUMN `pid` int(11)" +del_column_sql_template = "ALTER TABLE `clickhouse`.`{}` DROP COLUMN `pid`" + + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster.start() + + conn = get_mysql_conn() + ## create mysql db and table + create_mysql_db(conn, 'clickhouse') + node1.query("CREATE DATABASE clickhouse_mysql ENGINE = MySQL('mysql1:3306', 'clickhouse', 'root', 'clickhouse')") + yield cluster + + finally: + cluster.shutdown() + + +def test_sync_tables_list_between_clickhouse_and_mysql(started_cluster): + mysql_connection = get_mysql_conn() + assert node1.query('SHOW TABLES FROM clickhouse_mysql FORMAT TSV').rstrip() == '' + + create_mysql_table(mysql_connection, 'first_mysql_table') + assert node1.query("SHOW TABLES FROM clickhouse_mysql LIKE 'first_mysql_table' FORMAT TSV").rstrip() == 'first_mysql_table' + + create_mysql_table(mysql_connection, 'second_mysql_table') + assert node1.query("SHOW TABLES FROM clickhouse_mysql LIKE 'second_mysql_table' FORMAT TSV").rstrip() == 'second_mysql_table' + + drop_mysql_table(mysql_connection, 'second_mysql_table') + assert node1.query("SHOW TABLES FROM clickhouse_mysql LIKE 'second_mysql_table' FORMAT TSV").rstrip() == '' + + mysql_connection.close() + +def test_sync_tables_structure_between_clickhouse_and_mysql(started_cluster): + mysql_connection = get_mysql_conn() + + create_mysql_table(mysql_connection, 'test_sync_column') + + assert node1.query( + "SELECT name FROM system.columns WHERE table = 'test_sync_column' AND database = 'clickhouse_mysql' AND name = 'pid' ").rstrip() == '' + + time.sleep(3) + add_mysql_table_column(mysql_connection, "test_sync_column") + + assert node1.query( + "SELECT name FROM system.columns WHERE table = 'test_sync_column' AND database = 'clickhouse_mysql' AND name = 'pid' ").rstrip() == 'pid' + + time.sleep(3) + drop_mysql_table_column(mysql_connection, "test_sync_column") + assert node1.query( + "SELECT name FROM system.columns WHERE table = 'test_sync_column' AND database = 'clickhouse_mysql' AND name = 'pid' ").rstrip() == '' + + mysql_connection.close() + +def test_insert_select(started_cluster): + mysql_connection = get_mysql_conn() + create_mysql_table(mysql_connection, 'test_insert_select') + + assert node1.query("SELECT count() FROM `clickhouse_mysql`.{}".format('test_insert_select')).rstrip() == '0' + node1.query("INSERT INTO `clickhouse_mysql`.{}(id, name, money) select number, concat('name_', toString(number)), 3 from numbers(10000) ".format('test_insert_select')) + assert node1.query("SELECT count() FROM `clickhouse_mysql`.{}".format('test_insert_select')).rstrip() == '10000' + assert node1.query("SELECT sum(money) FROM `clickhouse_mysql`.{}".format('test_insert_select')).rstrip() == '30000' + mysql_connection.close() + +def get_mysql_conn(): + conn = pymysql.connect(user='root', password='clickhouse', host='127.0.0.1', port=3308) + return conn + +def create_mysql_db(conn, name): + with conn.cursor() as cursor: + cursor.execute( + "CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(name)) + +def create_mysql_table(conn, tableName): + with conn.cursor() as cursor: + cursor.execute(create_table_sql_template.format(tableName)) + +def drop_mysql_table(conn, tableName): + with conn.cursor() as cursor: + cursor.execute(drop_table_sql_template.format(tableName)) + +def add_mysql_table_column(conn, tableName): + with conn.cursor() as cursor: + cursor.execute(add_column_sql_template.format(tableName)) + +def drop_mysql_table_column(conn, tableName): + with conn.cursor() as cursor: + cursor.execute(del_column_sql_template.format(tableName))