package notify import ( "context" "errors" "testing" "time" ) func TestBulkOwnedChunkReleaseAfterRead(t *testing.T) { bulk := newBulkHandle(context.Background(), newBulkRuntime("buffer-release-read"), clientFileScope(), BulkOpenRequest{ BulkID: "buffer-release-read", DataID: 1, }, 0, nil, nil, 0, nil, nil, nil, nil, nil) released := 0 if err := bulk.pushOwnedChunkWithReleaseNoReset([]byte("hello"), func() { released++ }); err != nil { t.Fatalf("pushOwnedChunkWithReleaseNoReset failed: %v", err) } buf := make([]byte, 5) n, err := bulk.Read(buf) if err != nil { t.Fatalf("Read failed: %v", err) } if n != 5 || string(buf[:n]) != "hello" { t.Fatalf("Read = %d %q, want 5 hello", n, string(buf[:n])) } if released != 1 { t.Fatalf("release count = %d, want 1", released) } } func TestBulkOwnedChunkReleaseOnReset(t *testing.T) { bulk := newBulkHandle(context.Background(), newBulkRuntime("buffer-release-reset"), clientFileScope(), BulkOpenRequest{ BulkID: "buffer-release-reset", DataID: 1, }, 0, nil, nil, 0, nil, nil, nil, nil, nil) released := 0 if err := bulk.pushOwnedChunkWithReleaseNoReset([]byte("hello"), func() { released++ }); err != nil { t.Fatalf("pushOwnedChunkWithReleaseNoReset failed: %v", err) } bulk.markReset(errors.New("boom")) if released != 1 { t.Fatalf("release count = %d, want 1", released) } } func TestBulkReadDoesNotBlockOnAsyncWindowRelease(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() releaseStarted := make(chan struct{}) releaseUnblock := make(chan struct{}) bulk := newBulkHandle(ctx, newBulkRuntime("buffer-release-async"), clientFileScope(), BulkOpenRequest{ BulkID: "buffer-release-async", DataID: 1, ChunkSize: 4, WindowBytes: 4, MaxInFlight: 1, }, 0, nil, nil, 0, nil, nil, nil, nil, func(_ *bulkHandle, bytes int64, chunks int) error { if bytes != 4 || chunks != 1 { t.Fatalf("release = (%d,%d), want (4,1)", bytes, chunks) } close(releaseStarted) <-releaseUnblock return nil }) if err := bulk.pushOwnedChunk([]byte("ping")); err != nil { t.Fatalf("pushOwnedChunk failed: %v", err) } buf := make([]byte, 4) doneCh := make(chan error, 1) go func() { n, err := bulk.Read(buf) if err != nil { doneCh <- err return } if got, want := n, 4; got != want { doneCh <- errors.New("unexpected read size") return } doneCh <- nil }() select { case err := <-doneCh: if err != nil { t.Fatalf("Read failed: %v", err) } case <-time.After(200 * time.Millisecond): t.Fatal("Read should not block on async release sender") } select { case <-releaseStarted: case <-time.After(time.Second): t.Fatal("window release sender did not start") } close(releaseUnblock) cancel() if bulk.releaseWorkerDone != nil { select { case <-bulk.releaseWorkerDone: case <-time.After(time.Second): t.Fatal("release worker did not exit") } } }