// Package multisinkx fans Write and Close calls out to a set of sinks and
// keeps per-sink success/failure statistics.
package multisinkx

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// State is the coarse health of a single sink as reported in Stats.
type State string

// Exported health states.
//
// A sink is StateDegraded after its most recently observed operation failed,
// StateRecovered after a successful operation when at least one write or
// close error has been counted since creation or the last ResetStats, and
// StateHealthy otherwise.
const (
	StateHealthy   State = "healthy"
	StateDegraded  State = "degraded"
	StateRecovered State = "recovered"
)

// Internal numeric encoding of State so it can be stored atomically.
const (
	stateHealthy uint32 = iota
	stateDegraded
	stateRecovered
)

// Stats is a point-in-time copy of the counters kept for one sink.
//
// Index is the sink's position in the MultiSink at the time of the snapshot.
// Writes/Closes count every observed call (successful or not), WriteErrors/
// CloseErrors count the failed ones, and the Consecutive* fields count
// failures since the last success of the same kind. LastWriteError and
// LastCloseError hold the message of the most recent failure and are cleared
// by the next success of the same kind.
type Stats struct {
	Index                  int
	Writes                 uint64
	WriteErrors            uint64
	Closes                 uint64
	CloseErrors            uint64
	ConsecutiveWriteErrors uint64
	ConsecutiveCloseErrors uint64
	LastWriteError         string
	LastCloseError         string
	State                  State
}

// Snapshot is the result of MultiSink.GetStats: the current ContinueOnError
// setting plus one Stats entry per registered sink.
type Snapshot struct {
	ContinueOnError bool
	Sinks           []Stats
}

// Sink is a destination that MultiSink can write to and close.
type Sink interface {
	Write([]byte) error
	Close() error
}

// MultiSink itself satisfies Sink, so instances can be nested.
var _ Sink = (*MultiSink)(nil)

// slot pairs a sink with its statistics.
type slot struct {
	sink Sink

	// Counters and state are accessed only through sync/atomic.
	writeCount             uint64
	writeErrorCount        uint64
	closeCount             uint64
	closeErrorCount        uint64
	consecutiveWriteErrors uint64
	consecutiveCloseErrors uint64
	state                  uint32

	// mu guards the two last-error strings.
	mu             sync.RWMutex
	lastWriteError string
	lastCloseError string
}

// newSlot wraps sink in a slot whose counters are zero and whose state is
// healthy (the zero value of the state field).
func newSlot(sink Sink) *slot {
	return &slot{sink: sink}
}

// errText returns err's message, or "" when err is nil.
func errText(err error) string {
	if err == nil {
		return ""
	}
	return err.Error()
}

// setLastWriteError records err's message as the last write error; a nil err
// clears it.
func (s *slot) setLastWriteError(err error) {
	text := errText(err)
	s.mu.Lock()
	s.lastWriteError = text
	s.mu.Unlock()
}

// setLastCloseError records err's message as the last close error; a nil err
// clears it.
func (s *slot) setLastCloseError(err error) {
	text := errText(err)
	s.mu.Lock()
	s.lastCloseError = text
	s.mu.Unlock()
}

// setStateHealthyOrRecovered is called after a successful operation. The slot
// becomes recovered if any write or close error has been counted, otherwise
// healthy.
func (s *slot) setStateHealthyOrRecovered() {
	failures := atomic.LoadUint64(&s.writeErrorCount) + atomic.LoadUint64(&s.closeErrorCount)
	next := stateHealthy
	if failures > 0 {
		next = stateRecovered
	}
	atomic.StoreUint32(&s.state, next)
}

// observeWrite folds the outcome of one Write call into the statistics.
func (s *slot) observeWrite(err error) {
	atomic.AddUint64(&s.writeCount, 1)
	if err != nil {
		atomic.AddUint64(&s.writeErrorCount, 1)
		atomic.AddUint64(&s.consecutiveWriteErrors, 1)
		atomic.StoreUint32(&s.state, stateDegraded)
		s.setLastWriteError(err)
		return
	}
	atomic.StoreUint64(&s.consecutiveWriteErrors, 0)
	s.setLastWriteError(nil)
	s.setStateHealthyOrRecovered()
}

// observeClose folds the outcome of one Close call into the statistics.
func (s *slot) observeClose(err error) {
	atomic.AddUint64(&s.closeCount, 1)
	if err != nil {
		atomic.AddUint64(&s.closeErrorCount, 1)
		atomic.AddUint64(&s.consecutiveCloseErrors, 1)
		atomic.StoreUint32(&s.state, stateDegraded)
		s.setLastCloseError(err)
		return
	}
	atomic.StoreUint64(&s.consecutiveCloseErrors, 0)
	s.setLastCloseError(nil)
	s.setStateHealthyOrRecovered()
}

// snapshot copies the slot's statistics into a Stats value tagged with index.
// Each field is read individually, so the fields are not guaranteed to come
// from the same instant when operations run concurrently.
func (s *slot) snapshot(index int) Stats {
	s.mu.RLock()
	lastWrite, lastClose := s.lastWriteError, s.lastCloseError
	s.mu.RUnlock()

	return Stats{
		Index:                  index,
		Writes:                 atomic.LoadUint64(&s.writeCount),
		WriteErrors:            atomic.LoadUint64(&s.writeErrorCount),
		Closes:                 atomic.LoadUint64(&s.closeCount),
		CloseErrors:            atomic.LoadUint64(&s.closeErrorCount),
		ConsecutiveWriteErrors: atomic.LoadUint64(&s.consecutiveWriteErrors),
		ConsecutiveCloseErrors: atomic.LoadUint64(&s.consecutiveCloseErrors),
		LastWriteError:         lastWrite,
		LastCloseError:         lastClose,
		State:                  decodeState(atomic.LoadUint32(&s.state)),
	}
}

// resetStats zeroes every counter, clears both last-error strings and marks
// the slot healthy.
func (s *slot) resetStats() {
	atomic.StoreUint64(&s.writeCount, 0)
	atomic.StoreUint64(&s.writeErrorCount, 0)
	atomic.StoreUint64(&s.closeCount, 0)
	atomic.StoreUint64(&s.closeErrorCount, 0)
	atomic.StoreUint64(&s.consecutiveWriteErrors, 0)
	atomic.StoreUint64(&s.consecutiveCloseErrors, 0)
	atomic.StoreUint32(&s.state, stateHealthy)

	s.mu.Lock()
	s.lastWriteError = ""
	s.lastCloseError = ""
	s.mu.Unlock()
}

// decodeState maps the internal numeric state to its exported form. Unknown
// values are reported as StateHealthy.
func decodeState(state uint32) State {
	switch state {
	case stateDegraded:
		return StateDegraded
	case stateRecovered:
		return StateRecovered
	default:
		return StateHealthy
	}
}

// MultiSink forwards Write and Close to every registered sink, in
// registration order, and records per-sink statistics.
//
// The sink list and the ContinueOnError flag are guarded by mu. Write and
// Close work on a copy of the list taken at call time and invoke the sinks
// without holding mu. Every method is a no-op on a nil *MultiSink.
type MultiSink struct {
	mu              sync.RWMutex
	slots           []*slot
	continueOnError bool
}

// New returns a MultiSink over the non-nil entries of sinks with
// ContinueOnError enabled.
func New(sinks ...Sink) *MultiSink {
	m := &MultiSink{continueOnError: true}
	m.SetSinks(sinks...)
	return m
}

// SetSinks replaces the registered sinks with the non-nil entries of sinks.
// Each sink starts with fresh statistics. The previous sinks are not closed.
func (m *MultiSink) SetSinks(sinks ...Sink) {
	if m == nil {
		return
	}
	slots := make([]*slot, 0, len(sinks))
	for _, s := range sinks {
		if s != nil {
			slots = append(slots, newSlot(s))
		}
	}
	m.mu.Lock()
	m.slots = slots
	m.mu.Unlock()
}

// AddSink appends item to the registered sinks. A nil item is ignored.
func (m *MultiSink) AddSink(item Sink) {
	if m == nil || item == nil {
		return
	}
	m.mu.Lock()
	m.slots = append(m.slots, newSlot(item))
	m.mu.Unlock()
}

// SetContinueOnError selects whether Write and Close keep going after a sink
// fails (true) or stop at the first failure (false).
func (m *MultiSink) SetContinueOnError(continueOnError bool) {
	if m == nil {
		return
	}
	m.mu.Lock()
	m.continueOnError = continueOnError
	m.mu.Unlock()
}

// ContinueOnError reports the current continue-on-error setting. It reports
// true for a nil receiver.
func (m *MultiSink) ContinueOnError() bool {
	if m == nil {
		return true
	}
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.continueOnError
}

// SinkCount returns the number of registered sinks.
func (m *MultiSink) SinkCount() int {
	if m == nil {
		return 0
	}
	m.mu.RLock()
	defer m.mu.RUnlock()
	return len(m.slots)
}

// GetStats returns the ContinueOnError setting and a Stats entry for every
// registered sink. For a nil receiver it returns ContinueOnError true and no
// sinks.
func (m *MultiSink) GetStats() Snapshot {
	if m == nil {
		return Snapshot{ContinueOnError: true}
	}
	slots, continueOnError := m.snapshot()
	stats := make([]Stats, 0, len(slots))
	for i, s := range slots {
		if s != nil {
			stats = append(stats, s.snapshot(i))
		}
	}
	return Snapshot{ContinueOnError: continueOnError, Sinks: stats}
}

// ResetStats clears the statistics of every registered sink.
func (m *MultiSink) ResetStats() {
	if m == nil {
		return
	}
	slots, _ := m.snapshot()
	for _, s := range slots {
		if s != nil {
			s.resetStats()
		}
	}
}

// Write passes data to every registered sink in order and records each
// outcome.
//
// With ContinueOnError disabled the first failure is returned immediately and
// the remaining sinks are not written to. Otherwise every sink is tried: a
// single failure is returned as-is, and several failures are returned as one
// error whose message lists all of them and whose Unwrap() []error method
// exposes the individual errors. Write returns nil when no sink failed.
func (m *MultiSink) Write(data []byte) error {
	if m == nil {
		return nil
	}
	slots, continueOnError := m.snapshot()

	var errs []error
	for _, s := range slots {
		if s == nil || s.sink == nil {
			continue
		}
		err := s.sink.Write(data)
		s.observeWrite(err)
		if err == nil {
			continue
		}
		if !continueOnError {
			return err
		}
		errs = append(errs, err)
	}
	return packErrors("write", errs)
}

// Close closes every registered sink in order and records each outcome.
// Errors are reported exactly as for Write. The sinks stay registered.
func (m *MultiSink) Close() error {
	if m == nil {
		return nil
	}
	slots, continueOnError := m.snapshot()

	var errs []error
	for _, s := range slots {
		if s == nil || s.sink == nil {
			continue
		}
		err := s.sink.Close()
		s.observeClose(err)
		if err == nil {
			continue
		}
		if !continueOnError {
			return err
		}
		errs = append(errs, err)
	}
	return packErrors("close", errs)
}

// snapshot returns a copy of the slot list together with the current
// continue-on-error flag, both read under the read lock.
func (m *MultiSink) snapshot() ([]*slot, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	current := make([]*slot, len(m.slots))
	copy(current, m.slots)
	return current, m.continueOnError
}

// multiError carries every error produced by one fan-out operation.
type multiError struct {
	action string
	errs   []error
}

// Error lists the messages of all collected errors, separated by "; ".
func (e *multiError) Error() string {
	buf := []byte(fmt.Sprintf("multi sink %s failed with %d errors: ", e.action, len(e.errs)))
	for i, err := range e.errs {
		if i > 0 {
			buf = append(buf, "; "...)
		}
		buf = append(buf, err.Error()...)
	}
	return string(buf)
}

// Unwrap exposes the individual errors so that errors.Is and errors.As
// (Go 1.20 and later) can match any of them.
func (e *multiError) Unwrap() []error {
	return e.errs
}

// packErrors reduces the errors collected during action to a single value:
// nil for none, the error itself for exactly one, and an aggregate that keeps
// every error for two or more.
func packErrors(action string, errs []error) error {
	switch len(errs) {
	case 0:
		return nil
	case 1:
		return errs[0]
	default:
		return &multiError{action: action, errs: errs}
	}
}