ISSUES-4006 add MySQL replication GTID module #4006

This commit is contained in:
BohuTANG 2020-08-17 08:57:56 +08:00
parent 8810390dc0
commit b4d18b1a52
5 changed files with 218 additions and 1 deletions

View File

@ -37,7 +37,6 @@ else()
set(CARES_STATIC ON CACHE BOOL "" FORCE)
endif()
set(_gRPC_CARES_LIBRARIES c-ares)
add_subdirectory(${CARES_ROOT_DIR} ${CARES_BINARY_DIR})
# upb.cmake
set(UPB_ROOT_DIR ${GRPC_SOURCE_DIR}/third_party/upb)

View File

@ -0,0 +1,174 @@
#include "MySQLGtid.h"
#include <boost/algorithm/string.hpp>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_EXCEPTION;
}
void GTIDSet::tryMerge(size_t i)
{
if ((i + 1) >= intervals.size())
return;
if (intervals[i].end != intervals[i + 1].start)
return;
intervals[i].end = intervals[i + 1].end;
intervals.erase(intervals.begin() + i + 1, intervals.begin() + i + 1 + 1);
}
void GTIDSets::parse(const String gtid_format)
{
std::vector<String> gtid_sets;
boost::split(gtid_sets, gtid_format, boost::is_any_of(","));
for (size_t i = 0; i < gtid_sets.size(); i++)
{
std::vector<String> server_ids;
boost::split(server_ids, gtid_sets[i], [](char c) { return c == ':'; });
GTIDSet set;
parseUUID(reinterpret_cast<const UInt8 *>(server_ids[0].data()), set.uuid);
for (size_t k = 1; k < server_ids.size(); k++)
{
std::vector<String> inters;
boost::split(inters, server_ids[k], [](char c) { return c == '-'; });
GTIDSet::Interval val;
switch (inters.size())
{
case 1: {
val.start = std::stol(inters[0]);
val.end = val.start + 1;
break;
}
case 2: {
val.start = std::stol(inters[0]);
val.end = std::stol(inters[1]) + 1;
break;
}
default:
throw Exception("GTIDParse: Invalid GTID interval: " + server_ids[k], ErrorCodes::UNKNOWN_EXCEPTION);
}
set.intervals.emplace_back(val);
}
sets.emplace_back(set);
}
}
void GTIDSets::update(const GTID & other)
{
for (GTIDSet & set : sets)
{
if (std::equal(std::begin(set.uuid), std::end(set.uuid), std::begin(other.uuid)))
{
for (auto i = 0U; i < set.intervals.size(); i++)
{
auto current = set.intervals[i];
/// Already Contained.
if (other.seq_no >= current.start && other.seq_no < current.end)
{
throw Exception(
"GTIDSets updates other: " + std::to_string(other.seq_no) + " invalid successor to " + std::to_string(current.end),
ErrorCodes::UNKNOWN_EXCEPTION);
}
/// Sequence, extend the interval.
if (other.seq_no == current.end)
{
set.intervals[i].end = other.seq_no + 1;
set.tryMerge(i);
return;
}
}
/// Add new interval.
GTIDSet::Interval new_interval{other.seq_no, other.seq_no + 1};
for (auto it = set.intervals.begin(); it != set.intervals.end(); ++it)
{
if (other.seq_no < (*it).start)
{
set.intervals.insert(it, new_interval);
return;
}
}
set.intervals.emplace_back(new_interval);
return;
}
}
GTIDSet set;
memcpy(set.uuid, other.uuid, 16);
GTIDSet::Interval interval{other.seq_no, other.seq_no + 1};
set.intervals.emplace_back(interval);
sets.emplace_back(set);
}
String GTIDSets::toString() const
{
WriteBufferFromOwnString buffer;
for (size_t i = 0; i < sets.size(); i++)
{
GTIDSet set = sets[i];
String dst36;
dst36.resize(36);
formatUUID(set.uuid, reinterpret_cast<UInt8 *>(dst36.data()));
writeString(dst36, buffer);
for (size_t k = 0; k < set.intervals.size(); k++)
{
buffer.write(':');
auto start = set.intervals[k].start;
auto end = set.intervals[k].end;
if (end == (start + 1))
{
writeString(std::to_string(start), buffer);
}
else
{
writeString(std::to_string(start), buffer);
buffer.write('-');
writeString(std::to_string(end - 1), buffer);
}
}
if (i < (sets.size() - 1))
{
buffer.write(',');
}
}
return buffer.str();
}
String GTIDSets::toPayload() const
{
WriteBufferFromOwnString buffer;
UInt64 sets_size = sets.size();
buffer.write(reinterpret_cast<const char *>(&sets_size), 8);
for (size_t i = 0; i < sets.size(); i++)
{
GTIDSet set = sets[i];
buffer.write(reinterpret_cast<const char *>(set.uuid), sizeof(set.uuid));
UInt64 intervals_size = set.intervals.size();
buffer.write(reinterpret_cast<const char *>(&intervals_size), 8);
for (size_t k = 0; k < set.intervals.size(); k++)
{
buffer.write(reinterpret_cast<const char *>(&set.intervals[k].start), 8);
buffer.write(reinterpret_cast<const char *>(&set.intervals[k].end), 8);
}
}
return buffer.str();
}
}

View File

@ -0,0 +1,42 @@
#pragma once
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
namespace DB
{
class GTID
{
public:
UInt8 uuid[16];
Int64 seq_no;
};
class GTIDSet
{
public:
struct Interval
{
Int64 start;
Int64 end;
};
UInt8 uuid[16];
std::vector<Interval> intervals;
void tryMerge(size_t i);
};
class GTIDSets
{
public:
std::vector<GTIDSet> sets;
void parse(const String gtid_format_);
void update(const GTID & gtid);
String toString() const;
String toPayload() const;
};
}

View File

@ -2,6 +2,7 @@
#include <DataTypes/DataTypeString.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <common/DateLUT.h>
#include <Common/FieldVisitors.h>
#include <Core/MySQL/PacketsGeneric.h>

View File

@ -27,6 +27,7 @@ SRCS(
MySQL/IMySQLReadPacket.cpp
MySQL/IMySQLWritePacket.cpp
MySQL/MySQLClient.cpp
MySQL/MySQLGtid.cpp
MySQL/MySQLReplication.cpp
MySQL/PacketEndpoint.cpp
MySQL/PacketsConnection.cpp