notify/server_session.go

276 lines
8.2 KiB
Go
Raw Permalink Normal View History

package notify
import (
"errors"
"net"
)
func (s *ServerCommon) stopClientSession(client *ClientConn, reason string, err error) {
if client == nil {
return
}
if runtime := s.getStreamRuntime(); runtime != nil {
runtime.closeScope(serverFileScope(client), streamRuntimeCloseError(err))
}
if runtime := s.getBulkRuntime(); runtime != nil {
runtime.closeScope(serverFileScope(client), bulkRuntimeCloseError(err))
}
if transfers := s.getTransferState(); transfers != nil {
transfers.closeScope(serverFileScope(client), err)
}
client.stopServerOwnedSessionWith(s.removeClient, reason, err)
}
func (s *ServerCommon) stopLogicalSession(logical *LogicalConn, reason string, err error) {
if logical == nil {
return
}
if runtime := s.getStreamRuntime(); runtime != nil {
runtime.closeScope(serverFileScope(logical), streamRuntimeCloseError(err))
}
if runtime := s.getBulkRuntime(); runtime != nil {
runtime.closeScope(serverFileScope(logical), bulkRuntimeCloseError(err))
}
if transfers := s.getTransferState(); transfers != nil {
transfers.closeScope(serverFileScope(logical), err)
}
logical.stopServerOwnedSessionWith(s.removeLogical, reason, err)
}
func (s *ServerCommon) detachClientSessionTransport(client *ClientConn, reason string, err error) {
if s == nil || client == nil {
return
}
if !client.clientConnTransportAttachedSnapshot() {
return
}
client.markClientConnTransportDetached(reason, err)
s.getPendingWaitPool().closeScope(serverTransportScope(client))
s.getPendingWaitPool().closeScope(serverFileScope(client))
s.getFileAckPool().closeScope(serverTransportScope(client))
s.getFileAckPool().closeScope(serverFileScope(client))
s.getSignalAckPool().closeScope(serverTransportScope(client))
s.getSignalAckPool().closeScope(serverFileScope(client))
if runtime := s.getStreamRuntime(); runtime != nil {
runtime.closeScope(serverFileScope(client), errTransportDetached)
}
if runtime := s.getBulkRuntime(); runtime != nil {
runtime.closeScope(serverFileScope(client), errTransportDetached)
}
client.detachServerOwnedTransport()
}
func (s *ServerCommon) detachLogicalSessionTransport(logical *LogicalConn, reason string, err error) {
if s == nil || logical == nil {
return
}
if !logical.transportAttachedSnapshot() {
return
}
logical.markTransportDetached(reason, err)
s.getPendingWaitPool().closeScope(serverTransportScope(logical))
s.getPendingWaitPool().closeScope(serverFileScope(logical))
s.getFileAckPool().closeScope(serverTransportScope(logical))
s.getFileAckPool().closeScope(serverFileScope(logical))
s.getSignalAckPool().closeScope(serverTransportScope(logical))
s.getSignalAckPool().closeScope(serverFileScope(logical))
if runtime := s.getStreamRuntime(); runtime != nil {
runtime.closeScope(serverFileScope(logical), errTransportDetached)
}
if runtime := s.getBulkRuntime(); runtime != nil {
runtime.closeScope(serverFileScope(logical), errTransportDetached)
}
logical.detachServerOwnedTransport()
}
func (s *ServerCommon) newAcceptedClient(id string, addr net.Addr) *ClientConn {
logical := s.newAcceptedLogical(id, addr)
if logical == nil {
return nil
}
return logical.compatClientConn()
}
func (s *ServerCommon) newAcceptedLogical(id string, addr net.Addr) *LogicalConn {
if s == nil {
return nil
}
return newServerLogicalConn(s, id, addr)
}
func (s *ServerCommon) registerAcceptedClient(client *ClientConn) *LogicalConn {
return s.registerAcceptedLogical(logicalConnFromClient(client))
}
func (s *ServerCommon) registerAcceptedLogical(logical *LogicalConn) *LogicalConn {
if s == nil || logical == nil {
return nil
}
logical.setServer(s)
logical.applyAttachmentProfile(s.maxReadTimeout, s.maxWriteTimeout, s.defaultMsgEn, s.defaultMsgDe, s.defaultFastStreamEncode, s.defaultFastBulkEncode, s.defaultFastPlainEncode, s.handshakeRsaKey, s.SecretKey)
logical.markHeartbeatNow()
return s.getPeerRegistry().registerLogical(logical)
}
func (s *ServerCommon) renameAcceptedLogical(logical *LogicalConn, id string) error {
if s == nil {
return errors.New("server is nil")
}
if logical == nil {
return errors.New("logical conn is nil")
}
if id == "" {
return errors.New("client id is empty")
}
if logical.ID() == id {
return nil
}
return s.getPeerRegistry().renameLogical(logical, id)
}
func (s *ServerCommon) renameAcceptedClient(client *ClientConn, id string) error {
return s.renameAcceptedLogical(logicalConnFromClient(client), id)
}
func (c *LogicalConn) inheritAcceptedLogicalTransportState(src *LogicalConn) {
if c == nil || src == nil {
return
}
c.inheritAttachmentProfile(src)
if addr := src.RemoteAddr(); addr != nil {
c.setRemoteAddr(addr)
}
}
func (c *LogicalConn) attachAcceptedTransport(addr net.Addr, tuConn net.Conn) error {
if c == nil {
return errors.New("logical conn is nil")
}
if addr == nil && tuConn != nil {
addr = tuConn.RemoteAddr()
}
if addr != nil {
c.setRemoteAddr(addr)
}
if tuConn == nil {
return nil
}
c.markHeartbeatNow()
c.clearTransportDetachState()
if c.sessionRuntimeSnapshot() == nil {
c.startSessionTransport(tuConn, nil, nil)
return nil
}
return c.attachSessionTransport(tuConn)
}
func (s *ServerCommon) attachAcceptedLogicalTransport(logical *LogicalConn, addr net.Addr, tuConn net.Conn) error {
if s == nil {
return errors.New("server is nil")
}
if logical == nil {
return errors.New("logical conn is nil")
}
logical.setServer(s)
return logical.attachAcceptedTransport(addr, tuConn)
}
func (s *ServerCommon) attachAcceptedClientTransport(client *ClientConn, addr net.Addr, tuConn net.Conn) error {
return s.attachAcceptedLogicalTransport(logicalConnFromClient(client), addr, tuConn)
}
func (s *ServerCommon) handoffAcceptedLogicalTransport(dst *LogicalConn, src *LogicalConn) error {
if s == nil {
return errors.New("server is nil")
}
if dst == nil {
return errors.New("destination logical conn is nil")
}
if src == nil {
return errors.New("source logical conn is nil")
}
if dst == src {
return nil
}
addr := src.RemoteAddr()
conn, err := src.detachTransportForTransfer()
if err != nil {
return err
}
if err := s.attachAcceptedLogicalTransport(dst, addr, conn); err != nil {
if conn != nil {
_ = conn.Close()
}
return err
}
dst.inheritAcceptedLogicalTransportState(src)
src.markSessionStopped("peer transport handed off", nil)
s.removeLogical(src)
return nil
}
func (s *ServerCommon) handoffAcceptedClientTransport(dst *ClientConn, src *ClientConn) error {
return s.handoffAcceptedLogicalTransport(logicalConnFromClient(dst), logicalConnFromClient(src))
}
func (s *ServerCommon) upsertAcceptedLogical(id string, addr net.Addr, tuConn net.Conn) (*LogicalConn, bool, error) {
if s == nil {
return nil, false, errors.New("server is nil")
}
if id == "" {
return nil, false, errors.New("client id is empty")
}
if existing := s.GetLogicalConn(id); existing != nil {
if err := s.attachAcceptedLogicalTransport(existing, addr, tuConn); err != nil {
if tuConn != nil {
_ = tuConn.Close()
}
return nil, true, err
}
return existing, true, nil
}
logical := s.newAcceptedLogical(id, addr)
if logical == nil {
return nil, false, errors.New("accepted logical is nil")
}
logical = s.registerAcceptedLogical(logical)
if logical == nil {
return nil, false, errors.New("accepted logical is nil")
}
if err := s.attachAcceptedLogicalTransport(logical, addr, tuConn); err != nil {
if tuConn != nil {
_ = tuConn.Close()
}
s.removeLogical(logical)
return nil, false, err
}
if tuConn == nil {
logical.startSession(nil, nil, nil)
}
return logical, false, nil
}
func (s *ServerCommon) upsertAcceptedClient(id string, addr net.Addr, tuConn net.Conn) (*ClientConn, bool, error) {
logical, reused, err := s.upsertAcceptedLogical(id, addr, tuConn)
if logical == nil {
return nil, reused, err
}
return logical.compatClientConn(), reused, err
}
func (s *ServerCommon) bootstrapAcceptedLogical(id string, addr net.Addr, tuConn net.Conn) *LogicalConn {
logical, _, err := s.upsertAcceptedLogical(id, addr, tuConn)
if err != nil {
return nil
}
return logical
}
func (s *ServerCommon) bootstrapAcceptedClient(id string, addr net.Addr, tuConn net.Conn) *ClientConn {
logical := s.bootstrapAcceptedLogical(id, addr, tuConn)
if logical == nil {
return nil
}
return logical.compatClientConn()
}