Drop support for DatabaseOrdinary in MaterializedMySQL

1. Dropped support for DatabaseOrdinary for MaterializeMySQL. It
   is marked as experimental, and dropping support makes the code
   more maintaible, and speeds up integration tests by 50%.

2. Get rid of thread name logic for StorageMaterializeMySQL wrapping,
   use setInternalQuery instead (similar to MaterializedPostgreSQL).
This commit is contained in:
Stig Bakken 2021-11-10 14:13:27 +01:00 committed by Stig Bakken
parent 8d2a083bf9
commit ff46e8bb51
38 changed files with 305 additions and 406 deletions

View File

@ -622,7 +622,7 @@ void LocalServer::processConfig()
fs::create_directories(fs::path(path) / "metadata/");
loadMetadataSystem(global_context);
attachSystemTablesLocal(*createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::SYSTEM_DATABASE));
attachSystemTablesLocal(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::SYSTEM_DATABASE));
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA));
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE));
loadMetadata(global_context);
@ -633,7 +633,7 @@ void LocalServer::processConfig()
}
else if (!config().has("no-system-tables"))
{
attachSystemTablesLocal(*createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::SYSTEM_DATABASE));
attachSystemTablesLocal(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::SYSTEM_DATABASE));
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA));
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE));
}

View File

@ -1146,7 +1146,7 @@ if (ThreadFuzzer::instance().isEffective())
global_context->initializeSystemLogs();
global_context->setSystemZooKeeperLogAfterInitializationIfNeeded();
/// After the system database is created, attach virtual system tables (in addition to query_log and part_log)
attachSystemTablesServer(*database_catalog.getSystemDatabase(), has_zookeeper);
attachSystemTablesServer(global_context, *database_catalog.getSystemDatabase(), has_zookeeper);
attachInformationSchema(global_context, *database_catalog.getDatabase(DatabaseCatalog::INFORMATION_SCHEMA));
attachInformationSchema(global_context, *database_catalog.getDatabase(DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE));
/// Firstly remove partially dropped databases, to avoid race with MaterializedMySQLSyncThread,
@ -1256,7 +1256,7 @@ if (ThreadFuzzer::instance().isEffective())
/// This object will periodically calculate some metrics.
AsynchronousMetrics async_metrics(
global_context, config().getUInt("asynchronous_metrics_update_period_s", 1), servers_to_start_before_tables, servers);
attachSystemTablesAsync(*DatabaseCatalog::instance().getSystemDatabase(), async_metrics);
attachSystemTablesAsync(global_context, *DatabaseCatalog::instance().getSystemDatabase(), async_metrics);
for (const auto & listen_host : listen_hosts)
{

View File

@ -84,7 +84,7 @@ void DatabaseAtomic::drop(ContextPtr)
fs::remove_all(getMetadataPath());
}
void DatabaseAtomic::attachTable(const String & name, const StoragePtr & table, const String & relative_table_path)
void DatabaseAtomic::attachTable(ContextPtr /* context_ */, const String & name, const StoragePtr & table, const String & relative_table_path)
{
assert(relative_table_path != data_path && !relative_table_path.empty());
DetachedTables not_in_use;
@ -96,7 +96,7 @@ void DatabaseAtomic::attachTable(const String & name, const StoragePtr & table,
table_name_to_path.emplace(std::make_pair(name, relative_table_path));
}
StoragePtr DatabaseAtomic::detachTable(const String & name)
StoragePtr DatabaseAtomic::detachTable(ContextPtr /* context */, const String & name)
{
DetachedTables not_in_use;
std::unique_lock lock(mutex);

View File

@ -37,8 +37,8 @@ public:
void dropTable(ContextPtr context, const String & table_name, bool no_delay) override;
void attachTable(const String & name, const StoragePtr & table, const String & relative_table_path) override;
StoragePtr detachTable(const String & name) override;
void attachTable(ContextPtr context, const String & name, const StoragePtr & table, const String & relative_table_path) override;
StoragePtr detachTable(ContextPtr context, const String & name) override;
String getTableDataPath(const String & table_name) const override;
String getTableDataPath(const ASTCreateQuery & query) const override;

View File

@ -211,14 +211,9 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
if (engine_define->settings)
materialize_mode_settings->loadFromQuery(*engine_define);
if (create.uuid == UUIDHelpers::Nil)
return std::make_shared<DatabaseMaterializedMySQL<DatabaseOrdinary>>(
context, database_name, metadata_path, uuid, configuration.database, std::move(mysql_pool),
std::move(client), std::move(materialize_mode_settings));
else
return std::make_shared<DatabaseMaterializedMySQL<DatabaseAtomic>>(
context, database_name, metadata_path, uuid, configuration.database, std::move(mysql_pool),
std::move(client), std::move(materialize_mode_settings));
return std::make_shared<DatabaseMaterializedMySQL>(
context, database_name, metadata_path, uuid, configuration.database, std::move(mysql_pool),
std::move(client), std::move(materialize_mode_settings));
}
catch (...)
{

View File

@ -39,7 +39,7 @@ DatabaseLazy::DatabaseLazy(const String & name_, const String & metadata_path_,
void DatabaseLazy::loadStoredObjects(
ContextMutablePtr local_context, bool /* force_restore */, bool /*force_attach*/, bool /* skip_startup_tables */)
{
iterateMetadataFiles(local_context, [this](const String & file_name)
iterateMetadataFiles(local_context, [this, &local_context](const String & file_name)
{
const std::string table_name = unescapeForFileName(file_name.substr(0, file_name.size() - 4));
@ -50,7 +50,7 @@ void DatabaseLazy::loadStoredObjects(
return;
}
attachTable(table_name, nullptr, {});
attachTable(local_context, table_name, nullptr, {});
});
}
@ -160,7 +160,7 @@ bool DatabaseLazy::empty() const
return tables_cache.empty();
}
void DatabaseLazy::attachTable(const String & table_name, const StoragePtr & table, const String &)
void DatabaseLazy::attachTable(ContextPtr /* context_ */, const String & table_name, const StoragePtr & table, const String &)
{
LOG_DEBUG(log, "Attach table {}.", backQuote(table_name));
std::lock_guard lock(mutex);
@ -175,7 +175,7 @@ void DatabaseLazy::attachTable(const String & table_name, const StoragePtr & tab
it->second.expiration_iterator = cache_expiration_queue.emplace(cache_expiration_queue.end(), current_time, table_name);
}
StoragePtr DatabaseLazy::detachTable(const String & table_name)
StoragePtr DatabaseLazy::detachTable(ContextPtr /* context */, const String & table_name)
{
StoragePtr res;
{

View File

@ -64,9 +64,9 @@ public:
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
void attachTable(const String & table_name, const StoragePtr & table, const String & relative_table_path) override;
void attachTable(ContextPtr context, const String & table_name, const StoragePtr & table, const String & relative_table_path) override;
StoragePtr detachTable(const String & table_name) override;
StoragePtr detachTable(ContextPtr context, const String & table_name) override;
void shutdown() override;

View File

@ -185,7 +185,7 @@ void DatabaseOnDisk::createTable(
{
/// Metadata already exists, table was detached
removeDetachedPermanentlyFlag(local_context, table_name, table_metadata_path, true);
attachTable(table_name, table, getTableDataPath(create));
attachTable(getContext(), table_name, table, getTableDataPath(create));
return;
}
@ -246,12 +246,12 @@ void DatabaseOnDisk::removeDetachedPermanentlyFlag(ContextPtr, const String & ta
void DatabaseOnDisk::commitCreateTable(const ASTCreateQuery & query, const StoragePtr & table,
const String & table_metadata_tmp_path, const String & table_metadata_path,
ContextPtr /*query_context*/)
ContextPtr query_context)
{
try
{
/// Add a table to the map of known tables.
attachTable(query.getTable(), table, getTableDataPath(query));
attachTable(query_context, query.getTable(), table, getTableDataPath(query));
/// If it was ATTACH query and file with table metadata already exist
/// (so, ATTACH is done after DETACH), then rename atomically replaces old file with new one.
@ -264,9 +264,9 @@ void DatabaseOnDisk::commitCreateTable(const ASTCreateQuery & query, const Stora
}
}
void DatabaseOnDisk::detachTablePermanently(ContextPtr, const String & table_name)
void DatabaseOnDisk::detachTablePermanently(ContextPtr query_context, const String & table_name)
{
auto table = detachTable(table_name);
auto table = detachTable(query_context, table_name);
fs::path detached_permanently_flag(getObjectMetadataPath(table_name) + detached_suffix);
try
@ -288,7 +288,7 @@ void DatabaseOnDisk::dropTable(ContextPtr local_context, const String & table_na
if (table_data_path_relative.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Path is empty");
StoragePtr table = detachTable(table_name);
StoragePtr table = detachTable(local_context, table_name);
/// This is possible for Lazy database.
if (!table)
@ -309,7 +309,7 @@ void DatabaseOnDisk::dropTable(ContextPtr local_context, const String & table_na
catch (...)
{
LOG_WARNING(log, getCurrentExceptionMessage(__PRETTY_FUNCTION__));
attachTable(table_name, table, table_data_path_relative);
attachTable(local_context, table_name, table, table_data_path_relative);
if (renamed)
fs::rename(table_metadata_path_drop, table_metadata_path);
throw;
@ -372,7 +372,7 @@ void DatabaseOnDisk::renameTable(
ASTPtr attach_query;
/// DatabaseLazy::detachTable may return nullptr even if table exists, so we need tryGetTable for this case.
StoragePtr table = tryGetTable(table_name, getContext());
detachTable(table_name);
detachTable(local_context, table_name);
UUID prev_uuid = UUIDHelpers::Nil;
try
{
@ -397,12 +397,12 @@ void DatabaseOnDisk::renameTable(
}
catch (const Exception &)
{
attachTable(table_name, table, table_data_relative_path);
attachTable(local_context, table_name, table, table_data_relative_path);
throw;
}
catch (const Poco::Exception & e)
{
attachTable(table_name, table, table_data_relative_path);
attachTable(local_context, table_name, table, table_data_relative_path);
/// Better diagnostics.
throw Exception{Exception::CreateFromPocoTag{}, e};
}

View File

@ -50,7 +50,7 @@ namespace
context,
force_restore);
database.attachTable(table_name, table, database.getTableDataPath(query));
database.attachTable(context, table_name, table, database.getTableDataPath(query));
}
catch (Exception & e)
{

View File

@ -2,6 +2,7 @@
#include <Databases/DatabaseAtomic.h>
#include <Databases/DatabaseReplicatedSettings.h>
#include <Databases/IDatabaseReplicating.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Core/BackgroundSchedulePool.h>
#include <QueryPipeline/BlockIO.h>
@ -17,7 +18,7 @@ using ZooKeeperPtr = std::shared_ptr<zkutil::ZooKeeper>;
class Cluster;
using ClusterPtr = std::shared_ptr<Cluster>;
class DatabaseReplicated : public DatabaseAtomic
class DatabaseReplicated : public DatabaseAtomic, IDatabaseReplicating
{
public:
DatabaseReplicated(const String & name_, const String & metadata_path_, UUID uuid,
@ -46,7 +47,7 @@ public:
/// then it will be executed on all replicas.
BlockIO tryEnqueueReplicatedDDL(const ASTPtr & query, ContextPtr query_context);
void stopReplication();
void stopReplication() override;
String getFullReplicaName() const;
static std::pair<String, String> parseFullReplicaName(const String & name);

View File

@ -125,7 +125,7 @@ bool DatabaseWithOwnTablesBase::empty() const
return tables.empty();
}
StoragePtr DatabaseWithOwnTablesBase::detachTable(const String & table_name)
StoragePtr DatabaseWithOwnTablesBase::detachTable(ContextPtr /* context_ */, const String & table_name)
{
std::unique_lock lock(mutex);
return detachTableUnlocked(table_name, lock);
@ -152,7 +152,7 @@ StoragePtr DatabaseWithOwnTablesBase::detachTableUnlocked(const String & table_n
return res;
}
void DatabaseWithOwnTablesBase::attachTable(const String & table_name, const StoragePtr & table, const String &)
void DatabaseWithOwnTablesBase::attachTable(ContextPtr /* context_ */, const String & table_name, const StoragePtr & table, const String &)
{
std::unique_lock lock(mutex);
attachTableUnlocked(table_name, table, lock);

View File

@ -27,9 +27,9 @@ public:
bool empty() const override;
void attachTable(const String & table_name, const StoragePtr & table, const String & relative_table_path) override;
void attachTable(ContextPtr context, const String & table_name, const StoragePtr & table, const String & relative_table_path) override;
StoragePtr detachTable(const String & table_name) override;
StoragePtr detachTable(ContextPtr context, const String & table_name) override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;

View File

@ -197,13 +197,13 @@ public:
/// Add a table to the database, but do not add it to the metadata. The database may not support this method.
///
/// Note: ATTACH TABLE statement actually uses createTable method.
virtual void attachTable(const String & /*name*/, const StoragePtr & /*table*/, [[maybe_unused]] const String & relative_table_path = {})
virtual void attachTable(ContextPtr /* context */, const String & /*name*/, const StoragePtr & /*table*/, [[maybe_unused]] const String & relative_table_path = {})
{
throw Exception("There is no ATTACH TABLE query for Database" + getEngineName(), ErrorCodes::NOT_IMPLEMENTED);
}
/// Forget about the table without deleting it, and return it. The database may not support this method.
virtual StoragePtr detachTable(const String & /*name*/)
virtual StoragePtr detachTable(ContextPtr /* context */, const String & /*name*/)
{
throw Exception("There is no DETACH TABLE query for Database" + getEngineName(), ErrorCodes::NOT_IMPLEMENTED);
}

View File

@ -0,0 +1,13 @@
#pragma once
namespace DB
{
class IDatabaseReplicating
{
public:
virtual void stopReplication() = 0;
virtual ~IDatabaseReplicating() = default;
};
}

View File

@ -5,8 +5,6 @@
# include <Databases/MySQL/DatabaseMaterializedMySQL.h>
# include <Interpreters/Context.h>
# include <Databases/DatabaseOrdinary.h>
# include <Databases/DatabaseAtomic.h>
# include <Databases/MySQL/DatabaseMaterializedTablesIterator.h>
# include <Databases/MySQL/MaterializedMySQLSyncThread.h>
# include <Parsers/ASTCreateQuery.h>
@ -23,32 +21,9 @@ namespace DB
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int LOGICAL_ERROR;
}
template <>
DatabaseMaterializedMySQL<DatabaseOrdinary>::DatabaseMaterializedMySQL(
ContextPtr context_,
const String & database_name_,
const String & metadata_path_,
UUID /*uuid*/,
const String & mysql_database_name_,
mysqlxx::Pool && pool_,
MySQLClient && client_,
std::unique_ptr<MaterializedMySQLSettings> settings_)
: DatabaseOrdinary(
database_name_,
metadata_path_,
"data/" + escapeForFileName(database_name_) + "/",
"DatabaseMaterializedMySQL<Ordinary> (" + database_name_ + ")",
context_)
, settings(std::move(settings_))
, materialize_thread(context_, database_name_, mysql_database_name_, std::move(pool_), std::move(client_), settings.get())
{
}
template <>
DatabaseMaterializedMySQL<DatabaseAtomic>::DatabaseMaterializedMySQL(
DatabaseMaterializedMySQL::DatabaseMaterializedMySQL(
ContextPtr context_,
const String & database_name_,
const String & metadata_path_,
@ -57,16 +32,15 @@ DatabaseMaterializedMySQL<DatabaseAtomic>::DatabaseMaterializedMySQL(
mysqlxx::Pool && pool_,
MySQLClient && client_,
std::unique_ptr<MaterializedMySQLSettings> settings_)
: DatabaseAtomic(database_name_, metadata_path_, uuid, "DatabaseMaterializedMySQL<Atomic> (" + database_name_ + ")", context_)
: DatabaseAtomic(database_name_, metadata_path_, uuid, "DatabaseMaterializedMySQL(" + database_name_ + ")", context_)
, settings(std::move(settings_))
, materialize_thread(context_, database_name_, mysql_database_name_, std::move(pool_), std::move(client_), settings.get())
{
}
template<typename Base>
void DatabaseMaterializedMySQL<Base>::rethrowExceptionIfNeed() const
void DatabaseMaterializedMySQL::rethrowExceptionIfNeeded() const
{
std::unique_lock<std::mutex> lock(Base::mutex);
std::unique_lock<std::mutex> lock(mutex);
if (!settings->allows_query_when_mysql_lost && exception)
{
@ -84,17 +58,15 @@ void DatabaseMaterializedMySQL<Base>::rethrowExceptionIfNeed() const
}
}
template<typename Base>
void DatabaseMaterializedMySQL<Base>::setException(const std::exception_ptr & exception_)
void DatabaseMaterializedMySQL::setException(const std::exception_ptr & exception_)
{
std::unique_lock<std::mutex> lock(Base::mutex);
std::unique_lock<std::mutex> lock(mutex);
exception = exception_;
}
template <typename Base>
void DatabaseMaterializedMySQL<Base>::startupTables(ThreadPool & thread_pool, bool force_restore, bool force_attach)
void DatabaseMaterializedMySQL::startupTables(ThreadPool & thread_pool, bool force_restore, bool force_attach)
{
Base::startupTables(thread_pool, force_restore, force_attach);
DatabaseAtomic::startupTables(thread_pool, force_restore, force_attach);
if (!force_attach)
materialize_thread.assertMySQLAvailable();
@ -103,149 +75,92 @@ void DatabaseMaterializedMySQL<Base>::startupTables(ThreadPool & thread_pool, bo
started_up = true;
}
template<typename Base>
void DatabaseMaterializedMySQL<Base>::createTable(ContextPtr context_, const String & name, const StoragePtr & table, const ASTPtr & query)
void DatabaseMaterializedMySQL::createTable(ContextPtr context_, const String & name, const StoragePtr & table, const ASTPtr & query)
{
assertCalledFromSyncThreadOrDrop("create table");
Base::createTable(context_, name, table, query);
checkIsInternalQuery(context_, "CREATE TABLE");
DatabaseAtomic::createTable(context_, name, table, query);
}
template<typename Base>
void DatabaseMaterializedMySQL<Base>::dropTable(ContextPtr context_, const String & name, bool no_delay)
void DatabaseMaterializedMySQL::dropTable(ContextPtr context_, const String & name, bool no_delay)
{
assertCalledFromSyncThreadOrDrop("drop table");
Base::dropTable(context_, name, no_delay);
checkIsInternalQuery(context_, "DROP TABLE");
DatabaseAtomic::dropTable(context_, name, no_delay);
}
template<typename Base>
void DatabaseMaterializedMySQL<Base>::attachTable(const String & name, const StoragePtr & table, const String & relative_table_path)
void DatabaseMaterializedMySQL::attachTable(ContextPtr context_, const String & name, const StoragePtr & table, const String & relative_table_path)
{
assertCalledFromSyncThreadOrDrop("attach table");
Base::attachTable(name, table, relative_table_path);
checkIsInternalQuery(context_, "ATTACH TABLE");
DatabaseAtomic::attachTable(context_, name, table, relative_table_path);
}
template<typename Base>
StoragePtr DatabaseMaterializedMySQL<Base>::detachTable(const String & name)
StoragePtr DatabaseMaterializedMySQL::detachTable(ContextPtr context_, const String & name)
{
assertCalledFromSyncThreadOrDrop("detach table");
return Base::detachTable(name);
checkIsInternalQuery(context_, "DETACH TABLE");
return DatabaseAtomic::detachTable(context_, name);
}
template<typename Base>
void DatabaseMaterializedMySQL<Base>::renameTable(ContextPtr context_, const String & name, IDatabase & to_database, const String & to_name, bool exchange, bool dictionary)
void DatabaseMaterializedMySQL::renameTable(ContextPtr context_, const String & name, IDatabase & to_database, const String & to_name, bool exchange, bool dictionary)
{
assertCalledFromSyncThreadOrDrop("rename table");
checkIsInternalQuery(context_, "RENAME TABLE");
if (exchange)
throw Exception("MaterializedMySQL database not support exchange table.", ErrorCodes::NOT_IMPLEMENTED);
throw Exception("MaterializedMySQL database does not support EXCHANGE TABLE.", ErrorCodes::NOT_IMPLEMENTED);
if (dictionary)
throw Exception("MaterializedMySQL database not support rename dictionary.", ErrorCodes::NOT_IMPLEMENTED);
throw Exception("MaterializedMySQL database does not support RENAME DICTIONARY.", ErrorCodes::NOT_IMPLEMENTED);
if (to_database.getDatabaseName() != Base::getDatabaseName())
if (to_database.getDatabaseName() != DatabaseAtomic::getDatabaseName())
throw Exception("Cannot rename with other database for MaterializedMySQL database.", ErrorCodes::NOT_IMPLEMENTED);
Base::renameTable(context_, name, *this, to_name, exchange, dictionary);
DatabaseAtomic::renameTable(context_, name, *this, to_name, exchange, dictionary);
}
template<typename Base>
void DatabaseMaterializedMySQL<Base>::alterTable(ContextPtr context_, const StorageID & table_id, const StorageInMemoryMetadata & metadata)
void DatabaseMaterializedMySQL::alterTable(ContextPtr context_, const StorageID & table_id, const StorageInMemoryMetadata & metadata)
{
assertCalledFromSyncThreadOrDrop("alter table");
Base::alterTable(context_, table_id, metadata);
checkIsInternalQuery(context_, "ALTER TABLE");
DatabaseAtomic::alterTable(context_, table_id, metadata);
}
template<typename Base>
void DatabaseMaterializedMySQL<Base>::drop(ContextPtr context_)
void DatabaseMaterializedMySQL::drop(ContextPtr context_)
{
/// Remove metadata info
fs::path metadata(Base::getMetadataPath() + "/.metadata");
fs::path metadata(getMetadataPath() + "/.metadata");
if (fs::exists(metadata))
fs::remove(metadata);
Base::drop(context_);
DatabaseAtomic::drop(context_);
}
template<typename Base>
StoragePtr DatabaseMaterializedMySQL<Base>::tryGetTable(const String & name, ContextPtr context_) const
StoragePtr DatabaseMaterializedMySQL::tryGetTable(const String & name, ContextPtr context_) const
{
if (!MaterializedMySQLSyncThread::isMySQLSyncThread())
{
StoragePtr nested_storage = Base::tryGetTable(name, context_);
if (!nested_storage)
return {};
return std::make_shared<StorageMaterializedMySQL>(std::move(nested_storage), this);
}
return Base::tryGetTable(name, context_);
StoragePtr nested_storage = DatabaseAtomic::tryGetTable(name, context_);
if (context_->isInternalQuery())
return nested_storage;
return std::make_shared<StorageMaterializedMySQL>(std::move(nested_storage), this);
}
template <typename Base>
DatabaseTablesIteratorPtr
DatabaseMaterializedMySQL<Base>::getTablesIterator(ContextPtr context_, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const
DatabaseMaterializedMySQL::getTablesIterator(ContextPtr context_, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const
{
if (!MaterializedMySQLSyncThread::isMySQLSyncThread())
{
DatabaseTablesIteratorPtr iterator = Base::getTablesIterator(context_, filter_by_table_name);
return std::make_unique<DatabaseMaterializedTablesIterator>(std::move(iterator), this);
}
return Base::getTablesIterator(context_, filter_by_table_name);
DatabaseTablesIteratorPtr iterator = DatabaseAtomic::getTablesIterator(context_, filter_by_table_name);
if (context_->isInternalQuery())
return iterator;
return std::make_unique<DatabaseMaterializedTablesIterator>(std::move(iterator), this);
}
template<typename Base>
void DatabaseMaterializedMySQL<Base>::assertCalledFromSyncThreadOrDrop(const char * method) const
void DatabaseMaterializedMySQL::checkIsInternalQuery(ContextPtr context_, const char * method) const
{
if (!MaterializedMySQLSyncThread::isMySQLSyncThread() && started_up)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "MaterializedMySQL database not support {}", method);
if (started_up && context_ && !context_->isInternalQuery())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "MaterializedMySQL database does not support {}", method);
}
template<typename Base>
void DatabaseMaterializedMySQL<Base>::shutdownSynchronizationThread()
void DatabaseMaterializedMySQL::stopReplication()
{
materialize_thread.stopSynchronization();
started_up = false;
}
template<typename Database, template<class> class Helper, typename... Args>
auto castToMaterializedMySQLAndCallHelper(Database * database, Args && ... args)
{
using Ordinary = DatabaseMaterializedMySQL<DatabaseOrdinary>;
using Atomic = DatabaseMaterializedMySQL<DatabaseAtomic>;
using ToOrdinary = typename std::conditional_t<std::is_const_v<Database>, const Ordinary *, Ordinary *>;
using ToAtomic = typename std::conditional_t<std::is_const_v<Database>, const Atomic *, Atomic *>;
if (auto * database_materialize = typeid_cast<ToOrdinary>(database))
return (database_materialize->*Helper<Ordinary>::v)(std::forward<Args>(args)...);
if (auto * database_materialize = typeid_cast<ToAtomic>(database))
return (database_materialize->*Helper<Atomic>::v)(std::forward<Args>(args)...);
throw Exception("LOGICAL_ERROR: cannot cast to DatabaseMaterializedMySQL, it is a bug.", ErrorCodes::LOGICAL_ERROR);
}
template<typename T> struct HelperSetException { static constexpr auto v = &T::setException; };
void setSynchronizationThreadException(const DatabasePtr & materialized_mysql_db, const std::exception_ptr & exception)
{
castToMaterializedMySQLAndCallHelper<IDatabase, HelperSetException>(materialized_mysql_db.get(), exception);
}
template<typename T> struct HelperStopSync { static constexpr auto v = &T::shutdownSynchronizationThread; };
void stopDatabaseSynchronization(const DatabasePtr & materialized_mysql_db)
{
castToMaterializedMySQLAndCallHelper<IDatabase, HelperStopSync>(materialized_mysql_db.get());
}
template<typename T> struct HelperRethrow { static constexpr auto v = &T::rethrowExceptionIfNeed; };
void rethrowSyncExceptionIfNeed(const IDatabase * materialized_mysql_db)
{
castToMaterializedMySQLAndCallHelper<const IDatabase, HelperRethrow>(materialized_mysql_db);
}
template class DatabaseMaterializedMySQL<DatabaseOrdinary>;
template class DatabaseMaterializedMySQL<DatabaseAtomic>;
}
#endif

View File

@ -6,7 +6,10 @@
#include <mysqlxx/Pool.h>
#include <Core/MySQL/MySQLClient.h>
#include <base/UUID.h>
#include <Databases/IDatabase.h>
#include <Databases/IDatabaseReplicating.h>
#include <Databases/DatabaseAtomic.h>
#include <Databases/MySQL/MaterializedMySQLSettings.h>
#include <Databases/MySQL/MaterializedMySQLSyncThread.h>
@ -17,17 +20,20 @@ namespace DB
*
* All table structure and data will be written to the local file system
*/
template<typename Base>
class DatabaseMaterializedMySQL : public Base
class DatabaseMaterializedMySQL : public DatabaseAtomic, IDatabaseReplicating
{
public:
DatabaseMaterializedMySQL(
ContextPtr context, const String & database_name_, const String & metadata_path_, UUID uuid,
const String & mysql_database_name_, mysqlxx::Pool && pool_,
MySQLClient && client_, std::unique_ptr<MaterializedMySQLSettings> settings_);
ContextPtr context,
const String & database_name_,
const String & metadata_path_,
UUID uuid,
const String & mysql_database_name_,
mysqlxx::Pool && pool_,
MySQLClient && client_,
std::unique_ptr<MaterializedMySQLSettings> settings_);
void rethrowExceptionIfNeed() const;
void rethrowExceptionIfNeeded() const;
void setException(const std::exception_ptr & exception);
protected:
@ -49,9 +55,9 @@ public:
void dropTable(ContextPtr context_, const String & name, bool no_delay) override;
void attachTable(const String & name, const StoragePtr & table, const String & relative_table_path) override;
void attachTable(ContextPtr context_, const String & name, const StoragePtr & table, const String & relative_table_path) override;
StoragePtr detachTable(const String & name) override;
StoragePtr detachTable(ContextPtr context_, const String & name) override;
void renameTable(ContextPtr context_, const String & name, IDatabase & to_database, const String & to_name, bool exchange, bool dictionary) override;
@ -63,18 +69,13 @@ public:
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context_, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const override;
void assertCalledFromSyncThreadOrDrop(const char * method) const;
void checkIsInternalQuery(ContextPtr context_, const char * method) const;
void shutdownSynchronizationThread();
void stopReplication() override;
friend class DatabaseMaterializedTablesIterator;
};
void setSynchronizationThreadException(const DatabasePtr & materialized_mysql_db, const std::exception_ptr & exception);
void stopDatabaseSynchronization(const DatabasePtr & materialized_mysql_db);
void rethrowSyncExceptionIfNeed(const IDatabase * materialized_mysql_db);
}
#endif

View File

@ -362,7 +362,7 @@ void DatabaseMySQL::cleanOutdatedTables()
}
}
void DatabaseMySQL::attachTable(const String & table_name, const StoragePtr & storage, const String &)
void DatabaseMySQL::attachTable(ContextPtr /* context_ */, const String & table_name, const StoragePtr & storage, const String &)
{
std::lock_guard<std::mutex> lock{mutex};
@ -385,7 +385,7 @@ void DatabaseMySQL::attachTable(const String & table_name, const StoragePtr & st
fs::remove(remove_flag);
}
StoragePtr DatabaseMySQL::detachTable(const String & table_name)
StoragePtr DatabaseMySQL::detachTable(ContextPtr /* context */, const String & table_name)
{
std::lock_guard<std::mutex> lock{mutex};
@ -482,7 +482,7 @@ DatabaseMySQL::~DatabaseMySQL()
}
}
void DatabaseMySQL::createTable(ContextPtr, const String & table_name, const StoragePtr & storage, const ASTPtr & create_query)
void DatabaseMySQL::createTable(ContextPtr local_context, const String & table_name, const StoragePtr & storage, const ASTPtr & create_query)
{
const auto & create = create_query->as<ASTCreateQuery>();
@ -500,7 +500,7 @@ void DatabaseMySQL::createTable(ContextPtr, const String & table_name, const Sto
throw Exception("The MySQL database engine can only execute attach statements of type attach table database_name.table_name",
ErrorCodes::UNEXPECTED_AST_STRUCTURE);
attachTable(table_name, storage, {});
attachTable(local_context, table_name, storage, {});
}
}

View File

@ -77,13 +77,13 @@ public:
void loadStoredObjects(ContextMutablePtr, bool, bool force_attach, bool skip_startup_tables) override;
StoragePtr detachTable(const String & table_name) override;
StoragePtr detachTable(ContextPtr context, const String & table_name) override;
void detachTablePermanently(ContextPtr context, const String & table_name) override;
void dropTable(ContextPtr context, const String & table_name, bool no_delay) override;
void attachTable(const String & table_name, const StoragePtr & storage, const String & relative_table_path) override;
void attachTable(ContextPtr context, const String & table_name, const StoragePtr & storage, const String & relative_table_path) override;
protected:
ASTPtr getCreateTableQueryImpl(const String & name, ContextPtr context, bool throw_on_error) const override;

View File

@ -53,6 +53,8 @@ static ContextMutablePtr createQueryContext(ContextPtr context)
auto query_context = Context::createCopy(context);
query_context->setSettings(new_query_settings);
query_context->setInternalQuery(true);
query_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
query_context->setCurrentQueryId(""); // generate random query_id
return query_context;
@ -764,15 +766,9 @@ void MaterializedMySQLSyncThread::executeDDLAtomic(const QueryEvent & query_even
}
}
bool MaterializedMySQLSyncThread::isMySQLSyncThread()
{
return getThreadName() == std::string_view(MYSQL_BACKGROUND_THREAD_NAME);
}
void MaterializedMySQLSyncThread::setSynchronizationThreadException(const std::exception_ptr & exception)
{
auto db = DatabaseCatalog::instance().getDatabase(database_name);
DB::setSynchronizationThreadException(db, exception);
assert_cast<DatabaseMaterializedMySQL *>(DatabaseCatalog::instance().getDatabase(database_name).get())->setException(exception);
}
void MaterializedMySQLSyncThread::Buffers::add(size_t block_rows, size_t block_bytes, size_t written_rows, size_t written_bytes)

View File

@ -53,8 +53,6 @@ public:
void assertMySQLAvailable();
static bool isMySQLSyncThread();
private:
Poco::Logger * log;

View File

@ -266,11 +266,11 @@ void DatabaseMaterializedPostgreSQL::createTable(ContextPtr local_context, const
DatabaseAtomic::createTable(StorageMaterializedPostgreSQL::makeNestedTableContext(local_context), table_name, table, query_copy);
/// Attach MaterializedPostgreSQL table.
attachTable(table_name, table, {});
attachTable(local_context, table_name, table, {});
}
void DatabaseMaterializedPostgreSQL::attachTable(const String & table_name, const StoragePtr & table, const String & relative_table_path)
void DatabaseMaterializedPostgreSQL::attachTable(ContextPtr context_, const String & table_name, const StoragePtr & table, const String & relative_table_path)
{
/// If there is query context then we need to attach materialized storage.
/// If there is no query context then we need to attach internal storage from atomic database.
@ -310,12 +310,12 @@ void DatabaseMaterializedPostgreSQL::attachTable(const String & table_name, cons
}
else
{
DatabaseAtomic::attachTable(table_name, table, relative_table_path);
DatabaseAtomic::attachTable(context_, table_name, table, relative_table_path);
}
}
StoragePtr DatabaseMaterializedPostgreSQL::detachTable(const String & table_name)
StoragePtr DatabaseMaterializedPostgreSQL::detachTable(ContextPtr context_, const String & table_name)
{
/// If there is query context then we need to detach materialized storage.
/// If there is no query context then we need to detach internal storage from atomic database.
@ -369,7 +369,7 @@ StoragePtr DatabaseMaterializedPostgreSQL::detachTable(const String & table_name
}
else
{
return DatabaseAtomic::detachTable(table_name);
return DatabaseAtomic::detachTable(context_, table_name);
}
}

View File

@ -11,6 +11,7 @@
#include <Core/BackgroundSchedulePool.h>
#include <Parsers/ASTCreateQuery.h>
#include <Databases/IDatabase.h>
#include <Databases/IDatabaseReplicating.h>
#include <Databases/DatabaseOnDisk.h>
#include <Databases/DatabaseAtomic.h>
@ -22,7 +23,7 @@ class PostgreSQLConnection;
using PostgreSQLConnectionPtr = std::shared_ptr<PostgreSQLConnection>;
class DatabaseMaterializedPostgreSQL : public DatabaseAtomic
class DatabaseMaterializedPostgreSQL : public DatabaseAtomic, IDatabaseReplicating
{
public:
@ -49,15 +50,15 @@ public:
void createTable(ContextPtr context, const String & table_name, const StoragePtr & table, const ASTPtr & query) override;
void attachTable(const String & table_name, const StoragePtr & table, const String & relative_table_path) override;
void attachTable(ContextPtr context, const String & table_name, const StoragePtr & table, const String & relative_table_path) override;
StoragePtr detachTable(const String & table_name) override;
StoragePtr detachTable(ContextPtr context, const String & table_name) override;
void dropTable(ContextPtr local_context, const String & name, bool no_delay) override;
void drop(ContextPtr local_context) override;
void stopReplication();
void stopReplication() override;
void applySettingsChanges(const SettingsChanges & settings_changes, ContextPtr query_context) override;

View File

@ -206,7 +206,7 @@ StoragePtr DatabasePostgreSQL::fetchTable(const String & table_name, ContextPtr,
}
void DatabasePostgreSQL::attachTable(const String & table_name, const StoragePtr & storage, const String &)
void DatabasePostgreSQL::attachTable(ContextPtr /* context_ */, const String & table_name, const StoragePtr & storage, const String &)
{
std::lock_guard<std::mutex> lock{mutex};
@ -231,7 +231,7 @@ void DatabasePostgreSQL::attachTable(const String & table_name, const StoragePtr
}
StoragePtr DatabasePostgreSQL::detachTable(const String & table_name)
StoragePtr DatabasePostgreSQL::detachTable(ContextPtr /* context_ */, const String & table_name)
{
std::lock_guard<std::mutex> lock{mutex};
@ -251,14 +251,14 @@ StoragePtr DatabasePostgreSQL::detachTable(const String & table_name)
}
void DatabasePostgreSQL::createTable(ContextPtr, const String & table_name, const StoragePtr & storage, const ASTPtr & create_query)
void DatabasePostgreSQL::createTable(ContextPtr local_context, const String & table_name, const StoragePtr & storage, const ASTPtr & create_query)
{
const auto & create = create_query->as<ASTCreateQuery>();
if (!create->attach)
throw Exception("PostgreSQL database engine does not support create table", ErrorCodes::NOT_IMPLEMENTED);
attachTable(table_name, storage, {});
attachTable(local_context, table_name, storage, {});
}

View File

@ -55,8 +55,8 @@ public:
void createTable(ContextPtr, const String & table_name, const StoragePtr & storage, const ASTPtr & create_query) override;
void dropTable(ContextPtr, const String & table_name, bool no_delay) override;
void attachTable(const String & table_name, const StoragePtr & storage, const String & relative_table_path) override;
StoragePtr detachTable(const String & table_name) override;
void attachTable(ContextPtr context, const String & table_name, const StoragePtr & storage, const String & relative_table_path) override;
StoragePtr detachTable(ContextPtr context, const String & table_name) override;
void drop(ContextPtr /*context*/) override;
void shutdown() override;

View File

@ -248,10 +248,9 @@ DatabaseAndTable DatabaseCatalog::getTableImpl(
#if USE_MYSQL
/// It's definitely not the best place for this logic, but behaviour must be consistent with DatabaseMaterializedMySQL::tryGetTable(...)
if (db_and_table.first->getEngineName() == "MaterializedMySQL")
if (!context_->isInternalQuery() && db_and_table.first->getEngineName() == "MaterializedMySQL")
{
if (!MaterializedMySQLSyncThread::isMySQLSyncThread())
db_and_table.second = std::make_shared<StorageMaterializedMySQL>(std::move(db_and_table.second), db_and_table.first.get());
db_and_table.second = std::make_shared<StorageMaterializedMySQL>(std::move(db_and_table.second), db_and_table.first.get());
}
#endif
return db_and_table;

View File

@ -80,22 +80,22 @@ BlockIO InterpreterDropQuery::executeToTable(ASTDropQuery & query)
{
DatabasePtr database;
UUID table_to_wait_on = UUIDHelpers::Nil;
auto res = executeToTableImpl(query, database, table_to_wait_on);
auto res = executeToTableImpl(getContext(), query, database, table_to_wait_on);
if (query.no_delay)
waitForTableToBeActuallyDroppedOrDetached(query, database, table_to_wait_on);
return res;
}
BlockIO InterpreterDropQuery::executeToTableImpl(ASTDropQuery & query, DatabasePtr & db, UUID & uuid_to_wait)
BlockIO InterpreterDropQuery::executeToTableImpl(ContextPtr context_, ASTDropQuery & query, DatabasePtr & db, UUID & uuid_to_wait)
{
/// NOTE: it does not contain UUID, we will resolve it with locked DDLGuard
auto table_id = StorageID(query);
if (query.temporary || table_id.database_name.empty())
{
if (getContext()->tryResolveStorageID(table_id, Context::ResolveExternal))
if (context_->tryResolveStorageID(table_id, Context::ResolveExternal))
return executeToTemporaryTable(table_id.getTableName(), query.kind);
else
query.setDatabase(table_id.database_name = getContext()->getCurrentDatabase());
query.setDatabase(table_id.database_name = context_->getCurrentDatabase());
}
if (query.temporary)
@ -109,8 +109,8 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ASTDropQuery & query, DatabaseP
auto ddl_guard = (!query.no_ddl_lock ? DatabaseCatalog::instance().getDDLGuard(table_id.database_name, table_id.table_name) : nullptr);
/// If table was already dropped by anyone, an exception will be thrown
auto [database, table] = query.if_exists ? DatabaseCatalog::instance().tryGetDatabaseAndTable(table_id, getContext())
: DatabaseCatalog::instance().getDatabaseAndTable(table_id, getContext());
auto [database, table] = query.if_exists ? DatabaseCatalog::instance().tryGetDatabaseAndTable(table_id, context_)
: DatabaseCatalog::instance().getDatabaseAndTable(table_id, context_);
if (database && table)
{
@ -132,7 +132,7 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ASTDropQuery & query, DatabaseP
/// Prevents recursive drop from drop database query. The original query must specify a table.
bool is_drop_or_detach_database = !query_ptr->as<ASTDropQuery>()->table;
bool is_replicated_ddl_query = typeid_cast<DatabaseReplicated *>(database.get()) &&
!getContext()->getClientInfo().is_replicated_database_internal &&
!context_->getClientInfo().is_replicated_database_internal &&
!is_drop_or_detach_database;
AccessFlags drop_storage;
@ -147,20 +147,20 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ASTDropQuery & query, DatabaseP
if (is_replicated_ddl_query)
{
if (query.kind == ASTDropQuery::Kind::Detach)
getContext()->checkAccess(drop_storage, table_id);
context_->checkAccess(drop_storage, table_id);
else if (query.kind == ASTDropQuery::Kind::Truncate)
getContext()->checkAccess(AccessType::TRUNCATE, table_id);
context_->checkAccess(AccessType::TRUNCATE, table_id);
else if (query.kind == ASTDropQuery::Kind::Drop)
getContext()->checkAccess(drop_storage, table_id);
context_->checkAccess(drop_storage, table_id);
ddl_guard->releaseTableLock();
table.reset();
return typeid_cast<DatabaseReplicated *>(database.get())->tryEnqueueReplicatedDDL(query.clone(), getContext());
return typeid_cast<DatabaseReplicated *>(database.get())->tryEnqueueReplicatedDDL(query.clone(), context_);
}
if (query.kind == ASTDropQuery::Kind::Detach)
{
getContext()->checkAccess(drop_storage, table_id);
context_->checkAccess(drop_storage, table_id);
if (table->isDictionary())
{
@ -175,7 +175,7 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ASTDropQuery & query, DatabaseP
TableExclusiveLockHolder table_lock;
if (database->getUUID() == UUIDHelpers::Nil)
table_lock = table->lockExclusively(getContext()->getCurrentQueryId(), getContext()->getSettingsRef().lock_acquire_timeout);
table_lock = table->lockExclusively(context_->getCurrentQueryId(), context_->getSettingsRef().lock_acquire_timeout);
if (query.permanently)
{
@ -183,12 +183,12 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ASTDropQuery & query, DatabaseP
DatabaseCatalog::instance().tryRemoveLoadingDependencies(table_id, getContext()->getSettingsRef().check_table_dependencies,
is_drop_or_detach_database);
/// Drop table from memory, don't touch data, metadata file renamed and will be skipped during server restart
database->detachTablePermanently(getContext(), table_id.table_name);
database->detachTablePermanently(context_, table_id.table_name);
}
else
{
/// Drop table from memory, don't touch data and metadata
database->detachTable(table_id.table_name);
database->detachTable(context_, table_id.table_name);
}
}
else if (query.kind == ASTDropQuery::Kind::Truncate)
@ -196,20 +196,20 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ASTDropQuery & query, DatabaseP
if (table->isDictionary())
throw Exception("Cannot TRUNCATE dictionary", ErrorCodes::SYNTAX_ERROR);
getContext()->checkAccess(AccessType::TRUNCATE, table_id);
context_->checkAccess(AccessType::TRUNCATE, table_id);
if (table->isStaticStorage())
throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is read-only");
table->checkTableCanBeDropped();
auto table_lock = table->lockExclusively(getContext()->getCurrentQueryId(), getContext()->getSettingsRef().lock_acquire_timeout);
auto table_lock = table->lockExclusively(context_->getCurrentQueryId(), context_->getSettingsRef().lock_acquire_timeout);
auto metadata_snapshot = table->getInMemoryMetadataPtr();
/// Drop table data, don't touch metadata
table->truncate(query_ptr, metadata_snapshot, getContext(), table_lock);
table->truncate(query_ptr, metadata_snapshot, context_, table_lock);
}
else if (query.kind == ASTDropQuery::Kind::Drop)
{
getContext()->checkAccess(drop_storage, table_id);
context_->checkAccess(drop_storage, table_id);
if (table->isDictionary())
{
@ -224,11 +224,13 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ASTDropQuery & query, DatabaseP
TableExclusiveLockHolder table_lock;
if (database->getUUID() == UUIDHelpers::Nil)
table_lock = table->lockExclusively(getContext()->getCurrentQueryId(), getContext()->getSettingsRef().lock_acquire_timeout);
table_lock = table->lockExclusively(context_->getCurrentQueryId(), context_->getSettingsRef().lock_acquire_timeout);
DatabaseCatalog::instance().tryRemoveLoadingDependencies(table_id, getContext()->getSettingsRef().check_table_dependencies,
is_drop_or_detach_database);
database->dropTable(getContext(), table_id.table_name, query.no_delay);
auto inner_context = Context::createCopy(context_);
inner_context->setInternalQuery(true);
database->dropTable(inner_context, table_id.table_name, query.no_delay);
}
db = database;
@ -320,16 +322,8 @@ BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query,
if (query.kind == ASTDropQuery::Kind::Detach && query.permanently)
throw Exception("DETACH PERMANENTLY is not implemented for databases", ErrorCodes::NOT_IMPLEMENTED);
#if USE_MYSQL
if (database->getEngineName() == "MaterializedMySQL")
stopDatabaseSynchronization(database);
#endif
if (auto * replicated = typeid_cast<DatabaseReplicated *>(database.get()))
replicated->stopReplication();
#if USE_LIBPQXX
if (auto * materialize_postgresql = typeid_cast<DatabaseMaterializedPostgreSQL *>(database.get()))
materialize_postgresql->stopReplication();
#endif
if (auto * replicating = dynamic_cast<IDatabaseReplicating *>(database.get()))
replicating->stopReplication();
if (database->shouldBeEmptyOnDetach())
{
@ -341,19 +335,21 @@ BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query,
/// Flush should not be done if shouldBeEmptyOnDetach() == false,
/// since in this case getTablesIterator() may do some additional work,
/// see DatabaseMaterializedMySQL<>::getTablesIterator()
/// see DatabaseMaterializedMySQL::getTablesIterator()
for (auto iterator = database->getTablesIterator(getContext()); iterator->isValid(); iterator->next())
{
iterator->table()->flush();
}
for (auto iterator = database->getTablesIterator(getContext()); iterator->isValid(); iterator->next())
auto table_context = Context::createCopy(getContext());
table_context->setInternalQuery(true);
for (auto iterator = database->getTablesIterator(table_context); iterator->isValid(); iterator->next())
{
DatabasePtr db;
UUID table_to_wait = UUIDHelpers::Nil;
query_for_table.setTable(iterator->name());
query_for_table.is_dictionary = iterator->table()->isDictionary();
executeToTableImpl(query_for_table, db, table_to_wait);
executeToTableImpl(table_context, query_for_table, db, table_to_wait);
uuids_to_wait.push_back(table_to_wait);
}
}

View File

@ -36,7 +36,7 @@ private:
BlockIO executeToDatabaseImpl(const ASTDropQuery & query, DatabasePtr & database, std::vector<UUID> & uuids_to_wait);
BlockIO executeToTable(ASTDropQuery & query);
BlockIO executeToTableImpl(ASTDropQuery & query, DatabasePtr & db, UUID & uuid_to_wait);
BlockIO executeToTableImpl(ContextPtr context_, ASTDropQuery & query, DatabasePtr & db, UUID & uuid_to_wait);
static void waitForTableToBeActuallyDroppedOrDetached(const ASTDropQuery & query, const DatabasePtr & db, const UUID & uuid_to_wait);

View File

@ -523,7 +523,7 @@ StoragePtr InterpreterSystemQuery::tryRestartReplica(const StorageID & replica,
auto table_lock = table->lockExclusively(getContext()->getCurrentQueryId(), getContext()->getSettingsRef().lock_acquire_timeout);
create_ast = database->getCreateTableQuery(replica.table_name, getContext());
database->detachTable(replica.table_name);
database->detachTable(getContext(), replica.table_name);
}
table.reset();
@ -544,7 +544,7 @@ StoragePtr InterpreterSystemQuery::tryRestartReplica(const StorageID & replica,
constraints,
false);
database->attachTable(replica.table_name, table, data_path);
database->attachTable(system_context, replica.table_name, table, data_path);
table->startup();
return table;

View File

@ -48,8 +48,8 @@ Pipe StorageMaterializedMySQL::read(
size_t max_block_size,
unsigned int num_streams)
{
/// If the background synchronization thread has exception.
rethrowSyncExceptionIfNeed(database);
if (auto * db = typeid_cast<const DatabaseMaterializedMySQL *>(database))
db->rethrowExceptionIfNeeded();
return readFinalFromNestedStorage(nested_storage, column_names, metadata_snapshot,
query_info, context, processed_stage, max_block_size, num_streams);
@ -57,8 +57,9 @@ Pipe StorageMaterializedMySQL::read(
NamesAndTypesList StorageMaterializedMySQL::getVirtuals() const
{
/// If the background synchronization thread has exception.
rethrowSyncExceptionIfNeed(database);
if (auto * db = typeid_cast<const DatabaseMaterializedMySQL *>(database))
db->rethrowExceptionIfNeeded();
return nested_storage->getVirtuals();
}

View File

@ -4,6 +4,7 @@
#include <Storages/System/attachSystemTables.h>
#include <Storages/System/attachSystemTablesImpl.h>
#include <Interpreters/Context.h>
#include <Storages/System/StorageSystemAggregateFunctionCombinators.h>
#include <Storages/System/StorageSystemAsynchronousMetrics.h>
#include <Storages/System/StorageSystemBuildOptions.h>
@ -80,92 +81,92 @@
namespace DB
{
void attachSystemTablesLocal(IDatabase & system_database)
void attachSystemTablesLocal(ContextPtr context, IDatabase & system_database)
{
attach<StorageSystemOne>(system_database, "one");
attach<StorageSystemNumbers>(system_database, "numbers", false);
attach<StorageSystemNumbers>(system_database, "numbers_mt", true);
attach<StorageSystemZeros>(system_database, "zeros", false);
attach<StorageSystemZeros>(system_database, "zeros_mt", true);
attach<StorageSystemDatabases>(system_database, "databases");
attach<StorageSystemTables>(system_database, "tables");
attach<StorageSystemColumns>(system_database, "columns");
attach<StorageSystemFunctions>(system_database, "functions");
attach<StorageSystemEvents>(system_database, "events");
attach<StorageSystemSettings>(system_database, "settings");
attach<SystemMergeTreeSettings<false>>(system_database, "merge_tree_settings");
attach<SystemMergeTreeSettings<true>>(system_database, "replicated_merge_tree_settings");
attach<StorageSystemBuildOptions>(system_database, "build_options");
attach<StorageSystemFormats>(system_database, "formats");
attach<StorageSystemTableFunctions>(system_database, "table_functions");
attach<StorageSystemAggregateFunctionCombinators>(system_database, "aggregate_function_combinators");
attach<StorageSystemDataTypeFamilies>(system_database, "data_type_families");
attach<StorageSystemCollations>(system_database, "collations");
attach<StorageSystemTableEngines>(system_database, "table_engines");
attach<StorageSystemContributors>(system_database, "contributors");
attach<StorageSystemUsers>(system_database, "users");
attach<StorageSystemRoles>(system_database, "roles");
attach<StorageSystemGrants>(system_database, "grants");
attach<StorageSystemRoleGrants>(system_database, "role_grants");
attach<StorageSystemCurrentRoles>(system_database, "current_roles");
attach<StorageSystemEnabledRoles>(system_database, "enabled_roles");
attach<StorageSystemSettingsProfiles>(system_database, "settings_profiles");
attach<StorageSystemSettingsProfileElements>(system_database, "settings_profile_elements");
attach<StorageSystemRowPolicies>(system_database, "row_policies");
attach<StorageSystemQuotas>(system_database, "quotas");
attach<StorageSystemQuotaLimits>(system_database, "quota_limits");
attach<StorageSystemQuotaUsage>(system_database, "quota_usage");
attach<StorageSystemQuotasUsage>(system_database, "quotas_usage");
attach<StorageSystemUserDirectories>(system_database, "user_directories");
attach<StorageSystemPrivileges>(system_database, "privileges");
attach<StorageSystemErrors>(system_database, "errors");
attach<StorageSystemWarnings>(system_database, "warnings");
attach<StorageSystemDataSkippingIndices>(system_database, "data_skipping_indices");
attach<StorageSystemLicenses>(system_database, "licenses");
attach<StorageSystemTimeZones>(system_database, "time_zones");
attach<StorageSystemOne>(context, system_database, "one");
attach<StorageSystemNumbers>(context, system_database, "numbers", false);
attach<StorageSystemNumbers>(context, system_database, "numbers_mt", true);
attach<StorageSystemZeros>(context, system_database, "zeros", false);
attach<StorageSystemZeros>(context, system_database, "zeros_mt", true);
attach<StorageSystemDatabases>(context, system_database, "databases");
attach<StorageSystemTables>(context, system_database, "tables");
attach<StorageSystemColumns>(context, system_database, "columns");
attach<StorageSystemFunctions>(context, system_database, "functions");
attach<StorageSystemEvents>(context, system_database, "events");
attach<StorageSystemSettings>(context, system_database, "settings");
attach<SystemMergeTreeSettings<false>>(context, system_database, "merge_tree_settings");
attach<SystemMergeTreeSettings<true>>(context, system_database, "replicated_merge_tree_settings");
attach<StorageSystemBuildOptions>(context, system_database, "build_options");
attach<StorageSystemFormats>(context, system_database, "formats");
attach<StorageSystemTableFunctions>(context, system_database, "table_functions");
attach<StorageSystemAggregateFunctionCombinators>(context, system_database, "aggregate_function_combinators");
attach<StorageSystemDataTypeFamilies>(context, system_database, "data_type_families");
attach<StorageSystemCollations>(context, system_database, "collations");
attach<StorageSystemTableEngines>(context, system_database, "table_engines");
attach<StorageSystemContributors>(context, system_database, "contributors");
attach<StorageSystemUsers>(context, system_database, "users");
attach<StorageSystemRoles>(context, system_database, "roles");
attach<StorageSystemGrants>(context, system_database, "grants");
attach<StorageSystemRoleGrants>(context, system_database, "role_grants");
attach<StorageSystemCurrentRoles>(context, system_database, "current_roles");
attach<StorageSystemEnabledRoles>(context, system_database, "enabled_roles");
attach<StorageSystemSettingsProfiles>(context, system_database, "settings_profiles");
attach<StorageSystemSettingsProfileElements>(context, system_database, "settings_profile_elements");
attach<StorageSystemRowPolicies>(context, system_database, "row_policies");
attach<StorageSystemQuotas>(context, system_database, "quotas");
attach<StorageSystemQuotaLimits>(context, system_database, "quota_limits");
attach<StorageSystemQuotaUsage>(context, system_database, "quota_usage");
attach<StorageSystemQuotasUsage>(context, system_database, "quotas_usage");
attach<StorageSystemUserDirectories>(context, system_database, "user_directories");
attach<StorageSystemPrivileges>(context, system_database, "privileges");
attach<StorageSystemErrors>(context, system_database, "errors");
attach<StorageSystemWarnings>(context, system_database, "warnings");
attach<StorageSystemDataSkippingIndices>(context, system_database, "data_skipping_indices");
attach<StorageSystemLicenses>(context, system_database, "licenses");
attach<StorageSystemTimeZones>(context, system_database, "time_zones");
#ifdef OS_LINUX
attach<StorageSystemStackTrace>(system_database, "stack_trace");
attach<StorageSystemStackTrace>(context, system_database, "stack_trace");
#endif
#if USE_ROCKSDB
attach<StorageSystemRocksDB>(system_database, "rocksdb");
attach<StorageSystemRocksDB>(context, system_database, "rocksdb");
#endif
}
void attachSystemTablesServer(IDatabase & system_database, bool has_zookeeper)
void attachSystemTablesServer(ContextPtr context, IDatabase & system_database, bool has_zookeeper)
{
attachSystemTablesLocal(system_database);
attachSystemTablesLocal(context, system_database);
attach<StorageSystemParts>(system_database, "parts");
attach<StorageSystemProjectionParts>(system_database, "projection_parts");
attach<StorageSystemDetachedParts>(system_database, "detached_parts");
attach<StorageSystemPartsColumns>(system_database, "parts_columns");
attach<StorageSystemProjectionPartsColumns>(system_database, "projection_parts_columns");
attach<StorageSystemDisks>(system_database, "disks");
attach<StorageSystemStoragePolicies>(system_database, "storage_policies");
attach<StorageSystemProcesses>(system_database, "processes");
attach<StorageSystemMetrics>(system_database, "metrics");
attach<StorageSystemMerges>(system_database, "merges");
attach<StorageSystemMutations>(system_database, "mutations");
attach<StorageSystemReplicas>(system_database, "replicas");
attach<StorageSystemReplicationQueue>(system_database, "replication_queue");
attach<StorageSystemDDLWorkerQueue>(system_database, "distributed_ddl_queue");
attach<StorageSystemDistributionQueue>(system_database, "distribution_queue");
attach<StorageSystemDictionaries>(system_database, "dictionaries");
attach<StorageSystemModels>(system_database, "models");
attach<StorageSystemClusters>(system_database, "clusters");
attach<StorageSystemGraphite>(system_database, "graphite_retentions");
attach<StorageSystemMacros>(system_database, "macros");
attach<StorageSystemReplicatedFetches>(system_database, "replicated_fetches");
attach<StorageSystemPartMovesBetweenShards>(system_database, "part_moves_between_shards");
attach<StorageSystemAsynchronousInserts>(system_database, "asynchronous_inserts");
attach<StorageSystemParts>(context, system_database, "parts");
attach<StorageSystemProjectionParts>(context, system_database, "projection_parts");
attach<StorageSystemDetachedParts>(context, system_database, "detached_parts");
attach<StorageSystemPartsColumns>(context, system_database, "parts_columns");
attach<StorageSystemProjectionPartsColumns>(context, system_database, "projection_parts_columns");
attach<StorageSystemDisks>(context, system_database, "disks");
attach<StorageSystemStoragePolicies>(context, system_database, "storage_policies");
attach<StorageSystemProcesses>(context, system_database, "processes");
attach<StorageSystemMetrics>(context, system_database, "metrics");
attach<StorageSystemMerges>(context, system_database, "merges");
attach<StorageSystemMutations>(context, system_database, "mutations");
attach<StorageSystemReplicas>(context, system_database, "replicas");
attach<StorageSystemReplicationQueue>(context, system_database, "replication_queue");
attach<StorageSystemDDLWorkerQueue>(context, system_database, "distributed_ddl_queue");
attach<StorageSystemDistributionQueue>(context, system_database, "distribution_queue");
attach<StorageSystemDictionaries>(context, system_database, "dictionaries");
attach<StorageSystemModels>(context, system_database, "models");
attach<StorageSystemClusters>(context, system_database, "clusters");
attach<StorageSystemGraphite>(context, system_database, "graphite_retentions");
attach<StorageSystemMacros>(context, system_database, "macros");
attach<StorageSystemReplicatedFetches>(context, system_database, "replicated_fetches");
attach<StorageSystemPartMovesBetweenShards>(context, system_database, "part_moves_between_shards");
attach<StorageSystemAsynchronousInserts>(context, system_database, "asynchronous_inserts");
if (has_zookeeper)
attach<StorageSystemZooKeeper>(system_database, "zookeeper");
attach<StorageSystemZooKeeper>(context, system_database, "zookeeper");
}
void attachSystemTablesAsync(IDatabase & system_database, AsynchronousMetrics & async_metrics)
void attachSystemTablesAsync(ContextPtr context, IDatabase & system_database, AsynchronousMetrics & async_metrics)
{
attach<StorageSystemAsynchronousMetrics>(system_database, "asynchronous_metrics", async_metrics);
attach<StorageSystemAsynchronousMetrics>(context, system_database, "asynchronous_metrics", async_metrics);
}
}

View File

@ -1,17 +1,16 @@
#pragma once
#include <memory>
#include <Interpreters/Context_fwd.h>
namespace DB
{
class Context;
class AsynchronousMetrics;
class IDatabase;
void attachSystemTablesServer(IDatabase & system_database, bool has_zookeeper);
void attachSystemTablesLocal(IDatabase & system_database);
void attachSystemTablesAsync(IDatabase & system_database, AsynchronousMetrics & async_metrics);
void attachSystemTablesServer(ContextPtr context, IDatabase & system_database, bool has_zookeeper);
void attachSystemTablesLocal(ContextPtr context, IDatabase & system_database);
void attachSystemTablesAsync(ContextPtr context, IDatabase & system_database, AsynchronousMetrics & async_metrics);
}

View File

@ -7,14 +7,14 @@ namespace DB
{
template<typename StorageT, typename... StorageArgs>
void attach(IDatabase & system_database, const String & table_name, StorageArgs && ... args)
void attach(ContextPtr context, IDatabase & system_database, const String & table_name, StorageArgs && ... args)
{
assert(system_database.getDatabaseName() == DatabaseCatalog::SYSTEM_DATABASE);
if (system_database.getUUID() == UUIDHelpers::Nil)
{
/// Attach to Ordinary database
auto table_id = StorageID(DatabaseCatalog::SYSTEM_DATABASE, table_name);
system_database.attachTable(table_name, StorageT::create(table_id, std::forward<StorageArgs>(args)...));
system_database.attachTable(context, table_name, StorageT::create(table_id, std::forward<StorageArgs>(args)...));
}
else
{
@ -23,7 +23,7 @@ void attach(IDatabase & system_database, const String & table_name, StorageArgs
/// and path is actually not used
auto table_id = StorageID(DatabaseCatalog::SYSTEM_DATABASE, table_name, UUIDHelpers::generateV4());
String path = "store/" + DatabaseCatalog::getPathForUUID(table_id.uuid);
system_database.attachTable(table_name, StorageT::create(table_id, std::forward<StorageArgs>(args)...), path);
system_database.attachTable(context, table_name, StorageT::create(table_id, std::forward<StorageArgs>(args)...), path);
}
}

View File

@ -84,6 +84,7 @@ private:
const auto & table_name = tab.table.table;
const auto & db_name = tab.table.database;
database->attachTable(
context,
table_name,
StorageMemory::create(
StorageID(db_name, table_name), ColumnsDescription{getColumns()}, ConstraintsDescription{}, String{}));

View File

@ -767,7 +767,7 @@ class ClickHouseCluster:
hostname=None, env_variables=None, image="clickhouse/integration-test", tag=None,
stay_alive=False, ipv4_address=None, ipv6_address=None, with_installed_binary=False, external_dirs=None, tmpfs=None,
zookeeper_docker_compose_path=None, minio_certs_dir=None, use_keeper=True,
main_config_name="config.xml", users_config_name="users.xml", copy_common_configs=True, config_root_name="clickhouse"):
main_config_name="config.xml", users_config_name="users.xml", copy_common_configs=True, config_root_name="clickhouse") -> 'ClickHouseInstance':
"""Add an instance to the cluster.

View File

@ -3,9 +3,9 @@
<profiles>
<default>
<allow_experimental_database_materialized_mysql>1</allow_experimental_database_materialized_mysql>
<default_database_engine>Atomic</default_database_engine>
<allow_introspection_functions>1</allow_introspection_functions>
<optimize_on_insert>0</optimize_on_insert>
<default_database_engine>Ordinary</default_database_engine>
</default>
</profiles>

View File

@ -1,19 +0,0 @@
<?xml version="1.0"?>
<clickhouse>
<profiles>
<default>
<allow_experimental_database_materialized_mysql>1</allow_experimental_database_materialized_mysql>
<default_database_engine>Atomic</default_database_engine>
</default>
</profiles>
<users>
<default>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
</default>
</users>
</clickhouse>

View File

@ -21,7 +21,7 @@ def check_query(clickhouse_node, query, result_set, retry_count=10, interval_sec
if result_set == lastest_result:
return
logging.debug(f"latest_result{lastest_result}")
logging.debug(f"latest_result {lastest_result}")
time.sleep(interval_seconds)
except Exception as e:
logging.debug(f"check_query retry {i+1} exception {e}")

View File

@ -5,7 +5,7 @@ import pwd
import re
import pymysql.cursors
import pytest
from helpers.cluster import ClickHouseCluster, get_docker_compose_path, run_and_check
from helpers.cluster import ClickHouseCluster, ClickHouseInstance, get_docker_compose_path, run_and_check
import docker
import logging
@ -17,10 +17,10 @@ cluster = ClickHouseCluster(__file__)
mysql_node = None
mysql8_node = None
node_db_ordinary = cluster.add_instance('node1', user_configs=["configs/users.xml"], with_mysql=True, stay_alive=True)
node_db_atomic = cluster.add_instance('node2', user_configs=["configs/users_db_atomic.xml"], with_mysql8=True, stay_alive=True)
node_disable_bytes_settings = cluster.add_instance('node3', user_configs=["configs/users_disable_bytes_settings.xml"], with_mysql=False, stay_alive=True)
node_disable_rows_settings = cluster.add_instance('node4', user_configs=["configs/users_disable_rows_settings.xml"], with_mysql=False, stay_alive=True)
node_db = cluster.add_instance('node1', user_configs=["configs/users.xml"], with_mysql=True, with_mysql8=True, stay_alive=True)
node_disable_bytes_settings = cluster.add_instance('node2', user_configs=["configs/users_disable_bytes_settings.xml"], with_mysql=False, stay_alive=True)
node_disable_rows_settings = cluster.add_instance('node3', user_configs=["configs/users_disable_rows_settings.xml"], with_mysql=False, stay_alive=True)
@pytest.fixture(scope="module")
def started_cluster():
@ -82,32 +82,39 @@ class MySQLConnection:
if self.mysql_connection is not None:
self.mysql_connection.close()
@pytest.fixture(scope="module")
def started_mysql_5_7():
mysql_node = MySQLConnection(cluster.mysql_port, 'root', 'clickhouse', cluster.mysql_ip)
yield mysql_node
@pytest.fixture(scope="module")
def started_mysql_8_0():
mysql8_node = MySQLConnection(cluster.mysql8_port, 'root', 'clickhouse', cluster.mysql8_ip)
yield mysql8_node
@pytest.mark.parametrize(('clickhouse_node'), [pytest.param(node_db_ordinary, id="ordinary"), pytest.param(node_db_atomic, id="atomic")])
def test_materialize_database_dml_with_mysql_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
@pytest.fixture(scope='module')
def clickhouse_node():
yield node_db
def test_materialized_database_dml_with_mysql_5_7(started_cluster, started_mysql_5_7, clickhouse_node: ClickHouseInstance):
materialize_with_ddl.dml_with_materialized_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.materialized_mysql_database_with_views(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.materialized_mysql_database_with_datetime_and_decimal(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.move_to_prewhere_and_column_filtering(clickhouse_node, started_mysql_5_7, "mysql57")
@pytest.mark.parametrize(('clickhouse_node'), [pytest.param(node_db_ordinary, id="ordinary"), pytest.param(node_db_atomic, id="atomic")])
def test_materialize_database_dml_with_mysql_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
def test_materialized_database_dml_with_mysql_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.dml_with_materialized_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.materialized_mysql_database_with_views(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.materialized_mysql_database_with_datetime_and_decimal(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.move_to_prewhere_and_column_filtering(clickhouse_node, started_mysql_8_0, "mysql80")
@pytest.mark.parametrize(('clickhouse_node'), [pytest.param(node_db_ordinary, id="ordinary"), pytest.param(node_db_atomic, id="atomic")])
def test_materialize_database_ddl_with_mysql_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
def test_materialized_database_ddl_with_mysql_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.drop_table_with_materialized_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.create_table_with_materialized_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.rename_table_with_materialized_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
@ -119,8 +126,8 @@ def test_materialize_database_ddl_with_mysql_5_7(started_cluster, started_mysql_
materialize_with_ddl.alter_modify_column_with_materialized_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.create_table_like_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
@pytest.mark.parametrize(('clickhouse_node'), [pytest.param(node_db_ordinary, id="ordinary"), pytest.param(node_db_atomic, id="atomic")])
def test_materialize_database_ddl_with_mysql_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
def test_materialized_database_ddl_with_mysql_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.drop_table_with_materialized_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.create_table_with_materialized_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.rename_table_with_materialized_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
@ -131,102 +138,96 @@ def test_materialize_database_ddl_with_mysql_8_0(started_cluster, started_mysql_
materialize_with_ddl.alter_modify_column_with_materialized_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.create_table_like_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
@pytest.mark.parametrize(('clickhouse_node'), [pytest.param(node_db_ordinary, id="ordinary"), pytest.param(node_db_atomic, id="atomic")])
def test_materialize_database_ddl_with_empty_transaction_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
def test_materialized_database_ddl_with_empty_transaction_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.query_event_with_empty_transaction(clickhouse_node, started_mysql_5_7, "mysql57")
@pytest.mark.parametrize(('clickhouse_node'), [pytest.param(node_db_ordinary, id="ordinary"), pytest.param(node_db_atomic, id="atomic")])
def test_materialize_database_ddl_with_empty_transaction_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
def test_materialized_database_ddl_with_empty_transaction_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.query_event_with_empty_transaction(clickhouse_node, started_mysql_8_0, "mysql80")
@pytest.mark.parametrize(('clickhouse_node'), [pytest.param(node_db_ordinary, id="ordinary"), pytest.param(node_db_atomic, id="atomic")])
def test_select_without_columns_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.select_without_columns(clickhouse_node, started_mysql_5_7, "mysql57")
@pytest.mark.parametrize(('clickhouse_node'), [pytest.param(node_db_ordinary, id="ordinary"), pytest.param(node_db_atomic, id="atomic")])
def test_select_without_columns_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.select_without_columns(clickhouse_node, started_mysql_8_0, "mysql80")
@pytest.mark.parametrize(('clickhouse_node'), [pytest.param(node_db_ordinary, id="ordinary"), pytest.param(node_db_atomic, id="atomic")])
def test_insert_with_modify_binlog_checksum_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.insert_with_modify_binlog_checksum(clickhouse_node, started_mysql_5_7, "mysql57")
@pytest.mark.parametrize(('clickhouse_node'), [pytest.param(node_db_ordinary, id="ordinary"), pytest.param(node_db_atomic, id="atomic")])
def test_insert_with_modify_binlog_checksum_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.insert_with_modify_binlog_checksum(clickhouse_node, started_mysql_8_0, "mysql80")
@pytest.mark.parametrize(('clickhouse_node'), [pytest.param(node_db_ordinary, id="ordinary"), pytest.param(node_db_atomic, id="atomic")])
def test_materialize_database_err_sync_user_privs_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
def test_materialized_database_err_sync_user_privs_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.err_sync_user_privs_with_materialized_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
@pytest.mark.parametrize(('clickhouse_node'), [pytest.param(node_db_ordinary, id="ordinary"), pytest.param(node_db_atomic, id="atomic")])
def test_materialize_database_err_sync_user_privs_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
def test_materialized_database_err_sync_user_privs_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.err_sync_user_privs_with_materialized_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
@pytest.mark.parametrize(('clickhouse_node'), [pytest.param(node_db_ordinary, id="ordinary"), pytest.param(node_db_atomic, id="atomic")])
def test_network_partition_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.network_partition_test(clickhouse_node, started_mysql_5_7, "mysql57")
@pytest.mark.parametrize(('clickhouse_node'), [pytest.param(node_db_ordinary, id="ordinary"), pytest.param(node_db_atomic, id="atomic")])
def test_network_partition_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.network_partition_test(clickhouse_node, started_mysql_8_0, "mysql80")
@pytest.mark.parametrize(('clickhouse_node'), [pytest.param(node_db_ordinary, id="ordinary"), pytest.param(node_db_atomic, id="atomic")])
def test_mysql_kill_sync_thread_restore_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.mysql_kill_sync_thread_restore_test(clickhouse_node, started_mysql_5_7, "mysql57")
@pytest.mark.parametrize(('clickhouse_node'), [pytest.param(node_db_ordinary, id="ordinary"), pytest.param(node_db_atomic, id="atomic")])
def test_mysql_kill_sync_thread_restore_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.mysql_kill_sync_thread_restore_test(clickhouse_node, started_mysql_8_0, "mysql80")
@pytest.mark.parametrize(('clickhouse_node'), [pytest.param(node_db_ordinary, id="ordinary"), pytest.param(node_db_atomic, id="atomic")])
def test_mysql_killed_while_insert_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.mysql_killed_while_insert(clickhouse_node, started_mysql_5_7, "mysql57")
@pytest.mark.parametrize(('clickhouse_node'), [pytest.param(node_db_ordinary, id="ordinary"), pytest.param(node_db_atomic, id="atomic")])
def test_mysql_killed_while_insert_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.mysql_killed_while_insert(clickhouse_node, started_mysql_8_0, "mysql80")
@pytest.mark.parametrize(('clickhouse_node'), [pytest.param(node_db_ordinary, id="ordinary"), pytest.param(node_db_atomic, id="atomic")])
def test_clickhouse_killed_while_insert_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.clickhouse_killed_while_insert(clickhouse_node, started_mysql_5_7, "mysql57")
@pytest.mark.parametrize(('clickhouse_node'), [pytest.param(node_db_ordinary, id="ordinary"), pytest.param(node_db_atomic, id="atomic")])
def test_clickhouse_killed_while_insert_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.clickhouse_killed_while_insert(clickhouse_node, started_mysql_8_0, "mysql80")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_ordinary])
def test_utf8mb4(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.utf8mb4_test(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.utf8mb4_test(clickhouse_node, started_mysql_8_0, "mysql80")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_ordinary])
def test_system_parts_table(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.system_parts_test(clickhouse_node, started_mysql_8_0, "mysql80")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_ordinary])
def test_multi_table_update(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.multi_table_update_test(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.multi_table_update_test(clickhouse_node, started_mysql_8_0, "mysql80")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_ordinary])
def test_system_tables_table(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.system_tables_test(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.system_tables_test(clickhouse_node, started_mysql_8_0, "mysql80")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_ordinary])
def test_materialize_with_column_comments(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node):
def test_materialized_with_column_comments(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.materialize_with_column_comments_test(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.materialize_with_column_comments_test(clickhouse_node, started_mysql_8_0, "mysql80")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_ordinary])
def test_materialize_with_enum(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node):
def test_materialized_with_enum(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.materialize_with_enum8_test(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.materialize_with_enum16_test(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.alter_enum8_to_enum16_test(clickhouse_node, started_mysql_5_7, "mysql57")
@ -240,7 +241,7 @@ def test_mysql_settings(started_cluster, started_mysql_8_0, started_mysql_5_7, c
materialize_with_ddl.mysql_settings_test(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.mysql_settings_test(clickhouse_node, started_mysql_8_0, "mysql80")
@pytest.mark.parametrize(('clickhouse_node'), [pytest.param(node_db_ordinary, id="ordinary"), pytest.param(node_db_atomic, id="atomic")])
def test_large_transaction(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.materialized_mysql_large_transaction(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.materialized_mysql_large_transaction(clickhouse_node, started_mysql_5_7, "mysql57")