633 lines
26 KiB
Go
633 lines
26 KiB
Go
|
|
package notify
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"errors"
|
||
|
|
)
|
||
|
|
|
||
|
|
const (
|
||
|
|
TransferBeginSignalKey = "notify.transfer.begin"
|
||
|
|
TransferResumeSignalKey = "notify.transfer.resume"
|
||
|
|
TransferCommitSignalKey = "notify.transfer.commit"
|
||
|
|
TransferAbortSignalKey = "notify.transfer.abort"
|
||
|
|
)
|
||
|
|
|
||
|
|
type TransferRange struct {
|
||
|
|
Offset int64
|
||
|
|
Length int64
|
||
|
|
}
|
||
|
|
|
||
|
|
type TransferBeginRequest struct {
|
||
|
|
TransferID string
|
||
|
|
Channel TransferChannel
|
||
|
|
Size int64
|
||
|
|
Checksum string
|
||
|
|
Metadata map[string]string
|
||
|
|
}
|
||
|
|
|
||
|
|
type TransferBeginResponse struct {
|
||
|
|
TransferID string
|
||
|
|
Accepted bool
|
||
|
|
NextOffset int64
|
||
|
|
Missing []TransferRange
|
||
|
|
Error string
|
||
|
|
}
|
||
|
|
|
||
|
|
type TransferResumeRequest struct {
|
||
|
|
TransferID string
|
||
|
|
}
|
||
|
|
|
||
|
|
type TransferResumeResponse struct {
|
||
|
|
TransferID string
|
||
|
|
Accepted bool
|
||
|
|
NextOffset int64
|
||
|
|
Missing []TransferRange
|
||
|
|
Error string
|
||
|
|
}
|
||
|
|
|
||
|
|
type TransferCommitRequest struct {
|
||
|
|
TransferID string
|
||
|
|
Size int64
|
||
|
|
Checksum string
|
||
|
|
}
|
||
|
|
|
||
|
|
type TransferCommitResponse struct {
|
||
|
|
TransferID string
|
||
|
|
Accepted bool
|
||
|
|
Error string
|
||
|
|
}
|
||
|
|
|
||
|
|
type TransferAbortRequest struct {
|
||
|
|
TransferID string
|
||
|
|
Stage string
|
||
|
|
Offset int64
|
||
|
|
Error string
|
||
|
|
}
|
||
|
|
|
||
|
|
type TransferAbortResponse struct {
|
||
|
|
TransferID string
|
||
|
|
Accepted bool
|
||
|
|
Error string
|
||
|
|
}
|
||
|
|
|
||
|
|
type TransferControlHandler struct {
|
||
|
|
Begin func(*Message, TransferBeginRequest) (TransferBeginResponse, error)
|
||
|
|
Resume func(*Message, TransferResumeRequest) (TransferResumeResponse, error)
|
||
|
|
Commit func(*Message, TransferCommitRequest) (TransferCommitResponse, error)
|
||
|
|
Abort func(*Message, TransferAbortRequest) (TransferAbortResponse, error)
|
||
|
|
}
|
||
|
|
|
||
|
|
var (
|
||
|
|
errTransferControlClientNil = errors.New("transfer control client is nil")
|
||
|
|
errTransferControlServerNil = errors.New("transfer control server is nil")
|
||
|
|
errTransferControlClientConnNil = errors.New("transfer control client connection is nil")
|
||
|
|
errTransferControlLogicalConnNil = errors.New("transfer control logical connection is nil")
|
||
|
|
errTransferControlTransportNil = errors.New("transfer control transport connection is nil")
|
||
|
|
errTransferControlHandlerEmpty = errors.New("transfer control handler is empty")
|
||
|
|
)
|
||
|
|
|
||
|
|
func BindTransferControlClient(c Client, handler TransferControlHandler) error {
|
||
|
|
if c == nil {
|
||
|
|
return errTransferControlClientNil
|
||
|
|
}
|
||
|
|
if handler.empty() {
|
||
|
|
return errTransferControlHandlerEmpty
|
||
|
|
}
|
||
|
|
bindTransferControlLinks(c.SetLink, transferControlRuntimeFromClient(c), func(*Message) string {
|
||
|
|
return clientFileScope()
|
||
|
|
}, func(*Message) string {
|
||
|
|
return clientFileScope()
|
||
|
|
}, func(*Message) uint64 {
|
||
|
|
return 0
|
||
|
|
}, handler)
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func BindTransferControlServer(s Server, handler TransferControlHandler) error {
|
||
|
|
if s == nil {
|
||
|
|
return errTransferControlServerNil
|
||
|
|
}
|
||
|
|
if handler.empty() {
|
||
|
|
return errTransferControlHandlerEmpty
|
||
|
|
}
|
||
|
|
bindTransferControlLinks(s.SetLink, transferControlRuntimeFromServer(s), func(msg *Message) string {
|
||
|
|
if transport := messageTransportConnSnapshot(msg); transport != nil {
|
||
|
|
return serverTransportScopeForTransport(transport)
|
||
|
|
}
|
||
|
|
if logical := messageLogicalConnSnapshot(msg); logical != nil {
|
||
|
|
return serverTransportScope(logical)
|
||
|
|
}
|
||
|
|
return serverFileDomain + ":unknown"
|
||
|
|
}, func(msg *Message) string {
|
||
|
|
if logical := messageLogicalConnSnapshot(msg); logical != nil {
|
||
|
|
return serverFileScope(logical)
|
||
|
|
}
|
||
|
|
return serverFileDomain + ":unknown"
|
||
|
|
}, func(msg *Message) uint64 {
|
||
|
|
if transport := messageTransportConnSnapshot(msg); transport != nil {
|
||
|
|
return transport.TransportGeneration()
|
||
|
|
}
|
||
|
|
if logical := messageLogicalConnSnapshot(msg); logical != nil {
|
||
|
|
return logical.transportGenerationSnapshot()
|
||
|
|
}
|
||
|
|
if msg == nil {
|
||
|
|
return 0
|
||
|
|
}
|
||
|
|
return 0
|
||
|
|
}, handler)
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func SendTransferBeginClient(ctx context.Context, c Client, req TransferBeginRequest) (TransferBeginResponse, error) {
|
||
|
|
runtime := transferControlRuntimeFromClient(c)
|
||
|
|
runtimeScope := clientFileScope()
|
||
|
|
publicScope := clientFileScope()
|
||
|
|
transferControlPrepareBegin(runtime, fileTransferDirectionSend, runtimeScope, publicScope, 0, req)
|
||
|
|
msg, err := sendTransferControlClient(ctx, c, TransferBeginSignalKey, req)
|
||
|
|
if err != nil {
|
||
|
|
transferControlFinishBegin(runtime, fileTransferDirectionSend, runtimeScope, req.TransferID, TransferBeginResponse{}, err)
|
||
|
|
return TransferBeginResponse{}, err
|
||
|
|
}
|
||
|
|
resp, err := decodeTransferBeginResponse(msg)
|
||
|
|
transferControlFinishBegin(runtime, fileTransferDirectionSend, runtimeScope, req.TransferID, resp, err)
|
||
|
|
return resp, err
|
||
|
|
}
|
||
|
|
|
||
|
|
func SendTransferResumeClient(ctx context.Context, c Client, req TransferResumeRequest) (TransferResumeResponse, error) {
|
||
|
|
runtime := transferControlRuntimeFromClient(c)
|
||
|
|
runtimeScope := clientFileScope()
|
||
|
|
publicScope := clientFileScope()
|
||
|
|
transferControlPrepareResume(runtime, fileTransferDirectionSend, runtimeScope, publicScope, 0, req)
|
||
|
|
msg, err := sendTransferControlClient(ctx, c, TransferResumeSignalKey, req)
|
||
|
|
if err != nil {
|
||
|
|
transferControlFinishResume(runtime, fileTransferDirectionSend, runtimeScope, req.TransferID, TransferResumeResponse{}, err)
|
||
|
|
return TransferResumeResponse{}, err
|
||
|
|
}
|
||
|
|
resp, err := decodeTransferResumeResponse(msg)
|
||
|
|
transferControlFinishResume(runtime, fileTransferDirectionSend, runtimeScope, req.TransferID, resp, err)
|
||
|
|
return resp, err
|
||
|
|
}
|
||
|
|
|
||
|
|
func SendTransferCommitClient(ctx context.Context, c Client, req TransferCommitRequest) (TransferCommitResponse, error) {
|
||
|
|
runtime := transferControlRuntimeFromClient(c)
|
||
|
|
runtimeScope := clientFileScope()
|
||
|
|
publicScope := clientFileScope()
|
||
|
|
transferControlPrepareCommit(runtime, fileTransferDirectionSend, runtimeScope, publicScope, 0, req)
|
||
|
|
msg, err := sendTransferControlClient(ctx, c, TransferCommitSignalKey, req)
|
||
|
|
if err != nil {
|
||
|
|
transferControlFinishCommit(runtime, fileTransferDirectionSend, runtimeScope, req.TransferID, TransferCommitResponse{}, err)
|
||
|
|
return TransferCommitResponse{}, err
|
||
|
|
}
|
||
|
|
resp, err := decodeTransferCommitResponse(msg)
|
||
|
|
transferControlFinishCommit(runtime, fileTransferDirectionSend, runtimeScope, req.TransferID, resp, err)
|
||
|
|
return resp, err
|
||
|
|
}
|
||
|
|
|
||
|
|
func SendTransferAbortClient(ctx context.Context, c Client, req TransferAbortRequest) (TransferAbortResponse, error) {
|
||
|
|
runtime := transferControlRuntimeFromClient(c)
|
||
|
|
runtimeScope := clientFileScope()
|
||
|
|
publicScope := clientFileScope()
|
||
|
|
transferControlPrepareAbort(runtime, fileTransferDirectionSend, runtimeScope, publicScope, 0, req)
|
||
|
|
msg, err := sendTransferControlClient(ctx, c, TransferAbortSignalKey, req)
|
||
|
|
if err != nil {
|
||
|
|
transferControlFinishAbort(runtime, fileTransferDirectionSend, runtimeScope, req.TransferID, req, TransferAbortResponse{}, err)
|
||
|
|
return TransferAbortResponse{}, err
|
||
|
|
}
|
||
|
|
resp, err := decodeTransferAbortResponse(msg)
|
||
|
|
transferControlFinishAbort(runtime, fileTransferDirectionSend, runtimeScope, req.TransferID, req, resp, err)
|
||
|
|
return resp, err
|
||
|
|
}
|
||
|
|
|
||
|
|
func SendTransferBeginServer(ctx context.Context, s Server, c *ClientConn, req TransferBeginRequest) (TransferBeginResponse, error) {
|
||
|
|
return SendTransferBeginLogical(ctx, s, logicalConnFromClient(c), req)
|
||
|
|
}
|
||
|
|
|
||
|
|
func SendTransferBeginLogical(ctx context.Context, s Server, c *LogicalConn, req TransferBeginRequest) (TransferBeginResponse, error) {
|
||
|
|
if s == nil {
|
||
|
|
return TransferBeginResponse{}, errTransferControlServerNil
|
||
|
|
}
|
||
|
|
if c == nil {
|
||
|
|
return TransferBeginResponse{}, errTransferControlLogicalConnNil
|
||
|
|
}
|
||
|
|
runtime := transferControlRuntimeFromServer(s)
|
||
|
|
runtimeScope := serverTransportScope(c)
|
||
|
|
publicScope := serverFileScope(c)
|
||
|
|
transportGeneration := c.transportGenerationSnapshot()
|
||
|
|
return sendTransferBeginPrepared(ctx, s, c, runtime, runtimeScope, publicScope, transportGeneration, req)
|
||
|
|
}
|
||
|
|
|
||
|
|
func SendTransferBeginTransport(ctx context.Context, s Server, t *TransportConn, req TransferBeginRequest) (TransferBeginResponse, error) {
|
||
|
|
if s == nil {
|
||
|
|
return TransferBeginResponse{}, errTransferControlServerNil
|
||
|
|
}
|
||
|
|
if t == nil {
|
||
|
|
return TransferBeginResponse{}, errTransferControlTransportNil
|
||
|
|
}
|
||
|
|
logical := t.LogicalConn()
|
||
|
|
if logical == nil {
|
||
|
|
return TransferBeginResponse{}, errTransferControlLogicalConnNil
|
||
|
|
}
|
||
|
|
runtime := transferControlRuntimeFromServer(s)
|
||
|
|
runtimeScope := serverTransportScopeForTransport(t)
|
||
|
|
publicScope := serverFileScope(logical)
|
||
|
|
transportGeneration := t.TransportGeneration()
|
||
|
|
return sendTransferBeginPreparedTransport(ctx, s, t, runtime, runtimeScope, publicScope, transportGeneration, req)
|
||
|
|
}
|
||
|
|
|
||
|
|
func sendTransferBeginPrepared(ctx context.Context, s Server, c *LogicalConn, runtime *transferRuntime, runtimeScope string, publicScope string, transportGeneration uint64, req TransferBeginRequest) (TransferBeginResponse, error) {
|
||
|
|
transferControlPrepareBegin(runtime, fileTransferDirectionSend, runtimeScope, publicScope, transportGeneration, req)
|
||
|
|
msg, err := sendTransferControlServerLogical(ctx, s, c, TransferBeginSignalKey, req)
|
||
|
|
if err != nil {
|
||
|
|
transferControlFinishBegin(runtime, fileTransferDirectionSend, runtimeScope, req.TransferID, TransferBeginResponse{}, err)
|
||
|
|
return TransferBeginResponse{}, err
|
||
|
|
}
|
||
|
|
resp, err := decodeTransferBeginResponse(msg)
|
||
|
|
transferControlFinishBegin(runtime, fileTransferDirectionSend, runtimeScope, req.TransferID, resp, err)
|
||
|
|
return resp, err
|
||
|
|
}
|
||
|
|
|
||
|
|
func sendTransferBeginPreparedTransport(ctx context.Context, s Server, t *TransportConn, runtime *transferRuntime, runtimeScope string, publicScope string, transportGeneration uint64, req TransferBeginRequest) (TransferBeginResponse, error) {
|
||
|
|
transferControlPrepareBegin(runtime, fileTransferDirectionSend, runtimeScope, publicScope, transportGeneration, req)
|
||
|
|
msg, err := sendTransferControlServerTransport(ctx, s, t, TransferBeginSignalKey, req)
|
||
|
|
if err != nil {
|
||
|
|
transferControlFinishBegin(runtime, fileTransferDirectionSend, runtimeScope, req.TransferID, TransferBeginResponse{}, err)
|
||
|
|
return TransferBeginResponse{}, err
|
||
|
|
}
|
||
|
|
resp, err := decodeTransferBeginResponse(msg)
|
||
|
|
transferControlFinishBegin(runtime, fileTransferDirectionSend, runtimeScope, req.TransferID, resp, err)
|
||
|
|
return resp, err
|
||
|
|
}
|
||
|
|
|
||
|
|
func SendTransferResumeServer(ctx context.Context, s Server, c *ClientConn, req TransferResumeRequest) (TransferResumeResponse, error) {
|
||
|
|
return SendTransferResumeLogical(ctx, s, logicalConnFromClient(c), req)
|
||
|
|
}
|
||
|
|
|
||
|
|
func SendTransferResumeLogical(ctx context.Context, s Server, c *LogicalConn, req TransferResumeRequest) (TransferResumeResponse, error) {
|
||
|
|
if s == nil {
|
||
|
|
return TransferResumeResponse{}, errTransferControlServerNil
|
||
|
|
}
|
||
|
|
if c == nil {
|
||
|
|
return TransferResumeResponse{}, errTransferControlLogicalConnNil
|
||
|
|
}
|
||
|
|
runtime := transferControlRuntimeFromServer(s)
|
||
|
|
runtimeScope := serverTransportScope(c)
|
||
|
|
publicScope := serverFileScope(c)
|
||
|
|
transportGeneration := c.transportGenerationSnapshot()
|
||
|
|
return sendTransferResumePrepared(ctx, s, c, runtime, runtimeScope, publicScope, transportGeneration, req)
|
||
|
|
}
|
||
|
|
|
||
|
|
func SendTransferResumeTransport(ctx context.Context, s Server, t *TransportConn, req TransferResumeRequest) (TransferResumeResponse, error) {
|
||
|
|
if s == nil {
|
||
|
|
return TransferResumeResponse{}, errTransferControlServerNil
|
||
|
|
}
|
||
|
|
if t == nil {
|
||
|
|
return TransferResumeResponse{}, errTransferControlTransportNil
|
||
|
|
}
|
||
|
|
logical := t.LogicalConn()
|
||
|
|
if logical == nil {
|
||
|
|
return TransferResumeResponse{}, errTransferControlLogicalConnNil
|
||
|
|
}
|
||
|
|
runtime := transferControlRuntimeFromServer(s)
|
||
|
|
runtimeScope := serverTransportScopeForTransport(t)
|
||
|
|
publicScope := serverFileScope(logical)
|
||
|
|
transportGeneration := t.TransportGeneration()
|
||
|
|
return sendTransferResumePreparedTransport(ctx, s, t, runtime, runtimeScope, publicScope, transportGeneration, req)
|
||
|
|
}
|
||
|
|
|
||
|
|
func sendTransferResumePrepared(ctx context.Context, s Server, c *LogicalConn, runtime *transferRuntime, runtimeScope string, publicScope string, transportGeneration uint64, req TransferResumeRequest) (TransferResumeResponse, error) {
|
||
|
|
transferControlPrepareResume(runtime, fileTransferDirectionSend, runtimeScope, publicScope, transportGeneration, req)
|
||
|
|
msg, err := sendTransferControlServerLogical(ctx, s, c, TransferResumeSignalKey, req)
|
||
|
|
if err != nil {
|
||
|
|
transferControlFinishResume(runtime, fileTransferDirectionSend, runtimeScope, req.TransferID, TransferResumeResponse{}, err)
|
||
|
|
return TransferResumeResponse{}, err
|
||
|
|
}
|
||
|
|
resp, err := decodeTransferResumeResponse(msg)
|
||
|
|
transferControlFinishResume(runtime, fileTransferDirectionSend, runtimeScope, req.TransferID, resp, err)
|
||
|
|
return resp, err
|
||
|
|
}
|
||
|
|
|
||
|
|
func sendTransferResumePreparedTransport(ctx context.Context, s Server, t *TransportConn, runtime *transferRuntime, runtimeScope string, publicScope string, transportGeneration uint64, req TransferResumeRequest) (TransferResumeResponse, error) {
|
||
|
|
transferControlPrepareResume(runtime, fileTransferDirectionSend, runtimeScope, publicScope, transportGeneration, req)
|
||
|
|
msg, err := sendTransferControlServerTransport(ctx, s, t, TransferResumeSignalKey, req)
|
||
|
|
if err != nil {
|
||
|
|
transferControlFinishResume(runtime, fileTransferDirectionSend, runtimeScope, req.TransferID, TransferResumeResponse{}, err)
|
||
|
|
return TransferResumeResponse{}, err
|
||
|
|
}
|
||
|
|
resp, err := decodeTransferResumeResponse(msg)
|
||
|
|
transferControlFinishResume(runtime, fileTransferDirectionSend, runtimeScope, req.TransferID, resp, err)
|
||
|
|
return resp, err
|
||
|
|
}
|
||
|
|
|
||
|
|
func SendTransferCommitServer(ctx context.Context, s Server, c *ClientConn, req TransferCommitRequest) (TransferCommitResponse, error) {
|
||
|
|
return SendTransferCommitLogical(ctx, s, logicalConnFromClient(c), req)
|
||
|
|
}
|
||
|
|
|
||
|
|
func SendTransferCommitLogical(ctx context.Context, s Server, c *LogicalConn, req TransferCommitRequest) (TransferCommitResponse, error) {
|
||
|
|
if s == nil {
|
||
|
|
return TransferCommitResponse{}, errTransferControlServerNil
|
||
|
|
}
|
||
|
|
if c == nil {
|
||
|
|
return TransferCommitResponse{}, errTransferControlLogicalConnNil
|
||
|
|
}
|
||
|
|
runtime := transferControlRuntimeFromServer(s)
|
||
|
|
runtimeScope := serverTransportScope(c)
|
||
|
|
publicScope := serverFileScope(c)
|
||
|
|
transportGeneration := c.transportGenerationSnapshot()
|
||
|
|
return sendTransferCommitPrepared(ctx, s, c, runtime, runtimeScope, publicScope, transportGeneration, req)
|
||
|
|
}
|
||
|
|
|
||
|
|
func SendTransferCommitTransport(ctx context.Context, s Server, t *TransportConn, req TransferCommitRequest) (TransferCommitResponse, error) {
|
||
|
|
if s == nil {
|
||
|
|
return TransferCommitResponse{}, errTransferControlServerNil
|
||
|
|
}
|
||
|
|
if t == nil {
|
||
|
|
return TransferCommitResponse{}, errTransferControlTransportNil
|
||
|
|
}
|
||
|
|
logical := t.LogicalConn()
|
||
|
|
if logical == nil {
|
||
|
|
return TransferCommitResponse{}, errTransferControlLogicalConnNil
|
||
|
|
}
|
||
|
|
runtime := transferControlRuntimeFromServer(s)
|
||
|
|
runtimeScope := serverTransportScopeForTransport(t)
|
||
|
|
publicScope := serverFileScope(logical)
|
||
|
|
transportGeneration := t.TransportGeneration()
|
||
|
|
return sendTransferCommitPreparedTransport(ctx, s, t, runtime, runtimeScope, publicScope, transportGeneration, req)
|
||
|
|
}
|
||
|
|
|
||
|
|
func sendTransferCommitPrepared(ctx context.Context, s Server, c *LogicalConn, runtime *transferRuntime, runtimeScope string, publicScope string, transportGeneration uint64, req TransferCommitRequest) (TransferCommitResponse, error) {
|
||
|
|
transferControlPrepareCommit(runtime, fileTransferDirectionSend, runtimeScope, publicScope, transportGeneration, req)
|
||
|
|
msg, err := sendTransferControlServerLogical(ctx, s, c, TransferCommitSignalKey, req)
|
||
|
|
if err != nil {
|
||
|
|
transferControlFinishCommit(runtime, fileTransferDirectionSend, runtimeScope, req.TransferID, TransferCommitResponse{}, err)
|
||
|
|
return TransferCommitResponse{}, err
|
||
|
|
}
|
||
|
|
resp, err := decodeTransferCommitResponse(msg)
|
||
|
|
transferControlFinishCommit(runtime, fileTransferDirectionSend, runtimeScope, req.TransferID, resp, err)
|
||
|
|
return resp, err
|
||
|
|
}
|
||
|
|
|
||
|
|
func sendTransferCommitPreparedTransport(ctx context.Context, s Server, t *TransportConn, runtime *transferRuntime, runtimeScope string, publicScope string, transportGeneration uint64, req TransferCommitRequest) (TransferCommitResponse, error) {
|
||
|
|
transferControlPrepareCommit(runtime, fileTransferDirectionSend, runtimeScope, publicScope, transportGeneration, req)
|
||
|
|
msg, err := sendTransferControlServerTransport(ctx, s, t, TransferCommitSignalKey, req)
|
||
|
|
if err != nil {
|
||
|
|
transferControlFinishCommit(runtime, fileTransferDirectionSend, runtimeScope, req.TransferID, TransferCommitResponse{}, err)
|
||
|
|
return TransferCommitResponse{}, err
|
||
|
|
}
|
||
|
|
resp, err := decodeTransferCommitResponse(msg)
|
||
|
|
transferControlFinishCommit(runtime, fileTransferDirectionSend, runtimeScope, req.TransferID, resp, err)
|
||
|
|
return resp, err
|
||
|
|
}
|
||
|
|
|
||
|
|
func SendTransferAbortServer(ctx context.Context, s Server, c *ClientConn, req TransferAbortRequest) (TransferAbortResponse, error) {
|
||
|
|
return SendTransferAbortLogical(ctx, s, logicalConnFromClient(c), req)
|
||
|
|
}
|
||
|
|
|
||
|
|
func SendTransferAbortLogical(ctx context.Context, s Server, c *LogicalConn, req TransferAbortRequest) (TransferAbortResponse, error) {
|
||
|
|
if s == nil {
|
||
|
|
return TransferAbortResponse{}, errTransferControlServerNil
|
||
|
|
}
|
||
|
|
if c == nil {
|
||
|
|
return TransferAbortResponse{}, errTransferControlLogicalConnNil
|
||
|
|
}
|
||
|
|
runtime := transferControlRuntimeFromServer(s)
|
||
|
|
runtimeScope := serverTransportScope(c)
|
||
|
|
publicScope := serverFileScope(c)
|
||
|
|
transportGeneration := c.transportGenerationSnapshot()
|
||
|
|
return sendTransferAbortPrepared(ctx, s, c, runtime, runtimeScope, publicScope, transportGeneration, req)
|
||
|
|
}
|
||
|
|
|
||
|
|
func SendTransferAbortTransport(ctx context.Context, s Server, t *TransportConn, req TransferAbortRequest) (TransferAbortResponse, error) {
|
||
|
|
if s == nil {
|
||
|
|
return TransferAbortResponse{}, errTransferControlServerNil
|
||
|
|
}
|
||
|
|
if t == nil {
|
||
|
|
return TransferAbortResponse{}, errTransferControlTransportNil
|
||
|
|
}
|
||
|
|
logical := t.LogicalConn()
|
||
|
|
if logical == nil {
|
||
|
|
return TransferAbortResponse{}, errTransferControlLogicalConnNil
|
||
|
|
}
|
||
|
|
runtime := transferControlRuntimeFromServer(s)
|
||
|
|
runtimeScope := serverTransportScopeForTransport(t)
|
||
|
|
publicScope := serverFileScope(logical)
|
||
|
|
transportGeneration := t.TransportGeneration()
|
||
|
|
return sendTransferAbortPreparedTransport(ctx, s, t, runtime, runtimeScope, publicScope, transportGeneration, req)
|
||
|
|
}
|
||
|
|
|
||
|
|
func sendTransferAbortPrepared(ctx context.Context, s Server, c *LogicalConn, runtime *transferRuntime, runtimeScope string, publicScope string, transportGeneration uint64, req TransferAbortRequest) (TransferAbortResponse, error) {
|
||
|
|
transferControlPrepareAbort(runtime, fileTransferDirectionSend, runtimeScope, publicScope, transportGeneration, req)
|
||
|
|
msg, err := sendTransferControlServerLogical(ctx, s, c, TransferAbortSignalKey, req)
|
||
|
|
if err != nil {
|
||
|
|
transferControlFinishAbort(runtime, fileTransferDirectionSend, runtimeScope, req.TransferID, req, TransferAbortResponse{}, err)
|
||
|
|
return TransferAbortResponse{}, err
|
||
|
|
}
|
||
|
|
resp, err := decodeTransferAbortResponse(msg)
|
||
|
|
transferControlFinishAbort(runtime, fileTransferDirectionSend, runtimeScope, req.TransferID, req, resp, err)
|
||
|
|
return resp, err
|
||
|
|
}
|
||
|
|
|
||
|
|
func sendTransferAbortPreparedTransport(ctx context.Context, s Server, t *TransportConn, runtime *transferRuntime, runtimeScope string, publicScope string, transportGeneration uint64, req TransferAbortRequest) (TransferAbortResponse, error) {
|
||
|
|
transferControlPrepareAbort(runtime, fileTransferDirectionSend, runtimeScope, publicScope, transportGeneration, req)
|
||
|
|
msg, err := sendTransferControlServerTransport(ctx, s, t, TransferAbortSignalKey, req)
|
||
|
|
if err != nil {
|
||
|
|
transferControlFinishAbort(runtime, fileTransferDirectionSend, runtimeScope, req.TransferID, req, TransferAbortResponse{}, err)
|
||
|
|
return TransferAbortResponse{}, err
|
||
|
|
}
|
||
|
|
resp, err := decodeTransferAbortResponse(msg)
|
||
|
|
transferControlFinishAbort(runtime, fileTransferDirectionSend, runtimeScope, req.TransferID, req, resp, err)
|
||
|
|
return resp, err
|
||
|
|
}
|
||
|
|
|
||
|
|
func (h TransferControlHandler) empty() bool {
|
||
|
|
return h.Begin == nil && h.Resume == nil && h.Commit == nil && h.Abort == nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func bindTransferControlLinks(setLink func(string, func(*Message)), runtime *transferRuntime, runtimeScopeFn func(*Message) string, publicScopeFn func(*Message) string, transportGenerationFn func(*Message) uint64, handler TransferControlHandler) {
|
||
|
|
if handler.Begin != nil {
|
||
|
|
setLink(TransferBeginSignalKey, func(msg *Message) {
|
||
|
|
var req TransferBeginRequest
|
||
|
|
resp := TransferBeginResponse{}
|
||
|
|
if err := msg.Value.Orm(&req); err != nil {
|
||
|
|
resp.Error = err.Error()
|
||
|
|
_ = msg.ReplyObj(resp)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
runtimeScope := transferControlMessageScope(runtimeScopeFn, msg)
|
||
|
|
publicScope := transferControlMessageScope(publicScopeFn, msg)
|
||
|
|
transportGeneration := transferControlMessageGeneration(transportGenerationFn, msg)
|
||
|
|
transferControlPrepareBegin(runtime, fileTransferDirectionReceive, runtimeScope, publicScope, transportGeneration, req)
|
||
|
|
resp, err := handler.Begin(msg, req)
|
||
|
|
if resp.TransferID == "" {
|
||
|
|
resp.TransferID = req.TransferID
|
||
|
|
}
|
||
|
|
if err != nil && resp.Error == "" {
|
||
|
|
resp.Error = err.Error()
|
||
|
|
}
|
||
|
|
transferControlFinishBegin(runtime, fileTransferDirectionReceive, runtimeScope, req.TransferID, resp, transferControlResultError("begin", resp.Accepted, resp.Error, err))
|
||
|
|
_ = msg.ReplyObj(resp)
|
||
|
|
})
|
||
|
|
}
|
||
|
|
if handler.Resume != nil {
|
||
|
|
setLink(TransferResumeSignalKey, func(msg *Message) {
|
||
|
|
var req TransferResumeRequest
|
||
|
|
resp := TransferResumeResponse{}
|
||
|
|
if err := msg.Value.Orm(&req); err != nil {
|
||
|
|
resp.Error = err.Error()
|
||
|
|
_ = msg.ReplyObj(resp)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
runtimeScope := transferControlMessageScope(runtimeScopeFn, msg)
|
||
|
|
publicScope := transferControlMessageScope(publicScopeFn, msg)
|
||
|
|
transportGeneration := transferControlMessageGeneration(transportGenerationFn, msg)
|
||
|
|
transferControlPrepareResume(runtime, fileTransferDirectionReceive, runtimeScope, publicScope, transportGeneration, req)
|
||
|
|
resp, err := handler.Resume(msg, req)
|
||
|
|
if resp.TransferID == "" {
|
||
|
|
resp.TransferID = req.TransferID
|
||
|
|
}
|
||
|
|
if err != nil && resp.Error == "" {
|
||
|
|
resp.Error = err.Error()
|
||
|
|
}
|
||
|
|
transferControlFinishResume(runtime, fileTransferDirectionReceive, runtimeScope, req.TransferID, resp, transferControlResultError("resume", resp.Accepted, resp.Error, err))
|
||
|
|
_ = msg.ReplyObj(resp)
|
||
|
|
})
|
||
|
|
}
|
||
|
|
if handler.Commit != nil {
|
||
|
|
setLink(TransferCommitSignalKey, func(msg *Message) {
|
||
|
|
var req TransferCommitRequest
|
||
|
|
resp := TransferCommitResponse{}
|
||
|
|
if err := msg.Value.Orm(&req); err != nil {
|
||
|
|
resp.Error = err.Error()
|
||
|
|
_ = msg.ReplyObj(resp)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
runtimeScope := transferControlMessageScope(runtimeScopeFn, msg)
|
||
|
|
publicScope := transferControlMessageScope(publicScopeFn, msg)
|
||
|
|
transportGeneration := transferControlMessageGeneration(transportGenerationFn, msg)
|
||
|
|
transferControlPrepareCommit(runtime, fileTransferDirectionReceive, runtimeScope, publicScope, transportGeneration, req)
|
||
|
|
resp, err := handler.Commit(msg, req)
|
||
|
|
if resp.TransferID == "" {
|
||
|
|
resp.TransferID = req.TransferID
|
||
|
|
}
|
||
|
|
if err != nil && resp.Error == "" {
|
||
|
|
resp.Error = err.Error()
|
||
|
|
}
|
||
|
|
transferControlFinishCommit(runtime, fileTransferDirectionReceive, runtimeScope, req.TransferID, resp, transferControlResultError("commit", resp.Accepted, resp.Error, err))
|
||
|
|
_ = msg.ReplyObj(resp)
|
||
|
|
})
|
||
|
|
}
|
||
|
|
if handler.Abort != nil {
|
||
|
|
setLink(TransferAbortSignalKey, func(msg *Message) {
|
||
|
|
var req TransferAbortRequest
|
||
|
|
resp := TransferAbortResponse{}
|
||
|
|
if err := msg.Value.Orm(&req); err != nil {
|
||
|
|
resp.Error = err.Error()
|
||
|
|
_ = msg.ReplyObj(resp)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
runtimeScope := transferControlMessageScope(runtimeScopeFn, msg)
|
||
|
|
publicScope := transferControlMessageScope(publicScopeFn, msg)
|
||
|
|
transportGeneration := transferControlMessageGeneration(transportGenerationFn, msg)
|
||
|
|
transferControlPrepareAbort(runtime, fileTransferDirectionReceive, runtimeScope, publicScope, transportGeneration, req)
|
||
|
|
resp, err := handler.Abort(msg, req)
|
||
|
|
if resp.TransferID == "" {
|
||
|
|
resp.TransferID = req.TransferID
|
||
|
|
}
|
||
|
|
if err != nil && resp.Error == "" {
|
||
|
|
resp.Error = err.Error()
|
||
|
|
}
|
||
|
|
transferControlFinishAbort(runtime, fileTransferDirectionReceive, runtimeScope, req.TransferID, req, resp, transferControlResultError("abort", resp.Accepted, resp.Error, err))
|
||
|
|
_ = msg.ReplyObj(resp)
|
||
|
|
})
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func transferControlMessageScope(scopeFn func(*Message) string, msg *Message) string {
|
||
|
|
if scopeFn == nil {
|
||
|
|
return defaultFileScope
|
||
|
|
}
|
||
|
|
return normalizeFileScope(scopeFn(msg))
|
||
|
|
}
|
||
|
|
|
||
|
|
func transferControlMessageGeneration(generationFn func(*Message) uint64, msg *Message) uint64 {
|
||
|
|
if generationFn == nil {
|
||
|
|
return 0
|
||
|
|
}
|
||
|
|
return generationFn(msg)
|
||
|
|
}
|
||
|
|
|
||
|
|
func sendTransferControlClient(ctx context.Context, c Client, key string, req interface{}) (Message, error) {
|
||
|
|
if c == nil {
|
||
|
|
return Message{}, errTransferControlClientNil
|
||
|
|
}
|
||
|
|
return c.SendObjCtx(ctx, key, req)
|
||
|
|
}
|
||
|
|
|
||
|
|
func sendTransferControlServer(ctx context.Context, s Server, c *ClientConn, key string, req interface{}) (Message, error) {
|
||
|
|
return sendTransferControlServerLogical(ctx, s, logicalConnFromClient(c), key, req)
|
||
|
|
}
|
||
|
|
|
||
|
|
func sendTransferControlServerLogical(ctx context.Context, s Server, c *LogicalConn, key string, req interface{}) (Message, error) {
|
||
|
|
if s == nil {
|
||
|
|
return Message{}, errTransferControlServerNil
|
||
|
|
}
|
||
|
|
if c == nil {
|
||
|
|
return Message{}, errTransferControlLogicalConnNil
|
||
|
|
}
|
||
|
|
return s.SendObjCtxLogical(ctx, c, key, req)
|
||
|
|
}
|
||
|
|
|
||
|
|
func sendTransferControlServerTransport(ctx context.Context, s Server, t *TransportConn, key string, req interface{}) (Message, error) {
|
||
|
|
if s == nil {
|
||
|
|
return Message{}, errTransferControlServerNil
|
||
|
|
}
|
||
|
|
if t == nil {
|
||
|
|
return Message{}, errTransferControlTransportNil
|
||
|
|
}
|
||
|
|
return s.SendObjCtxTransport(ctx, t, key, req)
|
||
|
|
}
|
||
|
|
|
||
|
|
func decodeTransferBeginResponse(msg Message) (TransferBeginResponse, error) {
|
||
|
|
var resp TransferBeginResponse
|
||
|
|
if err := msg.Value.Orm(&resp); err != nil {
|
||
|
|
return TransferBeginResponse{}, err
|
||
|
|
}
|
||
|
|
return resp, transferControlResultError("begin", resp.Accepted, resp.Error, nil)
|
||
|
|
}
|
||
|
|
|
||
|
|
func decodeTransferResumeResponse(msg Message) (TransferResumeResponse, error) {
|
||
|
|
var resp TransferResumeResponse
|
||
|
|
if err := msg.Value.Orm(&resp); err != nil {
|
||
|
|
return TransferResumeResponse{}, err
|
||
|
|
}
|
||
|
|
return resp, transferControlResultError("resume", resp.Accepted, resp.Error, nil)
|
||
|
|
}
|
||
|
|
|
||
|
|
func decodeTransferCommitResponse(msg Message) (TransferCommitResponse, error) {
|
||
|
|
var resp TransferCommitResponse
|
||
|
|
if err := msg.Value.Orm(&resp); err != nil {
|
||
|
|
return TransferCommitResponse{}, err
|
||
|
|
}
|
||
|
|
return resp, transferControlResultError("commit", resp.Accepted, resp.Error, nil)
|
||
|
|
}
|
||
|
|
|
||
|
|
func decodeTransferAbortResponse(msg Message) (TransferAbortResponse, error) {
|
||
|
|
var resp TransferAbortResponse
|
||
|
|
if err := msg.Value.Orm(&resp); err != nil {
|
||
|
|
return TransferAbortResponse{}, err
|
||
|
|
}
|
||
|
|
return resp, transferControlResultError("abort", resp.Accepted, resp.Error, nil)
|
||
|
|
}
|
||
|
|
|
||
|
|
func transferControlResultError(op string, accepted bool, message string, callErr error) error {
|
||
|
|
if callErr != nil {
|
||
|
|
return callErr
|
||
|
|
}
|
||
|
|
if message != "" {
|
||
|
|
return errors.New(message)
|
||
|
|
}
|
||
|
|
if accepted {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
return errors.New("transfer " + op + " rejected")
|
||
|
|
}
|