#include "config.h"

#if USE_MYSQL
// NOTE(review): the targets of the bare #include directives below are not visible
// in this view of the file; they are kept as-is and must be restored before building.
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "registerTableFunctions.h"

#include // for fetchTablesColumnsList
#include


namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int UNKNOWN_TABLE;
}

/// Parses the arguments of the `mysql` table function.
///
/// An optional settings clause (an ASTSetQuery child) is loaded into the MySQL settings
/// and removed from the argument list; the remaining arguments are turned into the
/// storage configuration, and the connection pool is created from it.
///
/// Throws LOGICAL_ERROR if the function AST carries no argument list.
void TableFunctionMySQL::parseArguments(const ASTPtr & ast_function, ContextPtr context)
{
    const auto & args_func = ast_function->as<ASTFunction &>();

    if (!args_func.arguments)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Table function 'mysql' must have arguments.");

    auto & args = args_func.arguments->children;

    /// Timeouts are initialized from the query-level settings before the optional
    /// settings clause is loaded.
    MySQLSettings mysql_settings;
    const auto & settings = context->getSettingsRef();
    mysql_settings.connect_timeout = settings.external_storage_connect_timeout_sec;
    mysql_settings.read_write_timeout = settings.external_storage_rw_timeout_sec;

    for (auto * it = args.begin(); it != args.end(); ++it)
    {
        const ASTSetQuery * settings_ast = (*it)->as<ASTSetQuery>();
        if (settings_ast)
        {
            mysql_settings.loadFromQuery(*settings_ast);
            /// Only the first settings clause is consumed; `it` is not used after the
            /// erase because the loop is left immediately.
            args.erase(it);
            break;
        }
    }

    configuration = StorageMySQL::getConfiguration(args, context, mysql_settings);
    pool.emplace(createMySQLPoolWithFailover(*configuration, mysql_settings));
}

/// Fetches the column list of the configured table from the MySQL server.
///
/// Throws UNKNOWN_TABLE if the fetched result has no entry for the configured table.
ColumnsDescription TableFunctionMySQL::getActualTableStructure(ContextPtr context) const
{
    const auto & settings = context->getSettingsRef();
    const auto tables_and_columns = fetchTablesColumnsList(
        *pool, configuration->database, {configuration->table}, settings, settings.mysql_datatypes_support_level);

    const auto columns = tables_and_columns.find(configuration->table);
    if (columns == tables_and_columns.end())
    {
        /// Always report the table name; prefix it with the database only when one is set.
        /// (Previously an empty database produced a message with no name at all.)
        const String qualified_name
            = (configuration->database.empty() ? String{} : backQuote(configuration->database) + ".")
            + backQuote(configuration->table);

        throw Exception(ErrorCodes::UNKNOWN_TABLE, "MySQL table {} doesn't exist.", qualified_name);
    }

    return columns->second;
}

/// Creates and starts the StorageMySQL instance backing this table function call.
///
/// The connection pool created in parseArguments is moved into the storage and the
/// member is reset afterwards, so the pool is consumed by this call.
StoragePtr TableFunctionMySQL::executeImpl(
    const ASTPtr & /*ast_function*/,
    ContextPtr context,
    const std::string & table_name,
    ColumnsDescription /*cached_columns*/) const
{
    auto columns = getActualTableStructure(context);

    /// NOTE(review): default-constructed MySQLSettings are passed to the storage here;
    /// the settings parsed in parseArguments are local to that function and were only
    /// used for the configuration and the pool — confirm this is intended.
    auto res = std::make_shared<StorageMySQL>(
        StorageID(getDatabaseName(), table_name),
        std::move(*pool),
        configuration->database,
        configuration->table,
        configuration->replace_query,
        configuration->on_duplicate_clause,
        columns,
        ConstraintsDescription{},
        String{},
        context,
        MySQLSettings{});

    pool.reset();

    res->startup();
    return res;
}

/// Registers TableFunctionMySQL in the given table function factory.
void registerTableFunctionMySQL(TableFunctionFactory & factory)
{
    factory.registerFunction<TableFunctionMySQL>();
}

}

#endif