|
|
|
@ -321,13 +321,14 @@ type BinlogFilter struct {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter BinlogFilter, fn func(Transaction)) error {
|
|
|
|
|
var inGtid, exGtid *gtid.Gtid
|
|
|
|
|
var subGtid, inGtid, exGtid *gtid.Gtid
|
|
|
|
|
var err error
|
|
|
|
|
if filter.IncludeGtid != "" {
|
|
|
|
|
inGtid, err = gtid.Parse(filter.IncludeGtid)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
subGtid = inGtid.Clone()
|
|
|
|
|
}
|
|
|
|
|
if filter.ExcludeGtid != "" {
|
|
|
|
|
exGtid, err = gtid.Parse(filter.ExcludeGtid)
|
|
|
|
@ -485,6 +486,12 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
|
|
|
|
|
}
|
|
|
|
|
tx.Size = tx.EndPos - tx.StartPos
|
|
|
|
|
callFn(tx)
|
|
|
|
|
if subGtid != nil {
|
|
|
|
|
subGtid.Sub(tx.GTID)
|
|
|
|
|
if subGtid.EventCount() == 0 {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
tx = Transaction{}
|
|
|
|
|
}
|
|
|
|
|
currentGtid = ev.Data
|
|
|
|
|