Merge pull request #34487 from zzsmdfj/issue/32977_MaterializedMySQL_add_table_list_settings

MaterializedMySQL add materialized_mysql_tables_list settings
This commit is contained in:
tavplubix 2022-03-03 12:44:51 +01:00 committed by GitHub
commit b7d4c78f13
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 208 additions and 20 deletions

View File

@ -36,6 +36,7 @@ ENGINE = MaterializedMySQL('host:port', ['database' | database], 'user', 'passwo
- `max_flush_data_time` — Maximum number of milliseconds that data is allowed to cache in memory (for database and the cache data unable to query). When this time is exceeded, the data will be materialized. Default: `1000`.
- `max_wait_time_when_mysql_unavailable` — Retry interval when MySQL is not available (milliseconds). Negative value disables retry. Default: `1000`.
- `allows_query_when_mysql_lost` — Allows to query a materialized table when MySQL is lost. Default: `0` (`false`).
- `materialized_mysql_tables_list` — a comma-separated list of mysql database tables, which will be replicated by MaterializedMySQL database engine. Default value: empty list — means whole tables will be replicated.
```sql
CREATE DATABASE mysql ENGINE = MaterializedMySQL('localhost:3306', 'db', 'user', '***')

View File

@ -142,7 +142,7 @@ void MySQLClient::setBinlogChecksum(const String & binlog_checksum)
replication.setChecksumSignatureLength(Poco::toUpper(binlog_checksum) == "NONE" ? 0 : 4);
}
void MySQLClient::startBinlogDumpGTID(UInt32 slave_id, String replicate_db, String gtid_str, const String & binlog_checksum)
void MySQLClient::startBinlogDumpGTID(UInt32 slave_id, String replicate_db, std::unordered_set<String> replicate_tables, String gtid_str, const String & binlog_checksum)
{
/// Maybe CRC32 or NONE. mysqlbinlog.cc use NONE, see its below comments:
/// Make a notice to the server that this client is checksum-aware.
@ -165,6 +165,7 @@ void MySQLClient::startBinlogDumpGTID(UInt32 slave_id, String replicate_db, Stri
/// Set Filter rule to replication.
replication.setReplicateDatabase(replicate_db);
replication.setReplicateTables(replicate_tables);
BinlogDumpGTID binlog_dump(slave_id, gtid_sets.toPayload());
packet_endpoint->sendPacket<BinlogDumpGTID>(binlog_dump, true);

View File

@ -33,7 +33,7 @@ public:
/// Start replication stream by GTID.
/// replicate_db: replication database schema, events from other databases will be ignored.
/// gtid: executed gtid sets format like 'hhhhhhhh-hhhh-hhhh-hhhh-hhhhhhhhhhhh:x-y'.
void startBinlogDumpGTID(UInt32 slave_id, String replicate_db, String gtid, const String & binlog_checksum);
void startBinlogDumpGTID(UInt32 slave_id, String replicate_db, std::unordered_set<String> replicate_tables, String gtid, const String & binlog_checksum);
BinlogEventPtr readOneBinlogEvent(UInt64 milliseconds = 0);
Position getPosition() const { return replication.getPosition(); }

View File

@ -142,8 +142,7 @@ namespace MySQLReplication
out << "XID: " << this->xid << '\n';
}
/// https://dev.mysql.com/doc/internals/en/table-map-event.html
void TableMapEvent::parseImpl(ReadBuffer & payload)
void TableMapEventHeader::parse(ReadBuffer & payload)
{
payload.readStrict(reinterpret_cast<char *>(&table_id), 6);
payload.readStrict(reinterpret_cast<char *>(&flags), 2);
@ -157,7 +156,11 @@ namespace MySQLReplication
table.resize(table_len);
payload.readStrict(reinterpret_cast<char *>(table.data()), table_len);
payload.ignore(1);
}
/// https://dev.mysql.com/doc/internals/en/table-map-event.html
void TableMapEvent::parseImpl(ReadBuffer & payload)
{
column_count = readLengthEncodedNumber(payload);
for (auto i = 0U; i < column_count; ++i)
{
@ -165,7 +168,6 @@ namespace MySQLReplication
payload.readStrict(reinterpret_cast<char *>(&v), 1);
column_type.emplace_back(v);
}
String meta;
readLengthEncodedString(meta, payload);
parseMeta(meta);
@ -957,10 +959,20 @@ namespace MySQLReplication
}
case TABLE_MAP_EVENT:
{
event = std::make_shared<TableMapEvent>(std::move(event_header));
event->parseEvent(event_payload);
auto table_map = std::static_pointer_cast<TableMapEvent>(event);
table_maps[table_map->table_id] = table_map;
TableMapEventHeader map_event_header;
map_event_header.parse(event_payload);
if (doReplicate(map_event_header.schema, map_event_header.table))
{
event = std::make_shared<TableMapEvent>(std::move(event_header), map_event_header);
event->parseEvent(event_payload);
auto table_map = std::static_pointer_cast<TableMapEvent>(event);
table_maps[table_map->table_id] = table_map;
}
else
{
event = std::make_shared<DryRunEvent>(std::move(event_header));
event->parseEvent(event_payload);
}
break;
}
case WRITE_ROWS_EVENT_V1:
@ -1030,8 +1042,21 @@ namespace MySQLReplication
// Special "dummy event"
return false;
}
auto table_map = table_maps.at(table_id);
return table_map->schema == replicate_do_db;
if (table_maps.contains(table_id))
{
auto table_map = table_maps.at(table_id);
return (table_map->schema == replicate_do_db) && (replicate_tables.empty() || replicate_tables.contains(table_map->table));
}
return false;
}
bool MySQLFlavor::doReplicate(const String & db, const String & table_name)
{
if (replicate_do_db.empty())
return false;
if (replicate_do_db != db)
return false;
return replicate_tables.empty() || table_name.empty() || replicate_tables.contains(table_name);
}
}

View File

@ -409,6 +409,20 @@ namespace MySQLReplication
void parseImpl(ReadBuffer & payload) override;
};
class TableMapEventHeader
{
public:
UInt64 table_id;
UInt16 flags;
UInt8 schema_len;
String schema;
UInt8 table_len;
String table;
TableMapEventHeader(): table_id(0), flags(0), schema_len(0), table_len(0) {}
void parse(ReadBuffer & payload);
};
class TableMapEvent : public EventBase
{
public:
@ -423,7 +437,15 @@ namespace MySQLReplication
std::vector<UInt16> column_meta;
Bitmap null_bitmap;
TableMapEvent(EventHeader && header_) : EventBase(std::move(header_)), table_id(0), flags(0), schema_len(0), table_len(0), column_count(0) {}
TableMapEvent(EventHeader && header_, const TableMapEventHeader & map_event_header) : EventBase(std::move(header_)), column_count(0)
{
table_id = map_event_header.table_id;
flags = map_event_header.flags;
schema_len = map_event_header.schema_len;
schema = map_event_header.schema;
table_len = map_event_header.table_len;
table = map_event_header.table;
}
void dump(WriteBuffer & out) const override;
protected:
@ -563,6 +585,7 @@ namespace MySQLReplication
Position getPosition() const override { return position; }
BinlogEventPtr readOneEvent() override { return event; }
void setReplicateDatabase(String db) override { replicate_do_db = std::move(db); }
void setReplicateTables(std::unordered_set<String> tables) { replicate_tables = std::move(tables); }
void setGTIDSets(GTIDSets sets) override { position.gtid_sets = std::move(sets); }
void setChecksumSignatureLength(size_t checksum_signature_length_) override { checksum_signature_length = checksum_signature_length_; }
@ -570,10 +593,13 @@ namespace MySQLReplication
Position position;
BinlogEventPtr event;
String replicate_do_db;
// only for filter data(Row Event), not include DDL Event
std::unordered_set<String> replicate_tables;
std::map<UInt64, std::shared_ptr<TableMapEvent> > table_maps;
size_t checksum_signature_length = 4;
bool doReplicate(UInt64 table_id);
bool doReplicate(const String & db, const String & table_name);
};
}

View File

@ -30,11 +30,15 @@ namespace ErrorCodes
static std::unordered_map<String, String> fetchTablesCreateQuery(
const mysqlxx::PoolWithFailover::Entry & connection, const String & database_name,
const std::vector<String> & fetch_tables, const Settings & global_settings)
const std::vector<String> & fetch_tables, std::unordered_set<String> & materialized_tables_list,
const Settings & global_settings)
{
std::unordered_map<String, String> tables_create_query;
for (const auto & fetch_table_name : fetch_tables)
{
if (!materialized_tables_list.empty() && !materialized_tables_list.contains(fetch_table_name))
continue;
Block show_create_table_header{
{std::make_shared<DataTypeString>(), "Table"},
{std::make_shared<DataTypeString>(), "Create Table"},
@ -276,7 +280,8 @@ MaterializeMetadata::MaterializeMetadata(const String & path_, const Settings &
void MaterializeMetadata::startReplication(
mysqlxx::PoolWithFailover::Entry & connection, const String & database,
bool & opened_transaction, std::unordered_map<String, String> & need_dumping_tables)
bool & opened_transaction, std::unordered_map<String, String> & need_dumping_tables,
std::unordered_set<String> & materialized_tables_list)
{
checkSyncUserPriv(connection, settings);
@ -297,7 +302,7 @@ void MaterializeMetadata::startReplication(
connection->query("START TRANSACTION /*!40100 WITH CONSISTENT SNAPSHOT */;").execute();
opened_transaction = true;
need_dumping_tables = fetchTablesCreateQuery(connection, database, fetchTablesInDB(connection, database, settings), settings);
need_dumping_tables = fetchTablesCreateQuery(connection, database, fetchTablesInDB(connection, database, settings), materialized_tables_list, settings);
connection->query("UNLOCK TABLES;").execute();
}
catch (...)

View File

@ -48,7 +48,8 @@ struct MaterializeMetadata
mysqlxx::PoolWithFailover::Entry & connection,
const String & database,
bool & opened_transaction,
std::unordered_map<String, String> & need_dumping_tables);
std::unordered_map<String, String> & need_dumping_tables,
std::unordered_set<String> & materialized_tables_list);
MaterializeMetadata(const String & path_, const Settings & settings_);
};

View File

@ -16,6 +16,7 @@ class ASTStorage;
M(UInt64, max_flush_data_time, 1000, "Max milliseconds that data is allowed to cache in memory(for database and the cache data unable to query). when this time is exceeded, the data will be materialized", 0) \
M(Int64, max_wait_time_when_mysql_unavailable, 1000, "Retry interval when MySQL is not available (milliseconds). Negative value disable retry.", 0) \
M(Bool, allows_query_when_mysql_lost, false, "Allow query materialized table when mysql is lost.", 0) \
M(String, materialized_mysql_tables_list, "", "a comma-separated list of mysql database tables, which will be replicated by MaterializedMySQL database engine. Default value: empty list — means whole tables will be replicated.", 0) \
DECLARE_SETTINGS_TRAITS(MaterializedMySQLSettingsTraits, LIST_OF_MATERIALIZE_MODE_SETTINGS)

View File

@ -25,6 +25,10 @@
#include <Common/setThreadName.h>
#include <base/sleep.h>
#include <base/bit_cast.h>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <Parsers/CommonParsers.h>
#include <Parsers/ASTIdentifier.h>
namespace DB
{
@ -148,6 +152,61 @@ static void checkMySQLVariables(const mysqlxx::Pool::Entry & connection, const S
}
}
static std::tuple<String, String> tryExtractTableNameFromDDL(const String & ddl)
{
String table_name;
String database_name;
if (ddl.empty()) return std::make_tuple(database_name, table_name);
bool parse_failed = false;
Tokens tokens(ddl.data(), ddl.data() + ddl.size());
IParser::Pos pos(tokens, 0);
Expected expected;
ASTPtr res;
ASTPtr table;
if (ParserKeyword("CREATE TEMPORARY TABLE").ignore(pos, expected) || ParserKeyword("CREATE TABLE").ignore(pos, expected))
{
ParserKeyword("IF NOT EXISTS").ignore(pos, expected);
if (!ParserCompoundIdentifier(true).parse(pos, table, expected))
parse_failed = true;
}
else if (ParserKeyword("ALTER TABLE").ignore(pos, expected))
{
if (!ParserCompoundIdentifier(true).parse(pos, table, expected))
parse_failed = true;
}
else if (ParserKeyword("DROP TABLE").ignore(pos, expected) || ParserKeyword("DROP TEMPORARY TABLE").ignore(pos, expected))
{
ParserKeyword("IF EXISTS").ignore(pos, expected);
if (!ParserCompoundIdentifier(true).parse(pos, table, expected))
parse_failed = true;
}
else if (ParserKeyword("TRUNCATE").ignore(pos, expected))
{
ParserKeyword("TABLE").ignore(pos, expected);
if (!ParserCompoundIdentifier(true).parse(pos, table, expected))
parse_failed = true;
}
else if (ParserKeyword("RENAME TABLE").ignore(pos, expected))
{
if (!ParserCompoundIdentifier(true).parse(pos, table, expected))
parse_failed = true;
}
else
{
parse_failed = true;
}
if (!parse_failed)
{
if (auto table_id = table->as<ASTTableIdentifier>()->getTableId())
{
database_name = table_id.database_name;
table_name = table_id.table_name;
}
}
return std::make_tuple(database_name, table_name);
}
MaterializedMySQLSyncThread::MaterializedMySQLSyncThread(
ContextPtr context_,
const String & database_name_,
@ -164,6 +223,17 @@ MaterializedMySQLSyncThread::MaterializedMySQLSyncThread(
, settings(settings_)
{
query_prefix = "EXTERNAL DDL FROM MySQL(" + backQuoteIfNeed(database_name) + ", " + backQuoteIfNeed(mysql_database_name) + ") ";
if (!settings->materialized_mysql_tables_list.value.empty())
{
Names tables_list;
boost::split(tables_list, settings->materialized_mysql_tables_list.value, [](char c){ return c == ','; });
for (String & table_name: tables_list)
{
boost::trim(table_name);
materialized_tables_list.insert(table_name);
}
}
}
void MaterializedMySQLSyncThread::synchronization()
@ -434,7 +504,7 @@ bool MaterializedMySQLSyncThread::prepareSynchronized(MaterializeMetadata & meta
checkMySQLVariables(connection, getContext()->getSettingsRef());
std::unordered_map<String, String> need_dumping_tables;
metadata.startReplication(connection, mysql_database_name, opened_transaction, need_dumping_tables);
metadata.startReplication(connection, mysql_database_name, opened_transaction, need_dumping_tables, materialized_tables_list);
if (!need_dumping_tables.empty())
{
@ -464,7 +534,7 @@ bool MaterializedMySQLSyncThread::prepareSynchronized(MaterializeMetadata & meta
connection->query("COMMIT").execute();
client.connect();
client.startBinlogDumpGTID(randomNumber(), mysql_database_name, metadata.executed_gtid_set, metadata.binlog_checksum);
client.startBinlogDumpGTID(randomNumber(), mysql_database_name, materialized_tables_list, metadata.executed_gtid_set, metadata.binlog_checksum);
setSynchronizationThreadException(nullptr);
return true;
@ -792,9 +862,24 @@ void MaterializedMySQLSyncThread::executeDDLAtomic(const QueryEvent & query_even
auto query_context = createQueryContext(getContext());
CurrentThread::QueryScope query_scope(query_context);
String query = query_event.query;
if (!materialized_tables_list.empty())
{
auto [ddl_database_name, ddl_table_name] = tryExtractTableNameFromDDL(query_event.query);
if (!ddl_table_name.empty())
{
ddl_database_name = ddl_database_name.empty() ? query_event.schema: ddl_database_name;
if (ddl_database_name != mysql_database_name || !materialized_tables_list.contains(ddl_table_name))
{
LOG_DEBUG(log, "Skip MySQL DDL: \n {}", query_event.query);
return;
}
}
}
String comment = "Materialize MySQL step 2: execute MySQL DDL for sync data";
String event_database = query_event.schema == mysql_database_name ? database_name : "";
tryToExecuteQuery(query_prefix + query_event.query, query_context, event_database, comment);
tryToExecuteQuery(query_prefix + query, query_context, event_database, comment);
}
catch (Exception & exception)
{

View File

@ -63,6 +63,7 @@ private:
mutable MySQLClient client;
MaterializedMySQLSettings * settings;
String query_prefix;
NameSet materialized_tables_list;
// USE MySQL ERROR CODE:
// https://dev.mysql.com/doc/mysql-errors/5.7/en/server-error-reference.html

View File

@ -1183,3 +1183,39 @@ def materialized_database_support_all_kinds_of_mysql_datatype(clickhouse_node, m
"\t2021\t3020399000000\t3020399000000\t00000000010100000000000000000000000000000000000000\t10\t1\t11\tvarbinary\tRED\n" +
"2\t2\t22\t9223372036854775807\t-2\t2\t22\t18446744073709551615\t-2.2\t2.2\t-2.22\t2.222\t2.2222\t2021-10-07\ttext\tvarchar\tBLOB\t2021-10-07 18:32:57\t2021-10-07 18:32:57.482786\t2021-10-07 18:32:57\t2021-10-07 18:32:57.482786" +
"\t2021\t-3020399000000\t-46798000001\t000000000101000000D55C6E30D4095E40DCF0BBE996493E40\t11\t3\t22\tvarbinary\tGREEN\n")
def materialized_database_settings_materialized_mysql_tables_list(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS test_database")
clickhouse_node.query("DROP DATABASE IF EXISTS test_database")
mysql_node.query("CREATE DATABASE test_database")
mysql_node.query("CREATE TABLE test_database.a (id INT(11) NOT NULL PRIMARY KEY, value VARCHAR(255))")
mysql_node.query("INSERT INTO test_database.a VALUES(1, 'foo')")
mysql_node.query("INSERT INTO test_database.a VALUES(2, 'bar')")
# table b(include json type, not in materialized_mysql_tables_list) can be skip
mysql_node.query("CREATE TABLE test_database.b (id INT(11) NOT NULL PRIMARY KEY, value JSON)")
clickhouse_node.query("CREATE DATABASE test_database ENGINE = MaterializedMySQL('{}:3306', 'test_database', 'root', 'clickhouse') SETTINGS materialized_mysql_tables_list = ' a,c,d'".format(service_name))
check_query(clickhouse_node, "SELECT name from system.tables where database = 'test_database' FORMAT TSV", "a\n")
check_query(clickhouse_node, "SELECT COUNT() FROM test_database.a FORMAT TSV", "2\n")
# mysql data(binlog) can be skip
mysql_node.query("INSERT INTO test_database.b VALUES(1, '{\"name\":\"testjson\"}')")
mysql_node.query("INSERT INTO test_database.b VALUES(2, '{\"name\":\"testjson\"}')")
# irrelevant database can be skip
mysql_node.query("DROP DATABASE IF EXISTS other_database")
mysql_node.query("CREATE DATABASE other_database")
mysql_node.query("CREATE TABLE other_database.d (id INT(11) NOT NULL PRIMARY KEY, value json)")
mysql_node.query("INSERT INTO other_database.d VALUES(1, '{\"name\":\"testjson\"}')")
mysql_node.query("CREATE TABLE test_database.c (id INT(11) NOT NULL PRIMARY KEY, value VARCHAR(255))")
mysql_node.query("INSERT INTO test_database.c VALUES(1, 'foo')")
mysql_node.query("INSERT INTO test_database.c VALUES(2, 'bar')")
check_query(clickhouse_node, "SELECT name from system.tables where database = 'test_database' FORMAT TSV", "a\nc\n")
check_query(clickhouse_node, "SELECT COUNT() FROM test_database.c FORMAT TSV", "2\n")
clickhouse_node.query("DROP DATABASE test_database")
mysql_node.query("DROP DATABASE test_database")

View File

@ -257,3 +257,7 @@ def test_table_overrides(started_cluster, started_mysql_8_0, started_mysql_5_7,
def test_materialized_database_support_all_kinds_of_mysql_datatype(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.materialized_database_support_all_kinds_of_mysql_datatype(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.materialized_database_support_all_kinds_of_mysql_datatype(clickhouse_node, started_mysql_5_7, "mysql57")
def test_materialized_database_settings_materialized_mysql_tables_list(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.materialized_database_settings_materialized_mysql_tables_list(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.materialized_database_settings_materialized_mysql_tables_list(clickhouse_node, started_mysql_5_7, "mysql57")

View File

@ -61,7 +61,9 @@ static DB::MySQLReplication::BinlogEventPtr parseSingleEventBody(
}
case DB::MySQLReplication::TABLE_MAP_EVENT:
{
event = std::make_shared<DB::MySQLReplication::TableMapEvent>(std::move(header));
DB::MySQLReplication::TableMapEventHeader map_event_header;
map_event_header.parse(*event_payload);
event = std::make_shared<DB::MySQLReplication::TableMapEvent>(std::move(header), map_event_header);
event->parseEvent(*event_payload);
last_table_map_event = std::static_pointer_cast<DB::MySQLReplication::TableMapEvent>(event);
break;