notify/bulk_dedicated_lane_sender_test.go

197 lines
6.0 KiB
Go
Raw Permalink Normal View History

package notify
import (
"bytes"
"context"
"testing"
"time"
)
func TestBulkDedicatedLaneBatchRequestPrepareBorrowedSharesItems(t *testing.T) {
req := getBulkDedicatedLaneBatchRequest()
defer req.recycle()
items := []bulkDedicatedSendRequest{{
Type: bulkFastPayloadTypeData,
Seq: 7,
Payload: []byte("hello"),
}}
req.prepare(context.Background(), 11, items, true, true)
if got, want := len(req.Items), 1; got != want {
t.Fatalf("prepared item count = %d, want %d", got, want)
}
if &req.Items[0] != &items[0] {
t.Fatal("prepare with borrowed items should share request items")
}
}
func TestBulkDedicatedLaneSenderTryDirectSubmitWriteFlushesWholePayload(t *testing.T) {
conn := &shortWriteBulkRecordConn{maxPerWrite: 1024}
encodeCalls := 0
sender := &bulkDedicatedLaneSender{
conn: conn,
stopCh: make(chan struct{}),
encode: func(batches []bulkDedicatedOutboundBatch) ([]byte, func(), error) {
encodeCalls++
payload, err := encodeBulkDedicatedBatchesPlain(batches)
return payload, nil, err
},
}
payload := bytes.Repeat([]byte("a"), 3*defaultBulkChunkSize)
submitted, written, err := sender.tryDirectSubmitWrite(context.Background(), 9, 1, payload, defaultBulkChunkSize)
if err != nil {
t.Fatalf("tryDirectSubmitWrite error = %v", err)
}
if !submitted {
t.Fatal("tryDirectSubmitWrite should submit directly")
}
if got, want := written, len(payload); got != want {
t.Fatalf("written = %d, want %d", got, want)
}
if encodeCalls == 0 {
t.Fatal("encode should be called at least once")
}
if got := sender.queued.Load(); got != 0 {
t.Fatalf("queued requests = %d, want 0", got)
}
}
func TestBulkDedicatedLaneSenderCollectBatchRequestsBatchesAcrossDataIDs(t *testing.T) {
sender := &bulkDedicatedLaneSender{
reqCh: make(chan *bulkDedicatedLaneBatchRequest, 3),
stopCh: make(chan struct{}),
}
first := &bulkDedicatedLaneBatchRequest{
Ctx: context.Background(),
DataID: 7,
Items: []bulkDedicatedSendRequest{{
Type: bulkFastPayloadTypeData,
Seq: 1,
Payload: []byte("a"),
}},
Deadline: time.Now().Add(2 * time.Second),
}
if !first.tryStart() {
t.Fatal("first request should start")
}
second := &bulkDedicatedLaneBatchRequest{
Ctx: context.Background(),
DataID: 7,
Items: []bulkDedicatedSendRequest{{
Type: bulkFastPayloadTypeData,
Seq: 2,
Payload: []byte("b"),
}},
Deadline: time.Now().Add(time.Second),
}
third := &bulkDedicatedLaneBatchRequest{
Ctx: context.Background(),
DataID: 8,
Items: []bulkDedicatedSendRequest{{
Type: bulkFastPayloadTypeData,
Seq: 3,
Payload: []byte("c"),
}},
Deadline: time.Now().Add(3 * time.Second),
}
sender.reqCh <- second
sender.reqCh <- third
batchReqs := []*bulkDedicatedLaneBatchRequest{first}
batches := []bulkDedicatedOutboundBatch{{
DataID: first.DataID,
Items: first.Items,
}}
batchBytes := bulkDedicatedBatchesPlainLen(batches)
deadline := first.Deadline
carry, err := sender.collectBatchRequests(&batchReqs, &batches, &batchBytes, &deadline)
if err != nil {
t.Fatalf("collectBatchRequests error = %v", err)
}
if carry != nil {
t.Fatalf("carry request = %+v, want nil", carry)
}
if len(batchReqs) != 3 {
t.Fatalf("batched request count = %d, want 3", len(batchReqs))
}
if len(batches) != 2 {
t.Fatalf("batched group count = %d, want 2", len(batches))
}
if got, want := batches[0].DataID, uint64(7); got != want {
t.Fatalf("first batch dataID = %d, want %d", got, want)
}
if got, want := len(batches[0].Items), 2; got != want {
t.Fatalf("first batch item count = %d, want %d", got, want)
}
if got, want := batches[1].DataID, uint64(8); got != want {
t.Fatalf("second batch dataID = %d, want %d", got, want)
}
if got, want := len(batches[1].Items), 1; got != want {
t.Fatalf("second batch item count = %d, want %d", got, want)
}
if got, want := batches[0].Items[0].Seq, uint64(1); got != want {
t.Fatalf("first batch seq = %d, want %d", got, want)
}
if got, want := batches[0].Items[1].Seq, uint64(2); got != want {
t.Fatalf("second batch seq = %d, want %d", got, want)
}
if got, want := batches[1].Items[0].Seq, uint64(3); got != want {
t.Fatalf("third batch seq = %d, want %d", got, want)
}
if !deadline.Equal(second.Deadline) {
t.Fatalf("merged deadline = %v, want %v", deadline, second.Deadline)
}
}
func TestDedicatedBulkSuperBatchRoundTrip(t *testing.T) {
batches := []bulkDedicatedOutboundBatch{
{
DataID: 7,
Items: []bulkDedicatedSendRequest{
{Type: bulkFastPayloadTypeData, Seq: 1, Payload: []byte("alpha")},
{Type: bulkFastPayloadTypeData, Seq: 2, Payload: []byte("beta")},
},
},
{
DataID: 8,
Items: []bulkDedicatedSendRequest{
{Type: bulkFastPayloadTypeClose, Flags: bulkFastPayloadFlagFullClose, Seq: 3},
{Type: bulkFastPayloadTypeRelease, Seq: 4, Payload: []byte{1, 2, 3, 4}},
},
},
}
plain, err := encodeBulkDedicatedBatchesPlain(batches)
if err != nil {
t.Fatalf("encodeBulkDedicatedBatchesPlain error = %v", err)
}
got, err := decodeDedicatedBulkInboundBatches(plain)
if err != nil {
t.Fatalf("decodeDedicatedBulkInboundBatches error = %v", err)
}
if len(got) != 2 {
t.Fatalf("decoded batch count = %d, want 2", len(got))
}
if got[0].DataID != 7 || got[1].DataID != 8 {
t.Fatalf("decoded dataIDs = %d,%d, want 7,8", got[0].DataID, got[1].DataID)
}
if len(got[0].Items) != 2 || len(got[1].Items) != 2 {
t.Fatalf("decoded item counts = %d,%d, want 2,2", len(got[0].Items), len(got[1].Items))
}
if !bytes.Equal(got[0].Items[0].Payload, []byte("alpha")) || !bytes.Equal(got[0].Items[1].Payload, []byte("beta")) {
t.Fatalf("decoded first batch payloads mismatch: %+v", got[0].Items)
}
if got[1].Items[0].Type != bulkFastPayloadTypeClose || got[1].Items[0].Flags != bulkFastPayloadFlagFullClose {
t.Fatalf("decoded close item mismatch: %+v", got[1].Items[0])
}
if got[1].Items[1].Type != bulkFastPayloadTypeRelease || !bytes.Equal(got[1].Items[1].Payload, []byte{1, 2, 3, 4}) {
t.Fatalf("decoded release item mismatch: %+v", got[1].Items[1])
}
}