// Package binlog parses MySQL binlog files into per-transaction records,
// optionally filtered by GTID set, table pattern, position and time range.
package binlog

import (
	"bufio"
	"bytes"
	"errors"
	"fmt"
	"io"
	"os"
	"strings"
	"sync"
	"time"

	"b612.me/mysql/gtid"
	"b612.me/staros"
	"github.com/starainrt/go-mysql/replication"
)

var (
	// ErrInvalidBinlogHeader is returned when the file does not start with
	// the 4-byte binlog magic (see replication.BinLogFileHeader).
	ErrInvalidBinlogHeader = errors.New("invalid binlog file header")
	// ErrEventTooSmall is returned when an event header declares a size no
	// larger than the header itself.
	ErrEventTooSmall = errors.New("event size too small")
)

// Compression type codes carried by TRANSACTION_PAYLOAD_EVENT.
const (
	CompressionNone uint64 = 255
	CompressionZSTD uint64 = 0
)

const (
	maxPooledBodyCap   = 4 << 20 // 4MB: bodies larger than this are not returned to the pool
	defaultReadBufSize = 1 << 20 // 1MB buffered reader over the binlog file
)

// TxDetail describes a single event inside a transaction.
type TxDetail struct {
	StartPos        int             `json:"startPos"`
	EndPos          int             `json:"endPos"`
	RowCount        int             `json:"rowCount"`
	Timestamp       int64           `json:"timestamp"`
	Time            time.Time       `json:"time"`
	Sql             string          `json:"sql"`
	Db              string          `json:"db"`
	Table           string          `json:"table"`
	SqlType         string          `json:"sqlType"`
	CompressionType string          `json:"compressionType"`
	Rows            [][]interface{} `json:"rows"`
}

// Transaction status values (see Transaction.Status).
const (
	STATUS_PREPARE uint8 = iota
	STATUS_BEGIN
	STATUS_COMMIT
	STATUS_ROLLBACK
)

// Transaction is one GTID-delimited transaction reconstructed from the binlog.
type Transaction struct {
	GTID          string    `json:"gtid"`
	Timestamp     int64     `json:"timestamp"`
	Time          time.Time `json:"time"`
	StartPos      int       `json:"startPos"`
	EndPos        int       `json:"endPos"`
	Size          int       `json:"size"`
	RowsCount     int       `json:"rowsCount"`
	Status        uint8     `json:"status"`
	TxStartTime   int64     `json:"txStartTime"`
	TxEndTime     int64     `json:"txEndTime"`
	sqlOrigin     []string  `json:"sqlOrigin"`
	Txs           []TxDetail `json:"txs"`
	dmlEventCount int        // number of events that carried both a db and a table name
}

// GetSqlOrigin returns the raw ROWS_QUERY statements captured for this transaction.
func (t Transaction) GetSqlOrigin() []string {
	return t.sqlOrigin
}

// BinlogFilter restricts which transactions (and which events inside them)
// are delivered to the callback of ParseBinlogWithFilter.
type BinlogFilter struct {
	IncludeGtid      string
	ExcludeGtid      string
	IncludeTables    []string
	ExcludeTables    []string
	StartPos         int
	EndPos           int
	StartDate        time.Time
	EndDate          time.Time
	BigThan          int
	SmallThan        int
	OnlyShowGtid     bool
	OnlyShowDML      bool
	PickTxAllIfMatch bool
	ExcludeBlank     bool
	IncludeBlank     bool
}

// BinlogEvent is the flattened, type-tagged view of a single replication event.
type BinlogEvent struct {
	Type            string
	DB              string
	TB              string
	Data            string
	RowCnt          uint32
	Rows            [][]interface{}
	CompressionType string
}

// tableMatcher matches db.table pairs against patterns of the form
// "db.tb", "db.*", "*.tb" or "*.*".
type tableMatcher struct {
	exactMatch map[string]bool
	dbWildcard map[string]bool
	tbWildcard map[string]bool
	matchAll   bool
}

func (m *tableMatcher) match(db, tb string) bool {
	if m.matchAll {
		return true
	}
	if m.exactMatch[db+"."+tb] {
		return true
	}
	if m.dbWildcard[db] {
		return true
	}
	if m.tbWildcard[tb] {
		return true
	}
	return false
}

// bodyBufPool recycles event-body buffers to keep the parse loop allocation-free
// for typical event sizes.
var bodyBufPool = sync.Pool{
	New: func() any {
		b := make([]byte, 0, 64*1024)
		return &b
	},
}

// getBodyBuf returns a buffer of length n, reusing pooled storage when possible.
func getBodyBuf(n int) []byte {
	p := bodyBufPool.Get().(*[]byte)
	if cap(*p) < n {
		// Too small for this event: hand it back so smaller events can still
		// reuse it, and allocate a fresh buffer for this one.
		bodyBufPool.Put(p)
		return make([]byte, n)
	}
	return (*p)[:n]
}

// putBodyBuf returns b to the pool unless it is oversized (keeps pool memory bounded).
func putBodyBuf(b []byte) {
	if cap(b) > maxPooledBodyCap {
		return
	}
	b = b[:0]
	bodyBufPool.Put(&b)
}

// ParseBinlogFile parses the binlog at path, invoking fx for every transaction.
// fx returning false stops the scan early.
func ParseBinlogFile(path string, fx func(transaction Transaction) bool) error {
	return parseOneBinlog(path, fx)
}

func parseOneBinlog(path string, fx func(Transaction) bool) error {
	if !staros.Exists(path) {
		return os.ErrNotExist
	}
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()
	if err := validateBinlogHeader(f); err != nil {
		return err
	}
	br := bufio.NewReaderSize(f, defaultReadBufSize)
	return parseBinlogDetail(br, fx)
}

// validateBinlogHeader checks the 4-byte binlog magic and leaves the file
// positioned immediately after it.
func validateBinlogHeader(f *os.File) error {
	const fileTypeBytes = int64(4)
	b := make([]byte, fileTypeBytes)
	// io.ReadFull guards against short reads: a bare f.Read may legally
	// return fewer than 4 bytes without an error.
	if _, err := io.ReadFull(f, b); err != nil {
		return fmt.Errorf("read binlog header failed: %w", err)
	}
	if !bytes.Equal(b, replication.BinLogFileHeader) {
		return ErrInvalidBinlogHeader
	}
	if _, err := f.Seek(fileTypeBytes, io.SeekStart); err != nil {
		return fmt.Errorf("seek after header failed: %w", err)
	}
	return nil
}

// readEventHeader reads and parses one event header into headBuf
// (which must be replication.EventHeaderSize bytes long).
func readEventHeader(r io.Reader, parser *replication.BinlogParser, headBuf []byte) (*replication.EventHeader, error) {
	if _, err := io.ReadFull(r, headBuf); err != nil {
		return nil, err
	}
	h, err := parser.ParseHeader(headBuf)
	if err != nil {
		return nil, fmt.Errorf("parse header failed: %w", err)
	}
	if h.EventSize <= uint32(replication.EventHeaderSize) {
		return nil, fmt.Errorf("%w: event size is %d", ErrEventTooSmall, h.EventSize)
	}
	return h, nil
}

// readEventBody reads the event body for h into a pooled buffer.
// The caller must release it with putBodyBuf.
func readEventBody(r io.Reader, h *replication.EventHeader) ([]byte, error) {
	bodyLen := int(h.EventSize) - replication.EventHeaderSize
	body := getBodyBuf(bodyLen)
	if _, err := io.ReadFull(r, body); err != nil {
		putBodyBuf(body)
		return nil, fmt.Errorf("read event body failed: %w (need %d bytes)", err, bodyLen)
	}
	return body, nil
}

// skipEventBody discards the body of h without parsing it.
func skipEventBody(r io.Reader, h *replication.EventHeader) error {
	bodyLen := int64(h.EventSize) - int64(replication.EventHeaderSize)
	if bodyLen <= 0 {
		return nil
	}
	if _, err := io.CopyN(io.Discard, r, bodyLen); err != nil {
		return fmt.Errorf("skip event body failed: %w", err)
	}
	return nil
}

func parseEvent(parser *replication.BinlogParser, h *replication.EventHeader, body []byte) (replication.Event, error) {
	e, err := parser.ParseEvent(h, body, nil)
	if err != nil {
		return nil, fmt.Errorf("parse event failed at pos %d: %w", h.LogPos, err)
	}
	return e, nil
}

// finalizeTx back-fills row-query SQL into non-query events, sums row counts
// and computes the transaction size.
func finalizeTx(tx *Transaction, onlyShowGtid bool) {
	idx := 0
	for k, v := range tx.Txs {
		if v.SqlType != "query" && len(tx.sqlOrigin) > idx {
			v.Sql = tx.sqlOrigin[idx]
			idx++
		}
		tx.RowsCount += v.RowCount
		tx.Txs[k] = v
	}
	if onlyShowGtid {
		tx.Size = 0
	} else {
		tx.Size = tx.EndPos - tx.StartPos
	}
}

// fillTimeLazy materializes time.Time values from unix timestamps only when
// a transaction is actually delivered, keeping the hot parse loop cheap.
func fillTimeLazy(tx *Transaction) {
	if tx.Timestamp != 0 && tx.Time.IsZero() {
		tx.Time = time.Unix(tx.Timestamp, 0)
	}
	for i := range tx.Txs {
		if tx.Txs[i].Timestamp != 0 && tx.Txs[i].Time.IsZero() {
			tx.Txs[i].Time = time.Unix(tx.Txs[i].Timestamp, 0)
		}
	}
}

// parseBinlogDetail is the unfiltered event loop: it groups events into
// transactions at GTID boundaries and hands each finished one to f.
func parseBinlogDetail(r io.Reader, f func(Transaction) bool) error {
	parser := replication.NewBinlogParser()
	parser.SetParseTime(false)
	parser.SetUseDecimal(false)
	var (
		tbMapPos uint32
		tx       Transaction
		headBuf  = make([]byte, replication.EventHeaderSize)
	)
	currentGtid := ""
	for {
		h, err := readEventHeader(r, parser, headBuf)
		if err == io.EOF {
			// Flush the in-flight transaction at end of file.
			if currentGtid != "" {
				finalizeTx(&tx, false)
				fillTimeLazy(&tx)
				if f != nil {
					f(tx)
				}
			}
			return nil
		}
		if err != nil {
			return err
		}
		body, err := readEventBody(r, h)
		if err != nil {
			return err
		}
		e, err := parseEvent(parser, h, body)
		putBodyBuf(body)
		if err != nil {
			return err
		}
		if h.EventType == replication.TABLE_MAP_EVENT {
			// Row events report the position of their preceding table-map event.
			tbMapPos = h.LogPos - h.EventSize
		}
		evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
		for _, ev := range evs {
			startPos := 0
			if ev.Type == "query" || ev.Type == "gtid" {
				startPos = int(h.LogPos - h.EventSize)
			} else {
				startPos = int(tbMapPos)
			}
			switch ev.Type {
			case "gtid":
				// A GTID event closes the previous transaction and opens a new one.
				if currentGtid != "" {
					finalizeTx(&tx, false)
					fillTimeLazy(&tx)
					if f != nil && !f(tx) {
						return nil
					}
				}
				currentGtid = ev.Data
				tx = Transaction{
					GTID:      ev.Data,
					StartPos:  startPos,
					Timestamp: int64(h.Timestamp),
					Txs:       make([]TxDetail, 0, 8),
					sqlOrigin: make([]string, 0, 4),
				}
			case "":
				tx.EndPos = int(h.LogPos)
			case "rowsquery":
				tx.EndPos = int(h.LogPos)
				tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
			default:
				tx.EndPos = int(h.LogPos)
				status := STATUS_PREPARE
				if ev.Type == "query" {
					switch strings.ToLower(ev.Data) {
					case "begin":
						if tx.TxStartTime == 0 {
							tx.TxStartTime = int64(h.Timestamp)
						}
						status = STATUS_BEGIN
					case "commit":
						status = STATUS_COMMIT
						tx.TxEndTime = int64(h.Timestamp)
					case "rollback":
						status = STATUS_ROLLBACK
						tx.TxEndTime = int64(h.Timestamp)
					}
					tx.Status = status
				}
				if ev.DB != "" && ev.TB != "" {
					tx.dmlEventCount++
				}
				tx.Txs = append(tx.Txs, TxDetail{
					StartPos:        startPos,
					EndPos:          int(h.LogPos),
					Db:              ev.DB,
					Table:           ev.TB,
					Sql:             ev.Data,
					SqlType:         ev.Type,
					Rows:            ev.Rows,
					RowCount:        int(ev.RowCnt),
					Timestamp:       int64(h.Timestamp),
					CompressionType: ev.CompressionType,
				})
			}
		}
	}
}

// ParseBinlogEvent flattens a replication event into zero or more BinlogEvents.
// TRANSACTION_PAYLOAD_EVENT recurses into its embedded events and tags each
// result with the payload's compression type.
func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent {
	var res []BinlogEvent
	var sig BinlogEvent
	switch ev.Header.EventType {
	case replication.ANONYMOUS_GTID_EVENT:
		sig.Data = "anonymous-gtid-event:1"
		sig.Type = "gtid"
	case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
		wrEvent, ok := ev.Event.(*replication.RowsEvent)
		if !ok {
			return res
		}
		sig.DB = string(wrEvent.Table.Schema)
		sig.TB = string(wrEvent.Table.Table)
		sig.Type = "insert"
		sig.RowCnt = uint32(len(wrEvent.Rows))
		sig.Rows = wrEvent.Rows
	case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
		wrEvent, ok := ev.Event.(*replication.RowsEvent)
		if !ok {
			return res
		}
		sig.DB = string(wrEvent.Table.Schema)
		sig.TB = string(wrEvent.Table.Table)
		sig.Type = "update"
		// Update events carry before/after image pairs, so two rows per change.
		sig.RowCnt = uint32(len(wrEvent.Rows)) / 2
		sig.Rows = wrEvent.Rows
	case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
		wrEvent, ok := ev.Event.(*replication.RowsEvent)
		if !ok {
			return res
		}
		sig.DB = string(wrEvent.Table.Schema)
		sig.TB = string(wrEvent.Table.Table)
		sig.Type = "delete"
		sig.RowCnt = uint32(len(wrEvent.Rows))
		sig.Rows = wrEvent.Rows
	case replication.ROWS_QUERY_EVENT:
		queryEvent, ok := ev.Event.(*replication.RowsQueryEvent)
		if !ok {
			return res
		}
		sig.Data = string(queryEvent.Query)
		sig.Type = "rowsquery"
	case replication.QUERY_EVENT:
		queryEvent, ok := ev.Event.(*replication.QueryEvent)
		if !ok {
			return res
		}
		sig.DB = string(queryEvent.Schema)
		sig.Data = string(queryEvent.Query)
		sig.Type = "query"
	case replication.MARIADB_GTID_EVENT:
		sig.Data = "begin"
		sig.Type = "query"
	case replication.XID_EVENT:
		sig.Data = "commit"
		sig.Type = "query"
	case replication.GTID_EVENT:
		ge, ok := ev.Event.(*replication.GTIDEvent)
		if !ok {
			return res
		}
		gid, err := gtid.Parse(fmt.Sprintf("%s:%d", bytesToUuid(ge.SID), ge.GNO))
		if err != nil {
			sig.Data = fmt.Sprintf("invalid-gtid:%s:%d", bytesToUuid(ge.SID), ge.GNO)
		} else {
			sig.Data = gid.String()
		}
		sig.Type = "gtid"
	case replication.TRANSACTION_PAYLOAD_EVENT:
		ge, ok := ev.Event.(*replication.TransactionPayloadEvent)
		if !ok {
			return res
		}
		for _, val := range ge.Events {
			res = append(res, ParseBinlogEvent(val)...)
		}
		compressionType := getCompressionTypeName(ge.CompressionType)
		for idx := range res {
			res[idx].CompressionType = compressionType
		}
		return res
	}
	res = append(res, sig)
	return res
}

// getCompressionTypeName maps a TRANSACTION_PAYLOAD_EVENT compression code
// to a display name; CompressionNone maps to the empty string.
func getCompressionTypeName(code uint64) string {
	switch code {
	case CompressionZSTD:
		return "ZSTD"
	case CompressionNone:
		return ""
	default:
		return fmt.Sprintf("UNKNOWN(%d)", code)
	}
}

// ParseBinlogWithFilter parses the binlog at path, optionally starting at pos,
// and delivers transactions that pass filter to fx. fx returning false stops
// the scan early.
func ParseBinlogWithFilter(path string, pos int64, filter BinlogFilter, fx func(Transaction) bool) error {
	if !staros.Exists(path) {
		return os.ErrNotExist
	}
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()
	parser := replication.NewBinlogParser()
	parser.SetParseTime(false)
	parser.SetUseDecimal(false)
	if pos != 0 {
		if err := seekToPosition(f, parser, pos); err != nil {
			return err
		}
	} else {
		if err := validateBinlogHeader(f); err != nil {
			return err
		}
	}
	br := bufio.NewReaderSize(f, defaultReadBufSize)
	return parseBinlogWithFilter(br, parser, filter, fx)
}

// seekToPosition primes the parser with the leading format-description
// events, then seeks the file to pos.
func seekToPosition(f *os.File, parser *replication.BinlogParser, pos int64) error {
	if err := validateBinlogHeader(f); err != nil {
		return err
	}
	headBuf := make([]byte, replication.EventHeaderSize)
	for {
		h, err := readEventHeader(f, parser, headBuf)
		if err != nil {
			return fmt.Errorf("seek to position failed: %w", err)
		}
		body, err := readEventBody(f, h)
		if err != nil {
			return err
		}
		// Parsed for parser side effects (format description state), result unused.
		_, err = parseEvent(parser, h, body)
		putBodyBuf(body)
		if err != nil {
			return err
		}
		if h.EventType == replication.FORMAT_DESCRIPTION_EVENT || h.EventType == replication.GTID_EVENT {
			break
		}
	}
	if _, err := f.Seek(pos, io.SeekStart); err != nil {
		return fmt.Errorf("seek to pos %d failed: %w", pos, err)
	}
	return nil
}

// parseBinlogWithFilter is the filtered event loop. GTID/position/time/size
// filters are applied per transaction; table filters per event inside it.
func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter BinlogFilter, fn func(Transaction) bool) error {
	var subGtid, inGtid, exGtid *gtid.Gtid
	var err error
	includeMatcher, excludeMatcher := prepareTableMatchers(filter)
	if filter.IncludeGtid != "" {
		inGtid, err = gtid.Parse(filter.IncludeGtid)
		if err != nil {
			return fmt.Errorf("parse include gtid failed: %w", err)
		}
		// subGtid tracks which requested GTIDs are still outstanding so the
		// scan can stop as soon as all of them have been seen.
		subGtid = inGtid.Clone()
	}
	if filter.ExcludeGtid != "" {
		exGtid, err = gtid.Parse(filter.ExcludeGtid)
		if err != nil {
			return fmt.Errorf("parse exclude gtid failed: %w", err)
		}
	}
	var (
		tbMapPos       uint32
		skipCurrentTxn bool
		tx             Transaction
		headBuf        = make([]byte, replication.EventHeaderSize)
	)
	currentGtid := ""
	// callFn applies the post-parse filters and invokes fn.
	// It returns false only when fn asks to stop the scan.
	callFn := func(tx Transaction) bool {
		if fn == nil {
			return true
		}
		fillTimeLazy(&tx)
		if !filter.StartDate.IsZero() && filter.StartDate.After(tx.Time) {
			return true
		}
		if !filter.EndDate.IsZero() && filter.EndDate.Before(tx.Time) {
			return true
		}
		if filter.StartPos != 0 && filter.StartPos > tx.StartPos {
			return true
		}
		if filter.EndPos != 0 && filter.EndPos < tx.EndPos {
			return true
		}
		if filter.BigThan != 0 && filter.BigThan > tx.Size {
			return true
		}
		if filter.SmallThan != 0 && filter.SmallThan < tx.Size {
			return true
		}
		if !filter.OnlyShowGtid && filter.OnlyShowDML && tx.dmlEventCount == 0 {
			return true
		}
		var txs []TxDetail
		var matched bool
		for _, t := range tx.Txs {
			includeMatch := includeMatcher != nil && includeMatcher.match(t.Db, t.Table)
			excludeMatch := excludeMatcher != nil && excludeMatcher.match(t.Db, t.Table)
			if t.Db == "" && t.Table == "" {
				// Events with no table context (BEGIN/COMMIT, DDL without schema).
				if includeMatcher != nil && !filter.IncludeBlank {
					continue
				}
				if excludeMatcher != nil && filter.ExcludeBlank {
					matched = true
					if filter.PickTxAllIfMatch {
						return true
					}
					continue
				}
			}
			if includeMatcher != nil {
				if includeMatch {
					matched = true
					if filter.PickTxAllIfMatch {
						// Deliver the whole transaction untouched on first match.
						return fn(tx)
					}
					txs = append(txs, t)
				}
			} else if excludeMatcher != nil {
				if excludeMatch {
					matched = true
					if filter.PickTxAllIfMatch {
						// Whole transaction excluded on first match.
						return true
					}
				} else {
					txs = append(txs, t)
				}
			} else {
				txs = append(txs, t)
			}
		}
		if matched {
			tx.Txs = txs
		}
		if !matched && includeMatcher != nil {
			return true
		}
		if len(tx.Txs) == 0 && matched {
			return true
		}
		return fn(tx)
	}
	for {
		h, err := readEventHeader(r, parser, headBuf)
		if err == io.EOF {
			if !tx.Time.IsZero() || tx.Timestamp != 0 {
				finalizeTx(&tx, filter.OnlyShowGtid)
				callFn(tx)
			}
			return nil
		}
		if err != nil {
			return err
		}
		// GTID-only fast path: skip the bodies of everything but GTID events.
		if filter.OnlyShowGtid {
			if h.EventType != replication.GTID_EVENT && h.EventType != replication.ANONYMOUS_GTID_EVENT {
				if err := skipEventBody(r, h); err != nil {
					return err
				}
				continue
			}
			body, err := readEventBody(r, h)
			if err != nil {
				return err
			}
			e, err := parseEvent(parser, h, body)
			putBodyBuf(body)
			if err != nil {
				return err
			}
			evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
			for _, ev := range evs {
				if ev.Type != "gtid" {
					continue
				}
				startPos := int(h.LogPos - h.EventSize)
				if filter.EndPos != 0 && startPos > filter.EndPos {
					continue
				}
				if filter.StartPos != 0 && startPos < filter.StartPos {
					continue
				}
				if currentGtid != "" {
					tx.EndPos = startPos - 1
					finalizeTx(&tx, true)
					if !callFn(tx) {
						return nil
					}
					if subGtid != nil {
						// Stop once every requested GTID has been delivered.
						if err := subGtid.Sub(tx.GTID); err == nil && subGtid.EventCount() == 0 {
							return nil
						}
					}
					tx = Transaction{}
				}
				currentGtid = ev.Data
				if inGtid != nil {
					if c, _ := inGtid.Contain(ev.Data); !c {
						tx = Transaction{}
						currentGtid = ""
						continue
					}
				}
				if exGtid != nil {
					if c, _ := exGtid.Contain(ev.Data); c {
						currentGtid = ""
						tx = Transaction{}
						continue
					}
				}
				tx = Transaction{
					GTID:      ev.Data,
					StartPos:  startPos,
					EndPos:    startPos,
					Timestamp: int64(h.Timestamp),
				}
			}
			continue
		}
		// Handle the GTID event first: it decides whether the upcoming
		// transaction is selected at all.
		if h.EventType == replication.GTID_EVENT || h.EventType == replication.ANONYMOUS_GTID_EVENT {
			body, err := readEventBody(r, h)
			if err != nil {
				return err
			}
			e, err := parseEvent(parser, h, body)
			putBodyBuf(body)
			if err != nil {
				return err
			}
			evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
			for _, ev := range evs {
				if ev.Type != "gtid" {
					continue
				}
				startPos := int(h.LogPos - h.EventSize)
				if currentGtid != "" {
					finalizeTx(&tx, false)
					if !callFn(tx) {
						return nil
					}
					if subGtid != nil {
						if err := subGtid.Sub(tx.GTID); err == nil && subGtid.EventCount() == 0 {
							return nil
						}
					}
					tx = Transaction{}
				}
				currentGtid = ev.Data
				skipCurrentTxn = false
				if filter.EndPos != 0 && startPos > filter.EndPos {
					skipCurrentTxn = true
				}
				if filter.StartPos != 0 && startPos < filter.StartPos {
					skipCurrentTxn = true
				}
				if inGtid != nil {
					if c, _ := inGtid.Contain(ev.Data); !c {
						skipCurrentTxn = true
					}
				}
				if exGtid != nil {
					if c, _ := exGtid.Contain(ev.Data); c {
						skipCurrentTxn = true
					}
				}
				if !skipCurrentTxn {
					tx = Transaction{
						GTID:      ev.Data,
						StartPos:  startPos,
						Timestamp: int64(h.Timestamp),
						Txs:       make([]TxDetail, 0, 8),
						sqlOrigin: make([]string, 0, 4),
					}
				} else {
					tx = Transaction{}
				}
			}
			continue
		}
		// Transaction not selected: skip bodies without parsing at all.
		if skipCurrentTxn {
			if err := skipEventBody(r, h); err != nil {
				return err
			}
			continue
		}
		body, err := readEventBody(r, h)
		if err != nil {
			return err
		}
		e, err := parseEvent(parser, h, body)
		putBodyBuf(body)
		if err != nil {
			return err
		}
		if h.EventType == replication.TABLE_MAP_EVENT {
			tbMapPos = h.LogPos - h.EventSize
		}
		evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
		for _, ev := range evs {
			startPos := 0
			if ev.Type == "query" || ev.Type == "gtid" {
				startPos = int(h.LogPos - h.EventSize)
			} else {
				startPos = int(tbMapPos)
			}
			switch ev.Type {
			case "":
				tx.EndPos = int(h.LogPos)
			case "rowsquery":
				tx.EndPos = int(h.LogPos)
				tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
			default:
				tx.EndPos = int(h.LogPos)
				status := STATUS_PREPARE
				if ev.Type == "query" {
					switch strings.ToLower(ev.Data) {
					case "begin":
						if tx.TxStartTime == 0 {
							tx.TxStartTime = int64(h.Timestamp)
						}
						status = STATUS_BEGIN
					case "commit":
						status = STATUS_COMMIT
						tx.TxEndTime = int64(h.Timestamp)
					case "rollback":
						status = STATUS_ROLLBACK
						tx.TxEndTime = int64(h.Timestamp)
					}
					tx.Status = status
				}
				if ev.DB != "" && ev.TB != "" {
					tx.dmlEventCount++
				}
				tx.Txs = append(tx.Txs, TxDetail{
					StartPos:        startPos,
					EndPos:          int(h.LogPos),
					Db:              ev.DB,
					Table:           ev.TB,
					Sql:             ev.Data,
					SqlType:         ev.Type,
					Rows:            ev.Rows,
					RowCount:        int(ev.RowCnt),
					Timestamp:       int64(h.Timestamp),
					CompressionType: ev.CompressionType,
				})
			}
		}
	}
}

// prepareTableMatchers builds include/exclude matchers from the filter,
// returning nil for an empty pattern list.
func prepareTableMatchers(filter BinlogFilter) (includeMatcher, excludeMatcher *tableMatcher) {
	if len(filter.IncludeTables) > 0 {
		includeMatcher = buildTableMatcher(filter.IncludeTables)
	}
	if len(filter.ExcludeTables) > 0 {
		excludeMatcher = buildTableMatcher(filter.ExcludeTables)
	}
	return includeMatcher, excludeMatcher
}

// buildTableMatcher compiles "db.tb" patterns (with "*" wildcards) into a
// tableMatcher. Patterns that are not exactly two dot-separated parts are ignored.
func buildTableMatcher(patterns []string) *tableMatcher {
	m := &tableMatcher{
		exactMatch: make(map[string]bool),
		dbWildcard: make(map[string]bool),
		tbWildcard: make(map[string]bool),
	}
	for _, pattern := range patterns {
		if pattern == "*.*" {
			m.matchAll = true
			continue
		}
		parts := strings.Split(pattern, ".")
		if len(parts) != 2 {
			continue
		}
		db, tb := parts[0], parts[1]
		if db == "*" && tb == "*" {
			m.matchAll = true
		} else if db == "*" {
			m.tbWildcard[tb] = true
		} else if tb == "*" {
			m.dbWildcard[db] = true
		} else {
			m.exactMatch[pattern] = true
		}
	}
	return m
}