- make stream fast path honor adaptive soft payload limits end-to-end
- split oversized fast-stream payloads into sequential frames before batching
- use adaptive soft cap when encoding stream batch payloads
- move timeout-like error detection into production code for adaptive tx
- tune notify FrameReader read size explicitly to avoid throughput regression
- drop local stario replace and depend on released b612.me/stario v0.1.1
459 lines
13 KiB
Go
459 lines
13 KiB
Go
package notify
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
func TestBulkFastBatchPlainRoundTrip(t *testing.T) {
|
|
releasePayload, err := encodeBulkDedicatedReleasePayload(4096, 2)
|
|
if err != nil {
|
|
t.Fatalf("encodeBulkDedicatedReleasePayload failed: %v", err)
|
|
}
|
|
frames := []bulkFastFrame{
|
|
{
|
|
Type: bulkFastPayloadTypeData,
|
|
DataID: 11,
|
|
Seq: 7,
|
|
Payload: []byte("alpha"),
|
|
},
|
|
{
|
|
Type: bulkFastPayloadTypeRelease,
|
|
DataID: 12,
|
|
Seq: 0,
|
|
Payload: releasePayload,
|
|
},
|
|
}
|
|
wire, err := encodeBulkFastBatchPlain(frames)
|
|
if err != nil {
|
|
t.Fatalf("encodeBulkFastBatchPlain failed: %v", err)
|
|
}
|
|
decoded, matched, err := decodeBulkFastBatchPlain(wire)
|
|
if err != nil {
|
|
t.Fatalf("decodeBulkFastBatchPlain failed: %v", err)
|
|
}
|
|
if !matched {
|
|
t.Fatal("decodeBulkFastBatchPlain should match encoded batch")
|
|
}
|
|
if got, want := len(decoded), len(frames); got != want {
|
|
t.Fatalf("decoded frame count = %d, want %d", got, want)
|
|
}
|
|
for index := range frames {
|
|
if got, want := decoded[index].Type, frames[index].Type; got != want {
|
|
t.Fatalf("frame %d type = %d, want %d", index, got, want)
|
|
}
|
|
if got, want := decoded[index].DataID, frames[index].DataID; got != want {
|
|
t.Fatalf("frame %d dataID = %d, want %d", index, got, want)
|
|
}
|
|
if got, want := decoded[index].Seq, frames[index].Seq; got != want {
|
|
t.Fatalf("frame %d seq = %d, want %d", index, got, want)
|
|
}
|
|
if got, want := string(decoded[index].Payload), string(frames[index].Payload); got != want {
|
|
t.Fatalf("frame %d payload = %q, want %q", index, got, want)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestBulkBatchSenderEncodeRequestsCoalescesSharedFastV2Frames(t *testing.T) {
|
|
var (
|
|
singleCalls int
|
|
batchCalls [][]bulkFastFrame
|
|
)
|
|
sender := &bulkBatchSender{
|
|
codec: bulkBatchCodec{
|
|
encodeSingle: func(frame bulkFastFrame) ([]byte, func(), error) {
|
|
singleCalls++
|
|
return []byte("single"), nil, nil
|
|
},
|
|
encodeBatch: func(frames []bulkFastFrame) ([]byte, func(), error) {
|
|
cloned := make([]bulkFastFrame, len(frames))
|
|
copy(cloned, frames)
|
|
batchCalls = append(batchCalls, cloned)
|
|
return []byte("batch"), nil, nil
|
|
},
|
|
},
|
|
}
|
|
payloads, err := sender.encodeRequests([]bulkBatchRequest{
|
|
{
|
|
frames: []bulkFastFrame{{
|
|
Type: bulkFastPayloadTypeData,
|
|
DataID: 101,
|
|
Seq: 1,
|
|
Payload: []byte("a"),
|
|
}},
|
|
fastPathVersion: bulkFastPathVersionV2,
|
|
},
|
|
{
|
|
frames: []bulkFastFrame{{
|
|
Type: bulkFastPayloadTypeRelease,
|
|
DataID: 101,
|
|
Seq: 0,
|
|
Payload: []byte("rel"),
|
|
}},
|
|
fastPathVersion: bulkFastPathVersionV2,
|
|
},
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("encodeRequests failed: %v", err)
|
|
}
|
|
if got, want := len(payloads), 1; got != want {
|
|
t.Fatalf("payload count = %d, want %d", got, want)
|
|
}
|
|
if singleCalls != 0 {
|
|
t.Fatalf("single encode calls = %d, want 0", singleCalls)
|
|
}
|
|
if got, want := len(batchCalls), 1; got != want {
|
|
t.Fatalf("batch encode calls = %d, want %d", got, want)
|
|
}
|
|
if got, want := len(batchCalls[0]), 2; got != want {
|
|
t.Fatalf("batched item count = %d, want %d", got, want)
|
|
}
|
|
}
|
|
|
|
func TestBulkBatchSenderEncodeRequestsCoalescesAcrossRequestsAndBulks(t *testing.T) {
|
|
var (
|
|
singleCalls int
|
|
batchCalls [][]bulkFastFrame
|
|
)
|
|
sender := &bulkBatchSender{
|
|
codec: bulkBatchCodec{
|
|
encodeSingle: func(frame bulkFastFrame) ([]byte, func(), error) {
|
|
singleCalls++
|
|
return []byte("single"), nil, nil
|
|
},
|
|
encodeBatch: func(frames []bulkFastFrame) ([]byte, func(), error) {
|
|
cloned := make([]bulkFastFrame, len(frames))
|
|
copy(cloned, frames)
|
|
batchCalls = append(batchCalls, cloned)
|
|
return []byte("batch"), nil, nil
|
|
},
|
|
},
|
|
}
|
|
payloads, err := sender.encodeRequests([]bulkBatchRequest{
|
|
{
|
|
frames: []bulkFastFrame{{
|
|
Type: bulkFastPayloadTypeData,
|
|
DataID: 101,
|
|
Seq: 1,
|
|
Payload: []byte("bulk-a"),
|
|
}},
|
|
fastPathVersion: bulkFastPathVersionV2,
|
|
},
|
|
{
|
|
frames: []bulkFastFrame{{
|
|
Type: bulkFastPayloadTypeData,
|
|
DataID: 202,
|
|
Seq: 1,
|
|
Payload: []byte("bulk-b"),
|
|
}},
|
|
fastPathVersion: bulkFastPathVersionV2,
|
|
},
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("encodeRequests failed: %v", err)
|
|
}
|
|
if got, want := len(payloads), 1; got != want {
|
|
t.Fatalf("payload count = %d, want %d", got, want)
|
|
}
|
|
if singleCalls != 0 {
|
|
t.Fatalf("single encode calls = %d, want 0", singleCalls)
|
|
}
|
|
if got, want := len(batchCalls), 1; got != want {
|
|
t.Fatalf("batch encode calls = %d, want %d", got, want)
|
|
}
|
|
if got, want := len(batchCalls[0]), 2; got != want {
|
|
t.Fatalf("batched item count = %d, want %d", got, want)
|
|
}
|
|
if got, want := batchCalls[0][0].DataID, uint64(101); got != want {
|
|
t.Fatalf("first batched dataID = %d, want %d", got, want)
|
|
}
|
|
if got, want := batchCalls[0][1].DataID, uint64(202); got != want {
|
|
t.Fatalf("second batched dataID = %d, want %d", got, want)
|
|
}
|
|
}
|
|
|
|
func TestBulkBatchSenderEncodeRequestsSplitsLargeCrossBulkSuperBatch(t *testing.T) {
|
|
var batchCalls [][]bulkFastFrame
|
|
sender := &bulkBatchSender{
|
|
codec: bulkBatchCodec{
|
|
encodeSingle: func(frame bulkFastFrame) ([]byte, func(), error) {
|
|
return []byte("single"), nil, nil
|
|
},
|
|
encodeBatch: func(frames []bulkFastFrame) ([]byte, func(), error) {
|
|
cloned := make([]bulkFastFrame, len(frames))
|
|
copy(cloned, frames)
|
|
batchCalls = append(batchCalls, cloned)
|
|
return []byte("batch"), nil, nil
|
|
},
|
|
},
|
|
}
|
|
payload := make([]byte, 768*1024)
|
|
payloads, err := sender.encodeRequests([]bulkBatchRequest{
|
|
{
|
|
frames: []bulkFastFrame{
|
|
{
|
|
Type: bulkFastPayloadTypeData,
|
|
DataID: 101,
|
|
Seq: 1,
|
|
Payload: payload,
|
|
},
|
|
{
|
|
Type: bulkFastPayloadTypeData,
|
|
DataID: 101,
|
|
Seq: 2,
|
|
Payload: payload,
|
|
},
|
|
},
|
|
fastPathVersion: bulkFastPathVersionV2,
|
|
},
|
|
{
|
|
frames: []bulkFastFrame{
|
|
{
|
|
Type: bulkFastPayloadTypeData,
|
|
DataID: 202,
|
|
Seq: 1,
|
|
Payload: payload,
|
|
},
|
|
{
|
|
Type: bulkFastPayloadTypeData,
|
|
DataID: 202,
|
|
Seq: 2,
|
|
Payload: payload,
|
|
},
|
|
},
|
|
fastPathVersion: bulkFastPathVersionV2,
|
|
},
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("encodeRequests failed: %v", err)
|
|
}
|
|
if got, want := len(payloads), 2; got != want {
|
|
t.Fatalf("payload count = %d, want %d", got, want)
|
|
}
|
|
if got, want := len(batchCalls), 2; got != want {
|
|
t.Fatalf("batch encode calls = %d, want %d", got, want)
|
|
}
|
|
if got, want := len(batchCalls[0]), 2; got != want {
|
|
t.Fatalf("first payload frame count = %d, want %d", got, want)
|
|
}
|
|
if got, want := len(batchCalls[1]), 2; got != want {
|
|
t.Fatalf("second payload frame count = %d, want %d", got, want)
|
|
}
|
|
if got, want := batchCalls[0][0].DataID, uint64(101); got != want {
|
|
t.Fatalf("first payload dataID = %d, want %d", got, want)
|
|
}
|
|
if got, want := batchCalls[1][0].DataID, uint64(202); got != want {
|
|
t.Fatalf("second payload dataID = %d, want %d", got, want)
|
|
}
|
|
}
|
|
|
|
func TestBulkReleaseFastRoundTripTransport(t *testing.T) {
|
|
server := NewServer().(*ServerCommon)
|
|
if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
t.Fatalf("UseModernPSKServer failed: %v", err)
|
|
}
|
|
|
|
acceptCh := make(chan BulkAcceptInfo, 1)
|
|
server.SetBulkHandler(func(info BulkAcceptInfo) error {
|
|
acceptCh <- info
|
|
return nil
|
|
})
|
|
|
|
if err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
|
|
t.Fatalf("server Listen failed: %v", err)
|
|
}
|
|
defer func() {
|
|
_ = server.Stop()
|
|
}()
|
|
|
|
client := NewClient().(*ClientCommon)
|
|
if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
|
t.Fatalf("UseModernPSKClient failed: %v", err)
|
|
}
|
|
if err := client.Connect("tcp", server.listener.Addr().String()); err != nil {
|
|
t.Fatalf("client Connect failed: %v", err)
|
|
}
|
|
defer func() {
|
|
_ = client.Stop()
|
|
}()
|
|
|
|
const chunkSize = 64 * 1024
|
|
bulk, err := client.OpenBulk(context.Background(), BulkOpenOptions{
|
|
Range: BulkRange{Offset: 0, Length: chunkSize},
|
|
ChunkSize: chunkSize,
|
|
WindowBytes: chunkSize,
|
|
MaxInFlight: 1,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("client OpenBulk failed: %v", err)
|
|
}
|
|
|
|
accepted := waitAcceptedBulk(t, acceptCh, 2*time.Second)
|
|
clientHandle := bulk.(*bulkHandle)
|
|
serverHandle := accepted.Bulk.(*bulkHandle)
|
|
if got, want := clientHandle.FastPathVersion(), uint8(bulkFastPathVersionV2); got != want {
|
|
t.Fatalf("client fast path version = %d, want %d", got, want)
|
|
}
|
|
if got, want := serverHandle.FastPathVersion(), uint8(bulkFastPathVersionV2); got != want {
|
|
t.Fatalf("server fast path version = %d, want %d", got, want)
|
|
}
|
|
|
|
clientHandle.mu.Lock()
|
|
clientHandle.outboundAvailBytes = 0
|
|
clientHandle.outboundInFlight = 1
|
|
clientHandle.mu.Unlock()
|
|
|
|
releaseFn := serverBulkReleaseSender(server, accepted.LogicalConn, accepted.TransportConn)
|
|
if err := releaseFn(serverHandle, chunkSize, 1); err != nil {
|
|
t.Fatalf("server bulk release sender failed: %v", err)
|
|
}
|
|
|
|
deadline := time.Now().Add(2 * time.Second)
|
|
for time.Now().Before(deadline) {
|
|
clientHandle.mu.Lock()
|
|
avail := clientHandle.outboundAvailBytes
|
|
inFlight := clientHandle.outboundInFlight
|
|
clientHandle.mu.Unlock()
|
|
if avail == chunkSize && inFlight == 0 {
|
|
return
|
|
}
|
|
time.Sleep(10 * time.Millisecond)
|
|
}
|
|
|
|
clientHandle.mu.Lock()
|
|
avail := clientHandle.outboundAvailBytes
|
|
inFlight := clientHandle.outboundInFlight
|
|
clientHandle.mu.Unlock()
|
|
t.Fatalf("client outbound window not released by fast path: avail=%d inFlight=%d", avail, inFlight)
|
|
}
|
|
|
|
func TestBulkBatchSenderEncodeRequestsResetsBatchBytesAfterFlushBoundary(t *testing.T) {
|
|
var (
|
|
singleCalls int
|
|
batchCalls [][]bulkFastFrame
|
|
)
|
|
sender := &bulkBatchSender{
|
|
codec: bulkBatchCodec{
|
|
encodeSingle: func(frame bulkFastFrame) ([]byte, func(), error) {
|
|
singleCalls++
|
|
return []byte("single"), nil, nil
|
|
},
|
|
encodeBatch: func(frames []bulkFastFrame) ([]byte, func(), error) {
|
|
cloned := make([]bulkFastFrame, len(frames))
|
|
copy(cloned, frames)
|
|
batchCalls = append(batchCalls, cloned)
|
|
return []byte("batch"), nil, nil
|
|
},
|
|
},
|
|
}
|
|
largePayload := make([]byte, bulkFastBatchMaxPlainBytes-bulkFastBatchHeaderLen-bulkFastBatchItemHeaderLen-128)
|
|
payloads, err := sender.encodeRequests([]bulkBatchRequest{
|
|
{
|
|
frames: []bulkFastFrame{{
|
|
Type: bulkFastPayloadTypeData,
|
|
DataID: 101,
|
|
Seq: 1,
|
|
Payload: largePayload,
|
|
}},
|
|
fastPathVersion: bulkFastPathVersionV2,
|
|
},
|
|
{
|
|
frames: []bulkFastFrame{{
|
|
Type: bulkFastPayloadTypeData,
|
|
DataID: 101,
|
|
Seq: 2,
|
|
Payload: []byte("sep"),
|
|
}},
|
|
fastPathVersion: bulkFastPathVersionV1,
|
|
},
|
|
{
|
|
frames: []bulkFastFrame{{
|
|
Type: bulkFastPayloadTypeData,
|
|
DataID: 202,
|
|
Seq: 1,
|
|
Payload: []byte("a"),
|
|
}},
|
|
fastPathVersion: bulkFastPathVersionV2,
|
|
},
|
|
{
|
|
frames: []bulkFastFrame{{
|
|
Type: bulkFastPayloadTypeData,
|
|
DataID: 202,
|
|
Seq: 2,
|
|
Payload: []byte("b"),
|
|
}},
|
|
fastPathVersion: bulkFastPathVersionV2,
|
|
},
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("encodeRequests failed: %v", err)
|
|
}
|
|
if got, want := len(payloads), 3; got != want {
|
|
t.Fatalf("payload count = %d, want %d", got, want)
|
|
}
|
|
if got, want := singleCalls, 2; got != want {
|
|
t.Fatalf("single encode calls = %d, want %d", got, want)
|
|
}
|
|
if got, want := len(batchCalls), 1; got != want {
|
|
t.Fatalf("batch encode calls = %d, want %d", got, want)
|
|
}
|
|
if got, want := len(batchCalls[0]), 2; got != want {
|
|
t.Fatalf("post-flush batched frame count = %d, want %d", got, want)
|
|
}
|
|
}
|
|
|
|
func TestBulkBatchSenderEncodeRequestsUsesBindingAdaptiveSoftLimit(t *testing.T) {
|
|
binding := &transportBinding{}
|
|
binding.observeBulkAdaptivePayloadWrite(8*1024*1024, 640*time.Millisecond, 0, nil)
|
|
var batchCalls [][]bulkFastFrame
|
|
sender := &bulkBatchSender{
|
|
binding: binding,
|
|
codec: bulkBatchCodec{
|
|
encodeSingle: func(frame bulkFastFrame) ([]byte, func(), error) {
|
|
return []byte("single"), nil, nil
|
|
},
|
|
encodeBatch: func(frames []bulkFastFrame) ([]byte, func(), error) {
|
|
cloned := make([]bulkFastFrame, len(frames))
|
|
copy(cloned, frames)
|
|
batchCalls = append(batchCalls, cloned)
|
|
return []byte("batch"), nil, nil
|
|
},
|
|
},
|
|
}
|
|
payload := make([]byte, 96*1024)
|
|
payloads, err := sender.encodeRequests([]bulkBatchRequest{
|
|
{
|
|
frames: []bulkFastFrame{
|
|
{Type: bulkFastPayloadTypeData, DataID: 101, Seq: 1, Payload: payload},
|
|
{Type: bulkFastPayloadTypeData, DataID: 101, Seq: 2, Payload: payload},
|
|
},
|
|
fastPathVersion: bulkFastPathVersionV2,
|
|
},
|
|
{
|
|
frames: []bulkFastFrame{
|
|
{Type: bulkFastPayloadTypeData, DataID: 202, Seq: 1, Payload: payload},
|
|
{Type: bulkFastPayloadTypeData, DataID: 202, Seq: 2, Payload: payload},
|
|
},
|
|
fastPathVersion: bulkFastPathVersionV2,
|
|
},
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("encodeRequests failed: %v", err)
|
|
}
|
|
if got, want := len(payloads), 2; got != want {
|
|
t.Fatalf("payload count = %d, want %d", got, want)
|
|
}
|
|
if got, want := len(batchCalls), 2; got != want {
|
|
t.Fatalf("batch encode calls = %d, want %d", got, want)
|
|
}
|
|
for index, want := range []uint64{101, 202} {
|
|
if got := batchCalls[index][0].DataID; got != want {
|
|
t.Fatalf("payload %d first dataID = %d, want %d", index, got, want)
|
|
}
|
|
if got, wantItems := len(batchCalls[index]), 2; got != wantItems {
|
|
t.Fatalf("payload %d item count = %d, want %d", index, got, wantItems)
|
|
}
|
|
}
|
|
}
|