124 lines
2.2 KiB
Go
Raw Permalink Normal View History

2026-03-19 16:37:57 +08:00
package observerx
import (
"sync"
"sync/atomic"
)
// Buffer is a slice-backed collection of arbitrary items with an optional
// upper bound on how many items are kept. Access to items and limit is
// guarded by mu; dropped is updated and read through sync/atomic.
//
// A Buffer must not be copied after first use because it contains a
// sync.RWMutex.
type Buffer struct {
	// dropped counts the items discarded to honour limit. It is kept as the
	// first field because, on 32-bit platforms, sync/atomic only guarantees
	// 64-bit alignment for the first word of an allocated struct.
	dropped uint64

	// mu guards items and limit.
	mu sync.RWMutex
	// items holds the buffered values, oldest first.
	items []interface{}
	// limit is the maximum number of items kept; 0 means no limit.
	limit int
}
// NewBuffer returns an empty Buffer with no size limit and storage for 16
// items pre-allocated.
func NewBuffer() *Buffer {
	const initialCapacity = 16

	buffer := new(Buffer)
	buffer.items = make([]interface{}, 0, initialCapacity)
	return buffer
}
// Add appends item to the buffer. When a positive limit is set and the buffer
// already holds at least that many items, the oldest item is discarded first
// and the dropped counter is incremented. Calling Add on a nil *Buffer is a
// no-op.
func (buffer *Buffer) Add(item interface{}) {
	if buffer == nil {
		return
	}
	buffer.mu.Lock()
	defer buffer.mu.Unlock()

	if buffer.limit > 0 && len(buffer.items) >= buffer.limit {
		// Clear the evicted slot before reslicing; otherwise the backing
		// array keeps the discarded item reachable until it is reallocated.
		buffer.items[0] = nil
		buffer.items = buffer.items[1:]
		atomic.AddUint64(&buffer.dropped, 1)
	}
	buffer.items = append(buffer.items, item)
}
// SetLimit sets the maximum number of items the buffer keeps. A limit of 0
// removes the bound, and negative values are treated as 0. If the buffer
// currently holds more than limit items, the oldest ones are discarded
// immediately and their count is added to the dropped counter. Calling
// SetLimit on a nil *Buffer is a no-op.
func (buffer *Buffer) SetLimit(limit int) {
	if buffer == nil {
		return
	}
	if limit < 0 {
		limit = 0
	}
	buffer.mu.Lock()
	defer buffer.mu.Unlock()

	buffer.limit = limit
	if limit == 0 || len(buffer.items) <= limit {
		return
	}
	excess := len(buffer.items) - limit
	// Clear the evicted slots before reslicing; otherwise the backing array
	// keeps the discarded items reachable until it is reallocated.
	for i := 0; i < excess; i++ {
		buffer.items[i] = nil
	}
	buffer.items = buffer.items[excess:]
	atomic.AddUint64(&buffer.dropped, uint64(excess))
}
// Limit reports the configured maximum number of items, where 0 means no
// limit. A nil *Buffer reports 0.
func (buffer *Buffer) Limit() int {
	if buffer == nil {
		return 0
	}
	buffer.mu.RLock()
	current := buffer.limit
	buffer.mu.RUnlock()
	return current
}
// Count reports how many items the buffer currently holds. A nil *Buffer
// reports 0.
func (buffer *Buffer) Count() int {
	if buffer == nil {
		return 0
	}
	buffer.mu.RLock()
	n := len(buffer.items)
	buffer.mu.RUnlock()
	return n
}
// Dropped reports the current value of the dropped-item counter. The counter
// is read atomically without taking the buffer's mutex. A nil *Buffer
// reports 0.
func (buffer *Buffer) Dropped() uint64 {
	var total uint64
	if buffer != nil {
		total = atomic.LoadUint64(&buffer.dropped)
	}
	return total
}
// Snapshot returns a copy of the buffered items, oldest first, leaving the
// buffer unchanged. The returned slice does not share storage with the
// buffer. An empty buffer yields an empty, non-nil slice; a nil *Buffer
// yields nil.
func (buffer *Buffer) Snapshot() []interface{} {
	if buffer == nil {
		return nil
	}
	buffer.mu.RLock()
	snapshot := append(make([]interface{}, 0, len(buffer.items)), buffer.items...)
	buffer.mu.RUnlock()
	return snapshot
}
// Last returns the most recently added item without removing it. The boolean
// result is false when the buffer is empty or the receiver is nil, in which
// case the returned item is nil.
func (buffer *Buffer) Last() (interface{}, bool) {
	if buffer == nil {
		return nil, false
	}
	buffer.mu.RLock()
	defer buffer.mu.RUnlock()

	if n := len(buffer.items); n > 0 {
		return buffer.items[n-1], true
	}
	return nil, false
}
// TakeAll removes every item from the buffer and returns them, oldest first,
// in a freshly allocated slice. It returns nil when the buffer is empty or
// the receiver is nil. The buffer keeps its backing storage for reuse; the
// limit and the dropped counter are left untouched.
func (buffer *Buffer) TakeAll() []interface{} {
	if buffer == nil {
		return nil
	}
	buffer.mu.Lock()
	defer buffer.mu.Unlock()

	if len(buffer.items) == 0 {
		return nil
	}
	taken := make([]interface{}, len(buffer.items))
	copy(taken, buffer.items)
	// Clear the vacated slots before truncating; otherwise the retained
	// backing array keeps every handed-off item reachable.
	for i := range buffer.items {
		buffer.items[i] = nil
	}
	buffer.items = buffer.items[:0]
	return taken
}
// Reset discards every buffered item and sets the dropped counter back to 0.
// The configured limit and the backing storage are kept. Calling Reset on a
// nil *Buffer is a no-op.
func (buffer *Buffer) Reset() {
	if buffer == nil {
		return
	}
	buffer.mu.Lock()
	defer buffer.mu.Unlock()

	// Clear the vacated slots before truncating; otherwise the retained
	// backing array keeps every discarded item reachable.
	for i := range buffer.items {
		buffer.items[i] = nil
	}
	buffer.items = buffer.items[:0]
	// Zero the counter while the lock is still held so that a concurrent Add
	// or SetLimit, which bump it under the same lock, cannot slip in between
	// clearing the items and clearing the count.
	atomic.StoreUint64(&buffer.dropped, 0)
}