stario/frameio.go

192 lines
4.7 KiB
Go
Raw Normal View History

package stario
import (
"errors"
"io"
)
// DefaultFrameReaderBufferSize is the size, in bytes, of the buffer a
// FrameReader hands to the transport for each read unless SetReadBufferSize
// has chosen another value. It equals 32 KiB.
const DefaultFrameReaderBufferSize = 32 << 10
// frameReaderConnKey is the type of the private default key a FrameReader
// passes to its StarQueue to select its own parse state.
//
// The type is deliberately not zero-sized. The Go specification allows
// pointers to distinct zero-size variables to compare equal, and the gc
// runtime returns one shared address for zero-size heap allocations. With an
// empty struct, separate &frameReaderConnKey{} values could therefore be the
// same key, and readers sharing one queue would share each other's buffered
// state.
type frameReaderConnKey struct {
	_ byte // gives each allocation its own address; never read
}
// FrameWriter pairs an io.Writer with a StarQueue so that payloads can be
// framed by the queue and written to the writer through one object.
type FrameWriter struct {
	// writer is the destination that framed bytes are written to.
	writer io.Writer
	// queue supplies the framing helpers used by the Write* methods.
	queue *StarQueue
}
// NewFrameWriter returns a FrameWriter that frames payloads with queue and
// writes them to writer. If queue is nil, a new queue obtained from NewQueue
// is used instead. writer is stored as given.
func NewFrameWriter(writer io.Writer, queue *StarQueue) *FrameWriter {
	framer := queue
	if framer == nil {
		framer = NewQueue()
	}
	fw := new(FrameWriter)
	fw.writer = writer
	fw.queue = framer
	return fw
}
// WriteFrame frames payload with the configured queue and writes it to the
// underlying writer.
//
// It returns io.ErrClosedPipe when the receiver, its writer, or its queue is
// nil; otherwise it returns the result of the queue's WriteFrame.
func (writer *FrameWriter) WriteFrame(payload []byte) error {
	if writer == nil {
		return io.ErrClosedPipe
	}
	dst, framer := writer.writer, writer.queue
	if dst == nil || framer == nil {
		return io.ErrClosedPipe
	}
	return framer.WriteFrame(dst, payload)
}
// WriteFrameBuffers writes one framed payload by delegating to the queue's
// WriteFrameBuffers, which uses net.Buffers when possible.
//
// It returns io.ErrClosedPipe when the receiver, its writer, or its queue is
// nil; otherwise it returns the queue's result.
func (writer *FrameWriter) WriteFrameBuffers(payload []byte) error {
	if writer == nil {
		return io.ErrClosedPipe
	}
	dst, framer := writer.writer, writer.queue
	if dst == nil || framer == nil {
		return io.ErrClosedPipe
	}
	return framer.WriteFrameBuffers(dst, payload)
}
// WriteFramesBuffers frames every payload in payloads and writes them by
// delegating to the queue's WriteFramesBuffers, which batches the frames into
// one write when possible.
//
// It returns io.ErrClosedPipe when the receiver, its writer, or its queue is
// nil; otherwise it returns the queue's result.
func (writer *FrameWriter) WriteFramesBuffers(payloads ...[]byte) error {
	if writer == nil {
		return io.ErrClosedPipe
	}
	dst, framer := writer.writer, writer.queue
	if dst == nil || framer == nil {
		return io.ErrClosedPipe
	}
	return framer.WriteFramesBuffers(dst, payloads...)
}
// FrameReader pulls raw bytes from an io.Reader, feeds them to a StarQueue
// for parsing, and hands the resulting payloads out one at a time via Next.
type FrameReader struct {
	// reader is the transport that raw bytes are read from.
	reader io.Reader
	// queue parses the raw bytes into framed messages.
	queue *StarQueue
	// connKey is the key passed to queue so it can find this reader's state.
	connKey interface{}
	// readSize is the length of the buffer allocated for transport reads.
	readSize int
	// pending holds parsed payloads that Next has not returned yet.
	pending [][]byte
	// pendingErr is an error held back until pending has been drained.
	pendingErr error
}
// NewFrameReader returns a FrameReader that reads from reader and parses
// frames with queue. If queue is nil, a new queue obtained from NewQueue is
// used instead. The reader starts with a freshly allocated private connection
// key and a read chunk size of DefaultFrameReaderBufferSize.
func NewFrameReader(reader io.Reader, queue *StarQueue) *FrameReader {
	parser := queue
	if parser == nil {
		parser = NewQueue()
	}
	fr := new(FrameReader)
	fr.reader = reader
	fr.queue = parser
	fr.connKey = &frameReaderConnKey{}
	fr.readSize = DefaultFrameReaderBufferSize
	return fr
}
// SetReadBufferSize sets the length of the buffer used for each transport
// read. The call is ignored when the receiver is nil or size is not positive.
func (reader *FrameReader) SetReadBufferSize(size int) {
	if reader != nil && size > 0 {
		reader.readSize = size
	}
}
// SetConnKey replaces the key this reader passes to its queue.
//
// A nil conn installs a freshly allocated private key. Any other value must
// pass validateConnKey; if it does not, that error is returned and the current
// key is kept. A nil receiver yields io.ErrClosedPipe.
//
// This method only swaps the key; it does not touch the queue, so state
// stored under the previous key is neither moved nor removed here.
func (reader *FrameReader) SetConnKey(conn interface{}) error {
	if reader == nil {
		return io.ErrClosedPipe
	}
	key := conn
	if key == nil {
		key = &frameReaderConnKey{}
	} else if err := validateConnKey(key); err != nil {
		return err
	}
	reader.connKey = key
	return nil
}
// Next returns the next framed payload.
//
// Payloads already parsed by an earlier call are returned first. Once those
// are drained, an error deferred by an earlier call (if any) is returned
// exactly once. Otherwise Next reads from the transport in chunks of the
// configured read size and feeds each chunk to the queue until at least one
// complete payload is available or an error occurs.
//
// When one read yields both payloads and an error, the first payload is
// returned with a nil error and the error is held back until the remaining
// payloads have been handed out.
//
// Errors:
//   - io.ErrClosedPipe when the receiver, its reader, or its queue is nil.
//   - io.ErrUnexpectedEOF (via normalizeReadErr) when the transport reports
//     io.EOF while the queue still holds buffered bytes.
//   - io.ErrNoProgress when the transport keeps returning (0, nil), so a
//     misbehaving reader cannot make Next spin forever.
//   - any other parse or read error, joined when both occur together.
func (reader *FrameReader) Next() ([]byte, error) {
	// Consecutive (0, nil) reads tolerated before giving up. io.Reader only
	// discourages such results, so a few are accepted; the bound matches the
	// one bufio applies.
	const maxConsecutiveEmptyReads = 100

	if reader == nil || reader.reader == nil || reader.queue == nil {
		return nil, io.ErrClosedPipe
	}
	if len(reader.pending) > 0 {
		return reader.popPending(), nil
	}
	if reader.pendingErr != nil {
		err := reader.pendingErr
		reader.pendingErr = nil
		return nil, err
	}
	if reader.readSize <= 0 {
		reader.readSize = DefaultFrameReaderBufferSize
	}
	// NOTE(review): the buffer is allocated per call rather than kept on the
	// reader, presumably because payloads delivered by ParseMessageOwned may
	// reference it. Confirm before attempting to reuse it across calls.
	buf := make([]byte, reader.readSize)
	emptyReads := 0
	for {
		n, readErr := reader.reader.Read(buf)
		if n > 0 {
			emptyReads = 0
			parseErr := reader.queue.ParseMessageOwned(buf[:n], reader.connKey, func(msg MsgQueue) error {
				reader.pending = append(reader.pending, msg.Msg)
				return nil
			})
			err := reader.normalizeReadErr(parseErr, readErr)
			if len(reader.pending) > 0 {
				// Deliver the parsed payloads first; report err afterwards.
				reader.pendingErr = err
				return reader.popPending(), nil
			}
			if err != nil {
				return nil, err
			}
		}
		if readErr != nil {
			return nil, reader.normalizeReadErr(nil, readErr)
		}
		if n <= 0 {
			// (0, nil): nothing happened. Retry, but not indefinitely.
			emptyReads++
			if emptyReads >= maxConsecutiveEmptyReads {
				return nil, io.ErrNoProgress
			}
		}
	}
}
// popPending removes and returns the oldest payload in reader.pending.
//
// The caller must ensure pending is non-empty; indexing an empty slice panics.
func (reader *FrameReader) popPending() []byte {
	next := reader.pending[0]
	// Clear the slot before advancing. Re-slicing alone leaves the payload
	// referenced from the backing array, which keeps it alive after the
	// caller has finished with it.
	reader.pending[0] = nil
	reader.pending = reader.pending[1:]
	return next
}
// joinFrameReaderError merges a parse error and a read error into one value.
//
// If either argument is nil the other is returned unchanged, so the result is
// nil when both are nil. When both are non-nil they are combined with
// errors.Join, parse error first.
func joinFrameReaderError(parseErr error, readErr error) error {
	if parseErr == nil {
		return readErr
	}
	if readErr == nil {
		return parseErr
	}
	return errors.Join(parseErr, readErr)
}
// normalizeReadErr combines parseErr and readErr into the error Next reports.
//
// If readErr matches io.EOF while the queue still holds buffered bytes for
// this reader, the stream ended inside a frame. That buffered state is
// discarded and io.ErrUnexpectedEOF is reported in place of readErr. In every
// case the result is merged with parseErr by joinFrameReaderError.
func (reader *FrameReader) normalizeReadErr(parseErr error, readErr error) error {
	endedMidFrame := errors.Is(readErr, io.EOF) && reader.hasBufferedState()
	if !endedMidFrame {
		return joinFrameReaderError(parseErr, readErr)
	}
	reader.clearBufferedState()
	return joinFrameReaderError(parseErr, io.ErrUnexpectedEOF)
}
// hasBufferedState reports whether the queue holds a non-empty buffer under
// this reader's connection key.
//
// It returns false when the receiver or its queue is nil, when no entry is
// stored for the key, or when the entry is not a *queConnState. The buffer
// length is read while holding the state's mutex.
func (reader *FrameReader) hasBufferedState() bool {
	if reader == nil || reader.queue == nil {
		return false
	}
	entry, found := reader.queue.states.Load(reader.connKey)
	// The comma-ok assertion yields (nil, false) for a missing entry, so it
	// is safe to evaluate before checking found.
	connState, isState := entry.(*queConnState)
	if !found || !isState {
		return false
	}
	connState.mu.Lock()
	defer connState.mu.Unlock()
	return len(connState.buf) != 0
}
// clearBufferedState deletes the queue entry stored under this reader's
// connection key. It does nothing when the receiver or its queue is nil.
func (reader *FrameReader) clearBufferedState() {
	if reader != nil && reader.queue != nil {
		reader.queue.states.Delete(reader.connKey)
	}
}