// notify/bulk_shared_batch_test.go — tests for the shared bulk fast-path
// batch codec and the server→client bulk release round trip.
package notify
import (
"context"
"testing"
"time"
)
// TestBulkFastBatchPlainRoundTrip verifies that a plain fast batch holding
// both a data frame and a release frame survives an encode/decode round trip
// with every field (type, dataID, seq, payload) intact.
func TestBulkFastBatchPlainRoundTrip(t *testing.T) {
	releasePayload, err := encodeBulkDedicatedReleasePayload(4096, 2)
	if err != nil {
		t.Fatalf("encodeBulkDedicatedReleasePayload failed: %v", err)
	}
	// One data frame plus one release frame exercises both item types.
	original := []bulkFastFrame{
		{Type: bulkFastPayloadTypeData, DataID: 11, Seq: 7, Payload: []byte("alpha")},
		{Type: bulkFastPayloadTypeRelease, DataID: 12, Seq: 0, Payload: releasePayload},
	}
	wire, err := encodeBulkFastBatchPlain(original)
	if err != nil {
		t.Fatalf("encodeBulkFastBatchPlain failed: %v", err)
	}
	decoded, matched, err := decodeBulkFastBatchPlain(wire)
	if err != nil {
		t.Fatalf("decodeBulkFastBatchPlain failed: %v", err)
	}
	if !matched {
		t.Fatal("decodeBulkFastBatchPlain should match encoded batch")
	}
	if len(decoded) != len(original) {
		t.Fatalf("decoded frame count = %d, want %d", len(decoded), len(original))
	}
	// Compare every decoded frame field-by-field against its source frame.
	for i, want := range original {
		got := decoded[i]
		if got.Type != want.Type {
			t.Fatalf("frame %d type = %d, want %d", i, got.Type, want.Type)
		}
		if got.DataID != want.DataID {
			t.Fatalf("frame %d dataID = %d, want %d", i, got.DataID, want.DataID)
		}
		if got.Seq != want.Seq {
			t.Fatalf("frame %d seq = %d, want %d", i, got.Seq, want.Seq)
		}
		if string(got.Payload) != string(want.Payload) {
			t.Fatalf("frame %d payload = %q, want %q", i, string(got.Payload), string(want.Payload))
		}
	}
}
// TestBulkBatchSenderEncodeRequestsCoalescesSharedFastV2Frames checks that
// two fast-path-v2 requests for the same bulk are merged into one batch
// payload instead of being encoded frame-by-frame.
func TestBulkBatchSenderEncodeRequestsCoalescesSharedFastV2Frames(t *testing.T) {
	var singleCalls int
	var batchCalls [][]bulkFastFrame
	sender := &bulkBatchSender{
		codec: bulkBatchCodec{
			encodeSingle: func(bulkFastFrame) ([]byte, func(), error) {
				singleCalls++
				return []byte("single"), nil, nil
			},
			encodeBatch: func(frames []bulkFastFrame) ([]byte, func(), error) {
				// Snapshot the frames handed to the batch encoder.
				batchCalls = append(batchCalls, append([]bulkFastFrame(nil), frames...))
				return []byte("batch"), nil, nil
			},
		},
	}
	dataReq := bulkBatchRequest{
		frames:          []bulkFastFrame{{Type: bulkFastPayloadTypeData, DataID: 101, Seq: 1, Payload: []byte("a")}},
		fastPathVersion: bulkFastPathVersionV2,
	}
	releaseReq := bulkBatchRequest{
		frames:          []bulkFastFrame{{Type: bulkFastPayloadTypeRelease, DataID: 101, Seq: 0, Payload: []byte("rel")}},
		fastPathVersion: bulkFastPathVersionV2,
	}
	payloads, err := sender.encodeRequests([]bulkBatchRequest{dataReq, releaseReq})
	if err != nil {
		t.Fatalf("encodeRequests failed: %v", err)
	}
	if len(payloads) != 1 {
		t.Fatalf("payload count = %d, want %d", len(payloads), 1)
	}
	if singleCalls != 0 {
		t.Fatalf("single encode calls = %d, want 0", singleCalls)
	}
	if len(batchCalls) != 1 {
		t.Fatalf("batch encode calls = %d, want %d", len(batchCalls), 1)
	}
	if len(batchCalls[0]) != 2 {
		t.Fatalf("batched item count = %d, want %d", len(batchCalls[0]), 2)
	}
}
// TestBulkBatchSenderEncodeRequestsCoalescesAcrossRequestsAndBulks checks
// that v2 frames belonging to two different bulks still coalesce into a
// single batch payload, preserving request order.
func TestBulkBatchSenderEncodeRequestsCoalescesAcrossRequestsAndBulks(t *testing.T) {
	var singleCalls int
	var batchCalls [][]bulkFastFrame
	sender := &bulkBatchSender{
		codec: bulkBatchCodec{
			encodeSingle: func(bulkFastFrame) ([]byte, func(), error) {
				singleCalls++
				return []byte("single"), nil, nil
			},
			encodeBatch: func(frames []bulkFastFrame) ([]byte, func(), error) {
				// Snapshot the frames handed to the batch encoder.
				batchCalls = append(batchCalls, append([]bulkFastFrame(nil), frames...))
				return []byte("batch"), nil, nil
			},
		},
	}
	requests := []bulkBatchRequest{
		{
			frames:          []bulkFastFrame{{Type: bulkFastPayloadTypeData, DataID: 101, Seq: 1, Payload: []byte("bulk-a")}},
			fastPathVersion: bulkFastPathVersionV2,
		},
		{
			frames:          []bulkFastFrame{{Type: bulkFastPayloadTypeData, DataID: 202, Seq: 1, Payload: []byte("bulk-b")}},
			fastPathVersion: bulkFastPathVersionV2,
		},
	}
	payloads, err := sender.encodeRequests(requests)
	if err != nil {
		t.Fatalf("encodeRequests failed: %v", err)
	}
	if len(payloads) != 1 {
		t.Fatalf("payload count = %d, want %d", len(payloads), 1)
	}
	if singleCalls != 0 {
		t.Fatalf("single encode calls = %d, want 0", singleCalls)
	}
	if len(batchCalls) != 1 {
		t.Fatalf("batch encode calls = %d, want %d", len(batchCalls), 1)
	}
	if len(batchCalls[0]) != 2 {
		t.Fatalf("batched item count = %d, want %d", len(batchCalls[0]), 2)
	}
	// Order must follow the original request order.
	if got := batchCalls[0][0].DataID; got != uint64(101) {
		t.Fatalf("first batched dataID = %d, want %d", got, uint64(101))
	}
	if got := batchCalls[0][1].DataID; got != uint64(202) {
		t.Fatalf("second batched dataID = %d, want %d", got, uint64(202))
	}
}
// TestBulkBatchSenderEncodeRequestsSplitsLargeCrossBulkSuperBatch checks
// that when the combined frames of two bulks are too large for one batch,
// the sender emits two payloads split on the bulk boundary.
func TestBulkBatchSenderEncodeRequestsSplitsLargeCrossBulkSuperBatch(t *testing.T) {
	var batchCalls [][]bulkFastFrame
	sender := &bulkBatchSender{
		codec: bulkBatchCodec{
			encodeSingle: func(bulkFastFrame) ([]byte, func(), error) {
				return []byte("single"), nil, nil
			},
			encodeBatch: func(frames []bulkFastFrame) ([]byte, func(), error) {
				// Snapshot the frames handed to the batch encoder.
				batchCalls = append(batchCalls, append([]bulkFastFrame(nil), frames...))
				return []byte("batch"), nil, nil
			},
		},
	}
	// 768 KiB per frame, two frames per bulk — large enough that the two
	// bulks must go out as separate batch payloads (asserted below).
	payload := make([]byte, 768*1024)
	makeBulk := func(dataID uint64) bulkBatchRequest {
		return bulkBatchRequest{
			frames: []bulkFastFrame{
				{Type: bulkFastPayloadTypeData, DataID: dataID, Seq: 1, Payload: payload},
				{Type: bulkFastPayloadTypeData, DataID: dataID, Seq: 2, Payload: payload},
			},
			fastPathVersion: bulkFastPathVersionV2,
		}
	}
	payloads, err := sender.encodeRequests([]bulkBatchRequest{makeBulk(101), makeBulk(202)})
	if err != nil {
		t.Fatalf("encodeRequests failed: %v", err)
	}
	if len(payloads) != 2 {
		t.Fatalf("payload count = %d, want %d", len(payloads), 2)
	}
	if len(batchCalls) != 2 {
		t.Fatalf("batch encode calls = %d, want %d", len(batchCalls), 2)
	}
	if len(batchCalls[0]) != 2 {
		t.Fatalf("first payload frame count = %d, want %d", len(batchCalls[0]), 2)
	}
	if len(batchCalls[1]) != 2 {
		t.Fatalf("second payload frame count = %d, want %d", len(batchCalls[1]), 2)
	}
	if got := batchCalls[0][0].DataID; got != uint64(101) {
		t.Fatalf("first payload dataID = %d, want %d", got, uint64(101))
	}
	if got := batchCalls[1][0].DataID; got != uint64(202) {
		t.Fatalf("second payload dataID = %d, want %d", got, uint64(202))
	}
}
// TestBulkReleaseFastRoundTripTransport spins up a real server/client pair
// over loopback TCP with the modern PSK transport, opens a bulk, sends a
// release from the server side, and polls until the client's outbound
// window is credited back via the fast path.
func TestBulkReleaseFastRoundTripTransport(t *testing.T) {
	server := NewServer().(*ServerCommon)
	if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKServer failed: %v", err)
	}
	// Buffered by one so the handler can deliver the accept without blocking.
	acceptCh := make(chan BulkAcceptInfo, 1)
	server.SetBulkHandler(func(info BulkAcceptInfo) error {
		acceptCh <- info
		return nil
	})
	if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
		t.Fatalf("server Listen failed: %v", err)
	}
	defer func() {
		_ = server.Stop()
	}()
	client := NewClient().(*ClientCommon)
	if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
		t.Fatalf("UseModernPSKClient failed: %v", err)
	}
	if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
		t.Fatalf("client Connect failed: %v", err)
	}
	defer func() {
		_ = client.Stop()
	}()
	// Single-chunk window: one release of chunkSize restores it fully.
	const chunkSize = 64 * 1024
	bulk, err := client.OpenBulk(context.Background(), BulkOpenOptions{
		Range:       BulkRange{Offset: 0, Length: chunkSize},
		ChunkSize:   chunkSize,
		WindowBytes: chunkSize,
		MaxInFlight: 1,
	})
	if err != nil {
		t.Fatalf("client OpenBulk failed: %v", err)
	}
	accepted := waitAcceptedBulk(t, acceptCh, 2*time.Second)
	clientHandle := bulk.(*bulkHandle)
	serverHandle := accepted.Bulk.(*bulkHandle)
	// Both ends must have negotiated fast path v2, or the release below
	// would not travel the code path this test targets.
	if got, want := clientHandle.FastPathVersion(), uint8(bulkFastPathVersionV2); got != want {
		t.Fatalf("client fast path version = %d, want %d", got, want)
	}
	if got, want := serverHandle.FastPathVersion(), uint8(bulkFastPathVersionV2); got != want {
		t.Fatalf("server fast path version = %d, want %d", got, want)
	}
	// Simulate an exhausted client window with one chunk in flight, so the
	// server release is the only thing that can restore it.
	clientHandle.mu.Lock()
	clientHandle.outboundAvailBytes = 0
	clientHandle.outboundInFlight = 1
	clientHandle.mu.Unlock()
	releaseFn := serverBulkReleaseSender(server, accepted.LogicalConn, accepted.TransportConn)
	if err := releaseFn(serverHandle, chunkSize, 1); err != nil {
		t.Fatalf("server bulk release sender failed: %v", err)
	}
	// Poll for up to 2s; the success path returns early from the loop.
	deadline := time.Now().Add(2 * time.Second)
	for time.Now().Before(deadline) {
		clientHandle.mu.Lock()
		avail := clientHandle.outboundAvailBytes
		inFlight := clientHandle.outboundInFlight
		clientHandle.mu.Unlock()
		if avail == chunkSize && inFlight == 0 {
			return
		}
		time.Sleep(10 * time.Millisecond)
	}
	// Timed out: capture the final window state for the failure message.
	clientHandle.mu.Lock()
	avail := clientHandle.outboundAvailBytes
	inFlight := clientHandle.outboundInFlight
	clientHandle.mu.Unlock()
	t.Fatalf("client outbound window not released by fast path: avail=%d inFlight=%d", avail, inFlight)
}
// TestBulkBatchSenderEncodeRequestsResetsBatchBytesAfterFlushBoundary checks
// that after a near-limit v2 frame and an intervening v1 request (both
// expected to take the single-frame path), the batch byte accounting resets
// so the trailing v2 requests coalesce into one fresh batch.
func TestBulkBatchSenderEncodeRequestsResetsBatchBytesAfterFlushBoundary(t *testing.T) {
	var singleCalls int
	var batchCalls [][]bulkFastFrame
	sender := &bulkBatchSender{
		codec: bulkBatchCodec{
			encodeSingle: func(bulkFastFrame) ([]byte, func(), error) {
				singleCalls++
				return []byte("single"), nil, nil
			},
			encodeBatch: func(frames []bulkFastFrame) ([]byte, func(), error) {
				// Snapshot the frames handed to the batch encoder.
				batchCalls = append(batchCalls, append([]bulkFastFrame(nil), frames...))
				return []byte("batch"), nil, nil
			},
		},
	}
	// Sized just under the plain-batch ceiling, leaving 128 bytes of slack.
	largePayload := make([]byte, bulkFastBatchMaxPlainBytes-bulkFastBatchHeaderLen-bulkFastBatchItemHeaderLen-128)
	requests := []bulkBatchRequest{
		{
			frames:          []bulkFastFrame{{Type: bulkFastPayloadTypeData, DataID: 101, Seq: 1, Payload: largePayload}},
			fastPathVersion: bulkFastPathVersionV2,
		},
		{
			// A v1 request separates the two v2 runs.
			frames:          []bulkFastFrame{{Type: bulkFastPayloadTypeData, DataID: 101, Seq: 2, Payload: []byte("sep")}},
			fastPathVersion: bulkFastPathVersionV1,
		},
		{
			frames:          []bulkFastFrame{{Type: bulkFastPayloadTypeData, DataID: 202, Seq: 1, Payload: []byte("a")}},
			fastPathVersion: bulkFastPathVersionV2,
		},
		{
			frames:          []bulkFastFrame{{Type: bulkFastPayloadTypeData, DataID: 202, Seq: 2, Payload: []byte("b")}},
			fastPathVersion: bulkFastPathVersionV2,
		},
	}
	payloads, err := sender.encodeRequests(requests)
	if err != nil {
		t.Fatalf("encodeRequests failed: %v", err)
	}
	if len(payloads) != 3 {
		t.Fatalf("payload count = %d, want %d", len(payloads), 3)
	}
	if singleCalls != 2 {
		t.Fatalf("single encode calls = %d, want %d", singleCalls, 2)
	}
	if len(batchCalls) != 1 {
		t.Fatalf("batch encode calls = %d, want %d", len(batchCalls), 1)
	}
	if len(batchCalls[0]) != 2 {
		t.Fatalf("post-flush batched frame count = %d, want %d", len(batchCalls[0]), 2)
	}
}
// TestBulkBatchSenderEncodeRequestsUsesBindingAdaptiveSoftLimit checks that
// after the transport binding observes a large, slow adaptive payload write,
// the sender's soft limit keeps each bulk in its own batch payload
// (presumably because the limit shrinks — asserted only via the split below).
func TestBulkBatchSenderEncodeRequestsUsesBindingAdaptiveSoftLimit(t *testing.T) {
	binding := &transportBinding{}
	// Record one 8 MiB write that took 640 ms before encoding anything.
	binding.observeBulkAdaptivePayloadWrite(8*1024*1024, 640*time.Millisecond, 0, nil)
	var batchCalls [][]bulkFastFrame
	sender := &bulkBatchSender{
		binding: binding,
		codec: bulkBatchCodec{
			encodeSingle: func(bulkFastFrame) ([]byte, func(), error) {
				return []byte("single"), nil, nil
			},
			encodeBatch: func(frames []bulkFastFrame) ([]byte, func(), error) {
				// Snapshot the frames handed to the batch encoder.
				batchCalls = append(batchCalls, append([]bulkFastFrame(nil), frames...))
				return []byte("batch"), nil, nil
			},
		},
	}
	chunk := make([]byte, 96*1024)
	makeBulk := func(dataID uint64) bulkBatchRequest {
		return bulkBatchRequest{
			frames: []bulkFastFrame{
				{Type: bulkFastPayloadTypeData, DataID: dataID, Seq: 1, Payload: chunk},
				{Type: bulkFastPayloadTypeData, DataID: dataID, Seq: 2, Payload: chunk},
			},
			fastPathVersion: bulkFastPathVersionV2,
		}
	}
	payloads, err := sender.encodeRequests([]bulkBatchRequest{makeBulk(101), makeBulk(202)})
	if err != nil {
		t.Fatalf("encodeRequests failed: %v", err)
	}
	if len(payloads) != 2 {
		t.Fatalf("payload count = %d, want %d", len(payloads), 2)
	}
	if len(batchCalls) != 2 {
		t.Fatalf("batch encode calls = %d, want %d", len(batchCalls), 2)
	}
	for index, want := range []uint64{101, 202} {
		if got := batchCalls[index][0].DataID; got != want {
			t.Fatalf("payload %d first dataID = %d, want %d", index, got, want)
		}
		if got := len(batchCalls[index]); got != 2 {
			t.Fatalf("payload %d item count = %d, want %d", index, got, 2)
		}
	}
}