269 lines
9.3 KiB
Go
269 lines
9.3 KiB
Go
|
|
package notify
|
||
|
|
|
||
|
|
import (
|
||
|
|
"errors"
|
||
|
|
"time"
|
||
|
|
)
|
||
|
|
|
||
|
|
type ClientRuntimeSnapshot struct {
|
||
|
|
OwnerState string
|
||
|
|
Alive bool
|
||
|
|
SessionEpoch uint64
|
||
|
|
TransportAttached bool
|
||
|
|
HasRuntimeConn bool
|
||
|
|
HasRuntimeQueue bool
|
||
|
|
HasRuntimeStopCtx bool
|
||
|
|
ConnectSource string
|
||
|
|
ConnectNetwork string
|
||
|
|
ConnectAddress string
|
||
|
|
CanReconnect bool
|
||
|
|
Retry ConnectionRetrySnapshot
|
||
|
|
}
|
||
|
|
|
||
|
|
type ServerRuntimeSnapshot struct {
|
||
|
|
OwnerState string
|
||
|
|
Alive bool
|
||
|
|
ClientCount int
|
||
|
|
DetachedClientCount int
|
||
|
|
DetachedReattachableClientCount int
|
||
|
|
DetachedExpiredClientCount int
|
||
|
|
DetachedClientKeepSec int64
|
||
|
|
TransportAttached bool
|
||
|
|
HasRuntimeListener bool
|
||
|
|
HasRuntimeUDPListener bool
|
||
|
|
HasRuntimeQueue bool
|
||
|
|
HasRuntimeStopCtx bool
|
||
|
|
Retry ConnectionRetrySnapshot
|
||
|
|
}
|
||
|
|
|
||
|
|
// ClientConnRuntimeSnapshot is a point-in-time view of a single server-side
// client connection. The field notes below describe how
// (*ClientConn).clientConnRuntimeSnapshot fills the struct; the same type is
// also returned for logical connections.
type ClientConnRuntimeSnapshot struct {
	// ClientID is the connection's client ID.
	ClientID string
	// RemoteAddress is the string form of the remote address, or empty
	// when no remote address is known.
	RemoteAddress string
	// Alive mirrors the Alive flag of the connection status.
	Alive bool
	// Reason mirrors the Reason of the connection status.
	Reason string
	// Error is the message of the status error, or empty when there is none.
	Error string
	// IdentityBound is the connection's identity-bound flag.
	IdentityBound bool
	// UsesStreamTransport is the connection's uses-stream-transport flag.
	UsesStreamTransport bool
	// TransportGeneration is the connection's transport generation counter.
	TransportGeneration uint64

	// TransportAttached is the connection's transport-attached flag. Like
	// the two Has* flags below, it is only filled in when a session
	// runtime is present and stays false otherwise.
	TransportAttached bool
	// HasRuntimeConn reports whether a transport is present.
	HasRuntimeConn bool
	// HasRuntimeStopCtx reports whether the session runtime carries a stop context.
	HasRuntimeStopCtx bool

	// TransportAttachCount is the connection's transport attach counter.
	TransportAttachCount uint64
	// TransportDetachCount is the connection's transport detach counter.
	TransportDetachCount uint64
	// LastTransportAttachAt is the time of the last transport attach.
	LastTransportAttachAt time.Time
	// DetachedClientKeepSec is the owning server's DetachedClientKeepSec()
	// value; it stays 0 when the connection has no server.
	DetachedClientKeepSec int64
	// LastHeartbeatAt is the last heartbeat time, built from a Unix
	// seconds value; it is the zero time when that value is 0.
	LastHeartbeatAt time.Time

	// TransportDetachReason is the Reason of the recorded detach. All
	// TransportDetach* fields keep their zero values when no detach is
	// recorded.
	TransportDetachReason string
	// TransportDetachKind is the connection's detach kind.
	TransportDetachKind string
	// TransportDetachGeneration is the connection's detach generation.
	TransportDetachGeneration uint64
	// TransportDetachError is the Err text of the recorded detach.
	TransportDetachError string
	// TransportDetachedAt is the At time of the recorded detach.
	TransportDetachedAt time.Time
	// TransportDetachHasExpiry reports whether TransportDetachExpiry is set.
	TransportDetachHasExpiry bool
	// TransportDetachExpiry is the detach expiry time.
	TransportDetachExpiry time.Time
	// TransportDetachRemaining is the remaining detach time as computed at
	// snapshot time.
	TransportDetachRemaining time.Duration
	// TransportDetachExpired reports whether the detach had expired at
	// snapshot time.
	TransportDetachExpired bool

	// ReattachEligible reports whether the connection was reattach-eligible
	// at snapshot time; unlike the TransportDetach* fields, it is always
	// filled in.
	ReattachEligible bool
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) clientRuntimeSnapshot() ClientRuntimeSnapshot {
|
||
|
|
status := c.Status()
|
||
|
|
rt := c.clientSessionRuntimeSnapshot()
|
||
|
|
snapshot := ClientRuntimeSnapshot{
|
||
|
|
OwnerState: c.ownerSessionStateName(),
|
||
|
|
Alive: status.Alive,
|
||
|
|
SessionEpoch: c.currentClientSessionEpoch(),
|
||
|
|
}
|
||
|
|
if rt != nil {
|
||
|
|
snapshot.TransportAttached = c.clientTransportAttachedSnapshot()
|
||
|
|
snapshot.HasRuntimeConn = c.clientTransportConnSnapshot() != nil
|
||
|
|
snapshot.HasRuntimeQueue = c.clientQueueSnapshot() != nil
|
||
|
|
snapshot.HasRuntimeStopCtx = rt.stopCtx != nil
|
||
|
|
}
|
||
|
|
if source := c.clientConnectSourceSnapshot(); source != nil {
|
||
|
|
snapshot.ConnectSource = source.kind
|
||
|
|
snapshot.ConnectNetwork = source.network
|
||
|
|
snapshot.ConnectAddress = source.addr
|
||
|
|
snapshot.CanReconnect = source.canReconnect()
|
||
|
|
}
|
||
|
|
snapshot.Retry = c.connectionRetrySnapshot()
|
||
|
|
return snapshot
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *ServerCommon) serverRuntimeSnapshot() ServerRuntimeSnapshot {
|
||
|
|
status := s.Status()
|
||
|
|
rt := s.serverSessionRuntimeSnapshot()
|
||
|
|
now := time.Now()
|
||
|
|
snapshot := ServerRuntimeSnapshot{
|
||
|
|
OwnerState: s.ownerSessionStateName(),
|
||
|
|
Alive: status.Alive,
|
||
|
|
DetachedClientKeepSec: s.DetachedClientKeepSec(),
|
||
|
|
}
|
||
|
|
logicals := s.GetLogicalConnList()
|
||
|
|
snapshot.ClientCount = len(logicals)
|
||
|
|
for _, logical := range logicals {
|
||
|
|
if logical != nil && logical.logicalTransportDetachedSnapshot() {
|
||
|
|
snapshot.DetachedClientCount++
|
||
|
|
if logical.transportDetachExpiredSnapshot(now) {
|
||
|
|
snapshot.DetachedExpiredClientCount++
|
||
|
|
}
|
||
|
|
if logical.reattachEligibleSnapshot(now) {
|
||
|
|
snapshot.DetachedReattachableClientCount++
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
if rt != nil {
|
||
|
|
snapshot.TransportAttached = s.serverTransportAttachedSnapshot()
|
||
|
|
snapshot.HasRuntimeListener = rt.listener != nil
|
||
|
|
snapshot.HasRuntimeUDPListener = rt.udpListener != nil
|
||
|
|
snapshot.HasRuntimeQueue = rt.queue != nil
|
||
|
|
snapshot.HasRuntimeStopCtx = rt.stopCtx != nil
|
||
|
|
}
|
||
|
|
snapshot.Retry = s.connectionRetrySnapshot()
|
||
|
|
return snapshot
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientConn) clientConnRuntimeSnapshot() ClientConnRuntimeSnapshot {
|
||
|
|
status := c.clientConnStatusSnapshot()
|
||
|
|
now := time.Now()
|
||
|
|
snapshot := ClientConnRuntimeSnapshot{
|
||
|
|
ClientID: c.clientConnIDSnapshot(),
|
||
|
|
Alive: status.Alive,
|
||
|
|
Reason: status.Reason,
|
||
|
|
IdentityBound: c.clientConnIdentityBoundSnapshot(),
|
||
|
|
UsesStreamTransport: c.clientConnUsesStreamTransportSnapshot(),
|
||
|
|
TransportGeneration: c.clientConnTransportGenerationSnapshot(),
|
||
|
|
TransportAttachCount: c.clientConnTransportAttachCountSnapshot(),
|
||
|
|
TransportDetachCount: c.clientConnTransportDetachCountSnapshot(),
|
||
|
|
LastTransportAttachAt: c.clientConnLastTransportAttachedAtSnapshot(),
|
||
|
|
}
|
||
|
|
if status.Err != nil {
|
||
|
|
snapshot.Error = status.Err.Error()
|
||
|
|
}
|
||
|
|
if addr := c.clientConnRemoteAddrSnapshot(); addr != nil {
|
||
|
|
snapshot.RemoteAddress = addr.String()
|
||
|
|
}
|
||
|
|
if lastHeartbeat := c.clientConnLastHeartbeatUnixSnapshot(); lastHeartbeat != 0 {
|
||
|
|
snapshot.LastHeartbeatAt = time.Unix(lastHeartbeat, 0)
|
||
|
|
}
|
||
|
|
if c.server != nil {
|
||
|
|
snapshot.DetachedClientKeepSec = c.server.DetachedClientKeepSec()
|
||
|
|
}
|
||
|
|
if rt := c.clientConnSessionRuntimeSnapshot(); rt != nil {
|
||
|
|
snapshot.TransportAttached = c.clientConnTransportAttachedSnapshot()
|
||
|
|
snapshot.HasRuntimeConn = c.clientConnTransportSnapshot() != nil
|
||
|
|
snapshot.HasRuntimeStopCtx = rt.stopCtx != nil
|
||
|
|
}
|
||
|
|
if detach := c.clientConnTransportDetachSnapshot(); detach != nil {
|
||
|
|
snapshot.TransportDetachReason = detach.Reason
|
||
|
|
snapshot.TransportDetachKind = c.clientConnTransportDetachKindSnapshot()
|
||
|
|
snapshot.TransportDetachGeneration = c.clientConnTransportDetachGenerationSnapshot()
|
||
|
|
snapshot.TransportDetachError = detach.Err
|
||
|
|
snapshot.TransportDetachedAt = detach.At
|
||
|
|
snapshot.TransportDetachExpiry, snapshot.TransportDetachHasExpiry = c.clientConnTransportDetachExpirySnapshot()
|
||
|
|
snapshot.TransportDetachRemaining = c.clientConnTransportDetachRemainingSnapshot(now)
|
||
|
|
snapshot.TransportDetachExpired = c.clientConnTransportDetachExpiredSnapshot(now)
|
||
|
|
}
|
||
|
|
snapshot.ReattachEligible = c.clientConnReattachEligibleSnapshot(now)
|
||
|
|
return snapshot
|
||
|
|
}
|
||
|
|
|
||
|
|
type clientRuntimeSnapshotReader interface {
|
||
|
|
clientRuntimeSnapshot() ClientRuntimeSnapshot
|
||
|
|
}
|
||
|
|
|
||
|
|
type serverRuntimeSnapshotReader interface {
|
||
|
|
serverRuntimeSnapshot() ServerRuntimeSnapshot
|
||
|
|
}
|
||
|
|
|
||
|
|
type serverDetachedClientRuntimeSnapshotReader interface {
|
||
|
|
detachedClientRuntimeSnapshots() []ClientConnRuntimeSnapshot
|
||
|
|
}
|
||
|
|
|
||
|
|
// Errors returned by the Get*RuntimeSnapshot helpers in this file.
var (
	// Nil-target errors: the value to snapshot was nil.
	errClientRuntimeSnapshotNil               = errors.New("client runtime snapshot target is nil")
	errServerRuntimeSnapshotNil               = errors.New("server runtime snapshot target is nil")
	errClientConnRuntimeSnapshotNil           = errors.New("client conn runtime snapshot target is nil")
	errLogicalConnRuntimeSnapshotNil          = errors.New("logical conn runtime snapshot target is nil")
	errServerDetachedClientRuntimeSnapshotNil = errors.New("server detached client runtime snapshot target is nil")

	// Unsupported-type errors: the value does not provide the unexported
	// snapshot method the helper needs.
	errClientRuntimeSnapshotUnsupported        = errors.New("client runtime snapshot target type is unsupported")
	errServerRuntimeSnapshotUnsupported        = errors.New("server runtime snapshot target type is unsupported")
	errServerDetachedClientSnapshotUnsupported = errors.New("server detached client runtime snapshot target type is unsupported")
)
|
||
|
|
|
||
|
|
func GetClientRuntimeSnapshot(c Client) (ClientRuntimeSnapshot, error) {
|
||
|
|
if c == nil {
|
||
|
|
return ClientRuntimeSnapshot{}, errClientRuntimeSnapshotNil
|
||
|
|
}
|
||
|
|
reader, ok := any(c).(clientRuntimeSnapshotReader)
|
||
|
|
if !ok {
|
||
|
|
return ClientRuntimeSnapshot{}, errClientRuntimeSnapshotUnsupported
|
||
|
|
}
|
||
|
|
return reader.clientRuntimeSnapshot(), nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func GetServerRuntimeSnapshot(s Server) (ServerRuntimeSnapshot, error) {
|
||
|
|
if s == nil {
|
||
|
|
return ServerRuntimeSnapshot{}, errServerRuntimeSnapshotNil
|
||
|
|
}
|
||
|
|
reader, ok := any(s).(serverRuntimeSnapshotReader)
|
||
|
|
if !ok {
|
||
|
|
return ServerRuntimeSnapshot{}, errServerRuntimeSnapshotUnsupported
|
||
|
|
}
|
||
|
|
return reader.serverRuntimeSnapshot(), nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func GetClientConnRuntimeSnapshot(c *ClientConn) (ClientConnRuntimeSnapshot, error) {
|
||
|
|
if c == nil {
|
||
|
|
return ClientConnRuntimeSnapshot{}, errClientConnRuntimeSnapshotNil
|
||
|
|
}
|
||
|
|
return c.clientConnRuntimeSnapshot(), nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func GetLogicalConnRuntimeSnapshot(c *LogicalConn) (ClientConnRuntimeSnapshot, error) {
|
||
|
|
if c == nil {
|
||
|
|
return ClientConnRuntimeSnapshot{}, errLogicalConnRuntimeSnapshotNil
|
||
|
|
}
|
||
|
|
return c.runtimeSnapshot(), nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func GetCurrentTransportConnRuntimeSnapshotByLogical(c *LogicalConn) (TransportConnRuntimeSnapshot, bool, error) {
|
||
|
|
if c == nil {
|
||
|
|
return TransportConnRuntimeSnapshot{}, false, errLogicalConnRuntimeSnapshotNil
|
||
|
|
}
|
||
|
|
transport := c.CurrentTransportConn()
|
||
|
|
if transport == nil {
|
||
|
|
return TransportConnRuntimeSnapshot{}, false, nil
|
||
|
|
}
|
||
|
|
snapshot, err := GetTransportConnRuntimeSnapshot(transport)
|
||
|
|
if err != nil {
|
||
|
|
return TransportConnRuntimeSnapshot{}, false, err
|
||
|
|
}
|
||
|
|
return snapshot, true, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *ServerCommon) detachedClientRuntimeSnapshots() []ClientConnRuntimeSnapshot {
|
||
|
|
if s == nil {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
logicals := s.snapshotDetachedLogicals()
|
||
|
|
snapshots := make([]ClientConnRuntimeSnapshot, 0, len(logicals))
|
||
|
|
for _, logical := range logicals {
|
||
|
|
if logical == nil {
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
snapshots = append(snapshots, logical.runtimeSnapshot())
|
||
|
|
}
|
||
|
|
return snapshots
|
||
|
|
}
|
||
|
|
|
||
|
|
func GetServerDetachedClientRuntimeSnapshots(s Server) ([]ClientConnRuntimeSnapshot, error) {
|
||
|
|
if s == nil {
|
||
|
|
return nil, errServerDetachedClientRuntimeSnapshotNil
|
||
|
|
}
|
||
|
|
reader, ok := any(s).(serverDetachedClientRuntimeSnapshotReader)
|
||
|
|
if !ok {
|
||
|
|
return nil, errServerDetachedClientSnapshotUnsupported
|
||
|
|
}
|
||
|
|
return reader.detachedClientRuntimeSnapshots(), nil
|
||
|
|
}
|