notify/diagnostics_snapshot.go

500 lines
16 KiB
Go
Raw Permalink Normal View History

package notify
import (
"errors"
"sort"
"strings"
"time"
)
// DiagnosticsResetCauseSummary tallies reset errors by cause. It is filled by
// accumulateDiagnosticsResetCause, which places every non-empty reset error
// into exactly one of the cause buckets below.
type DiagnosticsResetCauseSummary struct {
	// Total is the number of non-empty reset errors counted.
	Total int
	// TransportDetached counts reset errors matching errTransportDetached.
	TransportDetached int
	// ServiceShutdown counts reset errors matching errServiceShutdown.
	ServiceShutdown int
	// Backpressure counts reset errors matching the caller-supplied
	// backpressure error text.
	Backpressure int
	// Other counts reset errors that matched none of the causes above.
	Other int
}
// DiagnosticsTransferTelemetrySummary aggregates telemetry over a set of
// transfer snapshots. The byte and per-phase duration fields are sums across
// snapshots; the remaining fields are derived from those sums by
// finalizeDiagnosticsTransferTelemetry.
type DiagnosticsTransferTelemetrySummary struct {
	// Summed byte counters.
	SourceReadBytes  int64
	StreamWriteBytes int64
	SinkWriteBytes   int64

	// Summed per-phase durations.
	SourceReadDuration  time.Duration
	StreamWriteDuration time.Duration
	SinkWriteDuration   time.Duration
	SyncDuration        time.Duration
	VerifyDuration      time.Duration
	CommitDuration      time.Duration
	CommitWaitDuration  time.Duration

	// WorkDuration is the sum of the source-read, stream-write, sink-write,
	// sync, verify and commit durations (commit wait is excluded).
	WorkDuration time.Duration
	// ObservedDuration is WorkDuration plus CommitWaitDuration.
	ObservedDuration time.Duration

	// Throughputs are computed by throughputBytesPerSecond from the matching
	// byte counter and duration (presumably bytes per second; the helper is
	// defined elsewhere).
	SourceReadThroughputBPS  float64
	StreamWriteThroughputBPS float64
	SinkWriteThroughputBPS   float64
	// CommitWaitRatio is durationRatio(CommitWaitDuration, ObservedDuration).
	CommitWaitRatio float64
}
// DiagnosticsRecordTelemetrySummary aggregates telemetry over a set of record
// snapshots. Unless noted otherwise, each field is the sum of the same-named
// field across all snapshots.
type DiagnosticsRecordTelemetrySummary struct {
	// Per-kind frame counters, summed across snapshots.
	BatchFramesSent     int64
	AckFramesSent       int64
	ErrorFramesSent     int64
	BatchFramesReceived int64
	AckFramesReceived   int64
	ErrorFramesReceived int64

	// FrameSendCount is BatchFramesSent + AckFramesSent + ErrorFramesSent.
	FrameSendCount int64
	// FrameReceiveCount is BatchFramesReceived + AckFramesReceived +
	// ErrorFramesReceived.
	FrameReceiveCount int64

	PiggybackAckSent     int64
	PiggybackAckReceived int64

	BarrierCount             int64
	BarrierFlushWaitDuration time.Duration
	BarrierApplyWaitDuration time.Duration

	OutstandingRecords  int
	OutstandingBytes    int
	PendingApplyRecords int
	PendingAckRecords   int
	// PeakPendingApplyRecords is the maximum across snapshots, not a sum.
	PeakPendingApplyRecords int
}
// DiagnosticsSummary holds the aggregate counters derived from a client or
// server diagnostics snapshot.
//
// For streams, bulks and records a snapshot with a non-empty ResetError is
// counted as reset; one that is closed on both sides is counted only in the
// total; one whose binding and transport are current and attached is counted
// as active; everything else is counted as stale.
type DiagnosticsSummary struct {
	// LogicalCount is 0 or 1 for a client summary and the number of logical
	// connection snapshots for a server summary.
	LogicalCount int
	// CurrentTransportCount is 1 for a client with an attached transport
	// (else 0), and the number of current transport snapshots for a server.
	CurrentTransportCount int

	// Bulk attach counters are copied from the client runtime snapshot; the
	// server summary code in this file does not set them.
	BulkAttachAttempts int64
	BulkAttachRetries  int64
	BulkAttachSuccess  int64
	BulkAutoFallbacks  int64

	// Stream counters.
	StreamCount       int
	ActiveStreamCount int
	StaleStreamCount  int
	ResetStreamCount  int

	// Bulk counters. DedicatedBulkCount counts bulks flagged Dedicated,
	// independently of their active/stale/reset classification.
	BulkCount          int
	DedicatedBulkCount int
	ActiveBulkCount    int
	StaleBulkCount     int
	ResetBulkCount     int

	// Record counters.
	RecordCount       int
	ActiveRecordCount int
	StaleRecordCount  int
	ResetRecordCount  int

	// Transfer counters, bucketed by transfer state. Any state other than
	// done, failed, aborted or paused is counted as active.
	TransferCount        int
	ActiveTransferCount  int
	PausedTransferCount  int
	DoneTransferCount    int
	FailedTransferCount  int
	AbortedTransferCount int

	// Reset-cause breakdowns for the reset streams, bulks and records.
	StreamResetCauses DiagnosticsResetCauseSummary
	BulkResetCauses   DiagnosticsResetCauseSummary
	RecordResetCauses DiagnosticsResetCauseSummary

	// Telemetry aggregated over all record and transfer snapshots.
	RecordTelemetry   DiagnosticsRecordTelemetrySummary
	TransferTelemetry DiagnosticsTransferTelemetrySummary
}
// ClientDiagnosticsSnapshot bundles a client's runtime snapshot with its
// stream, bulk, record and transfer snapshots, plus the Summary derived from
// them. It is produced by GetClientDiagnosticsSnapshot.
type ClientDiagnosticsSnapshot struct {
	Runtime   ClientRuntimeSnapshot
	Streams   []StreamSnapshot
	Bulks     []BulkSnapshot
	Records   []RecordSnapshot
	Transfers []TransferSnapshot
	// Summary is computed from the fields above.
	Summary DiagnosticsSummary
}
// ServerDiagnosticsSnapshot bundles a server's runtime snapshot with its
// logical connection, current transport, stream, bulk, record and transfer
// snapshots, plus the Summary derived from them. It is produced by
// GetServerDiagnosticsSnapshot.
type ServerDiagnosticsSnapshot struct {
	Runtime ServerRuntimeSnapshot
	// Logicals and CurrentTransports are sorted before being stored here.
	Logicals          []ClientConnRuntimeSnapshot
	CurrentTransports []TransportConnRuntimeSnapshot
	Streams           []StreamSnapshot
	Bulks             []BulkSnapshot
	Records           []RecordSnapshot
	Transfers         []TransferSnapshot
	// Summary is computed from the fields above.
	Summary DiagnosticsSummary
}
// Sentinel errors returned when a diagnostics snapshot is requested for a nil
// client or server.
var (
	errClientDiagnosticsSnapshotNil = errors.New("client diagnostics snapshot target is nil")
	errServerDiagnosticsSnapshotNil = errors.New("server diagnostics snapshot target is nil")
)
// GetClientDiagnosticsSnapshot gathers the runtime, stream, bulk, record and
// transfer snapshots of c and derives the aggregate summary from them.
//
// It returns errClientDiagnosticsSnapshotNil when c is nil. If any of the
// underlying snapshot calls fails, that error is returned together with a
// zero-value snapshot.
func GetClientDiagnosticsSnapshot(c Client) (ClientDiagnosticsSnapshot, error) {
	var (
		result ClientDiagnosticsSnapshot
		err    error
	)
	if c == nil {
		return ClientDiagnosticsSnapshot{}, errClientDiagnosticsSnapshotNil
	}
	if result.Runtime, err = GetClientRuntimeSnapshot(c); err != nil {
		return ClientDiagnosticsSnapshot{}, err
	}
	if result.Streams, err = GetClientStreamSnapshots(c); err != nil {
		return ClientDiagnosticsSnapshot{}, err
	}
	if result.Bulks, err = GetClientBulkSnapshots(c); err != nil {
		return ClientDiagnosticsSnapshot{}, err
	}
	if result.Records, err = GetClientRecordSnapshots(c); err != nil {
		return ClientDiagnosticsSnapshot{}, err
	}
	if result.Transfers, err = GetClientTransferSnapshots(c); err != nil {
		return ClientDiagnosticsSnapshot{}, err
	}
	// The summary is derived last, once every component has been collected.
	result.Summary = summarizeClientDiagnosticsSnapshot(result)
	return result, nil
}
// GetServerDiagnosticsSnapshot gathers the runtime, logical connection,
// current transport, stream, bulk, record and transfer snapshots of s and
// derives the aggregate summary from them.
//
// It returns errServerDiagnosticsSnapshotNil when s is nil. If any of the
// underlying snapshot calls fails, that error is returned together with a
// zero-value snapshot.
func GetServerDiagnosticsSnapshot(s Server) (ServerDiagnosticsSnapshot, error) {
	var (
		result ServerDiagnosticsSnapshot
		err    error
	)
	if s == nil {
		return ServerDiagnosticsSnapshot{}, errServerDiagnosticsSnapshotNil
	}
	if result.Runtime, err = GetServerRuntimeSnapshot(s); err != nil {
		return ServerDiagnosticsSnapshot{}, err
	}
	if result.Logicals, err = serverLogicalRuntimeSnapshots(s); err != nil {
		return ServerDiagnosticsSnapshot{}, err
	}
	if result.CurrentTransports, err = serverCurrentTransportRuntimeSnapshots(s); err != nil {
		return ServerDiagnosticsSnapshot{}, err
	}
	if result.Streams, err = GetServerStreamSnapshots(s); err != nil {
		return ServerDiagnosticsSnapshot{}, err
	}
	if result.Bulks, err = GetServerBulkSnapshots(s); err != nil {
		return ServerDiagnosticsSnapshot{}, err
	}
	if result.Records, err = GetServerRecordSnapshots(s); err != nil {
		return ServerDiagnosticsSnapshot{}, err
	}
	if result.Transfers, err = GetServerTransferSnapshots(s); err != nil {
		return ServerDiagnosticsSnapshot{}, err
	}
	// The summary is derived last, once every component has been collected.
	result.Summary = summarizeServerDiagnosticsSnapshot(result)
	return result, nil
}
// serverLogicalRuntimeSnapshots returns a sorted runtime snapshot for every
// non-nil logical connection reported by s. It fails with
// errServerDiagnosticsSnapshotNil when s is nil, and with the first error
// returned by GetLogicalConnRuntimeSnapshot otherwise.
func serverLogicalRuntimeSnapshots(s Server) ([]ClientConnRuntimeSnapshot, error) {
	if s == nil {
		return nil, errServerDiagnosticsSnapshotNil
	}
	conns := s.GetLogicalConnList()
	result := make([]ClientConnRuntimeSnapshot, 0, len(conns))
	for _, conn := range conns {
		// Nil entries are skipped rather than treated as an error.
		if conn != nil {
			item, err := GetLogicalConnRuntimeSnapshot(conn)
			if err != nil {
				return nil, err
			}
			result = append(result, item)
		}
	}
	sortClientConnRuntimeSnapshots(result)
	return result, nil
}
// serverCurrentTransportRuntimeSnapshots returns a sorted runtime snapshot for
// every non-nil current transport connection reported by s. It fails with
// errServerDiagnosticsSnapshotNil when s is nil, and with the first error
// returned by GetTransportConnRuntimeSnapshot otherwise.
func serverCurrentTransportRuntimeSnapshots(s Server) ([]TransportConnRuntimeSnapshot, error) {
	if s == nil {
		return nil, errServerDiagnosticsSnapshotNil
	}
	conns := s.GetCurrentTransportConnList()
	result := make([]TransportConnRuntimeSnapshot, 0, len(conns))
	for _, conn := range conns {
		// Nil entries are skipped rather than treated as an error.
		if conn != nil {
			item, err := GetTransportConnRuntimeSnapshot(conn)
			if err != nil {
				return nil, err
			}
			result = append(result, item)
		}
	}
	sortTransportConnRuntimeSnapshots(result)
	return result, nil
}
// summarizeClientDiagnosticsSnapshot builds the DiagnosticsSummary for a
// client snapshot: runtime-derived counters first, then the per-category
// stream, bulk, record and transfer aggregates.
func summarizeClientDiagnosticsSnapshot(snapshot ClientDiagnosticsSnapshot) DiagnosticsSummary {
	var out DiagnosticsSummary

	out.LogicalCount = diagnosticsLogicalCountFromClientRuntime(snapshot.Runtime)
	out.BulkAttachAttempts = snapshot.Runtime.BulkAttachAttempts
	out.BulkAttachRetries = snapshot.Runtime.BulkAttachRetries
	out.BulkAttachSuccess = snapshot.Runtime.BulkAttachSuccess
	out.BulkAutoFallbacks = snapshot.Runtime.BulkAutoFallbacks
	// A client has at most one current transport.
	if snapshot.Runtime.TransportAttached {
		out.CurrentTransportCount = 1
	}

	summarizeStreamSnapshots(&out, snapshot.Streams)
	summarizeBulkSnapshots(&out, snapshot.Bulks)
	summarizeRecordSnapshots(&out, snapshot.Records)
	summarizeTransferSnapshots(&out, snapshot.Transfers)
	return out
}
// summarizeServerDiagnosticsSnapshot builds the DiagnosticsSummary for a
// server snapshot: connection counts come from the lengths of the logical and
// current-transport slices, followed by the per-category stream, bulk, record
// and transfer aggregates.
func summarizeServerDiagnosticsSnapshot(snapshot ServerDiagnosticsSnapshot) DiagnosticsSummary {
	var out DiagnosticsSummary

	out.LogicalCount = len(snapshot.Logicals)
	out.CurrentTransportCount = len(snapshot.CurrentTransports)

	summarizeStreamSnapshots(&out, snapshot.Streams)
	summarizeBulkSnapshots(&out, snapshot.Bulks)
	summarizeRecordSnapshots(&out, snapshot.Records)
	summarizeTransferSnapshots(&out, snapshot.Transfers)
	return out
}
// diagnosticsLogicalCountFromClientRuntime reports 1 when the client runtime
// snapshot shows any sign of a logical connection (alive, a non-zero session
// epoch, an attached transport, or a runtime conn/queue), and 0 otherwise.
func diagnosticsLogicalCountFromClientRuntime(runtime ClientRuntimeSnapshot) int {
	present := runtime.Alive ||
		runtime.SessionEpoch != 0 ||
		runtime.TransportAttached ||
		runtime.HasRuntimeConn ||
		runtime.HasRuntimeQueue
	if !present {
		return 0
	}
	return 1
}
// summarizeStreamSnapshots records the stream totals in summary. Each snapshot
// is classified as reset (non-empty ResetError, also tallied by cause),
// finished (counted only in the total), active, or stale. A nil summary is a
// no-op.
func summarizeStreamSnapshots(summary *DiagnosticsSummary, snapshots []StreamSnapshot) {
	if summary == nil {
		return
	}
	summary.StreamCount = len(snapshots)
	for _, item := range snapshots {
		if item.ResetError != "" {
			summary.ResetStreamCount++
			accumulateDiagnosticsResetCause(&summary.StreamResetCauses, item.ResetError, errStreamBackpressureExceeded.Error())
			continue
		}
		if streamSnapshotFinished(item) {
			// Finished streams contribute to StreamCount only.
			continue
		}
		if streamSnapshotBoundActive(item) {
			summary.ActiveStreamCount++
			continue
		}
		summary.StaleStreamCount++
	}
}
// summarizeBulkSnapshots records the bulk totals in summary. Dedicated bulks
// are counted regardless of state; each snapshot is then classified as reset
// (non-empty ResetError, also tallied by cause), finished (counted only in the
// total), active, or stale. A nil summary is a no-op.
func summarizeBulkSnapshots(summary *DiagnosticsSummary, snapshots []BulkSnapshot) {
	if summary == nil {
		return
	}
	summary.BulkCount = len(snapshots)
	for _, item := range snapshots {
		if item.Dedicated {
			summary.DedicatedBulkCount++
		}
		if item.ResetError != "" {
			summary.ResetBulkCount++
			accumulateDiagnosticsResetCause(&summary.BulkResetCauses, item.ResetError, errBulkBackpressureExceeded.Error())
			continue
		}
		if bulkSnapshotFinished(item) {
			// Finished bulks contribute to BulkCount only.
			continue
		}
		if bulkSnapshotBoundActive(item) {
			summary.ActiveBulkCount++
			continue
		}
		summary.StaleBulkCount++
	}
}
// summarizeRecordSnapshots records the record totals and telemetry in summary.
// Each snapshot is classified as reset (non-empty ResetError, also tallied by
// cause with no backpressure error text), finished (counted only in the
// total), active, or stale. Telemetry is accumulated for every snapshot
// regardless of its classification and finalized once at the end. A nil
// summary is a no-op.
func summarizeRecordSnapshots(summary *DiagnosticsSummary, snapshots []RecordSnapshot) {
	if summary == nil {
		return
	}
	summary.RecordCount = len(snapshots)
	for _, item := range snapshots {
		if item.ResetError != "" {
			summary.ResetRecordCount++
			accumulateDiagnosticsResetCause(&summary.RecordResetCauses, item.ResetError, "")
		} else if recordSnapshotFinished(item) {
			// Finished records contribute to RecordCount only.
		} else if recordSnapshotBoundActive(item) {
			summary.ActiveRecordCount++
		} else {
			summary.StaleRecordCount++
		}
		accumulateDiagnosticsRecordTelemetry(&summary.RecordTelemetry, item)
	}
	finalizeDiagnosticsRecordTelemetry(&summary.RecordTelemetry)
}
// summarizeTransferSnapshots records the transfer totals and telemetry in
// summary. Each snapshot is bucketed by its State; any state other than done,
// failed, aborted or paused counts as active. Telemetry is accumulated for
// every snapshot and finalized once at the end. A nil summary is a no-op.
func summarizeTransferSnapshots(summary *DiagnosticsSummary, snapshots []TransferSnapshot) {
	if summary == nil {
		return
	}
	summary.TransferCount = len(snapshots)
	for _, item := range snapshots {
		// Pick the counter for this state; active is the fallback bucket.
		bucket := &summary.ActiveTransferCount
		switch item.State {
		case TransferStateDone:
			bucket = &summary.DoneTransferCount
		case TransferStateFailed:
			bucket = &summary.FailedTransferCount
		case TransferStateAborted:
			bucket = &summary.AbortedTransferCount
		case TransferStatePaused:
			bucket = &summary.PausedTransferCount
		}
		*bucket++
		accumulateDiagnosticsTransferTelemetry(&summary.TransferTelemetry, item)
	}
	finalizeDiagnosticsTransferTelemetry(&summary.TransferTelemetry)
}
// streamSnapshotFinished reports whether the stream has no reset error and is
// closed on both the local and the remote side.
func streamSnapshotFinished(snapshot StreamSnapshot) bool {
	if snapshot.ResetError != "" {
		return false
	}
	return snapshot.LocalClosed && snapshot.RemoteClosed
}
// bulkSnapshotFinished reports whether the bulk has no reset error and is
// closed on both the local and the remote side.
func bulkSnapshotFinished(snapshot BulkSnapshot) bool {
	if snapshot.ResetError != "" {
		return false
	}
	return snapshot.LocalClosed && snapshot.RemoteClosed
}
// recordSnapshotFinished reports whether the record has no reset error and is
// closed on both the local and the remote side.
func recordSnapshotFinished(snapshot RecordSnapshot) bool {
	if snapshot.ResetError != "" {
		return false
	}
	return snapshot.LocalClosed && snapshot.RemoteClosed
}
// streamSnapshotBoundActive reports whether the stream's binding is current
// and its transport is both attached and current.
func streamSnapshotBoundActive(snapshot StreamSnapshot) bool {
	if !snapshot.BindingCurrent || !snapshot.TransportAttached {
		return false
	}
	return snapshot.TransportCurrent
}
// bulkSnapshotBoundActive reports whether the bulk's binding is current and
// its transport is both attached and current.
func bulkSnapshotBoundActive(snapshot BulkSnapshot) bool {
	if !snapshot.BindingCurrent || !snapshot.TransportAttached {
		return false
	}
	return snapshot.TransportCurrent
}
// recordSnapshotBoundActive reports whether the record's binding is current
// and its transport is both attached and current.
func recordSnapshotBoundActive(snapshot RecordSnapshot) bool {
	if !snapshot.BindingCurrent || !snapshot.TransportAttached {
		return false
	}
	return snapshot.TransportCurrent
}
// accumulateDiagnosticsResetCause counts one reset error in summary and
// attributes it to exactly one cause.
//
// resetError is the textual reset error taken from a snapshot. It matches a
// cause when it equals the cause's error text or starts with that text
// followed by ":". Causes are checked in order: transport detached, service
// shutdown, backpressure, and finally Other.
//
// backpressureError is the category-specific backpressure error text. Pass ""
// when the category has no backpressure error; the backpressure check is then
// skipped, so a reset error that merely begins with ":" is not misreported as
// backpressure.
//
// The call is a no-op when summary is nil or resetError is empty.
func accumulateDiagnosticsResetCause(summary *DiagnosticsResetCauseSummary, resetError string, backpressureError string) {
	if summary == nil || resetError == "" {
		return
	}
	summary.Total++
	switch {
	case diagnosticsResetErrorMatches(resetError, errTransportDetached):
		summary.TransportDetached++
	case diagnosticsResetErrorMatches(resetError, errServiceShutdown):
		summary.ServiceShutdown++
	case backpressureError != "" &&
		(resetError == backpressureError || strings.HasPrefix(resetError, backpressureError+":")):
		// An empty backpressureError would reduce the prefix test to ":",
		// so it is excluded explicitly.
		summary.Backpressure++
	default:
		summary.Other++
	}
}
// diagnosticsResetErrorMatches reports whether resetError is the text of
// target, either exactly or as a prefix followed by ":". It returns false when
// resetError is empty or target is nil.
func diagnosticsResetErrorMatches(resetError string, target error) bool {
	if target == nil || resetError == "" {
		return false
	}
	want := target.Error()
	if resetError == want {
		return true
	}
	return strings.HasPrefix(resetError, want+":")
}
// accumulateDiagnosticsTransferTelemetry adds one transfer snapshot's byte
// counters and phase durations to summary. Byte counts are obtained through
// the transferSummary*Bytes helpers; durations are read directly from the
// snapshot. A nil summary is a no-op.
func accumulateDiagnosticsTransferTelemetry(summary *DiagnosticsTransferTelemetrySummary, snapshot TransferSnapshot) {
	if summary == nil {
		return
	}
	acc := summary

	// Bytes moved in each phase.
	acc.SourceReadBytes = acc.SourceReadBytes + transferSummarySourceReadBytes(snapshot)
	acc.StreamWriteBytes = acc.StreamWriteBytes + transferSummaryStreamWriteBytes(snapshot)
	acc.SinkWriteBytes = acc.SinkWriteBytes + transferSummarySinkWriteBytes(snapshot)

	// Time spent in each phase.
	acc.SourceReadDuration = acc.SourceReadDuration + snapshot.SourceReadDuration
	acc.StreamWriteDuration = acc.StreamWriteDuration + snapshot.StreamWriteDuration
	acc.SinkWriteDuration = acc.SinkWriteDuration + snapshot.SinkWriteDuration
	acc.SyncDuration = acc.SyncDuration + snapshot.SyncDuration
	acc.VerifyDuration = acc.VerifyDuration + snapshot.VerifyDuration
	acc.CommitDuration = acc.CommitDuration + snapshot.CommitDuration
	acc.CommitWaitDuration = acc.CommitWaitDuration + snapshot.CommitWaitDuration
}
// finalizeDiagnosticsTransferTelemetry fills the derived fields of summary
// from its accumulated sums: WorkDuration (all phases except commit wait),
// ObservedDuration (work plus commit wait), the three throughputs and the
// commit-wait ratio. A nil summary is a no-op.
func finalizeDiagnosticsTransferTelemetry(summary *DiagnosticsTransferTelemetrySummary) {
	if summary == nil {
		return
	}

	work := summary.SourceReadDuration
	work += summary.StreamWriteDuration
	work += summary.SinkWriteDuration
	work += summary.SyncDuration
	work += summary.VerifyDuration
	work += summary.CommitDuration
	observed := work + summary.CommitWaitDuration

	summary.WorkDuration = work
	summary.ObservedDuration = observed
	summary.SourceReadThroughputBPS = throughputBytesPerSecond(summary.SourceReadBytes, summary.SourceReadDuration)
	summary.StreamWriteThroughputBPS = throughputBytesPerSecond(summary.StreamWriteBytes, summary.StreamWriteDuration)
	summary.SinkWriteThroughputBPS = throughputBytesPerSecond(summary.SinkWriteBytes, summary.SinkWriteDuration)
	summary.CommitWaitRatio = durationRatio(summary.CommitWaitDuration, observed)
}
// accumulateDiagnosticsRecordTelemetry adds one record snapshot's counters and
// wait durations to summary. Every field is summed except
// PeakPendingApplyRecords, which keeps the largest value seen. A nil summary
// is a no-op.
func accumulateDiagnosticsRecordTelemetry(summary *DiagnosticsRecordTelemetrySummary, snapshot RecordSnapshot) {
	if summary == nil {
		return
	}
	acc := summary

	// Frames sent.
	acc.BatchFramesSent = acc.BatchFramesSent + snapshot.BatchFramesSent
	acc.AckFramesSent = acc.AckFramesSent + snapshot.AckFramesSent
	acc.ErrorFramesSent = acc.ErrorFramesSent + snapshot.ErrorFramesSent

	// Frames received.
	acc.BatchFramesReceived = acc.BatchFramesReceived + snapshot.BatchFramesReceived
	acc.AckFramesReceived = acc.AckFramesReceived + snapshot.AckFramesReceived
	acc.ErrorFramesReceived = acc.ErrorFramesReceived + snapshot.ErrorFramesReceived

	// Piggybacked acks.
	acc.PiggybackAckSent = acc.PiggybackAckSent + snapshot.PiggybackAckSent
	acc.PiggybackAckReceived = acc.PiggybackAckReceived + snapshot.PiggybackAckReceived

	// Barriers.
	acc.BarrierCount = acc.BarrierCount + snapshot.BarrierCount
	acc.BarrierFlushWaitDuration = acc.BarrierFlushWaitDuration + snapshot.BarrierFlushWaitDuration
	acc.BarrierApplyWaitDuration = acc.BarrierApplyWaitDuration + snapshot.BarrierApplyWaitDuration

	// Outstanding and pending work.
	acc.OutstandingRecords = acc.OutstandingRecords + snapshot.OutstandingRecords
	acc.OutstandingBytes = acc.OutstandingBytes + snapshot.OutstandingBytes
	acc.PendingApplyRecords = acc.PendingApplyRecords + snapshot.PendingApplyRecords
	acc.PendingAckRecords = acc.PendingAckRecords + snapshot.PendingAckRecords

	// The peak is a maximum over snapshots, not a sum.
	if acc.PeakPendingApplyRecords < snapshot.PeakPendingApplyRecords {
		acc.PeakPendingApplyRecords = snapshot.PeakPendingApplyRecords
	}
}
// finalizeDiagnosticsRecordTelemetry fills the derived frame totals of summary:
// FrameSendCount and FrameReceiveCount are the sums of the batch, ack and
// error frame counters for their direction. A nil summary is a no-op.
func finalizeDiagnosticsRecordTelemetry(summary *DiagnosticsRecordTelemetrySummary) {
	if summary == nil {
		return
	}
	sent := summary.BatchFramesSent + summary.AckFramesSent + summary.ErrorFramesSent
	received := summary.BatchFramesReceived + summary.AckFramesReceived + summary.ErrorFramesReceived
	summary.FrameSendCount = sent
	summary.FrameReceiveCount = received
}
// sortClientConnRuntimeSnapshots sorts src in place by ClientID, then by
// TransportGeneration, then by RemoteAddress, all ascending.
func sortClientConnRuntimeSnapshots(src []ClientConnRuntimeSnapshot) {
	less := func(i, j int) bool {
		a, b := &src[i], &src[j]
		switch {
		case a.ClientID != b.ClientID:
			return a.ClientID < b.ClientID
		case a.TransportGeneration != b.TransportGeneration:
			return a.TransportGeneration < b.TransportGeneration
		default:
			return a.RemoteAddress < b.RemoteAddress
		}
	}
	sort.Slice(src, less)
}
// sortTransportConnRuntimeSnapshots sorts src in place by ClientID, then by
// TransportGeneration, then by RemoteAddress, all ascending.
func sortTransportConnRuntimeSnapshots(src []TransportConnRuntimeSnapshot) {
	less := func(i, j int) bool {
		a, b := &src[i], &src[j]
		switch {
		case a.ClientID != b.ClientID:
			return a.ClientID < b.ClientID
		case a.TransportGeneration != b.TransportGeneration:
			return a.TransportGeneration < b.TransportGeneration
		default:
			return a.RemoteAddress < b.RemoteAddress
		}
	}
	sort.Slice(src, less)
}