notify/bulk_dedicated.go

1557 lines
46 KiB
Go
Raw Permalink Normal View History

package notify
import (
"b612.me/notify/internal/transport"
"b612.me/stario"
"context"
cryptorand "crypto/rand"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"io"
"net"
"strings"
"sync/atomic"
"time"
)
const (
systemBulkAttachKey = "_notify_bulk_attach"
bulkDedicatedRecordMagic = "NBR1"
bulkDedicatedRecordHeaderLen = 8
defaultBulkDedicatedAttachLimit = 16
defaultBulkDedicatedActiveLimit = 4096
defaultBulkDedicatedLaneLimit = 4
defaultBulkDedicatedAttachRetry = 2
defaultBulkDedicatedAttachBackoff = 150 * time.Millisecond
defaultBulkDedicatedDialTimeout = 5 * time.Second
defaultBulkDedicatedHelloTimeout = 10 * time.Second
)
type bulkAttachRequest struct {
PeerID string
BulkID string
AttachToken string
}
type bulkAttachResponse struct {
Accepted bool
Error string
Code string
Retryable bool
FailedSeq uint64
FailedBulk string
}
type bulkAttachErrorCode string
const (
bulkAttachErrorCodeInvalidRequest bulkAttachErrorCode = "invalid_request"
bulkAttachErrorCodeServerUnavailable bulkAttachErrorCode = "server_unavailable"
bulkAttachErrorCodePeerNotFound bulkAttachErrorCode = "peer_not_found"
bulkAttachErrorCodeBulkNotFound bulkAttachErrorCode = "bulk_not_found"
bulkAttachErrorCodeBulkNotDedicated bulkAttachErrorCode = "bulk_not_dedicated"
bulkAttachErrorCodeTokenMismatch bulkAttachErrorCode = "token_mismatch"
bulkAttachErrorCodeAlreadyAttached bulkAttachErrorCode = "already_attached"
bulkAttachErrorCodeAttachFailed bulkAttachErrorCode = "attach_failed"
bulkAttachErrorCodeInternal bulkAttachErrorCode = "internal_error"
)
type bulkAttachError struct {
Code bulkAttachErrorCode
Retryable bool
Message string
FailedSeq uint64
FailedBulk string
}
func (e *bulkAttachError) Error() string {
if e == nil {
return ""
}
if e.Message != "" {
return e.Message
}
if e.Code != "" {
return string(e.Code)
}
return "bulk attach failed"
}
func newBulkAttachError(code bulkAttachErrorCode, retryable bool, message string) *bulkAttachError {
return &bulkAttachError{
Code: code,
Retryable: retryable,
Message: message,
}
}
func toBulkAttachResponseError(err error, bulkID string) bulkAttachResponse {
resp := bulkAttachResponse{
Accepted: false,
FailedBulk: bulkID,
}
if err == nil {
resp.Error = "bulk attach failed"
resp.Code = string(bulkAttachErrorCodeInternal)
return resp
}
var attachErr *bulkAttachError
if errors.As(err, &attachErr) && attachErr != nil {
resp.Error = attachErr.Error()
resp.Code = string(attachErr.Code)
resp.Retryable = attachErr.Retryable
resp.FailedSeq = attachErr.FailedSeq
if attachErr.FailedBulk != "" {
resp.FailedBulk = attachErr.FailedBulk
}
if resp.Code == "" {
resp.Code = string(bulkAttachErrorCodeInternal)
}
return resp
}
resp.Error = err.Error()
resp.Code = string(bulkAttachErrorCodeInternal)
return resp
}
func newBulkAttachToken() string {
var buf [16]byte
if _, err := cryptorand.Read(buf[:]); err == nil {
return hex.EncodeToString(buf[:])
}
return ""
}
func decodeBulkAttachRequest(decodeFn func([]byte) (interface{}, error), data MsgVal) (bulkAttachRequest, error) {
var req bulkAttachRequest
if decodeFn == nil {
decodeFn = Decode
}
raw := []byte(data)
value, err := decodeFn(raw)
if err != nil {
return req, err
}
switch typed := value.(type) {
case bulkAttachRequest:
return typed, nil
case *bulkAttachRequest:
if typed == nil {
return req, errors.New("bulk attach request is nil")
}
return *typed, nil
default:
return req, errors.New("invalid bulk attach payload")
}
}
func decodeBulkAttachResponse(decodeFn func([]byte) (interface{}, error), data MsgVal) (bulkAttachResponse, error) {
var resp bulkAttachResponse
if decodeFn == nil {
decodeFn = Decode
}
raw := []byte(data)
value, err := decodeFn(raw)
if err != nil {
return resp, err
}
switch typed := value.(type) {
case bulkAttachResponse:
return typed, nil
case *bulkAttachResponse:
if typed == nil {
return resp, errors.New("bulk attach response is nil")
}
return *typed, nil
default:
return resp, errors.New("invalid bulk attach response")
}
}
func encodeDirectSignalFrame(queue *stario.StarQueue, sequenceEn func(interface{}) ([]byte, error), msgEn func([]byte, []byte) []byte, secretKey []byte, msg TransferMsg) ([]byte, error) {
if queue == nil {
queue = stario.NewQueue()
}
env, err := wrapTransferMsgEnvelope(msg, sequenceEn)
if err != nil {
return nil, err
}
plain, err := sequenceEn(env)
if err != nil {
return nil, err
}
payload := msgEn(secretKey, plain)
if payload == nil && len(plain) != 0 {
return nil, errTransportPayloadEncryptFailed
}
return queue.BuildMessage(payload), nil
}
func decodeDirectSignalPayload(sequenceDe func([]byte) (interface{}, error), msgDe func([]byte, []byte) []byte, secretKey []byte, payload []byte) (TransferMsg, error) {
plain := msgDe(secretKey, payload)
if plain == nil && len(payload) != 0 {
return TransferMsg{}, errTransportPayloadDecryptFailed
}
value, err := sequenceDe(plain)
if err != nil {
return TransferMsg{}, err
}
env, ok := value.(Envelope)
if !ok {
return TransferMsg{}, errors.New("invalid signal envelope")
}
return unwrapTransferMsgEnvelope(env, sequenceDe)
}
func readDirectSignalFramePayload(conn net.Conn) ([]byte, error) {
if conn == nil {
return nil, net.ErrClosed
}
return newTransportFrameReader(conn, stario.NewQueue()).Next()
}
func writeBulkDedicatedRecord(conn net.Conn, payload []byte) error {
return writeBulkDedicatedRecordWithDeadline(conn, payload, time.Time{})
}
func writeBulkDedicatedRecordWithDeadline(conn net.Conn, payload []byte, deadline time.Time) error {
if conn == nil {
return net.ErrClosed
}
return withRawConnWriteLockDeadline(conn, deadline, func(conn net.Conn) error {
var header [bulkDedicatedRecordHeaderLen]byte
copy(header[:4], bulkDedicatedRecordMagic)
binary.BigEndian.PutUint32(header[4:8], uint32(len(payload)))
return writeNetBuffersFullUnlocked(conn, net.Buffers{header[:], payload})
})
}
func readBulkDedicatedRecord(conn net.Conn) ([]byte, error) {
payload, release, err := readBulkDedicatedRecordPooled(conn)
if err != nil {
return nil, err
}
if release != nil {
defer release()
}
return append([]byte(nil), payload...), nil
}
func readBulkDedicatedRecordPooled(conn net.Conn) ([]byte, func(), error) {
if conn == nil {
return nil, nil, net.ErrClosed
}
var header [bulkDedicatedRecordHeaderLen]byte
if _, err := io.ReadFull(conn, header[:]); err != nil {
return nil, nil, err
}
if string(header[:4]) != bulkDedicatedRecordMagic {
return nil, nil, fmt.Errorf("%w: record magic=%x", errBulkFastPayloadInvalid, header[:4])
}
size := int(binary.BigEndian.Uint32(header[4:8]))
if size < 0 {
return nil, nil, errBulkFastPayloadInvalid
}
payload := getModernPSKPayloadBuffer(size)
if _, err := io.ReadFull(conn, payload); err != nil {
putModernPSKPayloadBuffer(payload)
return nil, nil, err
}
return payload, func() {
putModernPSKPayloadBuffer(payload)
}, nil
}
func (c *ClientCommon) dialDedicatedBulkConn(ctx context.Context, timeout time.Duration) (net.Conn, error) {
source := c.clientConnectSourceSnapshot()
if source != nil {
if !source.supportsAdditionalConn() {
return nil, errBulkDedicatedSingleConn
}
if source.network != "" && source.addr != "" {
if timeout > 0 {
return transport.DialTimeout(source.network, source.addr, timeout)
}
return transport.Dial(source.network, source.addr)
}
if source.canReconnect() {
return source.dial(ctx)
}
return nil, errClientReconnectSourceUnavailable
}
conn := c.clientTransportConnSnapshot()
if conn == nil || conn.RemoteAddr() == nil {
return nil, errClientReconnectSourceUnavailable
}
if timeout > 0 {
return transport.DialTimeout(conn.RemoteAddr().Network(), conn.RemoteAddr().String(), timeout)
}
return transport.Dial(conn.RemoteAddr().Network(), conn.RemoteAddr().String())
}
func (c *ClientCommon) attachDedicatedBulkSidecar(ctx context.Context, bulk *bulkHandle) error {
if c == nil || bulk == nil || !bulk.Dedicated() || bulk.dedicatedAttachedSnapshot() {
return nil
}
if ctx == nil {
ctx = context.Background()
}
laneID := bulk.dedicatedLaneIDSnapshot()
releaseActiveSlot, err := c.acquireBulkDedicatedActiveSlot(ctx)
if err != nil {
return err
}
needReleaseActive := true
defer func() {
if needReleaseActive {
releaseActiveSlot()
}
}()
if sidecar := c.clientDedicatedSidecarSnapshotForLane(laneID); sidecar != nil && sidecar.conn != nil {
if err := bulk.attachDedicatedConnShared(sidecar.conn); err == nil {
bulk.markDedicatedActiveReserved()
needReleaseActive = false
return nil
}
}
_, flight, leader := c.beginClientDedicatedSidecarAttach(laneID)
if !leader {
if flight == nil {
return errTransportDetached
}
if err := flight.wait(ctx); err != nil {
return err
}
sidecar := c.clientDedicatedSidecarSnapshotForLane(laneID)
if sidecar == nil || sidecar.conn == nil {
return errTransportDetached
}
if err := bulk.attachDedicatedConnShared(sidecar.conn); err != nil {
return err
}
bulk.markDedicatedActiveReserved()
needReleaseActive = false
return nil
}
if flight == nil {
return errTransportDetached
}
var flightErr error
defer func() {
c.finishClientDedicatedSidecarAttach(laneID, flight, flightErr)
}()
releaseAttachSlot, err := c.acquireBulkDedicatedAttachSlot(ctx)
if err != nil {
flightErr = err
return err
}
defer releaseAttachSlot()
if sidecar := c.clientDedicatedSidecarSnapshotForLane(laneID); sidecar != nil && sidecar.conn != nil {
if err := bulk.attachDedicatedConnShared(sidecar.conn); err == nil {
bulk.markDedicatedActiveReserved()
needReleaseActive = false
flightErr = nil
return nil
}
}
retry, backoff, dialTimeout, helloTimeout := c.bulkDedicatedAttachConfigSnapshot()
attempts := retry + 1
if attempts <= 0 {
attempts = 1
}
var lastErr error
for attempt := 1; attempt <= attempts; attempt++ {
c.bulkAttachAttemptCount.Add(1)
if attempt > 1 {
delay := backoff * time.Duration(1<<(attempt-2))
if delay > 3*time.Second {
delay = 3 * time.Second
}
if err := waitDedicatedAttachBackoff(ctx, delay); err != nil {
flightErr = err
return err
}
}
dialCtx := ctx
dialCancel := func() {}
if dialTimeout > 0 {
dialCtx, dialCancel = context.WithTimeout(ctx, dialTimeout)
}
conn, err := c.dialDedicatedBulkConn(dialCtx, dialTimeout)
dialCancel()
if err != nil {
lastErr = err
if attempt < attempts && isRetryableDedicatedAttachError(err) {
flightErr = err
c.bulkAttachRetryCount.Add(1)
continue
}
bulk.markDedicatedAttachDegraded(string(bulkAttachErrorCodeAttachFailed))
flightErr = err
return err
}
helloCtx := ctx
helloCancel := func() {}
if helloTimeout > 0 {
helloCtx, helloCancel = context.WithTimeout(ctx, helloTimeout)
}
resp, err := c.sendDedicatedBulkAttachRequest(helloCtx, conn, bulk)
helloCancel()
if err != nil {
_ = conn.Close()
lastErr = err
if attempt < attempts && isRetryableDedicatedAttachError(err) {
flightErr = err
c.bulkAttachRetryCount.Add(1)
continue
}
bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeAttachFailed))
bulk.markDedicatedAttachDegraded(string(bulkAttachErrorCodeAttachFailed))
flightErr = err
return err
}
if !resp.Accepted {
_ = conn.Close()
rejectedErr := &bulkAttachError{
Code: bulkAttachErrorCode(resp.Code),
Retryable: resp.Retryable,
Message: resp.Error,
FailedSeq: resp.FailedSeq,
FailedBulk: resp.FailedBulk,
}
if rejectedErr.Code == "" {
rejectedErr.Code = bulkAttachErrorCodeAttachFailed
}
lastErr = rejectedErr
bulk.setDedicatedAttachLastCode(string(rejectedErr.Code))
if attempt < attempts && rejectedErr.Retryable {
flightErr = rejectedErr
c.bulkAttachRetryCount.Add(1)
continue
}
bulk.markDedicatedAttachDegraded(string(rejectedErr.Code))
flightErr = rejectedErr
return rejectedErr
}
sidecar := newBulkDedicatedSidecar(conn, laneID)
activeSidecar, installed := c.installClientDedicatedSidecar(laneID, sidecar)
if !installed {
sidecar.close()
sidecar = activeSidecar
}
if sidecar == nil || sidecar.conn == nil {
bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeAttachFailed))
flightErr = errTransportDetached
return errTransportDetached
}
if err := bulk.attachDedicatedConnShared(sidecar.conn); err != nil {
if installed && c.clearClientDedicatedSidecar(laneID, sidecar) {
sidecar.close()
}
bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeAttachFailed))
if installed {
flightErr = err
} else {
flightErr = nil
}
return err
}
c.bulkAttachSuccessCount.Add(1)
if installed {
go c.readDedicatedSidecarLoop(sidecar)
}
bulk.markDedicatedActiveReserved()
needReleaseActive = false
flightErr = nil
return nil
}
if lastErr != nil {
flightErr = lastErr
return lastErr
}
flightErr = errors.New("bulk attach failed")
return flightErr
}
func (c *ClientCommon) bulkDedicatedAttachConfigSnapshot() (int, time.Duration, time.Duration, time.Duration) {
if c == nil {
return defaultBulkDedicatedAttachRetry, defaultBulkDedicatedAttachBackoff, defaultBulkDedicatedDialTimeout, defaultBulkDedicatedHelloTimeout
}
c.mu.Lock()
defer c.mu.Unlock()
retry := c.bulkDedicatedAttachRetry
if retry < 0 {
retry = 0
}
backoff := c.bulkDedicatedAttachBackoff
if backoff <= 0 {
backoff = defaultBulkDedicatedAttachBackoff
}
dialTimeout := c.bulkDedicatedDialTimeout
if dialTimeout <= 0 {
dialTimeout = defaultBulkDedicatedDialTimeout
}
helloTimeout := c.bulkDedicatedHelloTimeout
if helloTimeout <= 0 {
helloTimeout = defaultBulkDedicatedHelloTimeout
}
return retry, backoff, dialTimeout, helloTimeout
}
func (c *ClientCommon) acquireBulkDedicatedAttachSlot(ctx context.Context) (func(), error) {
sem := c.bulkDedicatedAttachSemaphoreSnapshot()
if c == nil || sem == nil {
return func() {}, nil
}
if ctx == nil {
ctx = context.Background()
}
select {
case sem <- struct{}{}:
return func() {
select {
case <-sem:
default:
}
}, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
func (c *ClientCommon) reserveBulkDedicatedActiveSlot() bool {
if c == nil {
return false
}
limit := c.bulkDedicatedActiveLimitSnapshot()
if limit <= 0 {
return true
}
for {
current := c.bulkDedicatedActive.Load()
if int(current) >= limit {
return false
}
if c.bulkDedicatedActive.CompareAndSwap(current, current+1) {
return true
}
}
}
func (c *ClientCommon) acquireBulkDedicatedActiveSlot(ctx context.Context) (func(), error) {
if c == nil {
return func() {}, errBulkClientNil
}
if ctx == nil {
ctx = context.Background()
}
for {
if c.reserveBulkDedicatedActiveSlot() {
return c.releaseBulkDedicatedActiveSlot, nil
}
waitCh := c.bulkDedicatedActiveWaitSnapshot()
if c.reserveBulkDedicatedActiveSlot() {
return c.releaseBulkDedicatedActiveSlot, nil
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-waitCh:
}
}
}
func (c *ClientCommon) releaseBulkDedicatedActiveSlot() {
if c == nil {
return
}
for {
current := c.bulkDedicatedActive.Load()
if current <= 0 {
return
}
if c.bulkDedicatedActive.CompareAndSwap(current, current-1) {
c.notifyBulkDedicatedActiveWaiters()
return
}
}
}
func (c *ClientCommon) notifyBulkDedicatedActiveWaiters() {
if c == nil {
return
}
c.mu.Lock()
c.notifyBulkDedicatedActiveWaitersLocked()
c.mu.Unlock()
}
func waitDedicatedAttachBackoff(ctx context.Context, delay time.Duration) error {
if delay <= 0 {
return nil
}
timer := time.NewTimer(delay)
defer timer.Stop()
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
return nil
}
}
func isRetryableDedicatedAttachError(err error) bool {
if err == nil {
return false
}
var attachErr *bulkAttachError
if errors.As(err, &attachErr) && attachErr != nil {
return attachErr.Retryable
}
if errors.Is(err, context.Canceled) {
return false
}
if errors.Is(err, context.DeadlineExceeded) {
return true
}
var netErr net.Error
if errors.As(err, &netErr) && (netErr.Timeout() || netErr.Temporary()) {
return true
}
message := strings.ToLower(err.Error())
for _, pattern := range []string{
"timeout",
"timed out",
"deadline",
"connection reset",
"connection refused",
"connectex",
"broken pipe",
"no route",
"host unreachable",
"transport detached",
} {
if strings.Contains(message, pattern) {
return true
}
}
return false
}
func (c *ClientCommon) sendDedicatedBulkAttachRequest(ctx context.Context, conn net.Conn, bulk *bulkHandle) (bulkAttachResponse, error) {
if c == nil {
return bulkAttachResponse{}, errBulkClientNil
}
if bulk == nil {
return bulkAttachResponse{}, errBulkIDEmpty
}
defer func() {
_ = conn.SetReadDeadline(time.Time{})
}()
reqPayload, err := c.sequenceEn(bulkAttachRequest{
PeerID: c.ensureClientPeerIdentity(),
BulkID: bulk.ID(),
AttachToken: bulk.dedicatedAttachTokenSnapshot(),
})
if err != nil {
return bulkAttachResponse{}, err
}
msg := TransferMsg{
ID: atomic.AddUint64(&c.msgID, 1),
Key: systemBulkAttachKey,
Value: reqPayload,
Type: MSG_SYS_WAIT,
}
attachProfile := c.clientDedicatedBulkAttachTransportProtectionProfile()
frame, err := encodeDirectSignalFrame(stario.NewQueue(), c.sequenceEn, attachProfile.msgEn, attachProfile.secretKey, msg)
if err != nil {
return bulkAttachResponse{}, err
}
if err := writeFullToConn(conn, frame); err != nil {
return bulkAttachResponse{}, err
}
if deadline, ok := ctx.Deadline(); ok {
_ = conn.SetReadDeadline(deadline)
}
replyPayload, err := readDirectSignalFramePayload(conn)
if err != nil {
return bulkAttachResponse{}, err
}
transfer, err := decodeDirectSignalPayload(c.sequenceDe, attachProfile.msgDe, attachProfile.secretKey, replyPayload)
if err != nil {
return bulkAttachResponse{}, err
}
if transfer.Key != systemBulkAttachKey || transfer.Type != MSG_SYS_REPLY || transfer.ID != msg.ID {
return bulkAttachResponse{}, errors.New("invalid bulk attach reply")
}
return decodeBulkAttachResponse(c.sequenceDe, transfer.Value)
}
func (c *ClientCommon) clientDedicatedBulkAttachTransportProtectionProfile() transportProtectionProfile {
if c == nil {
return transportProtectionProfile{}
}
if c.securityConfigured && c.securityBootstrap.msgEn != nil && c.securityBootstrap.msgDe != nil {
return c.securityBootstrap.clone()
}
return c.clientTransportProtectionSnapshot()
}
func (c *ClientCommon) readDedicatedSidecarLoop(sidecar *bulkDedicatedSidecar) {
if c == nil || sidecar == nil || sidecar.conn == nil {
return
}
for {
payload, payloadRelease, err := readBulkDedicatedRecordPooled(sidecar.conn)
if err != nil {
c.handleClientDedicatedSidecarFailure(sidecar, err)
return
}
profile := c.clientTransportProtectionSnapshot()
plain, plainRelease, err := decryptTransportPayloadCodecPooled(profile.mode, profile.runtime, profile.msgDe, profile.secretKey, payload, payloadRelease)
if err != nil {
c.handleClientDedicatedSidecarFailure(sidecar, err)
return
}
owner := newBulkReadPayloadOwner(plainRelease)
runtime := c.getBulkRuntime()
if runtime == nil {
owner.done()
continue
}
var (
currentDataID uint64
currentBulk *bulkHandle
skipDataID bool
)
err = walkDedicatedBulkInboundPayload(plain, func(dataID uint64, item bulkDedicatedBatchItem) error {
if dataID != currentDataID {
currentDataID = dataID
currentBulk = nil
skipDataID = false
bulk, ok := runtime.lookupByDataID(clientFileScope(), dataID)
if !ok {
c.bestEffortRejectInboundBulkData("", dataID, errBulkNotFound.Error())
skipDataID = true
return nil
}
if !bulk.acceptsClientSessionEpoch(c.currentClientSessionEpoch()) {
detachErr := transportDetachedSessionEpochError()
bulk.markReset(detachErr)
c.bestEffortRejectInboundBulkData(bulk.ID(), dataID, detachErr.Error())
skipDataID = true
return nil
}
bulk.markDedicatedDataStarted()
currentBulk = bulk
}
if skipDataID || currentBulk == nil {
return nil
}
var release func()
if item.Type == bulkFastPayloadTypeData {
release = owner.retainChunk()
}
dispatchErr := dispatchDedicatedBulkInboundItemWithRelease(currentBulk, item, release)
if dispatchErr != nil {
if !errors.Is(dispatchErr, io.EOF) {
_ = c.sendDedicatedBulkReset(context.Background(), currentBulk, dispatchErr.Error())
currentBulk.markReset(dispatchErr)
}
currentBulk = nil
skipDataID = true
return nil
}
if currentBulk.Context().Err() != nil {
currentBulk = nil
skipDataID = true
}
return nil
})
if err != nil {
if plainRelease != nil {
plainRelease()
}
c.handleClientDedicatedSidecarFailure(sidecar, err)
return
}
owner.done()
}
}
func (s *ServerCommon) handleBulkAttachSystemMessage(message Message) bool {
if message.Key != systemBulkAttachKey {
return false
}
current := messageLogicalConnSnapshot(&message)
var (
req bulkAttachRequest
logical *LogicalConn
bulk *bulkHandle
err error
)
req, err = decodeBulkAttachRequest(s.sequenceDe, message.Value)
if err == nil {
logical, bulk, err = s.resolveInboundDedicatedBulk(current, req)
}
if err != nil {
if current != nil {
_ = s.replyDedicatedBulkAttach(current, message, toBulkAttachResponseError(err, req.BulkID))
}
return true
}
if current != nil {
if attachErr := s.finishInboundDedicatedBulkAttach(current, logical, bulk, message); attachErr != nil {
bulk.markReset(attachErr)
}
}
return true
}
func (s *ServerCommon) resolveInboundDedicatedBulk(current *LogicalConn, req bulkAttachRequest) (*LogicalConn, *bulkHandle, error) {
if s == nil {
return nil, nil, newBulkAttachError(bulkAttachErrorCodeServerUnavailable, true, errBulkServerNil.Error())
}
if current == nil {
return nil, nil, newBulkAttachError(bulkAttachErrorCodeInvalidRequest, false, errBulkLogicalConnNil.Error())
}
if req.PeerID == "" || req.BulkID == "" || req.AttachToken == "" {
return nil, nil, newBulkAttachError(bulkAttachErrorCodeInvalidRequest, false, errBulkIDEmpty.Error())
}
logical := s.GetLogicalConn(req.PeerID)
if logical == nil {
return nil, nil, newBulkAttachError(bulkAttachErrorCodePeerNotFound, true, errBulkLogicalConnNil.Error())
}
runtime := s.getBulkRuntime()
if runtime == nil {
return nil, nil, newBulkAttachError(bulkAttachErrorCodeServerUnavailable, true, errBulkRuntimeNil.Error())
}
bulk, ok := runtime.lookup(serverFileScope(logical), req.BulkID)
if !ok {
return nil, nil, &bulkAttachError{
Code: bulkAttachErrorCodeBulkNotFound,
Retryable: false,
Message: errBulkNotFound.Error(),
FailedBulk: req.BulkID,
}
}
bulk.markDedicatedAttachAttempt()
if !bulk.Dedicated() {
bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeBulkNotDedicated))
return nil, nil, &bulkAttachError{
Code: bulkAttachErrorCodeBulkNotDedicated,
Retryable: false,
Message: "bulk is not dedicated",
FailedBulk: req.BulkID,
}
}
if bulk.dedicatedAttachTokenSnapshot() != req.AttachToken {
bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeTokenMismatch))
return nil, nil, &bulkAttachError{
Code: bulkAttachErrorCodeTokenMismatch,
Retryable: false,
Message: "bulk attach token mismatch",
FailedBulk: req.BulkID,
}
}
switch bulk.dedicatedAttachStateSnapshot() {
case bulkDedicatedAttachStateClosed:
bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeAttachFailed))
return nil, nil, &bulkAttachError{
Code: bulkAttachErrorCodeAttachFailed,
Retryable: false,
Message: "bulk dedicated attach closed",
FailedBulk: req.BulkID,
}
case bulkDedicatedAttachStateDegraded:
bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeAttachFailed))
return nil, nil, &bulkAttachError{
Code: bulkAttachErrorCodeAttachFailed,
Retryable: true,
Message: "bulk dedicated attach degraded",
FailedBulk: req.BulkID,
}
case bulkDedicatedAttachStateAttached:
if bulk.dedicatedDataStartedSnapshot() {
bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeAlreadyAttached))
return nil, nil, &bulkAttachError{
Code: bulkAttachErrorCodeAlreadyAttached,
Retryable: false,
Message: "bulk dedicated already attached",
FailedBulk: req.BulkID,
}
}
}
return logical, bulk, nil
}
func (s *ServerCommon) finishInboundDedicatedBulkAttach(current *LogicalConn, logical *LogicalConn, bulk *bulkHandle, message Message) error {
if current == nil || logical == nil || bulk == nil {
return newBulkAttachError(bulkAttachErrorCodeInvalidRequest, false, errBulkLogicalConnNil.Error())
}
scope := serverFileScope(logical)
laneID := bulk.dedicatedLaneIDSnapshot()
conn, err := current.detachTransportForTransfer()
if err != nil {
return newBulkAttachError(bulkAttachErrorCodeAttachFailed, true, err.Error())
}
stopCurrent := func(reason string, err error) {
current.markSessionStopped(reason, err)
s.removeLogical(current)
}
fail := func(reason string, err error) error {
if conn != nil {
_ = conn.Close()
}
bulk.markDedicatedAttachDegraded(string(bulkAttachErrorCodeAttachFailed))
stopCurrent(reason, err)
return newBulkAttachError(bulkAttachErrorCodeAttachFailed, true, err.Error())
}
if bulk.dedicatedAttachedSnapshot() {
if bulk.dedicatedDataStartedSnapshot() {
rejected := &bulkAttachError{
Code: bulkAttachErrorCodeAlreadyAttached,
Retryable: false,
Message: "bulk dedicated already attached",
FailedBulk: bulk.ID(),
}
_ = s.replyDedicatedBulkAttachDetached(current, conn, message, toBulkAttachResponseError(rejected, bulk.ID()))
_ = conn.Close()
stopCurrent("bulk dedicated attach duplicate rejected", nil)
bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeAlreadyAttached))
return nil
}
}
sidecar := newBulkDedicatedSidecar(conn, laneID)
if sidecar == nil {
bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeAttachFailed))
return fail("bulk dedicated attach failed", errTransportDetached)
}
var (
oldConn net.Conn
oldSender *bulkDedicatedSender
)
if bulk.dedicatedAttachedSnapshot() {
var replaceErr error
oldConn, oldSender, replaceErr = bulk.replaceDedicatedConnShared(conn)
if replaceErr != nil {
sidecar.close()
bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeAttachFailed))
return fail("bulk dedicated reattach failed", replaceErr)
}
} else if err := bulk.attachDedicatedConnShared(conn); err != nil {
sidecar.close()
bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeAttachFailed))
return fail("bulk dedicated attach failed", err)
}
if err := s.replyDedicatedBulkAttachDetached(current, conn, message, bulkAttachResponse{Accepted: true}); err != nil {
bulk.setDedicatedAttachLastCode(string(bulkAttachErrorCodeAttachFailed))
sidecar.close()
if runtime := s.getBulkRuntime(); runtime != nil {
runtime.resetDedicatedByConn(scope, conn, transportDetachedError("bulk dedicated attach reply failed", err))
}
stopCurrent("bulk dedicated attach reply failed", err)
return nil
}
oldSidecar := s.installServerDedicatedSidecar(logical, laneID, sidecar)
if runtime := s.getBulkRuntime(); runtime != nil {
runtime.attachSharedDedicatedConn(scope, laneID, conn)
}
if oldSender != nil {
oldSender.stop()
}
if oldConn != nil {
_ = oldConn.Close()
}
if oldSidecar != nil && oldSidecar != sidecar {
oldSidecar.close()
}
go s.readDedicatedSidecarLoop(logical, sidecar)
s.startServerBulkAcceptDispatch(bulk, logical, messageTransportConnSnapshot(&message))
if runtime := s.getBulkRuntime(); runtime != nil {
s.dispatchPendingServerBulkAccepts(scope, conn, bulk, logical)
}
stopCurrent("bulk dedicated attach", nil)
return nil
}
func (s *ServerCommon) dispatchPendingServerBulkAccepts(scope string, conn net.Conn, current *bulkHandle, logical *LogicalConn) {
if s == nil || conn == nil {
return
}
runtime := s.getBulkRuntime()
if runtime == nil {
return
}
for _, pending := range runtime.dedicatedBulksForConn(scope, conn) {
if pending == nil || pending == current {
continue
}
s.startServerBulkAcceptDispatch(pending, logical, pending.TransportConn())
}
}
func (s *ServerCommon) replyDedicatedBulkAttachDetached(client *LogicalConn, conn net.Conn, message Message, resp bulkAttachResponse) error {
if s == nil || client == nil {
return errBulkServerNil
}
if conn == nil {
return net.ErrClosed
}
msgEn := client.msgEnSnapshot()
if msgEn == nil {
return errTransportPayloadEncryptFailed
}
encoded, err := s.sequenceEn(resp)
if err != nil {
return err
}
reply := TransferMsg{
ID: message.ID,
Key: systemBulkAttachKey,
Value: encoded,
Type: MSG_SYS_REPLY,
}
frame, err := encodeDirectSignalFrame(stario.NewQueue(), s.sequenceEn, msgEn, client.secretKeySnapshot(), reply)
if err != nil {
return err
}
return withRawConnWriteLockDeadline(conn, writeDeadlineFromTimeout(client.maxWriteTimeoutSnapshot()), func(conn net.Conn) error {
return writeFullToConnUnlocked(conn, frame)
})
}
func (s *ServerCommon) replyDedicatedBulkAttach(client *LogicalConn, message Message, resp bulkAttachResponse) error {
if s == nil || client == nil {
return errBulkServerNil
}
encoded, err := s.sequenceEn(resp)
if err != nil {
return err
}
reply := TransferMsg{
ID: message.ID,
Key: systemBulkAttachKey,
Value: encoded,
Type: MSG_SYS_REPLY,
}
if message.inboundConn != nil {
return s.sendTransferInbound(client, messageTransportConnSnapshot(&message), message.inboundConn, messageInboundTransportProtectionSnapshot(&message), reply)
}
_, err = s.sendLogical(client, reply)
return err
}
func (s *ServerCommon) readDedicatedSidecarLoop(logical *LogicalConn, sidecar *bulkDedicatedSidecar) {
if s == nil || logical == nil || sidecar == nil || sidecar.conn == nil {
return
}
runtime := s.getBulkRuntime()
scope := serverFileScope(logical)
for {
payload, payloadRelease, err := readBulkDedicatedRecordPooled(sidecar.conn)
if err != nil {
s.handleServerDedicatedSidecarFailure(logical, sidecar, err)
return
}
plain, plainRelease, err := decryptTransportPayloadCodecPooled(logical.protectionModeSnapshot(), logical.modernPSKRuntimeSnapshot(), logical.msgDeSnapshot(), logical.secretKeySnapshot(), payload, payloadRelease)
if err != nil {
s.handleServerDedicatedSidecarFailure(logical, sidecar, err)
return
}
owner := newBulkReadPayloadOwner(plainRelease)
if runtime == nil {
owner.done()
continue
}
var (
currentDataID uint64
currentBulk *bulkHandle
skipDataID bool
)
err = walkDedicatedBulkInboundPayload(plain, func(dataID uint64, item bulkDedicatedBatchItem) error {
if dataID != currentDataID {
currentDataID = dataID
currentBulk = nil
skipDataID = false
bulk, ok := runtime.lookupByDataID(scope, dataID)
if !ok {
s.bestEffortRejectInboundDedicatedData(logical, sidecar.conn, dataID, errBulkNotFound.Error())
skipDataID = true
return nil
}
bulk.markDedicatedDataStarted()
currentBulk = bulk
}
if skipDataID || currentBulk == nil {
return nil
}
var release func()
if item.Type == bulkFastPayloadTypeData {
release = owner.retainChunk()
}
dispatchErr := dispatchDedicatedBulkInboundItemWithRelease(currentBulk, item, release)
if dispatchErr != nil {
if !errors.Is(dispatchErr, io.EOF) {
_ = s.sendDedicatedBulkReset(context.Background(), logical, currentBulk, dispatchErr.Error())
currentBulk.markReset(dispatchErr)
}
currentBulk = nil
skipDataID = true
return nil
}
if currentBulk.Context().Err() != nil {
currentBulk = nil
skipDataID = true
}
return nil
})
if err != nil {
if plainRelease != nil {
plainRelease()
}
s.handleServerDedicatedSidecarFailure(logical, sidecar, err)
return
}
owner.done()
}
}
func (s *ServerCommon) bestEffortRejectInboundDedicatedData(logical *LogicalConn, conn net.Conn, dataID uint64, message string) {
if s == nil || logical == nil || conn == nil || dataID == 0 {
return
}
frame, err := s.encodeDedicatedBulkBatchPayload(logical, dataID, []bulkDedicatedSendRequest{{
Type: bulkFastPayloadTypeReset,
Payload: []byte(message),
}})
if err != nil {
return
}
_ = writeBulkDedicatedRecordWithDeadline(conn, frame, writeDeadlineFromTimeout(logical.maxWriteTimeoutSnapshot()))
}
func handleDedicatedBulkReadError(bulk *bulkHandle, err error) {
if bulk == nil {
return
}
if bulk.Context().Err() != nil || bulk.remoteClosedSnapshot() {
return
}
message := strings.ToLower(err.Error())
if errors.Is(err, io.EOF) || errors.Is(err, net.ErrClosed) || strings.Contains(message, "use of closed network connection") {
if bulk.Dedicated() || bulk.localClosedSnapshot() {
bulk.markRemoteClosed()
return
}
}
bulk.markReset(transportDetachedError("dedicated bulk read error", err))
}
func (c *ClientCommon) dedicatedBulkSender(bulk *bulkHandle) (*bulkDedicatedSender, error) {
if c == nil || bulk == nil {
return nil, errBulkClientNil
}
if sender := bulk.dedicatedSenderSnapshot(); sender != nil {
return sender, nil
}
conn := bulk.dedicatedConnSnapshot()
if conn == nil {
return nil, transportDetachedError("dedicated bulk sidecar not attached", nil)
}
sender := newBulkDedicatedSender(conn, bulk.dataIDSnapshot(), c.encryptTransportPayload, func(items []bulkDedicatedSendRequest) ([]byte, error) {
return c.encodeDedicatedBulkBatchPayload(bulk.dataIDSnapshot(), items)
}, func(err error) {
if bulk.canIgnoreDedicatedCloseSendError(err) {
return
}
bulk.markReset(err)
})
actual := bulk.installDedicatedSender(sender)
if actual != sender {
sender.stop()
}
return actual, nil
}
func (c *ClientCommon) dedicatedBulkLaneSender(bulk *bulkHandle) (*bulkDedicatedLaneSender, error) {
if c == nil || bulk == nil {
return nil, errBulkClientNil
}
sidecar := c.clientDedicatedSidecarSnapshotForLane(bulk.dedicatedLaneIDSnapshot())
conn := bulk.dedicatedConnSnapshot()
if sidecar == nil || sidecar.conn == nil || conn == nil || sidecar.conn != conn {
return nil, transportDetachedError("dedicated bulk sidecar not attached", nil)
}
sender := sidecar.laneSenderWithFactory(func(conn net.Conn) *bulkDedicatedLaneSender {
profile := c.clientTransportProtectionSnapshot()
laneRuntime := profile.runtime
if forked, err := forkDedicatedLaneModernPSKRuntime(laneRuntime); err == nil && forked != nil {
laneRuntime = forked
}
return newBulkDedicatedLaneSender(conn, func(batches []bulkDedicatedOutboundBatch) ([]byte, func(), error) {
return c.encodeDedicatedBulkBatchesPayloadPooledWithRuntime(laneRuntime, batches)
}, func(err error) {
c.handleClientDedicatedSidecarFailure(sidecar, err)
})
})
if sender == nil {
return nil, transportDetachedError("dedicated bulk sidecar not attached", nil)
}
return sender, nil
}
func (c *ClientCommon) sendDedicatedBulkData(ctx context.Context, bulk *bulkHandle, chunk []byte) error {
if c == nil || bulk == nil {
return errBulkClientNil
}
sender, err := c.dedicatedBulkLaneSender(bulk)
if err != nil {
return err
}
return sender.submitData(ctx, bulk.dataIDSnapshot(), bulk.nextOutboundDataSeq(), chunk)
}
func (c *ClientCommon) sendDedicatedBulkWrite(ctx context.Context, bulk *bulkHandle, startSeq uint64, payload []byte, payloadOwned bool) (int, error) {
if c == nil || bulk == nil {
return 0, errBulkClientNil
}
sender, err := c.dedicatedBulkLaneSender(bulk)
if err != nil {
return 0, err
}
return sender.submitWrite(ctx, bulk.dataIDSnapshot(), startSeq, payload, bulk.chunkSize, payloadOwned)
}
func (c *ClientCommon) sendDedicatedBulkClose(ctx context.Context, bulk *bulkHandle, full bool) error {
if c == nil || bulk == nil {
return errBulkClientNil
}
sendCtx, cancel, err := bulkWriteContext(ctx, bulk.writeTimeout)
if err != nil {
return err
}
defer cancel()
flags := uint8(0)
if full {
flags = bulkFastPayloadFlagFullClose
}
sender, err := c.dedicatedBulkLaneSender(bulk)
if err != nil {
return err
}
return sender.submitControl(sendCtx, bulk.dataIDSnapshot(), bulkFastPayloadTypeClose, flags, 0, nil)
}
func (c *ClientCommon) sendDedicatedBulkReset(ctx context.Context, bulk *bulkHandle, message string) error {
if c == nil || bulk == nil {
return errBulkClientNil
}
sendCtx, cancel, err := bulkWriteContext(ctx, bulk.writeTimeout)
if err != nil {
return err
}
defer cancel()
sender, err := c.dedicatedBulkLaneSender(bulk)
if err != nil {
return err
}
return sender.submitControl(sendCtx, bulk.dataIDSnapshot(), bulkFastPayloadTypeReset, 0, 0, []byte(message))
}
func (c *ClientCommon) sendDedicatedBulkRelease(ctx context.Context, bulk *bulkHandle, bytes int64, chunks int) error {
if c == nil || bulk == nil {
return errBulkClientNil
}
payload, err := encodeBulkDedicatedReleasePayload(bytes, chunks)
if err != nil {
return err
}
if err := bulk.waitDedicatedReady(ctx); err != nil {
return err
}
sendCtx, cancel, err := bulkWriteContext(ctx, bulk.writeTimeout)
if err != nil {
return err
}
defer cancel()
sender, err := c.dedicatedBulkLaneSender(bulk)
if err != nil {
return err
}
return sender.submitControl(sendCtx, bulk.dataIDSnapshot(), bulkFastPayloadTypeRelease, 0, 0, payload)
}
func (s *ServerCommon) dedicatedBulkSender(logical *LogicalConn, bulk *bulkHandle) (*bulkDedicatedSender, error) {
if s == nil || bulk == nil {
return nil, errBulkServerNil
}
if logical == nil {
logical = bulk.LogicalConn()
}
if logical == nil {
return nil, errBulkLogicalConnNil
}
if sender := bulk.dedicatedSenderSnapshot(); sender != nil {
return sender, nil
}
conn := bulk.dedicatedConnSnapshot()
if conn == nil {
return nil, transportDetachedError("dedicated bulk sidecar not attached", nil)
}
sender := newBulkDedicatedSender(conn, bulk.dataIDSnapshot(), func(plain []byte) ([]byte, error) {
return s.encryptTransportPayloadLogical(logical, plain)
}, func(items []bulkDedicatedSendRequest) ([]byte, error) {
return s.encodeDedicatedBulkBatchPayload(logical, bulk.dataIDSnapshot(), items)
}, func(err error) {
if bulk.canIgnoreDedicatedCloseSendError(err) {
return
}
bulk.markReset(err)
})
actual := bulk.installDedicatedSender(sender)
if actual != sender {
sender.stop()
}
return actual, nil
}
func (s *ServerCommon) dedicatedBulkLaneSender(logical *LogicalConn, bulk *bulkHandle) (*bulkDedicatedLaneSender, error) {
if s == nil || bulk == nil {
return nil, errBulkServerNil
}
if logical == nil {
logical = bulk.LogicalConn()
}
if logical == nil {
return nil, errBulkLogicalConnNil
}
sidecar := s.serverDedicatedSidecarSnapshotForLane(logical, bulk.dedicatedLaneIDSnapshot())
conn := bulk.dedicatedConnSnapshot()
if sidecar == nil || sidecar.conn == nil || conn == nil || sidecar.conn != conn {
return nil, transportDetachedError("dedicated bulk sidecar not attached", nil)
}
sender := sidecar.laneSenderWithFactory(func(conn net.Conn) *bulkDedicatedLaneSender {
laneRuntime := logical.modernPSKRuntimeSnapshot()
if forked, err := forkDedicatedLaneModernPSKRuntime(laneRuntime); err == nil && forked != nil {
laneRuntime = forked
}
return newBulkDedicatedLaneSender(conn, func(batches []bulkDedicatedOutboundBatch) ([]byte, func(), error) {
return s.encodeDedicatedBulkBatchesPayloadPooledWithRuntime(logical, laneRuntime, batches)
}, func(err error) {
s.handleServerDedicatedSidecarFailure(logical, sidecar, err)
})
})
if sender == nil {
return nil, transportDetachedError("dedicated bulk sidecar not attached", nil)
}
return sender, nil
}
func (s *ServerCommon) sendDedicatedBulkData(ctx context.Context, logical *LogicalConn, bulk *bulkHandle, chunk []byte) error {
if s == nil || bulk == nil {
return errBulkServerNil
}
sender, err := s.dedicatedBulkLaneSender(logical, bulk)
if err != nil {
return err
}
return sender.submitData(ctx, bulk.dataIDSnapshot(), bulk.nextOutboundDataSeq(), chunk)
}
func (s *ServerCommon) sendDedicatedBulkWrite(ctx context.Context, logical *LogicalConn, bulk *bulkHandle, startSeq uint64, payload []byte, payloadOwned bool) (int, error) {
if s == nil || bulk == nil {
return 0, errBulkServerNil
}
sender, err := s.dedicatedBulkLaneSender(logical, bulk)
if err != nil {
return 0, err
}
return sender.submitWrite(ctx, bulk.dataIDSnapshot(), startSeq, payload, bulk.chunkSize, payloadOwned)
}
func (s *ServerCommon) sendDedicatedBulkClose(ctx context.Context, logical *LogicalConn, bulk *bulkHandle, full bool) error {
if s == nil || bulk == nil {
return errBulkServerNil
}
sendCtx, cancel, err := bulkWriteContext(ctx, bulk.writeTimeout)
if err != nil {
return err
}
defer cancel()
flags := uint8(0)
if full {
flags = bulkFastPayloadFlagFullClose
}
sender, err := s.dedicatedBulkLaneSender(logical, bulk)
if err != nil {
return err
}
return sender.submitControl(sendCtx, bulk.dataIDSnapshot(), bulkFastPayloadTypeClose, flags, 0, nil)
}
func (s *ServerCommon) sendDedicatedBulkReset(ctx context.Context, logical *LogicalConn, bulk *bulkHandle, message string) error {
if s == nil || bulk == nil {
return errBulkServerNil
}
sendCtx, cancel, err := bulkWriteContext(ctx, bulk.writeTimeout)
if err != nil {
return err
}
defer cancel()
sender, err := s.dedicatedBulkLaneSender(logical, bulk)
if err != nil {
return err
}
return sender.submitControl(sendCtx, bulk.dataIDSnapshot(), bulkFastPayloadTypeReset, 0, 0, []byte(message))
}
func (s *ServerCommon) sendDedicatedBulkRelease(ctx context.Context, logical *LogicalConn, bulk *bulkHandle, bytes int64, chunks int) error {
if s == nil || bulk == nil {
return errBulkServerNil
}
payload, err := encodeBulkDedicatedReleasePayload(bytes, chunks)
if err != nil {
return err
}
if err := bulk.waitDedicatedReady(ctx); err != nil {
return err
}
sendCtx, cancel, err := bulkWriteContext(ctx, bulk.writeTimeout)
if err != nil {
return err
}
defer cancel()
sender, err := s.dedicatedBulkLaneSender(logical, bulk)
if err != nil {
return err
}
return sender.submitControl(sendCtx, bulk.dataIDSnapshot(), bulkFastPayloadTypeRelease, 0, 0, payload)
}
func (c *ClientCommon) encodeDedicatedBulkBatchPayload(dataID uint64, items []bulkDedicatedSendRequest) ([]byte, error) {
if c == nil {
return nil, errBulkClientNil
}
profile := c.clientTransportProtectionSnapshot()
if runtime := profile.runtime; runtime != nil {
return runtime.sealFilledPayload(bulkDedicatedBatchPlainLen(items), func(dst []byte) error {
return writeBulkDedicatedBatchPlain(dst, dataID, items)
})
}
if profile.fastPlainEncode != nil {
return encodeBulkDedicatedBatchPayloadFast(profile.fastPlainEncode, profile.secretKey, dataID, items)
}
plain, err := encodeBulkDedicatedBatchPlain(dataID, items)
if err != nil {
return nil, err
}
return c.encryptTransportPayload(plain)
}
func forkDedicatedLaneModernPSKRuntime(base *modernPSKCodecRuntime) (*modernPSKCodecRuntime, error) {
if base == nil {
return nil, nil
}
return base.fork()
}
func (c *ClientCommon) encodeDedicatedBulkBatchesPayloadPooledWithRuntime(runtime *modernPSKCodecRuntime, batches []bulkDedicatedOutboundBatch) ([]byte, func(), error) {
if c == nil {
return nil, nil, errBulkClientNil
}
if len(batches) == 0 {
return nil, nil, errBulkFastPayloadInvalid
}
if runtime != nil {
return runtime.sealFilledPayloadPooled(bulkDedicatedBatchesPlainLen(batches), func(dst []byte) error {
return writeBulkDedicatedBatchesPlain(dst, batches)
})
}
profile := c.clientTransportProtectionSnapshot()
if profile.fastPlainEncode != nil {
payload, err := encodeBulkDedicatedBatchesPayloadFast(profile.fastPlainEncode, profile.secretKey, batches)
return payload, nil, err
}
plain, err := encodeBulkDedicatedBatchesPlain(batches)
if err != nil {
return nil, nil, err
}
payload, err := c.encryptTransportPayload(plain)
return payload, nil, err
}
func (c *ClientCommon) encodeDedicatedBulkBatchesPayloadPooled(batches []bulkDedicatedOutboundBatch) ([]byte, func(), error) {
return c.encodeDedicatedBulkBatchesPayloadPooledWithRuntime(c.clientTransportProtectionSnapshot().runtime, batches)
}
func (c *ClientCommon) encodeDedicatedBulkBatchPayloadPooled(dataID uint64, items []bulkDedicatedSendRequest) ([]byte, func(), error) {
return c.encodeDedicatedBulkBatchesPayloadPooled([]bulkDedicatedOutboundBatch{{
DataID: dataID,
Items: items,
}})
}
func (s *ServerCommon) encodeDedicatedBulkBatchPayload(logical *LogicalConn, dataID uint64, items []bulkDedicatedSendRequest) ([]byte, error) {
if s == nil {
return nil, errBulkServerNil
}
if logical == nil {
return nil, errBulkLogicalConnNil
}
if runtime := logical.modernPSKRuntimeSnapshot(); runtime != nil {
return runtime.sealFilledPayload(bulkDedicatedBatchPlainLen(items), func(dst []byte) error {
return writeBulkDedicatedBatchPlain(dst, dataID, items)
})
}
if fastPlainEncode := logical.fastPlainEncodeSnapshot(); fastPlainEncode != nil {
return encodeBulkDedicatedBatchPayloadFast(fastPlainEncode, logical.secretKeySnapshot(), dataID, items)
}
plain, err := encodeBulkDedicatedBatchPlain(dataID, items)
if err != nil {
return nil, err
}
return s.encryptTransportPayloadLogical(logical, plain)
}
func (s *ServerCommon) encodeDedicatedBulkBatchesPayloadPooledWithRuntime(logical *LogicalConn, runtime *modernPSKCodecRuntime, batches []bulkDedicatedOutboundBatch) ([]byte, func(), error) {
if s == nil {
return nil, nil, errBulkServerNil
}
if logical == nil {
return nil, nil, errBulkLogicalConnNil
}
if len(batches) == 0 {
return nil, nil, errBulkFastPayloadInvalid
}
if runtime != nil {
return runtime.sealFilledPayloadPooled(bulkDedicatedBatchesPlainLen(batches), func(dst []byte) error {
return writeBulkDedicatedBatchesPlain(dst, batches)
})
}
if fastPlainEncode := logical.fastPlainEncodeSnapshot(); fastPlainEncode != nil {
payload, err := encodeBulkDedicatedBatchesPayloadFast(fastPlainEncode, logical.secretKeySnapshot(), batches)
return payload, nil, err
}
plain, err := encodeBulkDedicatedBatchesPlain(batches)
if err != nil {
return nil, nil, err
}
payload, err := s.encryptTransportPayloadLogical(logical, plain)
return payload, nil, err
}
func (s *ServerCommon) encodeDedicatedBulkBatchesPayloadPooled(logical *LogicalConn, batches []bulkDedicatedOutboundBatch) ([]byte, func(), error) {
return s.encodeDedicatedBulkBatchesPayloadPooledWithRuntime(logical, logical.modernPSKRuntimeSnapshot(), batches)
}
func (s *ServerCommon) encodeDedicatedBulkBatchPayloadPooled(logical *LogicalConn, dataID uint64, items []bulkDedicatedSendRequest) ([]byte, func(), error) {
return s.encodeDedicatedBulkBatchesPayloadPooled(logical, []bulkDedicatedOutboundBatch{{
DataID: dataID,
Items: items,
}})
}