124 lines
2.2 KiB
Go
124 lines
2.2 KiB
Go
package observerx
|
|
|
|
import (
|
|
"sync"
|
|
"sync/atomic"
|
|
)
|
|
|
|
// Buffer is an ordered collection of arbitrary values with an optional
// upper bound on how many values it keeps. Because it embeds a mutex it
// should be shared by pointer rather than copied.
type Buffer struct {
	// mu guards items and limit.
	mu sync.RWMutex

	// items holds the buffered values; new values are appended at the end.
	items []interface{}

	// limit is the maximum number of values kept. A value of zero means
	// no bound is enforced.
	limit int

	// dropped counts values discarded to honour limit. It is read and
	// written through sync/atomic.
	dropped uint64
}
|
|
|
|
func NewBuffer() *Buffer {
|
|
return &Buffer{
|
|
items: make([]interface{}, 0, 16),
|
|
limit: 0,
|
|
}
|
|
}
|
|
|
|
func (buffer *Buffer) Add(item interface{}) {
|
|
if buffer == nil {
|
|
return
|
|
}
|
|
buffer.mu.Lock()
|
|
if buffer.limit > 0 && len(buffer.items) >= buffer.limit {
|
|
buffer.items = buffer.items[1:]
|
|
atomic.AddUint64(&buffer.dropped, 1)
|
|
}
|
|
buffer.items = append(buffer.items, item)
|
|
buffer.mu.Unlock()
|
|
}
|
|
|
|
func (buffer *Buffer) SetLimit(limit int) {
|
|
if buffer == nil {
|
|
return
|
|
}
|
|
if limit < 0 {
|
|
limit = 0
|
|
}
|
|
buffer.mu.Lock()
|
|
buffer.limit = limit
|
|
if limit > 0 && len(buffer.items) > limit {
|
|
dropped := len(buffer.items) - limit
|
|
buffer.items = buffer.items[dropped:]
|
|
atomic.AddUint64(&buffer.dropped, uint64(dropped))
|
|
}
|
|
buffer.mu.Unlock()
|
|
}
|
|
|
|
func (buffer *Buffer) Limit() int {
|
|
if buffer == nil {
|
|
return 0
|
|
}
|
|
buffer.mu.RLock()
|
|
defer buffer.mu.RUnlock()
|
|
return buffer.limit
|
|
}
|
|
|
|
func (buffer *Buffer) Count() int {
|
|
if buffer == nil {
|
|
return 0
|
|
}
|
|
buffer.mu.RLock()
|
|
defer buffer.mu.RUnlock()
|
|
return len(buffer.items)
|
|
}
|
|
|
|
func (buffer *Buffer) Dropped() uint64 {
|
|
if buffer == nil {
|
|
return 0
|
|
}
|
|
return atomic.LoadUint64(&buffer.dropped)
|
|
}
|
|
|
|
func (buffer *Buffer) Snapshot() []interface{} {
|
|
if buffer == nil {
|
|
return nil
|
|
}
|
|
buffer.mu.RLock()
|
|
defer buffer.mu.RUnlock()
|
|
result := make([]interface{}, len(buffer.items))
|
|
copy(result, buffer.items)
|
|
return result
|
|
}
|
|
|
|
func (buffer *Buffer) Last() (interface{}, bool) {
|
|
if buffer == nil {
|
|
return nil, false
|
|
}
|
|
buffer.mu.RLock()
|
|
defer buffer.mu.RUnlock()
|
|
if len(buffer.items) == 0 {
|
|
return nil, false
|
|
}
|
|
return buffer.items[len(buffer.items)-1], true
|
|
}
|
|
|
|
func (buffer *Buffer) TakeAll() []interface{} {
|
|
if buffer == nil {
|
|
return nil
|
|
}
|
|
buffer.mu.Lock()
|
|
defer buffer.mu.Unlock()
|
|
if len(buffer.items) == 0 {
|
|
return nil
|
|
}
|
|
result := make([]interface{}, len(buffer.items))
|
|
copy(result, buffer.items)
|
|
buffer.items = buffer.items[:0]
|
|
return result
|
|
}
|
|
|
|
func (buffer *Buffer) Reset() {
|
|
if buffer == nil {
|
|
return
|
|
}
|
|
buffer.mu.Lock()
|
|
buffer.items = buffer.items[:0]
|
|
buffer.mu.Unlock()
|
|
atomic.StoreUint64(&buffer.dropped, 0)
|
|
}
|