add rows event

master
兔子 1 year ago
parent eec96dd25e
commit 0f74dc3dc8

@ -21,6 +21,7 @@ type TxDetail struct {
Db string Db string
Table string Table string
SqlType string SqlType string
Rows [][]interface{}
} }
type Transaction struct { type Transaction struct {
@ -85,6 +86,7 @@ func parseBinlogDetail(r io.Reader, f func(Transaction)) error {
sqlType string = "" sqlType string = ""
rowCnt uint32 = 0 rowCnt uint32 = 0
tbMapPos uint32 = 0 tbMapPos uint32 = 0
rows [][]interface{}
) )
var tx Transaction var tx Transaction
currentGtid := "" currentGtid := ""
@ -155,7 +157,7 @@ func parseBinlogDetail(r io.Reader, f func(Transaction)) error {
//binEvent := &replication.BinlogEvent{RawData: rawData, Header: h, Event: e} //binEvent := &replication.BinlogEvent{RawData: rawData, Header: h, Event: e}
binEvent := &replication.BinlogEvent{Header: h, Event: e} // we donnot need raw data binEvent := &replication.BinlogEvent{Header: h, Event: e} // we donnot need raw data
db, tb, sqlType, sql, rowCnt = GetDbTbAndQueryAndRowCntFromBinevent(binEvent) db, tb, sqlType, sql, rowCnt, rows = GetDbTbAndQueryAndRowCntFromBinevent(binEvent)
startPos := 0 startPos := 0
if sqlType == "query" || sqlType == "gtid" { if sqlType == "query" || sqlType == "gtid" {
startPos = int(h.LogPos - h.EventSize) startPos = int(h.LogPos - h.EventSize)
@ -207,6 +209,7 @@ func parseBinlogDetail(r io.Reader, f func(Transaction)) error {
Table: tb, Table: tb,
Sql: sql, Sql: sql,
SqlType: sqlType, SqlType: sqlType,
Rows: rows,
RowCount: int(rowCnt), RowCount: int(rowCnt),
Timestamp: int64(h.Timestamp), Timestamp: int64(h.Timestamp),
Time: time.Unix(int64(h.Timestamp), 0), Time: time.Unix(int64(h.Timestamp), 0),
@ -216,17 +219,21 @@ func parseBinlogDetail(r io.Reader, f func(Transaction)) error {
} }
func GetDbTbAndQueryAndRowCntFromBinevent(ev *replication.BinlogEvent) (string, string, string, string, uint32) { func GetDbTbAndQueryAndRowCntFromBinevent(ev *replication.BinlogEvent) (string, string, string, string, uint32, [][]interface{}) {
var ( var (
db string = "" db string = ""
tb string = "" tb string = ""
sql string = "" sql string = ""
sqlType string = "" sqlType string = ""
rowCnt uint32 = 0 rowCnt uint32 = 0
rows [][]interface{}
) )
switch ev.Header.EventType { switch ev.Header.EventType {
case replication.ANONYMOUS_GTID_EVENT:
//ge := ev.Event.(*replication.GTIDEvent)
sql = "anonymous-gtid-event:1"
sqlType = "gtid"
case replication.WRITE_ROWS_EVENTv1, case replication.WRITE_ROWS_EVENTv1,
replication.WRITE_ROWS_EVENTv2: replication.WRITE_ROWS_EVENTv2:
@ -235,7 +242,7 @@ func GetDbTbAndQueryAndRowCntFromBinevent(ev *replication.BinlogEvent) (string,
tb = string(wrEvent.Table.Table) tb = string(wrEvent.Table.Table)
sqlType = "insert" sqlType = "insert"
rowCnt = uint32(len(wrEvent.Rows)) rowCnt = uint32(len(wrEvent.Rows))
rows = wrEvent.Rows
case replication.UPDATE_ROWS_EVENTv1, case replication.UPDATE_ROWS_EVENTv1,
replication.UPDATE_ROWS_EVENTv2: replication.UPDATE_ROWS_EVENTv2:
wrEvent := ev.Event.(*replication.RowsEvent) wrEvent := ev.Event.(*replication.RowsEvent)
@ -243,7 +250,7 @@ func GetDbTbAndQueryAndRowCntFromBinevent(ev *replication.BinlogEvent) (string,
tb = string(wrEvent.Table.Table) tb = string(wrEvent.Table.Table)
sqlType = "update" sqlType = "update"
rowCnt = uint32(len(wrEvent.Rows)) / 2 rowCnt = uint32(len(wrEvent.Rows)) / 2
rows = wrEvent.Rows
case replication.DELETE_ROWS_EVENTv1, case replication.DELETE_ROWS_EVENTv1,
replication.DELETE_ROWS_EVENTv2: replication.DELETE_ROWS_EVENTv2:
@ -255,6 +262,7 @@ func GetDbTbAndQueryAndRowCntFromBinevent(ev *replication.BinlogEvent) (string,
tb = string(wrEvent.Table.Table) tb = string(wrEvent.Table.Table)
sqlType = "delete" sqlType = "delete"
rowCnt = uint32(len(wrEvent.Rows)) rowCnt = uint32(len(wrEvent.Rows))
rows = wrEvent.Rows
case replication.ROWS_QUERY_EVENT: case replication.ROWS_QUERY_EVENT:
queryEvent := ev.Event.(*replication.RowsQueryEvent) queryEvent := ev.Event.(*replication.RowsQueryEvent)
sql = string(queryEvent.Query) sql = string(queryEvent.Query)
@ -283,6 +291,6 @@ func GetDbTbAndQueryAndRowCntFromBinevent(ev *replication.BinlogEvent) (string,
} }
sqlType = "gtid" sqlType = "gtid"
} }
return db, tb, sqlType, sql, rowCnt return db, tb, sqlType, sql, rowCnt, rows
} }

@ -6,7 +6,7 @@ import (
) )
func TestParse(t *testing.T) { func TestParse(t *testing.T) {
ParseBinlogFile("./test/test-mysql-bin ", func(transaction Transaction) { ParseBinlogFile("./test/test-mysql-bin", func(transaction Transaction) {
fmt.Println(transaction) fmt.Println(transaction)
}) })
} }

Loading…
Cancel
Save