notify/client.go

165 lines
6.7 KiB
Go
Raw Permalink Normal View History

2019-11-14 10:44:19 +08:00
package notify
import (
2021-11-12 16:04:39 +08:00
"b612.me/stario"
2020-02-11 10:50:11 +08:00
"context"
2019-11-14 10:44:19 +08:00
"net"
2020-12-23 20:50:57 +08:00
"sync"
2021-11-12 16:04:39 +08:00
"sync/atomic"
2019-11-14 10:44:19 +08:00
"time"
2019-11-22 10:24:39 +08:00
)
2019-11-14 10:44:19 +08:00
2021-11-12 16:04:39 +08:00
type ClientCommon struct {
alive atomic.Value
status Status
byeFromServer bool
conn net.Conn
mu sync.Mutex
msgID uint64
peerIdentity string
sessionEpoch uint64
sessionOwnerState atomic.Int32
sessionRuntime atomic.Pointer[clientSessionRuntime]
connectSource atomic.Pointer[clientConnectSource]
queue *stario.StarQueue
stopFn context.CancelFunc
stopCtx context.Context
parallelNum int
maxReadTimeout time.Duration
maxWriteTimeout time.Duration
keyExchangeFn func(c Client) error
linkFns map[string]func(message *Message)
defaultFns func(message *Message)
msgEn func([]byte, []byte) []byte
msgDe func([]byte, []byte) []byte
fastStreamEncode transportFastStreamEncoder
fastBulkEncode transportFastBulkEncoder
fastPlainEncode transportFastPlainEncoder
modernPSKRuntime *modernPSKCodecRuntime
handshakeRsaPubKey []byte
SecretKey []byte
transportProtection atomic.Pointer[transportProtectionProfile]
peerAttachSecurity atomic.Pointer[peerAttachSecurityState]
securityBootstrap transportProtectionProfile
securitySteady transportProtectionProfile
securitySteadyNegotiated transportProtectionProfile
securityAuthMode AuthMode
securityProtectionMode ProtectionMode
securityRequireForwardSecrecy bool
securityConfigured bool
peerAttachAuthenticated bool
peerAttachAuthFallback bool
peerAttachAt int64
noFinSyncMsgMaxKeepSeconds int
lastHeartbeat int64
heartbeatPeriod time.Duration
wg stario.WaitGroup
netType NetType
showError bool
skipKeyExchange bool
useHeartBeat bool
sequenceDe func([]byte) (interface{}, error)
sequenceEn func(interface{}) ([]byte, error)
logicalSession *logicalSessionState
onFileEvent func(FileEvent)
fileEventObserver func(FileEvent)
fileTransferCfg fileTransferConfig
signalReliableCfg signalReliabilityConfig
streamRuntime *streamRuntime
recordRuntime *recordRuntime
bulkRuntime *bulkRuntime
bulkDefaultOpenMode BulkOpenMode
bulkNetworkProfile BulkNetworkProfile
bulkOpenTuning BulkOpenTuning
bulkDedicatedAttachLimit int
bulkDedicatedAttachSem chan struct{}
bulkDedicatedAttachRetry int
bulkDedicatedAttachBackoff time.Duration
bulkDedicatedDialTimeout time.Duration
bulkDedicatedHelloTimeout time.Duration
bulkDedicatedActiveLimit int
bulkDedicatedActive atomic.Int32
bulkDedicatedActiveWait chan struct{}
bulkDedicatedLaneLimit int
bulkDedicatedSidecarMu sync.Mutex
bulkDedicatedLanes map[uint32]*bulkDedicatedLane
bulkDedicatedNextLaneID uint32
bulkAttachAttemptCount atomic.Int64
bulkAttachRetryCount atomic.Int64
bulkAttachSuccessCount atomic.Int64
bulkAttachFallbackCount atomic.Int64
connectionRetryState *connectionRetryState
securityReadyCheck bool
debugMode bool
2021-11-12 16:04:39 +08:00
}
func NewClient() Client {
transport := defaultModernPSKTransportBundle()
2021-11-12 16:04:39 +08:00
var client = ClientCommon{
maxReadTimeout: 0,
maxWriteTimeout: 0,
peerIdentity: newClientPeerIdentity(),
sequenceEn: encode,
sequenceDe: Decode,
keyExchangeFn: aesRsaHello,
SecretKey: nil,
handshakeRsaPubKey: defaultRsaPubKey,
msgEn: transport.msgEn,
msgDe: transport.msgDe,
fastStreamEncode: transport.fastStreamEncode,
fastBulkEncode: transport.fastBulkEncode,
fastPlainEncode: transport.fastPlainEncode,
skipKeyExchange: true,
securityReadyCheck: true,
bulkDefaultOpenMode: BulkOpenModeShared,
bulkNetworkProfile: BulkNetworkProfileDefault,
bulkOpenTuning: defaultBulkOpenTuning(),
bulkDedicatedAttachLimit: defaultBulkDedicatedAttachLimit,
bulkDedicatedAttachRetry: defaultBulkDedicatedAttachRetry,
bulkDedicatedAttachBackoff: defaultBulkDedicatedAttachBackoff,
bulkDedicatedDialTimeout: defaultBulkDedicatedDialTimeout,
bulkDedicatedHelloTimeout: defaultBulkDedicatedHelloTimeout,
bulkDedicatedActiveLimit: defaultBulkDedicatedActiveLimit,
bulkDedicatedActiveWait: make(chan struct{}),
bulkDedicatedLaneLimit: defaultBulkDedicatedLaneLimit,
2019-11-14 10:44:19 +08:00
}
2021-11-12 16:04:39 +08:00
client.alive.Store(false)
client.useHeartBeat = true
client.heartbeatPeriod = time.Second * 20
client.linkFns = make(map[string]func(*Message))
client.defaultFns = func(message *Message) {
2019-12-15 12:44:55 +08:00
return
2021-11-12 16:04:39 +08:00
}
client.wg = stario.NewWaitGroup(0)
client.fileTransferCfg = defaultFileTransferConfig()
client.signalReliableCfg = defaultSignalReliabilityConfig()
client.logicalSession = newLogicalSessionState(client.fileTransferCfg, client.signalReliableCfg)
client.streamRuntime = newStreamRuntime("cstrm")
client.recordRuntime = newRecordRuntime()
client.bulkRuntime = newBulkRuntime("cblk")
client.bulkDedicatedLanes = make(map[uint32]*bulkDedicatedLane)
if client.bulkDedicatedAttachLimit > 0 {
client.bulkDedicatedAttachSem = make(chan struct{}, client.bulkDedicatedAttachLimit)
}
client.connectionRetryState = newConnectionRetryState()
client.onFileEvent = normalizeFileEventCallback(nil)
client.fileEventObserver = normalizeFileEventCallback(nil)
2021-11-12 16:04:39 +08:00
client.stopCtx, client.stopFn = context.WithCancel(context.Background())
client.sessionRuntime.Store(newClientSessionRuntimeBase(client.stopCtx, client.stopFn))
client.setClientTransportProtectionProfile(defaultTransportProtectionProfile())
client.peerAttachSecurity.Store(defaultPeerAttachSecurityState())
bindClientStreamControl(&client)
bindClientBulkControl(&client)
client.getTransferState().setBuiltinHandler(client.builtinFileTransferHandler)
2021-11-12 16:04:39 +08:00
return &client
}
func (c *ClientCommon) maxWriteTimeoutSnapshot() time.Duration {
if c == nil {
return 0
}
c.mu.Lock()
defer c.mu.Unlock()
return c.maxWriteTimeout
}