Merge branch 'master' into libressl-to-openssl

This commit is contained in:
Alexey Milovidov 2019-12-15 02:01:56 +03:00
commit 1e887162a3
28 changed files with 609 additions and 374 deletions

View File

@ -474,6 +474,7 @@ namespace ErrorCodes
extern const int NOT_ENOUGH_PRIVILEGES = 497;
extern const int LIMIT_BY_WITH_TIES_IS_NOT_SUPPORTED = 498;
extern const int S3_ERROR = 499;
extern const int CANNOT_CREATE_DATABASE = 500;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -9,6 +9,9 @@
#include <Parsers/ASTFunction.h>
#include <Common/parseAddress.h>
#include "config_core.h"
#include "DatabaseFactory.h"
#include <Poco/File.h>
#if USE_MYSQL
#include <Databases/DatabaseMySQL.h>
@ -21,15 +24,32 @@ namespace DB
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_DATABASE_ENGINE;
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_DATABASE_ENGINE;
extern const int CANNOT_CREATE_DATABASE;
}
DatabasePtr DatabaseFactory::get(
const String & database_name,
const String & metadata_path,
const ASTStorage * engine_define,
Context & context)
const String & database_name, const String & metadata_path, const ASTStorage * engine_define, Context & context)
{
try
{
Poco::File(metadata_path).createDirectory();
return getImpl(database_name, metadata_path, engine_define, context);
}
catch (...)
{
Poco::File metadata_dir(metadata_path);
if (metadata_dir.exists())
metadata_dir.remove(true);
throw;
}
}
DatabasePtr DatabaseFactory::getImpl(
const String & database_name, const String & metadata_path, const ASTStorage * engine_define, Context & context)
{
String engine_name = engine_define->engine->name;
@ -55,20 +75,31 @@ DatabasePtr DatabaseFactory::get(
const ASTFunction * engine = engine_define->engine;
if (!engine->arguments || engine->arguments->children.size() != 4)
throw Exception(
"MySQL Database require mysql_hostname, mysql_database_name, mysql_username, mysql_password arguments.",
ErrorCodes::BAD_ARGUMENTS);
throw Exception("MySQL Database require mysql_hostname, mysql_database_name, mysql_username, mysql_password arguments.",
ErrorCodes::BAD_ARGUMENTS);
const auto & arguments = engine->arguments->children;
const auto & mysql_host_name = arguments[0]->as<ASTLiteral>()->value.safeGet<String>();
const auto & mysql_database_name = arguments[1]->as<ASTLiteral>()->value.safeGet<String>();
const auto & host_name_and_port = arguments[0]->as<ASTLiteral>()->value.safeGet<String>();
const auto & database_name_in_mysql = arguments[1]->as<ASTLiteral>()->value.safeGet<String>();
const auto & mysql_user_name = arguments[2]->as<ASTLiteral>()->value.safeGet<String>();
const auto & mysql_user_password = arguments[3]->as<ASTLiteral>()->value.safeGet<String>();
auto parsed_host_port = parseAddress(mysql_host_name, 3306);
return std::make_shared<DatabaseMySQL>(context, database_name, parsed_host_port.first, parsed_host_port.second, mysql_database_name,
mysql_user_name, mysql_user_password);
try
{
const auto & [remote_host_name, remote_port] = parseAddress(host_name_and_port, 3306);
auto mysql_pool = mysqlxx::Pool(database_name_in_mysql, remote_host_name, mysql_user_name, mysql_user_password, remote_port);
auto mysql_database = std::make_shared<DatabaseMySQL>(
context, database_name, metadata_path, engine_define, database_name_in_mysql, std::move(mysql_pool));
mysql_database->empty(context); /// test database is works fine.
return mysql_database;
}
catch (...)
{
const auto & exception_message = getCurrentExceptionMessage(true);
throw Exception("Cannot create MySQL database, because " + exception_message, ErrorCodes::CANNOT_CREATE_DATABASE);
}
}
#endif

View File

@ -11,11 +11,9 @@ class ASTStorage;
class DatabaseFactory
{
public:
static DatabasePtr get(
const String & database_name,
const String & metadata_path,
const ASTStorage * engine_define,
Context & context);
static DatabasePtr get(const String & database_name, const String & metadata_path, const ASTStorage * engine_define, Context & context);
static DatabasePtr getImpl(const String & database_name, const String & metadata_path, const ASTStorage * engine_define, Context & context);
};
}

View File

@ -1,6 +1,8 @@
#include "config_core.h"
#if USE_MYSQL
#include <string>
#include <Databases/DatabaseMySQL.h>
#include <Common/parseAddress.h>
#include <IO/Operators.h>
@ -12,12 +14,19 @@
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeFixedString.h>
#include <Storages/StorageMySQL.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Common/setThreadName.h>
#include <Common/escapeForFileName.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTCreateQuery.h>
#include <DataTypes/convertMySQLDataType.h>
#include <Poco/File.h>
#include <Poco/DirectoryIterator.h>
namespace DB
{
@ -25,8 +34,13 @@ namespace DB
namespace ErrorCodes
{
extern const int UNKNOWN_TABLE;
extern const int TABLE_IS_DROPPED;
extern const int TABLE_WAS_NOT_DROPPED;
extern const int TABLE_ALREADY_EXISTS;
extern const int UNEXPECTED_AST_STRUCTURE;
}
constexpr static const auto suffix = ".remove_flag";
static constexpr const std::chrono::seconds cleaner_sleep_time{30};
String toQueryStringWithQuote(const std::vector<String> & quote_list)
@ -47,11 +61,10 @@ String toQueryStringWithQuote(const std::vector<String> & quote_list)
}
DatabaseMySQL::DatabaseMySQL(
const Context & context_, const String & database_name_, const String & mysql_host_name_, const UInt16 & mysql_port_,
const String & mysql_database_name_, const String & mysql_user_name_, const String & mysql_user_password_)
: global_context(context_), database_name(database_name_), mysql_host_name(mysql_host_name_), mysql_port(mysql_port_),
mysql_database_name(mysql_database_name_), mysql_user_name(mysql_user_name_), mysql_user_password(mysql_user_password_),
mysql_pool(mysql_database_name, mysql_host_name, mysql_user_name, mysql_user_password, mysql_port)
const Context & global_context_, const String & database_name_, const String & metadata_path_,
const ASTStorage * database_engine_define_, const String & database_name_in_mysql_, mysqlxx::Pool && pool)
: global_context(global_context_), database_name(database_name_), metadata_path(metadata_path_),
database_engine_define(database_engine_define_->clone()), database_name_in_mysql(database_name_in_mysql_), mysql_pool(std::move(pool))
{
}
@ -61,7 +74,14 @@ bool DatabaseMySQL::empty(const Context &) const
fetchTablesIntoLocalCache();
return local_tables_cache.empty();
if (local_tables_cache.empty())
return true;
for (const auto & [table_name, storage_info] : local_tables_cache)
if (!remove_or_detach_tables.count(table_name))
return false;
return true;
}
DatabaseTablesIteratorPtr DatabaseMySQL::getTablesIterator(const Context &, const FilterByNameFunction & filter_by_table_name)
@ -71,9 +91,9 @@ DatabaseTablesIteratorPtr DatabaseMySQL::getTablesIterator(const Context &, cons
fetchTablesIntoLocalCache();
for (const auto & local_table : local_tables_cache)
if (!filter_by_table_name || filter_by_table_name(local_table.first))
tables[local_table.first] = local_table.second.storage;
for (const auto & [table_name, modify_time_and_storage] : local_tables_cache)
if (!remove_or_detach_tables.count(table_name) && (!filter_by_table_name || filter_by_table_name(table_name)))
tables[table_name] = modify_time_and_storage.second;
return std::make_unique<DatabaseTablesSnapshotIterator>(tables);
}
@ -89,12 +109,47 @@ StoragePtr DatabaseMySQL::tryGetTable(const Context &, const String & mysql_tabl
fetchTablesIntoLocalCache();
if (local_tables_cache.find(mysql_table_name) != local_tables_cache.end())
return local_tables_cache[mysql_table_name].storage;
if (!remove_or_detach_tables.count(mysql_table_name) && local_tables_cache.find(mysql_table_name) != local_tables_cache.end())
return local_tables_cache[mysql_table_name].second;
return StoragePtr{};
}
static ASTPtr getCreateQueryFromStorage(const StoragePtr & storage, const ASTPtr & database_engine_define)
{
auto create_table_query = std::make_shared<ASTCreateQuery>();
auto table_storage_define = database_engine_define->clone();
create_table_query->set(create_table_query->storage, table_storage_define);
auto columns_declare_list = std::make_shared<ASTColumns>();
auto columns_expression_list = std::make_shared<ASTExpressionList>();
columns_declare_list->set(columns_declare_list->columns, columns_expression_list);
create_table_query->set(create_table_query->columns_list, columns_declare_list);
{
/// init create query.
create_table_query->table = storage->getTableName();
create_table_query->database = storage->getDatabaseName();
for (const auto & column_type_and_name : storage->getColumns().getOrdinary())
{
const auto & column_declaration = std::make_shared<ASTColumnDeclaration>();
column_declaration->name = column_type_and_name.name;
column_declaration->type = dataTypeConvertToQuery(column_type_and_name.type);
columns_expression_list->children.emplace_back(column_declaration);
}
auto mysql_table_name = std::make_shared<ASTLiteral>(storage->getTableName());
auto storage_engine_arguments = table_storage_define->as<ASTStorage>()->engine->arguments;
storage_engine_arguments->children.insert(storage_engine_arguments->children.begin() + 2, mysql_table_name);
}
return create_table_query;
}
ASTPtr DatabaseMySQL::tryGetCreateTableQuery(const Context &, const String & table_name) const
{
std::lock_guard<std::mutex> lock(mutex);
@ -102,9 +157,9 @@ ASTPtr DatabaseMySQL::tryGetCreateTableQuery(const Context &, const String & tab
fetchTablesIntoLocalCache();
if (local_tables_cache.find(table_name) == local_tables_cache.end())
throw Exception("MySQL table " + mysql_database_name + "." + table_name + " doesn't exist..", ErrorCodes::UNKNOWN_TABLE);
throw Exception("MySQL table " + database_name_in_mysql + "." + table_name + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
return local_tables_cache[table_name].create_table_query;
return getCreateQueryFromStorage(local_tables_cache[table_name].second, database_engine_define);
}
time_t DatabaseMySQL::getObjectMetadataModificationTime(const Context &, const String & table_name)
@ -114,22 +169,16 @@ time_t DatabaseMySQL::getObjectMetadataModificationTime(const Context &, const S
fetchTablesIntoLocalCache();
if (local_tables_cache.find(table_name) == local_tables_cache.end())
throw Exception("MySQL table " + mysql_database_name + "." + table_name + " doesn't exist..", ErrorCodes::UNKNOWN_TABLE);
throw Exception("MySQL table " + database_name_in_mysql + "." + table_name + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
return time_t(local_tables_cache[table_name].modification_time);
return time_t(local_tables_cache[table_name].first);
}
ASTPtr DatabaseMySQL::getCreateDatabaseQuery(const Context &) const
{
const auto & create_query = std::make_shared<ASTCreateQuery>();
create_query->database = database_name;
const auto & storage = std::make_shared<ASTStorage>();
storage->set(storage->engine, makeASTFunction("MySQL",
std::make_shared<ASTLiteral>(mysql_host_name + ":" + toString(mysql_port)), std::make_shared<ASTLiteral>(mysql_database_name),
std::make_shared<ASTLiteral>(mysql_user_name), std::make_shared<ASTLiteral>(mysql_user_password)));
create_query->set(create_query->storage, storage);
create_query->set(create_query->storage, database_engine_define);
return create_query;
}
@ -149,7 +198,7 @@ void DatabaseMySQL::destroyLocalCacheExtraTables(const std::map<String, UInt64>
++iterator;
else
{
outdated_tables.emplace_back(iterator->second.storage);
outdated_tables.emplace_back(iterator->second.second);
iterator = local_tables_cache.erase(iterator);
}
}
@ -163,7 +212,7 @@ void DatabaseMySQL::fetchLatestTablesStructureIntoCache(const std::map<String, U
const auto & it = local_tables_cache.find(table_modification_time.first);
/// Outdated or new table structures
if (it == local_tables_cache.end() || table_modification_time.second > it->second.modification_time)
if (it == local_tables_cache.end() || table_modification_time.second > it->second.first)
wait_update_tables_name.emplace_back(table_modification_time.first);
}
@ -178,77 +227,16 @@ void DatabaseMySQL::fetchLatestTablesStructureIntoCache(const std::map<String, U
const auto & iterator = local_tables_cache.find(table_name);
if (iterator != local_tables_cache.end())
{
outdated_tables.emplace_back(iterator->second.storage);
outdated_tables.emplace_back(iterator->second.second);
local_tables_cache.erase(iterator);
}
local_tables_cache[table_name] = createStorageInfo(table_name, columns_name_and_type, table_modification_time);
local_tables_cache[table_name] = std::make_pair(table_modification_time, StorageMySQL::create(
database_name, table_name, std::move(mysql_pool), database_name_in_mysql, table_name,
false, "", ColumnsDescription{columns_name_and_type}, ConstraintsDescription{}, global_context));
}
}
static ASTPtr getTableColumnsCreateQuery(const NamesAndTypesList & names_and_types_list)
{
const auto & table_columns_list_ast = std::make_shared<ASTColumns>();
const auto & columns_expression_list = std::make_shared<ASTExpressionList>();
for (const auto & table_column_name_and_type : names_and_types_list)
{
const auto & column_declaration = std::make_shared<ASTColumnDeclaration>();
column_declaration->name = table_column_name_and_type.name;
column_declaration->type = dataTypeConvertToQuery(table_column_name_and_type.type);
columns_expression_list->children.emplace_back(column_declaration);
}
table_columns_list_ast->set(table_columns_list_ast->columns, columns_expression_list);
return table_columns_list_ast;
}
static ASTPtr getTableStorageCreateQuery(
const String & host_name, const UInt16 & port,
const String & database_name, const String & table_name,
const String & user_name, const String & password)
{
const auto & table_storage = std::make_shared<ASTStorage>();
const auto & storage_engine = std::make_shared<ASTFunction>();
storage_engine->name = "MySQL";
storage_engine->arguments = std::make_shared<ASTExpressionList>();
storage_engine->children.push_back(storage_engine->arguments);
storage_engine->arguments->children = {
std::make_shared<ASTLiteral>(host_name + ":" + toString(port)),
std::make_shared<ASTLiteral>(database_name), std::make_shared<ASTLiteral>(table_name),
std::make_shared<ASTLiteral>(user_name), std::make_shared<ASTLiteral>(password)
};
table_storage->set(table_storage->engine, storage_engine);
return table_storage;
}
DatabaseMySQL::MySQLStorageInfo DatabaseMySQL::createStorageInfo(
const String & table_name, const NamesAndTypesList & columns_name_and_type, const UInt64 & table_modification_time) const
{
const auto & mysql_table = StorageMySQL::create(
database_name, table_name, std::move(mysql_pool), mysql_database_name, table_name,
false, "", ColumnsDescription{columns_name_and_type}, ConstraintsDescription{}, global_context);
const auto & create_table_query = std::make_shared<ASTCreateQuery>();
create_table_query->table = table_name;
create_table_query->database = database_name;
create_table_query->set(create_table_query->columns_list, getTableColumnsCreateQuery(columns_name_and_type));
create_table_query->set(create_table_query->storage, getTableStorageCreateQuery(
mysql_host_name, mysql_port, mysql_database_name, table_name, mysql_user_name, mysql_user_password));
MySQLStorageInfo storage_info;
storage_info.storage = mysql_table;
storage_info.create_table_query = create_table_query;
storage_info.modification_time = table_modification_time;
return storage_info;
}
std::map<String, UInt64> DatabaseMySQL::fetchTablesWithModificationTime() const
{
Block tables_status_sample_block
@ -262,7 +250,7 @@ std::map<String, UInt64> DatabaseMySQL::fetchTablesWithModificationTime() const
" TABLE_NAME AS table_name, "
" CREATE_TIME AS modification_time "
" FROM INFORMATION_SCHEMA.TABLES "
" WHERE TABLE_SCHEMA = " << quote << mysql_database_name;
" WHERE TABLE_SCHEMA = " << quote << database_name_in_mysql;
std::map<String, UInt64> tables_with_modification_time;
MySQLBlockInputStream result(mysql_pool.Get(), query.str(), tables_status_sample_block, DEFAULT_BLOCK_SIZE);
@ -306,7 +294,7 @@ std::map<String, NamesAndTypesList> DatabaseMySQL::fetchTablesColumnsList(const
" COLUMN_TYPE LIKE '%unsigned' AS is_unsigned,"
" CHARACTER_MAXIMUM_LENGTH AS length"
" FROM INFORMATION_SCHEMA.COLUMNS"
" WHERE TABLE_SCHEMA = " << quote << mysql_database_name
" WHERE TABLE_SCHEMA = " << quote << database_name_in_mysql
<< " AND TABLE_NAME IN " << toQueryStringWithQuote(tables_name) << " ORDER BY ORDINAL_POSITION";
const auto & external_table_functions_use_nulls = global_context.getSettings().external_table_functions_use_nulls;
@ -331,19 +319,24 @@ std::map<String, NamesAndTypesList> DatabaseMySQL::fetchTablesColumnsList(const
void DatabaseMySQL::shutdown()
{
std::map<String, MySQLStorageInfo> tables_snapshot;
std::map<String, ModifyTimeAndStorage> tables_snapshot;
{
std::lock_guard lock(mutex);
tables_snapshot = local_tables_cache;
}
for (const auto & table_snapshot : tables_snapshot)
table_snapshot.second.storage->shutdown();
for (const auto & [table_name, modify_time_and_storage] : tables_snapshot)
modify_time_and_storage.second->shutdown();
std::lock_guard lock(mutex);
local_tables_cache.clear();
}
void DatabaseMySQL::drop()
{
Poco::File(getMetadataPath()).remove(true);
}
void DatabaseMySQL::cleanOutdatedTables()
{
setThreadName("MySQLDBCleaner");
@ -370,6 +363,98 @@ void DatabaseMySQL::cleanOutdatedTables()
}
}
void DatabaseMySQL::attachTable(const String & table_name, const StoragePtr & storage)
{
std::lock_guard<std::mutex> lock{mutex};
if (!local_tables_cache.count(table_name))
throw Exception("Cannot attach table " + backQuoteIfNeed(getDatabaseName()) + "." + backQuoteIfNeed(table_name) +
" because it does not exist.", ErrorCodes::UNKNOWN_TABLE);
if (!remove_or_detach_tables.count(table_name))
throw Exception("Cannot attach table " + backQuoteIfNeed(getDatabaseName()) + "." + backQuoteIfNeed(table_name) +
" because it already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
/// We use the new storage to replace the original storage, because the original storage may have been dropped
/// Although we still keep its
local_tables_cache[table_name].second = storage;
remove_or_detach_tables.erase(table_name);
Poco::File remove_flag(getMetadataPath() + '/' + escapeForFileName(table_name) + suffix);
if (remove_flag.exists())
remove_flag.remove();
}
StoragePtr DatabaseMySQL::detachTable(const String & table_name)
{
std::lock_guard<std::mutex> lock{mutex};
if (remove_or_detach_tables.count(table_name))
throw Exception("Table " + backQuoteIfNeed(getDatabaseName()) + "." + backQuoteIfNeed(table_name) + " is dropped",
ErrorCodes::TABLE_IS_DROPPED);
if (!local_tables_cache.count(table_name))
throw Exception("Table " + backQuoteIfNeed(getDatabaseName()) + "." + backQuoteIfNeed(table_name) + " doesn't exist.",
ErrorCodes::UNKNOWN_TABLE);
remove_or_detach_tables.emplace(table_name);
return local_tables_cache[table_name].second;
}
String DatabaseMySQL::getMetadataPath() const
{
return metadata_path;
}
void DatabaseMySQL::loadStoredObjects(Context &, bool)
{
std::lock_guard<std::mutex> lock{mutex};
Poco::DirectoryIterator iterator(getMetadataPath());
for (Poco::DirectoryIterator end; iterator != end; ++iterator)
{
if (iterator->isFile() && endsWith(iterator.name(), suffix))
{
const auto & filename = iterator.name();
const auto & table_name = unescapeForFileName(filename.substr(0, filename.size() - strlen(suffix)));
remove_or_detach_tables.emplace(table_name);
}
}
}
void DatabaseMySQL::removeTable(const Context &, const String & table_name)
{
std::lock_guard<std::mutex> lock{mutex};
Poco::File remove_flag(getMetadataPath() + '/' + escapeForFileName(table_name) + suffix);
if (remove_or_detach_tables.count(table_name))
throw Exception("Table " + backQuoteIfNeed(getDatabaseName()) + "." + backQuoteIfNeed(table_name) + " is dropped",
ErrorCodes::TABLE_IS_DROPPED);
if (remove_flag.exists())
throw Exception("The remove flag file already exists but the " + backQuoteIfNeed(getDatabaseName()) +
"." + backQuoteIfNeed(table_name) + " does not exists remove tables, it is bug.", ErrorCodes::LOGICAL_ERROR);
if (!local_tables_cache.count(table_name))
throw Exception("Table " + backQuoteIfNeed(getDatabaseName()) + "." + backQuoteIfNeed(table_name) + " doesn't exist.",
ErrorCodes::UNKNOWN_TABLE);
remove_or_detach_tables.emplace(table_name);
try
{
remove_flag.createFile();
}
catch (...)
{
remove_or_detach_tables.erase(table_name);
throw;
}
}
DatabaseMySQL::~DatabaseMySQL()
{
try
@ -392,6 +477,27 @@ DatabaseMySQL::~DatabaseMySQL()
}
}
void DatabaseMySQL::createTable(const Context & context, const String & table_name, const StoragePtr & storage, const ASTPtr & create_query)
{
const auto & create = create_query->as<ASTCreateQuery>();
if (!create->attach)
throw Exception("MySQL database engine does not support create table. for tables that were detach or dropped before, "
"you can use attach to add them back to the MySQL database", ErrorCodes::NOT_IMPLEMENTED);
/// XXX: hack
/// In order to prevent users from broken the table structure by executing attach table database_name.table_name (...)
/// we should compare the old and new create_query to make them completely consistent
const auto & origin_create_query = getCreateTableQuery(context, table_name);
origin_create_query->as<ASTCreateQuery>()->attach = true;
if (queryToString(origin_create_query) != queryToString(create_query))
throw Exception("The MySQL database engine can only execute attach statements of type attach table database_name.table_name",
ErrorCodes::UNEXPECTED_AST_STRUCTURE);
attachTable(table_name, storage);
}
}
#endif

View File

@ -7,6 +7,7 @@
#include <Databases/DatabasesCommon.h>
#include <Interpreters/Context.h>
#include <memory>
#include <Parsers/ASTCreateQuery.h>
namespace DB
@ -21,8 +22,9 @@ class DatabaseMySQL : public IDatabase
public:
~DatabaseMySQL() override;
DatabaseMySQL(const Context & context_, const String & database_name_, const String & mysql_host_name_, const UInt16 & mysql_port_,
const String & mysql_database_name_, const String & mysql_user_name_, const String & mysql_user_password_);
DatabaseMySQL(
const Context & global_context, const String & database_name, const String & metadata_path,
const ASTStorage * database_engine_define, const String & database_name_in_mysql, mysqlxx::Pool && pool);
String getEngineName() const override { return "MySQL"; }
@ -54,82 +56,63 @@ public:
ASTPtr tryGetCreateDictionaryQuery(const Context &, const String &) const override { return nullptr; }
time_t getObjectMetadataModificationTime(const Context & context, const String & name) override;
void shutdown() override;
StoragePtr detachTable(const String &) override
{
throw Exception("MySQL database engine does not support detach table.", ErrorCodes::NOT_IMPLEMENTED);
}
void drop() override;
String getMetadataPath() const override;
void createTable(const Context &, const String & table_name, const StoragePtr & storage, const ASTPtr & create_query) override;
void loadStoredObjects(Context &, bool) override;
StoragePtr detachTable(const String & table_name) override;
void removeTable(const Context &, const String & table_name) override;
void attachTable(const String & table_name, const StoragePtr & storage) override;
void detachDictionary(const String &, const Context &, bool) override
{
throw Exception("MySQL database engine does not support detach dictionary.", ErrorCodes::NOT_IMPLEMENTED);
}
void loadStoredObjects(Context &, bool) override
{
/// do nothing
}
void removeTable(const Context &, const String &) override
{
throw Exception("MySQL database engine does not support remove table.", ErrorCodes::NOT_IMPLEMENTED);
}
void removeDictionary(const Context &, const String &) override
{
throw Exception("MySQL database engine does not support remove dictionary.", ErrorCodes::NOT_IMPLEMENTED);
}
void attachTable(const String &, const StoragePtr &) override
{
throw Exception("MySQL database engine does not support attach table.", ErrorCodes::NOT_IMPLEMENTED);
}
void attachDictionary(const String &, const Context &, bool) override
{
throw Exception("MySQL database engine does not support attach dictionary.", ErrorCodes::NOT_IMPLEMENTED);
}
void createTable(const Context &, const String &, const StoragePtr &, const ASTPtr &) override
{
throw Exception("MySQL database engine does not support create table.", ErrorCodes::NOT_IMPLEMENTED);
}
void createDictionary(const Context &, const String &, const ASTPtr &) override
{
throw Exception("MySQL database engine does not support create dictionary.", ErrorCodes::NOT_IMPLEMENTED);
}
private:
struct MySQLStorageInfo
{
StoragePtr storage;
UInt64 modification_time;
ASTPtr create_table_query;
};
const Context global_context;
const String database_name;
const String mysql_host_name;
const UInt16 mysql_port;
const String mysql_database_name;
const String mysql_user_name;
const String mysql_user_password;
Context global_context;
String database_name;
String metadata_path;
ASTPtr database_engine_define;
String database_name_in_mysql;
mutable std::mutex mutex;
std::atomic<bool> quit{false};
std::condition_variable cond;
mutable mysqlxx::Pool mysql_pool;
mutable std::vector<StoragePtr> outdated_tables;
mutable std::map<String, MySQLStorageInfo> local_tables_cache;
using MySQLPool = mysqlxx::Pool;
using ModifyTimeAndStorage = std::pair<UInt64, StoragePtr>;
mutable MySQLPool mysql_pool;
mutable std::vector<StoragePtr> outdated_tables;
mutable std::map<String, ModifyTimeAndStorage> local_tables_cache;
std::unordered_set<String> remove_or_detach_tables;
void cleanOutdatedTables();
@ -137,9 +120,6 @@ private:
std::map<String, UInt64> fetchTablesWithModificationTime() const;
DatabaseMySQL::MySQLStorageInfo createStorageInfo(
const String & table_name, const NamesAndTypesList & columns_name_and_type, const UInt64 & table_modification_time) const;
std::map<String, NamesAndTypesList> fetchTablesColumnsList(const std::vector<String> & tables_name) const;
void destroyLocalCacheExtraTables(const std::map<String, UInt64> & tables_with_modification_time) const;

View File

@ -40,7 +40,7 @@ protected:
UInt64 redirects { 0 };
Poco::URI initial_uri;
const ConnectionTimeouts & timeouts;
DB::SettingUInt64 max_redirects;
SettingUInt64 max_redirects;
public:
virtual void buildNewSession(const Poco::URI & uri) = 0;
@ -244,7 +244,7 @@ public:
const std::string & method_ = {},
OutStreamCallback out_stream_callback_ = {},
const ConnectionTimeouts & timeouts = {},
const DB::SettingUInt64 max_redirects = 0,
const SettingUInt64 max_redirects = 0,
const Poco::Net::HTTPBasicCredentials & credentials_ = {},
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
const HTTPHeaderEntries & http_header_entries_ = {},
@ -289,7 +289,7 @@ public:
const ConnectionTimeouts & timeouts_ = {},
const Poco::Net::HTTPBasicCredentials & credentials_ = {},
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
const DB::SettingUInt64 max_redirects = 0,
const SettingUInt64 max_redirects = 0,
size_t max_connections_per_endpoint = DEFAULT_COUNT_OF_HTTP_CONNECTIONS_PER_ENDPOINT)
: Parent(std::make_shared<UpdatablePooledSession>(uri_, timeouts_, max_redirects, max_connections_per_endpoint),
uri_,

View File

@ -2,6 +2,7 @@
#include <Core/Names.h>
#include <Core/Types.h>
#include <Core/NamesAndTypes.h>
#include <Parsers/IAST_fwd.h>
#include <memory>
@ -36,7 +37,23 @@ struct DatabaseAndTableWithAlias
bool satisfies(const DatabaseAndTableWithAlias & table, bool table_may_be_an_alias);
};
using TableWithColumnNames = std::pair<DatabaseAndTableWithAlias, Names>;
struct TableWithColumnNames
{
DatabaseAndTableWithAlias table;
Names columns;
Names hidden_columns;
TableWithColumnNames(const DatabaseAndTableWithAlias & table_, const Names & columns_)
: table(table_)
, columns(columns_)
{}
void addHiddenColumns(const NamesAndTypesList & addition)
{
for (auto & column : addition)
hidden_columns.push_back(column.name);
}
};
std::vector<DatabaseAndTableWithAlias> getDatabaseAndTables(const ASTSelectQuery & select_query, const String & current_database);
std::optional<DatabaseAndTableWithAlias> getDatabaseAndTable(const ASTSelectQuery & select, size_t table_number);

View File

@ -18,11 +18,12 @@ void FindIdentifierBestTableData::visit(ASTIdentifier & identifier, ASTPtr &)
{
for (const auto & table_names : tables)
{
if (std::find(table_names.second.begin(), table_names.second.end(), identifier.name) != table_names.second.end())
auto & columns = table_names.columns;
if (std::find(columns.begin(), columns.end(), identifier.name) != columns.end())
{
// TODO: make sure no collision ever happens
if (!best_table)
best_table = &table_names.first;
best_table = &table_names.table;
}
}
}
@ -30,7 +31,7 @@ void FindIdentifierBestTableData::visit(ASTIdentifier & identifier, ASTPtr &)
{
size_t best_table_pos = 0;
if (IdentifierSemantic::chooseTable(identifier, tables, best_table_pos))
best_table = &tables[best_table_pos].first;
best_table = &tables[best_table_pos].table;
}
identifier_table.emplace_back(&identifier, best_table);

View File

@ -20,7 +20,7 @@ const DatabaseAndTableWithAlias & extractTable(const DatabaseAndTableWithAlias &
const DatabaseAndTableWithAlias & extractTable(const TableWithColumnNames & table)
{
return table.first;
return table.table;
}
template <typename T>

View File

@ -127,8 +127,6 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
/// Create directories for tables metadata.
String path = context.getPath();
String metadata_path = path + "metadata/" + database_name_escaped + "/";
Poco::File(metadata_path).createDirectory();
DatabasePtr database = DatabaseFactory::get(database_name, metadata_path, create.storage, context);
/// Will write file with database metadata, if needed.

View File

@ -1417,8 +1417,14 @@ void InterpreterSelectQuery::executeFetchColumns(
auto column = ColumnAggregateFunction::create(func);
column->insertFrom(place);
auto header = analysis_result.before_aggregation->getSampleBlock();
size_t arguments_size = desc->argument_names.size();
DataTypes argument_types(arguments_size);
for (size_t j = 0; j < arguments_size; ++j)
argument_types[j] = header.getByName(desc->argument_names[j]).type;
Block block_with_count{
{std::move(column), std::make_shared<DataTypeAggregateFunction>(func, DataTypes(), Array()), desc->column_name}};
{std::move(column), std::make_shared<DataTypeAggregateFunction>(func, argument_types, desc->parameters), desc->column_name}};
auto istream = std::make_shared<OneBlockInputStream>(block_with_count);
if constexpr (pipeline_with_processors)

View File

@ -87,7 +87,8 @@ bool PredicateExpressionsOptimizer::optimizeImpl(
/// split predicate with `and`
std::vector<ASTPtr> outer_predicate_expressions = splitConjunctionPredicate(outer_expression);
std::vector<TableWithColumnNames> tables_with_columns = getDatabaseAndTablesWithColumnNames(*ast_select, context);
std::vector<const ASTTableExpression *> table_expressions = getTableExpressions(*ast_select);
std::vector<TableWithColumnNames> tables_with_columns = getDatabaseAndTablesWithColumnNames(table_expressions, context);
bool is_rewrite_subquery = false;
for (auto & outer_predicate : outer_predicate_expressions)

View File

@ -100,43 +100,20 @@ void collectSourceColumns(const ColumnsDescription & columns, NamesAndTypesList
}
}
std::vector<TableWithColumnNames> getTablesWithColumns(const ASTSelectQuery & select_query, const Context & context,
const ASTTablesInSelectQueryElement * table_join_node,
NamesAndTypesList & columns_from_joined_table,
std::function<Names()> get_column_names)
std::vector<TableWithColumnNames> getTablesWithColumns(const std::vector<const ASTTableExpression * > & table_expressions,
const Context & context)
{
std::vector<TableWithColumnNames> tables_with_columns = getDatabaseAndTablesWithColumnNames(select_query, context);
std::vector<TableWithColumnNames> tables_with_columns = getDatabaseAndTablesWithColumnNames(table_expressions, context);
auto & settings = context.getSettingsRef();
if (settings.joined_subquery_requires_alias && tables_with_columns.size() > 1)
{
for (auto & pr : tables_with_columns)
if (pr.first.table.empty() && pr.first.alias.empty())
if (pr.table.table.empty() && pr.table.alias.empty())
throw Exception("Not unique subquery in FROM requires an alias (or joined_subquery_requires_alias=0 to disable restriction).",
ErrorCodes::ALIAS_REQUIRED);
}
TableWithColumnNames joined_table;
if (table_join_node)
{
const auto & joined_expression = table_join_node->table_expression->as<ASTTableExpression &>();
columns_from_joined_table = getColumnsFromTableExpression(joined_expression, context);
joined_table.first = DatabaseAndTableWithAlias(joined_expression, context.getCurrentDatabase());
for (const auto & column : columns_from_joined_table)
joined_table.second.push_back(column.name);
}
/// If empty make table(s) with list of source and joined columns
if (tables_with_columns.empty())
{
tables_with_columns.emplace_back(DatabaseAndTableWithAlias{}, get_column_names());
if (!joined_table.second.empty())
tables_with_columns.emplace_back(std::move(joined_table));
}
return tables_with_columns;
}
@ -859,24 +836,36 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
replaceJoinedTable(table_join_node);
}
auto get_column_names = [&]() -> Names
std::vector<const ASTTableExpression *> table_expressions = getTableExpressions(*select_query);
auto tables_with_columns = getTablesWithColumns(table_expressions, context);
if (tables_with_columns.empty())
{
if (storage)
return storage->getColumns().getOrdinary().getNames();
{
const ColumnsDescription & starage_columns = storage->getColumns();
tables_with_columns.emplace_back(DatabaseAndTableWithAlias{}, starage_columns.getOrdinary().getNames());
auto & table = tables_with_columns.back();
table.addHiddenColumns(starage_columns.getMaterialized());
table.addHiddenColumns(starage_columns.getAliases());
table.addHiddenColumns(starage_columns.getVirtuals());
}
else
{
Names columns;
columns.reserve(result.source_columns.size());
for (const auto & column : result.source_columns)
columns.push_back(column.name);
tables_with_columns.emplace_back(DatabaseAndTableWithAlias{}, columns);
}
}
Names columns;
columns.reserve(result.source_columns.size());
for (const auto & column : result.source_columns)
columns.push_back(column.name);
return columns;
};
auto tables_with_columns = getTablesWithColumns(*select_query, context, table_join_node,
result.analyzed_join->columns_from_joined_table, get_column_names);
if (tables_with_columns.size() > 1)
if (table_expressions.size() > 1)
{
result.analyzed_join->columns_from_joined_table = getColumnsFromTableExpression(*table_expressions[1], context);
result.analyzed_join->deduplicateAndQualifyColumnNames(
source_columns_set, tables_with_columns[1].first.getQualifiedNamePrefix());
source_columns_set, tables_with_columns[1].table.getQualifiedNamePrefix());
}
translateQualifiedNames(query, *select_query, source_columns_set, std::move(tables_with_columns));

View File

@ -31,12 +31,12 @@ namespace ErrorCodes
bool TranslateQualifiedNamesMatcher::Data::unknownColumn(size_t table_pos, const ASTIdentifier & identifier) const
{
const auto & table = tables[table_pos].first;
const auto & table = tables[table_pos].table;
auto nested1 = IdentifierSemantic::extractNestedName(identifier, table.table);
auto nested2 = IdentifierSemantic::extractNestedName(identifier, table.alias);
String short_name = identifier.shortName();
const Names & column_names = tables[table_pos].second;
const Names & column_names = tables[table_pos].columns;
for (auto & known_name : column_names)
{
if (short_name == known_name)
@ -46,6 +46,18 @@ bool TranslateQualifiedNamesMatcher::Data::unknownColumn(size_t table_pos, const
if (nested2 && *nested2 == known_name)
return false;
}
const Names & hidden_names = tables[table_pos].hidden_columns;
for (auto & known_name : hidden_names)
{
if (short_name == known_name)
return false;
if (nested1 && *nested1 == known_name)
return false;
if (nested2 && *nested2 == known_name)
return false;
}
return !column_names.empty();
}
@ -88,7 +100,7 @@ void TranslateQualifiedNamesMatcher::visit(ASTIdentifier & identifier, ASTPtr &,
{
if (data.unknownColumn(table_pos, identifier))
{
String table_name = data.tables[table_pos].first.getQualifiedNamePrefix(false);
String table_name = data.tables[table_pos].table.getQualifiedNamePrefix(false);
throw Exception("There's no column '" + identifier.name + "' in table '" + table_name + "'",
ErrorCodes::UNKNOWN_IDENTIFIER);
}
@ -96,7 +108,7 @@ void TranslateQualifiedNamesMatcher::visit(ASTIdentifier & identifier, ASTPtr &,
IdentifierSemantic::setMembership(identifier, table_pos);
/// In case if column from the joined table are in source columns, change it's name to qualified.
auto & table = data.tables[table_pos].first;
auto & table = data.tables[table_pos].table;
if (table_pos && data.hasColumn(short_name))
IdentifierSemantic::setColumnLongName(identifier, table);
else
@ -128,7 +140,7 @@ void TranslateQualifiedNamesMatcher::visit(const ASTQualifiedAsterisk & , const
DatabaseAndTableWithAlias db_and_table(ident);
for (const auto & known_table : data.tables)
if (db_and_table.satisfies(known_table.first, true))
if (db_and_table.satisfies(known_table.table, true))
return;
throw Exception("Unknown qualified identifier: " + ident->getAliasOrColumnName(), ErrorCodes::UNKNOWN_IDENTIFIER);
@ -216,13 +228,13 @@ void TranslateQualifiedNamesMatcher::visit(ASTExpressionList & node, const ASTPt
if (const auto * asterisk = child->as<ASTAsterisk>())
{
bool first_table = true;
for (const auto & [table, table_columns] : tables_with_columns)
for (const auto & table : tables_with_columns)
{
for (const auto & column_name : table_columns)
for (const auto & column_name : table.columns)
{
if (first_table || !data.join_using_columns.count(column_name))
{
addIdentifier(node.children, table, column_name, AsteriskSemantic::getAliases(*asterisk));
addIdentifier(node.children, table.table, column_name, AsteriskSemantic::getAliases(*asterisk));
}
}
@ -232,13 +244,13 @@ void TranslateQualifiedNamesMatcher::visit(ASTExpressionList & node, const ASTPt
else if (const auto * asterisk_pattern = child->as<ASTColumnsMatcher>())
{
bool first_table = true;
for (const auto & [table, table_columns] : tables_with_columns)
for (const auto & table : tables_with_columns)
{
for (const auto & column_name : table_columns)
for (const auto & column_name : table.columns)
{
if (asterisk_pattern->isColumnMatching(column_name) && (first_table || !data.join_using_columns.count(column_name)))
{
addIdentifier(node.children, table, column_name, AsteriskSemantic::getAliases(*asterisk_pattern));
addIdentifier(node.children, table.table, column_name, AsteriskSemantic::getAliases(*asterisk_pattern));
}
}
@ -249,13 +261,13 @@ void TranslateQualifiedNamesMatcher::visit(ASTExpressionList & node, const ASTPt
{
DatabaseAndTableWithAlias ident_db_and_name(qualified_asterisk->children[0]);
for (const auto & [table, table_columns] : tables_with_columns)
for (const auto & table : tables_with_columns)
{
if (ident_db_and_name.satisfies(table, true))
if (ident_db_and_name.satisfies(table.table, true))
{
for (const auto & column_name : table_columns)
for (const auto & column_name : table.columns)
{
addIdentifier(node.children, table, column_name, AsteriskSemantic::getAliases(*qualified_asterisk));
addIdentifier(node.children, table.table, column_name, AsteriskSemantic::getAliases(*qualified_asterisk));
}
break;
}

View File

@ -72,7 +72,8 @@ ASTPtr extractTableExpression(const ASTSelectQuery & select, size_t table_number
return nullptr;
}
NamesAndTypesList getColumnsFromTableExpression(const ASTTableExpression & table_expression, const Context & context)
static NamesAndTypesList getColumnsFromTableExpression(const ASTTableExpression & table_expression, const Context & context,
NamesAndTypesList & materialized, NamesAndTypesList & aliases, NamesAndTypesList & virtuals)
{
NamesAndTypesList names_and_type_list;
if (table_expression.subquery)
@ -85,34 +86,60 @@ NamesAndTypesList getColumnsFromTableExpression(const ASTTableExpression & table
const auto table_function = table_expression.table_function;
auto query_context = const_cast<Context *>(&context.getQueryContext());
const auto & function_storage = query_context->executeTableFunction(table_function);
names_and_type_list = function_storage->getSampleBlockNonMaterialized().getNamesAndTypesList();
auto & columns = function_storage->getColumns();
names_and_type_list = columns.getOrdinary();
materialized = columns.getMaterialized();
aliases = columns.getAliases();
virtuals = columns.getVirtuals();
}
else if (table_expression.database_and_table_name)
{
DatabaseAndTableWithAlias database_table(table_expression.database_and_table_name);
const auto & table = context.getTable(database_table.database, database_table.table);
names_and_type_list = table->getSampleBlockNonMaterialized().getNamesAndTypesList();
auto & columns = table->getColumns();
names_and_type_list = columns.getOrdinary();
materialized = columns.getMaterialized();
aliases = columns.getAliases();
virtuals = columns.getVirtuals();
}
return names_and_type_list;
}
std::vector<TableWithColumnNames> getDatabaseAndTablesWithColumnNames(const ASTSelectQuery & select_query, const Context & context)
NamesAndTypesList getColumnsFromTableExpression(const ASTTableExpression & table_expression, const Context & context)
{
NamesAndTypesList materialized;
NamesAndTypesList aliases;
NamesAndTypesList virtuals;
return getColumnsFromTableExpression(table_expression, context, materialized, aliases, virtuals);
}
std::vector<TableWithColumnNames> getDatabaseAndTablesWithColumnNames(const std::vector<const ASTTableExpression *> & table_expressions,
const Context & context, bool remove_duplicates)
{
std::vector<TableWithColumnNames> tables_with_columns;
if (select_query.tables() && !select_query.tables()->children.empty())
if (!table_expressions.empty())
{
String current_database = context.getCurrentDatabase();
for (const ASTTableExpression * table_expression : getTableExpressions(select_query))
for (const ASTTableExpression * table_expression : table_expressions)
{
DatabaseAndTableWithAlias table_name(*table_expression, current_database);
NamesAndTypesList names_and_types = getColumnsFromTableExpression(*table_expression, context);
removeDuplicateColumns(names_and_types);
NamesAndTypesList materialized;
NamesAndTypesList aliases;
NamesAndTypesList virtuals;
NamesAndTypesList names_and_types = getColumnsFromTableExpression(*table_expression, context, materialized, aliases, virtuals);
if (remove_duplicates)
removeDuplicateColumns(names_and_types);
tables_with_columns.emplace_back(std::move(table_name), names_and_types.getNames());
auto & table = tables_with_columns.back();
table.addHiddenColumns(materialized);
table.addHiddenColumns(aliases);
table.addHiddenColumns(virtuals);
}
}

View File

@ -17,6 +17,7 @@ const ASTTableExpression * getTableExpression(const ASTSelectQuery & select, siz
ASTPtr extractTableExpression(const ASTSelectQuery & select, size_t table_number);
NamesAndTypesList getColumnsFromTableExpression(const ASTTableExpression & table_expression, const Context & context);
std::vector<TableWithColumnNames> getDatabaseAndTablesWithColumnNames(const ASTSelectQuery & select_query, const Context & context);
std::vector<TableWithColumnNames> getDatabaseAndTablesWithColumnNames(const std::vector<const ASTTableExpression *> & table_expressions,
const Context & context, bool remove_duplicates = true);
}

View File

@ -239,8 +239,7 @@ static const ASTTablesInSelectQueryElement * getFirstTableJoin(const ASTSelectQu
if (!joined_table)
joined_table = &tables_element;
else
throw Exception("Multiple JOIN disabled or does not support the query. "
"'set allow_experimental_multiple_joins_emulation' to enable.", ErrorCodes::NOT_IMPLEMENTED);
throw Exception("Multiple JOIN disabled or does not support the query.", ErrorCodes::NOT_IMPLEMENTED);
}
}

View File

@ -41,6 +41,8 @@ bool ParserTablePropertiesQuery::parseImpl(Pos & pos, ASTPtr & node, Expected &
query = std::make_shared<ASTExistsTableQuery>();
else if (s_dictionary.checkWithoutMoving(pos, expected))
query = std::make_shared<ASTExistsDictionaryQuery>();
else
query = std::make_shared<ASTExistsTableQuery>();
}
else if (s_show.ignore(pos, expected))
{

View File

@ -1,156 +1,101 @@
from contextlib import contextmanager
import time
import pytest
import contextlib
## sudo -H pip install PyMySQL
import pymysql.cursors
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_mysql = True)
create_table_normal_sql_template = """
CREATE TABLE `clickhouse`.`{}` (
`id` int(11) NOT NULL,
`name` varchar(50) NOT NULL,
`age` int NOT NULL default 0,
`money` int NOT NULL default 0,
PRIMARY KEY (`id`)
) ENGINE=InnoDB;
"""
create_table_mysql_style_sql_template = """
CREATE TABLE `clickhouse`.`{}` (
`id` int(11) NOT NULL,
`float` float NOT NULL,
`Float32` float NOT NULL,
`test``name` varchar(50) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB;
"""
drop_table_sql_template = "DROP TABLE `clickhouse`.`{}`"
add_column_sql_template = "ALTER TABLE `clickhouse`.`{}` ADD COLUMN `pid` int(11)"
del_column_sql_template = "ALTER TABLE `clickhouse`.`{}` DROP COLUMN `pid`"
clickhouse_node = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_mysql=True)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
conn = get_mysql_conn()
## create mysql db and table
create_mysql_db(conn, 'clickhouse')
node1.query("CREATE DATABASE clickhouse_mysql ENGINE = MySQL('mysql1:3306', 'clickhouse', 'root', 'clickhouse')")
yield cluster
finally:
cluster.shutdown()
def test_sync_tables_list_between_clickhouse_and_mysql(started_cluster):
mysql_connection = get_mysql_conn()
assert node1.query('SHOW TABLES FROM clickhouse_mysql FORMAT TSV').rstrip() == ''
class MySQLNodeInstance:
def __init__(self, user='root', password='clickhouse', hostname='127.0.0.1', port=3308):
self.user = user
self.port = port
self.hostname = hostname
self.password = password
self.mysql_connection = None # lazy init
create_normal_mysql_table(mysql_connection, 'first_mysql_table')
assert node1.query("SHOW TABLES FROM clickhouse_mysql LIKE 'first_mysql_table' FORMAT TSV").rstrip() == 'first_mysql_table'
def query(self, execution_query):
if self.mysql_connection is None:
self.mysql_connection = pymysql.connect(user=self.user, password=self.password, host=self.hostname, port=self.port)
with self.mysql_connection.cursor() as cursor:
cursor.execute(execution_query)
create_normal_mysql_table(mysql_connection, 'second_mysql_table')
assert node1.query("SHOW TABLES FROM clickhouse_mysql LIKE 'second_mysql_table' FORMAT TSV").rstrip() == 'second_mysql_table'
drop_mysql_table(mysql_connection, 'second_mysql_table')
assert node1.query("SHOW TABLES FROM clickhouse_mysql LIKE 'second_mysql_table' FORMAT TSV").rstrip() == ''
mysql_connection.close()
def test_sync_tables_structure_between_clickhouse_and_mysql(started_cluster):
mysql_connection = get_mysql_conn()
create_normal_mysql_table(mysql_connection, 'test_sync_column')
assert node1.query(
"SELECT name FROM system.columns WHERE table = 'test_sync_column' AND database = 'clickhouse_mysql' AND name = 'pid' ").rstrip() == ''
time.sleep(3)
add_mysql_table_column(mysql_connection, "test_sync_column")
assert node1.query(
"SELECT name FROM system.columns WHERE table = 'test_sync_column' AND database = 'clickhouse_mysql' AND name = 'pid' ").rstrip() == 'pid'
time.sleep(3)
drop_mysql_table_column(mysql_connection, "test_sync_column")
assert node1.query(
"SELECT name FROM system.columns WHERE table = 'test_sync_column' AND database = 'clickhouse_mysql' AND name = 'pid' ").rstrip() == ''
mysql_connection.close()
def test_insert_select(started_cluster):
mysql_connection = get_mysql_conn()
create_normal_mysql_table(mysql_connection, 'test_insert_select')
assert node1.query("SELECT count() FROM `clickhouse_mysql`.{}".format('test_insert_select')).rstrip() == '0'
node1.query("INSERT INTO `clickhouse_mysql`.{}(id, name, money) select number, concat('name_', toString(number)), 3 from numbers(10000) ".format('test_insert_select'))
assert node1.query("SELECT count() FROM `clickhouse_mysql`.{}".format('test_insert_select')).rstrip() == '10000'
assert node1.query("SELECT sum(money) FROM `clickhouse_mysql`.{}".format('test_insert_select')).rstrip() == '30000'
mysql_connection.close()
def test_insert_select_with_mysql_style_table(started_cluster):
mysql_connection = get_mysql_conn()
create_mysql_style_mysql_table(mysql_connection, 'test_mysql``_style_table')
assert node1.query("SELECT count() FROM `clickhouse_mysql`.`{}`".format('test_mysql\`_style_table')).rstrip() == '0'
node1.query("INSERT INTO `clickhouse_mysql`.`{}`(id, `float`, `Float32`, `test\`name`) select number, 3, 3, 'name' from numbers(10000) ".format('test_mysql\`_style_table'))
assert node1.query("SELECT count() FROM `clickhouse_mysql`.`{}`".format('test_mysql\`_style_table')).rstrip() == '10000'
assert node1.query("SELECT sum(`float`) FROM `clickhouse_mysql`.`{}`".format('test_mysql\`_style_table')).rstrip() == '30000'
mysql_connection.close()
def test_table_function(started_cluster):
mysql_connection = get_mysql_conn()
create_normal_mysql_table(mysql_connection, 'table_function')
table_function = get_mysql_table_function_expr('table_function')
assert node1.query("SELECT count() FROM {}".format(table_function)).rstrip() == '0'
node1.query("INSERT INTO {} (id, name, money) select number, concat('name_', toString(number)), 3 from numbers(10000)".format('TABLE FUNCTION ' + table_function))
assert node1.query("SELECT count() FROM {}".format(table_function)).rstrip() == '10000'
assert node1.query("SELECT sum(c) FROM ("
"SELECT count() as c FROM {} WHERE id % 3 == 0"
" UNION ALL SELECT count() as c FROM {} WHERE id % 3 == 1"
" UNION ALL SELECT count() as c FROM {} WHERE id % 3 == 2)".format(table_function, table_function, table_function)).rstrip() == '10000'
assert node1.query("SELECT sum(`money`) FROM {}".format(table_function)).rstrip() == '30000'
mysql_connection.close()
def close(self):
if self.mysql_connection is not None:
self.mysql_connection.close()
def get_mysql_conn():
conn = pymysql.connect(user='root', password='clickhouse', host='127.0.0.1', port=3308)
return conn
def test_mysql_ddl_for_mysql_database(started_cluster):
with contextlib.closing(MySQLNodeInstance('root', 'clickhouse', '127.0.0.1', port=3308)) as mysql_node:
mysql_node.query("CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'")
def get_mysql_table_function_expr(table_name):
return "mysql('mysql1:3306', 'clickhouse', '{}', 'root', 'clickhouse')".format(table_name)
clickhouse_node.query("CREATE DATABASE test_database ENGINE = MySQL('mysql1:3306', 'test_database', 'root', 'clickhouse')")
assert 'test_database' in clickhouse_node.query('SHOW DATABASES')
def create_mysql_db(conn, name):
with conn.cursor() as cursor:
cursor.execute(
"CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(name))
mysql_node.query('CREATE TABLE `test_database`.`test_table` ( `id` int(11) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB;')
assert 'test_table' in clickhouse_node.query('SHOW TABLES FROM test_database')
def create_normal_mysql_table(conn, table_name):
with conn.cursor() as cursor:
cursor.execute(create_table_normal_sql_template.format(table_name))
time.sleep(3) # Because the unit of MySQL modification time is seconds, modifications made in the same second cannot be obtained
mysql_node.query('ALTER TABLE `test_database`.`test_table` ADD COLUMN `add_column` int(11)')
assert 'add_column' in clickhouse_node.query("SELECT name FROM system.columns WHERE table = 'test_table' AND database = 'test_database'")
def create_mysql_style_mysql_table(conn, table_name):
with conn.cursor() as cursor:
cursor.execute(create_table_mysql_style_sql_template.format(table_name))
time.sleep(3) # Because the unit of MySQL modification time is seconds, modifications made in the same second cannot be obtained
mysql_node.query('ALTER TABLE `test_database`.`test_table` DROP COLUMN `add_column`')
assert 'add_column' not in clickhouse_node.query("SELECT name FROM system.columns WHERE table = 'test_table' AND database = 'test_database'")
def drop_mysql_table(conn, table_name):
with conn.cursor() as cursor:
cursor.execute(drop_table_sql_template.format(table_name))
mysql_node.query('DROP TABLE `test_database`.`test_table`;')
assert 'test_table' not in clickhouse_node.query('SHOW TABLES FROM test_database')
def add_mysql_table_column(conn, table_name):
with conn.cursor() as cursor:
cursor.execute(add_column_sql_template.format(table_name))
clickhouse_node.query("DROP DATABASE test_database")
assert 'test_database' not in clickhouse_node.query('SHOW DATABASES')
def drop_mysql_table_column(conn, table_name):
with conn.cursor() as cursor:
cursor.execute(del_column_sql_template.format(table_name))
mysql_node.query("DROP DATABASE test_database")
def test_clickhouse_ddl_for_mysql_database(started_cluster):
with contextlib.closing(MySQLNodeInstance('root', 'clickhouse', '127.0.0.1', port=3308)) as mysql_node:
mysql_node.query("CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'")
mysql_node.query('CREATE TABLE `test_database`.`test_table` ( `id` int(11) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB;')
clickhouse_node.query("CREATE DATABASE test_database ENGINE = MySQL('mysql1:3306', 'test_database', 'root', 'clickhouse')")
assert 'test_table' in clickhouse_node.query('SHOW TABLES FROM test_database')
clickhouse_node.query("DROP TABLE test_database.test_table")
assert 'test_table' not in clickhouse_node.query('SHOW TABLES FROM test_database')
clickhouse_node.query("ATTACH TABLE test_database.test_table")
assert 'test_table' in clickhouse_node.query('SHOW TABLES FROM test_database')
clickhouse_node.query("DETACH TABLE test_database.test_table")
assert 'test_table' not in clickhouse_node.query('SHOW TABLES FROM test_database')
clickhouse_node.query("ATTACH TABLE test_database.test_table")
assert 'test_table' in clickhouse_node.query('SHOW TABLES FROM test_database')
clickhouse_node.query("DROP DATABASE test_database")
assert 'test_database' not in clickhouse_node.query('SHOW DATABASES')
mysql_node.query("DROP DATABASE test_database")
def test_clickhouse_dml_for_mysql_database(started_cluster):
with contextlib.closing(MySQLNodeInstance('root', 'clickhouse', '127.0.0.1', port=3308)) as mysql_node:
mysql_node.query("CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'")
mysql_node.query('CREATE TABLE `test_database`.`test_table` ( `i``d` int(11) NOT NULL, PRIMARY KEY (`i``d`)) ENGINE=InnoDB;')
clickhouse_node.query("CREATE DATABASE test_database ENGINE = MySQL('mysql1:3306', 'test_database', 'root', 'clickhouse')")
assert clickhouse_node.query("SELECT count() FROM `test_database`.`test_table`").rstrip() == '0'
clickhouse_node.query("INSERT INTO `test_database`.`test_table`(`i\`d`) select number from numbers(10000)")
assert clickhouse_node.query("SELECT count() FROM `test_database`.`test_table`").rstrip() == '10000'
mysql_node.query("DROP DATABASE test_database")

View File

@ -9,8 +9,8 @@ from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_mysql = True)
create_table_sql_template = """
node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_mysql=True)
create_table_sql_template = """
CREATE TABLE `clickhouse`.`{}` (
`id` int(11) NOT NULL,
`name` varchar(50) NOT NULL,
@ -19,6 +19,7 @@ create_table_sql_template = """
PRIMARY KEY (`id`)) ENGINE=InnoDB;
"""
@pytest.fixture(scope="module")
def started_cluster():
try:
@ -76,6 +77,7 @@ CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL
assert node1.query("SELECT sum(money) FROM {}".format(table_name)).rstrip() == '60000'
conn.close()
def test_where(started_cluster):
table_name = 'test_where'
conn = get_mysql_conn()
@ -92,15 +94,35 @@ CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL
assert node1.query("SELECT count() FROM {} WHERE name LIKE concat('name_', toString(1))".format(table_name)).rstrip() == '1'
conn.close()
def test_table_function(started_cluster):
conn = get_mysql_conn()
create_mysql_table(conn, 'table_function')
table_function = "mysql('mysql1:3306', 'clickhouse', '{}', 'root', 'clickhouse')".format('table_function')
assert node1.query("SELECT count() FROM {}".format(table_function)).rstrip() == '0'
node1.query("INSERT INTO {} (id, name, money) select number, concat('name_', toString(number)), 3 from numbers(10000)".format(
'TABLE FUNCTION ' + table_function))
assert node1.query("SELECT count() FROM {}".format(table_function)).rstrip() == '10000'
assert node1.query("SELECT sum(c) FROM ("
"SELECT count() as c FROM {} WHERE id % 3 == 0"
" UNION ALL SELECT count() as c FROM {} WHERE id % 3 == 1"
" UNION ALL SELECT count() as c FROM {} WHERE id % 3 == 2)".format(table_function, table_function,
table_function)).rstrip() == '10000'
assert node1.query("SELECT sum(`money`) FROM {}".format(table_function)).rstrip() == '30000'
conn.close()
def get_mysql_conn():
conn = pymysql.connect(user='root', password='clickhouse', host='127.0.0.1', port=3308)
return conn
def create_mysql_db(conn, name):
with conn.cursor() as cursor:
cursor.execute(
"CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(name))
def create_mysql_table(conn, tableName):
with conn.cursor() as cursor:
cursor.execute(create_table_sql_template.format(tableName))

View File

@ -0,0 +1,9 @@
DROP TABLE IF EXISTS test_count;
CREATE TABLE test_count (`pt` Date) ENGINE = MergeTree PARTITION BY pt ORDER BY pt SETTINGS index_granularity = 8192;
INSERT INTO test_count values ('2019-12-12');
SELECT count(1) FROM remote('127.0.0.{1,1,2}', currentDatabase(), test_count);
DROP TABLE test_count;

View File

@ -0,0 +1,5 @@
2010-01-01 00:00:00
2010-01-01 00:00:00
2010-01-01 00:00:00
2010-01-01 00:00:00
2010-01-01 00:00:00

View File

@ -0,0 +1,18 @@
DROP TABLE IF EXISTS requests;
CREATE TABLE requests (
event_time DateTime,
event_date Date MATERIALIZED toDate(event_time),
event_tm DateTime ALIAS event_time
) ENGINE = MergeTree ORDER BY (event_time);
INSERT INTO requests (event_time) VALUES ('2010-01-01 00:00:00');
select * from requests where event_date > '2000-01-01';
select * from requests as t where t.event_date > '2000-01-01';
select * from requests as "t" where "t".event_date > '2000-01-01';
select * from requests as t where t.event_tm > toDate('2000-01-01');
select * from requests as `t` where `t`.event_tm > toDate('2000-01-01');
DROP TABLE requests;

View File

@ -0,0 +1,24 @@
0
0
0
0
0
0
1
1
0
0
0
0
0
0
0
1
1
1
0
0
0
0
0
0

View File

@ -0,0 +1,44 @@
EXISTS database_for_dict.t;
EXISTS TABLE database_for_dict.t;
EXISTS DICTIONARY database_for_dict.t;
DROP DATABASE IF EXISTS database_for_dict;
CREATE DATABASE database_for_dict Engine = Ordinary;
DROP TABLE IF EXISTS database_for_dict.t;
EXISTS database_for_dict.t;
EXISTS TABLE database_for_dict.t;
EXISTS DICTIONARY database_for_dict.t;
CREATE TABLE database_for_dict.t (x UInt8) ENGINE = Memory;
EXISTS database_for_dict.t;
EXISTS TABLE database_for_dict.t;
EXISTS DICTIONARY database_for_dict.t;
DROP TABLE database_for_dict.t;
EXISTS database_for_dict.t;
EXISTS TABLE database_for_dict.t;
EXISTS DICTIONARY database_for_dict.t;
DROP DICTIONARY IF EXISTS t;
CREATE TEMPORARY TABLE t (x UInt8);
EXISTS t; -- Does not work for temporary tables. Maybe have to fix.
EXISTS TABLE t;
EXISTS DICTIONARY t;
CREATE DICTIONARY database_for_dict.t (k UInt64, v String) PRIMARY KEY k LAYOUT(FLAT()) SOURCE(HTTP(URL 'http://example.test/' FORMAT TSV)) LIFETIME(1000);
EXISTS database_for_dict.t;
EXISTS TABLE database_for_dict.t; -- Dictionaries are tables as well. But not all tables are dictionaries.
EXISTS DICTIONARY database_for_dict.t;
-- But dictionary-tables cannot be dropped as usual tables.
DROP TABLE database_for_dict.t; -- { serverError 60 }
DROP DICTIONARY database_for_dict.t;
EXISTS database_for_dict.t;
EXISTS TABLE database_for_dict.t;
EXISTS DICTIONARY database_for_dict.t;
DROP DATABASE database_for_dict;
EXISTS database_for_dict.t;
EXISTS TABLE database_for_dict.t;
EXISTS DICTIONARY database_for_dict.t;

View File

@ -6,8 +6,6 @@ The `MySQL` database engine translate queries to the MySQL server so you can per
You cannot perform the following queries:
- `ATTACH`/`DETACH`
- `DROP`
- `RENAME`
- `CREATE TABLE`
- `ALTER`

View File

@ -137,7 +137,7 @@ Limits the maximum number of HTTP GET redirect hops for [URL](../table_engines/u
Possible values:
- Positive integer number of hops.
- 0 — Unlimited number of hops.
- 0 — No hops allowed.
Default value: 0.