- 新增 managed/external/nested 三种传输保护模式 - 新增 peer attach 显式认证、抗重放、channel binding 和可选前向保密协商 - 明确单连接注入与可重拨连接源的语义边界 - 禁止 ConnectByConn 场景下 dedicated bulk 走 sidecar,auto 模式自动回退 shared - 修正 dedicated attach 在 bootstrap/steady profile 切换下的处理逻辑 - 优化 shared bulk super-batch 与批量 framed write 路径 - 降低 stream/bulk fast path 的复制和分发损耗 - 补齐 benchmark、回归测试、运行时快照和 README 文档
2451 lines
56 KiB
Go
2451 lines
56 KiB
Go
package notify
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"io"
|
|
"net"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// String keys naming the bulk open/close/reset/ready/release signals.
// How they are dispatched is defined outside this section.
const (
	BulkOpenSignalKey    = "notify.bulk.open"
	BulkCloseSignalKey   = "notify.bulk.close"
	BulkResetSignalKey   = "notify.bulk.reset"
	BulkReadySignalKey   = "notify.bulk.ready"
	BulkReleaseSignalKey = "notify.bulk.release"
)

// Package-level defaults for bulk transfers. Sizes are in bytes.
const (
	defaultBulkChunkSize           = 1 << 20  // 1 MiB
	defaultBulkInboundQueueLimit   = 256      // queued inbound chunks
	defaultBulkInboundBytesLimit   = 64 << 20 // 64 MiB
	defaultBulkOpenWindowBytes     = 16 << 20 // 16 MiB
	defaultBulkOpenMaxInFlight     = 32
	defaultBulkControlReadTimeout  = 0
	defaultBulkControlWriteTimeout = 0
	defaultBulkAcceptReadyTimeout  = 10 * time.Second
)
|
|
|
|
// BulkMetadata is a set of string key/value pairs attached to a bulk.
type BulkMetadata map[string]string
|
|
|
|
// BulkRange describes a span as a starting offset plus a length.
// NOTE(review): units are presumably bytes — confirm against callers.
type BulkRange struct {
	Offset, Length int64
}
|
|
|
|
// BulkOpenMode selects the transport path requested when opening a bulk.
type BulkOpenMode uint8

// BulkOpenMode values. The zero value is BulkOpenModeDefault.
const (
	// BulkOpenModeDefault preserves the legacy rule driven by the Dedicated
	// flag: true selects dedicated, anything else selects shared.
	BulkOpenModeDefault BulkOpenMode = iota

	// BulkOpenModeAuto tries dedicated first and falls back to shared.
	BulkOpenModeAuto

	// BulkOpenModeShared always uses the shared transport path.
	BulkOpenModeShared

	// BulkOpenModeDedicated always uses the dedicated transport path.
	BulkOpenModeDedicated
)
|
|
|
|
// BulkNetworkProfile names a tuning preset for the expected link quality.
type BulkNetworkProfile uint8

// BulkNetworkProfile values. The zero value is BulkNetworkProfileDefault.
const (
	// BulkNetworkProfileDefault leaves the legacy defaults in place.
	BulkNetworkProfileDefault BulkNetworkProfile = iota

	// BulkNetworkProfileLAN targets low-latency, local links.
	BulkNetworkProfileLAN

	// BulkNetworkProfileWAN targets moderate RTT with occasional loss.
	BulkNetworkProfileWAN

	// BulkNetworkProfileConstrained targets low-bandwidth, unstable links.
	BulkNetworkProfileConstrained
)
|
|
|
|
// BulkDedicatedAttachConfig bounds and paces dedicated bulk attachment for a
// client session. For every limit field, 0 means unlimited.
type BulkDedicatedAttachConfig struct {
	// AttachLimit caps the number of dedicated attach handshakes running
	// concurrently per client session (0 = unlimited).
	AttachLimit int

	// ActiveLimit caps the number of active logical dedicated bulks per
	// client session (0 = unlimited). Physical sidecars are still bounded
	// by LaneLimit.
	ActiveLimit int

	// LaneLimit caps the number of dedicated physical sidecar lanes per
	// client session (0 = unlimited).
	LaneLimit int

	// Retry is the number of additional attempts made after the first
	// attach attempt.
	Retry int

	// Backoff is the base delay between retries.
	Backoff time.Duration

	// DialTimeout applies to dialing a dedicated sidecar.
	DialTimeout time.Duration

	// HelloTimeout applies to the dedicated attach request/response
	// handshake.
	HelloTimeout time.Duration
}
|
|
|
|
// BulkOpenTuning groups the three sizing knobs of a bulk open: chunk size,
// window size in bytes, and the maximum number of in-flight chunks.
type BulkOpenTuning struct {
	ChunkSize, WindowBytes, MaxInFlight int
}
|
|
|
|
// bulkDedicatedAttachState tracks where a handle is in the dedicated attach
// lifecycle.
type bulkDedicatedAttachState uint8

// Dedicated attach states. The zero value is the pending state.
const (
	bulkDedicatedAttachStatePending  bulkDedicatedAttachState = iota // no dedicated conn attached yet
	bulkDedicatedAttachStateAttached                                 // a dedicated conn is attached
	bulkDedicatedAttachStateDegraded                                 // set by markDedicatedAttachDegraded
	bulkDedicatedAttachStateClosed                                   // set by markDedicatedAttachClosed
)
|
|
|
|
type BulkOpenOptions struct {
|
|
ID string
|
|
Range BulkRange
|
|
Metadata BulkMetadata
|
|
ReadTimeout time.Duration
|
|
WriteTimeout time.Duration
|
|
Mode BulkOpenMode
|
|
// Deprecated: Dedicated is kept for backward compatibility.
|
|
// Prefer Mode.
|
|
Dedicated bool
|
|
|
|
ChunkSize int
|
|
WindowBytes int
|
|
MaxInFlight int
|
|
}
|
|
|
|
type BulkAcceptInfo struct {
|
|
ID string
|
|
Range BulkRange
|
|
Metadata BulkMetadata
|
|
Dedicated bool
|
|
LogicalConn *LogicalConn
|
|
TransportConn *TransportConn
|
|
TransportGeneration uint64
|
|
Bulk Bulk
|
|
}
|
|
|
|
type Bulk interface {
|
|
io.Reader
|
|
io.Writer
|
|
io.Closer
|
|
|
|
ID() string
|
|
Range() BulkRange
|
|
Metadata() BulkMetadata
|
|
Context() context.Context
|
|
|
|
LogicalConn() *LogicalConn
|
|
TransportConn() *TransportConn
|
|
TransportGeneration() uint64
|
|
|
|
CloseWrite() error
|
|
Reset(error) error
|
|
Snapshot() BulkSnapshot
|
|
}
|
|
|
|
// Sentinel errors for the bulk subsystem. Compare with errors.Is.
var (
	// Missing collaborators.
	errBulkClientNil      = errors.New("bulk client is nil")
	errBulkServerNil      = errors.New("bulk server is nil")
	errBulkLogicalConnNil = errors.New("bulk logical connection is nil")
	errBulkTransportNil   = errors.New("bulk transport connection is nil")
	errBulkRuntimeNil     = errors.New("bulk runtime is nil")

	// Identity, lookup and lifecycle.
	errBulkIDEmpty              = errors.New("bulk id is empty")
	errBulkAlreadyExists        = errors.New("bulk already exists")
	errBulkNotFound             = errors.New("bulk not found")
	errBulkHandlerNotConfigured = errors.New("bulk handler is not configured")
	errBulkRejected             = errors.New("bulk open rejected")
	errBulkReset                = errors.New("bulk reset")

	// Data path.
	errBulkDataIDEmpty          = errors.New("bulk data id is empty")
	errBulkDataPathNotReady     = errors.New("bulk data path is not implemented yet")
	errBulkRangeInvalid         = errors.New("bulk range is invalid")
	errBulkBackpressureExceeded = errors.New("bulk inbound backpressure exceeded")

	// Dedicated transport constraints.
	errBulkDedicatedStreamOnly  = errors.New("dedicated bulk requires stream transport")
	errBulkDedicatedSingleConn  = errors.New("dedicated bulk requires a dialable additional connection source; ConnectByConn only supports shared transport")
	errBulkDedicatedActiveLimit = errors.New("dedicated bulk active limit reached")
)
|
|
|
|
func clientDedicatedBulkSupportError(c *ClientCommon) error {
|
|
if c == nil {
|
|
return errBulkClientNil
|
|
}
|
|
if conn := c.clientTransportConnSnapshot(); conn != nil && isPacketTransportConn(conn) {
|
|
return errBulkDedicatedStreamOnly
|
|
}
|
|
if source := c.clientConnectSourceSnapshot(); source != nil && source.isUDP() {
|
|
return errBulkDedicatedStreamOnly
|
|
}
|
|
if source := c.clientConnectSourceSnapshot(); source != nil && !source.supportsAdditionalConn() {
|
|
return errBulkDedicatedSingleConn
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func logicalDedicatedBulkSupportError(logical *LogicalConn) error {
|
|
if logical == nil {
|
|
return errBulkLogicalConnNil
|
|
}
|
|
if transport := logical.CurrentTransportConn(); transport != nil {
|
|
return transportDedicatedBulkSupportError(transport)
|
|
}
|
|
if addr := logical.RemoteAddr(); addr != nil && isPacketNetwork(addr.Network()) {
|
|
return errBulkDedicatedStreamOnly
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func transportDedicatedBulkSupportError(transport *TransportConn) error {
|
|
if transport == nil {
|
|
return errBulkTransportNil
|
|
}
|
|
if !transport.UsesStreamTransport() {
|
|
return errBulkDedicatedStreamOnly
|
|
}
|
|
if addr := transport.RemoteAddr(); addr != nil && isPacketNetwork(addr.Network()) {
|
|
return errBulkDedicatedStreamOnly
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type bulkCloseSender func(context.Context, *bulkHandle, bool) error
|
|
type bulkResetSender func(context.Context, *bulkHandle, string) error
|
|
type bulkDataSender func(context.Context, *bulkHandle, []byte) error
|
|
type bulkWriteSender func(context.Context, *bulkHandle, uint64, []byte, bool) (int, error)
|
|
type bulkReleaseSender func(*bulkHandle, int64, int) error
|
|
|
|
// bulkAsyncWriteRequest is one unit of work queued for the asynchronous
// write path.
type bulkAsyncWriteRequest struct {
	startSeq uint64 // sequence number reserved for the first chunk
	payload  []byte // bytes to send; Write hands over a private copy
	chunks   int    // number of chunks the payload is split into
}
|
|
|
|
// bulkAsyncWritePayloadPool is a pool with no New function configured here.
// NOTE(review): presumably backs getBulkAsyncWritePayload and
// putBulkAsyncWritePayload, which are defined outside this section — confirm.
var bulkAsyncWritePayloadPool sync.Pool
|
|
|
|
// bulkReadChunk is a slice of inbound data plus an optional callback that
// must run once the data is no longer needed.
type bulkReadChunk struct {
	data    []byte
	release func()
}

// clear runs the release callback, if any, and resets the chunk to its zero
// value. It is a no-op on a nil receiver.
func (c *bulkReadChunk) clear() {
	if c == nil {
		return
	}
	if release := c.release; release != nil {
		release()
	}
	*c = bulkReadChunk{}
}
|
|
|
|
// bulkReadPayloadOwner reference-counts a shared payload and runs release
// when the count reaches zero.
type bulkReadPayloadOwner struct {
	refs    atomic.Int32
	release func()
}

// newBulkReadPayloadOwner returns an owner holding one reference (the
// creator's). It returns nil when there is no release callback to manage.
func newBulkReadPayloadOwner(release func()) *bulkReadPayloadOwner {
	if release == nil {
		return nil
	}
	o := new(bulkReadPayloadOwner)
	o.release = release
	o.refs.Store(1)
	return o
}

// retainChunk takes one more reference and returns the function that drops
// it. A nil owner yields a nil function.
func (o *bulkReadPayloadOwner) retainChunk() func() {
	if o == nil {
		return nil
	}
	o.refs.Add(1)
	return o.releaseChunk
}

// releaseChunk drops one reference and runs the release callback when the
// count hits exactly zero.
func (o *bulkReadPayloadOwner) releaseChunk() {
	if o == nil {
		return
	}
	if remaining := o.refs.Add(-1); remaining != 0 {
		return
	}
	if o.release != nil {
		o.release()
	}
}

// done drops the creator's reference taken in newBulkReadPayloadOwner.
func (o *bulkReadPayloadOwner) done() {
	if o != nil {
		o.releaseChunk()
	}
}
|
|
|
|
type bulkHandle struct {
|
|
runtime *bulkRuntime
|
|
runtimeScope string
|
|
id string
|
|
dataID uint64
|
|
fastPathVersion uint8
|
|
outboundSeq uint64
|
|
rangeSpec BulkRange
|
|
metadata BulkMetadata
|
|
sessionEpoch uint64
|
|
client *ClientCommon
|
|
logical *LogicalConn
|
|
transport *TransportConn
|
|
transportGeneration uint64
|
|
readTimeout time.Duration
|
|
writeTimeout time.Duration
|
|
dedicated bool
|
|
dedicatedLaneID uint32
|
|
dedicatedAttachToken string
|
|
chunkSize int
|
|
windowBytes int
|
|
maxInFlight int
|
|
inboundQueueLimit int
|
|
inboundBytesLimit int
|
|
closeFn bulkCloseSender
|
|
resetFn bulkResetSender
|
|
sendDataFn bulkDataSender
|
|
sendWriteFn bulkWriteSender
|
|
releaseFn bulkReleaseSender
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
writeCtx context.Context
|
|
writeCtxCancel context.CancelFunc
|
|
createdAt time.Time
|
|
|
|
writeMu sync.Mutex
|
|
mu sync.Mutex
|
|
|
|
writeQueue chan bulkAsyncWriteRequest
|
|
writeWorkerDone chan struct{}
|
|
writeDrain chan struct{}
|
|
pendingAsyncWrites int
|
|
localClosed bool
|
|
localReadClosed bool
|
|
remoteClosed bool
|
|
peerReadClosed bool
|
|
resetErr error
|
|
readQueue []bulkReadChunk
|
|
readBuf bulkReadChunk
|
|
bufferedBytes int
|
|
readNotify chan struct{}
|
|
flowNotify chan struct{}
|
|
writeStateDone chan struct{}
|
|
writeStateClosed bool
|
|
releaseNotify chan struct{}
|
|
releaseWorkerDone chan struct{}
|
|
pendingReleaseBytes int64
|
|
pendingReleaseChunks int
|
|
outboundAvailBytes int64
|
|
outboundInFlight int
|
|
bytesRead int64
|
|
bytesWritten int64
|
|
readCalls int64
|
|
writeCalls int64
|
|
lastReadAt time.Time
|
|
lastWriteAt time.Time
|
|
|
|
dedicatedMu sync.Mutex
|
|
dedicatedConn net.Conn
|
|
dedicatedConnOwned bool
|
|
dedicatedSender *bulkDedicatedSender
|
|
dedicatedReady chan struct{}
|
|
dedicatedWriteClosed bool
|
|
dedicatedActiveLease bool
|
|
dedicatedState bulkDedicatedAttachState
|
|
dedicatedAttempts uint32
|
|
dedicatedLastCode string
|
|
dedicatedDataStarted bool
|
|
|
|
acceptMu sync.Mutex
|
|
acceptDispatched bool
|
|
acceptReady chan struct{}
|
|
acceptReadyDone bool
|
|
acceptReadyErr error
|
|
acceptNotifyFn func(error)
|
|
acceptNotifySent bool
|
|
}
|
|
|
|
func newBulkHandle(parent context.Context, runtime *bulkRuntime, runtimeScope string, req BulkOpenRequest, sessionEpoch uint64, logical *LogicalConn, transport *TransportConn, transportGeneration uint64, closeFn bulkCloseSender, resetFn bulkResetSender, sendDataFn bulkDataSender, sendWriteFn bulkWriteSender, releaseFn bulkReleaseSender) *bulkHandle {
|
|
if parent == nil {
|
|
parent = context.Background()
|
|
}
|
|
ctx, cancel := context.WithCancel(parent)
|
|
if transportGeneration == 0 && transport != nil {
|
|
transportGeneration = transport.TransportGeneration()
|
|
}
|
|
if transportGeneration == 0 && logical != nil {
|
|
transportGeneration = logical.transportGenerationSnapshot()
|
|
}
|
|
req = normalizeBulkOpenRequest(req)
|
|
handle := &bulkHandle{
|
|
runtime: runtime,
|
|
runtimeScope: runtimeScope,
|
|
id: req.BulkID,
|
|
dataID: req.DataID,
|
|
fastPathVersion: normalizeBulkFastPathVersion(req.FastPathVersion),
|
|
rangeSpec: req.Range,
|
|
metadata: cloneBulkMetadata(req.Metadata),
|
|
sessionEpoch: sessionEpoch,
|
|
logical: logical,
|
|
transport: transport,
|
|
transportGeneration: transportGeneration,
|
|
readTimeout: req.ReadTimeout,
|
|
writeTimeout: req.WriteTimeout,
|
|
dedicated: req.Dedicated,
|
|
dedicatedLaneID: req.DedicatedLaneID,
|
|
dedicatedAttachToken: req.AttachToken,
|
|
chunkSize: req.ChunkSize,
|
|
windowBytes: req.WindowBytes,
|
|
maxInFlight: req.MaxInFlight,
|
|
inboundQueueLimit: defaultBulkInboundQueueLimit,
|
|
inboundBytesLimit: defaultBulkInboundBytesLimit,
|
|
closeFn: closeFn,
|
|
resetFn: resetFn,
|
|
sendDataFn: sendDataFn,
|
|
sendWriteFn: sendWriteFn,
|
|
releaseFn: releaseFn,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
createdAt: time.Now(),
|
|
readNotify: make(chan struct{}, 1),
|
|
flowNotify: make(chan struct{}, 1),
|
|
writeStateDone: make(chan struct{}),
|
|
dedicatedReady: make(chan struct{}),
|
|
dedicatedState: initialBulkDedicatedAttachState(req.Dedicated),
|
|
acceptReady: make(chan struct{}),
|
|
outboundAvailBytes: int64(req.WindowBytes),
|
|
}
|
|
drain := make(chan struct{})
|
|
close(drain)
|
|
handle.writeDrain = drain
|
|
if sendWriteFn != nil {
|
|
handle.writeCtx, handle.writeCtxCancel = context.WithCancel(ctx)
|
|
handle.writeQueue = make(chan bulkAsyncWriteRequest, bulkAsyncWriteQueueSize(req.MaxInFlight))
|
|
handle.writeWorkerDone = make(chan struct{})
|
|
go func(parentDone <-chan struct{}, writeDone <-chan struct{}, cancel context.CancelFunc) {
|
|
select {
|
|
case <-parentDone:
|
|
case <-writeDone:
|
|
cancel()
|
|
}
|
|
}(ctx.Done(), handle.writeStateDone, handle.writeCtxCancel)
|
|
go handle.runAsyncWriteLoop()
|
|
}
|
|
if handle.flowControlEnabled() {
|
|
handle.releaseNotify = make(chan struct{}, 1)
|
|
handle.releaseWorkerDone = make(chan struct{})
|
|
go handle.runWindowReleaseLoop()
|
|
}
|
|
return handle
|
|
}
|
|
|
|
func (b *bulkHandle) ID() string {
|
|
if b == nil {
|
|
return ""
|
|
}
|
|
return b.id
|
|
}
|
|
|
|
func (b *bulkHandle) fastPathVersionSnapshot() uint8 {
|
|
if b == nil {
|
|
return bulkFastPathVersionV1
|
|
}
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
return normalizeBulkFastPathVersion(b.fastPathVersion)
|
|
}
|
|
|
|
func (b *bulkHandle) FastPathVersion() uint8 {
|
|
return b.fastPathVersionSnapshot()
|
|
}
|
|
|
|
func (b *bulkHandle) Range() BulkRange {
|
|
if b == nil {
|
|
return BulkRange{}
|
|
}
|
|
return b.rangeSpec
|
|
}
|
|
|
|
func (b *bulkHandle) Metadata() BulkMetadata {
|
|
if b == nil {
|
|
return nil
|
|
}
|
|
return cloneBulkMetadata(b.metadata)
|
|
}
|
|
|
|
func (b *bulkHandle) Context() context.Context {
|
|
if b == nil || b.ctx == nil {
|
|
return context.Background()
|
|
}
|
|
return b.ctx
|
|
}
|
|
|
|
func (b *bulkHandle) LogicalConn() *LogicalConn {
|
|
if b == nil {
|
|
return nil
|
|
}
|
|
return b.logical
|
|
}
|
|
|
|
func (b *bulkHandle) TransportConn() *TransportConn {
|
|
if b == nil {
|
|
return nil
|
|
}
|
|
return b.transport
|
|
}
|
|
|
|
func (b *bulkHandle) TransportGeneration() uint64 {
|
|
if b == nil {
|
|
return 0
|
|
}
|
|
return b.transportGeneration
|
|
}
|
|
|
|
func (b *bulkHandle) Dedicated() bool {
|
|
if b == nil {
|
|
return false
|
|
}
|
|
return b.dedicated
|
|
}
|
|
|
|
func (b *bulkHandle) dedicatedLaneIDSnapshot() uint32 {
|
|
if b == nil {
|
|
return 0
|
|
}
|
|
b.dedicatedMu.Lock()
|
|
defer b.dedicatedMu.Unlock()
|
|
return b.dedicatedLaneID
|
|
}
|
|
|
|
func (b *bulkHandle) dedicatedAttachTokenSnapshot() string {
|
|
if b == nil {
|
|
return ""
|
|
}
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
return b.dedicatedAttachToken
|
|
}
|
|
|
|
func (b *bulkHandle) setDedicatedAttachToken(token string) {
|
|
if b == nil {
|
|
return
|
|
}
|
|
b.mu.Lock()
|
|
b.dedicatedAttachToken = token
|
|
b.mu.Unlock()
|
|
}
|
|
|
|
func (b *bulkHandle) markDedicatedAttachAttempt() {
|
|
if b == nil {
|
|
return
|
|
}
|
|
b.dedicatedMu.Lock()
|
|
b.dedicatedAttempts++
|
|
b.dedicatedMu.Unlock()
|
|
}
|
|
|
|
func (b *bulkHandle) setDedicatedAttachLastCode(code string) {
|
|
if b == nil {
|
|
return
|
|
}
|
|
b.dedicatedMu.Lock()
|
|
b.dedicatedLastCode = code
|
|
b.dedicatedMu.Unlock()
|
|
}
|
|
|
|
func (b *bulkHandle) markDedicatedAttachDegraded(code string) {
|
|
if b == nil {
|
|
return
|
|
}
|
|
b.dedicatedMu.Lock()
|
|
b.dedicatedState = bulkDedicatedAttachStateDegraded
|
|
b.dedicatedLastCode = code
|
|
b.dedicatedMu.Unlock()
|
|
}
|
|
|
|
func (b *bulkHandle) markDedicatedAttachClosed() {
|
|
if b == nil {
|
|
return
|
|
}
|
|
b.dedicatedMu.Lock()
|
|
b.dedicatedState = bulkDedicatedAttachStateClosed
|
|
b.dedicatedMu.Unlock()
|
|
}
|
|
|
|
func (b *bulkHandle) dedicatedAttachStateSnapshot() bulkDedicatedAttachState {
|
|
if b == nil {
|
|
return bulkDedicatedAttachStateClosed
|
|
}
|
|
b.dedicatedMu.Lock()
|
|
defer b.dedicatedMu.Unlock()
|
|
return b.dedicatedState
|
|
}
|
|
|
|
func (b *bulkHandle) dedicatedAttachDiagnosticsSnapshot() (state bulkDedicatedAttachState, attempts uint32, lastCode string, dataStarted bool) {
|
|
if b == nil {
|
|
return bulkDedicatedAttachStateClosed, 0, "", false
|
|
}
|
|
b.dedicatedMu.Lock()
|
|
defer b.dedicatedMu.Unlock()
|
|
return b.dedicatedState, b.dedicatedAttempts, b.dedicatedLastCode, b.dedicatedDataStarted
|
|
}
|
|
|
|
func (b *bulkHandle) dedicatedConnSnapshot() net.Conn {
|
|
if b == nil {
|
|
return nil
|
|
}
|
|
b.dedicatedMu.Lock()
|
|
defer b.dedicatedMu.Unlock()
|
|
return b.dedicatedConn
|
|
}
|
|
|
|
func (b *bulkHandle) dedicatedSenderSnapshot() *bulkDedicatedSender {
|
|
if b == nil {
|
|
return nil
|
|
}
|
|
b.dedicatedMu.Lock()
|
|
defer b.dedicatedMu.Unlock()
|
|
return b.dedicatedSender
|
|
}
|
|
|
|
func (b *bulkHandle) installDedicatedSender(sender *bulkDedicatedSender) *bulkDedicatedSender {
|
|
if b == nil || sender == nil {
|
|
return nil
|
|
}
|
|
b.dedicatedMu.Lock()
|
|
defer b.dedicatedMu.Unlock()
|
|
if b.dedicatedSender != nil {
|
|
return b.dedicatedSender
|
|
}
|
|
b.dedicatedSender = sender
|
|
return sender
|
|
}
|
|
|
|
func (b *bulkHandle) clearDedicatedSender() *bulkDedicatedSender {
|
|
if b == nil {
|
|
return nil
|
|
}
|
|
b.dedicatedMu.Lock()
|
|
defer b.dedicatedMu.Unlock()
|
|
sender := b.dedicatedSender
|
|
b.dedicatedSender = nil
|
|
return sender
|
|
}
|
|
|
|
func (b *bulkHandle) dedicatedAttachedSnapshot() bool {
|
|
return b.dedicatedConnSnapshot() != nil
|
|
}
|
|
|
|
func (b *bulkHandle) dedicatedDataStartedSnapshot() bool {
|
|
if b == nil {
|
|
return false
|
|
}
|
|
b.dedicatedMu.Lock()
|
|
defer b.dedicatedMu.Unlock()
|
|
return b.dedicatedDataStarted
|
|
}
|
|
|
|
func (b *bulkHandle) markDedicatedDataStarted() {
|
|
if b == nil {
|
|
return
|
|
}
|
|
b.dedicatedMu.Lock()
|
|
b.dedicatedDataStarted = true
|
|
b.dedicatedMu.Unlock()
|
|
}
|
|
|
|
func (b *bulkHandle) waitDedicatedReady(ctx context.Context) error {
|
|
if b == nil || !b.Dedicated() || b.dedicatedAttachedSnapshot() {
|
|
return nil
|
|
}
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
if err := b.writeStateErrorSnapshot(); err != nil {
|
|
return err
|
|
}
|
|
select {
|
|
case <-b.dedicatedReady:
|
|
return nil
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-b.Context().Done():
|
|
if err := b.writeStateErrorSnapshot(); err != nil {
|
|
return err
|
|
}
|
|
return context.Canceled
|
|
}
|
|
}
|
|
|
|
func (b *bulkHandle) attachDedicatedConn(conn net.Conn) error {
|
|
if b == nil {
|
|
return io.ErrClosedPipe
|
|
}
|
|
if conn == nil {
|
|
return net.ErrClosed
|
|
}
|
|
b.dedicatedMu.Lock()
|
|
if b.dedicatedConn != nil {
|
|
b.dedicatedMu.Unlock()
|
|
return errors.New("bulk dedicated conn already attached")
|
|
}
|
|
b.dedicatedConn = conn
|
|
b.dedicatedConnOwned = true
|
|
b.dedicatedWriteClosed = false
|
|
b.dedicatedState = bulkDedicatedAttachStateAttached
|
|
b.dedicatedLastCode = ""
|
|
ready := b.dedicatedReady
|
|
b.dedicatedMu.Unlock()
|
|
if ready != nil {
|
|
select {
|
|
case <-ready:
|
|
default:
|
|
close(ready)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (b *bulkHandle) attachDedicatedConnShared(conn net.Conn) error {
|
|
if b == nil {
|
|
return io.ErrClosedPipe
|
|
}
|
|
if conn == nil {
|
|
return net.ErrClosed
|
|
}
|
|
b.dedicatedMu.Lock()
|
|
if b.dedicatedConn != nil {
|
|
if b.dedicatedConn == conn {
|
|
b.dedicatedConnOwned = false
|
|
b.dedicatedState = bulkDedicatedAttachStateAttached
|
|
b.dedicatedLastCode = ""
|
|
b.dedicatedMu.Unlock()
|
|
return nil
|
|
}
|
|
b.dedicatedMu.Unlock()
|
|
return errors.New("bulk dedicated conn already attached")
|
|
}
|
|
b.dedicatedConn = conn
|
|
b.dedicatedConnOwned = false
|
|
b.dedicatedWriteClosed = false
|
|
b.dedicatedState = bulkDedicatedAttachStateAttached
|
|
b.dedicatedLastCode = ""
|
|
ready := b.dedicatedReady
|
|
b.dedicatedMu.Unlock()
|
|
if ready != nil {
|
|
select {
|
|
case <-ready:
|
|
default:
|
|
close(ready)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (b *bulkHandle) replaceDedicatedConn(conn net.Conn) (net.Conn, *bulkDedicatedSender, error) {
|
|
if b == nil {
|
|
return nil, nil, io.ErrClosedPipe
|
|
}
|
|
if conn == nil {
|
|
return nil, nil, net.ErrClosed
|
|
}
|
|
b.dedicatedMu.Lock()
|
|
oldConn := b.dedicatedConn
|
|
oldOwned := b.dedicatedConnOwned
|
|
oldSender := b.dedicatedSender
|
|
b.dedicatedConn = conn
|
|
b.dedicatedConnOwned = true
|
|
b.dedicatedSender = nil
|
|
b.dedicatedWriteClosed = false
|
|
b.dedicatedState = bulkDedicatedAttachStateAttached
|
|
b.dedicatedLastCode = ""
|
|
ready := b.dedicatedReady
|
|
b.dedicatedMu.Unlock()
|
|
if ready != nil {
|
|
select {
|
|
case <-ready:
|
|
default:
|
|
close(ready)
|
|
}
|
|
}
|
|
if !oldOwned {
|
|
oldConn = nil
|
|
}
|
|
return oldConn, oldSender, nil
|
|
}
|
|
|
|
func (b *bulkHandle) replaceDedicatedConnShared(conn net.Conn) (net.Conn, *bulkDedicatedSender, error) {
|
|
if b == nil {
|
|
return nil, nil, io.ErrClosedPipe
|
|
}
|
|
if conn == nil {
|
|
return nil, nil, net.ErrClosed
|
|
}
|
|
b.dedicatedMu.Lock()
|
|
oldConn := b.dedicatedConn
|
|
oldOwned := b.dedicatedConnOwned
|
|
oldSender := b.dedicatedSender
|
|
b.dedicatedConn = conn
|
|
b.dedicatedConnOwned = false
|
|
b.dedicatedSender = nil
|
|
b.dedicatedWriteClosed = false
|
|
b.dedicatedState = bulkDedicatedAttachStateAttached
|
|
b.dedicatedLastCode = ""
|
|
ready := b.dedicatedReady
|
|
b.dedicatedMu.Unlock()
|
|
if ready != nil {
|
|
select {
|
|
case <-ready:
|
|
default:
|
|
close(ready)
|
|
}
|
|
}
|
|
if !oldOwned {
|
|
oldConn = nil
|
|
}
|
|
return oldConn, oldSender, nil
|
|
}
|
|
|
|
func (b *bulkHandle) bestEffortCloseDedicatedWriteHalf() {
|
|
if b == nil || !b.dedicated {
|
|
return
|
|
}
|
|
b.dedicatedMu.Lock()
|
|
conn := b.dedicatedConn
|
|
owned := b.dedicatedConnOwned
|
|
alreadyClosed := b.dedicatedWriteClosed
|
|
b.dedicatedMu.Unlock()
|
|
if conn == nil || alreadyClosed || !owned {
|
|
return
|
|
}
|
|
type closeWriter interface {
|
|
CloseWrite() error
|
|
}
|
|
if closeWriterConn, ok := conn.(closeWriter); ok {
|
|
if err := closeWriterConn.CloseWrite(); err == nil {
|
|
b.dedicatedMu.Lock()
|
|
if b.dedicatedConn == conn {
|
|
b.dedicatedWriteClosed = true
|
|
}
|
|
b.dedicatedMu.Unlock()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *bulkHandle) dedicatedWriteHalfClosedSnapshot() bool {
|
|
if b == nil {
|
|
return false
|
|
}
|
|
b.dedicatedMu.Lock()
|
|
defer b.dedicatedMu.Unlock()
|
|
return b.dedicatedWriteClosed
|
|
}
|
|
|
|
func (b *bulkHandle) setClientSnapshotOwner(client *ClientCommon) {
|
|
if b == nil {
|
|
return
|
|
}
|
|
b.client = client
|
|
}
|
|
|
|
func (b *bulkHandle) clearDedicatedConn() (net.Conn, bool) {
|
|
if b == nil {
|
|
return nil, false
|
|
}
|
|
b.dedicatedMu.Lock()
|
|
conn := b.dedicatedConn
|
|
owned := b.dedicatedConnOwned
|
|
b.dedicatedConn = nil
|
|
b.dedicatedConnOwned = false
|
|
b.dedicatedWriteClosed = false
|
|
b.dedicatedMu.Unlock()
|
|
return conn, owned
|
|
}
|
|
|
|
func (b *bulkHandle) markDedicatedActiveReserved() {
|
|
if b == nil {
|
|
return
|
|
}
|
|
b.dedicatedMu.Lock()
|
|
b.dedicatedActiveLease = true
|
|
b.dedicatedMu.Unlock()
|
|
}
|
|
|
|
func (b *bulkHandle) releaseDedicatedActiveReserved() bool {
|
|
if b == nil {
|
|
return false
|
|
}
|
|
b.dedicatedMu.Lock()
|
|
defer b.dedicatedMu.Unlock()
|
|
if !b.dedicatedActiveLease {
|
|
return false
|
|
}
|
|
b.dedicatedActiveLease = false
|
|
return true
|
|
}
|
|
|
|
func (b *bulkHandle) markAcceptDispatched() bool {
|
|
if b == nil {
|
|
return false
|
|
}
|
|
b.acceptMu.Lock()
|
|
defer b.acceptMu.Unlock()
|
|
if b.acceptDispatched {
|
|
return false
|
|
}
|
|
b.acceptDispatched = true
|
|
return true
|
|
}
|
|
|
|
func (b *bulkHandle) markAcceptHandled() {
|
|
if b == nil {
|
|
return
|
|
}
|
|
b.acceptMu.Lock()
|
|
b.acceptDispatched = true
|
|
b.acceptMu.Unlock()
|
|
}
|
|
|
|
func (b *bulkHandle) setAcceptNotify(fn func(error)) {
|
|
if b == nil {
|
|
return
|
|
}
|
|
b.acceptMu.Lock()
|
|
b.acceptNotifyFn = fn
|
|
b.acceptMu.Unlock()
|
|
}
|
|
|
|
func (b *bulkHandle) clearAcceptNotify() {
|
|
if b == nil {
|
|
return
|
|
}
|
|
b.acceptMu.Lock()
|
|
b.acceptNotifyFn = nil
|
|
b.acceptMu.Unlock()
|
|
}
|
|
|
|
func (b *bulkHandle) markAcceptReady(err error) {
|
|
if b == nil {
|
|
return
|
|
}
|
|
b.acceptMu.Lock()
|
|
if b.acceptReadyDone {
|
|
b.acceptMu.Unlock()
|
|
return
|
|
}
|
|
b.acceptReadyDone = true
|
|
b.acceptReadyErr = err
|
|
ch := b.acceptReady
|
|
b.acceptMu.Unlock()
|
|
if ch != nil {
|
|
select {
|
|
case <-ch:
|
|
default:
|
|
close(ch)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *bulkHandle) waitAcceptReady(ctx context.Context) error {
|
|
if b == nil {
|
|
return io.ErrClosedPipe
|
|
}
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
b.acceptMu.Lock()
|
|
if b.acceptReadyDone {
|
|
err := b.acceptReadyErr
|
|
b.acceptMu.Unlock()
|
|
return err
|
|
}
|
|
ready := b.acceptReady
|
|
b.acceptMu.Unlock()
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-b.Context().Done():
|
|
if err := b.acceptReadyErrorSnapshot(); err != nil {
|
|
return err
|
|
}
|
|
if err := b.resetErrSnapshot(); err != nil {
|
|
return err
|
|
}
|
|
return context.Canceled
|
|
case <-ready:
|
|
return b.acceptReadyErrorSnapshot()
|
|
}
|
|
}
|
|
|
|
func (b *bulkHandle) acceptReadyErrorSnapshot() error {
|
|
if b == nil {
|
|
return io.ErrClosedPipe
|
|
}
|
|
b.acceptMu.Lock()
|
|
defer b.acceptMu.Unlock()
|
|
return b.acceptReadyErr
|
|
}
|
|
|
|
func (b *bulkHandle) notifyAcceptStarted() {
|
|
if b == nil {
|
|
return
|
|
}
|
|
var notify func(error)
|
|
b.acceptMu.Lock()
|
|
if b.acceptNotifySent {
|
|
b.acceptMu.Unlock()
|
|
return
|
|
}
|
|
notify = b.acceptNotifyFn
|
|
b.acceptNotifyFn = nil
|
|
if notify == nil {
|
|
b.acceptMu.Unlock()
|
|
return
|
|
}
|
|
b.acceptNotifySent = true
|
|
b.acceptMu.Unlock()
|
|
go notify(nil)
|
|
}
|
|
|
|
func (b *bulkHandle) finishAcceptDispatch(err error) {
|
|
if b == nil {
|
|
return
|
|
}
|
|
var notify func(error)
|
|
b.acceptMu.Lock()
|
|
if b.acceptNotifySent {
|
|
b.acceptMu.Unlock()
|
|
return
|
|
}
|
|
notify = b.acceptNotifyFn
|
|
b.acceptNotifyFn = nil
|
|
b.acceptNotifySent = true
|
|
b.acceptMu.Unlock()
|
|
if notify != nil {
|
|
go notify(err)
|
|
}
|
|
}
|
|
|
|
func (b *bulkHandle) SessionEpoch() uint64 {
|
|
if b == nil {
|
|
return 0
|
|
}
|
|
return b.sessionEpoch
|
|
}
|
|
|
|
func (b *bulkHandle) acceptsClientSessionEpoch(epoch uint64) bool {
|
|
if b == nil {
|
|
return false
|
|
}
|
|
if b.sessionEpoch == 0 || epoch == 0 {
|
|
return true
|
|
}
|
|
return b.sessionEpoch == epoch
|
|
}
|
|
|
|
func (b *bulkHandle) acceptsTransportGeneration(transport *TransportConn) bool {
|
|
if b == nil {
|
|
return false
|
|
}
|
|
if b.transportGeneration == 0 || transport == nil {
|
|
return true
|
|
}
|
|
return b.transportGeneration == transport.TransportGeneration()
|
|
}
|
|
|
|
func (b *bulkHandle) dataIDSnapshot() uint64 {
|
|
if b == nil {
|
|
return 0
|
|
}
|
|
return b.dataID
|
|
}
|
|
|
|
func (b *bulkHandle) nextOutboundDataSeq() uint64 {
|
|
return b.reserveOutboundDataSeqs(1)
|
|
}
|
|
|
|
func (b *bulkHandle) reserveOutboundDataSeqs(count int) uint64 {
|
|
if b == nil || count <= 0 {
|
|
return 0
|
|
}
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
start := b.outboundSeq + 1
|
|
b.outboundSeq += uint64(count)
|
|
return start
|
|
}
|
|
|
|
func (b *bulkHandle) Read(p []byte) (int, error) {
|
|
if len(p) == 0 {
|
|
return 0, nil
|
|
}
|
|
if b == nil {
|
|
return 0, io.ErrClosedPipe
|
|
}
|
|
b.notifyAcceptStarted()
|
|
for {
|
|
b.mu.Lock()
|
|
localReadClosed := b.localReadClosed
|
|
if len(b.readBuf.data) > 0 {
|
|
n := copy(p, b.readBuf.data)
|
|
b.readBuf.data = b.readBuf.data[n:]
|
|
b.bufferedBytes -= n
|
|
if b.bufferedBytes < 0 {
|
|
b.bufferedBytes = 0
|
|
}
|
|
if len(b.readBuf.data) == 0 {
|
|
b.readBuf.clear()
|
|
}
|
|
b.recordReadLocked(n, time.Now())
|
|
b.mu.Unlock()
|
|
b.maybeSendWindowRelease(n, false)
|
|
return n, nil
|
|
}
|
|
if len(b.readQueue) > 0 {
|
|
b.readBuf = b.readQueue[0]
|
|
b.readQueue[0] = bulkReadChunk{}
|
|
b.readQueue = b.readQueue[1:]
|
|
b.mu.Unlock()
|
|
continue
|
|
}
|
|
resetErr := b.resetErr
|
|
remoteClosed := b.remoteClosed
|
|
notify := b.readNotify
|
|
ctx := b.ctx
|
|
readTimeout := b.readTimeout
|
|
b.mu.Unlock()
|
|
if localReadClosed {
|
|
b.maybeSendWindowRelease(0, true)
|
|
return 0, io.ErrClosedPipe
|
|
}
|
|
if resetErr != nil {
|
|
b.maybeSendWindowRelease(0, true)
|
|
return 0, resetErr
|
|
}
|
|
if remoteClosed {
|
|
b.maybeSendWindowRelease(0, true)
|
|
return 0, io.EOF
|
|
}
|
|
if err := b.waitReadable(ctx, notify, readTimeout); err != nil {
|
|
b.maybeSendWindowRelease(0, true)
|
|
return 0, err
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *bulkHandle) Write(p []byte) (int, error) {
|
|
if len(p) == 0 {
|
|
return 0, nil
|
|
}
|
|
if b == nil {
|
|
return 0, io.ErrClosedPipe
|
|
}
|
|
b.notifyAcceptStarted()
|
|
b.writeMu.Lock()
|
|
defer b.writeMu.Unlock()
|
|
b.mu.Lock()
|
|
resetErr := b.resetErr
|
|
localClosed := b.localClosed
|
|
peerReadClosed := b.peerReadClosed
|
|
sendDataFn := b.sendDataFn
|
|
sendWriteFn := b.sendWriteFn
|
|
chunkSize := b.chunkSize
|
|
writeTimeout := b.writeTimeout
|
|
bulkCtx := b.ctx
|
|
b.mu.Unlock()
|
|
if resetErr != nil {
|
|
return 0, resetErr
|
|
}
|
|
if localClosed || peerReadClosed {
|
|
return 0, io.ErrClosedPipe
|
|
}
|
|
if sendDataFn == nil {
|
|
return 0, errBulkDataPathNotReady
|
|
}
|
|
if sendWriteFn != nil {
|
|
written := 0
|
|
for written < len(p) {
|
|
end := len(p)
|
|
if b.windowBytes > 0 && end-written > b.windowBytes {
|
|
end = written + b.windowBytes
|
|
}
|
|
if chunkSize > 0 && b.maxInFlight > 0 {
|
|
maxPartBytes := chunkSize * b.maxInFlight
|
|
if maxPartBytes > 0 && end-written > maxPartBytes {
|
|
end = written + maxPartBytes
|
|
}
|
|
}
|
|
part := p[written:end]
|
|
partChunks := bulkPayloadChunkCount(len(part), chunkSize)
|
|
sendCtx, cancel, err := b.newWriteContext(bulkCtx, writeTimeout)
|
|
if err != nil {
|
|
return written, err
|
|
}
|
|
if err := b.acquireOutboundWindow(sendCtx, len(part), partChunks); err != nil {
|
|
cancel()
|
|
return written, b.normalizeWriteError(err)
|
|
}
|
|
startSeq := b.reserveOutboundDataSeqs(partChunks)
|
|
if b.dedicated {
|
|
partWritten, err := b.executeSendWrite(sendCtx, startSeq, part, partChunks, false)
|
|
cancel()
|
|
written += partWritten
|
|
if err != nil {
|
|
return written, err
|
|
}
|
|
continue
|
|
}
|
|
owned := getBulkAsyncWritePayload(len(part))
|
|
copy(owned, part)
|
|
err = b.enqueueAsyncWrite(sendCtx, bulkAsyncWriteRequest{
|
|
startSeq: startSeq,
|
|
payload: owned,
|
|
chunks: partChunks,
|
|
})
|
|
cancel()
|
|
if err != nil {
|
|
putBulkAsyncWritePayload(owned)
|
|
b.rollbackOutboundWindow(len(part), partChunks)
|
|
return written, b.normalizeWriteError(err)
|
|
}
|
|
written += len(part)
|
|
}
|
|
return written, nil
|
|
}
|
|
if chunkSize <= 0 {
|
|
chunkSize = defaultBulkChunkSize
|
|
}
|
|
written := 0
|
|
for written < len(p) {
|
|
end := written + chunkSize
|
|
if end > len(p) {
|
|
end = len(p)
|
|
}
|
|
chunk := p[written:end]
|
|
sendCtx, cancel, err := b.newWriteContext(bulkCtx, writeTimeout)
|
|
if err != nil {
|
|
if written > 0 {
|
|
b.recordWrite(written, time.Now())
|
|
}
|
|
return written, err
|
|
}
|
|
if err := b.acquireOutboundWindow(sendCtx, len(chunk), 1); err != nil {
|
|
cancel()
|
|
if written > 0 {
|
|
b.recordWrite(written, time.Now())
|
|
}
|
|
return written, b.normalizeWriteError(err)
|
|
}
|
|
err = sendDataFn(sendCtx, b, chunk)
|
|
cancel()
|
|
if err != nil {
|
|
b.rollbackOutboundWindow(len(chunk), 1)
|
|
if written > 0 {
|
|
b.recordWrite(written, time.Now())
|
|
}
|
|
return written, b.normalizeWriteError(err)
|
|
}
|
|
written = end
|
|
}
|
|
if written > 0 {
|
|
b.recordWrite(written, time.Now())
|
|
}
|
|
return written, nil
|
|
}
|
|
|
|
func (b *bulkHandle) Close() error {
|
|
return b.close(true)
|
|
}
|
|
|
|
func (b *bulkHandle) CloseWrite() error {
|
|
return b.close(false)
|
|
}
|
|
|
|
func (b *bulkHandle) close(full bool) error {
|
|
if b == nil {
|
|
return nil
|
|
}
|
|
b.notifyAcceptStarted()
|
|
b.writeMu.Lock()
|
|
defer b.writeMu.Unlock()
|
|
b.mu.Lock()
|
|
if b.resetErr != nil {
|
|
err := b.resetErr
|
|
b.mu.Unlock()
|
|
return err
|
|
}
|
|
if b.localClosed {
|
|
if !full || b.localReadClosed {
|
|
b.mu.Unlock()
|
|
return nil
|
|
}
|
|
closeFn := b.closeFn
|
|
b.mu.Unlock()
|
|
if closeFn != nil && !b.dedicatedWriteHalfClosedSnapshot() {
|
|
if err := closeFn(context.Background(), b, true); err != nil && !errors.Is(err, errBulkNotFound) && !b.canIgnoreDedicatedCloseSendError(err) {
|
|
return err
|
|
}
|
|
}
|
|
b.bestEffortCloseDedicatedWriteHalf()
|
|
b.mu.Lock()
|
|
if b.localReadClosed {
|
|
b.mu.Unlock()
|
|
return nil
|
|
}
|
|
b.localReadClosed = true
|
|
b.clearBufferedDataLocked()
|
|
b.closeWriteStateLocked()
|
|
shouldFinalize := b.shouldFinalizeLocked()
|
|
b.mu.Unlock()
|
|
b.notifyReadable()
|
|
if shouldFinalize {
|
|
b.finalize()
|
|
}
|
|
return nil
|
|
}
|
|
closeFn := b.closeFn
|
|
b.mu.Unlock()
|
|
if err := b.waitPendingAsyncWrites(context.Background()); err != nil {
|
|
return err
|
|
}
|
|
if closeFn != nil {
|
|
if err := closeFn(context.Background(), b, full); err != nil && !errors.Is(err, errBulkNotFound) && !b.canIgnoreDedicatedCloseSendError(err) {
|
|
return err
|
|
}
|
|
}
|
|
b.bestEffortCloseDedicatedWriteHalf()
|
|
b.mu.Lock()
|
|
if b.localClosed {
|
|
b.mu.Unlock()
|
|
return nil
|
|
}
|
|
b.localClosed = true
|
|
b.closeWriteStateLocked()
|
|
if full {
|
|
b.localReadClosed = true
|
|
b.clearBufferedDataLocked()
|
|
}
|
|
shouldFinalize := b.shouldFinalizeLocked()
|
|
b.mu.Unlock()
|
|
if full {
|
|
b.notifyReadable()
|
|
}
|
|
if shouldFinalize {
|
|
b.finalize()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (b *bulkHandle) Reset(err error) error {
|
|
if b == nil {
|
|
return nil
|
|
}
|
|
b.notifyAcceptStarted()
|
|
resetErr := bulkResetError(err)
|
|
b.mu.Lock()
|
|
if b.resetErr != nil {
|
|
err := b.resetErr
|
|
b.mu.Unlock()
|
|
return err
|
|
}
|
|
resetFn := b.resetFn
|
|
b.mu.Unlock()
|
|
if resetFn != nil {
|
|
if sendErr := resetFn(context.Background(), b, bulkResetMessage(resetErr)); sendErr != nil {
|
|
return sendErr
|
|
}
|
|
}
|
|
b.markReset(resetErr)
|
|
return nil
|
|
}
|
|
|
|
func (b *bulkHandle) Snapshot() BulkSnapshot {
|
|
return b.snapshot()
|
|
}
|
|
|
|
func (b *bulkHandle) markRemoteClosed() {
|
|
if b == nil {
|
|
return
|
|
}
|
|
b.mu.Lock()
|
|
b.remoteClosed = true
|
|
shouldFinalize := b.shouldFinalizeLocked()
|
|
b.mu.Unlock()
|
|
b.notifyReadable()
|
|
if shouldFinalize {
|
|
b.finalize()
|
|
}
|
|
}
|
|
|
|
func (b *bulkHandle) markPeerClosed() {
|
|
if b == nil {
|
|
return
|
|
}
|
|
b.mu.Lock()
|
|
b.remoteClosed = true
|
|
b.peerReadClosed = true
|
|
b.closeWriteStateLocked()
|
|
shouldFinalize := b.shouldFinalizeLocked()
|
|
b.notifyFlowLocked()
|
|
b.mu.Unlock()
|
|
b.notifyReadable()
|
|
if shouldFinalize {
|
|
b.finalize()
|
|
}
|
|
}
|
|
|
|
func (b *bulkHandle) markReset(err error) {
|
|
if b == nil {
|
|
return
|
|
}
|
|
resetErr := bulkResetError(err)
|
|
b.mu.Lock()
|
|
if b.resetErr == nil {
|
|
b.resetErr = resetErr
|
|
b.clearBufferedDataLocked()
|
|
b.closeWriteStateLocked()
|
|
}
|
|
b.notifyFlowLocked()
|
|
b.mu.Unlock()
|
|
b.markAcceptReady(resetErr)
|
|
b.notifyReadable()
|
|
b.finalize()
|
|
}
|
|
|
|
func (b *bulkHandle) pushChunk(chunk []byte) error {
|
|
return b.pushChunkWithOwnership(chunk, false)
|
|
}
|
|
|
|
func (b *bulkHandle) pushOwnedChunk(chunk []byte) error {
|
|
return b.pushChunkWithOwnership(chunk, true)
|
|
}
|
|
|
|
func (b *bulkHandle) pushOwnedChunkNoReset(chunk []byte) error {
|
|
return b.pushChunkWithOwnershipOptions(chunk, true, false)
|
|
}
|
|
|
|
func (b *bulkHandle) pushOwnedChunkWithReleaseNoReset(chunk []byte, release func()) error {
|
|
return b.pushChunkWithOwnershipOptionsAndRelease(chunk, true, false, release)
|
|
}
|
|
|
|
func (b *bulkHandle) pushChunkWithOwnership(chunk []byte, owned bool) error {
|
|
return b.pushChunkWithOwnershipOptions(chunk, owned, true)
|
|
}
|
|
|
|
func (b *bulkHandle) pushChunkWithOwnershipOptions(chunk []byte, owned bool, resetOnOverflow bool) error {
|
|
return b.pushChunkWithOwnershipOptionsAndRelease(chunk, owned, resetOnOverflow, nil)
|
|
}
|
|
|
|
func (b *bulkHandle) pushChunkWithOwnershipOptionsAndRelease(chunk []byte, owned bool, resetOnOverflow bool, release func()) error {
|
|
if b == nil {
|
|
return io.ErrClosedPipe
|
|
}
|
|
if len(chunk) == 0 {
|
|
if release != nil {
|
|
release()
|
|
}
|
|
return nil
|
|
}
|
|
stored := bulkReadChunk{data: chunk, release: release}
|
|
if !owned {
|
|
stored.data = append([]byte(nil), chunk...)
|
|
if stored.release != nil {
|
|
stored.release()
|
|
stored.release = nil
|
|
}
|
|
}
|
|
b.mu.Lock()
|
|
if b.resetErr != nil {
|
|
err := b.resetErr
|
|
b.mu.Unlock()
|
|
stored.clear()
|
|
return err
|
|
}
|
|
if b.inboundQueueLimit > 0 && b.bufferedChunkCountLocked() >= b.inboundQueueLimit {
|
|
if !resetOnOverflow {
|
|
b.mu.Unlock()
|
|
stored.clear()
|
|
return errBulkBackpressureExceeded
|
|
}
|
|
err := b.markResetLocked(errBulkBackpressureExceeded)
|
|
b.mu.Unlock()
|
|
stored.clear()
|
|
b.notifyReadable()
|
|
b.finalize()
|
|
return err
|
|
}
|
|
if b.inboundBytesLimit > 0 && b.bufferedBytes+len(stored.data) > b.inboundBytesLimit {
|
|
if !resetOnOverflow {
|
|
b.mu.Unlock()
|
|
stored.clear()
|
|
return errBulkBackpressureExceeded
|
|
}
|
|
err := b.markResetLocked(errBulkBackpressureExceeded)
|
|
b.mu.Unlock()
|
|
stored.clear()
|
|
b.notifyReadable()
|
|
b.finalize()
|
|
return err
|
|
}
|
|
b.readQueue = append(b.readQueue, stored)
|
|
b.bufferedBytes += len(stored.data)
|
|
b.notifyReadableLocked()
|
|
b.mu.Unlock()
|
|
return nil
|
|
}
|
|
|
|
func (b *bulkHandle) markResetLocked(err error) error {
|
|
if b == nil {
|
|
return io.ErrClosedPipe
|
|
}
|
|
if b.resetErr == nil {
|
|
b.resetErr = bulkResetError(err)
|
|
b.clearBufferedDataLocked()
|
|
b.closeWriteStateLocked()
|
|
}
|
|
return b.resetErr
|
|
}
|
|
|
|
func (b *bulkHandle) clearBufferedDataLocked() {
|
|
if b == nil {
|
|
return
|
|
}
|
|
b.readBuf.clear()
|
|
for i := range b.readQueue {
|
|
b.readQueue[i].clear()
|
|
}
|
|
b.readQueue = nil
|
|
b.readBuf = bulkReadChunk{}
|
|
b.bufferedBytes = 0
|
|
}
|
|
|
|
func (b *bulkHandle) flowControlEnabled() bool {
|
|
if b == nil {
|
|
return false
|
|
}
|
|
return b.releaseFn != nil && (b.windowBytes > 0 || b.maxInFlight > 0)
|
|
}
|
|
|
|
func (b *bulkHandle) releaseThresholdBytes() int64 {
|
|
if b == nil {
|
|
return int64(defaultBulkChunkSize)
|
|
}
|
|
threshold := b.chunkSize
|
|
if threshold <= 0 {
|
|
threshold = defaultBulkChunkSize
|
|
}
|
|
if b.windowBytes > 0 && threshold > b.windowBytes {
|
|
threshold = b.windowBytes
|
|
}
|
|
if threshold <= 0 {
|
|
threshold = defaultBulkChunkSize
|
|
}
|
|
return int64(threshold)
|
|
}
|
|
|
|
func (b *bulkHandle) maybeSendWindowRelease(consumed int, force bool) {
|
|
if b == nil || !b.flowControlEnabled() {
|
|
return
|
|
}
|
|
b.mu.Lock()
|
|
if consumed > 0 {
|
|
b.pendingReleaseBytes += int64(consumed)
|
|
b.pendingReleaseChunks++
|
|
}
|
|
if !force && b.pendingReleaseBytes < b.releaseThresholdBytes() {
|
|
b.mu.Unlock()
|
|
return
|
|
}
|
|
b.mu.Unlock()
|
|
b.scheduleWindowRelease()
|
|
}
|
|
|
|
func (b *bulkHandle) scheduleWindowRelease() {
|
|
if b == nil || b.releaseNotify == nil {
|
|
return
|
|
}
|
|
select {
|
|
case b.releaseNotify <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
func (b *bulkHandle) takePendingWindowRelease() (int64, int, bulkReleaseSender) {
|
|
if b == nil {
|
|
return 0, 0, nil
|
|
}
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
bytes := b.pendingReleaseBytes
|
|
chunks := b.pendingReleaseChunks
|
|
release := b.releaseFn
|
|
b.pendingReleaseBytes = 0
|
|
b.pendingReleaseChunks = 0
|
|
return bytes, chunks, release
|
|
}
|
|
|
|
func (b *bulkHandle) runWindowReleaseLoop() {
|
|
if b == nil {
|
|
return
|
|
}
|
|
defer close(b.releaseWorkerDone)
|
|
for {
|
|
select {
|
|
case <-b.Context().Done():
|
|
return
|
|
case <-b.releaseNotify:
|
|
}
|
|
for {
|
|
bytes, chunks, release := b.takePendingWindowRelease()
|
|
if release == nil || (bytes <= 0 && chunks <= 0) {
|
|
break
|
|
}
|
|
_ = release(b, bytes, chunks)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *bulkHandle) acquireOutboundWindow(ctx context.Context, size int, chunks int) error {
|
|
if b == nil || size <= 0 || !b.flowControlEnabled() {
|
|
return nil
|
|
}
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
need := int64(size)
|
|
if chunks <= 0 {
|
|
chunks = 1
|
|
}
|
|
for {
|
|
b.mu.Lock()
|
|
if b.resetErr != nil {
|
|
err := b.resetErr
|
|
b.mu.Unlock()
|
|
return err
|
|
}
|
|
if b.localClosed || b.peerReadClosed {
|
|
b.mu.Unlock()
|
|
return io.ErrClosedPipe
|
|
}
|
|
bytesOK := true
|
|
if b.windowBytes > 0 {
|
|
bytesOK = b.outboundAvailBytes >= need
|
|
if !bytesOK && need > int64(b.windowBytes) && b.outboundInFlight == 0 {
|
|
bytesOK = true
|
|
}
|
|
}
|
|
chunksOK := true
|
|
if b.maxInFlight > 0 {
|
|
chunksOK = b.outboundInFlight+chunks <= b.maxInFlight
|
|
}
|
|
if bytesOK && chunksOK {
|
|
if b.windowBytes > 0 {
|
|
b.outboundAvailBytes -= need
|
|
}
|
|
if b.maxInFlight > 0 {
|
|
b.outboundInFlight += chunks
|
|
}
|
|
b.mu.Unlock()
|
|
return nil
|
|
}
|
|
notify := b.flowNotify
|
|
b.mu.Unlock()
|
|
select {
|
|
case <-notify:
|
|
case <-ctx.Done():
|
|
if stateErr := b.writeStateErrorSnapshot(); stateErr != nil {
|
|
return stateErr
|
|
}
|
|
return normalizeStreamDeadlineError(ctx.Err())
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *bulkHandle) rollbackOutboundWindow(size int, chunks int) {
|
|
if b == nil || size <= 0 || !b.flowControlEnabled() {
|
|
return
|
|
}
|
|
if chunks <= 0 {
|
|
chunks = 1
|
|
}
|
|
b.mu.Lock()
|
|
if b.windowBytes > 0 {
|
|
b.outboundAvailBytes += int64(size)
|
|
maxAvail := int64(b.windowBytes)
|
|
if b.outboundAvailBytes > maxAvail {
|
|
b.outboundAvailBytes = maxAvail
|
|
}
|
|
}
|
|
if b.maxInFlight > 0 && b.outboundInFlight > 0 {
|
|
b.outboundInFlight -= chunks
|
|
if b.outboundInFlight < 0 {
|
|
b.outboundInFlight = 0
|
|
}
|
|
}
|
|
b.notifyFlowLocked()
|
|
b.mu.Unlock()
|
|
}
|
|
|
|
func (b *bulkHandle) releaseOutboundWindow(bytes int64, chunks int) {
|
|
if b == nil || !b.flowControlEnabled() {
|
|
return
|
|
}
|
|
b.mu.Lock()
|
|
if b.windowBytes > 0 && bytes > 0 {
|
|
b.outboundAvailBytes += bytes
|
|
maxAvail := int64(b.windowBytes)
|
|
if b.outboundAvailBytes > maxAvail {
|
|
b.outboundAvailBytes = maxAvail
|
|
}
|
|
}
|
|
if b.maxInFlight > 0 && chunks > 0 {
|
|
b.outboundInFlight -= chunks
|
|
if b.outboundInFlight < 0 {
|
|
b.outboundInFlight = 0
|
|
}
|
|
}
|
|
b.notifyFlowLocked()
|
|
b.mu.Unlock()
|
|
}
|
|
|
|
func (b *bulkHandle) bufferedChunkCountLocked() int {
|
|
if b == nil {
|
|
return 0
|
|
}
|
|
count := len(b.readQueue)
|
|
if len(b.readBuf.data) > 0 {
|
|
count++
|
|
}
|
|
return count
|
|
}
|
|
|
|
func (b *bulkHandle) shouldFinalizeLocked() bool {
|
|
if b == nil {
|
|
return true
|
|
}
|
|
if b.resetErr != nil {
|
|
return true
|
|
}
|
|
if b.dedicated {
|
|
return b.localClosed && b.remoteClosed
|
|
}
|
|
return b.localReadClosed || (b.peerReadClosed && b.remoteClosed) || (b.localClosed && b.remoteClosed)
|
|
}
|
|
|
|
func (b *bulkHandle) snapshot() BulkSnapshot {
|
|
if b == nil {
|
|
return BulkSnapshot{}
|
|
}
|
|
dedicatedAttached := b.dedicatedAttachedSnapshot()
|
|
dedicatedState, dedicatedAttempts, dedicatedLastCode, dedicatedDataStarted := b.dedicatedAttachDiagnosticsSnapshot()
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
snapshot := BulkSnapshot{
|
|
ID: b.id,
|
|
DataID: b.dataID,
|
|
FastPathVersion: normalizeBulkFastPathVersion(b.fastPathVersion),
|
|
Scope: normalizeFileScope(b.runtimeScope),
|
|
Range: b.rangeSpec,
|
|
Metadata: cloneBulkMetadata(b.metadata),
|
|
Dedicated: b.dedicated,
|
|
DedicatedLaneID: b.dedicatedLaneID,
|
|
DedicatedAttached: dedicatedAttached,
|
|
DedicatedAttachState: bulkDedicatedAttachStateName(
|
|
dedicatedState,
|
|
),
|
|
DedicatedAttachAttempts: dedicatedAttempts,
|
|
DedicatedAttachLastCode: dedicatedLastCode,
|
|
DedicatedDataStarted: dedicatedDataStarted,
|
|
SessionEpoch: b.sessionEpoch,
|
|
TransportGeneration: b.transportGeneration,
|
|
LocalClosed: b.localClosed,
|
|
LocalReadClosed: b.localReadClosed,
|
|
RemoteClosed: b.remoteClosed,
|
|
PeerReadClosed: b.peerReadClosed,
|
|
BufferedChunks: b.bufferedChunkCountLocked(),
|
|
BufferedBytes: b.bufferedBytes,
|
|
ReadTimeout: b.readTimeout,
|
|
WriteTimeout: b.writeTimeout,
|
|
ChunkSize: b.chunkSize,
|
|
WindowBytes: b.windowBytes,
|
|
MaxInFlight: b.maxInFlight,
|
|
BytesRead: b.bytesRead,
|
|
BytesWritten: b.bytesWritten,
|
|
ReadCalls: b.readCalls,
|
|
WriteCalls: b.writeCalls,
|
|
OpenedAt: b.createdAt,
|
|
LastReadAt: b.lastReadAt,
|
|
LastWriteAt: b.lastWriteAt,
|
|
}
|
|
if b.logical != nil {
|
|
snapshot.LogicalClientID = b.logical.ID()
|
|
}
|
|
if b.resetErr != nil {
|
|
snapshot.ResetError = b.resetErr.Error()
|
|
}
|
|
var diag snapshotBindingDiagnostics
|
|
switch {
|
|
case b.logical != nil || b.transport != nil:
|
|
diag = snapshotBindingDiagnosticsFromLogical(b.logical, b.transport, b.transportGeneration)
|
|
case b.client != nil:
|
|
diag = snapshotBindingDiagnosticsFromClient(b.client, b.sessionEpoch)
|
|
}
|
|
snapshot.BindingOwner = diag.BindingOwner
|
|
snapshot.BindingAlive = diag.BindingAlive
|
|
snapshot.BindingCurrent = diag.BindingCurrent
|
|
snapshot.BindingReason = diag.BindingReason
|
|
snapshot.BindingError = diag.BindingError
|
|
snapshot.BindingBulkAdaptiveSoftPayloadBytes = diag.BindingBulkAdaptiveSoftPayloadBytes
|
|
snapshot.TransportAttached = diag.TransportAttached
|
|
snapshot.TransportHasRuntimeConn = diag.TransportHasRuntimeConn
|
|
snapshot.TransportCurrent = diag.TransportCurrent
|
|
snapshot.TransportDetachReason = diag.TransportDetachReason
|
|
snapshot.TransportDetachKind = diag.TransportDetachKind
|
|
snapshot.TransportDetachGeneration = diag.TransportDetachGeneration
|
|
snapshot.TransportDetachError = diag.TransportDetachError
|
|
snapshot.TransportDetachedAt = diag.TransportDetachedAt
|
|
snapshot.ReattachEligible = diag.ReattachEligible
|
|
return snapshot
|
|
}
|
|
|
|
func (b *bulkHandle) finalize() {
|
|
if b == nil {
|
|
return
|
|
}
|
|
b.markDedicatedAttachClosed()
|
|
b.maybeSendWindowRelease(0, true)
|
|
if b.cancel != nil {
|
|
b.cancel()
|
|
}
|
|
if b.writeCtxCancel != nil {
|
|
b.writeCtxCancel()
|
|
}
|
|
if sender := b.clearDedicatedSender(); sender != nil {
|
|
sender.stop()
|
|
}
|
|
if b.client != nil && b.releaseDedicatedActiveReserved() {
|
|
b.client.releaseBulkDedicatedActiveSlot()
|
|
}
|
|
if b.client != nil {
|
|
b.client.releaseBulkDedicatedLane(b.dedicatedLaneIDSnapshot())
|
|
}
|
|
if conn, owned := b.clearDedicatedConn(); conn != nil && owned {
|
|
_ = conn.Close()
|
|
}
|
|
if b.runtime != nil {
|
|
b.runtime.remove(b.runtimeScope, b.id)
|
|
}
|
|
}
|
|
|
|
func (b *bulkHandle) recordReadLocked(n int, now time.Time) {
|
|
if b == nil || n <= 0 {
|
|
return
|
|
}
|
|
b.bytesRead += int64(n)
|
|
b.readCalls++
|
|
b.lastReadAt = now
|
|
}
|
|
|
|
func (b *bulkHandle) recordWrite(n int, now time.Time) {
|
|
if b == nil || n <= 0 {
|
|
return
|
|
}
|
|
b.mu.Lock()
|
|
b.bytesWritten += int64(n)
|
|
b.writeCalls++
|
|
b.lastWriteAt = now
|
|
b.mu.Unlock()
|
|
}
|
|
|
|
func (b *bulkHandle) waitReadable(ctx context.Context, notify <-chan struct{}, timeout time.Duration) error {
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
deadline := streamEffectiveDeadline(time.Now(), timeout, time.Time{})
|
|
if deadline.IsZero() {
|
|
select {
|
|
case <-notify:
|
|
return nil
|
|
case <-ctx.Done():
|
|
if resetErr := b.resetErrSnapshot(); resetErr != nil {
|
|
return resetErr
|
|
}
|
|
if b.localReadClosedSnapshot() {
|
|
return io.ErrClosedPipe
|
|
}
|
|
if b.remoteClosedSnapshot() {
|
|
return nil
|
|
}
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
if !deadline.After(time.Now()) {
|
|
return normalizeStreamDeadlineError(context.DeadlineExceeded)
|
|
}
|
|
timer := time.NewTimer(time.Until(deadline))
|
|
defer timer.Stop()
|
|
select {
|
|
case <-notify:
|
|
return nil
|
|
case <-ctx.Done():
|
|
if resetErr := b.resetErrSnapshot(); resetErr != nil {
|
|
return resetErr
|
|
}
|
|
if b.localReadClosedSnapshot() {
|
|
return io.ErrClosedPipe
|
|
}
|
|
if b.remoteClosedSnapshot() {
|
|
return nil
|
|
}
|
|
return normalizeStreamDeadlineError(ctx.Err())
|
|
case <-timer.C:
|
|
return normalizeStreamDeadlineError(context.DeadlineExceeded)
|
|
}
|
|
}
|
|
|
|
func (b *bulkHandle) resetErrSnapshot() error {
|
|
if b == nil {
|
|
return io.ErrClosedPipe
|
|
}
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
return b.resetErr
|
|
}
|
|
|
|
func (b *bulkHandle) remoteClosedSnapshot() bool {
|
|
if b == nil {
|
|
return true
|
|
}
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
return b.remoteClosed
|
|
}
|
|
|
|
func (b *bulkHandle) localClosedSnapshot() bool {
|
|
if b == nil {
|
|
return true
|
|
}
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
return b.localClosed
|
|
}
|
|
|
|
func (b *bulkHandle) localReadClosedSnapshot() bool {
|
|
if b == nil {
|
|
return true
|
|
}
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
return b.localReadClosed
|
|
}
|
|
|
|
func (b *bulkHandle) writeStateErrorSnapshot() error {
|
|
if b == nil {
|
|
return io.ErrClosedPipe
|
|
}
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
if b.resetErr != nil {
|
|
return b.resetErr
|
|
}
|
|
if b.localClosed || b.peerReadClosed {
|
|
return io.ErrClosedPipe
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (b *bulkHandle) notifyReadable() {
|
|
if b == nil {
|
|
return
|
|
}
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
b.notifyReadableLocked()
|
|
}
|
|
|
|
func (b *bulkHandle) notifyReadableLocked() {
|
|
if b == nil || b.readNotify == nil {
|
|
return
|
|
}
|
|
select {
|
|
case b.readNotify <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
func (b *bulkHandle) notifyFlowLocked() {
|
|
if b == nil || b.flowNotify == nil {
|
|
return
|
|
}
|
|
select {
|
|
case b.flowNotify <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
func (b *bulkHandle) closeWriteStateLocked() {
|
|
if b == nil || b.writeStateClosed {
|
|
return
|
|
}
|
|
b.writeStateClosed = true
|
|
if b.writeStateDone != nil {
|
|
close(b.writeStateDone)
|
|
}
|
|
}
|
|
|
|
func bulkAsyncWriteQueueSize(maxInFlight int) int {
|
|
if maxInFlight <= 0 {
|
|
maxInFlight = defaultBulkOpenMaxInFlight
|
|
}
|
|
if maxInFlight < 8 {
|
|
return 8
|
|
}
|
|
if maxInFlight > 128 {
|
|
return 128
|
|
}
|
|
return maxInFlight
|
|
}
|
|
|
|
func getBulkAsyncWritePayload(size int) []byte {
|
|
if size <= 0 {
|
|
return nil
|
|
}
|
|
if pooled, ok := bulkAsyncWritePayloadPool.Get().([]byte); ok && cap(pooled) >= size {
|
|
return pooled[:size]
|
|
}
|
|
return make([]byte, size)
|
|
}
|
|
|
|
func putBulkAsyncWritePayload(buf []byte) {
|
|
if cap(buf) == 0 || cap(buf) > 8*1024*1024 {
|
|
return
|
|
}
|
|
bulkAsyncWritePayloadPool.Put(buf[:0])
|
|
}
|
|
|
|
func (b *bulkHandle) beginPendingAsyncWriteLocked() {
|
|
if b == nil {
|
|
return
|
|
}
|
|
if b.pendingAsyncWrites == 0 {
|
|
b.writeDrain = make(chan struct{})
|
|
}
|
|
b.pendingAsyncWrites++
|
|
}
|
|
|
|
func (b *bulkHandle) finishPendingAsyncWrite() {
|
|
if b == nil {
|
|
return
|
|
}
|
|
b.mu.Lock()
|
|
if b.pendingAsyncWrites > 0 {
|
|
b.pendingAsyncWrites--
|
|
if b.pendingAsyncWrites == 0 && b.writeDrain != nil {
|
|
close(b.writeDrain)
|
|
}
|
|
}
|
|
b.mu.Unlock()
|
|
}
|
|
|
|
func (b *bulkHandle) waitPendingAsyncWrites(ctx context.Context) error {
|
|
if b == nil {
|
|
return io.ErrClosedPipe
|
|
}
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
b.mu.Lock()
|
|
if b.pendingAsyncWrites == 0 {
|
|
b.mu.Unlock()
|
|
return nil
|
|
}
|
|
drain := b.writeDrain
|
|
b.mu.Unlock()
|
|
select {
|
|
case <-ctx.Done():
|
|
if err := b.writeStateErrorSnapshot(); err != nil {
|
|
return err
|
|
}
|
|
return normalizeStreamDeadlineError(ctx.Err())
|
|
case <-b.Context().Done():
|
|
if err := b.writeStateErrorSnapshot(); err != nil {
|
|
return err
|
|
}
|
|
return context.Canceled
|
|
case <-drain:
|
|
return b.writeStateErrorSnapshot()
|
|
}
|
|
}
|
|
|
|
func (b *bulkHandle) enqueueAsyncWrite(ctx context.Context, req bulkAsyncWriteRequest) error {
|
|
if b == nil {
|
|
return io.ErrClosedPipe
|
|
}
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
b.mu.Lock()
|
|
if b.resetErr != nil {
|
|
err := b.resetErr
|
|
b.mu.Unlock()
|
|
return err
|
|
}
|
|
if b.localClosed || b.peerReadClosed {
|
|
b.mu.Unlock()
|
|
return io.ErrClosedPipe
|
|
}
|
|
queue := b.writeQueue
|
|
if queue == nil {
|
|
b.mu.Unlock()
|
|
return errBulkDataPathNotReady
|
|
}
|
|
b.beginPendingAsyncWriteLocked()
|
|
b.mu.Unlock()
|
|
select {
|
|
case queue <- req:
|
|
return nil
|
|
case <-ctx.Done():
|
|
b.finishPendingAsyncWrite()
|
|
return normalizeStreamDeadlineError(ctx.Err())
|
|
case <-b.Context().Done():
|
|
b.finishPendingAsyncWrite()
|
|
if err := b.writeStateErrorSnapshot(); err != nil {
|
|
return err
|
|
}
|
|
return context.Canceled
|
|
}
|
|
}
|
|
|
|
func (b *bulkHandle) runAsyncWriteLoop() {
|
|
if b == nil {
|
|
return
|
|
}
|
|
defer close(b.writeWorkerDone)
|
|
for {
|
|
select {
|
|
case <-b.Context().Done():
|
|
b.drainPendingAsyncWrites()
|
|
return
|
|
case req := <-b.writeQueue:
|
|
b.processAsyncWrite(req)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *bulkHandle) drainPendingAsyncWrites() {
|
|
if b == nil || b.writeQueue == nil {
|
|
return
|
|
}
|
|
for {
|
|
select {
|
|
case req := <-b.writeQueue:
|
|
b.rollbackOutboundWindow(len(req.payload), req.chunks)
|
|
putBulkAsyncWritePayload(req.payload)
|
|
b.finishPendingAsyncWrite()
|
|
default:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *bulkHandle) processAsyncWrite(req bulkAsyncWriteRequest) {
|
|
if b == nil {
|
|
return
|
|
}
|
|
defer putBulkAsyncWritePayload(req.payload)
|
|
defer b.finishPendingAsyncWrite()
|
|
if len(req.payload) == 0 {
|
|
return
|
|
}
|
|
if err := b.writeStateErrorSnapshot(); err != nil {
|
|
b.rollbackOutboundWindow(len(req.payload), req.chunks)
|
|
return
|
|
}
|
|
b.notifyAcceptStarted()
|
|
b.mu.Lock()
|
|
writeTimeout := b.writeTimeout
|
|
bulkCtx := b.ctx
|
|
b.mu.Unlock()
|
|
sendCtx, cancel, err := b.newWriteContext(bulkCtx, writeTimeout)
|
|
if err != nil {
|
|
b.rollbackOutboundWindow(len(req.payload), req.chunks)
|
|
if stateErr := b.writeStateErrorSnapshot(); stateErr == nil {
|
|
b.markReset(err)
|
|
}
|
|
return
|
|
}
|
|
_, writeErr := b.executeSendWrite(sendCtx, req.startSeq, req.payload, req.chunks, true)
|
|
cancel()
|
|
if writeErr != nil {
|
|
b.markReset(writeErr)
|
|
}
|
|
}
|
|
|
|
func (b *bulkHandle) executeSendWrite(ctx context.Context, startSeq uint64, payload []byte, chunks int, payloadOwned bool) (int, error) {
|
|
if b == nil {
|
|
return 0, io.ErrClosedPipe
|
|
}
|
|
if len(payload) == 0 {
|
|
return 0, nil
|
|
}
|
|
if chunks <= 0 {
|
|
chunks = 1
|
|
}
|
|
if err := b.writeStateErrorSnapshot(); err != nil {
|
|
b.rollbackOutboundWindow(len(payload), chunks)
|
|
return 0, err
|
|
}
|
|
b.mu.Lock()
|
|
sendWriteFn := b.sendWriteFn
|
|
chunkSize := b.chunkSize
|
|
b.mu.Unlock()
|
|
if sendWriteFn == nil {
|
|
b.rollbackOutboundWindow(len(payload), chunks)
|
|
return 0, errBulkDataPathNotReady
|
|
}
|
|
written, err := sendWriteFn(ctx, b, startSeq, payload, payloadOwned)
|
|
if written < 0 {
|
|
written = 0
|
|
}
|
|
if written > len(payload) {
|
|
written = len(payload)
|
|
}
|
|
if written > 0 {
|
|
b.recordWrite(written, time.Now())
|
|
}
|
|
if written < len(payload) {
|
|
remaining := len(payload) - written
|
|
b.rollbackOutboundWindow(remaining, bulkPayloadChunkCount(remaining, chunkSize))
|
|
}
|
|
if err != nil {
|
|
if b.canIgnoreDedicatedCloseSendError(err) {
|
|
return written, nil
|
|
}
|
|
return written, b.normalizeWriteError(err)
|
|
}
|
|
if written != len(payload) {
|
|
return written, io.ErrShortWrite
|
|
}
|
|
return written, nil
|
|
}
|
|
|
|
func (b *bulkHandle) normalizeWriteError(err error) error {
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
if stateErr := b.writeStateErrorSnapshot(); stateErr != nil {
|
|
return stateErr
|
|
}
|
|
return normalizeStreamDeadlineError(err)
|
|
}
|
|
|
|
func (b *bulkHandle) writeStateDoneSnapshot() <-chan struct{} {
|
|
if b == nil {
|
|
return nil
|
|
}
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
return b.writeStateDone
|
|
}
|
|
|
|
func (b *bulkHandle) newWriteContext(parent context.Context, timeout time.Duration) (context.Context, func(), error) {
|
|
baseParent := parent
|
|
if b != nil && parent == b.ctx && b.writeCtx != nil {
|
|
baseParent = b.writeCtx
|
|
}
|
|
ctx, cancel, err := bulkWriteContext(baseParent, timeout)
|
|
if err != nil {
|
|
return nil, func() {}, err
|
|
}
|
|
if b == nil {
|
|
return ctx, cancel, nil
|
|
}
|
|
if stateErr := b.writeStateErrorSnapshot(); stateErr != nil {
|
|
cancel()
|
|
return nil, func() {}, stateErr
|
|
}
|
|
return ctx, cancel, nil
|
|
}
|
|
|
|
func (b *bulkHandle) canIgnoreDedicatedCloseSendError(err error) bool {
|
|
if b == nil || !b.dedicated || err == nil {
|
|
return false
|
|
}
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
if !(b.ctx.Err() != nil || b.remoteClosed || b.peerReadClosed || b.localClosed) {
|
|
return false
|
|
}
|
|
if errors.Is(err, errTransportDetached) || errors.Is(err, net.ErrClosed) || errors.Is(err, io.ErrClosedPipe) {
|
|
return true
|
|
}
|
|
message := strings.ToLower(err.Error())
|
|
return strings.Contains(message, "broken pipe") || strings.Contains(message, "use of closed network connection")
|
|
}
|
|
|
|
func bulkWriteContext(parent context.Context, timeout time.Duration) (context.Context, func(), error) {
|
|
if parent == nil {
|
|
parent = context.Background()
|
|
}
|
|
deadline := streamEffectiveDeadline(time.Now(), timeout, time.Time{})
|
|
if !deadline.IsZero() && !deadline.After(time.Now()) {
|
|
return nil, func() {}, normalizeStreamDeadlineError(context.DeadlineExceeded)
|
|
}
|
|
if deadline.IsZero() {
|
|
return parent, func() {}, nil
|
|
}
|
|
ctx, cancel := context.WithDeadline(parent, deadline)
|
|
return ctx, cancel, nil
|
|
}
|
|
|
|
func normalizeBulkOpenRequest(req BulkOpenRequest) BulkOpenRequest {
|
|
req.Range = normalizeBulkRange(req.Range)
|
|
req.Metadata = cloneBulkMetadata(req.Metadata)
|
|
req.FastPathVersion = normalizeBulkFastPathVersion(req.FastPathVersion)
|
|
if req.Dedicated && req.DedicatedLaneID == 0 {
|
|
req.DedicatedLaneID = 1
|
|
}
|
|
if req.ChunkSize <= 0 {
|
|
req.ChunkSize = defaultBulkChunkSize
|
|
}
|
|
if req.WindowBytes <= 0 {
|
|
req.WindowBytes = defaultBulkOpenWindowBytes
|
|
}
|
|
if req.MaxInFlight <= 0 {
|
|
req.MaxInFlight = defaultBulkOpenMaxInFlight
|
|
}
|
|
if req.ReadTimeout < 0 {
|
|
req.ReadTimeout = defaultBulkControlReadTimeout
|
|
}
|
|
if req.WriteTimeout < 0 {
|
|
req.WriteTimeout = defaultBulkControlWriteTimeout
|
|
}
|
|
return req
|
|
}
|
|
|
|
func normalizeBulkOpenOptions(opt BulkOpenOptions) BulkOpenOptions {
|
|
mode := normalizeBulkOpenMode(opt.Mode)
|
|
switch mode {
|
|
case BulkOpenModeDefault:
|
|
// Preserve legacy behavior when Mode is not explicitly set.
|
|
if opt.Dedicated {
|
|
mode = BulkOpenModeDedicated
|
|
} else {
|
|
mode = BulkOpenModeShared
|
|
}
|
|
}
|
|
readTimeout := opt.ReadTimeout
|
|
if readTimeout < 0 {
|
|
readTimeout = defaultBulkControlReadTimeout
|
|
}
|
|
writeTimeout := opt.WriteTimeout
|
|
if writeTimeout < 0 {
|
|
writeTimeout = defaultBulkControlWriteTimeout
|
|
}
|
|
return BulkOpenOptions{
|
|
ID: opt.ID,
|
|
Range: normalizeBulkRange(opt.Range),
|
|
Metadata: cloneBulkMetadata(opt.Metadata),
|
|
ReadTimeout: readTimeout,
|
|
WriteTimeout: writeTimeout,
|
|
Mode: mode,
|
|
Dedicated: mode == BulkOpenModeDedicated,
|
|
ChunkSize: opt.ChunkSize,
|
|
WindowBytes: opt.WindowBytes,
|
|
MaxInFlight: opt.MaxInFlight,
|
|
}
|
|
}
|
|
|
|
func normalizeBulkOpenMode(mode BulkOpenMode) BulkOpenMode {
|
|
switch mode {
|
|
case BulkOpenModeDefault, BulkOpenModeAuto, BulkOpenModeShared, BulkOpenModeDedicated:
|
|
return mode
|
|
default:
|
|
return BulkOpenModeDefault
|
|
}
|
|
}
|
|
|
|
func initialBulkDedicatedAttachState(dedicated bool) bulkDedicatedAttachState {
|
|
if dedicated {
|
|
return bulkDedicatedAttachStatePending
|
|
}
|
|
return bulkDedicatedAttachStateAttached
|
|
}
|
|
|
|
func bulkPayloadChunkCount(payloadLen int, chunkSize int) int {
|
|
if payloadLen <= 0 {
|
|
return 0
|
|
}
|
|
if chunkSize <= 0 {
|
|
chunkSize = defaultBulkChunkSize
|
|
}
|
|
return (payloadLen + chunkSize - 1) / chunkSize
|
|
}
|
|
|
|
func bulkDedicatedAttachStateName(state bulkDedicatedAttachState) string {
|
|
switch state {
|
|
case bulkDedicatedAttachStatePending:
|
|
return "pending"
|
|
case bulkDedicatedAttachStateAttached:
|
|
return "attached"
|
|
case bulkDedicatedAttachStateDegraded:
|
|
return "degraded"
|
|
case bulkDedicatedAttachStateClosed:
|
|
return "closed"
|
|
default:
|
|
return "unknown"
|
|
}
|
|
}
|
|
|
|
func bulkOpenModeName(mode BulkOpenMode) string {
|
|
switch normalizeBulkOpenMode(mode) {
|
|
case BulkOpenModeAuto:
|
|
return "auto"
|
|
case BulkOpenModeShared:
|
|
return "shared"
|
|
case BulkOpenModeDedicated:
|
|
return "dedicated"
|
|
case BulkOpenModeDefault:
|
|
fallthrough
|
|
default:
|
|
return "default"
|
|
}
|
|
}
|
|
|
|
func normalizeBulkRange(r BulkRange) BulkRange {
|
|
if r.Offset < 0 {
|
|
r.Offset = -1
|
|
}
|
|
if r.Length < 0 {
|
|
r.Length = -1
|
|
}
|
|
return r
|
|
}
|
|
|
|
func validBulkRange(r BulkRange) bool {
|
|
return r.Offset >= 0 && r.Length >= 0
|
|
}
|
|
|
|
func cloneBulkMetadata(src BulkMetadata) BulkMetadata {
|
|
if len(src) == 0 {
|
|
return nil
|
|
}
|
|
dst := make(BulkMetadata, len(src))
|
|
for key, value := range src {
|
|
dst[key] = value
|
|
}
|
|
return dst
|
|
}
|
|
|
|
func bulkResetError(err error) error {
|
|
if err == nil {
|
|
return errBulkReset
|
|
}
|
|
return err
|
|
}
|
|
|
|
// bulkResetMessage returns err's message, or the empty string when err is
// nil.
func bulkResetMessage(err error) string {
	if err != nil {
		return err.Error()
	}
	return ""
}
|