Merge pull request #14849 from ClickHouse/allow_atomic_database_inside_materialize_mysql

Allow Atomic database inside MaterializeMySQL
This commit is contained in:
tavplubix 2020-12-14 16:27:13 +03:00 committed by GitHub
commit dd2ae6926d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 367 additions and 296 deletions

View File

@ -35,8 +35,8 @@ public:
};
DatabaseAtomic::DatabaseAtomic(String name_, String metadata_path_, UUID uuid, Context & context_)
: DatabaseOrdinary(name_, std::move(metadata_path_), "store/", "DatabaseAtomic (" + name_ + ")", context_)
DatabaseAtomic::DatabaseAtomic(String name_, String metadata_path_, UUID uuid, const String & logger_name, const Context & context_)
: DatabaseOrdinary(name_, std::move(metadata_path_), "store/", logger_name, context_)
, path_to_table_symlinks(global_context.getPath() + "data/" + escapeForFileName(name_) + "/")
, path_to_metadata_symlink(global_context.getPath() + "metadata/" + escapeForFileName(name_))
, db_uuid(uuid)
@ -46,6 +46,11 @@ DatabaseAtomic::DatabaseAtomic(String name_, String metadata_path_, UUID uuid, C
tryCreateMetadataSymlink();
}
DatabaseAtomic::DatabaseAtomic(String name_, String metadata_path_, UUID uuid, const Context & context_)
: DatabaseAtomic(name_, std::move(metadata_path_), uuid, "DatabaseAtomic (" + name_ + ")", context_)
{
}
String DatabaseAtomic::getTableDataPath(const String & table_name) const
{
std::lock_guard lock(mutex);

View File

@ -20,8 +20,8 @@ namespace DB
class DatabaseAtomic : public DatabaseOrdinary
{
public:
DatabaseAtomic(String name_, String metadata_path_, UUID uuid, Context & context_);
DatabaseAtomic(String name_, String metadata_path_, UUID uuid, const String & logger_name, const Context & context_);
DatabaseAtomic(String name_, String metadata_path_, UUID uuid, const Context & context_);
String getEngineName() const override { return "Atomic"; }
UUID getUUID() const override { return db_uuid; }
@ -51,14 +51,14 @@ public:
void loadStoredObjects(Context & context, bool has_force_restore_data_flag, bool force_attach) override;
/// Atomic database cannot be detached if there is detached table which still in use
void assertCanBeDetached(bool cleanup);
void assertCanBeDetached(bool cleanup) override;
UUID tryGetTableUUID(const String & table_name) const override;
void tryCreateSymlink(const String & table_name, const String & actual_data_path, bool if_data_path_exist = false);
void tryRemoveSymlink(const String & table_name);
void waitDetachedTableNotInUse(const UUID & uuid);
void waitDetachedTableNotInUse(const UUID & uuid) override;
private:
void commitAlterTable(const StorageID & table_id, const String & table_metadata_tmp_path, const String & table_metadata_path) override;

View File

@ -120,27 +120,32 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
const auto & [remote_host_name, remote_port] = parseAddress(host_name_and_port, 3306);
auto mysql_pool = mysqlxx::Pool(mysql_database_name, remote_host_name, mysql_user_name, mysql_user_password, remote_port);
if (engine_name == "MaterializeMySQL")
if (engine_name == "MySQL")
{
MySQLClient client(remote_host_name, remote_port, mysql_user_name, mysql_user_password);
auto mysql_database_settings = std::make_unique<ConnectionMySQLSettings>();
auto materialize_mode_settings = std::make_unique<MaterializeMySQLSettings>();
mysql_database_settings->loadFromQueryContext(context);
mysql_database_settings->loadFromQuery(*engine_define); /// higher priority
if (engine_define->settings)
materialize_mode_settings->loadFromQuery(*engine_define);
return std::make_shared<DatabaseMaterializeMySQL>(
context, database_name, metadata_path, engine_define, mysql_database_name, std::move(mysql_pool), std::move(client)
, std::move(materialize_mode_settings));
return std::make_shared<DatabaseConnectionMySQL>(
context, database_name, metadata_path, engine_define, mysql_database_name, std::move(mysql_database_settings), std::move(mysql_pool));
}
auto mysql_database_settings = std::make_unique<ConnectionMySQLSettings>();
MySQLClient client(remote_host_name, remote_port, mysql_user_name, mysql_user_password);
mysql_database_settings->loadFromQueryContext(context);
mysql_database_settings->loadFromQuery(*engine_define); /// higher priority
auto materialize_mode_settings = std::make_unique<MaterializeMySQLSettings>();
return std::make_shared<DatabaseConnectionMySQL>(
context, database_name, metadata_path, engine_define, mysql_database_name, std::move(mysql_database_settings), std::move(mysql_pool));
if (engine_define->settings)
materialize_mode_settings->loadFromQuery(*engine_define);
if (create.uuid == UUIDHelpers::Nil)
return std::make_shared<DatabaseMaterializeMySQL<DatabaseOrdinary>>(
context, database_name, metadata_path, uuid, mysql_database_name, std::move(mysql_pool), std::move(client)
, std::move(materialize_mode_settings));
else
return std::make_shared<DatabaseMaterializeMySQL<DatabaseAtomic>>(
context, database_name, metadata_path, uuid, mysql_database_name, std::move(mysql_pool), std::move(client)
, std::move(materialize_mode_settings));
}
catch (...)
{

View File

@ -400,7 +400,7 @@ void DatabaseOnDisk::iterateMetadataFiles(const Context & context, const Iterati
{
auto process_tmp_drop_metadata_file = [&](const String & file_name)
{
assert(getEngineName() != "Atomic");
assert(getUUID() == UUIDHelpers::Nil);
static const char * tmp_drop_ext = ".sql.tmp_drop";
const std::string object_name = file_name.substr(0, file_name.size() - strlen(tmp_drop_ext));
if (Poco::File(context.getPath() + getDataPath() + '/' + object_name).exists())

View File

@ -80,7 +80,7 @@ StoragePtr DatabaseWithOwnTablesBase::detachTableUnlocked(const String & table_n
auto table_id = res->getStorageID();
if (table_id.hasUUID())
{
assert(database_name == DatabaseCatalog::TEMPORARY_DATABASE || getEngineName() == "Atomic");
assert(database_name == DatabaseCatalog::TEMPORARY_DATABASE || getUUID() != UUIDHelpers::Nil);
DatabaseCatalog::instance().removeUUIDMapping(table_id.uuid);
}
@ -102,7 +102,7 @@ void DatabaseWithOwnTablesBase::attachTableUnlocked(const String & table_name, c
if (table_id.hasUUID())
{
assert(database_name == DatabaseCatalog::TEMPORARY_DATABASE || getEngineName() == "Atomic");
assert(database_name == DatabaseCatalog::TEMPORARY_DATABASE || getUUID() != UUIDHelpers::Nil);
DatabaseCatalog::instance().addUUIDMapping(table_id.uuid, shared_from_this(), table);
}
@ -131,7 +131,7 @@ void DatabaseWithOwnTablesBase::shutdown()
kv.second->shutdown();
if (table_id.hasUUID())
{
assert(getDatabaseName() == DatabaseCatalog::TEMPORARY_DATABASE || getEngineName() == "Atomic");
assert(getDatabaseName() == DatabaseCatalog::TEMPORARY_DATABASE || getUUID() != UUIDHelpers::Nil);
DatabaseCatalog::instance().removeUUIDMapping(table_id.uuid);
}
}

View File

@ -334,6 +334,10 @@ public:
/// All tables and dictionaries should be detached before detaching the database.
virtual bool shouldBeEmptyOnDetach() const { return true; }
virtual void assertCanBeDetached(bool /*cleanup*/) {}
virtual void waitDetachedTableNotInUse(const UUID & /*uuid*/) { assert(false); }
/// Ask all tables to complete the background threads they are using and delete all table objects.
virtual void shutdown() = 0;

View File

@ -8,6 +8,7 @@
# include <Interpreters/Context.h>
# include <Databases/DatabaseOrdinary.h>
# include <Databases/DatabaseAtomic.h>
# include <Databases/MySQL/DatabaseMaterializeTablesIterator.h>
# include <Databases/MySQL/MaterializeMySQLSyncThread.h>
# include <Parsers/ASTCreateQuery.h>
@ -22,21 +23,37 @@ namespace DB
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int LOGICAL_ERROR;
}
DatabaseMaterializeMySQL::DatabaseMaterializeMySQL(
const Context & context, const String & database_name_, const String & metadata_path_, const IAST * database_engine_define_
, const String & mysql_database_name_, mysqlxx::Pool && pool_, MySQLClient && client_, std::unique_ptr<MaterializeMySQLSettings> settings_)
: IDatabase(database_name_), global_context(context.getGlobalContext()), engine_define(database_engine_define_->clone())
, nested_database(std::make_shared<DatabaseOrdinary>(database_name_, metadata_path_, context))
, settings(std::move(settings_)), log(&Poco::Logger::get("DatabaseMaterializeMySQL"))
template<>
DatabaseMaterializeMySQL<DatabaseOrdinary>::DatabaseMaterializeMySQL(
const Context & context, const String & database_name_, const String & metadata_path_, UUID /*uuid*/,
const String & mysql_database_name_, mysqlxx::Pool && pool_, MySQLClient && client_, std::unique_ptr<MaterializeMySQLSettings> settings_)
: DatabaseOrdinary(database_name_
, metadata_path_
, "data/" + escapeForFileName(database_name_) + "/"
, "DatabaseMaterializeMySQL<Ordinary> (" + database_name_ + ")", context
)
, settings(std::move(settings_))
, materialize_thread(context, database_name_, mysql_database_name_, std::move(pool_), std::move(client_), settings.get())
{
}
void DatabaseMaterializeMySQL::rethrowExceptionIfNeed() const
template<>
DatabaseMaterializeMySQL<DatabaseAtomic>::DatabaseMaterializeMySQL(
const Context & context, const String & database_name_, const String & metadata_path_, UUID uuid,
const String & mysql_database_name_, mysqlxx::Pool && pool_, MySQLClient && client_, std::unique_ptr<MaterializeMySQLSettings> settings_)
: DatabaseAtomic(database_name_, metadata_path_, uuid, "DatabaseMaterializeMySQL<Atomic> (" + database_name_ + ")", context)
, settings(std::move(settings_))
, materialize_thread(context, database_name_, mysql_database_name_, std::move(pool_), std::move(client_), settings.get())
{
std::unique_lock<std::mutex> lock(mutex);
}
template<typename Base>
void DatabaseMaterializeMySQL<Base>::rethrowExceptionIfNeed() const
{
std::unique_lock<std::mutex> lock(Base::mutex);
if (!settings->allows_query_when_mysql_lost && exception)
{
@ -46,129 +63,71 @@ void DatabaseMaterializeMySQL::rethrowExceptionIfNeed() const
}
catch (Exception & ex)
{
/// This method can be called from multiple threads
/// and Exception can be modified concurrently by calling addMessage(...),
/// so we rethrow a copy.
throw Exception(ex);
}
}
}
void DatabaseMaterializeMySQL::setException(const std::exception_ptr & exception_)
template<typename Base>
void DatabaseMaterializeMySQL<Base>::setException(const std::exception_ptr & exception_)
{
std::unique_lock<std::mutex> lock(mutex);
std::unique_lock<std::mutex> lock(Base::mutex);
exception = exception_;
}
ASTPtr DatabaseMaterializeMySQL::getCreateDatabaseQuery() const
{
const auto & create_query = std::make_shared<ASTCreateQuery>();
create_query->database = database_name;
create_query->set(create_query->storage, engine_define);
return create_query;
}
void DatabaseMaterializeMySQL::loadStoredObjects(Context & context, bool has_force_restore_data_flag, bool force_attach)
template<typename Base>
void DatabaseMaterializeMySQL<Base>::loadStoredObjects(Context & context, bool has_force_restore_data_flag, bool force_attach)
{
Base::loadStoredObjects(context, has_force_restore_data_flag, force_attach);
try
{
std::unique_lock<std::mutex> lock(mutex);
nested_database->loadStoredObjects(context, has_force_restore_data_flag, force_attach);
materialize_thread.startSynchronization();
started_up = true;
}
catch (...)
{
tryLogCurrentException(log, "Cannot load MySQL nested database stored objects.");
tryLogCurrentException(Base::log, "Cannot load MySQL nested database stored objects.");
if (!force_attach)
throw;
}
}
void DatabaseMaterializeMySQL::shutdown()
template<typename Base>
void DatabaseMaterializeMySQL<Base>::createTable(const Context & context, const String & name, const StoragePtr & table, const ASTPtr & query)
{
materialize_thread.stopSynchronization();
auto iterator = nested_database->getTablesIterator(global_context, {});
/// We only shutdown the table, The tables is cleaned up when destructed database
for (; iterator->isValid(); iterator->next())
iterator->table()->shutdown();
assertCalledFromSyncThreadOrDrop("create table");
Base::createTable(context, name, table, query);
}
bool DatabaseMaterializeMySQL::empty() const
template<typename Base>
void DatabaseMaterializeMySQL<Base>::dropTable(const Context & context, const String & name, bool no_delay)
{
return nested_database->empty();
assertCalledFromSyncThreadOrDrop("drop table");
Base::dropTable(context, name, no_delay);
}
String DatabaseMaterializeMySQL::getDataPath() const
template<typename Base>
void DatabaseMaterializeMySQL<Base>::attachTable(const String & name, const StoragePtr & table, const String & relative_table_path)
{
return nested_database->getDataPath();
assertCalledFromSyncThreadOrDrop("attach table");
Base::attachTable(name, table, relative_table_path);
}
String DatabaseMaterializeMySQL::getMetadataPath() const
template<typename Base>
StoragePtr DatabaseMaterializeMySQL<Base>::detachTable(const String & name)
{
return nested_database->getMetadataPath();
assertCalledFromSyncThreadOrDrop("detach table");
return Base::detachTable(name);
}
String DatabaseMaterializeMySQL::getTableDataPath(const String & table_name) const
template<typename Base>
void DatabaseMaterializeMySQL<Base>::renameTable(const Context & context, const String & name, IDatabase & to_database, const String & to_name, bool exchange, bool dictionary)
{
return nested_database->getTableDataPath(table_name);
}
String DatabaseMaterializeMySQL::getTableDataPath(const ASTCreateQuery & query) const
{
return nested_database->getTableDataPath(query);
}
String DatabaseMaterializeMySQL::getObjectMetadataPath(const String & table_name) const
{
return nested_database->getObjectMetadataPath(table_name);
}
UUID DatabaseMaterializeMySQL::tryGetTableUUID(const String & table_name) const
{
return nested_database->tryGetTableUUID(table_name);
}
time_t DatabaseMaterializeMySQL::getObjectMetadataModificationTime(const String & name) const
{
return nested_database->getObjectMetadataModificationTime(name);
}
void DatabaseMaterializeMySQL::createTable(const Context & context, const String & name, const StoragePtr & table, const ASTPtr & query)
{
if (!MaterializeMySQLSyncThread::isMySQLSyncThread())
throw Exception("MaterializeMySQL database not support create table.", ErrorCodes::NOT_IMPLEMENTED);
nested_database->createTable(context, name, table, query);
}
void DatabaseMaterializeMySQL::dropTable(const Context & context, const String & name, bool no_delay)
{
if (!MaterializeMySQLSyncThread::isMySQLSyncThread())
throw Exception("MaterializeMySQL database not support drop table.", ErrorCodes::NOT_IMPLEMENTED);
nested_database->dropTable(context, name, no_delay);
}
void DatabaseMaterializeMySQL::attachTable(const String & name, const StoragePtr & table, const String & relative_table_path)
{
if (!MaterializeMySQLSyncThread::isMySQLSyncThread())
throw Exception("MaterializeMySQL database not support attach table.", ErrorCodes::NOT_IMPLEMENTED);
nested_database->attachTable(name, table, relative_table_path);
}
StoragePtr DatabaseMaterializeMySQL::detachTable(const String & name)
{
if (!MaterializeMySQLSyncThread::isMySQLSyncThread())
throw Exception("MaterializeMySQL database not support detach table.", ErrorCodes::NOT_IMPLEMENTED);
return nested_database->detachTable(name);
}
void DatabaseMaterializeMySQL::renameTable(const Context & context, const String & name, IDatabase & to_database, const String & to_name, bool exchange, bool dictionary)
{
if (!MaterializeMySQLSyncThread::isMySQLSyncThread())
throw Exception("MaterializeMySQL database not support rename table.", ErrorCodes::NOT_IMPLEMENTED);
assertCalledFromSyncThreadOrDrop("rename table");
if (exchange)
throw Exception("MaterializeMySQL database not support exchange table.", ErrorCodes::NOT_IMPLEMENTED);
@ -176,57 +135,37 @@ void DatabaseMaterializeMySQL::renameTable(const Context & context, const String
if (dictionary)
throw Exception("MaterializeMySQL database not support rename dictionary.", ErrorCodes::NOT_IMPLEMENTED);
if (to_database.getDatabaseName() != getDatabaseName())
if (to_database.getDatabaseName() != Base::getDatabaseName())
throw Exception("Cannot rename with other database for MaterializeMySQL database.", ErrorCodes::NOT_IMPLEMENTED);
nested_database->renameTable(context, name, *nested_database, to_name, exchange, dictionary);
Base::renameTable(context, name, *this, to_name, exchange, dictionary);
}
void DatabaseMaterializeMySQL::alterTable(const Context & context, const StorageID & table_id, const StorageInMemoryMetadata & metadata)
template<typename Base>
void DatabaseMaterializeMySQL<Base>::alterTable(const Context & context, const StorageID & table_id, const StorageInMemoryMetadata & metadata)
{
if (!MaterializeMySQLSyncThread::isMySQLSyncThread())
throw Exception("MaterializeMySQL database not support alter table.", ErrorCodes::NOT_IMPLEMENTED);
nested_database->alterTable(context, table_id, metadata);
assertCalledFromSyncThreadOrDrop("alter table");
Base::alterTable(context, table_id, metadata);
}
bool DatabaseMaterializeMySQL::shouldBeEmptyOnDetach() const
template<typename Base>
void DatabaseMaterializeMySQL<Base>::drop(const Context & context)
{
return false;
/// Remove metadata info
Poco::File metadata(Base::getMetadataPath() + "/.metadata");
if (metadata.exists())
metadata.remove(false);
Base::drop(context);
}
void DatabaseMaterializeMySQL::drop(const Context & context)
{
if (nested_database->shouldBeEmptyOnDetach())
{
for (auto iterator = nested_database->getTablesIterator(context, {}); iterator->isValid(); iterator->next())
{
TableExclusiveLockHolder table_lock = iterator->table()->lockExclusively(
context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
nested_database->dropTable(context, iterator->name(), true);
}
/// Remove metadata info
Poco::File metadata(getMetadataPath() + "/.metadata");
if (metadata.exists())
metadata.remove(false);
}
nested_database->drop(context);
}
bool DatabaseMaterializeMySQL::isTableExist(const String & name, const Context & context) const
{
return nested_database->isTableExist(name, context);
}
StoragePtr DatabaseMaterializeMySQL::tryGetTable(const String & name, const Context & context) const
template<typename Base>
StoragePtr DatabaseMaterializeMySQL<Base>::tryGetTable(const String & name, const Context & context) const
{
if (!MaterializeMySQLSyncThread::isMySQLSyncThread())
{
StoragePtr nested_storage = nested_database->tryGetTable(name, context);
StoragePtr nested_storage = Base::tryGetTable(name, context);
if (!nested_storage)
return {};
@ -234,20 +173,71 @@ StoragePtr DatabaseMaterializeMySQL::tryGetTable(const String & name, const Cont
return std::make_shared<StorageMaterializeMySQL>(std::move(nested_storage), this);
}
return nested_database->tryGetTable(name, context);
return Base::tryGetTable(name, context);
}
DatabaseTablesIteratorPtr DatabaseMaterializeMySQL::getTablesIterator(const Context & context, const FilterByNameFunction & filter_by_table_name)
template<typename Base>
DatabaseTablesIteratorPtr DatabaseMaterializeMySQL<Base>::getTablesIterator(const Context & context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name)
{
if (!MaterializeMySQLSyncThread::isMySQLSyncThread())
{
DatabaseTablesIteratorPtr iterator = nested_database->getTablesIterator(context, filter_by_table_name);
DatabaseTablesIteratorPtr iterator = Base::getTablesIterator(context, filter_by_table_name);
return std::make_unique<DatabaseMaterializeTablesIterator>(std::move(iterator), this);
}
return nested_database->getTablesIterator(context, filter_by_table_name);
return Base::getTablesIterator(context, filter_by_table_name);
}
template<typename Base>
void DatabaseMaterializeMySQL<Base>::assertCalledFromSyncThreadOrDrop(const char * method) const
{
if (!MaterializeMySQLSyncThread::isMySQLSyncThread() && started_up)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "MaterializeMySQL database not support {}", method);
}
template<typename Base>
void DatabaseMaterializeMySQL<Base>::shutdownSynchronizationThread()
{
materialize_thread.stopSynchronization();
started_up = false;
}
template<typename Database, template<class> class Helper, typename... Args>
auto castToMaterializeMySQLAndCallHelper(Database * database, Args && ... args)
{
using Ordinary = DatabaseMaterializeMySQL<DatabaseOrdinary>;
using Atomic = DatabaseMaterializeMySQL<DatabaseAtomic>;
using ToOrdinary = typename std::conditional_t<std::is_const_v<Database>, const Ordinary *, Ordinary *>;
using ToAtomic = typename std::conditional_t<std::is_const_v<Database>, const Atomic *, Atomic *>;
if (auto * database_materialize = typeid_cast<ToOrdinary>(database))
return (database_materialize->*Helper<Ordinary>::v)(std::forward<Args>(args)...);
if (auto * database_materialize = typeid_cast<ToAtomic>(database))
return (database_materialize->*Helper<Atomic>::v)(std::forward<Args>(args)...);
throw Exception("LOGICAL_ERROR: cannot cast to DatabaseMaterializeMySQL, it is a bug.", ErrorCodes::LOGICAL_ERROR);
}
template<typename T> struct HelperSetException { static constexpr auto v = &T::setException; };
void setSynchronizationThreadException(const DatabasePtr & materialize_mysql_db, const std::exception_ptr & exception)
{
castToMaterializeMySQLAndCallHelper<IDatabase, HelperSetException>(materialize_mysql_db.get(), exception);
}
template<typename T> struct HelperStopSync { static constexpr auto v = &T::shutdownSynchronizationThread; };
void stopDatabaseSynchronization(const DatabasePtr & materialize_mysql_db)
{
castToMaterializeMySQLAndCallHelper<IDatabase, HelperStopSync>(materialize_mysql_db.get());
}
template<typename T> struct HelperRethrow { static constexpr auto v = &T::rethrowExceptionIfNeed; };
void rethrowSyncExceptionIfNeed(const IDatabase * materialize_mysql_db)
{
castToMaterializeMySQLAndCallHelper<const IDatabase, HelperRethrow>(materialize_mysql_db);
}
template class DatabaseMaterializeMySQL<DatabaseOrdinary>;
template class DatabaseMaterializeMySQL<DatabaseAtomic>;
}
#endif

View File

@ -17,48 +17,34 @@ namespace DB
*
* All table structure and data will be written to the local file system
*/
class DatabaseMaterializeMySQL : public IDatabase
template<typename Base>
class DatabaseMaterializeMySQL : public Base
{
public:
DatabaseMaterializeMySQL(
const Context & context, const String & database_name_, const String & metadata_path_,
const IAST * database_engine_define_, const String & mysql_database_name_, mysqlxx::Pool && pool_,
const Context & context, const String & database_name_, const String & metadata_path_, UUID uuid,
const String & mysql_database_name_, mysqlxx::Pool && pool_,
MySQLClient && client_, std::unique_ptr<MaterializeMySQLSettings> settings_);
void rethrowExceptionIfNeed() const;
void setException(const std::exception_ptr & exception);
protected:
const Context & global_context;
ASTPtr engine_define;
DatabasePtr nested_database;
std::unique_ptr<MaterializeMySQLSettings> settings;
Poco::Logger * log;
MaterializeMySQLSyncThread materialize_thread;
std::exception_ptr exception;
std::atomic_bool started_up{false};
public:
String getEngineName() const override { return "MaterializeMySQL"; }
ASTPtr getCreateDatabaseQuery() const override;
void loadStoredObjects(Context & context, bool has_force_restore_data_flag, bool force_attach) override;
void shutdown() override;
bool empty() const override;
String getDataPath() const override;
String getTableDataPath(const String & table_name) const override;
String getTableDataPath(const ASTCreateQuery & query) const override;
UUID tryGetTableUUID(const String & table_name) const override;
void createTable(const Context & context, const String & name, const StoragePtr & table, const ASTPtr & query) override;
void dropTable(const Context & context, const String & name, bool no_delay) override;
@ -71,23 +57,22 @@ public:
void alterTable(const Context & context, const StorageID & table_id, const StorageInMemoryMetadata & metadata) override;
time_t getObjectMetadataModificationTime(const String & name) const override;
String getMetadataPath() const override;
String getObjectMetadataPath(const String & table_name) const override;
bool shouldBeEmptyOnDetach() const override;
void drop(const Context & context) override;
bool isTableExist(const String & name, const Context & context) const override;
StoragePtr tryGetTable(const String & name, const Context & context) const override;
DatabaseTablesIteratorPtr getTablesIterator(const Context & context, const FilterByNameFunction & filter_by_table_name) override;
DatabaseTablesIteratorPtr getTablesIterator(const Context & context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) override;
void assertCalledFromSyncThreadOrDrop(const char * method) const;
void shutdownSynchronizationThread();
};
void setSynchronizationThreadException(const DatabasePtr & materialize_mysql_db, const std::exception_ptr & exception);
void stopDatabaseSynchronization(const DatabasePtr & materialize_mysql_db);
void rethrowSyncExceptionIfNeed(const IDatabase * materialize_mysql_db);
}
#endif

View File

@ -2,7 +2,6 @@
#include <Databases/IDatabase.h>
#include <Storages/StorageMaterializeMySQL.h>
#include <Databases/MySQL/DatabaseMaterializeMySQL.h>
namespace DB
{
@ -30,7 +29,7 @@ public:
UUID uuid() const override { return nested_iterator->uuid(); }
DatabaseMaterializeTablesIterator(DatabaseTablesIteratorPtr nested_iterator_, DatabaseMaterializeMySQL * database_)
DatabaseMaterializeTablesIterator(DatabaseTablesIteratorPtr nested_iterator_, const IDatabase * database_)
: nested_iterator(std::move(nested_iterator_)), database(database_)
{
}
@ -38,8 +37,7 @@ public:
private:
mutable std::vector<StoragePtr> tables;
DatabaseTablesIteratorPtr nested_iterator;
DatabaseMaterializeMySQL * database;
const IDatabase * database;
};
}

View File

@ -71,15 +71,6 @@ static BlockIO tryToExecuteQuery(const String & query_to_execute, Context & quer
}
}
static inline DatabaseMaterializeMySQL & getDatabase(const String & database_name)
{
DatabasePtr database = DatabaseCatalog::instance().getDatabase(database_name);
if (DatabaseMaterializeMySQL * database_materialize = typeid_cast<DatabaseMaterializeMySQL *>(database.get()))
return *database_materialize;
throw Exception("LOGICAL_ERROR: cannot cast to DatabaseMaterializeMySQL, it is a bug.", ErrorCodes::LOGICAL_ERROR);
}
MaterializeMySQLSyncThread::~MaterializeMySQLSyncThread()
{
@ -190,7 +181,8 @@ void MaterializeMySQLSyncThread::synchronization()
{
client.disconnect();
tryLogCurrentException(log);
getDatabase(database_name).setException(std::current_exception());
auto db = DatabaseCatalog::instance().getDatabase(database_name);
setSynchronizationThreadException(db, std::current_exception());
}
}
@ -343,7 +335,7 @@ std::optional<MaterializeMetadata> MaterializeMySQLSyncThread::prepareSynchroniz
opened_transaction = false;
MaterializeMetadata metadata(
connection, getDatabase(database_name).getMetadataPath() + "/.metadata", mysql_database_name, opened_transaction);
connection, DatabaseCatalog::instance().getDatabase(database_name)->getMetadataPath() + "/.metadata", mysql_database_name, opened_transaction);
if (!metadata.need_dumping_tables.empty())
{

View File

@ -4,7 +4,7 @@
#include <Storages/IStorage.h>
#include <Databases/IDatabase.h>
#include <Databases/DatabaseMemory.h>
#include <Databases/DatabaseAtomic.h>
#include <Databases/DatabaseOnDisk.h>
#include <Poco/File.h>
#include <Common/quoteString.h>
#include <Storages/StorageMemory.h>
@ -16,6 +16,15 @@
#include <Common/CurrentMetrics.h>
#include <common/logger_useful.h>
#if !defined(ARCADIA_BUILD)
# include "config_core.h"
#endif
#if USE_MYSQL
# include <Databases/MySQL/MaterializeMySQLSyncThread.h>
# include <Storages/StorageMaterializeMySQL.h>
#endif
#include <filesystem>
@ -217,6 +226,15 @@ DatabaseAndTable DatabaseCatalog::getTableImpl(
exception->emplace("Table " + table_id.getNameForLogs() + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
return {};
}
#if USE_MYSQL
/// It's definitely not the best place for this logic, but behaviour must be consistent with DatabaseMaterializeMySQL::tryGetTable(...)
if (db_and_table.first->getEngineName() == "MaterializeMySQL")
{
if (!MaterializeMySQLSyncThread::isMySQLSyncThread())
db_and_table.second = std::make_shared<StorageMaterializeMySQL>(std::move(db_and_table.second), db_and_table.first.get());
}
#endif
return db_and_table;
}
@ -286,7 +304,6 @@ void DatabaseCatalog::attachDatabase(const String & database_name, const Databas
assertDatabaseDoesntExistUnlocked(database_name);
databases.emplace(database_name, database);
UUID db_uuid = database->getUUID();
assert((db_uuid != UUIDHelpers::Nil) ^ (dynamic_cast<DatabaseAtomic *>(database.get()) == nullptr));
if (db_uuid != UUIDHelpers::Nil)
db_uuid_map.emplace(db_uuid, database);
}
@ -313,9 +330,8 @@ DatabasePtr DatabaseCatalog::detachDatabase(const String & database_name, bool d
if (!db->empty())
throw Exception("New table appeared in database being dropped or detached. Try again.",
ErrorCodes::DATABASE_NOT_EMPTY);
auto * database_atomic = typeid_cast<DatabaseAtomic *>(db.get());
if (!drop && database_atomic)
database_atomic->assertCanBeDetached(false);
if (!drop)
db->assertCanBeDetached(false);
}
catch (...)
{

View File

@ -157,6 +157,35 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
if (!create.attach && fs::exists(metadata_path))
throw Exception(ErrorCodes::DATABASE_ALREADY_EXISTS, "Metadata directory {} already exists", metadata_path.string());
}
else if (create.storage->engine->name == "MaterializeMySQL")
{
/// It creates nested database with Ordinary or Atomic engine depending on UUID in query and default engine setting.
/// Do nothing if it's an internal ATTACH on server startup or short-syntax ATTACH query from user,
/// because we got correct query from the metadata file in this case.
/// If we got query from user, then normalize it first.
bool attach_from_user = create.attach && !internal && !create.attach_short_syntax;
bool create_from_user = !create.attach;
if (create_from_user)
{
const auto & default_engine = context.getSettingsRef().default_database_engine.value;
if (create.uuid == UUIDHelpers::Nil && default_engine == DefaultDatabaseEngine::Atomic)
create.uuid = UUIDHelpers::generateV4(); /// Will enable Atomic engine for nested database
}
else if (attach_from_user && create.uuid == UUIDHelpers::Nil)
{
/// Ambiguity is possible: should we attach nested database as Ordinary
/// or throw "UUID must be specified" for Atomic? So we suggest short syntax for Ordinary.
throw Exception("Use short attach syntax ('ATTACH DATABASE name;' without engine) to attach existing database "
"or specify UUID to attach new database with Atomic engine", ErrorCodes::INCORRECT_QUERY);
}
/// Set metadata path according to nested engine
if (create.uuid == UUIDHelpers::Nil)
metadata_path = metadata_path / "metadata" / database_name_escaped;
else
metadata_path = metadata_path / "store" / DatabaseCatalog::getPathForUUID(create.uuid);
}
else
{
bool is_on_cluster = context.getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY;
@ -655,7 +684,7 @@ void InterpreterCreateQuery::assertOrSetUUID(ASTCreateQuery & create, const Data
bool from_path = create.attach_from_path.has_value();
if (database->getEngineName() == "Atomic")
if (database->getUUID() != UUIDHelpers::Nil)
{
if (create.attach && !from_path && create.uuid == UUIDHelpers::Nil)
{

View File

@ -11,7 +11,14 @@
#include <Common/escapeForFileName.h>
#include <Common/quoteString.h>
#include <Common/typeid_cast.h>
#include <Databases/DatabaseAtomic.h>
#if !defined(ARCADIA_BUILD)
# include "config_core.h"
#endif
#if USE_MYSQL
# include <Databases/MySQL/DatabaseMaterializeMySQL.h>
#endif
namespace DB
@ -66,10 +73,7 @@ void InterpreterDropQuery::waitForTableToBeActuallyDroppedOrDetached(const ASTDr
if (query.kind == ASTDropQuery::Kind::Drop)
DatabaseCatalog::instance().waitTableFinallyDropped(uuid_to_wait);
else if (query.kind == ASTDropQuery::Kind::Detach)
{
if (auto * atomic = typeid_cast<DatabaseAtomic *>(db.get()))
atomic->waitDetachedTableNotInUse(uuid_to_wait);
}
db->waitDetachedTableNotInUse(uuid_to_wait);
}
BlockIO InterpreterDropQuery::executeToTable(const ASTDropQuery & query)
@ -122,7 +126,7 @@ BlockIO InterpreterDropQuery::executeToTableImpl(const ASTDropQuery & query, Dat
table->checkTableCanBeDetached();
table->shutdown();
TableExclusiveLockHolder table_lock;
if (database->getEngineName() != "Atomic")
if (database->getUUID() == UUIDHelpers::Nil)
table_lock = table->lockExclusively(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
/// Drop table from memory, don't touch data and metadata
database->detachTable(table_id.table_name);
@ -145,7 +149,7 @@ BlockIO InterpreterDropQuery::executeToTableImpl(const ASTDropQuery & query, Dat
table->shutdown();
TableExclusiveLockHolder table_lock;
if (database->getEngineName() != "Atomic")
if (database->getUUID() == UUIDHelpers::Nil)
table_lock = table->lockExclusively(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
database->dropTable(context, table_id.table_name, query.no_delay);
@ -282,6 +286,11 @@ BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query,
bool drop = query.kind == ASTDropQuery::Kind::Drop;
context.checkAccess(AccessType::DROP_DATABASE, database_name);
#if USE_MYSQL
if (database->getEngineName() == "MaterializeMySQL")
stopDatabaseSynchronization(database);
#endif
if (database->shouldBeEmptyOnDetach())
{
/// DETACH or DROP all tables and dictionaries inside database.
@ -312,9 +321,8 @@ BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query,
/// Protects from concurrent CREATE TABLE queries
auto db_guard = DatabaseCatalog::instance().getExclusiveDDLGuardForDatabase(database_name);
auto * database_atomic = typeid_cast<DatabaseAtomic *>(database.get());
if (!drop && database_atomic)
database_atomic->assertCanBeDetached(true);
if (!drop)
database->assertCanBeDetached(true);
/// DETACH or DROP database itself
DatabaseCatalog::instance().detachDatabase(database_name, drop, database->shouldBeEmptyOnDetach());

View File

@ -21,12 +21,13 @@
#include <Processors/Pipe.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Databases/MySQL/DatabaseMaterializeMySQL.h>
#include <Storages/SelectQueryInfo.h>
namespace DB
{
StorageMaterializeMySQL::StorageMaterializeMySQL(const StoragePtr & nested_storage_, const DatabaseMaterializeMySQL * database_)
StorageMaterializeMySQL::StorageMaterializeMySQL(const StoragePtr & nested_storage_, const IDatabase * database_)
: StorageProxy(nested_storage_->getStorageID()), nested_storage(nested_storage_), database(database_)
{
auto nested_memory_metadata = nested_storage->getInMemoryMetadata();
@ -45,7 +46,7 @@ Pipe StorageMaterializeMySQL::read(
unsigned int num_streams)
{
/// If the background synchronization thread has exception.
database->rethrowExceptionIfNeed();
rethrowSyncExceptionIfNeed(database);
NameSet column_names_set = NameSet(column_names.begin(), column_names.end());
auto lock = nested_storage->lockForShare(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
@ -106,7 +107,7 @@ Pipe StorageMaterializeMySQL::read(
NamesAndTypesList StorageMaterializeMySQL::getVirtuals() const
{
/// If the background synchronization thread has exception.
database->rethrowExceptionIfNeed();
rethrowSyncExceptionIfNeed(database);
return nested_storage->getVirtuals();
}

View File

@ -5,7 +5,6 @@
#if USE_MYSQL
#include <Storages/StorageProxy.h>
#include <Databases/MySQL/DatabaseMaterializeMySQL.h>
namespace DB
{
@ -21,7 +20,7 @@ class StorageMaterializeMySQL final : public ext::shared_ptr_helper<StorageMater
public:
String getName() const override { return "MaterializeMySQL"; }
StorageMaterializeMySQL(const StoragePtr & nested_storage_, const DatabaseMaterializeMySQL * database_);
StorageMaterializeMySQL(const StoragePtr & nested_storage_, const IDatabase * database_);
Pipe read(
const Names & column_names, const StorageMetadataPtr & metadata_snapshot, SelectQueryInfo & query_info,
@ -32,15 +31,18 @@ public:
NamesAndTypesList getVirtuals() const override;
ColumnSizeByName getColumnSizes() const override;
private:
StoragePtr getNested() const override { return nested_storage; }
void drop() override { nested_storage->drop(); }
private:
[[noreturn]] void throwNotAllowed() const
{
throw Exception("This method is not allowed for MaterializeMySQL", ErrorCodes::NOT_IMPLEMENTED);
}
StoragePtr nested_storage;
const DatabaseMaterializeMySQL * database;
const IDatabase * database;
};
}

View File

@ -3,7 +3,7 @@
<profiles>
<default>
<allow_experimental_database_materialize_mysql>1</allow_experimental_database_materialize_mysql>
<allow_introspection_functions>1</allow_introspection_functions>
<default_database_engine>Ordinary</default_database_engine>
</default>
</profiles>

View File

@ -0,0 +1,19 @@
<?xml version="1.0"?>
<yandex>
<profiles>
<default>
<allow_experimental_database_materialize_mysql>1</allow_experimental_database_materialize_mysql>
<default_database_engine>Atomic</default_database_engine>
</default>
</profiles>
<users>
<default>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
</default>
</users>
</yandex>

View File

@ -15,6 +15,7 @@ from multiprocessing.dummy import Pool
def check_query(clickhouse_node, query, result_set, retry_count=60, interval_seconds=3):
lastest_result = ''
for i in range(retry_count):
try:
lastest_result = clickhouse_node.query(query)
@ -35,6 +36,7 @@ def dml_with_materialize_mysql_database(clickhouse_node, mysql_node, service_nam
clickhouse_node.query("DROP DATABASE IF EXISTS test_database")
mysql_node.query("CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'")
# existed before the mapping was created
mysql_node.query("CREATE TABLE test_database.test_table_1 ("
"`key` INT NOT NULL PRIMARY KEY, "
"unsigned_tiny_int TINYINT UNSIGNED, tiny_int TINYINT, "
@ -51,9 +53,10 @@ def dml_with_materialize_mysql_database(clickhouse_node, mysql_node, service_nam
"_date Date, _datetime DateTime, _timestamp TIMESTAMP, _bool BOOLEAN) ENGINE = InnoDB;")
# it already has some data
mysql_node.query(
"INSERT INTO test_database.test_table_1 VALUES(1, 1, -1, 2, -2, 3, -3, 4, -4, 5, -5, 6, -6, 3.2, -3.2, 3.4, -3.4, 'varchar', 'char', "
"'2020-01-01', '2020-01-01 00:00:00', '2020-01-01 00:00:00', true);")
mysql_node.query("""
INSERT INTO test_database.test_table_1 VALUES(1, 1, -1, 2, -2, 3, -3, 4, -4, 5, -5, 6, -6, 3.2, -3.2, 3.4, -3.4, 'varchar', 'char',
'2020-01-01', '2020-01-01 00:00:00', '2020-01-01 00:00:00', true);
""")
clickhouse_node.query(
"CREATE DATABASE test_database ENGINE = MaterializeMySQL('{}:3306', 'test_database', 'root', 'clickhouse')".format(
@ -65,9 +68,10 @@ def dml_with_materialize_mysql_database(clickhouse_node, mysql_node, service_nam
"1\t1\t-1\t2\t-2\t3\t-3\t4\t-4\t5\t-5\t6\t-6\t3.2\t-3.2\t3.4\t-3.4\tvarchar\tchar\t2020-01-01\t"
"2020-01-01 00:00:00\t2020-01-01 00:00:00\t1\n")
mysql_node.query(
"INSERT INTO test_database.test_table_1 VALUES(2, 1, -1, 2, -2, 3, -3, 4, -4, 5, -5, 6, -6, 3.2, -3.2, 3.4, -3.4, 'varchar', 'char', "
"'2020-01-01', '2020-01-01 00:00:00', '2020-01-01 00:00:00', false);")
mysql_node.query("""
INSERT INTO test_database.test_table_1 VALUES(2, 1, -1, 2, -2, 3, -3, 4, -4, 5, -5, 6, -6, 3.2, -3.2, 3.4, -3.4, 'varchar', 'char',
'2020-01-01', '2020-01-01 00:00:00', '2020-01-01 00:00:00', false);
""")
check_query(clickhouse_node, "SELECT * FROM test_database.test_table_1 ORDER BY key FORMAT TSV",
"1\t1\t-1\t2\t-2\t3\t-3\t4\t-4\t5\t-5\t6\t-6\t3.2\t-3.2\t3.4\t-3.4\tvarchar\tchar\t2020-01-01\t"
@ -76,14 +80,16 @@ def dml_with_materialize_mysql_database(clickhouse_node, mysql_node, service_nam
mysql_node.query("UPDATE test_database.test_table_1 SET unsigned_tiny_int = 2 WHERE `key` = 1")
check_query(clickhouse_node, "SELECT key, unsigned_tiny_int, tiny_int, unsigned_small_int,"
" small_int, unsigned_medium_int, medium_int, unsigned_int, _int, unsigned_integer, _integer, "
" unsigned_bigint, _bigint, unsigned_float, _float, unsigned_double, _double, _varchar, _char, "
" _date, _datetime, /* exclude it, because ON UPDATE CURRENT_TIMESTAMP _timestamp, */ "
" _bool FROM test_database.test_table_1 ORDER BY key FORMAT TSV",
"1\t2\t-1\t2\t-2\t3\t-3\t4\t-4\t5\t-5\t6\t-6\t3.2\t-3.2\t3.4\t-3.4\tvarchar\tchar\t2020-01-01\t"
"2020-01-01 00:00:00\t1\n2\t1\t-1\t2\t-2\t3\t-3\t4\t-4\t5\t-5\t6\t-6\t3.2\t-3.2\t3.4\t-3.4\t"
"varchar\tchar\t2020-01-01\t2020-01-01 00:00:00\t0\n")
check_query(clickhouse_node, """
SELECT key, unsigned_tiny_int, tiny_int, unsigned_small_int,
small_int, unsigned_medium_int, medium_int, unsigned_int, _int, unsigned_integer, _integer,
unsigned_bigint, _bigint, unsigned_float, _float, unsigned_double, _double, _varchar, _char,
_date, _datetime, /* exclude it, because ON UPDATE CURRENT_TIMESTAMP _timestamp, */
_bool FROM test_database.test_table_1 ORDER BY key FORMAT TSV
""",
"1\t2\t-1\t2\t-2\t3\t-3\t4\t-4\t5\t-5\t6\t-6\t3.2\t-3.2\t3.4\t-3.4\tvarchar\tchar\t2020-01-01\t"
"2020-01-01 00:00:00\t1\n2\t1\t-1\t2\t-2\t3\t-3\t4\t-4\t5\t-5\t6\t-6\t3.2\t-3.2\t3.4\t-3.4\t"
"varchar\tchar\t2020-01-01\t2020-01-01 00:00:00\t0\n")
# update primary key
mysql_node.query("UPDATE test_database.test_table_1 SET `key` = 3 WHERE `unsigned_tiny_int` = 2")
@ -556,6 +562,12 @@ def err_sync_user_privs_with_materialize_mysql_database(clickhouse_node, mysql_n
assert 'MySQL SYNC USER ACCESS ERR:' in str(exception.value)
assert "priv_err_db" not in clickhouse_node.query("SHOW DATABASES")
mysql_node.query("GRANT SELECT ON priv_err_db.* TO 'test'@'%'")
time.sleep(3)
clickhouse_node.query("ATTACH DATABASE priv_err_db")
clickhouse_node.query("DROP DATABASE priv_err_db")
mysql_node.query("REVOKE SELECT ON priv_err_db.* FROM 'test'@'%'")
mysql_node.query("DROP DATABASE priv_err_db;")
mysql_node.query("DROP USER 'test'@'%'")

View File

@ -14,7 +14,10 @@ from . import materialize_with_ddl
DOCKER_COMPOSE_PATH = get_docker_compose_path()
cluster = ClickHouseCluster(__file__)
clickhouse_node = cluster.add_instance('node1', user_configs=["configs/users.xml"], with_mysql=False, stay_alive=True)
node_db_ordinary = cluster.add_instance('node1', user_configs=["configs/users.xml"], with_mysql=False, stay_alive=True)
node_db_atomic = cluster.add_instance('node2', user_configs=["configs/users_db_atomic.xml"], with_mysql=False, stay_alive=True)
@pytest.fixture(scope="module")
def started_cluster():
@ -119,39 +122,30 @@ def started_mysql_8_0():
'--remove-orphans'])
def test_materialize_database_dml_with_mysql_5_7(started_cluster, started_mysql_5_7):
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_materialize_database_dml_with_mysql_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.dml_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.materialize_mysql_database_with_datetime_and_decimal(clickhouse_node, started_mysql_5_7, "mysql1")
def test_materialize_database_dml_with_mysql_8_0(started_cluster, started_mysql_8_0):
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_materialize_database_dml_with_mysql_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.dml_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql8_0")
materialize_with_ddl.materialize_mysql_database_with_datetime_and_decimal(clickhouse_node, started_mysql_8_0, "mysql8_0")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_materialize_database_ddl_with_mysql_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.drop_table_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.create_table_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.rename_table_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.alter_add_column_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.alter_drop_column_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1")
# mysql 5.7 cannot support alter rename column
# materialize_with_ddl.alter_rename_column_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.alter_rename_table_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.alter_modify_column_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1")
def test_materialize_database_ddl_with_mysql_5_7(started_cluster, started_mysql_5_7):
try:
materialize_with_ddl.drop_table_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.create_table_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.rename_table_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.alter_add_column_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7,
"mysql1")
materialize_with_ddl.alter_drop_column_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7,
"mysql1")
# mysql 5.7 cannot support alter rename column
# materialize_with_ddl.alter_rename_column_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.alter_rename_table_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7,
"mysql1")
materialize_with_ddl.alter_modify_column_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7,
"mysql1")
except:
print((clickhouse_node.query(
"select '\n', thread_id, query_id, arrayStringConcat(arrayMap(x -> concat(demangle(addressToSymbol(x)), '\n ', addressToLine(x)), trace), '\n') AS sym from system.stack_trace format TSVRaw")))
raise
def test_materialize_database_ddl_with_mysql_8_0(started_cluster, started_mysql_8_0):
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_materialize_database_ddl_with_mysql_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.drop_table_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql8_0")
materialize_with_ddl.create_table_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql8_0")
materialize_with_ddl.rename_table_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql8_0")
@ -166,61 +160,72 @@ def test_materialize_database_ddl_with_mysql_8_0(started_cluster, started_mysql_
materialize_with_ddl.alter_modify_column_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0,
"mysql8_0")
def test_materialize_database_ddl_with_empty_transaction_5_7(started_cluster, started_mysql_5_7):
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_materialize_database_ddl_with_empty_transaction_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.query_event_with_empty_transaction(clickhouse_node, started_mysql_5_7, "mysql1")
def test_materialize_database_ddl_with_empty_transaction_8_0(started_cluster, started_mysql_8_0):
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_materialize_database_ddl_with_empty_transaction_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.query_event_with_empty_transaction(clickhouse_node, started_mysql_8_0, "mysql8_0")
def test_select_without_columns_5_7(started_cluster, started_mysql_5_7):
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_select_without_columns_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.select_without_columns(clickhouse_node, started_mysql_5_7, "mysql1")
def test_select_without_columns_8_0(started_cluster, started_mysql_8_0):
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_select_without_columns_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.select_without_columns(clickhouse_node, started_mysql_8_0, "mysql8_0")
def test_insert_with_modify_binlog_checksum_5_7(started_cluster, started_mysql_5_7):
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_insert_with_modify_binlog_checksum_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.insert_with_modify_binlog_checksum(clickhouse_node, started_mysql_5_7, "mysql1")
def test_insert_with_modify_binlog_checksum_8_0(started_cluster, started_mysql_8_0):
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_insert_with_modify_binlog_checksum_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.insert_with_modify_binlog_checksum(clickhouse_node, started_mysql_8_0, "mysql8_0")
def test_materialize_database_err_sync_user_privs_5_7(started_cluster, started_mysql_5_7):
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_materialize_database_err_sync_user_privs_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.err_sync_user_privs_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1")
def test_materialize_database_err_sync_user_privs_8_0(started_cluster, started_mysql_8_0):
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_materialize_database_err_sync_user_privs_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.err_sync_user_privs_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql8_0")
def test_network_partition_5_7(started_cluster, started_mysql_5_7):
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_network_partition_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.network_partition_test(clickhouse_node, started_mysql_5_7, "mysql1")
def test_network_partition_8_0(started_cluster, started_mysql_8_0):
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_network_partition_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.network_partition_test(clickhouse_node, started_mysql_8_0, "mysql8_0")
def test_mysql_kill_sync_thread_restore_5_7(started_cluster, started_mysql_5_7):
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_mysql_kill_sync_thread_restore_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.mysql_kill_sync_thread_restore_test(clickhouse_node, started_mysql_5_7, "mysql1")
def test_mysql_kill_sync_thread_restore_8_0(started_cluster, started_mysql_8_0):
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_mysql_kill_sync_thread_restore_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.mysql_kill_sync_thread_restore_test(clickhouse_node, started_mysql_8_0, "mysql8_0")
def test_mysql_killed_while_insert_5_7(started_cluster, started_mysql_5_7):
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_mysql_killed_while_insert_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.mysql_killed_while_insert(clickhouse_node, started_mysql_5_7, "mysql1")
def test_mysql_killed_while_insert_8_0(started_cluster, started_mysql_8_0):
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_mysql_killed_while_insert_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.mysql_killed_while_insert(clickhouse_node, started_mysql_8_0, "mysql8_0")
def test_clickhouse_killed_while_insert_5_7(started_cluster, started_mysql_5_7):
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_clickhouse_killed_while_insert_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.clickhouse_killed_while_insert(clickhouse_node, started_mysql_5_7, "mysql1")
def test_clickhouse_killed_while_insert_8_0(started_cluster, started_mysql_8_0):
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_clickhouse_killed_while_insert_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.clickhouse_killed_while_insert(clickhouse_node, started_mysql_8_0, "mysql8_0")