stario/circle.go
starainrt c8facb5a03
stario: 提升 Go 1.20 基线与交互/队列稳定性
- 提升 go.mod 基线到 Go 1.20,并补齐对应测试
  - 修正 Passwd / PasswdResponseSignal 语义,Ctrl+C 默认退出当前流程
  - 优化 raw terminal redraw、Restore 与 StopUntil 的边界行为
  - 新增 StarPipe、FrameReader/FrameWriter、ReadFullContext/WriteFullContext/CopyContext、IsTerminal/ReadPasswordContext
  - 收口 StarQueue / StarBuffer 语义,删除 EndWrite,统一 Close / Abort 行为
  - 补齐 signal、timeout、queue、terminal、pipe、buffer 的回归测试与 race 覆盖
2026-04-15 14:35:19 +08:00

206 lines
4.7 KiB
Go

package stario
import (
"errors"
"io"
"sync"
)
var ErrStarBufferInvalidCapacity = errors.New("star buffer capacity must be greater than zero")
var ErrStarBufferClosed = errors.New("star buffer closed")
var ErrStarBufferWriteClosed = errors.New("star buffer write closed")
// StarBuffer is a blocking ring buffer that implements stream-style reads and writes.
//
// Close marks the write side finished after all payload bytes are sent.
// Abort aborts both sides immediately but still allows buffered bytes to be drained.
type StarBuffer struct {
datas []byte
pStart uint64
pEnd uint64
size uint64
cap uint64
isClose bool
isWriteEnd bool
mu sync.Mutex
notEmpty *sync.Cond
notFull *sync.Cond
}
func NewStarBuffer(cap uint64) (*StarBuffer, error) {
if cap == 0 {
return nil, ErrStarBufferInvalidCapacity
}
rtnBuffer := &StarBuffer{
cap: cap,
datas: make([]byte, cap),
}
rtnBuffer.notEmpty = sync.NewCond(&rtnBuffer.mu)
rtnBuffer.notFull = sync.NewCond(&rtnBuffer.mu)
return rtnBuffer, nil
}
// Free returns the remaining writable capacity.
func (star *StarBuffer) Free() uint64 {
star.mu.Lock()
defer star.mu.Unlock()
return star.cap - star.size
}
// Cap returns the fixed buffer capacity.
func (star *StarBuffer) Cap() uint64 {
star.mu.Lock()
defer star.mu.Unlock()
return star.cap
}
// Len returns the currently buffered byte count.
func (star *StarBuffer) Len() uint64 {
star.mu.Lock()
defer star.mu.Unlock()
return star.size
}
func (star *StarBuffer) getByte() (byte, error) {
star.mu.Lock()
defer star.mu.Unlock()
for star.size == 0 && !star.isWriteEnd && !star.isClose {
star.notEmpty.Wait()
}
if star.size == 0 {
return 0, io.EOF
}
data := star.datas[star.pStart]
star.pStart = (star.pStart + 1) % star.cap
star.size--
star.notFull.Broadcast()
return data, nil
}
func (star *StarBuffer) putByte(data byte) error {
star.mu.Lock()
defer star.mu.Unlock()
for star.size == star.cap && !star.isClose && !star.isWriteEnd {
star.notFull.Wait()
}
if star.isClose {
return ErrStarBufferClosed
}
if star.isWriteEnd {
return ErrStarBufferWriteClosed
}
star.datas[star.pEnd] = data
star.pEnd = (star.pEnd + 1) % star.cap
star.size++
star.notEmpty.Broadcast()
return nil
}
// Close closes only the write side and satisfies the usual io.Closer-style
// "producer finished" semantics.
//
// Buffered bytes remain readable until drained; afterwards reads return io.EOF.
func (star *StarBuffer) Close() error {
star.mu.Lock()
defer star.mu.Unlock()
return star.closeWriteLocked()
}
// Abort aborts the buffer and wakes blocked readers/writers immediately.
//
// Buffered bytes remain readable until drained; subsequent writes fail with
// ErrStarBufferClosed.
func (star *StarBuffer) Abort() error {
star.mu.Lock()
defer star.mu.Unlock()
star.isClose = true
star.isWriteEnd = true
star.notEmpty.Broadcast()
star.notFull.Broadcast()
return nil
}
func (star *StarBuffer) closeWriteLocked() error {
if star.isClose {
return ErrStarBufferClosed
}
star.isWriteEnd = true
star.notEmpty.Broadcast()
star.notFull.Broadcast()
return nil
}
func (star *StarBuffer) Read(buf []byte) (int, error) {
if len(buf) == 0 {
return 0, nil
}
star.mu.Lock()
defer star.mu.Unlock()
for star.size == 0 && !star.isWriteEnd && !star.isClose {
star.notEmpty.Wait()
}
if star.size == 0 {
return 0, io.EOF
}
sum := minInt(len(buf), int(star.size))
first := minInt(sum, int(star.cap-star.pStart))
copy(buf, star.datas[star.pStart:star.pStart+uint64(first)])
second := sum - first
if second > 0 {
copy(buf[first:], star.datas[:second])
}
star.pStart = (star.pStart + uint64(sum)) % star.cap
star.size -= uint64(sum)
star.notFull.Broadcast()
return sum, nil
}
func (star *StarBuffer) Write(bts []byte) (int, error) {
if len(bts) == 0 {
return 0, nil
}
star.mu.Lock()
defer star.mu.Unlock()
sum := 0
for sum < len(bts) {
for star.size == star.cap && !star.isClose && !star.isWriteEnd {
star.notFull.Wait()
}
if star.isClose {
if sum == 0 {
return 0, ErrStarBufferClosed
}
return sum, ErrStarBufferClosed
}
if star.isWriteEnd {
if sum == 0 {
return 0, ErrStarBufferWriteClosed
}
return sum, ErrStarBufferWriteClosed
}
space := int(star.cap - star.size)
if space == 0 {
continue
}
n := minInt(len(bts)-sum, space)
first := minInt(n, int(star.cap-star.pEnd))
copy(star.datas[star.pEnd:star.pEnd+uint64(first)], bts[sum:sum+first])
second := n - first
if second > 0 {
copy(star.datas[:second], bts[sum+first:sum+n])
}
star.pEnd = (star.pEnd + uint64(n)) % star.cap
star.size += uint64(n)
sum += n
star.notEmpty.Broadcast()
}
return sum, nil
}
func minInt(a int, b int) int {
if a < b {
return a
}
return b
}