mysqlbinlog/parse_types.go
starainrt 8469c11373
refactor(parse): 拆分 parse.go 并修复事务/过滤一致性问题
- 将臃肿的 parse.go 按职责拆分为多个模块:
    parse_types.go、parse_io.go、parse_event_convert.go、parse_stream.go、parse_filter.go
  - parse.go 保留为模块入口说明,提升可维护性与可读性
  - 修复事务状态被覆盖问题(BEGIN/COMMIT/ROLLBACK 不再被重置为 PREPARE)
  - 增加 include-tables 与 exclude-tables 互斥校验,同时配置时直接报配置错误
  - 强化表匹配器模式校验,并补充非法模式测试
  - 在明细过滤后重算事务统计(RowsCount/StartPos/EndPos/Size),避免统计失真
  - 增加 TABLE_MAP 事件转换,补充列元信息透传(ColumnTypes/ColumnCollationIDs)
  - 基于 unsigned 元数据规范化行值,避免无符号整型被渲染为负数
  - 优化事件解析报错信息:增加有界 body 十六进制预览
  - 补充单元测试:payload/tablemap 转换、unsigned 规范化、过滤逻辑、IO 预览
2026-03-19 17:04:35 +08:00

127 lines
3.1 KiB
Go

package binlog
import (
"errors"
"strings"
"time"
)
var (
ErrInvalidBinlogHeader = errors.New("invalid binlog file header")
ErrEventTooSmall = errors.New("event size too small")
)
const (
CompressionNone uint64 = 255
CompressionZSTD uint64 = 0
)
const (
maxPooledRawDataCap = 4 << 20 // 4MB
defaultReadBufSize = 1 << 20 // 1MB
)
type TxDetail struct {
StartPos int `json:"startPos"`
EndPos int `json:"endPos"`
RowCount int `json:"rowCount"`
Timestamp int64 `json:"timestamp"`
Time time.Time `json:"time"`
Sql string `json:"sql"`
Db string `json:"db"`
Table string `json:"table"`
SqlType string `json:"sqlType"`
CompressionType string `json:"compressionType"`
Rows [][]interface{} `json:"rows"`
ColumnTypes []int `json:"columnTypes,omitempty"`
ColumnCollationIDs []uint64 `json:"columnCollationIds,omitempty"`
}
const (
STATUS_PREPARE uint8 = iota
STATUS_BEGIN
STATUS_COMMIT
STATUS_ROLLBACK
)
type Transaction struct {
GTID string `json:"gtid"`
Timestamp int64 `json:"timestamp"`
Time time.Time `json:"time"`
StartPos int `json:"startPos"`
EndPos int `json:"endPos"`
Size int `json:"size"`
RowsCount int `json:"rowsCount"`
Status uint8 `json:"status"`
TxStartTime int64 `json:"txStartTime"`
TxEndTime int64 `json:"txEndTime"`
sqlOrigin []string `json:"sqlOrigin"`
Txs []TxDetail `json:"txs"`
dmlEventCount int
}
func (t Transaction) GetSqlOrigin() []string {
return t.sqlOrigin
}
type BinlogFilter struct {
IncludeGtid string
ExcludeGtid string
IncludeTables []string
ExcludeTables []string
StartPos int
EndPos int
StartDate time.Time
EndDate time.Time
BigThan int
SmallThan int
OnlyShowGtid bool
OnlyShowDML bool
PickTxAllIfMatch bool
ExcludeBlank bool
IncludeBlank bool
}
type BinlogEvent struct {
Type string
DB string
TB string
Data string
RowCnt uint32
Rows [][]interface{}
ColumnTypes []int
ColumnCollationIDs []uint64
CompressionType string
}
type tableMatcher struct {
exactMatch map[string]bool
dbWildcard map[string]bool
tbWildcard map[string]bool
matchAll bool
}
func (m *tableMatcher) match(db, tb string) bool {
db = strings.ToLower(strings.TrimSpace(db))
tb = strings.ToLower(strings.TrimSpace(tb))
if m.matchAll {
return true
}
if m.dbWildcard[db] || m.tbWildcard[tb] {
return true
}
if len(m.exactMatch) > 0 {
// Go 1.12+ 对 map[string] 查找时 string([]byte) 不分配
var buf [128]byte
key := buf[:0]
key = append(key, db...)
key = append(key, '.')
key = append(key, tb...)
if m.exactMatch[string(key)] {
return true
}
}
return false
}