- make stream fast path honor adaptive soft payload limits end-to-end - split oversized fast-stream payloads into sequential frames before batching - use adaptive soft cap when encoding stream batch payloads - move timeout-like error detection into production code for adaptive tx - tune notify FrameReader read size explicitly to avoid throughput regression - drop local stario replace and depend on released b612.me/stario v0.1.1
161 lines
6.5 KiB
Go
161 lines
6.5 KiB
Go
package notify
|
|
|
|
import (
|
|
"b612.me/stario"
|
|
"net"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
func TestTransportBindingAdaptiveBulkSoftPayloadStartsAggressive(t *testing.T) {
|
|
binding := &transportBinding{}
|
|
if got, want := binding.bulkAdaptiveSoftPayloadBytesSnapshot(), bulkAdaptiveSoftPayloadStartBytes; got != want {
|
|
t.Fatalf("adaptive bulk soft payload = %d, want %d", got, want)
|
|
}
|
|
}
|
|
|
|
func TestTransportBindingAdaptiveBulkSoftPayloadShrinksAfterSlowWrite(t *testing.T) {
|
|
binding := &transportBinding{}
|
|
binding.observeBulkAdaptivePayloadWrite(8*1024*1024, 640*time.Millisecond, 0, nil)
|
|
if got, want := binding.bulkAdaptiveSoftPayloadBytesSnapshot(), bulkAdaptiveSoftPayloadMinBytes; got != want {
|
|
t.Fatalf("adaptive bulk soft payload = %d, want %d", got, want)
|
|
}
|
|
}
|
|
|
|
func TestTransportBindingAdaptiveBulkSoftPayloadRecoversAfterGoodWrites(t *testing.T) {
|
|
binding := &transportBinding{}
|
|
binding.observeBulkAdaptivePayloadWrite(8*1024*1024, 640*time.Millisecond, 0, nil)
|
|
samples := bulkAdaptiveSoftPayloadGrowSuccesses * (len(bulkAdaptiveSoftPayloadSteps) - 1)
|
|
for i := 0; i < samples; i++ {
|
|
binding.observeBulkAdaptivePayloadWrite(8*1024*1024, 6*time.Millisecond, 0, nil)
|
|
}
|
|
if got, want := binding.bulkAdaptiveSoftPayloadBytesSnapshot(), bulkAdaptiveSoftPayloadStartBytes; got != want {
|
|
t.Fatalf("adaptive bulk soft payload = %d, want %d", got, want)
|
|
}
|
|
}
|
|
|
|
type delayedWriteConn struct {
|
|
delay time.Duration
|
|
}
|
|
|
|
func (c *delayedWriteConn) Read([]byte) (int, error) { return 0, net.ErrClosed }
|
|
func (c *delayedWriteConn) Write(p []byte) (int, error) { time.Sleep(c.delay); return len(p), nil }
|
|
func (c *delayedWriteConn) Close() error { return nil }
|
|
func (c *delayedWriteConn) LocalAddr() net.Addr { return nil }
|
|
func (c *delayedWriteConn) RemoteAddr() net.Addr { return nil }
|
|
func (c *delayedWriteConn) SetDeadline(time.Time) error { return nil }
|
|
func (c *delayedWriteConn) SetReadDeadline(time.Time) error { return nil }
|
|
func (c *delayedWriteConn) SetWriteDeadline(time.Time) error { return nil }
|
|
|
|
func TestBulkBatchSenderFlushAdaptsToSlowWrites(t *testing.T) {
|
|
binding := newTransportBinding(&delayedWriteConn{delay: 20 * time.Millisecond}, stario.NewQueue())
|
|
sender := newTestBulkBatchSender(binding)
|
|
defer sender.stop()
|
|
|
|
payload := make([]byte, 512*1024)
|
|
err := sender.flush([]bulkBatchRequest{
|
|
{
|
|
frames: []bulkFastFrame{
|
|
{Type: bulkFastPayloadTypeData, DataID: 101, Seq: 1, Payload: payload},
|
|
{Type: bulkFastPayloadTypeData, DataID: 101, Seq: 2, Payload: payload},
|
|
},
|
|
fastPathVersion: bulkFastPathVersionV2,
|
|
},
|
|
{
|
|
frames: []bulkFastFrame{
|
|
{Type: bulkFastPayloadTypeData, DataID: 202, Seq: 1, Payload: payload},
|
|
{Type: bulkFastPayloadTypeData, DataID: 202, Seq: 2, Payload: payload},
|
|
},
|
|
fastPathVersion: bulkFastPathVersionV2,
|
|
},
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("flush failed: %v", err)
|
|
}
|
|
if got := binding.bulkAdaptiveSoftPayloadBytesSnapshot(); got >= bulkAdaptiveSoftPayloadStartBytes {
|
|
t.Fatalf("adaptive bulk soft payload = %d, want smaller than %d after slow write", got, bulkAdaptiveSoftPayloadStartBytes)
|
|
}
|
|
}
|
|
|
|
func TestTransportBindingAdaptiveStreamStartsAggressive(t *testing.T) {
|
|
binding := &transportBinding{}
|
|
if got, want := binding.streamAdaptiveSoftPayloadBytesSnapshot(), streamAdaptiveSoftPayloadStartBytes; got != want {
|
|
t.Fatalf("adaptive stream soft payload = %d, want %d", got, want)
|
|
}
|
|
if got, want := binding.streamAdaptiveWaitThresholdBytesSnapshot(), streamBatchWaitThreshold; got != want {
|
|
t.Fatalf("adaptive stream wait threshold = %d, want %d", got, want)
|
|
}
|
|
if got, want := binding.streamAdaptiveFlushDelaySnapshot(), streamBatchMaxFlushDelay; got != want {
|
|
t.Fatalf("adaptive stream flush delay = %s, want %s", got, want)
|
|
}
|
|
}
|
|
|
|
func TestTransportBindingAdaptiveStreamShrinksAfterSlowWrite(t *testing.T) {
|
|
binding := &transportBinding{}
|
|
binding.observeStreamAdaptivePayloadWrite(2*1024*1024, 640*time.Millisecond, 0, nil)
|
|
if got, want := binding.streamAdaptiveSoftPayloadBytesSnapshot(), streamAdaptiveSoftPayloadMinBytes; got != want {
|
|
t.Fatalf("adaptive stream soft payload = %d, want %d", got, want)
|
|
}
|
|
if got, want := binding.streamAdaptiveWaitThresholdBytesSnapshot(), streamAdaptiveWaitThresholdMinBytes; got != want {
|
|
t.Fatalf("adaptive stream wait threshold = %d, want %d", got, want)
|
|
}
|
|
if got := binding.streamAdaptiveFlushDelaySnapshot(); got != 0 {
|
|
t.Fatalf("adaptive stream flush delay = %s, want 0", got)
|
|
}
|
|
}
|
|
|
|
func TestTransportBindingAdaptiveStreamRecoversAfterGoodWrites(t *testing.T) {
|
|
binding := &transportBinding{}
|
|
binding.observeStreamAdaptivePayloadWrite(2*1024*1024, 640*time.Millisecond, 0, nil)
|
|
samples := streamAdaptiveSoftPayloadGrowSuccesses * (len(streamAdaptiveSoftPayloadSteps) - 1)
|
|
for i := 0; i < samples; i++ {
|
|
binding.observeStreamAdaptivePayloadWrite(2*1024*1024, 6*time.Millisecond, 0, nil)
|
|
}
|
|
if got, want := binding.streamAdaptiveSoftPayloadBytesSnapshot(), streamAdaptiveSoftPayloadStartBytes; got != want {
|
|
t.Fatalf("adaptive stream soft payload = %d, want %d", got, want)
|
|
}
|
|
if got, want := binding.streamAdaptiveWaitThresholdBytesSnapshot(), streamBatchWaitThreshold; got != want {
|
|
t.Fatalf("adaptive stream wait threshold = %d, want %d", got, want)
|
|
}
|
|
if got, want := binding.streamAdaptiveFlushDelaySnapshot(), streamBatchMaxFlushDelay; got != want {
|
|
t.Fatalf("adaptive stream flush delay = %s, want %s", got, want)
|
|
}
|
|
}
|
|
|
|
func TestStreamBatchSenderFlushAdaptsToSlowWrites(t *testing.T) {
|
|
binding := newTransportBinding(&delayedWriteConn{delay: 20 * time.Millisecond}, stario.NewQueue())
|
|
sender := newTestStreamBatchSender(binding, nil)
|
|
defer sender.stop()
|
|
|
|
payload := make([]byte, 512*1024)
|
|
err := sender.flush([]streamBatchRequest{
|
|
{
|
|
frame: streamFastDataFrame{
|
|
DataID: 101,
|
|
Seq: 1,
|
|
Payload: payload,
|
|
},
|
|
hasFrame: true,
|
|
fastPathVersion: streamFastPathVersionV2,
|
|
},
|
|
{
|
|
frame: streamFastDataFrame{
|
|
DataID: 202,
|
|
Seq: 1,
|
|
Payload: payload,
|
|
},
|
|
hasFrame: true,
|
|
fastPathVersion: streamFastPathVersionV2,
|
|
},
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("flush failed: %v", err)
|
|
}
|
|
if got := binding.streamAdaptiveSoftPayloadBytesSnapshot(); got >= streamAdaptiveSoftPayloadStartBytes {
|
|
t.Fatalf("adaptive stream soft payload = %d, want smaller than %d after slow write", got, streamAdaptiveSoftPayloadStartBytes)
|
|
}
|
|
if got := binding.streamAdaptiveWaitThresholdBytesSnapshot(); got >= streamBatchWaitThreshold {
|
|
t.Fatalf("adaptive stream wait threshold = %d, want smaller than %d after slow write", got, streamBatchWaitThreshold)
|
|
}
|
|
}
|