stario/circle.go

193 lines
4.0 KiB
Go

package stario
import (
"errors"
"io"
"sync"
)
var ErrStarBufferInvalidCapacity = errors.New("star buffer capacity must be greater than zero")
var ErrStarBufferClosed = errors.New("star buffer closed")
var ErrStarBufferWriteClosed = errors.New("star buffer write closed")
type StarBuffer struct {
datas []byte
pStart uint64
pEnd uint64
size uint64
cap uint64
isClose bool
isWriteEnd bool
mu sync.Mutex
notEmpty *sync.Cond
notFull *sync.Cond
}
func NewStarBuffer(cap uint64) (*StarBuffer, error) {
if cap == 0 {
return nil, ErrStarBufferInvalidCapacity
}
rtnBuffer := &StarBuffer{
cap: cap,
datas: make([]byte, cap),
}
rtnBuffer.notEmpty = sync.NewCond(&rtnBuffer.mu)
rtnBuffer.notFull = sync.NewCond(&rtnBuffer.mu)
return rtnBuffer, nil
}
func (star *StarBuffer) Free() uint64 {
star.mu.Lock()
defer star.mu.Unlock()
return star.cap - star.size
}
func (star *StarBuffer) Cap() uint64 {
star.mu.Lock()
defer star.mu.Unlock()
return star.cap
}
func (star *StarBuffer) Len() uint64 {
star.mu.Lock()
defer star.mu.Unlock()
return star.size
}
func (star *StarBuffer) getByte() (byte, error) {
star.mu.Lock()
defer star.mu.Unlock()
for star.size == 0 && !star.isWriteEnd && !star.isClose {
star.notEmpty.Wait()
}
if star.size == 0 {
return 0, io.EOF
}
data := star.datas[star.pStart]
star.pStart = (star.pStart + 1) % star.cap
star.size--
star.notFull.Broadcast()
return data, nil
}
func (star *StarBuffer) putByte(data byte) error {
star.mu.Lock()
defer star.mu.Unlock()
for star.size == star.cap && !star.isClose && !star.isWriteEnd {
star.notFull.Wait()
}
if star.isClose {
return ErrStarBufferClosed
}
if star.isWriteEnd {
return ErrStarBufferWriteClosed
}
star.datas[star.pEnd] = data
star.pEnd = (star.pEnd + 1) % star.cap
star.size++
star.notEmpty.Broadcast()
return nil
}
func (star *StarBuffer) EndWrite() error {
star.mu.Lock()
defer star.mu.Unlock()
if star.isClose {
return ErrStarBufferClosed
}
star.isWriteEnd = true
star.notEmpty.Broadcast()
star.notFull.Broadcast()
return nil
}
func (star *StarBuffer) Close() error {
star.mu.Lock()
defer star.mu.Unlock()
star.isClose = true
star.isWriteEnd = true
star.notEmpty.Broadcast()
star.notFull.Broadcast()
return nil
}
func (star *StarBuffer) Read(buf []byte) (int, error) {
if buf == nil {
return 0, errors.New("buffer is nil")
}
if len(buf) == 0 {
return 0, nil
}
star.mu.Lock()
defer star.mu.Unlock()
for star.size == 0 && !star.isWriteEnd && !star.isClose {
star.notEmpty.Wait()
}
if star.size == 0 {
return 0, io.EOF
}
sum := minInt(len(buf), int(star.size))
first := minInt(sum, int(star.cap-star.pStart))
copy(buf, star.datas[star.pStart:star.pStart+uint64(first)])
second := sum - first
if second > 0 {
copy(buf[first:], star.datas[:second])
}
star.pStart = (star.pStart + uint64(sum)) % star.cap
star.size -= uint64(sum)
star.notFull.Broadcast()
return sum, nil
}
func (star *StarBuffer) Write(bts []byte) (int, error) {
if bts == nil {
return 0, star.EndWrite()
}
if len(bts) == 0 {
return 0, nil
}
star.mu.Lock()
defer star.mu.Unlock()
sum := 0
for sum < len(bts) {
for star.size == star.cap && !star.isClose && !star.isWriteEnd {
star.notFull.Wait()
}
if star.isClose {
if sum == 0 {
return 0, ErrStarBufferClosed
}
return sum, ErrStarBufferClosed
}
if star.isWriteEnd {
if sum == 0 {
return 0, ErrStarBufferWriteClosed
}
return sum, ErrStarBufferWriteClosed
}
space := int(star.cap - star.size)
if space == 0 {
continue
}
n := minInt(len(bts)-sum, space)
first := minInt(n, int(star.cap-star.pEnd))
copy(star.datas[star.pEnd:star.pEnd+uint64(first)], bts[sum:sum+first])
second := n - first
if second > 0 {
copy(star.datas[:second], bts[sum+first:sum+n])
}
star.pEnd = (star.pEnd + uint64(n)) % star.cap
star.size += uint64(n)
sum += n
star.notEmpty.Broadcast()
}
return sum, nil
}
func minInt(a int, b int) int {
if a < b {
return a
}
return b
}