mysqlbinlog/parse_filter.go
starainrt 0c9d9c6eae
feat(binlog): 增加 logical clock 元数据解析与统计支撑
- 暴露事务级 last_committed、sequence_number、transaction_length 和 commit timestamp
- 在 GTID event 转换时透传 logical clock 元数据
- 新增 ParseOptions、ParseProgress,支持上下文取消和解析进度回调
- 保留 TransactionPayloadEvent 展开后的 tablemap 与压缩类型信息
- 增加 TransactionSummary 辅助结构,便于上层统计事务结果、耗时、表分布和逻辑时钟
- 清理测试对外部大 binlog 样本的依赖,保证独立库测试可运行
- 修正跨平台测试与 GTID 输出格式断言,提升发版稳定性
2026-05-10 14:02:53 +08:00

693 lines
16 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package binlog
import (
"bufio"
"context"
"fmt"
"io"
"os"
"strings"
"b612.me/mysql/gtid"
"b612.me/staros"
"github.com/starainrt/go-mysql/replication"
)
// ParseBinlogWithFilter parses the binlog file at path starting from pos and
// invokes fx for every transaction that passes filter. It is a convenience
// wrapper around ParseBinlogWithOptions with only StartPos and Filter set.
func ParseBinlogWithFilter(path string, pos int64, filter BinlogFilter, fx func(Transaction) bool) error {
	opts := ParseOptions{
		StartPos: pos,
		Filter:   filter,
	}
	return ParseBinlogWithOptions(path, opts, fx)
}
// ParseBinlogWithOptions opens the binlog file at path and parses it according
// to opts, calling fx for every transaction that survives the configured
// filter. Returning false from fx stops the parse early with a nil error.
//
// It returns os.ErrNotExist when the file is missing, and an error when both
// include-tables and exclude-tables are configured (they are mutually
// exclusive).
func ParseBinlogWithOptions(path string, opts ParseOptions, fx func(Transaction) bool) error {
	filter := opts.Filter
	if hasConfiguredTablePatterns(filter.IncludeTables) && hasConfiguredTablePatterns(filter.ExcludeTables) {
		return fmt.Errorf("invalid filter: include-tables and exclude-tables cannot be set at the same time")
	}
	if !staros.Exists(path) {
		return os.ErrNotExist
	}
	file, err := os.Open(path)
	if err != nil {
		return err
	}
	defer file.Close()

	parser := replication.NewBinlogParser()
	parser.SetParseTime(false)
	parser.SetUseDecimal(false)

	// Either verify the binlog magic header at the top of the file, or seek
	// straight to the caller-supplied offset.
	if opts.StartPos == 0 {
		if err := validateBinlogHeader(file); err != nil {
			return err
		}
	} else if err := seekToPosition(file, parser, opts.StartPos); err != nil {
		return err
	}

	// File size is best-effort metadata for progress reporting; a Stat
	// failure simply leaves it at zero.
	var fileSize int64
	if info, statErr := file.Stat(); statErr == nil {
		fileSize = info.Size()
	}
	reader := bufio.NewReaderSize(file, defaultReadBufSize)
	return parseBinlogWithFilter(reader, parser, path, fileSize, opts, fx)
}
// parseBinlogWithFilter is the core streaming parse loop shared by the
// exported entry points. It reads events from r, groups them into
// transactions keyed by GTID, applies the GTID/position/time/size/table
// filters from opts.Filter, and hands each surviving Transaction to fn.
// Returning false from fn (or from the opts.OnProgress callback) ends the
// parse early with a nil error.
//
// path and fileSize are only used to populate ParseProgress for the optional
// opts.OnProgress callback.
func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, path string, fileSize int64, opts ParseOptions, fn func(Transaction) bool) error {
	filter := opts.Filter
	// include-tables and exclude-tables are mutually exclusive.
	if hasConfiguredTablePatterns(filter.IncludeTables) && hasConfiguredTablePatterns(filter.ExcludeTables) {
		return fmt.Errorf("invalid filter: include-tables and exclude-tables cannot be set at the same time")
	}
	// A nil context degrades to Background so ctx.Err() is always callable.
	ctx := opts.Context
	if ctx == nil {
		ctx = context.Background()
	}
	var subGtid, inGtid, exGtid *gtid.Gtid
	var err error
	includeMatcher, excludeMatcher, err := prepareTableMatchers(filter)
	if err != nil {
		return err
	}
	if filter.IncludeGtid != "" {
		inGtid, err = gtid.Parse(filter.IncludeGtid)
		if err != nil {
			return fmt.Errorf("parse include gtid failed: %w", err)
		}
		// subGtid tracks the not-yet-emitted remainder of the include set;
		// once it is exhausted the parse stops early (see subGtid.Sub below).
		subGtid = inGtid.Clone()
	}
	if filter.ExcludeGtid != "" {
		exGtid, err = gtid.Parse(filter.ExcludeGtid)
		if err != nil {
			return fmt.Errorf("parse exclude gtid failed: %w", err)
		}
	}
	var (
		tbMapPos       uint32 // start offset of the most recent TABLE_MAP_EVENT; row events inherit it
		skipCurrentTxn bool   // true while the events of a filtered-out transaction are being skipped
		tx             Transaction
		headBuf        = make([]byte, replication.EventHeaderSize) // reused scratch buffer for event headers
	)
	currentGtid := ""
	// callFn applies the post-assembly filters (time/position/size/DML-only
	// and the table matchers) to a completed transaction before forwarding it
	// to fn. It returns false only when fn asks to stop the parse.
	callFn := func(tx Transaction) bool {
		if fn == nil {
			return true
		}
		fillTimeLazy(&tx)
		if !filter.StartDate.IsZero() && filter.StartDate.After(tx.Time) {
			return true
		}
		if !filter.EndDate.IsZero() && filter.EndDate.Before(tx.Time) {
			return true
		}
		if filter.StartPos != 0 && filter.StartPos > tx.StartPos {
			return true
		}
		if filter.EndPos != 0 && filter.EndPos < tx.EndPos {
			return true
		}
		if filter.BigThan != 0 && filter.BigThan > tx.Size {
			return true
		}
		if filter.SmallThan != 0 && filter.SmallThan < tx.Size {
			return true
		}
		if !filter.OnlyShowGtid && filter.OnlyShowDML && tx.dmlEventCount == 0 {
			return true
		}
		// Without table matchers the transaction is forwarded unchanged.
		if includeMatcher == nil && excludeMatcher == nil {
			return fn(tx)
		}
		txs, matched, pickAll, skipAll := selectVisibleTxDetails(tx, includeMatcher, excludeMatcher, filter)
		if pickAll {
			return fn(tx)
		}
		if skipAll {
			return true
		}
		if matched {
			// Keep only the visible details and refresh the derived counters.
			tx.Txs = txs
			recomputeTxStatsFromVisibleDetails(&tx)
		}
		if !matched && includeMatcher != nil {
			return true
		}
		if len(tx.Txs) == 0 && matched {
			return true
		}
		return fn(tx)
	}
	// observeProgress feeds per-event progress to opts.OnProgress (when set).
	// It returns false when the callback asks to stop the parse.
	observeProgress := func(h *replication.EventHeader, evs []BinlogEvent) bool {
		if opts.OnProgress == nil || h == nil {
			return true
		}
		eventPos := int64(h.LogPos - h.EventSize)
		nextPos := int64(h.LogPos)
		if len(evs) == 0 {
			// Skipped/unparsed event: synthesize a minimal BinlogEvent from
			// the header alone.
			ev := BinlogEvent{
				EventType: byte(h.EventType),
				ServerID:  h.ServerID,
				Timestamp: h.Timestamp,
				LogPos:    h.LogPos,
				EventSize: h.EventSize,
			}
			return opts.OnProgress(ParseProgress{
				Path:     path,
				Event:    ev,
				EventPos: eventPos,
				NextPos:  nextPos,
				FileSize: fileSize,
			})
		}
		for _, ev := range evs {
			if !opts.OnProgress(ParseProgress{
				Path:     path,
				Event:    ev,
				EventPos: eventPos,
				NextPos:  nextPos,
				FileSize: fileSize,
			}) {
				return false
			}
		}
		return true
	}
	for {
		// Honor context cancellation between events.
		if err := ctx.Err(); err != nil {
			return err
		}
		h, err := readEventHeader(r, parser, headBuf)
		if err == io.EOF {
			// End of file: flush the transaction still being assembled.
			if currentGtid != "" {
				finalizeTx(&tx, filter.OnlyShowGtid)
				callFn(tx)
			}
			return nil
		}
		if err != nil {
			return err
		}
		if filter.OnlyShowGtid {
			// GTID-only mode: fully parse just FORMAT_DESCRIPTION/TABLE_MAP
			// (the parser needs them to keep state) and GTID events; skip the
			// bodies of everything else.
			if h.EventType != replication.GTID_EVENT && h.EventType != replication.ANONYMOUS_GTID_EVENT {
				if h.EventType == replication.FORMAT_DESCRIPTION_EVENT ||
					h.EventType == replication.TABLE_MAP_EVENT {
					body, err := readEventBody(r, h)
					if err != nil {
						return err
					}
					e, err := parseEvent(parser, h, headBuf, body)
					if err != nil {
						return err
					}
					evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
					if !observeProgress(h, evs) {
						return nil
					}
				} else {
					if err := skipEventBody(r, h); err != nil {
						return err
					}
					if !observeProgress(h, nil) {
						return nil
					}
				}
				continue
			}
			body, err := readEventBody(r, h)
			if err != nil {
				return err
			}
			e, err := parseEvent(parser, h, headBuf, body)
			if err != nil {
				return err
			}
			evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
			if !observeProgress(h, evs) {
				return nil
			}
			for _, ev := range evs {
				if ev.Type != "gtid" {
					continue
				}
				startPos := int(h.LogPos - h.EventSize)
				if filter.EndPos != 0 && startPos > filter.EndPos {
					continue
				}
				if filter.StartPos != 0 && startPos < filter.StartPos {
					continue
				}
				if currentGtid != "" {
					// Close the previous GTID's span just before this event.
					tx.EndPos = startPos - 1
					finalizeTx(&tx, true)
					if !callFn(tx) {
						return nil
					}
					if subGtid != nil {
						// Stop early once every included GTID has been seen.
						if err := subGtid.Sub(tx.GTID); err == nil && subGtid.EventCount() == 0 {
							return nil
						}
					}
					tx = Transaction{}
				}
				currentGtid = ev.Data
				if inGtid != nil {
					if c, _ := inGtid.Contain(ev.Data); !c {
						tx = Transaction{}
						currentGtid = ""
						continue
					}
				}
				if exGtid != nil {
					if c, _ := exGtid.Contain(ev.Data); c {
						currentGtid = ""
						tx = Transaction{}
						continue
					}
				}
				// Start a new (GTID-only) transaction carrying the
				// logical-clock metadata from the GTID event.
				tx = Transaction{
					GTID:                     ev.Data,
					StartPos:                 startPos,
					EndPos:                   startPos,
					Timestamp:                int64(h.Timestamp),
					LastCommitted:            ev.LastCommitted,
					SequenceNumber:           ev.SequenceNumber,
					TransactionLength:        ev.TransactionLength,
					ImmediateCommitTimestamp: ev.ImmediateCommitTimestamp,
					OriginalCommitTimestamp:  ev.OriginalCommitTimestamp,
				}
			}
			continue
		}
		// Handle GTID events first (they decide whether the current
		// transaction matches the filters).
		if h.EventType == replication.GTID_EVENT || h.EventType == replication.ANONYMOUS_GTID_EVENT {
			body, err := readEventBody(r, h)
			if err != nil {
				return err
			}
			e, err := parseEvent(parser, h, headBuf, body)
			if err != nil {
				return err
			}
			evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
			if !observeProgress(h, evs) {
				return nil
			}
			for _, ev := range evs {
				if ev.Type != "gtid" {
					continue
				}
				startPos := int(h.LogPos - h.EventSize)
				if currentGtid != "" {
					// A new GTID closes the previous transaction.
					finalizeTx(&tx, false)
					if !callFn(tx) {
						return nil
					}
					if subGtid != nil {
						if err := subGtid.Sub(tx.GTID); err == nil && subGtid.EventCount() == 0 {
							return nil
						}
					}
					tx = Transaction{}
				}
				currentGtid = ev.Data
				skipCurrentTxn = false
				if filter.EndPos != 0 && startPos > filter.EndPos {
					skipCurrentTxn = true
				}
				if filter.StartPos != 0 && startPos < filter.StartPos {
					skipCurrentTxn = true
				}
				if inGtid != nil {
					if c, _ := inGtid.Contain(ev.Data); !c {
						skipCurrentTxn = true
					}
				}
				if exGtid != nil {
					if c, _ := exGtid.Contain(ev.Data); c {
						skipCurrentTxn = true
					}
				}
				if !skipCurrentTxn {
					tx = Transaction{
						GTID:                     ev.Data,
						StartPos:                 startPos,
						Timestamp:                int64(h.Timestamp),
						LastCommitted:            ev.LastCommitted,
						SequenceNumber:           ev.SequenceNumber,
						TransactionLength:        ev.TransactionLength,
						ImmediateCommitTimestamp: ev.ImmediateCommitTimestamp,
						OriginalCommitTimestamp:  ev.OriginalCommitTimestamp,
						Txs:                      make([]TxDetail, 0, 8),
						sqlOrigin:                make([]string, 0, 4),
					}
				} else {
					tx = Transaction{}
					currentGtid = ""
				}
			}
			continue
		}
		// When the current transaction is filtered out, TABLE_MAP_EVENT must
		// still be parsed (the parser caches table metadata); every other
		// event body can be skipped safely.
		if skipCurrentTxn {
			if h.EventType == replication.TABLE_MAP_EVENT ||
				h.EventType == replication.FORMAT_DESCRIPTION_EVENT {
				body, err := readEventBody(r, h)
				if err != nil {
					return err
				}
				e, err := parseEvent(parser, h, headBuf, body)
				if err != nil {
					return err
				}
				evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
				if !observeProgress(h, evs) {
					return nil
				}
			} else {
				if err := skipEventBody(r, h); err != nil {
					return err
				}
				if !observeProgress(h, nil) {
					return nil
				}
			}
			continue
		}
		body, err := readEventBody(r, h)
		if err != nil {
			return err
		}
		e, err := parseEvent(parser, h, headBuf, body)
		if err != nil {
			return err
		}
		if h.EventType == replication.TABLE_MAP_EVENT {
			tbMapPos = h.LogPos - h.EventSize
		}
		evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
		if !observeProgress(h, evs) {
			return nil
		}
		for _, ev := range evs {
			// Row events are attributed to the position of their preceding
			// TABLE_MAP; query/gtid events start at their own offset.
			startPos := 0
			if ev.Type == "query" || ev.Type == "gtid" {
				startPos = int(h.LogPos - h.EventSize)
			} else {
				startPos = int(tbMapPos)
			}
			switch ev.Type {
			case "gtid":
				// A gtid surfacing here (outside the dedicated GTID branch,
				// e.g. from an expanded TransactionPayloadEvent — TODO
				// confirm against ParseBinlogEvent) also rolls the
				// transaction over.
				if currentGtid != "" {
					finalizeTx(&tx, false)
					if !callFn(tx) {
						return nil
					}
					if subGtid != nil {
						if err := subGtid.Sub(tx.GTID); err == nil && subGtid.EventCount() == 0 {
							return nil
						}
					}
				}
				currentGtid = ev.Data
				tx = Transaction{
					GTID:                     ev.Data,
					StartPos:                 startPos,
					Timestamp:                int64(h.Timestamp),
					LastCommitted:            ev.LastCommitted,
					SequenceNumber:           ev.SequenceNumber,
					TransactionLength:        ev.TransactionLength,
					ImmediateCommitTimestamp: ev.ImmediateCommitTimestamp,
					OriginalCommitTimestamp:  ev.OriginalCommitTimestamp,
					Txs:                      make([]TxDetail, 0, 8),
					sqlOrigin:                make([]string, 0, 4),
				}
			case "":
				tx.EndPos = int(h.LogPos)
			case "tablemap":
				tx.EndPos = int(h.LogPos)
				tbMapPos = h.LogPos - h.EventSize
			case "rowsquery":
				// Preserve the original SQL captured by rows_query events.
				tx.EndPos = int(h.LogPos)
				tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
			default:
				tx.EndPos = int(h.LogPos)
				if ev.Type == "query" {
					// BEGIN/COMMIT/ROLLBACK drive the transaction status and
					// its start/end timing.
					if equalFoldShort(ev.Data, "begin") {
						if tx.TxStartTime == 0 {
							tx.TxStartTime = int64(h.Timestamp)
						}
						tx.Status = STATUS_BEGIN
					} else if equalFoldShort(ev.Data, "commit") {
						tx.Status = STATUS_COMMIT
						tx.TxEndTime = int64(h.Timestamp)
					} else if equalFoldShort(ev.Data, "rollback") {
						tx.Status = STATUS_ROLLBACK
						tx.TxEndTime = int64(h.Timestamp)
					}
				}
				// Events carrying both db and table are counted as DML for
				// the OnlyShowDML filter.
				if ev.DB != "" && ev.TB != "" {
					tx.dmlEventCount++
				}
				tx.Txs = append(tx.Txs, TxDetail{
					StartPos:           startPos,
					EndPos:             int(h.LogPos),
					Db:                 ev.DB,
					Table:              ev.TB,
					Sql:                ev.Data,
					SqlType:            ev.Type,
					Rows:               ev.Rows,
					ColumnTypes:        ev.ColumnTypes,
					ColumnCollationIDs: ev.ColumnCollationIDs,
					RowCount:           int(ev.RowCnt),
					Timestamp:          int64(h.Timestamp),
					CompressionType:    ev.CompressionType,
				})
			}
		}
	}
}
// selectVisibleTxDetails walks the details of tx and decides which of them
// stay visible under the include/exclude table matchers.
//
// It returns the surviving details, whether any matcher fired, and two
// shortcut flags: pickAll means the whole transaction should be emitted
// untouched (an include hit with PickTxAllIfMatch), skipAll means the whole
// transaction should be dropped (an exclude hit with PickTxAllIfMatch).
func selectVisibleTxDetails(tx Transaction, includeMatcher, excludeMatcher *tableMatcher, filter BinlogFilter) ([]TxDetail, bool, bool, bool) {
	visible := make([]TxDetail, 0, len(tx.Txs))
	hit := false
	for _, detail := range tx.Txs {
		hitInclude := includeMatcher != nil && includeMatcher.match(detail.Db, detail.Table)
		hitExclude := excludeMatcher != nil && excludeMatcher.match(detail.Db, detail.Table)

		// Details without a db/table (e.g. BEGIN/COMMIT statements) are
		// governed by the IncludeBlank/ExcludeBlank switches instead of the
		// matchers.
		if detail.Db == "" && detail.Table == "" {
			switch {
			case includeMatcher != nil:
				if filter.IncludeBlank {
					hit = true
					if filter.PickTxAllIfMatch {
						return nil, true, true, false
					}
					visible = append(visible, detail)
				}
			case excludeMatcher != nil:
				if filter.ExcludeBlank {
					hit = true
					if filter.PickTxAllIfMatch {
						return nil, true, false, true
					}
					continue
				}
				visible = append(visible, detail)
			default:
				visible = append(visible, detail)
			}
			continue
		}

		switch {
		case includeMatcher != nil:
			// Include mode: only matching details survive.
			if hitInclude {
				hit = true
				if filter.PickTxAllIfMatch {
					return nil, true, true, false
				}
				visible = append(visible, detail)
			}
		case excludeMatcher != nil:
			// Exclude mode: matching details are dropped.
			if hitExclude {
				hit = true
				if filter.PickTxAllIfMatch {
					return nil, true, false, true
				}
				continue
			}
			visible = append(visible, detail)
		default:
			visible = append(visible, detail)
		}
	}
	return visible, hit, false, false
}
// prepareTableMatchers builds the include and exclude table matchers from the
// filter's pattern lists.
//
// A list consisting solely of blank/whitespace entries is treated as
// unconfigured — consistent with hasConfiguredTablePatterns, which the
// top-level include/exclude conflict check already uses. Previously a
// whitespace-only list (len > 0) built a matcher that matched nothing and
// silently filtered out every transaction.
func prepareTableMatchers(filter BinlogFilter) (includeMatcher, excludeMatcher *tableMatcher, err error) {
	if hasConfiguredTablePatterns(filter.IncludeTables) {
		includeMatcher, err = buildTableMatcher(filter.IncludeTables)
		if err != nil {
			return nil, nil, fmt.Errorf("invalid include-tables: %w", err)
		}
	}
	if hasConfiguredTablePatterns(filter.ExcludeTables) {
		excludeMatcher, err = buildTableMatcher(filter.ExcludeTables)
		if err != nil {
			return nil, nil, fmt.Errorf("invalid exclude-tables: %w", err)
		}
	}
	return includeMatcher, excludeMatcher, nil
}
// buildTableMatcher compiles a list of "db.table" patterns into a
// tableMatcher. Patterns are trimmed and lower-cased; blank entries are
// ignored. A '*' may stand for the whole db segment, the whole table segment,
// or both ("*.*" matches everything); a partial wildcard inside a segment is
// rejected with an error.
func buildTableMatcher(patterns []string) (*tableMatcher, error) {
	matcher := &tableMatcher{
		exactMatch: make(map[string]bool),
		dbWildcard: make(map[string]bool),
		tbWildcard: make(map[string]bool),
	}
	for _, raw := range patterns {
		normalized := strings.ToLower(strings.TrimSpace(raw))
		if normalized == "" {
			continue
		}
		if normalized == "*.*" {
			matcher.matchAll = true
			continue
		}
		segments := strings.Split(normalized, ".")
		if len(segments) != 2 || segments[0] == "" || segments[1] == "" {
			return nil, fmt.Errorf("invalid table pattern %q: expect db.table", strings.TrimSpace(raw))
		}
		db, tb := segments[0], segments[1]
		// '*' must occupy an entire segment; "ab*" style globs are invalid.
		if strings.Contains(db, "*") && db != "*" {
			return nil, fmt.Errorf("invalid table pattern %q: wildcard '*' must occupy full db segment", strings.TrimSpace(raw))
		}
		if strings.Contains(tb, "*") && tb != "*" {
			return nil, fmt.Errorf("invalid table pattern %q: wildcard '*' must occupy full table segment", strings.TrimSpace(raw))
		}
		switch {
		case db == "*" && tb == "*":
			matcher.matchAll = true
		case db == "*":
			matcher.tbWildcard[tb] = true
		case tb == "*":
			matcher.dbWildcard[db] = true
		default:
			matcher.exactMatch[db+"."+tb] = true
		}
	}
	return matcher, nil
}
// hasConfiguredTablePatterns reports whether the pattern list contains at
// least one non-blank entry; whitespace-only entries do not count.
func hasConfiguredTablePatterns(patterns []string) bool {
	for _, pattern := range patterns {
		if strings.TrimSpace(pattern) == "" {
			continue
		}
		return true
	}
	return false
}
// recomputeTxStatsFromVisibleDetails refreshes RowsCount, StartPos, EndPos and
// Size of tx from its (possibly filtered) detail list. When no details remain,
// the counters are zeroed but the positions are left untouched. A nil tx is a
// no-op.
func recomputeTxStatsFromVisibleDetails(tx *Transaction) {
	if tx == nil {
		return
	}
	if len(tx.Txs) == 0 {
		tx.RowsCount = 0
		tx.Size = 0
		return
	}
	// Seed min/max from the first detail, then fold in the rest.
	minStart := tx.Txs[0].StartPos
	maxEnd := tx.Txs[0].EndPos
	totalRows := 0
	for _, detail := range tx.Txs {
		totalRows += detail.RowCount
		if detail.StartPos < minStart {
			minStart = detail.StartPos
		}
		if detail.EndPos > maxEnd {
			maxEnd = detail.EndPos
		}
	}
	tx.RowsCount = totalRows
	tx.StartPos = minStart
	tx.EndPos = maxEnd
	tx.Size = 0
	if maxEnd > minStart {
		tx.Size = maxEnd - minStart
	}
}
// equalFoldShort reports whether s equals lower under ASCII-only case
// folding. lower is expected to already be lowercase ASCII; only the bytes
// 'A'..'Z' in s are folded, so non-ASCII bytes are compared verbatim (unlike
// strings.EqualFold, which folds Unicode).
func equalFoldShort(s, lower string) bool {
	if len(s) != len(lower) {
		return false
	}
	for i := range lower {
		b := s[i]
		if b >= 'A' && b <= 'Z' {
			b += 'a' - 'A'
		}
		if b != lower[i] {
			return false
		}
	}
	return true
}