package stario import ( "errors" "io" ) // DefaultFrameReaderBufferSize is the default transport read chunk size used by // FrameReader. const DefaultFrameReaderBufferSize = 32 * 1024 type frameReaderConnKey struct{} // FrameWriter adapts StarQueue framing helpers to an io.Writer. type FrameWriter struct { writer io.Writer queue *StarQueue } // NewFrameWriter creates a framing writer backed by queue. When queue is nil, a // default StarQueue is created. func NewFrameWriter(writer io.Writer, queue *StarQueue) *FrameWriter { if queue == nil { queue = NewQueue() } return &FrameWriter{ writer: writer, queue: queue, } } // WriteFrame writes one framed payload. func (writer *FrameWriter) WriteFrame(payload []byte) error { if writer == nil || writer.writer == nil || writer.queue == nil { return io.ErrClosedPipe } return writer.queue.WriteFrame(writer.writer, payload) } // WriteFrameBuffers writes one framed payload using net.Buffers when possible. func (writer *FrameWriter) WriteFrameBuffers(payload []byte) error { if writer == nil || writer.writer == nil || writer.queue == nil { return io.ErrClosedPipe } return writer.queue.WriteFrameBuffers(writer.writer, payload) } // WriteFramesBuffers writes multiple framed payloads in one batch when // possible. func (writer *FrameWriter) WriteFramesBuffers(payloads ...[]byte) error { if writer == nil || writer.writer == nil || writer.queue == nil { return io.ErrClosedPipe } return writer.queue.WriteFramesBuffers(writer.writer, payloads...) } // FrameReader adapts StarQueue parsing helpers to an io.Reader. type FrameReader struct { reader io.Reader queue *StarQueue connKey interface{} readSize int pending [][]byte pendingErr error } // NewFrameReader creates a framing reader backed by queue. When queue is nil, a // default StarQueue is created. func NewFrameReader(reader io.Reader, queue *StarQueue) *FrameReader { if queue == nil { queue = NewQueue() } return &FrameReader{ reader: reader, queue: queue, connKey: &frameReaderConnKey{}, readSize: DefaultFrameReaderBufferSize, } } // SetReadBufferSize updates the underlying transport read chunk size. func (reader *FrameReader) SetReadBufferSize(size int) { if reader == nil || size <= 0 { return } reader.readSize = size } // SetConnKey overrides the internal queue connection key. func (reader *FrameReader) SetConnKey(conn interface{}) error { if reader == nil { return io.ErrClosedPipe } if conn == nil { reader.connKey = &frameReaderConnKey{} return nil } if err := validateConnKey(conn); err != nil { return err } reader.connKey = conn return nil } // Next returns the next framed payload. func (reader *FrameReader) Next() ([]byte, error) { if reader == nil || reader.reader == nil || reader.queue == nil { return nil, io.ErrClosedPipe } if len(reader.pending) > 0 { return reader.popPending(), nil } if reader.pendingErr != nil { err := reader.pendingErr reader.pendingErr = nil return nil, err } if reader.readSize <= 0 { reader.readSize = DefaultFrameReaderBufferSize } buf := make([]byte, reader.readSize) for { n, readErr := reader.reader.Read(buf) if n > 0 { parseErr := reader.queue.ParseMessageOwned(buf[:n], reader.connKey, func(msg MsgQueue) error { reader.pending = append(reader.pending, msg.Msg) return nil }) err := reader.normalizeReadErr(parseErr, readErr) if len(reader.pending) > 0 { reader.pendingErr = err return reader.popPending(), nil } if err != nil { return nil, err } } if readErr != nil { return nil, reader.normalizeReadErr(nil, readErr) } } } func (reader *FrameReader) popPending() []byte { next := reader.pending[0] reader.pending = reader.pending[1:] return next } func joinFrameReaderError(parseErr error, readErr error) error { switch { case parseErr == nil: return readErr case readErr == nil: return parseErr default: return errors.Join(parseErr, readErr) } } func (reader *FrameReader) normalizeReadErr(parseErr error, readErr error) error { if errors.Is(readErr, io.EOF) && reader.hasBufferedState() { reader.clearBufferedState() readErr = io.ErrUnexpectedEOF } return joinFrameReaderError(parseErr, readErr) } func (reader *FrameReader) hasBufferedState() bool { if reader == nil || reader.queue == nil { return false } stateAny, ok := reader.queue.states.Load(reader.connKey) if !ok { return false } state, ok := stateAny.(*queConnState) if !ok { return false } state.mu.Lock() defer state.mu.Unlock() return len(state.buf) > 0 } func (reader *FrameReader) clearBufferedState() { if reader == nil || reader.queue == nil { return } reader.queue.states.Delete(reader.connKey) }