notify/msg.go
starainrt 98ef9e7fcc
feat(transport): 完成安全架构拆分并收口 stream/bulk 传输优化
- 新增 managed/external/nested 三种传输保护模式
  - 新增 peer attach 显式认证、抗重放、channel binding 和可选前向保密协商
  - 明确单连接注入与可重拨连接源的语义边界
  - 禁止 ConnectByConn 场景下 dedicated bulk 走 sidecar,auto 模式自动回退 shared
  - 修正 dedicated attach 在 bootstrap/steady profile 切换下的处理逻辑
  - 优化 shared bulk super-batch 与批量 framed write 路径
  - 降低 stream/bulk fast path 的复制和分发损耗
  - 补齐 benchmark、回归测试、运行时快照和 README 文档
2026-04-20 16:35:44 +08:00

185 lines
4.4 KiB
Go

package notify
import (
"net"
"time"
)
const (
MSG_SYS MessageType = iota
MSG_SYS_WAIT
MSG_SYS_REPLY
// Deprecated: legacy RSA key-exchange control message.
MSG_KEY_CHANGE
MSG_ASYNC
MSG_SYNC_ASK
MSG_SYNC_REPLY
)
type MessageType uint8
type NetType uint8
const (
NET_SERVER NetType = iota
NET_CLIENT
)
type MsgVal []byte
type TransferMsg struct {
ID uint64
Key string
Value MsgVal
Type MessageType
}
type Message struct {
NetType
LogicalConn *LogicalConn
// Deprecated: ClientConn aliases LogicalConn for compatibility.
ClientConn *ClientConn
TransportConn *TransportConn
ServerConn Client
inboundTransportProfile *transportProtectionProfile
TransferMsg
Time time.Time
inboundConn net.Conn
}
type WaitMsg struct {
TransferMsg
Time time.Time
Reply chan Message
scope string
//Ctx context.Context
}
type messageLogicalTransferSender interface {
sendLogical(*LogicalConn, TransferMsg) (WaitMsg, error)
}
type messageInboundTransferSender interface {
sendTransferInbound(*LogicalConn, *TransportConn, net.Conn, *transportProtectionProfile, TransferMsg) error
}
func (m *Message) Reply(value MsgVal) (err error) {
logical := messageLogicalConnSnapshot(m)
transport := messageTransportConnSnapshot(m)
reply := TransferMsg{
ID: m.ID,
Key: m.Key,
Value: value,
Type: m.Type,
}
if reply.Type == MSG_SYNC_ASK {
reply.Type = MSG_SYNC_REPLY
}
if reply.Type == MSG_SYS_WAIT {
reply.Type = MSG_SYS_REPLY
}
if m.NetType == NET_SERVER {
if m.inboundConn != nil && logical != nil {
server := logical.Server()
if server == nil {
return transportDetachedErrorForPeer(logical, transport)
}
sender, _ := server.(messageInboundTransferSender)
if sender == nil {
return transportDetachedErrorForPeer(logical, transport)
}
return sender.sendTransferInbound(logical, transport, m.inboundConn, messageInboundTransportProtectionSnapshot(m), reply)
}
if transport != nil {
_, err = transport.sendTransfer(reply)
return
}
if logical == nil {
return transportDetachedErrorForPeer(nil, transport)
}
server := logical.Server()
if server == nil {
return transportDetachedErrorForPeer(logical, transport)
}
sender, _ := server.(messageLogicalTransferSender)
if sender == nil {
return transportDetachedErrorForPeer(logical, transport)
}
_, err = sender.sendLogical(logical, reply)
}
if m.NetType == NET_CLIENT {
_, err = m.ServerConn.send(reply)
}
return
}
func (m *Message) ReplyObj(value interface{}) (err error) {
data, err := encode(value)
if err != nil {
return err
}
return m.Reply(data)
}
func hydrateServerMessagePeerFields(message Message) Message {
if message.LogicalConn == nil {
message.LogicalConn = logicalConnFromClient(message.ClientConn)
}
if message.LogicalConn == nil && message.TransportConn != nil {
message.LogicalConn = message.TransportConn.logicalConnSnapshot()
}
if message.ClientConn == nil && message.LogicalConn != nil {
message.ClientConn = message.LogicalConn.compatClientConn()
}
if message.TransportConn == nil && message.LogicalConn != nil {
message.TransportConn = message.LogicalConn.CurrentTransportConn()
}
if message.inboundConn != nil && message.inboundTransportProfile == nil && message.LogicalConn != nil {
profile := message.LogicalConn.transportProtectionProfileSnapshot()
message.inboundTransportProfile = &profile
}
return message
}
func messageLogicalConnSnapshot(message *Message) *LogicalConn {
if message == nil {
return nil
}
if message.LogicalConn != nil {
return message.LogicalConn
}
return logicalConnFromClient(message.ClientConn)
}
func messageTransportConnSnapshot(message *Message) *TransportConn {
if message == nil {
return nil
}
if message.TransportConn != nil {
return message.TransportConn
}
logical := messageLogicalConnSnapshot(message)
if logical == nil {
return nil
}
return logical.CurrentTransportConn()
}
func messageInboundTransportProtectionSnapshot(message *Message) *transportProtectionProfile {
if message == nil {
return nil
}
if message.inboundTransportProfile != nil {
return message.inboundTransportProfile
}
if message.inboundConn == nil {
return nil
}
logical := messageLogicalConnSnapshot(message)
if logical == nil {
return nil
}
profile := logical.transportProtectionProfileSnapshot()
message.inboundTransportProfile = &profile
return message.inboundTransportProfile
}