notify/msg.go

158 lines
3.4 KiB
Go
Raw Permalink Normal View History

2021-11-12 16:04:39 +08:00
package notify
import (
"net"
"time"
)
// MessageType identifies the kind of a TransferMsg.
type MessageType uint8

// Message type values. They are assigned sequentially from zero by iota, so
// the declaration order below must not change.
const (
	// MSG_SYS is the first message type and has the value 0.
	MSG_SYS MessageType = iota
	// MSG_SYS_WAIT is answered with MSG_SYS_REPLY by Message.Reply.
	MSG_SYS_WAIT
	// MSG_SYS_REPLY is the reply type used for a MSG_SYS_WAIT message.
	MSG_SYS_REPLY
	// MSG_KEY_CHANGE is the legacy RSA key-exchange control message.
	//
	// Deprecated: legacy RSA key-exchange control message.
	MSG_KEY_CHANGE
	// MSG_ASYNC is echoed unchanged by Message.Reply.
	MSG_ASYNC
	// MSG_SYNC_ASK is answered with MSG_SYNC_REPLY by Message.Reply.
	MSG_SYNC_ASK
	// MSG_SYNC_REPLY is the reply type used for a MSG_SYNC_ASK message.
	MSG_SYNC_REPLY
)
// NetType records which side of a connection a Message belongs to.
// Message.Reply switches on it to choose how the reply is sent.
type NetType uint8

const (
	// NET_SERVER marks a message handled on the server side. It is the zero
	// value of NetType.
	NET_SERVER NetType = iota
	// NET_CLIENT marks a message handled on the client side; replies go
	// through Message.ServerConn.
	NET_CLIENT
)
// MsgVal is the raw byte payload carried in TransferMsg.Value.
type MsgVal []byte
// TransferMsg is the unit handed to the send functions. Message.Reply builds
// one by copying ID and Key from the message being answered.
type TransferMsg struct {
	// ID is copied unchanged into a reply.
	ID uint64
	// Key is copied unchanged into a reply.
	Key string
	// Value is the message payload.
	Value MsgVal
	// Type is the kind of message; see the MessageType constants.
	Type MessageType
}
type Message struct {
NetType
LogicalConn *LogicalConn
// Deprecated: ClientConn aliases LogicalConn for compatibility.
ClientConn *ClientConn
TransportConn *TransportConn
ServerConn Client
2021-11-12 16:04:39 +08:00
TransferMsg
Time time.Time
inboundConn net.Conn
2021-11-12 16:04:39 +08:00
}
type WaitMsg struct {
TransferMsg
Time time.Time
Reply chan Message
scope string
2021-11-12 16:04:39 +08:00
//Ctx context.Context
}
// messageLogicalTransferSender is the capability Message.Reply looks for on
// the value returned by LogicalConn.Server() when no transport connection is
// available: sending a TransferMsg addressed to a logical connection.
type messageLogicalTransferSender interface {
	sendLogical(*LogicalConn, TransferMsg) (WaitMsg, error)
}
// messageInboundTransferSender is the capability Message.Reply looks for on
// the value returned by LogicalConn.Server() when the message carries an
// inbound net.Conn: sending a TransferMsg over that specific connection.
type messageInboundTransferSender interface {
	sendTransferInbound(*LogicalConn, *TransportConn, net.Conn, TransferMsg) error
}
2021-11-12 16:04:39 +08:00
func (m *Message) Reply(value MsgVal) (err error) {
logical := messageLogicalConnSnapshot(m)
transport := messageTransportConnSnapshot(m)
2021-11-12 16:04:39 +08:00
reply := TransferMsg{
ID: m.ID,
Key: m.Key,
Value: value,
Type: m.Type,
}
if reply.Type == MSG_SYNC_ASK {
reply.Type = MSG_SYNC_REPLY
}
if reply.Type == MSG_SYS_WAIT {
reply.Type = MSG_SYS_REPLY
}
if m.NetType == NET_SERVER {
if m.inboundConn != nil && logical != nil {
server := logical.Server()
if server == nil {
return transportDetachedErrorForPeer(logical, transport)
}
sender, _ := server.(messageInboundTransferSender)
if sender == nil {
return transportDetachedErrorForPeer(logical, transport)
}
return sender.sendTransferInbound(logical, transport, m.inboundConn, reply)
}
if transport != nil {
_, err = transport.sendTransfer(reply)
return
}
if logical == nil {
return transportDetachedErrorForPeer(nil, transport)
}
server := logical.Server()
if server == nil {
return transportDetachedErrorForPeer(logical, transport)
}
sender, _ := server.(messageLogicalTransferSender)
if sender == nil {
return transportDetachedErrorForPeer(logical, transport)
}
_, err = sender.sendLogical(logical, reply)
2021-11-12 16:04:39 +08:00
}
if m.NetType == NET_CLIENT {
_, err = m.ServerConn.send(reply)
}
return
}
// ReplyObj encodes value with encode and sends the result as the reply
// payload via Reply. An encoding error is returned without sending anything.
func (m *Message) ReplyObj(value interface{}) (err error) {
	encoded, encErr := encode(value)
	if encErr != nil {
		return encErr
	}
	return m.Reply(encoded)
}
func hydrateServerMessagePeerFields(message Message) Message {
if message.LogicalConn == nil {
message.LogicalConn = logicalConnFromClient(message.ClientConn)
2021-11-12 16:04:39 +08:00
}
if message.ClientConn == nil {
message.ClientConn = message.LogicalConn.compatClientConn()
2021-11-12 16:04:39 +08:00
}
if message.TransportConn == nil && message.LogicalConn != nil {
message.TransportConn = message.LogicalConn.CurrentTransportConn()
2021-11-12 16:04:39 +08:00
}
return message
2021-11-12 16:04:39 +08:00
}
func messageLogicalConnSnapshot(message *Message) *LogicalConn {
if message == nil {
return nil
2021-11-12 16:04:39 +08:00
}
if message.LogicalConn != nil {
return message.LogicalConn
2021-11-12 16:04:39 +08:00
}
return logicalConnFromClient(message.ClientConn)
2021-11-12 16:04:39 +08:00
}
func messageTransportConnSnapshot(message *Message) *TransportConn {
if message == nil {
return nil
2022-05-19 11:04:52 +08:00
}
if message.TransportConn != nil {
return message.TransportConn
2022-05-19 11:04:52 +08:00
}
logical := messageLogicalConnSnapshot(message)
if logical == nil {
return nil
2022-05-19 11:04:52 +08:00
}
return logical.CurrentTransportConn()
2022-05-19 11:04:52 +08:00
}