notify/transfer_snapshot_test.go

505 lines
20 KiB
Go
Raw Permalink Normal View History

package notify
import (
itransfer "b612.me/notify/internal/transfer"
"errors"
"math"
"testing"
"time"
)
// TestGetClientTransferSnapshotsAndByID feeds meta, chunk and end events for a
// single send transfer into a fresh client's file transfer state, then checks
// the snapshot list as well as the by-ID and by-ID-and-scope lookups,
// including lookups for an ID that was never observed.
func TestGetClientTransferSnapshotsAndByID(t *testing.T) {
	client := NewClient()
	common := client.(*ClientCommon)
	start := time.Unix(1500, 0)
	packet := FilePacket{FileID: "client-transfer", Name: "demo.bin", Size: 12, Checksum: "sum-client"}

	// The events are observed one at a time in the order meta, chunk, end.
	events := []FileEvent{
		{Kind: EnvelopeFileMeta, Packet: packet, Path: "/tmp/demo.bin", Time: start},
		{Kind: EnvelopeFileChunk, Packet: packet, Received: 12, Time: start.Add(time.Second)},
		{Kind: EnvelopeFileEnd, Packet: packet, Received: 12, Done: true, Time: start.Add(2 * time.Second)},
	}
	for _, event := range events {
		common.getFileTransferState().observe(fileTransferDirectionSend, event)
	}

	all, err := GetClientTransferSnapshots(client)
	if err != nil {
		t.Fatalf("GetClientTransferSnapshots failed: %v", err)
	}
	if len(all) != 1 {
		t.Fatalf("snapshot count = %d, want %d", len(all), 1)
	}

	// Every remaining list assertion targets the only snapshot.
	first := all[0]
	if first.State != TransferStateDone {
		t.Fatalf("state = %v, want %v", first.State, TransferStateDone)
	}
	if want := clientFileScope(); first.Scope != want {
		t.Fatalf("scope = %q, want %q", first.Scope, want)
	}
	if want := clientFileScope(); first.RuntimeScope != want {
		t.Fatalf("runtime scope = %q, want %q", first.RuntimeScope, want)
	}
	if first.TransportGeneration != 0 {
		t.Fatalf("transport generation = %d, want 0", first.TransportGeneration)
	}
	if first.Direction != TransferDirectionSend {
		t.Fatalf("direction = %v, want %v", first.Direction, TransferDirectionSend)
	}
	if first.Channel != TransferChannelData {
		t.Fatalf("channel = %q, want %q", first.Channel, TransferChannelData)
	}
	if first.AckedBytes != int64(12) {
		t.Fatalf("acked bytes = %d, want %d", first.AckedBytes, int64(12))
	}
	if first.Stage != "end" {
		t.Fatalf("stage = %q, want %q", first.Stage, "end")
	}
	if first.LastFailureStage != "" {
		t.Fatalf("last failure stage = %q, want empty", first.LastFailureStage)
	}
	if path := first.Metadata["path"]; path != "/tmp/demo.bin" {
		t.Fatalf("metadata path = %q, want /tmp/demo.bin", path)
	}
	stamped := !first.StartedAt.IsZero() && !first.UpdatedAt.IsZero() && !first.CompletedAt.IsZero()
	if !stamped {
		t.Fatal("snapshot timestamps should be populated")
	}

	// Lookup by ID alone.
	byID, found, err := GetClientTransferSnapshotByID(client, "client-transfer")
	if err != nil {
		t.Fatalf("GetClientTransferSnapshotByID failed: %v", err)
	}
	if !found {
		t.Fatal("client transfer snapshot should exist")
	}
	if byID.ID != "client-transfer" {
		t.Fatalf("snapshot ID = %q, want %q", byID.ID, "client-transfer")
	}

	// Lookup by ID narrowed to the client file scope.
	scoped, found, err := GetClientTransferSnapshotByIDScope(client, "client-transfer", clientFileScope())
	if err != nil {
		t.Fatalf("GetClientTransferSnapshotByIDScope failed: %v", err)
	}
	if !found {
		t.Fatal("client transfer scoped snapshot should exist")
	}
	if want := clientFileScope(); scoped.Scope != want {
		t.Fatalf("scoped snapshot scope = %q, want %q", scoped.Scope, want)
	}

	// An ID that was never observed must yield (nil error, not found).
	_, found, err = GetClientTransferSnapshotByID(client, "missing-transfer")
	if err != nil || found {
		t.Fatalf("missing transfer lookup = (%v, %v), want (nil, false)", err, found)
	}
	_, found, err = GetClientTransferSnapshotByIDScope(client, "missing-transfer", clientFileScope())
	if err != nil || found {
		t.Fatalf("missing scoped transfer lookup = (%v, %v), want (nil, false)", err, found)
	}
}
// TestGetServerTransferSnapshotsAndByID feeds meta, chunk and end events for a
// single receive transfer into a fresh server's file transfer state, then
// checks the snapshot list and the by-ID and by-ID-and-scope lookups.
func TestGetServerTransferSnapshotsAndByID(t *testing.T) {
	server := NewServer()
	common := server.(*ServerCommon)
	start := time.Unix(1600, 0)
	packet := FilePacket{FileID: "server-transfer", Name: "recv.bin", Size: 7, Checksum: "sum-server"}

	// The events are observed one at a time in the order meta, chunk, end.
	events := []FileEvent{
		{Kind: EnvelopeFileMeta, Packet: packet, Time: start},
		{Kind: EnvelopeFileChunk, Packet: packet, Received: 7, Time: start.Add(time.Second)},
		{Kind: EnvelopeFileEnd, Packet: packet, Received: 7, Done: true, Time: start.Add(2 * time.Second)},
	}
	for _, event := range events {
		common.getFileTransferState().observe(fileTransferDirectionReceive, event)
	}

	all, err := GetServerTransferSnapshots(server)
	if err != nil {
		t.Fatalf("GetServerTransferSnapshots failed: %v", err)
	}
	if len(all) != 1 {
		t.Fatalf("snapshot count = %d, want %d", len(all), 1)
	}

	// Every remaining list assertion targets the only snapshot.
	first := all[0]
	if first.Direction != TransferDirectionReceive {
		t.Fatalf("direction = %v, want %v", first.Direction, TransferDirectionReceive)
	}
	if want := clientFileScope(); first.Scope != want {
		t.Fatalf("scope = %q, want %q", first.Scope, want)
	}
	if want := clientFileScope(); first.RuntimeScope != want {
		t.Fatalf("runtime scope = %q, want %q", first.RuntimeScope, want)
	}
	if first.TransportGeneration != 0 {
		t.Fatalf("transport generation = %d, want 0", first.TransportGeneration)
	}
	if first.ReceivedBytes != int64(7) {
		t.Fatalf("received bytes = %d, want %d", first.ReceivedBytes, int64(7))
	}
	if first.State != TransferStateDone {
		t.Fatalf("state = %v, want %v", first.State, TransferStateDone)
	}

	// Lookup by ID alone.
	byID, found, err := GetServerTransferSnapshotByID(server, "server-transfer")
	if err != nil {
		t.Fatalf("GetServerTransferSnapshotByID failed: %v", err)
	}
	if !found {
		t.Fatal("server transfer snapshot should exist")
	}
	if byID.ID != "server-transfer" {
		t.Fatalf("snapshot ID = %q, want %q", byID.ID, "server-transfer")
	}

	// Lookup by ID narrowed to the scope returned by clientFileScope.
	scoped, found, err := GetServerTransferSnapshotByIDScope(server, "server-transfer", clientFileScope())
	if err != nil {
		t.Fatalf("GetServerTransferSnapshotByIDScope failed: %v", err)
	}
	if !found {
		t.Fatal("server transfer scoped snapshot should exist")
	}
	if want := clientFileScope(); scoped.Scope != want {
		t.Fatalf("scoped snapshot scope = %q, want %q", scoped.Scope, want)
	}
}
// TestGetTransferSnapshotsRejectNil calls every snapshot accessor with a nil
// client or server and requires the matching sentinel error
// (errClientTransferSnapshotNil or errServerTransferSnapshotNil) each time.
func TestGetTransferSnapshotsRejectNil(t *testing.T) {
	var err error

	// List accessors.
	_, err = GetClientTransferSnapshots(nil)
	if !errors.Is(err, errClientTransferSnapshotNil) {
		t.Fatalf("GetClientTransferSnapshots nil error = %v, want %v", err, errClientTransferSnapshotNil)
	}
	_, err = GetServerTransferSnapshots(nil)
	if !errors.Is(err, errServerTransferSnapshotNil) {
		t.Fatalf("GetServerTransferSnapshots nil error = %v, want %v", err, errServerTransferSnapshotNil)
	}

	// Lookups by ID.
	_, _, err = GetClientTransferSnapshotByID(nil, "x")
	if !errors.Is(err, errClientTransferSnapshotNil) {
		t.Fatalf("GetClientTransferSnapshotByID nil error = %v, want %v", err, errClientTransferSnapshotNil)
	}
	_, _, err = GetServerTransferSnapshotByID(nil, "x")
	if !errors.Is(err, errServerTransferSnapshotNil) {
		t.Fatalf("GetServerTransferSnapshotByID nil error = %v, want %v", err, errServerTransferSnapshotNil)
	}

	// Lookups by ID and scope.
	_, _, err = GetClientTransferSnapshotByIDScope(nil, "x", "scope-a")
	if !errors.Is(err, errClientTransferSnapshotNil) {
		t.Fatalf("GetClientTransferSnapshotByIDScope nil error = %v, want %v", err, errClientTransferSnapshotNil)
	}
	_, _, err = GetServerTransferSnapshotByIDScope(nil, "x", "scope-a")
	if !errors.Is(err, errServerTransferSnapshotNil) {
		t.Fatalf("GetServerTransferSnapshotByIDScope nil error = %v, want %v", err, errServerTransferSnapshotNil)
	}

	// Lookups by ID and query.
	_, _, err = GetClientTransferSnapshotByIDQuery(nil, "x", TransferSnapshotQuery{})
	if !errors.Is(err, errClientTransferSnapshotNil) {
		t.Fatalf("GetClientTransferSnapshotByIDQuery nil error = %v, want %v", err, errClientTransferSnapshotNil)
	}
	_, _, err = GetServerTransferSnapshotByIDQuery(nil, "x", TransferSnapshotQuery{})
	if !errors.Is(err, errServerTransferSnapshotNil) {
		t.Fatalf("GetServerTransferSnapshotByIDQuery nil error = %v, want %v", err, errServerTransferSnapshotNil)
	}
}
// TestGetClientTransferSnapshotExposesFailureStage starts a runtime send
// session, records the "meta" stage, a timeout and a "meta" failure stage on
// it, observes an abort event carrying the error "meta timeout", and then
// checks that the snapshot reports the stage, failure stage and error text.
func TestGetClientTransferSnapshotExposesFailureStage(t *testing.T) {
	client := NewClient()
	state := client.(*ClientCommon).getFileTransferState()
	send := &fileSendSession{
		fileID:   "client-failure-stage",
		path:     "/tmp/failure.bin",
		name:     "failure.bin",
		size:     1,
		checksum: "sum-failure",
	}

	// Runtime bookkeeping that precedes the abort.
	state.startRuntimeSendSession(clientFileScope(), clientFileScope(), 0, send)
	state.recordRuntimeStage(fileTransferDirectionSend, clientFileScope(), send.fileID, "meta")
	state.recordRuntimeTimeout(fileTransferDirectionSend, clientFileScope(), send.fileID)
	state.recordRuntimeFailureStage(fileTransferDirectionSend, clientFileScope(), send.fileID, "meta")

	abort := FileEvent{
		Kind: EnvelopeFileAbort,
		Packet: FilePacket{
			FileID:   send.fileID,
			Name:     send.name,
			Size:     send.size,
			Checksum: send.checksum,
			Stage:    "meta",
		},
		Err:  errString("meta timeout"),
		Time: time.Unix(1700, 0),
	}
	state.observe(fileTransferDirectionSend, abort)

	got, found, err := GetClientTransferSnapshotByID(client, send.fileID)
	switch {
	case err != nil:
		t.Fatalf("GetClientTransferSnapshotByID failed: %v", err)
	case !found:
		t.Fatal("client transfer snapshot should exist")
	}
	if got.Stage != "meta" {
		t.Fatalf("stage = %q, want %q", got.Stage, "meta")
	}
	if got.LastFailureStage != "meta" {
		t.Fatalf("last failure stage = %q, want %q", got.LastFailureStage, "meta")
	}
	if got.LastError != "meta timeout" {
		t.Fatalf("last error = %q, want %q", got.LastError, "meta timeout")
	}
}
// TestGetClientTransferSnapshotByIDRejectsAmbiguousMatches observes a send and
// a receive meta event that share the file ID "shared-transfer", expects two
// snapshots in the list, and expects the by-ID lookup to return
// (nil error, not found).
func TestGetClientTransferSnapshotByIDRejectsAmbiguousMatches(t *testing.T) {
	client := NewClient()
	common := client.(*ClientCommon)
	base := time.Unix(1750, 0)

	sendMeta := FileEvent{
		Kind:   EnvelopeFileMeta,
		Packet: FilePacket{FileID: "shared-transfer", Name: "send.bin", Size: 4, Checksum: "sum-send"},
		Time:   base,
	}
	recvMeta := FileEvent{
		Kind:   EnvelopeFileMeta,
		Packet: FilePacket{FileID: "shared-transfer", Name: "recv.bin", Size: 6, Checksum: "sum-recv"},
		Time:   base.Add(time.Second),
	}
	common.getFileTransferState().observe(fileTransferDirectionSend, sendMeta)
	common.getFileTransferState().observe(fileTransferDirectionReceive, recvMeta)

	all, err := GetClientTransferSnapshots(client)
	if err != nil {
		t.Fatalf("GetClientTransferSnapshots failed: %v", err)
	}
	if len(all) != 2 {
		t.Fatalf("snapshot count = %d, want %d", len(all), 2)
	}

	// The shared ID matches both directions, so the lookup must not pick one.
	_, found, err := GetClientTransferSnapshotByID(client, "shared-transfer")
	if err != nil || found {
		t.Fatalf("GetClientTransferSnapshotByID ambiguous = (%v, %v), want (nil, false)", err, found)
	}
}
// TestGetClientTransferSnapshotByIDScopeResolvesScopedMatches seeds two send
// snapshots that share the ID "shared-transfer" under "scope-a" and "scope-b".
// The plain by-ID lookup must return (nil error, not found), while each scoped
// lookup must return the snapshot seeded for that scope. The "scope-a" lookup
// is made with surrounding spaces in the scope argument.
func TestGetClientTransferSnapshotByIDScopeResolvesScopedMatches(t *testing.T) {
	client := NewClient()
	runtime := client.(*ClientCommon).getTransferRuntime()

	metaA := map[string]string{"name": "a.bin"}
	metaB := map[string]string{"name": "b.bin"}
	seedTransferRuntimeSnapshot(runtime, fileTransferDirectionSend, "scope-a", "shared-transfer", itransfer.DataChannel, 4, metaA)
	seedTransferRuntimeSnapshot(runtime, fileTransferDirectionSend, "scope-b", "shared-transfer", itransfer.ControlChannel, 8, metaB)

	_, found, err := GetClientTransferSnapshotByID(client, "shared-transfer")
	if err != nil || found {
		t.Fatalf("GetClientTransferSnapshotByID ambiguous = (%v, %v), want (nil, false)", err, found)
	}

	// Scope "scope-a", passed with leading and trailing spaces.
	inA, found, err := GetClientTransferSnapshotByIDScope(client, "shared-transfer", " scope-a ")
	switch {
	case err != nil:
		t.Fatalf("GetClientTransferSnapshotByIDScope scope-a failed: %v", err)
	case !found:
		t.Fatal("scope-a snapshot should exist")
	}
	if inA.Scope != "scope-a" {
		t.Fatalf("scope-a snapshot scope = %q, want %q", inA.Scope, "scope-a")
	}
	if name := inA.Metadata["name"]; name != "a.bin" {
		t.Fatalf("scope-a snapshot metadata[name] = %q, want %q", name, "a.bin")
	}
	if inA.Channel != TransferChannelData {
		t.Fatalf("scope-a snapshot channel = %q, want %q", inA.Channel, TransferChannelData)
	}

	// Scope "scope-b".
	inB, found, err := GetClientTransferSnapshotByIDScope(client, "shared-transfer", "scope-b")
	switch {
	case err != nil:
		t.Fatalf("GetClientTransferSnapshotByIDScope scope-b failed: %v", err)
	case !found:
		t.Fatal("scope-b snapshot should exist")
	}
	if inB.Scope != "scope-b" {
		t.Fatalf("scope-b snapshot scope = %q, want %q", inB.Scope, "scope-b")
	}
	if name := inB.Metadata["name"]; name != "b.bin" {
		t.Fatalf("scope-b snapshot metadata[name] = %q, want %q", name, "b.bin")
	}
	if inB.Channel != TransferChannelControl {
		t.Fatalf("scope-b snapshot channel = %q, want %q", inB.Channel, TransferChannelControl)
	}
}
// TestGetClientTransferSnapshotByIDScopeRejectsDirectionAmbiguity seeds a send
// and a receive snapshot with the same ID and the same scope, and expects the
// scoped lookup to return (nil error, not found).
func TestGetClientTransferSnapshotByIDScopeRejectsDirectionAmbiguity(t *testing.T) {
	client := NewClient()
	runtime := client.(*ClientCommon).getTransferRuntime()

	sendMeta := map[string]string{"name": "send.bin"}
	recvMeta := map[string]string{"name": "recv.bin"}
	seedTransferRuntimeSnapshot(runtime, fileTransferDirectionSend, "shared-scope", "shared-transfer", itransfer.DataChannel, 4, sendMeta)
	seedTransferRuntimeSnapshot(runtime, fileTransferDirectionReceive, "shared-scope", "shared-transfer", itransfer.DataChannel, 4, recvMeta)

	_, found, err := GetClientTransferSnapshotByIDScope(client, "shared-transfer", "shared-scope")
	if err != nil || found {
		t.Fatalf("GetClientTransferSnapshotByIDScope ambiguous = (%v, %v), want (nil, false)", err, found)
	}
}
// TestGetServerTransferSnapshotByIDQueryResolvesTransportGeneration seeds two
// receive snapshots that share the ID "shared-transfer" and the public scope
// "peer" but differ in runtime scope ("peer-gen-1", "peer-gen-2") and
// transport generation (1, 2). Lookups by ID alone, by ID and scope, and by a
// query holding only the scope must return (nil error, not found); a query on
// the runtime scope or on scope plus generation must return the seeded match.
func TestGetServerTransferSnapshotByIDQueryResolvesTransportGeneration(t *testing.T) {
	server := NewServer()
	runtime := server.(*ServerCommon).getTransferRuntime()

	gen1Meta := map[string]string{"name": "gen1.bin"}
	gen2Meta := map[string]string{"name": "gen2.bin"}
	seedTransferRuntimeSnapshotWithBinding(runtime, fileTransferDirectionReceive, "peer-gen-1", "peer", 1, "shared-transfer", itransfer.DataChannel, 4, gen1Meta)
	seedTransferRuntimeSnapshotWithBinding(runtime, fileTransferDirectionReceive, "peer-gen-2", "peer", 2, "shared-transfer", itransfer.DataChannel, 8, gen2Meta)

	// Neither the ID alone nor ID plus public scope singles out a snapshot.
	_, found, err := GetServerTransferSnapshotByID(server, "shared-transfer")
	if err != nil || found {
		t.Fatalf("GetServerTransferSnapshotByID ambiguous = (%v, %v), want (nil, false)", err, found)
	}
	_, found, err = GetServerTransferSnapshotByIDScope(server, "shared-transfer", "peer")
	if err != nil || found {
		t.Fatalf("GetServerTransferSnapshotByIDScope ambiguous = (%v, %v), want (nil, false)", err, found)
	}

	// Query on the runtime scope only.
	runtimeQuery := TransferSnapshotQuery{RuntimeScope: "peer-gen-2"}
	runtimeMatch, found, err := GetServerTransferSnapshotByIDQuery(server, "shared-transfer", runtimeQuery)
	switch {
	case err != nil:
		t.Fatalf("GetServerTransferSnapshotByIDQuery runtime scope failed: %v", err)
	case !found:
		t.Fatal("runtime-scoped snapshot should exist")
	}
	if name := runtimeMatch.Metadata["name"]; name != "gen2.bin" {
		t.Fatalf("runtime-scoped snapshot metadata[name] = %q, want %q", name, "gen2.bin")
	}

	// Query on public scope plus an explicitly matched transport generation.
	generationQuery := TransferSnapshotQuery{
		Scope:                    "peer",
		TransportGeneration:      1,
		MatchTransportGeneration: true,
	}
	generationMatch, found, err := GetServerTransferSnapshotByIDQuery(server, "shared-transfer", generationQuery)
	switch {
	case err != nil:
		t.Fatalf("GetServerTransferSnapshotByIDQuery generation failed: %v", err)
	case !found:
		t.Fatal("generation-scoped snapshot should exist")
	}
	if name := generationMatch.Metadata["name"]; name != "gen1.bin" {
		t.Fatalf("generation-scoped snapshot metadata[name] = %q, want %q", name, "gen1.bin")
	}

	// A query holding only the public scope still matches both snapshots.
	scopeOnlyQuery := TransferSnapshotQuery{Scope: "peer"}
	_, found, err = GetServerTransferSnapshotByIDQuery(server, "shared-transfer", scopeOnlyQuery)
	if err != nil || found {
		t.Fatalf("GetServerTransferSnapshotByIDQuery ambiguous filter = (%v, %v), want (nil, false)", err, found)
	}
}
// TestTransferSnapshotTelemetrySummarySend builds a send-direction snapshot
// with 2048 sent and acked bytes and fixed read, write and commit-wait
// durations, then checks the byte counts, durations, throughputs and
// commit-wait ratio reported by TelemetrySummary.
func TestTransferSnapshotTelemetrySummarySend(t *testing.T) {
	const (
		payload   = 2048
		wantRatio = 1.0 / 7.0
	)
	snapshot := TransferSnapshot{
		Direction:           TransferDirectionSend,
		SentBytes:           payload,
		AckedBytes:          payload,
		SourceReadDuration:  200 * time.Millisecond,
		StreamWriteDuration: 400 * time.Millisecond,
		CommitWaitDuration:  100 * time.Millisecond,
	}
	summary := snapshot.TelemetrySummary()

	// Byte counters.
	if summary.SourceReadBytes != int64(payload) {
		t.Fatalf("source read bytes = %d, want %d", summary.SourceReadBytes, int64(payload))
	}
	if summary.StreamWriteBytes != int64(payload) {
		t.Fatalf("stream write bytes = %d, want %d", summary.StreamWriteBytes, int64(payload))
	}
	if summary.SinkWriteBytes != 0 {
		t.Fatalf("sink write bytes = %d, want 0", summary.SinkWriteBytes)
	}

	// Durations.
	if summary.WorkDuration != 600*time.Millisecond {
		t.Fatalf("work duration = %v, want %v", summary.WorkDuration, 600*time.Millisecond)
	}
	if summary.ObservedDuration != 700*time.Millisecond {
		t.Fatalf("observed duration = %v, want %v", summary.ObservedDuration, 700*time.Millisecond)
	}

	// Derived floating-point values are compared within a tolerance.
	if diff := math.Abs(summary.SourceReadThroughputBPS - 10240.0); diff > 0.001 {
		t.Fatalf("source throughput = %f, want %f", summary.SourceReadThroughputBPS, 10240.0)
	}
	if diff := math.Abs(summary.StreamWriteThroughputBPS - 5120.0); diff > 0.001 {
		t.Fatalf("stream throughput = %f, want %f", summary.StreamWriteThroughputBPS, 5120.0)
	}
	if diff := math.Abs(summary.CommitWaitRatio - wantRatio); diff > 0.000001 {
		t.Fatalf("commit wait ratio = %f, want %f", summary.CommitWaitRatio, wantRatio)
	}
}
// TestTransferSnapshotTelemetrySummaryReceive builds a receive-direction
// snapshot with 4096 received bytes and fixed sink-write, sync, verify and
// commit durations, then checks the byte counts, durations, sink throughput
// and commit-wait ratio reported by TelemetrySummary.
func TestTransferSnapshotTelemetrySummaryReceive(t *testing.T) {
	const payload = 4096
	snapshot := TransferSnapshot{
		Direction:         TransferDirectionReceive,
		ReceivedBytes:     payload,
		SinkWriteDuration: 500 * time.Millisecond,
		SyncDuration:      200 * time.Millisecond,
		VerifyDuration:    100 * time.Millisecond,
		CommitDuration:    300 * time.Millisecond,
	}
	summary := snapshot.TelemetrySummary()

	// Byte counters: only the sink counter is expected to be non-zero.
	if summary.SourceReadBytes != 0 {
		t.Fatalf("source read bytes = %d, want 0", summary.SourceReadBytes)
	}
	if summary.StreamWriteBytes != 0 {
		t.Fatalf("stream write bytes = %d, want 0", summary.StreamWriteBytes)
	}
	if summary.SinkWriteBytes != int64(payload) {
		t.Fatalf("sink write bytes = %d, want %d", summary.SinkWriteBytes, int64(payload))
	}

	// Durations.
	if summary.WorkDuration != 1100*time.Millisecond {
		t.Fatalf("work duration = %v, want %v", summary.WorkDuration, 1100*time.Millisecond)
	}
	if summary.ObservedDuration != 1100*time.Millisecond {
		t.Fatalf("observed duration = %v, want %v", summary.ObservedDuration, 1100*time.Millisecond)
	}

	// Derived values.
	if diff := math.Abs(summary.SinkWriteThroughputBPS - 8192.0); diff > 0.001 {
		t.Fatalf("sink throughput = %f, want %f", summary.SinkWriteThroughputBPS, 8192.0)
	}
	if summary.CommitWaitRatio != 0 {
		t.Fatalf("commit wait ratio = %f, want 0", summary.CommitWaitRatio)
	}
}
// TestTransferSnapshotTelemetrySummaryHandlesZeroDurations summarises a
// snapshot that has byte counts but no durations set, and requires every
// throughput and the commit-wait ratio to be exactly zero.
func TestTransferSnapshotTelemetrySummaryHandlesZeroDurations(t *testing.T) {
	snapshot := TransferSnapshot{SentBytes: 128, ReceivedBytes: 256}
	summary := snapshot.TelemetrySummary()

	throughputsZero := summary.SourceReadThroughputBPS == 0 &&
		summary.StreamWriteThroughputBPS == 0 &&
		summary.SinkWriteThroughputBPS == 0
	if !throughputsZero {
		t.Fatalf("throughputs with zero duration should be zero: %+v", summary)
	}
	if ratio := summary.CommitWaitRatio; ratio != 0 {
		t.Fatalf("commit wait ratio with zero observed duration = %f, want 0", ratio)
	}
}
// seedTransferRuntimeSnapshot seeds a transfer through
// seedTransferRuntimeSnapshotWithBinding, passing scope as both the runtime
// scope and the public scope, and 0 as the transport generation.
func seedTransferRuntimeSnapshot(runtime *transferRuntime, direction fileTransferDirection, scope string, transferID string, channel itransfer.Channel, size int64, metadata map[string]string) {
	const generation uint64 = 0
	runtimeScope, publicScope := scope, scope
	seedTransferRuntimeSnapshotWithBinding(runtime, direction, runtimeScope, publicScope, generation, transferID, channel, size, metadata)
}
// seedTransferRuntimeSnapshotWithBinding registers a transfer descriptor on
// runtime under runtimeScope, publicScope and transportGeneration, records the
// "seed" stage, records size bytes as received (receive direction) or as sent
// and acked (any other direction), and finally marks the transfer complete.
// It does nothing when runtime is nil.
func seedTransferRuntimeSnapshotWithBinding(runtime *transferRuntime, direction fileTransferDirection, runtimeScope string, publicScope string, transportGeneration uint64, transferID string, channel itransfer.Channel, size int64, metadata map[string]string) {
	if runtime == nil {
		return
	}

	runtime.ensureTransferDescriptor(direction, runtimeScope, publicScope, transportGeneration, itransfer.Descriptor{
		ID:       transferID,
		Channel:  channel,
		Size:     size,
		Checksum: "seed-checksum",
		Metadata: itransfer.Metadata(metadata),
	})
	runtime.recordStage(direction, runtimeScope, transferID, "seed")

	// Account for the whole payload on whichever side this direction uses.
	if direction == fileTransferDirectionReceive {
		runtime.recordReceive(direction, runtimeScope, transferID, size)
	} else {
		runtime.recordSend(direction, runtimeScope, transferID, size)
		runtime.setAckedBytes(direction, runtimeScope, transferID, size)
	}

	runtime.complete(direction, runtimeScope, transferID)
}