ISSUES-4006 suport synchronous MySQL incremental data[part 2]

This commit is contained in:
zhang2014 2020-06-12 12:21:43 +08:00
parent 0c52d425ba
commit 0336a4ad58
21 changed files with 996 additions and 484 deletions

View File

@ -460,6 +460,7 @@ namespace MySQLReplication
String binlog_name; String binlog_name;
Position() : binlog_pos(0), binlog_name("") { } Position() : binlog_pos(0), binlog_name("") { }
Position(UInt64 binlog_pos_, const String & binlog_name_) : binlog_pos(binlog_pos_), binlog_name(binlog_name_) { }
void updateLogPos(UInt64 pos) { binlog_pos = pos; } void updateLogPos(UInt64 pos) { binlog_pos = pos; }
void updateLogName(String binlog) { binlog_name = std::move(binlog); } void updateLogName(String binlog) { binlog_name = std::move(binlog); }
}; };

View File

@ -23,22 +23,21 @@ void AddingVersionsBlockOutputStream::flush()
void AddingVersionsBlockOutputStream::write(const Block & block) void AddingVersionsBlockOutputStream::write(const Block & block)
{ {
/// create_version and delete_version are always in the last place
Block res; Block res;
size_t rows = block.rows(); size_t rows = block.rows();
for (size_t index = 0; index < block.columns(); ++index) for (size_t index = 0; index < block.columns(); ++index)
res.insert(block.getByPosition(index)); res.insert(block.getByPosition(index));
DataTypePtr data_type = std::make_shared<DataTypeUInt64>(); DataTypePtr sign_type = std::make_shared<DataTypeInt8>();
DataTypePtr version_type = std::make_shared<DataTypeUInt64>();
auto create_version = ColumnUInt64::create(rows); ColumnPtr sign_column = sign_type->createColumnConst(rows, Field(Int8(1)))->convertToFullColumnIfConst();
for (size_t index = 0; index < rows; ++index) ColumnPtr version_column = version_type->createColumnConst(rows, Field(UInt64(++version)))->convertToFullColumnIfConst();
create_version->getData()[index] = ++version;
Block header = output->getHeader(); Block header = output->getHeader();
res.insert(ColumnWithTypeAndName(create_version->getPtr(), data_type, header.getByPosition(header.columns() - 2).name)); res.insert({sign_column, sign_type, header.getByPosition(header.columns() - 2).name});
res.insert(ColumnWithTypeAndName(data_type->createColumnConstWithDefaultValue(rows)->convertToFullColumnIfConst(), data_type, header.getByPosition(header.columns() - 1).name)); res.insert({version_column, version_type, header.getByPosition(header.columns() - 1).name});
output->write(res); output->write(res);
} }
Block AddingVersionsBlockOutputStream::getHeader() const Block AddingVersionsBlockOutputStream::getHeader() const

View File

@ -43,7 +43,8 @@ public:
using ColumnType = ColumnVector<FieldType>; using ColumnType = ColumnVector<FieldType>;
using Value = std::pair<std::string, FieldType>; using Value = std::pair<std::string, FieldType>;
using Values = std::vector<Value>; using Values = std::vector<Value>;
using NameToValueMap = HashMap<StringRef, FieldType, StringRefHash>; using NameToValueMap = HashMap<Strin
gRef, FieldType, StringRefHash>;
using ValueToNameMap = std::unordered_map<FieldType, StringRef>; using ValueToNameMap = std::unordered_map<FieldType, StringRef>;
static constexpr bool is_parametric = true; static constexpr bool is_parametric = true;

View File

@ -18,12 +18,13 @@
#endif #endif
#if USE_MYSQL #if USE_MYSQL
# include <Core/MySQLClient.h>
# include <Databases/MySQL/DatabaseConnectionMySQL.h> # include <Databases/MySQL/DatabaseConnectionMySQL.h>
# include <Databases/MySQL/DatabaseMaterializeMySQL.h> # include <Databases/MySQL/DatabaseMaterializeMySQL.h>
# include <Databases/MySQL/MaterializeModeSettings.h>
# include <Interpreters/evaluateConstantExpression.h> # include <Interpreters/evaluateConstantExpression.h>
# include <Common/parseAddress.h> # include <Common/parseAddress.h>
# include <mysqlxx/Pool.h> # include <mysqlxx/Pool.h>
# include <Core/MySQLClient.h>
#endif #endif
namespace DB namespace DB
@ -118,12 +119,18 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
const auto & [remote_host_name, remote_port] = parseAddress(host_name_and_port, 3306); const auto & [remote_host_name, remote_port] = parseAddress(host_name_and_port, 3306);
auto mysql_pool = mysqlxx::Pool(mysql_database_name, remote_host_name, mysql_user_name, mysql_user_password, remote_port); auto mysql_pool = mysqlxx::Pool(mysql_database_name, remote_host_name, mysql_user_name, mysql_user_password, remote_port);
if (materializeMySQLDatabase(engine_define->settings)) auto materialize_mode_settings = std::make_unique<MaterializeModeSettings>();
if (engine_define->settings)
materialize_mode_settings->loadFromQuery(*engine_define);
if (materialize_mode_settings->locality_data)
{ {
MySQLClient client(remote_host_name, remote_port, mysql_user_name, mysql_user_password); MySQLClient client(remote_host_name, remote_port, mysql_user_name, mysql_user_password);
return std::make_shared<DatabaseMaterializeMySQL>( return std::make_shared<DatabaseMaterializeMySQL>(
context, database_name, metadata_path, engine_define, mysql_database_name, std::move(mysql_pool), std::move(client)); context, database_name, metadata_path, engine_define, mysql_database_name, std::move(mysql_pool), std::move(client)
, std::move(materialize_mode_settings));
} }
return std::make_shared<DatabaseConnectionMySQL>(context, database_name, metadata_path, engine_define, mysql_database_name, std::move(mysql_pool)); return std::make_shared<DatabaseConnectionMySQL>(context, database_name, metadata_path, engine_define, mysql_database_name, std::move(mysql_pool));

View File

@ -1,135 +0,0 @@
#include <Databases/MySQL/DataBuffers.h>
#include <Storages/IStorage.h>
#include <common/sleep.h>
namespace DB
{
DataBuffers::DataBuffers(size_t & version_, IDatabase * database_, const std::function<void(const std::unordered_map<String, Block> &)> & flush_function_)
: version(version_), database(database_), flush_function(flush_function_)
{
/// TODO: 定时刷新
}
void DataBuffers::flush()
{
flush_function(buffers);
buffers.clear();
}
void DataBuffers::writeData(const std::string & table_name, const std::vector<Field> & rows_data)
{
Block & to = getTableBuffer(table_name, rows_data.size());
for (size_t column = 0; column < to.columns() - 2; ++column)
{
/// normally columns
MutableColumnPtr col_to = (*std::move(to.getByPosition(column).column)).mutate();
for (size_t index = 0; index < rows_data.size(); ++index)
col_to->insert(DB::get<const Tuple &>(rows_data[index])[column]);
}
Field new_version(UInt64(++version));
MutableColumnPtr create_version_column = (*std::move(to.getByPosition(to.columns() - 2)).column).mutate();
MutableColumnPtr delete_version_column = (*std::move(to.getByPosition(to.columns() - 1)).column).mutate();
delete_version_column->insertManyDefaults(rows_data.size());
for (size_t index = 0; index < rows_data.size(); ++index)
create_version_column->insert(new_version);
}
void DataBuffers::updateData(const String & table_name, const std::vector<Field> & rows_data)
{
if (rows_data.size() % 2 != 0)
throw Exception("LOGICAL ERROR: ", ErrorCodes::LOGICAL_ERROR);
Block & to = getTableBuffer(table_name, rows_data.size());
for (size_t column = 0; column < to.columns() - 2; ++column)
{
/// normally columns
MutableColumnPtr col_to = (*std::move(to.getByPosition(column).column)).mutate();
for (size_t index = 0; index < rows_data.size(); ++index)
col_to->insert(DB::get<const Tuple &>(rows_data[index])[column]);
}
Field new_version(UInt64(++version));
MutableColumnPtr create_version_column = (*std::move(to.getByPosition(to.columns() - 2)).column).mutate();
MutableColumnPtr delete_version_column = (*std::move(to.getByPosition(to.columns() - 1)).column).mutate();
for (size_t index = 0; index < rows_data.size(); ++index)
{
if (index % 2 == 0)
{
create_version_column->insertDefault();
delete_version_column->insert(new_version);
}
else
{
delete_version_column->insertDefault();
create_version_column->insert(new_version);
}
}
}
void DataBuffers::deleteData(const String & table_name, const std::vector<Field> & rows_data)
{
Block & to = getTableBuffer(table_name, rows_data.size());
for (size_t column = 0; column < to.columns() - 2; ++column)
{
/// normally columns
MutableColumnPtr col_to = (*std::move(to.getByPosition(column).column)).mutate();
for (size_t index = 0; index < rows_data.size(); ++index)
col_to->insert(DB::get<const Tuple &>(rows_data[index])[column]);
}
Field new_version(UInt64(++version));
MutableColumnPtr create_version_column = (*std::move(to.getByPosition(to.columns() - 2)).column).mutate();
MutableColumnPtr delete_version_column = (*std::move(to.getByPosition(to.columns() - 1)).column).mutate();
create_version_column->insertManyDefaults(rows_data.size());
for (size_t index = 0; index < rows_data.size(); ++index)
delete_version_column->insert(new_version);
}
Block & DataBuffers::getTableBuffer(const String & table_name, size_t write_size)
{
if (buffers.find(table_name) == buffers.end())
{
StoragePtr write_storage = database->tryGetTable(table_name);
buffers[table_name] = write_storage->getSampleBlockNonMaterialized();
}
/// TODO: settings
if (buffers[table_name].rows() + write_size > 8192)
flush();
return buffers[table_name];
}
[[noreturn]] void DataBuffers::scheduleFlush()
{
while (1)
{
try
{
flush();
sleepForSeconds(1);
}
catch (...)
{
// ++error_count;
// sleep_time = std::min(
// std::chrono::milliseconds{Int64(default_sleep_time.count() * std::exp2(error_count))},
// max_sleep_time);
tryLogCurrentException("");
}
}
}
}

View File

@ -1,42 +0,0 @@
#pragma once
#include <unordered_map>
#include <Columns/IColumn.h>
#include <Core/Field.h>
#include <Databases/IDatabase.h>
#include <boost/noncopyable.hpp>
#include <Common/ThreadPool.h>
namespace DB
{
class DataBuffers : private boost::noncopyable
{
public:
DataBuffers(size_t & version_, IDatabase * database_, const std::function<void(const std::unordered_map<String, Block> &)> & flush_function_);
void flush();
void writeData(const std::string & table_name, const std::vector<Field> & rows_data);
void updateData(const std::string & table_name, const std::vector<Field> & rows_data);
void deleteData(const std::string & table_name, const std::vector<Field> & rows_data);
private:
size_t & version;
IDatabase * database;
std::function<void(const std::unordered_map<String, Block> &)> flush_function;
mutable std::mutex mutex;
std::unordered_map<String, Block> buffers;
[[noreturn]] void scheduleFlush();
Block & getTableBuffer(const String & table_name, size_t write_size);
ThreadFromGlobalPool thread{&DataBuffers::scheduleFlush, this};
};
}

View File

@ -10,17 +10,14 @@
# include <Columns/ColumnTuple.h> # include <Columns/ColumnTuple.h>
# include <DataStreams/AddingVersionsBlockOutputStream.h> # include <DataStreams/AddingVersionsBlockOutputStream.h>
# include <DataStreams/copyData.h> # include <DataStreams/copyData.h>
# include <DataTypes/DataTypeString.h>
# include <Databases/DatabaseFactory.h>
# include <Databases/MySQL/DataBuffers.h>
# include <Databases/MySQL/MasterStatusInfo.h>
# include <Databases/MySQL/queryConvert.h> # include <Databases/MySQL/queryConvert.h>
# include <Databases/MySQL/EventConsumer.h>
# include <Databases/MySQL/MaterializeMetadata.h>
# include <Formats/MySQLBlockInputStream.h> # include <Formats/MySQLBlockInputStream.h>
# include <IO/ReadBufferFromString.h> # include <IO/ReadBufferFromString.h>
# include <Interpreters/Context.h> # include <Interpreters/Context.h>
# include <Interpreters/MySQL/CreateQueryVisitor.h> # include <Interpreters/MySQL/CreateQueryVisitor.h>
# include <Interpreters/executeQuery.h> # include <Interpreters/executeQuery.h>
# include <Parsers/MySQL/ASTCreateQuery.h>
# include <Parsers/parseQuery.h> # include <Parsers/parseQuery.h>
# include <Common/quoteString.h> # include <Common/quoteString.h>
# include <Common/setThreadName.h> # include <Common/setThreadName.h>
@ -34,189 +31,165 @@ namespace ErrorCodes
extern const int INCORRECT_QUERY; extern const int INCORRECT_QUERY;
} }
DatabaseMaterializeMySQL::DatabaseMaterializeMySQL( static inline BlockIO tryToExecuteQuery(const String & query_to_execute, const Context & context_, const String & comment)
const Context & context, const String & database_name_, const String & metadata_path_,
const ASTStorage * database_engine_define_, const String & mysql_database_name_, mysqlxx::Pool && pool_, MySQLClient && client_)
: IDatabase(database_name_), global_context(context.getGlobalContext()), metadata_path(metadata_path_)
, database_engine_define(database_engine_define_->clone()), mysql_database_name(mysql_database_name_)
, nested_database(std::make_shared<DatabaseOrdinary>(database_name_, metadata_path, global_context))
, pool(std::move(pool_)), client(std::move(client_)), log(&Logger::get("DatabaseMaterializeMySQL"))
{ {
/// TODO: 做简单的check, 失败即报错
}
BlockIO DatabaseMaterializeMySQL::tryToExecuteQuery(const String & query_to_execute, const String & comment)
{
String query_prefix = "/*" + comment + " for " + backQuoteIfNeed(database_name) + " Database */ ";
try try
{ {
Context context = global_context; Context context = context_;
context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
context.setCurrentQueryId(""); // generate random query_id context.setCurrentQueryId(""); // generate random query_id
return executeQuery(query_prefix + query_to_execute, context, true); return executeQuery("/*" + comment + "*/ " + query_to_execute, context, true);
} }
catch (...) catch (...)
{ {
tryLogCurrentException(log, "Query " + query_to_execute + " wasn't finished successfully"); tryLogCurrentException("DatabaseMaterializeMySQL", "Query " + query_to_execute + " wasn't finished successfully");
throw; throw;
} }
LOG_DEBUG(log, "Executed query: " << query_to_execute); LOG_DEBUG(&Logger::get("DatabaseMaterializeMySQL"), "Executed query: " << query_to_execute);
} }
String DatabaseMaterializeMySQL::getCreateQuery(const mysqlxx::Pool::Entry & connection, const String & table_name) DatabaseMaterializeMySQL::DatabaseMaterializeMySQL(
const Context & context, const String & database_name_, const String & metadata_path_
, const ASTStorage * database_engine_define_, const String & mysql_database_name_, mysqlxx::Pool && pool_
, MySQLClient && client_ , std::unique_ptr<MaterializeModeSettings> settings_)
: DatabaseMaterializeMySQLWrap(std::make_shared<DatabaseOrdinary>(database_name_, metadata_path_, context), database_engine_define_->clone(), "DatabaseMaterializeMySQL")
, global_context(context.getGlobalContext()), metadata_path(metadata_path_), mysql_database_name(mysql_database_name_)
, pool(std::move(pool_)), client(std::move(client_)), settings(std::move(settings_))
{ {
Block show_create_table_header{ /// TODO: 做简单的check, 失败即报错
{std::make_shared<DataTypeString>(), "Table"}, scheduleSynchronized();
{std::make_shared<DataTypeString>(), "Create Table"},
};
MySQLBlockInputStream show_create_table(
connection, "SHOW CREATE TABLE " + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(table_name),
show_create_table_header, DEFAULT_BLOCK_SIZE);
Block create_query_block = show_create_table.read();
if (!create_query_block || create_query_block.rows() != 1)
throw Exception("LOGICAL ERROR mysql show create return more rows.", ErrorCodes::LOGICAL_ERROR);
return create_query_block.getByName("Create Table").column->getDataAt(0).toString();
} }
BlockOutputStreamPtr DatabaseMaterializeMySQL::getTableOutput(const String & table_name, bool fill_version) BlockOutputStreamPtr DatabaseMaterializeMySQL::getTableOutput(const String & table_name)
{ {
Context context = global_context; String with_database_table_name = backQuoteIfNeed(getDatabaseName()) + "." + backQuoteIfNeed(table_name);
context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; BlockIO res = tryToExecuteQuery("INSERT INTO " + with_database_table_name + " VALUES", global_context, "");
context.setCurrentQueryId(""); // generate random query_id
StoragePtr write_storage = nested_database->tryGetTable(table_name); if (!res.out)
auto table_lock = write_storage->lockStructureForShare(true, context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout); throw Exception("LOGICAL ERROR:", ErrorCodes::LOGICAL_ERROR);
BlockOutputStreamPtr output = write_storage->write(ASTPtr{}, global_context); return res.out;
output->addTableLock(table_lock);
return fill_version ? std::make_shared<AddingVersionsBlockOutputStream>(version, output) : output;
} }
void DatabaseMaterializeMySQL::dumpDataForTables(mysqlxx::Pool::Entry & connection, const std::vector<String> & tables_name, const std::function<bool()> & is_cancelled) void DatabaseMaterializeMySQL::cleanOutdatedTables()
{ {
std::unordered_map<String, String> tables_create_query; auto ddl_guard = DatabaseCatalog::instance().getDDLGuard(database_name, "");
for (size_t index = 0; index < tables_name.size() && !is_cancelled(); ++index) const DatabasePtr & clean_database = DatabaseCatalog::instance().getDatabase(database_name);
tables_create_query[tables_name[index]] = getCreateQuery(connection, tables_name[index]);
auto iterator = tables_create_query.begin(); for (auto iterator = clean_database->getTablesIterator(); iterator->isValid(); iterator->next())
for (; iterator != tables_create_query.end() && !is_cancelled(); ++iterator) {
String table = backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(iterator->name());
String comment = String("Clean ") + table + " for dump mysql.";
tryToExecuteQuery("DROP TABLE " + table, global_context, comment);
}
}
void DatabaseMaterializeMySQL::dumpDataForTables(mysqlxx::Pool::Entry & connection, MaterializeMetadata & master_info, const std::function<bool()> & is_cancelled)
{
auto iterator = master_info.need_dumping_tables.begin();
for (; iterator != master_info.need_dumping_tables.end() && !is_cancelled(); ++iterator)
{ {
const auto & table_name = iterator->first; const auto & table_name = iterator->first;
MySQLTableStruct table_struct = visitCreateQuery(iterator->second, global_context, database_name); MySQLTableStruct table_struct = visitCreateQuery(iterator->second, global_context, database_name);
String comment = String("Dumping ") + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(table_name); String comment = String("Dumping ") + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(table_name);
tryToExecuteQuery(toCreateQuery(table_struct, global_context), comment); tryToExecuteQuery(toCreateQuery(table_struct, global_context), global_context, comment);
BlockOutputStreamPtr out = getTableOutput(table_name, true); BlockOutputStreamPtr out = std::make_shared<AddingVersionsBlockOutputStream>(master_info.version, getTableOutput(table_name));
MySQLBlockInputStream input(connection, "SELECT * FROM " + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(table_name), out->getHeader(), DEFAULT_BLOCK_SIZE); MySQLBlockInputStream input(connection, "SELECT * FROM " + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(table_name), out->getHeader(), DEFAULT_BLOCK_SIZE);
copyData(input, *out, is_cancelled); copyData(input, *out, is_cancelled);
} }
} }
MasterStatusInfo DatabaseMaterializeMySQL::prepareSynchronized(std::unique_lock<std::mutex> & lock) std::optional<MaterializeMetadata> DatabaseMaterializeMySQL::prepareSynchronized(std::unique_lock<std::mutex> & lock, const std::function<bool()> & is_cancelled)
{ {
while(!sync_quit.load(std::memory_order_seq_cst)) while (!is_cancelled())
{ {
try try
{ {
LOG_DEBUG(log, "Checking " + database_name + " database status."); LOG_DEBUG(log, "Checking " + database_name + " database status.");
while (!sync_quit && !DatabaseCatalog::instance().isDatabaseExist(database_name)) while (!is_cancelled && !DatabaseCatalog::instance().isDatabaseExist(database_name))
sync_cond.wait_for(lock, std::chrono::seconds(1)); sync_cond.wait_for(lock, std::chrono::seconds(1));
LOG_DEBUG(log, database_name + " database status is OK."); LOG_DEBUG(log, database_name + " database status is OK.");
mysqlxx::PoolWithFailover::Entry connection = pool.get(); mysqlxx::PoolWithFailover::Entry connection = pool.get();
MasterStatusInfo master_info(connection, getMetadataPath() + "/.master_status", mysql_database_name); MaterializeMetadata metadata(connection, getMetadataPath() + "/.metadata", mysql_database_name);
if (!master_info.need_dumping_tables.empty()) if (!metadata.need_dumping_tables.empty())
{ {
/// TODO: 删除所有表结构, 这可能需要考虑到仍然有查询在使用这个表. metadata.transaction(Position(metadata.binlog_position, metadata.binlog_file), [&]()
dumpDataForTables(connection, master_info.need_dumping_tables, [&]() { return sync_quit.load(std::memory_order_seq_cst); }); {
master_info.finishDump(); cleanOutdatedTables();
dumpDataForTables(connection, metadata, is_cancelled);
});
} }
client.connect(); client.connect();
client.startBinlogDump(std::rand(), mysql_database_name, master_info.binlog_file, master_info.binlog_position); client.startBinlogDump(std::rand(), mysql_database_name, metadata.binlog_file, metadata.binlog_position);
return master_info; return metadata;
}
catch (mysqlxx::Exception & )
{
tryLogCurrentException(log);
/// Avoid busy loop when MySQL is not available.
sleepForMilliseconds(settings->max_wait_time_when_mysql_unavailable);
}
}
return {};
}
void DatabaseMaterializeMySQL::scheduleSynchronized()
{
background_thread_pool.scheduleOrThrowOnError([&]()
{
ThreadStatus thread_status;
setThreadName("MySQLDBSync");
std::unique_lock<std::mutex> lock(sync_mutex);
const auto quit_requested = [this] { return sync_quit.load(std::memory_order_relaxed); };
try
{
std::optional<MaterializeMetadata> metadata = prepareSynchronized(lock, quit_requested);
if (!quit_requested() && metadata)
{
EventConsumer consumer(getDatabaseName(), global_context, *metadata, *settings);
while (!quit_requested())
{
const auto & event = client.readOneBinlogEvent();
consumer.onEvent(event, client.getPosition());
}
}
} }
catch(...) catch(...)
{ {
tryLogCurrentException(log, "Prepare MySQL Synchronized exception and retry"); setException(std::current_exception());
sleepForSeconds(1);
} }
} });
throw Exception("", ErrorCodes::LOGICAL_ERROR);
} }
DatabaseMaterializeMySQL::~DatabaseMaterializeMySQL()
void DatabaseMaterializeMySQL::synchronized()
{ {
setThreadName("MySQLDBSync");
try try
{ {
std::unique_lock<std::mutex> lock{sync_mutex}; if (!sync_quit)
MasterStatusInfo master_status = prepareSynchronized(lock);
DataBuffers buffers(version, this, [&](const std::unordered_map<String, Block> & tables_data)
{ {
master_status.transaction(client.getPosition(), [&]() /// At least once, There is only one possible reference: https://github.com/ClickHouse/ClickHouse/pull/8467
{ {
for (const auto & [table_name, data] : tables_data) sync_quit = true;
{ std::lock_guard<std::mutex> lock(sync_mutex);
if (!sync_quit.load(std::memory_order_seq_cst)) }
{
LOG_DEBUG(log, "Prepare to flush data.");
BlockOutputStreamPtr output = getTableOutput(table_name, false);
output->writePrefix();
output->write(data);
output->writeSuffix();
output->flush();
LOG_DEBUG(log, "Finish data flush.");
}
}
});
});
while (!sync_quit.load(std::memory_order_seq_cst)) sync_cond.notify_one();
{ background_thread_pool.wait();
const auto & event = client.readOneBinlogEvent();
if (event->type() == MYSQL_WRITE_ROWS_EVENT)
{
WriteRowsEvent & write_rows_event = static_cast<WriteRowsEvent &>(*event);
write_rows_event.dump();
buffers.writeData(write_rows_event.table, write_rows_event.rows);
}
else if (event->type() == MYSQL_UPDATE_ROWS_EVENT)
{
UpdateRowsEvent & update_rows_event = static_cast<UpdateRowsEvent &>(*event);
update_rows_event.dump();
buffers.updateData(update_rows_event.table, update_rows_event.rows);
}
else if (event->type() == MYSQL_DELETE_ROWS_EVENT)
{
DeleteRowsEvent & delete_rows_event = static_cast<DeleteRowsEvent &>(*event);
delete_rows_event.dump();
buffers.deleteData(delete_rows_event.table, delete_rows_event.rows);
}
else if (event->type() == MYSQL_QUERY_EVENT)
{
/// TODO: 识别, 查看是否支持的DDL, 支持的话立即刷新当前的数据, 然后执行DDL.
buffers.flush();
}
} }
} }
catch(...) catch (...)
{ {
tryLogCurrentException(log); tryLogCurrentException(__PRETTY_FUNCTION__);
} }
} }

View File

@ -1,6 +1,7 @@
#pragma once #pragma once
#include "config_core.h" #include "config_core.h"
#if USE_MYSQL #if USE_MYSQL
# include <mutex> # include <mutex>
@ -10,7 +11,9 @@
# include <DataTypes/DataTypesNumber.h> # include <DataTypes/DataTypesNumber.h>
# include <Databases/DatabaseOrdinary.h> # include <Databases/DatabaseOrdinary.h>
# include <Databases/IDatabase.h> # include <Databases/IDatabase.h>
# include <Databases/MySQL/MasterStatusInfo.h> # include <Databases/MySQL/DatabaseMaterializeMySQLWrap.h>
# include <Databases/MySQL/MaterializeMetadata.h>
# include <Databases/MySQL/MaterializeModeSettings.h>
# include <Interpreters/MySQL/CreateQueryVisitor.h> # include <Interpreters/MySQL/CreateQueryVisitor.h>
# include <Parsers/ASTCreateQuery.h> # include <Parsers/ASTCreateQuery.h>
# include <mysqlxx/Pool.h> # include <mysqlxx/Pool.h>
@ -19,149 +22,41 @@
namespace DB namespace DB
{ {
class DatabaseMaterializeMySQL : public IDatabase class DatabaseMaterializeMySQL : public DatabaseMaterializeMySQLWrap
{ {
public: public:
~DatabaseMaterializeMySQL() override;
DatabaseMaterializeMySQL( DatabaseMaterializeMySQL(
const Context & context, const String & database_name_, const String & metadata_path_, const Context & context, const String & database_name_, const String & metadata_path_,
const ASTStorage * database_engine_define_, const String & mysql_database_name_, mysqlxx::Pool && pool_, const ASTStorage * database_engine_define_, const String & mysql_database_name_, mysqlxx::Pool && pool_,
MySQLClient && client_); MySQLClient && client_, std::unique_ptr<MaterializeModeSettings> settings_);
String getEngineName() const override { return "MySQL"; } String getEngineName() const override { return "MySQL"; }
void shutdown() override { nested_database->shutdown(); }
bool empty() const override { return nested_database->empty(); }
String getDataPath() const override { return nested_database->getDataPath(); }
String getTableDataPath(const String & string) const override { return nested_database->getTableDataPath(string); }
String getTableDataPath(const ASTCreateQuery & query) const override { return nested_database->getTableDataPath(query); }
UUID tryGetTableUUID(const String & string) const override { return nested_database->tryGetTableUUID(string); }
bool isDictionaryExist(const String & string) const override { return nested_database->isDictionaryExist(string); }
DatabaseDictionariesIteratorPtr getDictionariesIterator(const FilterByNameFunction & filter_by_dictionary_name) override
{
return nested_database->getDictionariesIterator(filter_by_dictionary_name);
}
void createTable(const Context & context, const String & string, const StoragePtr & ptr, const ASTPtr & astPtr) override
{
nested_database->createTable(context, string, ptr, astPtr);
}
void createDictionary(const Context & context, const String & string, const ASTPtr & ptr) override
{
nested_database->createDictionary(context, string, ptr);
}
void dropTable(const Context & context, const String & string, bool no_delay) override
{
nested_database->dropTable(context, string, no_delay);
}
void removeDictionary(const Context & context, const String & string) override { nested_database->removeDictionary(context, string); }
void attachTable(const String & string, const StoragePtr & ptr, const String & relative_table_path) override
{
nested_database->attachTable(string, ptr, relative_table_path);
}
void attachDictionary(const String & string, const DictionaryAttachInfo & info) override { nested_database->attachDictionary(string, info); }
StoragePtr detachTable(const String & string) override { return nested_database->detachTable(string); }
void detachDictionary(const String & string) override { nested_database->detachDictionary(string); }
void renameTable(const Context & context, const String & string, IDatabase & database, const String & string1, bool b) override
{
nested_database->renameTable(context, string, database, string1, b);
}
void alterTable(const Context & context, const StorageID & id, const StorageInMemoryMetadata & metadata) override
{
nested_database->alterTable(context, id, metadata);
}
time_t getObjectMetadataModificationTime(const String & string) const override
{
return nested_database->getObjectMetadataModificationTime(string);
}
Poco::AutoPtr<Poco::Util::AbstractConfiguration> getDictionaryConfiguration(const String & string) const override
{
return nested_database->getDictionaryConfiguration(string);
}
String getMetadataPath() const override { return nested_database->getMetadataPath(); }
String getObjectMetadataPath(const String & string) const override { return nested_database->getObjectMetadataPath(string); }
bool shouldBeEmptyOnDetach() const override { return nested_database->shouldBeEmptyOnDetach(); }
void drop(const Context & context) override { nested_database->drop(context); }
bool isTableExist(const String & name) const override { return nested_database->isTableExist(name); }
StoragePtr tryGetTable(const String & name) const override { return nested_database->tryGetTable(name); }
void loadStoredObjects(Context & context, bool b) override
{
try
{
LOG_DEBUG(log, "Loading MySQL nested database stored objects.");
nested_database->loadStoredObjects(context, b);
LOG_DEBUG(log, "Loaded MySQL nested database stored objects.");
}
catch (...)
{
tryLogCurrentException(log, "Cannot load MySQL nested database stored objects.");
throw;
}
}
ASTPtr getCreateDatabaseQuery() const override
{
const auto & create_query = std::make_shared<ASTCreateQuery>();
create_query->database = database_name;
create_query->set(create_query->storage, database_engine_define);
return create_query;
}
DatabaseTablesIteratorPtr getTablesIterator(const FilterByNameFunction & filter_by_table_name) override { return nested_database->getTablesIterator(filter_by_table_name); }
private: private:
const Context & global_context; const Context & global_context;
String metadata_path; String metadata_path;
ASTPtr database_engine_define;
String mysql_database_name; String mysql_database_name;
DatabasePtr nested_database;
size_t version{0};
mutable mysqlxx::Pool pool; mutable mysqlxx::Pool pool;
mutable MySQLClient client; mutable MySQLClient client;
std::unique_ptr<MaterializeModeSettings> settings;
Poco::Logger * log; void cleanOutdatedTables();
void synchronized(); void scheduleSynchronized();
MasterStatusInfo prepareSynchronized(std::unique_lock<std::mutex> & lock); BlockOutputStreamPtr getTableOutput(const String & table_name);
BlockOutputStreamPtr getTableOutput(const String & table_name, bool fill_version); std::optional<MaterializeMetadata> prepareSynchronized(std::unique_lock<std::mutex> & lock, const std::function<bool()> & is_cancelled);
BlockIO tryToExecuteQuery(const String & query_to_execute, const String & comment); void dumpDataForTables(mysqlxx::Pool::Entry & connection, MaterializeMetadata & master_info, const std::function<bool()> & is_cancelled);
String getCreateQuery(const mysqlxx::Pool::Entry & connection, const String & table_name); std::mutex sync_mutex;
void dumpDataForTables(mysqlxx::Pool::Entry & connection, const std::vector<String> & tables_name, const std::function<bool()> & is_cancelled);
mutable std::mutex sync_mutex;
std::atomic<bool> sync_quit{false}; std::atomic<bool> sync_quit{false};
std::condition_variable sync_cond; std::condition_variable sync_cond;
ThreadFromGlobalPool thread{&DatabaseMaterializeMySQL::synchronized, this}; ThreadPool background_thread_pool{1};
}; };
} }

View File

@ -0,0 +1,188 @@
#include <Databases/MySQL/DatabaseMaterializeMySQLWrap.h>
#include <Common/setThreadName.h>
#include <Parsers/ASTCreateQuery.h>
#include <Storages/StorageMaterializeMySQL.h>
#include <Databases/MySQL/DatabaseMaterializeTablesIterator.h>
namespace DB
{
static constexpr auto MYSQL_BACKGROUND_THREAD_NAME = "MySQLDBSync";
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
DatabaseMaterializeMySQLWrap::DatabaseMaterializeMySQLWrap(const DatabasePtr & nested_database_, const ASTPtr & database_engine_define_, const String & log_name)
: IDatabase(nested_database_->getDatabaseName()), nested_database(nested_database_), database_engine_define(database_engine_define_), log(&Logger::get(log_name))
{
}
void DatabaseMaterializeMySQLWrap::setException(const std::exception_ptr & exception_)
{
std::unique_lock<std::mutex> lock(mutex);
exception = exception_;
}
DatabasePtr DatabaseMaterializeMySQLWrap::getNestedDatabase() const
{
std::unique_lock<std::mutex> lock(mutex);
if (exception)
std::rethrow_exception(exception);
return nested_database;
}
ASTPtr DatabaseMaterializeMySQLWrap::getCreateDatabaseQuery() const
{
const auto & create_query = std::make_shared<ASTCreateQuery>();
create_query->database = database_name;
create_query->set(create_query->storage, database_engine_define);
return create_query;
}
void DatabaseMaterializeMySQLWrap::loadStoredObjects(Context & context, bool has_force_restore_data_flag)
{
try
{
LOG_DEBUG(log, "Loading MySQL nested database stored objects.");
getNestedDatabase()->loadStoredObjects(context, has_force_restore_data_flag);
LOG_DEBUG(log, "Loaded MySQL nested database stored objects.");
}
catch (...)
{
tryLogCurrentException(log, "Cannot load MySQL nested database stored objects.");
throw;
}
}
void DatabaseMaterializeMySQLWrap::shutdown()
{
getNestedDatabase()->shutdown();
}
bool DatabaseMaterializeMySQLWrap::empty() const
{
return getNestedDatabase()->empty();
}
String DatabaseMaterializeMySQLWrap::getDataPath() const
{
return getNestedDatabase()->getDataPath();
}
String DatabaseMaterializeMySQLWrap::getMetadataPath() const
{
return getNestedDatabase()->getMetadataPath();
}
String DatabaseMaterializeMySQLWrap::getTableDataPath(const String & table_name) const
{
return getNestedDatabase()->getTableDataPath(table_name);
}
String DatabaseMaterializeMySQLWrap::getTableDataPath(const ASTCreateQuery & query) const
{
return getNestedDatabase()->getTableDataPath(query);
}
String DatabaseMaterializeMySQLWrap::getObjectMetadataPath(const String & table_name) const
{
return getNestedDatabase()->getObjectMetadataPath(table_name);
}
UUID DatabaseMaterializeMySQLWrap::tryGetTableUUID(const String & table_name) const
{
return getNestedDatabase()->tryGetTableUUID(table_name);
}
time_t DatabaseMaterializeMySQLWrap::getObjectMetadataModificationTime(const String & name) const
{
return getNestedDatabase()->getObjectMetadataModificationTime(name);
}
void DatabaseMaterializeMySQLWrap::createTable(const Context & context, const String & name, const StoragePtr & table, const ASTPtr & query)
{
if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
throw Exception("MySQL database in locality_data mode does not support create table.", ErrorCodes::NOT_IMPLEMENTED);
getNestedDatabase()->createTable(context, name, table, query);
}
void DatabaseMaterializeMySQLWrap::dropTable(const Context & context, const String & name, bool no_delay)
{
if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
throw Exception("MySQL database in locality_data mode does not support drop table.", ErrorCodes::NOT_IMPLEMENTED);
getNestedDatabase()->dropTable(context, name, no_delay);
}
void DatabaseMaterializeMySQLWrap::attachTable(const String & name, const StoragePtr & table, const String & relative_table_path)
{
if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
throw Exception("MySQL database in locality_data mode does not support attach table.", ErrorCodes::NOT_IMPLEMENTED);
getNestedDatabase()->attachTable(name, table, relative_table_path);
}
StoragePtr DatabaseMaterializeMySQLWrap::detachTable(const String & name)
{
if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
throw Exception("MySQL database in locality_data mode does not support detach table.", ErrorCodes::NOT_IMPLEMENTED);
return getNestedDatabase()->detachTable(name);
}
void DatabaseMaterializeMySQLWrap::renameTable(const Context & context, const String & name, IDatabase & to_database, const String & to_name, bool exchange)
{
if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
throw Exception("MySQL database in locality_data mode does not support rename table.", ErrorCodes::NOT_IMPLEMENTED);
getNestedDatabase()->renameTable(context, name, to_database, to_name, exchange);
}
void DatabaseMaterializeMySQLWrap::alterTable(const Context & context, const StorageID & table_id, const StorageInMemoryMetadata & metadata)
{
if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
throw Exception("MySQL database in locality_data mode does not support alter table.", ErrorCodes::NOT_IMPLEMENTED);
getNestedDatabase()->alterTable(context, table_id, metadata);
}
bool DatabaseMaterializeMySQLWrap::shouldBeEmptyOnDetach() const
{
return getNestedDatabase()->shouldBeEmptyOnDetach();
}
void DatabaseMaterializeMySQLWrap::drop(const Context & context)
{
getNestedDatabase()->drop(context);
}
bool DatabaseMaterializeMySQLWrap::isTableExist(const String & name) const
{
return getNestedDatabase()->isTableExist(name);
}
StoragePtr DatabaseMaterializeMySQLWrap::tryGetTable(const String & name) const
{
if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
return std::make_shared<StorageMaterializeMySQL>(getNestedDatabase()->tryGetTable(name));
return getNestedDatabase()->tryGetTable(name);
}
DatabaseTablesIteratorPtr DatabaseMaterializeMySQLWrap::getTablesIterator(const FilterByNameFunction & filter_by_table_name)
{
if (getThreadName() != MYSQL_BACKGROUND_THREAD_NAME)
{
DatabaseTablesIteratorPtr iterator = getNestedDatabase()->getTablesIterator(filter_by_table_name);
return std::make_unique<DatabaseMaterializeTablesIterator>(std::move(iterator));
}
return getNestedDatabase()->getTablesIterator(filter_by_table_name);
}
}

View File

@ -0,0 +1,70 @@
#pragma once
#include <Databases/IDatabase.h>
namespace DB
{
class DatabaseMaterializeMySQLWrap : public IDatabase
{
public:
ASTPtr getCreateDatabaseQuery() const override;
void loadStoredObjects(Context & context, bool has_force_restore_data_flag) override;
DatabaseMaterializeMySQLWrap(const DatabasePtr & nested_database_, const ASTPtr & database_engine_define_, const String & log_name);
protected:
DatabasePtr nested_database;
ASTPtr database_engine_define;
Poco::Logger * log;
mutable std::mutex mutex;
std::exception_ptr exception;
DatabasePtr getNestedDatabase() const;
void setException(const std::exception_ptr & exception);
public:
void shutdown() override;
bool empty() const override;
String getDataPath() const override;
String getTableDataPath(const String & table_name) const override;
String getTableDataPath(const ASTCreateQuery & query) const override;
UUID tryGetTableUUID(const String & table_name) const override;
void createTable(const Context & context, const String & name, const StoragePtr & table, const ASTPtr & query) override;
void dropTable(const Context & context, const String & name, bool no_delay) override;
void attachTable(const String & name, const StoragePtr & table, const String & relative_table_path) override;
StoragePtr detachTable(const String & name) override;
void renameTable(const Context & context, const String & name, IDatabase & to_database, const String & to_name, bool exchange) override;
void alterTable(const Context & context, const StorageID & table_id, const StorageInMemoryMetadata & metadata) override;
time_t getObjectMetadataModificationTime(const String & name) const override;
String getMetadataPath() const override;
String getObjectMetadataPath(const String & table_name) const override;
bool shouldBeEmptyOnDetach() const override;
void drop(const Context & context) override;
bool isTableExist(const String & name) const override;
StoragePtr tryGetTable(const String & name) const override;
DatabaseTablesIteratorPtr getTablesIterator(const FilterByNameFunction & filter_by_table_name) override;
};
}

View File

@ -0,0 +1,34 @@
#pragma once
#include <Databases/IDatabase.h>
namespace DB
{
class DatabaseMaterializeTablesIterator final : public IDatabaseTablesIterator
{
public:
virtual void next() { nested_iterator->next(); }
virtual bool isValid() const { return nested_iterator->isValid(); }
virtual const String & name() const { return nested_iterator->name(); }
virtual const StoragePtr & table() const
{
StoragePtr storage = std::make_shared<StorageMaterializeMySQL>(nested_iterator->table());
return tables.emplace_back(storage);
}
virtual UUID uuid() const { return nested_iterator->uuid(); }
DatabaseMaterializeTablesIterator(DatabaseTablesIteratorPtr nested_iterator_) : nested_iterator(std::move(nested_iterator_))
{}
private:
mutable std::vector<StoragePtr> tables;
DatabaseTablesIteratorPtr nested_iterator;
};
}

View File

@ -0,0 +1,261 @@
#include <Databases/MySQL/EventConsumer.h>
#include <Columns/ColumnsNumber.h>
#include <DataStreams/copyData.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Interpreters/executeQuery.h>
#include <Storages/IStorage.h>
#include <Storages/StorageMergeTree.h>
#include <Common/assert_cast.h>
#include <Common/setThreadName.h>
namespace DB
{
using namespace MySQLReplication;
EventConsumer::~EventConsumer()
{
if (!quit && !background_exception)
{
{
quit = true;
std::lock_guard<std::mutex> lock(mutex);
}
cond.notify_one();
background_thread_pool.wait();
}
}
EventConsumer::EventConsumer(
const String & database_, const Context & context_, MaterializeMetadata & metadata_, MaterializeModeSettings & settings_)
: metadata(metadata_), context(context_), settings(settings_), database(database_), prev_version(metadata.version)
{
background_thread_pool.scheduleOrThrowOnError([&]()
{
ThreadStatus thread_status;
setThreadName("MySQLDBSync");
std::unique_lock<std::mutex> lock(mutex);
const auto quit_requested = [this] { return quit.load(std::memory_order_relaxed); };
while (!quit_requested() && !background_exception)
{
if (!buffers.empty() && total_bytes_in_buffers)
flushBuffers();
cond.wait_for(lock, std::chrono::milliseconds(settings.max_flush_data_time), quit_requested);
}
});
}
void EventConsumer::onWriteData(const String & table_name, const std::vector<Field> & rows_data)
{
BufferPtr buffer = getTableBuffer(table_name);
size_t prev_bytes = buffer->data.bytes();
for (size_t column = 0; column < buffer->data.columns() - 2; ++column)
{
MutableColumnPtr col_to = (*std::move(buffer->data.getByPosition(column).column)).mutate();
for (size_t index = 0; index < rows_data.size(); ++index)
col_to->insert(DB::get<const Tuple &>(rows_data[index])[column]);
}
fillSignColumnsAndMayFlush(buffer->data, 1, ++metadata.version, rows_data.size(), prev_bytes);
}
static inline bool differenceSortingKeys(const Tuple & row_old_data, const Tuple & row_new_data, const std::vector<size_t> sorting_columns_index)
{
for (const auto & sorting_column_index : sorting_columns_index)
if (row_old_data[sorting_column_index] != row_new_data[sorting_column_index])
return true;
return false;
}
void EventConsumer::onUpdateData(const String & table_name, const std::vector<Field> & rows_data)
{
if (rows_data.size() % 2 != 0)
throw Exception("LOGICAL ERROR: ", ErrorCodes::LOGICAL_ERROR);
BufferPtr buffer = getTableBuffer(table_name);
size_t prev_bytes = buffer->data.bytes();
std::vector<bool> difference_sorting_keys_mark(rows_data.size() / 2);
for (size_t index = 0; index < rows_data.size(); index += 2)
difference_sorting_keys_mark.emplace_back(differenceSortingKeys(
DB::get<const Tuple &>(rows_data[index]), DB::get<const Tuple &>(rows_data[index + 1]), buffer->sorting_columns_index));
for (size_t column = 0; column < buffer->data.columns() - 2; ++column)
{
MutableColumnPtr col_to = (*std::move(buffer->data.getByPosition(column).column)).mutate();
for (size_t index = 0; index < rows_data.size(); index += 2)
{
if (likely(!difference_sorting_keys_mark[index / 2]))
col_to->insert(DB::get<const Tuple &>(rows_data[index + 1])[column]);
else
{
/// If the sorting keys is modified, we should cancel the old data, but this should not happen frequently
col_to->insert(DB::get<const Tuple &>(rows_data[index])[column]);
col_to->insert(DB::get<const Tuple &>(rows_data[index + 1])[column]);
}
}
}
MutableColumnPtr sign_mutable_column = (*std::move(buffer->data.getByPosition(buffer->data.columns() - 2)).column).mutate();
MutableColumnPtr version_mutable_column = (*std::move(buffer->data.getByPosition(buffer->data.columns() - 1)).column).mutate();
ColumnInt8::Container & sign_column_data = assert_cast<ColumnInt8 &>(*sign_mutable_column).getData();
ColumnUInt64::Container & version_column_data = assert_cast<ColumnUInt64 &>(*version_mutable_column).getData();
UInt64 new_version = ++metadata.version;
for (size_t index = 0; index < rows_data.size(); index += 2)
{
if (likely(!difference_sorting_keys_mark[index / 2]))
{
sign_column_data.emplace_back(1);
version_column_data.emplace_back(new_version);
}
else
{
/// If the sorting keys is modified, we should cancel the old data, but this should not happen frequently
sign_column_data.emplace_back(-1);
sign_column_data.emplace_back(1);
version_column_data.emplace_back(new_version);
version_column_data.emplace_back(new_version);
}
}
total_bytes_in_buffers += (buffer->data.bytes() - prev_bytes);
if (buffer->data.rows() >= settings.max_rows_in_buffer || total_bytes_in_buffers >= settings.max_bytes_in_buffers)
flushBuffers();
}
void EventConsumer::onDeleteData(const String & table_name, const std::vector<Field> & rows_data)
{
BufferPtr buffer = getTableBuffer(table_name);
size_t prev_bytes = buffer->data.bytes();
for (size_t column = 0; column < buffer->data.columns() - 2; ++column)
{
MutableColumnPtr col_to = (*std::move(buffer->data.getByPosition(column).column)).mutate();
for (size_t index = 0; index < rows_data.size(); ++index)
col_to->insert(DB::get<const Tuple &>(rows_data[index])[column]);
}
fillSignColumnsAndMayFlush(buffer->data, -1, ++metadata.version, rows_data.size(), prev_bytes);
}
EventConsumer::BufferPtr EventConsumer::getTableBuffer(const String & table_name)
{
if (buffers.find(table_name) == buffers.end())
{
StoragePtr storage = DatabaseCatalog::instance().getDatabase(database)->tryGetTable(table_name);
buffers[table_name] = std::make_shared<Buffer>();
buffers[table_name]->data = storage->getSampleBlockNonMaterialized();
if (StorageMergeTree * table_merge_tree = dynamic_cast<StorageMergeTree *>(storage.get()))
{
Names required_for_sorting_key = table_merge_tree->getColumnsRequiredForSortingKey();
for (const auto & required_name_for_sorting_key : required_for_sorting_key)
buffers[table_name]->sorting_columns_index.emplace_back(
buffers[table_name]->data.getPositionByName(required_name_for_sorting_key));
}
}
return buffers[table_name];
}
void EventConsumer::onEvent(const BinlogEventPtr & receive_event, const MySQLReplication::Position & position)
{
std::unique_lock<std::mutex> lock(mutex);
if (background_exception)
background_thread_pool.wait();
last_position = position;
if (receive_event->type() == MYSQL_WRITE_ROWS_EVENT)
{
WriteRowsEvent & write_rows_event = static_cast<WriteRowsEvent &>(*receive_event);
write_rows_event.dump();
onWriteData(write_rows_event.table, write_rows_event.rows);
}
else if (receive_event->type() == MYSQL_UPDATE_ROWS_EVENT)
{
UpdateRowsEvent & update_rows_event = static_cast<UpdateRowsEvent &>(*receive_event);
update_rows_event.dump();
onUpdateData(update_rows_event.table, update_rows_event.rows);
}
else if (receive_event->type() == MYSQL_DELETE_ROWS_EVENT)
{
DeleteRowsEvent & delete_rows_event = static_cast<DeleteRowsEvent &>(*receive_event);
delete_rows_event.dump();
onDeleteData(delete_rows_event.table, delete_rows_event.rows);
}
else if (receive_event->type() == MYSQL_QUERY_EVENT)
{
/// TODO: 识别, 查看是否支持的DDL, 支持的话立即刷新当前的数据, 然后执行DDL.
// flush_function();
/// TODO: 直接使用Interpreter执行即可
}
}
void EventConsumer::fillSignColumnsAndMayFlush(Block & data, Int8 sign_value, UInt64 version_value, size_t fill_size, size_t prev_bytes)
{
MutableColumnPtr sign_mutable_column = (*std::move(data.getByPosition(data.columns() - 2)).column).mutate();
MutableColumnPtr version_mutable_column = (*std::move(data.getByPosition(data.columns() - 1)).column).mutate();
ColumnInt8::Container & sign_column_data = assert_cast<ColumnInt8 &>(*sign_mutable_column).getData();
ColumnUInt64::Container & version_column_data = assert_cast<ColumnUInt64 &>(*version_mutable_column).getData();
for (size_t index = 0; index < fill_size; ++index)
{
sign_column_data.emplace_back(sign_value);
version_column_data.emplace_back(version_value);
}
total_bytes_in_buffers += (data.bytes() - prev_bytes);
if (data.rows() >= settings.max_rows_in_buffer || total_bytes_in_buffers >= settings.max_bytes_in_buffers)
flushBuffers();
}
void EventConsumer::flushBuffers()
{
/// TODO: 事务保证
try
{
for (auto & table_name_and_buffer : buffers)
{
const String & table_name = table_name_and_buffer.first;
BufferPtr & buffer = table_name_and_buffer.second;
Context query_context = context;
query_context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
query_context.setCurrentQueryId(""); // generate random query_id
String with_database_table_name = backQuoteIfNeed(database) + "." + backQuoteIfNeed(table_name);
BlockIO res = executeQuery("INSERT INTO " + with_database_table_name + " VALUES", query_context, true);
OneBlockInputStream input(buffer->data);
copyData(input, *res.out);
}
buffers.clear();
total_bytes_in_buffers = 0;
prev_version = metadata.version;
}
catch(...)
{
buffers.clear();
total_bytes_in_buffers = 0;
metadata.version = prev_version;
background_exception = true;
throw;
}
}
}

View File

@ -0,0 +1,63 @@
#pragma once
#include <unordered_map>
#include <Columns/IColumn.h>
#include <Core/Field.h>
#include <Core/MySQLReplication.h>
#include <Databases/IDatabase.h>
#include <Databases/MySQL/MaterializeModeSettings.h>
#include <boost/noncopyable.hpp>
#include <Common/ThreadPool.h>
#include "MaterializeMetadata.h"
namespace DB
{
class EventConsumer : private boost::noncopyable
{
public:
~EventConsumer();
void onEvent(const MySQLReplication::BinlogEventPtr & event, const MySQLReplication::Position & position);
EventConsumer(const String & database_, const Context & context, MaterializeMetadata & metadata_, MaterializeModeSettings & settings_);
private:
MaterializeMetadata & metadata;
const Context & context;
const MaterializeModeSettings & settings;
String database;
size_t prev_version;
size_t total_bytes_in_buffers = 0;
MySQLReplication::Position last_position;
struct Buffer
{
Block data;
std::vector<size_t> sorting_columns_index;
};
using BufferPtr = std::shared_ptr<Buffer>;
std::unordered_map<String, BufferPtr> buffers;
void flushBuffers();
BufferPtr getTableBuffer(const String & table_name);
void onWriteData(const std::string & table_name, const std::vector<Field> & rows_data);
void onUpdateData(const std::string & table_name, const std::vector<Field> & rows_data);
void onDeleteData(const std::string & table_name, const std::vector<Field> & rows_data);
void fillSignColumnsAndMayFlush(Block & data, Int8 sign_value, UInt64 version_value, size_t fill_size, size_t prev_bytes);
mutable std::mutex mutex;
std::condition_variable cond;
std::atomic_bool quit = false;
std::atomic_bool background_exception = false;
ThreadPool background_thread_pool{1};
};
}

View File

@ -1,17 +1,44 @@
#include <Core/Block.h> #include <Core/Block.h>
#include <DataTypes/DataTypeString.h> #include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h> #include <DataTypes/DataTypesNumber.h>
#include <Databases/MySQL/MasterStatusInfo.h> #include <Databases/MySQL/MaterializeMetadata.h>
#include <Formats/MySQLBlockInputStream.h> #include <Formats/MySQLBlockInputStream.h>
#include <Common/quoteString.h>
#include <Poco/File.h>
#include <IO/Operators.h>
#include <IO/ReadBufferFromFile.h> #include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h> #include <IO/WriteBufferFromFile.h>
#include <Poco/File.h>
#include <Common/quoteString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
namespace DB namespace DB
{ {
static std::unordered_map<String, String> fetchTablesCreateQuery(
const mysqlxx::PoolWithFailover::Entry & connection, const String & database_name, const std::vector<String> & fetch_tables)
{
std::unordered_map<String, String> tables_create_query;
for (size_t index = 0; index < fetch_tables.size(); ++index)
{
Block show_create_table_header{
{std::make_shared<DataTypeString>(), "Table"},
{std::make_shared<DataTypeString>(), "Create Table"},
};
MySQLBlockInputStream show_create_table(
connection, "SHOW CREATE TABLE " + backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(fetch_tables[index]),
show_create_table_header, DEFAULT_BLOCK_SIZE);
Block create_query_block = show_create_table.read();
if (!create_query_block || create_query_block.rows() != 1)
throw Exception("LOGICAL ERROR mysql show create return more rows.", ErrorCodes::LOGICAL_ERROR);
tables_create_query[fetch_tables[index]] = create_query_block.getByName("Create Table").column->getDataAt(0).toString();
}
return tables_create_query;
}
static std::vector<String> fetchTablesInDB(const mysqlxx::PoolWithFailover::Entry & connection, const std::string & database) static std::vector<String> fetchTablesInDB(const mysqlxx::PoolWithFailover::Entry & connection, const std::string & database)
{ {
Block header{{std::make_shared<DataTypeString>(), "table_name"}}; Block header{{std::make_shared<DataTypeString>(), "table_name"}};
@ -29,7 +56,7 @@ static std::vector<String> fetchTablesInDB(const mysqlxx::PoolWithFailover::Entr
return tables_in_db; return tables_in_db;
} }
void MasterStatusInfo::fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & connection) void MaterializeMetadata::fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & connection)
{ {
Block header{ Block header{
{std::make_shared<DataTypeString>(), "File"}, {std::make_shared<DataTypeString>(), "File"},
@ -45,6 +72,7 @@ void MasterStatusInfo::fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & conn
if (!master_status || master_status.rows() != 1) if (!master_status || master_status.rows() != 1)
throw Exception("Unable to get master status from MySQL.", ErrorCodes::LOGICAL_ERROR); throw Exception("Unable to get master status from MySQL.", ErrorCodes::LOGICAL_ERROR);
version = 0;
binlog_file = (*master_status.getByPosition(0).column)[0].safeGet<String>(); binlog_file = (*master_status.getByPosition(0).column)[0].safeGet<String>();
binlog_position = (*master_status.getByPosition(1).column)[0].safeGet<UInt64>(); binlog_position = (*master_status.getByPosition(1).column)[0].safeGet<UInt64>();
binlog_do_db = (*master_status.getByPosition(2).column)[0].safeGet<String>(); binlog_do_db = (*master_status.getByPosition(2).column)[0].safeGet<String>();
@ -52,7 +80,7 @@ void MasterStatusInfo::fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & conn
executed_gtid_set = (*master_status.getByPosition(4).column)[0].safeGet<String>(); executed_gtid_set = (*master_status.getByPosition(4).column)[0].safeGet<String>();
} }
bool MasterStatusInfo::checkBinlogFileExists(mysqlxx::PoolWithFailover::Entry & connection) bool MaterializeMetadata::checkBinlogFileExists(mysqlxx::PoolWithFailover::Entry & connection)
{ {
Block header{ Block header{
{std::make_shared<DataTypeString>(), "Log_name"}, {std::make_shared<DataTypeString>(), "Log_name"},
@ -73,53 +101,65 @@ bool MasterStatusInfo::checkBinlogFileExists(mysqlxx::PoolWithFailover::Entry &
} }
return false; return false;
} }
void MasterStatusInfo::finishDump()
{
WriteBufferFromFile out(persistent_path);
out << "Version:\t1\n"
<< "Binlog File:\t" << binlog_file << "\nBinlog Position:\t" << binlog_position << "\nBinlog Do DB:\t" << binlog_do_db
<< "\nBinlog Ignore DB:\t" << binlog_ignore_db << "\nExecuted GTID SET:\t" << executed_gtid_set;
out.next(); void commitMetadata(const std::function<void()> & function, const String & persistent_tmp_path, const String & persistent_path)
out.sync(); {
try
{
function();
Poco::File(persistent_tmp_path).renameTo(persistent_path);
}
catch (...)
{
Poco::File(persistent_tmp_path).remove();
throw;
}
} }
void MasterStatusInfo::transaction(const MySQLReplication::Position & position, const std::function<void()> & fun) void MaterializeMetadata::transaction(const MySQLReplication::Position & position, const std::function<void()> & fun)
{ {
binlog_file = position.binlog_name; binlog_file = position.binlog_name;
binlog_position = position.binlog_pos; binlog_position = position.binlog_pos;
String persistent_tmp_path = persistent_path + ".tmp";
{ {
Poco::File temp_file(persistent_path + ".temp"); WriteBufferFromFile out(persistent_tmp_path, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_TRUNC | O_CREAT | O_EXCL);
if (temp_file.exists())
temp_file.remove(); /// TSV format metadata file.
writeString("Version:\t1\n", out);
writeString("Binlog File:\t" + binlog_file + "\n", out);
writeString("Executed GTID:\t" + executed_gtid_set + "\n", out);
writeString("Binlog Position:\t" + toString(binlog_position) + "\n", out);
writeString("Data Version:\t" + toString(version) + "\n", out);
out.next();
out.sync();
out.close();
} }
WriteBufferFromFile out(persistent_path + ".temp"); commitMetadata(fun, persistent_tmp_path, persistent_path);
out << "Version:\t1\n"
<< "Binlog File:\t" << binlog_file << "\nBinlog Position:\t" << binlog_position << "\nBinlog Do DB:\t" << binlog_do_db
<< "\nBinlog Ignore DB:\t" << binlog_ignore_db << "\nExecuted GTID SET:\t" << executed_gtid_set;
out.next();
out.sync();
fun();
Poco::File(persistent_path + ".temp").renameTo(persistent_path);
} }
MasterStatusInfo::MasterStatusInfo(mysqlxx::PoolWithFailover::Entry & connection, const String & path_, const String & database) MaterializeMetadata::MaterializeMetadata(mysqlxx::PoolWithFailover::Entry & connection, const String & path_, const String & database)
: persistent_path(path_) : persistent_path(path_)
{ {
if (Poco::File(persistent_path).exists()) if (Poco::File(persistent_path).exists())
{ {
ReadBufferFromFile in(persistent_path); ReadBufferFromFile in(persistent_path, DBMS_DEFAULT_BUFFER_SIZE);
in >> "Version:\t1\n" >> "Binlog File:\t" >> binlog_file >> "\nBinlog Position:\t" >> binlog_position >> "\nBinlog Do DB:\t" assertString("Version:\t1\n", in);
>> binlog_do_db >> "\nBinlog Ignore DB:\t" >> binlog_ignore_db >> "\nExecuted GTID SET:\t" >> executed_gtid_set; assertString("Binlog File:\t", in);
readString(binlog_file, in);
assertString("Executed GTID:\t", in);
readString(executed_gtid_set, in);
assertString("Binlog Position:\t", in);
readIntText(binlog_position, in);
assertString("Data Version:\t", in);
readIntText(version, in);
if (checkBinlogFileExists(connection)) if (checkBinlogFileExists(connection))
{
std::cout << "Load From File \n";
return; return;
}
} }
bool locked_tables = false; bool locked_tables = false;
@ -134,9 +174,8 @@ MasterStatusInfo::MasterStatusInfo(mysqlxx::PoolWithFailover::Entry & connection
connection->query("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ;").execute(); connection->query("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ;").execute();
connection->query("START TRANSACTION /*!40100 WITH CONSISTENT SNAPSHOT */;").execute(); connection->query("START TRANSACTION /*!40100 WITH CONSISTENT SNAPSHOT */;").execute();
need_dumping_tables = fetchTablesInDB(connection, database); need_dumping_tables = fetchTablesCreateQuery(connection, database, fetchTablesInDB(connection, database));
connection->query("UNLOCK TABLES;").execute(); connection->query("UNLOCK TABLES;").execute();
/// TODO: 拉取建表语句, 解析并构建出表结构(列列表, 主键, 唯一索引, 分区键)
} }
catch (...) catch (...)
{ {

View File

@ -8,7 +8,7 @@
namespace DB namespace DB
{ {
struct MasterStatusInfo struct MaterializeMetadata
{ {
const String persistent_path; const String persistent_path;
@ -18,9 +18,8 @@ struct MasterStatusInfo
String binlog_ignore_db; String binlog_ignore_db;
String executed_gtid_set; String executed_gtid_set;
std::vector<String> need_dumping_tables; size_t version = 0;
std::unordered_map<String, String> need_dumping_tables;
void finishDump();
void fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & connection); void fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & connection);
@ -28,9 +27,7 @@ struct MasterStatusInfo
void transaction(const MySQLReplication::Position & position, const std::function<void()> & fun); void transaction(const MySQLReplication::Position & position, const std::function<void()> & fun);
MasterStatusInfo(mysqlxx::PoolWithFailover::Entry & connection, const String & path, const String & database); MaterializeMetadata(mysqlxx::PoolWithFailover::Entry & connection, const String & path, const String & database);
}; };
} }

View File

@ -0,0 +1,42 @@
#include <Databases/MySQL/MaterializeModeSettings.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTCreateQuery.h>
#include <Core/SettingsCollectionImpl.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_SETTING;
}
IMPLEMENT_SETTINGS_COLLECTION(MaterializeModeSettings, LIST_OF_MATERIALIZE_MODE_SETTINGS)
void MaterializeModeSettings::loadFromQuery(ASTStorage & storage_def)
{
if (storage_def.settings)
{
try
{
applyChanges(storage_def.settings->changes);
}
catch (Exception & e)
{
if (e.code() == ErrorCodes::UNKNOWN_SETTING)
throw Exception(e.message() + " for storage " + storage_def.engine->name, ErrorCodes::BAD_ARGUMENTS);
else
e.rethrow();
}
}
else
{
auto settings_ast = std::make_shared<ASTSetQuery>();
settings_ast->is_standalone = false;
storage_def.set(storage_def.settings, settings_ast);
}
}
}

View File

@ -0,0 +1,28 @@
#pragma once
#include <Core/Defines.h>
#include <Core/SettingsCollection.h>
namespace DB
{
class ASTStorage;
/** Settings for the MySQL Database engine(materialize mode).
* Could be loaded from a CREATE DATABASE query (SETTINGS clause).
*/
struct MaterializeModeSettings : public SettingsCollection<MaterializeModeSettings>
{
#define LIST_OF_MATERIALIZE_MODE_SETTINGS(M) \
M(SettingBool, locality_data, false, "", 0) \
M(SettingUInt64, max_rows_in_buffer, DEFAULT_BLOCK_SIZE, "", 0) \
M(SettingUInt64, max_bytes_in_buffers, DBMS_DEFAULT_BUFFER_SIZE, "", 0) \
M(SettingUInt64, max_flush_data_time, 1000, "", 0) \
M(SettingUInt64, max_wait_time_when_mysql_unavailable, 1000, "", 0) \
DECLARE_SETTINGS_COLLECTION(LIST_OF_MATERIALIZE_MODE_SETTINGS)
void loadFromQuery(ASTStorage & storage_def);
};
}

View File

@ -137,13 +137,16 @@ String getUniqueColumnName(NamesAndTypesList columns_name_and_type, const String
String toCreateQuery(const MySQLTableStruct & table_struct, const Context & context) String toCreateQuery(const MySQLTableStruct & table_struct, const Context & context)
{ {
/// TODO: settings /// TODO: settings
String create_version = getUniqueColumnName(table_struct.columns_name_and_type, "_create_version"); if (table_struct.primary_keys.empty())
String delete_version = getUniqueColumnName(table_struct.columns_name_and_type, "_delete_version"); throw Exception("", ErrorCodes::NOT_IMPLEMENTED);
WriteBufferFromOwnString out; WriteBufferFromOwnString out;
String sign = getUniqueColumnName(table_struct.columns_name_and_type, "__sign");
String version = getUniqueColumnName(table_struct.columns_name_and_type, "__version");
out << "CREATE TABLE " << (table_struct.if_not_exists ? "IF NOT EXISTS" : "") out << "CREATE TABLE " << (table_struct.if_not_exists ? "IF NOT EXISTS" : "")
<< backQuoteIfNeed(table_struct.database_name) + "." << backQuoteIfNeed(table_struct.table_name) << "(" << backQuoteIfNeed(table_struct.database_name) + "." << backQuoteIfNeed(table_struct.table_name)
<< queryToString(InterpreterCreateQuery::formatColumns(table_struct.columns_name_and_type)) << "(" << queryToString(InterpreterCreateQuery::formatColumns(table_struct.columns_name_and_type))
<< ", " << create_version << " UInt64, " << delete_version << " UInt64" << ") ENGINE = MergeTree()" << ", " << sign << " Int8, " << version << " UInt64" << ") ENGINE = ReplacingMergeTree(" + version + ")"
<< " PARTITION BY " << queryToString(getFormattedPartitionByExpression(table_struct, context, 1000, 50000)) << " PARTITION BY " << queryToString(getFormattedPartitionByExpression(table_struct, context, 1000, 50000))
<< " ORDER BY " << queryToString(getFormattedOrderByExpression(table_struct)); << " ORDER BY " << queryToString(getFormattedOrderByExpression(table_struct));
return out.str(); return out.str();

View File

@ -28,7 +28,11 @@ bool ParserSetQuery::parseNameValuePair(SettingChange & change, IParser::Pos & p
if (!s_eq.ignore(pos, expected)) if (!s_eq.ignore(pos, expected))
return false; return false;
if (!value_p.parse(pos, value, expected)) if (ParserKeyword("TRUE").ignore(pos, expected))
value = std::make_shared<ASTLiteral>(Field(UInt64(1)));
else if (ParserKeyword("FALSE").ignore(pos, expected))
value = std::make_shared<ASTLiteral>(Field(UInt64(0)));
else if (!value_p.parse(pos, value, expected))
return false; return false;
tryGetIdentifierNameInto(name, change.name); tryGetIdentifierNameInto(name, change.name);

View File

@ -0,0 +1,56 @@
#include <Storages/StorageMaterializeMySQL.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Processors/Pipe.h>
namespace DB
{
StorageMaterializeMySQL::StorageMaterializeMySQL(const StoragePtr & nested_storage_)
: IStorage(nested_storage_->getStorageID()), nested_storage(nested_storage_)
{
ColumnsDescription columns_desc;
const ColumnsDescription & nested_columns_desc = nested_storage->getColumns();
size_t index = 0;
auto iterator = nested_columns_desc.begin();
for (; index < nested_columns_desc.size() - 2; ++index, ++iterator)
columns_desc.add(*iterator);
setColumns(columns_desc);
}
Pipes StorageMaterializeMySQL::read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned int num_streams)
{
if (ASTSelectQuery * select_query = query_info.query->as<ASTSelectQuery>())
{
auto & tables_in_select_query = select_query->tables()->as<ASTTablesInSelectQuery &>();
if (!tables_in_select_query.children.empty())
{
auto & tables_element = tables_in_select_query.children[0]->as<ASTTablesInSelectQueryElement &>();
if (tables_element.table_expression)
tables_element.table_expression->as<ASTTableExpression &>().final = true;
}
}
Names require_columns_name = column_names;
Block header = nested_storage->getSampleBlockNonMaterialized();
ColumnWithTypeAndName & sign_column = header.getByPosition(header.columns() - 2);
if (require_columns_name.end() == std::find(require_columns_name.begin(), require_columns_name.end(), sign_column.name))
require_columns_name.emplace_back(sign_column.name);
return nested_storage->read(require_columns_name, query_info, context, processed_stage, max_block_size, num_streams);
}
}

View File

@ -0,0 +1,28 @@
#pragma once
#include <Storages/IStorage.h>
namespace DB
{
class StorageMaterializeMySQL final : public ext::shared_ptr_helper<StorageMaterializeMySQL>, public IStorage
{
friend struct ext::shared_ptr_helper<StorageMaterializeMySQL>;
public:
String getName() const override { return "MySQL"; }
bool supportsFinal() const override { return nested_storage->supportsFinal(); }
bool supportsSampling() const override { return nested_storage->supportsSampling(); }
StorageMaterializeMySQL(const StoragePtr & nested_storage_);
Pipes read(
const Names & column_names, const SelectQueryInfo & query_info,
const Context & context, QueryProcessingStage::Enum processed_stage, size_t max_block_size, unsigned num_streams) override;
private:
StoragePtr nested_storage;
};
}