127 lines
3.1 KiB
Go
127 lines
3.1 KiB
Go
|
|
package binlog
|
||
|
|
|
||
|
|
import (
|
||
|
|
"errors"
|
||
|
|
"strings"
|
||
|
|
"time"
|
||
|
|
)
|
||
|
|
|
||
|
|
// Sentinel errors defined by this package. Callers should compare against
// them with errors.Is rather than by matching the message text.
var (
	// ErrInvalidBinlogHeader reports an invalid binlog file header.
	ErrInvalidBinlogHeader = errors.New("invalid binlog file header")
	// ErrEventTooSmall reports an event whose size is too small.
	ErrEventTooSmall = errors.New("event size too small")
)
|
||
|
|
|
||
|
|
// Compression type codes.
//
// NOTE(review): the values look like the compression codes MySQL uses for
// transaction payload events (ZSTD = 0, NONE = 255) — confirm against the
// decoder that consumes them; that code is not part of this section.
const (
	// CompressionNone is the code for an uncompressed payload.
	CompressionNone uint64 = 255
	// CompressionZSTD is the code for a zstd-compressed payload.
	CompressionZSTD uint64 = 0
)
|
||
|
|
|
||
|
|
// Buffer sizing constants (untyped, in bytes).
//
// NOTE(review): how they are applied is inferred from the names only; the
// code that uses them is not part of this section — confirm.
const (
	// maxPooledRawDataCap is 4 MiB (4 << 20 bytes); presumably the largest
	// raw-data buffer capacity that is still returned to a pool.
	maxPooledRawDataCap = 4 << 20
	// defaultReadBufSize is 1 MiB (1 << 20 bytes); presumably the default
	// size of the read buffer.
	defaultReadBufSize = 1 << 20
)
|
||
|
|
|
||
|
|
// TxDetail is the element type of Transaction.Txs. All fields are exported
// and JSON-tagged; only the two column-metadata slices use omitempty, so
// every other key is always present in the encoded form.
//
// NOTE(review): the field descriptions below are inferred from the field
// names; the code that populates them is not part of this section — confirm.
type TxDetail struct {
	// Position range and row count of this entry (presumably byte offsets
	// within the binlog file).
	StartPos int `json:"startPos"`
	EndPos   int `json:"endPos"`
	RowCount int `json:"rowCount"`
	// Event time, presumably as a Unix timestamp and as the equivalent
	// time.Time value.
	Timestamp int64     `json:"timestamp"`
	Time      time.Time `json:"time"`
	// Statement text, the database and table it refers to, and a label for
	// the kind of statement.
	Sql     string `json:"sql"`
	Db      string `json:"db"`
	Table   string `json:"table"`
	SqlType string `json:"sqlType"`
	// CompressionType is a textual compression label.
	CompressionType string `json:"compressionType"`
	// Row values plus optional per-column metadata. The two metadata keys
	// are left out of the JSON output when their slices are empty.
	Rows               [][]interface{} `json:"rows"`
	ColumnTypes        []int           `json:"columnTypes,omitempty"`
	ColumnCollationIDs []uint64        `json:"columnCollationIds,omitempty"`
}
|
||
|
|
|
||
|
|
// Transaction status codes, typed uint8 and numbered consecutively from 0 by
// iota: STATUS_PREPARE = 0, STATUS_BEGIN = 1, STATUS_COMMIT = 2,
// STATUS_ROLLBACK = 3. Transaction.Status has the same type.
//
// The ALL_CAPS names are not idiomatic Go, but they are exported and are
// kept unchanged so existing callers keep compiling.
const (
	STATUS_PREPARE uint8 = iota
	STATUS_BEGIN
	STATUS_COMMIT
	STATUS_ROLLBACK
)
|
||
|
|
|
||
|
|
type Transaction struct {
|
||
|
|
GTID string `json:"gtid"`
|
||
|
|
Timestamp int64 `json:"timestamp"`
|
||
|
|
Time time.Time `json:"time"`
|
||
|
|
StartPos int `json:"startPos"`
|
||
|
|
EndPos int `json:"endPos"`
|
||
|
|
Size int `json:"size"`
|
||
|
|
RowsCount int `json:"rowsCount"`
|
||
|
|
Status uint8 `json:"status"`
|
||
|
|
TxStartTime int64 `json:"txStartTime"`
|
||
|
|
TxEndTime int64 `json:"txEndTime"`
|
||
|
|
sqlOrigin []string `json:"sqlOrigin"`
|
||
|
|
Txs []TxDetail `json:"txs"`
|
||
|
|
dmlEventCount int
|
||
|
|
}
|
||
|
|
|
||
|
|
func (t Transaction) GetSqlOrigin() []string {
|
||
|
|
return t.sqlOrigin
|
||
|
|
}
|
||
|
|
|
||
|
|
// BinlogFilter bundles the selection criteria and output switches used when
// reading a binlog. The zero value sets no criterion.
//
// NOTE(review): how each field is applied (inclusive or exclusive bounds,
// units of the size thresholds, precedence between include and exclude) is
// decided by code outside this section; the notes below are inferred from
// the field names — confirm.
type BinlogFilter struct {
	// GTID expressions to include or exclude.
	IncludeGtid string
	ExcludeGtid string
	// Table names to include or exclude.
	IncludeTables []string
	ExcludeTables []string
	// Position window and time window.
	StartPos  int
	EndPos    int
	StartDate time.Time
	EndDate   time.Time
	// Size thresholds (presumably: keep transactions bigger than BigThan
	// and/or smaller than SmallThan).
	BigThan   int
	SmallThan int
	// Behaviour switches.
	OnlyShowGtid     bool
	OnlyShowDML      bool
	PickTxAllIfMatch bool
	ExcludeBlank     bool
	IncludeBlank     bool
}
|
||
|
|
|
||
|
|
// BinlogEvent is the in-memory form of one decoded binlog event. It carries
// no JSON tags, unlike TxDetail, which has a matching set of row and column
// fields.
//
// NOTE(review): the field descriptions below are inferred from the field
// names; the decoder that fills them is not part of this section — confirm.
type BinlogEvent struct {
	// Event kind, database name, table name, textual payload and row count.
	Type   string
	DB     string
	TB     string
	Data   string
	RowCnt uint32
	// Row values and per-column metadata.
	Rows               [][]interface{}
	ColumnTypes        []int
	ColumnCollationIDs []uint64
	// CompressionType is a textual compression label.
	CompressionType string
}
|
||
|
|
|
||
|
|
// tableMatcher decides whether a database/table pair is selected.
//
// match lower-cases and trims its arguments before every lookup, so the keys
// stored in the three maps must already be lower-case and free of
// surrounding whitespace, otherwise they can never be hit.
type tableMatcher struct {
	// exactMatch is keyed by "db.tb".
	exactMatch map[string]bool
	// dbWildcard is keyed by database name; a hit selects every table in it.
	dbWildcard map[string]bool
	// tbWildcard is keyed by table name; a hit selects that table in any database.
	tbWildcard map[string]bool
	// matchAll selects everything without consulting the maps.
	matchAll bool
}

// match reports whether the table tb in database db is selected.
//
// The checks run in this order, and the first hit wins:
//  1. matchAll is set;
//  2. the normalized db is in dbWildcard, or the normalized tb is in tbWildcard;
//  3. the normalized "db.tb" is in exactMatch.
//
// Normalization is strings.TrimSpace followed by strings.ToLower. Nil maps
// are fine: a lookup in a nil map simply misses.
func (m *tableMatcher) match(db, tb string) bool {
	// Answer the catch-all case first so the names are not normalized
	// (which may allocate) when the result cannot depend on them.
	if m.matchAll {
		return true
	}

	db = strings.ToLower(strings.TrimSpace(db))
	tb = strings.ToLower(strings.TrimSpace(tb))

	if m.dbWildcard[db] || m.tbWildcard[tb] {
		return true
	}
	if len(m.exactMatch) == 0 {
		return false
	}

	// Assemble "db.tb" in a fixed-size scratch array; append only moves to a
	// larger buffer when the key is longer than 128 bytes. The compiler
	// elides the string copy for a map lookup written as m[string(b)], so
	// the lookup itself does not allocate.
	var scratch [128]byte
	key := append(scratch[:0], db...)
	key = append(key, '.')
	key = append(key, tb...)
	return m.exactMatch[string(key)]
}
|