108 lines
2.4 KiB
Go
108 lines
2.4 KiB
Go
|
|
package notify
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"sync"
|
||
|
|
"testing"
|
||
|
|
"time"
|
||
|
|
)
|
||
|
|
|
||
|
|
func TestStreamFlowControllerBlocksUntilWindowReleased(t *testing.T) {
|
||
|
|
controller := newStreamFlowController(streamConfig{
|
||
|
|
ChunkSize: 4,
|
||
|
|
InboundQueueLimit: 1,
|
||
|
|
InboundBufferedBytesLimit: 4,
|
||
|
|
OutboundWindowBytes: 4,
|
||
|
|
OutboundMaxInFlightChunks: 1,
|
||
|
|
})
|
||
|
|
|
||
|
|
releaseFirst, err := controller.acquire(context.Background(), 4)
|
||
|
|
if err != nil {
|
||
|
|
t.Fatalf("first acquire failed: %v", err)
|
||
|
|
}
|
||
|
|
defer releaseFirst()
|
||
|
|
|
||
|
|
secondDone := make(chan error, 1)
|
||
|
|
go func() {
|
||
|
|
releaseSecond, err := controller.acquire(context.Background(), 4)
|
||
|
|
if err == nil && releaseSecond != nil {
|
||
|
|
releaseSecond()
|
||
|
|
}
|
||
|
|
secondDone <- err
|
||
|
|
}()
|
||
|
|
|
||
|
|
select {
|
||
|
|
case err := <-secondDone:
|
||
|
|
t.Fatalf("second acquire should block, got err = %v", err)
|
||
|
|
case <-time.After(80 * time.Millisecond):
|
||
|
|
}
|
||
|
|
|
||
|
|
releaseFirst()
|
||
|
|
|
||
|
|
select {
|
||
|
|
case err := <-secondDone:
|
||
|
|
if err != nil {
|
||
|
|
t.Fatalf("second acquire failed after release: %v", err)
|
||
|
|
}
|
||
|
|
case <-time.After(time.Second):
|
||
|
|
t.Fatal("timed out waiting for second acquire")
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestStreamFlowControllerAdmitsRequestsFIFO(t *testing.T) {
|
||
|
|
controller := newStreamFlowController(streamConfig{
|
||
|
|
ChunkSize: 4,
|
||
|
|
InboundQueueLimit: 1,
|
||
|
|
InboundBufferedBytesLimit: 4,
|
||
|
|
OutboundWindowBytes: 4,
|
||
|
|
OutboundMaxInFlightChunks: 1,
|
||
|
|
})
|
||
|
|
|
||
|
|
releaseFirst, err := controller.acquire(context.Background(), 4)
|
||
|
|
if err != nil {
|
||
|
|
t.Fatalf("first acquire failed: %v", err)
|
||
|
|
}
|
||
|
|
|
||
|
|
orderCh := make(chan int, 2)
|
||
|
|
releaseCh := make(chan struct{})
|
||
|
|
startAcquire := func(id int, start <-chan struct{}, wg *sync.WaitGroup) {
|
||
|
|
wg.Add(1)
|
||
|
|
go func() {
|
||
|
|
defer wg.Done()
|
||
|
|
<-start
|
||
|
|
release, err := controller.acquire(context.Background(), 4)
|
||
|
|
if err != nil {
|
||
|
|
t.Errorf("acquire %d failed: %v", id, err)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
orderCh <- id
|
||
|
|
<-releaseCh
|
||
|
|
release()
|
||
|
|
}()
|
||
|
|
}
|
||
|
|
|
||
|
|
var wg sync.WaitGroup
|
||
|
|
startSecond := make(chan struct{})
|
||
|
|
startThird := make(chan struct{})
|
||
|
|
startAcquire(2, startSecond, &wg)
|
||
|
|
startAcquire(3, startThird, &wg)
|
||
|
|
|
||
|
|
close(startSecond)
|
||
|
|
time.Sleep(20 * time.Millisecond)
|
||
|
|
close(startThird)
|
||
|
|
time.Sleep(50 * time.Millisecond)
|
||
|
|
releaseFirst()
|
||
|
|
|
||
|
|
first := <-orderCh
|
||
|
|
if first != 2 {
|
||
|
|
t.Fatalf("first admitted request = %d, want 2", first)
|
||
|
|
}
|
||
|
|
close(releaseCh)
|
||
|
|
wg.Wait()
|
||
|
|
|
||
|
|
second := <-orderCh
|
||
|
|
if second != 3 {
|
||
|
|
t.Fatalf("second admitted request = %d, want 3", second)
|
||
|
|
}
|
||
|
|
}
|