Compare commits

..

3 Commits

Author SHA1 Message Date
9b9b211c0a add more feature 2023-07-03 14:03:45 +08:00
61d231f6b6 Merge branch 'master' of git.b612.me:b612/mysqlbinlog 2023-07-03 13:47:47 +08:00
402bae1f0e update 2023-07-03 13:47:39 +08:00

113
parse.go
View File

@ -13,29 +13,38 @@ import (
) )
type TxDetail struct { type TxDetail struct {
StartPos int StartPos int `json:"startPos"`
EndPos int EndPos int `json:"endPos"`
RowCount int RowCount int `json:"rowCount"`
Timestamp int64 Timestamp int64 `json:"timestamp"`
Time time.Time Time time.Time `json:"time"`
Sql string Sql string `json:"sql"`
Db string Db string `json:"db"`
Table string Table string `json:"table"`
SqlType string SqlType string `json:"sqlType"`
CompressionType string CompressionType string `json:"compressionType"`
Rows [][]interface{} Rows [][]interface{}
} }
const (
STATUS_PREPARE uint8 = iota
STATUS_BEGIN
STATUS_COMMIT
STATUS_ROLLBACK
)
type Transaction struct { type Transaction struct {
GTID string GTID string `json:"gtid"`
Timestamp int64 Timestamp int64 `json:"timestamp"`
Time time.Time Time time.Time `json:"time"`
StartPos int StartPos int `json:"startPos"`
EndPos int EndPos int `json:"endPos"`
Size int Size int `json:"size"`
RowsCount int RowsCount int `json:"rowsCount"`
sqlOrigin []string Status uint8 `json:"status"`
Txs []TxDetail sqlOrigin []string `json:"sqlOrigin"`
Txs []TxDetail `json:"txs"`
matchFilterSchema bool
} }
func (t Transaction) GetSqlOrigin() []string { func (t Transaction) GetSqlOrigin() []string {
@ -200,6 +209,18 @@ func parseBinlogDetail(r io.Reader, f func(Transaction) bool) error {
tx.EndPos = int(h.LogPos) tx.EndPos = int(h.LogPos)
tx.sqlOrigin = append(tx.sqlOrigin, ev.Data) tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
default: default:
status := STATUS_PREPARE
if ev.Type == "query" {
switch strings.ToLower(ev.Data) {
case "begin":
status = STATUS_BEGIN
case "commit":
status = STATUS_COMMIT
case "rollback":
status = STATUS_ROLLBACK
}
tx.Status = status
}
tx.EndPos = int(h.LogPos) tx.EndPos = int(h.LogPos)
tx.Txs = append(tx.Txs, TxDetail{ tx.Txs = append(tx.Txs, TxDetail{
StartPos: startPos, StartPos: startPos,
@ -347,8 +368,39 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
} }
excludeMap[v] = true excludeMap[v] = true
} }
} else { }
excludeMap["*.*"] = true matchTbs := func(db, tb string) bool {
if len(filter.ExcludeTables) == 0 && len(filter.ExcludeTables) == 0 {
return true
}
if _, ok := includeMap["*.*"]; ok {
return true
}
if _, ok := excludeMap["*.*"]; ok {
return false
}
if _, ok := includeMap[db+"."+tb]; ok {
return true
}
if _, ok := excludeMap[db+"."+tb]; ok {
return false
}
if _, ok := includeMap[db+".*"]; ok {
return true
}
if _, ok := excludeMap[db+".*"]; ok {
return false
}
if _, ok := includeMap["*."+tb]; ok {
return true
}
if _, ok := excludeMap["*."+tb]; ok {
return false
}
if len(includeMap) != 0 {
return false
}
return true
} }
if filter.IncludeGtid != "" { if filter.IncludeGtid != "" {
inGtid, err = gtid.Parse(filter.IncludeGtid) inGtid, err = gtid.Parse(filter.IncludeGtid)
@ -394,6 +446,9 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
if filter.SmallThan != 0 && filter.SmallThan < tx.Size { if filter.SmallThan != 0 && filter.SmallThan < tx.Size {
return true return true
} }
if len(tx.Txs) == 0 && tx.matchFilterSchema {
return true
}
return fn(tx) return fn(tx)
} }
for { for {
@ -551,6 +606,22 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
tx.sqlOrigin = append(tx.sqlOrigin, ev.Data) tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
default: default:
tx.EndPos = int(h.LogPos) tx.EndPos = int(h.LogPos)
status := STATUS_PREPARE
if ev.Type == "query" {
switch strings.ToLower(ev.Data) {
case "begin":
status = STATUS_BEGIN
case "commit":
status = STATUS_COMMIT
case "rollback":
status = STATUS_ROLLBACK
}
tx.Status = status
}
if !matchTbs(ev.DB, ev.TB) {
tx.matchFilterSchema = true
continue
}
tx.Txs = append(tx.Txs, TxDetail{ tx.Txs = append(tx.Txs, TxDetail{
StartPos: startPos, StartPos: startPos,
EndPos: int(h.LogPos), EndPos: int(h.LogPos),