package notify import ( "b612.me/notify/internal/transport" "b612.me/stario" "context" cryptorand "crypto/rand" "encoding/binary" "encoding/hex" "errors" "fmt" "io" "net" "strings" "sync/atomic" "time" ) const ( systemBulkAttachKey = "_notify_bulk_attach" bulkDedicatedRecordMagic = "NBR1" bulkDedicatedRecordHeaderLen = 8 defaultBulkDedicatedAttachLimit = 16 defaultBulkDedicatedActiveLimit = 4096 defaultBulkDedicatedLaneLimit = 4 defaultBulkDedicatedAttachRetry = 2 defaultBulkDedicatedAttachBackoff = 150 * time.Millisecond defaultBulkDedicatedDialTimeout = 5 * time.Second defaultBulkDedicatedHelloTimeout = 10 * time.Second ) type bulkAttachRequest struct { PeerID string BulkID string AttachToken string } type bulkAttachResponse struct { Accepted bool Error string Code string Retryable bool FailedSeq uint64 FailedBulk string } type bulkAttachErrorCode string const ( bulkAttachErrorCodeInvalidRequest bulkAttachErrorCode = "invalid_request" bulkAttachErrorCodeServerUnavailable bulkAttachErrorCode = "server_unavailable" bulkAttachErrorCodePeerNotFound bulkAttachErrorCode = "peer_not_found" bulkAttachErrorCodeBulkNotFound bulkAttachErrorCode = "bulk_not_found" bulkAttachErrorCodeBulkNotDedicated bulkAttachErrorCode = "bulk_not_dedicated" bulkAttachErrorCodeTokenMismatch bulkAttachErrorCode = "token_mismatch" bulkAttachErrorCodeAlreadyAttached bulkAttachErrorCode = "already_attached" bulkAttachErrorCodeAttachFailed bulkAttachErrorCode = "attach_failed" bulkAttachErrorCodeInternal bulkAttachErrorCode = "internal_error" ) type bulkAttachError struct { Code bulkAttachErrorCode Retryable bool Message string FailedSeq uint64 FailedBulk string } func (e *bulkAttachError) Error() string { if e == nil { return "" } if e.Message != "" { return e.Message } if e.Code != "" { return string(e.Code) } return "bulk attach failed" } func newBulkAttachError(code bulkAttachErrorCode, retryable bool, message string) *bulkAttachError { return &bulkAttachError{ Code: code, Retryable: 
retryable, Message: message, } } func toBulkAttachResponseError(err error, bulkID string) bulkAttachResponse { resp := bulkAttachResponse{ Accepted: false, FailedBulk: bulkID, } if err == nil { resp.Error = "bulk attach failed" resp.Code = string(bulkAttachErrorCodeInternal) return resp } var attachErr *bulkAttachError if errors.As(err, &attachErr) && attachErr != nil { resp.Error = attachErr.Error() resp.Code = string(attachErr.Code) resp.Retryable = attachErr.Retryable resp.FailedSeq = attachErr.FailedSeq if attachErr.FailedBulk != "" { resp.FailedBulk = attachErr.FailedBulk } if resp.Code == "" { resp.Code = string(bulkAttachErrorCodeInternal) } return resp } resp.Error = err.Error() resp.Code = string(bulkAttachErrorCodeInternal) return resp } func newBulkAttachToken() string { var buf [16]byte if _, err := cryptorand.Read(buf[:]); err == nil { return hex.EncodeToString(buf[:]) } return "" } func decodeBulkAttachRequest(decodeFn func([]byte) (interface{}, error), data MsgVal) (bulkAttachRequest, error) { var req bulkAttachRequest if decodeFn == nil { decodeFn = Decode } raw := []byte(data) value, err := decodeFn(raw) if err != nil { return req, err } switch typed := value.(type) { case bulkAttachRequest: return typed, nil case *bulkAttachRequest: if typed == nil { return req, errors.New("bulk attach request is nil") } return *typed, nil default: return req, errors.New("invalid bulk attach payload") } } func decodeBulkAttachResponse(decodeFn func([]byte) (interface{}, error), data MsgVal) (bulkAttachResponse, error) { var resp bulkAttachResponse if decodeFn == nil { decodeFn = Decode } raw := []byte(data) value, err := decodeFn(raw) if err != nil { return resp, err } switch typed := value.(type) { case bulkAttachResponse: return typed, nil case *bulkAttachResponse: if typed == nil { return resp, errors.New("bulk attach response is nil") } return *typed, nil default: return resp, errors.New("invalid bulk attach response") } } func encodeDirectSignalFrame(queue 
*stario.StarQueue, sequenceEn func(interface{}) ([]byte, error), msgEn func([]byte, []byte) []byte, secretKey []byte, msg TransferMsg) ([]byte, error) { if queue == nil { queue = stario.NewQueue() } env, err := wrapTransferMsgEnvelope(msg, sequenceEn) if err != nil { return nil, err } plain, err := sequenceEn(env) if err != nil { return nil, err } payload := msgEn(secretKey, plain) if payload == nil && len(plain) != 0 { return nil, errTransportPayloadEncryptFailed } return queue.BuildMessage(payload), nil } func decodeDirectSignalPayload(sequenceDe func([]byte) (interface{}, error), msgDe func([]byte, []byte) []byte, secretKey []byte, payload []byte) (TransferMsg, error) { plain := msgDe(secretKey, payload) if plain == nil && len(payload) != 0 { return TransferMsg{}, errTransportPayloadDecryptFailed } value, err := sequenceDe(plain) if err != nil { return TransferMsg{}, err } env, ok := value.(Envelope) if !ok { return TransferMsg{}, errors.New("invalid signal envelope") } return unwrapTransferMsgEnvelope(env, sequenceDe) } func readDirectSignalFramePayload(conn net.Conn) ([]byte, error) { if conn == nil { return nil, net.ErrClosed } return newTransportFrameReader(conn, stario.NewQueue()).Next() } func writeBulkDedicatedRecord(conn net.Conn, payload []byte) error { return writeBulkDedicatedRecordWithDeadline(conn, payload, time.Time{}) } func writeBulkDedicatedRecordWithDeadline(conn net.Conn, payload []byte, deadline time.Time) error { if conn == nil { return net.ErrClosed } return withRawConnWriteLockDeadline(conn, deadline, func(conn net.Conn) error { var header [bulkDedicatedRecordHeaderLen]byte copy(header[:4], bulkDedicatedRecordMagic) binary.BigEndian.PutUint32(header[4:8], uint32(len(payload))) return writeNetBuffersFullUnlocked(conn, net.Buffers{header[:], payload}) }) } func readBulkDedicatedRecord(conn net.Conn) ([]byte, error) { payload, release, err := readBulkDedicatedRecordPooled(conn) if err != nil { return nil, err } if release != nil { defer 
release() } return append([]byte(nil), payload...), nil } func readBulkDedicatedRecordPooled(conn net.Conn) ([]byte, func(), error) { if conn == nil { return nil, nil, net.ErrClosed } var header [bulkDedicatedRecordHeaderLen]byte if _, err := io.ReadFull(conn, header[:]); err != nil { return nil, nil, err } if string(header[:4]) != bulkDedicatedRecordMagic { return nil, nil, fmt.Errorf("%w: record magic=%x", errBulkFastPayloadInvalid, header[:4]) } size := int(binary.BigEndian.Uint32(header[4:8])) if size < 0 { return nil, nil, errBulkFastPayloadInvalid } payload := getModernPSKPayloadBuffer(size) if _, err := io.ReadFull(conn, payload); err != nil { putModernPSKPayloadBuffer(payload) return nil, nil, err } return payload, func() { putModernPSKPayloadBuffer(payload) }, nil } func (c *ClientCommon) dialDedicatedBulkConn(ctx context.Context, timeout time.Duration) (net.Conn, error) { source := c.clientConnectSourceSnapshot() if source != nil { if !source.supportsAdditionalConn() { return nil, errBulkDedicatedSingleConn } if source.network != "" && source.addr != "" { if timeout > 0 { return transport.DialTimeout(source.network, source.addr, timeout) } return transport.Dial(source.network, source.addr) } if source.canReconnect() { return source.dial(ctx) } return nil, errClientReconnectSourceUnavailable } conn := c.clientTransportConnSnapshot() if conn == nil || conn.RemoteAddr() == nil { return nil, errClientReconnectSourceUnavailable } if timeout > 0 { return transport.DialTimeout(conn.RemoteAddr().Network(), conn.RemoteAddr().String(), timeout) } return transport.Dial(conn.RemoteAddr().Network(), conn.RemoteAddr().String()) } func (c *ClientCommon) attachDedicatedBulkSidecar(ctx context.Context, bulk *bulkHandle) error { if c == nil || bulk == nil || !bulk.Dedicated() || bulk.dedicatedAttachedSnapshot() { return nil } if ctx == nil { ctx = context.Background() } laneID := bulk.dedicatedLaneIDSnapshot() releaseActiveSlot, err := c.acquireBulkDedicatedActiveSlot(ctx) 
if err != nil { return err } needReleaseActive := true defer func() { if needReleaseActive { releaseActiveSlot() } }() if sidecar := c.clientDedicatedSidecarSnapshotForLane(laneID); sidecar != nil && sidecar.conn != nil { if err := bulk.attachDedicatedConnShared(sidecar.conn); err == nil { bulk.markDedicatedActiveReserved() needReleaseActive = false return nil } } _, flight, leader := c.beginClientDedicatedSidecarAttach(laneID) if !leader { if flight == nil { return errTransportDetached } if err := flight.wait(ctx); err != nil { return err } sidecar := c.clientDedicatedSidecarSnapshotForLane(laneID) if sidecar == nil || sidecar.conn == nil { return errTransportDetached } if err := bulk.attachDedicatedConnShared(sidecar.conn); err != nil { return err } bulk.markDedicatedActiveReserved() needReleaseActive = false return nil } if flight == nil { return errTransportDetached } var flightErr error defer func() { c.finishClientDedicatedSidecarAttach(laneID, flight, flightErr) }() releaseAttachSlot, err := c.acquireBulkDedicatedAttachSlot(ctx) if err != nil { flightErr = err return err } defer releaseAttachSlot() if sidecar := c.clientDedicatedSidecarSnapshotForLane(laneID); sidecar != nil && sidecar.conn != nil { if err := bulk.attachDedicatedConnShared(sidecar.conn); err == nil { bulk.markDedicatedActiveReserved() needReleaseActive = false flightErr = nil return nil } } retry, backoff, dialTimeout, helloTimeout := c.bulkDedicatedAttachConfigSnapshot() attempts := retry + 1 if attempts <= 0 { attempts = 1 } var lastErr error for attempt := 1; attempt <= attempts; attempt++ { c.bulkAttachAttemptCount.Add(1) if attempt > 1 { delay := backoff * time.Duration(1<<(attempt-2)) if delay > 3*time.Second { delay = 3 * time.Second } if err := waitDedicatedAttachBackoff(ctx, delay); err != nil { flightErr = err return err } } dialCtx := ctx dialCancel := func() {} if dialTimeout > 0 { dialCtx, dialCancel = context.WithTimeout(ctx, dialTimeout) } conn, err := 
c.dialDedicatedBulkConn(dialCtx, dialTimeout) dialCancel() if err != nil { lastErr = err if attempt < attempts && isRetryableDedicatedAttachError(err) { flightErr = err c.bulkAttachRetryCount.Add(1) continue } bulk.markDedicatedAttachDegraded(string(bulkAttachErrorCodeAttachFailed)) flightErr = err return err } helloCtx := ctx helloCancel := func() {} if helloTimeout > 0 { helloCtx, helloCancel = context.WithTimeout(ctx, helloTimeout) } resp, err := c.sendDedicatedBulkAttachRequest(helloCtx, conn, bulk) helloCancel() if err != nil { _ = conn.Close() lastErr = err if attempt < attempts && isRetryableDedicatedAttachError(err) { flightErr = err c.bulkAttachRetryCount.Add(1) continue } bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeAttachFailed)) bulk.markDedicatedAttachDegraded(string(bulkAttachErrorCodeAttachFailed)) flightErr = err return err } if !resp.Accepted { _ = conn.Close() rejectedErr := &bulkAttachError{ Code: bulkAttachErrorCode(resp.Code), Retryable: resp.Retryable, Message: resp.Error, FailedSeq: resp.FailedSeq, FailedBulk: resp.FailedBulk, } if rejectedErr.Code == "" { rejectedErr.Code = bulkAttachErrorCodeAttachFailed } lastErr = rejectedErr bulk.setDedicatedAttachLastCode(string(rejectedErr.Code)) if attempt < attempts && rejectedErr.Retryable { flightErr = rejectedErr c.bulkAttachRetryCount.Add(1) continue } bulk.markDedicatedAttachDegraded(string(rejectedErr.Code)) flightErr = rejectedErr return rejectedErr } sidecar := newBulkDedicatedSidecar(conn, laneID) activeSidecar, installed := c.installClientDedicatedSidecar(laneID, sidecar) if !installed { sidecar.close() sidecar = activeSidecar } if sidecar == nil || sidecar.conn == nil { bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeAttachFailed)) flightErr = errTransportDetached return errTransportDetached } if err := bulk.attachDedicatedConnShared(sidecar.conn); err != nil { if installed && c.clearClientDedicatedSidecar(laneID, sidecar) { sidecar.close() } 
bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeAttachFailed)) if installed { flightErr = err } else { flightErr = nil } return err } c.bulkAttachSuccessCount.Add(1) if installed { go c.readDedicatedSidecarLoop(sidecar) } bulk.markDedicatedActiveReserved() needReleaseActive = false flightErr = nil return nil } if lastErr != nil { flightErr = lastErr return lastErr } flightErr = errors.New("bulk attach failed") return flightErr } func (c *ClientCommon) bulkDedicatedAttachConfigSnapshot() (int, time.Duration, time.Duration, time.Duration) { if c == nil { return defaultBulkDedicatedAttachRetry, defaultBulkDedicatedAttachBackoff, defaultBulkDedicatedDialTimeout, defaultBulkDedicatedHelloTimeout } c.mu.Lock() defer c.mu.Unlock() retry := c.bulkDedicatedAttachRetry if retry < 0 { retry = 0 } backoff := c.bulkDedicatedAttachBackoff if backoff <= 0 { backoff = defaultBulkDedicatedAttachBackoff } dialTimeout := c.bulkDedicatedDialTimeout if dialTimeout <= 0 { dialTimeout = defaultBulkDedicatedDialTimeout } helloTimeout := c.bulkDedicatedHelloTimeout if helloTimeout <= 0 { helloTimeout = defaultBulkDedicatedHelloTimeout } return retry, backoff, dialTimeout, helloTimeout } func (c *ClientCommon) acquireBulkDedicatedAttachSlot(ctx context.Context) (func(), error) { sem := c.bulkDedicatedAttachSemaphoreSnapshot() if c == nil || sem == nil { return func() {}, nil } if ctx == nil { ctx = context.Background() } select { case sem <- struct{}{}: return func() { select { case <-sem: default: } }, nil case <-ctx.Done(): return nil, ctx.Err() } } func (c *ClientCommon) reserveBulkDedicatedActiveSlot() bool { if c == nil { return false } limit := c.bulkDedicatedActiveLimitSnapshot() if limit <= 0 { return true } for { current := c.bulkDedicatedActive.Load() if int(current) >= limit { return false } if c.bulkDedicatedActive.CompareAndSwap(current, current+1) { return true } } } func (c *ClientCommon) acquireBulkDedicatedActiveSlot(ctx context.Context) (func(), error) { if c == 
nil { return func() {}, errBulkClientNil } if ctx == nil { ctx = context.Background() } for { if c.reserveBulkDedicatedActiveSlot() { return c.releaseBulkDedicatedActiveSlot, nil } waitCh := c.bulkDedicatedActiveWaitSnapshot() if c.reserveBulkDedicatedActiveSlot() { return c.releaseBulkDedicatedActiveSlot, nil } select { case <-ctx.Done(): return nil, ctx.Err() case <-waitCh: } } } func (c *ClientCommon) releaseBulkDedicatedActiveSlot() { if c == nil { return } for { current := c.bulkDedicatedActive.Load() if current <= 0 { return } if c.bulkDedicatedActive.CompareAndSwap(current, current-1) { c.notifyBulkDedicatedActiveWaiters() return } } } func (c *ClientCommon) notifyBulkDedicatedActiveWaiters() { if c == nil { return } c.mu.Lock() c.notifyBulkDedicatedActiveWaitersLocked() c.mu.Unlock() } func waitDedicatedAttachBackoff(ctx context.Context, delay time.Duration) error { if delay <= 0 { return nil } timer := time.NewTimer(delay) defer timer.Stop() select { case <-ctx.Done(): return ctx.Err() case <-timer.C: return nil } } func isRetryableDedicatedAttachError(err error) bool { if err == nil { return false } var attachErr *bulkAttachError if errors.As(err, &attachErr) && attachErr != nil { return attachErr.Retryable } if errors.Is(err, context.Canceled) { return false } if errors.Is(err, context.DeadlineExceeded) { return true } var netErr net.Error if errors.As(err, &netErr) && (netErr.Timeout() || netErr.Temporary()) { return true } message := strings.ToLower(err.Error()) for _, pattern := range []string{ "timeout", "timed out", "deadline", "connection reset", "connection refused", "connectex", "broken pipe", "no route", "host unreachable", "transport detached", } { if strings.Contains(message, pattern) { return true } } return false } func (c *ClientCommon) sendDedicatedBulkAttachRequest(ctx context.Context, conn net.Conn, bulk *bulkHandle) (bulkAttachResponse, error) { if c == nil { return bulkAttachResponse{}, errBulkClientNil } if bulk == nil { return 
bulkAttachResponse{}, errBulkIDEmpty } defer func() { _ = conn.SetReadDeadline(time.Time{}) }() reqPayload, err := c.sequenceEn(bulkAttachRequest{ PeerID: c.ensureClientPeerIdentity(), BulkID: bulk.ID(), AttachToken: bulk.dedicatedAttachTokenSnapshot(), }) if err != nil { return bulkAttachResponse{}, err } msg := TransferMsg{ ID: atomic.AddUint64(&c.msgID, 1), Key: systemBulkAttachKey, Value: reqPayload, Type: MSG_SYS_WAIT, } attachProfile := c.clientDedicatedBulkAttachTransportProtectionProfile() frame, err := encodeDirectSignalFrame(stario.NewQueue(), c.sequenceEn, attachProfile.msgEn, attachProfile.secretKey, msg) if err != nil { return bulkAttachResponse{}, err } if err := writeFullToConn(conn, frame); err != nil { return bulkAttachResponse{}, err } if deadline, ok := ctx.Deadline(); ok { _ = conn.SetReadDeadline(deadline) } replyPayload, err := readDirectSignalFramePayload(conn) if err != nil { return bulkAttachResponse{}, err } transfer, err := decodeDirectSignalPayload(c.sequenceDe, attachProfile.msgDe, attachProfile.secretKey, replyPayload) if err != nil { return bulkAttachResponse{}, err } if transfer.Key != systemBulkAttachKey || transfer.Type != MSG_SYS_REPLY || transfer.ID != msg.ID { return bulkAttachResponse{}, errors.New("invalid bulk attach reply") } return decodeBulkAttachResponse(c.sequenceDe, transfer.Value) } func (c *ClientCommon) clientDedicatedBulkAttachTransportProtectionProfile() transportProtectionProfile { if c == nil { return transportProtectionProfile{} } if c.securityConfigured && c.securityBootstrap.msgEn != nil && c.securityBootstrap.msgDe != nil { return c.securityBootstrap.clone() } return c.clientTransportProtectionSnapshot() } func (c *ClientCommon) readDedicatedSidecarLoop(sidecar *bulkDedicatedSidecar) { if c == nil || sidecar == nil || sidecar.conn == nil { return } for { payload, payloadRelease, err := readBulkDedicatedRecordPooled(sidecar.conn) if err != nil { c.handleClientDedicatedSidecarFailure(sidecar, err) return } 
profile := c.clientTransportProtectionSnapshot() plain, plainRelease, err := decryptTransportPayloadCodecPooled(profile.mode, profile.runtime, profile.msgDe, profile.secretKey, payload, payloadRelease) if err != nil { c.handleClientDedicatedSidecarFailure(sidecar, err) return } owner := newBulkReadPayloadOwner(plainRelease) runtime := c.getBulkRuntime() if runtime == nil { owner.done() continue } var ( currentDataID uint64 currentBulk *bulkHandle skipDataID bool ) err = walkDedicatedBulkInboundPayload(plain, func(dataID uint64, item bulkDedicatedBatchItem) error { if dataID != currentDataID { currentDataID = dataID currentBulk = nil skipDataID = false bulk, ok := runtime.lookupByDataID(clientFileScope(), dataID) if !ok { c.bestEffortRejectInboundBulkData("", dataID, errBulkNotFound.Error()) skipDataID = true return nil } if !bulk.acceptsClientSessionEpoch(c.currentClientSessionEpoch()) { detachErr := transportDetachedSessionEpochError() bulk.markReset(detachErr) c.bestEffortRejectInboundBulkData(bulk.ID(), dataID, detachErr.Error()) skipDataID = true return nil } bulk.markDedicatedDataStarted() currentBulk = bulk } if skipDataID || currentBulk == nil { return nil } var release func() if item.Type == bulkFastPayloadTypeData { release = owner.retainChunk() } dispatchErr := dispatchDedicatedBulkInboundItemWithRelease(currentBulk, item, release) if dispatchErr != nil { if !errors.Is(dispatchErr, io.EOF) { _ = c.sendDedicatedBulkReset(context.Background(), currentBulk, dispatchErr.Error()) currentBulk.markReset(dispatchErr) } currentBulk = nil skipDataID = true return nil } if currentBulk.Context().Err() != nil { currentBulk = nil skipDataID = true } return nil }) if err != nil { if plainRelease != nil { plainRelease() } c.handleClientDedicatedSidecarFailure(sidecar, err) return } owner.done() } } func (s *ServerCommon) handleBulkAttachSystemMessage(message Message) bool { if message.Key != systemBulkAttachKey { return false } current := 
messageLogicalConnSnapshot(&message) var ( req bulkAttachRequest logical *LogicalConn bulk *bulkHandle err error ) req, err = decodeBulkAttachRequest(s.sequenceDe, message.Value) if err == nil { logical, bulk, err = s.resolveInboundDedicatedBulk(current, req) } if err != nil { if current != nil { _ = s.replyDedicatedBulkAttach(current, message, toBulkAttachResponseError(err, req.BulkID)) } return true } if current != nil { if attachErr := s.finishInboundDedicatedBulkAttach(current, logical, bulk, message); attachErr != nil { bulk.markReset(attachErr) } } return true } func (s *ServerCommon) resolveInboundDedicatedBulk(current *LogicalConn, req bulkAttachRequest) (*LogicalConn, *bulkHandle, error) { if s == nil { return nil, nil, newBulkAttachError(bulkAttachErrorCodeServerUnavailable, true, errBulkServerNil.Error()) } if current == nil { return nil, nil, newBulkAttachError(bulkAttachErrorCodeInvalidRequest, false, errBulkLogicalConnNil.Error()) } if req.PeerID == "" || req.BulkID == "" || req.AttachToken == "" { return nil, nil, newBulkAttachError(bulkAttachErrorCodeInvalidRequest, false, errBulkIDEmpty.Error()) } logical := s.GetLogicalConn(req.PeerID) if logical == nil { return nil, nil, newBulkAttachError(bulkAttachErrorCodePeerNotFound, true, errBulkLogicalConnNil.Error()) } runtime := s.getBulkRuntime() if runtime == nil { return nil, nil, newBulkAttachError(bulkAttachErrorCodeServerUnavailable, true, errBulkRuntimeNil.Error()) } bulk, ok := runtime.lookup(serverFileScope(logical), req.BulkID) if !ok { return nil, nil, &bulkAttachError{ Code: bulkAttachErrorCodeBulkNotFound, Retryable: false, Message: errBulkNotFound.Error(), FailedBulk: req.BulkID, } } bulk.markDedicatedAttachAttempt() if !bulk.Dedicated() { bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeBulkNotDedicated)) return nil, nil, &bulkAttachError{ Code: bulkAttachErrorCodeBulkNotDedicated, Retryable: false, Message: "bulk is not dedicated", FailedBulk: req.BulkID, } } if 
bulk.dedicatedAttachTokenSnapshot() != req.AttachToken { bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeTokenMismatch)) return nil, nil, &bulkAttachError{ Code: bulkAttachErrorCodeTokenMismatch, Retryable: false, Message: "bulk attach token mismatch", FailedBulk: req.BulkID, } } switch bulk.dedicatedAttachStateSnapshot() { case bulkDedicatedAttachStateClosed: bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeAttachFailed)) return nil, nil, &bulkAttachError{ Code: bulkAttachErrorCodeAttachFailed, Retryable: false, Message: "bulk dedicated attach closed", FailedBulk: req.BulkID, } case bulkDedicatedAttachStateDegraded: bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeAttachFailed)) return nil, nil, &bulkAttachError{ Code: bulkAttachErrorCodeAttachFailed, Retryable: true, Message: "bulk dedicated attach degraded", FailedBulk: req.BulkID, } case bulkDedicatedAttachStateAttached: if bulk.dedicatedDataStartedSnapshot() { bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeAlreadyAttached)) return nil, nil, &bulkAttachError{ Code: bulkAttachErrorCodeAlreadyAttached, Retryable: false, Message: "bulk dedicated already attached", FailedBulk: req.BulkID, } } } return logical, bulk, nil } func (s *ServerCommon) finishInboundDedicatedBulkAttach(current *LogicalConn, logical *LogicalConn, bulk *bulkHandle, message Message) error { if current == nil || logical == nil || bulk == nil { return newBulkAttachError(bulkAttachErrorCodeInvalidRequest, false, errBulkLogicalConnNil.Error()) } scope := serverFileScope(logical) laneID := bulk.dedicatedLaneIDSnapshot() conn, err := current.detachTransportForTransfer() if err != nil { return newBulkAttachError(bulkAttachErrorCodeAttachFailed, true, err.Error()) } stopCurrent := func(reason string, err error) { current.markSessionStopped(reason, err) s.removeLogical(current) } fail := func(reason string, err error) error { if conn != nil { _ = conn.Close() } 
bulk.markDedicatedAttachDegraded(string(bulkAttachErrorCodeAttachFailed)) stopCurrent(reason, err) return newBulkAttachError(bulkAttachErrorCodeAttachFailed, true, err.Error()) } if bulk.dedicatedAttachedSnapshot() { if bulk.dedicatedDataStartedSnapshot() { rejected := &bulkAttachError{ Code: bulkAttachErrorCodeAlreadyAttached, Retryable: false, Message: "bulk dedicated already attached", FailedBulk: bulk.ID(), } _ = s.replyDedicatedBulkAttachDetached(current, conn, message, toBulkAttachResponseError(rejected, bulk.ID())) _ = conn.Close() stopCurrent("bulk dedicated attach duplicate rejected", nil) bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeAlreadyAttached)) return nil } } sidecar := newBulkDedicatedSidecar(conn, laneID) if sidecar == nil { bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeAttachFailed)) return fail("bulk dedicated attach failed", errTransportDetached) } var ( oldConn net.Conn oldSender *bulkDedicatedSender ) if bulk.dedicatedAttachedSnapshot() { var replaceErr error oldConn, oldSender, replaceErr = bulk.replaceDedicatedConnShared(conn) if replaceErr != nil { sidecar.close() bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeAttachFailed)) return fail("bulk dedicated reattach failed", replaceErr) } } else if err := bulk.attachDedicatedConnShared(conn); err != nil { sidecar.close() bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeAttachFailed)) return fail("bulk dedicated attach failed", err) } if err := s.replyDedicatedBulkAttachDetached(current, conn, message, bulkAttachResponse{Accepted: true}); err != nil { bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeAttachFailed)) sidecar.close() if runtime := s.getBulkRuntime(); runtime != nil { runtime.resetDedicatedByConn(scope, conn, transportDetachedError("bulk dedicated attach reply failed", err)) } stopCurrent("bulk dedicated attach reply failed", err) return nil } oldSidecar := s.installServerDedicatedSidecar(logical, laneID, sidecar) if runtime := 
s.getBulkRuntime(); runtime != nil { runtime.attachSharedDedicatedConn(scope, laneID, conn) } if oldSender != nil { oldSender.stop() } if oldConn != nil { _ = oldConn.Close() } if oldSidecar != nil && oldSidecar != sidecar { oldSidecar.close() } go s.readDedicatedSidecarLoop(logical, sidecar) s.startServerBulkAcceptDispatch(bulk, logical, messageTransportConnSnapshot(&message)) if runtime := s.getBulkRuntime(); runtime != nil { s.dispatchPendingServerBulkAccepts(scope, conn, bulk, logical) } stopCurrent("bulk dedicated attach", nil) return nil } func (s *ServerCommon) dispatchPendingServerBulkAccepts(scope string, conn net.Conn, current *bulkHandle, logical *LogicalConn) { if s == nil || conn == nil { return } runtime := s.getBulkRuntime() if runtime == nil { return } for _, pending := range runtime.dedicatedBulksForConn(scope, conn) { if pending == nil || pending == current { continue } s.startServerBulkAcceptDispatch(pending, logical, pending.TransportConn()) } } func (s *ServerCommon) replyDedicatedBulkAttachDetached(client *LogicalConn, conn net.Conn, message Message, resp bulkAttachResponse) error { if s == nil || client == nil { return errBulkServerNil } if conn == nil { return net.ErrClosed } msgEn := client.msgEnSnapshot() if msgEn == nil { return errTransportPayloadEncryptFailed } encoded, err := s.sequenceEn(resp) if err != nil { return err } reply := TransferMsg{ ID: message.ID, Key: systemBulkAttachKey, Value: encoded, Type: MSG_SYS_REPLY, } frame, err := encodeDirectSignalFrame(stario.NewQueue(), s.sequenceEn, msgEn, client.secretKeySnapshot(), reply) if err != nil { return err } return withRawConnWriteLockDeadline(conn, writeDeadlineFromTimeout(client.maxWriteTimeoutSnapshot()), func(conn net.Conn) error { return writeFullToConnUnlocked(conn, frame) }) } func (s *ServerCommon) replyDedicatedBulkAttach(client *LogicalConn, message Message, resp bulkAttachResponse) error { if s == nil || client == nil { return errBulkServerNil } encoded, err := 
s.sequenceEn(resp) if err != nil { return err } reply := TransferMsg{ ID: message.ID, Key: systemBulkAttachKey, Value: encoded, Type: MSG_SYS_REPLY, } if message.inboundConn != nil { return s.sendTransferInbound(client, messageTransportConnSnapshot(&message), message.inboundConn, messageInboundTransportProtectionSnapshot(&message), reply) } _, err = s.sendLogical(client, reply) return err } func (s *ServerCommon) readDedicatedSidecarLoop(logical *LogicalConn, sidecar *bulkDedicatedSidecar) { if s == nil || logical == nil || sidecar == nil || sidecar.conn == nil { return } runtime := s.getBulkRuntime() scope := serverFileScope(logical) for { payload, payloadRelease, err := readBulkDedicatedRecordPooled(sidecar.conn) if err != nil { s.handleServerDedicatedSidecarFailure(logical, sidecar, err) return } plain, plainRelease, err := decryptTransportPayloadCodecPooled(logical.protectionModeSnapshot(), logical.modernPSKRuntimeSnapshot(), logical.msgDeSnapshot(), logical.secretKeySnapshot(), payload, payloadRelease) if err != nil { s.handleServerDedicatedSidecarFailure(logical, sidecar, err) return } owner := newBulkReadPayloadOwner(plainRelease) if runtime == nil { owner.done() continue } var ( currentDataID uint64 currentBulk *bulkHandle skipDataID bool ) err = walkDedicatedBulkInboundPayload(plain, func(dataID uint64, item bulkDedicatedBatchItem) error { if dataID != currentDataID { currentDataID = dataID currentBulk = nil skipDataID = false bulk, ok := runtime.lookupByDataID(scope, dataID) if !ok { s.bestEffortRejectInboundDedicatedData(logical, sidecar.conn, dataID, errBulkNotFound.Error()) skipDataID = true return nil } bulk.markDedicatedDataStarted() currentBulk = bulk } if skipDataID || currentBulk == nil { return nil } var release func() if item.Type == bulkFastPayloadTypeData { release = owner.retainChunk() } dispatchErr := dispatchDedicatedBulkInboundItemWithRelease(currentBulk, item, release) if dispatchErr != nil { if !errors.Is(dispatchErr, io.EOF) { _ = 
s.sendDedicatedBulkReset(context.Background(), logical, currentBulk, dispatchErr.Error())
					currentBulk.markReset(dispatchErr)
				}
				currentBulk = nil
				skipDataID = true
				return nil
			}
			if currentBulk.Context().Err() != nil {
				currentBulk = nil
				skipDataID = true
			}
			return nil
		})
		if err != nil {
			if plainRelease != nil {
				plainRelease()
			}
			s.handleServerDedicatedSidecarFailure(logical, sidecar, err)
			return
		}
		owner.done()
	}
}

// bestEffortRejectInboundDedicatedData encodes a single reset item carrying
// message for dataID and writes it to conn, bounded by a deadline derived
// from the logical connection's max write timeout snapshot.
//
// It is a no-op when s, logical or conn is nil or dataID is zero. Encoding and
// write failures are dropped on purpose: the function has no return value and
// the rejection is only best effort.
func (s *ServerCommon) bestEffortRejectInboundDedicatedData(logical *LogicalConn, conn net.Conn, dataID uint64, message string) {
	if s == nil || logical == nil || conn == nil || dataID == 0 {
		return
	}
	frame, err := s.encodeDedicatedBulkBatchPayload(logical, dataID, []bulkDedicatedSendRequest{{
		Type:    bulkFastPayloadTypeReset,
		Payload: []byte(message),
	}})
	if err != nil {
		return
	}
	// Best effort: there is nobody to report a failed write to.
	_ = writeBulkDedicatedRecordWithDeadline(conn, frame, writeDeadlineFromTimeout(logical.maxWriteTimeoutSnapshot()))
}

// handleDedicatedBulkReadError applies a read error from a dedicated
// connection to bulk.
//
// Nothing happens when bulk or err is nil, when the bulk's context is already
// done, or when the remote side is already recorded as closed. An error that
// looks like a closed connection (io.EOF, net.ErrClosed, or a message
// containing "use of closed network connection") marks the bulk as remotely
// closed, provided the bulk is dedicated or locally closed. Every other case
// marks the bulk reset with a transport-detached error wrapping err.
func handleDedicatedBulkReadError(bulk *bulkHandle, err error) {
	// A nil err carries no failure; without this guard err.Error() below
	// would panic.
	if bulk == nil || err == nil {
		return
	}
	if bulk.Context().Err() != nil || bulk.remoteClosedSnapshot() {
		return
	}
	// The text match catches closed-connection errors that do not satisfy
	// errors.Is(err, net.ErrClosed).
	message := strings.ToLower(err.Error())
	if errors.Is(err, io.EOF) || errors.Is(err, net.ErrClosed) || strings.Contains(message, "use of closed network connection") {
		if bulk.Dedicated() || bulk.localClosedSnapshot() {
			bulk.markRemoteClosed()
			return
		}
	}
	bulk.markReset(transportDetachedError("dedicated bulk read error", err))
}

// dedicatedBulkSender returns the dedicated sender installed on bulk, building
// and installing one when none is present yet.
//
// It returns errBulkClientNil when c or bulk is nil and a transport-detached
// error when the bulk has no dedicated connection. If installDedicatedSender
// hands back a different sender than the one just built, the new one is
// stopped and the installed one is returned.
func (c *ClientCommon) dedicatedBulkSender(bulk *bulkHandle) (*bulkDedicatedSender, error) {
	if c == nil || bulk == nil {
		return nil, errBulkClientNil
	}
	if sender := bulk.dedicatedSenderSnapshot(); sender != nil {
		return sender, nil
	}
	conn := bulk.dedicatedConnSnapshot()
	if conn == nil {
		return nil, transportDetachedError("dedicated bulk sidecar not attached", nil)
	}
	sender := newBulkDedicatedSender(conn, bulk.dataIDSnapshot(), c.encryptTransportPayload, func(items []bulkDedicatedSendRequest) ([]byte, error) {
		return c.encodeDedicatedBulkBatchPayload(bulk.dataIDSnapshot(), items)
	}, func(err error) {
		// Errors the bulk classifies as ignorable must not reset it.
		if bulk.canIgnoreDedicatedCloseSendError(err) {
			return
		}
		bulk.markReset(err)
	})
	actual := bulk.installDedicatedSender(sender)
	if actual != sender {
		// A different sender is installed; discard the one built here.
		sender.stop()
	}
	return actual, nil
}

// dedicatedBulkLaneSender returns the lane sender of the sidecar that serves
// bulk's dedicated lane.
//
// It returns errBulkClientNil when c or bulk is nil. A transport-detached
// error is returned when no sidecar exists for the lane, when either
// connection is nil, when the sidecar's connection differs from the one the
// bulk holds, or when the sidecar yields no sender.
func (c *ClientCommon) dedicatedBulkLaneSender(bulk *bulkHandle) (*bulkDedicatedLaneSender, error) {
	if c == nil || bulk == nil {
		return nil, errBulkClientNil
	}
	sidecar := c.clientDedicatedSidecarSnapshotForLane(bulk.dedicatedLaneIDSnapshot())
	conn := bulk.dedicatedConnSnapshot()
	if sidecar == nil || sidecar.conn == nil || conn == nil || sidecar.conn != conn {
		return nil, transportDetachedError("dedicated bulk sidecar not attached", nil)
	}
	sender := sidecar.laneSenderWithFactory(func(conn net.Conn) *bulkDedicatedLaneSender {
		profile := c.clientTransportProtectionSnapshot()
		laneRuntime := profile.runtime
		// Use a forked runtime for the lane when forking succeeds; otherwise
		// keep the runtime taken from the protection snapshot.
		if forked, err := forkDedicatedLaneModernPSKRuntime(laneRuntime); err == nil && forked != nil {
			laneRuntime = forked
		}
		return newBulkDedicatedLaneSender(conn, func(batches []bulkDedicatedOutboundBatch) ([]byte, func(), error) {
			return c.encodeDedicatedBulkBatchesPayloadPooledWithRuntime(laneRuntime, batches)
		}, func(err error) {
			c.handleClientDedicatedSidecarFailure(sidecar, err)
		})
	})
	if sender == nil {
		return nil, transportDetachedError("dedicated bulk sidecar not attached", nil)
	}
	return sender, nil
}

// sendDedicatedBulkData submits chunk on bulk's lane sender under the bulk's
// data ID and its next outbound data sequence number.
//
// The sequence number is taken only after the lane sender has been resolved.
func (c *ClientCommon) sendDedicatedBulkData(ctx context.Context, bulk *bulkHandle, chunk []byte) error {
	if c == nil || bulk == nil {
		return errBulkClientNil
	}
	sender, err := c.dedicatedBulkLaneSender(bulk)
	if err != nil {
		return err
	}
	return sender.submitData(ctx, bulk.dataIDSnapshot(), bulk.nextOutboundDataSeq(), chunk)
}

// sendDedicatedBulkWrite submits payload on bulk's lane sender starting at
// startSeq, passing along the bulk's chunk size and the payloadOwned flag.
// It returns whatever submitWrite returns.
func (c *ClientCommon) sendDedicatedBulkWrite(ctx context.Context, bulk *bulkHandle, startSeq uint64, payload []byte, payloadOwned bool) (int, error) {
	if c == nil || bulk == nil {
		return 0, errBulkClientNil
	}
	sender, err := c.dedicatedBulkLaneSender(bulk)
	if err != nil {
		return 0, err
	}
	return sender.submitWrite(ctx, bulk.dataIDSnapshot(), startSeq, payload, bulk.chunkSize, payloadOwned)
}

// sendDedicatedBulkClose submits a close control item for bulk. When full is
// true the item carries bulkFastPayloadFlagFullClose. The submit runs under a
// context derived from ctx and the bulk's write timeout.
func (c *ClientCommon) sendDedicatedBulkClose(ctx context.Context, bulk *bulkHandle, full bool) error {
	if c == nil || bulk == nil {
		return errBulkClientNil
	}
	sendCtx, cancel, err := bulkWriteContext(ctx, bulk.writeTimeout)
	if err != nil {
		return err
	}
	defer cancel()
	flags := uint8(0)
	if full {
		flags = bulkFastPayloadFlagFullClose
	}
	sender, err := c.dedicatedBulkLaneSender(bulk)
	if err != nil {
		return err
	}
	return sender.submitControl(sendCtx, bulk.dataIDSnapshot(), bulkFastPayloadTypeClose, flags, 0, nil)
}

// sendDedicatedBulkReset submits a reset control item for bulk with message as
// its payload. The submit runs under a context derived from ctx and the bulk's
// write timeout.
func (c *ClientCommon) sendDedicatedBulkReset(ctx context.Context, bulk *bulkHandle, message string) error {
	if c == nil || bulk == nil {
		return errBulkClientNil
	}
	sendCtx, cancel, err := bulkWriteContext(ctx, bulk.writeTimeout)
	if err != nil {
		return err
	}
	defer cancel()
	sender, err := c.dedicatedBulkLaneSender(bulk)
	if err != nil {
		return err
	}
	return sender.submitControl(sendCtx, bulk.dataIDSnapshot(), bulkFastPayloadTypeReset, 0, 0, []byte(message))
}

// sendDedicatedBulkRelease submits a release control item for bulk whose
// payload encodes bytes and chunks.
//
// The payload is encoded first, then the call waits on
// bulk.waitDedicatedReady using the caller's ctx; the write-timeout context is
// created only after that wait returns.
func (c *ClientCommon) sendDedicatedBulkRelease(ctx context.Context, bulk *bulkHandle, bytes int64, chunks int) error {
	if c == nil || bulk == nil {
		return errBulkClientNil
	}
	payload, err := encodeBulkDedicatedReleasePayload(bytes, chunks)
	if err != nil {
		return err
	}
	if err := bulk.waitDedicatedReady(ctx); err != nil {
		return err
	}
	sendCtx, cancel, err := bulkWriteContext(ctx, bulk.writeTimeout)
	if err != nil {
		return err
	}
	defer cancel()
	sender, err := c.dedicatedBulkLaneSender(bulk)
	if err != nil {
		return err
	}
	return sender.submitControl(sendCtx, bulk.dataIDSnapshot(), bulkFastPayloadTypeRelease, 0, 0, payload)
}

// dedicatedBulkSender returns the dedicated sender installed on bulk, building
// and installing one when none is present yet.
//
// A nil logical is replaced by bulk.LogicalConn(). It returns errBulkServerNil
// when s or bulk is nil, errBulkLogicalConnNil when no logical connection can
// be resolved, and a transport-detached error when the bulk has no dedicated
// connection. If installDedicatedSender hands back a different sender than the
// one just built, the new one is stopped and the installed one is returned.
func (s *ServerCommon) dedicatedBulkSender(logical *LogicalConn, bulk *bulkHandle) (*bulkDedicatedSender, error) {
	if s == nil || bulk == nil {
		return nil, errBulkServerNil
	}
	if logical == nil {
		logical = bulk.LogicalConn()
	}
	if logical == nil {
		return nil, errBulkLogicalConnNil
	}
	if sender := bulk.dedicatedSenderSnapshot(); sender != nil {
		return sender, nil
	}
	conn := bulk.dedicatedConnSnapshot()
	if conn == nil {
		return nil, transportDetachedError("dedicated bulk sidecar not attached", nil)
	}
	sender := newBulkDedicatedSender(conn, bulk.dataIDSnapshot(), func(plain []byte) ([]byte, error) {
		return s.encryptTransportPayloadLogical(logical, plain)
	}, func(items []bulkDedicatedSendRequest) ([]byte, error) {
		return s.encodeDedicatedBulkBatchPayload(logical, bulk.dataIDSnapshot(), items)
	}, func(err error) {
		// Errors the bulk classifies as ignorable must not reset it.
		if bulk.canIgnoreDedicatedCloseSendError(err) {
			return
		}
		bulk.markReset(err)
	})
	actual := bulk.installDedicatedSender(sender)
	if actual != sender {
		// A different sender is installed; discard the one built here.
		sender.stop()
	}
	return actual, nil
}

// dedicatedBulkLaneSender returns the lane sender of the sidecar that serves
// bulk's dedicated lane on logical.
//
// A nil logical is replaced by bulk.LogicalConn(). It returns errBulkServerNil
// when s or bulk is nil and errBulkLogicalConnNil when no logical connection
// can be resolved. A transport-detached error is returned when no sidecar
// exists for the lane, when either connection is nil, when the sidecar's
// connection differs from the one the bulk holds, or when the sidecar yields
// no sender.
func (s *ServerCommon) dedicatedBulkLaneSender(logical *LogicalConn, bulk *bulkHandle) (*bulkDedicatedLaneSender, error) {
	if s == nil || bulk == nil {
		return nil, errBulkServerNil
	}
	if logical == nil {
		logical = bulk.LogicalConn()
	}
	if logical == nil {
		return nil, errBulkLogicalConnNil
	}
	sidecar := s.serverDedicatedSidecarSnapshotForLane(logical, bulk.dedicatedLaneIDSnapshot())
	conn := bulk.dedicatedConnSnapshot()
	if sidecar == nil || sidecar.conn == nil || conn == nil || sidecar.conn != conn {
		return nil, transportDetachedError("dedicated bulk sidecar not attached", nil)
	}
	sender := sidecar.laneSenderWithFactory(func(conn net.Conn) *bulkDedicatedLaneSender {
		laneRuntime := logical.modernPSKRuntimeSnapshot()
		// Use a forked runtime for the lane when forking succeeds; otherwise
		// keep the logical connection's runtime snapshot.
		if forked, err := forkDedicatedLaneModernPSKRuntime(laneRuntime); err == nil && forked != nil {
			laneRuntime = forked
		}
		return newBulkDedicatedLaneSender(conn, func(batches []bulkDedicatedOutboundBatch) ([]byte, func(), error) {
			return s.encodeDedicatedBulkBatchesPayloadPooledWithRuntime(logical, laneRuntime, batches)
		}, func(err error) {
			s.handleServerDedicatedSidecarFailure(logical, sidecar, err)
		})
	})
	if sender == nil {
		return nil, transportDetachedError("dedicated bulk sidecar not attached", nil)
	}
	return sender, nil
}

// sendDedicatedBulkData submits chunk on bulk's lane sender under the bulk's
// data ID and its next outbound data sequence number.
//
// The sequence number is taken only after the lane sender has been resolved.
func (s *ServerCommon) sendDedicatedBulkData(ctx context.Context, logical *LogicalConn, bulk *bulkHandle, chunk []byte) error {
	if s == nil || bulk == nil {
		return errBulkServerNil
	}
	sender, err := s.dedicatedBulkLaneSender(logical, bulk)
	if err != nil {
		return err
	}
	return sender.submitData(ctx, bulk.dataIDSnapshot(), bulk.nextOutboundDataSeq(), chunk)
}

// sendDedicatedBulkWrite submits payload on bulk's lane sender starting at
// startSeq, passing along the bulk's chunk size and the payloadOwned flag.
// It returns whatever submitWrite returns.
func (s *ServerCommon) sendDedicatedBulkWrite(ctx context.Context, logical *LogicalConn, bulk *bulkHandle, startSeq uint64, payload []byte, payloadOwned bool) (int, error) {
	if s == nil || bulk == nil {
		return 0, errBulkServerNil
	}
	sender, err := s.dedicatedBulkLaneSender(logical, bulk)
	if err != nil {
		return 0, err
	}
	return sender.submitWrite(ctx, bulk.dataIDSnapshot(), startSeq, payload, bulk.chunkSize, payloadOwned)
}

// sendDedicatedBulkClose submits a close control item for bulk. When full is
// true the item carries bulkFastPayloadFlagFullClose. The submit runs under a
// context derived from ctx and the bulk's write timeout.
func (s *ServerCommon) sendDedicatedBulkClose(ctx context.Context, logical *LogicalConn, bulk *bulkHandle, full bool) error {
	if s == nil || bulk == nil {
		return errBulkServerNil
	}
	sendCtx, cancel, err := bulkWriteContext(ctx, bulk.writeTimeout)
	if err != nil {
		return err
	}
	defer cancel()
	flags := uint8(0)
	if full {
		flags = bulkFastPayloadFlagFullClose
	}
	sender, err := s.dedicatedBulkLaneSender(logical, bulk)
	if err != nil {
		return err
	}
	return sender.submitControl(sendCtx, bulk.dataIDSnapshot(), bulkFastPayloadTypeClose, flags, 0, nil)
}

// sendDedicatedBulkReset submits a reset control item for bulk with message as
// its payload. The submit runs under a context derived from ctx and the bulk's
// write timeout.
func (s *ServerCommon) sendDedicatedBulkReset(ctx context.Context, logical *LogicalConn, bulk *bulkHandle, message string) error {
	if s == nil || bulk == nil {
		return errBulkServerNil
	}
	sendCtx, cancel, err := bulkWriteContext(ctx, bulk.writeTimeout)
	if err != nil {
		return err
	}
	defer cancel()
	sender, err := s.dedicatedBulkLaneSender(logical, bulk)
	if err != nil {
		return err
	}
	return sender.submitControl(sendCtx, bulk.dataIDSnapshot(), bulkFastPayloadTypeReset, 0, 0, []byte(message))
}

// sendDedicatedBulkRelease submits a release control item for bulk whose
// payload encodes bytes and chunks.
//
// The payload is encoded first, then the call waits on
// bulk.waitDedicatedReady using the caller's ctx; the write-timeout context is
// created only after that wait returns.
func (s *ServerCommon) sendDedicatedBulkRelease(ctx context.Context, logical *LogicalConn, bulk *bulkHandle, bytes int64, chunks int) error {
	if s == nil || bulk == nil {
		return errBulkServerNil
	}
	payload, err := encodeBulkDedicatedReleasePayload(bytes, chunks)
	if err != nil {
		return err
	}
	if err := bulk.waitDedicatedReady(ctx); err != nil {
		return err
	}
	sendCtx, cancel, err := bulkWriteContext(ctx, bulk.writeTimeout)
	if err != nil {
		return err
	}
	defer cancel()
	sender, err := s.dedicatedBulkLaneSender(logical, bulk)
	if err != nil {
		return err
	}
	return sender.submitControl(sendCtx, bulk.dataIDSnapshot(), bulkFastPayloadTypeRelease, 0, 0, payload)
}

// encodeDedicatedBulkBatchPayload encodes items for dataID into one protected
// payload, choosing the encoder from the client's transport protection
// snapshot:
//
//  1. a non-nil runtime seals the batch written directly into its buffer;
//  2. otherwise a non-nil fastPlainEncode is used with the snapshot's key;
//  3. otherwise the batch is encoded plain and passed to
//     encryptTransportPayload.
//
// It returns errBulkClientNil when c is nil.
func (c *ClientCommon) encodeDedicatedBulkBatchPayload(dataID uint64, items []bulkDedicatedSendRequest) ([]byte, error) {
	if c == nil {
		return nil, errBulkClientNil
	}
	profile := c.clientTransportProtectionSnapshot()
	if runtime := profile.runtime; runtime != nil {
		return runtime.sealFilledPayload(bulkDedicatedBatchPlainLen(items), func(dst []byte) error {
			return writeBulkDedicatedBatchPlain(dst, dataID, items)
		})
	}
	if profile.fastPlainEncode != nil {
		return encodeBulkDedicatedBatchPayloadFast(profile.fastPlainEncode, profile.secretKey, dataID, items)
	}
	plain, err := encodeBulkDedicatedBatchPlain(dataID, items)
	if err != nil {
		return nil, err
	}
	return c.encryptTransportPayload(plain)
}

// forkDedicatedLaneModernPSKRuntime returns base.fork(), or (nil, nil) when
// base is nil.
func forkDedicatedLaneModernPSKRuntime(base *modernPSKCodecRuntime) (*modernPSKCodecRuntime, error) {
	if base == nil {
		return nil, nil
	}
	return base.fork()
}

// encodeDedicatedBulkBatchesPayloadPooledWithRuntime encodes batches into one
// protected payload. A non-nil runtime is used for sealing; otherwise the
// client's protection snapshot selects between fastPlainEncode and plain
// encoding followed by encryptTransportPayload.
//
// The func() result is whatever sealFilledPayloadPooled returns on the runtime
// path (presumably a buffer release callback - confirm against its
// definition); it is nil on the other two paths. It returns errBulkClientNil
// when c is nil and errBulkFastPayloadInvalid when batches is empty.
func (c *ClientCommon) encodeDedicatedBulkBatchesPayloadPooledWithRuntime(runtime *modernPSKCodecRuntime, batches []bulkDedicatedOutboundBatch) ([]byte, func(), error) {
	if c == nil {
		return nil, nil, errBulkClientNil
	}
	if len(batches) == 0 {
		return nil, nil, errBulkFastPayloadInvalid
	}
	if runtime != nil {
		return runtime.sealFilledPayloadPooled(bulkDedicatedBatchesPlainLen(batches), func(dst []byte) error {
			return writeBulkDedicatedBatchesPlain(dst, batches)
		})
	}
	profile := c.clientTransportProtectionSnapshot()
	if profile.fastPlainEncode != nil {
		payload, err := encodeBulkDedicatedBatchesPayloadFast(profile.fastPlainEncode, profile.secretKey, batches)
		return payload, nil, err
	}
	plain, err := encodeBulkDedicatedBatchesPlain(batches)
	if err != nil {
		return nil, nil, err
	}
	payload, err := c.encryptTransportPayload(plain)
	return payload, nil, err
}

// encodeDedicatedBulkBatchesPayloadPooled encodes batches using the runtime
// from the client's current transport protection snapshot.
//
// It returns errBulkClientNil when c is nil.
func (c *ClientCommon) encodeDedicatedBulkBatchesPayloadPooled(batches []bulkDedicatedOutboundBatch) ([]byte, func(), error) {
	// Check c before taking the snapshot, matching the guard used by the
	// other encode helpers on this type.
	if c == nil {
		return nil, nil, errBulkClientNil
	}
	return c.encodeDedicatedBulkBatchesPayloadPooledWithRuntime(c.clientTransportProtectionSnapshot().runtime, batches)
}

// encodeDedicatedBulkBatchPayloadPooled wraps dataID and items in a single
// batch and encodes it with encodeDedicatedBulkBatchesPayloadPooled.
func (c *ClientCommon) encodeDedicatedBulkBatchPayloadPooled(dataID uint64, items []bulkDedicatedSendRequest) ([]byte, func(), error) {
	return c.encodeDedicatedBulkBatchesPayloadPooled([]bulkDedicatedOutboundBatch{{
		DataID: dataID,
		Items:  items,
	}})
}

// encodeDedicatedBulkBatchPayload encodes items for dataID into one protected
// payload, choosing the encoder from the logical connection's state:
//
//  1. a non-nil modern PSK runtime snapshot seals the batch written directly
//     into its buffer;
//  2. otherwise a non-nil fastPlainEncode snapshot is used with the logical
//     connection's secret key snapshot;
//  3. otherwise the batch is encoded plain and passed to
//     encryptTransportPayloadLogical.
//
// It returns errBulkServerNil when s is nil and errBulkLogicalConnNil when
// logical is nil.
func (s *ServerCommon) encodeDedicatedBulkBatchPayload(logical *LogicalConn, dataID uint64, items []bulkDedicatedSendRequest) ([]byte, error) {
	if s == nil {
		return nil, errBulkServerNil
	}
	if logical == nil {
		return nil, errBulkLogicalConnNil
	}
	if runtime := logical.modernPSKRuntimeSnapshot(); runtime != nil {
		return runtime.sealFilledPayload(bulkDedicatedBatchPlainLen(items), func(dst []byte) error {
			return writeBulkDedicatedBatchPlain(dst, dataID, items)
		})
	}
	if fastPlainEncode := logical.fastPlainEncodeSnapshot(); fastPlainEncode != nil {
		return encodeBulkDedicatedBatchPayloadFast(fastPlainEncode, logical.secretKeySnapshot(), dataID, items)
	}
	plain, err := encodeBulkDedicatedBatchPlain(dataID, items)
	if err != nil {
		return nil, err
	}
	return s.encryptTransportPayloadLogical(logical, plain)
}

// encodeDedicatedBulkBatchesPayloadPooledWithRuntime encodes batches into one
// protected payload. A non-nil runtime is used for sealing; otherwise the
// logical connection selects between its fastPlainEncode snapshot and plain
// encoding followed by encryptTransportPayloadLogical.
//
// The func() result is whatever sealFilledPayloadPooled returns on the runtime
// path (presumably a buffer release callback - confirm against its
// definition); it is nil on the other two paths. It returns errBulkServerNil
// when s is nil, errBulkLogicalConnNil when logical is nil and
// errBulkFastPayloadInvalid when batches is empty.
func (s *ServerCommon) encodeDedicatedBulkBatchesPayloadPooledWithRuntime(logical *LogicalConn, runtime *modernPSKCodecRuntime, batches []bulkDedicatedOutboundBatch) ([]byte, func(), error) {
	if s == nil {
		return nil, nil, errBulkServerNil
	}
	if logical == nil {
		return nil, nil, errBulkLogicalConnNil
	}
	if len(batches) == 0 {
		return nil, nil, errBulkFastPayloadInvalid
	}
	if runtime != nil {
		return runtime.sealFilledPayloadPooled(bulkDedicatedBatchesPlainLen(batches), func(dst []byte) error {
			return writeBulkDedicatedBatchesPlain(dst, batches)
		})
	}
	if fastPlainEncode := logical.fastPlainEncodeSnapshot(); fastPlainEncode != nil {
		payload, err := encodeBulkDedicatedBatchesPayloadFast(fastPlainEncode, logical.secretKeySnapshot(), batches)
		return payload, nil, err
	}
	plain, err := encodeBulkDedicatedBatchesPlain(batches)
	if err != nil {
		return nil, nil, err
	}
	payload, err := s.encryptTransportPayloadLogical(logical, plain)
	return payload, nil, err
}

// encodeDedicatedBulkBatchesPayloadPooled encodes batches using the logical
// connection's current modern PSK runtime snapshot.
//
// It returns errBulkServerNil when s is nil and errBulkLogicalConnNil when
// logical is nil.
func (s *ServerCommon) encodeDedicatedBulkBatchesPayloadPooled(logical *LogicalConn, batches []bulkDedicatedOutboundBatch) ([]byte, func(), error) {
	// Check s and logical before taking the snapshot, matching the guards
	// used by the other encode helpers on this type.
	if s == nil {
		return nil, nil, errBulkServerNil
	}
	if logical == nil {
		return nil, nil, errBulkLogicalConnNil
	}
	return s.encodeDedicatedBulkBatchesPayloadPooledWithRuntime(logical, logical.modernPSKRuntimeSnapshot(), batches)
}

// encodeDedicatedBulkBatchPayloadPooled wraps dataID and items in a single
// batch and encodes it with encodeDedicatedBulkBatchesPayloadPooled.
func (s *ServerCommon) encodeDedicatedBulkBatchPayloadPooled(logical *LogicalConn, dataID uint64, items []bulkDedicatedSendRequest) ([]byte, func(), error) {
	return s.encodeDedicatedBulkBatchesPayloadPooled(logical, []bulkDedicatedOutboundBatch{{
		DataID: dataID,
		Items:  items,
	}})
}