package notify import ( "context" "errors" "io" "net" "strings" "testing" "time" ) func TestBulkOpenRoundTripTCP(t *testing.T) { server := NewServer().(*ServerCommon) if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKServer failed: %v", err) } acceptCh := make(chan BulkAcceptInfo, 1) server.SetBulkHandler(func(info BulkAcceptInfo) error { acceptCh <- info return nil }) if err := server.Listen("tcp", "127.0.0.1:0"); err != nil { t.Fatalf("server Listen failed: %v", err) } defer func() { _ = server.Stop() }() client := NewClient().(*ClientCommon) if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKClient failed: %v", err) } if err := client.Connect("tcp", server.listener.Addr().String()); err != nil { t.Fatalf("client Connect failed: %v", err) } defer func() { _ = client.Stop() }() bulk, err := client.OpenBulk(context.Background(), BulkOpenOptions{ Range: BulkRange{ Offset: 128, Length: 4096, }, Metadata: BulkMetadata{ "name": "demo.bin", }, ChunkSize: 32 * 1024, }) if err != nil { t.Fatalf("client OpenBulk failed: %v", err) } accepted := waitAcceptedBulk(t, acceptCh, 2*time.Second) if accepted.ID != bulk.ID() { t.Fatalf("accepted bulk id mismatch: got %q want %q", accepted.ID, bulk.ID()) } if accepted.Range != (BulkRange{Offset: 128, Length: 4096}) { t.Fatalf("accepted range mismatch: %+v", accepted.Range) } if accepted.Metadata["name"] != "demo.bin" { t.Fatalf("accepted metadata mismatch: %+v", accepted.Metadata) } if accepted.LogicalConn == nil { t.Fatal("accepted logical connection should not be nil") } if accepted.TransportConn == nil { t.Fatal("accepted transport connection should not be nil") } clientHandle, ok := bulk.(*bulkHandle) if !ok { t.Fatalf("bulk type = %T, want *bulkHandle", bulk) } serverHandle, ok := accepted.Bulk.(*bulkHandle) if !ok { t.Fatalf("accepted bulk type = %T, want *bulkHandle", accepted.Bulk) } if clientHandle.dataIDSnapshot() == 0 { t.Fatal("client bulk data id should not be zero") } if got, want := serverHandle.dataIDSnapshot(), clientHandle.dataIDSnapshot(); got != want { t.Fatalf("accepted bulk data id = %d, want %d", got, want) } if _, err := bulk.Write([]byte("hello-from-client")); err != nil { t.Fatalf("client bulk Write failed: %v", err) } readBulkExactly(t, accepted.Bulk, "hello-from-client", 2*time.Second) if _, err := accepted.Bulk.Write([]byte("hello-from-server")); err != nil { t.Fatalf("server bulk Write failed: %v", err) } readBulkExactly(t, bulk, "hello-from-server", 2*time.Second) clientSnapshots, err := GetClientBulkSnapshots(client) if err != nil { t.Fatalf("GetClientBulkSnapshots failed: %v", err) } if len(clientSnapshots) != 1 || clientSnapshots[0].ID != bulk.ID() { t.Fatalf("client bulk snapshots mismatch: %+v", clientSnapshots) } if got, want := clientSnapshots[0].BindingOwner, "client-session"; got != want { t.Fatalf("client bulk BindingOwner = %q, want %q", got, want) } if !clientSnapshots[0].BindingAlive || !clientSnapshots[0].BindingCurrent || !clientSnapshots[0].TransportAttached || !clientSnapshots[0].TransportCurrent { t.Fatalf("client bulk binding snapshot mismatch: %+v", clientSnapshots[0]) } if got, want := clientSnapshots[0].BindingBulkAdaptiveSoftPayloadBytes, bulkAdaptiveSoftPayloadStartBytes; got != want { t.Fatalf("client bulk BindingBulkAdaptiveSoftPayloadBytes = %d, want %d", got, want) } serverSnapshots, err := GetServerBulkSnapshots(server) if err != nil { t.Fatalf("GetServerBulkSnapshots failed: %v", err) } if len(serverSnapshots) != 1 || serverSnapshots[0].ID != bulk.ID() { t.Fatalf("server bulk snapshots mismatch: %+v", serverSnapshots) } if got, want := serverSnapshots[0].BindingOwner, "server-transport"; got != want { t.Fatalf("server bulk BindingOwner = %q, want %q", got, want) } if got, want := serverSnapshots[0].BindingBulkAdaptiveSoftPayloadBytes, bulkAdaptiveSoftPayloadStartBytes; got != want { t.Fatalf("server bulk BindingBulkAdaptiveSoftPayloadBytes = %d, want %d", got, want) } if !serverSnapshots[0].BindingAlive || !serverSnapshots[0].BindingCurrent || !serverSnapshots[0].TransportAttached || !serverSnapshots[0].TransportCurrent { t.Fatalf("server bulk binding snapshot mismatch: %+v", serverSnapshots[0]) } if err := bulk.CloseWrite(); err != nil { t.Fatalf("client bulk CloseWrite failed: %v", err) } waitForBulkReadEOF(t, accepted.Bulk, 2*time.Second) if err := accepted.Bulk.Close(); err != nil { t.Fatalf("server bulk Close failed: %v", err) } waitForBulkReadEOF(t, bulk, 2*time.Second) waitForBulkContextDone(t, bulk.Context(), 2*time.Second) } func TestDedicatedBulkOpenUnblocksSynchronousReadHandler(t *testing.T) { server := NewServer().(*ServerCommon) if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKServer failed: %v", err) } readCh := make(chan string, 1) server.SetBulkHandler(func(info BulkAcceptInfo) error { defer func() { _ = info.Bulk.Close() }() buf := make([]byte, 5) if _, err := io.ReadFull(info.Bulk, buf); err != nil { return err } readCh <- string(buf) return nil }) if err := server.Listen("tcp", "127.0.0.1:0"); err != nil { t.Fatalf("server Listen failed: %v", err) } defer func() { _ = server.Stop() }() client := NewClient().(*ClientCommon) if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKClient failed: %v", err) } if err := client.Connect("tcp", server.listener.Addr().String()); err != nil { t.Fatalf("client Connect failed: %v", err) } defer func() { _ = client.Stop() }() type openResult struct { bulk Bulk err error } openCh := make(chan openResult, 1) go func() { bulk, err := client.OpenDedicatedBulk(context.Background(), BulkOpenOptions{ ID: "sync-read-handler", Range: BulkRange{ Offset: 0, Length: 5, }, }) openCh <- openResult{bulk: bulk, err: err} }() var bulk Bulk select { case result := <-openCh: if result.err != nil { t.Fatalf("client OpenDedicatedBulk failed: %v", result.err) } bulk = result.bulk case <-time.After(2 * time.Second): t.Fatal("client OpenDedicatedBulk timed out while remote handler was synchronously reading") } defer func() { if bulk != nil { _ = bulk.Close() } }() if _, err := bulk.Write([]byte("hello")); err != nil { t.Fatalf("client dedicated bulk Write failed: %v", err) } select { case got := <-readCh: if got != "hello" { t.Fatalf("server handler read %q, want %q", got, "hello") } case <-time.After(2 * time.Second): t.Fatal("timed out waiting for synchronous handler read") } } func TestDedicatedBulkOpenUnblocksOnBlockingFirstWrite(t *testing.T) { server := NewServer().(*ServerCommon) if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKServer failed: %v", err) } payload := strings.Repeat("w", 16) writeDone := make(chan error, 1) server.SetBulkHandler(func(info BulkAcceptInfo) error { _, err := io.WriteString(info.Bulk, payload) if err == nil { err = info.Bulk.CloseWrite() } writeDone <- err return err }) if err := server.Listen("tcp", "127.0.0.1:0"); err != nil { t.Fatalf("server Listen failed: %v", err) } defer func() { _ = server.Stop() }() client := NewClient().(*ClientCommon) if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKClient failed: %v", err) } if err := client.Connect("tcp", server.listener.Addr().String()); err != nil { t.Fatalf("client Connect failed: %v", err) } defer func() { _ = client.Stop() }() ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() bulk, err := client.OpenDedicatedBulk(ctx, BulkOpenOptions{ ID: "blocking-write-ready", Range: BulkRange{ Offset: 0, Length: int64(len(payload)), }, ChunkSize: 4, WindowBytes: 4, MaxInFlight: 1, }) if err != nil { t.Fatalf("client OpenDedicatedBulk failed: %v", err) } readBulkExactly(t, bulk, payload, 2*time.Second) waitForBulkReadEOF(t, bulk, 2*time.Second) select { case err := <-writeDone: if err != nil { t.Fatalf("server handler write failed: %v", err) } case <-time.After(2 * time.Second): t.Fatal("timed out waiting for blocking first write to finish") } if err := bulk.Close(); err != nil { t.Fatalf("client dedicated bulk Close failed: %v", err) } waitForBulkContextDone(t, bulk.Context(), 2*time.Second) } func TestServerOpenBulkLogicalDedicatedUnblocksOnBlockingFirstRead(t *testing.T) { server := NewServer().(*ServerCommon) if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKServer failed: %v", err) } if err := server.Listen("tcp", "127.0.0.1:0"); err != nil { t.Fatalf("server Listen failed: %v", err) } defer func() { _ = server.Stop() }() client := NewClient().(*ClientCommon) if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKClient failed: %v", err) } readCh := make(chan string, 1) client.SetBulkHandler(func(info BulkAcceptInfo) error { defer func() { _ = info.Bulk.Close() }() buf := make([]byte, 5) if _, err := io.ReadFull(info.Bulk, buf); err != nil { return err } readCh <- string(buf) return nil }) if err := client.Connect("tcp", server.listener.Addr().String()); err != nil { t.Fatalf("client Connect failed: %v", err) } defer func() { _ = client.Stop() }() logical := waitForTransferControlLogicalConn(t, server, 2*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() bulk, err := server.OpenBulkLogical(ctx, logical, BulkOpenOptions{ ID: "server-blocking-read-ready", Range: BulkRange{ Offset: 0, Length: 5, }, Dedicated: true, }) if err != nil { t.Fatalf("server OpenBulkLogical dedicated failed: %v", err) } if _, err := bulk.Write([]byte("hello")); err != nil { t.Fatalf("server dedicated bulk Write failed: %v", err) } select { case got := <-readCh: if got != "hello" { t.Fatalf("client handler read %q, want %q", got, "hello") } case <-time.After(2 * time.Second): t.Fatal("timed out waiting for blocking first read to finish") } if err := bulk.Close(); err != nil { t.Fatalf("server dedicated bulk Close failed: %v", err) } waitForBulkContextDone(t, bulk.Context(), 2*time.Second) } func TestDedicatedBulkOpenReturnsHandlerFailureAfterAccepted(t *testing.T) { server := NewServer().(*ServerCommon) if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKServer failed: %v", err) } server.SetBulkHandler(func(info BulkAcceptInfo) error { time.Sleep(80 * time.Millisecond) return errors.New("dedicated handler failed after accept") }) if err := server.Listen("tcp", "127.0.0.1:0"); err != nil { t.Fatalf("server Listen failed: %v", err) } defer func() { _ = server.Stop() }() client := NewClient().(*ClientCommon) if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKClient failed: %v", err) } if err := client.Connect("tcp", server.listener.Addr().String()); err != nil { t.Fatalf("client Connect failed: %v", err) } defer func() { _ = client.Stop() }() ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() _, err := client.OpenDedicatedBulk(ctx, BulkOpenOptions{ ID: "accepted-then-fail", Range: BulkRange{ Offset: 0, Length: 1, }, }) if err == nil || !strings.Contains(err.Error(), "dedicated handler failed after accept") { t.Fatalf("client OpenDedicatedBulk error = %v, want dedicated handler failure after accept", err) } } func TestServerOpenBulkLogicalDedicatedReturnsHandlerFailureAfterAccepted(t *testing.T) { server := NewServer().(*ServerCommon) if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKServer failed: %v", err) } if err := server.Listen("tcp", "127.0.0.1:0"); err != nil { t.Fatalf("server Listen failed: %v", err) } defer func() { _ = server.Stop() }() client := NewClient().(*ClientCommon) if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKClient failed: %v", err) } client.SetBulkHandler(func(info BulkAcceptInfo) error { time.Sleep(80 * time.Millisecond) return errors.New("client dedicated handler failed after accept") }) if err := client.Connect("tcp", server.listener.Addr().String()); err != nil { t.Fatalf("client Connect failed: %v", err) } defer func() { _ = client.Stop() }() logical := waitForTransferControlLogicalConn(t, server, 2*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() _, err := server.OpenBulkLogical(ctx, logical, BulkOpenOptions{ ID: "server-accepted-then-fail", Range: BulkRange{ Offset: 0, Length: 1, }, Dedicated: true, }) if err == nil || !strings.Contains(err.Error(), "client dedicated handler failed after accept") { t.Fatalf("server OpenBulkLogical dedicated error = %v, want client dedicated handler failure after accept", err) } } func TestBulkOpenRoundTripServerLogicalTCP(t *testing.T) { server := NewServer().(*ServerCommon) if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKServer failed: %v", err) } if err := server.Listen("tcp", "127.0.0.1:0"); err != nil { t.Fatalf("server Listen failed: %v", err) } defer func() { _ = server.Stop() }() client := NewClient().(*ClientCommon) if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKClient failed: %v", err) } acceptCh := make(chan BulkAcceptInfo, 1) client.SetBulkHandler(func(info BulkAcceptInfo) error { acceptCh <- info return nil }) if err := client.Connect("tcp", server.listener.Addr().String()); err != nil { t.Fatalf("client Connect failed: %v", err) } defer func() { _ = client.Stop() }() logical := waitForTransferControlLogicalConn(t, server, 2*time.Second) bulk, err := server.OpenBulkLogical(context.Background(), logical, BulkOpenOptions{ Range: BulkRange{ Offset: 4096, Length: 8192, }, Metadata: BulkMetadata{ "purpose": "server-open", }, ChunkSize: 64 * 1024, }) if err != nil { t.Fatalf("server OpenBulkLogical failed: %v", err) } accepted := waitAcceptedBulk(t, acceptCh, 2*time.Second) if accepted.ID != bulk.ID() { t.Fatalf("accepted bulk id mismatch: got %q want %q", accepted.ID, bulk.ID()) } if accepted.Range != (BulkRange{Offset: 4096, Length: 8192}) { t.Fatalf("accepted range mismatch: %+v", accepted.Range) } if accepted.Metadata["purpose"] != "server-open" { t.Fatalf("accepted metadata mismatch: %+v", accepted.Metadata) } if accepted.LogicalConn != nil { t.Fatalf("client accepted logical connection should be nil: %+v", accepted.LogicalConn) } serverHandle, ok := bulk.(*bulkHandle) if !ok { t.Fatalf("bulk type = %T, want *bulkHandle", bulk) } clientHandle, ok := accepted.Bulk.(*bulkHandle) if !ok { t.Fatalf("accepted bulk type = %T, want *bulkHandle", accepted.Bulk) } if serverHandle.dataIDSnapshot() == 0 { t.Fatal("server bulk data id should not be zero") } if got, want := clientHandle.dataIDSnapshot(), serverHandle.dataIDSnapshot(); got != want { t.Fatalf("client accepted bulk data id = %d, want %d", got, want) } if _, err := bulk.Write([]byte("server-opened")); err != nil { t.Fatalf("server bulk Write failed: %v", err) } readBulkExactly(t, accepted.Bulk, "server-opened", 2*time.Second) if _, err := accepted.Bulk.Write([]byte("client-accepted")); err != nil { t.Fatalf("client bulk Write failed: %v", err) } readBulkExactly(t, bulk, "client-accepted", 2*time.Second) if err := bulk.CloseWrite(); err != nil { t.Fatalf("server bulk CloseWrite failed: %v", err) } waitForBulkReadEOF(t, accepted.Bulk, 2*time.Second) if err := accepted.Bulk.Close(); err != nil { t.Fatalf("client accepted bulk Close failed: %v", err) } waitForBulkReadEOF(t, bulk, 2*time.Second) waitForBulkContextDone(t, bulk.Context(), 2*time.Second) } func TestBulkSnapshotIncludesDetachedBindingDiagnostics(t *testing.T) { server := NewServer().(*ServerCommon) left, right := net.Pipe() defer right.Close() logical := server.bootstrapAcceptedLogical("bulk-snapshot-detach", nil, left) if logical == nil { t.Fatal("bootstrapAcceptedLogical should return logical") } transport := logical.CurrentTransportConn() if transport == nil { t.Fatal("CurrentTransportConn should return active transport") } bulk := newBulkHandle(context.Background(), newBulkRuntime("snapshot-detach"), serverFileScope(logical), BulkOpenRequest{ BulkID: "bulk-snapshot-detach", DataID: 1, Range: BulkRange{ Length: 1, }, }, 0, logical, transport, transport.TransportGeneration(), nil, nil, nil, nil, nil) server.detachLogicalSessionTransport(logical, "heartbeat timeout", nil) snapshot := bulk.snapshot() if got, want := snapshot.BindingOwner, "server-transport"; got != want { t.Fatalf("snapshot BindingOwner = %q, want %q", got, want) } if snapshot.BindingCurrent { t.Fatalf("snapshot BindingCurrent should be false after detach: %+v", snapshot) } if snapshot.TransportAttached { t.Fatalf("snapshot TransportAttached should be false after detach: %+v", snapshot) } if snapshot.TransportCurrent { t.Fatalf("snapshot TransportCurrent should be false after detach: %+v", snapshot) } if got, want := snapshot.TransportDetachReason, "heartbeat timeout"; got != want { t.Fatalf("snapshot TransportDetachReason = %q, want %q", got, want) } if got, want := snapshot.TransportDetachKind, clientConnTransportDetachKindHeartbeatTimeout; got != want { t.Fatalf("snapshot TransportDetachKind = %q, want %q", got, want) } } func TestServerDetachLogicalSessionTransportResetsScopedBulks(t *testing.T) { server := NewServer().(*ServerCommon) left, right := net.Pipe() defer left.Close() defer right.Close() logical := server.bootstrapAcceptedLogical("bulk-detach-runtime", nil, left) if logical == nil { t.Fatal("bootstrapAcceptedLogical should return logical") } defer server.stopLogicalSession(logical, "test cleanup", nil) transport := logical.CurrentTransportConn() if transport == nil { t.Fatal("CurrentTransportConn should return active transport") } scope := serverFileScope(logical) bulk := newBulkHandle(context.Background(), server.getBulkRuntime(), scope, BulkOpenRequest{ BulkID: "bulk-detach-runtime", DataID: 1, Range: BulkRange{Length: 1}, }, 0, logical, transport, transport.TransportGeneration(), nil, nil, nil, nil, nil) if err := server.getBulkRuntime().register(scope, bulk); err != nil { t.Fatalf("register bulk failed: %v", err) } server.detachLogicalSessionTransport(logical, "read error", errors.New("boom")) if err := readBulkError(t, bulk, time.Second); !errors.Is(err, errTransportDetached) { t.Fatalf("detached bulk read error = %v, want %v", err, errTransportDetached) } if _, ok := server.getBulkRuntime().lookup(scope, bulk.ID()); ok { t.Fatal("detached bulk should be removed from runtime") } } func TestBulkOpenRoundTripDedicatedTCP(t *testing.T) { server := NewServer().(*ServerCommon) if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKServer failed: %v", err) } server.SetLink("bulk-dedicated-ping", func(msg *Message) { _ = msg.Reply([]byte("pong")) }) acceptCh := make(chan BulkAcceptInfo, 1) server.SetBulkHandler(func(info BulkAcceptInfo) error { acceptCh <- info return nil }) if err := server.Listen("tcp", "127.0.0.1:0"); err != nil { t.Fatalf("server Listen failed: %v", err) } defer func() { _ = server.Stop() }() client := NewClient().(*ClientCommon) if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKClient failed: %v", err) } if err := client.Connect("tcp", server.listener.Addr().String()); err != nil { t.Fatalf("client Connect failed: %v", err) } defer func() { _ = client.Stop() }() bulk, err := client.OpenBulk(context.Background(), BulkOpenOptions{ Range: BulkRange{ Offset: 1024, Length: 8192, }, Metadata: BulkMetadata{ "name": "dedicated.bin", }, Dedicated: true, ChunkSize: 32 * 1024, }) if err != nil { t.Fatalf("client OpenBulk dedicated failed: %v", err) } accepted := waitAcceptedBulk(t, acceptCh, 2*time.Second) if !accepted.Dedicated { t.Fatal("accepted dedicated flag should be true") } if !bulk.(*bulkHandle).Dedicated() { t.Fatal("client bulk dedicated flag should be true") } clientSnapshots, err := GetClientBulkSnapshots(client) if err != nil { t.Fatalf("GetClientBulkSnapshots failed: %v", err) } if len(clientSnapshots) != 1 || !clientSnapshots[0].Dedicated || !clientSnapshots[0].DedicatedAttached { t.Fatalf("client dedicated bulk snapshots mismatch: %+v", clientSnapshots) } if _, err := bulk.Write([]byte("hello-from-dedicated-client")); err != nil { t.Fatalf("client dedicated bulk Write failed: %v", err) } readBulkExactly(t, accepted.Bulk, "hello-from-dedicated-client", 2*time.Second) if _, err := accepted.Bulk.Write([]byte("hello-from-dedicated-server")); err != nil { t.Fatalf("server dedicated bulk Write failed: %v", err) } readBulkExactly(t, bulk, "hello-from-dedicated-server", 2*time.Second) reply, err := client.SendWait("bulk-dedicated-ping", []byte("ping"), 2*time.Second) if err != nil { t.Fatalf("client SendWait after dedicated bulk failed: %v", err) } if got, want := string(reply.Value), "pong"; got != want { t.Fatalf("SendWait reply mismatch: got %q want %q", got, want) } if err := bulk.CloseWrite(); err != nil { t.Fatalf("client dedicated bulk CloseWrite failed: %v", err) } waitForBulkReadEOF(t, accepted.Bulk, 2*time.Second) if err := accepted.Bulk.Close(); err != nil { t.Fatalf("server dedicated bulk Close failed: %v", err) } waitForBulkReadEOF(t, bulk, 2*time.Second) waitForBulkContextDone(t, bulk.Context(), 2*time.Second) waitForBulkContextDone(t, accepted.Bulk.Context(), 2*time.Second) reply, err = client.SendWait("bulk-dedicated-ping", []byte("ping-after-close"), 2*time.Second) if err != nil { t.Fatalf("client SendWait after dedicated bulk close failed: %v", err) } if got, want := string(reply.Value), "pong"; got != want { t.Fatalf("SendWait reply after close mismatch: got %q want %q", got, want) } } func TestBulkDedicatedResetReasonBeatsTransportDetached(t *testing.T) { server := NewServer().(*ServerCommon) if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKServer failed: %v", err) } server.SetLink("bulk-dedicated-reset-ping", func(msg *Message) { _ = msg.Reply([]byte("pong")) }) acceptCh := make(chan BulkAcceptInfo, 1) server.SetBulkHandler(func(info BulkAcceptInfo) error { acceptCh <- info return nil }) if err := server.Listen("tcp", "127.0.0.1:0"); err != nil { t.Fatalf("server Listen failed: %v", err) } defer func() { _ = server.Stop() }() client := NewClient().(*ClientCommon) if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKClient failed: %v", err) } if err := client.Connect("tcp", server.listener.Addr().String()); err != nil { t.Fatalf("client Connect failed: %v", err) } defer func() { _ = client.Stop() }() bulk, err := client.OpenBulk(context.Background(), BulkOpenOptions{ Range: BulkRange{Offset: 0, Length: 1024}, Dedicated: true, }) if err != nil { t.Fatalf("client OpenBulk dedicated failed: %v", err) } accepted := waitAcceptedBulk(t, acceptCh, 2*time.Second) wantErr := "dedicated remote flow reset" if err := accepted.Bulk.Reset(errors.New(wantErr)); err != nil { t.Fatalf("server dedicated bulk Reset failed: %v", err) } clientHandle := bulk.(*bulkHandle) deadline := time.Now().Add(2 * time.Second) for time.Now().Before(deadline) { if resetErr := clientHandle.resetErrSnapshot(); resetErr != nil { if !strings.Contains(resetErr.Error(), wantErr) { t.Fatalf("client reset error = %v, want contains %q", resetErr, wantErr) } break } time.Sleep(10 * time.Millisecond) } if _, err := bulk.Write([]byte("abc")); err == nil || !strings.Contains(err.Error(), wantErr) { t.Fatalf("client dedicated bulk Write error = %v, want contains %q", err, wantErr) } reply, err := client.SendWait("bulk-dedicated-reset-ping", []byte("ping-after-reset"), 2*time.Second) if err != nil { t.Fatalf("client SendWait after dedicated bulk reset failed: %v", err) } if got, want := string(reply.Value), "pong"; got != want { t.Fatalf("SendWait reply after reset mismatch: got %q want %q", got, want) } } func TestBulkWritePrefersResetErrorOverContextCanceled(t *testing.T) { wantErr := errors.New("remote flow reset") bulk := newBulkHandle(context.Background(), nil, "test", BulkOpenRequest{ BulkID: "bulk-reset-propagation", DataID: 1, ChunkSize: 4, WindowBytes: 16, MaxInFlight: 4, }, 0, nil, nil, 0, nil, nil, func(ctx context.Context, b *bulkHandle, chunk []byte) error { b.markReset(wantErr) <-ctx.Done() return ctx.Err() }, nil, nil) _, err := bulk.Write([]byte("abcdefgh")) if !errors.Is(err, wantErr) { t.Fatalf("bulk Write error = %v, want %v", err, wantErr) } } func TestDedicatedBulkWaitReadyPrefersClosedPipeOverContextCanceled(t *testing.T) { bulk := newBulkHandle(context.Background(), nil, "test", BulkOpenRequest{ BulkID: "bulk-dedicated-ready-close", DataID: 1, Dedicated: true, Range: BulkRange{ Length: 1, }, }, 0, nil, nil, 0, nil, nil, nil, nil, nil) bulk.markPeerClosed() err := bulk.waitDedicatedReady(context.Background()) if !errors.Is(err, io.ErrClosedPipe) { t.Fatalf("waitDedicatedReady error = %v, want %v", err, io.ErrClosedPipe) } } func TestDedicatedBulkWritePrefersClosedPipeOverContextCanceled(t *testing.T) { bulk := newBulkHandle(context.Background(), nil, "test", BulkOpenRequest{ BulkID: "bulk-dedicated-write-close", DataID: 1, Dedicated: true, ChunkSize: 4, WindowBytes: 16, MaxInFlight: 4, }, 0, nil, nil, 0, nil, nil, func(context.Context, *bulkHandle, []byte) error { return nil }, func(ctx context.Context, bulk *bulkHandle, startSeq uint64, payload []byte, payloadOwned bool) (int, error) { bulk.markPeerClosed() <-ctx.Done() return 0, ctx.Err() }, nil) _, err := bulk.Write([]byte("abcdefgh")) if err != nil && !errors.Is(err, io.ErrClosedPipe) { t.Fatalf("bulk Write error = %v, want nil or %v", err, io.ErrClosedPipe) } if err := bulk.waitPendingAsyncWrites(context.Background()); err != nil && !errors.Is(err, io.ErrClosedPipe) { t.Fatalf("bulk waitPendingAsyncWrites error = %v, want nil or %v", err, io.ErrClosedPipe) } } func TestBulkReadWaitingLocalClosePrefersClosedPipeOverContextCanceled(t *testing.T) { bulk := newBulkHandle(context.Background(), nil, "test", BulkOpenRequest{ BulkID: "bulk-read-local-close", DataID: 1, Range: BulkRange{ Length: 1, }, }, 0, nil, nil, 0, nil, nil, nil, nil, nil) errCh := make(chan error, 1) go func() { buf := make([]byte, 4) _, err := bulk.Read(buf) errCh <- err }() time.Sleep(20 * time.Millisecond) if err := bulk.Close(); err != nil { t.Fatalf("bulk Close failed: %v", err) } select { case err := <-errCh: if !errors.Is(err, io.ErrClosedPipe) { t.Fatalf("bulk Read error = %v, want %v", err, io.ErrClosedPipe) } case <-time.After(time.Second): t.Fatal("bulk Read did not return after local close") } } func TestBulkReleaseControlRoundTripTransport(t *testing.T) { server := NewServer().(*ServerCommon) if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKServer failed: %v", err) } acceptCh := make(chan BulkAcceptInfo, 1) server.SetBulkHandler(func(info BulkAcceptInfo) error { acceptCh <- info return nil }) if err := server.Listen("tcp", "127.0.0.1:0"); err != nil { t.Fatalf("server Listen failed: %v", err) } defer func() { _ = server.Stop() }() client := NewClient().(*ClientCommon) if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKClient failed: %v", err) } if err := client.Connect("tcp", server.listener.Addr().String()); err != nil { t.Fatalf("client Connect failed: %v", err) } defer func() { _ = client.Stop() }() const chunkSize = 64 * 1024 bulk, err := client.OpenBulk(context.Background(), BulkOpenOptions{ Range: BulkRange{Offset: 0, Length: chunkSize}, ChunkSize: chunkSize, WindowBytes: chunkSize, MaxInFlight: 1, }) if err != nil { t.Fatalf("client OpenBulk failed: %v", err) } accepted := waitAcceptedBulk(t, acceptCh, 2*time.Second) clientHandle, ok := bulk.(*bulkHandle) if !ok { t.Fatalf("bulk type = %T, want *bulkHandle", bulk) } serverHandle, ok := accepted.Bulk.(*bulkHandle) if !ok { t.Fatalf("accepted bulk type = %T, want *bulkHandle", accepted.Bulk) } if accepted.TransportConn == nil { t.Fatal("accepted transport connection should not be nil") } clientHandle.mu.Lock() clientHandle.outboundAvailBytes = 0 clientHandle.outboundInFlight = 1 clientHandle.mu.Unlock() if err := sendBulkReleaseServerTransport(server, accepted.TransportConn, BulkReleaseRequest{ BulkID: serverHandle.ID(), DataID: serverHandle.dataIDSnapshot(), Bytes: chunkSize, Chunks: 1, }); err != nil { t.Fatalf("sendBulkReleaseServerTransport failed: %v", err) } deadline := time.Now().Add(2 * time.Second) for time.Now().Before(deadline) { clientHandle.mu.Lock() avail := clientHandle.outboundAvailBytes inFlight := clientHandle.outboundInFlight clientHandle.mu.Unlock() if avail == chunkSize && inFlight == 0 { return } time.Sleep(10 * time.Millisecond) } clientHandle.mu.Lock() avail := clientHandle.outboundAvailBytes inFlight := clientHandle.outboundInFlight clientHandle.mu.Unlock() t.Fatalf("client outbound window not released: avail=%d inFlight=%d", avail, inFlight) } func TestBulkSharedWindowFlowControlPreventsBackpressureReset(t *testing.T) { server := NewServer().(*ServerCommon) if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKServer failed: %v", err) } acceptCh := make(chan BulkAcceptInfo, 1) server.SetBulkHandler(func(info BulkAcceptInfo) error { acceptCh <- info return nil }) if err := server.Listen("tcp", "127.0.0.1:0"); err != nil { t.Fatalf("server Listen failed: %v", err) } defer func() { _ = server.Stop() }() client := NewClient().(*ClientCommon) if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKClient failed: %v", err) } if err := client.Connect("tcp", server.listener.Addr().String()); err != nil { t.Fatalf("client Connect failed: %v", err) } defer func() { _ = client.Stop() }() const ( chunkSize = 1024 * 1024 totalBytes = 6 * chunkSize ) bulk, err := client.OpenBulk(context.Background(), BulkOpenOptions{ Range: BulkRange{ Offset: 0, Length: totalBytes, }, ChunkSize: chunkSize, WindowBytes: chunkSize, MaxInFlight: 1, }) if err != nil { t.Fatalf("client OpenBulk failed: %v", err) } accepted := waitAcceptedBulk(t, acceptCh, 2*time.Second) serverHandle, ok := accepted.Bulk.(*bulkHandle) if !ok { t.Fatalf("accepted bulk type = %T, want *bulkHandle", accepted.Bulk) } serverHandle.mu.Lock() serverHandle.inboundQueueLimit = 2 serverHandle.inboundBytesLimit = 2 * chunkSize serverHandle.mu.Unlock() readDone := make(chan error, 1) go func() { buf := make([]byte, chunkSize) total := 0 for { n, err := accepted.Bulk.Read(buf) if n > 0 { total += n time.Sleep(15 * time.Millisecond) } if err != nil { if errors.Is(err, io.EOF) { if total != totalBytes { readDone <- errors.New("server bulk read size mismatch") return } readDone <- nil return } readDone <- err return } } }() payload := make([]byte, totalBytes) for i := range payload { payload[i] = byte(i) } if _, err := bulk.Write(payload); err != nil { t.Fatalf("client bulk Write failed: %v", err) } if err := bulk.CloseWrite(); err != nil { t.Fatalf("client bulk CloseWrite failed: %v", err) } select { case err := <-readDone: if err != nil { t.Fatalf("server read failed: %v", err) } case <-time.After(5 * time.Second): t.Fatal("timed out waiting for server bulk read") } clientHandle, ok := bulk.(*bulkHandle) if !ok { t.Fatalf("bulk type = %T, want *bulkHandle", bulk) } if resetErr := clientHandle.resetErrSnapshot(); resetErr != nil { t.Fatalf("client bulk reset error = %v, want nil", resetErr) } if resetErr := serverHandle.resetErrSnapshot(); resetErr != nil { t.Fatalf("server bulk reset error = %v, want nil", resetErr) } } func TestBulkDedicatedWindowFlowControlPreventsBackpressureReset(t *testing.T) { server := NewServer().(*ServerCommon) if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKServer failed: %v", err) } acceptCh := make(chan BulkAcceptInfo, 1) server.SetBulkHandler(func(info BulkAcceptInfo) error { acceptCh <- info return nil }) if err := server.Listen("tcp", "127.0.0.1:0"); err != nil { t.Fatalf("server Listen failed: %v", err) } defer func() { _ = server.Stop() }() client := NewClient().(*ClientCommon) if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKClient failed: %v", err) } if err := client.Connect("tcp", server.listener.Addr().String()); err != nil { t.Fatalf("client Connect failed: %v", err) } defer func() { _ = client.Stop() }() const ( chunkSize = 1024 * 1024 writeSize = 4 * chunkSize totalWrites = 6 totalBytes = totalWrites * writeSize ) bulk, err := client.OpenBulk(context.Background(), BulkOpenOptions{ Range: BulkRange{Offset: 0, Length: totalBytes}, Dedicated: true, ChunkSize: chunkSize, WindowBytes: writeSize, MaxInFlight: 4, }) if err != nil { t.Fatalf("client OpenBulk dedicated failed: %v", err) } accepted := waitAcceptedBulk(t, acceptCh, 2*time.Second) serverHandle, ok := accepted.Bulk.(*bulkHandle) if !ok { t.Fatalf("accepted bulk type = %T, want *bulkHandle", accepted.Bulk) } serverHandle.mu.Lock() serverHandle.inboundQueueLimit = 8 serverHandle.inboundBytesLimit = writeSize + chunkSize serverHandle.mu.Unlock() readDone := make(chan error, 1) go func() { buf := make([]byte, writeSize) total := 0 for { n, err := accepted.Bulk.Read(buf) if n > 0 { total += n time.Sleep(15 * time.Millisecond) } if err != nil { if errors.Is(err, io.EOF) { if total != totalBytes { readDone <- errors.New("server dedicated bulk read size mismatch") return } readDone <- nil return } readDone <- err return } } }() payload := make([]byte, writeSize) for i := range payload { payload[i] = byte(i) } for i := 0; i < totalWrites; i++ { if _, err := bulk.Write(payload); err != nil { t.Fatalf("client dedicated bulk Write #%d failed: %v", i, err) } } if err := bulk.CloseWrite(); err != nil { t.Fatalf("client dedicated bulk CloseWrite failed: %v", err) } select { case err := <-readDone: if err != nil { t.Fatalf("server dedicated read failed: %v", err) } case <-time.After(5 * time.Second): t.Fatal("timed out waiting for server dedicated bulk read") } clientHandle, ok := bulk.(*bulkHandle) if !ok { t.Fatalf("bulk type = %T, want *bulkHandle", bulk) } if resetErr := clientHandle.resetErrSnapshot(); resetErr != nil { t.Fatalf("client dedicated bulk reset error = %v, want nil", resetErr) } if resetErr := serverHandle.resetErrSnapshot(); resetErr != nil { t.Fatalf("server dedicated bulk reset error = %v, want nil", resetErr) } } func TestBulkOpenRoundTripServerLogicalDedicatedTCP(t *testing.T) { server := NewServer().(*ServerCommon) if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKServer failed: %v", err) } if err := server.Listen("tcp", "127.0.0.1:0"); err != nil { t.Fatalf("server Listen failed: %v", err) } defer func() { _ = server.Stop() }() client := NewClient().(*ClientCommon) if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKClient failed: %v", err) } acceptCh := make(chan BulkAcceptInfo, 1) client.SetBulkHandler(func(info BulkAcceptInfo) error { acceptCh <- info return nil }) if err := client.Connect("tcp", server.listener.Addr().String()); err != nil { t.Fatalf("client Connect failed: %v", err) } defer func() { _ = client.Stop() }() logical := waitForTransferControlLogicalConn(t, server, 2*time.Second) bulk, err := server.OpenBulkLogical(context.Background(), logical, BulkOpenOptions{ Range: BulkRange{ Offset: 2048, Length: 4096, }, Metadata: BulkMetadata{ "mode": "server-dedicated", }, Dedicated: true, ChunkSize: 32 * 1024, }) if err != nil { t.Fatalf("server OpenBulkLogical dedicated failed: %v", err) } accepted := waitAcceptedBulk(t, acceptCh, 2*time.Second) if !accepted.Dedicated { t.Fatal("client accepted dedicated flag should be true") } if _, err := bulk.Write([]byte("server-dedicated")); err != nil { t.Fatalf("server dedicated bulk Write failed: %v", err) } readBulkExactly(t, accepted.Bulk, "server-dedicated", 2*time.Second) if _, err := accepted.Bulk.Write([]byte("client-dedicated")); err != nil { t.Fatalf("client dedicated bulk Write failed: %v", err) } readBulkExactly(t, bulk, "client-dedicated", 2*time.Second) if err := bulk.CloseWrite(); err != nil { t.Fatalf("server dedicated bulk CloseWrite failed: %v", err) } waitForBulkReadEOF(t, accepted.Bulk, 2*time.Second) if err := accepted.Bulk.Close(); err != nil { t.Fatalf("client dedicated bulk Close failed: %v", err) } waitForBulkReadEOF(t, bulk, 2*time.Second) waitForBulkContextDone(t, bulk.Context(), 2*time.Second) } func TestDedicatedBulkCloseWaitsForRemoteCloseBeforeFinalize(t *testing.T) { runtime := newBulkRuntime("dedicated-close") bulk := newBulkHandle(context.Background(), runtime, clientFileScope(), BulkOpenRequest{ BulkID: "dedicated-close", DataID: 1, Dedicated: true, Range: BulkRange{ Length: 1, }, }, 0, nil, nil, 0, nil, nil, nil, nil, nil) if err := runtime.register(clientFileScope(), bulk); err != nil { t.Fatalf("register bulk failed: %v", err) } closeCalls := 0 bulk.closeFn = func(context.Context, *bulkHandle, bool) error { closeCalls++ return nil } if err := bulk.Close(); err != nil { t.Fatalf("bulk Close failed: %v", err) } if got, want := closeCalls, 1; got != want { t.Fatalf("closeFn calls = %d, want %d", got, want) } select { case <-bulk.Context().Done(): t.Fatal("dedicated full close should wait for remote close before finalize") default: } if _, ok := runtime.lookup(clientFileScope(), bulk.ID()); !ok { t.Fatal("bulk runtime entry should remain until remote close arrives") } snapshot := bulk.snapshot() if !snapshot.LocalClosed || !snapshot.LocalReadClosed { t.Fatalf("local close snapshot mismatch: %+v", snapshot) } if snapshot.RemoteClosed { t.Fatalf("remote close should not be set yet: %+v", snapshot) } bulk.markRemoteClosed() waitForBulkContextDone(t, bulk.Context(), 2*time.Second) if _, ok := runtime.lookup(clientFileScope(), bulk.ID()); ok { t.Fatal("bulk runtime entry should be removed after remote close") } } func TestHandleDedicatedBulkReadErrorTreatsEOFAfterLocalCloseAsGraceful(t *testing.T) { runtime := newBulkRuntime("dedicated-eof") bulk := newBulkHandle(context.Background(), runtime, clientFileScope(), BulkOpenRequest{ BulkID: "dedicated-eof", DataID: 1, Dedicated: true, Range: BulkRange{ Length: 1, }, }, 0, nil, nil, 0, nil, nil, nil, nil, nil) if err := runtime.register(clientFileScope(), bulk); err != nil { t.Fatalf("register bulk failed: %v", err) } bulk.mu.Lock() bulk.localClosed = true bulk.mu.Unlock() handleDedicatedBulkReadError(bulk, io.EOF) if resetErr := bulk.resetErrSnapshot(); resetErr != nil { t.Fatalf("reset error = %v, want nil", resetErr) } if !bulk.remoteClosedSnapshot() { t.Fatal("remoteClosed should be set after graceful EOF") } waitForBulkContextDone(t, bulk.Context(), 2*time.Second) if _, ok := runtime.lookup(clientFileScope(), bulk.ID()); ok { t.Fatal("bulk runtime entry should be removed after graceful EOF") } } func TestHandleDedicatedBulkReadErrorTreatsEOFEvenBeforeLocalCloseAsGracefulForDedicated(t *testing.T) { runtime := newBulkRuntime("dedicated-eof-remote") bulk := newBulkHandle(context.Background(), runtime, clientFileScope(), BulkOpenRequest{ BulkID: "dedicated-eof-remote", DataID: 1, Dedicated: true, Range: BulkRange{ Length: 8, }, }, 0, nil, nil, 0, nil, nil, nil, nil, nil) if err := runtime.register(clientFileScope(), bulk); err != nil { t.Fatalf("register bulk failed: %v", err) } handleDedicatedBulkReadError(bulk, io.EOF) if resetErr := bulk.resetErrSnapshot(); resetErr != nil { t.Fatalf("reset error = %v, want nil", resetErr) } if !bulk.remoteClosedSnapshot() { t.Fatal("remoteClosed should be set after dedicated EOF") } if _, err := bulk.Read(make([]byte, 1)); !errors.Is(err, io.EOF) { t.Fatalf("bulk Read error = %v, want EOF", err) } if err := bulk.Close(); err != nil { t.Fatalf("bulk Close failed: %v", err) } waitForBulkContextDone(t, bulk.Context(), 2*time.Second) } func TestDedicatedBulkCloseWriteHalfClosesUnderlyingTCPWriteSide(t *testing.T) { listener, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { t.Fatalf("net.Listen failed: %v", err) } defer func() { _ = listener.Close() }() serverConnCh := make(chan net.Conn, 1) serverErrCh := make(chan error, 1) go func() { conn, err := listener.Accept() if err != nil { serverErrCh <- err return } serverConnCh <- conn }() clientConnRaw, err := net.Dial("tcp", listener.Addr().String()) if err != nil { t.Fatalf("net.Dial failed: %v", err) } defer func() { _ = clientConnRaw.Close() }() var serverConn net.Conn select { case serverConn = <-serverConnCh: case err := <-serverErrCh: t.Fatalf("listener.Accept failed: %v", err) case <-time.After(2 * time.Second): t.Fatal("timed out waiting for accepted TCP conn") } defer func() { _ = serverConn.Close() }() runtime := newBulkRuntime("dedicated-half-close") bulk := newBulkHandle(context.Background(), runtime, clientFileScope(), BulkOpenRequest{ BulkID: "dedicated-half-close", DataID: 1, Dedicated: true, Range: BulkRange{ Length: 1, }, }, 0, nil, nil, 0, nil, nil, nil, nil, nil) if err := runtime.register(clientFileScope(), bulk); err != nil { t.Fatalf("register bulk failed: %v", err) } if err := bulk.attachDedicatedConn(clientConnRaw); err != nil { t.Fatalf("attachDedicatedConn failed: %v", err) } readDone := make(chan error, 1) go func() { _ = serverConn.SetReadDeadline(time.Now().Add(2 * time.Second)) var buf [1]byte _, err := serverConn.Read(buf[:]) readDone <- err }() if err := bulk.CloseWrite(); err != nil { t.Fatalf("bulk CloseWrite failed: %v", err) } select { case err := <-readDone: if !errors.Is(err, io.EOF) { t.Fatalf("server conn Read error = %v, want EOF", err) } case <-time.After(3 * time.Second): t.Fatal("timed out waiting for TCP half-close EOF") } bulk.markRemoteClosed() waitForBulkContextDone(t, bulk.Context(), 2*time.Second) } func TestBulkDedicatedClientFullCloseAfterCloseWriteDoesNotResetTCP(t *testing.T) { server := NewServer().(*ServerCommon) if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKServer failed: %v", err) } acceptCh := make(chan BulkAcceptInfo, 1) server.SetBulkHandler(func(info BulkAcceptInfo) error { acceptCh <- info return nil }) if err := server.Listen("tcp", "127.0.0.1:0"); err != nil { t.Fatalf("server Listen failed: %v", err) } defer func() { _ = server.Stop() }() client := NewClient().(*ClientCommon) if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKClient failed: %v", err) } if err := client.Connect("tcp", server.listener.Addr().String()); err != nil { t.Fatalf("client Connect failed: %v", err) } defer func() { _ = client.Stop() }() bulk, err := client.OpenBulk(context.Background(), BulkOpenOptions{ ID: "dedicated-close-after-closewrite", Dedicated: true, Range: BulkRange{ Length: 5, }, }) if err != nil { t.Fatalf("client OpenBulk dedicated failed: %v", err) } accepted := waitAcceptedBulk(t, acceptCh, 2*time.Second) if _, err := bulk.Write([]byte("hello")); err != nil { t.Fatalf("client dedicated bulk Write failed: %v", err) } readBulkExactly(t, accepted.Bulk, "hello", 2*time.Second) if err := bulk.CloseWrite(); err != nil { t.Fatalf("client dedicated bulk CloseWrite failed: %v", err) } if err := bulk.Close(); err != nil { t.Fatalf("client dedicated bulk Close failed: %v", err) } waitForBulkReadEOF(t, accepted.Bulk, 2*time.Second) if err := accepted.Bulk.Close(); err != nil { t.Fatalf("server dedicated bulk Close failed: %v", err) } waitForBulkContextDone(t, bulk.Context(), 2*time.Second) waitForBulkContextDone(t, accepted.Bulk.Context(), 2*time.Second) clientHandle := bulk.(*bulkHandle) serverHandle := accepted.Bulk.(*bulkHandle) if resetErr := clientHandle.resetErrSnapshot(); resetErr != nil { t.Fatalf("client dedicated bulk reset error = %v, want nil", resetErr) } if resetErr := serverHandle.resetErrSnapshot(); resetErr != nil { t.Fatalf("server dedicated bulk reset error = %v, want nil", resetErr) } } func TestBulkSharedConcurrentWritersWithSlowReceiver(t *testing.T) { server := NewServer().(*ServerCommon) if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKServer failed: %v", err) } const bulkCount = 6 acceptCh := make(chan BulkAcceptInfo, bulkCount) server.SetBulkHandler(func(info BulkAcceptInfo) error { acceptCh <- info return nil }) if err := server.Listen("tcp", "127.0.0.1:0"); err != nil { t.Fatalf("server Listen failed: %v", err) } defer func() { _ = server.Stop() }() client := NewClient().(*ClientCommon) if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKClient failed: %v", err) } if err := client.Connect("tcp", server.listener.Addr().String()); err != nil { t.Fatalf("client Connect failed: %v", err) } defer func() { _ = client.Stop() }() type bulkPair struct { client Bulk server Bulk } pairs := make([]bulkPair, 0, bulkCount) for i := 0; i < bulkCount; i++ { bulk, err := client.OpenBulk(context.Background(), BulkOpenOptions{ ID: "slow-shared-" + formatInt(int64(i)), ChunkSize: 64 * 1024, WindowBytes: 128 * 1024, MaxInFlight: 2, WriteTimeout: 2 * time.Second, Range: BulkRange{ Length: 1024 * 1024, }, }) if err != nil { t.Fatalf("client OpenBulk #%d failed: %v", i, err) } accepted := waitAcceptedBulk(t, acceptCh, 2*time.Second) pairs = append(pairs, bulkPair{client: bulk, server: accepted.Bulk}) } readErrCh := make(chan error, bulkCount) for _, pair := range pairs { go func(serverBulk Bulk) { buf := make([]byte, 32*1024) total := 0 for { n, err := serverBulk.Read(buf) if n > 0 { total += n time.Sleep(2 * time.Millisecond) } if err != nil { if errors.Is(err, io.EOF) { if closeErr := serverBulk.Close(); closeErr != nil { readErrCh <- closeErr return } readErrCh <- nil return } readErrCh <- err return } } }(pair.server) } writeErrCh := make(chan error, bulkCount) payload := make([]byte, 64*1024) for i := range payload { payload[i] = byte(i) } for _, pair := range pairs { go func(clientBulk Bulk) { defer func() { _ = clientBulk.Close() }() for written := 0; written < 1024*1024; written += len(payload) { if _, err := clientBulk.Write(payload); err != nil { writeErrCh <- err return } } if err := clientBulk.CloseWrite(); err != nil { writeErrCh <- err return } writeErrCh <- nil }(pair.client) } for i := 0; i < bulkCount; i++ { select { case err := <-writeErrCh: if err != nil { t.Fatalf("slow receiver client write failed: %v", err) } case <-time.After(10 * time.Second): t.Fatal("timed out waiting for client writes under slow receiver") } } for i := 0; i < bulkCount; i++ { select { case err := <-readErrCh: if err != nil { t.Fatalf("slow receiver server read failed: %v", err) } case <-time.After(10 * time.Second): t.Fatal("timed out waiting for server reads under slow receiver") } } for _, pair := range pairs { waitForBulkContextDone(t, pair.client.Context(), 2*time.Second) waitForBulkContextDone(t, pair.server.Context(), 2*time.Second) if handle, ok := pair.client.(*bulkHandle); ok { if resetErr := handle.resetErrSnapshot(); resetErr != nil { t.Fatalf("client bulk reset error = %v, want nil", resetErr) } } if handle, ok := pair.server.(*bulkHandle); ok { if resetErr := handle.resetErrSnapshot(); resetErr != nil { t.Fatalf("server bulk reset error = %v, want nil", resetErr) } } } } func TestBulkOpenRequiresHandlerTCP(t *testing.T) { server := NewServer().(*ServerCommon) if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKServer failed: %v", err) } if err := server.Listen("tcp", "127.0.0.1:0"); err != nil { t.Fatalf("server Listen failed: %v", err) } defer func() { _ = server.Stop() }() client := NewClient().(*ClientCommon) if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKClient failed: %v", err) } if err := client.Connect("tcp", server.listener.Addr().String()); err != nil { t.Fatalf("client Connect failed: %v", err) } defer func() { _ = client.Stop() }() _, err := client.OpenBulk(context.Background(), BulkOpenOptions{ Range: BulkRange{ Offset: 0, Length: 128, }, }) if !errors.Is(err, errBulkHandlerNotConfigured) { t.Fatalf("client OpenBulk error = %v, want %v", err, errBulkHandlerNotConfigured) } } func waitAcceptedBulk(t *testing.T, ch <-chan BulkAcceptInfo, timeout time.Duration) BulkAcceptInfo { t.Helper() select { case info := <-ch: return info case <-time.After(timeout): t.Fatal("timed out waiting for accepted bulk") return BulkAcceptInfo{} } } func waitForBulkReadEOF(t *testing.T, bulk Bulk, timeout time.Duration) { t.Helper() deadline := time.Now().Add(timeout) buf := make([]byte, 1) for time.Now().Before(deadline) { _, err := bulk.Read(buf) if errors.Is(err, io.EOF) { return } if err != nil { t.Fatalf("bulk Read returned unexpected error: %v", err) } time.Sleep(10 * time.Millisecond) } t.Fatal("timed out waiting for bulk EOF") } func waitForBulkContextDone(t *testing.T, ctx context.Context, timeout time.Duration) { t.Helper() select { case <-ctx.Done(): case <-time.After(timeout): t.Fatal("timed out waiting for bulk context done") } } func readBulkExactly(t *testing.T, bulk Bulk, want string, timeout time.Duration) { t.Helper() errCh := make(chan error, 1) go func() { buf := make([]byte, len(want)) _, err := io.ReadFull(bulk, buf) if err != nil { errCh <- err return } if got := string(buf); got != want { errCh <- errors.New("bulk payload mismatch: got " + got + " want " + want) return } errCh <- nil }() select { case err := <-errCh: if err != nil { t.Fatal(err) } case <-time.After(timeout): t.Fatal("timed out waiting for bulk payload") } } func readBulkError(t *testing.T, bulk Bulk, timeout time.Duration) error { t.Helper() errCh := make(chan error, 1) go func() { buf := make([]byte, 1) _, err := bulk.Read(buf) errCh <- err }() select { case err := <-errCh: if err == nil { t.Fatal("expected bulk read error, got nil") } return err case <-time.After(timeout): t.Fatal("timed out waiting for bulk read error") return nil } }