package notify import ( "context" "errors" "time" ) type BulkOpenRequest struct { BulkID string DataID uint64 FastPathVersion uint8 Range BulkRange Metadata BulkMetadata ReadTimeout time.Duration WriteTimeout time.Duration Dedicated bool DedicatedLaneID uint32 AttachToken string ChunkSize int WindowBytes int MaxInFlight int } type BulkOpenResponse struct { BulkID string DataID uint64 FastPathVersion uint8 Accepted bool Dedicated bool AttachToken string TransportGeneration uint64 Error string } type BulkCloseRequest struct { BulkID string Full bool } type BulkCloseResponse struct { BulkID string Accepted bool Error string } type BulkResetRequest struct { BulkID string DataID uint64 Error string } type BulkResetResponse struct { BulkID string Accepted bool Error string } type BulkReadyRequest struct { BulkID string DataID uint64 Error string } type BulkReadyResponse struct { BulkID string Accepted bool Error string } type BulkReleaseRequest struct { BulkID string DataID uint64 Bytes int64 Chunks int } func bindClientBulkControl(c *ClientCommon) { if c == nil { return } c.SetLink(BulkOpenSignalKey, func(msg *Message) { c.handleInboundBulkOpen(msg) }) c.SetLink(BulkCloseSignalKey, func(msg *Message) { c.handleInboundBulkClose(msg) }) c.SetLink(BulkResetSignalKey, func(msg *Message) { c.handleInboundBulkReset(msg) }) c.SetLink(BulkReadySignalKey, func(msg *Message) { c.handleInboundBulkReady(msg) }) c.SetLink(BulkReleaseSignalKey, func(msg *Message) { c.handleInboundBulkRelease(msg) }) } func bindServerBulkControl(s *ServerCommon) { if s == nil { return } s.SetLink(BulkOpenSignalKey, func(msg *Message) { s.handleInboundBulkOpen(msg) }) s.SetLink(BulkCloseSignalKey, func(msg *Message) { s.handleInboundBulkClose(msg) }) s.SetLink(BulkResetSignalKey, func(msg *Message) { s.handleInboundBulkReset(msg) }) s.SetLink(BulkReadySignalKey, func(msg *Message) { s.handleInboundBulkReady(msg) }) s.SetLink(BulkReleaseSignalKey, func(msg *Message) { s.handleInboundBulkRelease(msg) }) } func bulkAcceptInfoFromClientBulk(bulk *bulkHandle) BulkAcceptInfo { if bulk == nil { return BulkAcceptInfo{} } return BulkAcceptInfo{ ID: bulk.ID(), Range: bulk.Range(), Metadata: bulk.Metadata(), Dedicated: bulk.Dedicated(), TransportGeneration: bulk.TransportGeneration(), Bulk: bulk, } } func bulkAcceptInfoFromServerBulk(bulk *bulkHandle, logical *LogicalConn, transport *TransportConn) BulkAcceptInfo { if bulk == nil { return BulkAcceptInfo{} } if logical == nil { logical = bulk.LogicalConn() } if transport == nil { transport = bulk.TransportConn() } return BulkAcceptInfo{ ID: bulk.ID(), Range: bulk.Range(), Metadata: bulk.Metadata(), Dedicated: bulk.Dedicated(), LogicalConn: logical, TransportConn: transport, TransportGeneration: bulk.TransportGeneration(), Bulk: bulk, } } func dispatchBulkAccept(handler func(BulkAcceptInfo) error, bulk *bulkHandle, info BulkAcceptInfo) error { if bulk == nil { return errBulkNotFound } if !bulk.markAcceptDispatched() { return nil } var dispatchErr error defer func() { bulk.finishAcceptDispatch(dispatchErr) }() if handler == nil { dispatchErr = errBulkHandlerNotConfigured bulk.markReset(dispatchErr) return dispatchErr } if err := handler(info); err != nil { dispatchErr = err bulk.markReset(dispatchErr) return dispatchErr } return nil } func (c *ClientCommon) clientBulkAcceptReadyNotifier(bulk *bulkHandle) func(error) { return func(readyErr error) { if c == nil || bulk == nil { return } req := BulkReadyRequest{ BulkID: bulk.ID(), DataID: bulk.dataIDSnapshot(), } if readyErr != nil { req.Error = readyErr.Error() } ctx, cancel := context.WithTimeout(context.Background(), defaultBulkAcceptReadyTimeout) defer cancel() if _, err := sendBulkReadyClient(ctx, c, req); err != nil && bulk.Context().Err() == nil { bulk.markReset(err) } } } func sendBulkReadyServer(ctx context.Context, s *ServerCommon, logical *LogicalConn, transport *TransportConn, req BulkReadyRequest) error { if s == nil { return errBulkServerNil } if transport != nil { if _, err := sendBulkReadyServerTransport(ctx, s, transport, req); err == nil { return nil } else if !errors.Is(err, errTransportDetached) && !errors.Is(err, errBulkTransportNil) { return err } } if logical == nil { return errBulkLogicalConnNil } _, err := sendBulkReadyServerLogical(ctx, s, logical, req) return err } func (s *ServerCommon) serverBulkAcceptReadyNotifier(bulk *bulkHandle, logical *LogicalConn, transport *TransportConn) func(error) { return func(readyErr error) { if s == nil || bulk == nil { return } req := BulkReadyRequest{ BulkID: bulk.ID(), DataID: bulk.dataIDSnapshot(), } if readyErr != nil { req.Error = readyErr.Error() } ctx, cancel := context.WithTimeout(context.Background(), defaultBulkAcceptReadyTimeout) defer cancel() if err := sendBulkReadyServer(ctx, s, logical, transport, req); err != nil && bulk.Context().Err() == nil { bulk.markReset(err) } } } func (c *ClientCommon) startClientBulkAcceptDispatch(bulk *bulkHandle) { if c == nil || bulk == nil { return } bulk.setAcceptNotify(c.clientBulkAcceptReadyNotifier(bulk)) go func() { _ = c.dispatchClientBulkAccept(bulk) }() } func (s *ServerCommon) startServerBulkAcceptDispatch(bulk *bulkHandle, logical *LogicalConn, transport *TransportConn) { if s == nil || bulk == nil { return } if logical == nil { logical = bulk.LogicalConn() } if transport == nil { transport = bulk.TransportConn() } bulk.setAcceptNotify(s.serverBulkAcceptReadyNotifier(bulk, logical, transport)) go func() { _ = s.dispatchServerBulkAccept(bulk, logical, transport) }() } func (c *ClientCommon) dispatchClientBulkAccept(bulk *bulkHandle) error { if c == nil { return errBulkClientNil } runtime := c.getBulkRuntime() if runtime == nil { return errBulkRuntimeNil } return dispatchBulkAccept(runtime.handlerSnapshot(), bulk, bulkAcceptInfoFromClientBulk(bulk)) } func (s *ServerCommon) dispatchServerBulkAccept(bulk *bulkHandle, logical *LogicalConn, transport *TransportConn) error { if s == nil { return errBulkServerNil } runtime := s.getBulkRuntime() if runtime == nil { return errBulkRuntimeNil } return dispatchBulkAccept(runtime.handlerSnapshot(), bulk, bulkAcceptInfoFromServerBulk(bulk, logical, transport)) } func (c *ClientCommon) handleInboundBulkOpen(msg *Message) { req, err := decodeBulkOpenRequest(msg) resp := BulkOpenResponse{ BulkID: req.BulkID, DataID: req.DataID, FastPathVersion: negotiateBulkFastPathVersion(req.FastPathVersion), Dedicated: req.Dedicated, } if err != nil { resp.Error = err.Error() replyBulkControlIfNeeded(msg, resp) return } if req.Dedicated { if err := clientDedicatedBulkSupportError(c); err != nil { resp.Error = err.Error() replyBulkControlIfNeeded(msg, resp) return } } runtime := c.getBulkRuntime() if runtime == nil { resp.Error = errBulkRuntimeNil.Error() replyBulkControlIfNeeded(msg, resp) return } scope := clientFileScope() if req.DataID == 0 { req.DataID = runtime.nextDataID() resp.DataID = req.DataID } if req.Dedicated && req.AttachToken == "" { req.AttachToken = newBulkAttachToken() } resp.AttachToken = req.AttachToken bulk := newBulkHandle(c.clientStopContextSnapshot(), runtime, scope, req, c.currentClientSessionEpoch(), nil, nil, 0, clientBulkCloseSender(c), clientBulkResetSender(c), clientBulkDataSender(c, c.currentClientSessionEpoch()), clientBulkWriteSender(c, c.currentClientSessionEpoch()), clientBulkReleaseSender(c)) bulk.setClientSnapshotOwner(c) if err := runtime.register(scope, bulk); err != nil { resp.Error = err.Error() replyBulkControlIfNeeded(msg, resp) return } if req.Dedicated { if err := c.attachDedicatedBulkSidecar(context.Background(), bulk); err != nil { bulk.markReset(err) resp.Error = err.Error() replyBulkControlIfNeeded(msg, resp) return } resp.Accepted = true resp.DataID = bulk.dataIDSnapshot() resp.TransportGeneration = bulk.TransportGeneration() replyBulkControlIfNeeded(msg, resp) c.startClientBulkAcceptDispatch(bulk) return } if err := c.dispatchClientBulkAccept(bulk); err != nil { resp.Error = err.Error() replyBulkControlIfNeeded(msg, resp) return } resp.Accepted = true resp.DataID = bulk.dataIDSnapshot() resp.TransportGeneration = bulk.TransportGeneration() replyBulkControlIfNeeded(msg, resp) } func (s *ServerCommon) handleInboundBulkOpen(msg *Message) { req, err := decodeBulkOpenRequest(msg) resp := BulkOpenResponse{ BulkID: req.BulkID, DataID: req.DataID, FastPathVersion: negotiateBulkFastPathVersion(req.FastPathVersion), Dedicated: req.Dedicated, } if err != nil { resp.Error = err.Error() replyBulkControlIfNeeded(msg, resp) return } runtime := s.getBulkRuntime() if runtime == nil { resp.Error = errBulkRuntimeNil.Error() replyBulkControlIfNeeded(msg, resp) return } logical := messageLogicalConnSnapshot(msg) if logical == nil { resp.Error = errBulkLogicalConnNil.Error() replyBulkControlIfNeeded(msg, resp) return } transport := messageTransportConnSnapshot(msg) if req.Dedicated { if err := logicalDedicatedBulkSupportError(logical); err != nil { resp.Error = err.Error() replyBulkControlIfNeeded(msg, resp) return } if transport != nil { if err := transportDedicatedBulkSupportError(transport); err != nil { resp.Error = err.Error() replyBulkControlIfNeeded(msg, resp) return } } } scope := serverFileScope(logical) if req.DataID == 0 { req.DataID = runtime.nextDataID() resp.DataID = req.DataID } if req.Dedicated && req.AttachToken == "" { req.AttachToken = newBulkAttachToken() } resp.AttachToken = req.AttachToken bulk := newBulkHandle(logical.stopContextSnapshot(), runtime, scope, req, 0, logical, transport, bulkTransportGeneration(logical, transport), serverBulkCloseSender(s, logical, transport), serverBulkResetSender(s, logical, transport), serverBulkDataSender(s, transport), serverBulkWriteSender(s, logical, transport), serverBulkReleaseSender(s, logical, transport)) if err := runtime.register(scope, bulk); err != nil { resp.Error = err.Error() replyBulkControlIfNeeded(msg, resp) return } s.attachServerDedicatedSidecarIfExists(logical, bulk) if runtime.handlerSnapshot() == nil { bulk.markReset(errBulkHandlerNotConfigured) resp.Error = errBulkHandlerNotConfigured.Error() replyBulkControlIfNeeded(msg, resp) return } if req.Dedicated { resp.Accepted = true resp.DataID = bulk.dataIDSnapshot() resp.TransportGeneration = bulk.TransportGeneration() replyBulkControlIfNeeded(msg, resp) if bulk.dedicatedAttachedSnapshot() { s.startServerBulkAcceptDispatch(bulk, logical, messageTransportConnSnapshot(msg)) } return } if err := s.dispatchServerBulkAccept(bulk, logical, transport); err != nil { resp.Error = err.Error() replyBulkControlIfNeeded(msg, resp) return } resp.Accepted = true resp.DataID = bulk.dataIDSnapshot() resp.TransportGeneration = bulk.TransportGeneration() replyBulkControlIfNeeded(msg, resp) } func (c *ClientCommon) handleInboundBulkClose(msg *Message) { req, err := decodeBulkCloseRequest(msg) resp := BulkCloseResponse{BulkID: req.BulkID} if err != nil { resp.Error = err.Error() replyBulkControlIfNeeded(msg, resp) return } runtime := c.getBulkRuntime() if runtime == nil { resp.Error = errBulkRuntimeNil.Error() replyBulkControlIfNeeded(msg, resp) return } bulk, ok := runtime.lookup(clientFileScope(), req.BulkID) if !ok { resp.Error = errBulkNotFound.Error() replyBulkControlIfNeeded(msg, resp) return } if req.Full { bulk.markPeerClosed() } else { bulk.markRemoteClosed() } resp.Accepted = true replyBulkControlIfNeeded(msg, resp) } func (s *ServerCommon) handleInboundBulkClose(msg *Message) { req, err := decodeBulkCloseRequest(msg) resp := BulkCloseResponse{BulkID: req.BulkID} if err != nil { resp.Error = err.Error() replyBulkControlIfNeeded(msg, resp) return } runtime := s.getBulkRuntime() if runtime == nil { resp.Error = errBulkRuntimeNil.Error() replyBulkControlIfNeeded(msg, resp) return } logical := messageLogicalConnSnapshot(msg) scope := serverFileScope(logical) bulk, ok := runtime.lookup(scope, req.BulkID) if !ok { resp.Error = errBulkNotFound.Error() replyBulkControlIfNeeded(msg, resp) return } if req.Full { bulk.markPeerClosed() } else { bulk.markRemoteClosed() } resp.Accepted = true replyBulkControlIfNeeded(msg, resp) } func (c *ClientCommon) handleInboundBulkReset(msg *Message) { req, err := decodeBulkResetRequest(msg) resp := BulkResetResponse{BulkID: req.BulkID} if err != nil { resp.Error = err.Error() replyBulkControlIfNeeded(msg, resp) return } runtime := c.getBulkRuntime() if runtime == nil { resp.Error = errBulkRuntimeNil.Error() replyBulkControlIfNeeded(msg, resp) return } bulk, ok := runtime.lookup(clientFileScope(), req.BulkID) if !ok && req.DataID != 0 { bulk, ok = runtime.lookupByDataID(clientFileScope(), req.DataID) } if !ok { resp.Error = errBulkNotFound.Error() replyBulkControlIfNeeded(msg, resp) return } if resp.BulkID == "" { resp.BulkID = bulk.ID() } bulk.markReset(bulkResetError(bulkRemoteResetError(req.Error))) resp.Accepted = true replyBulkControlIfNeeded(msg, resp) } func (c *ClientCommon) handleInboundBulkRelease(msg *Message) { req, err := decodeBulkReleaseRequest(msg) if err != nil { return } runtime := c.getBulkRuntime() if runtime == nil { return } bulk, ok := runtime.lookup(clientFileScope(), req.BulkID) if !ok && req.DataID != 0 { bulk, ok = runtime.lookupByDataID(clientFileScope(), req.DataID) } if !ok { return } bulk.releaseOutboundWindow(req.Bytes, req.Chunks) } func (c *ClientCommon) handleInboundBulkReady(msg *Message) { req, err := decodeBulkReadyRequest(msg) resp := BulkReadyResponse{BulkID: req.BulkID} if err != nil { resp.Error = err.Error() replyBulkControlIfNeeded(msg, resp) return } runtime := c.getBulkRuntime() if runtime == nil { resp.Error = errBulkRuntimeNil.Error() replyBulkControlIfNeeded(msg, resp) return } bulk, ok := runtime.lookup(clientFileScope(), req.BulkID) if !ok && req.DataID != 0 { bulk, ok = runtime.lookupByDataID(clientFileScope(), req.DataID) } if !ok { resp.Error = errBulkNotFound.Error() replyBulkControlIfNeeded(msg, resp) return } if resp.BulkID == "" { resp.BulkID = bulk.ID() } readyErr := bulkReadyRemoteError(req.Error) bulk.markAcceptReady(readyErr) if readyErr != nil { bulk.markReset(readyErr) } resp.Accepted = true replyBulkControlIfNeeded(msg, resp) } func (s *ServerCommon) handleInboundBulkReset(msg *Message) { req, err := decodeBulkResetRequest(msg) resp := BulkResetResponse{BulkID: req.BulkID} if err != nil { resp.Error = err.Error() replyBulkControlIfNeeded(msg, resp) return } runtime := s.getBulkRuntime() if runtime == nil { resp.Error = errBulkRuntimeNil.Error() replyBulkControlIfNeeded(msg, resp) return } logical := messageLogicalConnSnapshot(msg) scope := serverFileScope(logical) bulk, ok := runtime.lookup(scope, req.BulkID) if !ok && req.DataID != 0 { bulk, ok = runtime.lookupByDataID(scope, req.DataID) } if !ok { resp.Error = errBulkNotFound.Error() replyBulkControlIfNeeded(msg, resp) return } if resp.BulkID == "" { resp.BulkID = bulk.ID() } bulk.markReset(bulkResetError(bulkRemoteResetError(req.Error))) resp.Accepted = true replyBulkControlIfNeeded(msg, resp) } func (s *ServerCommon) handleInboundBulkReady(msg *Message) { req, err := decodeBulkReadyRequest(msg) resp := BulkReadyResponse{BulkID: req.BulkID} if err != nil { resp.Error = err.Error() replyBulkControlIfNeeded(msg, resp) return } runtime := s.getBulkRuntime() if runtime == nil { resp.Error = errBulkRuntimeNil.Error() replyBulkControlIfNeeded(msg, resp) return } logical := messageLogicalConnSnapshot(msg) scope := serverFileScope(logical) bulk, ok := runtime.lookup(scope, req.BulkID) if !ok && req.DataID != 0 { bulk, ok = runtime.lookupByDataID(scope, req.DataID) } if !ok { resp.Error = errBulkNotFound.Error() replyBulkControlIfNeeded(msg, resp) return } if resp.BulkID == "" { resp.BulkID = bulk.ID() } readyErr := bulkReadyRemoteError(req.Error) bulk.markAcceptReady(readyErr) if readyErr != nil { bulk.markReset(readyErr) } resp.Accepted = true replyBulkControlIfNeeded(msg, resp) } func (s *ServerCommon) handleInboundBulkRelease(msg *Message) { req, err := decodeBulkReleaseRequest(msg) if err != nil { return } runtime := s.getBulkRuntime() if runtime == nil { return } logical := messageLogicalConnSnapshot(msg) scope := serverFileScope(logical) bulk, ok := runtime.lookup(scope, req.BulkID) if !ok && req.DataID != 0 { bulk, ok = runtime.lookupByDataID(scope, req.DataID) } if !ok { return } bulk.releaseOutboundWindow(req.Bytes, req.Chunks) } func replyBulkControlIfNeeded(msg *Message, value interface{}) { if msg == nil || !requiresSignalReplyWait(msg.TransferMsg) { return } _ = msg.ReplyObj(value) } func sendBulkOpenClient(ctx context.Context, c Client, req BulkOpenRequest) (BulkOpenResponse, error) { if c == nil { return BulkOpenResponse{}, errBulkClientNil } msg, err := c.SendObjCtx(ctx, BulkOpenSignalKey, req) if err != nil { return BulkOpenResponse{}, err } return decodeBulkOpenResponse(msg) } func sendBulkOpenServerLogical(ctx context.Context, s Server, logical *LogicalConn, req BulkOpenRequest) (BulkOpenResponse, error) { if s == nil { return BulkOpenResponse{}, errBulkServerNil } if logical == nil { return BulkOpenResponse{}, errBulkLogicalConnNil } msg, err := s.SendObjCtxLogical(ctx, logical, BulkOpenSignalKey, req) if err != nil { return BulkOpenResponse{}, err } return decodeBulkOpenResponse(msg) } func sendBulkOpenServerTransport(ctx context.Context, s Server, transport *TransportConn, req BulkOpenRequest) (BulkOpenResponse, error) { if s == nil { return BulkOpenResponse{}, errBulkServerNil } if transport == nil { return BulkOpenResponse{}, errBulkTransportNil } msg, err := s.SendObjCtxTransport(ctx, transport, BulkOpenSignalKey, req) if err != nil { return BulkOpenResponse{}, err } return decodeBulkOpenResponse(msg) } func sendBulkCloseClient(ctx context.Context, c Client, req BulkCloseRequest) (BulkCloseResponse, error) { if c == nil { return BulkCloseResponse{}, errBulkClientNil } msg, err := c.SendObjCtx(ctx, BulkCloseSignalKey, req) if err != nil { return BulkCloseResponse{}, err } return decodeBulkCloseResponse(msg) } func sendBulkCloseServerLogical(ctx context.Context, s Server, logical *LogicalConn, req BulkCloseRequest) (BulkCloseResponse, error) { if s == nil { return BulkCloseResponse{}, errBulkServerNil } if logical == nil { return BulkCloseResponse{}, errBulkLogicalConnNil } msg, err := s.SendObjCtxLogical(ctx, logical, BulkCloseSignalKey, req) if err != nil { return BulkCloseResponse{}, err } return decodeBulkCloseResponse(msg) } func sendBulkCloseServerTransport(ctx context.Context, s Server, transport *TransportConn, req BulkCloseRequest) (BulkCloseResponse, error) { if s == nil { return BulkCloseResponse{}, errBulkServerNil } if transport == nil { return BulkCloseResponse{}, errBulkTransportNil } msg, err := s.SendObjCtxTransport(ctx, transport, BulkCloseSignalKey, req) if err != nil { return BulkCloseResponse{}, err } return decodeBulkCloseResponse(msg) } func sendBulkResetClient(ctx context.Context, c Client, req BulkResetRequest) (BulkResetResponse, error) { if c == nil { return BulkResetResponse{}, errBulkClientNil } msg, err := c.SendObjCtx(ctx, BulkResetSignalKey, req) if err != nil { return BulkResetResponse{}, err } return decodeBulkResetResponse(msg) } func sendBulkResetServerLogical(ctx context.Context, s Server, logical *LogicalConn, req BulkResetRequest) (BulkResetResponse, error) { if s == nil { return BulkResetResponse{}, errBulkServerNil } if logical == nil { return BulkResetResponse{}, errBulkLogicalConnNil } msg, err := s.SendObjCtxLogical(ctx, logical, BulkResetSignalKey, req) if err != nil { return BulkResetResponse{}, err } return decodeBulkResetResponse(msg) } func sendBulkResetServerTransport(ctx context.Context, s Server, transport *TransportConn, req BulkResetRequest) (BulkResetResponse, error) { if s == nil { return BulkResetResponse{}, errBulkServerNil } if transport == nil { return BulkResetResponse{}, errBulkTransportNil } msg, err := s.SendObjCtxTransport(ctx, transport, BulkResetSignalKey, req) if err != nil { return BulkResetResponse{}, err } return decodeBulkResetResponse(msg) } func sendBulkReleaseClient(c Client, req BulkReleaseRequest) error { if c == nil { return errBulkClientNil } return c.SendObj(BulkReleaseSignalKey, req) } func sendBulkReleaseServerLogical(s Server, logical *LogicalConn, req BulkReleaseRequest) error { if s == nil { return errBulkServerNil } if logical == nil { return errBulkLogicalConnNil } return s.SendObjLogical(logical, BulkReleaseSignalKey, req) } func sendBulkReleaseServerTransport(s Server, transport *TransportConn, req BulkReleaseRequest) error { if s == nil { return errBulkServerNil } if transport == nil { return errBulkTransportNil } return s.SendObjTransport(transport, BulkReleaseSignalKey, req) } func decodeBulkOpenRequest(msg *Message) (BulkOpenRequest, error) { var req BulkOpenRequest if msg == nil { return BulkOpenRequest{}, errBulkIDEmpty } if err := msg.Value.Orm(&req); err != nil { return BulkOpenRequest{}, err } req = normalizeBulkOpenRequest(req) if req.BulkID == "" { return BulkOpenRequest{}, errBulkIDEmpty } if !validBulkRange(req.Range) { return BulkOpenRequest{}, errBulkRangeInvalid } return req, nil } func decodeBulkCloseRequest(msg *Message) (BulkCloseRequest, error) { var req BulkCloseRequest if msg == nil { return BulkCloseRequest{}, errBulkIDEmpty } if err := msg.Value.Orm(&req); err != nil { return BulkCloseRequest{}, err } if req.BulkID == "" { return BulkCloseRequest{}, errBulkIDEmpty } return req, nil } func decodeBulkResetRequest(msg *Message) (BulkResetRequest, error) { var req BulkResetRequest if msg == nil { return BulkResetRequest{}, errBulkIDEmpty } if err := msg.Value.Orm(&req); err != nil { return BulkResetRequest{}, err } if req.BulkID == "" && req.DataID == 0 { return BulkResetRequest{}, errBulkIDEmpty } return req, nil } func decodeBulkReleaseRequest(msg *Message) (BulkReleaseRequest, error) { var req BulkReleaseRequest if msg == nil { return BulkReleaseRequest{}, errBulkIDEmpty } if err := msg.Value.Orm(&req); err != nil { return BulkReleaseRequest{}, err } if req.BulkID == "" && req.DataID == 0 { return BulkReleaseRequest{}, errBulkIDEmpty } if req.Bytes < 0 || req.Chunks < 0 { return BulkReleaseRequest{}, errBulkRangeInvalid } return req, nil } func decodeBulkReadyRequest(msg *Message) (BulkReadyRequest, error) { var req BulkReadyRequest if msg == nil { return BulkReadyRequest{}, errBulkIDEmpty } if err := msg.Value.Orm(&req); err != nil { return BulkReadyRequest{}, err } if req.BulkID == "" && req.DataID == 0 { return BulkReadyRequest{}, errBulkIDEmpty } return req, nil } func decodeBulkOpenResponse(msg Message) (BulkOpenResponse, error) { var resp BulkOpenResponse if err := msg.Value.Orm(&resp); err != nil { return BulkOpenResponse{}, err } resp.FastPathVersion = normalizeBulkFastPathVersion(resp.FastPathVersion) return resp, bulkControlResultError("open", resp.Accepted, resp.Error, nil) } func decodeBulkCloseResponse(msg Message) (BulkCloseResponse, error) { var resp BulkCloseResponse if err := msg.Value.Orm(&resp); err != nil { return BulkCloseResponse{}, err } return resp, bulkControlResultError("close", resp.Accepted, resp.Error, nil) } func decodeBulkResetResponse(msg Message) (BulkResetResponse, error) { var resp BulkResetResponse if err := msg.Value.Orm(&resp); err != nil { return BulkResetResponse{}, err } return resp, bulkControlResultError("reset", resp.Accepted, resp.Error, nil) } func decodeBulkReadyResponse(msg Message) (BulkReadyResponse, error) { var resp BulkReadyResponse if err := msg.Value.Orm(&resp); err != nil { return BulkReadyResponse{}, err } return resp, bulkControlResultError("ready", resp.Accepted, resp.Error, nil) } func bulkControlResultError(op string, accepted bool, message string, callErr error) error { if callErr != nil { return callErr } if message != "" { return bulkControlMessageError(message) } if accepted { return nil } if op == "open" { return errBulkRejected } return errors.New("bulk " + op + " rejected") } func bulkControlMessageError(message string) error { switch message { case errBulkNotFound.Error(): return errBulkNotFound case errBulkAlreadyExists.Error(): return errBulkAlreadyExists case errBulkHandlerNotConfigured.Error(): return errBulkHandlerNotConfigured case errBulkLogicalConnNil.Error(): return errBulkLogicalConnNil case errBulkTransportNil.Error(): return errBulkTransportNil case errBulkRuntimeNil.Error(): return errBulkRuntimeNil case errBulkIDEmpty.Error(): return errBulkIDEmpty case errBulkRangeInvalid.Error(): return errBulkRangeInvalid case errBulkDataIDEmpty.Error(): return errBulkDataIDEmpty default: return errors.New(message) } } func bulkRemoteResetError(message string) error { if message == "" { return errBulkReset } return errors.New(message) } func bulkReadyRemoteError(message string) error { if message == "" { return nil } return bulkControlMessageError(message) } func sendBulkReadyClient(ctx context.Context, c Client, req BulkReadyRequest) (BulkReadyResponse, error) { if c == nil { return BulkReadyResponse{}, errBulkClientNil } msg, err := c.SendObjCtx(ctx, BulkReadySignalKey, req) if err != nil { return BulkReadyResponse{}, err } return decodeBulkReadyResponse(msg) } func sendBulkReadyServerLogical(ctx context.Context, s Server, logical *LogicalConn, req BulkReadyRequest) (BulkReadyResponse, error) { if s == nil { return BulkReadyResponse{}, errBulkServerNil } if logical == nil { return BulkReadyResponse{}, errBulkLogicalConnNil } msg, err := s.SendObjCtxLogical(ctx, logical, BulkReadySignalKey, req) if err != nil { return BulkReadyResponse{}, err } return decodeBulkReadyResponse(msg) } func sendBulkReadyServerTransport(ctx context.Context, s Server, transport *TransportConn, req BulkReadyRequest) (BulkReadyResponse, error) { if s == nil { return BulkReadyResponse{}, errBulkServerNil } if transport == nil { return BulkReadyResponse{}, errBulkTransportNil } msg, err := s.SendObjCtxTransport(ctx, transport, BulkReadySignalKey, req) if err != nil { return BulkReadyResponse{}, err } return decodeBulkReadyResponse(msg) } func bulkTransportGeneration(logical *LogicalConn, transport *TransportConn) uint64 { return streamTransportGeneration(logical, transport) }