mysqlbinlog/parse_io.go
starainrt 8469c11373
refactor(parse): 拆分 parse.go 并修复事务/过滤一致性问题
- 将臃肿的 parse.go 按职责拆分为多个模块:
    parse_types.go、parse_io.go、parse_event_convert.go、parse_stream.go、parse_filter.go
  - parse.go 保留为模块入口说明,提升可维护性与可读性
  - 修复事务状态被覆盖问题(BEGIN/COMMIT/ROLLBACK 不再被重置为 PREPARE)
  - 增加 include-tables 与 exclude-tables 互斥校验,同时配置时直接报配置错误
  - 强化表匹配器模式校验,并补充非法模式测试
  - 在明细过滤后重算事务统计(RowsCount/StartPos/EndPos/Size),避免统计失真
  - 增加 TABLE_MAP 事件转换,补充列元信息透传(ColumnTypes/ColumnCollationIDs)
  - 基于 unsigned 元数据规范化行值,避免无符号整型被渲染为负数
  - 优化事件解析报错信息:增加有界 body 十六进制预览
  - 补充单元测试:payload/tablemap 转换、unsigned 规范化、过滤逻辑、IO 预览
2026-03-19 17:04:35 +08:00

151 lines
3.7 KiB
Go

package binlog
import (
"bytes"
"encoding/hex"
"fmt"
"io"
"os"
"sync"
"github.com/starainrt/go-mysql/replication"
)
func validateBinlogHeader(f *os.File) error {
const fileTypeBytes = int64(4)
b := make([]byte, fileTypeBytes)
if _, err := f.Read(b); err != nil {
return fmt.Errorf("read binlog header failed: %w", err)
}
if !bytes.Equal(b, replication.BinLogFileHeader) {
return ErrInvalidBinlogHeader
}
if _, err := f.Seek(fileTypeBytes, io.SeekStart); err != nil {
return fmt.Errorf("seek after header failed: %w", err)
}
return nil
}
// readEventHeader fills headBuf completely from r and decodes it as a binlog
// event header with parser.
//
// headBuf is caller-owned scratch space that may be reused across events (as
// seekToPosition does). It presumably has length replication.EventHeaderSize.
//
// Errors from the read itself are returned unwrapped, so callers can compare
// against io.EOF / io.ErrUnexpectedEOF directly. Decode failures are wrapped
// with "parse header failed". A header whose EventSize is smaller than the
// fixed header length is reported as ErrEventTooSmall.
func readEventHeader(r io.Reader, parser *replication.BinlogParser, headBuf []byte) (*replication.EventHeader, error) {
	if _, err := io.ReadFull(r, headBuf); err != nil {
		return nil, err
	}
	h, err := parser.ParseHeader(headBuf)
	if err != nil {
		return nil, fmt.Errorf("parse header failed: %w", err)
	}
	// An event made of only the 19-byte header is legal (for example a
	// STOP_EVENT in a binlog written with binlog_checksum=NONE). Only sizes
	// strictly below the header length are corrupt, which is the same bound
	// MySQL applies when reading events.
	if h.EventSize < uint32(replication.EventHeaderSize) {
		return nil, fmt.Errorf("%w: event size is %d", ErrEventTooSmall, h.EventSize)
	}
	return h, nil
}
// readEventBody reads the remainder of the event described by h from r, i.e.
// h.EventSize minus the fixed header length, into a freshly allocated slice.
// A zero-length body yields an empty slice and no error.
//
// The length is computed in int64, matching skipEventBody. This avoids
// overflow of int(uint32) on 32-bit platforms. It also means a header
// reporting a size below the header length produces an error instead of
// making make() panic on a negative length.
func readEventBody(r io.Reader, h *replication.EventHeader) ([]byte, error) {
	bodyLen := int64(h.EventSize) - int64(replication.EventHeaderSize)
	if bodyLen < 0 {
		return nil, fmt.Errorf("read event body failed: event size %d is smaller than header size %d",
			h.EventSize, replication.EventHeaderSize)
	}
	body := make([]byte, bodyLen)
	if _, err := io.ReadFull(r, body); err != nil {
		return nil, fmt.Errorf("read event body failed: %w (need %d bytes)", err, bodyLen)
	}
	return body, nil
}
// skipEventBody advances r past the body of the event described by h, i.e.
// h.EventSize minus the fixed header length, discarding those bytes.
// A non-positive body length means there is nothing to skip and is not an
// error. A stream that ends early is reported as a wrapped error.
func skipEventBody(r io.Reader, h *replication.EventHeader) error {
	remaining := int64(h.EventSize) - int64(replication.EventHeaderSize)
	if remaining > 0 {
		if _, err := io.CopyN(io.Discard, r, remaining); err != nil {
			return fmt.Errorf("skip event body failed: %w", err)
		}
	}
	return nil
}
// rawDataPool recycles scratch buffers used to assemble an event's raw bytes
// (header followed by body). Entries are stored as *[]byte. A newly created
// entry is an empty slice with 64 KiB of capacity.
var rawDataPool = sync.Pool{
	New: func() any {
		buf := make([]byte, 0, 64<<10)
		return &buf
	},
}
// getRawDataBuf returns a byte slice of length n, reusing a pooled buffer when
// its capacity is large enough.
//
// The returned bytes are not zeroed and may hold data from an earlier use, so
// the caller must overwrite all n bytes before reading them.
//
// When the pooled buffer is too small, a fresh slice is allocated and the
// pooled buffer is put back instead of being dropped. This way an occasional
// oversized event does not discard a reusable buffer.
func getRawDataBuf(n int) []byte {
	p := rawDataPool.Get().(*[]byte)
	if cap(*p) < n {
		rawDataPool.Put(p)
		return make([]byte, n)
	}
	return (*p)[:n]
}
// putRawDataBuf returns b's backing array to rawDataPool for reuse.
// Buffers whose capacity exceeds maxPooledRawDataCap are not pooled, so a
// single unusually large event does not keep a large allocation alive in the
// pool. The caller should not touch b afterwards, because a later
// getRawDataBuf may hand out the same backing array again.
func putRawDataBuf(b []byte) {
	if cap(b) <= maxPooledRawDataCap {
		emptied := b[:0]
		rawDataPool.Put(&emptied)
	}
}
// formatBodyPreview renders a bounded, human-readable summary of an event body
// for error messages. The output has the form
// "len=<total> preview(hex,<shown>B)=<hex>", and a trailing "..." is appended
// when the body is longer than the preview.
//
// maxBytes caps how many leading bytes are hex-encoded. A non-positive value
// falls back to 256. An empty body is rendered simply as "len=0".
func formatBodyPreview(body []byte, maxBytes int) string {
	total := len(body)
	if total == 0 {
		return "len=0"
	}

	limit := maxBytes
	if limit <= 0 {
		limit = 256
	}

	if total > limit {
		return fmt.Sprintf("len=%d preview(hex,%dB)=%s...", total, limit, hex.EncodeToString(body[:limit]))
	}
	return fmt.Sprintf("len=%d preview(hex,%dB)=%s", total, total, hex.EncodeToString(body))
}
// parseEvent decodes one event whose header has already been decoded into h.
// headBuf holds the raw header bytes and body the bytes that follow them.
//
// The parser receives both the body alone and a contiguous header+body copy.
// That copy is assembled in a pooled scratch buffer, which is returned to the
// pool as soon as ParseEvent returns.
// NOTE(review): recycling the scratch buffer immediately is only safe if
// neither the parser nor the returned event keeps a reference into it;
// confirm against the replication fork's ParseEvent.
//
// On failure the returned error reports h.LogPos, the header, and a hex
// preview of at most 256 body bytes, and wraps the parser's error.
func parseEvent(parser *replication.BinlogParser, h *replication.EventHeader, headBuf []byte, body []byte) (replication.Event, error) {
	scratch := getRawDataBuf(len(headBuf) + len(body))
	headerEnd := copy(scratch, headBuf)
	copy(scratch[headerEnd:], body)

	ev, parseErr := parser.ParseEvent(h, body, scratch)
	putRawDataBuf(scratch)

	if parseErr == nil {
		return ev, nil
	}
	return nil, fmt.Errorf("parse event failed at pos %d: Header %+v, Body %s, Err: %w",
		h.LogPos, h, formatBodyPreview(body, 256), parseErr)
}
// seekToPosition primes parser with the file's leading events and then moves
// f's offset to the absolute position pos.
//
// It first validates the 4-byte binlog magic number. It then reads, and feeds
// to parser, consecutive events from the start of the file until it has
// parsed a FORMAT_DESCRIPTION_EVENT or a GTID_EVENT. Priming the parser with
// the format description presumably gives it the per-file settings needed to
// decode events at pos. NOTE(review): confirm.
// pos itself is not checked to be an event boundary.
//
// Errors while reading an event header (including io.EOF before a matching
// event) are wrapped as "seek to position failed". Body-read and parse errors
// are returned as produced by readEventBody and parseEvent.
func seekToPosition(f *os.File, parser *replication.BinlogParser, pos int64) error {
	if err := validateBinlogHeader(f); err != nil {
		return err
	}

	hdr := make([]byte, replication.EventHeaderSize)
	for done := false; !done; {
		h, err := readEventHeader(f, parser, hdr)
		if err != nil {
			return fmt.Errorf("seek to position failed: %w", err)
		}
		body, err := readEventBody(f, h)
		if err != nil {
			return err
		}
		if _, err := parseEvent(parser, h, hdr, body); err != nil {
			return err
		}
		switch h.EventType {
		case replication.FORMAT_DESCRIPTION_EVENT, replication.GTID_EVENT:
			done = true
		}
	}

	if _, err := f.Seek(pos, io.SeekStart); err != nil {
		return fmt.Errorf("seek to pos %d failed: %w", pos, err)
	}
	return nil
}