notify/client.go
starainrt 98ef9e7fcc
feat(transport): 完成安全架构拆分并收口 stream/bulk 传输优化
- 新增 managed/external/nested 三种传输保护模式
  - 新增 peer attach 显式认证、抗重放、channel binding 和可选前向保密协商
  - 明确单连接注入与可重拨连接源的语义边界
  - 禁止 ConnectByConn 场景下 dedicated bulk 走 sidecar,auto 模式自动回退 shared
  - 修正 dedicated attach 在 bootstrap/steady profile 切换下的处理逻辑
  - 优化 shared bulk super-batch 与批量 framed write 路径
  - 降低 stream/bulk fast path 的复制和分发损耗
  - 补齐 benchmark、回归测试、运行时快照和 README 文档
2026-04-20 16:35:44 +08:00

165 lines
6.7 KiB
Go

package notify
import (
"b612.me/stario"
"context"
"net"
"sync"
"sync/atomic"
"time"
)
// ClientCommon is the concrete client state constructed by NewClient and
// handed to callers as a Client. It bundles connection/session bookkeeping,
// message codecs, transport-security settings, and the stream/record/bulk
// runtimes in one struct.
//
// The struct contains a sync.Mutex and several sync/atomic values, so it must
// be used through a pointer and never copied.
//
// NOTE(review): msgID, sessionEpoch, peerAttachAt and lastHeartbeat are plain
// 64-bit integers. If any of them is accessed through sync/atomic functions
// elsewhere, its alignment on 32-bit platforms depends on field order —
// confirm before reordering fields.
type ClientCommon struct {
// --- connection and session state ---
// alive is initialised to false by NewClient.
alive atomic.Value
status Status
byeFromServer bool
conn net.Conn
// mu is held by maxWriteTimeoutSnapshot while reading maxWriteTimeout.
// NOTE(review): the full set of fields it guards is not visible here.
mu sync.Mutex
msgID uint64
// peerIdentity is generated by newClientPeerIdentity in NewClient.
peerIdentity string
sessionEpoch uint64
sessionOwnerState atomic.Int32
// sessionRuntime is seeded in NewClient from stopCtx/stopFn.
sessionRuntime atomic.Pointer[clientSessionRuntime]
connectSource atomic.Pointer[clientConnectSource]
queue *stario.StarQueue
// stopFn and stopCtx come from context.WithCancel in NewClient.
stopFn context.CancelFunc
stopCtx context.Context
parallelNum int
// maxReadTimeout and maxWriteTimeout start at 0 in NewClient.
maxReadTimeout time.Duration
maxWriteTimeout time.Duration
// --- handshake, handlers and message codecs ---
// keyExchangeFn defaults to aesRsaHello.
keyExchangeFn func(c Client) error
// linkFns maps a string key to a message handler; presumably the key is the
// message key/route — verify against the registration code.
linkFns map[string]func(message *Message)
// defaultFns is a no-op handler by default.
defaultFns func(message *Message)
// msgEn/msgDe and the fast*Encode hooks are taken from
// defaultModernPSKTransportBundle in NewClient.
msgEn func([]byte, []byte) []byte
msgDe func([]byte, []byte) []byte
fastStreamEncode transportFastStreamEncoder
fastBulkEncode transportFastBulkEncoder
fastPlainEncode transportFastPlainEncoder
modernPSKRuntime *modernPSKCodecRuntime
// handshakeRsaPubKey defaults to defaultRsaPubKey.
handshakeRsaPubKey []byte
// SecretKey is the only exported field; NewClient leaves it nil.
SecretKey []byte
// --- transport protection and peer-attach security ---
// transportProtection is initialised via setClientTransportProtectionProfile
// with defaultTransportProtectionProfile().
transportProtection atomic.Pointer[transportProtectionProfile]
// peerAttachSecurity is initialised with defaultPeerAttachSecurityState().
peerAttachSecurity atomic.Pointer[peerAttachSecurityState]
securityBootstrap transportProtectionProfile
securitySteady transportProtectionProfile
securitySteadyNegotiated transportProtectionProfile
securityAuthMode AuthMode
securityProtectionMode ProtectionMode
securityRequireForwardSecrecy bool
securityConfigured bool
peerAttachAuthenticated bool
peerAttachAuthFallback bool
peerAttachAt int64
// --- heartbeat and miscellaneous behaviour flags ---
noFinSyncMsgMaxKeepSeconds int
lastHeartbeat int64
// heartbeatPeriod defaults to 20 seconds.
heartbeatPeriod time.Duration
// wg is created with stario.NewWaitGroup(0) in NewClient.
wg stario.WaitGroup
netType NetType
showError bool
// skipKeyExchange defaults to true.
skipKeyExchange bool
// useHeartBeat defaults to true.
useHeartBeat bool
// sequenceDe/sequenceEn default to Decode and encode respectively.
sequenceDe func([]byte) (interface{}, error)
sequenceEn func(interface{}) ([]byte, error)
// --- logical session, file transfer and signal reliability ---
// logicalSession is built from fileTransferCfg and signalReliableCfg.
logicalSession *logicalSessionState
// onFileEvent and fileEventObserver are both initialised to
// normalizeFileEventCallback(nil).
onFileEvent func(FileEvent)
fileEventObserver func(FileEvent)
fileTransferCfg fileTransferConfig
signalReliableCfg signalReliabilityConfig
// --- stream / record / bulk runtimes ---
// streamRuntime is created with the name "cstrm", bulkRuntime with "cblk".
streamRuntime *streamRuntime
recordRuntime *recordRuntime
bulkRuntime *bulkRuntime
// bulkDefaultOpenMode defaults to BulkOpenModeShared.
bulkDefaultOpenMode BulkOpenMode
// bulkNetworkProfile defaults to BulkNetworkProfileDefault.
bulkNetworkProfile BulkNetworkProfile
bulkOpenTuning BulkOpenTuning
// --- dedicated bulk attach tuning and bookkeeping ---
// The bulkDedicated* limits/timeouts are seeded from the matching
// defaultBulkDedicated* values in NewClient.
bulkDedicatedAttachLimit int
// bulkDedicatedAttachSem is allocated with capacity bulkDedicatedAttachLimit
// when that limit is positive, and is left nil otherwise.
bulkDedicatedAttachSem chan struct{}
bulkDedicatedAttachRetry int
bulkDedicatedAttachBackoff time.Duration
bulkDedicatedDialTimeout time.Duration
bulkDedicatedHelloTimeout time.Duration
bulkDedicatedActiveLimit int
bulkDedicatedActive atomic.Int32
// bulkDedicatedActiveWait starts as an unbuffered channel.
bulkDedicatedActiveWait chan struct{}
bulkDedicatedLaneLimit int
// NOTE(review): bulkDedicatedSidecarMu presumably guards bulkDedicatedLanes
// and bulkDedicatedNextLaneID — confirm against the lane code.
bulkDedicatedSidecarMu sync.Mutex
bulkDedicatedLanes map[uint32]*bulkDedicatedLane
bulkDedicatedNextLaneID uint32
// Attach counters; the names suggest attempts, retries, successes and
// fallbacks, but the update sites are not visible here.
bulkAttachAttemptCount atomic.Int64
bulkAttachRetryCount atomic.Int64
bulkAttachSuccessCount atomic.Int64
bulkAttachFallbackCount atomic.Int64
// --- retry state and diagnostics ---
connectionRetryState *connectionRetryState
// securityReadyCheck defaults to true.
securityReadyCheck bool
debugMode bool
}
// NewClient constructs a ClientCommon populated with the package defaults and
// returns it as a Client.
//
// Defaults applied here:
//   - read/write timeouts are 0 and SecretKey is nil (zero values);
//   - codecs and fast-path encoders come from defaultModernPSKTransportBundle;
//   - key exchange is skipped, the security-ready check is enabled;
//   - heartbeats are enabled with a 20 second period;
//   - bulk transfers default to BulkOpenModeShared with the default network
//     profile, tuning, and dedicated-attach limits;
//   - the default message handler is a no-op and the alive flag is false.
//
// Helper calls are kept in their original relative order in case any of them
// has side effects.
func NewClient() Client {
	bundle := defaultModernPSKTransportBundle()

	c := &ClientCommon{
		// Identity, serialisation and handshake.
		peerIdentity:       newClientPeerIdentity(),
		sequenceEn:         encode,
		sequenceDe:         Decode,
		keyExchangeFn:      aesRsaHello,
		handshakeRsaPubKey: defaultRsaPubKey,
		skipKeyExchange:    true,
		securityReadyCheck: true,

		// Message codecs supplied by the default transport bundle.
		msgEn:            bundle.msgEn,
		msgDe:            bundle.msgDe,
		fastStreamEncode: bundle.fastStreamEncode,
		fastBulkEncode:   bundle.fastBulkEncode,
		fastPlainEncode:  bundle.fastPlainEncode,

		// Bulk transfer defaults.
		bulkDefaultOpenMode:        BulkOpenModeShared,
		bulkNetworkProfile:         BulkNetworkProfileDefault,
		bulkOpenTuning:             defaultBulkOpenTuning(),
		bulkDedicatedAttachLimit:   defaultBulkDedicatedAttachLimit,
		bulkDedicatedAttachRetry:   defaultBulkDedicatedAttachRetry,
		bulkDedicatedAttachBackoff: defaultBulkDedicatedAttachBackoff,
		bulkDedicatedDialTimeout:   defaultBulkDedicatedDialTimeout,
		bulkDedicatedHelloTimeout:  defaultBulkDedicatedHelloTimeout,
		bulkDedicatedActiveLimit:   defaultBulkDedicatedActiveLimit,
		bulkDedicatedActiveWait:    make(chan struct{}),
		bulkDedicatedLaneLimit:     defaultBulkDedicatedLaneLimit,

		// Heartbeat defaults.
		useHeartBeat:    true,
		heartbeatPeriod: 20 * time.Second,
	}

	c.alive.Store(false)

	// Handler registry; unmatched messages fall through to a no-op.
	c.linkFns = make(map[string]func(*Message))
	c.defaultFns = func(*Message) {}

	c.wg = stario.NewWaitGroup(0)

	// The logical session is derived from the two configs, so they are set first.
	c.fileTransferCfg = defaultFileTransferConfig()
	c.signalReliableCfg = defaultSignalReliabilityConfig()
	c.logicalSession = newLogicalSessionState(c.fileTransferCfg, c.signalReliableCfg)

	c.streamRuntime = newStreamRuntime("cstrm")
	c.recordRuntime = newRecordRuntime()
	c.bulkRuntime = newBulkRuntime("cblk")

	c.bulkDedicatedLanes = make(map[uint32]*bulkDedicatedLane)
	// The attach semaphore only exists when a positive limit is configured.
	if limit := c.bulkDedicatedAttachLimit; limit > 0 {
		c.bulkDedicatedAttachSem = make(chan struct{}, limit)
	}

	c.connectionRetryState = newConnectionRetryState()
	c.onFileEvent = normalizeFileEventCallback(nil)
	c.fileEventObserver = normalizeFileEventCallback(nil)

	// Stop context shared by the client and its initial session runtime.
	ctx, cancel := context.WithCancel(context.Background())
	c.stopCtx, c.stopFn = ctx, cancel
	c.sessionRuntime.Store(newClientSessionRuntimeBase(ctx, cancel))

	c.setClientTransportProtectionProfile(defaultTransportProtectionProfile())
	c.peerAttachSecurity.Store(defaultPeerAttachSecurityState())

	bindClientStreamControl(c)
	bindClientBulkControl(c)
	c.getTransferState().setBuiltinHandler(c.builtinFileTransferHandler)

	return c
}
// maxWriteTimeoutSnapshot returns the current maxWriteTimeout, read while
// holding c.mu. A nil receiver yields 0.
func (c *ClientCommon) maxWriteTimeoutSnapshot() time.Duration {
	var timeout time.Duration
	if c != nil {
		c.mu.Lock()
		timeout = c.maxWriteTimeout
		c.mu.Unlock()
	}
	return timeout
}