notify/client_send.go
starainrt 09d972c7b7
feat(notify): 重构通信内核并补齐 stream/bulk/record/transfer 能力
- 引入 LogicalConn/TransportConn 分层,ClientConn 保留兼容适配层
  - 新增 Stream、Bulk、RecordStream 三条数据面能力及对应控制路径
  - 完成 transfer/file 传输内核与状态快照、诊断能力
  - 补齐 reconnect、inbound dispatcher、modern psk 等基础模块
  - 增加大规模回归、并发与基准测试覆盖
  - 更新依赖库
2026-04-15 15:24:36 +08:00

194 lines
4.6 KiB
Go

package notify
import (
"context"
"fmt"
"os"
"sync/atomic"
"time"
)
func (c *ClientCommon) send(msg TransferMsg) (WaitMsg, error) {
if err := c.ensureClientSendReady(); err != nil {
return WaitMsg{}, err
}
var wait WaitMsg
if msg.Type != MSG_SYNC_REPLY && msg.Type != MSG_KEY_CHANGE && msg.Type != MSG_SYS_REPLY || msg.ID == 0 {
msg.ID = atomic.AddUint64(&c.msgID, 1)
}
env, err := wrapTransferMsgEnvelope(msg, c.sequenceEn)
if err != nil {
return WaitMsg{}, err
}
if requiresSignalReplyWait(msg) {
wait = c.getPendingWaitPool().createAndStore(msg)
}
err = c.sendSignalEnvelopeMaybeReliable(env, msg)
if err != nil {
if requiresSignalReplyWait(msg) {
c.getPendingWaitPool().removeAndClose(msg.ID)
}
return WaitMsg{}, err
}
return wait, err
}
func (c *ClientCommon) sendEnvelope(env Envelope) error {
if err := c.ensureClientSendReady(); err != nil {
return err
}
payload, err := c.encodeEnvelopePayload(env)
if err != nil {
return err
}
if batchedControlEnvelope(env) {
return c.writeControlPayloadToTransport(payload)
}
return c.writePayloadToTransport(payload)
}
func (c *ClientCommon) dispatchEnvelope(env Envelope, now time.Time) {
switch env.Kind {
case EnvelopeSignalAck:
if c.handleSignalAckEnvelope(env) {
return
}
case EnvelopeStreamData:
c.dispatchStreamEnvelope(env)
return
case EnvelopeSignal:
transfer, err := unwrapTransferMsgEnvelope(env, c.sequenceDe)
if err != nil {
if c.showError || c.debugMode {
fmt.Println("client unwrap signal envelope error", err)
}
return
}
if c.handleReceivedSignalReliability(transfer) {
return
}
message := Message{
ServerConn: c,
TransferMsg: transfer,
NetType: NET_CLIENT,
Time: now,
}
c.dispatchMsg(message)
case EnvelopeFileMeta, EnvelopeFileChunk, EnvelopeFileEnd, EnvelopeFileAbort, EnvelopeAck:
c.dispatchFileEnvelope(env, now)
default:
}
}
func (c *ClientCommon) Send(key string, value MsgVal) error {
_, err := c.send(TransferMsg{
Key: key,
Value: value,
Type: MSG_ASYNC,
})
return err
}
func (c *ClientCommon) sendWait(msg TransferMsg, timeout time.Duration) (Message, error) {
data, err := c.send(msg)
if err != nil {
return Message{}, err
}
stopCh := sessionStopChan(c.clientStopContextSnapshot())
if timeout.Seconds() == 0 {
msg, ok := <-data.Reply
if !ok {
return msg, pendingWaitClosedErrorWith(stopCh, clientTransportDetachedError(c))
}
return msg, nil
}
select {
case <-time.After(timeout):
c.getPendingWaitPool().removeAndClose(data.TransferMsg.ID)
return Message{}, os.ErrDeadlineExceeded
case <-stopCh:
return Message{}, errServiceShutdown
case msg, ok := <-data.Reply:
if !ok {
return msg, pendingWaitClosedErrorWith(stopCh, clientTransportDetachedError(c))
}
return msg, nil
}
}
func (c *ClientCommon) sendCtx(msg TransferMsg, ctx context.Context) (Message, error) {
data, err := c.send(msg)
if err != nil {
return Message{}, err
}
stopCh := sessionStopChan(c.clientStopContextSnapshot())
if ctx == nil {
ctx = context.Background()
}
select {
case <-ctx.Done():
c.getPendingWaitPool().removeAndClose(data.TransferMsg.ID)
return Message{}, normalizeStreamDeadlineError(ctx.Err())
case <-stopCh:
return Message{}, errServiceShutdown
case msg, ok := <-data.Reply:
if !ok {
return msg, pendingWaitClosedErrorWith(stopCh, clientTransportDetachedError(c))
}
return msg, nil
}
}
func (c *ClientCommon) SendObjCtx(ctx context.Context, key string, val interface{}) (Message, error) {
data, err := c.sequenceEn(val)
if err != nil {
return Message{}, err
}
return c.sendCtx(TransferMsg{
Key: key,
Value: data,
Type: MSG_SYNC_ASK,
}, ctx)
}
func (c *ClientCommon) SendObj(key string, val interface{}) error {
data, err := encode(val)
if err != nil {
return err
}
_, err = c.send(TransferMsg{
Key: key,
Value: data,
Type: MSG_ASYNC,
})
return err
}
func (c *ClientCommon) SendCtx(ctx context.Context, key string, value MsgVal) (Message, error) {
return c.sendCtx(TransferMsg{
Key: key,
Value: value,
Type: MSG_SYNC_ASK,
}, ctx)
}
func (c *ClientCommon) SendWait(key string, value MsgVal, timeout time.Duration) (Message, error) {
return c.sendWait(TransferMsg{
Key: key,
Value: value,
Type: MSG_SYNC_ASK,
}, timeout)
}
func (c *ClientCommon) SendWaitObj(key string, value interface{}, timeout time.Duration) (Message, error) {
data, err := c.sequenceEn(value)
if err != nil {
return Message{}, err
}
return c.SendWait(key, data, timeout)
}
func (c *ClientCommon) Reply(m Message, value MsgVal) error {
return m.Reply(value)
}