refactor(parse): 拆分 parse.go 并修复事务/过滤一致性问题

- 将臃肿的 parse.go 按职责拆分为多个模块:
    parse_types.go、parse_io.go、parse_event_convert.go、parse_stream.go、parse_filter.go
  - parse.go 保留为模块入口说明,提升可维护性与可读性
  - 修复事务状态被覆盖问题(BEGIN/COMMIT/ROLLBACK 不再被重置为 PREPARE)
  - 增加 include-tables 与 exclude-tables 互斥校验,同时配置时直接报配置错误
  - 强化表匹配器模式校验,并补充非法模式测试
  - 在明细过滤后重算事务统计(RowsCount/StartPos/EndPos/Size),避免统计失真
  - 增加 TABLE_MAP 事件转换,补充列元信息透传(ColumnTypes/ColumnCollationIDs)
  - 基于 unsigned 元数据规范化行值,避免无符号整型被渲染为负数
  - 优化事件解析报错信息:增加有界 body 十六进制预览
  - 补充单元测试:payload/tablemap 转换、unsigned 规范化、过滤逻辑、IO 预览
This commit is contained in:
兔子 2026-03-19 17:04:35 +08:00
parent 52a32d4836
commit 8469c11373
Signed by: b612
GPG Key ID: 99DD2222B612B612
10 changed files with 1573 additions and 962 deletions

968
parse.go
View File

@ -1,964 +1,8 @@
package binlog package binlog
import ( // Parsing implementation is split across:
"bufio" // - parse_types.go
"bytes" // - parse_io.go
"errors" // - parse_event_convert.go
"fmt" // - parse_stream.go
"io" // - parse_filter.go
"os"
"strings"
"sync"
"time"
"b612.me/mysql/gtid"
"b612.me/staros"
"github.com/starainrt/go-mysql/replication"
)
// Sentinel errors returned by the binlog readers.
var (
	ErrInvalidBinlogHeader = errors.New("invalid binlog file header")
	ErrEventTooSmall       = errors.New("event size too small")
)

// Compression codes carried by TRANSACTION_PAYLOAD_EVENT.
// Per the MySQL protocol, ZSTD is 0 and "no compression" is 255.
const (
	CompressionNone uint64 = 255
	CompressionZSTD uint64 = 0
)

// Buffer sizing knobs for event reading.
const (
	maxPooledRawDataCap = 4 << 20 // 4MB: buffers larger than this are not returned to the pool
	defaultReadBufSize  = 1 << 20 // 1MB: bufio reader size for sequential scans
)
// TxDetail describes a single statement or row event inside a Transaction.
type TxDetail struct {
	StartPos        int             `json:"startPos"`        // byte offset where the event (or its TABLE_MAP) starts
	EndPos          int             `json:"endPos"`          // byte offset just past the event
	RowCount        int             `json:"rowCount"`        // affected row count for row events
	Timestamp       int64           `json:"timestamp"`       // event header timestamp (unix seconds)
	Time            time.Time       `json:"time"`            // derived lazily from Timestamp (see fillTimeLazy)
	Sql             string          `json:"sql"`             // SQL text; for row events back-filled from ROWS_QUERY
	Db              string          `json:"db"`
	Table           string          `json:"table"`
	SqlType         string          `json:"sqlType"`         // "insert", "update", "delete", "query", "rowsquery", ...
	CompressionType string          `json:"compressionType"` // e.g. "ZSTD" when unwrapped from a payload event
	Rows            [][]interface{} `json:"rows"`            // decoded row images
}
// Transaction status values. STATUS_PREPARE is the zero value used before
// a BEGIN/COMMIT/ROLLBACK statement has been observed.
const (
	STATUS_PREPARE uint8 = iota
	STATUS_BEGIN
	STATUS_COMMIT
	STATUS_ROLLBACK
)
// Transaction aggregates all events observed between two GTID events.
type Transaction struct {
	GTID          string     `json:"gtid"`
	Timestamp     int64      `json:"timestamp"`
	Time          time.Time  `json:"time"`        // derived lazily from Timestamp
	StartPos      int        `json:"startPos"`
	EndPos        int        `json:"endPos"`
	Size          int        `json:"size"`        // EndPos - StartPos (0 in GTID-only mode)
	RowsCount     int        `json:"rowsCount"`   // sum of detail RowCount values (set by finalizeTx)
	Status        uint8      `json:"status"`      // one of the STATUS_* constants
	TxStartTime   int64      `json:"txStartTime"` // timestamp of the BEGIN statement
	TxEndTime     int64      `json:"txEndTime"`   // timestamp of COMMIT/ROLLBACK
	sqlOrigin     []string   `json:"sqlOrigin"`   // raw ROWS_QUERY texts; exposed via GetSqlOrigin
	Txs           []TxDetail `json:"txs"`
	dmlEventCount int        // number of details carrying both db and table (DML)
}
// GetSqlOrigin exposes the captured ROWS_QUERY statement texts.
// The returned slice is the transaction's internal backing slice.
func (t Transaction) GetSqlOrigin() []string {
	return t.sqlOrigin
}
// BinlogFilter selects which transactions (and which details inside them)
// are delivered by ParseBinlogWithFilter.
type BinlogFilter struct {
	IncludeGtid      string    // GTID set: only matching transactions pass
	ExcludeGtid      string    // GTID set: matching transactions are dropped
	IncludeTables    []string  // "db.tb" patterns; "*" wildcards allowed
	ExcludeTables    []string  // "db.tb" patterns; "*" wildcards allowed
	StartPos         int       // minimum transaction start offset (0 = unbounded)
	EndPos           int       // maximum transaction end offset (0 = unbounded)
	StartDate        time.Time // earliest transaction time (zero = unbounded)
	EndDate          time.Time // latest transaction time (zero = unbounded)
	BigThan          int       // minimum transaction size in bytes (0 = unbounded)
	SmallThan        int       // maximum transaction size in bytes (0 = unbounded)
	OnlyShowGtid     bool      // emit GTID-level info only; event bodies are skipped
	OnlyShowDML      bool      // drop transactions that contain no DML details
	PickTxAllIfMatch bool      // on any table match, keep/drop the whole transaction
	ExcludeBlank     bool      // treat blank (no db/table) details as exclude matches
	IncludeBlank     bool      // keep blank details when include filtering is active
}
// BinlogEvent is the normalized representation of one replication event.
type BinlogEvent struct {
	Type            string // "insert", "update", "delete", "query", "rowsquery", "gtid" or ""
	DB              string
	TB              string
	Data            string // SQL text or GTID string, depending on Type
	RowCnt          uint32 // logical row count (update before/after pairs counted once)
	Rows            [][]interface{}
	CompressionType string // e.g. "ZSTD" when unwrapped from a payload event
}
// tableMatcher answers "does db.tb match?" for a compiled pattern list.
// Four pattern shapes are recognized: "*.*" (everything), "db.*" (any table
// in db), "*.tb" (that table name in any db) and literal "db.tb".
type tableMatcher struct {
	exactMatch map[string]bool // literal "db.tb" keys
	dbWildcard map[string]bool // db names from "db.*" patterns
	tbWildcard map[string]bool // table names from "*.tb" patterns
	matchAll   bool            // set by "*.*"
}

// match reports whether the pair db.tb is selected by this matcher.
func (m *tableMatcher) match(db, tb string) bool {
	if m.matchAll {
		return true
	}
	if m.dbWildcard[db] || m.tbWildcard[tb] {
		return true
	}
	if len(m.exactMatch) == 0 {
		return false
	}
	// Assemble the "db.tb" key in a stack buffer; indexing a map with
	// string(byteSlice) does not allocate (compiler-recognized pattern).
	var scratch [128]byte
	key := append(scratch[:0], db...)
	key = append(key, '.')
	key = append(key, tb...)
	return m.exactMatch[string(key)]
}
// ParseBinlogFile scans the binlog file at path and invokes fx once per
// parsed transaction; fx returning false stops the scan early.
func ParseBinlogFile(path string, fx func(transaction Transaction) bool) error {
	return parseOneBinlog(path, fx)
}
// parseOneBinlog opens path, validates the 4-byte binlog magic, then streams
// the remainder of the file through parseBinlogDetail.
func parseOneBinlog(path string, fx func(Transaction) bool) error {
	if !staros.Exists(path) {
		return os.ErrNotExist
	}
	file, err := os.Open(path)
	if err != nil {
		return err
	}
	defer file.Close()
	if err = validateBinlogHeader(file); err != nil {
		return err
	}
	reader := bufio.NewReaderSize(file, defaultReadBufSize)
	return parseBinlogDetail(reader, fx)
}
// validateBinlogHeader verifies the 4-byte binlog magic at the start of f
// and leaves the file offset positioned directly after it.
//
// BUGFIX: the previous implementation used f.Read, which may legally return
// fewer than len(b) bytes without an error; io.ReadFull guarantees the whole
// header is read or an error is reported.
func validateBinlogHeader(f *os.File) error {
	const fileTypeBytes = int64(4)
	b := make([]byte, fileTypeBytes)
	if _, err := io.ReadFull(f, b); err != nil {
		return fmt.Errorf("read binlog header failed: %w", err)
	}
	if !bytes.Equal(b, replication.BinLogFileHeader) {
		return ErrInvalidBinlogHeader
	}
	if _, err := f.Seek(fileTypeBytes, io.SeekStart); err != nil {
		return fmt.Errorf("seek after header failed: %w", err)
	}
	return nil
}
// readEventHeader reads exactly EventHeaderSize bytes into headBuf and parses
// them. io.EOF is returned untouched at a clean stream end so callers can
// detect normal termination; an event whose declared size does not even cover
// the header is rejected.
func readEventHeader(r io.Reader, parser *replication.BinlogParser, headBuf []byte) (*replication.EventHeader, error) {
	if _, err := io.ReadFull(r, headBuf); err != nil {
		return nil, err
	}
	h, err := parser.ParseHeader(headBuf)
	if err != nil {
		return nil, fmt.Errorf("parse header failed: %w", err)
	}
	if h.EventSize <= uint32(replication.EventHeaderSize) {
		return nil, fmt.Errorf("%w: event size is %d", ErrEventTooSmall, h.EventSize)
	}
	return h, nil
}
// readEventBody reads the payload that follows an already-parsed event
// header; the payload length is EventSize minus the fixed header size.
func readEventBody(r io.Reader, h *replication.EventHeader) ([]byte, error) {
	need := int(h.EventSize) - replication.EventHeaderSize
	buf := make([]byte, need)
	if _, err := io.ReadFull(r, buf); err != nil {
		return nil, fmt.Errorf("read event body failed: %w (need %d bytes)", err, need)
	}
	return buf, nil
}
// skipEventBody discards the current event's payload without parsing it.
func skipEventBody(r io.Reader, h *replication.EventHeader) error {
	remain := int64(h.EventSize) - int64(replication.EventHeaderSize)
	if remain <= 0 {
		return nil
	}
	_, err := io.CopyN(io.Discard, r, remain)
	if err != nil {
		return fmt.Errorf("skip event body failed: %w", err)
	}
	return nil
}
// rawDataPool recycles the scratch buffers used to assemble an event's raw
// bytes (header + body) for the parser.
var rawDataPool = sync.Pool{
	New: func() any {
		b := make([]byte, 0, 64*1024)
		return &b
	},
}

// getRawDataBuf returns a buffer of length n, reusing a pooled buffer when
// its capacity suffices.
//
// BUGFIX: previously an undersized pooled buffer was simply dropped on this
// path, slowly draining the pool under large events; it is now returned to
// the pool before a fresh buffer is allocated.
func getRawDataBuf(n int) []byte {
	p := rawDataPool.Get().(*[]byte)
	if cap(*p) < n {
		rawDataPool.Put(p)
		return make([]byte, n)
	}
	return (*p)[:n]
}

// putRawDataBuf returns b to the pool unless it has grown beyond
// maxPooledRawDataCap (oversized buffers are dropped to bound pool memory).
func putRawDataBuf(b []byte) {
	if cap(b) > maxPooledRawDataCap {
		return
	}
	b = b[:0]
	rawDataPool.Put(&b)
}
// parseEvent assembles the raw event bytes (header + body) in a pooled
// buffer and hands them to the parser.
//
// FIX: the error path used to embed the entire event body via %q, producing
// unbounded (multi-megabyte) error messages; it now emits a bounded hex
// preview plus the total length.
func parseEvent(parser *replication.BinlogParser, h *replication.EventHeader, headBuf []byte, body []byte) (replication.Event, error) {
	rawLen := len(headBuf) + len(body)
	rawData := getRawDataBuf(rawLen)
	copy(rawData, headBuf)
	copy(rawData[len(headBuf):], body)
	e, err := parser.ParseEvent(h, body, rawData)
	putRawDataBuf(rawData)
	if err != nil {
		// Bounded preview keeps the error readable for large events.
		const maxPreview = 64
		preview := body
		suffix := ""
		if len(preview) > maxPreview {
			preview = preview[:maxPreview]
			suffix = "..."
		}
		return nil, fmt.Errorf("parse event failed at pos %d: Header %+v, Body(%d bytes) %x%s, Err: %w",
			h.LogPos, h, len(body), preview, suffix, err)
	}
	return e, nil
}
// finalizeTx back-fills per-detail SQL text from the ROWS_QUERY statements
// captured during the scan and computes the transaction's aggregate row
// count and byte size.
//
// Pairing rule: each non-"query" detail (row event) consumes the next entry
// of tx.sqlOrigin in order — detail k gets the k-th captured rows-query text.
func finalizeTx(tx *Transaction, onlyShowGtid bool) {
	idx := 0
	for k, v := range tx.Txs {
		if v.SqlType != "query" && len(tx.sqlOrigin) > idx {
			v.Sql = tx.sqlOrigin[idx]
			idx++
		}
		tx.RowsCount += v.RowCount
		// v is a copy; write the mutated detail back into the slice.
		tx.Txs[k] = v
	}
	if onlyShowGtid {
		// GTID-only scans do not track meaningful start/end offsets.
		tx.Size = 0
	} else {
		tx.Size = tx.EndPos - tx.StartPos
	}
}
// fillTimeLazy materializes time.Time values from the stored unix
// timestamps, converting each at most once.
func fillTimeLazy(tx *Transaction) {
	if tx.Timestamp != 0 && tx.Time.IsZero() {
		tx.Time = time.Unix(tx.Timestamp, 0)
	}
	for i := range tx.Txs {
		d := &tx.Txs[i]
		if d.Timestamp != 0 && d.Time.IsZero() {
			d.Time = time.Unix(d.Timestamp, 0)
		}
	}
}
// parseBinlogDetail sequentially reads events from r, groups them into
// Transactions delimited by GTID events, and invokes f for every completed
// transaction. f returning false stops the scan early.
//
// BUGFIX: the query branch used to end with `tx.Status = status` where
// status was always STATUS_PREPARE, silently overwriting the
// BEGIN/COMMIT/ROLLBACK status recorded just above it. The overwrite is
// removed so tx.Status survives.
func parseBinlogDetail(r io.Reader, f func(Transaction) bool) error {
	parser := replication.NewBinlogParser()
	parser.SetParseTime(false)
	parser.SetUseDecimal(false)
	var (
		tbMapPos uint32
		tx       Transaction
		headBuf  = make([]byte, replication.EventHeaderSize)
	)
	currentGtid := ""
	for {
		h, err := readEventHeader(r, parser, headBuf)
		if err == io.EOF {
			// Flush the trailing transaction at a clean end of stream.
			if currentGtid != "" {
				finalizeTx(&tx, false)
				fillTimeLazy(&tx)
				if f != nil {
					f(tx)
				}
			}
			return nil
		}
		if err != nil {
			return err
		}
		body, err := readEventBody(r, h)
		if err != nil {
			return err
		}
		e, err := parseEvent(parser, h, headBuf, body)
		if err != nil {
			return err
		}
		if h.EventType == replication.TABLE_MAP_EVENT {
			// Row events report the position of their preceding TABLE_MAP.
			tbMapPos = h.LogPos - h.EventSize
		}
		evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
		for _, ev := range evs {
			startPos := 0
			if ev.Type == "query" || ev.Type == "gtid" {
				startPos = int(h.LogPos - h.EventSize)
			} else {
				startPos = int(tbMapPos)
			}
			switch ev.Type {
			case "gtid":
				// A new GTID closes the previous transaction.
				if currentGtid != "" {
					finalizeTx(&tx, false)
					fillTimeLazy(&tx)
					if f != nil && !f(tx) {
						return nil
					}
				}
				currentGtid = ev.Data
				tx = Transaction{
					GTID:      ev.Data,
					StartPos:  startPos,
					Timestamp: int64(h.Timestamp),
					Txs:       make([]TxDetail, 0, 8),
					sqlOrigin: make([]string, 0, 4),
				}
			case "":
				tx.EndPos = int(h.LogPos)
			case "rowsquery":
				tx.EndPos = int(h.LogPos)
				tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
			default:
				tx.EndPos = int(h.LogPos)
				if ev.Type == "query" {
					if equalFoldShort(ev.Data, "begin") {
						if tx.TxStartTime == 0 {
							tx.TxStartTime = int64(h.Timestamp)
						}
						tx.Status = STATUS_BEGIN
					} else if equalFoldShort(ev.Data, "commit") {
						tx.Status = STATUS_COMMIT
						tx.TxEndTime = int64(h.Timestamp)
					} else if equalFoldShort(ev.Data, "rollback") {
						tx.Status = STATUS_ROLLBACK
						tx.TxEndTime = int64(h.Timestamp)
					}
				}
				if ev.DB != "" && ev.TB != "" {
					tx.dmlEventCount++
				}
				tx.Txs = append(tx.Txs, TxDetail{
					StartPos:        startPos,
					EndPos:          int(h.LogPos),
					Db:              ev.DB,
					Table:           ev.TB,
					Sql:             ev.Data,
					SqlType:         ev.Type,
					Rows:            ev.Rows,
					RowCount:        int(ev.RowCnt),
					Timestamp:       int64(h.Timestamp),
					CompressionType: ev.CompressionType,
				})
			}
		}
	}
}
// ParseBinlogEvent converts a raw replication event into zero or more
// normalized BinlogEvents. Most event types map to exactly one entry;
// TRANSACTION_PAYLOAD_EVENT expands recursively into the events it wraps.
// Unrecognized event types yield a single zero-valued entry (Type == "").
func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent {
	// One-element backing array: nearly every event yields a single result,
	// so the common case stays allocation-free.
	var buf [1]BinlogEvent
	sig := &buf[0]
	switch ev.Header.EventType {
	case replication.ANONYMOUS_GTID_EVENT:
		sig.Data = "anonymous-gtid-event:1"
		sig.Type = "gtid"
	case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
		wrEvent, ok := ev.Event.(*replication.RowsEvent)
		if !ok {
			return nil
		}
		sig.DB = string(wrEvent.Table.Schema)
		sig.TB = string(wrEvent.Table.Table)
		sig.Type = "insert"
		sig.RowCnt = uint32(len(wrEvent.Rows))
		sig.Rows = wrEvent.Rows
	case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
		wrEvent, ok := ev.Event.(*replication.RowsEvent)
		if !ok {
			return nil
		}
		sig.DB = string(wrEvent.Table.Schema)
		sig.TB = string(wrEvent.Table.Table)
		sig.Type = "update"
		// Update events carry before/after image pairs, hence the halving.
		sig.RowCnt = uint32(len(wrEvent.Rows)) / 2
		sig.Rows = wrEvent.Rows
	case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
		wrEvent, ok := ev.Event.(*replication.RowsEvent)
		if !ok {
			return nil
		}
		sig.DB = string(wrEvent.Table.Schema)
		sig.TB = string(wrEvent.Table.Table)
		sig.Type = "delete"
		sig.RowCnt = uint32(len(wrEvent.Rows))
		sig.Rows = wrEvent.Rows
	case replication.ROWS_QUERY_EVENT:
		queryEvent, ok := ev.Event.(*replication.RowsQueryEvent)
		if !ok {
			return nil
		}
		sig.Data = string(queryEvent.Query)
		sig.Type = "rowsquery"
	case replication.QUERY_EVENT:
		queryEvent, ok := ev.Event.(*replication.QueryEvent)
		if !ok {
			return nil
		}
		sig.DB = string(queryEvent.Schema)
		sig.Data = string(queryEvent.Query)
		sig.Type = "query"
	case replication.MARIADB_GTID_EVENT:
		// MariaDB GTID events open a transaction; modeled as a "begin" query.
		sig.Data = "begin"
		sig.Type = "query"
	case replication.XID_EVENT:
		// XID closes a transaction; modeled as a "commit" query.
		sig.Data = "commit"
		sig.Type = "query"
	case replication.GTID_EVENT:
		ge, ok := ev.Event.(*replication.GTIDEvent)
		if !ok {
			return nil
		}
		gid, err := gtid.Parse(fmt.Sprintf("%s:%d", bytesToUuid(ge.SID), ge.GNO))
		if err != nil {
			sig.Data = fmt.Sprintf("invalid-gtid:%s:%d", bytesToUuid(ge.SID), ge.GNO)
		} else {
			sig.Data = gid.String()
		}
		sig.Type = "gtid"
	case replication.TRANSACTION_PAYLOAD_EVENT:
		ge, ok := ev.Event.(*replication.TransactionPayloadEvent)
		if !ok {
			return nil
		}
		// Recursively expand wrapped events and tag each with the payload's
		// compression type.
		res := make([]BinlogEvent, 0, len(ge.Events))
		for _, val := range ge.Events {
			res = append(res, ParseBinlogEvent(val)...)
		}
		compressionType := getCompressionTypeName(ge.CompressionType)
		for idx := range res {
			res[idx].CompressionType = compressionType
		}
		return res
	}
	// Returns a slice of the stack-backed array. The caller consumes it
	// within the current iteration and keeps no reference across iterations,
	// so this is safe.
	return buf[:]
}
// getCompressionTypeName maps a TRANSACTION_PAYLOAD compression code to a
// display name; the empty string means "not compressed".
func getCompressionTypeName(code uint64) string {
	if code == CompressionZSTD {
		return "ZSTD"
	}
	if code == CompressionNone {
		return ""
	}
	return fmt.Sprintf("UNKNOWN(%d)", code)
}
// ParseBinlogWithFilter opens the binlog at path, optionally fast-forwards
// to byte offset pos, and streams every transaction passing filter to fx
// (fx returning false stops the scan).
//
// FIX: include-tables and exclude-tables are mutually exclusive; configuring
// both used to be silently accepted (include took precedence) and is now
// rejected as a configuration error.
func ParseBinlogWithFilter(path string, pos int64, filter BinlogFilter, fx func(Transaction) bool) error {
	if len(filter.IncludeTables) > 0 && len(filter.ExcludeTables) > 0 {
		return fmt.Errorf("invalid filter: include-tables and exclude-tables cannot be set at the same time")
	}
	if !staros.Exists(path) {
		return os.ErrNotExist
	}
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()
	parser := replication.NewBinlogParser()
	parser.SetParseTime(false)
	parser.SetUseDecimal(false)
	if pos != 0 {
		if err := seekToPosition(f, parser, pos); err != nil {
			return err
		}
	} else {
		if err := validateBinlogHeader(f); err != nil {
			return err
		}
	}
	br := bufio.NewReaderSize(f, defaultReadBufSize)
	return parseBinlogWithFilter(br, parser, filter, fx)
}
// seekToPosition prepares f for reading from byte offset pos. Events are
// first parsed from the start of the file until a FORMAT_DESCRIPTION_EVENT
// or GTID_EVENT has been seen, then the file offset is moved to pos.
// NOTE(review): the pre-parse presumably exists so the parser absorbs the
// format description (checksum/version state) before jumping — confirm
// against the replication package's requirements.
func seekToPosition(f *os.File, parser *replication.BinlogParser, pos int64) error {
	if err := validateBinlogHeader(f); err != nil {
		return err
	}
	headBuf := make([]byte, replication.EventHeaderSize)
	for {
		h, err := readEventHeader(f, parser, headBuf)
		if err != nil {
			return fmt.Errorf("seek to position failed: %w", err)
		}
		body, err := readEventBody(f, h)
		if err != nil {
			return err
		}
		_, err = parseEvent(parser, h, headBuf, body)
		if err != nil {
			return err
		}
		if h.EventType == replication.FORMAT_DESCRIPTION_EVENT || h.EventType == replication.GTID_EVENT {
			break
		}
	}
	if _, err := f.Seek(pos, io.SeekStart); err != nil {
		return fmt.Errorf("seek to pos %d failed: %w", pos, err)
	}
	return nil
}
// parseBinlogWithFilter is the filtered variant of parseBinlogDetail: it
// streams events from r, groups them into transactions and hands every
// transaction that passes filter to fn (fn returning false stops the scan).
//
// BUGFIX: in the query branch, a trailing `tx.Status = status` (status was
// always STATUS_PREPARE) overwrote the BEGIN/COMMIT/ROLLBACK status recorded
// just above it. The overwrite is removed.
func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter BinlogFilter, fn func(Transaction) bool) error {
	var subGtid, inGtid, exGtid *gtid.Gtid
	var err error
	includeMatcher, excludeMatcher := prepareTableMatchers(filter)
	if filter.IncludeGtid != "" {
		inGtid, err = gtid.Parse(filter.IncludeGtid)
		if err != nil {
			return fmt.Errorf("parse include gtid failed: %w", err)
		}
		// subGtid tracks which included GTIDs are still outstanding so the
		// scan can stop early once all have been delivered.
		subGtid = inGtid.Clone()
	}
	if filter.ExcludeGtid != "" {
		exGtid, err = gtid.Parse(filter.ExcludeGtid)
		if err != nil {
			return fmt.Errorf("parse exclude gtid failed: %w", err)
		}
	}
	var (
		tbMapPos       uint32
		skipCurrentTxn bool
		tx             Transaction
		headBuf        = make([]byte, replication.EventHeaderSize)
	)
	currentGtid := ""
	// callFn applies the post-parse filters (time/position/size windows and
	// table include/exclude) before forwarding the transaction to fn.
	// Returning true means "keep scanning" even when the tx was filtered out.
	callFn := func(tx Transaction) bool {
		if fn == nil {
			return true
		}
		fillTimeLazy(&tx)
		if !filter.StartDate.IsZero() && filter.StartDate.After(tx.Time) {
			return true
		}
		if !filter.EndDate.IsZero() && filter.EndDate.Before(tx.Time) {
			return true
		}
		if filter.StartPos != 0 && filter.StartPos > tx.StartPos {
			return true
		}
		if filter.EndPos != 0 && filter.EndPos < tx.EndPos {
			return true
		}
		if filter.BigThan != 0 && filter.BigThan > tx.Size {
			return true
		}
		if filter.SmallThan != 0 && filter.SmallThan < tx.Size {
			return true
		}
		if !filter.OnlyShowGtid && filter.OnlyShowDML && tx.dmlEventCount == 0 {
			return true
		}
		var txs []TxDetail
		var matched bool
		for _, t := range tx.Txs {
			includeMatch := includeMatcher != nil && includeMatcher.match(t.Db, t.Table)
			excludeMatch := excludeMatcher != nil && excludeMatcher.match(t.Db, t.Table)
			if t.Db == "" && t.Table == "" {
				// Blank (non-DML) details follow the Include/ExcludeBlank knobs.
				if includeMatcher != nil && !filter.IncludeBlank {
					continue
				}
				if excludeMatcher != nil && filter.ExcludeBlank {
					matched = true
					if filter.PickTxAllIfMatch {
						return true
					}
					continue
				}
			}
			if includeMatcher != nil {
				if includeMatch {
					matched = true
					if filter.PickTxAllIfMatch {
						return fn(tx)
					}
					txs = append(txs, t)
				}
			} else if excludeMatcher != nil {
				if excludeMatch {
					matched = true
					if filter.PickTxAllIfMatch {
						return true
					}
				} else {
					txs = append(txs, t)
				}
			} else {
				txs = append(txs, t)
			}
		}
		if matched {
			tx.Txs = txs
		}
		if !matched && includeMatcher != nil {
			return true
		}
		if len(tx.Txs) == 0 && matched {
			return true
		}
		return fn(tx)
	}
	for {
		h, err := readEventHeader(r, parser, headBuf)
		if err == io.EOF {
			// Flush the trailing transaction at a clean end of stream.
			if !tx.Time.IsZero() || tx.Timestamp != 0 {
				finalizeTx(&tx, filter.OnlyShowGtid)
				callFn(tx)
			}
			return nil
		}
		if err != nil {
			return err
		}
		// GTID-only fast path: decode just enough to keep the parser state
		// coherent (FDE/TABLE_MAP) and skip all other event bodies.
		if filter.OnlyShowGtid {
			if h.EventType != replication.GTID_EVENT && h.EventType != replication.ANONYMOUS_GTID_EVENT {
				if h.EventType == replication.FORMAT_DESCRIPTION_EVENT ||
					h.EventType == replication.TABLE_MAP_EVENT {
					body, err := readEventBody(r, h)
					if err != nil {
						return err
					}
					if _, err = parseEvent(parser, h, headBuf, body); err != nil {
						return err
					}
				} else {
					if err := skipEventBody(r, h); err != nil {
						return err
					}
				}
				continue
			}
			body, err := readEventBody(r, h)
			if err != nil {
				return err
			}
			e, err := parseEvent(parser, h, headBuf, body)
			if err != nil {
				return err
			}
			evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
			for _, ev := range evs {
				if ev.Type != "gtid" {
					continue
				}
				startPos := int(h.LogPos - h.EventSize)
				if filter.EndPos != 0 && startPos > filter.EndPos {
					continue
				}
				if filter.StartPos != 0 && startPos < filter.StartPos {
					continue
				}
				if currentGtid != "" {
					tx.EndPos = startPos - 1
					finalizeTx(&tx, true)
					if !callFn(tx) {
						return nil
					}
					if subGtid != nil {
						if err := subGtid.Sub(tx.GTID); err == nil && subGtid.EventCount() == 0 {
							return nil
						}
					}
					tx = Transaction{}
				}
				currentGtid = ev.Data
				if inGtid != nil {
					if c, _ := inGtid.Contain(ev.Data); !c {
						tx = Transaction{}
						currentGtid = ""
						continue
					}
				}
				if exGtid != nil {
					if c, _ := exGtid.Contain(ev.Data); c {
						currentGtid = ""
						tx = Transaction{}
						continue
					}
				}
				tx = Transaction{
					GTID:      ev.Data,
					StartPos:  startPos,
					EndPos:    startPos,
					Timestamp: int64(h.Timestamp),
				}
			}
			continue
		}
		// GTID events are handled first: they decide whether the upcoming
		// transaction is selected at all.
		if h.EventType == replication.GTID_EVENT || h.EventType == replication.ANONYMOUS_GTID_EVENT {
			body, err := readEventBody(r, h)
			if err != nil {
				return err
			}
			e, err := parseEvent(parser, h, headBuf, body)
			if err != nil {
				return err
			}
			evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
			for _, ev := range evs {
				if ev.Type != "gtid" {
					continue
				}
				startPos := int(h.LogPos - h.EventSize)
				if currentGtid != "" {
					finalizeTx(&tx, false)
					if !callFn(tx) {
						return nil
					}
					if subGtid != nil {
						if err := subGtid.Sub(tx.GTID); err == nil && subGtid.EventCount() == 0 {
							return nil
						}
					}
					tx = Transaction{}
				}
				currentGtid = ev.Data
				skipCurrentTxn = false
				if filter.EndPos != 0 && startPos > filter.EndPos {
					skipCurrentTxn = true
				}
				if filter.StartPos != 0 && startPos < filter.StartPos {
					skipCurrentTxn = true
				}
				if inGtid != nil {
					if c, _ := inGtid.Contain(ev.Data); !c {
						skipCurrentTxn = true
					}
				}
				if exGtid != nil {
					if c, _ := exGtid.Contain(ev.Data); c {
						skipCurrentTxn = true
					}
				}
				if !skipCurrentTxn {
					tx = Transaction{
						GTID:      ev.Data,
						StartPos:  startPos,
						Timestamp: int64(h.Timestamp),
						Txs:       make([]TxDetail, 0, 8),
						sqlOrigin: make([]string, 0, 4),
					}
				} else {
					tx = Transaction{}
				}
			}
			continue
		}
		// Skipped transactions: TABLE_MAP/FDE must still be parsed so the
		// parser's table metadata cache stays valid; all else is skipped.
		if skipCurrentTxn {
			if h.EventType == replication.TABLE_MAP_EVENT ||
				h.EventType == replication.FORMAT_DESCRIPTION_EVENT {
				body, err := readEventBody(r, h)
				if err != nil {
					return err
				}
				if _, err = parseEvent(parser, h, headBuf, body); err != nil {
					return err
				}
			} else {
				if err := skipEventBody(r, h); err != nil {
					return err
				}
			}
			continue
		}
		body, err := readEventBody(r, h)
		if err != nil {
			return err
		}
		e, err := parseEvent(parser, h, headBuf, body)
		if err != nil {
			return err
		}
		if h.EventType == replication.TABLE_MAP_EVENT {
			// Row events report the position of their preceding TABLE_MAP.
			tbMapPos = h.LogPos - h.EventSize
		}
		evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
		for _, ev := range evs {
			startPos := 0
			if ev.Type == "query" || ev.Type == "gtid" {
				startPos = int(h.LogPos - h.EventSize)
			} else {
				startPos = int(tbMapPos)
			}
			switch ev.Type {
			case "":
				tx.EndPos = int(h.LogPos)
			case "rowsquery":
				tx.EndPos = int(h.LogPos)
				tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
			default:
				tx.EndPos = int(h.LogPos)
				if ev.Type == "query" {
					if equalFoldShort(ev.Data, "begin") {
						if tx.TxStartTime == 0 {
							tx.TxStartTime = int64(h.Timestamp)
						}
						tx.Status = STATUS_BEGIN
					} else if equalFoldShort(ev.Data, "commit") {
						tx.Status = STATUS_COMMIT
						tx.TxEndTime = int64(h.Timestamp)
					} else if equalFoldShort(ev.Data, "rollback") {
						tx.Status = STATUS_ROLLBACK
						tx.TxEndTime = int64(h.Timestamp)
					}
				}
				if ev.DB != "" && ev.TB != "" {
					tx.dmlEventCount++
				}
				tx.Txs = append(tx.Txs, TxDetail{
					StartPos:        startPos,
					EndPos:          int(h.LogPos),
					Db:              ev.DB,
					Table:           ev.TB,
					Sql:             ev.Data,
					SqlType:         ev.Type,
					Rows:            ev.Rows,
					RowCount:        int(ev.RowCnt),
					Timestamp:       int64(h.Timestamp),
					CompressionType: ev.CompressionType,
				})
			}
		}
	}
}
// prepareTableMatchers builds the include/exclude table matchers configured
// in filter; a nil matcher means "no constraint of that kind".
func prepareTableMatchers(filter BinlogFilter) (includeMatcher, excludeMatcher *tableMatcher) {
	if len(filter.IncludeTables) != 0 {
		includeMatcher = buildTableMatcher(filter.IncludeTables)
	}
	if len(filter.ExcludeTables) != 0 {
		excludeMatcher = buildTableMatcher(filter.ExcludeTables)
	}
	return
}
// buildTableMatcher compiles "db.tb" patterns into a tableMatcher.
// Supported shapes: "*.*" (match everything), "db.*", "*.tb" and exact
// "db.tb".
// NOTE(review): patterns without exactly one '.' separator are silently
// ignored here; the refactor adds explicit pattern validation — confirm
// which behavior callers depend on.
func buildTableMatcher(patterns []string) *tableMatcher {
	m := &tableMatcher{
		exactMatch: make(map[string]bool),
		dbWildcard: make(map[string]bool),
		tbWildcard: make(map[string]bool),
	}
	for _, pattern := range patterns {
		if pattern == "*.*" {
			m.matchAll = true
			continue
		}
		parts := strings.Split(pattern, ".")
		if len(parts) != 2 {
			// invalid pattern: dropped without error (see NOTE above)
			continue
		}
		db, tb := parts[0], parts[1]
		if db == "*" && tb == "*" {
			m.matchAll = true
		} else if db == "*" {
			m.tbWildcard[tb] = true
		} else if tb == "*" {
			m.dbWildcard[db] = true
		} else {
			m.exactMatch[pattern] = true
		}
	}
	return m
}
// equalFoldShort reports whether s equals lower under ASCII case folding.
// lower must already be lower-case; only 'A'..'Z' bytes in s are folded.
// It avoids the Unicode machinery of strings.EqualFold for the short
// keywords ("begin", "commit", "rollback") it is used with.
func equalFoldShort(s, lower string) bool {
	if len(s) != len(lower) {
		return false
	}
	for i := range lower {
		c := s[i]
		if c >= 'A' && c <= 'Z' {
			c |= 0x20 // ASCII lower-case: sets bit 5, same as +32
		}
		if c != lower[i] {
			return false
		}
	}
	return true
}

257
parse_event_convert.go Normal file
View File

@ -0,0 +1,257 @@
package binlog
import (
"fmt"
"b612.me/mysql/gtid"
"github.com/starainrt/go-mysql/mysql"
"github.com/starainrt/go-mysql/replication"
)
// ParseBinlogEvent converts a raw replication event into zero or more
// normalized BinlogEvents. Compared to the pre-refactor version it also
// converts TABLE_MAP events, normalizes unsigned integer row values, and
// passes through column type/collation metadata for row events.
func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent {
	// One-element backing array: nearly every event yields a single result.
	var buf [1]BinlogEvent
	sig := &buf[0]
	switch ev.Header.EventType {
	case replication.ANONYMOUS_GTID_EVENT:
		sig.Data = "anonymous-gtid-event:1"
		sig.Type = "gtid"
	case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
		wrEvent, ok := ev.Event.(*replication.RowsEvent)
		if !ok {
			return nil
		}
		sig.DB = string(wrEvent.Table.Schema)
		sig.TB = string(wrEvent.Table.Table)
		sig.Type = "insert"
		sig.RowCnt = uint32(len(wrEvent.Rows))
		sig.Rows = normalizeRowsByUnsigned(wrEvent)
		sig.ColumnTypes = cloneColumnTypes(wrEvent.Table)
		sig.ColumnCollationIDs = buildColumnCollationIDs(wrEvent.Table)
	case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
		wrEvent, ok := ev.Event.(*replication.RowsEvent)
		if !ok {
			return nil
		}
		sig.DB = string(wrEvent.Table.Schema)
		sig.TB = string(wrEvent.Table.Table)
		sig.Type = "update"
		// Update events carry before/after image pairs, hence the halving.
		sig.RowCnt = uint32(len(wrEvent.Rows)) / 2
		sig.Rows = normalizeRowsByUnsigned(wrEvent)
		sig.ColumnTypes = cloneColumnTypes(wrEvent.Table)
		sig.ColumnCollationIDs = buildColumnCollationIDs(wrEvent.Table)
	case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
		wrEvent, ok := ev.Event.(*replication.RowsEvent)
		if !ok {
			return nil
		}
		sig.DB = string(wrEvent.Table.Schema)
		sig.TB = string(wrEvent.Table.Table)
		sig.Type = "delete"
		sig.RowCnt = uint32(len(wrEvent.Rows))
		sig.Rows = normalizeRowsByUnsigned(wrEvent)
		sig.ColumnTypes = cloneColumnTypes(wrEvent.Table)
		sig.ColumnCollationIDs = buildColumnCollationIDs(wrEvent.Table)
	case replication.TABLE_MAP_EVENT:
		// New in the refactor: surfaced so consumers can track table maps.
		tableEvent, ok := ev.Event.(*replication.TableMapEvent)
		if !ok {
			return nil
		}
		sig.DB = string(tableEvent.Schema)
		sig.TB = string(tableEvent.Table)
		sig.Type = "tablemap"
	case replication.ROWS_QUERY_EVENT:
		queryEvent, ok := ev.Event.(*replication.RowsQueryEvent)
		if !ok {
			return nil
		}
		sig.Data = string(queryEvent.Query)
		sig.Type = "rowsquery"
	case replication.QUERY_EVENT:
		queryEvent, ok := ev.Event.(*replication.QueryEvent)
		if !ok {
			return nil
		}
		sig.DB = string(queryEvent.Schema)
		sig.Data = string(queryEvent.Query)
		sig.Type = "query"
	case replication.MARIADB_GTID_EVENT:
		// MariaDB GTID events open a transaction; modeled as a "begin" query.
		sig.Data = "begin"
		sig.Type = "query"
	case replication.XID_EVENT:
		// XID closes a transaction; modeled as a "commit" query.
		sig.Data = "commit"
		sig.Type = "query"
	case replication.GTID_EVENT:
		ge, ok := ev.Event.(*replication.GTIDEvent)
		if !ok {
			return nil
		}
		gid, err := gtid.Parse(fmt.Sprintf("%s:%d", bytesToUuid(ge.SID), ge.GNO))
		if err != nil {
			sig.Data = fmt.Sprintf("invalid-gtid:%s:%d", bytesToUuid(ge.SID), ge.GNO)
		} else {
			sig.Data = gid.String()
		}
		sig.Type = "gtid"
	case replication.TRANSACTION_PAYLOAD_EVENT:
		ge, ok := ev.Event.(*replication.TransactionPayloadEvent)
		if !ok {
			return nil
		}
		// Recursively expand wrapped events and tag each with the payload's
		// compression type.
		res := make([]BinlogEvent, 0, len(ge.Events))
		for _, val := range ge.Events {
			res = append(res, ParseBinlogEvent(val)...)
		}
		compressionType := getCompressionTypeName(ge.CompressionType)
		for idx := range res {
			res[idx].CompressionType = compressionType
		}
		return res
	}
	// Returns a slice of the stack-backed array. The caller consumes it
	// within the current iteration and keeps no reference across iterations,
	// so this is safe.
	return buf[:]
}
// normalizeRowsByUnsigned rewrites integer cells of unsigned columns from
// the signed Go types produced by the row decoder into the matching unsigned
// types, so e.g. an unsigned BIGINT holding -1 is rendered as
// 18446744073709551615 instead of -1. Rows are mutated in place and the same
// slice is returned.
func normalizeRowsByUnsigned(wrEvent *replication.RowsEvent) [][]interface{} {
	if wrEvent == nil || wrEvent.Table == nil || len(wrEvent.Rows) == 0 {
		if wrEvent == nil {
			return nil
		}
		return wrEvent.Rows
	}
	// An empty UnsignedMap means the table map carried no signedness
	// metadata; values are left untouched in that case.
	unsignedMap := wrEvent.Table.UnsignedMap()
	if len(unsignedMap) == 0 {
		return wrEvent.Rows
	}
	columnTypes := wrEvent.Table.ColumnType
	if len(columnTypes) == 0 {
		return wrEvent.Rows
	}
	for rowIdx := range wrEvent.Rows {
		row := wrEvent.Rows[rowIdx]
		for colIdx := range row {
			if !unsignedMap[colIdx] {
				continue
			}
			// Guard against rows wider than the recorded type list.
			if colIdx >= len(columnTypes) {
				continue
			}
			row[colIdx] = normalizeUnsignedValue(row[colIdx], columnTypes[colIdx])
		}
	}
	return wrEvent.Rows
}
// cloneColumnTypes copies the table map's raw column type bytes into a
// fresh []int so callers cannot mutate parser-owned metadata.
func cloneColumnTypes(table *replication.TableMapEvent) []int {
	if table == nil || len(table.ColumnType) == 0 {
		return nil
	}
	out := make([]int, len(table.ColumnType))
	for i, ct := range table.ColumnType {
		out[i] = int(ct)
	}
	return out
}
// buildColumnCollationIDs produces a per-column collation ID slice for the
// table map, merging the character-column collation map with the ENUM/SET
// collation map (character collations take precedence). It returns nil when
// no collation metadata is present at all, so callers can distinguish
// "no metadata" from "all zero".
func buildColumnCollationIDs(table *replication.TableMapEvent) []uint64 {
	if table == nil {
		return nil
	}
	// Prefer the length of the type list; fall back to the declared count.
	columnCount := len(table.ColumnType)
	if columnCount == 0 && table.ColumnCount > 0 {
		columnCount = int(table.ColumnCount)
	}
	if columnCount == 0 {
		return nil
	}
	ret := make([]uint64, columnCount)
	hasValue := false
	for idx, collationID := range table.CollationMap() {
		// Defensive bounds check: map keys come from parsed metadata.
		if idx < 0 || idx >= columnCount {
			continue
		}
		ret[idx] = collationID
		hasValue = hasValue || collationID != 0
	}
	for idx, collationID := range table.EnumSetCollationMap() {
		if idx < 0 || idx >= columnCount {
			continue
		}
		// Only fill slots the character-collation map left empty.
		if ret[idx] == 0 {
			ret[idx] = collationID
		}
		hasValue = hasValue || collationID != 0
	}
	if !hasValue {
		return nil
	}
	return ret
}
// normalizeUnsignedValue reinterprets a signed integer cell as the unsigned
// type matching its MySQL column type. Non-integer values (and types outside
// the integer family) are returned unchanged.
func normalizeUnsignedValue(v interface{}, colType byte) interface{} {
	signed, ok := signedToInt64(v)
	if !ok {
		return v
	}
	switch colType {
	case mysql.MYSQL_TYPE_TINY:
		return uint8(signed)
	case mysql.MYSQL_TYPE_SHORT:
		return uint16(signed)
	case mysql.MYSQL_TYPE_INT24:
		// MEDIUMINT is 3 bytes: reinterpret then mask to 24 bits so e.g.
		// -1 becomes 16777215.
		return uint32(uint32(int32(signed)) & 0x00FFFFFF)
	case mysql.MYSQL_TYPE_LONG:
		return uint32(signed)
	case mysql.MYSQL_TYPE_LONGLONG:
		return uint64(signed)
	default:
		return v
	}
}
// signedToInt64 widens any signed integer value to int64. The second return
// is false for every non-signed-integer type (unsigned ints, strings, nil…).
func signedToInt64(v interface{}) (int64, bool) {
	switch n := v.(type) {
	case int:
		return int64(n), true
	case int8:
		return int64(n), true
	case int16:
		return int64(n), true
	case int32:
		return int64(n), true
	case int64:
		return n, true
	}
	return 0, false
}
// getCompressionTypeName maps a TRANSACTION_PAYLOAD compression code to a
// display name; the empty string means "not compressed".
func getCompressionTypeName(code uint64) string {
	switch code {
	case CompressionZSTD:
		return "ZSTD"
	case CompressionNone:
		return ""
	default:
		return fmt.Sprintf("UNKNOWN(%d)", code)
	}
}

View File

@ -0,0 +1,73 @@
package binlog
import (
"testing"
"github.com/starainrt/go-mysql/mysql"
"github.com/starainrt/go-mysql/replication"
)
// TestParseBinlogEvent_TableMapEvent verifies a TABLE_MAP event is converted
// to a single "tablemap" BinlogEvent carrying its schema and table name.
func TestParseBinlogEvent_TableMapEvent(t *testing.T) {
	ev := &replication.BinlogEvent{
		Header: &replication.EventHeader{EventType: replication.TABLE_MAP_EVENT},
		Event: &replication.TableMapEvent{
			Schema: []byte("db1"),
			Table:  []byte("tb1"),
		},
	}
	events := ParseBinlogEvent(ev)
	if len(events) != 1 {
		t.Fatalf("expected 1 event, got %d", len(events))
	}
	if events[0].Type != "tablemap" {
		t.Fatalf("expected tablemap event type, got %q", events[0].Type)
	}
	if events[0].DB != "db1" || events[0].TB != "tb1" {
		t.Fatalf("unexpected db/table: %s.%s", events[0].DB, events[0].TB)
	}
}
// TestParseBinlogEvent_TransactionPayloadContainsTableMap verifies that a
// TRANSACTION_PAYLOAD event expands recursively (tablemap + insert here) and
// that every expanded event is tagged with the payload's compression type.
func TestParseBinlogEvent_TransactionPayloadContainsTableMap(t *testing.T) {
	table := &replication.TableMapEvent{
		Schema:     []byte("db2"),
		Table:      []byte("tb2"),
		ColumnType: []byte{mysql.MYSQL_TYPE_LONG},
	}
	payload := &replication.TransactionPayloadEvent{
		CompressionType: CompressionZSTD,
		Events: []*replication.BinlogEvent{
			{
				Header: &replication.EventHeader{EventType: replication.TABLE_MAP_EVENT},
				Event:  table,
			},
			{
				Header: &replication.EventHeader{EventType: replication.WRITE_ROWS_EVENTv2},
				Event: &replication.RowsEvent{
					Table: table,
					Rows:  [][]interface{}{{int32(1)}},
				},
			},
		},
	}
	ev := &replication.BinlogEvent{
		Header: &replication.EventHeader{EventType: replication.TRANSACTION_PAYLOAD_EVENT},
		Event:  payload,
	}
	events := ParseBinlogEvent(ev)
	if len(events) != 2 {
		t.Fatalf("expected 2 events from payload, got %d", len(events))
	}
	if events[0].Type != "tablemap" {
		t.Fatalf("expected first payload event to be tablemap, got %q", events[0].Type)
	}
	if events[1].Type != "insert" {
		t.Fatalf("expected second payload event to be insert, got %q", events[1].Type)
	}
	if events[0].CompressionType != "ZSTD" || events[1].CompressionType != "ZSTD" {
		t.Fatalf("expected payload events to carry compression type, got %q/%q", events[0].CompressionType, events[1].CompressionType)
	}
}

View File

@ -0,0 +1,95 @@
package binlog
import (
"testing"
"github.com/starainrt/go-mysql/mysql"
"github.com/starainrt/go-mysql/replication"
)
// TestNormalizeRowsByUnsigned_AllIntegerKinds checks every integer column
// kind (TINY/SHORT/INT24/LONG/LONGLONG) is converted to its unsigned
// representation when the signedness bitmap marks all columns unsigned
// (0xF8 = top five bits set for the five columns).
func TestNormalizeRowsByUnsigned_AllIntegerKinds(t *testing.T) {
	event := &replication.RowsEvent{
		Table: &replication.TableMapEvent{
			ColumnCount:      5,
			ColumnType:       []byte{mysql.MYSQL_TYPE_TINY, mysql.MYSQL_TYPE_SHORT, mysql.MYSQL_TYPE_INT24, mysql.MYSQL_TYPE_LONG, mysql.MYSQL_TYPE_LONGLONG},
			SignednessBitmap: []byte{0xF8},
		},
		Rows: [][]interface{}{{int8(-1), int16(-2), int32(-1), int32(-1), int64(-1)}},
	}
	got := normalizeRowsByUnsigned(event)
	row := got[0]
	if v, ok := row[0].(uint8); !ok || v != 255 {
		t.Fatalf("tiny unsigned mismatch: %T %v", row[0], row[0])
	}
	if v, ok := row[1].(uint16); !ok || v != 65534 {
		t.Fatalf("short unsigned mismatch: %T %v", row[1], row[1])
	}
	if v, ok := row[2].(uint32); !ok || v != 16777215 {
		t.Fatalf("int24 unsigned mismatch: %T %v", row[2], row[2])
	}
	if v, ok := row[3].(uint32); !ok || v != 4294967295 {
		t.Fatalf("long unsigned mismatch: %T %v", row[3], row[3])
	}
	if v, ok := row[4].(uint64); !ok || v != 18446744073709551615 {
		t.Fatalf("longlong unsigned mismatch: %T %v", row[4], row[4])
	}
}
// TestNormalizeRowsByUnsigned_NoSignednessMetadata checks that values stay
// signed when the table map carries no signedness bitmap at all.
func TestNormalizeRowsByUnsigned_NoSignednessMetadata(t *testing.T) {
	event := &replication.RowsEvent{
		Table: &replication.TableMapEvent{
			ColumnCount: 1,
			ColumnType:  []byte{mysql.MYSQL_TYPE_LONGLONG},
		},
		Rows: [][]interface{}{{int64(-1)}},
	}
	got := normalizeRowsByUnsigned(event)
	if v, ok := got[0][0].(int64); !ok || v != -1 {
		t.Fatalf("value should remain signed when metadata missing: %T %v", got[0][0], got[0][0])
	}
}
// TestParseBinlogEvent_IncludeColumnMetadata checks that row events surface
// column types and per-column collation IDs: character columns get the
// default charset collation while non-character columns keep zero.
func TestParseBinlogEvent_IncludeColumnMetadata(t *testing.T) {
	event := &replication.RowsEvent{
		Table: &replication.TableMapEvent{
			ColumnCount:    3,
			ColumnType:     []byte{mysql.MYSQL_TYPE_VAR_STRING, mysql.MYSQL_TYPE_LONG, mysql.MYSQL_TYPE_VAR_STRING},
			DefaultCharset: []uint64{45}, // utf8mb4_general_ci
		},
		Rows: [][]interface{}{{"name", int32(1), "desc"}},
	}
	ev := &replication.BinlogEvent{
		Header: &replication.EventHeader{EventType: replication.WRITE_ROWS_EVENTv2},
		Event:  event,
	}
	events := ParseBinlogEvent(ev)
	if len(events) != 1 {
		t.Fatalf("expected 1 event, got %d", len(events))
	}
	got := events[0]
	if len(got.ColumnTypes) != 3 {
		t.Fatalf("unexpected column type length: %d", len(got.ColumnTypes))
	}
	if got.ColumnTypes[0] != int(mysql.MYSQL_TYPE_VAR_STRING) || got.ColumnTypes[1] != int(mysql.MYSQL_TYPE_LONG) {
		t.Fatalf("unexpected column types: %v", got.ColumnTypes)
	}
	if len(got.ColumnCollationIDs) != 3 {
		t.Fatalf("unexpected column collation length: %d", len(got.ColumnCollationIDs))
	}
	if got.ColumnCollationIDs[0] != 45 {
		t.Fatalf("unexpected collation for column 0: %d", got.ColumnCollationIDs[0])
	}
	if got.ColumnCollationIDs[2] != 45 {
		t.Fatalf("unexpected collation for column 2: %d", got.ColumnCollationIDs[2])
	}
	if got.ColumnCollationIDs[1] != 0 {
		t.Fatalf("non-character column should keep zero collation: %d", got.ColumnCollationIDs[1])
	}
}

572
parse_filter.go Normal file
View File

@ -0,0 +1,572 @@
package binlog
import (
"bufio"
"fmt"
"io"
"os"
"strings"
"b612.me/mysql/gtid"
"b612.me/staros"
"github.com/starainrt/go-mysql/replication"
)
// ParseBinlogWithFilter opens the binlog at path, optionally fast-forwards
// to byte offset pos, and streams every transaction passing filter to fx.
// include-tables and exclude-tables are mutually exclusive and rejected up
// front as a configuration error.
func ParseBinlogWithFilter(path string, pos int64, filter BinlogFilter, fx func(Transaction) bool) error {
	if hasConfiguredTablePatterns(filter.IncludeTables) && hasConfiguredTablePatterns(filter.ExcludeTables) {
		return fmt.Errorf("invalid filter: include-tables and exclude-tables cannot be set at the same time")
	}
	if !staros.Exists(path) {
		return os.ErrNotExist
	}
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()
	parser := replication.NewBinlogParser()
	parser.SetParseTime(false)
	parser.SetUseDecimal(false)
	if pos != 0 {
		// Seeking pre-parses up to the format description event first.
		if err := seekToPosition(f, parser, pos); err != nil {
			return err
		}
	} else {
		if err := validateBinlogHeader(f); err != nil {
			return err
		}
	}
	br := bufio.NewReaderSize(f, defaultReadBufSize)
	return parseBinlogWithFilter(br, parser, filter, fx)
}
// parseBinlogWithFilter streams binlog events from r, groups them into
// per-GTID transactions, and forwards every transaction that survives
// the filter to fn. Returning false from fn aborts the scan with nil error.
func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter BinlogFilter, fn func(Transaction) bool) error {
	// include-tables and exclude-tables are mutually exclusive (also
	// checked in the exported entry point; kept here so direct callers of
	// this unexported function get the same guarantee).
	if hasConfiguredTablePatterns(filter.IncludeTables) && hasConfiguredTablePatterns(filter.ExcludeTables) {
		return fmt.Errorf("invalid filter: include-tables and exclude-tables cannot be set at the same time")
	}
	var subGtid, inGtid, exGtid *gtid.Gtid
	var err error
	includeMatcher, excludeMatcher, err := prepareTableMatchers(filter)
	if err != nil {
		return err
	}
	if filter.IncludeGtid != "" {
		inGtid, err = gtid.Parse(filter.IncludeGtid)
		if err != nil {
			return fmt.Errorf("parse include gtid failed: %w", err)
		}
		// subGtid tracks which requested GTIDs are still outstanding so the
		// scan can stop early once all of them have been emitted.
		subGtid = inGtid.Clone()
	}
	if filter.ExcludeGtid != "" {
		exGtid, err = gtid.Parse(filter.ExcludeGtid)
		if err != nil {
			return fmt.Errorf("parse exclude gtid failed: %w", err)
		}
	}
	var (
		tbMapPos       uint32 // start offset of the latest TABLE_MAP event
		skipCurrentTxn bool   // true while events of a filtered-out txn stream by
		tx             Transaction
		headBuf        = make([]byte, replication.EventHeaderSize)
	)
	currentGtid := ""
	// callFn applies the transaction-level filters (time/position/size/DML
	// and table matching) and forwards surviving transactions to fn. It
	// returns false only when fn asks to stop.
	callFn := func(tx Transaction) bool {
		if fn == nil {
			return true
		}
		fillTimeLazy(&tx)
		if !filter.StartDate.IsZero() && filter.StartDate.After(tx.Time) {
			return true
		}
		if !filter.EndDate.IsZero() && filter.EndDate.Before(tx.Time) {
			return true
		}
		if filter.StartPos != 0 && filter.StartPos > tx.StartPos {
			return true
		}
		if filter.EndPos != 0 && filter.EndPos < tx.EndPos {
			return true
		}
		if filter.BigThan != 0 && filter.BigThan > tx.Size {
			return true
		}
		if filter.SmallThan != 0 && filter.SmallThan < tx.Size {
			return true
		}
		if !filter.OnlyShowGtid && filter.OnlyShowDML && tx.dmlEventCount == 0 {
			return true
		}
		txs, matched, pickAll, skipAll := selectVisibleTxDetails(tx, includeMatcher, excludeMatcher, filter)
		if pickAll {
			// PickTxAllIfMatch on include: hand over the whole, untrimmed txn.
			return fn(tx)
		}
		if skipAll {
			// PickTxAllIfMatch on exclude: drop the whole txn.
			return true
		}
		if matched {
			// Detail list was trimmed; keep the statistics consistent with it.
			tx.Txs = txs
			recomputeTxStatsFromVisibleDetails(&tx)
		}
		if !matched && includeMatcher != nil {
			return true
		}
		if len(tx.Txs) == 0 && matched {
			return true
		}
		return fn(tx)
	}
	for {
		h, err := readEventHeader(r, parser, headBuf)
		if err == io.EOF {
			// Flush the transaction still being assembled at end of file.
			if currentGtid != "" {
				finalizeTx(&tx, filter.OnlyShowGtid)
				callFn(tx)
			}
			return nil
		}
		if err != nil {
			return err
		}
		// GTID-only fast path
		if filter.OnlyShowGtid {
			if h.EventType != replication.GTID_EVENT && h.EventType != replication.ANONYMOUS_GTID_EVENT {
				// FORMAT_DESCRIPTION/TABLE_MAP must still be parsed so the
				// parser keeps its format/table metadata; everything else can
				// be skipped without decoding.
				if h.EventType == replication.FORMAT_DESCRIPTION_EVENT ||
					h.EventType == replication.TABLE_MAP_EVENT {
					body, err := readEventBody(r, h)
					if err != nil {
						return err
					}
					if _, err = parseEvent(parser, h, headBuf, body); err != nil {
						return err
					}
				} else {
					if err := skipEventBody(r, h); err != nil {
						return err
					}
				}
				continue
			}
			body, err := readEventBody(r, h)
			if err != nil {
				return err
			}
			e, err := parseEvent(parser, h, headBuf, body)
			if err != nil {
				return err
			}
			evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
			for _, ev := range evs {
				if ev.Type != "gtid" {
					continue
				}
				startPos := int(h.LogPos - h.EventSize)
				if filter.EndPos != 0 && startPos > filter.EndPos {
					continue
				}
				if filter.StartPos != 0 && startPos < filter.StartPos {
					continue
				}
				// A new GTID closes the previous transaction.
				if currentGtid != "" {
					tx.EndPos = startPos - 1
					finalizeTx(&tx, true)
					if !callFn(tx) {
						return nil
					}
					if subGtid != nil {
						// Stop early once every requested GTID has been seen.
						if err := subGtid.Sub(tx.GTID); err == nil && subGtid.EventCount() == 0 {
							return nil
						}
					}
					tx = Transaction{}
				}
				currentGtid = ev.Data
				if inGtid != nil {
					if c, _ := inGtid.Contain(ev.Data); !c {
						tx = Transaction{}
						currentGtid = ""
						continue
					}
				}
				if exGtid != nil {
					if c, _ := exGtid.Contain(ev.Data); c {
						currentGtid = ""
						tx = Transaction{}
						continue
					}
				}
				tx = Transaction{
					GTID:      ev.Data,
					StartPos:  startPos,
					EndPos:    startPos,
					Timestamp: int64(h.Timestamp),
				}
			}
			continue
		}
		// Handle GTID events first: they decide whether the upcoming
		// transaction is selected at all.
		if h.EventType == replication.GTID_EVENT || h.EventType == replication.ANONYMOUS_GTID_EVENT {
			body, err := readEventBody(r, h)
			if err != nil {
				return err
			}
			e, err := parseEvent(parser, h, headBuf, body)
			if err != nil {
				return err
			}
			evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
			for _, ev := range evs {
				if ev.Type != "gtid" {
					continue
				}
				startPos := int(h.LogPos - h.EventSize)
				if currentGtid != "" {
					finalizeTx(&tx, false)
					if !callFn(tx) {
						return nil
					}
					if subGtid != nil {
						if err := subGtid.Sub(tx.GTID); err == nil && subGtid.EventCount() == 0 {
							return nil
						}
					}
					tx = Transaction{}
				}
				currentGtid = ev.Data
				skipCurrentTxn = false
				if filter.EndPos != 0 && startPos > filter.EndPos {
					skipCurrentTxn = true
				}
				if filter.StartPos != 0 && startPos < filter.StartPos {
					skipCurrentTxn = true
				}
				if inGtid != nil {
					if c, _ := inGtid.Contain(ev.Data); !c {
						skipCurrentTxn = true
					}
				}
				if exGtid != nil {
					if c, _ := exGtid.Contain(ev.Data); c {
						skipCurrentTxn = true
					}
				}
				if !skipCurrentTxn {
					tx = Transaction{
						GTID:      ev.Data,
						StartPos:  startPos,
						Timestamp: int64(h.Timestamp),
						Txs:       make([]TxDetail, 0, 8),
						sqlOrigin: make([]string, 0, 4),
					}
				} else {
					tx = Transaction{}
				}
			}
			continue
		}
		// When the current transaction is filtered out, TABLE_MAP_EVENT
		// still has to be parsed (the parser caches table metadata);
		// all other events can be skipped safely.
		if skipCurrentTxn {
			if h.EventType == replication.TABLE_MAP_EVENT ||
				h.EventType == replication.FORMAT_DESCRIPTION_EVENT {
				body, err := readEventBody(r, h)
				if err != nil {
					return err
				}
				if _, err = parseEvent(parser, h, headBuf, body); err != nil {
					return err
				}
			} else {
				if err := skipEventBody(r, h); err != nil {
					return err
				}
			}
			continue
		}
		body, err := readEventBody(r, h)
		if err != nil {
			return err
		}
		e, err := parseEvent(parser, h, headBuf, body)
		if err != nil {
			return err
		}
		if h.EventType == replication.TABLE_MAP_EVENT {
			tbMapPos = h.LogPos - h.EventSize
		}
		evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
		for _, ev := range evs {
			// Row events are attributed to their preceding TABLE_MAP offset;
			// query/gtid events start at their own offset.
			startPos := 0
			if ev.Type == "query" || ev.Type == "gtid" {
				startPos = int(h.LogPos - h.EventSize)
			} else {
				startPos = int(tbMapPos)
			}
			switch ev.Type {
			case "":
				tx.EndPos = int(h.LogPos)
			case "tablemap":
				tx.EndPos = int(h.LogPos)
				tbMapPos = h.LogPos - h.EventSize
			case "rowsquery":
				tx.EndPos = int(h.LogPos)
				tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
			default:
				tx.EndPos = int(h.LogPos)
				// Track transaction state from BEGIN/COMMIT/ROLLBACK queries.
				if ev.Type == "query" {
					if equalFoldShort(ev.Data, "begin") {
						if tx.TxStartTime == 0 {
							tx.TxStartTime = int64(h.Timestamp)
						}
						tx.Status = STATUS_BEGIN
					} else if equalFoldShort(ev.Data, "commit") {
						tx.Status = STATUS_COMMIT
						tx.TxEndTime = int64(h.Timestamp)
					} else if equalFoldShort(ev.Data, "rollback") {
						tx.Status = STATUS_ROLLBACK
						tx.TxEndTime = int64(h.Timestamp)
					}
				}
				// Events carrying both db and table count as DML for
				// the OnlyShowDML filter.
				if ev.DB != "" && ev.TB != "" {
					tx.dmlEventCount++
				}
				tx.Txs = append(tx.Txs, TxDetail{
					StartPos:           startPos,
					EndPos:             int(h.LogPos),
					Db:                 ev.DB,
					Table:              ev.TB,
					Sql:                ev.Data,
					SqlType:            ev.Type,
					Rows:               ev.Rows,
					ColumnTypes:        ev.ColumnTypes,
					ColumnCollationIDs: ev.ColumnCollationIDs,
					RowCount:           int(ev.RowCnt),
					Timestamp:          int64(h.Timestamp),
					CompressionType:    ev.CompressionType,
				})
			}
		}
	}
}
// selectVisibleTxDetails applies table include/exclude matching to the
// transaction's detail list. It returns:
//   - txs:     the details that remain visible,
//   - matched: whether any detail (or blank detail, per Include/ExcludeBlank)
//     matched the configured patterns,
//   - pickAll: PickTxAllIfMatch hit on an include match — keep the whole txn,
//   - skipAll: PickTxAllIfMatch hit on an exclude match — drop the whole txn.
//
// Only one of includeMatcher/excludeMatcher can be non-nil (validated by
// the caller).
func selectVisibleTxDetails(tx Transaction, includeMatcher, excludeMatcher *tableMatcher, filter BinlogFilter) ([]TxDetail, bool, bool, bool) {
	txs := make([]TxDetail, 0, len(tx.Txs))
	matched := false
	for _, t := range tx.Txs {
		includeMatch := includeMatcher != nil && includeMatcher.match(t.Db, t.Table)
		excludeMatch := excludeMatcher != nil && excludeMatcher.match(t.Db, t.Table)
		// Blank details (no db/table, e.g. BEGIN/COMMIT markers) follow the
		// IncludeBlank/ExcludeBlank switches rather than pattern matching.
		if t.Db == "" && t.Table == "" {
			if includeMatcher != nil {
				if filter.IncludeBlank {
					matched = true
					if filter.PickTxAllIfMatch {
						return nil, true, true, false
					}
					txs = append(txs, t)
				}
				continue
			}
			if excludeMatcher != nil {
				if filter.ExcludeBlank {
					matched = true
					if filter.PickTxAllIfMatch {
						return nil, true, false, true
					}
					continue
				}
				txs = append(txs, t)
				continue
			}
			txs = append(txs, t)
			continue
		}
		if includeMatcher != nil {
			if includeMatch {
				matched = true
				if filter.PickTxAllIfMatch {
					return nil, true, true, false
				}
				txs = append(txs, t)
			}
			continue
		}
		if excludeMatcher != nil {
			if excludeMatch {
				matched = true
				if filter.PickTxAllIfMatch {
					return nil, true, false, true
				}
				continue
			}
			txs = append(txs, t)
			continue
		}
		// No matcher configured: everything stays visible.
		txs = append(txs, t)
	}
	return txs, matched, false, false
}
// prepareTableMatchers builds the include/exclude table matchers from the
// filter. A pattern list containing only blank entries is treated as
// unset — mirroring hasConfiguredTablePatterns, which the mutual-exclusion
// check already uses — so that e.g. IncludeTables=[" "] does not produce
// an empty matcher that would silently filter out every table.
func prepareTableMatchers(filter BinlogFilter) (includeMatcher, excludeMatcher *tableMatcher, err error) {
	if hasConfiguredTablePatterns(filter.IncludeTables) {
		includeMatcher, err = buildTableMatcher(filter.IncludeTables)
		if err != nil {
			return nil, nil, fmt.Errorf("invalid include-tables: %w", err)
		}
	}
	if hasConfiguredTablePatterns(filter.ExcludeTables) {
		excludeMatcher, err = buildTableMatcher(filter.ExcludeTables)
		if err != nil {
			return nil, nil, fmt.Errorf("invalid exclude-tables: %w", err)
		}
	}
	return includeMatcher, excludeMatcher, nil
}
// buildTableMatcher compiles db.table patterns into a tableMatcher.
// Accepted forms: "db.tb", "db.*", "*.tb", "*.*". A '*' may only stand
// for a whole segment; blank entries are ignored.
func buildTableMatcher(patterns []string) (*tableMatcher, error) {
	matcher := &tableMatcher{
		exactMatch: make(map[string]bool),
		dbWildcard: make(map[string]bool),
		tbWildcard: make(map[string]bool),
	}
	for _, raw := range patterns {
		normalized := strings.ToLower(strings.TrimSpace(raw))
		switch normalized {
		case "":
			continue
		case "*.*":
			matcher.matchAll = true
			continue
		}
		// Exactly one dot separating two non-empty segments is required.
		db, tb, ok := strings.Cut(normalized, ".")
		if !ok || db == "" || tb == "" || strings.Contains(tb, ".") {
			return nil, fmt.Errorf("invalid table pattern %q: expect db.table", strings.TrimSpace(raw))
		}
		if db != "*" && strings.Contains(db, "*") {
			return nil, fmt.Errorf("invalid table pattern %q: wildcard '*' must occupy full db segment", strings.TrimSpace(raw))
		}
		if tb != "*" && strings.Contains(tb, "*") {
			return nil, fmt.Errorf("invalid table pattern %q: wildcard '*' must occupy full table segment", strings.TrimSpace(raw))
		}
		switch {
		case db == "*" && tb == "*":
			matcher.matchAll = true
		case db == "*":
			matcher.tbWildcard[tb] = true
		case tb == "*":
			matcher.dbWildcard[db] = true
		default:
			matcher.exactMatch[db+"."+tb] = true
		}
	}
	return matcher, nil
}
// hasConfiguredTablePatterns reports whether patterns contains at least
// one non-blank entry.
func hasConfiguredTablePatterns(patterns []string) bool {
	for _, pattern := range patterns {
		if strings.TrimSpace(pattern) == "" {
			continue
		}
		return true
	}
	return false
}
// recomputeTxStatsFromVisibleDetails refreshes RowsCount, StartPos,
// EndPos and Size from the (possibly trimmed) detail list so the
// transaction statistics stay consistent after filtering.
func recomputeTxStatsFromVisibleDetails(tx *Transaction) {
	if tx == nil {
		return
	}
	if len(tx.Txs) == 0 {
		tx.RowsCount = 0
		tx.Size = 0
		return
	}
	// Seed min/max from the first detail, then fold in the rest.
	minStart := tx.Txs[0].StartPos
	maxEnd := tx.Txs[0].EndPos
	rows := 0
	for _, detail := range tx.Txs {
		rows += detail.RowCount
		if detail.StartPos < minStart {
			minStart = detail.StartPos
		}
		if detail.EndPos > maxEnd {
			maxEnd = detail.EndPos
		}
	}
	tx.RowsCount = rows
	tx.StartPos = minStart
	tx.EndPos = maxEnd
	tx.Size = 0
	if maxEnd > minStart {
		tx.Size = maxEnd - minStart
	}
}
// equalFoldShort reports whether s equals lower under ASCII
// case-insensitive comparison. lower must already be all-lowercase
// (it is only used with literals like "begin"/"commit"/"rollback").
func equalFoldShort(s, lower string) bool {
	if len(s) != len(lower) {
		return false
	}
	for i := range lower {
		c := s[i]
		if c >= 'A' && c <= 'Z' {
			c |= 0x20 // fold ASCII upper to lower
		}
		if c != lower[i] {
			return false
		}
	}
	return true
}

View File

@ -0,0 +1,85 @@
package binlog
import (
"strings"
"testing"
)
// TestBuildTableMatcher_InvalidPattern verifies that malformed patterns
// (missing segment, extra segment, partial wildcard) are rejected.
func TestBuildTableMatcher_InvalidPattern(t *testing.T) {
	cases := [][]string{
		{"db"},
		{"db."},
		{".tb"},
		{"db.tb.more"},
		{"db.t*"},
		{"d*.tb"},
	}
	for _, patterns := range cases {
		if _, err := buildTableMatcher(patterns); err == nil {
			t.Fatalf("expected invalid pattern error, got nil: %v", patterns)
		}
	}
}
// TestPrepareTableMatchers_ReturnErrorOnInvalidPattern checks that an
// invalid include pattern surfaces as a wrapped "invalid include-tables"
// error rather than being swallowed.
func TestPrepareTableMatchers_ReturnErrorOnInvalidPattern(t *testing.T) {
	_, _, err := prepareTableMatchers(BinlogFilter{IncludeTables: []string{"invalid"}})
	if err == nil {
		t.Fatal("expected include-tables error, got nil")
	}
	if !strings.Contains(err.Error(), "invalid include-tables") {
		t.Fatalf("unexpected error: %v", err)
	}
}
// TestSelectVisibleTxDetails_IncludeBlank verifies that with an include
// matcher and IncludeBlank=true, blank details (e.g. BEGIN markers) are
// kept alongside the matched table detail.
func TestSelectVisibleTxDetails_IncludeBlank(t *testing.T) {
	includeMatcher, err := buildTableMatcher([]string{"db1.tb1"})
	if err != nil {
		t.Fatalf("build include matcher failed: %v", err)
	}
	tx := Transaction{Txs: []TxDetail{
		{SqlType: "query", Sql: "BEGIN"},
		{SqlType: "insert", Db: "db1", Table: "tb1", RowCount: 1},
	}}
	txs, matched, pickAll, skipAll := selectVisibleTxDetails(tx, includeMatcher, nil, BinlogFilter{IncludeBlank: true})
	if !matched {
		t.Fatal("expected matched=true")
	}
	if pickAll || skipAll {
		t.Fatalf("unexpected pickAll/skipAll: %v/%v", pickAll, skipAll)
	}
	if len(txs) != 2 {
		t.Fatalf("expected 2 details with IncludeBlank=true, got %d", len(txs))
	}
	if txs[0].SqlType != "query" || txs[1].Table != "tb1" {
		t.Fatalf("unexpected details order/content: %#v", txs)
	}
}
// TestSelectVisibleTxDetails_ExcludeBlank verifies that with an exclude
// matcher and ExcludeBlank=true, blank details are dropped while
// non-matching table details stay visible.
func TestSelectVisibleTxDetails_ExcludeBlank(t *testing.T) {
	excludeMatcher, err := buildTableMatcher([]string{"db2.tb2"})
	if err != nil {
		t.Fatalf("build exclude matcher failed: %v", err)
	}
	tx := Transaction{Txs: []TxDetail{
		{SqlType: "query", Sql: "BEGIN"},
		{SqlType: "insert", Db: "db1", Table: "tb1", RowCount: 1},
	}}
	txs, matched, pickAll, skipAll := selectVisibleTxDetails(tx, nil, excludeMatcher, BinlogFilter{ExcludeBlank: true})
	if !matched {
		t.Fatal("expected matched=true when excluding blank detail")
	}
	if pickAll || skipAll {
		t.Fatalf("unexpected pickAll/skipAll: %v/%v", pickAll, skipAll)
	}
	if len(txs) != 1 {
		t.Fatalf("expected 1 detail after ExcludeBlank=true, got %d", len(txs))
	}
	if txs[0].Db != "db1" || txs[0].Table != "tb1" {
		t.Fatalf("unexpected remaining detail: %#v", txs[0])
	}
}

150
parse_io.go Normal file
View File

@ -0,0 +1,150 @@
package binlog
import (
"bytes"
"encoding/hex"
"fmt"
"io"
"os"
"sync"
"github.com/starainrt/go-mysql/replication"
)
// validateBinlogHeader checks that f begins with the 4-byte binlog magic
// and leaves the file positioned immediately after it.
func validateBinlogHeader(f *os.File) error {
	const fileTypeBytes = int64(4)
	b := make([]byte, fileTypeBytes)
	// io.ReadFull guards against short reads: a bare f.Read may legally
	// return fewer than 4 bytes without an error, which would make the
	// comparison below fail spuriously.
	if _, err := io.ReadFull(f, b); err != nil {
		return fmt.Errorf("read binlog header failed: %w", err)
	}
	if !bytes.Equal(b, replication.BinLogFileHeader) {
		return ErrInvalidBinlogHeader
	}
	if _, err := f.Seek(fileTypeBytes, io.SeekStart); err != nil {
		return fmt.Errorf("seek after header failed: %w", err)
	}
	return nil
}
// readEventHeader reads one event header from r into headBuf and parses
// it. io.EOF is returned unchanged when the stream ends cleanly at an
// event boundary, so callers can use it as the normal termination signal.
func readEventHeader(r io.Reader, parser *replication.BinlogParser, headBuf []byte) (*replication.EventHeader, error) {
	if _, err := io.ReadFull(r, headBuf); err != nil {
		return nil, err
	}
	h, err := parser.ParseHeader(headBuf)
	if err != nil {
		return nil, fmt.Errorf("parse header failed: %w", err)
	}
	// A valid event must be strictly larger than its own header.
	if h.EventSize <= uint32(replication.EventHeaderSize) {
		return nil, fmt.Errorf("%w: event size is %d", ErrEventTooSmall, h.EventSize)
	}
	return h, nil
}
// readEventBody reads the payload that follows an already-parsed event
// header and returns it as a freshly allocated slice.
func readEventBody(r io.Reader, h *replication.EventHeader) ([]byte, error) {
	need := int(h.EventSize) - replication.EventHeaderSize
	payload := make([]byte, need)
	if _, err := io.ReadFull(r, payload); err != nil {
		return nil, fmt.Errorf("read event body failed: %w (need %d bytes)", err, need)
	}
	return payload, nil
}
// skipEventBody discards the payload of the current event without
// materializing it in memory.
func skipEventBody(r io.Reader, h *replication.EventHeader) error {
	remain := int64(h.EventSize) - int64(replication.EventHeaderSize)
	if remain <= 0 {
		return nil
	}
	_, err := io.CopyN(io.Discard, r, remain)
	if err != nil {
		return fmt.Errorf("skip event body failed: %w", err)
	}
	return nil
}
// rawDataPool recycles scratch buffers used to rebuild the raw event
// bytes (header + body) handed to parser.ParseEvent. Buffers start at
// 64KB; oversized ones are rejected by putRawDataBuf.
var rawDataPool = sync.Pool{
	New: func() any {
		b := make([]byte, 0, 64*1024)
		return &b
	},
}
// getRawDataBuf returns a byte slice of length n, reusing a pooled
// buffer when its capacity suffices. An undersized pooled buffer is
// simply dropped (left to the GC) and a fresh one allocated.
func getRawDataBuf(n int) []byte {
	buf := rawDataPool.Get().(*[]byte)
	if cap(*buf) >= n {
		return (*buf)[:n]
	}
	return make([]byte, n)
}
// putRawDataBuf returns b to the pool for reuse. Buffers larger than
// maxPooledRawDataCap are dropped so the pool's memory stays bounded.
func putRawDataBuf(b []byte) {
	if cap(b) > maxPooledRawDataCap {
		return
	}
	trimmed := b[:0]
	rawDataPool.Put(&trimmed)
}
// formatBodyPreview renders a bounded hexadecimal preview of an event
// body for use in error messages. maxBytes <= 0 falls back to 256; a
// truncated preview is suffixed with "...".
func formatBodyPreview(body []byte, maxBytes int) string {
	if maxBytes <= 0 {
		maxBytes = 256
	}
	if len(body) == 0 {
		return "len=0"
	}
	n := len(body)
	suffix := ""
	if n > maxBytes {
		n = maxBytes
		suffix = "..."
	}
	return fmt.Sprintf("len=%d preview(hex,%dB)=%s%s", len(body), n, hex.EncodeToString(body[:n]), suffix)
}
// parseEvent reassembles the full raw event (header + body) in a pooled
// scratch buffer and has the parser decode it. On failure, the error
// message includes a bounded hex preview of the body for diagnostics.
func parseEvent(parser *replication.BinlogParser, h *replication.EventHeader, headBuf []byte, body []byte) (replication.Event, error) {
	rawLen := len(headBuf) + len(body)
	rawData := getRawDataBuf(rawLen)
	copy(rawData, headBuf)
	copy(rawData[len(headBuf):], body)
	e, err := parser.ParseEvent(h, body, rawData)
	// NOTE(review): recycling rawData here assumes ParseEvent does not
	// retain a reference to it — confirm against the replication package
	// before relying on the decoded event's raw bytes.
	putRawDataBuf(rawData)
	if err != nil {
		return nil, fmt.Errorf("parse event failed at pos %d: Header %+v, Body %s, Err: %w",
			h.LogPos, h, formatBodyPreview(body, 256), err)
	}
	return e, nil
}
// seekToPosition prepares the parser for decoding from an arbitrary
// offset: it replays events from the start of the file until the
// FORMAT_DESCRIPTION (or first GTID) event has been consumed — so the
// parser knows the binlog format — and then seeks the file to pos.
// NOTE(review): pos is trusted to be an exact event boundary; confirm
// callers only pass offsets obtained from a previous parse.
func seekToPosition(f *os.File, parser *replication.BinlogParser, pos int64) error {
	if err := validateBinlogHeader(f); err != nil {
		return err
	}
	headBuf := make([]byte, replication.EventHeaderSize)
	for {
		h, err := readEventHeader(f, parser, headBuf)
		if err != nil {
			return fmt.Errorf("seek to position failed: %w", err)
		}
		body, err := readEventBody(f, h)
		if err != nil {
			return err
		}
		// Events must be fed to the parser so it caches format metadata.
		_, err = parseEvent(parser, h, headBuf, body)
		if err != nil {
			return err
		}
		if h.EventType == replication.FORMAT_DESCRIPTION_EVENT || h.EventType == replication.GTID_EVENT {
			break
		}
	}
	if _, err := f.Seek(pos, io.SeekStart); err != nil {
		return fmt.Errorf("seek to pos %d failed: %w", pos, err)
	}
	return nil
}

33
parse_io_test.go Normal file
View File

@ -0,0 +1,33 @@
package binlog
import (
"strings"
"testing"
)
// TestFormatBodyPreview covers the empty, non-truncated and truncated
// rendering paths of formatBodyPreview.
func TestFormatBodyPreview(t *testing.T) {
	if got := formatBodyPreview(nil, 256); got != "len=0" {
		t.Fatalf("unexpected empty preview: %q", got)
	}
	small := []byte{0x01, 0x02, 0xAB}
	got := formatBodyPreview(small, 8)
	if !strings.Contains(got, "len=3") || !strings.Contains(got, "0102ab") {
		t.Fatalf("unexpected preview for small body: %q", got)
	}
	if strings.Contains(got, "...") {
		t.Fatalf("small body should not be truncated: %q", got)
	}
	large := make([]byte, 300)
	for i := range large {
		large[i] = byte(i)
	}
	got = formatBodyPreview(large, 16)
	if !strings.Contains(got, "len=300") || !strings.Contains(got, "preview(hex,16B)=") {
		t.Fatalf("unexpected preview for large body: %q", got)
	}
	if !strings.HasSuffix(got, "...") {
		t.Fatalf("large body should be truncated with ellipsis: %q", got)
	}
}

176
parse_stream.go Normal file
View File

@ -0,0 +1,176 @@
package binlog
import (
"bufio"
"io"
"os"
"time"
"b612.me/staros"
"github.com/starainrt/go-mysql/replication"
)
// ParseBinlogFile parses the binlog file at path from the beginning and
// invokes fx once per assembled transaction; returning false from fx
// stops parsing early.
func ParseBinlogFile(path string, fx func(transaction Transaction) bool) error {
	return parseOneBinlog(path, fx)
}
// parseOneBinlog opens path, validates the binlog magic and streams the
// rest of the file through parseBinlogDetail.
func parseOneBinlog(path string, fx func(Transaction) bool) error {
	if !staros.Exists(path) {
		return os.ErrNotExist
	}
	file, err := os.Open(path)
	if err != nil {
		return err
	}
	defer file.Close()
	if err = validateBinlogHeader(file); err != nil {
		return err
	}
	return parseBinlogDetail(bufio.NewReaderSize(file, defaultReadBufSize), fx)
}
// finalizeTx completes a transaction before it is handed to the caller:
// it backfills original SQL (captured from rows-query events) into the
// non-query details in order, accumulates RowsCount, and computes Size
// (forced to 0 in GTID-only mode).
func finalizeTx(tx *Transaction, onlyShowGtid bool) {
	next := 0
	for i := range tx.Txs {
		detail := &tx.Txs[i]
		if detail.SqlType != "query" && next < len(tx.sqlOrigin) {
			detail.Sql = tx.sqlOrigin[next]
			next++
		}
		tx.RowsCount += detail.RowCount
	}
	if onlyShowGtid {
		tx.Size = 0
		return
	}
	tx.Size = tx.EndPos - tx.StartPos
}
// fillTimeLazy derives the time.Time fields from the raw unix timestamps
// on demand, leaving already-populated values untouched.
func fillTimeLazy(tx *Transaction) {
	if tx.Time.IsZero() && tx.Timestamp != 0 {
		tx.Time = time.Unix(tx.Timestamp, 0)
	}
	for i := range tx.Txs {
		detail := &tx.Txs[i]
		if detail.Time.IsZero() && detail.Timestamp != 0 {
			detail.Time = time.Unix(detail.Timestamp, 0)
		}
	}
}
// parseBinlogDetail streams all events from r, assembles them into
// per-GTID transactions and invokes f for each completed transaction.
// Returning false from f stops parsing with a nil error.
func parseBinlogDetail(r io.Reader, f func(Transaction) bool) error {
	parser := replication.NewBinlogParser()
	parser.SetParseTime(false)
	parser.SetUseDecimal(false)
	var (
		tbMapPos uint32 // start offset of the latest TABLE_MAP event
		tx       Transaction
		headBuf  = make([]byte, replication.EventHeaderSize)
	)
	currentGtid := ""
	for {
		h, err := readEventHeader(r, parser, headBuf)
		if err == io.EOF {
			// Flush the transaction still being assembled at end of file.
			if currentGtid != "" {
				finalizeTx(&tx, false)
				fillTimeLazy(&tx)
				if f != nil {
					f(tx)
				}
			}
			return nil
		}
		if err != nil {
			return err
		}
		body, err := readEventBody(r, h)
		if err != nil {
			return err
		}
		e, err := parseEvent(parser, h, headBuf, body)
		if err != nil {
			return err
		}
		if h.EventType == replication.TABLE_MAP_EVENT {
			tbMapPos = h.LogPos - h.EventSize
		}
		evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
		for _, ev := range evs {
			// Row events are attributed to their preceding TABLE_MAP offset;
			// query/gtid events start at their own offset.
			startPos := 0
			if ev.Type == "query" || ev.Type == "gtid" {
				startPos = int(h.LogPos - h.EventSize)
			} else {
				startPos = int(tbMapPos)
			}
			switch ev.Type {
			case "gtid":
				// A new GTID closes the previous transaction.
				if currentGtid != "" {
					finalizeTx(&tx, false)
					fillTimeLazy(&tx)
					if f != nil && !f(tx) {
						return nil
					}
				}
				currentGtid = ev.Data
				tx = Transaction{
					GTID:      ev.Data,
					StartPos:  startPos,
					Timestamp: int64(h.Timestamp),
					Txs:       make([]TxDetail, 0, 8),
					sqlOrigin: make([]string, 0, 4),
				}
			case "":
				tx.EndPos = int(h.LogPos)
			case "tablemap":
				tx.EndPos = int(h.LogPos)
				tbMapPos = h.LogPos - h.EventSize
			case "rowsquery":
				tx.EndPos = int(h.LogPos)
				tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
			default:
				tx.EndPos = int(h.LogPos)
				// Track transaction state from BEGIN/COMMIT/ROLLBACK queries.
				if ev.Type == "query" {
					if equalFoldShort(ev.Data, "begin") {
						if tx.TxStartTime == 0 {
							tx.TxStartTime = int64(h.Timestamp)
						}
						tx.Status = STATUS_BEGIN
					} else if equalFoldShort(ev.Data, "commit") {
						tx.Status = STATUS_COMMIT
						tx.TxEndTime = int64(h.Timestamp)
					} else if equalFoldShort(ev.Data, "rollback") {
						tx.Status = STATUS_ROLLBACK
						tx.TxEndTime = int64(h.Timestamp)
					}
				}
				// Events carrying both db and table count as DML.
				if ev.DB != "" && ev.TB != "" {
					tx.dmlEventCount++
				}
				tx.Txs = append(tx.Txs, TxDetail{
					StartPos:           startPos,
					EndPos:             int(h.LogPos),
					Db:                 ev.DB,
					Table:              ev.TB,
					Sql:                ev.Data,
					SqlType:            ev.Type,
					Rows:               ev.Rows,
					ColumnTypes:        ev.ColumnTypes,
					ColumnCollationIDs: ev.ColumnCollationIDs,
					RowCount:           int(ev.RowCnt),
					Timestamp:          int64(h.Timestamp),
					CompressionType:    ev.CompressionType,
				})
			}
		}
	}
}

126
parse_types.go Normal file
View File

@ -0,0 +1,126 @@
package binlog
import (
"errors"
"strings"
"time"
)
var (
	// ErrInvalidBinlogHeader reports a file that does not start with the
	// 4-byte binlog magic.
	ErrInvalidBinlogHeader = errors.New("invalid binlog file header")
	// ErrEventTooSmall reports an event whose declared size cannot even
	// hold the event header.
	ErrEventTooSmall = errors.New("event size too small")
)

// Compression type codes surfaced on transactions.
// NOTE(review): values presumably mirror MySQL's transaction payload
// compression encoding — confirm against the event conversion code.
const (
	CompressionNone uint64 = 255
	CompressionZSTD uint64 = 0
)

const (
	maxPooledRawDataCap = 4 << 20 // 4MB — buffers above this are not returned to the pool
	defaultReadBufSize  = 1 << 20 // 1MB — bufio read buffer for binlog files
)
// TxDetail describes a single event (statement or row change) inside a
// transaction.
type TxDetail struct {
	StartPos           int             `json:"startPos"`  // byte offset where the event (or its TABLE_MAP) starts
	EndPos             int             `json:"endPos"`    // byte offset just past the event
	RowCount           int             `json:"rowCount"`  // number of rows affected by a row event
	Timestamp          int64           `json:"timestamp"` // unix timestamp from the event header
	Time               time.Time       `json:"time"`      // lazily derived from Timestamp
	Sql                string          `json:"sql"`       // SQL text (backfilled from rows-query events for row events)
	Db                 string          `json:"db"`
	Table              string          `json:"table"`
	SqlType            string          `json:"sqlType"` // e.g. "query", "insert", "tablemap", "rowsquery"
	CompressionType    string          `json:"compressionType"`
	Rows               [][]interface{} `json:"rows"` // decoded row images for row events
	ColumnTypes        []int           `json:"columnTypes,omitempty"`
	ColumnCollationIDs []uint64        `json:"columnCollationIds,omitempty"`
}
// Transaction status values, tracked from BEGIN/COMMIT/ROLLBACK query
// events while parsing.
const (
	STATUS_PREPARE  uint8 = iota // no explicit BEGIN observed yet
	STATUS_BEGIN                 // BEGIN observed
	STATUS_COMMIT                // COMMIT observed
	STATUS_ROLLBACK              // ROLLBACK observed
)
// Transaction aggregates all events belonging to a single GTID into one
// logical unit, together with position and timing statistics.
type Transaction struct {
	GTID        string    `json:"gtid"`
	Timestamp   int64     `json:"timestamp"` // unix timestamp of the GTID event
	Time        time.Time `json:"time"`      // lazily derived from Timestamp
	StartPos    int       `json:"startPos"`  // byte offset of the first event
	EndPos      int       `json:"endPos"`    // byte offset just past the last event
	Size        int       `json:"size"`      // EndPos - StartPos (0 in GTID-only mode)
	RowsCount   int       `json:"rowsCount"` // sum of RowCount over Txs
	Status      uint8     `json:"status"`    // STATUS_* state from BEGIN/COMMIT/ROLLBACK
	TxStartTime int64     `json:"txStartTime"`
	TxEndTime   int64     `json:"txEndTime"`
	// sqlOrigin holds original statements captured from rows-query events.
	// It is unexported, so encoding/json ignores it regardless of tags
	// (the former `json:"sqlOrigin"` tag was misleading dead weight and
	// has been removed). Use GetSqlOrigin to read it.
	sqlOrigin []string
	Txs       []TxDetail `json:"txs"`
	// dmlEventCount counts details carrying both a db and a table name;
	// used by the OnlyShowDML filter.
	dmlEventCount int
}
// GetSqlOrigin returns the original SQL statements captured from
// rows-query events for this transaction.
func (t Transaction) GetSqlOrigin() []string {
	return t.sqlOrigin
}
// BinlogFilter controls which transactions (and which details inside
// them) are passed to the caller. Zero values mean "no constraint".
// IncludeTables and ExcludeTables are mutually exclusive.
type BinlogFilter struct {
	IncludeGtid      string    // only transactions whose GTID is contained in this set
	ExcludeGtid      string    // drop transactions whose GTID is contained in this set
	IncludeTables    []string  // "db.table" patterns; '*' may replace a whole segment
	ExcludeTables    []string  // same pattern syntax as IncludeTables
	StartPos         int       // drop transactions starting before this offset
	EndPos           int       // drop transactions ending after this offset
	StartDate        time.Time // drop transactions before this time
	EndDate          time.Time // drop transactions after this time
	BigThan          int       // only transactions with Size >= BigThan
	SmallThan        int       // only transactions with Size <= SmallThan
	OnlyShowGtid     bool      // fast path: emit GTID/position info only
	OnlyShowDML      bool      // drop transactions without any db+table detail
	PickTxAllIfMatch bool      // on a table match, keep (include) or drop (exclude) the whole txn
	ExcludeBlank     bool      // with exclude-tables: also drop blank (no db/table) details
	IncludeBlank     bool      // with include-tables: also keep blank details
}
// BinlogEvent is the normalized representation of one parsed binlog
// event as produced by ParseBinlogEvent.
type BinlogEvent struct {
	Type               string          // e.g. "gtid", "query", "tablemap", "rowsquery", DML kinds
	DB                 string          // schema name, when applicable
	TB                 string          // table name, when applicable
	Data               string          // GTID string or SQL text depending on Type
	RowCnt             uint32          // number of rows for row events
	Rows               [][]interface{} // decoded row images
	ColumnTypes        []int
	ColumnCollationIDs []uint64
	CompressionType    string
}
// tableMatcher matches db/table pairs against compiled patterns
// (see buildTableMatcher).
type tableMatcher struct {
	exactMatch map[string]bool // "db.tb" exact pairs
	dbWildcard map[string]bool // dbs configured as "db.*"
	tbWildcard map[string]bool // tables configured as "*.tb"
	matchAll   bool            // "*.*" present
}
// match reports whether the db/table pair is covered by the configured
// patterns. Comparison is case-insensitive and whitespace-tolerant.
func (m *tableMatcher) match(db, tb string) bool {
	db = strings.ToLower(strings.TrimSpace(db))
	tb = strings.ToLower(strings.TrimSpace(tb))
	if m.matchAll {
		return true
	}
	if m.dbWildcard[db] || m.tbWildcard[tb] {
		return true
	}
	if len(m.exactMatch) > 0 {
		// Build the "db.tb" key in a stack buffer; Go elides the
		// string([]byte) allocation in map lookups, so this stays
		// allocation-free for keys up to 128 bytes (longer keys fall
		// back to a heap-grown append, still correct).
		var buf [128]byte
		key := buf[:0]
		key = append(key, db...)
		key = append(key, '.')
		key = append(key, tb...)
		if m.exactMatch[string(key)] {
			return true
		}
	}
	return false
}