Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 8469c11373 |
968
parse.go
968
parse.go
@ -1,964 +1,8 @@
|
||||
package binlog
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"b612.me/mysql/gtid"
|
||||
"b612.me/staros"
|
||||
"github.com/starainrt/go-mysql/replication"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrInvalidBinlogHeader = errors.New("invalid binlog file header")
|
||||
ErrEventTooSmall = errors.New("event size too small")
|
||||
)
|
||||
|
||||
const (
|
||||
CompressionNone uint64 = 255
|
||||
CompressionZSTD uint64 = 0
|
||||
)
|
||||
|
||||
const (
|
||||
maxPooledRawDataCap = 4 << 20 // 4MB
|
||||
defaultReadBufSize = 1 << 20 // 1MB
|
||||
)
|
||||
|
||||
type TxDetail struct {
|
||||
StartPos int `json:"startPos"`
|
||||
EndPos int `json:"endPos"`
|
||||
RowCount int `json:"rowCount"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
Time time.Time `json:"time"`
|
||||
Sql string `json:"sql"`
|
||||
Db string `json:"db"`
|
||||
Table string `json:"table"`
|
||||
SqlType string `json:"sqlType"`
|
||||
CompressionType string `json:"compressionType"`
|
||||
Rows [][]interface{} `json:"rows"`
|
||||
}
|
||||
|
||||
const (
|
||||
STATUS_PREPARE uint8 = iota
|
||||
STATUS_BEGIN
|
||||
STATUS_COMMIT
|
||||
STATUS_ROLLBACK
|
||||
)
|
||||
|
||||
type Transaction struct {
|
||||
GTID string `json:"gtid"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
Time time.Time `json:"time"`
|
||||
StartPos int `json:"startPos"`
|
||||
EndPos int `json:"endPos"`
|
||||
Size int `json:"size"`
|
||||
RowsCount int `json:"rowsCount"`
|
||||
Status uint8 `json:"status"`
|
||||
TxStartTime int64 `json:"txStartTime"`
|
||||
TxEndTime int64 `json:"txEndTime"`
|
||||
sqlOrigin []string `json:"sqlOrigin"`
|
||||
Txs []TxDetail `json:"txs"`
|
||||
dmlEventCount int
|
||||
}
|
||||
|
||||
func (t Transaction) GetSqlOrigin() []string {
|
||||
return t.sqlOrigin
|
||||
}
|
||||
|
||||
type BinlogFilter struct {
|
||||
IncludeGtid string
|
||||
ExcludeGtid string
|
||||
IncludeTables []string
|
||||
ExcludeTables []string
|
||||
StartPos int
|
||||
EndPos int
|
||||
StartDate time.Time
|
||||
EndDate time.Time
|
||||
BigThan int
|
||||
SmallThan int
|
||||
OnlyShowGtid bool
|
||||
OnlyShowDML bool
|
||||
PickTxAllIfMatch bool
|
||||
ExcludeBlank bool
|
||||
IncludeBlank bool
|
||||
}
|
||||
|
||||
type BinlogEvent struct {
|
||||
Type string
|
||||
DB string
|
||||
TB string
|
||||
Data string
|
||||
RowCnt uint32
|
||||
Rows [][]interface{}
|
||||
CompressionType string
|
||||
}
|
||||
|
||||
type tableMatcher struct {
|
||||
exactMatch map[string]bool
|
||||
dbWildcard map[string]bool
|
||||
tbWildcard map[string]bool
|
||||
matchAll bool
|
||||
}
|
||||
|
||||
func (m *tableMatcher) match(db, tb string) bool {
|
||||
if m.matchAll {
|
||||
return true
|
||||
}
|
||||
if m.dbWildcard[db] || m.tbWildcard[tb] {
|
||||
return true
|
||||
}
|
||||
if len(m.exactMatch) > 0 {
|
||||
// Go 1.12+ 对 map[string] 查找时 string([]byte) 不分配
|
||||
var buf [128]byte
|
||||
key := buf[:0]
|
||||
key = append(key, db...)
|
||||
key = append(key, '.')
|
||||
key = append(key, tb...)
|
||||
if m.exactMatch[string(key)] {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func ParseBinlogFile(path string, fx func(transaction Transaction) bool) error {
|
||||
return parseOneBinlog(path, fx)
|
||||
}
|
||||
|
||||
func parseOneBinlog(path string, fx func(Transaction) bool) error {
|
||||
if !staros.Exists(path) {
|
||||
return os.ErrNotExist
|
||||
}
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
if err := validateBinlogHeader(f); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
br := bufio.NewReaderSize(f, defaultReadBufSize)
|
||||
return parseBinlogDetail(br, fx)
|
||||
}
|
||||
|
||||
func validateBinlogHeader(f *os.File) error {
|
||||
const fileTypeBytes = int64(4)
|
||||
b := make([]byte, fileTypeBytes)
|
||||
|
||||
if _, err := f.Read(b); err != nil {
|
||||
return fmt.Errorf("read binlog header failed: %w", err)
|
||||
}
|
||||
if !bytes.Equal(b, replication.BinLogFileHeader) {
|
||||
return ErrInvalidBinlogHeader
|
||||
}
|
||||
if _, err := f.Seek(fileTypeBytes, io.SeekStart); err != nil {
|
||||
return fmt.Errorf("seek after header failed: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func readEventHeader(r io.Reader, parser *replication.BinlogParser, headBuf []byte) (*replication.EventHeader, error) {
|
||||
if _, err := io.ReadFull(r, headBuf); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
h, err := parser.ParseHeader(headBuf)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse header failed: %w", err)
|
||||
}
|
||||
if h.EventSize <= uint32(replication.EventHeaderSize) {
|
||||
return nil, fmt.Errorf("%w: event size is %d", ErrEventTooSmall, h.EventSize)
|
||||
}
|
||||
return h, nil
|
||||
}
|
||||
|
||||
func readEventBody(r io.Reader, h *replication.EventHeader) ([]byte, error) {
|
||||
bodyLen := int(h.EventSize) - replication.EventHeaderSize
|
||||
body := make([]byte, bodyLen)
|
||||
if _, err := io.ReadFull(r, body); err != nil {
|
||||
return nil, fmt.Errorf("read event body failed: %w (need %d bytes)", err, bodyLen)
|
||||
}
|
||||
return body, nil
|
||||
}
|
||||
|
||||
func skipEventBody(r io.Reader, h *replication.EventHeader) error {
|
||||
bodyLen := int64(h.EventSize) - int64(replication.EventHeaderSize)
|
||||
if bodyLen <= 0 {
|
||||
return nil
|
||||
}
|
||||
if _, err := io.CopyN(io.Discard, r, bodyLen); err != nil {
|
||||
return fmt.Errorf("skip event body failed: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
var rawDataPool = sync.Pool{
|
||||
New: func() any {
|
||||
b := make([]byte, 0, 64*1024)
|
||||
return &b
|
||||
},
|
||||
}
|
||||
|
||||
func getRawDataBuf(n int) []byte {
|
||||
p := rawDataPool.Get().(*[]byte)
|
||||
if cap(*p) < n {
|
||||
return make([]byte, n)
|
||||
}
|
||||
return (*p)[:n]
|
||||
}
|
||||
|
||||
func putRawDataBuf(b []byte) {
|
||||
if cap(b) > maxPooledRawDataCap {
|
||||
return
|
||||
}
|
||||
b = b[:0]
|
||||
rawDataPool.Put(&b)
|
||||
}
|
||||
|
||||
func parseEvent(parser *replication.BinlogParser, h *replication.EventHeader, headBuf []byte, body []byte) (replication.Event, error) {
|
||||
rawLen := len(headBuf) + len(body)
|
||||
rawData := getRawDataBuf(rawLen)
|
||||
copy(rawData, headBuf)
|
||||
copy(rawData[len(headBuf):], body)
|
||||
|
||||
e, err := parser.ParseEvent(h, body, rawData)
|
||||
putRawDataBuf(rawData)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse event failed at pos %d: Header %+v, Data %q, Err: %w",
|
||||
h.LogPos, h, body, err)
|
||||
}
|
||||
return e, nil
|
||||
}
|
||||
|
||||
func finalizeTx(tx *Transaction, onlyShowGtid bool) {
|
||||
idx := 0
|
||||
for k, v := range tx.Txs {
|
||||
if v.SqlType != "query" && len(tx.sqlOrigin) > idx {
|
||||
v.Sql = tx.sqlOrigin[idx]
|
||||
idx++
|
||||
}
|
||||
tx.RowsCount += v.RowCount
|
||||
tx.Txs[k] = v
|
||||
}
|
||||
|
||||
if onlyShowGtid {
|
||||
tx.Size = 0
|
||||
} else {
|
||||
tx.Size = tx.EndPos - tx.StartPos
|
||||
}
|
||||
}
|
||||
|
||||
func fillTimeLazy(tx *Transaction) {
|
||||
if tx.Timestamp != 0 && tx.Time.IsZero() {
|
||||
tx.Time = time.Unix(tx.Timestamp, 0)
|
||||
}
|
||||
for i := range tx.Txs {
|
||||
if tx.Txs[i].Timestamp != 0 && tx.Txs[i].Time.IsZero() {
|
||||
tx.Txs[i].Time = time.Unix(tx.Txs[i].Timestamp, 0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func parseBinlogDetail(r io.Reader, f func(Transaction) bool) error {
|
||||
parser := replication.NewBinlogParser()
|
||||
parser.SetParseTime(false)
|
||||
parser.SetUseDecimal(false)
|
||||
|
||||
var (
|
||||
tbMapPos uint32
|
||||
tx Transaction
|
||||
headBuf = make([]byte, replication.EventHeaderSize)
|
||||
)
|
||||
currentGtid := ""
|
||||
|
||||
for {
|
||||
h, err := readEventHeader(r, parser, headBuf)
|
||||
if err == io.EOF {
|
||||
if currentGtid != "" {
|
||||
finalizeTx(&tx, false)
|
||||
fillTimeLazy(&tx)
|
||||
if f != nil {
|
||||
f(tx)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
body, err := readEventBody(r, h)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
e, err := parseEvent(parser, h, headBuf, body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if h.EventType == replication.TABLE_MAP_EVENT {
|
||||
tbMapPos = h.LogPos - h.EventSize
|
||||
}
|
||||
|
||||
evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
|
||||
for _, ev := range evs {
|
||||
startPos := 0
|
||||
if ev.Type == "query" || ev.Type == "gtid" {
|
||||
startPos = int(h.LogPos - h.EventSize)
|
||||
} else {
|
||||
startPos = int(tbMapPos)
|
||||
}
|
||||
|
||||
switch ev.Type {
|
||||
case "gtid":
|
||||
if currentGtid != "" {
|
||||
finalizeTx(&tx, false)
|
||||
fillTimeLazy(&tx)
|
||||
if f != nil && !f(tx) {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
currentGtid = ev.Data
|
||||
tx = Transaction{
|
||||
GTID: ev.Data,
|
||||
StartPos: startPos,
|
||||
Timestamp: int64(h.Timestamp),
|
||||
Txs: make([]TxDetail, 0, 8),
|
||||
sqlOrigin: make([]string, 0, 4),
|
||||
}
|
||||
case "":
|
||||
tx.EndPos = int(h.LogPos)
|
||||
case "rowsquery":
|
||||
tx.EndPos = int(h.LogPos)
|
||||
tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
|
||||
default:
|
||||
tx.EndPos = int(h.LogPos)
|
||||
status := STATUS_PREPARE
|
||||
if ev.Type == "query" {
|
||||
if equalFoldShort(ev.Data, "begin") {
|
||||
if tx.TxStartTime == 0 {
|
||||
tx.TxStartTime = int64(h.Timestamp)
|
||||
}
|
||||
tx.Status = STATUS_BEGIN
|
||||
} else if equalFoldShort(ev.Data, "commit") {
|
||||
tx.Status = STATUS_COMMIT
|
||||
tx.TxEndTime = int64(h.Timestamp)
|
||||
} else if equalFoldShort(ev.Data, "rollback") {
|
||||
tx.Status = STATUS_ROLLBACK
|
||||
tx.TxEndTime = int64(h.Timestamp)
|
||||
}
|
||||
tx.Status = status
|
||||
}
|
||||
if ev.DB != "" && ev.TB != "" {
|
||||
tx.dmlEventCount++
|
||||
}
|
||||
tx.Txs = append(tx.Txs, TxDetail{
|
||||
StartPos: startPos,
|
||||
EndPos: int(h.LogPos),
|
||||
Db: ev.DB,
|
||||
Table: ev.TB,
|
||||
Sql: ev.Data,
|
||||
SqlType: ev.Type,
|
||||
Rows: ev.Rows,
|
||||
RowCount: int(ev.RowCnt),
|
||||
Timestamp: int64(h.Timestamp),
|
||||
CompressionType: ev.CompressionType,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent {
|
||||
var buf [1]BinlogEvent
|
||||
sig := &buf[0]
|
||||
|
||||
switch ev.Header.EventType {
|
||||
case replication.ANONYMOUS_GTID_EVENT:
|
||||
sig.Data = "anonymous-gtid-event:1"
|
||||
sig.Type = "gtid"
|
||||
|
||||
case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
|
||||
wrEvent, ok := ev.Event.(*replication.RowsEvent)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
sig.DB = string(wrEvent.Table.Schema)
|
||||
sig.TB = string(wrEvent.Table.Table)
|
||||
sig.Type = "insert"
|
||||
sig.RowCnt = uint32(len(wrEvent.Rows))
|
||||
sig.Rows = wrEvent.Rows
|
||||
|
||||
case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
|
||||
wrEvent, ok := ev.Event.(*replication.RowsEvent)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
sig.DB = string(wrEvent.Table.Schema)
|
||||
sig.TB = string(wrEvent.Table.Table)
|
||||
sig.Type = "update"
|
||||
sig.RowCnt = uint32(len(wrEvent.Rows)) / 2
|
||||
sig.Rows = wrEvent.Rows
|
||||
|
||||
case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
|
||||
wrEvent, ok := ev.Event.(*replication.RowsEvent)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
sig.DB = string(wrEvent.Table.Schema)
|
||||
sig.TB = string(wrEvent.Table.Table)
|
||||
sig.Type = "delete"
|
||||
sig.RowCnt = uint32(len(wrEvent.Rows))
|
||||
sig.Rows = wrEvent.Rows
|
||||
|
||||
case replication.ROWS_QUERY_EVENT:
|
||||
queryEvent, ok := ev.Event.(*replication.RowsQueryEvent)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
sig.Data = string(queryEvent.Query)
|
||||
sig.Type = "rowsquery"
|
||||
|
||||
case replication.QUERY_EVENT:
|
||||
queryEvent, ok := ev.Event.(*replication.QueryEvent)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
sig.DB = string(queryEvent.Schema)
|
||||
sig.Data = string(queryEvent.Query)
|
||||
sig.Type = "query"
|
||||
|
||||
case replication.MARIADB_GTID_EVENT:
|
||||
sig.Data = "begin"
|
||||
sig.Type = "query"
|
||||
|
||||
case replication.XID_EVENT:
|
||||
sig.Data = "commit"
|
||||
sig.Type = "query"
|
||||
|
||||
case replication.GTID_EVENT:
|
||||
ge, ok := ev.Event.(*replication.GTIDEvent)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
gid, err := gtid.Parse(fmt.Sprintf("%s:%d", bytesToUuid(ge.SID), ge.GNO))
|
||||
if err != nil {
|
||||
sig.Data = fmt.Sprintf("invalid-gtid:%s:%d", bytesToUuid(ge.SID), ge.GNO)
|
||||
} else {
|
||||
sig.Data = gid.String()
|
||||
}
|
||||
sig.Type = "gtid"
|
||||
|
||||
case replication.TRANSACTION_PAYLOAD_EVENT:
|
||||
ge, ok := ev.Event.(*replication.TransactionPayloadEvent)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
res := make([]BinlogEvent, 0, len(ge.Events))
|
||||
for _, val := range ge.Events {
|
||||
res = append(res, ParseBinlogEvent(val)...)
|
||||
}
|
||||
compressionType := getCompressionTypeName(ge.CompressionType)
|
||||
for idx := range res {
|
||||
res[idx].CompressionType = compressionType
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
// 返回栈上数组的切片。调用方在当前迭代内立即消费,不持有跨迭代引用,安全。
|
||||
return buf[:]
|
||||
}
|
||||
|
||||
func getCompressionTypeName(code uint64) string {
|
||||
switch code {
|
||||
case CompressionZSTD:
|
||||
return "ZSTD"
|
||||
case CompressionNone:
|
||||
return ""
|
||||
default:
|
||||
return fmt.Sprintf("UNKNOWN(%d)", code)
|
||||
}
|
||||
}
|
||||
|
||||
func ParseBinlogWithFilter(path string, pos int64, filter BinlogFilter, fx func(Transaction) bool) error {
|
||||
if !staros.Exists(path) {
|
||||
return os.ErrNotExist
|
||||
}
|
||||
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
parser := replication.NewBinlogParser()
|
||||
parser.SetParseTime(false)
|
||||
parser.SetUseDecimal(false)
|
||||
|
||||
if pos != 0 {
|
||||
if err := seekToPosition(f, parser, pos); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if err := validateBinlogHeader(f); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
br := bufio.NewReaderSize(f, defaultReadBufSize)
|
||||
return parseBinlogWithFilter(br, parser, filter, fx)
|
||||
}
|
||||
|
||||
func seekToPosition(f *os.File, parser *replication.BinlogParser, pos int64) error {
|
||||
if err := validateBinlogHeader(f); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
headBuf := make([]byte, replication.EventHeaderSize)
|
||||
for {
|
||||
h, err := readEventHeader(f, parser, headBuf)
|
||||
if err != nil {
|
||||
return fmt.Errorf("seek to position failed: %w", err)
|
||||
}
|
||||
body, err := readEventBody(f, h)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = parseEvent(parser, h, headBuf, body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if h.EventType == replication.FORMAT_DESCRIPTION_EVENT || h.EventType == replication.GTID_EVENT {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if _, err := f.Seek(pos, io.SeekStart); err != nil {
|
||||
return fmt.Errorf("seek to pos %d failed: %w", pos, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter BinlogFilter, fn func(Transaction) bool) error {
|
||||
var subGtid, inGtid, exGtid *gtid.Gtid
|
||||
var err error
|
||||
|
||||
includeMatcher, excludeMatcher := prepareTableMatchers(filter)
|
||||
|
||||
if filter.IncludeGtid != "" {
|
||||
inGtid, err = gtid.Parse(filter.IncludeGtid)
|
||||
if err != nil {
|
||||
return fmt.Errorf("parse include gtid failed: %w", err)
|
||||
}
|
||||
subGtid = inGtid.Clone()
|
||||
}
|
||||
if filter.ExcludeGtid != "" {
|
||||
exGtid, err = gtid.Parse(filter.ExcludeGtid)
|
||||
if err != nil {
|
||||
return fmt.Errorf("parse exclude gtid failed: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
tbMapPos uint32
|
||||
skipCurrentTxn bool
|
||||
tx Transaction
|
||||
headBuf = make([]byte, replication.EventHeaderSize)
|
||||
)
|
||||
currentGtid := ""
|
||||
|
||||
callFn := func(tx Transaction) bool {
|
||||
if fn == nil {
|
||||
return true
|
||||
}
|
||||
|
||||
fillTimeLazy(&tx)
|
||||
|
||||
if !filter.StartDate.IsZero() && filter.StartDate.After(tx.Time) {
|
||||
return true
|
||||
}
|
||||
if !filter.EndDate.IsZero() && filter.EndDate.Before(tx.Time) {
|
||||
return true
|
||||
}
|
||||
if filter.StartPos != 0 && filter.StartPos > tx.StartPos {
|
||||
return true
|
||||
}
|
||||
if filter.EndPos != 0 && filter.EndPos < tx.EndPos {
|
||||
return true
|
||||
}
|
||||
if filter.BigThan != 0 && filter.BigThan > tx.Size {
|
||||
return true
|
||||
}
|
||||
if filter.SmallThan != 0 && filter.SmallThan < tx.Size {
|
||||
return true
|
||||
}
|
||||
if !filter.OnlyShowGtid && filter.OnlyShowDML && tx.dmlEventCount == 0 {
|
||||
return true
|
||||
}
|
||||
|
||||
var txs []TxDetail
|
||||
var matched bool
|
||||
|
||||
for _, t := range tx.Txs {
|
||||
includeMatch := includeMatcher != nil && includeMatcher.match(t.Db, t.Table)
|
||||
excludeMatch := excludeMatcher != nil && excludeMatcher.match(t.Db, t.Table)
|
||||
|
||||
if t.Db == "" && t.Table == "" {
|
||||
if includeMatcher != nil && !filter.IncludeBlank {
|
||||
continue
|
||||
}
|
||||
if excludeMatcher != nil && filter.ExcludeBlank {
|
||||
matched = true
|
||||
if filter.PickTxAllIfMatch {
|
||||
return true
|
||||
}
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if includeMatcher != nil {
|
||||
if includeMatch {
|
||||
matched = true
|
||||
if filter.PickTxAllIfMatch {
|
||||
return fn(tx)
|
||||
}
|
||||
txs = append(txs, t)
|
||||
}
|
||||
} else if excludeMatcher != nil {
|
||||
if excludeMatch {
|
||||
matched = true
|
||||
if filter.PickTxAllIfMatch {
|
||||
return true
|
||||
}
|
||||
} else {
|
||||
txs = append(txs, t)
|
||||
}
|
||||
} else {
|
||||
txs = append(txs, t)
|
||||
}
|
||||
}
|
||||
|
||||
if matched {
|
||||
tx.Txs = txs
|
||||
}
|
||||
if !matched && includeMatcher != nil {
|
||||
return true
|
||||
}
|
||||
if len(tx.Txs) == 0 && matched {
|
||||
return true
|
||||
}
|
||||
return fn(tx)
|
||||
}
|
||||
|
||||
for {
|
||||
h, err := readEventHeader(r, parser, headBuf)
|
||||
if err == io.EOF {
|
||||
if !tx.Time.IsZero() || tx.Timestamp != 0 {
|
||||
finalizeTx(&tx, filter.OnlyShowGtid)
|
||||
callFn(tx)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// GTID-only fast path
|
||||
if filter.OnlyShowGtid {
|
||||
if h.EventType != replication.GTID_EVENT && h.EventType != replication.ANONYMOUS_GTID_EVENT {
|
||||
if h.EventType == replication.FORMAT_DESCRIPTION_EVENT ||
|
||||
h.EventType == replication.TABLE_MAP_EVENT {
|
||||
body, err := readEventBody(r, h)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err = parseEvent(parser, h, headBuf, body); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if err := skipEventBody(r, h); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
body, err := readEventBody(r, h)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e, err := parseEvent(parser, h, headBuf, body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
|
||||
for _, ev := range evs {
|
||||
if ev.Type != "gtid" {
|
||||
continue
|
||||
}
|
||||
startPos := int(h.LogPos - h.EventSize)
|
||||
|
||||
if filter.EndPos != 0 && startPos > filter.EndPos {
|
||||
continue
|
||||
}
|
||||
if filter.StartPos != 0 && startPos < filter.StartPos {
|
||||
continue
|
||||
}
|
||||
|
||||
if currentGtid != "" {
|
||||
tx.EndPos = startPos - 1
|
||||
finalizeTx(&tx, true)
|
||||
if !callFn(tx) {
|
||||
return nil
|
||||
}
|
||||
if subGtid != nil {
|
||||
if err := subGtid.Sub(tx.GTID); err == nil && subGtid.EventCount() == 0 {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
tx = Transaction{}
|
||||
}
|
||||
|
||||
currentGtid = ev.Data
|
||||
|
||||
if inGtid != nil {
|
||||
if c, _ := inGtid.Contain(ev.Data); !c {
|
||||
tx = Transaction{}
|
||||
currentGtid = ""
|
||||
continue
|
||||
}
|
||||
}
|
||||
if exGtid != nil {
|
||||
if c, _ := exGtid.Contain(ev.Data); c {
|
||||
currentGtid = ""
|
||||
tx = Transaction{}
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
tx = Transaction{
|
||||
GTID: ev.Data,
|
||||
StartPos: startPos,
|
||||
EndPos: startPos,
|
||||
Timestamp: int64(h.Timestamp),
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// 先处理GTID事件(决定当前事务是否命中)
|
||||
if h.EventType == replication.GTID_EVENT || h.EventType == replication.ANONYMOUS_GTID_EVENT {
|
||||
body, err := readEventBody(r, h)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e, err := parseEvent(parser, h, headBuf, body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
|
||||
for _, ev := range evs {
|
||||
if ev.Type != "gtid" {
|
||||
continue
|
||||
}
|
||||
startPos := int(h.LogPos - h.EventSize)
|
||||
|
||||
if currentGtid != "" {
|
||||
finalizeTx(&tx, false)
|
||||
if !callFn(tx) {
|
||||
return nil
|
||||
}
|
||||
if subGtid != nil {
|
||||
if err := subGtid.Sub(tx.GTID); err == nil && subGtid.EventCount() == 0 {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
tx = Transaction{}
|
||||
}
|
||||
|
||||
currentGtid = ev.Data
|
||||
skipCurrentTxn = false
|
||||
|
||||
if filter.EndPos != 0 && startPos > filter.EndPos {
|
||||
skipCurrentTxn = true
|
||||
}
|
||||
if filter.StartPos != 0 && startPos < filter.StartPos {
|
||||
skipCurrentTxn = true
|
||||
}
|
||||
if inGtid != nil {
|
||||
if c, _ := inGtid.Contain(ev.Data); !c {
|
||||
skipCurrentTxn = true
|
||||
}
|
||||
}
|
||||
if exGtid != nil {
|
||||
if c, _ := exGtid.Contain(ev.Data); c {
|
||||
skipCurrentTxn = true
|
||||
}
|
||||
}
|
||||
|
||||
if !skipCurrentTxn {
|
||||
tx = Transaction{
|
||||
GTID: ev.Data,
|
||||
StartPos: startPos,
|
||||
Timestamp: int64(h.Timestamp),
|
||||
Txs: make([]TxDetail, 0, 8),
|
||||
sqlOrigin: make([]string, 0, 4),
|
||||
}
|
||||
} else {
|
||||
tx = Transaction{}
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// 未命中事务时:TABLE_MAP_EVENT 仍需解析(parser 缓存表元数据),
|
||||
// 其余事件可安全跳过
|
||||
if skipCurrentTxn {
|
||||
if h.EventType == replication.TABLE_MAP_EVENT ||
|
||||
h.EventType == replication.FORMAT_DESCRIPTION_EVENT {
|
||||
body, err := readEventBody(r, h)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err = parseEvent(parser, h, headBuf, body); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if err := skipEventBody(r, h); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
body, err := readEventBody(r, h)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e, err := parseEvent(parser, h, headBuf, body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if h.EventType == replication.TABLE_MAP_EVENT {
|
||||
tbMapPos = h.LogPos - h.EventSize
|
||||
}
|
||||
|
||||
evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
|
||||
for _, ev := range evs {
|
||||
startPos := 0
|
||||
if ev.Type == "query" || ev.Type == "gtid" {
|
||||
startPos = int(h.LogPos - h.EventSize)
|
||||
} else {
|
||||
startPos = int(tbMapPos)
|
||||
}
|
||||
|
||||
switch ev.Type {
|
||||
case "":
|
||||
tx.EndPos = int(h.LogPos)
|
||||
|
||||
case "rowsquery":
|
||||
tx.EndPos = int(h.LogPos)
|
||||
tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
|
||||
|
||||
default:
|
||||
tx.EndPos = int(h.LogPos)
|
||||
status := STATUS_PREPARE
|
||||
if ev.Type == "query" {
|
||||
if equalFoldShort(ev.Data, "begin") {
|
||||
if tx.TxStartTime == 0 {
|
||||
tx.TxStartTime = int64(h.Timestamp)
|
||||
}
|
||||
tx.Status = STATUS_BEGIN
|
||||
} else if equalFoldShort(ev.Data, "commit") {
|
||||
tx.Status = STATUS_COMMIT
|
||||
tx.TxEndTime = int64(h.Timestamp)
|
||||
} else if equalFoldShort(ev.Data, "rollback") {
|
||||
tx.Status = STATUS_ROLLBACK
|
||||
tx.TxEndTime = int64(h.Timestamp)
|
||||
}
|
||||
tx.Status = status
|
||||
}
|
||||
if ev.DB != "" && ev.TB != "" {
|
||||
tx.dmlEventCount++
|
||||
}
|
||||
tx.Txs = append(tx.Txs, TxDetail{
|
||||
StartPos: startPos,
|
||||
EndPos: int(h.LogPos),
|
||||
Db: ev.DB,
|
||||
Table: ev.TB,
|
||||
Sql: ev.Data,
|
||||
SqlType: ev.Type,
|
||||
Rows: ev.Rows,
|
||||
RowCount: int(ev.RowCnt),
|
||||
Timestamp: int64(h.Timestamp),
|
||||
CompressionType: ev.CompressionType,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
func prepareTableMatchers(filter BinlogFilter) (includeMatcher, excludeMatcher *tableMatcher) {
|
||||
if len(filter.IncludeTables) > 0 {
|
||||
includeMatcher = buildTableMatcher(filter.IncludeTables)
|
||||
}
|
||||
if len(filter.ExcludeTables) > 0 {
|
||||
excludeMatcher = buildTableMatcher(filter.ExcludeTables)
|
||||
}
|
||||
return includeMatcher, excludeMatcher
|
||||
}
|
||||
|
||||
func buildTableMatcher(patterns []string) *tableMatcher {
|
||||
m := &tableMatcher{
|
||||
exactMatch: make(map[string]bool),
|
||||
dbWildcard: make(map[string]bool),
|
||||
tbWildcard: make(map[string]bool),
|
||||
}
|
||||
|
||||
for _, pattern := range patterns {
|
||||
if pattern == "*.*" {
|
||||
m.matchAll = true
|
||||
continue
|
||||
}
|
||||
parts := strings.Split(pattern, ".")
|
||||
if len(parts) != 2 {
|
||||
continue
|
||||
}
|
||||
db, tb := parts[0], parts[1]
|
||||
if db == "*" && tb == "*" {
|
||||
m.matchAll = true
|
||||
} else if db == "*" {
|
||||
m.tbWildcard[tb] = true
|
||||
} else if tb == "*" {
|
||||
m.dbWildcard[db] = true
|
||||
} else {
|
||||
m.exactMatch[pattern] = true
|
||||
}
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
func equalFoldShort(s, lower string) bool {
|
||||
if len(s) != len(lower) {
|
||||
return false
|
||||
}
|
||||
for i := 0; i < len(s); i++ {
|
||||
c := s[i]
|
||||
if 'A' <= c && c <= 'Z' {
|
||||
c += 'a' - 'A'
|
||||
}
|
||||
if c != lower[i] {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
// Parsing implementation is split across:
|
||||
// - parse_types.go
|
||||
// - parse_io.go
|
||||
// - parse_event_convert.go
|
||||
// - parse_stream.go
|
||||
// - parse_filter.go
|
||||
|
||||
257
parse_event_convert.go
Normal file
257
parse_event_convert.go
Normal file
@ -0,0 +1,257 @@
|
||||
package binlog
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"b612.me/mysql/gtid"
|
||||
"github.com/starainrt/go-mysql/mysql"
|
||||
"github.com/starainrt/go-mysql/replication"
|
||||
)
|
||||
|
||||
func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent {
|
||||
var buf [1]BinlogEvent
|
||||
sig := &buf[0]
|
||||
|
||||
switch ev.Header.EventType {
|
||||
case replication.ANONYMOUS_GTID_EVENT:
|
||||
sig.Data = "anonymous-gtid-event:1"
|
||||
sig.Type = "gtid"
|
||||
|
||||
case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
|
||||
wrEvent, ok := ev.Event.(*replication.RowsEvent)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
sig.DB = string(wrEvent.Table.Schema)
|
||||
sig.TB = string(wrEvent.Table.Table)
|
||||
sig.Type = "insert"
|
||||
sig.RowCnt = uint32(len(wrEvent.Rows))
|
||||
sig.Rows = normalizeRowsByUnsigned(wrEvent)
|
||||
sig.ColumnTypes = cloneColumnTypes(wrEvent.Table)
|
||||
sig.ColumnCollationIDs = buildColumnCollationIDs(wrEvent.Table)
|
||||
|
||||
case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
|
||||
wrEvent, ok := ev.Event.(*replication.RowsEvent)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
sig.DB = string(wrEvent.Table.Schema)
|
||||
sig.TB = string(wrEvent.Table.Table)
|
||||
sig.Type = "update"
|
||||
sig.RowCnt = uint32(len(wrEvent.Rows)) / 2
|
||||
sig.Rows = normalizeRowsByUnsigned(wrEvent)
|
||||
sig.ColumnTypes = cloneColumnTypes(wrEvent.Table)
|
||||
sig.ColumnCollationIDs = buildColumnCollationIDs(wrEvent.Table)
|
||||
|
||||
case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
|
||||
wrEvent, ok := ev.Event.(*replication.RowsEvent)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
sig.DB = string(wrEvent.Table.Schema)
|
||||
sig.TB = string(wrEvent.Table.Table)
|
||||
sig.Type = "delete"
|
||||
sig.RowCnt = uint32(len(wrEvent.Rows))
|
||||
sig.Rows = normalizeRowsByUnsigned(wrEvent)
|
||||
sig.ColumnTypes = cloneColumnTypes(wrEvent.Table)
|
||||
sig.ColumnCollationIDs = buildColumnCollationIDs(wrEvent.Table)
|
||||
|
||||
case replication.TABLE_MAP_EVENT:
|
||||
tableEvent, ok := ev.Event.(*replication.TableMapEvent)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
sig.DB = string(tableEvent.Schema)
|
||||
sig.TB = string(tableEvent.Table)
|
||||
sig.Type = "tablemap"
|
||||
|
||||
case replication.ROWS_QUERY_EVENT:
|
||||
queryEvent, ok := ev.Event.(*replication.RowsQueryEvent)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
sig.Data = string(queryEvent.Query)
|
||||
sig.Type = "rowsquery"
|
||||
|
||||
case replication.QUERY_EVENT:
|
||||
queryEvent, ok := ev.Event.(*replication.QueryEvent)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
sig.DB = string(queryEvent.Schema)
|
||||
sig.Data = string(queryEvent.Query)
|
||||
sig.Type = "query"
|
||||
|
||||
case replication.MARIADB_GTID_EVENT:
|
||||
sig.Data = "begin"
|
||||
sig.Type = "query"
|
||||
|
||||
case replication.XID_EVENT:
|
||||
sig.Data = "commit"
|
||||
sig.Type = "query"
|
||||
|
||||
case replication.GTID_EVENT:
|
||||
ge, ok := ev.Event.(*replication.GTIDEvent)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
gid, err := gtid.Parse(fmt.Sprintf("%s:%d", bytesToUuid(ge.SID), ge.GNO))
|
||||
if err != nil {
|
||||
sig.Data = fmt.Sprintf("invalid-gtid:%s:%d", bytesToUuid(ge.SID), ge.GNO)
|
||||
} else {
|
||||
sig.Data = gid.String()
|
||||
}
|
||||
sig.Type = "gtid"
|
||||
|
||||
case replication.TRANSACTION_PAYLOAD_EVENT:
|
||||
ge, ok := ev.Event.(*replication.TransactionPayloadEvent)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
res := make([]BinlogEvent, 0, len(ge.Events))
|
||||
for _, val := range ge.Events {
|
||||
res = append(res, ParseBinlogEvent(val)...)
|
||||
}
|
||||
compressionType := getCompressionTypeName(ge.CompressionType)
|
||||
for idx := range res {
|
||||
res[idx].CompressionType = compressionType
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
// 返回栈上数组的切片。调用方在当前迭代内立即消费,不持有跨迭代引用,安全。
|
||||
return buf[:]
|
||||
}
|
||||
|
||||
func normalizeRowsByUnsigned(wrEvent *replication.RowsEvent) [][]interface{} {
|
||||
if wrEvent == nil || wrEvent.Table == nil || len(wrEvent.Rows) == 0 {
|
||||
if wrEvent == nil {
|
||||
return nil
|
||||
}
|
||||
return wrEvent.Rows
|
||||
}
|
||||
|
||||
unsignedMap := wrEvent.Table.UnsignedMap()
|
||||
if len(unsignedMap) == 0 {
|
||||
return wrEvent.Rows
|
||||
}
|
||||
|
||||
columnTypes := wrEvent.Table.ColumnType
|
||||
if len(columnTypes) == 0 {
|
||||
return wrEvent.Rows
|
||||
}
|
||||
|
||||
for rowIdx := range wrEvent.Rows {
|
||||
row := wrEvent.Rows[rowIdx]
|
||||
for colIdx := range row {
|
||||
if !unsignedMap[colIdx] {
|
||||
continue
|
||||
}
|
||||
if colIdx >= len(columnTypes) {
|
||||
continue
|
||||
}
|
||||
row[colIdx] = normalizeUnsignedValue(row[colIdx], columnTypes[colIdx])
|
||||
}
|
||||
}
|
||||
return wrEvent.Rows
|
||||
}
|
||||
|
||||
func cloneColumnTypes(table *replication.TableMapEvent) []int {
|
||||
if table == nil || len(table.ColumnType) == 0 {
|
||||
return nil
|
||||
}
|
||||
ret := make([]int, len(table.ColumnType))
|
||||
for i, t := range table.ColumnType {
|
||||
ret[i] = int(t)
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func buildColumnCollationIDs(table *replication.TableMapEvent) []uint64 {
|
||||
if table == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
columnCount := len(table.ColumnType)
|
||||
if columnCount == 0 && table.ColumnCount > 0 {
|
||||
columnCount = int(table.ColumnCount)
|
||||
}
|
||||
if columnCount == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
ret := make([]uint64, columnCount)
|
||||
hasValue := false
|
||||
|
||||
for idx, collationID := range table.CollationMap() {
|
||||
if idx < 0 || idx >= columnCount {
|
||||
continue
|
||||
}
|
||||
ret[idx] = collationID
|
||||
hasValue = hasValue || collationID != 0
|
||||
}
|
||||
for idx, collationID := range table.EnumSetCollationMap() {
|
||||
if idx < 0 || idx >= columnCount {
|
||||
continue
|
||||
}
|
||||
if ret[idx] == 0 {
|
||||
ret[idx] = collationID
|
||||
}
|
||||
hasValue = hasValue || collationID != 0
|
||||
}
|
||||
|
||||
if !hasValue {
|
||||
return nil
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func normalizeUnsignedValue(v interface{}, colType byte) interface{} {
|
||||
signed, ok := signedToInt64(v)
|
||||
if !ok {
|
||||
return v
|
||||
}
|
||||
|
||||
switch colType {
|
||||
case mysql.MYSQL_TYPE_TINY:
|
||||
return uint8(signed)
|
||||
case mysql.MYSQL_TYPE_SHORT:
|
||||
return uint16(signed)
|
||||
case mysql.MYSQL_TYPE_INT24:
|
||||
return uint32(uint32(int32(signed)) & 0x00FFFFFF)
|
||||
case mysql.MYSQL_TYPE_LONG:
|
||||
return uint32(signed)
|
||||
case mysql.MYSQL_TYPE_LONGLONG:
|
||||
return uint64(signed)
|
||||
default:
|
||||
return v
|
||||
}
|
||||
}
|
||||
|
||||
func signedToInt64(v interface{}) (int64, bool) {
|
||||
switch x := v.(type) {
|
||||
case int8:
|
||||
return int64(x), true
|
||||
case int16:
|
||||
return int64(x), true
|
||||
case int32:
|
||||
return int64(x), true
|
||||
case int64:
|
||||
return x, true
|
||||
case int:
|
||||
return int64(x), true
|
||||
default:
|
||||
return 0, false
|
||||
}
|
||||
}
|
||||
|
||||
func getCompressionTypeName(code uint64) string {
|
||||
switch code {
|
||||
case CompressionZSTD:
|
||||
return "ZSTD"
|
||||
case CompressionNone:
|
||||
return ""
|
||||
default:
|
||||
return fmt.Sprintf("UNKNOWN(%d)", code)
|
||||
}
|
||||
}
|
||||
73
parse_event_convert_payload_test.go
Normal file
73
parse_event_convert_payload_test.go
Normal file
@ -0,0 +1,73 @@
|
||||
package binlog
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/starainrt/go-mysql/mysql"
|
||||
"github.com/starainrt/go-mysql/replication"
|
||||
)
|
||||
|
||||
func TestParseBinlogEvent_TableMapEvent(t *testing.T) {
|
||||
ev := &replication.BinlogEvent{
|
||||
Header: &replication.EventHeader{EventType: replication.TABLE_MAP_EVENT},
|
||||
Event: &replication.TableMapEvent{
|
||||
Schema: []byte("db1"),
|
||||
Table: []byte("tb1"),
|
||||
},
|
||||
}
|
||||
|
||||
events := ParseBinlogEvent(ev)
|
||||
if len(events) != 1 {
|
||||
t.Fatalf("expected 1 event, got %d", len(events))
|
||||
}
|
||||
if events[0].Type != "tablemap" {
|
||||
t.Fatalf("expected tablemap event type, got %q", events[0].Type)
|
||||
}
|
||||
if events[0].DB != "db1" || events[0].TB != "tb1" {
|
||||
t.Fatalf("unexpected db/table: %s.%s", events[0].DB, events[0].TB)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseBinlogEvent_TransactionPayloadContainsTableMap(t *testing.T) {
|
||||
table := &replication.TableMapEvent{
|
||||
Schema: []byte("db2"),
|
||||
Table: []byte("tb2"),
|
||||
ColumnType: []byte{mysql.MYSQL_TYPE_LONG},
|
||||
}
|
||||
|
||||
payload := &replication.TransactionPayloadEvent{
|
||||
CompressionType: CompressionZSTD,
|
||||
Events: []*replication.BinlogEvent{
|
||||
{
|
||||
Header: &replication.EventHeader{EventType: replication.TABLE_MAP_EVENT},
|
||||
Event: table,
|
||||
},
|
||||
{
|
||||
Header: &replication.EventHeader{EventType: replication.WRITE_ROWS_EVENTv2},
|
||||
Event: &replication.RowsEvent{
|
||||
Table: table,
|
||||
Rows: [][]interface{}{{int32(1)}},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
ev := &replication.BinlogEvent{
|
||||
Header: &replication.EventHeader{EventType: replication.TRANSACTION_PAYLOAD_EVENT},
|
||||
Event: payload,
|
||||
}
|
||||
|
||||
events := ParseBinlogEvent(ev)
|
||||
if len(events) != 2 {
|
||||
t.Fatalf("expected 2 events from payload, got %d", len(events))
|
||||
}
|
||||
if events[0].Type != "tablemap" {
|
||||
t.Fatalf("expected first payload event to be tablemap, got %q", events[0].Type)
|
||||
}
|
||||
if events[1].Type != "insert" {
|
||||
t.Fatalf("expected second payload event to be insert, got %q", events[1].Type)
|
||||
}
|
||||
if events[0].CompressionType != "ZSTD" || events[1].CompressionType != "ZSTD" {
|
||||
t.Fatalf("expected payload events to carry compression type, got %q/%q", events[0].CompressionType, events[1].CompressionType)
|
||||
}
|
||||
}
|
||||
95
parse_event_convert_unsigned_test.go
Normal file
95
parse_event_convert_unsigned_test.go
Normal file
@ -0,0 +1,95 @@
|
||||
package binlog
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/starainrt/go-mysql/mysql"
|
||||
"github.com/starainrt/go-mysql/replication"
|
||||
)
|
||||
|
||||
func TestNormalizeRowsByUnsigned_AllIntegerKinds(t *testing.T) {
|
||||
event := &replication.RowsEvent{
|
||||
Table: &replication.TableMapEvent{
|
||||
ColumnCount: 5,
|
||||
ColumnType: []byte{mysql.MYSQL_TYPE_TINY, mysql.MYSQL_TYPE_SHORT, mysql.MYSQL_TYPE_INT24, mysql.MYSQL_TYPE_LONG, mysql.MYSQL_TYPE_LONGLONG},
|
||||
SignednessBitmap: []byte{0xF8},
|
||||
},
|
||||
Rows: [][]interface{}{{int8(-1), int16(-2), int32(-1), int32(-1), int64(-1)}},
|
||||
}
|
||||
|
||||
got := normalizeRowsByUnsigned(event)
|
||||
row := got[0]
|
||||
|
||||
if v, ok := row[0].(uint8); !ok || v != 255 {
|
||||
t.Fatalf("tiny unsigned mismatch: %T %v", row[0], row[0])
|
||||
}
|
||||
if v, ok := row[1].(uint16); !ok || v != 65534 {
|
||||
t.Fatalf("short unsigned mismatch: %T %v", row[1], row[1])
|
||||
}
|
||||
if v, ok := row[2].(uint32); !ok || v != 16777215 {
|
||||
t.Fatalf("int24 unsigned mismatch: %T %v", row[2], row[2])
|
||||
}
|
||||
if v, ok := row[3].(uint32); !ok || v != 4294967295 {
|
||||
t.Fatalf("long unsigned mismatch: %T %v", row[3], row[3])
|
||||
}
|
||||
if v, ok := row[4].(uint64); !ok || v != 18446744073709551615 {
|
||||
t.Fatalf("longlong unsigned mismatch: %T %v", row[4], row[4])
|
||||
}
|
||||
}
|
||||
|
||||
func TestNormalizeRowsByUnsigned_NoSignednessMetadata(t *testing.T) {
|
||||
event := &replication.RowsEvent{
|
||||
Table: &replication.TableMapEvent{
|
||||
ColumnCount: 1,
|
||||
ColumnType: []byte{mysql.MYSQL_TYPE_LONGLONG},
|
||||
},
|
||||
Rows: [][]interface{}{{int64(-1)}},
|
||||
}
|
||||
|
||||
got := normalizeRowsByUnsigned(event)
|
||||
if v, ok := got[0][0].(int64); !ok || v != -1 {
|
||||
t.Fatalf("value should remain signed when metadata missing: %T %v", got[0][0], got[0][0])
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseBinlogEvent_IncludeColumnMetadata(t *testing.T) {
|
||||
event := &replication.RowsEvent{
|
||||
Table: &replication.TableMapEvent{
|
||||
ColumnCount: 3,
|
||||
ColumnType: []byte{mysql.MYSQL_TYPE_VAR_STRING, mysql.MYSQL_TYPE_LONG, mysql.MYSQL_TYPE_VAR_STRING},
|
||||
DefaultCharset: []uint64{45}, // utf8mb4_general_ci
|
||||
},
|
||||
Rows: [][]interface{}{{"name", int32(1), "desc"}},
|
||||
}
|
||||
|
||||
ev := &replication.BinlogEvent{
|
||||
Header: &replication.EventHeader{EventType: replication.WRITE_ROWS_EVENTv2},
|
||||
Event: event,
|
||||
}
|
||||
|
||||
events := ParseBinlogEvent(ev)
|
||||
if len(events) != 1 {
|
||||
t.Fatalf("expected 1 event, got %d", len(events))
|
||||
}
|
||||
got := events[0]
|
||||
|
||||
if len(got.ColumnTypes) != 3 {
|
||||
t.Fatalf("unexpected column type length: %d", len(got.ColumnTypes))
|
||||
}
|
||||
if got.ColumnTypes[0] != int(mysql.MYSQL_TYPE_VAR_STRING) || got.ColumnTypes[1] != int(mysql.MYSQL_TYPE_LONG) {
|
||||
t.Fatalf("unexpected column types: %v", got.ColumnTypes)
|
||||
}
|
||||
|
||||
if len(got.ColumnCollationIDs) != 3 {
|
||||
t.Fatalf("unexpected column collation length: %d", len(got.ColumnCollationIDs))
|
||||
}
|
||||
if got.ColumnCollationIDs[0] != 45 {
|
||||
t.Fatalf("unexpected collation for column 0: %d", got.ColumnCollationIDs[0])
|
||||
}
|
||||
if got.ColumnCollationIDs[2] != 45 {
|
||||
t.Fatalf("unexpected collation for column 2: %d", got.ColumnCollationIDs[2])
|
||||
}
|
||||
if got.ColumnCollationIDs[1] != 0 {
|
||||
t.Fatalf("non-character column should keep zero collation: %d", got.ColumnCollationIDs[1])
|
||||
}
|
||||
}
|
||||
572
parse_filter.go
Normal file
572
parse_filter.go
Normal file
@ -0,0 +1,572 @@
|
||||
package binlog
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"b612.me/mysql/gtid"
|
||||
"b612.me/staros"
|
||||
"github.com/starainrt/go-mysql/replication"
|
||||
)
|
||||
|
||||
func ParseBinlogWithFilter(path string, pos int64, filter BinlogFilter, fx func(Transaction) bool) error {
|
||||
if hasConfiguredTablePatterns(filter.IncludeTables) && hasConfiguredTablePatterns(filter.ExcludeTables) {
|
||||
return fmt.Errorf("invalid filter: include-tables and exclude-tables cannot be set at the same time")
|
||||
}
|
||||
|
||||
if !staros.Exists(path) {
|
||||
return os.ErrNotExist
|
||||
}
|
||||
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
parser := replication.NewBinlogParser()
|
||||
parser.SetParseTime(false)
|
||||
parser.SetUseDecimal(false)
|
||||
|
||||
if pos != 0 {
|
||||
if err := seekToPosition(f, parser, pos); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if err := validateBinlogHeader(f); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
br := bufio.NewReaderSize(f, defaultReadBufSize)
|
||||
return parseBinlogWithFilter(br, parser, filter, fx)
|
||||
}
|
||||
|
||||
func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter BinlogFilter, fn func(Transaction) bool) error {
|
||||
if hasConfiguredTablePatterns(filter.IncludeTables) && hasConfiguredTablePatterns(filter.ExcludeTables) {
|
||||
return fmt.Errorf("invalid filter: include-tables and exclude-tables cannot be set at the same time")
|
||||
}
|
||||
|
||||
var subGtid, inGtid, exGtid *gtid.Gtid
|
||||
var err error
|
||||
|
||||
includeMatcher, excludeMatcher, err := prepareTableMatchers(filter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if filter.IncludeGtid != "" {
|
||||
inGtid, err = gtid.Parse(filter.IncludeGtid)
|
||||
if err != nil {
|
||||
return fmt.Errorf("parse include gtid failed: %w", err)
|
||||
}
|
||||
subGtid = inGtid.Clone()
|
||||
}
|
||||
if filter.ExcludeGtid != "" {
|
||||
exGtid, err = gtid.Parse(filter.ExcludeGtid)
|
||||
if err != nil {
|
||||
return fmt.Errorf("parse exclude gtid failed: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
tbMapPos uint32
|
||||
skipCurrentTxn bool
|
||||
tx Transaction
|
||||
headBuf = make([]byte, replication.EventHeaderSize)
|
||||
)
|
||||
currentGtid := ""
|
||||
|
||||
callFn := func(tx Transaction) bool {
|
||||
if fn == nil {
|
||||
return true
|
||||
}
|
||||
|
||||
fillTimeLazy(&tx)
|
||||
|
||||
if !filter.StartDate.IsZero() && filter.StartDate.After(tx.Time) {
|
||||
return true
|
||||
}
|
||||
if !filter.EndDate.IsZero() && filter.EndDate.Before(tx.Time) {
|
||||
return true
|
||||
}
|
||||
if filter.StartPos != 0 && filter.StartPos > tx.StartPos {
|
||||
return true
|
||||
}
|
||||
if filter.EndPos != 0 && filter.EndPos < tx.EndPos {
|
||||
return true
|
||||
}
|
||||
if filter.BigThan != 0 && filter.BigThan > tx.Size {
|
||||
return true
|
||||
}
|
||||
if filter.SmallThan != 0 && filter.SmallThan < tx.Size {
|
||||
return true
|
||||
}
|
||||
if !filter.OnlyShowGtid && filter.OnlyShowDML && tx.dmlEventCount == 0 {
|
||||
return true
|
||||
}
|
||||
|
||||
txs, matched, pickAll, skipAll := selectVisibleTxDetails(tx, includeMatcher, excludeMatcher, filter)
|
||||
if pickAll {
|
||||
return fn(tx)
|
||||
}
|
||||
if skipAll {
|
||||
return true
|
||||
}
|
||||
|
||||
if matched {
|
||||
tx.Txs = txs
|
||||
recomputeTxStatsFromVisibleDetails(&tx)
|
||||
}
|
||||
if !matched && includeMatcher != nil {
|
||||
return true
|
||||
}
|
||||
if len(tx.Txs) == 0 && matched {
|
||||
return true
|
||||
}
|
||||
return fn(tx)
|
||||
}
|
||||
|
||||
for {
|
||||
h, err := readEventHeader(r, parser, headBuf)
|
||||
if err == io.EOF {
|
||||
if currentGtid != "" {
|
||||
finalizeTx(&tx, filter.OnlyShowGtid)
|
||||
callFn(tx)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// GTID-only fast path
|
||||
if filter.OnlyShowGtid {
|
||||
if h.EventType != replication.GTID_EVENT && h.EventType != replication.ANONYMOUS_GTID_EVENT {
|
||||
if h.EventType == replication.FORMAT_DESCRIPTION_EVENT ||
|
||||
h.EventType == replication.TABLE_MAP_EVENT {
|
||||
body, err := readEventBody(r, h)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err = parseEvent(parser, h, headBuf, body); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if err := skipEventBody(r, h); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
body, err := readEventBody(r, h)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e, err := parseEvent(parser, h, headBuf, body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
|
||||
for _, ev := range evs {
|
||||
if ev.Type != "gtid" {
|
||||
continue
|
||||
}
|
||||
startPos := int(h.LogPos - h.EventSize)
|
||||
|
||||
if filter.EndPos != 0 && startPos > filter.EndPos {
|
||||
continue
|
||||
}
|
||||
if filter.StartPos != 0 && startPos < filter.StartPos {
|
||||
continue
|
||||
}
|
||||
|
||||
if currentGtid != "" {
|
||||
tx.EndPos = startPos - 1
|
||||
finalizeTx(&tx, true)
|
||||
if !callFn(tx) {
|
||||
return nil
|
||||
}
|
||||
if subGtid != nil {
|
||||
if err := subGtid.Sub(tx.GTID); err == nil && subGtid.EventCount() == 0 {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
tx = Transaction{}
|
||||
}
|
||||
|
||||
currentGtid = ev.Data
|
||||
|
||||
if inGtid != nil {
|
||||
if c, _ := inGtid.Contain(ev.Data); !c {
|
||||
tx = Transaction{}
|
||||
currentGtid = ""
|
||||
continue
|
||||
}
|
||||
}
|
||||
if exGtid != nil {
|
||||
if c, _ := exGtid.Contain(ev.Data); c {
|
||||
currentGtid = ""
|
||||
tx = Transaction{}
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
tx = Transaction{
|
||||
GTID: ev.Data,
|
||||
StartPos: startPos,
|
||||
EndPos: startPos,
|
||||
Timestamp: int64(h.Timestamp),
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// 先处理 GTID 事件(决定当前事务是否命中)
|
||||
if h.EventType == replication.GTID_EVENT || h.EventType == replication.ANONYMOUS_GTID_EVENT {
|
||||
body, err := readEventBody(r, h)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e, err := parseEvent(parser, h, headBuf, body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
|
||||
for _, ev := range evs {
|
||||
if ev.Type != "gtid" {
|
||||
continue
|
||||
}
|
||||
startPos := int(h.LogPos - h.EventSize)
|
||||
|
||||
if currentGtid != "" {
|
||||
finalizeTx(&tx, false)
|
||||
if !callFn(tx) {
|
||||
return nil
|
||||
}
|
||||
if subGtid != nil {
|
||||
if err := subGtid.Sub(tx.GTID); err == nil && subGtid.EventCount() == 0 {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
tx = Transaction{}
|
||||
}
|
||||
|
||||
currentGtid = ev.Data
|
||||
skipCurrentTxn = false
|
||||
|
||||
if filter.EndPos != 0 && startPos > filter.EndPos {
|
||||
skipCurrentTxn = true
|
||||
}
|
||||
if filter.StartPos != 0 && startPos < filter.StartPos {
|
||||
skipCurrentTxn = true
|
||||
}
|
||||
if inGtid != nil {
|
||||
if c, _ := inGtid.Contain(ev.Data); !c {
|
||||
skipCurrentTxn = true
|
||||
}
|
||||
}
|
||||
if exGtid != nil {
|
||||
if c, _ := exGtid.Contain(ev.Data); c {
|
||||
skipCurrentTxn = true
|
||||
}
|
||||
}
|
||||
|
||||
if !skipCurrentTxn {
|
||||
tx = Transaction{
|
||||
GTID: ev.Data,
|
||||
StartPos: startPos,
|
||||
Timestamp: int64(h.Timestamp),
|
||||
Txs: make([]TxDetail, 0, 8),
|
||||
sqlOrigin: make([]string, 0, 4),
|
||||
}
|
||||
} else {
|
||||
tx = Transaction{}
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// 未命中事务时,TABLE_MAP_EVENT 仍需解析(parser 缓存表元数据),
|
||||
// 其余事件可安全跳过
|
||||
if skipCurrentTxn {
|
||||
if h.EventType == replication.TABLE_MAP_EVENT ||
|
||||
h.EventType == replication.FORMAT_DESCRIPTION_EVENT {
|
||||
body, err := readEventBody(r, h)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err = parseEvent(parser, h, headBuf, body); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if err := skipEventBody(r, h); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
body, err := readEventBody(r, h)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e, err := parseEvent(parser, h, headBuf, body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if h.EventType == replication.TABLE_MAP_EVENT {
|
||||
tbMapPos = h.LogPos - h.EventSize
|
||||
}
|
||||
|
||||
evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
|
||||
for _, ev := range evs {
|
||||
startPos := 0
|
||||
if ev.Type == "query" || ev.Type == "gtid" {
|
||||
startPos = int(h.LogPos - h.EventSize)
|
||||
} else {
|
||||
startPos = int(tbMapPos)
|
||||
}
|
||||
|
||||
switch ev.Type {
|
||||
case "":
|
||||
tx.EndPos = int(h.LogPos)
|
||||
|
||||
case "tablemap":
|
||||
tx.EndPos = int(h.LogPos)
|
||||
tbMapPos = h.LogPos - h.EventSize
|
||||
|
||||
case "rowsquery":
|
||||
tx.EndPos = int(h.LogPos)
|
||||
tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
|
||||
|
||||
default:
|
||||
tx.EndPos = int(h.LogPos)
|
||||
if ev.Type == "query" {
|
||||
if equalFoldShort(ev.Data, "begin") {
|
||||
if tx.TxStartTime == 0 {
|
||||
tx.TxStartTime = int64(h.Timestamp)
|
||||
}
|
||||
tx.Status = STATUS_BEGIN
|
||||
} else if equalFoldShort(ev.Data, "commit") {
|
||||
tx.Status = STATUS_COMMIT
|
||||
tx.TxEndTime = int64(h.Timestamp)
|
||||
} else if equalFoldShort(ev.Data, "rollback") {
|
||||
tx.Status = STATUS_ROLLBACK
|
||||
tx.TxEndTime = int64(h.Timestamp)
|
||||
}
|
||||
}
|
||||
if ev.DB != "" && ev.TB != "" {
|
||||
tx.dmlEventCount++
|
||||
}
|
||||
tx.Txs = append(tx.Txs, TxDetail{
|
||||
StartPos: startPos,
|
||||
EndPos: int(h.LogPos),
|
||||
Db: ev.DB,
|
||||
Table: ev.TB,
|
||||
Sql: ev.Data,
|
||||
SqlType: ev.Type,
|
||||
Rows: ev.Rows,
|
||||
ColumnTypes: ev.ColumnTypes,
|
||||
ColumnCollationIDs: ev.ColumnCollationIDs,
|
||||
RowCount: int(ev.RowCnt),
|
||||
Timestamp: int64(h.Timestamp),
|
||||
CompressionType: ev.CompressionType,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func selectVisibleTxDetails(tx Transaction, includeMatcher, excludeMatcher *tableMatcher, filter BinlogFilter) ([]TxDetail, bool, bool, bool) {
|
||||
txs := make([]TxDetail, 0, len(tx.Txs))
|
||||
matched := false
|
||||
|
||||
for _, t := range tx.Txs {
|
||||
includeMatch := includeMatcher != nil && includeMatcher.match(t.Db, t.Table)
|
||||
excludeMatch := excludeMatcher != nil && excludeMatcher.match(t.Db, t.Table)
|
||||
|
||||
if t.Db == "" && t.Table == "" {
|
||||
if includeMatcher != nil {
|
||||
if filter.IncludeBlank {
|
||||
matched = true
|
||||
if filter.PickTxAllIfMatch {
|
||||
return nil, true, true, false
|
||||
}
|
||||
txs = append(txs, t)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if excludeMatcher != nil {
|
||||
if filter.ExcludeBlank {
|
||||
matched = true
|
||||
if filter.PickTxAllIfMatch {
|
||||
return nil, true, false, true
|
||||
}
|
||||
continue
|
||||
}
|
||||
txs = append(txs, t)
|
||||
continue
|
||||
}
|
||||
|
||||
txs = append(txs, t)
|
||||
continue
|
||||
}
|
||||
|
||||
if includeMatcher != nil {
|
||||
if includeMatch {
|
||||
matched = true
|
||||
if filter.PickTxAllIfMatch {
|
||||
return nil, true, true, false
|
||||
}
|
||||
txs = append(txs, t)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if excludeMatcher != nil {
|
||||
if excludeMatch {
|
||||
matched = true
|
||||
if filter.PickTxAllIfMatch {
|
||||
return nil, true, false, true
|
||||
}
|
||||
continue
|
||||
}
|
||||
txs = append(txs, t)
|
||||
continue
|
||||
}
|
||||
|
||||
txs = append(txs, t)
|
||||
}
|
||||
|
||||
return txs, matched, false, false
|
||||
}
|
||||
|
||||
func prepareTableMatchers(filter BinlogFilter) (includeMatcher, excludeMatcher *tableMatcher, err error) {
|
||||
if len(filter.IncludeTables) > 0 {
|
||||
includeMatcher, err = buildTableMatcher(filter.IncludeTables)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("invalid include-tables: %w", err)
|
||||
}
|
||||
}
|
||||
if len(filter.ExcludeTables) > 0 {
|
||||
excludeMatcher, err = buildTableMatcher(filter.ExcludeTables)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("invalid exclude-tables: %w", err)
|
||||
}
|
||||
}
|
||||
return includeMatcher, excludeMatcher, nil
|
||||
}
|
||||
|
||||
func buildTableMatcher(patterns []string) (*tableMatcher, error) {
|
||||
m := &tableMatcher{
|
||||
exactMatch: make(map[string]bool),
|
||||
dbWildcard: make(map[string]bool),
|
||||
tbWildcard: make(map[string]bool),
|
||||
}
|
||||
|
||||
for _, pattern := range patterns {
|
||||
origin := pattern
|
||||
pattern = strings.ToLower(strings.TrimSpace(pattern))
|
||||
if pattern == "" {
|
||||
continue
|
||||
}
|
||||
if pattern == "*.*" {
|
||||
m.matchAll = true
|
||||
continue
|
||||
}
|
||||
parts := strings.Split(pattern, ".")
|
||||
if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
|
||||
return nil, fmt.Errorf("invalid table pattern %q: expect db.table", strings.TrimSpace(origin))
|
||||
}
|
||||
db, tb := parts[0], parts[1]
|
||||
if db != "*" && strings.Contains(db, "*") {
|
||||
return nil, fmt.Errorf("invalid table pattern %q: wildcard '*' must occupy full db segment", strings.TrimSpace(origin))
|
||||
}
|
||||
if tb != "*" && strings.Contains(tb, "*") {
|
||||
return nil, fmt.Errorf("invalid table pattern %q: wildcard '*' must occupy full table segment", strings.TrimSpace(origin))
|
||||
}
|
||||
if db == "*" && tb == "*" {
|
||||
m.matchAll = true
|
||||
} else if db == "*" {
|
||||
m.tbWildcard[tb] = true
|
||||
} else if tb == "*" {
|
||||
m.dbWildcard[db] = true
|
||||
} else {
|
||||
m.exactMatch[db+"."+tb] = true
|
||||
}
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func hasConfiguredTablePatterns(patterns []string) bool {
|
||||
for _, p := range patterns {
|
||||
if strings.TrimSpace(p) != "" {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func recomputeTxStatsFromVisibleDetails(tx *Transaction) {
|
||||
if tx == nil {
|
||||
return
|
||||
}
|
||||
if len(tx.Txs) == 0 {
|
||||
tx.RowsCount = 0
|
||||
tx.Size = 0
|
||||
return
|
||||
}
|
||||
|
||||
firstSet := false
|
||||
minStart := 0
|
||||
maxEnd := 0
|
||||
rows := 0
|
||||
for _, d := range tx.Txs {
|
||||
rows += d.RowCount
|
||||
if !firstSet {
|
||||
minStart = d.StartPos
|
||||
maxEnd = d.EndPos
|
||||
firstSet = true
|
||||
continue
|
||||
}
|
||||
if d.StartPos < minStart {
|
||||
minStart = d.StartPos
|
||||
}
|
||||
if d.EndPos > maxEnd {
|
||||
maxEnd = d.EndPos
|
||||
}
|
||||
}
|
||||
|
||||
tx.RowsCount = rows
|
||||
tx.StartPos = minStart
|
||||
tx.EndPos = maxEnd
|
||||
if maxEnd > minStart {
|
||||
tx.Size = maxEnd - minStart
|
||||
} else {
|
||||
tx.Size = 0
|
||||
}
|
||||
}
|
||||
|
||||
func equalFoldShort(s, lower string) bool {
|
||||
if len(s) != len(lower) {
|
||||
return false
|
||||
}
|
||||
for i := 0; i < len(s); i++ {
|
||||
c := s[i]
|
||||
if 'A' <= c && c <= 'Z' {
|
||||
c += 'a' - 'A'
|
||||
}
|
||||
if c != lower[i] {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
85
parse_filter_table_test.go
Normal file
85
parse_filter_table_test.go
Normal file
@ -0,0 +1,85 @@
|
||||
package binlog
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestBuildTableMatcher_InvalidPattern(t *testing.T) {
|
||||
cases := [][]string{
|
||||
{"db"},
|
||||
{"db."},
|
||||
{".tb"},
|
||||
{"db.tb.more"},
|
||||
{"db.t*"},
|
||||
{"d*.tb"},
|
||||
}
|
||||
|
||||
for _, patterns := range cases {
|
||||
if _, err := buildTableMatcher(patterns); err == nil {
|
||||
t.Fatalf("expected invalid pattern error, got nil: %v", patterns)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPrepareTableMatchers_ReturnErrorOnInvalidPattern(t *testing.T) {
|
||||
_, _, err := prepareTableMatchers(BinlogFilter{IncludeTables: []string{"invalid"}})
|
||||
if err == nil {
|
||||
t.Fatal("expected include-tables error, got nil")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "invalid include-tables") {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSelectVisibleTxDetails_IncludeBlank(t *testing.T) {
|
||||
includeMatcher, err := buildTableMatcher([]string{"db1.tb1"})
|
||||
if err != nil {
|
||||
t.Fatalf("build include matcher failed: %v", err)
|
||||
}
|
||||
|
||||
tx := Transaction{Txs: []TxDetail{
|
||||
{SqlType: "query", Sql: "BEGIN"},
|
||||
{SqlType: "insert", Db: "db1", Table: "tb1", RowCount: 1},
|
||||
}}
|
||||
|
||||
txs, matched, pickAll, skipAll := selectVisibleTxDetails(tx, includeMatcher, nil, BinlogFilter{IncludeBlank: true})
|
||||
if !matched {
|
||||
t.Fatal("expected matched=true")
|
||||
}
|
||||
if pickAll || skipAll {
|
||||
t.Fatalf("unexpected pickAll/skipAll: %v/%v", pickAll, skipAll)
|
||||
}
|
||||
if len(txs) != 2 {
|
||||
t.Fatalf("expected 2 details with IncludeBlank=true, got %d", len(txs))
|
||||
}
|
||||
if txs[0].SqlType != "query" || txs[1].Table != "tb1" {
|
||||
t.Fatalf("unexpected details order/content: %#v", txs)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSelectVisibleTxDetails_ExcludeBlank(t *testing.T) {
|
||||
excludeMatcher, err := buildTableMatcher([]string{"db2.tb2"})
|
||||
if err != nil {
|
||||
t.Fatalf("build exclude matcher failed: %v", err)
|
||||
}
|
||||
|
||||
tx := Transaction{Txs: []TxDetail{
|
||||
{SqlType: "query", Sql: "BEGIN"},
|
||||
{SqlType: "insert", Db: "db1", Table: "tb1", RowCount: 1},
|
||||
}}
|
||||
|
||||
txs, matched, pickAll, skipAll := selectVisibleTxDetails(tx, nil, excludeMatcher, BinlogFilter{ExcludeBlank: true})
|
||||
if !matched {
|
||||
t.Fatal("expected matched=true when excluding blank detail")
|
||||
}
|
||||
if pickAll || skipAll {
|
||||
t.Fatalf("unexpected pickAll/skipAll: %v/%v", pickAll, skipAll)
|
||||
}
|
||||
if len(txs) != 1 {
|
||||
t.Fatalf("expected 1 detail after ExcludeBlank=true, got %d", len(txs))
|
||||
}
|
||||
if txs[0].Db != "db1" || txs[0].Table != "tb1" {
|
||||
t.Fatalf("unexpected remaining detail: %#v", txs[0])
|
||||
}
|
||||
}
|
||||
150
parse_io.go
Normal file
150
parse_io.go
Normal file
@ -0,0 +1,150 @@
|
||||
package binlog
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"github.com/starainrt/go-mysql/replication"
|
||||
)
|
||||
|
||||
func validateBinlogHeader(f *os.File) error {
|
||||
const fileTypeBytes = int64(4)
|
||||
b := make([]byte, fileTypeBytes)
|
||||
|
||||
if _, err := f.Read(b); err != nil {
|
||||
return fmt.Errorf("read binlog header failed: %w", err)
|
||||
}
|
||||
if !bytes.Equal(b, replication.BinLogFileHeader) {
|
||||
return ErrInvalidBinlogHeader
|
||||
}
|
||||
if _, err := f.Seek(fileTypeBytes, io.SeekStart); err != nil {
|
||||
return fmt.Errorf("seek after header failed: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func readEventHeader(r io.Reader, parser *replication.BinlogParser, headBuf []byte) (*replication.EventHeader, error) {
|
||||
if _, err := io.ReadFull(r, headBuf); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
h, err := parser.ParseHeader(headBuf)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse header failed: %w", err)
|
||||
}
|
||||
if h.EventSize <= uint32(replication.EventHeaderSize) {
|
||||
return nil, fmt.Errorf("%w: event size is %d", ErrEventTooSmall, h.EventSize)
|
||||
}
|
||||
return h, nil
|
||||
}
|
||||
|
||||
func readEventBody(r io.Reader, h *replication.EventHeader) ([]byte, error) {
|
||||
bodyLen := int(h.EventSize) - replication.EventHeaderSize
|
||||
body := make([]byte, bodyLen)
|
||||
if _, err := io.ReadFull(r, body); err != nil {
|
||||
return nil, fmt.Errorf("read event body failed: %w (need %d bytes)", err, bodyLen)
|
||||
}
|
||||
return body, nil
|
||||
}
|
||||
|
||||
func skipEventBody(r io.Reader, h *replication.EventHeader) error {
|
||||
bodyLen := int64(h.EventSize) - int64(replication.EventHeaderSize)
|
||||
if bodyLen <= 0 {
|
||||
return nil
|
||||
}
|
||||
if _, err := io.CopyN(io.Discard, r, bodyLen); err != nil {
|
||||
return fmt.Errorf("skip event body failed: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
var rawDataPool = sync.Pool{
|
||||
New: func() any {
|
||||
b := make([]byte, 0, 64*1024)
|
||||
return &b
|
||||
},
|
||||
}
|
||||
|
||||
func getRawDataBuf(n int) []byte {
|
||||
p := rawDataPool.Get().(*[]byte)
|
||||
if cap(*p) < n {
|
||||
return make([]byte, n)
|
||||
}
|
||||
return (*p)[:n]
|
||||
}
|
||||
|
||||
func putRawDataBuf(b []byte) {
|
||||
if cap(b) > maxPooledRawDataCap {
|
||||
return
|
||||
}
|
||||
b = b[:0]
|
||||
rawDataPool.Put(&b)
|
||||
}
|
||||
|
||||
func formatBodyPreview(body []byte, maxBytes int) string {
|
||||
if maxBytes <= 0 {
|
||||
maxBytes = 256
|
||||
}
|
||||
if len(body) == 0 {
|
||||
return "len=0"
|
||||
}
|
||||
previewLen := len(body)
|
||||
truncated := false
|
||||
if previewLen > maxBytes {
|
||||
previewLen = maxBytes
|
||||
truncated = true
|
||||
}
|
||||
hexBody := hex.EncodeToString(body[:previewLen])
|
||||
if truncated {
|
||||
return fmt.Sprintf("len=%d preview(hex,%dB)=%s...", len(body), previewLen, hexBody)
|
||||
}
|
||||
return fmt.Sprintf("len=%d preview(hex,%dB)=%s", len(body), previewLen, hexBody)
|
||||
}
|
||||
|
||||
func parseEvent(parser *replication.BinlogParser, h *replication.EventHeader, headBuf []byte, body []byte) (replication.Event, error) {
|
||||
rawLen := len(headBuf) + len(body)
|
||||
rawData := getRawDataBuf(rawLen)
|
||||
copy(rawData, headBuf)
|
||||
copy(rawData[len(headBuf):], body)
|
||||
|
||||
e, err := parser.ParseEvent(h, body, rawData)
|
||||
putRawDataBuf(rawData)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse event failed at pos %d: Header %+v, Body %s, Err: %w",
|
||||
h.LogPos, h, formatBodyPreview(body, 256), err)
|
||||
}
|
||||
return e, nil
|
||||
}
|
||||
|
||||
func seekToPosition(f *os.File, parser *replication.BinlogParser, pos int64) error {
|
||||
if err := validateBinlogHeader(f); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
headBuf := make([]byte, replication.EventHeaderSize)
|
||||
for {
|
||||
h, err := readEventHeader(f, parser, headBuf)
|
||||
if err != nil {
|
||||
return fmt.Errorf("seek to position failed: %w", err)
|
||||
}
|
||||
body, err := readEventBody(f, h)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = parseEvent(parser, h, headBuf, body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if h.EventType == replication.FORMAT_DESCRIPTION_EVENT || h.EventType == replication.GTID_EVENT {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if _, err := f.Seek(pos, io.SeekStart); err != nil {
|
||||
return fmt.Errorf("seek to pos %d failed: %w", pos, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
33
parse_io_test.go
Normal file
33
parse_io_test.go
Normal file
@ -0,0 +1,33 @@
|
||||
package binlog
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestFormatBodyPreview(t *testing.T) {
|
||||
if got := formatBodyPreview(nil, 256); got != "len=0" {
|
||||
t.Fatalf("unexpected empty preview: %q", got)
|
||||
}
|
||||
|
||||
small := []byte{0x01, 0x02, 0xAB}
|
||||
got := formatBodyPreview(small, 8)
|
||||
if !strings.Contains(got, "len=3") || !strings.Contains(got, "0102ab") {
|
||||
t.Fatalf("unexpected preview for small body: %q", got)
|
||||
}
|
||||
if strings.Contains(got, "...") {
|
||||
t.Fatalf("small body should not be truncated: %q", got)
|
||||
}
|
||||
|
||||
large := make([]byte, 300)
|
||||
for i := range large {
|
||||
large[i] = byte(i)
|
||||
}
|
||||
got = formatBodyPreview(large, 16)
|
||||
if !strings.Contains(got, "len=300") || !strings.Contains(got, "preview(hex,16B)=") {
|
||||
t.Fatalf("unexpected preview for large body: %q", got)
|
||||
}
|
||||
if !strings.HasSuffix(got, "...") {
|
||||
t.Fatalf("large body should be truncated with ellipsis: %q", got)
|
||||
}
|
||||
}
|
||||
176
parse_stream.go
Normal file
176
parse_stream.go
Normal file
@ -0,0 +1,176 @@
|
||||
package binlog
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"io"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"b612.me/staros"
|
||||
"github.com/starainrt/go-mysql/replication"
|
||||
)
|
||||
|
||||
func ParseBinlogFile(path string, fx func(transaction Transaction) bool) error {
|
||||
return parseOneBinlog(path, fx)
|
||||
}
|
||||
|
||||
func parseOneBinlog(path string, fx func(Transaction) bool) error {
|
||||
if !staros.Exists(path) {
|
||||
return os.ErrNotExist
|
||||
}
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
if err := validateBinlogHeader(f); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
br := bufio.NewReaderSize(f, defaultReadBufSize)
|
||||
return parseBinlogDetail(br, fx)
|
||||
}
|
||||
|
||||
func finalizeTx(tx *Transaction, onlyShowGtid bool) {
|
||||
idx := 0
|
||||
for k, v := range tx.Txs {
|
||||
if v.SqlType != "query" && len(tx.sqlOrigin) > idx {
|
||||
v.Sql = tx.sqlOrigin[idx]
|
||||
idx++
|
||||
}
|
||||
tx.RowsCount += v.RowCount
|
||||
tx.Txs[k] = v
|
||||
}
|
||||
|
||||
if onlyShowGtid {
|
||||
tx.Size = 0
|
||||
} else {
|
||||
tx.Size = tx.EndPos - tx.StartPos
|
||||
}
|
||||
}
|
||||
|
||||
func fillTimeLazy(tx *Transaction) {
|
||||
if tx.Timestamp != 0 && tx.Time.IsZero() {
|
||||
tx.Time = time.Unix(tx.Timestamp, 0)
|
||||
}
|
||||
for i := range tx.Txs {
|
||||
if tx.Txs[i].Timestamp != 0 && tx.Txs[i].Time.IsZero() {
|
||||
tx.Txs[i].Time = time.Unix(tx.Txs[i].Timestamp, 0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func parseBinlogDetail(r io.Reader, f func(Transaction) bool) error {
|
||||
parser := replication.NewBinlogParser()
|
||||
parser.SetParseTime(false)
|
||||
parser.SetUseDecimal(false)
|
||||
|
||||
var (
|
||||
tbMapPos uint32
|
||||
tx Transaction
|
||||
headBuf = make([]byte, replication.EventHeaderSize)
|
||||
)
|
||||
currentGtid := ""
|
||||
|
||||
for {
|
||||
h, err := readEventHeader(r, parser, headBuf)
|
||||
if err == io.EOF {
|
||||
if currentGtid != "" {
|
||||
finalizeTx(&tx, false)
|
||||
fillTimeLazy(&tx)
|
||||
if f != nil {
|
||||
f(tx)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
body, err := readEventBody(r, h)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
e, err := parseEvent(parser, h, headBuf, body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if h.EventType == replication.TABLE_MAP_EVENT {
|
||||
tbMapPos = h.LogPos - h.EventSize
|
||||
}
|
||||
|
||||
evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e})
|
||||
for _, ev := range evs {
|
||||
startPos := 0
|
||||
if ev.Type == "query" || ev.Type == "gtid" {
|
||||
startPos = int(h.LogPos - h.EventSize)
|
||||
} else {
|
||||
startPos = int(tbMapPos)
|
||||
}
|
||||
|
||||
switch ev.Type {
|
||||
case "gtid":
|
||||
if currentGtid != "" {
|
||||
finalizeTx(&tx, false)
|
||||
fillTimeLazy(&tx)
|
||||
if f != nil && !f(tx) {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
currentGtid = ev.Data
|
||||
tx = Transaction{
|
||||
GTID: ev.Data,
|
||||
StartPos: startPos,
|
||||
Timestamp: int64(h.Timestamp),
|
||||
Txs: make([]TxDetail, 0, 8),
|
||||
sqlOrigin: make([]string, 0, 4),
|
||||
}
|
||||
case "":
|
||||
tx.EndPos = int(h.LogPos)
|
||||
case "tablemap":
|
||||
tx.EndPos = int(h.LogPos)
|
||||
tbMapPos = h.LogPos - h.EventSize
|
||||
case "rowsquery":
|
||||
tx.EndPos = int(h.LogPos)
|
||||
tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
|
||||
default:
|
||||
tx.EndPos = int(h.LogPos)
|
||||
if ev.Type == "query" {
|
||||
if equalFoldShort(ev.Data, "begin") {
|
||||
if tx.TxStartTime == 0 {
|
||||
tx.TxStartTime = int64(h.Timestamp)
|
||||
}
|
||||
tx.Status = STATUS_BEGIN
|
||||
} else if equalFoldShort(ev.Data, "commit") {
|
||||
tx.Status = STATUS_COMMIT
|
||||
tx.TxEndTime = int64(h.Timestamp)
|
||||
} else if equalFoldShort(ev.Data, "rollback") {
|
||||
tx.Status = STATUS_ROLLBACK
|
||||
tx.TxEndTime = int64(h.Timestamp)
|
||||
}
|
||||
}
|
||||
if ev.DB != "" && ev.TB != "" {
|
||||
tx.dmlEventCount++
|
||||
}
|
||||
tx.Txs = append(tx.Txs, TxDetail{
|
||||
StartPos: startPos,
|
||||
EndPos: int(h.LogPos),
|
||||
Db: ev.DB,
|
||||
Table: ev.TB,
|
||||
Sql: ev.Data,
|
||||
SqlType: ev.Type,
|
||||
Rows: ev.Rows,
|
||||
ColumnTypes: ev.ColumnTypes,
|
||||
ColumnCollationIDs: ev.ColumnCollationIDs,
|
||||
RowCount: int(ev.RowCnt),
|
||||
Timestamp: int64(h.Timestamp),
|
||||
CompressionType: ev.CompressionType,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
126
parse_types.go
Normal file
126
parse_types.go
Normal file
@ -0,0 +1,126 @@
|
||||
package binlog
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrInvalidBinlogHeader = errors.New("invalid binlog file header")
|
||||
ErrEventTooSmall = errors.New("event size too small")
|
||||
)
|
||||
|
||||
const (
|
||||
CompressionNone uint64 = 255
|
||||
CompressionZSTD uint64 = 0
|
||||
)
|
||||
|
||||
const (
|
||||
maxPooledRawDataCap = 4 << 20 // 4MB
|
||||
defaultReadBufSize = 1 << 20 // 1MB
|
||||
)
|
||||
|
||||
type TxDetail struct {
|
||||
StartPos int `json:"startPos"`
|
||||
EndPos int `json:"endPos"`
|
||||
RowCount int `json:"rowCount"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
Time time.Time `json:"time"`
|
||||
Sql string `json:"sql"`
|
||||
Db string `json:"db"`
|
||||
Table string `json:"table"`
|
||||
SqlType string `json:"sqlType"`
|
||||
CompressionType string `json:"compressionType"`
|
||||
Rows [][]interface{} `json:"rows"`
|
||||
ColumnTypes []int `json:"columnTypes,omitempty"`
|
||||
ColumnCollationIDs []uint64 `json:"columnCollationIds,omitempty"`
|
||||
}
|
||||
|
||||
const (
|
||||
STATUS_PREPARE uint8 = iota
|
||||
STATUS_BEGIN
|
||||
STATUS_COMMIT
|
||||
STATUS_ROLLBACK
|
||||
)
|
||||
|
||||
type Transaction struct {
|
||||
GTID string `json:"gtid"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
Time time.Time `json:"time"`
|
||||
StartPos int `json:"startPos"`
|
||||
EndPos int `json:"endPos"`
|
||||
Size int `json:"size"`
|
||||
RowsCount int `json:"rowsCount"`
|
||||
Status uint8 `json:"status"`
|
||||
TxStartTime int64 `json:"txStartTime"`
|
||||
TxEndTime int64 `json:"txEndTime"`
|
||||
sqlOrigin []string `json:"sqlOrigin"`
|
||||
Txs []TxDetail `json:"txs"`
|
||||
dmlEventCount int
|
||||
}
|
||||
|
||||
func (t Transaction) GetSqlOrigin() []string {
|
||||
return t.sqlOrigin
|
||||
}
|
||||
|
||||
type BinlogFilter struct {
|
||||
IncludeGtid string
|
||||
ExcludeGtid string
|
||||
IncludeTables []string
|
||||
ExcludeTables []string
|
||||
StartPos int
|
||||
EndPos int
|
||||
StartDate time.Time
|
||||
EndDate time.Time
|
||||
BigThan int
|
||||
SmallThan int
|
||||
OnlyShowGtid bool
|
||||
OnlyShowDML bool
|
||||
PickTxAllIfMatch bool
|
||||
ExcludeBlank bool
|
||||
IncludeBlank bool
|
||||
}
|
||||
|
||||
type BinlogEvent struct {
|
||||
Type string
|
||||
DB string
|
||||
TB string
|
||||
Data string
|
||||
RowCnt uint32
|
||||
Rows [][]interface{}
|
||||
ColumnTypes []int
|
||||
ColumnCollationIDs []uint64
|
||||
CompressionType string
|
||||
}
|
||||
|
||||
type tableMatcher struct {
|
||||
exactMatch map[string]bool
|
||||
dbWildcard map[string]bool
|
||||
tbWildcard map[string]bool
|
||||
matchAll bool
|
||||
}
|
||||
|
||||
func (m *tableMatcher) match(db, tb string) bool {
|
||||
db = strings.ToLower(strings.TrimSpace(db))
|
||||
tb = strings.ToLower(strings.TrimSpace(tb))
|
||||
|
||||
if m.matchAll {
|
||||
return true
|
||||
}
|
||||
if m.dbWildcard[db] || m.tbWildcard[tb] {
|
||||
return true
|
||||
}
|
||||
if len(m.exactMatch) > 0 {
|
||||
// Go 1.12+ 对 map[string] 查找时 string([]byte) 不分配
|
||||
var buf [128]byte
|
||||
key := buf[:0]
|
||||
key = append(key, db...)
|
||||
key = append(key, '.')
|
||||
key = append(key, tb...)
|
||||
if m.exactMatch[string(key)] {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user