package binlog

import (
	"bytes"
	"fmt"
	"io"
	"os"
	"time"

	"b612.me/mysql/gtid"
	"b612.me/staros"
	"github.com/starainrt/go-mysql/replication"
)

// TxDetail describes one query event or rows event recorded inside a
// Transaction.
type TxDetail struct {
	// StartPos is the byte offset at which the event starts. For rows events
	// it is the start of the most recently seen table map event.
	StartPos int
	// EndPos is the LogPos taken from the event header.
	EndPos int
	// RowCount is the number of rows reported for a rows event.
	RowCount int
	// Timestamp is the event header timestamp in seconds since the Unix epoch.
	Timestamp int64
	// Time is Timestamp converted with time.Unix.
	Time time.Time
	// Sql is the statement text. For rows events it is back-filled from the
	// rows-query events of the same transaction when those are present.
	Sql string
	// Db and Table name the schema and table the event refers to, when known.
	Db    string
	Table string
	// SqlType is one of "insert", "update", "delete" or "query".
	SqlType string
}

// Transaction groups every event seen between two GTID events (or between
// the start of the stream and its end when no GTID event is present).
type Transaction struct {
	// GTID is the textual GTID of the transaction; empty when unknown.
	GTID string
	// Timestamp is the header timestamp of the GTID event, in seconds since
	// the Unix epoch, and Time is the same instant as a time.Time.
	Timestamp int64
	Time      time.Time
	// StartPos and EndPos delimit the transaction in the binlog stream and
	// Size is EndPos - StartPos.
	StartPos int
	EndPos   int
	Size     int
	// RowsCount is the sum of RowCount over Txs.
	RowsCount int
	// sqlOrigin collects the text of rows-query events in arrival order.
	sqlOrigin []string
	// Txs lists the query and rows events of the transaction in order.
	Txs []TxDetail
}

// GetSqlOrigin returns the statements captured from rows-query events of the
// transaction. The returned slice is the internal one, not a copy.
func (t Transaction) GetSqlOrigin() []string {
	return t.sqlOrigin
}

// ParseBinlogFile parses the binlog file at path and calls fx once for every
// transaction found in it.
func ParseBinlogFile(path string, fx func(transaction Transaction)) error {
	return parseOneBinlog(path, fx)
}

// parseOneBinlog opens path, validates the binlog magic header and streams
// the remaining events through parseBinlogDetail.
//
// It returns os.ErrNotExist when the path does not exist, the underlying I/O
// error when the file cannot be opened or read, and a descriptive error when
// the file does not start with the binlog magic header.
func parseOneBinlog(path string, fx func(Transaction)) error {
	if !staros.Exists(path) {
		return os.ErrNotExist
	}
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()

	// Read the binlog file header. io.ReadFull is used so that a short read
	// is reported as an error instead of being compared as a partial header.
	magicLen := len(replication.BinLogFileHeader)
	magic := make([]byte, magicLen)
	if _, err = io.ReadFull(f, magic); err != nil {
		return err
	}
	if !bytes.Equal(magic, replication.BinLogFileHeader) {
		// Not a binlog file. Previously this path returned a nil error.
		return fmt.Errorf("not a binlog file: %s has magic %q, want %q", path, magic, replication.BinLogFileHeader)
	}

	// Parsing must begin right after the magic header; starting anywhere else
	// would skip the format description / table map events the parser needs.
	if _, err = f.Seek(int64(magicLen), io.SeekStart); err != nil {
		return err
	}
	return parseBinlogDetail(f, fx)
}

// finalizeTransaction fills in the fields of tx that can only be computed
// once all of its events are known: the SQL text of non-query details is
// back-filled from the collected rows-query statements (in order), RowsCount
// is summed over the details and Size is derived from the positions.
func finalizeTransaction(tx *Transaction) {
	next := 0
	for i := range tx.Txs {
		d := &tx.Txs[i]
		if d.SqlType != "query" && next < len(tx.sqlOrigin) {
			d.Sql = tx.sqlOrigin[next]
			next++
		}
		tx.RowsCount += d.RowCount
	}
	tx.Size = tx.EndPos - tx.StartPos
}

// parseBinlogDetail reads binlog events from r, which must be positioned
// right after the file's magic header, and groups them into transactions.
//
// A transaction is handed to f when the next GTID event is encountered and,
// for the last one, when r reaches EOF. f may be nil, in which case events
// are parsed and discarded. The function returns nil on a clean EOF at an
// event boundary and an error for truncated or unparsable events.
func parseBinlogDetail(r io.Reader, f func(Transaction)) error {
	parser := replication.NewBinlogParser()
	parser.SetParseTime(false)
	parser.SetUseDecimal(false)

	var (
		tx          Transaction
		currentGtid string
		// tbMapPos is the start offset of the latest table map event; rows
		// events are reported as starting there so that a consumer replaying
		// the range sees the table map together with the rows.
		tbMapPos uint32
	)

	// emit finalizes the current transaction and passes it to the callback.
	emit := func() {
		finalizeTransaction(&tx)
		if f != nil {
			f(tx)
		}
	}

	for {
		headBuf := make([]byte, replication.EventHeaderSize)
		if _, err := io.ReadFull(r, headBuf); err != nil {
			if err == io.EOF {
				// Clean end of stream: flush whatever has been collected.
				if tx.EndPos != 0 {
					emit()
				}
				return nil
			}
			return err
		}

		h, err := parser.ParseHeader(headBuf)
		if err != nil {
			return err
		}
		if h.EventSize <= uint32(replication.EventHeaderSize) {
			return fmt.Errorf("invalid event header, event size is %d, too small", h.EventSize)
		}

		// io.CopyN only succeeds when exactly the requested number of bytes
		// was copied, so no further length check is needed afterwards.
		var body bytes.Buffer
		n, err := io.CopyN(&body, r, int64(h.EventSize)-int64(replication.EventHeaderSize))
		if err != nil {
			return fmt.Errorf("get event body err %w, need %d - %d, but got %d", err, h.EventSize, replication.EventHeaderSize, n)
		}
		data := body.Bytes()

		rawData := make([]byte, 0, len(headBuf)+len(data))
		rawData = append(rawData, headBuf...)
		rawData = append(rawData, data...)

		e, err := parser.ParseEvent(h, data, rawData)
		if err != nil {
			return err
		}

		eventStart := h.LogPos - h.EventSize
		if h.EventType == replication.TABLE_MAP_EVENT {
			tbMapPos = eventStart
		}

		// The raw bytes are not needed by the classifier, only header and event.
		db, tb, sqlType, sql, rowCnt := GetDbTbAndQueryAndRowCntFromBinevent(
			&replication.BinlogEvent{Header: h, Event: e},
		)

		startPos := int(tbMapPos)
		if sqlType == "query" || sqlType == "gtid" {
			startPos = int(eventStart)
		}

		switch sqlType {
		case "gtid":
			// A new GTID closes the transaction collected so far.
			if currentGtid != "" {
				emit()
			}
			currentGtid = sql
			tx = Transaction{
				GTID:      sql,
				StartPos:  startPos,
				Timestamp: int64(h.Timestamp),
				Time:      time.Unix(int64(h.Timestamp), 0),
			}
		case "":
			// Event carries no statement information; it only extends the range.
			tx.EndPos = int(h.LogPos)
		case "rowsquery":
			tx.EndPos = int(h.LogPos)
			tx.sqlOrigin = append(tx.sqlOrigin, sql)
		default:
			tx.EndPos = int(h.LogPos)
			tx.Txs = append(tx.Txs, TxDetail{
				StartPos:  startPos,
				EndPos:    int(h.LogPos),
				Db:        db,
				Table:     tb,
				Sql:       sql,
				SqlType:   sqlType,
				RowCount:  int(rowCnt),
				Timestamp: int64(h.Timestamp),
				Time:      time.Unix(int64(h.Timestamp), 0),
			})
		}
	}
}

// GetDbTbAndQueryAndRowCntFromBinevent classifies a binlog event and extracts
// the pieces of information the transaction builder needs.
//
// It returns, in order: the schema name, the table name, the event category
// ("insert", "update", "delete", "rowsquery", "query", "gtid", or "" for
// event types it does not handle), the statement text (or the GTID string for
// "gtid"), and the row count for rows events.
//
// ev.Event must hold the concrete event type matching ev.Header.EventType;
// the type assertions below panic otherwise.
func GetDbTbAndQueryAndRowCntFromBinevent(ev *replication.BinlogEvent) (db, tb, sqlType, sql string, rowCnt uint32) {
	switch ev.Header.EventType {
	case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2,
		replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2,
		replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
		rows := ev.Event.(*replication.RowsEvent)
		db = string(rows.Table.Schema)
		tb = string(rows.Table.Table)
		rowCnt = uint32(len(rows.Rows))

		switch ev.Header.EventType {
		case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
			sqlType = "insert"
		case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
			sqlType = "update"
			// NOTE(review): Rows presumably holds a before/after pair per
			// updated row, hence the halving - confirm against the parser.
			rowCnt /= 2
		default:
			sqlType = "delete"
		}

	case replication.ROWS_QUERY_EVENT:
		sql = string(ev.Event.(*replication.RowsQueryEvent).Query)
		sqlType = "rowsquery"

	case replication.QUERY_EVENT:
		query := ev.Event.(*replication.QueryEvent)
		db = string(query.Schema)
		sql = string(query.Query)
		sqlType = "query"

	case replication.MARIADB_GTID_EVENT:
		// MariaDB uses a GTID event to open a new event group instead of the
		// old BEGIN query event (it also marks stand-alone DDL), so it is
		// reported as a "begin" query.
		// https://mariadb.com/kb/en/library/gtid_event/
		sql = "begin"
		sqlType = "query"

	case replication.XID_EVENT:
		// An XID event represents a commit; rolled-back transactions are not
		// written to the binlog.
		sql = "commit"
		sqlType = "query"

	case replication.GTID_EVENT:
		ge := ev.Event.(*replication.GTIDEvent)
		// On a parse failure sql is left empty and only the type is reported.
		if gid, err := gtid.Parse(fmt.Sprintf("%s:%d", bytesToUuid(ge.SID), ge.GNO)); err == nil {
			sql = gid.String()
		}
		sqlType = "gtid"
	}
	return db, tb, sqlType, sql, rowCnt
}