notify/diagnostics_snapshot_test.go
starainrt 7ed3dd5b37
feat: 完善 RecordStream 的协议协商、运行观测与文档说明
- 将 RecordStream 出站路径收敛为单 writer loop
  - 支持在 batch header 中 piggyback AckSeq,保留独立 ack 作为兼容回退
  - 增加 record stream 打开阶段能力协商,支持 mixed-version peer 自动降级
  - 补充 RecordSnapshot 与 diagnostics summary 的 record-plane 观测项
  - 增加 batch/ack/error frame、piggyback ack、barrier 等待拆分与 apply backlog 指标
  - 收紧 TransportConn detach 后的 runtime snapshot 语义
  - 补充 README 中的 RecordStream 语义、兼容行为与诊断快照说明
  - 补充相关单测与 race 回归验证
2026-04-15 19:52:45 +08:00

670 lines
24 KiB
Go

package notify
import (
"context"
"errors"
"math"
"net"
"testing"
"time"
itransfer "b612.me/notify/internal/transfer"
)
func TestGetClientDiagnosticsSnapshotDefaults(t *testing.T) {
client := NewClient()
snapshot, err := GetClientDiagnosticsSnapshot(client)
if err != nil {
t.Fatalf("GetClientDiagnosticsSnapshot failed: %v", err)
}
if got, want := snapshot.Runtime.OwnerState, "idle"; got != want {
t.Fatalf("Runtime.OwnerState = %q, want %q", got, want)
}
if len(snapshot.Streams) != 0 || len(snapshot.Bulks) != 0 || len(snapshot.Records) != 0 || len(snapshot.Transfers) != 0 {
t.Fatalf("default diagnostics should be empty: %+v", snapshot)
}
if snapshot.Summary != (DiagnosticsSummary{}) {
t.Fatalf("default summary mismatch: %+v", snapshot.Summary)
}
}
// TestGetClientDiagnosticsSnapshotAggregatesActiveState connects a client to a
// listening server, opens one stream and one bulk, records one completed
// transfer on the client's transfer runtime, and then checks that the client
// diagnostics summary reports exactly those objects with no stale, reset or
// failed counters.
func TestGetClientDiagnosticsSnapshotAggregatesActiveState(t *testing.T) {
	server := NewServer().(*ServerCommon)
	if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKServer failed: %v", err)
	}

	// Buffered so the server-side handlers never block on the test goroutine.
	streamAcceptCh := make(chan StreamAcceptInfo, 1)
	bulkAcceptCh := make(chan BulkAcceptInfo, 1)
	server.SetStreamHandler(func(info StreamAcceptInfo) error {
		streamAcceptCh <- info
		return nil
	})
	server.SetBulkHandler(func(info BulkAcceptInfo) error {
		bulkAcceptCh <- info
		return nil
	})
	if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
		t.Fatalf("server Listen failed: %v", err)
	}
	defer func() {
		_ = server.Stop()
	}()

	client := NewClient().(*ClientCommon)
	if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKClient failed: %v", err)
	}
	if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
		t.Fatalf("client Connect failed: %v", err)
	}
	defer func() {
		_ = client.Stop()
	}()

	stream, err := client.OpenStream(context.Background(), StreamOpenOptions{
		ID:      "diag-client-stream",
		Channel: StreamDataChannel,
	})
	if err != nil {
		t.Fatalf("client OpenStream failed: %v", err)
	}
	// Deferred so the stream is closed even when a later assertion aborts the
	// test; the close error is irrelevant to what is being verified.
	defer func() {
		_ = stream.Close()
	}()
	waitAcceptedStream(t, streamAcceptCh, 2*time.Second)

	bulk, err := client.OpenBulk(context.Background(), BulkOpenOptions{
		ID: "diag-client-bulk",
		Range: BulkRange{
			Length: 64,
		},
		ChunkSize: 16 * 1024,
	})
	if err != nil {
		t.Fatalf("client OpenBulk failed: %v", err)
	}
	// Same reasoning as for the stream above.
	defer func() {
		_ = bulk.Close()
	}()
	waitAcceptedBulk(t, bulkAcceptCh, 2*time.Second)

	// Drive one transfer through ensure -> activate -> complete so the snapshot
	// contains a finished transfer.
	transferRuntime := client.getTransferRuntime()
	transferRuntime.ensureTransferDescriptor(fileTransferDirectionSend, clientFileScope(), clientFileScope(), 0, itransfer.Descriptor{
		ID:       "diag-client-transfer-done",
		Channel:  itransfer.DataChannel,
		Size:     32,
		Checksum: "sum-client",
	})
	transferRuntime.activate(fileTransferDirectionSend, clientFileScope(), "diag-client-transfer-done")
	transferRuntime.complete(fileTransferDirectionSend, clientFileScope(), "diag-client-transfer-done")

	snapshot, err := GetClientDiagnosticsSnapshot(client)
	if err != nil {
		t.Fatalf("GetClientDiagnosticsSnapshot failed: %v", err)
	}

	summary := snapshot.Summary
	checks := []struct {
		name string
		got  int
		want int
	}{
		{"LogicalCount", summary.LogicalCount, 1},
		{"CurrentTransportCount", summary.CurrentTransportCount, 1},
		{"StreamCount", summary.StreamCount, 1},
		{"ActiveStreamCount", summary.ActiveStreamCount, 1},
		{"BulkCount", summary.BulkCount, 1},
		{"ActiveBulkCount", summary.ActiveBulkCount, 1},
		{"TransferCount", summary.TransferCount, 1},
		{"DoneTransferCount", summary.DoneTransferCount, 1},
	}
	for _, c := range checks {
		if c.got != c.want {
			t.Fatalf("%s = %d, want %d", c.name, c.got, c.want)
		}
	}

	unhealthy := summary.StaleStreamCount + summary.ResetStreamCount +
		summary.StaleBulkCount + summary.ResetBulkCount +
		summary.FailedTransferCount
	if unhealthy != 0 {
		t.Fatalf("unexpected unhealthy counters in active snapshot: %+v", snapshot.Summary)
	}
}
// TestGetDiagnosticsSnapshotAggregatesActiveRecordState opens a record stream
// between a client and a server, writes and acknowledges one record, waits on
// a barrier, and then checks the record-plane counters in both the client and
// the server diagnostics snapshots while the stream is still open.
//
// The server handler is parked on recordReleaseCh so the stream stays active
// while the snapshots are taken. The release is also deferred, so a failing
// assertion cannot leave the handler blocked during teardown.
func TestGetDiagnosticsSnapshotAggregatesActiveRecordState(t *testing.T) {
	server := NewServer().(*ServerCommon)
	if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKServer failed: %v", err)
	}

	recordAcceptCh := make(chan RecordAcceptInfo, 1)
	recordReleaseCh := make(chan struct{})
	recordHandlerDone := make(chan error, 1)

	// serveRecord reads one record, validates and acks it, then waits for the
	// test to release it before closing the stream.
	serveRecord := func(info RecordAcceptInfo) error {
		msg, err := info.RecordStream.ReadRecord(context.Background())
		if err != nil {
			return err
		}
		if string(msg.Payload) != "diag-record" {
			return errors.New("unexpected record payload")
		}
		if err := info.RecordStream.AckRecord(msg.Seq); err != nil {
			return err
		}
		<-recordReleaseCh
		return info.RecordStream.Close()
	}
	server.SetRecordStreamHandler(func(info RecordAcceptInfo) error {
		recordAcceptCh <- info
		err := serveRecord(info)
		// Report the outcome to the test before returning it to the server.
		recordHandlerDone <- err
		return err
	})

	if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
		t.Fatalf("server Listen failed: %v", err)
	}
	defer func() {
		_ = server.Stop()
	}()

	// releaseHandler closes recordReleaseCh at most once. It is only ever
	// called from the test goroutine, so a plain bool is sufficient.
	handlerReleased := false
	releaseHandler := func() {
		if handlerReleased {
			return
		}
		handlerReleased = true
		close(recordReleaseCh)
	}
	// Registered after the server.Stop defer so it runs before it.
	defer releaseHandler()

	client := NewClient().(*ClientCommon)
	if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKClient failed: %v", err)
	}
	if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
		t.Fatalf("client Connect failed: %v", err)
	}
	defer func() {
		_ = client.Stop()
	}()

	record, err := client.OpenRecordStream(context.Background(), RecordOpenOptions{
		Stream: StreamOpenOptions{ID: "diag-client-record"},
	})
	if err != nil {
		t.Fatalf("client OpenRecordStream failed: %v", err)
	}
	// Deferred so the client side is closed on failure paths as well; on the
	// success path this still runs after the handler has completed.
	defer func() {
		_ = record.Close()
	}()

	select {
	case <-recordAcceptCh:
	case <-time.After(2 * time.Second):
		t.Fatal("timed out waiting accepted record stream")
	}

	if _, err := record.WriteRecord(context.Background(), []byte("diag-record")); err != nil {
		t.Fatalf("WriteRecord failed: %v", err)
	}
	if _, err := record.Barrier(context.Background()); err != nil {
		t.Fatalf("Barrier failed: %v", err)
	}

	// Client-side view.
	clientSnapshot, err := GetClientDiagnosticsSnapshot(client)
	if err != nil {
		t.Fatalf("GetClientDiagnosticsSnapshot failed: %v", err)
	}
	if got, want := len(clientSnapshot.Records), 1; got != want {
		t.Fatalf("client record snapshot count = %d, want %d", got, want)
	}
	if got, want := clientSnapshot.Summary.RecordCount, 1; got != want {
		t.Fatalf("client RecordCount = %d, want %d", got, want)
	}
	if got, want := clientSnapshot.Summary.ActiveRecordCount, 1; got != want {
		t.Fatalf("client ActiveRecordCount = %d, want %d", got, want)
	}
	clientRecord := clientSnapshot.Records[0]
	if got := clientRecord.BatchFramesSent; got < 1 {
		t.Fatalf("client BatchFramesSent = %d, want >= 1", got)
	}
	if got := clientRecord.AckFramesReceived; got < 1 {
		t.Fatalf("client AckFramesReceived = %d, want >= 1", got)
	}
	if got := clientRecord.BarrierCount; got < 1 {
		t.Fatalf("client BarrierCount = %d, want >= 1", got)
	}
	if got := clientSnapshot.Summary.RecordTelemetry.FrameSendCount; got < 1 {
		t.Fatalf("client RecordTelemetry.FrameSendCount = %d, want >= 1", got)
	}

	// Server-side view.
	serverSnapshot, err := GetServerDiagnosticsSnapshot(server)
	if err != nil {
		t.Fatalf("GetServerDiagnosticsSnapshot failed: %v", err)
	}
	if got, want := len(serverSnapshot.Records), 1; got != want {
		t.Fatalf("server record snapshot count = %d, want %d", got, want)
	}
	if got, want := serverSnapshot.Summary.RecordCount, 1; got != want {
		t.Fatalf("server RecordCount = %d, want %d", got, want)
	}
	if got, want := serverSnapshot.Summary.ActiveRecordCount, 1; got != want {
		t.Fatalf("server ActiveRecordCount = %d, want %d", got, want)
	}
	serverRecord := serverSnapshot.Records[0]
	if got := serverRecord.BatchFramesReceived; got < 1 {
		t.Fatalf("server BatchFramesReceived = %d, want >= 1", got)
	}
	if got := serverRecord.AckFramesSent; got < 1 {
		t.Fatalf("server AckFramesSent = %d, want >= 1", got)
	}
	if got := serverSnapshot.Summary.RecordTelemetry.FrameReceiveCount; got < 1 {
		t.Fatalf("server RecordTelemetry.FrameReceiveCount = %d, want >= 1", got)
	}

	// Let the handler finish and make sure it did so without error.
	releaseHandler()
	select {
	case err := <-recordHandlerDone:
		if err != nil {
			t.Fatalf("record handler failed: %v", err)
		}
	case <-time.After(2 * time.Second):
		t.Fatal("timed out waiting record handler completion")
	}
}
// TestGetServerDiagnosticsSnapshotAggregatesStaleAndResetState builds a
// server-side logical connection over a net.Pipe, registers two streams and
// two bulks (one of each carrying errTransportDetached as its reset error),
// records one failed transfer, detaches the transport, and then checks the
// counts reported by the server diagnostics snapshot.
func TestGetServerDiagnosticsSnapshotAggregatesStaleAndResetState(t *testing.T) {
	server := NewServer().(*ServerCommon)

	serverSide, peerSide := net.Pipe()
	defer peerSide.Close()

	logical := server.bootstrapAcceptedLogical("diag-server-peer", nil, serverSide)
	if logical == nil {
		t.Fatal("bootstrapAcceptedLogical should return logical")
	}
	logical.markIdentityBound()
	logical.compatClientConn().markClientConnStreamTransport()

	transport := logical.CurrentTransportConn()
	if transport == nil {
		t.Fatal("CurrentTransportConn should return active transport")
	}
	scope := serverFileScope(logical)
	ctx := context.Background()

	// Streams: one left untouched, one given a reset error.
	staleStream := newStreamHandle(ctx, server.getStreamRuntime(), scope, StreamOpenRequest{
		StreamID: "diag-stream-stale",
		DataID:   1,
		Channel:  StreamDataChannel,
	}, 0, logical, transport, transport.TransportGeneration(), nil, nil, nil, defaultStreamConfig())
	if err := server.getStreamRuntime().register(scope, staleStream); err != nil {
		t.Fatalf("register stale stream failed: %v", err)
	}
	resetStream := newStreamHandle(ctx, server.getStreamRuntime(), scope, StreamOpenRequest{
		StreamID: "diag-stream-reset",
		DataID:   2,
		Channel:  StreamDataChannel,
	}, 0, logical, transport, transport.TransportGeneration(), nil, nil, nil, defaultStreamConfig())
	if err := server.getStreamRuntime().register(scope, resetStream); err != nil {
		t.Fatalf("register reset stream failed: %v", err)
	}
	resetStream.mu.Lock()
	resetStream.resetErr = errTransportDetached
	resetStream.mu.Unlock()

	// Bulks: same pattern; the reset one is additionally marked Dedicated.
	staleBulk := newBulkHandle(ctx, server.getBulkRuntime(), scope, BulkOpenRequest{
		BulkID:    "diag-bulk-stale",
		DataID:    3,
		Range:     BulkRange{Length: 16},
		ChunkSize: 32 * 1024,
	}, 0, logical, transport, transport.TransportGeneration(), nil, nil, nil, nil, nil)
	if err := server.getBulkRuntime().register(scope, staleBulk); err != nil {
		t.Fatalf("register stale bulk failed: %v", err)
	}
	resetBulk := newBulkHandle(ctx, server.getBulkRuntime(), scope, BulkOpenRequest{
		BulkID:    "diag-bulk-reset",
		DataID:    4,
		Dedicated: true,
		Range:     BulkRange{Length: 16},
		ChunkSize: 32 * 1024,
	}, 0, logical, transport, transport.TransportGeneration(), nil, nil, nil, nil, nil)
	if err := server.getBulkRuntime().register(scope, resetBulk); err != nil {
		t.Fatalf("register reset bulk failed: %v", err)
	}
	resetBulk.mu.Lock()
	resetBulk.resetErr = errTransportDetached
	resetBulk.mu.Unlock()

	// One receive transfer that is activated and then failed.
	const failedTransferID = "diag-transfer-failed"
	transfers := server.getTransferRuntime()
	transfers.ensureTransferDescriptor(fileTransferDirectionReceive, scope, scope, transport.TransportGeneration(), itransfer.Descriptor{
		ID:       failedTransferID,
		Channel:  itransfer.DataChannel,
		Size:     64,
		Checksum: "sum-server",
	})
	transfers.activate(fileTransferDirectionReceive, scope, failedTransferID)
	transfers.fail(fileTransferDirectionReceive, scope, failedTransferID, errors.New("boom"))

	// Detach the transport before taking the snapshot.
	logical.markTransportDetached("heartbeat timeout", nil)
	logical.detachServerOwnedTransport()

	snapshot, err := GetServerDiagnosticsSnapshot(server)
	if err != nil {
		t.Fatalf("GetServerDiagnosticsSnapshot failed: %v", err)
	}

	summary := snapshot.Summary
	expectations := []struct {
		label string
		got   int
		want  int
	}{
		{"logical snapshot count", len(snapshot.Logicals), 1},
		{"current transport snapshot count", len(snapshot.CurrentTransports), 0},
		{"DetachedClientCount", snapshot.Runtime.DetachedClientCount, 1},
		{"LogicalCount", summary.LogicalCount, 1},
		{"CurrentTransportCount", summary.CurrentTransportCount, 0},
		{"StreamCount", summary.StreamCount, 2},
		{"StaleStreamCount", summary.StaleStreamCount, 1},
		{"ResetStreamCount", summary.ResetStreamCount, 1},
		{"StreamResetCauses.Total", summary.StreamResetCauses.Total, 1},
		{"StreamResetCauses.TransportDetached", summary.StreamResetCauses.TransportDetached, 1},
		{"BulkCount", summary.BulkCount, 2},
		{"DedicatedBulkCount", summary.DedicatedBulkCount, 1},
		{"StaleBulkCount", summary.StaleBulkCount, 1},
		{"ResetBulkCount", summary.ResetBulkCount, 1},
		{"BulkResetCauses.Total", summary.BulkResetCauses.Total, 1},
		{"BulkResetCauses.TransportDetached", summary.BulkResetCauses.TransportDetached, 1},
		{"TransferCount", summary.TransferCount, 1},
		{"FailedTransferCount", summary.FailedTransferCount, 1},
	}
	for _, e := range expectations {
		if e.got != e.want {
			t.Fatalf("%s = %d, want %d", e.label, e.got, e.want)
		}
	}
}
func TestDiagnosticsSummaryClassifiesResetCauses(t *testing.T) {
summary := summarizeClientDiagnosticsSnapshot(ClientDiagnosticsSnapshot{
Streams: []StreamSnapshot{
{ResetError: errTransportDetached.Error()},
{ResetError: errServiceShutdown.Error()},
{ResetError: errStreamBackpressureExceeded.Error()},
{ResetError: "stream boom"},
},
Bulks: []BulkSnapshot{
{ResetError: errTransportDetached.Error()},
{ResetError: errServiceShutdown.Error()},
{ResetError: errBulkBackpressureExceeded.Error()},
{ResetError: "bulk boom"},
},
})
if got, want := summary.ResetStreamCount, 4; got != want {
t.Fatalf("ResetStreamCount = %d, want %d", got, want)
}
if got, want := summary.StreamResetCauses.Total, 4; got != want {
t.Fatalf("StreamResetCauses.Total = %d, want %d", got, want)
}
if got, want := summary.StreamResetCauses.TransportDetached, 1; got != want {
t.Fatalf("StreamResetCauses.TransportDetached = %d, want %d", got, want)
}
if got, want := summary.StreamResetCauses.ServiceShutdown, 1; got != want {
t.Fatalf("StreamResetCauses.ServiceShutdown = %d, want %d", got, want)
}
if got, want := summary.StreamResetCauses.Backpressure, 1; got != want {
t.Fatalf("StreamResetCauses.Backpressure = %d, want %d", got, want)
}
if got, want := summary.StreamResetCauses.Other, 1; got != want {
t.Fatalf("StreamResetCauses.Other = %d, want %d", got, want)
}
if got, want := summary.ResetBulkCount, 4; got != want {
t.Fatalf("ResetBulkCount = %d, want %d", got, want)
}
if got, want := summary.BulkResetCauses.Total, 4; got != want {
t.Fatalf("BulkResetCauses.Total = %d, want %d", got, want)
}
if got, want := summary.BulkResetCauses.TransportDetached, 1; got != want {
t.Fatalf("BulkResetCauses.TransportDetached = %d, want %d", got, want)
}
if got, want := summary.BulkResetCauses.ServiceShutdown, 1; got != want {
t.Fatalf("BulkResetCauses.ServiceShutdown = %d, want %d", got, want)
}
if got, want := summary.BulkResetCauses.Backpressure, 1; got != want {
t.Fatalf("BulkResetCauses.Backpressure = %d, want %d", got, want)
}
if got, want := summary.BulkResetCauses.Other, 1; got != want {
t.Fatalf("BulkResetCauses.Other = %d, want %d", got, want)
}
}
func TestDiagnosticsSummaryAggregatesRecordTelemetry(t *testing.T) {
summary := summarizeClientDiagnosticsSnapshot(ClientDiagnosticsSnapshot{
Records: []RecordSnapshot{
{
ID: "record-active",
BindingCurrent: true,
TransportAttached: true,
TransportCurrent: true,
OutstandingRecords: 3,
OutstandingBytes: 4096,
PendingApplyRecords: 2,
PendingAckRecords: 1,
PeakPendingApplyRecords: 5,
BatchFramesSent: 10,
AckFramesSent: 4,
ErrorFramesSent: 1,
BatchFramesReceived: 8,
AckFramesReceived: 3,
ErrorFramesReceived: 0,
PiggybackAckSent: 6,
PiggybackAckReceived: 2,
BarrierCount: 4,
BarrierFlushWaitDuration: 10 * time.Millisecond,
BarrierApplyWaitDuration: 30 * time.Millisecond,
},
{
ID: "record-reset",
ResetError: errTransportDetached.Error(),
OutstandingRecords: 1,
OutstandingBytes: 512,
PendingApplyRecords: 3,
PendingAckRecords: 2,
PeakPendingApplyRecords: 7,
BatchFramesSent: 2,
AckFramesSent: 1,
ErrorFramesSent: 1,
BatchFramesReceived: 1,
AckFramesReceived: 1,
ErrorFramesReceived: 1,
PiggybackAckSent: 1,
PiggybackAckReceived: 1,
BarrierCount: 1,
BarrierFlushWaitDuration: 5 * time.Millisecond,
BarrierApplyWaitDuration: 15 * time.Millisecond,
},
},
})
if got, want := summary.RecordCount, 2; got != want {
t.Fatalf("RecordCount = %d, want %d", got, want)
}
if got, want := summary.ActiveRecordCount, 1; got != want {
t.Fatalf("ActiveRecordCount = %d, want %d", got, want)
}
if got, want := summary.ResetRecordCount, 1; got != want {
t.Fatalf("ResetRecordCount = %d, want %d", got, want)
}
if got, want := summary.RecordResetCauses.Total, 1; got != want {
t.Fatalf("RecordResetCauses.Total = %d, want %d", got, want)
}
if got, want := summary.RecordResetCauses.TransportDetached, 1; got != want {
t.Fatalf("RecordResetCauses.TransportDetached = %d, want %d", got, want)
}
telemetry := summary.RecordTelemetry
if got, want := telemetry.BatchFramesSent, int64(12); got != want {
t.Fatalf("BatchFramesSent = %d, want %d", got, want)
}
if got, want := telemetry.AckFramesSent, int64(5); got != want {
t.Fatalf("AckFramesSent = %d, want %d", got, want)
}
if got, want := telemetry.ErrorFramesSent, int64(2); got != want {
t.Fatalf("ErrorFramesSent = %d, want %d", got, want)
}
if got, want := telemetry.BatchFramesReceived, int64(9); got != want {
t.Fatalf("BatchFramesReceived = %d, want %d", got, want)
}
if got, want := telemetry.AckFramesReceived, int64(4); got != want {
t.Fatalf("AckFramesReceived = %d, want %d", got, want)
}
if got, want := telemetry.ErrorFramesReceived, int64(1); got != want {
t.Fatalf("ErrorFramesReceived = %d, want %d", got, want)
}
if got, want := telemetry.FrameSendCount, int64(19); got != want {
t.Fatalf("FrameSendCount = %d, want %d", got, want)
}
if got, want := telemetry.FrameReceiveCount, int64(14); got != want {
t.Fatalf("FrameReceiveCount = %d, want %d", got, want)
}
if got, want := telemetry.PiggybackAckSent, int64(7); got != want {
t.Fatalf("PiggybackAckSent = %d, want %d", got, want)
}
if got, want := telemetry.PiggybackAckReceived, int64(3); got != want {
t.Fatalf("PiggybackAckReceived = %d, want %d", got, want)
}
if got, want := telemetry.BarrierCount, int64(5); got != want {
t.Fatalf("BarrierCount = %d, want %d", got, want)
}
if got, want := telemetry.BarrierFlushWaitDuration, 15*time.Millisecond; got != want {
t.Fatalf("BarrierFlushWaitDuration = %v, want %v", got, want)
}
if got, want := telemetry.BarrierApplyWaitDuration, 45*time.Millisecond; got != want {
t.Fatalf("BarrierApplyWaitDuration = %v, want %v", got, want)
}
if got, want := telemetry.OutstandingRecords, 4; got != want {
t.Fatalf("OutstandingRecords = %d, want %d", got, want)
}
if got, want := telemetry.OutstandingBytes, 4608; got != want {
t.Fatalf("OutstandingBytes = %d, want %d", got, want)
}
if got, want := telemetry.PendingApplyRecords, 5; got != want {
t.Fatalf("PendingApplyRecords = %d, want %d", got, want)
}
if got, want := telemetry.PendingAckRecords, 3; got != want {
t.Fatalf("PendingAckRecords = %d, want %d", got, want)
}
if got, want := telemetry.PeakPendingApplyRecords, 7; got != want {
t.Fatalf("PeakPendingApplyRecords = %d, want %d", got, want)
}
}
func TestDiagnosticsSummaryAggregatesTransferTelemetry(t *testing.T) {
summary := summarizeClientDiagnosticsSnapshot(ClientDiagnosticsSnapshot{
Transfers: []TransferSnapshot{
{
ID: "send-done",
State: TransferStateDone,
SentBytes: 2048,
SourceReadDuration: 200 * time.Millisecond,
StreamWriteDuration: 400 * time.Millisecond,
CommitWaitDuration: 100 * time.Millisecond,
},
{
ID: "recv-failed",
State: TransferStateFailed,
ReceivedBytes: 1024,
SinkWriteDuration: 250 * time.Millisecond,
SyncDuration: 50 * time.Millisecond,
VerifyDuration: 25 * time.Millisecond,
CommitDuration: 75 * time.Millisecond,
},
},
})
if got, want := summary.TransferCount, 2; got != want {
t.Fatalf("TransferCount = %d, want %d", got, want)
}
if got, want := summary.DoneTransferCount, 1; got != want {
t.Fatalf("DoneTransferCount = %d, want %d", got, want)
}
if got, want := summary.FailedTransferCount, 1; got != want {
t.Fatalf("FailedTransferCount = %d, want %d", got, want)
}
telemetry := summary.TransferTelemetry
if got, want := telemetry.SourceReadBytes, int64(2048); got != want {
t.Fatalf("SourceReadBytes = %d, want %d", got, want)
}
if got, want := telemetry.StreamWriteBytes, int64(2048); got != want {
t.Fatalf("StreamWriteBytes = %d, want %d", got, want)
}
if got, want := telemetry.SinkWriteBytes, int64(1024); got != want {
t.Fatalf("SinkWriteBytes = %d, want %d", got, want)
}
if got, want := telemetry.SourceReadDuration, 200*time.Millisecond; got != want {
t.Fatalf("SourceReadDuration = %v, want %v", got, want)
}
if got, want := telemetry.StreamWriteDuration, 400*time.Millisecond; got != want {
t.Fatalf("StreamWriteDuration = %v, want %v", got, want)
}
if got, want := telemetry.SinkWriteDuration, 250*time.Millisecond; got != want {
t.Fatalf("SinkWriteDuration = %v, want %v", got, want)
}
if got, want := telemetry.SyncDuration, 50*time.Millisecond; got != want {
t.Fatalf("SyncDuration = %v, want %v", got, want)
}
if got, want := telemetry.VerifyDuration, 25*time.Millisecond; got != want {
t.Fatalf("VerifyDuration = %v, want %v", got, want)
}
if got, want := telemetry.CommitDuration, 75*time.Millisecond; got != want {
t.Fatalf("CommitDuration = %v, want %v", got, want)
}
if got, want := telemetry.CommitWaitDuration, 100*time.Millisecond; got != want {
t.Fatalf("CommitWaitDuration = %v, want %v", got, want)
}
if got, want := telemetry.WorkDuration, time.Second; got != want {
t.Fatalf("WorkDuration = %v, want %v", got, want)
}
if got, want := telemetry.ObservedDuration, 1100*time.Millisecond; got != want {
t.Fatalf("ObservedDuration = %v, want %v", got, want)
}
if got, want := telemetry.SourceReadThroughputBPS, 10240.0; math.Abs(got-want) > 0.001 {
t.Fatalf("SourceReadThroughputBPS = %f, want %f", got, want)
}
if got, want := telemetry.StreamWriteThroughputBPS, 5120.0; math.Abs(got-want) > 0.001 {
t.Fatalf("StreamWriteThroughputBPS = %f, want %f", got, want)
}
if got, want := telemetry.SinkWriteThroughputBPS, 4096.0; math.Abs(got-want) > 0.001 {
t.Fatalf("SinkWriteThroughputBPS = %f, want %f", got, want)
}
if got, want := telemetry.CommitWaitRatio, 1.0/11.0; math.Abs(got-want) > 0.000001 {
t.Fatalf("CommitWaitRatio = %f, want %f", got, want)
}
}
// TestGetDiagnosticsSnapshotRejectsNil checks that both snapshot entry points
// return their dedicated sentinel error when called with nil.
func TestGetDiagnosticsSnapshotRejectsNil(t *testing.T) {
	_, clientErr := GetClientDiagnosticsSnapshot(nil)
	if !errors.Is(clientErr, errClientDiagnosticsSnapshotNil) {
		t.Fatalf("GetClientDiagnosticsSnapshot nil error = %v, want %v", clientErr, errClientDiagnosticsSnapshotNil)
	}

	_, serverErr := GetServerDiagnosticsSnapshot(nil)
	if !errors.Is(serverErr, errServerDiagnosticsSnapshotNil) {
		t.Fatalf("GetServerDiagnosticsSnapshot nil error = %v, want %v", serverErr, errServerDiagnosticsSnapshotNil)
	}
}