package notify import ( "context" "testing" "time" ) func TestBulkFastBatchPlainRoundTrip(t *testing.T) { releasePayload, err := encodeBulkDedicatedReleasePayload(4096, 2) if err != nil { t.Fatalf("encodeBulkDedicatedReleasePayload failed: %v", err) } frames := []bulkFastFrame{ { Type: bulkFastPayloadTypeData, DataID: 11, Seq: 7, Payload: []byte("alpha"), }, { Type: bulkFastPayloadTypeRelease, DataID: 12, Seq: 0, Payload: releasePayload, }, } wire, err := encodeBulkFastBatchPlain(frames) if err != nil { t.Fatalf("encodeBulkFastBatchPlain failed: %v", err) } decoded, matched, err := decodeBulkFastBatchPlain(wire) if err != nil { t.Fatalf("decodeBulkFastBatchPlain failed: %v", err) } if !matched { t.Fatal("decodeBulkFastBatchPlain should match encoded batch") } if got, want := len(decoded), len(frames); got != want { t.Fatalf("decoded frame count = %d, want %d", got, want) } for index := range frames { if got, want := decoded[index].Type, frames[index].Type; got != want { t.Fatalf("frame %d type = %d, want %d", index, got, want) } if got, want := decoded[index].DataID, frames[index].DataID; got != want { t.Fatalf("frame %d dataID = %d, want %d", index, got, want) } if got, want := decoded[index].Seq, frames[index].Seq; got != want { t.Fatalf("frame %d seq = %d, want %d", index, got, want) } if got, want := string(decoded[index].Payload), string(frames[index].Payload); got != want { t.Fatalf("frame %d payload = %q, want %q", index, got, want) } } } func TestBulkBatchSenderEncodeRequestsCoalescesSharedFastV2Frames(t *testing.T) { var ( singleCalls int batchCalls [][]bulkFastFrame ) sender := &bulkBatchSender{ codec: bulkBatchCodec{ encodeSingle: func(frame bulkFastFrame) ([]byte, func(), error) { singleCalls++ return []byte("single"), nil, nil }, encodeBatch: func(frames []bulkFastFrame) ([]byte, func(), error) { cloned := make([]bulkFastFrame, len(frames)) copy(cloned, frames) batchCalls = append(batchCalls, cloned) return []byte("batch"), nil, nil }, }, } payloads, err := sender.encodeRequests([]bulkBatchRequest{ { frames: []bulkFastFrame{{ Type: bulkFastPayloadTypeData, DataID: 101, Seq: 1, Payload: []byte("a"), }}, fastPathVersion: bulkFastPathVersionV2, }, { frames: []bulkFastFrame{{ Type: bulkFastPayloadTypeRelease, DataID: 101, Seq: 0, Payload: []byte("rel"), }}, fastPathVersion: bulkFastPathVersionV2, }, }) if err != nil { t.Fatalf("encodeRequests failed: %v", err) } if got, want := len(payloads), 1; got != want { t.Fatalf("payload count = %d, want %d", got, want) } if singleCalls != 0 { t.Fatalf("single encode calls = %d, want 0", singleCalls) } if got, want := len(batchCalls), 1; got != want { t.Fatalf("batch encode calls = %d, want %d", got, want) } if got, want := len(batchCalls[0]), 2; got != want { t.Fatalf("batched item count = %d, want %d", got, want) } } func TestBulkBatchSenderEncodeRequestsCoalescesAcrossRequestsAndBulks(t *testing.T) { var ( singleCalls int batchCalls [][]bulkFastFrame ) sender := &bulkBatchSender{ codec: bulkBatchCodec{ encodeSingle: func(frame bulkFastFrame) ([]byte, func(), error) { singleCalls++ return []byte("single"), nil, nil }, encodeBatch: func(frames []bulkFastFrame) ([]byte, func(), error) { cloned := make([]bulkFastFrame, len(frames)) copy(cloned, frames) batchCalls = append(batchCalls, cloned) return []byte("batch"), nil, nil }, }, } payloads, err := sender.encodeRequests([]bulkBatchRequest{ { frames: []bulkFastFrame{{ Type: bulkFastPayloadTypeData, DataID: 101, Seq: 1, Payload: []byte("bulk-a"), }}, fastPathVersion: bulkFastPathVersionV2, }, { frames: []bulkFastFrame{{ Type: bulkFastPayloadTypeData, DataID: 202, Seq: 1, Payload: []byte("bulk-b"), }}, fastPathVersion: bulkFastPathVersionV2, }, }) if err != nil { t.Fatalf("encodeRequests failed: %v", err) } if got, want := len(payloads), 1; got != want { t.Fatalf("payload count = %d, want %d", got, want) } if singleCalls != 0 { t.Fatalf("single encode calls = %d, want 0", singleCalls) } if got, want := len(batchCalls), 1; got != want { t.Fatalf("batch encode calls = %d, want %d", got, want) } if got, want := len(batchCalls[0]), 2; got != want { t.Fatalf("batched item count = %d, want %d", got, want) } if got, want := batchCalls[0][0].DataID, uint64(101); got != want { t.Fatalf("first batched dataID = %d, want %d", got, want) } if got, want := batchCalls[0][1].DataID, uint64(202); got != want { t.Fatalf("second batched dataID = %d, want %d", got, want) } } func TestBulkBatchSenderEncodeRequestsSplitsLargeCrossBulkSuperBatch(t *testing.T) { var batchCalls [][]bulkFastFrame sender := &bulkBatchSender{ codec: bulkBatchCodec{ encodeSingle: func(frame bulkFastFrame) ([]byte, func(), error) { return []byte("single"), nil, nil }, encodeBatch: func(frames []bulkFastFrame) ([]byte, func(), error) { cloned := make([]bulkFastFrame, len(frames)) copy(cloned, frames) batchCalls = append(batchCalls, cloned) return []byte("batch"), nil, nil }, }, } payload := make([]byte, 768*1024) payloads, err := sender.encodeRequests([]bulkBatchRequest{ { frames: []bulkFastFrame{ { Type: bulkFastPayloadTypeData, DataID: 101, Seq: 1, Payload: payload, }, { Type: bulkFastPayloadTypeData, DataID: 101, Seq: 2, Payload: payload, }, }, fastPathVersion: bulkFastPathVersionV2, }, { frames: []bulkFastFrame{ { Type: bulkFastPayloadTypeData, DataID: 202, Seq: 1, Payload: payload, }, { Type: bulkFastPayloadTypeData, DataID: 202, Seq: 2, Payload: payload, }, }, fastPathVersion: bulkFastPathVersionV2, }, }) if err != nil { t.Fatalf("encodeRequests failed: %v", err) } if got, want := len(payloads), 2; got != want { t.Fatalf("payload count = %d, want %d", got, want) } if got, want := len(batchCalls), 2; got != want { t.Fatalf("batch encode calls = %d, want %d", got, want) } if got, want := len(batchCalls[0]), 2; got != want { t.Fatalf("first payload frame count = %d, want %d", got, want) } if got, want := len(batchCalls[1]), 2; got != want { t.Fatalf("second payload frame count = %d, want %d", got, want) } if got, want := batchCalls[0][0].DataID, uint64(101); got != want { t.Fatalf("first payload dataID = %d, want %d", got, want) } if got, want := batchCalls[1][0].DataID, uint64(202); got != want { t.Fatalf("second payload dataID = %d, want %d", got, want) } } func TestBulkReleaseFastRoundTripTransport(t *testing.T) { server := NewServer().(*ServerCommon) if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKServer failed: %v", err) } acceptCh := make(chan BulkAcceptInfo, 1) server.SetBulkHandler(func(info BulkAcceptInfo) error { acceptCh <- info return nil }) if err := server.Listen("tcp", "127.0.0.1:0"); err != nil { t.Fatalf("server Listen failed: %v", err) } defer func() { _ = server.Stop() }() client := NewClient().(*ClientCommon) if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKClient failed: %v", err) } if err := client.Connect("tcp", server.listener.Addr().String()); err != nil { t.Fatalf("client Connect failed: %v", err) } defer func() { _ = client.Stop() }() const chunkSize = 64 * 1024 bulk, err := client.OpenBulk(context.Background(), BulkOpenOptions{ Range: BulkRange{Offset: 0, Length: chunkSize}, ChunkSize: chunkSize, WindowBytes: chunkSize, MaxInFlight: 1, }) if err != nil { t.Fatalf("client OpenBulk failed: %v", err) } accepted := waitAcceptedBulk(t, acceptCh, 2*time.Second) clientHandle := bulk.(*bulkHandle) serverHandle := accepted.Bulk.(*bulkHandle) if got, want := clientHandle.FastPathVersion(), uint8(bulkFastPathVersionV2); got != want { t.Fatalf("client fast path version = %d, want %d", got, want) } if got, want := serverHandle.FastPathVersion(), uint8(bulkFastPathVersionV2); got != want { t.Fatalf("server fast path version = %d, want %d", got, want) } clientHandle.mu.Lock() clientHandle.outboundAvailBytes = 0 clientHandle.outboundInFlight = 1 clientHandle.mu.Unlock() releaseFn := serverBulkReleaseSender(server, accepted.LogicalConn, accepted.TransportConn) if err := releaseFn(serverHandle, chunkSize, 1); err != nil { t.Fatalf("server bulk release sender failed: %v", err) } deadline := time.Now().Add(2 * time.Second) for time.Now().Before(deadline) { clientHandle.mu.Lock() avail := clientHandle.outboundAvailBytes inFlight := clientHandle.outboundInFlight clientHandle.mu.Unlock() if avail == chunkSize && inFlight == 0 { return } time.Sleep(10 * time.Millisecond) } clientHandle.mu.Lock() avail := clientHandle.outboundAvailBytes inFlight := clientHandle.outboundInFlight clientHandle.mu.Unlock() t.Fatalf("client outbound window not released by fast path: avail=%d inFlight=%d", avail, inFlight) } func TestBulkBatchSenderEncodeRequestsResetsBatchBytesAfterFlushBoundary(t *testing.T) { var ( singleCalls int batchCalls [][]bulkFastFrame ) sender := &bulkBatchSender{ codec: bulkBatchCodec{ encodeSingle: func(frame bulkFastFrame) ([]byte, func(), error) { singleCalls++ return []byte("single"), nil, nil }, encodeBatch: func(frames []bulkFastFrame) ([]byte, func(), error) { cloned := make([]bulkFastFrame, len(frames)) copy(cloned, frames) batchCalls = append(batchCalls, cloned) return []byte("batch"), nil, nil }, }, } largePayload := make([]byte, bulkFastBatchMaxPlainBytes-bulkFastBatchHeaderLen-bulkFastBatchItemHeaderLen-128) payloads, err := sender.encodeRequests([]bulkBatchRequest{ { frames: []bulkFastFrame{{ Type: bulkFastPayloadTypeData, DataID: 101, Seq: 1, Payload: largePayload, }}, fastPathVersion: bulkFastPathVersionV2, }, { frames: []bulkFastFrame{{ Type: bulkFastPayloadTypeData, DataID: 101, Seq: 2, Payload: []byte("sep"), }}, fastPathVersion: bulkFastPathVersionV1, }, { frames: []bulkFastFrame{{ Type: bulkFastPayloadTypeData, DataID: 202, Seq: 1, Payload: []byte("a"), }}, fastPathVersion: bulkFastPathVersionV2, }, { frames: []bulkFastFrame{{ Type: bulkFastPayloadTypeData, DataID: 202, Seq: 2, Payload: []byte("b"), }}, fastPathVersion: bulkFastPathVersionV2, }, }) if err != nil { t.Fatalf("encodeRequests failed: %v", err) } if got, want := len(payloads), 3; got != want { t.Fatalf("payload count = %d, want %d", got, want) } if got, want := singleCalls, 2; got != want { t.Fatalf("single encode calls = %d, want %d", got, want) } if got, want := len(batchCalls), 1; got != want { t.Fatalf("batch encode calls = %d, want %d", got, want) } if got, want := len(batchCalls[0]), 2; got != want { t.Fatalf("post-flush batched frame count = %d, want %d", got, want) } } func TestBulkBatchSenderEncodeRequestsUsesBindingAdaptiveSoftLimit(t *testing.T) { binding := &transportBinding{} binding.observeBulkAdaptivePayloadWrite(8*1024*1024, 640*time.Millisecond, 0, nil) var batchCalls [][]bulkFastFrame sender := &bulkBatchSender{ binding: binding, codec: bulkBatchCodec{ encodeSingle: func(frame bulkFastFrame) ([]byte, func(), error) { return []byte("single"), nil, nil }, encodeBatch: func(frames []bulkFastFrame) ([]byte, func(), error) { cloned := make([]bulkFastFrame, len(frames)) copy(cloned, frames) batchCalls = append(batchCalls, cloned) return []byte("batch"), nil, nil }, }, } payload := make([]byte, 96*1024) payloads, err := sender.encodeRequests([]bulkBatchRequest{ { frames: []bulkFastFrame{ {Type: bulkFastPayloadTypeData, DataID: 101, Seq: 1, Payload: payload}, {Type: bulkFastPayloadTypeData, DataID: 101, Seq: 2, Payload: payload}, }, fastPathVersion: bulkFastPathVersionV2, }, { frames: []bulkFastFrame{ {Type: bulkFastPayloadTypeData, DataID: 202, Seq: 1, Payload: payload}, {Type: bulkFastPayloadTypeData, DataID: 202, Seq: 2, Payload: payload}, }, fastPathVersion: bulkFastPathVersionV2, }, }) if err != nil { t.Fatalf("encodeRequests failed: %v", err) } if got, want := len(payloads), 2; got != want { t.Fatalf("payload count = %d, want %d", got, want) } if got, want := len(batchCalls), 2; got != want { t.Fatalf("batch encode calls = %d, want %d", got, want) } for index, want := range []uint64{101, 202} { if got := batchCalls[index][0].DataID; got != want { t.Fatalf("payload %d first dataID = %d, want %d", index, got, want) } if got, wantItems := len(batchCalls[index]), 2; got != wantItems { t.Fatalf("payload %d item count = %d, want %d", index, got, wantItems) } } }