You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
starlog/stacks.go

150 lines
2.5 KiB
Go

package starlog
import (
"errors"
"io"
"os"
"sync"
"sync/atomic"
)
// starMapKV is a map from arbitrary comparable keys to arbitrary values whose
// methods serialize access through a read/write mutex.
//
// The zero value is usable: Store allocates the underlying map on first use.
// A starMapKV must not be copied after first use because it contains a mutex.
type starMapKV struct {
	kvMap map[interface{}]interface{} // guarded by mu
	mu    sync.RWMutex
}

// newStarMap returns an empty, ready-to-use starMapKV.
func newStarMap() starMapKV {
	// Returning a composite literal avoids copying a mutex out of a local.
	return starMapKV{kvMap: make(map[interface{}]interface{})}
}

// Get returns the value stored under key. When key is absent it returns a nil
// value together with os.ErrNotExist.
func (m *starMapKV) Get(key interface{}) (interface{}, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	value, found := m.kvMap[key]
	if !found {
		return value, os.ErrNotExist
	}
	return value, nil
}

// MustGet returns the value stored under key, or nil when key is absent.
// Unlike Get it does not let the caller tell a missing key from a stored nil.
func (m *starMapKV) MustGet(key interface{}) interface{} {
	// The error only signals absence, which this accessor reports as nil.
	value, _ := m.Get(key)
	return value
}

// Store sets the value for key, replacing any previous value. It always
// returns nil.
func (m *starMapKV) Store(key interface{}, value interface{}) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	// Writing to a nil map panics, so allocate lazily for the zero value.
	if m.kvMap == nil {
		m.kvMap = make(map[interface{}]interface{})
	}
	m.kvMap[key] = value
	return nil
}

// Exists reports whether key is present in the map.
func (m *starMapKV) Exists(key interface{}) bool {
	m.mu.RLock()
	defer m.mu.RUnlock()

	_, found := m.kvMap[key]
	return found
}

// Delete removes key from the map; deleting an absent key is a no-op. It
// always returns nil.
func (m *starMapKV) Delete(key interface{}) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	delete(m.kvMap, key)
	return nil
}

// Range calls run once for each key/value pair, in unspecified order, until
// run returns false. It always returns nil.
//
// The pairs are copied while holding the read lock and run is invoked with no
// lock held. This keeps the iteration free of data races with concurrent
// Store/Delete calls and still lets run itself call Store or Delete without
// deadlocking. As a consequence run observes the map as it was when Range
// started: entries removed during the iteration are still visited and entries
// added during it are not.
func (m *starMapKV) Range(run func(k interface{}, v interface{}) bool) error {
	type entry struct {
		key, value interface{}
	}

	m.mu.RLock()
	snapshot := make([]entry, 0, len(m.kvMap))
	for k, v := range m.kvMap {
		snapshot = append(snapshot, entry{key: k, value: v})
	}
	m.mu.RUnlock()

	for _, e := range snapshot {
		if !run(e.key, e.value) {
			break
		}
	}
	return nil
}
// starChanStack is a bounded buffer of arbitrary values backed by a buffered
// channel. Despite the name, values are returned in the order they were
// pushed, because Pop receives from the same channel that Push sends on.
//
// The zero value is usable: the first Push, Pop or Close allocates a channel
// with room for 1024 values.
type starChanStack struct {
	// current counts the queued values. It is only accessed through
	// sync/atomic and is kept as the first field so that it is 64-bit aligned
	// on 32-bit platforms as well.
	current   uint64
	cap       uint64           // capacity the channel was created with
	data      chan interface{} // buffered channel holding the queued values
	isClose   atomic.Value     // holds a bool once initialized; true after Close
	initOnce  sync.Once        // guards the lazy initialization of a zero value
	closeOnce sync.Once        // lets Close be called more than once
}

// newStarChanStack returns an open stack that can hold up to cap values.
func newStarChanStack(cap uint64) *starChanStack {
	s := &starChanStack{
		cap:  cap,
		data: make(chan interface{}, cap),
	}
	s.isClose.Store(false)
	return s
}

// init gives the stack its default capacity of 1024 values, allocates the
// channel and marks the stack as open.
func (s *starChanStack) init() {
	s.cap = 1024
	s.data = make(chan interface{}, s.cap)
	s.isClose.Store(false)
}

// ensureInit initializes a stack that was not built by newStarChanStack. The
// check runs at most once, so concurrent first calls cannot both replace the
// channel.
func (s *starChanStack) ensureInit() {
	s.initOnce.Do(func() {
		if s.isClose.Load() == nil {
			s.init()
		}
	})
}

// Free returns how many more values fit before Push blocks. The counter is
// updated after the channel operation completes, so the result is approximate
// while Push and Pop run concurrently; it is clamped at zero rather than
// wrapping around.
func (s *starChanStack) Free() uint64 {
	used := atomic.LoadUint64(&s.current)
	if used > s.cap {
		return 0
	}
	return s.cap - used
}

// Cap returns the capacity of the stack. For a zero value that has not been
// used yet this is 0.
func (s *starChanStack) Cap() uint64 {
	return s.cap
}

// Len returns the number of values currently queued.
func (s *starChanStack) Len() uint64 {
	return atomic.LoadUint64(&s.current)
}

// Pop removes and returns the oldest queued value, blocking while the stack
// is empty. Once the stack has been closed it returns io.EOF, even if values
// are still queued. If the channel is closed while Pop is waiting, it returns
// a "channel read error" error instead.
func (s *starChanStack) Pop() (interface{}, error) {
	s.ensureInit()
	if s.isClose.Load().(bool) {
		return 0, io.EOF
	}

	data, ok := <-s.data
	if !ok {
		s.isClose.Store(true)
		return 0, errors.New("channel read error")
	}
	// Adding ^uint64(0) is the documented way to decrement atomically.
	atomic.AddUint64(&s.current, ^uint64(0))
	return data, nil
}

// Push queues data, blocking while the stack is full. It returns io.EOF when
// the stack is closed, including when the close happens while Push is in
// progress.
func (s *starChanStack) Push(data interface{}) (err error) {
	defer func() {
		// Sending on a channel that was closed after the check below panics.
		// Report it as a closed stack instead of pretending the value was
		// queued.
		if r := recover(); r != nil {
			err = io.EOF
		}
	}()

	s.ensureInit()
	if s.isClose.Load().(bool) {
		return io.EOF
	}

	s.data <- data
	atomic.AddUint64(&s.current, 1)
	return nil
}

// Close marks the stack as closed and closes the underlying channel. Calling
// it again is a no-op. It always returns nil.
func (s *starChanStack) Close() error {
	s.ensureInit()
	s.closeOnce.Do(func() {
		s.isClose.Store(true)
		close(s.data)
	})
	return nil
}