notify/stream_flow_test.go

156 lines
3.5 KiB
Go
Raw Permalink Normal View History

package notify
import (
"context"
"sync"
"testing"
"time"
)
func TestStreamFlowControllerBlocksUntilWindowReleased(t *testing.T) {
controller := newStreamFlowController(streamConfig{
ChunkSize: 4,
InboundQueueLimit: 1,
InboundBufferedBytesLimit: 4,
OutboundWindowBytes: 4,
OutboundMaxInFlightChunks: 1,
})
releaseFirst, err := controller.acquire(context.Background(), 4)
if err != nil {
t.Fatalf("first acquire failed: %v", err)
}
defer releaseFirst()
secondDone := make(chan error, 1)
go func() {
releaseSecond, err := controller.acquire(context.Background(), 4)
if err == nil && releaseSecond != nil {
releaseSecond()
}
secondDone <- err
}()
select {
case err := <-secondDone:
t.Fatalf("second acquire should block, got err = %v", err)
case <-time.After(80 * time.Millisecond):
}
releaseFirst()
select {
case err := <-secondDone:
if err != nil {
t.Fatalf("second acquire failed after release: %v", err)
}
case <-time.After(time.Second):
t.Fatal("timed out waiting for second acquire")
}
}
func TestStreamFlowControllerAdmitsRequestsFIFO(t *testing.T) {
controller := newStreamFlowController(streamConfig{
ChunkSize: 4,
InboundQueueLimit: 1,
InboundBufferedBytesLimit: 4,
OutboundWindowBytes: 4,
OutboundMaxInFlightChunks: 1,
})
releaseFirst, err := controller.acquire(context.Background(), 4)
if err != nil {
t.Fatalf("first acquire failed: %v", err)
}
orderCh := make(chan int, 2)
releaseCh := make(chan struct{})
startAcquire := func(id int, start <-chan struct{}, wg *sync.WaitGroup) {
wg.Add(1)
go func() {
defer wg.Done()
<-start
release, err := controller.acquire(context.Background(), 4)
if err != nil {
t.Errorf("acquire %d failed: %v", id, err)
return
}
orderCh <- id
<-releaseCh
release()
}()
}
var wg sync.WaitGroup
startSecond := make(chan struct{})
startThird := make(chan struct{})
startAcquire(2, startSecond, &wg)
startAcquire(3, startThird, &wg)
close(startSecond)
time.Sleep(20 * time.Millisecond)
close(startThird)
time.Sleep(50 * time.Millisecond)
releaseFirst()
first := <-orderCh
if first != 2 {
t.Fatalf("first admitted request = %d, want 2", first)
}
close(releaseCh)
wg.Wait()
second := <-orderCh
if second != 3 {
t.Fatalf("second admitted request = %d, want 3", second)
}
}
func TestStreamFlowControllerTryAcquireDoesNotBypassQueuedWaiter(t *testing.T) {
controller := newStreamFlowController(streamConfig{
ChunkSize: 4,
InboundQueueLimit: 1,
InboundBufferedBytesLimit: 4,
OutboundWindowBytes: 4,
OutboundMaxInFlightChunks: 1,
})
releaseFirst, err := controller.acquire(context.Background(), 4)
if err != nil {
t.Fatalf("first acquire failed: %v", err)
}
defer releaseFirst()
waiterReady := make(chan struct{})
waiterAcquired := make(chan func(), 1)
go func() {
close(waiterReady)
releaseSecond, err := controller.acquire(context.Background(), 4)
if err != nil {
t.Errorf("waiter acquire failed: %v", err)
return
}
waiterAcquired <- releaseSecond
}()
<-waiterReady
time.Sleep(20 * time.Millisecond)
if controller.tryAcquire(4) {
t.Fatal("tryAcquire should not bypass queued waiter")
}
releaseFirst()
var releaseSecond func()
select {
case releaseSecond = <-waiterAcquired:
case <-time.After(time.Second):
t.Fatal("timed out waiting for queued waiter to acquire")
}
if releaseSecond == nil {
t.Fatal("queued waiter returned nil release func")
}
releaseSecond()
}