diff --git a/parse.go b/parse.go index fe7a19a..5627d76 100644 --- a/parse.go +++ b/parse.go @@ -29,10 +29,14 @@ type Transaction struct { EndPos int Size int RowsCount int - SQLOrigin string + sqlOrigin []string Txs []TxDetail } +func (t Transaction) GetSqlOrigin() []string { + return t.sqlOrigin +} + func ParseBinlogFile(path string, fx func(transaction Transaction)) error { return parseOneBinlog(path, fx) } @@ -85,6 +89,9 @@ func parseBinlogDetail(r io.Reader, f func(Transaction)) error { for { headBuf := make([]byte, replication.EventHeaderSize) if _, err = io.ReadFull(r, headBuf); err == io.EOF { + if tx.EndPos != 0 { + f(tx) + } return nil } else if err != nil { return err @@ -145,15 +152,21 @@ func parseBinlogDetail(r io.Reader, f func(Transaction)) error { // Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType} } else { startPos = int(tbMapPos) - //fmt.Println(h.Timestamp, tbMapPos, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType) + //fmt.Println(h.Timestamp, t bMapPos, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType) // cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: tbMapPos, StopPos: h.LogPos, // Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType} } switch sqlType { case "gtid": - if currentGtid == "" { - for _, v := range tx.Txs { + if currentGtid != "" { + idx := 0 + for k, v := range tx.Txs { + if v.SqlType != "query" && len(tx.sqlOrigin) > idx { + v.Sql = tx.sqlOrigin[idx] + idx++ + } tx.RowsCount += v.RowCount + tx.Txs[k] = v } tx.Size = tx.EndPos - tx.StartPos if f != nil { @@ -172,7 +185,7 @@ func parseBinlogDetail(r io.Reader, f func(Transaction)) error { continue case "rowsquery": tx.EndPos = int(h.LogPos) - tx.SQLOrigin = sql + tx.sqlOrigin = append(tx.sqlOrigin, sql) default: tx.EndPos = int(h.LogPos) tx.Txs = append(tx.Txs, TxDetail{ diff --git a/parse_test.go b/parse_test.go index 745f89f..d481528 100644 --- a/parse_test.go +++ b/parse_test.go @@ -1,9 +1,12 @@ package binlog import ( + "fmt" "testing" ) func TestParse(t *testing.T) { - + ParseBinlogFile("./test/test-mysql-bin ", func(transaction Transaction) { + fmt.Println(transaction) + }) }