notify/transport_conn.go

372 lines
9.7 KiB
Go
Raw Permalink Normal View History

package notify
import (
"context"
"errors"
"net"
"time"
)
// TransportConn is a point-in-time view of the transport bound to a
// LogicalConn. The fields are filled in when the view is created; nothing in
// this file updates them afterwards, so use IsCurrent to compare a view with
// the logical connection's present transport.
type TransportConn struct {
	// logical is the connection this view was captured from.
	logical *LogicalConn
	// generation is the transport generation reported at capture time.
	generation uint64
	// remoteAddr is the remote address reported at capture time.
	remoteAddr net.Addr
	// attached records whether the transport was treated as attached at
	// capture time.
	attached bool
	// hasRuntimeConn records whether the logical connection reported a
	// non-nil transport at capture time.
	hasRuntimeConn bool
}
// Lengths, in bytes, of the read buffers handed out by streamReadBuffer and
// packetReadBuffer.
const (
	// transportStreamReadBufferSize is 256 KiB.
	transportStreamReadBufferSize = 256 << 10
	// transportPacketReadBufferSize is 64 KiB.
	transportPacketReadBufferSize = 64 << 10
)
// streamReadBuffer allocates a new zeroed byte slice whose length is
// transportStreamReadBufferSize.
func streamReadBuffer() []byte {
	buf := make([]byte, transportStreamReadBufferSize)
	return buf
}
// packetReadBuffer allocates a new zeroed byte slice whose length is
// transportPacketReadBufferSize.
func packetReadBuffer() []byte {
	buf := make([]byte, transportPacketReadBufferSize)
	return buf
}
// TransportConnRuntimeSnapshot is a flat, exported description of a
// TransportConn as produced by (*TransportConn).runtimeSnapshot.
//
// The binding and detach fields are copied from the binding diagnostics of the
// owning logical connection and stay at their zero values when the transport
// has no logical connection.
type TransportConnRuntimeSnapshot struct {
	// ClientID is the ID of the owning logical connection, or "" without one.
	ClientID string
	// RemoteAddress is the string form of the remote address, or "" when the
	// address is nil.
	RemoteAddress string

	// Binding diagnostics of the owning logical connection.
	BindingOwner   string
	LogicalAlive   bool
	BindingCurrent bool
	LogicalReason  string
	LogicalError   string

	// TransportGeneration is the generation recorded on the TransportConn.
	TransportGeneration uint64
	// Attached mirrors (*TransportConn).Attached.
	Attached bool
	// HasRuntimeConn mirrors (*TransportConn).HasRuntimeConn.
	HasRuntimeConn bool
	// UsesStreamTransport mirrors (*TransportConn).UsesStreamTransport.
	UsesStreamTransport bool
	// Current mirrors (*TransportConn).IsCurrent.
	Current bool

	// Detach diagnostics of the owning logical connection.
	TransportDetachReason     string
	TransportDetachKind       string
	TransportDetachGeneration uint64
	TransportDetachError      string
	TransportDetachedAt       time.Time
	ReattachEligible          bool
}
// transportConnServerSender is the minimal server-side capability TransportConn
// needs in order to send a TransferMsg over a specific transport.
type transportConnServerSender interface {
	// sendTransport sends msg over transport.
	sendTransport(transport *TransportConn, msg TransferMsg) (WaitMsg, error)
	// sendTransportWait sends msg over transport and returns a Message;
	// timeout is passed through from the caller.
	sendTransportWait(transport *TransportConn, msg TransferMsg, timeout time.Duration) (Message, error)
}
// transportConnServerAPI extends transportConnServerSender with the
// transport-scoped send operations that the exported TransportConn Send*
// methods delegate to.
type transportConnServerAPI interface {
	transportConnServerSender

	SendCtxTransport(ctx context.Context, transport *TransportConn, key string, value MsgVal) (Message, error)
	SendObjTransport(transport *TransportConn, key string, value interface{}) error
	SendObjCtxTransport(ctx context.Context, transport *TransportConn, key string, value interface{}) (Message, error)
	SendWaitObjTransport(transport *TransportConn, key string, value interface{}, timeout time.Duration) (Message, error)
	SendFileTransport(ctx context.Context, transport *TransportConn, filePath string) error
}
// serverUDPTransportRuntimeReader is implemented by servers that can report
// their UDP listener. A non-nil result makes
// (*LogicalConn).currentTransportConnSnapshot take its UDP branch.
type serverUDPTransportRuntimeReader interface {
	// serverUDPListenerSnapshot returns the server's UDP listener, or nil.
	serverUDPListenerSnapshot() *net.UDPConn
}
// errTransportConnRuntimeSnapshotNil is returned by
// GetTransportConnRuntimeSnapshot when it is given a nil *TransportConn.
var errTransportConnRuntimeSnapshotNil = errors.New(
	"transport conn runtime snapshot target is nil",
)
// clientConnRemoteAddrSnapshot returns the clientAddr recorded in the
// connection's logical peer state, or nil for a nil receiver.
func (c *ClientConn) clientConnRemoteAddrSnapshot() net.Addr {
	if c == nil {
		return nil
	}
	peer := c.clientConnLogicalPeerStateSnapshot()
	return peer.clientAddr
}
// CurrentTransportConn returns a view of the transport currently bound to the
// client's logical connection, or nil when there is none. It is safe to call
// on a nil receiver.
func (c *ClientConn) CurrentTransportConn() *TransportConn {
	current := c.currentTransportConnSnapshot()
	return current
}
// currentTransportConnSnapshot resolves the client's logical connection and
// asks it for its current transport view. It returns nil when the receiver or
// its logical connection is nil.
func (c *ClientConn) currentTransportConnSnapshot() *TransportConn {
	if c == nil {
		return nil
	}
	if logical := c.LogicalConn(); logical != nil {
		return logical.currentTransportConnSnapshot()
	}
	return nil
}
// currentTransportConnSnapshot builds a TransportConn view of the logical
// connection's present transport.
//
// When the owning server reports a non-nil UDP listener, a view is returned
// whenever the remote address is known, without consulting the attached state.
// Otherwise a view is returned only if the logical connection reports an
// attached transport. In every other case, including a nil receiver, the
// result is nil. A returned view always has attached set to true.
func (c *LogicalConn) currentTransportConnSnapshot() *TransportConn {
	if c == nil {
		return nil
	}
	addr := c.RemoteAddr()
	runtimeConnPresent := c.transportSnapshot() != nil

	// Detect whether the owning server exposes a live UDP listener.
	udpServer := false
	if srv := c.Server(); srv != nil {
		if reader, ok := srv.(serverUDPTransportRuntimeReader); ok {
			udpServer = reader.serverUDPListenerSnapshot() != nil
		}
	}

	if udpServer {
		if addr == nil {
			return nil
		}
	} else if !c.transportAttachedSnapshot() {
		return nil
	}

	return &TransportConn{
		logical:        c,
		generation:     c.transportGenerationSnapshot(),
		remoteAddr:     addr,
		attached:       true,
		hasRuntimeConn: runtimeConnPresent,
	}
}
// logicalConnSnapshot returns the logical connection stored on the view, or
// nil for a nil receiver.
func (t *TransportConn) logicalConnSnapshot() *LogicalConn {
	if t != nil {
		return t.logical
	}
	return nil
}
// LogicalConn returns the logical connection this transport view belongs to.
// It returns nil for a nil receiver.
func (t *TransportConn) LogicalConn() *LogicalConn {
	logical := t.logicalConnSnapshot()
	return logical
}
// ClientID returns the ID of the owning logical connection, or "" when the
// view has no logical connection.
func (t *TransportConn) ClientID() string {
	if logical := t.logicalConnSnapshot(); logical != nil {
		return logical.ID()
	}
	return ""
}
// RemoteAddr returns the remote address recorded on the view, or nil for a nil
// receiver.
func (t *TransportConn) RemoteAddr() net.Addr {
	var addr net.Addr
	if t != nil {
		addr = t.remoteAddr
	}
	return addr
}
// TransportGeneration returns the transport generation recorded on the view,
// or 0 for a nil receiver.
func (t *TransportConn) TransportGeneration() uint64 {
	var generation uint64
	if t != nil {
		generation = t.generation
	}
	return generation
}
// Attached reports the attached flag recorded on the view. A nil receiver
// reports false.
func (t *TransportConn) Attached() bool {
	if t == nil {
		return false
	}
	return t.attached
}
// HasRuntimeConn reports the hasRuntimeConn flag recorded on the view. A nil
// receiver reports false.
func (t *TransportConn) HasRuntimeConn() bool {
	if t == nil {
		return false
	}
	return t.hasRuntimeConn
}
// UsesStreamTransport reports whether the owning logical connection says it
// uses a stream transport. It reports false when the view has no logical
// connection.
func (t *TransportConn) UsesStreamTransport() bool {
	if logical := t.logicalConnSnapshot(); logical != nil {
		return logical.usesStreamTransportSnapshot()
	}
	return false
}
// IsCurrent reports whether this view still matches the logical connection's
// current transport: both the generation and the string form of the remote
// address must be equal. It reports false for a nil receiver, a view without a
// logical connection, or a logical connection with no current transport.
func (t *TransportConn) IsCurrent() bool {
	if t == nil {
		return false
	}
	logical := t.logicalConnSnapshot()
	if logical == nil {
		return false
	}
	live := logical.CurrentTransportConn()
	if live == nil || live.generation != t.generation {
		return false
	}
	liveAddr := transportConnAddrString(live.remoteAddr)
	ownAddr := transportConnAddrString(t.remoteAddr)
	return liveAddr == ownAddr
}
// transportConnAddrString returns addr.String(), or "" when addr is a nil
// interface value.
func transportConnAddrString(addr net.Addr) string {
	if addr != nil {
		return addr.String()
	}
	return ""
}
// transportScope returns the scope string for this view's logical connection
// and transport generation. Without a logical connection it returns
// serverFileDomain + ":unknown".
func (t *TransportConn) transportScope() string {
	if logical := t.logicalConnSnapshot(); logical != nil {
		return serverTransportScopeByGeneration(logical, t.TransportGeneration())
	}
	return serverFileDomain + ":unknown"
}
// deliveryScopes returns the scopes associated with this view: the
// transport scope first, then the logical connection's file scope. When both
// are equal only one entry is returned, and without a logical connection the
// single entry is serverFileDomain + ":unknown".
func (t *TransportConn) deliveryScopes() []string {
	logical := t.logicalConnSnapshot()
	if logical == nil {
		return []string{serverFileDomain + ":unknown"}
	}
	base := serverFileScope(logical)
	if transport := t.transportScope(); transport != base {
		return []string{transport, base}
	}
	return []string{base}
}
// runtimeSnapshot flattens the view into a TransportConnRuntimeSnapshot.
//
// The fields taken from the view's accessor methods are always filled in. The
// binding and detach fields are copied from the logical connection's binding
// diagnostics and keep their zero values when there is no logical connection.
func (t *TransportConn) runtimeSnapshot() TransportConnRuntimeSnapshot {
	var snapshot TransportConnRuntimeSnapshot
	snapshot.ClientID = t.ClientID()
	snapshot.TransportGeneration = t.TransportGeneration()
	snapshot.Attached = t.Attached()
	snapshot.HasRuntimeConn = t.HasRuntimeConn()
	snapshot.UsesStreamTransport = t.UsesStreamTransport()
	snapshot.Current = t.IsCurrent()

	if addr := t.RemoteAddr(); addr != nil {
		snapshot.RemoteAddress = addr.String()
	}

	logical := t.logicalConnSnapshot()
	if logical == nil {
		return snapshot
	}

	diag := snapshotBindingDiagnosticsFromLogical(logical, t, t.TransportGeneration())

	// Binding state of the logical connection.
	snapshot.BindingOwner = diag.BindingOwner
	snapshot.LogicalAlive = diag.BindingAlive
	snapshot.BindingCurrent = diag.BindingCurrent
	snapshot.LogicalReason = diag.BindingReason
	snapshot.LogicalError = diag.BindingError

	// Most recent transport detach, as reported by the diagnostics.
	snapshot.TransportDetachReason = diag.TransportDetachReason
	snapshot.TransportDetachKind = diag.TransportDetachKind
	snapshot.TransportDetachGeneration = diag.TransportDetachGeneration
	snapshot.TransportDetachError = diag.TransportDetachError
	snapshot.TransportDetachedAt = diag.TransportDetachedAt
	snapshot.ReattachEligible = diag.ReattachEligible

	return snapshot
}
// GetTransportConnRuntimeSnapshot returns the runtime snapshot of t. A nil t
// yields a zero snapshot and errTransportConnRuntimeSnapshotNil.
func GetTransportConnRuntimeSnapshot(t *TransportConn) (TransportConnRuntimeSnapshot, error) {
	if t != nil {
		return t.runtimeSnapshot(), nil
	}
	return TransportConnRuntimeSnapshot{}, errTransportConnRuntimeSnapshotNil
}
// GetCurrentTransportConnRuntimeSnapshot returns the runtime snapshot of the
// client's current transport.
//
// The boolean result is true only when a current transport exists and its
// snapshot was taken. A nil c yields errClientConnRuntimeSnapshotNil; a client
// without a current transport yields (zero, false, nil).
func GetCurrentTransportConnRuntimeSnapshot(c *ClientConn) (TransportConnRuntimeSnapshot, bool, error) {
	var none TransportConnRuntimeSnapshot
	if c == nil {
		return none, false, errClientConnRuntimeSnapshotNil
	}
	current := c.CurrentTransportConn()
	if current == nil {
		return none, false, nil
	}
	snapshot, err := GetTransportConnRuntimeSnapshot(current)
	if err != nil {
		return none, false, err
	}
	return snapshot, true, nil
}
// transportConnServerSenderSnapshot returns the logical connection's server as
// a transportConnServerSender. It returns nil when there is no logical
// connection, no server, or the server does not implement the interface.
func (t *TransportConn) transportConnServerSenderSnapshot() transportConnServerSender {
	logical := t.logicalConnSnapshot()
	if logical == nil {
		return nil
	}
	if server := logical.Server(); server != nil {
		if sender, ok := server.(transportConnServerSender); ok {
			return sender
		}
	}
	return nil
}
// transportConnServerAPISnapshot returns the logical connection's server as a
// transportConnServerAPI. It returns nil when there is no logical connection,
// no server, or the server does not implement the interface.
func (t *TransportConn) transportConnServerAPISnapshot() transportConnServerAPI {
	logical := t.logicalConnSnapshot()
	if logical == nil {
		return nil
	}
	if server := logical.Server(); server != nil {
		if api, ok := server.(transportConnServerAPI); ok {
			return api
		}
	}
	return nil
}
// sendTransfer hands msg to the owning server for delivery over this
// transport. When no sender is available it returns a zero WaitMsg and the
// error from transportDetachedErrorForTransport.
func (t *TransportConn) sendTransfer(msg TransferMsg) (WaitMsg, error) {
	if sender := t.transportConnServerSenderSnapshot(); sender != nil {
		return sender.sendTransport(t, msg)
	}
	return WaitMsg{}, transportDetachedErrorForTransport(t)
}
// sendTransferWait hands msg and timeout to the owning server's
// sendTransportWait for this transport. When no sender is available it returns
// a zero Message and the error from transportDetachedErrorForTransport.
func (t *TransportConn) sendTransferWait(msg TransferMsg, timeout time.Duration) (Message, error) {
	if sender := t.transportConnServerSenderSnapshot(); sender != nil {
		return sender.sendTransportWait(t, msg, timeout)
	}
	return Message{}, transportDetachedErrorForTransport(t)
}
// Send transmits key/value over this transport as a MSG_ASYNC transfer and
// returns only the error; the WaitMsg produced by the send is discarded.
func (t *TransportConn) Send(key string, value MsgVal) error {
	msg := TransferMsg{Key: key, Value: value, Type: MSG_ASYNC}
	_, err := t.sendTransfer(msg)
	return err
}
// SendWait transmits key/value over this transport as a MSG_SYNC_ASK transfer
// and returns the Message produced by the server's wait-send, passing timeout
// through unchanged.
func (t *TransportConn) SendWait(key string, value MsgVal, timeout time.Duration) (Message, error) {
	msg := TransferMsg{Key: key, Value: value, Type: MSG_SYNC_ASK}
	return t.sendTransferWait(msg, timeout)
}
// SendCtx delegates to the owning server's SendCtxTransport for this
// transport. When the server API is unavailable it returns a zero Message and
// the error from transportDetachedErrorForTransport.
func (t *TransportConn) SendCtx(ctx context.Context, key string, value MsgVal) (Message, error) {
	server := t.transportConnServerAPISnapshot()
	if server == nil {
		return Message{}, transportDetachedErrorForTransport(t)
	}
	return server.SendCtxTransport(ctx, t, key, value)
}
// SendObj delegates to the owning server's SendObjTransport for this
// transport. When the server API is unavailable it returns the error from
// transportDetachedErrorForTransport.
func (t *TransportConn) SendObj(key string, value interface{}) error {
	if server := t.transportConnServerAPISnapshot(); server != nil {
		return server.SendObjTransport(t, key, value)
	}
	return transportDetachedErrorForTransport(t)
}
// SendObjCtx delegates to the owning server's SendObjCtxTransport for this
// transport. When the server API is unavailable it returns a zero Message and
// the error from transportDetachedErrorForTransport.
func (t *TransportConn) SendObjCtx(ctx context.Context, key string, value interface{}) (Message, error) {
	server := t.transportConnServerAPISnapshot()
	if server == nil {
		return Message{}, transportDetachedErrorForTransport(t)
	}
	return server.SendObjCtxTransport(ctx, t, key, value)
}
// SendWaitObj delegates to the owning server's SendWaitObjTransport for this
// transport, passing timeout through unchanged. When the server API is
// unavailable it returns a zero Message and the error from
// transportDetachedErrorForTransport.
func (t *TransportConn) SendWaitObj(key string, value interface{}, timeout time.Duration) (Message, error) {
	if server := t.transportConnServerAPISnapshot(); server != nil {
		return server.SendWaitObjTransport(t, key, value, timeout)
	}
	return Message{}, transportDetachedErrorForTransport(t)
}
// SendFile delegates to the owning server's SendFileTransport for this
// transport, passing ctx and filePath through unchanged. When the server API
// is unavailable it returns the error from transportDetachedErrorForTransport.
func (t *TransportConn) SendFile(ctx context.Context, filePath string) error {
	server := t.transportConnServerAPISnapshot()
	if server == nil {
		return transportDetachedErrorForTransport(t)
	}
	return server.SendFileTransport(ctx, t, filePath)
}