package binlog

import (
	"fmt"

	"b612.me/mysql/gtid"
	"github.com/starainrt/go-mysql/mysql"
	"github.com/starainrt/go-mysql/replication"
)

// ParseBinlogEvent converts one replication event into zero or more
// BinlogEvent records.
//
// Most event types yield exactly one record. A TRANSACTION_PAYLOAD_EVENT is
// expanded recursively into the records of its embedded events, each tagged
// with the name of the payload's compression type. Event types without a
// dedicated case yield a single zero-valued record (empty Type).
//
// nil is returned when ev or its header is nil, or when ev.Event does not have
// the concrete type implied by the header's event type.
func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent {
	if ev == nil || ev.Header == nil {
		return nil
	}

	// buf backs the common single-record result. It is declared per call, so
	// slices returned by different calls never share storage.
	var buf [1]BinlogEvent
	sig := &buf[0]

	switch ev.Header.EventType {
	case replication.ANONYMOUS_GTID_EVENT:
		sig.Data = "anonymous-gtid-event:1"
		sig.Type = "gtid"

	case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
		if !fillFromRowsEvent(sig, ev, "insert", 1) {
			return nil
		}

	case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
		// Update events store two Rows entries (before and after image) per
		// changed row, hence the divisor of 2 for the row count.
		if !fillFromRowsEvent(sig, ev, "update", 2) {
			return nil
		}

	case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
		if !fillFromRowsEvent(sig, ev, "delete", 1) {
			return nil
		}

	case replication.TABLE_MAP_EVENT:
		tableEvent, ok := ev.Event.(*replication.TableMapEvent)
		if !ok {
			return nil
		}
		sig.DB = string(tableEvent.Schema)
		sig.TB = string(tableEvent.Table)
		sig.Type = "tablemap"

	case replication.ROWS_QUERY_EVENT:
		queryEvent, ok := ev.Event.(*replication.RowsQueryEvent)
		if !ok {
			return nil
		}
		sig.Data = string(queryEvent.Query)
		sig.Type = "rowsquery"

	case replication.QUERY_EVENT:
		queryEvent, ok := ev.Event.(*replication.QueryEvent)
		if !ok {
			return nil
		}
		sig.DB = string(queryEvent.Schema)
		sig.Data = string(queryEvent.Query)
		sig.Type = "query"

	case replication.MARIADB_GTID_EVENT:
		// Reported as a synthetic "begin" query record.
		sig.Data = "begin"
		sig.Type = "query"

	case replication.XID_EVENT:
		// Reported as a synthetic "commit" query record.
		sig.Data = "commit"
		sig.Type = "query"

	case replication.GTID_EVENT:
		ge, ok := ev.Event.(*replication.GTIDEvent)
		if !ok {
			return nil
		}
		// Build the textual GTID once; it is both the parser input and the
		// payload of the fallback value when parsing fails.
		raw := fmt.Sprintf("%s:%d", bytesToUuid(ge.SID), ge.GNO)
		if gid, err := gtid.Parse(raw); err != nil {
			sig.Data = "invalid-gtid:" + raw
		} else {
			sig.Data = gid.String()
		}
		sig.Type = "gtid"

	case replication.TRANSACTION_PAYLOAD_EVENT:
		payload, ok := ev.Event.(*replication.TransactionPayloadEvent)
		if !ok {
			return nil
		}
		res := make([]BinlogEvent, 0, len(payload.Events))
		for _, inner := range payload.Events {
			res = append(res, ParseBinlogEvent(inner)...)
		}
		compressionType := getCompressionTypeName(payload.CompressionType)
		for idx := range res {
			res[idx].CompressionType = compressionType
		}
		return res
	}

	return buf[:]
}

// fillFromRowsEvent populates sig from the rows event carried by ev.
//
// kind is stored as sig.Type. rowsPerChange is the number of Rows entries that
// make up one changed row and is used as the divisor for sig.RowCnt.
//
// It reports false, leaving sig untouched, when ev.Event is not a non-nil
// *replication.RowsEvent. A rows event without table metadata is still
// accepted: DB and TB stay empty and the column metadata slices stay nil.
func fillFromRowsEvent(sig *BinlogEvent, ev *replication.BinlogEvent, kind string, rowsPerChange uint32) bool {
	rowsEvent, ok := ev.Event.(*replication.RowsEvent)
	if !ok || rowsEvent == nil {
		return false
	}
	if table := rowsEvent.Table; table != nil {
		sig.DB = string(table.Schema)
		sig.TB = string(table.Table)
	}
	sig.Type = kind
	sig.RowCnt = uint32(len(rowsEvent.Rows)) / rowsPerChange
	sig.Rows = normalizeRowsByUnsigned(rowsEvent)
	sig.ColumnTypes = cloneColumnTypes(rowsEvent.Table)
	sig.ColumnCollationIDs = buildColumnCollationIDs(rowsEvent.Table)
	return true
}

// normalizeRowsByUnsigned rewrites, in place, the values of columns that the
// table map marks as unsigned so that they carry unsigned Go integer types,
// and returns wrEvent.Rows.
//
// It returns nil for a nil event. Rows are returned unmodified when the event
// has no table metadata, no rows, no unsigned columns or no column types.
func normalizeRowsByUnsigned(wrEvent *replication.RowsEvent) [][]interface{} {
	if wrEvent == nil {
		return nil
	}
	if wrEvent.Table == nil || len(wrEvent.Rows) == 0 {
		return wrEvent.Rows
	}

	unsignedMap := wrEvent.Table.UnsignedMap()
	columnTypes := wrEvent.Table.ColumnType
	if len(unsignedMap) == 0 || len(columnTypes) == 0 {
		return wrEvent.Rows
	}

	for _, row := range wrEvent.Rows {
		for colIdx := range row {
			// Columns beyond the known type list cannot be classified.
			if colIdx >= len(columnTypes) {
				break
			}
			if unsignedMap[colIdx] {
				row[colIdx] = normalizeUnsignedValue(row[colIdx], columnTypes[colIdx])
			}
		}
	}
	return wrEvent.Rows
}

// cloneColumnTypes returns the table's column type codes as a freshly
// allocated []int, or nil when table is nil or has no column types.
func cloneColumnTypes(table *replication.TableMapEvent) []int {
	if table == nil || len(table.ColumnType) == 0 {
		return nil
	}
	ret := make([]int, len(table.ColumnType))
	for i, t := range table.ColumnType {
		ret[i] = int(t)
	}
	return ret
}

// buildColumnCollationIDs returns one collation ID per column, indexed by
// column position; columns without a known collation hold 0.
//
// IDs from table.CollationMap() take precedence; table.EnumSetCollationMap()
// only fills positions that are still 0. The column count comes from
// len(table.ColumnType), falling back to table.ColumnCount. nil is returned
// when table is nil, the column count is 0, or no non-zero ID was seen.
func buildColumnCollationIDs(table *replication.TableMapEvent) []uint64 {
	if table == nil {
		return nil
	}

	columnCount := len(table.ColumnType)
	if columnCount == 0 && table.ColumnCount > 0 {
		columnCount = int(table.ColumnCount)
	}
	if columnCount == 0 {
		return nil
	}

	ret := make([]uint64, columnCount)
	hasValue := false

	for idx, collationID := range table.CollationMap() {
		if idx < 0 || idx >= columnCount {
			continue
		}
		ret[idx] = collationID
		hasValue = hasValue || collationID != 0
	}

	for idx, collationID := range table.EnumSetCollationMap() {
		if idx < 0 || idx >= columnCount {
			continue
		}
		if ret[idx] == 0 {
			ret[idx] = collationID
		}
		hasValue = hasValue || collationID != 0
	}

	if !hasValue {
		return nil
	}
	return ret
}

// normalizeUnsignedValue converts a signed Go integer v to the unsigned Go
// type that matches colType: uint8 for TINY, uint16 for SHORT, uint32 for
// INT24 (masked to the low 24 bits) and LONG, uint64 for LONGLONG.
//
// v is returned unchanged when it is not a signed integer or when colType is
// not one of those integer column types.
func normalizeUnsignedValue(v interface{}, colType byte) interface{} {
	signed, ok := signedToInt64(v)
	if !ok {
		return v
	}
	switch colType {
	case mysql.MYSQL_TYPE_TINY:
		return uint8(signed)
	case mysql.MYSQL_TYPE_SHORT:
		return uint16(signed)
	case mysql.MYSQL_TYPE_INT24:
		// Keep only the low 24 bits of the truncated value.
		return uint32(signed) & 0x00FFFFFF
	case mysql.MYSQL_TYPE_LONG:
		return uint32(signed)
	case mysql.MYSQL_TYPE_LONGLONG:
		return uint64(signed)
	default:
		return v
	}
}

// signedToInt64 widens v to int64 when its dynamic type is int8, int16, int32,
// int64 or int. The boolean result is false for every other type.
func signedToInt64(v interface{}) (int64, bool) {
	switch x := v.(type) {
	case int8:
		return int64(x), true
	case int16:
		return int64(x), true
	case int32:
		return int64(x), true
	case int64:
		return x, true
	case int:
		return int64(x), true
	default:
		return 0, false
	}
}

// getCompressionTypeName maps a transaction payload compression code to its
// display name: "ZSTD" for CompressionZSTD, the empty string for
// CompressionNone, and "UNKNOWN(<code>)" for any other value.
func getCompressionTypeName(code uint64) string {
	switch code {
	case CompressionZSTD:
		return "ZSTD"
	case CompressionNone:
		return ""
	default:
		return fmt.Sprintf("UNKNOWN(%d)", code)
	}
}