package notify import ( "net" "time" ) type clientConnAttachmentState struct { maxReadTimeout time.Duration maxWriteTimeout time.Duration msgEn func([]byte, []byte) []byte msgDe func([]byte, []byte) []byte fastStreamEncode transportFastStreamEncoder fastBulkEncode transportFastBulkEncoder fastPlainEncode transportFastPlainEncoder handshakeRsaKey []byte secretKey []byte lastHeartBeat int64 } func cloneClientConnAttachmentState(src *clientConnAttachmentState) *clientConnAttachmentState { if src == nil { return &clientConnAttachmentState{} } cloned := *src cloned.handshakeRsaKey = cloneClientConnAttachmentBytes(src.handshakeRsaKey) cloned.secretKey = cloneClientConnAttachmentBytes(src.secretKey) return &cloned } func cloneClientConnAttachmentBytes(src []byte) []byte { if len(src) == 0 { return nil } return append([]byte(nil), src...) } func (c *LogicalConn) attachmentStateSnapshot() *clientConnAttachmentState { if c == nil { return &clientConnAttachmentState{} } if state := c.attachment.Load(); state != nil { if client := c.compatClientConn(); client != nil { client.attachment.Store(state) } return cloneClientConnAttachmentState(state) } client := c.compatClientConn() if client != nil { if state := client.attachment.Load(); state != nil { if c.attachment.CompareAndSwap(nil, state) { client.attachment.Store(state) return cloneClientConnAttachmentState(state) } return c.attachmentStateSnapshot() } } return &clientConnAttachmentState{} } func (c *LogicalConn) setAttachmentState(state *clientConnAttachmentState) { if c == nil { return } next := cloneClientConnAttachmentState(state) c.attachment.Store(next) if client := c.compatClientConn(); client != nil { client.attachment.Store(next) } } func (c *LogicalConn) updateAttachmentState(apply func(*clientConnAttachmentState)) { if c == nil || apply == nil { return } for { current := c.attachment.Load() if current == nil { if client := c.compatClientConn(); client != nil { current = client.attachment.Load() } } next := cloneClientConnAttachmentState(current) apply(next) if current == nil { if c.attachment.CompareAndSwap((*clientConnAttachmentState)(nil), next) { if client := c.compatClientConn(); client != nil { client.attachment.Store(next) } return } continue } if c.attachment.CompareAndSwap(current, next) { if client := c.compatClientConn(); client != nil { client.attachment.Store(next) } return } } } func (c *ClientConn) clientConnAttachmentStateSnapshot() *clientConnAttachmentState { if c == nil { return &clientConnAttachmentState{} } if logical := c.logicalView.Load(); logical != nil { return logical.attachmentStateSnapshot() } if state := c.attachment.Load(); state != nil { return cloneClientConnAttachmentState(state) } return &clientConnAttachmentState{} } func (c *ClientConn) setClientConnAttachmentState(state *clientConnAttachmentState) { if c == nil { return } if logical := c.logicalView.Load(); logical != nil { logical.setAttachmentState(state) return } c.attachment.Store(cloneClientConnAttachmentState(state)) } func (c *ClientConn) updateClientConnAttachmentState(apply func(*clientConnAttachmentState)) { if c == nil || apply == nil { return } if logical := c.logicalView.Load(); logical != nil { logical.updateAttachmentState(apply) return } for { current := c.attachment.Load() next := cloneClientConnAttachmentState(current) apply(next) if current == nil { if c.attachment.CompareAndSwap((*clientConnAttachmentState)(nil), next) { return } continue } if c.attachment.CompareAndSwap(current, next) { return } } } func (c *ClientConn) applyClientConnAttachmentProfile(maxReadTimeout time.Duration, maxWriteTimeout time.Duration, msgEn func([]byte, []byte) []byte, msgDe func([]byte, []byte) []byte, handshakeRsaKey []byte, secretKey []byte) { c.updateClientConnAttachmentState(func(state *clientConnAttachmentState) { state.maxReadTimeout = maxReadTimeout state.maxWriteTimeout = maxWriteTimeout state.msgEn = msgEn state.msgDe = msgDe state.handshakeRsaKey = cloneClientConnAttachmentBytes(handshakeRsaKey) state.secretKey = cloneClientConnAttachmentBytes(secretKey) }) } func (c *ClientConn) inheritClientConnAttachmentProfile(src *ClientConn) { if c == nil || src == nil { return } c.setClientConnAttachmentState(src.clientConnAttachmentStateSnapshot()) } func (c *ClientConn) clientConnMaxReadTimeoutSnapshot() time.Duration { if c == nil { return 0 } return c.clientConnAttachmentStateSnapshot().maxReadTimeout } func (c *ClientConn) setClientConnMaxWriteTimeout(timeout time.Duration) { if c == nil { return } if logical := c.logicalView.Load(); logical != nil { logical.updateAttachmentState(func(state *clientConnAttachmentState) { state.maxWriteTimeout = timeout }) return } c.updateClientConnAttachmentState(func(state *clientConnAttachmentState) { state.maxWriteTimeout = timeout }) } func (c *ClientConn) clientConnMaxWriteTimeoutSnapshot() time.Duration { if c == nil { return 0 } return c.clientConnAttachmentStateSnapshot().maxWriteTimeout } func (c *ClientConn) clientConnMsgEnSnapshot() func([]byte, []byte) []byte { if c == nil { return nil } return c.clientConnAttachmentStateSnapshot().msgEn } func (c *ClientConn) setClientConnMsgEn(fn func([]byte, []byte) []byte) { c.updateClientConnAttachmentState(func(state *clientConnAttachmentState) { state.msgEn = fn state.fastStreamEncode = nil state.fastBulkEncode = nil state.fastPlainEncode = nil }) } func (c *ClientConn) clientConnMsgDeSnapshot() func([]byte, []byte) []byte { if c == nil { return nil } return c.clientConnAttachmentStateSnapshot().msgDe } func (c *ClientConn) setClientConnMsgDe(fn func([]byte, []byte) []byte) { c.updateClientConnAttachmentState(func(state *clientConnAttachmentState) { state.msgDe = fn state.fastStreamEncode = nil state.fastBulkEncode = nil state.fastPlainEncode = nil }) } func (c *ClientConn) setClientConnFastStreamEncode(fn transportFastStreamEncoder) { c.updateClientConnAttachmentState(func(state *clientConnAttachmentState) { state.fastStreamEncode = fn }) } func (c *ClientConn) clientConnFastStreamEncodeSnapshot() transportFastStreamEncoder { if c == nil { return nil } return c.clientConnAttachmentStateSnapshot().fastStreamEncode } func (c *ClientConn) setClientConnFastBulkEncode(fn transportFastBulkEncoder) { c.updateClientConnAttachmentState(func(state *clientConnAttachmentState) { state.fastBulkEncode = fn }) } func (c *ClientConn) clientConnFastBulkEncodeSnapshot() transportFastBulkEncoder { if c == nil { return nil } return c.clientConnAttachmentStateSnapshot().fastBulkEncode } func (c *ClientConn) setClientConnFastPlainEncode(fn transportFastPlainEncoder) { c.updateClientConnAttachmentState(func(state *clientConnAttachmentState) { state.fastPlainEncode = fn }) } func (c *ClientConn) clientConnFastPlainEncodeSnapshot() transportFastPlainEncoder { if c == nil { return nil } return c.clientConnAttachmentStateSnapshot().fastPlainEncode } func (c *ClientConn) clientConnHandshakeRsaKeySnapshot() []byte { if c == nil { return nil } return c.clientConnAttachmentStateSnapshot().handshakeRsaKey } func (c *ClientConn) clientConnSecretKeySnapshot() []byte { if c == nil { return nil } return c.clientConnAttachmentStateSnapshot().secretKey } func (c *ClientConn) setClientConnSecretKey(key []byte) { c.updateClientConnAttachmentState(func(state *clientConnAttachmentState) { state.secretKey = cloneClientConnAttachmentBytes(key) }) } func (c *ClientConn) clientConnLastHeartbeatUnixSnapshot() int64 { if c == nil { return 0 } return c.clientConnAttachmentStateSnapshot().lastHeartBeat } func (c *ClientConn) setClientConnLastHeartbeatUnix(unix int64) { if c == nil { return } if logical := c.logicalView.Load(); logical != nil { logical.setClientConnLastHeartbeatUnix(unix) return } c.updateClientConnAttachmentState(func(state *clientConnAttachmentState) { state.lastHeartBeat = unix }) } func (c *ClientConn) markClientConnHeartbeatNow() { if c == nil { return } if logical := c.logicalView.Load(); logical != nil { logical.markHeartbeatNow() return } c.setClientConnLastHeartbeatUnix(time.Now().Unix()) } func (c *ClientConn) setClientConnRemoteAddr(addr net.Addr) { if c == nil { return } state := c.ensureLogicalConnState() if state == nil { c.ClientAddr = addr return } state.updatePeer(func(peer *logicalConnPeerState) { peer.clientAddr = addr }) c.syncLegacyLogicalFieldsFromState(state) }