package notify import ( "context" "net" "sync" "sync/atomic" ) type logicalConnPeerState struct { clientID string clientAddr net.Addr identityBound bool } type logicalConnState struct { alive atomic.Value statusMu sync.Mutex status Status peer atomic.Pointer[logicalConnPeerState] } func cloneLogicalConnPeerState(src *logicalConnPeerState) *logicalConnPeerState { if src == nil { return &logicalConnPeerState{} } cloned := *src return &cloned } func (s *logicalConnState) peerSnapshot() logicalConnPeerState { if s == nil { return logicalConnPeerState{} } if peer := s.peer.Load(); peer != nil { return *cloneLogicalConnPeerState(peer) } return logicalConnPeerState{} } func (s *logicalConnState) updatePeer(apply func(*logicalConnPeerState)) { if s == nil || apply == nil { return } for { current := s.peer.Load() next := cloneLogicalConnPeerState(current) apply(next) if current == nil { if s.peer.CompareAndSwap((*logicalConnPeerState)(nil), next) { return } continue } if s.peer.CompareAndSwap(current, next) { return } } } func (s *logicalConnState) aliveSnapshot() bool { if s == nil { return false } return sessionIsAlive(&s.alive) } func (s *logicalConnState) statusSnapshot() Status { if s == nil { return Status{} } return sessionStatusValue(&s.statusMu, &s.status) } func (s *logicalConnState) markStarted() { if s == nil { return } sessionMarkStarted(&s.alive, &s.statusMu, &s.status) } func (s *logicalConnState) markStopped(reason string, err error, stopFn context.CancelFunc, cleanupFns ...func()) { if s == nil { return } sessionMarkStopped(&s.alive, &s.statusMu, &s.status, reason, err, stopFn, cleanupFns...) } func newLogicalConnStateFromClient(c *ClientConn) *logicalConnState { if c == nil { return nil } state := &logicalConnState{ status: sessionStatusValue(nil, &c.status), } state.alive.Store(sessionIsAlive(&c.alive)) state.peer.Store(&logicalConnPeerState{ clientID: c.ClientID, clientAddr: c.ClientAddr, identityBound: c.identityBound.Load(), }) return state } func (c *LogicalConn) ensureState() *logicalConnState { if c == nil { return nil } if state := c.state.Load(); state != nil { if client := c.compatClientConn(); client != nil { client.logicalState.Store(state) } return state } client := c.compatClientConn() if client != nil { if state := client.logicalState.Load(); state != nil { if c.state.CompareAndSwap(nil, state) { client.logicalState.Store(state) return state } return c.ensureState() } } state := newLogicalConnStateFromClient(client) if state == nil { state = &logicalConnState{} } if c.state.CompareAndSwap(nil, state) { if client != nil { client.logicalState.Store(state) } return state } return c.ensureState() } func (c *ClientConn) ensureLogicalConnState() *logicalConnState { if c == nil { return nil } if logical := c.logicalView.Load(); logical != nil { return logical.ensureState() } if state := c.logicalState.Load(); state != nil { return state } state := newLogicalConnStateFromClient(c) if c.logicalState.CompareAndSwap(nil, state) { if logical := c.logicalView.Load(); logical != nil { logical.state.CompareAndSwap(nil, state) } return state } return c.logicalState.Load() } func (c *ClientConn) syncLegacyLogicalFieldsFromState(state *logicalConnState) { if c == nil || state == nil { return } peer := state.peerSnapshot() c.ClientID = peer.clientID c.ClientAddr = peer.clientAddr c.identityBound.Store(peer.identityBound) c.alive.Store(state.aliveSnapshot()) c.status = state.statusSnapshot() if logical := c.logicalView.Load(); logical != nil { logical.syncCompatibilityFieldsFromState(state) } } func (c *ClientConn) clientConnLogicalPeerStateSnapshot() logicalConnPeerState { state := c.ensureLogicalConnState() if state == nil { return logicalConnPeerState{} } return state.peerSnapshot() } func (c *ClientConn) clientConnIDSnapshot() string { return c.clientConnLogicalPeerStateSnapshot().clientID } func (c *ClientConn) setClientConnID(id string) { if c == nil { return } state := c.ensureLogicalConnState() if state == nil { c.ClientID = id return } state.updatePeer(func(peer *logicalConnPeerState) { peer.clientID = id }) c.syncLegacyLogicalFieldsFromState(state) } func (c *ClientConn) clientConnAliveSnapshot() bool { state := c.ensureLogicalConnState() if state == nil { return false } return state.aliveSnapshot() } func (c *ClientConn) clientConnStatusSnapshot() Status { state := c.ensureLogicalConnState() if state == nil { return Status{} } return state.statusSnapshot() } func (c *ClientConn) markClientConnLogicalSessionStarted() { if c == nil { return } state := c.ensureLogicalConnState() if state == nil { sessionMarkStarted(&c.alive, nil, &c.status) return } state.markStarted() c.syncLegacyLogicalFieldsFromState(state) } func (c *ClientConn) markClientConnLogicalSessionStopped(reason string, err error) { if c == nil { return } state := c.ensureLogicalConnState() if state == nil { sessionMarkStopped(&c.alive, nil, &c.status, reason, err, c.clientConnStopFuncSnapshot()) return } state.markStopped(reason, err, c.clientConnStopFuncSnapshot()) c.syncLegacyLogicalFieldsFromState(state) }