package notify

import (
	"context"
	"sync"
	"testing"
	"time"
)

// TestStreamFlowControllerBlocksUntilWindowReleased checks that a second
// acquire of the full 4-byte window does not return while the first
// reservation is held, and that it returns without error once the first
// reservation is released.
func TestStreamFlowControllerBlocksUntilWindowReleased(t *testing.T) {
	controller := newStreamFlowController(streamConfig{
		ChunkSize:                 4,
		InboundQueueLimit:         1,
		InboundBufferedBytesLimit: 4,
		OutboundWindowBytes:       4,
		OutboundMaxInFlightChunks: 1,
	})

	releaseFirst, err := controller.acquire(context.Background(), 4)
	if err != nil {
		t.Fatalf("first acquire failed: %v", err)
	}
	// NOTE(review): releaseFirst is also called explicitly below, so this
	// deferred call is a second invocation on every path that reaches it.
	// That assumes release funcs tolerate being called twice; confirm.
	defer releaseFirst()

	// Buffered so the goroutine can always deliver its result and exit, even
	// if the test has already failed and nobody is receiving.
	secondDone := make(chan error, 1)
	go func() {
		releaseSecond, err := controller.acquire(context.Background(), 4)
		if err == nil && releaseSecond != nil {
			releaseSecond()
		}
		secondDone <- err
	}()

	// Timing heuristic: give the second acquire 80ms to return. Any result in
	// that period means it did not block on the held reservation.
	select {
	case err := <-secondDone:
		t.Fatalf("second acquire should block, got err = %v", err)
	case <-time.After(80 * time.Millisecond):
	}

	releaseFirst()

	select {
	case err := <-secondDone:
		if err != nil {
			t.Fatalf("second acquire failed after release: %v", err)
		}
	case <-time.After(time.Second):
		t.Fatal("timed out waiting for second acquire")
	}
}

// TestStreamFlowControllerAdmitsRequestsFIFO checks that two acquires queued
// behind a held reservation are admitted in the order they were started:
// request 2 first, then request 3.
//
// Every wait on the admission channel is bounded by a timeout so that a
// controller that never admits a waiter fails the test instead of hanging it.
func TestStreamFlowControllerAdmitsRequestsFIFO(t *testing.T) {
	controller := newStreamFlowController(streamConfig{
		ChunkSize:                 4,
		InboundQueueLimit:         1,
		InboundBufferedBytesLimit: 4,
		OutboundWindowBytes:       4,
		OutboundMaxInFlightChunks: 1,
	})

	releaseFirst, err := controller.acquire(context.Background(), 4)
	if err != nil {
		t.Fatalf("first acquire failed: %v", err)
	}

	// orderCh records admission order; capacity 2 lets both waiters report
	// without needing a concurrent receiver.
	orderCh := make(chan int, 2)
	// releaseCh is closed to tell admitted waiters to give their reservation back.
	releaseCh := make(chan struct{})

	startAcquire := func(id int, start <-chan struct{}, wg *sync.WaitGroup) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-start
			release, err := controller.acquire(context.Background(), 4)
			if err != nil {
				t.Errorf("acquire %d failed: %v", id, err)
				return
			}
			orderCh <- id
			<-releaseCh
			release()
		}()
	}

	// nextAdmitted returns the next admitted request id, or ok == false if no
	// request is admitted within one second.
	nextAdmitted := func() (id int, ok bool) {
		select {
		case id = <-orderCh:
			return id, true
		case <-time.After(time.Second):
			return 0, false
		}
	}

	var wg sync.WaitGroup
	startSecond := make(chan struct{})
	startThird := make(chan struct{})
	startAcquire(2, startSecond, &wg)
	startAcquire(3, startThird, &wg)

	// Timing heuristic: the sleeps give request 2 time to call acquire before
	// request 3 is started, and request 3 time to call acquire before the
	// held reservation is released.
	close(startSecond)
	time.Sleep(20 * time.Millisecond)
	close(startThird)
	time.Sleep(50 * time.Millisecond)

	releaseFirst()

	first, ok := nextAdmitted()
	// Close before asserting so a failed assertion does not leave an admitted
	// goroutine parked on releaseCh.
	close(releaseCh)
	if !ok {
		t.Fatal("timed out waiting for first queued request to be admitted")
	}
	if first != 2 {
		t.Fatalf("first admitted request = %d, want 2", first)
	}

	second, ok := nextAdmitted()
	if !ok {
		t.Fatal("timed out waiting for second queued request to be admitted")
	}
	// Both ids have been received and releaseCh is closed, so each goroutine
	// only has its release call left to run before Done.
	wg.Wait()
	if second != 3 {
		t.Fatalf("second admitted request = %d, want 3", second)
	}
}

// TestStreamFlowControllerTryAcquireDoesNotBypassQueuedWaiter checks that
// tryAcquire reports false while another acquire is pending behind a held
// reservation, and that the pending acquire obtains a non-nil release func
// once the held reservation is released.
func TestStreamFlowControllerTryAcquireDoesNotBypassQueuedWaiter(t *testing.T) {
	controller := newStreamFlowController(streamConfig{
		ChunkSize:                 4,
		InboundQueueLimit:         1,
		InboundBufferedBytesLimit: 4,
		OutboundWindowBytes:       4,
		OutboundMaxInFlightChunks: 1,
	})

	releaseFirst, err := controller.acquire(context.Background(), 4)
	if err != nil {
		t.Fatalf("first acquire failed: %v", err)
	}
	// The deferred call releases the reservation if the test fails before the
	// explicit release below.
	// NOTE(review): on the success path releaseFirst therefore runs twice;
	// this assumes release funcs tolerate being called twice; confirm.
	defer releaseFirst()

	waiterReady := make(chan struct{})
	// Buffered so the waiter can hand over its release func and exit even if
	// the test has already failed.
	waiterAcquired := make(chan func(), 1)
	go func() {
		close(waiterReady)
		releaseSecond, err := controller.acquire(context.Background(), 4)
		if err != nil {
			t.Errorf("waiter acquire failed: %v", err)
			return
		}
		waiterAcquired <- releaseSecond
	}()

	<-waiterReady
	// Timing heuristic: waiterReady is closed just before the goroutine calls
	// acquire, so sleep briefly to give that call time to start.
	time.Sleep(20 * time.Millisecond)

	if controller.tryAcquire(4) {
		t.Fatal("tryAcquire should not bypass queued waiter")
	}

	releaseFirst()

	var releaseSecond func()
	select {
	case releaseSecond = <-waiterAcquired:
	case <-time.After(time.Second):
		t.Fatal("timed out waiting for queued waiter to acquire")
	}
	if releaseSecond == nil {
		t.Fatal("queued waiter returned nil release func")
	}
	releaseSecond()
}