package notify

import (
	"context"
	"errors"
	"math"
	"net"
	"testing"
	"time"

	itransfer "b612.me/notify/internal/transfer"
)

// TestGetClientDiagnosticsSnapshotDefaults checks the snapshot of a freshly
// constructed client: the owner state is "idle", the stream, bulk and transfer
// lists are empty, and the summary equals its zero value.
func TestGetClientDiagnosticsSnapshotDefaults(t *testing.T) {
	snap, err := GetClientDiagnosticsSnapshot(NewClient())
	if err != nil {
		t.Fatalf("GetClientDiagnosticsSnapshot failed: %v", err)
	}

	if state := snap.Runtime.OwnerState; state != "idle" {
		t.Fatalf("Runtime.OwnerState = %q, want %q", state, "idle")
	}

	// Lengths are never negative, so a zero sum means every list is empty.
	if entries := len(snap.Streams) + len(snap.Bulks) + len(snap.Transfers); entries != 0 {
		t.Fatalf("default diagnostics should be empty: %+v", snap)
	}

	var zeroSummary DiagnosticsSummary
	if snap.Summary != zeroSummary {
		t.Fatalf("default summary mismatch: %+v", snap.Summary)
	}
}

// TestGetClientDiagnosticsSnapshotAggregatesActiveState connects a client to a
// local server, opens one stream and one bulk, records one completed transfer,
// and then checks that the client summary counts each of them exactly once
// while every stale/reset/failed counter stays at zero.
func TestGetClientDiagnosticsSnapshotAggregatesActiveState(t *testing.T) {
	srv := NewServer().(*ServerCommon)
	if err := UseModernPSKServer(srv, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKServer failed: %v", err)
	}

	// Buffered so the server-side handlers never block on the test goroutine.
	acceptedStreams := make(chan StreamAcceptInfo, 1)
	acceptedBulks := make(chan BulkAcceptInfo, 1)
	srv.SetStreamHandler(func(info StreamAcceptInfo) error {
		acceptedStreams <- info
		return nil
	})
	srv.SetBulkHandler(func(info BulkAcceptInfo) error {
		acceptedBulks <- info
		return nil
	})

	if err := srv.Listen("tcp", "127.0.0.1:0"); err != nil {
		t.Fatalf("server Listen failed: %v", err)
	}
	defer func() {
		_ = srv.Stop()
	}()

	cli := NewClient().(*ClientCommon)
	if err := UseModernPSKClient(cli, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKClient failed: %v", err)
	}
	if err := cli.Connect("tcp", srv.listener.Addr().String()); err != nil {
		t.Fatalf("client Connect failed: %v", err)
	}
	defer func() {
		_ = cli.Stop()
	}()

	openedStream, err := cli.OpenStream(context.Background(), StreamOpenOptions{
		ID:      "diag-client-stream",
		Channel: StreamDataChannel,
	})
	if err != nil {
		t.Fatalf("client OpenStream failed: %v", err)
	}
	waitAcceptedStream(t, acceptedStreams, 2*time.Second)

	openedBulk, err := cli.OpenBulk(context.Background(), BulkOpenOptions{
		ID: "diag-client-bulk",
		Range: BulkRange{
			Length: 64,
		},
		ChunkSize: 16 * 1024,
	})
	if err != nil {
		t.Fatalf("client OpenBulk failed: %v", err)
	}
	waitAcceptedBulk(t, acceptedBulks, 2*time.Second)

	// Drive a send transfer through ensure -> activate -> complete directly on
	// the runtime so the snapshot contains one finished transfer.
	transfers := cli.getTransferRuntime()
	transfers.ensureTransferDescriptor(fileTransferDirectionSend, clientFileScope(), clientFileScope(), 0, itransfer.Descriptor{
		ID:       "diag-client-transfer-done",
		Channel:  itransfer.DataChannel,
		Size:     32,
		Checksum: "sum-client",
	})
	transfers.activate(fileTransferDirectionSend, clientFileScope(), "diag-client-transfer-done")
	transfers.complete(fileTransferDirectionSend, clientFileScope(), "diag-client-transfer-done")

	snap, err := GetClientDiagnosticsSnapshot(cli)
	if err != nil {
		t.Fatalf("GetClientDiagnosticsSnapshot failed: %v", err)
	}

	summary := snap.Summary
	// Checked in order; the first mismatch aborts the test.
	expectations := []struct {
		name      string
		got, want int
	}{
		{"LogicalCount", summary.LogicalCount, 1},
		{"CurrentTransportCount", summary.CurrentTransportCount, 1},
		{"StreamCount", summary.StreamCount, 1},
		{"ActiveStreamCount", summary.ActiveStreamCount, 1},
		{"BulkCount", summary.BulkCount, 1},
		{"ActiveBulkCount", summary.ActiveBulkCount, 1},
		{"TransferCount", summary.TransferCount, 1},
		{"DoneTransferCount", summary.DoneTransferCount, 1},
	}
	for _, e := range expectations {
		if e.got != e.want {
			t.Fatalf("%s = %d, want %d", e.name, e.got, e.want)
		}
	}

	unhealthy := summary.StaleStreamCount +
		summary.ResetStreamCount +
		summary.StaleBulkCount +
		summary.ResetBulkCount +
		summary.FailedTransferCount
	if unhealthy != 0 {
		t.Fatalf("unexpected unhealthy counters in active snapshot: %+v", summary)
	}

	_ = openedStream.Close()
	_ = openedBulk.Close()
}

// TestGetServerDiagnosticsSnapshotAggregatesStaleAndResetState registers two
// streams and two bulks on a bootstrapped logical connection (one of each gets
// resetErr set to errTransportDetached), records a failed receive transfer,
// detaches the transport, and then checks the counters reported by the server
// snapshot.
func TestGetServerDiagnosticsSnapshotAggregatesStaleAndResetState(t *testing.T) {
	srv := NewServer().(*ServerCommon)

	serverSide, peerSide := net.Pipe()
	defer peerSide.Close()

	logical := srv.bootstrapAcceptedLogical("diag-server-peer", nil, serverSide)
	if logical == nil {
		t.Fatal("bootstrapAcceptedLogical should return logical")
	}
	logical.markIdentityBound()
	logical.compatClientConn().markClientConnStreamTransport()

	transport := logical.CurrentTransportConn()
	if transport == nil {
		t.Fatal("CurrentTransportConn should return active transport")
	}
	scope := serverFileScope(logical)

	// Stream without a reset error; the assertions below expect it to be the
	// single stale stream once the transport has been detached.
	staleStream := newStreamHandle(
		context.Background(),
		srv.getStreamRuntime(),
		scope,
		StreamOpenRequest{
			StreamID: "diag-stream-stale",
			DataID:   1,
			Channel:  StreamDataChannel,
		},
		0,
		logical,
		transport,
		transport.TransportGeneration(),
		nil, nil, nil,
		defaultStreamConfig(),
	)
	if err := srv.getStreamRuntime().register(scope, staleStream); err != nil {
		t.Fatalf("register stale stream failed: %v", err)
	}

	// Stream that is explicitly marked as reset with errTransportDetached.
	resetStream := newStreamHandle(
		context.Background(),
		srv.getStreamRuntime(),
		scope,
		StreamOpenRequest{
			StreamID: "diag-stream-reset",
			DataID:   2,
			Channel:  StreamDataChannel,
		},
		0,
		logical,
		transport,
		transport.TransportGeneration(),
		nil, nil, nil,
		defaultStreamConfig(),
	)
	if err := srv.getStreamRuntime().register(scope, resetStream); err != nil {
		t.Fatalf("register reset stream failed: %v", err)
	}
	resetStream.mu.Lock()
	resetStream.resetErr = errTransportDetached
	resetStream.mu.Unlock()

	// Bulk without a reset error, mirroring the stale stream above.
	staleBulk := newBulkHandle(
		context.Background(),
		srv.getBulkRuntime(),
		scope,
		BulkOpenRequest{
			BulkID: "diag-bulk-stale",
			DataID: 3,
			Range: BulkRange{
				Length: 16,
			},
			ChunkSize: 32 * 1024,
		},
		0,
		logical,
		transport,
		transport.TransportGeneration(),
		nil, nil, nil, nil, nil,
	)
	if err := srv.getBulkRuntime().register(scope, staleBulk); err != nil {
		t.Fatalf("register stale bulk failed: %v", err)
	}

	// Dedicated bulk that is explicitly marked as reset.
	resetBulk := newBulkHandle(
		context.Background(),
		srv.getBulkRuntime(),
		scope,
		BulkOpenRequest{
			BulkID:    "diag-bulk-reset",
			DataID:    4,
			Dedicated: true,
			Range: BulkRange{
				Length: 16,
			},
			ChunkSize: 32 * 1024,
		},
		0,
		logical,
		transport,
		transport.TransportGeneration(),
		nil, nil, nil, nil, nil,
	)
	if err := srv.getBulkRuntime().register(scope, resetBulk); err != nil {
		t.Fatalf("register reset bulk failed: %v", err)
	}
	resetBulk.mu.Lock()
	resetBulk.resetErr = errTransportDetached
	resetBulk.mu.Unlock()

	// Receive transfer driven through ensure -> activate -> fail.
	transfers := srv.getTransferRuntime()
	transfers.ensureTransferDescriptor(fileTransferDirectionReceive, scope, scope, transport.TransportGeneration(), itransfer.Descriptor{
		ID:       "diag-transfer-failed",
		Channel:  itransfer.DataChannel,
		Size:     64,
		Checksum: "sum-server",
	})
	transfers.activate(fileTransferDirectionReceive, scope, "diag-transfer-failed")
	transfers.fail(fileTransferDirectionReceive, scope, "diag-transfer-failed", errors.New("boom"))

	// Detach the transport before taking the snapshot.
	logical.markTransportDetached("heartbeat timeout", nil)
	logical.detachServerOwnedTransport()

	snap, err := GetServerDiagnosticsSnapshot(srv)
	if err != nil {
		t.Fatalf("GetServerDiagnosticsSnapshot failed: %v", err)
	}

	summary := snap.Summary
	// Checked in order; the first mismatch aborts the test.
	expectations := []struct {
		name      string
		got, want int
	}{
		{"logical snapshot count", len(snap.Logicals), 1},
		{"current transport snapshot count", len(snap.CurrentTransports), 0},
		{"DetachedClientCount", snap.Runtime.DetachedClientCount, 1},
		{"LogicalCount", summary.LogicalCount, 1},
		{"CurrentTransportCount", summary.CurrentTransportCount, 0},
		{"StreamCount", summary.StreamCount, 2},
		{"StaleStreamCount", summary.StaleStreamCount, 1},
		{"ResetStreamCount", summary.ResetStreamCount, 1},
		{"StreamResetCauses.Total", summary.StreamResetCauses.Total, 1},
		{"StreamResetCauses.TransportDetached", summary.StreamResetCauses.TransportDetached, 1},
		{"BulkCount", summary.BulkCount, 2},
		{"DedicatedBulkCount", summary.DedicatedBulkCount, 1},
		{"StaleBulkCount", summary.StaleBulkCount, 1},
		{"ResetBulkCount", summary.ResetBulkCount, 1},
		{"BulkResetCauses.Total", summary.BulkResetCauses.Total, 1},
		{"BulkResetCauses.TransportDetached", summary.BulkResetCauses.TransportDetached, 1},
		{"TransferCount", summary.TransferCount, 1},
		{"FailedTransferCount", summary.FailedTransferCount, 1},
	}
	for _, e := range expectations {
		if e.got != e.want {
			t.Fatalf("%s = %d, want %d", e.name, e.got, e.want)
		}
	}
}

// TestDiagnosticsSummaryClassifiesResetCauses feeds the summarizer four reset
// streams and four reset bulks (transport detached, service shutdown,
// backpressure, and an unrecognised message) and checks that each lands in its
// own cause bucket.
func TestDiagnosticsSummaryClassifiesResetCauses(t *testing.T) {
	input := ClientDiagnosticsSnapshot{
		Streams: []StreamSnapshot{
			{ResetError: errTransportDetached.Error()},
			{ResetError: errServiceShutdown.Error()},
			{ResetError: errStreamBackpressureExceeded.Error()},
			{ResetError: "stream boom"},
		},
		Bulks: []BulkSnapshot{
			{ResetError: errTransportDetached.Error()},
			{ResetError: errServiceShutdown.Error()},
			{ResetError: errBulkBackpressureExceeded.Error()},
			{ResetError: "bulk boom"},
		},
	}
	summary := summarizeClientDiagnosticsSnapshot(input)

	// Checked in order; the first mismatch aborts the test.
	expectations := []struct {
		name      string
		got, want int
	}{
		{"ResetStreamCount", summary.ResetStreamCount, 4},
		{"StreamResetCauses.Total", summary.StreamResetCauses.Total, 4},
		{"StreamResetCauses.TransportDetached", summary.StreamResetCauses.TransportDetached, 1},
		{"StreamResetCauses.ServiceShutdown", summary.StreamResetCauses.ServiceShutdown, 1},
		{"StreamResetCauses.Backpressure", summary.StreamResetCauses.Backpressure, 1},
		{"StreamResetCauses.Other", summary.StreamResetCauses.Other, 1},
		{"ResetBulkCount", summary.ResetBulkCount, 4},
		{"BulkResetCauses.Total", summary.BulkResetCauses.Total, 4},
		{"BulkResetCauses.TransportDetached", summary.BulkResetCauses.TransportDetached, 1},
		{"BulkResetCauses.ServiceShutdown", summary.BulkResetCauses.ServiceShutdown, 1},
		{"BulkResetCauses.Backpressure", summary.BulkResetCauses.Backpressure, 1},
		{"BulkResetCauses.Other", summary.BulkResetCauses.Other, 1},
	}
	for _, e := range expectations {
		if e.got != e.want {
			t.Fatalf("%s = %d, want %d", e.name, e.got, e.want)
		}
	}
}

// TestDiagnosticsSummaryAggregatesTransferTelemetry summarizes one finished
// send transfer and one failed receive transfer and checks the aggregated
// counts, byte totals, stage durations, throughputs and commit-wait ratio.
func TestDiagnosticsSummaryAggregatesTransferTelemetry(t *testing.T) {
	input := ClientDiagnosticsSnapshot{
		Transfers: []TransferSnapshot{
			{
				ID:                  "send-done",
				State:               TransferStateDone,
				SentBytes:           2048,
				SourceReadDuration:  200 * time.Millisecond,
				StreamWriteDuration: 400 * time.Millisecond,
				CommitWaitDuration:  100 * time.Millisecond,
			},
			{
				ID:                 "recv-failed",
				State:              TransferStateFailed,
				ReceivedBytes:      1024,
				SinkWriteDuration:  250 * time.Millisecond,
				SyncDuration:       50 * time.Millisecond,
				VerifyDuration:     25 * time.Millisecond,
				CommitDuration:     75 * time.Millisecond,
			},
		},
	}
	summary := summarizeClientDiagnosticsSnapshot(input)

	counts := []struct {
		name      string
		got, want int
	}{
		{"TransferCount", summary.TransferCount, 2},
		{"DoneTransferCount", summary.DoneTransferCount, 1},
		{"FailedTransferCount", summary.FailedTransferCount, 1},
	}
	for _, c := range counts {
		if c.got != c.want {
			t.Fatalf("%s = %d, want %d", c.name, c.got, c.want)
		}
	}

	telemetry := summary.TransferTelemetry

	byteTotals := []struct {
		name      string
		got, want int64
	}{
		{"SourceReadBytes", telemetry.SourceReadBytes, 2048},
		{"StreamWriteBytes", telemetry.StreamWriteBytes, 2048},
		{"SinkWriteBytes", telemetry.SinkWriteBytes, 1024},
	}
	for _, b := range byteTotals {
		if b.got != b.want {
			t.Fatalf("%s = %d, want %d", b.name, b.got, b.want)
		}
	}

	// WorkDuration is expected to be 200+400+250+50+25+75 ms = 1s, and
	// ObservedDuration that plus the 100ms commit wait.
	durations := []struct {
		name      string
		got, want time.Duration
	}{
		{"SourceReadDuration", telemetry.SourceReadDuration, 200 * time.Millisecond},
		{"StreamWriteDuration", telemetry.StreamWriteDuration, 400 * time.Millisecond},
		{"SinkWriteDuration", telemetry.SinkWriteDuration, 250 * time.Millisecond},
		{"SyncDuration", telemetry.SyncDuration, 50 * time.Millisecond},
		{"VerifyDuration", telemetry.VerifyDuration, 25 * time.Millisecond},
		{"CommitDuration", telemetry.CommitDuration, 75 * time.Millisecond},
		{"CommitWaitDuration", telemetry.CommitWaitDuration, 100 * time.Millisecond},
		{"WorkDuration", telemetry.WorkDuration, time.Second},
		{"ObservedDuration", telemetry.ObservedDuration, 1100 * time.Millisecond},
	}
	for _, d := range durations {
		if d.got != d.want {
			t.Fatalf("%s = %v, want %v", d.name, d.got, d.want)
		}
	}

	// Floating-point results are compared within a per-field tolerance.
	ratios := []struct {
		name      string
		got, want float64
		tolerance float64
	}{
		{"SourceReadThroughputBPS", telemetry.SourceReadThroughputBPS, 10240.0, 0.001},
		{"StreamWriteThroughputBPS", telemetry.StreamWriteThroughputBPS, 5120.0, 0.001},
		{"SinkWriteThroughputBPS", telemetry.SinkWriteThroughputBPS, 4096.0, 0.001},
		{"CommitWaitRatio", telemetry.CommitWaitRatio, 1.0 / 11.0, 0.000001},
	}
	for _, r := range ratios {
		if math.Abs(r.got-r.want) > r.tolerance {
			t.Fatalf("%s = %f, want %f", r.name, r.got, r.want)
		}
	}
}

// TestGetDiagnosticsSnapshotRejectsNil checks that both snapshot getters
// report their dedicated sentinel error when handed a nil client or server.
func TestGetDiagnosticsSnapshotRejectsNil(t *testing.T) {
	_, clientErr := GetClientDiagnosticsSnapshot(nil)
	if !errors.Is(clientErr, errClientDiagnosticsSnapshotNil) {
		t.Fatalf("GetClientDiagnosticsSnapshot nil error = %v, want %v", clientErr, errClientDiagnosticsSnapshotNil)
	}

	_, serverErr := GetServerDiagnosticsSnapshot(nil)
	if !errors.Is(serverErr, errServerDiagnosticsSnapshotNil) {
		t.Fatalf("GetServerDiagnosticsSnapshot nil error = %v, want %v", serverErr, errServerDiagnosticsSnapshotNil)
	}
}