- 将臃肿的 parse.go 按职责拆分为多个模块:
  parse_types.go、parse_io.go、parse_event_convert.go、parse_stream.go、parse_filter.go
- parse.go 保留为模块入口说明,提升可维护性与可读性
- 修复事务状态被覆盖问题(BEGIN/COMMIT/ROLLBACK 不再被重置为 PREPARE)
- 增加 include-tables 与 exclude-tables 互斥校验,同时配置时直接报配置错误
- 强化表匹配器模式校验,并补充非法模式测试
- 在明细过滤后重算事务统计(RowsCount/StartPos/EndPos/Size),避免统计失真
- 增加 TABLE_MAP 事件转换,补充列元信息透传(ColumnTypes/ColumnCollationIDs)
- 基于 unsigned 元数据规范化行值,避免无符号整型被渲染为负数
- 优化事件解析报错信息:增加有界 body 十六进制预览
- 补充单元测试:payload/tablemap 转换、unsigned 规范化、过滤逻辑、IO 预览
177 lines
3.8 KiB
Go
177 lines
3.8 KiB
Go
package binlog
|
|
|
|
import (
|
|
"bufio"
|
|
"io"
|
|
"os"
|
|
"time"
|
|
|
|
"b612.me/staros"
|
|
"github.com/starainrt/go-mysql/replication"
|
|
)
|
|
|
|
func ParseBinlogFile(path string, fx func(transaction Transaction) bool) error {
|
|
return parseOneBinlog(path, fx)
|
|
}
|
|
|
|
func parseOneBinlog(path string, fx func(Transaction) bool) error {
|
|
if !staros.Exists(path) {
|
|
return os.ErrNotExist
|
|
}
|
|
f, err := os.Open(path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer f.Close()
|
|
|
|
if err := validateBinlogHeader(f); err != nil {
|
|
return err
|
|
}
|
|
|
|
br := bufio.NewReaderSize(f, defaultReadBufSize)
|
|
return parseBinlogDetail(br, fx)
|
|
}
|
|
|
|
func finalizeTx(tx *Transaction, onlyShowGtid bool) {
|
|
idx := 0
|
|
for k, v := range tx.Txs {
|
|
if v.SqlType != "query" && len(tx.sqlOrigin) > idx {
|
|
v.Sql = tx.sqlOrigin[idx]
|
|
idx++
|
|
}
|
|
tx.RowsCount += v.RowCount
|
|
tx.Txs[k] = v
|
|
}
|
|
|
|
if onlyShowGtid {
|
|
tx.Size = 0
|
|
} else {
|
|
tx.Size = tx.EndPos - tx.StartPos
|
|
}
|
|
}
|
|
|
|
func fillTimeLazy(tx *Transaction) {
|
|
if tx.Timestamp != 0 && tx.Time.IsZero() {
|
|
tx.Time = time.Unix(tx.Timestamp, 0)
|
|
}
|
|
for i := range tx.Txs {
|
|
if tx.Txs[i].Timestamp != 0 && tx.Txs[i].Time.IsZero() {
|
|
tx.Txs[i].Time = time.Unix(tx.Txs[i].Timestamp, 0)
|
|
}
|
|
}
|
|
}
|
|
|
|
func parseBinlogDetail(r io.Reader, f func(Transaction) bool) error {
|
|
parser := replication.NewBinlogParser()
|
|
parser.SetParseTime(false)
|
|
parser.SetUseDecimal(false)
|
|
|
|
var (
|
|
tbMapPos uint32
|
|
tx Transaction
|
|
headBuf = make([]byte, replication.EventHeaderSize)
|
|
)
|
|
currentGtid := ""
|
|
|
|
for {
|
|
h, err := readEventHeader(r, parser, headBuf)
|
|
if err == io.EOF {
|
|
if currentGtid != "" {
|
|
finalizeTx(&tx, false)
|
|
fillTimeLazy(&tx)
|
|
if f != nil {
|
|
f(tx)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
body, err := readEventBody(r, h)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
e, err := parseEvent(parser, h, headBuf, body)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if h.EventType == replication.TABLE_MAP_EVENT {
|
|
tbMapPos = h.LogPos - h.EventSize
|
|
}
|
|
|
|
evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
|
|
for _, ev := range evs {
|
|
startPos := 0
|
|
if ev.Type == "query" || ev.Type == "gtid" {
|
|
startPos = int(h.LogPos - h.EventSize)
|
|
} else {
|
|
startPos = int(tbMapPos)
|
|
}
|
|
|
|
switch ev.Type {
|
|
case "gtid":
|
|
if currentGtid != "" {
|
|
finalizeTx(&tx, false)
|
|
fillTimeLazy(&tx)
|
|
if f != nil && !f(tx) {
|
|
return nil
|
|
}
|
|
}
|
|
currentGtid = ev.Data
|
|
tx = Transaction{
|
|
GTID: ev.Data,
|
|
StartPos: startPos,
|
|
Timestamp: int64(h.Timestamp),
|
|
Txs: make([]TxDetail, 0, 8),
|
|
sqlOrigin: make([]string, 0, 4),
|
|
}
|
|
case "":
|
|
tx.EndPos = int(h.LogPos)
|
|
case "tablemap":
|
|
tx.EndPos = int(h.LogPos)
|
|
tbMapPos = h.LogPos - h.EventSize
|
|
case "rowsquery":
|
|
tx.EndPos = int(h.LogPos)
|
|
tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
|
|
default:
|
|
tx.EndPos = int(h.LogPos)
|
|
if ev.Type == "query" {
|
|
if equalFoldShort(ev.Data, "begin") {
|
|
if tx.TxStartTime == 0 {
|
|
tx.TxStartTime = int64(h.Timestamp)
|
|
}
|
|
tx.Status = STATUS_BEGIN
|
|
} else if equalFoldShort(ev.Data, "commit") {
|
|
tx.Status = STATUS_COMMIT
|
|
tx.TxEndTime = int64(h.Timestamp)
|
|
} else if equalFoldShort(ev.Data, "rollback") {
|
|
tx.Status = STATUS_ROLLBACK
|
|
tx.TxEndTime = int64(h.Timestamp)
|
|
}
|
|
}
|
|
if ev.DB != "" && ev.TB != "" {
|
|
tx.dmlEventCount++
|
|
}
|
|
tx.Txs = append(tx.Txs, TxDetail{
|
|
StartPos: startPos,
|
|
EndPos: int(h.LogPos),
|
|
Db: ev.DB,
|
|
Table: ev.TB,
|
|
Sql: ev.Data,
|
|
SqlType: ev.Type,
|
|
Rows: ev.Rows,
|
|
ColumnTypes: ev.ColumnTypes,
|
|
ColumnCollationIDs: ev.ColumnCollationIDs,
|
|
RowCount: int(ev.RowCnt),
|
|
Timestamp: int64(h.Timestamp),
|
|
CompressionType: ev.CompressionType,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
}
|