mysqlbinlog/parse_stream.go

177 lines
3.8 KiB
Go
Raw Permalink Normal View History

package binlog
import (
"bufio"
"io"
"os"
"time"
"b612.me/staros"
"github.com/starainrt/go-mysql/replication"
)
func ParseBinlogFile(path string, fx func(transaction Transaction) bool) error {
return parseOneBinlog(path, fx)
}
func parseOneBinlog(path string, fx func(Transaction) bool) error {
if !staros.Exists(path) {
return os.ErrNotExist
}
f, err := os.Open(path)
if err != nil {
return err
}
defer f.Close()
if err := validateBinlogHeader(f); err != nil {
return err
}
br := bufio.NewReaderSize(f, defaultReadBufSize)
return parseBinlogDetail(br, fx)
}
func finalizeTx(tx *Transaction, onlyShowGtid bool) {
idx := 0
for k, v := range tx.Txs {
if v.SqlType != "query" && len(tx.sqlOrigin) > idx {
v.Sql = tx.sqlOrigin[idx]
idx++
}
tx.RowsCount += v.RowCount
tx.Txs[k] = v
}
if onlyShowGtid {
tx.Size = 0
} else {
tx.Size = tx.EndPos - tx.StartPos
}
}
func fillTimeLazy(tx *Transaction) {
if tx.Timestamp != 0 && tx.Time.IsZero() {
tx.Time = time.Unix(tx.Timestamp, 0)
}
for i := range tx.Txs {
if tx.Txs[i].Timestamp != 0 && tx.Txs[i].Time.IsZero() {
tx.Txs[i].Time = time.Unix(tx.Txs[i].Timestamp, 0)
}
}
}
func parseBinlogDetail(r io.Reader, f func(Transaction) bool) error {
parser := replication.NewBinlogParser()
parser.SetParseTime(false)
parser.SetUseDecimal(false)
var (
tbMapPos uint32
tx Transaction
headBuf = make([]byte, replication.EventHeaderSize)
)
currentGtid := ""
for {
h, err := readEventHeader(r, parser, headBuf)
if err == io.EOF {
if currentGtid != "" {
finalizeTx(&tx, false)
fillTimeLazy(&tx)
if f != nil {
f(tx)
}
}
return nil
}
if err != nil {
return err
}
body, err := readEventBody(r, h)
if err != nil {
return err
}
e, err := parseEvent(parser, h, headBuf, body)
if err != nil {
return err
}
if h.EventType == replication.TABLE_MAP_EVENT {
tbMapPos = h.LogPos - h.EventSize
}
evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
for _, ev := range evs {
startPos := 0
if ev.Type == "query" || ev.Type == "gtid" {
startPos = int(h.LogPos - h.EventSize)
} else {
startPos = int(tbMapPos)
}
switch ev.Type {
case "gtid":
if currentGtid != "" {
finalizeTx(&tx, false)
fillTimeLazy(&tx)
if f != nil && !f(tx) {
return nil
}
}
currentGtid = ev.Data
tx = Transaction{
GTID: ev.Data,
StartPos: startPos,
Timestamp: int64(h.Timestamp),
Txs: make([]TxDetail, 0, 8),
sqlOrigin: make([]string, 0, 4),
}
case "":
tx.EndPos = int(h.LogPos)
case "tablemap":
tx.EndPos = int(h.LogPos)
tbMapPos = h.LogPos - h.EventSize
case "rowsquery":
tx.EndPos = int(h.LogPos)
tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
default:
tx.EndPos = int(h.LogPos)
if ev.Type == "query" {
if equalFoldShort(ev.Data, "begin") {
if tx.TxStartTime == 0 {
tx.TxStartTime = int64(h.Timestamp)
}
tx.Status = STATUS_BEGIN
} else if equalFoldShort(ev.Data, "commit") {
tx.Status = STATUS_COMMIT
tx.TxEndTime = int64(h.Timestamp)
} else if equalFoldShort(ev.Data, "rollback") {
tx.Status = STATUS_ROLLBACK
tx.TxEndTime = int64(h.Timestamp)
}
}
if ev.DB != "" && ev.TB != "" {
tx.dmlEventCount++
}
tx.Txs = append(tx.Txs, TxDetail{
StartPos: startPos,
EndPos: int(h.LogPos),
Db: ev.DB,
Table: ev.TB,
Sql: ev.Data,
SqlType: ev.Type,
Rows: ev.Rows,
ColumnTypes: ev.ColumnTypes,
ColumnCollationIDs: ev.ColumnCollationIDs,
RowCount: int(ev.RowCnt),
Timestamp: int64(h.Timestamp),
CompressionType: ev.CompressionType,
})
}
}
}
}