package stario import ( "errors" "io" "sync" ) var ErrStarBufferInvalidCapacity = errors.New("star buffer capacity must be greater than zero") var ErrStarBufferClosed = errors.New("star buffer closed") var ErrStarBufferWriteClosed = errors.New("star buffer write closed") type StarBuffer struct { datas []byte pStart uint64 pEnd uint64 size uint64 cap uint64 isClose bool isWriteEnd bool mu sync.Mutex notEmpty *sync.Cond notFull *sync.Cond } func NewStarBuffer(cap uint64) (*StarBuffer, error) { if cap == 0 { return nil, ErrStarBufferInvalidCapacity } rtnBuffer := &StarBuffer{ cap: cap, datas: make([]byte, cap), } rtnBuffer.notEmpty = sync.NewCond(&rtnBuffer.mu) rtnBuffer.notFull = sync.NewCond(&rtnBuffer.mu) return rtnBuffer, nil } func (star *StarBuffer) Free() uint64 { star.mu.Lock() defer star.mu.Unlock() return star.cap - star.size } func (star *StarBuffer) Cap() uint64 { star.mu.Lock() defer star.mu.Unlock() return star.cap } func (star *StarBuffer) Len() uint64 { star.mu.Lock() defer star.mu.Unlock() return star.size } func (star *StarBuffer) getByte() (byte, error) { star.mu.Lock() defer star.mu.Unlock() for star.size == 0 && !star.isWriteEnd && !star.isClose { star.notEmpty.Wait() } if star.size == 0 { return 0, io.EOF } data := star.datas[star.pStart] star.pStart = (star.pStart + 1) % star.cap star.size-- star.notFull.Broadcast() return data, nil } func (star *StarBuffer) putByte(data byte) error { star.mu.Lock() defer star.mu.Unlock() for star.size == star.cap && !star.isClose && !star.isWriteEnd { star.notFull.Wait() } if star.isClose { return ErrStarBufferClosed } if star.isWriteEnd { return ErrStarBufferWriteClosed } star.datas[star.pEnd] = data star.pEnd = (star.pEnd + 1) % star.cap star.size++ star.notEmpty.Broadcast() return nil } func (star *StarBuffer) EndWrite() error { star.mu.Lock() defer star.mu.Unlock() if star.isClose { return ErrStarBufferClosed } star.isWriteEnd = true star.notEmpty.Broadcast() star.notFull.Broadcast() return nil } func (star *StarBuffer) Close() error { star.mu.Lock() defer star.mu.Unlock() star.isClose = true star.isWriteEnd = true star.notEmpty.Broadcast() star.notFull.Broadcast() return nil } func (star *StarBuffer) Read(buf []byte) (int, error) { if buf == nil { return 0, errors.New("buffer is nil") } if len(buf) == 0 { return 0, nil } star.mu.Lock() defer star.mu.Unlock() for star.size == 0 && !star.isWriteEnd && !star.isClose { star.notEmpty.Wait() } if star.size == 0 { return 0, io.EOF } sum := minInt(len(buf), int(star.size)) first := minInt(sum, int(star.cap-star.pStart)) copy(buf, star.datas[star.pStart:star.pStart+uint64(first)]) second := sum - first if second > 0 { copy(buf[first:], star.datas[:second]) } star.pStart = (star.pStart + uint64(sum)) % star.cap star.size -= uint64(sum) star.notFull.Broadcast() return sum, nil } func (star *StarBuffer) Write(bts []byte) (int, error) { if bts == nil { return 0, star.EndWrite() } if len(bts) == 0 { return 0, nil } star.mu.Lock() defer star.mu.Unlock() sum := 0 for sum < len(bts) { for star.size == star.cap && !star.isClose && !star.isWriteEnd { star.notFull.Wait() } if star.isClose { if sum == 0 { return 0, ErrStarBufferClosed } return sum, ErrStarBufferClosed } if star.isWriteEnd { if sum == 0 { return 0, ErrStarBufferWriteClosed } return sum, ErrStarBufferWriteClosed } space := int(star.cap - star.size) if space == 0 { continue } n := minInt(len(bts)-sum, space) first := minInt(n, int(star.cap-star.pEnd)) copy(star.datas[star.pEnd:star.pEnd+uint64(first)], bts[sum:sum+first]) second := n - first if second > 0 { copy(star.datas[:second], bts[sum+first:sum+n]) } star.pEnd = (star.pEnd + uint64(n)) % star.cap star.size += uint64(n) sum += n star.notEmpty.Broadcast() } return sum, nil } func minInt(a int, b int) int { if a < b { return a } return b }