148 lines
3.4 KiB
Go
148 lines
3.4 KiB
Go
|
|
package notify
|
||
|
|
|
||
|
|
import (
|
||
|
|
"sync"
|
||
|
|
"time"
|
||
|
|
)
|
||
|
|
|
||
|
|
type ConnectionRetrySnapshot struct {
|
||
|
|
RetryEventTotal uint64
|
||
|
|
LastRetryAttempt int
|
||
|
|
LastRetryDelay time.Duration
|
||
|
|
LastRetryError string
|
||
|
|
LastRetryAt time.Time
|
||
|
|
LastResultError string
|
||
|
|
LastResultAt time.Time
|
||
|
|
}
|
||
|
|
|
||
|
|
type connectionRetryState struct {
|
||
|
|
mu sync.Mutex
|
||
|
|
|
||
|
|
retryEventTotal uint64
|
||
|
|
lastRetryAttempt int
|
||
|
|
lastRetryDelay time.Duration
|
||
|
|
lastRetryError string
|
||
|
|
lastRetryAt time.Time
|
||
|
|
lastResultError string
|
||
|
|
lastResultAt time.Time
|
||
|
|
}
|
||
|
|
|
||
|
|
func newConnectionRetryState() *connectionRetryState {
|
||
|
|
return &connectionRetryState{}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *connectionRetryState) recordRetryEvent(event ConnectRetryEvent) {
|
||
|
|
if s == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
s.mu.Lock()
|
||
|
|
defer s.mu.Unlock()
|
||
|
|
s.retryEventTotal++
|
||
|
|
s.lastRetryAttempt = event.Attempt
|
||
|
|
s.lastRetryDelay = event.NextDelay
|
||
|
|
if event.Err != nil {
|
||
|
|
s.lastRetryError = event.Err.Error()
|
||
|
|
} else {
|
||
|
|
s.lastRetryError = ""
|
||
|
|
}
|
||
|
|
s.lastRetryAt = time.Now()
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *connectionRetryState) recordResult(err error) {
|
||
|
|
if s == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
s.mu.Lock()
|
||
|
|
defer s.mu.Unlock()
|
||
|
|
if err != nil {
|
||
|
|
s.lastResultError = err.Error()
|
||
|
|
} else {
|
||
|
|
s.lastResultError = ""
|
||
|
|
}
|
||
|
|
s.lastResultAt = time.Now()
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *connectionRetryState) snapshot() ConnectionRetrySnapshot {
|
||
|
|
if s == nil {
|
||
|
|
return ConnectionRetrySnapshot{}
|
||
|
|
}
|
||
|
|
s.mu.Lock()
|
||
|
|
defer s.mu.Unlock()
|
||
|
|
return ConnectionRetrySnapshot{
|
||
|
|
RetryEventTotal: s.retryEventTotal,
|
||
|
|
LastRetryAttempt: s.lastRetryAttempt,
|
||
|
|
LastRetryDelay: s.lastRetryDelay,
|
||
|
|
LastRetryError: s.lastRetryError,
|
||
|
|
LastRetryAt: s.lastRetryAt,
|
||
|
|
LastResultError: s.lastResultError,
|
||
|
|
LastResultAt: s.lastResultAt,
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
type connectionRetryRecorder interface {
|
||
|
|
recordConnectionRetryEvent(event ConnectRetryEvent)
|
||
|
|
recordConnectionRetryResult(err error)
|
||
|
|
}
|
||
|
|
|
||
|
|
func wrapConnectRetryOptionsWithRecorder(opts *ConnectRetryOptions, recorder connectionRetryRecorder) *ConnectRetryOptions {
|
||
|
|
if recorder == nil {
|
||
|
|
return opts
|
||
|
|
}
|
||
|
|
if opts == nil {
|
||
|
|
return &ConnectRetryOptions{
|
||
|
|
OnRetry: recorder.recordConnectionRetryEvent,
|
||
|
|
}
|
||
|
|
}
|
||
|
|
next := *opts
|
||
|
|
originOnRetry := next.OnRetry
|
||
|
|
next.OnRetry = func(event ConnectRetryEvent) {
|
||
|
|
recorder.recordConnectionRetryEvent(event)
|
||
|
|
if originOnRetry != nil {
|
||
|
|
originOnRetry(event)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return &next
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) getConnectionRetryState() *connectionRetryState {
|
||
|
|
c.mu.Lock()
|
||
|
|
defer c.mu.Unlock()
|
||
|
|
if c.connectionRetryState == nil {
|
||
|
|
c.connectionRetryState = newConnectionRetryState()
|
||
|
|
}
|
||
|
|
return c.connectionRetryState
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) recordConnectionRetryEvent(event ConnectRetryEvent) {
|
||
|
|
c.getConnectionRetryState().recordRetryEvent(event)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) recordConnectionRetryResult(err error) {
|
||
|
|
c.getConnectionRetryState().recordResult(err)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) connectionRetrySnapshot() ConnectionRetrySnapshot {
|
||
|
|
return c.getConnectionRetryState().snapshot()
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *ServerCommon) getConnectionRetryState() *connectionRetryState {
|
||
|
|
s.mu.Lock()
|
||
|
|
defer s.mu.Unlock()
|
||
|
|
if s.connectionRetryState == nil {
|
||
|
|
s.connectionRetryState = newConnectionRetryState()
|
||
|
|
}
|
||
|
|
return s.connectionRetryState
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *ServerCommon) recordConnectionRetryEvent(event ConnectRetryEvent) {
|
||
|
|
s.getConnectionRetryState().recordRetryEvent(event)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *ServerCommon) recordConnectionRetryResult(err error) {
|
||
|
|
s.getConnectionRetryState().recordResult(err)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *ServerCommon) connectionRetrySnapshot() ConnectionRetrySnapshot {
|
||
|
|
return s.getConnectionRetryState().snapshot()
|
||
|
|
}
|