// notify/transport_binding_adaptive_test.go
//
// 161 lines
// 6.5 KiB
// Go
// Raw Permalink Normal View History

package notify
import (
"b612.me/stario"
"net"
"testing"
"time"
)
// TestTransportBindingAdaptiveBulkSoftPayloadStartsAggressive checks that a
// zero-value transportBinding, before any write has been observed, reports
// bulkAdaptiveSoftPayloadStartBytes as its bulk soft payload snapshot.
func TestTransportBindingAdaptiveBulkSoftPayloadStartsAggressive(t *testing.T) {
	fresh := new(transportBinding)

	got := fresh.bulkAdaptiveSoftPayloadBytesSnapshot()
	want := bulkAdaptiveSoftPayloadStartBytes
	if got == want {
		return
	}
	t.Fatalf("adaptive bulk soft payload = %d, want %d", got, want)
}
// TestTransportBindingAdaptiveBulkSoftPayloadShrinksAfterSlowWrite feeds one
// observation into a zero-value transportBinding and expects the bulk soft
// payload snapshot to equal bulkAdaptiveSoftPayloadMinBytes afterwards.
func TestTransportBindingAdaptiveBulkSoftPayloadShrinksAfterSlowWrite(t *testing.T) {
	// NOTE(review): the first two arguments are presumably the payload size
	// (8 MiB) and the write latency — confirm against
	// observeBulkAdaptivePayloadWrite.
	const slowWrite = 640 * time.Millisecond

	b := new(transportBinding)
	b.observeBulkAdaptivePayloadWrite(8<<20, slowWrite, 0, nil)

	got := b.bulkAdaptiveSoftPayloadBytesSnapshot()
	want := bulkAdaptiveSoftPayloadMinBytes
	if got == want {
		return
	}
	t.Fatalf("adaptive bulk soft payload = %d, want %d", got, want)
}
// TestTransportBindingAdaptiveBulkSoftPayloadRecoversAfterGoodWrites records
// one 640ms observation followed by a run of 6ms observations, then expects
// the bulk soft payload snapshot to be back at
// bulkAdaptiveSoftPayloadStartBytes.
func TestTransportBindingAdaptiveBulkSoftPayloadRecoversAfterGoodWrites(t *testing.T) {
	const (
		slowWrite = 640 * time.Millisecond
		fastWrite = 6 * time.Millisecond
	)

	b := new(transportBinding)
	b.observeBulkAdaptivePayloadWrite(8<<20, slowWrite, 0, nil)

	// Number of fast observations: bulkAdaptiveSoftPayloadGrowSuccesses for
	// every entry of bulkAdaptiveSoftPayloadSteps except one.
	stepsToClimb := len(bulkAdaptiveSoftPayloadSteps) - 1
	for remaining := bulkAdaptiveSoftPayloadGrowSuccesses * stepsToClimb; remaining > 0; remaining-- {
		b.observeBulkAdaptivePayloadWrite(8<<20, fastWrite, 0, nil)
	}

	got := b.bulkAdaptiveSoftPayloadBytesSnapshot()
	want := bulkAdaptiveSoftPayloadStartBytes
	if got == want {
		return
	}
	t.Fatalf("adaptive bulk soft payload = %d, want %d", got, want)
}
// delayedWriteConn is a test double with the net.Conn method set whose Write
// sleeps for a configurable duration before reporting success. Every other
// method is a stub.
type delayedWriteConn struct {
	// delay is how long each Write call sleeps before returning.
	delay time.Duration
}

// Read never yields data; it always fails with net.ErrClosed.
func (*delayedWriteConn) Read([]byte) (int, error) {
	return 0, net.ErrClosed
}

// Write sleeps for the configured delay and then reports that all of p was
// written. The bytes themselves are discarded.
func (c *delayedWriteConn) Write(p []byte) (int, error) {
	time.Sleep(c.delay)
	return len(p), nil
}

// Close is a no-op that always succeeds.
func (*delayedWriteConn) Close() error {
	return nil
}

// LocalAddr returns nil; this stub has no local address.
func (*delayedWriteConn) LocalAddr() net.Addr {
	return nil
}

// RemoteAddr returns nil; this stub has no remote address.
func (*delayedWriteConn) RemoteAddr() net.Addr {
	return nil
}

// SetDeadline ignores the deadline and always succeeds.
func (*delayedWriteConn) SetDeadline(time.Time) error {
	return nil
}

// SetReadDeadline ignores the deadline and always succeeds.
func (*delayedWriteConn) SetReadDeadline(time.Time) error {
	return nil
}

// SetWriteDeadline ignores the deadline and always succeeds.
func (*delayedWriteConn) SetWriteDeadline(time.Time) error {
	return nil
}
func TestBulkBatchSenderFlushAdaptsToSlowWrites(t *testing.T) {
binding := newTransportBinding(&delayedWriteConn{delay: 20 * time.Millisecond}, stario.NewQueue())
sender := newTestBulkBatchSender(binding)
defer sender.stop()
payload := make([]byte, 512*1024)
err := sender.flush([]bulkBatchRequest{
{
frames: []bulkFastFrame{
{Type: bulkFastPayloadTypeData, DataID: 101, Seq: 1, Payload: payload},
{Type: bulkFastPayloadTypeData, DataID: 101, Seq: 2, Payload: payload},
},
fastPathVersion: bulkFastPathVersionV2,
},
{
frames: []bulkFastFrame{
{Type: bulkFastPayloadTypeData, DataID: 202, Seq: 1, Payload: payload},
{Type: bulkFastPayloadTypeData, DataID: 202, Seq: 2, Payload: payload},
},
fastPathVersion: bulkFastPathVersionV2,
},
})
if err != nil {
t.Fatalf("flush failed: %v", err)
}
if got := binding.bulkAdaptiveSoftPayloadBytesSnapshot(); got >= bulkAdaptiveSoftPayloadStartBytes {
t.Fatalf("adaptive bulk soft payload = %d, want smaller than %d after slow write", got, bulkAdaptiveSoftPayloadStartBytes)
}
}
// TestTransportBindingAdaptiveStreamStartsAggressive checks the three stream
// snapshots of a zero-value transportBinding: soft payload must equal
// streamAdaptiveSoftPayloadStartBytes, wait threshold must equal
// streamBatchWaitThreshold, and flush delay must equal
// streamBatchMaxFlushDelay.
func TestTransportBindingAdaptiveStreamStartsAggressive(t *testing.T) {
	fresh := new(transportBinding)

	payloadGot := fresh.streamAdaptiveSoftPayloadBytesSnapshot()
	payloadWant := streamAdaptiveSoftPayloadStartBytes
	if payloadGot != payloadWant {
		t.Fatalf("adaptive stream soft payload = %d, want %d", payloadGot, payloadWant)
	}

	thresholdGot := fresh.streamAdaptiveWaitThresholdBytesSnapshot()
	thresholdWant := streamBatchWaitThreshold
	if thresholdGot != thresholdWant {
		t.Fatalf("adaptive stream wait threshold = %d, want %d", thresholdGot, thresholdWant)
	}

	delayGot := fresh.streamAdaptiveFlushDelaySnapshot()
	delayWant := streamBatchMaxFlushDelay
	if delayGot != delayWant {
		t.Fatalf("adaptive stream flush delay = %s, want %s", delayGot, delayWant)
	}
}
// TestTransportBindingAdaptiveStreamShrinksAfterSlowWrite feeds one
// observation into a zero-value transportBinding and expects the stream soft
// payload and wait threshold snapshots to equal their Min constants and the
// flush delay snapshot to be zero.
func TestTransportBindingAdaptiveStreamShrinksAfterSlowWrite(t *testing.T) {
	// NOTE(review): the first two arguments are presumably the payload size
	// (2 MiB) and the write latency — confirm against
	// observeStreamAdaptivePayloadWrite.
	const slowWrite = 640 * time.Millisecond

	b := new(transportBinding)
	b.observeStreamAdaptivePayloadWrite(2<<20, slowWrite, 0, nil)

	payloadGot := b.streamAdaptiveSoftPayloadBytesSnapshot()
	payloadWant := streamAdaptiveSoftPayloadMinBytes
	if payloadGot != payloadWant {
		t.Fatalf("adaptive stream soft payload = %d, want %d", payloadGot, payloadWant)
	}

	thresholdGot := b.streamAdaptiveWaitThresholdBytesSnapshot()
	thresholdWant := streamAdaptiveWaitThresholdMinBytes
	if thresholdGot != thresholdWant {
		t.Fatalf("adaptive stream wait threshold = %d, want %d", thresholdGot, thresholdWant)
	}

	flushDelay := b.streamAdaptiveFlushDelaySnapshot()
	if flushDelay != 0 {
		t.Fatalf("adaptive stream flush delay = %s, want 0", flushDelay)
	}
}
// TestTransportBindingAdaptiveStreamRecoversAfterGoodWrites records one 640ms
// observation followed by a run of 6ms observations, then expects all three
// stream snapshots (soft payload, wait threshold, flush delay) to be back at
// the values a fresh binding is expected to report.
func TestTransportBindingAdaptiveStreamRecoversAfterGoodWrites(t *testing.T) {
	const (
		slowWrite = 640 * time.Millisecond
		fastWrite = 6 * time.Millisecond
	)

	b := new(transportBinding)
	b.observeStreamAdaptivePayloadWrite(2<<20, slowWrite, 0, nil)

	// Number of fast observations: streamAdaptiveSoftPayloadGrowSuccesses for
	// every entry of streamAdaptiveSoftPayloadSteps except one.
	stepsToClimb := len(streamAdaptiveSoftPayloadSteps) - 1
	for remaining := streamAdaptiveSoftPayloadGrowSuccesses * stepsToClimb; remaining > 0; remaining-- {
		b.observeStreamAdaptivePayloadWrite(2<<20, fastWrite, 0, nil)
	}

	payloadGot := b.streamAdaptiveSoftPayloadBytesSnapshot()
	payloadWant := streamAdaptiveSoftPayloadStartBytes
	if payloadGot != payloadWant {
		t.Fatalf("adaptive stream soft payload = %d, want %d", payloadGot, payloadWant)
	}

	thresholdGot := b.streamAdaptiveWaitThresholdBytesSnapshot()
	thresholdWant := streamBatchWaitThreshold
	if thresholdGot != thresholdWant {
		t.Fatalf("adaptive stream wait threshold = %d, want %d", thresholdGot, thresholdWant)
	}

	delayGot := b.streamAdaptiveFlushDelaySnapshot()
	delayWant := streamBatchMaxFlushDelay
	if delayGot != delayWant {
		t.Fatalf("adaptive stream flush delay = %s, want %s", delayGot, delayWant)
	}
}
func TestStreamBatchSenderFlushAdaptsToSlowWrites(t *testing.T) {
binding := newTransportBinding(&delayedWriteConn{delay: 20 * time.Millisecond}, stario.NewQueue())
sender := newTestStreamBatchSender(binding, nil)
defer sender.stop()
payload := make([]byte, 512*1024)
err := sender.flush([]streamBatchRequest{
{
frame: streamFastDataFrame{
DataID: 101,
Seq: 1,
Payload: payload,
},
hasFrame: true,
fastPathVersion: streamFastPathVersionV2,
},
{
frame: streamFastDataFrame{
DataID: 202,
Seq: 1,
Payload: payload,
},
hasFrame: true,
fastPathVersion: streamFastPathVersionV2,
},
})
if err != nil {
t.Fatalf("flush failed: %v", err)
}
if got := binding.streamAdaptiveSoftPayloadBytesSnapshot(); got >= streamAdaptiveSoftPayloadStartBytes {
t.Fatalf("adaptive stream soft payload = %d, want smaller than %d after slow write", got, streamAdaptiveSoftPayloadStartBytes)
}
if got := binding.streamAdaptiveWaitThresholdBytesSnapshot(); got >= streamBatchWaitThreshold {
t.Fatalf("adaptive stream wait threshold = %d, want smaller than %d after slow write", got, streamBatchWaitThreshold)
}
}