diff --git a/src/Core/MySQLReplication.cpp b/src/Core/MySQLReplication.cpp index 934a262b36c..db0c70d07a1 100644 --- a/src/Core/MySQLReplication.cpp +++ b/src/Core/MySQLReplication.cpp @@ -27,15 +27,15 @@ namespace MySQLReplication payload.readStrict(reinterpret_cast(&flags), 2); } - void EventHeader::dump() const + void EventHeader::dump(std::ostream & out) const { - std::cerr << "\n=== " << to_string(this->type) << " ===" << std::endl; - std::cerr << "Timestamp: " << this->timestamp << std::endl; - std::cerr << "Event Type: " << this->type << std::endl; - std::cerr << "Server ID: " << this->server_id << std::endl; - std::cerr << "Event Size: " << this->event_size << std::endl; - std::cerr << "Log Pos: " << this->log_pos << std::endl; - std::cerr << "Flags: " << this->flags << std::endl; + out << "\n=== " << to_string(this->type) << " ===" << std::endl; + out << "Timestamp: " << this->timestamp << std::endl; + out << "Event Type: " << this->type << std::endl; + out << "Server ID: " << this->server_id << std::endl; + out << "Event Size: " << this->event_size << std::endl; + out << "Log Pos: " << this->log_pos << std::endl; + out << "Flags: " << this->flags << std::endl; } /// https://dev.mysql.com/doc/internals/en/format-description-event.html @@ -53,13 +53,13 @@ namespace MySQLReplication payload.readStrict(reinterpret_cast(event_type_header_length.data()), len); } - void FormatDescriptionEvent::dump() const + void FormatDescriptionEvent::dump(std::ostream & out) const { - header.dump(); - std::cerr << "Binlog Version: " << this->binlog_version << std::endl; - std::cerr << "Server Version: " << this->server_version << std::endl; - std::cerr << "Create Timestamp: " << this->create_timestamp << std::endl; - std::cerr << "Event Header Len: " << this->event_header_length << std::endl; + header.dump(out); + out << "Binlog Version: " << this->binlog_version << std::endl; + out << "Server Version: " << this->server_version << std::endl; + out << "Create Timestamp: " << this->create_timestamp << std::endl; + out << "Event Header Len: " << this->event_header_length << std::endl; } /// https://dev.mysql.com/doc/internals/en/rotate-event.html @@ -71,11 +71,11 @@ namespace MySQLReplication payload.readStrict(reinterpret_cast(next_binlog.data()), len); } - void RotateEvent::dump() const + void RotateEvent::dump(std::ostream & out) const { - header.dump(); - std::cerr << "Position: " << this->position << std::endl; - std::cerr << "Next Binlog: " << this->next_binlog << std::endl; + header.dump(out); + out << "Position: " << this->position << std::endl; + out << "Next Binlog: " << this->next_binlog << std::endl; } /// https://dev.mysql.com/doc/internals/en/query-event.html @@ -114,25 +114,25 @@ namespace MySQLReplication } } - void QueryEvent::dump() const + void QueryEvent::dump(std::ostream & out) const { - header.dump(); - std::cerr << "Thread ID: " << this->thread_id << std::endl; - std::cerr << "Execution Time: " << this->exec_time << std::endl; - std::cerr << "Schema Len: " << this->schema_len << std::endl; - std::cerr << "Error Code: " << this->error_code << std::endl; - std::cerr << "Status Len: " << this->status_len << std::endl; - std::cerr << "Schema: " << this->schema << std::endl; - std::cerr << "Query: " << this->query << std::endl; + header.dump(out); + out << "Thread ID: " << this->thread_id << std::endl; + out << "Execution Time: " << this->exec_time << std::endl; + out << "Schema Len: " << this->schema_len << std::endl; + out << "Error Code: " << this->error_code << std::endl; + out << "Status Len: " << this->status_len << std::endl; + out << "Schema: " << this->schema << std::endl; + out << "Query: " << this->query << std::endl; } void XIDEvent::parseImpl(ReadBuffer & payload) { payload.readStrict(reinterpret_cast(&xid), 8); } - void XIDEvent::dump() const + void XIDEvent::dump(std::ostream & out) const { - header.dump(); - std::cerr << "XID: " << this->xid << std::endl; + header.dump(out); + out << "XID: " << this->xid << std::endl; } void TableMapEvent::parseImpl(ReadBuffer & payload) @@ -234,21 +234,21 @@ namespace MySQLReplication } } - void TableMapEvent::dump() const + void TableMapEvent::dump(std::ostream & out) const { - header.dump(); - std::cerr << "Table ID: " << this->table_id << std::endl; - std::cerr << "Flags: " << this->flags << std::endl; - std::cerr << "Schema Len: " << this->schema_len << std::endl; - std::cerr << "Schema: " << this->schema << std::endl; - std::cerr << "Table Len: " << this->table_len << std::endl; - std::cerr << "Table: " << this->table << std::endl; - std::cerr << "Column Count: " << this->column_count << std::endl; + header.dump(out); + out << "Table ID: " << this->table_id << std::endl; + out << "Flags: " << this->flags << std::endl; + out << "Schema Len: " << this->schema_len << std::endl; + out << "Schema: " << this->schema << std::endl; + out << "Table Len: " << this->table_len << std::endl; + out << "Table: " << this->table << std::endl; + out << "Column Count: " << this->column_count << std::endl; for (auto i = 0U; i < column_count; i++) { - std::cerr << "Column Type [" << i << "]: " << column_type[i] << ", Meta: " << column_meta[i] << std::endl; + out << "Column Type [" << i << "]: " << column_type[i] << ", Meta: " << column_meta[i] << std::endl; } - std::cerr << "Null Bitmap: " << this->null_bitmap << std::endl; + out << "Null Bitmap: " << this->null_bitmap << std::endl; } void RowsEvent::parseImpl(ReadBuffer & payload) @@ -688,25 +688,25 @@ namespace MySQLReplication rows.push_back(row); } - void RowsEvent::dump() const + void RowsEvent::dump(std::ostream & out) const { FieldVisitorToString to_string; - header.dump(); - std::cerr << "Schema: " << this->schema << std::endl; - std::cerr << "Table: " << this->table << std::endl; + header.dump(out); + out << "Schema: " << this->schema << std::endl; + out << "Table: " << this->table << std::endl; for (auto i = 0U; i < rows.size(); i++) { - std::cerr << "Row[" << i << "]: " << applyVisitor(to_string, rows[i]) << std::endl; + out << "Row[" << i << "]: " << applyVisitor(to_string, rows[i]) << std::endl; } } void DryRunEvent::parseImpl(ReadBuffer & payload) { payload.ignore(header.event_size - EVENT_HEADER_LENGTH); } - void DryRunEvent::dump() const + void DryRunEvent::dump(std::ostream & out) const { - header.dump(); - std::cerr << "[DryRun Event]" << std::endl; + header.dump(out); + out << "[DryRun Event]" << std::endl; } void MySQLFlavor::readPayloadImpl(ReadBuffer & payload) diff --git a/src/Core/MySQLReplication.h b/src/Core/MySQLReplication.h index f4ac02c2ffd..29af875a430 100644 --- a/src/Core/MySQLReplication.h +++ b/src/Core/MySQLReplication.h @@ -289,7 +289,7 @@ namespace MySQLReplication UInt32 log_pos; UInt16 flags; - void dump() const; + void dump(std::ostream & out) const; void parse(ReadBuffer & payload); }; @@ -299,7 +299,7 @@ namespace MySQLReplication EventHeader header; virtual ~EventBase() = default; - virtual void dump() const = 0; + virtual void dump(std::ostream & out) const = 0; virtual void parseHeader(ReadBuffer & payload) { header.parse(payload); } virtual void parseEvent(ReadBuffer & payload) { parseImpl(payload); } virtual MySQLEventType type() const { return MYSQL_UNHANDLED_EVENT; } @@ -317,7 +317,7 @@ namespace MySQLReplication UInt8 event_header_length; String event_type_header_length; - void dump() const override; + void dump(std::ostream & out) const override; void parseImpl(ReadBuffer & payload) override; private: @@ -330,7 +330,7 @@ namespace MySQLReplication UInt64 position; String next_binlog; - void dump() const override; + void dump(std::ostream & out) const override; protected: void parseImpl(ReadBuffer & payload) override; @@ -356,7 +356,7 @@ namespace MySQLReplication String query; QueryType typ = DDL; - void dump() const override; + void dump(std::ostream & out) const override; MySQLEventType type() const override { return MYSQL_QUERY_EVENT; } protected: @@ -368,7 +368,7 @@ namespace MySQLReplication protected: UInt64 xid; - void dump() const override; + void dump(std::ostream & out) const override; void parseImpl(ReadBuffer & payload) override; }; @@ -386,7 +386,7 @@ namespace MySQLReplication std::vector column_meta; Bitmap null_bitmap; - void dump() const override; + void dump(std::ostream & out) const override; protected: void parseImpl(ReadBuffer & payload) override; @@ -408,7 +408,7 @@ namespace MySQLReplication table = table_map->table; } - void dump() const override; + void dump(std::ostream & out) const override; protected: UInt64 table_id; @@ -447,7 +447,7 @@ namespace MySQLReplication class DryRunEvent : public EventBase { - void dump() const override; + void dump(std::ostream & out) const override; protected: void parseImpl(ReadBuffer & payload) override; diff --git a/src/Core/tests/mysql_protocol.cpp b/src/Core/tests/mysql_protocol.cpp index c86cf0be25a..e5e875b1975 100644 --- a/src/Core/tests/mysql_protocol.cpp +++ b/src/Core/tests/mysql_protocol.cpp @@ -205,7 +205,7 @@ int main(int argc, char ** argv) { case MYSQL_QUERY_EVENT: { auto binlog_event = std::static_pointer_cast(event); - binlog_event->dump(); + binlog_event->dump(std::cerr); Position pos = slave.getPosition(); std::cerr << "Binlog Name: " << pos.binlog_name << ", Pos: " << pos.binlog_pos << std::endl; @@ -213,7 +213,7 @@ int main(int argc, char ** argv) } case MYSQL_WRITE_ROWS_EVENT: { auto binlog_event = std::static_pointer_cast(event); - binlog_event->dump(); + binlog_event->dump(std::cerr); Position pos = slave.getPosition(); std::cerr << "Binlog Name: " << pos.binlog_name << ", Pos: " << pos.binlog_pos << std::endl; @@ -221,7 +221,7 @@ int main(int argc, char ** argv) } case MYSQL_UPDATE_ROWS_EVENT: { auto binlog_event = std::static_pointer_cast(event); - binlog_event->dump(); + binlog_event->dump(std::cerr); Position pos = slave.getPosition(); std::cerr << "Binlog Name: " << pos.binlog_name << ", Pos: " << pos.binlog_pos << std::endl; @@ -229,7 +229,7 @@ int main(int argc, char ** argv) } case MYSQL_DELETE_ROWS_EVENT: { auto binlog_event = std::static_pointer_cast(event); - binlog_event->dump(); + binlog_event->dump(std::cerr); Position pos = slave.getPosition(); std::cerr << "Binlog Name: " << pos.binlog_name << ", Pos: " << pos.binlog_pos << std::endl; diff --git a/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp b/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp index 1d3419a218c..ce21d312b1f 100644 --- a/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp +++ b/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp @@ -37,7 +37,7 @@ void DatabaseMaterializeMySQL::rethrowExceptionIfNeed() const { std::unique_lock lock(mutex); - if (exception) + if (!settings->allows_query_when_mysql_lost && exception) { try { diff --git a/src/Databases/MySQL/MaterializeMySQLSettings.h b/src/Databases/MySQL/MaterializeMySQLSettings.h index 539471becfb..07de219c72f 100644 --- a/src/Databases/MySQL/MaterializeMySQLSettings.h +++ b/src/Databases/MySQL/MaterializeMySQLSettings.h @@ -14,7 +14,8 @@ class ASTStorage; M(UInt64, max_rows_in_buffers, DEFAULT_BLOCK_SIZE, "Max rows that data is allowed to cache in memory(for database and the cache data unable to query). when rows is exceeded, the data will be materialized", 0) \ M(UInt64, max_bytes_in_buffers, DBMS_DEFAULT_BUFFER_SIZE, "Max bytes that data is allowed to cache in memory(for database and the cache data unable to query). when rows is exceeded, the data will be materialized", 0) \ M(UInt64, max_flush_data_time, 1000, "Max milliseconds that data is allowed to cache in memory(for database and the cache data unable to query). when this time is exceeded, the data will be materialized", 0) \ - M(UInt64, max_wait_time_when_mysql_unavailable, 1000, "", 0) \ + M(UInt64, max_wait_time_when_mysql_unavailable, 1000, "Dump full data retry interval when MySQL is not available(milliseconds).", 0) \ + M(Bool, allows_query_when_mysql_lost, false, "Allow query materialized table when mysql is lost.", 0) \ DECLARE_SETTINGS_TRAITS(MaterializeMySQLSettingsTraits, LIST_OF_MATERIALIZE_MODE_SETTINGS) diff --git a/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp b/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp index 43d1295e5c4..ea95f204a61 100644 --- a/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp +++ b/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp @@ -171,10 +171,7 @@ void MaterializeMySQLSyncThread::synchronization(const String & mysql_version) std::unique_lock lock(sync_mutex); if (binlog_event) - { - binlog_event->dump(); onEvent(buffers, binlog_event, *metadata); - } if (watch.elapsedMilliseconds() > max_flush_time || buffers.checkThresholds( settings->max_rows_in_buffer, settings->max_bytes_in_buffer, @@ -593,6 +590,17 @@ void MaterializeMySQLSyncThread::onEvent(Buffers & buffers, const BinlogEventPtr throw; } } + else if (receive_event->header.type != HEARTBEAT_EVENT) + { + const auto & dump_event_message = [&]() + { + std::stringstream ss; + receive_event->dump(ss); + return ss.str(); + }; + + LOG_DEBUG(log, "Skip MySQL event: \n {}", dump_event_message()); + } } bool MaterializeMySQLSyncThread::isMySQLSyncThread()