From 4a99ca0d80b6e40e295a191c3e7a6772717a5917 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Mon, 22 Jun 2020 11:12:05 +0800 Subject: [PATCH] ISSUES-4006 refactor --- src/Common/CurrentMetrics.cpp | 1 + src/Core/MySQLClient.cpp | 37 +- src/Core/MySQLClient.h | 8 +- src/Databases/DatabaseFactory.cpp | 4 +- .../MySQL/DatabaseMaterializeMySQL.cpp | 325 +++++++------ .../MySQL/DatabaseMaterializeMySQL.h | 100 ++-- .../MySQL/DatabaseMaterializeMySQLWrap.cpp | 188 -------- .../MySQL/DatabaseMaterializeMySQLWrap.h | 70 --- src/Databases/MySQL/EventConsumer.cpp | 261 ---------- src/Databases/MySQL/EventConsumer.h | 63 --- ...tings.cpp => MaterializeMySQLSettings.cpp} | 6 +- ...eSettings.h => MaterializeMySQLSettings.h} | 4 +- .../MySQL/MaterializeMySQLSyncThread.cpp | 444 ++++++++++++++++++ .../MySQL/MaterializeMySQLSyncThread.h | 91 ++++ 14 files changed, 798 insertions(+), 804 deletions(-) delete mode 100644 src/Databases/MySQL/DatabaseMaterializeMySQLWrap.cpp delete mode 100644 src/Databases/MySQL/DatabaseMaterializeMySQLWrap.h delete mode 100644 src/Databases/MySQL/EventConsumer.cpp delete mode 100644 src/Databases/MySQL/EventConsumer.h rename src/Databases/MySQL/{MaterializeModeSettings.cpp => MaterializeMySQLSettings.cpp} (79%) rename src/Databases/MySQL/{MaterializeModeSettings.h => MaterializeMySQLSettings.h} (76%) create mode 100644 src/Databases/MySQL/MaterializeMySQLSyncThread.cpp create mode 100644 src/Databases/MySQL/MaterializeMySQLSyncThread.h diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index 4bab9ef2844..afbf14b4a38 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -13,6 +13,7 @@ M(BackgroundMovePoolTask, "Number of active tasks in BackgroundProcessingPool for moves") \ M(BackgroundSchedulePoolTask, "Number of active tasks in BackgroundSchedulePool. This pool is used for periodic ReplicatedMergeTree tasks, like cleaning old data parts, altering data parts, replica re-initialization, etc.") \ M(BackgroundBufferFlushSchedulePoolTask, "Number of active tasks in BackgroundBufferFlushSchedulePool. This pool is used for periodic Buffer flushes") \ + M(BackgroundMySQLSyncSchedulePoolTask, "Number of active tasks in BackgroundMySQLSyncSchedulePoolTask. This pool is used for MySQL Materialize Database sync.") \ M(BackgroundDistributedSchedulePoolTask, "Number of active tasks in BackgroundDistributedSchedulePool. This pool is used for distributed sends that is done in background.") \ M(CacheDictionaryUpdateQueueBatches, "Number of 'batches' (a set of keys) in update queue in CacheDictionaries.") \ M(CacheDictionaryUpdateQueueKeys, "Exact number of keys in update queue in CacheDictionaries.") \ diff --git a/src/Core/MySQLClient.cpp b/src/Core/MySQLClient.cpp index 5905de11dd2..109e8e1a7da 100644 --- a/src/Core/MySQLClient.cpp +++ b/src/Core/MySQLClient.cpp @@ -125,6 +125,9 @@ void MySQLClient::ping() void MySQLClient::startBinlogDump(UInt32 slave_id, String replicate_db, String binlog_file_name, UInt64 binlog_pos) { + if (dump_thread) + return; + /// Set binlog checksum to CRC32. String checksum = "CRC32"; writeCommand(Command::COM_QUERY, "SET @master_binlog_checksum = '" + checksum + "'"); @@ -142,15 +145,33 @@ void MySQLClient::startBinlogDump(UInt32 slave_id, String replicate_db, String b binlog_pos = binlog_pos < 4 ? 4 : binlog_pos; BinlogDump binlog_dump(binlog_pos, binlog_file_name, slave_id); packet_sender->sendPacket(binlog_dump, true); -} - -BinlogEventPtr MySQLClient::readOneBinlogEvent() -{ - while (true) + dump_thread.emplace([this]() { - packet_sender->receivePacket(replication); - return replication.readOneEvent(); - } + while (true) + { + try + { + packet_sender->receivePacket(replication); + events.push(std::make_pair(replication.readOneEvent(), replication.getPosition())); + } + catch(...) + { + tryLogCurrentException("MySQLClient"); + /// TODO: maybe sleep? + } + } + }); +} + +BinlogEventPtr MySQLClient::readOneBinlogEvent(UInt64 milliseconds) +{ + std::pair event; + + if (!events.tryPop(event, milliseconds)) + return {}; + + last_position = event.second; + return event.first; } } diff --git a/src/Core/MySQLClient.h b/src/Core/MySQLClient.h index 5c42e5c5d34..753e2a6cd44 100644 --- a/src/Core/MySQLClient.h +++ b/src/Core/MySQLClient.h @@ -11,6 +11,7 @@ #include #include #include +#include namespace DB @@ -35,8 +36,8 @@ public: void ping(); void startBinlogDump(UInt32 slave_id, String replicate_db, String binlog_file_name, UInt64 binlog_pos); - BinlogEventPtr readOneBinlogEvent(); - Position getPosition() const { return replication.getPosition(); } + BinlogEventPtr readOneBinlogEvent(UInt64 milliseconds = 0); + Position getPosition() const { return last_position; } private: String host; @@ -58,6 +59,9 @@ private: std::unique_ptr socket; std::optional address; std::shared_ptr packet_sender; + Position last_position; + std::optional dump_thread; + ConcurrentBoundedQueue> events{1}; void handshake(); void registerSlaveOnMaster(UInt32 slave_id); diff --git a/src/Databases/DatabaseFactory.cpp b/src/Databases/DatabaseFactory.cpp index e7f8b6f22de..71fdc15f70f 100644 --- a/src/Databases/DatabaseFactory.cpp +++ b/src/Databases/DatabaseFactory.cpp @@ -20,8 +20,8 @@ #if USE_MYSQL # include # include +# include # include -# include # include # include # include @@ -119,7 +119,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String const auto & [remote_host_name, remote_port] = parseAddress(host_name_and_port, 3306); auto mysql_pool = mysqlxx::Pool(mysql_database_name, remote_host_name, mysql_user_name, mysql_user_password, remote_port); - auto materialize_mode_settings = std::make_unique(); + auto materialize_mode_settings = std::make_unique(); if (engine_define->settings) materialize_mode_settings->loadFromQuery(*engine_define); diff --git a/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp b/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp index b3d55c46d66..32b2e7c3b40 100644 --- a/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp +++ b/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp @@ -1,198 +1,195 @@ -#if !defined(ARCADIA_BUILD) -# include "config_core.h" -#endif - -#if USE_MYSQL - #include -# include -# include -# include -# include -# include -# include -# include -# include -# include -# include -# include -# include -# include -# include -# include -# include +#include +#include +#include +#include +#include +#include namespace DB { +static constexpr auto MYSQL_BACKGROUND_THREAD_NAME = "MySQLDBSync"; + namespace ErrorCodes { - extern const int INCORRECT_QUERY; -} - -static inline BlockIO tryToExecuteQuery(const String & query_to_execute, const Context & context_, const String & comment) -{ - try - { - Context context = context_; - context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; - context.setCurrentQueryId(""); // generate random query_id - return executeQuery("/*" + comment + "*/ " + query_to_execute, context, true); - } - catch (...) - { - tryLogCurrentException("DatabaseMaterializeMySQL", "Query " + query_to_execute + " wasn't finished successfully"); - throw; - } - - LOG_DEBUG(&Logger::get("DatabaseMaterializeMySQL"), "Executed query: " << query_to_execute); + extern const int NOT_IMPLEMENTED; } DatabaseMaterializeMySQL::DatabaseMaterializeMySQL( - const Context & context, const String & database_name_, const String & metadata_path_ - , const ASTStorage * database_engine_define_, const String & mysql_database_name_, mysqlxx::Pool && pool_ - , MySQLClient && client_ , std::unique_ptr settings_) - : DatabaseMaterializeMySQLWrap(std::make_shared(database_name_, metadata_path_, context), database_engine_define_->clone(), "DatabaseMaterializeMySQL") - , global_context(context.getGlobalContext()), metadata_path(metadata_path_), mysql_database_name(mysql_database_name_) - , pool(std::move(pool_)), client(std::move(client_)), settings(std::move(settings_)) + const Context & context, const String & database_name_, const String & metadata_path_, const IAST * database_engine_define_ + , const String & mysql_database_name_, mysqlxx::Pool && pool_, MySQLClient && client_, std::unique_ptr settings_) + : IDatabase(database_name_), engine_define(database_engine_define_->clone()) + , nested_database(std::make_shared(database_name_, metadata_path_, context)) + , settings(std::move(settings_)), log(&Poco::Logger::get("DatabaseMaterializeMySQL")) + , materialize_thread(context, database_name_, mysql_database_name_, std::move(pool_), std::move(client_), settings.get()) { - /// TODO: 做简单的check, 失败即报错 - scheduleSynchronized(); } -BlockOutputStreamPtr DatabaseMaterializeMySQL::getTableOutput(const String & table_name) +void DatabaseMaterializeMySQL::setException(const std::exception_ptr & exception_) { - String with_database_table_name = backQuoteIfNeed(getDatabaseName()) + "." + backQuoteIfNeed(table_name); - BlockIO res = tryToExecuteQuery("INSERT INTO " + with_database_table_name + " VALUES", global_context, ""); - - if (!res.out) - throw Exception("LOGICAL ERROR:", ErrorCodes::LOGICAL_ERROR); - - return res.out; + std::unique_lock lock(mutex); + exception = exception_; } -void DatabaseMaterializeMySQL::cleanOutdatedTables() +DatabasePtr DatabaseMaterializeMySQL::getNestedDatabase() const { - auto ddl_guard = DatabaseCatalog::instance().getDDLGuard(database_name, ""); - const DatabasePtr & clean_database = DatabaseCatalog::instance().getDatabase(database_name); + std::unique_lock lock(mutex); - for (auto iterator = clean_database->getTablesIterator(); iterator->isValid(); iterator->next()) - { - String table = backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(iterator->name()); - String comment = String("Clean ") + table + " for dump mysql."; - tryToExecuteQuery("DROP TABLE " + table, global_context, comment); - } + if (exception) + std::rethrow_exception(exception); + + return nested_database; } -void DatabaseMaterializeMySQL::dumpDataForTables(mysqlxx::Pool::Entry & connection, MaterializeMetadata & master_info, const std::function & is_cancelled) +ASTPtr DatabaseMaterializeMySQL::getCreateDatabaseQuery() const { - auto iterator = master_info.need_dumping_tables.begin(); - for (; iterator != master_info.need_dumping_tables.end() && !is_cancelled(); ++iterator) - { - const auto & table_name = iterator->first; - MySQLTableStruct table_struct = visitCreateQuery(iterator->second, global_context, database_name); - String comment = String("Dumping ") + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(table_name); - tryToExecuteQuery(toCreateQuery(table_struct, global_context), global_context, comment); - - BlockOutputStreamPtr out = std::make_shared(master_info.version, getTableOutput(table_name)); - MySQLBlockInputStream input(connection, "SELECT * FROM " + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(table_name), out->getHeader(), DEFAULT_BLOCK_SIZE); - copyData(input, *out, is_cancelled); - } + const auto & create_query = std::make_shared(); + create_query->database = database_name; + create_query->set(create_query->storage, engine_define); + return create_query; } - -std::optional DatabaseMaterializeMySQL::prepareSynchronized(std::unique_lock & lock, const std::function & is_cancelled) -{ - while (!is_cancelled()) - { - try - { - LOG_DEBUG(log, "Checking " + database_name + " database status."); - while (!is_cancelled && !DatabaseCatalog::instance().isDatabaseExist(database_name)) - sync_cond.wait_for(lock, std::chrono::seconds(1)); - - LOG_DEBUG(log, database_name + " database status is OK."); - - mysqlxx::PoolWithFailover::Entry connection = pool.get(); - MaterializeMetadata metadata(connection, getMetadataPath() + "/.metadata", mysql_database_name); - - if (!metadata.need_dumping_tables.empty()) - { - metadata.transaction(Position(metadata.binlog_position, metadata.binlog_file), [&]() - { - cleanOutdatedTables(); - dumpDataForTables(connection, metadata, is_cancelled); - }); - } - - client.connect(); - client.startBinlogDump(std::rand(), mysql_database_name, metadata.binlog_file, metadata.binlog_position); - return metadata; - } - catch (mysqlxx::Exception & ) - { - tryLogCurrentException(log); - - /// Avoid busy loop when MySQL is not available. - sleepForMilliseconds(settings->max_wait_time_when_mysql_unavailable); - } - } - - return {}; -} - -void DatabaseMaterializeMySQL::scheduleSynchronized() -{ - background_thread_pool.scheduleOrThrowOnError([&]() - { - ThreadStatus thread_status; - setThreadName("MySQLDBSync"); - - std::unique_lock lock(sync_mutex); - const auto quit_requested = [this] { return sync_quit.load(std::memory_order_relaxed); }; - - try - { - std::optional metadata = prepareSynchronized(lock, quit_requested); - - if (!quit_requested() && metadata) - { - EventConsumer consumer(getDatabaseName(), global_context, *metadata, *settings); - - while (!quit_requested()) - { - const auto & event = client.readOneBinlogEvent(); - consumer.onEvent(event, client.getPosition()); - } - } - } - catch(...) - { - setException(std::current_exception()); - } - }); -} -DatabaseMaterializeMySQL::~DatabaseMaterializeMySQL() +void DatabaseMaterializeMySQL::loadStoredObjects(Context & context, bool has_force_restore_data_flag) { try { - if (!sync_quit) - { - { - sync_quit = true; - std::lock_guard lock(sync_mutex); - } - - sync_cond.notify_one(); - background_thread_pool.wait(); - } + LOG_DEBUG(log, "Loading MySQL nested database stored objects."); + getNestedDatabase()->loadStoredObjects(context, has_force_restore_data_flag); + LOG_DEBUG(log, "Loaded MySQL nested database stored objects."); } catch (...) { - tryLogCurrentException(__PRETTY_FUNCTION__); + tryLogCurrentException(log, "Cannot load MySQL nested database stored objects."); + throw; } } +void DatabaseMaterializeMySQL::shutdown() +{ + getNestedDatabase()->shutdown(); } -#endif +bool DatabaseMaterializeMySQL::empty() const +{ + return getNestedDatabase()->empty(); +} + +String DatabaseMaterializeMySQL::getDataPath() const +{ + return getNestedDatabase()->getDataPath(); +} + +String DatabaseMaterializeMySQL::getMetadataPath() const +{ + return getNestedDatabase()->getMetadataPath(); +} + +String DatabaseMaterializeMySQL::getTableDataPath(const String & table_name) const +{ + return getNestedDatabase()->getTableDataPath(table_name); +} + +String DatabaseMaterializeMySQL::getTableDataPath(const ASTCreateQuery & query) const +{ + return getNestedDatabase()->getTableDataPath(query); +} + +String DatabaseMaterializeMySQL::getObjectMetadataPath(const String & table_name) const +{ + return getNestedDatabase()->getObjectMetadataPath(table_name); +} + +UUID DatabaseMaterializeMySQL::tryGetTableUUID(const String & table_name) const +{ + return getNestedDatabase()->tryGetTableUUID(table_name); +} + +time_t DatabaseMaterializeMySQL::getObjectMetadataModificationTime(const String & name) const +{ + return getNestedDatabase()->getObjectMetadataModificationTime(name); +} + +void DatabaseMaterializeMySQL::createTable(const Context & context, const String & name, const StoragePtr & table, const ASTPtr & query) +{ + if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME) + throw Exception("MySQL database in locality_data mode does not support create table.", ErrorCodes::NOT_IMPLEMENTED); + + getNestedDatabase()->createTable(context, name, table, query); +} + +void DatabaseMaterializeMySQL::dropTable(const Context & context, const String & name, bool no_delay) +{ + if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME) + throw Exception("MySQL database in locality_data mode does not support drop table.", ErrorCodes::NOT_IMPLEMENTED); + + getNestedDatabase()->dropTable(context, name, no_delay); +} + +void DatabaseMaterializeMySQL::attachTable(const String & name, const StoragePtr & table, const String & relative_table_path) +{ + if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME) + throw Exception("MySQL database in locality_data mode does not support attach table.", ErrorCodes::NOT_IMPLEMENTED); + + getNestedDatabase()->attachTable(name, table, relative_table_path); +} + +StoragePtr DatabaseMaterializeMySQL::detachTable(const String & name) +{ + if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME) + throw Exception("MySQL database in locality_data mode does not support detach table.", ErrorCodes::NOT_IMPLEMENTED); + + return getNestedDatabase()->detachTable(name); +} + +void DatabaseMaterializeMySQL::renameTable(const Context & context, const String & name, IDatabase & to_database, const String & to_name, bool exchange) +{ + if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME) + throw Exception("MySQL database in locality_data mode does not support rename table.", ErrorCodes::NOT_IMPLEMENTED); + + getNestedDatabase()->renameTable(context, name, to_database, to_name, exchange); +} + +void DatabaseMaterializeMySQL::alterTable(const Context & context, const StorageID & table_id, const StorageInMemoryMetadata & metadata) +{ + if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME) + throw Exception("MySQL database in locality_data mode does not support alter table.", ErrorCodes::NOT_IMPLEMENTED); + + getNestedDatabase()->alterTable(context, table_id, metadata); +} + +bool DatabaseMaterializeMySQL::shouldBeEmptyOnDetach() const +{ + return getNestedDatabase()->shouldBeEmptyOnDetach(); +} + +void DatabaseMaterializeMySQL::drop(const Context & context) +{ + getNestedDatabase()->drop(context); +} + +bool DatabaseMaterializeMySQL::isTableExist(const String & name, const Context & context) const +{ + return getNestedDatabase()->isTableExist(name, context); +} + +StoragePtr DatabaseMaterializeMySQL::tryGetTable(const String & name, const Context & context) const +{ + if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME) + return std::make_shared(getNestedDatabase()->tryGetTable(name, context)); + + return getNestedDatabase()->tryGetTable(name, context); +} + +DatabaseTablesIteratorPtr DatabaseMaterializeMySQL::getTablesIterator(const Context & context, const FilterByNameFunction & filter_by_table_name) +{ + if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME) + { + DatabaseTablesIteratorPtr iterator = getNestedDatabase()->getTablesIterator(context, filter_by_table_name); + return std::make_unique(std::move(iterator)); + } + + return getNestedDatabase()->getTablesIterator(context, filter_by_table_name); +} + +} diff --git a/src/Databases/MySQL/DatabaseMaterializeMySQL.h b/src/Databases/MySQL/DatabaseMaterializeMySQL.h index 74526bb4635..e698bf67689 100644 --- a/src/Databases/MySQL/DatabaseMaterializeMySQL.h +++ b/src/Databases/MySQL/DatabaseMaterializeMySQL.h @@ -1,64 +1,80 @@ #pragma once -#include "config_core.h" - -#if USE_MYSQL - -# include -# include -# include -# include -# include -# include -# include -# include -# include -# include -# include -# include -# include -# include +#include +#include +#include +#include +#include namespace DB { -class DatabaseMaterializeMySQL : public DatabaseMaterializeMySQLWrap +class DatabaseMaterializeMySQL : public IDatabase { public: - ~DatabaseMaterializeMySQL() override; - DatabaseMaterializeMySQL( const Context & context, const String & database_name_, const String & metadata_path_, - const ASTStorage * database_engine_define_, const String & mysql_database_name_, mysqlxx::Pool && pool_, - MySQLClient && client_, std::unique_ptr settings_); + const IAST * database_engine_define_, const String & mysql_database_name_, mysqlxx::Pool && pool_, + MySQLClient && client_, std::unique_ptr settings_); - String getEngineName() const override { return "MySQL"; } + void setException(const std::exception_ptr & exception); +protected: + ASTPtr engine_define; + DatabasePtr nested_database; + std::unique_ptr settings; -private: - const Context & global_context; - String metadata_path; - String mysql_database_name; + Poco::Logger * log; + MaterializeMySQLSyncThread materialize_thread; - mutable mysqlxx::Pool pool; - mutable MySQLClient client; - std::unique_ptr settings; + mutable std::mutex mutex; + std::exception_ptr exception; - void cleanOutdatedTables(); + DatabasePtr getNestedDatabase() const; - void scheduleSynchronized(); +public: + ASTPtr getCreateDatabaseQuery() const override; - BlockOutputStreamPtr getTableOutput(const String & table_name); + void loadStoredObjects(Context & context, bool has_force_restore_data_flag) override; - std::optional prepareSynchronized(std::unique_lock & lock, const std::function & is_cancelled); + void shutdown() override; - void dumpDataForTables(mysqlxx::Pool::Entry & connection, MaterializeMetadata & master_info, const std::function & is_cancelled); + bool empty() const override; - std::mutex sync_mutex; - std::atomic sync_quit{false}; - std::condition_variable sync_cond; - ThreadPool background_thread_pool{1}; + String getDataPath() const override; + + String getTableDataPath(const String & table_name) const override; + + String getTableDataPath(const ASTCreateQuery & query) const override; + + UUID tryGetTableUUID(const String & table_name) const override; + + void createTable(const Context & context, const String & name, const StoragePtr & table, const ASTPtr & query) override; + + void dropTable(const Context & context, const String & name, bool no_delay) override; + + void attachTable(const String & name, const StoragePtr & table, const String & relative_table_path) override; + + StoragePtr detachTable(const String & name) override; + + void renameTable(const Context & context, const String & name, IDatabase & to_database, const String & to_name, bool exchange) override; + + void alterTable(const Context & context, const StorageID & table_id, const StorageInMemoryMetadata & metadata) override; + + time_t getObjectMetadataModificationTime(const String & name) const override; + + String getMetadataPath() const override; + + String getObjectMetadataPath(const String & table_name) const override; + + bool shouldBeEmptyOnDetach() const override; + + void drop(const Context & context) override; + + bool isTableExist(const String & name, const Context & context) const override; + + StoragePtr tryGetTable(const String & name, const Context & context) const override; + + DatabaseTablesIteratorPtr getTablesIterator(const Context & context, const FilterByNameFunction & filter_by_table_name) override; }; } - -#endif diff --git a/src/Databases/MySQL/DatabaseMaterializeMySQLWrap.cpp b/src/Databases/MySQL/DatabaseMaterializeMySQLWrap.cpp deleted file mode 100644 index 6b628ddf26d..00000000000 --- a/src/Databases/MySQL/DatabaseMaterializeMySQLWrap.cpp +++ /dev/null @@ -1,188 +0,0 @@ -#include - -#include -#include -#include -#include - -namespace DB -{ - -static constexpr auto MYSQL_BACKGROUND_THREAD_NAME = "MySQLDBSync"; - -namespace ErrorCodes -{ - extern const int NOT_IMPLEMENTED; -} - -DatabaseMaterializeMySQLWrap::DatabaseMaterializeMySQLWrap(const DatabasePtr & nested_database_, const ASTPtr & database_engine_define_, const String & log_name) - : IDatabase(nested_database_->getDatabaseName()), nested_database(nested_database_), database_engine_define(database_engine_define_), log(&Logger::get(log_name)) -{ -} - -void DatabaseMaterializeMySQLWrap::setException(const std::exception_ptr & exception_) -{ - std::unique_lock lock(mutex); - exception = exception_; -} - -DatabasePtr DatabaseMaterializeMySQLWrap::getNestedDatabase() const -{ - std::unique_lock lock(mutex); - - if (exception) - std::rethrow_exception(exception); - - return nested_database; -} - -ASTPtr DatabaseMaterializeMySQLWrap::getCreateDatabaseQuery() const -{ - const auto & create_query = std::make_shared(); - create_query->database = database_name; - create_query->set(create_query->storage, database_engine_define); - return create_query; -} -void DatabaseMaterializeMySQLWrap::loadStoredObjects(Context & context, bool has_force_restore_data_flag) -{ - try - { - LOG_DEBUG(log, "Loading MySQL nested database stored objects."); - getNestedDatabase()->loadStoredObjects(context, has_force_restore_data_flag); - LOG_DEBUG(log, "Loaded MySQL nested database stored objects."); - } - catch (...) - { - tryLogCurrentException(log, "Cannot load MySQL nested database stored objects."); - throw; - } -} - -void DatabaseMaterializeMySQLWrap::shutdown() -{ - getNestedDatabase()->shutdown(); -} - -bool DatabaseMaterializeMySQLWrap::empty() const -{ - return getNestedDatabase()->empty(); -} - -String DatabaseMaterializeMySQLWrap::getDataPath() const -{ - return getNestedDatabase()->getDataPath(); -} - -String DatabaseMaterializeMySQLWrap::getMetadataPath() const -{ - return getNestedDatabase()->getMetadataPath(); -} - -String DatabaseMaterializeMySQLWrap::getTableDataPath(const String & table_name) const -{ - return getNestedDatabase()->getTableDataPath(table_name); -} - -String DatabaseMaterializeMySQLWrap::getTableDataPath(const ASTCreateQuery & query) const -{ - return getNestedDatabase()->getTableDataPath(query); -} - -String DatabaseMaterializeMySQLWrap::getObjectMetadataPath(const String & table_name) const -{ - return getNestedDatabase()->getObjectMetadataPath(table_name); -} - -UUID DatabaseMaterializeMySQLWrap::tryGetTableUUID(const String & table_name) const -{ - return getNestedDatabase()->tryGetTableUUID(table_name); -} - -time_t DatabaseMaterializeMySQLWrap::getObjectMetadataModificationTime(const String & name) const -{ - return getNestedDatabase()->getObjectMetadataModificationTime(name); -} - -void DatabaseMaterializeMySQLWrap::createTable(const Context & context, const String & name, const StoragePtr & table, const ASTPtr & query) -{ - if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME) - throw Exception("MySQL database in locality_data mode does not support create table.", ErrorCodes::NOT_IMPLEMENTED); - - getNestedDatabase()->createTable(context, name, table, query); -} - -void DatabaseMaterializeMySQLWrap::dropTable(const Context & context, const String & name, bool no_delay) -{ - if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME) - throw Exception("MySQL database in locality_data mode does not support drop table.", ErrorCodes::NOT_IMPLEMENTED); - - getNestedDatabase()->dropTable(context, name, no_delay); -} - -void DatabaseMaterializeMySQLWrap::attachTable(const String & name, const StoragePtr & table, const String & relative_table_path) -{ - if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME) - throw Exception("MySQL database in locality_data mode does not support attach table.", ErrorCodes::NOT_IMPLEMENTED); - - getNestedDatabase()->attachTable(name, table, relative_table_path); -} - -StoragePtr DatabaseMaterializeMySQLWrap::detachTable(const String & name) -{ - if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME) - throw Exception("MySQL database in locality_data mode does not support detach table.", ErrorCodes::NOT_IMPLEMENTED); - - return getNestedDatabase()->detachTable(name); -} - -void DatabaseMaterializeMySQLWrap::renameTable(const Context & context, const String & name, IDatabase & to_database, const String & to_name, bool exchange) -{ - if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME) - throw Exception("MySQL database in locality_data mode does not support rename table.", ErrorCodes::NOT_IMPLEMENTED); - - getNestedDatabase()->renameTable(context, name, to_database, to_name, exchange); -} - -void DatabaseMaterializeMySQLWrap::alterTable(const Context & context, const StorageID & table_id, const StorageInMemoryMetadata & metadata) -{ - if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME) - throw Exception("MySQL database in locality_data mode does not support alter table.", ErrorCodes::NOT_IMPLEMENTED); - - getNestedDatabase()->alterTable(context, table_id, metadata); -} - -bool DatabaseMaterializeMySQLWrap::shouldBeEmptyOnDetach() const -{ - return getNestedDatabase()->shouldBeEmptyOnDetach(); -} - -void DatabaseMaterializeMySQLWrap::drop(const Context & context) -{ - getNestedDatabase()->drop(context); -} - -bool DatabaseMaterializeMySQLWrap::isTableExist(const String & name) const -{ - return getNestedDatabase()->isTableExist(name); -} - -StoragePtr DatabaseMaterializeMySQLWrap::tryGetTable(const String & name) const -{ - if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME) - return std::make_shared(getNestedDatabase()->tryGetTable(name)); - - return getNestedDatabase()->tryGetTable(name); -} - -DatabaseTablesIteratorPtr DatabaseMaterializeMySQLWrap::getTablesIterator(const FilterByNameFunction & filter_by_table_name) -{ - if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME) - { - DatabaseTablesIteratorPtr iterator = getNestedDatabase()->getTablesIterator(filter_by_table_name); - return std::make_unique(std::move(iterator)); - } - - return getNestedDatabase()->getTablesIterator(filter_by_table_name); -} - -} diff --git a/src/Databases/MySQL/DatabaseMaterializeMySQLWrap.h b/src/Databases/MySQL/DatabaseMaterializeMySQLWrap.h deleted file mode 100644 index 0f16661c66f..00000000000 --- a/src/Databases/MySQL/DatabaseMaterializeMySQLWrap.h +++ /dev/null @@ -1,70 +0,0 @@ -#pragma once - -#include - -namespace DB -{ - -class DatabaseMaterializeMySQLWrap : public IDatabase -{ -public: - ASTPtr getCreateDatabaseQuery() const override; - - void loadStoredObjects(Context & context, bool has_force_restore_data_flag) override; - - DatabaseMaterializeMySQLWrap(const DatabasePtr & nested_database_, const ASTPtr & database_engine_define_, const String & log_name); -protected: - DatabasePtr nested_database; - ASTPtr database_engine_define; - Poco::Logger * log; - - mutable std::mutex mutex; - std::exception_ptr exception; - - DatabasePtr getNestedDatabase() const; - - void setException(const std::exception_ptr & exception); - -public: - void shutdown() override; - - bool empty() const override; - - String getDataPath() const override; - - String getTableDataPath(const String & table_name) const override; - - String getTableDataPath(const ASTCreateQuery & query) const override; - - UUID tryGetTableUUID(const String & table_name) const override; - - void createTable(const Context & context, const String & name, const StoragePtr & table, const ASTPtr & query) override; - - void dropTable(const Context & context, const String & name, bool no_delay) override; - - void attachTable(const String & name, const StoragePtr & table, const String & relative_table_path) override; - - StoragePtr detachTable(const String & name) override; - - void renameTable(const Context & context, const String & name, IDatabase & to_database, const String & to_name, bool exchange) override; - - void alterTable(const Context & context, const StorageID & table_id, const StorageInMemoryMetadata & metadata) override; - - time_t getObjectMetadataModificationTime(const String & name) const override; - - String getMetadataPath() const override; - - String getObjectMetadataPath(const String & table_name) const override; - - bool shouldBeEmptyOnDetach() const override; - - void drop(const Context & context) override; - - bool isTableExist(const String & name) const override; - - StoragePtr tryGetTable(const String & name) const override; - - DatabaseTablesIteratorPtr getTablesIterator(const FilterByNameFunction & filter_by_table_name) override; -}; - -} diff --git a/src/Databases/MySQL/EventConsumer.cpp b/src/Databases/MySQL/EventConsumer.cpp deleted file mode 100644 index 2054f5810ef..00000000000 --- a/src/Databases/MySQL/EventConsumer.cpp +++ /dev/null @@ -1,261 +0,0 @@ -#include - -#include -#include -#include -#include -#include -#include -#include -#include - -namespace DB -{ - -using namespace MySQLReplication; - -EventConsumer::~EventConsumer() -{ - if (!quit && !background_exception) - { - { - quit = true; - std::lock_guard lock(mutex); - } - - cond.notify_one(); - background_thread_pool.wait(); - } -} - -EventConsumer::EventConsumer( - const String & database_, const Context & context_, MaterializeMetadata & metadata_, MaterializeModeSettings & settings_) - : metadata(metadata_), context(context_), settings(settings_), database(database_), prev_version(metadata.version) -{ - background_thread_pool.scheduleOrThrowOnError([&]() - { - ThreadStatus thread_status; - setThreadName("MySQLDBSync"); - std::unique_lock lock(mutex); - const auto quit_requested = [this] { return quit.load(std::memory_order_relaxed); }; - - while (!quit_requested() && !background_exception) - { - if (!buffers.empty() && total_bytes_in_buffers) - flushBuffers(); - - cond.wait_for(lock, std::chrono::milliseconds(settings.max_flush_data_time), quit_requested); - } - }); -} - -void EventConsumer::onWriteData(const String & table_name, const std::vector & rows_data) -{ - BufferPtr buffer = getTableBuffer(table_name); - - size_t prev_bytes = buffer->data.bytes(); - for (size_t column = 0; column < buffer->data.columns() - 2; ++column) - { - MutableColumnPtr col_to = IColumn::mutate(std::move(buffer->data.getByPosition(column).column)); - - for (size_t index = 0; index < rows_data.size(); ++index) - col_to->insert(DB::get(rows_data[index])[column]); - } - - fillSignColumnsAndMayFlush(buffer->data, 1, ++metadata.version, rows_data.size(), prev_bytes); -} - -static inline bool differenceSortingKeys(const Tuple & row_old_data, const Tuple & row_new_data, const std::vector sorting_columns_index) -{ - for (const auto & sorting_column_index : sorting_columns_index) - if (row_old_data[sorting_column_index] != row_new_data[sorting_column_index]) - return true; - - return false; -} - -void EventConsumer::onUpdateData(const String & table_name, const std::vector & rows_data) -{ - if (rows_data.size() % 2 != 0) - throw Exception("LOGICAL ERROR: ", ErrorCodes::LOGICAL_ERROR); - - BufferPtr buffer = getTableBuffer(table_name); - - size_t prev_bytes = buffer->data.bytes(); - std::vector difference_sorting_keys_mark(rows_data.size() / 2); - - for (size_t index = 0; index < rows_data.size(); index += 2) - difference_sorting_keys_mark.emplace_back(differenceSortingKeys( - DB::get(rows_data[index]), DB::get(rows_data[index + 1]), buffer->sorting_columns_index)); - - for (size_t column = 0; column < buffer->data.columns() - 2; ++column) - { - MutableColumnPtr col_to = IColumn::mutate(std::move(buffer->data.getByPosition(column).column)); - - for (size_t index = 0; index < rows_data.size(); index += 2) - { - if (likely(!difference_sorting_keys_mark[index / 2])) - col_to->insert(DB::get(rows_data[index + 1])[column]); - else - { - /// If the sorting keys is modified, we should cancel the old data, but this should not happen frequently - col_to->insert(DB::get(rows_data[index])[column]); - col_to->insert(DB::get(rows_data[index + 1])[column]); - } - } - } - - MutableColumnPtr sign_mutable_column = IColumn::mutate(std::move(buffer->data.getByPosition(buffer->data.columns() - 2).column)); - MutableColumnPtr version_mutable_column = IColumn::mutate(std::move(buffer->data.getByPosition(buffer->data.columns() - 1).column)); - - ColumnInt8::Container & sign_column_data = assert_cast(*sign_mutable_column).getData(); - ColumnUInt64::Container & version_column_data = assert_cast(*version_mutable_column).getData(); - - UInt64 new_version = ++metadata.version; - for (size_t index = 0; index < rows_data.size(); index += 2) - { - if (likely(!difference_sorting_keys_mark[index / 2])) - { - sign_column_data.emplace_back(1); - version_column_data.emplace_back(new_version); - } - else - { - /// If the sorting keys is modified, we should cancel the old data, but this should not happen frequently - sign_column_data.emplace_back(-1); - sign_column_data.emplace_back(1); - version_column_data.emplace_back(new_version); - version_column_data.emplace_back(new_version); - } - } - - total_bytes_in_buffers += (buffer->data.bytes() - prev_bytes); - if (buffer->data.rows() >= settings.max_rows_in_buffer || total_bytes_in_buffers >= settings.max_bytes_in_buffers) - flushBuffers(); -} - -void EventConsumer::onDeleteData(const String & table_name, const std::vector & rows_data) -{ - BufferPtr buffer = getTableBuffer(table_name); - - size_t prev_bytes = buffer->data.bytes(); - for (size_t column = 0; column < buffer->data.columns() - 2; ++column) - { - MutableColumnPtr col_to = IColumn::mutate(std::move(buffer->data.getByPosition(column).column)); - - for (size_t index = 0; index < rows_data.size(); ++index) - col_to->insert(DB::get(rows_data[index])[column]); - } - - fillSignColumnsAndMayFlush(buffer->data, -1, ++metadata.version, rows_data.size(), prev_bytes); -} - -EventConsumer::BufferPtr EventConsumer::getTableBuffer(const String & table_name) -{ - if (buffers.find(table_name) == buffers.end()) - { - StoragePtr storage = DatabaseCatalog::instance().getDatabase(database)->tryGetTable(table_name, context); - - buffers[table_name] = std::make_shared(); - buffers[table_name]->data = storage->getSampleBlockNonMaterialized(); - if (StorageMergeTree * table_merge_tree = dynamic_cast(storage.get())) - { - Names required_for_sorting_key = table_merge_tree->getColumnsRequiredForSortingKey(); - - for (const auto & required_name_for_sorting_key : required_for_sorting_key) - buffers[table_name]->sorting_columns_index.emplace_back( - buffers[table_name]->data.getPositionByName(required_name_for_sorting_key)); - } - } - - return buffers[table_name]; -} - -void EventConsumer::onEvent(const BinlogEventPtr & receive_event, const MySQLReplication::Position & position) -{ - std::unique_lock lock(mutex); - - if (background_exception) - background_thread_pool.wait(); - - last_position = position; - if (receive_event->type() == MYSQL_WRITE_ROWS_EVENT) - { - WriteRowsEvent & write_rows_event = static_cast(*receive_event); - write_rows_event.dump(); - onWriteData(write_rows_event.table, write_rows_event.rows); - } - else if (receive_event->type() == MYSQL_UPDATE_ROWS_EVENT) - { - UpdateRowsEvent & update_rows_event = static_cast(*receive_event); - update_rows_event.dump(); - onUpdateData(update_rows_event.table, update_rows_event.rows); - } - else if (receive_event->type() == MYSQL_DELETE_ROWS_EVENT) - { - DeleteRowsEvent & delete_rows_event = static_cast(*receive_event); - delete_rows_event.dump(); - onDeleteData(delete_rows_event.table, delete_rows_event.rows); - } - else if (receive_event->type() == MYSQL_QUERY_EVENT) - { - /// TODO: 识别, 查看是否支持的DDL, 支持的话立即刷新当前的数据, 然后执行DDL. -// flush_function(); - /// TODO: 直接使用Interpreter执行即可 - } -} - -void EventConsumer::fillSignColumnsAndMayFlush(Block & data, Int8 sign_value, UInt64 version_value, size_t fill_size, size_t prev_bytes) -{ - MutableColumnPtr sign_mutable_column = IColumn::mutate(std::move(data.getByPosition(data.columns() - 2).column)); - MutableColumnPtr version_mutable_column = IColumn::mutate(std::move(data.getByPosition(data.columns() - 1).column)); - - ColumnInt8::Container & sign_column_data = assert_cast(*sign_mutable_column).getData(); - ColumnUInt64::Container & version_column_data = assert_cast(*version_mutable_column).getData(); - - for (size_t index = 0; index < fill_size; ++index) - { - sign_column_data.emplace_back(sign_value); - version_column_data.emplace_back(version_value); - } - - total_bytes_in_buffers += (data.bytes() - prev_bytes); - if (data.rows() >= settings.max_rows_in_buffer || total_bytes_in_buffers >= settings.max_bytes_in_buffers) - flushBuffers(); -} - -void EventConsumer::flushBuffers() -{ - /// TODO: 事务保证 - try - { - for (auto & table_name_and_buffer : buffers) - { - const String & table_name = table_name_and_buffer.first; - BufferPtr & buffer = table_name_and_buffer.second; - - Context query_context = context; - query_context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; - query_context.setCurrentQueryId(""); // generate random query_id - String with_database_table_name = backQuoteIfNeed(database) + "." + backQuoteIfNeed(table_name); - BlockIO res = executeQuery("INSERT INTO " + with_database_table_name + " VALUES", query_context, true); - - OneBlockInputStream input(buffer->data); - copyData(input, *res.out); - } - - buffers.clear(); - total_bytes_in_buffers = 0; - prev_version = metadata.version; - } - catch(...) - { - buffers.clear(); - total_bytes_in_buffers = 0; - metadata.version = prev_version; - background_exception = true; - throw; - } -} - -} diff --git a/src/Databases/MySQL/EventConsumer.h b/src/Databases/MySQL/EventConsumer.h deleted file mode 100644 index f21265efe9d..00000000000 --- a/src/Databases/MySQL/EventConsumer.h +++ /dev/null @@ -1,63 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include -#include "MaterializeMetadata.h" - -namespace DB -{ - -class EventConsumer : private boost::noncopyable -{ -public: - ~EventConsumer(); - - void onEvent(const MySQLReplication::BinlogEventPtr & event, const MySQLReplication::Position & position); - - EventConsumer(const String & database_, const Context & context, MaterializeMetadata & metadata_, MaterializeModeSettings & settings_); -private: - MaterializeMetadata & metadata; - - const Context & context; - const MaterializeModeSettings & settings; - - String database; - size_t prev_version; - size_t total_bytes_in_buffers = 0; - MySQLReplication::Position last_position; - - struct Buffer - { - Block data; - std::vector sorting_columns_index; - }; - - using BufferPtr = std::shared_ptr; - std::unordered_map buffers; - - void flushBuffers(); - - BufferPtr getTableBuffer(const String & table_name); - - void onWriteData(const std::string & table_name, const std::vector & rows_data); - - void onUpdateData(const std::string & table_name, const std::vector & rows_data); - - void onDeleteData(const std::string & table_name, const std::vector & rows_data); - - void fillSignColumnsAndMayFlush(Block & data, Int8 sign_value, UInt64 version_value, size_t fill_size, size_t prev_bytes); - - mutable std::mutex mutex; - std::condition_variable cond; - std::atomic_bool quit = false; - std::atomic_bool background_exception = false; - ThreadPool background_thread_pool{1}; -}; - -} diff --git a/src/Databases/MySQL/MaterializeModeSettings.cpp b/src/Databases/MySQL/MaterializeMySQLSettings.cpp similarity index 79% rename from src/Databases/MySQL/MaterializeModeSettings.cpp rename to src/Databases/MySQL/MaterializeMySQLSettings.cpp index d8a25fa3bb9..fba4eaae2fa 100644 --- a/src/Databases/MySQL/MaterializeModeSettings.cpp +++ b/src/Databases/MySQL/MaterializeMySQLSettings.cpp @@ -1,4 +1,4 @@ -#include +#include #include #include @@ -13,9 +13,9 @@ namespace ErrorCodes extern const int UNKNOWN_SETTING; } -IMPLEMENT_SETTINGS_COLLECTION(MaterializeModeSettings, LIST_OF_MATERIALIZE_MODE_SETTINGS) +IMPLEMENT_SETTINGS_COLLECTION(MaterializeMySQLSettings, LIST_OF_MATERIALIZE_MODE_SETTINGS) -void MaterializeModeSettings::loadFromQuery(ASTStorage & storage_def) +void MaterializeMySQLSettings::loadFromQuery(ASTStorage & storage_def) { if (storage_def.settings) { diff --git a/src/Databases/MySQL/MaterializeModeSettings.h b/src/Databases/MySQL/MaterializeMySQLSettings.h similarity index 76% rename from src/Databases/MySQL/MaterializeModeSettings.h rename to src/Databases/MySQL/MaterializeMySQLSettings.h index 35c65253c87..77e8514d473 100644 --- a/src/Databases/MySQL/MaterializeModeSettings.h +++ b/src/Databases/MySQL/MaterializeMySQLSettings.h @@ -11,11 +11,13 @@ class ASTStorage; /** Settings for the MySQL Database engine(materialize mode). * Could be loaded from a CREATE DATABASE query (SETTINGS clause). */ -struct MaterializeModeSettings : public SettingsCollection +struct MaterializeMySQLSettings : public SettingsCollection { #define LIST_OF_MATERIALIZE_MODE_SETTINGS(M) \ M(SettingBool, locality_data, false, "", 0) \ M(SettingUInt64, max_rows_in_buffer, DEFAULT_BLOCK_SIZE, "", 0) \ + M(SettingUInt64, max_bytes_in_buffer, DBMS_DEFAULT_BUFFER_SIZE, "", 0) \ + M(SettingUInt64, max_rows_in_buffers, DEFAULT_BLOCK_SIZE, "", 0) \ M(SettingUInt64, max_bytes_in_buffers, DBMS_DEFAULT_BUFFER_SIZE, "", 0) \ M(SettingUInt64, max_flush_data_time, 1000, "", 0) \ M(SettingUInt64, max_wait_time_when_mysql_unavailable, 1000, "", 0) \ diff --git a/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp b/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp new file mode 100644 index 00000000000..83f7b45ed28 --- /dev/null +++ b/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp @@ -0,0 +1,444 @@ +#if !defined(ARCADIA_BUILD) +# include "config_core.h" +#endif + +#if USE_MYSQL + +#include + +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int INCORRECT_QUERY; +} + +namespace CurrentMetrics +{ + extern const Metric MemoryTracking; + extern const Metric BackgroundMySQLSyncSchedulePoolTask; +} + +static BlockIO tryToExecuteQuery(const String & query_to_execute, const Context & context_, const String & comment) +{ + try + { + Context context = context_; + context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; + context.setCurrentQueryId(""); // generate random query_id + return executeQuery("/*" + comment + "*/ " + query_to_execute, context, true); + } + catch (...) + { + tryLogCurrentException("MaterializeMySQLSyncThread", "Query " + query_to_execute + " wasn't finished successfully"); + throw; + } + +// LOG_DEBUG(&Logger::get("MaterializeMySQLSyncThread"), "Executed query: " + query_to_execute); +} + +static inline DatabaseMaterializeMySQL & getDatabase(const String & database_name) +{ + DatabasePtr database = DatabaseCatalog::instance().getDatabase(database_name); + + if (DatabaseMaterializeMySQL * database_materialize = typeid_cast(database.get())) + return *database_materialize; + + throw Exception("", ErrorCodes::LOGICAL_ERROR); +} + +MaterializeMySQLSyncThread::~MaterializeMySQLSyncThread() +{ + try + { + if (!sync_quit) + { + { + sync_quit = true; + std::lock_guard lock(sync_mutex); + } + + sync_cond.notify_one(); +// sync_task_handler->deactivate(); +// flush_task_handler->deactivate(); + } + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } +} + +MaterializeMySQLSyncThread::MaterializeMySQLSyncThread( + const Context & context, const String & database_name_, const String & mysql_database_name_, + mysqlxx::Pool && pool_, MySQLClient && client_, MaterializeMySQLSettings * settings_) + : global_context(context), database_name(database_name_), mysql_database_name(mysql_database_name_) + , pool(std::move(pool_)), client(std::move(client_)), settings(settings_) +{ + /// TODO: 做简单的check, 失败即报错 + startSynchronization(); +} + +/*MaterializeMySQLSyncThread::MaterializeMySQLSyncThread( + const Context & context, const String & database_name_, const String & metadata_path_ + , const ASTStorage * database_engine_define_, const String & mysql_database_name_, mysqlxx::Pool && pool_ + , MySQLClient && client_ , std::unique_ptr settings_) + : DatabaseMaterializeMySQL(std::make_shared(database_name_, metadata_path_, context), database_engine_define_->clone(), "MaterializeMySQLSyncThread") + , global_context(context.getGlobalContext()), metadata_path(metadata_path_), mysql_database_name(mysql_database_name_) + , pool(std::move(pool_)), client(std::move(client_)), settings(std::move(settings_)) +{ + +}*/ + +void MaterializeMySQLSyncThread::synchronization() +{ + try + { + if (std::optional metadata = prepareSynchronized()) + { + Stopwatch watch; + Buffers buffers(database_name); + + while (!isCancelled()) + { + /// TODO: add gc task for `sign = -1`(use alter table delete, execute by interval. need final state) + UInt64 max_flush_time = settings->max_flush_data_time; + BinlogEventPtr binlog_event = client.readOneBinlogEvent(std::max(UInt64(1), max_flush_time - watch.elapsedMilliseconds())); + + { + std::unique_lock lock(sync_mutex); + + if (binlog_event) + onEvent(buffers, binlog_event, *metadata); + + if (watch.elapsedMilliseconds() > max_flush_time || buffers.checkThresholds( + settings->max_rows_in_buffer, settings->max_bytes_in_buffer, + settings->max_rows_in_buffers, settings->max_bytes_in_buffers) + ) + { + watch.restart(); + flushBuffersData(buffers, *metadata); + } + } + } + } + } + catch (...) + { + /// TODO: set + getDatabase(database_name).setException(std::current_exception()); + } +} + +void MaterializeMySQLSyncThread::startSynchronization() +{ + if (!background_thread_pool->joinable()) + throw Exception("", ErrorCodes::LOGICAL_ERROR); + + background_thread_pool = std::make_unique([this]() { synchronization(); }); +} + +static inline void cleanOutdatedTables(const String & database_name, const Context & context) +{ + auto ddl_guard = DatabaseCatalog::instance().getDDLGuard(database_name, ""); + const DatabasePtr & clean_database = DatabaseCatalog::instance().getDatabase(database_name); + + for (auto iterator = clean_database->getTablesIterator(context); iterator->isValid(); iterator->next()) + { + String table = backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(iterator->name()); + String comment = String("Clean ") + table + " for dump mysql."; + tryToExecuteQuery("DROP TABLE " + table, context, comment); + } +} + +static inline BlockOutputStreamPtr getTableOutput(const String & database_name, const String & table_name, const Context & context) +{ + String with_database_table_name = backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(table_name); + BlockIO res = tryToExecuteQuery("INSERT INTO " + with_database_table_name + " VALUES", context, ""); + + if (!res.out) + throw Exception("LOGICAL ERROR:", ErrorCodes::LOGICAL_ERROR); + + return res.out; +} + +static inline void dumpDataForTables( + mysqlxx::Pool::Entry & connection, MaterializeMetadata & master_info, + const String & database_name, const String & mysql_database_name, + const Context & context, const std::function & is_cancelled) +{ + auto iterator = master_info.need_dumping_tables.begin(); + for (; iterator != master_info.need_dumping_tables.end() && !is_cancelled(); ++iterator) + { + const auto & table_name = iterator->first; + MySQLTableStruct table_struct = visitCreateQuery(iterator->second, context, database_name); + String comment = String("Dumping ") + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(table_name); + tryToExecuteQuery(toCreateQuery(table_struct, context), context, comment); + + BlockOutputStreamPtr out = std::make_shared(master_info.version, getTableOutput(database_name, table_name, context)); + MySQLBlockInputStream input( + connection, "SELECT * FROM " + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(table_name), + out->getHeader(), DEFAULT_BLOCK_SIZE); + copyData(input, *out, is_cancelled); + } +} + +std::optional MaterializeMySQLSyncThread::prepareSynchronized() +{ + std::unique_lock lock(sync_mutex); + + while (!isCancelled()) + { + try + { + LOG_DEBUG(log, "Checking database status."); + while (!isCancelled() && !DatabaseCatalog::instance().isDatabaseExist(database_name)) + sync_cond.wait_for(lock, std::chrono::seconds(1)); + LOG_DEBUG(log, "Database status is OK."); + + mysqlxx::PoolWithFailover::Entry connection = pool.get(); + MaterializeMetadata metadata(connection, getDatabase(database_name).getMetadataPath() + "/.metadata", mysql_database_name); + + if (!metadata.need_dumping_tables.empty()) + { + metadata.transaction(Position(metadata.binlog_position, metadata.binlog_file), [&]() + { + cleanOutdatedTables(database_name, global_context); + dumpDataForTables(connection, metadata, database_name, mysql_database_name, global_context, [this] { return isCancelled(); }); + }); + } + + client.connect(); + client.startBinlogDump(std::rand(), mysql_database_name, metadata.binlog_file, metadata.binlog_position); + return metadata; + } + catch (mysqlxx::Exception & ) + { + tryLogCurrentException(log); + + /// Avoid busy loop when MySQL is not available. + sleepForMilliseconds(settings->max_wait_time_when_mysql_unavailable); + } + } + + return {}; +} + +void MaterializeMySQLSyncThread::flushBuffersData(Buffers & buffers, MaterializeMetadata & metadata) +{ + metadata.transaction(client.getPosition(), [&]() { buffers.commit(metadata, global_context); }); +} + +static inline void fillSignAndVersionColumnsData(Block & data, Int8 sign_value, UInt64 version_value, size_t fill_size) +{ + MutableColumnPtr sign_mutable_column = IColumn::mutate(std::move(data.getByPosition(data.columns() - 2).column)); + MutableColumnPtr version_mutable_column = IColumn::mutate(std::move(data.getByPosition(data.columns() - 1).column)); + + ColumnInt8::Container & sign_column_data = assert_cast(*sign_mutable_column).getData(); + ColumnUInt64::Container & version_column_data = assert_cast(*version_mutable_column).getData(); + + for (size_t index = 0; index < fill_size; ++index) + { + sign_column_data.emplace_back(sign_value); + version_column_data.emplace_back(version_value); + } +} + +template +static size_t onWriteOrDeleteData(const std::vector & rows_data, Block & buffer, size_t version) +{ + size_t prev_bytes = buffer.bytes(); + for (size_t column = 0; column < buffer.columns() - 2; ++column) + { + MutableColumnPtr col_to = IColumn::mutate(std::move(buffer.getByPosition(column).column)); + + for (size_t index = 0; index < rows_data.size(); ++index) + col_to->insert(DB::get(rows_data[index])[column]); + } + + fillSignAndVersionColumnsData(buffer, sign, version, rows_data.size()); + return buffer.bytes() - prev_bytes; +} + +static inline bool differenceSortingKeys(const Tuple & row_old_data, const Tuple & row_new_data, const std::vector sorting_columns_index) +{ + for (const auto & sorting_column_index : sorting_columns_index) + if (row_old_data[sorting_column_index] != row_new_data[sorting_column_index]) + return true; + + return false; +} + +static inline size_t onUpdateData(const std::vector & rows_data, Block & buffer, size_t version, const std::vector & sorting_columns_index) +{ + if (rows_data.size() % 2 != 0) + throw Exception("LOGICAL ERROR: ", ErrorCodes::LOGICAL_ERROR); + + size_t prev_bytes = buffer.bytes(); + std::vector difference_sorting_keys_mark(rows_data.size() / 2); + + for (size_t index = 0; index < rows_data.size(); index += 2) + difference_sorting_keys_mark.emplace_back(differenceSortingKeys( + DB::get(rows_data[index]), DB::get(rows_data[index + 1]), sorting_columns_index)); + + for (size_t column = 0; column < buffer.columns() - 2; ++column) + { + MutableColumnPtr col_to = IColumn::mutate(std::move(buffer.getByPosition(column).column)); + + for (size_t index = 0; index < rows_data.size(); index += 2) + { + if (likely(!difference_sorting_keys_mark[index / 2])) + col_to->insert(DB::get(rows_data[index + 1])[column]); + else + { + /// If the sorting keys is modified, we should cancel the old data, but this should not happen frequently + col_to->insert(DB::get(rows_data[index])[column]); + col_to->insert(DB::get(rows_data[index + 1])[column]); + } + } + } + + MutableColumnPtr sign_mutable_column = IColumn::mutate(std::move(buffer.getByPosition(buffer.columns() - 2).column)); + MutableColumnPtr version_mutable_column = IColumn::mutate(std::move(buffer.getByPosition(buffer.columns() - 1).column)); + + ColumnInt8::Container & sign_column_data = assert_cast(*sign_mutable_column).getData(); + ColumnUInt64::Container & version_column_data = assert_cast(*version_mutable_column).getData(); + + for (size_t index = 0; index < rows_data.size(); index += 2) + { + if (likely(!difference_sorting_keys_mark[index / 2])) + { + sign_column_data.emplace_back(1); + version_column_data.emplace_back(version); + } + else + { + /// If the sorting keys is modified, we should cancel the old data, but this should not happen frequently + sign_column_data.emplace_back(-1); + sign_column_data.emplace_back(1); + version_column_data.emplace_back(version); + version_column_data.emplace_back(version); + } + } + + return buffer.bytes() - prev_bytes; +} + +void MaterializeMySQLSyncThread::onEvent(Buffers & buffers, const BinlogEventPtr & receive_event, MaterializeMetadata & metadata) +{ + if (receive_event->type() == MYSQL_WRITE_ROWS_EVENT) + { + WriteRowsEvent & write_rows_event = static_cast(*receive_event); + Buffers::BufferAndSortingColumnsPtr buffer = buffers.getTableDataBuffer(write_rows_event.table, global_context); + size_t bytes = onWriteOrDeleteData<1>(write_rows_event.rows, buffer->first, ++metadata.version); + buffers.add(buffer->first.rows(), buffer->first.bytes(), write_rows_event.rows.size(), bytes); + } + else if (receive_event->type() == MYSQL_UPDATE_ROWS_EVENT) + { + UpdateRowsEvent & update_rows_event = static_cast(*receive_event); + Buffers::BufferAndSortingColumnsPtr buffer = buffers.getTableDataBuffer(update_rows_event.table, global_context); + size_t bytes = onUpdateData(update_rows_event.rows, buffer->first, ++metadata.version, buffer->second); + buffers.add(buffer->first.rows(), buffer->first.bytes(), update_rows_event.rows.size(), bytes); + } + else if (receive_event->type() == MYSQL_DELETE_ROWS_EVENT) + { + DeleteRowsEvent & delete_rows_event = static_cast(*receive_event); + Buffers::BufferAndSortingColumnsPtr buffer = buffers.getTableDataBuffer(delete_rows_event.table, global_context); + size_t bytes = onWriteOrDeleteData<-1>(delete_rows_event.rows, buffer->first, ++metadata.version); + buffers.add(buffer->first.rows(), buffer->first.bytes(), delete_rows_event.rows.size(), bytes); + } + else if (receive_event->type() == MYSQL_QUERY_EVENT) + { + flushBuffersData(buffers, metadata); + /// TODO: 执行DDL. + /// TODO: 直接使用Interpreter执行即可 + } +} + +void MaterializeMySQLSyncThread::Buffers::add(size_t block_rows, size_t block_bytes, size_t written_rows, size_t written_bytes) +{ + total_blocks_rows += written_rows; + total_blocks_bytes += written_bytes; + max_block_rows = std::max(block_rows, max_block_rows); + max_block_bytes = std::max(block_bytes, max_block_bytes); +} + +bool MaterializeMySQLSyncThread::Buffers::checkThresholds(size_t check_block_rows, size_t check_block_bytes, size_t check_total_rows, size_t check_total_bytes) +{ + return max_block_rows >= check_block_bytes || max_block_bytes >= check_block_bytes || total_blocks_rows >= check_total_rows + || total_blocks_bytes >= check_total_bytes; +} + +void MaterializeMySQLSyncThread::Buffers::commit(MaterializeMetadata & metatdata, const Context & context) +{ + try + { + for (auto & table_name_and_buffer : data) + { + OneBlockInputStream input(table_name_and_buffer.second->first); + BlockOutputStreamPtr out = getTableOutput(database, table_name_and_buffer.first, context); + copyData(input, *out); + } + + data.clear(); + max_block_rows = 0; + max_block_bytes = 0; + total_blocks_rows = 0; + total_blocks_bytes = 0; + } + catch(...) + { + data.clear(); + throw; + } +} + +MaterializeMySQLSyncThread::Buffers::BufferAndSortingColumnsPtr MaterializeMySQLSyncThread::Buffers::getTableDataBuffer( + const String & table_name, const Context & context) +{ + const auto & iterator = data.find(table_name); + if (iterator == data.end()) + { + StoragePtr storage = getDatabase(database).tryGetTable(table_name, context); + + BufferAndSortingColumnsPtr & buffer_and_soring_columns = data.try_emplace( + table_name, std::make_shared(storage->getSampleBlockNonMaterialized(), std::vector{})).first->second; + + if (StorageMergeTree * table_merge_tree = storage->as()) + { + Names required_for_sorting_key = table_merge_tree->getColumnsRequiredForSortingKey(); + + for (const auto & required_name_for_sorting_key : required_for_sorting_key) + buffer_and_soring_columns->second.emplace_back( + buffer_and_soring_columns->first.getPositionByName(required_name_for_sorting_key)); + } + + return buffer_and_soring_columns; + } + + return iterator->second; +} + +} + +#endif diff --git a/src/Databases/MySQL/MaterializeMySQLSyncThread.h b/src/Databases/MySQL/MaterializeMySQLSyncThread.h new file mode 100644 index 00000000000..94ff2c0b11f --- /dev/null +++ b/src/Databases/MySQL/MaterializeMySQLSyncThread.h @@ -0,0 +1,91 @@ +#pragma once + +#include "config_core.h" + +#if USE_MYSQL + +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include + +namespace DB +{ + +class MaterializeMySQLSyncThread +{ +public: + ~MaterializeMySQLSyncThread(); + + MaterializeMySQLSyncThread( + const Context & context, const String & database_name_, const String & mysql_database_name_ + , mysqlxx::Pool && pool_, MySQLClient && client_, MaterializeMySQLSettings * settings_); + + void startSynchronization(); + +private: + Poco::Logger * log; + const Context & global_context; + + String database_name; + String mysql_database_name; + + mutable mysqlxx::Pool pool; + mutable MySQLClient client; + MaterializeMySQLSettings * settings; + + struct Buffers + { + String database; + + /// thresholds + size_t max_block_rows = 0; + size_t max_block_bytes = 0; + size_t total_blocks_rows = 0; + size_t total_blocks_bytes = 0; + + using BufferAndSortingColumns = std::pair>; + using BufferAndSortingColumnsPtr = std::shared_ptr; + std::unordered_map data; + + Buffers(const String & database_) : database(database_) {} + + void commit(MaterializeMetadata & metatdata, const Context & context); + + void add(size_t block_rows, size_t block_bytes, size_t written_rows, size_t written_bytes); + + bool checkThresholds(size_t check_block_rows, size_t check_block_bytes, size_t check_total_rows, size_t check_total_bytes); + + BufferAndSortingColumnsPtr getTableDataBuffer(const String & table, const Context & context); + }; + + void synchronization(); + + bool isCancelled() { return sync_quit.load(std::memory_order_relaxed); } + + std::optional prepareSynchronized(); + + void flushBuffersData(Buffers & buffers, MaterializeMetadata & metadata); + + void onEvent(Buffers & buffers, const MySQLReplication::BinlogEventPtr & event, MaterializeMetadata & metadata); + + std::mutex sync_mutex; + std::atomic sync_quit{false}; + std::condition_variable sync_cond; + std::unique_ptr background_thread_pool; +}; + +} + +#endif