- 将臃肿的 parse.go 按职责拆分为多个模块:
parse_types.go、parse_io.go、parse_event_convert.go、parse_stream.go、parse_filter.go
- parse.go 保留为模块入口说明,提升可维护性与可读性
- 修复事务状态被覆盖问题(BEGIN/COMMIT/ROLLBACK 不再被重置为 PREPARE)
- 增加 include-tables 与 exclude-tables 互斥校验,同时配置时直接报配置错误
- 强化表匹配器模式校验,并补充非法模式测试
- 在明细过滤后重算事务统计(RowsCount/StartPos/EndPos/Size),避免统计失真
- 增加 TABLE_MAP 事件转换,补充列元信息透传(ColumnTypes/ColumnCollationIDs)
- 基于 unsigned 元数据规范化行值,避免无符号整型被渲染为负数
- 优化事件解析报错信息:增加有界 body 十六进制预览
- 补充单元测试:payload/tablemap 转换、unsigned 规范化、过滤逻辑、IO 预览
258 lines
6.0 KiB
Go
258 lines
6.0 KiB
Go
package binlog
|
|
|
|
import (
|
|
"fmt"
|
|
|
|
"b612.me/mysql/gtid"
|
|
"github.com/starainrt/go-mysql/mysql"
|
|
"github.com/starainrt/go-mysql/replication"
|
|
)
|
|
|
|
func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent {
|
|
var buf [1]BinlogEvent
|
|
sig := &buf[0]
|
|
|
|
switch ev.Header.EventType {
|
|
case replication.ANONYMOUS_GTID_EVENT:
|
|
sig.Data = "anonymous-gtid-event:1"
|
|
sig.Type = "gtid"
|
|
|
|
case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
|
|
wrEvent, ok := ev.Event.(*replication.RowsEvent)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
sig.DB = string(wrEvent.Table.Schema)
|
|
sig.TB = string(wrEvent.Table.Table)
|
|
sig.Type = "insert"
|
|
sig.RowCnt = uint32(len(wrEvent.Rows))
|
|
sig.Rows = normalizeRowsByUnsigned(wrEvent)
|
|
sig.ColumnTypes = cloneColumnTypes(wrEvent.Table)
|
|
sig.ColumnCollationIDs = buildColumnCollationIDs(wrEvent.Table)
|
|
|
|
case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
|
|
wrEvent, ok := ev.Event.(*replication.RowsEvent)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
sig.DB = string(wrEvent.Table.Schema)
|
|
sig.TB = string(wrEvent.Table.Table)
|
|
sig.Type = "update"
|
|
sig.RowCnt = uint32(len(wrEvent.Rows)) / 2
|
|
sig.Rows = normalizeRowsByUnsigned(wrEvent)
|
|
sig.ColumnTypes = cloneColumnTypes(wrEvent.Table)
|
|
sig.ColumnCollationIDs = buildColumnCollationIDs(wrEvent.Table)
|
|
|
|
case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
|
|
wrEvent, ok := ev.Event.(*replication.RowsEvent)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
sig.DB = string(wrEvent.Table.Schema)
|
|
sig.TB = string(wrEvent.Table.Table)
|
|
sig.Type = "delete"
|
|
sig.RowCnt = uint32(len(wrEvent.Rows))
|
|
sig.Rows = normalizeRowsByUnsigned(wrEvent)
|
|
sig.ColumnTypes = cloneColumnTypes(wrEvent.Table)
|
|
sig.ColumnCollationIDs = buildColumnCollationIDs(wrEvent.Table)
|
|
|
|
case replication.TABLE_MAP_EVENT:
|
|
tableEvent, ok := ev.Event.(*replication.TableMapEvent)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
sig.DB = string(tableEvent.Schema)
|
|
sig.TB = string(tableEvent.Table)
|
|
sig.Type = "tablemap"
|
|
|
|
case replication.ROWS_QUERY_EVENT:
|
|
queryEvent, ok := ev.Event.(*replication.RowsQueryEvent)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
sig.Data = string(queryEvent.Query)
|
|
sig.Type = "rowsquery"
|
|
|
|
case replication.QUERY_EVENT:
|
|
queryEvent, ok := ev.Event.(*replication.QueryEvent)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
sig.DB = string(queryEvent.Schema)
|
|
sig.Data = string(queryEvent.Query)
|
|
sig.Type = "query"
|
|
|
|
case replication.MARIADB_GTID_EVENT:
|
|
sig.Data = "begin"
|
|
sig.Type = "query"
|
|
|
|
case replication.XID_EVENT:
|
|
sig.Data = "commit"
|
|
sig.Type = "query"
|
|
|
|
case replication.GTID_EVENT:
|
|
ge, ok := ev.Event.(*replication.GTIDEvent)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
gid, err := gtid.Parse(fmt.Sprintf("%s:%d", bytesToUuid(ge.SID), ge.GNO))
|
|
if err != nil {
|
|
sig.Data = fmt.Sprintf("invalid-gtid:%s:%d", bytesToUuid(ge.SID), ge.GNO)
|
|
} else {
|
|
sig.Data = gid.String()
|
|
}
|
|
sig.Type = "gtid"
|
|
|
|
case replication.TRANSACTION_PAYLOAD_EVENT:
|
|
ge, ok := ev.Event.(*replication.TransactionPayloadEvent)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
res := make([]BinlogEvent, 0, len(ge.Events))
|
|
for _, val := range ge.Events {
|
|
res = append(res, ParseBinlogEvent(val)...)
|
|
}
|
|
compressionType := getCompressionTypeName(ge.CompressionType)
|
|
for idx := range res {
|
|
res[idx].CompressionType = compressionType
|
|
}
|
|
return res
|
|
}
|
|
|
|
// 返回栈上数组的切片。调用方在当前迭代内立即消费,不持有跨迭代引用,安全。
|
|
return buf[:]
|
|
}
|
|
|
|
func normalizeRowsByUnsigned(wrEvent *replication.RowsEvent) [][]interface{} {
|
|
if wrEvent == nil || wrEvent.Table == nil || len(wrEvent.Rows) == 0 {
|
|
if wrEvent == nil {
|
|
return nil
|
|
}
|
|
return wrEvent.Rows
|
|
}
|
|
|
|
unsignedMap := wrEvent.Table.UnsignedMap()
|
|
if len(unsignedMap) == 0 {
|
|
return wrEvent.Rows
|
|
}
|
|
|
|
columnTypes := wrEvent.Table.ColumnType
|
|
if len(columnTypes) == 0 {
|
|
return wrEvent.Rows
|
|
}
|
|
|
|
for rowIdx := range wrEvent.Rows {
|
|
row := wrEvent.Rows[rowIdx]
|
|
for colIdx := range row {
|
|
if !unsignedMap[colIdx] {
|
|
continue
|
|
}
|
|
if colIdx >= len(columnTypes) {
|
|
continue
|
|
}
|
|
row[colIdx] = normalizeUnsignedValue(row[colIdx], columnTypes[colIdx])
|
|
}
|
|
}
|
|
return wrEvent.Rows
|
|
}
|
|
|
|
func cloneColumnTypes(table *replication.TableMapEvent) []int {
|
|
if table == nil || len(table.ColumnType) == 0 {
|
|
return nil
|
|
}
|
|
ret := make([]int, len(table.ColumnType))
|
|
for i, t := range table.ColumnType {
|
|
ret[i] = int(t)
|
|
}
|
|
return ret
|
|
}
|
|
|
|
func buildColumnCollationIDs(table *replication.TableMapEvent) []uint64 {
|
|
if table == nil {
|
|
return nil
|
|
}
|
|
|
|
columnCount := len(table.ColumnType)
|
|
if columnCount == 0 && table.ColumnCount > 0 {
|
|
columnCount = int(table.ColumnCount)
|
|
}
|
|
if columnCount == 0 {
|
|
return nil
|
|
}
|
|
|
|
ret := make([]uint64, columnCount)
|
|
hasValue := false
|
|
|
|
for idx, collationID := range table.CollationMap() {
|
|
if idx < 0 || idx >= columnCount {
|
|
continue
|
|
}
|
|
ret[idx] = collationID
|
|
hasValue = hasValue || collationID != 0
|
|
}
|
|
for idx, collationID := range table.EnumSetCollationMap() {
|
|
if idx < 0 || idx >= columnCount {
|
|
continue
|
|
}
|
|
if ret[idx] == 0 {
|
|
ret[idx] = collationID
|
|
}
|
|
hasValue = hasValue || collationID != 0
|
|
}
|
|
|
|
if !hasValue {
|
|
return nil
|
|
}
|
|
return ret
|
|
}
|
|
|
|
func normalizeUnsignedValue(v interface{}, colType byte) interface{} {
|
|
signed, ok := signedToInt64(v)
|
|
if !ok {
|
|
return v
|
|
}
|
|
|
|
switch colType {
|
|
case mysql.MYSQL_TYPE_TINY:
|
|
return uint8(signed)
|
|
case mysql.MYSQL_TYPE_SHORT:
|
|
return uint16(signed)
|
|
case mysql.MYSQL_TYPE_INT24:
|
|
return uint32(uint32(int32(signed)) & 0x00FFFFFF)
|
|
case mysql.MYSQL_TYPE_LONG:
|
|
return uint32(signed)
|
|
case mysql.MYSQL_TYPE_LONGLONG:
|
|
return uint64(signed)
|
|
default:
|
|
return v
|
|
}
|
|
}
|
|
|
|
// signedToInt64 widens v to int64 when its dynamic type is exactly one of
// int, int8, int16, int32 or int64, reporting true. For any other dynamic
// type (including nil and unsigned integers) it returns 0 and false.
func signedToInt64(v interface{}) (int64, bool) {
	var widened int64

	switch val := v.(type) {
	case int:
		widened = int64(val)
	case int8:
		widened = int64(val)
	case int16:
		widened = int64(val)
	case int32:
		widened = int64(val)
	case int64:
		widened = val
	default:
		return 0, false
	}

	return widened, true
}
|
|
|
|
func getCompressionTypeName(code uint64) string {
|
|
switch code {
|
|
case CompressionZSTD:
|
|
return "ZSTD"
|
|
case CompressionNone:
|
|
return ""
|
|
default:
|
|
return fmt.Sprintf("UNKNOWN(%d)", code)
|
|
}
|
|
}
|