package binlog

import (
	"bytes"
	"encoding/hex"
	"errors"
	"fmt"
	"io"
	"os"
	"sync"

	"github.com/starainrt/go-mysql/replication"
)

// validateBinlogHeader reads the 4-byte file header from f's current offset
// and compares it with replication.BinLogFileHeader. On success the file
// offset is set to 4 bytes from the start of the file.
//
// It returns ErrInvalidBinlogHeader when the bytes do not match or when fewer
// than 4 bytes are available, and a wrapped I/O error when the read or the
// seek fails. The read starts at the current offset, so callers are expected
// to pass a file positioned at its beginning.
func validateBinlogHeader(f *os.File) error {
	const fileTypeBytes = int64(4)

	b := make([]byte, fileTypeBytes)
	// io.ReadFull instead of f.Read: a single Read may legally return fewer
	// bytes than requested without an error, which would make the comparison
	// below operate on a partially filled buffer.
	if _, err := io.ReadFull(f, b); err != nil {
		if errors.Is(err, io.ErrUnexpectedEOF) {
			// Some bytes were read but fewer than the header needs: the file
			// cannot carry a valid header.
			return ErrInvalidBinlogHeader
		}
		return fmt.Errorf("read binlog header failed: %w", err)
	}
	if !bytes.Equal(b, replication.BinLogFileHeader) {
		return ErrInvalidBinlogHeader
	}

	if _, err := f.Seek(fileTypeBytes, io.SeekStart); err != nil {
		return fmt.Errorf("seek after header failed: %w", err)
	}
	return nil
}

// readEventHeader fills headBuf completely from r and parses it with
// parser.ParseHeader.
//
// The read error is returned unwrapped (so io.EOF / io.ErrUnexpectedEOF reach
// the caller as-is); a parse error is wrapped. A header whose EventSize is not
// strictly larger than replication.EventHeaderSize is rejected with an error
// wrapping ErrEventTooSmall.
func readEventHeader(r io.Reader, parser *replication.BinlogParser, headBuf []byte) (*replication.EventHeader, error) {
	if _, err := io.ReadFull(r, headBuf); err != nil {
		return nil, err
	}

	h, err := parser.ParseHeader(headBuf)
	if err != nil {
		return nil, fmt.Errorf("parse header failed: %w", err)
	}
	if h.EventSize <= uint32(replication.EventHeaderSize) {
		return nil, fmt.Errorf("%w: event size is %d", ErrEventTooSmall, h.EventSize)
	}
	return h, nil
}

// readEventBody reads the part of the event that follows the header, i.e.
// h.EventSize - replication.EventHeaderSize bytes, into a freshly allocated
// slice.
//
// It returns an error wrapping ErrEventTooSmall when the computed body length
// is negative, and a wrapped read error when r cannot supply the full body.
func readEventBody(r io.Reader, h *replication.EventHeader) ([]byte, error) {
	bodyLen := int(h.EventSize) - replication.EventHeaderSize
	if bodyLen < 0 {
		// make would panic on a negative length; report the malformed size
		// the same way readEventHeader does.
		return nil, fmt.Errorf("%w: event size is %d", ErrEventTooSmall, h.EventSize)
	}

	body := make([]byte, bodyLen)
	if _, err := io.ReadFull(r, body); err != nil {
		return nil, fmt.Errorf("read event body failed: %w (need %d bytes)", err, bodyLen)
	}
	return body, nil
}

// skipEventBody discards the event body that follows the header described by
// h without keeping it in memory. A non-positive body length is a no-op.
func skipEventBody(r io.Reader, h *replication.EventHeader) error {
	bodyLen := int64(h.EventSize) - int64(replication.EventHeaderSize)
	if bodyLen <= 0 {
		return nil
	}

	if _, err := io.CopyN(io.Discard, r, bodyLen); err != nil {
		return fmt.Errorf("skip event body failed: %w", err)
	}
	return nil
}

// rawDataPool recycles the scratch buffers handed out by getRawDataBuf.
// Fresh entries start empty with 64 KiB of capacity; the pool stores *[]byte
// rather than []byte so that Put does not box a slice header.
var rawDataPool = sync.Pool{
	New: func() any {
		b := make([]byte, 0, 64*1024)
		return &b
	},
}

// getRawDataBuf returns a byte slice of length n. It reuses a pooled buffer
// when that buffer's capacity suffices and allocates a new slice otherwise.
// The contents of a reused buffer are not cleared.
func getRawDataBuf(n int) []byte {
	p := rawDataPool.Get().(*[]byte)
	if cap(*p) < n {
		// The pooled buffer is too small for this request but still useful
		// for later, smaller ones: hand it back instead of dropping it.
		rawDataPool.Put(p)
		return make([]byte, n)
	}
	return (*p)[:n]
}

// putRawDataBuf returns b to rawDataPool for reuse. Buffers whose capacity
// exceeds maxPooledRawDataCap are not pooled and are left to the garbage
// collector. The caller must not use b after this call.
func putRawDataBuf(b []byte) {
	if cap(b) > maxPooledRawDataCap {
		return
	}
	b = b[:0]
	rawDataPool.Put(&b)
}

// formatBodyPreview renders body as "len=N preview(hex,MB)=<hex>" for use in
// diagnostics. At most maxBytes bytes are hex-encoded (256 when maxBytes is
// not positive); a truncated preview is suffixed with "...". An empty body
// yields "len=0".
func formatBodyPreview(body []byte, maxBytes int) string {
	if maxBytes <= 0 {
		maxBytes = 256
	}
	if len(body) == 0 {
		return "len=0"
	}

	previewLen := len(body)
	truncated := false
	if previewLen > maxBytes {
		previewLen = maxBytes
		truncated = true
	}

	hexBody := hex.EncodeToString(body[:previewLen])
	if truncated {
		return fmt.Sprintf("len=%d preview(hex,%dB)=%s...", len(body), previewLen, hexBody)
	}
	return fmt.Sprintf("len=%d preview(hex,%dB)=%s", len(body), previewLen, hexBody)
}

// parseEvent decodes a single event with parser.ParseEvent. The parser is
// given the body as well as the raw event bytes (headBuf followed by body),
// which are assembled in a pooled scratch buffer.
//
// On failure the returned error wraps the parser's error and includes
// h.LogPos, the header, and a hex preview of up to 256 body bytes.
func parseEvent(parser *replication.BinlogParser, h *replication.EventHeader, headBuf []byte, body []byte) (replication.Event, error) {
	rawLen := len(headBuf) + len(body)
	rawData := getRawDataBuf(rawLen)
	copy(rawData, headBuf)
	copy(rawData[len(headBuf):], body)

	e, err := parser.ParseEvent(h, body, rawData)
	// NOTE(review): the scratch buffer is recycled right after parsing, which
	// is only safe if ParseEvent does not keep a reference to rawData in the
	// returned event — confirm against the parser implementation.
	putRawDataBuf(rawData)
	if err != nil {
		return nil, fmt.Errorf("parse event failed at pos %d: Header %+v, Body %s, Err: %w",
			h.LogPos, h, formatBodyPreview(body, 256), err)
	}
	return e, nil
}

// seekToPosition validates the binlog header of f, then reads and parses
// events from the start of the file until the first FORMAT_DESCRIPTION_EVENT
// or GTID_EVENT has been parsed, and finally moves the file offset to pos
// (measured from the start of the file).
//
// Presumably the leading events are parsed so that parser learns the file's
// event format before decoding from pos — confirm against the parser's
// requirements. Any read, parse or seek failure aborts the operation and is
// returned.
func seekToPosition(f *os.File, parser *replication.BinlogParser, pos int64) error {
	if err := validateBinlogHeader(f); err != nil {
		return err
	}

	// One header buffer is reused for every event read in the loop.
	headBuf := make([]byte, replication.EventHeaderSize)
	for {
		h, err := readEventHeader(f, parser, headBuf)
		if err != nil {
			return fmt.Errorf("seek to position failed: %w", err)
		}

		body, err := readEventBody(f, h)
		if err != nil {
			return err
		}

		_, err = parseEvent(parser, h, headBuf, body)
		if err != nil {
			return err
		}

		if h.EventType == replication.FORMAT_DESCRIPTION_EVENT || h.EventType == replication.GTID_EVENT {
			break
		}
	}

	if _, err := f.Seek(pos, io.SeekStart); err != nil {
		return fmt.Errorf("seek to pos %d failed: %w", pos, err)
	}
	return nil
}