2026-04-15 15:24:36 +08:00
|
|
|
package notify
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"errors"
|
|
|
|
|
"time"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type BulkOpenRequest struct {
|
2026-04-18 16:05:57 +08:00
|
|
|
BulkID string
|
|
|
|
|
DataID uint64
|
|
|
|
|
FastPathVersion uint8
|
|
|
|
|
Range BulkRange
|
|
|
|
|
Metadata BulkMetadata
|
|
|
|
|
ReadTimeout time.Duration
|
|
|
|
|
WriteTimeout time.Duration
|
|
|
|
|
Dedicated bool
|
|
|
|
|
DedicatedLaneID uint32
|
|
|
|
|
AttachToken string
|
|
|
|
|
ChunkSize int
|
|
|
|
|
WindowBytes int
|
|
|
|
|
MaxInFlight int
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type BulkOpenResponse struct {
|
|
|
|
|
BulkID string
|
|
|
|
|
DataID uint64
|
2026-04-18 16:05:57 +08:00
|
|
|
FastPathVersion uint8
|
2026-04-15 15:24:36 +08:00
|
|
|
Accepted bool
|
|
|
|
|
Dedicated bool
|
|
|
|
|
AttachToken string
|
|
|
|
|
TransportGeneration uint64
|
|
|
|
|
Error string
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type BulkCloseRequest struct {
|
|
|
|
|
BulkID string
|
|
|
|
|
Full bool
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type BulkCloseResponse struct {
|
|
|
|
|
BulkID string
|
|
|
|
|
Accepted bool
|
|
|
|
|
Error string
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type BulkResetRequest struct {
|
|
|
|
|
BulkID string
|
|
|
|
|
DataID uint64
|
|
|
|
|
Error string
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type BulkResetResponse struct {
|
|
|
|
|
BulkID string
|
|
|
|
|
Accepted bool
|
|
|
|
|
Error string
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-18 16:05:57 +08:00
|
|
|
type BulkReadyRequest struct {
|
|
|
|
|
BulkID string
|
|
|
|
|
DataID uint64
|
|
|
|
|
Error string
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type BulkReadyResponse struct {
|
|
|
|
|
BulkID string
|
|
|
|
|
Accepted bool
|
|
|
|
|
Error string
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-15 15:24:36 +08:00
|
|
|
type BulkReleaseRequest struct {
|
|
|
|
|
BulkID string
|
|
|
|
|
DataID uint64
|
|
|
|
|
Bytes int64
|
|
|
|
|
Chunks int
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func bindClientBulkControl(c *ClientCommon) {
|
|
|
|
|
if c == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
c.SetLink(BulkOpenSignalKey, func(msg *Message) {
|
|
|
|
|
c.handleInboundBulkOpen(msg)
|
|
|
|
|
})
|
|
|
|
|
c.SetLink(BulkCloseSignalKey, func(msg *Message) {
|
|
|
|
|
c.handleInboundBulkClose(msg)
|
|
|
|
|
})
|
|
|
|
|
c.SetLink(BulkResetSignalKey, func(msg *Message) {
|
|
|
|
|
c.handleInboundBulkReset(msg)
|
|
|
|
|
})
|
2026-04-18 16:05:57 +08:00
|
|
|
c.SetLink(BulkReadySignalKey, func(msg *Message) {
|
|
|
|
|
c.handleInboundBulkReady(msg)
|
|
|
|
|
})
|
2026-04-15 15:24:36 +08:00
|
|
|
c.SetLink(BulkReleaseSignalKey, func(msg *Message) {
|
|
|
|
|
c.handleInboundBulkRelease(msg)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func bindServerBulkControl(s *ServerCommon) {
|
|
|
|
|
if s == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
s.SetLink(BulkOpenSignalKey, func(msg *Message) {
|
|
|
|
|
s.handleInboundBulkOpen(msg)
|
|
|
|
|
})
|
|
|
|
|
s.SetLink(BulkCloseSignalKey, func(msg *Message) {
|
|
|
|
|
s.handleInboundBulkClose(msg)
|
|
|
|
|
})
|
|
|
|
|
s.SetLink(BulkResetSignalKey, func(msg *Message) {
|
|
|
|
|
s.handleInboundBulkReset(msg)
|
|
|
|
|
})
|
2026-04-18 16:05:57 +08:00
|
|
|
s.SetLink(BulkReadySignalKey, func(msg *Message) {
|
|
|
|
|
s.handleInboundBulkReady(msg)
|
|
|
|
|
})
|
2026-04-15 15:24:36 +08:00
|
|
|
s.SetLink(BulkReleaseSignalKey, func(msg *Message) {
|
|
|
|
|
s.handleInboundBulkRelease(msg)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-18 16:05:57 +08:00
|
|
|
func bulkAcceptInfoFromClientBulk(bulk *bulkHandle) BulkAcceptInfo {
|
|
|
|
|
if bulk == nil {
|
|
|
|
|
return BulkAcceptInfo{}
|
|
|
|
|
}
|
|
|
|
|
return BulkAcceptInfo{
|
|
|
|
|
ID: bulk.ID(),
|
|
|
|
|
Range: bulk.Range(),
|
|
|
|
|
Metadata: bulk.Metadata(),
|
|
|
|
|
Dedicated: bulk.Dedicated(),
|
|
|
|
|
TransportGeneration: bulk.TransportGeneration(),
|
|
|
|
|
Bulk: bulk,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func bulkAcceptInfoFromServerBulk(bulk *bulkHandle, logical *LogicalConn, transport *TransportConn) BulkAcceptInfo {
|
|
|
|
|
if bulk == nil {
|
|
|
|
|
return BulkAcceptInfo{}
|
|
|
|
|
}
|
|
|
|
|
if logical == nil {
|
|
|
|
|
logical = bulk.LogicalConn()
|
|
|
|
|
}
|
|
|
|
|
if transport == nil {
|
|
|
|
|
transport = bulk.TransportConn()
|
|
|
|
|
}
|
|
|
|
|
return BulkAcceptInfo{
|
|
|
|
|
ID: bulk.ID(),
|
|
|
|
|
Range: bulk.Range(),
|
|
|
|
|
Metadata: bulk.Metadata(),
|
|
|
|
|
Dedicated: bulk.Dedicated(),
|
|
|
|
|
LogicalConn: logical,
|
|
|
|
|
TransportConn: transport,
|
|
|
|
|
TransportGeneration: bulk.TransportGeneration(),
|
|
|
|
|
Bulk: bulk,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func dispatchBulkAccept(handler func(BulkAcceptInfo) error, bulk *bulkHandle, info BulkAcceptInfo) error {
|
|
|
|
|
if bulk == nil {
|
|
|
|
|
return errBulkNotFound
|
|
|
|
|
}
|
|
|
|
|
if !bulk.markAcceptDispatched() {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
var dispatchErr error
|
|
|
|
|
defer func() {
|
|
|
|
|
bulk.finishAcceptDispatch(dispatchErr)
|
|
|
|
|
}()
|
|
|
|
|
if handler == nil {
|
|
|
|
|
dispatchErr = errBulkHandlerNotConfigured
|
|
|
|
|
bulk.markReset(dispatchErr)
|
|
|
|
|
return dispatchErr
|
|
|
|
|
}
|
|
|
|
|
if err := handler(info); err != nil {
|
|
|
|
|
dispatchErr = err
|
|
|
|
|
bulk.markReset(dispatchErr)
|
|
|
|
|
return dispatchErr
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) clientBulkAcceptReadyNotifier(bulk *bulkHandle) func(error) {
|
|
|
|
|
return func(readyErr error) {
|
|
|
|
|
if c == nil || bulk == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
req := BulkReadyRequest{
|
|
|
|
|
BulkID: bulk.ID(),
|
|
|
|
|
DataID: bulk.dataIDSnapshot(),
|
|
|
|
|
}
|
|
|
|
|
if readyErr != nil {
|
|
|
|
|
req.Error = readyErr.Error()
|
|
|
|
|
}
|
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), defaultBulkAcceptReadyTimeout)
|
|
|
|
|
defer cancel()
|
|
|
|
|
if _, err := sendBulkReadyClient(ctx, c, req); err != nil && bulk.Context().Err() == nil {
|
|
|
|
|
bulk.markReset(err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func sendBulkReadyServer(ctx context.Context, s *ServerCommon, logical *LogicalConn, transport *TransportConn, req BulkReadyRequest) error {
|
|
|
|
|
if s == nil {
|
|
|
|
|
return errBulkServerNil
|
|
|
|
|
}
|
|
|
|
|
if transport != nil {
|
|
|
|
|
if _, err := sendBulkReadyServerTransport(ctx, s, transport, req); err == nil {
|
|
|
|
|
return nil
|
|
|
|
|
} else if !errors.Is(err, errTransportDetached) && !errors.Is(err, errBulkTransportNil) {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if logical == nil {
|
|
|
|
|
return errBulkLogicalConnNil
|
|
|
|
|
}
|
|
|
|
|
_, err := sendBulkReadyServerLogical(ctx, s, logical, req)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ServerCommon) serverBulkAcceptReadyNotifier(bulk *bulkHandle, logical *LogicalConn, transport *TransportConn) func(error) {
|
|
|
|
|
return func(readyErr error) {
|
|
|
|
|
if s == nil || bulk == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
req := BulkReadyRequest{
|
|
|
|
|
BulkID: bulk.ID(),
|
|
|
|
|
DataID: bulk.dataIDSnapshot(),
|
|
|
|
|
}
|
|
|
|
|
if readyErr != nil {
|
|
|
|
|
req.Error = readyErr.Error()
|
|
|
|
|
}
|
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), defaultBulkAcceptReadyTimeout)
|
|
|
|
|
defer cancel()
|
|
|
|
|
if err := sendBulkReadyServer(ctx, s, logical, transport, req); err != nil && bulk.Context().Err() == nil {
|
|
|
|
|
bulk.markReset(err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) startClientBulkAcceptDispatch(bulk *bulkHandle) {
|
|
|
|
|
if c == nil || bulk == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
bulk.setAcceptNotify(c.clientBulkAcceptReadyNotifier(bulk))
|
|
|
|
|
go func() {
|
|
|
|
|
_ = c.dispatchClientBulkAccept(bulk)
|
|
|
|
|
}()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ServerCommon) startServerBulkAcceptDispatch(bulk *bulkHandle, logical *LogicalConn, transport *TransportConn) {
|
|
|
|
|
if s == nil || bulk == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if logical == nil {
|
|
|
|
|
logical = bulk.LogicalConn()
|
|
|
|
|
}
|
|
|
|
|
if transport == nil {
|
|
|
|
|
transport = bulk.TransportConn()
|
|
|
|
|
}
|
|
|
|
|
bulk.setAcceptNotify(s.serverBulkAcceptReadyNotifier(bulk, logical, transport))
|
|
|
|
|
go func() {
|
|
|
|
|
_ = s.dispatchServerBulkAccept(bulk, logical, transport)
|
|
|
|
|
}()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) dispatchClientBulkAccept(bulk *bulkHandle) error {
|
|
|
|
|
if c == nil {
|
|
|
|
|
return errBulkClientNil
|
|
|
|
|
}
|
|
|
|
|
runtime := c.getBulkRuntime()
|
|
|
|
|
if runtime == nil {
|
|
|
|
|
return errBulkRuntimeNil
|
|
|
|
|
}
|
|
|
|
|
return dispatchBulkAccept(runtime.handlerSnapshot(), bulk, bulkAcceptInfoFromClientBulk(bulk))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ServerCommon) dispatchServerBulkAccept(bulk *bulkHandle, logical *LogicalConn, transport *TransportConn) error {
|
|
|
|
|
if s == nil {
|
|
|
|
|
return errBulkServerNil
|
|
|
|
|
}
|
|
|
|
|
runtime := s.getBulkRuntime()
|
|
|
|
|
if runtime == nil {
|
|
|
|
|
return errBulkRuntimeNil
|
|
|
|
|
}
|
|
|
|
|
return dispatchBulkAccept(runtime.handlerSnapshot(), bulk, bulkAcceptInfoFromServerBulk(bulk, logical, transport))
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-15 15:24:36 +08:00
|
|
|
func (c *ClientCommon) handleInboundBulkOpen(msg *Message) {
|
|
|
|
|
req, err := decodeBulkOpenRequest(msg)
|
2026-04-18 16:05:57 +08:00
|
|
|
resp := BulkOpenResponse{
|
|
|
|
|
BulkID: req.BulkID,
|
|
|
|
|
DataID: req.DataID,
|
|
|
|
|
FastPathVersion: negotiateBulkFastPathVersion(req.FastPathVersion),
|
|
|
|
|
Dedicated: req.Dedicated,
|
|
|
|
|
}
|
2026-04-15 15:24:36 +08:00
|
|
|
if err != nil {
|
|
|
|
|
resp.Error = err.Error()
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if req.Dedicated {
|
|
|
|
|
if err := clientDedicatedBulkSupportError(c); err != nil {
|
|
|
|
|
resp.Error = err.Error()
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
runtime := c.getBulkRuntime()
|
|
|
|
|
if runtime == nil {
|
|
|
|
|
resp.Error = errBulkRuntimeNil.Error()
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
scope := clientFileScope()
|
|
|
|
|
if req.DataID == 0 {
|
|
|
|
|
req.DataID = runtime.nextDataID()
|
|
|
|
|
resp.DataID = req.DataID
|
|
|
|
|
}
|
|
|
|
|
if req.Dedicated && req.AttachToken == "" {
|
|
|
|
|
req.AttachToken = newBulkAttachToken()
|
|
|
|
|
}
|
|
|
|
|
resp.AttachToken = req.AttachToken
|
|
|
|
|
bulk := newBulkHandle(c.clientStopContextSnapshot(), runtime, scope, req, c.currentClientSessionEpoch(), nil, nil, 0, clientBulkCloseSender(c), clientBulkResetSender(c), clientBulkDataSender(c, c.currentClientSessionEpoch()), clientBulkWriteSender(c, c.currentClientSessionEpoch()), clientBulkReleaseSender(c))
|
|
|
|
|
bulk.setClientSnapshotOwner(c)
|
|
|
|
|
if err := runtime.register(scope, bulk); err != nil {
|
|
|
|
|
resp.Error = err.Error()
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if req.Dedicated {
|
|
|
|
|
if err := c.attachDedicatedBulkSidecar(context.Background(), bulk); err != nil {
|
|
|
|
|
bulk.markReset(err)
|
|
|
|
|
resp.Error = err.Error()
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
return
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
resp.Accepted = true
|
|
|
|
|
resp.DataID = bulk.dataIDSnapshot()
|
|
|
|
|
resp.TransportGeneration = bulk.TransportGeneration()
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
c.startClientBulkAcceptDispatch(bulk)
|
|
|
|
|
return
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
if err := c.dispatchClientBulkAccept(bulk); err != nil {
|
2026-04-15 15:24:36 +08:00
|
|
|
resp.Error = err.Error()
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
resp.Accepted = true
|
|
|
|
|
resp.DataID = bulk.dataIDSnapshot()
|
|
|
|
|
resp.TransportGeneration = bulk.TransportGeneration()
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ServerCommon) handleInboundBulkOpen(msg *Message) {
|
|
|
|
|
req, err := decodeBulkOpenRequest(msg)
|
2026-04-18 16:05:57 +08:00
|
|
|
resp := BulkOpenResponse{
|
|
|
|
|
BulkID: req.BulkID,
|
|
|
|
|
DataID: req.DataID,
|
|
|
|
|
FastPathVersion: negotiateBulkFastPathVersion(req.FastPathVersion),
|
|
|
|
|
Dedicated: req.Dedicated,
|
|
|
|
|
}
|
2026-04-15 15:24:36 +08:00
|
|
|
if err != nil {
|
|
|
|
|
resp.Error = err.Error()
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
runtime := s.getBulkRuntime()
|
|
|
|
|
if runtime == nil {
|
|
|
|
|
resp.Error = errBulkRuntimeNil.Error()
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
logical := messageLogicalConnSnapshot(msg)
|
|
|
|
|
if logical == nil {
|
|
|
|
|
resp.Error = errBulkLogicalConnNil.Error()
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
transport := messageTransportConnSnapshot(msg)
|
|
|
|
|
if req.Dedicated {
|
|
|
|
|
if err := logicalDedicatedBulkSupportError(logical); err != nil {
|
|
|
|
|
resp.Error = err.Error()
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if transport != nil {
|
|
|
|
|
if err := transportDedicatedBulkSupportError(transport); err != nil {
|
|
|
|
|
resp.Error = err.Error()
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
scope := serverFileScope(logical)
|
|
|
|
|
if req.DataID == 0 {
|
|
|
|
|
req.DataID = runtime.nextDataID()
|
|
|
|
|
resp.DataID = req.DataID
|
|
|
|
|
}
|
|
|
|
|
if req.Dedicated && req.AttachToken == "" {
|
|
|
|
|
req.AttachToken = newBulkAttachToken()
|
|
|
|
|
}
|
|
|
|
|
resp.AttachToken = req.AttachToken
|
|
|
|
|
bulk := newBulkHandle(logical.stopContextSnapshot(), runtime, scope, req, 0, logical, transport, bulkTransportGeneration(logical, transport), serverBulkCloseSender(s, logical, transport), serverBulkResetSender(s, logical, transport), serverBulkDataSender(s, transport), serverBulkWriteSender(s, logical, transport), serverBulkReleaseSender(s, logical, transport))
|
|
|
|
|
if err := runtime.register(scope, bulk); err != nil {
|
|
|
|
|
resp.Error = err.Error()
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
return
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
s.attachServerDedicatedSidecarIfExists(logical, bulk)
|
|
|
|
|
if runtime.handlerSnapshot() == nil {
|
2026-04-15 15:24:36 +08:00
|
|
|
bulk.markReset(errBulkHandlerNotConfigured)
|
|
|
|
|
resp.Error = errBulkHandlerNotConfigured.Error()
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
return
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
if req.Dedicated {
|
|
|
|
|
resp.Accepted = true
|
|
|
|
|
resp.DataID = bulk.dataIDSnapshot()
|
|
|
|
|
resp.TransportGeneration = bulk.TransportGeneration()
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
if bulk.dedicatedAttachedSnapshot() {
|
|
|
|
|
s.startServerBulkAcceptDispatch(bulk, logical, messageTransportConnSnapshot(msg))
|
|
|
|
|
}
|
|
|
|
|
return
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
if err := s.dispatchServerBulkAccept(bulk, logical, transport); err != nil {
|
2026-04-15 15:24:36 +08:00
|
|
|
resp.Error = err.Error()
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
resp.Accepted = true
|
|
|
|
|
resp.DataID = bulk.dataIDSnapshot()
|
|
|
|
|
resp.TransportGeneration = bulk.TransportGeneration()
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) handleInboundBulkClose(msg *Message) {
|
|
|
|
|
req, err := decodeBulkCloseRequest(msg)
|
|
|
|
|
resp := BulkCloseResponse{BulkID: req.BulkID}
|
|
|
|
|
if err != nil {
|
|
|
|
|
resp.Error = err.Error()
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
runtime := c.getBulkRuntime()
|
|
|
|
|
if runtime == nil {
|
|
|
|
|
resp.Error = errBulkRuntimeNil.Error()
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
bulk, ok := runtime.lookup(clientFileScope(), req.BulkID)
|
|
|
|
|
if !ok {
|
|
|
|
|
resp.Error = errBulkNotFound.Error()
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if req.Full {
|
|
|
|
|
bulk.markPeerClosed()
|
|
|
|
|
} else {
|
|
|
|
|
bulk.markRemoteClosed()
|
|
|
|
|
}
|
|
|
|
|
resp.Accepted = true
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ServerCommon) handleInboundBulkClose(msg *Message) {
|
|
|
|
|
req, err := decodeBulkCloseRequest(msg)
|
|
|
|
|
resp := BulkCloseResponse{BulkID: req.BulkID}
|
|
|
|
|
if err != nil {
|
|
|
|
|
resp.Error = err.Error()
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
runtime := s.getBulkRuntime()
|
|
|
|
|
if runtime == nil {
|
|
|
|
|
resp.Error = errBulkRuntimeNil.Error()
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
logical := messageLogicalConnSnapshot(msg)
|
|
|
|
|
scope := serverFileScope(logical)
|
|
|
|
|
bulk, ok := runtime.lookup(scope, req.BulkID)
|
|
|
|
|
if !ok {
|
|
|
|
|
resp.Error = errBulkNotFound.Error()
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if req.Full {
|
|
|
|
|
bulk.markPeerClosed()
|
|
|
|
|
} else {
|
|
|
|
|
bulk.markRemoteClosed()
|
|
|
|
|
}
|
|
|
|
|
resp.Accepted = true
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) handleInboundBulkReset(msg *Message) {
|
|
|
|
|
req, err := decodeBulkResetRequest(msg)
|
|
|
|
|
resp := BulkResetResponse{BulkID: req.BulkID}
|
|
|
|
|
if err != nil {
|
|
|
|
|
resp.Error = err.Error()
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
runtime := c.getBulkRuntime()
|
|
|
|
|
if runtime == nil {
|
|
|
|
|
resp.Error = errBulkRuntimeNil.Error()
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
bulk, ok := runtime.lookup(clientFileScope(), req.BulkID)
|
|
|
|
|
if !ok && req.DataID != 0 {
|
|
|
|
|
bulk, ok = runtime.lookupByDataID(clientFileScope(), req.DataID)
|
|
|
|
|
}
|
|
|
|
|
if !ok {
|
|
|
|
|
resp.Error = errBulkNotFound.Error()
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if resp.BulkID == "" {
|
|
|
|
|
resp.BulkID = bulk.ID()
|
|
|
|
|
}
|
|
|
|
|
bulk.markReset(bulkResetError(bulkRemoteResetError(req.Error)))
|
|
|
|
|
resp.Accepted = true
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) handleInboundBulkRelease(msg *Message) {
|
|
|
|
|
req, err := decodeBulkReleaseRequest(msg)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
runtime := c.getBulkRuntime()
|
|
|
|
|
if runtime == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
bulk, ok := runtime.lookup(clientFileScope(), req.BulkID)
|
|
|
|
|
if !ok && req.DataID != 0 {
|
|
|
|
|
bulk, ok = runtime.lookupByDataID(clientFileScope(), req.DataID)
|
|
|
|
|
}
|
|
|
|
|
if !ok {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
bulk.releaseOutboundWindow(req.Bytes, req.Chunks)
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-18 16:05:57 +08:00
|
|
|
func (c *ClientCommon) handleInboundBulkReady(msg *Message) {
|
|
|
|
|
req, err := decodeBulkReadyRequest(msg)
|
|
|
|
|
resp := BulkReadyResponse{BulkID: req.BulkID}
|
|
|
|
|
if err != nil {
|
|
|
|
|
resp.Error = err.Error()
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
runtime := c.getBulkRuntime()
|
|
|
|
|
if runtime == nil {
|
|
|
|
|
resp.Error = errBulkRuntimeNil.Error()
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
bulk, ok := runtime.lookup(clientFileScope(), req.BulkID)
|
|
|
|
|
if !ok && req.DataID != 0 {
|
|
|
|
|
bulk, ok = runtime.lookupByDataID(clientFileScope(), req.DataID)
|
|
|
|
|
}
|
|
|
|
|
if !ok {
|
|
|
|
|
resp.Error = errBulkNotFound.Error()
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if resp.BulkID == "" {
|
|
|
|
|
resp.BulkID = bulk.ID()
|
|
|
|
|
}
|
|
|
|
|
readyErr := bulkReadyRemoteError(req.Error)
|
|
|
|
|
bulk.markAcceptReady(readyErr)
|
|
|
|
|
if readyErr != nil {
|
|
|
|
|
bulk.markReset(readyErr)
|
|
|
|
|
}
|
|
|
|
|
resp.Accepted = true
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-15 15:24:36 +08:00
|
|
|
func (s *ServerCommon) handleInboundBulkReset(msg *Message) {
|
|
|
|
|
req, err := decodeBulkResetRequest(msg)
|
|
|
|
|
resp := BulkResetResponse{BulkID: req.BulkID}
|
|
|
|
|
if err != nil {
|
|
|
|
|
resp.Error = err.Error()
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
runtime := s.getBulkRuntime()
|
|
|
|
|
if runtime == nil {
|
|
|
|
|
resp.Error = errBulkRuntimeNil.Error()
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
logical := messageLogicalConnSnapshot(msg)
|
|
|
|
|
scope := serverFileScope(logical)
|
|
|
|
|
bulk, ok := runtime.lookup(scope, req.BulkID)
|
|
|
|
|
if !ok && req.DataID != 0 {
|
|
|
|
|
bulk, ok = runtime.lookupByDataID(scope, req.DataID)
|
|
|
|
|
}
|
|
|
|
|
if !ok {
|
|
|
|
|
resp.Error = errBulkNotFound.Error()
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if resp.BulkID == "" {
|
|
|
|
|
resp.BulkID = bulk.ID()
|
|
|
|
|
}
|
|
|
|
|
bulk.markReset(bulkResetError(bulkRemoteResetError(req.Error)))
|
|
|
|
|
resp.Accepted = true
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-18 16:05:57 +08:00
|
|
|
func (s *ServerCommon) handleInboundBulkReady(msg *Message) {
|
|
|
|
|
req, err := decodeBulkReadyRequest(msg)
|
|
|
|
|
resp := BulkReadyResponse{BulkID: req.BulkID}
|
|
|
|
|
if err != nil {
|
|
|
|
|
resp.Error = err.Error()
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
runtime := s.getBulkRuntime()
|
|
|
|
|
if runtime == nil {
|
|
|
|
|
resp.Error = errBulkRuntimeNil.Error()
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
logical := messageLogicalConnSnapshot(msg)
|
|
|
|
|
scope := serverFileScope(logical)
|
|
|
|
|
bulk, ok := runtime.lookup(scope, req.BulkID)
|
|
|
|
|
if !ok && req.DataID != 0 {
|
|
|
|
|
bulk, ok = runtime.lookupByDataID(scope, req.DataID)
|
|
|
|
|
}
|
|
|
|
|
if !ok {
|
|
|
|
|
resp.Error = errBulkNotFound.Error()
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if resp.BulkID == "" {
|
|
|
|
|
resp.BulkID = bulk.ID()
|
|
|
|
|
}
|
|
|
|
|
readyErr := bulkReadyRemoteError(req.Error)
|
|
|
|
|
bulk.markAcceptReady(readyErr)
|
|
|
|
|
if readyErr != nil {
|
|
|
|
|
bulk.markReset(readyErr)
|
|
|
|
|
}
|
|
|
|
|
resp.Accepted = true
|
|
|
|
|
replyBulkControlIfNeeded(msg, resp)
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-15 15:24:36 +08:00
|
|
|
func (s *ServerCommon) handleInboundBulkRelease(msg *Message) {
|
|
|
|
|
req, err := decodeBulkReleaseRequest(msg)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
runtime := s.getBulkRuntime()
|
|
|
|
|
if runtime == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
logical := messageLogicalConnSnapshot(msg)
|
|
|
|
|
scope := serverFileScope(logical)
|
|
|
|
|
bulk, ok := runtime.lookup(scope, req.BulkID)
|
|
|
|
|
if !ok && req.DataID != 0 {
|
|
|
|
|
bulk, ok = runtime.lookupByDataID(scope, req.DataID)
|
|
|
|
|
}
|
|
|
|
|
if !ok {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
bulk.releaseOutboundWindow(req.Bytes, req.Chunks)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func replyBulkControlIfNeeded(msg *Message, value interface{}) {
|
|
|
|
|
if msg == nil || !requiresSignalReplyWait(msg.TransferMsg) {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
_ = msg.ReplyObj(value)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func sendBulkOpenClient(ctx context.Context, c Client, req BulkOpenRequest) (BulkOpenResponse, error) {
|
|
|
|
|
if c == nil {
|
|
|
|
|
return BulkOpenResponse{}, errBulkClientNil
|
|
|
|
|
}
|
|
|
|
|
msg, err := c.SendObjCtx(ctx, BulkOpenSignalKey, req)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return BulkOpenResponse{}, err
|
|
|
|
|
}
|
|
|
|
|
return decodeBulkOpenResponse(msg)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func sendBulkOpenServerLogical(ctx context.Context, s Server, logical *LogicalConn, req BulkOpenRequest) (BulkOpenResponse, error) {
|
|
|
|
|
if s == nil {
|
|
|
|
|
return BulkOpenResponse{}, errBulkServerNil
|
|
|
|
|
}
|
|
|
|
|
if logical == nil {
|
|
|
|
|
return BulkOpenResponse{}, errBulkLogicalConnNil
|
|
|
|
|
}
|
|
|
|
|
msg, err := s.SendObjCtxLogical(ctx, logical, BulkOpenSignalKey, req)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return BulkOpenResponse{}, err
|
|
|
|
|
}
|
|
|
|
|
return decodeBulkOpenResponse(msg)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func sendBulkOpenServerTransport(ctx context.Context, s Server, transport *TransportConn, req BulkOpenRequest) (BulkOpenResponse, error) {
|
|
|
|
|
if s == nil {
|
|
|
|
|
return BulkOpenResponse{}, errBulkServerNil
|
|
|
|
|
}
|
|
|
|
|
if transport == nil {
|
|
|
|
|
return BulkOpenResponse{}, errBulkTransportNil
|
|
|
|
|
}
|
|
|
|
|
msg, err := s.SendObjCtxTransport(ctx, transport, BulkOpenSignalKey, req)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return BulkOpenResponse{}, err
|
|
|
|
|
}
|
|
|
|
|
return decodeBulkOpenResponse(msg)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func sendBulkCloseClient(ctx context.Context, c Client, req BulkCloseRequest) (BulkCloseResponse, error) {
|
|
|
|
|
if c == nil {
|
|
|
|
|
return BulkCloseResponse{}, errBulkClientNil
|
|
|
|
|
}
|
|
|
|
|
msg, err := c.SendObjCtx(ctx, BulkCloseSignalKey, req)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return BulkCloseResponse{}, err
|
|
|
|
|
}
|
|
|
|
|
return decodeBulkCloseResponse(msg)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func sendBulkCloseServerLogical(ctx context.Context, s Server, logical *LogicalConn, req BulkCloseRequest) (BulkCloseResponse, error) {
|
|
|
|
|
if s == nil {
|
|
|
|
|
return BulkCloseResponse{}, errBulkServerNil
|
|
|
|
|
}
|
|
|
|
|
if logical == nil {
|
|
|
|
|
return BulkCloseResponse{}, errBulkLogicalConnNil
|
|
|
|
|
}
|
|
|
|
|
msg, err := s.SendObjCtxLogical(ctx, logical, BulkCloseSignalKey, req)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return BulkCloseResponse{}, err
|
|
|
|
|
}
|
|
|
|
|
return decodeBulkCloseResponse(msg)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func sendBulkCloseServerTransport(ctx context.Context, s Server, transport *TransportConn, req BulkCloseRequest) (BulkCloseResponse, error) {
|
|
|
|
|
if s == nil {
|
|
|
|
|
return BulkCloseResponse{}, errBulkServerNil
|
|
|
|
|
}
|
|
|
|
|
if transport == nil {
|
|
|
|
|
return BulkCloseResponse{}, errBulkTransportNil
|
|
|
|
|
}
|
|
|
|
|
msg, err := s.SendObjCtxTransport(ctx, transport, BulkCloseSignalKey, req)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return BulkCloseResponse{}, err
|
|
|
|
|
}
|
|
|
|
|
return decodeBulkCloseResponse(msg)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func sendBulkResetClient(ctx context.Context, c Client, req BulkResetRequest) (BulkResetResponse, error) {
|
|
|
|
|
if c == nil {
|
|
|
|
|
return BulkResetResponse{}, errBulkClientNil
|
|
|
|
|
}
|
|
|
|
|
msg, err := c.SendObjCtx(ctx, BulkResetSignalKey, req)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return BulkResetResponse{}, err
|
|
|
|
|
}
|
|
|
|
|
return decodeBulkResetResponse(msg)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func sendBulkResetServerLogical(ctx context.Context, s Server, logical *LogicalConn, req BulkResetRequest) (BulkResetResponse, error) {
|
|
|
|
|
if s == nil {
|
|
|
|
|
return BulkResetResponse{}, errBulkServerNil
|
|
|
|
|
}
|
|
|
|
|
if logical == nil {
|
|
|
|
|
return BulkResetResponse{}, errBulkLogicalConnNil
|
|
|
|
|
}
|
|
|
|
|
msg, err := s.SendObjCtxLogical(ctx, logical, BulkResetSignalKey, req)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return BulkResetResponse{}, err
|
|
|
|
|
}
|
|
|
|
|
return decodeBulkResetResponse(msg)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func sendBulkResetServerTransport(ctx context.Context, s Server, transport *TransportConn, req BulkResetRequest) (BulkResetResponse, error) {
|
|
|
|
|
if s == nil {
|
|
|
|
|
return BulkResetResponse{}, errBulkServerNil
|
|
|
|
|
}
|
|
|
|
|
if transport == nil {
|
|
|
|
|
return BulkResetResponse{}, errBulkTransportNil
|
|
|
|
|
}
|
|
|
|
|
msg, err := s.SendObjCtxTransport(ctx, transport, BulkResetSignalKey, req)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return BulkResetResponse{}, err
|
|
|
|
|
}
|
|
|
|
|
return decodeBulkResetResponse(msg)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func sendBulkReleaseClient(c Client, req BulkReleaseRequest) error {
|
|
|
|
|
if c == nil {
|
|
|
|
|
return errBulkClientNil
|
|
|
|
|
}
|
|
|
|
|
return c.SendObj(BulkReleaseSignalKey, req)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func sendBulkReleaseServerLogical(s Server, logical *LogicalConn, req BulkReleaseRequest) error {
|
|
|
|
|
if s == nil {
|
|
|
|
|
return errBulkServerNil
|
|
|
|
|
}
|
|
|
|
|
if logical == nil {
|
|
|
|
|
return errBulkLogicalConnNil
|
|
|
|
|
}
|
|
|
|
|
return s.SendObjLogical(logical, BulkReleaseSignalKey, req)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func sendBulkReleaseServerTransport(s Server, transport *TransportConn, req BulkReleaseRequest) error {
|
|
|
|
|
if s == nil {
|
|
|
|
|
return errBulkServerNil
|
|
|
|
|
}
|
|
|
|
|
if transport == nil {
|
|
|
|
|
return errBulkTransportNil
|
|
|
|
|
}
|
|
|
|
|
return s.SendObjTransport(transport, BulkReleaseSignalKey, req)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func decodeBulkOpenRequest(msg *Message) (BulkOpenRequest, error) {
|
|
|
|
|
var req BulkOpenRequest
|
|
|
|
|
if msg == nil {
|
|
|
|
|
return BulkOpenRequest{}, errBulkIDEmpty
|
|
|
|
|
}
|
|
|
|
|
if err := msg.Value.Orm(&req); err != nil {
|
|
|
|
|
return BulkOpenRequest{}, err
|
|
|
|
|
}
|
|
|
|
|
req = normalizeBulkOpenRequest(req)
|
|
|
|
|
if req.BulkID == "" {
|
|
|
|
|
return BulkOpenRequest{}, errBulkIDEmpty
|
|
|
|
|
}
|
|
|
|
|
if !validBulkRange(req.Range) {
|
|
|
|
|
return BulkOpenRequest{}, errBulkRangeInvalid
|
|
|
|
|
}
|
|
|
|
|
return req, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func decodeBulkCloseRequest(msg *Message) (BulkCloseRequest, error) {
|
|
|
|
|
var req BulkCloseRequest
|
|
|
|
|
if msg == nil {
|
|
|
|
|
return BulkCloseRequest{}, errBulkIDEmpty
|
|
|
|
|
}
|
|
|
|
|
if err := msg.Value.Orm(&req); err != nil {
|
|
|
|
|
return BulkCloseRequest{}, err
|
|
|
|
|
}
|
|
|
|
|
if req.BulkID == "" {
|
|
|
|
|
return BulkCloseRequest{}, errBulkIDEmpty
|
|
|
|
|
}
|
|
|
|
|
return req, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func decodeBulkResetRequest(msg *Message) (BulkResetRequest, error) {
|
|
|
|
|
var req BulkResetRequest
|
|
|
|
|
if msg == nil {
|
|
|
|
|
return BulkResetRequest{}, errBulkIDEmpty
|
|
|
|
|
}
|
|
|
|
|
if err := msg.Value.Orm(&req); err != nil {
|
|
|
|
|
return BulkResetRequest{}, err
|
|
|
|
|
}
|
|
|
|
|
if req.BulkID == "" && req.DataID == 0 {
|
|
|
|
|
return BulkResetRequest{}, errBulkIDEmpty
|
|
|
|
|
}
|
|
|
|
|
return req, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func decodeBulkReleaseRequest(msg *Message) (BulkReleaseRequest, error) {
|
|
|
|
|
var req BulkReleaseRequest
|
|
|
|
|
if msg == nil {
|
|
|
|
|
return BulkReleaseRequest{}, errBulkIDEmpty
|
|
|
|
|
}
|
|
|
|
|
if err := msg.Value.Orm(&req); err != nil {
|
|
|
|
|
return BulkReleaseRequest{}, err
|
|
|
|
|
}
|
|
|
|
|
if req.BulkID == "" && req.DataID == 0 {
|
|
|
|
|
return BulkReleaseRequest{}, errBulkIDEmpty
|
|
|
|
|
}
|
|
|
|
|
if req.Bytes < 0 || req.Chunks < 0 {
|
|
|
|
|
return BulkReleaseRequest{}, errBulkRangeInvalid
|
|
|
|
|
}
|
|
|
|
|
return req, nil
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-18 16:05:57 +08:00
|
|
|
func decodeBulkReadyRequest(msg *Message) (BulkReadyRequest, error) {
|
|
|
|
|
var req BulkReadyRequest
|
|
|
|
|
if msg == nil {
|
|
|
|
|
return BulkReadyRequest{}, errBulkIDEmpty
|
|
|
|
|
}
|
|
|
|
|
if err := msg.Value.Orm(&req); err != nil {
|
|
|
|
|
return BulkReadyRequest{}, err
|
|
|
|
|
}
|
|
|
|
|
if req.BulkID == "" && req.DataID == 0 {
|
|
|
|
|
return BulkReadyRequest{}, errBulkIDEmpty
|
|
|
|
|
}
|
|
|
|
|
return req, nil
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-15 15:24:36 +08:00
|
|
|
func decodeBulkOpenResponse(msg Message) (BulkOpenResponse, error) {
|
|
|
|
|
var resp BulkOpenResponse
|
|
|
|
|
if err := msg.Value.Orm(&resp); err != nil {
|
|
|
|
|
return BulkOpenResponse{}, err
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
resp.FastPathVersion = normalizeBulkFastPathVersion(resp.FastPathVersion)
|
2026-04-15 15:24:36 +08:00
|
|
|
return resp, bulkControlResultError("open", resp.Accepted, resp.Error, nil)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func decodeBulkCloseResponse(msg Message) (BulkCloseResponse, error) {
|
|
|
|
|
var resp BulkCloseResponse
|
|
|
|
|
if err := msg.Value.Orm(&resp); err != nil {
|
|
|
|
|
return BulkCloseResponse{}, err
|
|
|
|
|
}
|
|
|
|
|
return resp, bulkControlResultError("close", resp.Accepted, resp.Error, nil)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func decodeBulkResetResponse(msg Message) (BulkResetResponse, error) {
|
|
|
|
|
var resp BulkResetResponse
|
|
|
|
|
if err := msg.Value.Orm(&resp); err != nil {
|
|
|
|
|
return BulkResetResponse{}, err
|
|
|
|
|
}
|
|
|
|
|
return resp, bulkControlResultError("reset", resp.Accepted, resp.Error, nil)
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-18 16:05:57 +08:00
|
|
|
func decodeBulkReadyResponse(msg Message) (BulkReadyResponse, error) {
|
|
|
|
|
var resp BulkReadyResponse
|
|
|
|
|
if err := msg.Value.Orm(&resp); err != nil {
|
|
|
|
|
return BulkReadyResponse{}, err
|
|
|
|
|
}
|
|
|
|
|
return resp, bulkControlResultError("ready", resp.Accepted, resp.Error, nil)
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-15 15:24:36 +08:00
|
|
|
func bulkControlResultError(op string, accepted bool, message string, callErr error) error {
|
|
|
|
|
if callErr != nil {
|
|
|
|
|
return callErr
|
|
|
|
|
}
|
|
|
|
|
if message != "" {
|
|
|
|
|
return bulkControlMessageError(message)
|
|
|
|
|
}
|
|
|
|
|
if accepted {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
if op == "open" {
|
|
|
|
|
return errBulkRejected
|
|
|
|
|
}
|
|
|
|
|
return errors.New("bulk " + op + " rejected")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func bulkControlMessageError(message string) error {
|
|
|
|
|
switch message {
|
|
|
|
|
case errBulkNotFound.Error():
|
|
|
|
|
return errBulkNotFound
|
|
|
|
|
case errBulkAlreadyExists.Error():
|
|
|
|
|
return errBulkAlreadyExists
|
|
|
|
|
case errBulkHandlerNotConfigured.Error():
|
|
|
|
|
return errBulkHandlerNotConfigured
|
|
|
|
|
case errBulkLogicalConnNil.Error():
|
|
|
|
|
return errBulkLogicalConnNil
|
|
|
|
|
case errBulkTransportNil.Error():
|
|
|
|
|
return errBulkTransportNil
|
|
|
|
|
case errBulkRuntimeNil.Error():
|
|
|
|
|
return errBulkRuntimeNil
|
|
|
|
|
case errBulkIDEmpty.Error():
|
|
|
|
|
return errBulkIDEmpty
|
|
|
|
|
case errBulkRangeInvalid.Error():
|
|
|
|
|
return errBulkRangeInvalid
|
|
|
|
|
case errBulkDataIDEmpty.Error():
|
|
|
|
|
return errBulkDataIDEmpty
|
|
|
|
|
default:
|
|
|
|
|
return errors.New(message)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func bulkRemoteResetError(message string) error {
|
|
|
|
|
if message == "" {
|
|
|
|
|
return errBulkReset
|
|
|
|
|
}
|
|
|
|
|
return errors.New(message)
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-18 16:05:57 +08:00
|
|
|
func bulkReadyRemoteError(message string) error {
|
|
|
|
|
if message == "" {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
return bulkControlMessageError(message)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func sendBulkReadyClient(ctx context.Context, c Client, req BulkReadyRequest) (BulkReadyResponse, error) {
|
|
|
|
|
if c == nil {
|
|
|
|
|
return BulkReadyResponse{}, errBulkClientNil
|
|
|
|
|
}
|
|
|
|
|
msg, err := c.SendObjCtx(ctx, BulkReadySignalKey, req)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return BulkReadyResponse{}, err
|
|
|
|
|
}
|
|
|
|
|
return decodeBulkReadyResponse(msg)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func sendBulkReadyServerLogical(ctx context.Context, s Server, logical *LogicalConn, req BulkReadyRequest) (BulkReadyResponse, error) {
|
|
|
|
|
if s == nil {
|
|
|
|
|
return BulkReadyResponse{}, errBulkServerNil
|
|
|
|
|
}
|
|
|
|
|
if logical == nil {
|
|
|
|
|
return BulkReadyResponse{}, errBulkLogicalConnNil
|
|
|
|
|
}
|
|
|
|
|
msg, err := s.SendObjCtxLogical(ctx, logical, BulkReadySignalKey, req)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return BulkReadyResponse{}, err
|
|
|
|
|
}
|
|
|
|
|
return decodeBulkReadyResponse(msg)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func sendBulkReadyServerTransport(ctx context.Context, s Server, transport *TransportConn, req BulkReadyRequest) (BulkReadyResponse, error) {
|
|
|
|
|
if s == nil {
|
|
|
|
|
return BulkReadyResponse{}, errBulkServerNil
|
|
|
|
|
}
|
|
|
|
|
if transport == nil {
|
|
|
|
|
return BulkReadyResponse{}, errBulkTransportNil
|
|
|
|
|
}
|
|
|
|
|
msg, err := s.SendObjCtxTransport(ctx, transport, BulkReadySignalKey, req)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return BulkReadyResponse{}, err
|
|
|
|
|
}
|
|
|
|
|
return decodeBulkReadyResponse(msg)
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-15 15:24:36 +08:00
|
|
|
func bulkTransportGeneration(logical *LogicalConn, transport *TransportConn) uint64 {
|
|
|
|
|
return streamTransportGeneration(logical, transport)
|
|
|
|
|
}
|