339 lines
7.0 KiB
Go
339 lines
7.0 KiB
Go
|
|
package multisinkx
|
||
|
|
|
||
|
|
import (
|
||
|
|
"fmt"
|
||
|
|
"sync"
|
||
|
|
"sync/atomic"
|
||
|
|
)
|
||
|
|
|
||
|
|
type State string
|
||
|
|
|
||
|
|
const (
|
||
|
|
StateHealthy State = "healthy"
|
||
|
|
StateDegraded State = "degraded"
|
||
|
|
StateRecovered State = "recovered"
|
||
|
|
)
|
||
|
|
|
||
|
|
const (
|
||
|
|
stateHealthy uint32 = iota
|
||
|
|
stateDegraded
|
||
|
|
stateRecovered
|
||
|
|
)
|
||
|
|
|
||
|
|
type Stats struct {
|
||
|
|
Index int
|
||
|
|
Writes uint64
|
||
|
|
WriteErrors uint64
|
||
|
|
Closes uint64
|
||
|
|
CloseErrors uint64
|
||
|
|
ConsecutiveWriteErrors uint64
|
||
|
|
ConsecutiveCloseErrors uint64
|
||
|
|
LastWriteError string
|
||
|
|
LastCloseError string
|
||
|
|
State State
|
||
|
|
}
|
||
|
|
|
||
|
|
type Snapshot struct {
|
||
|
|
ContinueOnError bool
|
||
|
|
Sinks []Stats
|
||
|
|
}
|
||
|
|
|
||
|
|
type Sink interface {
|
||
|
|
Write([]byte) error
|
||
|
|
Close() error
|
||
|
|
}
|
||
|
|
|
||
|
|
type slot struct {
|
||
|
|
sink Sink
|
||
|
|
writeCount uint64
|
||
|
|
writeErrorCount uint64
|
||
|
|
closeCount uint64
|
||
|
|
closeErrorCount uint64
|
||
|
|
consecutiveWriteErrors uint64
|
||
|
|
consecutiveCloseErrors uint64
|
||
|
|
state uint32
|
||
|
|
mu sync.RWMutex
|
||
|
|
lastWriteError string
|
||
|
|
lastCloseError string
|
||
|
|
}
|
||
|
|
|
||
|
|
func newSlot(sink Sink) *slot {
|
||
|
|
result := &slot{
|
||
|
|
sink: sink,
|
||
|
|
}
|
||
|
|
atomic.StoreUint32(&result.state, stateHealthy)
|
||
|
|
return result
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *slot) setLastWriteError(err error) {
|
||
|
|
msg := ""
|
||
|
|
if err != nil {
|
||
|
|
msg = err.Error()
|
||
|
|
}
|
||
|
|
s.mu.Lock()
|
||
|
|
s.lastWriteError = msg
|
||
|
|
s.mu.Unlock()
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *slot) setLastCloseError(err error) {
|
||
|
|
msg := ""
|
||
|
|
if err != nil {
|
||
|
|
msg = err.Error()
|
||
|
|
}
|
||
|
|
s.mu.Lock()
|
||
|
|
s.lastCloseError = msg
|
||
|
|
s.mu.Unlock()
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *slot) setStateHealthyOrRecovered() {
|
||
|
|
if atomic.LoadUint64(&s.writeErrorCount)+atomic.LoadUint64(&s.closeErrorCount) > 0 {
|
||
|
|
atomic.StoreUint32(&s.state, stateRecovered)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
atomic.StoreUint32(&s.state, stateHealthy)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *slot) observeWrite(err error) {
|
||
|
|
atomic.AddUint64(&s.writeCount, 1)
|
||
|
|
if err == nil {
|
||
|
|
atomic.StoreUint64(&s.consecutiveWriteErrors, 0)
|
||
|
|
s.setLastWriteError(nil)
|
||
|
|
s.setStateHealthyOrRecovered()
|
||
|
|
return
|
||
|
|
}
|
||
|
|
atomic.AddUint64(&s.writeErrorCount, 1)
|
||
|
|
atomic.AddUint64(&s.consecutiveWriteErrors, 1)
|
||
|
|
atomic.StoreUint32(&s.state, stateDegraded)
|
||
|
|
s.setLastWriteError(err)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *slot) observeClose(err error) {
|
||
|
|
atomic.AddUint64(&s.closeCount, 1)
|
||
|
|
if err == nil {
|
||
|
|
atomic.StoreUint64(&s.consecutiveCloseErrors, 0)
|
||
|
|
s.setLastCloseError(nil)
|
||
|
|
s.setStateHealthyOrRecovered()
|
||
|
|
return
|
||
|
|
}
|
||
|
|
atomic.AddUint64(&s.closeErrorCount, 1)
|
||
|
|
atomic.AddUint64(&s.consecutiveCloseErrors, 1)
|
||
|
|
atomic.StoreUint32(&s.state, stateDegraded)
|
||
|
|
s.setLastCloseError(err)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *slot) snapshot(index int) Stats {
|
||
|
|
lastWriteErr := ""
|
||
|
|
lastCloseErr := ""
|
||
|
|
s.mu.RLock()
|
||
|
|
lastWriteErr = s.lastWriteError
|
||
|
|
lastCloseErr = s.lastCloseError
|
||
|
|
s.mu.RUnlock()
|
||
|
|
return Stats{
|
||
|
|
Index: index,
|
||
|
|
Writes: atomic.LoadUint64(&s.writeCount),
|
||
|
|
WriteErrors: atomic.LoadUint64(&s.writeErrorCount),
|
||
|
|
Closes: atomic.LoadUint64(&s.closeCount),
|
||
|
|
CloseErrors: atomic.LoadUint64(&s.closeErrorCount),
|
||
|
|
ConsecutiveWriteErrors: atomic.LoadUint64(&s.consecutiveWriteErrors),
|
||
|
|
ConsecutiveCloseErrors: atomic.LoadUint64(&s.consecutiveCloseErrors),
|
||
|
|
LastWriteError: lastWriteErr,
|
||
|
|
LastCloseError: lastCloseErr,
|
||
|
|
State: decodeState(atomic.LoadUint32(&s.state)),
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *slot) resetStats() {
|
||
|
|
atomic.StoreUint64(&s.writeCount, 0)
|
||
|
|
atomic.StoreUint64(&s.writeErrorCount, 0)
|
||
|
|
atomic.StoreUint64(&s.closeCount, 0)
|
||
|
|
atomic.StoreUint64(&s.closeErrorCount, 0)
|
||
|
|
atomic.StoreUint64(&s.consecutiveWriteErrors, 0)
|
||
|
|
atomic.StoreUint64(&s.consecutiveCloseErrors, 0)
|
||
|
|
atomic.StoreUint32(&s.state, stateHealthy)
|
||
|
|
s.mu.Lock()
|
||
|
|
s.lastWriteError = ""
|
||
|
|
s.lastCloseError = ""
|
||
|
|
s.mu.Unlock()
|
||
|
|
}
|
||
|
|
|
||
|
|
func decodeState(state uint32) State {
|
||
|
|
switch state {
|
||
|
|
case stateDegraded:
|
||
|
|
return StateDegraded
|
||
|
|
case stateRecovered:
|
||
|
|
return StateRecovered
|
||
|
|
default:
|
||
|
|
return StateHealthy
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
type MultiSink struct {
|
||
|
|
mu sync.RWMutex
|
||
|
|
slots []*slot
|
||
|
|
continueOnError bool
|
||
|
|
}
|
||
|
|
|
||
|
|
func New(sinks ...Sink) *MultiSink {
|
||
|
|
multi := &MultiSink{
|
||
|
|
continueOnError: true,
|
||
|
|
slots: make([]*slot, 0, len(sinks)),
|
||
|
|
}
|
||
|
|
multi.SetSinks(sinks...)
|
||
|
|
return multi
|
||
|
|
}
|
||
|
|
|
||
|
|
func (sink *MultiSink) SetSinks(sinks ...Sink) {
|
||
|
|
if sink == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
filtered := make([]Sink, 0, len(sinks))
|
||
|
|
for _, item := range sinks {
|
||
|
|
if item == nil {
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
filtered = append(filtered, item)
|
||
|
|
}
|
||
|
|
slots := make([]*slot, 0, len(filtered))
|
||
|
|
for _, item := range filtered {
|
||
|
|
slots = append(slots, newSlot(item))
|
||
|
|
}
|
||
|
|
sink.mu.Lock()
|
||
|
|
sink.slots = slots
|
||
|
|
sink.mu.Unlock()
|
||
|
|
}
|
||
|
|
|
||
|
|
func (sink *MultiSink) AddSink(item Sink) {
|
||
|
|
if sink == nil || item == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
sink.mu.Lock()
|
||
|
|
sink.slots = append(sink.slots, newSlot(item))
|
||
|
|
sink.mu.Unlock()
|
||
|
|
}
|
||
|
|
|
||
|
|
func (sink *MultiSink) SetContinueOnError(continueOnError bool) {
|
||
|
|
if sink == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
sink.mu.Lock()
|
||
|
|
sink.continueOnError = continueOnError
|
||
|
|
sink.mu.Unlock()
|
||
|
|
}
|
||
|
|
|
||
|
|
func (sink *MultiSink) ContinueOnError() bool {
|
||
|
|
if sink == nil {
|
||
|
|
return true
|
||
|
|
}
|
||
|
|
sink.mu.RLock()
|
||
|
|
defer sink.mu.RUnlock()
|
||
|
|
return sink.continueOnError
|
||
|
|
}
|
||
|
|
|
||
|
|
func (sink *MultiSink) SinkCount() int {
|
||
|
|
if sink == nil {
|
||
|
|
return 0
|
||
|
|
}
|
||
|
|
sink.mu.RLock()
|
||
|
|
defer sink.mu.RUnlock()
|
||
|
|
return len(sink.slots)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (sink *MultiSink) GetStats() Snapshot {
|
||
|
|
if sink == nil {
|
||
|
|
return Snapshot{
|
||
|
|
ContinueOnError: true,
|
||
|
|
Sinks: nil,
|
||
|
|
}
|
||
|
|
}
|
||
|
|
current, continueOnError := sink.snapshot()
|
||
|
|
stats := make([]Stats, 0, len(current))
|
||
|
|
for index, item := range current {
|
||
|
|
if item == nil {
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
stats = append(stats, item.snapshot(index))
|
||
|
|
}
|
||
|
|
return Snapshot{
|
||
|
|
ContinueOnError: continueOnError,
|
||
|
|
Sinks: stats,
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (sink *MultiSink) ResetStats() {
|
||
|
|
if sink == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
current, _ := sink.snapshot()
|
||
|
|
for _, item := range current {
|
||
|
|
if item == nil {
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
item.resetStats()
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (sink *MultiSink) Write(data []byte) error {
|
||
|
|
if sink == nil {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
current, continueOnError := sink.snapshot()
|
||
|
|
if len(current) == 0 {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
var errs []error
|
||
|
|
for _, item := range current {
|
||
|
|
if item == nil || item.sink == nil {
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
err := item.sink.Write(data)
|
||
|
|
item.observeWrite(err)
|
||
|
|
if err != nil {
|
||
|
|
if !continueOnError {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
errs = append(errs, err)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return packErrors("write", errs)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (sink *MultiSink) Close() error {
|
||
|
|
if sink == nil {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
current, continueOnError := sink.snapshot()
|
||
|
|
var errs []error
|
||
|
|
for _, item := range current {
|
||
|
|
if item == nil || item.sink == nil {
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
err := item.sink.Close()
|
||
|
|
item.observeClose(err)
|
||
|
|
if err != nil {
|
||
|
|
if !continueOnError {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
errs = append(errs, err)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return packErrors("close", errs)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (sink *MultiSink) snapshot() ([]*slot, bool) {
|
||
|
|
sink.mu.RLock()
|
||
|
|
defer sink.mu.RUnlock()
|
||
|
|
current := make([]*slot, len(sink.slots))
|
||
|
|
copy(current, sink.slots)
|
||
|
|
return current, sink.continueOnError
|
||
|
|
}
|
||
|
|
|
||
|
|
func packErrors(action string, errs []error) error {
|
||
|
|
if len(errs) == 0 {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
if len(errs) == 1 {
|
||
|
|
return errs[0]
|
||
|
|
}
|
||
|
|
return fmt.Errorf("multi sink %s failed with %d errors: %v", action, len(errs), errs[0])
|
||
|
|
}
|