ISSUES-4006 refactor

This commit is contained in:
zhang2014 2020-06-22 11:12:05 +08:00
parent 0f75b2d0f0
commit 4a99ca0d80
14 changed files with 798 additions and 804 deletions

View File

@ -13,6 +13,7 @@
M(BackgroundMovePoolTask, "Number of active tasks in BackgroundProcessingPool for moves") \
M(BackgroundSchedulePoolTask, "Number of active tasks in BackgroundSchedulePool. This pool is used for periodic ReplicatedMergeTree tasks, like cleaning old data parts, altering data parts, replica re-initialization, etc.") \
M(BackgroundBufferFlushSchedulePoolTask, "Number of active tasks in BackgroundBufferFlushSchedulePool. This pool is used for periodic Buffer flushes") \
M(BackgroundMySQLSyncSchedulePoolTask, "Number of active tasks in BackgroundMySQLSyncSchedulePoolTask. This pool is used for MySQL Materialize Database sync.") \
M(BackgroundDistributedSchedulePoolTask, "Number of active tasks in BackgroundDistributedSchedulePool. This pool is used for distributed sends that is done in background.") \
M(CacheDictionaryUpdateQueueBatches, "Number of 'batches' (a set of keys) in update queue in CacheDictionaries.") \
M(CacheDictionaryUpdateQueueKeys, "Exact number of keys in update queue in CacheDictionaries.") \

View File

@ -125,6 +125,9 @@ void MySQLClient::ping()
void MySQLClient::startBinlogDump(UInt32 slave_id, String replicate_db, String binlog_file_name, UInt64 binlog_pos)
{
if (dump_thread)
return;
/// Set binlog checksum to CRC32.
String checksum = "CRC32";
writeCommand(Command::COM_QUERY, "SET @master_binlog_checksum = '" + checksum + "'");
@ -142,15 +145,33 @@ void MySQLClient::startBinlogDump(UInt32 slave_id, String replicate_db, String b
binlog_pos = binlog_pos < 4 ? 4 : binlog_pos;
BinlogDump binlog_dump(binlog_pos, binlog_file_name, slave_id);
packet_sender->sendPacket<BinlogDump>(binlog_dump, true);
}
BinlogEventPtr MySQLClient::readOneBinlogEvent()
{
while (true)
dump_thread.emplace([this]()
{
packet_sender->receivePacket(replication);
return replication.readOneEvent();
}
while (true)
{
try
{
packet_sender->receivePacket(replication);
events.push(std::make_pair(replication.readOneEvent(), replication.getPosition()));
}
catch(...)
{
tryLogCurrentException("MySQLClient");
/// TODO: maybe sleep?
}
}
});
}
BinlogEventPtr MySQLClient::readOneBinlogEvent(UInt64 milliseconds)
{
std::pair<BinlogEventPtr, Position> event;
if (!events.tryPop(event, milliseconds))
return {};
last_position = event.second;
return event.first;
}
}

View File

@ -11,6 +11,7 @@
#include <Common/DNSResolver.h>
#include <Common/Exception.h>
#include <Common/NetException.h>
#include <Common/ConcurrentBoundedQueue.h>
namespace DB
@ -35,8 +36,8 @@ public:
void ping();
void startBinlogDump(UInt32 slave_id, String replicate_db, String binlog_file_name, UInt64 binlog_pos);
BinlogEventPtr readOneBinlogEvent();
Position getPosition() const { return replication.getPosition(); }
BinlogEventPtr readOneBinlogEvent(UInt64 milliseconds = 0);
Position getPosition() const { return last_position; }
private:
String host;
@ -58,6 +59,9 @@ private:
std::unique_ptr<Poco::Net::StreamSocket> socket;
std::optional<Poco::Net::SocketAddress> address;
std::shared_ptr<PacketSender> packet_sender;
Position last_position;
std::optional<ThreadFromGlobalPool> dump_thread;
ConcurrentBoundedQueue<std::pair<BinlogEventPtr, Position>> events{1};
void handshake();
void registerSlaveOnMaster(UInt32 slave_id);

View File

@ -20,8 +20,8 @@
#if USE_MYSQL
# include <Core/MySQLClient.h>
# include <Databases/MySQL/DatabaseConnectionMySQL.h>
# include <Databases/MySQL/MaterializeMySQLSettings.h>
# include <Databases/MySQL/DatabaseMaterializeMySQL.h>
# include <Databases/MySQL/MaterializeModeSettings.h>
# include <Interpreters/evaluateConstantExpression.h>
# include <Common/parseAddress.h>
# include <mysqlxx/Pool.h>
@ -119,7 +119,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
const auto & [remote_host_name, remote_port] = parseAddress(host_name_and_port, 3306);
auto mysql_pool = mysqlxx::Pool(mysql_database_name, remote_host_name, mysql_user_name, mysql_user_password, remote_port);
auto materialize_mode_settings = std::make_unique<MaterializeModeSettings>();
auto materialize_mode_settings = std::make_unique<MaterializeMySQLSettings>();
if (engine_define->settings)
materialize_mode_settings->loadFromQuery(*engine_define);

View File

@ -1,198 +1,195 @@
#if !defined(ARCADIA_BUILD)
# include "config_core.h"
#endif
#if USE_MYSQL
#include <Databases/MySQL/DatabaseMaterializeMySQL.h>
# include <cstdlib>
# include <Columns/ColumnTuple.h>
# include <DataStreams/AddingVersionsBlockOutputStream.h>
# include <DataStreams/copyData.h>
# include <Databases/MySQL/queryConvert.h>
# include <Databases/MySQL/EventConsumer.h>
# include <Databases/MySQL/MaterializeMetadata.h>
# include <Formats/MySQLBlockInputStream.h>
# include <IO/ReadBufferFromString.h>
# include <Interpreters/Context.h>
# include <Interpreters/MySQL/CreateQueryVisitor.h>
# include <Interpreters/executeQuery.h>
# include <Parsers/parseQuery.h>
# include <Common/quoteString.h>
# include <Common/setThreadName.h>
# include <common/sleep.h>
#include <Databases/DatabaseOrdinary.h>
#include <Databases/MySQL/DatabaseMaterializeTablesIterator.h>
#include <Parsers/ASTCreateQuery.h>
#include <Storages/StorageMaterializeMySQL.h>
#include <Poco/Logger.h>
#include <Common/setThreadName.h>
namespace DB
{
static constexpr auto MYSQL_BACKGROUND_THREAD_NAME = "MySQLDBSync";
namespace ErrorCodes
{
extern const int INCORRECT_QUERY;
}
static inline BlockIO tryToExecuteQuery(const String & query_to_execute, const Context & context_, const String & comment)
{
try
{
Context context = context_;
context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
context.setCurrentQueryId(""); // generate random query_id
return executeQuery("/*" + comment + "*/ " + query_to_execute, context, true);
}
catch (...)
{
tryLogCurrentException("DatabaseMaterializeMySQL", "Query " + query_to_execute + " wasn't finished successfully");
throw;
}
LOG_DEBUG(&Logger::get("DatabaseMaterializeMySQL"), "Executed query: " << query_to_execute);
extern const int NOT_IMPLEMENTED;
}
DatabaseMaterializeMySQL::DatabaseMaterializeMySQL(
const Context & context, const String & database_name_, const String & metadata_path_
, const ASTStorage * database_engine_define_, const String & mysql_database_name_, mysqlxx::Pool && pool_
, MySQLClient && client_ , std::unique_ptr<MaterializeModeSettings> settings_)
: DatabaseMaterializeMySQLWrap(std::make_shared<DatabaseOrdinary>(database_name_, metadata_path_, context), database_engine_define_->clone(), "DatabaseMaterializeMySQL")
, global_context(context.getGlobalContext()), metadata_path(metadata_path_), mysql_database_name(mysql_database_name_)
, pool(std::move(pool_)), client(std::move(client_)), settings(std::move(settings_))
const Context & context, const String & database_name_, const String & metadata_path_, const IAST * database_engine_define_
, const String & mysql_database_name_, mysqlxx::Pool && pool_, MySQLClient && client_, std::unique_ptr<MaterializeMySQLSettings> settings_)
: IDatabase(database_name_), engine_define(database_engine_define_->clone())
, nested_database(std::make_shared<DatabaseOrdinary>(database_name_, metadata_path_, context))
, settings(std::move(settings_)), log(&Poco::Logger::get("DatabaseMaterializeMySQL"))
, materialize_thread(context, database_name_, mysql_database_name_, std::move(pool_), std::move(client_), settings.get())
{
/// TODO: 做简单的check, 失败即报错
scheduleSynchronized();
}
BlockOutputStreamPtr DatabaseMaterializeMySQL::getTableOutput(const String & table_name)
void DatabaseMaterializeMySQL::setException(const std::exception_ptr & exception_)
{
String with_database_table_name = backQuoteIfNeed(getDatabaseName()) + "." + backQuoteIfNeed(table_name);
BlockIO res = tryToExecuteQuery("INSERT INTO " + with_database_table_name + " VALUES", global_context, "");
if (!res.out)
throw Exception("LOGICAL ERROR:", ErrorCodes::LOGICAL_ERROR);
return res.out;
std::unique_lock<std::mutex> lock(mutex);
exception = exception_;
}
void DatabaseMaterializeMySQL::cleanOutdatedTables()
DatabasePtr DatabaseMaterializeMySQL::getNestedDatabase() const
{
auto ddl_guard = DatabaseCatalog::instance().getDDLGuard(database_name, "");
const DatabasePtr & clean_database = DatabaseCatalog::instance().getDatabase(database_name);
std::unique_lock<std::mutex> lock(mutex);
for (auto iterator = clean_database->getTablesIterator(); iterator->isValid(); iterator->next())
{
String table = backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(iterator->name());
String comment = String("Clean ") + table + " for dump mysql.";
tryToExecuteQuery("DROP TABLE " + table, global_context, comment);
}
if (exception)
std::rethrow_exception(exception);
return nested_database;
}
void DatabaseMaterializeMySQL::dumpDataForTables(mysqlxx::Pool::Entry & connection, MaterializeMetadata & master_info, const std::function<bool()> & is_cancelled)
ASTPtr DatabaseMaterializeMySQL::getCreateDatabaseQuery() const
{
auto iterator = master_info.need_dumping_tables.begin();
for (; iterator != master_info.need_dumping_tables.end() && !is_cancelled(); ++iterator)
{
const auto & table_name = iterator->first;
MySQLTableStruct table_struct = visitCreateQuery(iterator->second, global_context, database_name);
String comment = String("Dumping ") + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(table_name);
tryToExecuteQuery(toCreateQuery(table_struct, global_context), global_context, comment);
BlockOutputStreamPtr out = std::make_shared<AddingVersionsBlockOutputStream>(master_info.version, getTableOutput(table_name));
MySQLBlockInputStream input(connection, "SELECT * FROM " + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(table_name), out->getHeader(), DEFAULT_BLOCK_SIZE);
copyData(input, *out, is_cancelled);
}
const auto & create_query = std::make_shared<ASTCreateQuery>();
create_query->database = database_name;
create_query->set(create_query->storage, engine_define);
return create_query;
}
std::optional<MaterializeMetadata> DatabaseMaterializeMySQL::prepareSynchronized(std::unique_lock<std::mutex> & lock, const std::function<bool()> & is_cancelled)
{
while (!is_cancelled())
{
try
{
LOG_DEBUG(log, "Checking " + database_name + " database status.");
while (!is_cancelled && !DatabaseCatalog::instance().isDatabaseExist(database_name))
sync_cond.wait_for(lock, std::chrono::seconds(1));
LOG_DEBUG(log, database_name + " database status is OK.");
mysqlxx::PoolWithFailover::Entry connection = pool.get();
MaterializeMetadata metadata(connection, getMetadataPath() + "/.metadata", mysql_database_name);
if (!metadata.need_dumping_tables.empty())
{
metadata.transaction(Position(metadata.binlog_position, metadata.binlog_file), [&]()
{
cleanOutdatedTables();
dumpDataForTables(connection, metadata, is_cancelled);
});
}
client.connect();
client.startBinlogDump(std::rand(), mysql_database_name, metadata.binlog_file, metadata.binlog_position);
return metadata;
}
catch (mysqlxx::Exception & )
{
tryLogCurrentException(log);
/// Avoid busy loop when MySQL is not available.
sleepForMilliseconds(settings->max_wait_time_when_mysql_unavailable);
}
}
return {};
}
void DatabaseMaterializeMySQL::scheduleSynchronized()
{
background_thread_pool.scheduleOrThrowOnError([&]()
{
ThreadStatus thread_status;
setThreadName("MySQLDBSync");
std::unique_lock<std::mutex> lock(sync_mutex);
const auto quit_requested = [this] { return sync_quit.load(std::memory_order_relaxed); };
try
{
std::optional<MaterializeMetadata> metadata = prepareSynchronized(lock, quit_requested);
if (!quit_requested() && metadata)
{
EventConsumer consumer(getDatabaseName(), global_context, *metadata, *settings);
while (!quit_requested())
{
const auto & event = client.readOneBinlogEvent();
consumer.onEvent(event, client.getPosition());
}
}
}
catch(...)
{
setException(std::current_exception());
}
});
}
DatabaseMaterializeMySQL::~DatabaseMaterializeMySQL()
void DatabaseMaterializeMySQL::loadStoredObjects(Context & context, bool has_force_restore_data_flag)
{
try
{
if (!sync_quit)
{
{
sync_quit = true;
std::lock_guard<std::mutex> lock(sync_mutex);
}
sync_cond.notify_one();
background_thread_pool.wait();
}
LOG_DEBUG(log, "Loading MySQL nested database stored objects.");
getNestedDatabase()->loadStoredObjects(context, has_force_restore_data_flag);
LOG_DEBUG(log, "Loaded MySQL nested database stored objects.");
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log, "Cannot load MySQL nested database stored objects.");
throw;
}
}
void DatabaseMaterializeMySQL::shutdown()
{
getNestedDatabase()->shutdown();
}
#endif
bool DatabaseMaterializeMySQL::empty() const
{
return getNestedDatabase()->empty();
}
String DatabaseMaterializeMySQL::getDataPath() const
{
return getNestedDatabase()->getDataPath();
}
String DatabaseMaterializeMySQL::getMetadataPath() const
{
return getNestedDatabase()->getMetadataPath();
}
String DatabaseMaterializeMySQL::getTableDataPath(const String & table_name) const
{
return getNestedDatabase()->getTableDataPath(table_name);
}
String DatabaseMaterializeMySQL::getTableDataPath(const ASTCreateQuery & query) const
{
return getNestedDatabase()->getTableDataPath(query);
}
String DatabaseMaterializeMySQL::getObjectMetadataPath(const String & table_name) const
{
return getNestedDatabase()->getObjectMetadataPath(table_name);
}
UUID DatabaseMaterializeMySQL::tryGetTableUUID(const String & table_name) const
{
return getNestedDatabase()->tryGetTableUUID(table_name);
}
time_t DatabaseMaterializeMySQL::getObjectMetadataModificationTime(const String & name) const
{
return getNestedDatabase()->getObjectMetadataModificationTime(name);
}
void DatabaseMaterializeMySQL::createTable(const Context & context, const String & name, const StoragePtr & table, const ASTPtr & query)
{
if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
throw Exception("MySQL database in locality_data mode does not support create table.", ErrorCodes::NOT_IMPLEMENTED);
getNestedDatabase()->createTable(context, name, table, query);
}
void DatabaseMaterializeMySQL::dropTable(const Context & context, const String & name, bool no_delay)
{
if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
throw Exception("MySQL database in locality_data mode does not support drop table.", ErrorCodes::NOT_IMPLEMENTED);
getNestedDatabase()->dropTable(context, name, no_delay);
}
void DatabaseMaterializeMySQL::attachTable(const String & name, const StoragePtr & table, const String & relative_table_path)
{
if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
throw Exception("MySQL database in locality_data mode does not support attach table.", ErrorCodes::NOT_IMPLEMENTED);
getNestedDatabase()->attachTable(name, table, relative_table_path);
}
StoragePtr DatabaseMaterializeMySQL::detachTable(const String & name)
{
if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
throw Exception("MySQL database in locality_data mode does not support detach table.", ErrorCodes::NOT_IMPLEMENTED);
return getNestedDatabase()->detachTable(name);
}
void DatabaseMaterializeMySQL::renameTable(const Context & context, const String & name, IDatabase & to_database, const String & to_name, bool exchange)
{
if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
throw Exception("MySQL database in locality_data mode does not support rename table.", ErrorCodes::NOT_IMPLEMENTED);
getNestedDatabase()->renameTable(context, name, to_database, to_name, exchange);
}
void DatabaseMaterializeMySQL::alterTable(const Context & context, const StorageID & table_id, const StorageInMemoryMetadata & metadata)
{
if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
throw Exception("MySQL database in locality_data mode does not support alter table.", ErrorCodes::NOT_IMPLEMENTED);
getNestedDatabase()->alterTable(context, table_id, metadata);
}
bool DatabaseMaterializeMySQL::shouldBeEmptyOnDetach() const
{
return getNestedDatabase()->shouldBeEmptyOnDetach();
}
void DatabaseMaterializeMySQL::drop(const Context & context)
{
getNestedDatabase()->drop(context);
}
bool DatabaseMaterializeMySQL::isTableExist(const String & name, const Context & context) const
{
return getNestedDatabase()->isTableExist(name, context);
}
StoragePtr DatabaseMaterializeMySQL::tryGetTable(const String & name, const Context & context) const
{
if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
return std::make_shared<StorageMaterializeMySQL>(getNestedDatabase()->tryGetTable(name, context));
return getNestedDatabase()->tryGetTable(name, context);
}
DatabaseTablesIteratorPtr DatabaseMaterializeMySQL::getTablesIterator(const Context & context, const FilterByNameFunction & filter_by_table_name)
{
if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
{
DatabaseTablesIteratorPtr iterator = getNestedDatabase()->getTablesIterator(context, filter_by_table_name);
return std::make_unique<DatabaseMaterializeTablesIterator>(std::move(iterator));
}
return getNestedDatabase()->getTablesIterator(context, filter_by_table_name);
}
}

View File

@ -1,64 +1,80 @@
#pragma once
#include "config_core.h"
#if USE_MYSQL
# include <mutex>
# include <Core/MySQLClient.h>
# include <DataStreams/BlockIO.h>
# include <DataTypes/DataTypeString.h>
# include <DataTypes/DataTypesNumber.h>
# include <Databases/DatabaseOrdinary.h>
# include <Databases/IDatabase.h>
# include <Databases/MySQL/DatabaseMaterializeMySQLWrap.h>
# include <Databases/MySQL/MaterializeMetadata.h>
# include <Databases/MySQL/MaterializeModeSettings.h>
# include <Interpreters/MySQL/CreateQueryVisitor.h>
# include <Parsers/ASTCreateQuery.h>
# include <mysqlxx/Pool.h>
# include <mysqlxx/PoolWithFailover.h>
#include <mysqlxx/Pool.h>
#include <Core/MySQLClient.h>
#include <Databases/IDatabase.h>
#include <Databases/MySQL/MaterializeMySQLSettings.h>
#include <Databases/MySQL/MaterializeMySQLSyncThread.h>
namespace DB
{
class DatabaseMaterializeMySQL : public DatabaseMaterializeMySQLWrap
class DatabaseMaterializeMySQL : public IDatabase
{
public:
~DatabaseMaterializeMySQL() override;
DatabaseMaterializeMySQL(
const Context & context, const String & database_name_, const String & metadata_path_,
const ASTStorage * database_engine_define_, const String & mysql_database_name_, mysqlxx::Pool && pool_,
MySQLClient && client_, std::unique_ptr<MaterializeModeSettings> settings_);
const IAST * database_engine_define_, const String & mysql_database_name_, mysqlxx::Pool && pool_,
MySQLClient && client_, std::unique_ptr<MaterializeMySQLSettings> settings_);
String getEngineName() const override { return "MySQL"; }
void setException(const std::exception_ptr & exception);
protected:
ASTPtr engine_define;
DatabasePtr nested_database;
std::unique_ptr<MaterializeMySQLSettings> settings;
private:
const Context & global_context;
String metadata_path;
String mysql_database_name;
Poco::Logger * log;
MaterializeMySQLSyncThread materialize_thread;
mutable mysqlxx::Pool pool;
mutable MySQLClient client;
std::unique_ptr<MaterializeModeSettings> settings;
mutable std::mutex mutex;
std::exception_ptr exception;
void cleanOutdatedTables();
DatabasePtr getNestedDatabase() const;
void scheduleSynchronized();
public:
ASTPtr getCreateDatabaseQuery() const override;
BlockOutputStreamPtr getTableOutput(const String & table_name);
void loadStoredObjects(Context & context, bool has_force_restore_data_flag) override;
std::optional<MaterializeMetadata> prepareSynchronized(std::unique_lock<std::mutex> & lock, const std::function<bool()> & is_cancelled);
void shutdown() override;
void dumpDataForTables(mysqlxx::Pool::Entry & connection, MaterializeMetadata & master_info, const std::function<bool()> & is_cancelled);
bool empty() const override;
std::mutex sync_mutex;
std::atomic<bool> sync_quit{false};
std::condition_variable sync_cond;
ThreadPool background_thread_pool{1};
String getDataPath() const override;
String getTableDataPath(const String & table_name) const override;
String getTableDataPath(const ASTCreateQuery & query) const override;
UUID tryGetTableUUID(const String & table_name) const override;
void createTable(const Context & context, const String & name, const StoragePtr & table, const ASTPtr & query) override;
void dropTable(const Context & context, const String & name, bool no_delay) override;
void attachTable(const String & name, const StoragePtr & table, const String & relative_table_path) override;
StoragePtr detachTable(const String & name) override;
void renameTable(const Context & context, const String & name, IDatabase & to_database, const String & to_name, bool exchange) override;
void alterTable(const Context & context, const StorageID & table_id, const StorageInMemoryMetadata & metadata) override;
time_t getObjectMetadataModificationTime(const String & name) const override;
String getMetadataPath() const override;
String getObjectMetadataPath(const String & table_name) const override;
bool shouldBeEmptyOnDetach() const override;
void drop(const Context & context) override;
bool isTableExist(const String & name, const Context & context) const override;
StoragePtr tryGetTable(const String & name, const Context & context) const override;
DatabaseTablesIteratorPtr getTablesIterator(const Context & context, const FilterByNameFunction & filter_by_table_name) override;
};
}
#endif

View File

@ -1,188 +0,0 @@
#include <Databases/MySQL/DatabaseMaterializeMySQLWrap.h>
#include <Common/setThreadName.h>
#include <Parsers/ASTCreateQuery.h>
#include <Storages/StorageMaterializeMySQL.h>
#include <Databases/MySQL/DatabaseMaterializeTablesIterator.h>
namespace DB
{
static constexpr auto MYSQL_BACKGROUND_THREAD_NAME = "MySQLDBSync";
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
DatabaseMaterializeMySQLWrap::DatabaseMaterializeMySQLWrap(const DatabasePtr & nested_database_, const ASTPtr & database_engine_define_, const String & log_name)
: IDatabase(nested_database_->getDatabaseName()), nested_database(nested_database_), database_engine_define(database_engine_define_), log(&Logger::get(log_name))
{
}
void DatabaseMaterializeMySQLWrap::setException(const std::exception_ptr & exception_)
{
std::unique_lock<std::mutex> lock(mutex);
exception = exception_;
}
DatabasePtr DatabaseMaterializeMySQLWrap::getNestedDatabase() const
{
std::unique_lock<std::mutex> lock(mutex);
if (exception)
std::rethrow_exception(exception);
return nested_database;
}
ASTPtr DatabaseMaterializeMySQLWrap::getCreateDatabaseQuery() const
{
const auto & create_query = std::make_shared<ASTCreateQuery>();
create_query->database = database_name;
create_query->set(create_query->storage, database_engine_define);
return create_query;
}
void DatabaseMaterializeMySQLWrap::loadStoredObjects(Context & context, bool has_force_restore_data_flag)
{
try
{
LOG_DEBUG(log, "Loading MySQL nested database stored objects.");
getNestedDatabase()->loadStoredObjects(context, has_force_restore_data_flag);
LOG_DEBUG(log, "Loaded MySQL nested database stored objects.");
}
catch (...)
{
tryLogCurrentException(log, "Cannot load MySQL nested database stored objects.");
throw;
}
}
void DatabaseMaterializeMySQLWrap::shutdown()
{
getNestedDatabase()->shutdown();
}
bool DatabaseMaterializeMySQLWrap::empty() const
{
return getNestedDatabase()->empty();
}
String DatabaseMaterializeMySQLWrap::getDataPath() const
{
return getNestedDatabase()->getDataPath();
}
String DatabaseMaterializeMySQLWrap::getMetadataPath() const
{
return getNestedDatabase()->getMetadataPath();
}
String DatabaseMaterializeMySQLWrap::getTableDataPath(const String & table_name) const
{
return getNestedDatabase()->getTableDataPath(table_name);
}
String DatabaseMaterializeMySQLWrap::getTableDataPath(const ASTCreateQuery & query) const
{
return getNestedDatabase()->getTableDataPath(query);
}
String DatabaseMaterializeMySQLWrap::getObjectMetadataPath(const String & table_name) const
{
return getNestedDatabase()->getObjectMetadataPath(table_name);
}
UUID DatabaseMaterializeMySQLWrap::tryGetTableUUID(const String & table_name) const
{
return getNestedDatabase()->tryGetTableUUID(table_name);
}
time_t DatabaseMaterializeMySQLWrap::getObjectMetadataModificationTime(const String & name) const
{
return getNestedDatabase()->getObjectMetadataModificationTime(name);
}
void DatabaseMaterializeMySQLWrap::createTable(const Context & context, const String & name, const StoragePtr & table, const ASTPtr & query)
{
if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
throw Exception("MySQL database in locality_data mode does not support create table.", ErrorCodes::NOT_IMPLEMENTED);
getNestedDatabase()->createTable(context, name, table, query);
}
void DatabaseMaterializeMySQLWrap::dropTable(const Context & context, const String & name, bool no_delay)
{
if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
throw Exception("MySQL database in locality_data mode does not support drop table.", ErrorCodes::NOT_IMPLEMENTED);
getNestedDatabase()->dropTable(context, name, no_delay);
}
void DatabaseMaterializeMySQLWrap::attachTable(const String & name, const StoragePtr & table, const String & relative_table_path)
{
if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
throw Exception("MySQL database in locality_data mode does not support attach table.", ErrorCodes::NOT_IMPLEMENTED);
getNestedDatabase()->attachTable(name, table, relative_table_path);
}
StoragePtr DatabaseMaterializeMySQLWrap::detachTable(const String & name)
{
if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
throw Exception("MySQL database in locality_data mode does not support detach table.", ErrorCodes::NOT_IMPLEMENTED);
return getNestedDatabase()->detachTable(name);
}
void DatabaseMaterializeMySQLWrap::renameTable(const Context & context, const String & name, IDatabase & to_database, const String & to_name, bool exchange)
{
if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
throw Exception("MySQL database in locality_data mode does not support rename table.", ErrorCodes::NOT_IMPLEMENTED);
getNestedDatabase()->renameTable(context, name, to_database, to_name, exchange);
}
void DatabaseMaterializeMySQLWrap::alterTable(const Context & context, const StorageID & table_id, const StorageInMemoryMetadata & metadata)
{
if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
throw Exception("MySQL database in locality_data mode does not support alter table.", ErrorCodes::NOT_IMPLEMENTED);
getNestedDatabase()->alterTable(context, table_id, metadata);
}
bool DatabaseMaterializeMySQLWrap::shouldBeEmptyOnDetach() const
{
return getNestedDatabase()->shouldBeEmptyOnDetach();
}
void DatabaseMaterializeMySQLWrap::drop(const Context & context)
{
getNestedDatabase()->drop(context);
}
bool DatabaseMaterializeMySQLWrap::isTableExist(const String & name) const
{
return getNestedDatabase()->isTableExist(name);
}
StoragePtr DatabaseMaterializeMySQLWrap::tryGetTable(const String & name) const
{
if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
return std::make_shared<StorageMaterializeMySQL>(getNestedDatabase()->tryGetTable(name));
return getNestedDatabase()->tryGetTable(name);
}
DatabaseTablesIteratorPtr DatabaseMaterializeMySQLWrap::getTablesIterator(const FilterByNameFunction & filter_by_table_name)
{
if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
{
DatabaseTablesIteratorPtr iterator = getNestedDatabase()->getTablesIterator(filter_by_table_name);
return std::make_unique<DatabaseMaterializeTablesIterator>(std::move(iterator));
}
return getNestedDatabase()->getTablesIterator(filter_by_table_name);
}
}

View File

@ -1,70 +0,0 @@
#pragma once
#include <Databases/IDatabase.h>
namespace DB
{
class DatabaseMaterializeMySQLWrap : public IDatabase
{
public:
ASTPtr getCreateDatabaseQuery() const override;
void loadStoredObjects(Context & context, bool has_force_restore_data_flag) override;
DatabaseMaterializeMySQLWrap(const DatabasePtr & nested_database_, const ASTPtr & database_engine_define_, const String & log_name);
protected:
DatabasePtr nested_database;
ASTPtr database_engine_define;
Poco::Logger * log;
mutable std::mutex mutex;
std::exception_ptr exception;
DatabasePtr getNestedDatabase() const;
void setException(const std::exception_ptr & exception);
public:
void shutdown() override;
bool empty() const override;
String getDataPath() const override;
String getTableDataPath(const String & table_name) const override;
String getTableDataPath(const ASTCreateQuery & query) const override;
UUID tryGetTableUUID(const String & table_name) const override;
void createTable(const Context & context, const String & name, const StoragePtr & table, const ASTPtr & query) override;
void dropTable(const Context & context, const String & name, bool no_delay) override;
void attachTable(const String & name, const StoragePtr & table, const String & relative_table_path) override;
StoragePtr detachTable(const String & name) override;
void renameTable(const Context & context, const String & name, IDatabase & to_database, const String & to_name, bool exchange) override;
void alterTable(const Context & context, const StorageID & table_id, const StorageInMemoryMetadata & metadata) override;
time_t getObjectMetadataModificationTime(const String & name) const override;
String getMetadataPath() const override;
String getObjectMetadataPath(const String & table_name) const override;
bool shouldBeEmptyOnDetach() const override;
void drop(const Context & context) override;
bool isTableExist(const String & name) const override;
StoragePtr tryGetTable(const String & name) const override;
DatabaseTablesIteratorPtr getTablesIterator(const FilterByNameFunction & filter_by_table_name) override;
};
}

View File

@ -1,261 +0,0 @@
#include <Databases/MySQL/EventConsumer.h>
#include <Columns/ColumnsNumber.h>
#include <DataStreams/copyData.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Interpreters/executeQuery.h>
#include <Storages/IStorage.h>
#include <Storages/StorageMergeTree.h>
#include <Common/assert_cast.h>
#include <Common/setThreadName.h>
namespace DB
{
using namespace MySQLReplication;
EventConsumer::~EventConsumer()
{
if (!quit && !background_exception)
{
{
quit = true;
std::lock_guard<std::mutex> lock(mutex);
}
cond.notify_one();
background_thread_pool.wait();
}
}
EventConsumer::EventConsumer(
const String & database_, const Context & context_, MaterializeMetadata & metadata_, MaterializeModeSettings & settings_)
: metadata(metadata_), context(context_), settings(settings_), database(database_), prev_version(metadata.version)
{
background_thread_pool.scheduleOrThrowOnError([&]()
{
ThreadStatus thread_status;
setThreadName("MySQLDBSync");
std::unique_lock<std::mutex> lock(mutex);
const auto quit_requested = [this] { return quit.load(std::memory_order_relaxed); };
while (!quit_requested() && !background_exception)
{
if (!buffers.empty() && total_bytes_in_buffers)
flushBuffers();
cond.wait_for(lock, std::chrono::milliseconds(settings.max_flush_data_time), quit_requested);
}
});
}
void EventConsumer::onWriteData(const String & table_name, const std::vector<Field> & rows_data)
{
BufferPtr buffer = getTableBuffer(table_name);
size_t prev_bytes = buffer->data.bytes();
for (size_t column = 0; column < buffer->data.columns() - 2; ++column)
{
MutableColumnPtr col_to = IColumn::mutate(std::move(buffer->data.getByPosition(column).column));
for (size_t index = 0; index < rows_data.size(); ++index)
col_to->insert(DB::get<const Tuple &>(rows_data[index])[column]);
}
fillSignColumnsAndMayFlush(buffer->data, 1, ++metadata.version, rows_data.size(), prev_bytes);
}
static inline bool differenceSortingKeys(const Tuple & row_old_data, const Tuple & row_new_data, const std::vector<size_t> sorting_columns_index)
{
for (const auto & sorting_column_index : sorting_columns_index)
if (row_old_data[sorting_column_index] != row_new_data[sorting_column_index])
return true;
return false;
}
void EventConsumer::onUpdateData(const String & table_name, const std::vector<Field> & rows_data)
{
if (rows_data.size() % 2 != 0)
throw Exception("LOGICAL ERROR: ", ErrorCodes::LOGICAL_ERROR);
BufferPtr buffer = getTableBuffer(table_name);
size_t prev_bytes = buffer->data.bytes();
std::vector<bool> difference_sorting_keys_mark(rows_data.size() / 2);
for (size_t index = 0; index < rows_data.size(); index += 2)
difference_sorting_keys_mark.emplace_back(differenceSortingKeys(
DB::get<const Tuple &>(rows_data[index]), DB::get<const Tuple &>(rows_data[index + 1]), buffer->sorting_columns_index));
for (size_t column = 0; column < buffer->data.columns() - 2; ++column)
{
MutableColumnPtr col_to = IColumn::mutate(std::move(buffer->data.getByPosition(column).column));
for (size_t index = 0; index < rows_data.size(); index += 2)
{
if (likely(!difference_sorting_keys_mark[index / 2]))
col_to->insert(DB::get<const Tuple &>(rows_data[index + 1])[column]);
else
{
/// If the sorting keys is modified, we should cancel the old data, but this should not happen frequently
col_to->insert(DB::get<const Tuple &>(rows_data[index])[column]);
col_to->insert(DB::get<const Tuple &>(rows_data[index + 1])[column]);
}
}
}
MutableColumnPtr sign_mutable_column = IColumn::mutate(std::move(buffer->data.getByPosition(buffer->data.columns() - 2).column));
MutableColumnPtr version_mutable_column = IColumn::mutate(std::move(buffer->data.getByPosition(buffer->data.columns() - 1).column));
ColumnInt8::Container & sign_column_data = assert_cast<ColumnInt8 &>(*sign_mutable_column).getData();
ColumnUInt64::Container & version_column_data = assert_cast<ColumnUInt64 &>(*version_mutable_column).getData();
UInt64 new_version = ++metadata.version;
for (size_t index = 0; index < rows_data.size(); index += 2)
{
if (likely(!difference_sorting_keys_mark[index / 2]))
{
sign_column_data.emplace_back(1);
version_column_data.emplace_back(new_version);
}
else
{
/// If the sorting keys is modified, we should cancel the old data, but this should not happen frequently
sign_column_data.emplace_back(-1);
sign_column_data.emplace_back(1);
version_column_data.emplace_back(new_version);
version_column_data.emplace_back(new_version);
}
}
total_bytes_in_buffers += (buffer->data.bytes() - prev_bytes);
if (buffer->data.rows() >= settings.max_rows_in_buffer || total_bytes_in_buffers >= settings.max_bytes_in_buffers)
flushBuffers();
}
void EventConsumer::onDeleteData(const String & table_name, const std::vector<Field> & rows_data)
{
BufferPtr buffer = getTableBuffer(table_name);
size_t prev_bytes = buffer->data.bytes();
for (size_t column = 0; column < buffer->data.columns() - 2; ++column)
{
MutableColumnPtr col_to = IColumn::mutate(std::move(buffer->data.getByPosition(column).column));
for (size_t index = 0; index < rows_data.size(); ++index)
col_to->insert(DB::get<const Tuple &>(rows_data[index])[column]);
}
fillSignColumnsAndMayFlush(buffer->data, -1, ++metadata.version, rows_data.size(), prev_bytes);
}
EventConsumer::BufferPtr EventConsumer::getTableBuffer(const String & table_name)
{
if (buffers.find(table_name) == buffers.end())
{
StoragePtr storage = DatabaseCatalog::instance().getDatabase(database)->tryGetTable(table_name, context);
buffers[table_name] = std::make_shared<Buffer>();
buffers[table_name]->data = storage->getSampleBlockNonMaterialized();
if (StorageMergeTree * table_merge_tree = dynamic_cast<StorageMergeTree *>(storage.get()))
{
Names required_for_sorting_key = table_merge_tree->getColumnsRequiredForSortingKey();
for (const auto & required_name_for_sorting_key : required_for_sorting_key)
buffers[table_name]->sorting_columns_index.emplace_back(
buffers[table_name]->data.getPositionByName(required_name_for_sorting_key));
}
}
return buffers[table_name];
}
void EventConsumer::onEvent(const BinlogEventPtr & receive_event, const MySQLReplication::Position & position)
{
std::unique_lock<std::mutex> lock(mutex);
if (background_exception)
background_thread_pool.wait();
last_position = position;
if (receive_event->type() == MYSQL_WRITE_ROWS_EVENT)
{
WriteRowsEvent & write_rows_event = static_cast<WriteRowsEvent &>(*receive_event);
write_rows_event.dump();
onWriteData(write_rows_event.table, write_rows_event.rows);
}
else if (receive_event->type() == MYSQL_UPDATE_ROWS_EVENT)
{
UpdateRowsEvent & update_rows_event = static_cast<UpdateRowsEvent &>(*receive_event);
update_rows_event.dump();
onUpdateData(update_rows_event.table, update_rows_event.rows);
}
else if (receive_event->type() == MYSQL_DELETE_ROWS_EVENT)
{
DeleteRowsEvent & delete_rows_event = static_cast<DeleteRowsEvent &>(*receive_event);
delete_rows_event.dump();
onDeleteData(delete_rows_event.table, delete_rows_event.rows);
}
else if (receive_event->type() == MYSQL_QUERY_EVENT)
{
/// TODO: 识别, 查看是否支持的DDL, 支持的话立即刷新当前的数据, 然后执行DDL.
// flush_function();
/// TODO: 直接使用Interpreter执行即可
}
}
void EventConsumer::fillSignColumnsAndMayFlush(Block & data, Int8 sign_value, UInt64 version_value, size_t fill_size, size_t prev_bytes)
{
MutableColumnPtr sign_mutable_column = IColumn::mutate(std::move(data.getByPosition(data.columns() - 2).column));
MutableColumnPtr version_mutable_column = IColumn::mutate(std::move(data.getByPosition(data.columns() - 1).column));
ColumnInt8::Container & sign_column_data = assert_cast<ColumnInt8 &>(*sign_mutable_column).getData();
ColumnUInt64::Container & version_column_data = assert_cast<ColumnUInt64 &>(*version_mutable_column).getData();
for (size_t index = 0; index < fill_size; ++index)
{
sign_column_data.emplace_back(sign_value);
version_column_data.emplace_back(version_value);
}
total_bytes_in_buffers += (data.bytes() - prev_bytes);
if (data.rows() >= settings.max_rows_in_buffer || total_bytes_in_buffers >= settings.max_bytes_in_buffers)
flushBuffers();
}
void EventConsumer::flushBuffers()
{
/// TODO: 事务保证
try
{
for (auto & table_name_and_buffer : buffers)
{
const String & table_name = table_name_and_buffer.first;
BufferPtr & buffer = table_name_and_buffer.second;
Context query_context = context;
query_context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
query_context.setCurrentQueryId(""); // generate random query_id
String with_database_table_name = backQuoteIfNeed(database) + "." + backQuoteIfNeed(table_name);
BlockIO res = executeQuery("INSERT INTO " + with_database_table_name + " VALUES", query_context, true);
OneBlockInputStream input(buffer->data);
copyData(input, *res.out);
}
buffers.clear();
total_bytes_in_buffers = 0;
prev_version = metadata.version;
}
catch(...)
{
buffers.clear();
total_bytes_in_buffers = 0;
metadata.version = prev_version;
background_exception = true;
throw;
}
}
}

View File

@ -1,63 +0,0 @@
#pragma once
#include <unordered_map>
#include <Columns/IColumn.h>
#include <Core/Field.h>
#include <Core/MySQLReplication.h>
#include <Databases/IDatabase.h>
#include <Databases/MySQL/MaterializeModeSettings.h>
#include <boost/noncopyable.hpp>
#include <Common/ThreadPool.h>
#include "MaterializeMetadata.h"
namespace DB
{
class EventConsumer : private boost::noncopyable
{
public:
~EventConsumer();
void onEvent(const MySQLReplication::BinlogEventPtr & event, const MySQLReplication::Position & position);
EventConsumer(const String & database_, const Context & context, MaterializeMetadata & metadata_, MaterializeModeSettings & settings_);
private:
MaterializeMetadata & metadata;
const Context & context;
const MaterializeModeSettings & settings;
String database;
size_t prev_version;
size_t total_bytes_in_buffers = 0;
MySQLReplication::Position last_position;
struct Buffer
{
Block data;
std::vector<size_t> sorting_columns_index;
};
using BufferPtr = std::shared_ptr<Buffer>;
std::unordered_map<String, BufferPtr> buffers;
void flushBuffers();
BufferPtr getTableBuffer(const String & table_name);
void onWriteData(const std::string & table_name, const std::vector<Field> & rows_data);
void onUpdateData(const std::string & table_name, const std::vector<Field> & rows_data);
void onDeleteData(const std::string & table_name, const std::vector<Field> & rows_data);
void fillSignColumnsAndMayFlush(Block & data, Int8 sign_value, UInt64 version_value, size_t fill_size, size_t prev_bytes);
mutable std::mutex mutex;
std::condition_variable cond;
std::atomic_bool quit = false;
std::atomic_bool background_exception = false;
ThreadPool background_thread_pool{1};
};
}

View File

@ -1,4 +1,4 @@
#include <Databases/MySQL/MaterializeModeSettings.h>
#include <Databases/MySQL/MaterializeMySQLSettings.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTCreateQuery.h>
@ -13,9 +13,9 @@ namespace ErrorCodes
extern const int UNKNOWN_SETTING;
}
IMPLEMENT_SETTINGS_COLLECTION(MaterializeModeSettings, LIST_OF_MATERIALIZE_MODE_SETTINGS)
IMPLEMENT_SETTINGS_COLLECTION(MaterializeMySQLSettings, LIST_OF_MATERIALIZE_MODE_SETTINGS)
void MaterializeModeSettings::loadFromQuery(ASTStorage & storage_def)
void MaterializeMySQLSettings::loadFromQuery(ASTStorage & storage_def)
{
if (storage_def.settings)
{

View File

@ -11,11 +11,13 @@ class ASTStorage;
/** Settings for the MySQL Database engine(materialize mode).
* Could be loaded from a CREATE DATABASE query (SETTINGS clause).
*/
struct MaterializeModeSettings : public SettingsCollection<MaterializeModeSettings>
struct MaterializeMySQLSettings : public SettingsCollection<MaterializeMySQLSettings>
{
#define LIST_OF_MATERIALIZE_MODE_SETTINGS(M) \
M(SettingBool, locality_data, false, "", 0) \
M(SettingUInt64, max_rows_in_buffer, DEFAULT_BLOCK_SIZE, "", 0) \
M(SettingUInt64, max_bytes_in_buffer, DBMS_DEFAULT_BUFFER_SIZE, "", 0) \
M(SettingUInt64, max_rows_in_buffers, DEFAULT_BLOCK_SIZE, "", 0) \
M(SettingUInt64, max_bytes_in_buffers, DBMS_DEFAULT_BUFFER_SIZE, "", 0) \
M(SettingUInt64, max_flush_data_time, 1000, "", 0) \
M(SettingUInt64, max_wait_time_when_mysql_unavailable, 1000, "", 0) \

View File

@ -0,0 +1,444 @@
#if !defined(ARCADIA_BUILD)
# include "config_core.h"
#endif
#if USE_MYSQL
#include <Databases/MySQL/MaterializeMySQLSyncThread.h>
# include <cstdlib>
# include <Columns/ColumnTuple.h>
# include <DataStreams/AddingVersionsBlockOutputStream.h>
# include <DataStreams/OneBlockInputStream.h>
# include <DataStreams/copyData.h>
# include <Databases/MySQL/MaterializeMetadata.h>
# include <Databases/MySQL/queryConvert.h>
# include <Formats/MySQLBlockInputStream.h>
# include <IO/ReadBufferFromString.h>
# include <Interpreters/Context.h>
# include <Interpreters/MySQL/CreateQueryVisitor.h>
# include <Interpreters/executeQuery.h>
# include <Parsers/parseQuery.h>
# include <Storages/StorageMergeTree.h>
# include <Common/CurrentMetrics.h>
# include <Common/quoteString.h>
# include <Common/setThreadName.h>
# include <common/sleep.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_QUERY;
}
namespace CurrentMetrics
{
extern const Metric MemoryTracking;
extern const Metric BackgroundMySQLSyncSchedulePoolTask;
}
static BlockIO tryToExecuteQuery(const String & query_to_execute, const Context & context_, const String & comment)
{
try
{
Context context = context_;
context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
context.setCurrentQueryId(""); // generate random query_id
return executeQuery("/*" + comment + "*/ " + query_to_execute, context, true);
}
catch (...)
{
tryLogCurrentException("MaterializeMySQLSyncThread", "Query " + query_to_execute + " wasn't finished successfully");
throw;
}
// LOG_DEBUG(&Logger::get("MaterializeMySQLSyncThread"), "Executed query: " + query_to_execute);
}
static inline DatabaseMaterializeMySQL & getDatabase(const String & database_name)
{
DatabasePtr database = DatabaseCatalog::instance().getDatabase(database_name);
if (DatabaseMaterializeMySQL * database_materialize = typeid_cast<DatabaseMaterializeMySQL *>(database.get()))
return *database_materialize;
throw Exception("", ErrorCodes::LOGICAL_ERROR);
}
MaterializeMySQLSyncThread::~MaterializeMySQLSyncThread()
{
try
{
if (!sync_quit)
{
{
sync_quit = true;
std::lock_guard<std::mutex> lock(sync_mutex);
}
sync_cond.notify_one();
// sync_task_handler->deactivate();
// flush_task_handler->deactivate();
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
MaterializeMySQLSyncThread::MaterializeMySQLSyncThread(
const Context & context, const String & database_name_, const String & mysql_database_name_,
mysqlxx::Pool && pool_, MySQLClient && client_, MaterializeMySQLSettings * settings_)
: global_context(context), database_name(database_name_), mysql_database_name(mysql_database_name_)
, pool(std::move(pool_)), client(std::move(client_)), settings(settings_)
{
/// TODO: 做简单的check, 失败即报错
startSynchronization();
}
/*MaterializeMySQLSyncThread::MaterializeMySQLSyncThread(
const Context & context, const String & database_name_, const String & metadata_path_
, const ASTStorage * database_engine_define_, const String & mysql_database_name_, mysqlxx::Pool && pool_
, MySQLClient && client_ , std::unique_ptr<MaterializeMySQLSettings> settings_)
: DatabaseMaterializeMySQL(std::make_shared<DatabaseOrdinary>(database_name_, metadata_path_, context), database_engine_define_->clone(), "MaterializeMySQLSyncThread")
, global_context(context.getGlobalContext()), metadata_path(metadata_path_), mysql_database_name(mysql_database_name_)
, pool(std::move(pool_)), client(std::move(client_)), settings(std::move(settings_))
{
}*/
void MaterializeMySQLSyncThread::synchronization()
{
try
{
if (std::optional<MaterializeMetadata> metadata = prepareSynchronized())
{
Stopwatch watch;
Buffers buffers(database_name);
while (!isCancelled())
{
/// TODO: add gc task for `sign = -1`(use alter table delete, execute by interval. need final state)
UInt64 max_flush_time = settings->max_flush_data_time;
BinlogEventPtr binlog_event = client.readOneBinlogEvent(std::max(UInt64(1), max_flush_time - watch.elapsedMilliseconds()));
{
std::unique_lock<std::mutex> lock(sync_mutex);
if (binlog_event)
onEvent(buffers, binlog_event, *metadata);
if (watch.elapsedMilliseconds() > max_flush_time || buffers.checkThresholds(
settings->max_rows_in_buffer, settings->max_bytes_in_buffer,
settings->max_rows_in_buffers, settings->max_bytes_in_buffers)
)
{
watch.restart();
flushBuffersData(buffers, *metadata);
}
}
}
}
}
catch (...)
{
/// TODO: set
getDatabase(database_name).setException(std::current_exception());
}
}
void MaterializeMySQLSyncThread::startSynchronization()
{
if (!background_thread_pool->joinable())
throw Exception("", ErrorCodes::LOGICAL_ERROR);
background_thread_pool = std::make_unique<ThreadFromGlobalPool>([this]() { synchronization(); });
}
static inline void cleanOutdatedTables(const String & database_name, const Context & context)
{
auto ddl_guard = DatabaseCatalog::instance().getDDLGuard(database_name, "");
const DatabasePtr & clean_database = DatabaseCatalog::instance().getDatabase(database_name);
for (auto iterator = clean_database->getTablesIterator(context); iterator->isValid(); iterator->next())
{
String table = backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(iterator->name());
String comment = String("Clean ") + table + " for dump mysql.";
tryToExecuteQuery("DROP TABLE " + table, context, comment);
}
}
static inline BlockOutputStreamPtr getTableOutput(const String & database_name, const String & table_name, const Context & context)
{
String with_database_table_name = backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(table_name);
BlockIO res = tryToExecuteQuery("INSERT INTO " + with_database_table_name + " VALUES", context, "");
if (!res.out)
throw Exception("LOGICAL ERROR:", ErrorCodes::LOGICAL_ERROR);
return res.out;
}
static inline void dumpDataForTables(
mysqlxx::Pool::Entry & connection, MaterializeMetadata & master_info,
const String & database_name, const String & mysql_database_name,
const Context & context, const std::function<bool()> & is_cancelled)
{
auto iterator = master_info.need_dumping_tables.begin();
for (; iterator != master_info.need_dumping_tables.end() && !is_cancelled(); ++iterator)
{
const auto & table_name = iterator->first;
MySQLTableStruct table_struct = visitCreateQuery(iterator->second, context, database_name);
String comment = String("Dumping ") + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(table_name);
tryToExecuteQuery(toCreateQuery(table_struct, context), context, comment);
BlockOutputStreamPtr out = std::make_shared<AddingVersionsBlockOutputStream>(master_info.version, getTableOutput(database_name, table_name, context));
MySQLBlockInputStream input(
connection, "SELECT * FROM " + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(table_name),
out->getHeader(), DEFAULT_BLOCK_SIZE);
copyData(input, *out, is_cancelled);
}
}
std::optional<MaterializeMetadata> MaterializeMySQLSyncThread::prepareSynchronized()
{
std::unique_lock<std::mutex> lock(sync_mutex);
while (!isCancelled())
{
try
{
LOG_DEBUG(log, "Checking database status.");
while (!isCancelled() && !DatabaseCatalog::instance().isDatabaseExist(database_name))
sync_cond.wait_for(lock, std::chrono::seconds(1));
LOG_DEBUG(log, "Database status is OK.");
mysqlxx::PoolWithFailover::Entry connection = pool.get();
MaterializeMetadata metadata(connection, getDatabase(database_name).getMetadataPath() + "/.metadata", mysql_database_name);
if (!metadata.need_dumping_tables.empty())
{
metadata.transaction(Position(metadata.binlog_position, metadata.binlog_file), [&]()
{
cleanOutdatedTables(database_name, global_context);
dumpDataForTables(connection, metadata, database_name, mysql_database_name, global_context, [this] { return isCancelled(); });
});
}
client.connect();
client.startBinlogDump(std::rand(), mysql_database_name, metadata.binlog_file, metadata.binlog_position);
return metadata;
}
catch (mysqlxx::Exception & )
{
tryLogCurrentException(log);
/// Avoid busy loop when MySQL is not available.
sleepForMilliseconds(settings->max_wait_time_when_mysql_unavailable);
}
}
return {};
}
void MaterializeMySQLSyncThread::flushBuffersData(Buffers & buffers, MaterializeMetadata & metadata)
{
metadata.transaction(client.getPosition(), [&]() { buffers.commit(metadata, global_context); });
}
static inline void fillSignAndVersionColumnsData(Block & data, Int8 sign_value, UInt64 version_value, size_t fill_size)
{
MutableColumnPtr sign_mutable_column = IColumn::mutate(std::move(data.getByPosition(data.columns() - 2).column));
MutableColumnPtr version_mutable_column = IColumn::mutate(std::move(data.getByPosition(data.columns() - 1).column));
ColumnInt8::Container & sign_column_data = assert_cast<ColumnInt8 &>(*sign_mutable_column).getData();
ColumnUInt64::Container & version_column_data = assert_cast<ColumnUInt64 &>(*version_mutable_column).getData();
for (size_t index = 0; index < fill_size; ++index)
{
sign_column_data.emplace_back(sign_value);
version_column_data.emplace_back(version_value);
}
}
template <Int8 sign>
static size_t onWriteOrDeleteData(const std::vector<Field> & rows_data, Block & buffer, size_t version)
{
size_t prev_bytes = buffer.bytes();
for (size_t column = 0; column < buffer.columns() - 2; ++column)
{
MutableColumnPtr col_to = IColumn::mutate(std::move(buffer.getByPosition(column).column));
for (size_t index = 0; index < rows_data.size(); ++index)
col_to->insert(DB::get<const Tuple &>(rows_data[index])[column]);
}
fillSignAndVersionColumnsData(buffer, sign, version, rows_data.size());
return buffer.bytes() - prev_bytes;
}
static inline bool differenceSortingKeys(const Tuple & row_old_data, const Tuple & row_new_data, const std::vector<size_t> sorting_columns_index)
{
for (const auto & sorting_column_index : sorting_columns_index)
if (row_old_data[sorting_column_index] != row_new_data[sorting_column_index])
return true;
return false;
}
static inline size_t onUpdateData(const std::vector<Field> & rows_data, Block & buffer, size_t version, const std::vector<size_t> & sorting_columns_index)
{
if (rows_data.size() % 2 != 0)
throw Exception("LOGICAL ERROR: ", ErrorCodes::LOGICAL_ERROR);
size_t prev_bytes = buffer.bytes();
std::vector<bool> difference_sorting_keys_mark(rows_data.size() / 2);
for (size_t index = 0; index < rows_data.size(); index += 2)
difference_sorting_keys_mark.emplace_back(differenceSortingKeys(
DB::get<const Tuple &>(rows_data[index]), DB::get<const Tuple &>(rows_data[index + 1]), sorting_columns_index));
for (size_t column = 0; column < buffer.columns() - 2; ++column)
{
MutableColumnPtr col_to = IColumn::mutate(std::move(buffer.getByPosition(column).column));
for (size_t index = 0; index < rows_data.size(); index += 2)
{
if (likely(!difference_sorting_keys_mark[index / 2]))
col_to->insert(DB::get<const Tuple &>(rows_data[index + 1])[column]);
else
{
/// If the sorting keys is modified, we should cancel the old data, but this should not happen frequently
col_to->insert(DB::get<const Tuple &>(rows_data[index])[column]);
col_to->insert(DB::get<const Tuple &>(rows_data[index + 1])[column]);
}
}
}
MutableColumnPtr sign_mutable_column = IColumn::mutate(std::move(buffer.getByPosition(buffer.columns() - 2).column));
MutableColumnPtr version_mutable_column = IColumn::mutate(std::move(buffer.getByPosition(buffer.columns() - 1).column));
ColumnInt8::Container & sign_column_data = assert_cast<ColumnInt8 &>(*sign_mutable_column).getData();
ColumnUInt64::Container & version_column_data = assert_cast<ColumnUInt64 &>(*version_mutable_column).getData();
for (size_t index = 0; index < rows_data.size(); index += 2)
{
if (likely(!difference_sorting_keys_mark[index / 2]))
{
sign_column_data.emplace_back(1);
version_column_data.emplace_back(version);
}
else
{
/// If the sorting keys is modified, we should cancel the old data, but this should not happen frequently
sign_column_data.emplace_back(-1);
sign_column_data.emplace_back(1);
version_column_data.emplace_back(version);
version_column_data.emplace_back(version);
}
}
return buffer.bytes() - prev_bytes;
}
void MaterializeMySQLSyncThread::onEvent(Buffers & buffers, const BinlogEventPtr & receive_event, MaterializeMetadata & metadata)
{
if (receive_event->type() == MYSQL_WRITE_ROWS_EVENT)
{
WriteRowsEvent & write_rows_event = static_cast<WriteRowsEvent &>(*receive_event);
Buffers::BufferAndSortingColumnsPtr buffer = buffers.getTableDataBuffer(write_rows_event.table, global_context);
size_t bytes = onWriteOrDeleteData<1>(write_rows_event.rows, buffer->first, ++metadata.version);
buffers.add(buffer->first.rows(), buffer->first.bytes(), write_rows_event.rows.size(), bytes);
}
else if (receive_event->type() == MYSQL_UPDATE_ROWS_EVENT)
{
UpdateRowsEvent & update_rows_event = static_cast<UpdateRowsEvent &>(*receive_event);
Buffers::BufferAndSortingColumnsPtr buffer = buffers.getTableDataBuffer(update_rows_event.table, global_context);
size_t bytes = onUpdateData(update_rows_event.rows, buffer->first, ++metadata.version, buffer->second);
buffers.add(buffer->first.rows(), buffer->first.bytes(), update_rows_event.rows.size(), bytes);
}
else if (receive_event->type() == MYSQL_DELETE_ROWS_EVENT)
{
DeleteRowsEvent & delete_rows_event = static_cast<DeleteRowsEvent &>(*receive_event);
Buffers::BufferAndSortingColumnsPtr buffer = buffers.getTableDataBuffer(delete_rows_event.table, global_context);
size_t bytes = onWriteOrDeleteData<-1>(delete_rows_event.rows, buffer->first, ++metadata.version);
buffers.add(buffer->first.rows(), buffer->first.bytes(), delete_rows_event.rows.size(), bytes);
}
else if (receive_event->type() == MYSQL_QUERY_EVENT)
{
flushBuffersData(buffers, metadata);
/// TODO: 执行DDL.
/// TODO: 直接使用Interpreter执行即可
}
}
void MaterializeMySQLSyncThread::Buffers::add(size_t block_rows, size_t block_bytes, size_t written_rows, size_t written_bytes)
{
total_blocks_rows += written_rows;
total_blocks_bytes += written_bytes;
max_block_rows = std::max(block_rows, max_block_rows);
max_block_bytes = std::max(block_bytes, max_block_bytes);
}
bool MaterializeMySQLSyncThread::Buffers::checkThresholds(size_t check_block_rows, size_t check_block_bytes, size_t check_total_rows, size_t check_total_bytes)
{
return max_block_rows >= check_block_bytes || max_block_bytes >= check_block_bytes || total_blocks_rows >= check_total_rows
|| total_blocks_bytes >= check_total_bytes;
}
void MaterializeMySQLSyncThread::Buffers::commit(MaterializeMetadata & metatdata, const Context & context)
{
try
{
for (auto & table_name_and_buffer : data)
{
OneBlockInputStream input(table_name_and_buffer.second->first);
BlockOutputStreamPtr out = getTableOutput(database, table_name_and_buffer.first, context);
copyData(input, *out);
}
data.clear();
max_block_rows = 0;
max_block_bytes = 0;
total_blocks_rows = 0;
total_blocks_bytes = 0;
}
catch(...)
{
data.clear();
throw;
}
}
MaterializeMySQLSyncThread::Buffers::BufferAndSortingColumnsPtr MaterializeMySQLSyncThread::Buffers::getTableDataBuffer(
const String & table_name, const Context & context)
{
const auto & iterator = data.find(table_name);
if (iterator == data.end())
{
StoragePtr storage = getDatabase(database).tryGetTable(table_name, context);
BufferAndSortingColumnsPtr & buffer_and_soring_columns = data.try_emplace(
table_name, std::make_shared<BufferAndSortingColumns>(storage->getSampleBlockNonMaterialized(), std::vector<size_t>{})).first->second;
if (StorageMergeTree * table_merge_tree = storage->as<StorageMergeTree>())
{
Names required_for_sorting_key = table_merge_tree->getColumnsRequiredForSortingKey();
for (const auto & required_name_for_sorting_key : required_for_sorting_key)
buffer_and_soring_columns->second.emplace_back(
buffer_and_soring_columns->first.getPositionByName(required_name_for_sorting_key));
}
return buffer_and_soring_columns;
}
return iterator->second;
}
}
#endif

View File

@ -0,0 +1,91 @@
#pragma once
#include "config_core.h"
#if USE_MYSQL
# include <mutex>
# include <Core/BackgroundSchedulePool.h>
# include <Core/MySQLClient.h>
# include <DataStreams/BlockIO.h>
# include <DataTypes/DataTypeString.h>
# include <DataTypes/DataTypesNumber.h>
# include <Databases/DatabaseOrdinary.h>
# include <Databases/IDatabase.h>
# include <Databases/MySQL/DatabaseMaterializeMySQL.h>
# include <Databases/MySQL/MaterializeMetadata.h>
# include <Databases/MySQL/MaterializeMySQLSettings.h>
# include <Interpreters/MySQL/CreateQueryVisitor.h>
# include <Parsers/ASTCreateQuery.h>
# include <mysqlxx/Pool.h>
# include <mysqlxx/PoolWithFailover.h>
namespace DB
{
class MaterializeMySQLSyncThread
{
public:
~MaterializeMySQLSyncThread();
MaterializeMySQLSyncThread(
const Context & context, const String & database_name_, const String & mysql_database_name_
, mysqlxx::Pool && pool_, MySQLClient && client_, MaterializeMySQLSettings * settings_);
void startSynchronization();
private:
Poco::Logger * log;
const Context & global_context;
String database_name;
String mysql_database_name;
mutable mysqlxx::Pool pool;
mutable MySQLClient client;
MaterializeMySQLSettings * settings;
struct Buffers
{
String database;
/// thresholds
size_t max_block_rows = 0;
size_t max_block_bytes = 0;
size_t total_blocks_rows = 0;
size_t total_blocks_bytes = 0;
using BufferAndSortingColumns = std::pair<Block, std::vector<size_t>>;
using BufferAndSortingColumnsPtr = std::shared_ptr<BufferAndSortingColumns>;
std::unordered_map<String, BufferAndSortingColumnsPtr> data;
Buffers(const String & database_) : database(database_) {}
void commit(MaterializeMetadata & metatdata, const Context & context);
void add(size_t block_rows, size_t block_bytes, size_t written_rows, size_t written_bytes);
bool checkThresholds(size_t check_block_rows, size_t check_block_bytes, size_t check_total_rows, size_t check_total_bytes);
BufferAndSortingColumnsPtr getTableDataBuffer(const String & table, const Context & context);
};
void synchronization();
bool isCancelled() { return sync_quit.load(std::memory_order_relaxed); }
std::optional<MaterializeMetadata> prepareSynchronized();
void flushBuffersData(Buffers & buffers, MaterializeMetadata & metadata);
void onEvent(Buffers & buffers, const MySQLReplication::BinlogEventPtr & event, MaterializeMetadata & metadata);
std::mutex sync_mutex;
std::atomic<bool> sync_quit{false};
std::condition_variable sync_cond;
std::unique_ptr<ThreadFromGlobalPool> background_thread_pool;
};
}
#endif