package notify import ( "b612.me/stario" "context" "errors" "net" "sync/atomic" ) type clientSessionRuntime struct { transport *transportBinding transportAttached bool conn net.Conn stopCtx context.Context stopFn context.CancelFunc transportStopCtx context.Context transportStopFn context.CancelFunc queue *stario.StarQueue inboundDispatcher *inboundDispatcher epoch uint64 suppressGoodByeOnStop *atomic.Bool } func newClientSessionRuntimeBase(stopCtx context.Context, stopFn context.CancelFunc) *clientSessionRuntime { return &clientSessionRuntime{ stopCtx: stopCtx, stopFn: stopFn, inboundDispatcher: newInboundDispatcher(), suppressGoodByeOnStop: &atomic.Bool{}, } } func prepareClientSessionRuntime(rt *clientSessionRuntime) *clientSessionRuntime { if rt == nil { return nil } if rt.inboundDispatcher == nil { rt.inboundDispatcher = newInboundDispatcher() } if rt.suppressGoodByeOnStop == nil { rt.suppressGoodByeOnStop = &atomic.Bool{} } if rt.transport == nil && rt.conn != nil { rt.transport = newTransportBinding(rt.conn, rt.queue) } normalizeClientSessionRuntimeTransportState(rt) ensureClientSessionRuntimeTransportLifecycle(rt) return rt } func (c *ClientCommon) setClientSessionRuntime(rt *clientSessionRuntime) { if c == nil || rt == nil { return } var oldBinding *transportBinding if prev := c.clientSessionRuntimeSnapshot(); prev != nil && prev.transport != nil && prev.transport != rt.transport { oldBinding = prev.transport } rt = prepareClientSessionRuntime(rt) c.sessionRuntime.Store(rt) c.stopCtx = rt.stopCtx c.stopFn = rt.stopFn if rt.transport != nil { c.queue = rt.transport.queueSnapshot() c.conn = rt.transport.connSnapshot() } else { c.queue = rt.queue c.conn = rt.conn } if oldBinding != nil { oldBinding.stopBackgroundWorkers() } } func (c *ClientCommon) resetClientSessionRuntimeBase() { if c == nil { return } stopCtx, stopFn := context.WithCancel(context.Background()) c.sessionRuntime.Store(newClientSessionRuntimeBase(stopCtx, stopFn)) c.conn = nil c.queue = nil c.stopCtx = stopCtx c.stopFn = stopFn } func (c *ClientCommon) cleanupFailedClientStart() { if c == nil { return } rt := c.clientSessionRuntimeSnapshot() if rt != nil && rt.stopFn != nil { rt.stopFn() } c.cleanupClientSessionResources() c.rollbackClientSessionStart() c.resetClientSessionRuntimeBase() } func newClientSessionRuntime(conn net.Conn, stopCtx context.Context, stopFn context.CancelFunc, queue *stario.StarQueue, epoch uint64) *clientSessionRuntime { return prepareClientSessionRuntime(&clientSessionRuntime{ transport: newTransportBinding(conn, queue), transportAttached: conn != nil, conn: conn, stopCtx: stopCtx, stopFn: stopFn, queue: queue, inboundDispatcher: newInboundDispatcher(), epoch: epoch, suppressGoodByeOnStop: &atomic.Bool{}, }) } func (rt *clientSessionRuntime) runtimeShouldSuppressGoodByeOnStop() bool { if rt == nil || rt.suppressGoodByeOnStop == nil { return false } return rt.suppressGoodByeOnStop.Load() } func (rt *clientSessionRuntime) markRuntimeSuppressGoodByeOnStop() { if rt == nil || rt.suppressGoodByeOnStop == nil { return } rt.suppressGoodByeOnStop.Store(true) } func (c *ClientCommon) retireClientSessionRuntime(rt *clientSessionRuntime, suppressGoodBye bool) { if c == nil || rt == nil { return } if suppressGoodBye { rt.markRuntimeSuppressGoodByeOnStop() } if rt.transportStopFn != nil { rt.transportStopFn() } } func (c *ClientCommon) clearClientSessionRuntimeTransport() { if c == nil { return } rt := c.clientSessionRuntimeSnapshot() if rt == nil { return } if rt.transportStopFn != nil { rt.transportStopFn() } next := *rt next.transport = nil next.transportAttached = false next.conn = nil next.transportStopCtx = nil next.transportStopFn = nil c.setClientSessionRuntime(&next) } func (c *ClientCommon) clearClientSessionRuntimeQueue() { if c == nil { return } rt := c.clientSessionRuntimeSnapshot() if rt == nil { return } next := *rt next.queue = nil if next.transport != nil { next.transport = newTransportBinding(next.transport.connSnapshot(), nil) } c.setClientSessionRuntime(&next) } func (c *ClientCommon) attachClientSessionTransport(conn net.Conn) error { if c == nil { return errors.New("client is nil") } if conn == nil { return errors.New("conn is nil") } rt := c.clientSessionRuntimeSnapshot() if rt == nil { return errors.New("client session runtime is nil") } if rt.queue == nil { return errClientSessionQueueUnavailable } oldBinding := rt.transport if rt.transportStopFn != nil { rt.transportStopFn() } next := *rt next.transport = newTransportBinding(conn, rt.queue) next.transportAttached = true next.conn = conn next.transportStopCtx = nil next.transportStopFn = nil next.suppressGoodByeOnStop = &atomic.Bool{} c.setClientSessionRuntime(&next) if oldConn := oldBinding.connSnapshot(); oldConn != nil && oldConn != conn { _ = oldConn.Close() } return c.startClientTransportRuntime(c.clientSessionRuntimeSnapshot()) } func (c *ClientCommon) clientSessionRuntimeSnapshot() *clientSessionRuntime { if c == nil { return nil } return c.sessionRuntime.Load() } func normalizeClientSessionRuntimeTransportState(rt *clientSessionRuntime) { if rt == nil { return } if rt.transport != nil { rt.transportAttached = rt.transport.connSnapshot() != nil return } rt.transportAttached = rt.conn != nil } func ensureClientSessionRuntimeTransportLifecycle(rt *clientSessionRuntime) { if rt == nil { return } if rt.conn == nil { rt.transportStopCtx = nil rt.transportStopFn = nil return } if rt.transportStopCtx != nil && rt.transportStopFn != nil { return } parent := rt.stopCtx if parent == nil { parent = context.Background() } rt.transportStopCtx, rt.transportStopFn = context.WithCancel(parent) } func (c *ClientCommon) clientTransportConnSnapshot() net.Conn { rt := c.clientSessionRuntimeSnapshot() if rt == nil { return nil } if rt.transport != nil { return rt.transport.connSnapshot() } return rt.conn } func (c *ClientCommon) clientInboundDispatcherSnapshot() *inboundDispatcher { rt := c.clientSessionRuntimeSnapshot() if rt == nil { return nil } return rt.inboundDispatcher } func (c *ClientCommon) clientStopContextSnapshot() context.Context { rt := c.clientSessionRuntimeSnapshot() if rt == nil { return nil } return rt.stopCtx } func (c *ClientCommon) clientStopFuncSnapshot() context.CancelFunc { rt := c.clientSessionRuntimeSnapshot() if rt == nil { return nil } return rt.stopFn } func (c *ClientCommon) clientQueueSnapshot() *stario.StarQueue { rt := c.clientSessionRuntimeSnapshot() if rt == nil { return nil } if rt.transport != nil { return rt.transport.queueSnapshot() } return rt.queue } func (c *ClientCommon) clientTransportBindingSnapshot() *transportBinding { rt := c.clientSessionRuntimeSnapshot() if rt == nil { return nil } if rt.transport != nil { return rt.transport } if rt.conn == nil { return nil } return newTransportBinding(rt.conn, rt.queue) } func (c *ClientCommon) clientTransportStopContextSnapshot() context.Context { rt := c.clientSessionRuntimeSnapshot() if rt == nil { return nil } if rt.transportStopCtx != nil { return rt.transportStopCtx } return rt.stopCtx } func (c *ClientCommon) clientTransportAttachedSnapshot() bool { rt := c.clientSessionRuntimeSnapshot() if rt == nil { return false } return rt.transportAttached }