diff --git a/contrib/grpc-cmake/CMakeLists.txt b/contrib/grpc-cmake/CMakeLists.txt index a70c155da53..20feb96e065 100644 --- a/contrib/grpc-cmake/CMakeLists.txt +++ b/contrib/grpc-cmake/CMakeLists.txt @@ -37,7 +37,6 @@ else() set(CARES_STATIC ON CACHE BOOL "" FORCE) endif() set(_gRPC_CARES_LIBRARIES c-ares) -add_subdirectory(${CARES_ROOT_DIR} ${CARES_BINARY_DIR}) # upb.cmake set(UPB_ROOT_DIR ${GRPC_SOURCE_DIR}/third_party/upb) diff --git a/src/Core/MySQL/MySQLGtid.cpp b/src/Core/MySQL/MySQLGtid.cpp new file mode 100644 index 00000000000..7ab01ece980 --- /dev/null +++ b/src/Core/MySQL/MySQLGtid.cpp @@ -0,0 +1,174 @@ +#include "MySQLGtid.h" + +#include + +namespace DB +{ +namespace ErrorCodes +{ + extern const int UNKNOWN_EXCEPTION; +} + +void GTIDSet::tryMerge(size_t i) +{ + if ((i + 1) >= intervals.size()) + return; + + if (intervals[i].end != intervals[i + 1].start) + return; + intervals[i].end = intervals[i + 1].end; + intervals.erase(intervals.begin() + i + 1, intervals.begin() + i + 1 + 1); +} + +void GTIDSets::parse(const String gtid_format) +{ + std::vector gtid_sets; + boost::split(gtid_sets, gtid_format, boost::is_any_of(",")); + + for (size_t i = 0; i < gtid_sets.size(); i++) + { + std::vector server_ids; + boost::split(server_ids, gtid_sets[i], [](char c) { return c == ':'; }); + + GTIDSet set; + parseUUID(reinterpret_cast(server_ids[0].data()), set.uuid); + + for (size_t k = 1; k < server_ids.size(); k++) + { + std::vector inters; + boost::split(inters, server_ids[k], [](char c) { return c == '-'; }); + + GTIDSet::Interval val; + switch (inters.size()) + { + case 1: { + val.start = std::stol(inters[0]); + val.end = val.start + 1; + break; + } + case 2: { + val.start = std::stol(inters[0]); + val.end = std::stol(inters[1]) + 1; + break; + } + default: + throw Exception("GTIDParse: Invalid GTID interval: " + server_ids[k], ErrorCodes::UNKNOWN_EXCEPTION); + } + set.intervals.emplace_back(val); + } + sets.emplace_back(set); + } +} + +void GTIDSets::update(const GTID & other) +{ + for (GTIDSet & set : sets) + { + if (std::equal(std::begin(set.uuid), std::end(set.uuid), std::begin(other.uuid))) + { + for (auto i = 0U; i < set.intervals.size(); i++) + { + auto current = set.intervals[i]; + + /// Already Contained. + if (other.seq_no >= current.start && other.seq_no < current.end) + { + throw Exception( + "GTIDSets updates other: " + std::to_string(other.seq_no) + " invalid successor to " + std::to_string(current.end), + ErrorCodes::UNKNOWN_EXCEPTION); + } + + /// Sequence, extend the interval. + if (other.seq_no == current.end) + { + set.intervals[i].end = other.seq_no + 1; + set.tryMerge(i); + return; + } + } + + /// Add new interval. + GTIDSet::Interval new_interval{other.seq_no, other.seq_no + 1}; + for (auto it = set.intervals.begin(); it != set.intervals.end(); ++it) + { + if (other.seq_no < (*it).start) + { + set.intervals.insert(it, new_interval); + return; + } + } + set.intervals.emplace_back(new_interval); + return; + } + } + + GTIDSet set; + memcpy(set.uuid, other.uuid, 16); + GTIDSet::Interval interval{other.seq_no, other.seq_no + 1}; + set.intervals.emplace_back(interval); + sets.emplace_back(set); +} + +String GTIDSets::toString() const +{ + WriteBufferFromOwnString buffer; + + for (size_t i = 0; i < sets.size(); i++) + { + GTIDSet set = sets[i]; + + String dst36; + dst36.resize(36); + formatUUID(set.uuid, reinterpret_cast(dst36.data())); + writeString(dst36, buffer); + + for (size_t k = 0; k < set.intervals.size(); k++) + { + buffer.write(':'); + auto start = set.intervals[k].start; + auto end = set.intervals[k].end; + + if (end == (start + 1)) + { + writeString(std::to_string(start), buffer); + } + else + { + writeString(std::to_string(start), buffer); + buffer.write('-'); + writeString(std::to_string(end - 1), buffer); + } + } + + if (i < (sets.size() - 1)) + { + buffer.write(','); + } + } + return buffer.str(); +} + +String GTIDSets::toPayload() const +{ + WriteBufferFromOwnString buffer; + + UInt64 sets_size = sets.size(); + buffer.write(reinterpret_cast(&sets_size), 8); + for (size_t i = 0; i < sets.size(); i++) + { + GTIDSet set = sets[i]; + + buffer.write(reinterpret_cast(set.uuid), sizeof(set.uuid)); + + UInt64 intervals_size = set.intervals.size(); + buffer.write(reinterpret_cast(&intervals_size), 8); + for (size_t k = 0; k < set.intervals.size(); k++) + { + buffer.write(reinterpret_cast(&set.intervals[k].start), 8); + buffer.write(reinterpret_cast(&set.intervals[k].end), 8); + } + } + return buffer.str(); +} + +} diff --git a/src/Core/MySQL/MySQLGtid.h b/src/Core/MySQL/MySQLGtid.h new file mode 100644 index 00000000000..49f8582d063 --- /dev/null +++ b/src/Core/MySQL/MySQLGtid.h @@ -0,0 +1,42 @@ +#pragma once +#include +#include + + +namespace DB +{ +class GTID +{ +public: + UInt8 uuid[16]; + Int64 seq_no; +}; + +class GTIDSet +{ +public: + struct Interval + { + Int64 start; + Int64 end; + }; + + UInt8 uuid[16]; + std::vector intervals; + + void tryMerge(size_t i); +}; + +class GTIDSets +{ +public: + std::vector sets; + + void parse(const String gtid_format_); + void update(const GTID & gtid); + + String toString() const; + String toPayload() const; +}; + +} diff --git a/src/Core/MySQL/MySQLReplication.cpp b/src/Core/MySQL/MySQLReplication.cpp index 8d67c91c749..37a6cc0daad 100644 --- a/src/Core/MySQL/MySQLReplication.cpp +++ b/src/Core/MySQL/MySQLReplication.cpp @@ -2,6 +2,7 @@ #include #include +#include #include #include #include diff --git a/src/Core/ya.make b/src/Core/ya.make index 74a0f7e273a..7b7515a4ca8 100644 --- a/src/Core/ya.make +++ b/src/Core/ya.make @@ -27,6 +27,7 @@ SRCS( MySQL/IMySQLReadPacket.cpp MySQL/IMySQLWritePacket.cpp MySQL/MySQLClient.cpp + MySQL/MySQLGtid.cpp MySQL/MySQLReplication.cpp MySQL/PacketEndpoint.cpp MySQL/PacketsConnection.cpp