bug fix:因sync.Pool不当使用导致解析失败的问题

This commit is contained in:
兔子 2026-03-12 22:30:15 +08:00
parent 095cb3dbdf
commit 52a32d4836
Signed by: b612
GPG Key ID: 99DD2222B612B612

207
parse.go
View File

@ -1,18 +1,19 @@
package binlog package binlog
import ( import (
"b612.me/mysql/gtid"
"b612.me/staros"
"bufio" "bufio"
"bytes" "bytes"
"errors" "errors"
"fmt" "fmt"
"github.com/starainrt/go-mysql/replication"
"io" "io"
"os" "os"
"strings" "strings"
"sync" "sync"
"time" "time"
"b612.me/mysql/gtid"
"b612.me/staros"
"github.com/starainrt/go-mysql/replication"
) )
var ( var (
@ -26,8 +27,8 @@ const (
) )
const ( const (
maxPooledBodyCap = 4 << 20 // 4MB maxPooledRawDataCap = 4 << 20 // 4MB
defaultReadBufSize = 1 << 20 // 1MB defaultReadBufSize = 1 << 20 // 1MB
) )
type TxDetail struct { type TxDetail struct {
@ -110,42 +111,23 @@ func (m *tableMatcher) match(db, tb string) bool {
if m.matchAll { if m.matchAll {
return true return true
} }
if m.exactMatch[db+"."+tb] { if m.dbWildcard[db] || m.tbWildcard[tb] {
return true return true
} }
if m.dbWildcard[db] { if len(m.exactMatch) > 0 {
return true // Go 1.12+ 对 map[string] 查找时 string([]byte) 不分配
} var buf [128]byte
if m.tbWildcard[tb] { key := buf[:0]
return true key = append(key, db...)
key = append(key, '.')
key = append(key, tb...)
if m.exactMatch[string(key)] {
return true
}
} }
return false return false
} }
var bodyBufPool = sync.Pool{
New: func() any {
b := make([]byte, 0, 64*1024)
return &b
},
}
func getBodyBuf(n int) []byte {
p := bodyBufPool.Get().(*[]byte)
if cap(*p) < n {
b := make([]byte, n)
return b
}
return (*p)[:n]
}
func putBodyBuf(b []byte) {
if cap(b) > maxPooledBodyCap {
return
}
b = b[:0]
bodyBufPool.Put(&b)
}
func ParseBinlogFile(path string, fx func(transaction Transaction) bool) error { func ParseBinlogFile(path string, fx func(transaction Transaction) bool) error {
return parseOneBinlog(path, fx) return parseOneBinlog(path, fx)
} }
@ -200,9 +182,8 @@ func readEventHeader(r io.Reader, parser *replication.BinlogParser, headBuf []by
func readEventBody(r io.Reader, h *replication.EventHeader) ([]byte, error) { func readEventBody(r io.Reader, h *replication.EventHeader) ([]byte, error) {
bodyLen := int(h.EventSize) - replication.EventHeaderSize bodyLen := int(h.EventSize) - replication.EventHeaderSize
body := getBodyBuf(bodyLen) body := make([]byte, bodyLen)
if _, err := io.ReadFull(r, body); err != nil { if _, err := io.ReadFull(r, body); err != nil {
putBodyBuf(body)
return nil, fmt.Errorf("read event body failed: %w (need %d bytes)", err, bodyLen) return nil, fmt.Errorf("read event body failed: %w (need %d bytes)", err, bodyLen)
} }
return body, nil return body, nil
@ -219,10 +200,40 @@ func skipEventBody(r io.Reader, h *replication.EventHeader) error {
return nil return nil
} }
func parseEvent(parser *replication.BinlogParser, h *replication.EventHeader, body []byte) (replication.Event, error) { var rawDataPool = sync.Pool{
e, err := parser.ParseEvent(h, body, nil) New: func() any {
b := make([]byte, 0, 64*1024)
return &b
},
}
// getRawDataBuf returns a []byte of length n for assembling an event's
// raw data (header + body), backed by rawDataPool when possible.
// The caller must hand the buffer back via putRawDataBuf once the
// parser is done with it.
func getRawDataBuf(n int) []byte {
	p := rawDataPool.Get().(*[]byte)
	if cap(*p) < n {
		// Too small for this event: put it back for smaller requests
		// instead of silently dropping it, then allocate exactly n.
		rawDataPool.Put(p)
		return make([]byte, n)
	}
	return (*p)[:n]
}
// putRawDataBuf returns a buffer obtained from getRawDataBuf to the
// pool. Oversized buffers (above maxPooledRawDataCap) are deliberately
// not pooled, so one huge event cannot pin a large allocation forever.
func putRawDataBuf(b []byte) {
	if cap(b) <= maxPooledRawDataCap {
		b = b[:0]
		rawDataPool.Put(&b)
	}
}
func parseEvent(parser *replication.BinlogParser, h *replication.EventHeader, headBuf []byte, body []byte) (replication.Event, error) {
rawLen := len(headBuf) + len(body)
rawData := getRawDataBuf(rawLen)
copy(rawData, headBuf)
copy(rawData[len(headBuf):], body)
e, err := parser.ParseEvent(h, body, rawData)
putRawDataBuf(rawData)
if err != nil { if err != nil {
return nil, fmt.Errorf("parse event failed at pos %d: %w", h.LogPos, err) return nil, fmt.Errorf("parse event failed at pos %d: Header %+v, Data %q, Err: %w",
h.LogPos, h, body, err)
} }
return e, nil return e, nil
} }
@ -289,8 +300,7 @@ func parseBinlogDetail(r io.Reader, f func(Transaction) bool) error {
return err return err
} }
e, err := parseEvent(parser, h, body) e, err := parseEvent(parser, h, headBuf, body)
putBodyBuf(body)
if err != nil { if err != nil {
return err return err
} }
@ -334,17 +344,16 @@ func parseBinlogDetail(r io.Reader, f func(Transaction) bool) error {
tx.EndPos = int(h.LogPos) tx.EndPos = int(h.LogPos)
status := STATUS_PREPARE status := STATUS_PREPARE
if ev.Type == "query" { if ev.Type == "query" {
switch strings.ToLower(ev.Data) { if equalFoldShort(ev.Data, "begin") {
case "begin":
if tx.TxStartTime == 0 { if tx.TxStartTime == 0 {
tx.TxStartTime = int64(h.Timestamp) tx.TxStartTime = int64(h.Timestamp)
} }
status = STATUS_BEGIN tx.Status = STATUS_BEGIN
case "commit": } else if equalFoldShort(ev.Data, "commit") {
status = STATUS_COMMIT tx.Status = STATUS_COMMIT
tx.TxEndTime = int64(h.Timestamp) tx.TxEndTime = int64(h.Timestamp)
case "rollback": } else if equalFoldShort(ev.Data, "rollback") {
status = STATUS_ROLLBACK tx.Status = STATUS_ROLLBACK
tx.TxEndTime = int64(h.Timestamp) tx.TxEndTime = int64(h.Timestamp)
} }
tx.Status = status tx.Status = status
@ -370,8 +379,8 @@ func parseBinlogDetail(r io.Reader, f func(Transaction) bool) error {
} }
func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent { func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent {
var res []BinlogEvent var buf [1]BinlogEvent
var sig BinlogEvent sig := &buf[0]
switch ev.Header.EventType { switch ev.Header.EventType {
case replication.ANONYMOUS_GTID_EVENT: case replication.ANONYMOUS_GTID_EVENT:
@ -381,7 +390,7 @@ func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent {
case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2: case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
wrEvent, ok := ev.Event.(*replication.RowsEvent) wrEvent, ok := ev.Event.(*replication.RowsEvent)
if !ok { if !ok {
return res return nil
} }
sig.DB = string(wrEvent.Table.Schema) sig.DB = string(wrEvent.Table.Schema)
sig.TB = string(wrEvent.Table.Table) sig.TB = string(wrEvent.Table.Table)
@ -392,7 +401,7 @@ func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent {
case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2: case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
wrEvent, ok := ev.Event.(*replication.RowsEvent) wrEvent, ok := ev.Event.(*replication.RowsEvent)
if !ok { if !ok {
return res return nil
} }
sig.DB = string(wrEvent.Table.Schema) sig.DB = string(wrEvent.Table.Schema)
sig.TB = string(wrEvent.Table.Table) sig.TB = string(wrEvent.Table.Table)
@ -403,7 +412,7 @@ func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent {
case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2: case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
wrEvent, ok := ev.Event.(*replication.RowsEvent) wrEvent, ok := ev.Event.(*replication.RowsEvent)
if !ok { if !ok {
return res return nil
} }
sig.DB = string(wrEvent.Table.Schema) sig.DB = string(wrEvent.Table.Schema)
sig.TB = string(wrEvent.Table.Table) sig.TB = string(wrEvent.Table.Table)
@ -414,7 +423,7 @@ func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent {
case replication.ROWS_QUERY_EVENT: case replication.ROWS_QUERY_EVENT:
queryEvent, ok := ev.Event.(*replication.RowsQueryEvent) queryEvent, ok := ev.Event.(*replication.RowsQueryEvent)
if !ok { if !ok {
return res return nil
} }
sig.Data = string(queryEvent.Query) sig.Data = string(queryEvent.Query)
sig.Type = "rowsquery" sig.Type = "rowsquery"
@ -422,7 +431,7 @@ func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent {
case replication.QUERY_EVENT: case replication.QUERY_EVENT:
queryEvent, ok := ev.Event.(*replication.QueryEvent) queryEvent, ok := ev.Event.(*replication.QueryEvent)
if !ok { if !ok {
return res return nil
} }
sig.DB = string(queryEvent.Schema) sig.DB = string(queryEvent.Schema)
sig.Data = string(queryEvent.Query) sig.Data = string(queryEvent.Query)
@ -439,7 +448,7 @@ func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent {
case replication.GTID_EVENT: case replication.GTID_EVENT:
ge, ok := ev.Event.(*replication.GTIDEvent) ge, ok := ev.Event.(*replication.GTIDEvent)
if !ok { if !ok {
return res return nil
} }
gid, err := gtid.Parse(fmt.Sprintf("%s:%d", bytesToUuid(ge.SID), ge.GNO)) gid, err := gtid.Parse(fmt.Sprintf("%s:%d", bytesToUuid(ge.SID), ge.GNO))
if err != nil { if err != nil {
@ -452,8 +461,9 @@ func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent {
case replication.TRANSACTION_PAYLOAD_EVENT: case replication.TRANSACTION_PAYLOAD_EVENT:
ge, ok := ev.Event.(*replication.TransactionPayloadEvent) ge, ok := ev.Event.(*replication.TransactionPayloadEvent)
if !ok { if !ok {
return res return nil
} }
res := make([]BinlogEvent, 0, len(ge.Events))
for _, val := range ge.Events { for _, val := range ge.Events {
res = append(res, ParseBinlogEvent(val)...) res = append(res, ParseBinlogEvent(val)...)
} }
@ -464,8 +474,8 @@ func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent {
return res return res
} }
res = append(res, sig) // 返回栈上数组的切片。调用方在当前迭代内立即消费,不持有跨迭代引用,安全。
return res return buf[:]
} }
func getCompressionTypeName(code uint64) string { func getCompressionTypeName(code uint64) string {
@ -523,8 +533,7 @@ func seekToPosition(f *os.File, parser *replication.BinlogParser, pos int64) err
if err != nil { if err != nil {
return err return err
} }
_, err = parseEvent(parser, h, body) _, err = parseEvent(parser, h, headBuf, body)
putBodyBuf(body)
if err != nil { if err != nil {
return err return err
} }
@ -538,7 +547,6 @@ func seekToPosition(f *os.File, parser *replication.BinlogParser, pos int64) err
} }
return nil return nil
} }
func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter BinlogFilter, fn func(Transaction) bool) error { func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter BinlogFilter, fn func(Transaction) bool) error {
var subGtid, inGtid, exGtid *gtid.Gtid var subGtid, inGtid, exGtid *gtid.Gtid
var err error var err error
@ -666,18 +674,27 @@ func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter
// GTID-only fast path // GTID-only fast path
if filter.OnlyShowGtid { if filter.OnlyShowGtid {
if h.EventType != replication.GTID_EVENT && h.EventType != replication.ANONYMOUS_GTID_EVENT { if h.EventType != replication.GTID_EVENT && h.EventType != replication.ANONYMOUS_GTID_EVENT {
if err := skipEventBody(r, h); err != nil { if h.EventType == replication.FORMAT_DESCRIPTION_EVENT ||
return err h.EventType == replication.TABLE_MAP_EVENT {
body, err := readEventBody(r, h)
if err != nil {
return err
}
if _, err = parseEvent(parser, h, headBuf, body); err != nil {
return err
}
} else {
if err := skipEventBody(r, h); err != nil {
return err
}
} }
continue continue
} }
body, err := readEventBody(r, h) body, err := readEventBody(r, h)
if err != nil { if err != nil {
return err return err
} }
e, err := parseEvent(parser, h, body) e, err := parseEvent(parser, h, headBuf, body)
putBodyBuf(body)
if err != nil { if err != nil {
return err return err
} }
@ -743,8 +760,7 @@ func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter
if err != nil { if err != nil {
return err return err
} }
e, err := parseEvent(parser, h, body) e, err := parseEvent(parser, h, headBuf, body)
putBodyBuf(body)
if err != nil { if err != nil {
return err return err
} }
@ -804,10 +820,22 @@ func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter
continue continue
} }
// 未命中事务:零解析到底 // 未命中事务时TABLE_MAP_EVENT 仍需解析parser 缓存表元数据),
// 其余事件可安全跳过
if skipCurrentTxn { if skipCurrentTxn {
if err := skipEventBody(r, h); err != nil { if h.EventType == replication.TABLE_MAP_EVENT ||
return err h.EventType == replication.FORMAT_DESCRIPTION_EVENT {
body, err := readEventBody(r, h)
if err != nil {
return err
}
if _, err = parseEvent(parser, h, headBuf, body); err != nil {
return err
}
} else {
if err := skipEventBody(r, h); err != nil {
return err
}
} }
continue continue
} }
@ -816,8 +844,7 @@ func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter
if err != nil { if err != nil {
return err return err
} }
e, err := parseEvent(parser, h, body) e, err := parseEvent(parser, h, headBuf, body)
putBodyBuf(body)
if err != nil { if err != nil {
return err return err
} }
@ -847,17 +874,16 @@ func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter
tx.EndPos = int(h.LogPos) tx.EndPos = int(h.LogPos)
status := STATUS_PREPARE status := STATUS_PREPARE
if ev.Type == "query" { if ev.Type == "query" {
switch strings.ToLower(ev.Data) { if equalFoldShort(ev.Data, "begin") {
case "begin":
if tx.TxStartTime == 0 { if tx.TxStartTime == 0 {
tx.TxStartTime = int64(h.Timestamp) tx.TxStartTime = int64(h.Timestamp)
} }
status = STATUS_BEGIN tx.Status = STATUS_BEGIN
case "commit": } else if equalFoldShort(ev.Data, "commit") {
status = STATUS_COMMIT tx.Status = STATUS_COMMIT
tx.TxEndTime = int64(h.Timestamp) tx.TxEndTime = int64(h.Timestamp)
case "rollback": } else if equalFoldShort(ev.Data, "rollback") {
status = STATUS_ROLLBACK tx.Status = STATUS_ROLLBACK
tx.TxEndTime = int64(h.Timestamp) tx.TxEndTime = int64(h.Timestamp)
} }
tx.Status = status tx.Status = status
@ -881,7 +907,6 @@ func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter
} }
} }
} }
func prepareTableMatchers(filter BinlogFilter) (includeMatcher, excludeMatcher *tableMatcher) { func prepareTableMatchers(filter BinlogFilter) (includeMatcher, excludeMatcher *tableMatcher) {
if len(filter.IncludeTables) > 0 { if len(filter.IncludeTables) > 0 {
includeMatcher = buildTableMatcher(filter.IncludeTables) includeMatcher = buildTableMatcher(filter.IncludeTables)
@ -921,3 +946,19 @@ func buildTableMatcher(patterns []string) *tableMatcher {
} }
return m return m
} }
// equalFoldShort reports whether s equals lower under ASCII
// case-insensitive comparison. The second argument must already be
// lowercase ASCII (e.g. "begin", "commit"). Unlike strings.ToLower,
// this performs no allocation, which matters on the per-event hot path.
func equalFoldShort(s, lower string) bool {
	if len(s) != len(lower) {
		return false
	}
	for i := range lower {
		ch := s[i]
		if ch >= 'A' && ch <= 'Z' {
			ch |= 0x20 // fold ASCII uppercase to lowercase
		}
		if ch != lower[i] {
			return false
		}
	}
	return true
}