package notify import ( "context" "errors" "io" "sync" "testing" "time" ) func BenchmarkStreamTCPThroughput(b *testing.B) { cases := []struct { name string payloadSize int cfg StreamConfig }{ { name: "default_64KiB", payloadSize: 64 * 1024, }, { name: "tuned_256KiB", payloadSize: 256 * 1024, cfg: StreamConfig{ ChunkSize: 256 * 1024, InboundQueueLimit: 256, InboundBufferedBytesLimit: 32 * 1024 * 1024, OutboundWindowBytes: 8 * 1024 * 1024, OutboundMaxInFlightChunks: 32, }, }, { name: "tuned_512KiB", payloadSize: 512 * 1024, cfg: StreamConfig{ ChunkSize: 512 * 1024, InboundQueueLimit: 256, InboundBufferedBytesLimit: 64 * 1024 * 1024, OutboundWindowBytes: 16 * 1024 * 1024, OutboundMaxInFlightChunks: 32, }, }, { name: "tuned_1MiB", payloadSize: 1024 * 1024, cfg: StreamConfig{ ChunkSize: 1024 * 1024, InboundQueueLimit: 256, InboundBufferedBytesLimit: 64 * 1024 * 1024, OutboundWindowBytes: 16 * 1024 * 1024, OutboundMaxInFlightChunks: 32, }, }, } for _, tc := range cases { b.Run(tc.name, func(b *testing.B) { benchmarkStreamTCPThroughput(b, tc.payloadSize, tc.cfg, benchmarkTransportSecurityModernPSK) }) } } func BenchmarkStreamTCPThroughputTrustedRaw(b *testing.B) { cases := []struct { name string payloadSize int cfg StreamConfig }{ { name: "default_64KiB", payloadSize: 64 * 1024, }, { name: "tuned_256KiB", payloadSize: 256 * 1024, cfg: StreamConfig{ ChunkSize: 256 * 1024, InboundQueueLimit: 256, InboundBufferedBytesLimit: 32 * 1024 * 1024, OutboundWindowBytes: 8 * 1024 * 1024, OutboundMaxInFlightChunks: 32, }, }, { name: "tuned_512KiB", payloadSize: 512 * 1024, cfg: StreamConfig{ ChunkSize: 512 * 1024, InboundQueueLimit: 256, InboundBufferedBytesLimit: 64 * 1024 * 1024, OutboundWindowBytes: 16 * 1024 * 1024, OutboundMaxInFlightChunks: 32, }, }, { name: "tuned_1MiB", payloadSize: 1024 * 1024, cfg: StreamConfig{ ChunkSize: 1024 * 1024, InboundQueueLimit: 256, InboundBufferedBytesLimit: 64 * 1024 * 1024, OutboundWindowBytes: 16 * 1024 * 1024, OutboundMaxInFlightChunks: 32, }, }, } for _, tc := range cases { b.Run(tc.name, func(b *testing.B) { benchmarkStreamTCPThroughput(b, tc.payloadSize, tc.cfg, benchmarkTransportSecurityTrustedRaw) }) } } func BenchmarkStreamTCPThroughputConcurrent(b *testing.B) { cases := []struct { name string payloadSize int concurrency int cfg StreamConfig }{ { name: "streams_2_512KiB", payloadSize: 512 * 1024, concurrency: 2, cfg: StreamConfig{ ChunkSize: 512 * 1024, InboundQueueLimit: 512, InboundBufferedBytesLimit: 128 * 1024 * 1024, OutboundWindowBytes: 32 * 1024 * 1024, OutboundMaxInFlightChunks: 64, }, }, { name: "streams_4_512KiB", payloadSize: 512 * 1024, concurrency: 4, cfg: StreamConfig{ ChunkSize: 512 * 1024, InboundQueueLimit: 1024, InboundBufferedBytesLimit: 256 * 1024 * 1024, OutboundWindowBytes: 64 * 1024 * 1024, OutboundMaxInFlightChunks: 128, }, }, { name: "streams_8_512KiB", payloadSize: 512 * 1024, concurrency: 8, cfg: StreamConfig{ ChunkSize: 512 * 1024, InboundQueueLimit: 2048, InboundBufferedBytesLimit: 512 * 1024 * 1024, OutboundWindowBytes: 128 * 1024 * 1024, OutboundMaxInFlightChunks: 256, }, }, } for _, tc := range cases { b.Run(tc.name, func(b *testing.B) { benchmarkStreamTCPThroughputConcurrent(b, tc.payloadSize, tc.concurrency, tc.cfg, benchmarkTransportSecurityModernPSK) }) } } func BenchmarkStreamTCPThroughputConcurrentTrustedRaw(b *testing.B) { cases := []struct { name string payloadSize int concurrency int cfg StreamConfig }{ { name: "streams_2_512KiB", payloadSize: 512 * 1024, concurrency: 2, cfg: StreamConfig{ ChunkSize: 512 * 1024, InboundQueueLimit: 512, InboundBufferedBytesLimit: 128 * 1024 * 1024, OutboundWindowBytes: 32 * 1024 * 1024, OutboundMaxInFlightChunks: 64, }, }, { name: "streams_4_512KiB", payloadSize: 512 * 1024, concurrency: 4, cfg: StreamConfig{ ChunkSize: 512 * 1024, InboundQueueLimit: 1024, InboundBufferedBytesLimit: 256 * 1024 * 1024, OutboundWindowBytes: 64 * 1024 * 1024, OutboundMaxInFlightChunks: 128, }, }, { name: "streams_8_512KiB", payloadSize: 512 * 1024, concurrency: 8, cfg: StreamConfig{ ChunkSize: 512 * 1024, InboundQueueLimit: 2048, InboundBufferedBytesLimit: 512 * 1024 * 1024, OutboundWindowBytes: 128 * 1024 * 1024, OutboundMaxInFlightChunks: 256, }, }, } for _, tc := range cases { b.Run(tc.name, func(b *testing.B) { benchmarkStreamTCPThroughputConcurrent(b, tc.payloadSize, tc.concurrency, tc.cfg, benchmarkTransportSecurityTrustedRaw) }) } } func benchmarkStreamTCPThroughput(b *testing.B, payloadSize int, cfg StreamConfig, securityMode benchmarkTransportSecurityMode) { b.Helper() server := NewServer().(*ServerCommon) server.SetStreamConfig(cfg) benchmarkApplyServerTransportSecurity(b, server, securityMode) acceptCh := make(chan StreamAcceptInfo, 1) server.SetStreamHandler(func(info StreamAcceptInfo) error { acceptCh <- info return nil }) if err := server.Listen("tcp", benchmarkTCPListenAddr(b)); err != nil { b.Fatalf("server Listen failed: %v", err) } b.Cleanup(func() { _ = server.Stop() }) client := NewClient().(*ClientCommon) client.SetStreamConfig(cfg) benchmarkApplyClientTransportSecurity(b, client, securityMode) if err := client.Connect("tcp", benchmarkTCPDialAddr(b, server.listener.Addr().String())); err != nil { b.Fatalf("client Connect failed: %v", err) } b.Cleanup(func() { _ = client.Stop() }) stream, err := client.OpenStream(context.Background(), StreamOpenOptions{Channel: StreamDataChannel}) if err != nil { b.Fatalf("client OpenStream failed: %v", err) } accepted := waitBenchmarkAcceptedStream(b, acceptCh, 5*time.Second) drainDone := make(chan error, 1) go func() { _, err := io.Copy(io.Discard, accepted.Stream) if err != nil && !errors.Is(err, io.EOF) { drainDone <- err return } drainDone <- nil }() payload := make([]byte, payloadSize) for i := range payload { payload[i] = byte(i) } b.ReportAllocs() b.SetBytes(int64(payloadSize)) b.ResetTimer() for i := 0; i < b.N; i++ { n, err := stream.Write(payload) if err != nil { b.Fatalf("stream Write failed at iter %d: %v", i, err) } if n != len(payload) { b.Fatalf("stream Write bytes mismatch at iter %d: got %d want %d", i, n, len(payload)) } } b.StopTimer() if err := stream.CloseWrite(); err != nil { b.Fatalf("stream CloseWrite failed: %v", err) } select { case err := <-drainDone: if err != nil { b.Fatalf("server drain failed: %v", err) } case <-time.After(10 * time.Second): b.Fatal("timed out waiting for server drain") } _ = accepted.Stream.Close() _ = stream.Close() } func benchmarkStreamTCPThroughputConcurrent(b *testing.B, payloadSize int, concurrency int, cfg StreamConfig, securityMode benchmarkTransportSecurityMode) { b.Helper() if concurrency <= 0 { b.Fatal("concurrency must be > 0") } server := NewServer().(*ServerCommon) server.SetStreamConfig(cfg) benchmarkApplyServerTransportSecurity(b, server, securityMode) acceptCh := make(chan StreamAcceptInfo, concurrency*2) server.SetStreamHandler(func(info StreamAcceptInfo) error { acceptCh <- info return nil }) if err := server.Listen("tcp", benchmarkTCPListenAddr(b)); err != nil { b.Fatalf("server Listen failed: %v", err) } b.Cleanup(func() { _ = server.Stop() }) client := NewClient().(*ClientCommon) client.SetStreamConfig(cfg) benchmarkApplyClientTransportSecurity(b, client, securityMode) if err := client.Connect("tcp", benchmarkTCPDialAddr(b, server.listener.Addr().String())); err != nil { b.Fatalf("client Connect failed: %v", err) } b.Cleanup(func() { _ = client.Stop() }) streams := make([]Stream, 0, concurrency) acceptedStreams := make([]Stream, 0, concurrency) for index := 0; index < concurrency; index++ { stream, err := client.OpenStream(context.Background(), StreamOpenOptions{Channel: StreamDataChannel}) if err != nil { b.Fatalf("client OpenStream failed for stream %d: %v", index, err) } streams = append(streams, stream) accepted := waitBenchmarkAcceptedStream(b, acceptCh, 5*time.Second) acceptedStreams = append(acceptedStreams, accepted.Stream) } drainDone := make(chan error, concurrency) for _, acceptedStream := range acceptedStreams { stream := acceptedStream go func() { _, err := io.Copy(io.Discard, stream) if err != nil && !errors.Is(err, io.EOF) { drainDone <- err return } drainDone <- nil }() } payload := make([]byte, payloadSize) for i := range payload { payload[i] = byte(i) } b.ReportAllocs() b.SetBytes(int64(payloadSize)) b.ResetTimer() var wg sync.WaitGroup errCh := make(chan error, concurrency) for index, stream := range streams { count := b.N / concurrency if index < b.N%concurrency { count++ } wg.Add(1) go func(stream Stream, count int) { defer wg.Done() for i := 0; i < count; i++ { n, err := stream.Write(payload) if err != nil { errCh <- err return } if n != len(payload) { errCh <- errors.New("stream write bytes mismatch") return } } }(stream, count) } wg.Wait() close(errCh) for err := range errCh { if err != nil { b.Fatalf("concurrent stream write failed: %v", err) } } b.StopTimer() for index, stream := range streams { if err := stream.CloseWrite(); err != nil { b.Fatalf("stream %d CloseWrite failed: %v", index, err) } } for index := 0; index < concurrency; index++ { select { case err := <-drainDone: if err != nil { b.Fatalf("server drain failed: %v", err) } case <-time.After(10 * time.Second): b.Fatalf("timed out waiting for server drain %d/%d", index+1, concurrency) } } for _, stream := range acceptedStreams { _ = stream.Close() } for _, stream := range streams { _ = stream.Close() } } func waitBenchmarkAcceptedStream(tb testing.TB, ch <-chan StreamAcceptInfo, timeout time.Duration) StreamAcceptInfo { tb.Helper() select { case info := <-ch: return info case <-time.After(timeout): tb.Fatalf("timed out waiting for accepted stream after %v", timeout) return StreamAcceptInfo{} } }