From 0336a4ad58037bdb92fda6a5d141e50855253dc3 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Fri, 12 Jun 2020 12:21:43 +0800 Subject: [PATCH] ISSUES-4006 support synchronous MySQL incremental data[part 2] --- src/Core/MySQLReplication.h | 1 + .../AddingVersionsBlockOutputStream.cpp | 13 +- src/DataTypes/DataTypeEnum.h | 3 +- src/Databases/DatabaseFactory.cpp | 13 +- src/Databases/MySQL/DataBuffers.cpp | 135 --------- src/Databases/MySQL/DataBuffers.h | 42 --- .../MySQL/DatabaseMaterializeMySQL.cpp | 225 +++++++-------- .../MySQL/DatabaseMaterializeMySQL.h | 137 ++------- .../MySQL/DatabaseMaterializeMySQLWrap.cpp | 188 +++++++++++++ .../MySQL/DatabaseMaterializeMySQLWrap.h | 70 +++++ .../MySQL/DatabaseMaterializeTablesIterator.h | 34 +++ src/Databases/MySQL/EventConsumer.cpp | 261 ++++++++++++++++++ src/Databases/MySQL/EventConsumer.h | 63 +++++ ...StatusInfo.cpp => MaterializeMetadata.cpp} | 111 +++++--- ...sterStatusInfo.h => MaterializeMetadata.h} | 11 +- .../MySQL/MaterializeModeSettings.cpp | 42 +++ src/Databases/MySQL/MaterializeModeSettings.h | 28 ++ src/Databases/MySQL/queryConvert.cpp | 13 +- src/Parsers/ParserSetQuery.cpp | 6 +- src/Storages/StorageMaterializeMySQL.cpp | 56 ++++ src/Storages/StorageMaterializeMySQL.h | 28 ++ 21 files changed, 996 insertions(+), 484 deletions(-) delete mode 100644 src/Databases/MySQL/DataBuffers.cpp delete mode 100644 src/Databases/MySQL/DataBuffers.h create mode 100644 src/Databases/MySQL/DatabaseMaterializeMySQLWrap.cpp create mode 100644 src/Databases/MySQL/DatabaseMaterializeMySQLWrap.h create mode 100644 src/Databases/MySQL/DatabaseMaterializeTablesIterator.h create mode 100644 src/Databases/MySQL/EventConsumer.cpp create mode 100644 src/Databases/MySQL/EventConsumer.h rename src/Databases/MySQL/{MasterStatusInfo.cpp => MaterializeMetadata.cpp} (52%) rename src/Databases/MySQL/{MasterStatusInfo.h => MaterializeMetadata.h} (72%) create mode 100644 src/Databases/MySQL/MaterializeModeSettings.cpp create mode 100644 
src/Databases/MySQL/MaterializeModeSettings.h create mode 100644 src/Storages/StorageMaterializeMySQL.cpp create mode 100644 src/Storages/StorageMaterializeMySQL.h diff --git a/src/Core/MySQLReplication.h b/src/Core/MySQLReplication.h index 420bd8fdc02..f4ac02c2ffd 100644 --- a/src/Core/MySQLReplication.h +++ b/src/Core/MySQLReplication.h @@ -460,6 +460,7 @@ namespace MySQLReplication String binlog_name; Position() : binlog_pos(0), binlog_name("") { } + Position(UInt64 binlog_pos_, const String & binlog_name_) : binlog_pos(binlog_pos_), binlog_name(binlog_name_) { } void updateLogPos(UInt64 pos) { binlog_pos = pos; } void updateLogName(String binlog) { binlog_name = std::move(binlog); } }; diff --git a/src/DataStreams/AddingVersionsBlockOutputStream.cpp b/src/DataStreams/AddingVersionsBlockOutputStream.cpp index 2adc1488e98..c2c83e6adc9 100644 --- a/src/DataStreams/AddingVersionsBlockOutputStream.cpp +++ b/src/DataStreams/AddingVersionsBlockOutputStream.cpp @@ -23,22 +23,21 @@ void AddingVersionsBlockOutputStream::flush() void AddingVersionsBlockOutputStream::write(const Block & block) { - /// create_version and delete_version are always in the last place Block res; size_t rows = block.rows(); for (size_t index = 0; index < block.columns(); ++index) res.insert(block.getByPosition(index)); - DataTypePtr data_type = std::make_shared(); + DataTypePtr sign_type = std::make_shared(); + DataTypePtr version_type = std::make_shared(); - auto create_version = ColumnUInt64::create(rows); - for (size_t index = 0; index < rows; ++index) - create_version->getData()[index] = ++version; + ColumnPtr sign_column = sign_type->createColumnConst(rows, Field(Int8(1)))->convertToFullColumnIfConst(); + ColumnPtr version_column = version_type->createColumnConst(rows, Field(UInt64(++version)))->convertToFullColumnIfConst(); Block header = output->getHeader(); - res.insert(ColumnWithTypeAndName(create_version->getPtr(), data_type, header.getByPosition(header.columns() - 2).name)); - 
res.insert(ColumnWithTypeAndName(data_type->createColumnConstWithDefaultValue(rows)->convertToFullColumnIfConst(), data_type, header.getByPosition(header.columns() - 1).name)); + res.insert({sign_column, sign_type, header.getByPosition(header.columns() - 2).name}); + res.insert({version_column, version_type, header.getByPosition(header.columns() - 1).name}); output->write(res); } Block AddingVersionsBlockOutputStream::getHeader() const diff --git a/src/DataTypes/DataTypeEnum.h b/src/DataTypes/DataTypeEnum.h index 80b41692cdd..4756a218778 100644 --- a/src/DataTypes/DataTypeEnum.h +++ b/src/DataTypes/DataTypeEnum.h @@ -43,7 +43,8 @@ public: using ColumnType = ColumnVector; using Value = std::pair; using Values = std::vector; - using NameToValueMap = HashMap; + using NameToValueMap = HashMap; using ValueToNameMap = std::unordered_map; static constexpr bool is_parametric = true; diff --git a/src/Databases/DatabaseFactory.cpp b/src/Databases/DatabaseFactory.cpp index 87a8e6d775a..e7f8b6f22de 100644 --- a/src/Databases/DatabaseFactory.cpp +++ b/src/Databases/DatabaseFactory.cpp @@ -18,12 +18,13 @@ #endif #if USE_MYSQL +# include # include # include +# include # include # include # include -# include #endif namespace DB @@ -118,12 +119,18 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String const auto & [remote_host_name, remote_port] = parseAddress(host_name_and_port, 3306); auto mysql_pool = mysqlxx::Pool(mysql_database_name, remote_host_name, mysql_user_name, mysql_user_password, remote_port); - if (materializeMySQLDatabase(engine_define->settings)) + auto materialize_mode_settings = std::make_unique(); + + if (engine_define->settings) + materialize_mode_settings->loadFromQuery(*engine_define); + + if (materialize_mode_settings->locality_data) { MySQLClient client(remote_host_name, remote_port, mysql_user_name, mysql_user_password); return std::make_shared( - context, database_name, metadata_path, engine_define, mysql_database_name, 
std::move(mysql_pool), std::move(client)); + context, database_name, metadata_path, engine_define, mysql_database_name, std::move(mysql_pool), std::move(client) + , std::move(materialize_mode_settings)); } return std::make_shared(context, database_name, metadata_path, engine_define, mysql_database_name, std::move(mysql_pool)); diff --git a/src/Databases/MySQL/DataBuffers.cpp b/src/Databases/MySQL/DataBuffers.cpp deleted file mode 100644 index 8659eadb8bb..00000000000 --- a/src/Databases/MySQL/DataBuffers.cpp +++ /dev/null @@ -1,135 +0,0 @@ -#include - -#include -#include - -namespace DB -{ - -DataBuffers::DataBuffers(size_t & version_, IDatabase * database_, const std::function &)> & flush_function_) - : version(version_), database(database_), flush_function(flush_function_) -{ - - /// TODO: 定时刷新 -} - -void DataBuffers::flush() -{ - flush_function(buffers); - buffers.clear(); -} - -void DataBuffers::writeData(const std::string & table_name, const std::vector & rows_data) -{ - Block & to = getTableBuffer(table_name, rows_data.size()); - for (size_t column = 0; column < to.columns() - 2; ++column) - { - /// normally columns - MutableColumnPtr col_to = (*std::move(to.getByPosition(column).column)).mutate(); - - for (size_t index = 0; index < rows_data.size(); ++index) - col_to->insert(DB::get(rows_data[index])[column]); - } - - Field new_version(UInt64(++version)); - MutableColumnPtr create_version_column = (*std::move(to.getByPosition(to.columns() - 2)).column).mutate(); - MutableColumnPtr delete_version_column = (*std::move(to.getByPosition(to.columns() - 1)).column).mutate(); - - delete_version_column->insertManyDefaults(rows_data.size()); - - for (size_t index = 0; index < rows_data.size(); ++index) - create_version_column->insert(new_version); -} - -void DataBuffers::updateData(const String & table_name, const std::vector & rows_data) -{ - if (rows_data.size() % 2 != 0) - throw Exception("LOGICAL ERROR: ", ErrorCodes::LOGICAL_ERROR); - - Block & to = 
getTableBuffer(table_name, rows_data.size()); - for (size_t column = 0; column < to.columns() - 2; ++column) - { - /// normally columns - MutableColumnPtr col_to = (*std::move(to.getByPosition(column).column)).mutate(); - - for (size_t index = 0; index < rows_data.size(); ++index) - col_to->insert(DB::get(rows_data[index])[column]); - } - - Field new_version(UInt64(++version)); - MutableColumnPtr create_version_column = (*std::move(to.getByPosition(to.columns() - 2)).column).mutate(); - MutableColumnPtr delete_version_column = (*std::move(to.getByPosition(to.columns() - 1)).column).mutate(); - - for (size_t index = 0; index < rows_data.size(); ++index) - { - if (index % 2 == 0) - { - create_version_column->insertDefault(); - delete_version_column->insert(new_version); - } - else - { - delete_version_column->insertDefault(); - create_version_column->insert(new_version); - } - } -} - -void DataBuffers::deleteData(const String & table_name, const std::vector & rows_data) -{ - Block & to = getTableBuffer(table_name, rows_data.size()); - for (size_t column = 0; column < to.columns() - 2; ++column) - { - /// normally columns - MutableColumnPtr col_to = (*std::move(to.getByPosition(column).column)).mutate(); - - for (size_t index = 0; index < rows_data.size(); ++index) - col_to->insert(DB::get(rows_data[index])[column]); - } - - Field new_version(UInt64(++version)); - MutableColumnPtr create_version_column = (*std::move(to.getByPosition(to.columns() - 2)).column).mutate(); - MutableColumnPtr delete_version_column = (*std::move(to.getByPosition(to.columns() - 1)).column).mutate(); - - create_version_column->insertManyDefaults(rows_data.size()); - - for (size_t index = 0; index < rows_data.size(); ++index) - delete_version_column->insert(new_version); -} - -Block & DataBuffers::getTableBuffer(const String & table_name, size_t write_size) -{ - if (buffers.find(table_name) == buffers.end()) - { - StoragePtr write_storage = database->tryGetTable(table_name); - 
buffers[table_name] = write_storage->getSampleBlockNonMaterialized(); - } - - /// TODO: settings - if (buffers[table_name].rows() + write_size > 8192) - flush(); - - return buffers[table_name]; -} - -[[noreturn]] void DataBuffers::scheduleFlush() -{ - while (1) - { - try - { - flush(); - sleepForSeconds(1); - } - catch (...) - { -// ++error_count; -// sleep_time = std::min( -// std::chrono::milliseconds{Int64(default_sleep_time.count() * std::exp2(error_count))}, -// max_sleep_time); - tryLogCurrentException(""); - } - } -} - -} diff --git a/src/Databases/MySQL/DataBuffers.h b/src/Databases/MySQL/DataBuffers.h deleted file mode 100644 index 4e69eb438d7..00000000000 --- a/src/Databases/MySQL/DataBuffers.h +++ /dev/null @@ -1,42 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include - -namespace DB -{ - -class DataBuffers : private boost::noncopyable -{ -public: - DataBuffers(size_t & version_, IDatabase * database_, const std::function &)> & flush_function_); - - void flush(); - - void writeData(const std::string & table_name, const std::vector & rows_data); - - void updateData(const std::string & table_name, const std::vector & rows_data); - - void deleteData(const std::string & table_name, const std::vector & rows_data); - - -private: - size_t & version; - IDatabase * database; - std::function &)> flush_function; - - mutable std::mutex mutex; - std::unordered_map buffers; - - [[noreturn]] void scheduleFlush(); - - Block & getTableBuffer(const String & table_name, size_t write_size); - - ThreadFromGlobalPool thread{&DataBuffers::scheduleFlush, this}; -}; - -} diff --git a/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp b/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp index 63676e74f30..b3d55c46d66 100644 --- a/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp +++ b/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp @@ -10,17 +10,14 @@ # include # include # include -# include -# include -# include -# include # include +# include +# 
include # include # include # include # include # include -# include # include # include # include @@ -34,189 +31,165 @@ namespace ErrorCodes extern const int INCORRECT_QUERY; } -DatabaseMaterializeMySQL::DatabaseMaterializeMySQL( - const Context & context, const String & database_name_, const String & metadata_path_, - const ASTStorage * database_engine_define_, const String & mysql_database_name_, mysqlxx::Pool && pool_, MySQLClient && client_) - : IDatabase(database_name_), global_context(context.getGlobalContext()), metadata_path(metadata_path_) - , database_engine_define(database_engine_define_->clone()), mysql_database_name(mysql_database_name_) - , nested_database(std::make_shared(database_name_, metadata_path, global_context)) - , pool(std::move(pool_)), client(std::move(client_)), log(&Logger::get("DatabaseMaterializeMySQL")) +static inline BlockIO tryToExecuteQuery(const String & query_to_execute, const Context & context_, const String & comment) { - /// TODO: 做简单的check, 失败即报错 -} - -BlockIO DatabaseMaterializeMySQL::tryToExecuteQuery(const String & query_to_execute, const String & comment) -{ - String query_prefix = "/*" + comment + " for " + backQuoteIfNeed(database_name) + " Database */ "; - try { - Context context = global_context; + Context context = context_; context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; context.setCurrentQueryId(""); // generate random query_id - return executeQuery(query_prefix + query_to_execute, context, true); + return executeQuery("/*" + comment + "*/ " + query_to_execute, context, true); } catch (...) 
{ - tryLogCurrentException(log, "Query " + query_to_execute + " wasn't finished successfully"); + tryLogCurrentException("DatabaseMaterializeMySQL", "Query " + query_to_execute + " wasn't finished successfully"); throw; } - LOG_DEBUG(log, "Executed query: " << query_to_execute); + LOG_DEBUG(&Logger::get("DatabaseMaterializeMySQL"), "Executed query: " << query_to_execute); } -String DatabaseMaterializeMySQL::getCreateQuery(const mysqlxx::Pool::Entry & connection, const String & table_name) +DatabaseMaterializeMySQL::DatabaseMaterializeMySQL( + const Context & context, const String & database_name_, const String & metadata_path_ + , const ASTStorage * database_engine_define_, const String & mysql_database_name_, mysqlxx::Pool && pool_ + , MySQLClient && client_ , std::unique_ptr settings_) + : DatabaseMaterializeMySQLWrap(std::make_shared(database_name_, metadata_path_, context), database_engine_define_->clone(), "DatabaseMaterializeMySQL") + , global_context(context.getGlobalContext()), metadata_path(metadata_path_), mysql_database_name(mysql_database_name_) + , pool(std::move(pool_)), client(std::move(client_)), settings(std::move(settings_)) { - Block show_create_table_header{ - {std::make_shared(), "Table"}, - {std::make_shared(), "Create Table"}, - }; - - MySQLBlockInputStream show_create_table( - connection, "SHOW CREATE TABLE " + backQuoteIfNeed(mysql_database_name) + "." 
+ backQuoteIfNeed(table_name), - show_create_table_header, DEFAULT_BLOCK_SIZE); - - Block create_query_block = show_create_table.read(); - if (!create_query_block || create_query_block.rows() != 1) - throw Exception("LOGICAL ERROR mysql show create return more rows.", ErrorCodes::LOGICAL_ERROR); - - return create_query_block.getByName("Create Table").column->getDataAt(0).toString(); + /// TODO: perform a simple check and report an error if it fails + scheduleSynchronized(); } -BlockOutputStreamPtr DatabaseMaterializeMySQL::getTableOutput(const String & table_name, bool fill_version) +BlockOutputStreamPtr DatabaseMaterializeMySQL::getTableOutput(const String & table_name) { - Context context = global_context; - context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; - context.setCurrentQueryId(""); // generate random query_id + String with_database_table_name = backQuoteIfNeed(getDatabaseName()) + "." + backQuoteIfNeed(table_name); + BlockIO res = tryToExecuteQuery("INSERT INTO " + with_database_table_name + " VALUES", global_context, ""); - StoragePtr write_storage = nested_database->tryGetTable(table_name); - auto table_lock = write_storage->lockStructureForShare(true, context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout); + if (!res.out) + throw Exception("LOGICAL ERROR:", ErrorCodes::LOGICAL_ERROR); - BlockOutputStreamPtr output = write_storage->write(ASTPtr{}, global_context); - output->addTableLock(table_lock); - - return fill_version ? 
std::make_shared(version, output) : output; + return res.out; } -void DatabaseMaterializeMySQL::dumpDataForTables(mysqlxx::Pool::Entry & connection, const std::vector & tables_name, const std::function & is_cancelled) +void DatabaseMaterializeMySQL::cleanOutdatedTables() { - std::unordered_map tables_create_query; - for (size_t index = 0; index < tables_name.size() && !is_cancelled(); ++index) - tables_create_query[tables_name[index]] = getCreateQuery(connection, tables_name[index]); + auto ddl_guard = DatabaseCatalog::instance().getDDLGuard(database_name, ""); + const DatabasePtr & clean_database = DatabaseCatalog::instance().getDatabase(database_name); - auto iterator = tables_create_query.begin(); - for (; iterator != tables_create_query.end() && !is_cancelled(); ++iterator) + for (auto iterator = clean_database->getTablesIterator(); iterator->isValid(); iterator->next()) + { + String table = backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(iterator->name()); + String comment = String("Clean ") + table + " for dump mysql."; + tryToExecuteQuery("DROP TABLE " + table, global_context, comment); + } +} + +void DatabaseMaterializeMySQL::dumpDataForTables(mysqlxx::Pool::Entry & connection, MaterializeMetadata & master_info, const std::function & is_cancelled) +{ + auto iterator = master_info.need_dumping_tables.begin(); + for (; iterator != master_info.need_dumping_tables.end() && !is_cancelled(); ++iterator) { const auto & table_name = iterator->first; MySQLTableStruct table_struct = visitCreateQuery(iterator->second, global_context, database_name); String comment = String("Dumping ") + backQuoteIfNeed(mysql_database_name) + "." 
+ backQuoteIfNeed(table_name); - tryToExecuteQuery(toCreateQuery(table_struct, global_context), comment); + tryToExecuteQuery(toCreateQuery(table_struct, global_context), global_context, comment); - BlockOutputStreamPtr out = getTableOutput(table_name, true); + BlockOutputStreamPtr out = std::make_shared(master_info.version, getTableOutput(table_name)); MySQLBlockInputStream input(connection, "SELECT * FROM " + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(table_name), out->getHeader(), DEFAULT_BLOCK_SIZE); copyData(input, *out, is_cancelled); } } -MasterStatusInfo DatabaseMaterializeMySQL::prepareSynchronized(std::unique_lock & lock) +std::optional DatabaseMaterializeMySQL::prepareSynchronized(std::unique_lock & lock, const std::function & is_cancelled) { - while(!sync_quit.load(std::memory_order_seq_cst)) + while (!is_cancelled()) { try { LOG_DEBUG(log, "Checking " + database_name + " database status."); - while (!sync_quit && !DatabaseCatalog::instance().isDatabaseExist(database_name)) + while (!is_cancelled() && !DatabaseCatalog::instance().isDatabaseExist(database_name)) sync_cond.wait_for(lock, std::chrono::seconds(1)); LOG_DEBUG(log, database_name + " database status is OK."); mysqlxx::PoolWithFailover::Entry connection = pool.get(); - MasterStatusInfo master_info(connection, getMetadataPath() + "/.master_status", mysql_database_name); + MaterializeMetadata metadata(connection, getMetadataPath() + "/.metadata", mysql_database_name); - if (!master_info.need_dumping_tables.empty()) + if (!metadata.need_dumping_tables.empty()) { - /// TODO: 删除所有表结构, 这可能需要考虑到仍然有查询在使用这个表. 
- dumpDataForTables(connection, master_info.need_dumping_tables, [&]() { return sync_quit.load(std::memory_order_seq_cst); }); - master_info.finishDump(); + metadata.transaction(Position(metadata.binlog_position, metadata.binlog_file), [&]() + { + cleanOutdatedTables(); + dumpDataForTables(connection, metadata, is_cancelled); + }); } client.connect(); - client.startBinlogDump(std::rand(), mysql_database_name, master_info.binlog_file, master_info.binlog_position); - return master_info; + client.startBinlogDump(std::rand(), mysql_database_name, metadata.binlog_file, metadata.binlog_position); + return metadata; + } + catch (mysqlxx::Exception & ) + { + tryLogCurrentException(log); + + /// Avoid busy loop when MySQL is not available. + sleepForMilliseconds(settings->max_wait_time_when_mysql_unavailable); + } + } + + return {}; +} + +void DatabaseMaterializeMySQL::scheduleSynchronized() +{ + background_thread_pool.scheduleOrThrowOnError([&]() + { + ThreadStatus thread_status; + setThreadName("MySQLDBSync"); + + std::unique_lock lock(sync_mutex); + const auto quit_requested = [this] { return sync_quit.load(std::memory_order_relaxed); }; + + try + { + std::optional metadata = prepareSynchronized(lock, quit_requested); + + if (!quit_requested() && metadata) + { + EventConsumer consumer(getDatabaseName(), global_context, *metadata, *settings); + + while (!quit_requested()) + { + const auto & event = client.readOneBinlogEvent(); + consumer.onEvent(event, client.getPosition()); + } + } } catch(...) 
{ - tryLogCurrentException(log, "Prepare MySQL Synchronized exception and retry"); - - sleepForSeconds(1); + setException(std::current_exception()); } - } - - throw Exception("", ErrorCodes::LOGICAL_ERROR); + }); } - -void DatabaseMaterializeMySQL::synchronized() +DatabaseMaterializeMySQL::~DatabaseMaterializeMySQL() { - setThreadName("MySQLDBSync"); - try { - std::unique_lock lock{sync_mutex}; - - MasterStatusInfo master_status = prepareSynchronized(lock); - - DataBuffers buffers(version, this, [&](const std::unordered_map & tables_data) + if (!sync_quit) { - master_status.transaction(client.getPosition(), [&]() /// At least once, There is only one possible reference: https://github.com/ClickHouse/ClickHouse/pull/8467 { - for (const auto & [table_name, data] : tables_data) - { - if (!sync_quit.load(std::memory_order_seq_cst)) - { - LOG_DEBUG(log, "Prepare to flush data."); - BlockOutputStreamPtr output = getTableOutput(table_name, false); - output->writePrefix(); - output->write(data); - output->writeSuffix(); - output->flush(); - LOG_DEBUG(log, "Finish data flush."); - } - } - }); - }); + sync_quit = true; + std::lock_guard lock(sync_mutex); + } - while (!sync_quit.load(std::memory_order_seq_cst)) - { - const auto & event = client.readOneBinlogEvent(); - - if (event->type() == MYSQL_WRITE_ROWS_EVENT) - { - WriteRowsEvent & write_rows_event = static_cast(*event); - write_rows_event.dump(); - buffers.writeData(write_rows_event.table, write_rows_event.rows); - } - else if (event->type() == MYSQL_UPDATE_ROWS_EVENT) - { - UpdateRowsEvent & update_rows_event = static_cast(*event); - update_rows_event.dump(); - buffers.updateData(update_rows_event.table, update_rows_event.rows); - } - else if (event->type() == MYSQL_DELETE_ROWS_EVENT) - { - DeleteRowsEvent & delete_rows_event = static_cast(*event); - delete_rows_event.dump(); - buffers.deleteData(delete_rows_event.table, delete_rows_event.rows); - } - else if (event->type() == MYSQL_QUERY_EVENT) - { - /// TODO: 识别, 
查看是否支持的DDL, 支持的话立即刷新当前的数据, 然后执行DDL. - buffers.flush(); - } + sync_cond.notify_one(); + background_thread_pool.wait(); } } - catch(...) + catch (...) { - tryLogCurrentException(log); + tryLogCurrentException(__PRETTY_FUNCTION__); } } diff --git a/src/Databases/MySQL/DatabaseMaterializeMySQL.h b/src/Databases/MySQL/DatabaseMaterializeMySQL.h index b6b27d33cae..74526bb4635 100644 --- a/src/Databases/MySQL/DatabaseMaterializeMySQL.h +++ b/src/Databases/MySQL/DatabaseMaterializeMySQL.h @@ -1,6 +1,7 @@ #pragma once #include "config_core.h" + #if USE_MYSQL # include @@ -10,7 +11,9 @@ # include # include # include -# include +# include +# include +# include # include # include # include @@ -19,149 +22,41 @@ namespace DB { -class DatabaseMaterializeMySQL : public IDatabase +class DatabaseMaterializeMySQL : public DatabaseMaterializeMySQLWrap { public: + ~DatabaseMaterializeMySQL() override; + DatabaseMaterializeMySQL( const Context & context, const String & database_name_, const String & metadata_path_, const ASTStorage * database_engine_define_, const String & mysql_database_name_, mysqlxx::Pool && pool_, - MySQLClient && client_); + MySQLClient && client_, std::unique_ptr settings_); String getEngineName() const override { return "MySQL"; } - void shutdown() override { nested_database->shutdown(); } - - bool empty() const override { return nested_database->empty(); } - - String getDataPath() const override { return nested_database->getDataPath(); } - - String getTableDataPath(const String & string) const override { return nested_database->getTableDataPath(string); } - - String getTableDataPath(const ASTCreateQuery & query) const override { return nested_database->getTableDataPath(query); } - - UUID tryGetTableUUID(const String & string) const override { return nested_database->tryGetTableUUID(string); } - - bool isDictionaryExist(const String & string) const override { return nested_database->isDictionaryExist(string); } - - DatabaseDictionariesIteratorPtr 
getDictionariesIterator(const FilterByNameFunction & filter_by_dictionary_name) override - { - return nested_database->getDictionariesIterator(filter_by_dictionary_name); - } - - void createTable(const Context & context, const String & string, const StoragePtr & ptr, const ASTPtr & astPtr) override - { - nested_database->createTable(context, string, ptr, astPtr); - } - - void createDictionary(const Context & context, const String & string, const ASTPtr & ptr) override - { - nested_database->createDictionary(context, string, ptr); - } - - void dropTable(const Context & context, const String & string, bool no_delay) override - { - nested_database->dropTable(context, string, no_delay); - } - - void removeDictionary(const Context & context, const String & string) override { nested_database->removeDictionary(context, string); } - - void attachTable(const String & string, const StoragePtr & ptr, const String & relative_table_path) override - { - nested_database->attachTable(string, ptr, relative_table_path); - } - - void attachDictionary(const String & string, const DictionaryAttachInfo & info) override { nested_database->attachDictionary(string, info); } - - StoragePtr detachTable(const String & string) override { return nested_database->detachTable(string); } - - void detachDictionary(const String & string) override { nested_database->detachDictionary(string); } - - void renameTable(const Context & context, const String & string, IDatabase & database, const String & string1, bool b) override - { - nested_database->renameTable(context, string, database, string1, b); - } - - void alterTable(const Context & context, const StorageID & id, const StorageInMemoryMetadata & metadata) override - { - nested_database->alterTable(context, id, metadata); - } - - time_t getObjectMetadataModificationTime(const String & string) const override - { - return nested_database->getObjectMetadataModificationTime(string); - } - - Poco::AutoPtr getDictionaryConfiguration(const String & string) 
const override - { - return nested_database->getDictionaryConfiguration(string); - } - - String getMetadataPath() const override { return nested_database->getMetadataPath(); } - - String getObjectMetadataPath(const String & string) const override { return nested_database->getObjectMetadataPath(string); } - - bool shouldBeEmptyOnDetach() const override { return nested_database->shouldBeEmptyOnDetach(); } - - void drop(const Context & context) override { nested_database->drop(context); } - - bool isTableExist(const String & name) const override { return nested_database->isTableExist(name); } - - StoragePtr tryGetTable(const String & name) const override { return nested_database->tryGetTable(name); } - - void loadStoredObjects(Context & context, bool b) override - { - try - { - LOG_DEBUG(log, "Loading MySQL nested database stored objects."); - nested_database->loadStoredObjects(context, b); - LOG_DEBUG(log, "Loaded MySQL nested database stored objects."); - } - catch (...) - { - tryLogCurrentException(log, "Cannot load MySQL nested database stored objects."); - throw; - } - } - - ASTPtr getCreateDatabaseQuery() const override - { - const auto & create_query = std::make_shared(); - create_query->database = database_name; - create_query->set(create_query->storage, database_engine_define); - return create_query; - } - - DatabaseTablesIteratorPtr getTablesIterator(const FilterByNameFunction & filter_by_table_name) override { return nested_database->getTablesIterator(filter_by_table_name); } - private: const Context & global_context; String metadata_path; - ASTPtr database_engine_define; String mysql_database_name; - DatabasePtr nested_database; - size_t version{0}; mutable mysqlxx::Pool pool; mutable MySQLClient client; + std::unique_ptr settings; - Poco::Logger * log; + void cleanOutdatedTables(); - void synchronized(); + void scheduleSynchronized(); - MasterStatusInfo prepareSynchronized(std::unique_lock & lock); + BlockOutputStreamPtr getTableOutput(const String & 
table_name); - BlockOutputStreamPtr getTableOutput(const String & table_name, bool fill_version); + std::optional prepareSynchronized(std::unique_lock & lock, const std::function & is_cancelled); - BlockIO tryToExecuteQuery(const String & query_to_execute, const String & comment); + void dumpDataForTables(mysqlxx::Pool::Entry & connection, MaterializeMetadata & master_info, const std::function & is_cancelled); - String getCreateQuery(const mysqlxx::Pool::Entry & connection, const String & table_name); - - void dumpDataForTables(mysqlxx::Pool::Entry & connection, const std::vector & tables_name, const std::function & is_cancelled); - - mutable std::mutex sync_mutex; + std::mutex sync_mutex; std::atomic sync_quit{false}; std::condition_variable sync_cond; - ThreadFromGlobalPool thread{&DatabaseMaterializeMySQL::synchronized, this}; + ThreadPool background_thread_pool{1}; }; } diff --git a/src/Databases/MySQL/DatabaseMaterializeMySQLWrap.cpp b/src/Databases/MySQL/DatabaseMaterializeMySQLWrap.cpp new file mode 100644 index 00000000000..6b628ddf26d --- /dev/null +++ b/src/Databases/MySQL/DatabaseMaterializeMySQLWrap.cpp @@ -0,0 +1,188 @@ +#include + +#include +#include +#include +#include + +namespace DB +{ + +static constexpr auto MYSQL_BACKGROUND_THREAD_NAME = "MySQLDBSync"; + +namespace ErrorCodes +{ + extern const int NOT_IMPLEMENTED; +} + +DatabaseMaterializeMySQLWrap::DatabaseMaterializeMySQLWrap(const DatabasePtr & nested_database_, const ASTPtr & database_engine_define_, const String & log_name) + : IDatabase(nested_database_->getDatabaseName()), nested_database(nested_database_), database_engine_define(database_engine_define_), log(&Logger::get(log_name)) +{ +} + +void DatabaseMaterializeMySQLWrap::setException(const std::exception_ptr & exception_) +{ + std::unique_lock lock(mutex); + exception = exception_; +} + +DatabasePtr DatabaseMaterializeMySQLWrap::getNestedDatabase() const +{ + std::unique_lock lock(mutex); + + if (exception) + 
std::rethrow_exception(exception);
+
+    return nested_database;
+}
+/// Builds a CREATE DATABASE AST from `database_name` and the stored engine definition.
+ASTPtr DatabaseMaterializeMySQLWrap::getCreateDatabaseQuery() const
+{
+    const auto & create_query = std::make_shared();
+    create_query->database = database_name;
+    create_query->set(create_query->storage, database_engine_define);
+    return create_query;
+}
+void DatabaseMaterializeMySQLWrap::loadStoredObjects(Context & context, bool has_force_restore_data_flag)
+{   /// Loads the nested database's stored objects; a failure is logged and then rethrown.
+    try
+    {
+        LOG_DEBUG(log, "Loading MySQL nested database stored objects.");
+        getNestedDatabase()->loadStoredObjects(context, has_force_restore_data_flag);
+        LOG_DEBUG(log, "Loaded MySQL nested database stored objects.");
+    }
+    catch (...)
+    {
+        tryLogCurrentException(log, "Cannot load MySQL nested database stored objects.");
+        throw;
+    }
+}
+/// Forwards to the nested database.
+void DatabaseMaterializeMySQLWrap::shutdown()
+{
+    getNestedDatabase()->shutdown();
+}
+/// Forwards to the nested database.
+bool DatabaseMaterializeMySQLWrap::empty() const
+{
+    return getNestedDatabase()->empty();
+}
+/// Forwards to the nested database.
+String DatabaseMaterializeMySQLWrap::getDataPath() const
+{
+    return getNestedDatabase()->getDataPath();
+}
+/// Forwards to the nested database.
+String DatabaseMaterializeMySQLWrap::getMetadataPath() const
+{
+    return getNestedDatabase()->getMetadataPath();
+}
+/// Forwards to the nested database.
+String DatabaseMaterializeMySQLWrap::getTableDataPath(const String & table_name) const
+{
+    return getNestedDatabase()->getTableDataPath(table_name);
+}
+/// Forwards to the nested database.
+String DatabaseMaterializeMySQLWrap::getTableDataPath(const ASTCreateQuery & query) const
+{
+    return getNestedDatabase()->getTableDataPath(query);
+}
+/// Forwards to the nested database.
+String DatabaseMaterializeMySQLWrap::getObjectMetadataPath(const String & table_name) const
+{
+    return getNestedDatabase()->getObjectMetadataPath(table_name);
+}
+/// Forwards to the nested database.
+UUID DatabaseMaterializeMySQLWrap::tryGetTableUUID(const String & table_name) const
+{
+    return getNestedDatabase()->tryGetTableUUID(table_name);
+}
+/// Forwards to the nested database.
+time_t DatabaseMaterializeMySQLWrap::getObjectMetadataModificationTime(const String & name) const
+{
+    return getNestedDatabase()->getObjectMetadataModificationTime(name);
+}
+/// Allowed only on the thread named MYSQL_BACKGROUND_THREAD_NAME; any other caller gets NOT_IMPLEMENTED.
+void DatabaseMaterializeMySQLWrap::createTable(const Context & context, const String & name, const StoragePtr & table, const ASTPtr & query)
+{
+    if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
+        throw Exception("MySQL database in locality_data mode does not support create table.", ErrorCodes::NOT_IMPLEMENTED);
+
+    getNestedDatabase()->createTable(context, name, table, query);
+}
+/// Allowed only on the thread named MYSQL_BACKGROUND_THREAD_NAME; any other caller gets NOT_IMPLEMENTED.
+void DatabaseMaterializeMySQLWrap::dropTable(const Context & context, const String & name, bool no_delay)
+{
+    if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
+        throw Exception("MySQL database in locality_data mode does not support drop table.", ErrorCodes::NOT_IMPLEMENTED);
+
+    getNestedDatabase()->dropTable(context, name, no_delay);
+}
+/// Allowed only on the thread named MYSQL_BACKGROUND_THREAD_NAME; any other caller gets NOT_IMPLEMENTED.
+void DatabaseMaterializeMySQLWrap::attachTable(const String & name, const StoragePtr & table, const String & relative_table_path)
+{
+    if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
+        throw Exception("MySQL database in locality_data mode does not support attach table.", ErrorCodes::NOT_IMPLEMENTED);
+
+    getNestedDatabase()->attachTable(name, table, relative_table_path);
+}
+/// Allowed only on the thread named MYSQL_BACKGROUND_THREAD_NAME; any other caller gets NOT_IMPLEMENTED.
+StoragePtr DatabaseMaterializeMySQLWrap::detachTable(const String & name)
+{
+    if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
+        throw Exception("MySQL database in locality_data mode does not support detach table.", ErrorCodes::NOT_IMPLEMENTED);
+
+    return getNestedDatabase()->detachTable(name);
+}
+/// Allowed only on the thread named MYSQL_BACKGROUND_THREAD_NAME; any other caller gets NOT_IMPLEMENTED.
+void DatabaseMaterializeMySQLWrap::renameTable(const Context & context, const String & name, IDatabase & to_database, const String & to_name, bool exchange)
+{
+    if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
+        throw Exception("MySQL database in locality_data mode does not support rename table.", ErrorCodes::NOT_IMPLEMENTED);
+
+    getNestedDatabase()->renameTable(context, name, to_database, to_name, exchange);
+}
+/// Allowed only on the thread named MYSQL_BACKGROUND_THREAD_NAME; any other caller gets NOT_IMPLEMENTED.
+void DatabaseMaterializeMySQLWrap::alterTable(const Context & context, const StorageID & table_id, const StorageInMemoryMetadata & metadata)
+{
+    if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
+        throw Exception("MySQL database in locality_data mode does not support alter table.", ErrorCodes::NOT_IMPLEMENTED);
+
+    getNestedDatabase()->alterTable(context, table_id, metadata);
+}
+/// Forwards to the nested database.
+bool DatabaseMaterializeMySQLWrap::shouldBeEmptyOnDetach() const
+{
+    return getNestedDatabase()->shouldBeEmptyOnDetach();
+}
+/// Forwards to the nested database.
+void DatabaseMaterializeMySQLWrap::drop(const Context & context)
+{
+    getNestedDatabase()->drop(context);
+}
+/// Forwards to the nested database.
+bool DatabaseMaterializeMySQLWrap::isTableExist(const String & name) const
+{
+    return getNestedDatabase()->isTableExist(name);
+}
+/// The background thread gets the nested table itself; every other thread gets it wrapped via std::make_shared (wrapper type not visible in this view).
+StoragePtr DatabaseMaterializeMySQLWrap::tryGetTable(const String & name) const
+{
+    if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
+        return std::make_shared(getNestedDatabase()->tryGetTable(name));
+
+    return getNestedDatabase()->tryGetTable(name);
+}
+/// Same split as tryGetTable(): non-background threads get the nested iterator wrapped via std::make_unique (wrapper type not visible in this view).
+DatabaseTablesIteratorPtr DatabaseMaterializeMySQLWrap::getTablesIterator(const FilterByNameFunction & filter_by_table_name)
+{
+    if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
+    {
+        DatabaseTablesIteratorPtr iterator = getNestedDatabase()->getTablesIterator(filter_by_table_name);
+        return std::make_unique(std::move(iterator));
+    }
+
+    return getNestedDatabase()->getTablesIterator(filter_by_table_name);
+}
+
+}
diff --git a/src/Databases/MySQL/DatabaseMaterializeMySQLWrap.h b/src/Databases/MySQL/DatabaseMaterializeMySQLWrap.h
new file mode 100644
index 00000000000..0f16661c66f
--- /dev/null
+++ b/src/Databases/MySQL/DatabaseMaterializeMySQLWrap.h
@@ -0,0 +1,70 @@
+#pragma once
+
+#include 
+
+namespace DB
+{
+/// IDatabase implementation whose overrides are declared below; the visible definitions delegate to a nested database.
+class DatabaseMaterializeMySQLWrap : public IDatabase
+{
+public:
+    ASTPtr getCreateDatabaseQuery() const override;
+
+    void loadStoredObjects(Context & context, bool has_force_restore_data_flag) override;
+
+    DatabaseMaterializeMySQLWrap(const DatabasePtr & nested_database_, const ASTPtr & database_engine_define_, 
const String & log_name);
+protected:
+    DatabasePtr nested_database;  /// Returned by getNestedDatabase().
+    ASTPtr database_engine_define;  /// Used as the storage clause of getCreateDatabaseQuery().
+    Poco::Logger * log;
+
+    mutable std::mutex mutex;
+    std::exception_ptr exception;  /// Rethrown by getNestedDatabase() before it returns the nested database.
+
+    DatabasePtr getNestedDatabase() const;
+
+    void setException(const std::exception_ptr & exception);
+
+public:
+    void shutdown() override;
+
+    bool empty() const override;
+
+    String getDataPath() const override;
+
+    String getTableDataPath(const String & table_name) const override;
+
+    String getTableDataPath(const ASTCreateQuery & query) const override;
+
+    UUID tryGetTableUUID(const String & table_name) const override;
+    /// The six table-modifying overrides below throw NOT_IMPLEMENTED unless called on the MYSQL_BACKGROUND_THREAD_NAME thread.
+    void createTable(const Context & context, const String & name, const StoragePtr & table, const ASTPtr & query) override;
+
+    void dropTable(const Context & context, const String & name, bool no_delay) override;
+
+    void attachTable(const String & name, const StoragePtr & table, const String & relative_table_path) override;
+
+    StoragePtr detachTable(const String & name) override;
+
+    void renameTable(const Context & context, const String & name, IDatabase & to_database, const String & to_name, bool exchange) override;
+
+    void alterTable(const Context & context, const StorageID & table_id, const StorageInMemoryMetadata & metadata) override;
+
+    time_t getObjectMetadataModificationTime(const String & name) const override;
+
+    String getMetadataPath() const override;
+
+    String getObjectMetadataPath(const String & table_name) const override;
+
+    bool shouldBeEmptyOnDetach() const override;
+
+    void drop(const Context & context) override;
+
+    bool isTableExist(const String & name) const override;
+    /// For threads other than the background one, the result is wrapped in another object (see the .cpp).
+    StoragePtr tryGetTable(const String & name) const override;
+
+    DatabaseTablesIteratorPtr getTablesIterator(const FilterByNameFunction & filter_by_table_name) override;
+};
+
+}
diff --git a/src/Databases/MySQL/DatabaseMaterializeTablesIterator.h b/src/Databases/MySQL/DatabaseMaterializeTablesIterator.h
new file mode 100644
index 00000000000..805ae3e4c6e
--- 
/dev/null
+++ b/src/Databases/MySQL/DatabaseMaterializeTablesIterator.h
@@ -0,0 +1,34 @@
+#pragma once
+
+#include 
+
+namespace DB
+{
+/// Iterator adaptor: position, name and uuid come straight from the nested iterator; table() returns a wrapped storage.
+class DatabaseMaterializeTablesIterator final : public IDatabaseTablesIterator
+{
+public:
+    virtual void next() { nested_iterator->next(); }
+
+    virtual bool isValid() const { return nested_iterator->isValid(); }
+
+    virtual const String & name() const { return nested_iterator->name(); }
+    /// Wraps the nested iterator's current table in a new shared object and stores it in `tables` so a reference can be returned.
+    virtual const StoragePtr & table() const
+    {
+        StoragePtr storage = std::make_shared(nested_iterator->table());
+        return tables.emplace_back(storage);  /// NOTE(review): every call appends; a later append may reallocate `tables` and invalidate references returned earlier - confirm callers do not keep them.
+    }
+
+    virtual UUID uuid() const { return nested_iterator->uuid(); }
+
+    DatabaseMaterializeTablesIterator(DatabaseTablesIteratorPtr nested_iterator_) : nested_iterator(std::move(nested_iterator_))
+    {}
+
+private:
+    mutable std::vector tables;  /// Owners of the wrappers handed out by table(); mutable because table() is const.
+    DatabaseTablesIteratorPtr nested_iterator;
+
+};
+
+}
diff --git a/src/Databases/MySQL/EventConsumer.cpp b/src/Databases/MySQL/EventConsumer.cpp
new file mode 100644
index 00000000000..9671464fd2c
--- /dev/null
+++ b/src/Databases/MySQL/EventConsumer.cpp
@@ -0,0 +1,261 @@
+#include 
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+namespace DB
+{
+
+using namespace MySQLReplication;
+/// Unless already quitting or failed: sets `quit`, takes and releases `mutex`, notifies the condition variable and waits for the pool.
+EventConsumer::~EventConsumer()
+{
+    if (!quit && !background_exception)
+    {
+        {
+            quit = true;
+            std::lock_guard lock(mutex);
+        }
+
+        cond.notify_one();
+        background_thread_pool.wait();
+    }
+}
+/// Stores the references/copies of its arguments, remembers the current metadata version and schedules the background task.
+EventConsumer::EventConsumer(
+    const String & database_, const Context & context_, MaterializeMetadata & metadata_, MaterializeModeSettings & settings_)
+    : metadata(metadata_), context(context_), settings(settings_), database(database_), prev_version(metadata.version)
+{
+    background_thread_pool.scheduleOrThrowOnError([&]()
+    {
+        ThreadStatus thread_status;
+        setThreadName("MySQLDBSync");
+        std::unique_lock lock(mutex);  /// Held by the task except while it is blocked in a condition-variable wait.
+        const auto quit_requested = [this] { return quit.load(std::memory_order_relaxed); };
+
+        while (!quit_requested() && 
!background_exception)
+        {
+            if (!buffers.empty() && total_bytes_in_buffers)
+                flushBuffers();
+
+            cond.wait_for(lock, std::chrono::milliseconds(settings.max_flush_data_time), quit_requested);  /// Wakes on quit or after max_flush_data_time ms.
+        }
+    });
+}
+/// Appends the inserted rows to the table's buffer; the last two block columns are then filled with sign = 1 and a newly incremented version.
+void EventConsumer::onWriteData(const String & table_name, const std::vector & rows_data)
+{
+    BufferPtr buffer = getTableBuffer(table_name);
+
+    size_t prev_bytes = buffer->data.bytes();
+    for (size_t column = 0; column < buffer->data.columns() - 2; ++column)
+    {
+        MutableColumnPtr col_to = (*std::move(buffer->data.getByPosition(column).column)).mutate();
+
+        for (size_t index = 0; index < rows_data.size(); ++index)
+            col_to->insert(DB::get(rows_data[index])[column]);
+    }
+
+    fillSignColumnsAndMayFlush(buffer->data, 1, ++metadata.version, rows_data.size(), prev_bytes);
+}
+/// Returns true when any listed sorting-key column differs between the old and the new row. The index list is taken by reference to avoid a copy per call.
+static inline bool differenceSortingKeys(const Tuple & row_old_data, const Tuple & row_new_data, const std::vector & sorting_columns_index)
+{
+    for (const auto & sorting_column_index : sorting_columns_index)
+        if (row_old_data[sorting_column_index] != row_new_data[sorting_column_index])
+            return true;
+
+    return false;
+}
+/// `rows_data` holds (old row, new row) pairs. A pair with unchanged sorting key appends the new row with sign 1; a changed key appends the old row with sign -1 and the new row with sign 1.
+void EventConsumer::onUpdateData(const String & table_name, const std::vector & rows_data)
+{
+    if (rows_data.size() % 2 != 0)
+        throw Exception("LOGICAL ERROR: update rows event must contain an even number of rows (old/new pairs).", ErrorCodes::LOGICAL_ERROR);
+
+    BufferPtr buffer = getTableBuffer(table_name);
+
+    size_t prev_bytes = buffer->data.bytes();
+    std::vector difference_sorting_keys_mark(rows_data.size() / 2);
+    /// One mark per pair. The vector is already sized, so assign by index: appending would leave the marks that are read below at their default value.
+    for (size_t index = 0; index < rows_data.size(); index += 2)
+        difference_sorting_keys_mark[index / 2] = differenceSortingKeys(
+            DB::get(rows_data[index]), DB::get(rows_data[index + 1]), buffer->sorting_columns_index);
+
+    for (size_t column = 0; column < buffer->data.columns() - 2; ++column)
+    {
+        MutableColumnPtr col_to = (*std::move(buffer->data.getByPosition(column).column)).mutate();
+
+        for (size_t index = 0; index < rows_data.size(); index += 2)
+        {
+            if (likely(!difference_sorting_keys_mark[index / 2]))
+                col_to->insert(DB::get(rows_data[index + 1])[column]);
+            else
+            {
+                /// If the sorting keys is modified, we should cancel the old data, but this should not happen frequently
+                col_to->insert(DB::get(rows_data[index])[column]);
+                col_to->insert(DB::get(rows_data[index + 1])[column]);
+            }
+        }
+    }
+
+    MutableColumnPtr sign_mutable_column = (*std::move(buffer->data.getByPosition(buffer->data.columns() - 2)).column).mutate();
+    MutableColumnPtr version_mutable_column = (*std::move(buffer->data.getByPosition(buffer->data.columns() - 1)).column).mutate();
+
+    ColumnInt8::Container & sign_column_data = assert_cast(*sign_mutable_column).getData();
+    ColumnUInt64::Container & version_column_data = assert_cast(*version_mutable_column).getData();
+
+    UInt64 new_version = ++metadata.version;  /// All rows of this event share one version.
+    for (size_t index = 0; index < rows_data.size(); index += 2)
+    {
+        if (likely(!difference_sorting_keys_mark[index / 2]))
+        {
+            sign_column_data.emplace_back(1);
+            version_column_data.emplace_back(new_version);
+        }
+        else
+        {
+            /// If the sorting keys is modified, we should cancel the old data, but this should not happen frequently
+            sign_column_data.emplace_back(-1);
+            sign_column_data.emplace_back(1);
+            version_column_data.emplace_back(new_version);
+            version_column_data.emplace_back(new_version);
+        }
+    }
+
+    total_bytes_in_buffers += (buffer->data.bytes() - prev_bytes);
+    if (buffer->data.rows() >= settings.max_rows_in_buffer || total_bytes_in_buffers >= settings.max_bytes_in_buffers)
+        flushBuffers();
+}
+/// Appends the deleted rows to the table's buffer; the sign/version columns are then filled with sign = -1.
+void EventConsumer::onDeleteData(const String & table_name, const std::vector & rows_data)
+{
+    BufferPtr buffer = getTableBuffer(table_name);
+
+    size_t prev_bytes = buffer->data.bytes();
+    for (size_t column = 0; column < buffer->data.columns() - 2; ++column)
+    {
+        MutableColumnPtr col_to = (*std::move(buffer->data.getByPosition(column).column)).mutate();
+
+        for (size_t index = 0; index < rows_data.size(); ++index)
+            col_to->insert(DB::get(rows_data[index])[column]);
+    }
+
+    
fillSignColumnsAndMayFlush(buffer->data, -1, ++metadata.version, rows_data.size(), prev_bytes);
+}
+/// Returns the buffer for `table_name`, creating it on first use from the table's sample block and recording the positions of its sorting-key columns.
+EventConsumer::BufferPtr EventConsumer::getTableBuffer(const String & table_name)
+{
+    if (buffers.find(table_name) == buffers.end())
+    {
+        StoragePtr storage = DatabaseCatalog::instance().getDatabase(database)->tryGetTable(table_name);
+        if (!storage)  /// Guard against a null result instead of dereferencing it below.
+            throw Exception("Table " + backQuoteIfNeed(database) + "." + backQuoteIfNeed(table_name) + " doesn't exist.", ErrorCodes::LOGICAL_ERROR);
+
+        buffers[table_name] = std::make_shared();
+        buffers[table_name]->data = storage->getSampleBlockNonMaterialized();
+        if (StorageMergeTree * table_merge_tree = dynamic_cast(storage.get()))
+        {
+            Names required_for_sorting_key = table_merge_tree->getColumnsRequiredForSortingKey();
+
+            for (const auto & required_name_for_sorting_key : required_for_sorting_key)
+                buffers[table_name]->sorting_columns_index.emplace_back(
+                    buffers[table_name]->data.getPositionByName(required_name_for_sorting_key));
+        }
+    }
+
+    return buffers[table_name];
+}
+/// Entry point for one binlog event: takes `mutex`, records the position and dispatches write/update/delete row events to the handlers above.
+void EventConsumer::onEvent(const BinlogEventPtr & receive_event, const MySQLReplication::Position & position)
+{
+    std::unique_lock lock(mutex);
+
+    if (background_exception)
+        background_thread_pool.wait();
+
+    last_position = position;
+    if (receive_event->type() == MYSQL_WRITE_ROWS_EVENT)
+    {
+        WriteRowsEvent & write_rows_event = static_cast(*receive_event);
+        write_rows_event.dump();
+        onWriteData(write_rows_event.table, write_rows_event.rows);
+    }
+    else if (receive_event->type() == MYSQL_UPDATE_ROWS_EVENT)
+    {
+        UpdateRowsEvent & update_rows_event = static_cast(*receive_event);
+        update_rows_event.dump();
+        onUpdateData(update_rows_event.table, update_rows_event.rows);
+    }
+    else if (receive_event->type() == MYSQL_DELETE_ROWS_EVENT)
+    {
+        DeleteRowsEvent & delete_rows_event = static_cast(*receive_event);
+        delete_rows_event.dump();
+        onDeleteData(delete_rows_event.table, delete_rows_event.rows);
+    }
+    else if (receive_event->type() == MYSQL_QUERY_EVENT)
+    {
+        /// TODO: recognise the statement and check whether it is a supported DDL; if so, flush the buffered data immediately and then execute the DDL.
+//        flush_function();
+        /// TODO: simply execute it with the Interpreter
+    }
+}
+/// Appends `fill_size` entries of (sign_value, version_value) to the last two columns of `data`, updates the byte counter and flushes when a row or byte limit is reached.
+void EventConsumer::fillSignColumnsAndMayFlush(Block & data, Int8 sign_value, UInt64 version_value, size_t fill_size, size_t prev_bytes)
+{
+    MutableColumnPtr sign_mutable_column = (*std::move(data.getByPosition(data.columns() - 2)).column).mutate();
+    MutableColumnPtr version_mutable_column = (*std::move(data.getByPosition(data.columns() - 1)).column).mutate();
+
+    ColumnInt8::Container & sign_column_data = assert_cast(*sign_mutable_column).getData();
+    ColumnUInt64::Container & version_column_data = assert_cast(*version_mutable_column).getData();
+
+    for (size_t index = 0; index < fill_size; ++index)
+    {
+        sign_column_data.emplace_back(sign_value);
+        version_column_data.emplace_back(version_value);
+    }
+
+    total_bytes_in_buffers += (data.bytes() - prev_bytes);
+    if (data.rows() >= settings.max_rows_in_buffer || total_bytes_in_buffers >= settings.max_bytes_in_buffers)
+        flushBuffers();
+}
+/// Writes every table buffer with an INSERT query. Success: buffers are cleared and the version is remembered. Failure: buffers are cleared, the version is rolled back, `background_exception` is set and the exception is rethrown.
+void EventConsumer::flushBuffers()
+{
+    /// TODO: transactional guarantees
+    try
+    {
+        for (auto & table_name_and_buffer : buffers)
+        {
+            const String & table_name = table_name_and_buffer.first;
+            BufferPtr & buffer = table_name_and_buffer.second;
+
+            Context query_context = context;
+            query_context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
+            query_context.setCurrentQueryId(""); // generate random query_id
+            String with_database_table_name = backQuoteIfNeed(database) + "." + backQuoteIfNeed(table_name);
+            BlockIO res = executeQuery("INSERT INTO " + with_database_table_name + " VALUES", query_context, true);
+
+            OneBlockInputStream input(buffer->data);
+            copyData(input, *res.out);
+        }
+
+        buffers.clear();
+        total_bytes_in_buffers = 0;
+        prev_version = metadata.version;
+    }
+    catch(...)
+    {
+        buffers.clear();
+        total_bytes_in_buffers = 0;
+        metadata.version = prev_version;
+        background_exception = true;
+        throw;
+    }
+}
+
+}
diff --git a/src/Databases/MySQL/EventConsumer.h b/src/Databases/MySQL/EventConsumer.h
new file mode 100644
index 00000000000..f21265efe9d
--- /dev/null
+++ b/src/Databases/MySQL/EventConsumer.h
@@ -0,0 +1,63 @@
+#pragma once
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include "MaterializeMetadata.h"
+
+namespace DB
+{
+/// Buffers row changes from MySQL binlog events per table and inserts them into the tables of `database`.
+class EventConsumer : private boost::noncopyable
+{
+public:
+    ~EventConsumer();
+
+    void onEvent(const MySQLReplication::BinlogEventPtr & event, const MySQLReplication::Position & position);
+
+    EventConsumer(const String & database_, const Context & context, MaterializeMetadata & metadata_, MaterializeModeSettings & settings_);
+private:
+    MaterializeMetadata & metadata;  /// Its `version` is incremented per event and rolled back when a flush fails.
+
+    const Context & context;
+    const MaterializeModeSettings & settings;
+
+    String database;
+    size_t prev_version;  /// metadata.version as of construction or the last successful flush.
+    size_t total_bytes_in_buffers = 0;
+    MySQLReplication::Position last_position;
+
+    struct Buffer
+    {
+        Block data;
+        std::vector sorting_columns_index;  /// Positions in `data` of the columns required for the sorting key.
+    };
+
+    using BufferPtr = std::shared_ptr;
+    std::unordered_map buffers;  /// Looked up by table name.
+
+    void flushBuffers();
+
+    BufferPtr getTableBuffer(const String & table_name);
+
+    void onWriteData(const std::string & table_name, const std::vector & rows_data);
+
+    void onUpdateData(const std::string & table_name, const std::vector & rows_data);
+
+    void onDeleteData(const std::string & table_name, const std::vector & rows_data);
+
+    void fillSignColumnsAndMayFlush(Block & data, Int8 sign_value, UInt64 version_value, size_t fill_size, size_t prev_bytes);
+
+    mutable std::mutex mutex;  /// Taken by onEvent(), the destructor and the background task.
+    std::condition_variable cond;
+    std::atomic_bool quit = false;
+    std::atomic_bool background_exception = false;  /// Set by flushBuffers() when a flush throws.
+    ThreadPool background_thread_pool{1};
+};
+
+}
diff --git a/src/Databases/MySQL/MasterStatusInfo.cpp b/src/Databases/MySQL/MaterializeMetadata.cpp
similarity 
index 52%
rename from src/Databases/MySQL/MasterStatusInfo.cpp
rename to src/Databases/MySQL/MaterializeMetadata.cpp
index 687a028ebdc..768ed4fc57b 100644
--- a/src/Databases/MySQL/MasterStatusInfo.cpp
+++ b/src/Databases/MySQL/MaterializeMetadata.cpp
@@ -1,17 +1,44 @@
 #include 
 #include 
 #include 
-#include 
+#include 
 #include 
-#include 
-#include 
-#include 
 #include 
 #include 
+#include 
+#include 
+#include 
+#include 

 namespace DB
 {

+static std::unordered_map fetchTablesCreateQuery(  /// Runs SHOW CREATE TABLE for each name in `fetch_tables` and returns the statements keyed by table name.
+    const mysqlxx::PoolWithFailover::Entry & connection, const String & database_name, const std::vector & fetch_tables)
+{
+    std::unordered_map tables_create_query;
+    for (size_t index = 0; index < fetch_tables.size(); ++index)
+    {
+        Block show_create_table_header{
+            {std::make_shared(), "Table"},
+            {std::make_shared(), "Create Table"},
+        };
+
+        MySQLBlockInputStream show_create_table(
+            connection, "SHOW CREATE TABLE " + backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(fetch_tables[index]),
+            show_create_table_header, DEFAULT_BLOCK_SIZE);
+
+        Block create_query_block = show_create_table.read();
+        if (!create_query_block || create_query_block.rows() != 1)  /// Exactly one row is expected; an empty block takes this branch too.
+            throw Exception("LOGICAL ERROR mysql show create return more rows.", ErrorCodes::LOGICAL_ERROR);
+
+        tables_create_query[fetch_tables[index]] = create_query_block.getByName("Create Table").column->getDataAt(0).toString();
+    }
+
+    return tables_create_query;
+}
+
+
 static std::vector fetchTablesInDB(const mysqlxx::PoolWithFailover::Entry & connection, const std::string & database)
 {
     Block header{{std::make_shared(), "table_name"}};
@@ -29,7 +56,7 @@ static std::vector fetchTablesInDB(const mysqlxx::PoolWithFailover::Entr
     return tables_in_db;
 }

-void MasterStatusInfo::fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & connection)
+void MaterializeMetadata::fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & connection)
 {
     Block header{
         {std::make_shared(), "File"},
@@ -45,6 +72,7 @@ void MasterStatusInfo::fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & conn
     if (!master_status || master_status.rows() != 1)
         throw Exception("Unable to get master status from MySQL.", ErrorCodes::LOGICAL_ERROR);

+    version = 0;  /// The data version restarts from zero whenever the master status is fetched.
     binlog_file = (*master_status.getByPosition(0).column)[0].safeGet();
     binlog_position = (*master_status.getByPosition(1).column)[0].safeGet();
     binlog_do_db = (*master_status.getByPosition(2).column)[0].safeGet();
@@ -52,7 +80,7 @@ void MasterStatusInfo::fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & conn
     executed_gtid_set = (*master_status.getByPosition(4).column)[0].safeGet();
 }

-bool MasterStatusInfo::checkBinlogFileExists(mysqlxx::PoolWithFailover::Entry & connection)
+bool MaterializeMetadata::checkBinlogFileExists(mysqlxx::PoolWithFailover::Entry & connection)
 {
     Block header{
         {std::make_shared(), "Log_name"},
@@ -73,53 +101,65 @@ bool MasterStatusInfo::checkBinlogFileExists(mysqlxx::PoolWithFailover::Entry &
     }
     return false;
 }
-void MasterStatusInfo::finishDump()
-{
-    WriteBufferFromFile out(persistent_path);
-    out << "Version:\t1\n"
-        << "Binlog File:\t" << binlog_file << "\nBinlog Position:\t" << binlog_position << "\nBinlog Do DB:\t" << binlog_do_db
-        << "\nBinlog Ignore DB:\t" << binlog_ignore_db << "\nExecuted GTID SET:\t" << executed_gtid_set;

-    out.next();
-    out.sync();
+void commitMetadata(const std::function & function, const String & persistent_tmp_path, const String & persistent_path)
+{   /// Runs `function`, then renames the temporary file over the persistent one; on any exception the temporary file is removed and the exception rethrown.
+    try
+    {
+        function();
+
+        Poco::File(persistent_tmp_path).renameTo(persistent_path);
+    }
+    catch (...)
+    {
+        Poco::File(persistent_tmp_path).remove();
+        throw;
+    }
 }

-void MasterStatusInfo::transaction(const MySQLReplication::Position & position, const std::function & fun)
+void MaterializeMetadata::transaction(const MySQLReplication::Position & position, const std::function & fun)
 {
     binlog_file = position.binlog_name;
     binlog_position = position.binlog_pos;

+    String persistent_tmp_path = persistent_path + ".tmp";
+
     {
-        Poco::File temp_file(persistent_path + ".temp");
-        if (temp_file.exists())
-            temp_file.remove();
+        /// No O_EXCL: a temporary file left by a crash or a failed write must be overwritten (O_TRUNC), as the replaced code did by removing it first; with O_EXCL every later call would fail on the leftover.
+        WriteBufferFromFile out(persistent_tmp_path, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_TRUNC | O_CREAT);
+
+        /// TSV format metadata file.
+        writeString("Version:\t1\n", out);
+        writeString("Binlog File:\t" + binlog_file + "\n", out);
+        writeString("Executed GTID:\t" + executed_gtid_set + "\n", out);
+        writeString("Binlog Position:\t" + toString(binlog_position) + "\n", out);
+        writeString("Data Version:\t" + toString(version) + "\n", out);
+
+        out.next();
+        out.sync();
+        out.close();
     }

-    WriteBufferFromFile out(persistent_path + ".temp");
-    out << "Version:\t1\n"
-        << "Binlog File:\t" << binlog_file << "\nBinlog Position:\t" << binlog_position << "\nBinlog Do DB:\t" << binlog_do_db
-        << "\nBinlog Ignore DB:\t" << binlog_ignore_db << "\nExecuted GTID SET:\t" << executed_gtid_set;
-    out.next();
-    out.sync();
-
-    fun();
-    Poco::File(persistent_path + ".temp").renameTo(persistent_path);
+    commitMetadata(fun, persistent_tmp_path, persistent_path);  /// Runs `fun`, then publishes the temporary file by rename.
 }

-MasterStatusInfo::MasterStatusInfo(mysqlxx::PoolWithFailover::Entry & connection, const String & path_, const String & database)
+MaterializeMetadata::MaterializeMetadata(mysqlxx::PoolWithFailover::Entry & connection, const String & path_, const String & database)
     : persistent_path(path_)
 {
     if (Poco::File(persistent_path).exists())
     {
-        ReadBufferFromFile in(persistent_path);
-        in >> "Version:\t1\n" >> "Binlog File:\t" >> binlog_file >> "\nBinlog Position:\t" >> binlog_position >> "\nBinlog Do DB:\t"
-            >> 
binlog_do_db >> "\nBinlog Ignore DB:\t" >> binlog_ignore_db >> "\nExecuted GTID SET:\t" >> executed_gtid_set; + ReadBufferFromFile in(persistent_path, DBMS_DEFAULT_BUFFER_SIZE); + assertString("Version:\t1\n", in); + assertString("Binlog File:\t", in); + readString(binlog_file, in); + assertString("Executed GTID:\t", in); + readString(executed_gtid_set, in); + assertString("Binlog Position:\t", in); + readIntText(binlog_position, in); + assertString("Data Version:\t", in); + readIntText(version, in); if (checkBinlogFileExists(connection)) - { - std::cout << "Load From File \n"; return; - } } bool locked_tables = false; @@ -134,9 +174,8 @@ MasterStatusInfo::MasterStatusInfo(mysqlxx::PoolWithFailover::Entry & connection connection->query("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ;").execute(); connection->query("START TRANSACTION /*!40100 WITH CONSISTENT SNAPSHOT */;").execute(); - need_dumping_tables = fetchTablesInDB(connection, database); + need_dumping_tables = fetchTablesCreateQuery(connection, database, fetchTablesInDB(connection, database)); connection->query("UNLOCK TABLES;").execute(); - /// TODO: 拉取建表语句, 解析并构建出表结构(列列表, 主键, 唯一索引, 分区键) } catch (...) 
{
diff --git a/src/Databases/MySQL/MasterStatusInfo.h b/src/Databases/MySQL/MaterializeMetadata.h
similarity index 72%
rename from src/Databases/MySQL/MasterStatusInfo.h
rename to src/Databases/MySQL/MaterializeMetadata.h
index d18caf4b5fe..ad2f77a2f42 100644
--- a/src/Databases/MySQL/MasterStatusInfo.h
+++ b/src/Databases/MySQL/MaterializeMetadata.h
@@ -8,7 +8,7 @@
 namespace DB
 {

-struct MasterStatusInfo
+struct MaterializeMetadata
 {
     const String persistent_path;

@@ -18,9 +18,8 @@ struct MasterStatusInfo
     String binlog_ignore_db;
     String executed_gtid_set;

-    std::vector need_dumping_tables;
-
-    void finishDump();
+    size_t version = 0;  /// Persisted as "Data Version" in the metadata file.
+    std::unordered_map need_dumping_tables;  /// Filled by the constructor from fetchTablesCreateQuery().

     void fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & connection);

@@ -28,9 +27,7 @@ struct MasterStatusInfo

     void transaction(const MySQLReplication::Position & position, const std::function & fun);

-    MasterStatusInfo(mysqlxx::PoolWithFailover::Entry & connection, const String & path, const String & database);
-
-
+    MaterializeMetadata(mysqlxx::PoolWithFailover::Entry & connection, const String & path, const String & database);
 };

 }
diff --git a/src/Databases/MySQL/MaterializeModeSettings.cpp b/src/Databases/MySQL/MaterializeModeSettings.cpp
new file mode 100644
index 00000000000..d8a25fa3bb9
--- /dev/null
+++ b/src/Databases/MySQL/MaterializeModeSettings.cpp
@@ -0,0 +1,42 @@
+#include 
+
+#include 
+#include 
+#include 
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+    extern const int BAD_ARGUMENTS;
+    extern const int UNKNOWN_SETTING;
+}
+
+IMPLEMENT_SETTINGS_COLLECTION(MaterializeModeSettings, LIST_OF_MATERIALIZE_MODE_SETTINGS)
+/// Applies the SETTINGS changes of the storage definition; an UNKNOWN_SETTING error is rethrown as BAD_ARGUMENTS with the engine name appended.
+void MaterializeModeSettings::loadFromQuery(ASTStorage & storage_def)
+{
+    if (storage_def.settings)
+    {
+        try
+        {
+            applyChanges(storage_def.settings->changes);
+        }
+        catch (Exception & e)
+        {
+            if (e.code() == ErrorCodes::UNKNOWN_SETTING)
+                throw Exception(e.message() + " for storage " + storage_def.engine->name, ErrorCodes::BAD_ARGUMENTS);
+            else
+                e.rethrow();
+        
} + } + else + { + auto settings_ast = std::make_shared(); + settings_ast->is_standalone = false; + storage_def.set(storage_def.settings, settings_ast); + } +} + +} diff --git a/src/Databases/MySQL/MaterializeModeSettings.h b/src/Databases/MySQL/MaterializeModeSettings.h new file mode 100644 index 00000000000..35c65253c87 --- /dev/null +++ b/src/Databases/MySQL/MaterializeModeSettings.h @@ -0,0 +1,28 @@ +#pragma once + +#include +#include + +namespace DB +{ + +class ASTStorage; + +/** Settings for the MySQL Database engine(materialize mode). + * Could be loaded from a CREATE DATABASE query (SETTINGS clause). + */ +struct MaterializeModeSettings : public SettingsCollection +{ +#define LIST_OF_MATERIALIZE_MODE_SETTINGS(M) \ + M(SettingBool, locality_data, false, "", 0) \ + M(SettingUInt64, max_rows_in_buffer, DEFAULT_BLOCK_SIZE, "", 0) \ + M(SettingUInt64, max_bytes_in_buffers, DBMS_DEFAULT_BUFFER_SIZE, "", 0) \ + M(SettingUInt64, max_flush_data_time, 1000, "", 0) \ + M(SettingUInt64, max_wait_time_when_mysql_unavailable, 1000, "", 0) \ + + DECLARE_SETTINGS_COLLECTION(LIST_OF_MATERIALIZE_MODE_SETTINGS) + + void loadFromQuery(ASTStorage & storage_def); +}; + +} diff --git a/src/Databases/MySQL/queryConvert.cpp b/src/Databases/MySQL/queryConvert.cpp index decbf8f3cb6..0414c654bd6 100644 --- a/src/Databases/MySQL/queryConvert.cpp +++ b/src/Databases/MySQL/queryConvert.cpp @@ -137,13 +137,16 @@ String getUniqueColumnName(NamesAndTypesList columns_name_and_type, const String String toCreateQuery(const MySQLTableStruct & table_struct, const Context & context) { /// TODO: settings - String create_version = getUniqueColumnName(table_struct.columns_name_and_type, "_create_version"); - String delete_version = getUniqueColumnName(table_struct.columns_name_and_type, "_delete_version"); + if (table_struct.primary_keys.empty()) + throw Exception("", ErrorCodes::NOT_IMPLEMENTED); + WriteBufferFromOwnString out; + String sign = getUniqueColumnName(table_struct.columns_name_and_type, 
"__sign"); + String version = getUniqueColumnName(table_struct.columns_name_and_type, "__version"); out << "CREATE TABLE " << (table_struct.if_not_exists ? "IF NOT EXISTS" : "") - << backQuoteIfNeed(table_struct.database_name) + "." << backQuoteIfNeed(table_struct.table_name) << "(" - << queryToString(InterpreterCreateQuery::formatColumns(table_struct.columns_name_and_type)) - << ", " << create_version << " UInt64, " << delete_version << " UInt64" << ") ENGINE = MergeTree()" + << backQuoteIfNeed(table_struct.database_name) + "." << backQuoteIfNeed(table_struct.table_name) + << "(" << queryToString(InterpreterCreateQuery::formatColumns(table_struct.columns_name_and_type)) + << ", " << sign << " Int8, " << version << " UInt64" << ") ENGINE = ReplacingMergeTree(" + version + ")" << " PARTITION BY " << queryToString(getFormattedPartitionByExpression(table_struct, context, 1000, 50000)) << " ORDER BY " << queryToString(getFormattedOrderByExpression(table_struct)); return out.str(); diff --git a/src/Parsers/ParserSetQuery.cpp b/src/Parsers/ParserSetQuery.cpp index 30d681cb126..8bfd426b03f 100644 --- a/src/Parsers/ParserSetQuery.cpp +++ b/src/Parsers/ParserSetQuery.cpp @@ -28,7 +28,11 @@ bool ParserSetQuery::parseNameValuePair(SettingChange & change, IParser::Pos & p if (!s_eq.ignore(pos, expected)) return false; - if (!value_p.parse(pos, value, expected)) + if (ParserKeyword("TRUE").ignore(pos, expected)) + value = std::make_shared(Field(UInt64(1))); + else if (ParserKeyword("FALSE").ignore(pos, expected)) + value = std::make_shared(Field(UInt64(0))); + else if (!value_p.parse(pos, value, expected)) return false; tryGetIdentifierNameInto(name, change.name); diff --git a/src/Storages/StorageMaterializeMySQL.cpp b/src/Storages/StorageMaterializeMySQL.cpp new file mode 100644 index 00000000000..6bcdc9bd3b8 --- /dev/null +++ b/src/Storages/StorageMaterializeMySQL.cpp @@ -0,0 +1,56 @@ +#include + +#include +#include + +#include + +namespace DB +{ + 
+StorageMaterializeMySQL::StorageMaterializeMySQL(const StoragePtr & nested_storage_)
+    : IStorage(nested_storage_->getStorageID()), nested_storage(nested_storage_)
+{   /// Exposes the nested storage's columns except the last two.
+    ColumnsDescription columns_desc;
+    const ColumnsDescription & nested_columns_desc = nested_storage->getColumns();
+
+    size_t index = 0;
+    auto iterator = nested_columns_desc.begin();
+    for (; index < nested_columns_desc.size() - 2; ++index, ++iterator)  /// NOTE(review): `size() - 2` wraps around if the nested storage has fewer than two columns - confirm that cannot happen.
+        columns_desc.add(*iterator);
+
+    setColumns(columns_desc);
+}
+/// Sets `final` on the first table expression of the query (when present), adds the nested header's second-to-last column to the requested columns if missing, then reads from the nested storage.
+Pipes StorageMaterializeMySQL::read(
+    const Names & column_names,
+    const SelectQueryInfo & query_info,
+    const Context & context,
+    QueryProcessingStage::Enum processed_stage,
+    size_t max_block_size,
+    unsigned int num_streams)
+{
+    if (ASTSelectQuery * select_query = query_info.query->as())
+    {
+        auto & tables_in_select_query = select_query->tables()->as();
+
+        if (!tables_in_select_query.children.empty())
+        {
+            auto & tables_element = tables_in_select_query.children[0]->as();
+
+            if (tables_element.table_expression)
+                tables_element.table_expression->as().final = true;
+        }
+    }
+
+    Names require_columns_name = column_names;
+    Block header = nested_storage->getSampleBlockNonMaterialized();
+    ColumnWithTypeAndName & sign_column = header.getByPosition(header.columns() - 2);  /// NOTE(review): the sign column is requested here, but no filtering on it is visible in this function - confirm where it is applied.
+
+    if (require_columns_name.end() == std::find(require_columns_name.begin(), require_columns_name.end(), sign_column.name))
+        require_columns_name.emplace_back(sign_column.name);
+
+    return nested_storage->read(require_columns_name, query_info, context, processed_stage, max_block_size, num_streams);
+}
+
+}
diff --git a/src/Storages/StorageMaterializeMySQL.h b/src/Storages/StorageMaterializeMySQL.h
new file mode 100644
index 00000000000..d88e72ed737
--- /dev/null
+++ b/src/Storages/StorageMaterializeMySQL.h
@@ -0,0 +1,28 @@
+#pragma once
+
+#include 
+
+namespace DB
+{
+/// Read-only view over a nested storage: hides its last two columns and reads through it.
+class StorageMaterializeMySQL final : public ext::shared_ptr_helper, public IStorage
+{
+    friend struct ext::shared_ptr_helper;
+public:
+    String getName() const override { return "MySQL"; }
+
+    bool supportsFinal() const override { return nested_storage->supportsFinal(); }
+    bool supportsSampling() const override { return nested_storage->supportsSampling(); }
+
+
+    StorageMaterializeMySQL(const StoragePtr & nested_storage_);
+
+    Pipes read(
+        const Names & column_names, const SelectQueryInfo & query_info,
+        const Context & context, QueryProcessingStage::Enum processed_stage, size_t max_block_size, unsigned num_streams) override;
+
+private:
+    StoragePtr nested_storage;  /// Storage that read() and the supports*() queries delegate to.
+};
+
+}