2026-04-15 15:24:36 +08:00
package notify
import (
"context"
"errors"
2026-04-18 16:05:57 +08:00
"net"
"strings"
"sync"
2026-04-15 15:24:36 +08:00
"testing"
"time"
)
// TestBulkOpenDedicatedUDPRejected checks that OpenBulk with Dedicated set to
// true on a client connected over "udp" fails with an error matching
// errBulkDedicatedStreamOnly.
func TestBulkOpenDedicatedUDPRejected(t *testing.T) {
	srv := NewServer().(*ServerCommon)
	pskErr := UseModernPSKServer(srv, integrationSharedSecret, integrationModernPSKOptions())
	if pskErr != nil {
		t.Fatalf("UseModernPSKServer failed: %v", pskErr)
	}
	// The handler returns nil for every bulk; only the client-side error is inspected.
	srv.SetBulkHandler(func(BulkAcceptInfo) error { return nil })
	if listenErr := srv.Listen("udp", "127.0.0.1:0"); listenErr != nil {
		t.Fatalf("server Listen failed: %v", listenErr)
	}
	defer func() { _ = srv.Stop() }()

	cli := NewClient().(*ClientCommon)
	pskErr = UseModernPSKClient(cli, integrationSharedSecret, integrationModernPSKOptions())
	if pskErr != nil {
		t.Fatalf("UseModernPSKClient failed: %v", pskErr)
	}
	target := signalRoundTripServerAddr(srv, "")
	if connErr := cli.Connect("udp", target); connErr != nil {
		t.Fatalf("client Connect failed: %v", connErr)
	}
	defer func() { _ = cli.Stop() }()

	opts := BulkOpenOptions{
		Range:     BulkRange{Offset: 0, Length: 128},
		Dedicated: true,
	}
	_, openErr := cli.OpenBulk(context.Background(), opts)
	if errors.Is(openErr, errBulkDedicatedStreamOnly) {
		return
	}
	t.Fatalf("client OpenBulk dedicated over udp error = %v, want %v", openErr, errBulkDedicatedStreamOnly)
}
2026-04-18 16:05:57 +08:00
// TestBulkOpenAutoUDPFallsBackToShared checks that OpenBulk with
// BulkOpenModeAuto over a "udp" connection returns a bulk whose snapshot is
// not dedicated, and that the bulk accepted by the server is not dedicated
// either.
func TestBulkOpenAutoUDPFallsBackToShared(t *testing.T) {
	server := NewServer().(*ServerCommon)
	if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKServer failed: %v", err)
	}
	// Buffered so the handler can hand the accept over without waiting for
	// the test goroutine to receive it.
	acceptCh := make(chan BulkAcceptInfo, 2)
	server.SetBulkHandler(func(info BulkAcceptInfo) error {
		acceptCh <- info
		return nil
	})
	if err := server.Listen("udp", "127.0.0.1:0"); err != nil {
		t.Fatalf("server Listen failed: %v", err)
	}
	defer func() {
		_ = server.Stop()
	}()
	client := NewClient().(*ClientCommon)
	if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKClient failed: %v", err)
	}
	if err := client.Connect("udp", signalRoundTripServerAddr(server, "")); err != nil {
		t.Fatalf("client Connect failed: %v", err)
	}
	defer func() {
		_ = client.Stop()
	}()
	bulk, err := client.OpenBulk(context.Background(), BulkOpenOptions{
		Mode: BulkOpenModeAuto,
		Range: BulkRange{
			Offset: 0,
			Length: 128,
		},
	})
	if err != nil {
		t.Fatalf("client OpenBulk auto over udp failed: %v", err)
	}
	// Register cleanup before any assertion: t.Fatal still runs deferred
	// calls, so a failing check below can no longer leak the handle.
	defer func() {
		_ = bulk.Close()
	}()
	if bulk.Snapshot().Dedicated {
		t.Fatal("client OpenBulk auto over udp should fall back to shared")
	}
	accepted := waitAcceptedBulk(t, acceptCh, 2*time.Second)
	// Deferred (LIFO) so the server-side bulk is still closed before the
	// client-side one, exactly as on the original success path.
	defer func() {
		_ = accepted.Bulk.Close()
	}()
	if accepted.Dedicated {
		t.Fatal("server accepted bulk should be shared for auto over udp")
	}
}
2026-04-20 16:35:44 +08:00
func TestOpenDedicatedBulkConnectByConnRejectedAsSingleConnMode ( t * testing . T ) {
server := newRunningPeerAttachServerForTest ( t , func ( server * ServerCommon ) {
if err := UsePSKOverExternalTransportServer ( server , integrationSharedSecret , integrationModernPSKOptions ( ) ) ; err != nil {
t . Fatalf ( "UsePSKOverExternalTransportServer failed: %v" , err )
}
server . SetBulkHandler ( func ( info BulkAcceptInfo ) error {
return nil
} )
} )
client := NewClient ( ) . ( * ClientCommon )
if err := UsePSKOverExternalTransportClient ( client , integrationSharedSecret , integrationModernPSKOptions ( ) ) ; err != nil {
t . Fatalf ( "UsePSKOverExternalTransportClient failed: %v" , err )
}
left , right := net . Pipe ( )
defer right . Close ( )
bootstrapPeerAttachConnForTest ( t , server , right )
if err := client . ConnectByConn ( left ) ; err != nil {
t . Fatalf ( "ConnectByConn failed: %v" , err )
}
defer func ( ) {
client . setByeFromServer ( true )
_ = client . Stop ( )
} ( )
_ , err := client . OpenDedicatedBulk ( context . Background ( ) , BulkOpenOptions {
Range : BulkRange {
Offset : 0 ,
Length : 128 ,
} ,
} )
if ! errors . Is ( err , errBulkDedicatedSingleConn ) {
t . Fatalf ( "client OpenDedicatedBulk over ConnectByConn error = %v, want %v" , err , errBulkDedicatedSingleConn )
}
}
// TestOpenBulkAutoConnectByConnFallsBackToShared checks that OpenBulk with
// BulkOpenModeAuto on a client attached through ConnectByConn yields a
// non-dedicated bulk on both sides, that data written by the client is read
// from the accepted server bulk, and that no second accept is dispatched
// within 300ms.
func TestOpenBulkAutoConnectByConnFallsBackToShared(t *testing.T) {
	acceptCh := make(chan BulkAcceptInfo, 2)
	server := newRunningPeerAttachServerForTest(t, func(server *ServerCommon) {
		if err := UsePSKOverExternalTransportServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
			t.Fatalf("UsePSKOverExternalTransportServer failed: %v", err)
		}
		server.SetBulkHandler(func(info BulkAcceptInfo) error {
			acceptCh <- info
			return nil
		})
	})
	client := NewClient().(*ClientCommon)
	if err := UsePSKOverExternalTransportClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UsePSKOverExternalTransportClient failed: %v", err)
	}
	left, right := net.Pipe()
	defer right.Close()
	bootstrapPeerAttachConnForTest(t, server, right)
	if err := client.ConnectByConn(left); err != nil {
		t.Fatalf("ConnectByConn failed: %v", err)
	}
	defer func() {
		client.setByeFromServer(true)
		_ = client.Stop()
	}()
	bulk, err := client.OpenBulk(context.Background(), BulkOpenOptions{
		Mode: BulkOpenModeAuto,
		Range: BulkRange{
			Offset: 0,
			Length: 128,
		},
	})
	if err != nil {
		t.Fatalf("client OpenBulk auto over ConnectByConn failed: %v", err)
	}
	// Register cleanup before any assertion so a failing check below cannot
	// leak the client-side handle.
	defer func() {
		_ = bulk.Close()
	}()
	if bulk.Snapshot().Dedicated {
		t.Fatal("client OpenBulk auto over ConnectByConn should fall back to shared")
	}
	accepted := waitAcceptedBulk(t, acceptCh, 2*time.Second)
	// Deferred so the server-side bulk is closed on every exit path; LIFO
	// order keeps it ahead of the client-side Close, as before.
	defer func() {
		_ = accepted.Bulk.Close()
	}()
	if accepted.Dedicated {
		t.Fatal("server accepted bulk should be shared after ConnectByConn auto fallback")
	}
	if _, err := bulk.Write([]byte("shared-over-single-conn")); err != nil {
		t.Fatalf("client bulk Write failed: %v", err)
	}
	readBulkExactly(t, accepted.Bulk, "shared-over-single-conn", 2*time.Second)
	// Any further accept within the window means the open was dispatched twice.
	select {
	case extra := <-acceptCh:
		t.Fatalf("unexpected extra server bulk accept: %+v", extra)
	case <-time.After(300 * time.Millisecond):
	}
}
// TestOpenDedicatedBulkExternalTransportDialableSourceSucceeds checks that,
// with external-transport PSK configured on both ends and a "tcp" connection,
// OpenDedicatedBulk yields a dedicated bulk on client and server, that the
// client handle has a dedicated conn distinct from the main transport conn,
// that the client protection mode is ProtectionExternal, and that a payload
// written by the client is read from the accepted server bulk.
func TestOpenDedicatedBulkExternalTransportDialableSourceSucceeds(t *testing.T) {
	server := NewServer().(*ServerCommon)
	if err := UsePSKOverExternalTransportServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UsePSKOverExternalTransportServer failed: %v", err)
	}
	acceptCh := make(chan BulkAcceptInfo, 2)
	server.SetBulkHandler(func(info BulkAcceptInfo) error {
		acceptCh <- info
		return nil
	})
	if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
		t.Fatalf("server Listen failed: %v", err)
	}
	defer func() {
		_ = server.Stop()
	}()
	client := NewClient().(*ClientCommon)
	if err := UsePSKOverExternalTransportClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UsePSKOverExternalTransportClient failed: %v", err)
	}
	if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
		t.Fatalf("client Connect failed: %v", err)
	}
	defer func() {
		client.setByeFromServer(true)
		_ = client.Stop()
	}()
	bulk, err := client.OpenDedicatedBulk(context.Background(), BulkOpenOptions{
		ID: "external-dedicated-dialable",
		Range: BulkRange{
			Offset: 0,
			Length: 64,
		},
	})
	if err != nil {
		t.Fatalf("client OpenDedicatedBulk failed: %v", err)
	}
	// Register cleanup before any assertion so a failing check below cannot
	// leak the client-side handle.
	defer func() {
		_ = bulk.Close()
	}()
	if !bulk.Snapshot().Dedicated {
		t.Fatal("client OpenDedicatedBulk over dialable external transport should stay dedicated")
	}
	accepted := waitAcceptedBulkByID(t, acceptCh, bulk.ID(), 2*time.Second)
	defer func() {
		_ = accepted.Bulk.Close()
	}()
	if !accepted.Dedicated {
		t.Fatal("server accepted bulk should stay dedicated over dialable external transport")
	}
	// Checked assertion: an unexpected concrete type fails this test instead
	// of panicking the whole test binary.
	clientHandle, ok := bulk.(*bulkHandle)
	if !ok {
		t.Fatalf("client bulk type = %T, want *bulkHandle", bulk)
	}
	if clientHandle.dedicatedConnSnapshot() == nil {
		t.Fatal("client dedicated sidecar conn should be attached")
	}
	if mainConn := client.clientTransportConnSnapshot(); mainConn != nil && clientHandle.dedicatedConnSnapshot() == mainConn {
		t.Fatal("client dedicated sidecar should use an additional physical connection")
	}
	if got := client.clientTransportProtectionSnapshot().mode; got != ProtectionExternal {
		t.Fatalf("client steady protection mode = %v, want %v", got, ProtectionExternal)
	}
	payload := "external-dedicated-sidecar"
	if _, err := bulk.Write([]byte(payload)); err != nil {
		t.Fatalf("client bulk Write failed: %v", err)
	}
	readBulkExactly(t, accepted.Bulk, payload, 2*time.Second)
}
2026-04-18 16:05:57 +08:00
// TestOpenDedicatedBulkWaitsForActiveSlotUntilContextDeadline checks that
// OpenDedicatedBulk returns an error matching context.DeadlineExceeded when
// the client's dedicated active counter already equals its limit and the
// context expires after 40ms.
func TestOpenDedicatedBulkWaitsForActiveSlotUntilContextDeadline(t *testing.T) {
	srv := NewServer().(*ServerCommon)
	pskErr := UseModernPSKServer(srv, integrationSharedSecret, integrationModernPSKOptions())
	if pskErr != nil {
		t.Fatalf("UseModernPSKServer failed: %v", pskErr)
	}
	srv.SetBulkHandler(func(BulkAcceptInfo) error { return nil })
	if listenErr := srv.Listen("tcp", "127.0.0.1:0"); listenErr != nil {
		t.Fatalf("server Listen failed: %v", listenErr)
	}
	defer func() { _ = srv.Stop() }()

	cli := NewClient().(*ClientCommon)
	pskErr = UseModernPSKClient(cli, integrationSharedSecret, integrationModernPSKOptions())
	if pskErr != nil {
		t.Fatalf("UseModernPSKClient failed: %v", pskErr)
	}
	serverAddr := srv.listener.Addr().String()
	if connErr := cli.Connect("tcp", serverAddr); connErr != nil {
		t.Fatalf("client Connect failed: %v", connErr)
	}
	defer func() { _ = cli.Stop() }()

	// Limit 1 with the active counter forced to 1: presumably no dedicated
	// slot is free, and nothing in this test releases one.
	cli.bulkDedicatedActiveLimit = 1
	cli.bulkDedicatedActive.Store(1)

	deadlineCtx, cancel := context.WithTimeout(context.Background(), 40*time.Millisecond)
	defer cancel()
	opts := BulkOpenOptions{Range: BulkRange{Offset: 0, Length: 128}}
	_, openErr := cli.OpenDedicatedBulk(deadlineCtx, opts)
	if errors.Is(openErr, context.DeadlineExceeded) {
		return
	}
	t.Fatalf("client OpenDedicatedBulk error = %v, want %v", openErr, context.DeadlineExceeded)
}
// TestOpenBulkAutoWaitsForActiveSlotAndKeepsDedicated checks that OpenBulk
// with BulkOpenModeAuto still returns a dedicated bulk when the dedicated
// active counter starts at its limit and one slot is released 40ms later,
// and that data written by the client is read from the accepted server bulk.
func TestOpenBulkAutoWaitsForActiveSlotAndKeepsDedicated(t *testing.T) {
	server := NewServer().(*ServerCommon)
	if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKServer failed: %v", err)
	}
	acceptCh := make(chan BulkAcceptInfo, 4)
	server.SetBulkHandler(func(info BulkAcceptInfo) error {
		acceptCh <- info
		return nil
	})
	if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
		t.Fatalf("server Listen failed: %v", err)
	}
	defer func() {
		_ = server.Stop()
	}()
	client := NewClient().(*ClientCommon)
	if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKClient failed: %v", err)
	}
	if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
		t.Fatalf("client Connect failed: %v", err)
	}
	defer func() {
		_ = client.Stop()
	}()
	// Limit 1 with the active counter forced to 1, so the open below has to
	// wait until the timer releases a slot.
	client.bulkDedicatedActiveLimit = 1
	client.bulkDedicatedActive.Store(1)
	releaseTimer := time.AfterFunc(40*time.Millisecond, func() {
		client.releaseBulkDedicatedActiveSlot()
	})
	// Stop the timer on exit so the callback cannot fire after an early
	// failure has begun tearing the client down. On the success path the
	// timer has already fired and Stop is a no-op.
	defer releaseTimer.Stop()
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	bulk, err := client.OpenBulk(ctx, BulkOpenOptions{
		Mode: BulkOpenModeAuto,
		Range: BulkRange{
			Offset: 0,
			Length: 128,
		},
	})
	if err != nil {
		t.Fatalf("client OpenBulk auto failed: %v", err)
	}
	// Register cleanup before any assertion so a failing check below cannot
	// leak the client-side handle.
	defer func() {
		_ = bulk.Close()
	}()
	if !bulk.Snapshot().Dedicated {
		t.Fatal("client OpenBulk auto should wait for active slot and stay dedicated")
	}
	accepted := waitAcceptedBulkByID(t, acceptCh, bulk.ID(), 2*time.Second)
	// Deferred so the server-side bulk is closed on every exit path; LIFO
	// order keeps it ahead of the client-side Close, as before.
	defer func() {
		_ = accepted.Bulk.Close()
	}()
	if !accepted.Dedicated {
		t.Fatal("server accepted bulk should stay dedicated after active slot wait")
	}
	if _, err := bulk.Write([]byte("auto-fallback")); err != nil {
		t.Fatalf("client bulk Write failed: %v", err)
	}
	readBulkExactly(t, accepted.Bulk, "auto-fallback", 2*time.Second)
}
// TestOpenSharedBulkForcesShared checks that OpenSharedBulk returns a
// non-dedicated bulk even when the options carry Mode: BulkOpenModeDedicated
// and Dedicated: true, and that the bulk accepted by the server is also
// non-dedicated.
func TestOpenSharedBulkForcesShared(t *testing.T) {
	server := NewServer().(*ServerCommon)
	if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKServer failed: %v", err)
	}
	acceptCh := make(chan BulkAcceptInfo, 2)
	server.SetBulkHandler(func(info BulkAcceptInfo) error {
		acceptCh <- info
		return nil
	})
	if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
		t.Fatalf("server Listen failed: %v", err)
	}
	defer func() {
		_ = server.Stop()
	}()
	client := NewClient().(*ClientCommon)
	if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKClient failed: %v", err)
	}
	if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
		t.Fatalf("client Connect failed: %v", err)
	}
	defer func() {
		_ = client.Stop()
	}()
	// Both dedicated hints are set on purpose; OpenSharedBulk is expected to
	// override them.
	bulk, err := client.OpenSharedBulk(context.Background(), BulkOpenOptions{
		Mode:      BulkOpenModeDedicated,
		Dedicated: true,
		Range: BulkRange{
			Offset: 0,
			Length: 256,
		},
	})
	if err != nil {
		t.Fatalf("client OpenSharedBulk failed: %v", err)
	}
	// Register cleanup before any assertion so a failing check below cannot
	// leak the client-side handle.
	defer func() {
		_ = bulk.Close()
	}()
	if bulk.Snapshot().Dedicated {
		t.Fatal("OpenSharedBulk should force shared mode")
	}
	accepted := waitAcceptedBulkByID(t, acceptCh, bulk.ID(), 2*time.Second)
	// Deferred so the server-side bulk is closed on every exit path; LIFO
	// order keeps it ahead of the client-side Close, as before.
	defer func() {
		_ = accepted.Bulk.Close()
	}()
	if accepted.Dedicated {
		t.Fatal("server accepted bulk should be shared for OpenSharedBulk")
	}
}
// TestOpenBulkDefaultModeUsesClientSetting checks that OpenBulk called
// without an explicit Mode returns an error matching context.DeadlineExceeded
// after the client default open mode is set to BulkOpenModeDedicated, the
// attach config ActiveLimit is set to 1, and the active counter is forced
// to 1.
func TestOpenBulkDefaultModeUsesClientSetting(t *testing.T) {
	srv := NewServer().(*ServerCommon)
	pskErr := UseModernPSKServer(srv, integrationSharedSecret, integrationModernPSKOptions())
	if pskErr != nil {
		t.Fatalf("UseModernPSKServer failed: %v", pskErr)
	}
	srv.SetBulkHandler(func(BulkAcceptInfo) error { return nil })
	if listenErr := srv.Listen("tcp", "127.0.0.1:0"); listenErr != nil {
		t.Fatalf("server Listen failed: %v", listenErr)
	}
	defer func() { _ = srv.Stop() }()

	cli := NewClient().(*ClientCommon)
	pskErr = UseModernPSKClient(cli, integrationSharedSecret, integrationModernPSKOptions())
	if pskErr != nil {
		t.Fatalf("UseModernPSKClient failed: %v", pskErr)
	}
	serverAddr := srv.listener.Addr().String()
	if connErr := cli.Connect("tcp", serverAddr); connErr != nil {
		t.Fatalf("client Connect failed: %v", connErr)
	}
	defer func() { _ = cli.Stop() }()

	cli.SetBulkDefaultOpenMode(BulkOpenModeDedicated)
	// Read-modify-write of the attach config so only ActiveLimit changes.
	attachCfg := cli.BulkDedicatedAttachConfig()
	attachCfg.ActiveLimit = 1
	cli.SetBulkDedicatedAttachConfig(attachCfg)
	cli.bulkDedicatedActive.Store(1)

	deadlineCtx, cancel := context.WithTimeout(context.Background(), 40*time.Millisecond)
	defer cancel()
	// Mode is left unset so the client default decides.
	opts := BulkOpenOptions{Range: BulkRange{Offset: 0, Length: 128}}
	_, openErr := cli.OpenBulk(deadlineCtx, opts)
	if errors.Is(openErr, context.DeadlineExceeded) {
		return
	}
	t.Fatalf("client OpenBulk default-mode dedicated error = %v, want %v", openErr, context.DeadlineExceeded)
}
// TestBulkNetworkProfileWANUsesAutoFallback checks that after
// SetBulkNetworkProfile(BulkNetworkProfileWAN) the client reports the WAN
// profile, BulkOpenModeAuto as its default open mode and an ActiveLimit of
// 4096, and that OpenBulk without an explicit Mode yields a non-dedicated
// bulk on both sides once the client connect source is replaced with one
// that always fails.
func TestBulkNetworkProfileWANUsesAutoFallback(t *testing.T) {
	server := NewServer().(*ServerCommon)
	if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKServer failed: %v", err)
	}
	acceptCh := make(chan BulkAcceptInfo, 2)
	server.SetBulkHandler(func(info BulkAcceptInfo) error {
		acceptCh <- info
		return nil
	})
	if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
		t.Fatalf("server Listen failed: %v", err)
	}
	defer func() {
		_ = server.Stop()
	}()
	client := NewClient().(*ClientCommon)
	if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKClient failed: %v", err)
	}
	if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
		t.Fatalf("client Connect failed: %v", err)
	}
	defer func() {
		_ = client.Stop()
	}()
	client.SetBulkNetworkProfile(BulkNetworkProfileWAN)
	if got, want := client.BulkNetworkProfile(), BulkNetworkProfileWAN; got != want {
		t.Fatalf("BulkNetworkProfile = %v, want %v", got, want)
	}
	if got, want := client.BulkDefaultOpenMode(), BulkOpenModeAuto; got != want {
		t.Fatalf("BulkDefaultOpenMode = %v, want %v", got, want)
	}
	cfg := client.BulkDedicatedAttachConfig()
	if got, want := cfg.ActiveLimit, 4096; got != want {
		t.Fatalf("WAN ActiveLimit = %d, want %d", got, want)
	}
	// Swap in a connect source that always errors; the assertions below
	// expect the open to end up shared.
	client.setClientConnectSource(newClientFactoryConnectSource(func(context.Context) (net.Conn, error) {
		return nil, errors.New("forced attach dial failure")
	}))
	bulk, err := client.OpenBulk(context.Background(), BulkOpenOptions{
		Range: BulkRange{
			Offset: 0,
			Length: 128,
		},
	})
	if err != nil {
		t.Fatalf("client OpenBulk with WAN profile failed: %v", err)
	}
	// Register cleanup before any assertion so a failing check below cannot
	// leak the client-side handle.
	defer func() {
		_ = bulk.Close()
	}()
	if bulk.Snapshot().Dedicated {
		t.Fatal("client OpenBulk with WAN profile should fallback to shared when dedicated attach fails")
	}
	accepted := waitAcceptedBulkByID(t, acceptCh, bulk.ID(), 2*time.Second)
	// Deferred so the server-side bulk is closed on every exit path; LIFO
	// order keeps it ahead of the client-side Close, as before.
	defer func() {
		_ = accepted.Bulk.Close()
	}()
	if accepted.Dedicated {
		t.Fatal("server accepted bulk should be shared after WAN auto fallback")
	}
}
// TestOpenBulkAutoAttachFailureTriggersSingleServerAccept checks that, with
// the client connect source replaced by one that always fails, OpenBulk in
// BulkOpenModeAuto returns a non-dedicated bulk, the server accepts a
// non-dedicated bulk, and no second accept arrives within 300ms.
func TestOpenBulkAutoAttachFailureTriggersSingleServerAccept(t *testing.T) {
	server := NewServer().(*ServerCommon)
	if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKServer failed: %v", err)
	}
	acceptCh := make(chan BulkAcceptInfo, 2)
	server.SetBulkHandler(func(info BulkAcceptInfo) error {
		acceptCh <- info
		return nil
	})
	if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
		t.Fatalf("server Listen failed: %v", err)
	}
	defer func() {
		_ = server.Stop()
	}()
	client := NewClient().(*ClientCommon)
	if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKClient failed: %v", err)
	}
	if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
		t.Fatalf("client Connect failed: %v", err)
	}
	defer func() {
		_ = client.Stop()
	}()
	// Swap in a connect source that always errors; the assertions below
	// expect the open to end up shared.
	client.setClientConnectSource(newClientFactoryConnectSource(func(context.Context) (net.Conn, error) {
		return nil, errors.New("forced attach dial failure")
	}))
	bulk, err := client.OpenBulk(context.Background(), BulkOpenOptions{
		Mode: BulkOpenModeAuto,
		Range: BulkRange{
			Offset: 0,
			Length: 128,
		},
	})
	if err != nil {
		t.Fatalf("client OpenBulk auto failed: %v", err)
	}
	// Register cleanup before any assertion so a failing check below cannot
	// leak the client-side handle.
	defer func() {
		_ = bulk.Close()
	}()
	if bulk.Snapshot().Dedicated {
		t.Fatal("client OpenBulk auto should fallback to shared after attach dial failure")
	}
	accepted := waitAcceptedBulk(t, acceptCh, 2*time.Second)
	// Deferred so the server-side bulk is closed on every exit path; LIFO
	// order keeps it ahead of the client-side Close, as before.
	defer func() {
		_ = accepted.Bulk.Close()
	}()
	if accepted.Dedicated {
		t.Fatal("server should only dispatch shared accept after dedicated attach failure")
	}
	// Any further accept within the window means the open was dispatched twice.
	select {
	case extra := <-acceptCh:
		t.Fatalf("unexpected extra server bulk accept: %+v", extra)
	case <-time.After(300 * time.Millisecond):
	}
}
// TestDedicatedBulkReusesSessionSidecarAcrossBulks opens two dedicated bulks
// one after the other with the attach config LaneLimit set to 1 and checks
// that both report the same dedicated conn on the client and on the server,
// that the client's attach attempt and success counters are both 1, that the
// client sidecar snapshot is still non-nil after the first bulk is closed on
// both ends, and that the second bulk can still carry data afterwards.
func TestDedicatedBulkReusesSessionSidecarAcrossBulks ( t * testing . T ) {
server := NewServer ( ) . ( * ServerCommon )
if err := UseModernPSKServer ( server , integrationSharedSecret , integrationModernPSKOptions ( ) ) ; err != nil {
t . Fatalf ( "UseModernPSKServer failed: %v" , err )
}
// Every accepted bulk is forwarded to the test goroutine through acceptCh.
acceptCh := make ( chan BulkAcceptInfo , 4 )
server . SetBulkHandler ( func ( info BulkAcceptInfo ) error {
acceptCh <- info
return nil
} )
if err := server . Listen ( "tcp" , "127.0.0.1:0" ) ; err != nil {
t . Fatalf ( "server Listen failed: %v" , err )
}
defer func ( ) {
_ = server . Stop ( )
} ( )
client := NewClient ( ) . ( * ClientCommon )
if err := UseModernPSKClient ( client , integrationSharedSecret , integrationModernPSKOptions ( ) ) ; err != nil {
t . Fatalf ( "UseModernPSKClient failed: %v" , err )
}
if err := client . Connect ( "tcp" , server . listener . Addr ( ) . String ( ) ) ; err != nil {
t . Fatalf ( "client Connect failed: %v" , err )
}
defer func ( ) {
_ = client . Stop ( )
} ( )
// Read-modify-write of the attach config so only LaneLimit changes.
// NOTE(review): presumably LaneLimit = 1 is what forces both bulks onto one sidecar conn — confirm.
cfg := client . BulkDedicatedAttachConfig ( )
cfg . LaneLimit = 1
client . SetBulkDedicatedAttachConfig ( cfg )
bulk1 , err := client . OpenDedicatedBulk ( context . Background ( ) , BulkOpenOptions {
ID : "reuse-1" ,
Range : BulkRange {
Offset : 0 ,
Length : 16 ,
} ,
} )
if err != nil {
t . Fatalf ( "client OpenDedicatedBulk #1 failed: %v" , err )
}
accepted1 := waitAcceptedBulkByID ( t , acceptCh , bulk1 . ID ( ) , 2 * time . Second )
// The second bulk is opened only after the first has been accepted by the server.
bulk2 , err := client . OpenDedicatedBulk ( context . Background ( ) , BulkOpenOptions {
ID : "reuse-2" ,
Range : BulkRange {
Offset : 16 ,
Length : 16 ,
} ,
} )
if err != nil {
t . Fatalf ( "client OpenDedicatedBulk #2 failed: %v" , err )
}
accepted2 := waitAcceptedBulkByID ( t , acceptCh , bulk2 . ID ( ) , 2 * time . Second )
// Unchecked type assertions: a different concrete type would panic here.
clientHandle1 := bulk1 . ( * bulkHandle )
clientHandle2 := bulk2 . ( * bulkHandle )
serverHandle1 := accepted1 . Bulk . ( * bulkHandle )
serverHandle2 := accepted2 . Bulk . ( * bulkHandle )
clientConn1 := clientHandle1 . dedicatedConnSnapshot ( )
clientConn2 := clientHandle2 . dedicatedConnSnapshot ( )
if clientConn1 == nil || clientConn2 == nil || clientConn1 != clientConn2 {
t . Fatal ( "client dedicated bulks should reuse the same sidecar conn" )
}
serverConn1 := serverHandle1 . dedicatedConnSnapshot ( )
serverConn2 := serverHandle2 . dedicatedConnSnapshot ( )
if serverConn1 == nil || serverConn2 == nil || serverConn1 != serverConn2 {
t . Fatal ( "server dedicated bulks should reuse the same sidecar conn" )
}
// Two opens are expected to have produced exactly one attach attempt and one success.
if got := client . bulkAttachAttemptCount . Load ( ) ; got != 1 {
t . Fatalf ( "client bulkAttachAttemptCount = %d, want 1" , got )
}
if got := client . bulkAttachSuccessCount . Load ( ) ; got != 1 {
t . Fatalf ( "client bulkAttachSuccessCount = %d, want 1" , got )
}
// Close the first bulk on both ends and wait for both contexts to finish
// before inspecting the sidecar.
if err := bulk1 . Close ( ) ; err != nil {
t . Fatalf ( "client bulk1 Close failed: %v" , err )
}
if err := accepted1 . Bulk . Close ( ) ; err != nil {
t . Fatalf ( "server bulk1 Close failed: %v" , err )
}
waitForBulkContextDone ( t , bulk1 . Context ( ) , 2 * time . Second )
waitForBulkContextDone ( t , accepted1 . Bulk . Context ( ) , 2 * time . Second )
if client . clientDedicatedSidecarSnapshot ( ) == nil {
t . Fatal ( "shared dedicated sidecar should stay alive after closing only one bulk" )
}
// The surviving bulk must still be able to move data end to end.
if _ , err := bulk2 . Write ( [ ] byte ( "reuse-ok" ) ) ; err != nil {
t . Fatalf ( "client bulk2 Write failed after bulk1 close: %v" , err )
}
readBulkExactly ( t , accepted2 . Bulk , "reuse-ok" , 2 * time . Second )
if err := bulk2 . Close ( ) ; err != nil {
t . Fatalf ( "client bulk2 Close failed: %v" , err )
}
if err := accepted2 . Bulk . Close ( ) ; err != nil {
t . Fatalf ( "server bulk2 Close failed: %v" , err )
}
}
// TestSharedDedicatedSidecarStaleDataIDRejectDoesNotBreakOtherBulks opens two
// dedicated bulks that report the same client dedicated conn, closes the
// first one on both ends, then writes a record encoded with the closed
// bulk's data ID directly onto that conn. It checks that for the next 200ms
// the client sidecar snapshot stays non-nil and keeps pointing at the same
// conn, and that the second bulk can still carry data afterwards.
func TestSharedDedicatedSidecarStaleDataIDRejectDoesNotBreakOtherBulks ( t * testing . T ) {
server := NewServer ( ) . ( * ServerCommon )
if err := UseModernPSKServer ( server , integrationSharedSecret , integrationModernPSKOptions ( ) ) ; err != nil {
t . Fatalf ( "UseModernPSKServer failed: %v" , err )
}
// Every accepted bulk is forwarded to the test goroutine through acceptCh.
acceptCh := make ( chan BulkAcceptInfo , 4 )
server . SetBulkHandler ( func ( info BulkAcceptInfo ) error {
acceptCh <- info
return nil
} )
if err := server . Listen ( "tcp" , "127.0.0.1:0" ) ; err != nil {
t . Fatalf ( "server Listen failed: %v" , err )
}
defer func ( ) {
_ = server . Stop ( )
} ( )
client := NewClient ( ) . ( * ClientCommon )
if err := UseModernPSKClient ( client , integrationSharedSecret , integrationModernPSKOptions ( ) ) ; err != nil {
t . Fatalf ( "UseModernPSKClient failed: %v" , err )
}
if err := client . Connect ( "tcp" , server . listener . Addr ( ) . String ( ) ) ; err != nil {
t . Fatalf ( "client Connect failed: %v" , err )
}
defer func ( ) {
_ = client . Stop ( )
} ( )
// Read-modify-write of the attach config so only LaneLimit changes.
// NOTE(review): presumably LaneLimit = 1 is what forces both bulks onto one sidecar conn — confirm.
cfg := client . BulkDedicatedAttachConfig ( )
cfg . LaneLimit = 1
client . SetBulkDedicatedAttachConfig ( cfg )
bulk1 , err := client . OpenDedicatedBulk ( context . Background ( ) , BulkOpenOptions {
ID : "stale-sidecar-1" ,
Range : BulkRange {
Offset : 0 ,
Length : 16 ,
} ,
} )
if err != nil {
t . Fatalf ( "client OpenDedicatedBulk #1 failed: %v" , err )
}
accepted1 := waitAcceptedBulkByID ( t , acceptCh , bulk1 . ID ( ) , 2 * time . Second )
bulk2 , err := client . OpenDedicatedBulk ( context . Background ( ) , BulkOpenOptions {
ID : "stale-sidecar-2" ,
Range : BulkRange {
Offset : 16 ,
Length : 16 ,
} ,
} )
if err != nil {
t . Fatalf ( "client OpenDedicatedBulk #2 failed: %v" , err )
}
accepted2 := waitAcceptedBulkByID ( t , acceptCh , bulk2 . ID ( ) , 2 * time . Second )
// Unchecked type assertions: a different concrete type would panic here.
clientHandle1 := bulk1 . ( * bulkHandle )
clientHandle2 := bulk2 . ( * bulkHandle )
clientConn1 := clientHandle1 . dedicatedConnSnapshot ( )
clientConn2 := clientHandle2 . dedicatedConnSnapshot ( )
if clientConn1 == nil || clientConn2 == nil || clientConn1 != clientConn2 {
t . Fatal ( "client dedicated bulks should share the same sidecar conn" )
}
// Capture the first bulk's data ID while it is still open; it is reused
// below after the bulk has been closed.
staleDataID := clientHandle1 . dataIDSnapshot ( )
if staleDataID == 0 {
t . Fatal ( "stale data id should not be zero" )
}
if err := bulk1 . Close ( ) ; err != nil {
t . Fatalf ( "client bulk1 Close failed: %v" , err )
}
if err := accepted1 . Bulk . Close ( ) ; err != nil {
t . Fatalf ( "server bulk1 Close failed: %v" , err )
}
waitForBulkContextDone ( t , bulk1 . Context ( ) , 2 * time . Second )
waitForBulkContextDone ( t , accepted1 . Bulk . Context ( ) , 2 * time . Second )
// Build a data batch addressed to the now-closed bulk and write it
// straight onto the shared conn, bypassing the bulk handles.
stalePayload , err := client . encodeDedicatedBulkBatchPayload ( staleDataID , [ ] bulkDedicatedSendRequest { {
Type : bulkFastPayloadTypeData ,
Seq : 1 ,
Payload : [ ] byte ( "stale-data" ) ,
} } )
if err != nil {
t . Fatalf ( "encodeDedicatedBulkBatchPayload failed: %v" , err )
}
if err := writeBulkDedicatedRecord ( clientConn2 , stalePayload ) ; err != nil {
t . Fatalf ( "writeBulkDedicatedRecord stale payload failed: %v" , err )
}
// Poll every 10ms for 200ms: the sidecar must stay present and bound to
// the same conn for the whole window.
// NOTE(review): presumably this window is long enough for the peer to process and reject the stale record — confirm.
deadline := time . Now ( ) . Add ( 200 * time . Millisecond )
for time . Now ( ) . Before ( deadline ) {
sidecar := client . clientDedicatedSidecarSnapshot ( )
if sidecar == nil || sidecar . conn != clientConn2 {
t . Fatalf ( "shared dedicated sidecar should remain alive after stale data reject, got %+v" , sidecar )
}
time . Sleep ( 10 * time . Millisecond )
}
// The surviving bulk must still be able to move data end to end.
if _ , err := bulk2 . Write ( [ ] byte ( "still-alive" ) ) ; err != nil {
t . Fatalf ( "client bulk2 Write failed after stale data reject: %v" , err )
}
readBulkExactly ( t , accepted2 . Bulk , "still-alive" , 2 * time . Second )
if err := bulk2 . Close ( ) ; err != nil {
t . Fatalf ( "client bulk2 Close failed: %v" , err )
}
if err := accepted2 . Bulk . Close ( ) ; err != nil {
t . Fatalf ( "server bulk2 Close failed: %v" , err )
}
}
// TestDedicatedBulkConcurrentOpenSingleflightAttach starts two
// OpenDedicatedBulk calls at the same moment while the server's attach
// handler is delayed by 120ms, and checks that the client's attach attempt
// and success counters are both 1, that both bulks report the same dedicated
// conn on the client and on the server, and that each bulk delivers its own
// payload to the matching accepted server bulk.
func TestDedicatedBulkConcurrentOpenSingleflightAttach(t *testing.T) {
	server := NewServer().(*ServerCommon)
	if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKServer failed: %v", err)
	}
	acceptCh := make(chan BulkAcceptInfo, 4)
	server.SetBulkHandler(func(info BulkAcceptInfo) error {
		acceptCh <- info
		return nil
	})
	// Delay attach handling before delegating to the regular handler.
	// NOTE(review): presumably the delay keeps both opens in flight at once — confirm.
	server.SetLink(systemBulkAttachKey, func(msg *Message) {
		time.Sleep(120 * time.Millisecond)
		_ = server.handleBulkAttachSystemMessage(*msg)
	})
	if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
		t.Fatalf("server Listen failed: %v", err)
	}
	defer func() {
		_ = server.Stop()
	}()
	client := NewClient().(*ClientCommon)
	if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKClient failed: %v", err)
	}
	if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
		t.Fatalf("client Connect failed: %v", err)
	}
	defer func() {
		_ = client.Stop()
	}()
	cfg := client.BulkDedicatedAttachConfig()
	cfg.AttachLimit = 4
	cfg.LaneLimit = 1
	client.SetBulkDedicatedAttachConfig(cfg)
	results := make([]Bulk, 2)
	errs := make([]error, 2)
	ids := []string{"concurrent-1", "concurrent-2"}
	// Closing start releases both goroutines together.
	start := make(chan struct{})
	var wg sync.WaitGroup
	for i := range ids {
		wg.Add(1)
		go func(index int) {
			defer wg.Done()
			<-start
			results[index], errs[index] = client.OpenDedicatedBulk(context.Background(), BulkOpenOptions{
				ID: ids[index],
				Range: BulkRange{
					Offset: int64(index * 16),
					Length: 16,
				},
			})
		}(i)
	}
	close(start)
	wg.Wait()
	acceptedByID := make(map[string]BulkAcceptInfo, 2)
	// Single deferred cleanup so every opened or accepted bulk is closed on
	// any exit path, including t.Fatal. The close order (client bulks, then
	// accepted bulks in ids order) matches the original success path.
	defer func() {
		for i, bulk := range results {
			// Skip results that came back with an error: their value is not meaningful.
			if errs[i] == nil && bulk != nil {
				_ = bulk.Close()
			}
		}
		for _, id := range ids {
			if info, ok := acceptedByID[id]; ok && info.Bulk != nil {
				_ = info.Bulk.Close()
			}
		}
	}()
	for i, err := range errs {
		if err != nil {
			t.Fatalf("client OpenDedicatedBulk #%d failed: %v", i+1, err)
		}
	}
	for len(acceptedByID) < 2 {
		info := waitAcceptedBulk(t, acceptCh, 2*time.Second)
		acceptedByID[info.ID] = info
	}
	// Verify both expected IDs arrived; a missing key would otherwise yield a
	// zero value whose nil Bulk panics on the type assertions below.
	accepted1, ok := acceptedByID[ids[0]]
	if !ok {
		t.Fatalf("server did not accept bulk %q, got %+v", ids[0], acceptedByID)
	}
	accepted2, ok := acceptedByID[ids[1]]
	if !ok {
		t.Fatalf("server did not accept bulk %q, got %+v", ids[1], acceptedByID)
	}
	// Checked assertions: an unexpected concrete type fails this test
	// instead of panicking the whole test binary.
	clientHandle1, ok := results[0].(*bulkHandle)
	if !ok {
		t.Fatalf("client bulk1 type = %T, want *bulkHandle", results[0])
	}
	clientHandle2, ok := results[1].(*bulkHandle)
	if !ok {
		t.Fatalf("client bulk2 type = %T, want *bulkHandle", results[1])
	}
	serverHandle1, ok := accepted1.Bulk.(*bulkHandle)
	if !ok {
		t.Fatalf("server bulk1 type = %T, want *bulkHandle", accepted1.Bulk)
	}
	serverHandle2, ok := accepted2.Bulk.(*bulkHandle)
	if !ok {
		t.Fatalf("server bulk2 type = %T, want *bulkHandle", accepted2.Bulk)
	}
	// Two concurrent opens are expected to have produced exactly one attach
	// attempt and one success.
	if got := client.bulkAttachAttemptCount.Load(); got != 1 {
		t.Fatalf("client bulkAttachAttemptCount = %d, want 1", got)
	}
	if got := client.bulkAttachSuccessCount.Load(); got != 1 {
		t.Fatalf("client bulkAttachSuccessCount = %d, want 1", got)
	}
	clientConn1 := clientHandle1.dedicatedConnSnapshot()
	clientConn2 := clientHandle2.dedicatedConnSnapshot()
	if clientConn1 == nil || clientConn2 == nil || clientConn1 != clientConn2 {
		t.Fatal("concurrent dedicated bulks should share one client sidecar conn")
	}
	serverConn1 := serverHandle1.dedicatedConnSnapshot()
	serverConn2 := serverHandle2.dedicatedConnSnapshot()
	if serverConn1 == nil || serverConn2 == nil || serverConn1 != serverConn2 {
		t.Fatal("concurrent dedicated bulks should share one server sidecar conn")
	}
	if _, err := results[0].Write([]byte("c1")); err != nil {
		t.Fatalf("client bulk1 Write failed: %v", err)
	}
	if _, err := results[1].Write([]byte("c2")); err != nil {
		t.Fatalf("client bulk2 Write failed: %v", err)
	}
	readBulkExactly(t, accepted1.Bulk, "c1", 2*time.Second)
	readBulkExactly(t, accepted2.Bulk, "c2", 2*time.Second)
}
// TestDedicatedBulkConcurrentPendingRejectPropagatesOpenError starts two
// dedicated bulk opens at the same time over TCP. The server handler accepts
// "pending-reject-good" and rejects "pending-reject-bad". The test expects the
// good open to succeed, the bad open to fail with the server's reject text,
// exactly one attach attempt/success on the client, and only the good ID to be
// reported as accepted.
func TestDedicatedBulkConcurrentPendingRejectPropagatesOpenError ( t * testing . T ) {
server := NewServer ( ) . ( * ServerCommon )
if err := UseModernPSKServer ( server , integrationSharedSecret , integrationModernPSKOptions ( ) ) ; err != nil {
t . Fatalf ( "UseModernPSKServer failed: %v" , err )
}
// acceptCh carries the IDs the handler accepted; badDispatchCh records
// (at most once) that the handler was invoked for the bad ID.
acceptCh := make ( chan string , 2 )
badDispatchCh := make ( chan struct { } , 1 )
server . SetBulkHandler ( func ( info BulkAcceptInfo ) error {
if info . ID == "pending-reject-bad" {
// Non-blocking send: the signal is only used for failure diagnostics.
select {
case badDispatchCh <- struct { } { } :
default :
}
return errors . New ( "server rejected pending dedicated bulk" )
}
acceptCh <- info . ID
return nil
} )
if err := server . Listen ( "tcp" , "127.0.0.1:0" ) ; err != nil {
t . Fatalf ( "server Listen failed: %v" , err )
}
defer func ( ) {
_ = server . Stop ( )
} ( )
client := NewClient ( ) . ( * ClientCommon )
if err := UseModernPSKClient ( client , integrationSharedSecret , integrationModernPSKOptions ( ) ) ; err != nil {
t . Fatalf ( "UseModernPSKClient failed: %v" , err )
}
if err := client . Connect ( "tcp" , server . listener . Addr ( ) . String ( ) ) ; err != nil {
t . Fatalf ( "client Connect failed: %v" , err )
}
defer func ( ) {
_ = client . Stop ( )
} ( )
transportConn := client . clientTransportConnSnapshot ( )
if transportConn == nil || transportConn . RemoteAddr ( ) == nil {
t . Fatal ( "client transport snapshot should not be nil" )
}
network := transportConn . RemoteAddr ( ) . Network ( )
addr := transportConn . RemoteAddr ( ) . String ( )
// Replace the connect source with a factory that waits 120ms (or until ctx
// is cancelled) before dialing the same remote address.
// NOTE(review): presumably the delay keeps both opens pending while the
// dedicated sidecar is still attaching — confirm against the attach path.
client . setClientConnectSource ( newClientFactoryConnectSource ( func ( ctx context . Context ) ( net . Conn , error ) {
timer := time . NewTimer ( 120 * time . Millisecond )
defer timer . Stop ( )
select {
case <- ctx . Done ( ) :
return nil , ctx . Err ( )
case <- timer . C :
}
var dialer net . Dialer
return dialer . DialContext ( ctx , network , addr )
} ) )
cfg := client . BulkDedicatedAttachConfig ( )
cfg . AttachLimit = 4
cfg . LaneLimit = 1
client . SetBulkDedicatedAttachConfig ( cfg )
// openResult pairs the requested ID with whatever OpenDedicatedBulk returned.
type openResult struct {
id string
bulk Bulk
err error
}
resultCh := make ( chan openResult , 2 )
// Closing start releases both goroutines at once so the opens overlap.
start := make ( chan struct { } )
for _ , id := range [ ] string { "pending-reject-good" , "pending-reject-bad" } {
go func ( id string ) {
<- start
ctx , cancel := context . WithTimeout ( context . Background ( ) , 3 * time . Second )
defer cancel ( )
bulk , err := client . OpenDedicatedBulk ( ctx , BulkOpenOptions {
ID : id ,
Range : BulkRange {
Offset : 0 ,
Length : 16 ,
} ,
} )
resultCh <- openResult { id : id , bulk : bulk , err : err }
} ( id )
}
close ( start )
// Collect both results keyed by ID; arrival order is not significant.
results := make ( map [ string ] openResult , 2 )
for len ( results ) < 2 {
result := <- resultCh
results [ result . id ] = result
}
good := results [ "pending-reject-good" ]
if good . err != nil {
t . Fatalf ( "good dedicated open failed: %v" , good . err )
}
defer func ( ) {
if good . bulk != nil {
_ = good . bulk . Close ( )
}
} ( )
bad := results [ "pending-reject-bad" ]
if bad . err == nil {
// The bad open unexpectedly succeeded. Every path below fails the test;
// they differ only in how much diagnostic detail is available.
dispatched := false
select {
case <- badDispatchCh :
dispatched = true
default :
}
if bad . bulk == nil {
t . Fatalf ( "bad dedicated open should fail after remote pending accept rejection: dispatched=%v" , dispatched )
}
if handle , ok := bad . bulk . ( * bulkHandle ) ; ok {
t . Fatalf ( "bad dedicated open should fail after remote pending accept rejection: dispatched=%v snapshot=%+v resetErr=%v" , dispatched , bad . bulk . Snapshot ( ) , handle . resetErrSnapshot ( ) )
}
t . Fatalf ( "bad dedicated open should fail after remote pending accept rejection: dispatched=%v snapshot=%+v" , dispatched , bad . bulk . Snapshot ( ) )
}
// The client-side error must carry the text returned by the server handler.
if ! strings . Contains ( bad . err . Error ( ) , "server rejected pending dedicated bulk" ) {
t . Fatalf ( "bad dedicated open error = %v, want remote reject detail" , bad . err )
}
// Both opens are expected to have gone through a single attach.
if got := client . bulkAttachAttemptCount . Load ( ) ; got != 1 {
t . Fatalf ( "client bulkAttachAttemptCount = %d, want 1" , got )
}
if got := client . bulkAttachSuccessCount . Load ( ) ; got != 1 {
t . Fatalf ( "client bulkAttachSuccessCount = %d, want 1" , got )
}
// Exactly one accept is expected, and it must be the good ID.
select {
case acceptedID := <- acceptCh :
if acceptedID != "pending-reject-good" {
t . Fatalf ( "accepted bulk id = %q, want %q" , acceptedID , "pending-reject-good" )
}
case <- time . After ( 2 * time . Second ) :
t . Fatal ( "timed out waiting for accepted pending dedicated bulk" )
}
// Allow 300ms for a stray second accept to show up before passing.
select {
case extra := <- acceptCh :
t . Fatalf ( "unexpected extra accepted bulk id: %q" , extra )
case <- time . After ( 300 * time . Millisecond ) :
}
}
// TestDedicatedBulkLanePoolSpreadsAcrossConfiguredLanes opens three dedicated
// bulks one after another over TCP with LaneLimit set to 2. It verifies that
// both sides end up with exactly two distinct dedicated connections, that two
// attach attempts were made and succeeded, that every bulk got a non-zero lane
// ID, that the first two bulks use different lanes, and that the third bulk
// reuses one of those two lanes.
func TestDedicatedBulkLanePoolSpreadsAcrossConfiguredLanes(t *testing.T) {
	server := NewServer().(*ServerCommon)
	if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKServer failed: %v", err)
	}
	acceptCh := make(chan BulkAcceptInfo, 6)
	server.SetBulkHandler(func(info BulkAcceptInfo) error {
		acceptCh <- info
		return nil
	})
	if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
		t.Fatalf("server Listen failed: %v", err)
	}
	defer func() {
		_ = server.Stop()
	}()
	client := NewClient().(*ClientCommon)
	if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKClient failed: %v", err)
	}
	if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
		t.Fatalf("client Connect failed: %v", err)
	}
	defer func() {
		_ = client.Stop()
	}()
	cfg := client.BulkDedicatedAttachConfig()
	cfg.AttachLimit = 4
	cfg.LaneLimit = 2
	client.SetBulkDedicatedAttachConfig(cfg)

	ids := []string{"lane-1", "lane-2", "lane-3"}
	results := make([]Bulk, 0, len(ids))
	acceptedByID := make(map[string]BulkAcceptInfo, len(ids))
	// Register cleanup before anything is opened. t.Fatalf unwinds through
	// deferred calls, so a failure part-way through the loop below still
	// closes the bulks that were already opened and accepted. The closure
	// reads results and acceptedByID when it runs, so it sees every entry
	// added later.
	defer func() {
		for _, bulk := range results {
			if bulk != nil {
				_ = bulk.Close()
			}
		}
		for _, accepted := range acceptedByID {
			if accepted.Bulk != nil {
				_ = accepted.Bulk.Close()
			}
		}
	}()
	for index, id := range ids {
		bulk, err := client.OpenDedicatedBulk(context.Background(), BulkOpenOptions{
			ID: id,
			Range: BulkRange{
				Offset: int64(index * 16),
				Length: 16,
			},
		})
		if err != nil {
			t.Fatalf("client OpenDedicatedBulk %q failed: %v", id, err)
		}
		results = append(results, bulk)
		acceptedByID[id] = waitAcceptedBulkByID(t, acceptCh, id, 2*time.Second)
	}

	// Collect the distinct dedicated connections seen on each side; the map
	// keys act as a set.
	clientConns := make(map[net.Conn]struct{})
	serverConns := make(map[net.Conn]struct{})
	laneIDs := make([]uint32, 0, len(results))
	for _, bulk := range results {
		handle := bulk.(*bulkHandle)
		conn := handle.dedicatedConnSnapshot()
		if conn == nil {
			t.Fatal("client dedicated conn should not be nil")
		}
		clientConns[conn] = struct{}{}
		laneIDs = append(laneIDs, handle.dedicatedLaneIDSnapshot())
	}
	for _, accepted := range acceptedByID {
		handle := accepted.Bulk.(*bulkHandle)
		conn := handle.dedicatedConnSnapshot()
		if conn == nil {
			t.Fatal("server dedicated conn should not be nil")
		}
		serverConns[conn] = struct{}{}
	}
	if got, want := len(clientConns), 2; got != want {
		t.Fatalf("client dedicated lane conn count = %d, want %d", got, want)
	}
	if got, want := len(serverConns), 2; got != want {
		t.Fatalf("server dedicated lane conn count = %d, want %d", got, want)
	}
	if got, want := client.bulkAttachAttemptCount.Load(), int64(2); got != want {
		t.Fatalf("client bulkAttachAttemptCount = %d, want %d", got, want)
	}
	if got, want := client.bulkAttachSuccessCount.Load(), int64(2); got != want {
		t.Fatalf("client bulkAttachSuccessCount = %d, want %d", got, want)
	}
	// laneIDs has one entry per element of ids, so indices 0..2 are valid.
	if laneIDs[0] == 0 || laneIDs[1] == 0 || laneIDs[2] == 0 {
		t.Fatalf("dedicated lane ids should be assigned, got %v", laneIDs)
	}
	if laneIDs[0] == laneIDs[1] {
		t.Fatalf("first two bulks should spread across lanes, got %v", laneIDs[:2])
	}
	if laneIDs[2] != laneIDs[0] && laneIDs[2] != laneIDs[1] {
		t.Fatalf("third bulk should reuse an existing lane, got %v", laneIDs)
	}
}
// TestServerOpenBulkLogicalAutoUDPFallsBackToShared connects a client over UDP
// and has the server open a bulk with BulkOpenModeAuto on the resulting
// logical connection. The open must succeed, and neither the server-side bulk
// nor the bulk accepted by the client handler may be reported as dedicated.
func TestServerOpenBulkLogicalAutoUDPFallsBackToShared(t *testing.T) {
	server := NewServer().(*ServerCommon)
	if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKServer failed: %v", err)
	}
	if err := server.Listen("udp", "127.0.0.1:0"); err != nil {
		t.Fatalf("server Listen failed: %v", err)
	}
	defer func() {
		_ = server.Stop()
	}()
	client := NewClient().(*ClientCommon)
	if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKClient failed: %v", err)
	}
	// The server is the opener here, so the accept handler lives on the client.
	acceptCh := make(chan BulkAcceptInfo, 2)
	client.SetBulkHandler(func(info BulkAcceptInfo) error {
		acceptCh <- info
		return nil
	})
	if err := client.Connect("udp", signalRoundTripServerAddr(server, "")); err != nil {
		t.Fatalf("client Connect failed: %v", err)
	}
	defer func() {
		_ = client.Stop()
	}()
	logical := waitForTransferControlLogicalConn(t, server, 2*time.Second)
	bulk, err := server.OpenBulkLogical(context.Background(), logical, BulkOpenOptions{
		Mode: BulkOpenModeAuto,
		Range: BulkRange{
			Offset: 0,
			Length: 128,
		},
	})
	if err != nil {
		t.Fatalf("server OpenBulkLogical auto over udp failed: %v", err)
	}
	// Defer the close before asserting on the bulk so a failed assertion
	// cannot leave it open.
	defer func() {
		_ = bulk.Close()
	}()
	if bulk.Snapshot().Dedicated {
		t.Fatal("server OpenBulkLogical auto over udp should use shared")
	}
	accepted := waitAcceptedBulkByID(t, acceptCh, bulk.ID(), 2*time.Second)
	// Deferred for the same reason; it runs before the server-side close,
	// matching the order of the explicit close it replaces.
	defer func() {
		_ = accepted.Bulk.Close()
	}()
	if accepted.Dedicated {
		t.Fatal("client accepted bulk should be shared for server auto over udp")
	}
}
// TestBulkDedicatedAttachConfigNormalize stores a config made of negative
// limits and zero durations and inspects what the getter hands back: the
// limits and retry count must read as 0, the three durations must be positive,
// and the client must not hold an attach semaphore.
func TestBulkDedicatedAttachConfigNormalize(t *testing.T) {
	client := NewClient().(*ClientCommon)
	invalid := BulkDedicatedAttachConfig{
		AttachLimit:  -1,
		ActiveLimit:  -2,
		LaneLimit:    -3,
		Retry:        -3,
		Backoff:      0,
		DialTimeout:  0,
		HelloTimeout: 0,
	}
	client.SetBulkDedicatedAttachConfig(invalid)
	cfg := client.BulkDedicatedAttachConfig()

	// Cases are evaluated top to bottom and every branch aborts the test, so
	// the first violated expectation is the one reported.
	switch {
	case cfg.AttachLimit != 0:
		t.Fatalf("AttachLimit = %d, want 0", cfg.AttachLimit)
	case cfg.ActiveLimit != 0:
		t.Fatalf("ActiveLimit = %d, want 0", cfg.ActiveLimit)
	case cfg.LaneLimit != 0:
		t.Fatalf("LaneLimit = %d, want 0", cfg.LaneLimit)
	case cfg.Retry != 0:
		t.Fatalf("Retry = %d, want 0", cfg.Retry)
	case cfg.Backoff <= 0 || cfg.DialTimeout <= 0 || cfg.HelloTimeout <= 0:
		t.Fatalf("normalized timeouts should be >0: %+v", cfg)
	case client.bulkDedicatedAttachSem != nil:
		t.Fatal("bulk dedicated attach semaphore should be nil when AttachLimit=0")
	}
}
// TestBulkDedicatedAttachDefaults pins the attach/active/lane limits of a
// freshly created client and the values reported after switching it to the
// WAN network profile.
func TestBulkDedicatedAttachDefaults(t *testing.T) {
	const (
		defaultAttach = 16
		defaultLanes  = 4
		wanAttach     = 2
		wanLanes      = 2
		activeLimit   = 4096 // same expectation for both profiles
	)
	client := NewClient().(*ClientCommon)

	initial := client.BulkDedicatedAttachConfig()
	switch {
	case initial.AttachLimit != defaultAttach:
		t.Fatalf("default AttachLimit = %d, want %d", initial.AttachLimit, defaultAttach)
	case initial.ActiveLimit != activeLimit:
		t.Fatalf("default ActiveLimit = %d, want %d", initial.ActiveLimit, activeLimit)
	case initial.LaneLimit != defaultLanes:
		t.Fatalf("default LaneLimit = %d, want %d", initial.LaneLimit, defaultLanes)
	}

	client.SetBulkNetworkProfile(BulkNetworkProfileWAN)
	wan := client.BulkDedicatedAttachConfig()
	switch {
	case wan.AttachLimit != wanAttach:
		t.Fatalf("WAN AttachLimit = %d, want %d", wan.AttachLimit, wanAttach)
	case wan.ActiveLimit != activeLimit:
		t.Fatalf("WAN ActiveLimit = %d, want %d", wan.ActiveLimit, activeLimit)
	case wan.LaneLimit != wanLanes:
		t.Fatalf("WAN LaneLimit = %d, want %d", wan.LaneLimit, wanLanes)
	}
}
// TestBulkDedicatedDefaultsSupportHighLogicalConcurrency exercises the default
// dedicated-bulk limits without any network I/O. It reserves active slots up
// to the configured limit and expects the next reservation to be refused, then
// reserves 2048 lanes and expects them to be spread over LaneLimit lanes with
// per-lane counts differing by at most one, and finally expects the client's
// lane table to be empty once everything is released.
func TestBulkDedicatedDefaultsSupportHighLogicalConcurrency(t *testing.T) {
	const logicalBulks = 2048

	client := NewClient().(*ClientCommon)
	cfg := client.BulkDedicatedAttachConfig()
	if cfg.ActiveLimit < 2048 {
		t.Fatalf("default ActiveLimit = %d, want >= 2048", cfg.ActiveLimit)
	}
	if cfg.LaneLimit != 4 {
		t.Fatalf("default LaneLimit = %d, want 4", cfg.LaneLimit)
	}

	// Take every slot up to the limit. slot is 1-based; the failure message
	// depends on whether the refusal came within the first logicalBulks
	// reservations or between there and the configured limit.
	for slot := 1; slot <= cfg.ActiveLimit; slot++ {
		if client.reserveBulkDedicatedActiveSlot() {
			continue
		}
		if slot <= logicalBulks {
			t.Fatalf("reserveBulkDedicatedActiveSlot failed at %d/%d", slot, logicalBulks)
		}
		t.Fatalf("reserveBulkDedicatedActiveSlot failed before reaching configured limit at %d/%d", slot, cfg.ActiveLimit)
	}
	if client.reserveBulkDedicatedActiveSlot() {
		t.Fatal("reserveBulkDedicatedActiveSlot should stop at configured logical limit")
	}
	for held := cfg.ActiveLimit; held > 0; held-- {
		client.releaseBulkDedicatedActiveSlot()
	}

	// Tally how many of the lane reservations land on each lane ID.
	perLane := make(map[uint32]int, cfg.LaneLimit)
	for n := 0; n < logicalBulks; n++ {
		perLane[client.reserveBulkDedicatedLane()]++
	}
	if got, want := len(perLane), cfg.LaneLimit; got != want {
		t.Fatalf("logical lane spread count = %d, want %d", got, want)
	}
	fewest, most := logicalBulks, 0
	for laneID, uses := range perLane {
		if laneID == 0 {
			t.Fatal("lane id should be normalized")
		}
		if uses < fewest {
			fewest = uses
		}
		if uses > most {
			most = uses
		}
	}
	if most-fewest > 1 {
		t.Fatalf("logical lane spread too uneven: min=%d max=%d counts=%v", fewest, most, perLane)
	}

	// Give back each lane as many times as it was reserved.
	for laneID, uses := range perLane {
		for ; uses > 0; uses-- {
			client.releaseBulkDedicatedLane(laneID)
		}
	}
	client.bulkDedicatedSidecarMu.Lock()
	remaining := len(client.bulkDedicatedLanes)
	client.bulkDedicatedSidecarMu.Unlock()
	if remaining != 0 {
		t.Fatalf("bulk dedicated lanes should be released, remaining=%d", remaining)
	}
}
// TestBulkOpenTuningNormalize sets an open tuning with a negative chunk size,
// a one-byte window and zero in-flight chunks, then checks the values the
// getter reports: ChunkSize and WindowBytes must both equal
// defaultBulkChunkSize and MaxInFlight must equal defaultBulkOpenMaxInFlight.
func TestBulkOpenTuningNormalize(t *testing.T) {
	client := NewClient().(*ClientCommon)
	invalid := BulkOpenTuning{
		ChunkSize:   -1,
		WindowBytes: 1,
		MaxInFlight: 0,
	}
	client.SetBulkOpenTuning(invalid)

	normalized := client.BulkOpenTuning()
	// Each branch aborts the test, so the first mismatch in order is reported.
	switch {
	case normalized.ChunkSize != defaultBulkChunkSize:
		t.Fatalf("ChunkSize = %d, want %d", normalized.ChunkSize, defaultBulkChunkSize)
	case normalized.WindowBytes != defaultBulkChunkSize:
		t.Fatalf("WindowBytes = %d, want %d", normalized.WindowBytes, defaultBulkChunkSize)
	case normalized.MaxInFlight != defaultBulkOpenMaxInFlight:
		t.Fatalf("MaxInFlight = %d, want %d", normalized.MaxInFlight, defaultBulkOpenMaxInFlight)
	}
}
// TestBulkNetworkProfileAppliesOpenTuning pins the open tuning reported by a
// new client and the tuning reported after selecting the WAN profile and then
// the Constrained profile.
func TestBulkNetworkProfileAppliesOpenTuning(t *testing.T) {
	// expect fails the test with the given format when got differs from want.
	expect := func(format string, got, want int) {
		t.Helper()
		if got != want {
			t.Fatalf(format, got, want)
		}
	}
	client := NewClient().(*ClientCommon)

	tuning := client.BulkOpenTuning()
	expect("default ChunkSize = %d, want %d", tuning.ChunkSize, defaultBulkChunkSize)
	expect("default WindowBytes = %d, want %d", tuning.WindowBytes, defaultBulkOpenWindowBytes)
	expect("default MaxInFlight = %d, want %d", tuning.MaxInFlight, defaultBulkOpenMaxInFlight)

	client.SetBulkNetworkProfile(BulkNetworkProfileWAN)
	tuning = client.BulkOpenTuning()
	expect("WAN ChunkSize = %d, want %d", tuning.ChunkSize, 512*1024)
	expect("WAN WindowBytes = %d, want %d", tuning.WindowBytes, 8*1024*1024)
	expect("WAN MaxInFlight = %d, want %d", tuning.MaxInFlight, 16)

	client.SetBulkNetworkProfile(BulkNetworkProfileConstrained)
	tuning = client.BulkOpenTuning()
	expect("Constrained ChunkSize = %d, want %d", tuning.ChunkSize, 128*1024)
	expect("Constrained WindowBytes = %d, want %d", tuning.WindowBytes, 1024*1024)
	expect("Constrained MaxInFlight = %d, want %d", tuning.MaxInFlight, 8)
}
// TestOpenBulkAppliesClientOpenTuningDefaults sets a custom open tuning on a
// TCP-connected client, opens a shared bulk without per-open tuning, and
// checks that the client-side snapshot and the snapshot of the bulk accepted
// by the server both report the client's chunk size, window and in-flight
// values.
func TestOpenBulkAppliesClientOpenTuningDefaults(t *testing.T) {
	const (
		wantChunkSize   = 256 * 1024
		wantWindowBytes = 2 * 1024 * 1024
		wantMaxInFlight = 7
	)
	// expect fails the test with the given format when got differs from want.
	expect := func(format string, got, want int) {
		t.Helper()
		if got != want {
			t.Fatalf(format, got, want)
		}
	}

	server := NewServer().(*ServerCommon)
	if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKServer failed: %v", err)
	}
	acceptCh := make(chan BulkAcceptInfo, 1)
	server.SetBulkHandler(func(info BulkAcceptInfo) error {
		acceptCh <- info
		return nil
	})
	if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
		t.Fatalf("server Listen failed: %v", err)
	}
	defer func() {
		_ = server.Stop()
	}()

	client := NewClient().(*ClientCommon)
	if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKClient failed: %v", err)
	}
	if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
		t.Fatalf("client Connect failed: %v", err)
	}
	defer func() {
		_ = client.Stop()
	}()

	client.SetBulkOpenTuning(BulkOpenTuning{
		ChunkSize:   wantChunkSize,
		WindowBytes: wantWindowBytes,
		MaxInFlight: wantMaxInFlight,
	})
	// The getter must echo the values back before any bulk is opened.
	stored := client.BulkOpenTuning()
	expect("client tuning ChunkSize = %d, want %d", stored.ChunkSize, wantChunkSize)
	expect("client tuning WindowBytes = %d, want %d", stored.WindowBytes, wantWindowBytes)
	expect("client tuning MaxInFlight = %d, want %d", stored.MaxInFlight, wantMaxInFlight)

	openOpts := BulkOpenOptions{
		Range: BulkRange{
			Offset: 0,
			Length: 512 * 1024,
		},
	}
	bulk, err := client.OpenSharedBulk(context.Background(), openOpts)
	if err != nil {
		t.Fatalf("client OpenSharedBulk failed: %v", err)
	}
	defer func() {
		_ = bulk.Close()
	}()
	accepted := waitAcceptedBulk(t, acceptCh, 2*time.Second)
	defer func() {
		_ = accepted.Bulk.Close()
	}()

	local := bulk.Snapshot()
	expect("client ChunkSize = %d, want %d", local.ChunkSize, wantChunkSize)
	expect("client WindowBytes = %d, want %d", local.WindowBytes, wantWindowBytes)
	expect("client MaxInFlight = %d, want %d", local.MaxInFlight, wantMaxInFlight)

	remote := accepted.Bulk.Snapshot()
	expect("server ChunkSize = %d, want %d", remote.ChunkSize, wantChunkSize)
	expect("server WindowBytes = %d, want %d", remote.WindowBytes, wantWindowBytes)
	expect("server MaxInFlight = %d, want %d", remote.MaxInFlight, wantMaxInFlight)
}
2026-04-15 15:24:36 +08:00
// TestServerOpenBulkLogicalDedicatedUDPRejected connects a client over UDP and
// asks the server to open a bulk with Dedicated set on the resulting logical
// connection. The call must fail with errBulkDedicatedStreamOnly.
func TestServerOpenBulkLogicalDedicatedUDPRejected(t *testing.T) {
	srv := NewServer().(*ServerCommon)
	if err := UseModernPSKServer(srv, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKServer failed: %v", err)
	}
	// The handler accepts everything.
	srv.SetBulkHandler(func(BulkAcceptInfo) error {
		return nil
	})
	if err := srv.Listen("udp", "127.0.0.1:0"); err != nil {
		t.Fatalf("server Listen failed: %v", err)
	}
	defer func() {
		_ = srv.Stop()
	}()

	cli := NewClient().(*ClientCommon)
	if err := UseModernPSKClient(cli, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKClient failed: %v", err)
	}
	if err := cli.Connect("udp", signalRoundTripServerAddr(srv, "")); err != nil {
		t.Fatalf("client Connect failed: %v", err)
	}
	defer func() {
		_ = cli.Stop()
	}()

	logical := waitForTransferControlLogicalConn(t, srv, 2*time.Second)
	dedicatedOpts := BulkOpenOptions{
		Range: BulkRange{
			Offset: 0,
			Length: 128,
		},
		Dedicated: true,
	}
	if _, err := srv.OpenBulkLogical(context.Background(), logical, dedicatedOpts); !errors.Is(err, errBulkDedicatedStreamOnly) {
		t.Fatalf("server OpenBulkLogical dedicated over udp error = %v, want %v", err, errBulkDedicatedStreamOnly)
	}
}
2026-04-18 16:05:57 +08:00
// waitAcceptedBulkByID receives from ch until it sees an accepted bulk whose
// ID equals id and returns that entry. Entries carrying a different ID are
// discarded. If no match arrives within timeout, the test is failed with
// t.Fatalf; a non-positive timeout fails immediately without receiving.
func waitAcceptedBulkByID(t *testing.T, ch <-chan BulkAcceptInfo, id string, timeout time.Duration) BulkAcceptInfo {
	t.Helper()
	if timeout <= 0 {
		t.Fatalf("timed out waiting for accepted bulk id=%q", id)
	}
	// A single timer bounds the whole wait, so skipping non-matching entries
	// neither extends the deadline nor allocates a new timer per iteration.
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	for {
		select {
		case info := <-ch:
			if info.ID == id {
				return info
			}
		case <-timer.C:
			t.Fatalf("timed out waiting for accepted bulk id=%q", id)
		}
	}
}