Merge branch 'master' of git.b612.me:b612/mysqlbinlog
This commit is contained in:
commit
61d231f6b6
25
parse.go
25
parse.go
@ -334,6 +334,8 @@ func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent {
|
|||||||
type BinlogFilter struct {
|
type BinlogFilter struct {
|
||||||
IncludeGtid string
|
IncludeGtid string
|
||||||
ExcludeGtid string
|
ExcludeGtid string
|
||||||
|
IncludeTables []string
|
||||||
|
ExcludeTables []string
|
||||||
StartPos int
|
StartPos int
|
||||||
EndPos int
|
EndPos int
|
||||||
StartDate time.Time
|
StartDate time.Time
|
||||||
@ -346,6 +348,28 @@ type BinlogFilter struct {
|
|||||||
func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter BinlogFilter, fn func(Transaction) bool) error {
|
func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter BinlogFilter, fn func(Transaction) bool) error {
|
||||||
var subGtid, inGtid, exGtid *gtid.Gtid
|
var subGtid, inGtid, exGtid *gtid.Gtid
|
||||||
var err error
|
var err error
|
||||||
|
var includeMap = make(map[string]bool)
|
||||||
|
var excludeMap = make(map[string]bool)
|
||||||
|
if len(filter.IncludeTables) != 0 {
|
||||||
|
for _, v := range filter.IncludeTables {
|
||||||
|
if len(strings.Split(v, ".")) != 2 {
|
||||||
|
return fmt.Errorf("IncludeTable Name Is Invalid:%s", v)
|
||||||
|
}
|
||||||
|
includeMap[v] = true
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
includeMap["*.*"] = true
|
||||||
|
}
|
||||||
|
if len(filter.ExcludeTables) != 0 {
|
||||||
|
for _, v := range filter.ExcludeTables {
|
||||||
|
if len(strings.Split(v, ".")) != 2 {
|
||||||
|
return fmt.Errorf("ExcludeTable Name Is Invalid:%s", v)
|
||||||
|
}
|
||||||
|
excludeMap[v] = true
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
excludeMap["*.*"] = true
|
||||||
|
}
|
||||||
if filter.IncludeGtid != "" {
|
if filter.IncludeGtid != "" {
|
||||||
inGtid, err = gtid.Parse(filter.IncludeGtid)
|
inGtid, err = gtid.Parse(filter.IncludeGtid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -359,7 +383,6 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// process: 0, continue: 1, break: 2, EOF: 3
|
// process: 0, continue: 1, break: 2, EOF: 3
|
||||||
var (
|
var (
|
||||||
n int64
|
n int64
|
||||||
|
Loading…
x
Reference in New Issue
Block a user