notify/stream_snapshot.go
starainrt f038a89771
fix: close stream adaptive gaps and switch notify to stario v0.1.1
- make stream fast path honor adaptive soft payload limits end-to-end
  - split oversized fast-stream payloads into sequential frames before batching
  - use adaptive soft cap when encoding stream batch payloads
  - move timeout-like error detection into production code for adaptive tx
  - tune notify FrameReader read size explicitly to avoid throughput regression
  - drop local stario replace and depend on released b612.me/stario v0.1.1
2026-04-18 16:05:57 +08:00

127 lines
4.2 KiB
Go

package notify
import (
"errors"
"sort"
"time"
)
type StreamSnapshot struct {
ID string
DataID uint64
Scope string
Channel StreamChannel
Metadata StreamMetadata
BindingOwner string
BindingAlive bool
BindingCurrent bool
BindingReason string
BindingError string
BindingBulkAdaptiveSoftPayloadBytes int
BindingStreamAdaptiveSoftPayloadBytes int
BindingStreamAdaptiveWaitThresholdBytes int
BindingStreamAdaptiveFlushDelay time.Duration
SessionEpoch uint64
LogicalClientID string
LocalAddress string
RemoteAddress string
TransportGeneration uint64
TransportAttached bool
TransportHasRuntimeConn bool
TransportCurrent bool
TransportDetachReason string
TransportDetachKind string
TransportDetachGeneration uint64
TransportDetachError string
TransportDetachedAt time.Time
ReattachEligible bool
LocalClosed bool
LocalReadClosed bool
RemoteClosed bool
PeerReadClosed bool
BufferedChunks int
BufferedBytes int
ReadTimeout time.Duration
WriteTimeout time.Duration
BytesRead int64
BytesWritten int64
ReadCalls int64
WriteCalls int64
OpenedAt time.Time
LastReadAt time.Time
LastWriteAt time.Time
ReadDeadline time.Time
WriteDeadline time.Time
ResetError string
}
type clientStreamSnapshotReader interface {
clientStreamSnapshots() []StreamSnapshot
}
type serverStreamSnapshotReader interface {
serverStreamSnapshots() []StreamSnapshot
}
var (
errClientStreamSnapshotNil = errors.New("client stream snapshot target is nil")
errServerStreamSnapshotNil = errors.New("server stream snapshot target is nil")
errClientStreamSnapshotUnsupported = errors.New("client stream snapshot target type is unsupported")
errServerStreamSnapshotUnsupported = errors.New("server stream snapshot target type is unsupported")
)
func GetClientStreamSnapshots(c Client) ([]StreamSnapshot, error) {
if c == nil {
return nil, errClientStreamSnapshotNil
}
reader, ok := any(c).(clientStreamSnapshotReader)
if !ok {
return nil, errClientStreamSnapshotUnsupported
}
return reader.clientStreamSnapshots(), nil
}
func GetServerStreamSnapshots(s Server) ([]StreamSnapshot, error) {
if s == nil {
return nil, errServerStreamSnapshotNil
}
reader, ok := any(s).(serverStreamSnapshotReader)
if !ok {
return nil, errServerStreamSnapshotUnsupported
}
return reader.serverStreamSnapshots(), nil
}
func (c *ClientCommon) clientStreamSnapshots() []StreamSnapshot {
return streamSnapshotsFromRuntime(c.getStreamRuntime())
}
func (s *ServerCommon) serverStreamSnapshots() []StreamSnapshot {
return streamSnapshotsFromRuntime(s.getStreamRuntime())
}
func streamSnapshotsFromRuntime(runtime *streamRuntime) []StreamSnapshot {
if runtime == nil {
return nil
}
return runtime.snapshots()
}
func sortStreamSnapshots(src []StreamSnapshot) {
sort.Slice(src, func(i, j int) bool {
if src[i].Scope != src[j].Scope {
return src[i].Scope < src[j].Scope
}
if src[i].ID != src[j].ID {
return src[i].ID < src[j].ID
}
if src[i].DataID != src[j].DataID {
return src[i].DataID < src[j].DataID
}
if src[i].TransportGeneration != src[j].TransportGeneration {
return src[i].TransportGeneration < src[j].TransportGeneration
}
return src[i].Channel < src[j].Channel
})
}