From a812f7eb756fe768932c57b55c6ec0a2e0f7c0c6 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Mon, 14 Sep 2020 22:25:02 +0300 Subject: [PATCH 1/8] allow using Atomic database inside MaterializeMySQL --- src/Databases/DatabaseAtomic.cpp | 6 +- src/Databases/DatabaseAtomic.h | 4 +- src/Databases/DatabaseFactory.cpp | 11 +- src/Databases/DatabaseOnDisk.cpp | 2 +- src/Databases/DatabasesCommon.cpp | 6 +- src/Databases/IDatabase.h | 2 + .../MySQL/DatabaseMaterializeMySQL.cpp | 366 +++++++++++------- .../MySQL/DatabaseMaterializeMySQL.h | 57 ++- .../MySQL/DatabaseMaterializeTablesIterator.h | 10 +- .../MySQL/MaterializeMySQLSyncThread.cpp | 34 +- src/Interpreters/DatabaseCatalog.cpp | 52 ++- src/Interpreters/InterpreterCreateQuery.cpp | 33 +- src/Interpreters/InterpreterDropQuery.cpp | 14 +- src/Storages/StorageMaterializeMySQL.cpp | 15 +- src/Storages/StorageMaterializeMySQL.h | 16 +- .../configs/users.xml | 2 + .../materialize_with_ddl.py | 56 +-- .../test_materialize_mysql_database/test.py | 2 +- 18 files changed, 464 insertions(+), 224 deletions(-) diff --git a/src/Databases/DatabaseAtomic.cpp b/src/Databases/DatabaseAtomic.cpp index 8f4a4522c59..4591771697b 100644 --- a/src/Databases/DatabaseAtomic.cpp +++ b/src/Databases/DatabaseAtomic.cpp @@ -35,7 +35,7 @@ public: }; -DatabaseAtomic::DatabaseAtomic(String name_, String metadata_path_, UUID uuid, Context & context_) +DatabaseAtomic::DatabaseAtomic(String name_, String metadata_path_, UUID uuid, const Context & context_) : DatabaseOrdinary(name_, std::move(metadata_path_), "store/", "DatabaseAtomic (" + name_ + ")", context_) , path_to_table_symlinks(global_context.getPath() + "data/" + escapeForFileName(name_) + "/") , path_to_metadata_symlink(global_context.getPath() + "metadata/" + escapeForFileName(name_)) @@ -323,9 +323,9 @@ DatabaseAtomic::DetachedTables DatabaseAtomic::cleenupDetachedTables() return not_in_use; } -void DatabaseAtomic::assertCanBeDetached(bool cleenup) +void DatabaseAtomic::assertCanBeDetached(bool cleanup) { - if (cleenup) + if (cleanup) { DetachedTables not_in_use; { diff --git a/src/Databases/DatabaseAtomic.h b/src/Databases/DatabaseAtomic.h index 02c922f8b91..7ba5f26222d 100644 --- a/src/Databases/DatabaseAtomic.h +++ b/src/Databases/DatabaseAtomic.h @@ -21,7 +21,7 @@ class DatabaseAtomic : public DatabaseOrdinary { public: - DatabaseAtomic(String name_, String metadata_path_, UUID uuid, Context & context_); + DatabaseAtomic(String name_, String metadata_path_, UUID uuid, const Context & context_); String getEngineName() const override { return "Atomic"; } UUID getUUID() const override { return db_uuid; } @@ -51,7 +51,7 @@ public: void loadStoredObjects(Context & context, bool has_force_restore_data_flag, bool force_attach) override; /// Atomic database cannot be detached if there is detached table which still in use - void assertCanBeDetached(bool cleenup); + void assertCanBeDetached(bool cleanup) override; UUID tryGetTableUUID(const String & table_name) const override; diff --git a/src/Databases/DatabaseFactory.cpp b/src/Databases/DatabaseFactory.cpp index 1bf4db74bf0..6d00597fcf2 100644 --- a/src/Databases/DatabaseFactory.cpp +++ b/src/Databases/DatabaseFactory.cpp @@ -128,9 +128,14 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String if (engine_define->settings) materialize_mode_settings->loadFromQuery(*engine_define); - return std::make_shared( - context, database_name, metadata_path, engine_define, mysql_database_name, std::move(mysql_pool), std::move(client) - , std::move(materialize_mode_settings)); + if (create.uuid == UUIDHelpers::Nil) + return std::make_shared>( + context, database_name, metadata_path, uuid, engine_define, mysql_database_name, std::move(mysql_pool), std::move(client) + , std::move(materialize_mode_settings)); + else + return std::make_shared>( + context, database_name, metadata_path, uuid, engine_define, mysql_database_name, std::move(mysql_pool), std::move(client) + , std::move(materialize_mode_settings)); } return std::make_shared( diff --git a/src/Databases/DatabaseOnDisk.cpp b/src/Databases/DatabaseOnDisk.cpp index a2a2dd992ed..2fa0737eb6e 100644 --- a/src/Databases/DatabaseOnDisk.cpp +++ b/src/Databases/DatabaseOnDisk.cpp @@ -397,7 +397,7 @@ void DatabaseOnDisk::iterateMetadataFiles(const Context & context, const Iterati { auto process_tmp_drop_metadata_file = [&](const String & file_name) { - assert(getEngineName() != "Atomic"); + assert(getUUID() == UUIDHelpers::Nil); static const char * tmp_drop_ext = ".sql.tmp_drop"; const std::string object_name = file_name.substr(0, file_name.size() - strlen(tmp_drop_ext)); if (Poco::File(context.getPath() + getDataPath() + '/' + object_name).exists()) diff --git a/src/Databases/DatabasesCommon.cpp b/src/Databases/DatabasesCommon.cpp index eadfa5f53c6..5420689e370 100644 --- a/src/Databases/DatabasesCommon.cpp +++ b/src/Databases/DatabasesCommon.cpp @@ -80,7 +80,7 @@ StoragePtr DatabaseWithOwnTablesBase::detachTableUnlocked(const String & table_n auto table_id = res->getStorageID(); if (table_id.hasUUID()) { - assert(database_name == DatabaseCatalog::TEMPORARY_DATABASE || getEngineName() == "Atomic"); + assert(database_name == DatabaseCatalog::TEMPORARY_DATABASE || getUUID() != UUIDHelpers::Nil); DatabaseCatalog::instance().removeUUIDMapping(table_id.uuid); } @@ -105,7 +105,7 @@ void DatabaseWithOwnTablesBase::attachTableUnlocked(const String & table_name, c if (table_id.hasUUID()) { - assert(database_name == DatabaseCatalog::TEMPORARY_DATABASE || getEngineName() == "Atomic"); + assert(database_name == DatabaseCatalog::TEMPORARY_DATABASE || getUUID() != UUIDHelpers::Nil); DatabaseCatalog::instance().addUUIDMapping(table_id.uuid, shared_from_this(), table); } } @@ -127,7 +127,7 @@ void DatabaseWithOwnTablesBase::shutdown() kv.second->shutdown(); if (table_id.hasUUID()) { - assert(getDatabaseName() == DatabaseCatalog::TEMPORARY_DATABASE || getEngineName() == "Atomic"); + assert(getDatabaseName() == DatabaseCatalog::TEMPORARY_DATABASE || getUUID() != UUIDHelpers::Nil); DatabaseCatalog::instance().removeUUIDMapping(table_id.uuid); } } diff --git a/src/Databases/IDatabase.h b/src/Databases/IDatabase.h index d82755a7bc8..47ba0ea190a 100644 --- a/src/Databases/IDatabase.h +++ b/src/Databases/IDatabase.h @@ -330,6 +330,8 @@ public: /// All tables and dictionaries should be detached before detaching the database. virtual bool shouldBeEmptyOnDetach() const { return true; } + virtual void assertCanBeDetached(bool /*cleanup*/) {} + /// Ask all tables to complete the background threads they are using and delete all table objects. virtual void shutdown() = 0; diff --git a/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp b/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp index b5231a23d7e..5cd8fb72615 100644 --- a/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp +++ b/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp @@ -8,6 +8,7 @@ # include # include +# include # include # include # include @@ -24,19 +25,42 @@ namespace ErrorCodes extern const int NOT_IMPLEMENTED; } -DatabaseMaterializeMySQL::DatabaseMaterializeMySQL( - const Context & context, const String & database_name_, const String & metadata_path_, const IAST * database_engine_define_ +template<> +DatabaseMaterializeMySQL::DatabaseMaterializeMySQL( + const Context & context, const String & database_name_, const String & metadata_path_, UUID /*uuid*/, const IAST * /*database_engine_define_*/ , const String & mysql_database_name_, mysqlxx::Pool && pool_, MySQLClient && client_, std::unique_ptr settings_) - : IDatabase(database_name_), global_context(context.getGlobalContext()), engine_define(database_engine_define_->clone()) - , nested_database(std::make_shared(database_name_, metadata_path_, context)) - , settings(std::move(settings_)), log(&Poco::Logger::get("DatabaseMaterializeMySQL")) + : DatabaseOrdinary(database_name_, metadata_path_, context) + //, global_context(context.getGlobalContext()) + //, engine_define(database_engine_define_->clone()) + //, nested_database(uuid == UUIDHelpers::Nil + // ? std::make_shared(database_name_, metadata_path_, context) + // : std::make_shared(database_name_, metadata_path_, uuid, context)) + , settings(std::move(settings_)) + //, log(&Poco::Logger::get("DatabaseMaterializeMySQL")) , materialize_thread(context, database_name_, mysql_database_name_, std::move(pool_), std::move(client_), settings.get()) { } -void DatabaseMaterializeMySQL::rethrowExceptionIfNeed() const +template<> +DatabaseMaterializeMySQL::DatabaseMaterializeMySQL( + const Context & context, const String & database_name_, const String & metadata_path_, UUID uuid, const IAST * /*database_engine_define_*/ + , const String & mysql_database_name_, mysqlxx::Pool && pool_, MySQLClient && client_, std::unique_ptr settings_) + : DatabaseAtomic(database_name_, metadata_path_, uuid, context) + //, global_context(context.getGlobalContext()) + //, engine_define(database_engine_define_->clone()) + //, nested_database(uuid == UUIDHelpers::Nil + // ? std::make_shared(database_name_, metadata_path_, context) + // : std::make_shared(database_name_, metadata_path_, uuid, context)) + , settings(std::move(settings_)) + //, log(&Poco::Logger::get("DatabaseMaterializeMySQL")) + , materialize_thread(context, database_name_, mysql_database_name_, std::move(pool_), std::move(client_), settings.get()) { - std::unique_lock lock(mutex); +} + +template +void DatabaseMaterializeMySQL::rethrowExceptionIfNeed() const +{ + std::unique_lock lock(Base::mutex); if (!settings->allows_query_when_mysql_lost && exception) { @@ -51,124 +75,138 @@ void DatabaseMaterializeMySQL::rethrowExceptionIfNeed() const } } -void DatabaseMaterializeMySQL::setException(const std::exception_ptr & exception_) +template +void DatabaseMaterializeMySQL::setException(const std::exception_ptr & exception_) { - std::unique_lock lock(mutex); + std::unique_lock lock(Base::mutex); exception = exception_; } -ASTPtr DatabaseMaterializeMySQL::getCreateDatabaseQuery() const -{ - const auto & create_query = std::make_shared(); - create_query->database = database_name; - create_query->set(create_query->storage, engine_define); - return create_query; -} +//ASTPtr DatabaseMaterializeMySQL::getCreateDatabaseQuery() const +//{ +// const auto & create_query = std::make_shared(); +// create_query->database = database_name; +// create_query->uuid = nested_database->getUUID(); +// create_query->set(create_query->storage, engine_define); +// return create_query; +//} -void DatabaseMaterializeMySQL::loadStoredObjects(Context & context, bool has_force_restore_data_flag, bool force_attach) +template +void DatabaseMaterializeMySQL::loadStoredObjects(Context & context, bool has_force_restore_data_flag, bool force_attach) { + Base::loadStoredObjects(context, has_force_restore_data_flag, force_attach); try { - std::unique_lock lock(mutex); - nested_database->loadStoredObjects(context, has_force_restore_data_flag, force_attach); + //std::unique_lock lock(Base::mutex); + //Base::loadStoredObjects(context, has_force_restore_data_flag, force_attach); materialize_thread.startSynchronization(); + started_up = true; } catch (...) { - tryLogCurrentException(log, "Cannot load MySQL nested database stored objects."); + tryLogCurrentException(Base::log, "Cannot load MySQL nested database stored objects."); if (!force_attach) throw; } } -void DatabaseMaterializeMySQL::shutdown() +template +void DatabaseMaterializeMySQL::shutdown() { - materialize_thread.stopSynchronization(); + //materialize_thread.stopSynchronization(); + //started_up = false; - auto iterator = nested_database->getTablesIterator(global_context, {}); + Base::shutdown(); - /// We only shutdown the table, The tables is cleaned up when destructed database - for (; iterator->isValid(); iterator->next()) - iterator->table()->shutdown(); +// //FIXME +// //auto iterator = nested_database->getTablesIterator(global_context, {}); +// auto iterator = Base::getTablesIterator(Base::global_context, {}); +// +// /// We only shutdown the table, The tables is cleaned up when destructed database +// for (; iterator->isValid(); iterator->next()) +// iterator->table()->shutdown(); } -bool DatabaseMaterializeMySQL::empty() const +//bool DatabaseMaterializeMySQL::empty() const +//{ +// return nested_database->empty(); +//} +// +//String DatabaseMaterializeMySQL::getDataPath() const +//{ +// return nested_database->getDataPath(); +//} +// +//String DatabaseMaterializeMySQL::getMetadataPath() const +//{ +// return nested_database->getMetadataPath(); +//} +// +//String DatabaseMaterializeMySQL::getTableDataPath(const String & table_name) const +//{ +// return nested_database->getTableDataPath(table_name); +//} +// +//String DatabaseMaterializeMySQL::getTableDataPath(const ASTCreateQuery & query) const +//{ +// return nested_database->getTableDataPath(query); +//} +// +//String DatabaseMaterializeMySQL::getObjectMetadataPath(const String & table_name) const +//{ +// return nested_database->getObjectMetadataPath(table_name); +//} +// +//UUID DatabaseMaterializeMySQL::tryGetTableUUID(const String & table_name) const +//{ +// return nested_database->tryGetTableUUID(table_name); +//} +// +//time_t DatabaseMaterializeMySQL::getObjectMetadataModificationTime(const String & name) const +//{ +// return nested_database->getObjectMetadataModificationTime(name); +//} + +template +void DatabaseMaterializeMySQL::createTable(const Context & context, const String & name, const StoragePtr & table, const ASTPtr & query) { - return nested_database->empty(); + assertCalledFromSyncThreadOrDrop("create table"); + + //nested_database->createTable(context, name, std::make_shared(std::move(table), this), query); + Base::createTable(context, name, table, query); } -String DatabaseMaterializeMySQL::getDataPath() const +template +void DatabaseMaterializeMySQL::dropTable(const Context & context, const String & name, bool no_delay) { - return nested_database->getDataPath(); + ///FIXME cannot be called from DROP DATABASE, so we need hack with shouldBeEmptyOnDetach = false + assertCalledFromSyncThreadOrDrop("drop table"); + + Base::dropTable(context, name, no_delay); } -String DatabaseMaterializeMySQL::getMetadataPath() const +template +void DatabaseMaterializeMySQL::attachTable(const String & name, const StoragePtr & table, const String & relative_table_path) { - return nested_database->getMetadataPath(); + assertCalledFromSyncThreadOrDrop("attach table"); + + //nested_database->attachTable(name, std::make_shared(std::move(table), this), relative_table_path); + Base::attachTable(name, table, relative_table_path); } -String DatabaseMaterializeMySQL::getTableDataPath(const String & table_name) const +template +StoragePtr DatabaseMaterializeMySQL::detachTable(const String & name) { - return nested_database->getTableDataPath(table_name); + assertCalledFromSyncThreadOrDrop("detach table"); + //return std::dynamic_pointer_cast(nested_database->detachTable(name)); + return Base::detachTable(name); } -String DatabaseMaterializeMySQL::getTableDataPath(const ASTCreateQuery & query) const +template +void DatabaseMaterializeMySQL::renameTable(const Context & context, const String & name, IDatabase & to_database, const String & to_name, bool exchange, bool dictionary) { - return nested_database->getTableDataPath(query); -} - -String DatabaseMaterializeMySQL::getObjectMetadataPath(const String & table_name) const -{ - return nested_database->getObjectMetadataPath(table_name); -} - -UUID DatabaseMaterializeMySQL::tryGetTableUUID(const String & table_name) const -{ - return nested_database->tryGetTableUUID(table_name); -} - -time_t DatabaseMaterializeMySQL::getObjectMetadataModificationTime(const String & name) const -{ - return nested_database->getObjectMetadataModificationTime(name); -} - -void DatabaseMaterializeMySQL::createTable(const Context & context, const String & name, const StoragePtr & table, const ASTPtr & query) -{ - if (!MaterializeMySQLSyncThread::isMySQLSyncThread()) - throw Exception("MaterializeMySQL database not support create table.", ErrorCodes::NOT_IMPLEMENTED); - - nested_database->createTable(context, name, table, query); -} - -void DatabaseMaterializeMySQL::dropTable(const Context & context, const String & name, bool no_delay) -{ - if (!MaterializeMySQLSyncThread::isMySQLSyncThread()) - throw Exception("MaterializeMySQL database not support drop table.", ErrorCodes::NOT_IMPLEMENTED); - - nested_database->dropTable(context, name, no_delay); -} - -void DatabaseMaterializeMySQL::attachTable(const String & name, const StoragePtr & table, const String & relative_table_path) -{ - if (!MaterializeMySQLSyncThread::isMySQLSyncThread()) - throw Exception("MaterializeMySQL database not support attach table.", ErrorCodes::NOT_IMPLEMENTED); - - nested_database->attachTable(name, table, relative_table_path); -} - -StoragePtr DatabaseMaterializeMySQL::detachTable(const String & name) -{ - if (!MaterializeMySQLSyncThread::isMySQLSyncThread()) - throw Exception("MaterializeMySQL database not support detach table.", ErrorCodes::NOT_IMPLEMENTED); - - return nested_database->detachTable(name); -} - -void DatabaseMaterializeMySQL::renameTable(const Context & context, const String & name, IDatabase & to_database, const String & to_name, bool exchange, bool dictionary) -{ - if (!MaterializeMySQLSyncThread::isMySQLSyncThread()) - throw Exception("MaterializeMySQL database not support rename table.", ErrorCodes::NOT_IMPLEMENTED); + assertCalledFromSyncThreadOrDrop("rename table"); if (exchange) throw Exception("MaterializeMySQL database not support exchange table.", ErrorCodes::NOT_IMPLEMENTED); @@ -176,78 +214,142 @@ void DatabaseMaterializeMySQL::renameTable(const Context & context, const String if (dictionary) throw Exception("MaterializeMySQL database not support rename dictionary.", ErrorCodes::NOT_IMPLEMENTED); - if (to_database.getDatabaseName() != getDatabaseName()) + if (to_database.getDatabaseName() != Base::getDatabaseName()) throw Exception("Cannot rename with other database for MaterializeMySQL database.", ErrorCodes::NOT_IMPLEMENTED); - nested_database->renameTable(context, name, *nested_database, to_name, exchange, dictionary); + Base::renameTable(context, name, *this, to_name, exchange, dictionary); } -void DatabaseMaterializeMySQL::alterTable(const Context & context, const StorageID & table_id, const StorageInMemoryMetadata & metadata) +template +void DatabaseMaterializeMySQL::alterTable(const Context & context, const StorageID & table_id, const StorageInMemoryMetadata & metadata) { - if (!MaterializeMySQLSyncThread::isMySQLSyncThread()) - throw Exception("MaterializeMySQL database not support alter table.", ErrorCodes::NOT_IMPLEMENTED); - - nested_database->alterTable(context, table_id, metadata); + assertCalledFromSyncThreadOrDrop("alter table"); + Base::alterTable(context, table_id, metadata); } -bool DatabaseMaterializeMySQL::shouldBeEmptyOnDetach() const +//template +//bool DatabaseMaterializeMySQL::shouldBeEmptyOnDetach() const +//{ +// return false; /// FIXME +//} + +//template +//void DatabaseMaterializeMySQL::drop(const Context & context) +//{ +// /// FIXME +// if (Base::shouldBeEmptyOnDetach()) +// { +// for (auto iterator = Base::getTablesIterator(context, {}); iterator->isValid(); iterator->next()) +// { +// TableExclusiveLockHolder table_lock = iterator->table()->lockExclusively( +// context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout); +// +// Base::dropTable(context, iterator->name(), true); +// } +// +// /// Remove metadata info +// Poco::File metadata(Base::getMetadataPath() + "/.metadata"); +// +// if (metadata.exists()) +// metadata.remove(false); +// } +// +// Base::drop(context); +//} + +template +void DatabaseMaterializeMySQL::drop(const Context & context) { - return false; + /// Remove metadata info + Poco::File metadata(Base::getMetadataPath() + "/.metadata"); + + if (metadata.exists()) + metadata.remove(false); + + Base::drop(context); } -void DatabaseMaterializeMySQL::drop(const Context & context) -{ - if (nested_database->shouldBeEmptyOnDetach()) - { - for (auto iterator = nested_database->getTablesIterator(context, {}); iterator->isValid(); iterator->next()) - { - TableExclusiveLockHolder table_lock = iterator->table()->lockExclusively( - context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout); +//bool DatabaseMaterializeMySQL::isTableExist(const String & name, const Context & context) const +//{ +// return nested_database->isTableExist(name, context); +//} - nested_database->dropTable(context, iterator->name(), true); - } - - /// Remove metadata info - Poco::File metadata(getMetadataPath() + "/.metadata"); - - if (metadata.exists()) - metadata.remove(false); - } - - nested_database->drop(context); -} - -bool DatabaseMaterializeMySQL::isTableExist(const String & name, const Context & context) const -{ - return nested_database->isTableExist(name, context); -} - -StoragePtr DatabaseMaterializeMySQL::tryGetTable(const String & name, const Context & context) const +template +StoragePtr DatabaseMaterializeMySQL::tryGetTable(const String & name, const Context & context) const { if (!MaterializeMySQLSyncThread::isMySQLSyncThread()) { - StoragePtr nested_storage = nested_database->tryGetTable(name, context); + using Storage = StorageMaterializeMySQL>; + StoragePtr nested_storage = Base::tryGetTable(name, context); if (!nested_storage) return {}; - return std::make_shared(std::move(nested_storage), this); + return std::make_shared(std::move(nested_storage), this); } - return nested_database->tryGetTable(name, context); + return Base::tryGetTable(name, context); + //auto table = nested_database->tryGetTable(name, context); + //if (table && MaterializeMySQLSyncThread::isMySQLSyncThread()) + // return typeid_cast(*table).getNested(); + //return table; } -DatabaseTablesIteratorPtr DatabaseMaterializeMySQL::getTablesIterator(const Context & context, const FilterByNameFunction & filter_by_table_name) +template +DatabaseTablesIteratorPtr DatabaseMaterializeMySQL::getTablesIterator(const Context & context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) { if (!MaterializeMySQLSyncThread::isMySQLSyncThread()) { - DatabaseTablesIteratorPtr iterator = nested_database->getTablesIterator(context, filter_by_table_name); - return std::make_unique(std::move(iterator), this); + using TablesIterator = DatabaseMaterializeTablesIterator>; + DatabaseTablesIteratorPtr iterator = Base::getTablesIterator(context, filter_by_table_name); + return std::make_unique(std::move(iterator), this); } - return nested_database->getTablesIterator(context, filter_by_table_name); + return Base::getTablesIterator(context, filter_by_table_name); } +//void DatabaseMaterializeMySQL::assertCanBeDetached(bool cleanup) +//{ +// nested_database->assertCanBeDetached(cleanup); +//} + +template +void DatabaseMaterializeMySQL::assertCalledFromSyncThreadOrDrop(const char * method) const +{ + if (!MaterializeMySQLSyncThread::isMySQLSyncThread() && started_up) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "MaterializeMySQL database not support {}", method); +} + +template +void DatabaseMaterializeMySQL::shutdownSynchronizationThread() +{ + materialize_thread.stopSynchronization(); + started_up = false; +} + +void setSynchronizationThreadException(const DatabasePtr & materialize_mysql_db, const std::exception_ptr & exception) +{ + if (auto * database_materialize = typeid_cast *>(materialize_mysql_db.get())) + return database_materialize->setException(exception); + if (auto * database_materialize = typeid_cast *>(materialize_mysql_db.get())) + return database_materialize->setException(exception); + + throw Exception("LOGICAL_ERROR: cannot cast to DatabaseMaterializeMySQL, it is a bug.", ErrorCodes::LOGICAL_ERROR); +} + +void stopDatabaseSynchronization(const DatabasePtr & materialize_mysql_db) +{ + if (auto * database_materialize = typeid_cast *>(materialize_mysql_db.get())) + return database_materialize->shutdownSynchronizationThread(); + if (auto * database_materialize = typeid_cast *>(materialize_mysql_db.get())) + return database_materialize->shutdownSynchronizationThread(); + + throw Exception("LOGICAL_ERROR: cannot cast to DatabaseMaterializeMySQL, it is a bug.", ErrorCodes::LOGICAL_ERROR); +} + +template class DatabaseMaterializeMySQL; +template class DatabaseMaterializeMySQL; + } #endif diff --git a/src/Databases/MySQL/DatabaseMaterializeMySQL.h b/src/Databases/MySQL/DatabaseMaterializeMySQL.h index 799db65b481..0e43cd98028 100644 --- a/src/Databases/MySQL/DatabaseMaterializeMySQL.h +++ b/src/Databases/MySQL/DatabaseMaterializeMySQL.h @@ -6,9 +6,12 @@ #include #include -#include +//#include +#include +#include #include #include +//#include namespace DB { @@ -17,11 +20,14 @@ namespace DB * * All table structure and data will be written to the local file system */ -class DatabaseMaterializeMySQL : public IDatabase +template +class DatabaseMaterializeMySQL : public Base { public: + //using Storage = StorageMaterializeMySQL>; + DatabaseMaterializeMySQL( - const Context & context, const String & database_name_, const String & metadata_path_, + const Context & context, const String & database_name_, const String & metadata_path_, UUID uuid, const IAST * database_engine_define_, const String & mysql_database_name_, mysqlxx::Pool && pool_, MySQLClient && client_, std::unique_ptr settings_); @@ -29,35 +35,39 @@ public: void setException(const std::exception_ptr & exception); protected: - const Context & global_context; + //const Context & global_context; - ASTPtr engine_define; - DatabasePtr nested_database; + //ASTPtr engine_define; + //DatabasePtr nested_database; std::unique_ptr settings; - Poco::Logger * log; + //Poco::Logger * log; MaterializeMySQLSyncThread materialize_thread; std::exception_ptr exception; + std::atomic_bool started_up{false}; + public: String getEngineName() const override { return "MaterializeMySQL"; } - ASTPtr getCreateDatabaseQuery() const override; + //UUID getUUID() const override { return nested_database->getUUID(); } + + //ASTPtr getCreateDatabaseQuery() const override; void loadStoredObjects(Context & context, bool has_force_restore_data_flag, bool force_attach) override; void shutdown() override; - bool empty() const override; + //bool empty() const override; - String getDataPath() const override; + //String getDataPath() const override; - String getTableDataPath(const String & table_name) const override; + //String getTableDataPath(const String & table_name) const override; - String getTableDataPath(const ASTCreateQuery & query) const override; + //String getTableDataPath(const ASTCreateQuery & query) const override; - UUID tryGetTableUUID(const String & table_name) const override; + //UUID tryGetTableUUID(const String & table_name) const override; void createTable(const Context & context, const String & name, const StoragePtr & table, const ASTPtr & query) override; @@ -71,23 +81,32 @@ public: void alterTable(const Context & context, const StorageID & table_id, const StorageInMemoryMetadata & metadata) override; - time_t getObjectMetadataModificationTime(const String & name) const override; + //time_t getObjectMetadataModificationTime(const String & name) const override; - String getMetadataPath() const override; + //String getMetadataPath() const override; - String getObjectMetadataPath(const String & table_name) const override; + //String getObjectMetadataPath(const String & table_name) const override; - bool shouldBeEmptyOnDetach() const override; + //bool shouldBeEmptyOnDetach() const override; void drop(const Context & context) override; - bool isTableExist(const String & name, const Context & context) const override; + //bool isTableExist(const String & name, const Context & context) const override; StoragePtr tryGetTable(const String & name, const Context & context) const override; - DatabaseTablesIteratorPtr getTablesIterator(const Context & context, const FilterByNameFunction & filter_by_table_name) override; + DatabaseTablesIteratorPtr getTablesIterator(const Context & context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) override; + + //void assertCanBeDetached(bool cleanup) override; + + void assertCalledFromSyncThreadOrDrop(const char * method) const; + + void shutdownSynchronizationThread(); }; +void setSynchronizationThreadException(const DatabasePtr & materialize_mysql_db, const std::exception_ptr & exception); +void stopDatabaseSynchronization(const DatabasePtr & materialize_mysql_db); + } #endif diff --git a/src/Databases/MySQL/DatabaseMaterializeTablesIterator.h b/src/Databases/MySQL/DatabaseMaterializeTablesIterator.h index 86a5cbf8206..d0fa4d112f8 100644 --- a/src/Databases/MySQL/DatabaseMaterializeTablesIterator.h +++ b/src/Databases/MySQL/DatabaseMaterializeTablesIterator.h @@ -13,8 +13,11 @@ namespace DB * When MySQLSync thread accesses, it always returns MergeTree * Other cases always convert MergeTree to StorageMaterializeMySQL */ +template class DatabaseMaterializeTablesIterator final : public IDatabaseTablesIterator { + //static_assert(std::is_same_v> || + // std::is_same_v>); public: void next() override { nested_iterator->next(); } @@ -24,13 +27,13 @@ public: const StoragePtr & table() const override { - StoragePtr storage = std::make_shared(nested_iterator->table(), database); + StoragePtr storage = std::make_shared>(nested_iterator->table(), database); return tables.emplace_back(storage); } UUID uuid() const override { return nested_iterator->uuid(); } - DatabaseMaterializeTablesIterator(DatabaseTablesIteratorPtr nested_iterator_, DatabaseMaterializeMySQL * database_) + DatabaseMaterializeTablesIterator(DatabaseTablesIteratorPtr nested_iterator_, DatabaseT * database_) : nested_iterator(std::move(nested_iterator_)), database(database_) { } @@ -38,8 +41,7 @@ public: private: mutable std::vector tables; DatabaseTablesIteratorPtr nested_iterator; - DatabaseMaterializeMySQL * database; - + DatabaseT * database; }; } diff --git a/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp b/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp index 851ea351876..5dd3d6d3e5a 100644 --- a/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp +++ b/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp @@ -69,15 +69,27 @@ static BlockIO tryToExecuteQuery(const String & query_to_execute, Context & quer } } -static inline DatabaseMaterializeMySQL & getDatabase(const String & database_name) -{ - DatabasePtr database = DatabaseCatalog::instance().getDatabase(database_name); +//static inline DatabaseMaterializeMySQL & getDatabase(const String & database_name) +//{ +// DatabasePtr database = DatabaseCatalog::instance().getDatabase(database_name); +// +// if (DatabaseMaterializeMySQL * database_materialize = typeid_cast(database.get())) +// return *database_materialize; +// +// throw Exception("LOGICAL_ERROR: cannot cast to DatabaseMaterializeMySQL, it is a bug.", ErrorCodes::LOGICAL_ERROR); +//} - if (DatabaseMaterializeMySQL * database_materialize = typeid_cast(database.get())) - return *database_materialize; - - throw Exception("LOGICAL_ERROR: cannot cast to DatabaseMaterializeMySQL, it is a bug.", ErrorCodes::LOGICAL_ERROR); -} +//static inline void setSyncException(const String & database_name, const std::exception_ptr & exception) +//{ +// DatabasePtr database = DatabaseCatalog::instance().getDatabase(database_name); +// +// if (auto * database_materialize = typeid_cast *>(database.get())) +// return database_materialize->setException(exception); +// if (auto * database_materialize = typeid_cast *>(database.get())) +// return database_materialize->setException(exception); +// +// throw Exception("LOGICAL_ERROR: cannot cast to DatabaseMaterializeMySQL, it is a bug.", ErrorCodes::LOGICAL_ERROR); +//} MaterializeMySQLSyncThread::~MaterializeMySQLSyncThread() { @@ -196,7 +208,9 @@ void MaterializeMySQLSyncThread::synchronization(const String & mysql_version) catch (...) { tryLogCurrentException(log); - getDatabase(database_name).setException(std::current_exception()); + auto db = DatabaseCatalog::instance().getDatabase(database_name); + setSynchronizationThreadException(db, std::current_exception()); + //setSyncException(database_name, std::current_exception()); } } @@ -310,7 +324,7 @@ std::optional MaterializeMySQLSyncThread::prepareSynchroniz opened_transaction = false; MaterializeMetadata metadata( - connection, getDatabase(database_name).getMetadataPath() + "/.metadata", + connection, DatabaseCatalog::instance().getDatabase(database_name)->getMetadataPath() + "/.metadata", mysql_database_name, opened_transaction, mysql_version); if (!metadata.need_dumping_tables.empty()) diff --git a/src/Interpreters/DatabaseCatalog.cpp b/src/Interpreters/DatabaseCatalog.cpp index 6153f6b52fb..79415ef4c65 100644 --- a/src/Interpreters/DatabaseCatalog.cpp +++ b/src/Interpreters/DatabaseCatalog.cpp @@ -4,7 +4,7 @@ #include #include #include -#include +#include #include #include #include @@ -14,6 +14,16 @@ #include #include +#if !defined(ARCADIA_BUILD) +# include "config_core.h" +#endif + +#if USE_MYSQL +# include +# include +# include +#endif + #include namespace DB @@ -196,6 +206,24 @@ DatabaseAndTable DatabaseCatalog::getTableImpl( exception->emplace("Table " + table_id.getNameForLogs() + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE); return {}; } + +#if USE_MYSQL + /// It's definetly not the best place for this logic, but behaviour must be consistent with DatabaseMaterializeMySQL::tryGetTable(...) + if (db_and_table.first->getEngineName() == "MaterializeMySQL") + { + if (MaterializeMySQLSyncThread::isMySQLSyncThread()) + return db_and_table; + + //db_and_table.second = std::make_shared(std::move(db_and_table.second), mysql); + if (auto * mysql_ordinary = typeid_cast *>(db_and_table.first.get())) + db_and_table.second = std::make_shared>>(std::move(db_and_table.second), mysql_ordinary); + else if (auto * mysql_atomic = typeid_cast *>(db_and_table.first.get())) + db_and_table.second = std::make_shared>>(std::move(db_and_table.second), mysql_atomic); + else + throw Exception("LOGICAL_ERROR: cannot cast to DatabaseMaterializeMySQL, it is a bug.", ErrorCodes::LOGICAL_ERROR); + return db_and_table; + } +#endif return db_and_table; } @@ -265,7 +293,6 @@ void DatabaseCatalog::attachDatabase(const String & database_name, const Databas assertDatabaseDoesntExistUnlocked(database_name); databases.emplace(database_name, database); UUID db_uuid = database->getUUID(); - assert((db_uuid != UUIDHelpers::Nil) ^ (dynamic_cast(database.get()) == nullptr)); if (db_uuid != UUIDHelpers::Nil) db_uuid_map.emplace(db_uuid, database); } @@ -292,9 +319,8 @@ DatabasePtr DatabaseCatalog::detachDatabase(const String & database_name, bool d if (!db->empty()) throw Exception("New table appeared in database being dropped or detached. Try again.", ErrorCodes::DATABASE_NOT_EMPTY); - auto * database_atomic = typeid_cast(db.get()); - if (!drop && database_atomic) - database_atomic->assertCanBeDetached(false); + if (!drop) + db->assertCanBeDetached(false); } catch (...) { @@ -411,9 +437,24 @@ DatabasePtr DatabaseCatalog::getSystemDatabase() const return getDatabase(SYSTEM_DATABASE); } +//namespace +//{ +// +//void addWrappersIfNeed(DatabasePtr & database) +//{ +// /// FIXME IDatabase::attachTable(...) and similar methods are called from the nested database of MaterializeMySQL, so we need such hacks +// if (MaterializeMySQLSyncThread::isMySQLSyncThread()) +// { +// database = DatabaseCatalog::instance().getDatabase(database->getUUID()); +// } +//} +// +//} + void DatabaseCatalog::addUUIDMapping(const UUID & uuid, DatabasePtr database, StoragePtr table) { assert(uuid != UUIDHelpers::Nil && getFirstLevelIdx(uuid) < uuid_map.size()); + //addWrappersIfNeed(database); UUIDToStorageMapPart & map_part = uuid_map[getFirstLevelIdx(uuid)]; std::lock_guard lock{map_part.mutex}; auto [_, inserted] = map_part.map.try_emplace(uuid, std::move(database), std::move(table)); @@ -433,6 +474,7 @@ void DatabaseCatalog::removeUUIDMapping(const UUID & uuid) void DatabaseCatalog::updateUUIDMapping(const UUID & uuid, DatabasePtr database, StoragePtr table) { assert(uuid != UUIDHelpers::Nil && getFirstLevelIdx(uuid) < uuid_map.size()); + //addWrappersIfNeed(database); UUIDToStorageMapPart & map_part = uuid_map[getFirstLevelIdx(uuid)]; std::lock_guard lock{map_part.mutex}; auto it = map_part.map.find(uuid); diff --git a/src/Interpreters/InterpreterCreateQuery.cpp b/src/Interpreters/InterpreterCreateQuery.cpp index 06973ab029b..cea328a8dba 100644 --- a/src/Interpreters/InterpreterCreateQuery.cpp +++ b/src/Interpreters/InterpreterCreateQuery.cpp @@ -150,6 +150,35 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create) if (!create.attach && fs::exists(metadata_path)) throw Exception(ErrorCodes::DATABASE_ALREADY_EXISTS, "Metadata directory {} already exists", metadata_path); } + else if (create.storage->engine->name == "MaterializeMySQL") + { + /// It creates nested database with Ordinary or Atomic engine depending on UUID in query and default engine setting. + /// Do nothing if it's an internal ATTACH on server startup or short-syntax ATTACH query from user, + /// because we got correct query from the metadata file in this case. + /// If we got query from user, then normalize it first. + bool attach_from_user = create.attach && !internal && !create.attach_short_syntax; + bool create_from_user = !create.attach; + + if (create_from_user) + { + auto & default_engine = context.getSettingsRef().default_database_engine.value; + if (create.uuid == UUIDHelpers::Nil && default_engine == DefaultDatabaseEngine::Atomic) + create.uuid = UUIDHelpers::generateV4(); /// Will enable Atomic engine for nested database + } + else if (attach_from_user && create.uuid == UUIDHelpers::Nil) + { + /// Ambiguity is possible: should we attach nested database as Ordinary + /// or throw "UUID must be specified" for Atomic? So we suggest short syntax for Ordinary. + throw Exception("Use short attach syntax ('ATTACH DATABASE name;' without engine) to attach existing database " + "or specify UUID to attach new database with Atomic engine", ErrorCodes::INCORRECT_QUERY); + } + + /// Set metadata path according to nested engine + if (create.uuid == UUIDHelpers::Nil) + metadata_path = metadata_path / "metadata" / database_name_escaped; + else + metadata_path = metadata_path / "store" / DatabaseCatalog::getPathForUUID(create.uuid); + } else { bool is_on_cluster = context.getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY; @@ -217,7 +246,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create) { if (renamed) { - [[maybe_unused]] bool removed = fs::remove(metadata_file_tmp_path); + [[maybe_unused]] bool removed = fs::remove(metadata_file_path); assert(removed); } if (added) @@ -628,7 +657,7 @@ void InterpreterCreateQuery::assertOrSetUUID(ASTCreateQuery & create, const Data const auto * kind = create.is_dictionary ? "Dictionary" : "Table"; const auto * kind_upper = create.is_dictionary ? "DICTIONARY" : "TABLE"; - if (database->getEngineName() == "Atomic") + if (database->getUUID() != UUIDHelpers::Nil) { if (create.attach && create.uuid == UUIDHelpers::Nil) throw Exception(ErrorCodes::INCORRECT_QUERY, diff --git a/src/Interpreters/InterpreterDropQuery.cpp b/src/Interpreters/InterpreterDropQuery.cpp index dec6a275872..a3c091e73c9 100644 --- a/src/Interpreters/InterpreterDropQuery.cpp +++ b/src/Interpreters/InterpreterDropQuery.cpp @@ -11,7 +11,7 @@ #include #include #include -#include +#include namespace DB @@ -94,7 +94,7 @@ BlockIO InterpreterDropQuery::executeToTable( context.checkAccess(table->isView() ? AccessType::DROP_VIEW : AccessType::DROP_TABLE, table_id); table->shutdown(); TableExclusiveLockHolder table_lock; - if (database->getEngineName() != "Atomic") + if (database->getUUID() == UUIDHelpers::Nil) table_lock = table->lockExclusively(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout); /// Drop table from memory, don't touch data and metadata database->detachTable(table_id.table_name); @@ -117,7 +117,7 @@ BlockIO InterpreterDropQuery::executeToTable( table->shutdown(); TableExclusiveLockHolder table_lock; - if (database->getEngineName() != "Atomic") + if (database->getUUID() == UUIDHelpers::Nil) table_lock = table->lockExclusively(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout); database->dropTable(context, table_id.table_name, query.no_delay); @@ -222,6 +222,9 @@ BlockIO InterpreterDropQuery::executeToDatabase(const String & database_name, AS bool drop = kind == ASTDropQuery::Kind::Drop; context.checkAccess(AccessType::DROP_DATABASE, database_name); + if (database->getEngineName() == "MaterializeMySQL") + stopDatabaseSynchronization(database); + if (database->shouldBeEmptyOnDetach()) { /// DETACH or DROP all tables and dictionaries inside database. @@ -246,9 +249,8 @@ BlockIO InterpreterDropQuery::executeToDatabase(const String & database_name, AS /// Protects from concurrent CREATE TABLE queries auto db_guard = DatabaseCatalog::instance().getExclusiveDDLGuardForDatabase(database_name); - auto * database_atomic = typeid_cast(database.get()); - if (!drop && database_atomic) - database_atomic->assertCanBeDetached(true); + if (!drop) + database->assertCanBeDetached(true); /// DETACH or DROP database itself DatabaseCatalog::instance().detachDatabase(database_name, drop, database->shouldBeEmptyOnDetach()); diff --git a/src/Storages/StorageMaterializeMySQL.cpp b/src/Storages/StorageMaterializeMySQL.cpp index 7d908736bdc..d1fa4c0c35d 100644 --- a/src/Storages/StorageMaterializeMySQL.cpp +++ b/src/Storages/StorageMaterializeMySQL.cpp @@ -19,10 +19,13 @@ #include #include +#include + namespace DB { -StorageMaterializeMySQL::StorageMaterializeMySQL(const StoragePtr & nested_storage_, const DatabaseMaterializeMySQL * database_) +template +StorageMaterializeMySQL::StorageMaterializeMySQL(const StoragePtr & nested_storage_, const DatabaseT * database_) : IStorage(nested_storage_->getStorageID()), nested_storage(nested_storage_), database(database_) { auto nested_memory_metadata = nested_storage->getInMemoryMetadata(); @@ -31,7 +34,8 @@ StorageMaterializeMySQL::StorageMaterializeMySQL(const StoragePtr & nested_stora setInMemoryMetadata(in_memory_metadata); } -Pipe StorageMaterializeMySQL::read( +template +Pipe StorageMaterializeMySQL::read( const Names & column_names, const StorageMetadataPtr & /*metadata_snapshot*/, const SelectQueryInfo & query_info, @@ -98,13 +102,18 @@ Pipe StorageMaterializeMySQL::read( return pipe; } -NamesAndTypesList StorageMaterializeMySQL::getVirtuals() const +template +NamesAndTypesList StorageMaterializeMySQL::getVirtuals() const { /// If the background synchronization thread has exception. database->rethrowExceptionIfNeed(); return nested_storage->getVirtuals(); } +template class StorageMaterializeMySQL>; +template class StorageMaterializeMySQL>; + + } #endif diff --git a/src/Storages/StorageMaterializeMySQL.h b/src/Storages/StorageMaterializeMySQL.h index 4278ce64bd7..d689d30314b 100644 --- a/src/Storages/StorageMaterializeMySQL.h +++ b/src/Storages/StorageMaterializeMySQL.h @@ -5,21 +5,23 @@ #if USE_MYSQL #include -#include namespace DB { -class StorageMaterializeMySQL final : public ext::shared_ptr_helper, public IStorage +template +class StorageMaterializeMySQL final : public ext::shared_ptr_helper>, public IStorage { - friend struct ext::shared_ptr_helper; + //static_assert(std::is_same_v> || + // std::is_same_v>); + friend struct ext::shared_ptr_helper>; public: String getName() const override { return "MaterializeMySQL"; } bool supportsFinal() const override { return nested_storage->supportsFinal(); } bool supportsSampling() const override { return nested_storage->supportsSampling(); } - StorageMaterializeMySQL(const StoragePtr & nested_storage_, const DatabaseMaterializeMySQL * database_); + StorageMaterializeMySQL(const StoragePtr & nested_storage_, const DatabaseT * database_); Pipe read( const Names & column_names, const StorageMetadataPtr & metadata_snapshot, const SelectQueryInfo & query_info, @@ -27,9 +29,13 @@ public: NamesAndTypesList getVirtuals() const override; + StoragePtr getNested() const { return nested_storage; } + + void drop() override { nested_storage->drop(); } + private: StoragePtr nested_storage; - const DatabaseMaterializeMySQL * database; + const DatabaseT * database; }; } diff --git a/tests/integration/test_materialize_mysql_database/configs/users.xml b/tests/integration/test_materialize_mysql_database/configs/users.xml index f6df1c30fc4..01becae7c02 100644 --- a/tests/integration/test_materialize_mysql_database/configs/users.xml +++ b/tests/integration/test_materialize_mysql_database/configs/users.xml @@ -4,7 +4,9 @@ 1 1 + Atomic + diff --git a/tests/integration/test_materialize_mysql_database/materialize_with_ddl.py b/tests/integration/test_materialize_mysql_database/materialize_with_ddl.py index 37e204aae48..b643221c646 100644 --- a/tests/integration/test_materialize_mysql_database/materialize_with_ddl.py +++ b/tests/integration/test_materialize_mysql_database/materialize_with_ddl.py @@ -20,25 +20,28 @@ def dml_with_materialize_mysql_database(clickhouse_node, mysql_node, service_nam mysql_node.query("CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'") # existed before the mapping was created - mysql_node.query("CREATE TABLE test_database.test_table_1 (" - "`key` INT NOT NULL PRIMARY KEY, " - "unsigned_tiny_int TINYINT UNSIGNED, tiny_int TINYINT, " - "unsigned_small_int SMALLINT UNSIGNED, small_int SMALLINT, " - "unsigned_medium_int MEDIUMINT UNSIGNED, medium_int MEDIUMINT, " - "unsigned_int INT UNSIGNED, _int INT, " - "unsigned_integer INTEGER UNSIGNED, _integer INTEGER, " - "unsigned_bigint BIGINT UNSIGNED, _bigint BIGINT, " - "/* Need ClickHouse support read mysql decimal unsigned_decimal DECIMAL(19, 10) UNSIGNED, _decimal DECIMAL(19, 10), */" - "unsigned_float FLOAT UNSIGNED, _float FLOAT, " - "unsigned_double DOUBLE UNSIGNED, _double DOUBLE, " - "_varchar VARCHAR(10), _char CHAR(10), " - "/* Need ClickHouse support Enum('a', 'b', 'v') _enum ENUM('a', 'b', 'c'), */" - "_date Date, _datetime DateTime, _timestamp TIMESTAMP, _bool BOOLEAN) ENGINE = InnoDB;") + mysql_node.query(""" + CREATE TABLE test_database.test_table_1 ( + `key` INT NOT NULL PRIMARY KEY, + unsigned_tiny_int TINYINT UNSIGNED, tiny_int TINYINT, + unsigned_small_int SMALLINT UNSIGNED, small_int SMALLINT, + unsigned_medium_int MEDIUMINT UNSIGNED, medium_int MEDIUMINT, + unsigned_int INT UNSIGNED, _int INT, + unsigned_integer INTEGER UNSIGNED, _integer INTEGER, + unsigned_bigint BIGINT UNSIGNED, _bigint BIGINT, + /* Need ClickHouse support read mysql decimal unsigned_decimal DECIMAL(19, 10) UNSIGNED, _decimal DECIMAL(19, 10), */ + unsigned_float FLOAT UNSIGNED, _float FLOAT, + unsigned_double DOUBLE UNSIGNED, _double DOUBLE, + _varchar VARCHAR(10), _char CHAR(10), + /* Need ClickHouse support Enum('a', 'b', 'v') _enum ENUM('a', 'b', 'c'), */ + _date Date, _datetime DateTime, _timestamp TIMESTAMP, _bool BOOLEAN) ENGINE = InnoDB; + """) # it already has some data - mysql_node.query( - "INSERT INTO test_database.test_table_1 VALUES(1, 1, -1, 2, -2, 3, -3, 4, -4, 5, -5, 6, -6, 3.2, -3.2, 3.4, -3.4, 'varchar', 'char', " - "'2020-01-01', '2020-01-01 00:00:00', '2020-01-01 00:00:00', true);") + mysql_node.query(""" + INSERT INTO test_database.test_table_1 VALUES(1, 1, -1, 2, -2, 3, -3, 4, -4, 5, -5, 6, -6, 3.2, -3.2, 3.4, -3.4, 'varchar', 'char', + '2020-01-01', '2020-01-01 00:00:00', '2020-01-01 00:00:00', true); + """) clickhouse_node.query("CREATE DATABASE test_database ENGINE = MaterializeMySQL('{}:3306', 'test_database', 'root', 'clickhouse')".format(service_name)) @@ -48,9 +51,10 @@ def dml_with_materialize_mysql_database(clickhouse_node, mysql_node, service_nam "1\t1\t-1\t2\t-2\t3\t-3\t4\t-4\t5\t-5\t6\t-6\t3.2\t-3.2\t3.4\t-3.4\tvarchar\tchar\t2020-01-01\t" "2020-01-01 00:00:00\t2020-01-01 00:00:00\t1\n") - mysql_node.query( - "INSERT INTO test_database.test_table_1 VALUES(2, 1, -1, 2, -2, 3, -3, 4, -4, 5, -5, 6, -6, 3.2, -3.2, 3.4, -3.4, 'varchar', 'char', " - "'2020-01-01', '2020-01-01 00:00:00', '2020-01-01 00:00:00', false);") + mysql_node.query(""" + INSERT INTO test_database.test_table_1 VALUES(2, 1, -1, 2, -2, 3, -3, 4, -4, 5, -5, 6, -6, 3.2, -3.2, 3.4, -3.4, 'varchar', 'char', + '2020-01-01', '2020-01-01 00:00:00', '2020-01-01 00:00:00', false); + """) check_query(clickhouse_node, "SELECT * FROM test_database.test_table_1 ORDER BY key FORMAT TSV", "1\t1\t-1\t2\t-2\t3\t-3\t4\t-4\t5\t-5\t6\t-6\t3.2\t-3.2\t3.4\t-3.4\tvarchar\tchar\t2020-01-01\t" @@ -59,11 +63,13 @@ def dml_with_materialize_mysql_database(clickhouse_node, mysql_node, service_nam mysql_node.query("UPDATE test_database.test_table_1 SET unsigned_tiny_int = 2 WHERE `key` = 1") - check_query(clickhouse_node, "SELECT key, unsigned_tiny_int, tiny_int, unsigned_small_int," - " small_int, unsigned_medium_int, medium_int, unsigned_int, _int, unsigned_integer, _integer, " - " unsigned_bigint, _bigint, unsigned_float, _float, unsigned_double, _double, _varchar, _char, " - " _date, _datetime, /* exclude it, because ON UPDATE CURRENT_TIMESTAMP _timestamp, */ " - " _bool FROM test_database.test_table_1 ORDER BY key FORMAT TSV", + check_query(clickhouse_node, """ + SELECT key, unsigned_tiny_int, tiny_int, unsigned_small_int, + small_int, unsigned_medium_int, medium_int, unsigned_int, _int, unsigned_integer, _integer, + unsigned_bigint, _bigint, unsigned_float, _float, unsigned_double, _double, _varchar, _char, + _date, _datetime, /* exclude it, because ON UPDATE CURRENT_TIMESTAMP _timestamp, */ + _bool FROM test_database.test_table_1 ORDER BY key FORMAT TSV + """, "1\t2\t-1\t2\t-2\t3\t-3\t4\t-4\t5\t-5\t6\t-6\t3.2\t-3.2\t3.4\t-3.4\tvarchar\tchar\t2020-01-01\t" "2020-01-01 00:00:00\t1\n2\t1\t-1\t2\t-2\t3\t-3\t4\t-4\t5\t-5\t6\t-6\t3.2\t-3.2\t3.4\t-3.4\t" "varchar\tchar\t2020-01-01\t2020-01-01 00:00:00\t0\n") diff --git a/tests/integration/test_materialize_mysql_database/test.py b/tests/integration/test_materialize_mysql_database/test.py index c00b310436d..2799b793847 100644 --- a/tests/integration/test_materialize_mysql_database/test.py +++ b/tests/integration/test_materialize_mysql_database/test.py @@ -106,7 +106,7 @@ def test_materialize_database_ddl_with_mysql_5_7(started_cluster, started_mysql_ materialize_with_ddl.alter_rename_table_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1") materialize_with_ddl.alter_modify_column_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1") except: - print(clickhouse_node.query("select '\n', thread_id, query_id, arrayStringConcat(arrayMap(x -> concat(demangle(addressToSymbol(x)), '\n ', addressToLine(x)), trace), '\n') AS sym from system.stack_trace format TSVRaw")) + #print(clickhouse_node.query("select '\n', thread_id, query_id, arrayStringConcat(arrayMap(x -> concat(demangle(addressToSymbol(x)), '\n ', addressToLine(x)), trace), '\n') AS sym from system.stack_trace format TSVRaw")) raise From 0ecf8e595f40205259524482aebdcfc0170d8735 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Tue, 15 Sep 2020 16:30:30 +0300 Subject: [PATCH 2/8] cleanup code, add test --- src/Databases/DatabaseAtomic.cpp | 9 +- src/Databases/DatabaseAtomic.h | 2 +- src/Databases/DatabaseFactory.cpp | 36 ++-- .../MySQL/DatabaseMaterializeMySQL.cpp | 195 ++++-------------- .../MySQL/DatabaseMaterializeMySQL.h | 42 +--- .../MySQL/DatabaseMaterializeTablesIterator.h | 10 +- .../MySQL/MaterializeMySQLSyncThread.cpp | 22 -- src/Interpreters/DatabaseCatalog.cpp | 31 +-- src/Storages/StorageMaterializeMySQL.cpp | 17 +- src/Storages/StorageMaterializeMySQL.h | 11 +- .../configs/users.xml | 4 +- .../configs/users_db_atomic.xml | 19 ++ .../test_materialize_mysql_database/test.py | 45 ++-- 13 files changed, 124 insertions(+), 319 deletions(-) create mode 100644 tests/integration/test_materialize_mysql_database/configs/users_db_atomic.xml diff --git a/src/Databases/DatabaseAtomic.cpp b/src/Databases/DatabaseAtomic.cpp index 4591771697b..159172fff39 100644 --- a/src/Databases/DatabaseAtomic.cpp +++ b/src/Databases/DatabaseAtomic.cpp @@ -35,8 +35,8 @@ public: }; -DatabaseAtomic::DatabaseAtomic(String name_, String metadata_path_, UUID uuid, const Context & context_) - : DatabaseOrdinary(name_, std::move(metadata_path_), "store/", "DatabaseAtomic (" + name_ + ")", context_) +DatabaseAtomic::DatabaseAtomic(String name_, String metadata_path_, UUID uuid, const String & logger_name, const Context & context_) + : DatabaseOrdinary(name_, std::move(metadata_path_), "store/", logger_name, context_) , path_to_table_symlinks(global_context.getPath() + "data/" + escapeForFileName(name_) + "/") , path_to_metadata_symlink(global_context.getPath() + "metadata/" + escapeForFileName(name_)) , db_uuid(uuid) @@ -46,6 +46,11 @@ DatabaseAtomic::DatabaseAtomic(String name_, String metadata_path_, UUID uuid, c tryCreateMetadataSymlink(); } +DatabaseAtomic::DatabaseAtomic(String name_, String metadata_path_, UUID uuid, const Context & context_) + : DatabaseAtomic(name_, std::move(metadata_path_), uuid, "DatabaseAtomic (" + name_ + ")", context_) +{ +} + String DatabaseAtomic::getTableDataPath(const String & table_name) const { std::lock_guard lock(mutex); diff --git a/src/Databases/DatabaseAtomic.h b/src/Databases/DatabaseAtomic.h index 7ba5f26222d..d446eb31d27 100644 --- a/src/Databases/DatabaseAtomic.h +++ b/src/Databases/DatabaseAtomic.h @@ -20,7 +20,7 @@ namespace DB class DatabaseAtomic : public DatabaseOrdinary { public: - + DatabaseAtomic(String name_, String metadata_path_, UUID uuid, const String & logger_name, const Context & context_); DatabaseAtomic(String name_, String metadata_path_, UUID uuid, const Context & context_); String getEngineName() const override { return "Atomic"; } diff --git a/src/Databases/DatabaseFactory.cpp b/src/Databases/DatabaseFactory.cpp index 6d00597fcf2..c7347adcaaa 100644 --- a/src/Databases/DatabaseFactory.cpp +++ b/src/Databases/DatabaseFactory.cpp @@ -119,27 +119,27 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String const auto & [remote_host_name, remote_port] = parseAddress(host_name_and_port, 3306); auto mysql_pool = mysqlxx::Pool(mysql_database_name, remote_host_name, mysql_user_name, mysql_user_password, remote_port); - if (engine_name == "MaterializeMySQL") + if (engine_name == "MySQL") { - MySQLClient client(remote_host_name, remote_port, mysql_user_name, mysql_user_password); - - auto materialize_mode_settings = std::make_unique(); - - if (engine_define->settings) - materialize_mode_settings->loadFromQuery(*engine_define); - - if (create.uuid == UUIDHelpers::Nil) - return std::make_shared>( - context, database_name, metadata_path, uuid, engine_define, mysql_database_name, std::move(mysql_pool), std::move(client) - , std::move(materialize_mode_settings)); - else - return std::make_shared>( - context, database_name, metadata_path, uuid, engine_define, mysql_database_name, std::move(mysql_pool), std::move(client) - , std::move(materialize_mode_settings)); + return std::make_shared( + context, database_name, metadata_path, engine_define, mysql_database_name, std::move(mysql_pool)); } - return std::make_shared( - context, database_name, metadata_path, engine_define, mysql_database_name, std::move(mysql_pool)); + MySQLClient client(remote_host_name, remote_port, mysql_user_name, mysql_user_password); + + auto materialize_mode_settings = std::make_unique(); + + if (engine_define->settings) + materialize_mode_settings->loadFromQuery(*engine_define); + + if (create.uuid == UUIDHelpers::Nil) + return std::make_shared>( + context, database_name, metadata_path, uuid, mysql_database_name, std::move(mysql_pool), std::move(client) + , std::move(materialize_mode_settings)); + else + return std::make_shared>( + context, database_name, metadata_path, uuid, mysql_database_name, std::move(mysql_pool), std::move(client) + , std::move(materialize_mode_settings)); } catch (...) { diff --git a/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp b/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp index 5cd8fb72615..68435393883 100644 --- a/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp +++ b/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp @@ -27,32 +27,24 @@ namespace ErrorCodes template<> DatabaseMaterializeMySQL::DatabaseMaterializeMySQL( - const Context & context, const String & database_name_, const String & metadata_path_, UUID /*uuid*/, const IAST * /*database_engine_define_*/ - , const String & mysql_database_name_, mysqlxx::Pool && pool_, MySQLClient && client_, std::unique_ptr settings_) - : DatabaseOrdinary(database_name_, metadata_path_, context) - //, global_context(context.getGlobalContext()) - //, engine_define(database_engine_define_->clone()) - //, nested_database(uuid == UUIDHelpers::Nil - // ? std::make_shared(database_name_, metadata_path_, context) - // : std::make_shared(database_name_, metadata_path_, uuid, context)) + const Context & context, const String & database_name_, const String & metadata_path_, UUID /*uuid*/, + const String & mysql_database_name_, mysqlxx::Pool && pool_, MySQLClient && client_, std::unique_ptr settings_) + : DatabaseOrdinary(database_name_ + , metadata_path_ + , "data/" + escapeForFileName(database_name_) + "/" + , "DatabaseMaterializeMySQL (" + database_name_ + ")", context + ) , settings(std::move(settings_)) - //, log(&Poco::Logger::get("DatabaseMaterializeMySQL")) , materialize_thread(context, database_name_, mysql_database_name_, std::move(pool_), std::move(client_), settings.get()) { } template<> DatabaseMaterializeMySQL::DatabaseMaterializeMySQL( - const Context & context, const String & database_name_, const String & metadata_path_, UUID uuid, const IAST * /*database_engine_define_*/ - , const String & mysql_database_name_, mysqlxx::Pool && pool_, MySQLClient && client_, std::unique_ptr settings_) - : DatabaseAtomic(database_name_, metadata_path_, uuid, context) - //, global_context(context.getGlobalContext()) - //, engine_define(database_engine_define_->clone()) - //, nested_database(uuid == UUIDHelpers::Nil - // ? std::make_shared(database_name_, metadata_path_, context) - // : std::make_shared(database_name_, metadata_path_, uuid, context)) + const Context & context, const String & database_name_, const String & metadata_path_, UUID uuid, + const String & mysql_database_name_, mysqlxx::Pool && pool_, MySQLClient && client_, std::unique_ptr settings_) + : DatabaseAtomic(database_name_, metadata_path_, uuid, "DatabaseMaterializeMySQL (" + database_name_ + ")", context) , settings(std::move(settings_)) - //, log(&Poco::Logger::get("DatabaseMaterializeMySQL")) , materialize_thread(context, database_name_, mysql_database_name_, std::move(pool_), std::move(client_), settings.get()) { } @@ -64,14 +56,7 @@ void DatabaseMaterializeMySQL::rethrowExceptionIfNeed() const if (!settings->allows_query_when_mysql_lost && exception) { - try - { - std::rethrow_exception(exception); - } - catch (Exception & ex) - { - throw Exception(ex); - } + std::rethrow_exception(exception); } } @@ -82,23 +67,12 @@ void DatabaseMaterializeMySQL::setException(const std::exception_ptr & exc exception = exception_; } -//ASTPtr DatabaseMaterializeMySQL::getCreateDatabaseQuery() const -//{ -// const auto & create_query = std::make_shared(); -// create_query->database = database_name; -// create_query->uuid = nested_database->getUUID(); -// create_query->set(create_query->storage, engine_define); -// return create_query; -//} - template void DatabaseMaterializeMySQL::loadStoredObjects(Context & context, bool has_force_restore_data_flag, bool force_attach) { Base::loadStoredObjects(context, has_force_restore_data_flag, force_attach); try { - //std::unique_lock lock(Base::mutex); - //Base::loadStoredObjects(context, has_force_restore_data_flag, force_attach); materialize_thread.startSynchronization(); started_up = true; } @@ -111,78 +85,17 @@ void DatabaseMaterializeMySQL::loadStoredObjects(Context & context, bool h } } -template -void DatabaseMaterializeMySQL::shutdown() -{ - //materialize_thread.stopSynchronization(); - //started_up = false; - - Base::shutdown(); - -// //FIXME -// //auto iterator = nested_database->getTablesIterator(global_context, {}); -// auto iterator = Base::getTablesIterator(Base::global_context, {}); -// -// /// We only shutdown the table, The tables is cleaned up when destructed database -// for (; iterator->isValid(); iterator->next()) -// iterator->table()->shutdown(); -} - -//bool DatabaseMaterializeMySQL::empty() const -//{ -// return nested_database->empty(); -//} -// -//String DatabaseMaterializeMySQL::getDataPath() const -//{ -// return nested_database->getDataPath(); -//} -// -//String DatabaseMaterializeMySQL::getMetadataPath() const -//{ -// return nested_database->getMetadataPath(); -//} -// -//String DatabaseMaterializeMySQL::getTableDataPath(const String & table_name) const -//{ -// return nested_database->getTableDataPath(table_name); -//} -// -//String DatabaseMaterializeMySQL::getTableDataPath(const ASTCreateQuery & query) const -//{ -// return nested_database->getTableDataPath(query); -//} -// -//String DatabaseMaterializeMySQL::getObjectMetadataPath(const String & table_name) const -//{ -// return nested_database->getObjectMetadataPath(table_name); -//} -// -//UUID DatabaseMaterializeMySQL::tryGetTableUUID(const String & table_name) const -//{ -// return nested_database->tryGetTableUUID(table_name); -//} -// -//time_t DatabaseMaterializeMySQL::getObjectMetadataModificationTime(const String & name) const -//{ -// return nested_database->getObjectMetadataModificationTime(name); -//} - template void DatabaseMaterializeMySQL::createTable(const Context & context, const String & name, const StoragePtr & table, const ASTPtr & query) { assertCalledFromSyncThreadOrDrop("create table"); - - //nested_database->createTable(context, name, std::make_shared(std::move(table), this), query); Base::createTable(context, name, table, query); } template void DatabaseMaterializeMySQL::dropTable(const Context & context, const String & name, bool no_delay) { - ///FIXME cannot be called from DROP DATABASE, so we need hack with shouldBeEmptyOnDetach = false assertCalledFromSyncThreadOrDrop("drop table"); - Base::dropTable(context, name, no_delay); } @@ -190,8 +103,6 @@ template void DatabaseMaterializeMySQL::attachTable(const String & name, const StoragePtr & table, const String & relative_table_path) { assertCalledFromSyncThreadOrDrop("attach table"); - - //nested_database->attachTable(name, std::make_shared(std::move(table), this), relative_table_path); Base::attachTable(name, table, relative_table_path); } @@ -199,7 +110,6 @@ template StoragePtr DatabaseMaterializeMySQL::detachTable(const String & name) { assertCalledFromSyncThreadOrDrop("detach table"); - //return std::dynamic_pointer_cast(nested_database->detachTable(name)); return Base::detachTable(name); } @@ -227,36 +137,6 @@ void DatabaseMaterializeMySQL::alterTable(const Context & context, const S Base::alterTable(context, table_id, metadata); } -//template -//bool DatabaseMaterializeMySQL::shouldBeEmptyOnDetach() const -//{ -// return false; /// FIXME -//} - -//template -//void DatabaseMaterializeMySQL::drop(const Context & context) -//{ -// /// FIXME -// if (Base::shouldBeEmptyOnDetach()) -// { -// for (auto iterator = Base::getTablesIterator(context, {}); iterator->isValid(); iterator->next()) -// { -// TableExclusiveLockHolder table_lock = iterator->table()->lockExclusively( -// context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout); -// -// Base::dropTable(context, iterator->name(), true); -// } -// -// /// Remove metadata info -// Poco::File metadata(Base::getMetadataPath() + "/.metadata"); -// -// if (metadata.exists()) -// metadata.remove(false); -// } -// -// Base::drop(context); -//} - template void DatabaseMaterializeMySQL::drop(const Context & context) { @@ -269,30 +149,20 @@ void DatabaseMaterializeMySQL::drop(const Context & context) Base::drop(context); } -//bool DatabaseMaterializeMySQL::isTableExist(const String & name, const Context & context) const -//{ -// return nested_database->isTableExist(name, context); -//} - template StoragePtr DatabaseMaterializeMySQL::tryGetTable(const String & name, const Context & context) const { if (!MaterializeMySQLSyncThread::isMySQLSyncThread()) { - using Storage = StorageMaterializeMySQL>; StoragePtr nested_storage = Base::tryGetTable(name, context); if (!nested_storage) return {}; - return std::make_shared(std::move(nested_storage), this); + return std::make_shared(std::move(nested_storage), this); } return Base::tryGetTable(name, context); - //auto table = nested_database->tryGetTable(name, context); - //if (table && MaterializeMySQLSyncThread::isMySQLSyncThread()) - // return typeid_cast(*table).getNested(); - //return table; } template @@ -300,19 +170,13 @@ DatabaseTablesIteratorPtr DatabaseMaterializeMySQL::getTablesIterator(cons { if (!MaterializeMySQLSyncThread::isMySQLSyncThread()) { - using TablesIterator = DatabaseMaterializeTablesIterator>; DatabaseTablesIteratorPtr iterator = Base::getTablesIterator(context, filter_by_table_name); - return std::make_unique(std::move(iterator), this); + return std::make_unique(std::move(iterator), this); } return Base::getTablesIterator(context, filter_by_table_name); } -//void DatabaseMaterializeMySQL::assertCanBeDetached(bool cleanup) -//{ -// nested_database->assertCanBeDetached(cleanup); -//} - template void DatabaseMaterializeMySQL::assertCalledFromSyncThreadOrDrop(const char * method) const { @@ -327,24 +191,37 @@ void DatabaseMaterializeMySQL::shutdownSynchronizationThread() started_up = false; } -void setSynchronizationThreadException(const DatabasePtr & materialize_mysql_db, const std::exception_ptr & exception) +template class Helper, typename... Args> +auto castToMaterializeMySQLAndCallHelper(Database * database, Args && ... args) { - if (auto * database_materialize = typeid_cast *>(materialize_mysql_db.get())) - return database_materialize->setException(exception); - if (auto * database_materialize = typeid_cast *>(materialize_mysql_db.get())) - return database_materialize->setException(exception); + using Ordinary = DatabaseMaterializeMySQL; + using Atomic = DatabaseMaterializeMySQL; + using ToOrdinary = typename std::conditional_t, const Ordinary *, Ordinary *>; + using ToAtomic = typename std::conditional_t, const Atomic *, Atomic *>; + if (auto * database_materialize = typeid_cast(database)) + return (database_materialize->*Helper::v)(std::forward(args)...); + if (auto * database_materialize = typeid_cast(database)) + return (database_materialize->*Helper::v)(std::forward(args)...); throw Exception("LOGICAL_ERROR: cannot cast to DatabaseMaterializeMySQL, it is a bug.", ErrorCodes::LOGICAL_ERROR); } +template struct HelperSetException { static constexpr auto v = &T::setException; }; +void setSynchronizationThreadException(const DatabasePtr & materialize_mysql_db, const std::exception_ptr & exception) +{ + castToMaterializeMySQLAndCallHelper(materialize_mysql_db.get(), exception); +} + +template struct HelperStopSync { static constexpr auto v = &T::shutdownSynchronizationThread; }; void stopDatabaseSynchronization(const DatabasePtr & materialize_mysql_db) { - if (auto * database_materialize = typeid_cast *>(materialize_mysql_db.get())) - return database_materialize->shutdownSynchronizationThread(); - if (auto * database_materialize = typeid_cast *>(materialize_mysql_db.get())) - return database_materialize->shutdownSynchronizationThread(); + castToMaterializeMySQLAndCallHelper(materialize_mysql_db.get()); +} - throw Exception("LOGICAL_ERROR: cannot cast to DatabaseMaterializeMySQL, it is a bug.", ErrorCodes::LOGICAL_ERROR); +template struct HelperRethrow { static constexpr auto v = &T::rethrowExceptionIfNeed; }; +void rethrowSyncExceptionIfNeed(const IDatabase * materialize_mysql_db) +{ + castToMaterializeMySQLAndCallHelper(materialize_mysql_db); } template class DatabaseMaterializeMySQL; diff --git a/src/Databases/MySQL/DatabaseMaterializeMySQL.h b/src/Databases/MySQL/DatabaseMaterializeMySQL.h index 0e43cd98028..e1229269a33 100644 --- a/src/Databases/MySQL/DatabaseMaterializeMySQL.h +++ b/src/Databases/MySQL/DatabaseMaterializeMySQL.h @@ -6,12 +6,9 @@ #include #include -//#include -#include -#include +#include #include #include -//#include namespace DB { @@ -24,24 +21,19 @@ template class DatabaseMaterializeMySQL : public Base { public: - //using Storage = StorageMaterializeMySQL>; DatabaseMaterializeMySQL( const Context & context, const String & database_name_, const String & metadata_path_, UUID uuid, - const IAST * database_engine_define_, const String & mysql_database_name_, mysqlxx::Pool && pool_, + const String & mysql_database_name_, mysqlxx::Pool && pool_, MySQLClient && client_, std::unique_ptr settings_); void rethrowExceptionIfNeed() const; void setException(const std::exception_ptr & exception); protected: - //const Context & global_context; - //ASTPtr engine_define; - //DatabasePtr nested_database; std::unique_ptr settings; - //Poco::Logger * log; MaterializeMySQLSyncThread materialize_thread; std::exception_ptr exception; @@ -51,24 +43,8 @@ protected: public: String getEngineName() const override { return "MaterializeMySQL"; } - //UUID getUUID() const override { return nested_database->getUUID(); } - - //ASTPtr getCreateDatabaseQuery() const override; - void loadStoredObjects(Context & context, bool has_force_restore_data_flag, bool force_attach) override; - void shutdown() override; - - //bool empty() const override; - - //String getDataPath() const override; - - //String getTableDataPath(const String & table_name) const override; - - //String getTableDataPath(const ASTCreateQuery & query) const override; - - //UUID tryGetTableUUID(const String & table_name) const override; - void createTable(const Context & context, const String & name, const StoragePtr & table, const ASTPtr & query) override; void dropTable(const Context & context, const String & name, bool no_delay) override; @@ -81,31 +57,21 @@ public: void alterTable(const Context & context, const StorageID & table_id, const StorageInMemoryMetadata & metadata) override; - //time_t getObjectMetadataModificationTime(const String & name) const override; - - //String getMetadataPath() const override; - - //String getObjectMetadataPath(const String & table_name) const override; - - //bool shouldBeEmptyOnDetach() const override; - void drop(const Context & context) override; - //bool isTableExist(const String & name, const Context & context) const override; - StoragePtr tryGetTable(const String & name, const Context & context) const override; DatabaseTablesIteratorPtr getTablesIterator(const Context & context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) override; - //void assertCanBeDetached(bool cleanup) override; - void assertCalledFromSyncThreadOrDrop(const char * method) const; void shutdownSynchronizationThread(); }; + void setSynchronizationThreadException(const DatabasePtr & materialize_mysql_db, const std::exception_ptr & exception); void stopDatabaseSynchronization(const DatabasePtr & materialize_mysql_db); +void rethrowSyncExceptionIfNeed(const IDatabase * materialize_mysql_db); } diff --git a/src/Databases/MySQL/DatabaseMaterializeTablesIterator.h b/src/Databases/MySQL/DatabaseMaterializeTablesIterator.h index d0fa4d112f8..54031de40a2 100644 --- a/src/Databases/MySQL/DatabaseMaterializeTablesIterator.h +++ b/src/Databases/MySQL/DatabaseMaterializeTablesIterator.h @@ -2,7 +2,6 @@ #include #include -#include namespace DB { @@ -13,11 +12,8 @@ namespace DB * When MySQLSync thread accesses, it always returns MergeTree * Other cases always convert MergeTree to StorageMaterializeMySQL */ -template class DatabaseMaterializeTablesIterator final : public IDatabaseTablesIterator { - //static_assert(std::is_same_v> || - // std::is_same_v>); public: void next() override { nested_iterator->next(); } @@ -27,13 +23,13 @@ public: const StoragePtr & table() const override { - StoragePtr storage = std::make_shared>(nested_iterator->table(), database); + StoragePtr storage = std::make_shared(nested_iterator->table(), database); return tables.emplace_back(storage); } UUID uuid() const override { return nested_iterator->uuid(); } - DatabaseMaterializeTablesIterator(DatabaseTablesIteratorPtr nested_iterator_, DatabaseT * database_) + DatabaseMaterializeTablesIterator(DatabaseTablesIteratorPtr nested_iterator_, const IDatabase * database_) : nested_iterator(std::move(nested_iterator_)), database(database_) { } @@ -41,7 +37,7 @@ public: private: mutable std::vector tables; DatabaseTablesIteratorPtr nested_iterator; - DatabaseT * database; + const IDatabase * database; }; } diff --git a/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp b/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp index 0e18a2bc721..708262a3b61 100644 --- a/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp +++ b/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp @@ -69,27 +69,6 @@ static BlockIO tryToExecuteQuery(const String & query_to_execute, Context & quer } } -//static inline DatabaseMaterializeMySQL & getDatabase(const String & database_name) -//{ -// DatabasePtr database = DatabaseCatalog::instance().getDatabase(database_name); -// -// if (DatabaseMaterializeMySQL * database_materialize = typeid_cast(database.get())) -// return *database_materialize; -// -// throw Exception("LOGICAL_ERROR: cannot cast to DatabaseMaterializeMySQL, it is a bug.", ErrorCodes::LOGICAL_ERROR); -//} - -//static inline void setSyncException(const String & database_name, const std::exception_ptr & exception) -//{ -// DatabasePtr database = DatabaseCatalog::instance().getDatabase(database_name); -// -// if (auto * database_materialize = typeid_cast *>(database.get())) -// return database_materialize->setException(exception); -// if (auto * database_materialize = typeid_cast *>(database.get())) -// return database_materialize->setException(exception); -// -// throw Exception("LOGICAL_ERROR: cannot cast to DatabaseMaterializeMySQL, it is a bug.", ErrorCodes::LOGICAL_ERROR); -//} MaterializeMySQLSyncThread::~MaterializeMySQLSyncThread() { @@ -211,7 +190,6 @@ void MaterializeMySQLSyncThread::synchronization(const String & mysql_version) tryLogCurrentException(log); auto db = DatabaseCatalog::instance().getDatabase(database_name); setSynchronizationThreadException(db, std::current_exception()); - //setSyncException(database_name, std::current_exception()); } } diff --git a/src/Interpreters/DatabaseCatalog.cpp b/src/Interpreters/DatabaseCatalog.cpp index 720a2177b65..a7d3bfa2ec1 100644 --- a/src/Interpreters/DatabaseCatalog.cpp +++ b/src/Interpreters/DatabaseCatalog.cpp @@ -20,7 +20,6 @@ #if USE_MYSQL # include -# include # include #endif @@ -208,19 +207,11 @@ DatabaseAndTable DatabaseCatalog::getTableImpl( } #if USE_MYSQL - /// It's definetly not the best place for this logic, but behaviour must be consistent with DatabaseMaterializeMySQL::tryGetTable(...) + /// It's definitely not the best place for this logic, but behaviour must be consistent with DatabaseMaterializeMySQL::tryGetTable(...) if (db_and_table.first->getEngineName() == "MaterializeMySQL") { - if (MaterializeMySQLSyncThread::isMySQLSyncThread()) - return db_and_table; - - //db_and_table.second = std::make_shared(std::move(db_and_table.second), mysql); - if (auto * mysql_ordinary = typeid_cast *>(db_and_table.first.get())) - db_and_table.second = std::make_shared>>(std::move(db_and_table.second), mysql_ordinary); - else if (auto * mysql_atomic = typeid_cast *>(db_and_table.first.get())) - db_and_table.second = std::make_shared>>(std::move(db_and_table.second), mysql_atomic); - else - throw Exception("LOGICAL_ERROR: cannot cast to DatabaseMaterializeMySQL, it is a bug.", ErrorCodes::LOGICAL_ERROR); + if (!MaterializeMySQLSyncThread::isMySQLSyncThread()) + db_and_table.second = std::make_shared(std::move(db_and_table.second), db_and_table.first.get()); return db_and_table; } #endif @@ -437,24 +428,9 @@ DatabasePtr DatabaseCatalog::getSystemDatabase() const return getDatabase(SYSTEM_DATABASE); } -//namespace -//{ -// -//void addWrappersIfNeed(DatabasePtr & database) -//{ -// /// FIXME IDatabase::attachTable(...) and similar methods are called from the nested database of MaterializeMySQL, so we need such hacks -// if (MaterializeMySQLSyncThread::isMySQLSyncThread()) -// { -// database = DatabaseCatalog::instance().getDatabase(database->getUUID()); -// } -//} -// -//} - void DatabaseCatalog::addUUIDMapping(const UUID & uuid, DatabasePtr database, StoragePtr table) { assert(uuid != UUIDHelpers::Nil && getFirstLevelIdx(uuid) < uuid_map.size()); - //addWrappersIfNeed(database); UUIDToStorageMapPart & map_part = uuid_map[getFirstLevelIdx(uuid)]; std::lock_guard lock{map_part.mutex}; auto [_, inserted] = map_part.map.try_emplace(uuid, std::move(database), std::move(table)); @@ -474,7 +450,6 @@ void DatabaseCatalog::removeUUIDMapping(const UUID & uuid) void DatabaseCatalog::updateUUIDMapping(const UUID & uuid, DatabasePtr database, StoragePtr table) { assert(uuid != UUIDHelpers::Nil && getFirstLevelIdx(uuid) < uuid_map.size()); - //addWrappersIfNeed(database); UUIDToStorageMapPart & map_part = uuid_map[getFirstLevelIdx(uuid)]; std::lock_guard lock{map_part.mutex}; auto it = map_part.map.find(uuid); diff --git a/src/Storages/StorageMaterializeMySQL.cpp b/src/Storages/StorageMaterializeMySQL.cpp index d1fa4c0c35d..7e0fadd9484 100644 --- a/src/Storages/StorageMaterializeMySQL.cpp +++ b/src/Storages/StorageMaterializeMySQL.cpp @@ -24,8 +24,7 @@ namespace DB { -template -StorageMaterializeMySQL::StorageMaterializeMySQL(const StoragePtr & nested_storage_, const DatabaseT * database_) +StorageMaterializeMySQL::StorageMaterializeMySQL(const StoragePtr & nested_storage_, const IDatabase * database_) : IStorage(nested_storage_->getStorageID()), nested_storage(nested_storage_), database(database_) { auto nested_memory_metadata = nested_storage->getInMemoryMetadata(); @@ -34,8 +33,7 @@ StorageMaterializeMySQL::StorageMaterializeMySQL(const StoragePtr & n setInMemoryMetadata(in_memory_metadata); } -template -Pipe StorageMaterializeMySQL::read( +Pipe StorageMaterializeMySQL::read( const Names & column_names, const StorageMetadataPtr & /*metadata_snapshot*/, const SelectQueryInfo & query_info, @@ -45,7 +43,7 @@ Pipe StorageMaterializeMySQL::read( unsigned int num_streams) { /// If the background synchronization thread has exception. - database->rethrowExceptionIfNeed(); + rethrowSyncExceptionIfNeed(database); NameSet column_names_set = NameSet(column_names.begin(), column_names.end()); auto lock = nested_storage->lockForShare(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout); @@ -102,18 +100,13 @@ Pipe StorageMaterializeMySQL::read( return pipe; } -template -NamesAndTypesList StorageMaterializeMySQL::getVirtuals() const +NamesAndTypesList StorageMaterializeMySQL::getVirtuals() const { /// If the background synchronization thread has exception. - database->rethrowExceptionIfNeed(); + rethrowSyncExceptionIfNeed(database); return nested_storage->getVirtuals(); } -template class StorageMaterializeMySQL>; -template class StorageMaterializeMySQL>; - - } #endif diff --git a/src/Storages/StorageMaterializeMySQL.h b/src/Storages/StorageMaterializeMySQL.h index d689d30314b..8047b59f464 100644 --- a/src/Storages/StorageMaterializeMySQL.h +++ b/src/Storages/StorageMaterializeMySQL.h @@ -9,19 +9,16 @@ namespace DB { -template -class StorageMaterializeMySQL final : public ext::shared_ptr_helper>, public IStorage +class StorageMaterializeMySQL final : public ext::shared_ptr_helper, public IStorage { - //static_assert(std::is_same_v> || - // std::is_same_v>); - friend struct ext::shared_ptr_helper>; + friend struct ext::shared_ptr_helper; public: String getName() const override { return "MaterializeMySQL"; } bool supportsFinal() const override { return nested_storage->supportsFinal(); } bool supportsSampling() const override { return nested_storage->supportsSampling(); } - StorageMaterializeMySQL(const StoragePtr & nested_storage_, const DatabaseT * database_); + StorageMaterializeMySQL(const StoragePtr & nested_storage_, const IDatabase * database_); Pipe read( const Names & column_names, const StorageMetadataPtr & metadata_snapshot, const SelectQueryInfo & query_info, @@ -35,7 +32,7 @@ public: private: StoragePtr nested_storage; - const DatabaseT * database; + const IDatabase * database; }; } diff --git a/tests/integration/test_materialize_mysql_database/configs/users.xml b/tests/integration/test_materialize_mysql_database/configs/users.xml index 01becae7c02..4c167c06d63 100644 --- a/tests/integration/test_materialize_mysql_database/configs/users.xml +++ b/tests/integration/test_materialize_mysql_database/configs/users.xml @@ -3,10 +3,8 @@ 1 - 1 - Atomic + Ordinary - diff --git a/tests/integration/test_materialize_mysql_database/configs/users_db_atomic.xml b/tests/integration/test_materialize_mysql_database/configs/users_db_atomic.xml new file mode 100644 index 00000000000..3add72ec554 --- /dev/null +++ b/tests/integration/test_materialize_mysql_database/configs/users_db_atomic.xml @@ -0,0 +1,19 @@ + + + + + 1 + Atomic + + + + + + + + ::/0 + + default + + + diff --git a/tests/integration/test_materialize_mysql_database/test.py b/tests/integration/test_materialize_mysql_database/test.py index 2799b793847..b1672281e7a 100644 --- a/tests/integration/test_materialize_mysql_database/test.py +++ b/tests/integration/test_materialize_mysql_database/test.py @@ -11,7 +11,8 @@ from helpers.cluster import ClickHouseCluster, get_docker_compose_path DOCKER_COMPOSE_PATH = get_docker_compose_path() cluster = ClickHouseCluster(__file__) -clickhouse_node = cluster.add_instance('node1', user_configs=["configs/users.xml"], with_mysql=False) +node_db_ordinary = cluster.add_instance('node1', user_configs=["configs/users.xml"], with_mysql=False) +node_db_atomic = cluster.add_instance('node2', user_configs=["configs/users_db_atomic.xml"], with_mysql=False) @pytest.fixture(scope="module") @@ -87,30 +88,28 @@ def started_mysql_8_0(): subprocess.check_call(['docker-compose', '-p', cluster.project_name, '-f', docker_compose, 'down', '--volumes', '--remove-orphans']) -def test_materialize_database_dml_with_mysql_5_7(started_cluster, started_mysql_5_7): +@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic]) +def test_materialize_database_dml_with_mysql_5_7(started_cluster, started_mysql_5_7, clickhouse_node): materialize_with_ddl.dml_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1") - -def test_materialize_database_dml_with_mysql_8_0(started_cluster, started_mysql_8_0): +@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic]) +def test_materialize_database_dml_with_mysql_8_0(started_cluster, started_mysql_8_0, clickhouse_node): materialize_with_ddl.dml_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql8_0") -def test_materialize_database_ddl_with_mysql_5_7(started_cluster, started_mysql_5_7): - try: - materialize_with_ddl.drop_table_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1") - materialize_with_ddl.create_table_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1") - materialize_with_ddl.rename_table_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1") - materialize_with_ddl.alter_add_column_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1") - materialize_with_ddl.alter_drop_column_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1") - # mysql 5.7 cannot support alter rename column - # materialize_with_ddl.alter_rename_column_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1") - materialize_with_ddl.alter_rename_table_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1") - materialize_with_ddl.alter_modify_column_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1") - except: - #print(clickhouse_node.query("select '\n', thread_id, query_id, arrayStringConcat(arrayMap(x -> concat(demangle(addressToSymbol(x)), '\n ', addressToLine(x)), trace), '\n') AS sym from system.stack_trace format TSVRaw")) - raise +@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic]) +def test_materialize_database_ddl_with_mysql_5_7(started_cluster, started_mysql_5_7, clickhouse_node): + materialize_with_ddl.drop_table_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1") + materialize_with_ddl.create_table_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1") + materialize_with_ddl.rename_table_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1") + materialize_with_ddl.alter_add_column_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1") + materialize_with_ddl.alter_drop_column_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1") + # mysql 5.7 cannot support alter rename column + # materialize_with_ddl.alter_rename_column_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1") + materialize_with_ddl.alter_rename_table_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1") + materialize_with_ddl.alter_modify_column_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1") - -def test_materialize_database_ddl_with_mysql_8_0(started_cluster, started_mysql_8_0): +@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic]) +def test_materialize_database_ddl_with_mysql_8_0(started_cluster, started_mysql_8_0, clickhouse_node): materialize_with_ddl.drop_table_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql8_0") materialize_with_ddl.create_table_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql8_0") materialize_with_ddl.rename_table_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql8_0") @@ -120,8 +119,10 @@ def test_materialize_database_ddl_with_mysql_8_0(started_cluster, started_mysql_ materialize_with_ddl.alter_rename_column_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql8_0") materialize_with_ddl.alter_modify_column_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql8_0") -def test_materialize_database_ddl_with_empty_transaction_5_7(started_cluster, started_mysql_5_7): +@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic]) +def test_materialize_database_ddl_with_empty_transaction_5_7(started_cluster, started_mysql_5_7, clickhouse_node): materialize_with_ddl.query_event_with_empty_transaction(clickhouse_node, started_mysql_5_7, "mysql1") -def test_materialize_database_ddl_with_empty_transaction_8_0(started_cluster, started_mysql_8_0): +@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic]) +def test_materialize_database_ddl_with_empty_transaction_8_0(started_cluster, started_mysql_8_0, clickhouse_node): materialize_with_ddl.query_event_with_empty_transaction(clickhouse_node, started_mysql_8_0, "mysql8_0") From 5455068226399a8c1065022ac3ee32608f0a254a Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Tue, 15 Sep 2020 17:17:30 +0300 Subject: [PATCH 3/8] try fix flacky test --- .../test_materialize_mysql_database/materialize_with_ddl.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_materialize_mysql_database/materialize_with_ddl.py b/tests/integration/test_materialize_mysql_database/materialize_with_ddl.py index b643221c646..2e177cab6d3 100644 --- a/tests/integration/test_materialize_mysql_database/materialize_with_ddl.py +++ b/tests/integration/test_materialize_mysql_database/materialize_with_ddl.py @@ -5,7 +5,11 @@ import pymysql.cursors def check_query(clickhouse_node, query, result_set, retry_count=3, interval_seconds=3): lastest_result = '' for index in range(retry_count): - lastest_result = clickhouse_node.query(query) + try: + lastest_result = clickhouse_node.query(query) + except Exception as e: + print(e) + continue if result_set == lastest_result: return From 18e48838bb136c2297fc30d256203e0832a4fa5d Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Tue, 15 Sep 2020 17:22:20 +0300 Subject: [PATCH 4/8] fix build --- src/Interpreters/InterpreterDropQuery.cpp | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/Interpreters/InterpreterDropQuery.cpp b/src/Interpreters/InterpreterDropQuery.cpp index a3c091e73c9..6c85464ab8c 100644 --- a/src/Interpreters/InterpreterDropQuery.cpp +++ b/src/Interpreters/InterpreterDropQuery.cpp @@ -11,7 +11,14 @@ #include #include #include -#include + +#if !defined(ARCADIA_BUILD) +# include "config_core.h" +#endif + +#if USE_MYSQL +# include +#endif namespace DB @@ -222,8 +229,10 @@ BlockIO InterpreterDropQuery::executeToDatabase(const String & database_name, AS bool drop = kind == ASTDropQuery::Kind::Drop; context.checkAccess(AccessType::DROP_DATABASE, database_name); +#if USE_MYSQL if (database->getEngineName() == "MaterializeMySQL") stopDatabaseSynchronization(database); +#endif if (database->shouldBeEmptyOnDetach()) { From 1c37ac7c9d7bce591b150725d9d3114180376f78 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Thu, 17 Sep 2020 21:19:02 +0300 Subject: [PATCH 5/8] fix --- src/Databases/MySQL/DatabaseMaterializeMySQL.cpp | 13 ++++++++++++- src/Interpreters/InterpreterCreateQuery.cpp | 2 +- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp b/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp index 68435393883..6a9f1e37f8e 100644 --- a/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp +++ b/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp @@ -23,6 +23,7 @@ namespace DB namespace ErrorCodes { extern const int NOT_IMPLEMENTED; + extern const int LOGICAL_ERROR; } template<> @@ -56,7 +57,17 @@ void DatabaseMaterializeMySQL::rethrowExceptionIfNeed() const if (!settings->allows_query_when_mysql_lost && exception) { - std::rethrow_exception(exception); + try + { + std::rethrow_exception(exception); + } + catch (Exception & ex) + { + /// This method can be called from multiple threads + /// and Exception can be modified concurrently by calling addMessage(...), + /// so we rethrow a copy. + throw Exception(ex); + } } } diff --git a/src/Interpreters/InterpreterCreateQuery.cpp b/src/Interpreters/InterpreterCreateQuery.cpp index 0756e99d7cf..4b79d1e7271 100644 --- a/src/Interpreters/InterpreterCreateQuery.cpp +++ b/src/Interpreters/InterpreterCreateQuery.cpp @@ -161,7 +161,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create) if (create_from_user) { - auto & default_engine = context.getSettingsRef().default_database_engine.value; + const auto & default_engine = context.getSettingsRef().default_database_engine.value; if (create.uuid == UUIDHelpers::Nil && default_engine == DefaultDatabaseEngine::Atomic) create.uuid = UUIDHelpers::generateV4(); /// Will enable Atomic engine for nested database } From e82c63e05cac931dd282ce0597c45385e01094c8 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Tue, 24 Nov 2020 15:28:54 +0300 Subject: [PATCH 6/8] fixes --- src/Databases/DatabaseAtomic.h | 2 +- src/Databases/IDatabase.h | 2 ++ src/Interpreters/DatabaseCatalog.cpp | 1 - .../test_materialize_mysql_database/test.py | 32 ++++++++----------- 4 files changed, 17 insertions(+), 20 deletions(-) diff --git a/src/Databases/DatabaseAtomic.h b/src/Databases/DatabaseAtomic.h index c9bf8f0bd65..1b1c0cd4353 100644 --- a/src/Databases/DatabaseAtomic.h +++ b/src/Databases/DatabaseAtomic.h @@ -58,7 +58,7 @@ public: void tryCreateSymlink(const String & table_name, const String & actual_data_path, bool if_data_path_exist = false); void tryRemoveSymlink(const String & table_name); - void waitDetachedTableNotInUse(const UUID & uuid); + void waitDetachedTableNotInUse(const UUID & uuid) override; private: void commitAlterTable(const StorageID & table_id, const String & table_metadata_tmp_path, const String & table_metadata_path) override; diff --git a/src/Databases/IDatabase.h b/src/Databases/IDatabase.h index 9a9535f0f9a..9a0eb8d9969 100644 --- a/src/Databases/IDatabase.h +++ b/src/Databases/IDatabase.h @@ -336,6 +336,8 @@ public: virtual void assertCanBeDetached(bool /*cleanup*/) {} + virtual void waitDetachedTableNotInUse(const UUID & /*uuid*/) { assert(false); } + /// Ask all tables to complete the background threads they are using and delete all table objects. virtual void shutdown() = 0; diff --git a/src/Interpreters/DatabaseCatalog.cpp b/src/Interpreters/DatabaseCatalog.cpp index 08aab5b7962..c0d7287922f 100644 --- a/src/Interpreters/DatabaseCatalog.cpp +++ b/src/Interpreters/DatabaseCatalog.cpp @@ -232,7 +232,6 @@ DatabaseAndTable DatabaseCatalog::getTableImpl( { if (!MaterializeMySQLSyncThread::isMySQLSyncThread()) db_and_table.second = std::make_shared(std::move(db_and_table.second), db_and_table.first.get()); - return db_and_table; } #endif return db_and_table; diff --git a/tests/integration/test_materialize_mysql_database/test.py b/tests/integration/test_materialize_mysql_database/test.py index 019f613fc9b..9720bad7e3b 100644 --- a/tests/integration/test_materialize_mysql_database/test.py +++ b/tests/integration/test_materialize_mysql_database/test.py @@ -156,36 +156,32 @@ def test_materialize_database_ddl_with_empty_transaction_8_0(started_cluster, st materialize_with_ddl.query_event_with_empty_transaction(clickhouse_node, started_mysql_8_0, "mysql8_0") -def test_select_without_columns_5_7(started_cluster, started_mysql_5_7): +@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic]) +def test_select_without_columns_5_7(started_cluster, started_mysql_5_7, clickhouse_node): materialize_with_ddl.select_without_columns(clickhouse_node, started_mysql_5_7, "mysql1") -def test_select_without_columns_8_0(started_cluster, started_mysql_8_0): +@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic]) +def test_select_without_columns_8_0(started_cluster, started_mysql_8_0, clickhouse_node): materialize_with_ddl.select_without_columns(clickhouse_node, started_mysql_8_0, "mysql8_0") -def test_insert_with_modify_binlog_checksum_5_7(started_cluster, started_mysql_5_7): +@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic]) +def test_insert_with_modify_binlog_checksum_5_7(started_cluster, started_mysql_5_7, clickhouse_node): materialize_with_ddl.insert_with_modify_binlog_checksum(clickhouse_node, started_mysql_5_7, "mysql1") -def test_insert_with_modify_binlog_checksum_8_0(started_cluster, started_mysql_8_0): +@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic]) +def test_insert_with_modify_binlog_checksum_8_0(started_cluster, started_mysql_8_0, clickhouse_node): materialize_with_ddl.insert_with_modify_binlog_checksum(clickhouse_node, started_mysql_8_0, "mysql8_0") -def test_materialize_database_err_sync_user_privs_5_7(started_cluster, started_mysql_5_7): - try: - materialize_with_ddl.err_sync_user_privs_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1") - except: - print((clickhouse_node.query( - "select '\n', thread_id, query_id, arrayStringConcat(arrayMap(x -> concat(demangle(addressToSymbol(x)), '\n ', addressToLine(x)), trace), '\n') AS sym from system.stack_trace format TSVRaw"))) - raise +@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic]) +def test_materialize_database_err_sync_user_privs_5_7(started_cluster, started_mysql_5_7, clickhouse_node): + materialize_with_ddl.err_sync_user_privs_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1") -def test_materialize_database_err_sync_user_privs_8_0(started_cluster, started_mysql_8_0): - try: - materialize_with_ddl.err_sync_user_privs_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql8_0") - except: - print((clickhouse_node.query( - "select '\n', thread_id, query_id, arrayStringConcat(arrayMap(x -> concat(demangle(addressToSymbol(x)), '\n ', addressToLine(x)), trace), '\n') AS sym from system.stack_trace format TSVRaw"))) - raise +@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic]) +def test_materialize_database_err_sync_user_privs_8_0(started_cluster, started_mysql_8_0, clickhouse_node): + materialize_with_ddl.err_sync_user_privs_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql8_0") From ed355f866372504763d36983254dcca446f18281 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Fri, 11 Dec 2020 16:50:45 +0300 Subject: [PATCH 7/8] fix --- tests/integration/test_materialize_mysql_database/test.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/integration/test_materialize_mysql_database/test.py b/tests/integration/test_materialize_mysql_database/test.py index 86c8d1802a6..1b7aa041540 100644 --- a/tests/integration/test_materialize_mysql_database/test.py +++ b/tests/integration/test_materialize_mysql_database/test.py @@ -215,15 +215,15 @@ def test_mysql_kill_sync_thread_restore_8_0(started_cluster, started_mysql_8_0, materialize_with_ddl.mysql_kill_sync_thread_restore_test(clickhouse_node, started_mysql_8_0, "mysql8_0") @pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic]) -def test_mysql_killed_while_insert_5_7(started_cluster, started_mysql_5_7): +def test_mysql_killed_while_insert_5_7(started_cluster, started_mysql_5_7, clickhouse_node): materialize_with_ddl.mysql_killed_while_insert(clickhouse_node, started_mysql_5_7, "mysql1") @pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic]) -def test_mysql_killed_while_insert_8_0(started_cluster, started_mysql_8_0): +def test_mysql_killed_while_insert_8_0(started_cluster, started_mysql_8_0, clickhouse_node): materialize_with_ddl.mysql_killed_while_insert(clickhouse_node, started_mysql_8_0, "mysql8_0") @pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic]) -def test_clickhouse_killed_while_insert_5_7(started_cluster, started_mysql_5_7): +def test_clickhouse_killed_while_insert_5_7(started_cluster, started_mysql_5_7, clickhouse_node): materialize_with_ddl.clickhouse_killed_while_insert(clickhouse_node, started_mysql_5_7, "mysql1") @pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic]) From 7f7eed4031b403c8edc01cddbd14b31cd644b152 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Sun, 13 Dec 2020 17:57:15 +0300 Subject: [PATCH 8/8] fix test --- .../test_materialize_mysql_database/materialize_with_ddl.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/integration/test_materialize_mysql_database/materialize_with_ddl.py b/tests/integration/test_materialize_mysql_database/materialize_with_ddl.py index f653a7b7a3c..448e17de405 100644 --- a/tests/integration/test_materialize_mysql_database/materialize_with_ddl.py +++ b/tests/integration/test_materialize_mysql_database/materialize_with_ddl.py @@ -562,6 +562,12 @@ def err_sync_user_privs_with_materialize_mysql_database(clickhouse_node, mysql_n assert 'MySQL SYNC USER ACCESS ERR:' in str(exception.value) assert "priv_err_db" not in clickhouse_node.query("SHOW DATABASES") + mysql_node.query("GRANT SELECT ON priv_err_db.* TO 'test'@'%'") + time.sleep(3) + clickhouse_node.query("ATTACH DATABASE priv_err_db") + clickhouse_node.query("DROP DATABASE priv_err_db") + mysql_node.query("REVOKE SELECT ON priv_err_db.* FROM 'test'@'%'") + mysql_node.query("DROP DATABASE priv_err_db;") mysql_node.query("DROP USER 'test'@'%'")