notify/bulk.go

1466 lines
33 KiB
Go
Raw Permalink Normal View History

package notify
import (
"context"
"errors"
"io"
"net"
"strings"
"sync"
"time"
)
// Bulk signal keys and the default tuning values applied when a request
// leaves a setting unset.
const (
// Signal key strings for the bulk open, close, reset and release messages.
// NOTE(review): how these keys are dispatched is not visible in this file.
BulkOpenSignalKey = "notify.bulk.open"
BulkCloseSignalKey = "notify.bulk.close"
BulkResetSignalKey = "notify.bulk.reset"
BulkReleaseSignalKey = "notify.bulk.release"
// defaultBulkChunkSize (1 MiB) replaces a non-positive chunk size.
defaultBulkChunkSize = 1024 * 1024
// defaultBulkInboundQueueLimit caps the number of buffered inbound chunks
// per handle.
defaultBulkInboundQueueLimit = 256
// defaultBulkInboundBytesLimit (64 MiB) caps buffered inbound bytes per
// handle.
defaultBulkInboundBytesLimit = 64 * 1024 * 1024
// defaultBulkOpenWindowBytes (16 MiB) replaces a non-positive window size.
defaultBulkOpenWindowBytes = 16 * 1024 * 1024
// defaultBulkOpenMaxInFlight replaces a non-positive in-flight chunk limit.
defaultBulkOpenMaxInFlight = 32
// The timeout defaults replace negative request timeouts.
// NOTE(review): zero presumably means "no timeout" — confirm against
// streamEffectiveDeadline.
defaultBulkControlReadTimeout = 0
defaultBulkControlWriteTimeout = 0
)
// BulkMetadata is a set of string key/value pairs attached to a bulk transfer.
type BulkMetadata map[string]string
// BulkRange describes a span as an offset plus a length. validBulkRange
// accepts a range only when both fields are non-negative.
type BulkRange struct {
Offset int64
Length int64
}
// BulkOpenOptions carries the caller-supplied settings for opening a bulk
// transfer. normalizeBulkOpenOptions replaces non-positive ChunkSize,
// WindowBytes and MaxInFlight values, and negative timeouts, with the package
// defaults.
type BulkOpenOptions struct {
ID string
Range BulkRange
Metadata BulkMetadata
ReadTimeout time.Duration
WriteTimeout time.Duration
// Dedicated requests a dedicated connection for the transfer; the support
// checks in this file reject it on packet (non-stream) transports.
Dedicated bool
ChunkSize int
WindowBytes int
MaxInFlight int
}
// BulkAcceptInfo bundles the identity, range, metadata and connection
// bindings of a bulk transfer together with its Bulk handle.
// NOTE(review): presumably passed to the accept handler — the call site is
// not visible in this file.
type BulkAcceptInfo struct {
ID string
Range BulkRange
Metadata BulkMetadata
Dedicated bool
LogicalConn *LogicalConn
TransportConn *TransportConn
TransportGeneration uint64
Bulk Bulk
}
// Bulk is the public view of one bulk transfer: a byte stream
// (io.Reader/io.Writer/io.Closer) plus accessors for its identity and
// connection bindings.
type Bulk interface {
io.Reader
io.Writer
io.Closer
ID() string
Range() BulkRange
Metadata() BulkMetadata
Context() context.Context
LogicalConn() *LogicalConn
TransportConn() *TransportConn
TransportGeneration() uint64
// CloseWrite closes only the local write side.
CloseWrite() error
// Reset aborts the transfer with the given error (a default reset error
// is used when it is nil).
Reset(error) error
Snapshot() BulkSnapshot
}
// Sentinel errors used by the bulk subsystem. Compare with errors.Is.
var (
errBulkClientNil = errors.New("bulk client is nil")
errBulkServerNil = errors.New("bulk server is nil")
errBulkLogicalConnNil = errors.New("bulk logical connection is nil")
errBulkTransportNil = errors.New("bulk transport connection is nil")
errBulkRuntimeNil = errors.New("bulk runtime is nil")
errBulkIDEmpty = errors.New("bulk id is empty")
errBulkAlreadyExists = errors.New("bulk already exists")
errBulkNotFound = errors.New("bulk not found")
errBulkHandlerNotConfigured = errors.New("bulk handler is not configured")
errBulkRejected = errors.New("bulk open rejected")
// errBulkReset is the default reset cause when Reset is given a nil error.
errBulkReset = errors.New("bulk reset")
errBulkDataIDEmpty = errors.New("bulk data id is empty")
// errBulkDataPathNotReady is returned by Write when no data sender is set.
errBulkDataPathNotReady = errors.New("bulk data path is not implemented yet")
errBulkRangeInvalid = errors.New("bulk range is invalid")
// errBulkBackpressureExceeded is raised when the inbound queue or byte
// limit of a handle would be exceeded.
errBulkBackpressureExceeded = errors.New("bulk inbound backpressure exceeded")
// errBulkDedicatedStreamOnly rejects dedicated bulk on packet transports.
errBulkDedicatedStreamOnly = errors.New("dedicated bulk requires stream transport")
)
// clientDedicatedBulkSupportError reports why the client cannot use dedicated
// bulk, or nil when it can. A nil client yields errBulkClientNil; a packet
// transport connection or a UDP connect source yields
// errBulkDedicatedStreamOnly.
func clientDedicatedBulkSupportError(c *ClientCommon) error {
	if c == nil {
		return errBulkClientNil
	}
	conn := c.clientTransportConnSnapshot()
	if conn != nil && isPacketTransportConn(conn) {
		return errBulkDedicatedStreamOnly
	}
	source := c.clientConnectSourceSnapshot()
	if source == nil || !source.isUDP() {
		return nil
	}
	return errBulkDedicatedStreamOnly
}
// logicalDedicatedBulkSupportError reports why the logical connection cannot
// use dedicated bulk, or nil when it can. When a transport is currently
// attached the decision is delegated to transportDedicatedBulkSupportError;
// otherwise only the remote address network is inspected.
func logicalDedicatedBulkSupportError(logical *LogicalConn) error {
	if logical == nil {
		return errBulkLogicalConnNil
	}
	transport := logical.CurrentTransportConn()
	if transport != nil {
		return transportDedicatedBulkSupportError(transport)
	}
	addr := logical.RemoteAddr()
	if addr == nil || !isPacketNetwork(addr.Network()) {
		return nil
	}
	return errBulkDedicatedStreamOnly
}
// transportDedicatedBulkSupportError reports why the transport cannot carry
// dedicated bulk, or nil when it can. The transport must be non-nil, must use
// a stream transport, and must not have a packet-network remote address.
func transportDedicatedBulkSupportError(transport *TransportConn) error {
	switch {
	case transport == nil:
		return errBulkTransportNil
	case !transport.UsesStreamTransport():
		return errBulkDedicatedStreamOnly
	}
	addr := transport.RemoteAddr()
	if addr == nil || !isPacketNetwork(addr.Network()) {
		return nil
	}
	return errBulkDedicatedStreamOnly
}
// bulkCloseSender is the hook close() calls; the bool is the "full" flag
// (true for Close, false for CloseWrite).
type bulkCloseSender func(context.Context, *bulkHandle, bool) error
// bulkResetSender is the hook Reset calls; the string is the reset message.
type bulkResetSender func(context.Context, *bulkHandle, string) error
// bulkDataSender sends one chunk for the handle and reports only an error.
type bulkDataSender func(context.Context, *bulkHandle, []byte) error
// bulkWriteSender sends a part on the dedicated write path and reports how
// many bytes were written.
type bulkWriteSender func(context.Context, *bulkHandle, []byte) (int, error)
// bulkReleaseSender reports consumed bytes and chunks (in that order) so the
// outbound window can be replenished.
type bulkReleaseSender func(*bulkHandle, int64, int) error
// bulkHandle is the concrete state behind one bulk transfer. It holds the
// immutable open parameters, the sender hooks, the inbound read queue, the
// outbound flow-control counters and the optional dedicated connection.
type bulkHandle struct {
// Identity and open parameters; set by newBulkHandle.
runtime *bulkRuntime
runtimeScope string
id string
dataID uint64
// outboundSeq is incremented under mu by nextOutboundDataSeq.
outboundSeq uint64
rangeSpec BulkRange
metadata BulkMetadata
sessionEpoch uint64
// client is set via setClientSnapshotOwner and read by snapshot().
client *ClientCommon
logical *LogicalConn
transport *TransportConn
transportGeneration uint64
readTimeout time.Duration
writeTimeout time.Duration
dedicated bool
// dedicatedAttachToken is read and written under mu.
dedicatedAttachToken string
chunkSize int
windowBytes int
maxInFlight int
inboundQueueLimit int
inboundBytesLimit int
// Sender hooks; any of them may be nil.
closeFn bulkCloseSender
resetFn bulkResetSender
sendDataFn bulkDataSender
sendWriteFn bulkWriteSender
releaseFn bulkReleaseSender
// ctx is cancelled by finalize via cancel.
ctx context.Context
cancel context.CancelFunc
createdAt time.Time
// writeMu is held for the duration of Write and close.
writeMu sync.Mutex
// mu guards the state, queue, flow-control and statistics fields below.
mu sync.Mutex
localClosed bool
localReadClosed bool
remoteClosed bool
peerReadClosed bool
resetErr error
// readQueue holds whole inbound chunks; readBuf is the chunk currently
// being consumed by Read. bufferedBytes counts bytes across both.
readQueue [][]byte
readBuf []byte
bufferedBytes int
// readNotify and flowNotify are 1-buffered wake-up channels for blocked
// readers and for writers waiting on the outbound window.
readNotify chan struct{}
flowNotify chan struct{}
// pendingRelease* accumulate consumed data not yet reported via releaseFn.
pendingReleaseBytes int64
pendingReleaseChunks int
// outboundAvailBytes and outboundInFlight track the outbound window.
outboundAvailBytes int64
outboundInFlight int
bytesRead int64
bytesWritten int64
readCalls int64
writeCalls int64
lastReadAt time.Time
lastWriteAt time.Time
// dedicatedMu guards dedicatedConn, dedicatedSender and
// dedicatedWriteClosed. dedicatedReady is closed once a dedicated
// connection has been attached.
dedicatedMu sync.Mutex
dedicatedConn net.Conn
dedicatedSender *bulkDedicatedSender
dedicatedReady chan struct{}
dedicatedWriteClosed bool
// acceptMu guards acceptDispatched.
acceptMu sync.Mutex
acceptDispatched bool
}
// newBulkHandle builds a bulkHandle from a (normalized) open request.
//
// A nil parent is replaced by context.Background(); the handle owns a
// cancellable child of it. When transportGeneration is zero it is taken from
// the transport, and if still zero from the logical connection. The inbound
// queue and byte limits are set to the package defaults, and the outbound
// window starts fully available.
func newBulkHandle(parent context.Context, runtime *bulkRuntime, runtimeScope string, req BulkOpenRequest, sessionEpoch uint64, logical *LogicalConn, transport *TransportConn, transportGeneration uint64, closeFn bulkCloseSender, resetFn bulkResetSender, sendDataFn bulkDataSender, sendWriteFn bulkWriteSender, releaseFn bulkReleaseSender) *bulkHandle {
	if parent == nil {
		parent = context.Background()
	}
	handleCtx, cancelHandle := context.WithCancel(parent)

	generation := transportGeneration
	if generation == 0 && transport != nil {
		generation = transport.TransportGeneration()
	}
	if generation == 0 && logical != nil {
		generation = logical.transportGenerationSnapshot()
	}

	req = normalizeBulkOpenRequest(req)

	h := new(bulkHandle)

	// Identity and bindings.
	h.runtime = runtime
	h.runtimeScope = runtimeScope
	h.id = req.BulkID
	h.dataID = req.DataID
	h.rangeSpec = req.Range
	h.metadata = cloneBulkMetadata(req.Metadata)
	h.sessionEpoch = sessionEpoch
	h.logical = logical
	h.transport = transport
	h.transportGeneration = generation

	// Tuning taken from the normalized request.
	h.readTimeout = req.ReadTimeout
	h.writeTimeout = req.WriteTimeout
	h.dedicated = req.Dedicated
	h.dedicatedAttachToken = req.AttachToken
	h.chunkSize = req.ChunkSize
	h.windowBytes = req.WindowBytes
	h.maxInFlight = req.MaxInFlight
	h.inboundQueueLimit = defaultBulkInboundQueueLimit
	h.inboundBytesLimit = defaultBulkInboundBytesLimit

	// Sender hooks.
	h.closeFn = closeFn
	h.resetFn = resetFn
	h.sendDataFn = sendDataFn
	h.sendWriteFn = sendWriteFn
	h.releaseFn = releaseFn

	// Lifecycle and signalling.
	h.ctx = handleCtx
	h.cancel = cancelHandle
	h.createdAt = time.Now()
	h.readNotify = make(chan struct{}, 1)
	h.flowNotify = make(chan struct{}, 1)
	h.dedicatedReady = make(chan struct{})
	h.outboundAvailBytes = int64(req.WindowBytes)
	return h
}
// ID returns the bulk identifier, or "" for a nil handle.
func (b *bulkHandle) ID() string {
	if b != nil {
		return b.id
	}
	return ""
}
// Range returns the range the bulk was opened with, or the zero BulkRange for
// a nil handle.
func (b *bulkHandle) Range() BulkRange {
	var spec BulkRange
	if b != nil {
		spec = b.rangeSpec
	}
	return spec
}
// Metadata returns a copy of the bulk metadata (nil for a nil handle), so
// callers cannot mutate the handle's own map.
func (b *bulkHandle) Metadata() BulkMetadata {
	if b != nil {
		return cloneBulkMetadata(b.metadata)
	}
	return nil
}
// Context returns the handle's context. A nil handle, or a handle without a
// context, yields context.Background().
func (b *bulkHandle) Context() context.Context {
	if b != nil && b.ctx != nil {
		return b.ctx
	}
	return context.Background()
}
// LogicalConn returns the logical connection the bulk is bound to, or nil.
func (b *bulkHandle) LogicalConn() *LogicalConn {
	if b != nil {
		return b.logical
	}
	return nil
}
// TransportConn returns the transport connection the bulk is bound to, or nil.
func (b *bulkHandle) TransportConn() *TransportConn {
	if b != nil {
		return b.transport
	}
	return nil
}
// TransportGeneration returns the transport generation recorded when the
// handle was created, or 0 for a nil handle.
func (b *bulkHandle) TransportGeneration() uint64 {
	if b != nil {
		return b.transportGeneration
	}
	return 0
}
// Dedicated reports whether the bulk was opened in dedicated mode; a nil
// handle is not dedicated.
func (b *bulkHandle) Dedicated() bool {
	return b != nil && b.dedicated
}
// dedicatedAttachTokenSnapshot returns the current attach token, read under
// mu. A nil handle yields "".
func (b *bulkHandle) dedicatedAttachTokenSnapshot() string {
	if b == nil {
		return ""
	}
	b.mu.Lock()
	token := b.dedicatedAttachToken
	b.mu.Unlock()
	return token
}
// setDedicatedAttachToken stores the attach token under mu. It is a no-op on
// a nil handle.
func (b *bulkHandle) setDedicatedAttachToken(token string) {
	if b == nil {
		return
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	b.dedicatedAttachToken = token
}
// dedicatedConnSnapshot returns the currently attached dedicated connection
// (nil when none is attached or the handle is nil), read under dedicatedMu.
func (b *bulkHandle) dedicatedConnSnapshot() net.Conn {
	if b == nil {
		return nil
	}
	b.dedicatedMu.Lock()
	conn := b.dedicatedConn
	b.dedicatedMu.Unlock()
	return conn
}
// dedicatedSenderSnapshot returns the installed dedicated sender (nil when
// none is installed or the handle is nil), read under dedicatedMu.
func (b *bulkHandle) dedicatedSenderSnapshot() *bulkDedicatedSender {
	if b == nil {
		return nil
	}
	b.dedicatedMu.Lock()
	sender := b.dedicatedSender
	b.dedicatedMu.Unlock()
	return sender
}
// installDedicatedSender installs sender unless one is already present and
// returns whichever sender ends up installed. A nil handle or nil sender
// returns nil without changing anything.
func (b *bulkHandle) installDedicatedSender(sender *bulkDedicatedSender) *bulkDedicatedSender {
	if b == nil || sender == nil {
		return nil
	}
	b.dedicatedMu.Lock()
	defer b.dedicatedMu.Unlock()
	// First installer wins; later callers get the existing sender back.
	if b.dedicatedSender == nil {
		b.dedicatedSender = sender
	}
	return b.dedicatedSender
}
// clearDedicatedSender detaches the dedicated sender and returns the one that
// was installed (nil if there was none or the handle is nil).
func (b *bulkHandle) clearDedicatedSender() *bulkDedicatedSender {
	if b == nil {
		return nil
	}
	b.dedicatedMu.Lock()
	previous := b.dedicatedSender
	b.dedicatedSender = nil
	b.dedicatedMu.Unlock()
	return previous
}
// dedicatedAttachedSnapshot reports whether a dedicated connection is
// currently attached.
func (b *bulkHandle) dedicatedAttachedSnapshot() bool {
	conn := b.dedicatedConnSnapshot()
	return conn != nil
}
// waitDedicatedReady blocks until a dedicated connection has been attached.
//
// It returns nil immediately for a nil handle, a non-dedicated handle, or a
// handle that already has a connection attached. Otherwise it waits for the
// ready signal, for ctx to finish (returning ctx.Err()), or for the handle's
// own context to finish (returning the handle's write-state error if there is
// one, else context.Canceled). A nil ctx is treated as context.Background().
func (b *bulkHandle) waitDedicatedReady(ctx context.Context) error {
	if b == nil {
		return nil
	}
	if !b.Dedicated() {
		return nil
	}
	if b.dedicatedAttachedSnapshot() {
		return nil
	}
	if ctx == nil {
		ctx = context.Background()
	}
	select {
	case <-b.dedicatedReady:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	case <-b.Context().Done():
		stateErr := b.writeStateErrorSnapshot()
		if stateErr == nil {
			return context.Canceled
		}
		return stateErr
	}
}
// attachDedicatedConn binds conn as the handle's dedicated connection and
// signals waiters in waitDedicatedReady.
//
// It returns io.ErrClosedPipe for a nil handle, net.ErrClosed for a nil conn,
// and an error when a connection is already attached.
//
// The ready channel is closed while dedicatedMu is still held. Doing the
// "already closed?" check and the close outside the lock lets two attach
// calls separated by a clearDedicatedConn both observe the channel as open
// and close it twice, which panics.
func (b *bulkHandle) attachDedicatedConn(conn net.Conn) error {
	if b == nil {
		return io.ErrClosedPipe
	}
	if conn == nil {
		return net.ErrClosed
	}
	b.dedicatedMu.Lock()
	defer b.dedicatedMu.Unlock()
	if b.dedicatedConn != nil {
		return errors.New("bulk dedicated conn already attached")
	}
	b.dedicatedConn = conn
	b.dedicatedWriteClosed = false
	if ready := b.dedicatedReady; ready != nil {
		select {
		case <-ready:
			// Already signalled by an earlier attach.
		default:
			close(ready)
		}
	}
	return nil
}
// bestEffortCloseDedicatedWriteHalf half-closes the write side of the
// dedicated connection when the connection supports CloseWrite. Failures are
// ignored; on success the write-closed flag is recorded, but only if the same
// connection is still attached.
func (b *bulkHandle) bestEffortCloseDedicatedWriteHalf() {
	if b == nil || !b.dedicated {
		return
	}
	b.dedicatedMu.Lock()
	conn, done := b.dedicatedConn, b.dedicatedWriteClosed
	b.dedicatedMu.Unlock()
	if conn == nil || done {
		return
	}
	type halfCloser interface {
		CloseWrite() error
	}
	hc, supported := conn.(halfCloser)
	if !supported {
		return
	}
	if err := hc.CloseWrite(); err != nil {
		return
	}
	b.dedicatedMu.Lock()
	defer b.dedicatedMu.Unlock()
	// The connection may have been swapped or cleared while unlocked.
	if b.dedicatedConn == conn {
		b.dedicatedWriteClosed = true
	}
}
// dedicatedWriteHalfClosedSnapshot reports whether the dedicated connection's
// write half has been recorded as closed. A nil handle reports false.
func (b *bulkHandle) dedicatedWriteHalfClosedSnapshot() bool {
	if b == nil {
		return false
	}
	b.dedicatedMu.Lock()
	closed := b.dedicatedWriteClosed
	b.dedicatedMu.Unlock()
	return closed
}
// setClientSnapshotOwner records the client used by snapshot() to derive
// binding diagnostics. It is a no-op on a nil handle.
//
// The field is written under mu because snapshot() reads it under mu; an
// unlocked write would race with a concurrent Snapshot call.
func (b *bulkHandle) setClientSnapshotOwner(client *ClientCommon) {
	if b == nil {
		return
	}
	b.mu.Lock()
	b.client = client
	b.mu.Unlock()
}
// clearDedicatedConn detaches the dedicated connection, resets the
// write-closed flag, and returns the connection that was attached (nil if
// none). The caller is responsible for closing the returned connection.
func (b *bulkHandle) clearDedicatedConn() net.Conn {
	if b == nil {
		return nil
	}
	b.dedicatedMu.Lock()
	defer b.dedicatedMu.Unlock()
	detached := b.dedicatedConn
	b.dedicatedConn, b.dedicatedWriteClosed = nil, false
	return detached
}
// markAcceptDispatched flags the handle as dispatched and reports whether
// this call was the first to do so. A nil handle reports false.
func (b *bulkHandle) markAcceptDispatched() bool {
	if b == nil {
		return false
	}
	b.acceptMu.Lock()
	first := !b.acceptDispatched
	b.acceptDispatched = true
	b.acceptMu.Unlock()
	return first
}
// SessionEpoch returns the session epoch recorded at creation, or 0 for a nil
// handle.
func (b *bulkHandle) SessionEpoch() uint64 {
	if b != nil {
		return b.sessionEpoch
	}
	return 0
}
// acceptsClientSessionEpoch reports whether epoch is compatible with the
// handle. A zero epoch on either side acts as a wildcard; otherwise the two
// must be equal. A nil handle accepts nothing.
func (b *bulkHandle) acceptsClientSessionEpoch(epoch uint64) bool {
	switch {
	case b == nil:
		return false
	case b.sessionEpoch == 0, epoch == 0:
		return true
	default:
		return b.sessionEpoch == epoch
	}
}
// acceptsTransportGeneration reports whether the transport belongs to the
// generation the handle was created for. A zero handle generation or a nil
// transport is accepted unconditionally; a nil handle accepts nothing.
func (b *bulkHandle) acceptsTransportGeneration(transport *TransportConn) bool {
	switch {
	case b == nil:
		return false
	case b.transportGeneration == 0, transport == nil:
		return true
	default:
		return transport.TransportGeneration() == b.transportGeneration
	}
}
// dataIDSnapshot returns the handle's data ID, or 0 for a nil handle.
func (b *bulkHandle) dataIDSnapshot() uint64 {
	if b != nil {
		return b.dataID
	}
	return 0
}
// nextOutboundDataSeq increments the outbound sequence counter under mu and
// returns the new value, so the first call returns 1. A nil handle returns 0.
func (b *bulkHandle) nextOutboundDataSeq() uint64 {
	if b == nil {
		return 0
	}
	b.mu.Lock()
	b.outboundSeq++
	seq := b.outboundSeq
	b.mu.Unlock()
	return seq
}
// Read implements io.Reader over the inbound chunk queue.
//
// Buffered data is always served first. When nothing is buffered the outcome
// is, in order of precedence: io.ErrClosedPipe if the local read side is
// closed, the reset error if the handle was reset, io.EOF if the remote side
// closed, otherwise Read blocks in waitReadable until data arrives, the
// handle's context ends, or the read timeout expires. An empty p returns
// (0, nil); a nil handle returns io.ErrClosedPipe.
//
// Every return path reports consumed bytes to maybeSendWindowRelease; the
// terminal paths force a flush of any pending release.
func (b *bulkHandle) Read(p []byte) (int, error) {
	if len(p) == 0 {
		return 0, nil
	}
	if b == nil {
		return 0, io.ErrClosedPipe
	}
	for {
		b.mu.Lock()
		localReadClosed := b.localReadClosed
		if len(b.readBuf) > 0 {
			n := copy(p, b.readBuf)
			b.readBuf = b.readBuf[n:]
			if len(b.readBuf) == 0 {
				// Drop the drained chunk so its backing array is not pinned
				// while the handle sits idle waiting for more data.
				b.readBuf = nil
			}
			b.bufferedBytes -= n
			if b.bufferedBytes < 0 {
				b.bufferedBytes = 0
			}
			b.recordReadLocked(n, time.Now())
			b.mu.Unlock()
			b.maybeSendWindowRelease(n, false)
			return n, nil
		}
		if len(b.readQueue) > 0 {
			// Promote the next queued chunk to the active read buffer.
			b.readBuf = b.readQueue[0]
			b.readQueue[0] = nil
			b.readQueue = b.readQueue[1:]
			if len(b.readQueue) == 0 {
				// Release the queue's backing array once it is empty.
				b.readQueue = nil
			}
			b.mu.Unlock()
			continue
		}
		resetErr := b.resetErr
		remoteClosed := b.remoteClosed
		notify := b.readNotify
		ctx := b.ctx
		readTimeout := b.readTimeout
		b.mu.Unlock()
		if localReadClosed {
			b.maybeSendWindowRelease(0, true)
			return 0, io.ErrClosedPipe
		}
		if resetErr != nil {
			b.maybeSendWindowRelease(0, true)
			return 0, resetErr
		}
		if remoteClosed {
			b.maybeSendWindowRelease(0, true)
			return 0, io.EOF
		}
		if err := b.waitReadable(ctx, notify, readTimeout); err != nil {
			b.maybeSendWindowRelease(0, true)
			return 0, err
		}
	}
}
// Write implements io.Writer. It splits p into parts, reserves outbound
// window for each part, and sends it through one of two paths:
//
//   - dedicated handles with a write sender use sendWriteFn, with parts capped
//     at windowBytes (the whole remainder when windowBytes is not positive);
//     the sender may report a partial write;
//   - all other handles use sendDataFn with parts of chunkSize bytes
//     (defaultBulkChunkSize when chunkSize is not positive); a part either
//     goes out whole or not at all.
//
// Window reserved for bytes that were not sent is rolled back. Write returns
// the number of bytes sent so far together with the first error, and records
// write statistics whenever at least one byte went out. Calls are serialized
// by writeMu. An empty p returns (0, nil); a nil handle, a locally closed
// handle or a peer that stopped reading returns io.ErrClosedPipe; a reset
// handle returns the reset error; a missing data sender returns
// errBulkDataPathNotReady.
func (b *bulkHandle) Write(p []byte) (int, error) {
	if len(p) == 0 {
		return 0, nil
	}
	if b == nil {
		return 0, io.ErrClosedPipe
	}
	b.writeMu.Lock()
	defer b.writeMu.Unlock()

	// Take a consistent snapshot of everything the loop needs.
	b.mu.Lock()
	resetErr := b.resetErr
	closed := b.localClosed || b.peerReadClosed
	sendData := b.sendDataFn
	sendWrite := b.sendWriteFn
	step := b.chunkSize
	timeout := b.writeTimeout
	parent := b.ctx
	b.mu.Unlock()

	switch {
	case resetErr != nil:
		return 0, resetErr
	case closed:
		return 0, io.ErrClosedPipe
	case sendData == nil:
		return 0, errBulkDataPathNotReady
	}

	// Pick the part size and the transmit function for the active path. Both
	// transmit variants report how many bytes of the part were sent.
	var transmit func(ctx context.Context, part []byte) (int, error)
	if b.dedicated && sendWrite != nil {
		step = b.windowBytes
		if step <= 0 {
			step = len(p)
		}
		transmit = func(ctx context.Context, part []byte) (int, error) {
			return sendWrite(ctx, b, part)
		}
	} else {
		if step <= 0 {
			step = defaultBulkChunkSize
		}
		transmit = func(ctx context.Context, part []byte) (int, error) {
			if err := sendData(ctx, b, part); err != nil {
				return 0, err
			}
			return len(part), nil
		}
	}

	written := 0
	// settle records statistics for the bytes sent so far and returns them.
	settle := func() int {
		if written > 0 {
			b.recordWrite(written, time.Now())
		}
		return written
	}

	for written < len(p) {
		size := len(p) - written
		if size > step {
			size = step
		}
		part := p[written : written+size]

		sendCtx, cancel, err := bulkWriteContext(parent, timeout)
		if err != nil {
			return settle(), err
		}
		if err := b.acquireOutboundWindow(sendCtx, size); err != nil {
			cancel()
			return settle(), b.normalizeWriteError(err)
		}
		sent, err := transmit(sendCtx, part)
		cancel()

		// Clamp a misbehaving sender's count into [0, size].
		if sent < 0 {
			sent = 0
		}
		if sent > size {
			sent = size
		}
		// Give back the window reserved for bytes that never went out.
		if sent < size {
			b.rollbackOutboundWindow(size - sent)
		}
		written += sent
		if err != nil {
			return settle(), b.normalizeWriteError(err)
		}
		if sent != size {
			return settle(), io.ErrShortWrite
		}
	}
	return settle(), nil
}
// Close closes both the write and the read side of the handle.
func (b *bulkHandle) Close() error {
	const full = true
	return b.close(full)
}
// CloseWrite closes only the write side; buffered and future inbound data can
// still be read.
func (b *bulkHandle) CloseWrite() error {
	const full = false
	return b.close(full)
}
// close implements Close (full == true) and CloseWrite (full == false).
//
// It holds writeMu for its whole duration, so it waits for any Write in
// progress. A reset handle returns its reset error. The peer is notified via
// closeFn using context.Background(); send errors are returned unless they
// are errBulkNotFound or are judged ignorable by
// canIgnoreDedicatedCloseSendError. A nil handle returns nil.
func (b *bulkHandle) close(full bool) error {
if b == nil {
return nil
}
b.writeMu.Lock()
defer b.writeMu.Unlock()
b.mu.Lock()
if b.resetErr != nil {
err := b.resetErr
b.mu.Unlock()
return err
}
// Write side already closed: only a full close that still has the read
// side open has work left to do (upgrade from CloseWrite to Close).
if b.localClosed {
if !full || b.localReadClosed {
b.mu.Unlock()
return nil
}
closeFn := b.closeFn
b.mu.Unlock()
// Skip the close message when the dedicated write half is already closed.
if closeFn != nil && !b.dedicatedWriteHalfClosedSnapshot() {
if err := closeFn(context.Background(), b, true); err != nil && !errors.Is(err, errBulkNotFound) && !b.canIgnoreDedicatedCloseSendError(err) {
return err
}
}
b.bestEffortCloseDedicatedWriteHalf()
b.mu.Lock()
// Re-check: state may have changed while mu was released for the send.
if b.localReadClosed {
b.mu.Unlock()
return nil
}
b.localReadClosed = true
b.clearBufferedDataLocked()
shouldFinalize := b.shouldFinalizeLocked()
b.mu.Unlock()
// Wake a blocked Read so it observes the closed read side.
b.notifyReadable()
if shouldFinalize {
b.finalize()
}
return nil
}
// First close of the write side.
closeFn := b.closeFn
b.mu.Unlock()
if closeFn != nil {
if err := closeFn(context.Background(), b, full); err != nil && !errors.Is(err, errBulkNotFound) && !b.canIgnoreDedicatedCloseSendError(err) {
return err
}
}
b.bestEffortCloseDedicatedWriteHalf()
b.mu.Lock()
// Re-check: state may have changed while mu was released for the send.
if b.localClosed {
b.mu.Unlock()
return nil
}
b.localClosed = true
if full {
b.localReadClosed = true
b.clearBufferedDataLocked()
}
shouldFinalize := b.shouldFinalizeLocked()
b.mu.Unlock()
if full {
b.notifyReadable()
}
if shouldFinalize {
b.finalize()
}
return nil
}
// Reset aborts the transfer with err (errBulkReset when err is nil).
//
// If the handle was already reset, the earlier reset error is returned.
// Otherwise the peer is notified through resetFn first; when that send fails
// its error is returned and the handle is left un-reset. On success the
// handle is marked reset locally and nil is returned. A nil handle returns
// nil.
func (b *bulkHandle) Reset(err error) error {
	if b == nil {
		return nil
	}
	cause := bulkResetError(err)

	b.mu.Lock()
	prior, notifyPeer := b.resetErr, b.resetFn
	b.mu.Unlock()
	if prior != nil {
		return prior
	}

	if notifyPeer != nil {
		sendErr := notifyPeer(context.Background(), b, bulkResetMessage(cause))
		if sendErr != nil {
			return sendErr
		}
	}
	b.markReset(cause)
	return nil
}
// Snapshot returns a point-in-time copy of the handle's state.
func (b *bulkHandle) Snapshot() BulkSnapshot {
	snap := b.snapshot()
	return snap
}
// markRemoteClosed records that the remote side closed its write direction,
// wakes a blocked reader, and finalizes the handle when its state allows.
func (b *bulkHandle) markRemoteClosed() {
	if b == nil {
		return
	}
	finalizeNow := func() bool {
		b.mu.Lock()
		defer b.mu.Unlock()
		b.remoteClosed = true
		done := b.shouldFinalizeLocked()
		b.notifyReadableLocked()
		return done
	}()
	if finalizeNow {
		b.finalize()
	}
}
// markPeerClosed records that the peer closed both directions: it will send
// no more data and will read no more. Blocked readers and writers are woken,
// and the handle is finalized when its state allows.
func (b *bulkHandle) markPeerClosed() {
	if b == nil {
		return
	}
	finalizeNow := func() bool {
		b.mu.Lock()
		defer b.mu.Unlock()
		b.remoteClosed, b.peerReadClosed = true, true
		done := b.shouldFinalizeLocked()
		b.notifyFlowLocked()
		b.notifyReadableLocked()
		return done
	}()
	if finalizeNow {
		b.finalize()
	}
}
// markReset puts the handle into the reset state locally (keeping an earlier
// reset error if there is one), drops buffered data, wakes blocked readers
// and writers, and finalizes the handle. It does not notify the peer.
func (b *bulkHandle) markReset(err error) {
	if b == nil {
		return
	}
	b.mu.Lock()
	// markResetLocked keeps the first reset error and clears the buffers.
	_ = b.markResetLocked(err)
	b.notifyFlowLocked()
	b.notifyReadableLocked()
	b.mu.Unlock()
	b.finalize()
}
// pushChunk queues a copy of chunk for Read; exceeding the inbound limits
// resets the handle.
func (b *bulkHandle) pushChunk(chunk []byte) error {
	const (
		owned           = false
		resetOnOverflow = true
	)
	return b.pushChunkWithOwnershipOptions(chunk, owned, resetOnOverflow)
}
// pushOwnedChunk queues chunk for Read without copying it (the handle takes
// ownership of the slice); exceeding the inbound limits resets the handle.
func (b *bulkHandle) pushOwnedChunk(chunk []byte) error {
	const (
		owned           = true
		resetOnOverflow = true
	)
	return b.pushChunkWithOwnershipOptions(chunk, owned, resetOnOverflow)
}
// pushOwnedChunkNoReset queues chunk for Read without copying it. When the
// inbound limits would be exceeded it returns errBulkBackpressureExceeded and
// leaves the handle intact instead of resetting it.
func (b *bulkHandle) pushOwnedChunkNoReset(chunk []byte) error {
	const (
		owned           = true
		resetOnOverflow = false
	)
	return b.pushChunkWithOwnershipOptions(chunk, owned, resetOnOverflow)
}
// pushChunkWithOwnership queues chunk for Read, copying it unless owned is
// true; exceeding the inbound limits resets the handle.
func (b *bulkHandle) pushChunkWithOwnership(chunk []byte, owned bool) error {
	const resetOnOverflow = true
	return b.pushChunkWithOwnershipOptions(chunk, owned, resetOnOverflow)
}
// pushChunkWithOwnershipOptions appends an inbound chunk to the read queue
// and wakes a blocked reader.
//
// The chunk is copied unless owned is true. Empty chunks are ignored. A reset
// handle returns its reset error. When the queue-length or byte limit would
// be exceeded, the handle is either reset with errBulkBackpressureExceeded
// and finalized (resetOnOverflow) or left intact with that error returned. A
// nil handle returns io.ErrClosedPipe.
func (b *bulkHandle) pushChunkWithOwnershipOptions(chunk []byte, owned bool, resetOnOverflow bool) error {
	if b == nil {
		return io.ErrClosedPipe
	}
	if len(chunk) == 0 {
		return nil
	}
	data := chunk
	if !owned {
		data = append([]byte(nil), chunk...)
	}

	b.mu.Lock()
	if prior := b.resetErr; prior != nil {
		b.mu.Unlock()
		return prior
	}

	queueFull := b.inboundQueueLimit > 0 && b.bufferedChunkCountLocked() >= b.inboundQueueLimit
	overflow := queueFull || (b.inboundBytesLimit > 0 && b.bufferedBytes+len(data) > b.inboundBytesLimit)
	if !overflow {
		b.readQueue = append(b.readQueue, data)
		b.bufferedBytes += len(data)
		b.notifyReadableLocked()
		b.mu.Unlock()
		return nil
	}

	if !resetOnOverflow {
		b.mu.Unlock()
		return errBulkBackpressureExceeded
	}
	err := b.markResetLocked(errBulkBackpressureExceeded)
	b.mu.Unlock()
	b.notifyReadable()
	b.finalize()
	return err
}
// markResetLocked records the reset cause and drops buffered data, unless the
// handle is already reset, and returns the effective reset error. The caller
// must hold mu. A nil handle returns io.ErrClosedPipe.
func (b *bulkHandle) markResetLocked(err error) error {
	if b == nil {
		return io.ErrClosedPipe
	}
	if b.resetErr != nil {
		return b.resetErr
	}
	b.resetErr = bulkResetError(err)
	b.clearBufferedDataLocked()
	return b.resetErr
}
// clearBufferedDataLocked discards every buffered inbound chunk and zeroes
// the buffered-byte counter. The caller must hold mu.
func (b *bulkHandle) clearBufferedDataLocked() {
	if b == nil {
		return
	}
	// Nil out the entries so the chunks are released even if another slice
	// header still references the queue's backing array.
	for i := 0; i < len(b.readQueue); i++ {
		b.readQueue[i] = nil
	}
	b.readQueue, b.readBuf = nil, nil
	b.bufferedBytes = 0
}
// flowControlEnabled reports whether outbound flow control applies: a release
// sender must be configured and at least one of the byte window or the
// in-flight limit must be positive.
func (b *bulkHandle) flowControlEnabled() bool {
	if b == nil || b.releaseFn == nil {
		return false
	}
	return b.windowBytes > 0 || b.maxInFlight > 0
}
// releaseThresholdBytes returns how many consumed bytes must accumulate
// before a window release is sent: the chunk size (defaultBulkChunkSize when
// not positive), capped at windowBytes when that is positive. A nil handle
// uses defaultBulkChunkSize.
func (b *bulkHandle) releaseThresholdBytes() int64 {
	limit := defaultBulkChunkSize
	if b != nil {
		if b.chunkSize > 0 {
			limit = b.chunkSize
		}
		if window := b.windowBytes; window > 0 && limit > window {
			limit = window
		}
	}
	return int64(limit)
}
// maybeSendWindowRelease accounts for consumed inbound bytes and, once the
// pending total reaches the release threshold (or force is set), reports the
// pending bytes and chunk count through releaseFn.
//
// A call with consumed > 0 counts as one chunk. The send happens outside mu
// and is best effort: its error is deliberately ignored. Nothing is sent when
// both pending counters are zero, or when flow control is disabled.
func (b *bulkHandle) maybeSendWindowRelease(consumed int, force bool) {
	if b == nil || !b.flowControlEnabled() {
		return
	}
	b.mu.Lock()
	if consumed > 0 {
		b.pendingReleaseBytes += int64(consumed)
		b.pendingReleaseChunks++
	}
	flush := force || b.pendingReleaseBytes >= b.releaseThresholdBytes()
	if !flush {
		b.mu.Unlock()
		return
	}
	pendingBytes, pendingChunks := b.pendingReleaseBytes, b.pendingReleaseChunks
	b.pendingReleaseBytes, b.pendingReleaseChunks = 0, 0
	send := b.releaseFn
	b.mu.Unlock()

	if send == nil || (pendingBytes <= 0 && pendingChunks <= 0) {
		return
	}
	_ = send(b, pendingBytes, pendingChunks)
}
// acquireOutboundWindow reserves outbound capacity for a part of size bytes,
// blocking until both the byte window and the in-flight limit allow it.
//
// It returns immediately when flow control is disabled, size is not positive,
// or the handle is nil. A part larger than the whole window is admitted once
// nothing else is in flight, so oversized writes cannot stall forever. While
// waiting it returns the reset error if the handle is reset, io.ErrClosedPipe
// if the write side is no longer usable, or, when ctx ends, the handle's
// write-state error if any and otherwise the normalized ctx error.
func (b *bulkHandle) acquireOutboundWindow(ctx context.Context, size int) error {
	if b == nil || size <= 0 || !b.flowControlEnabled() {
		return nil
	}
	if ctx == nil {
		ctx = context.Background()
	}
	need := int64(size)
	for {
		b.mu.Lock()
		switch {
		case b.resetErr != nil:
			err := b.resetErr
			b.mu.Unlock()
			return err
		case b.localClosed || b.peerReadClosed:
			b.mu.Unlock()
			return io.ErrClosedPipe
		}

		windowed := b.windowBytes > 0
		limited := b.maxInFlight > 0

		bytesOK := !windowed ||
			b.outboundAvailBytes >= need ||
			(need > int64(b.windowBytes) && b.outboundInFlight == 0)
		chunksOK := !limited || b.outboundInFlight < b.maxInFlight

		if bytesOK && chunksOK {
			if windowed {
				b.outboundAvailBytes -= need
			}
			if limited {
				b.outboundInFlight++
			}
			b.mu.Unlock()
			return nil
		}

		wake := b.flowNotify
		b.mu.Unlock()

		select {
		case <-wake:
			// Capacity may have changed; re-evaluate from the top.
		case <-ctx.Done():
			if stateErr := b.writeStateErrorSnapshot(); stateErr != nil {
				return stateErr
			}
			return normalizeStreamDeadlineError(ctx.Err())
		}
	}
}
// rollbackOutboundWindow returns size bytes (capped at the window size) and
// one in-flight slot to the outbound window after a send did not go out, then
// wakes a waiting writer. It is a no-op when flow control is disabled or size
// is not positive.
func (b *bulkHandle) rollbackOutboundWindow(size int) {
	if b == nil || size <= 0 || !b.flowControlEnabled() {
		return
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	if limit := int64(b.windowBytes); limit > 0 {
		avail := b.outboundAvailBytes + int64(size)
		if avail > limit {
			avail = limit
		}
		b.outboundAvailBytes = avail
	}
	if b.maxInFlight > 0 && b.outboundInFlight > 0 {
		b.outboundInFlight--
	}
	b.notifyFlowLocked()
}
// releaseOutboundWindow credits the outbound window with bytes and chunks,
// clamping available bytes to the window size and the in-flight count to
// zero, then wakes a waiting writer. It is a no-op when flow control is
// disabled.
func (b *bulkHandle) releaseOutboundWindow(bytes int64, chunks int) {
	if b == nil || !b.flowControlEnabled() {
		return
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	if limit := int64(b.windowBytes); limit > 0 && bytes > 0 {
		avail := b.outboundAvailBytes + bytes
		if avail > limit {
			avail = limit
		}
		b.outboundAvailBytes = avail
	}
	if b.maxInFlight > 0 && chunks > 0 {
		remaining := b.outboundInFlight - chunks
		if remaining < 0 {
			remaining = 0
		}
		b.outboundInFlight = remaining
	}
	b.notifyFlowLocked()
}
// bufferedChunkCountLocked returns the number of buffered inbound chunks: the
// queued chunks plus one for a partially consumed read buffer. The caller
// must hold mu.
func (b *bulkHandle) bufferedChunkCountLocked() int {
	if b == nil {
		return 0
	}
	if len(b.readBuf) == 0 {
		return len(b.readQueue)
	}
	return len(b.readQueue) + 1
}
// shouldFinalizeLocked reports whether the handle has reached a terminal
// state. The caller must hold mu.
//
// A nil or reset handle is always terminal. Otherwise the handle is terminal
// once the remote side has closed and either the peer stopped reading or the
// local write side is closed. Non-dedicated handles are additionally terminal
// as soon as the local read side is closed.
func (b *bulkHandle) shouldFinalizeLocked() bool {
	if b == nil || b.resetErr != nil {
		return true
	}
	bothSidesDone := b.remoteClosed && (b.peerReadClosed || b.localClosed)
	if b.dedicated {
		return bothSidesDone
	}
	return b.localReadClosed || bothSidesDone
}
// snapshot builds a BulkSnapshot describing the handle's current state:
// identity, close flags, buffering, tuning, statistics and binding
// diagnostics. A nil handle yields the zero BulkSnapshot.
//
// Binding diagnostics come from the logical/transport connection when either
// is set, otherwise from the owning client when one is set.
func (b *bulkHandle) snapshot() BulkSnapshot {
	if b == nil {
		return BulkSnapshot{}
	}
	// Sampled before taking mu: this call locks dedicatedMu.
	attached := b.dedicatedAttachedSnapshot()

	b.mu.Lock()
	defer b.mu.Unlock()

	var out BulkSnapshot

	// Identity and bindings.
	out.ID = b.id
	out.DataID = b.dataID
	out.Scope = normalizeFileScope(b.runtimeScope)
	out.Range = b.rangeSpec
	out.Metadata = cloneBulkMetadata(b.metadata)
	out.Dedicated = b.dedicated
	out.DedicatedAttached = attached
	out.SessionEpoch = b.sessionEpoch
	out.TransportGeneration = b.transportGeneration

	// Close state and buffering.
	out.LocalClosed = b.localClosed
	out.LocalReadClosed = b.localReadClosed
	out.RemoteClosed = b.remoteClosed
	out.PeerReadClosed = b.peerReadClosed
	out.BufferedChunks = b.bufferedChunkCountLocked()
	out.BufferedBytes = b.bufferedBytes

	// Tuning.
	out.ReadTimeout = b.readTimeout
	out.WriteTimeout = b.writeTimeout
	out.ChunkSize = b.chunkSize
	out.WindowBytes = b.windowBytes
	out.MaxInFlight = b.maxInFlight

	// Statistics.
	out.BytesRead = b.bytesRead
	out.BytesWritten = b.bytesWritten
	out.ReadCalls = b.readCalls
	out.WriteCalls = b.writeCalls
	out.OpenedAt = b.createdAt
	out.LastReadAt = b.lastReadAt
	out.LastWriteAt = b.lastWriteAt

	if b.logical != nil {
		out.LogicalClientID = b.logical.ID()
	}
	if b.resetErr != nil {
		out.ResetError = b.resetErr.Error()
	}

	var diag snapshotBindingDiagnostics
	if b.logical != nil || b.transport != nil {
		diag = snapshotBindingDiagnosticsFromLogical(b.logical, b.transport, b.transportGeneration)
	} else if b.client != nil {
		diag = snapshotBindingDiagnosticsFromClient(b.client, b.sessionEpoch)
	}

	out.BindingOwner = diag.BindingOwner
	out.BindingAlive = diag.BindingAlive
	out.BindingCurrent = diag.BindingCurrent
	out.BindingReason = diag.BindingReason
	out.BindingError = diag.BindingError
	out.TransportAttached = diag.TransportAttached
	out.TransportHasRuntimeConn = diag.TransportHasRuntimeConn
	out.TransportCurrent = diag.TransportCurrent
	out.TransportDetachReason = diag.TransportDetachReason
	out.TransportDetachKind = diag.TransportDetachKind
	out.TransportDetachGeneration = diag.TransportDetachGeneration
	out.TransportDetachError = diag.TransportDetachError
	out.TransportDetachedAt = diag.TransportDetachedAt
	out.ReattachEligible = diag.ReattachEligible
	return out
}
// finalize tears the handle down: it flushes any pending window release,
// cancels the handle context, stops the dedicated sender, closes the
// dedicated connection (ignoring the close error) and removes the handle from
// its runtime. Every step tolerates having been done before.
func (b *bulkHandle) finalize() {
	if b == nil {
		return
	}
	b.maybeSendWindowRelease(0, true)
	if cancel := b.cancel; cancel != nil {
		cancel()
	}
	sender := b.clearDedicatedSender()
	if sender != nil {
		sender.stop()
	}
	conn := b.clearDedicatedConn()
	if conn != nil {
		_ = conn.Close()
	}
	if rt := b.runtime; rt != nil {
		rt.remove(b.runtimeScope, b.id)
	}
}
// recordReadLocked updates the read statistics for a read of n bytes at time
// now. Non-positive n is ignored. The caller must hold mu.
func (b *bulkHandle) recordReadLocked(n int, now time.Time) {
	if b != nil && n > 0 {
		b.readCalls++
		b.bytesRead += int64(n)
		b.lastReadAt = now
	}
}
// recordWrite updates the write statistics for a write of n bytes at time
// now, taking mu itself. Non-positive n is ignored.
func (b *bulkHandle) recordWrite(n int, now time.Time) {
	if b == nil || n <= 0 {
		return
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	b.writeCalls++
	b.bytesWritten += int64(n)
	b.lastWriteAt = now
}
// waitReadable blocks until the read side is signalled via notify, ctx ends,
// or the read timeout expires.
//
// It returns nil when signalled, and also when ctx ended but the remote side
// has closed (so the caller can re-check state and report EOF). When ctx ends
// otherwise it returns, in order of precedence, the reset error,
// io.ErrClosedPipe if the local read side is closed, or the ctx error (passed
// through normalizeStreamDeadlineError only when a timeout is in effect).
// Timeout expiry yields the normalized context.DeadlineExceeded. A nil ctx is
// treated as context.Background().
func (b *bulkHandle) waitReadable(ctx context.Context, notify <-chan struct{}, timeout time.Duration) error {
	if ctx == nil {
		ctx = context.Background()
	}
	deadline := streamEffectiveDeadline(time.Now(), timeout, time.Time{})
	timed := !deadline.IsZero()

	// A nil channel never fires, which disables the timeout case below when
	// no deadline applies.
	var expired <-chan time.Time
	if timed {
		if !deadline.After(time.Now()) {
			return normalizeStreamDeadlineError(context.DeadlineExceeded)
		}
		timer := time.NewTimer(time.Until(deadline))
		defer timer.Stop()
		expired = timer.C
	}

	select {
	case <-notify:
		return nil
	case <-expired:
		return normalizeStreamDeadlineError(context.DeadlineExceeded)
	case <-ctx.Done():
	}

	// The context ended: prefer the handle's own state over the raw ctx error.
	if resetErr := b.resetErrSnapshot(); resetErr != nil {
		return resetErr
	}
	if b.localReadClosedSnapshot() {
		return io.ErrClosedPipe
	}
	if b.remoteClosedSnapshot() {
		return nil
	}
	if timed {
		return normalizeStreamDeadlineError(ctx.Err())
	}
	return ctx.Err()
}
// resetErrSnapshot returns the reset error (nil if the handle is not reset),
// read under mu. A nil handle returns io.ErrClosedPipe.
func (b *bulkHandle) resetErrSnapshot() error {
	if b == nil {
		return io.ErrClosedPipe
	}
	b.mu.Lock()
	err := b.resetErr
	b.mu.Unlock()
	return err
}
// remoteClosedSnapshot reports whether the remote side has closed, read under
// mu. A nil handle counts as closed.
func (b *bulkHandle) remoteClosedSnapshot() bool {
	if b == nil {
		return true
	}
	b.mu.Lock()
	closed := b.remoteClosed
	b.mu.Unlock()
	return closed
}
// localClosedSnapshot reports whether the local write side is closed, read
// under mu. A nil handle counts as closed.
func (b *bulkHandle) localClosedSnapshot() bool {
	if b == nil {
		return true
	}
	b.mu.Lock()
	closed := b.localClosed
	b.mu.Unlock()
	return closed
}
// localReadClosedSnapshot reports whether the local read side is closed, read
// under mu. A nil handle counts as closed.
func (b *bulkHandle) localReadClosedSnapshot() bool {
	if b == nil {
		return true
	}
	b.mu.Lock()
	closed := b.localReadClosed
	b.mu.Unlock()
	return closed
}
// writeStateErrorSnapshot returns the error that currently prevents writing:
// the reset error if the handle is reset, io.ErrClosedPipe if the local write
// side is closed or the peer stopped reading (or the handle is nil), and nil
// when writing is still possible.
func (b *bulkHandle) writeStateErrorSnapshot() error {
	if b == nil {
		return io.ErrClosedPipe
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	switch {
	case b.resetErr != nil:
		return b.resetErr
	case b.localClosed, b.peerReadClosed:
		return io.ErrClosedPipe
	default:
		return nil
	}
}
// notifyReadable wakes a blocked reader, taking mu itself. It must not be
// called with mu held; use notifyReadableLocked there.
func (b *bulkHandle) notifyReadable() {
	if b == nil {
		return
	}
	b.mu.Lock()
	b.notifyReadableLocked()
	b.mu.Unlock()
}
// notifyReadableLocked posts a wake-up on readNotify without blocking; if a
// wake-up is already pending the call does nothing.
func (b *bulkHandle) notifyReadableLocked() {
	if b == nil {
		return
	}
	signal := b.readNotify
	if signal == nil {
		return
	}
	select {
	case signal <- struct{}{}:
	default:
		// A wake-up is already queued; one is enough.
	}
}
// notifyFlowLocked posts a wake-up on flowNotify without blocking; if a
// wake-up is already pending the call does nothing.
func (b *bulkHandle) notifyFlowLocked() {
	if b == nil {
		return
	}
	signal := b.flowNotify
	if signal == nil {
		return
	}
	select {
	case signal <- struct{}{}:
	default:
		// A wake-up is already queued; one is enough.
	}
}
// normalizeWriteError maps a send error to what Write should report: nil
// stays nil; if the handle is reset or no longer writable that state error
// wins; otherwise the error goes through normalizeStreamDeadlineError.
func (b *bulkHandle) normalizeWriteError(err error) error {
	if err == nil {
		return nil
	}
	stateErr := b.writeStateErrorSnapshot()
	if stateErr == nil {
		return normalizeStreamDeadlineError(err)
	}
	return stateErr
}
// canIgnoreDedicatedCloseSendError reports whether an error from sending the
// close message may be swallowed.
//
// That is only the case for dedicated handles that are already winding down
// (context ended, remote closed, peer stopped reading, or local write side
// closed) and only for errors that mean the connection is gone: a detached
// transport, net.ErrClosed, io.ErrClosedPipe, or a message containing
// "broken pipe" or "use of closed network connection".
//
// A nil ctx is treated as "not ended", consistent with Context(), instead of
// panicking on a nil interface call.
func (b *bulkHandle) canIgnoreDedicatedCloseSendError(err error) bool {
	if b == nil || !b.dedicated || err == nil {
		return false
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	ctxEnded := b.ctx != nil && b.ctx.Err() != nil
	if !(ctxEnded || b.remoteClosed || b.peerReadClosed || b.localClosed) {
		return false
	}
	if errors.Is(err, errTransportDetached) || errors.Is(err, net.ErrClosed) || errors.Is(err, io.ErrClosedPipe) {
		return true
	}
	// Some connection errors are not wrapped sentinels; fall back to matching
	// their text.
	message := strings.ToLower(err.Error())
	return strings.Contains(message, "broken pipe") || strings.Contains(message, "use of closed network connection")
}
// bulkWriteContext derives the context for one send from parent and the write
// timeout.
//
// With no effective deadline it returns a cancellable child of parent. With a
// deadline in the future it returns a child bounded by that deadline. With a
// deadline that has already passed it returns a nil context, a no-op cancel
// function and the normalized context.DeadlineExceeded. A nil parent is
// treated as context.Background(). The returned cancel function is always
// safe to call.
func bulkWriteContext(parent context.Context, timeout time.Duration) (context.Context, func(), error) {
	if parent == nil {
		parent = context.Background()
	}
	deadline := streamEffectiveDeadline(time.Now(), timeout, time.Time{})
	switch {
	case deadline.IsZero():
		ctx, cancel := context.WithCancel(parent)
		return ctx, cancel, nil
	case !deadline.After(time.Now()):
		return nil, func() {}, normalizeStreamDeadlineError(context.DeadlineExceeded)
	default:
		ctx, cancel := context.WithDeadline(parent, deadline)
		return ctx, cancel, nil
	}
}
// normalizeBulkOpenRequest returns req with its range normalized, its
// metadata copied, non-positive ChunkSize/WindowBytes/MaxInFlight replaced by
// the package defaults, and negative timeouts replaced by the default
// timeouts.
func normalizeBulkOpenRequest(req BulkOpenRequest) BulkOpenRequest {
	req.Range = normalizeBulkRange(req.Range)
	req.Metadata = cloneBulkMetadata(req.Metadata)

	// Size settings: anything that is not positive falls back to its default.
	sizeDefaults := []struct {
		field    *int
		fallback int
	}{
		{&req.ChunkSize, defaultBulkChunkSize},
		{&req.WindowBytes, defaultBulkOpenWindowBytes},
		{&req.MaxInFlight, defaultBulkOpenMaxInFlight},
	}
	for _, entry := range sizeDefaults {
		if *entry.field <= 0 {
			*entry.field = entry.fallback
		}
	}

	// Timeouts: only negative values are replaced; zero is kept as given.
	if req.ReadTimeout < 0 {
		req.ReadTimeout = defaultBulkControlReadTimeout
	}
	if req.WriteTimeout < 0 {
		req.WriteTimeout = defaultBulkControlWriteTimeout
	}
	return req
}
// normalizeBulkOpenOptions applies normalizeBulkOpenRequest's rules to
// caller-facing options by converting them to a request, normalizing it, and
// converting the result back.
func normalizeBulkOpenOptions(opt BulkOpenOptions) BulkOpenOptions {
	var req BulkOpenRequest
	req.BulkID = opt.ID
	req.Range = opt.Range
	req.Metadata = opt.Metadata
	req.ReadTimeout = opt.ReadTimeout
	req.WriteTimeout = opt.WriteTimeout
	req.Dedicated = opt.Dedicated
	req.ChunkSize = opt.ChunkSize
	req.WindowBytes = opt.WindowBytes
	req.MaxInFlight = opt.MaxInFlight

	req = normalizeBulkOpenRequest(req)

	var out BulkOpenOptions
	out.ID = req.BulkID
	out.Range = req.Range
	out.Metadata = req.Metadata
	out.ReadTimeout = req.ReadTimeout
	out.WriteTimeout = req.WriteTimeout
	out.Dedicated = req.Dedicated
	out.ChunkSize = req.ChunkSize
	out.WindowBytes = req.WindowBytes
	out.MaxInFlight = req.MaxInFlight
	return out
}
// normalizeBulkRange collapses any negative offset or length to -1 and leaves
// non-negative values untouched.
func normalizeBulkRange(r BulkRange) BulkRange {
	clamp := func(v int64) int64 {
		if v < 0 {
			return -1
		}
		return v
	}
	r.Offset, r.Length = clamp(r.Offset), clamp(r.Length)
	return r
}
// validBulkRange reports whether neither the offset nor the length is
// negative.
func validBulkRange(r BulkRange) bool {
	if r.Offset < 0 || r.Length < 0 {
		return false
	}
	return true
}
// cloneBulkMetadata returns an independent copy of src. A nil or empty map
// yields nil.
func cloneBulkMetadata(src BulkMetadata) BulkMetadata {
	if len(src) == 0 {
		return nil
	}
	out := make(BulkMetadata, len(src))
	for k := range src {
		out[k] = src[k]
	}
	return out
}
// bulkResetError returns err, substituting errBulkReset when err is nil.
func bulkResetError(err error) error {
	if err != nil {
		return err
	}
	return errBulkReset
}
// bulkResetMessage returns the text of err, or "" when err is nil.
func bulkResetMessage(err error) string {
	if err != nil {
		return err.Error()
	}
	return ""
}