add more info

master
兔子 2 years ago
parent cc1434ceef
commit b981b331e8

@ -29,10 +29,14 @@ type Transaction struct {
EndPos int EndPos int
Size int Size int
RowsCount int RowsCount int
SQLOrigin string sqlOrigin []string
Txs []TxDetail Txs []TxDetail
} }
func (t Transaction) GetSqlOrigin() []string {
return t.sqlOrigin
}
func ParseBinlogFile(path string, fx func(transaction Transaction)) error { func ParseBinlogFile(path string, fx func(transaction Transaction)) error {
return parseOneBinlog(path, fx) return parseOneBinlog(path, fx)
} }
@ -85,6 +89,9 @@ func parseBinlogDetail(r io.Reader, f func(Transaction)) error {
for { for {
headBuf := make([]byte, replication.EventHeaderSize) headBuf := make([]byte, replication.EventHeaderSize)
if _, err = io.ReadFull(r, headBuf); err == io.EOF { if _, err = io.ReadFull(r, headBuf); err == io.EOF {
if tx.EndPos != 0 {
f(tx)
}
return nil return nil
} else if err != nil { } else if err != nil {
return err return err
@ -145,15 +152,21 @@ func parseBinlogDetail(r io.Reader, f func(Transaction)) error {
// Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType} // Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType}
} else { } else {
startPos = int(tbMapPos) startPos = int(tbMapPos)
//fmt.Println(h.Timestamp, tbMapPos, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType) //fmt.Println(h.Timestamp, t bMapPos, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType)
// cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: tbMapPos, StopPos: h.LogPos, // cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: tbMapPos, StopPos: h.LogPos,
// Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType} // Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType}
} }
switch sqlType { switch sqlType {
case "gtid": case "gtid":
if currentGtid == "" { if currentGtid != "" {
for _, v := range tx.Txs { idx := 0
for k, v := range tx.Txs {
if v.SqlType != "query" && len(tx.sqlOrigin) > idx {
v.Sql = tx.sqlOrigin[idx]
idx++
}
tx.RowsCount += v.RowCount tx.RowsCount += v.RowCount
tx.Txs[k] = v
} }
tx.Size = tx.EndPos - tx.StartPos tx.Size = tx.EndPos - tx.StartPos
if f != nil { if f != nil {
@ -172,7 +185,7 @@ func parseBinlogDetail(r io.Reader, f func(Transaction)) error {
continue continue
case "rowsquery": case "rowsquery":
tx.EndPos = int(h.LogPos) tx.EndPos = int(h.LogPos)
tx.SQLOrigin = sql tx.sqlOrigin = append(tx.sqlOrigin, sql)
default: default:
tx.EndPos = int(h.LogPos) tx.EndPos = int(h.LogPos)
tx.Txs = append(tx.Txs, TxDetail{ tx.Txs = append(tx.Txs, TxDetail{

@ -1,9 +1,12 @@
package binlog package binlog
import ( import (
"fmt"
"testing" "testing"
) )
func TestParse(t *testing.T) { func TestParse(t *testing.T) {
ParseBinlogFile("./test/test-mysql-bin ", func(transaction Transaction) {
fmt.Println(transaction)
})
} }

Loading…
Cancel
Save