mysqlbinlog/parse_filter.go

573 lines
13 KiB
Go
Raw Permalink Normal View History

package binlog
import (
"bufio"
"fmt"
"io"
"os"
"strings"
"b612.me/mysql/gtid"
"b612.me/staros"
"github.com/starainrt/go-mysql/replication"
)
// ParseBinlogWithFilter opens the binlog file at path and runs the filtered
// parse over it, passing fx through to parseBinlogWithFilter as the
// per-transaction callback.
//
// When pos is non-zero the file is positioned with seekToPosition; otherwise
// the file header is checked with validateBinlogHeader. The include-tables and
// exclude-tables filters are mutually exclusive. os.ErrNotExist is returned
// when path does not exist; any other error comes from opening, positioning,
// or parsing the file.
func ParseBinlogWithFilter(path string, pos int64, filter BinlogFilter, fx func(Transaction) bool) error {
	wantInclude := hasConfiguredTablePatterns(filter.IncludeTables)
	if wantInclude && hasConfiguredTablePatterns(filter.ExcludeTables) {
		return fmt.Errorf("invalid filter: include-tables and exclude-tables cannot be set at the same time")
	}
	if !staros.Exists(path) {
		return os.ErrNotExist
	}

	file, openErr := os.Open(path)
	if openErr != nil {
		return openErr
	}
	defer file.Close()

	// Time and decimal decoding are both switched off on the parser.
	parser := replication.NewBinlogParser()
	parser.SetParseTime(false)
	parser.SetUseDecimal(false)

	if pos == 0 {
		if err := validateBinlogHeader(file); err != nil {
			return err
		}
	} else if err := seekToPosition(file, parser, pos); err != nil {
		return err
	}

	reader := bufio.NewReaderSize(file, defaultReadBufSize)
	return parseBinlogWithFilter(reader, parser, filter, fx)
}
// parseBinlogWithFilter reads binlog events from r, groups them into
// transactions delimited by GTID events, and passes every transaction that
// satisfies filter to fn.
//
// Parsing stops and nil is returned when r reports io.EOF (after the pending
// transaction, if any, has been flushed), when fn returns false, or when an
// include-GTID set was configured and every GTID in it has been delivered.
// A nil fn is allowed: transactions are then parsed but never reported.
//
// When filter.OnlyShowGtid is set a fast path is used: only GTID events are
// turned into transactions and the bodies of most other events are skipped.
//
// Transactions rejected by the GTID or position filters are never reported
// to fn.
func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter BinlogFilter, fn func(Transaction) bool) error {
	if hasConfiguredTablePatterns(filter.IncludeTables) && hasConfiguredTablePatterns(filter.ExcludeTables) {
		return fmt.Errorf("invalid filter: include-tables and exclude-tables cannot be set at the same time")
	}

	var subGtid, inGtid, exGtid *gtid.Gtid
	var err error
	includeMatcher, excludeMatcher, err := prepareTableMatchers(filter)
	if err != nil {
		return err
	}
	if filter.IncludeGtid != "" {
		inGtid, err = gtid.Parse(filter.IncludeGtid)
		if err != nil {
			return fmt.Errorf("parse include gtid failed: %w", err)
		}
		// subGtid tracks which included GTIDs are still outstanding so the
		// scan can stop as soon as all of them have been seen.
		subGtid = inGtid.Clone()
	}
	if filter.ExcludeGtid != "" {
		exGtid, err = gtid.Parse(filter.ExcludeGtid)
		if err != nil {
			return fmt.Errorf("parse exclude gtid failed: %w", err)
		}
	}

	var (
		tbMapPos       uint32 // start offset of the most recent TABLE_MAP event
		skipCurrentTxn bool   // true while the current transaction is filtered out
		tx             Transaction
		headBuf        = make([]byte, replication.EventHeaderSize)
	)
	currentGtid := ""

	// callFn applies the per-transaction filters and forwards the transaction
	// to fn. It returns false only when fn asks to stop; a filtered-out
	// transaction yields true so that parsing continues.
	callFn := func(tx Transaction) bool {
		if fn == nil {
			return true
		}
		fillTimeLazy(&tx)
		if !filter.StartDate.IsZero() && filter.StartDate.After(tx.Time) {
			return true
		}
		if !filter.EndDate.IsZero() && filter.EndDate.Before(tx.Time) {
			return true
		}
		if filter.StartPos != 0 && filter.StartPos > tx.StartPos {
			return true
		}
		if filter.EndPos != 0 && filter.EndPos < tx.EndPos {
			return true
		}
		if filter.BigThan != 0 && filter.BigThan > tx.Size {
			return true
		}
		if filter.SmallThan != 0 && filter.SmallThan < tx.Size {
			return true
		}
		if !filter.OnlyShowGtid && filter.OnlyShowDML && tx.dmlEventCount == 0 {
			return true
		}
		txs, matched, pickAll, skipAll := selectVisibleTxDetails(tx, includeMatcher, excludeMatcher, filter)
		if pickAll {
			return fn(tx)
		}
		if skipAll {
			return true
		}
		if matched {
			tx.Txs = txs
			recomputeTxStatsFromVisibleDetails(&tx)
		}
		if !matched && includeMatcher != nil {
			return true
		}
		if len(tx.Txs) == 0 && matched {
			return true
		}
		return fn(tx)
	}

	for {
		h, err := readEventHeader(r, parser, headBuf)
		if err == io.EOF {
			// Flush the trailing transaction, unless it was filtered out: a
			// skipped transaction leaves tx empty and must not be reported.
			if currentGtid != "" && !skipCurrentTxn {
				finalizeTx(&tx, filter.OnlyShowGtid)
				callFn(tx)
			}
			return nil
		}
		if err != nil {
			return err
		}

		// GTID-only fast path
		if filter.OnlyShowGtid {
			if h.EventType != replication.GTID_EVENT && h.EventType != replication.ANONYMOUS_GTID_EVENT {
				// Format-description and table-map events are still fed to
				// the parser; every other event body is skipped unread.
				if h.EventType == replication.FORMAT_DESCRIPTION_EVENT ||
					h.EventType == replication.TABLE_MAP_EVENT {
					body, err := readEventBody(r, h)
					if err != nil {
						return err
					}
					if _, err = parseEvent(parser, h, headBuf, body); err != nil {
						return err
					}
				} else {
					if err := skipEventBody(r, h); err != nil {
						return err
					}
				}
				continue
			}
			body, err := readEventBody(r, h)
			if err != nil {
				return err
			}
			e, err := parseEvent(parser, h, headBuf, body)
			if err != nil {
				return err
			}
			evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
			for _, ev := range evs {
				if ev.Type != "gtid" {
					continue
				}
				startPos := int(h.LogPos - h.EventSize)
				if filter.EndPos != 0 && startPos > filter.EndPos {
					continue
				}
				if filter.StartPos != 0 && startPos < filter.StartPos {
					continue
				}
				if currentGtid != "" {
					// The previous transaction ends right before this GTID event.
					tx.EndPos = startPos - 1
					finalizeTx(&tx, true)
					if !callFn(tx) {
						return nil
					}
					if subGtid != nil {
						if err := subGtid.Sub(tx.GTID); err == nil && subGtid.EventCount() == 0 {
							return nil
						}
					}
					tx = Transaction{}
				}
				currentGtid = ev.Data
				if inGtid != nil {
					if c, _ := inGtid.Contain(ev.Data); !c {
						tx = Transaction{}
						currentGtid = ""
						continue
					}
				}
				if exGtid != nil {
					if c, _ := exGtid.Contain(ev.Data); c {
						currentGtid = ""
						tx = Transaction{}
						continue
					}
				}
				tx = Transaction{
					GTID:      ev.Data,
					StartPos:  startPos,
					EndPos:    startPos,
					Timestamp: int64(h.Timestamp),
				}
			}
			continue
		}

		// Handle GTID events first: they decide whether the current
		// transaction is selected.
		if h.EventType == replication.GTID_EVENT || h.EventType == replication.ANONYMOUS_GTID_EVENT {
			body, err := readEventBody(r, h)
			if err != nil {
				return err
			}
			e, err := parseEvent(parser, h, headBuf, body)
			if err != nil {
				return err
			}
			evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
			for _, ev := range evs {
				if ev.Type != "gtid" {
					continue
				}
				startPos := int(h.LogPos - h.EventSize)
				// skipCurrentTxn still describes the previous transaction
				// here. If it was filtered out, tx is empty and there is
				// nothing to report.
				if currentGtid != "" && !skipCurrentTxn {
					finalizeTx(&tx, false)
					if !callFn(tx) {
						return nil
					}
					if subGtid != nil {
						if err := subGtid.Sub(tx.GTID); err == nil && subGtid.EventCount() == 0 {
							return nil
						}
					}
					tx = Transaction{}
				}
				currentGtid = ev.Data
				skipCurrentTxn = false
				if filter.EndPos != 0 && startPos > filter.EndPos {
					skipCurrentTxn = true
				}
				if filter.StartPos != 0 && startPos < filter.StartPos {
					skipCurrentTxn = true
				}
				if inGtid != nil {
					if c, _ := inGtid.Contain(ev.Data); !c {
						skipCurrentTxn = true
					}
				}
				if exGtid != nil {
					if c, _ := exGtid.Contain(ev.Data); c {
						skipCurrentTxn = true
					}
				}
				if !skipCurrentTxn {
					tx = Transaction{
						GTID:      ev.Data,
						StartPos:  startPos,
						Timestamp: int64(h.Timestamp),
						Txs:       make([]TxDetail, 0, 8),
						sqlOrigin: make([]string, 0, 4),
					}
				} else {
					tx = Transaction{}
				}
			}
			continue
		}

		// When the transaction is not selected, TABLE_MAP_EVENT must still be
		// parsed (the parser caches table metadata); all other events can be
		// skipped safely.
		if skipCurrentTxn {
			if h.EventType == replication.TABLE_MAP_EVENT ||
				h.EventType == replication.FORMAT_DESCRIPTION_EVENT {
				body, err := readEventBody(r, h)
				if err != nil {
					return err
				}
				if _, err = parseEvent(parser, h, headBuf, body); err != nil {
					return err
				}
			} else {
				if err := skipEventBody(r, h); err != nil {
					return err
				}
			}
			continue
		}

		body, err := readEventBody(r, h)
		if err != nil {
			return err
		}
		e, err := parseEvent(parser, h, headBuf, body)
		if err != nil {
			return err
		}
		if h.EventType == replication.TABLE_MAP_EVENT {
			tbMapPos = h.LogPos - h.EventSize
		}
		evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
		for _, ev := range evs {
			// Query (and GTID) details start at their own event; every other
			// detail is attributed to the preceding table-map event.
			startPos := int(tbMapPos)
			if ev.Type == "query" || ev.Type == "gtid" {
				startPos = int(h.LogPos - h.EventSize)
			}
			switch ev.Type {
			case "":
				tx.EndPos = int(h.LogPos)
			case "tablemap":
				tx.EndPos = int(h.LogPos)
				tbMapPos = h.LogPos - h.EventSize
			case "rowsquery":
				tx.EndPos = int(h.LogPos)
				tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
			default:
				tx.EndPos = int(h.LogPos)
				if ev.Type == "query" {
					if equalFoldShort(ev.Data, "begin") {
						if tx.TxStartTime == 0 {
							tx.TxStartTime = int64(h.Timestamp)
						}
						tx.Status = STATUS_BEGIN
					} else if equalFoldShort(ev.Data, "commit") {
						tx.Status = STATUS_COMMIT
						tx.TxEndTime = int64(h.Timestamp)
					} else if equalFoldShort(ev.Data, "rollback") {
						tx.Status = STATUS_ROLLBACK
						tx.TxEndTime = int64(h.Timestamp)
					}
				}
				// Details that carry both a database and a table name are
				// counted as DML events.
				if ev.DB != "" && ev.TB != "" {
					tx.dmlEventCount++
				}
				tx.Txs = append(tx.Txs, TxDetail{
					StartPos:           startPos,
					EndPos:             int(h.LogPos),
					Db:                 ev.DB,
					Table:              ev.TB,
					Sql:                ev.Data,
					SqlType:            ev.Type,
					Rows:               ev.Rows,
					ColumnTypes:        ev.ColumnTypes,
					ColumnCollationIDs: ev.ColumnCollationIDs,
					RowCount:           int(ev.RowCnt),
					Timestamp:          int64(h.Timestamp),
					CompressionType:    ev.CompressionType,
				})
			}
		}
	}
}
// selectVisibleTxDetails applies the table matchers to the details of tx and
// returns, in order:
//
//   - the details that remain visible,
//   - whether any detail hit the active matcher (or its blank rule),
//   - pickAll: an include hit occurred and filter.PickTxAllIfMatch is set,
//   - skipAll: an exclude hit occurred and filter.PickTxAllIfMatch is set.
//
// When pickAll or skipAll is true the returned slice is nil. The include
// matcher takes precedence when both matchers are non-nil. Details with an
// empty database and table name are governed by filter.IncludeBlank and
// filter.ExcludeBlank rather than by the matchers.
func selectVisibleTxDetails(tx Transaction, includeMatcher, excludeMatcher *tableMatcher, filter BinlogFilter) ([]TxDetail, bool, bool, bool) {
	visible := make([]TxDetail, 0, len(tx.Txs))
	anyHit := false

	for i := range tx.Txs {
		detail := tx.Txs[i]
		includeHit := includeMatcher != nil && includeMatcher.match(detail.Db, detail.Table)
		excludeHit := excludeMatcher != nil && excludeMatcher.match(detail.Db, detail.Table)
		blank := detail.Db == "" && detail.Table == ""

		switch {
		case includeMatcher != nil:
			hit := includeHit
			if blank {
				hit = filter.IncludeBlank
			}
			if !hit {
				continue
			}
			anyHit = true
			if filter.PickTxAllIfMatch {
				return nil, true, true, false
			}
			visible = append(visible, detail)

		case excludeMatcher != nil:
			hit := excludeHit
			if blank {
				hit = filter.ExcludeBlank
			}
			if hit {
				anyHit = true
				if filter.PickTxAllIfMatch {
					return nil, true, false, true
				}
				continue
			}
			visible = append(visible, detail)

		default:
			// No table filter configured: everything stays visible.
			visible = append(visible, detail)
		}
	}
	return visible, anyHit, false, false
}
// prepareTableMatchers builds the include and exclude table matchers for
// filter.
//
// A matcher is only built when its pattern list is configured in the sense of
// hasConfiguredTablePatterns, i.e. contains at least one non-blank pattern.
// Otherwise the corresponding result is nil, which callers treat as "no
// filter". This keeps the function consistent with the include/exclude
// conflict check: a list such as []string{""} must not produce a matcher
// that was built from no patterns at all.
//
// Pattern errors from buildTableMatcher are wrapped with the name of the
// offending option; both matchers are nil when an error is returned.
func prepareTableMatchers(filter BinlogFilter) (includeMatcher, excludeMatcher *tableMatcher, err error) {
	if hasConfiguredTablePatterns(filter.IncludeTables) {
		includeMatcher, err = buildTableMatcher(filter.IncludeTables)
		if err != nil {
			return nil, nil, fmt.Errorf("invalid include-tables: %w", err)
		}
	}
	if hasConfiguredTablePatterns(filter.ExcludeTables) {
		excludeMatcher, err = buildTableMatcher(filter.ExcludeTables)
		if err != nil {
			return nil, nil, fmt.Errorf("invalid exclude-tables: %w", err)
		}
	}
	return includeMatcher, excludeMatcher, nil
}
// buildTableMatcher compiles a list of "db.table" patterns into a
// tableMatcher.
//
// Each pattern is trimmed and lower-cased; blank patterns are ignored. A
// segment may be the wildcard "*" but only as the whole segment:
//
//	*.*      sets matchAll
//	*.table  is recorded in tbWildcard
//	db.*     is recorded in dbWildcard
//	db.table is recorded in exactMatch
//
// An error is returned for a pattern that does not consist of exactly two
// non-empty segments separated by a single dot, or that embeds "*" inside a
// longer segment. Error messages quote the trimmed, original-case pattern.
func buildTableMatcher(patterns []string) (*tableMatcher, error) {
	matcher := &tableMatcher{
		exactMatch: map[string]bool{},
		dbWildcard: map[string]bool{},
		tbWildcard: map[string]bool{},
	}

	for _, raw := range patterns {
		trimmed := strings.TrimSpace(raw)
		if trimmed == "" {
			continue
		}
		normalized := strings.ToLower(trimmed)

		// Exactly one dot, with text on both sides of it.
		dot := strings.IndexByte(normalized, '.')
		if dot <= 0 || dot == len(normalized)-1 || strings.Count(normalized, ".") != 1 {
			return nil, fmt.Errorf("invalid table pattern %q: expect db.table", trimmed)
		}
		db, tb := normalized[:dot], normalized[dot+1:]
		anyDB, anyTable := db == "*", tb == "*"

		if !anyDB && strings.Contains(db, "*") {
			return nil, fmt.Errorf("invalid table pattern %q: wildcard '*' must occupy full db segment", trimmed)
		}
		if !anyTable && strings.Contains(tb, "*") {
			return nil, fmt.Errorf("invalid table pattern %q: wildcard '*' must occupy full table segment", trimmed)
		}

		switch {
		case anyDB && anyTable:
			matcher.matchAll = true
		case anyDB:
			matcher.tbWildcard[tb] = true
		case anyTable:
			matcher.dbWildcard[db] = true
		default:
			matcher.exactMatch[db+"."+tb] = true
		}
	}
	return matcher, nil
}
// hasConfiguredTablePatterns reports whether patterns contains at least one
// entry that is not empty after trimming surrounding white space.
func hasConfiguredTablePatterns(patterns []string) bool {
	for i := range patterns {
		if len(strings.TrimSpace(patterns[i])) > 0 {
			return true
		}
	}
	return false
}
// recomputeTxStatsFromVisibleDetails refreshes the aggregate fields of tx
// from tx.Txs.
//
// With no details, RowsCount and Size are reset to zero and the positions
// are left untouched. Otherwise RowsCount becomes the sum of the detail row
// counts, StartPos the smallest detail start, EndPos the largest detail end,
// and Size their distance (zero when the end does not lie after the start).
// A nil tx is ignored.
func recomputeTxStatsFromVisibleDetails(tx *Transaction) {
	if tx == nil {
		return
	}
	details := tx.Txs
	if len(details) == 0 {
		tx.RowsCount, tx.Size = 0, 0
		return
	}

	// Seed the running values from the first detail, then fold in the rest.
	lowest, highest := details[0].StartPos, details[0].EndPos
	totalRows := details[0].RowCount
	for _, d := range details[1:] {
		totalRows += d.RowCount
		if d.StartPos < lowest {
			lowest = d.StartPos
		}
		if d.EndPos > highest {
			highest = d.EndPos
		}
	}

	tx.RowsCount = totalRows
	tx.StartPos = lowest
	tx.EndPos = highest
	tx.Size = 0
	if highest > lowest {
		tx.Size = highest - lowest
	}
}
// equalFoldShort reports whether s equals lower when the ASCII upper-case
// letters of s are folded to lower case. Only s is folded: lower is compared
// byte for byte, so callers pass an already lower-case keyword.
func equalFoldShort(s, lower string) bool {
	n := len(lower)
	if len(s) != n {
		return false
	}
	for i := 0; i < n; i++ {
		ch := s[i]
		if ch >= 'A' && ch <= 'Z' {
			// Setting bit 0x20 maps 'A'..'Z' onto 'a'..'z'.
			ch |= 0x20
		}
		if ch != lower[i] {
			return false
		}
	}
	return true
}