master
兔子 1 year ago
parent 8caa467be7
commit 402bae1f0e

@ -8,6 +8,7 @@ import (
"github.com/starainrt/go-mysql/replication" "github.com/starainrt/go-mysql/replication"
"io" "io"
"os" "os"
"strings"
"time" "time"
) )
@ -25,6 +26,13 @@ type TxDetail struct {
Rows [][]interface{} Rows [][]interface{}
} }
const (
STATUS_PREPARE uint8 = iota
STATUS_BEGIN
STATUS_COMMIT
STATUS_ROLLBACK
)
type Transaction struct { type Transaction struct {
GTID string GTID string
Timestamp int64 Timestamp int64
@ -33,6 +41,7 @@ type Transaction struct {
EndPos int EndPos int
Size int Size int
RowsCount int RowsCount int
Status uint8
sqlOrigin []string sqlOrigin []string
Txs []TxDetail Txs []TxDetail
} }
@ -199,6 +208,18 @@ func parseBinlogDetail(r io.Reader, f func(Transaction) bool) error {
tx.EndPos = int(h.LogPos) tx.EndPos = int(h.LogPos)
tx.sqlOrigin = append(tx.sqlOrigin, ev.Data) tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
default: default:
status := STATUS_PREPARE
if ev.Type == "query" {
switch strings.ToLower(ev.Data) {
case "begin":
status = STATUS_BEGIN
case "commit":
status = STATUS_COMMIT
case "rollback":
status = STATUS_ROLLBACK
}
tx.Status = status
}
tx.EndPos = int(h.LogPos) tx.EndPos = int(h.LogPos)
tx.Txs = append(tx.Txs, TxDetail{ tx.Txs = append(tx.Txs, TxDetail{
StartPos: startPos, StartPos: startPos,
@ -527,6 +548,18 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
tx.sqlOrigin = append(tx.sqlOrigin, ev.Data) tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
default: default:
tx.EndPos = int(h.LogPos) tx.EndPos = int(h.LogPos)
status := STATUS_PREPARE
if ev.Type == "query" {
switch strings.ToLower(ev.Data) {
case "begin":
status = STATUS_BEGIN
case "commit":
status = STATUS_COMMIT
case "rollback":
status = STATUS_ROLLBACK
}
tx.Status = status
}
tx.Txs = append(tx.Txs, TxDetail{ tx.Txs = append(tx.Txs, TxDetail{
StartPos: startPos, StartPos: startPos,
EndPos: int(h.LogPos), EndPos: int(h.LogPos),

Loading…
Cancel
Save