master
兔子 2 years ago
parent 2954eaf9e7
commit c196beb9aa

@ -3,7 +3,7 @@ module binlog
go 1.20 go 1.20
require ( require (
b612.me/mysql/binlog v0.0.0-20230630053741-7c0272cc62e4 b612.me/mysql/binlog v0.0.0-20230630095545-8caa467be7e9
b612.me/mysql/gtid v0.0.0-20230425105031-298e51a68044 b612.me/mysql/gtid v0.0.0-20230425105031-298e51a68044
b612.me/starlog v1.3.2 b612.me/starlog v1.3.2
b612.me/staros v1.1.6 b612.me/staros v1.1.6

@ -1,5 +1,5 @@
b612.me/mysql/binlog v0.0.0-20230630053741-7c0272cc62e4 h1:U5z6K7FTtGwAhDJn3TFqRGkuXYd623osfsus0AGwAPg= b612.me/mysql/binlog v0.0.0-20230630095545-8caa467be7e9 h1:6U/hChR8T9L9v+0olHpJRlx4iDiFj1e5psWVjP668jo=
b612.me/mysql/binlog v0.0.0-20230630053741-7c0272cc62e4/go.mod h1:j9oDZUBx7+GK9X1b1bqO9SHddHvDRSGfwbIISxONqfA= b612.me/mysql/binlog v0.0.0-20230630095545-8caa467be7e9/go.mod h1:j9oDZUBx7+GK9X1b1bqO9SHddHvDRSGfwbIISxONqfA=
b612.me/mysql/gtid v0.0.0-20230425105031-298e51a68044 h1:sJrYUl9Sb1tij6ROahFE3r/36Oag3kI92OXDjOKsdwA= b612.me/mysql/gtid v0.0.0-20230425105031-298e51a68044 h1:sJrYUl9Sb1tij6ROahFE3r/36Oag3kI92OXDjOKsdwA=
b612.me/mysql/gtid v0.0.0-20230425105031-298e51a68044/go.mod h1:3EHq1jvlm3a92UxagMjfqSSVYb3KW2H3aT5nd4SiD94= b612.me/mysql/gtid v0.0.0-20230425105031-298e51a68044/go.mod h1:3EHq1jvlm3a92UxagMjfqSSVYb3KW2H3aT5nd4SiD94=
b612.me/notify v1.2.4 h1:cjP80V9FeM+ib1DztZdykusakcbjNI4dAB1pXE8U6bo= b612.me/notify v1.2.4 h1:cjP80V9FeM+ib1DztZdykusakcbjNI4dAB1pXE8U6bo=

@ -31,9 +31,11 @@ var (
pos int64 pos int64
prefix string prefix string
outasRow bool outasRow bool
counts int
) )
func init() { func init() {
cmd.Flags().IntVarP(&counts, "count", "c", 0, "counts of binlog transaction")
cmd.Flags().IntVarP(&endPos, "pos", "P", 0, "skipPos of binlog") cmd.Flags().IntVarP(&endPos, "pos", "P", 0, "skipPos of binlog")
cmd.Flags().IntVarP(&startPos, "start-pos", "S", 0, "startPos of binlog") cmd.Flags().IntVarP(&startPos, "start-pos", "S", 0, "startPos of binlog")
cmd.Flags().IntVarP(&endPos, "end-pos", "E", 0, "endPos of binlog") cmd.Flags().IntVarP(&endPos, "end-pos", "E", 0, "endPos of binlog")
@ -77,41 +79,11 @@ func ParseBinlog() {
foundCount := 0 foundCount := 0
var totalGtid *gtid.Gtid var totalGtid *gtid.Gtid
var owrt *xlsx.File var owrt *xlsx.File
var res *xlsx.Sheet
if outPath != "" { if outPath != "" {
owrt = xlsx.NewFile() owrt, err = prepareXlsx()
res, err = owrt.AddSheet("结果")
if err != nil { if err != nil {
starlog.Errorln(err)
return return
} }
title := res.AddRow()
title.AddCell().SetValue("序号")
title.AddCell().SetValue("GTID")
title.AddCell().SetValue("时间")
title.AddCell().SetValue("时间戳")
title.AddCell().SetValue("StartPos")
title.AddCell().SetValue("EndPos")
title.AddCell().SetValue("事务大小")
title.AddCell().SetValue("影响行数")
title.AddCell().SetValue("压缩类型")
title.AddCell().SetValue("单语句StartPos")
title.AddCell().SetValue("单语句EndPos")
title.AddCell().SetValue("单语句时间")
title.AddCell().SetValue("单语句影响行数")
title.AddCell().SetValue("单语句影响库")
title.AddCell().SetValue("单语句影响表")
title.AddCell().SetValue("SQL类型")
title.AddCell().SetValue("具体SQL")
title.AddCell().SetValue("从属事务编号")
title.AddCell().SetValue("同事务行编号")
title.AddCell().SetValue("行变更内容")
res.SetColWidth(0, 0, 5)
res.SetColWidth(1, 1, 40)
res.SetColWidth(3, 6, 6)
res.SetColWidth(7, 7, 5)
res.SetColWidth(16, 16, 40)
owrt.Save(outPath)
} }
getParser := func(fpath string) string { getParser := func(fpath string) string {
var sTime, eTime time.Time var sTime, eTime time.Time
@ -160,7 +132,7 @@ func ParseBinlog() {
} }
}() }()
} }
err = binlog.ParseBinlogWithFilter(fpath, pos, filter, func(tx binlog.Transaction) { err = binlog.ParseBinlogWithFilter(fpath, pos, filter, func(tx binlog.Transaction) bool {
foundCount++ foundCount++
if cGtid == nil { if cGtid == nil {
cGtid, _ = gtid.Parse(tx.GTID) cGtid, _ = gtid.Parse(tx.GTID)
@ -185,106 +157,118 @@ func ParseBinlog() {
if skipquery && (strings.ToLower(t.Sql) == "begin" || strings.ToLower(t.Sql) == "commit") { if skipquery && (strings.ToLower(t.Sql) == "begin" || strings.ToLower(t.Sql) == "commit") {
continue continue
} }
switch t.SqlType { _, sql := generateRowSql(t)
case "insert": fmt.Printf("GTID:\t%s\nTime:\t%s\nStartPos:\t%v\nEndPos:\t%v\nRowsCount:\t%v\nSQLOrigin:\t%v\nSQL:\t%+v\n\n",
for _, rows := range t.Rows { tx.GTID, t.Time, t.StartPos, t.EndPos, t.RowCount, t.Sql, sql)
setence := ""
for _, row := range rows {
switch row.(type) {
case uint, uint64, uint32, uint16, uint8, int, int64, int32, int16, int8, float64, float32:
setence += fmt.Sprintf("%v, ", row)
case string:
setence += fmt.Sprintf("'%v', ", strings.ReplaceAll(row.(string), "'", "''"))
case []byte:
setence += fmt.Sprintf("%v, ", row)
case nil:
setence += fmt.Sprintf("%v, ", "NULL")
default:
setence += fmt.Sprintf("%v, ", row)
} }
} }
if setence != "" && len(setence) > 2 {
setence = setence[:len(setence)-2]
} }
sql := fmt.Sprintf(`INSERT INTO %s.%s VALUES (%v)`, t.Db, t.Table, setence) if outPath != "" {
fmt.Printf("GTID:%s\nTime:%s\nStartPos:%v\nEndPos:%v\nRowsCount:%v\nSQLOrigin:%v\nSQL:%+v\n\n", add2Xlsx(owrt, tx, foundCount)
tx.GTID, t.Time, t.StartPos, t.EndPos, t.RowCount, t.Sql, sql)
} }
case "update": if foundCount >= counts {
var sql string return false
var where string
for idxc, rows := range t.Rows {
setence := ""
spec := ", "
if idxc%2 == 0 {
spec = " AND "
} }
for idxf, row := range rows { return true
switch row.(type) { })
case uint, uint64, uint32, uint16, uint8, int, int64, int32, int16, int8, float64, float32: if !vbo {
setence += fmt.Sprintf("$%d=%v%s", idxf, row, spec) time.Sleep(time.Millisecond * 500)
case string:
setence += fmt.Sprintf("$%d='%v'%s", idxf, strings.ReplaceAll(row.(string), "'", "''"), spec)
case []byte:
setence += fmt.Sprintf("$%d=%v%s", idxf, row, spec)
case nil:
setence += fmt.Sprintf("$%d=%v%s", idxf, "NULL", spec)
default:
setence += fmt.Sprintf("$%d=%v%s", idxf, row, spec)
} }
var cGtidStr string
if cGtid != nil {
cGtidStr = cGtid.String()
} }
if setence != "" && len(setence) > 2 { if outPath != "" {
setence = setence[:len(setence)-len(spec)] err = owrt.Save(outPath)
if err != nil {
starlog.Errorln(err)
return cGtidStr
} }
if idxc%2 == 0 {
where = setence
continue
} }
sql = fmt.Sprintf("UPDATE %s.%s SET (%v) WHERE %v", t.Db, t.Table, setence, where) if err != nil {
fmt.Printf("GTID:%s\nTime:%s\nStartPos:%v\nEndPos:%v\nRowsCount:%v\nSQLOrigin:%v\nSQL:%+v\n\n", starlog.Errorln(err)
tx.GTID, t.Time, t.StartPos, t.EndPos, t.RowCount, t.Sql, sql) return cGtidStr
} }
case "delete": return cGtidStr
for _, rows := range t.Rows {
setence := ""
spec := " AND "
for idxf, row := range rows {
switch row.(type) {
case uint, uint64, uint32, uint16, uint8, int, int64, int32, int16, int8, float64, float32:
setence += fmt.Sprintf("$%d=%v%s", idxf, row, spec)
case string:
setence += fmt.Sprintf("$%d='%v'%s", idxf, strings.ReplaceAll(row.(string), "'", "''"), spec)
case []byte:
setence += fmt.Sprintf("$%d=%v%s", idxf, row, spec)
case nil:
setence += fmt.Sprintf("$%d=%v%s", idxf, "NULL", spec)
default:
setence += fmt.Sprintf("$%d=%v%s", idxf, row, spec)
} }
var gtidRes [][]string
for _, fp := range filePath {
if staros.IsFolder(fp) {
files, err := os.ReadDir(fp)
if err != nil {
starlog.Errorln("read folder failed:", err)
return
} }
if setence != "" && len(setence) > 2 { rgp := regexp.MustCompile(`^` + prefix + `\.\d+$`)
setence = setence[:len(setence)-len(spec)] for _, v := range files {
if !v.IsDir() && rgp.MatchString(v.Name()) {
gtidRes = append(gtidRes, []string{v.Name(), getParser(filepath.Join(fp, v.Name()))})
} }
sql := fmt.Sprintf("DELETE FROM %s.%s WHERE %v", t.Db, t.Table, setence)
fmt.Printf("GTID:%s\nTime:%s\nStartPos:%v\nEndPos:%v\nRowsCount:%v\nSQLOrigin:%v\nSQL:%+v\n\n",
tx.GTID, t.Time, t.StartPos, t.EndPos, t.RowCount, t.Sql, sql)
} }
default: } else {
fmt.Printf("GTID:%s\nTime:%s\nStartPos:%v\nEndPos:%v\nRowsCount:%v\nSQLOrigin:%v\nSQL:%+v\n\n", getParser(fp)
tx.GTID, t.Time, t.StartPos, t.EndPos, t.RowCount, t.Sql, t.Rows)
} }
} }
if outPath != "" {
owrt.Save(outPath)
}
fmt.Println("")
if len(gtidRes) != 0 {
for _, v := range gtidRes {
fmt.Printf("%s:%s\n", v[0], v[1])
} }
} }
if outPath != "" { fmt.Println("")
allGtid := ""
if totalGtid != nil {
allGtid = totalGtid.String()
}
fmt.Printf("Total Gtid:%v\nTotal SQL Number:%v\n", allGtid, foundCount)
}
func prepareXlsx() (*xlsx.File, error) {
owrt := xlsx.NewFile()
res, err := owrt.AddSheet("结果")
if err != nil {
starlog.Errorln(err)
return owrt, err
}
title := res.AddRow()
title.AddCell().SetValue("序号")
title.AddCell().SetValue("GTID")
title.AddCell().SetValue("时间")
title.AddCell().SetValue("时间戳")
title.AddCell().SetValue("StartPos")
title.AddCell().SetValue("EndPos")
title.AddCell().SetValue("事务大小")
title.AddCell().SetValue("影响行数")
title.AddCell().SetValue("压缩类型")
title.AddCell().SetValue("单语句StartPos")
title.AddCell().SetValue("单语句EndPos")
title.AddCell().SetValue("单语句时间")
title.AddCell().SetValue("单语句影响行数")
title.AddCell().SetValue("单语句影响库")
title.AddCell().SetValue("单语句影响表")
title.AddCell().SetValue("SQL类型")
title.AddCell().SetValue("具体SQL")
title.AddCell().SetValue("从属事务编号")
title.AddCell().SetValue("同事务行编号")
title.AddCell().SetValue("行变更内容")
res.SetColWidth(0, 0, 5)
res.SetColWidth(1, 1, 40)
res.SetColWidth(3, 6, 6)
res.SetColWidth(7, 7, 5)
res.SetColWidth(16, 16, 40)
return owrt, owrt.Save(outPath)
}
func add2Xlsx(owrt *xlsx.File, tx binlog.Transaction, foundCount int) {
for k, t := range tx.Txs { for k, t := range tx.Txs {
if skipquery && (strings.ToLower(t.Sql) == "begin" || strings.ToLower(t.Sql) == "commit") { if skipquery && (strings.ToLower(t.Sql) == "begin" || strings.ToLower(t.Sql) == "commit") {
continue continue
} }
addRow := func() *xlsx.Row { addRow := func() *xlsx.Row {
r := res.AddRow() r := owrt.Sheets[0].AddRow()
r.AddCell().SetValue(foundCount) r.AddCell().SetValue(foundCount)
r.AddCell().SetValue(tx.GTID) r.AddCell().SetValue(tx.GTID)
r.AddCell().SetValue(tx.Time.String()) r.AddCell().SetValue(tx.Time.String())
@ -315,6 +299,15 @@ func ParseBinlog() {
r.AddCell().SetValue(t.Rows) r.AddCell().SetValue(t.Rows)
continue continue
} }
idx, sql := generateRowSql(t)
r := addRow()
r.AddCell().SetValue(k + 1)
r.AddCell().SetValue(idx + 1)
r.AddCell().SetValue(sql)
}
}
func generateRowSql(t binlog.TxDetail) (int, interface{}) {
switch t.SqlType { switch t.SqlType {
case "insert": case "insert":
for idx, rows := range t.Rows { for idx, rows := range t.Rows {
@ -336,11 +329,7 @@ func ParseBinlog() {
if setence != "" && len(setence) > 2 { if setence != "" && len(setence) > 2 {
setence = setence[:len(setence)-2] setence = setence[:len(setence)-2]
} }
sql := fmt.Sprintf(`INSERT INTO %s.%s VALUES(%v)`, t.Db, t.Table, setence) return idx + 1, fmt.Sprintf(`INSERT INTO %s.%s VALUES(%v)`, t.Db, t.Table, setence)
r := addRow()
r.AddCell().SetValue(k + 1)
r.AddCell().SetValue(idx + 1)
r.AddCell().SetValue(sql)
} }
case "update": case "update":
var sql string var sql string
@ -373,10 +362,7 @@ func ParseBinlog() {
continue continue
} }
sql = fmt.Sprintf("UPDATE %s.%s SET (%v) WHERE %v", t.Db, t.Table, setence, where) sql = fmt.Sprintf("UPDATE %s.%s SET (%v) WHERE %v", t.Db, t.Table, setence, where)
r := addRow() return (idxc + 1) / 2, sql
r.AddCell().SetValue(k + 1)
r.AddCell().SetValue((idxc + 1) / 2)
r.AddCell().SetValue(sql)
} }
case "delete": case "delete":
for idx, rows := range t.Rows { for idx, rows := range t.Rows {
@ -400,72 +386,11 @@ func ParseBinlog() {
setence = setence[:len(setence)-len(spec)] setence = setence[:len(setence)-len(spec)]
} }
sql := fmt.Sprintf("DELETE FROM %s.%s WHERE %v", t.Db, t.Table, setence) sql := fmt.Sprintf("DELETE FROM %s.%s WHERE %v", t.Db, t.Table, setence)
r := addRow() return idx + 1, sql
r.AddCell().SetValue(k + 1)
r.AddCell().SetValue(idx + 1)
r.AddCell().SetValue(sql)
} }
default: default:
r := addRow() return 1, t.Rows
r.AddCell().SetValue(k + 1)
r.AddCell().SetValue(1)
r.AddCell().SetValue(t.Rows)
}
}
}
})
if !vbo {
time.Sleep(time.Millisecond * 500)
}
var cGtidStr string
if cGtid != nil {
cGtidStr = cGtid.String()
}
if outPath != "" {
err = owrt.Save(outPath)
if err != nil {
starlog.Errorln(err)
return cGtidStr
}
}
if err != nil {
starlog.Errorln(err)
return cGtidStr
}
return cGtidStr
}
var gtidRes [][]string
for _, fp := range filePath {
if staros.IsFolder(fp) {
files, err := os.ReadDir(fp)
if err != nil {
starlog.Errorln("read folder failed:", err)
return
}
rgp := regexp.MustCompile(`^` + prefix + `\.\d+$`)
for _, v := range files {
if !v.IsDir() && rgp.MatchString(v.Name()) {
gtidRes = append(gtidRes, []string{v.Name(), getParser(filepath.Join(fp, v.Name()))})
}
}
} else {
getParser(fp)
}
} }
if outPath != "" { return 0, ""
owrt.Save(outPath)
}
fmt.Println("")
if len(gtidRes) != 0 {
for _, v := range gtidRes {
fmt.Printf("%s:%s\n", v[0], v[1])
}
}
fmt.Println("")
allGtid := ""
if totalGtid != nil {
allGtid = totalGtid.String()
}
fmt.Printf("Total Gtid:%v\nTotal SQL Number:%v\n", allGtid, foundCount)
} }

@ -41,10 +41,10 @@ func (t Transaction) GetSqlOrigin() []string {
return t.sqlOrigin return t.sqlOrigin
} }
func ParseBinlogFile(path string, fx func(transaction Transaction)) error { func ParseBinlogFile(path string, fx func(transaction Transaction) bool) error {
return parseOneBinlog(path, fx) return parseOneBinlog(path, fx)
} }
func parseOneBinlog(path string, fx func(Transaction)) error { func parseOneBinlog(path string, fx func(Transaction) bool) error {
if !staros.Exists(path) { if !staros.Exists(path) {
return os.ErrNotExist return os.ErrNotExist
} }
@ -73,7 +73,7 @@ func parseOneBinlog(path string, fx func(Transaction)) error {
return parseBinlogDetail(f, fx) return parseBinlogDetail(f, fx)
} }
func parseBinlogDetail(r io.Reader, f func(Transaction)) error { func parseBinlogDetail(r io.Reader, f func(Transaction) bool) error {
parse := replication.NewBinlogParser() parse := replication.NewBinlogParser()
parse.SetParseTime(false) parse.SetParseTime(false)
parse.SetUseDecimal(false) parse.SetUseDecimal(false)
@ -180,7 +180,9 @@ func parseBinlogDetail(r io.Reader, f func(Transaction)) error {
} }
tx.Size = tx.EndPos - tx.StartPos tx.Size = tx.EndPos - tx.StartPos
if f != nil { if f != nil {
f(tx) if !f(tx) {
return nil
}
} }
} }
currentGtid = ev.Data currentGtid = ev.Data
@ -320,14 +322,15 @@ type BinlogFilter struct {
OnlyShowGtid bool OnlyShowGtid bool
} }
func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter BinlogFilter, fn func(Transaction)) error { func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter BinlogFilter, fn func(Transaction) bool) error {
var inGtid, exGtid *gtid.Gtid var subGtid, inGtid, exGtid *gtid.Gtid
var err error var err error
if filter.IncludeGtid != "" { if filter.IncludeGtid != "" {
inGtid, err = gtid.Parse(filter.IncludeGtid) inGtid, err = gtid.Parse(filter.IncludeGtid)
if err != nil { if err != nil {
return err return err
} }
subGtid = inGtid.Clone()
} }
if filter.ExcludeGtid != "" { if filter.ExcludeGtid != "" {
exGtid, err = gtid.Parse(filter.ExcludeGtid) exGtid, err = gtid.Parse(filter.ExcludeGtid)
@ -345,33 +348,36 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
var tx Transaction var tx Transaction
currentGtid := "" currentGtid := ""
callFn := func(tx Transaction) { callFn := func(tx Transaction) bool {
if fn == nil { if fn == nil {
return return true
} }
if !filter.StartDate.IsZero() && filter.StartDate.After(tx.Time) { if !filter.StartDate.IsZero() && filter.StartDate.After(tx.Time) {
return return true
} }
if !filter.EndDate.IsZero() && filter.EndDate.Before(tx.Time) { if !filter.EndDate.IsZero() && filter.EndDate.Before(tx.Time) {
return return true
} }
if filter.StartPos != 0 && filter.StartPos > tx.StartPos { if filter.StartPos != 0 && filter.StartPos > tx.StartPos {
return return true
} }
if filter.EndPos != 0 && filter.EndPos < tx.EndPos { if filter.EndPos != 0 && filter.EndPos < tx.EndPos {
return return true
} }
if filter.BigThan != 0 && filter.BigThan > tx.Size { if filter.BigThan != 0 && filter.BigThan > tx.Size {
return return true
} }
if filter.SmallThan != 0 && filter.SmallThan < tx.Size { if filter.SmallThan != 0 && filter.SmallThan < tx.Size {
return return true
} }
fn(tx) return fn(tx)
} }
for { for {
headBuf := make([]byte, replication.EventHeaderSize) headBuf := make([]byte, replication.EventHeaderSize)
if _, err = io.ReadFull(r, headBuf); err == io.EOF { if _, err = io.ReadFull(r, headBuf); err == io.EOF {
if tx.Time.IsZero() {
return nil
}
idx := 0 idx := 0
for k, v := range tx.Txs { for k, v := range tx.Txs {
if v.SqlType != "query" && len(tx.sqlOrigin) > idx { if v.SqlType != "query" && len(tx.sqlOrigin) > idx {
@ -481,7 +487,16 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
tx.EndPos = startPos - 1 tx.EndPos = startPos - 1
} }
tx.Size = tx.EndPos - tx.StartPos tx.Size = tx.EndPos - tx.StartPos
callFn(tx) if !callFn(tx) {
return nil
}
if subGtid != nil {
subGtid.Sub(tx.GTID)
if subGtid.EventCount() == 0 {
return nil
}
}
tx = Transaction{}
} }
currentGtid = ev.Data currentGtid = ev.Data
if inGtid != nil { if inGtid != nil {
@ -530,7 +545,7 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
} }
} }
func ParseBinlogWithFilter(path string, pos int64, filter BinlogFilter, fx func(Transaction)) error { func ParseBinlogWithFilter(path string, pos int64, filter BinlogFilter, fx func(Transaction) bool) error {
defer func() { defer func() {
recover() recover()
}() }()

@ -1,4 +1,4 @@
# b612.me/mysql/binlog v0.0.0-20230630053741-7c0272cc62e4 # b612.me/mysql/binlog v0.0.0-20230630095545-8caa467be7e9
## explicit; go 1.20 ## explicit; go 1.20
b612.me/mysql/binlog b612.me/mysql/binlog
# b612.me/mysql/gtid v0.0.0-20230425105031-298e51a68044 # b612.me/mysql/gtid v0.0.0-20230425105031-298e51a68044

Loading…
Cancel
Save