master
兔子 2 years ago
parent 8caa467be7
commit f782522f68

@ -8,6 +8,7 @@ import (
"github.com/starainrt/go-mysql/replication" "github.com/starainrt/go-mysql/replication"
"io" "io"
"os" "os"
"strings"
"time" "time"
) )
@ -313,6 +314,8 @@ func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent {
type BinlogFilter struct { type BinlogFilter struct {
IncludeGtid string IncludeGtid string
ExcludeGtid string ExcludeGtid string
IncludeTables []string
ExcludeTables []string
StartPos int StartPos int
EndPos int EndPos int
StartDate time.Time StartDate time.Time
@ -325,6 +328,28 @@ type BinlogFilter struct {
func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter BinlogFilter, fn func(Transaction) bool) error { func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter BinlogFilter, fn func(Transaction) bool) error {
var subGtid, inGtid, exGtid *gtid.Gtid var subGtid, inGtid, exGtid *gtid.Gtid
var err error var err error
var includeMap = make(map[string]bool)
var excludeMap = make(map[string]bool)
if len(filter.IncludeTables) != 0 {
for _, v := range filter.IncludeTables {
if len(strings.Split(v, ".")) != 2 {
return fmt.Errorf("IncludeTable Name Is Invalid:%s", v)
}
includeMap[v] = true
}
} else {
includeMap["*.*"] = true
}
if len(filter.ExcludeTables) != 0 {
for _, v := range filter.ExcludeTables {
if len(strings.Split(v, ".")) != 2 {
return fmt.Errorf("ExcludeTable Name Is Invalid:%s", v)
}
excludeMap[v] = true
}
} else {
excludeMap["*.*"] = true
}
if filter.IncludeGtid != "" { if filter.IncludeGtid != "" {
inGtid, err = gtid.Parse(filter.IncludeGtid) inGtid, err = gtid.Parse(filter.IncludeGtid)
if err != nil { if err != nil {
@ -338,7 +363,6 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
return err return err
} }
} }
// process: 0, continue: 1, break: 2, EOF: 3 // process: 0, continue: 1, break: 2, EOF: 3
var ( var (
n int64 n int64

Loading…
Cancel
Save