package notify

import (
	"context"
	"errors"
	"io"
	"net"
	"strings"
	"sync"
	"sync/atomic"
	"time"
)

const (
	// Signal key identifiers for the bulk lifecycle messages.
	BulkOpenSignalKey    = "notify.bulk.open"
	BulkCloseSignalKey   = "notify.bulk.close"
	BulkResetSignalKey   = "notify.bulk.reset"
	BulkReadySignalKey   = "notify.bulk.ready"
	BulkReleaseSignalKey = "notify.bulk.release"

	// defaultBulkChunkSize is the fallback chunk size (1 MiB) used when a
	// handle has no positive chunk size configured.
	defaultBulkChunkSize = 1024 * 1024
	// defaultBulkInboundQueueLimit and defaultBulkInboundBytesLimit are the
	// inbound buffering limits every new handle starts with.
	defaultBulkInboundQueueLimit   = 256
	defaultBulkInboundBytesLimit   = 64 * 1024 * 1024
	defaultBulkOpenWindowBytes     = 16 * 1024 * 1024
	defaultBulkOpenMaxInFlight     = 32
	defaultBulkControlReadTimeout  = 0
	defaultBulkControlWriteTimeout = 0
	defaultBulkAcceptReadyTimeout  = 10 * time.Second
)

// BulkMetadata is a string key/value map attached to a bulk.
type BulkMetadata map[string]string

// BulkRange describes a range by offset and length.
type BulkRange struct {
	Offset int64
	Length int64
}

// BulkOpenMode selects which transport path a bulk open should use.
type BulkOpenMode uint8

const (
	// BulkOpenModeDefault keeps legacy behavior:
	// Dedicated=true -> dedicated, otherwise shared.
	BulkOpenModeDefault BulkOpenMode = iota
	// BulkOpenModeAuto prefers dedicated and falls back to shared.
	BulkOpenModeAuto
	// BulkOpenModeShared forces shared transport path.
	BulkOpenModeShared
	// BulkOpenModeDedicated forces dedicated transport path.
	BulkOpenModeDedicated
)

// BulkNetworkProfile names a tuning preset for a class of network link.
type BulkNetworkProfile uint8

const (
	// BulkNetworkProfileDefault keeps legacy defaults.
	BulkNetworkProfileDefault BulkNetworkProfile = iota
	// BulkNetworkProfileLAN is optimized for low-latency/local links.
	BulkNetworkProfileLAN
	// BulkNetworkProfileWAN is tuned for moderate RTT and occasional loss.
	BulkNetworkProfileWAN
	// BulkNetworkProfileConstrained is tuned for low bandwidth and unstable links.
	BulkNetworkProfileConstrained
)

// BulkDedicatedAttachConfig bounds and tunes dedicated attach handshakes.
type BulkDedicatedAttachConfig struct {
	// AttachLimit limits concurrent dedicated attach handshakes per client session.
	// 0 means unlimited.
	AttachLimit int
	// ActiveLimit limits active logical dedicated bulks per client session.
	// Physical sidecars remain bounded by LaneLimit.
	// 0 means unlimited.
	ActiveLimit int
	// LaneLimit limits dedicated physical sidecar lanes per client session.
	// 0 means unlimited.
	LaneLimit int
	// Retry controls extra retries after the first attach attempt.
	Retry int
	// Backoff is the base retry backoff.
	Backoff time.Duration
	// DialTimeout is used for dedicated sidecar dialing.
	DialTimeout time.Duration
	// HelloTimeout is used for dedicated attach request/response handshake.
	HelloTimeout time.Duration
}

// BulkOpenTuning groups the per-bulk chunking and flow-control knobs.
type BulkOpenTuning struct {
	ChunkSize   int
	WindowBytes int
	MaxInFlight int
}

// bulkDedicatedAttachState tracks where a handle is in the dedicated attach
// lifecycle.
type bulkDedicatedAttachState uint8

const (
	bulkDedicatedAttachStatePending bulkDedicatedAttachState = iota
	bulkDedicatedAttachStateAttached
	bulkDedicatedAttachStateDegraded
	bulkDedicatedAttachStateClosed
)

// BulkOpenOptions carries the caller-supplied parameters for opening a bulk.
type BulkOpenOptions struct {
	ID           string
	Range        BulkRange
	Metadata     BulkMetadata
	ReadTimeout  time.Duration
	WriteTimeout time.Duration
	Mode         BulkOpenMode
	// Deprecated: Dedicated is kept for backward compatibility.
	// Prefer Mode.
	Dedicated   bool
	ChunkSize   int
	WindowBytes int
	MaxInFlight int
}

// BulkAcceptInfo describes an accepted bulk together with the connection
// objects it is bound to.
type BulkAcceptInfo struct {
	ID                  string
	Range               BulkRange
	Metadata            BulkMetadata
	Dedicated           bool
	LogicalConn         *LogicalConn
	TransportConn       *TransportConn
	TransportGeneration uint64
	Bulk                Bulk
}

// Bulk is the public surface of a bulk transfer: a bidirectional byte stream
// with identity, metadata, half-close, reset and snapshot support.
type Bulk interface {
	io.Reader
	io.Writer
	io.Closer
	ID() string
	Range() BulkRange
	Metadata() BulkMetadata
	Context() context.Context
	LogicalConn() *LogicalConn
	TransportConn() *TransportConn
	TransportGeneration() uint64
	CloseWrite() error
	Reset(error) error
	Snapshot() BulkSnapshot
}

// Sentinel errors returned by the bulk subsystem.
var (
	errBulkClientNil            = errors.New("bulk client is nil")
	errBulkServerNil            = errors.New("bulk server is nil")
	errBulkLogicalConnNil       = errors.New("bulk logical connection is nil")
	errBulkTransportNil         = errors.New("bulk transport connection is nil")
	errBulkRuntimeNil           = errors.New("bulk runtime is nil")
	errBulkIDEmpty              = errors.New("bulk id is empty")
	errBulkAlreadyExists        = errors.New("bulk already exists")
	errBulkNotFound             = errors.New("bulk not found")
	errBulkHandlerNotConfigured = errors.New("bulk handler is not configured")
	errBulkRejected             = errors.New("bulk open rejected")
	errBulkReset                = errors.New("bulk reset")
	errBulkDataIDEmpty          = errors.New("bulk data id is empty")
	errBulkDataPathNotReady     = errors.New("bulk data path is not implemented yet")
	errBulkRangeInvalid         = errors.New("bulk range is invalid")
	errBulkBackpressureExceeded = errors.New("bulk inbound backpressure exceeded")
	errBulkDedicatedStreamOnly  = errors.New("dedicated bulk requires stream transport")
	errBulkDedicatedSingleConn  = errors.New("dedicated bulk requires a dialable additional connection source; ConnectByConn only supports shared transport")
	errBulkDedicatedActiveLimit = errors.New("dedicated bulk active limit reached")
)

// clientDedicatedBulkSupportError reports why client c cannot use dedicated
// bulks, or nil when nothing visible rules them out.
//
// It returns errBulkClientNil for a nil client, errBulkDedicatedStreamOnly
// when the current transport or the connect source is packet/UDP based, and
// errBulkDedicatedSingleConn when the connect source cannot supply an
// additional connection.
func clientDedicatedBulkSupportError(c *ClientCommon) error {
	if c == nil {
		return errBulkClientNil
	}
	if conn := c.clientTransportConnSnapshot(); conn != nil && isPacketTransportConn(conn) {
		return errBulkDedicatedStreamOnly
	}
	// Evaluate both source checks against a single snapshot so they cannot
	// disagree if the connect source is swapped between two reads.
	if source := c.clientConnectSourceSnapshot(); source != nil {
		if source.isUDP() {
			return errBulkDedicatedStreamOnly
		}
		if !source.supportsAdditionalConn() {
			return errBulkDedicatedSingleConn
		}
	}
	return nil
}

// logicalDedicatedBulkSupportError reports why a logical connection cannot
// carry dedicated bulks. When a current transport exists the decision is
// delegated to transportDedicatedBulkSupportError; otherwise only the remote
// address network is inspected.
func logicalDedicatedBulkSupportError(logical *LogicalConn) error {
	if logical == nil {
		return errBulkLogicalConnNil
	}
	if transport := logical.CurrentTransportConn(); transport != nil {
		return transportDedicatedBulkSupportError(transport)
	}
	if addr := logical.RemoteAddr(); addr != nil && isPacketNetwork(addr.Network()) {
		return errBulkDedicatedStreamOnly
	}
	return nil
}

// transportDedicatedBulkSupportError reports why a transport connection
// cannot carry dedicated bulks: it must be non-nil, use stream transport, and
// must not have a packet-network remote address.
func transportDedicatedBulkSupportError(transport *TransportConn) error {
	if transport == nil {
		return errBulkTransportNil
	}
	if !transport.UsesStreamTransport() {
		return errBulkDedicatedStreamOnly
	}
	if addr := transport.RemoteAddr(); addr != nil && isPacketNetwork(addr.Network()) {
		return errBulkDedicatedStreamOnly
	}
	return nil
}

// Callback signatures injected into a bulkHandle so it can emit close, reset,
// data, batched-write and window-release messages without knowing the
// underlying transport.
type bulkCloseSender func(context.Context, *bulkHandle, bool) error
type bulkResetSender func(context.Context, *bulkHandle, string) error
type bulkDataSender func(context.Context, *bulkHandle, []byte) error
type bulkWriteSender func(context.Context, *bulkHandle, uint64, []byte, bool) (int, error)
type bulkReleaseSender func(*bulkHandle, int64, int) error

// bulkAsyncWriteRequest is one queued asynchronous write: an owned payload,
// the first sequence number reserved for it, and its chunk count.
type bulkAsyncWriteRequest struct {
	startSeq uint64
	payload  []byte
	chunks   int
}

var bulkAsyncWritePayloadPool sync.Pool

// bulkReadChunk is a buffered inbound payload with an optional release hook
// that is invoked when the chunk is cleared.
type bulkReadChunk struct {
	data    []byte
	release func()
}

// clear runs the release hook (if any) and drops both the data and the hook.
// It is safe to call on a nil receiver.
func (c *bulkReadChunk) clear() {
	if c == nil {
		return
	}
	if c.release != nil {
		c.release()
	}
	c.data = nil
	c.release = nil
}

// bulkReadPayloadOwner reference-counts a shared payload and calls release
// once the count drops to zero.
type bulkReadPayloadOwner struct {
	refs    atomic.Int32
	release func()
}

// newBulkReadPayloadOwner returns an owner holding one initial reference, or
// nil when there is nothing to release.
func newBulkReadPayloadOwner(release func()) *bulkReadPayloadOwner {
	if release == nil {
		return nil
	}
	owner := &bulkReadPayloadOwner{release: release}
	owner.refs.Store(1)
	return owner
}

// retainChunk adds one reference and returns the function that drops it.
// A nil owner yields a nil function.
func (o *bulkReadPayloadOwner) retainChunk() func() {
	if o == nil {
		return nil
	}
	o.refs.Add(1)
	return o.releaseChunk
}

// releaseChunk drops one reference and runs release when the count hits zero.
func (o *bulkReadPayloadOwner) releaseChunk() {
	if o == nil {
		return
	}
	if o.refs.Add(-1) == 0 && o.release != nil {
		o.release()
	}
}

// done drops the initial reference taken by newBulkReadPayloadOwner.
func (o *bulkReadPayloadOwner) done() {
	if o == nil {
		return
	}
	o.releaseChunk()
}

// bulkHandle is the concrete state behind a bulk: identity, bound
// connections, injected senders, read/write buffering, flow-control counters,
// dedicated-connection state and accept bookkeeping.
type bulkHandle struct {
	// Identity and binding.
	runtime             *bulkRuntime
	runtimeScope        string
	id                  string
	dataID              uint64
	fastPathVersion     uint8
	outboundSeq         uint64
	rangeSpec           BulkRange
	metadata            BulkMetadata
	sessionEpoch        uint64
	client              *ClientCommon
	logical             *LogicalConn
	transport           *TransportConn
	transportGeneration uint64

	// Configuration copied from the open request.
	readTimeout          time.Duration
	writeTimeout         time.Duration
	dedicated            bool
	dedicatedLaneID      uint32
	dedicatedAttachToken string
	chunkSize            int
	windowBytes          int
	maxInFlight          int
	inboundQueueLimit    int
	inboundBytesLimit    int

	// Injected transport callbacks.
	closeFn     bulkCloseSender
	resetFn     bulkResetSender
	sendDataFn  bulkDataSender
	sendWriteFn bulkWriteSender
	releaseFn   bulkReleaseSender

	// Lifetime.
	ctx            context.Context
	cancel         context.CancelFunc
	writeCtx       context.Context
	writeCtxCancel context.CancelFunc
	createdAt      time.Time

	// writeMu is held for the whole of Write and close; mu is taken around
	// the individual state reads and updates in the methods of this type.
	writeMu sync.Mutex
	mu      sync.Mutex

	// Asynchronous write pipeline.
	writeQueue         chan bulkAsyncWriteRequest
	writeWorkerDone    chan struct{}
	writeDrain         chan struct{}
	pendingAsyncWrites int

	// Close / reset state.
	localClosed     bool
	localReadClosed bool
	remoteClosed    bool
	peerReadClosed  bool
	resetErr        error

	// Inbound buffering.
	readQueue     []bulkReadChunk
	readBuf       bulkReadChunk
	bufferedBytes int
	readNotify    chan struct{}

	// Flow control and write-state signalling.
	flowNotify           chan struct{}
	writeStateDone       chan struct{}
	writeStateClosed     bool
	releaseNotify        chan struct{}
	releaseWorkerDone    chan struct{}
	pendingReleaseBytes  int64
	pendingReleaseChunks int
	outboundAvailBytes   int64
	outboundInFlight     int

	// Statistics.
	bytesRead    int64
	bytesWritten int64
	readCalls    int64
	writeCalls   int64
	lastReadAt   time.Time
	lastWriteAt  time.Time

	// Dedicated connection state; the dedicated* accessors take dedicatedMu.
	dedicatedMu          sync.Mutex
	dedicatedConn        net.Conn
	dedicatedConnOwned   bool
	dedicatedSender      *bulkDedicatedSender
	dedicatedReady       chan struct{}
	dedicatedWriteClosed bool
	dedicatedActiveLease bool
	dedicatedState       bulkDedicatedAttachState
	dedicatedAttempts    uint32
	dedicatedLastCode    string
	dedicatedDataStarted bool

	// Accept bookkeeping; the accept* methods take acceptMu.
	acceptMu         sync.Mutex
	acceptDispatched bool
	acceptReady      chan struct{}
	acceptReadyDone  bool
	acceptReadyErr   error
	acceptNotifyFn   func(error)
	acceptNotifySent bool
}

// newBulkHandle builds a handle from a normalized open request and starts its
// background workers.
//
// A nil parent is replaced by context.Background. A zero transportGeneration
// is filled in from transport, then from logical. When sendWriteFn is set the
// async write queue, its context and the write loop are started; when flow
// control is enabled the window-release loop is started as well.
func newBulkHandle(parent context.Context, runtime *bulkRuntime, runtimeScope string, req BulkOpenRequest, sessionEpoch uint64, logical *LogicalConn, transport *TransportConn, transportGeneration uint64, closeFn bulkCloseSender, resetFn bulkResetSender, sendDataFn bulkDataSender, sendWriteFn bulkWriteSender, releaseFn bulkReleaseSender) *bulkHandle {
	if parent == nil {
		parent = context.Background()
	}
	ctx, cancel := context.WithCancel(parent)
	if transportGeneration == 0 && transport != nil {
		transportGeneration = transport.TransportGeneration()
	}
	if transportGeneration == 0 && logical != nil {
		transportGeneration = logical.transportGenerationSnapshot()
	}
	req = normalizeBulkOpenRequest(req)
	handle := &bulkHandle{
		runtime:              runtime,
		runtimeScope:         runtimeScope,
		id:                   req.BulkID,
		dataID:               req.DataID,
		fastPathVersion:      normalizeBulkFastPathVersion(req.FastPathVersion),
		rangeSpec:            req.Range,
		metadata:             cloneBulkMetadata(req.Metadata),
		sessionEpoch:         sessionEpoch,
		logical:              logical,
		transport:            transport,
		transportGeneration:  transportGeneration,
		readTimeout:          req.ReadTimeout,
		writeTimeout:         req.WriteTimeout,
		dedicated:            req.Dedicated,
		dedicatedLaneID:      req.DedicatedLaneID,
		dedicatedAttachToken: req.AttachToken,
		chunkSize:            req.ChunkSize,
		windowBytes:          req.WindowBytes,
		maxInFlight:          req.MaxInFlight,
		inboundQueueLimit:    defaultBulkInboundQueueLimit,
		inboundBytesLimit:    defaultBulkInboundBytesLimit,
		closeFn:              closeFn,
		resetFn:              resetFn,
		sendDataFn:           sendDataFn,
		sendWriteFn:          sendWriteFn,
		releaseFn:            releaseFn,
		ctx:                  ctx,
		cancel:               cancel,
		createdAt:            time.Now(),
		readNotify:           make(chan struct{}, 1),
		flowNotify:           make(chan struct{}, 1),
		writeStateDone:       make(chan struct{}),
		dedicatedReady:       make(chan struct{}),
		dedicatedState:       initialBulkDedicatedAttachState(req.Dedicated),
		acceptReady:          make(chan struct{}),
		outboundAvailBytes:   int64(req.WindowBytes),
	}
	// writeDrain starts out closed: there are no pending async writes yet.
	drain := make(chan struct{})
	close(drain)
	handle.writeDrain = drain
	if sendWriteFn != nil {
		handle.writeCtx, handle.writeCtxCancel = context.WithCancel(ctx)
		handle.writeQueue = make(chan bulkAsyncWriteRequest, bulkAsyncWriteQueueSize(req.MaxInFlight))
		handle.writeWorkerDone = make(chan struct{})
		// Cancel the write context as soon as the write state is closed.
		// If the parent context ends first, writeCtx is already cancelled
		// because it derives from ctx, so the goroutine just exits.
		go func(parentDone <-chan struct{}, writeDone <-chan struct{}, cancel context.CancelFunc) {
			select {
			case <-parentDone:
			case <-writeDone:
				cancel()
			}
		}(ctx.Done(), handle.writeStateDone, handle.writeCtxCancel)
		go handle.runAsyncWriteLoop()
	}
	if handle.flowControlEnabled() {
		handle.releaseNotify = make(chan struct{}, 1)
		handle.releaseWorkerDone = make(chan struct{})
		go handle.runWindowReleaseLoop()
	}
	return handle
}

// ID returns the bulk identifier, or "" for a nil handle.
func (b *bulkHandle) ID() string {
	if b == nil {
		return ""
	}
	return b.id
}

// fastPathVersionSnapshot returns the normalized fast-path version under mu.
// A nil handle reports bulkFastPathVersionV1.
func (b *bulkHandle) fastPathVersionSnapshot() uint8 {
	if b == nil {
		return bulkFastPathVersionV1
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	return normalizeBulkFastPathVersion(b.fastPathVersion)
}

// FastPathVersion returns the normalized fast-path version of the handle.
func (b *bulkHandle) FastPathVersion() uint8 {
	return b.fastPathVersionSnapshot()
}

// Range returns the range the bulk was opened with.
func (b *bulkHandle) Range() BulkRange {
	if b == nil {
		return BulkRange{}
	}
	return b.rangeSpec
}

// Metadata returns a copy of the bulk metadata, or nil for a nil handle.
func (b *bulkHandle) Metadata() BulkMetadata {
	if b == nil {
		return nil
	}
	return cloneBulkMetadata(b.metadata)
}

// Context returns the handle context, falling back to context.Background
// when the handle or its context is nil.
func (b *bulkHandle) Context() context.Context {
	if b == nil || b.ctx == nil {
		return context.Background()
	}
	return b.ctx
}

// LogicalConn returns the logical connection the bulk is bound to.
func (b *bulkHandle) LogicalConn() *LogicalConn {
	if b == nil {
		return nil
	}
	return b.logical
}

// TransportConn returns the transport connection the bulk is bound to.
func (b *bulkHandle) TransportConn() *TransportConn {
	if b == nil {
		return nil
	}
	return b.transport
}

// TransportGeneration returns the transport generation captured at creation.
func (b *bulkHandle) TransportGeneration() uint64 {
	if b == nil {
		return 0
	}
	return b.transportGeneration
}

// Dedicated reports whether the bulk was opened as dedicated.
func (b *bulkHandle) Dedicated() bool {
	if b == nil {
		return false
	}
	return b.dedicated
}

// dedicatedLaneIDSnapshot returns the dedicated lane ID under dedicatedMu.
func (b *bulkHandle) dedicatedLaneIDSnapshot() uint32 {
	if b == nil {
		return 0
	}
	b.dedicatedMu.Lock()
	defer b.dedicatedMu.Unlock()
	return b.dedicatedLaneID
}

// dedicatedAttachTokenSnapshot returns the attach token under mu.
func (b *bulkHandle) dedicatedAttachTokenSnapshot() string {
	if b == nil {
		return ""
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.dedicatedAttachToken
}

// setDedicatedAttachToken replaces the attach token under mu.
func (b *bulkHandle) setDedicatedAttachToken(token string) {
	if b == nil {
		return
	}
	b.mu.Lock()
	b.dedicatedAttachToken = token
	b.mu.Unlock()
}

// markDedicatedAttachAttempt increments the attach attempt counter.
func (b *bulkHandle) markDedicatedAttachAttempt() {
	if b == nil {
		return
	}
	b.dedicatedMu.Lock()
	b.dedicatedAttempts++
	b.dedicatedMu.Unlock()
}

// setDedicatedAttachLastCode records the most recent attach result code.
func (b *bulkHandle) setDedicatedAttachLastCode(code string) {
	if b == nil {
		return
	}
	b.dedicatedMu.Lock()
	b.dedicatedLastCode = code
	b.dedicatedMu.Unlock()
}

// markDedicatedAttachDegraded moves the attach state to degraded and records
// the code that caused it.
func (b *bulkHandle) markDedicatedAttachDegraded(code string) {
	if b == nil {
		return
	}
	b.dedicatedMu.Lock()
	b.dedicatedState = bulkDedicatedAttachStateDegraded
	b.dedicatedLastCode = code
	b.dedicatedMu.Unlock()
}

// markDedicatedAttachClosed moves the attach state to closed.
func (b *bulkHandle) markDedicatedAttachClosed() {
	if b == nil {
		return
	}
	b.dedicatedMu.Lock()
	b.dedicatedState = bulkDedicatedAttachStateClosed
	b.dedicatedMu.Unlock()
}

// dedicatedAttachStateSnapshot returns the current attach state; a nil handle
// reports closed.
func (b *bulkHandle) dedicatedAttachStateSnapshot() bulkDedicatedAttachState {
	if b == nil {
		return bulkDedicatedAttachStateClosed
	}
	b.dedicatedMu.Lock()
	defer b.dedicatedMu.Unlock()
	return b.dedicatedState
}

// dedicatedAttachDiagnosticsSnapshot returns the attach state, attempt count,
// last result code and data-started flag in one consistent read.
func (b *bulkHandle) dedicatedAttachDiagnosticsSnapshot() (state bulkDedicatedAttachState, attempts uint32, lastCode string, dataStarted bool) {
	if b == nil {
		return bulkDedicatedAttachStateClosed, 0, "", false
	}
	b.dedicatedMu.Lock()
	defer b.dedicatedMu.Unlock()
	return b.dedicatedState, b.dedicatedAttempts, b.dedicatedLastCode, b.dedicatedDataStarted
}

// dedicatedConnSnapshot returns the currently attached dedicated connection.
func (b *bulkHandle) dedicatedConnSnapshot() net.Conn {
	if b == nil {
		return nil
	}
	b.dedicatedMu.Lock()
	defer b.dedicatedMu.Unlock()
	return b.dedicatedConn
}

// dedicatedSenderSnapshot returns the currently installed dedicated sender.
func (b *bulkHandle) dedicatedSenderSnapshot() *bulkDedicatedSender {
	if b == nil {
		return nil
	}
	b.dedicatedMu.Lock()
	defer b.dedicatedMu.Unlock()
	return b.dedicatedSender
}

// installDedicatedSender installs sender unless one is already present and
// returns whichever sender ends up installed. It returns nil when the handle
// or sender is nil.
func (b *bulkHandle) installDedicatedSender(sender *bulkDedicatedSender) *bulkDedicatedSender {
	if b == nil || sender == nil {
		return nil
	}
	b.dedicatedMu.Lock()
	defer b.dedicatedMu.Unlock()
	if b.dedicatedSender != nil {
		return b.dedicatedSender
	}
	b.dedicatedSender = sender
	return sender
}

// clearDedicatedSender removes and returns the installed dedicated sender.
func (b *bulkHandle) clearDedicatedSender() *bulkDedicatedSender {
	if b == nil {
		return nil
	}
	b.dedicatedMu.Lock()
	defer b.dedicatedMu.Unlock()
	sender := b.dedicatedSender
	b.dedicatedSender = nil
	return sender
}

// dedicatedAttachedSnapshot reports whether a dedicated connection is set.
func (b *bulkHandle) dedicatedAttachedSnapshot() bool {
	return b.dedicatedConnSnapshot() != nil
}

// dedicatedDataStartedSnapshot reports whether dedicated data has started.
func (b *bulkHandle) dedicatedDataStartedSnapshot() bool {
	if b == nil {
		return false
	}
	b.dedicatedMu.Lock()
	defer b.dedicatedMu.Unlock()
	return b.dedicatedDataStarted
}

// markDedicatedDataStarted records that dedicated data has started.
func (b *bulkHandle) markDedicatedDataStarted() {
	if b == nil {
		return
	}
	b.dedicatedMu.Lock()
	b.dedicatedDataStarted = true
	b.dedicatedMu.Unlock()
}

// waitDedicatedReady blocks until the dedicated connection is attached.
//
// It returns nil immediately for a nil handle, a non-dedicated bulk, or one
// that is already attached. Otherwise it returns the write-state error if one
// is already set, ctx.Err() when ctx ends, and the write-state error or
// context.Canceled when the handle context ends first.
func (b *bulkHandle) waitDedicatedReady(ctx context.Context) error {
	if b == nil || !b.Dedicated() || b.dedicatedAttachedSnapshot() {
		return nil
	}
	if ctx == nil {
		ctx = context.Background()
	}
	if err := b.writeStateErrorSnapshot(); err != nil {
		return err
	}
	select {
	case <-b.dedicatedReady:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	case <-b.Context().Done():
		if err := b.writeStateErrorSnapshot(); err != nil {
			return err
		}
		return context.Canceled
	}
}
func (b *bulkHandle) attachDedicatedConn(conn net.Conn) error { if b == nil { return io.ErrClosedPipe } if conn == nil { return net.ErrClosed } b.dedicatedMu.Lock() if b.dedicatedConn != nil { b.dedicatedMu.Unlock() return errors.New("bulk dedicated conn already attached") } b.dedicatedConn = conn b.dedicatedConnOwned = true b.dedicatedWriteClosed = false b.dedicatedState = bulkDedicatedAttachStateAttached b.dedicatedLastCode = "" ready := b.dedicatedReady b.dedicatedMu.Unlock() if ready != nil { select { case <-ready: default: close(ready) } } return nil } func (b *bulkHandle) attachDedicatedConnShared(conn net.Conn) error { if b == nil { return io.ErrClosedPipe } if conn == nil { return net.ErrClosed } b.dedicatedMu.Lock() if b.dedicatedConn != nil { if b.dedicatedConn == conn { b.dedicatedConnOwned = false b.dedicatedState = bulkDedicatedAttachStateAttached b.dedicatedLastCode = "" b.dedicatedMu.Unlock() return nil } b.dedicatedMu.Unlock() return errors.New("bulk dedicated conn already attached") } b.dedicatedConn = conn b.dedicatedConnOwned = false b.dedicatedWriteClosed = false b.dedicatedState = bulkDedicatedAttachStateAttached b.dedicatedLastCode = "" ready := b.dedicatedReady b.dedicatedMu.Unlock() if ready != nil { select { case <-ready: default: close(ready) } } return nil } func (b *bulkHandle) replaceDedicatedConn(conn net.Conn) (net.Conn, *bulkDedicatedSender, error) { if b == nil { return nil, nil, io.ErrClosedPipe } if conn == nil { return nil, nil, net.ErrClosed } b.dedicatedMu.Lock() oldConn := b.dedicatedConn oldOwned := b.dedicatedConnOwned oldSender := b.dedicatedSender b.dedicatedConn = conn b.dedicatedConnOwned = true b.dedicatedSender = nil b.dedicatedWriteClosed = false b.dedicatedState = bulkDedicatedAttachStateAttached b.dedicatedLastCode = "" ready := b.dedicatedReady b.dedicatedMu.Unlock() if ready != nil { select { case <-ready: default: close(ready) } } if !oldOwned { oldConn = nil } return oldConn, oldSender, nil } func (b 
*bulkHandle) replaceDedicatedConnShared(conn net.Conn) (net.Conn, *bulkDedicatedSender, error) { if b == nil { return nil, nil, io.ErrClosedPipe } if conn == nil { return nil, nil, net.ErrClosed } b.dedicatedMu.Lock() oldConn := b.dedicatedConn oldOwned := b.dedicatedConnOwned oldSender := b.dedicatedSender b.dedicatedConn = conn b.dedicatedConnOwned = false b.dedicatedSender = nil b.dedicatedWriteClosed = false b.dedicatedState = bulkDedicatedAttachStateAttached b.dedicatedLastCode = "" ready := b.dedicatedReady b.dedicatedMu.Unlock() if ready != nil { select { case <-ready: default: close(ready) } } if !oldOwned { oldConn = nil } return oldConn, oldSender, nil } func (b *bulkHandle) bestEffortCloseDedicatedWriteHalf() { if b == nil || !b.dedicated { return } b.dedicatedMu.Lock() conn := b.dedicatedConn owned := b.dedicatedConnOwned alreadyClosed := b.dedicatedWriteClosed b.dedicatedMu.Unlock() if conn == nil || alreadyClosed || !owned { return } type closeWriter interface { CloseWrite() error } if closeWriterConn, ok := conn.(closeWriter); ok { if err := closeWriterConn.CloseWrite(); err == nil { b.dedicatedMu.Lock() if b.dedicatedConn == conn { b.dedicatedWriteClosed = true } b.dedicatedMu.Unlock() } } } func (b *bulkHandle) dedicatedWriteHalfClosedSnapshot() bool { if b == nil { return false } b.dedicatedMu.Lock() defer b.dedicatedMu.Unlock() return b.dedicatedWriteClosed } func (b *bulkHandle) setClientSnapshotOwner(client *ClientCommon) { if b == nil { return } b.client = client } func (b *bulkHandle) clearDedicatedConn() (net.Conn, bool) { if b == nil { return nil, false } b.dedicatedMu.Lock() conn := b.dedicatedConn owned := b.dedicatedConnOwned b.dedicatedConn = nil b.dedicatedConnOwned = false b.dedicatedWriteClosed = false b.dedicatedMu.Unlock() return conn, owned } func (b *bulkHandle) markDedicatedActiveReserved() { if b == nil { return } b.dedicatedMu.Lock() b.dedicatedActiveLease = true b.dedicatedMu.Unlock() } func (b *bulkHandle) 
releaseDedicatedActiveReserved() bool { if b == nil { return false } b.dedicatedMu.Lock() defer b.dedicatedMu.Unlock() if !b.dedicatedActiveLease { return false } b.dedicatedActiveLease = false return true } func (b *bulkHandle) markAcceptDispatched() bool { if b == nil { return false } b.acceptMu.Lock() defer b.acceptMu.Unlock() if b.acceptDispatched { return false } b.acceptDispatched = true return true } func (b *bulkHandle) markAcceptHandled() { if b == nil { return } b.acceptMu.Lock() b.acceptDispatched = true b.acceptMu.Unlock() } func (b *bulkHandle) setAcceptNotify(fn func(error)) { if b == nil { return } b.acceptMu.Lock() b.acceptNotifyFn = fn b.acceptMu.Unlock() } func (b *bulkHandle) clearAcceptNotify() { if b == nil { return } b.acceptMu.Lock() b.acceptNotifyFn = nil b.acceptMu.Unlock() } func (b *bulkHandle) markAcceptReady(err error) { if b == nil { return } b.acceptMu.Lock() if b.acceptReadyDone { b.acceptMu.Unlock() return } b.acceptReadyDone = true b.acceptReadyErr = err ch := b.acceptReady b.acceptMu.Unlock() if ch != nil { select { case <-ch: default: close(ch) } } } func (b *bulkHandle) waitAcceptReady(ctx context.Context) error { if b == nil { return io.ErrClosedPipe } if ctx == nil { ctx = context.Background() } b.acceptMu.Lock() if b.acceptReadyDone { err := b.acceptReadyErr b.acceptMu.Unlock() return err } ready := b.acceptReady b.acceptMu.Unlock() select { case <-ctx.Done(): return ctx.Err() case <-b.Context().Done(): if err := b.acceptReadyErrorSnapshot(); err != nil { return err } if err := b.resetErrSnapshot(); err != nil { return err } return context.Canceled case <-ready: return b.acceptReadyErrorSnapshot() } } func (b *bulkHandle) acceptReadyErrorSnapshot() error { if b == nil { return io.ErrClosedPipe } b.acceptMu.Lock() defer b.acceptMu.Unlock() return b.acceptReadyErr } func (b *bulkHandle) notifyAcceptStarted() { if b == nil { return } var notify func(error) b.acceptMu.Lock() if b.acceptNotifySent { b.acceptMu.Unlock() return } 
notify = b.acceptNotifyFn b.acceptNotifyFn = nil if notify == nil { b.acceptMu.Unlock() return } b.acceptNotifySent = true b.acceptMu.Unlock() go notify(nil) } func (b *bulkHandle) finishAcceptDispatch(err error) { if b == nil { return } var notify func(error) b.acceptMu.Lock() if b.acceptNotifySent { b.acceptMu.Unlock() return } notify = b.acceptNotifyFn b.acceptNotifyFn = nil b.acceptNotifySent = true b.acceptMu.Unlock() if notify != nil { go notify(err) } } func (b *bulkHandle) SessionEpoch() uint64 { if b == nil { return 0 } return b.sessionEpoch } func (b *bulkHandle) acceptsClientSessionEpoch(epoch uint64) bool { if b == nil { return false } if b.sessionEpoch == 0 || epoch == 0 { return true } return b.sessionEpoch == epoch } func (b *bulkHandle) acceptsTransportGeneration(transport *TransportConn) bool { if b == nil { return false } if b.transportGeneration == 0 || transport == nil { return true } return b.transportGeneration == transport.TransportGeneration() } func (b *bulkHandle) dataIDSnapshot() uint64 { if b == nil { return 0 } return b.dataID } func (b *bulkHandle) nextOutboundDataSeq() uint64 { return b.reserveOutboundDataSeqs(1) } func (b *bulkHandle) reserveOutboundDataSeqs(count int) uint64 { if b == nil || count <= 0 { return 0 } b.mu.Lock() defer b.mu.Unlock() start := b.outboundSeq + 1 b.outboundSeq += uint64(count) return start } func (b *bulkHandle) Read(p []byte) (int, error) { if len(p) == 0 { return 0, nil } if b == nil { return 0, io.ErrClosedPipe } b.notifyAcceptStarted() for { b.mu.Lock() localReadClosed := b.localReadClosed if len(b.readBuf.data) > 0 { n := copy(p, b.readBuf.data) b.readBuf.data = b.readBuf.data[n:] b.bufferedBytes -= n if b.bufferedBytes < 0 { b.bufferedBytes = 0 } if len(b.readBuf.data) == 0 { b.readBuf.clear() } b.recordReadLocked(n, time.Now()) b.mu.Unlock() b.maybeSendWindowRelease(n, false) return n, nil } if len(b.readQueue) > 0 { b.readBuf = b.readQueue[0] b.readQueue[0] = bulkReadChunk{} b.readQueue = 
b.readQueue[1:] b.mu.Unlock() continue } resetErr := b.resetErr remoteClosed := b.remoteClosed notify := b.readNotify ctx := b.ctx readTimeout := b.readTimeout b.mu.Unlock() if localReadClosed { b.maybeSendWindowRelease(0, true) return 0, io.ErrClosedPipe } if resetErr != nil { b.maybeSendWindowRelease(0, true) return 0, resetErr } if remoteClosed { b.maybeSendWindowRelease(0, true) return 0, io.EOF } if err := b.waitReadable(ctx, notify, readTimeout); err != nil { b.maybeSendWindowRelease(0, true) return 0, err } } } func (b *bulkHandle) Write(p []byte) (int, error) { if len(p) == 0 { return 0, nil } if b == nil { return 0, io.ErrClosedPipe } b.notifyAcceptStarted() b.writeMu.Lock() defer b.writeMu.Unlock() b.mu.Lock() resetErr := b.resetErr localClosed := b.localClosed peerReadClosed := b.peerReadClosed sendDataFn := b.sendDataFn sendWriteFn := b.sendWriteFn chunkSize := b.chunkSize writeTimeout := b.writeTimeout bulkCtx := b.ctx b.mu.Unlock() if resetErr != nil { return 0, resetErr } if localClosed || peerReadClosed { return 0, io.ErrClosedPipe } if sendDataFn == nil { return 0, errBulkDataPathNotReady } if sendWriteFn != nil { written := 0 for written < len(p) { end := len(p) if b.windowBytes > 0 && end-written > b.windowBytes { end = written + b.windowBytes } if chunkSize > 0 && b.maxInFlight > 0 { maxPartBytes := chunkSize * b.maxInFlight if maxPartBytes > 0 && end-written > maxPartBytes { end = written + maxPartBytes } } part := p[written:end] partChunks := bulkPayloadChunkCount(len(part), chunkSize) sendCtx, cancel, err := b.newWriteContext(bulkCtx, writeTimeout) if err != nil { return written, err } if err := b.acquireOutboundWindow(sendCtx, len(part), partChunks); err != nil { cancel() return written, b.normalizeWriteError(err) } startSeq := b.reserveOutboundDataSeqs(partChunks) if b.dedicated { partWritten, err := b.executeSendWrite(sendCtx, startSeq, part, partChunks, false) cancel() written += partWritten if err != nil { return written, err } continue 
} owned := getBulkAsyncWritePayload(len(part)) copy(owned, part) err = b.enqueueAsyncWrite(sendCtx, bulkAsyncWriteRequest{ startSeq: startSeq, payload: owned, chunks: partChunks, }) cancel() if err != nil { putBulkAsyncWritePayload(owned) b.rollbackOutboundWindow(len(part), partChunks) return written, b.normalizeWriteError(err) } written += len(part) } return written, nil } if chunkSize <= 0 { chunkSize = defaultBulkChunkSize } written := 0 for written < len(p) { end := written + chunkSize if end > len(p) { end = len(p) } chunk := p[written:end] sendCtx, cancel, err := b.newWriteContext(bulkCtx, writeTimeout) if err != nil { if written > 0 { b.recordWrite(written, time.Now()) } return written, err } if err := b.acquireOutboundWindow(sendCtx, len(chunk), 1); err != nil { cancel() if written > 0 { b.recordWrite(written, time.Now()) } return written, b.normalizeWriteError(err) } err = sendDataFn(sendCtx, b, chunk) cancel() if err != nil { b.rollbackOutboundWindow(len(chunk), 1) if written > 0 { b.recordWrite(written, time.Now()) } return written, b.normalizeWriteError(err) } written = end } if written > 0 { b.recordWrite(written, time.Now()) } return written, nil } func (b *bulkHandle) Close() error { return b.close(true) } func (b *bulkHandle) CloseWrite() error { return b.close(false) } func (b *bulkHandle) close(full bool) error { if b == nil { return nil } b.notifyAcceptStarted() b.writeMu.Lock() defer b.writeMu.Unlock() b.mu.Lock() if b.resetErr != nil { err := b.resetErr b.mu.Unlock() return err } if b.localClosed { if !full || b.localReadClosed { b.mu.Unlock() return nil } closeFn := b.closeFn b.mu.Unlock() if closeFn != nil && !b.dedicatedWriteHalfClosedSnapshot() { if err := closeFn(context.Background(), b, true); err != nil && !errors.Is(err, errBulkNotFound) && !b.canIgnoreDedicatedCloseSendError(err) { return err } } b.bestEffortCloseDedicatedWriteHalf() b.mu.Lock() if b.localReadClosed { b.mu.Unlock() return nil } b.localReadClosed = true 
b.clearBufferedDataLocked() b.closeWriteStateLocked() shouldFinalize := b.shouldFinalizeLocked() b.mu.Unlock() b.notifyReadable() if shouldFinalize { b.finalize() } return nil } closeFn := b.closeFn b.mu.Unlock() if err := b.waitPendingAsyncWrites(context.Background()); err != nil { return err } if closeFn != nil { if err := closeFn(context.Background(), b, full); err != nil && !errors.Is(err, errBulkNotFound) && !b.canIgnoreDedicatedCloseSendError(err) { return err } } b.bestEffortCloseDedicatedWriteHalf() b.mu.Lock() if b.localClosed { b.mu.Unlock() return nil } b.localClosed = true b.closeWriteStateLocked() if full { b.localReadClosed = true b.clearBufferedDataLocked() } shouldFinalize := b.shouldFinalizeLocked() b.mu.Unlock() if full { b.notifyReadable() } if shouldFinalize { b.finalize() } return nil } func (b *bulkHandle) Reset(err error) error { if b == nil { return nil } b.notifyAcceptStarted() resetErr := bulkResetError(err) b.mu.Lock() if b.resetErr != nil { err := b.resetErr b.mu.Unlock() return err } resetFn := b.resetFn b.mu.Unlock() if resetFn != nil { if sendErr := resetFn(context.Background(), b, bulkResetMessage(resetErr)); sendErr != nil { return sendErr } } b.markReset(resetErr) return nil } func (b *bulkHandle) Snapshot() BulkSnapshot { return b.snapshot() } func (b *bulkHandle) markRemoteClosed() { if b == nil { return } b.mu.Lock() b.remoteClosed = true shouldFinalize := b.shouldFinalizeLocked() b.mu.Unlock() b.notifyReadable() if shouldFinalize { b.finalize() } } func (b *bulkHandle) markPeerClosed() { if b == nil { return } b.mu.Lock() b.remoteClosed = true b.peerReadClosed = true b.closeWriteStateLocked() shouldFinalize := b.shouldFinalizeLocked() b.notifyFlowLocked() b.mu.Unlock() b.notifyReadable() if shouldFinalize { b.finalize() } } func (b *bulkHandle) markReset(err error) { if b == nil { return } resetErr := bulkResetError(err) b.mu.Lock() if b.resetErr == nil { b.resetErr = resetErr b.clearBufferedDataLocked() 
b.closeWriteStateLocked()
	}
	b.notifyFlowLocked()
	b.mu.Unlock()
	b.markAcceptReady(resetErr)
	b.notifyReadable()
	b.finalize()
}

// pushChunk queues an inbound chunk that the caller still owns. It forwards to
// pushChunkWithOwnership with owned=false, so the data is copied before it is
// stored and the bulk is reset if an inbound limit is exceeded.
func (b *bulkHandle) pushChunk(chunk []byte) error {
	return b.pushChunkWithOwnership(chunk, false)
}

// pushOwnedChunk queues an inbound chunk whose backing slice is handed over to
// the bulk (owned=true, no copy). The bulk is reset if an inbound limit is
// exceeded.
func (b *bulkHandle) pushOwnedChunk(chunk []byte) error {
	return b.pushChunkWithOwnership(chunk, true)
}

// pushOwnedChunkNoReset queues an owned chunk but, when an inbound limit is
// exceeded, only reports errBulkBackpressureExceeded instead of resetting the
// bulk.
func (b *bulkHandle) pushOwnedChunkNoReset(chunk []byte) error {
	return b.pushChunkWithOwnershipOptions(chunk, true, false)
}

// pushOwnedChunkWithReleaseNoReset is pushOwnedChunkNoReset with an additional
// release callback that is stored alongside the chunk.
func (b *bulkHandle) pushOwnedChunkWithReleaseNoReset(chunk []byte, release func()) error {
	return b.pushChunkWithOwnershipOptionsAndRelease(chunk, true, false, release)
}

// pushChunkWithOwnership queues a chunk with the given ownership and with
// reset-on-overflow enabled.
func (b *bulkHandle) pushChunkWithOwnership(chunk []byte, owned bool) error {
	return b.pushChunkWithOwnershipOptions(chunk, owned, true)
}

// pushChunkWithOwnershipOptions queues a chunk with explicit ownership and
// overflow behavior and no release callback.
func (b *bulkHandle) pushChunkWithOwnershipOptions(chunk []byte, owned bool, resetOnOverflow bool) error {
	return b.pushChunkWithOwnershipOptionsAndRelease(chunk, owned, resetOnOverflow, nil)
}

// pushChunkWithOwnershipOptionsAndRelease appends an inbound chunk to the read
// queue and signals readers.
//
// Parameters:
//   - chunk: payload bytes; an empty chunk is a no-op (release is still called).
//   - owned: when false the data is copied and release is invoked immediately,
//     since the original slice is no longer referenced by the bulk.
//   - resetOnOverflow: when true, exceeding inboundQueueLimit or
//     inboundBytesLimit resets the bulk with errBulkBackpressureExceeded and
//     finalizes it; when false the error is returned and the bulk is left as is.
//   - release: optional callback stored with the chunk.
//
// Returns io.ErrClosedPipe for a nil receiver, the existing reset error if the
// bulk was already reset, errBulkBackpressureExceeded (or the reset error
// recorded by markResetLocked) on overflow, and nil on success.
func (b *bulkHandle) pushChunkWithOwnershipOptionsAndRelease(chunk []byte, owned bool, resetOnOverflow bool, release func()) error {
	if b == nil {
		return io.ErrClosedPipe
	}
	if len(chunk) == 0 {
		if release != nil {
			release()
		}
		return nil
	}
	stored := bulkReadChunk{data: chunk, release: release}
	if !owned {
		// Detach from the caller's slice; the original can be released now.
		stored.data = append([]byte(nil), chunk...)
		if stored.release != nil {
			stored.release()
			stored.release = nil
		}
	}
	b.mu.Lock()
	if b.resetErr != nil {
		err := b.resetErr
		b.mu.Unlock()
		// NOTE(review): stored.clear() is defined outside this view; presumably it
		// runs the release callback and drops the data reference — confirm.
		stored.clear()
		return err
	}
	// Chunk-count limit.
	if b.inboundQueueLimit > 0 && b.bufferedChunkCountLocked() >= b.inboundQueueLimit {
		if !resetOnOverflow {
			b.mu.Unlock()
			stored.clear()
			return errBulkBackpressureExceeded
		}
		err := b.markResetLocked(errBulkBackpressureExceeded)
		b.mu.Unlock()
		stored.clear()
		b.notifyReadable()
		b.finalize()
		return err
	}
	// Byte-count limit.
	if b.inboundBytesLimit > 0 && b.bufferedBytes+len(stored.data) > b.inboundBytesLimit {
		if !resetOnOverflow {
			b.mu.Unlock()
			stored.clear()
			return errBulkBackpressureExceeded
		}
		err := b.markResetLocked(errBulkBackpressureExceeded)
		b.mu.Unlock()
		stored.clear()
		b.notifyReadable()
		b.finalize()
		return err
	}
	b.readQueue = append(b.readQueue, stored)
	b.bufferedBytes += len(stored.data)
	b.notifyReadableLocked()
	b.mu.Unlock()
	return nil
}

// markResetLocked records the first reset error (nil is replaced by
// errBulkReset via bulkResetError), drops all buffered inbound data and closes
// the write state. Later calls keep the first error. It returns the recorded
// reset error, or io.ErrClosedPipe for a nil receiver. It touches guarded
// fields without locking; the caller in this file holds b.mu.
func (b *bulkHandle) markResetLocked(err error) error {
	if b == nil {
		return io.ErrClosedPipe
	}
	if b.resetErr == nil {
		b.resetErr = bulkResetError(err)
		b.clearBufferedDataLocked()
		b.closeWriteStateLocked()
	}
	return b.resetErr
}

// clearBufferedDataLocked clears the partially-read buffer and every queued
// chunk, then resets the queue, the buffer and the buffered byte counter.
// It does no locking of its own.
func (b *bulkHandle) clearBufferedDataLocked() {
	if b == nil {
		return
	}
	b.readBuf.clear()
	for i := range b.readQueue {
		b.readQueue[i].clear()
	}
	b.readQueue = nil
	b.readBuf = bulkReadChunk{}
	b.bufferedBytes = 0
}

// flowControlEnabled reports whether window-based flow control is active: a
// release sender is configured and at least one of windowBytes or maxInFlight
// is positive.
func (b *bulkHandle) flowControlEnabled() bool {
	if b == nil {
		return false
	}
	return b.releaseFn != nil && (b.windowBytes > 0 || b.maxInFlight > 0)
}

// releaseThresholdBytes returns the number of consumed bytes that must
// accumulate before a window release is scheduled: the chunk size (or
// defaultBulkChunkSize when unset), capped at windowBytes when that is set.
func (b *bulkHandle) releaseThresholdBytes() int64 {
	if b == nil {
		return int64(defaultBulkChunkSize)
	}
	threshold := b.chunkSize
	if threshold <= 0 {
		threshold = defaultBulkChunkSize
	}
	if b.windowBytes > 0 && threshold > b.windowBytes {
		threshold = b.windowBytes
	}
	if threshold <= 0 {
		threshold = defaultBulkChunkSize
	}
	return int64(threshold)
}

// maybeSendWindowRelease accounts for consumed bytes (each call with
// consumed > 0 counts as one chunk) and wakes the release worker once the
// pending bytes reach releaseThresholdBytes, or unconditionally when force is
// true. It is a no-op when flow control is disabled.
func (b *bulkHandle) maybeSendWindowRelease(consumed int, force bool) {
	if b == nil || !b.flowControlEnabled() {
		return
	}
	b.mu.Lock()
	if consumed > 0 {
		b.pendingReleaseBytes += int64(consumed)
		b.pendingReleaseChunks++
	}
	if !force && b.pendingReleaseBytes < b.releaseThresholdBytes() {
		b.mu.Unlock()
		return
	}
	b.mu.Unlock()
	b.scheduleWindowRelease()
}

// scheduleWindowRelease performs a non-blocking send on releaseNotify; if a
// wake-up is already pending the signal is dropped.
func (b *bulkHandle) scheduleWindowRelease() {
	if b == nil || b.releaseNotify == nil {
		return
	}
	select {
	case b.releaseNotify <- struct{}{}:
	default:
	}
}

// takePendingWindowRelease atomically reads and zeroes the pending release
// counters and returns them together with the configured release sender.
func (b *bulkHandle) takePendingWindowRelease() (int64, int, bulkReleaseSender) {
	if b == nil {
		return 0, 0, nil
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	bytes := b.pendingReleaseBytes
	chunks := b.pendingReleaseChunks
	release := b.releaseFn
	b.pendingReleaseBytes = 0
	b.pendingReleaseChunks = 0
	return bytes, chunks, release
}

// runWindowReleaseLoop is the release worker body. It waits for a wake-up on
// releaseNotify, then repeatedly takes the pending counters and passes them to
// the release sender until nothing is pending. It exits when the bulk context
// is done and closes releaseWorkerDone on return.
func (b *bulkHandle) runWindowReleaseLoop() {
	if b == nil {
		return
	}
	defer close(b.releaseWorkerDone)
	for {
		select {
		case <-b.Context().Done():
			return
		case <-b.releaseNotify:
		}
		for {
			bytes, chunks, release := b.takePendingWindowRelease()
			if release == nil || (bytes <= 0 && chunks <= 0) {
				break
			}
			// The sender's error is deliberately discarded here.
			_ = release(b, bytes, chunks)
		}
	}
}

// acquireOutboundWindow blocks until size bytes and chunks in-flight slots are
// available in the outbound window, then reserves them.
//
// It returns nil immediately when flow control is disabled or size <= 0. It
// returns the reset error if the bulk is reset, io.ErrClosedPipe if the local
// side or the peer's read side is closed, and on ctx cancellation either the
// current write-state error or the normalized ctx error.
func (b *bulkHandle) acquireOutboundWindow(ctx context.Context, size int, chunks int) error {
	if b == nil || size <= 0 || !b.flowControlEnabled() {
		return nil
	}
	if ctx == nil {
		ctx = context.Background()
	}
	need := int64(size)
	if chunks <= 0 {
		chunks = 1
	}
	for {
		b.mu.Lock()
		if b.resetErr != nil {
			err := b.resetErr
			b.mu.Unlock()
			return err
		}
		if b.localClosed || b.peerReadClosed {
			b.mu.Unlock()
			return io.ErrClosedPipe
		}
		bytesOK := true
		if b.windowBytes > 0 {
			bytesOK = b.outboundAvailBytes >= need
			// A payload larger than the whole window is admitted when nothing is
			// in flight; otherwise it could never be sent.
			if !bytesOK && need > int64(b.windowBytes) && b.outboundInFlight == 0 {
				bytesOK = true
			}
		}
		chunksOK := true
		if b.maxInFlight > 0 {
			chunksOK = b.outboundInFlight+chunks <= b.maxInFlight
		}
		if bytesOK && chunksOK {
			if b.windowBytes > 0 {
				b.outboundAvailBytes -= need
			}
			if b.maxInFlight > 0 {
				b.outboundInFlight += chunks
			}
			b.mu.Unlock()
			return nil
		}
		// Capture the channel under the lock, wait outside it, then re-check.
		notify := b.flowNotify
		b.mu.Unlock()
		select {
		case <-notify:
		case <-ctx.Done():
			if stateErr := b.writeStateErrorSnapshot(); stateErr != nil {
				return stateErr
			}
			return normalizeStreamDeadlineError(ctx.Err())
		}
	}
}

// rollbackOutboundWindow returns a reservation that was not used: size bytes
// are added back (capped at windowBytes) and chunks in-flight slots are freed
// (floored at zero). Waiters on the flow channel are notified.
func (b *bulkHandle) rollbackOutboundWindow(size int, chunks int) {
	if b == nil || size <= 0 || !b.flowControlEnabled() {
		return
	}
	if chunks <= 0 {
		chunks = 1
	}
	b.mu.Lock()
	if b.windowBytes > 0 {
		b.outboundAvailBytes += int64(size)
		maxAvail := int64(b.windowBytes)
		if b.outboundAvailBytes > maxAvail {
			b.outboundAvailBytes = maxAvail
		}
	}
	if b.maxInFlight > 0 && b.outboundInFlight > 0 {
		b.outboundInFlight -= chunks
		if b.outboundInFlight < 0 {
			b.outboundInFlight = 0
		}
	}
	b.notifyFlowLocked()
	b.mu.Unlock()
}

// releaseOutboundWindow credits the outbound window with bytes and chunks,
// clamping available bytes to windowBytes and in-flight chunks to zero, and
// notifies waiters on the flow channel.
func (b *bulkHandle) releaseOutboundWindow(bytes int64, chunks int) {
	if b == nil || !b.flowControlEnabled() {
		return
	}
	b.mu.Lock()
	if b.windowBytes > 0 && bytes > 0 {
		b.outboundAvailBytes += bytes
		maxAvail := int64(b.windowBytes)
		if b.outboundAvailBytes > maxAvail {
			b.outboundAvailBytes = maxAvail
		}
	}
	if b.maxInFlight > 0 && chunks > 0 {
		b.outboundInFlight -= chunks
		if b.outboundInFlight < 0 {
			b.outboundInFlight = 0
		}
	}
	b.notifyFlowLocked()
	b.mu.Unlock()
}

// bufferedChunkCountLocked returns the number of queued chunks plus one if the
// partially-read buffer still holds data. It does no locking of its own.
func (b *bulkHandle) bufferedChunkCountLocked() int {
	if b == nil {
		return 0
	}
	count := len(b.readQueue)
	if len(b.readBuf.data) > 0 {
		count++
	}
	return count
}

// shouldFinalizeLocked reports whether the bulk has reached a terminal state:
// it was reset, or (dedicated) both sides are closed, or (shared) the local
// read side is closed, or the remote is closed together with either the peer's
// read side or the local side. A nil receiver reports true.
func (b *bulkHandle) shouldFinalizeLocked() bool {
	if b == nil {
		return true
	}
	if b.resetErr != nil {
		return true
	}
	if b.dedicated {
		return b.localClosed && b.remoteClosed
	}
	return b.localReadClosed || (b.peerReadClosed && b.remoteClosed) || (b.localClosed && b.remoteClosed)
}

// snapshot builds a BulkSnapshot describing the bulk's identity, close state,
// buffering, tuning, counters, timestamps and binding/transport diagnostics.
// The dedicated-attach values are read through their own snapshot helpers
// before b.mu is taken; everything else is read while holding b.mu.
func (b *bulkHandle) snapshot() BulkSnapshot {
	if b == nil {
		return BulkSnapshot{}
	}
	dedicatedAttached := b.dedicatedAttachedSnapshot()
	dedicatedState, dedicatedAttempts, dedicatedLastCode, dedicatedDataStarted := b.dedicatedAttachDiagnosticsSnapshot()
	b.mu.Lock()
	defer b.mu.Unlock()
	snapshot := BulkSnapshot{
		ID:                b.id,
		DataID:            b.dataID,
		FastPathVersion:   normalizeBulkFastPathVersion(b.fastPathVersion),
		Scope:             normalizeFileScope(b.runtimeScope),
		Range:             b.rangeSpec,
		Metadata:          cloneBulkMetadata(b.metadata),
		Dedicated:         b.dedicated,
		DedicatedLaneID:   b.dedicatedLaneID,
		DedicatedAttached: dedicatedAttached,
		DedicatedAttachState: bulkDedicatedAttachStateName(
			dedicatedState,
		),
		DedicatedAttachAttempts: dedicatedAttempts,
		DedicatedAttachLastCode: dedicatedLastCode,
		DedicatedDataStarted:    dedicatedDataStarted,
		SessionEpoch:            b.sessionEpoch,
		TransportGeneration:     b.transportGeneration,
		LocalClosed:             b.localClosed,
		LocalReadClosed:         b.localReadClosed,
		RemoteClosed:            b.remoteClosed,
		PeerReadClosed:          b.peerReadClosed,
		BufferedChunks:          b.bufferedChunkCountLocked(),
		BufferedBytes:           b.bufferedBytes,
		ReadTimeout:             b.readTimeout,
		WriteTimeout:            b.writeTimeout,
		ChunkSize:               b.chunkSize,
		WindowBytes:             b.windowBytes,
		MaxInFlight:             b.maxInFlight,
		BytesRead:               b.bytesRead,
		BytesWritten:            b.bytesWritten,
		ReadCalls:               b.readCalls,
		WriteCalls:              b.writeCalls,
		OpenedAt:                b.createdAt,
		LastReadAt:              b.lastReadAt,
		LastWriteAt:             b.lastWriteAt,
	}
	if b.logical != nil {
		snapshot.LogicalClientID = b.logical.ID()
	}
	if b.resetErr != nil {
		snapshot.ResetError = b.resetErr.Error()
	}
	// Binding diagnostics come from the logical/transport pair when either is
	// set, otherwise from the client; with neither, diag stays zero-valued.
	var diag snapshotBindingDiagnostics
	switch {
	case b.logical != nil || b.transport != nil:
		diag = snapshotBindingDiagnosticsFromLogical(b.logical, b.transport, b.transportGeneration)
	case b.client != nil:
		diag = snapshotBindingDiagnosticsFromClient(b.client, b.sessionEpoch)
	}
	snapshot.BindingOwner = diag.BindingOwner
	snapshot.BindingAlive = diag.BindingAlive
	snapshot.BindingCurrent = diag.BindingCurrent
	snapshot.BindingReason = diag.BindingReason
	snapshot.BindingError = diag.BindingError
	snapshot.BindingBulkAdaptiveSoftPayloadBytes = diag.BindingBulkAdaptiveSoftPayloadBytes
	snapshot.TransportAttached = diag.TransportAttached
	snapshot.TransportHasRuntimeConn = diag.TransportHasRuntimeConn
	snapshot.TransportCurrent = diag.TransportCurrent
	snapshot.TransportDetachReason = diag.TransportDetachReason
	snapshot.TransportDetachKind = diag.TransportDetachKind
	snapshot.TransportDetachGeneration = diag.TransportDetachGeneration
	snapshot.TransportDetachError = diag.TransportDetachError
	snapshot.TransportDetachedAt = diag.TransportDetachedAt
	snapshot.ReattachEligible = diag.ReattachEligible
	return snapshot
}

// finalize tears the bulk down: it marks the dedicated attach as closed,
// forces a final window release wake-up, cancels the bulk and write contexts,
// stops the dedicated sender, gives back the client's dedicated active slot
// and lane, closes the dedicated connection when this bulk owns it, and
// removes the bulk from its runtime.
func (b *bulkHandle) finalize() {
	if b == nil {
		return
	}
	b.markDedicatedAttachClosed()
	b.maybeSendWindowRelease(0, true)
	if b.cancel != nil {
		b.cancel()
	}
	if b.writeCtxCancel != nil {
		b.writeCtxCancel()
	}
	if sender := b.clearDedicatedSender(); sender != nil {
		sender.stop()
	}
	if b.client != nil && b.releaseDedicatedActiveReserved() {
		b.client.releaseBulkDedicatedActiveSlot()
	}
	if b.client != nil {
		b.client.releaseBulkDedicatedLane(b.dedicatedLaneIDSnapshot())
	}
	if conn, owned := b.clearDedicatedConn(); conn != nil && owned {
		// Close error is ignored; the connection is being discarded.
		_ = conn.Close()
	}
	if b.runtime != nil {
		b.runtime.remove(b.runtimeScope, b.id)
	}
}

// recordReadLocked adds n to the read byte counter, bumps the read call count
// and stores now as the last read time. Calls with n <= 0 are ignored. It does
// no locking of its own.
func (b *bulkHandle) recordReadLocked(n int, now time.Time) {
	if b == nil || n <= 0 {
		return
	}
	b.bytesRead += int64(n)
	b.readCalls++
	b.lastReadAt = now
}

// recordWrite adds n to the written byte counter, bumps the write call count
// and stores now as the last write time, under b.mu. Calls with n <= 0 are
// ignored.
func (b *bulkHandle) recordWrite(n int, now time.Time) {
	if b == nil || n <= 0 {
		return
	}
	b.mu.Lock()
	b.bytesWritten += int64(n)
	b.writeCalls++
	b.lastWriteAt = now
	b.mu.Unlock()
}

// waitReadable blocks until notify fires, ctx is done, or the deadline derived
// from timeout (via streamEffectiveDeadline) passes.
//
// It returns nil when notify fires. When ctx is done it returns, in order of
// precedence: the reset error, io.ErrClosedPipe if the local read side is
// closed, nil if the remote side is closed, otherwise the ctx error (raw when
// no deadline applies, normalized when one does). An expired deadline yields
// the normalized context.DeadlineExceeded.
func (b *bulkHandle) waitReadable(ctx context.Context, notify <-chan struct{}, timeout time.Duration) error {
	if ctx == nil {
		ctx = context.Background()
	}
	deadline := streamEffectiveDeadline(time.Now(), timeout, time.Time{})
	if deadline.IsZero() {
		// No deadline: wait on notify and ctx only.
		select {
		case <-notify:
			return nil
		case <-ctx.Done():
			if resetErr := b.resetErrSnapshot(); resetErr != nil {
				return resetErr
			}
			if b.localReadClosedSnapshot() {
				return io.ErrClosedPipe
			}
			if b.remoteClosedSnapshot() {
				return nil
			}
			return ctx.Err()
		}
	}
	if !deadline.After(time.Now()) {
		return normalizeStreamDeadlineError(context.DeadlineExceeded)
	}
	timer := time.NewTimer(time.Until(deadline))
	defer timer.Stop()
	select {
	case <-notify:
		return nil
	case <-ctx.Done():
		if resetErr := b.resetErrSnapshot(); resetErr != nil {
			return resetErr
		}
		if b.localReadClosedSnapshot() {
			return io.ErrClosedPipe
		}
		if b.remoteClosedSnapshot() {
			return nil
		}
		return normalizeStreamDeadlineError(ctx.Err())
	case <-timer.C:
		return normalizeStreamDeadlineError(context.DeadlineExceeded)
	}
}

// resetErrSnapshot returns the current reset error under b.mu, or
// io.ErrClosedPipe for a nil receiver.
func (b *bulkHandle) resetErrSnapshot() error {
	if b == nil {
		return io.ErrClosedPipe
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.resetErr
}

// remoteClosedSnapshot returns remoteClosed under b.mu; a nil receiver reports
// true.
func (b *bulkHandle) remoteClosedSnapshot() bool {
	if b == nil {
		return true
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.remoteClosed
}

// localClosedSnapshot returns localClosed under b.mu; a nil receiver reports
// true.
func (b *bulkHandle) localClosedSnapshot() bool {
	if b == nil {
		return true
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.localClosed
}

// localReadClosedSnapshot returns localReadClosed under b.mu; a nil receiver
// reports true.
func (b *bulkHandle) localReadClosedSnapshot() bool {
	if b == nil {
		return true
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.localReadClosed
}

// writeStateErrorSnapshot returns the error that currently prevents writing:
// the reset error if set, io.ErrClosedPipe if the local side or the peer's
// read side is closed (or the receiver is nil), otherwise nil.
func (b *bulkHandle) writeStateErrorSnapshot() error {
	if b == nil {
		return io.ErrClosedPipe
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.resetErr != nil {
		return b.resetErr
	}
	if b.localClosed || b.peerReadClosed {
		return io.ErrClosedPipe
	}
	return nil
}

// notifyReadable takes b.mu and signals readers via notifyReadableLocked.
func (b *bulkHandle) notifyReadable() {
	if b == nil {
		return
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	b.notifyReadableLocked()
}

// notifyReadableLocked performs a non-blocking send on readNotify; the signal
// is dropped if one is already pending or the channel is nil.
func (b *bulkHandle) notifyReadableLocked() {
	if b == nil || b.readNotify == nil {
		return
	}
	select {
	case b.readNotify <- struct{}{}:
	default:
	}
}

// notifyFlowLocked performs a non-blocking send on flowNotify; the signal is
// dropped if one is already pending or the channel is nil.
func (b *bulkHandle) notifyFlowLocked() {
	if b == nil || b.flowNotify == nil {
		return
	}
	select {
	case b.flowNotify <- struct{}{}:
	default:
	}
}

// closeWriteStateLocked marks the write state closed exactly once and closes
// writeStateDone when that channel exists.
func (b *bulkHandle) closeWriteStateLocked() {
	if b == nil || b.writeStateClosed {
		return
	}
	b.writeStateClosed = true
	if b.writeStateDone != nil {
		close(b.writeStateDone)
	}
}

// bulkAsyncWriteQueueSize derives the async write queue capacity from
// maxInFlight: non-positive values use defaultBulkOpenMaxInFlight, and the
// result is clamped to the range [8, 128].
func bulkAsyncWriteQueueSize(maxInFlight int) int {
	if maxInFlight <= 0 {
		maxInFlight = defaultBulkOpenMaxInFlight
	}
	if maxInFlight < 8 {
		return 8
	}
	if maxInFlight > 128 {
		return 128
	}
	return maxInFlight
}

// getBulkAsyncWritePayload returns a byte slice of length size, reusing a
// buffer from bulkAsyncWritePayloadPool when its capacity suffices and
// allocating otherwise. A pooled buffer that is too small is not put back.
// It returns nil for size <= 0.
func getBulkAsyncWritePayload(size int) []byte {
	if size <= 0 {
		return nil
	}
	if pooled, ok := bulkAsyncWritePayloadPool.Get().([]byte); ok && cap(pooled) >= size {
		return pooled[:size]
	}
	return make([]byte, size)
}

// putBulkAsyncWritePayload returns buf to bulkAsyncWritePayloadPool with its
// length reset to zero. Buffers with zero capacity or more than 8 MiB of
// capacity are not pooled.
func putBulkAsyncWritePayload(buf []byte) {
	if cap(buf) == 0 || cap(buf) > 8*1024*1024 {
		return
	}
	bulkAsyncWritePayloadPool.Put(buf[:0])
}

// beginPendingAsyncWriteLocked registers one more pending async write. When the
// count goes from zero to one a fresh writeDrain channel is created for
// waiters. It does no locking of its own.
func (b *bulkHandle) beginPendingAsyncWriteLocked() {
	if b == nil {
		return
	}
	if b.pendingAsyncWrites == 0 {
		b.writeDrain = make(chan struct{})
	}
	b.pendingAsyncWrites++
}

// finishPendingAsyncWrite marks one pending async write as done and closes
// writeDrain when the count reaches zero.
func (b *bulkHandle) finishPendingAsyncWrite() {
	if b == nil {
		return
	}
	b.mu.Lock()
	if b.pendingAsyncWrites > 0 {
		b.pendingAsyncWrites--
		if b.pendingAsyncWrites == 0 && b.writeDrain != nil {
			close(b.writeDrain)
		}
	}
	b.mu.Unlock()
}

// waitPendingAsyncWrites blocks until all pending async writes have finished,
// ctx is done, or the bulk context is done.
//
// It returns nil when nothing is pending. After the drain it returns the
// current write-state error (possibly nil). On ctx cancellation it returns the
// write-state error if any, else the normalized ctx error; when the bulk
// context ends it returns the write-state error if any, else context.Canceled.
func (b *bulkHandle) waitPendingAsyncWrites(ctx context.Context) error {
	if b == nil {
		return io.ErrClosedPipe
	}
	if ctx == nil {
		ctx = context.Background()
	}
	b.mu.Lock()
	if b.pendingAsyncWrites == 0 {
		b.mu.Unlock()
		return nil
	}
	drain := b.writeDrain
	b.mu.Unlock()
	select {
	case <-ctx.Done():
		if err := b.writeStateErrorSnapshot(); err != nil {
			return err
		}
		return normalizeStreamDeadlineError(ctx.Err())
	case <-b.Context().Done():
		if err := b.writeStateErrorSnapshot(); err != nil {
			return err
		}
		return context.Canceled
	case <-drain:
		return b.writeStateErrorSnapshot()
	}
}

// enqueueAsyncWrite hands req to the async write worker.
//
// It fails fast with the reset error, io.ErrClosedPipe (local side or peer
// read side closed) or errBulkDataPathNotReady (no write queue). Otherwise the
// request is counted as pending and sent on the queue; if ctx or the bulk
// context ends first, the pending count is rolled back and an error is
// returned.
func (b *bulkHandle) enqueueAsyncWrite(ctx context.Context, req bulkAsyncWriteRequest) error {
	if b == nil {
		return io.ErrClosedPipe
	}
	if ctx == nil {
		ctx = context.Background()
	}
	b.mu.Lock()
	if b.resetErr != nil {
		err := b.resetErr
		b.mu.Unlock()
		return err
	}
	if b.localClosed || b.peerReadClosed {
		b.mu.Unlock()
		return io.ErrClosedPipe
	}
	queue := b.writeQueue
	if queue == nil {
		b.mu.Unlock()
		return errBulkDataPathNotReady
	}
	// Count the write as pending before releasing the lock so that a concurrent
	// waitPendingAsyncWrites observes it.
	b.beginPendingAsyncWriteLocked()
	b.mu.Unlock()
	select {
	case queue <- req:
		return nil
	case <-ctx.Done():
		b.finishPendingAsyncWrite()
		return normalizeStreamDeadlineError(ctx.Err())
	case <-b.Context().Done():
		b.finishPendingAsyncWrite()
		if err := b.writeStateErrorSnapshot(); err != nil {
			return err
		}
		return context.Canceled
	}
}

// runAsyncWriteLoop is the async write worker body. It processes queued
// requests until the bulk context is done, then drains whatever is still
// queued and returns. writeWorkerDone is closed on return.
func (b *bulkHandle) runAsyncWriteLoop() {
	if b == nil {
		return
	}
	defer close(b.writeWorkerDone)
	for {
		select {
		case <-b.Context().Done():
			b.drainPendingAsyncWrites()
			return
		case req := <-b.writeQueue:
			b.processAsyncWrite(req)
		}
	}
}

// drainPendingAsyncWrites discards every request currently in the write queue
// without sending it: the outbound window reservation is rolled back, the
// payload buffer is returned to the pool and the pending count is decremented.
func (b *bulkHandle) drainPendingAsyncWrites() {
	if b == nil || b.writeQueue == nil {
		return
	}
	for {
		select {
		case req := <-b.writeQueue:
			b.rollbackOutboundWindow(len(req.payload), req.chunks)
			putBulkAsyncWritePayload(req.payload)
			b.finishPendingAsyncWrite()
		default:
			return
		}
	}
}

// processAsyncWrite sends one queued request. The payload buffer is always
// returned to the pool and the pending count always decremented. If the bulk
// can no longer be written, or a write context cannot be created, the window
// reservation is rolled back. A send failure resets the bulk via markReset.
func (b *bulkHandle) processAsyncWrite(req bulkAsyncWriteRequest) {
	if b == nil {
		return
	}
	defer putBulkAsyncWritePayload(req.payload)
	defer b.finishPendingAsyncWrite()
	if len(req.payload) == 0 {
		return
	}
	if err := b.writeStateErrorSnapshot(); err != nil {
		b.rollbackOutboundWindow(len(req.payload), req.chunks)
		return
	}
	b.notifyAcceptStarted()
	b.mu.Lock()
	writeTimeout := b.writeTimeout
	bulkCtx := b.ctx
	b.mu.Unlock()
	sendCtx, cancel, err := b.newWriteContext(bulkCtx, writeTimeout)
	if err != nil {
		b.rollbackOutboundWindow(len(req.payload), req.chunks)
		// Only reset when the bulk is not already in an error/closed write state.
		if stateErr := b.writeStateErrorSnapshot(); stateErr == nil {
			b.markReset(err)
		}
		return
	}
	_, writeErr := b.executeSendWrite(sendCtx, req.startSeq, req.payload, req.chunks, true)
	cancel()
	if writeErr != nil {
		b.markReset(writeErr)
	}
}

// executeSendWrite sends payload through sendWriteFn and reconciles the
// outbound window with what was actually written.
//
// The reported count is clamped to [0, len(payload)] and recorded via
// recordWrite. The window reservation for any unwritten remainder is rolled
// back. Errors that canIgnoreDedicatedCloseSendError accepts are swallowed;
// other errors are passed through normalizeWriteError. A short write without
// an error yields io.ErrShortWrite. When the bulk is not writable or no send
// function is configured, the whole reservation is rolled back.
func (b *bulkHandle) executeSendWrite(ctx context.Context, startSeq uint64, payload []byte, chunks int, payloadOwned bool) (int, error) {
	if b == nil {
		return 0, io.ErrClosedPipe
	}
	if len(payload) == 0 {
		return 0, nil
	}
	if chunks <= 0 {
		chunks = 1
	}
	if err := b.writeStateErrorSnapshot(); err != nil {
		b.rollbackOutboundWindow(len(payload), chunks)
		return 0, err
	}
	b.mu.Lock()
	sendWriteFn := b.sendWriteFn
	chunkSize := b.chunkSize
	b.mu.Unlock()
	if sendWriteFn == nil {
		b.rollbackOutboundWindow(len(payload), chunks)
		return 0, errBulkDataPathNotReady
	}
	written, err := sendWriteFn(ctx, b, startSeq, payload, payloadOwned)
	if written < 0 {
		written = 0
	}
	if written > len(payload) {
		written = len(payload)
	}
	if written > 0 {
		b.recordWrite(written, time.Now())
	}
	if written < len(payload) {
		remaining := len(payload) - written
		b.rollbackOutboundWindow(remaining, bulkPayloadChunkCount(remaining, chunkSize))
	}
	if err != nil {
		if b.canIgnoreDedicatedCloseSendError(err) {
			return written, nil
		}
		return written, b.normalizeWriteError(err)
	}
	if written != len(payload) {
		return written, io.ErrShortWrite
	}
	return written, nil
}

// normalizeWriteError maps a send error to what callers should see: the
// current write-state error takes precedence, otherwise err is passed through
// normalizeStreamDeadlineError. A nil err yields nil.
func (b *bulkHandle) normalizeWriteError(err error) error {
	if err == nil {
		return nil
	}
	if stateErr := b.writeStateErrorSnapshot(); stateErr != nil {
		return stateErr
	}
	return normalizeStreamDeadlineError(err)
}

// writeStateDoneSnapshot returns the writeStateDone channel under b.mu, or nil
// for a nil receiver.
func (b *bulkHandle) writeStateDoneSnapshot() <-chan struct{} {
	if b == nil {
		return nil
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.writeStateDone
}

// newWriteContext builds the context for a single write. When parent is the
// bulk's own context and a dedicated write context exists, the write context
// is used as the parent instead. The timeout is applied via bulkWriteContext.
// If the bulk is already in a non-writable state the new context is cancelled
// and that state error is returned. The returned cancel function is never nil.
func (b *bulkHandle) newWriteContext(parent context.Context, timeout time.Duration) (context.Context, func(), error) {
	baseParent := parent
	if b != nil && parent == b.ctx && b.writeCtx != nil {
		baseParent = b.writeCtx
	}
	ctx, cancel, err := bulkWriteContext(baseParent, timeout)
	if err != nil {
		return nil, func() {}, err
	}
	if b == nil {
		return ctx, cancel, nil
	}
	if stateErr := b.writeStateErrorSnapshot(); stateErr != nil {
		cancel()
		return nil, func() {}, stateErr
	}
	return ctx, cancel, nil
}

// canIgnoreDedicatedCloseSendError reports whether a send error on a dedicated
// bulk is just a side effect of shutdown. That requires the bulk to be
// closing (context done, or remote/peer-read/local closed) and the error to be
// errTransportDetached, net.ErrClosed, io.ErrClosedPipe, or to carry a
// "broken pipe" / "use of closed network connection" message.
func (b *bulkHandle) canIgnoreDedicatedCloseSendError(err error) bool {
	if b == nil || !b.dedicated || err == nil {
		return false
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	if !(b.ctx.Err() != nil || b.remoteClosed || b.peerReadClosed || b.localClosed) {
		return false
	}
	if errors.Is(err, errTransportDetached) || errors.Is(err, net.ErrClosed) || errors.Is(err, io.ErrClosedPipe) {
		return true
	}
	// Fall back to message matching for errors that do not wrap a sentinel.
	message := strings.ToLower(err.Error())
	return strings.Contains(message, "broken pipe") || strings.Contains(message, "use of closed network connection")
}

// bulkWriteContext derives a write context from parent (context.Background
// when nil) and timeout. When streamEffectiveDeadline yields no deadline the
// parent is returned with a no-op cancel; when the deadline has already passed
// a normalized context.DeadlineExceeded is returned; otherwise a
// deadline-bound child context and its cancel function are returned.
func bulkWriteContext(parent context.Context, timeout time.Duration) (context.Context, func(), error) {
	if parent == nil {
		parent = context.Background()
	}
	deadline := streamEffectiveDeadline(time.Now(), timeout, time.Time{})
	if !deadline.IsZero() && !deadline.After(time.Now()) {
		return nil, func() {}, normalizeStreamDeadlineError(context.DeadlineExceeded)
	}
	if deadline.IsZero() {
		return parent, func() {}, nil
	}
	ctx, cancel := context.WithDeadline(parent, deadline)
	return ctx, cancel, nil
}

// normalizeBulkOpenRequest returns req with its range normalized, metadata
// cloned, fast-path version normalized, a dedicated lane ID of 1 when a
// dedicated request carries none, package defaults substituted for
// non-positive ChunkSize/WindowBytes/MaxInFlight, and negative timeouts
// replaced by the control timeout defaults.
func normalizeBulkOpenRequest(req BulkOpenRequest) BulkOpenRequest {
	req.Range = normalizeBulkRange(req.Range)
	req.Metadata = cloneBulkMetadata(req.Metadata)
	req.FastPathVersion = normalizeBulkFastPathVersion(req.FastPathVersion)
	if req.Dedicated && req.DedicatedLaneID == 0 {
		req.DedicatedLaneID = 1
	}
	if req.ChunkSize <= 0 {
		req.ChunkSize = defaultBulkChunkSize
	}
	if req.WindowBytes <= 0 {
		req.WindowBytes = defaultBulkOpenWindowBytes
	}
	if req.MaxInFlight <= 0 {
		req.MaxInFlight = defaultBulkOpenMaxInFlight
	}
	if req.ReadTimeout < 0 {
		req.ReadTimeout = defaultBulkControlReadTimeout
	}
	if req.WriteTimeout < 0 {
		req.WriteTimeout = defaultBulkControlWriteTimeout
	}
	return req
}

// normalizeBulkOpenOptions returns a cleaned copy of opt: the mode is
// validated, BulkOpenModeDefault is resolved from the deprecated Dedicated
// flag, Dedicated is recomputed from the resolved mode, the range is
// normalized, metadata is cloned and negative timeouts are replaced by the
// control timeout defaults. ChunkSize, WindowBytes and MaxInFlight are copied
// unchanged.
func normalizeBulkOpenOptions(opt BulkOpenOptions) BulkOpenOptions {
	mode := normalizeBulkOpenMode(opt.Mode)
	switch mode {
	case BulkOpenModeDefault:
		// Preserve legacy behavior when Mode is not explicitly set.
		if opt.Dedicated {
			mode = BulkOpenModeDedicated
		} else {
			mode = BulkOpenModeShared
		}
	}
	readTimeout := opt.ReadTimeout
	if readTimeout < 0 {
		readTimeout = defaultBulkControlReadTimeout
	}
	writeTimeout := opt.WriteTimeout
	if writeTimeout < 0 {
		writeTimeout = defaultBulkControlWriteTimeout
	}
	return BulkOpenOptions{
		ID:           opt.ID,
		Range:        normalizeBulkRange(opt.Range),
		Metadata:     cloneBulkMetadata(opt.Metadata),
		ReadTimeout:  readTimeout,
		WriteTimeout: writeTimeout,
		Mode:         mode,
		Dedicated:    mode == BulkOpenModeDedicated,
		ChunkSize:    opt.ChunkSize,
		WindowBytes:  opt.WindowBytes,
		MaxInFlight:  opt.MaxInFlight,
	}
}

// normalizeBulkOpenMode returns mode when it is one of the declared
// BulkOpenMode values and BulkOpenModeDefault otherwise.
func normalizeBulkOpenMode(mode BulkOpenMode) BulkOpenMode {
	switch mode {
	case BulkOpenModeDefault, BulkOpenModeAuto, BulkOpenModeShared, BulkOpenModeDedicated:
		return mode
	default:
		return BulkOpenModeDefault
	}
}

// initialBulkDedicatedAttachState returns the starting attach state: pending
// for dedicated bulks, attached for everything else.
func initialBulkDedicatedAttachState(dedicated bool) bulkDedicatedAttachState {
	if dedicated {
		return bulkDedicatedAttachStatePending
	}
	return bulkDedicatedAttachStateAttached
}

// bulkPayloadChunkCount returns how many chunks of chunkSize are needed to
// carry payloadLen bytes (rounded up). It returns 0 for payloadLen <= 0 and
// uses defaultBulkChunkSize when chunkSize is not positive.
func bulkPayloadChunkCount(payloadLen int, chunkSize int) int {
	if payloadLen <= 0 {
		return 0
	}
	if chunkSize <= 0 {
		chunkSize = defaultBulkChunkSize
	}
	return (payloadLen + chunkSize - 1) / chunkSize
}

// bulkDedicatedAttachStateName returns the lowercase name of state, or
// "unknown" for an undeclared value.
func bulkDedicatedAttachStateName(state bulkDedicatedAttachState) string {
	switch state {
	case bulkDedicatedAttachStatePending:
		return "pending"
	case bulkDedicatedAttachStateAttached:
		return "attached"
	case bulkDedicatedAttachStateDegraded:
		return "degraded"
	case bulkDedicatedAttachStateClosed:
		return "closed"
	default:
		return "unknown"
	}
}

// bulkOpenModeName returns the lowercase name of mode after normalization;
// BulkOpenModeDefault and any undeclared value map to "default".
func bulkOpenModeName(mode BulkOpenMode) string {
	switch normalizeBulkOpenMode(mode) {
	case BulkOpenModeAuto:
		return "auto"
	case BulkOpenModeShared:
		return "shared"
	case BulkOpenModeDedicated:
		return "dedicated"
	case BulkOpenModeDefault:
		fallthrough
	default:
		return "default"
	}
}

// normalizeBulkRange collapses any negative Offset or Length to -1 and leaves
// non-negative values unchanged.
func normalizeBulkRange(r BulkRange) BulkRange {
	if r.Offset < 0 {
		r.Offset = -1
	}
	if r.Length < 0 {
		r.Length = -1
	}
	return r
}

// validBulkRange reports whether both Offset and Length are non-negative.
func validBulkRange(r BulkRange) bool {
	return r.Offset >= 0 && r.Length >= 0
}

// cloneBulkMetadata returns an independent copy of src, or nil when src is
// nil or empty.
func cloneBulkMetadata(src BulkMetadata) BulkMetadata {
	if len(src) == 0 {
		return nil
	}
	dst := make(BulkMetadata, len(src))
	for key, value := range src {
		dst[key] = value
	}
	return dst
}

// bulkResetError returns err, substituting errBulkReset when err is nil.
func bulkResetError(err error) error {
	if err == nil {
		return errBulkReset
	}
	return err
}

// bulkResetMessage returns err's message, or the empty string when err is nil.
func bulkResetMessage(err error) string {
	if err == nil {
		return ""
	}
	return err.Error()
}