Compare commits

..

No commits in common. "9b9b211c0a1c7d44869be4ed76043b022b93bb13" and "f782522f6889f6ddc16350fbef1ecb9769d1bfd2" have entirely different histories.

113
parse.go
View File

@ -13,38 +13,29 @@ import (
) )
type TxDetail struct { type TxDetail struct {
StartPos int `json:"startPos"` StartPos int
EndPos int `json:"endPos"` EndPos int
RowCount int `json:"rowCount"` RowCount int
Timestamp int64 `json:"timestamp"` Timestamp int64
Time time.Time `json:"time"` Time time.Time
Sql string `json:"sql"` Sql string
Db string `json:"db"` Db string
Table string `json:"table"` Table string
SqlType string `json:"sqlType"` SqlType string
CompressionType string `json:"compressionType"` CompressionType string
Rows [][]interface{} Rows [][]interface{}
} }
const (
STATUS_PREPARE uint8 = iota
STATUS_BEGIN
STATUS_COMMIT
STATUS_ROLLBACK
)
type Transaction struct { type Transaction struct {
GTID string `json:"gtid"` GTID string
Timestamp int64 `json:"timestamp"` Timestamp int64
Time time.Time `json:"time"` Time time.Time
StartPos int `json:"startPos"` StartPos int
EndPos int `json:"endPos"` EndPos int
Size int `json:"size"` Size int
RowsCount int `json:"rowsCount"` RowsCount int
Status uint8 `json:"status"` sqlOrigin []string
sqlOrigin []string `json:"sqlOrigin"` Txs []TxDetail
Txs []TxDetail `json:"txs"`
matchFilterSchema bool
} }
func (t Transaction) GetSqlOrigin() []string { func (t Transaction) GetSqlOrigin() []string {
@ -209,18 +200,6 @@ func parseBinlogDetail(r io.Reader, f func(Transaction) bool) error {
tx.EndPos = int(h.LogPos) tx.EndPos = int(h.LogPos)
tx.sqlOrigin = append(tx.sqlOrigin, ev.Data) tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
default: default:
status := STATUS_PREPARE
if ev.Type == "query" {
switch strings.ToLower(ev.Data) {
case "begin":
status = STATUS_BEGIN
case "commit":
status = STATUS_COMMIT
case "rollback":
status = STATUS_ROLLBACK
}
tx.Status = status
}
tx.EndPos = int(h.LogPos) tx.EndPos = int(h.LogPos)
tx.Txs = append(tx.Txs, TxDetail{ tx.Txs = append(tx.Txs, TxDetail{
StartPos: startPos, StartPos: startPos,
@ -368,39 +347,8 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
} }
excludeMap[v] = true excludeMap[v] = true
} }
} } else {
matchTbs := func(db, tb string) bool { excludeMap["*.*"] = true
if len(filter.ExcludeTables) == 0 && len(filter.ExcludeTables) == 0 {
return true
}
if _, ok := includeMap["*.*"]; ok {
return true
}
if _, ok := excludeMap["*.*"]; ok {
return false
}
if _, ok := includeMap[db+"."+tb]; ok {
return true
}
if _, ok := excludeMap[db+"."+tb]; ok {
return false
}
if _, ok := includeMap[db+".*"]; ok {
return true
}
if _, ok := excludeMap[db+".*"]; ok {
return false
}
if _, ok := includeMap["*."+tb]; ok {
return true
}
if _, ok := excludeMap["*."+tb]; ok {
return false
}
if len(includeMap) != 0 {
return false
}
return true
} }
if filter.IncludeGtid != "" { if filter.IncludeGtid != "" {
inGtid, err = gtid.Parse(filter.IncludeGtid) inGtid, err = gtid.Parse(filter.IncludeGtid)
@ -446,9 +394,6 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
if filter.SmallThan != 0 && filter.SmallThan < tx.Size { if filter.SmallThan != 0 && filter.SmallThan < tx.Size {
return true return true
} }
if len(tx.Txs) == 0 && tx.matchFilterSchema {
return true
}
return fn(tx) return fn(tx)
} }
for { for {
@ -606,22 +551,6 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
tx.sqlOrigin = append(tx.sqlOrigin, ev.Data) tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
default: default:
tx.EndPos = int(h.LogPos) tx.EndPos = int(h.LogPos)
status := STATUS_PREPARE
if ev.Type == "query" {
switch strings.ToLower(ev.Data) {
case "begin":
status = STATUS_BEGIN
case "commit":
status = STATUS_COMMIT
case "rollback":
status = STATUS_ROLLBACK
}
tx.Status = status
}
if !matchTbs(ev.DB, ev.TB) {
tx.matchFilterSchema = true
continue
}
tx.Txs = append(tx.Txs, TxDetail{ tx.Txs = append(tx.Txs, TxDetail{
StartPos: startPos, StartPos: startPos,
EndPos: int(h.LogPos), EndPos: int(h.LogPos),