notify/transfer_control.go

633 lines
26 KiB
Go
Raw Permalink Normal View History

package notify
import (
"context"
"errors"
)
// Signal keys of the transfer control protocol. Receivers register their
// callbacks under these keys (see bindTransferControlLinks) and the
// SendTransfer* helpers send their requests under them.
const (
// TransferBeginSignalKey is the key used for begin requests.
TransferBeginSignalKey = "notify.transfer.begin"
// TransferResumeSignalKey is the key used for resume requests.
TransferResumeSignalKey = "notify.transfer.resume"
// TransferCommitSignalKey is the key used for commit requests.
TransferCommitSignalKey = "notify.transfer.commit"
// TransferAbortSignalKey is the key used for abort requests.
TransferAbortSignalKey = "notify.transfer.abort"
)
// TransferRange is an Offset/Length pair. It is the element type of the
// Missing lists carried by begin and resume responses.
// NOTE(review): Offset and Length are presumably byte counts — confirm.
type TransferRange struct {
Offset int64
Length int64
}
// TransferBeginRequest is the payload sent under TransferBeginSignalKey.
type TransferBeginRequest struct {
// TransferID is copied into the response by the bound handler wrapper when
// the handler leaves the response's TransferID empty.
TransferID string
// Channel's type, TransferChannel, is declared outside this section.
Channel TransferChannel
// NOTE(review): Size and Checksum presumably describe the complete payload
// being transferred — confirm against the handler implementations.
Size int64
Checksum string
Metadata map[string]string
}
// TransferBeginResponse is the reply to a TransferBeginRequest.
//
// When decoded on the sending side, a non-empty Error is turned into a Go
// error, and Accepted == false with an empty Error yields the error
// "transfer begin rejected".
type TransferBeginResponse struct {
TransferID string
Accepted bool
// NOTE(review): NextOffset and Missing presumably tell the sender which
// data the receiver still needs — confirm.
NextOffset int64
Missing []TransferRange
// Error carries the failure text. The bound handler wrapper fills it from
// the handler's returned error when the handler left it empty.
Error string
}
// TransferResumeRequest is the payload sent under TransferResumeSignalKey.
// It carries only the TransferID.
type TransferResumeRequest struct {
TransferID string
}
// TransferResumeResponse is the reply to a TransferResumeRequest. It has the
// same fields as TransferBeginResponse.
//
// When decoded on the sending side, a non-empty Error is turned into a Go
// error, and Accepted == false with an empty Error yields the error
// "transfer resume rejected".
type TransferResumeResponse struct {
TransferID string
Accepted bool
// NOTE(review): NextOffset and Missing presumably tell the sender where to
// continue — confirm.
NextOffset int64
Missing []TransferRange
// Error carries the failure text. The bound handler wrapper fills it from
// the handler's returned error when the handler left it empty.
Error string
}
// TransferCommitRequest is the payload sent under TransferCommitSignalKey.
// NOTE(review): Size and Checksum presumably let the receiver validate the
// completed transfer — confirm.
type TransferCommitRequest struct {
TransferID string
Size int64
Checksum string
}
// TransferCommitResponse is the reply to a TransferCommitRequest.
//
// When decoded on the sending side, a non-empty Error is turned into a Go
// error, and Accepted == false with an empty Error yields the error
// "transfer commit rejected".
type TransferCommitResponse struct {
TransferID string
Accepted bool
// Error carries the failure text. The bound handler wrapper fills it from
// the handler's returned error when the handler left it empty.
Error string
}
// TransferAbortRequest is the payload sent under TransferAbortSignalKey.
// The full request is also handed to transferControlFinishAbort alongside the
// response.
// NOTE(review): Stage, Offset and Error presumably describe where and why the
// transfer was aborted — confirm.
type TransferAbortRequest struct {
TransferID string
Stage string
Offset int64
Error string
}
// TransferAbortResponse is the reply to a TransferAbortRequest.
//
// When decoded on the sending side, a non-empty Error is turned into a Go
// error, and Accepted == false with an empty Error yields the error
// "transfer abort rejected".
type TransferAbortResponse struct {
TransferID string
Accepted bool
// Error carries the failure text. The bound handler wrapper fills it from
// the handler's returned error when the handler left it empty.
Error string
}
// TransferControlHandler groups the callbacks that answer incoming transfer
// control requests. Each callback receives the incoming message and the
// decoded request, and returns the response to send back plus an optional
// error.
//
// Nil callbacks are skipped when binding; a handler whose four callbacks are
// all nil is rejected by the Bind* functions with
// errTransferControlHandlerEmpty.
type TransferControlHandler struct {
Begin func(*Message, TransferBeginRequest) (TransferBeginResponse, error)
Resume func(*Message, TransferResumeRequest) (TransferResumeResponse, error)
Commit func(*Message, TransferCommitRequest) (TransferCommitResponse, error)
Abort func(*Message, TransferAbortRequest) (TransferAbortResponse, error)
}
// Sentinel errors returned by the transfer control bind and send helpers when
// a required argument is missing.
var (
// errTransferControlClientNil is returned when the Client argument is nil.
errTransferControlClientNil = errors.New("transfer control client is nil")
// errTransferControlServerNil is returned when the Server argument is nil.
errTransferControlServerNil = errors.New("transfer control server is nil")
// NOTE(review): errTransferControlClientConnNil is not referenced in the
// visible part of this file — confirm it is used elsewhere.
errTransferControlClientConnNil = errors.New("transfer control client connection is nil")
// errTransferControlLogicalConnNil is returned when no logical connection
// is available for a server-side send.
errTransferControlLogicalConnNil = errors.New("transfer control logical connection is nil")
// errTransferControlTransportNil is returned when the TransportConn
// argument is nil.
errTransferControlTransportNil = errors.New("transfer control transport connection is nil")
// errTransferControlHandlerEmpty is returned when a handler has no callbacks.
errTransferControlHandlerEmpty = errors.New("transfer control handler is empty")
)
// BindTransferControlClient registers the non-nil callbacks of handler on the
// client's links, one link per transfer control signal key.
//
// It returns errTransferControlClientNil when c is nil and
// errTransferControlHandlerEmpty when handler has no callback set. Incoming
// messages are scoped with clientFileScope() for both the runtime scope and
// the public scope, and the transport generation is always reported as 0.
func BindTransferControlClient(c Client, handler TransferControlHandler) error {
	switch {
	case c == nil:
		return errTransferControlClientNil
	case handler.empty():
		return errTransferControlHandlerEmpty
	}

	// The client side has a single fixed scope, so one resolver serves as both
	// the runtime-scope and the public-scope callback.
	scopeOf := func(*Message) string { return clientFileScope() }
	generationOf := func(*Message) uint64 { return 0 }

	bindTransferControlLinks(c.SetLink, transferControlRuntimeFromClient(c), scopeOf, scopeOf, generationOf, handler)
	return nil
}
// BindTransferControlServer registers the non-nil callbacks of handler on the
// server's links, one link per transfer control signal key.
//
// It returns errTransferControlServerNil when s is nil and
// errTransferControlHandlerEmpty when handler has no callback set.
//
// Scopes and the transport generation are resolved per incoming message from
// the message's transport connection snapshot when there is one, otherwise
// from its logical connection snapshot.
func BindTransferControlServer(s Server, handler TransferControlHandler) error {
	if s == nil {
		return errTransferControlServerNil
	}
	if handler.empty() {
		return errTransferControlHandlerEmpty
	}

	// Runtime scope: prefer the transport connection, then the logical one.
	runtimeScope := func(msg *Message) string {
		if transport := messageTransportConnSnapshot(msg); transport != nil {
			return serverTransportScopeForTransport(transport)
		}
		if logical := messageLogicalConnSnapshot(msg); logical != nil {
			return serverTransportScope(logical)
		}
		return serverFileDomain + ":unknown"
	}

	// Public scope: derived from the logical connection only.
	publicScope := func(msg *Message) string {
		if logical := messageLogicalConnSnapshot(msg); logical != nil {
			return serverFileScope(logical)
		}
		return serverFileDomain + ":unknown"
	}

	// Transport generation: 0 when neither connection snapshot is available.
	// (The former explicit msg == nil branch returned the same 0 and was
	// redundant.)
	transportGeneration := func(msg *Message) uint64 {
		if transport := messageTransportConnSnapshot(msg); transport != nil {
			return transport.TransportGeneration()
		}
		if logical := messageLogicalConnSnapshot(msg); logical != nil {
			return logical.transportGenerationSnapshot()
		}
		return 0
	}

	bindTransferControlLinks(s.SetLink, transferControlRuntimeFromServer(s), runtimeScope, publicScope, transportGeneration, handler)
	return nil
}
// SendTransferBeginClient sends req to the peer under TransferBeginSignalKey
// and decodes the reply.
//
// It returns errTransferControlClientNil when c is nil. A send failure, a
// decode failure, or a reply that carries an error or is not accepted is
// returned as an error after being reported through
// transferControlFinishBegin. On a send failure the returned response is the
// zero value.
func SendTransferBeginClient(ctx context.Context, c Client, req TransferBeginRequest) (TransferBeginResponse, error) {
	// Reject a nil client before touching the transfer runtime, matching the
	// up-front argument checks of the server-side senders.
	if c == nil {
		return TransferBeginResponse{}, errTransferControlClientNil
	}

	runtime := transferControlRuntimeFromClient(c)
	runtimeScope := clientFileScope()
	publicScope := clientFileScope()
	transferControlPrepareBegin(runtime, fileTransferDirectionSend, runtimeScope, publicScope, 0, req)

	var resp TransferBeginResponse
	msg, err := sendTransferControlClient(ctx, c, TransferBeginSignalKey, req)
	if err == nil {
		resp, err = decodeTransferBeginResponse(msg)
	}
	transferControlFinishBegin(runtime, fileTransferDirectionSend, runtimeScope, req.TransferID, resp, err)
	return resp, err
}
// SendTransferResumeClient sends req to the peer under
// TransferResumeSignalKey and decodes the reply.
//
// It returns errTransferControlClientNil when c is nil. A send failure, a
// decode failure, or a reply that carries an error or is not accepted is
// returned as an error after being reported through
// transferControlFinishResume. On a send failure the returned response is the
// zero value.
func SendTransferResumeClient(ctx context.Context, c Client, req TransferResumeRequest) (TransferResumeResponse, error) {
	// Reject a nil client before touching the transfer runtime, matching the
	// up-front argument checks of the server-side senders.
	if c == nil {
		return TransferResumeResponse{}, errTransferControlClientNil
	}

	runtime := transferControlRuntimeFromClient(c)
	runtimeScope := clientFileScope()
	publicScope := clientFileScope()
	transferControlPrepareResume(runtime, fileTransferDirectionSend, runtimeScope, publicScope, 0, req)

	var resp TransferResumeResponse
	msg, err := sendTransferControlClient(ctx, c, TransferResumeSignalKey, req)
	if err == nil {
		resp, err = decodeTransferResumeResponse(msg)
	}
	transferControlFinishResume(runtime, fileTransferDirectionSend, runtimeScope, req.TransferID, resp, err)
	return resp, err
}
// SendTransferCommitClient sends req to the peer under
// TransferCommitSignalKey and decodes the reply.
//
// It returns errTransferControlClientNil when c is nil. A send failure, a
// decode failure, or a reply that carries an error or is not accepted is
// returned as an error after being reported through
// transferControlFinishCommit. On a send failure the returned response is the
// zero value.
func SendTransferCommitClient(ctx context.Context, c Client, req TransferCommitRequest) (TransferCommitResponse, error) {
	// Reject a nil client before touching the transfer runtime, matching the
	// up-front argument checks of the server-side senders.
	if c == nil {
		return TransferCommitResponse{}, errTransferControlClientNil
	}

	runtime := transferControlRuntimeFromClient(c)
	runtimeScope := clientFileScope()
	publicScope := clientFileScope()
	transferControlPrepareCommit(runtime, fileTransferDirectionSend, runtimeScope, publicScope, 0, req)

	var resp TransferCommitResponse
	msg, err := sendTransferControlClient(ctx, c, TransferCommitSignalKey, req)
	if err == nil {
		resp, err = decodeTransferCommitResponse(msg)
	}
	transferControlFinishCommit(runtime, fileTransferDirectionSend, runtimeScope, req.TransferID, resp, err)
	return resp, err
}
// SendTransferAbortClient sends req to the peer under TransferAbortSignalKey
// and decodes the reply.
//
// It returns errTransferControlClientNil when c is nil. A send failure, a
// decode failure, or a reply that carries an error or is not accepted is
// returned as an error after being reported, together with the request,
// through transferControlFinishAbort. On a send failure the returned response
// is the zero value.
func SendTransferAbortClient(ctx context.Context, c Client, req TransferAbortRequest) (TransferAbortResponse, error) {
	// Reject a nil client before touching the transfer runtime, matching the
	// up-front argument checks of the server-side senders.
	if c == nil {
		return TransferAbortResponse{}, errTransferControlClientNil
	}

	runtime := transferControlRuntimeFromClient(c)
	runtimeScope := clientFileScope()
	publicScope := clientFileScope()
	transferControlPrepareAbort(runtime, fileTransferDirectionSend, runtimeScope, publicScope, 0, req)

	var resp TransferAbortResponse
	msg, err := sendTransferControlClient(ctx, c, TransferAbortSignalKey, req)
	if err == nil {
		resp, err = decodeTransferAbortResponse(msg)
	}
	transferControlFinishAbort(runtime, fileTransferDirectionSend, runtimeScope, req.TransferID, req, resp, err)
	return resp, err
}
// SendTransferBeginServer sends a begin request from the server to the peer
// behind c. It maps c to its logical connection and delegates to
// SendTransferBeginLogical.
func SendTransferBeginServer(ctx context.Context, s Server, c *ClientConn, req TransferBeginRequest) (TransferBeginResponse, error) {
	logical := logicalConnFromClient(c)
	return SendTransferBeginLogical(ctx, s, logical, req)
}
// SendTransferBeginLogical sends a begin request from the server over the
// logical connection c.
//
// It returns errTransferControlServerNil when s is nil and
// errTransferControlLogicalConnNil when c is nil. Scopes and the transport
// generation are taken from c before the request is sent.
func SendTransferBeginLogical(ctx context.Context, s Server, c *LogicalConn, req TransferBeginRequest) (TransferBeginResponse, error) {
	switch {
	case s == nil:
		return TransferBeginResponse{}, errTransferControlServerNil
	case c == nil:
		return TransferBeginResponse{}, errTransferControlLogicalConnNil
	}
	return sendTransferBeginPrepared(
		ctx, s, c,
		transferControlRuntimeFromServer(s),
		serverTransportScope(c),
		serverFileScope(c),
		c.transportGenerationSnapshot(),
		req,
	)
}
// SendTransferBeginTransport sends a begin request from the server over the
// transport connection t.
//
// It returns errTransferControlServerNil when s is nil,
// errTransferControlTransportNil when t is nil, and
// errTransferControlLogicalConnNil when t has no logical connection. The
// runtime scope and generation come from t; the public scope comes from t's
// logical connection.
func SendTransferBeginTransport(ctx context.Context, s Server, t *TransportConn, req TransferBeginRequest) (TransferBeginResponse, error) {
	var none TransferBeginResponse
	switch {
	case s == nil:
		return none, errTransferControlServerNil
	case t == nil:
		return none, errTransferControlTransportNil
	}
	owner := t.LogicalConn()
	if owner == nil {
		return none, errTransferControlLogicalConnNil
	}
	return sendTransferBeginPreparedTransport(
		ctx, s, t,
		transferControlRuntimeFromServer(s),
		serverTransportScopeForTransport(t),
		serverFileScope(owner),
		t.TransportGeneration(),
		req,
	)
}
// sendTransferBeginPrepared announces the begin request to the transfer
// runtime, sends it over the logical connection c, decodes the reply and
// reports the outcome back to the runtime. On a send failure the returned
// response is the zero value.
func sendTransferBeginPrepared(ctx context.Context, s Server, c *LogicalConn, runtime *transferRuntime, runtimeScope string, publicScope string, transportGeneration uint64, req TransferBeginRequest) (TransferBeginResponse, error) {
	transferControlPrepareBegin(runtime, fileTransferDirectionSend, runtimeScope, publicScope, transportGeneration, req)

	var resp TransferBeginResponse
	reply, err := sendTransferControlServerLogical(ctx, s, c, TransferBeginSignalKey, req)
	if err == nil {
		// Only a delivered request has a reply worth decoding.
		resp, err = decodeTransferBeginResponse(reply)
	}
	transferControlFinishBegin(runtime, fileTransferDirectionSend, runtimeScope, req.TransferID, resp, err)
	return resp, err
}
// sendTransferBeginPreparedTransport announces the begin request to the
// transfer runtime, sends it over the transport connection t, decodes the
// reply and reports the outcome back to the runtime. On a send failure the
// returned response is the zero value.
func sendTransferBeginPreparedTransport(ctx context.Context, s Server, t *TransportConn, runtime *transferRuntime, runtimeScope string, publicScope string, transportGeneration uint64, req TransferBeginRequest) (TransferBeginResponse, error) {
	transferControlPrepareBegin(runtime, fileTransferDirectionSend, runtimeScope, publicScope, transportGeneration, req)

	var resp TransferBeginResponse
	reply, err := sendTransferControlServerTransport(ctx, s, t, TransferBeginSignalKey, req)
	if err == nil {
		// Only a delivered request has a reply worth decoding.
		resp, err = decodeTransferBeginResponse(reply)
	}
	transferControlFinishBegin(runtime, fileTransferDirectionSend, runtimeScope, req.TransferID, resp, err)
	return resp, err
}
// SendTransferResumeServer sends a resume request from the server to the peer
// behind c. It maps c to its logical connection and delegates to
// SendTransferResumeLogical.
func SendTransferResumeServer(ctx context.Context, s Server, c *ClientConn, req TransferResumeRequest) (TransferResumeResponse, error) {
	logical := logicalConnFromClient(c)
	return SendTransferResumeLogical(ctx, s, logical, req)
}
// SendTransferResumeLogical sends a resume request from the server over the
// logical connection c.
//
// It returns errTransferControlServerNil when s is nil and
// errTransferControlLogicalConnNil when c is nil. Scopes and the transport
// generation are taken from c before the request is sent.
func SendTransferResumeLogical(ctx context.Context, s Server, c *LogicalConn, req TransferResumeRequest) (TransferResumeResponse, error) {
	switch {
	case s == nil:
		return TransferResumeResponse{}, errTransferControlServerNil
	case c == nil:
		return TransferResumeResponse{}, errTransferControlLogicalConnNil
	}
	return sendTransferResumePrepared(
		ctx, s, c,
		transferControlRuntimeFromServer(s),
		serverTransportScope(c),
		serverFileScope(c),
		c.transportGenerationSnapshot(),
		req,
	)
}
// SendTransferResumeTransport sends a resume request from the server over the
// transport connection t.
//
// It returns errTransferControlServerNil when s is nil,
// errTransferControlTransportNil when t is nil, and
// errTransferControlLogicalConnNil when t has no logical connection. The
// runtime scope and generation come from t; the public scope comes from t's
// logical connection.
func SendTransferResumeTransport(ctx context.Context, s Server, t *TransportConn, req TransferResumeRequest) (TransferResumeResponse, error) {
	var none TransferResumeResponse
	switch {
	case s == nil:
		return none, errTransferControlServerNil
	case t == nil:
		return none, errTransferControlTransportNil
	}
	owner := t.LogicalConn()
	if owner == nil {
		return none, errTransferControlLogicalConnNil
	}
	return sendTransferResumePreparedTransport(
		ctx, s, t,
		transferControlRuntimeFromServer(s),
		serverTransportScopeForTransport(t),
		serverFileScope(owner),
		t.TransportGeneration(),
		req,
	)
}
// sendTransferResumePrepared announces the resume request to the transfer
// runtime, sends it over the logical connection c, decodes the reply and
// reports the outcome back to the runtime. On a send failure the returned
// response is the zero value.
func sendTransferResumePrepared(ctx context.Context, s Server, c *LogicalConn, runtime *transferRuntime, runtimeScope string, publicScope string, transportGeneration uint64, req TransferResumeRequest) (TransferResumeResponse, error) {
	transferControlPrepareResume(runtime, fileTransferDirectionSend, runtimeScope, publicScope, transportGeneration, req)

	var resp TransferResumeResponse
	reply, err := sendTransferControlServerLogical(ctx, s, c, TransferResumeSignalKey, req)
	if err == nil {
		// Only a delivered request has a reply worth decoding.
		resp, err = decodeTransferResumeResponse(reply)
	}
	transferControlFinishResume(runtime, fileTransferDirectionSend, runtimeScope, req.TransferID, resp, err)
	return resp, err
}
// sendTransferResumePreparedTransport announces the resume request to the
// transfer runtime, sends it over the transport connection t, decodes the
// reply and reports the outcome back to the runtime. On a send failure the
// returned response is the zero value.
func sendTransferResumePreparedTransport(ctx context.Context, s Server, t *TransportConn, runtime *transferRuntime, runtimeScope string, publicScope string, transportGeneration uint64, req TransferResumeRequest) (TransferResumeResponse, error) {
	transferControlPrepareResume(runtime, fileTransferDirectionSend, runtimeScope, publicScope, transportGeneration, req)

	var resp TransferResumeResponse
	reply, err := sendTransferControlServerTransport(ctx, s, t, TransferResumeSignalKey, req)
	if err == nil {
		// Only a delivered request has a reply worth decoding.
		resp, err = decodeTransferResumeResponse(reply)
	}
	transferControlFinishResume(runtime, fileTransferDirectionSend, runtimeScope, req.TransferID, resp, err)
	return resp, err
}
// SendTransferCommitServer sends a commit request from the server to the peer
// behind c. It maps c to its logical connection and delegates to
// SendTransferCommitLogical.
func SendTransferCommitServer(ctx context.Context, s Server, c *ClientConn, req TransferCommitRequest) (TransferCommitResponse, error) {
	logical := logicalConnFromClient(c)
	return SendTransferCommitLogical(ctx, s, logical, req)
}
// SendTransferCommitLogical sends a commit request from the server over the
// logical connection c.
//
// It returns errTransferControlServerNil when s is nil and
// errTransferControlLogicalConnNil when c is nil. Scopes and the transport
// generation are taken from c before the request is sent.
func SendTransferCommitLogical(ctx context.Context, s Server, c *LogicalConn, req TransferCommitRequest) (TransferCommitResponse, error) {
	switch {
	case s == nil:
		return TransferCommitResponse{}, errTransferControlServerNil
	case c == nil:
		return TransferCommitResponse{}, errTransferControlLogicalConnNil
	}
	return sendTransferCommitPrepared(
		ctx, s, c,
		transferControlRuntimeFromServer(s),
		serverTransportScope(c),
		serverFileScope(c),
		c.transportGenerationSnapshot(),
		req,
	)
}
// SendTransferCommitTransport sends a commit request from the server over the
// transport connection t.
//
// It returns errTransferControlServerNil when s is nil,
// errTransferControlTransportNil when t is nil, and
// errTransferControlLogicalConnNil when t has no logical connection. The
// runtime scope and generation come from t; the public scope comes from t's
// logical connection.
func SendTransferCommitTransport(ctx context.Context, s Server, t *TransportConn, req TransferCommitRequest) (TransferCommitResponse, error) {
	var none TransferCommitResponse
	switch {
	case s == nil:
		return none, errTransferControlServerNil
	case t == nil:
		return none, errTransferControlTransportNil
	}
	owner := t.LogicalConn()
	if owner == nil {
		return none, errTransferControlLogicalConnNil
	}
	return sendTransferCommitPreparedTransport(
		ctx, s, t,
		transferControlRuntimeFromServer(s),
		serverTransportScopeForTransport(t),
		serverFileScope(owner),
		t.TransportGeneration(),
		req,
	)
}
// sendTransferCommitPrepared announces the commit request to the transfer
// runtime, sends it over the logical connection c, decodes the reply and
// reports the outcome back to the runtime. On a send failure the returned
// response is the zero value.
func sendTransferCommitPrepared(ctx context.Context, s Server, c *LogicalConn, runtime *transferRuntime, runtimeScope string, publicScope string, transportGeneration uint64, req TransferCommitRequest) (TransferCommitResponse, error) {
	transferControlPrepareCommit(runtime, fileTransferDirectionSend, runtimeScope, publicScope, transportGeneration, req)

	var resp TransferCommitResponse
	reply, err := sendTransferControlServerLogical(ctx, s, c, TransferCommitSignalKey, req)
	if err == nil {
		// Only a delivered request has a reply worth decoding.
		resp, err = decodeTransferCommitResponse(reply)
	}
	transferControlFinishCommit(runtime, fileTransferDirectionSend, runtimeScope, req.TransferID, resp, err)
	return resp, err
}
// sendTransferCommitPreparedTransport announces the commit request to the
// transfer runtime, sends it over the transport connection t, decodes the
// reply and reports the outcome back to the runtime. On a send failure the
// returned response is the zero value.
func sendTransferCommitPreparedTransport(ctx context.Context, s Server, t *TransportConn, runtime *transferRuntime, runtimeScope string, publicScope string, transportGeneration uint64, req TransferCommitRequest) (TransferCommitResponse, error) {
	transferControlPrepareCommit(runtime, fileTransferDirectionSend, runtimeScope, publicScope, transportGeneration, req)

	var resp TransferCommitResponse
	reply, err := sendTransferControlServerTransport(ctx, s, t, TransferCommitSignalKey, req)
	if err == nil {
		// Only a delivered request has a reply worth decoding.
		resp, err = decodeTransferCommitResponse(reply)
	}
	transferControlFinishCommit(runtime, fileTransferDirectionSend, runtimeScope, req.TransferID, resp, err)
	return resp, err
}
// SendTransferAbortServer sends an abort request from the server to the peer
// behind c. It maps c to its logical connection and delegates to
// SendTransferAbortLogical.
func SendTransferAbortServer(ctx context.Context, s Server, c *ClientConn, req TransferAbortRequest) (TransferAbortResponse, error) {
	logical := logicalConnFromClient(c)
	return SendTransferAbortLogical(ctx, s, logical, req)
}
// SendTransferAbortLogical sends an abort request from the server over the
// logical connection c.
//
// It returns errTransferControlServerNil when s is nil and
// errTransferControlLogicalConnNil when c is nil. Scopes and the transport
// generation are taken from c before the request is sent.
func SendTransferAbortLogical(ctx context.Context, s Server, c *LogicalConn, req TransferAbortRequest) (TransferAbortResponse, error) {
	switch {
	case s == nil:
		return TransferAbortResponse{}, errTransferControlServerNil
	case c == nil:
		return TransferAbortResponse{}, errTransferControlLogicalConnNil
	}
	return sendTransferAbortPrepared(
		ctx, s, c,
		transferControlRuntimeFromServer(s),
		serverTransportScope(c),
		serverFileScope(c),
		c.transportGenerationSnapshot(),
		req,
	)
}
// SendTransferAbortTransport sends an abort request from the server over the
// transport connection t.
//
// It returns errTransferControlServerNil when s is nil,
// errTransferControlTransportNil when t is nil, and
// errTransferControlLogicalConnNil when t has no logical connection. The
// runtime scope and generation come from t; the public scope comes from t's
// logical connection.
func SendTransferAbortTransport(ctx context.Context, s Server, t *TransportConn, req TransferAbortRequest) (TransferAbortResponse, error) {
	var none TransferAbortResponse
	switch {
	case s == nil:
		return none, errTransferControlServerNil
	case t == nil:
		return none, errTransferControlTransportNil
	}
	owner := t.LogicalConn()
	if owner == nil {
		return none, errTransferControlLogicalConnNil
	}
	return sendTransferAbortPreparedTransport(
		ctx, s, t,
		transferControlRuntimeFromServer(s),
		serverTransportScopeForTransport(t),
		serverFileScope(owner),
		t.TransportGeneration(),
		req,
	)
}
// sendTransferAbortPrepared announces the abort request to the transfer
// runtime, sends it over the logical connection c, decodes the reply and
// reports the outcome (together with the request) back to the runtime. On a
// send failure the returned response is the zero value.
func sendTransferAbortPrepared(ctx context.Context, s Server, c *LogicalConn, runtime *transferRuntime, runtimeScope string, publicScope string, transportGeneration uint64, req TransferAbortRequest) (TransferAbortResponse, error) {
	transferControlPrepareAbort(runtime, fileTransferDirectionSend, runtimeScope, publicScope, transportGeneration, req)

	var resp TransferAbortResponse
	reply, err := sendTransferControlServerLogical(ctx, s, c, TransferAbortSignalKey, req)
	if err == nil {
		// Only a delivered request has a reply worth decoding.
		resp, err = decodeTransferAbortResponse(reply)
	}
	transferControlFinishAbort(runtime, fileTransferDirectionSend, runtimeScope, req.TransferID, req, resp, err)
	return resp, err
}
// sendTransferAbortPreparedTransport announces the abort request to the
// transfer runtime, sends it over the transport connection t, decodes the
// reply and reports the outcome (together with the request) back to the
// runtime. On a send failure the returned response is the zero value.
func sendTransferAbortPreparedTransport(ctx context.Context, s Server, t *TransportConn, runtime *transferRuntime, runtimeScope string, publicScope string, transportGeneration uint64, req TransferAbortRequest) (TransferAbortResponse, error) {
	transferControlPrepareAbort(runtime, fileTransferDirectionSend, runtimeScope, publicScope, transportGeneration, req)

	var resp TransferAbortResponse
	reply, err := sendTransferControlServerTransport(ctx, s, t, TransferAbortSignalKey, req)
	if err == nil {
		// Only a delivered request has a reply worth decoding.
		resp, err = decodeTransferAbortResponse(reply)
	}
	transferControlFinishAbort(runtime, fileTransferDirectionSend, runtimeScope, req.TransferID, req, resp, err)
	return resp, err
}
// empty reports whether none of the four callbacks is set.
func (h TransferControlHandler) empty() bool {
	anySet := h.Begin != nil || h.Resume != nil || h.Commit != nil || h.Abort != nil
	return !anySet
}
// bindTransferControlLinks registers one link per non-nil callback of handler
// using setLink.
//
// Every registered link follows the same sequence for an incoming message:
//  1. decode the request from the message; on failure reply with a response
//     carrying only the decode error and stop (the runtime is not notified),
//  2. resolve runtime scope, public scope and transport generation for the
//     message and announce the request to the runtime (receive direction),
//  3. invoke the callback, default the response's TransferID to the
//     request's, and copy the callback error text into an empty Error field,
//  4. report the outcome to the runtime and send the response back.
//
// Reply errors are deliberately discarded: there is no caller to hand them to.
func bindTransferControlLinks(setLink func(string, func(*Message)), runtime *transferRuntime, runtimeScopeFn func(*Message) string, publicScopeFn func(*Message) string, transportGenerationFn func(*Message) uint64, handler TransferControlHandler) {
	if begin := handler.Begin; begin != nil {
		setLink(TransferBeginSignalKey, func(msg *Message) {
			var req TransferBeginRequest
			if decodeErr := msg.Value.Orm(&req); decodeErr != nil {
				_ = msg.ReplyObj(TransferBeginResponse{Error: decodeErr.Error()})
				return
			}

			rtScope := transferControlMessageScope(runtimeScopeFn, msg)
			pubScope := transferControlMessageScope(publicScopeFn, msg)
			generation := transferControlMessageGeneration(transportGenerationFn, msg)
			transferControlPrepareBegin(runtime, fileTransferDirectionReceive, rtScope, pubScope, generation, req)

			resp, callErr := begin(msg, req)
			if resp.TransferID == "" {
				resp.TransferID = req.TransferID
			}
			if callErr != nil && resp.Error == "" {
				resp.Error = callErr.Error()
			}
			outcome := transferControlResultError("begin", resp.Accepted, resp.Error, callErr)
			transferControlFinishBegin(runtime, fileTransferDirectionReceive, rtScope, req.TransferID, resp, outcome)
			_ = msg.ReplyObj(resp)
		})
	}

	if resume := handler.Resume; resume != nil {
		setLink(TransferResumeSignalKey, func(msg *Message) {
			var req TransferResumeRequest
			if decodeErr := msg.Value.Orm(&req); decodeErr != nil {
				_ = msg.ReplyObj(TransferResumeResponse{Error: decodeErr.Error()})
				return
			}

			rtScope := transferControlMessageScope(runtimeScopeFn, msg)
			pubScope := transferControlMessageScope(publicScopeFn, msg)
			generation := transferControlMessageGeneration(transportGenerationFn, msg)
			transferControlPrepareResume(runtime, fileTransferDirectionReceive, rtScope, pubScope, generation, req)

			resp, callErr := resume(msg, req)
			if resp.TransferID == "" {
				resp.TransferID = req.TransferID
			}
			if callErr != nil && resp.Error == "" {
				resp.Error = callErr.Error()
			}
			outcome := transferControlResultError("resume", resp.Accepted, resp.Error, callErr)
			transferControlFinishResume(runtime, fileTransferDirectionReceive, rtScope, req.TransferID, resp, outcome)
			_ = msg.ReplyObj(resp)
		})
	}

	if commit := handler.Commit; commit != nil {
		setLink(TransferCommitSignalKey, func(msg *Message) {
			var req TransferCommitRequest
			if decodeErr := msg.Value.Orm(&req); decodeErr != nil {
				_ = msg.ReplyObj(TransferCommitResponse{Error: decodeErr.Error()})
				return
			}

			rtScope := transferControlMessageScope(runtimeScopeFn, msg)
			pubScope := transferControlMessageScope(publicScopeFn, msg)
			generation := transferControlMessageGeneration(transportGenerationFn, msg)
			transferControlPrepareCommit(runtime, fileTransferDirectionReceive, rtScope, pubScope, generation, req)

			resp, callErr := commit(msg, req)
			if resp.TransferID == "" {
				resp.TransferID = req.TransferID
			}
			if callErr != nil && resp.Error == "" {
				resp.Error = callErr.Error()
			}
			outcome := transferControlResultError("commit", resp.Accepted, resp.Error, callErr)
			transferControlFinishCommit(runtime, fileTransferDirectionReceive, rtScope, req.TransferID, resp, outcome)
			_ = msg.ReplyObj(resp)
		})
	}

	if abort := handler.Abort; abort != nil {
		setLink(TransferAbortSignalKey, func(msg *Message) {
			var req TransferAbortRequest
			if decodeErr := msg.Value.Orm(&req); decodeErr != nil {
				_ = msg.ReplyObj(TransferAbortResponse{Error: decodeErr.Error()})
				return
			}

			rtScope := transferControlMessageScope(runtimeScopeFn, msg)
			pubScope := transferControlMessageScope(publicScopeFn, msg)
			generation := transferControlMessageGeneration(transportGenerationFn, msg)
			transferControlPrepareAbort(runtime, fileTransferDirectionReceive, rtScope, pubScope, generation, req)

			resp, callErr := abort(msg, req)
			if resp.TransferID == "" {
				resp.TransferID = req.TransferID
			}
			if callErr != nil && resp.Error == "" {
				resp.Error = callErr.Error()
			}
			outcome := transferControlResultError("abort", resp.Accepted, resp.Error, callErr)
			transferControlFinishAbort(runtime, fileTransferDirectionReceive, rtScope, req.TransferID, req, resp, outcome)
			_ = msg.ReplyObj(resp)
		})
	}
}
// transferControlMessageScope resolves the scope for msg: the result of
// scopeFn passed through normalizeFileScope, or defaultFileScope when scopeFn
// is nil.
func transferControlMessageScope(scopeFn func(*Message) string, msg *Message) string {
	if scopeFn != nil {
		raw := scopeFn(msg)
		return normalizeFileScope(raw)
	}
	return defaultFileScope
}
// transferControlMessageGeneration returns generationFn(msg), or 0 when
// generationFn is nil.
func transferControlMessageGeneration(generationFn func(*Message) uint64, msg *Message) uint64 {
	var generation uint64
	if generationFn != nil {
		generation = generationFn(msg)
	}
	return generation
}
// sendTransferControlClient sends req under key through the client's
// SendObjCtx and returns the reply. It returns errTransferControlClientNil
// when c is nil.
func sendTransferControlClient(ctx context.Context, c Client, key string, req interface{}) (Message, error) {
	if c != nil {
		return c.SendObjCtx(ctx, key, req)
	}
	return Message{}, errTransferControlClientNil
}
// sendTransferControlServer maps the client connection c to its logical
// connection and sends req under key via sendTransferControlServerLogical.
func sendTransferControlServer(ctx context.Context, s Server, c *ClientConn, key string, req interface{}) (Message, error) {
	logical := logicalConnFromClient(c)
	return sendTransferControlServerLogical(ctx, s, logical, key, req)
}
// sendTransferControlServerLogical sends req under key over the logical
// connection c through the server's SendObjCtxLogical. It returns
// errTransferControlServerNil when s is nil and
// errTransferControlLogicalConnNil when c is nil.
func sendTransferControlServerLogical(ctx context.Context, s Server, c *LogicalConn, key string, req interface{}) (Message, error) {
	switch {
	case s == nil:
		return Message{}, errTransferControlServerNil
	case c == nil:
		return Message{}, errTransferControlLogicalConnNil
	default:
		return s.SendObjCtxLogical(ctx, c, key, req)
	}
}
// sendTransferControlServerTransport sends req under key over the transport
// connection t through the server's SendObjCtxTransport. It returns
// errTransferControlServerNil when s is nil and
// errTransferControlTransportNil when t is nil.
func sendTransferControlServerTransport(ctx context.Context, s Server, t *TransportConn, key string, req interface{}) (Message, error) {
	switch {
	case s == nil:
		return Message{}, errTransferControlServerNil
	case t == nil:
		return Message{}, errTransferControlTransportNil
	default:
		return s.SendObjCtxTransport(ctx, t, key, req)
	}
}
// decodeTransferBeginResponse decodes msg into a TransferBeginResponse.
//
// A decode failure returns the zero response and the decode error. Otherwise
// the decoded response is returned together with the error derived from its
// Accepted and Error fields (nil only when accepted with no error text).
func decodeTransferBeginResponse(msg Message) (TransferBeginResponse, error) {
	decoded := TransferBeginResponse{}
	err := msg.Value.Orm(&decoded)
	if err != nil {
		return TransferBeginResponse{}, err
	}
	verdict := transferControlResultError("begin", decoded.Accepted, decoded.Error, nil)
	return decoded, verdict
}
// decodeTransferResumeResponse decodes msg into a TransferResumeResponse.
//
// A decode failure returns the zero response and the decode error. Otherwise
// the decoded response is returned together with the error derived from its
// Accepted and Error fields (nil only when accepted with no error text).
func decodeTransferResumeResponse(msg Message) (TransferResumeResponse, error) {
	decoded := TransferResumeResponse{}
	err := msg.Value.Orm(&decoded)
	if err != nil {
		return TransferResumeResponse{}, err
	}
	verdict := transferControlResultError("resume", decoded.Accepted, decoded.Error, nil)
	return decoded, verdict
}
// decodeTransferCommitResponse decodes msg into a TransferCommitResponse.
//
// A decode failure returns the zero response and the decode error. Otherwise
// the decoded response is returned together with the error derived from its
// Accepted and Error fields (nil only when accepted with no error text).
func decodeTransferCommitResponse(msg Message) (TransferCommitResponse, error) {
	decoded := TransferCommitResponse{}
	err := msg.Value.Orm(&decoded)
	if err != nil {
		return TransferCommitResponse{}, err
	}
	verdict := transferControlResultError("commit", decoded.Accepted, decoded.Error, nil)
	return decoded, verdict
}
// decodeTransferAbortResponse decodes msg into a TransferAbortResponse.
//
// A decode failure returns the zero response and the decode error. Otherwise
// the decoded response is returned together with the error derived from its
// Accepted and Error fields (nil only when accepted with no error text).
func decodeTransferAbortResponse(msg Message) (TransferAbortResponse, error) {
	decoded := TransferAbortResponse{}
	err := msg.Value.Orm(&decoded)
	if err != nil {
		return TransferAbortResponse{}, err
	}
	verdict := transferControlResultError("abort", decoded.Accepted, decoded.Error, nil)
	return decoded, verdict
}
// transferControlResultError collapses the outcome of a transfer control
// exchange into a single error, in this order of precedence:
//
//   - callErr, when non-nil, is returned unchanged;
//   - a non-empty message becomes a new error with that text (even when
//     accepted is true);
//   - accepted with no message yields nil;
//   - otherwise the result is "transfer <op> rejected".
func transferControlResultError(op string, accepted bool, message string, callErr error) error {
	switch {
	case callErr != nil:
		return callErr
	case message != "":
		return errors.New(message)
	case accepted:
		return nil
	default:
		return errors.New("transfer " + op + " rejected")
	}
}