package notify import ( "context" "fmt" "os" "sync/atomic" "time" ) func (c *ClientCommon) send(msg TransferMsg) (WaitMsg, error) { if err := c.ensureClientSendReady(); err != nil { return WaitMsg{}, err } var wait WaitMsg if msg.Type != MSG_SYNC_REPLY && msg.Type != MSG_KEY_CHANGE && msg.Type != MSG_SYS_REPLY || msg.ID == 0 { msg.ID = atomic.AddUint64(&c.msgID, 1) } env, err := wrapTransferMsgEnvelope(msg, c.sequenceEn) if err != nil { return WaitMsg{}, err } if requiresSignalReplyWait(msg) { wait = c.getPendingWaitPool().createAndStore(msg) } err = c.sendSignalEnvelopeMaybeReliable(env, msg) if err != nil { if requiresSignalReplyWait(msg) { c.getPendingWaitPool().removeAndClose(msg.ID) } return WaitMsg{}, err } return wait, err } func (c *ClientCommon) sendEnvelope(env Envelope) error { if err := c.ensureClientSendReady(); err != nil { return err } payload, err := c.encodeEnvelopePayload(env) if err != nil { return err } if batchedControlEnvelope(env) { return c.writeControlPayloadToTransport(payload) } return c.writePayloadToTransport(payload) } func (c *ClientCommon) dispatchEnvelope(env Envelope, now time.Time) { switch env.Kind { case EnvelopeSignalAck: if c.handleSignalAckEnvelope(env) { return } case EnvelopeStreamData: c.dispatchStreamEnvelope(env) return case EnvelopeSignal: transfer, err := unwrapTransferMsgEnvelope(env, c.sequenceDe) if err != nil { if c.showError || c.debugMode { fmt.Println("client unwrap signal envelope error", err) } return } if c.handleReceivedSignalReliability(transfer) { return } message := Message{ ServerConn: c, TransferMsg: transfer, NetType: NET_CLIENT, Time: now, } c.dispatchMsg(message) case EnvelopeFileMeta, EnvelopeFileChunk, EnvelopeFileEnd, EnvelopeFileAbort, EnvelopeAck: c.dispatchFileEnvelope(env, now) default: } } func (c *ClientCommon) Send(key string, value MsgVal) error { _, err := c.send(TransferMsg{ Key: key, Value: value, Type: MSG_ASYNC, }) return err } func (c *ClientCommon) sendWait(msg TransferMsg, timeout time.Duration) (Message, error) { data, err := c.send(msg) if err != nil { return Message{}, err } stopCh := sessionStopChan(c.clientStopContextSnapshot()) if timeout.Seconds() == 0 { msg, ok := <-data.Reply if !ok { return msg, pendingWaitClosedErrorWith(stopCh, clientTransportDetachedError(c)) } return msg, nil } select { case <-time.After(timeout): c.getPendingWaitPool().removeAndClose(data.TransferMsg.ID) return Message{}, os.ErrDeadlineExceeded case <-stopCh: return Message{}, errServiceShutdown case msg, ok := <-data.Reply: if !ok { return msg, pendingWaitClosedErrorWith(stopCh, clientTransportDetachedError(c)) } return msg, nil } } func (c *ClientCommon) sendCtx(msg TransferMsg, ctx context.Context) (Message, error) { data, err := c.send(msg) if err != nil { return Message{}, err } stopCh := sessionStopChan(c.clientStopContextSnapshot()) if ctx == nil { ctx = context.Background() } select { case <-ctx.Done(): c.getPendingWaitPool().removeAndClose(data.TransferMsg.ID) return Message{}, normalizeStreamDeadlineError(ctx.Err()) case <-stopCh: return Message{}, errServiceShutdown case msg, ok := <-data.Reply: if !ok { return msg, pendingWaitClosedErrorWith(stopCh, clientTransportDetachedError(c)) } return msg, nil } } func (c *ClientCommon) SendObjCtx(ctx context.Context, key string, val interface{}) (Message, error) { data, err := c.sequenceEn(val) if err != nil { return Message{}, err } return c.sendCtx(TransferMsg{ Key: key, Value: data, Type: MSG_SYNC_ASK, }, ctx) } func (c *ClientCommon) SendObj(key string, val interface{}) error { data, err := encode(val) if err != nil { return err } _, err = c.send(TransferMsg{ Key: key, Value: data, Type: MSG_ASYNC, }) return err } func (c *ClientCommon) SendCtx(ctx context.Context, key string, value MsgVal) (Message, error) { return c.sendCtx(TransferMsg{ Key: key, Value: value, Type: MSG_SYNC_ASK, }, ctx) } func (c *ClientCommon) SendWait(key string, value MsgVal, timeout time.Duration) (Message, error) { return c.sendWait(TransferMsg{ Key: key, Value: value, Type: MSG_SYNC_ASK, }, timeout) } func (c *ClientCommon) SendWaitObj(key string, value interface{}, timeout time.Duration) (Message, error) { data, err := c.sequenceEn(value) if err != nil { return Message{}, err } return c.SendWait(key, data, timeout) } func (c *ClientCommon) Reply(m Message, value MsgVal) error { return m.Reply(value) }