388 lines
12 KiB
Go
388 lines
12 KiB
Go
|
|
package notify
|
||
|
|
|
||
|
|
import (
|
||
|
|
"errors"
|
||
|
|
"sort"
|
||
|
|
"strings"
|
||
|
|
"time"
|
||
|
|
)
|
||
|
|
|
||
|
|
// DiagnosticsResetCauseSummary tallies reset errors by cause.
type DiagnosticsResetCauseSummary struct {
	// Total is the number of resets folded into this summary.
	Total int

	// Per-cause buckets. As filled by this package, every reset counted in
	// Total lands in exactly one of these, with Other as the fallback.
	TransportDetached, ServiceShutdown, Backpressure, Other int
}
|
||
|
|
|
||
|
|
// DiagnosticsTransferTelemetrySummary aggregates transfer telemetry over a set
// of transfer snapshots. The byte counters and per-phase durations are sums
// across snapshots. The remaining fields are derived from those sums after
// accumulation finishes.
type DiagnosticsTransferTelemetrySummary struct {
	// Bytes attributed to each phase, summed across transfers.
	SourceReadBytes, StreamWriteBytes, SinkWriteBytes int64

	// Time spent in each phase, summed across transfers.
	SourceReadDuration, StreamWriteDuration, SinkWriteDuration       time.Duration
	SyncDuration, VerifyDuration, CommitDuration, CommitWaitDuration time.Duration

	// WorkDuration is the sum of every phase duration except
	// CommitWaitDuration. ObservedDuration is WorkDuration plus
	// CommitWaitDuration.
	WorkDuration, ObservedDuration time.Duration

	// Per-phase throughput derived from the matching bytes/duration pair.
	// NOTE(review): presumably bytes per second (BPS suffix) — confirm
	// against throughputBytesPerSecond.
	SourceReadThroughputBPS, StreamWriteThroughputBPS, SinkWriteThroughputBPS float64

	// CommitWaitRatio relates CommitWaitDuration to ObservedDuration via
	// durationRatio.
	CommitWaitRatio float64
}
|
||
|
|
|
||
|
|
type DiagnosticsSummary struct {
|
||
|
|
LogicalCount int
|
||
|
|
CurrentTransportCount int
|
||
|
|
|
||
|
|
StreamCount int
|
||
|
|
ActiveStreamCount int
|
||
|
|
StaleStreamCount int
|
||
|
|
ResetStreamCount int
|
||
|
|
|
||
|
|
BulkCount int
|
||
|
|
DedicatedBulkCount int
|
||
|
|
ActiveBulkCount int
|
||
|
|
StaleBulkCount int
|
||
|
|
ResetBulkCount int
|
||
|
|
|
||
|
|
TransferCount int
|
||
|
|
ActiveTransferCount int
|
||
|
|
PausedTransferCount int
|
||
|
|
DoneTransferCount int
|
||
|
|
FailedTransferCount int
|
||
|
|
AbortedTransferCount int
|
||
|
|
|
||
|
|
StreamResetCauses DiagnosticsResetCauseSummary
|
||
|
|
BulkResetCauses DiagnosticsResetCauseSummary
|
||
|
|
TransferTelemetry DiagnosticsTransferTelemetrySummary
|
||
|
|
}
|
||
|
|
|
||
|
|
type ClientDiagnosticsSnapshot struct {
|
||
|
|
Runtime ClientRuntimeSnapshot
|
||
|
|
Streams []StreamSnapshot
|
||
|
|
Bulks []BulkSnapshot
|
||
|
|
Transfers []TransferSnapshot
|
||
|
|
Summary DiagnosticsSummary
|
||
|
|
}
|
||
|
|
|
||
|
|
type ServerDiagnosticsSnapshot struct {
|
||
|
|
Runtime ServerRuntimeSnapshot
|
||
|
|
Logicals []ClientConnRuntimeSnapshot
|
||
|
|
CurrentTransports []TransportConnRuntimeSnapshot
|
||
|
|
Streams []StreamSnapshot
|
||
|
|
Bulks []BulkSnapshot
|
||
|
|
Transfers []TransferSnapshot
|
||
|
|
Summary DiagnosticsSummary
|
||
|
|
}
|
||
|
|
|
||
|
|
// Sentinel errors returned when a diagnostics entry point is handed a nil
// client or server.
var (
	// errClientDiagnosticsSnapshotNil is returned for a nil Client.
	errClientDiagnosticsSnapshotNil = errors.New("client diagnostics snapshot target is nil")

	// errServerDiagnosticsSnapshotNil is returned for a nil Server.
	errServerDiagnosticsSnapshotNil = errors.New("server diagnostics snapshot target is nil")
)
|
||
|
|
|
||
|
|
func GetClientDiagnosticsSnapshot(c Client) (ClientDiagnosticsSnapshot, error) {
|
||
|
|
if c == nil {
|
||
|
|
return ClientDiagnosticsSnapshot{}, errClientDiagnosticsSnapshotNil
|
||
|
|
}
|
||
|
|
runtime, err := GetClientRuntimeSnapshot(c)
|
||
|
|
if err != nil {
|
||
|
|
return ClientDiagnosticsSnapshot{}, err
|
||
|
|
}
|
||
|
|
streams, err := GetClientStreamSnapshots(c)
|
||
|
|
if err != nil {
|
||
|
|
return ClientDiagnosticsSnapshot{}, err
|
||
|
|
}
|
||
|
|
bulks, err := GetClientBulkSnapshots(c)
|
||
|
|
if err != nil {
|
||
|
|
return ClientDiagnosticsSnapshot{}, err
|
||
|
|
}
|
||
|
|
transfers, err := GetClientTransferSnapshots(c)
|
||
|
|
if err != nil {
|
||
|
|
return ClientDiagnosticsSnapshot{}, err
|
||
|
|
}
|
||
|
|
snapshot := ClientDiagnosticsSnapshot{
|
||
|
|
Runtime: runtime,
|
||
|
|
Streams: streams,
|
||
|
|
Bulks: bulks,
|
||
|
|
Transfers: transfers,
|
||
|
|
}
|
||
|
|
snapshot.Summary = summarizeClientDiagnosticsSnapshot(snapshot)
|
||
|
|
return snapshot, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func GetServerDiagnosticsSnapshot(s Server) (ServerDiagnosticsSnapshot, error) {
|
||
|
|
if s == nil {
|
||
|
|
return ServerDiagnosticsSnapshot{}, errServerDiagnosticsSnapshotNil
|
||
|
|
}
|
||
|
|
runtime, err := GetServerRuntimeSnapshot(s)
|
||
|
|
if err != nil {
|
||
|
|
return ServerDiagnosticsSnapshot{}, err
|
||
|
|
}
|
||
|
|
logicals, err := serverLogicalRuntimeSnapshots(s)
|
||
|
|
if err != nil {
|
||
|
|
return ServerDiagnosticsSnapshot{}, err
|
||
|
|
}
|
||
|
|
transports, err := serverCurrentTransportRuntimeSnapshots(s)
|
||
|
|
if err != nil {
|
||
|
|
return ServerDiagnosticsSnapshot{}, err
|
||
|
|
}
|
||
|
|
streams, err := GetServerStreamSnapshots(s)
|
||
|
|
if err != nil {
|
||
|
|
return ServerDiagnosticsSnapshot{}, err
|
||
|
|
}
|
||
|
|
bulks, err := GetServerBulkSnapshots(s)
|
||
|
|
if err != nil {
|
||
|
|
return ServerDiagnosticsSnapshot{}, err
|
||
|
|
}
|
||
|
|
transfers, err := GetServerTransferSnapshots(s)
|
||
|
|
if err != nil {
|
||
|
|
return ServerDiagnosticsSnapshot{}, err
|
||
|
|
}
|
||
|
|
snapshot := ServerDiagnosticsSnapshot{
|
||
|
|
Runtime: runtime,
|
||
|
|
Logicals: logicals,
|
||
|
|
CurrentTransports: transports,
|
||
|
|
Streams: streams,
|
||
|
|
Bulks: bulks,
|
||
|
|
Transfers: transfers,
|
||
|
|
}
|
||
|
|
snapshot.Summary = summarizeServerDiagnosticsSnapshot(snapshot)
|
||
|
|
return snapshot, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func serverLogicalRuntimeSnapshots(s Server) ([]ClientConnRuntimeSnapshot, error) {
|
||
|
|
if s == nil {
|
||
|
|
return nil, errServerDiagnosticsSnapshotNil
|
||
|
|
}
|
||
|
|
logicals := s.GetLogicalConnList()
|
||
|
|
out := make([]ClientConnRuntimeSnapshot, 0, len(logicals))
|
||
|
|
for _, logical := range logicals {
|
||
|
|
if logical == nil {
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
snapshot, err := GetLogicalConnRuntimeSnapshot(logical)
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
out = append(out, snapshot)
|
||
|
|
}
|
||
|
|
sortClientConnRuntimeSnapshots(out)
|
||
|
|
return out, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func serverCurrentTransportRuntimeSnapshots(s Server) ([]TransportConnRuntimeSnapshot, error) {
|
||
|
|
if s == nil {
|
||
|
|
return nil, errServerDiagnosticsSnapshotNil
|
||
|
|
}
|
||
|
|
transports := s.GetCurrentTransportConnList()
|
||
|
|
out := make([]TransportConnRuntimeSnapshot, 0, len(transports))
|
||
|
|
for _, transport := range transports {
|
||
|
|
if transport == nil {
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
snapshot, err := GetTransportConnRuntimeSnapshot(transport)
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
out = append(out, snapshot)
|
||
|
|
}
|
||
|
|
sortTransportConnRuntimeSnapshots(out)
|
||
|
|
return out, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func summarizeClientDiagnosticsSnapshot(snapshot ClientDiagnosticsSnapshot) DiagnosticsSummary {
|
||
|
|
summary := DiagnosticsSummary{
|
||
|
|
LogicalCount: diagnosticsLogicalCountFromClientRuntime(snapshot.Runtime),
|
||
|
|
}
|
||
|
|
if snapshot.Runtime.TransportAttached {
|
||
|
|
summary.CurrentTransportCount = 1
|
||
|
|
}
|
||
|
|
summarizeStreamSnapshots(&summary, snapshot.Streams)
|
||
|
|
summarizeBulkSnapshots(&summary, snapshot.Bulks)
|
||
|
|
summarizeTransferSnapshots(&summary, snapshot.Transfers)
|
||
|
|
return summary
|
||
|
|
}
|
||
|
|
|
||
|
|
func summarizeServerDiagnosticsSnapshot(snapshot ServerDiagnosticsSnapshot) DiagnosticsSummary {
|
||
|
|
summary := DiagnosticsSummary{
|
||
|
|
LogicalCount: len(snapshot.Logicals),
|
||
|
|
CurrentTransportCount: len(snapshot.CurrentTransports),
|
||
|
|
}
|
||
|
|
summarizeStreamSnapshots(&summary, snapshot.Streams)
|
||
|
|
summarizeBulkSnapshots(&summary, snapshot.Bulks)
|
||
|
|
summarizeTransferSnapshots(&summary, snapshot.Transfers)
|
||
|
|
return summary
|
||
|
|
}
|
||
|
|
|
||
|
|
func diagnosticsLogicalCountFromClientRuntime(runtime ClientRuntimeSnapshot) int {
|
||
|
|
if runtime.Alive || runtime.SessionEpoch != 0 || runtime.TransportAttached || runtime.HasRuntimeConn || runtime.HasRuntimeQueue {
|
||
|
|
return 1
|
||
|
|
}
|
||
|
|
return 0
|
||
|
|
}
|
||
|
|
|
||
|
|
func summarizeStreamSnapshots(summary *DiagnosticsSummary, snapshots []StreamSnapshot) {
|
||
|
|
if summary == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
summary.StreamCount = len(snapshots)
|
||
|
|
for _, snapshot := range snapshots {
|
||
|
|
switch {
|
||
|
|
case snapshot.ResetError != "":
|
||
|
|
summary.ResetStreamCount++
|
||
|
|
accumulateDiagnosticsResetCause(&summary.StreamResetCauses, snapshot.ResetError, errStreamBackpressureExceeded.Error())
|
||
|
|
case streamSnapshotFinished(snapshot):
|
||
|
|
case streamSnapshotBoundActive(snapshot):
|
||
|
|
summary.ActiveStreamCount++
|
||
|
|
default:
|
||
|
|
summary.StaleStreamCount++
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func summarizeBulkSnapshots(summary *DiagnosticsSummary, snapshots []BulkSnapshot) {
|
||
|
|
if summary == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
summary.BulkCount = len(snapshots)
|
||
|
|
for _, snapshot := range snapshots {
|
||
|
|
if snapshot.Dedicated {
|
||
|
|
summary.DedicatedBulkCount++
|
||
|
|
}
|
||
|
|
switch {
|
||
|
|
case snapshot.ResetError != "":
|
||
|
|
summary.ResetBulkCount++
|
||
|
|
accumulateDiagnosticsResetCause(&summary.BulkResetCauses, snapshot.ResetError, errBulkBackpressureExceeded.Error())
|
||
|
|
case bulkSnapshotFinished(snapshot):
|
||
|
|
case bulkSnapshotBoundActive(snapshot):
|
||
|
|
summary.ActiveBulkCount++
|
||
|
|
default:
|
||
|
|
summary.StaleBulkCount++
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func summarizeTransferSnapshots(summary *DiagnosticsSummary, snapshots []TransferSnapshot) {
|
||
|
|
if summary == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
summary.TransferCount = len(snapshots)
|
||
|
|
for _, snapshot := range snapshots {
|
||
|
|
switch snapshot.State {
|
||
|
|
case TransferStateDone:
|
||
|
|
summary.DoneTransferCount++
|
||
|
|
case TransferStateFailed:
|
||
|
|
summary.FailedTransferCount++
|
||
|
|
case TransferStateAborted:
|
||
|
|
summary.AbortedTransferCount++
|
||
|
|
case TransferStatePaused:
|
||
|
|
summary.PausedTransferCount++
|
||
|
|
default:
|
||
|
|
summary.ActiveTransferCount++
|
||
|
|
}
|
||
|
|
accumulateDiagnosticsTransferTelemetry(&summary.TransferTelemetry, snapshot)
|
||
|
|
}
|
||
|
|
finalizeDiagnosticsTransferTelemetry(&summary.TransferTelemetry)
|
||
|
|
}
|
||
|
|
|
||
|
|
func streamSnapshotFinished(snapshot StreamSnapshot) bool {
|
||
|
|
return snapshot.ResetError == "" && snapshot.LocalClosed && snapshot.RemoteClosed
|
||
|
|
}
|
||
|
|
|
||
|
|
func bulkSnapshotFinished(snapshot BulkSnapshot) bool {
|
||
|
|
return snapshot.ResetError == "" && snapshot.LocalClosed && snapshot.RemoteClosed
|
||
|
|
}
|
||
|
|
|
||
|
|
func streamSnapshotBoundActive(snapshot StreamSnapshot) bool {
|
||
|
|
return snapshot.BindingCurrent && snapshot.TransportAttached && snapshot.TransportCurrent
|
||
|
|
}
|
||
|
|
|
||
|
|
func bulkSnapshotBoundActive(snapshot BulkSnapshot) bool {
|
||
|
|
return snapshot.BindingCurrent && snapshot.TransportAttached && snapshot.TransportCurrent
|
||
|
|
}
|
||
|
|
|
||
|
|
func accumulateDiagnosticsResetCause(summary *DiagnosticsResetCauseSummary, resetError string, backpressureError string) {
|
||
|
|
if summary == nil || resetError == "" {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
summary.Total++
|
||
|
|
if diagnosticsResetErrorMatches(resetError, errTransportDetached) {
|
||
|
|
summary.TransportDetached++
|
||
|
|
return
|
||
|
|
}
|
||
|
|
if diagnosticsResetErrorMatches(resetError, errServiceShutdown) {
|
||
|
|
summary.ServiceShutdown++
|
||
|
|
return
|
||
|
|
}
|
||
|
|
if resetError == backpressureError || strings.HasPrefix(resetError, backpressureError+":") {
|
||
|
|
summary.Backpressure++
|
||
|
|
return
|
||
|
|
}
|
||
|
|
summary.Other++
|
||
|
|
}
|
||
|
|
|
||
|
|
// diagnosticsResetErrorMatches reports whether resetError is target's message,
// either exactly or followed by ":" and further detail (e.g. "base: detail").
//
// It returns false for an empty resetError, a nil target, or a target whose
// message is empty. Without the empty-message guard, the prefix test would
// collapse to ":" and match unrelated messages.
func diagnosticsResetErrorMatches(resetError string, target error) bool {
	if resetError == "" || target == nil {
		return false
	}
	base := target.Error()
	if base == "" {
		return false
	}
	return resetError == base || strings.HasPrefix(resetError, base+":")
}
|
||
|
|
|
||
|
|
func accumulateDiagnosticsTransferTelemetry(summary *DiagnosticsTransferTelemetrySummary, snapshot TransferSnapshot) {
|
||
|
|
if summary == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
summary.SourceReadBytes += transferSummarySourceReadBytes(snapshot)
|
||
|
|
summary.StreamWriteBytes += transferSummaryStreamWriteBytes(snapshot)
|
||
|
|
summary.SinkWriteBytes += transferSummarySinkWriteBytes(snapshot)
|
||
|
|
summary.SourceReadDuration += snapshot.SourceReadDuration
|
||
|
|
summary.StreamWriteDuration += snapshot.StreamWriteDuration
|
||
|
|
summary.SinkWriteDuration += snapshot.SinkWriteDuration
|
||
|
|
summary.SyncDuration += snapshot.SyncDuration
|
||
|
|
summary.VerifyDuration += snapshot.VerifyDuration
|
||
|
|
summary.CommitDuration += snapshot.CommitDuration
|
||
|
|
summary.CommitWaitDuration += snapshot.CommitWaitDuration
|
||
|
|
}
|
||
|
|
|
||
|
|
func finalizeDiagnosticsTransferTelemetry(summary *DiagnosticsTransferTelemetrySummary) {
|
||
|
|
if summary == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
summary.WorkDuration = summary.SourceReadDuration + summary.StreamWriteDuration + summary.SinkWriteDuration +
|
||
|
|
summary.SyncDuration + summary.VerifyDuration + summary.CommitDuration
|
||
|
|
summary.ObservedDuration = summary.WorkDuration + summary.CommitWaitDuration
|
||
|
|
summary.SourceReadThroughputBPS = throughputBytesPerSecond(summary.SourceReadBytes, summary.SourceReadDuration)
|
||
|
|
summary.StreamWriteThroughputBPS = throughputBytesPerSecond(summary.StreamWriteBytes, summary.StreamWriteDuration)
|
||
|
|
summary.SinkWriteThroughputBPS = throughputBytesPerSecond(summary.SinkWriteBytes, summary.SinkWriteDuration)
|
||
|
|
summary.CommitWaitRatio = durationRatio(summary.CommitWaitDuration, summary.ObservedDuration)
|
||
|
|
}
|
||
|
|
|
||
|
|
func sortClientConnRuntimeSnapshots(src []ClientConnRuntimeSnapshot) {
|
||
|
|
sort.Slice(src, func(i, j int) bool {
|
||
|
|
if src[i].ClientID != src[j].ClientID {
|
||
|
|
return src[i].ClientID < src[j].ClientID
|
||
|
|
}
|
||
|
|
if src[i].TransportGeneration != src[j].TransportGeneration {
|
||
|
|
return src[i].TransportGeneration < src[j].TransportGeneration
|
||
|
|
}
|
||
|
|
return src[i].RemoteAddress < src[j].RemoteAddress
|
||
|
|
})
|
||
|
|
}
|
||
|
|
|
||
|
|
func sortTransportConnRuntimeSnapshots(src []TransportConnRuntimeSnapshot) {
|
||
|
|
sort.Slice(src, func(i, j int) bool {
|
||
|
|
if src[i].ClientID != src[j].ClientID {
|
||
|
|
return src[i].ClientID < src[j].ClientID
|
||
|
|
}
|
||
|
|
if src[i].TransportGeneration != src[j].TransportGeneration {
|
||
|
|
return src[i].TransportGeneration < src[j].TransportGeneration
|
||
|
|
}
|
||
|
|
return src[i].RemoteAddress < src[j].RemoteAddress
|
||
|
|
})
|
||
|
|
}
|