notify/logical_session_state.go
starainrt 09d972c7b7
feat(notify): 重构通信内核并补齐 stream/bulk/record/transfer 能力
- 引入 LogicalConn/TransportConn 分层,ClientConn 保留兼容适配层
  - 新增 Stream、Bulk、RecordStream 三条数据面能力及对应控制路径
  - 完成 transfer/file 传输内核与状态快照、诊断能力
  - 补齐 reconnect、inbound dispatcher、modern psk 等基础模块
  - 增加大规模回归、并发与基准测试覆盖
  - 更新依赖库
2026-04-15 15:24:36 +08:00

91 lines
2.8 KiB
Go

package notify
// logicalSessionState groups the per-session bookkeeping objects that
// newLogicalSessionState allocates together: wait pools, file receive/ack
// tracking, signal reliability tracking and transfer state.
type logicalSessionState struct {
// pendingWaits is allocated by newPendingWaitPool; it takes no configuration.
pendingWaits *pendingWaitPool
// fileReceives is built from the normalized file transfer config and is
// updated again by applyFileTransferConfig.
fileReceives *fileReceivePool
// fileAckWaits is allocated by newFileAckPool; it takes no configuration.
fileAckWaits *fileAckPool
// signalAckWaits is allocated by newSignalAckPool; it takes no configuration.
signalAckWaits *signalAckPool
// receivedSignals is created with the config's ReceiveCacheLimit and has its
// limit updated by applySignalReliabilityConfig.
receivedSignals *receivedSignalCache
// signalReliableState is allocated by newSignalReliabilityState.
signalReliableState *signalReliabilityState
// fileTransfers is built from the normalized file transfer config and is
// updated again by applyFileTransferConfig.
fileTransfers *fileTransferState
// transfers is what getTransferState returns on ClientCommon and ServerCommon.
transfers *transferState
}
// newLogicalSessionState allocates a logicalSessionState with every member
// populated. Both configs are normalized first; the file transfer config
// feeds the file receive pool and file transfer state, and the signal
// config's ReceiveCacheLimit sizes the received-signal cache.
func newLogicalSessionState(fileCfg fileTransferConfig, signalCfg signalReliabilityConfig) *logicalSessionState {
	normalizedFile := normalizeFileTransferConfig(fileCfg)
	normalizedSignal := normalizeSignalReliabilityConfig(signalCfg)

	// Members are constructed in declaration order.
	state := new(logicalSessionState)
	state.pendingWaits = newPendingWaitPool()
	state.fileReceives = newFileReceivePoolWithConfig(normalizedFile)
	state.fileAckWaits = newFileAckPool()
	state.signalAckWaits = newSignalAckPool()
	state.receivedSignals = newReceivedSignalCache(normalizedSignal.ReceiveCacheLimit)
	state.signalReliableState = newSignalReliabilityState()
	state.fileTransfers = newFileTransferStateWithConfig(normalizedFile)
	state.transfers = newTransferState()
	return state
}
// applyFileTransferConfig normalizes cfg and forwards it to the file receive
// pool and the file transfer state, skipping whichever of them is nil.
// Calling it on a nil receiver is a no-op.
func (s *logicalSessionState) applyFileTransferConfig(cfg fileTransferConfig) {
	if s == nil {
		return
	}

	normalized := normalizeFileTransferConfig(cfg)
	if receives := s.fileReceives; receives != nil {
		receives.applyConfig(normalized)
	}
	if transfers := s.fileTransfers; transfers != nil {
		transfers.applyConfig(normalized)
	}
}
// applySignalReliabilityConfig normalizes cfg and pushes its
// ReceiveCacheLimit into the received-signal cache when that cache exists.
// Calling it on a nil receiver is a no-op.
func (s *logicalSessionState) applySignalReliabilityConfig(cfg signalReliabilityConfig) {
	if s == nil {
		return
	}

	normalized := normalizeSignalReliabilityConfig(cfg)
	if cache := s.receivedSignals; cache != nil {
		cache.applyLimit(normalized.ReceiveCacheLimit)
	}
}
// getLogicalSessionState returns the client's logical session state,
// allocating it on first use.
//
// While holding c.mu it normalizes the stored file transfer and signal
// reliability configs, writes the normalized values back to the client, and
// creates the session state if none exists yet. After releasing the lock it
// applies both configs to the state, so a previously created state also
// receives the current configuration.
func (c *ClientCommon) getLogicalSessionState() *logicalSessionState {
	c.mu.Lock()
	fileCfg := normalizeFileTransferConfig(c.fileTransferCfg)
	signalCfg := normalizeSignalReliabilityConfig(c.signalReliableCfg)
	c.fileTransferCfg, c.signalReliableCfg = fileCfg, signalCfg

	session := c.logicalSession
	if session == nil {
		session = newLogicalSessionState(fileCfg, signalCfg)
		c.logicalSession = session
	}
	c.mu.Unlock()

	// Config application happens outside the c.mu critical section.
	session.applyFileTransferConfig(fileCfg)
	session.applySignalReliabilityConfig(signalCfg)
	return session
}
// getLogicalSessionState returns the server's logical session state,
// allocating it on first use.
//
// While holding s.mu it normalizes the stored file transfer and signal
// reliability configs, writes the normalized values back to the server, and
// creates the session state if none exists yet. After releasing the lock it
// applies both configs to the state, so a previously created state also
// receives the current configuration.
func (s *ServerCommon) getLogicalSessionState() *logicalSessionState {
	s.mu.Lock()
	fileCfg := normalizeFileTransferConfig(s.fileTransferCfg)
	signalCfg := normalizeSignalReliabilityConfig(s.signalReliableCfg)
	s.fileTransferCfg, s.signalReliableCfg = fileCfg, signalCfg

	session := s.logicalSession
	if session == nil {
		session = newLogicalSessionState(fileCfg, signalCfg)
		s.logicalSession = session
	}
	s.mu.Unlock()

	// Config application happens outside the s.mu critical section.
	session.applyFileTransferConfig(fileCfg)
	session.applySignalReliabilityConfig(signalCfg)
	return session
}
// getTransferState returns the transfers member of the client's logical
// session state, obtained through getLogicalSessionState.
func (c *ClientCommon) getTransferState() *transferState {
	session := c.getLogicalSessionState()
	return session.transfers
}
// getTransferState returns the transfers member of the server's logical
// session state, obtained through getLogicalSessionState.
func (s *ServerCommon) getTransferState() *transferState {
	session := s.getLogicalSessionState()
	return session.transfers
}