stario/pipe.go

72 lines
1.8 KiB
Go
Raw Permalink Normal View History

package stario
import "io"
// StarPipeReader is the read side returned by NewStarPipe.
//
// Its methods tolerate a nil receiver or a zero value (nil buf) and report
// io.ErrClosedPipe in that case instead of panicking.
type StarPipeReader struct {
// buf is the StarBuffer shared with the StarPipeWriter created by the same
// NewStarPipe call; all reads and read-side aborts are forwarded to it.
buf *StarBuffer
}
// StarPipeWriter is the write side returned by NewStarPipe.
//
// Its methods tolerate a nil receiver or a zero value (nil buf) and report
// io.ErrClosedPipe in that case instead of panicking.
type StarPipeWriter struct {
// buf is the StarBuffer shared with the StarPipeReader created by the same
// NewStarPipe call; writes, Close and Abort are forwarded to it.
buf *StarBuffer
}
// NewStarPipe builds an in-memory pipe whose two endpoints share a single
// StarBuffer created with the requested capacity.
//
// If NewStarBuffer rejects the capacity, its error is returned unchanged and
// both endpoints are nil. On success the returned reader and writer operate on
// the same underlying buffer.
//
// The writer side uses Close to signal a graceful end-of-stream and Abort to
// fail both sides immediately.
func NewStarPipe(capacity uint64) (*StarPipeReader, *StarPipeWriter, error) {
	shared, err := NewStarBuffer(capacity)
	if err != nil {
		return nil, nil, err
	}

	// Both ends hold the same buffer pointer, so state changes made through
	// one end are visible to the other.
	readEnd := &StarPipeReader{buf: shared}
	writeEnd := &StarPipeWriter{buf: shared}
	return readEnd, writeEnd, nil
}
// Read fills p from the pipe by forwarding the call to the shared buffer and
// returns whatever byte count and error the buffer reports.
//
// A nil reader, or one without a backing buffer, yields (0, io.ErrClosedPipe).
func (reader *StarPipeReader) Read(p []byte) (int, error) {
	if reader == nil {
		return 0, io.ErrClosedPipe
	}
	src := reader.buf
	if src == nil {
		return 0, io.ErrClosedPipe
	}
	return src.Read(p)
}
// Close shuts the pipe down from the read side by aborting the shared buffer,
// which wakes blocked writers. The buffer's Abort result is returned as-is.
//
// A nil reader, or one without a backing buffer, yields io.ErrClosedPipe.
func (reader *StarPipeReader) Close() error {
	if reader != nil && reader.buf != nil {
		return reader.buf.Abort()
	}
	return io.ErrClosedPipe
}
// Abort is an alias for Close on the read side: it aborts the pipe and wakes
// blocked writers, returning exactly what Close returns.
func (reader *StarPipeReader) Abort() error {
	// Delegate rather than duplicate so both read-side shutdown paths stay in
	// lockstep; Close itself handles a nil receiver.
	closeErr := reader.Close()
	return closeErr
}
// Write pushes p into the pipe by forwarding the call to the shared buffer
// and returns whatever byte count and error the buffer reports.
//
// A nil writer, or one without a backing buffer, yields (0, io.ErrClosedPipe).
func (writer *StarPipeWriter) Write(p []byte) (int, error) {
	switch {
	// Case expressions are evaluated left to right, so writer.buf is only
	// touched once writer is known to be non-nil.
	case writer == nil, writer.buf == nil:
		return 0, io.ErrClosedPipe
	default:
		return writer.buf.Write(p)
	}
}
// Close ends the write side gracefully by closing the shared buffer and
// returns the buffer's Close result. Bytes already buffered remain readable
// until drained.
//
// A nil writer, or one without a backing buffer, yields io.ErrClosedPipe.
func (writer *StarPipeWriter) Close() error {
	if writer == nil {
		return io.ErrClosedPipe
	}
	dst := writer.buf
	if dst == nil {
		return io.ErrClosedPipe
	}
	return dst.Close()
}
// Abort tears the pipe down immediately from the write side by aborting the
// shared buffer, and returns the buffer's Abort result.
//
// A nil writer, or one without a backing buffer, yields io.ErrClosedPipe.
func (writer *StarPipeWriter) Abort() error {
	if writer != nil && writer.buf != nil {
		return writer.buf.Abort()
	}
	return io.ErrClosedPipe
}