2026-04-15 15:24:36 +08:00
|
|
|
package notify
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"b612.me/notify/internal/transport"
|
|
|
|
|
"b612.me/stario"
|
|
|
|
|
"context"
|
|
|
|
|
cryptorand "crypto/rand"
|
|
|
|
|
"encoding/binary"
|
|
|
|
|
"encoding/hex"
|
|
|
|
|
"errors"
|
2026-04-16 17:27:48 +08:00
|
|
|
"fmt"
|
2026-04-15 15:24:36 +08:00
|
|
|
"io"
|
|
|
|
|
"net"
|
2026-04-18 16:05:57 +08:00
|
|
|
"strings"
|
2026-04-15 15:24:36 +08:00
|
|
|
"sync/atomic"
|
|
|
|
|
"time"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
|
systemBulkAttachKey = "_notify_bulk_attach"
|
|
|
|
|
bulkDedicatedRecordMagic = "NBR1"
|
|
|
|
|
bulkDedicatedRecordHeaderLen = 8
|
|
|
|
|
|
2026-04-18 16:05:57 +08:00
|
|
|
defaultBulkDedicatedAttachLimit = 16
|
|
|
|
|
defaultBulkDedicatedActiveLimit = 4096
|
|
|
|
|
defaultBulkDedicatedLaneLimit = 4
|
|
|
|
|
defaultBulkDedicatedAttachRetry = 2
|
|
|
|
|
defaultBulkDedicatedAttachBackoff = 150 * time.Millisecond
|
|
|
|
|
defaultBulkDedicatedDialTimeout = 5 * time.Second
|
|
|
|
|
defaultBulkDedicatedHelloTimeout = 10 * time.Second
|
|
|
|
|
)
|
2026-04-16 17:27:48 +08:00
|
|
|
|
2026-04-15 15:24:36 +08:00
|
|
|
type bulkAttachRequest struct {
|
|
|
|
|
PeerID string
|
|
|
|
|
BulkID string
|
|
|
|
|
AttachToken string
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type bulkAttachResponse struct {
|
2026-04-18 16:05:57 +08:00
|
|
|
Accepted bool
|
|
|
|
|
Error string
|
|
|
|
|
Code string
|
|
|
|
|
Retryable bool
|
|
|
|
|
FailedSeq uint64
|
|
|
|
|
FailedBulk string
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type bulkAttachErrorCode string
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
|
bulkAttachErrorCodeInvalidRequest bulkAttachErrorCode = "invalid_request"
|
|
|
|
|
bulkAttachErrorCodeServerUnavailable bulkAttachErrorCode = "server_unavailable"
|
|
|
|
|
bulkAttachErrorCodePeerNotFound bulkAttachErrorCode = "peer_not_found"
|
|
|
|
|
bulkAttachErrorCodeBulkNotFound bulkAttachErrorCode = "bulk_not_found"
|
|
|
|
|
bulkAttachErrorCodeBulkNotDedicated bulkAttachErrorCode = "bulk_not_dedicated"
|
|
|
|
|
bulkAttachErrorCodeTokenMismatch bulkAttachErrorCode = "token_mismatch"
|
|
|
|
|
bulkAttachErrorCodeAlreadyAttached bulkAttachErrorCode = "already_attached"
|
|
|
|
|
bulkAttachErrorCodeAttachFailed bulkAttachErrorCode = "attach_failed"
|
|
|
|
|
bulkAttachErrorCodeInternal bulkAttachErrorCode = "internal_error"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type bulkAttachError struct {
|
|
|
|
|
Code bulkAttachErrorCode
|
|
|
|
|
Retryable bool
|
|
|
|
|
Message string
|
|
|
|
|
FailedSeq uint64
|
|
|
|
|
FailedBulk string
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (e *bulkAttachError) Error() string {
|
|
|
|
|
if e == nil {
|
|
|
|
|
return ""
|
|
|
|
|
}
|
|
|
|
|
if e.Message != "" {
|
|
|
|
|
return e.Message
|
|
|
|
|
}
|
|
|
|
|
if e.Code != "" {
|
|
|
|
|
return string(e.Code)
|
|
|
|
|
}
|
|
|
|
|
return "bulk attach failed"
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func newBulkAttachError(code bulkAttachErrorCode, retryable bool, message string) *bulkAttachError {
|
|
|
|
|
return &bulkAttachError{
|
|
|
|
|
Code: code,
|
|
|
|
|
Retryable: retryable,
|
|
|
|
|
Message: message,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func toBulkAttachResponseError(err error, bulkID string) bulkAttachResponse {
|
|
|
|
|
resp := bulkAttachResponse{
|
|
|
|
|
Accepted: false,
|
|
|
|
|
FailedBulk: bulkID,
|
|
|
|
|
}
|
|
|
|
|
if err == nil {
|
|
|
|
|
resp.Error = "bulk attach failed"
|
|
|
|
|
resp.Code = string(bulkAttachErrorCodeInternal)
|
|
|
|
|
return resp
|
|
|
|
|
}
|
|
|
|
|
var attachErr *bulkAttachError
|
|
|
|
|
if errors.As(err, &attachErr) && attachErr != nil {
|
|
|
|
|
resp.Error = attachErr.Error()
|
|
|
|
|
resp.Code = string(attachErr.Code)
|
|
|
|
|
resp.Retryable = attachErr.Retryable
|
|
|
|
|
resp.FailedSeq = attachErr.FailedSeq
|
|
|
|
|
if attachErr.FailedBulk != "" {
|
|
|
|
|
resp.FailedBulk = attachErr.FailedBulk
|
|
|
|
|
}
|
|
|
|
|
if resp.Code == "" {
|
|
|
|
|
resp.Code = string(bulkAttachErrorCodeInternal)
|
|
|
|
|
}
|
|
|
|
|
return resp
|
|
|
|
|
}
|
|
|
|
|
resp.Error = err.Error()
|
|
|
|
|
resp.Code = string(bulkAttachErrorCodeInternal)
|
|
|
|
|
return resp
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func newBulkAttachToken() string {
|
|
|
|
|
var buf [16]byte
|
|
|
|
|
if _, err := cryptorand.Read(buf[:]); err == nil {
|
|
|
|
|
return hex.EncodeToString(buf[:])
|
|
|
|
|
}
|
|
|
|
|
return ""
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func decodeBulkAttachRequest(decodeFn func([]byte) (interface{}, error), data MsgVal) (bulkAttachRequest, error) {
|
|
|
|
|
var req bulkAttachRequest
|
|
|
|
|
if decodeFn == nil {
|
|
|
|
|
decodeFn = Decode
|
|
|
|
|
}
|
|
|
|
|
raw := []byte(data)
|
|
|
|
|
value, err := decodeFn(raw)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return req, err
|
|
|
|
|
}
|
|
|
|
|
switch typed := value.(type) {
|
|
|
|
|
case bulkAttachRequest:
|
|
|
|
|
return typed, nil
|
|
|
|
|
case *bulkAttachRequest:
|
|
|
|
|
if typed == nil {
|
|
|
|
|
return req, errors.New("bulk attach request is nil")
|
|
|
|
|
}
|
|
|
|
|
return *typed, nil
|
|
|
|
|
default:
|
|
|
|
|
return req, errors.New("invalid bulk attach payload")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func decodeBulkAttachResponse(decodeFn func([]byte) (interface{}, error), data MsgVal) (bulkAttachResponse, error) {
|
|
|
|
|
var resp bulkAttachResponse
|
|
|
|
|
if decodeFn == nil {
|
|
|
|
|
decodeFn = Decode
|
|
|
|
|
}
|
|
|
|
|
raw := []byte(data)
|
|
|
|
|
value, err := decodeFn(raw)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return resp, err
|
|
|
|
|
}
|
|
|
|
|
switch typed := value.(type) {
|
|
|
|
|
case bulkAttachResponse:
|
|
|
|
|
return typed, nil
|
|
|
|
|
case *bulkAttachResponse:
|
|
|
|
|
if typed == nil {
|
|
|
|
|
return resp, errors.New("bulk attach response is nil")
|
|
|
|
|
}
|
|
|
|
|
return *typed, nil
|
|
|
|
|
default:
|
|
|
|
|
return resp, errors.New("invalid bulk attach response")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func encodeDirectSignalFrame(queue *stario.StarQueue, sequenceEn func(interface{}) ([]byte, error), msgEn func([]byte, []byte) []byte, secretKey []byte, msg TransferMsg) ([]byte, error) {
|
|
|
|
|
if queue == nil {
|
|
|
|
|
queue = stario.NewQueue()
|
|
|
|
|
}
|
|
|
|
|
env, err := wrapTransferMsgEnvelope(msg, sequenceEn)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
plain, err := sequenceEn(env)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
payload := msgEn(secretKey, plain)
|
|
|
|
|
if payload == nil && len(plain) != 0 {
|
|
|
|
|
return nil, errTransportPayloadEncryptFailed
|
|
|
|
|
}
|
|
|
|
|
return queue.BuildMessage(payload), nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func decodeDirectSignalPayload(sequenceDe func([]byte) (interface{}, error), msgDe func([]byte, []byte) []byte, secretKey []byte, payload []byte) (TransferMsg, error) {
|
|
|
|
|
plain := msgDe(secretKey, payload)
|
|
|
|
|
if plain == nil && len(payload) != 0 {
|
|
|
|
|
return TransferMsg{}, errTransportPayloadDecryptFailed
|
|
|
|
|
}
|
|
|
|
|
value, err := sequenceDe(plain)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return TransferMsg{}, err
|
|
|
|
|
}
|
|
|
|
|
env, ok := value.(Envelope)
|
|
|
|
|
if !ok {
|
|
|
|
|
return TransferMsg{}, errors.New("invalid signal envelope")
|
|
|
|
|
}
|
|
|
|
|
return unwrapTransferMsgEnvelope(env, sequenceDe)
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-16 17:27:48 +08:00
|
|
|
func readDirectSignalFramePayload(conn net.Conn) ([]byte, error) {
|
|
|
|
|
if conn == nil {
|
|
|
|
|
return nil, net.ErrClosed
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
return newTransportFrameReader(conn, stario.NewQueue()).Next()
|
2026-04-16 17:27:48 +08:00
|
|
|
}
|
|
|
|
|
|
2026-04-15 15:24:36 +08:00
|
|
|
func writeBulkDedicatedRecord(conn net.Conn, payload []byte) error {
|
|
|
|
|
return writeBulkDedicatedRecordWithDeadline(conn, payload, time.Time{})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func writeBulkDedicatedRecordWithDeadline(conn net.Conn, payload []byte, deadline time.Time) error {
|
|
|
|
|
if conn == nil {
|
|
|
|
|
return net.ErrClosed
|
|
|
|
|
}
|
|
|
|
|
return withRawConnWriteLockDeadline(conn, deadline, func(conn net.Conn) error {
|
|
|
|
|
var header [bulkDedicatedRecordHeaderLen]byte
|
|
|
|
|
copy(header[:4], bulkDedicatedRecordMagic)
|
|
|
|
|
binary.BigEndian.PutUint32(header[4:8], uint32(len(payload)))
|
2026-04-16 17:27:48 +08:00
|
|
|
return writeNetBuffersFullUnlocked(conn, net.Buffers{header[:], payload})
|
2026-04-15 15:24:36 +08:00
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func readBulkDedicatedRecord(conn net.Conn) ([]byte, error) {
|
2026-04-18 16:05:57 +08:00
|
|
|
payload, release, err := readBulkDedicatedRecordPooled(conn)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
if release != nil {
|
|
|
|
|
defer release()
|
|
|
|
|
}
|
|
|
|
|
return append([]byte(nil), payload...), nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func readBulkDedicatedRecordPooled(conn net.Conn) ([]byte, func(), error) {
|
2026-04-15 15:24:36 +08:00
|
|
|
if conn == nil {
|
2026-04-18 16:05:57 +08:00
|
|
|
return nil, nil, net.ErrClosed
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
var header [bulkDedicatedRecordHeaderLen]byte
|
|
|
|
|
if _, err := io.ReadFull(conn, header[:]); err != nil {
|
2026-04-18 16:05:57 +08:00
|
|
|
return nil, nil, err
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
if string(header[:4]) != bulkDedicatedRecordMagic {
|
2026-04-18 16:05:57 +08:00
|
|
|
return nil, nil, fmt.Errorf("%w: record magic=%x", errBulkFastPayloadInvalid, header[:4])
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
size := int(binary.BigEndian.Uint32(header[4:8]))
|
|
|
|
|
if size < 0 {
|
2026-04-18 16:05:57 +08:00
|
|
|
return nil, nil, errBulkFastPayloadInvalid
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
payload := getModernPSKPayloadBuffer(size)
|
2026-04-15 15:24:36 +08:00
|
|
|
if _, err := io.ReadFull(conn, payload); err != nil {
|
2026-04-18 16:05:57 +08:00
|
|
|
putModernPSKPayloadBuffer(payload)
|
|
|
|
|
return nil, nil, err
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
return payload, func() {
|
|
|
|
|
putModernPSKPayloadBuffer(payload)
|
|
|
|
|
}, nil
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
|
2026-04-18 16:05:57 +08:00
|
|
|
func (c *ClientCommon) dialDedicatedBulkConn(ctx context.Context, timeout time.Duration) (net.Conn, error) {
|
2026-04-15 15:24:36 +08:00
|
|
|
source := c.clientConnectSourceSnapshot()
|
2026-04-18 16:05:57 +08:00
|
|
|
if source != nil {
|
2026-04-20 16:35:44 +08:00
|
|
|
if !source.supportsAdditionalConn() {
|
|
|
|
|
return nil, errBulkDedicatedSingleConn
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
if source.network != "" && source.addr != "" {
|
|
|
|
|
if timeout > 0 {
|
|
|
|
|
return transport.DialTimeout(source.network, source.addr, timeout)
|
|
|
|
|
}
|
|
|
|
|
return transport.Dial(source.network, source.addr)
|
|
|
|
|
}
|
|
|
|
|
if source.canReconnect() {
|
|
|
|
|
return source.dial(ctx)
|
|
|
|
|
}
|
2026-04-20 16:35:44 +08:00
|
|
|
return nil, errClientReconnectSourceUnavailable
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
conn := c.clientTransportConnSnapshot()
|
|
|
|
|
if conn == nil || conn.RemoteAddr() == nil {
|
|
|
|
|
return nil, errClientReconnectSourceUnavailable
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
if timeout > 0 {
|
|
|
|
|
return transport.DialTimeout(conn.RemoteAddr().Network(), conn.RemoteAddr().String(), timeout)
|
|
|
|
|
}
|
2026-04-15 15:24:36 +08:00
|
|
|
return transport.Dial(conn.RemoteAddr().Network(), conn.RemoteAddr().String())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) attachDedicatedBulkSidecar(ctx context.Context, bulk *bulkHandle) error {
|
|
|
|
|
if c == nil || bulk == nil || !bulk.Dedicated() || bulk.dedicatedAttachedSnapshot() {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
if ctx == nil {
|
|
|
|
|
ctx = context.Background()
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
laneID := bulk.dedicatedLaneIDSnapshot()
|
|
|
|
|
releaseActiveSlot, err := c.acquireBulkDedicatedActiveSlot(ctx)
|
2026-04-15 15:24:36 +08:00
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
needReleaseActive := true
|
|
|
|
|
defer func() {
|
|
|
|
|
if needReleaseActive {
|
|
|
|
|
releaseActiveSlot()
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
if sidecar := c.clientDedicatedSidecarSnapshotForLane(laneID); sidecar != nil && sidecar.conn != nil {
|
|
|
|
|
if err := bulk.attachDedicatedConnShared(sidecar.conn); err == nil {
|
|
|
|
|
bulk.markDedicatedActiveReserved()
|
|
|
|
|
needReleaseActive = false
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
_, flight, leader := c.beginClientDedicatedSidecarAttach(laneID)
|
|
|
|
|
if !leader {
|
|
|
|
|
if flight == nil {
|
|
|
|
|
return errTransportDetached
|
|
|
|
|
}
|
|
|
|
|
if err := flight.wait(ctx); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
sidecar := c.clientDedicatedSidecarSnapshotForLane(laneID)
|
|
|
|
|
if sidecar == nil || sidecar.conn == nil {
|
|
|
|
|
return errTransportDetached
|
|
|
|
|
}
|
|
|
|
|
if err := bulk.attachDedicatedConnShared(sidecar.conn); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
bulk.markDedicatedActiveReserved()
|
|
|
|
|
needReleaseActive = false
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
if flight == nil {
|
|
|
|
|
return errTransportDetached
|
|
|
|
|
}
|
|
|
|
|
var flightErr error
|
|
|
|
|
defer func() {
|
|
|
|
|
c.finishClientDedicatedSidecarAttach(laneID, flight, flightErr)
|
|
|
|
|
}()
|
|
|
|
|
releaseAttachSlot, err := c.acquireBulkDedicatedAttachSlot(ctx)
|
2026-04-15 15:24:36 +08:00
|
|
|
if err != nil {
|
2026-04-18 16:05:57 +08:00
|
|
|
flightErr = err
|
2026-04-15 15:24:36 +08:00
|
|
|
return err
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
defer releaseAttachSlot()
|
|
|
|
|
if sidecar := c.clientDedicatedSidecarSnapshotForLane(laneID); sidecar != nil && sidecar.conn != nil {
|
|
|
|
|
if err := bulk.attachDedicatedConnShared(sidecar.conn); err == nil {
|
|
|
|
|
bulk.markDedicatedActiveReserved()
|
|
|
|
|
needReleaseActive = false
|
|
|
|
|
flightErr = nil
|
|
|
|
|
return nil
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
retry, backoff, dialTimeout, helloTimeout := c.bulkDedicatedAttachConfigSnapshot()
|
|
|
|
|
attempts := retry + 1
|
|
|
|
|
if attempts <= 0 {
|
|
|
|
|
attempts = 1
|
|
|
|
|
}
|
|
|
|
|
var lastErr error
|
|
|
|
|
for attempt := 1; attempt <= attempts; attempt++ {
|
|
|
|
|
c.bulkAttachAttemptCount.Add(1)
|
|
|
|
|
if attempt > 1 {
|
|
|
|
|
delay := backoff * time.Duration(1<<(attempt-2))
|
|
|
|
|
if delay > 3*time.Second {
|
|
|
|
|
delay = 3 * time.Second
|
|
|
|
|
}
|
|
|
|
|
if err := waitDedicatedAttachBackoff(ctx, delay); err != nil {
|
|
|
|
|
flightErr = err
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
dialCtx := ctx
|
|
|
|
|
dialCancel := func() {}
|
|
|
|
|
if dialTimeout > 0 {
|
|
|
|
|
dialCtx, dialCancel = context.WithTimeout(ctx, dialTimeout)
|
|
|
|
|
}
|
|
|
|
|
conn, err := c.dialDedicatedBulkConn(dialCtx, dialTimeout)
|
|
|
|
|
dialCancel()
|
|
|
|
|
if err != nil {
|
|
|
|
|
lastErr = err
|
|
|
|
|
if attempt < attempts && isRetryableDedicatedAttachError(err) {
|
|
|
|
|
flightErr = err
|
|
|
|
|
c.bulkAttachRetryCount.Add(1)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
bulk.markDedicatedAttachDegraded(string(bulkAttachErrorCodeAttachFailed))
|
|
|
|
|
flightErr = err
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
helloCtx := ctx
|
|
|
|
|
helloCancel := func() {}
|
|
|
|
|
if helloTimeout > 0 {
|
|
|
|
|
helloCtx, helloCancel = context.WithTimeout(ctx, helloTimeout)
|
|
|
|
|
}
|
|
|
|
|
resp, err := c.sendDedicatedBulkAttachRequest(helloCtx, conn, bulk)
|
|
|
|
|
helloCancel()
|
|
|
|
|
if err != nil {
|
|
|
|
|
_ = conn.Close()
|
|
|
|
|
lastErr = err
|
|
|
|
|
if attempt < attempts && isRetryableDedicatedAttachError(err) {
|
|
|
|
|
flightErr = err
|
|
|
|
|
c.bulkAttachRetryCount.Add(1)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeAttachFailed))
|
|
|
|
|
bulk.markDedicatedAttachDegraded(string(bulkAttachErrorCodeAttachFailed))
|
|
|
|
|
flightErr = err
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if !resp.Accepted {
|
|
|
|
|
_ = conn.Close()
|
|
|
|
|
rejectedErr := &bulkAttachError{
|
|
|
|
|
Code: bulkAttachErrorCode(resp.Code),
|
|
|
|
|
Retryable: resp.Retryable,
|
|
|
|
|
Message: resp.Error,
|
|
|
|
|
FailedSeq: resp.FailedSeq,
|
|
|
|
|
FailedBulk: resp.FailedBulk,
|
|
|
|
|
}
|
|
|
|
|
if rejectedErr.Code == "" {
|
|
|
|
|
rejectedErr.Code = bulkAttachErrorCodeAttachFailed
|
|
|
|
|
}
|
|
|
|
|
lastErr = rejectedErr
|
|
|
|
|
bulk.setDedicatedAttachLastCode(string(rejectedErr.Code))
|
|
|
|
|
if attempt < attempts && rejectedErr.Retryable {
|
|
|
|
|
flightErr = rejectedErr
|
|
|
|
|
c.bulkAttachRetryCount.Add(1)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
bulk.markDedicatedAttachDegraded(string(rejectedErr.Code))
|
|
|
|
|
flightErr = rejectedErr
|
|
|
|
|
return rejectedErr
|
|
|
|
|
}
|
|
|
|
|
sidecar := newBulkDedicatedSidecar(conn, laneID)
|
|
|
|
|
activeSidecar, installed := c.installClientDedicatedSidecar(laneID, sidecar)
|
|
|
|
|
if !installed {
|
|
|
|
|
sidecar.close()
|
|
|
|
|
sidecar = activeSidecar
|
|
|
|
|
}
|
|
|
|
|
if sidecar == nil || sidecar.conn == nil {
|
|
|
|
|
bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeAttachFailed))
|
|
|
|
|
flightErr = errTransportDetached
|
|
|
|
|
return errTransportDetached
|
|
|
|
|
}
|
|
|
|
|
if err := bulk.attachDedicatedConnShared(sidecar.conn); err != nil {
|
|
|
|
|
if installed && c.clearClientDedicatedSidecar(laneID, sidecar) {
|
|
|
|
|
sidecar.close()
|
|
|
|
|
}
|
|
|
|
|
bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeAttachFailed))
|
|
|
|
|
if installed {
|
|
|
|
|
flightErr = err
|
|
|
|
|
} else {
|
|
|
|
|
flightErr = nil
|
|
|
|
|
}
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
c.bulkAttachSuccessCount.Add(1)
|
|
|
|
|
if installed {
|
|
|
|
|
go c.readDedicatedSidecarLoop(sidecar)
|
|
|
|
|
}
|
|
|
|
|
bulk.markDedicatedActiveReserved()
|
|
|
|
|
needReleaseActive = false
|
|
|
|
|
flightErr = nil
|
|
|
|
|
return nil
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
if lastErr != nil {
|
|
|
|
|
flightErr = lastErr
|
|
|
|
|
return lastErr
|
|
|
|
|
}
|
|
|
|
|
flightErr = errors.New("bulk attach failed")
|
|
|
|
|
return flightErr
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) bulkDedicatedAttachConfigSnapshot() (int, time.Duration, time.Duration, time.Duration) {
|
|
|
|
|
if c == nil {
|
|
|
|
|
return defaultBulkDedicatedAttachRetry, defaultBulkDedicatedAttachBackoff, defaultBulkDedicatedDialTimeout, defaultBulkDedicatedHelloTimeout
|
|
|
|
|
}
|
|
|
|
|
c.mu.Lock()
|
|
|
|
|
defer c.mu.Unlock()
|
|
|
|
|
retry := c.bulkDedicatedAttachRetry
|
|
|
|
|
if retry < 0 {
|
|
|
|
|
retry = 0
|
|
|
|
|
}
|
|
|
|
|
backoff := c.bulkDedicatedAttachBackoff
|
|
|
|
|
if backoff <= 0 {
|
|
|
|
|
backoff = defaultBulkDedicatedAttachBackoff
|
|
|
|
|
}
|
|
|
|
|
dialTimeout := c.bulkDedicatedDialTimeout
|
|
|
|
|
if dialTimeout <= 0 {
|
|
|
|
|
dialTimeout = defaultBulkDedicatedDialTimeout
|
|
|
|
|
}
|
|
|
|
|
helloTimeout := c.bulkDedicatedHelloTimeout
|
|
|
|
|
if helloTimeout <= 0 {
|
|
|
|
|
helloTimeout = defaultBulkDedicatedHelloTimeout
|
|
|
|
|
}
|
|
|
|
|
return retry, backoff, dialTimeout, helloTimeout
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) acquireBulkDedicatedAttachSlot(ctx context.Context) (func(), error) {
|
|
|
|
|
sem := c.bulkDedicatedAttachSemaphoreSnapshot()
|
|
|
|
|
if c == nil || sem == nil {
|
|
|
|
|
return func() {}, nil
|
|
|
|
|
}
|
|
|
|
|
if ctx == nil {
|
|
|
|
|
ctx = context.Background()
|
|
|
|
|
}
|
|
|
|
|
select {
|
|
|
|
|
case sem <- struct{}{}:
|
|
|
|
|
return func() {
|
|
|
|
|
select {
|
|
|
|
|
case <-sem:
|
|
|
|
|
default:
|
|
|
|
|
}
|
|
|
|
|
}, nil
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
return nil, ctx.Err()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) reserveBulkDedicatedActiveSlot() bool {
|
|
|
|
|
if c == nil {
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
limit := c.bulkDedicatedActiveLimitSnapshot()
|
|
|
|
|
if limit <= 0 {
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
for {
|
|
|
|
|
current := c.bulkDedicatedActive.Load()
|
|
|
|
|
if int(current) >= limit {
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
if c.bulkDedicatedActive.CompareAndSwap(current, current+1) {
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) acquireBulkDedicatedActiveSlot(ctx context.Context) (func(), error) {
|
|
|
|
|
if c == nil {
|
|
|
|
|
return func() {}, errBulkClientNil
|
|
|
|
|
}
|
|
|
|
|
if ctx == nil {
|
|
|
|
|
ctx = context.Background()
|
|
|
|
|
}
|
|
|
|
|
for {
|
|
|
|
|
if c.reserveBulkDedicatedActiveSlot() {
|
|
|
|
|
return c.releaseBulkDedicatedActiveSlot, nil
|
|
|
|
|
}
|
|
|
|
|
waitCh := c.bulkDedicatedActiveWaitSnapshot()
|
|
|
|
|
if c.reserveBulkDedicatedActiveSlot() {
|
|
|
|
|
return c.releaseBulkDedicatedActiveSlot, nil
|
|
|
|
|
}
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
return nil, ctx.Err()
|
|
|
|
|
case <-waitCh:
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) releaseBulkDedicatedActiveSlot() {
|
|
|
|
|
if c == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
for {
|
|
|
|
|
current := c.bulkDedicatedActive.Load()
|
|
|
|
|
if current <= 0 {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if c.bulkDedicatedActive.CompareAndSwap(current, current-1) {
|
|
|
|
|
c.notifyBulkDedicatedActiveWaiters()
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) notifyBulkDedicatedActiveWaiters() {
|
|
|
|
|
if c == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
c.mu.Lock()
|
|
|
|
|
c.notifyBulkDedicatedActiveWaitersLocked()
|
|
|
|
|
c.mu.Unlock()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func waitDedicatedAttachBackoff(ctx context.Context, delay time.Duration) error {
|
|
|
|
|
if delay <= 0 {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
timer := time.NewTimer(delay)
|
|
|
|
|
defer timer.Stop()
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
return ctx.Err()
|
|
|
|
|
case <-timer.C:
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func isRetryableDedicatedAttachError(err error) bool {
|
|
|
|
|
if err == nil {
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
var attachErr *bulkAttachError
|
|
|
|
|
if errors.As(err, &attachErr) && attachErr != nil {
|
|
|
|
|
return attachErr.Retryable
|
|
|
|
|
}
|
|
|
|
|
if errors.Is(err, context.Canceled) {
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
if errors.Is(err, context.DeadlineExceeded) {
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
var netErr net.Error
|
|
|
|
|
if errors.As(err, &netErr) && (netErr.Timeout() || netErr.Temporary()) {
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
message := strings.ToLower(err.Error())
|
|
|
|
|
for _, pattern := range []string{
|
|
|
|
|
"timeout",
|
|
|
|
|
"timed out",
|
|
|
|
|
"deadline",
|
|
|
|
|
"connection reset",
|
|
|
|
|
"connection refused",
|
|
|
|
|
"connectex",
|
|
|
|
|
"broken pipe",
|
|
|
|
|
"no route",
|
|
|
|
|
"host unreachable",
|
|
|
|
|
"transport detached",
|
|
|
|
|
} {
|
|
|
|
|
if strings.Contains(message, pattern) {
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return false
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) sendDedicatedBulkAttachRequest(ctx context.Context, conn net.Conn, bulk *bulkHandle) (bulkAttachResponse, error) {
|
|
|
|
|
if c == nil {
|
|
|
|
|
return bulkAttachResponse{}, errBulkClientNil
|
|
|
|
|
}
|
|
|
|
|
if bulk == nil {
|
|
|
|
|
return bulkAttachResponse{}, errBulkIDEmpty
|
|
|
|
|
}
|
|
|
|
|
defer func() {
|
|
|
|
|
_ = conn.SetReadDeadline(time.Time{})
|
|
|
|
|
}()
|
|
|
|
|
reqPayload, err := c.sequenceEn(bulkAttachRequest{
|
|
|
|
|
PeerID: c.ensureClientPeerIdentity(),
|
|
|
|
|
BulkID: bulk.ID(),
|
|
|
|
|
AttachToken: bulk.dedicatedAttachTokenSnapshot(),
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
return bulkAttachResponse{}, err
|
|
|
|
|
}
|
|
|
|
|
msg := TransferMsg{
|
|
|
|
|
ID: atomic.AddUint64(&c.msgID, 1),
|
|
|
|
|
Key: systemBulkAttachKey,
|
|
|
|
|
Value: reqPayload,
|
|
|
|
|
Type: MSG_SYS_WAIT,
|
|
|
|
|
}
|
2026-04-20 16:35:44 +08:00
|
|
|
attachProfile := c.clientDedicatedBulkAttachTransportProtectionProfile()
|
|
|
|
|
frame, err := encodeDirectSignalFrame(stario.NewQueue(), c.sequenceEn, attachProfile.msgEn, attachProfile.secretKey, msg)
|
2026-04-15 15:24:36 +08:00
|
|
|
if err != nil {
|
|
|
|
|
return bulkAttachResponse{}, err
|
|
|
|
|
}
|
|
|
|
|
if err := writeFullToConn(conn, frame); err != nil {
|
|
|
|
|
return bulkAttachResponse{}, err
|
|
|
|
|
}
|
2026-04-16 17:27:48 +08:00
|
|
|
if deadline, ok := ctx.Deadline(); ok {
|
|
|
|
|
_ = conn.SetReadDeadline(deadline)
|
|
|
|
|
}
|
|
|
|
|
replyPayload, err := readDirectSignalFramePayload(conn)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return bulkAttachResponse{}, err
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
2026-04-20 16:35:44 +08:00
|
|
|
transfer, err := decodeDirectSignalPayload(c.sequenceDe, attachProfile.msgDe, attachProfile.secretKey, replyPayload)
|
2026-04-16 17:27:48 +08:00
|
|
|
if err != nil {
|
|
|
|
|
return bulkAttachResponse{}, err
|
|
|
|
|
}
|
|
|
|
|
if transfer.Key != systemBulkAttachKey || transfer.Type != MSG_SYS_REPLY || transfer.ID != msg.ID {
|
|
|
|
|
return bulkAttachResponse{}, errors.New("invalid bulk attach reply")
|
|
|
|
|
}
|
|
|
|
|
return decodeBulkAttachResponse(c.sequenceDe, transfer.Value)
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
|
2026-04-20 16:35:44 +08:00
|
|
|
func (c *ClientCommon) clientDedicatedBulkAttachTransportProtectionProfile() transportProtectionProfile {
|
|
|
|
|
if c == nil {
|
|
|
|
|
return transportProtectionProfile{}
|
|
|
|
|
}
|
|
|
|
|
if c.securityConfigured && c.securityBootstrap.msgEn != nil && c.securityBootstrap.msgDe != nil {
|
|
|
|
|
return c.securityBootstrap.clone()
|
|
|
|
|
}
|
|
|
|
|
return c.clientTransportProtectionSnapshot()
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-18 16:05:57 +08:00
|
|
|
func (c *ClientCommon) readDedicatedSidecarLoop(sidecar *bulkDedicatedSidecar) {
|
|
|
|
|
if c == nil || sidecar == nil || sidecar.conn == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
2026-04-15 15:24:36 +08:00
|
|
|
for {
|
2026-04-18 16:05:57 +08:00
|
|
|
payload, payloadRelease, err := readBulkDedicatedRecordPooled(sidecar.conn)
|
2026-04-15 15:24:36 +08:00
|
|
|
if err != nil {
|
2026-04-18 16:05:57 +08:00
|
|
|
c.handleClientDedicatedSidecarFailure(sidecar, err)
|
2026-04-15 15:24:36 +08:00
|
|
|
return
|
|
|
|
|
}
|
2026-04-20 16:35:44 +08:00
|
|
|
profile := c.clientTransportProtectionSnapshot()
|
|
|
|
|
plain, plainRelease, err := decryptTransportPayloadCodecPooled(profile.mode, profile.runtime, profile.msgDe, profile.secretKey, payload, payloadRelease)
|
2026-04-15 15:24:36 +08:00
|
|
|
if err != nil {
|
2026-04-18 16:05:57 +08:00
|
|
|
c.handleClientDedicatedSidecarFailure(sidecar, err)
|
2026-04-15 15:24:36 +08:00
|
|
|
return
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
owner := newBulkReadPayloadOwner(plainRelease)
|
|
|
|
|
runtime := c.getBulkRuntime()
|
|
|
|
|
if runtime == nil {
|
|
|
|
|
owner.done()
|
|
|
|
|
continue
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
var (
|
|
|
|
|
currentDataID uint64
|
|
|
|
|
currentBulk *bulkHandle
|
|
|
|
|
skipDataID bool
|
|
|
|
|
)
|
|
|
|
|
err = walkDedicatedBulkInboundPayload(plain, func(dataID uint64, item bulkDedicatedBatchItem) error {
|
|
|
|
|
if dataID != currentDataID {
|
|
|
|
|
currentDataID = dataID
|
|
|
|
|
currentBulk = nil
|
|
|
|
|
skipDataID = false
|
|
|
|
|
bulk, ok := runtime.lookupByDataID(clientFileScope(), dataID)
|
|
|
|
|
if !ok {
|
|
|
|
|
c.bestEffortRejectInboundBulkData("", dataID, errBulkNotFound.Error())
|
|
|
|
|
skipDataID = true
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
if !bulk.acceptsClientSessionEpoch(c.currentClientSessionEpoch()) {
|
|
|
|
|
detachErr := transportDetachedSessionEpochError()
|
|
|
|
|
bulk.markReset(detachErr)
|
|
|
|
|
c.bestEffortRejectInboundBulkData(bulk.ID(), dataID, detachErr.Error())
|
|
|
|
|
skipDataID = true
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
bulk.markDedicatedDataStarted()
|
|
|
|
|
currentBulk = bulk
|
|
|
|
|
}
|
|
|
|
|
if skipDataID || currentBulk == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
var release func()
|
|
|
|
|
if item.Type == bulkFastPayloadTypeData {
|
|
|
|
|
release = owner.retainChunk()
|
|
|
|
|
}
|
|
|
|
|
dispatchErr := dispatchDedicatedBulkInboundItemWithRelease(currentBulk, item, release)
|
|
|
|
|
if dispatchErr != nil {
|
|
|
|
|
if !errors.Is(dispatchErr, io.EOF) {
|
|
|
|
|
_ = c.sendDedicatedBulkReset(context.Background(), currentBulk, dispatchErr.Error())
|
|
|
|
|
currentBulk.markReset(dispatchErr)
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
currentBulk = nil
|
|
|
|
|
skipDataID = true
|
|
|
|
|
return nil
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
if currentBulk.Context().Err() != nil {
|
|
|
|
|
currentBulk = nil
|
|
|
|
|
skipDataID = true
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
return nil
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
if plainRelease != nil {
|
|
|
|
|
plainRelease()
|
|
|
|
|
}
|
|
|
|
|
c.handleClientDedicatedSidecarFailure(sidecar, err)
|
|
|
|
|
return
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
owner.done()
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ServerCommon) handleBulkAttachSystemMessage(message Message) bool {
|
|
|
|
|
if message.Key != systemBulkAttachKey {
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
current := messageLogicalConnSnapshot(&message)
|
|
|
|
|
var (
|
|
|
|
|
req bulkAttachRequest
|
|
|
|
|
logical *LogicalConn
|
|
|
|
|
bulk *bulkHandle
|
|
|
|
|
err error
|
|
|
|
|
)
|
|
|
|
|
req, err = decodeBulkAttachRequest(s.sequenceDe, message.Value)
|
|
|
|
|
if err == nil {
|
|
|
|
|
logical, bulk, err = s.resolveInboundDedicatedBulk(current, req)
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
2026-04-16 17:27:48 +08:00
|
|
|
if current != nil {
|
2026-04-18 16:05:57 +08:00
|
|
|
_ = s.replyDedicatedBulkAttach(current, message, toBulkAttachResponseError(err, req.BulkID))
|
2026-04-16 17:27:48 +08:00
|
|
|
}
|
|
|
|
|
return true
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
if current != nil {
|
2026-04-16 17:27:48 +08:00
|
|
|
if attachErr := s.finishInboundDedicatedBulkAttach(current, logical, bulk, message); attachErr != nil {
|
2026-04-15 15:24:36 +08:00
|
|
|
bulk.markReset(attachErr)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ServerCommon) resolveInboundDedicatedBulk(current *LogicalConn, req bulkAttachRequest) (*LogicalConn, *bulkHandle, error) {
|
|
|
|
|
if s == nil {
|
2026-04-18 16:05:57 +08:00
|
|
|
return nil, nil, newBulkAttachError(bulkAttachErrorCodeServerUnavailable, true, errBulkServerNil.Error())
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
if current == nil {
|
2026-04-18 16:05:57 +08:00
|
|
|
return nil, nil, newBulkAttachError(bulkAttachErrorCodeInvalidRequest, false, errBulkLogicalConnNil.Error())
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
if req.PeerID == "" || req.BulkID == "" || req.AttachToken == "" {
|
2026-04-18 16:05:57 +08:00
|
|
|
return nil, nil, newBulkAttachError(bulkAttachErrorCodeInvalidRequest, false, errBulkIDEmpty.Error())
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
logical := s.GetLogicalConn(req.PeerID)
|
|
|
|
|
if logical == nil {
|
2026-04-18 16:05:57 +08:00
|
|
|
return nil, nil, newBulkAttachError(bulkAttachErrorCodePeerNotFound, true, errBulkLogicalConnNil.Error())
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
runtime := s.getBulkRuntime()
|
|
|
|
|
if runtime == nil {
|
2026-04-18 16:05:57 +08:00
|
|
|
return nil, nil, newBulkAttachError(bulkAttachErrorCodeServerUnavailable, true, errBulkRuntimeNil.Error())
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
bulk, ok := runtime.lookup(serverFileScope(logical), req.BulkID)
|
|
|
|
|
if !ok {
|
2026-04-18 16:05:57 +08:00
|
|
|
return nil, nil, &bulkAttachError{
|
|
|
|
|
Code: bulkAttachErrorCodeBulkNotFound,
|
|
|
|
|
Retryable: false,
|
|
|
|
|
Message: errBulkNotFound.Error(),
|
|
|
|
|
FailedBulk: req.BulkID,
|
|
|
|
|
}
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
bulk.markDedicatedAttachAttempt()
|
2026-04-15 15:24:36 +08:00
|
|
|
if !bulk.Dedicated() {
|
2026-04-18 16:05:57 +08:00
|
|
|
bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeBulkNotDedicated))
|
|
|
|
|
return nil, nil, &bulkAttachError{
|
|
|
|
|
Code: bulkAttachErrorCodeBulkNotDedicated,
|
|
|
|
|
Retryable: false,
|
|
|
|
|
Message: "bulk is not dedicated",
|
|
|
|
|
FailedBulk: req.BulkID,
|
|
|
|
|
}
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
if bulk.dedicatedAttachTokenSnapshot() != req.AttachToken {
|
2026-04-18 16:05:57 +08:00
|
|
|
bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeTokenMismatch))
|
|
|
|
|
return nil, nil, &bulkAttachError{
|
|
|
|
|
Code: bulkAttachErrorCodeTokenMismatch,
|
|
|
|
|
Retryable: false,
|
|
|
|
|
Message: "bulk attach token mismatch",
|
|
|
|
|
FailedBulk: req.BulkID,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
switch bulk.dedicatedAttachStateSnapshot() {
|
|
|
|
|
case bulkDedicatedAttachStateClosed:
|
|
|
|
|
bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeAttachFailed))
|
|
|
|
|
return nil, nil, &bulkAttachError{
|
|
|
|
|
Code: bulkAttachErrorCodeAttachFailed,
|
|
|
|
|
Retryable: false,
|
|
|
|
|
Message: "bulk dedicated attach closed",
|
|
|
|
|
FailedBulk: req.BulkID,
|
|
|
|
|
}
|
|
|
|
|
case bulkDedicatedAttachStateDegraded:
|
|
|
|
|
bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeAttachFailed))
|
|
|
|
|
return nil, nil, &bulkAttachError{
|
|
|
|
|
Code: bulkAttachErrorCodeAttachFailed,
|
|
|
|
|
Retryable: true,
|
|
|
|
|
Message: "bulk dedicated attach degraded",
|
|
|
|
|
FailedBulk: req.BulkID,
|
|
|
|
|
}
|
|
|
|
|
case bulkDedicatedAttachStateAttached:
|
|
|
|
|
if bulk.dedicatedDataStartedSnapshot() {
|
|
|
|
|
bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeAlreadyAttached))
|
|
|
|
|
return nil, nil, &bulkAttachError{
|
|
|
|
|
Code: bulkAttachErrorCodeAlreadyAttached,
|
|
|
|
|
Retryable: false,
|
|
|
|
|
Message: "bulk dedicated already attached",
|
|
|
|
|
FailedBulk: req.BulkID,
|
|
|
|
|
}
|
|
|
|
|
}
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
return logical, bulk, nil
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-16 17:27:48 +08:00
|
|
|
func (s *ServerCommon) finishInboundDedicatedBulkAttach(current *LogicalConn, logical *LogicalConn, bulk *bulkHandle, message Message) error {
|
2026-04-15 15:24:36 +08:00
|
|
|
if current == nil || logical == nil || bulk == nil {
|
2026-04-18 16:05:57 +08:00
|
|
|
return newBulkAttachError(bulkAttachErrorCodeInvalidRequest, false, errBulkLogicalConnNil.Error())
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
scope := serverFileScope(logical)
|
|
|
|
|
laneID := bulk.dedicatedLaneIDSnapshot()
|
2026-04-15 15:24:36 +08:00
|
|
|
conn, err := current.detachTransportForTransfer()
|
|
|
|
|
if err != nil {
|
2026-04-18 16:05:57 +08:00
|
|
|
return newBulkAttachError(bulkAttachErrorCodeAttachFailed, true, err.Error())
|
|
|
|
|
}
|
|
|
|
|
stopCurrent := func(reason string, err error) {
|
|
|
|
|
current.markSessionStopped(reason, err)
|
|
|
|
|
s.removeLogical(current)
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
2026-04-16 17:27:48 +08:00
|
|
|
fail := func(reason string, err error) error {
|
2026-04-15 15:24:36 +08:00
|
|
|
if conn != nil {
|
|
|
|
|
_ = conn.Close()
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
bulk.markDedicatedAttachDegraded(string(bulkAttachErrorCodeAttachFailed))
|
|
|
|
|
stopCurrent(reason, err)
|
|
|
|
|
return newBulkAttachError(bulkAttachErrorCodeAttachFailed, true, err.Error())
|
|
|
|
|
}
|
|
|
|
|
if bulk.dedicatedAttachedSnapshot() {
|
|
|
|
|
if bulk.dedicatedDataStartedSnapshot() {
|
|
|
|
|
rejected := &bulkAttachError{
|
|
|
|
|
Code: bulkAttachErrorCodeAlreadyAttached,
|
|
|
|
|
Retryable: false,
|
|
|
|
|
Message: "bulk dedicated already attached",
|
|
|
|
|
FailedBulk: bulk.ID(),
|
|
|
|
|
}
|
|
|
|
|
_ = s.replyDedicatedBulkAttachDetached(current, conn, message, toBulkAttachResponseError(rejected, bulk.ID()))
|
|
|
|
|
_ = conn.Close()
|
|
|
|
|
stopCurrent("bulk dedicated attach duplicate rejected", nil)
|
|
|
|
|
bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeAlreadyAttached))
|
|
|
|
|
return nil
|
|
|
|
|
}
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
sidecar := newBulkDedicatedSidecar(conn, laneID)
|
|
|
|
|
if sidecar == nil {
|
|
|
|
|
bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeAttachFailed))
|
|
|
|
|
return fail("bulk dedicated attach failed", errTransportDetached)
|
2026-04-16 17:27:48 +08:00
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
var (
|
|
|
|
|
oldConn net.Conn
|
|
|
|
|
oldSender *bulkDedicatedSender
|
|
|
|
|
)
|
|
|
|
|
if bulk.dedicatedAttachedSnapshot() {
|
|
|
|
|
var replaceErr error
|
|
|
|
|
oldConn, oldSender, replaceErr = bulk.replaceDedicatedConnShared(conn)
|
|
|
|
|
if replaceErr != nil {
|
|
|
|
|
sidecar.close()
|
|
|
|
|
bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeAttachFailed))
|
|
|
|
|
return fail("bulk dedicated reattach failed", replaceErr)
|
|
|
|
|
}
|
|
|
|
|
} else if err := bulk.attachDedicatedConnShared(conn); err != nil {
|
|
|
|
|
sidecar.close()
|
|
|
|
|
bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeAttachFailed))
|
2026-04-16 17:27:48 +08:00
|
|
|
return fail("bulk dedicated attach failed", err)
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
if err := s.replyDedicatedBulkAttachDetached(current, conn, message, bulkAttachResponse{Accepted: true}); err != nil {
|
|
|
|
|
bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeAttachFailed))
|
|
|
|
|
sidecar.close()
|
|
|
|
|
if runtime := s.getBulkRuntime(); runtime != nil {
|
|
|
|
|
runtime.resetDedicatedByConn(scope, conn, transportDetachedError("bulk dedicated attach reply failed", err))
|
|
|
|
|
}
|
|
|
|
|
stopCurrent("bulk dedicated attach reply failed", err)
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
oldSidecar := s.installServerDedicatedSidecar(logical, laneID, sidecar)
|
|
|
|
|
if runtime := s.getBulkRuntime(); runtime != nil {
|
|
|
|
|
runtime.attachSharedDedicatedConn(scope, laneID, conn)
|
|
|
|
|
}
|
|
|
|
|
if oldSender != nil {
|
|
|
|
|
oldSender.stop()
|
|
|
|
|
}
|
|
|
|
|
if oldConn != nil {
|
|
|
|
|
_ = oldConn.Close()
|
|
|
|
|
}
|
|
|
|
|
if oldSidecar != nil && oldSidecar != sidecar {
|
|
|
|
|
oldSidecar.close()
|
|
|
|
|
}
|
|
|
|
|
go s.readDedicatedSidecarLoop(logical, sidecar)
|
|
|
|
|
s.startServerBulkAcceptDispatch(bulk, logical, messageTransportConnSnapshot(&message))
|
|
|
|
|
if runtime := s.getBulkRuntime(); runtime != nil {
|
|
|
|
|
s.dispatchPendingServerBulkAccepts(scope, conn, bulk, logical)
|
|
|
|
|
}
|
|
|
|
|
stopCurrent("bulk dedicated attach", nil)
|
2026-04-15 15:24:36 +08:00
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-18 16:05:57 +08:00
|
|
|
func (s *ServerCommon) dispatchPendingServerBulkAccepts(scope string, conn net.Conn, current *bulkHandle, logical *LogicalConn) {
|
|
|
|
|
if s == nil || conn == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
runtime := s.getBulkRuntime()
|
|
|
|
|
if runtime == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
for _, pending := range runtime.dedicatedBulksForConn(scope, conn) {
|
|
|
|
|
if pending == nil || pending == current {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
s.startServerBulkAcceptDispatch(pending, logical, pending.TransportConn())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-16 17:27:48 +08:00
|
|
|
func (s *ServerCommon) replyDedicatedBulkAttachDetached(client *LogicalConn, conn net.Conn, message Message, resp bulkAttachResponse) error {
|
|
|
|
|
if s == nil || client == nil {
|
|
|
|
|
return errBulkServerNil
|
|
|
|
|
}
|
|
|
|
|
if conn == nil {
|
|
|
|
|
return net.ErrClosed
|
|
|
|
|
}
|
|
|
|
|
msgEn := client.msgEnSnapshot()
|
|
|
|
|
if msgEn == nil {
|
|
|
|
|
return errTransportPayloadEncryptFailed
|
|
|
|
|
}
|
|
|
|
|
encoded, err := s.sequenceEn(resp)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
reply := TransferMsg{
|
|
|
|
|
ID: message.ID,
|
|
|
|
|
Key: systemBulkAttachKey,
|
|
|
|
|
Value: encoded,
|
|
|
|
|
Type: MSG_SYS_REPLY,
|
|
|
|
|
}
|
|
|
|
|
frame, err := encodeDirectSignalFrame(stario.NewQueue(), s.sequenceEn, msgEn, client.secretKeySnapshot(), reply)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return withRawConnWriteLockDeadline(conn, writeDeadlineFromTimeout(client.maxWriteTimeoutSnapshot()), func(conn net.Conn) error {
|
|
|
|
|
return writeFullToConnUnlocked(conn, frame)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-15 15:24:36 +08:00
|
|
|
func (s *ServerCommon) replyDedicatedBulkAttach(client *LogicalConn, message Message, resp bulkAttachResponse) error {
|
|
|
|
|
if s == nil || client == nil {
|
|
|
|
|
return errBulkServerNil
|
|
|
|
|
}
|
|
|
|
|
encoded, err := s.sequenceEn(resp)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
reply := TransferMsg{
|
|
|
|
|
ID: message.ID,
|
|
|
|
|
Key: systemBulkAttachKey,
|
|
|
|
|
Value: encoded,
|
|
|
|
|
Type: MSG_SYS_REPLY,
|
|
|
|
|
}
|
|
|
|
|
if message.inboundConn != nil {
|
2026-04-20 16:35:44 +08:00
|
|
|
return s.sendTransferInbound(client, messageTransportConnSnapshot(&message), message.inboundConn, messageInboundTransportProtectionSnapshot(&message), reply)
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
_, err = s.sendLogical(client, reply)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-18 16:05:57 +08:00
|
|
|
func (s *ServerCommon) readDedicatedSidecarLoop(logical *LogicalConn, sidecar *bulkDedicatedSidecar) {
|
|
|
|
|
if s == nil || logical == nil || sidecar == nil || sidecar.conn == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
runtime := s.getBulkRuntime()
|
|
|
|
|
scope := serverFileScope(logical)
|
2026-04-15 15:24:36 +08:00
|
|
|
for {
|
2026-04-18 16:05:57 +08:00
|
|
|
payload, payloadRelease, err := readBulkDedicatedRecordPooled(sidecar.conn)
|
2026-04-15 15:24:36 +08:00
|
|
|
if err != nil {
|
2026-04-18 16:05:57 +08:00
|
|
|
s.handleServerDedicatedSidecarFailure(logical, sidecar, err)
|
2026-04-15 15:24:36 +08:00
|
|
|
return
|
|
|
|
|
}
|
2026-04-20 16:35:44 +08:00
|
|
|
plain, plainRelease, err := decryptTransportPayloadCodecPooled(logical.protectionModeSnapshot(), logical.modernPSKRuntimeSnapshot(), logical.msgDeSnapshot(), logical.secretKeySnapshot(), payload, payloadRelease)
|
2026-04-15 15:24:36 +08:00
|
|
|
if err != nil {
|
2026-04-18 16:05:57 +08:00
|
|
|
s.handleServerDedicatedSidecarFailure(logical, sidecar, err)
|
2026-04-15 15:24:36 +08:00
|
|
|
return
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
owner := newBulkReadPayloadOwner(plainRelease)
|
|
|
|
|
if runtime == nil {
|
|
|
|
|
owner.done()
|
|
|
|
|
continue
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
var (
|
|
|
|
|
currentDataID uint64
|
|
|
|
|
currentBulk *bulkHandle
|
|
|
|
|
skipDataID bool
|
|
|
|
|
)
|
|
|
|
|
err = walkDedicatedBulkInboundPayload(plain, func(dataID uint64, item bulkDedicatedBatchItem) error {
|
|
|
|
|
if dataID != currentDataID {
|
|
|
|
|
currentDataID = dataID
|
|
|
|
|
currentBulk = nil
|
|
|
|
|
skipDataID = false
|
|
|
|
|
bulk, ok := runtime.lookupByDataID(scope, dataID)
|
|
|
|
|
if !ok {
|
|
|
|
|
s.bestEffortRejectInboundDedicatedData(logical, sidecar.conn, dataID, errBulkNotFound.Error())
|
|
|
|
|
skipDataID = true
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
bulk.markDedicatedDataStarted()
|
|
|
|
|
currentBulk = bulk
|
|
|
|
|
}
|
|
|
|
|
if skipDataID || currentBulk == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
var release func()
|
|
|
|
|
if item.Type == bulkFastPayloadTypeData {
|
|
|
|
|
release = owner.retainChunk()
|
|
|
|
|
}
|
|
|
|
|
dispatchErr := dispatchDedicatedBulkInboundItemWithRelease(currentBulk, item, release)
|
|
|
|
|
if dispatchErr != nil {
|
|
|
|
|
if !errors.Is(dispatchErr, io.EOF) {
|
|
|
|
|
_ = s.sendDedicatedBulkReset(context.Background(), logical, currentBulk, dispatchErr.Error())
|
|
|
|
|
currentBulk.markReset(dispatchErr)
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
currentBulk = nil
|
|
|
|
|
skipDataID = true
|
|
|
|
|
return nil
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
if currentBulk.Context().Err() != nil {
|
|
|
|
|
currentBulk = nil
|
|
|
|
|
skipDataID = true
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
if plainRelease != nil {
|
|
|
|
|
plainRelease()
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
s.handleServerDedicatedSidecarFailure(logical, sidecar, err)
|
|
|
|
|
return
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
owner.done()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ServerCommon) bestEffortRejectInboundDedicatedData(logical *LogicalConn, conn net.Conn, dataID uint64, message string) {
|
|
|
|
|
if s == nil || logical == nil || conn == nil || dataID == 0 {
|
|
|
|
|
return
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
frame, err := s.encodeDedicatedBulkBatchPayload(logical, dataID, []bulkDedicatedSendRequest{{
|
|
|
|
|
Type: bulkFastPayloadTypeReset,
|
|
|
|
|
Payload: []byte(message),
|
|
|
|
|
}})
|
|
|
|
|
if err != nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
_ = writeBulkDedicatedRecordWithDeadline(conn, frame, writeDeadlineFromTimeout(logical.maxWriteTimeoutSnapshot()))
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func handleDedicatedBulkReadError(bulk *bulkHandle, err error) {
|
|
|
|
|
if bulk == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if bulk.Context().Err() != nil || bulk.remoteClosedSnapshot() {
|
|
|
|
|
return
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
message := strings.ToLower(err.Error())
|
|
|
|
|
if errors.Is(err, io.EOF) || errors.Is(err, net.ErrClosed) || strings.Contains(message, "use of closed network connection") {
|
2026-04-15 15:24:36 +08:00
|
|
|
if bulk.Dedicated() || bulk.localClosedSnapshot() {
|
|
|
|
|
bulk.markRemoteClosed()
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
bulk.markReset(transportDetachedError("dedicated bulk read error", err))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) dedicatedBulkSender(bulk *bulkHandle) (*bulkDedicatedSender, error) {
|
|
|
|
|
if c == nil || bulk == nil {
|
|
|
|
|
return nil, errBulkClientNil
|
|
|
|
|
}
|
|
|
|
|
if sender := bulk.dedicatedSenderSnapshot(); sender != nil {
|
|
|
|
|
return sender, nil
|
|
|
|
|
}
|
|
|
|
|
conn := bulk.dedicatedConnSnapshot()
|
|
|
|
|
if conn == nil {
|
|
|
|
|
return nil, transportDetachedError("dedicated bulk sidecar not attached", nil)
|
|
|
|
|
}
|
|
|
|
|
sender := newBulkDedicatedSender(conn, bulk.dataIDSnapshot(), c.encryptTransportPayload, func(items []bulkDedicatedSendRequest) ([]byte, error) {
|
|
|
|
|
return c.encodeDedicatedBulkBatchPayload(bulk.dataIDSnapshot(), items)
|
|
|
|
|
}, func(err error) {
|
2026-04-18 16:05:57 +08:00
|
|
|
if bulk.canIgnoreDedicatedCloseSendError(err) {
|
|
|
|
|
return
|
|
|
|
|
}
|
2026-04-15 15:24:36 +08:00
|
|
|
bulk.markReset(err)
|
|
|
|
|
})
|
|
|
|
|
actual := bulk.installDedicatedSender(sender)
|
|
|
|
|
if actual != sender {
|
|
|
|
|
sender.stop()
|
|
|
|
|
}
|
|
|
|
|
return actual, nil
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-18 16:05:57 +08:00
|
|
|
func (c *ClientCommon) dedicatedBulkLaneSender(bulk *bulkHandle) (*bulkDedicatedLaneSender, error) {
|
|
|
|
|
if c == nil || bulk == nil {
|
|
|
|
|
return nil, errBulkClientNil
|
|
|
|
|
}
|
|
|
|
|
sidecar := c.clientDedicatedSidecarSnapshotForLane(bulk.dedicatedLaneIDSnapshot())
|
|
|
|
|
conn := bulk.dedicatedConnSnapshot()
|
|
|
|
|
if sidecar == nil || sidecar.conn == nil || conn == nil || sidecar.conn != conn {
|
|
|
|
|
return nil, transportDetachedError("dedicated bulk sidecar not attached", nil)
|
|
|
|
|
}
|
|
|
|
|
sender := sidecar.laneSenderWithFactory(func(conn net.Conn) *bulkDedicatedLaneSender {
|
2026-04-20 16:35:44 +08:00
|
|
|
profile := c.clientTransportProtectionSnapshot()
|
|
|
|
|
laneRuntime := profile.runtime
|
2026-04-18 16:05:57 +08:00
|
|
|
if forked, err := forkDedicatedLaneModernPSKRuntime(laneRuntime); err == nil && forked != nil {
|
|
|
|
|
laneRuntime = forked
|
|
|
|
|
}
|
|
|
|
|
return newBulkDedicatedLaneSender(conn, func(batches []bulkDedicatedOutboundBatch) ([]byte, func(), error) {
|
|
|
|
|
return c.encodeDedicatedBulkBatchesPayloadPooledWithRuntime(laneRuntime, batches)
|
|
|
|
|
}, func(err error) {
|
|
|
|
|
c.handleClientDedicatedSidecarFailure(sidecar, err)
|
|
|
|
|
})
|
|
|
|
|
})
|
|
|
|
|
if sender == nil {
|
|
|
|
|
return nil, transportDetachedError("dedicated bulk sidecar not attached", nil)
|
|
|
|
|
}
|
|
|
|
|
return sender, nil
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-15 15:24:36 +08:00
|
|
|
func (c *ClientCommon) sendDedicatedBulkData(ctx context.Context, bulk *bulkHandle, chunk []byte) error {
|
|
|
|
|
if c == nil || bulk == nil {
|
|
|
|
|
return errBulkClientNil
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
sender, err := c.dedicatedBulkLaneSender(bulk)
|
2026-04-15 15:24:36 +08:00
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
return sender.submitData(ctx, bulk.dataIDSnapshot(), bulk.nextOutboundDataSeq(), chunk)
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
|
2026-04-20 16:35:44 +08:00
|
|
|
func (c *ClientCommon) sendDedicatedBulkWrite(ctx context.Context, bulk *bulkHandle, startSeq uint64, payload []byte, payloadOwned bool) (int, error) {
|
2026-04-15 15:24:36 +08:00
|
|
|
if c == nil || bulk == nil {
|
|
|
|
|
return 0, errBulkClientNil
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
sender, err := c.dedicatedBulkLaneSender(bulk)
|
2026-04-15 15:24:36 +08:00
|
|
|
if err != nil {
|
|
|
|
|
return 0, err
|
|
|
|
|
}
|
2026-04-20 16:35:44 +08:00
|
|
|
return sender.submitWrite(ctx, bulk.dataIDSnapshot(), startSeq, payload, bulk.chunkSize, payloadOwned)
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) sendDedicatedBulkClose(ctx context.Context, bulk *bulkHandle, full bool) error {
|
|
|
|
|
if c == nil || bulk == nil {
|
|
|
|
|
return errBulkClientNil
|
|
|
|
|
}
|
|
|
|
|
sendCtx, cancel, err := bulkWriteContext(ctx, bulk.writeTimeout)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
defer cancel()
|
|
|
|
|
flags := uint8(0)
|
|
|
|
|
if full {
|
|
|
|
|
flags = bulkFastPayloadFlagFullClose
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
sender, err := c.dedicatedBulkLaneSender(bulk)
|
2026-04-15 15:24:36 +08:00
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
return sender.submitControl(sendCtx, bulk.dataIDSnapshot(), bulkFastPayloadTypeClose, flags, 0, nil)
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) sendDedicatedBulkReset(ctx context.Context, bulk *bulkHandle, message string) error {
|
|
|
|
|
if c == nil || bulk == nil {
|
|
|
|
|
return errBulkClientNil
|
|
|
|
|
}
|
|
|
|
|
sendCtx, cancel, err := bulkWriteContext(ctx, bulk.writeTimeout)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
defer cancel()
|
2026-04-18 16:05:57 +08:00
|
|
|
sender, err := c.dedicatedBulkLaneSender(bulk)
|
2026-04-15 15:24:36 +08:00
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
return sender.submitControl(sendCtx, bulk.dataIDSnapshot(), bulkFastPayloadTypeReset, 0, 0, []byte(message))
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) sendDedicatedBulkRelease(ctx context.Context, bulk *bulkHandle, bytes int64, chunks int) error {
|
|
|
|
|
if c == nil || bulk == nil {
|
|
|
|
|
return errBulkClientNil
|
|
|
|
|
}
|
|
|
|
|
payload, err := encodeBulkDedicatedReleasePayload(bytes, chunks)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if err := bulk.waitDedicatedReady(ctx); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
sendCtx, cancel, err := bulkWriteContext(ctx, bulk.writeTimeout)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
defer cancel()
|
2026-04-18 16:05:57 +08:00
|
|
|
sender, err := c.dedicatedBulkLaneSender(bulk)
|
2026-04-15 15:24:36 +08:00
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
return sender.submitControl(sendCtx, bulk.dataIDSnapshot(), bulkFastPayloadTypeRelease, 0, 0, payload)
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ServerCommon) dedicatedBulkSender(logical *LogicalConn, bulk *bulkHandle) (*bulkDedicatedSender, error) {
|
|
|
|
|
if s == nil || bulk == nil {
|
|
|
|
|
return nil, errBulkServerNil
|
|
|
|
|
}
|
|
|
|
|
if logical == nil {
|
|
|
|
|
logical = bulk.LogicalConn()
|
|
|
|
|
}
|
|
|
|
|
if logical == nil {
|
|
|
|
|
return nil, errBulkLogicalConnNil
|
|
|
|
|
}
|
|
|
|
|
if sender := bulk.dedicatedSenderSnapshot(); sender != nil {
|
|
|
|
|
return sender, nil
|
|
|
|
|
}
|
|
|
|
|
conn := bulk.dedicatedConnSnapshot()
|
|
|
|
|
if conn == nil {
|
|
|
|
|
return nil, transportDetachedError("dedicated bulk sidecar not attached", nil)
|
|
|
|
|
}
|
|
|
|
|
sender := newBulkDedicatedSender(conn, bulk.dataIDSnapshot(), func(plain []byte) ([]byte, error) {
|
|
|
|
|
return s.encryptTransportPayloadLogical(logical, plain)
|
|
|
|
|
}, func(items []bulkDedicatedSendRequest) ([]byte, error) {
|
|
|
|
|
return s.encodeDedicatedBulkBatchPayload(logical, bulk.dataIDSnapshot(), items)
|
|
|
|
|
}, func(err error) {
|
2026-04-18 16:05:57 +08:00
|
|
|
if bulk.canIgnoreDedicatedCloseSendError(err) {
|
|
|
|
|
return
|
|
|
|
|
}
|
2026-04-15 15:24:36 +08:00
|
|
|
bulk.markReset(err)
|
|
|
|
|
})
|
|
|
|
|
actual := bulk.installDedicatedSender(sender)
|
|
|
|
|
if actual != sender {
|
|
|
|
|
sender.stop()
|
|
|
|
|
}
|
|
|
|
|
return actual, nil
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-18 16:05:57 +08:00
|
|
|
func (s *ServerCommon) dedicatedBulkLaneSender(logical *LogicalConn, bulk *bulkHandle) (*bulkDedicatedLaneSender, error) {
|
|
|
|
|
if s == nil || bulk == nil {
|
|
|
|
|
return nil, errBulkServerNil
|
|
|
|
|
}
|
|
|
|
|
if logical == nil {
|
|
|
|
|
logical = bulk.LogicalConn()
|
|
|
|
|
}
|
|
|
|
|
if logical == nil {
|
|
|
|
|
return nil, errBulkLogicalConnNil
|
|
|
|
|
}
|
|
|
|
|
sidecar := s.serverDedicatedSidecarSnapshotForLane(logical, bulk.dedicatedLaneIDSnapshot())
|
|
|
|
|
conn := bulk.dedicatedConnSnapshot()
|
|
|
|
|
if sidecar == nil || sidecar.conn == nil || conn == nil || sidecar.conn != conn {
|
|
|
|
|
return nil, transportDetachedError("dedicated bulk sidecar not attached", nil)
|
|
|
|
|
}
|
|
|
|
|
sender := sidecar.laneSenderWithFactory(func(conn net.Conn) *bulkDedicatedLaneSender {
|
|
|
|
|
laneRuntime := logical.modernPSKRuntimeSnapshot()
|
|
|
|
|
if forked, err := forkDedicatedLaneModernPSKRuntime(laneRuntime); err == nil && forked != nil {
|
|
|
|
|
laneRuntime = forked
|
|
|
|
|
}
|
|
|
|
|
return newBulkDedicatedLaneSender(conn, func(batches []bulkDedicatedOutboundBatch) ([]byte, func(), error) {
|
|
|
|
|
return s.encodeDedicatedBulkBatchesPayloadPooledWithRuntime(logical, laneRuntime, batches)
|
|
|
|
|
}, func(err error) {
|
|
|
|
|
s.handleServerDedicatedSidecarFailure(logical, sidecar, err)
|
|
|
|
|
})
|
|
|
|
|
})
|
|
|
|
|
if sender == nil {
|
|
|
|
|
return nil, transportDetachedError("dedicated bulk sidecar not attached", nil)
|
|
|
|
|
}
|
|
|
|
|
return sender, nil
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-15 15:24:36 +08:00
|
|
|
func (s *ServerCommon) sendDedicatedBulkData(ctx context.Context, logical *LogicalConn, bulk *bulkHandle, chunk []byte) error {
|
|
|
|
|
if s == nil || bulk == nil {
|
|
|
|
|
return errBulkServerNil
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
sender, err := s.dedicatedBulkLaneSender(logical, bulk)
|
2026-04-15 15:24:36 +08:00
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
return sender.submitData(ctx, bulk.dataIDSnapshot(), bulk.nextOutboundDataSeq(), chunk)
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
|
2026-04-20 16:35:44 +08:00
|
|
|
func (s *ServerCommon) sendDedicatedBulkWrite(ctx context.Context, logical *LogicalConn, bulk *bulkHandle, startSeq uint64, payload []byte, payloadOwned bool) (int, error) {
|
2026-04-15 15:24:36 +08:00
|
|
|
if s == nil || bulk == nil {
|
|
|
|
|
return 0, errBulkServerNil
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
sender, err := s.dedicatedBulkLaneSender(logical, bulk)
|
2026-04-15 15:24:36 +08:00
|
|
|
if err != nil {
|
|
|
|
|
return 0, err
|
|
|
|
|
}
|
2026-04-20 16:35:44 +08:00
|
|
|
return sender.submitWrite(ctx, bulk.dataIDSnapshot(), startSeq, payload, bulk.chunkSize, payloadOwned)
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ServerCommon) sendDedicatedBulkClose(ctx context.Context, logical *LogicalConn, bulk *bulkHandle, full bool) error {
|
|
|
|
|
if s == nil || bulk == nil {
|
|
|
|
|
return errBulkServerNil
|
|
|
|
|
}
|
|
|
|
|
sendCtx, cancel, err := bulkWriteContext(ctx, bulk.writeTimeout)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
defer cancel()
|
|
|
|
|
flags := uint8(0)
|
|
|
|
|
if full {
|
|
|
|
|
flags = bulkFastPayloadFlagFullClose
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
sender, err := s.dedicatedBulkLaneSender(logical, bulk)
|
2026-04-15 15:24:36 +08:00
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
return sender.submitControl(sendCtx, bulk.dataIDSnapshot(), bulkFastPayloadTypeClose, flags, 0, nil)
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ServerCommon) sendDedicatedBulkReset(ctx context.Context, logical *LogicalConn, bulk *bulkHandle, message string) error {
|
|
|
|
|
if s == nil || bulk == nil {
|
|
|
|
|
return errBulkServerNil
|
|
|
|
|
}
|
|
|
|
|
sendCtx, cancel, err := bulkWriteContext(ctx, bulk.writeTimeout)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
defer cancel()
|
2026-04-18 16:05:57 +08:00
|
|
|
sender, err := s.dedicatedBulkLaneSender(logical, bulk)
|
2026-04-15 15:24:36 +08:00
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
return sender.submitControl(sendCtx, bulk.dataIDSnapshot(), bulkFastPayloadTypeReset, 0, 0, []byte(message))
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ServerCommon) sendDedicatedBulkRelease(ctx context.Context, logical *LogicalConn, bulk *bulkHandle, bytes int64, chunks int) error {
|
|
|
|
|
if s == nil || bulk == nil {
|
|
|
|
|
return errBulkServerNil
|
|
|
|
|
}
|
|
|
|
|
payload, err := encodeBulkDedicatedReleasePayload(bytes, chunks)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if err := bulk.waitDedicatedReady(ctx); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
sendCtx, cancel, err := bulkWriteContext(ctx, bulk.writeTimeout)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
defer cancel()
|
2026-04-18 16:05:57 +08:00
|
|
|
sender, err := s.dedicatedBulkLaneSender(logical, bulk)
|
2026-04-15 15:24:36 +08:00
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
return sender.submitControl(sendCtx, bulk.dataIDSnapshot(), bulkFastPayloadTypeRelease, 0, 0, payload)
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) encodeDedicatedBulkBatchPayload(dataID uint64, items []bulkDedicatedSendRequest) ([]byte, error) {
|
|
|
|
|
if c == nil {
|
|
|
|
|
return nil, errBulkClientNil
|
|
|
|
|
}
|
2026-04-20 16:35:44 +08:00
|
|
|
profile := c.clientTransportProtectionSnapshot()
|
|
|
|
|
if runtime := profile.runtime; runtime != nil {
|
2026-04-18 16:05:57 +08:00
|
|
|
return runtime.sealFilledPayload(bulkDedicatedBatchPlainLen(items), func(dst []byte) error {
|
|
|
|
|
return writeBulkDedicatedBatchPlain(dst, dataID, items)
|
|
|
|
|
})
|
|
|
|
|
}
|
2026-04-20 16:35:44 +08:00
|
|
|
if profile.fastPlainEncode != nil {
|
|
|
|
|
return encodeBulkDedicatedBatchPayloadFast(profile.fastPlainEncode, profile.secretKey, dataID, items)
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
plain, err := encodeBulkDedicatedBatchPlain(dataID, items)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
return c.encryptTransportPayload(plain)
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-18 16:05:57 +08:00
|
|
|
func forkDedicatedLaneModernPSKRuntime(base *modernPSKCodecRuntime) (*modernPSKCodecRuntime, error) {
|
|
|
|
|
if base == nil {
|
|
|
|
|
return nil, nil
|
|
|
|
|
}
|
|
|
|
|
return base.fork()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) encodeDedicatedBulkBatchesPayloadPooledWithRuntime(runtime *modernPSKCodecRuntime, batches []bulkDedicatedOutboundBatch) ([]byte, func(), error) {
|
|
|
|
|
if c == nil {
|
|
|
|
|
return nil, nil, errBulkClientNil
|
|
|
|
|
}
|
|
|
|
|
if len(batches) == 0 {
|
|
|
|
|
return nil, nil, errBulkFastPayloadInvalid
|
|
|
|
|
}
|
|
|
|
|
if runtime != nil {
|
|
|
|
|
return runtime.sealFilledPayloadPooled(bulkDedicatedBatchesPlainLen(batches), func(dst []byte) error {
|
|
|
|
|
return writeBulkDedicatedBatchesPlain(dst, batches)
|
|
|
|
|
})
|
|
|
|
|
}
|
2026-04-20 16:35:44 +08:00
|
|
|
profile := c.clientTransportProtectionSnapshot()
|
|
|
|
|
if profile.fastPlainEncode != nil {
|
|
|
|
|
payload, err := encodeBulkDedicatedBatchesPayloadFast(profile.fastPlainEncode, profile.secretKey, batches)
|
2026-04-18 16:05:57 +08:00
|
|
|
return payload, nil, err
|
|
|
|
|
}
|
|
|
|
|
plain, err := encodeBulkDedicatedBatchesPlain(batches)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, nil, err
|
|
|
|
|
}
|
|
|
|
|
payload, err := c.encryptTransportPayload(plain)
|
|
|
|
|
return payload, nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) encodeDedicatedBulkBatchesPayloadPooled(batches []bulkDedicatedOutboundBatch) ([]byte, func(), error) {
|
2026-04-20 16:35:44 +08:00
|
|
|
return c.encodeDedicatedBulkBatchesPayloadPooledWithRuntime(c.clientTransportProtectionSnapshot().runtime, batches)
|
2026-04-18 16:05:57 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) encodeDedicatedBulkBatchPayloadPooled(dataID uint64, items []bulkDedicatedSendRequest) ([]byte, func(), error) {
|
|
|
|
|
return c.encodeDedicatedBulkBatchesPayloadPooled([]bulkDedicatedOutboundBatch{{
|
|
|
|
|
DataID: dataID,
|
|
|
|
|
Items: items,
|
|
|
|
|
}})
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-15 15:24:36 +08:00
|
|
|
func (s *ServerCommon) encodeDedicatedBulkBatchPayload(logical *LogicalConn, dataID uint64, items []bulkDedicatedSendRequest) ([]byte, error) {
|
|
|
|
|
if s == nil {
|
|
|
|
|
return nil, errBulkServerNil
|
|
|
|
|
}
|
|
|
|
|
if logical == nil {
|
|
|
|
|
return nil, errBulkLogicalConnNil
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
if runtime := logical.modernPSKRuntimeSnapshot(); runtime != nil {
|
|
|
|
|
return runtime.sealFilledPayload(bulkDedicatedBatchPlainLen(items), func(dst []byte) error {
|
|
|
|
|
return writeBulkDedicatedBatchPlain(dst, dataID, items)
|
|
|
|
|
})
|
|
|
|
|
}
|
2026-04-15 15:24:36 +08:00
|
|
|
if fastPlainEncode := logical.fastPlainEncodeSnapshot(); fastPlainEncode != nil {
|
|
|
|
|
return encodeBulkDedicatedBatchPayloadFast(fastPlainEncode, logical.secretKeySnapshot(), dataID, items)
|
|
|
|
|
}
|
|
|
|
|
plain, err := encodeBulkDedicatedBatchPlain(dataID, items)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
return s.encryptTransportPayloadLogical(logical, plain)
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
|
|
|
|
|
func (s *ServerCommon) encodeDedicatedBulkBatchesPayloadPooledWithRuntime(logical *LogicalConn, runtime *modernPSKCodecRuntime, batches []bulkDedicatedOutboundBatch) ([]byte, func(), error) {
|
|
|
|
|
if s == nil {
|
|
|
|
|
return nil, nil, errBulkServerNil
|
|
|
|
|
}
|
|
|
|
|
if logical == nil {
|
|
|
|
|
return nil, nil, errBulkLogicalConnNil
|
|
|
|
|
}
|
|
|
|
|
if len(batches) == 0 {
|
|
|
|
|
return nil, nil, errBulkFastPayloadInvalid
|
|
|
|
|
}
|
|
|
|
|
if runtime != nil {
|
|
|
|
|
return runtime.sealFilledPayloadPooled(bulkDedicatedBatchesPlainLen(batches), func(dst []byte) error {
|
|
|
|
|
return writeBulkDedicatedBatchesPlain(dst, batches)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
if fastPlainEncode := logical.fastPlainEncodeSnapshot(); fastPlainEncode != nil {
|
|
|
|
|
payload, err := encodeBulkDedicatedBatchesPayloadFast(fastPlainEncode, logical.secretKeySnapshot(), batches)
|
|
|
|
|
return payload, nil, err
|
|
|
|
|
}
|
|
|
|
|
plain, err := encodeBulkDedicatedBatchesPlain(batches)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, nil, err
|
|
|
|
|
}
|
|
|
|
|
payload, err := s.encryptTransportPayloadLogical(logical, plain)
|
|
|
|
|
return payload, nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ServerCommon) encodeDedicatedBulkBatchesPayloadPooled(logical *LogicalConn, batches []bulkDedicatedOutboundBatch) ([]byte, func(), error) {
|
|
|
|
|
return s.encodeDedicatedBulkBatchesPayloadPooledWithRuntime(logical, logical.modernPSKRuntimeSnapshot(), batches)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ServerCommon) encodeDedicatedBulkBatchPayloadPooled(logical *LogicalConn, dataID uint64, items []bulkDedicatedSendRequest) ([]byte, func(), error) {
|
|
|
|
|
return s.encodeDedicatedBulkBatchesPayloadPooled(logical, []bulkDedicatedOutboundBatch{{
|
|
|
|
|
DataID: dataID,
|
|
|
|
|
Items: items,
|
|
|
|
|
}})
|
|
|
|
|
}
|