192 lines
4.7 KiB
Go
192 lines
4.7 KiB
Go
|
|
package stario
|
||
|
|
|
||
|
|
import (
|
||
|
|
"errors"
|
||
|
|
"io"
|
||
|
|
)
|
||
|
|
|
||
|
|
// DefaultFrameReaderBufferSize is the default transport read chunk size used by
|
||
|
|
// FrameReader.
|
||
|
|
const DefaultFrameReaderBufferSize = 32 * 1024
|
||
|
|
|
||
|
|
type frameReaderConnKey struct{}
|
||
|
|
|
||
|
|
// FrameWriter adapts StarQueue framing helpers to an io.Writer.
|
||
|
|
type FrameWriter struct {
|
||
|
|
writer io.Writer
|
||
|
|
queue *StarQueue
|
||
|
|
}
|
||
|
|
|
||
|
|
// NewFrameWriter creates a framing writer backed by queue. When queue is nil, a
|
||
|
|
// default StarQueue is created.
|
||
|
|
func NewFrameWriter(writer io.Writer, queue *StarQueue) *FrameWriter {
|
||
|
|
if queue == nil {
|
||
|
|
queue = NewQueue()
|
||
|
|
}
|
||
|
|
return &FrameWriter{
|
||
|
|
writer: writer,
|
||
|
|
queue: queue,
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// WriteFrame writes one framed payload.
|
||
|
|
func (writer *FrameWriter) WriteFrame(payload []byte) error {
|
||
|
|
if writer == nil || writer.writer == nil || writer.queue == nil {
|
||
|
|
return io.ErrClosedPipe
|
||
|
|
}
|
||
|
|
return writer.queue.WriteFrame(writer.writer, payload)
|
||
|
|
}
|
||
|
|
|
||
|
|
// WriteFrameBuffers writes one framed payload using net.Buffers when possible.
|
||
|
|
func (writer *FrameWriter) WriteFrameBuffers(payload []byte) error {
|
||
|
|
if writer == nil || writer.writer == nil || writer.queue == nil {
|
||
|
|
return io.ErrClosedPipe
|
||
|
|
}
|
||
|
|
return writer.queue.WriteFrameBuffers(writer.writer, payload)
|
||
|
|
}
|
||
|
|
|
||
|
|
// WriteFramesBuffers writes multiple framed payloads in one batch when
|
||
|
|
// possible.
|
||
|
|
func (writer *FrameWriter) WriteFramesBuffers(payloads ...[]byte) error {
|
||
|
|
if writer == nil || writer.writer == nil || writer.queue == nil {
|
||
|
|
return io.ErrClosedPipe
|
||
|
|
}
|
||
|
|
return writer.queue.WriteFramesBuffers(writer.writer, payloads...)
|
||
|
|
}
|
||
|
|
|
||
|
|
// FrameReader adapts StarQueue parsing helpers to an io.Reader.
|
||
|
|
type FrameReader struct {
|
||
|
|
reader io.Reader
|
||
|
|
queue *StarQueue
|
||
|
|
connKey interface{}
|
||
|
|
readSize int
|
||
|
|
pending [][]byte
|
||
|
|
pendingErr error
|
||
|
|
}
|
||
|
|
|
||
|
|
// NewFrameReader creates a framing reader backed by queue. When queue is nil, a
|
||
|
|
// default StarQueue is created.
|
||
|
|
func NewFrameReader(reader io.Reader, queue *StarQueue) *FrameReader {
|
||
|
|
if queue == nil {
|
||
|
|
queue = NewQueue()
|
||
|
|
}
|
||
|
|
return &FrameReader{
|
||
|
|
reader: reader,
|
||
|
|
queue: queue,
|
||
|
|
connKey: &frameReaderConnKey{},
|
||
|
|
readSize: DefaultFrameReaderBufferSize,
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// SetReadBufferSize updates the underlying transport read chunk size.
|
||
|
|
func (reader *FrameReader) SetReadBufferSize(size int) {
|
||
|
|
if reader == nil || size <= 0 {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
reader.readSize = size
|
||
|
|
}
|
||
|
|
|
||
|
|
// SetConnKey overrides the internal queue connection key.
|
||
|
|
func (reader *FrameReader) SetConnKey(conn interface{}) error {
|
||
|
|
if reader == nil {
|
||
|
|
return io.ErrClosedPipe
|
||
|
|
}
|
||
|
|
if conn == nil {
|
||
|
|
reader.connKey = &frameReaderConnKey{}
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
if err := validateConnKey(conn); err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
reader.connKey = conn
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
// Next returns the next framed payload.
|
||
|
|
func (reader *FrameReader) Next() ([]byte, error) {
|
||
|
|
if reader == nil || reader.reader == nil || reader.queue == nil {
|
||
|
|
return nil, io.ErrClosedPipe
|
||
|
|
}
|
||
|
|
if len(reader.pending) > 0 {
|
||
|
|
return reader.popPending(), nil
|
||
|
|
}
|
||
|
|
if reader.pendingErr != nil {
|
||
|
|
err := reader.pendingErr
|
||
|
|
reader.pendingErr = nil
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
if reader.readSize <= 0 {
|
||
|
|
reader.readSize = DefaultFrameReaderBufferSize
|
||
|
|
}
|
||
|
|
buf := make([]byte, reader.readSize)
|
||
|
|
for {
|
||
|
|
n, readErr := reader.reader.Read(buf)
|
||
|
|
if n > 0 {
|
||
|
|
parseErr := reader.queue.ParseMessageOwned(buf[:n], reader.connKey, func(msg MsgQueue) error {
|
||
|
|
reader.pending = append(reader.pending, msg.Msg)
|
||
|
|
return nil
|
||
|
|
})
|
||
|
|
err := reader.normalizeReadErr(parseErr, readErr)
|
||
|
|
if len(reader.pending) > 0 {
|
||
|
|
reader.pendingErr = err
|
||
|
|
return reader.popPending(), nil
|
||
|
|
}
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
}
|
||
|
|
if readErr != nil {
|
||
|
|
return nil, reader.normalizeReadErr(nil, readErr)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (reader *FrameReader) popPending() []byte {
|
||
|
|
next := reader.pending[0]
|
||
|
|
reader.pending = reader.pending[1:]
|
||
|
|
return next
|
||
|
|
}
|
||
|
|
|
||
|
|
func joinFrameReaderError(parseErr error, readErr error) error {
|
||
|
|
switch {
|
||
|
|
case parseErr == nil:
|
||
|
|
return readErr
|
||
|
|
case readErr == nil:
|
||
|
|
return parseErr
|
||
|
|
default:
|
||
|
|
return errors.Join(parseErr, readErr)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (reader *FrameReader) normalizeReadErr(parseErr error, readErr error) error {
|
||
|
|
if errors.Is(readErr, io.EOF) && reader.hasBufferedState() {
|
||
|
|
reader.clearBufferedState()
|
||
|
|
readErr = io.ErrUnexpectedEOF
|
||
|
|
}
|
||
|
|
return joinFrameReaderError(parseErr, readErr)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (reader *FrameReader) hasBufferedState() bool {
|
||
|
|
if reader == nil || reader.queue == nil {
|
||
|
|
return false
|
||
|
|
}
|
||
|
|
stateAny, ok := reader.queue.states.Load(reader.connKey)
|
||
|
|
if !ok {
|
||
|
|
return false
|
||
|
|
}
|
||
|
|
state, ok := stateAny.(*queConnState)
|
||
|
|
if !ok {
|
||
|
|
return false
|
||
|
|
}
|
||
|
|
state.mu.Lock()
|
||
|
|
defer state.mu.Unlock()
|
||
|
|
return len(state.buf) > 0
|
||
|
|
}
|
||
|
|
|
||
|
|
func (reader *FrameReader) clearBufferedState() {
|
||
|
|
if reader == nil || reader.queue == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
reader.queue.states.Delete(reader.connKey)
|
||
|
|
}
|