package notify import ( "context" "errors" "math" "net" "testing" "time" itransfer "b612.me/notify/internal/transfer" ) func TestGetClientDiagnosticsSnapshotDefaults(t *testing.T) { client := NewClient() snapshot, err := GetClientDiagnosticsSnapshot(client) if err != nil { t.Fatalf("GetClientDiagnosticsSnapshot failed: %v", err) } if got, want := snapshot.Runtime.OwnerState, "idle"; got != want { t.Fatalf("Runtime.OwnerState = %q, want %q", got, want) } if len(snapshot.Streams) != 0 || len(snapshot.Bulks) != 0 || len(snapshot.Records) != 0 || len(snapshot.Transfers) != 0 { t.Fatalf("default diagnostics should be empty: %+v", snapshot) } if snapshot.Summary != (DiagnosticsSummary{}) { t.Fatalf("default summary mismatch: %+v", snapshot.Summary) } } func TestGetClientDiagnosticsSnapshotAggregatesActiveState(t *testing.T) { server := NewServer().(*ServerCommon) if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKServer failed: %v", err) } streamAcceptCh := make(chan StreamAcceptInfo, 1) bulkAcceptCh := make(chan BulkAcceptInfo, 1) server.SetStreamHandler(func(info StreamAcceptInfo) error { streamAcceptCh <- info return nil }) server.SetBulkHandler(func(info BulkAcceptInfo) error { bulkAcceptCh <- info return nil }) if err := server.Listen("tcp", "127.0.0.1:0"); err != nil { t.Fatalf("server Listen failed: %v", err) } defer func() { _ = server.Stop() }() client := NewClient().(*ClientCommon) if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKClient failed: %v", err) } if err := client.Connect("tcp", server.listener.Addr().String()); err != nil { t.Fatalf("client Connect failed: %v", err) } defer func() { _ = client.Stop() }() stream, err := client.OpenStream(context.Background(), StreamOpenOptions{ ID: "diag-client-stream", Channel: StreamDataChannel, }) if err != nil { t.Fatalf("client OpenStream failed: %v", err) } waitAcceptedStream(t, streamAcceptCh, 2*time.Second) bulk, err := client.OpenBulk(context.Background(), BulkOpenOptions{ ID: "diag-client-bulk", Range: BulkRange{ Length: 64, }, ChunkSize: 16 * 1024, }) if err != nil { t.Fatalf("client OpenBulk failed: %v", err) } waitAcceptedBulk(t, bulkAcceptCh, 2*time.Second) transferRuntime := client.getTransferRuntime() transferRuntime.ensureTransferDescriptor(fileTransferDirectionSend, clientFileScope(), clientFileScope(), 0, itransfer.Descriptor{ ID: "diag-client-transfer-done", Channel: itransfer.DataChannel, Size: 32, Checksum: "sum-client", }) transferRuntime.activate(fileTransferDirectionSend, clientFileScope(), "diag-client-transfer-done") transferRuntime.complete(fileTransferDirectionSend, clientFileScope(), "diag-client-transfer-done") snapshot, err := GetClientDiagnosticsSnapshot(client) if err != nil { t.Fatalf("GetClientDiagnosticsSnapshot failed: %v", err) } if got, want := snapshot.Summary.LogicalCount, 1; got != want { t.Fatalf("LogicalCount = %d, want %d", got, want) } if got, want := snapshot.Summary.CurrentTransportCount, 1; got != want { t.Fatalf("CurrentTransportCount = %d, want %d", got, want) } if got, want := snapshot.Summary.StreamCount, 1; got != want { t.Fatalf("StreamCount = %d, want %d", got, want) } if got, want := snapshot.Summary.ActiveStreamCount, 1; got != want { t.Fatalf("ActiveStreamCount = %d, want %d", got, want) } if got, want := snapshot.Summary.BulkCount, 1; got != want { t.Fatalf("BulkCount = %d, want %d", got, want) } if got, want := snapshot.Summary.ActiveBulkCount, 1; got != want { t.Fatalf("ActiveBulkCount = %d, want %d", got, want) } if got, want := snapshot.Summary.TransferCount, 1; got != want { t.Fatalf("TransferCount = %d, want %d", got, want) } if got, want := snapshot.Summary.DoneTransferCount, 1; got != want { t.Fatalf("DoneTransferCount = %d, want %d", got, want) } if got := snapshot.Summary.StaleStreamCount + snapshot.Summary.ResetStreamCount + snapshot.Summary.StaleBulkCount + snapshot.Summary.ResetBulkCount + snapshot.Summary.FailedTransferCount; got != 0 { t.Fatalf("unexpected unhealthy counters in active snapshot: %+v", snapshot.Summary) } _ = stream.Close() _ = bulk.Close() } func TestGetDiagnosticsSnapshotAggregatesActiveRecordState(t *testing.T) { server := NewServer().(*ServerCommon) if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKServer failed: %v", err) } recordAcceptCh := make(chan RecordAcceptInfo, 1) recordReleaseCh := make(chan struct{}) recordHandlerDone := make(chan error, 1) server.SetRecordStreamHandler(func(info RecordAcceptInfo) error { recordAcceptCh <- info msg, err := info.RecordStream.ReadRecord(context.Background()) if err != nil { recordHandlerDone <- err return err } if string(msg.Payload) != "diag-record" { err = errors.New("unexpected record payload") recordHandlerDone <- err return err } if err := info.RecordStream.AckRecord(msg.Seq); err != nil { recordHandlerDone <- err return err } <-recordReleaseCh err = info.RecordStream.Close() recordHandlerDone <- err return err }) if err := server.Listen("tcp", "127.0.0.1:0"); err != nil { t.Fatalf("server Listen failed: %v", err) } defer func() { _ = server.Stop() }() client := NewClient().(*ClientCommon) if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKClient failed: %v", err) } if err := client.Connect("tcp", server.listener.Addr().String()); err != nil { t.Fatalf("client Connect failed: %v", err) } defer func() { _ = client.Stop() }() record, err := client.OpenRecordStream(context.Background(), RecordOpenOptions{ Stream: StreamOpenOptions{ID: "diag-client-record"}, }) if err != nil { t.Fatalf("client OpenRecordStream failed: %v", err) } select { case <-recordAcceptCh: case <-time.After(2 * time.Second): t.Fatal("timed out waiting accepted record stream") } if _, err := record.WriteRecord(context.Background(), []byte("diag-record")); err != nil { t.Fatalf("WriteRecord failed: %v", err) } if _, err := record.Barrier(context.Background()); err != nil { t.Fatalf("Barrier failed: %v", err) } clientSnapshot, err := GetClientDiagnosticsSnapshot(client) if err != nil { t.Fatalf("GetClientDiagnosticsSnapshot failed: %v", err) } if got, want := len(clientSnapshot.Records), 1; got != want { t.Fatalf("client record snapshot count = %d, want %d", got, want) } if got, want := clientSnapshot.Summary.RecordCount, 1; got != want { t.Fatalf("client RecordCount = %d, want %d", got, want) } if got, want := clientSnapshot.Summary.ActiveRecordCount, 1; got != want { t.Fatalf("client ActiveRecordCount = %d, want %d", got, want) } clientRecord := clientSnapshot.Records[0] if got := clientRecord.BatchFramesSent; got < 1 { t.Fatalf("client BatchFramesSent = %d, want >= 1", got) } if got := clientRecord.AckFramesReceived; got < 1 { t.Fatalf("client AckFramesReceived = %d, want >= 1", got) } if got := clientRecord.BarrierCount; got < 1 { t.Fatalf("client BarrierCount = %d, want >= 1", got) } if got := clientSnapshot.Summary.RecordTelemetry.FrameSendCount; got < 1 { t.Fatalf("client RecordTelemetry.FrameSendCount = %d, want >= 1", got) } serverSnapshot, err := GetServerDiagnosticsSnapshot(server) if err != nil { t.Fatalf("GetServerDiagnosticsSnapshot failed: %v", err) } if got, want := len(serverSnapshot.Records), 1; got != want { t.Fatalf("server record snapshot count = %d, want %d", got, want) } if got, want := serverSnapshot.Summary.RecordCount, 1; got != want { t.Fatalf("server RecordCount = %d, want %d", got, want) } if got, want := serverSnapshot.Summary.ActiveRecordCount, 1; got != want { t.Fatalf("server ActiveRecordCount = %d, want %d", got, want) } serverRecord := serverSnapshot.Records[0] if got := serverRecord.BatchFramesReceived; got < 1 { t.Fatalf("server BatchFramesReceived = %d, want >= 1", got) } if got := serverRecord.AckFramesSent; got < 1 { t.Fatalf("server AckFramesSent = %d, want >= 1", got) } if got := serverSnapshot.Summary.RecordTelemetry.FrameReceiveCount; got < 1 { t.Fatalf("server RecordTelemetry.FrameReceiveCount = %d, want >= 1", got) } close(recordReleaseCh) select { case err := <-recordHandlerDone: if err != nil { t.Fatalf("record handler failed: %v", err) } case <-time.After(2 * time.Second): t.Fatal("timed out waiting record handler completion") } _ = record.Close() } func TestGetServerDiagnosticsSnapshotAggregatesStaleAndResetState(t *testing.T) { server := NewServer().(*ServerCommon) left, right := net.Pipe() defer right.Close() logical := server.bootstrapAcceptedLogical("diag-server-peer", nil, left) if logical == nil { t.Fatal("bootstrapAcceptedLogical should return logical") } logical.markIdentityBound() logical.compatClientConn().markClientConnStreamTransport() transport := logical.CurrentTransportConn() if transport == nil { t.Fatal("CurrentTransportConn should return active transport") } scope := serverFileScope(logical) streamStale := newStreamHandle(context.Background(), server.getStreamRuntime(), scope, StreamOpenRequest{ StreamID: "diag-stream-stale", DataID: 1, Channel: StreamDataChannel, }, 0, logical, transport, transport.TransportGeneration(), nil, nil, nil, defaultStreamConfig()) if err := server.getStreamRuntime().register(scope, streamStale); err != nil { t.Fatalf("register stale stream failed: %v", err) } streamReset := newStreamHandle(context.Background(), server.getStreamRuntime(), scope, StreamOpenRequest{ StreamID: "diag-stream-reset", DataID: 2, Channel: StreamDataChannel, }, 0, logical, transport, transport.TransportGeneration(), nil, nil, nil, defaultStreamConfig()) if err := server.getStreamRuntime().register(scope, streamReset); err != nil { t.Fatalf("register reset stream failed: %v", err) } streamReset.mu.Lock() streamReset.resetErr = errTransportDetached streamReset.mu.Unlock() bulkStale := newBulkHandle(context.Background(), server.getBulkRuntime(), scope, BulkOpenRequest{ BulkID: "diag-bulk-stale", DataID: 3, Range: BulkRange{ Length: 16, }, ChunkSize: 32 * 1024, }, 0, logical, transport, transport.TransportGeneration(), nil, nil, nil, nil, nil) if err := server.getBulkRuntime().register(scope, bulkStale); err != nil { t.Fatalf("register stale bulk failed: %v", err) } bulkReset := newBulkHandle(context.Background(), server.getBulkRuntime(), scope, BulkOpenRequest{ BulkID: "diag-bulk-reset", DataID: 4, Dedicated: true, Range: BulkRange{ Length: 16, }, ChunkSize: 32 * 1024, }, 0, logical, transport, transport.TransportGeneration(), nil, nil, nil, nil, nil) if err := server.getBulkRuntime().register(scope, bulkReset); err != nil { t.Fatalf("register reset bulk failed: %v", err) } bulkReset.mu.Lock() bulkReset.resetErr = errTransportDetached bulkReset.mu.Unlock() transferRuntime := server.getTransferRuntime() transferRuntime.ensureTransferDescriptor(fileTransferDirectionReceive, scope, scope, transport.TransportGeneration(), itransfer.Descriptor{ ID: "diag-transfer-failed", Channel: itransfer.DataChannel, Size: 64, Checksum: "sum-server", }) transferRuntime.activate(fileTransferDirectionReceive, scope, "diag-transfer-failed") transferRuntime.fail(fileTransferDirectionReceive, scope, "diag-transfer-failed", errors.New("boom")) logical.markTransportDetached("heartbeat timeout", nil) logical.detachServerOwnedTransport() snapshot, err := GetServerDiagnosticsSnapshot(server) if err != nil { t.Fatalf("GetServerDiagnosticsSnapshot failed: %v", err) } if got, want := len(snapshot.Logicals), 1; got != want { t.Fatalf("logical snapshot count = %d, want %d", got, want) } if got, want := len(snapshot.CurrentTransports), 0; got != want { t.Fatalf("current transport snapshot count = %d, want %d", got, want) } if got, want := snapshot.Runtime.DetachedClientCount, 1; got != want { t.Fatalf("DetachedClientCount = %d, want %d", got, want) } if got, want := snapshot.Summary.LogicalCount, 1; got != want { t.Fatalf("LogicalCount = %d, want %d", got, want) } if got, want := snapshot.Summary.CurrentTransportCount, 0; got != want { t.Fatalf("CurrentTransportCount = %d, want %d", got, want) } if got, want := snapshot.Summary.StreamCount, 2; got != want { t.Fatalf("StreamCount = %d, want %d", got, want) } if got, want := snapshot.Summary.StaleStreamCount, 1; got != want { t.Fatalf("StaleStreamCount = %d, want %d", got, want) } if got, want := snapshot.Summary.ResetStreamCount, 1; got != want { t.Fatalf("ResetStreamCount = %d, want %d", got, want) } if got, want := snapshot.Summary.StreamResetCauses.Total, 1; got != want { t.Fatalf("StreamResetCauses.Total = %d, want %d", got, want) } if got, want := snapshot.Summary.StreamResetCauses.TransportDetached, 1; got != want { t.Fatalf("StreamResetCauses.TransportDetached = %d, want %d", got, want) } if got, want := snapshot.Summary.BulkCount, 2; got != want { t.Fatalf("BulkCount = %d, want %d", got, want) } if got, want := snapshot.Summary.DedicatedBulkCount, 1; got != want { t.Fatalf("DedicatedBulkCount = %d, want %d", got, want) } if got, want := snapshot.Summary.StaleBulkCount, 1; got != want { t.Fatalf("StaleBulkCount = %d, want %d", got, want) } if got, want := snapshot.Summary.ResetBulkCount, 1; got != want { t.Fatalf("ResetBulkCount = %d, want %d", got, want) } if got, want := snapshot.Summary.BulkResetCauses.Total, 1; got != want { t.Fatalf("BulkResetCauses.Total = %d, want %d", got, want) } if got, want := snapshot.Summary.BulkResetCauses.TransportDetached, 1; got != want { t.Fatalf("BulkResetCauses.TransportDetached = %d, want %d", got, want) } if got, want := snapshot.Summary.TransferCount, 1; got != want { t.Fatalf("TransferCount = %d, want %d", got, want) } if got, want := snapshot.Summary.FailedTransferCount, 1; got != want { t.Fatalf("FailedTransferCount = %d, want %d", got, want) } } func TestDiagnosticsSummaryClassifiesResetCauses(t *testing.T) { summary := summarizeClientDiagnosticsSnapshot(ClientDiagnosticsSnapshot{ Streams: []StreamSnapshot{ {ResetError: errTransportDetached.Error()}, {ResetError: errServiceShutdown.Error()}, {ResetError: errStreamBackpressureExceeded.Error()}, {ResetError: "stream boom"}, }, Bulks: []BulkSnapshot{ {ResetError: errTransportDetached.Error()}, {ResetError: errServiceShutdown.Error()}, {ResetError: errBulkBackpressureExceeded.Error()}, {ResetError: "bulk boom"}, }, }) if got, want := summary.ResetStreamCount, 4; got != want { t.Fatalf("ResetStreamCount = %d, want %d", got, want) } if got, want := summary.StreamResetCauses.Total, 4; got != want { t.Fatalf("StreamResetCauses.Total = %d, want %d", got, want) } if got, want := summary.StreamResetCauses.TransportDetached, 1; got != want { t.Fatalf("StreamResetCauses.TransportDetached = %d, want %d", got, want) } if got, want := summary.StreamResetCauses.ServiceShutdown, 1; got != want { t.Fatalf("StreamResetCauses.ServiceShutdown = %d, want %d", got, want) } if got, want := summary.StreamResetCauses.Backpressure, 1; got != want { t.Fatalf("StreamResetCauses.Backpressure = %d, want %d", got, want) } if got, want := summary.StreamResetCauses.Other, 1; got != want { t.Fatalf("StreamResetCauses.Other = %d, want %d", got, want) } if got, want := summary.ResetBulkCount, 4; got != want { t.Fatalf("ResetBulkCount = %d, want %d", got, want) } if got, want := summary.BulkResetCauses.Total, 4; got != want { t.Fatalf("BulkResetCauses.Total = %d, want %d", got, want) } if got, want := summary.BulkResetCauses.TransportDetached, 1; got != want { t.Fatalf("BulkResetCauses.TransportDetached = %d, want %d", got, want) } if got, want := summary.BulkResetCauses.ServiceShutdown, 1; got != want { t.Fatalf("BulkResetCauses.ServiceShutdown = %d, want %d", got, want) } if got, want := summary.BulkResetCauses.Backpressure, 1; got != want { t.Fatalf("BulkResetCauses.Backpressure = %d, want %d", got, want) } if got, want := summary.BulkResetCauses.Other, 1; got != want { t.Fatalf("BulkResetCauses.Other = %d, want %d", got, want) } } func TestDiagnosticsSummaryAggregatesRecordTelemetry(t *testing.T) { summary := summarizeClientDiagnosticsSnapshot(ClientDiagnosticsSnapshot{ Records: []RecordSnapshot{ { ID: "record-active", BindingCurrent: true, TransportAttached: true, TransportCurrent: true, OutstandingRecords: 3, OutstandingBytes: 4096, PendingApplyRecords: 2, PendingAckRecords: 1, PeakPendingApplyRecords: 5, BatchFramesSent: 10, AckFramesSent: 4, ErrorFramesSent: 1, BatchFramesReceived: 8, AckFramesReceived: 3, ErrorFramesReceived: 0, PiggybackAckSent: 6, PiggybackAckReceived: 2, BarrierCount: 4, BarrierFlushWaitDuration: 10 * time.Millisecond, BarrierApplyWaitDuration: 30 * time.Millisecond, }, { ID: "record-reset", ResetError: errTransportDetached.Error(), OutstandingRecords: 1, OutstandingBytes: 512, PendingApplyRecords: 3, PendingAckRecords: 2, PeakPendingApplyRecords: 7, BatchFramesSent: 2, AckFramesSent: 1, ErrorFramesSent: 1, BatchFramesReceived: 1, AckFramesReceived: 1, ErrorFramesReceived: 1, PiggybackAckSent: 1, PiggybackAckReceived: 1, BarrierCount: 1, BarrierFlushWaitDuration: 5 * time.Millisecond, BarrierApplyWaitDuration: 15 * time.Millisecond, }, }, }) if got, want := summary.RecordCount, 2; got != want { t.Fatalf("RecordCount = %d, want %d", got, want) } if got, want := summary.ActiveRecordCount, 1; got != want { t.Fatalf("ActiveRecordCount = %d, want %d", got, want) } if got, want := summary.ResetRecordCount, 1; got != want { t.Fatalf("ResetRecordCount = %d, want %d", got, want) } if got, want := summary.RecordResetCauses.Total, 1; got != want { t.Fatalf("RecordResetCauses.Total = %d, want %d", got, want) } if got, want := summary.RecordResetCauses.TransportDetached, 1; got != want { t.Fatalf("RecordResetCauses.TransportDetached = %d, want %d", got, want) } telemetry := summary.RecordTelemetry if got, want := telemetry.BatchFramesSent, int64(12); got != want { t.Fatalf("BatchFramesSent = %d, want %d", got, want) } if got, want := telemetry.AckFramesSent, int64(5); got != want { t.Fatalf("AckFramesSent = %d, want %d", got, want) } if got, want := telemetry.ErrorFramesSent, int64(2); got != want { t.Fatalf("ErrorFramesSent = %d, want %d", got, want) } if got, want := telemetry.BatchFramesReceived, int64(9); got != want { t.Fatalf("BatchFramesReceived = %d, want %d", got, want) } if got, want := telemetry.AckFramesReceived, int64(4); got != want { t.Fatalf("AckFramesReceived = %d, want %d", got, want) } if got, want := telemetry.ErrorFramesReceived, int64(1); got != want { t.Fatalf("ErrorFramesReceived = %d, want %d", got, want) } if got, want := telemetry.FrameSendCount, int64(19); got != want { t.Fatalf("FrameSendCount = %d, want %d", got, want) } if got, want := telemetry.FrameReceiveCount, int64(14); got != want { t.Fatalf("FrameReceiveCount = %d, want %d", got, want) } if got, want := telemetry.PiggybackAckSent, int64(7); got != want { t.Fatalf("PiggybackAckSent = %d, want %d", got, want) } if got, want := telemetry.PiggybackAckReceived, int64(3); got != want { t.Fatalf("PiggybackAckReceived = %d, want %d", got, want) } if got, want := telemetry.BarrierCount, int64(5); got != want { t.Fatalf("BarrierCount = %d, want %d", got, want) } if got, want := telemetry.BarrierFlushWaitDuration, 15*time.Millisecond; got != want { t.Fatalf("BarrierFlushWaitDuration = %v, want %v", got, want) } if got, want := telemetry.BarrierApplyWaitDuration, 45*time.Millisecond; got != want { t.Fatalf("BarrierApplyWaitDuration = %v, want %v", got, want) } if got, want := telemetry.OutstandingRecords, 4; got != want { t.Fatalf("OutstandingRecords = %d, want %d", got, want) } if got, want := telemetry.OutstandingBytes, 4608; got != want { t.Fatalf("OutstandingBytes = %d, want %d", got, want) } if got, want := telemetry.PendingApplyRecords, 5; got != want { t.Fatalf("PendingApplyRecords = %d, want %d", got, want) } if got, want := telemetry.PendingAckRecords, 3; got != want { t.Fatalf("PendingAckRecords = %d, want %d", got, want) } if got, want := telemetry.PeakPendingApplyRecords, 7; got != want { t.Fatalf("PeakPendingApplyRecords = %d, want %d", got, want) } } func TestDiagnosticsSummaryAggregatesTransferTelemetry(t *testing.T) { summary := summarizeClientDiagnosticsSnapshot(ClientDiagnosticsSnapshot{ Transfers: []TransferSnapshot{ { ID: "send-done", State: TransferStateDone, SentBytes: 2048, SourceReadDuration: 200 * time.Millisecond, StreamWriteDuration: 400 * time.Millisecond, CommitWaitDuration: 100 * time.Millisecond, }, { ID: "recv-failed", State: TransferStateFailed, ReceivedBytes: 1024, SinkWriteDuration: 250 * time.Millisecond, SyncDuration: 50 * time.Millisecond, VerifyDuration: 25 * time.Millisecond, CommitDuration: 75 * time.Millisecond, }, }, }) if got, want := summary.TransferCount, 2; got != want { t.Fatalf("TransferCount = %d, want %d", got, want) } if got, want := summary.DoneTransferCount, 1; got != want { t.Fatalf("DoneTransferCount = %d, want %d", got, want) } if got, want := summary.FailedTransferCount, 1; got != want { t.Fatalf("FailedTransferCount = %d, want %d", got, want) } telemetry := summary.TransferTelemetry if got, want := telemetry.SourceReadBytes, int64(2048); got != want { t.Fatalf("SourceReadBytes = %d, want %d", got, want) } if got, want := telemetry.StreamWriteBytes, int64(2048); got != want { t.Fatalf("StreamWriteBytes = %d, want %d", got, want) } if got, want := telemetry.SinkWriteBytes, int64(1024); got != want { t.Fatalf("SinkWriteBytes = %d, want %d", got, want) } if got, want := telemetry.SourceReadDuration, 200*time.Millisecond; got != want { t.Fatalf("SourceReadDuration = %v, want %v", got, want) } if got, want := telemetry.StreamWriteDuration, 400*time.Millisecond; got != want { t.Fatalf("StreamWriteDuration = %v, want %v", got, want) } if got, want := telemetry.SinkWriteDuration, 250*time.Millisecond; got != want { t.Fatalf("SinkWriteDuration = %v, want %v", got, want) } if got, want := telemetry.SyncDuration, 50*time.Millisecond; got != want { t.Fatalf("SyncDuration = %v, want %v", got, want) } if got, want := telemetry.VerifyDuration, 25*time.Millisecond; got != want { t.Fatalf("VerifyDuration = %v, want %v", got, want) } if got, want := telemetry.CommitDuration, 75*time.Millisecond; got != want { t.Fatalf("CommitDuration = %v, want %v", got, want) } if got, want := telemetry.CommitWaitDuration, 100*time.Millisecond; got != want { t.Fatalf("CommitWaitDuration = %v, want %v", got, want) } if got, want := telemetry.WorkDuration, time.Second; got != want { t.Fatalf("WorkDuration = %v, want %v", got, want) } if got, want := telemetry.ObservedDuration, 1100*time.Millisecond; got != want { t.Fatalf("ObservedDuration = %v, want %v", got, want) } if got, want := telemetry.SourceReadThroughputBPS, 10240.0; math.Abs(got-want) > 0.001 { t.Fatalf("SourceReadThroughputBPS = %f, want %f", got, want) } if got, want := telemetry.StreamWriteThroughputBPS, 5120.0; math.Abs(got-want) > 0.001 { t.Fatalf("StreamWriteThroughputBPS = %f, want %f", got, want) } if got, want := telemetry.SinkWriteThroughputBPS, 4096.0; math.Abs(got-want) > 0.001 { t.Fatalf("SinkWriteThroughputBPS = %f, want %f", got, want) } if got, want := telemetry.CommitWaitRatio, 1.0/11.0; math.Abs(got-want) > 0.000001 { t.Fatalf("CommitWaitRatio = %f, want %f", got, want) } } func TestGetDiagnosticsSnapshotRejectsNil(t *testing.T) { if _, err := GetClientDiagnosticsSnapshot(nil); !errors.Is(err, errClientDiagnosticsSnapshotNil) { t.Fatalf("GetClientDiagnosticsSnapshot nil error = %v, want %v", err, errClientDiagnosticsSnapshotNil) } if _, err := GetServerDiagnosticsSnapshot(nil); !errors.Is(err, errServerDiagnosticsSnapshotNil) { t.Fatalf("GetServerDiagnosticsSnapshot nil error = %v, want %v", err, errServerDiagnosticsSnapshotNil) } }