package starlog import ( "errors" "io" "os" "sync" "sync/atomic" ) type starMapKV struct { kvMap map[interface{}]interface{} mu sync.RWMutex } func newStarMap() starMapKV { var mp starMapKV mp.kvMap = make(map[interface{}]interface{}) return mp } func (m *starMapKV) Get(key interface{}) (interface{}, error) { var err error m.mu.RLock() defer m.mu.RUnlock() data, ok := m.kvMap[key] if !ok { err = os.ErrNotExist } return data, err } func (m *starMapKV) MustGet(key interface{}) interface{} { result, _ := m.Get(key) return result } func (m *starMapKV) Store(key interface{}, value interface{}) error { m.mu.Lock() defer m.mu.Unlock() m.kvMap[key] = value return nil } func (m *starMapKV) Exists(key interface{}) bool { m.mu.RLock() defer m.mu.RUnlock() _, ok := m.kvMap[key] return ok } func (m *starMapKV) Delete(key interface{}) error { m.mu.Lock() defer m.mu.Unlock() delete(m.kvMap, key) return nil } func (m *starMapKV) Range(run func(k interface{}, v interface{}) bool) error { for k, v := range m.kvMap { if !run(k, v) { break } } return nil } type starChanStack struct { data chan interface{} cap uint64 current uint64 isClose atomic.Value } func newStarChanStack(cap uint64) *starChanStack { rtnBuffer := new(starChanStack) rtnBuffer.cap = cap rtnBuffer.isClose.Store(false) rtnBuffer.data = make(chan interface{}, cap) return rtnBuffer } func (s *starChanStack) init() { s.cap = 1024 s.data = make(chan interface{}, s.cap) s.isClose.Store(false) } func (s *starChanStack) Free() uint64 { return s.cap - s.current } func (s *starChanStack) Cap() uint64 { return s.cap } func (s *starChanStack) Len() uint64 { return s.current } func (s *starChanStack) Pop() (interface{}, error) { if s.isClose.Load() == nil { s.init() } if s.isClose.Load().(bool) { return 0, io.EOF } data, ok := <-s.data if !ok { s.isClose.Store(true) return 0, errors.New("channel read error") } for { current := atomic.LoadUint64(&s.current) if atomic.CompareAndSwapUint64(&s.current, current, current-1) { break } } return data, nil } func (s *starChanStack) Push(data interface{}) error { defer func() { recover() }() if s.isClose.Load() == nil { s.init() } if s.isClose.Load().(bool) { return io.EOF } s.data <- data for { current := atomic.LoadUint64(&s.current) if atomic.CompareAndSwapUint64(&s.current, current, current+1) { break } } return nil } func (s *starChanStack) Close() error { if s.isClose.Load() == nil { s.init() } s.isClose.Store(true) close(s.data) return nil }