#include #if USE_MYSQL #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int BAD_ARGUMENTS;; } DataTypePtr getDataType(const String & mysql_data_type, bool is_unsigned, size_t length) { if (mysql_data_type == "tinyint") { if (is_unsigned) return std::make_shared(); else return std::make_shared(); } if (mysql_data_type == "smallint") { if (is_unsigned) return std::make_shared(); else return std::make_shared(); } if (mysql_data_type == "int" || mysql_data_type == "mediumint") { if (is_unsigned) return std::make_shared(); else return std::make_shared(); } if (mysql_data_type == "bigint") { if (is_unsigned) return std::make_shared(); else return std::make_shared(); } if (mysql_data_type == "float") return std::make_shared(); if (mysql_data_type == "double") return std::make_shared(); if (mysql_data_type == "date") return std::make_shared(); if (mysql_data_type == "datetime" || mysql_data_type == "timestamp") return std::make_shared(); if (mysql_data_type == "binary") return std::make_shared(length); /// Also String is fallback for all unknown types. return std::make_shared(); } StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & ast_function, const Context & context) const { const ASTFunction & args_func = typeid_cast(*ast_function); if (!args_func.arguments) throw Exception("Table function 'mysql' must have arguments.", ErrorCodes::LOGICAL_ERROR); ASTs & args = typeid_cast(*args_func.arguments).children; if (args.size() < 5 || args.size() > 7) throw Exception("Table function 'mysql' requires 5-7 parameters: MySQL('host:port', database, table, 'user', 'password'[, replace_query, 'on_duplicate_clause']).", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); for (size_t i = 0; i < args.size(); ++i) args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(args[i], context); std::string host_port = static_cast(*args[0]).value.safeGet(); std::string database_name = static_cast(*args[1]).value.safeGet(); std::string table_name = static_cast(*args[2]).value.safeGet(); std::string user_name = static_cast(*args[3]).value.safeGet(); std::string password = static_cast(*args[4]).value.safeGet(); bool replace_query = false; std::string on_duplicate_clause; if (args.size() >= 6) replace_query = static_cast(*args[5]).value.safeGet() > 0; if (args.size() == 7) on_duplicate_clause = static_cast(*args[6]).value.safeGet(); if (replace_query && !on_duplicate_clause.empty()) throw Exception( "Only one of 'replace_query' and 'on_duplicate_clause' can be specified, or none of them", ErrorCodes::BAD_ARGUMENTS); /// 3306 is the default MySQL port number auto parsed_host_port = parseAddress(host_port, 3306); mysqlxx::Pool pool(database_name, parsed_host_port.first, user_name, password, parsed_host_port.second); /// Determine table definition by running a query to INFORMATION_SCHEMA. Block sample_block { { std::make_shared(), "name" }, { std::make_shared(), "type" }, { std::make_shared(), "is_nullable" }, { std::make_shared(), "is_unsigned" }, { std::make_shared(), "length" }, }; WriteBufferFromOwnString query; query << "SELECT" " COLUMN_NAME AS name," " DATA_TYPE AS type," " IS_NULLABLE = 'YES' AS is_nullable," " COLUMN_TYPE LIKE '%unsigned' AS is_unsigned," " CHARACTER_MAXIMUM_LENGTH AS length" " FROM INFORMATION_SCHEMA.COLUMNS" " WHERE TABLE_SCHEMA = " << quote << database_name << " AND TABLE_NAME = " << quote << table_name << " ORDER BY ORDINAL_POSITION"; MySQLBlockInputStream result(pool.Get(), query.str(), sample_block, DEFAULT_BLOCK_SIZE); NamesAndTypesList columns; while (Block block = result.read()) { size_t rows = block.rows(); for (size_t i = 0; i < rows; ++i) columns.emplace_back( (*block.getByPosition(0).column)[i].safeGet(), getDataType( (*block.getByPosition(1).column)[i].safeGet(), (*block.getByPosition(3).column)[i].safeGet(), (*block.getByPosition(4).column)[i].safeGet())); /// TODO is_nullable is ignored. } auto res = StorageMySQL::create( table_name, std::move(pool), database_name, table_name, replace_query, on_duplicate_clause, ColumnsDescription{columns}, context); res->startup(); return res; } void registerTableFunctionMySQL(TableFunctionFactory & factory) { factory.registerFunction(); } } #endif