notify/server_session_runtime.go

222 lines
4.7 KiB
Go
Raw Permalink Normal View History

package notify
import (
"b612.me/stario"
"context"
"net"
)
type serverSessionRuntime struct {
listener net.Listener
udpListener *net.UDPConn
transportAttached bool
stopCtx context.Context
stopFn context.CancelFunc
transportStopCtx context.Context
transportStopFn context.CancelFunc
queue *stario.StarQueue
inboundDispatcher *inboundDispatcher
}
func newServerSessionRuntimeBase(stopCtx context.Context, stopFn context.CancelFunc) *serverSessionRuntime {
return &serverSessionRuntime{
stopCtx: stopCtx,
stopFn: stopFn,
}
}
func prepareServerSessionRuntime(rt *serverSessionRuntime) *serverSessionRuntime {
if rt == nil {
return nil
}
normalizeServerSessionRuntimeTransportState(rt)
ensureServerSessionRuntimeTransportLifecycle(rt)
return rt
}
func (s *ServerCommon) setServerSessionRuntime(rt *serverSessionRuntime) {
if s == nil || rt == nil {
return
}
rt = prepareServerSessionRuntime(rt)
s.sessionRuntime.Store(rt)
s.listener = rt.listener
s.udpListener = rt.udpListener
s.stopCtx = rt.stopCtx
s.stopFn = rt.stopFn
s.queue = rt.queue
}
func (s *ServerCommon) resetServerSessionRuntimeBase() {
if s == nil {
return
}
stopCtx, stopFn := context.WithCancel(context.Background())
s.sessionRuntime.Store(newServerSessionRuntimeBase(stopCtx, stopFn))
s.listener = nil
s.udpListener = nil
s.queue = nil
s.stopCtx = stopCtx
s.stopFn = stopFn
}
func (s *ServerCommon) cleanupFailedServerStart() {
if s == nil {
return
}
rt := s.serverSessionRuntimeSnapshot()
if rt != nil && rt.stopFn != nil {
rt.stopFn()
}
s.cleanupServerSessionResources()
s.rollbackServerSessionStart()
s.resetServerSessionRuntimeBase()
}
func (s *ServerCommon) clearServerSessionRuntimeTransport() {
if s == nil {
return
}
rt := s.serverSessionRuntimeSnapshot()
if rt == nil {
return
}
if rt.transportStopFn != nil {
rt.transportStopFn()
}
next := *rt
next.listener = nil
next.udpListener = nil
next.transportAttached = false
next.transportStopCtx = nil
next.transportStopFn = nil
s.setServerSessionRuntime(&next)
}
func (s *ServerCommon) clearServerSessionRuntimeQueue() {
if s == nil {
return
}
rt := s.serverSessionRuntimeSnapshot()
if rt == nil {
return
}
next := *rt
next.queue = nil
s.setServerSessionRuntime(&next)
}
func (s *ServerCommon) bindServerSessionTransport(listener net.Listener, udpListener *net.UDPConn) {
if s == nil {
return
}
rt := s.serverSessionRuntimeSnapshot()
if rt == nil {
rt = &serverSessionRuntime{}
}
next := *rt
next.listener = listener
next.udpListener = udpListener
s.setServerSessionRuntime(&next)
}
func (s *ServerCommon) serverSessionRuntimeSnapshot() *serverSessionRuntime {
if s == nil {
return nil
}
return s.sessionRuntime.Load()
}
func normalizeServerSessionRuntimeTransportState(rt *serverSessionRuntime) {
if rt == nil {
return
}
rt.transportAttached = rt.listener != nil || rt.udpListener != nil
}
func ensureServerSessionRuntimeTransportLifecycle(rt *serverSessionRuntime) {
if rt == nil {
return
}
if rt.listener == nil && rt.udpListener == nil {
rt.transportStopCtx = nil
rt.transportStopFn = nil
return
}
if rt.transportStopCtx != nil && rt.transportStopFn != nil {
return
}
parent := rt.stopCtx
if parent == nil {
parent = context.Background()
}
rt.transportStopCtx, rt.transportStopFn = context.WithCancel(parent)
}
func (s *ServerCommon) serverStopContextSnapshot() context.Context {
rt := s.serverSessionRuntimeSnapshot()
if rt == nil {
return nil
}
return rt.stopCtx
}
func (s *ServerCommon) serverTransportAttachedSnapshot() bool {
rt := s.serverSessionRuntimeSnapshot()
if rt == nil {
return false
}
return rt.transportAttached
}
func (s *ServerCommon) serverStopFuncSnapshot() context.CancelFunc {
rt := s.serverSessionRuntimeSnapshot()
if rt == nil {
return nil
}
return rt.stopFn
}
func (s *ServerCommon) serverQueueSnapshot() *stario.StarQueue {
rt := s.serverSessionRuntimeSnapshot()
if rt == nil {
return nil
}
return rt.queue
}
func (s *ServerCommon) serverInboundDispatcherSnapshot() *inboundDispatcher {
rt := s.serverSessionRuntimeSnapshot()
if rt == nil {
return nil
}
return rt.inboundDispatcher
}
func (s *ServerCommon) serverListenerSnapshot() net.Listener {
rt := s.serverSessionRuntimeSnapshot()
if rt == nil {
return nil
}
return rt.listener
}
func (s *ServerCommon) serverUDPListenerSnapshot() *net.UDPConn {
rt := s.serverSessionRuntimeSnapshot()
if rt == nil {
return nil
}
return rt.udpListener
}
func (s *ServerCommon) serverTransportStopContextSnapshot() context.Context {
rt := s.serverSessionRuntimeSnapshot()
if rt == nil {
return nil
}
if rt.transportStopCtx != nil {
return rt.transportStopCtx
}
return rt.stopCtx
}