package notify import ( "context" "errors" "io" "sync" "testing" "time" ) func BenchmarkBulkTCPThroughput(b *testing.B) { cases := []struct { name string payloadSize int }{ { name: "chunk_256KiB", payloadSize: 256 * 1024, }, { name: "chunk_512KiB", payloadSize: 512 * 1024, }, { name: "chunk_768KiB", payloadSize: 768 * 1024, }, { name: "chunk_1MiB", payloadSize: 1024 * 1024, }, { name: "chunk_2MiB", payloadSize: 2 * 1024 * 1024, }, } for _, tc := range cases { b.Run(tc.name, func(b *testing.B) { benchmarkBulkTCPThroughput(b, tc.payloadSize, false, benchmarkTransportSecurityModernPSK) }) } } func BenchmarkBulkTCPThroughputTrustedRaw(b *testing.B) { cases := []struct { name string payloadSize int }{ {name: "chunk_256KiB", payloadSize: 256 * 1024}, {name: "chunk_512KiB", payloadSize: 512 * 1024}, {name: "chunk_768KiB", payloadSize: 768 * 1024}, {name: "chunk_1MiB", payloadSize: 1024 * 1024}, {name: "chunk_2MiB", payloadSize: 2 * 1024 * 1024}, } for _, tc := range cases { b.Run(tc.name, func(b *testing.B) { benchmarkBulkTCPThroughput(b, tc.payloadSize, false, benchmarkTransportSecurityTrustedRaw) }) } } func BenchmarkBulkTCPThroughputDedicated(b *testing.B) { cases := []struct { name string payloadSize int }{ { name: "chunk_256KiB", payloadSize: 256 * 1024, }, { name: "chunk_512KiB", payloadSize: 512 * 1024, }, { name: "chunk_768KiB", payloadSize: 768 * 1024, }, { name: "chunk_1MiB", payloadSize: 1024 * 1024, }, { name: "chunk_2MiB", payloadSize: 2 * 1024 * 1024, }, } for _, tc := range cases { b.Run(tc.name, func(b *testing.B) { benchmarkBulkTCPThroughput(b, tc.payloadSize, true, benchmarkTransportSecurityModernPSK) }) } } func BenchmarkBulkTCPThroughputDedicatedTrustedRaw(b *testing.B) { cases := []struct { name string payloadSize int }{ {name: "chunk_256KiB", payloadSize: 256 * 1024}, {name: "chunk_512KiB", payloadSize: 512 * 1024}, {name: "chunk_768KiB", payloadSize: 768 * 1024}, {name: "chunk_1MiB", payloadSize: 1024 * 1024}, {name: "chunk_2MiB", payloadSize: 2 * 1024 * 1024}, } for _, tc := range cases { b.Run(tc.name, func(b *testing.B) { benchmarkBulkTCPThroughput(b, tc.payloadSize, true, benchmarkTransportSecurityTrustedRaw) }) } } func BenchmarkBulkTCPThroughputConcurrent(b *testing.B) { cases := []struct { name string payloadSize int concurrency int }{ { name: "bulks_2_512KiB", payloadSize: 512 * 1024, concurrency: 2, }, { name: "bulks_4_512KiB", payloadSize: 512 * 1024, concurrency: 4, }, { name: "bulks_2_1MiB", payloadSize: 1024 * 1024, concurrency: 2, }, { name: "bulks_4_1MiB", payloadSize: 1024 * 1024, concurrency: 4, }, } for _, tc := range cases { b.Run(tc.name, func(b *testing.B) { benchmarkBulkTCPThroughputConcurrent(b, tc.payloadSize, tc.concurrency, false, benchmarkTransportSecurityModernPSK) }) } } func BenchmarkBulkTCPThroughputConcurrentTrustedRaw(b *testing.B) { cases := []struct { name string payloadSize int concurrency int }{ {name: "bulks_2_512KiB", payloadSize: 512 * 1024, concurrency: 2}, {name: "bulks_4_512KiB", payloadSize: 512 * 1024, concurrency: 4}, {name: "bulks_2_1MiB", payloadSize: 1024 * 1024, concurrency: 2}, {name: "bulks_4_1MiB", payloadSize: 1024 * 1024, concurrency: 4}, } for _, tc := range cases { b.Run(tc.name, func(b *testing.B) { benchmarkBulkTCPThroughputConcurrent(b, tc.payloadSize, tc.concurrency, false, benchmarkTransportSecurityTrustedRaw) }) } } func BenchmarkBulkTCPThroughputConcurrentDedicated(b *testing.B) { cases := []struct { name string payloadSize int concurrency int }{ { name: "bulks_2_512KiB", payloadSize: 512 * 1024, concurrency: 2, }, { name: "bulks_4_512KiB", payloadSize: 512 * 1024, concurrency: 4, }, { name: "bulks_2_1MiB", payloadSize: 1024 * 1024, concurrency: 2, }, { name: "bulks_4_1MiB", payloadSize: 1024 * 1024, concurrency: 4, }, } for _, tc := range cases { b.Run(tc.name, func(b *testing.B) { benchmarkBulkTCPThroughputConcurrent(b, tc.payloadSize, tc.concurrency, true, benchmarkTransportSecurityModernPSK) }) } } func BenchmarkBulkTCPThroughputConcurrentDedicatedTrustedRaw(b *testing.B) { cases := []struct { name string payloadSize int concurrency int }{ {name: "bulks_2_512KiB", payloadSize: 512 * 1024, concurrency: 2}, {name: "bulks_4_512KiB", payloadSize: 512 * 1024, concurrency: 4}, {name: "bulks_2_1MiB", payloadSize: 1024 * 1024, concurrency: 2}, {name: "bulks_4_1MiB", payloadSize: 1024 * 1024, concurrency: 4}, } for _, tc := range cases { b.Run(tc.name, func(b *testing.B) { benchmarkBulkTCPThroughputConcurrent(b, tc.payloadSize, tc.concurrency, true, benchmarkTransportSecurityTrustedRaw) }) } } func benchmarkBulkTCPThroughput(b *testing.B, payloadSize int, dedicated bool, securityMode benchmarkTransportSecurityMode) { b.Helper() server := NewServer().(*ServerCommon) benchmarkApplyServerTransportSecurity(b, server, securityMode) acceptCh := make(chan BulkAcceptInfo, 1) server.SetBulkHandler(func(info BulkAcceptInfo) error { acceptCh <- info return nil }) if err := server.Listen("tcp", benchmarkTCPListenAddr(b)); err != nil { b.Fatalf("server Listen failed: %v", err) } b.Cleanup(func() { _ = server.Stop() }) client := NewClient().(*ClientCommon) benchmarkApplyClientTransportSecurity(b, client, securityMode) if err := client.Connect("tcp", benchmarkTCPDialAddr(b, server.listener.Addr().String())); err != nil { b.Fatalf("client Connect failed: %v", err) } b.Cleanup(func() { _ = client.Stop() }) totalBytes := int64(payloadSize) if b.N > 1 { totalBytes = int64(payloadSize) * int64(b.N) } bulk, err := client.OpenBulk(context.Background(), BulkOpenOptions{ Range: BulkRange{ Offset: 0, Length: totalBytes, }, ChunkSize: payloadSize, Dedicated: dedicated, }) if err != nil { b.Fatalf("client OpenBulk failed: %v", err) } accepted := waitBenchmarkAcceptedBulk(b, acceptCh, 5*time.Second) drainDone := make(chan error, 1) go func() { _, err := io.Copy(io.Discard, accepted.Bulk) if err != nil && !errors.Is(err, io.EOF) { drainDone <- err return } drainDone <- nil }() payload := make([]byte, payloadSize) for i := range payload { payload[i] = byte(i) } b.ReportAllocs() b.SetBytes(int64(payloadSize)) b.ResetTimer() for i := 0; i < b.N; i++ { n, err := bulk.Write(payload) if err != nil { b.Fatalf("bulk Write failed at iter %d: %v", i, err) } if n != len(payload) { b.Fatalf("bulk Write bytes mismatch at iter %d: got %d want %d", i, n, len(payload)) } } b.StopTimer() if err := bulk.CloseWrite(); err != nil { b.Fatalf("bulk CloseWrite failed: %v", err) } select { case err := <-drainDone: if err != nil { b.Fatalf("server drain failed: %v", err) } case <-time.After(10 * time.Second): b.Fatal("timed out waiting for server drain") } _ = accepted.Bulk.Close() _ = bulk.Close() } func benchmarkBulkTCPThroughputConcurrent(b *testing.B, payloadSize int, concurrency int, dedicated bool, securityMode benchmarkTransportSecurityMode) { b.Helper() if concurrency <= 0 { b.Fatal("concurrency must be > 0") } server := NewServer().(*ServerCommon) benchmarkApplyServerTransportSecurity(b, server, securityMode) acceptCh := make(chan BulkAcceptInfo, concurrency*2) server.SetBulkHandler(func(info BulkAcceptInfo) error { acceptCh <- info return nil }) if err := server.Listen("tcp", benchmarkTCPListenAddr(b)); err != nil { b.Fatalf("server Listen failed: %v", err) } b.Cleanup(func() { _ = server.Stop() }) client := NewClient().(*ClientCommon) benchmarkApplyClientTransportSecurity(b, client, securityMode) if err := client.Connect("tcp", benchmarkTCPDialAddr(b, server.listener.Addr().String())); err != nil { b.Fatalf("client Connect failed: %v", err) } b.Cleanup(func() { _ = client.Stop() }) bulks := make([]Bulk, 0, concurrency) acceptedBulks := make([]Bulk, 0, concurrency) totalBytes := int64(payloadSize) if b.N > 1 { totalBytes = int64(payloadSize) * int64(b.N) } for index := 0; index < concurrency; index++ { bulk, err := client.OpenBulk(context.Background(), BulkOpenOptions{ Range: BulkRange{ Offset: int64(index) * totalBytes, Length: totalBytes, }, ChunkSize: payloadSize, Dedicated: dedicated, }) if err != nil { b.Fatalf("client OpenBulk failed for bulk %d: %v", index, err) } bulks = append(bulks, bulk) accepted := waitBenchmarkAcceptedBulk(b, acceptCh, 5*time.Second) acceptedBulks = append(acceptedBulks, accepted.Bulk) } drainDone := make(chan error, concurrency) for _, acceptedBulk := range acceptedBulks { bulk := acceptedBulk go func() { _, err := io.Copy(io.Discard, bulk) if err != nil && !errors.Is(err, io.EOF) { drainDone <- err return } drainDone <- nil }() } payload := make([]byte, payloadSize) for i := range payload { payload[i] = byte(i) } b.ReportAllocs() b.SetBytes(int64(payloadSize)) b.ResetTimer() var wg sync.WaitGroup errCh := make(chan error, concurrency) for index, bulk := range bulks { count := b.N / concurrency if index < b.N%concurrency { count++ } wg.Add(1) go func(bulk Bulk, count int) { defer wg.Done() for i := 0; i < count; i++ { n, err := bulk.Write(payload) if err != nil { errCh <- err return } if n != len(payload) { errCh <- errors.New("bulk write bytes mismatch") return } } }(bulk, count) } wg.Wait() close(errCh) for err := range errCh { if err != nil { b.Fatalf("concurrent bulk write failed: %v", err) } } b.StopTimer() for index, bulk := range bulks { if err := bulk.CloseWrite(); err != nil { b.Fatalf("bulk %d CloseWrite failed: %v", index, err) } } for index := 0; index < concurrency; index++ { select { case err := <-drainDone: if err != nil { b.Fatalf("server drain failed: %v", err) } case <-time.After(10 * time.Second): b.Fatalf("timed out waiting for server drain %d/%d", index+1, concurrency) } } for _, bulk := range acceptedBulks { _ = bulk.Close() } for _, bulk := range bulks { _ = bulk.Close() } } func waitBenchmarkAcceptedBulk(tb testing.TB, ch <-chan BulkAcceptInfo, timeout time.Duration) BulkAcceptInfo { tb.Helper() select { case info := <-ch: return info case <-time.After(timeout): tb.Fatalf("timed out waiting for accepted bulk after %v", timeout) return BulkAcceptInfo{} } }