starlog/internal/multisinkx/multi_sink.go

339 lines
7.0 KiB
Go
Raw Normal View History

2026-03-19 16:37:57 +08:00
package multisinkx
import (
"fmt"
"sync"
"sync/atomic"
)
// State is the textual health label reported for a sink in Stats.State.
type State string

// Exported State values. decodeState maps the internal numeric state of a
// slot onto one of these when a Stats value is built.
const (
	// StateHealthy is reported for a slot that is new, has just been reset,
	// or whose latest observed call succeeded with no errors counted so far.
	StateHealthy State = "healthy"
	// StateDegraded is reported after the most recently observed write or
	// close on the slot returned an error.
	StateDegraded State = "degraded"
	// StateRecovered is reported after a successful call on a slot that has
	// at least one write or close error counted since the last reset.
	StateRecovered State = "recovered"
)
// Internal numeric encodings of State. They are stored in slot.state, which
// is read and written with sync/atomic, and are converted back to the
// exported string form by decodeState.
const (
	stateHealthy uint32 = iota
	stateDegraded
	stateRecovered
)
// Stats is a point-in-time copy of the counters kept for one sink.
type Stats struct {
	// Index is the position of the sink in the MultiSink's slot list at the
	// moment the snapshot was taken.
	Index int
	// Writes counts every observed Write call, successful or not.
	Writes uint64
	// WriteErrors counts the observed Write calls that returned an error.
	WriteErrors uint64
	// Closes counts every observed Close call, successful or not.
	Closes uint64
	// CloseErrors counts the observed Close calls that returned an error.
	CloseErrors uint64
	// ConsecutiveWriteErrors counts failed writes since the last successful
	// write; a successful write sets it back to zero.
	ConsecutiveWriteErrors uint64
	// ConsecutiveCloseErrors counts failed closes since the last successful
	// close; a successful close sets it back to zero.
	ConsecutiveCloseErrors uint64
	// LastWriteError is the message of the most recent write error, or ""
	// when the most recent write succeeded (or none has been observed).
	LastWriteError string
	// LastCloseError is the message of the most recent close error, or ""
	// when the most recent close succeeded (or none has been observed).
	LastCloseError string
	// State is the health label derived from the slot's internal state.
	State State
}
// Snapshot is the value returned by MultiSink.GetStats.
type Snapshot struct {
	// ContinueOnError is the MultiSink's continue-on-error setting at the
	// time of the snapshot (true for a nil MultiSink).
	ContinueOnError bool
	// Sinks holds one Stats entry per registered sink; it is nil for a nil
	// MultiSink.
	Sinks []Stats
}
// Sink is the destination interface that MultiSink fans Write and Close
// calls out to.
type Sink interface {
	// Write hands one payload to the sink and reports whether it failed.
	Write([]byte) error
	// Close shuts the sink down and reports whether that failed.
	Close() error
}
// slot pairs one Sink with the bookkeeping MultiSink keeps about it.
//
// The numeric fields are read and written only through sync/atomic; the two
// error strings are guarded by mu.
//
// NOTE(review): the uint64 counters are used with the plain sync/atomic
// functions, which require 64-bit alignment on 32-bit platforms. Check the
// resulting offsets before reordering or inserting fields ahead of them.
type slot struct {
	// sink is the wrapped destination.
	sink Sink
	// writeCount is the number of observed Write calls.
	writeCount uint64
	// writeErrorCount is the number of observed Write calls that failed.
	writeErrorCount uint64
	// closeCount is the number of observed Close calls.
	closeCount uint64
	// closeErrorCount is the number of observed Close calls that failed.
	closeErrorCount uint64
	// consecutiveWriteErrors is reset to zero by every successful write.
	consecutiveWriteErrors uint64
	// consecutiveCloseErrors is reset to zero by every successful close.
	consecutiveCloseErrors uint64
	// state holds one of stateHealthy, stateDegraded or stateRecovered.
	state uint32
	// mu guards lastWriteError and lastCloseError.
	mu sync.RWMutex
	// lastWriteError is the message of the latest write error, or "".
	lastWriteError string
	// lastCloseError is the message of the latest close error, or "".
	lastCloseError string
}
// newSlot wraps sink in a freshly allocated slot. All counters start at zero
// and the state is explicitly stored as healthy.
func newSlot(sink Sink) *slot {
	created := new(slot)
	created.sink = sink
	atomic.StoreUint32(&created.state, stateHealthy)
	return created
}
// setLastWriteError stores the message of err as the slot's latest write
// error. A nil err clears the stored message.
func (s *slot) setLastWriteError(err error) {
	// Render the message before taking the lock so err.Error() never runs
	// while mu is held.
	var text string
	if err != nil {
		text = err.Error()
	}

	s.mu.Lock()
	defer s.mu.Unlock()
	s.lastWriteError = text
}
// setLastCloseError stores the message of err as the slot's latest close
// error. A nil err clears the stored message.
func (s *slot) setLastCloseError(err error) {
	// Render the message before taking the lock so err.Error() never runs
	// while mu is held.
	var text string
	if err != nil {
		text = err.Error()
	}

	s.mu.Lock()
	defer s.mu.Unlock()
	s.lastCloseError = text
}
// setStateHealthyOrRecovered picks the state to record after a successful
// call: recovered when any write or close error has been counted on this
// slot, healthy otherwise.
func (s *slot) setStateHealthyOrRecovered() {
	next := stateHealthy

	writeFailures := atomic.LoadUint64(&s.writeErrorCount)
	closeFailures := atomic.LoadUint64(&s.closeErrorCount)
	if writeFailures+closeFailures > 0 {
		next = stateRecovered
	}

	atomic.StoreUint32(&s.state, next)
}
// observeWrite records the outcome of one Write call on the slot's sink.
//
// Every call bumps the write counter. A failure also bumps the total and
// consecutive error counters, marks the slot degraded and stores the error
// message. A success clears the consecutive counter and the stored message,
// then moves the slot to healthy or recovered.
func (s *slot) observeWrite(err error) {
	atomic.AddUint64(&s.writeCount, 1)

	if err != nil {
		atomic.AddUint64(&s.writeErrorCount, 1)
		atomic.AddUint64(&s.consecutiveWriteErrors, 1)
		atomic.StoreUint32(&s.state, stateDegraded)
		s.setLastWriteError(err)
		return
	}

	atomic.StoreUint64(&s.consecutiveWriteErrors, 0)
	s.setLastWriteError(nil)
	s.setStateHealthyOrRecovered()
}
// observeClose records the outcome of one Close call on the slot's sink.
//
// Every call bumps the close counter. A failure also bumps the total and
// consecutive error counters, marks the slot degraded and stores the error
// message. A success clears the consecutive counter and the stored message,
// then moves the slot to healthy or recovered.
func (s *slot) observeClose(err error) {
	atomic.AddUint64(&s.closeCount, 1)

	if err != nil {
		atomic.AddUint64(&s.closeErrorCount, 1)
		atomic.AddUint64(&s.consecutiveCloseErrors, 1)
		atomic.StoreUint32(&s.state, stateDegraded)
		s.setLastCloseError(err)
		return
	}

	atomic.StoreUint64(&s.consecutiveCloseErrors, 0)
	s.setLastCloseError(nil)
	s.setStateHealthyOrRecovered()
}
// snapshot copies the slot's current bookkeeping into a Stats value tagged
// with index. The error messages are read under the read lock and each
// counter is loaded atomically, but the fields are not captured as one
// atomic unit.
func (s *slot) snapshot(index int) Stats {
	s.mu.RLock()
	writeMsg, closeMsg := s.lastWriteError, s.lastCloseError
	s.mu.RUnlock()

	var out Stats
	out.Index = index
	out.Writes = atomic.LoadUint64(&s.writeCount)
	out.WriteErrors = atomic.LoadUint64(&s.writeErrorCount)
	out.Closes = atomic.LoadUint64(&s.closeCount)
	out.CloseErrors = atomic.LoadUint64(&s.closeErrorCount)
	out.ConsecutiveWriteErrors = atomic.LoadUint64(&s.consecutiveWriteErrors)
	out.ConsecutiveCloseErrors = atomic.LoadUint64(&s.consecutiveCloseErrors)
	out.LastWriteError = writeMsg
	out.LastCloseError = closeMsg
	out.State = decodeState(atomic.LoadUint32(&s.state))
	return out
}
// resetStats zeroes every counter, puts the slot back into the healthy state
// and clears both stored error messages. The wrapped sink is left untouched.
func (s *slot) resetStats() {
	counters := [...]*uint64{
		&s.writeCount,
		&s.writeErrorCount,
		&s.closeCount,
		&s.closeErrorCount,
		&s.consecutiveWriteErrors,
		&s.consecutiveCloseErrors,
	}
	for _, counter := range counters {
		atomic.StoreUint64(counter, 0)
	}
	atomic.StoreUint32(&s.state, stateHealthy)

	s.mu.Lock()
	defer s.mu.Unlock()
	s.lastWriteError, s.lastCloseError = "", ""
}
// decodeState converts an internal numeric state into its exported State
// label. Any value other than stateDegraded or stateRecovered is reported as
// StateHealthy.
func decodeState(state uint32) State {
	if state == stateDegraded {
		return StateDegraded
	}
	if state == stateRecovered {
		return StateRecovered
	}
	return StateHealthy
}
// MultiSink fans Write and Close calls out to a list of sinks and keeps
// per-sink statistics. Its methods tolerate a nil receiver.
type MultiSink struct {
	// mu guards slots and continueOnError.
	mu sync.RWMutex
	// slots holds one entry per registered sink, in registration order.
	slots []*slot
	// continueOnError selects whether Write and Close keep going after a
	// sink fails (true) or return at the first failure (false).
	continueOnError bool
}
// New builds a MultiSink over the given sinks with continue-on-error
// enabled. Nil entries in sinks are dropped by SetSinks.
func New(sinks ...Sink) *MultiSink {
	// SetSinks always installs its own slot slice, so none is allocated here.
	created := &MultiSink{continueOnError: true}
	created.SetSinks(sinks...)
	return created
}
// SetSinks replaces the registered sinks with the non-nil entries of sinks,
// each wrapped in a fresh slot, so previously collected statistics are
// discarded. The old sinks are not closed. A nil receiver is a no-op.
func (m *MultiSink) SetSinks(sinks ...Sink) {
	if m == nil {
		return
	}

	// Count the usable sinks first so the new slice is sized exactly.
	usable := 0
	for _, candidate := range sinks {
		if candidate != nil {
			usable++
		}
	}

	replacement := make([]*slot, 0, usable)
	for _, candidate := range sinks {
		if candidate != nil {
			replacement = append(replacement, newSlot(candidate))
		}
	}

	m.mu.Lock()
	defer m.mu.Unlock()
	m.slots = replacement
}
// AddSink appends item, wrapped in a fresh slot, to the registered sinks.
// It does nothing when the receiver or item is nil.
func (m *MultiSink) AddSink(item Sink) {
	if m == nil || item == nil {
		return
	}

	added := newSlot(item)

	m.mu.Lock()
	defer m.mu.Unlock()
	m.slots = append(m.slots, added)
}
// SetContinueOnError sets whether Write and Close keep going after a sink
// fails (true) or return at the first failure (false). A nil receiver is a
// no-op.
func (m *MultiSink) SetContinueOnError(continueOnError bool) {
	if m == nil {
		return
	}

	m.mu.Lock()
	defer m.mu.Unlock()
	m.continueOnError = continueOnError
}
// ContinueOnError reports the current continue-on-error setting. A nil
// receiver reports true.
func (m *MultiSink) ContinueOnError() bool {
	if m == nil {
		return true
	}

	m.mu.RLock()
	enabled := m.continueOnError
	m.mu.RUnlock()
	return enabled
}
// SinkCount returns the number of registered sinks, or 0 for a nil receiver.
func (m *MultiSink) SinkCount() int {
	if m == nil {
		return 0
	}

	m.mu.RLock()
	count := len(m.slots)
	m.mu.RUnlock()
	return count
}
// GetStats returns the continue-on-error setting together with a Stats entry
// for every registered sink. Each entry's Index is the sink's position in
// the slot list. A nil receiver yields ContinueOnError true and nil Sinks.
func (m *MultiSink) GetStats() Snapshot {
	if m == nil {
		return Snapshot{ContinueOnError: true}
	}

	slots, keepGoing := m.snapshot()
	result := Snapshot{
		ContinueOnError: keepGoing,
		Sinks:           make([]Stats, 0, len(slots)),
	}
	for position := range slots {
		if entry := slots[position]; entry != nil {
			result.Sinks = append(result.Sinks, entry.snapshot(position))
		}
	}
	return result
}
// ResetStats clears the statistics of every registered sink. The sinks
// themselves stay registered. A nil receiver is a no-op.
func (m *MultiSink) ResetStats() {
	if m == nil {
		return
	}

	slots, _ := m.snapshot()
	for position := range slots {
		if entry := slots[position]; entry != nil {
			entry.resetStats()
		}
	}
}
// Write passes data to every registered sink in order and records each
// outcome in that sink's statistics.
//
// It returns nil for a nil receiver or when no sinks are registered. With
// continue-on-error disabled the first failing sink's error is returned
// immediately and later sinks are not written to. Otherwise all sinks are
// attempted and the collected errors are combined by packErrors.
func (m *MultiSink) Write(data []byte) error {
	if m == nil {
		return nil
	}

	// Work on a copy of the slot list so no lock is held during sink I/O.
	slots, keepGoing := m.snapshot()
	if len(slots) == 0 {
		return nil
	}

	var failures []error
	for _, entry := range slots {
		if entry == nil || entry.sink == nil {
			continue
		}

		writeErr := entry.sink.Write(data)
		entry.observeWrite(writeErr)
		if writeErr == nil {
			continue
		}
		if !keepGoing {
			return writeErr
		}
		failures = append(failures, writeErr)
	}
	return packErrors("write", failures)
}
// Close closes every registered sink in order and records each outcome in
// that sink's statistics.
//
// It returns nil for a nil receiver. With continue-on-error disabled the
// first failing sink's error is returned immediately, and the sinks after it
// are not closed by this call. Otherwise all sinks are attempted and the
// collected errors are combined by packErrors.
func (m *MultiSink) Close() error {
	if m == nil {
		return nil
	}

	// Work on a copy of the slot list so no lock is held during sink I/O.
	slots, keepGoing := m.snapshot()

	var failures []error
	for _, entry := range slots {
		if entry == nil || entry.sink == nil {
			continue
		}

		closeErr := entry.sink.Close()
		entry.observeClose(closeErr)
		if closeErr == nil {
			continue
		}
		if !keepGoing {
			return closeErr
		}
		failures = append(failures, closeErr)
	}
	return packErrors("close", failures)
}
// snapshot returns a copy of the slot list and the continue-on-error flag,
// both read under the read lock. Callers can iterate the copy without
// holding the lock. The slots themselves are shared, not copied.
func (m *MultiSink) snapshot() ([]*slot, bool) {
	m.mu.RLock()
	copied := append(make([]*slot, 0, len(m.slots)), m.slots...)
	keepGoing := m.continueOnError
	m.mu.RUnlock()

	return copied, keepGoing
}
// packErrors folds the errors collected during one fan-out call into a
// single return value.
//
// action names the operation (the callers pass "write" or "close") and only
// appears in the combined message. The result is nil when errs is empty and
// errs[0] itself when there is exactly one error. With two or more errors
// the result is a summary whose text reports the error count and the first
// error's message.
//
// The first error is wrapped rather than merely formatted, so errors.Is,
// errors.As and errors.Unwrap reach it in the multi-error case just as they
// do in the single-error case.
func packErrors(action string, errs []error) error {
	switch len(errs) {
	case 0:
		return nil
	case 1:
		return errs[0]
	}
	// %w renders exactly like %v, so the message is unchanged; it only adds
	// the Unwrap link to errs[0].
	return fmt.Errorf("multi sink %s failed with %d errors: %w", action, len(errs), errs[0])
}