notify/record_snapshot.go
starainrt 7ed3dd5b37
feat: 完善 RecordStream 的协议协商、运行观测与文档说明
- 将 RecordStream 出站路径收敛为单 writer loop
  - 支持在 batch header 中 piggyback AckSeq,保留独立 ack 作为兼容回退
  - 增加 record stream 打开阶段能力协商,支持 mixed-version peer 自动降级
  - 补充 RecordSnapshot 与 diagnostics summary 的 record-plane 观测项
  - 增加 batch/ack/error frame、piggyback ack、barrier 等待拆分与 apply backlog 指标
  - 收紧 TransportConn detach 后的 runtime snapshot 语义
  - 补充 README 中的 RecordStream 语义、兼容行为与诊断快照说明
  - 补充相关单测与 race 回归验证
2026-04-15 19:52:45 +08:00

253 lines
8.4 KiB
Go

package notify
import (
"errors"
"io"
"sort"
"time"
)
type RecordSnapshot struct {
ID string
DataID uint64
Scope string
Metadata StreamMetadata
UseBatchAck bool
BindingOwner string
BindingAlive bool
BindingCurrent bool
BindingReason string
BindingError string
SessionEpoch uint64
LogicalClientID string
LocalAddress string
RemoteAddress string
TransportGeneration uint64
TransportAttached bool
TransportHasRuntimeConn bool
TransportCurrent bool
TransportDetachReason string
TransportDetachKind string
TransportDetachGeneration uint64
TransportDetachError string
TransportDetachedAt time.Time
ReattachEligible bool
LocalClosed bool
LocalReadClosed bool
RemoteClosed bool
PeerReadClosed bool
OutboundClosed bool
NextOutboundSeq uint64
EnqueuedOutboundSeq uint64
FlushedOutboundSeq uint64
AckedOutboundSeq uint64
OutstandingRecords int
OutstandingBytes int
InboundReceivedSeq uint64
InboundAppliedSeq uint64
InboundAckSentSeq uint64
PendingApplyRecords int
PendingAckRecords int
PeakPendingApplyRecords int
BatchFramesSent int64
AckFramesSent int64
ErrorFramesSent int64
BatchFramesReceived int64
AckFramesReceived int64
ErrorFramesReceived int64
PiggybackAckSent int64
PiggybackAckReceived int64
BarrierCount int64
BarrierFlushWaitDuration time.Duration
BarrierApplyWaitDuration time.Duration
OpenedAt time.Time
LastReadAt time.Time
LastWriteAt time.Time
StreamResetError string
ReadError string
TerminalError string
ResetError string
}
type clientRecordSnapshotReader interface {
clientRecordSnapshots() []RecordSnapshot
}
type serverRecordSnapshotReader interface {
serverRecordSnapshots() []RecordSnapshot
}
var (
errClientRecordSnapshotNil = errors.New("client record snapshot target is nil")
errServerRecordSnapshotNil = errors.New("server record snapshot target is nil")
errClientRecordSnapshotUnsupported = errors.New("client record snapshot target type is unsupported")
errServerRecordSnapshotUnsupported = errors.New("server record snapshot target type is unsupported")
)
func GetClientRecordSnapshots(c Client) ([]RecordSnapshot, error) {
if c == nil {
return nil, errClientRecordSnapshotNil
}
reader, ok := any(c).(clientRecordSnapshotReader)
if !ok {
return nil, errClientRecordSnapshotUnsupported
}
return reader.clientRecordSnapshots(), nil
}
func GetServerRecordSnapshots(s Server) ([]RecordSnapshot, error) {
if s == nil {
return nil, errServerRecordSnapshotNil
}
reader, ok := any(s).(serverRecordSnapshotReader)
if !ok {
return nil, errServerRecordSnapshotUnsupported
}
return reader.serverRecordSnapshots(), nil
}
func (c *ClientCommon) clientRecordSnapshots() []RecordSnapshot {
return recordSnapshotsFromRuntime(c.getRecordRuntime())
}
func (s *ServerCommon) serverRecordSnapshots() []RecordSnapshot {
return recordSnapshotsFromRuntime(s.getRecordRuntime())
}
func recordSnapshotsFromRuntime(runtime *recordRuntime) []RecordSnapshot {
if runtime == nil {
return nil
}
return runtime.snapshots()
}
func sortRecordSnapshots(src []RecordSnapshot) {
sort.Slice(src, func(i, j int) bool {
if src[i].Scope != src[j].Scope {
return src[i].Scope < src[j].Scope
}
if src[i].ID != src[j].ID {
return src[i].ID < src[j].ID
}
if src[i].DataID != src[j].DataID {
return src[i].DataID < src[j].DataID
}
return src[i].TransportGeneration < src[j].TransportGeneration
})
}
func (r *recordStream) snapshot() RecordSnapshot {
if r == nil {
return RecordSnapshot{}
}
snapshot := RecordSnapshot{}
if stream, ok := r.stream.(*streamHandle); ok {
snapshot = recordSnapshotFromStreamSnapshot(stream.snapshot())
} else if r.stream != nil {
snapshot.ID = r.stream.ID()
snapshot.Metadata = cloneStreamMetadata(r.stream.Metadata())
snapshot.TransportGeneration = r.stream.TransportGeneration()
if addr := r.stream.LocalAddr(); addr != nil {
snapshot.LocalAddress = addr.String()
}
if addr := r.stream.RemoteAddr(); addr != nil {
snapshot.RemoteAddress = addr.String()
}
if logical := r.stream.LogicalConn(); logical != nil {
snapshot.LogicalClientID = logical.ID()
}
}
snapshot.UseBatchAck = r.useBatchAck
snapshot.BatchFramesSent = r.obs.batchFramesSent.Load()
snapshot.AckFramesSent = r.obs.ackFramesSent.Load()
snapshot.ErrorFramesSent = r.obs.errorFramesSent.Load()
snapshot.BatchFramesReceived = r.obs.batchFramesReceived.Load()
snapshot.AckFramesReceived = r.obs.ackFramesReceived.Load()
snapshot.ErrorFramesReceived = r.obs.errorFramesReceived.Load()
snapshot.PiggybackAckSent = r.obs.piggybackAckSent.Load()
snapshot.PiggybackAckReceived = r.obs.piggybackAckReceived.Load()
snapshot.BarrierCount = r.obs.barrierCount.Load()
snapshot.BarrierFlushWaitDuration = time.Duration(r.obs.barrierFlushWaitNanos.Load())
snapshot.BarrierApplyWaitDuration = time.Duration(r.obs.barrierApplyWaitNanos.Load())
r.mu.Lock()
snapshot.OutboundClosed = r.outboundClosed
snapshot.NextOutboundSeq = r.nextOutboundSeq
snapshot.EnqueuedOutboundSeq = r.enqueuedOutboundSeq
snapshot.FlushedOutboundSeq = r.flushedOutboundSeq
snapshot.AckedOutboundSeq = r.ackedOutboundSeq
snapshot.OutstandingRecords = r.outstandingRecords
snapshot.OutstandingBytes = r.outstandingBytes
snapshot.InboundReceivedSeq = r.inboundReceivedSeq
snapshot.InboundAppliedSeq = r.inboundAppliedSeq
snapshot.InboundAckSentSeq = r.inboundAckSentSeq
snapshot.PendingApplyRecords = recordPendingCount(r.inboundReceivedSeq, r.inboundAppliedSeq)
snapshot.PendingAckRecords = recordPendingCount(r.inboundAppliedSeq, r.inboundAckSentSeq)
snapshot.PeakPendingApplyRecords = r.maxPendingApply
if r.readErr != nil && !errors.Is(r.readErr, io.EOF) {
snapshot.ReadError = r.readErr.Error()
}
if r.terminalErr != nil {
snapshot.TerminalError = r.terminalErr.Error()
}
r.mu.Unlock()
switch {
case snapshot.TerminalError != "":
snapshot.ResetError = snapshot.TerminalError
case snapshot.StreamResetError != "":
snapshot.ResetError = snapshot.StreamResetError
case snapshot.ReadError != "":
snapshot.ResetError = snapshot.ReadError
}
return snapshot
}
func recordSnapshotFromStreamSnapshot(stream StreamSnapshot) RecordSnapshot {
return RecordSnapshot{
ID: stream.ID,
DataID: stream.DataID,
Scope: stream.Scope,
Metadata: cloneStreamMetadata(stream.Metadata),
BindingOwner: stream.BindingOwner,
BindingAlive: stream.BindingAlive,
BindingCurrent: stream.BindingCurrent,
BindingReason: stream.BindingReason,
BindingError: stream.BindingError,
SessionEpoch: stream.SessionEpoch,
LogicalClientID: stream.LogicalClientID,
LocalAddress: stream.LocalAddress,
RemoteAddress: stream.RemoteAddress,
TransportGeneration: stream.TransportGeneration,
TransportAttached: stream.TransportAttached,
TransportHasRuntimeConn: stream.TransportHasRuntimeConn,
TransportCurrent: stream.TransportCurrent,
TransportDetachReason: stream.TransportDetachReason,
TransportDetachKind: stream.TransportDetachKind,
TransportDetachGeneration: stream.TransportDetachGeneration,
TransportDetachError: stream.TransportDetachError,
TransportDetachedAt: stream.TransportDetachedAt,
ReattachEligible: stream.ReattachEligible,
LocalClosed: stream.LocalClosed,
LocalReadClosed: stream.LocalReadClosed,
RemoteClosed: stream.RemoteClosed,
PeerReadClosed: stream.PeerReadClosed,
OpenedAt: stream.OpenedAt,
LastReadAt: stream.LastReadAt,
LastWriteAt: stream.LastWriteAt,
StreamResetError: stream.ResetError,
}
}
func recordPendingCount(high uint64, low uint64) int {
if high <= low {
return 0
}
diff := high - low
maxInt := uint64(^uint(0) >> 1)
if diff > maxInt {
return int(maxInt)
}
return int(diff)
}