mysqlbinlog/parse_event_convert.go
starainrt 8469c11373
refactor(parse): 拆分 parse.go 并修复事务/过滤一致性问题
- 将臃肿的 parse.go 按职责拆分为多个模块:
    parse_types.go、parse_io.go、parse_event_convert.go、parse_stream.go、parse_filter.go
  - parse.go 保留为模块入口说明,提升可维护性与可读性
  - 修复事务状态被覆盖问题(BEGIN/COMMIT/ROLLBACK 不再被重置为 PREPARE)
  - 增加 include-tables 与 exclude-tables 互斥校验,同时配置时直接报配置错误
  - 强化表匹配器模式校验,并补充非法模式测试
  - 在明细过滤后重算事务统计(RowsCount/StartPos/EndPos/Size),避免统计失真
  - 增加 TABLE_MAP 事件转换,补充列元信息透传(ColumnTypes/ColumnCollationIDs)
  - 基于 unsigned 元数据规范化行值,避免无符号整型被渲染为负数
  - 优化事件解析报错信息:增加有界 body 十六进制预览
  - 补充单元测试:payload/tablemap 转换、unsigned 规范化、过滤逻辑、IO 预览
2026-03-19 17:04:35 +08:00

258 lines
6.0 KiB
Go

package binlog
import (
"fmt"
"b612.me/mysql/gtid"
"github.com/starainrt/go-mysql/mysql"
"github.com/starainrt/go-mysql/replication"
)
// ParseBinlogEvent converts one decoded replication event into zero or more
// BinlogEvent records.
//
// Mapping by header event type:
//   - ANONYMOUS_GTID_EVENT: Type "gtid", Data "anonymous-gtid-event:1".
//   - WRITE/UPDATE/DELETE_ROWS_EVENT (v1 and v2): Type "insert", "update" or
//     "delete", with schema, table, row count, rows and column metadata.
//   - TABLE_MAP_EVENT: Type "tablemap" with schema and table name.
//   - ROWS_QUERY_EVENT: Type "rowsquery", Data is the query text.
//   - QUERY_EVENT: Type "query", DB is the schema, Data is the query text.
//   - MARIADB_GTID_EVENT: Type "query", Data "begin".
//   - XID_EVENT: Type "query", Data "commit".
//   - GTID_EVENT: Type "gtid", Data is the formatted GTID, or an
//     "invalid-gtid:<uuid>:<gno>" marker when the GTID cannot be parsed.
//   - TRANSACTION_PAYLOAD_EVENT: every nested event is converted recursively,
//     the results are flattened, and each one is tagged with the payload's
//     compression type name.
//
// Any other event type yields a single zero-valued BinlogEvent. nil is
// returned when ev or its header is nil, or when the event body does not have
// the concrete type expected for its header type.
func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent {
	if ev == nil || ev.Header == nil {
		return nil
	}
	var buf [1]BinlogEvent
	sig := &buf[0]
	switch ev.Header.EventType {
	case replication.ANONYMOUS_GTID_EVENT:
		sig.Data = "anonymous-gtid-event:1"
		sig.Type = "gtid"
	case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
		if !populateRowsSig(sig, ev, "insert", 1) {
			return nil
		}
	case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
		// Updates are counted as len(Rows)/2; presumably each change is
		// stored as a before/after pair of rows — confirm against go-mysql.
		if !populateRowsSig(sig, ev, "update", 2) {
			return nil
		}
	case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
		if !populateRowsSig(sig, ev, "delete", 1) {
			return nil
		}
	case replication.TABLE_MAP_EVENT:
		tableEvent, ok := ev.Event.(*replication.TableMapEvent)
		if !ok {
			return nil
		}
		sig.DB = string(tableEvent.Schema)
		sig.TB = string(tableEvent.Table)
		sig.Type = "tablemap"
	case replication.ROWS_QUERY_EVENT:
		queryEvent, ok := ev.Event.(*replication.RowsQueryEvent)
		if !ok {
			return nil
		}
		sig.Data = string(queryEvent.Query)
		sig.Type = "rowsquery"
	case replication.QUERY_EVENT:
		queryEvent, ok := ev.Event.(*replication.QueryEvent)
		if !ok {
			return nil
		}
		sig.DB = string(queryEvent.Schema)
		sig.Data = string(queryEvent.Query)
		sig.Type = "query"
	case replication.MARIADB_GTID_EVENT:
		sig.Data = "begin"
		sig.Type = "query"
	case replication.XID_EVENT:
		sig.Data = "commit"
		sig.Type = "query"
	case replication.GTID_EVENT:
		ge, ok := ev.Event.(*replication.GTIDEvent)
		if !ok {
			return nil
		}
		uuid := bytesToUuid(ge.SID)
		gid, err := gtid.Parse(fmt.Sprintf("%s:%d", uuid, ge.GNO))
		if err != nil {
			// Keep the raw components visible instead of dropping the event.
			sig.Data = fmt.Sprintf("invalid-gtid:%s:%d", uuid, ge.GNO)
		} else {
			sig.Data = gid.String()
		}
		sig.Type = "gtid"
	case replication.TRANSACTION_PAYLOAD_EVENT:
		ge, ok := ev.Event.(*replication.TransactionPayloadEvent)
		if !ok {
			return nil
		}
		res := make([]BinlogEvent, 0, len(ge.Events))
		for _, val := range ge.Events {
			res = append(res, ParseBinlogEvent(val)...)
		}
		compressionType := getCompressionTypeName(ge.CompressionType)
		for idx := range res {
			res[idx].CompressionType = compressionType
		}
		return res
	}
	// Return a one-element slice backed by buf. Go keeps the backing array
	// alive for as long as the returned slice is referenced, so this is safe
	// regardless of how long the caller holds on to the result.
	return buf[:]
}

// populateRowsSig fills sig from the rows-event body carried by ev.
//
// kind is the value stored in sig.Type. rowsPerChange is the number of
// entries in the event's Rows that make up one counted row (1 for inserts
// and deletes, 2 for updates); RowCnt is len(Rows) divided by it.
//
// It reports false when ev.Event is not a non-nil *replication.RowsEvent, in
// which case sig is left untouched. A rows event without table metadata
// leaves DB and TB empty instead of panicking.
func populateRowsSig(sig *BinlogEvent, ev *replication.BinlogEvent, kind string, rowsPerChange uint32) bool {
	rows, ok := ev.Event.(*replication.RowsEvent)
	if !ok || rows == nil {
		return false
	}
	if tbl := rows.Table; tbl != nil {
		sig.DB = string(tbl.Schema)
		sig.TB = string(tbl.Table)
	}
	sig.Type = kind
	sig.RowCnt = uint32(len(rows.Rows)) / rowsPerChange
	sig.Rows = normalizeRowsByUnsigned(rows)
	sig.ColumnTypes = cloneColumnTypes(rows.Table)
	sig.ColumnCollationIDs = buildColumnCollationIDs(rows.Table)
	return true
}
// normalizeRowsByUnsigned rewrites, in place, the values of every column that
// the event's table metadata (Table.UnsignedMap) flags as unsigned, using
// normalizeUnsignedValue with the column's type from Table.ColumnType.
//
// It returns wrEvent.Rows (the same slice, mutated), or nil when wrEvent is
// nil. The rows are returned untouched when the event has no table metadata,
// no rows, no unsigned flags, or no column types. Columns whose index lies
// beyond Table.ColumnType are skipped.
func normalizeRowsByUnsigned(wrEvent *replication.RowsEvent) [][]interface{} {
	if wrEvent == nil {
		return nil
	}
	rows := wrEvent.Rows
	if wrEvent.Table == nil || len(rows) == 0 {
		return rows
	}
	unsigned := wrEvent.Table.UnsignedMap()
	if len(unsigned) == 0 {
		return rows
	}
	types := wrEvent.Table.ColumnType
	if len(types) == 0 {
		return rows
	}
	for _, row := range rows {
		for col, val := range row {
			if col < len(types) && unsigned[col] {
				row[col] = normalizeUnsignedValue(val, types[col])
			}
		}
	}
	return rows
}
// cloneColumnTypes returns a fresh []int holding the table's column type
// codes widened from table.ColumnType. It returns nil when table is nil or
// carries no column types.
func cloneColumnTypes(table *replication.TableMapEvent) []int {
	if table == nil {
		return nil
	}
	n := len(table.ColumnType)
	if n == 0 {
		return nil
	}
	types := make([]int, 0, n)
	for _, code := range table.ColumnType {
		types = append(types, int(code))
	}
	return types
}
// buildColumnCollationIDs returns one collation ID per column, indexed by
// column position.
//
// The column count is len(table.ColumnType), falling back to
// table.ColumnCount when no column types are present. IDs from
// table.CollationMap are applied first; IDs from table.EnumSetCollationMap
// only fill columns that are still zero. Map entries whose index falls
// outside the column range are ignored.
//
// It returns nil when table is nil, when there are no columns, or when every
// resulting ID is zero.
func buildColumnCollationIDs(table *replication.TableMapEvent) []uint64 {
	if table == nil {
		return nil
	}
	total := len(table.ColumnType)
	if total == 0 {
		total = int(table.ColumnCount)
	}
	if total == 0 {
		return nil
	}
	ids := make([]uint64, total)
	for col, id := range table.CollationMap() {
		if col >= 0 && col < total {
			ids[col] = id
		}
	}
	for col, id := range table.EnumSetCollationMap() {
		if col >= 0 && col < total && ids[col] == 0 {
			ids[col] = id
		}
	}
	// A non-zero in-range ID from either map always ends up in ids, so
	// scanning the result is equivalent to tracking a flag while merging.
	for _, id := range ids {
		if id != 0 {
			return ids
		}
	}
	return nil
}
// normalizeUnsignedValue reinterprets a signed integer value as the unsigned
// Go type that matches the given MySQL column type:
//
//	MYSQL_TYPE_TINY     -> uint8
//	MYSQL_TYPE_SHORT    -> uint16
//	MYSQL_TYPE_INT24    -> uint32, masked to the low 24 bits
//	MYSQL_TYPE_LONG     -> uint32
//	MYSQL_TYPE_LONGLONG -> uint64
//
// v is returned unchanged when it is not one of the signed integer types
// accepted by signedToInt64, or when colType is none of the types above.
func normalizeUnsignedValue(v interface{}, colType byte) interface{} {
	n, isSigned := signedToInt64(v)
	if !isSigned {
		return v
	}
	result := v
	switch colType {
	case mysql.MYSQL_TYPE_TINY:
		result = uint8(n)
	case mysql.MYSQL_TYPE_SHORT:
		result = uint16(n)
	case mysql.MYSQL_TYPE_INT24:
		// Drop the sign-extension bits above the 24-bit payload.
		result = uint32(int32(n)) & 0x00FFFFFF
	case mysql.MYSQL_TYPE_LONG:
		result = uint32(n)
	case mysql.MYSQL_TYPE_LONGLONG:
		result = uint64(n)
	}
	return result
}
// signedToInt64 widens v to int64 when its dynamic type is exactly int8,
// int16, int32, int64 or int. The boolean result is false (with a zero value)
// for every other dynamic type, including nil and unsigned integers.
func signedToInt64(v interface{}) (int64, bool) {
	var widened int64
	switch n := v.(type) {
	case int:
		widened = int64(n)
	case int8:
		widened = int64(n)
	case int16:
		widened = int64(n)
	case int32:
		widened = int64(n)
	case int64:
		widened = n
	default:
		return 0, false
	}
	return widened, true
}
func getCompressionTypeName(code uint64) string {
switch code {
case CompressionZSTD:
return "ZSTD"
case CompressionNone:
return ""
default:
return fmt.Sprintf("UNKNOWN(%d)", code)
}
}