package binlog

import (
	"bufio"
	"io"
	"os"
	"time"

	"b612.me/staros"
	"github.com/starainrt/go-mysql/replication"
)

// ParseBinlogFile parses the binlog file at path and calls fx once for every
// transaction found in it. A transaction is the run of events that starts at a
// "gtid" event and lasts until the next one (or the end of the file).
// Parsing stops early, without an error, as soon as fx returns false.
func ParseBinlogFile(path string, fx func(transaction Transaction) bool) error {
	return parseOneBinlog(path, fx)
}

// parseOneBinlog opens path, checks its header with validateBinlogHeader and
// feeds the rest of the file to parseBinlogDetail through a buffered reader.
//
// It returns the bare os.ErrNotExist sentinel when staros.Exists reports that
// path is missing; other failures are returned as produced by the failing step.
func parseOneBinlog(path string, fx func(Transaction) bool) error {
	// Kept ahead of os.Open so callers keep receiving the plain sentinel
	// rather than the *os.PathError that os.Open would produce.
	if !staros.Exists(path) {
		return os.ErrNotExist
	}

	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()

	if err := validateBinlogHeader(f); err != nil {
		return err
	}

	br := bufio.NewReaderSize(f, defaultReadBufSize)
	return parseBinlogDetail(br, fx)
}

// finalizeTx completes the derived fields of tx before it is handed out.
//
// The SQL texts collected in tx.sqlOrigin are assigned, in order, to the
// details whose SqlType is not "query"; the row counts of all details are
// added to tx.RowsCount; and tx.Size is set to EndPos-StartPos, or to 0 when
// onlyShowGtid is true.
func finalizeTx(tx *Transaction, onlyShowGtid bool) {
	next := 0 // index of the next unused entry in tx.sqlOrigin
	for i := range tx.Txs {
		d := &tx.Txs[i] // update in place instead of copying the element out and back
		if d.SqlType != "query" && next < len(tx.sqlOrigin) {
			d.Sql = tx.sqlOrigin[next]
			next++
		}
		tx.RowsCount += d.RowCount
	}

	if onlyShowGtid {
		tx.Size = 0
		return
	}
	tx.Size = tx.EndPos - tx.StartPos
}

// fillTimeLazy derives the Time fields of tx and of each of its details from
// the corresponding Timestamp (seconds passed to time.Unix). Entries whose
// Timestamp is 0 or whose Time is already set are left untouched.
func fillTimeLazy(tx *Transaction) {
	if tx.Timestamp != 0 && tx.Time.IsZero() {
		tx.Time = time.Unix(tx.Timestamp, 0)
	}
	for i := range tx.Txs {
		d := &tx.Txs[i]
		if d.Timestamp != 0 && d.Time.IsZero() {
			d.Time = time.Unix(d.Timestamp, 0)
		}
	}
}

// parseBinlogDetail reads binlog events from r until EOF, groups them into
// transactions and passes each finished transaction to f.
//
// A transaction is opened by every "gtid" event and is delivered when the next
// "gtid" event arrives or when r reaches io.EOF. Only transactions opened by a
// "gtid" event with non-empty data are delivered. If f returns false parsing
// stops and nil is returned. A nil f is allowed; the input is then parsed
// without delivering anything. Read and parse errors are returned unchanged.
func parseBinlogDetail(r io.Reader, f func(Transaction) bool) error {
	parser := replication.NewBinlogParser()
	parser.SetParseTime(false)
	parser.SetUseDecimal(false)

	var (
		tbMapPos    uint32 // start offset of the most recent table-map event
		tx          Transaction
		currentGtid string
		headBuf     = make([]byte, replication.EventHeaderSize)
	)

	// emit finalizes the open transaction and hands it to f. It reports
	// whether parsing should continue.
	emit := func() bool {
		finalizeTx(&tx, false)
		fillTimeLazy(&tx)
		return f == nil || f(tx)
	}

	for {
		h, err := readEventHeader(r, parser, headBuf)
		if err == io.EOF {
			if currentGtid != "" {
				// Nothing follows, so the callback's verdict is irrelevant.
				emit()
			}
			return nil
		}
		if err != nil {
			return err
		}

		body, err := readEventBody(r, h)
		if err != nil {
			return err
		}

		e, err := parseEvent(parser, h, headBuf, body)
		if err != nil {
			return err
		}

		evStart := h.LogPos - h.EventSize
		evEnd := int(h.LogPos)
		if h.EventType == replication.TABLE_MAP_EVENT {
			tbMapPos = evStart
		}

		evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
		for _, ev := range evs {
			switch ev.Type {
			case "gtid":
				if currentGtid != "" && !emit() {
					return nil
				}
				currentGtid = ev.Data
				tx = Transaction{
					GTID:     ev.Data,
					StartPos: int(evStart),
					// Start with the end of the GTID event itself so that a
					// transaction without further events does not end up with
					// EndPos 0 and a negative Size.
					EndPos:    evEnd,
					Timestamp: int64(h.Timestamp),
					Txs:       make([]TxDetail, 0, 8),
					sqlOrigin: make([]string, 0, 4),
				}
				continue
			case "tablemap":
				tbMapPos = evStart
			}

			// Without an open GTID the transaction is never delivered (it is
			// replaced at the next "gtid" event and skipped at EOF), so do not
			// accumulate rows and SQL for it.
			if currentGtid == "" {
				continue
			}

			tx.EndPos = evEnd
			switch ev.Type {
			case "", "tablemap":
				// Only advances the end position.
			case "rowsquery":
				tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
			default:
				// Query events start at their own offset; every other detail
				// is anchored at the preceding table-map event.
				startPos := int(tbMapPos)
				if ev.Type == "query" {
					startPos = int(evStart)
					switch {
					case equalFoldShort(ev.Data, "begin"):
						if tx.TxStartTime == 0 {
							tx.TxStartTime = int64(h.Timestamp)
						}
						tx.Status = STATUS_BEGIN
					case equalFoldShort(ev.Data, "commit"):
						tx.Status = STATUS_COMMIT
						tx.TxEndTime = int64(h.Timestamp)
					case equalFoldShort(ev.Data, "rollback"):
						tx.Status = STATUS_ROLLBACK
						tx.TxEndTime = int64(h.Timestamp)
					}
				}
				if ev.DB != "" && ev.TB != "" {
					tx.dmlEventCount++
				}
				tx.Txs = append(tx.Txs, TxDetail{
					StartPos:           startPos,
					EndPos:             evEnd,
					Db:                 ev.DB,
					Table:              ev.TB,
					Sql:                ev.Data,
					SqlType:            ev.Type,
					Rows:               ev.Rows,
					ColumnTypes:        ev.ColumnTypes,
					ColumnCollationIDs: ev.ColumnCollationIDs,
					RowCount:           int(ev.RowCnt),
					Timestamp:          int64(h.Timestamp),
					CompressionType:    ev.CompressionType,
				})
			}
		}
	}
}