From 52a32d4836857dac042bf496fe4696f1a7378769 Mon Sep 17 00:00:00 2001 From: starainrt Date: Thu, 12 Mar 2026 22:30:15 +0800 Subject: [PATCH] =?UTF-8?q?bug=20fix:=E5=9B=A0sync.Pool=E4=B8=8D=E5=BD=93?= =?UTF-8?q?=E4=BD=BF=E7=94=A8=E5=AF=BC=E8=87=B4=E8=A7=A3=E6=9E=90=E5=A4=B1?= =?UTF-8?q?=E8=B4=A5=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- parse.go | 207 +++++++++++++++++++++++++++++++++---------------------- 1 file changed, 124 insertions(+), 83 deletions(-) diff --git a/parse.go b/parse.go index ad3642f..4e070eb 100644 --- a/parse.go +++ b/parse.go @@ -1,18 +1,19 @@ package binlog import ( - "b612.me/mysql/gtid" - "b612.me/staros" "bufio" "bytes" "errors" "fmt" - "github.com/starainrt/go-mysql/replication" "io" "os" "strings" "sync" "time" + + "b612.me/mysql/gtid" + "b612.me/staros" + "github.com/starainrt/go-mysql/replication" ) var ( @@ -26,8 +27,8 @@ const ( ) const ( - maxPooledBodyCap = 4 << 20 // 4MB - defaultReadBufSize = 1 << 20 // 1MB + maxPooledRawDataCap = 4 << 20 // 4MB + defaultReadBufSize = 1 << 20 // 1MB ) type TxDetail struct { @@ -110,42 +111,23 @@ func (m *tableMatcher) match(db, tb string) bool { if m.matchAll { return true } - if m.exactMatch[db+"."+tb] { + if m.dbWildcard[db] || m.tbWildcard[tb] { return true } - if m.dbWildcard[db] { - return true - } - if m.tbWildcard[tb] { - return true + if len(m.exactMatch) > 0 { + // Go 1.12+ 对 map[string] 查找时 string([]byte) 不分配 + var buf [128]byte + key := buf[:0] + key = append(key, db...) + key = append(key, '.') + key = append(key, tb...) + if m.exactMatch[string(key)] { + return true + } } return false } -var bodyBufPool = sync.Pool{ - New: func() any { - b := make([]byte, 0, 64*1024) - return &b - }, -} - -func getBodyBuf(n int) []byte { - p := bodyBufPool.Get().(*[]byte) - if cap(*p) < n { - b := make([]byte, n) - return b - } - return (*p)[:n] -} - -func putBodyBuf(b []byte) { - if cap(b) > maxPooledBodyCap { - return - } - b = b[:0] - bodyBufPool.Put(&b) -} - func ParseBinlogFile(path string, fx func(transaction Transaction) bool) error { return parseOneBinlog(path, fx) } @@ -200,9 +182,8 @@ func readEventHeader(r io.Reader, parser *replication.BinlogParser, headBuf []by func readEventBody(r io.Reader, h *replication.EventHeader) ([]byte, error) { bodyLen := int(h.EventSize) - replication.EventHeaderSize - body := getBodyBuf(bodyLen) + body := make([]byte, bodyLen) if _, err := io.ReadFull(r, body); err != nil { - putBodyBuf(body) return nil, fmt.Errorf("read event body failed: %w (need %d bytes)", err, bodyLen) } return body, nil @@ -219,10 +200,40 @@ func skipEventBody(r io.Reader, h *replication.EventHeader) error { return nil } -func parseEvent(parser *replication.BinlogParser, h *replication.EventHeader, body []byte) (replication.Event, error) { - e, err := parser.ParseEvent(h, body, nil) +var rawDataPool = sync.Pool{ + New: func() any { + b := make([]byte, 0, 64*1024) + return &b + }, +} + +func getRawDataBuf(n int) []byte { + p := rawDataPool.Get().(*[]byte) + if cap(*p) < n { + return make([]byte, n) + } + return (*p)[:n] +} + +func putRawDataBuf(b []byte) { + if cap(b) > maxPooledRawDataCap { + return + } + b = b[:0] + rawDataPool.Put(&b) +} + +func parseEvent(parser *replication.BinlogParser, h *replication.EventHeader, headBuf []byte, body []byte) (replication.Event, error) { + rawLen := len(headBuf) + len(body) + rawData := getRawDataBuf(rawLen) + copy(rawData, headBuf) + copy(rawData[len(headBuf):], body) + + e, err := parser.ParseEvent(h, body, rawData) + putRawDataBuf(rawData) if err != nil { - return nil, fmt.Errorf("parse event failed at pos %d: %w", h.LogPos, err) + return nil, fmt.Errorf("parse event failed at pos %d: Header %+v, Data %q, Err: %w", + h.LogPos, h, body, err) } return e, nil } @@ -289,8 +300,7 @@ func parseBinlogDetail(r io.Reader, f func(Transaction) bool) error { return err } - e, err := parseEvent(parser, h, body) - putBodyBuf(body) + e, err := parseEvent(parser, h, headBuf, body) if err != nil { return err } @@ -334,17 +344,16 @@ func parseBinlogDetail(r io.Reader, f func(Transaction) bool) error { tx.EndPos = int(h.LogPos) status := STATUS_PREPARE if ev.Type == "query" { - switch strings.ToLower(ev.Data) { - case "begin": + if equalFoldShort(ev.Data, "begin") { if tx.TxStartTime == 0 { tx.TxStartTime = int64(h.Timestamp) } - status = STATUS_BEGIN - case "commit": - status = STATUS_COMMIT + tx.Status = STATUS_BEGIN + } else if equalFoldShort(ev.Data, "commit") { + tx.Status = STATUS_COMMIT tx.TxEndTime = int64(h.Timestamp) - case "rollback": - status = STATUS_ROLLBACK + } else if equalFoldShort(ev.Data, "rollback") { + tx.Status = STATUS_ROLLBACK tx.TxEndTime = int64(h.Timestamp) } tx.Status = status @@ -370,8 +379,8 @@ func parseBinlogDetail(r io.Reader, f func(Transaction) bool) error { } func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent { - var res []BinlogEvent - var sig BinlogEvent + var buf [1]BinlogEvent + sig := &buf[0] switch ev.Header.EventType { case replication.ANONYMOUS_GTID_EVENT: @@ -381,7 +390,7 @@ func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent { case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2: wrEvent, ok := ev.Event.(*replication.RowsEvent) if !ok { - return res + return nil } sig.DB = string(wrEvent.Table.Schema) sig.TB = string(wrEvent.Table.Table) @@ -392,7 +401,7 @@ func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent { case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2: wrEvent, ok := ev.Event.(*replication.RowsEvent) if !ok { - return res + return nil } sig.DB = string(wrEvent.Table.Schema) sig.TB = string(wrEvent.Table.Table) @@ -403,7 +412,7 @@ func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent { case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2: wrEvent, ok := ev.Event.(*replication.RowsEvent) if !ok { - return res + return nil } sig.DB = string(wrEvent.Table.Schema) sig.TB = string(wrEvent.Table.Table) @@ -414,7 +423,7 @@ func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent { case replication.ROWS_QUERY_EVENT: queryEvent, ok := ev.Event.(*replication.RowsQueryEvent) if !ok { - return res + return nil } sig.Data = string(queryEvent.Query) sig.Type = "rowsquery" @@ -422,7 +431,7 @@ func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent { case replication.QUERY_EVENT: queryEvent, ok := ev.Event.(*replication.QueryEvent) if !ok { - return res + return nil } sig.DB = string(queryEvent.Schema) sig.Data = string(queryEvent.Query) @@ -439,7 +448,7 @@ func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent { case replication.GTID_EVENT: ge, ok := ev.Event.(*replication.GTIDEvent) if !ok { - return res + return nil } gid, err := gtid.Parse(fmt.Sprintf("%s:%d", bytesToUuid(ge.SID), ge.GNO)) if err != nil { @@ -452,8 +461,9 @@ func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent { case replication.TRANSACTION_PAYLOAD_EVENT: ge, ok := ev.Event.(*replication.TransactionPayloadEvent) if !ok { - return res + return nil } + res := make([]BinlogEvent, 0, len(ge.Events)) for _, val := range ge.Events { res = append(res, ParseBinlogEvent(val)...) } @@ -464,8 +474,8 @@ func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent { return res } - res = append(res, sig) - return res + // 返回栈上数组的切片。调用方在当前迭代内立即消费,不持有跨迭代引用,安全。 + return buf[:] } func getCompressionTypeName(code uint64) string { @@ -523,8 +533,7 @@ func seekToPosition(f *os.File, parser *replication.BinlogParser, pos int64) err if err != nil { return err } - _, err = parseEvent(parser, h, body) - putBodyBuf(body) + _, err = parseEvent(parser, h, headBuf, body) if err != nil { return err } @@ -538,7 +547,6 @@ func seekToPosition(f *os.File, parser *replication.BinlogParser, pos int64) err } return nil } - func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter BinlogFilter, fn func(Transaction) bool) error { var subGtid, inGtid, exGtid *gtid.Gtid var err error @@ -666,18 +674,27 @@ func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter // GTID-only fast path if filter.OnlyShowGtid { if h.EventType != replication.GTID_EVENT && h.EventType != replication.ANONYMOUS_GTID_EVENT { - if err := skipEventBody(r, h); err != nil { - return err + if h.EventType == replication.FORMAT_DESCRIPTION_EVENT || + h.EventType == replication.TABLE_MAP_EVENT { + body, err := readEventBody(r, h) + if err != nil { + return err + } + if _, err = parseEvent(parser, h, headBuf, body); err != nil { + return err + } + } else { + if err := skipEventBody(r, h); err != nil { + return err + } } continue } - body, err := readEventBody(r, h) if err != nil { return err } - e, err := parseEvent(parser, h, body) - putBodyBuf(body) + e, err := parseEvent(parser, h, headBuf, body) if err != nil { return err } @@ -743,8 +760,7 @@ func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter if err != nil { return err } - e, err := parseEvent(parser, h, body) - putBodyBuf(body) + e, err := parseEvent(parser, h, headBuf, body) if err != nil { return err } @@ -804,10 +820,22 @@ func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter continue } - // 未命中事务:零解析到底 + // 未命中事务时:TABLE_MAP_EVENT 仍需解析(parser 缓存表元数据), + // 其余事件可安全跳过 if skipCurrentTxn { - if err := skipEventBody(r, h); err != nil { - return err + if h.EventType == replication.TABLE_MAP_EVENT || + h.EventType == replication.FORMAT_DESCRIPTION_EVENT { + body, err := readEventBody(r, h) + if err != nil { + return err + } + if _, err = parseEvent(parser, h, headBuf, body); err != nil { + return err + } + } else { + if err := skipEventBody(r, h); err != nil { + return err + } } continue } @@ -816,8 +844,7 @@ func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter if err != nil { return err } - e, err := parseEvent(parser, h, body) - putBodyBuf(body) + e, err := parseEvent(parser, h, headBuf, body) if err != nil { return err } @@ -847,17 +874,16 @@ func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter tx.EndPos = int(h.LogPos) status := STATUS_PREPARE if ev.Type == "query" { - switch strings.ToLower(ev.Data) { - case "begin": + if equalFoldShort(ev.Data, "begin") { if tx.TxStartTime == 0 { tx.TxStartTime = int64(h.Timestamp) } - status = STATUS_BEGIN - case "commit": - status = STATUS_COMMIT + tx.Status = STATUS_BEGIN + } else if equalFoldShort(ev.Data, "commit") { + tx.Status = STATUS_COMMIT tx.TxEndTime = int64(h.Timestamp) - case "rollback": - status = STATUS_ROLLBACK + } else if equalFoldShort(ev.Data, "rollback") { + tx.Status = STATUS_ROLLBACK tx.TxEndTime = int64(h.Timestamp) } tx.Status = status @@ -881,7 +907,6 @@ func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter } } } - func prepareTableMatchers(filter BinlogFilter) (includeMatcher, excludeMatcher *tableMatcher) { if len(filter.IncludeTables) > 0 { includeMatcher = buildTableMatcher(filter.IncludeTables) @@ -921,3 +946,19 @@ func buildTableMatcher(patterns []string) *tableMatcher { } return m } + +func equalFoldShort(s, lower string) bool { + if len(s) != len(lower) { + return false + } + for i := 0; i < len(s); i++ { + c := s[i] + if 'A' <= c && c <= 'Z' { + c += 'a' - 'A' + } + if c != lower[i] { + return false + } + } + return true +}