2023-04-25 18:41:34 +08:00
|
|
|
|
package binlog
|
|
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
|
"b612.me/mysql/gtid"
|
|
|
|
|
|
"b612.me/staros"
|
2026-03-08 20:07:59 +08:00
|
|
|
|
"bufio"
|
2023-04-25 18:41:34 +08:00
|
|
|
|
"bytes"
|
2026-03-08 20:07:59 +08:00
|
|
|
|
"errors"
|
2023-04-25 18:41:34 +08:00
|
|
|
|
"fmt"
|
|
|
|
|
|
"github.com/starainrt/go-mysql/replication"
|
|
|
|
|
|
"io"
|
|
|
|
|
|
"os"
|
2023-07-03 13:47:39 +08:00
|
|
|
|
"strings"
|
2026-03-08 20:07:59 +08:00
|
|
|
|
"sync"
|
2023-04-25 19:30:24 +08:00
|
|
|
|
"time"
|
2023-04-25 18:41:34 +08:00
|
|
|
|
)
|
|
|
|
|
|
|
2026-03-08 20:07:59 +08:00
|
|
|
|
// Sentinel errors returned by the binlog readers; compare with
// errors.Is to distinguish malformed files from plain I/O failures.
var (
	// ErrInvalidBinlogHeader is returned when a file does not start
	// with the 4-byte binlog magic expected by validateBinlogHeader.
	ErrInvalidBinlogHeader = errors.New("invalid binlog file header")
	// ErrEventTooSmall is returned when an event header declares a
	// total size no larger than the header itself (see readEventHeader).
	ErrEventTooSmall = errors.New("event size too small")
)
|
|
|
|
|
|
|
|
|
|
|
|
// Compression algorithm codes carried by TRANSACTION_PAYLOAD events
// (see getCompressionTypeName, which maps them to display names).
const (
	// CompressionNone — payload is stored uncompressed.
	CompressionNone uint64 = 255
	// CompressionZSTD — payload is compressed with zstd.
	CompressionZSTD uint64 = 0
)
|
|
|
|
|
|
|
|
|
|
|
|
// Buffer sizing knobs for the event-body pool and the buffered file reader.
const (
	// maxPooledBodyCap is the largest body buffer putBodyBuf will return
	// to the pool; bigger ones are dropped so a single huge event does
	// not pin a large allocation in the pool forever.
	maxPooledBodyCap = 4 << 20 // 4MB
	// defaultReadBufSize is the bufio.Reader size used when scanning a
	// binlog file sequentially.
	defaultReadBufSize = 1 << 20 // 1MB
)
|
|
|
|
|
|
|
2023-04-25 18:41:34 +08:00
|
|
|
|
// TxDetail describes one event inside a transaction: the byte range it
// occupies in the binlog file, the statement (or row image) it carries
// and how it was classified by ParseBinlogEvent.
type TxDetail struct {
	StartPos int `json:"startPos"` // byte offset where the event (or its TABLE_MAP) begins
	EndPos   int `json:"endPos"`   // byte offset just past the event (header LogPos)
	RowCount int `json:"rowCount"` // affected row count for DML events; 0 otherwise
	// Timestamp is the raw event header timestamp (seconds since epoch);
	// Time is the same instant materialized lazily by fillTimeLazy.
	Timestamp int64     `json:"timestamp"`
	Time      time.Time `json:"time"`
	Sql       string    `json:"sql"` // statement text; for row events, filled from ROWS_QUERY by finalizeTx when available
	Db        string    `json:"db"`
	Table     string    `json:"table"`
	SqlType   string    `json:"sqlType"` // "insert" | "update" | "delete" | "query" | "rowsquery" | "gtid" | ""
	// CompressionType is non-empty only for events unwrapped from a
	// compressed TRANSACTION_PAYLOAD event (e.g. "ZSTD").
	CompressionType string          `json:"compressionType"`
	Rows            [][]interface{} `json:"rows"` // decoded row images for DML events
}
|
|
|
|
|
|
|
2023-07-03 13:47:39 +08:00
|
|
|
|
// Transaction status codes, derived from the last control statement
// ("begin"/"commit"/"rollback") observed while scanning the transaction.
const (
	STATUS_PREPARE uint8 = iota // no control statement seen yet
	STATUS_BEGIN                // BEGIN observed, transaction open
	STATUS_COMMIT               // COMMIT (or XID event) observed
	STATUS_ROLLBACK             // ROLLBACK observed
)
|
|
|
|
|
|
|
2023-04-25 18:41:34 +08:00
|
|
|
|
// Transaction aggregates all events between two GTID events into one
// logical unit, with its byte range, timing and per-event details.
type Transaction struct {
	GTID string `json:"gtid"` // GTID string, or "anonymous-gtid-event:1" for anonymous transactions
	// Timestamp is the GTID event's header timestamp; Time is filled
	// lazily from it by fillTimeLazy.
	Timestamp int64     `json:"timestamp"`
	Time      time.Time `json:"time"`
	StartPos  int       `json:"startPos"` // byte offset of the GTID event
	EndPos    int       `json:"endPos"`   // byte offset just past the last event
	Size      int       `json:"size"`     // EndPos-StartPos; forced to 0 in GTID-only mode (see finalizeTx)
	RowsCount int       `json:"rowsCount"`
	Status    uint8     `json:"status"` // one of the STATUS_* codes
	// TxStartTime/TxEndTime bracket the BEGIN and COMMIT/ROLLBACK
	// statements' header timestamps (0 when not observed).
	TxStartTime int64 `json:"txStartTime"`
	TxEndTime   int64 `json:"txEndTime"`
	// sqlOrigin collects raw ROWS_QUERY statements; exposed through
	// GetSqlOrigin. NOTE(review): the json tag is ineffective on an
	// unexported field — encoding/json never serializes it.
	sqlOrigin []string    `json:"sqlOrigin"`
	Txs       []TxDetail  `json:"txs"`
	// dmlEventCount counts events that carry both a schema and a table
	// name; used by the OnlyShowDML filter.
	dmlEventCount int
}
|
|
|
|
|
|
|
2023-04-29 11:29:29 +08:00
|
|
|
|
func (t Transaction) GetSqlOrigin() []string {
|
|
|
|
|
|
return t.sqlOrigin
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-03-08 20:07:59 +08:00
|
|
|
|
// BinlogFilter selects which transactions (and which events inside
// them) are delivered by ParseBinlogWithFilter. Zero values mean
// "no constraint" for every field.
type BinlogFilter struct {
	// IncludeGtid / ExcludeGtid are GTID set strings; transactions whose
	// GTID falls outside/inside them are skipped respectively.
	IncludeGtid string
	ExcludeGtid string
	// IncludeTables / ExcludeTables hold "db.tb" patterns; "*" is
	// accepted on either side (see buildTableMatcher).
	IncludeTables []string
	ExcludeTables []string
	// StartPos / EndPos bound the transaction's byte range in the file.
	StartPos int
	EndPos   int
	// StartDate / EndDate bound the transaction's GTID-event timestamp.
	StartDate time.Time
	EndDate   time.Time
	// BigThan / SmallThan bound the transaction size in bytes.
	BigThan   int
	SmallThan int
	// OnlyShowGtid emits skeleton transactions (GTID + positions only),
	// skipping body parsing entirely for non-GTID events.
	OnlyShowGtid bool
	// OnlyShowDML drops transactions containing no DML event.
	OnlyShowDML bool
	// PickTxAllIfMatch keeps/drops the whole transaction as soon as any
	// event matches the table filter, instead of trimming per event.
	PickTxAllIfMatch bool
	// ExcludeBlank / IncludeBlank control how events without a db/table
	// (e.g. BEGIN/COMMIT markers) interact with the table matchers.
	ExcludeBlank bool
	IncludeBlank bool
}
|
|
|
|
|
|
|
|
|
|
|
|
// BinlogEvent is the normalized form of one replication event as
// produced by ParseBinlogEvent.
type BinlogEvent struct {
	Type string // "gtid" | "insert" | "update" | "delete" | "query" | "rowsquery" | "" (unclassified)
	DB   string // schema name, when the event carries one
	TB   string // table name, when the event carries one
	// Data holds the payload text: statement SQL, GTID string, or a
	// synthesized "begin"/"commit" marker depending on Type.
	Data   string
	RowCnt uint32          // affected rows (update events count pairs as one)
	Rows   [][]interface{} // decoded row images for DML events
	// CompressionType is set on events unwrapped from a compressed
	// TRANSACTION_PAYLOAD event (e.g. "ZSTD").
	CompressionType string
}
|
|
|
|
|
|
|
|
|
|
|
|
// tableMatcher answers "does db.tb match any configured pattern?" in
// O(1) by pre-splitting patterns into four lookup categories
// (see buildTableMatcher).
type tableMatcher struct {
	exactMatch map[string]bool // "db.tb" literal patterns
	dbWildcard map[string]bool // "db.*"  — any table in db
	tbWildcard map[string]bool // "*.tb"  — tb in any database
	matchAll   bool            // "*.*"   — everything matches
}
|
|
|
|
|
|
|
|
|
|
|
|
func (m *tableMatcher) match(db, tb string) bool {
|
|
|
|
|
|
if m.matchAll {
|
|
|
|
|
|
return true
|
|
|
|
|
|
}
|
|
|
|
|
|
if m.exactMatch[db+"."+tb] {
|
|
|
|
|
|
return true
|
|
|
|
|
|
}
|
|
|
|
|
|
if m.dbWildcard[db] {
|
|
|
|
|
|
return true
|
|
|
|
|
|
}
|
|
|
|
|
|
if m.tbWildcard[tb] {
|
|
|
|
|
|
return true
|
|
|
|
|
|
}
|
|
|
|
|
|
return false
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// bodyBufPool recycles event-body buffers so the per-event read does
// not allocate. It stores *[]byte (a pointer to the slice header)
// because storing the slice value directly would allocate an interface
// box on every Put.
var bodyBufPool = sync.Pool{
	New: func() any {
		// 64KB comfortably covers typical events; larger events grow a
		// dedicated buffer in getBodyBuf.
		b := make([]byte, 0, 64*1024)
		return &b
	},
}
|
|
|
|
|
|
|
|
|
|
|
|
func getBodyBuf(n int) []byte {
|
|
|
|
|
|
p := bodyBufPool.Get().(*[]byte)
|
|
|
|
|
|
if cap(*p) < n {
|
|
|
|
|
|
b := make([]byte, n)
|
|
|
|
|
|
return b
|
|
|
|
|
|
}
|
|
|
|
|
|
return (*p)[:n]
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func putBodyBuf(b []byte) {
|
|
|
|
|
|
if cap(b) > maxPooledBodyCap {
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
|
|
|
|
|
b = b[:0]
|
|
|
|
|
|
bodyBufPool.Put(&b)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2023-06-30 17:55:45 +08:00
|
|
|
|
func ParseBinlogFile(path string, fx func(transaction Transaction) bool) error {
|
2023-04-25 18:41:34 +08:00
|
|
|
|
return parseOneBinlog(path, fx)
|
|
|
|
|
|
}
|
2026-03-08 20:07:59 +08:00
|
|
|
|
|
2023-06-30 17:55:45 +08:00
|
|
|
|
func parseOneBinlog(path string, fx func(Transaction) bool) error {
|
2023-04-25 18:41:34 +08:00
|
|
|
|
if !staros.Exists(path) {
|
|
|
|
|
|
return os.ErrNotExist
|
|
|
|
|
|
}
|
|
|
|
|
|
f, err := os.Open(path)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
2026-03-08 20:07:59 +08:00
|
|
|
|
defer f.Close()
|
2023-04-25 18:41:34 +08:00
|
|
|
|
|
2026-03-08 20:07:59 +08:00
|
|
|
|
if err := validateBinlogHeader(f); err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
br := bufio.NewReaderSize(f, defaultReadBufSize)
|
|
|
|
|
|
return parseBinlogDetail(br, fx)
|
|
|
|
|
|
}
|
2023-04-25 18:41:34 +08:00
|
|
|
|
|
2026-03-08 20:07:59 +08:00
|
|
|
|
func validateBinlogHeader(f *os.File) error {
|
|
|
|
|
|
const fileTypeBytes = int64(4)
|
2023-04-25 18:41:34 +08:00
|
|
|
|
b := make([]byte, fileTypeBytes)
|
2026-03-08 20:07:59 +08:00
|
|
|
|
|
|
|
|
|
|
if _, err := f.Read(b); err != nil {
|
|
|
|
|
|
return fmt.Errorf("read binlog header failed: %w", err)
|
2023-04-25 18:41:34 +08:00
|
|
|
|
}
|
2026-03-08 20:07:59 +08:00
|
|
|
|
if !bytes.Equal(b, replication.BinLogFileHeader) {
|
|
|
|
|
|
return ErrInvalidBinlogHeader
|
|
|
|
|
|
}
|
|
|
|
|
|
if _, err := f.Seek(fileTypeBytes, io.SeekStart); err != nil {
|
|
|
|
|
|
return fmt.Errorf("seek after header failed: %w", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func readEventHeader(r io.Reader, parser *replication.BinlogParser, headBuf []byte) (*replication.EventHeader, error) {
|
|
|
|
|
|
if _, err := io.ReadFull(r, headBuf); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
h, err := parser.ParseHeader(headBuf)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return nil, fmt.Errorf("parse header failed: %w", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
if h.EventSize <= uint32(replication.EventHeaderSize) {
|
|
|
|
|
|
return nil, fmt.Errorf("%w: event size is %d", ErrEventTooSmall, h.EventSize)
|
|
|
|
|
|
}
|
|
|
|
|
|
return h, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func readEventBody(r io.Reader, h *replication.EventHeader) ([]byte, error) {
|
|
|
|
|
|
bodyLen := int(h.EventSize) - replication.EventHeaderSize
|
|
|
|
|
|
body := getBodyBuf(bodyLen)
|
|
|
|
|
|
if _, err := io.ReadFull(r, body); err != nil {
|
|
|
|
|
|
putBodyBuf(body)
|
|
|
|
|
|
return nil, fmt.Errorf("read event body failed: %w (need %d bytes)", err, bodyLen)
|
|
|
|
|
|
}
|
|
|
|
|
|
return body, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func skipEventBody(r io.Reader, h *replication.EventHeader) error {
|
|
|
|
|
|
bodyLen := int64(h.EventSize) - int64(replication.EventHeaderSize)
|
|
|
|
|
|
if bodyLen <= 0 {
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
if _, err := io.CopyN(io.Discard, r, bodyLen); err != nil {
|
|
|
|
|
|
return fmt.Errorf("skip event body failed: %w", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func parseEvent(parser *replication.BinlogParser, h *replication.EventHeader, body []byte) (replication.Event, error) {
|
|
|
|
|
|
e, err := parser.ParseEvent(h, body, nil)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return nil, fmt.Errorf("parse event failed at pos %d: %w", h.LogPos, err)
|
|
|
|
|
|
}
|
|
|
|
|
|
return e, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// finalizeTx completes a transaction once its last event has been
// seen: it back-fills row-event SQL from the collected ROWS_QUERY
// statements, accumulates the total row count, and computes Size.
func finalizeTx(tx *Transaction, onlyShowGtid bool) {
	// Pair non-"query" details (row events) with sqlOrigin entries in
	// order: the i-th row event gets the i-th captured ROWS_QUERY text.
	// NOTE(review): this assumes ROWS_QUERY events arrive in the same
	// order as their row events — appears to hold for how sqlOrigin is
	// appended during parsing, but worth confirming.
	idx := 0
	for k, v := range tx.Txs {
		if v.SqlType != "query" && len(tx.sqlOrigin) > idx {
			v.Sql = tx.sqlOrigin[idx]
			idx++
		}
		tx.RowsCount += v.RowCount
		// v is a copy; write the mutated detail back into the slice.
		tx.Txs[k] = v
	}

	if onlyShowGtid {
		// GTID-only mode tracks positions approximately, so the size is
		// deliberately zeroed rather than reported as misleading.
		tx.Size = 0
	} else {
		tx.Size = tx.EndPos - tx.StartPos
	}
}
|
|
|
|
|
|
|
|
|
|
|
|
func fillTimeLazy(tx *Transaction) {
|
|
|
|
|
|
if tx.Timestamp != 0 && tx.Time.IsZero() {
|
|
|
|
|
|
tx.Time = time.Unix(tx.Timestamp, 0)
|
|
|
|
|
|
}
|
|
|
|
|
|
for i := range tx.Txs {
|
|
|
|
|
|
if tx.Txs[i].Timestamp != 0 && tx.Txs[i].Time.IsZero() {
|
|
|
|
|
|
tx.Txs[i].Time = time.Unix(tx.Txs[i].Timestamp, 0)
|
|
|
|
|
|
}
|
2023-04-25 18:41:34 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2023-06-30 17:55:45 +08:00
|
|
|
|
// parseBinlogDetail streams events from r (positioned just past the
// file magic), groups them into transactions delimited by GTID events,
// and hands each finished transaction to f. A false return from f
// stops the scan. Returns nil on clean EOF.
func parseBinlogDetail(r io.Reader, f func(Transaction) bool) error {
	parser := replication.NewBinlogParser()
	// Timestamps and decimals are kept raw; time.Time fields are filled
	// lazily by fillTimeLazy only for transactions that are delivered.
	parser.SetParseTime(false)
	parser.SetUseDecimal(false)

	var (
		// tbMapPos remembers where the most recent TABLE_MAP event began,
		// so the row events that follow it can report that as StartPos.
		tbMapPos uint32
		tx       Transaction
		// headBuf is reused for every event header to avoid per-event allocation.
		headBuf = make([]byte, replication.EventHeaderSize)
	)
	// currentGtid doubles as the "a transaction is open" flag.
	currentGtid := ""

	for {
		h, err := readEventHeader(r, parser, headBuf)
		if err == io.EOF {
			// Flush the final (still open) transaction before returning.
			if currentGtid != "" {
				finalizeTx(&tx, false)
				fillTimeLazy(&tx)
				if f != nil {
					f(tx)
				}
			}
			return nil
		}
		if err != nil {
			return err
		}

		body, err := readEventBody(r, h)
		if err != nil {
			return err
		}

		e, err := parseEvent(parser, h, body)
		// The buffer is recycled immediately after parsing.
		putBodyBuf(body)
		if err != nil {
			return err
		}

		if h.EventType == replication.TABLE_MAP_EVENT {
			// LogPos is the END of the event; subtract its size for the start.
			tbMapPos = h.LogPos - h.EventSize
		}

		evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
		for _, ev := range evs {
			startPos := 0
			if ev.Type == "query" || ev.Type == "gtid" {
				startPos = int(h.LogPos - h.EventSize)
			} else {
				// Row events report their TABLE_MAP's position as start.
				startPos = int(tbMapPos)
			}

			switch ev.Type {
			case "gtid":
				// A new GTID closes the previous transaction and delivers it.
				if currentGtid != "" {
					finalizeTx(&tx, false)
					fillTimeLazy(&tx)
					if f != nil && !f(tx) {
						return nil
					}
				}
				currentGtid = ev.Data
				tx = Transaction{
					GTID:      ev.Data,
					StartPos:  startPos,
					Timestamp: int64(h.Timestamp),
					Txs:       make([]TxDetail, 0, 8),
					sqlOrigin: make([]string, 0, 4),
				}
			case "":
				// Unclassified events still extend the transaction's range.
				tx.EndPos = int(h.LogPos)
			case "rowsquery":
				// Raw statement text; paired with row events in finalizeTx.
				tx.EndPos = int(h.LogPos)
				tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
			default:
				tx.EndPos = int(h.LogPos)
				status := STATUS_PREPARE
				if ev.Type == "query" {
					// BEGIN/COMMIT/ROLLBACK markers drive status and timing.
					switch strings.ToLower(ev.Data) {
					case "begin":
						if tx.TxStartTime == 0 {
							tx.TxStartTime = int64(h.Timestamp)
						}
						status = STATUS_BEGIN
					case "commit":
						status = STATUS_COMMIT
						tx.TxEndTime = int64(h.Timestamp)
					case "rollback":
						status = STATUS_ROLLBACK
						tx.TxEndTime = int64(h.Timestamp)
					}
					tx.Status = status
				}
				// Events carrying both schema and table count as DML.
				if ev.DB != "" && ev.TB != "" {
					tx.dmlEventCount++
				}
				tx.Txs = append(tx.Txs, TxDetail{
					StartPos:        startPos,
					EndPos:          int(h.LogPos),
					Db:              ev.DB,
					Table:           ev.TB,
					Sql:             ev.Data,
					SqlType:         ev.Type,
					Rows:            ev.Rows,
					RowCount:        int(ev.RowCnt),
					Timestamp:       int64(h.Timestamp),
					CompressionType: ev.CompressionType,
				})
			}
		}
	}
}
|
2023-04-25 18:41:34 +08:00
|
|
|
|
|
2023-06-29 13:16:42 +08:00
|
|
|
|
// ParseBinlogEvent normalizes one replication event into zero or more
// BinlogEvent values. Most event types yield exactly one entry; a
// TRANSACTION_PAYLOAD event recursively expands into the events it
// wraps, each tagged with the payload's compression type. Event types
// not listed below yield a single zero-valued entry (Type "").
func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent {
	var res []BinlogEvent
	var sig BinlogEvent

	switch ev.Header.EventType {
	case replication.ANONYMOUS_GTID_EVENT:
		// Synthetic GTID so anonymous transactions still get delimited.
		sig.Data = "anonymous-gtid-event:1"
		sig.Type = "gtid"

	case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
		wrEvent, ok := ev.Event.(*replication.RowsEvent)
		if !ok {
			// Unexpected payload type: drop the event rather than guess.
			return res
		}
		sig.DB = string(wrEvent.Table.Schema)
		sig.TB = string(wrEvent.Table.Table)
		sig.Type = "insert"
		sig.RowCnt = uint32(len(wrEvent.Rows))
		sig.Rows = wrEvent.Rows

	case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
		wrEvent, ok := ev.Event.(*replication.RowsEvent)
		if !ok {
			return res
		}
		sig.DB = string(wrEvent.Table.Schema)
		sig.TB = string(wrEvent.Table.Table)
		sig.Type = "update"
		// Updates store before/after image pairs, so halve the raw count.
		sig.RowCnt = uint32(len(wrEvent.Rows)) / 2
		sig.Rows = wrEvent.Rows

	case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
		wrEvent, ok := ev.Event.(*replication.RowsEvent)
		if !ok {
			return res
		}
		sig.DB = string(wrEvent.Table.Schema)
		sig.TB = string(wrEvent.Table.Table)
		sig.Type = "delete"
		sig.RowCnt = uint32(len(wrEvent.Rows))
		sig.Rows = wrEvent.Rows

	case replication.ROWS_QUERY_EVENT:
		// The original statement text that produced subsequent row events.
		queryEvent, ok := ev.Event.(*replication.RowsQueryEvent)
		if !ok {
			return res
		}
		sig.Data = string(queryEvent.Query)
		sig.Type = "rowsquery"

	case replication.QUERY_EVENT:
		queryEvent, ok := ev.Event.(*replication.QueryEvent)
		if !ok {
			return res
		}
		sig.DB = string(queryEvent.Schema)
		sig.Data = string(queryEvent.Query)
		sig.Type = "query"

	case replication.MARIADB_GTID_EVENT:
		// MariaDB GTID events imply BEGIN; surfaced as a query marker.
		sig.Data = "begin"
		sig.Type = "query"

	case replication.XID_EVENT:
		// XID commits the transaction; surfaced as a "commit" query marker.
		sig.Data = "commit"
		sig.Type = "query"

	case replication.GTID_EVENT:
		ge, ok := ev.Event.(*replication.GTIDEvent)
		if !ok {
			return res
		}
		gid, err := gtid.Parse(fmt.Sprintf("%s:%d", bytesToUuid(ge.SID), ge.GNO))
		if err != nil {
			// Keep the malformed value visible instead of dropping the event.
			sig.Data = fmt.Sprintf("invalid-gtid:%s:%d", bytesToUuid(ge.SID), ge.GNO)
		} else {
			sig.Data = gid.String()
		}
		sig.Type = "gtid"

	case replication.TRANSACTION_PAYLOAD_EVENT:
		ge, ok := ev.Event.(*replication.TransactionPayloadEvent)
		if !ok {
			return res
		}
		// Recursively expand the wrapped events, then tag each with the
		// payload's compression algorithm.
		for _, val := range ge.Events {
			res = append(res, ParseBinlogEvent(val)...)
		}
		compressionType := getCompressionTypeName(ge.CompressionType)
		for idx := range res {
			res[idx].CompressionType = compressionType
		}
		return res
	}

	res = append(res, sig)
	return res
}
|
2023-05-24 15:25:31 +08:00
|
|
|
|
|
2026-03-08 20:07:59 +08:00
|
|
|
|
func getCompressionTypeName(code uint64) string {
|
|
|
|
|
|
switch code {
|
|
|
|
|
|
case CompressionZSTD:
|
|
|
|
|
|
return "ZSTD"
|
|
|
|
|
|
case CompressionNone:
|
|
|
|
|
|
return ""
|
|
|
|
|
|
default:
|
|
|
|
|
|
return fmt.Sprintf("UNKNOWN(%d)", code)
|
|
|
|
|
|
}
|
2023-05-24 15:25:31 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2026-03-08 20:07:59 +08:00
|
|
|
|
func ParseBinlogWithFilter(path string, pos int64, filter BinlogFilter, fx func(Transaction) bool) error {
|
|
|
|
|
|
if !staros.Exists(path) {
|
|
|
|
|
|
return os.ErrNotExist
|
2023-07-03 13:37:39 +08:00
|
|
|
|
}
|
2026-03-08 20:07:59 +08:00
|
|
|
|
|
|
|
|
|
|
f, err := os.Open(path)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return err
|
2023-07-03 14:03:45 +08:00
|
|
|
|
}
|
2026-03-08 20:07:59 +08:00
|
|
|
|
defer f.Close()
|
|
|
|
|
|
|
|
|
|
|
|
parser := replication.NewBinlogParser()
|
|
|
|
|
|
parser.SetParseTime(false)
|
|
|
|
|
|
parser.SetUseDecimal(false)
|
|
|
|
|
|
|
|
|
|
|
|
if pos != 0 {
|
|
|
|
|
|
if err := seekToPosition(f, parser, pos); err != nil {
|
|
|
|
|
|
return err
|
2023-07-03 14:03:45 +08:00
|
|
|
|
}
|
2026-03-08 20:07:59 +08:00
|
|
|
|
} else {
|
|
|
|
|
|
if err := validateBinlogHeader(f); err != nil {
|
|
|
|
|
|
return err
|
2023-07-03 14:03:45 +08:00
|
|
|
|
}
|
2024-04-07 15:50:21 +08:00
|
|
|
|
}
|
2026-03-08 20:07:59 +08:00
|
|
|
|
|
|
|
|
|
|
br := bufio.NewReaderSize(f, defaultReadBufSize)
|
|
|
|
|
|
return parseBinlogWithFilter(br, parser, filter, fx)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// seekToPosition prepares f and parser for scanning from an arbitrary
// byte offset: it validates the magic, then parses events from the top
// of the file until the FORMAT_DESCRIPTION (or first GTID) event has
// been consumed — the parser needs the format description before it
// can decode anything — and finally seeks f to pos.
func seekToPosition(f *os.File, parser *replication.BinlogParser, pos int64) error {
	if err := validateBinlogHeader(f); err != nil {
		return err
	}

	headBuf := make([]byte, replication.EventHeaderSize)
	for {
		h, err := readEventHeader(f, parser, headBuf)
		if err != nil {
			// Includes EOF: reaching end of file before the format
			// description means we cannot honor the seek.
			return fmt.Errorf("seek to position failed: %w", err)
		}
		body, err := readEventBody(f, h)
		if err != nil {
			return err
		}
		// Parsed for side effect only: feeding the FORMAT_DESCRIPTION
		// event into the parser configures it for later events.
		_, err = parseEvent(parser, h, body)
		putBodyBuf(body)
		if err != nil {
			return err
		}
		// Stop once the parser is primed; GTID also terminates the scan
		// since it means the format description has already passed.
		if h.EventType == replication.FORMAT_DESCRIPTION_EVENT || h.EventType == replication.GTID_EVENT {
			break
		}
	}

	// Jump to the caller-requested offset; the caller is responsible
	// for pos landing on an event boundary.
	if _, err := f.Seek(pos, io.SeekStart); err != nil {
		return fmt.Errorf("seek to pos %d failed: %w", pos, err)
	}
	return nil
}
|
|
|
|
|
|
|
|
|
|
|
|
// parseBinlogWithFilter is the filtered scan loop: it reads events from
// r, assembles transactions, and delivers to fn only those passing the
// filter. Three paths exist per event: a GTID-only fast path (bodies of
// non-GTID events are skipped entirely), a GTID-event path that decides
// whether the upcoming transaction is wanted, and a body path that
// accumulates details for wanted transactions. fn returning false, or
// exhausting the IncludeGtid set, ends the scan early.
func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter BinlogFilter, fn func(Transaction) bool) error {
	var subGtid, inGtid, exGtid *gtid.Gtid
	var err error

	includeMatcher, excludeMatcher := prepareTableMatchers(filter)

	if filter.IncludeGtid != "" {
		inGtid, err = gtid.Parse(filter.IncludeGtid)
		if err != nil {
			return fmt.Errorf("parse include gtid failed: %w", err)
		}
		// subGtid tracks which included GTIDs remain unseen; when it
		// empties, the scan can stop early.
		subGtid = inGtid.Clone()
	}
	if filter.ExcludeGtid != "" {
		exGtid, err = gtid.Parse(filter.ExcludeGtid)
		if err != nil {
			return fmt.Errorf("parse exclude gtid failed: %w", err)
		}
	}

	var (
		// tbMapPos: start offset of the latest TABLE_MAP event, used as
		// StartPos for the row events that follow it.
		tbMapPos uint32
		// skipCurrentTxn: true while inside a transaction already ruled
		// out by position/GTID filters — its event bodies are discarded.
		skipCurrentTxn bool
		tx             Transaction
		headBuf        = make([]byte, replication.EventHeaderSize)
	)
	currentGtid := ""

	// callFn applies the post-assembly filters (dates, positions, size,
	// DML-only, table matchers) and invokes fn for surviving
	// transactions. Returning true means "continue scanning".
	callFn := func(tx Transaction) bool {
		if fn == nil {
			return true
		}

		fillTimeLazy(&tx)

		if !filter.StartDate.IsZero() && filter.StartDate.After(tx.Time) {
			return true
		}
		if !filter.EndDate.IsZero() && filter.EndDate.Before(tx.Time) {
			return true
		}
		if filter.StartPos != 0 && filter.StartPos > tx.StartPos {
			return true
		}
		if filter.EndPos != 0 && filter.EndPos < tx.EndPos {
			return true
		}
		if filter.BigThan != 0 && filter.BigThan > tx.Size {
			return true
		}
		if filter.SmallThan != 0 && filter.SmallThan < tx.Size {
			return true
		}
		// DML check is skipped in GTID-only mode (details are never built there).
		if !filter.OnlyShowGtid && filter.OnlyShowDML && tx.dmlEventCount == 0 {
			return true
		}

		var txs []TxDetail
		var matched bool

		for _, t := range tx.Txs {
			includeMatch := includeMatcher != nil && includeMatcher.match(t.Db, t.Table)
			excludeMatch := excludeMatcher != nil && excludeMatcher.match(t.Db, t.Table)

			// Blank db+table (BEGIN/COMMIT markers etc.) are governed by
			// the IncludeBlank/ExcludeBlank knobs rather than the matchers.
			if t.Db == "" && t.Table == "" {
				if includeMatcher != nil && !filter.IncludeBlank {
					continue
				}
				if excludeMatcher != nil && filter.ExcludeBlank {
					matched = true
					if filter.PickTxAllIfMatch {
						// Whole transaction excluded: deliver nothing ("continue scanning").
						return true
					}
					continue
				}
			}

			if includeMatcher != nil {
				if includeMatch {
					matched = true
					if filter.PickTxAllIfMatch {
						// Keep the entire transaction untrimmed on first hit.
						return fn(tx)
					}
					txs = append(txs, t)
				}
			} else if excludeMatcher != nil {
				if excludeMatch {
					matched = true
					if filter.PickTxAllIfMatch {
						return true
					}
				} else {
					txs = append(txs, t)
				}
			} else {
				txs = append(txs, t)
			}
		}

		// Only replace the detail list when a matcher actually fired;
		// otherwise tx.Txs is delivered unmodified.
		if matched {
			tx.Txs = txs
		}
		if !matched && includeMatcher != nil {
			return true
		}
		if len(tx.Txs) == 0 && matched {
			return true
		}
		return fn(tx)
	}

	for {
		h, err := readEventHeader(r, parser, headBuf)
		if err == io.EOF {
			// Flush the final open transaction, if any, then finish cleanly.
			if !tx.Time.IsZero() || tx.Timestamp != 0 {
				finalizeTx(&tx, filter.OnlyShowGtid)
				callFn(tx)
			}
			return nil
		}
		if err != nil {
			return err
		}

		// GTID-only fast path: only GTID headers are parsed; every other
		// event body is discarded without allocation.
		if filter.OnlyShowGtid {
			if h.EventType != replication.GTID_EVENT && h.EventType != replication.ANONYMOUS_GTID_EVENT {
				if err := skipEventBody(r, h); err != nil {
					return err
				}
				continue
			}

			body, err := readEventBody(r, h)
			if err != nil {
				return err
			}
			e, err := parseEvent(parser, h, body)
			putBodyBuf(body)
			if err != nil {
				return err
			}

			evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
			for _, ev := range evs {
				if ev.Type != "gtid" {
					continue
				}
				startPos := int(h.LogPos - h.EventSize)

				if filter.EndPos != 0 && startPos > filter.EndPos {
					continue
				}
				if filter.StartPos != 0 && startPos < filter.StartPos {
					continue
				}

				// A new GTID closes the previous skeleton transaction.
				if currentGtid != "" {
					// Approximate end: the byte before this GTID event starts.
					tx.EndPos = startPos - 1
					finalizeTx(&tx, true)
					if !callFn(tx) {
						return nil
					}
					// Early exit once every included GTID has been seen.
					if subGtid != nil {
						if err := subGtid.Sub(tx.GTID); err == nil && subGtid.EventCount() == 0 {
							return nil
						}
					}
					tx = Transaction{}
				}

				currentGtid = ev.Data

				if inGtid != nil {
					if c, _ := inGtid.Contain(ev.Data); !c {
						tx = Transaction{}
						currentGtid = ""
						continue
					}
				}
				if exGtid != nil {
					if c, _ := exGtid.Contain(ev.Data); c {
						currentGtid = ""
						tx = Transaction{}
						continue
					}
				}

				// Skeleton transaction: positions and timestamp only.
				tx = Transaction{
					GTID:      ev.Data,
					StartPos:  startPos,
					EndPos:    startPos,
					Timestamp: int64(h.Timestamp),
				}
			}
			continue
		}

		// Handle GTID events first: they decide whether the upcoming
		// transaction is wanted at all.
		if h.EventType == replication.GTID_EVENT || h.EventType == replication.ANONYMOUS_GTID_EVENT {
			body, err := readEventBody(r, h)
			if err != nil {
				return err
			}
			e, err := parseEvent(parser, h, body)
			putBodyBuf(body)
			if err != nil {
				return err
			}

			evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
			for _, ev := range evs {
				if ev.Type != "gtid" {
					continue
				}
				startPos := int(h.LogPos - h.EventSize)

				// Close and deliver the previous transaction.
				if currentGtid != "" {
					finalizeTx(&tx, false)
					if !callFn(tx) {
						return nil
					}
					if subGtid != nil {
						if err := subGtid.Sub(tx.GTID); err == nil && subGtid.EventCount() == 0 {
							return nil
						}
					}
					tx = Transaction{}
				}

				currentGtid = ev.Data
				skipCurrentTxn = false

				// Cheap pre-filters: position bounds and GTID inclusion /
				// exclusion mark the whole transaction as skippable so its
				// event bodies are never parsed.
				if filter.EndPos != 0 && startPos > filter.EndPos {
					skipCurrentTxn = true
				}
				if filter.StartPos != 0 && startPos < filter.StartPos {
					skipCurrentTxn = true
				}
				if inGtid != nil {
					if c, _ := inGtid.Contain(ev.Data); !c {
						skipCurrentTxn = true
					}
				}
				if exGtid != nil {
					if c, _ := exGtid.Contain(ev.Data); c {
						skipCurrentTxn = true
					}
				}

				if !skipCurrentTxn {
					tx = Transaction{
						GTID:      ev.Data,
						StartPos:  startPos,
						Timestamp: int64(h.Timestamp),
						Txs:       make([]TxDetail, 0, 8),
						sqlOrigin: make([]string, 0, 4),
					}
				} else {
					tx = Transaction{}
				}
			}
			continue
		}

		// Transaction not wanted: discard bodies without parsing until
		// the next GTID event.
		if skipCurrentTxn {
			if err := skipEventBody(r, h); err != nil {
				return err
			}
			continue
		}

		body, err := readEventBody(r, h)
		if err != nil {
			return err
		}
		e, err := parseEvent(parser, h, body)
		putBodyBuf(body)
		if err != nil {
			return err
		}

		if h.EventType == replication.TABLE_MAP_EVENT {
			tbMapPos = h.LogPos - h.EventSize
		}

		evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
		for _, ev := range evs {
			startPos := 0
			if ev.Type == "query" || ev.Type == "gtid" {
				startPos = int(h.LogPos - h.EventSize)
			} else {
				// Row events report their TABLE_MAP's start position.
				startPos = int(tbMapPos)
			}

			switch ev.Type {
			case "":
				tx.EndPos = int(h.LogPos)
			case "rowsquery":
				tx.EndPos = int(h.LogPos)
				tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
			default:
				tx.EndPos = int(h.LogPos)
				status := STATUS_PREPARE
				if ev.Type == "query" {
					// BEGIN/COMMIT/ROLLBACK markers drive status and timing.
					switch strings.ToLower(ev.Data) {
					case "begin":
						if tx.TxStartTime == 0 {
							tx.TxStartTime = int64(h.Timestamp)
						}
						status = STATUS_BEGIN
					case "commit":
						status = STATUS_COMMIT
						tx.TxEndTime = int64(h.Timestamp)
					case "rollback":
						status = STATUS_ROLLBACK
						tx.TxEndTime = int64(h.Timestamp)
					}
					tx.Status = status
				}
				if ev.DB != "" && ev.TB != "" {
					tx.dmlEventCount++
				}
				tx.Txs = append(tx.Txs, TxDetail{
					StartPos:        startPos,
					EndPos:          int(h.LogPos),
					Db:              ev.DB,
					Table:           ev.TB,
					Sql:             ev.Data,
					SqlType:         ev.Type,
					Rows:            ev.Rows,
					RowCount:        int(ev.RowCnt),
					Timestamp:       int64(h.Timestamp),
					CompressionType: ev.CompressionType,
				})
			}
		}
	}
}
|
|
|
|
|
|
|
2026-03-08 20:07:59 +08:00
|
|
|
|
func prepareTableMatchers(filter BinlogFilter) (includeMatcher, excludeMatcher *tableMatcher) {
|
|
|
|
|
|
if len(filter.IncludeTables) > 0 {
|
|
|
|
|
|
includeMatcher = buildTableMatcher(filter.IncludeTables)
|
2023-05-24 15:25:31 +08:00
|
|
|
|
}
|
2026-03-08 20:07:59 +08:00
|
|
|
|
if len(filter.ExcludeTables) > 0 {
|
|
|
|
|
|
excludeMatcher = buildTableMatcher(filter.ExcludeTables)
|
2023-05-24 15:25:31 +08:00
|
|
|
|
}
|
2026-03-08 20:07:59 +08:00
|
|
|
|
return includeMatcher, excludeMatcher
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func buildTableMatcher(patterns []string) *tableMatcher {
|
|
|
|
|
|
m := &tableMatcher{
|
|
|
|
|
|
exactMatch: make(map[string]bool),
|
|
|
|
|
|
dbWildcard: make(map[string]bool),
|
|
|
|
|
|
tbWildcard: make(map[string]bool),
|
2023-05-24 15:25:31 +08:00
|
|
|
|
}
|
2026-03-08 20:07:59 +08:00
|
|
|
|
|
|
|
|
|
|
for _, pattern := range patterns {
|
|
|
|
|
|
if pattern == "*.*" {
|
|
|
|
|
|
m.matchAll = true
|
|
|
|
|
|
continue
|
2023-05-24 15:25:31 +08:00
|
|
|
|
}
|
2026-03-08 20:07:59 +08:00
|
|
|
|
parts := strings.Split(pattern, ".")
|
|
|
|
|
|
if len(parts) != 2 {
|
|
|
|
|
|
continue
|
2023-05-24 15:25:31 +08:00
|
|
|
|
}
|
2026-03-08 20:07:59 +08:00
|
|
|
|
db, tb := parts[0], parts[1]
|
|
|
|
|
|
if db == "*" && tb == "*" {
|
|
|
|
|
|
m.matchAll = true
|
|
|
|
|
|
} else if db == "*" {
|
|
|
|
|
|
m.tbWildcard[tb] = true
|
|
|
|
|
|
} else if tb == "*" {
|
|
|
|
|
|
m.dbWildcard[db] = true
|
|
|
|
|
|
} else {
|
|
|
|
|
|
m.exactMatch[pattern] = true
|
2023-05-24 15:25:31 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|
2026-03-08 20:07:59 +08:00
|
|
|
|
return m
|
2023-05-24 15:25:31 +08:00
|
|
|
|
}
|