ISSUES-4006 add settings for allows_query_when_mysql_lost & move dump event to logger

This commit is contained in:
zhang2014 2020-08-07 14:04:50 +08:00
parent bd21f8e6b9
commit d6bba8cbbb
6 changed files with 76 additions and 67 deletions

View File

@ -27,15 +27,15 @@ namespace MySQLReplication
payload.readStrict(reinterpret_cast<char *>(&flags), 2);
}
void EventHeader::dump() const
void EventHeader::dump(std::ostream & out) const
{
std::cerr << "\n=== " << to_string(this->type) << " ===" << std::endl;
std::cerr << "Timestamp: " << this->timestamp << std::endl;
std::cerr << "Event Type: " << this->type << std::endl;
std::cerr << "Server ID: " << this->server_id << std::endl;
std::cerr << "Event Size: " << this->event_size << std::endl;
std::cerr << "Log Pos: " << this->log_pos << std::endl;
std::cerr << "Flags: " << this->flags << std::endl;
out << "\n=== " << to_string(this->type) << " ===" << std::endl;
out << "Timestamp: " << this->timestamp << std::endl;
out << "Event Type: " << this->type << std::endl;
out << "Server ID: " << this->server_id << std::endl;
out << "Event Size: " << this->event_size << std::endl;
out << "Log Pos: " << this->log_pos << std::endl;
out << "Flags: " << this->flags << std::endl;
}
/// https://dev.mysql.com/doc/internals/en/format-description-event.html
@ -53,13 +53,13 @@ namespace MySQLReplication
payload.readStrict(reinterpret_cast<char *>(event_type_header_length.data()), len);
}
void FormatDescriptionEvent::dump() const
void FormatDescriptionEvent::dump(std::ostream & out) const
{
header.dump();
std::cerr << "Binlog Version: " << this->binlog_version << std::endl;
std::cerr << "Server Version: " << this->server_version << std::endl;
std::cerr << "Create Timestamp: " << this->create_timestamp << std::endl;
std::cerr << "Event Header Len: " << this->event_header_length << std::endl;
header.dump(out);
out << "Binlog Version: " << this->binlog_version << std::endl;
out << "Server Version: " << this->server_version << std::endl;
out << "Create Timestamp: " << this->create_timestamp << std::endl;
out << "Event Header Len: " << this->event_header_length << std::endl;
}
/// https://dev.mysql.com/doc/internals/en/rotate-event.html
@ -71,11 +71,11 @@ namespace MySQLReplication
payload.readStrict(reinterpret_cast<char *>(next_binlog.data()), len);
}
void RotateEvent::dump() const
void RotateEvent::dump(std::ostream & out) const
{
header.dump();
std::cerr << "Position: " << this->position << std::endl;
std::cerr << "Next Binlog: " << this->next_binlog << std::endl;
header.dump(out);
out << "Position: " << this->position << std::endl;
out << "Next Binlog: " << this->next_binlog << std::endl;
}
/// https://dev.mysql.com/doc/internals/en/query-event.html
@ -114,25 +114,25 @@ namespace MySQLReplication
}
}
void QueryEvent::dump() const
void QueryEvent::dump(std::ostream & out) const
{
header.dump();
std::cerr << "Thread ID: " << this->thread_id << std::endl;
std::cerr << "Execution Time: " << this->exec_time << std::endl;
std::cerr << "Schema Len: " << this->schema_len << std::endl;
std::cerr << "Error Code: " << this->error_code << std::endl;
std::cerr << "Status Len: " << this->status_len << std::endl;
std::cerr << "Schema: " << this->schema << std::endl;
std::cerr << "Query: " << this->query << std::endl;
header.dump(out);
out << "Thread ID: " << this->thread_id << std::endl;
out << "Execution Time: " << this->exec_time << std::endl;
out << "Schema Len: " << this->schema_len << std::endl;
out << "Error Code: " << this->error_code << std::endl;
out << "Status Len: " << this->status_len << std::endl;
out << "Schema: " << this->schema << std::endl;
out << "Query: " << this->query << std::endl;
}
void XIDEvent::parseImpl(ReadBuffer & payload) { payload.readStrict(reinterpret_cast<char *>(&xid), 8); }
void XIDEvent::dump() const
void XIDEvent::dump(std::ostream & out) const
{
header.dump();
std::cerr << "XID: " << this->xid << std::endl;
header.dump(out);
out << "XID: " << this->xid << std::endl;
}
void TableMapEvent::parseImpl(ReadBuffer & payload)
@ -234,21 +234,21 @@ namespace MySQLReplication
}
}
void TableMapEvent::dump() const
void TableMapEvent::dump(std::ostream & out) const
{
header.dump();
std::cerr << "Table ID: " << this->table_id << std::endl;
std::cerr << "Flags: " << this->flags << std::endl;
std::cerr << "Schema Len: " << this->schema_len << std::endl;
std::cerr << "Schema: " << this->schema << std::endl;
std::cerr << "Table Len: " << this->table_len << std::endl;
std::cerr << "Table: " << this->table << std::endl;
std::cerr << "Column Count: " << this->column_count << std::endl;
header.dump(out);
out << "Table ID: " << this->table_id << std::endl;
out << "Flags: " << this->flags << std::endl;
out << "Schema Len: " << this->schema_len << std::endl;
out << "Schema: " << this->schema << std::endl;
out << "Table Len: " << this->table_len << std::endl;
out << "Table: " << this->table << std::endl;
out << "Column Count: " << this->column_count << std::endl;
for (auto i = 0U; i < column_count; i++)
{
std::cerr << "Column Type [" << i << "]: " << column_type[i] << ", Meta: " << column_meta[i] << std::endl;
out << "Column Type [" << i << "]: " << column_type[i] << ", Meta: " << column_meta[i] << std::endl;
}
std::cerr << "Null Bitmap: " << this->null_bitmap << std::endl;
out << "Null Bitmap: " << this->null_bitmap << std::endl;
}
void RowsEvent::parseImpl(ReadBuffer & payload)
@ -688,25 +688,25 @@ namespace MySQLReplication
rows.push_back(row);
}
void RowsEvent::dump() const
void RowsEvent::dump(std::ostream & out) const
{
FieldVisitorToString to_string;
header.dump();
std::cerr << "Schema: " << this->schema << std::endl;
std::cerr << "Table: " << this->table << std::endl;
header.dump(out);
out << "Schema: " << this->schema << std::endl;
out << "Table: " << this->table << std::endl;
for (auto i = 0U; i < rows.size(); i++)
{
std::cerr << "Row[" << i << "]: " << applyVisitor(to_string, rows[i]) << std::endl;
out << "Row[" << i << "]: " << applyVisitor(to_string, rows[i]) << std::endl;
}
}
void DryRunEvent::parseImpl(ReadBuffer & payload) { payload.ignore(header.event_size - EVENT_HEADER_LENGTH); }
void DryRunEvent::dump() const
void DryRunEvent::dump(std::ostream & out) const
{
header.dump();
std::cerr << "[DryRun Event]" << std::endl;
header.dump(out);
out << "[DryRun Event]" << std::endl;
}
void MySQLFlavor::readPayloadImpl(ReadBuffer & payload)

View File

@ -289,7 +289,7 @@ namespace MySQLReplication
UInt32 log_pos;
UInt16 flags;
void dump() const;
void dump(std::ostream & out) const;
void parse(ReadBuffer & payload);
};
@ -299,7 +299,7 @@ namespace MySQLReplication
EventHeader header;
virtual ~EventBase() = default;
virtual void dump() const = 0;
virtual void dump(std::ostream & out) const = 0;
virtual void parseHeader(ReadBuffer & payload) { header.parse(payload); }
virtual void parseEvent(ReadBuffer & payload) { parseImpl(payload); }
virtual MySQLEventType type() const { return MYSQL_UNHANDLED_EVENT; }
@ -317,7 +317,7 @@ namespace MySQLReplication
UInt8 event_header_length;
String event_type_header_length;
void dump() const override;
void dump(std::ostream & out) const override;
void parseImpl(ReadBuffer & payload) override;
private:
@ -330,7 +330,7 @@ namespace MySQLReplication
UInt64 position;
String next_binlog;
void dump() const override;
void dump(std::ostream & out) const override;
protected:
void parseImpl(ReadBuffer & payload) override;
@ -356,7 +356,7 @@ namespace MySQLReplication
String query;
QueryType typ = DDL;
void dump() const override;
void dump(std::ostream & out) const override;
MySQLEventType type() const override { return MYSQL_QUERY_EVENT; }
protected:
@ -368,7 +368,7 @@ namespace MySQLReplication
protected:
UInt64 xid;
void dump() const override;
void dump(std::ostream & out) const override;
void parseImpl(ReadBuffer & payload) override;
};
@ -386,7 +386,7 @@ namespace MySQLReplication
std::vector<UInt16> column_meta;
Bitmap null_bitmap;
void dump() const override;
void dump(std::ostream & out) const override;
protected:
void parseImpl(ReadBuffer & payload) override;
@ -408,7 +408,7 @@ namespace MySQLReplication
table = table_map->table;
}
void dump() const override;
void dump(std::ostream & out) const override;
protected:
UInt64 table_id;
@ -447,7 +447,7 @@ namespace MySQLReplication
class DryRunEvent : public EventBase
{
void dump() const override;
void dump(std::ostream & out) const override;
protected:
void parseImpl(ReadBuffer & payload) override;

View File

@ -205,7 +205,7 @@ int main(int argc, char ** argv)
{
case MYSQL_QUERY_EVENT: {
auto binlog_event = std::static_pointer_cast<QueryEvent>(event);
binlog_event->dump();
binlog_event->dump(std::cerr);
Position pos = slave.getPosition();
std::cerr << "Binlog Name: " << pos.binlog_name << ", Pos: " << pos.binlog_pos << std::endl;
@ -213,7 +213,7 @@ int main(int argc, char ** argv)
}
case MYSQL_WRITE_ROWS_EVENT: {
auto binlog_event = std::static_pointer_cast<WriteRowsEvent>(event);
binlog_event->dump();
binlog_event->dump(std::cerr);
Position pos = slave.getPosition();
std::cerr << "Binlog Name: " << pos.binlog_name << ", Pos: " << pos.binlog_pos << std::endl;
@ -221,7 +221,7 @@ int main(int argc, char ** argv)
}
case MYSQL_UPDATE_ROWS_EVENT: {
auto binlog_event = std::static_pointer_cast<UpdateRowsEvent>(event);
binlog_event->dump();
binlog_event->dump(std::cerr);
Position pos = slave.getPosition();
std::cerr << "Binlog Name: " << pos.binlog_name << ", Pos: " << pos.binlog_pos << std::endl;
@ -229,7 +229,7 @@ int main(int argc, char ** argv)
}
case MYSQL_DELETE_ROWS_EVENT: {
auto binlog_event = std::static_pointer_cast<DeleteRowsEvent>(event);
binlog_event->dump();
binlog_event->dump(std::cerr);
Position pos = slave.getPosition();
std::cerr << "Binlog Name: " << pos.binlog_name << ", Pos: " << pos.binlog_pos << std::endl;

View File

@ -37,7 +37,7 @@ void DatabaseMaterializeMySQL::rethrowExceptionIfNeed() const
{
std::unique_lock<std::mutex> lock(mutex);
if (exception)
if (!settings->allows_query_when_mysql_lost && exception)
{
try
{

View File

@ -14,7 +14,8 @@ class ASTStorage;
M(UInt64, max_rows_in_buffers, DEFAULT_BLOCK_SIZE, "Max rows that data is allowed to cache in memory(for database and the cache data unable to query). when rows is exceeded, the data will be materialized", 0) \
M(UInt64, max_bytes_in_buffers, DBMS_DEFAULT_BUFFER_SIZE, "Max bytes that data is allowed to cache in memory(for database and the cache data unable to query). when rows is exceeded, the data will be materialized", 0) \
M(UInt64, max_flush_data_time, 1000, "Max milliseconds that data is allowed to cache in memory(for database and the cache data unable to query). when this time is exceeded, the data will be materialized", 0) \
M(UInt64, max_wait_time_when_mysql_unavailable, 1000, "", 0) \
M(UInt64, max_wait_time_when_mysql_unavailable, 1000, "Dump full data retry interval when MySQL is not available(milliseconds).", 0) \
M(Bool, allows_query_when_mysql_lost, false, "Allow query materialized table when mysql is lost.", 0) \
DECLARE_SETTINGS_TRAITS(MaterializeMySQLSettingsTraits, LIST_OF_MATERIALIZE_MODE_SETTINGS)

View File

@ -171,10 +171,7 @@ void MaterializeMySQLSyncThread::synchronization(const String & mysql_version)
std::unique_lock<std::mutex> lock(sync_mutex);
if (binlog_event)
{
binlog_event->dump();
onEvent(buffers, binlog_event, *metadata);
}
if (watch.elapsedMilliseconds() > max_flush_time || buffers.checkThresholds(
settings->max_rows_in_buffer, settings->max_bytes_in_buffer,
@ -593,6 +590,17 @@ void MaterializeMySQLSyncThread::onEvent(Buffers & buffers, const BinlogEventPtr
throw;
}
}
else if (receive_event->header.type != HEARTBEAT_EVENT)
{
const auto & dump_event_message = [&]()
{
std::stringstream ss;
receive_event->dump(ss);
return ss.str();
};
LOG_DEBUG(log, "Skip MySQL event: \n {}", dump_event_message());
}
}
bool MaterializeMySQLSyncThread::isMySQLSyncThread()