diff --git a/src/Core/MySQLClient.cpp b/src/Core/MySQLClient.cpp index 5cd71377a68..ffcbf798bb1 100644 --- a/src/Core/MySQLClient.cpp +++ b/src/Core/MySQLClient.cpp @@ -125,9 +125,6 @@ void MySQLClient::ping() void MySQLClient::startBinlogDump(UInt32 slave_id, String replicate_db, String binlog_file_name, UInt64 binlog_pos) { - if (dump_thread) - return; - /// Set binlog checksum to CRC32. String checksum = "CRC32"; writeCommand(Command::COM_QUERY, "SET @master_binlog_checksum = '" + checksum + "'"); @@ -145,34 +142,14 @@ void MySQLClient::startBinlogDump(UInt32 slave_id, String replicate_db, String b binlog_pos = binlog_pos < 4 ? 4 : binlog_pos; BinlogDump binlog_dump(binlog_pos, binlog_file_name, slave_id); packet_sender->sendPacket(binlog_dump, true); - dump_thread.emplace([this]() - { - while (true) - { - try - { - packet_sender->receivePacket(replication); - auto receive_event = replication.readOneEvent(); - events.push(std::make_pair(receive_event, replication.getPosition())); - } - catch(...) - { - tryLogCurrentException("MySQLClient"); - /// TODO: maybe sleep? - } - } - }); } BinlogEventPtr MySQLClient::readOneBinlogEvent(UInt64 milliseconds) { - std::pair event; + if (packet_sender->tryReceivePacket(replication, milliseconds)) + return replication.readOneEvent(); - if (!events.tryPop(event, milliseconds)) - return {}; - - last_position = event.second; - return event.first; + return {}; } } diff --git a/src/Core/MySQLClient.h b/src/Core/MySQLClient.h index 753e2a6cd44..ff0b1fbd3a5 100644 --- a/src/Core/MySQLClient.h +++ b/src/Core/MySQLClient.h @@ -11,7 +11,6 @@ #include #include #include -#include namespace DB @@ -37,7 +36,7 @@ public: void startBinlogDump(UInt32 slave_id, String replicate_db, String binlog_file_name, UInt64 binlog_pos); BinlogEventPtr readOneBinlogEvent(UInt64 milliseconds = 0); - Position getPosition() const { return last_position; } + Position getPosition() const { return replication.getPosition(); } private: String host; @@ -59,9 +58,6 @@ private: std::unique_ptr socket; std::optional address; std::shared_ptr packet_sender; - Position last_position; - std::optional dump_thread; - ConcurrentBoundedQueue> events{1}; void handshake(); void registerSlaveOnMaster(UInt32 slave_id); diff --git a/src/Core/MySQLProtocol.h b/src/Core/MySQLProtocol.h index 1615ae7c7c6..694e6d24baf 100644 --- a/src/Core/MySQLProtocol.h +++ b/src/Core/MySQLProtocol.h @@ -407,6 +407,23 @@ public: packet.readPayload(*in, sequence_id); } + bool tryReceivePacket(ReadPacket & packet, UInt64 millisecond = 0) + { + if (millisecond != 0) + { + ReadBufferFromPocoSocket * socket_in = typeid_cast(in); + + if (!socket_in) + throw Exception("LOGICAL ERROR: Attempt to pull the duration in a non socket stream", ErrorCodes::LOGICAL_ERROR); + + if (!socket_in->poll(millisecond * 1000)) + return false; + } + + packet.readPayload(*in, sequence_id); + return true; + } + template void sendPacket(const T & packet, bool flush = false) { diff --git a/src/Databases/DatabaseFactory.cpp b/src/Databases/DatabaseFactory.cpp index 17dc2eeb804..cbd5f055f31 100644 --- a/src/Databases/DatabaseFactory.cpp +++ b/src/Databases/DatabaseFactory.cpp @@ -79,11 +79,11 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String const String & engine_name = engine_define->engine->name; const UUID & uuid = create.uuid; - if (engine_name != "MySQL" && engine_name != "Lazy" && engine_define->engine->arguments) + if (engine_name != "MySQL" && engine_name != "MaterializeMySQL" && engine_name != "Lazy" && engine_define->engine->arguments) throw Exception("Database engine " + engine_name + " cannot have arguments", ErrorCodes::BAD_ARGUMENTS); if (engine_define->engine->parameters || engine_define->partition_by || engine_define->primary_key || engine_define->order_by || - engine_define->sample_by || (engine_name != "MySQL" && engine_define->settings)) + engine_define->sample_by || (engine_name != "MaterializeMySQL" && engine_define->settings)) throw Exception("Database engine " + engine_name + " cannot have parameters, primary_key, order_by, sample_by, settings", ErrorCodes::UNKNOWN_ELEMENT_IN_AST); diff --git a/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp b/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp index ed246825b5c..ed99e84749d 100644 --- a/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp +++ b/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -19,7 +20,7 @@ namespace ErrorCodes DatabaseMaterializeMySQL::DatabaseMaterializeMySQL( const Context & context, const String & database_name_, const String & metadata_path_, const IAST * database_engine_define_ , const String & mysql_database_name_, mysqlxx::Pool && pool_, MySQLClient && client_, std::unique_ptr settings_) - : IDatabase(database_name_), engine_define(database_engine_define_->clone()) + : IDatabase(database_name_), global_context(context.getGlobalContext()), engine_define(database_engine_define_->clone()) , nested_database(std::make_shared(database_name_, metadata_path_, context)) , settings(std::move(settings_)), log(&Poco::Logger::get("DatabaseMaterializeMySQL")) , materialize_thread(context, database_name_, mysql_database_name_, std::move(pool_), std::move(client_), settings.get()) @@ -75,7 +76,13 @@ void DatabaseMaterializeMySQL::loadStoredObjects(Context & context, bool has_for void DatabaseMaterializeMySQL::shutdown() { - getNestedDatabase()->shutdown(); + materialize_thread.stopSynchronization(); + + auto iterator = nested_database->getTablesIterator(global_context, {}); + + /// We only shutdown the table, The tables is cleaned up when destructed database + for (; iterator->isValid(); iterator->next()) + iterator->table()->shutdown(); } bool DatabaseMaterializeMySQL::empty() const @@ -168,12 +175,26 @@ void DatabaseMaterializeMySQL::alterTable(const Context & context, const Storage bool DatabaseMaterializeMySQL::shouldBeEmptyOnDetach() const { - return getNestedDatabase()->shouldBeEmptyOnDetach(); + return false; } void DatabaseMaterializeMySQL::drop(const Context & context) { - getNestedDatabase()->drop(context); + DatabasePtr nested_database = getNestedDatabase(); + + if (nested_database->shouldBeEmptyOnDetach()) + { + for (auto iterator = nested_database->getTablesIterator(context, {}); iterator->isValid(); iterator->next()) + { + TableExclusiveLockHolder table_lock = iterator->table()->lockExclusively(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout); + nested_database->dropTable(context, iterator->name(), true); + } + + /// Remove metadata info + Poco::File(getMetadataPath() + "/.metadata").remove(false); + } + + nested_database->drop(context); } bool DatabaseMaterializeMySQL::isTableExist(const String & name, const Context & context) const diff --git a/src/Databases/MySQL/DatabaseMaterializeMySQL.h b/src/Databases/MySQL/DatabaseMaterializeMySQL.h index a30390a4628..78f8e293224 100644 --- a/src/Databases/MySQL/DatabaseMaterializeMySQL.h +++ b/src/Databases/MySQL/DatabaseMaterializeMySQL.h @@ -23,6 +23,8 @@ public: void setException(const std::exception_ptr & exception); protected: + const Context & global_context; + ASTPtr engine_define; DatabasePtr nested_database; std::unique_ptr settings; diff --git a/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp b/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp index 35cd9113b5e..e0baf98e252 100644 --- a/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp +++ b/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp @@ -68,16 +68,7 @@ MaterializeMySQLSyncThread::~MaterializeMySQLSyncThread() { try { - if (!sync_quit) - { - { - sync_quit = true; - std::lock_guard lock(sync_mutex); - } - - sync_cond.notify_one(); - /// TODO: join thread - } + stopSynchronization(); } catch (...) { @@ -144,6 +135,20 @@ void MaterializeMySQLSyncThread::synchronization() } } +void MaterializeMySQLSyncThread::stopSynchronization() +{ + if (!sync_quit) + { + { + sync_quit = true; + std::lock_guard lock(sync_mutex); + } + + sync_cond.notify_one(); + background_thread_pool->join(); + } +} + void MaterializeMySQLSyncThread::startSynchronization() { /// TODO: reset exception. @@ -404,6 +409,7 @@ void MaterializeMySQLSyncThread::onEvent(Buffers & buffers, const BinlogEventPtr } } } + bool MaterializeMySQLSyncThread::isMySQLSyncThread() { return getThreadName() == MYSQL_BACKGROUND_THREAD_NAME; diff --git a/src/Databases/MySQL/MaterializeMySQLSyncThread.h b/src/Databases/MySQL/MaterializeMySQLSyncThread.h index 19a9f318398..76297a55f54 100644 --- a/src/Databases/MySQL/MaterializeMySQLSyncThread.h +++ b/src/Databases/MySQL/MaterializeMySQLSyncThread.h @@ -30,6 +30,8 @@ public: const Context & context, const String & database_name_, const String & mysql_database_name_ , mysqlxx::Pool && pool_, MySQLClient && client_, MaterializeMySQLSettings * settings_); + void stopSynchronization(); + void startSynchronization(); static bool isMySQLSyncThread();