From 8b0423eb94af2ab5537689d9feae0085d0049bda Mon Sep 17 00:00:00 2001
From: starainrt
Date: Thu, 29 Jun 2023 13:16:42 +0800
Subject: [PATCH] support binlog compression

Unpack MySQL Transaction_payload events so that transactions written with
binlog transaction compression are parsed like ordinary ones, and record the
algorithm per statement in the new TxDetail.CompressionType field ("ZSTD",
"UNKNOWN", or empty for uncompressed events). Because one raw event can now
expand into several sub-events, GetDbTbAndQueryAndRowCntFromBinevent is
replaced by ParseBinlogEvent, which returns a slice of BinlogEvent. BinlogFilter
also gains an OnlyShowGtid option, covered by a new test.
---
 parse.go      | 392 ++++++++++++++++++++++++++------------------
 parse_test.go |  13 ++
 2 files changed, 216 insertions(+), 189 deletions(-)

diff --git a/parse.go b/parse.go
index df37894..0de8e49 100644
--- a/parse.go
+++ b/parse.go
@@ -12,16 +12,17 @@ import (
 )
 
 type TxDetail struct {
-	StartPos  int
-	EndPos    int
-	RowCount  int
-	Timestamp int64
-	Time      time.Time
-	Sql       string
-	Db        string
-	Table     string
-	SqlType   string
-	Rows      [][]interface{}
+	StartPos        int
+	EndPos          int
+	RowCount        int
+	Timestamp       int64
+	Time            time.Time
+	Sql             string
+	Db              string
+	Table           string
+	SqlType         string
+	CompressionType string
+	Rows            [][]interface{}
 }
 
 type Transaction struct {
@@ -80,13 +81,7 @@ func parseBinlogDetail(r io.Reader, f func(Transaction)) error {
 	var (
 		err      error
 		n        int64
-		db       string = ""
-		tb       string = ""
-		sql      string = ""
-		sqlType  string = ""
-		rowCnt   uint32 = 0
 		tbMapPos uint32 = 0
-		rows     [][]interface{}
 	)
 	var tx Transaction
 	currentGtid := ""
@@ -157,100 +152,105 @@ func parseBinlogDetail(r io.Reader, f func(Transaction)) error {
 		//binEvent := &replication.BinlogEvent{RawData: rawData, Header: h, Event: e}
 		binEvent := &replication.BinlogEvent{Header: h, Event: e} // we donnot need raw data
-		db, tb, sqlType, sql, rowCnt, rows = GetDbTbAndQueryAndRowCntFromBinevent(binEvent)
-		startPos := 0
-		if sqlType == "query" || sqlType == "gtid" {
-			startPos = int(h.LogPos - h.EventSize)
-			//fmt.Println(h.Timestamp, h.LogPos-h.EventSize, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType)
-			// cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: h.LogPos - h.EventSize, StopPos: h.LogPos,
-			// 	Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType}
-		} else {
-			startPos = int(tbMapPos)
-			//fmt.Println(h.Timestamp, tbMapPos, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType)
-			// cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: tbMapPos, StopPos: h.LogPos,
-			// 	Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType}
-		}
-		switch sqlType {
-		case "gtid":
-			if currentGtid != "" {
-				idx := 0
-				for k, v := range tx.Txs {
-					if v.SqlType != "query" && len(tx.sqlOrigin) > idx {
-						v.Sql = tx.sqlOrigin[idx]
-						idx++
-					}
-					tx.RowsCount += v.RowCount
-					tx.Txs[k] = v
-				}
-				tx.Size = tx.EndPos - tx.StartPos
-				if f != nil {
-					f(tx)
-				}
-			}
-			currentGtid = sql
-			tx = Transaction{
-				GTID:      sql,
-				StartPos:  startPos,
-				Timestamp: int64(h.Timestamp),
-				Time:      time.Unix(int64(h.Timestamp), 0),
-			}
-		case "":
-			tx.EndPos = int(h.LogPos)
-			continue
-		case "rowsquery":
-			tx.EndPos = int(h.LogPos)
-			tx.sqlOrigin = append(tx.sqlOrigin, sql)
-		default:
-			tx.EndPos = int(h.LogPos)
-			tx.Txs = append(tx.Txs, TxDetail{
-				StartPos:  startPos,
-				EndPos:    int(h.LogPos),
-				Db:        db,
-				Table:     tb,
-				Sql:       sql,
-				SqlType:   sqlType,
-				Rows:      rows,
-				RowCount:  int(rowCnt),
-				Timestamp: int64(h.Timestamp),
-				Time:      time.Unix(int64(h.Timestamp), 0),
-			})
+		evs := ParseBinlogEvent(binEvent)
+		for _, ev := range evs {
+			startPos := 0
+			if ev.Type == "query" || ev.Type == "gtid" {
+				startPos = int(h.LogPos - h.EventSize)
+				//fmt.Println(h.Timestamp, h.LogPos-h.EventSize, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType)
+				// cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: h.LogPos - h.EventSize, StopPos: h.LogPos,
+				// 	Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType}
+			} else {
+				startPos = int(tbMapPos)
+				//fmt.Println(h.Timestamp, tbMapPos, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType)
+				// cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: tbMapPos, StopPos: h.LogPos,
+				// 	Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType}
+			}
+			switch ev.Type {
+			case "gtid":
+				if currentGtid != "" {
+					idx := 0
+					for k, v := range tx.Txs {
+						if v.SqlType != "query" && len(tx.sqlOrigin) > idx {
+							v.Sql = tx.sqlOrigin[idx]
+							idx++
+						}
+						tx.RowsCount += v.RowCount
+						tx.Txs[k] = v
+					}
+					tx.Size = tx.EndPos - tx.StartPos
+					if f != nil {
+						f(tx)
+					}
+				}
+				currentGtid = ev.Data
+				tx = Transaction{
+					GTID:      ev.Data,
+					StartPos:  startPos,
+					Timestamp: int64(h.Timestamp),
+					Time:      time.Unix(int64(h.Timestamp), 0),
+				}
+			case "":
+				tx.EndPos = int(h.LogPos)
+				continue
+			case "rowsquery":
+				tx.EndPos = int(h.LogPos)
+				tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
+			default:
+				tx.EndPos = int(h.LogPos)
+				tx.Txs = append(tx.Txs, TxDetail{
+					StartPos:        startPos,
+					EndPos:          int(h.LogPos),
+					Db:              ev.DB,
+					Table:           ev.TB,
+					Sql:             ev.Data,
+					SqlType:         ev.Type,
+					Rows:            ev.Rows,
+					RowCount:        int(ev.RowCnt),
+					Timestamp:       int64(h.Timestamp),
+					Time:            time.Unix(int64(h.Timestamp), 0),
+					CompressionType: ev.CompressionType,
+				})
+			}
 		}
 	}
 }
 
-func GetDbTbAndQueryAndRowCntFromBinevent(ev *replication.BinlogEvent) (string, string, string, string, uint32, [][]interface{}) {
-	var (
-		db      string = ""
-		tb      string = ""
-		sql     string = ""
-		sqlType string = ""
-		rowCnt  uint32 = 0
-		rows    [][]interface{}
-	)
+type BinlogEvent struct {
+	Type            string
+	DB              string
+	TB              string
+	Data            string
+	RowCnt          uint32
+	Rows            [][]interface{}
+	CompressionType string
+}
+
+func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent {
+	var res []BinlogEvent
+	var sig BinlogEvent
 	switch ev.Header.EventType {
 	case replication.ANONYMOUS_GTID_EVENT:
 		//ge := ev.Event.(*replication.GTIDEvent)
-		sql = "anonymous-gtid-event:1"
-		sqlType = "gtid"
+		sig.Data = "anonymous-gtid-event:1"
+		sig.Type = "gtid"
 	case replication.WRITE_ROWS_EVENTv1,
 		replication.WRITE_ROWS_EVENTv2:
 		wrEvent := ev.Event.(*replication.RowsEvent)
-		db = string(wrEvent.Table.Schema)
-		tb = string(wrEvent.Table.Table)
-		sqlType = "insert"
-		rowCnt = uint32(len(wrEvent.Rows))
-		rows = wrEvent.Rows
+		sig.DB = string(wrEvent.Table.Schema)
+		sig.TB = string(wrEvent.Table.Table)
+		sig.Type = "insert"
+		sig.RowCnt = uint32(len(wrEvent.Rows))
+		sig.Rows = wrEvent.Rows
 	case replication.UPDATE_ROWS_EVENTv1,
 		replication.UPDATE_ROWS_EVENTv2:
 		wrEvent := ev.Event.(*replication.RowsEvent)
-		db = string(wrEvent.Table.Schema)
-		tb = string(wrEvent.Table.Table)
-		sqlType = "update"
-		rowCnt = uint32(len(wrEvent.Rows)) / 2
-		rows = wrEvent.Rows
+		sig.DB = string(wrEvent.Table.Schema)
+		sig.TB = string(wrEvent.Table.Table)
+		sig.Type = "update"
+		sig.RowCnt = uint32(len(wrEvent.Rows)) / 2
+		sig.Rows = wrEvent.Rows
 	case replication.DELETE_ROWS_EVENTv1,
 		replication.DELETE_ROWS_EVENTv2:
@@ -258,52 +258,66 @@ func GetDbTbAndQueryAndRowCntFromBinevent(ev *replication.BinlogEvent) (string,
 		//replication.TABLE_MAP_EVENT:
 		wrEvent := ev.Event.(*replication.RowsEvent)
-		db = string(wrEvent.Table.Schema)
-		tb = string(wrEvent.Table.Table)
-		sqlType = "delete"
-		rowCnt = uint32(len(wrEvent.Rows))
-		rows = wrEvent.Rows
+		sig.DB = string(wrEvent.Table.Schema)
+		sig.TB = string(wrEvent.Table.Table)
+		sig.Type = "delete"
+		sig.RowCnt = uint32(len(wrEvent.Rows))
+		sig.Rows = wrEvent.Rows
 	case replication.ROWS_QUERY_EVENT:
 		queryEvent := ev.Event.(*replication.RowsQueryEvent)
-		sql = string(queryEvent.Query)
-		sqlType = "rowsquery"
+		sig.Data = string(queryEvent.Query)
+		sig.Type = "rowsquery"
 	case replication.QUERY_EVENT:
 		queryEvent := ev.Event.(*replication.QueryEvent)
-		db = string(queryEvent.Schema)
-		sql = string(queryEvent.Query)
-		sqlType = "query"
+		sig.DB = string(queryEvent.Schema)
+		sig.Data = string(queryEvent.Query)
+		sig.Type = "query"
 	case replication.MARIADB_GTID_EVENT:
 		// For global transaction ID, used to start a new transaction event group, instead of the old BEGIN query event, and also to mark stand-alone (ddl).
 		//https://mariadb.com/kb/en/library/gtid_event/
-		sql = "begin"
-		sqlType = "query"
+		sig.Data = "begin"
+		sig.Type = "query"
 	case replication.XID_EVENT:
 		// XID_EVENT represents commit。rollback transaction not in binlog
-		sql = "commit"
-		sqlType = "query"
+		sig.Data = "commit"
+		sig.Type = "query"
 	case replication.GTID_EVENT:
 		ge := ev.Event.(*replication.GTIDEvent)
 		gid, err := gtid.Parse(fmt.Sprintf("%s:%d", bytesToUuid(ge.SID), ge.GNO))
 		if err == nil {
-			sql = gid.String()
+			sig.Data = gid.String()
+		}
+		sig.Type = "gtid"
+	case replication.TRANSACTION_PAYLOAD_EVENT:
+		ge := ev.Event.(*replication.TransactionPayloadEvent)
+		for _, val := range ge.Events {
+			res = append(res, ParseBinlogEvent(val)...)
+		}
+		for idx := range res {
+			if ge.CompressionType == 0 {
+				res[idx].CompressionType = "ZSTD"
+			} else if ge.CompressionType != 255 {
+				res[idx].CompressionType = "UNKNOWN"
+			}
 		}
-		sqlType = "gtid"
+		return res
 	}
-	return db, tb, sqlType, sql, rowCnt, rows
-
+	res = append(res, sig)
+	return res
 }
 
 type BinlogFilter struct {
-	IncludeGtid string
-	ExcludeGtid string
-	StartPos    int
-	EndPos      int
-	StartDate   time.Time
-	EndDate     time.Time
-	BigThan     int
-	SmallThan   int
+	IncludeGtid  string
+	ExcludeGtid  string
+	StartPos     int
+	EndPos       int
+	StartDate    time.Time
+	EndDate      time.Time
+	BigThan      int
+	SmallThan    int
+	OnlyShowGtid bool
 }
 
 func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter BinlogFilter, fn func(Transaction)) error {
@@ -325,14 +339,8 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
 	// process: 0, continue: 1, break: 2, EOF: 3
 	var (
 		n            int64
-		db           string = ""
-		tb           string = ""
-		sql          string = ""
-		sqlType      string = ""
-		rowCnt       uint32 = 0
 		tbMapPos     uint32 = 0
 		skipTillNext bool   = false
-		rows         [][]interface{}
 	)
 	var tx Transaction
@@ -398,6 +406,9 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
 		if skipTillNext && h.EventType != replication.GTID_EVENT {
 			continue
 		}
+		if filter.OnlyShowGtid && h.EventType != replication.GTID_EVENT {
+			continue
+		}
 		//h.Dump(os.Stdout)
 
 		data := buf.Bytes()
@@ -426,78 +437,81 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
 		//binEvent := &replication.BinlogEvent{RawData: rawData, Header: h, Event: e}
 		binEvent := &replication.BinlogEvent{Header: h, Event: e} // we donnot need raw data
-		db, tb, sqlType, sql, rowCnt, rows = GetDbTbAndQueryAndRowCntFromBinevent(binEvent)
-		startPos := 0
-		if sqlType == "query" || sqlType == "gtid" {
-			startPos = int(h.LogPos - h.EventSize)
-			//fmt.Println(h.Timestamp, h.LogPos-h.EventSize, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType)
-			// cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: h.LogPos - h.EventSize, StopPos: h.LogPos,
-			// 	Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType}
-		} else {
-			startPos = int(tbMapPos)
-			//fmt.Println(h.Timestamp, tbMapPos, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType)
-			// cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: tbMapPos, StopPos: h.LogPos,
-			// 	Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType}
-		}
-		switch sqlType {
-		case "gtid":
-			if skipTillNext {
-				skipTillNext = false
-			}
-			if currentGtid != "" {
-				idx := 0
-				for k, v := range tx.Txs {
-					if v.SqlType != "query" && len(tx.sqlOrigin) > idx {
-						v.Sql = tx.sqlOrigin[idx]
-						idx++
-					}
-					tx.RowsCount += v.RowCount
-					tx.Txs[k] = v
-				}
-				tx.Size = tx.EndPos - tx.StartPos
-				callFn(tx)
-			}
-			currentGtid = sql
-			if inGtid != nil {
-				if c, _ := inGtid.Contain(sql); !c {
-					currentGtid = ""
-					skipTillNext = true
-					continue
-				}
-			}
-			if exGtid != nil {
-				if c, _ := exGtid.Contain(sql); c {
-					currentGtid = ""
-					skipTillNext = true
-					continue
-				}
-			}
-			tx = Transaction{
-				GTID:      sql,
-				StartPos:  startPos,
-				Timestamp: int64(h.Timestamp),
-				Time:      time.Unix(int64(h.Timestamp), 0),
-			}
-		case "":
-			tx.EndPos = int(h.LogPos)
-			continue
-		case "rowsquery":
-			tx.EndPos = int(h.LogPos)
-			tx.sqlOrigin = append(tx.sqlOrigin, sql)
-		default:
-			tx.EndPos = int(h.LogPos)
-			tx.Txs = append(tx.Txs, TxDetail{
-				StartPos:  startPos,
-				EndPos:    int(h.LogPos),
-				Db:        db,
-				Table:     tb,
-				Sql:       sql,
-				SqlType:   sqlType,
-				Rows:      rows,
-				RowCount:  int(rowCnt),
-				Timestamp: int64(h.Timestamp),
-				Time:      time.Unix(int64(h.Timestamp), 0),
-			})
+		evs := ParseBinlogEvent(binEvent)
+		for _, ev := range evs {
+			startPos := 0
+			if ev.Type == "query" || ev.Type == "gtid" {
+				startPos = int(h.LogPos - h.EventSize)
+				//fmt.Println(h.Timestamp, h.LogPos-h.EventSize, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType)
+				// cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: h.LogPos - h.EventSize, StopPos: h.LogPos,
+				// 	Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType}
+			} else {
+				startPos = int(tbMapPos)
+				//fmt.Println(h.Timestamp, tbMapPos, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType)
+				// cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: tbMapPos, StopPos: h.LogPos,
+				// 	Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType}
+			}
+			switch ev.Type {
+			case "gtid":
+				if skipTillNext {
+					skipTillNext = false
+				}
+				if currentGtid != "" {
+					idx := 0
+					for k, v := range tx.Txs {
+						if v.SqlType != "query" && len(tx.sqlOrigin) > idx {
+							v.Sql = tx.sqlOrigin[idx]
+							idx++
+						}
+						tx.RowsCount += v.RowCount
+						tx.Txs[k] = v
+					}
+					tx.Size = tx.EndPos - tx.StartPos
+					callFn(tx)
+				}
+				currentGtid = ev.Data
+				if inGtid != nil {
+					if c, _ := inGtid.Contain(ev.Data); !c {
+						currentGtid = ""
+						skipTillNext = true
+						continue
+					}
+				}
+				if exGtid != nil {
+					if c, _ := exGtid.Contain(ev.Data); c {
+						currentGtid = ""
+						skipTillNext = true
+						continue
+					}
+				}
+				tx = Transaction{
+					GTID:      ev.Data,
+					StartPos:  startPos,
+					Timestamp: int64(h.Timestamp),
+					Time:      time.Unix(int64(h.Timestamp), 0),
+				}
+			case "":
+				tx.EndPos = int(h.LogPos)
+				continue
+			case "rowsquery":
+				tx.EndPos = int(h.LogPos)
+				tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
+			default:
+				tx.EndPos = int(h.LogPos)
+				tx.Txs = append(tx.Txs, TxDetail{
+					StartPos:        startPos,
+					EndPos:          int(h.LogPos),
+					Db:              ev.DB,
+					Table:           ev.TB,
+					Sql:             ev.Data,
+					SqlType:         ev.Type,
+					Rows:            ev.Rows,
+					RowCount:        int(ev.RowCnt),
+					Timestamp:       int64(h.Timestamp),
+					Time:            time.Unix(int64(h.Timestamp), 0),
+					CompressionType: ev.CompressionType,
+				})
+			}
 		}
 	}
 }
diff --git a/parse_test.go b/parse_test.go
index 5b5dcdc..14b2533 100644
--- a/parse_test.go
+++ b/parse_test.go
@@ -2,6 +2,7 @@ package binlog
 
 import (
 	"fmt"
+	"os"
 	"testing"
 	"time"
 )
@@ -26,3 +27,15 @@ func TestParseFilter(t *testing.T) {
 		fmt.Println(transaction)
 	})
 }
+
+func TestParseExternal(t *testing.T) {
+	file := `C:\mysql-bin.000001`
+	if _, err := os.Stat(file); err != nil {
+		return
+	}
+	ParseBinlogWithFilter(file, 0, BinlogFilter{
+		OnlyShowGtid: true,
+	}, func(transaction Transaction) {
+		fmt.Println(transaction)
+	})
+}
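
Usage sketch (illustrative, not part of the patch): one way a caller could read the new CompressionType field once this change is applied. It is written as if added to parse_test.go, so it reuses the package's own ParseBinlogWithFilter, BinlogFilter, Transaction, and TxDetail; the function name is hypothetical and the binlog path is the same placeholder the new test uses.

func TestShowCompressionType(t *testing.T) {
	file := `C:\mysql-bin.000001` // placeholder path, as in TestParseExternal
	if _, err := os.Stat(file); err != nil {
		return
	}
	ParseBinlogWithFilter(file, 0, BinlogFilter{}, func(tx Transaction) {
		for _, st := range tx.Txs {
			// CompressionType is "ZSTD" for statements unpacked from a
			// compressed Transaction_payload event, "UNKNOWN" for an
			// unrecognized algorithm, and empty for uncompressed events.
			fmt.Println(tx.GTID, st.SqlType, st.Db, st.Table, st.RowCount, st.CompressionType)
		}
	})
}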