// mysqlbinlog/parse.go — MySQL binlog file parsing for package binlog.
// Snapshot metadata: 2026-03-08 20:07:59 +08:00, 924 lines, 21 KiB, Go.
package binlog
import (
"b612.me/mysql/gtid"
"b612.me/staros"
"bufio"
"bytes"
"errors"
"fmt"
"github.com/starainrt/go-mysql/replication"
"io"
"os"
"strings"
"sync"
"time"
)
// Sentinel errors returned by the parsing routines; compare with errors.Is.
var (
	// ErrInvalidBinlogHeader is returned when the file does not begin with
	// the 4-byte binlog magic (replication.BinLogFileHeader).
	ErrInvalidBinlogHeader = errors.New("invalid binlog file header")
	// ErrEventTooSmall is returned when an event header declares a size that
	// cannot even contain the header itself.
	ErrEventTooSmall = errors.New("event size too small")
)
// Compression codes used by TRANSACTION_PAYLOAD_EVENT.
// NOTE(review): values mirror MySQL's wire codes (ZSTD=0, NONE=255) —
// confirm against the server version in use.
const (
	CompressionNone uint64 = 255
	CompressionZSTD uint64 = 0
)
const (
	// maxPooledBodyCap caps the capacity of event-body buffers returned to
	// bodyBufPool; larger buffers are dropped so one huge event cannot pin
	// megabytes in the pool.
	maxPooledBodyCap = 4 << 20 // 4MB
	// defaultReadBufSize is the bufio.Reader size used when scanning a file.
	defaultReadBufSize = 1 << 20 // 1MB
)
// TxDetail describes one event-level entry inside a Transaction: a single
// query/row event with its byte-position range, table, SQL text and rows.
type TxDetail struct {
	StartPos        int             `json:"startPos"`        // byte offset where the event (or its TABLE_MAP) starts
	EndPos          int             `json:"endPos"`          // header LogPos: offset just past the event
	RowCount        int             `json:"rowCount"`        // affected rows (updates count before/after pairs once)
	Timestamp       int64           `json:"timestamp"`       // event header Unix timestamp
	Time            time.Time       `json:"time"`            // filled lazily from Timestamp (see fillTimeLazy)
	Sql             string          `json:"sql"`             // SQL text (query events, or ROWS_QUERY text attached in finalizeTx)
	Db              string          `json:"db"`
	Table           string          `json:"table"`
	SqlType         string          `json:"sqlType"`         // "insert", "update", "delete", "query", "rowsquery", "gtid" or ""
	CompressionType string          `json:"compressionType"` // "" (none), "ZSTD", or "UNKNOWN(n)"
	Rows            [][]interface{} `json:"rows"`            // raw row images from the rows event
}
// Transaction status codes, derived from BEGIN/COMMIT/ROLLBACK query events
// while scanning (see the "query" handling in parseBinlogDetail).
const (
	STATUS_PREPARE uint8 = iota // no explicit BEGIN/COMMIT/ROLLBACK seen yet
	STATUS_BEGIN
	STATUS_COMMIT
	STATUS_ROLLBACK
)
// Transaction aggregates all events between two GTID events into one
// logical unit, with byte-position, timing and row-count summaries.
type Transaction struct {
	GTID        string    `json:"gtid"`
	Timestamp   int64     `json:"timestamp"`   // Unix timestamp of the GTID event
	Time        time.Time `json:"time"`        // filled lazily from Timestamp (fillTimeLazy)
	StartPos    int       `json:"startPos"`    // byte offset of the GTID event
	EndPos      int       `json:"endPos"`      // LogPos of the last event seen
	Size        int       `json:"size"`        // EndPos - StartPos (0 in GTID-only mode)
	RowsCount   int       `json:"rowsCount"`   // sum of per-detail row counts
	Status      uint8     `json:"status"`      // one of the STATUS_* constants
	TxStartTime int64     `json:"txStartTime"` // timestamp of the BEGIN event
	TxEndTime   int64     `json:"txEndTime"`   // timestamp of COMMIT/ROLLBACK
	// sqlOrigin holds raw ROWS_QUERY SQL text. The field is unexported, so
	// encoding/json never serializes it; the former `json:"sqlOrigin"` tag
	// was dead and has been removed. Use GetSqlOrigin to read it.
	sqlOrigin []string
	Txs       []TxDetail `json:"txs"`
	// dmlEventCount counts details with both DB and table set; consulted by
	// the OnlyShowDML filter.
	dmlEventCount int
}
// GetSqlOrigin returns the raw SQL statements captured from ROWS_QUERY
// events for this transaction.
// NOTE(review): the internal slice is returned without copying, so callers
// share its backing array — confirm callers never mutate it.
func (t Transaction) GetSqlOrigin() []string {
	return t.sqlOrigin
}
// BinlogFilter selects which transactions (and which events inside them)
// ParseBinlogWithFilter delivers. Zero values mean "no constraint".
type BinlogFilter struct {
	IncludeGtid   string    // only keep transactions whose GTID is in this set
	ExcludeGtid   string    // drop transactions whose GTID is in this set
	IncludeTables []string  // "db.tb" patterns ("*" wildcards allowed) to keep
	ExcludeTables []string  // "db.tb" patterns to drop
	StartPos      int       // minimum transaction start offset (0 = unlimited)
	EndPos        int       // maximum transaction end offset (0 = unlimited)
	StartDate     time.Time // drop transactions before this time (zero = unlimited)
	EndDate       time.Time // drop transactions after this time (zero = unlimited)
	BigThan       int       // minimum transaction size in bytes (0 = unlimited)
	SmallThan     int       // maximum transaction size in bytes (0 = unlimited)
	OnlyShowGtid  bool      // fast path: report GTID/position info only
	OnlyShowDML   bool      // drop transactions with no DML (db+table) events
	// PickTxAllIfMatch: on the first matching event, deliver the whole
	// transaction untrimmed (include) or drop it entirely (exclude).
	PickTxAllIfMatch bool
	ExcludeBlank     bool // treat blank db/table events as excluded matches
	IncludeBlank     bool // keep blank db/table events when include patterns are set
}
// BinlogEvent is a simplified, display-oriented view of one replication
// event, produced by ParseBinlogEvent.
type BinlogEvent struct {
	Type            string // "insert", "update", "delete", "query", "gtid", "rowsquery" or ""
	DB              string
	TB              string
	Data            string // SQL text or GTID string, depending on Type
	RowCnt          uint32 // affected rows (updates count before/after pairs once)
	Rows            [][]interface{}
	CompressionType string // set when unwrapped from a TRANSACTION_PAYLOAD_EVENT
}
// tableMatcher answers "does db.tb match any configured pattern?" using
// pattern sets pre-split by buildTableMatcher.
type tableMatcher struct {
	exactMatch map[string]bool // "db.tb" patterns, keyed by the full pattern
	dbWildcard map[string]bool // "db.*" patterns, keyed by db
	tbWildcard map[string]bool // "*.tb" patterns, keyed by tb
	matchAll   bool            // a "*.*" pattern was configured
}

// match reports whether the schema/table pair is selected by any pattern.
func (m *tableMatcher) match(db, tb string) bool {
	switch {
	case m.matchAll:
		return true
	case m.exactMatch[db+"."+tb]:
		return true
	case m.dbWildcard[db]:
		return true
	default:
		return m.tbWildcard[tb]
	}
}
// bodyBufPool recycles event-body scratch buffers so steady-state parsing is
// allocation-free. Stored as *[]byte so Put does not box a new slice header.
var bodyBufPool = sync.Pool{
	New: func() any {
		b := make([]byte, 0, 64*1024)
		return &b
	},
}

// getBodyBuf returns a length-n buffer, reusing pooled storage when the
// pooled capacity suffices.
func getBodyBuf(n int) []byte {
	p := bodyBufPool.Get().(*[]byte)
	if cap(*p) < n {
		// Too small for this event: return it to the pool for a later,
		// smaller event instead of silently dropping it (the original code
		// leaked the pooled buffer here), then allocate fresh.
		bodyBufPool.Put(p)
		return make([]byte, n)
	}
	return (*p)[:n]
}

// putBodyBuf returns a buffer obtained from getBodyBuf to the pool.
// Oversized buffers are dropped so one huge event cannot pin memory.
func putBodyBuf(b []byte) {
	if cap(b) > maxPooledBodyCap {
		return
	}
	b = b[:0]
	bodyBufPool.Put(&b)
}
// ParseBinlogFile parses the binlog file at path and invokes fx once per
// transaction. fx returning false stops the scan early.
func ParseBinlogFile(path string, fx func(transaction Transaction) bool) error {
	return parseOneBinlog(path, fx)
}
// parseOneBinlog opens and validates a single binlog file, then streams its
// transactions to fx via parseBinlogDetail.
func parseOneBinlog(path string, fx func(Transaction) bool) error {
	if !staros.Exists(path) {
		return os.ErrNotExist
	}
	file, err := os.Open(path)
	if err != nil {
		return err
	}
	defer file.Close()
	if headerErr := validateBinlogHeader(file); headerErr != nil {
		return headerErr
	}
	reader := bufio.NewReaderSize(file, defaultReadBufSize)
	return parseBinlogDetail(reader, fx)
}
// validateBinlogHeader checks that f begins with the 4-byte binlog magic and
// leaves the file offset positioned immediately after it. It returns
// ErrInvalidBinlogHeader when the magic does not match.
func validateBinlogHeader(f *os.File) error {
	const fileTypeBytes = int64(4)
	b := make([]byte, fileTypeBytes)
	// io.ReadFull guards against short reads; a bare f.Read may legally
	// return fewer than 4 bytes without an error.
	if _, err := io.ReadFull(f, b); err != nil {
		return fmt.Errorf("read binlog header failed: %w", err)
	}
	if !bytes.Equal(b, replication.BinLogFileHeader) {
		return ErrInvalidBinlogHeader
	}
	// Seek explicitly so the offset is deterministic for callers that keep
	// reading from f directly.
	if _, err := f.Seek(fileTypeBytes, io.SeekStart); err != nil {
		return fmt.Errorf("seek after header failed: %w", err)
	}
	return nil
}
// readEventHeader reads exactly one event header into headBuf (reused across
// calls), parses it, and rejects events whose declared size could not even
// contain the header. io.EOF is passed through untouched so callers can
// detect a clean end of file.
func readEventHeader(r io.Reader, parser *replication.BinlogParser, headBuf []byte) (*replication.EventHeader, error) {
	if _, readErr := io.ReadFull(r, headBuf); readErr != nil {
		return nil, readErr
	}
	header, parseErr := parser.ParseHeader(headBuf)
	if parseErr != nil {
		return nil, fmt.Errorf("parse header failed: %w", parseErr)
	}
	if header.EventSize <= uint32(replication.EventHeaderSize) {
		return nil, fmt.Errorf("%w: event size is %d", ErrEventTooSmall, header.EventSize)
	}
	return header, nil
}
// readEventBody reads the event payload (everything after the header) into a
// pooled buffer. On failure the buffer goes back to the pool before the
// error is returned; on success the caller owns the buffer and must hand it
// to putBodyBuf when done.
func readEventBody(r io.Reader, h *replication.EventHeader) ([]byte, error) {
	need := int(h.EventSize) - replication.EventHeaderSize
	buf := getBodyBuf(need)
	_, err := io.ReadFull(r, buf)
	if err == nil {
		return buf, nil
	}
	putBodyBuf(buf)
	return nil, fmt.Errorf("read event body failed: %w (need %d bytes)", err, need)
}
// skipEventBody discards the current event's payload without parsing it,
// used on the fast paths that already know the event is irrelevant.
func skipEventBody(r io.Reader, h *replication.EventHeader) error {
	remain := int64(h.EventSize) - int64(replication.EventHeaderSize)
	if remain > 0 {
		if _, err := io.CopyN(io.Discard, r, remain); err != nil {
			return fmt.Errorf("skip event body failed: %w", err)
		}
	}
	return nil
}
// parseEvent decodes an event body using its already-parsed header,
// annotating any failure with the event's log position.
func parseEvent(parser *replication.BinlogParser, h *replication.EventHeader, body []byte) (replication.Event, error) {
	ev, err := parser.ParseEvent(h, body, nil)
	if err == nil {
		return ev, nil
	}
	return nil, fmt.Errorf("parse event failed at pos %d: %w", h.LogPos, err)
}
// finalizeTx stitches the captured ROWS_QUERY SQL texts onto the non-"query"
// event details (in arrival order), totals the row count, and computes the
// transaction's byte size unless only GTIDs are being reported.
func finalizeTx(tx *Transaction, onlyShowGtid bool) {
	next := 0
	for i := range tx.Txs {
		detail := &tx.Txs[i]
		if detail.SqlType != "query" && next < len(tx.sqlOrigin) {
			detail.Sql = tx.sqlOrigin[next]
			next++
		}
		tx.RowsCount += detail.RowCount
	}
	if onlyShowGtid {
		tx.Size = 0
		return
	}
	tx.Size = tx.EndPos - tx.StartPos
}
// fillTimeLazy materializes time.Time values from the raw Unix timestamps,
// but only where a timestamp exists and the Time field is still unset —
// times are deliberately not parsed during scanning (SetParseTime(false)).
func fillTimeLazy(tx *Transaction) {
	if tx.Timestamp != 0 && tx.Time.IsZero() {
		tx.Time = time.Unix(tx.Timestamp, 0)
	}
	for i := range tx.Txs {
		detail := &tx.Txs[i]
		if detail.Timestamp != 0 && detail.Time.IsZero() {
			detail.Time = time.Unix(detail.Timestamp, 0)
		}
	}
}
// parseBinlogDetail streams events from r, groups them into GTID-delimited
// transactions, and hands each completed Transaction to f. A transaction is
// considered complete when the next GTID event arrives; the final in-flight
// transaction is flushed at EOF. f returning false stops the scan.
func parseBinlogDetail(r io.Reader, f func(Transaction) bool) error {
	parser := replication.NewBinlogParser()
	// Keep timestamps and decimals raw; times are materialized lazily via
	// fillTimeLazy only for transactions actually delivered.
	parser.SetParseTime(false)
	parser.SetUseDecimal(false)
	var (
		tbMapPos uint32 // start offset of the most recent TABLE_MAP_EVENT
		tx       Transaction
		headBuf  = make([]byte, replication.EventHeaderSize) // reused per event
	)
	currentGtid := ""
	for {
		h, err := readEventHeader(r, parser, headBuf)
		if err == io.EOF {
			// Clean EOF: flush the in-flight transaction, if any.
			if currentGtid != "" {
				finalizeTx(&tx, false)
				fillTimeLazy(&tx)
				if f != nil {
					f(tx)
				}
			}
			return nil
		}
		if err != nil {
			return err
		}
		body, err := readEventBody(r, h)
		if err != nil {
			return err
		}
		e, err := parseEvent(parser, h, body)
		// The body buffer is fully consumed by parseEvent; recycle it now.
		putBodyBuf(body)
		if err != nil {
			return err
		}
		if h.EventType == replication.TABLE_MAP_EVENT {
			// Row events that follow report this offset as their start.
			tbMapPos = h.LogPos - h.EventSize
		}
		evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
		for _, ev := range evs {
			startPos := 0
			if ev.Type == "query" || ev.Type == "gtid" {
				startPos = int(h.LogPos - h.EventSize)
			} else {
				// Row events start at their TABLE_MAP, not their own header.
				startPos = int(tbMapPos)
			}
			switch ev.Type {
			case "gtid":
				// A new GTID closes the previous transaction and starts a
				// fresh one.
				if currentGtid != "" {
					finalizeTx(&tx, false)
					fillTimeLazy(&tx)
					if f != nil && !f(tx) {
						return nil
					}
				}
				currentGtid = ev.Data
				tx = Transaction{
					GTID:      ev.Data,
					StartPos:  startPos,
					Timestamp: int64(h.Timestamp),
					Txs:       make([]TxDetail, 0, 8),
					sqlOrigin: make([]string, 0, 4),
				}
			case "":
				// Unclassified event: only advances the transaction's end.
				tx.EndPos = int(h.LogPos)
			case "rowsquery":
				// Original SQL text; attached to row details in finalizeTx.
				tx.EndPos = int(h.LogPos)
				tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
			default:
				tx.EndPos = int(h.LogPos)
				status := STATUS_PREPARE
				if ev.Type == "query" {
					// BEGIN/COMMIT/ROLLBACK arrive as plain query events.
					switch strings.ToLower(ev.Data) {
					case "begin":
						if tx.TxStartTime == 0 {
							tx.TxStartTime = int64(h.Timestamp)
						}
						status = STATUS_BEGIN
					case "commit":
						status = STATUS_COMMIT
						tx.TxEndTime = int64(h.Timestamp)
					case "rollback":
						status = STATUS_ROLLBACK
						tx.TxEndTime = int64(h.Timestamp)
					}
					tx.Status = status
				}
				if ev.DB != "" && ev.TB != "" {
					// Counts as DML for the OnlyShowDML filter.
					tx.dmlEventCount++
				}
				tx.Txs = append(tx.Txs, TxDetail{
					StartPos:        startPos,
					EndPos:          int(h.LogPos),
					Db:              ev.DB,
					Table:           ev.TB,
					Sql:             ev.Data,
					SqlType:         ev.Type,
					Rows:            ev.Rows,
					RowCount:        int(ev.RowCnt),
					Timestamp:       int64(h.Timestamp),
					CompressionType: ev.CompressionType,
				})
			}
		}
	}
}
// ParseBinlogEvent converts a low-level replication event into zero or more
// simplified BinlogEvent values. Most event types yield exactly one entry;
// TRANSACTION_PAYLOAD_EVENT recursively expands its embedded events, and an
// unhandled event type yields a single zero-value entry (Type "").
func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent {
	var res []BinlogEvent
	var sig BinlogEvent
	switch ev.Header.EventType {
	case replication.ANONYMOUS_GTID_EVENT:
		// Anonymous transactions get a synthetic GTID marker string.
		sig.Data = "anonymous-gtid-event:1"
		sig.Type = "gtid"
	case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
		wrEvent, ok := ev.Event.(*replication.RowsEvent)
		if !ok {
			// Unexpected payload type: emit nothing rather than guess.
			return res
		}
		sig.DB = string(wrEvent.Table.Schema)
		sig.TB = string(wrEvent.Table.Table)
		sig.Type = "insert"
		sig.RowCnt = uint32(len(wrEvent.Rows))
		sig.Rows = wrEvent.Rows
	case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
		wrEvent, ok := ev.Event.(*replication.RowsEvent)
		if !ok {
			return res
		}
		sig.DB = string(wrEvent.Table.Schema)
		sig.TB = string(wrEvent.Table.Table)
		sig.Type = "update"
		// Update rows arrive as before/after image pairs; halve the count
		// so one logical row update counts once.
		sig.RowCnt = uint32(len(wrEvent.Rows)) / 2
		sig.Rows = wrEvent.Rows
	case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
		wrEvent, ok := ev.Event.(*replication.RowsEvent)
		if !ok {
			return res
		}
		sig.DB = string(wrEvent.Table.Schema)
		sig.TB = string(wrEvent.Table.Table)
		sig.Type = "delete"
		sig.RowCnt = uint32(len(wrEvent.Rows))
		sig.Rows = wrEvent.Rows
	case replication.ROWS_QUERY_EVENT:
		queryEvent, ok := ev.Event.(*replication.RowsQueryEvent)
		if !ok {
			return res
		}
		// Original SQL text of a row-based statement (binlog_rows_query_log_events).
		sig.Data = string(queryEvent.Query)
		sig.Type = "rowsquery"
	case replication.QUERY_EVENT:
		queryEvent, ok := ev.Event.(*replication.QueryEvent)
		if !ok {
			return res
		}
		sig.DB = string(queryEvent.Schema)
		sig.Data = string(queryEvent.Query)
		sig.Type = "query"
	case replication.MARIADB_GTID_EVENT:
		// MariaDB GTID events mark a transaction start; model as BEGIN.
		sig.Data = "begin"
		sig.Type = "query"
	case replication.XID_EVENT:
		// XID commits a row-based transaction; model as COMMIT.
		sig.Data = "commit"
		sig.Type = "query"
	case replication.GTID_EVENT:
		ge, ok := ev.Event.(*replication.GTIDEvent)
		if !ok {
			return res
		}
		gid, err := gtid.Parse(fmt.Sprintf("%s:%d", bytesToUuid(ge.SID), ge.GNO))
		if err != nil {
			// Keep scanning even on a malformed GTID; flag it in the data.
			sig.Data = fmt.Sprintf("invalid-gtid:%s:%d", bytesToUuid(ge.SID), ge.GNO)
		} else {
			sig.Data = gid.String()
		}
		sig.Type = "gtid"
	case replication.TRANSACTION_PAYLOAD_EVENT:
		ge, ok := ev.Event.(*replication.TransactionPayloadEvent)
		if !ok {
			return res
		}
		// Recursively expand the embedded (possibly compressed) events and
		// tag every resulting entry with the payload's compression type.
		for _, val := range ge.Events {
			res = append(res, ParseBinlogEvent(val)...)
		}
		compressionType := getCompressionTypeName(ge.CompressionType)
		for idx := range res {
			res[idx].CompressionType = compressionType
		}
		return res
	}
	res = append(res, sig)
	return res
}
// getCompressionTypeName maps a TRANSACTION_PAYLOAD_EVENT compression code
// to a display name; "none" maps to the empty string, and unknown codes are
// rendered as UNKNOWN(code).
func getCompressionTypeName(code uint64) string {
	if code == CompressionZSTD {
		return "ZSTD"
	}
	if code == CompressionNone {
		return ""
	}
	return fmt.Sprintf("UNKNOWN(%d)", code)
}
// ParseBinlogWithFilter parses the binlog file at path, optionally seeking
// to byte offset pos first, and delivers transactions that pass filter to
// fx. fx returning false stops the scan.
func ParseBinlogWithFilter(path string, pos int64, filter BinlogFilter, fx func(Transaction) bool) error {
	if !staros.Exists(path) {
		return os.ErrNotExist
	}
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()
	parser := replication.NewBinlogParser()
	// Raw timestamps/decimals; times are filled lazily on delivery.
	parser.SetParseTime(false)
	parser.SetUseDecimal(false)
	if pos != 0 {
		// Prime the parser with the format description event before jumping
		// to the requested offset.
		if err := seekToPosition(f, parser, pos); err != nil {
			return err
		}
	} else {
		if err := validateBinlogHeader(f); err != nil {
			return err
		}
	}
	br := bufio.NewReaderSize(f, defaultReadBufSize)
	return parseBinlogWithFilter(br, parser, filter, fx)
}
// seekToPosition reads events from the start of the file until the parser
// has consumed the FORMAT_DESCRIPTION_EVENT (or hits a GTID_EVENT), which
// the parser needs to decode later events, then seeks f to byte offset pos.
// Reads here are deliberately unbuffered: the caller wraps f in a
// bufio.Reader only after the final Seek, so no buffered bytes are lost.
func seekToPosition(f *os.File, parser *replication.BinlogParser, pos int64) error {
	if err := validateBinlogHeader(f); err != nil {
		return err
	}
	headBuf := make([]byte, replication.EventHeaderSize)
	for {
		h, err := readEventHeader(f, parser, headBuf)
		if err != nil {
			// Includes io.EOF: running out of events before the format
			// description is found is an error here.
			return fmt.Errorf("seek to position failed: %w", err)
		}
		body, err := readEventBody(f, h)
		if err != nil {
			return err
		}
		// Parse (not just skip) so the parser records the format description
		// state needed for all subsequent events.
		_, err = parseEvent(parser, h, body)
		putBodyBuf(body)
		if err != nil {
			return err
		}
		// A GTID_EVENT implies the format description was already consumed.
		if h.EventType == replication.FORMAT_DESCRIPTION_EVENT || h.EventType == replication.GTID_EVENT {
			break
		}
	}
	if _, err := f.Seek(pos, io.SeekStart); err != nil {
		return fmt.Errorf("seek to pos %d failed: %w", pos, err)
	}
	return nil
}
// parseBinlogWithFilter streams events from r and delivers transactions that
// pass filter to fn. Per event it takes one of three paths: a GTID-only fast
// path (filter.OnlyShowGtid), a skip path that discards bodies of
// transactions already known not to match, or the full parse path.
// fn returning false stops the scan.
func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter BinlogFilter, fn func(Transaction) bool) error {
	var subGtid, inGtid, exGtid *gtid.Gtid
	var err error
	includeMatcher, excludeMatcher := prepareTableMatchers(filter)
	if filter.IncludeGtid != "" {
		inGtid, err = gtid.Parse(filter.IncludeGtid)
		if err != nil {
			return fmt.Errorf("parse include gtid failed: %w", err)
		}
		// subGtid tracks which requested GTIDs are still outstanding so the
		// scan can stop early once all of them have been seen.
		subGtid = inGtid.Clone()
	}
	if filter.ExcludeGtid != "" {
		exGtid, err = gtid.Parse(filter.ExcludeGtid)
		if err != nil {
			return fmt.Errorf("parse exclude gtid failed: %w", err)
		}
	}
	var (
		tbMapPos       uint32 // start offset of the most recent TABLE_MAP_EVENT
		skipCurrentTxn bool   // current transaction failed a GTID/pos filter
		tx             Transaction
		headBuf        = make([]byte, replication.EventHeaderSize) // reused per event
	)
	currentGtid := ""
	// callFn applies the post-parse filters (date/position/size/DML/table)
	// and, if the transaction survives, forwards it to fn. Returning true
	// means "keep scanning"; only fn itself can request a stop.
	callFn := func(tx Transaction) bool {
		if fn == nil {
			return true
		}
		fillTimeLazy(&tx)
		if !filter.StartDate.IsZero() && filter.StartDate.After(tx.Time) {
			return true
		}
		if !filter.EndDate.IsZero() && filter.EndDate.Before(tx.Time) {
			return true
		}
		if filter.StartPos != 0 && filter.StartPos > tx.StartPos {
			return true
		}
		if filter.EndPos != 0 && filter.EndPos < tx.EndPos {
			return true
		}
		if filter.BigThan != 0 && filter.BigThan > tx.Size {
			return true
		}
		if filter.SmallThan != 0 && filter.SmallThan < tx.Size {
			return true
		}
		if !filter.OnlyShowGtid && filter.OnlyShowDML && tx.dmlEventCount == 0 {
			return true
		}
		// Table-level filtering: txs collects the details that survive,
		// matched records whether any include/exclude pattern fired.
		var txs []TxDetail
		var matched bool
		for _, t := range tx.Txs {
			includeMatch := includeMatcher != nil && includeMatcher.match(t.Db, t.Table)
			excludeMatch := excludeMatcher != nil && excludeMatcher.match(t.Db, t.Table)
			if t.Db == "" && t.Table == "" {
				// Blank db/table entries (e.g. BEGIN/COMMIT query events).
				if includeMatcher != nil && !filter.IncludeBlank {
					continue
				}
				if excludeMatcher != nil && filter.ExcludeBlank {
					matched = true
					if filter.PickTxAllIfMatch {
						// Exclude-match with pick-all: drop the whole tx.
						return true
					}
					continue
				}
			}
			if includeMatcher != nil {
				if includeMatch {
					matched = true
					if filter.PickTxAllIfMatch {
						// First include hit delivers the untrimmed tx.
						return fn(tx)
					}
					txs = append(txs, t)
				}
			} else if excludeMatcher != nil {
				if excludeMatch {
					matched = true
					if filter.PickTxAllIfMatch {
						// NOTE(review): exclude + PickTxAllIfMatch drops the
						// whole transaction — asymmetric with the include
						// branch above; confirm this is intended.
						return true
					}
				} else {
					txs = append(txs, t)
				}
			} else {
				txs = append(txs, t)
			}
		}
		if matched {
			tx.Txs = txs
		}
		if !matched && includeMatcher != nil {
			// Include patterns set but nothing matched: drop the tx.
			return true
		}
		if len(tx.Txs) == 0 && matched {
			return true
		}
		return fn(tx)
	}
	for {
		h, err := readEventHeader(r, parser, headBuf)
		if err == io.EOF {
			// Flush the trailing transaction if it ever received a timestamp.
			// NOTE(review): this EOF guard differs from parseBinlogDetail's
			// currentGtid check — confirm equivalence for edge cases.
			if !tx.Time.IsZero() || tx.Timestamp != 0 {
				finalizeTx(&tx, filter.OnlyShowGtid)
				callFn(tx)
			}
			return nil
		}
		if err != nil {
			return err
		}
		// GTID-only fast path: skip bodies of everything except GTID events.
		if filter.OnlyShowGtid {
			if h.EventType != replication.GTID_EVENT && h.EventType != replication.ANONYMOUS_GTID_EVENT {
				if err := skipEventBody(r, h); err != nil {
					return err
				}
				continue
			}
			body, err := readEventBody(r, h)
			if err != nil {
				return err
			}
			e, err := parseEvent(parser, h, body)
			putBodyBuf(body)
			if err != nil {
				return err
			}
			evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
			for _, ev := range evs {
				if ev.Type != "gtid" {
					continue
				}
				startPos := int(h.LogPos - h.EventSize)
				if filter.EndPos != 0 && startPos > filter.EndPos {
					continue
				}
				if filter.StartPos != 0 && startPos < filter.StartPos {
					continue
				}
				if currentGtid != "" {
					// Close the previous GTID-only transaction; its end is
					// the byte before this GTID event starts.
					tx.EndPos = startPos - 1
					finalizeTx(&tx, true)
					if !callFn(tx) {
						return nil
					}
					if subGtid != nil {
						// All requested GTIDs seen: stop early.
						if err := subGtid.Sub(tx.GTID); err == nil && subGtid.EventCount() == 0 {
							return nil
						}
					}
					tx = Transaction{}
				}
				currentGtid = ev.Data
				if inGtid != nil {
					if c, _ := inGtid.Contain(ev.Data); !c {
						tx = Transaction{}
						currentGtid = ""
						continue
					}
				}
				if exGtid != nil {
					if c, _ := exGtid.Contain(ev.Data); c {
						currentGtid = ""
						tx = Transaction{}
						continue
					}
				}
				tx = Transaction{
					GTID:      ev.Data,
					StartPos:  startPos,
					EndPos:    startPos,
					Timestamp: int64(h.Timestamp),
				}
			}
			continue
		}
		// Handle the GTID event first: it decides whether the upcoming
		// transaction matches the filters at all.
		if h.EventType == replication.GTID_EVENT || h.EventType == replication.ANONYMOUS_GTID_EVENT {
			body, err := readEventBody(r, h)
			if err != nil {
				return err
			}
			e, err := parseEvent(parser, h, body)
			putBodyBuf(body)
			if err != nil {
				return err
			}
			evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
			for _, ev := range evs {
				if ev.Type != "gtid" {
					continue
				}
				startPos := int(h.LogPos - h.EventSize)
				if currentGtid != "" {
					// Close and deliver the previous transaction.
					finalizeTx(&tx, false)
					if !callFn(tx) {
						return nil
					}
					if subGtid != nil {
						if err := subGtid.Sub(tx.GTID); err == nil && subGtid.EventCount() == 0 {
							return nil
						}
					}
					tx = Transaction{}
				}
				currentGtid = ev.Data
				// Decide up front whether this transaction can match; if
				// not, later events take the zero-parse skip path.
				skipCurrentTxn = false
				if filter.EndPos != 0 && startPos > filter.EndPos {
					skipCurrentTxn = true
				}
				if filter.StartPos != 0 && startPos < filter.StartPos {
					skipCurrentTxn = true
				}
				if inGtid != nil {
					if c, _ := inGtid.Contain(ev.Data); !c {
						skipCurrentTxn = true
					}
				}
				if exGtid != nil {
					if c, _ := exGtid.Contain(ev.Data); c {
						skipCurrentTxn = true
					}
				}
				if !skipCurrentTxn {
					tx = Transaction{
						GTID:      ev.Data,
						StartPos:  startPos,
						Timestamp: int64(h.Timestamp),
						Txs:       make([]TxDetail, 0, 8),
						sqlOrigin: make([]string, 0, 4),
					}
				} else {
					tx = Transaction{}
				}
			}
			continue
		}
		// Transaction already ruled out: discard bodies without parsing.
		if skipCurrentTxn {
			if err := skipEventBody(r, h); err != nil {
				return err
			}
			continue
		}
		body, err := readEventBody(r, h)
		if err != nil {
			return err
		}
		e, err := parseEvent(parser, h, body)
		putBodyBuf(body)
		if err != nil {
			return err
		}
		if h.EventType == replication.TABLE_MAP_EVENT {
			// Row events that follow report this offset as their start.
			tbMapPos = h.LogPos - h.EventSize
		}
		evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
		for _, ev := range evs {
			startPos := 0
			if ev.Type == "query" || ev.Type == "gtid" {
				startPos = int(h.LogPos - h.EventSize)
			} else {
				startPos = int(tbMapPos)
			}
			switch ev.Type {
			case "":
				// Unclassified event: only advances the transaction's end.
				tx.EndPos = int(h.LogPos)
			case "rowsquery":
				// Original SQL text; attached to row details in finalizeTx.
				tx.EndPos = int(h.LogPos)
				tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
			default:
				tx.EndPos = int(h.LogPos)
				status := STATUS_PREPARE
				if ev.Type == "query" {
					// BEGIN/COMMIT/ROLLBACK arrive as plain query events.
					switch strings.ToLower(ev.Data) {
					case "begin":
						if tx.TxStartTime == 0 {
							tx.TxStartTime = int64(h.Timestamp)
						}
						status = STATUS_BEGIN
					case "commit":
						status = STATUS_COMMIT
						tx.TxEndTime = int64(h.Timestamp)
					case "rollback":
						status = STATUS_ROLLBACK
						tx.TxEndTime = int64(h.Timestamp)
					}
					tx.Status = status
				}
				if ev.DB != "" && ev.TB != "" {
					// Counts as DML for the OnlyShowDML filter.
					tx.dmlEventCount++
				}
				tx.Txs = append(tx.Txs, TxDetail{
					StartPos:        startPos,
					EndPos:          int(h.LogPos),
					Db:              ev.DB,
					Table:           ev.TB,
					Sql:             ev.Data,
					SqlType:         ev.Type,
					Rows:            ev.Rows,
					RowCount:        int(ev.RowCnt),
					Timestamp:       int64(h.Timestamp),
					CompressionType: ev.CompressionType,
				})
			}
		}
	}
}
// prepareTableMatchers builds the include/exclude matchers from the filter's
// table pattern lists. A nil matcher means "no constraint of that kind".
func prepareTableMatchers(filter BinlogFilter) (includeMatcher, excludeMatcher *tableMatcher) {
	if len(filter.IncludeTables) != 0 {
		includeMatcher = buildTableMatcher(filter.IncludeTables)
	}
	if len(filter.ExcludeTables) != 0 {
		excludeMatcher = buildTableMatcher(filter.ExcludeTables)
	}
	return
}
// buildTableMatcher classifies "db.tb" patterns into exact, db-wildcard
// ("db.*"), table-wildcard ("*.tb") and match-all ("*.*") sets. Patterns
// that are not exactly two dot-separated parts are silently ignored.
func buildTableMatcher(patterns []string) *tableMatcher {
	m := &tableMatcher{
		exactMatch: make(map[string]bool),
		dbWildcard: make(map[string]bool),
		tbWildcard: make(map[string]bool),
	}
	for _, p := range patterns {
		db, tb, ok := strings.Cut(p, ".")
		if !ok || strings.Contains(tb, ".") {
			// Zero dots or more than one dot: not a two-part pattern
			// (same acceptance set as the original Split length check).
			continue
		}
		switch {
		case db == "*" && tb == "*":
			m.matchAll = true
		case db == "*":
			m.tbWildcard[tb] = true
		case tb == "*":
			m.dbWildcard[db] = true
		default:
			m.exactMatch[p] = true
		}
	}
	return m
}