Compare commits
No commits in common. "9b9b211c0a1c7d44869be4ed76043b022b93bb13" and "f782522f6889f6ddc16350fbef1ecb9769d1bfd2" have entirely different histories.
9b9b211c0a
...
f782522f68
113
parse.go
113
parse.go
@ -13,38 +13,29 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type TxDetail struct {
|
type TxDetail struct {
|
||||||
StartPos int `json:"startPos"`
|
StartPos int
|
||||||
EndPos int `json:"endPos"`
|
EndPos int
|
||||||
RowCount int `json:"rowCount"`
|
RowCount int
|
||||||
Timestamp int64 `json:"timestamp"`
|
Timestamp int64
|
||||||
Time time.Time `json:"time"`
|
Time time.Time
|
||||||
Sql string `json:"sql"`
|
Sql string
|
||||||
Db string `json:"db"`
|
Db string
|
||||||
Table string `json:"table"`
|
Table string
|
||||||
SqlType string `json:"sqlType"`
|
SqlType string
|
||||||
CompressionType string `json:"compressionType"`
|
CompressionType string
|
||||||
Rows [][]interface{}
|
Rows [][]interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
|
||||||
STATUS_PREPARE uint8 = iota
|
|
||||||
STATUS_BEGIN
|
|
||||||
STATUS_COMMIT
|
|
||||||
STATUS_ROLLBACK
|
|
||||||
)
|
|
||||||
|
|
||||||
type Transaction struct {
|
type Transaction struct {
|
||||||
GTID string `json:"gtid"`
|
GTID string
|
||||||
Timestamp int64 `json:"timestamp"`
|
Timestamp int64
|
||||||
Time time.Time `json:"time"`
|
Time time.Time
|
||||||
StartPos int `json:"startPos"`
|
StartPos int
|
||||||
EndPos int `json:"endPos"`
|
EndPos int
|
||||||
Size int `json:"size"`
|
Size int
|
||||||
RowsCount int `json:"rowsCount"`
|
RowsCount int
|
||||||
Status uint8 `json:"status"`
|
sqlOrigin []string
|
||||||
sqlOrigin []string `json:"sqlOrigin"`
|
Txs []TxDetail
|
||||||
Txs []TxDetail `json:"txs"`
|
|
||||||
matchFilterSchema bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t Transaction) GetSqlOrigin() []string {
|
func (t Transaction) GetSqlOrigin() []string {
|
||||||
@ -209,18 +200,6 @@ func parseBinlogDetail(r io.Reader, f func(Transaction) bool) error {
|
|||||||
tx.EndPos = int(h.LogPos)
|
tx.EndPos = int(h.LogPos)
|
||||||
tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
|
tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
|
||||||
default:
|
default:
|
||||||
status := STATUS_PREPARE
|
|
||||||
if ev.Type == "query" {
|
|
||||||
switch strings.ToLower(ev.Data) {
|
|
||||||
case "begin":
|
|
||||||
status = STATUS_BEGIN
|
|
||||||
case "commit":
|
|
||||||
status = STATUS_COMMIT
|
|
||||||
case "rollback":
|
|
||||||
status = STATUS_ROLLBACK
|
|
||||||
}
|
|
||||||
tx.Status = status
|
|
||||||
}
|
|
||||||
tx.EndPos = int(h.LogPos)
|
tx.EndPos = int(h.LogPos)
|
||||||
tx.Txs = append(tx.Txs, TxDetail{
|
tx.Txs = append(tx.Txs, TxDetail{
|
||||||
StartPos: startPos,
|
StartPos: startPos,
|
||||||
@ -368,39 +347,8 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
|
|||||||
}
|
}
|
||||||
excludeMap[v] = true
|
excludeMap[v] = true
|
||||||
}
|
}
|
||||||
}
|
} else {
|
||||||
matchTbs := func(db, tb string) bool {
|
excludeMap["*.*"] = true
|
||||||
if len(filter.ExcludeTables) == 0 && len(filter.ExcludeTables) == 0 {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
if _, ok := includeMap["*.*"]; ok {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
if _, ok := excludeMap["*.*"]; ok {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if _, ok := includeMap[db+"."+tb]; ok {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
if _, ok := excludeMap[db+"."+tb]; ok {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if _, ok := includeMap[db+".*"]; ok {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
if _, ok := excludeMap[db+".*"]; ok {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if _, ok := includeMap["*."+tb]; ok {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
if _, ok := excludeMap["*."+tb]; ok {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if len(includeMap) != 0 {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
}
|
||||||
if filter.IncludeGtid != "" {
|
if filter.IncludeGtid != "" {
|
||||||
inGtid, err = gtid.Parse(filter.IncludeGtid)
|
inGtid, err = gtid.Parse(filter.IncludeGtid)
|
||||||
@ -446,9 +394,6 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
|
|||||||
if filter.SmallThan != 0 && filter.SmallThan < tx.Size {
|
if filter.SmallThan != 0 && filter.SmallThan < tx.Size {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if len(tx.Txs) == 0 && tx.matchFilterSchema {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
return fn(tx)
|
return fn(tx)
|
||||||
}
|
}
|
||||||
for {
|
for {
|
||||||
@ -606,22 +551,6 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
|
|||||||
tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
|
tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
|
||||||
default:
|
default:
|
||||||
tx.EndPos = int(h.LogPos)
|
tx.EndPos = int(h.LogPos)
|
||||||
status := STATUS_PREPARE
|
|
||||||
if ev.Type == "query" {
|
|
||||||
switch strings.ToLower(ev.Data) {
|
|
||||||
case "begin":
|
|
||||||
status = STATUS_BEGIN
|
|
||||||
case "commit":
|
|
||||||
status = STATUS_COMMIT
|
|
||||||
case "rollback":
|
|
||||||
status = STATUS_ROLLBACK
|
|
||||||
}
|
|
||||||
tx.Status = status
|
|
||||||
}
|
|
||||||
if !matchTbs(ev.DB, ev.TB) {
|
|
||||||
tx.matchFilterSchema = true
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
tx.Txs = append(tx.Txs, TxDetail{
|
tx.Txs = append(tx.Txs, TxDetail{
|
||||||
StartPos: startPos,
|
StartPos: startPos,
|
||||||
EndPos: int(h.LogPos),
|
EndPos: int(h.LogPos),
|
||||||
|
Loading…
x
Reference in New Issue
Block a user