notify/diagnostics_snapshot.go
starainrt 09d972c7b7
feat(notify): 重构通信内核并补齐 stream/bulk/record/transfer 能力
- 引入 LogicalConn/TransportConn 分层,ClientConn 保留兼容适配层
  - 新增 Stream、Bulk、RecordStream 三条数据面能力及对应控制路径
  - 完成 transfer/file 传输内核与状态快照、诊断能力
  - 补齐 reconnect、inbound dispatcher、modern psk 等基础模块
  - 增加大规模回归、并发与基准测试覆盖
  - 更新依赖库
2026-04-15 15:24:36 +08:00

388 lines
12 KiB
Go

package notify
import (
"errors"
"sort"
"strings"
"time"
)
// DiagnosticsResetCauseSummary tallies reset stream or bulk snapshots by the
// cause recorded in their ResetError message.
//
// Every snapshot with a non-empty ResetError increments Total and exactly one
// of the category counters, checked in this order:
//   - TransportDetached: the message matches errTransportDetached.
//   - ServiceShutdown: the message matches errServiceShutdown.
//   - Backpressure: the message matches the backpressure error of the data
//     plane being summarized (stream or bulk).
//   - Other: none of the above.
type DiagnosticsResetCauseSummary struct {
	Total             int
	TransportDetached int
	ServiceShutdown   int
	Backpressure      int
	Other             int
}
// DiagnosticsTransferTelemetrySummary aggregates byte counters and phase
// timings across all transfer snapshots included in a diagnostics snapshot.
type DiagnosticsTransferTelemetrySummary struct {
	// Byte totals accumulated per transfer through the
	// transferSummary*Bytes helpers.
	SourceReadBytes  int64
	StreamWriteBytes int64
	SinkWriteBytes   int64

	// Phase durations summed directly from each TransferSnapshot.
	SourceReadDuration  time.Duration
	StreamWriteDuration time.Duration
	SinkWriteDuration   time.Duration
	SyncDuration        time.Duration
	VerifyDuration      time.Duration
	CommitDuration      time.Duration
	CommitWaitDuration  time.Duration

	// Derived by finalizeDiagnosticsTransferTelemetry. WorkDuration is the sum
	// of the source-read, stream-write, sink-write, sync, verify and commit
	// durations. ObservedDuration is WorkDuration plus CommitWaitDuration.
	WorkDuration     time.Duration
	ObservedDuration time.Duration

	// Derived by finalizeDiagnosticsTransferTelemetry. Each throughput comes
	// from throughputBytesPerSecond over the matching byte total and phase
	// duration. CommitWaitRatio comes from durationRatio(CommitWaitDuration,
	// ObservedDuration).
	SourceReadThroughputBPS  float64
	StreamWriteThroughputBPS float64
	SinkWriteThroughputBPS   float64
	CommitWaitRatio          float64
}
// DiagnosticsSummary holds aggregate counters derived from a client or server
// diagnostics snapshot.
type DiagnosticsSummary struct {
	// LogicalCount is the number of logical connections. On the client it is
	// 0 or 1. CurrentTransportCount is the number of current transports; on
	// the client it is 1 only when the runtime reports a transport attached.
	LogicalCount          int
	CurrentTransportCount int

	// StreamCount counts all stream snapshots. Each non-reset snapshot that is
	// not closed on both sides is counted as Active (binding and transport
	// current and attached) or Stale (otherwise). Reset snapshots are counted
	// in ResetStreamCount. Streams closed on both sides without a reset appear
	// only in StreamCount.
	StreamCount       int
	ActiveStreamCount int
	StaleStreamCount  int
	ResetStreamCount  int

	// Bulk counters follow the same rules as the stream counters.
	// DedicatedBulkCount additionally counts snapshots flagged Dedicated,
	// regardless of their state.
	BulkCount          int
	DedicatedBulkCount int
	ActiveBulkCount    int
	StaleBulkCount     int
	ResetBulkCount     int

	// Transfer counters are keyed by TransferSnapshot.State. Any state other
	// than Done, Failed, Aborted or Paused is counted as active.
	TransferCount        int
	ActiveTransferCount  int
	PausedTransferCount  int
	DoneTransferCount    int
	FailedTransferCount  int
	AbortedTransferCount int

	// Breakdown of reset causes per data plane, plus aggregated transfer
	// telemetry over every transfer snapshot.
	StreamResetCauses DiagnosticsResetCauseSummary
	BulkResetCauses   DiagnosticsResetCauseSummary
	TransferTelemetry DiagnosticsTransferTelemetrySummary
}
// ClientDiagnosticsSnapshot bundles the client runtime snapshot with its
// stream, bulk and transfer snapshots and a Summary computed from them.
// It is produced by GetClientDiagnosticsSnapshot.
type ClientDiagnosticsSnapshot struct {
	Runtime   ClientRuntimeSnapshot
	Streams   []StreamSnapshot
	Bulks     []BulkSnapshot
	Transfers []TransferSnapshot
	Summary   DiagnosticsSummary
}
// ServerDiagnosticsSnapshot bundles the server runtime snapshot with per
// logical-connection and per current-transport runtime snapshots, the
// stream, bulk and transfer snapshots, and a Summary computed from them.
// It is produced by GetServerDiagnosticsSnapshot.
//
// Logicals and CurrentTransports are sorted by ClientID, then
// TransportGeneration, then RemoteAddress.
type ServerDiagnosticsSnapshot struct {
	Runtime           ServerRuntimeSnapshot
	Logicals          []ClientConnRuntimeSnapshot
	CurrentTransports []TransportConnRuntimeSnapshot
	Streams           []StreamSnapshot
	Bulks             []BulkSnapshot
	Transfers         []TransferSnapshot
	Summary           DiagnosticsSummary
}
var (
	// errClientDiagnosticsSnapshotNil is returned by the client diagnostics
	// helpers when given a nil Client.
	errClientDiagnosticsSnapshotNil = errors.New("client diagnostics snapshot target is nil")
	// errServerDiagnosticsSnapshotNil is returned by the server diagnostics
	// helpers when given a nil Server.
	errServerDiagnosticsSnapshotNil = errors.New("server diagnostics snapshot target is nil")
)
// GetClientDiagnosticsSnapshot collects the runtime, stream, bulk and transfer
// snapshots of c, in that order, and attaches a Summary derived from them.
//
// It returns errClientDiagnosticsSnapshotNil when c is nil. The first error
// reported by an underlying snapshot getter is returned unchanged, together
// with a zero-value snapshot. Each part is taken by a separate call, so the
// parts are not necessarily captured at the same instant.
func GetClientDiagnosticsSnapshot(c Client) (ClientDiagnosticsSnapshot, error) {
	if c == nil {
		return ClientDiagnosticsSnapshot{}, errClientDiagnosticsSnapshotNil
	}
	var (
		out ClientDiagnosticsSnapshot
		err error
	)
	if out.Runtime, err = GetClientRuntimeSnapshot(c); err != nil {
		return ClientDiagnosticsSnapshot{}, err
	}
	if out.Streams, err = GetClientStreamSnapshots(c); err != nil {
		return ClientDiagnosticsSnapshot{}, err
	}
	if out.Bulks, err = GetClientBulkSnapshots(c); err != nil {
		return ClientDiagnosticsSnapshot{}, err
	}
	if out.Transfers, err = GetClientTransferSnapshots(c); err != nil {
		return ClientDiagnosticsSnapshot{}, err
	}
	out.Summary = summarizeClientDiagnosticsSnapshot(out)
	return out, nil
}
// GetServerDiagnosticsSnapshot collects, in order, the server runtime
// snapshot, the sorted logical-connection and current-transport runtime
// snapshots, and the stream, bulk and transfer snapshots of s. It then
// attaches a Summary derived from them.
//
// It returns errServerDiagnosticsSnapshotNil when s is nil. The first error
// from any step is returned unchanged, together with a zero-value snapshot.
// Each part is taken by a separate call, so the parts are not necessarily
// captured at the same instant.
func GetServerDiagnosticsSnapshot(s Server) (ServerDiagnosticsSnapshot, error) {
	if s == nil {
		return ServerDiagnosticsSnapshot{}, errServerDiagnosticsSnapshotNil
	}
	var (
		out ServerDiagnosticsSnapshot
		err error
	)
	if out.Runtime, err = GetServerRuntimeSnapshot(s); err != nil {
		return ServerDiagnosticsSnapshot{}, err
	}
	if out.Logicals, err = serverLogicalRuntimeSnapshots(s); err != nil {
		return ServerDiagnosticsSnapshot{}, err
	}
	if out.CurrentTransports, err = serverCurrentTransportRuntimeSnapshots(s); err != nil {
		return ServerDiagnosticsSnapshot{}, err
	}
	if out.Streams, err = GetServerStreamSnapshots(s); err != nil {
		return ServerDiagnosticsSnapshot{}, err
	}
	if out.Bulks, err = GetServerBulkSnapshots(s); err != nil {
		return ServerDiagnosticsSnapshot{}, err
	}
	if out.Transfers, err = GetServerTransferSnapshots(s); err != nil {
		return ServerDiagnosticsSnapshot{}, err
	}
	out.Summary = summarizeServerDiagnosticsSnapshot(out)
	return out, nil
}
// serverLogicalRuntimeSnapshots builds a runtime snapshot for every non-nil
// logical connection returned by s.GetLogicalConnList. The result is sorted
// with sortClientConnRuntimeSnapshots.
//
// It returns errServerDiagnosticsSnapshotNil for a nil s, and stops at the
// first snapshot error.
func serverLogicalRuntimeSnapshots(s Server) ([]ClientConnRuntimeSnapshot, error) {
	if s == nil {
		return nil, errServerDiagnosticsSnapshotNil
	}
	conns := s.GetLogicalConnList()
	result := make([]ClientConnRuntimeSnapshot, 0, len(conns))
	for _, conn := range conns {
		if conn == nil {
			// The list may contain nil entries; they are skipped.
			continue
		}
		snap, snapErr := GetLogicalConnRuntimeSnapshot(conn)
		if snapErr != nil {
			return nil, snapErr
		}
		result = append(result, snap)
	}
	sortClientConnRuntimeSnapshots(result)
	return result, nil
}
// serverCurrentTransportRuntimeSnapshots builds a runtime snapshot for every
// non-nil transport returned by s.GetCurrentTransportConnList. The result is
// sorted with sortTransportConnRuntimeSnapshots.
//
// It returns errServerDiagnosticsSnapshotNil for a nil s, and stops at the
// first snapshot error.
func serverCurrentTransportRuntimeSnapshots(s Server) ([]TransportConnRuntimeSnapshot, error) {
	if s == nil {
		return nil, errServerDiagnosticsSnapshotNil
	}
	conns := s.GetCurrentTransportConnList()
	result := make([]TransportConnRuntimeSnapshot, 0, len(conns))
	for _, conn := range conns {
		if conn == nil {
			// The list may contain nil entries; they are skipped.
			continue
		}
		snap, snapErr := GetTransportConnRuntimeSnapshot(conn)
		if snapErr != nil {
			return nil, snapErr
		}
		result = append(result, snap)
	}
	sortTransportConnRuntimeSnapshots(result)
	return result, nil
}
// summarizeClientDiagnosticsSnapshot derives the aggregate counters for a
// client diagnostics snapshot.
func summarizeClientDiagnosticsSnapshot(snapshot ClientDiagnosticsSnapshot) DiagnosticsSummary {
	var summary DiagnosticsSummary
	summary.LogicalCount = diagnosticsLogicalCountFromClientRuntime(snapshot.Runtime)
	// The client view reports at most one current transport: 1 when attached.
	if snapshot.Runtime.TransportAttached {
		summary.CurrentTransportCount = 1
	}

	summarizeStreamSnapshots(&summary, snapshot.Streams)
	summarizeBulkSnapshots(&summary, snapshot.Bulks)
	summarizeTransferSnapshots(&summary, snapshot.Transfers)
	return summary
}
// summarizeServerDiagnosticsSnapshot derives the aggregate counters for a
// server diagnostics snapshot. Logical and transport counts are the lengths
// of the corresponding snapshot slices.
func summarizeServerDiagnosticsSnapshot(snapshot ServerDiagnosticsSnapshot) DiagnosticsSummary {
	var summary DiagnosticsSummary
	summary.LogicalCount = len(snapshot.Logicals)
	summary.CurrentTransportCount = len(snapshot.CurrentTransports)

	summarizeStreamSnapshots(&summary, snapshot.Streams)
	summarizeBulkSnapshots(&summary, snapshot.Bulks)
	summarizeTransferSnapshots(&summary, snapshot.Transfers)
	return summary
}
// diagnosticsLogicalCountFromClientRuntime returns 1 when the client runtime
// shows any sign of a logical connection, and 0 otherwise. A sign is any of:
// alive, a non-zero session epoch, an attached transport, a runtime
// connection, or a runtime queue.
func diagnosticsLogicalCountFromClientRuntime(runtime ClientRuntimeSnapshot) int {
	switch {
	case runtime.Alive,
		runtime.SessionEpoch != 0,
		runtime.TransportAttached,
		runtime.HasRuntimeConn,
		runtime.HasRuntimeQueue:
		return 1
	default:
		return 0
	}
}
// summarizeStreamSnapshots fills the stream counters of summary from
// snapshots. A nil summary is ignored.
//
// Each snapshot lands in at most one bucket:
//   - reset: counted, and its cause is classified into StreamResetCauses;
//   - closed on both sides without reset: only contributes to StreamCount;
//   - bound to the current, attached transport: active;
//   - anything else: stale.
func summarizeStreamSnapshots(summary *DiagnosticsSummary, snapshots []StreamSnapshot) {
	if summary == nil {
		return
	}
	summary.StreamCount = len(snapshots)
	for _, stream := range snapshots {
		if stream.ResetError != "" {
			summary.ResetStreamCount++
			accumulateDiagnosticsResetCause(&summary.StreamResetCauses, stream.ResetError, errStreamBackpressureExceeded.Error())
			continue
		}
		if streamSnapshotFinished(stream) {
			continue
		}
		if streamSnapshotBoundActive(stream) {
			summary.ActiveStreamCount++
		} else {
			summary.StaleStreamCount++
		}
	}
}
// summarizeBulkSnapshots fills the bulk counters of summary from snapshots.
// A nil summary is ignored.
//
// Dedicated snapshots are counted in DedicatedBulkCount regardless of state.
// Each snapshot then lands in at most one bucket:
//   - reset: counted, and its cause is classified into BulkResetCauses;
//   - closed on both sides without reset: only contributes to BulkCount;
//   - bound to the current, attached transport: active;
//   - anything else: stale.
func summarizeBulkSnapshots(summary *DiagnosticsSummary, snapshots []BulkSnapshot) {
	if summary == nil {
		return
	}
	summary.BulkCount = len(snapshots)
	for _, bulk := range snapshots {
		if bulk.Dedicated {
			summary.DedicatedBulkCount++
		}
		if bulk.ResetError != "" {
			summary.ResetBulkCount++
			accumulateDiagnosticsResetCause(&summary.BulkResetCauses, bulk.ResetError, errBulkBackpressureExceeded.Error())
			continue
		}
		if bulkSnapshotFinished(bulk) {
			continue
		}
		if bulkSnapshotBoundActive(bulk) {
			summary.ActiveBulkCount++
		} else {
			summary.StaleBulkCount++
		}
	}
}
// summarizeTransferSnapshots fills the transfer counters and telemetry of
// summary from snapshots. A nil summary is ignored.
//
// Done, Failed, Aborted and Paused states have dedicated counters; any other
// state is counted as active. Telemetry is accumulated for every snapshot
// regardless of state and finalized once at the end.
func summarizeTransferSnapshots(summary *DiagnosticsSummary, snapshots []TransferSnapshot) {
	if summary == nil {
		return
	}
	summary.TransferCount = len(snapshots)
	telemetry := &summary.TransferTelemetry
	for _, transfer := range snapshots {
		var counter *int
		switch transfer.State {
		case TransferStateDone:
			counter = &summary.DoneTransferCount
		case TransferStateFailed:
			counter = &summary.FailedTransferCount
		case TransferStateAborted:
			counter = &summary.AbortedTransferCount
		case TransferStatePaused:
			counter = &summary.PausedTransferCount
		default:
			counter = &summary.ActiveTransferCount
		}
		*counter++
		accumulateDiagnosticsTransferTelemetry(telemetry, transfer)
	}
	finalizeDiagnosticsTransferTelemetry(telemetry)
}
// streamSnapshotFinished reports whether the stream was closed both locally
// and remotely without being reset.
func streamSnapshotFinished(snapshot StreamSnapshot) bool {
	if snapshot.ResetError != "" {
		return false
	}
	return snapshot.LocalClosed && snapshot.RemoteClosed
}
// bulkSnapshotFinished reports whether the bulk channel was closed both
// locally and remotely without being reset.
func bulkSnapshotFinished(snapshot BulkSnapshot) bool {
	if snapshot.ResetError != "" {
		return false
	}
	return snapshot.LocalClosed && snapshot.RemoteClosed
}
// streamSnapshotBoundActive reports whether the stream's binding is current
// and its transport is both attached and current.
func streamSnapshotBoundActive(snapshot StreamSnapshot) bool {
	if !snapshot.BindingCurrent {
		return false
	}
	return snapshot.TransportAttached && snapshot.TransportCurrent
}
// bulkSnapshotBoundActive reports whether the bulk channel's binding is
// current and its transport is both attached and current.
func bulkSnapshotBoundActive(snapshot BulkSnapshot) bool {
	if !snapshot.BindingCurrent {
		return false
	}
	return snapshot.TransportAttached && snapshot.TransportCurrent
}
// accumulateDiagnosticsResetCause records one reset message in summary.
//
// A nil summary or an empty resetError is ignored. Otherwise Total is
// incremented, and the message is classified into exactly one bucket, checked
// in order: transport detached, service shutdown, backpressure, other.
//
// A message matches a category when it equals the category's error text, or
// starts with that text immediately followed by ':'. backpressureError is the
// backpressure error text of the data plane being summarized (stream or bulk).
// An empty backpressureError never matches; without this guard, any message
// beginning with ':' would be counted as backpressure.
func accumulateDiagnosticsResetCause(summary *DiagnosticsResetCauseSummary, resetError string, backpressureError string) {
	if summary == nil || resetError == "" {
		return
	}
	summary.Total++
	switch {
	case diagnosticsResetErrorMatches(resetError, errTransportDetached):
		summary.TransportDetached++
	case diagnosticsResetErrorMatches(resetError, errServiceShutdown):
		summary.ServiceShutdown++
	case backpressureError != "" &&
		(resetError == backpressureError || strings.HasPrefix(resetError, backpressureError+":")):
		summary.Backpressure++
	default:
		summary.Other++
	}
}
// diagnosticsResetErrorMatches reports whether resetError carries target's
// message: either exactly, or as a prefix immediately followed by ':'
// (for example "<target message>: detail").
//
// It returns false when resetError is empty or target is nil. It also returns
// false when target's message is empty; an empty base would otherwise match
// every message that starts with ':'.
func diagnosticsResetErrorMatches(resetError string, target error) bool {
	if resetError == "" || target == nil {
		return false
	}
	base := target.Error()
	if base == "" {
		return false
	}
	return resetError == base || strings.HasPrefix(resetError, base+":")
}
// accumulateDiagnosticsTransferTelemetry adds one transfer's byte counters and
// phase durations to summary. A nil summary is ignored. The derived fields
// (totals, throughputs, ratio) are left for finalizeDiagnosticsTransferTelemetry.
func accumulateDiagnosticsTransferTelemetry(summary *DiagnosticsTransferTelemetrySummary, snapshot TransferSnapshot) {
	if summary == nil {
		return
	}
	// Byte counters go through helpers rather than reading snapshot fields
	// directly.
	summary.SourceReadBytes += transferSummarySourceReadBytes(snapshot)
	summary.StreamWriteBytes += transferSummaryStreamWriteBytes(snapshot)
	summary.SinkWriteBytes += transferSummarySinkWriteBytes(snapshot)

	// Each aggregate duration maps 1:1 to a snapshot duration.
	phases := [...]struct {
		total *time.Duration
		delta time.Duration
	}{
		{&summary.SourceReadDuration, snapshot.SourceReadDuration},
		{&summary.StreamWriteDuration, snapshot.StreamWriteDuration},
		{&summary.SinkWriteDuration, snapshot.SinkWriteDuration},
		{&summary.SyncDuration, snapshot.SyncDuration},
		{&summary.VerifyDuration, snapshot.VerifyDuration},
		{&summary.CommitDuration, snapshot.CommitDuration},
		{&summary.CommitWaitDuration, snapshot.CommitWaitDuration},
	}
	for _, phase := range phases {
		*phase.total += phase.delta
	}
}
// finalizeDiagnosticsTransferTelemetry computes the derived telemetry fields
// from the accumulated totals. A nil summary is ignored.
//
// WorkDuration sums every phase except commit wait. ObservedDuration adds
// commit wait on top. Throughputs use the byte total of each phase over that
// phase's duration. CommitWaitRatio relates commit wait to the observed
// duration.
func finalizeDiagnosticsTransferTelemetry(summary *DiagnosticsTransferTelemetrySummary) {
	if summary == nil {
		return
	}
	work := summary.SourceReadDuration +
		summary.StreamWriteDuration +
		summary.SinkWriteDuration +
		summary.SyncDuration +
		summary.VerifyDuration +
		summary.CommitDuration
	observed := work + summary.CommitWaitDuration

	summary.WorkDuration = work
	summary.ObservedDuration = observed
	summary.SourceReadThroughputBPS = throughputBytesPerSecond(summary.SourceReadBytes, summary.SourceReadDuration)
	summary.StreamWriteThroughputBPS = throughputBytesPerSecond(summary.StreamWriteBytes, summary.StreamWriteDuration)
	summary.SinkWriteThroughputBPS = throughputBytesPerSecond(summary.SinkWriteBytes, summary.SinkWriteDuration)
	summary.CommitWaitRatio = durationRatio(summary.CommitWaitDuration, observed)
}
// sortClientConnRuntimeSnapshots orders src in place by ClientID, then
// TransportGeneration, then RemoteAddress (all ascending).
func sortClientConnRuntimeSnapshots(src []ClientConnRuntimeSnapshot) {
	sort.Slice(src, func(i, j int) bool {
		a, b := &src[i], &src[j]
		switch {
		case a.ClientID != b.ClientID:
			return a.ClientID < b.ClientID
		case a.TransportGeneration != b.TransportGeneration:
			return a.TransportGeneration < b.TransportGeneration
		default:
			return a.RemoteAddress < b.RemoteAddress
		}
	})
}
// sortTransportConnRuntimeSnapshots orders src in place by ClientID, then
// TransportGeneration, then RemoteAddress (all ascending).
func sortTransportConnRuntimeSnapshots(src []TransportConnRuntimeSnapshot) {
	sort.Slice(src, func(i, j int) bool {
		a, b := &src[i], &src[j]
		switch {
		case a.ClientID != b.ClientID:
			return a.ClientID < b.ClientID
		case a.TransportGeneration != b.TransportGeneration:
			return a.TransportGeneration < b.TransportGeneration
		default:
			return a.RemoteAddress < b.RemoteAddress
		}
	})
}