notify/bulk.go

2451 lines
56 KiB
Go
Raw Permalink Normal View History

package notify
import (
"context"
"errors"
"io"
"net"
"strings"
"sync"
"sync/atomic"
"time"
)
// Bulk control-signal keys and default tuning values.
const (
// Signal keys, one per bulk control message kind (open, close, reset, ready,
// release). NOTE(review): presumably consumed by the signal dispatcher, which
// is outside this view.
BulkOpenSignalKey = "notify.bulk.open"
BulkCloseSignalKey = "notify.bulk.close"
BulkResetSignalKey = "notify.bulk.reset"
BulkReadySignalKey = "notify.bulk.ready"
BulkReleaseSignalKey = "notify.bulk.release"
// defaultBulkChunkSize (1 MiB) is the chunk size Write and
// releaseThresholdBytes fall back to when a handle has no positive chunkSize.
defaultBulkChunkSize = 1024 * 1024
// Inbound buffering limits copied into every handle by newBulkHandle:
// 256 queued chunks and 64 MiB of buffered bytes.
defaultBulkInboundQueueLimit = 256
defaultBulkInboundBytesLimit = 64 * 1024 * 1024
// Default open window (16 MiB) and in-flight count (32).
defaultBulkOpenWindowBytes = 16 * 1024 * 1024
defaultBulkOpenMaxInFlight = 32
// Control read/write timeouts default to 0.
// NOTE(review): presumably 0 means "no timeout" — confirm at the use site.
defaultBulkControlReadTimeout = 0
defaultBulkControlWriteTimeout = 0
// defaultBulkAcceptReadyTimeout is 10 seconds.
defaultBulkAcceptReadyTimeout = 10 * time.Second
)
// BulkMetadata is a string-to-string map of metadata attached to a bulk.
type BulkMetadata map[string]string
// BulkRange describes a span by its starting Offset and its Length.
// NOTE(review): units are presumably bytes — confirm against the producer.
type BulkRange struct {
Offset int64
Length int64
}
// BulkOpenMode selects which transport path (shared or dedicated) a bulk open
// should use.
type BulkOpenMode uint8
const (
// BulkOpenModeDefault keeps legacy behavior:
// Dedicated=true -> dedicated, otherwise shared.
BulkOpenModeDefault BulkOpenMode = iota
// BulkOpenModeAuto prefers dedicated and falls back to shared.
BulkOpenModeAuto
// BulkOpenModeShared forces shared transport path.
BulkOpenModeShared
// BulkOpenModeDedicated forces dedicated transport path.
BulkOpenModeDedicated
)
// BulkNetworkProfile names a preset describing the expected network
// conditions for bulk transfers.
type BulkNetworkProfile uint8
const (
// BulkNetworkProfileDefault keeps legacy defaults.
BulkNetworkProfileDefault BulkNetworkProfile = iota
// BulkNetworkProfileLAN is optimized for low-latency/local links.
BulkNetworkProfileLAN
// BulkNetworkProfileWAN is tuned for moderate RTT and occasional loss.
BulkNetworkProfileWAN
// BulkNetworkProfileConstrained is tuned for low bandwidth and unstable links.
BulkNetworkProfileConstrained
)
// BulkDedicatedAttachConfig holds the limits, retry policy and timeouts that
// apply when attaching dedicated bulk sidecar connections.
type BulkDedicatedAttachConfig struct {
// AttachLimit limits concurrent dedicated attach handshakes per client session.
// 0 means unlimited.
AttachLimit int
// ActiveLimit limits active logical dedicated bulks per client session.
// Physical sidecars remain bounded by LaneLimit.
// 0 means unlimited.
ActiveLimit int
// LaneLimit limits dedicated physical sidecar lanes per client session.
// 0 means unlimited.
LaneLimit int
// Retry controls extra retries after the first attach attempt.
Retry int
// Backoff is the base retry backoff.
Backoff time.Duration
// DialTimeout is used for dedicated sidecar dialing.
DialTimeout time.Duration
// HelloTimeout is used for dedicated attach request/response handshake.
HelloTimeout time.Duration
}
// BulkOpenTuning groups the three flow-tuning knobs that also appear on
// BulkOpenOptions: chunk size, window size and in-flight limit.
// NOTE(review): units are presumably bytes for ChunkSize/WindowBytes and
// chunks for MaxInFlight — confirm.
type BulkOpenTuning struct {
ChunkSize int
WindowBytes int
MaxInFlight int
}
// bulkDedicatedAttachState tracks where a handle is in the dedicated-connection
// attach lifecycle.
type bulkDedicatedAttachState uint8
const (
// bulkDedicatedAttachStatePending is the zero value.
bulkDedicatedAttachStatePending bulkDedicatedAttachState = iota
// bulkDedicatedAttachStateAttached is set by the attach/replace helpers once a
// dedicated connection has been installed.
bulkDedicatedAttachStateAttached
// bulkDedicatedAttachStateDegraded is set by markDedicatedAttachDegraded.
bulkDedicatedAttachStateDegraded
// bulkDedicatedAttachStateClosed is set by markDedicatedAttachClosed and is
// also what the snapshot helpers report for a nil handle.
bulkDedicatedAttachStateClosed
)
// BulkOpenOptions carries the caller-supplied parameters for opening a bulk.
type BulkOpenOptions struct {
// ID identifies the bulk.
ID string
// Range is the span this bulk covers.
Range BulkRange
// Metadata is attached to the bulk.
Metadata BulkMetadata
// ReadTimeout and WriteTimeout are per-operation timeouts.
// NOTE(review): presumably 0 disables the timeout — confirm.
ReadTimeout time.Duration
WriteTimeout time.Duration
// Mode selects the shared or dedicated transport path.
Mode BulkOpenMode
// Deprecated: Dedicated is kept for backward compatibility.
// Prefer Mode.
Dedicated bool
// ChunkSize, WindowBytes and MaxInFlight tune chunking and flow control.
ChunkSize int
WindowBytes int
MaxInFlight int
}
// BulkAcceptInfo describes an accepted bulk: its identity and range, the
// connections it is bound to, and the Bulk itself.
// NOTE(review): presumably handed to the accept handler — the handler
// signature is outside this view.
type BulkAcceptInfo struct {
ID string
Range BulkRange
Metadata BulkMetadata
// Dedicated reports whether the bulk uses the dedicated transport path.
Dedicated bool
LogicalConn *LogicalConn
TransportConn *TransportConn
TransportGeneration uint64
// Bulk is the stream used to read and write the bulk's data.
Bulk Bulk
}
// Bulk is a bidirectional byte stream with identity, range and metadata.
// In this file it is implemented by *bulkHandle.
type Bulk interface {
io.Reader
io.Writer
io.Closer
// ID returns the bulk identifier.
ID() string
// Range returns the span this bulk covers.
Range() BulkRange
// Metadata returns the metadata attached to the bulk.
Metadata() BulkMetadata
// Context returns the bulk's context.
Context() context.Context
// LogicalConn and TransportConn return the connections the bulk is bound to.
LogicalConn() *LogicalConn
TransportConn() *TransportConn
// TransportGeneration returns the transport generation recorded for the bulk.
TransportGeneration() uint64
// CloseWrite closes only the local write side.
CloseWrite() error
// Reset aborts the bulk with the given error.
Reset(error) error
// Snapshot returns a point-in-time view of the bulk.
Snapshot() BulkSnapshot
}
// Sentinel errors for the bulk subsystem. Compare with errors.Is.
var (
// Nil-dependency errors.
errBulkClientNil = errors.New("bulk client is nil")
errBulkServerNil = errors.New("bulk server is nil")
errBulkLogicalConnNil = errors.New("bulk logical connection is nil")
errBulkTransportNil = errors.New("bulk transport connection is nil")
errBulkRuntimeNil = errors.New("bulk runtime is nil")
// Identity and lifecycle errors.
errBulkIDEmpty = errors.New("bulk id is empty")
errBulkAlreadyExists = errors.New("bulk already exists")
errBulkNotFound = errors.New("bulk not found")
errBulkHandlerNotConfigured = errors.New("bulk handler is not configured")
errBulkRejected = errors.New("bulk open rejected")
errBulkReset = errors.New("bulk reset")
errBulkDataIDEmpty = errors.New("bulk data id is empty")
// errBulkDataPathNotReady is returned by Write when the handle has no data sender.
errBulkDataPathNotReady = errors.New("bulk data path is not implemented yet")
errBulkRangeInvalid = errors.New("bulk range is invalid")
// errBulkBackpressureExceeded is reported when an inbound chunk would exceed
// the handle's queued-chunk or buffered-byte limit.
errBulkBackpressureExceeded = errors.New("bulk inbound backpressure exceeded")
// Dedicated-path support errors.
errBulkDedicatedStreamOnly = errors.New("dedicated bulk requires stream transport")
errBulkDedicatedSingleConn = errors.New("dedicated bulk requires a dialable additional connection source; ConnectByConn only supports shared transport")
errBulkDedicatedActiveLimit = errors.New("dedicated bulk active limit reached")
)
// clientDedicatedBulkSupportError reports why client c cannot use a dedicated
// bulk, or nil when nothing visible rules it out.
//
// It returns errBulkClientNil for a nil client, errBulkDedicatedStreamOnly
// when the current transport connection is packet based or the connect source
// is UDP, and errBulkDedicatedSingleConn when the connect source cannot
// provide an additional connection.
func clientDedicatedBulkSupportError(c *ClientCommon) error {
	if c == nil {
		return errBulkClientNil
	}
	if conn := c.clientTransportConnSnapshot(); conn != nil && isPacketTransportConn(conn) {
		return errBulkDedicatedStreamOnly
	}
	// Snapshot the connect source once so both checks judge the same source,
	// even if the client swaps it concurrently.
	source := c.clientConnectSourceSnapshot()
	if source == nil {
		return nil
	}
	if source.isUDP() {
		return errBulkDedicatedStreamOnly
	}
	if !source.supportsAdditionalConn() {
		return errBulkDedicatedSingleConn
	}
	return nil
}
// logicalDedicatedBulkSupportError reports why a dedicated bulk cannot run
// over logical, or nil when it can. When the logical connection has a current
// transport the decision is delegated to transportDedicatedBulkSupportError;
// otherwise only the remote address network is inspected.
func logicalDedicatedBulkSupportError(logical *LogicalConn) error {
	if logical == nil {
		return errBulkLogicalConnNil
	}
	transport := logical.CurrentTransportConn()
	if transport != nil {
		return transportDedicatedBulkSupportError(transport)
	}
	addr := logical.RemoteAddr()
	if addr == nil || !isPacketNetwork(addr.Network()) {
		return nil
	}
	return errBulkDedicatedStreamOnly
}
// transportDedicatedBulkSupportError reports why a dedicated bulk cannot run
// over transport, or nil when it can. A nil transport yields
// errBulkTransportNil; a non-stream transport or a packet-network remote
// address yields errBulkDedicatedStreamOnly.
func transportDedicatedBulkSupportError(transport *TransportConn) error {
	switch {
	case transport == nil:
		return errBulkTransportNil
	case !transport.UsesStreamTransport():
		return errBulkDedicatedStreamOnly
	}
	addr := transport.RemoteAddr()
	if addr != nil && isPacketNetwork(addr.Network()) {
		return errBulkDedicatedStreamOnly
	}
	return nil
}
// Callback types a bulkHandle uses to talk to its peer. They are supplied to
// newBulkHandle and stored on the handle.

// bulkCloseSender sends a close for the handle; close passes its "full" flag
// as the bool argument.
type bulkCloseSender func(context.Context, *bulkHandle, bool) error
// bulkResetSender sends a reset for the handle; Reset passes the reset message
// as the string argument.
type bulkResetSender func(context.Context, *bulkHandle, string) error
// bulkDataSender sends one chunk of payload; Write calls it on the path used
// when no bulkWriteSender is configured.
type bulkDataSender func(context.Context, *bulkHandle, []byte) error
// bulkWriteSender is the batched write callback; a non-nil value switches
// Write to its batched path.
// NOTE(review): the uint64 is presumably the starting sequence number and the
// int result the bytes written — the call site is outside this view.
type bulkWriteSender func(context.Context, *bulkHandle, uint64, []byte, bool) (int, error)
// bulkReleaseSender reports consumed window back to the peer.
// NOTE(review): arguments are presumably (bytes, chunks), matching
// takePendingWindowRelease — confirm at the call site.
type bulkReleaseSender func(*bulkHandle, int64, int) error
// bulkAsyncWriteRequest is one unit of work queued by Write for the async
// write worker.
type bulkAsyncWriteRequest struct {
// startSeq is the first outbound sequence number reserved for this payload.
startSeq uint64
// payload is Write's private copy of the caller's bytes.
payload []byte
// chunks is the number of chunks the payload was counted as when the
// outbound window was acquired.
chunks int
}
// bulkAsyncWritePayloadPool is a pool for async write payload buffers.
// NOTE(review): presumably backs getBulkAsyncWritePayload and
// putBulkAsyncWritePayload, which are defined outside this view.
var bulkAsyncWritePayloadPool sync.Pool
// bulkReadChunk is one buffered piece of inbound data plus an optional
// callback that gives the backing storage back to its owner.
type bulkReadChunk struct {
	data    []byte
	release func()
}

// clear runs the release callback (if any) and zeroes the chunk. It is safe on
// a nil receiver and on an already-cleared chunk.
func (c *bulkReadChunk) clear() {
	if c == nil {
		return
	}
	if fn := c.release; fn != nil {
		fn()
	}
	c.data, c.release = nil, nil
}
// bulkReadPayloadOwner reference-counts a shared payload and invokes release
// exactly when the count drops to zero.
type bulkReadPayloadOwner struct {
	refs    atomic.Int32
	release func()
}

// newBulkReadPayloadOwner returns an owner holding one reference (the
// creator's), or nil when there is nothing to release.
func newBulkReadPayloadOwner(release func()) *bulkReadPayloadOwner {
	if release == nil {
		return nil
	}
	o := new(bulkReadPayloadOwner)
	o.release = release
	o.refs.Store(1)
	return o
}

// retainChunk takes one more reference and returns the function that drops
// it. A nil owner yields a nil function.
func (o *bulkReadPayloadOwner) retainChunk() func() {
	if o != nil {
		o.refs.Add(1)
		return o.releaseChunk
	}
	return nil
}

// releaseChunk drops one reference; the final drop runs the release callback.
func (o *bulkReadPayloadOwner) releaseChunk() {
	if o == nil {
		return
	}
	remaining := o.refs.Add(-1)
	if remaining != 0 || o.release == nil {
		return
	}
	o.release()
}

// done drops the creator's reference. releaseChunk already tolerates a nil
// receiver, so no extra guard is needed here.
func (o *bulkReadPayloadOwner) done() {
	o.releaseChunk()
}
// bulkHandle is the concrete bulk stream. It holds the bulk's identity, the
// connections it is bound to, the callbacks used to reach the peer, and all
// mutable read/write/flow-control state.
//
// Locking, as used by the methods in this file: mu guards stream state,
// writeMu serializes Write and close, dedicatedMu guards the dedicated* fields
// and acceptMu guards the accept* fields.
type bulkHandle struct {
// Identity and binding; populated by newBulkHandle.
runtime *bulkRuntime
runtimeScope string
id string
dataID uint64
fastPathVersion uint8
// outboundSeq is the last reserved outbound sequence number (guarded by mu).
outboundSeq uint64
rangeSpec BulkRange
metadata BulkMetadata
sessionEpoch uint64
// client is set separately through setClientSnapshotOwner.
client *ClientCommon
logical *LogicalConn
transport *TransportConn
transportGeneration uint64
readTimeout time.Duration
writeTimeout time.Duration
// Dedicated-path configuration copied from the open request.
dedicated bool
dedicatedLaneID uint32
dedicatedAttachToken string
// Tuning and inbound limits.
chunkSize int
windowBytes int
maxInFlight int
inboundQueueLimit int
inboundBytesLimit int
// Peer-facing callbacks.
closeFn bulkCloseSender
resetFn bulkResetSender
sendDataFn bulkDataSender
sendWriteFn bulkWriteSender
releaseFn bulkReleaseSender
// ctx is the handle's lifetime context; writeCtx is derived from it and is
// additionally cancelled when writeStateDone closes.
ctx context.Context
cancel context.CancelFunc
writeCtx context.Context
writeCtxCancel context.CancelFunc
createdAt time.Time
// writeMu serializes Write and close; mu guards the stream state below.
writeMu sync.Mutex
mu sync.Mutex
// Async write worker plumbing; only created when sendWriteFn is non-nil.
writeQueue chan bulkAsyncWriteRequest
writeWorkerDone chan struct{}
// writeDrain starts out as an already-closed channel.
writeDrain chan struct{}
pendingAsyncWrites int
// Close/reset state.
localClosed bool
localReadClosed bool
remoteClosed bool
peerReadClosed bool
resetErr error
// Inbound buffering: readQueue holds pending chunks, readBuf the chunk
// currently being consumed, bufferedBytes the total unread byte count.
readQueue []bulkReadChunk
readBuf bulkReadChunk
bufferedBytes int
readNotify chan struct{}
flowNotify chan struct{}
writeStateDone chan struct{}
writeStateClosed bool
// Window-release worker plumbing; only created when flow control is enabled.
releaseNotify chan struct{}
releaseWorkerDone chan struct{}
pendingReleaseBytes int64
pendingReleaseChunks int
// Outbound window accounting; outboundAvailBytes starts at the request's WindowBytes.
outboundAvailBytes int64
outboundInFlight int
// Traffic statistics.
bytesRead int64
bytesWritten int64
readCalls int64
writeCalls int64
lastReadAt time.Time
lastWriteAt time.Time
// Dedicated connection state, guarded by dedicatedMu.
dedicatedMu sync.Mutex
dedicatedConn net.Conn
// dedicatedConnOwned is true when the conn was installed by the non-shared
// attach/replace helpers; only an owned conn is half-closed by the handle or
// handed back by the replace helpers.
dedicatedConnOwned bool
dedicatedSender *bulkDedicatedSender
// dedicatedReady is closed once a dedicated conn has been attached.
dedicatedReady chan struct{}
dedicatedWriteClosed bool
dedicatedActiveLease bool
dedicatedState bulkDedicatedAttachState
dedicatedAttempts uint32
dedicatedLastCode string
dedicatedDataStarted bool
// Accept handshake state, guarded by acceptMu.
acceptMu sync.Mutex
acceptDispatched bool
// acceptReady is closed by markAcceptReady; acceptReadyErr carries its result.
acceptReady chan struct{}
acceptReadyDone bool
acceptReadyErr error
// acceptNotifyFn is invoked at most once (acceptNotifySent records that).
acceptNotifyFn func(error)
acceptNotifySent bool
}
// newBulkHandle builds a handle for req and starts its background workers.
//
// A nil parent is replaced by context.Background(). When transportGeneration
// is zero it is taken from transport, then from logical. The request is
// normalized before its fields are copied onto the handle.
//
// Two optional goroutine groups are started:
//   - when sendWriteFn is non-nil: the async write loop, plus a watcher that
//     cancels the write context once writeStateDone closes;
//   - when flow control is enabled: the window-release loop.
func newBulkHandle(parent context.Context, runtime *bulkRuntime, runtimeScope string, req BulkOpenRequest, sessionEpoch uint64, logical *LogicalConn, transport *TransportConn, transportGeneration uint64, closeFn bulkCloseSender, resetFn bulkResetSender, sendDataFn bulkDataSender, sendWriteFn bulkWriteSender, releaseFn bulkReleaseSender) *bulkHandle {
	if parent == nil {
		parent = context.Background()
	}
	ctx, cancel := context.WithCancel(parent)

	if transportGeneration == 0 && transport != nil {
		transportGeneration = transport.TransportGeneration()
	}
	if transportGeneration == 0 && logical != nil {
		transportGeneration = logical.transportGenerationSnapshot()
	}

	req = normalizeBulkOpenRequest(req)

	// The drain channel starts closed.
	closedDrain := make(chan struct{})
	close(closedDrain)

	handle := &bulkHandle{
		runtime:              runtime,
		runtimeScope:         runtimeScope,
		id:                   req.BulkID,
		dataID:               req.DataID,
		fastPathVersion:      normalizeBulkFastPathVersion(req.FastPathVersion),
		rangeSpec:            req.Range,
		metadata:             cloneBulkMetadata(req.Metadata),
		sessionEpoch:         sessionEpoch,
		logical:              logical,
		transport:            transport,
		transportGeneration:  transportGeneration,
		readTimeout:          req.ReadTimeout,
		writeTimeout:         req.WriteTimeout,
		dedicated:            req.Dedicated,
		dedicatedLaneID:      req.DedicatedLaneID,
		dedicatedAttachToken: req.AttachToken,
		chunkSize:            req.ChunkSize,
		windowBytes:          req.WindowBytes,
		maxInFlight:          req.MaxInFlight,
		inboundQueueLimit:    defaultBulkInboundQueueLimit,
		inboundBytesLimit:    defaultBulkInboundBytesLimit,
		closeFn:              closeFn,
		resetFn:              resetFn,
		sendDataFn:           sendDataFn,
		sendWriteFn:          sendWriteFn,
		releaseFn:            releaseFn,
		ctx:                  ctx,
		cancel:               cancel,
		createdAt:            time.Now(),
		readNotify:           make(chan struct{}, 1),
		flowNotify:           make(chan struct{}, 1),
		writeStateDone:       make(chan struct{}),
		dedicatedReady:       make(chan struct{}),
		dedicatedState:       initialBulkDedicatedAttachState(req.Dedicated),
		acceptReady:          make(chan struct{}),
		outboundAvailBytes:   int64(req.WindowBytes),
		writeDrain:           closedDrain,
	}

	if sendWriteFn != nil {
		handle.writeCtx, handle.writeCtxCancel = context.WithCancel(ctx)
		handle.writeQueue = make(chan bulkAsyncWriteRequest, bulkAsyncWriteQueueSize(req.MaxInFlight))
		handle.writeWorkerDone = make(chan struct{})

		// Capture the channels and cancel func now so the watcher does not
		// read handle fields later.
		parentDone := ctx.Done()
		writeDone := handle.writeStateDone
		stopWrites := handle.writeCtxCancel
		go func() {
			select {
			case <-parentDone:
				// writeCtx is derived from ctx, so it is already cancelled.
			case <-writeDone:
				stopWrites()
			}
		}()
		go handle.runAsyncWriteLoop()
	}

	if handle.flowControlEnabled() {
		handle.releaseNotify = make(chan struct{}, 1)
		handle.releaseWorkerDone = make(chan struct{})
		go handle.runWindowReleaseLoop()
	}
	return handle
}
// ID returns the bulk identifier, or "" for a nil handle.
func (b *bulkHandle) ID() string {
	if b != nil {
		return b.id
	}
	return ""
}
// fastPathVersionSnapshot reads the fast-path version under mu and returns it
// normalized. A nil handle reports bulkFastPathVersionV1.
func (b *bulkHandle) fastPathVersionSnapshot() uint8 {
	if b == nil {
		return bulkFastPathVersionV1
	}
	b.mu.Lock()
	version := b.fastPathVersion
	b.mu.Unlock()
	return normalizeBulkFastPathVersion(version)
}
// FastPathVersion is the exported form of fastPathVersionSnapshot.
func (b *bulkHandle) FastPathVersion() uint8 {
	version := b.fastPathVersionSnapshot()
	return version
}
// Range returns the bulk's range, or the zero BulkRange for a nil handle.
func (b *bulkHandle) Range() BulkRange {
	if b != nil {
		return b.rangeSpec
	}
	return BulkRange{}
}
// Metadata returns a clone of the bulk's metadata, so callers cannot mutate
// the handle's own map. A nil handle returns nil.
func (b *bulkHandle) Metadata() BulkMetadata {
	if b != nil {
		return cloneBulkMetadata(b.metadata)
	}
	return nil
}
// Context returns the handle's context, falling back to context.Background()
// when the handle or its context is nil.
func (b *bulkHandle) Context() context.Context {
	if b != nil && b.ctx != nil {
		return b.ctx
	}
	return context.Background()
}
// LogicalConn returns the logical connection the bulk is bound to, or nil for
// a nil handle.
func (b *bulkHandle) LogicalConn() *LogicalConn {
	if b != nil {
		return b.logical
	}
	return nil
}
// TransportConn returns the transport connection the bulk is bound to, or nil
// for a nil handle.
func (b *bulkHandle) TransportConn() *TransportConn {
	if b != nil {
		return b.transport
	}
	return nil
}
// TransportGeneration returns the transport generation recorded when the
// handle was created, or 0 for a nil handle.
func (b *bulkHandle) TransportGeneration() uint64 {
	if b != nil {
		return b.transportGeneration
	}
	return 0
}
// Dedicated reports whether the bulk was opened on the dedicated path.
// A nil handle is never dedicated.
func (b *bulkHandle) Dedicated() bool {
	return b != nil && b.dedicated
}
// dedicatedLaneIDSnapshot reads the dedicated lane ID under dedicatedMu.
// A nil handle reports 0.
func (b *bulkHandle) dedicatedLaneIDSnapshot() uint32 {
	if b == nil {
		return 0
	}
	b.dedicatedMu.Lock()
	laneID := b.dedicatedLaneID
	b.dedicatedMu.Unlock()
	return laneID
}
// dedicatedAttachTokenSnapshot reads the attach token under mu.
// A nil handle reports "".
func (b *bulkHandle) dedicatedAttachTokenSnapshot() string {
	if b == nil {
		return ""
	}
	b.mu.Lock()
	token := b.dedicatedAttachToken
	b.mu.Unlock()
	return token
}
// setDedicatedAttachToken stores token under mu. No-op on a nil handle.
func (b *bulkHandle) setDedicatedAttachToken(token string) {
	if b == nil {
		return
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	b.dedicatedAttachToken = token
}
// markDedicatedAttachAttempt bumps the attach attempt counter under
// dedicatedMu. No-op on a nil handle.
func (b *bulkHandle) markDedicatedAttachAttempt() {
	if b == nil {
		return
	}
	b.dedicatedMu.Lock()
	defer b.dedicatedMu.Unlock()
	b.dedicatedAttempts++
}
// setDedicatedAttachLastCode records the most recent attach result code under
// dedicatedMu. No-op on a nil handle.
func (b *bulkHandle) setDedicatedAttachLastCode(code string) {
	if b == nil {
		return
	}
	b.dedicatedMu.Lock()
	defer b.dedicatedMu.Unlock()
	b.dedicatedLastCode = code
}
// markDedicatedAttachDegraded moves the attach state to Degraded and records
// code as the last result code. No-op on a nil handle.
func (b *bulkHandle) markDedicatedAttachDegraded(code string) {
	if b == nil {
		return
	}
	b.dedicatedMu.Lock()
	defer b.dedicatedMu.Unlock()
	b.dedicatedState = bulkDedicatedAttachStateDegraded
	b.dedicatedLastCode = code
}
// markDedicatedAttachClosed moves the attach state to Closed.
// No-op on a nil handle.
func (b *bulkHandle) markDedicatedAttachClosed() {
	if b == nil {
		return
	}
	b.dedicatedMu.Lock()
	defer b.dedicatedMu.Unlock()
	b.dedicatedState = bulkDedicatedAttachStateClosed
}
// dedicatedAttachStateSnapshot reads the attach state under dedicatedMu.
// A nil handle reports Closed.
func (b *bulkHandle) dedicatedAttachStateSnapshot() bulkDedicatedAttachState {
	if b == nil {
		return bulkDedicatedAttachStateClosed
	}
	b.dedicatedMu.Lock()
	state := b.dedicatedState
	b.dedicatedMu.Unlock()
	return state
}
// dedicatedAttachDiagnosticsSnapshot returns the attach state, attempt count,
// last result code and data-started flag, all read in one dedicatedMu critical
// section. A nil handle reports (Closed, 0, "", false).
func (b *bulkHandle) dedicatedAttachDiagnosticsSnapshot() (state bulkDedicatedAttachState, attempts uint32, lastCode string, dataStarted bool) {
	if b == nil {
		state = bulkDedicatedAttachStateClosed
		return state, attempts, lastCode, dataStarted
	}
	b.dedicatedMu.Lock()
	state = b.dedicatedState
	attempts = b.dedicatedAttempts
	lastCode = b.dedicatedLastCode
	dataStarted = b.dedicatedDataStarted
	b.dedicatedMu.Unlock()
	return state, attempts, lastCode, dataStarted
}
// dedicatedConnSnapshot reads the attached dedicated connection under
// dedicatedMu. It is nil when nothing is attached or the handle is nil.
func (b *bulkHandle) dedicatedConnSnapshot() net.Conn {
	if b == nil {
		return nil
	}
	b.dedicatedMu.Lock()
	conn := b.dedicatedConn
	b.dedicatedMu.Unlock()
	return conn
}
// dedicatedSenderSnapshot reads the installed dedicated sender under
// dedicatedMu. It is nil when none is installed or the handle is nil.
func (b *bulkHandle) dedicatedSenderSnapshot() *bulkDedicatedSender {
	if b == nil {
		return nil
	}
	b.dedicatedMu.Lock()
	sender := b.dedicatedSender
	b.dedicatedMu.Unlock()
	return sender
}
// installDedicatedSender installs sender unless one is already present and
// returns whichever sender ends up installed (first writer wins). It returns
// nil when the handle or sender is nil.
func (b *bulkHandle) installDedicatedSender(sender *bulkDedicatedSender) *bulkDedicatedSender {
	if b == nil || sender == nil {
		return nil
	}
	b.dedicatedMu.Lock()
	defer b.dedicatedMu.Unlock()
	if b.dedicatedSender == nil {
		b.dedicatedSender = sender
	}
	return b.dedicatedSender
}
// clearDedicatedSender removes the installed dedicated sender and returns it.
// The result is nil if none was installed or the handle is nil.
func (b *bulkHandle) clearDedicatedSender() *bulkDedicatedSender {
	if b == nil {
		return nil
	}
	b.dedicatedMu.Lock()
	previous := b.dedicatedSender
	b.dedicatedSender = nil
	b.dedicatedMu.Unlock()
	return previous
}
// dedicatedAttachedSnapshot reports whether a dedicated connection is
// currently attached.
func (b *bulkHandle) dedicatedAttachedSnapshot() bool {
	conn := b.dedicatedConnSnapshot()
	return conn != nil
}
// dedicatedDataStartedSnapshot reads the data-started flag under dedicatedMu.
// A nil handle reports false.
func (b *bulkHandle) dedicatedDataStartedSnapshot() bool {
	if b == nil {
		return false
	}
	b.dedicatedMu.Lock()
	started := b.dedicatedDataStarted
	b.dedicatedMu.Unlock()
	return started
}
// markDedicatedDataStarted sets the data-started flag under dedicatedMu.
// No-op on a nil handle.
func (b *bulkHandle) markDedicatedDataStarted() {
	if b == nil {
		return
	}
	b.dedicatedMu.Lock()
	defer b.dedicatedMu.Unlock()
	b.dedicatedDataStarted = true
}
// waitDedicatedReady blocks until a dedicated connection has been attached.
//
// It returns nil immediately for a nil or non-dedicated handle, or when a
// connection is already attached. A pending write-state error is returned
// before waiting. While waiting it returns ctx.Err() if ctx ends first; if the
// handle's own context ends first it returns the write-state error, or
// context.Canceled when there is none. A nil ctx means context.Background().
func (b *bulkHandle) waitDedicatedReady(ctx context.Context) error {
	if b == nil {
		return nil
	}
	if !b.Dedicated() || b.dedicatedAttachedSnapshot() {
		return nil
	}
	if ctx == nil {
		ctx = context.Background()
	}
	if err := b.writeStateErrorSnapshot(); err != nil {
		return err
	}
	select {
	case <-b.dedicatedReady:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	case <-b.Context().Done():
	}
	// The handle itself ended; prefer the recorded write-state error.
	if err := b.writeStateErrorSnapshot(); err != nil {
		return err
	}
	return context.Canceled
}
// attachDedicatedConn installs conn as the handle's owned dedicated connection
// and wakes anything blocked on dedicatedReady.
//
// It returns io.ErrClosedPipe for a nil handle, net.ErrClosed for a nil conn,
// and an error if a dedicated connection is already attached.
func (b *bulkHandle) attachDedicatedConn(conn net.Conn) error {
	if b == nil {
		return io.ErrClosedPipe
	}
	if conn == nil {
		return net.ErrClosed
	}
	b.dedicatedMu.Lock()
	defer b.dedicatedMu.Unlock()
	if b.dedicatedConn != nil {
		return errors.New("bulk dedicated conn already attached")
	}
	b.dedicatedConn = conn
	b.dedicatedConnOwned = true
	b.dedicatedWriteClosed = false
	b.dedicatedState = bulkDedicatedAttachStateAttached
	b.dedicatedLastCode = ""
	// Close the ready channel while still holding dedicatedMu: the
	// "already closed?" probe and the close must be one atomic step, or two
	// concurrent attach/replace calls could both close it and panic.
	if ready := b.dedicatedReady; ready != nil {
		select {
		case <-ready:
		default:
			close(ready)
		}
	}
	return nil
}
// attachDedicatedConnShared installs conn as a dedicated connection the handle
// does not own, and wakes anything blocked on dedicatedReady.
//
// Re-attaching the connection that is already installed succeeds and only
// downgrades it to "not owned". Attaching a different connection while one is
// installed is an error. It returns io.ErrClosedPipe for a nil handle and
// net.ErrClosed for a nil conn.
func (b *bulkHandle) attachDedicatedConnShared(conn net.Conn) error {
	if b == nil {
		return io.ErrClosedPipe
	}
	if conn == nil {
		return net.ErrClosed
	}
	b.dedicatedMu.Lock()
	defer b.dedicatedMu.Unlock()
	if b.dedicatedConn != nil {
		if b.dedicatedConn != conn {
			return errors.New("bulk dedicated conn already attached")
		}
		b.dedicatedConnOwned = false
		b.dedicatedState = bulkDedicatedAttachStateAttached
		b.dedicatedLastCode = ""
		return nil
	}
	b.dedicatedConn = conn
	b.dedicatedConnOwned = false
	b.dedicatedWriteClosed = false
	b.dedicatedState = bulkDedicatedAttachStateAttached
	b.dedicatedLastCode = ""
	// Close the ready channel while still holding dedicatedMu: the
	// "already closed?" probe and the close must be one atomic step, or two
	// concurrent attach/replace calls could both close it and panic.
	if ready := b.dedicatedReady; ready != nil {
		select {
		case <-ready:
		default:
			close(ready)
		}
	}
	return nil
}
// replaceDedicatedConn unconditionally installs conn as the handle's owned
// dedicated connection, drops the installed sender, and wakes anything blocked
// on dedicatedReady.
//
// It returns the previous connection (only if the handle owned it, otherwise
// nil) and the previous sender, leaving their cleanup to the caller. It
// returns io.ErrClosedPipe for a nil handle and net.ErrClosed for a nil conn.
func (b *bulkHandle) replaceDedicatedConn(conn net.Conn) (net.Conn, *bulkDedicatedSender, error) {
	if b == nil {
		return nil, nil, io.ErrClosedPipe
	}
	if conn == nil {
		return nil, nil, net.ErrClosed
	}
	b.dedicatedMu.Lock()
	defer b.dedicatedMu.Unlock()
	oldConn := b.dedicatedConn
	oldOwned := b.dedicatedConnOwned
	oldSender := b.dedicatedSender
	b.dedicatedConn = conn
	b.dedicatedConnOwned = true
	b.dedicatedSender = nil
	b.dedicatedWriteClosed = false
	b.dedicatedState = bulkDedicatedAttachStateAttached
	b.dedicatedLastCode = ""
	// Close the ready channel while still holding dedicatedMu: replace has no
	// "already attached" guard, so without the lock two concurrent calls could
	// both see the channel open and double-close it.
	if ready := b.dedicatedReady; ready != nil {
		select {
		case <-ready:
		default:
			close(ready)
		}
	}
	// Only hand back a connection the handle owned.
	if !oldOwned {
		oldConn = nil
	}
	return oldConn, oldSender, nil
}
// replaceDedicatedConnShared unconditionally installs conn as a dedicated
// connection the handle does not own, drops the installed sender, and wakes
// anything blocked on dedicatedReady.
//
// It returns the previous connection (only if the handle owned it, otherwise
// nil) and the previous sender, leaving their cleanup to the caller. It
// returns io.ErrClosedPipe for a nil handle and net.ErrClosed for a nil conn.
func (b *bulkHandle) replaceDedicatedConnShared(conn net.Conn) (net.Conn, *bulkDedicatedSender, error) {
	if b == nil {
		return nil, nil, io.ErrClosedPipe
	}
	if conn == nil {
		return nil, nil, net.ErrClosed
	}
	b.dedicatedMu.Lock()
	defer b.dedicatedMu.Unlock()
	oldConn := b.dedicatedConn
	oldOwned := b.dedicatedConnOwned
	oldSender := b.dedicatedSender
	b.dedicatedConn = conn
	b.dedicatedConnOwned = false
	b.dedicatedSender = nil
	b.dedicatedWriteClosed = false
	b.dedicatedState = bulkDedicatedAttachStateAttached
	b.dedicatedLastCode = ""
	// Close the ready channel while still holding dedicatedMu: replace has no
	// "already attached" guard, so without the lock two concurrent calls could
	// both see the channel open and double-close it.
	if ready := b.dedicatedReady; ready != nil {
		select {
		case <-ready:
		default:
			close(ready)
		}
	}
	// Only hand back a connection the handle owned.
	if !oldOwned {
		oldConn = nil
	}
	return oldConn, oldSender, nil
}
// bestEffortCloseDedicatedWriteHalf half-closes the write side of the owned
// dedicated connection when the connection supports CloseWrite.
//
// Nothing happens for a nil or non-dedicated handle, when no connection is
// attached, when the connection is not owned, or when the write half was
// already closed. A CloseWrite failure is ignored. On success the closed flag
// is recorded only if the same connection is still attached.
func (b *bulkHandle) bestEffortCloseDedicatedWriteHalf() {
	if b == nil || !b.dedicated {
		return
	}
	b.dedicatedMu.Lock()
	conn, owned, alreadyClosed := b.dedicatedConn, b.dedicatedConnOwned, b.dedicatedWriteClosed
	b.dedicatedMu.Unlock()
	if conn == nil || alreadyClosed || !owned {
		return
	}
	halfCloser, ok := conn.(interface{ CloseWrite() error })
	if !ok {
		return
	}
	// CloseWrite runs outside the lock.
	if err := halfCloser.CloseWrite(); err != nil {
		return
	}
	b.dedicatedMu.Lock()
	defer b.dedicatedMu.Unlock()
	if b.dedicatedConn == conn {
		b.dedicatedWriteClosed = true
	}
}
// dedicatedWriteHalfClosedSnapshot reads the write-half-closed flag under
// dedicatedMu. A nil handle reports false.
func (b *bulkHandle) dedicatedWriteHalfClosedSnapshot() bool {
	if b == nil {
		return false
	}
	b.dedicatedMu.Lock()
	closed := b.dedicatedWriteClosed
	b.dedicatedMu.Unlock()
	return closed
}
// setClientSnapshotOwner records client on the handle. The store takes no
// lock. No-op on a nil handle.
func (b *bulkHandle) setClientSnapshotOwner(client *ClientCommon) {
	if b != nil {
		b.client = client
	}
}
// clearDedicatedConn detaches the dedicated connection and returns it with
// its ownership flag; the owned and write-closed flags are reset. A nil
// handle returns (nil, false).
func (b *bulkHandle) clearDedicatedConn() (net.Conn, bool) {
	if b == nil {
		return nil, false
	}
	b.dedicatedMu.Lock()
	defer b.dedicatedMu.Unlock()
	conn, owned := b.dedicatedConn, b.dedicatedConnOwned
	b.dedicatedConn = nil
	b.dedicatedConnOwned = false
	b.dedicatedWriteClosed = false
	return conn, owned
}
// markDedicatedActiveReserved records that the handle holds a dedicated
// active lease. No-op on a nil handle.
func (b *bulkHandle) markDedicatedActiveReserved() {
	if b == nil {
		return
	}
	b.dedicatedMu.Lock()
	defer b.dedicatedMu.Unlock()
	b.dedicatedActiveLease = true
}
// releaseDedicatedActiveReserved clears the dedicated active lease flag and
// reports whether it was set, so the lease is given up at most once. A nil
// handle reports false.
func (b *bulkHandle) releaseDedicatedActiveReserved() bool {
	if b == nil {
		return false
	}
	b.dedicatedMu.Lock()
	held := b.dedicatedActiveLease
	b.dedicatedActiveLease = false
	b.dedicatedMu.Unlock()
	return held
}
// markAcceptDispatched sets the accept-dispatched flag and reports whether
// this call was the one that set it, so only the first caller dispatches.
// A nil handle reports false.
func (b *bulkHandle) markAcceptDispatched() bool {
	if b == nil {
		return false
	}
	b.acceptMu.Lock()
	first := !b.acceptDispatched
	b.acceptDispatched = true
	b.acceptMu.Unlock()
	return first
}
// markAcceptHandled sets the accept-dispatched flag unconditionally.
// No-op on a nil handle.
func (b *bulkHandle) markAcceptHandled() {
	if b == nil {
		return
	}
	b.acceptMu.Lock()
	defer b.acceptMu.Unlock()
	b.acceptDispatched = true
}
// setAcceptNotify stores the accept notification callback.
// No-op on a nil handle.
func (b *bulkHandle) setAcceptNotify(fn func(error)) {
	if b == nil {
		return
	}
	b.acceptMu.Lock()
	defer b.acceptMu.Unlock()
	b.acceptNotifyFn = fn
}
// clearAcceptNotify drops the accept notification callback without calling
// it. No-op on a nil handle.
func (b *bulkHandle) clearAcceptNotify() {
	if b == nil {
		return
	}
	b.acceptMu.Lock()
	defer b.acceptMu.Unlock()
	b.acceptNotifyFn = nil
}
// markAcceptReady records the accept outcome once and closes acceptReady to
// wake waiters. Later calls are ignored, so the first outcome wins.
// No-op on a nil handle.
func (b *bulkHandle) markAcceptReady(err error) {
	if b == nil {
		return
	}
	var ch chan struct{}
	b.acceptMu.Lock()
	alreadyDone := b.acceptReadyDone
	if !alreadyDone {
		b.acceptReadyDone = true
		b.acceptReadyErr = err
		ch = b.acceptReady
	}
	b.acceptMu.Unlock()
	if alreadyDone || ch == nil {
		return
	}
	select {
	case <-ch:
	default:
		close(ch)
	}
}
// waitAcceptReady blocks until the accept outcome has been recorded and
// returns it.
//
// If ctx ends first it returns ctx.Err(). If the handle's own context ends
// first it returns, in order of preference, the recorded accept error, the
// reset error, or context.Canceled. A nil handle returns io.ErrClosedPipe; a
// nil ctx means context.Background().
func (b *bulkHandle) waitAcceptReady(ctx context.Context) error {
	if b == nil {
		return io.ErrClosedPipe
	}
	if ctx == nil {
		ctx = context.Background()
	}
	b.acceptMu.Lock()
	done, doneErr, ready := b.acceptReadyDone, b.acceptReadyErr, b.acceptReady
	b.acceptMu.Unlock()
	if done {
		return doneErr
	}
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-ready:
		return b.acceptReadyErrorSnapshot()
	case <-b.Context().Done():
	}
	// The handle ended before the accept outcome was observed.
	if err := b.acceptReadyErrorSnapshot(); err != nil {
		return err
	}
	if err := b.resetErrSnapshot(); err != nil {
		return err
	}
	return context.Canceled
}
// acceptReadyErrorSnapshot reads the recorded accept outcome under acceptMu.
// A nil handle reports io.ErrClosedPipe.
func (b *bulkHandle) acceptReadyErrorSnapshot() error {
	if b == nil {
		return io.ErrClosedPipe
	}
	b.acceptMu.Lock()
	err := b.acceptReadyErr
	b.acceptMu.Unlock()
	return err
}
// notifyAcceptStarted fires the accept callback with a nil error, at most
// once, on its own goroutine.
//
// If a notification was already sent nothing changes. If none was sent but no
// callback is registered, the sent flag stays unset so a later
// finishAcceptDispatch can still mark it. No-op on a nil handle.
func (b *bulkHandle) notifyAcceptStarted() {
	if b == nil {
		return
	}
	var notify func(error)
	b.acceptMu.Lock()
	if !b.acceptNotifySent {
		notify = b.acceptNotifyFn
		b.acceptNotifyFn = nil
		if notify != nil {
			b.acceptNotifySent = true
		}
	}
	b.acceptMu.Unlock()
	if notify != nil {
		go notify(nil)
	}
}
// finishAcceptDispatch marks the accept notification as sent and, if a
// callback is still registered, invokes it with err on its own goroutine.
// It does nothing when a notification was already sent or the handle is nil.
func (b *bulkHandle) finishAcceptDispatch(err error) {
	if b == nil {
		return
	}
	var notify func(error)
	b.acceptMu.Lock()
	alreadySent := b.acceptNotifySent
	if !alreadySent {
		notify = b.acceptNotifyFn
		b.acceptNotifyFn = nil
		b.acceptNotifySent = true
	}
	b.acceptMu.Unlock()
	if alreadySent || notify == nil {
		return
	}
	go notify(err)
}
// SessionEpoch returns the session epoch recorded at creation, or 0 for a nil
// handle.
func (b *bulkHandle) SessionEpoch() uint64 {
	if b != nil {
		return b.sessionEpoch
	}
	return 0
}
// acceptsClientSessionEpoch reports whether epoch is compatible with the
// handle's session epoch. A zero on either side acts as a wildcard; a nil
// handle accepts nothing.
func (b *bulkHandle) acceptsClientSessionEpoch(epoch uint64) bool {
	if b == nil {
		return false
	}
	return b.sessionEpoch == 0 || epoch == 0 || b.sessionEpoch == epoch
}
// acceptsTransportGeneration reports whether transport belongs to the
// generation the handle was created on. A zero recorded generation or a nil
// transport matches anything; a nil handle accepts nothing.
func (b *bulkHandle) acceptsTransportGeneration(transport *TransportConn) bool {
	switch {
	case b == nil:
		return false
	case b.transportGeneration == 0, transport == nil:
		return true
	default:
		return b.transportGeneration == transport.TransportGeneration()
	}
}
// dataIDSnapshot returns the handle's data ID, or 0 for a nil handle.
func (b *bulkHandle) dataIDSnapshot() uint64 {
	if b != nil {
		return b.dataID
	}
	return 0
}
// nextOutboundDataSeq reserves a single outbound sequence number and returns
// it.
func (b *bulkHandle) nextOutboundDataSeq() uint64 {
	const one = 1
	return b.reserveOutboundDataSeqs(one)
}
// reserveOutboundDataSeqs reserves count consecutive outbound sequence
// numbers and returns the first. Numbering starts at 1, so the 0 returned for
// a nil handle or non-positive count is never a reserved sequence number.
func (b *bulkHandle) reserveOutboundDataSeqs(count int) uint64 {
	if b == nil || count <= 0 {
		return 0
	}
	b.mu.Lock()
	first := b.outboundSeq + 1
	b.outboundSeq += uint64(count)
	b.mu.Unlock()
	return first
}
// Read copies buffered inbound data into p, blocking until data is available
// or the bulk ends.
//
// It returns (0, nil) for an empty p and io.ErrClosedPipe for a nil handle.
// Buffered data is always delivered first. Once the buffer is empty the
// terminal conditions are checked in this order: local read side closed
// (io.ErrClosedPipe), reset (the reset error), remote closed (io.EOF).
// Otherwise it waits via waitReadable and returns that error if the wait fails.
func (b *bulkHandle) Read(p []byte) (int, error) {
if len(p) == 0 {
return 0, nil
}
if b == nil {
return 0, io.ErrClosedPipe
}
// The first Read/Write/Close/Reset triggers the accept notification.
b.notifyAcceptStarted()
for {
b.mu.Lock()
localReadClosed := b.localReadClosed
// Fast path: serve from the chunk currently being consumed.
if len(b.readBuf.data) > 0 {
n := copy(p, b.readBuf.data)
b.readBuf.data = b.readBuf.data[n:]
b.bufferedBytes -= n
if b.bufferedBytes < 0 {
b.bufferedBytes = 0
}
// A fully consumed chunk releases its backing storage right away.
if len(b.readBuf.data) == 0 {
b.readBuf.clear()
}
b.recordReadLocked(n, time.Now())
b.mu.Unlock()
// Account the consumed bytes towards a window release.
b.maybeSendWindowRelease(n, false)
return n, nil
}
// Promote the next queued chunk and loop to serve it.
if len(b.readQueue) > 0 {
b.readBuf = b.readQueue[0]
// Zero the slot so the queue no longer references the chunk.
b.readQueue[0] = bulkReadChunk{}
b.readQueue = b.readQueue[1:]
b.mu.Unlock()
continue
}
// Nothing buffered: snapshot state under the lock, then decide outside it.
resetErr := b.resetErr
remoteClosed := b.remoteClosed
notify := b.readNotify
ctx := b.ctx
readTimeout := b.readTimeout
b.mu.Unlock()
// Every terminal exit forces a window release so pending credit is flushed.
if localReadClosed {
b.maybeSendWindowRelease(0, true)
return 0, io.ErrClosedPipe
}
if resetErr != nil {
b.maybeSendWindowRelease(0, true)
return 0, resetErr
}
if remoteClosed {
b.maybeSendWindowRelease(0, true)
return 0, io.EOF
}
if err := b.waitReadable(ctx, notify, readTimeout); err != nil {
b.maybeSendWindowRelease(0, true)
return 0, err
}
}
}
// Write sends p to the peer and returns how many bytes were accepted.
//
// It returns (0, nil) for an empty p and io.ErrClosedPipe for a nil handle,
// after a local close, or once the peer has closed its read side. A reset
// handle returns the reset error; a handle without a data sender returns
// errBulkDataPathNotReady. Writes are serialized by writeMu.
//
// Two send paths exist:
//   - batched (sendWriteFn set): p is cut into parts no larger than the window
//     and chunkSize*maxInFlight; each part acquires outbound window and a run
//     of sequence numbers. Dedicated handles send the part synchronously via
//     executeSendWrite; others copy it into a pooled buffer and enqueue it for
//     the async write worker.
//   - chunked (no sendWriteFn): p is sent chunk by chunk through sendDataFn and
//     the bytes written are recorded here.
//
// On error the count of bytes accepted so far is returned with the error.
func (b *bulkHandle) Write(p []byte) (int, error) {
if len(p) == 0 {
return 0, nil
}
if b == nil {
return 0, io.ErrClosedPipe
}
b.notifyAcceptStarted()
b.writeMu.Lock()
defer b.writeMu.Unlock()
// Snapshot everything needed under mu so the send loops run without it.
b.mu.Lock()
resetErr := b.resetErr
localClosed := b.localClosed
peerReadClosed := b.peerReadClosed
sendDataFn := b.sendDataFn
sendWriteFn := b.sendWriteFn
chunkSize := b.chunkSize
writeTimeout := b.writeTimeout
bulkCtx := b.ctx
b.mu.Unlock()
if resetErr != nil {
return 0, resetErr
}
if localClosed || peerReadClosed {
return 0, io.ErrClosedPipe
}
if sendDataFn == nil {
return 0, errBulkDataPathNotReady
}
// Batched path.
if sendWriteFn != nil {
written := 0
for written < len(p) {
// Cap the part at the window size, then at chunkSize*maxInFlight.
end := len(p)
if b.windowBytes > 0 && end-written > b.windowBytes {
end = written + b.windowBytes
}
if chunkSize > 0 && b.maxInFlight > 0 {
maxPartBytes := chunkSize * b.maxInFlight
if maxPartBytes > 0 && end-written > maxPartBytes {
end = written + maxPartBytes
}
}
part := p[written:end]
partChunks := bulkPayloadChunkCount(len(part), chunkSize)
sendCtx, cancel, err := b.newWriteContext(bulkCtx, writeTimeout)
if err != nil {
return written, err
}
if err := b.acquireOutboundWindow(sendCtx, len(part), partChunks); err != nil {
cancel()
return written, b.normalizeWriteError(err)
}
startSeq := b.reserveOutboundDataSeqs(partChunks)
// Dedicated handles send synchronously on the caller's goroutine.
if b.dedicated {
partWritten, err := b.executeSendWrite(sendCtx, startSeq, part, partChunks, false)
cancel()
written += partWritten
if err != nil {
return written, err
}
continue
}
// The worker outlives this call, so it gets its own copy of the bytes.
owned := getBulkAsyncWritePayload(len(part))
copy(owned, part)
err = b.enqueueAsyncWrite(sendCtx, bulkAsyncWriteRequest{
startSeq: startSeq,
payload: owned,
chunks: partChunks,
})
cancel()
if err != nil {
// Enqueue failed: return the buffer and the window that was acquired.
putBulkAsyncWritePayload(owned)
b.rollbackOutboundWindow(len(part), partChunks)
return written, b.normalizeWriteError(err)
}
written += len(part)
}
return written, nil
}
// Chunked path.
if chunkSize <= 0 {
chunkSize = defaultBulkChunkSize
}
written := 0
for written < len(p) {
end := written + chunkSize
if end > len(p) {
end = len(p)
}
chunk := p[written:end]
// Each chunk gets a fresh write context, so the timeout applies per chunk.
sendCtx, cancel, err := b.newWriteContext(bulkCtx, writeTimeout)
if err != nil {
if written > 0 {
b.recordWrite(written, time.Now())
}
return written, err
}
if err := b.acquireOutboundWindow(sendCtx, len(chunk), 1); err != nil {
cancel()
if written > 0 {
b.recordWrite(written, time.Now())
}
return written, b.normalizeWriteError(err)
}
err = sendDataFn(sendCtx, b, chunk)
cancel()
if err != nil {
// The chunk never left: give its window back.
b.rollbackOutboundWindow(len(chunk), 1)
if written > 0 {
b.recordWrite(written, time.Now())
}
return written, b.normalizeWriteError(err)
}
written = end
}
if written > 0 {
b.recordWrite(written, time.Now())
}
return written, nil
}
// Close closes both the write and the read side of the bulk.
func (b *bulkHandle) Close() error {
	const full = true
	return b.close(full)
}
// CloseWrite closes only the write side; reading stays possible.
func (b *bulkHandle) CloseWrite() error {
	const full = false
	return b.close(full)
}
// close implements Close (full=true) and CloseWrite (full=false).
//
// A reset handle returns its reset error. Otherwise there are two cases:
//   - the write side is already closed: a repeated CloseWrite, or a Close
//     after the read side is also closed, is a no-op. A Close after a prior
//     CloseWrite sends a full close to the peer and then closes the read side.
//   - first close: pending async writes are awaited, the close is sent to the
//     peer, and local state is updated (write side always, read side when full).
//
// A send error is returned unless it is errBulkNotFound or
// canIgnoreDedicatedCloseSendError accepts it. A nil handle returns nil.
func (b *bulkHandle) close(full bool) error {
if b == nil {
return nil
}
b.notifyAcceptStarted()
// writeMu keeps close from interleaving with Write or another close.
b.writeMu.Lock()
defer b.writeMu.Unlock()
b.mu.Lock()
if b.resetErr != nil {
err := b.resetErr
b.mu.Unlock()
return err
}
// Case 1: the write side was already closed by an earlier call.
if b.localClosed {
if !full || b.localReadClosed {
b.mu.Unlock()
return nil
}
closeFn := b.closeFn
b.mu.Unlock()
// Skip the close message when the dedicated write half is already closed.
if closeFn != nil && !b.dedicatedWriteHalfClosedSnapshot() {
if err := closeFn(context.Background(), b, true); err != nil && !errors.Is(err, errBulkNotFound) && !b.canIgnoreDedicatedCloseSendError(err) {
return err
}
}
b.bestEffortCloseDedicatedWriteHalf()
b.mu.Lock()
// Re-check: mu was released while sending.
if b.localReadClosed {
b.mu.Unlock()
return nil
}
b.localReadClosed = true
b.clearBufferedDataLocked()
b.closeWriteStateLocked()
shouldFinalize := b.shouldFinalizeLocked()
b.mu.Unlock()
// Wake any blocked Read so it observes the closed read side.
b.notifyReadable()
if shouldFinalize {
b.finalize()
}
return nil
}
// Case 2: first close of the write side.
closeFn := b.closeFn
b.mu.Unlock()
// Wait for queued async writes before telling the peer the stream is closed.
if err := b.waitPendingAsyncWrites(context.Background()); err != nil {
return err
}
if closeFn != nil {
if err := closeFn(context.Background(), b, full); err != nil && !errors.Is(err, errBulkNotFound) && !b.canIgnoreDedicatedCloseSendError(err) {
return err
}
}
b.bestEffortCloseDedicatedWriteHalf()
b.mu.Lock()
// Re-check: mu was released while waiting and sending.
if b.localClosed {
b.mu.Unlock()
return nil
}
b.localClosed = true
b.closeWriteStateLocked()
if full {
b.localReadClosed = true
b.clearBufferedDataLocked()
}
shouldFinalize := b.shouldFinalizeLocked()
b.mu.Unlock()
if full {
b.notifyReadable()
}
if shouldFinalize {
b.finalize()
}
return nil
}
// Reset aborts the bulk with err.
//
// If the handle is already reset, the existing reset error is returned.
// Otherwise the reset is sent to the peer first; if that send fails its error
// is returned and the handle is not marked reset locally. On success the
// handle is marked reset and nil is returned. A nil handle returns nil.
func (b *bulkHandle) Reset(err error) error {
	if b == nil {
		return nil
	}
	b.notifyAcceptStarted()
	resetErr := bulkResetError(err)

	b.mu.Lock()
	prior, send := b.resetErr, b.resetFn
	b.mu.Unlock()
	if prior != nil {
		return prior
	}

	if send != nil {
		sendErr := send(context.Background(), b, bulkResetMessage(resetErr))
		if sendErr != nil {
			return sendErr
		}
	}
	b.markReset(resetErr)
	return nil
}
// Snapshot is the exported form of snapshot.
func (b *bulkHandle) Snapshot() BulkSnapshot {
	snap := b.snapshot()
	return snap
}
// markRemoteClosed records that the peer closed its write side, wakes any
// blocked Read, and finalizes the handle if shouldFinalizeLocked says so.
// No-op on a nil handle.
func (b *bulkHandle) markRemoteClosed() {
	if b == nil {
		return
	}
	b.mu.Lock()
	b.remoteClosed = true
	finalizeNow := b.shouldFinalizeLocked()
	b.mu.Unlock()

	b.notifyReadable()
	if !finalizeNow {
		return
	}
	b.finalize()
}
// markPeerClosed records that the peer closed both directions: local reads
// will end and local writes are no longer accepted. It closes the write
// state, signals flow and read waiters, and finalizes the handle if
// shouldFinalizeLocked says so. No-op on a nil handle.
func (b *bulkHandle) markPeerClosed() {
	if b == nil {
		return
	}
	b.mu.Lock()
	b.remoteClosed = true
	b.peerReadClosed = true
	b.closeWriteStateLocked()
	finalizeNow := b.shouldFinalizeLocked()
	b.notifyFlowLocked()
	b.mu.Unlock()

	b.notifyReadable()
	if !finalizeNow {
		return
	}
	b.finalize()
}
// markReset puts the handle into the reset state locally.
//
// The first reset error wins: only then is buffered data dropped and the
// write state closed. Every call signals flow waiters, records the accept
// outcome, wakes readers and finalizes the handle. No-op on a nil handle.
func (b *bulkHandle) markReset(err error) {
	if b == nil {
		return
	}
	resetErr := bulkResetError(err)

	b.mu.Lock()
	if firstReset := b.resetErr == nil; firstReset {
		b.resetErr = resetErr
		b.clearBufferedDataLocked()
		b.closeWriteStateLocked()
	}
	b.notifyFlowLocked()
	b.mu.Unlock()

	b.markAcceptReady(resetErr)
	b.notifyReadable()
	b.finalize()
}
// pushChunk queues a copy of chunk for reading; the caller keeps ownership of
// the slice. Exceeding an inbound limit resets the bulk.
func (b *bulkHandle) pushChunk(chunk []byte) error {
	const (
		owned           = false
		resetOnOverflow = true
	)
	return b.pushChunkWithOwnershipOptionsAndRelease(chunk, owned, resetOnOverflow, nil)
}
// pushOwnedChunk queues chunk for reading without copying; ownership of the
// slice passes to the handle. Exceeding an inbound limit resets the bulk.
func (b *bulkHandle) pushOwnedChunk(chunk []byte) error {
	const (
		owned           = true
		resetOnOverflow = true
	)
	return b.pushChunkWithOwnershipOptionsAndRelease(chunk, owned, resetOnOverflow, nil)
}
// pushOwnedChunkNoReset queues chunk for reading without copying. Exceeding an
// inbound limit returns errBulkBackpressureExceeded but does not reset the bulk.
func (b *bulkHandle) pushOwnedChunkNoReset(chunk []byte) error {
	const (
		owned           = true
		resetOnOverflow = false
	)
	return b.pushChunkWithOwnershipOptionsAndRelease(chunk, owned, resetOnOverflow, nil)
}
// pushOwnedChunkWithReleaseNoReset queues chunk for reading without copying
// and attaches release, which runs when the chunk is consumed or discarded.
// Exceeding an inbound limit returns errBulkBackpressureExceeded but does not
// reset the bulk.
func (b *bulkHandle) pushOwnedChunkWithReleaseNoReset(chunk []byte, release func()) error {
	const (
		owned           = true
		resetOnOverflow = false
	)
	return b.pushChunkWithOwnershipOptionsAndRelease(chunk, owned, resetOnOverflow, release)
}
// pushChunkWithOwnership queues chunk for reading, copying it unless owned is
// true. Exceeding an inbound limit resets the bulk.
func (b *bulkHandle) pushChunkWithOwnership(chunk []byte, owned bool) error {
	const resetOnOverflow = true
	return b.pushChunkWithOwnershipOptionsAndRelease(chunk, owned, resetOnOverflow, nil)
}
// pushChunkWithOwnershipOptions queues chunk for reading with no release
// callback; see pushChunkWithOwnershipOptionsAndRelease for the semantics of
// owned and resetOnOverflow.
func (b *bulkHandle) pushChunkWithOwnershipOptions(chunk []byte, owned bool, resetOnOverflow bool) error {
	var noRelease func()
	return b.pushChunkWithOwnershipOptionsAndRelease(chunk, owned, resetOnOverflow, noRelease)
}
// pushChunkWithOwnershipOptionsAndRelease appends an inbound chunk to the read
// queue and signals readers.
//
// Parameters:
//   - chunk: the payload; an empty chunk is accepted and dropped.
//   - owned: when false the bytes are copied and release runs immediately;
//     when true the slice itself is queued and release runs once the chunk is
//     consumed or discarded.
//   - resetOnOverflow: what to do when the queued-chunk or buffered-byte limit
//     would be exceeded. True resets the bulk with
//     errBulkBackpressureExceeded; false only rejects this chunk.
//   - release: optional callback tied to the chunk's storage; it is always
//     invoked exactly once on every path.
//
// It returns io.ErrClosedPipe for a nil handle, the reset error for a reset
// handle, and the backpressure error on overflow.
func (b *bulkHandle) pushChunkWithOwnershipOptionsAndRelease(chunk []byte, owned bool, resetOnOverflow bool, release func()) error {
	if b == nil {
		return io.ErrClosedPipe
	}
	if len(chunk) == 0 {
		if release != nil {
			release()
		}
		return nil
	}

	stored := bulkReadChunk{data: chunk, release: release}
	if !owned {
		// Detach from the caller's buffer; its storage can go back right away.
		stored.data = append([]byte(nil), chunk...)
		if stored.release != nil {
			stored.release()
			stored.release = nil
		}
	}

	b.mu.Lock()
	if resetErr := b.resetErr; resetErr != nil {
		b.mu.Unlock()
		stored.clear()
		return resetErr
	}

	// The chunk-count limit is checked before the byte limit.
	overflow := (b.inboundQueueLimit > 0 && b.bufferedChunkCountLocked() >= b.inboundQueueLimit) ||
		(b.inboundBytesLimit > 0 && b.bufferedBytes+len(stored.data) > b.inboundBytesLimit)
	if !overflow {
		b.readQueue = append(b.readQueue, stored)
		b.bufferedBytes += len(stored.data)
		b.notifyReadableLocked()
		b.mu.Unlock()
		return nil
	}

	if !resetOnOverflow {
		b.mu.Unlock()
		stored.clear()
		return errBulkBackpressureExceeded
	}
	err := b.markResetLocked(errBulkBackpressureExceeded)
	b.mu.Unlock()
	stored.clear()
	b.notifyReadable()
	b.finalize()
	return err
}
// markResetLocked records the first reset error, drops buffered inbound data and
// closes the write state. Later calls leave the stored error untouched.
// It returns the stored reset error (io.ErrClosedPipe for a nil handle).
// It does not lock b.mu itself; the Locked suffix indicates the caller is
// expected to hold it.
func (b *bulkHandle) markResetLocked(err error) error {
	if b == nil {
		return io.ErrClosedPipe
	}
	if b.resetErr != nil {
		return b.resetErr
	}
	b.resetErr = bulkResetError(err)
	b.clearBufferedDataLocked()
	b.closeWriteStateLocked()
	return b.resetErr
}
// clearBufferedDataLocked clears the partially-read buffer and every queued
// chunk, then zeroes the buffered-bytes counter. It does not lock b.mu itself.
func (b *bulkHandle) clearBufferedDataLocked() {
	if b == nil {
		return
	}
	b.readBuf.clear()
	b.readBuf = bulkReadChunk{}
	for idx := 0; idx < len(b.readQueue); idx++ {
		b.readQueue[idx].clear()
	}
	b.readQueue = nil
	b.bufferedBytes = 0
}
// flowControlEnabled reports whether a release sender is configured together
// with a byte window and/or an in-flight chunk limit.
func (b *bulkHandle) flowControlEnabled() bool {
	if b == nil || b.releaseFn == nil {
		return false
	}
	return b.windowBytes > 0 || b.maxInFlight > 0
}
// releaseThresholdBytes returns the number of consumed bytes that must
// accumulate before a window release is scheduled: the chunk size (or
// defaultBulkChunkSize when unset), capped by the window size when one is set.
func (b *bulkHandle) releaseThresholdBytes() int64 {
	if b == nil {
		return int64(defaultBulkChunkSize)
	}
	limit := b.chunkSize
	if limit <= 0 {
		limit = defaultBulkChunkSize
	}
	// limit is positive here and a window cap is only applied when it is
	// positive too, so the result is always > 0.
	if window := b.windowBytes; window > 0 && window < limit {
		limit = window
	}
	return int64(limit)
}
// maybeSendWindowRelease accounts consumed bytes (and one chunk) as pending
// release credit and schedules a window release once the pending bytes reach
// the release threshold, or unconditionally when force is true.
// It is a no-op when flow control is disabled.
func (b *bulkHandle) maybeSendWindowRelease(consumed int, force bool) {
	if b == nil || !b.flowControlEnabled() {
		return
	}
	b.mu.Lock()
	if consumed > 0 {
		b.pendingReleaseChunks++
		b.pendingReleaseBytes += int64(consumed)
	}
	ready := force || b.pendingReleaseBytes >= b.releaseThresholdBytes()
	b.mu.Unlock()

	// Signal the worker only after the lock has been dropped.
	if ready {
		b.scheduleWindowRelease()
	}
}
// scheduleWindowRelease performs a non-blocking send on releaseNotify; if the
// send cannot proceed immediately the signal is dropped.
func (b *bulkHandle) scheduleWindowRelease() {
	if b == nil {
		return
	}
	wake := b.releaseNotify
	if wake == nil {
		return
	}
	select {
	case wake <- struct{}{}:
	default:
	}
}
// takePendingWindowRelease atomically reads and zeroes the pending release
// counters, returning them together with the configured release sender.
// A nil handle yields (0, 0, nil).
func (b *bulkHandle) takePendingWindowRelease() (int64, int, bulkReleaseSender) {
	if b == nil {
		return 0, 0, nil
	}
	b.mu.Lock()
	pendingBytes, pendingChunks := b.pendingReleaseBytes, b.pendingReleaseChunks
	b.pendingReleaseBytes, b.pendingReleaseChunks = 0, 0
	sender := b.releaseFn
	b.mu.Unlock()
	return pendingBytes, pendingChunks, sender
}
// runWindowReleaseLoop is the release worker: each time releaseNotify fires it
// drains the pending release counters through the release sender until none
// remain. It exits when the handle context is done and closes
// releaseWorkerDone on return.
func (b *bulkHandle) runWindowReleaseLoop() {
	if b == nil {
		return
	}
	defer close(b.releaseWorkerDone)
	for {
		select {
		case <-b.Context().Done():
			return
		case <-b.releaseNotify:
		}
		// Keep flushing: more credit may accumulate while a release is sent.
		for {
			pendingBytes, pendingChunks, send := b.takePendingWindowRelease()
			if send == nil {
				break
			}
			if pendingBytes <= 0 && pendingChunks <= 0 {
				break
			}
			// The sender's error is discarded, as in the original flow.
			_ = send(b, pendingBytes, pendingChunks)
		}
	}
}
// acquireOutboundWindow reserves outbound flow-control budget for a write of
// size bytes counted as chunks chunks, blocking until the budget is available.
//
// It returns nil immediately when b is nil, size is not positive, or flow
// control is disabled. A nil ctx is replaced with context.Background() and a
// non-positive chunks is treated as 1. While waiting it returns the stored
// reset error, io.ErrClosedPipe when the local side or the peer's read side is
// closed, or the normalized ctx error once ctx is done.
func (b *bulkHandle) acquireOutboundWindow(ctx context.Context, size int, chunks int) error {
if b == nil || size <= 0 || !b.flowControlEnabled() {
return nil
}
if ctx == nil {
ctx = context.Background()
}
need := int64(size)
if chunks <= 0 {
chunks = 1
}
for {
b.mu.Lock()
// Terminal states are re-checked on every pass of the wait loop.
if b.resetErr != nil {
err := b.resetErr
b.mu.Unlock()
return err
}
if b.localClosed || b.peerReadClosed {
b.mu.Unlock()
return io.ErrClosedPipe
}
bytesOK := true
if b.windowBytes > 0 {
bytesOK = b.outboundAvailBytes >= need
// A request larger than the whole window could never fit, so it is
// admitted once nothing is in flight.
// NOTE(review): outboundInFlight is only incremented below when
// maxInFlight > 0 — confirm this bypass is intended when maxInFlight is 0.
if !bytesOK && need > int64(b.windowBytes) && b.outboundInFlight == 0 {
bytesOK = true
}
}
chunksOK := true
if b.maxInFlight > 0 {
chunksOK = b.outboundInFlight+chunks <= b.maxInFlight
}
if bytesOK && chunksOK {
// Reserve while still holding the lock.
if b.windowBytes > 0 {
b.outboundAvailBytes -= need
}
if b.maxInFlight > 0 {
b.outboundInFlight += chunks
}
b.mu.Unlock()
return nil
}
// Capture the notify channel under the lock, then wait without holding it.
notify := b.flowNotify
b.mu.Unlock()
select {
case <-notify:
case <-ctx.Done():
// Prefer the handle's own write-state error over the context error.
if stateErr := b.writeStateErrorSnapshot(); stateErr != nil {
return stateErr
}
return normalizeStreamDeadlineError(ctx.Err())
}
}
}
// rollbackOutboundWindow returns a previously acquired reservation of size
// bytes and chunks chunks (treated as 1 when not positive) to the outbound
// window, clamping the byte budget to windowBytes and the in-flight count to
// zero, then signals flow waiters. It is a no-op when size is not positive or
// flow control is disabled.
func (b *bulkHandle) rollbackOutboundWindow(size int, chunks int) {
	if b == nil || size <= 0 || !b.flowControlEnabled() {
		return
	}
	if chunks <= 0 {
		chunks = 1
	}
	b.mu.Lock()
	defer b.mu.Unlock()

	if b.windowBytes > 0 {
		restored := b.outboundAvailBytes + int64(size)
		if ceiling := int64(b.windowBytes); restored > ceiling {
			restored = ceiling
		}
		b.outboundAvailBytes = restored
	}
	if b.maxInFlight > 0 && b.outboundInFlight > 0 {
		remaining := b.outboundInFlight - chunks
		if remaining < 0 {
			remaining = 0
		}
		b.outboundInFlight = remaining
	}
	b.notifyFlowLocked()
}
// releaseOutboundWindow credits bytes and chunks back to the outbound window,
// clamping the byte budget to windowBytes and the in-flight count to zero, then
// signals flow waiters. It is a no-op when flow control is disabled.
func (b *bulkHandle) releaseOutboundWindow(bytes int64, chunks int) {
	if b == nil || !b.flowControlEnabled() {
		return
	}
	b.mu.Lock()
	defer b.mu.Unlock()

	if bytes > 0 && b.windowBytes > 0 {
		credited := b.outboundAvailBytes + bytes
		if ceiling := int64(b.windowBytes); credited > ceiling {
			credited = ceiling
		}
		b.outboundAvailBytes = credited
	}
	if chunks > 0 && b.maxInFlight > 0 {
		remaining := b.outboundInFlight - chunks
		if remaining < 0 {
			remaining = 0
		}
		b.outboundInFlight = remaining
	}
	b.notifyFlowLocked()
}
// bufferedChunkCountLocked returns the number of queued chunks, counting the
// partially-read buffer as one more when it still holds data.
func (b *bulkHandle) bufferedChunkCountLocked() int {
	if b == nil {
		return 0
	}
	if len(b.readBuf.data) > 0 {
		return len(b.readQueue) + 1
	}
	return len(b.readQueue)
}
// shouldFinalizeLocked reports whether the handle has reached a terminal state:
// nil or reset handles always qualify; dedicated bulks need both sides closed;
// shared bulks qualify when the local read side is closed, or when the remote
// side is closed and either the peer's read side or the local side is closed.
func (b *bulkHandle) shouldFinalizeLocked() bool {
	switch {
	case b == nil:
		return true
	case b.resetErr != nil:
		return true
	case b.dedicated:
		return b.localClosed && b.remoteClosed
	case b.localReadClosed:
		return true
	case b.remoteClosed:
		return b.peerReadClosed || b.localClosed
	default:
		return false
	}
}
// snapshot returns a point-in-time BulkSnapshot of the handle: identity, close
// flags, buffering and flow-control settings, I/O counters, dedicated-attach
// diagnostics and binding/transport diagnostics. A nil handle yields the zero
// BulkSnapshot.
func (b *bulkHandle) snapshot() BulkSnapshot {
	if b == nil {
		return BulkSnapshot{}
	}
	// Dedicated-attach state is read before b.mu is acquired.
	attached := b.dedicatedAttachedSnapshot()
	attachState, attachAttempts, attachLastCode, dataStarted := b.dedicatedAttachDiagnosticsSnapshot()

	b.mu.Lock()
	defer b.mu.Unlock()

	var out BulkSnapshot

	// Identity and request parameters.
	out.ID = b.id
	out.DataID = b.dataID
	out.FastPathVersion = normalizeBulkFastPathVersion(b.fastPathVersion)
	out.Scope = normalizeFileScope(b.runtimeScope)
	out.Range = b.rangeSpec
	out.Metadata = cloneBulkMetadata(b.metadata)

	// Dedicated lane state.
	out.Dedicated = b.dedicated
	out.DedicatedLaneID = b.dedicatedLaneID
	out.DedicatedAttached = attached
	out.DedicatedAttachState = bulkDedicatedAttachStateName(attachState)
	out.DedicatedAttachAttempts = attachAttempts
	out.DedicatedAttachLastCode = attachLastCode
	out.DedicatedDataStarted = dataStarted

	// Session and close flags.
	out.SessionEpoch = b.sessionEpoch
	out.TransportGeneration = b.transportGeneration
	out.LocalClosed = b.localClosed
	out.LocalReadClosed = b.localReadClosed
	out.RemoteClosed = b.remoteClosed
	out.PeerReadClosed = b.peerReadClosed

	// Buffering, timeouts and flow-control settings.
	out.BufferedChunks = b.bufferedChunkCountLocked()
	out.BufferedBytes = b.bufferedBytes
	out.ReadTimeout = b.readTimeout
	out.WriteTimeout = b.writeTimeout
	out.ChunkSize = b.chunkSize
	out.WindowBytes = b.windowBytes
	out.MaxInFlight = b.maxInFlight

	// I/O counters and timestamps.
	out.BytesRead = b.bytesRead
	out.BytesWritten = b.bytesWritten
	out.ReadCalls = b.readCalls
	out.WriteCalls = b.writeCalls
	out.OpenedAt = b.createdAt
	out.LastReadAt = b.lastReadAt
	out.LastWriteAt = b.lastWriteAt

	if b.logical != nil {
		out.LogicalClientID = b.logical.ID()
	}
	if b.resetErr != nil {
		out.ResetError = b.resetErr.Error()
	}

	// Binding diagnostics come from the logical/transport pair when either is
	// set, otherwise from the client; with neither they stay zero.
	var diag snapshotBindingDiagnostics
	if b.logical != nil || b.transport != nil {
		diag = snapshotBindingDiagnosticsFromLogical(b.logical, b.transport, b.transportGeneration)
	} else if b.client != nil {
		diag = snapshotBindingDiagnosticsFromClient(b.client, b.sessionEpoch)
	}
	out.BindingOwner = diag.BindingOwner
	out.BindingAlive = diag.BindingAlive
	out.BindingCurrent = diag.BindingCurrent
	out.BindingReason = diag.BindingReason
	out.BindingError = diag.BindingError
	out.BindingBulkAdaptiveSoftPayloadBytes = diag.BindingBulkAdaptiveSoftPayloadBytes
	out.TransportAttached = diag.TransportAttached
	out.TransportHasRuntimeConn = diag.TransportHasRuntimeConn
	out.TransportCurrent = diag.TransportCurrent
	out.TransportDetachReason = diag.TransportDetachReason
	out.TransportDetachKind = diag.TransportDetachKind
	out.TransportDetachGeneration = diag.TransportDetachGeneration
	out.TransportDetachError = diag.TransportDetachError
	out.TransportDetachedAt = diag.TransportDetachedAt
	out.ReattachEligible = diag.ReattachEligible
	return out
}
// finalize tears the handle down: it marks the dedicated attach closed, forces a
// final window release, cancels the handle and write contexts, stops the
// dedicated sender, returns the client's dedicated slot and lane, closes an
// owned dedicated connection and removes the handle from its runtime.
func (b *bulkHandle) finalize() {
	if b == nil {
		return
	}
	b.markDedicatedAttachClosed()
	b.maybeSendWindowRelease(0, true)

	if b.cancel != nil {
		b.cancel()
	}
	if b.writeCtxCancel != nil {
		b.writeCtxCancel()
	}

	sender := b.clearDedicatedSender()
	if sender != nil {
		sender.stop()
	}

	if b.client != nil {
		// The active slot is only returned when this handle had reserved one.
		if b.releaseDedicatedActiveReserved() {
			b.client.releaseBulkDedicatedActiveSlot()
		}
	}
	if b.client != nil {
		b.client.releaseBulkDedicatedLane(b.dedicatedLaneIDSnapshot())
	}

	conn, owned := b.clearDedicatedConn()
	if conn != nil && owned {
		// The close error is discarded, as in the original flow.
		_ = conn.Close()
	}

	if b.runtime != nil {
		b.runtime.remove(b.runtimeScope, b.id)
	}
}
// recordReadLocked adds a read of n bytes at time now to the read statistics.
// Non-positive n is ignored. It does not lock b.mu itself.
func (b *bulkHandle) recordReadLocked(n int, now time.Time) {
	if b == nil {
		return
	}
	if n <= 0 {
		return
	}
	b.readCalls++
	b.lastReadAt = now
	b.bytesRead += int64(n)
}
// recordWrite adds a write of n bytes at time now to the write statistics under
// b.mu. Non-positive n is ignored.
func (b *bulkHandle) recordWrite(n int, now time.Time) {
	if b == nil {
		return
	}
	if n <= 0 {
		return
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	b.writeCalls++
	b.lastWriteAt = now
	b.bytesWritten += int64(n)
}
// waitReadable blocks until notify fires, ctx is done, or the read timeout
// elapses.
//
// It returns nil when notify fires. When ctx is done it returns, in order of
// precedence: the stored reset error, io.ErrClosedPipe if the local read side
// is closed, nil if the remote side is closed, and otherwise the normalized ctx
// error. When the effective deadline derived from timeout has passed (already,
// or while waiting) it returns the normalized context.DeadlineExceeded.
//
// The ctx error is normalized whether or not a timeout is configured, so both
// paths report context expiry the same way as the other wait helpers on this
// handle.
func (b *bulkHandle) waitReadable(ctx context.Context, notify <-chan struct{}, timeout time.Duration) error {
	if ctx == nil {
		ctx = context.Background()
	}

	// A nil channel never becomes ready, so with no deadline the timer case
	// below is simply inert and one select serves both modes.
	var expired <-chan time.Time
	if deadline := streamEffectiveDeadline(time.Now(), timeout, time.Time{}); !deadline.IsZero() {
		remaining := time.Until(deadline)
		if remaining <= 0 {
			return normalizeStreamDeadlineError(context.DeadlineExceeded)
		}
		timer := time.NewTimer(remaining)
		defer timer.Stop()
		expired = timer.C
	}

	select {
	case <-notify:
		return nil
	case <-ctx.Done():
		// Handle state takes precedence over the bare context error.
		if resetErr := b.resetErrSnapshot(); resetErr != nil {
			return resetErr
		}
		if b.localReadClosedSnapshot() {
			return io.ErrClosedPipe
		}
		if b.remoteClosedSnapshot() {
			return nil
		}
		return normalizeStreamDeadlineError(ctx.Err())
	case <-expired:
		return normalizeStreamDeadlineError(context.DeadlineExceeded)
	}
}
// resetErrSnapshot returns the stored reset error, read under b.mu.
// A nil handle reports io.ErrClosedPipe.
func (b *bulkHandle) resetErrSnapshot() error {
	if b == nil {
		return io.ErrClosedPipe
	}
	b.mu.Lock()
	resetErr := b.resetErr
	b.mu.Unlock()
	return resetErr
}
// remoteClosedSnapshot returns the remoteClosed flag, read under b.mu.
// A nil handle reports true.
func (b *bulkHandle) remoteClosedSnapshot() bool {
	if b == nil {
		return true
	}
	b.mu.Lock()
	closed := b.remoteClosed
	b.mu.Unlock()
	return closed
}
// localClosedSnapshot returns the localClosed flag, read under b.mu.
// A nil handle reports true.
func (b *bulkHandle) localClosedSnapshot() bool {
	if b == nil {
		return true
	}
	b.mu.Lock()
	closed := b.localClosed
	b.mu.Unlock()
	return closed
}
// localReadClosedSnapshot returns the localReadClosed flag, read under b.mu.
// A nil handle reports true.
func (b *bulkHandle) localReadClosedSnapshot() bool {
	if b == nil {
		return true
	}
	b.mu.Lock()
	closed := b.localReadClosed
	b.mu.Unlock()
	return closed
}
// writeStateErrorSnapshot returns the error that currently prevents writing:
// the stored reset error if any, io.ErrClosedPipe when the local side or the
// peer's read side is closed (or the handle is nil), and nil otherwise.
func (b *bulkHandle) writeStateErrorSnapshot() error {
	if b == nil {
		return io.ErrClosedPipe
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	switch {
	case b.resetErr != nil:
		return b.resetErr
	case b.localClosed || b.peerReadClosed:
		return io.ErrClosedPipe
	default:
		return nil
	}
}
// notifyReadable signals readers while holding b.mu.
func (b *bulkHandle) notifyReadable() {
	if b == nil {
		return
	}
	b.mu.Lock()
	b.notifyReadableLocked()
	b.mu.Unlock()
}
// notifyReadableLocked performs a non-blocking send on readNotify; if the send
// cannot proceed immediately the signal is dropped. It does not lock b.mu
// itself.
func (b *bulkHandle) notifyReadableLocked() {
	if b == nil {
		return
	}
	wake := b.readNotify
	if wake == nil {
		return
	}
	select {
	case wake <- struct{}{}:
	default:
	}
}
// notifyFlowLocked performs a non-blocking send on flowNotify; if the send
// cannot proceed immediately the signal is dropped. It does not lock b.mu
// itself.
func (b *bulkHandle) notifyFlowLocked() {
	if b == nil {
		return
	}
	wake := b.flowNotify
	if wake == nil {
		return
	}
	select {
	case wake <- struct{}{}:
	default:
	}
}
// closeWriteStateLocked marks the write state closed and closes writeStateDone
// exactly once; repeated calls are no-ops. It does not lock b.mu itself.
func (b *bulkHandle) closeWriteStateLocked() {
	if b == nil {
		return
	}
	if b.writeStateClosed {
		return
	}
	b.writeStateClosed = true
	if done := b.writeStateDone; done != nil {
		close(done)
	}
}
// bulkAsyncWriteQueueSize derives the async write queue capacity from
// maxInFlight (defaultBulkOpenMaxInFlight when not positive), clamped to the
// range [8, 128].
func bulkAsyncWriteQueueSize(maxInFlight int) int {
	const (
		minQueue = 8
		maxQueue = 128
	)
	size := maxInFlight
	if size <= 0 {
		size = defaultBulkOpenMaxInFlight
	}
	switch {
	case size < minQueue:
		return minQueue
	case size > maxQueue:
		return maxQueue
	default:
		return size
	}
}
// getBulkAsyncWritePayload returns a byte slice of length size, reusing a pooled
// buffer when one with enough capacity is obtained and allocating otherwise.
// A non-positive size yields nil.
func getBulkAsyncWritePayload(size int) []byte {
	if size <= 0 {
		return nil
	}
	pooled, ok := bulkAsyncWritePayloadPool.Get().([]byte)
	if !ok || cap(pooled) < size {
		return make([]byte, size)
	}
	return pooled[:size]
}
// putBulkAsyncWritePayload returns buf to the payload pool, truncated to zero
// length. Buffers with no capacity or with more than 8 MiB of capacity are not
// pooled.
func putBulkAsyncWritePayload(buf []byte) {
	const maxPooledCap = 8 * 1024 * 1024
	if c := cap(buf); c == 0 || c > maxPooledCap {
		return
	}
	bulkAsyncWritePayloadPool.Put(buf[:0])
}
// beginPendingAsyncWriteLocked counts one more pending async write. When the
// count rises from zero a fresh writeDrain channel is created for waiters.
// It does not lock b.mu itself.
func (b *bulkHandle) beginPendingAsyncWriteLocked() {
	if b == nil {
		return
	}
	idle := b.pendingAsyncWrites == 0
	if idle {
		b.writeDrain = make(chan struct{})
	}
	b.pendingAsyncWrites++
}
// finishPendingAsyncWrite counts one pending async write as done and closes
// writeDrain when the count reaches zero. It is a no-op when nothing is pending.
func (b *bulkHandle) finishPendingAsyncWrite() {
	if b == nil {
		return
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.pendingAsyncWrites <= 0 {
		return
	}
	b.pendingAsyncWrites--
	if b.pendingAsyncWrites == 0 && b.writeDrain != nil {
		close(b.writeDrain)
	}
}
// waitPendingAsyncWrites blocks until all pending async writes have finished,
// ctx is done, or the handle context is done.
//
// It returns nil immediately when nothing is pending. After the drain it
// returns the current write-state error (possibly nil). When ctx or the handle
// context ends first, the write-state error takes precedence; otherwise the
// normalized ctx error or context.Canceled is returned respectively.
// A nil handle reports io.ErrClosedPipe.
func (b *bulkHandle) waitPendingAsyncWrites(ctx context.Context) error {
	if b == nil {
		return io.ErrClosedPipe
	}
	if ctx == nil {
		ctx = context.Background()
	}

	b.mu.Lock()
	pending, drained := b.pendingAsyncWrites, b.writeDrain
	b.mu.Unlock()
	if pending == 0 {
		return nil
	}

	select {
	case <-ctx.Done():
		if stateErr := b.writeStateErrorSnapshot(); stateErr != nil {
			return stateErr
		}
		return normalizeStreamDeadlineError(ctx.Err())
	case <-b.Context().Done():
		if stateErr := b.writeStateErrorSnapshot(); stateErr != nil {
			return stateErr
		}
		return context.Canceled
	case <-drained:
		return b.writeStateErrorSnapshot()
	}
}
// enqueueAsyncWrite hands req to the async write worker.
//
// Before queueing it rejects the request with the stored reset error,
// io.ErrClosedPipe when the local side or the peer's read side is closed, or
// errBulkDataPathNotReady when no write queue exists. Otherwise the request is
// counted as pending and sent on the queue; if ctx or the handle context ends
// before the send completes, the pending count is rolled back and the
// corresponding error is returned.
func (b *bulkHandle) enqueueAsyncWrite(ctx context.Context, req bulkAsyncWriteRequest) error {
	if b == nil {
		return io.ErrClosedPipe
	}
	if ctx == nil {
		ctx = context.Background()
	}

	b.mu.Lock()
	queue := b.writeQueue
	var rejected error
	switch {
	case b.resetErr != nil:
		rejected = b.resetErr
	case b.localClosed || b.peerReadClosed:
		rejected = io.ErrClosedPipe
	case queue == nil:
		rejected = errBulkDataPathNotReady
	default:
		// Count the write as pending before the lock is released.
		b.beginPendingAsyncWriteLocked()
	}
	b.mu.Unlock()
	if rejected != nil {
		return rejected
	}

	select {
	case queue <- req:
		return nil
	case <-ctx.Done():
		b.finishPendingAsyncWrite()
		return normalizeStreamDeadlineError(ctx.Err())
	case <-b.Context().Done():
		b.finishPendingAsyncWrite()
		if stateErr := b.writeStateErrorSnapshot(); stateErr != nil {
			return stateErr
		}
		return context.Canceled
	}
}
// runAsyncWriteLoop is the async write worker: it processes queued requests
// until the handle context is done, then drains whatever is still queued and
// returns. writeWorkerDone is closed on return.
func (b *bulkHandle) runAsyncWriteLoop() {
	if b == nil {
		return
	}
	defer close(b.writeWorkerDone)
	for {
		select {
		case next := <-b.writeQueue:
			b.processAsyncWrite(next)
			continue
		case <-b.Context().Done():
		}
		b.drainPendingAsyncWrites()
		return
	}
}
// drainPendingAsyncWrites discards every request currently sitting in the write
// queue without sending it: the reserved outbound window is rolled back, the
// payload buffer is returned to the pool and the pending count is decremented.
// It returns as soon as the queue has nothing immediately available.
func (b *bulkHandle) drainPendingAsyncWrites() {
	if b == nil || b.writeQueue == nil {
		return
	}
	for {
		select {
		case dropped := <-b.writeQueue:
			payload := dropped.payload
			b.rollbackOutboundWindow(len(payload), dropped.chunks)
			putBulkAsyncWritePayload(payload)
			b.finishPendingAsyncWrite()
			continue
		default:
		}
		return
	}
}
// processAsyncWrite sends one queued request. On return the request is always
// counted as finished and its payload buffer returned to the pool.
//
// If the handle can no longer write, or a write context cannot be created, the
// reserved outbound window is rolled back; a context-creation failure also
// resets the bulk when no write-state error is already present. A failed send
// resets the bulk with the send error.
func (b *bulkHandle) processAsyncWrite(req bulkAsyncWriteRequest) {
	if b == nil {
		return
	}
	defer putBulkAsyncWritePayload(req.payload)
	defer b.finishPendingAsyncWrite()

	size := len(req.payload)
	if size == 0 {
		return
	}
	refund := func() { b.rollbackOutboundWindow(size, req.chunks) }

	if b.writeStateErrorSnapshot() != nil {
		refund()
		return
	}
	b.notifyAcceptStarted()

	b.mu.Lock()
	timeout, parent := b.writeTimeout, b.ctx
	b.mu.Unlock()

	sendCtx, cancel, err := b.newWriteContext(parent, timeout)
	if err != nil {
		refund()
		if b.writeStateErrorSnapshot() == nil {
			b.markReset(err)
		}
		return
	}

	_, writeErr := b.executeSendWrite(sendCtx, req.startSeq, req.payload, req.chunks, true)
	cancel()
	if writeErr != nil {
		b.markReset(writeErr)
	}
}
// executeSendWrite sends payload through the configured sendWriteFn and returns
// the number of bytes written.
//
// An empty payload returns (0, nil) and a non-positive chunks is treated as 1.
// When the handle can no longer write, or no sendWriteFn is set, the whole
// reservation (len(payload) bytes, chunks chunks) is rolled back and the error
// is returned. After the send, the reported byte count is clamped to
// [0, len(payload)], recorded in the write statistics, and any unsent remainder
// is rolled back. A send error is suppressed when
// canIgnoreDedicatedCloseSendError accepts it, otherwise it is returned
// normalized; a short write without an error yields io.ErrShortWrite.
func (b *bulkHandle) executeSendWrite(ctx context.Context, startSeq uint64, payload []byte, chunks int, payloadOwned bool) (int, error) {
if b == nil {
return 0, io.ErrClosedPipe
}
if len(payload) == 0 {
return 0, nil
}
if chunks <= 0 {
chunks = 1
}
if err := b.writeStateErrorSnapshot(); err != nil {
b.rollbackOutboundWindow(len(payload), chunks)
return 0, err
}
// Read the send function and chunk size under the lock; the send itself runs
// without holding it.
b.mu.Lock()
sendWriteFn := b.sendWriteFn
chunkSize := b.chunkSize
b.mu.Unlock()
if sendWriteFn == nil {
b.rollbackOutboundWindow(len(payload), chunks)
return 0, errBulkDataPathNotReady
}
written, err := sendWriteFn(ctx, b, startSeq, payload, payloadOwned)
// Clamp the reported count so the accounting below stays within the payload.
if written < 0 {
written = 0
}
if written > len(payload) {
written = len(payload)
}
if written > 0 {
b.recordWrite(written, time.Now())
}
if written < len(payload) {
// Return the window reserved for the part that was not sent.
remaining := len(payload) - written
b.rollbackOutboundWindow(remaining, bulkPayloadChunkCount(remaining, chunkSize))
}
if err != nil {
if b.canIgnoreDedicatedCloseSendError(err) {
return written, nil
}
return written, b.normalizeWriteError(err)
}
if written != len(payload) {
return written, io.ErrShortWrite
}
return written, nil
}
// normalizeWriteError maps a write failure to the error reported to callers:
// nil stays nil, a current write-state error replaces err, and anything else is
// passed through normalizeStreamDeadlineError.
func (b *bulkHandle) normalizeWriteError(err error) error {
	if err == nil {
		return nil
	}
	stateErr := b.writeStateErrorSnapshot()
	if stateErr == nil {
		return normalizeStreamDeadlineError(err)
	}
	return stateErr
}
// writeStateDoneSnapshot returns the writeStateDone channel, read under b.mu.
// A nil handle yields a nil channel.
func (b *bulkHandle) writeStateDoneSnapshot() <-chan struct{} {
	if b == nil {
		return nil
	}
	b.mu.Lock()
	done := b.writeStateDone
	b.mu.Unlock()
	return done
}
// newWriteContext builds the context for a single write, bounded by timeout.
// When parent is the handle's own context and a dedicated write context exists,
// the write context is used as the base instead. The returned cancel function
// is never nil. If the handle is already in a non-writable state the new
// context is cancelled and that state error is returned.
func (b *bulkHandle) newWriteContext(parent context.Context, timeout time.Duration) (context.Context, func(), error) {
	noop := func() {}

	base := parent
	if b != nil && parent == b.ctx && b.writeCtx != nil {
		base = b.writeCtx
	}

	writeCtx, cancel, err := bulkWriteContext(base, timeout)
	switch {
	case err != nil:
		return nil, noop, err
	case b == nil:
		return writeCtx, cancel, nil
	}

	if stateErr := b.writeStateErrorSnapshot(); stateErr != nil {
		cancel()
		return nil, noop, stateErr
	}
	return writeCtx, cancel, nil
}
// canIgnoreDedicatedCloseSendError reports whether a send error on a dedicated
// bulk can be ignored. That is the case only when the handle is already
// shutting down (context done, or any of the remote/peer-read/local close flags
// set) and err is a transport-detached or closed-connection error, recognised
// either via errors.Is or by its message text.
func (b *bulkHandle) canIgnoreDedicatedCloseSendError(err error) bool {
	if b == nil || !b.dedicated || err == nil {
		return false
	}
	b.mu.Lock()
	defer b.mu.Unlock()

	closing := b.ctx.Err() != nil || b.remoteClosed || b.peerReadClosed || b.localClosed
	if !closing {
		return false
	}

	switch {
	case errors.Is(err, errTransportDetached),
		errors.Is(err, net.ErrClosed),
		errors.Is(err, io.ErrClosedPipe):
		return true
	}

	// Fall back to matching the message text for errors that are not wrapped
	// sentinels.
	text := strings.ToLower(err.Error())
	for _, marker := range [...]string{"broken pipe", "use of closed network connection"} {
		if strings.Contains(text, marker) {
			return true
		}
	}
	return false
}
// bulkWriteContext derives a write context from parent (context.Background()
// when nil) using the effective deadline computed from timeout. With no
// deadline, parent is returned with a no-op cancel. With a deadline that has
// already passed, the normalized context.DeadlineExceeded is returned.
// The returned cancel function is never nil.
func bulkWriteContext(parent context.Context, timeout time.Duration) (context.Context, func(), error) {
	if parent == nil {
		parent = context.Background()
	}
	noop := func() {}

	deadline := streamEffectiveDeadline(time.Now(), timeout, time.Time{})
	switch {
	case deadline.IsZero():
		return parent, noop, nil
	case !deadline.After(time.Now()):
		return nil, noop, normalizeStreamDeadlineError(context.DeadlineExceeded)
	}

	writeCtx, cancel := context.WithDeadline(parent, deadline)
	return writeCtx, cancel, nil
}
// normalizeBulkOpenRequest returns req with defaults applied: range, metadata
// and fast-path version are normalized, a dedicated request without a lane gets
// lane 1, non-positive chunk size / window / in-flight limits fall back to the
// package defaults, and negative timeouts fall back to the default control
// timeouts.
func normalizeBulkOpenRequest(req BulkOpenRequest) BulkOpenRequest {
	// Timeouts.
	if req.WriteTimeout < 0 {
		req.WriteTimeout = defaultBulkControlWriteTimeout
	}
	if req.ReadTimeout < 0 {
		req.ReadTimeout = defaultBulkControlReadTimeout
	}

	// Flow-control sizing.
	if req.MaxInFlight <= 0 {
		req.MaxInFlight = defaultBulkOpenMaxInFlight
	}
	if req.WindowBytes <= 0 {
		req.WindowBytes = defaultBulkOpenWindowBytes
	}
	if req.ChunkSize <= 0 {
		req.ChunkSize = defaultBulkChunkSize
	}

	// Dedicated requests always carry a lane id.
	if req.DedicatedLaneID == 0 && req.Dedicated {
		req.DedicatedLaneID = 1
	}

	req.FastPathVersion = normalizeBulkFastPathVersion(req.FastPathVersion)
	req.Metadata = cloneBulkMetadata(req.Metadata)
	req.Range = normalizeBulkRange(req.Range)
	return req
}
// normalizeBulkOpenOptions returns a normalized copy of opt. An unset (default)
// mode is resolved from the legacy Dedicated flag to dedicated or shared; the
// returned Dedicated flag is true only for dedicated mode. Range and metadata
// are normalized/cloned and negative timeouts fall back to the default control
// timeouts. Only the fields listed in the literal below are carried over.
func normalizeBulkOpenOptions(opt BulkOpenOptions) BulkOpenOptions {
	mode := normalizeBulkOpenMode(opt.Mode)
	if mode == BulkOpenModeDefault {
		// Legacy behavior when Mode was not set explicitly.
		mode = BulkOpenModeShared
		if opt.Dedicated {
			mode = BulkOpenModeDedicated
		}
	}

	normalized := BulkOpenOptions{
		ID:           opt.ID,
		Range:        normalizeBulkRange(opt.Range),
		Metadata:     cloneBulkMetadata(opt.Metadata),
		ReadTimeout:  opt.ReadTimeout,
		WriteTimeout: opt.WriteTimeout,
		Mode:         mode,
		Dedicated:    mode == BulkOpenModeDedicated,
		ChunkSize:    opt.ChunkSize,
		WindowBytes:  opt.WindowBytes,
		MaxInFlight:  opt.MaxInFlight,
	}
	if normalized.ReadTimeout < 0 {
		normalized.ReadTimeout = defaultBulkControlReadTimeout
	}
	if normalized.WriteTimeout < 0 {
		normalized.WriteTimeout = defaultBulkControlWriteTimeout
	}
	return normalized
}
// normalizeBulkOpenMode returns mode unchanged when it is one of the known open
// modes and BulkOpenModeDefault for any other value.
func normalizeBulkOpenMode(mode BulkOpenMode) BulkOpenMode {
	known := mode == BulkOpenModeAuto ||
		mode == BulkOpenModeShared ||
		mode == BulkOpenModeDedicated
	if known {
		return mode
	}
	return BulkOpenModeDefault
}
// initialBulkDedicatedAttachState returns the starting attach state: pending
// for dedicated bulks, attached otherwise.
func initialBulkDedicatedAttachState(dedicated bool) bulkDedicatedAttachState {
	if !dedicated {
		return bulkDedicatedAttachStateAttached
	}
	return bulkDedicatedAttachStatePending
}
// bulkPayloadChunkCount returns how many chunks of chunkSize bytes are needed
// to carry payloadLen bytes, rounding up. A non-positive payloadLen yields 0
// and a non-positive chunkSize falls back to defaultBulkChunkSize.
func bulkPayloadChunkCount(payloadLen int, chunkSize int) int {
	if payloadLen <= 0 {
		return 0
	}
	if chunkSize <= 0 {
		chunkSize = defaultBulkChunkSize
	}
	// Ceil division written as (n-1)/d + 1 so it cannot overflow int the way
	// n + d - 1 does for very large inputs; valid because n >= 1 here.
	return (payloadLen-1)/chunkSize + 1
}
// bulkDedicatedAttachStateName returns the diagnostic name of state, or
// "unknown" for a value that is not one of the known attach states.
func bulkDedicatedAttachStateName(state bulkDedicatedAttachState) string {
	name := "unknown"
	switch state {
	case bulkDedicatedAttachStatePending:
		name = "pending"
	case bulkDedicatedAttachStateAttached:
		name = "attached"
	case bulkDedicatedAttachStateDegraded:
		name = "degraded"
	case bulkDedicatedAttachStateClosed:
		name = "closed"
	}
	return name
}
// bulkOpenModeName returns the diagnostic name of mode after normalization:
// "auto", "shared" or "dedicated", and "default" for everything else.
func bulkOpenModeName(mode BulkOpenMode) string {
	name := "default"
	switch normalizeBulkOpenMode(mode) {
	case BulkOpenModeAuto:
		name = "auto"
	case BulkOpenModeShared:
		name = "shared"
	case BulkOpenModeDedicated:
		name = "dedicated"
	}
	return name
}
// normalizeBulkRange collapses any negative Offset or Length to -1 and leaves
// non-negative values untouched.
func normalizeBulkRange(r BulkRange) BulkRange {
	normalized := r
	if normalized.Length < 0 {
		normalized.Length = -1
	}
	if normalized.Offset < 0 {
		normalized.Offset = -1
	}
	return normalized
}
// validBulkRange reports whether neither Offset nor Length is negative.
func validBulkRange(r BulkRange) bool {
	if r.Offset < 0 || r.Length < 0 {
		return false
	}
	return true
}
// cloneBulkMetadata returns an independent copy of src. Nil and empty maps both
// yield nil.
func cloneBulkMetadata(src BulkMetadata) BulkMetadata {
	n := len(src)
	if n == 0 {
		return nil
	}
	clone := make(BulkMetadata, n)
	for k, v := range src {
		clone[k] = v
	}
	return clone
}
// bulkResetError returns err, substituting errBulkReset when err is nil.
func bulkResetError(err error) error {
	if err != nil {
		return err
	}
	return errBulkReset
}
// bulkResetMessage returns err's message, or the empty string when err is nil.
func bulkResetMessage(err error) string {
	if err != nil {
		return err.Error()
	}
	return ""
}