notify/client_session_runtime.go

324 lines
7.6 KiB
Go
Raw Permalink Normal View History

package notify
import (
"b612.me/stario"
"context"
"errors"
"net"
"sync/atomic"
)
// clientSessionRuntime bundles the per-session state held by a client: the
// transport binding and raw connection, the session-wide and transport-scoped
// stop contexts with their cancel funcs, the queue, the inbound dispatcher, an
// epoch value and the good-bye suppression flag.
//
// Code in this file modifies a runtime by copying the struct by value and
// publishing the copy, so pointer fields are shared between copies unless they
// are explicitly replaced.
type clientSessionRuntime struct {
	// transport is the binding built from conn and queue. It is nil while no
	// connection is set.
	transport *transportBinding

	// transportAttached is recomputed by
	// normalizeClientSessionRuntimeTransportState: it reports whether the
	// binding (or, without a binding, conn) holds a connection.
	transportAttached bool

	// conn is the raw connection the runtime was created or attached with.
	conn net.Conn

	// stopCtx and stopFn are the session-wide stop context and its cancel func.
	stopCtx context.Context

	stopFn context.CancelFunc

	// transportStopCtx and transportStopFn are derived from stopCtx by
	// ensureClientSessionRuntimeTransportLifecycle and are nil while conn is nil.
	transportStopCtx context.Context

	transportStopFn context.CancelFunc

	// queue is handed to newTransportBinding together with conn.
	queue *stario.StarQueue

	// inboundDispatcher is created on demand by prepareClientSessionRuntime.
	inboundDispatcher *inboundDispatcher

	// epoch is stored as supplied to newClientSessionRuntime; its meaning is
	// defined by callers outside this file.
	epoch uint64

	// suppressGoodByeOnStop is set through markRuntimeSuppressGoodByeOnStop and
	// read through runtimeShouldSuppressGoodByeOnStop.
	suppressGoodByeOnStop *atomic.Bool
}
// newClientSessionRuntimeBase returns a runtime that carries only the given
// stop context and cancel func, a new inbound dispatcher and an unset good-bye
// suppression flag. Transport, conn and queue are left at their zero values.
func newClientSessionRuntimeBase(stopCtx context.Context, stopFn context.CancelFunc) *clientSessionRuntime {
	base := new(clientSessionRuntime)
	base.stopCtx, base.stopFn = stopCtx, stopFn
	base.inboundDispatcher = newInboundDispatcher()
	base.suppressGoodByeOnStop = new(atomic.Bool)
	return base
}
// prepareClientSessionRuntime fills in whatever callers left unset on rt and
// returns the same pointer. A nil rt yields nil.
//
// In order, it creates a missing inbound dispatcher, creates a missing
// good-bye suppression flag, builds a transport binding from rt.conn and
// rt.queue when a connection is present without a binding, recomputes
// transportAttached, and brings the transport stop context in line with
// whether a connection is set.
func prepareClientSessionRuntime(rt *clientSessionRuntime) *clientSessionRuntime {
	if rt == nil {
		return nil
	}

	if rt.inboundDispatcher == nil {
		rt.inboundDispatcher = newInboundDispatcher()
	}
	if rt.suppressGoodByeOnStop == nil {
		rt.suppressGoodByeOnStop = new(atomic.Bool)
	}

	hasConn, hasBinding := rt.conn != nil, rt.transport != nil
	if hasConn && !hasBinding {
		rt.transport = newTransportBinding(rt.conn, rt.queue)
	}

	normalizeClientSessionRuntimeTransportState(rt)
	ensureClientSessionRuntimeTransportLifecycle(rt)
	return rt
}
// setClientSessionRuntime prepares rt, publishes it as the current session
// runtime and mirrors its stop context, cancel func, queue and connection into
// the corresponding ClientCommon fields. It is a no-op when c or rt is nil.
//
// If the previously published runtime had a transport binding that differs
// from the binding on rt as passed in, that old binding's background workers
// are stopped after the new runtime has been stored.
func (c *ClientCommon) setClientSessionRuntime(rt *clientSessionRuntime) {
	if c == nil || rt == nil {
		return
	}

	// The comparison deliberately uses rt.transport before preparation, which
	// may install a binding on rt.
	var replaced *transportBinding
	prev := c.clientSessionRuntimeSnapshot()
	if prev != nil && prev.transport != nil && prev.transport != rt.transport {
		replaced = prev.transport
	}

	rt = prepareClientSessionRuntime(rt)
	c.sessionRuntime.Store(rt)
	c.stopCtx = rt.stopCtx
	c.stopFn = rt.stopFn

	// Prefer the binding's view of queue and conn when a binding exists.
	if binding := rt.transport; binding == nil {
		c.queue = rt.queue
		c.conn = rt.conn
	} else {
		c.queue = binding.queueSnapshot()
		c.conn = binding.connSnapshot()
	}

	if replaced != nil {
		replaced.stopBackgroundWorkers()
	}
}
// resetClientSessionRuntimeBase replaces the current session runtime with a
// base runtime built on a new cancellable context rooted at
// context.Background, and resets the mirrored ClientCommon fields: conn and
// queue become nil, stopCtx and stopFn point at the new context. It is a no-op
// when c is nil.
func (c *ClientCommon) resetClientSessionRuntimeBase() {
	if c == nil {
		return
	}

	ctx, cancel := context.WithCancel(context.Background())
	base := newClientSessionRuntimeBase(ctx, cancel)
	c.sessionRuntime.Store(base)

	c.conn = nil
	c.queue = nil
	c.stopCtx = ctx
	c.stopFn = cancel
}
// cleanupFailedClientStart unwinds a client start that did not complete. It
// cancels the current runtime's stop func when one is set, then calls
// cleanupClientSessionResources, rollbackClientSessionStart and
// resetClientSessionRuntimeBase in that order. It is a no-op when c is nil.
func (c *ClientCommon) cleanupFailedClientStart() {
	if c == nil {
		return
	}

	if current := c.clientSessionRuntimeSnapshot(); current != nil {
		if cancel := current.stopFn; cancel != nil {
			cancel()
		}
	}

	c.cleanupClientSessionResources()
	c.rollbackClientSessionStart()
	c.resetClientSessionRuntimeBase()
}
// newClientSessionRuntime builds a fully populated runtime for conn and queue
// and passes it through prepareClientSessionRuntime before returning it.
//
// A transport binding is always created from conn and queue, even when conn is
// nil. The initial transportAttached value (conn != nil) is recomputed from
// the binding during preparation.
func newClientSessionRuntime(conn net.Conn, stopCtx context.Context, stopFn context.CancelFunc, queue *stario.StarQueue, epoch uint64) *clientSessionRuntime {
	rt := new(clientSessionRuntime)
	rt.transport = newTransportBinding(conn, queue)
	rt.transportAttached = conn != nil
	rt.conn = conn
	rt.stopCtx = stopCtx
	rt.stopFn = stopFn
	rt.queue = queue
	rt.inboundDispatcher = newInboundDispatcher()
	rt.epoch = epoch
	rt.suppressGoodByeOnStop = new(atomic.Bool)
	return prepareClientSessionRuntime(rt)
}
// runtimeShouldSuppressGoodByeOnStop reports the current value of the
// good-bye suppression flag. It returns false when rt is nil or the flag has
// not been allocated.
func (rt *clientSessionRuntime) runtimeShouldSuppressGoodByeOnStop() bool {
	if rt != nil {
		if flag := rt.suppressGoodByeOnStop; flag != nil {
			return flag.Load()
		}
	}
	return false
}
// markRuntimeSuppressGoodByeOnStop sets the good-bye suppression flag to true.
// It does nothing when rt is nil or the flag has not been allocated.
func (rt *clientSessionRuntime) markRuntimeSuppressGoodByeOnStop() {
	if rt == nil {
		return
	}
	if flag := rt.suppressGoodByeOnStop; flag != nil {
		flag.Store(true)
	}
}
// retireClientSessionRuntime cancels rt's transport stop context, if one is
// set. When suppressGoodBye is true the runtime's good-bye suppression flag is
// set first. It is a no-op when c or rt is nil.
func (c *ClientCommon) retireClientSessionRuntime(rt *clientSessionRuntime, suppressGoodBye bool) {
	switch {
	case c == nil, rt == nil:
		return
	case suppressGoodBye:
		rt.markRuntimeSuppressGoodByeOnStop()
	}

	if cancel := rt.transportStopFn; cancel != nil {
		cancel()
	}
}
// clearClientSessionRuntimeTransport detaches the transport from the current
// session runtime. It cancels the transport stop context when one is set, then
// publishes a copy of the runtime with the binding, conn and transport stop
// context/cancel func cleared and transportAttached set to false. It is a
// no-op when c is nil or no runtime is stored.
func (c *ClientCommon) clearClientSessionRuntimeTransport() {
	if c == nil {
		return
	}
	current := c.clientSessionRuntimeSnapshot()
	if current == nil {
		return
	}

	if cancel := current.transportStopFn; cancel != nil {
		cancel()
	}

	// Work on a value copy so the previously published runtime is not mutated.
	detached := *current
	detached.transport, detached.conn = nil, nil
	detached.transportAttached = false
	detached.transportStopCtx, detached.transportStopFn = nil, nil
	c.setClientSessionRuntime(&detached)
}
// clearClientSessionRuntimeQueue publishes a copy of the current session
// runtime with its queue set to nil. When the runtime has a transport binding,
// the copy gets a new binding that keeps the old binding's connection but has
// no queue. It is a no-op when c is nil or no runtime is stored.
func (c *ClientCommon) clearClientSessionRuntimeQueue() {
	if c == nil {
		return
	}
	current := c.clientSessionRuntimeSnapshot()
	if current == nil {
		return
	}

	// Work on a value copy so the previously published runtime is not mutated.
	queueless := *current
	queueless.queue = nil
	if binding := queueless.transport; binding != nil {
		queueless.transport = newTransportBinding(binding.connSnapshot(), nil)
	}
	c.setClientSessionRuntime(&queueless)
}
// attachClientSessionTransport binds conn to the current session runtime and
// starts the transport runtime on it.
//
// It returns an error before changing anything when c or conn is nil, when no
// runtime is stored, or when the runtime has no queue
// (errClientSessionQueueUnavailable). Otherwise it cancels the previous
// transport stop context, publishes a copy of the runtime that carries a new
// binding for conn and a fresh good-bye suppression flag, closes the previous
// connection when it is a different one, and returns the result of
// startClientTransportRuntime for the newly published runtime.
func (c *ClientCommon) attachClientSessionTransport(conn net.Conn) error {
	if c == nil {
		return errors.New("client is nil")
	}
	if conn == nil {
		return errors.New("conn is nil")
	}
	rt := c.clientSessionRuntimeSnapshot()
	if rt == nil {
		return errors.New("client session runtime is nil")
	}
	if rt.queue == nil {
		return errClientSessionQueueUnavailable
	}

	oldBinding := rt.transport
	if rt.transportStopFn != nil {
		rt.transportStopFn()
	}

	// Work on a value copy so the previously published runtime is not mutated.
	// The transport stop context is cleared here and recreated when the copy
	// is prepared inside setClientSessionRuntime.
	next := *rt
	next.transport = newTransportBinding(conn, rt.queue)
	next.transportAttached = true
	next.conn = conn
	next.transportStopCtx = nil
	next.transportStopFn = nil
	next.suppressGoodByeOnStop = &atomic.Bool{}
	c.setClientSessionRuntime(&next)

	// The runtime may have had no binding at all (for example after
	// clearClientSessionRuntimeTransport), so guard the dereference the same
	// way the rest of this file does before touching a binding.
	if oldBinding != nil {
		if oldConn := oldBinding.connSnapshot(); oldConn != nil && oldConn != conn {
			// Best-effort close: the old connection is being discarded, so a
			// close error leaves nothing to act on.
			_ = oldConn.Close()
		}
	}
	return c.startClientTransportRuntime(c.clientSessionRuntimeSnapshot())
}
// clientSessionRuntimeSnapshot returns the currently published session
// runtime, or nil when c is nil.
func (c *ClientCommon) clientSessionRuntimeSnapshot() *clientSessionRuntime {
	if c != nil {
		return c.sessionRuntime.Load()
	}
	return nil
}
// normalizeClientSessionRuntimeTransportState recomputes rt.transportAttached.
// With a transport binding present the flag reflects whether the binding holds
// a connection; without one it reflects whether rt.conn is set. A nil rt is
// ignored.
func normalizeClientSessionRuntimeTransportState(rt *clientSessionRuntime) {
	if rt == nil {
		return
	}

	attached := rt.conn != nil
	if binding := rt.transport; binding != nil {
		attached = binding.connSnapshot() != nil
	}
	rt.transportAttached = attached
}
// ensureClientSessionRuntimeTransportLifecycle keeps the transport stop
// context consistent with rt.conn. Without a connection both the context and
// its cancel func are cleared. With a connection an existing context/cancel
// pair is kept; otherwise a new cancellable context is derived from rt.stopCtx,
// or from context.Background when rt.stopCtx is nil. A nil rt is ignored.
func ensureClientSessionRuntimeTransportLifecycle(rt *clientSessionRuntime) {
	switch {
	case rt == nil:
		return
	case rt.conn == nil:
		rt.transportStopCtx, rt.transportStopFn = nil, nil
		return
	case rt.transportStopCtx != nil && rt.transportStopFn != nil:
		return
	}

	base := rt.stopCtx
	if base == nil {
		base = context.Background()
	}
	ctx, cancel := context.WithCancel(base)
	rt.transportStopCtx = ctx
	rt.transportStopFn = cancel
}
// clientTransportConnSnapshot returns the connection of the current session
// runtime: the binding's connection when a binding exists, otherwise the
// runtime's conn field. It returns nil when no runtime is stored.
func (c *ClientCommon) clientTransportConnSnapshot() net.Conn {
	switch rt := c.clientSessionRuntimeSnapshot(); {
	case rt == nil:
		return nil
	case rt.transport != nil:
		return rt.transport.connSnapshot()
	default:
		return rt.conn
	}
}
// clientInboundDispatcherSnapshot returns the inbound dispatcher of the
// current session runtime, or nil when no runtime is stored.
func (c *ClientCommon) clientInboundDispatcherSnapshot() *inboundDispatcher {
	if rt := c.clientSessionRuntimeSnapshot(); rt != nil {
		return rt.inboundDispatcher
	}
	return nil
}
// clientStopContextSnapshot returns the session-wide stop context of the
// current session runtime, or nil when no runtime is stored.
func (c *ClientCommon) clientStopContextSnapshot() context.Context {
	if rt := c.clientSessionRuntimeSnapshot(); rt != nil {
		return rt.stopCtx
	}
	return nil
}
// clientStopFuncSnapshot returns the session-wide cancel func of the current
// session runtime, or nil when no runtime is stored.
func (c *ClientCommon) clientStopFuncSnapshot() context.CancelFunc {
	if rt := c.clientSessionRuntimeSnapshot(); rt != nil {
		return rt.stopFn
	}
	return nil
}
// clientQueueSnapshot returns the queue of the current session runtime: the
// binding's queue when a binding exists, otherwise the runtime's queue field.
// It returns nil when no runtime is stored.
func (c *ClientCommon) clientQueueSnapshot() *stario.StarQueue {
	switch rt := c.clientSessionRuntimeSnapshot(); {
	case rt == nil:
		return nil
	case rt.transport != nil:
		return rt.transport.queueSnapshot()
	default:
		return rt.queue
	}
}
// clientTransportBindingSnapshot returns the transport binding of the current
// session runtime. When the runtime has a connection but no binding, a new
// binding is built from the runtime's conn and queue on each call; it is not
// stored back on the runtime. It returns nil when no runtime is stored or the
// runtime has neither a binding nor a connection.
func (c *ClientCommon) clientTransportBindingSnapshot() *transportBinding {
	switch rt := c.clientSessionRuntimeSnapshot(); {
	case rt == nil:
		return nil
	case rt.transport != nil:
		return rt.transport
	case rt.conn == nil:
		return nil
	default:
		return newTransportBinding(rt.conn, rt.queue)
	}
}
// clientTransportStopContextSnapshot returns the transport-scoped stop context
// of the current session runtime, falling back to the session-wide stop
// context when no transport context is set. It returns nil when no runtime is
// stored.
func (c *ClientCommon) clientTransportStopContextSnapshot() context.Context {
	rt := c.clientSessionRuntimeSnapshot()
	if rt == nil {
		return nil
	}
	if ctx := rt.transportStopCtx; ctx != nil {
		return ctx
	}
	return rt.stopCtx
}
// clientTransportAttachedSnapshot reports the transportAttached flag of the
// current session runtime. It returns false when no runtime is stored.
func (c *ClientCommon) clientTransportAttachedSnapshot() bool {
	rt := c.clientSessionRuntimeSnapshot()
	return rt != nil && rt.transportAttached
}