package notify import ( "errors" "sort" "strings" "time" ) type DiagnosticsResetCauseSummary struct { Total int TransportDetached int ServiceShutdown int Backpressure int Other int } type DiagnosticsTransferTelemetrySummary struct { SourceReadBytes int64 StreamWriteBytes int64 SinkWriteBytes int64 SourceReadDuration time.Duration StreamWriteDuration time.Duration SinkWriteDuration time.Duration SyncDuration time.Duration VerifyDuration time.Duration CommitDuration time.Duration CommitWaitDuration time.Duration WorkDuration time.Duration ObservedDuration time.Duration SourceReadThroughputBPS float64 StreamWriteThroughputBPS float64 SinkWriteThroughputBPS float64 CommitWaitRatio float64 } type DiagnosticsSummary struct { LogicalCount int CurrentTransportCount int StreamCount int ActiveStreamCount int StaleStreamCount int ResetStreamCount int BulkCount int DedicatedBulkCount int ActiveBulkCount int StaleBulkCount int ResetBulkCount int TransferCount int ActiveTransferCount int PausedTransferCount int DoneTransferCount int FailedTransferCount int AbortedTransferCount int StreamResetCauses DiagnosticsResetCauseSummary BulkResetCauses DiagnosticsResetCauseSummary TransferTelemetry DiagnosticsTransferTelemetrySummary } type ClientDiagnosticsSnapshot struct { Runtime ClientRuntimeSnapshot Streams []StreamSnapshot Bulks []BulkSnapshot Transfers []TransferSnapshot Summary DiagnosticsSummary } type ServerDiagnosticsSnapshot struct { Runtime ServerRuntimeSnapshot Logicals []ClientConnRuntimeSnapshot CurrentTransports []TransportConnRuntimeSnapshot Streams []StreamSnapshot Bulks []BulkSnapshot Transfers []TransferSnapshot Summary DiagnosticsSummary } var ( errClientDiagnosticsSnapshotNil = errors.New("client diagnostics snapshot target is nil") errServerDiagnosticsSnapshotNil = errors.New("server diagnostics snapshot target is nil") ) func GetClientDiagnosticsSnapshot(c Client) (ClientDiagnosticsSnapshot, error) { if c == nil { return ClientDiagnosticsSnapshot{}, errClientDiagnosticsSnapshotNil } runtime, err := GetClientRuntimeSnapshot(c) if err != nil { return ClientDiagnosticsSnapshot{}, err } streams, err := GetClientStreamSnapshots(c) if err != nil { return ClientDiagnosticsSnapshot{}, err } bulks, err := GetClientBulkSnapshots(c) if err != nil { return ClientDiagnosticsSnapshot{}, err } transfers, err := GetClientTransferSnapshots(c) if err != nil { return ClientDiagnosticsSnapshot{}, err } snapshot := ClientDiagnosticsSnapshot{ Runtime: runtime, Streams: streams, Bulks: bulks, Transfers: transfers, } snapshot.Summary = summarizeClientDiagnosticsSnapshot(snapshot) return snapshot, nil } func GetServerDiagnosticsSnapshot(s Server) (ServerDiagnosticsSnapshot, error) { if s == nil { return ServerDiagnosticsSnapshot{}, errServerDiagnosticsSnapshotNil } runtime, err := GetServerRuntimeSnapshot(s) if err != nil { return ServerDiagnosticsSnapshot{}, err } logicals, err := serverLogicalRuntimeSnapshots(s) if err != nil { return ServerDiagnosticsSnapshot{}, err } transports, err := serverCurrentTransportRuntimeSnapshots(s) if err != nil { return ServerDiagnosticsSnapshot{}, err } streams, err := GetServerStreamSnapshots(s) if err != nil { return ServerDiagnosticsSnapshot{}, err } bulks, err := GetServerBulkSnapshots(s) if err != nil { return ServerDiagnosticsSnapshot{}, err } transfers, err := GetServerTransferSnapshots(s) if err != nil { return ServerDiagnosticsSnapshot{}, err } snapshot := ServerDiagnosticsSnapshot{ Runtime: runtime, Logicals: logicals, CurrentTransports: transports, Streams: streams, Bulks: bulks, Transfers: transfers, } snapshot.Summary = summarizeServerDiagnosticsSnapshot(snapshot) return snapshot, nil } func serverLogicalRuntimeSnapshots(s Server) ([]ClientConnRuntimeSnapshot, error) { if s == nil { return nil, errServerDiagnosticsSnapshotNil } logicals := s.GetLogicalConnList() out := make([]ClientConnRuntimeSnapshot, 0, len(logicals)) for _, logical := range logicals { if logical == nil { continue } snapshot, err := GetLogicalConnRuntimeSnapshot(logical) if err != nil { return nil, err } out = append(out, snapshot) } sortClientConnRuntimeSnapshots(out) return out, nil } func serverCurrentTransportRuntimeSnapshots(s Server) ([]TransportConnRuntimeSnapshot, error) { if s == nil { return nil, errServerDiagnosticsSnapshotNil } transports := s.GetCurrentTransportConnList() out := make([]TransportConnRuntimeSnapshot, 0, len(transports)) for _, transport := range transports { if transport == nil { continue } snapshot, err := GetTransportConnRuntimeSnapshot(transport) if err != nil { return nil, err } out = append(out, snapshot) } sortTransportConnRuntimeSnapshots(out) return out, nil } func summarizeClientDiagnosticsSnapshot(snapshot ClientDiagnosticsSnapshot) DiagnosticsSummary { summary := DiagnosticsSummary{ LogicalCount: diagnosticsLogicalCountFromClientRuntime(snapshot.Runtime), } if snapshot.Runtime.TransportAttached { summary.CurrentTransportCount = 1 } summarizeStreamSnapshots(&summary, snapshot.Streams) summarizeBulkSnapshots(&summary, snapshot.Bulks) summarizeTransferSnapshots(&summary, snapshot.Transfers) return summary } func summarizeServerDiagnosticsSnapshot(snapshot ServerDiagnosticsSnapshot) DiagnosticsSummary { summary := DiagnosticsSummary{ LogicalCount: len(snapshot.Logicals), CurrentTransportCount: len(snapshot.CurrentTransports), } summarizeStreamSnapshots(&summary, snapshot.Streams) summarizeBulkSnapshots(&summary, snapshot.Bulks) summarizeTransferSnapshots(&summary, snapshot.Transfers) return summary } func diagnosticsLogicalCountFromClientRuntime(runtime ClientRuntimeSnapshot) int { if runtime.Alive || runtime.SessionEpoch != 0 || runtime.TransportAttached || runtime.HasRuntimeConn || runtime.HasRuntimeQueue { return 1 } return 0 } func summarizeStreamSnapshots(summary *DiagnosticsSummary, snapshots []StreamSnapshot) { if summary == nil { return } summary.StreamCount = len(snapshots) for _, snapshot := range snapshots { switch { case snapshot.ResetError != "": summary.ResetStreamCount++ accumulateDiagnosticsResetCause(&summary.StreamResetCauses, snapshot.ResetError, errStreamBackpressureExceeded.Error()) case streamSnapshotFinished(snapshot): case streamSnapshotBoundActive(snapshot): summary.ActiveStreamCount++ default: summary.StaleStreamCount++ } } } func summarizeBulkSnapshots(summary *DiagnosticsSummary, snapshots []BulkSnapshot) { if summary == nil { return } summary.BulkCount = len(snapshots) for _, snapshot := range snapshots { if snapshot.Dedicated { summary.DedicatedBulkCount++ } switch { case snapshot.ResetError != "": summary.ResetBulkCount++ accumulateDiagnosticsResetCause(&summary.BulkResetCauses, snapshot.ResetError, errBulkBackpressureExceeded.Error()) case bulkSnapshotFinished(snapshot): case bulkSnapshotBoundActive(snapshot): summary.ActiveBulkCount++ default: summary.StaleBulkCount++ } } } func summarizeTransferSnapshots(summary *DiagnosticsSummary, snapshots []TransferSnapshot) { if summary == nil { return } summary.TransferCount = len(snapshots) for _, snapshot := range snapshots { switch snapshot.State { case TransferStateDone: summary.DoneTransferCount++ case TransferStateFailed: summary.FailedTransferCount++ case TransferStateAborted: summary.AbortedTransferCount++ case TransferStatePaused: summary.PausedTransferCount++ default: summary.ActiveTransferCount++ } accumulateDiagnosticsTransferTelemetry(&summary.TransferTelemetry, snapshot) } finalizeDiagnosticsTransferTelemetry(&summary.TransferTelemetry) } func streamSnapshotFinished(snapshot StreamSnapshot) bool { return snapshot.ResetError == "" && snapshot.LocalClosed && snapshot.RemoteClosed } func bulkSnapshotFinished(snapshot BulkSnapshot) bool { return snapshot.ResetError == "" && snapshot.LocalClosed && snapshot.RemoteClosed } func streamSnapshotBoundActive(snapshot StreamSnapshot) bool { return snapshot.BindingCurrent && snapshot.TransportAttached && snapshot.TransportCurrent } func bulkSnapshotBoundActive(snapshot BulkSnapshot) bool { return snapshot.BindingCurrent && snapshot.TransportAttached && snapshot.TransportCurrent } func accumulateDiagnosticsResetCause(summary *DiagnosticsResetCauseSummary, resetError string, backpressureError string) { if summary == nil || resetError == "" { return } summary.Total++ if diagnosticsResetErrorMatches(resetError, errTransportDetached) { summary.TransportDetached++ return } if diagnosticsResetErrorMatches(resetError, errServiceShutdown) { summary.ServiceShutdown++ return } if resetError == backpressureError || strings.HasPrefix(resetError, backpressureError+":") { summary.Backpressure++ return } summary.Other++ } func diagnosticsResetErrorMatches(resetError string, target error) bool { if resetError == "" || target == nil { return false } base := target.Error() return resetError == base || strings.HasPrefix(resetError, base+":") } func accumulateDiagnosticsTransferTelemetry(summary *DiagnosticsTransferTelemetrySummary, snapshot TransferSnapshot) { if summary == nil { return } summary.SourceReadBytes += transferSummarySourceReadBytes(snapshot) summary.StreamWriteBytes += transferSummaryStreamWriteBytes(snapshot) summary.SinkWriteBytes += transferSummarySinkWriteBytes(snapshot) summary.SourceReadDuration += snapshot.SourceReadDuration summary.StreamWriteDuration += snapshot.StreamWriteDuration summary.SinkWriteDuration += snapshot.SinkWriteDuration summary.SyncDuration += snapshot.SyncDuration summary.VerifyDuration += snapshot.VerifyDuration summary.CommitDuration += snapshot.CommitDuration summary.CommitWaitDuration += snapshot.CommitWaitDuration } func finalizeDiagnosticsTransferTelemetry(summary *DiagnosticsTransferTelemetrySummary) { if summary == nil { return } summary.WorkDuration = summary.SourceReadDuration + summary.StreamWriteDuration + summary.SinkWriteDuration + summary.SyncDuration + summary.VerifyDuration + summary.CommitDuration summary.ObservedDuration = summary.WorkDuration + summary.CommitWaitDuration summary.SourceReadThroughputBPS = throughputBytesPerSecond(summary.SourceReadBytes, summary.SourceReadDuration) summary.StreamWriteThroughputBPS = throughputBytesPerSecond(summary.StreamWriteBytes, summary.StreamWriteDuration) summary.SinkWriteThroughputBPS = throughputBytesPerSecond(summary.SinkWriteBytes, summary.SinkWriteDuration) summary.CommitWaitRatio = durationRatio(summary.CommitWaitDuration, summary.ObservedDuration) } func sortClientConnRuntimeSnapshots(src []ClientConnRuntimeSnapshot) { sort.Slice(src, func(i, j int) bool { if src[i].ClientID != src[j].ClientID { return src[i].ClientID < src[j].ClientID } if src[i].TransportGeneration != src[j].TransportGeneration { return src[i].TransportGeneration < src[j].TransportGeneration } return src[i].RemoteAddress < src[j].RemoteAddress }) } func sortTransportConnRuntimeSnapshots(src []TransportConnRuntimeSnapshot) { sort.Slice(src, func(i, j int) bool { if src[i].ClientID != src[j].ClientID { return src[i].ClientID < src[j].ClientID } if src[i].TransportGeneration != src[j].TransportGeneration { return src[i].TransportGeneration < src[j].TransportGeneration } return src[i].RemoteAddress < src[j].RemoteAddress }) }