Merge pull request #5599 from zhang2014/fix_createtable_bug

Add database engine of MySQL type
This commit is contained in:
alexey-milovidov 2019-06-15 14:52:57 +03:00 committed by GitHub
commit 807bfa913e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 798 additions and 62 deletions

View File

@ -0,0 +1,65 @@
#include <Common/convertMySQLDataType.h>
namespace DB
{
ASTPtr dataTypeConvertToQuery(const DataTypePtr & data_type)
{
WhichDataType which(data_type);
if (!which.isNullable())
return std::make_shared<ASTIdentifier>(data_type->getName());
return makeASTFunction("Nullable", dataTypeConvertToQuery(typeid_cast<const DataTypeNullable *>(data_type.get())->getNestedType()));
}
DataTypePtr convertMySQLDataType(const String & mysql_data_type, bool is_nullable, bool is_unsigned, size_t length)
{
DataTypePtr res;
if (mysql_data_type == "tinyint")
{
if (is_unsigned)
res = std::make_shared<DataTypeUInt8>();
else
res = std::make_shared<DataTypeInt8>();
}
else if (mysql_data_type == "smallint")
{
if (is_unsigned)
res = std::make_shared<DataTypeUInt16>();
else
res = std::make_shared<DataTypeInt16>();
}
else if (mysql_data_type == "int" || mysql_data_type == "mediumint")
{
if (is_unsigned)
res = std::make_shared<DataTypeUInt32>();
else
res = std::make_shared<DataTypeInt32>();
}
else if (mysql_data_type == "bigint")
{
if (is_unsigned)
res = std::make_shared<DataTypeUInt64>();
else
res = std::make_shared<DataTypeInt64>();
}
else if (mysql_data_type == "float")
res = std::make_shared<DataTypeFloat32>();
else if (mysql_data_type == "double")
res = std::make_shared<DataTypeFloat64>();
else if (mysql_data_type == "date")
res = std::make_shared<DataTypeDate>();
else if (mysql_data_type == "datetime" || mysql_data_type == "timestamp")
res = std::make_shared<DataTypeDateTime>();
else if (mysql_data_type == "binary")
res = std::make_shared<DataTypeFixedString>(length);
else
/// Also String is fallback for all unknown types.
res = std::make_shared<DataTypeString>();
if (is_nullable)
res = std::make_shared<DataTypeNullable>(res);
return res;
}
}

View File

@ -0,0 +1,26 @@
#pragma once
#include <Core/Types.h>
#include <Parsers/IAST.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <DataTypes/IDataType.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeFixedString.h>
namespace DB
{
/// Convert data type to query. for example
/// DataTypeUInt8 -> ASTIdentifier(UInt8)
/// DataTypeNullable(DataTypeUInt8) -> ASTFunction(ASTIdentifier(UInt8))
ASTPtr dataTypeConvertToQuery(const DataTypePtr & data_type);
/// Convert MySQL type to ClickHouse data type.
DataTypePtr convertMySQLDataType(const String & mysql_data_type, bool is_nullable, bool is_unsigned, size_t length);
}

View File

@ -2,22 +2,44 @@
#include <Databases/DatabaseOrdinary.h>
#include <Databases/DatabaseMemory.h>
#include <Databases/DatabaseDictionary.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/formatAST.h>
#include <Parsers/ASTCreateQuery.h>
#include <Common/parseAddress.h>
#include <Common/config.h>
#if USE_MYSQL
#include <Databases/DatabaseMySQL.h>
#endif
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_DATABASE_ENGINE;
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_DATABASE_ENGINE;
}
DatabasePtr DatabaseFactory::get(
const String & engine_name,
const String & database_name,
const String & metadata_path,
const ASTStorage * engine_define,
Context & context)
{
String engine_name = engine_define->engine->name;
if (engine_name != "MySQL" && engine_define->engine->arguments)
throw Exception("Database engine " + engine_name + " cannot have arguments", ErrorCodes::BAD_ARGUMENTS);
if (engine_define->engine->parameters || engine_define->partition_by || engine_define->primary_key || engine_define->order_by ||
engine_define->sample_by || engine_define->settings)
throw Exception("Database engine " + engine_name + " cannot have parameters, primary_key, order_by, sample_by, settings",
ErrorCodes::UNKNOWN_ELEMENT_IN_AST);
if (engine_name == "Ordinary")
return std::make_shared<DatabaseOrdinary>(database_name, metadata_path, context);
else if (engine_name == "Memory")
@ -25,6 +47,29 @@ DatabasePtr DatabaseFactory::get(
else if (engine_name == "Dictionary")
return std::make_shared<DatabaseDictionary>(database_name);
#if USE_MYSQL
else if (engine_name == "MySQL")
{
const ASTFunction * engine = engine_define->engine;
const auto & arguments = engine->arguments->children;
if (arguments.size() != 4)
throw Exception("MySQL Database require mysql_hostname, mysql_database_name, mysql_username, mysql_password arguments.",
ErrorCodes::BAD_ARGUMENTS);
const auto & mysql_host_name = arguments[0]->as<ASTLiteral>()->value.safeGet<String>();
const auto & mysql_database_name = arguments[1]->as<ASTLiteral>()->value.safeGet<String>();
const auto & mysql_user_name = arguments[2]->as<ASTLiteral>()->value.safeGet<String>();
const auto & mysql_user_password = arguments[3]->as<ASTLiteral>()->value.safeGet<String>();
auto parsed_host_port = parseAddress(mysql_host_name, 3306);
return std::make_shared<DatabaseMySQL>(context, database_name, parsed_host_port.first, parsed_host_port.second, mysql_database_name,
mysql_user_name, mysql_user_password);
}
#endif
throw Exception("Unknown database engine: " + engine_name, ErrorCodes::UNKNOWN_DATABASE_ENGINE);
}

View File

@ -3,17 +3,18 @@
#include <Common/ThreadPool.h>
#include <Databases/IDatabase.h>
namespace DB
{
class ASTStorage;
class DatabaseFactory
{
public:
static DatabasePtr get(
const String & engine_name,
const String & database_name,
const String & metadata_path,
const ASTStorage * engine_define,
Context & context);
};

View File

@ -0,0 +1,397 @@
#include <Common/config.h>
#if USE_MYSQL
#include <Databases/DatabaseMySQL.h>
#include <Common/parseAddress.h>
#include <IO/Operators.h>
#include <Formats/MySQLBlockInputStream.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeFixedString.h>
#include <Storages/StorageMySQL.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Common/setThreadName.h>
#include <Parsers/ASTCreateQuery.h>
#include <Common/convertMySQLDataType.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_TABLE;
}
static constexpr const std::chrono::seconds cleaner_sleep_time{30};
String toQueryStringWithQuote(const std::vector<String> & quote_list)
{
WriteBufferFromOwnString quote_list_query;
quote_list_query << "(";
for (size_t index = 0; index < quote_list.size(); ++index)
{
if (index)
quote_list_query << ",";
quote_list_query << quote << quote_list[index];
}
quote_list_query << ")";
return quote_list_query.str();
}
DatabaseMySQL::DatabaseMySQL(
const Context & context_, const String & database_name_, const String & mysql_host_name_, const UInt16 & mysql_port_,
const String & mysql_database_name_, const String & mysql_user_name_, const String & mysql_user_password_)
: global_context(context_), database_name(database_name_), mysql_host_name(mysql_host_name_), mysql_port(mysql_port_),
mysql_database_name(mysql_database_name_), mysql_user_name(mysql_user_name_), mysql_user_password(mysql_user_password_),
mysql_pool(mysql_database_name, mysql_host_name, mysql_user_name, mysql_user_password, mysql_port)
{
}
bool DatabaseMySQL::empty(const Context &) const
{
std::lock_guard<std::mutex> lock(mutex);
fetchTablesIntoLocalCache();
return local_tables_cache.empty();
}
DatabaseIteratorPtr DatabaseMySQL::getIterator(const Context &)
{
Tables tables;
std::lock_guard<std::mutex> lock(mutex);
fetchTablesIntoLocalCache();
for (const auto & local_table : local_tables_cache)
tables[local_table.first] = local_table.second.storage;
return std::make_unique<DatabaseSnapshotIterator>(tables);
}
bool DatabaseMySQL::isTableExist(const Context & context, const String & name) const
{
return bool(tryGetTable(context, name));
}
StoragePtr DatabaseMySQL::tryGetTable(const Context &, const String & mysql_table_name) const
{
std::lock_guard<std::mutex> lock(mutex);
fetchTablesIntoLocalCache();
if (local_tables_cache.find(mysql_table_name) != local_tables_cache.end())
return local_tables_cache[mysql_table_name].storage;
return StoragePtr{};
}
ASTPtr DatabaseMySQL::tryGetCreateTableQuery(const Context &, const String & table_name) const
{
std::lock_guard<std::mutex> lock(mutex);
fetchTablesIntoLocalCache();
if (local_tables_cache.find(table_name) == local_tables_cache.end())
throw Exception("MySQL table " + mysql_database_name + "." + table_name + " doesn't exist..", ErrorCodes::UNKNOWN_TABLE);
return local_tables_cache[table_name].create_table_query;
}
time_t DatabaseMySQL::getTableMetadataModificationTime(const Context &, const String & table_name)
{
std::lock_guard<std::mutex> lock(mutex);
fetchTablesIntoLocalCache();
if (local_tables_cache.find(table_name) == local_tables_cache.end())
throw Exception("MySQL table " + mysql_database_name + "." + table_name + " doesn't exist..", ErrorCodes::UNKNOWN_TABLE);
return time_t(local_tables_cache[table_name].modification_time);
}
ASTPtr DatabaseMySQL::getCreateDatabaseQuery(const Context &) const
{
const auto & create_query = std::make_shared<ASTCreateQuery>();
create_query->database = database_name;
const auto & storage = std::make_shared<ASTStorage>();
storage->set(storage->engine, makeASTFunction("MySQL",
std::make_shared<ASTLiteral>(mysql_host_name + ":" + toString(mysql_port)), std::make_shared<ASTLiteral>(mysql_database_name),
std::make_shared<ASTLiteral>(mysql_user_name), std::make_shared<ASTLiteral>(mysql_user_password)));
create_query->set(create_query->storage, storage);
return create_query;
}
void DatabaseMySQL::fetchTablesIntoLocalCache() const
{
const auto & tables_with_modification_time = fetchTablesWithModificationTime();
destroyLocalCacheExtraTables(tables_with_modification_time);
fetchLatestTablesStructureIntoCache(tables_with_modification_time);
}
void DatabaseMySQL::destroyLocalCacheExtraTables(const std::map<String, UInt64> & tables_with_modification_time) const
{
for (auto iterator = local_tables_cache.begin(); iterator != local_tables_cache.end();)
{
if (tables_with_modification_time.find(iterator->first) != tables_with_modification_time.end())
++iterator;
else
{
outdated_tables.emplace_back(iterator->second.storage);
iterator = local_tables_cache.erase(iterator);
}
}
}
void DatabaseMySQL::fetchLatestTablesStructureIntoCache(const std::map<String, UInt64> &tables_modification_time) const
{
std::vector<String> wait_update_tables_name;
for (const auto & table_modification_time : tables_modification_time)
{
const auto & it = local_tables_cache.find(table_modification_time.first);
/// Outdated or new table structures
if (it == local_tables_cache.end() || table_modification_time.second > it->second.modification_time)
wait_update_tables_name.emplace_back(table_modification_time.first);
}
std::map<String, NamesAndTypesList> tables_and_columns = fetchTablesColumnsList(wait_update_tables_name);
for (const auto & table_and_columns : tables_and_columns)
{
const auto & table_name = table_and_columns.first;
const auto & columns_name_and_type = table_and_columns.second;
const auto & table_modification_time = tables_modification_time.at(table_name);
const auto & iterator = local_tables_cache.find(table_name);
if (iterator != local_tables_cache.end())
{
outdated_tables.emplace_back(iterator->second.storage);
local_tables_cache.erase(iterator);
}
local_tables_cache[table_name] = createStorageInfo(table_name, columns_name_and_type, table_modification_time);
}
}
static ASTPtr getTableColumnsCreateQuery(const NamesAndTypesList & names_and_types_list)
{
const auto & table_columns_list_ast = std::make_shared<ASTColumns>();
const auto & columns_expression_list = std::make_shared<ASTExpressionList>();
for (const auto & table_column_name_and_type : names_and_types_list)
{
const auto & column_declaration = std::make_shared<ASTColumnDeclaration>();
column_declaration->name = table_column_name_and_type.name;
column_declaration->type = dataTypeConvertToQuery(table_column_name_and_type.type);
columns_expression_list->children.emplace_back(column_declaration);
}
table_columns_list_ast->set(table_columns_list_ast->columns, columns_expression_list);
return table_columns_list_ast;
}
static ASTPtr getTableStorageCreateQuery(
const String & host_name, const UInt16 & port,
const String & database_name, const String & table_name,
const String & user_name, const String & password)
{
const auto & table_storage = std::make_shared<ASTStorage>();
const auto & storage_engine = std::make_shared<ASTFunction>();
storage_engine->name = "MySQL";
storage_engine->arguments = std::make_shared<ASTExpressionList>();
storage_engine->children.push_back(storage_engine->arguments);
storage_engine->arguments->children = {
std::make_shared<ASTLiteral>(host_name + ":" + toString(port)),
std::make_shared<ASTLiteral>(database_name), std::make_shared<ASTLiteral>(table_name),
std::make_shared<ASTLiteral>(user_name), std::make_shared<ASTLiteral>(password)
};
table_storage->set(table_storage->engine, storage_engine);
return table_storage;
}
DatabaseMySQL::MySQLStorageInfo DatabaseMySQL::createStorageInfo(
const String & table_name, const NamesAndTypesList & columns_name_and_type, const UInt64 & table_modification_time) const
{
const auto & mysql_table = StorageMySQL::create(
table_name, std::move(mysql_pool), mysql_database_name, table_name,
false, "", ColumnsDescription{columns_name_and_type}, global_context);
const auto & create_table_query = std::make_shared<ASTCreateQuery>();
create_table_query->table = table_name;
create_table_query->database = database_name;
create_table_query->set(create_table_query->columns_list, getTableColumnsCreateQuery(columns_name_and_type));
create_table_query->set(create_table_query->storage, getTableStorageCreateQuery(
mysql_host_name, mysql_port, mysql_database_name, table_name, mysql_user_name, mysql_user_password));
MySQLStorageInfo storage_info;
storage_info.storage = mysql_table;
storage_info.create_table_query = create_table_query;
storage_info.modification_time = table_modification_time;
return storage_info;
}
std::map<String, UInt64> DatabaseMySQL::fetchTablesWithModificationTime() const
{
Block tables_status_sample_block
{
{ std::make_shared<DataTypeString>(), "table_name" },
{ std::make_shared<DataTypeDateTime>(), "modification_time" },
};
WriteBufferFromOwnString query;
query << "SELECT"
" TABLE_NAME AS table_name, "
" CREATE_TIME AS modification_time "
" FROM INFORMATION_SCHEMA.TABLES "
" WHERE TABLE_SCHEMA = " << quote << mysql_database_name;
std::map<String, UInt64> tables_with_modification_time;
MySQLBlockInputStream result(mysql_pool.Get(), query.str(), tables_status_sample_block, DEFAULT_BLOCK_SIZE);
while (Block block = result.read())
{
size_t rows = block.rows();
for (size_t index = 0; index < rows; ++index)
{
String table_name = (*block.getByPosition(0).column)[index].safeGet<String>();
tables_with_modification_time[table_name] = (*block.getByPosition(1).column)[index].safeGet<UInt64>();
}
}
return tables_with_modification_time;
}
std::map<String, NamesAndTypesList> DatabaseMySQL::fetchTablesColumnsList(const std::vector<String> & tables_name) const
{
std::map<String, NamesAndTypesList> tables_and_columns;
if (tables_name.empty())
return tables_and_columns;
Block tables_columns_sample_block
{
{ std::make_shared<DataTypeString>(), "table_name" },
{ std::make_shared<DataTypeString>(), "column_name" },
{ std::make_shared<DataTypeString>(), "column_type" },
{ std::make_shared<DataTypeUInt8>(), "is_nullable" },
{ std::make_shared<DataTypeUInt8>(), "is_unsigned" },
{ std::make_shared<DataTypeUInt64>(), "length" },
};
WriteBufferFromOwnString query;
query << "SELECT "
" TABLE_NAME AS table_name,"
" COLUMN_NAME AS column_name,"
" DATA_TYPE AS column_type,"
" IS_NULLABLE = 'YES' AS is_nullable,"
" COLUMN_TYPE LIKE '%unsigned' AS is_unsigned,"
" CHARACTER_MAXIMUM_LENGTH AS length"
" FROM INFORMATION_SCHEMA.COLUMNS"
" WHERE TABLE_SCHEMA = " << quote << mysql_database_name
<< " AND TABLE_NAME IN " << toQueryStringWithQuote(tables_name) << " ORDER BY ORDINAL_POSITION";
const auto & external_table_functions_use_nulls = global_context.getSettings().external_table_functions_use_nulls;
MySQLBlockInputStream result(mysql_pool.Get(), query.str(), tables_columns_sample_block, DEFAULT_BLOCK_SIZE);
while (Block block = result.read())
{
size_t rows = block.rows();
for (size_t i = 0; i < rows; ++i)
{
String table_name = (*block.getByPosition(0).column)[i].safeGet<String>();
tables_and_columns[table_name].emplace_back((*block.getByPosition(1).column)[i].safeGet<String>(),
convertMySQLDataType(
(*block.getByPosition(2).column)[i].safeGet<String>(),
(*block.getByPosition(3).column)[i].safeGet<UInt64>() &&
external_table_functions_use_nulls,
(*block.getByPosition(4).column)[i].safeGet<UInt64>(),
(*block.getByPosition(5).column)[i].safeGet<UInt64>()));
}
}
return tables_and_columns;
}
void DatabaseMySQL::shutdown()
{
std::map<String, MySQLStorageInfo> tables_snapshot;
{
std::lock_guard lock(mutex);
tables_snapshot = local_tables_cache;
}
for (const auto & table_snapshot : tables_snapshot)
table_snapshot.second.storage->shutdown();
std::lock_guard lock(mutex);
local_tables_cache.clear();
}
void DatabaseMySQL::cleanOutdatedTables()
{
setThreadName("MySQLDBCleaner");
std::unique_lock lock{mutex};
while (!quit.load(std::memory_order_relaxed))
{
for (auto iterator = outdated_tables.begin(); iterator != outdated_tables.end();)
{
if (!iterator->unique())
++iterator;
else
{
const auto table_lock = (*iterator)->lockAlterIntention(RWLockImpl::NO_QUERY);
(*iterator)->shutdown();
(*iterator)->is_dropped = true;
iterator = outdated_tables.erase(iterator);
}
}
cond.wait_for(lock, cleaner_sleep_time);
}
}
DatabaseMySQL::~DatabaseMySQL()
{
try
{
if (!quit)
{
{
quit = true;
std::lock_guard lock{mutex};
}
cond.notify_one();
thread.join();
}
shutdown();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
#endif

View File

@ -0,0 +1,125 @@
#pragma once
#include <Common/config.h>
#if USE_MYSQL
#include <mysqlxx/Pool.h>
#include <Databases/DatabasesCommon.h>
namespace DB
{
/** Real-time access to table list and table structure from remote MySQL
* It doesn't make any manipulations with filesystem.
* All tables are created by calling code after real-time pull-out structure from remote MySQL
*/
class DatabaseMySQL : public IDatabase
{
public:
~DatabaseMySQL() override;
DatabaseMySQL(const Context & context_, const String & database_name_, const String & mysql_host_name_, const UInt16 & mysql_port_,
const String & mysql_database_name_, const String & mysql_user_name_, const String & mysql_user_password_);
String getEngineName() const override { return "MySQL"; }
String getDatabaseName() const override { return database_name; }
bool empty(const Context & context) const override;
DatabaseIteratorPtr getIterator(const Context & context) override;
ASTPtr getCreateDatabaseQuery(const Context & context) const override;
bool isTableExist(const Context & context, const String & name) const override;
StoragePtr tryGetTable(const Context & context, const String & name) const override;
ASTPtr tryGetCreateTableQuery(const Context & context, const String & name) const override;
time_t getTableMetadataModificationTime(const Context & context, const String & name) override;
void shutdown() override;
StoragePtr detachTable(const String &) override
{
throw Exception("MySQL database engine does not support detach table.", ErrorCodes::NOT_IMPLEMENTED);
}
void loadTables(Context &, ThreadPool *, bool) override
{
/// do nothing
}
void removeTable(const Context &, const String &) override
{
throw Exception("MySQL database engine does not support remove table.", ErrorCodes::NOT_IMPLEMENTED);
}
void attachTable(const String &, const StoragePtr &) override
{
throw Exception("MySQL database engine does not support attach table.", ErrorCodes::NOT_IMPLEMENTED);
}
void renameTable(const Context &, const String &, IDatabase &, const String &) override
{
throw Exception("MySQL database engine does not support rename table.", ErrorCodes::NOT_IMPLEMENTED);
}
void createTable(const Context &, const String &, const StoragePtr &, const ASTPtr &) override
{
throw Exception("MySQL database engine does not support create table.", ErrorCodes::NOT_IMPLEMENTED);
}
void alterTable(const Context &, const String &, const ColumnsDescription &, const IndicesDescription &, const ASTModifier &) override
{
throw Exception("MySQL database engine does not support alter table.", ErrorCodes::NOT_IMPLEMENTED);
}
private:
struct MySQLStorageInfo
{
StoragePtr storage;
UInt64 modification_time;
ASTPtr create_table_query;
};
const Context global_context;
const String database_name;
const String mysql_host_name;
const UInt16 mysql_port;
const String mysql_database_name;
const String mysql_user_name;
const String mysql_user_password;
mutable std::mutex mutex;
std::atomic<bool> quit{false};
std::condition_variable cond;
mutable mysqlxx::Pool mysql_pool;
mutable std::vector<StoragePtr> outdated_tables;
mutable std::map<String, MySQLStorageInfo> local_tables_cache;
void cleanOutdatedTables();
void fetchTablesIntoLocalCache() const;
std::map<String, UInt64> fetchTablesWithModificationTime() const;
DatabaseMySQL::MySQLStorageInfo createStorageInfo(
const String & table_name, const NamesAndTypesList & columns_name_and_type, const UInt64 & table_modification_time) const;
std::map<String, NamesAndTypesList> fetchTablesColumnsList(const std::vector<String> & tables_name) const;
void destroyLocalCacheExtraTables(const std::map<String, UInt64> & tables_with_modification_time) const;
void fetchLatestTablesStructureIntoCache(const std::map<String, UInt64> & tables_modification_time) const;
ThreadFromGlobalPool thread{&DatabaseMySQL::cleanOutdatedTables, this};
};
}
#endif

View File

@ -110,9 +110,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
const ASTStorage & storage = *create.storage;
const ASTFunction & engine = *storage.engine;
/// Currently, there are no database engines, that support any arguments.
if (engine.arguments || engine.parameters || storage.partition_by || storage.primary_key
|| storage.order_by || storage.sample_by || storage.settings ||
(create.columns_list && create.columns_list->indices && !create.columns_list->indices->children.empty()))
if ((create.columns_list && create.columns_list->indices && !create.columns_list->indices->children.empty()))
{
std::stringstream ostr;
formatAST(storage, ostr, false, false);
@ -129,7 +127,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
String metadata_path = path + "metadata/" + database_name_escaped + "/";
Poco::File(metadata_path).createDirectory();
DatabasePtr database = DatabaseFactory::get(database_engine_name, database_name, metadata_path, context);
DatabasePtr database = DatabaseFactory::get(database_name, metadata_path, create.storage, context);
/// Will write file with database metadata, if needed.
String metadata_file_tmp_path = path + "metadata/" + database_name_escaped + ".sql.tmp";

View File

@ -19,6 +19,7 @@
#include <Common/Exception.h>
#include <Common/parseAddress.h>
#include <Common/typeid_cast.h>
#include <Common/convertMySQLDataType.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
@ -32,56 +33,7 @@ namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int BAD_ARGUMENTS;
}
DataTypePtr getDataType(const String & mysql_data_type, bool is_nullable, bool is_unsigned, size_t length)
{
DataTypePtr res;
if (mysql_data_type == "tinyint")
{
if (is_unsigned)
res = std::make_shared<DataTypeUInt8>();
else
res = std::make_shared<DataTypeInt8>();
}
else if (mysql_data_type == "smallint")
{
if (is_unsigned)
res = std::make_shared<DataTypeUInt16>();
else
res = std::make_shared<DataTypeInt16>();
}
else if (mysql_data_type == "int" || mysql_data_type == "mediumint")
{
if (is_unsigned)
res = std::make_shared<DataTypeUInt32>();
else
res = std::make_shared<DataTypeInt32>();
}
else if (mysql_data_type == "bigint")
{
if (is_unsigned)
res = std::make_shared<DataTypeUInt64>();
else
res = std::make_shared<DataTypeInt64>();
}
else if (mysql_data_type == "float")
res = std::make_shared<DataTypeFloat32>();
else if (mysql_data_type == "double")
res = std::make_shared<DataTypeFloat64>();
else if (mysql_data_type == "date")
res = std::make_shared<DataTypeDate>();
else if (mysql_data_type == "datetime" || mysql_data_type == "timestamp")
res = std::make_shared<DataTypeDateTime>();
else if (mysql_data_type == "binary")
res = std::make_shared<DataTypeFixedString>(length);
else
/// Also String is fallback for all unknown types.
res = std::make_shared<DataTypeString>();
if (is_nullable)
res = std::make_shared<DataTypeNullable>(res);
return res;
extern const int UNKNOWN_TABLE;
}
@ -147,16 +99,15 @@ StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & ast_function, const Co
<< " AND TABLE_NAME = " << quote << table_name
<< " ORDER BY ORDINAL_POSITION";
MySQLBlockInputStream result(pool.Get(), query.str(), sample_block, DEFAULT_BLOCK_SIZE);
NamesAndTypesList columns;
MySQLBlockInputStream result(pool.Get(), query.str(), sample_block, DEFAULT_BLOCK_SIZE);
while (Block block = result.read())
{
size_t rows = block.rows();
for (size_t i = 0; i < rows; ++i)
columns.emplace_back(
(*block.getByPosition(0).column)[i].safeGet<String>(),
getDataType(
convertMySQLDataType(
(*block.getByPosition(1).column)[i].safeGet<String>(),
(*block.getByPosition(2).column)[i].safeGet<UInt64>() && context.getSettings().external_table_functions_use_nulls,
(*block.getByPosition(3).column)[i].safeGet<UInt64>(),
@ -164,6 +115,9 @@ StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & ast_function, const Co
}
if (columns.empty())
throw Exception("MySQL table `" + database_name + "`.`" + table_name + "` doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
auto res = StorageMySQL::create(
table_name,
std::move(pool),

View File

@ -0,0 +1,12 @@
<yandex>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</yandex>

View File

@ -0,0 +1,113 @@
from contextlib import contextmanager
import time
import pytest
## sudo -H pip install PyMySQL
import pymysql.cursors
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_mysql = True)
create_table_sql_template = """
CREATE TABLE `clickhouse`.`{}` (
`id` int(11) NOT NULL,
`name` varchar(50) NOT NULL,
`age` int NOT NULL default 0,
`money` int NOT NULL default 0,
PRIMARY KEY (`id`)) ENGINE=InnoDB;
"""
drop_table_sql_template = "DROP TABLE `clickhouse`.`{}`"
add_column_sql_template = "ALTER TABLE `clickhouse`.`{}` ADD COLUMN `pid` int(11)"
del_column_sql_template = "ALTER TABLE `clickhouse`.`{}` DROP COLUMN `pid`"
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
conn = get_mysql_conn()
## create mysql db and table
create_mysql_db(conn, 'clickhouse')
node1.query("CREATE DATABASE clickhouse_mysql ENGINE = MySQL('mysql1:3306', 'clickhouse', 'root', 'clickhouse')")
yield cluster
finally:
cluster.shutdown()
def test_sync_tables_list_between_clickhouse_and_mysql(started_cluster):
mysql_connection = get_mysql_conn()
assert node1.query('SHOW TABLES FROM clickhouse_mysql FORMAT TSV').rstrip() == ''
create_mysql_table(mysql_connection, 'first_mysql_table')
assert node1.query("SHOW TABLES FROM clickhouse_mysql LIKE 'first_mysql_table' FORMAT TSV").rstrip() == 'first_mysql_table'
create_mysql_table(mysql_connection, 'second_mysql_table')
assert node1.query("SHOW TABLES FROM clickhouse_mysql LIKE 'second_mysql_table' FORMAT TSV").rstrip() == 'second_mysql_table'
drop_mysql_table(mysql_connection, 'second_mysql_table')
assert node1.query("SHOW TABLES FROM clickhouse_mysql LIKE 'second_mysql_table' FORMAT TSV").rstrip() == ''
mysql_connection.close()
def test_sync_tables_structure_between_clickhouse_and_mysql(started_cluster):
mysql_connection = get_mysql_conn()
create_mysql_table(mysql_connection, 'test_sync_column')
assert node1.query(
"SELECT name FROM system.columns WHERE table = 'test_sync_column' AND database = 'clickhouse_mysql' AND name = 'pid' ").rstrip() == ''
time.sleep(3)
add_mysql_table_column(mysql_connection, "test_sync_column")
assert node1.query(
"SELECT name FROM system.columns WHERE table = 'test_sync_column' AND database = 'clickhouse_mysql' AND name = 'pid' ").rstrip() == 'pid'
time.sleep(3)
drop_mysql_table_column(mysql_connection, "test_sync_column")
assert node1.query(
"SELECT name FROM system.columns WHERE table = 'test_sync_column' AND database = 'clickhouse_mysql' AND name = 'pid' ").rstrip() == ''
mysql_connection.close()
def test_insert_select(started_cluster):
mysql_connection = get_mysql_conn()
create_mysql_table(mysql_connection, 'test_insert_select')
assert node1.query("SELECT count() FROM `clickhouse_mysql`.{}".format('test_insert_select')).rstrip() == '0'
node1.query("INSERT INTO `clickhouse_mysql`.{}(id, name, money) select number, concat('name_', toString(number)), 3 from numbers(10000) ".format('test_insert_select'))
assert node1.query("SELECT count() FROM `clickhouse_mysql`.{}".format('test_insert_select')).rstrip() == '10000'
assert node1.query("SELECT sum(money) FROM `clickhouse_mysql`.{}".format('test_insert_select')).rstrip() == '30000'
mysql_connection.close()
def get_mysql_conn():
conn = pymysql.connect(user='root', password='clickhouse', host='127.0.0.1', port=3308)
return conn
def create_mysql_db(conn, name):
with conn.cursor() as cursor:
cursor.execute(
"CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(name))
def create_mysql_table(conn, tableName):
with conn.cursor() as cursor:
cursor.execute(create_table_sql_template.format(tableName))
def drop_mysql_table(conn, tableName):
with conn.cursor() as cursor:
cursor.execute(drop_table_sql_template.format(tableName))
def add_mysql_table_column(conn, tableName):
with conn.cursor() as cursor:
cursor.execute(add_column_sql_template.format(tableName))
def drop_mysql_table_column(conn, tableName):
with conn.cursor() as cursor:
cursor.execute(del_column_sql_template.format(tableName))