194 lines
4.6 KiB
Go
194 lines
4.6 KiB
Go
|
|
package notify
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"fmt"
|
||
|
|
"os"
|
||
|
|
"sync/atomic"
|
||
|
|
"time"
|
||
|
|
)
|
||
|
|
|
||
|
|
func (c *ClientCommon) send(msg TransferMsg) (WaitMsg, error) {
|
||
|
|
if err := c.ensureClientSendReady(); err != nil {
|
||
|
|
return WaitMsg{}, err
|
||
|
|
}
|
||
|
|
var wait WaitMsg
|
||
|
|
if msg.Type != MSG_SYNC_REPLY && msg.Type != MSG_KEY_CHANGE && msg.Type != MSG_SYS_REPLY || msg.ID == 0 {
|
||
|
|
msg.ID = atomic.AddUint64(&c.msgID, 1)
|
||
|
|
}
|
||
|
|
env, err := wrapTransferMsgEnvelope(msg, c.sequenceEn)
|
||
|
|
if err != nil {
|
||
|
|
return WaitMsg{}, err
|
||
|
|
}
|
||
|
|
if requiresSignalReplyWait(msg) {
|
||
|
|
wait = c.getPendingWaitPool().createAndStore(msg)
|
||
|
|
}
|
||
|
|
err = c.sendSignalEnvelopeMaybeReliable(env, msg)
|
||
|
|
if err != nil {
|
||
|
|
if requiresSignalReplyWait(msg) {
|
||
|
|
c.getPendingWaitPool().removeAndClose(msg.ID)
|
||
|
|
}
|
||
|
|
return WaitMsg{}, err
|
||
|
|
}
|
||
|
|
return wait, err
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) sendEnvelope(env Envelope) error {
|
||
|
|
if err := c.ensureClientSendReady(); err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
payload, err := c.encodeEnvelopePayload(env)
|
||
|
|
if err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
if batchedControlEnvelope(env) {
|
||
|
|
return c.writeControlPayloadToTransport(payload)
|
||
|
|
}
|
||
|
|
return c.writePayloadToTransport(payload)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) dispatchEnvelope(env Envelope, now time.Time) {
|
||
|
|
switch env.Kind {
|
||
|
|
case EnvelopeSignalAck:
|
||
|
|
if c.handleSignalAckEnvelope(env) {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
case EnvelopeStreamData:
|
||
|
|
c.dispatchStreamEnvelope(env)
|
||
|
|
return
|
||
|
|
case EnvelopeSignal:
|
||
|
|
transfer, err := unwrapTransferMsgEnvelope(env, c.sequenceDe)
|
||
|
|
if err != nil {
|
||
|
|
if c.showError || c.debugMode {
|
||
|
|
fmt.Println("client unwrap signal envelope error", err)
|
||
|
|
}
|
||
|
|
return
|
||
|
|
}
|
||
|
|
if c.handleReceivedSignalReliability(transfer) {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
message := Message{
|
||
|
|
ServerConn: c,
|
||
|
|
TransferMsg: transfer,
|
||
|
|
NetType: NET_CLIENT,
|
||
|
|
Time: now,
|
||
|
|
}
|
||
|
|
c.dispatchMsg(message)
|
||
|
|
case EnvelopeFileMeta, EnvelopeFileChunk, EnvelopeFileEnd, EnvelopeFileAbort, EnvelopeAck:
|
||
|
|
c.dispatchFileEnvelope(env, now)
|
||
|
|
default:
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) Send(key string, value MsgVal) error {
|
||
|
|
_, err := c.send(TransferMsg{
|
||
|
|
Key: key,
|
||
|
|
Value: value,
|
||
|
|
Type: MSG_ASYNC,
|
||
|
|
})
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) sendWait(msg TransferMsg, timeout time.Duration) (Message, error) {
|
||
|
|
data, err := c.send(msg)
|
||
|
|
if err != nil {
|
||
|
|
return Message{}, err
|
||
|
|
}
|
||
|
|
stopCh := sessionStopChan(c.clientStopContextSnapshot())
|
||
|
|
if timeout.Seconds() == 0 {
|
||
|
|
msg, ok := <-data.Reply
|
||
|
|
if !ok {
|
||
|
|
return msg, pendingWaitClosedErrorWith(stopCh, clientTransportDetachedError(c))
|
||
|
|
}
|
||
|
|
return msg, nil
|
||
|
|
}
|
||
|
|
select {
|
||
|
|
case <-time.After(timeout):
|
||
|
|
c.getPendingWaitPool().removeAndClose(data.TransferMsg.ID)
|
||
|
|
return Message{}, os.ErrDeadlineExceeded
|
||
|
|
case <-stopCh:
|
||
|
|
return Message{}, errServiceShutdown
|
||
|
|
case msg, ok := <-data.Reply:
|
||
|
|
if !ok {
|
||
|
|
return msg, pendingWaitClosedErrorWith(stopCh, clientTransportDetachedError(c))
|
||
|
|
}
|
||
|
|
return msg, nil
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) sendCtx(msg TransferMsg, ctx context.Context) (Message, error) {
|
||
|
|
data, err := c.send(msg)
|
||
|
|
if err != nil {
|
||
|
|
return Message{}, err
|
||
|
|
}
|
||
|
|
stopCh := sessionStopChan(c.clientStopContextSnapshot())
|
||
|
|
if ctx == nil {
|
||
|
|
ctx = context.Background()
|
||
|
|
}
|
||
|
|
select {
|
||
|
|
case <-ctx.Done():
|
||
|
|
c.getPendingWaitPool().removeAndClose(data.TransferMsg.ID)
|
||
|
|
return Message{}, normalizeStreamDeadlineError(ctx.Err())
|
||
|
|
case <-stopCh:
|
||
|
|
return Message{}, errServiceShutdown
|
||
|
|
case msg, ok := <-data.Reply:
|
||
|
|
if !ok {
|
||
|
|
return msg, pendingWaitClosedErrorWith(stopCh, clientTransportDetachedError(c))
|
||
|
|
}
|
||
|
|
return msg, nil
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) SendObjCtx(ctx context.Context, key string, val interface{}) (Message, error) {
|
||
|
|
data, err := c.sequenceEn(val)
|
||
|
|
if err != nil {
|
||
|
|
return Message{}, err
|
||
|
|
}
|
||
|
|
return c.sendCtx(TransferMsg{
|
||
|
|
Key: key,
|
||
|
|
Value: data,
|
||
|
|
Type: MSG_SYNC_ASK,
|
||
|
|
}, ctx)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) SendObj(key string, val interface{}) error {
|
||
|
|
data, err := encode(val)
|
||
|
|
if err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
_, err = c.send(TransferMsg{
|
||
|
|
Key: key,
|
||
|
|
Value: data,
|
||
|
|
Type: MSG_ASYNC,
|
||
|
|
})
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) SendCtx(ctx context.Context, key string, value MsgVal) (Message, error) {
|
||
|
|
return c.sendCtx(TransferMsg{
|
||
|
|
Key: key,
|
||
|
|
Value: value,
|
||
|
|
Type: MSG_SYNC_ASK,
|
||
|
|
}, ctx)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) SendWait(key string, value MsgVal, timeout time.Duration) (Message, error) {
|
||
|
|
return c.sendWait(TransferMsg{
|
||
|
|
Key: key,
|
||
|
|
Value: value,
|
||
|
|
Type: MSG_SYNC_ASK,
|
||
|
|
}, timeout)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) SendWaitObj(key string, value interface{}, timeout time.Duration) (Message, error) {
|
||
|
|
data, err := c.sequenceEn(value)
|
||
|
|
if err != nil {
|
||
|
|
return Message{}, err
|
||
|
|
}
|
||
|
|
return c.SendWait(key, data, timeout)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) Reply(m Message, value MsgVal) error {
|
||
|
|
return m.Reply(value)
|
||
|
|
}
|