notify/client_send.go

194 lines
4.6 KiB
Go
Raw Permalink Normal View History

package notify
import (
"context"
"fmt"
"os"
"sync/atomic"
"time"
)
func (c *ClientCommon) send(msg TransferMsg) (WaitMsg, error) {
if err := c.ensureClientSendReady(); err != nil {
return WaitMsg{}, err
}
var wait WaitMsg
if msg.Type != MSG_SYNC_REPLY && msg.Type != MSG_KEY_CHANGE && msg.Type != MSG_SYS_REPLY || msg.ID == 0 {
msg.ID = atomic.AddUint64(&c.msgID, 1)
}
env, err := wrapTransferMsgEnvelope(msg, c.sequenceEn)
if err != nil {
return WaitMsg{}, err
}
if requiresSignalReplyWait(msg) {
wait = c.getPendingWaitPool().createAndStore(msg)
}
err = c.sendSignalEnvelopeMaybeReliable(env, msg)
if err != nil {
if requiresSignalReplyWait(msg) {
c.getPendingWaitPool().removeAndClose(msg.ID)
}
return WaitMsg{}, err
}
return wait, err
}
func (c *ClientCommon) sendEnvelope(env Envelope) error {
if err := c.ensureClientSendReady(); err != nil {
return err
}
payload, err := c.encodeEnvelopePayload(env)
if err != nil {
return err
}
if batchedControlEnvelope(env) {
return c.writeControlPayloadToTransport(payload)
}
return c.writePayloadToTransport(payload)
}
func (c *ClientCommon) dispatchEnvelope(env Envelope, now time.Time) {
switch env.Kind {
case EnvelopeSignalAck:
if c.handleSignalAckEnvelope(env) {
return
}
case EnvelopeStreamData:
c.dispatchStreamEnvelope(env)
return
case EnvelopeSignal:
transfer, err := unwrapTransferMsgEnvelope(env, c.sequenceDe)
if err != nil {
if c.showError || c.debugMode {
fmt.Println("client unwrap signal envelope error", err)
}
return
}
if c.handleReceivedSignalReliability(transfer) {
return
}
message := Message{
ServerConn: c,
TransferMsg: transfer,
NetType: NET_CLIENT,
Time: now,
}
c.dispatchMsg(message)
case EnvelopeFileMeta, EnvelopeFileChunk, EnvelopeFileEnd, EnvelopeFileAbort, EnvelopeAck:
c.dispatchFileEnvelope(env, now)
default:
}
}
func (c *ClientCommon) Send(key string, value MsgVal) error {
_, err := c.send(TransferMsg{
Key: key,
Value: value,
Type: MSG_ASYNC,
})
return err
}
func (c *ClientCommon) sendWait(msg TransferMsg, timeout time.Duration) (Message, error) {
data, err := c.send(msg)
if err != nil {
return Message{}, err
}
stopCh := sessionStopChan(c.clientStopContextSnapshot())
if timeout.Seconds() == 0 {
msg, ok := <-data.Reply
if !ok {
return msg, pendingWaitClosedErrorWith(stopCh, clientTransportDetachedError(c))
}
return msg, nil
}
select {
case <-time.After(timeout):
c.getPendingWaitPool().removeAndClose(data.TransferMsg.ID)
return Message{}, os.ErrDeadlineExceeded
case <-stopCh:
return Message{}, errServiceShutdown
case msg, ok := <-data.Reply:
if !ok {
return msg, pendingWaitClosedErrorWith(stopCh, clientTransportDetachedError(c))
}
return msg, nil
}
}
func (c *ClientCommon) sendCtx(msg TransferMsg, ctx context.Context) (Message, error) {
data, err := c.send(msg)
if err != nil {
return Message{}, err
}
stopCh := sessionStopChan(c.clientStopContextSnapshot())
if ctx == nil {
ctx = context.Background()
}
select {
case <-ctx.Done():
c.getPendingWaitPool().removeAndClose(data.TransferMsg.ID)
return Message{}, normalizeStreamDeadlineError(ctx.Err())
case <-stopCh:
return Message{}, errServiceShutdown
case msg, ok := <-data.Reply:
if !ok {
return msg, pendingWaitClosedErrorWith(stopCh, clientTransportDetachedError(c))
}
return msg, nil
}
}
func (c *ClientCommon) SendObjCtx(ctx context.Context, key string, val interface{}) (Message, error) {
data, err := c.sequenceEn(val)
if err != nil {
return Message{}, err
}
return c.sendCtx(TransferMsg{
Key: key,
Value: data,
Type: MSG_SYNC_ASK,
}, ctx)
}
func (c *ClientCommon) SendObj(key string, val interface{}) error {
data, err := encode(val)
if err != nil {
return err
}
_, err = c.send(TransferMsg{
Key: key,
Value: data,
Type: MSG_ASYNC,
})
return err
}
func (c *ClientCommon) SendCtx(ctx context.Context, key string, value MsgVal) (Message, error) {
return c.sendCtx(TransferMsg{
Key: key,
Value: value,
Type: MSG_SYNC_ASK,
}, ctx)
}
func (c *ClientCommon) SendWait(key string, value MsgVal, timeout time.Duration) (Message, error) {
return c.sendWait(TransferMsg{
Key: key,
Value: value,
Type: MSG_SYNC_ASK,
}, timeout)
}
func (c *ClientCommon) SendWaitObj(key string, value interface{}, timeout time.Duration) (Message, error) {
data, err := c.sequenceEn(value)
if err != nil {
return Message{}, err
}
return c.SendWait(key, data, timeout)
}
func (c *ClientCommon) Reply(m Message, value MsgVal) error {
return m.Reply(value)
}