// notify/session_runtime_snapshot.go
//
// 403 lines
// 17 KiB
// Go
// Raw Permalink Normal View History

package notify
import (
"encoding/hex"
"errors"
"time"
)
// ClientRuntimeSnapshot is a flat, value-only view of a client's runtime
// state. It is produced by (*ClientCommon).clientRuntimeSnapshot and exposed
// to callers through GetClientRuntimeSnapshot.
type ClientRuntimeSnapshot struct {
	// Owner session state, liveness and session epoch.
	OwnerState   string
	Alive        bool
	SessionEpoch uint64

	// Which runtime resources are currently present.
	TransportAttached bool
	HasRuntimeConn    bool
	HasRuntimeQueue   bool
	HasRuntimeStopCtx bool

	// Connect source description and whether it reports it can reconnect.
	ConnectSource  string
	ConnectNetwork string
	ConnectAddress string
	CanReconnect   bool

	// Security modes and transport protection state. TransportSessionID is
	// the hex encoding of the protection session ID.
	AuthMode               string
	ProtectionMode         string
	ProtectionKeyMode      string
	ForwardSecrecyEnabled  bool
	ForwardSecrecyFallback bool
	ForwardSecrecyRequired bool
	TransportSessionID     string

	// Peer-attach authentication state and the configured requirements.
	PeerAttachAuthenticated            bool
	PeerAttachAuthFallback             bool
	LastPeerAttachAt                   time.Time
	PeerAttachRequireExplicitAuth      bool
	PeerAttachRequireChannelBinding    bool
	PeerAttachChannelBindingConfigured bool

	// Bulk network profile and default open mode, by name.
	BulkNetworkProfile string
	BulkDefaultMode    string
	// Bulk open tuning values.
	BulkChunkSize   int
	BulkWindowBytes int
	BulkMaxInFlight int
	// Bulk dedicated-attach limits.
	BulkAttachLimit int
	BulkActiveLimit int
	BulkLaneLimit   int
	// Bulk attach counters.
	BulkAttachAttempts int64
	BulkAttachRetries  int64
	BulkAttachSuccess  int64
	BulkAutoFallbacks  int64

	// Adaptive values read from the transport binding; zero when no binding
	// is present.
	TransportBulkAdaptiveSoftPayloadBytes     int
	TransportStreamAdaptiveSoftPayloadBytes   int
	TransportStreamAdaptiveWaitThresholdBytes int
	TransportStreamAdaptiveFlushDelay         time.Duration

	// Connection retry state.
	Retry ConnectionRetrySnapshot
}
// ServerRuntimeSnapshot is a flat, value-only view of a server's runtime
// state. It is produced by (*ServerCommon).serverRuntimeSnapshot and exposed
// to callers through GetServerRuntimeSnapshot.
type ServerRuntimeSnapshot struct {
	// Owner session state and liveness.
	OwnerState string
	Alive      bool

	// Logical connection totals. The Detached* counters are subsets of the
	// logical connections counted in ClientCount.
	ClientCount                     int
	DetachedClientCount             int
	DetachedReattachableClientCount int
	DetachedExpiredClientCount      int
	DetachedClientKeepSec           int64

	// Which runtime resources are currently present.
	TransportAttached     bool
	HasRuntimeListener    bool
	HasRuntimeUDPListener bool
	HasRuntimeQueue       bool
	HasRuntimeStopCtx     bool

	// Security modes and forward-secrecy policy.
	AuthMode                string
	ProtectionMode          string
	ForwardSecrecySupported bool
	ForwardSecrecyRequired  bool

	// Peer-attach security configuration.
	PeerAttachRequireExplicitAuth      bool
	PeerAttachRequireChannelBinding    bool
	PeerAttachChannelBindingConfigured bool
	PeerAttachReplayWindow             time.Duration
	PeerAttachReplayCapacity           int

	// Peer-attach counters.
	PeerAttachExplicitAuth          int64
	PeerAttachAuthFallbacks         int64
	PeerAttachAuthRejects           int64
	PeerAttachDowngradeRejects      int64
	PeerAttachBindingRejects        int64
	PeerAttachReplayRejects         int64
	PeerAttachReplayOverflowRejects int64

	// Bulk open tuning values.
	BulkChunkSize   int
	BulkWindowBytes int
	BulkMaxInFlight int

	// Connection retry state.
	Retry ConnectionRetrySnapshot
}
// ClientConnRuntimeSnapshot is a flat, value-only view of one server-side
// client connection. It is returned by GetClientConnRuntimeSnapshot,
// GetLogicalConnRuntimeSnapshot and GetServerDetachedClientRuntimeSnapshots.
type ClientConnRuntimeSnapshot struct {
	// Identity and status. Error holds the status error's message, or is
	// empty when there is no error.
	ClientID            string
	RemoteAddress       string
	Alive               bool
	Reason              string
	Error               string
	IdentityBound       bool
	UsesStreamTransport bool

	// Transport attachment state and history.
	TransportGeneration   uint64
	TransportAttached     bool
	HasRuntimeConn        bool
	HasRuntimeStopCtx     bool
	TransportAttachCount  uint64
	TransportDetachCount  uint64
	LastTransportAttachAt time.Time
	DetachedClientKeepSec int64
	LastHeartbeatAt       time.Time

	// Details of the recorded transport detach; left at zero values when no
	// detach is recorded. ReattachEligible is always evaluated.
	TransportDetachReason     string
	TransportDetachKind       string
	TransportDetachGeneration uint64
	TransportDetachError      string
	TransportDetachedAt       time.Time
	TransportDetachHasExpiry  bool
	TransportDetachExpiry     time.Time
	TransportDetachRemaining  time.Duration
	TransportDetachExpired    bool
	ReattachEligible          bool

	// Security modes and transport protection state. TransportSessionID is
	// the hex encoding of the attachment's session ID.
	AuthMode               string
	ProtectionMode         string
	ProtectionKeyMode      string
	ForwardSecrecyEnabled  bool
	ForwardSecrecyFallback bool
	TransportSessionID     string

	// Peer-attach authentication state.
	PeerAttachAuthenticated bool
	PeerAttachAuthFallback  bool
	LastPeerAttachAt        time.Time

	// Adaptive values read from the transport binding; zero when no binding
	// is present.
	TransportBulkAdaptiveSoftPayloadBytes     int
	TransportStreamAdaptiveSoftPayloadBytes   int
	TransportStreamAdaptiveWaitThresholdBytes int
	TransportStreamAdaptiveFlushDelay         time.Duration
}
// clientRuntimeSnapshot collects the client's current state into a
// ClientRuntimeSnapshot.
//
// Some field groups are conditional and keep their zero values otherwise:
// the runtime-resource flags need a session runtime, the connect fields need
// a connect source, and the adaptive transport values need a transport
// binding.
func (c *ClientCommon) clientRuntimeSnapshot() ClientRuntimeSnapshot {
	state := c.Status()
	sessionRT := c.clientSessionRuntimeSnapshot()

	var out ClientRuntimeSnapshot
	out.OwnerState = c.ownerSessionStateName()
	out.Alive = state.Alive
	out.SessionEpoch = c.currentClientSessionEpoch()

	// Runtime resources.
	if sessionRT != nil {
		out.TransportAttached = c.clientTransportAttachedSnapshot()
		out.HasRuntimeConn = c.clientTransportConnSnapshot() != nil
		out.HasRuntimeQueue = c.clientQueueSnapshot() != nil
		out.HasRuntimeStopCtx = sessionRT.stopCtx != nil
	}

	// Connect source.
	if src := c.clientConnectSourceSnapshot(); src != nil {
		out.ConnectSource = src.kind
		out.ConnectNetwork = src.network
		out.ConnectAddress = src.addr
		out.CanReconnect = src.canReconnect()
	}

	// Security modes and transport protection.
	out.AuthMode = authModeName(c.securityAuthMode)
	out.ProtectionMode = protectionModeName(c.securityProtectionMode)
	prot := c.clientTransportProtectionSnapshot()
	out.ProtectionKeyMode = prot.keyMode
	out.ForwardSecrecyEnabled = prot.forwardSecrecy
	out.ForwardSecrecyFallback = prot.forwardSecrecyFallback
	out.ForwardSecrecyRequired = c.clientRequiresForwardSecrecy()
	out.TransportSessionID = hex.EncodeToString(prot.sessionID)

	// Peer-attach authentication state and configured requirements.
	authenticated, authFallback, attachedAt := c.clientPeerAttachAuthSnapshot()
	out.PeerAttachAuthenticated = authenticated
	out.PeerAttachAuthFallback = authFallback
	out.LastPeerAttachAt = attachedAt
	attachSec := c.peerAttachSecuritySnapshot()
	out.PeerAttachRequireExplicitAuth = attachSec.requireExplicitAuth
	out.PeerAttachRequireChannelBinding = attachSec.requireChannelBinding
	out.PeerAttachChannelBindingConfigured = attachSec.channelBinding != nil

	// Bulk profile, tuning, dedicated-attach limits and counters.
	out.BulkNetworkProfile = bulkNetworkProfileName(c.BulkNetworkProfile())
	out.BulkDefaultMode = bulkOpenModeName(c.BulkDefaultOpenMode())
	openTuning := c.BulkOpenTuning()
	attachCfg := c.BulkDedicatedAttachConfig()
	out.BulkChunkSize = openTuning.ChunkSize
	out.BulkWindowBytes = openTuning.WindowBytes
	out.BulkMaxInFlight = openTuning.MaxInFlight
	out.BulkAttachLimit = attachCfg.AttachLimit
	out.BulkActiveLimit = attachCfg.ActiveLimit
	out.BulkLaneLimit = attachCfg.LaneLimit
	out.BulkAttachAttempts = c.bulkAttachAttemptCount.Load()
	out.BulkAttachRetries = c.bulkAttachRetryCount.Load()
	out.BulkAttachSuccess = c.bulkAttachSuccessCount.Load()
	out.BulkAutoFallbacks = c.bulkAttachFallbackCount.Load()

	// Adaptive values from the transport binding.
	if tb := c.clientTransportBindingSnapshot(); tb != nil {
		out.TransportBulkAdaptiveSoftPayloadBytes = tb.bulkAdaptiveSoftPayloadBytesSnapshot()
		out.TransportStreamAdaptiveSoftPayloadBytes = tb.streamAdaptiveSoftPayloadBytesSnapshot()
		out.TransportStreamAdaptiveWaitThresholdBytes = tb.streamAdaptiveWaitThresholdBytesSnapshot()
		out.TransportStreamAdaptiveFlushDelay = tb.streamAdaptiveFlushDelaySnapshot()
	}

	out.Retry = c.connectionRetrySnapshot()
	return out
}
// serverRuntimeSnapshot collects the server's current state into a
// ServerRuntimeSnapshot.
//
// ClientCount is the length of the logical connection list. The detached
// counters are computed over that same list using one timestamp taken at the
// start of the call. The runtime-resource flags are only set when a session
// runtime exists.
func (s *ServerCommon) serverRuntimeSnapshot() ServerRuntimeSnapshot {
	state := s.Status()
	sessionRT := s.serverSessionRuntimeSnapshot()
	at := time.Now()

	var out ServerRuntimeSnapshot
	out.OwnerState = s.ownerSessionStateName()
	out.Alive = state.Alive
	out.DetachedClientKeepSec = s.DetachedClientKeepSec()

	// Logical connection totals.
	conns := s.GetLogicalConnList()
	out.ClientCount = len(conns)
	for _, lc := range conns {
		if lc == nil || !lc.logicalTransportDetachedSnapshot() {
			continue
		}
		out.DetachedClientCount++
		if lc.transportDetachExpiredSnapshot(at) {
			out.DetachedExpiredClientCount++
		}
		if lc.reattachEligibleSnapshot(at) {
			out.DetachedReattachableClientCount++
		}
	}

	// Runtime resources.
	if sessionRT != nil {
		out.TransportAttached = s.serverTransportAttachedSnapshot()
		out.HasRuntimeListener = sessionRT.listener != nil
		out.HasRuntimeUDPListener = sessionRT.udpListener != nil
		out.HasRuntimeQueue = sessionRT.queue != nil
		out.HasRuntimeStopCtx = sessionRT.stopCtx != nil
	}

	// Security modes and forward-secrecy policy.
	out.AuthMode = authModeName(s.securityAuthMode)
	out.ProtectionMode = protectionModeName(s.securityProtectionMode)
	out.ForwardSecrecySupported = s.serverSupportsForwardSecrecy()
	out.ForwardSecrecyRequired = s.serverRequiresForwardSecrecy()

	// Peer-attach security configuration and counters.
	attachSec := s.peerAttachSecuritySnapshot()
	out.PeerAttachRequireExplicitAuth = attachSec.requireExplicitAuth
	out.PeerAttachRequireChannelBinding = attachSec.requireChannelBinding
	out.PeerAttachChannelBindingConfigured = attachSec.channelBinding != nil
	out.PeerAttachReplayWindow = attachSec.replayWindow
	out.PeerAttachReplayCapacity = attachSec.replayCapacity
	out.PeerAttachExplicitAuth = s.peerAttachExplicitCount.Load()
	out.PeerAttachAuthFallbacks = s.peerAttachAuthFallbackCount.Load()
	out.PeerAttachAuthRejects = s.peerAttachAuthRejectCount.Load()
	out.PeerAttachDowngradeRejects = s.peerAttachDowngradeRejectCount.Load()
	out.PeerAttachBindingRejects = s.peerAttachBindingRejectCount.Load()
	out.PeerAttachReplayRejects = s.peerAttachReplayRejectCountSnapshot()
	out.PeerAttachReplayOverflowRejects = s.peerAttachReplayOverflowRejectCountSnapshot()

	// Bulk open tuning.
	openTuning := s.BulkOpenTuning()
	out.BulkChunkSize = openTuning.ChunkSize
	out.BulkWindowBytes = openTuning.WindowBytes
	out.BulkMaxInFlight = openTuning.MaxInFlight

	out.Retry = s.connectionRetrySnapshot()
	return out
}
// clientConnRuntimeSnapshot collects this connection's current state into a
// ClientConnRuntimeSnapshot.
//
// Optional data (status error, remote address, last heartbeat, owning
// server, session runtime, transport binding, detach record) is copied only
// when present; the matching fields otherwise keep their zero values. One
// timestamp taken at the start of the call is used for every time-relative
// field.
func (c *ClientConn) clientConnRuntimeSnapshot() ClientConnRuntimeSnapshot {
	state := c.clientConnStatusSnapshot()
	att := c.clientConnAttachmentStateSnapshot()
	at := time.Now()

	var out ClientConnRuntimeSnapshot
	out.ClientID = c.clientConnIDSnapshot()
	out.Alive = state.Alive
	out.Reason = state.Reason
	out.IdentityBound = c.clientConnIdentityBoundSnapshot()
	out.UsesStreamTransport = c.clientConnUsesStreamTransportSnapshot()
	out.TransportGeneration = c.clientConnTransportGenerationSnapshot()
	out.TransportAttachCount = c.clientConnTransportAttachCountSnapshot()
	out.TransportDetachCount = c.clientConnTransportDetachCountSnapshot()
	out.LastTransportAttachAt = c.clientConnLastTransportAttachedAtSnapshot()
	out.AuthMode = authModeName(att.authMode)
	out.ProtectionMode = protectionModeName(att.protectionMode)

	// Attachment-derived security state.
	out.PeerAttachAuthenticated = att.peerAttached
	out.PeerAttachAuthFallback = att.peerAttachFallback
	out.ProtectionKeyMode = att.keyMode
	out.ForwardSecrecyEnabled = att.forwardSecrecy
	out.ForwardSecrecyFallback = att.forwardSecrecyFallback
	out.TransportSessionID = hex.EncodeToString(att.sessionID)
	// peerAttachAt is passed to time.Unix as the nanosecond argument; zero
	// means no timestamp was recorded.
	if nanos := att.peerAttachAt; nanos != 0 {
		out.LastPeerAttachAt = time.Unix(0, nanos)
	}

	if state.Err != nil {
		out.Error = state.Err.Error()
	}
	if remote := c.clientConnRemoteAddrSnapshot(); remote != nil {
		out.RemoteAddress = remote.String()
	}
	// The heartbeat value is passed to time.Unix as the seconds argument;
	// zero means no heartbeat was recorded.
	if secs := c.clientConnLastHeartbeatUnixSnapshot(); secs != 0 {
		out.LastHeartbeatAt = time.Unix(secs, 0)
	}
	if c.server != nil {
		out.DetachedClientKeepSec = c.server.DetachedClientKeepSec()
	}

	// Runtime resources.
	if sessionRT := c.clientConnSessionRuntimeSnapshot(); sessionRT != nil {
		out.TransportAttached = c.clientConnTransportAttachedSnapshot()
		out.HasRuntimeConn = c.clientConnTransportSnapshot() != nil
		out.HasRuntimeStopCtx = sessionRT.stopCtx != nil
	}

	// Adaptive values from the transport binding.
	if tb := c.clientConnTransportBindingSnapshot(); tb != nil {
		out.TransportBulkAdaptiveSoftPayloadBytes = tb.bulkAdaptiveSoftPayloadBytesSnapshot()
		out.TransportStreamAdaptiveSoftPayloadBytes = tb.streamAdaptiveSoftPayloadBytesSnapshot()
		out.TransportStreamAdaptiveWaitThresholdBytes = tb.streamAdaptiveWaitThresholdBytesSnapshot()
		out.TransportStreamAdaptiveFlushDelay = tb.streamAdaptiveFlushDelaySnapshot()
	}

	// Recorded transport detach, if any.
	if det := c.clientConnTransportDetachSnapshot(); det != nil {
		out.TransportDetachReason = det.Reason
		out.TransportDetachKind = c.clientConnTransportDetachKindSnapshot()
		out.TransportDetachGeneration = c.clientConnTransportDetachGenerationSnapshot()
		out.TransportDetachError = det.Err
		out.TransportDetachedAt = det.At
		expiry, hasExpiry := c.clientConnTransportDetachExpirySnapshot()
		out.TransportDetachExpiry = expiry
		out.TransportDetachHasExpiry = hasExpiry
		out.TransportDetachRemaining = c.clientConnTransportDetachRemainingSnapshot(at)
		out.TransportDetachExpired = c.clientConnTransportDetachExpiredSnapshot(at)
	}

	out.ReattachEligible = c.clientConnReattachEligibleSnapshot(at)
	return out
}
// Unexported capability interfaces used by the exported Get*RuntimeSnapshot
// helpers to find out, via type assertion, whether a Client or Server value
// can produce the requested snapshot.
type (
	// clientRuntimeSnapshotReader is satisfied by types that can produce a
	// ClientRuntimeSnapshot.
	clientRuntimeSnapshotReader interface {
		clientRuntimeSnapshot() ClientRuntimeSnapshot
	}

	// serverRuntimeSnapshotReader is satisfied by types that can produce a
	// ServerRuntimeSnapshot.
	serverRuntimeSnapshotReader interface {
		serverRuntimeSnapshot() ServerRuntimeSnapshot
	}

	// serverDetachedClientRuntimeSnapshotReader is satisfied by types that
	// can list snapshots of their detached clients.
	serverDetachedClientRuntimeSnapshotReader interface {
		detachedClientRuntimeSnapshots() []ClientConnRuntimeSnapshot
	}
)
// Sentinel errors returned by the Get*RuntimeSnapshot helpers.
var (
	// The snapshot target passed by the caller was nil.
	errClientRuntimeSnapshotNil               = errors.New("client runtime snapshot target is nil")
	errServerRuntimeSnapshotNil               = errors.New("server runtime snapshot target is nil")
	errClientConnRuntimeSnapshotNil           = errors.New("client conn runtime snapshot target is nil")
	errLogicalConnRuntimeSnapshotNil          = errors.New("logical conn runtime snapshot target is nil")
	errServerDetachedClientRuntimeSnapshotNil = errors.New("server detached client runtime snapshot target is nil")

	// The snapshot target does not implement the required reader interface.
	errClientRuntimeSnapshotUnsupported        = errors.New("client runtime snapshot target type is unsupported")
	errServerRuntimeSnapshotUnsupported        = errors.New("server runtime snapshot target type is unsupported")
	errServerDetachedClientSnapshotUnsupported = errors.New("server detached client runtime snapshot target type is unsupported")
)
// GetClientRuntimeSnapshot returns the runtime snapshot of c.
//
// It returns errClientRuntimeSnapshotNil when c is nil, and
// errClientRuntimeSnapshotUnsupported when c does not implement the
// package's unexported client snapshot reader. In both error cases the
// returned snapshot is the zero value.
func GetClientRuntimeSnapshot(c Client) (ClientRuntimeSnapshot, error) {
	var empty ClientRuntimeSnapshot
	if c == nil {
		return empty, errClientRuntimeSnapshotNil
	}
	if r, supported := any(c).(clientRuntimeSnapshotReader); supported {
		return r.clientRuntimeSnapshot(), nil
	}
	return empty, errClientRuntimeSnapshotUnsupported
}
// GetServerRuntimeSnapshot returns the runtime snapshot of s.
//
// It returns errServerRuntimeSnapshotNil when s is nil, and
// errServerRuntimeSnapshotUnsupported when s does not implement the
// package's unexported server snapshot reader. In both error cases the
// returned snapshot is the zero value.
func GetServerRuntimeSnapshot(s Server) (ServerRuntimeSnapshot, error) {
	var empty ServerRuntimeSnapshot
	if s == nil {
		return empty, errServerRuntimeSnapshotNil
	}
	if r, supported := any(s).(serverRuntimeSnapshotReader); supported {
		return r.serverRuntimeSnapshot(), nil
	}
	return empty, errServerRuntimeSnapshotUnsupported
}
// GetClientConnRuntimeSnapshot returns the runtime snapshot of c, or a zero
// snapshot and errClientConnRuntimeSnapshotNil when c is nil.
func GetClientConnRuntimeSnapshot(c *ClientConn) (ClientConnRuntimeSnapshot, error) {
	if c == nil {
		var empty ClientConnRuntimeSnapshot
		return empty, errClientConnRuntimeSnapshotNil
	}
	snap := c.clientConnRuntimeSnapshot()
	return snap, nil
}
// GetLogicalConnRuntimeSnapshot returns the runtime snapshot of the logical
// connection c, or a zero snapshot and errLogicalConnRuntimeSnapshotNil when
// c is nil.
func GetLogicalConnRuntimeSnapshot(c *LogicalConn) (ClientConnRuntimeSnapshot, error) {
	if c == nil {
		var empty ClientConnRuntimeSnapshot
		return empty, errLogicalConnRuntimeSnapshotNil
	}
	snap := c.runtimeSnapshot()
	return snap, nil
}
// GetCurrentTransportConnRuntimeSnapshotByLogical returns the runtime
// snapshot of the transport connection currently reported by the logical
// connection c.
//
// The boolean result reports whether a snapshot was produced:
//   - c is nil: zero snapshot, false, errLogicalConnRuntimeSnapshotNil;
//   - c has no current transport connection: zero snapshot, false, nil;
//   - GetTransportConnRuntimeSnapshot fails: zero snapshot, false, that error;
//   - otherwise: the snapshot, true, nil.
func GetCurrentTransportConnRuntimeSnapshotByLogical(c *LogicalConn) (TransportConnRuntimeSnapshot, bool, error) {
	var empty TransportConnRuntimeSnapshot
	if c == nil {
		return empty, false, errLogicalConnRuntimeSnapshotNil
	}

	current := c.CurrentTransportConn()
	if current == nil {
		return empty, false, nil
	}

	snap, err := GetTransportConnRuntimeSnapshot(current)
	if err == nil {
		return snap, true, nil
	}
	return empty, false, err
}
// detachedClientRuntimeSnapshots returns one runtime snapshot for each
// non-nil entry of s.snapshotDetachedLogicals(), in the order they are
// listed. A nil receiver yields nil; otherwise the result is a non-nil slice,
// which may be empty.
func (s *ServerCommon) detachedClientRuntimeSnapshots() []ClientConnRuntimeSnapshot {
	if s == nil {
		return nil
	}
	detached := s.snapshotDetachedLogicals()
	out := make([]ClientConnRuntimeSnapshot, 0, len(detached))
	for _, lc := range detached {
		if lc != nil {
			out = append(out, lc.runtimeSnapshot())
		}
	}
	return out
}
// GetServerDetachedClientRuntimeSnapshots returns runtime snapshots of the
// detached clients of s.
//
// It returns a nil slice together with
// errServerDetachedClientRuntimeSnapshotNil when s is nil, or with
// errServerDetachedClientSnapshotUnsupported when s does not implement the
// package's unexported detached-client snapshot reader.
func GetServerDetachedClientRuntimeSnapshots(s Server) ([]ClientConnRuntimeSnapshot, error) {
	if s == nil {
		return nil, errServerDetachedClientRuntimeSnapshotNil
	}
	if r, supported := any(s).(serverDetachedClientRuntimeSnapshotReader); supported {
		return r.detachedClientRuntimeSnapshots(), nil
	}
	return nil, errServerDetachedClientSnapshotUnsupported
}