notify/session_state.go
starainrt 09d972c7b7
feat(notify): 重构通信内核并补齐 stream/bulk/record/transfer 能力
- 引入 LogicalConn/TransportConn 分层,ClientConn 保留兼容适配层
  - 新增 Stream、Bulk、RecordStream 三条数据面能力及对应控制路径
  - 完成 transfer/file 传输内核与状态快照、诊断能力
  - 补齐 reconnect、inbound dispatcher、modern psk 等基础模块
  - 增加大规模回归、并发与基准测试覆盖
  - 更新依赖库
2026-04-15 15:24:36 +08:00

162 lines
3.5 KiB
Go

package notify
import (
"context"
"sync"
"sync/atomic"
)
// sessionIsAlive reports whether the liveness flag stored in alive is true.
//
// It returns false when alive is nil, when nothing has been stored yet, or
// when the stored value is not a bool. Any panic raised while reading the
// value is swallowed and reported as "not alive".
func sessionIsAlive(alive *atomic.Value) (ok bool) {
	// Installed first so that every path below is covered by the guard.
	defer func() {
		if r := recover(); r != nil {
			ok = false
		}
	}()
	if alive == nil {
		return false
	}
	if flag, isBool := alive.Load().(bool); isBool {
		return flag
	}
	return false
}
// sessionMarkStarted flags a session as running.
//
// When alive is non-nil it is set to true. The status, when non-nil, is then
// overwritten under locker (if one is supplied) with a fresh Status whose
// Alive field is true and whose Reason and Err are cleared.
func sessionMarkStarted(alive *atomic.Value, locker sync.Locker, status *Status) {
	if alive != nil {
		alive.Store(true)
	}
	resetStatus := func() {
		if status != nil {
			// Reason and Err are left at their zero values on purpose.
			*status = Status{Alive: true}
		}
	}
	withSessionStatusLock(locker, resetStatus)
}
// sessionMarkStopped flags a session as stopped and releases its resources.
//
// Steps, in order:
//  1. alive (if non-nil) is set to false.
//  2. status (if non-nil) is overwritten under locker with Alive=false and the
//     supplied reason and err.
//  3. every non-nil function in cleanupFns is invoked in argument order.
//  4. stopFn (if non-nil) is invoked last.
func sessionMarkStopped(alive *atomic.Value, locker sync.Locker, status *Status, reason string, err error, stopFn context.CancelFunc, cleanupFns ...func()) {
	if alive != nil {
		alive.Store(false)
	}

	recordStop := func() {
		if status != nil {
			*status = Status{Alive: false, Reason: reason, Err: err}
		}
	}
	withSessionStatusLock(locker, recordStop)

	for i := range cleanupFns {
		if fn := cleanupFns[i]; fn != nil {
			fn()
		}
	}

	// Cancellation runs only after all cleanup callbacks have returned.
	if stopFn == nil {
		return
	}
	stopFn()
}
// sessionStopChan returns the Done channel of stopCtx, or a nil channel when
// stopCtx itself is nil. Receiving from a nil channel blocks forever.
func sessionStopChan(stopCtx context.Context) <-chan struct{} {
	var done <-chan struct{}
	if stopCtx != nil {
		done = stopCtx.Done()
	}
	return done
}
// sessionStatusValue returns a copy of *status taken while locker (if any) is
// held. A nil status yields the zero Status.
func sessionStatusValue(locker sync.Locker, status *Status) Status {
	var current Status
	read := func() {
		if status != nil {
			current = *status
		}
	}
	withSessionStatusLock(locker, read)
	return current
}
// withSessionStatusLock runs fn while holding locker. When locker is nil, fn
// runs without any locking. The lock is released via defer, so it is dropped
// even if fn panics.
func withSessionStatusLock(locker sync.Locker, fn func()) {
	if locker == nil {
		fn()
		return
	}
	locker.Lock()
	defer locker.Unlock()
	fn()
}
// markSessionStarted records that the client session has started. It first
// calls markClientSessionStarted, then passes the client's alive flag, mutex
// and status to sessionMarkStarted, which sets the flag to true and resets the
// status to alive with no reason or error.
func (c *ClientCommon) markSessionStarted() {
c.markClientSessionStarted()
sessionMarkStarted(&c.alive, &c.mu, &c.status)
}
// markSessionStopped records that the client session has stopped with the
// given reason and error.
//
// The call is bracketed by markClientSessionStopping (before) and
// markClientSessionStopped (after). In between, sessionMarkStopped clears the
// alive flag, stores the stop status under c.mu, runs the three cleanup
// callbacks in the order listed, and finally invokes the stop function
// obtained from clientStopFuncSnapshot.
func (c *ClientCommon) markSessionStopped(reason string, err error) {
	c.markClientSessionStopping()

	stopFn := c.clientStopFuncSnapshot()
	cleanups := []func(){
		c.clearClientSessionRuntimeTransport,
		c.clearClientSessionRuntimeQueue,
		c.cleanupClientSessionResources,
	}
	sessionMarkStopped(&c.alive, &c.mu, &c.status, reason, err, stopFn, cleanups...)

	c.markClientSessionStopped()
}
// markSessionStarted records that the server session has started. It first
// calls markServerSessionStarted, then passes the server's alive flag, mutex
// and status to sessionMarkStarted, which sets the flag to true and resets the
// status to alive with no reason or error.
func (s *ServerCommon) markSessionStarted() {
s.markServerSessionStarted()
sessionMarkStarted(&s.alive, &s.mu, &s.status)
}
// markSessionStopped records that the server session has stopped with the
// given reason and error.
//
// The call is bracketed by markServerSessionStopping (before) and
// markServerSessionStopped (after). In between, sessionMarkStopped clears the
// alive flag, stores the stop status under s.mu, runs the three cleanup
// callbacks in the order listed, and finally invokes the stop function
// obtained from serverStopFuncSnapshot.
func (s *ServerCommon) markSessionStopped(reason string, err error) {
	s.markServerSessionStopping()

	stopFn := s.serverStopFuncSnapshot()
	cleanups := []func(){
		s.clearServerSessionRuntimeTransport,
		s.clearServerSessionRuntimeQueue,
		s.cleanupServerSessionResources,
	}
	sessionMarkStopped(&s.alive, &s.mu, &s.status, reason, err, stopFn, cleanups...)

	s.markServerSessionStopped()
}
// markSessionStarted delegates to markClientConnLogicalSessionStarted.
// NOTE(review): unlike the ClientCommon/ServerCommon variants, this method
// does not call sessionMarkStarted itself; presumably the delegate performs
// the equivalent bookkeeping — confirm against its definition.
func (c *ClientConn) markSessionStarted() {
c.markClientConnLogicalSessionStarted()
}
// markSessionStopped delegates to markClientConnLogicalSessionStopped,
// forwarding reason and err unchanged.
// NOTE(review): unlike the ClientCommon/ServerCommon variants, this method
// does not call sessionMarkStopped itself; presumably the delegate performs
// the equivalent bookkeeping — confirm against its definition.
func (c *ClientConn) markSessionStopped(reason string, err error) {
c.markClientConnLogicalSessionStopped(reason, err)
}
// cleanupClientSessionResources tears down per-session state held by the
// client. It is a no-op on a nil receiver.
//
// It calls closeAll on the pendingWaits, fileAckWaits, signalAckWaits and
// receivedSignals members of the logical session state, then closes the
// transfers, the stream runtime and the bulk runtime with errServiceShutdown.
// The stream and bulk runtimes are skipped when their getters return nil.
func (c *ClientCommon) cleanupClientSessionResources() {
	if c == nil {
		return
	}

	logical := c.getLogicalSessionState()
	logical.pendingWaits.closeAll()
	logical.fileAckWaits.closeAll()
	logical.signalAckWaits.closeAll()
	logical.receivedSignals.closeAll()
	logical.transfers.closeAll(errServiceShutdown)

	streams := c.getStreamRuntime()
	if streams != nil {
		streams.closeAll(errServiceShutdown)
	}

	bulks := c.getBulkRuntime()
	if bulks != nil {
		bulks.closeAll(errServiceShutdown)
	}
}
// cleanupServerSessionResources tears down per-session state held by the
// server. It is a no-op on a nil receiver.
//
// It calls closeAll on the pendingWaits, fileAckWaits, signalAckWaits and
// receivedSignals members of the logical session state, then closes the
// transfers, the stream runtime and the bulk runtime with errServiceShutdown.
// The stream and bulk runtimes are skipped when their getters return nil.
func (s *ServerCommon) cleanupServerSessionResources() {
	if s == nil {
		return
	}

	logical := s.getLogicalSessionState()
	logical.pendingWaits.closeAll()
	logical.fileAckWaits.closeAll()
	logical.signalAckWaits.closeAll()
	logical.receivedSignals.closeAll()
	logical.transfers.closeAll(errServiceShutdown)

	streams := s.getStreamRuntime()
	if streams != nil {
		streams.closeAll(errServiceShutdown)
	}

	bulks := s.getBulkRuntime()
	if bulks != nil {
		bulks.closeAll(errServiceShutdown)
	}
}