notify/diagnostics_snapshot.go

388 lines
12 KiB
Go
Raw Permalink Normal View History

package notify
import (
"errors"
"sort"
"strings"
"time"
)
// DiagnosticsResetCauseSummary buckets reset streams or bulks by the cause
// recorded in their ResetError text. Every counted reset increments Total and
// exactly one cause bucket. Causes are tested in priority order:
// TransportDetached, ServiceShutdown, Backpressure, then Other.
type DiagnosticsResetCauseSummary struct {
	// Total is the number of resets counted.
	Total int
	// TransportDetached counts resets whose text equals errTransportDetached's
	// message or starts with that message followed by ":".
	TransportDetached int
	// ServiceShutdown counts resets matched the same way against errServiceShutdown.
	ServiceShutdown int
	// Backpressure counts resets matching the caller-supplied backpressure
	// error text (the stream or bulk backpressure error, depending on caller).
	Backpressure int
	// Other counts resets that matched none of the causes above.
	Other int
}
// DiagnosticsTransferTelemetrySummary aggregates byte and timing telemetry
// across every transfer in a diagnostics snapshot. The byte and per-phase
// duration fields are sums over transfers. The remaining fields are derived
// from those sums after all transfers have been accumulated.
type DiagnosticsTransferTelemetrySummary struct {
	// Byte totals, summed over transfers.
	SourceReadBytes  int64
	StreamWriteBytes int64
	SinkWriteBytes   int64
	// Per-phase durations, summed over transfers.
	SourceReadDuration  time.Duration
	StreamWriteDuration time.Duration
	SinkWriteDuration   time.Duration
	SyncDuration        time.Duration
	VerifyDuration      time.Duration
	CommitDuration      time.Duration
	CommitWaitDuration  time.Duration
	// WorkDuration is the sum of all phase durations above except
	// CommitWaitDuration.
	WorkDuration time.Duration
	// ObservedDuration is WorkDuration plus CommitWaitDuration.
	ObservedDuration time.Duration
	// Throughputs are computed by throughputBytesPerSecond from each byte total
	// and its matching phase duration (presumably bytes/second, per the helper
	// name — TODO confirm).
	SourceReadThroughputBPS  float64
	StreamWriteThroughputBPS float64
	SinkWriteThroughputBPS   float64
	// CommitWaitRatio is durationRatio(CommitWaitDuration, ObservedDuration).
	CommitWaitRatio float64
}
// DiagnosticsSummary holds aggregate counts derived from a client or server
// diagnostics snapshot.
//
// Streams and bulks are classified in priority order:
//   - reset: non-empty ResetError;
//   - finished: both sides closed;
//   - active: binding current and transport attached and current;
//   - stale: otherwise.
//
// Finished entries are included in StreamCount/BulkCount but in none of the
// per-state counts.
type DiagnosticsSummary struct {
	// LogicalCount is len(Logicals) on the server, and 0 or 1 on the client.
	LogicalCount int
	// CurrentTransportCount is len(CurrentTransports) on the server. On the
	// client it is 1 while the transport is attached and 0 otherwise.
	CurrentTransportCount int
	// Stream counts; see the classification above.
	StreamCount       int
	ActiveStreamCount int
	StaleStreamCount  int
	ResetStreamCount  int
	// BulkCount is the total number of bulks. DedicatedBulkCount counts bulks
	// with Dedicated set, regardless of their state.
	BulkCount          int
	DedicatedBulkCount int
	ActiveBulkCount    int
	StaleBulkCount     int
	ResetBulkCount     int
	// Transfer counts by state. ActiveTransferCount covers every state other
	// than done, failed, aborted and paused.
	TransferCount        int
	ActiveTransferCount  int
	PausedTransferCount  int
	DoneTransferCount    int
	FailedTransferCount  int
	AbortedTransferCount int
	// Reset-cause breakdowns for ResetStreamCount and ResetBulkCount.
	StreamResetCauses DiagnosticsResetCauseSummary
	BulkResetCauses   DiagnosticsResetCauseSummary
	// TransferTelemetry aggregates byte/timing telemetry over all transfers.
	TransferTelemetry DiagnosticsTransferTelemetrySummary
}
// ClientDiagnosticsSnapshot bundles the diagnostics collected for a Client by
// GetClientDiagnosticsSnapshot. Summary is derived from the other fields.
//
// NOTE(review): the parts are gathered by separate, sequential calls and no
// locking is visible here, so they may not be mutually consistent — confirm.
type ClientDiagnosticsSnapshot struct {
	Runtime   ClientRuntimeSnapshot
	Streams   []StreamSnapshot
	Bulks     []BulkSnapshot
	Transfers []TransferSnapshot
	Summary   DiagnosticsSummary
}
// ServerDiagnosticsSnapshot bundles the diagnostics collected for a Server by
// GetServerDiagnosticsSnapshot. Logicals and CurrentTransports are sorted by
// ClientID, then TransportGeneration, then RemoteAddress. Summary is derived
// from the other fields.
//
// NOTE(review): the parts are gathered by separate, sequential calls and no
// locking is visible here, so they may not be mutually consistent — confirm.
type ServerDiagnosticsSnapshot struct {
	Runtime           ServerRuntimeSnapshot
	Logicals          []ClientConnRuntimeSnapshot
	CurrentTransports []TransportConnRuntimeSnapshot
	Streams           []StreamSnapshot
	Bulks             []BulkSnapshot
	Transfers         []TransferSnapshot
	Summary           DiagnosticsSummary
}
// errClientDiagnosticsSnapshotNil is returned by GetClientDiagnosticsSnapshot
// when it is handed a nil Client.
var errClientDiagnosticsSnapshotNil = errors.New("client diagnostics snapshot target is nil")

// errServerDiagnosticsSnapshotNil is returned by GetServerDiagnosticsSnapshot
// and its server-side collectors when they are handed a nil Server.
var errServerDiagnosticsSnapshotNil = errors.New("server diagnostics snapshot target is nil")
// GetClientDiagnosticsSnapshot gathers the runtime, stream, bulk and transfer
// snapshots of c into one ClientDiagnosticsSnapshot and derives its Summary.
//
// It returns errClientDiagnosticsSnapshotNil when c is nil. Collectors run in
// the order runtime, streams, bulks, transfers. The first collector error is
// returned unchanged together with a zero snapshot.
func GetClientDiagnosticsSnapshot(c Client) (ClientDiagnosticsSnapshot, error) {
	var out ClientDiagnosticsSnapshot
	if c == nil {
		return out, errClientDiagnosticsSnapshotNil
	}

	var err error
	if out.Runtime, err = GetClientRuntimeSnapshot(c); err != nil {
		return ClientDiagnosticsSnapshot{}, err
	}
	if out.Streams, err = GetClientStreamSnapshots(c); err != nil {
		return ClientDiagnosticsSnapshot{}, err
	}
	if out.Bulks, err = GetClientBulkSnapshots(c); err != nil {
		return ClientDiagnosticsSnapshot{}, err
	}
	if out.Transfers, err = GetClientTransferSnapshots(c); err != nil {
		return ClientDiagnosticsSnapshot{}, err
	}

	out.Summary = summarizeClientDiagnosticsSnapshot(out)
	return out, nil
}
// GetServerDiagnosticsSnapshot gathers the runtime, logical connection,
// current transport, stream, bulk and transfer snapshots of s into one
// ServerDiagnosticsSnapshot and derives its Summary.
//
// It returns errServerDiagnosticsSnapshotNil when s is nil. Collectors run in
// the order listed above. The first collector error is returned unchanged
// together with a zero snapshot.
func GetServerDiagnosticsSnapshot(s Server) (ServerDiagnosticsSnapshot, error) {
	var out ServerDiagnosticsSnapshot
	if s == nil {
		return out, errServerDiagnosticsSnapshotNil
	}

	var err error
	if out.Runtime, err = GetServerRuntimeSnapshot(s); err != nil {
		return ServerDiagnosticsSnapshot{}, err
	}
	if out.Logicals, err = serverLogicalRuntimeSnapshots(s); err != nil {
		return ServerDiagnosticsSnapshot{}, err
	}
	if out.CurrentTransports, err = serverCurrentTransportRuntimeSnapshots(s); err != nil {
		return ServerDiagnosticsSnapshot{}, err
	}
	if out.Streams, err = GetServerStreamSnapshots(s); err != nil {
		return ServerDiagnosticsSnapshot{}, err
	}
	if out.Bulks, err = GetServerBulkSnapshots(s); err != nil {
		return ServerDiagnosticsSnapshot{}, err
	}
	if out.Transfers, err = GetServerTransferSnapshots(s); err != nil {
		return ServerDiagnosticsSnapshot{}, err
	}

	out.Summary = summarizeServerDiagnosticsSnapshot(out)
	return out, nil
}
// serverLogicalRuntimeSnapshots returns one runtime snapshot per non-nil
// logical connection reported by s. Nil entries are skipped. The first
// per-connection error aborts collection and is returned as-is. The result is
// ordered by sortClientConnRuntimeSnapshots.
func serverLogicalRuntimeSnapshots(s Server) ([]ClientConnRuntimeSnapshot, error) {
	if s == nil {
		return nil, errServerDiagnosticsSnapshotNil
	}
	conns := s.GetLogicalConnList()
	result := make([]ClientConnRuntimeSnapshot, 0, len(conns))
	for _, conn := range conns {
		if conn != nil {
			snap, err := GetLogicalConnRuntimeSnapshot(conn)
			if err != nil {
				return nil, err
			}
			result = append(result, snap)
		}
	}
	sortClientConnRuntimeSnapshots(result)
	return result, nil
}
// serverCurrentTransportRuntimeSnapshots returns one runtime snapshot per
// non-nil current transport connection reported by s. Nil entries are
// skipped. The first per-transport error aborts collection and is returned
// as-is. The result is ordered by sortTransportConnRuntimeSnapshots.
func serverCurrentTransportRuntimeSnapshots(s Server) ([]TransportConnRuntimeSnapshot, error) {
	if s == nil {
		return nil, errServerDiagnosticsSnapshotNil
	}
	conns := s.GetCurrentTransportConnList()
	result := make([]TransportConnRuntimeSnapshot, 0, len(conns))
	for _, conn := range conns {
		if conn != nil {
			snap, err := GetTransportConnRuntimeSnapshot(conn)
			if err != nil {
				return nil, err
			}
			result = append(result, snap)
		}
	}
	sortTransportConnRuntimeSnapshots(result)
	return result, nil
}
// summarizeClientDiagnosticsSnapshot derives the DiagnosticsSummary for a
// client snapshot. The logical count comes from the runtime state. The
// current transport count is 1 while the transport is attached and 0
// otherwise. Stream, bulk and transfer counts come from the shared summarizers.
func summarizeClientDiagnosticsSnapshot(snapshot ClientDiagnosticsSnapshot) DiagnosticsSummary {
	var summary DiagnosticsSummary
	summary.LogicalCount = diagnosticsLogicalCountFromClientRuntime(snapshot.Runtime)
	if snapshot.Runtime.TransportAttached {
		summary.CurrentTransportCount = 1
	}

	summarizeStreamSnapshots(&summary, snapshot.Streams)
	summarizeBulkSnapshots(&summary, snapshot.Bulks)
	summarizeTransferSnapshots(&summary, snapshot.Transfers)
	return summary
}
// summarizeServerDiagnosticsSnapshot derives the DiagnosticsSummary for a
// server snapshot. Logical and current transport counts are the lengths of
// the collected lists. Stream, bulk and transfer counts come from the shared
// summarizers.
func summarizeServerDiagnosticsSnapshot(snapshot ServerDiagnosticsSnapshot) DiagnosticsSummary {
	var summary DiagnosticsSummary
	summary.LogicalCount = len(snapshot.Logicals)
	summary.CurrentTransportCount = len(snapshot.CurrentTransports)

	summarizeStreamSnapshots(&summary, snapshot.Streams)
	summarizeBulkSnapshots(&summary, snapshot.Bulks)
	summarizeTransferSnapshots(&summary, snapshot.Transfers)
	return summary
}
// diagnosticsLogicalCountFromClientRuntime reports 1 when the client runtime
// shows any sign of a logical session and 0 otherwise. A sign of a session is
// any of: alive, a non-zero session epoch, an attached transport, a runtime
// conn, or a runtime queue.
func diagnosticsLogicalCountFromClientRuntime(runtime ClientRuntimeSnapshot) int {
	hasSession := runtime.Alive ||
		runtime.SessionEpoch != 0 ||
		runtime.TransportAttached ||
		runtime.HasRuntimeConn ||
		runtime.HasRuntimeQueue
	if !hasSession {
		return 0
	}
	return 1
}
// summarizeStreamSnapshots fills the stream fields of summary from snapshots.
// A nil summary is ignored.
//
// Each stream is classified by the first rule that applies:
//   - reset (non-empty ResetError), also bucketed by cause;
//   - finished (both sides closed), counted only in StreamCount;
//   - active (bound to a current, attached transport);
//   - stale.
func summarizeStreamSnapshots(summary *DiagnosticsSummary, snapshots []StreamSnapshot) {
	if summary == nil {
		return
	}
	summary.StreamCount = len(snapshots)
	for _, snap := range snapshots {
		if snap.ResetError != "" {
			summary.ResetStreamCount++
			accumulateDiagnosticsResetCause(&summary.StreamResetCauses, snap.ResetError, errStreamBackpressureExceeded.Error())
			continue
		}
		if streamSnapshotFinished(snap) {
			// Cleanly closed on both sides: intentionally not in any per-state count.
			continue
		}
		if streamSnapshotBoundActive(snap) {
			summary.ActiveStreamCount++
		} else {
			summary.StaleStreamCount++
		}
	}
}
// summarizeBulkSnapshots fills the bulk fields of summary from snapshots.
// A nil summary is ignored.
//
// Dedicated bulks are counted regardless of state. Each bulk is then
// classified by the first rule that applies:
//   - reset (non-empty ResetError), also bucketed by cause;
//   - finished (both sides closed), counted only in BulkCount;
//   - active (bound to a current, attached transport);
//   - stale.
func summarizeBulkSnapshots(summary *DiagnosticsSummary, snapshots []BulkSnapshot) {
	if summary == nil {
		return
	}
	summary.BulkCount = len(snapshots)
	for _, snap := range snapshots {
		if snap.Dedicated {
			summary.DedicatedBulkCount++
		}
		if snap.ResetError != "" {
			summary.ResetBulkCount++
			accumulateDiagnosticsResetCause(&summary.BulkResetCauses, snap.ResetError, errBulkBackpressureExceeded.Error())
			continue
		}
		if bulkSnapshotFinished(snap) {
			// Cleanly closed on both sides: intentionally not in any per-state count.
			continue
		}
		if bulkSnapshotBoundActive(snap) {
			summary.ActiveBulkCount++
		} else {
			summary.StaleBulkCount++
		}
	}
}
// summarizeTransferSnapshots fills the transfer fields of summary from
// snapshots. A nil summary is ignored. Each transfer increments exactly one
// state counter. Any state other than done, failed, aborted or paused counts
// as active. Every transfer's telemetry is accumulated, and the derived
// telemetry fields are finalized once at the end.
func summarizeTransferSnapshots(summary *DiagnosticsSummary, snapshots []TransferSnapshot) {
	if summary == nil {
		return
	}
	summary.TransferCount = len(snapshots)
	for _, snap := range snapshots {
		var counter *int
		switch snap.State {
		case TransferStateDone:
			counter = &summary.DoneTransferCount
		case TransferStateFailed:
			counter = &summary.FailedTransferCount
		case TransferStateAborted:
			counter = &summary.AbortedTransferCount
		case TransferStatePaused:
			counter = &summary.PausedTransferCount
		default:
			counter = &summary.ActiveTransferCount
		}
		*counter++
		accumulateDiagnosticsTransferTelemetry(&summary.TransferTelemetry, snap)
	}
	finalizeDiagnosticsTransferTelemetry(&summary.TransferTelemetry)
}
// streamSnapshotFinished reports whether a stream closed cleanly: it was not
// reset, and both the local and remote sides are closed.
func streamSnapshotFinished(snapshot StreamSnapshot) bool {
	if snapshot.ResetError != "" {
		return false
	}
	return snapshot.LocalClosed && snapshot.RemoteClosed
}
// bulkSnapshotFinished reports whether a bulk closed cleanly: it was not
// reset, and both the local and remote sides are closed.
func bulkSnapshotFinished(snapshot BulkSnapshot) bool {
	if snapshot.ResetError != "" {
		return false
	}
	return snapshot.LocalClosed && snapshot.RemoteClosed
}
// streamSnapshotBoundActive reports whether a stream's binding is current and
// it sits on a transport that is both attached and current.
func streamSnapshotBoundActive(snapshot StreamSnapshot) bool {
	if !snapshot.BindingCurrent || !snapshot.TransportAttached {
		return false
	}
	return snapshot.TransportCurrent
}
// bulkSnapshotBoundActive reports whether a bulk's binding is current and it
// sits on a transport that is both attached and current.
func bulkSnapshotBoundActive(snapshot BulkSnapshot) bool {
	if !snapshot.BindingCurrent || !snapshot.TransportAttached {
		return false
	}
	return snapshot.TransportCurrent
}
// accumulateDiagnosticsResetCause records one reset in summary and assigns it
// to exactly one cause bucket. A nil summary or an empty resetError is ignored.
//
// A reset text "matches" a cause when it equals the cause's message or starts
// with that message followed by ":". Causes are checked in priority order:
// transport detached, service shutdown, backpressure (the caller-supplied
// backpressureError text), then Other.
//
// An empty backpressureError never matches. Otherwise the prefix test would
// degenerate to `strings.HasPrefix(resetError, ":")` and misclassify
// unrelated resets as backpressure.
func accumulateDiagnosticsResetCause(summary *DiagnosticsResetCauseSummary, resetError string, backpressureError string) {
	if summary == nil || resetError == "" {
		return
	}
	summary.Total++
	switch {
	case diagnosticsResetErrorMatches(resetError, errTransportDetached):
		summary.TransportDetached++
	case diagnosticsResetErrorMatches(resetError, errServiceShutdown):
		summary.ServiceShutdown++
	case backpressureError != "" &&
		(resetError == backpressureError || strings.HasPrefix(resetError, backpressureError+":")):
		summary.Backpressure++
	default:
		summary.Other++
	}
}
// diagnosticsResetErrorMatches reports whether resetError is target's message
// either exactly or followed by ":" and further detail. It is false for an
// empty resetError or a nil target.
func diagnosticsResetErrorMatches(resetError string, target error) bool {
	if target == nil || resetError == "" {
		return false
	}
	base := target.Error()
	if !strings.HasPrefix(resetError, base) {
		return false
	}
	// After the base message, accept end-of-string or a ':' detail separator.
	tail := resetError[len(base):]
	return tail == "" || tail[0] == ':'
}
// accumulateDiagnosticsTransferTelemetry adds one transfer's telemetry into
// summary. A nil summary is ignored. Byte counts come from the
// transferSummary*Bytes helpers. Phase durations are copied straight from
// the snapshot. Derived fields are left to finalizeDiagnosticsTransferTelemetry.
func accumulateDiagnosticsTransferTelemetry(summary *DiagnosticsTransferTelemetrySummary, snapshot TransferSnapshot) {
	if summary == nil {
		return
	}

	sourceRead := transferSummarySourceReadBytes(snapshot)
	streamWrite := transferSummaryStreamWriteBytes(snapshot)
	sinkWrite := transferSummarySinkWriteBytes(snapshot)
	summary.SourceReadBytes += sourceRead
	summary.StreamWriteBytes += streamWrite
	summary.SinkWriteBytes += sinkWrite

	summary.SourceReadDuration += snapshot.SourceReadDuration
	summary.StreamWriteDuration += snapshot.StreamWriteDuration
	summary.SinkWriteDuration += snapshot.SinkWriteDuration
	summary.SyncDuration += snapshot.SyncDuration
	summary.VerifyDuration += snapshot.VerifyDuration
	summary.CommitDuration += snapshot.CommitDuration
	summary.CommitWaitDuration += snapshot.CommitWaitDuration
}
// finalizeDiagnosticsTransferTelemetry computes the derived telemetry fields
// from the accumulated sums. A nil summary is ignored.
//
// WorkDuration is every phase except commit wait. ObservedDuration adds the
// commit wait on top. Each throughput pairs a byte total with its phase
// duration. CommitWaitRatio is the commit wait's share of ObservedDuration.
func finalizeDiagnosticsTransferTelemetry(summary *DiagnosticsTransferTelemetrySummary) {
	if summary == nil {
		return
	}

	work := summary.SourceReadDuration + summary.StreamWriteDuration + summary.SinkWriteDuration
	work += summary.SyncDuration + summary.VerifyDuration + summary.CommitDuration
	summary.WorkDuration = work
	summary.ObservedDuration = work + summary.CommitWaitDuration

	summary.SourceReadThroughputBPS = throughputBytesPerSecond(summary.SourceReadBytes, summary.SourceReadDuration)
	summary.StreamWriteThroughputBPS = throughputBytesPerSecond(summary.StreamWriteBytes, summary.StreamWriteDuration)
	summary.SinkWriteThroughputBPS = throughputBytesPerSecond(summary.SinkWriteBytes, summary.SinkWriteDuration)
	summary.CommitWaitRatio = durationRatio(summary.CommitWaitDuration, summary.ObservedDuration)
}
// sortClientConnRuntimeSnapshots sorts src in place by ClientID, then
// TransportGeneration, then RemoteAddress, all ascending. sort.Slice is not
// stable, so entries equal on all three keys may end up in any order.
func sortClientConnRuntimeSnapshots(src []ClientConnRuntimeSnapshot) {
	sort.Slice(src, func(i, j int) bool {
		a, b := &src[i], &src[j]
		switch {
		case a.ClientID != b.ClientID:
			return a.ClientID < b.ClientID
		case a.TransportGeneration != b.TransportGeneration:
			return a.TransportGeneration < b.TransportGeneration
		default:
			return a.RemoteAddress < b.RemoteAddress
		}
	})
}
// sortTransportConnRuntimeSnapshots sorts src in place by ClientID, then
// TransportGeneration, then RemoteAddress, all ascending. sort.Slice is not
// stable, so entries equal on all three keys may end up in any order.
func sortTransportConnRuntimeSnapshots(src []TransportConnRuntimeSnapshot) {
	sort.Slice(src, func(i, j int) bool {
		a, b := &src[i], &src[j]
		switch {
		case a.ClientID != b.ClientID:
			return a.ClientID < b.ClientID
		case a.TransportGeneration != b.TransportGeneration:
			return a.TransportGeneration < b.TransportGeneration
		default:
			return a.RemoteAddress < b.RemoteAddress
		}
	})
}