924 lines
21 KiB
Go
924 lines
21 KiB
Go
package binlog
|
||
|
||
import (
|
||
"b612.me/mysql/gtid"
|
||
"b612.me/staros"
|
||
"bufio"
|
||
"bytes"
|
||
"errors"
|
||
"fmt"
|
||
"github.com/starainrt/go-mysql/replication"
|
||
"io"
|
||
"os"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
)
|
||
|
||
var (
|
||
ErrInvalidBinlogHeader = errors.New("invalid binlog file header")
|
||
ErrEventTooSmall = errors.New("event size too small")
|
||
)
|
||
|
||
const (
|
||
CompressionNone uint64 = 255
|
||
CompressionZSTD uint64 = 0
|
||
)
|
||
|
||
const (
|
||
maxPooledBodyCap = 4 << 20 // 4MB
|
||
defaultReadBufSize = 1 << 20 // 1MB
|
||
)
|
||
|
||
type TxDetail struct {
|
||
StartPos int `json:"startPos"`
|
||
EndPos int `json:"endPos"`
|
||
RowCount int `json:"rowCount"`
|
||
Timestamp int64 `json:"timestamp"`
|
||
Time time.Time `json:"time"`
|
||
Sql string `json:"sql"`
|
||
Db string `json:"db"`
|
||
Table string `json:"table"`
|
||
SqlType string `json:"sqlType"`
|
||
CompressionType string `json:"compressionType"`
|
||
Rows [][]interface{} `json:"rows"`
|
||
}
|
||
|
||
const (
|
||
STATUS_PREPARE uint8 = iota
|
||
STATUS_BEGIN
|
||
STATUS_COMMIT
|
||
STATUS_ROLLBACK
|
||
)
|
||
|
||
type Transaction struct {
|
||
GTID string `json:"gtid"`
|
||
Timestamp int64 `json:"timestamp"`
|
||
Time time.Time `json:"time"`
|
||
StartPos int `json:"startPos"`
|
||
EndPos int `json:"endPos"`
|
||
Size int `json:"size"`
|
||
RowsCount int `json:"rowsCount"`
|
||
Status uint8 `json:"status"`
|
||
TxStartTime int64 `json:"txStartTime"`
|
||
TxEndTime int64 `json:"txEndTime"`
|
||
sqlOrigin []string `json:"sqlOrigin"`
|
||
Txs []TxDetail `json:"txs"`
|
||
dmlEventCount int
|
||
}
|
||
|
||
func (t Transaction) GetSqlOrigin() []string {
|
||
return t.sqlOrigin
|
||
}
|
||
|
||
type BinlogFilter struct {
|
||
IncludeGtid string
|
||
ExcludeGtid string
|
||
IncludeTables []string
|
||
ExcludeTables []string
|
||
StartPos int
|
||
EndPos int
|
||
StartDate time.Time
|
||
EndDate time.Time
|
||
BigThan int
|
||
SmallThan int
|
||
OnlyShowGtid bool
|
||
OnlyShowDML bool
|
||
PickTxAllIfMatch bool
|
||
ExcludeBlank bool
|
||
IncludeBlank bool
|
||
}
|
||
|
||
type BinlogEvent struct {
|
||
Type string
|
||
DB string
|
||
TB string
|
||
Data string
|
||
RowCnt uint32
|
||
Rows [][]interface{}
|
||
CompressionType string
|
||
}
|
||
|
||
type tableMatcher struct {
|
||
exactMatch map[string]bool
|
||
dbWildcard map[string]bool
|
||
tbWildcard map[string]bool
|
||
matchAll bool
|
||
}
|
||
|
||
func (m *tableMatcher) match(db, tb string) bool {
|
||
if m.matchAll {
|
||
return true
|
||
}
|
||
if m.exactMatch[db+"."+tb] {
|
||
return true
|
||
}
|
||
if m.dbWildcard[db] {
|
||
return true
|
||
}
|
||
if m.tbWildcard[tb] {
|
||
return true
|
||
}
|
||
return false
|
||
}
|
||
|
||
var bodyBufPool = sync.Pool{
|
||
New: func() any {
|
||
b := make([]byte, 0, 64*1024)
|
||
return &b
|
||
},
|
||
}
|
||
|
||
func getBodyBuf(n int) []byte {
|
||
p := bodyBufPool.Get().(*[]byte)
|
||
if cap(*p) < n {
|
||
b := make([]byte, n)
|
||
return b
|
||
}
|
||
return (*p)[:n]
|
||
}
|
||
|
||
func putBodyBuf(b []byte) {
|
||
if cap(b) > maxPooledBodyCap {
|
||
return
|
||
}
|
||
b = b[:0]
|
||
bodyBufPool.Put(&b)
|
||
}
|
||
|
||
func ParseBinlogFile(path string, fx func(transaction Transaction) bool) error {
|
||
return parseOneBinlog(path, fx)
|
||
}
|
||
|
||
func parseOneBinlog(path string, fx func(Transaction) bool) error {
|
||
if !staros.Exists(path) {
|
||
return os.ErrNotExist
|
||
}
|
||
f, err := os.Open(path)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
defer f.Close()
|
||
|
||
if err := validateBinlogHeader(f); err != nil {
|
||
return err
|
||
}
|
||
|
||
br := bufio.NewReaderSize(f, defaultReadBufSize)
|
||
return parseBinlogDetail(br, fx)
|
||
}
|
||
|
||
func validateBinlogHeader(f *os.File) error {
|
||
const fileTypeBytes = int64(4)
|
||
b := make([]byte, fileTypeBytes)
|
||
|
||
if _, err := f.Read(b); err != nil {
|
||
return fmt.Errorf("read binlog header failed: %w", err)
|
||
}
|
||
if !bytes.Equal(b, replication.BinLogFileHeader) {
|
||
return ErrInvalidBinlogHeader
|
||
}
|
||
if _, err := f.Seek(fileTypeBytes, io.SeekStart); err != nil {
|
||
return fmt.Errorf("seek after header failed: %w", err)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func readEventHeader(r io.Reader, parser *replication.BinlogParser, headBuf []byte) (*replication.EventHeader, error) {
|
||
if _, err := io.ReadFull(r, headBuf); err != nil {
|
||
return nil, err
|
||
}
|
||
h, err := parser.ParseHeader(headBuf)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("parse header failed: %w", err)
|
||
}
|
||
if h.EventSize <= uint32(replication.EventHeaderSize) {
|
||
return nil, fmt.Errorf("%w: event size is %d", ErrEventTooSmall, h.EventSize)
|
||
}
|
||
return h, nil
|
||
}
|
||
|
||
func readEventBody(r io.Reader, h *replication.EventHeader) ([]byte, error) {
|
||
bodyLen := int(h.EventSize) - replication.EventHeaderSize
|
||
body := getBodyBuf(bodyLen)
|
||
if _, err := io.ReadFull(r, body); err != nil {
|
||
putBodyBuf(body)
|
||
return nil, fmt.Errorf("read event body failed: %w (need %d bytes)", err, bodyLen)
|
||
}
|
||
return body, nil
|
||
}
|
||
|
||
func skipEventBody(r io.Reader, h *replication.EventHeader) error {
|
||
bodyLen := int64(h.EventSize) - int64(replication.EventHeaderSize)
|
||
if bodyLen <= 0 {
|
||
return nil
|
||
}
|
||
if _, err := io.CopyN(io.Discard, r, bodyLen); err != nil {
|
||
return fmt.Errorf("skip event body failed: %w", err)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func parseEvent(parser *replication.BinlogParser, h *replication.EventHeader, body []byte) (replication.Event, error) {
|
||
e, err := parser.ParseEvent(h, body, nil)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("parse event failed at pos %d: %w", h.LogPos, err)
|
||
}
|
||
return e, nil
|
||
}
|
||
|
||
func finalizeTx(tx *Transaction, onlyShowGtid bool) {
|
||
idx := 0
|
||
for k, v := range tx.Txs {
|
||
if v.SqlType != "query" && len(tx.sqlOrigin) > idx {
|
||
v.Sql = tx.sqlOrigin[idx]
|
||
idx++
|
||
}
|
||
tx.RowsCount += v.RowCount
|
||
tx.Txs[k] = v
|
||
}
|
||
|
||
if onlyShowGtid {
|
||
tx.Size = 0
|
||
} else {
|
||
tx.Size = tx.EndPos - tx.StartPos
|
||
}
|
||
}
|
||
|
||
func fillTimeLazy(tx *Transaction) {
|
||
if tx.Timestamp != 0 && tx.Time.IsZero() {
|
||
tx.Time = time.Unix(tx.Timestamp, 0)
|
||
}
|
||
for i := range tx.Txs {
|
||
if tx.Txs[i].Timestamp != 0 && tx.Txs[i].Time.IsZero() {
|
||
tx.Txs[i].Time = time.Unix(tx.Txs[i].Timestamp, 0)
|
||
}
|
||
}
|
||
}
|
||
|
||
func parseBinlogDetail(r io.Reader, f func(Transaction) bool) error {
|
||
parser := replication.NewBinlogParser()
|
||
parser.SetParseTime(false)
|
||
parser.SetUseDecimal(false)
|
||
|
||
var (
|
||
tbMapPos uint32
|
||
tx Transaction
|
||
headBuf = make([]byte, replication.EventHeaderSize)
|
||
)
|
||
currentGtid := ""
|
||
|
||
for {
|
||
h, err := readEventHeader(r, parser, headBuf)
|
||
if err == io.EOF {
|
||
if currentGtid != "" {
|
||
finalizeTx(&tx, false)
|
||
fillTimeLazy(&tx)
|
||
if f != nil {
|
||
f(tx)
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
body, err := readEventBody(r, h)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
e, err := parseEvent(parser, h, body)
|
||
putBodyBuf(body)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
if h.EventType == replication.TABLE_MAP_EVENT {
|
||
tbMapPos = h.LogPos - h.EventSize
|
||
}
|
||
|
||
evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
|
||
for _, ev := range evs {
|
||
startPos := 0
|
||
if ev.Type == "query" || ev.Type == "gtid" {
|
||
startPos = int(h.LogPos - h.EventSize)
|
||
} else {
|
||
startPos = int(tbMapPos)
|
||
}
|
||
|
||
switch ev.Type {
|
||
case "gtid":
|
||
if currentGtid != "" {
|
||
finalizeTx(&tx, false)
|
||
fillTimeLazy(&tx)
|
||
if f != nil && !f(tx) {
|
||
return nil
|
||
}
|
||
}
|
||
currentGtid = ev.Data
|
||
tx = Transaction{
|
||
GTID: ev.Data,
|
||
StartPos: startPos,
|
||
Timestamp: int64(h.Timestamp),
|
||
Txs: make([]TxDetail, 0, 8),
|
||
sqlOrigin: make([]string, 0, 4),
|
||
}
|
||
case "":
|
||
tx.EndPos = int(h.LogPos)
|
||
case "rowsquery":
|
||
tx.EndPos = int(h.LogPos)
|
||
tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
|
||
default:
|
||
tx.EndPos = int(h.LogPos)
|
||
status := STATUS_PREPARE
|
||
if ev.Type == "query" {
|
||
switch strings.ToLower(ev.Data) {
|
||
case "begin":
|
||
if tx.TxStartTime == 0 {
|
||
tx.TxStartTime = int64(h.Timestamp)
|
||
}
|
||
status = STATUS_BEGIN
|
||
case "commit":
|
||
status = STATUS_COMMIT
|
||
tx.TxEndTime = int64(h.Timestamp)
|
||
case "rollback":
|
||
status = STATUS_ROLLBACK
|
||
tx.TxEndTime = int64(h.Timestamp)
|
||
}
|
||
tx.Status = status
|
||
}
|
||
if ev.DB != "" && ev.TB != "" {
|
||
tx.dmlEventCount++
|
||
}
|
||
tx.Txs = append(tx.Txs, TxDetail{
|
||
StartPos: startPos,
|
||
EndPos: int(h.LogPos),
|
||
Db: ev.DB,
|
||
Table: ev.TB,
|
||
Sql: ev.Data,
|
||
SqlType: ev.Type,
|
||
Rows: ev.Rows,
|
||
RowCount: int(ev.RowCnt),
|
||
Timestamp: int64(h.Timestamp),
|
||
CompressionType: ev.CompressionType,
|
||
})
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent {
|
||
var res []BinlogEvent
|
||
var sig BinlogEvent
|
||
|
||
switch ev.Header.EventType {
|
||
case replication.ANONYMOUS_GTID_EVENT:
|
||
sig.Data = "anonymous-gtid-event:1"
|
||
sig.Type = "gtid"
|
||
|
||
case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
|
||
wrEvent, ok := ev.Event.(*replication.RowsEvent)
|
||
if !ok {
|
||
return res
|
||
}
|
||
sig.DB = string(wrEvent.Table.Schema)
|
||
sig.TB = string(wrEvent.Table.Table)
|
||
sig.Type = "insert"
|
||
sig.RowCnt = uint32(len(wrEvent.Rows))
|
||
sig.Rows = wrEvent.Rows
|
||
|
||
case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
|
||
wrEvent, ok := ev.Event.(*replication.RowsEvent)
|
||
if !ok {
|
||
return res
|
||
}
|
||
sig.DB = string(wrEvent.Table.Schema)
|
||
sig.TB = string(wrEvent.Table.Table)
|
||
sig.Type = "update"
|
||
sig.RowCnt = uint32(len(wrEvent.Rows)) / 2
|
||
sig.Rows = wrEvent.Rows
|
||
|
||
case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
|
||
wrEvent, ok := ev.Event.(*replication.RowsEvent)
|
||
if !ok {
|
||
return res
|
||
}
|
||
sig.DB = string(wrEvent.Table.Schema)
|
||
sig.TB = string(wrEvent.Table.Table)
|
||
sig.Type = "delete"
|
||
sig.RowCnt = uint32(len(wrEvent.Rows))
|
||
sig.Rows = wrEvent.Rows
|
||
|
||
case replication.ROWS_QUERY_EVENT:
|
||
queryEvent, ok := ev.Event.(*replication.RowsQueryEvent)
|
||
if !ok {
|
||
return res
|
||
}
|
||
sig.Data = string(queryEvent.Query)
|
||
sig.Type = "rowsquery"
|
||
|
||
case replication.QUERY_EVENT:
|
||
queryEvent, ok := ev.Event.(*replication.QueryEvent)
|
||
if !ok {
|
||
return res
|
||
}
|
||
sig.DB = string(queryEvent.Schema)
|
||
sig.Data = string(queryEvent.Query)
|
||
sig.Type = "query"
|
||
|
||
case replication.MARIADB_GTID_EVENT:
|
||
sig.Data = "begin"
|
||
sig.Type = "query"
|
||
|
||
case replication.XID_EVENT:
|
||
sig.Data = "commit"
|
||
sig.Type = "query"
|
||
|
||
case replication.GTID_EVENT:
|
||
ge, ok := ev.Event.(*replication.GTIDEvent)
|
||
if !ok {
|
||
return res
|
||
}
|
||
gid, err := gtid.Parse(fmt.Sprintf("%s:%d", bytesToUuid(ge.SID), ge.GNO))
|
||
if err != nil {
|
||
sig.Data = fmt.Sprintf("invalid-gtid:%s:%d", bytesToUuid(ge.SID), ge.GNO)
|
||
} else {
|
||
sig.Data = gid.String()
|
||
}
|
||
sig.Type = "gtid"
|
||
|
||
case replication.TRANSACTION_PAYLOAD_EVENT:
|
||
ge, ok := ev.Event.(*replication.TransactionPayloadEvent)
|
||
if !ok {
|
||
return res
|
||
}
|
||
for _, val := range ge.Events {
|
||
res = append(res, ParseBinlogEvent(val)...)
|
||
}
|
||
compressionType := getCompressionTypeName(ge.CompressionType)
|
||
for idx := range res {
|
||
res[idx].CompressionType = compressionType
|
||
}
|
||
return res
|
||
}
|
||
|
||
res = append(res, sig)
|
||
return res
|
||
}
|
||
|
||
func getCompressionTypeName(code uint64) string {
|
||
switch code {
|
||
case CompressionZSTD:
|
||
return "ZSTD"
|
||
case CompressionNone:
|
||
return ""
|
||
default:
|
||
return fmt.Sprintf("UNKNOWN(%d)", code)
|
||
}
|
||
}
|
||
|
||
func ParseBinlogWithFilter(path string, pos int64, filter BinlogFilter, fx func(Transaction) bool) error {
|
||
if !staros.Exists(path) {
|
||
return os.ErrNotExist
|
||
}
|
||
|
||
f, err := os.Open(path)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
defer f.Close()
|
||
|
||
parser := replication.NewBinlogParser()
|
||
parser.SetParseTime(false)
|
||
parser.SetUseDecimal(false)
|
||
|
||
if pos != 0 {
|
||
if err := seekToPosition(f, parser, pos); err != nil {
|
||
return err
|
||
}
|
||
} else {
|
||
if err := validateBinlogHeader(f); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
|
||
br := bufio.NewReaderSize(f, defaultReadBufSize)
|
||
return parseBinlogWithFilter(br, parser, filter, fx)
|
||
}
|
||
|
||
func seekToPosition(f *os.File, parser *replication.BinlogParser, pos int64) error {
|
||
if err := validateBinlogHeader(f); err != nil {
|
||
return err
|
||
}
|
||
|
||
headBuf := make([]byte, replication.EventHeaderSize)
|
||
for {
|
||
h, err := readEventHeader(f, parser, headBuf)
|
||
if err != nil {
|
||
return fmt.Errorf("seek to position failed: %w", err)
|
||
}
|
||
body, err := readEventBody(f, h)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
_, err = parseEvent(parser, h, body)
|
||
putBodyBuf(body)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if h.EventType == replication.FORMAT_DESCRIPTION_EVENT || h.EventType == replication.GTID_EVENT {
|
||
break
|
||
}
|
||
}
|
||
|
||
if _, err := f.Seek(pos, io.SeekStart); err != nil {
|
||
return fmt.Errorf("seek to pos %d failed: %w", pos, err)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter BinlogFilter, fn func(Transaction) bool) error {
|
||
var subGtid, inGtid, exGtid *gtid.Gtid
|
||
var err error
|
||
|
||
includeMatcher, excludeMatcher := prepareTableMatchers(filter)
|
||
|
||
if filter.IncludeGtid != "" {
|
||
inGtid, err = gtid.Parse(filter.IncludeGtid)
|
||
if err != nil {
|
||
return fmt.Errorf("parse include gtid failed: %w", err)
|
||
}
|
||
subGtid = inGtid.Clone()
|
||
}
|
||
if filter.ExcludeGtid != "" {
|
||
exGtid, err = gtid.Parse(filter.ExcludeGtid)
|
||
if err != nil {
|
||
return fmt.Errorf("parse exclude gtid failed: %w", err)
|
||
}
|
||
}
|
||
|
||
var (
|
||
tbMapPos uint32
|
||
skipCurrentTxn bool
|
||
tx Transaction
|
||
headBuf = make([]byte, replication.EventHeaderSize)
|
||
)
|
||
currentGtid := ""
|
||
|
||
callFn := func(tx Transaction) bool {
|
||
if fn == nil {
|
||
return true
|
||
}
|
||
|
||
fillTimeLazy(&tx)
|
||
|
||
if !filter.StartDate.IsZero() && filter.StartDate.After(tx.Time) {
|
||
return true
|
||
}
|
||
if !filter.EndDate.IsZero() && filter.EndDate.Before(tx.Time) {
|
||
return true
|
||
}
|
||
if filter.StartPos != 0 && filter.StartPos > tx.StartPos {
|
||
return true
|
||
}
|
||
if filter.EndPos != 0 && filter.EndPos < tx.EndPos {
|
||
return true
|
||
}
|
||
if filter.BigThan != 0 && filter.BigThan > tx.Size {
|
||
return true
|
||
}
|
||
if filter.SmallThan != 0 && filter.SmallThan < tx.Size {
|
||
return true
|
||
}
|
||
if !filter.OnlyShowGtid && filter.OnlyShowDML && tx.dmlEventCount == 0 {
|
||
return true
|
||
}
|
||
|
||
var txs []TxDetail
|
||
var matched bool
|
||
|
||
for _, t := range tx.Txs {
|
||
includeMatch := includeMatcher != nil && includeMatcher.match(t.Db, t.Table)
|
||
excludeMatch := excludeMatcher != nil && excludeMatcher.match(t.Db, t.Table)
|
||
|
||
if t.Db == "" && t.Table == "" {
|
||
if includeMatcher != nil && !filter.IncludeBlank {
|
||
continue
|
||
}
|
||
if excludeMatcher != nil && filter.ExcludeBlank {
|
||
matched = true
|
||
if filter.PickTxAllIfMatch {
|
||
return true
|
||
}
|
||
continue
|
||
}
|
||
}
|
||
|
||
if includeMatcher != nil {
|
||
if includeMatch {
|
||
matched = true
|
||
if filter.PickTxAllIfMatch {
|
||
return fn(tx)
|
||
}
|
||
txs = append(txs, t)
|
||
}
|
||
} else if excludeMatcher != nil {
|
||
if excludeMatch {
|
||
matched = true
|
||
if filter.PickTxAllIfMatch {
|
||
return true
|
||
}
|
||
} else {
|
||
txs = append(txs, t)
|
||
}
|
||
} else {
|
||
txs = append(txs, t)
|
||
}
|
||
}
|
||
|
||
if matched {
|
||
tx.Txs = txs
|
||
}
|
||
if !matched && includeMatcher != nil {
|
||
return true
|
||
}
|
||
if len(tx.Txs) == 0 && matched {
|
||
return true
|
||
}
|
||
return fn(tx)
|
||
}
|
||
|
||
for {
|
||
h, err := readEventHeader(r, parser, headBuf)
|
||
if err == io.EOF {
|
||
if !tx.Time.IsZero() || tx.Timestamp != 0 {
|
||
finalizeTx(&tx, filter.OnlyShowGtid)
|
||
callFn(tx)
|
||
}
|
||
return nil
|
||
}
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// GTID-only fast path
|
||
if filter.OnlyShowGtid {
|
||
if h.EventType != replication.GTID_EVENT && h.EventType != replication.ANONYMOUS_GTID_EVENT {
|
||
if err := skipEventBody(r, h); err != nil {
|
||
return err
|
||
}
|
||
continue
|
||
}
|
||
|
||
body, err := readEventBody(r, h)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
e, err := parseEvent(parser, h, body)
|
||
putBodyBuf(body)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
|
||
for _, ev := range evs {
|
||
if ev.Type != "gtid" {
|
||
continue
|
||
}
|
||
startPos := int(h.LogPos - h.EventSize)
|
||
|
||
if filter.EndPos != 0 && startPos > filter.EndPos {
|
||
continue
|
||
}
|
||
if filter.StartPos != 0 && startPos < filter.StartPos {
|
||
continue
|
||
}
|
||
|
||
if currentGtid != "" {
|
||
tx.EndPos = startPos - 1
|
||
finalizeTx(&tx, true)
|
||
if !callFn(tx) {
|
||
return nil
|
||
}
|
||
if subGtid != nil {
|
||
if err := subGtid.Sub(tx.GTID); err == nil && subGtid.EventCount() == 0 {
|
||
return nil
|
||
}
|
||
}
|
||
tx = Transaction{}
|
||
}
|
||
|
||
currentGtid = ev.Data
|
||
|
||
if inGtid != nil {
|
||
if c, _ := inGtid.Contain(ev.Data); !c {
|
||
tx = Transaction{}
|
||
currentGtid = ""
|
||
continue
|
||
}
|
||
}
|
||
if exGtid != nil {
|
||
if c, _ := exGtid.Contain(ev.Data); c {
|
||
currentGtid = ""
|
||
tx = Transaction{}
|
||
continue
|
||
}
|
||
}
|
||
|
||
tx = Transaction{
|
||
GTID: ev.Data,
|
||
StartPos: startPos,
|
||
EndPos: startPos,
|
||
Timestamp: int64(h.Timestamp),
|
||
}
|
||
}
|
||
continue
|
||
}
|
||
|
||
// 先处理GTID事件(决定当前事务是否命中)
|
||
if h.EventType == replication.GTID_EVENT || h.EventType == replication.ANONYMOUS_GTID_EVENT {
|
||
body, err := readEventBody(r, h)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
e, err := parseEvent(parser, h, body)
|
||
putBodyBuf(body)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
|
||
for _, ev := range evs {
|
||
if ev.Type != "gtid" {
|
||
continue
|
||
}
|
||
startPos := int(h.LogPos - h.EventSize)
|
||
|
||
if currentGtid != "" {
|
||
finalizeTx(&tx, false)
|
||
if !callFn(tx) {
|
||
return nil
|
||
}
|
||
if subGtid != nil {
|
||
if err := subGtid.Sub(tx.GTID); err == nil && subGtid.EventCount() == 0 {
|
||
return nil
|
||
}
|
||
}
|
||
tx = Transaction{}
|
||
}
|
||
|
||
currentGtid = ev.Data
|
||
skipCurrentTxn = false
|
||
|
||
if filter.EndPos != 0 && startPos > filter.EndPos {
|
||
skipCurrentTxn = true
|
||
}
|
||
if filter.StartPos != 0 && startPos < filter.StartPos {
|
||
skipCurrentTxn = true
|
||
}
|
||
if inGtid != nil {
|
||
if c, _ := inGtid.Contain(ev.Data); !c {
|
||
skipCurrentTxn = true
|
||
}
|
||
}
|
||
if exGtid != nil {
|
||
if c, _ := exGtid.Contain(ev.Data); c {
|
||
skipCurrentTxn = true
|
||
}
|
||
}
|
||
|
||
if !skipCurrentTxn {
|
||
tx = Transaction{
|
||
GTID: ev.Data,
|
||
StartPos: startPos,
|
||
Timestamp: int64(h.Timestamp),
|
||
Txs: make([]TxDetail, 0, 8),
|
||
sqlOrigin: make([]string, 0, 4),
|
||
}
|
||
} else {
|
||
tx = Transaction{}
|
||
}
|
||
}
|
||
continue
|
||
}
|
||
|
||
// 未命中事务:零解析到底
|
||
if skipCurrentTxn {
|
||
if err := skipEventBody(r, h); err != nil {
|
||
return err
|
||
}
|
||
continue
|
||
}
|
||
|
||
body, err := readEventBody(r, h)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
e, err := parseEvent(parser, h, body)
|
||
putBodyBuf(body)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
if h.EventType == replication.TABLE_MAP_EVENT {
|
||
tbMapPos = h.LogPos - h.EventSize
|
||
}
|
||
|
||
evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
|
||
for _, ev := range evs {
|
||
startPos := 0
|
||
if ev.Type == "query" || ev.Type == "gtid" {
|
||
startPos = int(h.LogPos - h.EventSize)
|
||
} else {
|
||
startPos = int(tbMapPos)
|
||
}
|
||
|
||
switch ev.Type {
|
||
case "":
|
||
tx.EndPos = int(h.LogPos)
|
||
|
||
case "rowsquery":
|
||
tx.EndPos = int(h.LogPos)
|
||
tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
|
||
|
||
default:
|
||
tx.EndPos = int(h.LogPos)
|
||
status := STATUS_PREPARE
|
||
if ev.Type == "query" {
|
||
switch strings.ToLower(ev.Data) {
|
||
case "begin":
|
||
if tx.TxStartTime == 0 {
|
||
tx.TxStartTime = int64(h.Timestamp)
|
||
}
|
||
status = STATUS_BEGIN
|
||
case "commit":
|
||
status = STATUS_COMMIT
|
||
tx.TxEndTime = int64(h.Timestamp)
|
||
case "rollback":
|
||
status = STATUS_ROLLBACK
|
||
tx.TxEndTime = int64(h.Timestamp)
|
||
}
|
||
tx.Status = status
|
||
}
|
||
if ev.DB != "" && ev.TB != "" {
|
||
tx.dmlEventCount++
|
||
}
|
||
tx.Txs = append(tx.Txs, TxDetail{
|
||
StartPos: startPos,
|
||
EndPos: int(h.LogPos),
|
||
Db: ev.DB,
|
||
Table: ev.TB,
|
||
Sql: ev.Data,
|
||
SqlType: ev.Type,
|
||
Rows: ev.Rows,
|
||
RowCount: int(ev.RowCnt),
|
||
Timestamp: int64(h.Timestamp),
|
||
CompressionType: ev.CompressionType,
|
||
})
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
func prepareTableMatchers(filter BinlogFilter) (includeMatcher, excludeMatcher *tableMatcher) {
|
||
if len(filter.IncludeTables) > 0 {
|
||
includeMatcher = buildTableMatcher(filter.IncludeTables)
|
||
}
|
||
if len(filter.ExcludeTables) > 0 {
|
||
excludeMatcher = buildTableMatcher(filter.ExcludeTables)
|
||
}
|
||
return includeMatcher, excludeMatcher
|
||
}
|
||
|
||
func buildTableMatcher(patterns []string) *tableMatcher {
|
||
m := &tableMatcher{
|
||
exactMatch: make(map[string]bool),
|
||
dbWildcard: make(map[string]bool),
|
||
tbWildcard: make(map[string]bool),
|
||
}
|
||
|
||
for _, pattern := range patterns {
|
||
if pattern == "*.*" {
|
||
m.matchAll = true
|
||
continue
|
||
}
|
||
parts := strings.Split(pattern, ".")
|
||
if len(parts) != 2 {
|
||
continue
|
||
}
|
||
db, tb := parts[0], parts[1]
|
||
if db == "*" && tb == "*" {
|
||
m.matchAll = true
|
||
} else if db == "*" {
|
||
m.tbWildcard[tb] = true
|
||
} else if tb == "*" {
|
||
m.dbWildcard[db] = true
|
||
} else {
|
||
m.exactMatch[pattern] = true
|
||
}
|
||
}
|
||
return m
|
||
}
|