From 402bae1f0e1e2ca31649799a55a86a99fe5d1fbc Mon Sep 17 00:00:00 2001 From: starainrt Date: Mon, 3 Jul 2023 13:47:39 +0800 Subject: [PATCH] update --- parse.go | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/parse.go b/parse.go index d4e0e68..bca0cc3 100644 --- a/parse.go +++ b/parse.go @@ -8,6 +8,7 @@ import ( "github.com/starainrt/go-mysql/replication" "io" "os" + "strings" "time" ) @@ -25,6 +26,13 @@ type TxDetail struct { Rows [][]interface{} } +const ( + STATUS_PREPARE uint8 = iota + STATUS_BEGIN + STATUS_COMMIT + STATUS_ROLLBACK +) + type Transaction struct { GTID string Timestamp int64 @@ -33,6 +41,7 @@ type Transaction struct { EndPos int Size int RowsCount int + Status uint8 sqlOrigin []string Txs []TxDetail } @@ -199,6 +208,18 @@ func parseBinlogDetail(r io.Reader, f func(Transaction) bool) error { tx.EndPos = int(h.LogPos) tx.sqlOrigin = append(tx.sqlOrigin, ev.Data) default: + status := STATUS_PREPARE + if ev.Type == "query" { + switch strings.ToLower(ev.Data) { + case "begin": + status = STATUS_BEGIN + case "commit": + status = STATUS_COMMIT + case "rollback": + status = STATUS_ROLLBACK + } + tx.Status = status + } tx.EndPos = int(h.LogPos) tx.Txs = append(tx.Txs, TxDetail{ StartPos: startPos, @@ -527,6 +548,18 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter tx.sqlOrigin = append(tx.sqlOrigin, ev.Data) default: tx.EndPos = int(h.LogPos) + status := STATUS_PREPARE + if ev.Type == "query" { + switch strings.ToLower(ev.Data) { + case "begin": + status = STATUS_BEGIN + case "commit": + status = STATUS_COMMIT + case "rollback": + status = STATUS_ROLLBACK + } + tx.Status = status + } tx.Txs = append(tx.Txs, TxDetail{ StartPos: startPos, EndPos: int(h.LogPos),