mysqlbinlog/parse_event_convert.go

258 lines
6.0 KiB
Go
Raw Permalink Normal View History

package binlog
import (
"fmt"
"b612.me/mysql/gtid"
"github.com/starainrt/go-mysql/mysql"
"github.com/starainrt/go-mysql/replication"
)
// ParseBinlogEvent converts one decoded replication event into zero or more
// BinlogEvent records.
//
// The mapping is selected by ev.Header.EventType:
//   - row events (write/update/delete) produce one record with Type
//     "insert", "update" or "delete" plus the row payload and column metadata;
//   - table-map, rows-query and query events copy their schema/table/query
//     strings;
//   - MariaDB GTID and XID events are reported as "begin" / "commit" queries;
//   - GTID events carry the formatted GTID (or an "invalid-gtid:" prefixed
//     string when gtid.Parse rejects it);
//   - transaction payload events are expanded recursively, and every resulting
//     record is tagged with the payload's compression type name.
//
// Any other event type yields a single zero-valued BinlogEvent.
//
// It returns nil when ev or its header is nil, or when ev.Event does not have
// the concrete type expected for the header's event type.
func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent {
	if ev == nil || ev.Header == nil {
		return nil
	}

	var buf [1]BinlogEvent
	sig := &buf[0]

	switch ev.Header.EventType {
	case replication.ANONYMOUS_GTID_EVENT:
		sig.Data = "anonymous-gtid-event:1"
		sig.Type = "gtid"

	case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
		if !fillRowsEventFields(sig, ev, "insert", 1) {
			return nil
		}

	case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
		// Update events contribute two Rows entries per changed row
		// (presumably the before and after images), hence the divisor of 2.
		if !fillRowsEventFields(sig, ev, "update", 2) {
			return nil
		}

	case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
		if !fillRowsEventFields(sig, ev, "delete", 1) {
			return nil
		}

	case replication.TABLE_MAP_EVENT:
		tableEvent, ok := ev.Event.(*replication.TableMapEvent)
		if !ok {
			return nil
		}
		sig.DB = string(tableEvent.Schema)
		sig.TB = string(tableEvent.Table)
		sig.Type = "tablemap"

	case replication.ROWS_QUERY_EVENT:
		queryEvent, ok := ev.Event.(*replication.RowsQueryEvent)
		if !ok {
			return nil
		}
		sig.Data = string(queryEvent.Query)
		sig.Type = "rowsquery"

	case replication.QUERY_EVENT:
		queryEvent, ok := ev.Event.(*replication.QueryEvent)
		if !ok {
			return nil
		}
		sig.DB = string(queryEvent.Schema)
		sig.Data = string(queryEvent.Query)
		sig.Type = "query"

	case replication.MARIADB_GTID_EVENT:
		sig.Data = "begin"
		sig.Type = "query"

	case replication.XID_EVENT:
		sig.Data = "commit"
		sig.Type = "query"

	case replication.GTID_EVENT:
		ge, ok := ev.Event.(*replication.GTIDEvent)
		if !ok {
			return nil
		}
		// Format once; the same text is used for parsing and for the
		// fallback value when parsing fails.
		raw := fmt.Sprintf("%s:%d", bytesToUuid(ge.SID), ge.GNO)
		if gid, err := gtid.Parse(raw); err != nil {
			sig.Data = "invalid-gtid:" + raw
		} else {
			sig.Data = gid.String()
		}
		sig.Type = "gtid"

	case replication.TRANSACTION_PAYLOAD_EVENT:
		ge, ok := ev.Event.(*replication.TransactionPayloadEvent)
		if !ok {
			return nil
		}
		res := make([]BinlogEvent, 0, len(ge.Events))
		for _, val := range ge.Events {
			res = append(res, ParseBinlogEvent(val)...)
		}
		compressionType := getCompressionTypeName(ge.CompressionType)
		for idx := range res {
			res[idx].CompressionType = compressionType
		}
		return res
	}

	// The returned slice refers to buf. Go keeps the array alive for as long
	// as the slice is reachable, so handing it to the caller is safe.
	return buf[:]
}

// fillRowsEventFields populates sig from the rows event carried by ev.
//
// kind is stored as sig.Type, and imagesPerRow is the number of Rows entries
// that make up one changed row (used as the divisor for sig.RowCnt). It
// reports false, leaving sig untouched, when ev.Event is not a non-nil
// *replication.RowsEvent. A missing table map leaves DB and TB empty rather
// than panicking; the helpers called below already accept a nil table.
func fillRowsEventFields(sig *BinlogEvent, ev *replication.BinlogEvent, kind string, imagesPerRow uint32) bool {
	rowsEvent, ok := ev.Event.(*replication.RowsEvent)
	if !ok || rowsEvent == nil {
		return false
	}
	if rowsEvent.Table != nil {
		sig.DB = string(rowsEvent.Table.Schema)
		sig.TB = string(rowsEvent.Table.Table)
	}
	sig.Type = kind
	sig.RowCnt = uint32(len(rowsEvent.Rows)) / imagesPerRow
	sig.Rows = normalizeRowsByUnsigned(rowsEvent)
	sig.ColumnTypes = cloneColumnTypes(rowsEvent.Table)
	sig.ColumnCollationIDs = buildColumnCollationIDs(rowsEvent.Table)
	return true
}
// normalizeRowsByUnsigned rewrites, in place, the values of columns that the
// table map marks as unsigned, and returns the event's Rows slice.
//
// It returns nil for a nil event. The rows are returned unchanged when the
// event has no table map, no rows, no unsigned columns, or no column types.
// Columns whose index lies beyond the column-type list are left untouched.
func normalizeRowsByUnsigned(wrEvent *replication.RowsEvent) [][]interface{} {
	if wrEvent == nil {
		return nil
	}
	rows := wrEvent.Rows
	if wrEvent.Table == nil || len(rows) == 0 {
		return rows
	}

	unsignedCols := wrEvent.Table.UnsignedMap()
	types := wrEvent.Table.ColumnType
	if len(unsignedCols) == 0 || len(types) == 0 {
		return rows
	}

	for _, row := range rows {
		for col, val := range row {
			if col < len(types) && unsignedCols[col] {
				row[col] = normalizeUnsignedValue(val, types[col])
			}
		}
	}
	return rows
}
// cloneColumnTypes returns the table's column type codes widened to int, in a
// freshly allocated slice. It returns nil when table is nil or lists no
// column types.
func cloneColumnTypes(table *replication.TableMapEvent) []int {
	if table == nil {
		return nil
	}
	count := len(table.ColumnType)
	if count == 0 {
		return nil
	}
	out := make([]int, 0, count)
	for _, typ := range table.ColumnType {
		out = append(out, int(typ))
	}
	return out
}
// buildColumnCollationIDs returns one collation ID per column, indexed by
// column position, or nil when nothing useful is available.
//
// The column count is len(table.ColumnType), falling back to
// table.ColumnCount when that list is empty. Values from CollationMap are
// applied first; values from EnumSetCollationMap only fill slots that are
// still zero. Indexes outside the column range are ignored. The result is nil
// when table is nil, when there are no columns, or when every ID seen in
// range is zero.
func buildColumnCollationIDs(table *replication.TableMapEvent) []uint64 {
	if table == nil {
		return nil
	}

	size := len(table.ColumnType)
	if size == 0 {
		if table.ColumnCount > 0 {
			size = int(table.ColumnCount)
		}
		if size == 0 {
			return nil
		}
	}

	ids := make([]uint64, size)
	sawNonZero := false

	for col, id := range table.CollationMap() {
		if col >= 0 && col < size {
			ids[col] = id
			if id != 0 {
				sawNonZero = true
			}
		}
	}

	for col, id := range table.EnumSetCollationMap() {
		if col < 0 || col >= size {
			continue
		}
		if ids[col] == 0 {
			ids[col] = id
		}
		if id != 0 {
			sawNonZero = true
		}
	}

	if sawNonZero {
		return ids
	}
	return nil
}
// normalizeUnsignedValue reinterprets a signed integer value as the unsigned
// Go type matching colType: uint8 for TINY, uint16 for SHORT, uint32 for
// INT24 (masked to the low 24 bits) and LONG, uint64 for LONGLONG.
//
// v is returned unchanged when it is not one of the signed integer types
// accepted by signedToInt64, or when colType is not one of those column types.
func normalizeUnsignedValue(v interface{}, colType byte) interface{} {
	if n, isSigned := signedToInt64(v); isSigned {
		switch colType {
		case mysql.MYSQL_TYPE_LONGLONG:
			return uint64(n)
		case mysql.MYSQL_TYPE_LONG:
			return uint32(n)
		case mysql.MYSQL_TYPE_INT24:
			// Keep only the low 24 bits so the sign extension is discarded.
			return uint32(n) & 0x00FFFFFF
		case mysql.MYSQL_TYPE_SHORT:
			return uint16(n)
		case mysql.MYSQL_TYPE_TINY:
			return uint8(n)
		}
	}
	return v
}
// signedToInt64 widens v to int64 when its dynamic type is one of the built-in
// signed integer types (int, int8, int16, int32, int64). The boolean result
// reports whether the conversion applied; for any other value, including nil,
// it returns (0, false).
func signedToInt64(v interface{}) (int64, bool) {
	var widened int64
	switch n := v.(type) {
	case int:
		widened = int64(n)
	case int64:
		widened = n
	case int32:
		widened = int64(n)
	case int16:
		widened = int64(n)
	case int8:
		widened = int64(n)
	default:
		return 0, false
	}
	return widened, true
}
// getCompressionTypeName maps a compression type code to its display name:
// "ZSTD" for CompressionZSTD, the empty string for CompressionNone, and
// "UNKNOWN(<code>)" for any other value.
func getCompressionTypeName(code uint64) string {
	if code == CompressionZSTD {
		return "ZSTD"
	}
	if code == CompressionNone {
		return ""
	}
	return fmt.Sprintf("UNKNOWN(%d)", code)
}