- make stream fast path honor adaptive soft payload limits end-to-end - split oversized fast-stream payloads into sequential frames before batching - use adaptive soft cap when encoding stream batch payloads - move timeout-like error detection into production code for adaptive tx - tune notify FrameReader read size explicitly to avoid throughput regression - drop local stario replace and depend on released b612.me/stario v0.1.1
1812 lines
53 KiB
Go
1812 lines
53 KiB
Go
package notify
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"io"
|
|
"net"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
func TestBulkOpenRoundTripTCP(t *testing.T) {
|
|
server := NewServer().(*ServerCommon)
|
|
if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
t.Fatalf("UseModernPSKServer failed: %v", err)
|
|
}
|
|
|
|
acceptCh := make(chan BulkAcceptInfo, 1)
|
|
server.SetBulkHandler(func(info BulkAcceptInfo) error {
|
|
acceptCh <- info
|
|
return nil
|
|
})
|
|
|
|
if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
|
|
t.Fatalf("server Listen failed: %v", err)
|
|
}
|
|
defer func() {
|
|
_ = server.Stop()
|
|
}()
|
|
|
|
client := NewClient().(*ClientCommon)
|
|
if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
t.Fatalf("UseModernPSKClient failed: %v", err)
|
|
}
|
|
if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
|
|
t.Fatalf("client Connect failed: %v", err)
|
|
}
|
|
defer func() {
|
|
_ = client.Stop()
|
|
}()
|
|
|
|
bulk, err := client.OpenBulk(context.Background(), BulkOpenOptions{
|
|
Range: BulkRange{
|
|
Offset: 128,
|
|
Length: 4096,
|
|
},
|
|
Metadata: BulkMetadata{
|
|
"name": "demo.bin",
|
|
},
|
|
ChunkSize: 32 * 1024,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("client OpenBulk failed: %v", err)
|
|
}
|
|
|
|
accepted := waitAcceptedBulk(t, acceptCh, 2*time.Second)
|
|
if accepted.ID != bulk.ID() {
|
|
t.Fatalf("accepted bulk id mismatch: got %q want %q", accepted.ID, bulk.ID())
|
|
}
|
|
if accepted.Range != (BulkRange{Offset: 128, Length: 4096}) {
|
|
t.Fatalf("accepted range mismatch: %+v", accepted.Range)
|
|
}
|
|
if accepted.Metadata["name"] != "demo.bin" {
|
|
t.Fatalf("accepted metadata mismatch: %+v", accepted.Metadata)
|
|
}
|
|
if accepted.LogicalConn == nil {
|
|
t.Fatal("accepted logical connection should not be nil")
|
|
}
|
|
if accepted.TransportConn == nil {
|
|
t.Fatal("accepted transport connection should not be nil")
|
|
}
|
|
|
|
clientHandle, ok := bulk.(*bulkHandle)
|
|
if !ok {
|
|
t.Fatalf("bulk type = %T, want *bulkHandle", bulk)
|
|
}
|
|
serverHandle, ok := accepted.Bulk.(*bulkHandle)
|
|
if !ok {
|
|
t.Fatalf("accepted bulk type = %T, want *bulkHandle", accepted.Bulk)
|
|
}
|
|
if clientHandle.dataIDSnapshot() == 0 {
|
|
t.Fatal("client bulk data id should not be zero")
|
|
}
|
|
if got, want := serverHandle.dataIDSnapshot(), clientHandle.dataIDSnapshot(); got != want {
|
|
t.Fatalf("accepted bulk data id = %d, want %d", got, want)
|
|
}
|
|
|
|
if _, err := bulk.Write([]byte("hello-from-client")); err != nil {
|
|
t.Fatalf("client bulk Write failed: %v", err)
|
|
}
|
|
readBulkExactly(t, accepted.Bulk, "hello-from-client", 2*time.Second)
|
|
|
|
if _, err := accepted.Bulk.Write([]byte("hello-from-server")); err != nil {
|
|
t.Fatalf("server bulk Write failed: %v", err)
|
|
}
|
|
readBulkExactly(t, bulk, "hello-from-server", 2*time.Second)
|
|
|
|
clientSnapshots, err := GetClientBulkSnapshots(client)
|
|
if err != nil {
|
|
t.Fatalf("GetClientBulkSnapshots failed: %v", err)
|
|
}
|
|
if len(clientSnapshots) != 1 || clientSnapshots[0].ID != bulk.ID() {
|
|
t.Fatalf("client bulk snapshots mismatch: %+v", clientSnapshots)
|
|
}
|
|
if got, want := clientSnapshots[0].BindingOwner, "client-session"; got != want {
|
|
t.Fatalf("client bulk BindingOwner = %q, want %q", got, want)
|
|
}
|
|
if !clientSnapshots[0].BindingAlive || !clientSnapshots[0].BindingCurrent || !clientSnapshots[0].TransportAttached || !clientSnapshots[0].TransportCurrent {
|
|
t.Fatalf("client bulk binding snapshot mismatch: %+v", clientSnapshots[0])
|
|
}
|
|
if got, want := clientSnapshots[0].BindingBulkAdaptiveSoftPayloadBytes, bulkAdaptiveSoftPayloadStartBytes; got != want {
|
|
t.Fatalf("client bulk BindingBulkAdaptiveSoftPayloadBytes = %d, want %d", got, want)
|
|
}
|
|
serverSnapshots, err := GetServerBulkSnapshots(server)
|
|
if err != nil {
|
|
t.Fatalf("GetServerBulkSnapshots failed: %v", err)
|
|
}
|
|
if len(serverSnapshots) != 1 || serverSnapshots[0].ID != bulk.ID() {
|
|
t.Fatalf("server bulk snapshots mismatch: %+v", serverSnapshots)
|
|
}
|
|
if got, want := serverSnapshots[0].BindingOwner, "server-transport"; got != want {
|
|
t.Fatalf("server bulk BindingOwner = %q, want %q", got, want)
|
|
}
|
|
if got, want := serverSnapshots[0].BindingBulkAdaptiveSoftPayloadBytes, bulkAdaptiveSoftPayloadStartBytes; got != want {
|
|
t.Fatalf("server bulk BindingBulkAdaptiveSoftPayloadBytes = %d, want %d", got, want)
|
|
}
|
|
if !serverSnapshots[0].BindingAlive || !serverSnapshots[0].BindingCurrent || !serverSnapshots[0].TransportAttached || !serverSnapshots[0].TransportCurrent {
|
|
t.Fatalf("server bulk binding snapshot mismatch: %+v", serverSnapshots[0])
|
|
}
|
|
|
|
if err := bulk.CloseWrite(); err != nil {
|
|
t.Fatalf("client bulk CloseWrite failed: %v", err)
|
|
}
|
|
waitForBulkReadEOF(t, accepted.Bulk, 2*time.Second)
|
|
|
|
if err := accepted.Bulk.Close(); err != nil {
|
|
t.Fatalf("server bulk Close failed: %v", err)
|
|
}
|
|
waitForBulkReadEOF(t, bulk, 2*time.Second)
|
|
waitForBulkContextDone(t, bulk.Context(), 2*time.Second)
|
|
}
|
|
|
|
func TestDedicatedBulkOpenUnblocksSynchronousReadHandler(t *testing.T) {
|
|
server := NewServer().(*ServerCommon)
|
|
if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
t.Fatalf("UseModernPSKServer failed: %v", err)
|
|
}
|
|
|
|
readCh := make(chan string, 1)
|
|
server.SetBulkHandler(func(info BulkAcceptInfo) error {
|
|
defer func() {
|
|
_ = info.Bulk.Close()
|
|
}()
|
|
buf := make([]byte, 5)
|
|
if _, err := io.ReadFull(info.Bulk, buf); err != nil {
|
|
return err
|
|
}
|
|
readCh <- string(buf)
|
|
return nil
|
|
})
|
|
|
|
if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
|
|
t.Fatalf("server Listen failed: %v", err)
|
|
}
|
|
defer func() {
|
|
_ = server.Stop()
|
|
}()
|
|
|
|
client := NewClient().(*ClientCommon)
|
|
if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
t.Fatalf("UseModernPSKClient failed: %v", err)
|
|
}
|
|
if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
|
|
t.Fatalf("client Connect failed: %v", err)
|
|
}
|
|
defer func() {
|
|
_ = client.Stop()
|
|
}()
|
|
|
|
type openResult struct {
|
|
bulk Bulk
|
|
err error
|
|
}
|
|
openCh := make(chan openResult, 1)
|
|
go func() {
|
|
bulk, err := client.OpenDedicatedBulk(context.Background(), BulkOpenOptions{
|
|
ID: "sync-read-handler",
|
|
Range: BulkRange{
|
|
Offset: 0,
|
|
Length: 5,
|
|
},
|
|
})
|
|
openCh <- openResult{bulk: bulk, err: err}
|
|
}()
|
|
|
|
var bulk Bulk
|
|
select {
|
|
case result := <-openCh:
|
|
if result.err != nil {
|
|
t.Fatalf("client OpenDedicatedBulk failed: %v", result.err)
|
|
}
|
|
bulk = result.bulk
|
|
case <-time.After(2 * time.Second):
|
|
t.Fatal("client OpenDedicatedBulk timed out while remote handler was synchronously reading")
|
|
}
|
|
defer func() {
|
|
if bulk != nil {
|
|
_ = bulk.Close()
|
|
}
|
|
}()
|
|
|
|
if _, err := bulk.Write([]byte("hello")); err != nil {
|
|
t.Fatalf("client dedicated bulk Write failed: %v", err)
|
|
}
|
|
select {
|
|
case got := <-readCh:
|
|
if got != "hello" {
|
|
t.Fatalf("server handler read %q, want %q", got, "hello")
|
|
}
|
|
case <-time.After(2 * time.Second):
|
|
t.Fatal("timed out waiting for synchronous handler read")
|
|
}
|
|
}
|
|
|
|
func TestDedicatedBulkOpenUnblocksOnBlockingFirstWrite(t *testing.T) {
|
|
server := NewServer().(*ServerCommon)
|
|
if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
t.Fatalf("UseModernPSKServer failed: %v", err)
|
|
}
|
|
|
|
payload := strings.Repeat("w", 16)
|
|
writeDone := make(chan error, 1)
|
|
server.SetBulkHandler(func(info BulkAcceptInfo) error {
|
|
_, err := io.WriteString(info.Bulk, payload)
|
|
if err == nil {
|
|
err = info.Bulk.CloseWrite()
|
|
}
|
|
writeDone <- err
|
|
return err
|
|
})
|
|
|
|
if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
|
|
t.Fatalf("server Listen failed: %v", err)
|
|
}
|
|
defer func() {
|
|
_ = server.Stop()
|
|
}()
|
|
|
|
client := NewClient().(*ClientCommon)
|
|
if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
t.Fatalf("UseModernPSKClient failed: %v", err)
|
|
}
|
|
if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
|
|
t.Fatalf("client Connect failed: %v", err)
|
|
}
|
|
defer func() {
|
|
_ = client.Stop()
|
|
}()
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
|
defer cancel()
|
|
bulk, err := client.OpenDedicatedBulk(ctx, BulkOpenOptions{
|
|
ID: "blocking-write-ready",
|
|
Range: BulkRange{
|
|
Offset: 0,
|
|
Length: int64(len(payload)),
|
|
},
|
|
ChunkSize: 4,
|
|
WindowBytes: 4,
|
|
MaxInFlight: 1,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("client OpenDedicatedBulk failed: %v", err)
|
|
}
|
|
|
|
readBulkExactly(t, bulk, payload, 2*time.Second)
|
|
waitForBulkReadEOF(t, bulk, 2*time.Second)
|
|
|
|
select {
|
|
case err := <-writeDone:
|
|
if err != nil {
|
|
t.Fatalf("server handler write failed: %v", err)
|
|
}
|
|
case <-time.After(2 * time.Second):
|
|
t.Fatal("timed out waiting for blocking first write to finish")
|
|
}
|
|
|
|
if err := bulk.Close(); err != nil {
|
|
t.Fatalf("client dedicated bulk Close failed: %v", err)
|
|
}
|
|
waitForBulkContextDone(t, bulk.Context(), 2*time.Second)
|
|
}
|
|
|
|
func TestServerOpenBulkLogicalDedicatedUnblocksOnBlockingFirstRead(t *testing.T) {
|
|
server := NewServer().(*ServerCommon)
|
|
if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
t.Fatalf("UseModernPSKServer failed: %v", err)
|
|
}
|
|
|
|
if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
|
|
t.Fatalf("server Listen failed: %v", err)
|
|
}
|
|
defer func() {
|
|
_ = server.Stop()
|
|
}()
|
|
|
|
client := NewClient().(*ClientCommon)
|
|
if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
t.Fatalf("UseModernPSKClient failed: %v", err)
|
|
}
|
|
readCh := make(chan string, 1)
|
|
client.SetBulkHandler(func(info BulkAcceptInfo) error {
|
|
defer func() {
|
|
_ = info.Bulk.Close()
|
|
}()
|
|
buf := make([]byte, 5)
|
|
if _, err := io.ReadFull(info.Bulk, buf); err != nil {
|
|
return err
|
|
}
|
|
readCh <- string(buf)
|
|
return nil
|
|
})
|
|
if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
|
|
t.Fatalf("client Connect failed: %v", err)
|
|
}
|
|
defer func() {
|
|
_ = client.Stop()
|
|
}()
|
|
|
|
logical := waitForTransferControlLogicalConn(t, server, 2*time.Second)
|
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
|
defer cancel()
|
|
bulk, err := server.OpenBulkLogical(ctx, logical, BulkOpenOptions{
|
|
ID: "server-blocking-read-ready",
|
|
Range: BulkRange{
|
|
Offset: 0,
|
|
Length: 5,
|
|
},
|
|
Dedicated: true,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("server OpenBulkLogical dedicated failed: %v", err)
|
|
}
|
|
|
|
if _, err := bulk.Write([]byte("hello")); err != nil {
|
|
t.Fatalf("server dedicated bulk Write failed: %v", err)
|
|
}
|
|
|
|
select {
|
|
case got := <-readCh:
|
|
if got != "hello" {
|
|
t.Fatalf("client handler read %q, want %q", got, "hello")
|
|
}
|
|
case <-time.After(2 * time.Second):
|
|
t.Fatal("timed out waiting for blocking first read to finish")
|
|
}
|
|
|
|
if err := bulk.Close(); err != nil {
|
|
t.Fatalf("server dedicated bulk Close failed: %v", err)
|
|
}
|
|
waitForBulkContextDone(t, bulk.Context(), 2*time.Second)
|
|
}
|
|
|
|
func TestDedicatedBulkOpenReturnsHandlerFailureAfterAccepted(t *testing.T) {
|
|
server := NewServer().(*ServerCommon)
|
|
if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
t.Fatalf("UseModernPSKServer failed: %v", err)
|
|
}
|
|
|
|
server.SetBulkHandler(func(info BulkAcceptInfo) error {
|
|
time.Sleep(80 * time.Millisecond)
|
|
return errors.New("dedicated handler failed after accept")
|
|
})
|
|
|
|
if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
|
|
t.Fatalf("server Listen failed: %v", err)
|
|
}
|
|
defer func() {
|
|
_ = server.Stop()
|
|
}()
|
|
|
|
client := NewClient().(*ClientCommon)
|
|
if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
t.Fatalf("UseModernPSKClient failed: %v", err)
|
|
}
|
|
if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
|
|
t.Fatalf("client Connect failed: %v", err)
|
|
}
|
|
defer func() {
|
|
_ = client.Stop()
|
|
}()
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
|
defer cancel()
|
|
_, err := client.OpenDedicatedBulk(ctx, BulkOpenOptions{
|
|
ID: "accepted-then-fail",
|
|
Range: BulkRange{
|
|
Offset: 0,
|
|
Length: 1,
|
|
},
|
|
})
|
|
if err == nil || !strings.Contains(err.Error(), "dedicated handler failed after accept") {
|
|
t.Fatalf("client OpenDedicatedBulk error = %v, want dedicated handler failure after accept", err)
|
|
}
|
|
}
|
|
|
|
func TestServerOpenBulkLogicalDedicatedReturnsHandlerFailureAfterAccepted(t *testing.T) {
|
|
server := NewServer().(*ServerCommon)
|
|
if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
t.Fatalf("UseModernPSKServer failed: %v", err)
|
|
}
|
|
|
|
if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
|
|
t.Fatalf("server Listen failed: %v", err)
|
|
}
|
|
defer func() {
|
|
_ = server.Stop()
|
|
}()
|
|
|
|
client := NewClient().(*ClientCommon)
|
|
if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
t.Fatalf("UseModernPSKClient failed: %v", err)
|
|
}
|
|
client.SetBulkHandler(func(info BulkAcceptInfo) error {
|
|
time.Sleep(80 * time.Millisecond)
|
|
return errors.New("client dedicated handler failed after accept")
|
|
})
|
|
if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
|
|
t.Fatalf("client Connect failed: %v", err)
|
|
}
|
|
defer func() {
|
|
_ = client.Stop()
|
|
}()
|
|
|
|
logical := waitForTransferControlLogicalConn(t, server, 2*time.Second)
|
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
|
defer cancel()
|
|
_, err := server.OpenBulkLogical(ctx, logical, BulkOpenOptions{
|
|
ID: "server-accepted-then-fail",
|
|
Range: BulkRange{
|
|
Offset: 0,
|
|
Length: 1,
|
|
},
|
|
Dedicated: true,
|
|
})
|
|
if err == nil || !strings.Contains(err.Error(), "client dedicated handler failed after accept") {
|
|
t.Fatalf("server OpenBulkLogical dedicated error = %v, want client dedicated handler failure after accept", err)
|
|
}
|
|
}
|
|
|
|
func TestBulkOpenRoundTripServerLogicalTCP(t *testing.T) {
|
|
server := NewServer().(*ServerCommon)
|
|
if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
t.Fatalf("UseModernPSKServer failed: %v", err)
|
|
}
|
|
|
|
if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
|
|
t.Fatalf("server Listen failed: %v", err)
|
|
}
|
|
defer func() {
|
|
_ = server.Stop()
|
|
}()
|
|
|
|
client := NewClient().(*ClientCommon)
|
|
if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
t.Fatalf("UseModernPSKClient failed: %v", err)
|
|
}
|
|
acceptCh := make(chan BulkAcceptInfo, 1)
|
|
client.SetBulkHandler(func(info BulkAcceptInfo) error {
|
|
acceptCh <- info
|
|
return nil
|
|
})
|
|
if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
|
|
t.Fatalf("client Connect failed: %v", err)
|
|
}
|
|
defer func() {
|
|
_ = client.Stop()
|
|
}()
|
|
|
|
logical := waitForTransferControlLogicalConn(t, server, 2*time.Second)
|
|
bulk, err := server.OpenBulkLogical(context.Background(), logical, BulkOpenOptions{
|
|
Range: BulkRange{
|
|
Offset: 4096,
|
|
Length: 8192,
|
|
},
|
|
Metadata: BulkMetadata{
|
|
"purpose": "server-open",
|
|
},
|
|
ChunkSize: 64 * 1024,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("server OpenBulkLogical failed: %v", err)
|
|
}
|
|
|
|
accepted := waitAcceptedBulk(t, acceptCh, 2*time.Second)
|
|
if accepted.ID != bulk.ID() {
|
|
t.Fatalf("accepted bulk id mismatch: got %q want %q", accepted.ID, bulk.ID())
|
|
}
|
|
if accepted.Range != (BulkRange{Offset: 4096, Length: 8192}) {
|
|
t.Fatalf("accepted range mismatch: %+v", accepted.Range)
|
|
}
|
|
if accepted.Metadata["purpose"] != "server-open" {
|
|
t.Fatalf("accepted metadata mismatch: %+v", accepted.Metadata)
|
|
}
|
|
if accepted.LogicalConn != nil {
|
|
t.Fatalf("client accepted logical connection should be nil: %+v", accepted.LogicalConn)
|
|
}
|
|
|
|
serverHandle, ok := bulk.(*bulkHandle)
|
|
if !ok {
|
|
t.Fatalf("bulk type = %T, want *bulkHandle", bulk)
|
|
}
|
|
clientHandle, ok := accepted.Bulk.(*bulkHandle)
|
|
if !ok {
|
|
t.Fatalf("accepted bulk type = %T, want *bulkHandle", accepted.Bulk)
|
|
}
|
|
if serverHandle.dataIDSnapshot() == 0 {
|
|
t.Fatal("server bulk data id should not be zero")
|
|
}
|
|
if got, want := clientHandle.dataIDSnapshot(), serverHandle.dataIDSnapshot(); got != want {
|
|
t.Fatalf("client accepted bulk data id = %d, want %d", got, want)
|
|
}
|
|
|
|
if _, err := bulk.Write([]byte("server-opened")); err != nil {
|
|
t.Fatalf("server bulk Write failed: %v", err)
|
|
}
|
|
readBulkExactly(t, accepted.Bulk, "server-opened", 2*time.Second)
|
|
|
|
if _, err := accepted.Bulk.Write([]byte("client-accepted")); err != nil {
|
|
t.Fatalf("client bulk Write failed: %v", err)
|
|
}
|
|
readBulkExactly(t, bulk, "client-accepted", 2*time.Second)
|
|
|
|
if err := bulk.CloseWrite(); err != nil {
|
|
t.Fatalf("server bulk CloseWrite failed: %v", err)
|
|
}
|
|
waitForBulkReadEOF(t, accepted.Bulk, 2*time.Second)
|
|
|
|
if err := accepted.Bulk.Close(); err != nil {
|
|
t.Fatalf("client accepted bulk Close failed: %v", err)
|
|
}
|
|
waitForBulkReadEOF(t, bulk, 2*time.Second)
|
|
waitForBulkContextDone(t, bulk.Context(), 2*time.Second)
|
|
}
|
|
|
|
func TestBulkSnapshotIncludesDetachedBindingDiagnostics(t *testing.T) {
|
|
server := NewServer().(*ServerCommon)
|
|
left, right := net.Pipe()
|
|
defer right.Close()
|
|
|
|
logical := server.bootstrapAcceptedLogical("bulk-snapshot-detach", nil, left)
|
|
if logical == nil {
|
|
t.Fatal("bootstrapAcceptedLogical should return logical")
|
|
}
|
|
transport := logical.CurrentTransportConn()
|
|
if transport == nil {
|
|
t.Fatal("CurrentTransportConn should return active transport")
|
|
}
|
|
bulk := newBulkHandle(context.Background(), newBulkRuntime("snapshot-detach"), serverFileScope(logical), BulkOpenRequest{
|
|
BulkID: "bulk-snapshot-detach",
|
|
DataID: 1,
|
|
Range: BulkRange{
|
|
Length: 1,
|
|
},
|
|
}, 0, logical, transport, transport.TransportGeneration(), nil, nil, nil, nil, nil)
|
|
|
|
server.detachLogicalSessionTransport(logical, "heartbeat timeout", nil)
|
|
|
|
snapshot := bulk.snapshot()
|
|
if got, want := snapshot.BindingOwner, "server-transport"; got != want {
|
|
t.Fatalf("snapshot BindingOwner = %q, want %q", got, want)
|
|
}
|
|
if snapshot.BindingCurrent {
|
|
t.Fatalf("snapshot BindingCurrent should be false after detach: %+v", snapshot)
|
|
}
|
|
if snapshot.TransportAttached {
|
|
t.Fatalf("snapshot TransportAttached should be false after detach: %+v", snapshot)
|
|
}
|
|
if snapshot.TransportCurrent {
|
|
t.Fatalf("snapshot TransportCurrent should be false after detach: %+v", snapshot)
|
|
}
|
|
if got, want := snapshot.TransportDetachReason, "heartbeat timeout"; got != want {
|
|
t.Fatalf("snapshot TransportDetachReason = %q, want %q", got, want)
|
|
}
|
|
if got, want := snapshot.TransportDetachKind, clientConnTransportDetachKindHeartbeatTimeout; got != want {
|
|
t.Fatalf("snapshot TransportDetachKind = %q, want %q", got, want)
|
|
}
|
|
}
|
|
|
|
func TestServerDetachLogicalSessionTransportResetsScopedBulks(t *testing.T) {
|
|
server := NewServer().(*ServerCommon)
|
|
left, right := net.Pipe()
|
|
defer left.Close()
|
|
defer right.Close()
|
|
|
|
logical := server.bootstrapAcceptedLogical("bulk-detach-runtime", nil, left)
|
|
if logical == nil {
|
|
t.Fatal("bootstrapAcceptedLogical should return logical")
|
|
}
|
|
defer server.stopLogicalSession(logical, "test cleanup", nil)
|
|
|
|
transport := logical.CurrentTransportConn()
|
|
if transport == nil {
|
|
t.Fatal("CurrentTransportConn should return active transport")
|
|
}
|
|
scope := serverFileScope(logical)
|
|
bulk := newBulkHandle(context.Background(), server.getBulkRuntime(), scope, BulkOpenRequest{
|
|
BulkID: "bulk-detach-runtime",
|
|
DataID: 1,
|
|
Range: BulkRange{Length: 1},
|
|
}, 0, logical, transport, transport.TransportGeneration(), nil, nil, nil, nil, nil)
|
|
if err := server.getBulkRuntime().register(scope, bulk); err != nil {
|
|
t.Fatalf("register bulk failed: %v", err)
|
|
}
|
|
|
|
server.detachLogicalSessionTransport(logical, "read error", errors.New("boom"))
|
|
|
|
if err := readBulkError(t, bulk, time.Second); !errors.Is(err, errTransportDetached) {
|
|
t.Fatalf("detached bulk read error = %v, want %v", err, errTransportDetached)
|
|
}
|
|
if _, ok := server.getBulkRuntime().lookup(scope, bulk.ID()); ok {
|
|
t.Fatal("detached bulk should be removed from runtime")
|
|
}
|
|
}
|
|
|
|
func TestBulkOpenRoundTripDedicatedTCP(t *testing.T) {
|
|
server := NewServer().(*ServerCommon)
|
|
if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
t.Fatalf("UseModernPSKServer failed: %v", err)
|
|
}
|
|
server.SetLink("bulk-dedicated-ping", func(msg *Message) {
|
|
_ = msg.Reply([]byte("pong"))
|
|
})
|
|
|
|
acceptCh := make(chan BulkAcceptInfo, 1)
|
|
server.SetBulkHandler(func(info BulkAcceptInfo) error {
|
|
acceptCh <- info
|
|
return nil
|
|
})
|
|
|
|
if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
|
|
t.Fatalf("server Listen failed: %v", err)
|
|
}
|
|
defer func() {
|
|
_ = server.Stop()
|
|
}()
|
|
|
|
client := NewClient().(*ClientCommon)
|
|
if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
t.Fatalf("UseModernPSKClient failed: %v", err)
|
|
}
|
|
if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
|
|
t.Fatalf("client Connect failed: %v", err)
|
|
}
|
|
defer func() {
|
|
_ = client.Stop()
|
|
}()
|
|
|
|
bulk, err := client.OpenBulk(context.Background(), BulkOpenOptions{
|
|
Range: BulkRange{
|
|
Offset: 1024,
|
|
Length: 8192,
|
|
},
|
|
Metadata: BulkMetadata{
|
|
"name": "dedicated.bin",
|
|
},
|
|
Dedicated: true,
|
|
ChunkSize: 32 * 1024,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("client OpenBulk dedicated failed: %v", err)
|
|
}
|
|
|
|
accepted := waitAcceptedBulk(t, acceptCh, 2*time.Second)
|
|
if !accepted.Dedicated {
|
|
t.Fatal("accepted dedicated flag should be true")
|
|
}
|
|
if !bulk.(*bulkHandle).Dedicated() {
|
|
t.Fatal("client bulk dedicated flag should be true")
|
|
}
|
|
clientSnapshots, err := GetClientBulkSnapshots(client)
|
|
if err != nil {
|
|
t.Fatalf("GetClientBulkSnapshots failed: %v", err)
|
|
}
|
|
if len(clientSnapshots) != 1 || !clientSnapshots[0].Dedicated || !clientSnapshots[0].DedicatedAttached {
|
|
t.Fatalf("client dedicated bulk snapshots mismatch: %+v", clientSnapshots)
|
|
}
|
|
|
|
if _, err := bulk.Write([]byte("hello-from-dedicated-client")); err != nil {
|
|
t.Fatalf("client dedicated bulk Write failed: %v", err)
|
|
}
|
|
readBulkExactly(t, accepted.Bulk, "hello-from-dedicated-client", 2*time.Second)
|
|
|
|
if _, err := accepted.Bulk.Write([]byte("hello-from-dedicated-server")); err != nil {
|
|
t.Fatalf("server dedicated bulk Write failed: %v", err)
|
|
}
|
|
readBulkExactly(t, bulk, "hello-from-dedicated-server", 2*time.Second)
|
|
|
|
reply, err := client.SendWait("bulk-dedicated-ping", []byte("ping"), 2*time.Second)
|
|
if err != nil {
|
|
t.Fatalf("client SendWait after dedicated bulk failed: %v", err)
|
|
}
|
|
if got, want := string(reply.Value), "pong"; got != want {
|
|
t.Fatalf("SendWait reply mismatch: got %q want %q", got, want)
|
|
}
|
|
|
|
if err := bulk.CloseWrite(); err != nil {
|
|
t.Fatalf("client dedicated bulk CloseWrite failed: %v", err)
|
|
}
|
|
waitForBulkReadEOF(t, accepted.Bulk, 2*time.Second)
|
|
|
|
if err := accepted.Bulk.Close(); err != nil {
|
|
t.Fatalf("server dedicated bulk Close failed: %v", err)
|
|
}
|
|
waitForBulkReadEOF(t, bulk, 2*time.Second)
|
|
waitForBulkContextDone(t, bulk.Context(), 2*time.Second)
|
|
waitForBulkContextDone(t, accepted.Bulk.Context(), 2*time.Second)
|
|
|
|
reply, err = client.SendWait("bulk-dedicated-ping", []byte("ping-after-close"), 2*time.Second)
|
|
if err != nil {
|
|
t.Fatalf("client SendWait after dedicated bulk close failed: %v", err)
|
|
}
|
|
if got, want := string(reply.Value), "pong"; got != want {
|
|
t.Fatalf("SendWait reply after close mismatch: got %q want %q", got, want)
|
|
}
|
|
}
|
|
|
|
func TestBulkDedicatedResetReasonBeatsTransportDetached(t *testing.T) {
|
|
server := NewServer().(*ServerCommon)
|
|
if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
t.Fatalf("UseModernPSKServer failed: %v", err)
|
|
}
|
|
server.SetLink("bulk-dedicated-reset-ping", func(msg *Message) {
|
|
_ = msg.Reply([]byte("pong"))
|
|
})
|
|
|
|
acceptCh := make(chan BulkAcceptInfo, 1)
|
|
server.SetBulkHandler(func(info BulkAcceptInfo) error {
|
|
acceptCh <- info
|
|
return nil
|
|
})
|
|
|
|
if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
|
|
t.Fatalf("server Listen failed: %v", err)
|
|
}
|
|
defer func() {
|
|
_ = server.Stop()
|
|
}()
|
|
|
|
client := NewClient().(*ClientCommon)
|
|
if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
t.Fatalf("UseModernPSKClient failed: %v", err)
|
|
}
|
|
if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
|
|
t.Fatalf("client Connect failed: %v", err)
|
|
}
|
|
defer func() {
|
|
_ = client.Stop()
|
|
}()
|
|
|
|
bulk, err := client.OpenBulk(context.Background(), BulkOpenOptions{
|
|
Range: BulkRange{Offset: 0, Length: 1024},
|
|
Dedicated: true,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("client OpenBulk dedicated failed: %v", err)
|
|
}
|
|
|
|
accepted := waitAcceptedBulk(t, acceptCh, 2*time.Second)
|
|
wantErr := "dedicated remote flow reset"
|
|
if err := accepted.Bulk.Reset(errors.New(wantErr)); err != nil {
|
|
t.Fatalf("server dedicated bulk Reset failed: %v", err)
|
|
}
|
|
|
|
clientHandle := bulk.(*bulkHandle)
|
|
deadline := time.Now().Add(2 * time.Second)
|
|
for time.Now().Before(deadline) {
|
|
if resetErr := clientHandle.resetErrSnapshot(); resetErr != nil {
|
|
if !strings.Contains(resetErr.Error(), wantErr) {
|
|
t.Fatalf("client reset error = %v, want contains %q", resetErr, wantErr)
|
|
}
|
|
break
|
|
}
|
|
time.Sleep(10 * time.Millisecond)
|
|
}
|
|
|
|
if _, err := bulk.Write([]byte("abc")); err == nil || !strings.Contains(err.Error(), wantErr) {
|
|
t.Fatalf("client dedicated bulk Write error = %v, want contains %q", err, wantErr)
|
|
}
|
|
|
|
reply, err := client.SendWait("bulk-dedicated-reset-ping", []byte("ping-after-reset"), 2*time.Second)
|
|
if err != nil {
|
|
t.Fatalf("client SendWait after dedicated bulk reset failed: %v", err)
|
|
}
|
|
if got, want := string(reply.Value), "pong"; got != want {
|
|
t.Fatalf("SendWait reply after reset mismatch: got %q want %q", got, want)
|
|
}
|
|
}
|
|
|
|
func TestBulkWritePrefersResetErrorOverContextCanceled(t *testing.T) {
|
|
wantErr := errors.New("remote flow reset")
|
|
bulk := newBulkHandle(context.Background(), nil, "test", BulkOpenRequest{
|
|
BulkID: "bulk-reset-propagation",
|
|
DataID: 1,
|
|
ChunkSize: 4,
|
|
WindowBytes: 16,
|
|
MaxInFlight: 4,
|
|
}, 0, nil, nil, 0, nil, nil, func(ctx context.Context, b *bulkHandle, chunk []byte) error {
|
|
b.markReset(wantErr)
|
|
<-ctx.Done()
|
|
return ctx.Err()
|
|
}, nil, nil)
|
|
|
|
_, err := bulk.Write([]byte("abcdefgh"))
|
|
if !errors.Is(err, wantErr) {
|
|
t.Fatalf("bulk Write error = %v, want %v", err, wantErr)
|
|
}
|
|
}
|
|
|
|
func TestDedicatedBulkWaitReadyPrefersClosedPipeOverContextCanceled(t *testing.T) {
|
|
bulk := newBulkHandle(context.Background(), nil, "test", BulkOpenRequest{
|
|
BulkID: "bulk-dedicated-ready-close",
|
|
DataID: 1,
|
|
Dedicated: true,
|
|
Range: BulkRange{
|
|
Length: 1,
|
|
},
|
|
}, 0, nil, nil, 0, nil, nil, nil, nil, nil)
|
|
|
|
bulk.markPeerClosed()
|
|
|
|
err := bulk.waitDedicatedReady(context.Background())
|
|
if !errors.Is(err, io.ErrClosedPipe) {
|
|
t.Fatalf("waitDedicatedReady error = %v, want %v", err, io.ErrClosedPipe)
|
|
}
|
|
}
|
|
|
|
func TestDedicatedBulkWritePrefersClosedPipeOverContextCanceled(t *testing.T) {
|
|
bulk := newBulkHandle(context.Background(), nil, "test", BulkOpenRequest{
|
|
BulkID: "bulk-dedicated-write-close",
|
|
DataID: 1,
|
|
Dedicated: true,
|
|
ChunkSize: 4,
|
|
WindowBytes: 16,
|
|
MaxInFlight: 4,
|
|
}, 0, nil, nil, 0, nil, nil, func(context.Context, *bulkHandle, []byte) error {
|
|
return nil
|
|
}, func(ctx context.Context, bulk *bulkHandle, startSeq uint64, payload []byte, payloadOwned bool) (int, error) {
|
|
bulk.markPeerClosed()
|
|
<-ctx.Done()
|
|
return 0, ctx.Err()
|
|
}, nil)
|
|
|
|
_, err := bulk.Write([]byte("abcdefgh"))
|
|
if err != nil && !errors.Is(err, io.ErrClosedPipe) {
|
|
t.Fatalf("bulk Write error = %v, want nil or %v", err, io.ErrClosedPipe)
|
|
}
|
|
if err := bulk.waitPendingAsyncWrites(context.Background()); err != nil && !errors.Is(err, io.ErrClosedPipe) {
|
|
t.Fatalf("bulk waitPendingAsyncWrites error = %v, want nil or %v", err, io.ErrClosedPipe)
|
|
}
|
|
}
|
|
|
|
func TestBulkReadWaitingLocalClosePrefersClosedPipeOverContextCanceled(t *testing.T) {
|
|
bulk := newBulkHandle(context.Background(), nil, "test", BulkOpenRequest{
|
|
BulkID: "bulk-read-local-close",
|
|
DataID: 1,
|
|
Range: BulkRange{
|
|
Length: 1,
|
|
},
|
|
}, 0, nil, nil, 0, nil, nil, nil, nil, nil)
|
|
|
|
errCh := make(chan error, 1)
|
|
go func() {
|
|
buf := make([]byte, 4)
|
|
_, err := bulk.Read(buf)
|
|
errCh <- err
|
|
}()
|
|
|
|
time.Sleep(20 * time.Millisecond)
|
|
if err := bulk.Close(); err != nil {
|
|
t.Fatalf("bulk Close failed: %v", err)
|
|
}
|
|
|
|
select {
|
|
case err := <-errCh:
|
|
if !errors.Is(err, io.ErrClosedPipe) {
|
|
t.Fatalf("bulk Read error = %v, want %v", err, io.ErrClosedPipe)
|
|
}
|
|
case <-time.After(time.Second):
|
|
t.Fatal("bulk Read did not return after local close")
|
|
}
|
|
}
|
|
|
|
func TestBulkReleaseControlRoundTripTransport(t *testing.T) {
|
|
server := NewServer().(*ServerCommon)
|
|
if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
t.Fatalf("UseModernPSKServer failed: %v", err)
|
|
}
|
|
|
|
acceptCh := make(chan BulkAcceptInfo, 1)
|
|
server.SetBulkHandler(func(info BulkAcceptInfo) error {
|
|
acceptCh <- info
|
|
return nil
|
|
})
|
|
|
|
if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
|
|
t.Fatalf("server Listen failed: %v", err)
|
|
}
|
|
defer func() {
|
|
_ = server.Stop()
|
|
}()
|
|
|
|
client := NewClient().(*ClientCommon)
|
|
if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
t.Fatalf("UseModernPSKClient failed: %v", err)
|
|
}
|
|
if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
|
|
t.Fatalf("client Connect failed: %v", err)
|
|
}
|
|
defer func() {
|
|
_ = client.Stop()
|
|
}()
|
|
|
|
const chunkSize = 64 * 1024
|
|
bulk, err := client.OpenBulk(context.Background(), BulkOpenOptions{
|
|
Range: BulkRange{Offset: 0, Length: chunkSize},
|
|
ChunkSize: chunkSize,
|
|
WindowBytes: chunkSize,
|
|
MaxInFlight: 1,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("client OpenBulk failed: %v", err)
|
|
}
|
|
|
|
accepted := waitAcceptedBulk(t, acceptCh, 2*time.Second)
|
|
clientHandle, ok := bulk.(*bulkHandle)
|
|
if !ok {
|
|
t.Fatalf("bulk type = %T, want *bulkHandle", bulk)
|
|
}
|
|
serverHandle, ok := accepted.Bulk.(*bulkHandle)
|
|
if !ok {
|
|
t.Fatalf("accepted bulk type = %T, want *bulkHandle", accepted.Bulk)
|
|
}
|
|
if accepted.TransportConn == nil {
|
|
t.Fatal("accepted transport connection should not be nil")
|
|
}
|
|
|
|
clientHandle.mu.Lock()
|
|
clientHandle.outboundAvailBytes = 0
|
|
clientHandle.outboundInFlight = 1
|
|
clientHandle.mu.Unlock()
|
|
|
|
if err := sendBulkReleaseServerTransport(server, accepted.TransportConn, BulkReleaseRequest{
|
|
BulkID: serverHandle.ID(),
|
|
DataID: serverHandle.dataIDSnapshot(),
|
|
Bytes: chunkSize,
|
|
Chunks: 1,
|
|
}); err != nil {
|
|
t.Fatalf("sendBulkReleaseServerTransport failed: %v", err)
|
|
}
|
|
|
|
deadline := time.Now().Add(2 * time.Second)
|
|
for time.Now().Before(deadline) {
|
|
clientHandle.mu.Lock()
|
|
avail := clientHandle.outboundAvailBytes
|
|
inFlight := clientHandle.outboundInFlight
|
|
clientHandle.mu.Unlock()
|
|
if avail == chunkSize && inFlight == 0 {
|
|
return
|
|
}
|
|
time.Sleep(10 * time.Millisecond)
|
|
}
|
|
|
|
clientHandle.mu.Lock()
|
|
avail := clientHandle.outboundAvailBytes
|
|
inFlight := clientHandle.outboundInFlight
|
|
clientHandle.mu.Unlock()
|
|
t.Fatalf("client outbound window not released: avail=%d inFlight=%d", avail, inFlight)
|
|
}
|
|
|
|
func TestBulkSharedWindowFlowControlPreventsBackpressureReset(t *testing.T) {
|
|
server := NewServer().(*ServerCommon)
|
|
if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
t.Fatalf("UseModernPSKServer failed: %v", err)
|
|
}
|
|
|
|
acceptCh := make(chan BulkAcceptInfo, 1)
|
|
server.SetBulkHandler(func(info BulkAcceptInfo) error {
|
|
acceptCh <- info
|
|
return nil
|
|
})
|
|
|
|
if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
|
|
t.Fatalf("server Listen failed: %v", err)
|
|
}
|
|
defer func() {
|
|
_ = server.Stop()
|
|
}()
|
|
|
|
client := NewClient().(*ClientCommon)
|
|
if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
t.Fatalf("UseModernPSKClient failed: %v", err)
|
|
}
|
|
if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
|
|
t.Fatalf("client Connect failed: %v", err)
|
|
}
|
|
defer func() {
|
|
_ = client.Stop()
|
|
}()
|
|
|
|
const (
|
|
chunkSize = 1024 * 1024
|
|
totalBytes = 6 * chunkSize
|
|
)
|
|
|
|
bulk, err := client.OpenBulk(context.Background(), BulkOpenOptions{
|
|
Range: BulkRange{
|
|
Offset: 0,
|
|
Length: totalBytes,
|
|
},
|
|
ChunkSize: chunkSize,
|
|
WindowBytes: chunkSize,
|
|
MaxInFlight: 1,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("client OpenBulk failed: %v", err)
|
|
}
|
|
|
|
accepted := waitAcceptedBulk(t, acceptCh, 2*time.Second)
|
|
serverHandle, ok := accepted.Bulk.(*bulkHandle)
|
|
if !ok {
|
|
t.Fatalf("accepted bulk type = %T, want *bulkHandle", accepted.Bulk)
|
|
}
|
|
serverHandle.mu.Lock()
|
|
serverHandle.inboundQueueLimit = 2
|
|
serverHandle.inboundBytesLimit = 2 * chunkSize
|
|
serverHandle.mu.Unlock()
|
|
|
|
readDone := make(chan error, 1)
|
|
go func() {
|
|
buf := make([]byte, chunkSize)
|
|
total := 0
|
|
for {
|
|
n, err := accepted.Bulk.Read(buf)
|
|
if n > 0 {
|
|
total += n
|
|
time.Sleep(15 * time.Millisecond)
|
|
}
|
|
if err != nil {
|
|
if errors.Is(err, io.EOF) {
|
|
if total != totalBytes {
|
|
readDone <- errors.New("server bulk read size mismatch")
|
|
return
|
|
}
|
|
readDone <- nil
|
|
return
|
|
}
|
|
readDone <- err
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
payload := make([]byte, totalBytes)
|
|
for i := range payload {
|
|
payload[i] = byte(i)
|
|
}
|
|
if _, err := bulk.Write(payload); err != nil {
|
|
t.Fatalf("client bulk Write failed: %v", err)
|
|
}
|
|
if err := bulk.CloseWrite(); err != nil {
|
|
t.Fatalf("client bulk CloseWrite failed: %v", err)
|
|
}
|
|
|
|
select {
|
|
case err := <-readDone:
|
|
if err != nil {
|
|
t.Fatalf("server read failed: %v", err)
|
|
}
|
|
case <-time.After(5 * time.Second):
|
|
t.Fatal("timed out waiting for server bulk read")
|
|
}
|
|
|
|
clientHandle, ok := bulk.(*bulkHandle)
|
|
if !ok {
|
|
t.Fatalf("bulk type = %T, want *bulkHandle", bulk)
|
|
}
|
|
if resetErr := clientHandle.resetErrSnapshot(); resetErr != nil {
|
|
t.Fatalf("client bulk reset error = %v, want nil", resetErr)
|
|
}
|
|
if resetErr := serverHandle.resetErrSnapshot(); resetErr != nil {
|
|
t.Fatalf("server bulk reset error = %v, want nil", resetErr)
|
|
}
|
|
}
|
|
|
|
func TestBulkDedicatedWindowFlowControlPreventsBackpressureReset(t *testing.T) {
|
|
server := NewServer().(*ServerCommon)
|
|
if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
t.Fatalf("UseModernPSKServer failed: %v", err)
|
|
}
|
|
|
|
acceptCh := make(chan BulkAcceptInfo, 1)
|
|
server.SetBulkHandler(func(info BulkAcceptInfo) error {
|
|
acceptCh <- info
|
|
return nil
|
|
})
|
|
|
|
if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
|
|
t.Fatalf("server Listen failed: %v", err)
|
|
}
|
|
defer func() {
|
|
_ = server.Stop()
|
|
}()
|
|
|
|
client := NewClient().(*ClientCommon)
|
|
if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
t.Fatalf("UseModernPSKClient failed: %v", err)
|
|
}
|
|
if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
|
|
t.Fatalf("client Connect failed: %v", err)
|
|
}
|
|
defer func() {
|
|
_ = client.Stop()
|
|
}()
|
|
|
|
const (
|
|
chunkSize = 1024 * 1024
|
|
writeSize = 4 * chunkSize
|
|
totalWrites = 6
|
|
totalBytes = totalWrites * writeSize
|
|
)
|
|
|
|
bulk, err := client.OpenBulk(context.Background(), BulkOpenOptions{
|
|
Range: BulkRange{Offset: 0, Length: totalBytes},
|
|
Dedicated: true,
|
|
ChunkSize: chunkSize,
|
|
WindowBytes: writeSize,
|
|
MaxInFlight: 4,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("client OpenBulk dedicated failed: %v", err)
|
|
}
|
|
|
|
accepted := waitAcceptedBulk(t, acceptCh, 2*time.Second)
|
|
serverHandle, ok := accepted.Bulk.(*bulkHandle)
|
|
if !ok {
|
|
t.Fatalf("accepted bulk type = %T, want *bulkHandle", accepted.Bulk)
|
|
}
|
|
serverHandle.mu.Lock()
|
|
serverHandle.inboundQueueLimit = 8
|
|
serverHandle.inboundBytesLimit = writeSize + chunkSize
|
|
serverHandle.mu.Unlock()
|
|
|
|
readDone := make(chan error, 1)
|
|
go func() {
|
|
buf := make([]byte, writeSize)
|
|
total := 0
|
|
for {
|
|
n, err := accepted.Bulk.Read(buf)
|
|
if n > 0 {
|
|
total += n
|
|
time.Sleep(15 * time.Millisecond)
|
|
}
|
|
if err != nil {
|
|
if errors.Is(err, io.EOF) {
|
|
if total != totalBytes {
|
|
readDone <- errors.New("server dedicated bulk read size mismatch")
|
|
return
|
|
}
|
|
readDone <- nil
|
|
return
|
|
}
|
|
readDone <- err
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
payload := make([]byte, writeSize)
|
|
for i := range payload {
|
|
payload[i] = byte(i)
|
|
}
|
|
for i := 0; i < totalWrites; i++ {
|
|
if _, err := bulk.Write(payload); err != nil {
|
|
t.Fatalf("client dedicated bulk Write #%d failed: %v", i, err)
|
|
}
|
|
}
|
|
if err := bulk.CloseWrite(); err != nil {
|
|
t.Fatalf("client dedicated bulk CloseWrite failed: %v", err)
|
|
}
|
|
|
|
select {
|
|
case err := <-readDone:
|
|
if err != nil {
|
|
t.Fatalf("server dedicated read failed: %v", err)
|
|
}
|
|
case <-time.After(5 * time.Second):
|
|
t.Fatal("timed out waiting for server dedicated bulk read")
|
|
}
|
|
|
|
clientHandle, ok := bulk.(*bulkHandle)
|
|
if !ok {
|
|
t.Fatalf("bulk type = %T, want *bulkHandle", bulk)
|
|
}
|
|
if resetErr := clientHandle.resetErrSnapshot(); resetErr != nil {
|
|
t.Fatalf("client dedicated bulk reset error = %v, want nil", resetErr)
|
|
}
|
|
if resetErr := serverHandle.resetErrSnapshot(); resetErr != nil {
|
|
t.Fatalf("server dedicated bulk reset error = %v, want nil", resetErr)
|
|
}
|
|
}
|
|
|
|
func TestBulkOpenRoundTripServerLogicalDedicatedTCP(t *testing.T) {
|
|
server := NewServer().(*ServerCommon)
|
|
if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
t.Fatalf("UseModernPSKServer failed: %v", err)
|
|
}
|
|
|
|
if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
|
|
t.Fatalf("server Listen failed: %v", err)
|
|
}
|
|
defer func() {
|
|
_ = server.Stop()
|
|
}()
|
|
|
|
client := NewClient().(*ClientCommon)
|
|
if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
t.Fatalf("UseModernPSKClient failed: %v", err)
|
|
}
|
|
acceptCh := make(chan BulkAcceptInfo, 1)
|
|
client.SetBulkHandler(func(info BulkAcceptInfo) error {
|
|
acceptCh <- info
|
|
return nil
|
|
})
|
|
if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
|
|
t.Fatalf("client Connect failed: %v", err)
|
|
}
|
|
defer func() {
|
|
_ = client.Stop()
|
|
}()
|
|
|
|
logical := waitForTransferControlLogicalConn(t, server, 2*time.Second)
|
|
bulk, err := server.OpenBulkLogical(context.Background(), logical, BulkOpenOptions{
|
|
Range: BulkRange{
|
|
Offset: 2048,
|
|
Length: 4096,
|
|
},
|
|
Metadata: BulkMetadata{
|
|
"mode": "server-dedicated",
|
|
},
|
|
Dedicated: true,
|
|
ChunkSize: 32 * 1024,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("server OpenBulkLogical dedicated failed: %v", err)
|
|
}
|
|
|
|
accepted := waitAcceptedBulk(t, acceptCh, 2*time.Second)
|
|
if !accepted.Dedicated {
|
|
t.Fatal("client accepted dedicated flag should be true")
|
|
}
|
|
|
|
if _, err := bulk.Write([]byte("server-dedicated")); err != nil {
|
|
t.Fatalf("server dedicated bulk Write failed: %v", err)
|
|
}
|
|
readBulkExactly(t, accepted.Bulk, "server-dedicated", 2*time.Second)
|
|
|
|
if _, err := accepted.Bulk.Write([]byte("client-dedicated")); err != nil {
|
|
t.Fatalf("client dedicated bulk Write failed: %v", err)
|
|
}
|
|
readBulkExactly(t, bulk, "client-dedicated", 2*time.Second)
|
|
|
|
if err := bulk.CloseWrite(); err != nil {
|
|
t.Fatalf("server dedicated bulk CloseWrite failed: %v", err)
|
|
}
|
|
waitForBulkReadEOF(t, accepted.Bulk, 2*time.Second)
|
|
|
|
if err := accepted.Bulk.Close(); err != nil {
|
|
t.Fatalf("client dedicated bulk Close failed: %v", err)
|
|
}
|
|
waitForBulkReadEOF(t, bulk, 2*time.Second)
|
|
waitForBulkContextDone(t, bulk.Context(), 2*time.Second)
|
|
}
|
|
|
|
func TestDedicatedBulkCloseWaitsForRemoteCloseBeforeFinalize(t *testing.T) {
|
|
runtime := newBulkRuntime("dedicated-close")
|
|
bulk := newBulkHandle(context.Background(), runtime, clientFileScope(), BulkOpenRequest{
|
|
BulkID: "dedicated-close",
|
|
DataID: 1,
|
|
Dedicated: true,
|
|
Range: BulkRange{
|
|
Length: 1,
|
|
},
|
|
}, 0, nil, nil, 0, nil, nil, nil, nil, nil)
|
|
if err := runtime.register(clientFileScope(), bulk); err != nil {
|
|
t.Fatalf("register bulk failed: %v", err)
|
|
}
|
|
|
|
closeCalls := 0
|
|
bulk.closeFn = func(context.Context, *bulkHandle, bool) error {
|
|
closeCalls++
|
|
return nil
|
|
}
|
|
|
|
if err := bulk.Close(); err != nil {
|
|
t.Fatalf("bulk Close failed: %v", err)
|
|
}
|
|
if got, want := closeCalls, 1; got != want {
|
|
t.Fatalf("closeFn calls = %d, want %d", got, want)
|
|
}
|
|
select {
|
|
case <-bulk.Context().Done():
|
|
t.Fatal("dedicated full close should wait for remote close before finalize")
|
|
default:
|
|
}
|
|
if _, ok := runtime.lookup(clientFileScope(), bulk.ID()); !ok {
|
|
t.Fatal("bulk runtime entry should remain until remote close arrives")
|
|
}
|
|
snapshot := bulk.snapshot()
|
|
if !snapshot.LocalClosed || !snapshot.LocalReadClosed {
|
|
t.Fatalf("local close snapshot mismatch: %+v", snapshot)
|
|
}
|
|
if snapshot.RemoteClosed {
|
|
t.Fatalf("remote close should not be set yet: %+v", snapshot)
|
|
}
|
|
|
|
bulk.markRemoteClosed()
|
|
waitForBulkContextDone(t, bulk.Context(), 2*time.Second)
|
|
if _, ok := runtime.lookup(clientFileScope(), bulk.ID()); ok {
|
|
t.Fatal("bulk runtime entry should be removed after remote close")
|
|
}
|
|
}
|
|
|
|
func TestHandleDedicatedBulkReadErrorTreatsEOFAfterLocalCloseAsGraceful(t *testing.T) {
|
|
runtime := newBulkRuntime("dedicated-eof")
|
|
bulk := newBulkHandle(context.Background(), runtime, clientFileScope(), BulkOpenRequest{
|
|
BulkID: "dedicated-eof",
|
|
DataID: 1,
|
|
Dedicated: true,
|
|
Range: BulkRange{
|
|
Length: 1,
|
|
},
|
|
}, 0, nil, nil, 0, nil, nil, nil, nil, nil)
|
|
if err := runtime.register(clientFileScope(), bulk); err != nil {
|
|
t.Fatalf("register bulk failed: %v", err)
|
|
}
|
|
|
|
bulk.mu.Lock()
|
|
bulk.localClosed = true
|
|
bulk.mu.Unlock()
|
|
|
|
handleDedicatedBulkReadError(bulk, io.EOF)
|
|
|
|
if resetErr := bulk.resetErrSnapshot(); resetErr != nil {
|
|
t.Fatalf("reset error = %v, want nil", resetErr)
|
|
}
|
|
if !bulk.remoteClosedSnapshot() {
|
|
t.Fatal("remoteClosed should be set after graceful EOF")
|
|
}
|
|
waitForBulkContextDone(t, bulk.Context(), 2*time.Second)
|
|
if _, ok := runtime.lookup(clientFileScope(), bulk.ID()); ok {
|
|
t.Fatal("bulk runtime entry should be removed after graceful EOF")
|
|
}
|
|
}
|
|
|
|
func TestHandleDedicatedBulkReadErrorTreatsEOFEvenBeforeLocalCloseAsGracefulForDedicated(t *testing.T) {
|
|
runtime := newBulkRuntime("dedicated-eof-remote")
|
|
bulk := newBulkHandle(context.Background(), runtime, clientFileScope(), BulkOpenRequest{
|
|
BulkID: "dedicated-eof-remote",
|
|
DataID: 1,
|
|
Dedicated: true,
|
|
Range: BulkRange{
|
|
Length: 8,
|
|
},
|
|
}, 0, nil, nil, 0, nil, nil, nil, nil, nil)
|
|
if err := runtime.register(clientFileScope(), bulk); err != nil {
|
|
t.Fatalf("register bulk failed: %v", err)
|
|
}
|
|
|
|
handleDedicatedBulkReadError(bulk, io.EOF)
|
|
|
|
if resetErr := bulk.resetErrSnapshot(); resetErr != nil {
|
|
t.Fatalf("reset error = %v, want nil", resetErr)
|
|
}
|
|
if !bulk.remoteClosedSnapshot() {
|
|
t.Fatal("remoteClosed should be set after dedicated EOF")
|
|
}
|
|
if _, err := bulk.Read(make([]byte, 1)); !errors.Is(err, io.EOF) {
|
|
t.Fatalf("bulk Read error = %v, want EOF", err)
|
|
}
|
|
if err := bulk.Close(); err != nil {
|
|
t.Fatalf("bulk Close failed: %v", err)
|
|
}
|
|
waitForBulkContextDone(t, bulk.Context(), 2*time.Second)
|
|
}
|
|
|
|
func TestDedicatedBulkCloseWriteHalfClosesUnderlyingTCPWriteSide(t *testing.T) {
|
|
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
|
if err != nil {
|
|
t.Fatalf("net.Listen failed: %v", err)
|
|
}
|
|
defer func() {
|
|
_ = listener.Close()
|
|
}()
|
|
|
|
serverConnCh := make(chan net.Conn, 1)
|
|
serverErrCh := make(chan error, 1)
|
|
go func() {
|
|
conn, err := listener.Accept()
|
|
if err != nil {
|
|
serverErrCh <- err
|
|
return
|
|
}
|
|
serverConnCh <- conn
|
|
}()
|
|
|
|
clientConnRaw, err := net.Dial("tcp", listener.Addr().String())
|
|
if err != nil {
|
|
t.Fatalf("net.Dial failed: %v", err)
|
|
}
|
|
defer func() {
|
|
_ = clientConnRaw.Close()
|
|
}()
|
|
|
|
var serverConn net.Conn
|
|
select {
|
|
case serverConn = <-serverConnCh:
|
|
case err := <-serverErrCh:
|
|
t.Fatalf("listener.Accept failed: %v", err)
|
|
case <-time.After(2 * time.Second):
|
|
t.Fatal("timed out waiting for accepted TCP conn")
|
|
}
|
|
defer func() {
|
|
_ = serverConn.Close()
|
|
}()
|
|
|
|
runtime := newBulkRuntime("dedicated-half-close")
|
|
bulk := newBulkHandle(context.Background(), runtime, clientFileScope(), BulkOpenRequest{
|
|
BulkID: "dedicated-half-close",
|
|
DataID: 1,
|
|
Dedicated: true,
|
|
Range: BulkRange{
|
|
Length: 1,
|
|
},
|
|
}, 0, nil, nil, 0, nil, nil, nil, nil, nil)
|
|
if err := runtime.register(clientFileScope(), bulk); err != nil {
|
|
t.Fatalf("register bulk failed: %v", err)
|
|
}
|
|
if err := bulk.attachDedicatedConn(clientConnRaw); err != nil {
|
|
t.Fatalf("attachDedicatedConn failed: %v", err)
|
|
}
|
|
|
|
readDone := make(chan error, 1)
|
|
go func() {
|
|
_ = serverConn.SetReadDeadline(time.Now().Add(2 * time.Second))
|
|
var buf [1]byte
|
|
_, err := serverConn.Read(buf[:])
|
|
readDone <- err
|
|
}()
|
|
|
|
if err := bulk.CloseWrite(); err != nil {
|
|
t.Fatalf("bulk CloseWrite failed: %v", err)
|
|
}
|
|
|
|
select {
|
|
case err := <-readDone:
|
|
if !errors.Is(err, io.EOF) {
|
|
t.Fatalf("server conn Read error = %v, want EOF", err)
|
|
}
|
|
case <-time.After(3 * time.Second):
|
|
t.Fatal("timed out waiting for TCP half-close EOF")
|
|
}
|
|
|
|
bulk.markRemoteClosed()
|
|
waitForBulkContextDone(t, bulk.Context(), 2*time.Second)
|
|
}
|
|
|
|
func TestBulkDedicatedClientFullCloseAfterCloseWriteDoesNotResetTCP(t *testing.T) {
|
|
server := NewServer().(*ServerCommon)
|
|
if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
t.Fatalf("UseModernPSKServer failed: %v", err)
|
|
}
|
|
|
|
acceptCh := make(chan BulkAcceptInfo, 1)
|
|
server.SetBulkHandler(func(info BulkAcceptInfo) error {
|
|
acceptCh <- info
|
|
return nil
|
|
})
|
|
|
|
if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
|
|
t.Fatalf("server Listen failed: %v", err)
|
|
}
|
|
defer func() {
|
|
_ = server.Stop()
|
|
}()
|
|
|
|
client := NewClient().(*ClientCommon)
|
|
if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
t.Fatalf("UseModernPSKClient failed: %v", err)
|
|
}
|
|
if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
|
|
t.Fatalf("client Connect failed: %v", err)
|
|
}
|
|
defer func() {
|
|
_ = client.Stop()
|
|
}()
|
|
|
|
bulk, err := client.OpenBulk(context.Background(), BulkOpenOptions{
|
|
ID: "dedicated-close-after-closewrite",
|
|
Dedicated: true,
|
|
Range: BulkRange{
|
|
Length: 5,
|
|
},
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("client OpenBulk dedicated failed: %v", err)
|
|
}
|
|
accepted := waitAcceptedBulk(t, acceptCh, 2*time.Second)
|
|
|
|
if _, err := bulk.Write([]byte("hello")); err != nil {
|
|
t.Fatalf("client dedicated bulk Write failed: %v", err)
|
|
}
|
|
readBulkExactly(t, accepted.Bulk, "hello", 2*time.Second)
|
|
|
|
if err := bulk.CloseWrite(); err != nil {
|
|
t.Fatalf("client dedicated bulk CloseWrite failed: %v", err)
|
|
}
|
|
if err := bulk.Close(); err != nil {
|
|
t.Fatalf("client dedicated bulk Close failed: %v", err)
|
|
}
|
|
|
|
waitForBulkReadEOF(t, accepted.Bulk, 2*time.Second)
|
|
|
|
if err := accepted.Bulk.Close(); err != nil {
|
|
t.Fatalf("server dedicated bulk Close failed: %v", err)
|
|
}
|
|
|
|
waitForBulkContextDone(t, bulk.Context(), 2*time.Second)
|
|
waitForBulkContextDone(t, accepted.Bulk.Context(), 2*time.Second)
|
|
|
|
clientHandle := bulk.(*bulkHandle)
|
|
serverHandle := accepted.Bulk.(*bulkHandle)
|
|
if resetErr := clientHandle.resetErrSnapshot(); resetErr != nil {
|
|
t.Fatalf("client dedicated bulk reset error = %v, want nil", resetErr)
|
|
}
|
|
if resetErr := serverHandle.resetErrSnapshot(); resetErr != nil {
|
|
t.Fatalf("server dedicated bulk reset error = %v, want nil", resetErr)
|
|
}
|
|
}
|
|
|
|
func TestBulkSharedConcurrentWritersWithSlowReceiver(t *testing.T) {
|
|
server := NewServer().(*ServerCommon)
|
|
if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
t.Fatalf("UseModernPSKServer failed: %v", err)
|
|
}
|
|
|
|
const bulkCount = 6
|
|
acceptCh := make(chan BulkAcceptInfo, bulkCount)
|
|
server.SetBulkHandler(func(info BulkAcceptInfo) error {
|
|
acceptCh <- info
|
|
return nil
|
|
})
|
|
|
|
if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
|
|
t.Fatalf("server Listen failed: %v", err)
|
|
}
|
|
defer func() {
|
|
_ = server.Stop()
|
|
}()
|
|
|
|
client := NewClient().(*ClientCommon)
|
|
if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
t.Fatalf("UseModernPSKClient failed: %v", err)
|
|
}
|
|
if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
|
|
t.Fatalf("client Connect failed: %v", err)
|
|
}
|
|
defer func() {
|
|
_ = client.Stop()
|
|
}()
|
|
|
|
type bulkPair struct {
|
|
client Bulk
|
|
server Bulk
|
|
}
|
|
pairs := make([]bulkPair, 0, bulkCount)
|
|
for i := 0; i < bulkCount; i++ {
|
|
bulk, err := client.OpenBulk(context.Background(), BulkOpenOptions{
|
|
ID: "slow-shared-" + formatInt(int64(i)),
|
|
ChunkSize: 64 * 1024,
|
|
WindowBytes: 128 * 1024,
|
|
MaxInFlight: 2,
|
|
WriteTimeout: 2 * time.Second,
|
|
Range: BulkRange{
|
|
Length: 1024 * 1024,
|
|
},
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("client OpenBulk #%d failed: %v", i, err)
|
|
}
|
|
accepted := waitAcceptedBulk(t, acceptCh, 2*time.Second)
|
|
pairs = append(pairs, bulkPair{client: bulk, server: accepted.Bulk})
|
|
}
|
|
|
|
readErrCh := make(chan error, bulkCount)
|
|
for _, pair := range pairs {
|
|
go func(serverBulk Bulk) {
|
|
buf := make([]byte, 32*1024)
|
|
total := 0
|
|
for {
|
|
n, err := serverBulk.Read(buf)
|
|
if n > 0 {
|
|
total += n
|
|
time.Sleep(2 * time.Millisecond)
|
|
}
|
|
if err != nil {
|
|
if errors.Is(err, io.EOF) {
|
|
if closeErr := serverBulk.Close(); closeErr != nil {
|
|
readErrCh <- closeErr
|
|
return
|
|
}
|
|
readErrCh <- nil
|
|
return
|
|
}
|
|
readErrCh <- err
|
|
return
|
|
}
|
|
}
|
|
}(pair.server)
|
|
}
|
|
|
|
writeErrCh := make(chan error, bulkCount)
|
|
payload := make([]byte, 64*1024)
|
|
for i := range payload {
|
|
payload[i] = byte(i)
|
|
}
|
|
for _, pair := range pairs {
|
|
go func(clientBulk Bulk) {
|
|
defer func() {
|
|
_ = clientBulk.Close()
|
|
}()
|
|
for written := 0; written < 1024*1024; written += len(payload) {
|
|
if _, err := clientBulk.Write(payload); err != nil {
|
|
writeErrCh <- err
|
|
return
|
|
}
|
|
}
|
|
if err := clientBulk.CloseWrite(); err != nil {
|
|
writeErrCh <- err
|
|
return
|
|
}
|
|
writeErrCh <- nil
|
|
}(pair.client)
|
|
}
|
|
|
|
for i := 0; i < bulkCount; i++ {
|
|
select {
|
|
case err := <-writeErrCh:
|
|
if err != nil {
|
|
t.Fatalf("slow receiver client write failed: %v", err)
|
|
}
|
|
case <-time.After(10 * time.Second):
|
|
t.Fatal("timed out waiting for client writes under slow receiver")
|
|
}
|
|
}
|
|
|
|
for i := 0; i < bulkCount; i++ {
|
|
select {
|
|
case err := <-readErrCh:
|
|
if err != nil {
|
|
t.Fatalf("slow receiver server read failed: %v", err)
|
|
}
|
|
case <-time.After(10 * time.Second):
|
|
t.Fatal("timed out waiting for server reads under slow receiver")
|
|
}
|
|
}
|
|
|
|
for _, pair := range pairs {
|
|
waitForBulkContextDone(t, pair.client.Context(), 2*time.Second)
|
|
waitForBulkContextDone(t, pair.server.Context(), 2*time.Second)
|
|
if handle, ok := pair.client.(*bulkHandle); ok {
|
|
if resetErr := handle.resetErrSnapshot(); resetErr != nil {
|
|
t.Fatalf("client bulk reset error = %v, want nil", resetErr)
|
|
}
|
|
}
|
|
if handle, ok := pair.server.(*bulkHandle); ok {
|
|
if resetErr := handle.resetErrSnapshot(); resetErr != nil {
|
|
t.Fatalf("server bulk reset error = %v, want nil", resetErr)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestBulkOpenRequiresHandlerTCP(t *testing.T) {
|
|
server := NewServer().(*ServerCommon)
|
|
if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
t.Fatalf("UseModernPSKServer failed: %v", err)
|
|
}
|
|
if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
|
|
t.Fatalf("server Listen failed: %v", err)
|
|
}
|
|
defer func() {
|
|
_ = server.Stop()
|
|
}()
|
|
|
|
client := NewClient().(*ClientCommon)
|
|
if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
t.Fatalf("UseModernPSKClient failed: %v", err)
|
|
}
|
|
if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
|
|
t.Fatalf("client Connect failed: %v", err)
|
|
}
|
|
defer func() {
|
|
_ = client.Stop()
|
|
}()
|
|
|
|
_, err := client.OpenBulk(context.Background(), BulkOpenOptions{
|
|
Range: BulkRange{
|
|
Offset: 0,
|
|
Length: 128,
|
|
},
|
|
})
|
|
if !errors.Is(err, errBulkHandlerNotConfigured) {
|
|
t.Fatalf("client OpenBulk error = %v, want %v", err, errBulkHandlerNotConfigured)
|
|
}
|
|
}
|
|
|
|
func waitAcceptedBulk(t *testing.T, ch <-chan BulkAcceptInfo, timeout time.Duration) BulkAcceptInfo {
|
|
t.Helper()
|
|
select {
|
|
case info := <-ch:
|
|
return info
|
|
case <-time.After(timeout):
|
|
t.Fatal("timed out waiting for accepted bulk")
|
|
return BulkAcceptInfo{}
|
|
}
|
|
}
|
|
|
|
func waitForBulkReadEOF(t *testing.T, bulk Bulk, timeout time.Duration) {
|
|
t.Helper()
|
|
deadline := time.Now().Add(timeout)
|
|
buf := make([]byte, 1)
|
|
for time.Now().Before(deadline) {
|
|
_, err := bulk.Read(buf)
|
|
if errors.Is(err, io.EOF) {
|
|
return
|
|
}
|
|
if err != nil {
|
|
t.Fatalf("bulk Read returned unexpected error: %v", err)
|
|
}
|
|
time.Sleep(10 * time.Millisecond)
|
|
}
|
|
t.Fatal("timed out waiting for bulk EOF")
|
|
}
|
|
|
|
func waitForBulkContextDone(t *testing.T, ctx context.Context, timeout time.Duration) {
|
|
t.Helper()
|
|
select {
|
|
case <-ctx.Done():
|
|
case <-time.After(timeout):
|
|
t.Fatal("timed out waiting for bulk context done")
|
|
}
|
|
}
|
|
|
|
func readBulkExactly(t *testing.T, bulk Bulk, want string, timeout time.Duration) {
|
|
t.Helper()
|
|
errCh := make(chan error, 1)
|
|
go func() {
|
|
buf := make([]byte, len(want))
|
|
_, err := io.ReadFull(bulk, buf)
|
|
if err != nil {
|
|
errCh <- err
|
|
return
|
|
}
|
|
if got := string(buf); got != want {
|
|
errCh <- errors.New("bulk payload mismatch: got " + got + " want " + want)
|
|
return
|
|
}
|
|
errCh <- nil
|
|
}()
|
|
select {
|
|
case err := <-errCh:
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
case <-time.After(timeout):
|
|
t.Fatal("timed out waiting for bulk payload")
|
|
}
|
|
}
|
|
|
|
func readBulkError(t *testing.T, bulk Bulk, timeout time.Duration) error {
|
|
t.Helper()
|
|
|
|
errCh := make(chan error, 1)
|
|
go func() {
|
|
buf := make([]byte, 1)
|
|
_, err := bulk.Read(buf)
|
|
errCh <- err
|
|
}()
|
|
select {
|
|
case err := <-errCh:
|
|
if err == nil {
|
|
t.Fatal("expected bulk read error, got nil")
|
|
}
|
|
return err
|
|
case <-time.After(timeout):
|
|
t.Fatal("timed out waiting for bulk read error")
|
|
return nil
|
|
}
|
|
}
|