ClickHouse/dbms/src/TableFunctions/TableFunctionMySQL.cpp

144 lines
5.2 KiB
C++
Raw Normal View History

#include "config_core.h"
2017-12-28 04:29:53 +00:00
#if USE_MYSQL
2017-12-28 04:28:05 +00:00
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
2019-02-15 11:46:07 +00:00
#include <Formats/MySQLBlockInputStream.h>
#include <Access/AccessFlags.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Storages/StorageMySQL.h>
#include <TableFunctions/ITableFunction.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/TableFunctionMySQL.h>
2017-12-28 04:28:05 +00:00
#include <Core/Defines.h>
#include <Common/Exception.h>
2017-12-28 04:28:05 +00:00
#include <Common/parseAddress.h>
#include <Common/quoteString.h>
2019-06-20 20:57:01 +00:00
#include <DataTypes/convertMySQLDataType.h>
2017-12-28 04:28:05 +00:00
#include <IO/Operators.h>
2019-12-15 06:34:43 +00:00
#include "registerTableFunctions.h"
#include <mysqlxx/Pool.h>
2017-12-28 04:28:05 +00:00
namespace DB
{
2017-12-28 04:28:05 +00:00
namespace ErrorCodes
{
2020-02-25 18:02:41 +00:00
extern const int LOGICAL_ERROR;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
2018-05-14 23:59:58 +00:00
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_TABLE;
}
2019-07-18 18:29:49 +00:00
StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
{
const auto & args_func = ast_function->as<ASTFunction &>();
if (!args_func.arguments)
2017-12-27 21:45:05 +00:00
throw Exception("Table function 'mysql' must have arguments.", ErrorCodes::LOGICAL_ERROR);
ASTs & args = args_func.arguments->children;
if (args.size() < 5 || args.size() > 7)
2018-05-14 11:00:22 +00:00
throw Exception("Table function 'mysql' requires 5-7 parameters: MySQL('host:port', database, table, 'user', 'password'[, replace_query, 'on_duplicate_clause']).",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (size_t i = 0; i < args.size(); ++i)
args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(args[i], context);
std::string host_port = args[0]->as<ASTLiteral &>().value.safeGet<String>();
2019-07-19 13:28:28 +00:00
std::string remote_database_name = args[1]->as<ASTLiteral &>().value.safeGet<String>();
std::string remote_table_name = args[2]->as<ASTLiteral &>().value.safeGet<String>();
std::string user_name = args[3]->as<ASTLiteral &>().value.safeGet<String>();
std::string password = args[4]->as<ASTLiteral &>().value.safeGet<String>();
2017-12-27 21:45:05 +00:00
2020-01-24 16:20:36 +00:00
context.checkAccess(AccessType::mysql);
bool replace_query = false;
std::string on_duplicate_clause;
2018-05-14 11:00:22 +00:00
if (args.size() >= 6)
replace_query = args[5]->as<ASTLiteral &>().value.safeGet<UInt64>() > 0;
2018-05-14 11:00:22 +00:00
if (args.size() == 7)
on_duplicate_clause = args[6]->as<ASTLiteral &>().value.safeGet<String>();
2018-05-14 11:00:22 +00:00
if (replace_query && !on_duplicate_clause.empty())
throw Exception(
"Only one of 'replace_query' and 'on_duplicate_clause' can be specified, or none of them",
ErrorCodes::BAD_ARGUMENTS);
2017-12-28 04:28:05 +00:00
/// 3306 is the default MySQL port number
auto parsed_host_port = parseAddress(host_port, 3306);
2019-07-19 13:28:28 +00:00
mysqlxx::Pool pool(remote_database_name, parsed_host_port.first, user_name, password, parsed_host_port.second);
2017-12-26 21:34:06 +00:00
2017-12-28 04:28:05 +00:00
/// Determine table definition by running a query to INFORMATION_SCHEMA.
Block sample_block
{
{ std::make_shared<DataTypeString>(), "name" },
{ std::make_shared<DataTypeString>(), "type" },
{ std::make_shared<DataTypeUInt8>(), "is_nullable" },
{ std::make_shared<DataTypeUInt8>(), "is_unsigned" },
{ std::make_shared<DataTypeUInt64>(), "length" },
};
WriteBufferFromOwnString query;
query << "SELECT"
" COLUMN_NAME AS name,"
" DATA_TYPE AS type,"
" IS_NULLABLE = 'YES' AS is_nullable,"
" COLUMN_TYPE LIKE '%unsigned' AS is_unsigned,"
" CHARACTER_MAXIMUM_LENGTH AS length"
" FROM INFORMATION_SCHEMA.COLUMNS"
2019-07-19 13:28:28 +00:00
" WHERE TABLE_SCHEMA = " << quote << remote_database_name
<< " AND TABLE_NAME = " << quote << remote_table_name
2017-12-28 04:28:05 +00:00
<< " ORDER BY ORDINAL_POSITION";
NamesAndTypesList columns;
MySQLBlockInputStream result(pool.Get(), query.str(), sample_block, DEFAULT_BLOCK_SIZE);
2017-12-28 04:28:05 +00:00
while (Block block = result.read())
{
size_t rows = block.rows();
for (size_t i = 0; i < rows; ++i)
columns.emplace_back(
(*block.getByPosition(0).column)[i].safeGet<String>(),
2019-06-14 16:18:48 +00:00
convertMySQLDataType(
2017-12-28 04:28:05 +00:00
(*block.getByPosition(1).column)[i].safeGet<String>(),
2019-02-09 23:53:57 +00:00
(*block.getByPosition(2).column)[i].safeGet<UInt64>() && context.getSettings().external_table_functions_use_nulls,
2017-12-28 04:28:05 +00:00
(*block.getByPosition(3).column)[i].safeGet<UInt64>(),
(*block.getByPosition(4).column)[i].safeGet<UInt64>()));
}
2017-12-26 21:34:06 +00:00
if (columns.empty())
2019-07-19 13:28:28 +00:00
throw Exception("MySQL table " + backQuoteIfNeed(remote_database_name) + "." + backQuoteIfNeed(remote_table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
2017-12-26 21:34:06 +00:00
auto res = StorageMySQL::create(
2019-12-04 16:06:55 +00:00
StorageID(getDatabaseName(), table_name),
2017-12-28 04:28:05 +00:00
std::move(pool),
2019-07-19 13:28:28 +00:00
remote_database_name,
remote_table_name,
replace_query,
on_duplicate_clause,
ColumnsDescription{columns},
2019-08-24 21:20:20 +00:00
ConstraintsDescription{},
context);
2017-12-26 21:34:06 +00:00
res->startup();
return res;
}
void registerTableFunctionMySQL(TableFunctionFactory & factory)
{
2017-12-28 04:28:05 +00:00
factory.registerFunction<TableFunctionMySQL>();
}
}
2017-12-28 04:29:53 +00:00
#endif