From 8469c11373d53fce179bbd258abc9ed2a92ef012 Mon Sep 17 00:00:00 2001 From: starainrt Date: Thu, 19 Mar 2026 17:04:35 +0800 Subject: [PATCH] =?UTF-8?q?refactor(parse):=20=E6=8B=86=E5=88=86=20parse.g?= =?UTF-8?q?o=20=E5=B9=B6=E4=BF=AE=E5=A4=8D=E4=BA=8B=E5=8A=A1/=E8=BF=87?= =?UTF-8?q?=E6=BB=A4=E4=B8=80=E8=87=B4=E6=80=A7=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 将臃肿的 parse.go 按职责拆分为多个模块: parse_types.go、parse_io.go、parse_event_convert.go、parse_stream.go、parse_filter.go - parse.go 保留为模块入口说明,提升可维护性与可读性 - 修复事务状态被覆盖问题(BEGIN/COMMIT/ROLLBACK 不再被重置为 PREPARE) - 增加 include-tables 与 exclude-tables 互斥校验,同时配置时直接报配置错误 - 强化表匹配器模式校验,并补充非法模式测试 - 在明细过滤后重算事务统计(RowsCount/StartPos/EndPos/Size),避免统计失真 - 增加 TABLE_MAP 事件转换,补充列元信息透传(ColumnTypes/ColumnCollationIDs) - 基于 unsigned 元数据规范化行值,避免无符号整型被渲染为负数 - 优化事件解析报错信息:增加有界 body 十六进制预览 - 补充单元测试:payload/tablemap 转换、unsigned 规范化、过滤逻辑、IO 预览 --- parse.go | 968 +-------------------------- parse_event_convert.go | 257 +++++++ parse_event_convert_payload_test.go | 73 ++ parse_event_convert_unsigned_test.go | 95 +++ parse_filter.go | 572 ++++++++++++++++ parse_filter_table_test.go | 85 +++ parse_io.go | 150 +++++ parse_io_test.go | 33 + parse_stream.go | 176 +++++ parse_types.go | 126 ++++ 10 files changed, 1573 insertions(+), 962 deletions(-) create mode 100644 parse_event_convert.go create mode 100644 parse_event_convert_payload_test.go create mode 100644 parse_event_convert_unsigned_test.go create mode 100644 parse_filter.go create mode 100644 parse_filter_table_test.go create mode 100644 parse_io.go create mode 100644 parse_io_test.go create mode 100644 parse_stream.go create mode 100644 parse_types.go diff --git a/parse.go b/parse.go index 4e070eb..edaee1a 100644 --- a/parse.go +++ b/parse.go @@ -1,964 +1,8 @@ package binlog -import ( - "bufio" - "bytes" - "errors" - "fmt" - "io" - "os" - "strings" - "sync" - "time" - - "b612.me/mysql/gtid" - "b612.me/staros" - "github.com/starainrt/go-mysql/replication" -) - -var ( - ErrInvalidBinlogHeader = errors.New("invalid binlog file header") - ErrEventTooSmall = errors.New("event size too small") -) - -const ( - CompressionNone uint64 = 255 - CompressionZSTD uint64 = 0 -) - -const ( - maxPooledRawDataCap = 4 << 20 // 4MB - defaultReadBufSize = 1 << 20 // 1MB -) - -type TxDetail struct { - StartPos int `json:"startPos"` - EndPos int `json:"endPos"` - RowCount int `json:"rowCount"` - Timestamp int64 `json:"timestamp"` - Time time.Time `json:"time"` - Sql string `json:"sql"` - Db string `json:"db"` - Table string `json:"table"` - SqlType string `json:"sqlType"` - CompressionType string `json:"compressionType"` - Rows [][]interface{} `json:"rows"` -} - -const ( - STATUS_PREPARE uint8 = iota - STATUS_BEGIN - STATUS_COMMIT - STATUS_ROLLBACK -) - -type Transaction struct { - GTID string `json:"gtid"` - Timestamp int64 `json:"timestamp"` - Time time.Time `json:"time"` - StartPos int `json:"startPos"` - EndPos int `json:"endPos"` - Size int `json:"size"` - RowsCount int `json:"rowsCount"` - Status uint8 `json:"status"` - TxStartTime int64 `json:"txStartTime"` - TxEndTime int64 `json:"txEndTime"` - sqlOrigin []string `json:"sqlOrigin"` - Txs []TxDetail `json:"txs"` - dmlEventCount int -} - -func (t Transaction) GetSqlOrigin() []string { - return t.sqlOrigin -} - -type BinlogFilter struct { - IncludeGtid string - ExcludeGtid string - IncludeTables []string - ExcludeTables []string - StartPos int - EndPos int - StartDate time.Time - EndDate time.Time - BigThan int - SmallThan int - OnlyShowGtid bool - OnlyShowDML bool - PickTxAllIfMatch bool - ExcludeBlank bool - IncludeBlank bool -} - -type BinlogEvent struct { - Type string - DB string - TB string - Data string - RowCnt uint32 - Rows [][]interface{} - CompressionType string -} - -type tableMatcher struct { - exactMatch map[string]bool - dbWildcard map[string]bool - tbWildcard map[string]bool - matchAll bool -} - -func (m *tableMatcher) match(db, tb string) bool { - if m.matchAll { - return true - } - if m.dbWildcard[db] || m.tbWildcard[tb] { - return true - } - if len(m.exactMatch) > 0 { - // Go 1.12+ 对 map[string] 查找时 string([]byte) 不分配 - var buf [128]byte - key := buf[:0] - key = append(key, db...) - key = append(key, '.') - key = append(key, tb...) - if m.exactMatch[string(key)] { - return true - } - } - return false -} - -func ParseBinlogFile(path string, fx func(transaction Transaction) bool) error { - return parseOneBinlog(path, fx) -} - -func parseOneBinlog(path string, fx func(Transaction) bool) error { - if !staros.Exists(path) { - return os.ErrNotExist - } - f, err := os.Open(path) - if err != nil { - return err - } - defer f.Close() - - if err := validateBinlogHeader(f); err != nil { - return err - } - - br := bufio.NewReaderSize(f, defaultReadBufSize) - return parseBinlogDetail(br, fx) -} - -func validateBinlogHeader(f *os.File) error { - const fileTypeBytes = int64(4) - b := make([]byte, fileTypeBytes) - - if _, err := f.Read(b); err != nil { - return fmt.Errorf("read binlog header failed: %w", err) - } - if !bytes.Equal(b, replication.BinLogFileHeader) { - return ErrInvalidBinlogHeader - } - if _, err := f.Seek(fileTypeBytes, io.SeekStart); err != nil { - return fmt.Errorf("seek after header failed: %w", err) - } - return nil -} - -func readEventHeader(r io.Reader, parser *replication.BinlogParser, headBuf []byte) (*replication.EventHeader, error) { - if _, err := io.ReadFull(r, headBuf); err != nil { - return nil, err - } - h, err := parser.ParseHeader(headBuf) - if err != nil { - return nil, fmt.Errorf("parse header failed: %w", err) - } - if h.EventSize <= uint32(replication.EventHeaderSize) { - return nil, fmt.Errorf("%w: event size is %d", ErrEventTooSmall, h.EventSize) - } - return h, nil -} - -func readEventBody(r io.Reader, h *replication.EventHeader) ([]byte, error) { - bodyLen := int(h.EventSize) - replication.EventHeaderSize - body := make([]byte, bodyLen) - if _, err := io.ReadFull(r, body); err != nil { - return nil, fmt.Errorf("read event body failed: %w (need %d bytes)", err, bodyLen) - } - return body, nil -} - -func skipEventBody(r io.Reader, h *replication.EventHeader) error { - bodyLen := int64(h.EventSize) - int64(replication.EventHeaderSize) - if bodyLen <= 0 { - return nil - } - if _, err := io.CopyN(io.Discard, r, bodyLen); err != nil { - return fmt.Errorf("skip event body failed: %w", err) - } - return nil -} - -var rawDataPool = sync.Pool{ - New: func() any { - b := make([]byte, 0, 64*1024) - return &b - }, -} - -func getRawDataBuf(n int) []byte { - p := rawDataPool.Get().(*[]byte) - if cap(*p) < n { - return make([]byte, n) - } - return (*p)[:n] -} - -func putRawDataBuf(b []byte) { - if cap(b) > maxPooledRawDataCap { - return - } - b = b[:0] - rawDataPool.Put(&b) -} - -func parseEvent(parser *replication.BinlogParser, h *replication.EventHeader, headBuf []byte, body []byte) (replication.Event, error) { - rawLen := len(headBuf) + len(body) - rawData := getRawDataBuf(rawLen) - copy(rawData, headBuf) - copy(rawData[len(headBuf):], body) - - e, err := parser.ParseEvent(h, body, rawData) - putRawDataBuf(rawData) - if err != nil { - return nil, fmt.Errorf("parse event failed at pos %d: Header %+v, Data %q, Err: %w", - h.LogPos, h, body, err) - } - return e, nil -} - -func finalizeTx(tx *Transaction, onlyShowGtid bool) { - idx := 0 - for k, v := range tx.Txs { - if v.SqlType != "query" && len(tx.sqlOrigin) > idx { - v.Sql = tx.sqlOrigin[idx] - idx++ - } - tx.RowsCount += v.RowCount - tx.Txs[k] = v - } - - if onlyShowGtid { - tx.Size = 0 - } else { - tx.Size = tx.EndPos - tx.StartPos - } -} - -func fillTimeLazy(tx *Transaction) { - if tx.Timestamp != 0 && tx.Time.IsZero() { - tx.Time = time.Unix(tx.Timestamp, 0) - } - for i := range tx.Txs { - if tx.Txs[i].Timestamp != 0 && tx.Txs[i].Time.IsZero() { - tx.Txs[i].Time = time.Unix(tx.Txs[i].Timestamp, 0) - } - } -} - -func parseBinlogDetail(r io.Reader, f func(Transaction) bool) error { - parser := replication.NewBinlogParser() - parser.SetParseTime(false) - parser.SetUseDecimal(false) - - var ( - tbMapPos uint32 - tx Transaction - headBuf = make([]byte, replication.EventHeaderSize) - ) - currentGtid := "" - - for { - h, err := readEventHeader(r, parser, headBuf) - if err == io.EOF { - if currentGtid != "" { - finalizeTx(&tx, false) - fillTimeLazy(&tx) - if f != nil { - f(tx) - } - } - return nil - } - if err != nil { - return err - } - - body, err := readEventBody(r, h) - if err != nil { - return err - } - - e, err := parseEvent(parser, h, headBuf, body) - if err != nil { - return err - } - - if h.EventType == replication.TABLE_MAP_EVENT { - tbMapPos = h.LogPos - h.EventSize - } - - evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e}) - for _, ev := range evs { - startPos := 0 - if ev.Type == "query" || ev.Type == "gtid" { - startPos = int(h.LogPos - h.EventSize) - } else { - startPos = int(tbMapPos) - } - - switch ev.Type { - case "gtid": - if currentGtid != "" { - finalizeTx(&tx, false) - fillTimeLazy(&tx) - if f != nil && !f(tx) { - return nil - } - } - currentGtid = ev.Data - tx = Transaction{ - GTID: ev.Data, - StartPos: startPos, - Timestamp: int64(h.Timestamp), - Txs: make([]TxDetail, 0, 8), - sqlOrigin: make([]string, 0, 4), - } - case "": - tx.EndPos = int(h.LogPos) - case "rowsquery": - tx.EndPos = int(h.LogPos) - tx.sqlOrigin = append(tx.sqlOrigin, ev.Data) - default: - tx.EndPos = int(h.LogPos) - status := STATUS_PREPARE - if ev.Type == "query" { - if equalFoldShort(ev.Data, "begin") { - if tx.TxStartTime == 0 { - tx.TxStartTime = int64(h.Timestamp) - } - tx.Status = STATUS_BEGIN - } else if equalFoldShort(ev.Data, "commit") { - tx.Status = STATUS_COMMIT - tx.TxEndTime = int64(h.Timestamp) - } else if equalFoldShort(ev.Data, "rollback") { - tx.Status = STATUS_ROLLBACK - tx.TxEndTime = int64(h.Timestamp) - } - tx.Status = status - } - if ev.DB != "" && ev.TB != "" { - tx.dmlEventCount++ - } - tx.Txs = append(tx.Txs, TxDetail{ - StartPos: startPos, - EndPos: int(h.LogPos), - Db: ev.DB, - Table: ev.TB, - Sql: ev.Data, - SqlType: ev.Type, - Rows: ev.Rows, - RowCount: int(ev.RowCnt), - Timestamp: int64(h.Timestamp), - CompressionType: ev.CompressionType, - }) - } - } - } -} - -func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent { - var buf [1]BinlogEvent - sig := &buf[0] - - switch ev.Header.EventType { - case replication.ANONYMOUS_GTID_EVENT: - sig.Data = "anonymous-gtid-event:1" - sig.Type = "gtid" - - case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2: - wrEvent, ok := ev.Event.(*replication.RowsEvent) - if !ok { - return nil - } - sig.DB = string(wrEvent.Table.Schema) - sig.TB = string(wrEvent.Table.Table) - sig.Type = "insert" - sig.RowCnt = uint32(len(wrEvent.Rows)) - sig.Rows = wrEvent.Rows - - case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2: - wrEvent, ok := ev.Event.(*replication.RowsEvent) - if !ok { - return nil - } - sig.DB = string(wrEvent.Table.Schema) - sig.TB = string(wrEvent.Table.Table) - sig.Type = "update" - sig.RowCnt = uint32(len(wrEvent.Rows)) / 2 - sig.Rows = wrEvent.Rows - - case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2: - wrEvent, ok := ev.Event.(*replication.RowsEvent) - if !ok { - return nil - } - sig.DB = string(wrEvent.Table.Schema) - sig.TB = string(wrEvent.Table.Table) - sig.Type = "delete" - sig.RowCnt = uint32(len(wrEvent.Rows)) - sig.Rows = wrEvent.Rows - - case replication.ROWS_QUERY_EVENT: - queryEvent, ok := ev.Event.(*replication.RowsQueryEvent) - if !ok { - return nil - } - sig.Data = string(queryEvent.Query) - sig.Type = "rowsquery" - - case replication.QUERY_EVENT: - queryEvent, ok := ev.Event.(*replication.QueryEvent) - if !ok { - return nil - } - sig.DB = string(queryEvent.Schema) - sig.Data = string(queryEvent.Query) - sig.Type = "query" - - case replication.MARIADB_GTID_EVENT: - sig.Data = "begin" - sig.Type = "query" - - case replication.XID_EVENT: - sig.Data = "commit" - sig.Type = "query" - - case replication.GTID_EVENT: - ge, ok := ev.Event.(*replication.GTIDEvent) - if !ok { - return nil - } - gid, err := gtid.Parse(fmt.Sprintf("%s:%d", bytesToUuid(ge.SID), ge.GNO)) - if err != nil { - sig.Data = fmt.Sprintf("invalid-gtid:%s:%d", bytesToUuid(ge.SID), ge.GNO) - } else { - sig.Data = gid.String() - } - sig.Type = "gtid" - - case replication.TRANSACTION_PAYLOAD_EVENT: - ge, ok := ev.Event.(*replication.TransactionPayloadEvent) - if !ok { - return nil - } - res := make([]BinlogEvent, 0, len(ge.Events)) - for _, val := range ge.Events { - res = append(res, ParseBinlogEvent(val)...) - } - compressionType := getCompressionTypeName(ge.CompressionType) - for idx := range res { - res[idx].CompressionType = compressionType - } - return res - } - - // 返回栈上数组的切片。调用方在当前迭代内立即消费,不持有跨迭代引用,安全。 - return buf[:] -} - -func getCompressionTypeName(code uint64) string { - switch code { - case CompressionZSTD: - return "ZSTD" - case CompressionNone: - return "" - default: - return fmt.Sprintf("UNKNOWN(%d)", code) - } -} - -func ParseBinlogWithFilter(path string, pos int64, filter BinlogFilter, fx func(Transaction) bool) error { - if !staros.Exists(path) { - return os.ErrNotExist - } - - f, err := os.Open(path) - if err != nil { - return err - } - defer f.Close() - - parser := replication.NewBinlogParser() - parser.SetParseTime(false) - parser.SetUseDecimal(false) - - if pos != 0 { - if err := seekToPosition(f, parser, pos); err != nil { - return err - } - } else { - if err := validateBinlogHeader(f); err != nil { - return err - } - } - - br := bufio.NewReaderSize(f, defaultReadBufSize) - return parseBinlogWithFilter(br, parser, filter, fx) -} - -func seekToPosition(f *os.File, parser *replication.BinlogParser, pos int64) error { - if err := validateBinlogHeader(f); err != nil { - return err - } - - headBuf := make([]byte, replication.EventHeaderSize) - for { - h, err := readEventHeader(f, parser, headBuf) - if err != nil { - return fmt.Errorf("seek to position failed: %w", err) - } - body, err := readEventBody(f, h) - if err != nil { - return err - } - _, err = parseEvent(parser, h, headBuf, body) - if err != nil { - return err - } - if h.EventType == replication.FORMAT_DESCRIPTION_EVENT || h.EventType == replication.GTID_EVENT { - break - } - } - - if _, err := f.Seek(pos, io.SeekStart); err != nil { - return fmt.Errorf("seek to pos %d failed: %w", pos, err) - } - return nil -} -func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter BinlogFilter, fn func(Transaction) bool) error { - var subGtid, inGtid, exGtid *gtid.Gtid - var err error - - includeMatcher, excludeMatcher := prepareTableMatchers(filter) - - if filter.IncludeGtid != "" { - inGtid, err = gtid.Parse(filter.IncludeGtid) - if err != nil { - return fmt.Errorf("parse include gtid failed: %w", err) - } - subGtid = inGtid.Clone() - } - if filter.ExcludeGtid != "" { - exGtid, err = gtid.Parse(filter.ExcludeGtid) - if err != nil { - return fmt.Errorf("parse exclude gtid failed: %w", err) - } - } - - var ( - tbMapPos uint32 - skipCurrentTxn bool - tx Transaction - headBuf = make([]byte, replication.EventHeaderSize) - ) - currentGtid := "" - - callFn := func(tx Transaction) bool { - if fn == nil { - return true - } - - fillTimeLazy(&tx) - - if !filter.StartDate.IsZero() && filter.StartDate.After(tx.Time) { - return true - } - if !filter.EndDate.IsZero() && filter.EndDate.Before(tx.Time) { - return true - } - if filter.StartPos != 0 && filter.StartPos > tx.StartPos { - return true - } - if filter.EndPos != 0 && filter.EndPos < tx.EndPos { - return true - } - if filter.BigThan != 0 && filter.BigThan > tx.Size { - return true - } - if filter.SmallThan != 0 && filter.SmallThan < tx.Size { - return true - } - if !filter.OnlyShowGtid && filter.OnlyShowDML && tx.dmlEventCount == 0 { - return true - } - - var txs []TxDetail - var matched bool - - for _, t := range tx.Txs { - includeMatch := includeMatcher != nil && includeMatcher.match(t.Db, t.Table) - excludeMatch := excludeMatcher != nil && excludeMatcher.match(t.Db, t.Table) - - if t.Db == "" && t.Table == "" { - if includeMatcher != nil && !filter.IncludeBlank { - continue - } - if excludeMatcher != nil && filter.ExcludeBlank { - matched = true - if filter.PickTxAllIfMatch { - return true - } - continue - } - } - - if includeMatcher != nil { - if includeMatch { - matched = true - if filter.PickTxAllIfMatch { - return fn(tx) - } - txs = append(txs, t) - } - } else if excludeMatcher != nil { - if excludeMatch { - matched = true - if filter.PickTxAllIfMatch { - return true - } - } else { - txs = append(txs, t) - } - } else { - txs = append(txs, t) - } - } - - if matched { - tx.Txs = txs - } - if !matched && includeMatcher != nil { - return true - } - if len(tx.Txs) == 0 && matched { - return true - } - return fn(tx) - } - - for { - h, err := readEventHeader(r, parser, headBuf) - if err == io.EOF { - if !tx.Time.IsZero() || tx.Timestamp != 0 { - finalizeTx(&tx, filter.OnlyShowGtid) - callFn(tx) - } - return nil - } - if err != nil { - return err - } - - // GTID-only fast path - if filter.OnlyShowGtid { - if h.EventType != replication.GTID_EVENT && h.EventType != replication.ANONYMOUS_GTID_EVENT { - if h.EventType == replication.FORMAT_DESCRIPTION_EVENT || - h.EventType == replication.TABLE_MAP_EVENT { - body, err := readEventBody(r, h) - if err != nil { - return err - } - if _, err = parseEvent(parser, h, headBuf, body); err != nil { - return err - } - } else { - if err := skipEventBody(r, h); err != nil { - return err - } - } - continue - } - body, err := readEventBody(r, h) - if err != nil { - return err - } - e, err := parseEvent(parser, h, headBuf, body) - if err != nil { - return err - } - - evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e}) - for _, ev := range evs { - if ev.Type != "gtid" { - continue - } - startPos := int(h.LogPos - h.EventSize) - - if filter.EndPos != 0 && startPos > filter.EndPos { - continue - } - if filter.StartPos != 0 && startPos < filter.StartPos { - continue - } - - if currentGtid != "" { - tx.EndPos = startPos - 1 - finalizeTx(&tx, true) - if !callFn(tx) { - return nil - } - if subGtid != nil { - if err := subGtid.Sub(tx.GTID); err == nil && subGtid.EventCount() == 0 { - return nil - } - } - tx = Transaction{} - } - - currentGtid = ev.Data - - if inGtid != nil { - if c, _ := inGtid.Contain(ev.Data); !c { - tx = Transaction{} - currentGtid = "" - continue - } - } - if exGtid != nil { - if c, _ := exGtid.Contain(ev.Data); c { - currentGtid = "" - tx = Transaction{} - continue - } - } - - tx = Transaction{ - GTID: ev.Data, - StartPos: startPos, - EndPos: startPos, - Timestamp: int64(h.Timestamp), - } - } - continue - } - - // 先处理GTID事件(决定当前事务是否命中) - if h.EventType == replication.GTID_EVENT || h.EventType == replication.ANONYMOUS_GTID_EVENT { - body, err := readEventBody(r, h) - if err != nil { - return err - } - e, err := parseEvent(parser, h, headBuf, body) - if err != nil { - return err - } - - evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e}) - for _, ev := range evs { - if ev.Type != "gtid" { - continue - } - startPos := int(h.LogPos - h.EventSize) - - if currentGtid != "" { - finalizeTx(&tx, false) - if !callFn(tx) { - return nil - } - if subGtid != nil { - if err := subGtid.Sub(tx.GTID); err == nil && subGtid.EventCount() == 0 { - return nil - } - } - tx = Transaction{} - } - - currentGtid = ev.Data - skipCurrentTxn = false - - if filter.EndPos != 0 && startPos > filter.EndPos { - skipCurrentTxn = true - } - if filter.StartPos != 0 && startPos < filter.StartPos { - skipCurrentTxn = true - } - if inGtid != nil { - if c, _ := inGtid.Contain(ev.Data); !c { - skipCurrentTxn = true - } - } - if exGtid != nil { - if c, _ := exGtid.Contain(ev.Data); c { - skipCurrentTxn = true - } - } - - if !skipCurrentTxn { - tx = Transaction{ - GTID: ev.Data, - StartPos: startPos, - Timestamp: int64(h.Timestamp), - Txs: make([]TxDetail, 0, 8), - sqlOrigin: make([]string, 0, 4), - } - } else { - tx = Transaction{} - } - } - continue - } - - // 未命中事务时:TABLE_MAP_EVENT 仍需解析(parser 缓存表元数据), - // 其余事件可安全跳过 - if skipCurrentTxn { - if h.EventType == replication.TABLE_MAP_EVENT || - h.EventType == replication.FORMAT_DESCRIPTION_EVENT { - body, err := readEventBody(r, h) - if err != nil { - return err - } - if _, err = parseEvent(parser, h, headBuf, body); err != nil { - return err - } - } else { - if err := skipEventBody(r, h); err != nil { - return err - } - } - continue - } - - body, err := readEventBody(r, h) - if err != nil { - return err - } - e, err := parseEvent(parser, h, headBuf, body) - if err != nil { - return err - } - - if h.EventType == replication.TABLE_MAP_EVENT { - tbMapPos = h.LogPos - h.EventSize - } - - evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e}) - for _, ev := range evs { - startPos := 0 - if ev.Type == "query" || ev.Type == "gtid" { - startPos = int(h.LogPos - h.EventSize) - } else { - startPos = int(tbMapPos) - } - - switch ev.Type { - case "": - tx.EndPos = int(h.LogPos) - - case "rowsquery": - tx.EndPos = int(h.LogPos) - tx.sqlOrigin = append(tx.sqlOrigin, ev.Data) - - default: - tx.EndPos = int(h.LogPos) - status := STATUS_PREPARE - if ev.Type == "query" { - if equalFoldShort(ev.Data, "begin") { - if tx.TxStartTime == 0 { - tx.TxStartTime = int64(h.Timestamp) - } - tx.Status = STATUS_BEGIN - } else if equalFoldShort(ev.Data, "commit") { - tx.Status = STATUS_COMMIT - tx.TxEndTime = int64(h.Timestamp) - } else if equalFoldShort(ev.Data, "rollback") { - tx.Status = STATUS_ROLLBACK - tx.TxEndTime = int64(h.Timestamp) - } - tx.Status = status - } - if ev.DB != "" && ev.TB != "" { - tx.dmlEventCount++ - } - tx.Txs = append(tx.Txs, TxDetail{ - StartPos: startPos, - EndPos: int(h.LogPos), - Db: ev.DB, - Table: ev.TB, - Sql: ev.Data, - SqlType: ev.Type, - Rows: ev.Rows, - RowCount: int(ev.RowCnt), - Timestamp: int64(h.Timestamp), - CompressionType: ev.CompressionType, - }) - } - } - } -} -func prepareTableMatchers(filter BinlogFilter) (includeMatcher, excludeMatcher *tableMatcher) { - if len(filter.IncludeTables) > 0 { - includeMatcher = buildTableMatcher(filter.IncludeTables) - } - if len(filter.ExcludeTables) > 0 { - excludeMatcher = buildTableMatcher(filter.ExcludeTables) - } - return includeMatcher, excludeMatcher -} - -func buildTableMatcher(patterns []string) *tableMatcher { - m := &tableMatcher{ - exactMatch: make(map[string]bool), - dbWildcard: make(map[string]bool), - tbWildcard: make(map[string]bool), - } - - for _, pattern := range patterns { - if pattern == "*.*" { - m.matchAll = true - continue - } - parts := strings.Split(pattern, ".") - if len(parts) != 2 { - continue - } - db, tb := parts[0], parts[1] - if db == "*" && tb == "*" { - m.matchAll = true - } else if db == "*" { - m.tbWildcard[tb] = true - } else if tb == "*" { - m.dbWildcard[db] = true - } else { - m.exactMatch[pattern] = true - } - } - return m -} - -func equalFoldShort(s, lower string) bool { - if len(s) != len(lower) { - return false - } - for i := 0; i < len(s); i++ { - c := s[i] - if 'A' <= c && c <= 'Z' { - c += 'a' - 'A' - } - if c != lower[i] { - return false - } - } - return true -} +// Parsing implementation is split across: +// - parse_types.go +// - parse_io.go +// - parse_event_convert.go +// - parse_stream.go +// - parse_filter.go diff --git a/parse_event_convert.go b/parse_event_convert.go new file mode 100644 index 0000000..47d68a4 --- /dev/null +++ b/parse_event_convert.go @@ -0,0 +1,257 @@ +package binlog + +import ( + "fmt" + + "b612.me/mysql/gtid" + "github.com/starainrt/go-mysql/mysql" + "github.com/starainrt/go-mysql/replication" +) + +func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent { + var buf [1]BinlogEvent + sig := &buf[0] + + switch ev.Header.EventType { + case replication.ANONYMOUS_GTID_EVENT: + sig.Data = "anonymous-gtid-event:1" + sig.Type = "gtid" + + case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2: + wrEvent, ok := ev.Event.(*replication.RowsEvent) + if !ok { + return nil + } + sig.DB = string(wrEvent.Table.Schema) + sig.TB = string(wrEvent.Table.Table) + sig.Type = "insert" + sig.RowCnt = uint32(len(wrEvent.Rows)) + sig.Rows = normalizeRowsByUnsigned(wrEvent) + sig.ColumnTypes = cloneColumnTypes(wrEvent.Table) + sig.ColumnCollationIDs = buildColumnCollationIDs(wrEvent.Table) + + case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2: + wrEvent, ok := ev.Event.(*replication.RowsEvent) + if !ok { + return nil + } + sig.DB = string(wrEvent.Table.Schema) + sig.TB = string(wrEvent.Table.Table) + sig.Type = "update" + sig.RowCnt = uint32(len(wrEvent.Rows)) / 2 + sig.Rows = normalizeRowsByUnsigned(wrEvent) + sig.ColumnTypes = cloneColumnTypes(wrEvent.Table) + sig.ColumnCollationIDs = buildColumnCollationIDs(wrEvent.Table) + + case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2: + wrEvent, ok := ev.Event.(*replication.RowsEvent) + if !ok { + return nil + } + sig.DB = string(wrEvent.Table.Schema) + sig.TB = string(wrEvent.Table.Table) + sig.Type = "delete" + sig.RowCnt = uint32(len(wrEvent.Rows)) + sig.Rows = normalizeRowsByUnsigned(wrEvent) + sig.ColumnTypes = cloneColumnTypes(wrEvent.Table) + sig.ColumnCollationIDs = buildColumnCollationIDs(wrEvent.Table) + + case replication.TABLE_MAP_EVENT: + tableEvent, ok := ev.Event.(*replication.TableMapEvent) + if !ok { + return nil + } + sig.DB = string(tableEvent.Schema) + sig.TB = string(tableEvent.Table) + sig.Type = "tablemap" + + case replication.ROWS_QUERY_EVENT: + queryEvent, ok := ev.Event.(*replication.RowsQueryEvent) + if !ok { + return nil + } + sig.Data = string(queryEvent.Query) + sig.Type = "rowsquery" + + case replication.QUERY_EVENT: + queryEvent, ok := ev.Event.(*replication.QueryEvent) + if !ok { + return nil + } + sig.DB = string(queryEvent.Schema) + sig.Data = string(queryEvent.Query) + sig.Type = "query" + + case replication.MARIADB_GTID_EVENT: + sig.Data = "begin" + sig.Type = "query" + + case replication.XID_EVENT: + sig.Data = "commit" + sig.Type = "query" + + case replication.GTID_EVENT: + ge, ok := ev.Event.(*replication.GTIDEvent) + if !ok { + return nil + } + gid, err := gtid.Parse(fmt.Sprintf("%s:%d", bytesToUuid(ge.SID), ge.GNO)) + if err != nil { + sig.Data = fmt.Sprintf("invalid-gtid:%s:%d", bytesToUuid(ge.SID), ge.GNO) + } else { + sig.Data = gid.String() + } + sig.Type = "gtid" + + case replication.TRANSACTION_PAYLOAD_EVENT: + ge, ok := ev.Event.(*replication.TransactionPayloadEvent) + if !ok { + return nil + } + res := make([]BinlogEvent, 0, len(ge.Events)) + for _, val := range ge.Events { + res = append(res, ParseBinlogEvent(val)...) + } + compressionType := getCompressionTypeName(ge.CompressionType) + for idx := range res { + res[idx].CompressionType = compressionType + } + return res + } + + // 返回栈上数组的切片。调用方在当前迭代内立即消费,不持有跨迭代引用,安全。 + return buf[:] +} + +func normalizeRowsByUnsigned(wrEvent *replication.RowsEvent) [][]interface{} { + if wrEvent == nil || wrEvent.Table == nil || len(wrEvent.Rows) == 0 { + if wrEvent == nil { + return nil + } + return wrEvent.Rows + } + + unsignedMap := wrEvent.Table.UnsignedMap() + if len(unsignedMap) == 0 { + return wrEvent.Rows + } + + columnTypes := wrEvent.Table.ColumnType + if len(columnTypes) == 0 { + return wrEvent.Rows + } + + for rowIdx := range wrEvent.Rows { + row := wrEvent.Rows[rowIdx] + for colIdx := range row { + if !unsignedMap[colIdx] { + continue + } + if colIdx >= len(columnTypes) { + continue + } + row[colIdx] = normalizeUnsignedValue(row[colIdx], columnTypes[colIdx]) + } + } + return wrEvent.Rows +} + +func cloneColumnTypes(table *replication.TableMapEvent) []int { + if table == nil || len(table.ColumnType) == 0 { + return nil + } + ret := make([]int, len(table.ColumnType)) + for i, t := range table.ColumnType { + ret[i] = int(t) + } + return ret +} + +func buildColumnCollationIDs(table *replication.TableMapEvent) []uint64 { + if table == nil { + return nil + } + + columnCount := len(table.ColumnType) + if columnCount == 0 && table.ColumnCount > 0 { + columnCount = int(table.ColumnCount) + } + if columnCount == 0 { + return nil + } + + ret := make([]uint64, columnCount) + hasValue := false + + for idx, collationID := range table.CollationMap() { + if idx < 0 || idx >= columnCount { + continue + } + ret[idx] = collationID + hasValue = hasValue || collationID != 0 + } + for idx, collationID := range table.EnumSetCollationMap() { + if idx < 0 || idx >= columnCount { + continue + } + if ret[idx] == 0 { + ret[idx] = collationID + } + hasValue = hasValue || collationID != 0 + } + + if !hasValue { + return nil + } + return ret +} + +func normalizeUnsignedValue(v interface{}, colType byte) interface{} { + signed, ok := signedToInt64(v) + if !ok { + return v + } + + switch colType { + case mysql.MYSQL_TYPE_TINY: + return uint8(signed) + case mysql.MYSQL_TYPE_SHORT: + return uint16(signed) + case mysql.MYSQL_TYPE_INT24: + return uint32(uint32(int32(signed)) & 0x00FFFFFF) + case mysql.MYSQL_TYPE_LONG: + return uint32(signed) + case mysql.MYSQL_TYPE_LONGLONG: + return uint64(signed) + default: + return v + } +} + +func signedToInt64(v interface{}) (int64, bool) { + switch x := v.(type) { + case int8: + return int64(x), true + case int16: + return int64(x), true + case int32: + return int64(x), true + case int64: + return x, true + case int: + return int64(x), true + default: + return 0, false + } +} + +func getCompressionTypeName(code uint64) string { + switch code { + case CompressionZSTD: + return "ZSTD" + case CompressionNone: + return "" + default: + return fmt.Sprintf("UNKNOWN(%d)", code) + } +} diff --git a/parse_event_convert_payload_test.go b/parse_event_convert_payload_test.go new file mode 100644 index 0000000..0a8bb71 --- /dev/null +++ b/parse_event_convert_payload_test.go @@ -0,0 +1,73 @@ +package binlog + +import ( + "testing" + + "github.com/starainrt/go-mysql/mysql" + "github.com/starainrt/go-mysql/replication" +) + +func TestParseBinlogEvent_TableMapEvent(t *testing.T) { + ev := &replication.BinlogEvent{ + Header: &replication.EventHeader{EventType: replication.TABLE_MAP_EVENT}, + Event: &replication.TableMapEvent{ + Schema: []byte("db1"), + Table: []byte("tb1"), + }, + } + + events := ParseBinlogEvent(ev) + if len(events) != 1 { + t.Fatalf("expected 1 event, got %d", len(events)) + } + if events[0].Type != "tablemap" { + t.Fatalf("expected tablemap event type, got %q", events[0].Type) + } + if events[0].DB != "db1" || events[0].TB != "tb1" { + t.Fatalf("unexpected db/table: %s.%s", events[0].DB, events[0].TB) + } +} + +func TestParseBinlogEvent_TransactionPayloadContainsTableMap(t *testing.T) { + table := &replication.TableMapEvent{ + Schema: []byte("db2"), + Table: []byte("tb2"), + ColumnType: []byte{mysql.MYSQL_TYPE_LONG}, + } + + payload := &replication.TransactionPayloadEvent{ + CompressionType: CompressionZSTD, + Events: []*replication.BinlogEvent{ + { + Header: &replication.EventHeader{EventType: replication.TABLE_MAP_EVENT}, + Event: table, + }, + { + Header: &replication.EventHeader{EventType: replication.WRITE_ROWS_EVENTv2}, + Event: &replication.RowsEvent{ + Table: table, + Rows: [][]interface{}{{int32(1)}}, + }, + }, + }, + } + + ev := &replication.BinlogEvent{ + Header: &replication.EventHeader{EventType: replication.TRANSACTION_PAYLOAD_EVENT}, + Event: payload, + } + + events := ParseBinlogEvent(ev) + if len(events) != 2 { + t.Fatalf("expected 2 events from payload, got %d", len(events)) + } + if events[0].Type != "tablemap" { + t.Fatalf("expected first payload event to be tablemap, got %q", events[0].Type) + } + if events[1].Type != "insert" { + t.Fatalf("expected second payload event to be insert, got %q", events[1].Type) + } + if events[0].CompressionType != "ZSTD" || events[1].CompressionType != "ZSTD" { + t.Fatalf("expected payload events to carry compression type, got %q/%q", events[0].CompressionType, events[1].CompressionType) + } +} diff --git a/parse_event_convert_unsigned_test.go b/parse_event_convert_unsigned_test.go new file mode 100644 index 0000000..8b9aea7 --- /dev/null +++ b/parse_event_convert_unsigned_test.go @@ -0,0 +1,95 @@ +package binlog + +import ( + "testing" + + "github.com/starainrt/go-mysql/mysql" + "github.com/starainrt/go-mysql/replication" +) + +func TestNormalizeRowsByUnsigned_AllIntegerKinds(t *testing.T) { + event := &replication.RowsEvent{ + Table: &replication.TableMapEvent{ + ColumnCount: 5, + ColumnType: []byte{mysql.MYSQL_TYPE_TINY, mysql.MYSQL_TYPE_SHORT, mysql.MYSQL_TYPE_INT24, mysql.MYSQL_TYPE_LONG, mysql.MYSQL_TYPE_LONGLONG}, + SignednessBitmap: []byte{0xF8}, + }, + Rows: [][]interface{}{{int8(-1), int16(-2), int32(-1), int32(-1), int64(-1)}}, + } + + got := normalizeRowsByUnsigned(event) + row := got[0] + + if v, ok := row[0].(uint8); !ok || v != 255 { + t.Fatalf("tiny unsigned mismatch: %T %v", row[0], row[0]) + } + if v, ok := row[1].(uint16); !ok || v != 65534 { + t.Fatalf("short unsigned mismatch: %T %v", row[1], row[1]) + } + if v, ok := row[2].(uint32); !ok || v != 16777215 { + t.Fatalf("int24 unsigned mismatch: %T %v", row[2], row[2]) + } + if v, ok := row[3].(uint32); !ok || v != 4294967295 { + t.Fatalf("long unsigned mismatch: %T %v", row[3], row[3]) + } + if v, ok := row[4].(uint64); !ok || v != 18446744073709551615 { + t.Fatalf("longlong unsigned mismatch: %T %v", row[4], row[4]) + } +} + +func TestNormalizeRowsByUnsigned_NoSignednessMetadata(t *testing.T) { + event := &replication.RowsEvent{ + Table: &replication.TableMapEvent{ + ColumnCount: 1, + ColumnType: []byte{mysql.MYSQL_TYPE_LONGLONG}, + }, + Rows: [][]interface{}{{int64(-1)}}, + } + + got := normalizeRowsByUnsigned(event) + if v, ok := got[0][0].(int64); !ok || v != -1 { + t.Fatalf("value should remain signed when metadata missing: %T %v", got[0][0], got[0][0]) + } +} + +func TestParseBinlogEvent_IncludeColumnMetadata(t *testing.T) { + event := &replication.RowsEvent{ + Table: &replication.TableMapEvent{ + ColumnCount: 3, + ColumnType: []byte{mysql.MYSQL_TYPE_VAR_STRING, mysql.MYSQL_TYPE_LONG, mysql.MYSQL_TYPE_VAR_STRING}, + DefaultCharset: []uint64{45}, // utf8mb4_general_ci + }, + Rows: [][]interface{}{{"name", int32(1), "desc"}}, + } + + ev := &replication.BinlogEvent{ + Header: &replication.EventHeader{EventType: replication.WRITE_ROWS_EVENTv2}, + Event: event, + } + + events := ParseBinlogEvent(ev) + if len(events) != 1 { + t.Fatalf("expected 1 event, got %d", len(events)) + } + got := events[0] + + if len(got.ColumnTypes) != 3 { + t.Fatalf("unexpected column type length: %d", len(got.ColumnTypes)) + } + if got.ColumnTypes[0] != int(mysql.MYSQL_TYPE_VAR_STRING) || got.ColumnTypes[1] != int(mysql.MYSQL_TYPE_LONG) { + t.Fatalf("unexpected column types: %v", got.ColumnTypes) + } + + if len(got.ColumnCollationIDs) != 3 { + t.Fatalf("unexpected column collation length: %d", len(got.ColumnCollationIDs)) + } + if got.ColumnCollationIDs[0] != 45 { + t.Fatalf("unexpected collation for column 0: %d", got.ColumnCollationIDs[0]) + } + if got.ColumnCollationIDs[2] != 45 { + t.Fatalf("unexpected collation for column 2: %d", got.ColumnCollationIDs[2]) + } + if got.ColumnCollationIDs[1] != 0 { + t.Fatalf("non-character column should keep zero collation: %d", got.ColumnCollationIDs[1]) + } +} diff --git a/parse_filter.go b/parse_filter.go new file mode 100644 index 0000000..b2479a0 --- /dev/null +++ b/parse_filter.go @@ -0,0 +1,572 @@ +package binlog + +import ( + "bufio" + "fmt" + "io" + "os" + "strings" + + "b612.me/mysql/gtid" + "b612.me/staros" + "github.com/starainrt/go-mysql/replication" +) + +func ParseBinlogWithFilter(path string, pos int64, filter BinlogFilter, fx func(Transaction) bool) error { + if hasConfiguredTablePatterns(filter.IncludeTables) && hasConfiguredTablePatterns(filter.ExcludeTables) { + return fmt.Errorf("invalid filter: include-tables and exclude-tables cannot be set at the same time") + } + + if !staros.Exists(path) { + return os.ErrNotExist + } + + f, err := os.Open(path) + if err != nil { + return err + } + defer f.Close() + + parser := replication.NewBinlogParser() + parser.SetParseTime(false) + parser.SetUseDecimal(false) + + if pos != 0 { + if err := seekToPosition(f, parser, pos); err != nil { + return err + } + } else { + if err := validateBinlogHeader(f); err != nil { + return err + } + } + + br := bufio.NewReaderSize(f, defaultReadBufSize) + return parseBinlogWithFilter(br, parser, filter, fx) +} + +func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter BinlogFilter, fn func(Transaction) bool) error { + if hasConfiguredTablePatterns(filter.IncludeTables) && hasConfiguredTablePatterns(filter.ExcludeTables) { + return fmt.Errorf("invalid filter: include-tables and exclude-tables cannot be set at the same time") + } + + var subGtid, inGtid, exGtid *gtid.Gtid + var err error + + includeMatcher, excludeMatcher, err := prepareTableMatchers(filter) + if err != nil { + return err + } + + if filter.IncludeGtid != "" { + inGtid, err = gtid.Parse(filter.IncludeGtid) + if err != nil { + return fmt.Errorf("parse include gtid failed: %w", err) + } + subGtid = inGtid.Clone() + } + if filter.ExcludeGtid != "" { + exGtid, err = gtid.Parse(filter.ExcludeGtid) + if err != nil { + return fmt.Errorf("parse exclude gtid failed: %w", err) + } + } + + var ( + tbMapPos uint32 + skipCurrentTxn bool + tx Transaction + headBuf = make([]byte, replication.EventHeaderSize) + ) + currentGtid := "" + + callFn := func(tx Transaction) bool { + if fn == nil { + return true + } + + fillTimeLazy(&tx) + + if !filter.StartDate.IsZero() && filter.StartDate.After(tx.Time) { + return true + } + if !filter.EndDate.IsZero() && filter.EndDate.Before(tx.Time) { + return true + } + if filter.StartPos != 0 && filter.StartPos > tx.StartPos { + return true + } + if filter.EndPos != 0 && filter.EndPos < tx.EndPos { + return true + } + if filter.BigThan != 0 && filter.BigThan > tx.Size { + return true + } + if filter.SmallThan != 0 && filter.SmallThan < tx.Size { + return true + } + if !filter.OnlyShowGtid && filter.OnlyShowDML && tx.dmlEventCount == 0 { + return true + } + + txs, matched, pickAll, skipAll := selectVisibleTxDetails(tx, includeMatcher, excludeMatcher, filter) + if pickAll { + return fn(tx) + } + if skipAll { + return true + } + + if matched { + tx.Txs = txs + recomputeTxStatsFromVisibleDetails(&tx) + } + if !matched && includeMatcher != nil { + return true + } + if len(tx.Txs) == 0 && matched { + return true + } + return fn(tx) + } + + for { + h, err := readEventHeader(r, parser, headBuf) + if err == io.EOF { + if currentGtid != "" { + finalizeTx(&tx, filter.OnlyShowGtid) + callFn(tx) + } + return nil + } + if err != nil { + return err + } + + // GTID-only fast path + if filter.OnlyShowGtid { + if h.EventType != replication.GTID_EVENT && h.EventType != replication.ANONYMOUS_GTID_EVENT { + if h.EventType == replication.FORMAT_DESCRIPTION_EVENT || + h.EventType == replication.TABLE_MAP_EVENT { + body, err := readEventBody(r, h) + if err != nil { + return err + } + if _, err = parseEvent(parser, h, headBuf, body); err != nil { + return err + } + } else { + if err := skipEventBody(r, h); err != nil { + return err + } + } + continue + } + body, err := readEventBody(r, h) + if err != nil { + return err + } + e, err := parseEvent(parser, h, headBuf, body) + if err != nil { + return err + } + + evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e}) + for _, ev := range evs { + if ev.Type != "gtid" { + continue + } + startPos := int(h.LogPos - h.EventSize) + + if filter.EndPos != 0 && startPos > filter.EndPos { + continue + } + if filter.StartPos != 0 && startPos < filter.StartPos { + continue + } + + if currentGtid != "" { + tx.EndPos = startPos - 1 + finalizeTx(&tx, true) + if !callFn(tx) { + return nil + } + if subGtid != nil { + if err := subGtid.Sub(tx.GTID); err == nil && subGtid.EventCount() == 0 { + return nil + } + } + tx = Transaction{} + } + + currentGtid = ev.Data + + if inGtid != nil { + if c, _ := inGtid.Contain(ev.Data); !c { + tx = Transaction{} + currentGtid = "" + continue + } + } + if exGtid != nil { + if c, _ := exGtid.Contain(ev.Data); c { + currentGtid = "" + tx = Transaction{} + continue + } + } + + tx = Transaction{ + GTID: ev.Data, + StartPos: startPos, + EndPos: startPos, + Timestamp: int64(h.Timestamp), + } + } + continue + } + + // 先处理 GTID 事件(决定当前事务是否命中) + if h.EventType == replication.GTID_EVENT || h.EventType == replication.ANONYMOUS_GTID_EVENT { + body, err := readEventBody(r, h) + if err != nil { + return err + } + e, err := parseEvent(parser, h, headBuf, body) + if err != nil { + return err + } + + evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e}) + for _, ev := range evs { + if ev.Type != "gtid" { + continue + } + startPos := int(h.LogPos - h.EventSize) + + if currentGtid != "" { + finalizeTx(&tx, false) + if !callFn(tx) { + return nil + } + if subGtid != nil { + if err := subGtid.Sub(tx.GTID); err == nil && subGtid.EventCount() == 0 { + return nil + } + } + tx = Transaction{} + } + + currentGtid = ev.Data + skipCurrentTxn = false + + if filter.EndPos != 0 && startPos > filter.EndPos { + skipCurrentTxn = true + } + if filter.StartPos != 0 && startPos < filter.StartPos { + skipCurrentTxn = true + } + if inGtid != nil { + if c, _ := inGtid.Contain(ev.Data); !c { + skipCurrentTxn = true + } + } + if exGtid != nil { + if c, _ := exGtid.Contain(ev.Data); c { + skipCurrentTxn = true + } + } + + if !skipCurrentTxn { + tx = Transaction{ + GTID: ev.Data, + StartPos: startPos, + Timestamp: int64(h.Timestamp), + Txs: make([]TxDetail, 0, 8), + sqlOrigin: make([]string, 0, 4), + } + } else { + tx = Transaction{} + } + } + continue + } + + // 未命中事务时,TABLE_MAP_EVENT 仍需解析(parser 缓存表元数据), + // 其余事件可安全跳过 + if skipCurrentTxn { + if h.EventType == replication.TABLE_MAP_EVENT || + h.EventType == replication.FORMAT_DESCRIPTION_EVENT { + body, err := readEventBody(r, h) + if err != nil { + return err + } + if _, err = parseEvent(parser, h, headBuf, body); err != nil { + return err + } + } else { + if err := skipEventBody(r, h); err != nil { + return err + } + } + continue + } + + body, err := readEventBody(r, h) + if err != nil { + return err + } + e, err := parseEvent(parser, h, headBuf, body) + if err != nil { + return err + } + + if h.EventType == replication.TABLE_MAP_EVENT { + tbMapPos = h.LogPos - h.EventSize + } + + evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e}) + for _, ev := range evs { + startPos := 0 + if ev.Type == "query" || ev.Type == "gtid" { + startPos = int(h.LogPos - h.EventSize) + } else { + startPos = int(tbMapPos) + } + + switch ev.Type { + case "": + tx.EndPos = int(h.LogPos) + + case "tablemap": + tx.EndPos = int(h.LogPos) + tbMapPos = h.LogPos - h.EventSize + + case "rowsquery": + tx.EndPos = int(h.LogPos) + tx.sqlOrigin = append(tx.sqlOrigin, ev.Data) + + default: + tx.EndPos = int(h.LogPos) + if ev.Type == "query" { + if equalFoldShort(ev.Data, "begin") { + if tx.TxStartTime == 0 { + tx.TxStartTime = int64(h.Timestamp) + } + tx.Status = STATUS_BEGIN + } else if equalFoldShort(ev.Data, "commit") { + tx.Status = STATUS_COMMIT + tx.TxEndTime = int64(h.Timestamp) + } else if equalFoldShort(ev.Data, "rollback") { + tx.Status = STATUS_ROLLBACK + tx.TxEndTime = int64(h.Timestamp) + } + } + if ev.DB != "" && ev.TB != "" { + tx.dmlEventCount++ + } + tx.Txs = append(tx.Txs, TxDetail{ + StartPos: startPos, + EndPos: int(h.LogPos), + Db: ev.DB, + Table: ev.TB, + Sql: ev.Data, + SqlType: ev.Type, + Rows: ev.Rows, + ColumnTypes: ev.ColumnTypes, + ColumnCollationIDs: ev.ColumnCollationIDs, + RowCount: int(ev.RowCnt), + Timestamp: int64(h.Timestamp), + CompressionType: ev.CompressionType, + }) + } + } + } +} + +func selectVisibleTxDetails(tx Transaction, includeMatcher, excludeMatcher *tableMatcher, filter BinlogFilter) ([]TxDetail, bool, bool, bool) { + txs := make([]TxDetail, 0, len(tx.Txs)) + matched := false + + for _, t := range tx.Txs { + includeMatch := includeMatcher != nil && includeMatcher.match(t.Db, t.Table) + excludeMatch := excludeMatcher != nil && excludeMatcher.match(t.Db, t.Table) + + if t.Db == "" && t.Table == "" { + if includeMatcher != nil { + if filter.IncludeBlank { + matched = true + if filter.PickTxAllIfMatch { + return nil, true, true, false + } + txs = append(txs, t) + } + continue + } + + if excludeMatcher != nil { + if filter.ExcludeBlank { + matched = true + if filter.PickTxAllIfMatch { + return nil, true, false, true + } + continue + } + txs = append(txs, t) + continue + } + + txs = append(txs, t) + continue + } + + if includeMatcher != nil { + if includeMatch { + matched = true + if filter.PickTxAllIfMatch { + return nil, true, true, false + } + txs = append(txs, t) + } + continue + } + + if excludeMatcher != nil { + if excludeMatch { + matched = true + if filter.PickTxAllIfMatch { + return nil, true, false, true + } + continue + } + txs = append(txs, t) + continue + } + + txs = append(txs, t) + } + + return txs, matched, false, false +} + +func prepareTableMatchers(filter BinlogFilter) (includeMatcher, excludeMatcher *tableMatcher, err error) { + if len(filter.IncludeTables) > 0 { + includeMatcher, err = buildTableMatcher(filter.IncludeTables) + if err != nil { + return nil, nil, fmt.Errorf("invalid include-tables: %w", err) + } + } + if len(filter.ExcludeTables) > 0 { + excludeMatcher, err = buildTableMatcher(filter.ExcludeTables) + if err != nil { + return nil, nil, fmt.Errorf("invalid exclude-tables: %w", err) + } + } + return includeMatcher, excludeMatcher, nil +} + +func buildTableMatcher(patterns []string) (*tableMatcher, error) { + m := &tableMatcher{ + exactMatch: make(map[string]bool), + dbWildcard: make(map[string]bool), + tbWildcard: make(map[string]bool), + } + + for _, pattern := range patterns { + origin := pattern + pattern = strings.ToLower(strings.TrimSpace(pattern)) + if pattern == "" { + continue + } + if pattern == "*.*" { + m.matchAll = true + continue + } + parts := strings.Split(pattern, ".") + if len(parts) != 2 || parts[0] == "" || parts[1] == "" { + return nil, fmt.Errorf("invalid table pattern %q: expect db.table", strings.TrimSpace(origin)) + } + db, tb := parts[0], parts[1] + if db != "*" && strings.Contains(db, "*") { + return nil, fmt.Errorf("invalid table pattern %q: wildcard '*' must occupy full db segment", strings.TrimSpace(origin)) + } + if tb != "*" && strings.Contains(tb, "*") { + return nil, fmt.Errorf("invalid table pattern %q: wildcard '*' must occupy full table segment", strings.TrimSpace(origin)) + } + if db == "*" && tb == "*" { + m.matchAll = true + } else if db == "*" { + m.tbWildcard[tb] = true + } else if tb == "*" { + m.dbWildcard[db] = true + } else { + m.exactMatch[db+"."+tb] = true + } + } + return m, nil +} + +func hasConfiguredTablePatterns(patterns []string) bool { + for _, p := range patterns { + if strings.TrimSpace(p) != "" { + return true + } + } + return false +} + +func recomputeTxStatsFromVisibleDetails(tx *Transaction) { + if tx == nil { + return + } + if len(tx.Txs) == 0 { + tx.RowsCount = 0 + tx.Size = 0 + return + } + + firstSet := false + minStart := 0 + maxEnd := 0 + rows := 0 + for _, d := range tx.Txs { + rows += d.RowCount + if !firstSet { + minStart = d.StartPos + maxEnd = d.EndPos + firstSet = true + continue + } + if d.StartPos < minStart { + minStart = d.StartPos + } + if d.EndPos > maxEnd { + maxEnd = d.EndPos + } + } + + tx.RowsCount = rows + tx.StartPos = minStart + tx.EndPos = maxEnd + if maxEnd > minStart { + tx.Size = maxEnd - minStart + } else { + tx.Size = 0 + } +} + +func equalFoldShort(s, lower string) bool { + if len(s) != len(lower) { + return false + } + for i := 0; i < len(s); i++ { + c := s[i] + if 'A' <= c && c <= 'Z' { + c += 'a' - 'A' + } + if c != lower[i] { + return false + } + } + return true +} diff --git a/parse_filter_table_test.go b/parse_filter_table_test.go new file mode 100644 index 0000000..6e0c8cf --- /dev/null +++ b/parse_filter_table_test.go @@ -0,0 +1,85 @@ +package binlog + +import ( + "strings" + "testing" +) + +func TestBuildTableMatcher_InvalidPattern(t *testing.T) { + cases := [][]string{ + {"db"}, + {"db."}, + {".tb"}, + {"db.tb.more"}, + {"db.t*"}, + {"d*.tb"}, + } + + for _, patterns := range cases { + if _, err := buildTableMatcher(patterns); err == nil { + t.Fatalf("expected invalid pattern error, got nil: %v", patterns) + } + } +} + +func TestPrepareTableMatchers_ReturnErrorOnInvalidPattern(t *testing.T) { + _, _, err := prepareTableMatchers(BinlogFilter{IncludeTables: []string{"invalid"}}) + if err == nil { + t.Fatal("expected include-tables error, got nil") + } + if !strings.Contains(err.Error(), "invalid include-tables") { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestSelectVisibleTxDetails_IncludeBlank(t *testing.T) { + includeMatcher, err := buildTableMatcher([]string{"db1.tb1"}) + if err != nil { + t.Fatalf("build include matcher failed: %v", err) + } + + tx := Transaction{Txs: []TxDetail{ + {SqlType: "query", Sql: "BEGIN"}, + {SqlType: "insert", Db: "db1", Table: "tb1", RowCount: 1}, + }} + + txs, matched, pickAll, skipAll := selectVisibleTxDetails(tx, includeMatcher, nil, BinlogFilter{IncludeBlank: true}) + if !matched { + t.Fatal("expected matched=true") + } + if pickAll || skipAll { + t.Fatalf("unexpected pickAll/skipAll: %v/%v", pickAll, skipAll) + } + if len(txs) != 2 { + t.Fatalf("expected 2 details with IncludeBlank=true, got %d", len(txs)) + } + if txs[0].SqlType != "query" || txs[1].Table != "tb1" { + t.Fatalf("unexpected details order/content: %#v", txs) + } +} + +func TestSelectVisibleTxDetails_ExcludeBlank(t *testing.T) { + excludeMatcher, err := buildTableMatcher([]string{"db2.tb2"}) + if err != nil { + t.Fatalf("build exclude matcher failed: %v", err) + } + + tx := Transaction{Txs: []TxDetail{ + {SqlType: "query", Sql: "BEGIN"}, + {SqlType: "insert", Db: "db1", Table: "tb1", RowCount: 1}, + }} + + txs, matched, pickAll, skipAll := selectVisibleTxDetails(tx, nil, excludeMatcher, BinlogFilter{ExcludeBlank: true}) + if !matched { + t.Fatal("expected matched=true when excluding blank detail") + } + if pickAll || skipAll { + t.Fatalf("unexpected pickAll/skipAll: %v/%v", pickAll, skipAll) + } + if len(txs) != 1 { + t.Fatalf("expected 1 detail after ExcludeBlank=true, got %d", len(txs)) + } + if txs[0].Db != "db1" || txs[0].Table != "tb1" { + t.Fatalf("unexpected remaining detail: %#v", txs[0]) + } +} diff --git a/parse_io.go b/parse_io.go new file mode 100644 index 0000000..41625cb --- /dev/null +++ b/parse_io.go @@ -0,0 +1,150 @@ +package binlog + +import ( + "bytes" + "encoding/hex" + "fmt" + "io" + "os" + "sync" + + "github.com/starainrt/go-mysql/replication" +) + +func validateBinlogHeader(f *os.File) error { + const fileTypeBytes = int64(4) + b := make([]byte, fileTypeBytes) + + if _, err := f.Read(b); err != nil { + return fmt.Errorf("read binlog header failed: %w", err) + } + if !bytes.Equal(b, replication.BinLogFileHeader) { + return ErrInvalidBinlogHeader + } + if _, err := f.Seek(fileTypeBytes, io.SeekStart); err != nil { + return fmt.Errorf("seek after header failed: %w", err) + } + return nil +} + +func readEventHeader(r io.Reader, parser *replication.BinlogParser, headBuf []byte) (*replication.EventHeader, error) { + if _, err := io.ReadFull(r, headBuf); err != nil { + return nil, err + } + h, err := parser.ParseHeader(headBuf) + if err != nil { + return nil, fmt.Errorf("parse header failed: %w", err) + } + if h.EventSize <= uint32(replication.EventHeaderSize) { + return nil, fmt.Errorf("%w: event size is %d", ErrEventTooSmall, h.EventSize) + } + return h, nil +} + +func readEventBody(r io.Reader, h *replication.EventHeader) ([]byte, error) { + bodyLen := int(h.EventSize) - replication.EventHeaderSize + body := make([]byte, bodyLen) + if _, err := io.ReadFull(r, body); err != nil { + return nil, fmt.Errorf("read event body failed: %w (need %d bytes)", err, bodyLen) + } + return body, nil +} + +func skipEventBody(r io.Reader, h *replication.EventHeader) error { + bodyLen := int64(h.EventSize) - int64(replication.EventHeaderSize) + if bodyLen <= 0 { + return nil + } + if _, err := io.CopyN(io.Discard, r, bodyLen); err != nil { + return fmt.Errorf("skip event body failed: %w", err) + } + return nil +} + +var rawDataPool = sync.Pool{ + New: func() any { + b := make([]byte, 0, 64*1024) + return &b + }, +} + +func getRawDataBuf(n int) []byte { + p := rawDataPool.Get().(*[]byte) + if cap(*p) < n { + return make([]byte, n) + } + return (*p)[:n] +} + +func putRawDataBuf(b []byte) { + if cap(b) > maxPooledRawDataCap { + return + } + b = b[:0] + rawDataPool.Put(&b) +} + +func formatBodyPreview(body []byte, maxBytes int) string { + if maxBytes <= 0 { + maxBytes = 256 + } + if len(body) == 0 { + return "len=0" + } + previewLen := len(body) + truncated := false + if previewLen > maxBytes { + previewLen = maxBytes + truncated = true + } + hexBody := hex.EncodeToString(body[:previewLen]) + if truncated { + return fmt.Sprintf("len=%d preview(hex,%dB)=%s...", len(body), previewLen, hexBody) + } + return fmt.Sprintf("len=%d preview(hex,%dB)=%s", len(body), previewLen, hexBody) +} + +func parseEvent(parser *replication.BinlogParser, h *replication.EventHeader, headBuf []byte, body []byte) (replication.Event, error) { + rawLen := len(headBuf) + len(body) + rawData := getRawDataBuf(rawLen) + copy(rawData, headBuf) + copy(rawData[len(headBuf):], body) + + e, err := parser.ParseEvent(h, body, rawData) + putRawDataBuf(rawData) + if err != nil { + return nil, fmt.Errorf("parse event failed at pos %d: Header %+v, Body %s, Err: %w", + h.LogPos, h, formatBodyPreview(body, 256), err) + } + return e, nil +} + +func seekToPosition(f *os.File, parser *replication.BinlogParser, pos int64) error { + if err := validateBinlogHeader(f); err != nil { + return err + } + + headBuf := make([]byte, replication.EventHeaderSize) + for { + h, err := readEventHeader(f, parser, headBuf) + if err != nil { + return fmt.Errorf("seek to position failed: %w", err) + } + body, err := readEventBody(f, h) + if err != nil { + return err + } + _, err = parseEvent(parser, h, headBuf, body) + if err != nil { + return err + } + if h.EventType == replication.FORMAT_DESCRIPTION_EVENT || h.EventType == replication.GTID_EVENT { + break + } + } + + if _, err := f.Seek(pos, io.SeekStart); err != nil { + return fmt.Errorf("seek to pos %d failed: %w", pos, err) + } + return nil +} diff --git a/parse_io_test.go b/parse_io_test.go new file mode 100644 index 0000000..8ac6234 --- /dev/null +++ b/parse_io_test.go @@ -0,0 +1,33 @@ +package binlog + +import ( + "strings" + "testing" +) + +func TestFormatBodyPreview(t *testing.T) { + if got := formatBodyPreview(nil, 256); got != "len=0" { + t.Fatalf("unexpected empty preview: %q", got) + } + + small := []byte{0x01, 0x02, 0xAB} + got := formatBodyPreview(small, 8) + if !strings.Contains(got, "len=3") || !strings.Contains(got, "0102ab") { + t.Fatalf("unexpected preview for small body: %q", got) + } + if strings.Contains(got, "...") { + t.Fatalf("small body should not be truncated: %q", got) + } + + large := make([]byte, 300) + for i := range large { + large[i] = byte(i) + } + got = formatBodyPreview(large, 16) + if !strings.Contains(got, "len=300") || !strings.Contains(got, "preview(hex,16B)=") { + t.Fatalf("unexpected preview for large body: %q", got) + } + if !strings.HasSuffix(got, "...") { + t.Fatalf("large body should be truncated with ellipsis: %q", got) + } +} diff --git a/parse_stream.go b/parse_stream.go new file mode 100644 index 0000000..7e13581 --- /dev/null +++ b/parse_stream.go @@ -0,0 +1,176 @@ +package binlog + +import ( + "bufio" + "io" + "os" + "time" + + "b612.me/staros" + "github.com/starainrt/go-mysql/replication" +) + +func ParseBinlogFile(path string, fx func(transaction Transaction) bool) error { + return parseOneBinlog(path, fx) +} + +func parseOneBinlog(path string, fx func(Transaction) bool) error { + if !staros.Exists(path) { + return os.ErrNotExist + } + f, err := os.Open(path) + if err != nil { + return err + } + defer f.Close() + + if err := validateBinlogHeader(f); err != nil { + return err + } + + br := bufio.NewReaderSize(f, defaultReadBufSize) + return parseBinlogDetail(br, fx) +} + +func finalizeTx(tx *Transaction, onlyShowGtid bool) { + idx := 0 + for k, v := range tx.Txs { + if v.SqlType != "query" && len(tx.sqlOrigin) > idx { + v.Sql = tx.sqlOrigin[idx] + idx++ + } + tx.RowsCount += v.RowCount + tx.Txs[k] = v + } + + if onlyShowGtid { + tx.Size = 0 + } else { + tx.Size = tx.EndPos - tx.StartPos + } +} + +func fillTimeLazy(tx *Transaction) { + if tx.Timestamp != 0 && tx.Time.IsZero() { + tx.Time = time.Unix(tx.Timestamp, 0) + } + for i := range tx.Txs { + if tx.Txs[i].Timestamp != 0 && tx.Txs[i].Time.IsZero() { + tx.Txs[i].Time = time.Unix(tx.Txs[i].Timestamp, 0) + } + } +} + +func parseBinlogDetail(r io.Reader, f func(Transaction) bool) error { + parser := replication.NewBinlogParser() + parser.SetParseTime(false) + parser.SetUseDecimal(false) + + var ( + tbMapPos uint32 + tx Transaction + headBuf = make([]byte, replication.EventHeaderSize) + ) + currentGtid := "" + + for { + h, err := readEventHeader(r, parser, headBuf) + if err == io.EOF { + if currentGtid != "" { + finalizeTx(&tx, false) + fillTimeLazy(&tx) + if f != nil { + f(tx) + } + } + return nil + } + if err != nil { + return err + } + + body, err := readEventBody(r, h) + if err != nil { + return err + } + + e, err := parseEvent(parser, h, headBuf, body) + if err != nil { + return err + } + + if h.EventType == replication.TABLE_MAP_EVENT { + tbMapPos = h.LogPos - h.EventSize + } + + evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e}) + for _, ev := range evs { + startPos := 0 + if ev.Type == "query" || ev.Type == "gtid" { + startPos = int(h.LogPos - h.EventSize) + } else { + startPos = int(tbMapPos) + } + + switch ev.Type { + case "gtid": + if currentGtid != "" { + finalizeTx(&tx, false) + fillTimeLazy(&tx) + if f != nil && !f(tx) { + return nil + } + } + currentGtid = ev.Data + tx = Transaction{ + GTID: ev.Data, + StartPos: startPos, + Timestamp: int64(h.Timestamp), + Txs: make([]TxDetail, 0, 8), + sqlOrigin: make([]string, 0, 4), + } + case "": + tx.EndPos = int(h.LogPos) + case "tablemap": + tx.EndPos = int(h.LogPos) + tbMapPos = h.LogPos - h.EventSize + case "rowsquery": + tx.EndPos = int(h.LogPos) + tx.sqlOrigin = append(tx.sqlOrigin, ev.Data) + default: + tx.EndPos = int(h.LogPos) + if ev.Type == "query" { + if equalFoldShort(ev.Data, "begin") { + if tx.TxStartTime == 0 { + tx.TxStartTime = int64(h.Timestamp) + } + tx.Status = STATUS_BEGIN + } else if equalFoldShort(ev.Data, "commit") { + tx.Status = STATUS_COMMIT + tx.TxEndTime = int64(h.Timestamp) + } else if equalFoldShort(ev.Data, "rollback") { + tx.Status = STATUS_ROLLBACK + tx.TxEndTime = int64(h.Timestamp) + } + } + if ev.DB != "" && ev.TB != "" { + tx.dmlEventCount++ + } + tx.Txs = append(tx.Txs, TxDetail{ + StartPos: startPos, + EndPos: int(h.LogPos), + Db: ev.DB, + Table: ev.TB, + Sql: ev.Data, + SqlType: ev.Type, + Rows: ev.Rows, + ColumnTypes: ev.ColumnTypes, + ColumnCollationIDs: ev.ColumnCollationIDs, + RowCount: int(ev.RowCnt), + Timestamp: int64(h.Timestamp), + CompressionType: ev.CompressionType, + }) + } + } + } +} diff --git a/parse_types.go b/parse_types.go new file mode 100644 index 0000000..cdedc60 --- /dev/null +++ b/parse_types.go @@ -0,0 +1,126 @@ +package binlog + +import ( + "errors" + "strings" + "time" +) + +var ( + ErrInvalidBinlogHeader = errors.New("invalid binlog file header") + ErrEventTooSmall = errors.New("event size too small") +) + +const ( + CompressionNone uint64 = 255 + CompressionZSTD uint64 = 0 +) + +const ( + maxPooledRawDataCap = 4 << 20 // 4MB + defaultReadBufSize = 1 << 20 // 1MB +) + +type TxDetail struct { + StartPos int `json:"startPos"` + EndPos int `json:"endPos"` + RowCount int `json:"rowCount"` + Timestamp int64 `json:"timestamp"` + Time time.Time `json:"time"` + Sql string `json:"sql"` + Db string `json:"db"` + Table string `json:"table"` + SqlType string `json:"sqlType"` + CompressionType string `json:"compressionType"` + Rows [][]interface{} `json:"rows"` + ColumnTypes []int `json:"columnTypes,omitempty"` + ColumnCollationIDs []uint64 `json:"columnCollationIds,omitempty"` +} + +const ( + STATUS_PREPARE uint8 = iota + STATUS_BEGIN + STATUS_COMMIT + STATUS_ROLLBACK +) + +type Transaction struct { + GTID string `json:"gtid"` + Timestamp int64 `json:"timestamp"` + Time time.Time `json:"time"` + StartPos int `json:"startPos"` + EndPos int `json:"endPos"` + Size int `json:"size"` + RowsCount int `json:"rowsCount"` + Status uint8 `json:"status"` + TxStartTime int64 `json:"txStartTime"` + TxEndTime int64 `json:"txEndTime"` + sqlOrigin []string `json:"sqlOrigin"` + Txs []TxDetail `json:"txs"` + dmlEventCount int +} + +func (t Transaction) GetSqlOrigin() []string { + return t.sqlOrigin +} + +type BinlogFilter struct { + IncludeGtid string + ExcludeGtid string + IncludeTables []string + ExcludeTables []string + StartPos int + EndPos int + StartDate time.Time + EndDate time.Time + BigThan int + SmallThan int + OnlyShowGtid bool + OnlyShowDML bool + PickTxAllIfMatch bool + ExcludeBlank bool + IncludeBlank bool +} + +type BinlogEvent struct { + Type string + DB string + TB string + Data string + RowCnt uint32 + Rows [][]interface{} + ColumnTypes []int + ColumnCollationIDs []uint64 + CompressionType string +} + +type tableMatcher struct { + exactMatch map[string]bool + dbWildcard map[string]bool + tbWildcard map[string]bool + matchAll bool +} + +func (m *tableMatcher) match(db, tb string) bool { + db = strings.ToLower(strings.TrimSpace(db)) + tb = strings.ToLower(strings.TrimSpace(tb)) + + if m.matchAll { + return true + } + if m.dbWildcard[db] || m.tbWildcard[tb] { + return true + } + if len(m.exactMatch) > 0 { + // Go 1.12+ 对 map[string] 查找时 string([]byte) 不分配 + var buf [128]byte + key := buf[:0] + key = append(key, db...) + key = append(key, '.') + key = append(key, tb...) + if m.exactMatch[string(key)] { + return true + } + } + return false +}