From 0f74dc3dc897f787ee8835c15f4c606f7e10f1c9 Mon Sep 17 00:00:00 2001 From: starainrt Date: Sat, 29 Apr 2023 17:16:24 +0800 Subject: [PATCH] add rows event --- parse.go | 20 ++++++++++++++------ parse_test.go | 2 +- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/parse.go b/parse.go index 81c582f..6e0ea59 100644 --- a/parse.go +++ b/parse.go @@ -21,6 +21,7 @@ type TxDetail struct { Db string Table string SqlType string + Rows [][]interface{} } type Transaction struct { @@ -85,6 +86,7 @@ func parseBinlogDetail(r io.Reader, f func(Transaction)) error { sqlType string = "" rowCnt uint32 = 0 tbMapPos uint32 = 0 + rows [][]interface{} ) var tx Transaction currentGtid := "" @@ -155,7 +157,7 @@ func parseBinlogDetail(r io.Reader, f func(Transaction)) error { //binEvent := &replication.BinlogEvent{RawData: rawData, Header: h, Event: e} binEvent := &replication.BinlogEvent{Header: h, Event: e} // we donnot need raw data - db, tb, sqlType, sql, rowCnt = GetDbTbAndQueryAndRowCntFromBinevent(binEvent) + db, tb, sqlType, sql, rowCnt, rows = GetDbTbAndQueryAndRowCntFromBinevent(binEvent) startPos := 0 if sqlType == "query" || sqlType == "gtid" { startPos = int(h.LogPos - h.EventSize) @@ -207,6 +209,7 @@ func parseBinlogDetail(r io.Reader, f func(Transaction)) error { Table: tb, Sql: sql, SqlType: sqlType, + Rows: rows, RowCount: int(rowCnt), Timestamp: int64(h.Timestamp), Time: time.Unix(int64(h.Timestamp), 0), @@ -216,17 +219,21 @@ func parseBinlogDetail(r io.Reader, f func(Transaction)) error { } -func GetDbTbAndQueryAndRowCntFromBinevent(ev *replication.BinlogEvent) (string, string, string, string, uint32) { +func GetDbTbAndQueryAndRowCntFromBinevent(ev *replication.BinlogEvent) (string, string, string, string, uint32, [][]interface{}) { var ( db string = "" tb string = "" sql string = "" sqlType string = "" rowCnt uint32 = 0 + rows [][]interface{} ) switch ev.Header.EventType { - + case replication.ANONYMOUS_GTID_EVENT: + //ge := ev.Event.(*replication.GTIDEvent) + sql = "anonymous-gtid-event:1" + sqlType = "gtid" case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2: @@ -235,7 +242,7 @@ func GetDbTbAndQueryAndRowCntFromBinevent(ev *replication.BinlogEvent) (string, tb = string(wrEvent.Table.Table) sqlType = "insert" rowCnt = uint32(len(wrEvent.Rows)) - + rows = wrEvent.Rows case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2: wrEvent := ev.Event.(*replication.RowsEvent) @@ -243,7 +250,7 @@ func GetDbTbAndQueryAndRowCntFromBinevent(ev *replication.BinlogEvent) (string, tb = string(wrEvent.Table.Table) sqlType = "update" rowCnt = uint32(len(wrEvent.Rows)) / 2 - + rows = wrEvent.Rows case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2: @@ -255,6 +262,7 @@ func GetDbTbAndQueryAndRowCntFromBinevent(ev *replication.BinlogEvent) (string, tb = string(wrEvent.Table.Table) sqlType = "delete" rowCnt = uint32(len(wrEvent.Rows)) + rows = wrEvent.Rows case replication.ROWS_QUERY_EVENT: queryEvent := ev.Event.(*replication.RowsQueryEvent) sql = string(queryEvent.Query) @@ -283,6 +291,6 @@ func GetDbTbAndQueryAndRowCntFromBinevent(ev *replication.BinlogEvent) (string, } sqlType = "gtid" } - return db, tb, sqlType, sql, rowCnt + return db, tb, sqlType, sql, rowCnt, rows } diff --git a/parse_test.go b/parse_test.go index d481528..9c80134 100644 --- a/parse_test.go +++ b/parse_test.go @@ -6,7 +6,7 @@ import ( ) func TestParse(t *testing.T) { - ParseBinlogFile("./test/test-mysql-bin ", func(transaction Transaction) { + ParseBinlogFile("./test/test-mysql-bin", func(transaction Transaction) { fmt.Println(transaction) }) }