From 9b9b211c0a1c7d44869be4ed76043b022b93bb13 Mon Sep 17 00:00:00 2001 From: starainrt Date: Mon, 3 Jul 2023 14:03:45 +0800 Subject: [PATCH] add more feature --- parse.go | 83 +++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 61 insertions(+), 22 deletions(-) diff --git a/parse.go b/parse.go index 0a31b0e..a256ffe 100644 --- a/parse.go +++ b/parse.go @@ -13,16 +13,16 @@ import ( ) type TxDetail struct { - StartPos int - EndPos int - RowCount int - Timestamp int64 - Time time.Time - Sql string - Db string - Table string - SqlType string - CompressionType string + StartPos int `json:"startPos"` + EndPos int `json:"endPos"` + RowCount int `json:"rowCount"` + Timestamp int64 `json:"timestamp"` + Time time.Time `json:"time"` + Sql string `json:"sql"` + Db string `json:"db"` + Table string `json:"table"` + SqlType string `json:"sqlType"` + CompressionType string `json:"compressionType"` Rows [][]interface{} } @@ -34,16 +34,17 @@ const ( ) type Transaction struct { - GTID string - Timestamp int64 - Time time.Time - StartPos int - EndPos int - Size int - RowsCount int - Status uint8 - sqlOrigin []string - Txs []TxDetail + GTID string `json:"gtid"` + Timestamp int64 `json:"timestamp"` + Time time.Time `json:"time"` + StartPos int `json:"startPos"` + EndPos int `json:"endPos"` + Size int `json:"size"` + RowsCount int `json:"rowsCount"` + Status uint8 `json:"status"` + sqlOrigin []string `json:"sqlOrigin"` + Txs []TxDetail `json:"txs"` + matchFilterSchema bool } func (t Transaction) GetSqlOrigin() []string { @@ -367,8 +368,39 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter } excludeMap[v] = true } - } else { - excludeMap["*.*"] = true + } + matchTbs := func(db, tb string) bool { + if len(filter.ExcludeTables) == 0 && len(filter.ExcludeTables) == 0 { + return true + } + if _, ok := includeMap["*.*"]; ok { + return true + } + if _, ok := excludeMap["*.*"]; ok { + return false + } + if _, ok := includeMap[db+"."+tb]; ok { + return true + } + if _, ok := excludeMap[db+"."+tb]; ok { + return false + } + if _, ok := includeMap[db+".*"]; ok { + return true + } + if _, ok := excludeMap[db+".*"]; ok { + return false + } + if _, ok := includeMap["*."+tb]; ok { + return true + } + if _, ok := excludeMap["*."+tb]; ok { + return false + } + if len(includeMap) != 0 { + return false + } + return true } if filter.IncludeGtid != "" { inGtid, err = gtid.Parse(filter.IncludeGtid) @@ -414,6 +446,9 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter if filter.SmallThan != 0 && filter.SmallThan < tx.Size { return true } + if len(tx.Txs) == 0 && tx.matchFilterSchema { + return true + } return fn(tx) } for { @@ -583,6 +618,10 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter } tx.Status = status } + if !matchTbs(ev.DB, ev.TB) { + tx.matchFilterSchema = true + continue + } tx.Txs = append(tx.Txs, TxDetail{ StartPos: startPos, EndPos: int(h.LogPos),