mysqlbinlog/parse.go

924 lines
21 KiB
Go
Raw Normal View History

2023-04-25 18:41:34 +08:00
package binlog
import (
"b612.me/mysql/gtid"
"b612.me/staros"
2026-03-08 20:07:59 +08:00
"bufio"
2023-04-25 18:41:34 +08:00
"bytes"
2026-03-08 20:07:59 +08:00
"errors"
2023-04-25 18:41:34 +08:00
"fmt"
"github.com/starainrt/go-mysql/replication"
"io"
"os"
2023-07-03 13:47:39 +08:00
"strings"
2026-03-08 20:07:59 +08:00
"sync"
2023-04-25 19:30:24 +08:00
"time"
2023-04-25 18:41:34 +08:00
)
2026-03-08 20:07:59 +08:00
var (
	// ErrInvalidBinlogHeader reports that a file does not begin with the
	// 4-byte binlog magic (checked by validateBinlogHeader).
	ErrInvalidBinlogHeader = errors.New("invalid binlog file header")
	// ErrEventTooSmall reports an event whose declared size is not larger
	// than the event header itself (checked by readEventHeader).
	ErrEventTooSmall = errors.New("event size too small")
)

// Compression codes carried by TRANSACTION_PAYLOAD events; mapped to
// display names by getCompressionTypeName.
const (
	CompressionNone uint64 = 255
	CompressionZSTD uint64 = 0
)

// Buffer sizing: cap on event-body buffers kept in the pool, and the size
// of the bufio reader wrapped around the binlog file.
const (
	maxPooledBodyCap = 4 << 20 // 4MB
	defaultReadBufSize = 1 << 20 // 1MB
)
2023-04-25 18:41:34 +08:00
// TxDetail describes a single event inside a parsed transaction.
type TxDetail struct {
	StartPos        int             `json:"startPos"`  // byte offset where the event (or its preceding TABLE_MAP) starts
	EndPos          int             `json:"endPos"`    // byte offset just past the event (header LogPos)
	RowCount        int             `json:"rowCount"`  // rows touched by a DML event
	Timestamp       int64           `json:"timestamp"` // event header unix timestamp
	Time            time.Time       `json:"time"`      // Timestamp as time.Time; filled lazily by fillTimeLazy
	Sql             string          `json:"sql"`       // SQL text (query events, or ROWS_QUERY text attached afterwards)
	Db              string          `json:"db"`
	Table           string          `json:"table"`
	SqlType         string          `json:"sqlType"` // "insert", "update", "delete", "query", "rowsquery", or ""
	CompressionType string          `json:"compressionType"`
	Rows            [][]interface{} `json:"rows"` // raw row images for DML events
}
2023-07-03 13:47:39 +08:00
// Transaction status values, derived from BEGIN/COMMIT/ROLLBACK query
// events while a transaction is being assembled.
const (
	STATUS_PREPARE uint8 = iota
	STATUS_BEGIN
	STATUS_COMMIT
	STATUS_ROLLBACK
)
2023-04-25 18:41:34 +08:00
type Transaction struct {
2026-03-08 20:07:59 +08:00
GTID string `json:"gtid"`
Timestamp int64 `json:"timestamp"`
Time time.Time `json:"time"`
StartPos int `json:"startPos"`
EndPos int `json:"endPos"`
Size int `json:"size"`
RowsCount int `json:"rowsCount"`
Status uint8 `json:"status"`
TxStartTime int64 `json:"txStartTime"`
TxEndTime int64 `json:"txEndTime"`
sqlOrigin []string `json:"sqlOrigin"`
Txs []TxDetail `json:"txs"`
dmlEventCount int
2023-04-25 18:41:34 +08:00
}
2023-04-29 11:29:29 +08:00
// GetSqlOrigin returns the raw SQL statements captured from ROWS_QUERY
// events for this transaction. The returned slice aliases internal state;
// callers should treat it as read-only.
func (t Transaction) GetSqlOrigin() []string {
	return t.sqlOrigin
}
2026-03-08 20:07:59 +08:00
// BinlogFilter selects which transactions (and which events inside them)
// are passed to the callback of ParseBinlogWithFilter. Zero values mean
// "no restriction".
type BinlogFilter struct {
	IncludeGtid      string        // keep only transactions whose GTID is contained in this set
	ExcludeGtid      string        // drop transactions whose GTID is contained in this set
	IncludeTables    []string      // "db.tb" patterns ("*" wildcards allowed); keep matching events
	ExcludeTables    []string      // "db.tb" patterns; drop matching events
	StartPos         int           // drop transactions starting before this byte offset
	EndPos           int           // drop transactions ending after this byte offset
	StartDate        time.Time     // drop transactions older than this time
	EndDate          time.Time     // drop transactions newer than this time
	BigThan          int           // keep only transactions of at least this many bytes
	SmallThan        int           // keep only transactions of at most this many bytes
	OnlyShowGtid     bool          // fast path: report GTID/position info only, skip event bodies
	OnlyShowDML      bool          // drop transactions containing no DML (db+table) events
	PickTxAllIfMatch bool          // on a table match, keep/drop the whole transaction untrimmed
	ExcludeBlank     bool          // with ExcludeTables: treat blank db/table events as excluded
	IncludeBlank     bool          // with IncludeTables: also keep blank db/table events
}
// BinlogEvent is the normalized, parser-independent form of one binlog
// event as produced by ParseBinlogEvent.
type BinlogEvent struct {
	Type            string          // "insert", "update", "delete", "query", "rowsquery", "gtid", or "" for unhandled events
	DB              string          // schema name, when the event carries one
	TB              string          // table name, when the event carries one
	Data            string          // SQL text or GTID string, depending on Type
	RowCnt          uint32          // affected rows (update events count before/after pairs once)
	Rows            [][]interface{} // raw row images for DML events
	CompressionType string          // set on events unpacked from a TRANSACTION_PAYLOAD event
}
// tableMatcher matches db/table pairs against a prepared pattern set
// built by buildTableMatcher.
type tableMatcher struct {
	exactMatch map[string]bool // literal "db.tb" patterns
	dbWildcard map[string]bool // "db.*" patterns, keyed by db
	tbWildcard map[string]bool // "*.tb" patterns, keyed by tb
	matchAll   bool            // a "*.*" pattern was present
}

// match reports whether the pair db.tb is selected by any pattern.
func (m *tableMatcher) match(db, tb string) bool {
	switch {
	case m.matchAll:
		return true
	case m.exactMatch[db+"."+tb]:
		return true
	default:
		return m.dbWildcard[db] || m.tbWildcard[tb]
	}
}
// bodyBufPool recycles event-body buffers to avoid an allocation per event.
// The pool stores *[]byte (pointer-to-slice) so Put does not allocate.
var bodyBufPool = sync.Pool{
	New: func() any {
		b := make([]byte, 0, 64*1024)
		return &b
	},
}

// getBodyBuf returns a buffer of length n, reusing pooled storage when its
// capacity suffices. The buffer contents are unspecified; callers are
// expected to overwrite all n bytes.
func getBodyBuf(n int) []byte {
	p := bodyBufPool.Get().(*[]byte)
	if cap(*p) < n {
		// Pooled buffer is too small for this event. Return it to the pool
		// (the original code dropped it here, losing the pooled storage)
		// and allocate a dedicated buffer of the required size.
		bodyBufPool.Put(p)
		return make([]byte, n)
	}
	return (*p)[:n]
}

// putBodyBuf returns a buffer to the pool. Oversized buffers are dropped so
// a single huge event cannot pin large allocations in the pool forever.
func putBodyBuf(b []byte) {
	if cap(b) > maxPooledBodyCap {
		return
	}
	b = b[:0]
	bodyBufPool.Put(&b)
}
2023-06-30 17:55:45 +08:00
// ParseBinlogFile parses the binlog file at path and invokes fx once per
// completed transaction. fx returning false stops the parse early.
func ParseBinlogFile(path string, fx func(transaction Transaction) bool) error {
	return parseOneBinlog(path, fx)
}
2026-03-08 20:07:59 +08:00
2023-06-30 17:55:45 +08:00
func parseOneBinlog(path string, fx func(Transaction) bool) error {
2023-04-25 18:41:34 +08:00
if !staros.Exists(path) {
return os.ErrNotExist
}
f, err := os.Open(path)
if err != nil {
return err
}
2026-03-08 20:07:59 +08:00
defer f.Close()
2023-04-25 18:41:34 +08:00
2026-03-08 20:07:59 +08:00
if err := validateBinlogHeader(f); err != nil {
return err
}
br := bufio.NewReaderSize(f, defaultReadBufSize)
return parseBinlogDetail(br, fx)
}
2023-04-25 18:41:34 +08:00
2026-03-08 20:07:59 +08:00
func validateBinlogHeader(f *os.File) error {
const fileTypeBytes = int64(4)
2023-04-25 18:41:34 +08:00
b := make([]byte, fileTypeBytes)
2026-03-08 20:07:59 +08:00
if _, err := f.Read(b); err != nil {
return fmt.Errorf("read binlog header failed: %w", err)
2023-04-25 18:41:34 +08:00
}
2026-03-08 20:07:59 +08:00
if !bytes.Equal(b, replication.BinLogFileHeader) {
return ErrInvalidBinlogHeader
}
if _, err := f.Seek(fileTypeBytes, io.SeekStart); err != nil {
return fmt.Errorf("seek after header failed: %w", err)
}
return nil
}
// readEventHeader reads and parses one event header into headBuf (which must
// be exactly replication.EventHeaderSize bytes). io.EOF is passed through
// untouched so callers can detect a clean end of stream. Returns
// ErrEventTooSmall (wrapped) when the declared event size cannot contain a
// header.
func readEventHeader(r io.Reader, parser *replication.BinlogParser, headBuf []byte) (*replication.EventHeader, error) {
	_, err := io.ReadFull(r, headBuf)
	if err != nil {
		return nil, err
	}

	head, err := parser.ParseHeader(headBuf)
	if err != nil {
		return nil, fmt.Errorf("parse header failed: %w", err)
	}

	if head.EventSize <= uint32(replication.EventHeaderSize) {
		return nil, fmt.Errorf("%w: event size is %d", ErrEventTooSmall, head.EventSize)
	}
	return head, nil
}
// readEventBody reads the event body that follows an already-parsed header.
// The returned buffer comes from the body pool; the caller must hand it back
// with putBodyBuf once the event has been parsed.
func readEventBody(r io.Reader, h *replication.EventHeader) ([]byte, error) {
	need := int(h.EventSize) - replication.EventHeaderSize
	buf := getBodyBuf(need)
	_, err := io.ReadFull(r, buf)
	if err == nil {
		return buf, nil
	}
	// Failed read: recycle the buffer before reporting the error.
	putBodyBuf(buf)
	return nil, fmt.Errorf("read event body failed: %w (need %d bytes)", err, need)
}
// skipEventBody discards the body of an event whose header has already been
// consumed, without parsing or buffering it.
func skipEventBody(r io.Reader, h *replication.EventHeader) error {
	remain := int64(h.EventSize) - int64(replication.EventHeaderSize)
	if remain <= 0 {
		return nil
	}
	_, err := io.CopyN(io.Discard, r, remain)
	if err != nil {
		return fmt.Errorf("skip event body failed: %w", err)
	}
	return nil
}
// parseEvent decodes an event body using its previously parsed header,
// annotating any failure with the event's log position.
func parseEvent(parser *replication.BinlogParser, h *replication.EventHeader, body []byte) (replication.Event, error) {
	ev, err := parser.ParseEvent(h, body, nil)
	if err != nil {
		return nil, fmt.Errorf("parse event failed at pos %d: %w", h.LogPos, err)
	}
	return ev, nil
}
// finalizeTx completes a transaction after its last event has been seen:
// it distributes the captured ROWS_QUERY statements onto the non-"query"
// details in order, totals the row counts, and computes the byte size
// (forced to 0 in GTID-only mode, where EndPos is not tracked precisely).
func finalizeTx(tx *Transaction, onlyShowGtid bool) {
	next := 0
	for i := range tx.Txs {
		detail := &tx.Txs[i]
		if detail.SqlType != "query" && next < len(tx.sqlOrigin) {
			detail.Sql = tx.sqlOrigin[next]
			next++
		}
		tx.RowsCount += detail.RowCount
	}

	if onlyShowGtid {
		tx.Size = 0
		return
	}
	tx.Size = tx.EndPos - tx.StartPos
}
func fillTimeLazy(tx *Transaction) {
if tx.Timestamp != 0 && tx.Time.IsZero() {
tx.Time = time.Unix(tx.Timestamp, 0)
}
for i := range tx.Txs {
if tx.Txs[i].Timestamp != 0 && tx.Txs[i].Time.IsZero() {
tx.Txs[i].Time = time.Unix(tx.Txs[i].Timestamp, 0)
}
2023-04-25 18:41:34 +08:00
}
}
2023-06-30 17:55:45 +08:00
// parseBinlogDetail streams events from r and groups them into transactions,
// using GTID events as delimiters. Each completed transaction is handed to
// f; f returning false stops the parse early. The final, unterminated
// transaction is flushed at EOF. Events seen before the first GTID event
// accumulate into a zero transaction that is silently discarded.
func parseBinlogDetail(r io.Reader, f func(Transaction) bool) error {
	parser := replication.NewBinlogParser()
	parser.SetParseTime(false)  // timestamps converted lazily via fillTimeLazy
	parser.SetUseDecimal(false)
	var (
		tbMapPos uint32 // start offset of the most recent TABLE_MAP event
		tx       Transaction
		headBuf  = make([]byte, replication.EventHeaderSize) // reused per event
	)
	currentGtid := ""
	for {
		h, err := readEventHeader(r, parser, headBuf)
		if err == io.EOF {
			// Clean end of file: flush the last pending transaction.
			if currentGtid != "" {
				finalizeTx(&tx, false)
				fillTimeLazy(&tx)
				if f != nil {
					f(tx)
				}
			}
			return nil
		}
		if err != nil {
			return err
		}
		body, err := readEventBody(r, h)
		if err != nil {
			return err
		}
		e, err := parseEvent(parser, h, body)
		putBodyBuf(body) // body is fully decoded into e; recycle immediately
		if err != nil {
			return err
		}
		if h.EventType == replication.TABLE_MAP_EVENT {
			// Row events report the TABLE_MAP's offset as their start.
			tbMapPos = h.LogPos - h.EventSize
		}
		evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
		for _, ev := range evs {
			startPos := 0
			if ev.Type == "query" || ev.Type == "gtid" {
				startPos = int(h.LogPos - h.EventSize)
			} else {
				startPos = int(tbMapPos)
			}
			switch ev.Type {
			case "gtid":
				// A new GTID closes the previous transaction.
				if currentGtid != "" {
					finalizeTx(&tx, false)
					fillTimeLazy(&tx)
					if f != nil && !f(tx) {
						return nil
					}
				}
				currentGtid = ev.Data
				tx = Transaction{
					GTID:      ev.Data,
					StartPos:  startPos,
					Timestamp: int64(h.Timestamp),
					Txs:       make([]TxDetail, 0, 8),
					sqlOrigin: make([]string, 0, 4),
				}
			case "":
				// Unclassified event: only advances the transaction end.
				tx.EndPos = int(h.LogPos)
			case "rowsquery":
				// Raw statement text; attached to details by finalizeTx.
				tx.EndPos = int(h.LogPos)
				tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
			default:
				tx.EndPos = int(h.LogPos)
				status := STATUS_PREPARE
				if ev.Type == "query" {
					// BEGIN/COMMIT/ROLLBACK drive the tx status and times.
					switch strings.ToLower(ev.Data) {
					case "begin":
						if tx.TxStartTime == 0 {
							tx.TxStartTime = int64(h.Timestamp)
						}
						status = STATUS_BEGIN
					case "commit":
						status = STATUS_COMMIT
						tx.TxEndTime = int64(h.Timestamp)
					case "rollback":
						status = STATUS_ROLLBACK
						tx.TxEndTime = int64(h.Timestamp)
					}
					tx.Status = status
				}
				if ev.DB != "" && ev.TB != "" {
					tx.dmlEventCount++
				}
				tx.Txs = append(tx.Txs, TxDetail{
					StartPos:        startPos,
					EndPos:          int(h.LogPos),
					Db:              ev.DB,
					Table:           ev.TB,
					Sql:             ev.Data,
					SqlType:         ev.Type,
					Rows:            ev.Rows,
					RowCount:        int(ev.RowCnt),
					Timestamp:       int64(h.Timestamp),
					CompressionType: ev.CompressionType,
				})
			}
		}
	}
}
2023-04-25 18:41:34 +08:00
2023-06-29 13:16:42 +08:00
// ParseBinlogEvent converts one raw replication event into this package's
// normalized BinlogEvent form. Most event types map to exactly one entry;
// TRANSACTION_PAYLOAD events expand recursively into their wrapped events.
// Unrecognized event types yield a single zero-value entry (Type ""), which
// the transaction builders use only to advance EndPos.
func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent {
	var res []BinlogEvent
	var sig BinlogEvent
	switch ev.Header.EventType {
	case replication.ANONYMOUS_GTID_EVENT:
		// Synthetic GTID so GTID-less servers still delimit transactions.
		sig.Data = "anonymous-gtid-event:1"
		sig.Type = "gtid"
	case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
		wrEvent, ok := ev.Event.(*replication.RowsEvent)
		if !ok {
			return res // unexpected payload type: report nothing
		}
		sig.DB = string(wrEvent.Table.Schema)
		sig.TB = string(wrEvent.Table.Table)
		sig.Type = "insert"
		sig.RowCnt = uint32(len(wrEvent.Rows))
		sig.Rows = wrEvent.Rows
	case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
		wrEvent, ok := ev.Event.(*replication.RowsEvent)
		if !ok {
			return res
		}
		sig.DB = string(wrEvent.Table.Schema)
		sig.TB = string(wrEvent.Table.Table)
		sig.Type = "update"
		// Updates carry before/after image pairs; count each pair once.
		sig.RowCnt = uint32(len(wrEvent.Rows)) / 2
		sig.Rows = wrEvent.Rows
	case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
		wrEvent, ok := ev.Event.(*replication.RowsEvent)
		if !ok {
			return res
		}
		sig.DB = string(wrEvent.Table.Schema)
		sig.TB = string(wrEvent.Table.Table)
		sig.Type = "delete"
		sig.RowCnt = uint32(len(wrEvent.Rows))
		sig.Rows = wrEvent.Rows
	case replication.ROWS_QUERY_EVENT:
		queryEvent, ok := ev.Event.(*replication.RowsQueryEvent)
		if !ok {
			return res
		}
		sig.Data = string(queryEvent.Query)
		sig.Type = "rowsquery"
	case replication.QUERY_EVENT:
		queryEvent, ok := ev.Event.(*replication.QueryEvent)
		if !ok {
			return res
		}
		sig.DB = string(queryEvent.Schema)
		sig.Data = string(queryEvent.Query)
		sig.Type = "query"
	case replication.MARIADB_GTID_EVENT:
		// Treated as a transaction start marker.
		sig.Data = "begin"
		sig.Type = "query"
	case replication.XID_EVENT:
		// XID marks a committed transaction.
		sig.Data = "commit"
		sig.Type = "query"
	case replication.GTID_EVENT:
		ge, ok := ev.Event.(*replication.GTIDEvent)
		if !ok {
			return res
		}
		gid, err := gtid.Parse(fmt.Sprintf("%s:%d", bytesToUuid(ge.SID), ge.GNO))
		if err != nil {
			// Preserve the raw value so the caller can still see what failed.
			sig.Data = fmt.Sprintf("invalid-gtid:%s:%d", bytesToUuid(ge.SID), ge.GNO)
		} else {
			sig.Data = gid.String()
		}
		sig.Type = "gtid"
	case replication.TRANSACTION_PAYLOAD_EVENT:
		ge, ok := ev.Event.(*replication.TransactionPayloadEvent)
		if !ok {
			return res
		}
		// Expand the wrapped events recursively, then stamp each with the
		// payload's compression type.
		for _, val := range ge.Events {
			res = append(res, ParseBinlogEvent(val)...)
		}
		compressionType := getCompressionTypeName(ge.CompressionType)
		for idx := range res {
			res[idx].CompressionType = compressionType
		}
		return res
	}
	res = append(res, sig)
	return res
}
2023-05-24 15:25:31 +08:00
2026-03-08 20:07:59 +08:00
func getCompressionTypeName(code uint64) string {
switch code {
case CompressionZSTD:
return "ZSTD"
case CompressionNone:
return ""
default:
return fmt.Sprintf("UNKNOWN(%d)", code)
}
2023-05-24 15:25:31 +08:00
}
2026-03-08 20:07:59 +08:00
func ParseBinlogWithFilter(path string, pos int64, filter BinlogFilter, fx func(Transaction) bool) error {
if !staros.Exists(path) {
return os.ErrNotExist
2023-07-03 13:37:39 +08:00
}
2026-03-08 20:07:59 +08:00
f, err := os.Open(path)
if err != nil {
return err
2023-07-03 14:03:45 +08:00
}
2026-03-08 20:07:59 +08:00
defer f.Close()
parser := replication.NewBinlogParser()
parser.SetParseTime(false)
parser.SetUseDecimal(false)
if pos != 0 {
if err := seekToPosition(f, parser, pos); err != nil {
return err
2023-07-03 14:03:45 +08:00
}
2026-03-08 20:07:59 +08:00
} else {
if err := validateBinlogHeader(f); err != nil {
return err
2023-07-03 14:03:45 +08:00
}
2024-04-07 15:50:21 +08:00
}
2026-03-08 20:07:59 +08:00
br := bufio.NewReaderSize(f, defaultReadBufSize)
return parseBinlogWithFilter(br, parser, filter, fx)
}
// seekToPosition prepares f and parser for reading from an arbitrary byte
// offset. Events cannot be decoded without the FORMAT_DESCRIPTION event at
// the start of the file, so the function validates the magic, parses events
// from the top until the format description (or a GTID event) has been fed
// into the parser, and only then seeks to pos.
// NOTE(review): pos is assumed to be a valid event boundary — confirm that
// callers only pass offsets obtained from a previous parse.
func seekToPosition(f *os.File, parser *replication.BinlogParser, pos int64) error {
	if err := validateBinlogHeader(f); err != nil {
		return err
	}
	headBuf := make([]byte, replication.EventHeaderSize)
	for {
		h, err := readEventHeader(f, parser, headBuf)
		if err != nil {
			return fmt.Errorf("seek to position failed: %w", err)
		}
		body, err := readEventBody(f, h)
		if err != nil {
			return err
		}
		// Parsed for side effect only: the parser records the format.
		_, err = parseEvent(parser, h, body)
		putBodyBuf(body)
		if err != nil {
			return err
		}
		// Format description seen (or GTID, meaning events are decodable):
		// the parser is now able to handle events at pos.
		if h.EventType == replication.FORMAT_DESCRIPTION_EVENT || h.EventType == replication.GTID_EVENT {
			break
		}
	}
	if _, err := f.Seek(pos, io.SeekStart); err != nil {
		return fmt.Errorf("seek to pos %d failed: %w", pos, err)
	}
	return nil
}
// parseBinlogWithFilter streams events from r, groups them into
// GTID-delimited transactions, and hands each transaction that passes
// filter to fn; fn returning false aborts the parse. Three paths exist per
// event: a GTID-only fast path that skips all non-GTID bodies, a GTID path
// that decides whether the upcoming transaction is wanted, and a zero-parse
// skip path that discards bodies of unwanted transactions.
func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter BinlogFilter, fn func(Transaction) bool) error {
	var subGtid, inGtid, exGtid *gtid.Gtid
	var err error
	includeMatcher, excludeMatcher := prepareTableMatchers(filter)
	if filter.IncludeGtid != "" {
		inGtid, err = gtid.Parse(filter.IncludeGtid)
		if err != nil {
			return fmt.Errorf("parse include gtid failed: %w", err)
		}
		// subGtid tracks which included GTIDs are still outstanding so the
		// scan can terminate as soon as all of them have been delivered.
		subGtid = inGtid.Clone()
	}
	if filter.ExcludeGtid != "" {
		exGtid, err = gtid.Parse(filter.ExcludeGtid)
		if err != nil {
			return fmt.Errorf("parse exclude gtid failed: %w", err)
		}
	}
	var (
		tbMapPos       uint32 // start offset of the most recent TABLE_MAP event
		skipCurrentTxn bool   // true while inside a transaction the filter rejected
		tx             Transaction
		headBuf        = make([]byte, replication.EventHeaderSize) // reused per event
	)
	currentGtid := ""
	// callFn applies the position/time/size/table filters to a completed
	// transaction and forwards it to fn when it qualifies. Returning true
	// means "continue scanning" (whether or not the transaction was
	// delivered); only fn itself can return false to stop the parse.
	callFn := func(tx Transaction) bool {
		if fn == nil {
			return true
		}
		fillTimeLazy(&tx)
		if !filter.StartDate.IsZero() && filter.StartDate.After(tx.Time) {
			return true
		}
		if !filter.EndDate.IsZero() && filter.EndDate.Before(tx.Time) {
			return true
		}
		if filter.StartPos != 0 && filter.StartPos > tx.StartPos {
			return true
		}
		if filter.EndPos != 0 && filter.EndPos < tx.EndPos {
			return true
		}
		if filter.BigThan != 0 && filter.BigThan > tx.Size {
			return true
		}
		if filter.SmallThan != 0 && filter.SmallThan < tx.Size {
			return true
		}
		// Drop transactions without DML when OnlyShowDML is set (not
		// applicable in GTID-only mode, where events are never parsed).
		if !filter.OnlyShowGtid && filter.OnlyShowDML && tx.dmlEventCount == 0 {
			return true
		}
		// Table filtering: txs collects the details that survive; matched
		// records whether any include/exclude pattern fired at all.
		var txs []TxDetail
		var matched bool
		for _, t := range tx.Txs {
			includeMatch := includeMatcher != nil && includeMatcher.match(t.Db, t.Table)
			excludeMatch := excludeMatcher != nil && excludeMatcher.match(t.Db, t.Table)
			// Events with no db/table (BEGIN, COMMIT, ...) get special
			// handling controlled by IncludeBlank/ExcludeBlank.
			if t.Db == "" && t.Table == "" {
				if includeMatcher != nil && !filter.IncludeBlank {
					continue
				}
				if excludeMatcher != nil && filter.ExcludeBlank {
					matched = true
					if filter.PickTxAllIfMatch {
						return true
					}
					continue
				}
			}
			if includeMatcher != nil {
				if includeMatch {
					matched = true
					// PickTxAllIfMatch: deliver the whole transaction untrimmed.
					if filter.PickTxAllIfMatch {
						return fn(tx)
					}
					txs = append(txs, t)
				}
			} else if excludeMatcher != nil {
				if excludeMatch {
					matched = true
					// PickTxAllIfMatch + exclude: drop the whole transaction.
					if filter.PickTxAllIfMatch {
						return true
					}
				} else {
					txs = append(txs, t)
				}
			} else {
				txs = append(txs, t)
			}
		}
		if matched {
			tx.Txs = txs
		}
		// Include filter present but nothing matched: drop the transaction.
		if !matched && includeMatcher != nil {
			return true
		}
		// Everything was filtered away: nothing left to deliver.
		if len(tx.Txs) == 0 && matched {
			return true
		}
		return fn(tx)
	}
	for {
		h, err := readEventHeader(r, parser, headBuf)
		if err == io.EOF {
			// Clean end of file: flush the last pending transaction, if any.
			if !tx.Time.IsZero() || tx.Timestamp != 0 {
				finalizeTx(&tx, filter.OnlyShowGtid)
				callFn(tx)
			}
			return nil
		}
		if err != nil {
			return err
		}
		// GTID-only fast path: parse nothing but GTID events.
		if filter.OnlyShowGtid {
			if h.EventType != replication.GTID_EVENT && h.EventType != replication.ANONYMOUS_GTID_EVENT {
				if err := skipEventBody(r, h); err != nil {
					return err
				}
				continue
			}
			body, err := readEventBody(r, h)
			if err != nil {
				return err
			}
			e, err := parseEvent(parser, h, body)
			putBodyBuf(body)
			if err != nil {
				return err
			}
			evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
			for _, ev := range evs {
				if ev.Type != "gtid" {
					continue
				}
				startPos := int(h.LogPos - h.EventSize)
				if filter.EndPos != 0 && startPos > filter.EndPos {
					continue
				}
				if filter.StartPos != 0 && startPos < filter.StartPos {
					continue
				}
				// A new GTID closes the previous transaction.
				if currentGtid != "" {
					// EndPos is approximated as one byte before this GTID
					// event, since bodies were never parsed.
					tx.EndPos = startPos - 1
					finalizeTx(&tx, true)
					if !callFn(tx) {
						return nil
					}
					// All requested GTIDs seen: stop early.
					if subGtid != nil {
						if err := subGtid.Sub(tx.GTID); err == nil && subGtid.EventCount() == 0 {
							return nil
						}
					}
					tx = Transaction{}
				}
				currentGtid = ev.Data
				if inGtid != nil {
					if c, _ := inGtid.Contain(ev.Data); !c {
						tx = Transaction{}
						currentGtid = ""
						continue
					}
				}
				if exGtid != nil {
					if c, _ := exGtid.Contain(ev.Data); c {
						currentGtid = ""
						tx = Transaction{}
						continue
					}
				}
				tx = Transaction{
					GTID:      ev.Data,
					StartPos:  startPos,
					EndPos:    startPos,
					Timestamp: int64(h.Timestamp),
				}
			}
			continue
		}
		// Handle the GTID event first: it decides whether the upcoming
		// transaction is wanted at all.
		if h.EventType == replication.GTID_EVENT || h.EventType == replication.ANONYMOUS_GTID_EVENT {
			body, err := readEventBody(r, h)
			if err != nil {
				return err
			}
			e, err := parseEvent(parser, h, body)
			putBodyBuf(body)
			if err != nil {
				return err
			}
			evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
			for _, ev := range evs {
				if ev.Type != "gtid" {
					continue
				}
				startPos := int(h.LogPos - h.EventSize)
				// A new GTID closes the previous transaction.
				if currentGtid != "" {
					finalizeTx(&tx, false)
					if !callFn(tx) {
						return nil
					}
					// All requested GTIDs seen: stop early.
					if subGtid != nil {
						if err := subGtid.Sub(tx.GTID); err == nil && subGtid.EventCount() == 0 {
							return nil
						}
					}
					tx = Transaction{}
				}
				currentGtid = ev.Data
				// Decide up front whether this transaction can be skipped
				// without parsing its event bodies.
				skipCurrentTxn = false
				if filter.EndPos != 0 && startPos > filter.EndPos {
					skipCurrentTxn = true
				}
				if filter.StartPos != 0 && startPos < filter.StartPos {
					skipCurrentTxn = true
				}
				if inGtid != nil {
					if c, _ := inGtid.Contain(ev.Data); !c {
						skipCurrentTxn = true
					}
				}
				if exGtid != nil {
					if c, _ := exGtid.Contain(ev.Data); c {
						skipCurrentTxn = true
					}
				}
				if !skipCurrentTxn {
					tx = Transaction{
						GTID:      ev.Data,
						StartPos:  startPos,
						Timestamp: int64(h.Timestamp),
						Txs:       make([]TxDetail, 0, 8),
						sqlOrigin: make([]string, 0, 4),
					}
				} else {
					tx = Transaction{}
				}
			}
			continue
		}
		// Rejected transaction: discard bodies without parsing them.
		if skipCurrentTxn {
			if err := skipEventBody(r, h); err != nil {
				return err
			}
			continue
		}
		body, err := readEventBody(r, h)
		if err != nil {
			return err
		}
		e, err := parseEvent(parser, h, body)
		putBodyBuf(body)
		if err != nil {
			return err
		}
		if h.EventType == replication.TABLE_MAP_EVENT {
			// Row events report the TABLE_MAP's offset as their start.
			tbMapPos = h.LogPos - h.EventSize
		}
		evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
		for _, ev := range evs {
			startPos := 0
			if ev.Type == "query" || ev.Type == "gtid" {
				startPos = int(h.LogPos - h.EventSize)
			} else {
				startPos = int(tbMapPos)
			}
			switch ev.Type {
			case "":
				// Unclassified event: only advances the transaction end.
				tx.EndPos = int(h.LogPos)
			case "rowsquery":
				// Raw statement text; attached to details by finalizeTx.
				tx.EndPos = int(h.LogPos)
				tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
			default:
				tx.EndPos = int(h.LogPos)
				status := STATUS_PREPARE
				if ev.Type == "query" {
					// BEGIN/COMMIT/ROLLBACK drive the tx status and times.
					switch strings.ToLower(ev.Data) {
					case "begin":
						if tx.TxStartTime == 0 {
							tx.TxStartTime = int64(h.Timestamp)
						}
						status = STATUS_BEGIN
					case "commit":
						status = STATUS_COMMIT
						tx.TxEndTime = int64(h.Timestamp)
					case "rollback":
						status = STATUS_ROLLBACK
						tx.TxEndTime = int64(h.Timestamp)
					}
					tx.Status = status
				}
				if ev.DB != "" && ev.TB != "" {
					tx.dmlEventCount++
				}
				tx.Txs = append(tx.Txs, TxDetail{
					StartPos:        startPos,
					EndPos:          int(h.LogPos),
					Db:              ev.DB,
					Table:           ev.TB,
					Sql:             ev.Data,
					SqlType:         ev.Type,
					Rows:            ev.Rows,
					RowCount:        int(ev.RowCnt),
					Timestamp:       int64(h.Timestamp),
					CompressionType: ev.CompressionType,
				})
			}
		}
	}
}
2026-03-08 20:07:59 +08:00
func prepareTableMatchers(filter BinlogFilter) (includeMatcher, excludeMatcher *tableMatcher) {
if len(filter.IncludeTables) > 0 {
includeMatcher = buildTableMatcher(filter.IncludeTables)
2023-05-24 15:25:31 +08:00
}
2026-03-08 20:07:59 +08:00
if len(filter.ExcludeTables) > 0 {
excludeMatcher = buildTableMatcher(filter.ExcludeTables)
2023-05-24 15:25:31 +08:00
}
2026-03-08 20:07:59 +08:00
return includeMatcher, excludeMatcher
}
func buildTableMatcher(patterns []string) *tableMatcher {
m := &tableMatcher{
exactMatch: make(map[string]bool),
dbWildcard: make(map[string]bool),
tbWildcard: make(map[string]bool),
2023-05-24 15:25:31 +08:00
}
2026-03-08 20:07:59 +08:00
for _, pattern := range patterns {
if pattern == "*.*" {
m.matchAll = true
continue
2023-05-24 15:25:31 +08:00
}
2026-03-08 20:07:59 +08:00
parts := strings.Split(pattern, ".")
if len(parts) != 2 {
continue
2023-05-24 15:25:31 +08:00
}
2026-03-08 20:07:59 +08:00
db, tb := parts[0], parts[1]
if db == "*" && tb == "*" {
m.matchAll = true
} else if db == "*" {
m.tbWildcard[tb] = true
} else if tb == "*" {
m.dbWildcard[db] = true
} else {
m.exactMatch[pattern] = true
2023-05-24 15:25:31 +08:00
}
}
2026-03-08 20:07:59 +08:00
return m
2023-05-24 15:25:31 +08:00
}