258 lines
6.0 KiB
Go
258 lines
6.0 KiB
Go
|
|
package binlog
|
||
|
|
|
||
|
|
import (
|
||
|
|
"fmt"
|
||
|
|
|
||
|
|
"b612.me/mysql/gtid"
|
||
|
|
"github.com/starainrt/go-mysql/mysql"
|
||
|
|
"github.com/starainrt/go-mysql/replication"
|
||
|
|
)
|
||
|
|
|
||
|
|
func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent {
|
||
|
|
var buf [1]BinlogEvent
|
||
|
|
sig := &buf[0]
|
||
|
|
|
||
|
|
switch ev.Header.EventType {
|
||
|
|
case replication.ANONYMOUS_GTID_EVENT:
|
||
|
|
sig.Data = "anonymous-gtid-event:1"
|
||
|
|
sig.Type = "gtid"
|
||
|
|
|
||
|
|
case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
|
||
|
|
wrEvent, ok := ev.Event.(*replication.RowsEvent)
|
||
|
|
if !ok {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
sig.DB = string(wrEvent.Table.Schema)
|
||
|
|
sig.TB = string(wrEvent.Table.Table)
|
||
|
|
sig.Type = "insert"
|
||
|
|
sig.RowCnt = uint32(len(wrEvent.Rows))
|
||
|
|
sig.Rows = normalizeRowsByUnsigned(wrEvent)
|
||
|
|
sig.ColumnTypes = cloneColumnTypes(wrEvent.Table)
|
||
|
|
sig.ColumnCollationIDs = buildColumnCollationIDs(wrEvent.Table)
|
||
|
|
|
||
|
|
case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
|
||
|
|
wrEvent, ok := ev.Event.(*replication.RowsEvent)
|
||
|
|
if !ok {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
sig.DB = string(wrEvent.Table.Schema)
|
||
|
|
sig.TB = string(wrEvent.Table.Table)
|
||
|
|
sig.Type = "update"
|
||
|
|
sig.RowCnt = uint32(len(wrEvent.Rows)) / 2
|
||
|
|
sig.Rows = normalizeRowsByUnsigned(wrEvent)
|
||
|
|
sig.ColumnTypes = cloneColumnTypes(wrEvent.Table)
|
||
|
|
sig.ColumnCollationIDs = buildColumnCollationIDs(wrEvent.Table)
|
||
|
|
|
||
|
|
case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
|
||
|
|
wrEvent, ok := ev.Event.(*replication.RowsEvent)
|
||
|
|
if !ok {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
sig.DB = string(wrEvent.Table.Schema)
|
||
|
|
sig.TB = string(wrEvent.Table.Table)
|
||
|
|
sig.Type = "delete"
|
||
|
|
sig.RowCnt = uint32(len(wrEvent.Rows))
|
||
|
|
sig.Rows = normalizeRowsByUnsigned(wrEvent)
|
||
|
|
sig.ColumnTypes = cloneColumnTypes(wrEvent.Table)
|
||
|
|
sig.ColumnCollationIDs = buildColumnCollationIDs(wrEvent.Table)
|
||
|
|
|
||
|
|
case replication.TABLE_MAP_EVENT:
|
||
|
|
tableEvent, ok := ev.Event.(*replication.TableMapEvent)
|
||
|
|
if !ok {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
sig.DB = string(tableEvent.Schema)
|
||
|
|
sig.TB = string(tableEvent.Table)
|
||
|
|
sig.Type = "tablemap"
|
||
|
|
|
||
|
|
case replication.ROWS_QUERY_EVENT:
|
||
|
|
queryEvent, ok := ev.Event.(*replication.RowsQueryEvent)
|
||
|
|
if !ok {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
sig.Data = string(queryEvent.Query)
|
||
|
|
sig.Type = "rowsquery"
|
||
|
|
|
||
|
|
case replication.QUERY_EVENT:
|
||
|
|
queryEvent, ok := ev.Event.(*replication.QueryEvent)
|
||
|
|
if !ok {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
sig.DB = string(queryEvent.Schema)
|
||
|
|
sig.Data = string(queryEvent.Query)
|
||
|
|
sig.Type = "query"
|
||
|
|
|
||
|
|
case replication.MARIADB_GTID_EVENT:
|
||
|
|
sig.Data = "begin"
|
||
|
|
sig.Type = "query"
|
||
|
|
|
||
|
|
case replication.XID_EVENT:
|
||
|
|
sig.Data = "commit"
|
||
|
|
sig.Type = "query"
|
||
|
|
|
||
|
|
case replication.GTID_EVENT:
|
||
|
|
ge, ok := ev.Event.(*replication.GTIDEvent)
|
||
|
|
if !ok {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
gid, err := gtid.Parse(fmt.Sprintf("%s:%d", bytesToUuid(ge.SID), ge.GNO))
|
||
|
|
if err != nil {
|
||
|
|
sig.Data = fmt.Sprintf("invalid-gtid:%s:%d", bytesToUuid(ge.SID), ge.GNO)
|
||
|
|
} else {
|
||
|
|
sig.Data = gid.String()
|
||
|
|
}
|
||
|
|
sig.Type = "gtid"
|
||
|
|
|
||
|
|
case replication.TRANSACTION_PAYLOAD_EVENT:
|
||
|
|
ge, ok := ev.Event.(*replication.TransactionPayloadEvent)
|
||
|
|
if !ok {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
res := make([]BinlogEvent, 0, len(ge.Events))
|
||
|
|
for _, val := range ge.Events {
|
||
|
|
res = append(res, ParseBinlogEvent(val)...)
|
||
|
|
}
|
||
|
|
compressionType := getCompressionTypeName(ge.CompressionType)
|
||
|
|
for idx := range res {
|
||
|
|
res[idx].CompressionType = compressionType
|
||
|
|
}
|
||
|
|
return res
|
||
|
|
}
|
||
|
|
|
||
|
|
// 返回栈上数组的切片。调用方在当前迭代内立即消费,不持有跨迭代引用,安全。
|
||
|
|
return buf[:]
|
||
|
|
}
|
||
|
|
|
||
|
|
func normalizeRowsByUnsigned(wrEvent *replication.RowsEvent) [][]interface{} {
|
||
|
|
if wrEvent == nil || wrEvent.Table == nil || len(wrEvent.Rows) == 0 {
|
||
|
|
if wrEvent == nil {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
return wrEvent.Rows
|
||
|
|
}
|
||
|
|
|
||
|
|
unsignedMap := wrEvent.Table.UnsignedMap()
|
||
|
|
if len(unsignedMap) == 0 {
|
||
|
|
return wrEvent.Rows
|
||
|
|
}
|
||
|
|
|
||
|
|
columnTypes := wrEvent.Table.ColumnType
|
||
|
|
if len(columnTypes) == 0 {
|
||
|
|
return wrEvent.Rows
|
||
|
|
}
|
||
|
|
|
||
|
|
for rowIdx := range wrEvent.Rows {
|
||
|
|
row := wrEvent.Rows[rowIdx]
|
||
|
|
for colIdx := range row {
|
||
|
|
if !unsignedMap[colIdx] {
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
if colIdx >= len(columnTypes) {
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
row[colIdx] = normalizeUnsignedValue(row[colIdx], columnTypes[colIdx])
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return wrEvent.Rows
|
||
|
|
}
|
||
|
|
|
||
|
|
func cloneColumnTypes(table *replication.TableMapEvent) []int {
|
||
|
|
if table == nil || len(table.ColumnType) == 0 {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
ret := make([]int, len(table.ColumnType))
|
||
|
|
for i, t := range table.ColumnType {
|
||
|
|
ret[i] = int(t)
|
||
|
|
}
|
||
|
|
return ret
|
||
|
|
}
|
||
|
|
|
||
|
|
func buildColumnCollationIDs(table *replication.TableMapEvent) []uint64 {
|
||
|
|
if table == nil {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
columnCount := len(table.ColumnType)
|
||
|
|
if columnCount == 0 && table.ColumnCount > 0 {
|
||
|
|
columnCount = int(table.ColumnCount)
|
||
|
|
}
|
||
|
|
if columnCount == 0 {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
ret := make([]uint64, columnCount)
|
||
|
|
hasValue := false
|
||
|
|
|
||
|
|
for idx, collationID := range table.CollationMap() {
|
||
|
|
if idx < 0 || idx >= columnCount {
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
ret[idx] = collationID
|
||
|
|
hasValue = hasValue || collationID != 0
|
||
|
|
}
|
||
|
|
for idx, collationID := range table.EnumSetCollationMap() {
|
||
|
|
if idx < 0 || idx >= columnCount {
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
if ret[idx] == 0 {
|
||
|
|
ret[idx] = collationID
|
||
|
|
}
|
||
|
|
hasValue = hasValue || collationID != 0
|
||
|
|
}
|
||
|
|
|
||
|
|
if !hasValue {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
return ret
|
||
|
|
}
|
||
|
|
|
||
|
|
func normalizeUnsignedValue(v interface{}, colType byte) interface{} {
|
||
|
|
signed, ok := signedToInt64(v)
|
||
|
|
if !ok {
|
||
|
|
return v
|
||
|
|
}
|
||
|
|
|
||
|
|
switch colType {
|
||
|
|
case mysql.MYSQL_TYPE_TINY:
|
||
|
|
return uint8(signed)
|
||
|
|
case mysql.MYSQL_TYPE_SHORT:
|
||
|
|
return uint16(signed)
|
||
|
|
case mysql.MYSQL_TYPE_INT24:
|
||
|
|
return uint32(uint32(int32(signed)) & 0x00FFFFFF)
|
||
|
|
case mysql.MYSQL_TYPE_LONG:
|
||
|
|
return uint32(signed)
|
||
|
|
case mysql.MYSQL_TYPE_LONGLONG:
|
||
|
|
return uint64(signed)
|
||
|
|
default:
|
||
|
|
return v
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func signedToInt64(v interface{}) (int64, bool) {
|
||
|
|
switch x := v.(type) {
|
||
|
|
case int8:
|
||
|
|
return int64(x), true
|
||
|
|
case int16:
|
||
|
|
return int64(x), true
|
||
|
|
case int32:
|
||
|
|
return int64(x), true
|
||
|
|
case int64:
|
||
|
|
return x, true
|
||
|
|
case int:
|
||
|
|
return int64(x), true
|
||
|
|
default:
|
||
|
|
return 0, false
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func getCompressionTypeName(code uint64) string {
|
||
|
|
switch code {
|
||
|
|
case CompressionZSTD:
|
||
|
|
return "ZSTD"
|
||
|
|
case CompressionNone:
|
||
|
|
return ""
|
||
|
|
default:
|
||
|
|
return fmt.Sprintf("UNKNOWN(%d)", code)
|
||
|
|
}
|
||
|
|
}
|