You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
239 lines
5.8 KiB
Go
239 lines
5.8 KiB
Go
1 year ago
|
package mysql
|
||
|
|
||
|
import (
|
||
|
"bytes"
|
||
|
"fmt"
|
||
|
"sort"
|
||
|
"strconv"
|
||
|
"strings"
|
||
|
|
||
|
"github.com/pingcap/errors"
|
||
|
"github.com/siddontang/go-log/log"
|
||
|
)
|
||
|
|
||
|
// MariadbGTID is a single MariaDB global transaction ID. Its textual form is
// [domain ID]-[server ID]-[sequence number].
type MariadbGTID struct {
	// DomainID is the first component of the textual form.
	DomainID uint32
	// ServerID is the second component of the textual form.
	ServerID uint32
	// SequenceNumber is the third component of the textual form.
	SequenceNumber uint64
}
|
||
|
|
||
|
// ParseMariadbGTID parses mariadb gtid, [domain ID]-[server-id]-[sequence]
|
||
|
func ParseMariadbGTID(str string) (*MariadbGTID, error) {
|
||
|
if len(str) == 0 {
|
||
|
return &MariadbGTID{0, 0, 0}, nil
|
||
|
}
|
||
|
|
||
|
seps := strings.Split(str, "-")
|
||
|
|
||
|
gtid := new(MariadbGTID)
|
||
|
|
||
|
if len(seps) != 3 {
|
||
|
return gtid, errors.Errorf("invalid Mariadb GTID %v, must domain-server-sequence", str)
|
||
|
}
|
||
|
|
||
|
domainID, err := strconv.ParseUint(seps[0], 10, 32)
|
||
|
if err != nil {
|
||
|
return gtid, errors.Errorf("invalid MariaDB GTID Domain ID (%v): %v", seps[0], err)
|
||
|
}
|
||
|
|
||
|
serverID, err := strconv.ParseUint(seps[1], 10, 32)
|
||
|
if err != nil {
|
||
|
return gtid, errors.Errorf("invalid MariaDB GTID Server ID (%v): %v", seps[1], err)
|
||
|
}
|
||
|
|
||
|
sequenceID, err := strconv.ParseUint(seps[2], 10, 64)
|
||
|
if err != nil {
|
||
|
return gtid, errors.Errorf("invalid MariaDB GTID Sequence number (%v): %v", seps[2], err)
|
||
|
}
|
||
|
|
||
|
return &MariadbGTID{
|
||
|
DomainID: uint32(domainID),
|
||
|
ServerID: uint32(serverID),
|
||
|
SequenceNumber: sequenceID}, nil
|
||
|
}
|
||
|
|
||
|
func (gtid *MariadbGTID) String() string {
|
||
|
if gtid.DomainID == 0 && gtid.ServerID == 0 && gtid.SequenceNumber == 0 {
|
||
|
return ""
|
||
|
}
|
||
|
|
||
|
return fmt.Sprintf("%d-%d-%d", gtid.DomainID, gtid.ServerID, gtid.SequenceNumber)
|
||
|
}
|
||
|
|
||
|
// Contain return whether one mariadb gtid covers another mariadb gtid
|
||
|
func (gtid *MariadbGTID) Contain(other *MariadbGTID) bool {
|
||
|
return gtid.DomainID == other.DomainID && gtid.SequenceNumber >= other.SequenceNumber
|
||
|
}
|
||
|
|
||
|
// Clone clones a mariadb gtid
|
||
|
func (gtid *MariadbGTID) Clone() *MariadbGTID {
|
||
|
o := new(MariadbGTID)
|
||
|
*o = *gtid
|
||
|
return o
|
||
|
}
|
||
|
|
||
|
func (gtid *MariadbGTID) forward(newer *MariadbGTID) error {
|
||
|
if newer.DomainID != gtid.DomainID {
|
||
|
return errors.Errorf("%s is not same with doamin of %s", newer, gtid)
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
Here's a simplified example of binlog events.
|
||
|
Although I think one domain should have only one update at same time, we can't limit the user's usage.
|
||
|
we just output a warn log and let it go on
|
||
|
| mysqld-bin.000001 | 1453 | Gtid | 112 | 1495 | BEGIN GTID 0-112-6 |
|
||
|
| mysqld-bin.000001 | 1624 | Xid | 112 | 1655 | COMMIT xid=74 |
|
||
|
| mysqld-bin.000001 | 1655 | Gtid | 112 | 1697 | BEGIN GTID 0-112-7 |
|
||
|
| mysqld-bin.000001 | 1826 | Xid | 112 | 1857 | COMMIT xid=75 |
|
||
|
| mysqld-bin.000001 | 1857 | Gtid | 111 | 1899 | BEGIN GTID 0-111-5 |
|
||
|
| mysqld-bin.000001 | 1981 | Xid | 111 | 2012 | COMMIT xid=77 |
|
||
|
| mysqld-bin.000001 | 2012 | Gtid | 112 | 2054 | BEGIN GTID 0-112-8 |
|
||
|
| mysqld-bin.000001 | 2184 | Xid | 112 | 2215 | COMMIT xid=116 |
|
||
|
| mysqld-bin.000001 | 2215 | Gtid | 111 | 2257 | BEGIN GTID 0-111-6 |
|
||
|
*/
|
||
|
if newer.SequenceNumber <= gtid.SequenceNumber {
|
||
|
log.Warnf("out of order binlog appears with gtid %s vs current position gtid %s", newer, gtid)
|
||
|
}
|
||
|
|
||
|
gtid.ServerID = newer.ServerID
|
||
|
gtid.SequenceNumber = newer.SequenceNumber
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// MariadbGTIDSet is a set of mariadb gtid
|
||
|
type MariadbGTIDSet struct {
|
||
|
Sets map[uint32]*MariadbGTID
|
||
|
}
|
||
|
|
||
|
// ParseMariadbGTIDSet parses str into mariadb gtid sets
|
||
|
func ParseMariadbGTIDSet(str string) (GTIDSet, error) {
|
||
|
s := new(MariadbGTIDSet)
|
||
|
s.Sets = make(map[uint32]*MariadbGTID)
|
||
|
if str == "" {
|
||
|
return s, nil
|
||
|
}
|
||
|
err := s.Update(str)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
return s, nil
|
||
|
}
|
||
|
|
||
|
// AddSet adds mariadb gtid into mariadb gtid set
|
||
|
func (s *MariadbGTIDSet) AddSet(gtid *MariadbGTID) error {
|
||
|
if gtid == nil {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
o, ok := s.Sets[gtid.DomainID]
|
||
|
if ok {
|
||
|
err := o.forward(gtid)
|
||
|
if err != nil {
|
||
|
return errors.Trace(err)
|
||
|
}
|
||
|
} else {
|
||
|
s.Sets[gtid.DomainID] = gtid
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Update updates mariadb gtid set
|
||
|
func (s *MariadbGTIDSet) Update(GTIDStr string) error {
|
||
|
sp := strings.Split(GTIDStr, ",")
|
||
|
//todo, handle redundant same uuid
|
||
|
for i := 0; i < len(sp); i++ {
|
||
|
gtid, err := ParseMariadbGTID(sp[i])
|
||
|
if err != nil {
|
||
|
return errors.Trace(err)
|
||
|
}
|
||
|
err = s.AddSet(gtid)
|
||
|
if err != nil {
|
||
|
return errors.Trace(err)
|
||
|
}
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (s *MariadbGTIDSet) String() string {
|
||
|
sets := make([]string, 0, len(s.Sets))
|
||
|
for _, set := range s.Sets {
|
||
|
sets = append(sets, set.String())
|
||
|
}
|
||
|
sort.Strings(sets)
|
||
|
|
||
|
return strings.Join(sets, ",")
|
||
|
}
|
||
|
|
||
|
// Encode encodes mariadb gtid set
|
||
|
func (s *MariadbGTIDSet) Encode() []byte {
|
||
|
var buf bytes.Buffer
|
||
|
sep := ""
|
||
|
for _, gtid := range s.Sets {
|
||
|
buf.WriteString(sep)
|
||
|
buf.WriteString(gtid.String())
|
||
|
sep = ","
|
||
|
}
|
||
|
|
||
|
return buf.Bytes()
|
||
|
}
|
||
|
|
||
|
// Clone clones a mariadb gtid set
|
||
|
func (s *MariadbGTIDSet) Clone() GTIDSet {
|
||
|
clone := &MariadbGTIDSet{
|
||
|
Sets: make(map[uint32]*MariadbGTID),
|
||
|
}
|
||
|
for domainID, gtid := range s.Sets {
|
||
|
clone.Sets[domainID] = gtid.Clone()
|
||
|
}
|
||
|
|
||
|
return clone
|
||
|
}
|
||
|
|
||
|
// Equal returns true if two mariadb gtid set is same, otherwise return false
|
||
|
func (s *MariadbGTIDSet) Equal(o GTIDSet) bool {
|
||
|
other, ok := o.(*MariadbGTIDSet)
|
||
|
if !ok {
|
||
|
return false
|
||
|
}
|
||
|
|
||
|
if len(other.Sets) != len(s.Sets) {
|
||
|
return false
|
||
|
}
|
||
|
|
||
|
for domainID, gtid := range other.Sets {
|
||
|
o, ok := s.Sets[domainID]
|
||
|
if !ok {
|
||
|
return false
|
||
|
}
|
||
|
|
||
|
if *gtid != *o {
|
||
|
return false
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return true
|
||
|
}
|
||
|
|
||
|
// Contain return whether one mariadb gtid set covers another mariadb gtid set
|
||
|
func (s *MariadbGTIDSet) Contain(o GTIDSet) bool {
|
||
|
other, ok := o.(*MariadbGTIDSet)
|
||
|
if !ok {
|
||
|
return false
|
||
|
}
|
||
|
|
||
|
for doaminID, gtid := range other.Sets {
|
||
|
o, ok := s.Sets[doaminID]
|
||
|
if !ok {
|
||
|
return false
|
||
|
}
|
||
|
|
||
|
if !o.Contain(gtid) {
|
||
|
return false
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return true
|
||
|
}
|