package notify import ( "context" "errors" "io" "net" "os" "sync" "sync/atomic" "time" ) const ( StreamOpenSignalKey = "notify.stream.open" StreamCloseSignalKey = "notify.stream.close" StreamResetSignalKey = "notify.stream.reset" ) type StreamChannel string const ( StreamControlChannel StreamChannel = "control" StreamDataChannel StreamChannel = "data" StreamRecordChannel StreamChannel = "record" ) type StreamMetadata map[string]string type StreamOpenOptions struct { ID string Channel StreamChannel Metadata StreamMetadata ReadTimeout time.Duration WriteTimeout time.Duration } type StreamAcceptInfo struct { ID string DataID uint64 Channel StreamChannel Metadata StreamMetadata LogicalConn *LogicalConn TransportConn *TransportConn TransportGeneration uint64 Stream Stream } type Stream interface { io.Reader io.Writer io.Closer ID() string Channel() StreamChannel Metadata() StreamMetadata Context() context.Context LogicalConn() *LogicalConn TransportConn() *TransportConn TransportGeneration() uint64 LocalAddr() net.Addr RemoteAddr() net.Addr CloseWrite() error Reset(error) error SetDeadline(time.Time) error SetReadDeadline(time.Time) error SetWriteDeadline(time.Time) error } var ( errStreamClientNil = errors.New("stream client is nil") errStreamServerNil = errors.New("stream server is nil") errStreamLogicalConnNil = errors.New("stream logical connection is nil") errStreamTransportNil = errors.New("stream transport connection is nil") errStreamRuntimeNil = errors.New("stream runtime is nil") errStreamIDEmpty = errors.New("stream id is empty") errStreamAlreadyExists = errors.New("stream already exists") errStreamNotFound = errors.New("stream not found") errStreamHandlerNotConfigured = errors.New("stream handler is not configured") errStreamDataPathNotReady = errors.New("stream data path is not implemented yet") errStreamRejected = errors.New("stream open rejected") errStreamReset = errors.New("stream reset") errStreamBackpressureExceeded = errors.New("stream inbound backpressure exceeded") ) type streamCloseSender func(context.Context, *streamHandle, bool) error type streamResetSender func(context.Context, *streamHandle, string) error type streamDataSender func(context.Context, *streamHandle, []byte) error type streamReadChunk struct { data []byte release func() } func (c *streamReadChunk) clear() { if c == nil { return } if c.release != nil { c.release() } c.data = nil c.release = nil } type streamReadPayloadOwner struct { refs atomic.Int32 release func() } func newStreamReadPayloadOwner(release func()) *streamReadPayloadOwner { if release == nil { return nil } owner := &streamReadPayloadOwner{release: release} owner.refs.Store(1) return owner } func (o *streamReadPayloadOwner) retainChunk() func() { if o == nil { return nil } o.refs.Add(1) return o.releaseChunk } func (o *streamReadPayloadOwner) releaseChunk() { if o == nil { return } if o.refs.Add(-1) == 0 && o.release != nil { o.release() } } func (o *streamReadPayloadOwner) done() { if o == nil { return } o.releaseChunk() } type streamHandle struct { runtime *streamRuntime runtimeScope string id string dataID uint64 fastPathVersion uint8 outboundSeq atomic.Uint64 channel StreamChannel metadata StreamMetadata sessionEpoch uint64 client *ClientCommon logical *LogicalConn transport *TransportConn transportGeneration uint64 readTimeout time.Duration writeTimeout time.Duration closeFn streamCloseSender resetFn streamResetSender sendDataFn streamDataSender chunkSize int inboundQueueLimit int inboundBytesLimit int ctx context.Context cancel context.CancelFunc localAddr net.Addr remoteAddr net.Addr createdAt time.Time writeMu sync.Mutex mu sync.Mutex localClosed bool localReadClosed bool remoteClosed bool peerReadClosed bool resetErr error readQueue []streamReadChunk readBuf streamReadChunk bufferedBytes int readNotify chan struct{} readDeadline time.Time writeDeadline time.Time readDeadlineOverride bool writeDeadlineOverride bool readDeadlineNotify chan struct{} writeDeadlineNotify chan struct{} writeWaitSeq uint64 writeWaitCancel context.CancelFunc writeWaitChanged chan struct{} bytesRead int64 bytesWritten int64 readCalls int64 writeCalls int64 lastReadAt time.Time lastWriteAt time.Time } func newStreamHandle(parent context.Context, runtime *streamRuntime, runtimeScope string, req StreamOpenRequest, sessionEpoch uint64, logical *LogicalConn, transport *TransportConn, transportGeneration uint64, closeFn streamCloseSender, resetFn streamResetSender, sendDataFn streamDataSender, cfg streamConfig) *streamHandle { if parent == nil { parent = context.Background() } ctx, cancel := context.WithCancel(parent) if transportGeneration == 0 && transport != nil { transportGeneration = transport.TransportGeneration() } if transportGeneration == 0 && logical != nil { transportGeneration = logical.transportGenerationSnapshot() } cfg = normalizeStreamConfig(cfg) return &streamHandle{ runtime: runtime, runtimeScope: runtimeScope, id: req.StreamID, dataID: req.DataID, fastPathVersion: normalizeStreamFastPathVersion(req.FastPathVersion), channel: normalizeStreamChannel(req.Channel), metadata: cloneStreamMetadata(req.Metadata), sessionEpoch: sessionEpoch, logical: logical, transport: transport, transportGeneration: transportGeneration, readTimeout: req.ReadTimeout, writeTimeout: req.WriteTimeout, closeFn: closeFn, resetFn: resetFn, sendDataFn: sendDataFn, chunkSize: cfg.ChunkSize, inboundQueueLimit: cfg.InboundQueueLimit, inboundBytesLimit: cfg.InboundBufferedBytesLimit, ctx: ctx, cancel: cancel, readNotify: make(chan struct{}, 1), localAddr: streamLocalAddrSnapshot(logical, transport), remoteAddr: streamRemoteAddrSnapshot(logical, transport), createdAt: time.Now(), readDeadlineNotify: make(chan struct{}), writeDeadlineNotify: make(chan struct{}), } } func (s *streamHandle) SessionEpoch() uint64 { if s == nil { return 0 } return s.sessionEpoch } func (s *streamHandle) acceptsClientSessionEpoch(epoch uint64) bool { if s == nil { return false } if s.sessionEpoch == 0 || epoch == 0 { return true } return s.sessionEpoch == epoch } func (s *streamHandle) acceptsTransportGeneration(transport *TransportConn) bool { if s == nil { return false } if s.transportGeneration == 0 || transport == nil { return true } return s.transportGeneration == transport.TransportGeneration() } func (s *streamHandle) ID() string { if s == nil { return "" } return s.id } func (s *streamHandle) dataIDSnapshot() uint64 { if s == nil { return 0 } return s.dataID } func (s *streamHandle) nextOutboundDataSeq() uint64 { return s.reserveOutboundDataSeqs(1) } func (s *streamHandle) reserveOutboundDataSeqs(count int) uint64 { if s == nil { return 0 } if count <= 0 { count = 1 } end := s.outboundSeq.Add(uint64(count)) return end - uint64(count) + 1 } func (s *streamHandle) fastPathVersionSnapshot() uint8 { if s == nil { return streamFastPathVersionV1 } return normalizeStreamFastPathVersion(s.fastPathVersion) } func (s *streamHandle) Channel() StreamChannel { if s == nil { return StreamDataChannel } return s.channel } func (s *streamHandle) Metadata() StreamMetadata { if s == nil { return nil } return cloneStreamMetadata(s.metadata) } func (s *streamHandle) Context() context.Context { if s == nil { return context.Background() } return s.ctx } func (s *streamHandle) LogicalConn() *LogicalConn { if s == nil { return nil } return s.logical } func (s *streamHandle) TransportConn() *TransportConn { if s == nil { return nil } return s.transport } func (s *streamHandle) TransportGeneration() uint64 { if s == nil { return 0 } return s.transportGeneration } func (s *streamHandle) LocalAddr() net.Addr { if s == nil { return nil } s.mu.Lock() defer s.mu.Unlock() return s.localAddr } func (s *streamHandle) RemoteAddr() net.Addr { if s == nil { return nil } s.mu.Lock() defer s.mu.Unlock() return s.remoteAddr } func (s *streamHandle) readTimeoutSnapshot() time.Duration { if s == nil { return 0 } s.mu.Lock() defer s.mu.Unlock() return s.readTimeout } func (s *streamHandle) writeTimeoutSnapshot() time.Duration { if s == nil { return 0 } s.mu.Lock() defer s.mu.Unlock() return s.writeTimeout } func (s *streamHandle) Read(p []byte) (int, error) { if len(p) == 0 { return 0, nil } if s == nil { return 0, io.ErrClosedPipe } for { s.mu.Lock() localReadClosed := s.localReadClosed if len(s.readBuf.data) > 0 { n := copy(p, s.readBuf.data) s.readBuf.data = s.readBuf.data[n:] s.bufferedBytes -= n if s.bufferedBytes < 0 { s.bufferedBytes = 0 } if len(s.readBuf.data) == 0 { s.readBuf.clear() } s.recordReadLocked(n, time.Now()) s.mu.Unlock() return n, nil } if len(s.readQueue) > 0 { s.readBuf = s.readQueue[0] s.readQueue[0] = streamReadChunk{} s.readQueue = s.readQueue[1:] s.mu.Unlock() continue } resetErr := s.resetErr remoteClosed := s.remoteClosed deadline := s.effectiveReadDeadlineLocked(time.Now()) ctx := s.ctx notify := s.readNotify deadlineNotify := s.readDeadlineNotify s.mu.Unlock() if localReadClosed { return 0, io.ErrClosedPipe } if resetErr != nil { return 0, resetErr } if remoteClosed { return 0, io.EOF } if err := s.waitReadable(ctx, notify, deadlineNotify, deadline); err != nil { return 0, err } } } func (s *streamHandle) Write(p []byte) (int, error) { if len(p) == 0 { return 0, nil } if s == nil { return 0, io.ErrClosedPipe } s.writeMu.Lock() defer s.writeMu.Unlock() s.mu.Lock() resetErr := s.resetErr localClosed := s.localClosed peerReadClosed := s.peerReadClosed sendDataFn := s.sendDataFn chunkSize := s.chunkSize writeTimeout := s.writeTimeout writeDeadlineOverride := s.writeDeadlineOverride streamCtx := s.ctx runtime := s.runtime s.mu.Unlock() if resetErr != nil { return 0, resetErr } if localClosed || peerReadClosed { return 0, io.ErrClosedPipe } if sendDataFn == nil { return 0, errStreamDataPathNotReady } if chunkSize <= 0 { chunkSize = defaultFileChunkSize } written := 0 for written < len(p) { end := written + chunkSize if end > len(p) { end = len(p) } chunk := p[written:end] if !writeDeadlineOverride && writeTimeout <= 0 { if tryAcquireStreamOutboundBudget(runtime, len(chunk)) { err := sendDataFn(streamCtx, s, chunk) releaseStreamOutboundBudget(runtime, len(chunk)) if err != nil { if written > 0 { s.recordWrite(written, time.Now()) } return written, s.normalizeWriteError(err) } written = end continue } } sendCtx, cancel, deadlineChanged, err := s.newWriteContext(streamCtx, writeTimeout) if err != nil { if written > 0 { s.recordWrite(written, time.Now()) } return written, err } release, err := acquireStreamOutboundBudget(runtime, sendCtx, len(chunk)) if err != nil { cancel() if streamDeadlineChanged(deadlineChanged) { continue } if written > 0 { s.recordWrite(written, time.Now()) } return written, s.normalizeWriteError(err) } err = sendDataFn(sendCtx, s, chunk) release() cancel() if err != nil { if streamDeadlineChanged(deadlineChanged) { continue } if written > 0 { s.recordWrite(written, time.Now()) } return written, s.normalizeWriteError(err) } written = end } if written > 0 { s.recordWrite(written, time.Now()) } return written, nil } func (s *streamHandle) SetDeadline(deadline time.Time) error { if err := s.SetReadDeadline(deadline); err != nil { return err } return s.SetWriteDeadline(deadline) } func (s *streamHandle) SetReadDeadline(deadline time.Time) error { if s == nil { return io.ErrClosedPipe } s.mu.Lock() s.readDeadline = deadline s.readDeadlineOverride = true signalStreamDeadlineChangeLocked(&s.readDeadlineNotify) s.mu.Unlock() return nil } func (s *streamHandle) SetWriteDeadline(deadline time.Time) error { if s == nil { return io.ErrClosedPipe } s.mu.Lock() s.writeDeadline = deadline s.writeDeadlineOverride = true signalStreamDeadlineChangeLocked(&s.writeDeadlineNotify) waitCancel := s.writeWaitCancel if s.writeWaitChanged != nil { close(s.writeWaitChanged) s.writeWaitChanged = nil } s.mu.Unlock() if waitCancel != nil { waitCancel() } return nil } func (s *streamHandle) setAddrSnapshot(local net.Addr, remote net.Addr) { if s == nil { return } s.mu.Lock() defer s.mu.Unlock() if local != nil { s.localAddr = local } if remote != nil { s.remoteAddr = remote } } func (s *streamHandle) setClientSnapshotOwner(client *ClientCommon) { if s == nil { return } s.client = client } func (s *streamHandle) recordReadLocked(n int, now time.Time) { if s == nil || n <= 0 { return } s.bytesRead += int64(n) s.readCalls++ s.lastReadAt = now } func (s *streamHandle) recordWrite(n int, now time.Time) { if s == nil || n <= 0 { return } s.mu.Lock() s.bytesWritten += int64(n) s.writeCalls++ s.lastWriteAt = now s.mu.Unlock() } func (s *streamHandle) effectiveReadDeadlineLocked(now time.Time) time.Time { if s == nil { return time.Time{} } if s.readDeadlineOverride { return s.readDeadline } return streamEffectiveDeadline(now, s.readTimeout, time.Time{}) } func (s *streamHandle) effectiveWriteDeadlineLocked(now time.Time, writeTimeout time.Duration) time.Time { if s == nil { return time.Time{} } if s.writeDeadlineOverride { return s.writeDeadline } return streamEffectiveDeadline(now, writeTimeout, time.Time{}) } func (s *streamHandle) newWriteContext(parent context.Context, writeTimeout time.Duration) (context.Context, func(), <-chan struct{}, error) { if parent == nil { parent = context.Background() } s.mu.Lock() deadline := s.effectiveWriteDeadlineLocked(time.Now(), writeTimeout) s.mu.Unlock() if !deadline.IsZero() && !deadline.After(time.Now()) { return nil, func() {}, nil, os.ErrDeadlineExceeded } baseCtx := parent baseCancel := func() {} if !deadline.IsZero() { baseCtx, baseCancel = context.WithDeadline(parent, deadline) } else { baseCtx, baseCancel = context.WithCancel(parent) } changed := make(chan struct{}) s.mu.Lock() s.writeWaitSeq++ waitSeq := s.writeWaitSeq s.writeWaitCancel = baseCancel s.writeWaitChanged = changed s.mu.Unlock() cancel := func() { baseCancel() s.mu.Lock() if s.writeWaitSeq == waitSeq { s.writeWaitCancel = nil s.writeWaitChanged = nil } s.mu.Unlock() } return baseCtx, cancel, changed, nil } func (s *streamHandle) Close() error { return s.close(true) } func (s *streamHandle) CloseWrite() error { return s.close(false) } func (s *streamHandle) close(full bool) error { if s == nil { return nil } s.writeMu.Lock() defer s.writeMu.Unlock() s.mu.Lock() if s.resetErr != nil { err := s.resetErr s.mu.Unlock() return err } if s.localClosed { if !full || s.localReadClosed { s.mu.Unlock() return nil } closeFn := s.closeFn s.mu.Unlock() if closeFn != nil { if err := closeFn(context.Background(), s, true); err != nil && !errors.Is(err, errStreamNotFound) { return err } } s.mu.Lock() if s.localReadClosed { s.mu.Unlock() return nil } s.localReadClosed = true s.clearBufferedDataLocked() shouldFinalize := s.shouldFinalizeLocked() s.mu.Unlock() s.notifyReadable() if shouldFinalize { s.finalize() } return nil } closeFn := s.closeFn s.mu.Unlock() if closeFn != nil { if err := closeFn(context.Background(), s, full); err != nil && !errors.Is(err, errStreamNotFound) { return err } } s.mu.Lock() if s.localClosed { s.mu.Unlock() return nil } s.localClosed = true if full { s.localReadClosed = true s.clearBufferedDataLocked() } shouldFinalize := s.shouldFinalizeLocked() s.mu.Unlock() if full { s.notifyReadable() } if shouldFinalize { s.finalize() } return nil } func (s *streamHandle) Reset(err error) error { if s == nil { return nil } resetErr := streamResetError(err) s.mu.Lock() if s.resetErr != nil { err := s.resetErr s.mu.Unlock() return err } resetFn := s.resetFn s.mu.Unlock() if resetFn != nil { if sendErr := resetFn(context.Background(), s, streamResetMessage(resetErr)); sendErr != nil { return sendErr } } s.markReset(resetErr) return nil } func (s *streamHandle) markRemoteClosed() { if s == nil { return } s.mu.Lock() s.remoteClosed = true shouldFinalize := s.shouldFinalizeLocked() s.mu.Unlock() s.notifyReadable() if shouldFinalize { s.finalize() } } func (s *streamHandle) markPeerClosed() { if s == nil { return } s.mu.Lock() s.remoteClosed = true s.peerReadClosed = true shouldFinalize := s.shouldFinalizeLocked() s.mu.Unlock() s.notifyReadable() if shouldFinalize { s.finalize() } } func (s *streamHandle) markReset(err error) { if s == nil { return } s.mu.Lock() if s.resetErr == nil { s.resetErr = streamResetError(err) s.clearBufferedDataLocked() } s.mu.Unlock() s.notifyReadable() s.finalize() } func (s *streamHandle) resetErrSnapshot() error { if s == nil { return io.ErrClosedPipe } s.mu.Lock() defer s.mu.Unlock() return s.resetErr } func (s *streamHandle) localClosedSnapshot() bool { if s == nil { return true } s.mu.Lock() defer s.mu.Unlock() return s.localClosed } func (s *streamHandle) remoteClosedSnapshot() bool { if s == nil { return true } s.mu.Lock() defer s.mu.Unlock() return s.remoteClosed } func (s *streamHandle) localReadClosedSnapshot() bool { if s == nil { return true } s.mu.Lock() defer s.mu.Unlock() return s.localReadClosed } func (s *streamHandle) peerReadClosedSnapshot() bool { if s == nil { return true } s.mu.Lock() defer s.mu.Unlock() return s.peerReadClosed } func (s *streamHandle) writeStateErrorSnapshot() error { if s == nil { return io.ErrClosedPipe } s.mu.Lock() defer s.mu.Unlock() if s.resetErr != nil { return s.resetErr } if s.localClosed || s.peerReadClosed { return io.ErrClosedPipe } return nil } func (s *streamHandle) shouldFinalizeLocked() bool { return s.resetErr != nil || s.localReadClosed || (s.peerReadClosed && s.remoteClosed) || (s.localClosed && s.remoteClosed) } func (s *streamHandle) pushChunk(chunk []byte) error { return s.pushChunkWithOwnership(chunk, false) } func (s *streamHandle) pushOwnedChunk(chunk []byte) error { return s.pushChunkWithOwnership(chunk, true) } func (s *streamHandle) pushOwnedChunkWithRelease(chunk []byte, release func()) error { return s.pushChunkWithOwnershipAndRelease(chunk, true, release) } func (s *streamHandle) pushChunkWithOwnership(chunk []byte, owned bool) error { return s.pushChunkWithOwnershipAndRelease(chunk, owned, nil) } func (s *streamHandle) pushChunkWithOwnershipAndRelease(chunk []byte, owned bool, release func()) error { if s == nil { return io.ErrClosedPipe } if len(chunk) == 0 { if release != nil { release() } return nil } stored := streamReadChunk{data: chunk, release: release} if !owned { stored.data = append([]byte(nil), chunk...) if stored.release != nil { stored.release() stored.release = nil } } s.mu.Lock() if s.resetErr != nil { err := s.resetErr s.mu.Unlock() stored.clear() return err } if s.inboundQueueLimit > 0 && s.bufferedChunkCountLocked() >= s.inboundQueueLimit { err := s.markResetLocked(errStreamBackpressureExceeded) s.mu.Unlock() stored.clear() s.notifyReadable() s.finalize() return err } if s.inboundBytesLimit > 0 && s.bufferedBytes+len(stored.data) > s.inboundBytesLimit { err := s.markResetLocked(errStreamBackpressureExceeded) s.mu.Unlock() stored.clear() s.notifyReadable() s.finalize() return err } if len(s.readBuf.data) == 0 && len(s.readQueue) == 0 { s.readBuf = stored } else { s.readQueue = append(s.readQueue, stored) } s.bufferedBytes += len(stored.data) s.notifyReadableLocked() s.mu.Unlock() return nil } func (s *streamHandle) markResetLocked(err error) error { if s == nil { return io.ErrClosedPipe } if s.resetErr == nil { s.resetErr = streamResetError(err) s.clearBufferedDataLocked() } return s.resetErr } func (s *streamHandle) clearBufferedDataLocked() { if s == nil { return } s.readBuf.clear() for i := range s.readQueue { s.readQueue[i].clear() } s.readQueue = nil s.readBuf = streamReadChunk{} s.bufferedBytes = 0 } func (s *streamHandle) bufferedChunkCountLocked() int { if s == nil { return 0 } count := len(s.readQueue) if len(s.readBuf.data) > 0 { count++ } return count } func (s *streamHandle) snapshot() StreamSnapshot { if s == nil { return StreamSnapshot{} } s.mu.Lock() defer s.mu.Unlock() snapshot := StreamSnapshot{ ID: s.id, DataID: s.dataID, Scope: normalizeFileScope(s.runtimeScope), Channel: s.channel, Metadata: cloneStreamMetadata(s.metadata), SessionEpoch: s.sessionEpoch, TransportGeneration: s.transportGeneration, LocalClosed: s.localClosed, LocalReadClosed: s.localReadClosed, RemoteClosed: s.remoteClosed, PeerReadClosed: s.peerReadClosed, BufferedChunks: s.bufferedChunkCountLocked(), BufferedBytes: s.bufferedBytes, ReadTimeout: s.readTimeout, WriteTimeout: s.writeTimeout, BytesRead: s.bytesRead, BytesWritten: s.bytesWritten, ReadCalls: s.readCalls, WriteCalls: s.writeCalls, OpenedAt: s.createdAt, LastReadAt: s.lastReadAt, LastWriteAt: s.lastWriteAt, ReadDeadline: s.readDeadline, WriteDeadline: s.writeDeadline, } if s.localAddr != nil { snapshot.LocalAddress = s.localAddr.String() } if s.remoteAddr != nil { snapshot.RemoteAddress = s.remoteAddr.String() } if s.logical != nil { snapshot.LogicalClientID = s.logical.ID() if addr := s.logical.RemoteAddr(); addr != nil { snapshot.RemoteAddress = addr.String() } } if snapshot.RemoteAddress == "" && s.transport != nil && s.transport.RemoteAddr() != nil { snapshot.RemoteAddress = s.transport.RemoteAddr().String() } if s.resetErr != nil { snapshot.ResetError = s.resetErr.Error() } var diag snapshotBindingDiagnostics switch { case s.logical != nil || s.transport != nil: diag = snapshotBindingDiagnosticsFromLogical(s.logical, s.transport, s.transportGeneration) case s.client != nil: diag = snapshotBindingDiagnosticsFromClient(s.client, s.sessionEpoch) } snapshot.BindingOwner = diag.BindingOwner snapshot.BindingAlive = diag.BindingAlive snapshot.BindingCurrent = diag.BindingCurrent snapshot.BindingReason = diag.BindingReason snapshot.BindingError = diag.BindingError snapshot.BindingBulkAdaptiveSoftPayloadBytes = diag.BindingBulkAdaptiveSoftPayloadBytes snapshot.BindingStreamAdaptiveSoftPayloadBytes = diag.BindingStreamAdaptiveSoftPayloadBytes snapshot.BindingStreamAdaptiveWaitThresholdBytes = diag.BindingStreamAdaptiveWaitThresholdBytes snapshot.BindingStreamAdaptiveFlushDelay = diag.BindingStreamAdaptiveFlushDelay snapshot.TransportAttached = diag.TransportAttached snapshot.TransportHasRuntimeConn = diag.TransportHasRuntimeConn snapshot.TransportCurrent = diag.TransportCurrent snapshot.TransportDetachReason = diag.TransportDetachReason snapshot.TransportDetachKind = diag.TransportDetachKind snapshot.TransportDetachGeneration = diag.TransportDetachGeneration snapshot.TransportDetachError = diag.TransportDetachError snapshot.TransportDetachedAt = diag.TransportDetachedAt snapshot.ReattachEligible = diag.ReattachEligible return snapshot } func streamRuntimeCloseError(err error) error { if err != nil { return err } return errServiceShutdown } func (s *streamHandle) finalize() { if s == nil { return } if s.cancel != nil { s.cancel() } if s.runtime != nil { s.runtime.remove(s.runtimeScope, s.id) } } func (s *streamHandle) waitReadable(ctx context.Context, notify <-chan struct{}, deadlineNotify <-chan struct{}, deadline time.Time) error { if ctx == nil { ctx = context.Background() } if deadline.IsZero() { select { case <-notify: return nil case <-deadlineNotify: return nil case <-ctx.Done(): if resetErr := s.resetErrSnapshot(); resetErr != nil { return resetErr } if s.localReadClosedSnapshot() { return io.ErrClosedPipe } if s.remoteClosedSnapshot() { return nil } return ctx.Err() } } if !deadline.After(time.Now()) { return os.ErrDeadlineExceeded } timer := time.NewTimer(time.Until(deadline)) defer timer.Stop() select { case <-notify: return nil case <-deadlineNotify: return nil case <-ctx.Done(): if resetErr := s.resetErrSnapshot(); resetErr != nil { return resetErr } if s.localReadClosedSnapshot() { return io.ErrClosedPipe } if s.remoteClosedSnapshot() { return nil } return ctx.Err() case <-timer.C: return os.ErrDeadlineExceeded } } func (s *streamHandle) normalizeWriteError(err error) error { if err == nil { return nil } if stateErr := s.writeStateErrorSnapshot(); stateErr != nil { return stateErr } return normalizeStreamDeadlineError(err) } func (s *streamHandle) notifyReadable() { if s == nil { return } s.mu.Lock() defer s.mu.Unlock() s.notifyReadableLocked() } func (s *streamHandle) notifyReadableLocked() { if s == nil || s.readNotify == nil { return } select { case s.readNotify <- struct{}{}: default: } } func normalizeStreamChannel(channel StreamChannel) StreamChannel { switch channel { case "", StreamDataChannel: return StreamDataChannel case StreamControlChannel: return StreamControlChannel case StreamRecordChannel: return StreamRecordChannel default: return channel } } func cloneStreamMetadata(src StreamMetadata) StreamMetadata { if len(src) == 0 { return nil } dst := make(StreamMetadata, len(src)) for key, value := range src { dst[key] = value } return dst } func acquireStreamOutboundBudget(runtime *streamRuntime, ctx context.Context, size int) (func(), error) { if runtime == nil { return func() {}, nil } return runtime.acquireOutbound(ctx, size) } func tryAcquireStreamOutboundBudget(runtime *streamRuntime, size int) bool { if runtime == nil { return true } return runtime.tryAcquireOutbound(size) } func releaseStreamOutboundBudget(runtime *streamRuntime, size int) { if runtime == nil { return } runtime.releaseOutbound(size) } func normalizeStreamOpenRequest(req StreamOpenRequest) StreamOpenRequest { req.Channel = normalizeStreamChannel(req.Channel) req.FastPathVersion = normalizeStreamFastPathVersion(req.FastPathVersion) req.Metadata = cloneStreamMetadata(req.Metadata) return req } func streamResetError(err error) error { if err == nil { return errStreamReset } return err } func streamResetMessage(err error) string { if err == nil { return "" } return err.Error() }