2026-04-15 15:24:36 +08:00
|
|
|
package notify
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"errors"
|
|
|
|
|
"math"
|
|
|
|
|
"net"
|
|
|
|
|
"testing"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
itransfer "b612.me/notify/internal/transfer"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
func TestGetClientDiagnosticsSnapshotDefaults(t *testing.T) {
|
|
|
|
|
client := NewClient()
|
|
|
|
|
snapshot, err := GetClientDiagnosticsSnapshot(client)
|
|
|
|
|
if err != nil {
|
|
|
|
|
t.Fatalf("GetClientDiagnosticsSnapshot failed: %v", err)
|
|
|
|
|
}
|
|
|
|
|
if got, want := snapshot.Runtime.OwnerState, "idle"; got != want {
|
|
|
|
|
t.Fatalf("Runtime.OwnerState = %q, want %q", got, want)
|
|
|
|
|
}
|
2026-04-15 19:52:45 +08:00
|
|
|
if len(snapshot.Streams) != 0 || len(snapshot.Bulks) != 0 || len(snapshot.Records) != 0 || len(snapshot.Transfers) != 0 {
|
2026-04-15 15:24:36 +08:00
|
|
|
t.Fatalf("default diagnostics should be empty: %+v", snapshot)
|
|
|
|
|
}
|
|
|
|
|
if snapshot.Summary != (DiagnosticsSummary{}) {
|
|
|
|
|
t.Fatalf("default summary mismatch: %+v", snapshot.Summary)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func TestGetClientDiagnosticsSnapshotAggregatesActiveState(t *testing.T) {
|
|
|
|
|
server := NewServer().(*ServerCommon)
|
|
|
|
|
if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
|
|
|
t.Fatalf("UseModernPSKServer failed: %v", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
streamAcceptCh := make(chan StreamAcceptInfo, 1)
|
|
|
|
|
bulkAcceptCh := make(chan BulkAcceptInfo, 1)
|
|
|
|
|
server.SetStreamHandler(func(info StreamAcceptInfo) error {
|
|
|
|
|
streamAcceptCh <- info
|
|
|
|
|
return nil
|
|
|
|
|
})
|
|
|
|
|
server.SetBulkHandler(func(info BulkAcceptInfo) error {
|
|
|
|
|
bulkAcceptCh <- info
|
|
|
|
|
return nil
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
|
|
|
|
|
t.Fatalf("server Listen failed: %v", err)
|
|
|
|
|
}
|
|
|
|
|
defer func() {
|
|
|
|
|
_ = server.Stop()
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
client := NewClient().(*ClientCommon)
|
|
|
|
|
if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
|
|
|
t.Fatalf("UseModernPSKClient failed: %v", err)
|
|
|
|
|
}
|
|
|
|
|
if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
|
|
|
|
|
t.Fatalf("client Connect failed: %v", err)
|
|
|
|
|
}
|
|
|
|
|
defer func() {
|
|
|
|
|
_ = client.Stop()
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
stream, err := client.OpenStream(context.Background(), StreamOpenOptions{
|
|
|
|
|
ID: "diag-client-stream",
|
|
|
|
|
Channel: StreamDataChannel,
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
t.Fatalf("client OpenStream failed: %v", err)
|
|
|
|
|
}
|
|
|
|
|
waitAcceptedStream(t, streamAcceptCh, 2*time.Second)
|
|
|
|
|
|
|
|
|
|
bulk, err := client.OpenBulk(context.Background(), BulkOpenOptions{
|
|
|
|
|
ID: "diag-client-bulk",
|
|
|
|
|
Range: BulkRange{
|
|
|
|
|
Length: 64,
|
|
|
|
|
},
|
|
|
|
|
ChunkSize: 16 * 1024,
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
t.Fatalf("client OpenBulk failed: %v", err)
|
|
|
|
|
}
|
|
|
|
|
waitAcceptedBulk(t, bulkAcceptCh, 2*time.Second)
|
|
|
|
|
|
|
|
|
|
transferRuntime := client.getTransferRuntime()
|
|
|
|
|
transferRuntime.ensureTransferDescriptor(fileTransferDirectionSend, clientFileScope(), clientFileScope(), 0, itransfer.Descriptor{
|
|
|
|
|
ID: "diag-client-transfer-done",
|
|
|
|
|
Channel: itransfer.DataChannel,
|
|
|
|
|
Size: 32,
|
|
|
|
|
Checksum: "sum-client",
|
|
|
|
|
})
|
|
|
|
|
transferRuntime.activate(fileTransferDirectionSend, clientFileScope(), "diag-client-transfer-done")
|
|
|
|
|
transferRuntime.complete(fileTransferDirectionSend, clientFileScope(), "diag-client-transfer-done")
|
|
|
|
|
|
|
|
|
|
snapshot, err := GetClientDiagnosticsSnapshot(client)
|
|
|
|
|
if err != nil {
|
|
|
|
|
t.Fatalf("GetClientDiagnosticsSnapshot failed: %v", err)
|
|
|
|
|
}
|
|
|
|
|
if got, want := snapshot.Summary.LogicalCount, 1; got != want {
|
|
|
|
|
t.Fatalf("LogicalCount = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := snapshot.Summary.CurrentTransportCount, 1; got != want {
|
|
|
|
|
t.Fatalf("CurrentTransportCount = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := snapshot.Summary.StreamCount, 1; got != want {
|
|
|
|
|
t.Fatalf("StreamCount = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := snapshot.Summary.ActiveStreamCount, 1; got != want {
|
|
|
|
|
t.Fatalf("ActiveStreamCount = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := snapshot.Summary.BulkCount, 1; got != want {
|
|
|
|
|
t.Fatalf("BulkCount = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := snapshot.Summary.ActiveBulkCount, 1; got != want {
|
|
|
|
|
t.Fatalf("ActiveBulkCount = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := snapshot.Summary.TransferCount, 1; got != want {
|
|
|
|
|
t.Fatalf("TransferCount = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := snapshot.Summary.DoneTransferCount, 1; got != want {
|
|
|
|
|
t.Fatalf("DoneTransferCount = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got := snapshot.Summary.StaleStreamCount + snapshot.Summary.ResetStreamCount + snapshot.Summary.StaleBulkCount + snapshot.Summary.ResetBulkCount + snapshot.Summary.FailedTransferCount; got != 0 {
|
|
|
|
|
t.Fatalf("unexpected unhealthy counters in active snapshot: %+v", snapshot.Summary)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
_ = stream.Close()
|
|
|
|
|
_ = bulk.Close()
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-15 19:52:45 +08:00
|
|
|
func TestGetDiagnosticsSnapshotAggregatesActiveRecordState(t *testing.T) {
|
|
|
|
|
server := NewServer().(*ServerCommon)
|
|
|
|
|
if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
|
|
|
t.Fatalf("UseModernPSKServer failed: %v", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
recordAcceptCh := make(chan RecordAcceptInfo, 1)
|
|
|
|
|
recordReleaseCh := make(chan struct{})
|
|
|
|
|
recordHandlerDone := make(chan error, 1)
|
|
|
|
|
server.SetRecordStreamHandler(func(info RecordAcceptInfo) error {
|
|
|
|
|
recordAcceptCh <- info
|
|
|
|
|
msg, err := info.RecordStream.ReadRecord(context.Background())
|
|
|
|
|
if err != nil {
|
|
|
|
|
recordHandlerDone <- err
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if string(msg.Payload) != "diag-record" {
|
|
|
|
|
err = errors.New("unexpected record payload")
|
|
|
|
|
recordHandlerDone <- err
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if err := info.RecordStream.AckRecord(msg.Seq); err != nil {
|
|
|
|
|
recordHandlerDone <- err
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
<-recordReleaseCh
|
|
|
|
|
err = info.RecordStream.Close()
|
|
|
|
|
recordHandlerDone <- err
|
|
|
|
|
return err
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
|
|
|
|
|
t.Fatalf("server Listen failed: %v", err)
|
|
|
|
|
}
|
|
|
|
|
defer func() {
|
|
|
|
|
_ = server.Stop()
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
client := NewClient().(*ClientCommon)
|
|
|
|
|
if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
|
|
|
t.Fatalf("UseModernPSKClient failed: %v", err)
|
|
|
|
|
}
|
|
|
|
|
if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
|
|
|
|
|
t.Fatalf("client Connect failed: %v", err)
|
|
|
|
|
}
|
|
|
|
|
defer func() {
|
|
|
|
|
_ = client.Stop()
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
record, err := client.OpenRecordStream(context.Background(), RecordOpenOptions{
|
|
|
|
|
Stream: StreamOpenOptions{ID: "diag-client-record"},
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
t.Fatalf("client OpenRecordStream failed: %v", err)
|
|
|
|
|
}
|
|
|
|
|
select {
|
|
|
|
|
case <-recordAcceptCh:
|
|
|
|
|
case <-time.After(2 * time.Second):
|
|
|
|
|
t.Fatal("timed out waiting accepted record stream")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if _, err := record.WriteRecord(context.Background(), []byte("diag-record")); err != nil {
|
|
|
|
|
t.Fatalf("WriteRecord failed: %v", err)
|
|
|
|
|
}
|
|
|
|
|
if _, err := record.Barrier(context.Background()); err != nil {
|
|
|
|
|
t.Fatalf("Barrier failed: %v", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
clientSnapshot, err := GetClientDiagnosticsSnapshot(client)
|
|
|
|
|
if err != nil {
|
|
|
|
|
t.Fatalf("GetClientDiagnosticsSnapshot failed: %v", err)
|
|
|
|
|
}
|
|
|
|
|
if got, want := len(clientSnapshot.Records), 1; got != want {
|
|
|
|
|
t.Fatalf("client record snapshot count = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := clientSnapshot.Summary.RecordCount, 1; got != want {
|
|
|
|
|
t.Fatalf("client RecordCount = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := clientSnapshot.Summary.ActiveRecordCount, 1; got != want {
|
|
|
|
|
t.Fatalf("client ActiveRecordCount = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
clientRecord := clientSnapshot.Records[0]
|
|
|
|
|
if got := clientRecord.BatchFramesSent; got < 1 {
|
|
|
|
|
t.Fatalf("client BatchFramesSent = %d, want >= 1", got)
|
|
|
|
|
}
|
|
|
|
|
if got := clientRecord.AckFramesReceived; got < 1 {
|
|
|
|
|
t.Fatalf("client AckFramesReceived = %d, want >= 1", got)
|
|
|
|
|
}
|
|
|
|
|
if got := clientRecord.BarrierCount; got < 1 {
|
|
|
|
|
t.Fatalf("client BarrierCount = %d, want >= 1", got)
|
|
|
|
|
}
|
|
|
|
|
if got := clientSnapshot.Summary.RecordTelemetry.FrameSendCount; got < 1 {
|
|
|
|
|
t.Fatalf("client RecordTelemetry.FrameSendCount = %d, want >= 1", got)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
serverSnapshot, err := GetServerDiagnosticsSnapshot(server)
|
|
|
|
|
if err != nil {
|
|
|
|
|
t.Fatalf("GetServerDiagnosticsSnapshot failed: %v", err)
|
|
|
|
|
}
|
|
|
|
|
if got, want := len(serverSnapshot.Records), 1; got != want {
|
|
|
|
|
t.Fatalf("server record snapshot count = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := serverSnapshot.Summary.RecordCount, 1; got != want {
|
|
|
|
|
t.Fatalf("server RecordCount = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := serverSnapshot.Summary.ActiveRecordCount, 1; got != want {
|
|
|
|
|
t.Fatalf("server ActiveRecordCount = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
serverRecord := serverSnapshot.Records[0]
|
|
|
|
|
if got := serverRecord.BatchFramesReceived; got < 1 {
|
|
|
|
|
t.Fatalf("server BatchFramesReceived = %d, want >= 1", got)
|
|
|
|
|
}
|
|
|
|
|
if got := serverRecord.AckFramesSent; got < 1 {
|
|
|
|
|
t.Fatalf("server AckFramesSent = %d, want >= 1", got)
|
|
|
|
|
}
|
|
|
|
|
if got := serverSnapshot.Summary.RecordTelemetry.FrameReceiveCount; got < 1 {
|
|
|
|
|
t.Fatalf("server RecordTelemetry.FrameReceiveCount = %d, want >= 1", got)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
close(recordReleaseCh)
|
|
|
|
|
select {
|
|
|
|
|
case err := <-recordHandlerDone:
|
|
|
|
|
if err != nil {
|
|
|
|
|
t.Fatalf("record handler failed: %v", err)
|
|
|
|
|
}
|
|
|
|
|
case <-time.After(2 * time.Second):
|
|
|
|
|
t.Fatal("timed out waiting record handler completion")
|
|
|
|
|
}
|
|
|
|
|
_ = record.Close()
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-15 15:24:36 +08:00
|
|
|
func TestGetServerDiagnosticsSnapshotAggregatesStaleAndResetState(t *testing.T) {
|
|
|
|
|
server := NewServer().(*ServerCommon)
|
|
|
|
|
|
|
|
|
|
left, right := net.Pipe()
|
|
|
|
|
defer right.Close()
|
|
|
|
|
|
|
|
|
|
logical := server.bootstrapAcceptedLogical("diag-server-peer", nil, left)
|
|
|
|
|
if logical == nil {
|
|
|
|
|
t.Fatal("bootstrapAcceptedLogical should return logical")
|
|
|
|
|
}
|
|
|
|
|
logical.markIdentityBound()
|
|
|
|
|
logical.compatClientConn().markClientConnStreamTransport()
|
|
|
|
|
transport := logical.CurrentTransportConn()
|
|
|
|
|
if transport == nil {
|
|
|
|
|
t.Fatal("CurrentTransportConn should return active transport")
|
|
|
|
|
}
|
|
|
|
|
scope := serverFileScope(logical)
|
|
|
|
|
|
|
|
|
|
streamStale := newStreamHandle(context.Background(), server.getStreamRuntime(), scope, StreamOpenRequest{
|
|
|
|
|
StreamID: "diag-stream-stale",
|
|
|
|
|
DataID: 1,
|
|
|
|
|
Channel: StreamDataChannel,
|
|
|
|
|
}, 0, logical, transport, transport.TransportGeneration(), nil, nil, nil, defaultStreamConfig())
|
|
|
|
|
if err := server.getStreamRuntime().register(scope, streamStale); err != nil {
|
|
|
|
|
t.Fatalf("register stale stream failed: %v", err)
|
|
|
|
|
}
|
|
|
|
|
streamReset := newStreamHandle(context.Background(), server.getStreamRuntime(), scope, StreamOpenRequest{
|
|
|
|
|
StreamID: "diag-stream-reset",
|
|
|
|
|
DataID: 2,
|
|
|
|
|
Channel: StreamDataChannel,
|
|
|
|
|
}, 0, logical, transport, transport.TransportGeneration(), nil, nil, nil, defaultStreamConfig())
|
|
|
|
|
if err := server.getStreamRuntime().register(scope, streamReset); err != nil {
|
|
|
|
|
t.Fatalf("register reset stream failed: %v", err)
|
|
|
|
|
}
|
|
|
|
|
streamReset.mu.Lock()
|
|
|
|
|
streamReset.resetErr = errTransportDetached
|
|
|
|
|
streamReset.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
bulkStale := newBulkHandle(context.Background(), server.getBulkRuntime(), scope, BulkOpenRequest{
|
|
|
|
|
BulkID: "diag-bulk-stale",
|
|
|
|
|
DataID: 3,
|
|
|
|
|
Range: BulkRange{
|
|
|
|
|
Length: 16,
|
|
|
|
|
},
|
|
|
|
|
ChunkSize: 32 * 1024,
|
|
|
|
|
}, 0, logical, transport, transport.TransportGeneration(), nil, nil, nil, nil, nil)
|
|
|
|
|
if err := server.getBulkRuntime().register(scope, bulkStale); err != nil {
|
|
|
|
|
t.Fatalf("register stale bulk failed: %v", err)
|
|
|
|
|
}
|
|
|
|
|
bulkReset := newBulkHandle(context.Background(), server.getBulkRuntime(), scope, BulkOpenRequest{
|
|
|
|
|
BulkID: "diag-bulk-reset",
|
|
|
|
|
DataID: 4,
|
|
|
|
|
Dedicated: true,
|
|
|
|
|
Range: BulkRange{
|
|
|
|
|
Length: 16,
|
|
|
|
|
},
|
|
|
|
|
ChunkSize: 32 * 1024,
|
|
|
|
|
}, 0, logical, transport, transport.TransportGeneration(), nil, nil, nil, nil, nil)
|
|
|
|
|
if err := server.getBulkRuntime().register(scope, bulkReset); err != nil {
|
|
|
|
|
t.Fatalf("register reset bulk failed: %v", err)
|
|
|
|
|
}
|
|
|
|
|
bulkReset.mu.Lock()
|
|
|
|
|
bulkReset.resetErr = errTransportDetached
|
|
|
|
|
bulkReset.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
transferRuntime := server.getTransferRuntime()
|
|
|
|
|
transferRuntime.ensureTransferDescriptor(fileTransferDirectionReceive, scope, scope, transport.TransportGeneration(), itransfer.Descriptor{
|
|
|
|
|
ID: "diag-transfer-failed",
|
|
|
|
|
Channel: itransfer.DataChannel,
|
|
|
|
|
Size: 64,
|
|
|
|
|
Checksum: "sum-server",
|
|
|
|
|
})
|
|
|
|
|
transferRuntime.activate(fileTransferDirectionReceive, scope, "diag-transfer-failed")
|
|
|
|
|
transferRuntime.fail(fileTransferDirectionReceive, scope, "diag-transfer-failed", errors.New("boom"))
|
|
|
|
|
|
|
|
|
|
logical.markTransportDetached("heartbeat timeout", nil)
|
|
|
|
|
logical.detachServerOwnedTransport()
|
|
|
|
|
|
|
|
|
|
snapshot, err := GetServerDiagnosticsSnapshot(server)
|
|
|
|
|
if err != nil {
|
|
|
|
|
t.Fatalf("GetServerDiagnosticsSnapshot failed: %v", err)
|
|
|
|
|
}
|
|
|
|
|
if got, want := len(snapshot.Logicals), 1; got != want {
|
|
|
|
|
t.Fatalf("logical snapshot count = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := len(snapshot.CurrentTransports), 0; got != want {
|
|
|
|
|
t.Fatalf("current transport snapshot count = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := snapshot.Runtime.DetachedClientCount, 1; got != want {
|
|
|
|
|
t.Fatalf("DetachedClientCount = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := snapshot.Summary.LogicalCount, 1; got != want {
|
|
|
|
|
t.Fatalf("LogicalCount = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := snapshot.Summary.CurrentTransportCount, 0; got != want {
|
|
|
|
|
t.Fatalf("CurrentTransportCount = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := snapshot.Summary.StreamCount, 2; got != want {
|
|
|
|
|
t.Fatalf("StreamCount = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := snapshot.Summary.StaleStreamCount, 1; got != want {
|
|
|
|
|
t.Fatalf("StaleStreamCount = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := snapshot.Summary.ResetStreamCount, 1; got != want {
|
|
|
|
|
t.Fatalf("ResetStreamCount = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := snapshot.Summary.StreamResetCauses.Total, 1; got != want {
|
|
|
|
|
t.Fatalf("StreamResetCauses.Total = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := snapshot.Summary.StreamResetCauses.TransportDetached, 1; got != want {
|
|
|
|
|
t.Fatalf("StreamResetCauses.TransportDetached = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := snapshot.Summary.BulkCount, 2; got != want {
|
|
|
|
|
t.Fatalf("BulkCount = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := snapshot.Summary.DedicatedBulkCount, 1; got != want {
|
|
|
|
|
t.Fatalf("DedicatedBulkCount = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := snapshot.Summary.StaleBulkCount, 1; got != want {
|
|
|
|
|
t.Fatalf("StaleBulkCount = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := snapshot.Summary.ResetBulkCount, 1; got != want {
|
|
|
|
|
t.Fatalf("ResetBulkCount = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := snapshot.Summary.BulkResetCauses.Total, 1; got != want {
|
|
|
|
|
t.Fatalf("BulkResetCauses.Total = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := snapshot.Summary.BulkResetCauses.TransportDetached, 1; got != want {
|
|
|
|
|
t.Fatalf("BulkResetCauses.TransportDetached = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := snapshot.Summary.TransferCount, 1; got != want {
|
|
|
|
|
t.Fatalf("TransferCount = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := snapshot.Summary.FailedTransferCount, 1; got != want {
|
|
|
|
|
t.Fatalf("FailedTransferCount = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func TestDiagnosticsSummaryClassifiesResetCauses(t *testing.T) {
|
|
|
|
|
summary := summarizeClientDiagnosticsSnapshot(ClientDiagnosticsSnapshot{
|
|
|
|
|
Streams: []StreamSnapshot{
|
|
|
|
|
{ResetError: errTransportDetached.Error()},
|
|
|
|
|
{ResetError: errServiceShutdown.Error()},
|
|
|
|
|
{ResetError: errStreamBackpressureExceeded.Error()},
|
|
|
|
|
{ResetError: "stream boom"},
|
|
|
|
|
},
|
|
|
|
|
Bulks: []BulkSnapshot{
|
|
|
|
|
{ResetError: errTransportDetached.Error()},
|
|
|
|
|
{ResetError: errServiceShutdown.Error()},
|
|
|
|
|
{ResetError: errBulkBackpressureExceeded.Error()},
|
|
|
|
|
{ResetError: "bulk boom"},
|
|
|
|
|
},
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
if got, want := summary.ResetStreamCount, 4; got != want {
|
|
|
|
|
t.Fatalf("ResetStreamCount = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := summary.StreamResetCauses.Total, 4; got != want {
|
|
|
|
|
t.Fatalf("StreamResetCauses.Total = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := summary.StreamResetCauses.TransportDetached, 1; got != want {
|
|
|
|
|
t.Fatalf("StreamResetCauses.TransportDetached = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := summary.StreamResetCauses.ServiceShutdown, 1; got != want {
|
|
|
|
|
t.Fatalf("StreamResetCauses.ServiceShutdown = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := summary.StreamResetCauses.Backpressure, 1; got != want {
|
|
|
|
|
t.Fatalf("StreamResetCauses.Backpressure = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := summary.StreamResetCauses.Other, 1; got != want {
|
|
|
|
|
t.Fatalf("StreamResetCauses.Other = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if got, want := summary.ResetBulkCount, 4; got != want {
|
|
|
|
|
t.Fatalf("ResetBulkCount = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := summary.BulkResetCauses.Total, 4; got != want {
|
|
|
|
|
t.Fatalf("BulkResetCauses.Total = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := summary.BulkResetCauses.TransportDetached, 1; got != want {
|
|
|
|
|
t.Fatalf("BulkResetCauses.TransportDetached = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := summary.BulkResetCauses.ServiceShutdown, 1; got != want {
|
|
|
|
|
t.Fatalf("BulkResetCauses.ServiceShutdown = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := summary.BulkResetCauses.Backpressure, 1; got != want {
|
|
|
|
|
t.Fatalf("BulkResetCauses.Backpressure = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := summary.BulkResetCauses.Other, 1; got != want {
|
|
|
|
|
t.Fatalf("BulkResetCauses.Other = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-15 19:52:45 +08:00
|
|
|
func TestDiagnosticsSummaryAggregatesRecordTelemetry(t *testing.T) {
|
|
|
|
|
summary := summarizeClientDiagnosticsSnapshot(ClientDiagnosticsSnapshot{
|
|
|
|
|
Records: []RecordSnapshot{
|
|
|
|
|
{
|
|
|
|
|
ID: "record-active",
|
|
|
|
|
BindingCurrent: true,
|
|
|
|
|
TransportAttached: true,
|
|
|
|
|
TransportCurrent: true,
|
|
|
|
|
OutstandingRecords: 3,
|
|
|
|
|
OutstandingBytes: 4096,
|
|
|
|
|
PendingApplyRecords: 2,
|
|
|
|
|
PendingAckRecords: 1,
|
|
|
|
|
PeakPendingApplyRecords: 5,
|
|
|
|
|
BatchFramesSent: 10,
|
|
|
|
|
AckFramesSent: 4,
|
|
|
|
|
ErrorFramesSent: 1,
|
|
|
|
|
BatchFramesReceived: 8,
|
|
|
|
|
AckFramesReceived: 3,
|
|
|
|
|
ErrorFramesReceived: 0,
|
|
|
|
|
PiggybackAckSent: 6,
|
|
|
|
|
PiggybackAckReceived: 2,
|
|
|
|
|
BarrierCount: 4,
|
|
|
|
|
BarrierFlushWaitDuration: 10 * time.Millisecond,
|
|
|
|
|
BarrierApplyWaitDuration: 30 * time.Millisecond,
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
ID: "record-reset",
|
|
|
|
|
ResetError: errTransportDetached.Error(),
|
|
|
|
|
OutstandingRecords: 1,
|
|
|
|
|
OutstandingBytes: 512,
|
|
|
|
|
PendingApplyRecords: 3,
|
|
|
|
|
PendingAckRecords: 2,
|
|
|
|
|
PeakPendingApplyRecords: 7,
|
|
|
|
|
BatchFramesSent: 2,
|
|
|
|
|
AckFramesSent: 1,
|
|
|
|
|
ErrorFramesSent: 1,
|
|
|
|
|
BatchFramesReceived: 1,
|
|
|
|
|
AckFramesReceived: 1,
|
|
|
|
|
ErrorFramesReceived: 1,
|
|
|
|
|
PiggybackAckSent: 1,
|
|
|
|
|
PiggybackAckReceived: 1,
|
|
|
|
|
BarrierCount: 1,
|
|
|
|
|
BarrierFlushWaitDuration: 5 * time.Millisecond,
|
|
|
|
|
BarrierApplyWaitDuration: 15 * time.Millisecond,
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
if got, want := summary.RecordCount, 2; got != want {
|
|
|
|
|
t.Fatalf("RecordCount = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := summary.ActiveRecordCount, 1; got != want {
|
|
|
|
|
t.Fatalf("ActiveRecordCount = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := summary.ResetRecordCount, 1; got != want {
|
|
|
|
|
t.Fatalf("ResetRecordCount = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := summary.RecordResetCauses.Total, 1; got != want {
|
|
|
|
|
t.Fatalf("RecordResetCauses.Total = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := summary.RecordResetCauses.TransportDetached, 1; got != want {
|
|
|
|
|
t.Fatalf("RecordResetCauses.TransportDetached = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
telemetry := summary.RecordTelemetry
|
|
|
|
|
if got, want := telemetry.BatchFramesSent, int64(12); got != want {
|
|
|
|
|
t.Fatalf("BatchFramesSent = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := telemetry.AckFramesSent, int64(5); got != want {
|
|
|
|
|
t.Fatalf("AckFramesSent = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := telemetry.ErrorFramesSent, int64(2); got != want {
|
|
|
|
|
t.Fatalf("ErrorFramesSent = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := telemetry.BatchFramesReceived, int64(9); got != want {
|
|
|
|
|
t.Fatalf("BatchFramesReceived = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := telemetry.AckFramesReceived, int64(4); got != want {
|
|
|
|
|
t.Fatalf("AckFramesReceived = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := telemetry.ErrorFramesReceived, int64(1); got != want {
|
|
|
|
|
t.Fatalf("ErrorFramesReceived = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := telemetry.FrameSendCount, int64(19); got != want {
|
|
|
|
|
t.Fatalf("FrameSendCount = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := telemetry.FrameReceiveCount, int64(14); got != want {
|
|
|
|
|
t.Fatalf("FrameReceiveCount = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := telemetry.PiggybackAckSent, int64(7); got != want {
|
|
|
|
|
t.Fatalf("PiggybackAckSent = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := telemetry.PiggybackAckReceived, int64(3); got != want {
|
|
|
|
|
t.Fatalf("PiggybackAckReceived = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := telemetry.BarrierCount, int64(5); got != want {
|
|
|
|
|
t.Fatalf("BarrierCount = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := telemetry.BarrierFlushWaitDuration, 15*time.Millisecond; got != want {
|
|
|
|
|
t.Fatalf("BarrierFlushWaitDuration = %v, want %v", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := telemetry.BarrierApplyWaitDuration, 45*time.Millisecond; got != want {
|
|
|
|
|
t.Fatalf("BarrierApplyWaitDuration = %v, want %v", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := telemetry.OutstandingRecords, 4; got != want {
|
|
|
|
|
t.Fatalf("OutstandingRecords = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := telemetry.OutstandingBytes, 4608; got != want {
|
|
|
|
|
t.Fatalf("OutstandingBytes = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := telemetry.PendingApplyRecords, 5; got != want {
|
|
|
|
|
t.Fatalf("PendingApplyRecords = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := telemetry.PendingAckRecords, 3; got != want {
|
|
|
|
|
t.Fatalf("PendingAckRecords = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := telemetry.PeakPendingApplyRecords, 7; got != want {
|
|
|
|
|
t.Fatalf("PeakPendingApplyRecords = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-15 15:24:36 +08:00
|
|
|
func TestDiagnosticsSummaryAggregatesTransferTelemetry(t *testing.T) {
|
|
|
|
|
summary := summarizeClientDiagnosticsSnapshot(ClientDiagnosticsSnapshot{
|
|
|
|
|
Transfers: []TransferSnapshot{
|
|
|
|
|
{
|
|
|
|
|
ID: "send-done",
|
|
|
|
|
State: TransferStateDone,
|
|
|
|
|
SentBytes: 2048,
|
|
|
|
|
SourceReadDuration: 200 * time.Millisecond,
|
|
|
|
|
StreamWriteDuration: 400 * time.Millisecond,
|
|
|
|
|
CommitWaitDuration: 100 * time.Millisecond,
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
ID: "recv-failed",
|
|
|
|
|
State: TransferStateFailed,
|
|
|
|
|
ReceivedBytes: 1024,
|
|
|
|
|
SinkWriteDuration: 250 * time.Millisecond,
|
|
|
|
|
SyncDuration: 50 * time.Millisecond,
|
|
|
|
|
VerifyDuration: 25 * time.Millisecond,
|
|
|
|
|
CommitDuration: 75 * time.Millisecond,
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
if got, want := summary.TransferCount, 2; got != want {
|
|
|
|
|
t.Fatalf("TransferCount = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := summary.DoneTransferCount, 1; got != want {
|
|
|
|
|
t.Fatalf("DoneTransferCount = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := summary.FailedTransferCount, 1; got != want {
|
|
|
|
|
t.Fatalf("FailedTransferCount = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
telemetry := summary.TransferTelemetry
|
|
|
|
|
if got, want := telemetry.SourceReadBytes, int64(2048); got != want {
|
|
|
|
|
t.Fatalf("SourceReadBytes = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := telemetry.StreamWriteBytes, int64(2048); got != want {
|
|
|
|
|
t.Fatalf("StreamWriteBytes = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := telemetry.SinkWriteBytes, int64(1024); got != want {
|
|
|
|
|
t.Fatalf("SinkWriteBytes = %d, want %d", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := telemetry.SourceReadDuration, 200*time.Millisecond; got != want {
|
|
|
|
|
t.Fatalf("SourceReadDuration = %v, want %v", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := telemetry.StreamWriteDuration, 400*time.Millisecond; got != want {
|
|
|
|
|
t.Fatalf("StreamWriteDuration = %v, want %v", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := telemetry.SinkWriteDuration, 250*time.Millisecond; got != want {
|
|
|
|
|
t.Fatalf("SinkWriteDuration = %v, want %v", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := telemetry.SyncDuration, 50*time.Millisecond; got != want {
|
|
|
|
|
t.Fatalf("SyncDuration = %v, want %v", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := telemetry.VerifyDuration, 25*time.Millisecond; got != want {
|
|
|
|
|
t.Fatalf("VerifyDuration = %v, want %v", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := telemetry.CommitDuration, 75*time.Millisecond; got != want {
|
|
|
|
|
t.Fatalf("CommitDuration = %v, want %v", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := telemetry.CommitWaitDuration, 100*time.Millisecond; got != want {
|
|
|
|
|
t.Fatalf("CommitWaitDuration = %v, want %v", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := telemetry.WorkDuration, time.Second; got != want {
|
|
|
|
|
t.Fatalf("WorkDuration = %v, want %v", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := telemetry.ObservedDuration, 1100*time.Millisecond; got != want {
|
|
|
|
|
t.Fatalf("ObservedDuration = %v, want %v", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := telemetry.SourceReadThroughputBPS, 10240.0; math.Abs(got-want) > 0.001 {
|
|
|
|
|
t.Fatalf("SourceReadThroughputBPS = %f, want %f", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := telemetry.StreamWriteThroughputBPS, 5120.0; math.Abs(got-want) > 0.001 {
|
|
|
|
|
t.Fatalf("StreamWriteThroughputBPS = %f, want %f", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := telemetry.SinkWriteThroughputBPS, 4096.0; math.Abs(got-want) > 0.001 {
|
|
|
|
|
t.Fatalf("SinkWriteThroughputBPS = %f, want %f", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got, want := telemetry.CommitWaitRatio, 1.0/11.0; math.Abs(got-want) > 0.000001 {
|
|
|
|
|
t.Fatalf("CommitWaitRatio = %f, want %f", got, want)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func TestGetDiagnosticsSnapshotRejectsNil(t *testing.T) {
|
|
|
|
|
if _, err := GetClientDiagnosticsSnapshot(nil); !errors.Is(err, errClientDiagnosticsSnapshotNil) {
|
|
|
|
|
t.Fatalf("GetClientDiagnosticsSnapshot nil error = %v, want %v", err, errClientDiagnosticsSnapshotNil)
|
|
|
|
|
}
|
|
|
|
|
if _, err := GetServerDiagnosticsSnapshot(nil); !errors.Is(err, errServerDiagnosticsSnapshotNil) {
|
|
|
|
|
t.Fatalf("GetServerDiagnosticsSnapshot nil error = %v, want %v", err, errServerDiagnosticsSnapshotNil)
|
|
|
|
|
}
|
|
|
|
|
}
|