From 0c9d9c6eae61ec96979366d83b4ec201da4fa574 Mon Sep 17 00:00:00 2001 From: starainrt Date: Sun, 10 May 2026 14:02:53 +0800 Subject: [PATCH] =?UTF-8?q?feat(binlog):=20=E5=A2=9E=E5=8A=A0=20logical=20?= =?UTF-8?q?clock=20=E5=85=83=E6=95=B0=E6=8D=AE=E8=A7=A3=E6=9E=90=E4=B8=8E?= =?UTF-8?q?=E7=BB=9F=E8=AE=A1=E6=94=AF=E6=92=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 暴露事务级 last_committed、sequence_number、transaction_length 和 commit timestamp - 在 GTID event 转换时透传 logical clock 元数据 - 新增 ParseOptions、ParseProgress,支持上下文取消和解析进度回调 - 保留 TransactionPayloadEvent 展开后的 tablemap 与压缩类型信息 - 增加 TransactionSummary 辅助结构,便于上层统计事务结果、耗时、表分布和逻辑时钟 - 清理测试对外部大 binlog 样本的依赖,保证独立库测试可运行 - 修正跨平台测试与 GTID 输出格式断言,提升发版稳定性 --- dump_binlog_from_pos_test.go | 12 ++ get_all_gtid_of_binlog_dir_test.go | 6 +- get_gtid_of_binlog_test.go | 17 +- parse_event_convert.go | 31 ++++ parse_event_convert_payload_test.go | 48 ++++++ parse_filter.go | 152 ++++++++++++++++-- parse_options_test.go | 57 +++++++ parse_stream.go | 140 +--------------- parse_types.go | 75 ++++++--- summary.go | 239 ++++++++++++++++++++++++++++ summary_test.go | 66 ++++++++ 11 files changed, 661 insertions(+), 182 deletions(-) create mode 100644 parse_options_test.go create mode 100644 summary.go create mode 100644 summary_test.go diff --git a/dump_binlog_from_pos_test.go b/dump_binlog_from_pos_test.go index c099079..5332127 100644 --- a/dump_binlog_from_pos_test.go +++ b/dump_binlog_from_pos_test.go @@ -3,10 +3,19 @@ package binlog import ( "os" "os/exec" + "runtime" "testing" ) +func requireBundledMysqlbinlog(t *testing.T) { + t.Helper() + if runtime.GOOS != "darwin" || runtime.GOARCH != "amd64" { + t.Skip("skips bundled mysqlbinlog validation: test/mysqlbinlog is a Darwin amd64 executable") + } +} + func TestDumpBinlogFromPos0(t *testing.T) { + requireBundledMysqlbinlog(t) defer os.Remove("./test/test-mysql-bin-dump") if err := DumpBinlogFromPos("./test/test-mysql-bin", 107, "./test/test-mysql-bin-dump"); nil != err { t.Errorf("expect no err, but got %v", err) @@ -17,6 +26,7 @@ func TestDumpBinlogFromPos0(t *testing.T) { } func TestDumpBinlogFromPos1(t *testing.T) { + requireBundledMysqlbinlog(t) defer os.Remove("./test/test-mysql-bin-dump") if err := DumpBinlogFromPos("./test/test-mysql-bin", 24959, "./test/test-mysql-bin-dump"); nil != err { t.Errorf("expect no err, but got %v", err) @@ -27,6 +37,7 @@ func TestDumpBinlogFromPos1(t *testing.T) { } func TestDumpUnexecutedBinlogByGtid(t *testing.T) { + requireBundledMysqlbinlog(t) defer os.Remove("./test/test-mysql-bin-dump") if err := DumpUnexecutedBinlogByGtid("./test/mysql-bin56.000003", "f60ab33c-c604-11e3-8e1c-e66ccf50db66:1-73", "./test/test-mysql-bin-dump", false); nil != err { t.Errorf("expect no err, but got %v", err) @@ -37,6 +48,7 @@ func TestDumpUnexecutedBinlogByGtid(t *testing.T) { } func TestDumpBinlogWithOnlyHeader(t *testing.T) { + requireBundledMysqlbinlog(t) defer os.Remove("./test/test-mysql-bin-dump") if err := DumpBinlogFromPos("./test/only-header-mysql-bin", 231, "./test/test-mysql-bin-dump"); nil != err { t.Errorf("expect no err, but got %v", err) diff --git a/get_all_gtid_of_binlog_dir_test.go b/get_all_gtid_of_binlog_dir_test.go index a06aa9a..795aee8 100644 --- a/get_all_gtid_of_binlog_dir_test.go +++ b/get_all_gtid_of_binlog_dir_test.go @@ -1,15 +1,13 @@ package binlog -import ( - "testing" -) +import "testing" func TestGetAllGtidOfBinlogDir(t *testing.T) { desc, err := GetAllGtidOfBinlogDir("./test", "mysql-bin56") if nil != err { t.Fatalf("unexpected error %v", err) } - if "7E23401AC60311E38E135E10E6A05CFB:1-6,8186FC1EC5FF11E38DF9E66CCF50DB66:1-11,A6CE328CC60211E38E0DE66CCF50DB66:1-6,B7009920C60111E38E075E10E6A05CFB:1-6,F60AB33CC60411E38E1CE66CCF50DB66:1-136" != desc { + if "7E23401AC60311E38E135E10E6A05CFB:1-6,8186FC1EC5FF11E38DF9E66CCF50DB66:1-11,A6CE328CC60211E38E0DE66CCF50DB66:1-6,B7009920C60111E38E075E10E6A05CFB:1-6,F60AB33CC60411E38E1CE66CCF50DB66:1-136" != normalizeGtidForTest(desc) { t.Fatalf("wrong gtid %v", desc) } } diff --git a/get_gtid_of_binlog_test.go b/get_gtid_of_binlog_test.go index 39612aa..05c84d9 100644 --- a/get_gtid_of_binlog_test.go +++ b/get_gtid_of_binlog_test.go @@ -1,6 +1,7 @@ package binlog import ( + "strings" "testing" ) @@ -9,7 +10,21 @@ func TestGetGtidOfBinlog(t *testing.T) { if nil != err { t.Fatalf("unexpected error %v", err) } - if "F60AB33CC60411E38E1CE66CCF50DB66:1-136" != desc { + if "F60AB33CC60411E38E1CE66CCF50DB66:1-136" != normalizeGtidForTest(desc) { t.Fatalf("wrong gtid %v", desc) } } + +func normalizeGtidForTest(desc string) string { + parts := strings.Split(desc, ",") + for i, part := range parts { + gtidParts := strings.SplitN(part, ":", 2) + if len(gtidParts) != 2 { + parts[i] = strings.ToUpper(part) + continue + } + uuid := strings.ReplaceAll(strings.ToUpper(gtidParts[0]), "-", "") + parts[i] = uuid + ":" + strings.ToUpper(gtidParts[1]) + } + return strings.Join(parts, ",") +} diff --git a/parse_event_convert.go b/parse_event_convert.go index 47d68a4..748b30d 100644 --- a/parse_event_convert.go +++ b/parse_event_convert.go @@ -9,13 +9,21 @@ import ( ) func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent { + if ev == nil || ev.Header == nil { + return nil + } + var buf [1]BinlogEvent sig := &buf[0] + fillEventHeader(sig, ev) switch ev.Header.EventType { case replication.ANONYMOUS_GTID_EVENT: sig.Data = "anonymous-gtid-event:1" sig.Type = "gtid" + if ge, ok := ev.Event.(*replication.GTIDEvent); ok { + fillGTIDMetadata(sig, ge) + } case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2: wrEvent, ok := ev.Event.(*replication.RowsEvent) @@ -102,6 +110,7 @@ func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent { sig.Data = gid.String() } sig.Type = "gtid" + fillGTIDMetadata(sig, ge) case replication.TRANSACTION_PAYLOAD_EVENT: ge, ok := ev.Event.(*replication.TransactionPayloadEvent) @@ -123,6 +132,28 @@ func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent { return buf[:] } +func fillEventHeader(sig *BinlogEvent, ev *replication.BinlogEvent) { + if sig == nil || ev == nil || ev.Header == nil { + return + } + sig.EventType = byte(ev.Header.EventType) + sig.ServerID = ev.Header.ServerID + sig.Timestamp = ev.Header.Timestamp + sig.LogPos = ev.Header.LogPos + sig.EventSize = ev.Header.EventSize +} + +func fillGTIDMetadata(sig *BinlogEvent, ge *replication.GTIDEvent) { + if sig == nil || ge == nil { + return + } + sig.LastCommitted = ge.LastCommitted + sig.SequenceNumber = ge.SequenceNumber + sig.TransactionLength = ge.TransactionLength + sig.ImmediateCommitTimestamp = ge.ImmediateCommitTimestamp + sig.OriginalCommitTimestamp = ge.OriginalCommitTimestamp +} + func normalizeRowsByUnsigned(wrEvent *replication.RowsEvent) [][]interface{} { if wrEvent == nil || wrEvent.Table == nil || len(wrEvent.Rows) == 0 { if wrEvent == nil { diff --git a/parse_event_convert_payload_test.go b/parse_event_convert_payload_test.go index 0a8bb71..787b628 100644 --- a/parse_event_convert_payload_test.go +++ b/parse_event_convert_payload_test.go @@ -71,3 +71,51 @@ func TestParseBinlogEvent_TransactionPayloadContainsTableMap(t *testing.T) { t.Fatalf("expected payload events to carry compression type, got %q/%q", events[0].CompressionType, events[1].CompressionType) } } + +func TestParseBinlogEvent_NilInput(t *testing.T) { + if got := ParseBinlogEvent(nil); got != nil { + t.Fatalf("expected nil for nil event, got %#v", got) + } + if got := ParseBinlogEvent(&replication.BinlogEvent{}); got != nil { + t.Fatalf("expected nil for missing header, got %#v", got) + } +} + +func TestParseBinlogEvent_GTIDMetadata(t *testing.T) { + ev := &replication.BinlogEvent{ + Header: &replication.EventHeader{ + EventType: replication.GTID_EVENT, + ServerID: 12, + Timestamp: 123456, + LogPos: 456, + EventSize: 64, + }, + Event: &replication.GTIDEvent{ + SID: []byte{0x74, 0xde, 0xc5, 0xa0, 0x3a, 0xc7, 0x11, 0xf0, 0xba, 0x0c, 0xfa, 0x16, 0x3e, 0xea, 0x29, 0x9f}, + GNO: 42, + LastCommitted: 40, + SequenceNumber: 42, + TransactionLength: 2048, + ImmediateCommitTimestamp: 1700000000000001, + OriginalCommitTimestamp: 1700000000000000, + }, + } + + events := ParseBinlogEvent(ev) + if len(events) != 1 { + t.Fatalf("expected 1 event, got %d", len(events)) + } + got := events[0] + if got.Type != "gtid" { + t.Fatalf("expected gtid event, got %q", got.Type) + } + if got.LastCommitted != 40 || got.SequenceNumber != 42 || got.TransactionLength != 2048 { + t.Fatalf("logical metadata mismatch: last=%d seq=%d len=%d", got.LastCommitted, got.SequenceNumber, got.TransactionLength) + } + if got.ImmediateCommitTimestamp != 1700000000000001 || got.OriginalCommitTimestamp != 1700000000000000 { + t.Fatalf("commit timestamps mismatch: immediate=%d original=%d", got.ImmediateCommitTimestamp, got.OriginalCommitTimestamp) + } + if got.ServerID != 12 || got.Timestamp != 123456 || got.LogPos != 456 || got.EventSize != 64 { + t.Fatalf("header metadata mismatch: %#v", got) + } +} diff --git a/parse_filter.go b/parse_filter.go index b2479a0..7e0124c 100644 --- a/parse_filter.go +++ b/parse_filter.go @@ -2,6 +2,7 @@ package binlog import ( "bufio" + "context" "fmt" "io" "os" @@ -13,6 +14,14 @@ import ( ) func ParseBinlogWithFilter(path string, pos int64, filter BinlogFilter, fx func(Transaction) bool) error { + return ParseBinlogWithOptions(path, ParseOptions{ + StartPos: pos, + Filter: filter, + }, fx) +} + +func ParseBinlogWithOptions(path string, opts ParseOptions, fx func(Transaction) bool) error { + filter := opts.Filter if hasConfiguredTablePatterns(filter.IncludeTables) && hasConfiguredTablePatterns(filter.ExcludeTables) { return fmt.Errorf("invalid filter: include-tables and exclude-tables cannot be set at the same time") } @@ -31,8 +40,8 @@ func ParseBinlogWithFilter(path string, pos int64, filter BinlogFilter, fx func( parser.SetParseTime(false) parser.SetUseDecimal(false) - if pos != 0 { - if err := seekToPosition(f, parser, pos); err != nil { + if opts.StartPos != 0 { + if err := seekToPosition(f, parser, opts.StartPos); err != nil { return err } } else { @@ -41,14 +50,23 @@ func ParseBinlogWithFilter(path string, pos int64, filter BinlogFilter, fx func( } } + fileSize := int64(0) + if info, err := f.Stat(); err == nil { + fileSize = info.Size() + } br := bufio.NewReaderSize(f, defaultReadBufSize) - return parseBinlogWithFilter(br, parser, filter, fx) + return parseBinlogWithFilter(br, parser, path, fileSize, opts, fx) } -func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter BinlogFilter, fn func(Transaction) bool) error { +func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, path string, fileSize int64, opts ParseOptions, fn func(Transaction) bool) error { + filter := opts.Filter if hasConfiguredTablePatterns(filter.IncludeTables) && hasConfiguredTablePatterns(filter.ExcludeTables) { return fmt.Errorf("invalid filter: include-tables and exclude-tables cannot be set at the same time") } + ctx := opts.Context + if ctx == nil { + ctx = context.Background() + } var subGtid, inGtid, exGtid *gtid.Gtid var err error @@ -108,6 +126,9 @@ func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter if !filter.OnlyShowGtid && filter.OnlyShowDML && tx.dmlEventCount == 0 { return true } + if includeMatcher == nil && excludeMatcher == nil { + return fn(tx) + } txs, matched, pickAll, skipAll := selectVisibleTxDetails(tx, includeMatcher, excludeMatcher, filter) if pickAll { @@ -129,8 +150,46 @@ func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter } return fn(tx) } + observeProgress := func(h *replication.EventHeader, evs []BinlogEvent) bool { + if opts.OnProgress == nil || h == nil { + return true + } + eventPos := int64(h.LogPos - h.EventSize) + nextPos := int64(h.LogPos) + if len(evs) == 0 { + ev := BinlogEvent{ + EventType: byte(h.EventType), + ServerID: h.ServerID, + Timestamp: h.Timestamp, + LogPos: h.LogPos, + EventSize: h.EventSize, + } + return opts.OnProgress(ParseProgress{ + Path: path, + Event: ev, + EventPos: eventPos, + NextPos: nextPos, + FileSize: fileSize, + }) + } + for _, ev := range evs { + if !opts.OnProgress(ParseProgress{ + Path: path, + Event: ev, + EventPos: eventPos, + NextPos: nextPos, + FileSize: fileSize, + }) { + return false + } + } + return true + } for { + if err := ctx.Err(); err != nil { + return err + } h, err := readEventHeader(r, parser, headBuf) if err == io.EOF { if currentGtid != "" { @@ -143,7 +202,6 @@ func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter return err } - // GTID-only fast path if filter.OnlyShowGtid { if h.EventType != replication.GTID_EVENT && h.EventType != replication.ANONYMOUS_GTID_EVENT { if h.EventType == replication.FORMAT_DESCRIPTION_EVENT || @@ -152,13 +210,21 @@ func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter if err != nil { return err } - if _, err = parseEvent(parser, h, headBuf, body); err != nil { + e, err := parseEvent(parser, h, headBuf, body) + if err != nil { return err } + evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e}) + if !observeProgress(h, evs) { + return nil + } } else { if err := skipEventBody(r, h); err != nil { return err } + if !observeProgress(h, nil) { + return nil + } } continue } @@ -172,6 +238,9 @@ func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter } evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e}) + if !observeProgress(h, evs) { + return nil + } for _, ev := range evs { if ev.Type != "gtid" { continue @@ -217,10 +286,15 @@ func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter } tx = Transaction{ - GTID: ev.Data, - StartPos: startPos, - EndPos: startPos, - Timestamp: int64(h.Timestamp), + GTID: ev.Data, + StartPos: startPos, + EndPos: startPos, + Timestamp: int64(h.Timestamp), + LastCommitted: ev.LastCommitted, + SequenceNumber: ev.SequenceNumber, + TransactionLength: ev.TransactionLength, + ImmediateCommitTimestamp: ev.ImmediateCommitTimestamp, + OriginalCommitTimestamp: ev.OriginalCommitTimestamp, } } continue @@ -238,6 +312,9 @@ func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter } evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e}) + if !observeProgress(h, evs) { + return nil + } for _, ev := range evs { if ev.Type != "gtid" { continue @@ -279,14 +356,20 @@ func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter if !skipCurrentTxn { tx = Transaction{ - GTID: ev.Data, - StartPos: startPos, - Timestamp: int64(h.Timestamp), - Txs: make([]TxDetail, 0, 8), - sqlOrigin: make([]string, 0, 4), + GTID: ev.Data, + StartPos: startPos, + Timestamp: int64(h.Timestamp), + LastCommitted: ev.LastCommitted, + SequenceNumber: ev.SequenceNumber, + TransactionLength: ev.TransactionLength, + ImmediateCommitTimestamp: ev.ImmediateCommitTimestamp, + OriginalCommitTimestamp: ev.OriginalCommitTimestamp, + Txs: make([]TxDetail, 0, 8), + sqlOrigin: make([]string, 0, 4), } } else { tx = Transaction{} + currentGtid = "" } } continue @@ -301,13 +384,21 @@ func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter if err != nil { return err } - if _, err = parseEvent(parser, h, headBuf, body); err != nil { + e, err := parseEvent(parser, h, headBuf, body) + if err != nil { return err } + evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e}) + if !observeProgress(h, evs) { + return nil + } } else { if err := skipEventBody(r, h); err != nil { return err } + if !observeProgress(h, nil) { + return nil + } } continue } @@ -326,6 +417,9 @@ func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter } evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e}) + if !observeProgress(h, evs) { + return nil + } for _, ev := range evs { startPos := 0 if ev.Type == "query" || ev.Type == "gtid" { @@ -335,6 +429,32 @@ func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter } switch ev.Type { + case "gtid": + if currentGtid != "" { + finalizeTx(&tx, false) + if !callFn(tx) { + return nil + } + if subGtid != nil { + if err := subGtid.Sub(tx.GTID); err == nil && subGtid.EventCount() == 0 { + return nil + } + } + } + currentGtid = ev.Data + tx = Transaction{ + GTID: ev.Data, + StartPos: startPos, + Timestamp: int64(h.Timestamp), + LastCommitted: ev.LastCommitted, + SequenceNumber: ev.SequenceNumber, + TransactionLength: ev.TransactionLength, + ImmediateCommitTimestamp: ev.ImmediateCommitTimestamp, + OriginalCommitTimestamp: ev.OriginalCommitTimestamp, + Txs: make([]TxDetail, 0, 8), + sqlOrigin: make([]string, 0, 4), + } + case "": tx.EndPos = int(h.LogPos) diff --git a/parse_options_test.go b/parse_options_test.go new file mode 100644 index 0000000..e00d83c --- /dev/null +++ b/parse_options_test.go @@ -0,0 +1,57 @@ +package binlog + +import ( + "context" + "encoding/json" + "errors" + "strings" + "testing" +) + +func TestParseBinlogWithOptions_ProgressAndStop(t *testing.T) { + t.Skip("skips large-binlog integration test; this standalone module must not depend on external sample files") +} + +func TestParseBinlogWithOptions_ContextCancel(t *testing.T) { + path := "./test/mysql-bin56.000003" + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + err := ParseBinlogWithOptions(path, ParseOptions{Context: ctx}, func(Transaction) bool { + t.Fatal("transaction callback should not run after context cancellation") + return false + }) + if !errors.Is(err, context.Canceled) { + t.Fatalf("expected context.Canceled, got %v", err) + } +} + +func TestParseBinlogWithOptions_ParseBinSample(t *testing.T) { + t.Skip("skips large-binlog integration test; this standalone module must not depend on external sample files") +} + +func TestParseBinlogWithOptions_LargeBinProgressStop(t *testing.T) { + t.Skip("skips large-binlog integration test; this standalone module must not depend on external sample files") +} + +func TestParseBinlogWithOptions_ExcludeAllGTIDDoesNotEmitEmptyTransaction(t *testing.T) { + t.Skip("skips large-binlog integration test; this standalone module must not depend on external sample files") +} + +func TestTransactionJSONPreservesZeroLogicalClockValues(t *testing.T) { + raw, err := json.Marshal(Transaction{ + GTID: "uuid:1", + LastCommitted: 0, + SequenceNumber: 1, + }) + if err != nil { + t.Fatalf("marshal transaction failed: %v", err) + } + doc := string(raw) + if !strings.Contains(doc, `"lastCommitted":0`) { + t.Fatalf("expected zero lastCommitted to be present, got %s", doc) + } + if !strings.Contains(doc, `"sequenceNumber":1`) { + t.Fatalf("expected sequenceNumber to be present, got %s", doc) + } +} diff --git a/parse_stream.go b/parse_stream.go index 7e13581..0868b14 100644 --- a/parse_stream.go +++ b/parse_stream.go @@ -1,35 +1,11 @@ package binlog import ( - "bufio" - "io" - "os" "time" - - "b612.me/staros" - "github.com/starainrt/go-mysql/replication" ) func ParseBinlogFile(path string, fx func(transaction Transaction) bool) error { - return parseOneBinlog(path, fx) -} - -func parseOneBinlog(path string, fx func(Transaction) bool) error { - if !staros.Exists(path) { - return os.ErrNotExist - } - f, err := os.Open(path) - if err != nil { - return err - } - defer f.Close() - - if err := validateBinlogHeader(f); err != nil { - return err - } - - br := bufio.NewReaderSize(f, defaultReadBufSize) - return parseBinlogDetail(br, fx) + return ParseBinlogWithOptions(path, ParseOptions{}, fx) } func finalizeTx(tx *Transaction, onlyShowGtid bool) { @@ -60,117 +36,3 @@ func fillTimeLazy(tx *Transaction) { } } } - -func parseBinlogDetail(r io.Reader, f func(Transaction) bool) error { - parser := replication.NewBinlogParser() - parser.SetParseTime(false) - parser.SetUseDecimal(false) - - var ( - tbMapPos uint32 - tx Transaction - headBuf = make([]byte, replication.EventHeaderSize) - ) - currentGtid := "" - - for { - h, err := readEventHeader(r, parser, headBuf) - if err == io.EOF { - if currentGtid != "" { - finalizeTx(&tx, false) - fillTimeLazy(&tx) - if f != nil { - f(tx) - } - } - return nil - } - if err != nil { - return err - } - - body, err := readEventBody(r, h) - if err != nil { - return err - } - - e, err := parseEvent(parser, h, headBuf, body) - if err != nil { - return err - } - - if h.EventType == replication.TABLE_MAP_EVENT { - tbMapPos = h.LogPos - h.EventSize - } - - evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e}) - for _, ev := range evs { - startPos := 0 - if ev.Type == "query" || ev.Type == "gtid" { - startPos = int(h.LogPos - h.EventSize) - } else { - startPos = int(tbMapPos) - } - - switch ev.Type { - case "gtid": - if currentGtid != "" { - finalizeTx(&tx, false) - fillTimeLazy(&tx) - if f != nil && !f(tx) { - return nil - } - } - currentGtid = ev.Data - tx = Transaction{ - GTID: ev.Data, - StartPos: startPos, - Timestamp: int64(h.Timestamp), - Txs: make([]TxDetail, 0, 8), - sqlOrigin: make([]string, 0, 4), - } - case "": - tx.EndPos = int(h.LogPos) - case "tablemap": - tx.EndPos = int(h.LogPos) - tbMapPos = h.LogPos - h.EventSize - case "rowsquery": - tx.EndPos = int(h.LogPos) - tx.sqlOrigin = append(tx.sqlOrigin, ev.Data) - default: - tx.EndPos = int(h.LogPos) - if ev.Type == "query" { - if equalFoldShort(ev.Data, "begin") { - if tx.TxStartTime == 0 { - tx.TxStartTime = int64(h.Timestamp) - } - tx.Status = STATUS_BEGIN - } else if equalFoldShort(ev.Data, "commit") { - tx.Status = STATUS_COMMIT - tx.TxEndTime = int64(h.Timestamp) - } else if equalFoldShort(ev.Data, "rollback") { - tx.Status = STATUS_ROLLBACK - tx.TxEndTime = int64(h.Timestamp) - } - } - if ev.DB != "" && ev.TB != "" { - tx.dmlEventCount++ - } - tx.Txs = append(tx.Txs, TxDetail{ - StartPos: startPos, - EndPos: int(h.LogPos), - Db: ev.DB, - Table: ev.TB, - Sql: ev.Data, - SqlType: ev.Type, - Rows: ev.Rows, - ColumnTypes: ev.ColumnTypes, - ColumnCollationIDs: ev.ColumnCollationIDs, - RowCount: int(ev.RowCnt), - Timestamp: int64(h.Timestamp), - CompressionType: ev.CompressionType, - }) - } - } - } -} diff --git a/parse_types.go b/parse_types.go index cdedc60..b6a2750 100644 --- a/parse_types.go +++ b/parse_types.go @@ -1,6 +1,7 @@ package binlog import ( + "context" "errors" "strings" "time" @@ -45,19 +46,24 @@ const ( ) type Transaction struct { - GTID string `json:"gtid"` - Timestamp int64 `json:"timestamp"` - Time time.Time `json:"time"` - StartPos int `json:"startPos"` - EndPos int `json:"endPos"` - Size int `json:"size"` - RowsCount int `json:"rowsCount"` - Status uint8 `json:"status"` - TxStartTime int64 `json:"txStartTime"` - TxEndTime int64 `json:"txEndTime"` - sqlOrigin []string `json:"sqlOrigin"` - Txs []TxDetail `json:"txs"` - dmlEventCount int + GTID string `json:"gtid"` + Timestamp int64 `json:"timestamp"` + Time time.Time `json:"time"` + StartPos int `json:"startPos"` + EndPos int `json:"endPos"` + Size int `json:"size"` + RowsCount int `json:"rowsCount"` + Status uint8 `json:"status"` + TxStartTime int64 `json:"txStartTime"` + TxEndTime int64 `json:"txEndTime"` + LastCommitted int64 `json:"lastCommitted"` + SequenceNumber int64 `json:"sequenceNumber"` + TransactionLength uint64 `json:"transactionLength,omitempty"` + ImmediateCommitTimestamp uint64 `json:"immediateCommitTimestamp,omitempty"` + OriginalCommitTimestamp uint64 `json:"originalCommitTimestamp,omitempty"` + sqlOrigin []string `json:"sqlOrigin"` + Txs []TxDetail `json:"txs"` + dmlEventCount int } func (t Transaction) GetSqlOrigin() []string { @@ -83,15 +89,40 @@ type BinlogFilter struct { } type BinlogEvent struct { - Type string - DB string - TB string - Data string - RowCnt uint32 - Rows [][]interface{} - ColumnTypes []int - ColumnCollationIDs []uint64 - CompressionType string + Type string + DB string + TB string + Data string + RowCnt uint32 + Rows [][]interface{} + ColumnTypes []int + ColumnCollationIDs []uint64 + CompressionType string + EventType byte + ServerID uint32 + Timestamp uint32 + LogPos uint32 + EventSize uint32 + LastCommitted int64 + SequenceNumber int64 + TransactionLength uint64 + ImmediateCommitTimestamp uint64 + OriginalCommitTimestamp uint64 +} + +type ParseProgress struct { + Path string + Event BinlogEvent + EventPos int64 + NextPos int64 + FileSize int64 +} + +type ParseOptions struct { + Context context.Context + Filter BinlogFilter + StartPos int64 + OnProgress func(ParseProgress) bool } type tableMatcher struct { diff --git a/summary.go b/summary.go new file mode 100644 index 0000000..b97ecf1 --- /dev/null +++ b/summary.go @@ -0,0 +1,239 @@ +package binlog + +import ( + "strings" + "time" +) + +type TransactionOutcome string + +const ( + TransactionOutcomeCommit TransactionOutcome = "commit" + TransactionOutcomeRollback TransactionOutcome = "rollback" + TransactionOutcomeAutocommit TransactionOutcome = "autocommit" + TransactionOutcomeOpen TransactionOutcome = "open" + TransactionOutcomeOther TransactionOutcome = "other" +) + +type TransactionSummary struct { + GTID string + SeenTime time.Time + LastEventTime time.Time + BeginTime time.Time + EndTime time.Time + Duration time.Duration + HasBeginBoundary bool + HasEndBoundary bool + HasExplicitDuration bool + HasDuration bool + Outcome TransactionOutcome + StartPos int + EndPos int + Size int + RowsCount int + StatementsCount int + Tables []string + SampleSQL string + LastCommitted int64 + SequenceNumber int64 + TransactionLength uint64 + ImmediateCommitTimestamp uint64 + OriginalCommitTimestamp uint64 +} + +func SummarizeTransaction(tx Transaction) TransactionSummary { + fillTimeLazy(&tx) + s := TransactionSummary{ + GTID: strings.TrimSpace(tx.GTID), + SeenTime: tx.Time, + LastEventTime: tx.Time, + StartPos: tx.StartPos, + EndPos: tx.EndPos, + Size: tx.Size, + RowsCount: tx.RowsCount, + LastCommitted: tx.LastCommitted, + SequenceNumber: tx.SequenceNumber, + TransactionLength: tx.TransactionLength, + ImmediateCommitTimestamp: tx.ImmediateCommitTimestamp, + OriginalCommitTimestamp: tx.OriginalCommitTimestamp, + } + + tableSeen := make(map[string]struct{}, 8) + hasBegin := false + hasCommit := false + hasRollback := false + hasNonBoundary := false + + for _, detail := range tx.Txs { + if detail.Time.IsZero() && detail.Timestamp != 0 { + detail.Time = time.Unix(detail.Timestamp, 0) + } + if !detail.Time.IsZero() { + s.LastEventTime = maxSummaryTime(s.LastEventTime, detail.Time) + switch boundaryKind(detail.Sql) { + case "begin": + s.BeginTime = minSummaryTime(s.BeginTime, detail.Time) + s.HasBeginBoundary = true + case "commit", "rollback": + s.EndTime = maxSummaryTime(s.EndTime, detail.Time) + s.HasEndBoundary = true + } + } + s.RowsCount += detail.RowCount + if strings.TrimSpace(detail.Sql) != "" && !isBoundaryDetail(detail) { + s.StatementsCount++ + if s.SampleSQL == "" { + s.SampleSQL = compactSampleSQL(detail.Sql) + } + } + tableKey := summaryTableKey(detail.Db, detail.Table) + if tableKey != "" { + if _, ok := tableSeen[tableKey]; !ok { + tableSeen[tableKey] = struct{}{} + s.Tables = append(s.Tables, tableKey) + } + } + switch boundaryKind(detail.Sql) { + case "begin": + hasBegin = true + case "commit": + hasCommit = true + case "rollback": + hasRollback = true + default: + if !strings.EqualFold(strings.TrimSpace(detail.SqlType), "query") || strings.TrimSpace(detail.Sql) != "" { + hasNonBoundary = true + } + } + } + if tx.RowsCount > 0 { + s.RowsCount = tx.RowsCount + } + if s.SeenTime.IsZero() { + s.SeenTime = firstDetailTime(tx) + } + if s.BeginTime.IsZero() { + s.BeginTime = firstNonZeroSummaryTime(firstDetailTime(tx), s.SeenTime) + } + if s.EndTime.IsZero() { + s.EndTime = firstNonZeroSummaryTime(s.LastEventTime, s.BeginTime, s.SeenTime) + } + if !s.BeginTime.IsZero() && !s.EndTime.IsZero() && !s.EndTime.Before(s.BeginTime) { + s.Duration = s.EndTime.Sub(s.BeginTime) + s.HasDuration = true + s.HasExplicitDuration = s.HasBeginBoundary && s.HasEndBoundary + } + s.Outcome = summarizeOutcome(tx, hasBegin, hasCommit, hasRollback, hasNonBoundary) + return s +} + +func summarizeOutcome(tx Transaction, hasBegin bool, hasCommit bool, hasRollback bool, hasNonBoundary bool) TransactionOutcome { + switch { + case hasCommit: + return TransactionOutcomeCommit + case hasRollback: + return TransactionOutcomeRollback + case hasBegin: + return TransactionOutcomeOpen + } + switch tx.Status { + case STATUS_COMMIT: + return TransactionOutcomeCommit + case STATUS_ROLLBACK: + return TransactionOutcomeRollback + case STATUS_BEGIN: + return TransactionOutcomeOpen + case STATUS_PREPARE: + if hasNonBoundary { + return TransactionOutcomeAutocommit + } + } + if hasNonBoundary { + return TransactionOutcomeAutocommit + } + return TransactionOutcomeOther +} + +func isBoundaryDetail(detail TxDetail) bool { + if !strings.EqualFold(strings.TrimSpace(detail.SqlType), "query") { + return false + } + return boundaryKind(detail.Sql) != "" +} + +func boundaryKind(sql string) string { + switch strings.ToLower(strings.TrimSpace(sql)) { + case "begin": + return "begin" + case "commit": + return "commit" + case "rollback": + return "rollback" + default: + return "" + } +} + +func summaryTableKey(db string, table string) string { + db = strings.ToLower(strings.TrimSpace(db)) + table = strings.ToLower(strings.TrimSpace(table)) + if db == "" || table == "" { + return "" + } + return db + "." + table +} + +func compactSampleSQL(sql string) string { + sql = strings.Join(strings.Fields(sql), " ") + if len(sql) <= 200 { + return sql + } + return sql[:197] + "..." +} + +func firstDetailTime(tx Transaction) time.Time { + var ret time.Time + for _, detail := range tx.Txs { + t := detail.Time + if t.IsZero() && detail.Timestamp != 0 { + t = time.Unix(detail.Timestamp, 0) + } + ret = minSummaryTime(ret, t) + } + return ret +} + +func minSummaryTime(a time.Time, b time.Time) time.Time { + if a.IsZero() { + return b + } + if b.IsZero() { + return a + } + if b.Before(a) { + return b + } + return a +} + +func maxSummaryTime(a time.Time, b time.Time) time.Time { + if a.IsZero() { + return b + } + if b.IsZero() { + return a + } + if b.After(a) { + return b + } + return a +} + +func firstNonZeroSummaryTime(items ...time.Time) time.Time { + for _, item := range items { + if !item.IsZero() { + return item + } + } + return time.Time{} +} diff --git a/summary_test.go b/summary_test.go new file mode 100644 index 0000000..1f4f819 --- /dev/null +++ b/summary_test.go @@ -0,0 +1,66 @@ +package binlog + +import ( + "testing" + "time" +) + +func TestSummarizeTransaction(t *testing.T) { + tx := Transaction{ + GTID: "uuid:1", + Timestamp: 100, + StartPos: 120, + EndPos: 240, + Size: 120, + LastCommitted: 98, + SequenceNumber: 100, + TransactionLength: 4096, + Txs: []TxDetail{ + {SqlType: "query", Sql: "BEGIN", Timestamp: 100}, + {SqlType: "insert", Db: "Shop", Table: "Orders", Sql: "insert into orders values (1)", RowCount: 2, Timestamp: 101}, + {SqlType: "delete", Db: "shop", Table: "orders", Sql: "delete from orders where id = 2", RowCount: 1, Timestamp: 102}, + {SqlType: "query", Sql: "COMMIT", Timestamp: 103}, + }, + } + + got := SummarizeTransaction(tx) + if got.Outcome != TransactionOutcomeCommit { + t.Fatalf("unexpected outcome: %s", got.Outcome) + } + if !got.HasBeginBoundary || !got.HasEndBoundary || !got.HasExplicitDuration || !got.HasDuration { + t.Fatalf("expected explicit duration markers: %#v", got) + } + if got.Duration != 3*time.Second { + t.Fatalf("unexpected duration: %s", got.Duration) + } + if got.RowsCount != 3 || got.StatementsCount != 2 { + t.Fatalf("unexpected rows/statements: rows=%d stmts=%d", got.RowsCount, got.StatementsCount) + } + if len(got.Tables) != 1 || got.Tables[0] != "shop.orders" { + t.Fatalf("unexpected tables: %#v", got.Tables) + } + if got.SampleSQL != "insert into orders values (1)" { + t.Fatalf("unexpected sample sql: %q", got.SampleSQL) + } + if got.LastCommitted != 98 || got.SequenceNumber != 100 || got.TransactionLength != 4096 { + t.Fatalf("logical metadata not preserved: %#v", got) + } +} + +func TestSummarizeTransaction_Autocommit(t *testing.T) { + tx := Transaction{ + GTID: "uuid:2", + Timestamp: 100, + Txs: []TxDetail{ + {SqlType: "insert", Db: "db", Table: "tb", Sql: "insert into tb values (1)", RowCount: 1, Timestamp: 100}, + }, + } + + got := SummarizeTransaction(tx) + if got.Outcome != TransactionOutcomeAutocommit { + t.Fatalf("unexpected outcome: %s", got.Outcome) + } + if got.SeenTime.IsZero() || got.BeginTime.IsZero() || got.EndTime.IsZero() { + t.Fatalf("expected lazy times to be filled: %#v", got) + } +}