package notify import ( "bytes" "context" "testing" "time" ) func TestBulkDedicatedLaneBatchRequestPrepareBorrowedSharesItems(t *testing.T) { req := getBulkDedicatedLaneBatchRequest() defer req.recycle() items := []bulkDedicatedSendRequest{{ Type: bulkFastPayloadTypeData, Seq: 7, Payload: []byte("hello"), }} req.prepare(context.Background(), 11, items, true, true) if got, want := len(req.Items), 1; got != want { t.Fatalf("prepared item count = %d, want %d", got, want) } if &req.Items[0] != &items[0] { t.Fatal("prepare with borrowed items should share request items") } } func TestBulkDedicatedLaneSenderTryDirectSubmitWriteFlushesWholePayload(t *testing.T) { conn := &shortWriteBulkRecordConn{maxPerWrite: 1024} encodeCalls := 0 sender := &bulkDedicatedLaneSender{ conn: conn, stopCh: make(chan struct{}), encode: func(batches []bulkDedicatedOutboundBatch) ([]byte, func(), error) { encodeCalls++ payload, err := encodeBulkDedicatedBatchesPlain(batches) return payload, nil, err }, } payload := bytes.Repeat([]byte("a"), 3*defaultBulkChunkSize) submitted, written, err := sender.tryDirectSubmitWrite(context.Background(), 9, 1, payload, defaultBulkChunkSize) if err != nil { t.Fatalf("tryDirectSubmitWrite error = %v", err) } if !submitted { t.Fatal("tryDirectSubmitWrite should submit directly") } if got, want := written, len(payload); got != want { t.Fatalf("written = %d, want %d", got, want) } if encodeCalls == 0 { t.Fatal("encode should be called at least once") } if got := sender.queued.Load(); got != 0 { t.Fatalf("queued requests = %d, want 0", got) } } func TestBulkDedicatedLaneSenderCollectBatchRequestsBatchesAcrossDataIDs(t *testing.T) { sender := &bulkDedicatedLaneSender{ reqCh: make(chan *bulkDedicatedLaneBatchRequest, 3), stopCh: make(chan struct{}), } first := &bulkDedicatedLaneBatchRequest{ Ctx: context.Background(), DataID: 7, Items: []bulkDedicatedSendRequest{{ Type: bulkFastPayloadTypeData, Seq: 1, Payload: []byte("a"), }}, Deadline: time.Now().Add(2 * time.Second), } if !first.tryStart() { t.Fatal("first request should start") } second := &bulkDedicatedLaneBatchRequest{ Ctx: context.Background(), DataID: 7, Items: []bulkDedicatedSendRequest{{ Type: bulkFastPayloadTypeData, Seq: 2, Payload: []byte("b"), }}, Deadline: time.Now().Add(time.Second), } third := &bulkDedicatedLaneBatchRequest{ Ctx: context.Background(), DataID: 8, Items: []bulkDedicatedSendRequest{{ Type: bulkFastPayloadTypeData, Seq: 3, Payload: []byte("c"), }}, Deadline: time.Now().Add(3 * time.Second), } sender.reqCh <- second sender.reqCh <- third batchReqs := []*bulkDedicatedLaneBatchRequest{first} batches := []bulkDedicatedOutboundBatch{{ DataID: first.DataID, Items: first.Items, }} batchBytes := bulkDedicatedBatchesPlainLen(batches) deadline := first.Deadline carry, err := sender.collectBatchRequests(&batchReqs, &batches, &batchBytes, &deadline) if err != nil { t.Fatalf("collectBatchRequests error = %v", err) } if carry != nil { t.Fatalf("carry request = %+v, want nil", carry) } if len(batchReqs) != 3 { t.Fatalf("batched request count = %d, want 3", len(batchReqs)) } if len(batches) != 2 { t.Fatalf("batched group count = %d, want 2", len(batches)) } if got, want := batches[0].DataID, uint64(7); got != want { t.Fatalf("first batch dataID = %d, want %d", got, want) } if got, want := len(batches[0].Items), 2; got != want { t.Fatalf("first batch item count = %d, want %d", got, want) } if got, want := batches[1].DataID, uint64(8); got != want { t.Fatalf("second batch dataID = %d, want %d", got, want) } if got, want := len(batches[1].Items), 1; got != want { t.Fatalf("second batch item count = %d, want %d", got, want) } if got, want := batches[0].Items[0].Seq, uint64(1); got != want { t.Fatalf("first batch seq = %d, want %d", got, want) } if got, want := batches[0].Items[1].Seq, uint64(2); got != want { t.Fatalf("second batch seq = %d, want %d", got, want) } if got, want := batches[1].Items[0].Seq, uint64(3); got != want { t.Fatalf("third batch seq = %d, want %d", got, want) } if !deadline.Equal(second.Deadline) { t.Fatalf("merged deadline = %v, want %v", deadline, second.Deadline) } } func TestDedicatedBulkSuperBatchRoundTrip(t *testing.T) { batches := []bulkDedicatedOutboundBatch{ { DataID: 7, Items: []bulkDedicatedSendRequest{ {Type: bulkFastPayloadTypeData, Seq: 1, Payload: []byte("alpha")}, {Type: bulkFastPayloadTypeData, Seq: 2, Payload: []byte("beta")}, }, }, { DataID: 8, Items: []bulkDedicatedSendRequest{ {Type: bulkFastPayloadTypeClose, Flags: bulkFastPayloadFlagFullClose, Seq: 3}, {Type: bulkFastPayloadTypeRelease, Seq: 4, Payload: []byte{1, 2, 3, 4}}, }, }, } plain, err := encodeBulkDedicatedBatchesPlain(batches) if err != nil { t.Fatalf("encodeBulkDedicatedBatchesPlain error = %v", err) } got, err := decodeDedicatedBulkInboundBatches(plain) if err != nil { t.Fatalf("decodeDedicatedBulkInboundBatches error = %v", err) } if len(got) != 2 { t.Fatalf("decoded batch count = %d, want 2", len(got)) } if got[0].DataID != 7 || got[1].DataID != 8 { t.Fatalf("decoded dataIDs = %d,%d, want 7,8", got[0].DataID, got[1].DataID) } if len(got[0].Items) != 2 || len(got[1].Items) != 2 { t.Fatalf("decoded item counts = %d,%d, want 2,2", len(got[0].Items), len(got[1].Items)) } if !bytes.Equal(got[0].Items[0].Payload, []byte("alpha")) || !bytes.Equal(got[0].Items[1].Payload, []byte("beta")) { t.Fatalf("decoded first batch payloads mismatch: %+v", got[0].Items) } if got[1].Items[0].Type != bulkFastPayloadTypeClose || got[1].Items[0].Flags != bulkFastPayloadFlagFullClose { t.Fatalf("decoded close item mismatch: %+v", got[1].Items[0]) } if got[1].Items[1].Type != bulkFastPayloadTypeRelease || !bytes.Equal(got[1].Items[1].Payload, []byte{1, 2, 3, 4}) { t.Fatalf("decoded release item mismatch: %+v", got[1].Items[1]) } }