notify/bulk_test.go
starainrt f038a89771
fix: close stream adaptive gaps and switch notify to stario v0.1.1
- make stream fast path honor adaptive soft payload limits end-to-end
  - split oversized fast-stream payloads into sequential frames before batching
  - use adaptive soft cap when encoding stream batch payloads
  - move timeout-like error detection into production code for adaptive tx
  - tune notify FrameReader read size explicitly to avoid throughput regression
  - drop local stario replace and depend on released b612.me/stario v0.1.1
2026-04-18 16:05:57 +08:00

1812 lines
53 KiB
Go

package notify
import (
"context"
"errors"
"io"
"net"
"strings"
"testing"
"time"
)
// TestBulkOpenRoundTripTCP opens a bulk from the client over a loopback TCP
// session, checks that the server-side accept info mirrors the open request,
// exchanges one payload in each direction, inspects the bulk snapshots
// reported for both peers, and finally verifies the close handshake.
func TestBulkOpenRoundTripTCP(t *testing.T) {
// Server side: configure via UseModernPSKServer and forward every accepted
// bulk to the test through a buffered channel.
server := NewServer().(*ServerCommon)
if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
t.Fatalf("UseModernPSKServer failed: %v", err)
}
acceptCh := make(chan BulkAcceptInfo, 1)
server.SetBulkHandler(func(info BulkAcceptInfo) error {
acceptCh <- info
return nil
})
// Port 0 lets the OS pick a free port; the client reads it back from the listener.
if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
t.Fatalf("server Listen failed: %v", err)
}
defer func() {
_ = server.Stop()
}()
// Client side: same shared secret and options, then connect to the listener.
client := NewClient().(*ClientCommon)
if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
t.Fatalf("UseModernPSKClient failed: %v", err)
}
if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
t.Fatalf("client Connect failed: %v", err)
}
defer func() {
_ = client.Stop()
}()
// Open the bulk with a non-trivial range and metadata so propagation can be checked.
bulk, err := client.OpenBulk(context.Background(), BulkOpenOptions{
Range: BulkRange{
Offset: 128,
Length: 4096,
},
Metadata: BulkMetadata{
"name": "demo.bin",
},
ChunkSize: 32 * 1024,
})
if err != nil {
t.Fatalf("client OpenBulk failed: %v", err)
}
// The accept info seen by the server handler must match what the client requested.
accepted := waitAcceptedBulk(t, acceptCh, 2*time.Second)
if accepted.ID != bulk.ID() {
t.Fatalf("accepted bulk id mismatch: got %q want %q", accepted.ID, bulk.ID())
}
if accepted.Range != (BulkRange{Offset: 128, Length: 4096}) {
t.Fatalf("accepted range mismatch: %+v", accepted.Range)
}
if accepted.Metadata["name"] != "demo.bin" {
t.Fatalf("accepted metadata mismatch: %+v", accepted.Metadata)
}
if accepted.LogicalConn == nil {
t.Fatal("accepted logical connection should not be nil")
}
if accepted.TransportConn == nil {
t.Fatal("accepted transport connection should not be nil")
}
// Both ends must be *bulkHandle and share the same non-zero data id.
clientHandle, ok := bulk.(*bulkHandle)
if !ok {
t.Fatalf("bulk type = %T, want *bulkHandle", bulk)
}
serverHandle, ok := accepted.Bulk.(*bulkHandle)
if !ok {
t.Fatalf("accepted bulk type = %T, want *bulkHandle", accepted.Bulk)
}
if clientHandle.dataIDSnapshot() == 0 {
t.Fatal("client bulk data id should not be zero")
}
if got, want := serverHandle.dataIDSnapshot(), clientHandle.dataIDSnapshot(); got != want {
t.Fatalf("accepted bulk data id = %d, want %d", got, want)
}
// Payload exchange: client -> server, then server -> client.
if _, err := bulk.Write([]byte("hello-from-client")); err != nil {
t.Fatalf("client bulk Write failed: %v", err)
}
readBulkExactly(t, accepted.Bulk, "hello-from-client", 2*time.Second)
if _, err := accepted.Bulk.Write([]byte("hello-from-server")); err != nil {
t.Fatalf("server bulk Write failed: %v", err)
}
readBulkExactly(t, bulk, "hello-from-server", 2*time.Second)
// Client snapshot: exactly one bulk, bound to the client session, with a live
// current binding/transport and the adaptive soft payload at its start value.
clientSnapshots, err := GetClientBulkSnapshots(client)
if err != nil {
t.Fatalf("GetClientBulkSnapshots failed: %v", err)
}
if len(clientSnapshots) != 1 || clientSnapshots[0].ID != bulk.ID() {
t.Fatalf("client bulk snapshots mismatch: %+v", clientSnapshots)
}
if got, want := clientSnapshots[0].BindingOwner, "client-session"; got != want {
t.Fatalf("client bulk BindingOwner = %q, want %q", got, want)
}
if !clientSnapshots[0].BindingAlive || !clientSnapshots[0].BindingCurrent || !clientSnapshots[0].TransportAttached || !clientSnapshots[0].TransportCurrent {
t.Fatalf("client bulk binding snapshot mismatch: %+v", clientSnapshots[0])
}
if got, want := clientSnapshots[0].BindingBulkAdaptiveSoftPayloadBytes, bulkAdaptiveSoftPayloadStartBytes; got != want {
t.Fatalf("client bulk BindingBulkAdaptiveSoftPayloadBytes = %d, want %d", got, want)
}
// Server snapshot: same expectations, but the binding owner is the server transport.
serverSnapshots, err := GetServerBulkSnapshots(server)
if err != nil {
t.Fatalf("GetServerBulkSnapshots failed: %v", err)
}
if len(serverSnapshots) != 1 || serverSnapshots[0].ID != bulk.ID() {
t.Fatalf("server bulk snapshots mismatch: %+v", serverSnapshots)
}
if got, want := serverSnapshots[0].BindingOwner, "server-transport"; got != want {
t.Fatalf("server bulk BindingOwner = %q, want %q", got, want)
}
if got, want := serverSnapshots[0].BindingBulkAdaptiveSoftPayloadBytes, bulkAdaptiveSoftPayloadStartBytes; got != want {
t.Fatalf("server bulk BindingBulkAdaptiveSoftPayloadBytes = %d, want %d", got, want)
}
if !serverSnapshots[0].BindingAlive || !serverSnapshots[0].BindingCurrent || !serverSnapshots[0].TransportAttached || !serverSnapshots[0].TransportCurrent {
t.Fatalf("server bulk binding snapshot mismatch: %+v", serverSnapshots[0])
}
// Shutdown: the client half-closes, the server observes EOF and closes fully,
// after which the client must see EOF and its bulk context must finish.
if err := bulk.CloseWrite(); err != nil {
t.Fatalf("client bulk CloseWrite failed: %v", err)
}
waitForBulkReadEOF(t, accepted.Bulk, 2*time.Second)
if err := accepted.Bulk.Close(); err != nil {
t.Fatalf("server bulk Close failed: %v", err)
}
waitForBulkReadEOF(t, bulk, 2*time.Second)
waitForBulkContextDone(t, bulk.Context(), 2*time.Second)
}
// TestDedicatedBulkOpenUnblocksSynchronousReadHandler verifies that
// OpenDedicatedBulk returns on the client while the server-side bulk handler
// is still blocked inside io.ReadFull, and that the handler subsequently
// receives the five bytes written by the client.
func TestDedicatedBulkOpenUnblocksSynchronousReadHandler(t *testing.T) {
	const waitTimeout = 2 * time.Second

	srv := NewServer().(*ServerCommon)
	if err := UseModernPSKServer(srv, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKServer failed: %v", err)
	}

	// The handler reads synchronously before returning and reports what it got.
	handlerRead := make(chan string, 1)
	srv.SetBulkHandler(func(info BulkAcceptInfo) error {
		defer func() {
			_ = info.Bulk.Close()
		}()
		received := make([]byte, 5)
		_, readErr := io.ReadFull(info.Bulk, received)
		if readErr != nil {
			return readErr
		}
		handlerRead <- string(received)
		return nil
	})

	if err := srv.Listen("tcp", "127.0.0.1:0"); err != nil {
		t.Fatalf("server Listen failed: %v", err)
	}
	defer func() {
		_ = srv.Stop()
	}()

	cli := NewClient().(*ClientCommon)
	if err := UseModernPSKClient(cli, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKClient failed: %v", err)
	}
	if err := cli.Connect("tcp", srv.listener.Addr().String()); err != nil {
		t.Fatalf("client Connect failed: %v", err)
	}
	defer func() {
		_ = cli.Stop()
	}()

	// Run the open in a goroutine so the test can time out if it never returns.
	type openOutcome struct {
		bulk Bulk
		err  error
	}
	opened := make(chan openOutcome, 1)
	go func() {
		b, openErr := cli.OpenDedicatedBulk(context.Background(), BulkOpenOptions{
			ID:    "sync-read-handler",
			Range: BulkRange{Offset: 0, Length: 5},
		})
		opened <- openOutcome{bulk: b, err: openErr}
	}()

	var bulk Bulk
	select {
	case outcome := <-opened:
		if outcome.err != nil {
			t.Fatalf("client OpenDedicatedBulk failed: %v", outcome.err)
		}
		bulk = outcome.bulk
	case <-time.After(waitTimeout):
		t.Fatal("client OpenDedicatedBulk timed out while remote handler was synchronously reading")
	}
	defer func() {
		if bulk != nil {
			_ = bulk.Close()
		}
	}()

	if _, err := bulk.Write([]byte("hello")); err != nil {
		t.Fatalf("client dedicated bulk Write failed: %v", err)
	}

	select {
	case seen := <-handlerRead:
		if seen != "hello" {
			t.Fatalf("server handler read %q, want %q", seen, "hello")
		}
	case <-time.After(waitTimeout):
		t.Fatal("timed out waiting for synchronous handler read")
	}
}
// TestDedicatedBulkOpenUnblocksOnBlockingFirstWrite verifies that a dedicated
// bulk open completes on the client when the server handler immediately
// writes a payload larger than the requested window (16 bytes against a
// 4-byte chunk/window with one chunk in flight), and that the client can then
// read the whole payload followed by EOF.
func TestDedicatedBulkOpenUnblocksOnBlockingFirstWrite(t *testing.T) {
	const waitTimeout = 2 * time.Second

	srv := NewServer().(*ServerCommon)
	if err := UseModernPSKServer(srv, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKServer failed: %v", err)
	}

	data := strings.Repeat("w", 16)
	handlerDone := make(chan error, 1)
	srv.SetBulkHandler(func(info BulkAcceptInfo) error {
		// Write everything, then half-close; report the first failure, if any.
		outcome := func() error {
			if _, err := io.WriteString(info.Bulk, data); err != nil {
				return err
			}
			return info.Bulk.CloseWrite()
		}()
		handlerDone <- outcome
		return outcome
	})

	if err := srv.Listen("tcp", "127.0.0.1:0"); err != nil {
		t.Fatalf("server Listen failed: %v", err)
	}
	defer func() {
		_ = srv.Stop()
	}()

	cli := NewClient().(*ClientCommon)
	if err := UseModernPSKClient(cli, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKClient failed: %v", err)
	}
	if err := cli.Connect("tcp", srv.listener.Addr().String()); err != nil {
		t.Fatalf("client Connect failed: %v", err)
	}
	defer func() {
		_ = cli.Stop()
	}()

	openCtx, cancelOpen := context.WithTimeout(context.Background(), waitTimeout)
	defer cancelOpen()

	opts := BulkOpenOptions{
		ID:          "blocking-write-ready",
		Range:       BulkRange{Offset: 0, Length: int64(len(data))},
		ChunkSize:   4,
		WindowBytes: 4,
		MaxInFlight: 1,
	}
	bulk, err := cli.OpenDedicatedBulk(openCtx, opts)
	if err != nil {
		t.Fatalf("client OpenDedicatedBulk failed: %v", err)
	}

	readBulkExactly(t, bulk, data, waitTimeout)
	waitForBulkReadEOF(t, bulk, waitTimeout)

	select {
	case writeErr := <-handlerDone:
		if writeErr != nil {
			t.Fatalf("server handler write failed: %v", writeErr)
		}
	case <-time.After(waitTimeout):
		t.Fatal("timed out waiting for blocking first write to finish")
	}

	if closeErr := bulk.Close(); closeErr != nil {
		t.Fatalf("client dedicated bulk Close failed: %v", closeErr)
	}
	waitForBulkContextDone(t, bulk.Context(), waitTimeout)
}
// TestServerOpenBulkLogicalDedicatedUnblocksOnBlockingFirstRead is the
// server-initiated counterpart of the synchronous-read test: the server opens
// a dedicated bulk towards the client, whose handler blocks in io.ReadFull.
// The open must return so the server can write the bytes the handler awaits.
func TestServerOpenBulkLogicalDedicatedUnblocksOnBlockingFirstRead(t *testing.T) {
	const waitTimeout = 2 * time.Second

	srv := NewServer().(*ServerCommon)
	if err := UseModernPSKServer(srv, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKServer failed: %v", err)
	}
	if err := srv.Listen("tcp", "127.0.0.1:0"); err != nil {
		t.Fatalf("server Listen failed: %v", err)
	}
	defer func() {
		_ = srv.Stop()
	}()

	cli := NewClient().(*ClientCommon)
	if err := UseModernPSKClient(cli, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKClient failed: %v", err)
	}

	// The client handler is installed before Connect and reads synchronously.
	handlerRead := make(chan string, 1)
	cli.SetBulkHandler(func(info BulkAcceptInfo) error {
		defer func() {
			_ = info.Bulk.Close()
		}()
		received := make([]byte, 5)
		_, readErr := io.ReadFull(info.Bulk, received)
		if readErr != nil {
			return readErr
		}
		handlerRead <- string(received)
		return nil
	})

	if err := cli.Connect("tcp", srv.listener.Addr().String()); err != nil {
		t.Fatalf("client Connect failed: %v", err)
	}
	defer func() {
		_ = cli.Stop()
	}()

	logical := waitForTransferControlLogicalConn(t, srv, waitTimeout)

	openCtx, cancelOpen := context.WithTimeout(context.Background(), waitTimeout)
	defer cancelOpen()

	opts := BulkOpenOptions{
		ID:        "server-blocking-read-ready",
		Range:     BulkRange{Offset: 0, Length: 5},
		Dedicated: true,
	}
	bulk, err := srv.OpenBulkLogical(openCtx, logical, opts)
	if err != nil {
		t.Fatalf("server OpenBulkLogical dedicated failed: %v", err)
	}

	if _, writeErr := bulk.Write([]byte("hello")); writeErr != nil {
		t.Fatalf("server dedicated bulk Write failed: %v", writeErr)
	}

	select {
	case seen := <-handlerRead:
		if seen != "hello" {
			t.Fatalf("client handler read %q, want %q", seen, "hello")
		}
	case <-time.After(waitTimeout):
		t.Fatal("timed out waiting for blocking first read to finish")
	}

	if closeErr := bulk.Close(); closeErr != nil {
		t.Fatalf("server dedicated bulk Close failed: %v", closeErr)
	}
	waitForBulkContextDone(t, bulk.Context(), waitTimeout)
}
// TestDedicatedBulkOpenReturnsHandlerFailureAfterAccepted verifies that when
// the server bulk handler fails shortly (80ms) after being invoked, the
// client's OpenDedicatedBulk call returns an error carrying the handler's
// message.
func TestDedicatedBulkOpenReturnsHandlerFailureAfterAccepted(t *testing.T) {
	const handlerFailure = "dedicated handler failed after accept"

	srv := NewServer().(*ServerCommon)
	if err := UseModernPSKServer(srv, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKServer failed: %v", err)
	}
	srv.SetBulkHandler(func(BulkAcceptInfo) error {
		time.Sleep(80 * time.Millisecond)
		return errors.New(handlerFailure)
	})
	if err := srv.Listen("tcp", "127.0.0.1:0"); err != nil {
		t.Fatalf("server Listen failed: %v", err)
	}
	defer func() {
		_ = srv.Stop()
	}()

	cli := NewClient().(*ClientCommon)
	if err := UseModernPSKClient(cli, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKClient failed: %v", err)
	}
	if err := cli.Connect("tcp", srv.listener.Addr().String()); err != nil {
		t.Fatalf("client Connect failed: %v", err)
	}
	defer func() {
		_ = cli.Stop()
	}()

	openCtx, cancelOpen := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancelOpen()

	_, openErr := cli.OpenDedicatedBulk(openCtx, BulkOpenOptions{
		ID:    "accepted-then-fail",
		Range: BulkRange{Offset: 0, Length: 1},
	})
	// The open must fail, and the failure must mention the handler's error text.
	propagated := openErr != nil && strings.Contains(openErr.Error(), handlerFailure)
	if !propagated {
		t.Fatalf("client OpenDedicatedBulk error = %v, want dedicated handler failure after accept", openErr)
	}
}
// TestServerOpenBulkLogicalDedicatedReturnsHandlerFailureAfterAccepted
// verifies that when the client bulk handler fails shortly (80ms) after being
// invoked, the server's dedicated OpenBulkLogical call returns an error
// carrying the handler's message.
func TestServerOpenBulkLogicalDedicatedReturnsHandlerFailureAfterAccepted(t *testing.T) {
	const (
		handlerFailure = "client dedicated handler failed after accept"
		waitTimeout    = 2 * time.Second
	)

	srv := NewServer().(*ServerCommon)
	if err := UseModernPSKServer(srv, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKServer failed: %v", err)
	}
	if err := srv.Listen("tcp", "127.0.0.1:0"); err != nil {
		t.Fatalf("server Listen failed: %v", err)
	}
	defer func() {
		_ = srv.Stop()
	}()

	cli := NewClient().(*ClientCommon)
	if err := UseModernPSKClient(cli, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKClient failed: %v", err)
	}
	// The failing handler is installed before the client connects.
	cli.SetBulkHandler(func(BulkAcceptInfo) error {
		time.Sleep(80 * time.Millisecond)
		return errors.New(handlerFailure)
	})
	if err := cli.Connect("tcp", srv.listener.Addr().String()); err != nil {
		t.Fatalf("client Connect failed: %v", err)
	}
	defer func() {
		_ = cli.Stop()
	}()

	logical := waitForTransferControlLogicalConn(t, srv, waitTimeout)

	openCtx, cancelOpen := context.WithTimeout(context.Background(), waitTimeout)
	defer cancelOpen()

	_, openErr := srv.OpenBulkLogical(openCtx, logical, BulkOpenOptions{
		ID:        "server-accepted-then-fail",
		Range:     BulkRange{Offset: 0, Length: 1},
		Dedicated: true,
	})
	// The open must fail, and the failure must mention the handler's error text.
	propagated := openErr != nil && strings.Contains(openErr.Error(), handlerFailure)
	if !propagated {
		t.Fatalf("server OpenBulkLogical dedicated error = %v, want client dedicated handler failure after accept", openErr)
	}
}
// TestBulkOpenRoundTripServerLogicalTCP exercises a server-initiated bulk:
// the server opens a bulk on the logical connection of a connected client,
// the client's handler accepts it, both sides exchange a payload, and the
// close handshake is verified.
func TestBulkOpenRoundTripServerLogicalTCP(t *testing.T) {
server := NewServer().(*ServerCommon)
if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
t.Fatalf("UseModernPSKServer failed: %v", err)
}
// Port 0 lets the OS pick a free port; the client reads it back from the listener.
if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
t.Fatalf("server Listen failed: %v", err)
}
defer func() {
_ = server.Stop()
}()
client := NewClient().(*ClientCommon)
if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
t.Fatalf("UseModernPSKClient failed: %v", err)
}
// Here the client is the accepting side, so the handler is installed on it
// before connecting.
acceptCh := make(chan BulkAcceptInfo, 1)
client.SetBulkHandler(func(info BulkAcceptInfo) error {
acceptCh <- info
return nil
})
if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
t.Fatalf("client Connect failed: %v", err)
}
defer func() {
_ = client.Stop()
}()
// Wait for the server-side logical connection, then open the bulk on it.
logical := waitForTransferControlLogicalConn(t, server, 2*time.Second)
bulk, err := server.OpenBulkLogical(context.Background(), logical, BulkOpenOptions{
Range: BulkRange{
Offset: 4096,
Length: 8192,
},
Metadata: BulkMetadata{
"purpose": "server-open",
},
ChunkSize: 64 * 1024,
})
if err != nil {
t.Fatalf("server OpenBulkLogical failed: %v", err)
}
// The accept info seen by the client handler must match the server's request.
accepted := waitAcceptedBulk(t, acceptCh, 2*time.Second)
if accepted.ID != bulk.ID() {
t.Fatalf("accepted bulk id mismatch: got %q want %q", accepted.ID, bulk.ID())
}
if accepted.Range != (BulkRange{Offset: 4096, Length: 8192}) {
t.Fatalf("accepted range mismatch: %+v", accepted.Range)
}
if accepted.Metadata["purpose"] != "server-open" {
t.Fatalf("accepted metadata mismatch: %+v", accepted.Metadata)
}
// Unlike the server-side accept path, a client-side accept carries no logical connection.
if accepted.LogicalConn != nil {
t.Fatalf("client accepted logical connection should be nil: %+v", accepted.LogicalConn)
}
// Both ends must be *bulkHandle and share the same non-zero data id.
serverHandle, ok := bulk.(*bulkHandle)
if !ok {
t.Fatalf("bulk type = %T, want *bulkHandle", bulk)
}
clientHandle, ok := accepted.Bulk.(*bulkHandle)
if !ok {
t.Fatalf("accepted bulk type = %T, want *bulkHandle", accepted.Bulk)
}
if serverHandle.dataIDSnapshot() == 0 {
t.Fatal("server bulk data id should not be zero")
}
if got, want := clientHandle.dataIDSnapshot(), serverHandle.dataIDSnapshot(); got != want {
t.Fatalf("client accepted bulk data id = %d, want %d", got, want)
}
// Payload exchange: server -> client, then client -> server.
if _, err := bulk.Write([]byte("server-opened")); err != nil {
t.Fatalf("server bulk Write failed: %v", err)
}
readBulkExactly(t, accepted.Bulk, "server-opened", 2*time.Second)
if _, err := accepted.Bulk.Write([]byte("client-accepted")); err != nil {
t.Fatalf("client bulk Write failed: %v", err)
}
readBulkExactly(t, bulk, "client-accepted", 2*time.Second)
// Shutdown: the server half-closes, the client observes EOF and closes fully,
// after which the server must see EOF and its bulk context must finish.
if err := bulk.CloseWrite(); err != nil {
t.Fatalf("server bulk CloseWrite failed: %v", err)
}
waitForBulkReadEOF(t, accepted.Bulk, 2*time.Second)
if err := accepted.Bulk.Close(); err != nil {
t.Fatalf("client accepted bulk Close failed: %v", err)
}
waitForBulkReadEOF(t, bulk, 2*time.Second)
waitForBulkContextDone(t, bulk.Context(), 2*time.Second)
}
// TestBulkSnapshotIncludesDetachedBindingDiagnostics builds a bulk handle
// bound to a logical session's current transport, detaches that transport
// with the reason "heartbeat timeout", and checks that the bulk snapshot
// reports the binding as no longer current/attached together with the detach
// reason and kind.
func TestBulkSnapshotIncludesDetachedBindingDiagnostics(t *testing.T) {
	server := NewServer().(*ServerCommon)
	left, right := net.Pipe()
	// Close both pipe ends when the test finishes, matching the sibling detach
	// test; closing an already-closed net.Pipe end is harmless.
	defer left.Close()
	defer right.Close()

	logical := server.bootstrapAcceptedLogical("bulk-snapshot-detach", nil, left)
	if logical == nil {
		t.Fatal("bootstrapAcceptedLogical should return logical")
	}
	transport := logical.CurrentTransportConn()
	if transport == nil {
		t.Fatal("CurrentTransportConn should return active transport")
	}

	// The handle uses a private runtime and is never registered; only its
	// snapshot is inspected.
	bulk := newBulkHandle(context.Background(), newBulkRuntime("snapshot-detach"), serverFileScope(logical), BulkOpenRequest{
		BulkID: "bulk-snapshot-detach",
		DataID: 1,
		Range: BulkRange{
			Length: 1,
		},
	}, 0, logical, transport, transport.TransportGeneration(), nil, nil, nil, nil, nil)

	server.detachLogicalSessionTransport(logical, "heartbeat timeout", nil)

	snapshot := bulk.snapshot()
	if got, want := snapshot.BindingOwner, "server-transport"; got != want {
		t.Fatalf("snapshot BindingOwner = %q, want %q", got, want)
	}
	if snapshot.BindingCurrent {
		t.Fatalf("snapshot BindingCurrent should be false after detach: %+v", snapshot)
	}
	if snapshot.TransportAttached {
		t.Fatalf("snapshot TransportAttached should be false after detach: %+v", snapshot)
	}
	if snapshot.TransportCurrent {
		t.Fatalf("snapshot TransportCurrent should be false after detach: %+v", snapshot)
	}
	if got, want := snapshot.TransportDetachReason, "heartbeat timeout"; got != want {
		t.Fatalf("snapshot TransportDetachReason = %q, want %q", got, want)
	}
	if got, want := snapshot.TransportDetachKind, clientConnTransportDetachKindHeartbeatTimeout; got != want {
		t.Fatalf("snapshot TransportDetachKind = %q, want %q", got, want)
	}
}
// TestServerDetachLogicalSessionTransportResetsScopedBulks registers a bulk
// in the server's bulk runtime, detaches the logical session's transport, and
// expects a pending read on the bulk to fail with errTransportDetached and
// the bulk to be gone from the runtime.
func TestServerDetachLogicalSessionTransportResetsScopedBulks(t *testing.T) {
	srv := NewServer().(*ServerCommon)
	serverEnd, peerEnd := net.Pipe()
	defer serverEnd.Close()
	defer peerEnd.Close()

	logical := srv.bootstrapAcceptedLogical("bulk-detach-runtime", nil, serverEnd)
	if logical == nil {
		t.Fatal("bootstrapAcceptedLogical should return logical")
	}
	defer srv.stopLogicalSession(logical, "test cleanup", nil)

	transport := logical.CurrentTransportConn()
	if transport == nil {
		t.Fatal("CurrentTransportConn should return active transport")
	}

	scope := serverFileScope(logical)
	request := BulkOpenRequest{
		BulkID: "bulk-detach-runtime",
		DataID: 1,
		Range:  BulkRange{Length: 1},
	}
	handle := newBulkHandle(context.Background(), srv.getBulkRuntime(), scope, request, 0, logical, transport, transport.TransportGeneration(), nil, nil, nil, nil, nil)
	if regErr := srv.getBulkRuntime().register(scope, handle); regErr != nil {
		t.Fatalf("register bulk failed: %v", regErr)
	}

	srv.detachLogicalSessionTransport(logical, "read error", errors.New("boom"))

	readErr := readBulkError(t, handle, time.Second)
	if !errors.Is(readErr, errTransportDetached) {
		t.Fatalf("detached bulk read error = %v, want %v", readErr, errTransportDetached)
	}
	_, stillRegistered := srv.getBulkRuntime().lookup(scope, handle.ID())
	if stillRegistered {
		t.Fatal("detached bulk should be removed from runtime")
	}
}
// TestBulkOpenRoundTripDedicatedTCP opens a dedicated bulk from the client,
// exchanges a payload in each direction, and checks that ordinary SendWait
// traffic on the same client keeps working both while the dedicated bulk is
// open and after it has been closed.
func TestBulkOpenRoundTripDedicatedTCP(t *testing.T) {
	server := NewServer().(*ServerCommon)
	if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKServer failed: %v", err)
	}
	// Echo-style link used to probe the regular message path.
	server.SetLink("bulk-dedicated-ping", func(msg *Message) {
		_ = msg.Reply([]byte("pong"))
	})
	acceptCh := make(chan BulkAcceptInfo, 1)
	server.SetBulkHandler(func(info BulkAcceptInfo) error {
		acceptCh <- info
		return nil
	})
	if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
		t.Fatalf("server Listen failed: %v", err)
	}
	defer func() {
		_ = server.Stop()
	}()

	client := NewClient().(*ClientCommon)
	if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKClient failed: %v", err)
	}
	if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
		t.Fatalf("client Connect failed: %v", err)
	}
	defer func() {
		_ = client.Stop()
	}()

	bulk, err := client.OpenBulk(context.Background(), BulkOpenOptions{
		Range: BulkRange{
			Offset: 1024,
			Length: 8192,
		},
		Metadata: BulkMetadata{
			"name": "dedicated.bin",
		},
		Dedicated: true,
		ChunkSize: 32 * 1024,
	})
	if err != nil {
		t.Fatalf("client OpenBulk dedicated failed: %v", err)
	}

	accepted := waitAcceptedBulk(t, acceptCh, 2*time.Second)
	if !accepted.Dedicated {
		t.Fatal("accepted dedicated flag should be true")
	}
	// Use the checked assertion form, as the other tests in this file do, so an
	// unexpected implementation type fails the test instead of panicking.
	clientHandle, ok := bulk.(*bulkHandle)
	if !ok {
		t.Fatalf("bulk type = %T, want *bulkHandle", bulk)
	}
	if !clientHandle.Dedicated() {
		t.Fatal("client bulk dedicated flag should be true")
	}

	clientSnapshots, err := GetClientBulkSnapshots(client)
	if err != nil {
		t.Fatalf("GetClientBulkSnapshots failed: %v", err)
	}
	if len(clientSnapshots) != 1 || !clientSnapshots[0].Dedicated || !clientSnapshots[0].DedicatedAttached {
		t.Fatalf("client dedicated bulk snapshots mismatch: %+v", clientSnapshots)
	}

	// Payload exchange: client -> server, then server -> client.
	if _, err := bulk.Write([]byte("hello-from-dedicated-client")); err != nil {
		t.Fatalf("client dedicated bulk Write failed: %v", err)
	}
	readBulkExactly(t, accepted.Bulk, "hello-from-dedicated-client", 2*time.Second)
	if _, err := accepted.Bulk.Write([]byte("hello-from-dedicated-server")); err != nil {
		t.Fatalf("server dedicated bulk Write failed: %v", err)
	}
	readBulkExactly(t, bulk, "hello-from-dedicated-server", 2*time.Second)

	// Regular request/reply must still work while the dedicated bulk is open.
	reply, err := client.SendWait("bulk-dedicated-ping", []byte("ping"), 2*time.Second)
	if err != nil {
		t.Fatalf("client SendWait after dedicated bulk failed: %v", err)
	}
	if got, want := string(reply.Value), "pong"; got != want {
		t.Fatalf("SendWait reply mismatch: got %q want %q", got, want)
	}

	// Shutdown: client half-closes, server sees EOF and closes, then both bulk
	// contexts must finish.
	if err := bulk.CloseWrite(); err != nil {
		t.Fatalf("client dedicated bulk CloseWrite failed: %v", err)
	}
	waitForBulkReadEOF(t, accepted.Bulk, 2*time.Second)
	if err := accepted.Bulk.Close(); err != nil {
		t.Fatalf("server dedicated bulk Close failed: %v", err)
	}
	waitForBulkReadEOF(t, bulk, 2*time.Second)
	waitForBulkContextDone(t, bulk.Context(), 2*time.Second)
	waitForBulkContextDone(t, accepted.Bulk.Context(), 2*time.Second)

	// ...and after the dedicated bulk is fully closed.
	reply, err = client.SendWait("bulk-dedicated-ping", []byte("ping-after-close"), 2*time.Second)
	if err != nil {
		t.Fatalf("client SendWait after dedicated bulk close failed: %v", err)
	}
	if got, want := string(reply.Value), "pong"; got != want {
		t.Fatalf("SendWait reply after close mismatch: got %q want %q", got, want)
	}
}
// TestBulkDedicatedResetReasonBeatsTransportDetached resets a dedicated bulk
// from the server with a custom error and checks that the client's reset
// error and a subsequent client Write both carry that reason, and that
// regular SendWait traffic on the client still works afterwards.
func TestBulkDedicatedResetReasonBeatsTransportDetached(t *testing.T) {
	server := NewServer().(*ServerCommon)
	if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKServer failed: %v", err)
	}
	// Echo-style link used to probe the regular message path after the reset.
	server.SetLink("bulk-dedicated-reset-ping", func(msg *Message) {
		_ = msg.Reply([]byte("pong"))
	})
	acceptCh := make(chan BulkAcceptInfo, 1)
	server.SetBulkHandler(func(info BulkAcceptInfo) error {
		acceptCh <- info
		return nil
	})
	if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
		t.Fatalf("server Listen failed: %v", err)
	}
	defer func() {
		_ = server.Stop()
	}()

	client := NewClient().(*ClientCommon)
	if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKClient failed: %v", err)
	}
	if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
		t.Fatalf("client Connect failed: %v", err)
	}
	defer func() {
		_ = client.Stop()
	}()

	bulk, err := client.OpenBulk(context.Background(), BulkOpenOptions{
		Range:     BulkRange{Offset: 0, Length: 1024},
		Dedicated: true,
	})
	if err != nil {
		t.Fatalf("client OpenBulk dedicated failed: %v", err)
	}
	accepted := waitAcceptedBulk(t, acceptCh, 2*time.Second)

	wantErr := "dedicated remote flow reset"
	if err := accepted.Bulk.Reset(errors.New(wantErr)); err != nil {
		t.Fatalf("server dedicated bulk Reset failed: %v", err)
	}

	// Use the checked assertion form, as the other tests in this file do, so an
	// unexpected implementation type fails the test instead of panicking.
	clientHandle, ok := bulk.(*bulkHandle)
	if !ok {
		t.Fatalf("bulk type = %T, want *bulkHandle", bulk)
	}

	// Poll until the client records a reset error. If none shows up before the
	// deadline the loop simply ends; the Write assertion below then reports
	// the failure.
	deadline := time.Now().Add(2 * time.Second)
	for time.Now().Before(deadline) {
		if resetErr := clientHandle.resetErrSnapshot(); resetErr != nil {
			if !strings.Contains(resetErr.Error(), wantErr) {
				t.Fatalf("client reset error = %v, want contains %q", resetErr, wantErr)
			}
			break
		}
		time.Sleep(10 * time.Millisecond)
	}

	if _, err := bulk.Write([]byte("abc")); err == nil || !strings.Contains(err.Error(), wantErr) {
		t.Fatalf("client dedicated bulk Write error = %v, want contains %q", err, wantErr)
	}

	reply, err := client.SendWait("bulk-dedicated-reset-ping", []byte("ping-after-reset"), 2*time.Second)
	if err != nil {
		t.Fatalf("client SendWait after dedicated bulk reset failed: %v", err)
	}
	if got, want := string(reply.Value), "pong"; got != want {
		t.Fatalf("SendWait reply after reset mismatch: got %q want %q", got, want)
	}
}
func TestBulkWritePrefersResetErrorOverContextCanceled(t *testing.T) {
wantErr := errors.New("remote flow reset")
bulk := newBulkHandle(context.Background(), nil, "test", BulkOpenRequest{
BulkID: "bulk-reset-propagation",
DataID: 1,
ChunkSize: 4,
WindowBytes: 16,
MaxInFlight: 4,
}, 0, nil, nil, 0, nil, nil, func(ctx context.Context, b *bulkHandle, chunk []byte) error {
b.markReset(wantErr)
<-ctx.Done()
return ctx.Err()
}, nil, nil)
_, err := bulk.Write([]byte("abcdefgh"))
if !errors.Is(err, wantErr) {
t.Fatalf("bulk Write error = %v, want %v", err, wantErr)
}
}
func TestDedicatedBulkWaitReadyPrefersClosedPipeOverContextCanceled(t *testing.T) {
bulk := newBulkHandle(context.Background(), nil, "test", BulkOpenRequest{
BulkID: "bulk-dedicated-ready-close",
DataID: 1,
Dedicated: true,
Range: BulkRange{
Length: 1,
},
}, 0, nil, nil, 0, nil, nil, nil, nil, nil)
bulk.markPeerClosed()
err := bulk.waitDedicatedReady(context.Background())
if !errors.Is(err, io.ErrClosedPipe) {
t.Fatalf("waitDedicatedReady error = %v, want %v", err, io.ErrClosedPipe)
}
}
func TestDedicatedBulkWritePrefersClosedPipeOverContextCanceled(t *testing.T) {
bulk := newBulkHandle(context.Background(), nil, "test", BulkOpenRequest{
BulkID: "bulk-dedicated-write-close",
DataID: 1,
Dedicated: true,
ChunkSize: 4,
WindowBytes: 16,
MaxInFlight: 4,
}, 0, nil, nil, 0, nil, nil, func(context.Context, *bulkHandle, []byte) error {
return nil
}, func(ctx context.Context, bulk *bulkHandle, startSeq uint64, payload []byte, payloadOwned bool) (int, error) {
bulk.markPeerClosed()
<-ctx.Done()
return 0, ctx.Err()
}, nil)
_, err := bulk.Write([]byte("abcdefgh"))
if err != nil && !errors.Is(err, io.ErrClosedPipe) {
t.Fatalf("bulk Write error = %v, want nil or %v", err, io.ErrClosedPipe)
}
if err := bulk.waitPendingAsyncWrites(context.Background()); err != nil && !errors.Is(err, io.ErrClosedPipe) {
t.Fatalf("bulk waitPendingAsyncWrites error = %v, want nil or %v", err, io.ErrClosedPipe)
}
}
func TestBulkReadWaitingLocalClosePrefersClosedPipeOverContextCanceled(t *testing.T) {
bulk := newBulkHandle(context.Background(), nil, "test", BulkOpenRequest{
BulkID: "bulk-read-local-close",
DataID: 1,
Range: BulkRange{
Length: 1,
},
}, 0, nil, nil, 0, nil, nil, nil, nil, nil)
errCh := make(chan error, 1)
go func() {
buf := make([]byte, 4)
_, err := bulk.Read(buf)
errCh <- err
}()
time.Sleep(20 * time.Millisecond)
if err := bulk.Close(); err != nil {
t.Fatalf("bulk Close failed: %v", err)
}
select {
case err := <-errCh:
if !errors.Is(err, io.ErrClosedPipe) {
t.Fatalf("bulk Read error = %v, want %v", err, io.ErrClosedPipe)
}
case <-time.After(time.Second):
t.Fatal("bulk Read did not return after local close")
}
}
// TestBulkReleaseControlRoundTripTransport forces the client bulk's outbound
// window into an exhausted state (0 bytes available, 1 chunk in flight),
// sends a release for one chunk from the server over the accepted transport,
// and polls until the client's window shows the bytes returned and nothing in
// flight.
func TestBulkReleaseControlRoundTripTransport(t *testing.T) {
	const (
		chunkSize    = 64 * 1024
		waitTimeout  = 2 * time.Second
		pollInterval = 10 * time.Millisecond
	)

	srv := NewServer().(*ServerCommon)
	if err := UseModernPSKServer(srv, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKServer failed: %v", err)
	}
	acceptedBulks := make(chan BulkAcceptInfo, 1)
	srv.SetBulkHandler(func(info BulkAcceptInfo) error {
		acceptedBulks <- info
		return nil
	})
	if err := srv.Listen("tcp", "127.0.0.1:0"); err != nil {
		t.Fatalf("server Listen failed: %v", err)
	}
	defer func() {
		_ = srv.Stop()
	}()

	cli := NewClient().(*ClientCommon)
	if err := UseModernPSKClient(cli, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKClient failed: %v", err)
	}
	if err := cli.Connect("tcp", srv.listener.Addr().String()); err != nil {
		t.Fatalf("client Connect failed: %v", err)
	}
	defer func() {
		_ = cli.Stop()
	}()

	bulk, err := cli.OpenBulk(context.Background(), BulkOpenOptions{
		Range:       BulkRange{Offset: 0, Length: chunkSize},
		ChunkSize:   chunkSize,
		WindowBytes: chunkSize,
		MaxInFlight: 1,
	})
	if err != nil {
		t.Fatalf("client OpenBulk failed: %v", err)
	}
	accepted := waitAcceptedBulk(t, acceptedBulks, waitTimeout)

	clientHandle, ok := bulk.(*bulkHandle)
	if !ok {
		t.Fatalf("bulk type = %T, want *bulkHandle", bulk)
	}
	serverHandle, ok := accepted.Bulk.(*bulkHandle)
	if !ok {
		t.Fatalf("accepted bulk type = %T, want *bulkHandle", accepted.Bulk)
	}
	if accepted.TransportConn == nil {
		t.Fatal("accepted transport connection should not be nil")
	}

	// Pretend one full chunk is outstanding so the release has something to return.
	clientHandle.mu.Lock()
	clientHandle.outboundAvailBytes = 0
	clientHandle.outboundInFlight = 1
	clientHandle.mu.Unlock()

	release := BulkReleaseRequest{
		BulkID: serverHandle.ID(),
		DataID: serverHandle.dataIDSnapshot(),
		Bytes:  chunkSize,
		Chunks: 1,
	}
	if sendErr := sendBulkReleaseServerTransport(srv, accepted.TransportConn, release); sendErr != nil {
		t.Fatalf("sendBulkReleaseServerTransport failed: %v", sendErr)
	}

	// Poll the client window; success returns straight out of the test.
	for deadline := time.Now().Add(waitTimeout); time.Now().Before(deadline); time.Sleep(pollInterval) {
		clientHandle.mu.Lock()
		availNow := clientHandle.outboundAvailBytes
		inFlightNow := clientHandle.outboundInFlight
		clientHandle.mu.Unlock()
		if availNow == chunkSize && inFlightNow == 0 {
			return
		}
	}

	// Timed out: report the final observed window state.
	clientHandle.mu.Lock()
	avail := clientHandle.outboundAvailBytes
	inFlight := clientHandle.outboundInFlight
	clientHandle.mu.Unlock()
	t.Fatalf("client outbound window not released: avail=%d inFlight=%d", avail, inFlight)
}
// TestBulkSharedWindowFlowControlPreventsBackpressureReset pushes six 1 MiB
// chunks through a bulk with a one-chunk window while the server reads slowly
// and has its inbound limits lowered to two chunks. The transfer must
// complete with the full byte count and without either side recording a
// reset error.
func TestBulkSharedWindowFlowControlPreventsBackpressureReset(t *testing.T) {
	const (
		chunkSize   = 1024 * 1024
		totalBytes  = 6 * chunkSize
		waitTimeout = 2 * time.Second
	)

	srv := NewServer().(*ServerCommon)
	if err := UseModernPSKServer(srv, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKServer failed: %v", err)
	}
	acceptedBulks := make(chan BulkAcceptInfo, 1)
	srv.SetBulkHandler(func(info BulkAcceptInfo) error {
		acceptedBulks <- info
		return nil
	})
	if err := srv.Listen("tcp", "127.0.0.1:0"); err != nil {
		t.Fatalf("server Listen failed: %v", err)
	}
	defer func() {
		_ = srv.Stop()
	}()

	cli := NewClient().(*ClientCommon)
	if err := UseModernPSKClient(cli, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKClient failed: %v", err)
	}
	if err := cli.Connect("tcp", srv.listener.Addr().String()); err != nil {
		t.Fatalf("client Connect failed: %v", err)
	}
	defer func() {
		_ = cli.Stop()
	}()

	bulk, err := cli.OpenBulk(context.Background(), BulkOpenOptions{
		Range:       BulkRange{Offset: 0, Length: totalBytes},
		ChunkSize:   chunkSize,
		WindowBytes: chunkSize,
		MaxInFlight: 1,
	})
	if err != nil {
		t.Fatalf("client OpenBulk failed: %v", err)
	}

	accepted := waitAcceptedBulk(t, acceptedBulks, waitTimeout)
	serverHandle, ok := accepted.Bulk.(*bulkHandle)
	if !ok {
		t.Fatalf("accepted bulk type = %T, want *bulkHandle", accepted.Bulk)
	}

	// Tighten the receiver's inbound limits to two chunks.
	serverHandle.mu.Lock()
	serverHandle.inboundQueueLimit = 2
	serverHandle.inboundBytesLimit = 2 * chunkSize
	serverHandle.mu.Unlock()

	// drain reads the accepted bulk to EOF, pausing 15ms after every read that
	// returned data, and checks the total byte count at EOF.
	drain := func() error {
		scratch := make([]byte, chunkSize)
		received := 0
		for {
			n, readErr := accepted.Bulk.Read(scratch)
			if n > 0 {
				received += n
				time.Sleep(15 * time.Millisecond)
			}
			switch {
			case readErr == nil:
				continue
			case !errors.Is(readErr, io.EOF):
				return readErr
			case received != totalBytes:
				return errors.New("server bulk read size mismatch")
			default:
				return nil
			}
		}
	}
	readDone := make(chan error, 1)
	go func() {
		readDone <- drain()
	}()

	data := make([]byte, totalBytes)
	for idx := range data {
		data[idx] = byte(idx)
	}
	if _, writeErr := bulk.Write(data); writeErr != nil {
		t.Fatalf("client bulk Write failed: %v", writeErr)
	}
	if closeErr := bulk.CloseWrite(); closeErr != nil {
		t.Fatalf("client bulk CloseWrite failed: %v", closeErr)
	}

	select {
	case drainErr := <-readDone:
		if drainErr != nil {
			t.Fatalf("server read failed: %v", drainErr)
		}
	case <-time.After(5 * time.Second):
		t.Fatal("timed out waiting for server bulk read")
	}

	clientHandle, ok := bulk.(*bulkHandle)
	if !ok {
		t.Fatalf("bulk type = %T, want *bulkHandle", bulk)
	}
	// Neither side may have recorded a reset during the transfer.
	if clientReset := clientHandle.resetErrSnapshot(); clientReset != nil {
		t.Fatalf("client bulk reset error = %v, want nil", clientReset)
	}
	if serverReset := serverHandle.resetErrSnapshot(); serverReset != nil {
		t.Fatalf("server bulk reset error = %v, want nil", serverReset)
	}
}
// TestBulkDedicatedWindowFlowControlPreventsBackpressureReset issues six 4 MiB
// writes over a dedicated bulk whose window equals one write while the server
// side reads slowly, then verifies the full byte count arrived and that neither
// handle recorded a reset error.
func TestBulkDedicatedWindowFlowControlPreventsBackpressureReset(t *testing.T) {
	const (
		chunkSize   = 1024 * 1024
		writeSize   = 4 * chunkSize
		totalWrites = 6
		totalBytes  = totalWrites * writeSize
	)

	// Server side: PSK-protected listener that forwards accepted bulks to the test.
	server := NewServer().(*ServerCommon)
	if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKServer failed: %v", err)
	}
	acceptCh := make(chan BulkAcceptInfo, 1)
	server.SetBulkHandler(func(info BulkAcceptInfo) error {
		acceptCh <- info
		return nil
	})
	if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
		t.Fatalf("server Listen failed: %v", err)
	}
	defer func() {
		_ = server.Stop()
	}()

	// Client side: connect to whatever port the listener was given.
	client := NewClient().(*ClientCommon)
	if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKClient failed: %v", err)
	}
	if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
		t.Fatalf("client Connect failed: %v", err)
	}
	defer func() {
		_ = client.Stop()
	}()

	openOpts := BulkOpenOptions{
		Range:       BulkRange{Offset: 0, Length: totalBytes},
		Dedicated:   true,
		ChunkSize:   chunkSize,
		WindowBytes: writeSize,
		MaxInFlight: 4,
	}
	bulk, err := client.OpenBulk(context.Background(), openOpts)
	if err != nil {
		t.Fatalf("client OpenBulk dedicated failed: %v", err)
	}
	accepted := waitAcceptedBulk(t, acceptCh, 2*time.Second)
	serverHandle, ok := accepted.Bulk.(*bulkHandle)
	if !ok {
		t.Fatalf("accepted bulk type = %T, want *bulkHandle", accepted.Bulk)
	}

	// Cap the receiver's inbound limits at one write plus one chunk.
	serverHandle.mu.Lock()
	serverHandle.inboundQueueLimit = 8
	serverHandle.inboundBytesLimit = writeSize + chunkSize
	serverHandle.mu.Unlock()

	// drain reads the accepted bulk to EOF, pausing after every non-empty read
	// so the receiver is slower than the sender.
	drain := func() error {
		scratch := make([]byte, writeSize)
		received := 0
		for {
			n, readErr := accepted.Bulk.Read(scratch)
			if n > 0 {
				received += n
				time.Sleep(15 * time.Millisecond)
			}
			switch {
			case readErr == nil:
				continue
			case !errors.Is(readErr, io.EOF):
				return readErr
			case received != totalBytes:
				return errors.New("server dedicated bulk read size mismatch")
			default:
				return nil
			}
		}
	}
	readDone := make(chan error, 1)
	go func() {
		readDone <- drain()
	}()

	payload := make([]byte, writeSize)
	for idx := range payload {
		payload[idx] = byte(idx)
	}
	for attempt := 0; attempt < totalWrites; attempt++ {
		if _, err := bulk.Write(payload); err != nil {
			t.Fatalf("client dedicated bulk Write #%d failed: %v", attempt, err)
		}
	}
	if err := bulk.CloseWrite(); err != nil {
		t.Fatalf("client dedicated bulk CloseWrite failed: %v", err)
	}

	select {
	case <-time.After(5 * time.Second):
		t.Fatal("timed out waiting for server dedicated bulk read")
	case drainErr := <-readDone:
		if drainErr != nil {
			t.Fatalf("server dedicated read failed: %v", drainErr)
		}
	}

	clientHandle, isHandle := bulk.(*bulkHandle)
	if !isHandle {
		t.Fatalf("bulk type = %T, want *bulkHandle", bulk)
	}
	if resetErr := clientHandle.resetErrSnapshot(); resetErr != nil {
		t.Fatalf("client dedicated bulk reset error = %v, want nil", resetErr)
	}
	if resetErr := serverHandle.resetErrSnapshot(); resetErr != nil {
		t.Fatalf("server dedicated bulk reset error = %v, want nil", resetErr)
	}
}
// TestBulkOpenRoundTripServerLogicalDedicatedTCP opens a dedicated bulk from
// the server towards a connected client via OpenBulkLogical, exchanges one
// message in each direction, and then walks both sides through CloseWrite /
// Close until the server-side bulk context is done.
func TestBulkOpenRoundTripServerLogicalDedicatedTCP(t *testing.T) {
	server := NewServer().(*ServerCommon)
	if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKServer failed: %v", err)
	}
	if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
		t.Fatalf("server Listen failed: %v", err)
	}
	defer func() {
		_ = server.Stop()
	}()

	// In this test the client is the accepting side, so the handler is
	// installed on the client before it connects.
	client := NewClient().(*ClientCommon)
	if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKClient failed: %v", err)
	}
	acceptCh := make(chan BulkAcceptInfo, 1)
	client.SetBulkHandler(func(info BulkAcceptInfo) error {
		acceptCh <- info
		return nil
	})
	if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
		t.Fatalf("client Connect failed: %v", err)
	}
	defer func() {
		_ = client.Stop()
	}()

	logicalConn := waitForTransferControlLogicalConn(t, server, 2*time.Second)
	openOpts := BulkOpenOptions{
		Range: BulkRange{
			Offset: 2048,
			Length: 4096,
		},
		Metadata: BulkMetadata{
			"mode": "server-dedicated",
		},
		Dedicated: true,
		ChunkSize: 32 * 1024,
	}
	serverBulk, err := server.OpenBulkLogical(context.Background(), logicalConn, openOpts)
	if err != nil {
		t.Fatalf("server OpenBulkLogical dedicated failed: %v", err)
	}
	accepted := waitAcceptedBulk(t, acceptCh, 2*time.Second)
	if !accepted.Dedicated {
		t.Fatal("client accepted dedicated flag should be true")
	}
	clientBulk := accepted.Bulk

	// Server -> client payload.
	if _, err := serverBulk.Write([]byte("server-dedicated")); err != nil {
		t.Fatalf("server dedicated bulk Write failed: %v", err)
	}
	readBulkExactly(t, clientBulk, "server-dedicated", 2*time.Second)

	// Client -> server payload.
	if _, err := clientBulk.Write([]byte("client-dedicated")); err != nil {
		t.Fatalf("client dedicated bulk Write failed: %v", err)
	}
	readBulkExactly(t, serverBulk, "client-dedicated", 2*time.Second)

	// Shutdown: server half-closes, client sees EOF and closes, server sees
	// EOF and its context finishes.
	if err := serverBulk.CloseWrite(); err != nil {
		t.Fatalf("server dedicated bulk CloseWrite failed: %v", err)
	}
	waitForBulkReadEOF(t, clientBulk, 2*time.Second)
	if err := clientBulk.Close(); err != nil {
		t.Fatalf("client dedicated bulk Close failed: %v", err)
	}
	waitForBulkReadEOF(t, serverBulk, 2*time.Second)
	waitForBulkContextDone(t, serverBulk.Context(), 2*time.Second)
}
// TestDedicatedBulkCloseWaitsForRemoteCloseBeforeFinalize checks that Close on
// a dedicated bulk calls closeFn once and marks the local side closed, while
// the context stays open and the runtime entry stays registered until
// markRemoteClosed is invoked.
func TestDedicatedBulkCloseWaitsForRemoteCloseBeforeFinalize(t *testing.T) {
	runtime := newBulkRuntime("dedicated-close")
	request := BulkOpenRequest{
		BulkID:    "dedicated-close",
		DataID:    1,
		Dedicated: true,
		Range: BulkRange{
			Length: 1,
		},
	}
	bulk := newBulkHandle(context.Background(), runtime, clientFileScope(), request, 0, nil, nil, 0, nil, nil, nil, nil, nil)
	if err := runtime.register(clientFileScope(), bulk); err != nil {
		t.Fatalf("register bulk failed: %v", err)
	}

	// Count close invocations instead of talking to a real peer.
	closeCalls := 0
	bulk.closeFn = func(context.Context, *bulkHandle, bool) error {
		closeCalls++
		return nil
	}
	if err := bulk.Close(); err != nil {
		t.Fatalf("bulk Close failed: %v", err)
	}
	if closeCalls != 1 {
		t.Fatalf("closeFn calls = %d, want %d", closeCalls, 1)
	}

	// Local close alone must not finish the context or drop the runtime entry.
	select {
	case <-bulk.Context().Done():
		t.Fatal("dedicated full close should wait for remote close before finalize")
	default:
	}
	if _, found := runtime.lookup(clientFileScope(), bulk.ID()); !found {
		t.Fatal("bulk runtime entry should remain until remote close arrives")
	}
	state := bulk.snapshot()
	if !(state.LocalClosed && state.LocalReadClosed) {
		t.Fatalf("local close snapshot mismatch: %+v", state)
	}
	if state.RemoteClosed {
		t.Fatalf("remote close should not be set yet: %+v", state)
	}

	// The remote close completes the shutdown.
	bulk.markRemoteClosed()
	waitForBulkContextDone(t, bulk.Context(), 2*time.Second)
	if _, found := runtime.lookup(clientFileScope(), bulk.ID()); found {
		t.Fatal("bulk runtime entry should be removed after remote close")
	}
}
// TestHandleDedicatedBulkReadErrorTreatsEOFAfterLocalCloseAsGraceful marks a
// dedicated bulk as locally closed, feeds io.EOF to
// handleDedicatedBulkReadError, and expects no reset error, the remote-closed
// flag set, a finished context, and the runtime entry removed.
func TestHandleDedicatedBulkReadErrorTreatsEOFAfterLocalCloseAsGraceful(t *testing.T) {
	runtime := newBulkRuntime("dedicated-eof")
	request := BulkOpenRequest{
		BulkID:    "dedicated-eof",
		DataID:    1,
		Dedicated: true,
		Range: BulkRange{
			Length: 1,
		},
	}
	bulk := newBulkHandle(context.Background(), runtime, clientFileScope(), request, 0, nil, nil, 0, nil, nil, nil, nil, nil)
	if err := runtime.register(clientFileScope(), bulk); err != nil {
		t.Fatalf("register bulk failed: %v", err)
	}

	// Set the local-closed flag directly rather than going through Close.
	bulk.mu.Lock()
	bulk.localClosed = true
	bulk.mu.Unlock()

	handleDedicatedBulkReadError(bulk, io.EOF)

	if resetErr := bulk.resetErrSnapshot(); resetErr != nil {
		t.Fatalf("reset error = %v, want nil", resetErr)
	}
	if remoteClosed := bulk.remoteClosedSnapshot(); !remoteClosed {
		t.Fatal("remoteClosed should be set after graceful EOF")
	}
	waitForBulkContextDone(t, bulk.Context(), 2*time.Second)
	if _, found := runtime.lookup(clientFileScope(), bulk.ID()); found {
		t.Fatal("bulk runtime entry should be removed after graceful EOF")
	}
}
// TestHandleDedicatedBulkReadErrorTreatsEOFEvenBeforeLocalCloseAsGracefulForDedicated
// feeds io.EOF to handleDedicatedBulkReadError on a dedicated bulk that has
// not been closed locally. It expects no reset error, the remote-closed flag
// set, Read returning io.EOF, and the context finishing once Close is called.
func TestHandleDedicatedBulkReadErrorTreatsEOFEvenBeforeLocalCloseAsGracefulForDedicated(t *testing.T) {
	runtime := newBulkRuntime("dedicated-eof-remote")
	request := BulkOpenRequest{
		BulkID:    "dedicated-eof-remote",
		DataID:    1,
		Dedicated: true,
		Range: BulkRange{
			Length: 8,
		},
	}
	bulk := newBulkHandle(context.Background(), runtime, clientFileScope(), request, 0, nil, nil, 0, nil, nil, nil, nil, nil)
	if err := runtime.register(clientFileScope(), bulk); err != nil {
		t.Fatalf("register bulk failed: %v", err)
	}

	handleDedicatedBulkReadError(bulk, io.EOF)

	if resetErr := bulk.resetErrSnapshot(); resetErr != nil {
		t.Fatalf("reset error = %v, want nil", resetErr)
	}
	if remoteClosed := bulk.remoteClosedSnapshot(); !remoteClosed {
		t.Fatal("remoteClosed should be set after dedicated EOF")
	}

	// The local reader should observe the end of stream.
	_, readErr := bulk.Read(make([]byte, 1))
	if !errors.Is(readErr, io.EOF) {
		t.Fatalf("bulk Read error = %v, want EOF", readErr)
	}

	if err := bulk.Close(); err != nil {
		t.Fatalf("bulk Close failed: %v", err)
	}
	waitForBulkContextDone(t, bulk.Context(), 2*time.Second)
}
// TestDedicatedBulkCloseWriteHalfClosesUnderlyingTCPWriteSide attaches a real
// loopback TCP connection to a dedicated bulk handle and checks that
// CloseWrite makes the peer's Read return io.EOF, after which marking the
// remote side closed finishes the bulk context.
func TestDedicatedBulkCloseWriteHalfClosesUnderlyingTCPWriteSide(t *testing.T) {
	listener, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("net.Listen failed: %v", err)
	}
	defer func() {
		_ = listener.Close()
	}()

	// Accept exactly one connection in the background and report the outcome.
	acceptedConnCh := make(chan net.Conn, 1)
	acceptErrCh := make(chan error, 1)
	go func() {
		conn, acceptErr := listener.Accept()
		if acceptErr != nil {
			acceptErrCh <- acceptErr
			return
		}
		acceptedConnCh <- conn
	}()

	clientConnRaw, err := net.Dial("tcp", listener.Addr().String())
	if err != nil {
		t.Fatalf("net.Dial failed: %v", err)
	}
	defer func() {
		_ = clientConnRaw.Close()
	}()

	var serverConn net.Conn
	select {
	case serverConn = <-acceptedConnCh:
	case acceptErr := <-acceptErrCh:
		t.Fatalf("listener.Accept failed: %v", acceptErr)
	case <-time.After(2 * time.Second):
		t.Fatal("timed out waiting for accepted TCP conn")
	}
	defer func() {
		_ = serverConn.Close()
	}()

	runtime := newBulkRuntime("dedicated-half-close")
	request := BulkOpenRequest{
		BulkID:    "dedicated-half-close",
		DataID:    1,
		Dedicated: true,
		Range: BulkRange{
			Length: 1,
		},
	}
	bulk := newBulkHandle(context.Background(), runtime, clientFileScope(), request, 0, nil, nil, 0, nil, nil, nil, nil, nil)
	if err := runtime.register(clientFileScope(), bulk); err != nil {
		t.Fatalf("register bulk failed: %v", err)
	}
	if err := bulk.attachDedicatedConn(clientConnRaw); err != nil {
		t.Fatalf("attachDedicatedConn failed: %v", err)
	}

	// The peer performs a single one-byte read bounded by a read deadline; its
	// error is what the assertion below inspects.
	peerReadCh := make(chan error, 1)
	go func() {
		_ = serverConn.SetReadDeadline(time.Now().Add(2 * time.Second))
		var scratch [1]byte
		_, readErr := serverConn.Read(scratch[:])
		peerReadCh <- readErr
	}()

	if err := bulk.CloseWrite(); err != nil {
		t.Fatalf("bulk CloseWrite failed: %v", err)
	}

	select {
	case <-time.After(3 * time.Second):
		t.Fatal("timed out waiting for TCP half-close EOF")
	case readErr := <-peerReadCh:
		if !errors.Is(readErr, io.EOF) {
			t.Fatalf("server conn Read error = %v, want EOF", readErr)
		}
	}

	bulk.markRemoteClosed()
	waitForBulkContextDone(t, bulk.Context(), 2*time.Second)
}
// TestBulkDedicatedClientFullCloseAfterCloseWriteDoesNotResetTCP sends a short
// payload over a dedicated bulk, then has the client call CloseWrite followed
// by Close. The server is expected to read EOF and close cleanly, both bulk
// contexts must finish, and neither handle may record a reset error.
func TestBulkDedicatedClientFullCloseAfterCloseWriteDoesNotResetTCP(t *testing.T) {
	server := NewServer().(*ServerCommon)
	if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKServer failed: %v", err)
	}
	acceptCh := make(chan BulkAcceptInfo, 1)
	server.SetBulkHandler(func(info BulkAcceptInfo) error {
		acceptCh <- info
		return nil
	})
	if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
		t.Fatalf("server Listen failed: %v", err)
	}
	defer func() {
		_ = server.Stop()
	}()

	client := NewClient().(*ClientCommon)
	if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKClient failed: %v", err)
	}
	if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
		t.Fatalf("client Connect failed: %v", err)
	}
	defer func() {
		_ = client.Stop()
	}()

	bulk, err := client.OpenBulk(context.Background(), BulkOpenOptions{
		ID:        "dedicated-close-after-closewrite",
		Dedicated: true,
		Range: BulkRange{
			Length: 5,
		},
	})
	if err != nil {
		t.Fatalf("client OpenBulk dedicated failed: %v", err)
	}
	accepted := waitAcceptedBulk(t, acceptCh, 2*time.Second)

	if _, err := bulk.Write([]byte("hello")); err != nil {
		t.Fatalf("client dedicated bulk Write failed: %v", err)
	}
	readBulkExactly(t, accepted.Bulk, "hello", 2*time.Second)

	// Half-close first, then fully close on the client side.
	if err := bulk.CloseWrite(); err != nil {
		t.Fatalf("client dedicated bulk CloseWrite failed: %v", err)
	}
	if err := bulk.Close(); err != nil {
		t.Fatalf("client dedicated bulk Close failed: %v", err)
	}
	waitForBulkReadEOF(t, accepted.Bulk, 2*time.Second)
	if err := accepted.Bulk.Close(); err != nil {
		t.Fatalf("server dedicated bulk Close failed: %v", err)
	}
	waitForBulkContextDone(t, bulk.Context(), 2*time.Second)
	waitForBulkContextDone(t, accepted.Bulk.Context(), 2*time.Second)

	// Use checked type assertions so an unexpected implementation yields a
	// readable test failure instead of a panic, as the other tests in this
	// file do.
	clientHandle, ok := bulk.(*bulkHandle)
	if !ok {
		t.Fatalf("bulk type = %T, want *bulkHandle", bulk)
	}
	serverHandle, ok := accepted.Bulk.(*bulkHandle)
	if !ok {
		t.Fatalf("accepted bulk type = %T, want *bulkHandle", accepted.Bulk)
	}
	if resetErr := clientHandle.resetErrSnapshot(); resetErr != nil {
		t.Fatalf("client dedicated bulk reset error = %v, want nil", resetErr)
	}
	if resetErr := serverHandle.resetErrSnapshot(); resetErr != nil {
		t.Fatalf("server dedicated bulk reset error = %v, want nil", resetErr)
	}
}
// TestBulkSharedConcurrentWritersWithSlowReceiver opens six bulks on one
// client connection, writes 1 MiB to each concurrently while the server-side
// readers pause after every read, and verifies that all writers and readers
// finish without error, every bulk context completes, and no handle records a
// reset error.
func TestBulkSharedConcurrentWritersWithSlowReceiver(t *testing.T) {
	const (
		bulkCount    = 6
		payloadBytes = 64 * 1024
		perBulkBytes = 1024 * 1024
	)

	server := NewServer().(*ServerCommon)
	if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKServer failed: %v", err)
	}
	acceptCh := make(chan BulkAcceptInfo, bulkCount)
	server.SetBulkHandler(func(info BulkAcceptInfo) error {
		acceptCh <- info
		return nil
	})
	if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
		t.Fatalf("server Listen failed: %v", err)
	}
	defer func() {
		_ = server.Stop()
	}()

	client := NewClient().(*ClientCommon)
	if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKClient failed: %v", err)
	}
	if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
		t.Fatalf("client Connect failed: %v", err)
	}
	defer func() {
		_ = client.Stop()
	}()

	// Open the bulks one at a time so each client handle can be paired with
	// the server handle accepted for it.
	type bulkPair struct {
		client Bulk
		server Bulk
	}
	pairs := make([]bulkPair, 0, bulkCount)
	for idx := 0; idx < bulkCount; idx++ {
		opened, err := client.OpenBulk(context.Background(), BulkOpenOptions{
			ID:           "slow-shared-" + formatInt(int64(idx)),
			ChunkSize:    payloadBytes,
			WindowBytes:  128 * 1024,
			MaxInFlight:  2,
			WriteTimeout: 2 * time.Second,
			Range: BulkRange{
				Length: perBulkBytes,
			},
		})
		if err != nil {
			t.Fatalf("client OpenBulk #%d failed: %v", idx, err)
		}
		accepted := waitAcceptedBulk(t, acceptCh, 2*time.Second)
		pairs = append(pairs, bulkPair{client: opened, server: accepted.Bulk})
	}

	// slowRead drains one server-side bulk to EOF, pausing after each
	// non-empty read, and closes the bulk once EOF is seen.
	slowRead := func(serverBulk Bulk) error {
		scratch := make([]byte, 32*1024)
		received := 0
		for {
			n, readErr := serverBulk.Read(scratch)
			if n > 0 {
				received += n
				time.Sleep(2 * time.Millisecond)
			}
			if readErr == nil {
				continue
			}
			if !errors.Is(readErr, io.EOF) {
				return readErr
			}
			return serverBulk.Close()
		}
	}
	readErrCh := make(chan error, bulkCount)
	for _, pair := range pairs {
		go func(serverBulk Bulk) {
			readErrCh <- slowRead(serverBulk)
		}(pair.server)
	}

	writeErrCh := make(chan error, bulkCount)
	payload := make([]byte, payloadBytes)
	for idx := range payload {
		payload[idx] = byte(idx)
	}
	// pump writes the shared payload until the per-bulk total is reached and
	// then half-closes the bulk.
	pump := func(clientBulk Bulk) error {
		for written := 0; written < perBulkBytes; written += len(payload) {
			if _, writeErr := clientBulk.Write(payload); writeErr != nil {
				return writeErr
			}
		}
		return clientBulk.CloseWrite()
	}
	for _, pair := range pairs {
		go func(clientBulk Bulk) {
			defer func() {
				_ = clientBulk.Close()
			}()
			writeErrCh <- pump(clientBulk)
		}(pair.client)
	}

	for pending := bulkCount; pending > 0; pending-- {
		select {
		case <-time.After(10 * time.Second):
			t.Fatal("timed out waiting for client writes under slow receiver")
		case writeErr := <-writeErrCh:
			if writeErr != nil {
				t.Fatalf("slow receiver client write failed: %v", writeErr)
			}
		}
	}
	for pending := bulkCount; pending > 0; pending-- {
		select {
		case <-time.After(10 * time.Second):
			t.Fatal("timed out waiting for server reads under slow receiver")
		case readErr := <-readErrCh:
			if readErr != nil {
				t.Fatalf("slow receiver server read failed: %v", readErr)
			}
		}
	}

	for _, pair := range pairs {
		waitForBulkContextDone(t, pair.client.Context(), 2*time.Second)
		waitForBulkContextDone(t, pair.server.Context(), 2*time.Second)
		if clientHandle, isHandle := pair.client.(*bulkHandle); isHandle {
			if resetErr := clientHandle.resetErrSnapshot(); resetErr != nil {
				t.Fatalf("client bulk reset error = %v, want nil", resetErr)
			}
		}
		if serverHandle, isHandle := pair.server.(*bulkHandle); isHandle {
			if resetErr := serverHandle.resetErrSnapshot(); resetErr != nil {
				t.Fatalf("server bulk reset error = %v, want nil", resetErr)
			}
		}
	}
}
// TestBulkOpenRequiresHandlerTCP connects a client to a server that has no
// bulk handler installed and expects OpenBulk to fail with
// errBulkHandlerNotConfigured.
func TestBulkOpenRequiresHandlerTCP(t *testing.T) {
	// Note: no SetBulkHandler call on the server in this test.
	server := NewServer().(*ServerCommon)
	if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKServer failed: %v", err)
	}
	if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
		t.Fatalf("server Listen failed: %v", err)
	}
	defer func() {
		_ = server.Stop()
	}()

	client := NewClient().(*ClientCommon)
	if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKClient failed: %v", err)
	}
	if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
		t.Fatalf("client Connect failed: %v", err)
	}
	defer func() {
		_ = client.Stop()
	}()

	openOpts := BulkOpenOptions{
		Range: BulkRange{
			Offset: 0,
			Length: 128,
		},
	}
	_, openErr := client.OpenBulk(context.Background(), openOpts)
	if !errors.Is(openErr, errBulkHandlerNotConfigured) {
		t.Fatalf("client OpenBulk error = %v, want %v", openErr, errBulkHandlerNotConfigured)
	}
}
// waitAcceptedBulk returns the next BulkAcceptInfo received on ch, or fails
// the test if nothing arrives within timeout.
func waitAcceptedBulk(t *testing.T, ch <-chan BulkAcceptInfo, timeout time.Duration) BulkAcceptInfo {
	t.Helper()
	var accepted BulkAcceptInfo
	select {
	case <-time.After(timeout):
		t.Fatal("timed out waiting for accepted bulk")
	case accepted = <-ch:
	}
	return accepted
}
// waitForBulkReadEOF reads one byte at a time from bulk until Read reports
// io.EOF. It fails the test if Read returns any other error or if timeout
// elapses first.
//
// The reads run on a helper goroutine so that a Read call which blocks cannot
// keep the test waiting past timeout, matching how readBulkExactly and
// readBulkError bound their reads.
func waitForBulkReadEOF(t *testing.T, bulk Bulk, timeout time.Duration) {
	t.Helper()
	deadline := time.Now().Add(timeout)
	// Buffered so the reader can always deliver its result and exit, even when
	// the test has already stopped waiting.
	result := make(chan error, 1)
	go func() {
		buf := make([]byte, 1)
		for time.Now().Before(deadline) {
			_, err := bulk.Read(buf)
			if errors.Is(err, io.EOF) {
				result <- nil
				return
			}
			if err != nil {
				result <- err
				return
			}
			// Data (or an empty read) without error: back off briefly and retry.
			time.Sleep(10 * time.Millisecond)
		}
		// Deadline passed without EOF; the timer below reports the timeout.
	}()
	select {
	case err := <-result:
		if err != nil {
			t.Fatalf("bulk Read returned unexpected error: %v", err)
		}
	case <-time.After(timeout):
		t.Fatal("timed out waiting for bulk EOF")
	}
}
// waitForBulkContextDone blocks until ctx is done, failing the test if that
// does not happen within timeout.
func waitForBulkContextDone(t *testing.T, ctx context.Context, timeout time.Duration) {
	t.Helper()
	expiry := time.NewTimer(timeout)
	defer expiry.Stop()
	select {
	case <-expiry.C:
		t.Fatal("timed out waiting for bulk context done")
	case <-ctx.Done():
	}
}
// readBulkExactly reads len(want) bytes from bulk on a helper goroutine and
// fails the test if the read errors, the bytes differ from want, or nothing
// completes within timeout.
func readBulkExactly(t *testing.T, bulk Bulk, want string, timeout time.Duration) {
	t.Helper()
	// Buffered so the reader goroutine never blocks on delivery after a timeout.
	outcome := make(chan error, 1)
	go func() {
		received := make([]byte, len(want))
		if _, readErr := io.ReadFull(bulk, received); readErr != nil {
			outcome <- readErr
			return
		}
		got := string(received)
		if got == want {
			outcome <- nil
			return
		}
		outcome <- errors.New("bulk payload mismatch: got " + got + " want " + want)
	}()
	select {
	case <-time.After(timeout):
		t.Fatal("timed out waiting for bulk payload")
	case readErr := <-outcome:
		if readErr != nil {
			t.Fatal(readErr)
		}
	}
}
// readBulkError performs a single one-byte Read on bulk from a helper
// goroutine and returns the error it produced. The test fails if the read
// succeeds with a nil error or does not return within timeout.
func readBulkError(t *testing.T, bulk Bulk, timeout time.Duration) error {
	t.Helper()
	// Buffered so the reader goroutine never blocks on delivery after a timeout.
	outcome := make(chan error, 1)
	go func() {
		scratch := make([]byte, 1)
		_, readErr := bulk.Read(scratch)
		outcome <- readErr
	}()
	var readErr error
	select {
	case <-time.After(timeout):
		t.Fatal("timed out waiting for bulk read error")
	case readErr = <-outcome:
		if readErr == nil {
			t.Fatal("expected bulk read error, got nil")
		}
	}
	return readErr
}