notify/bulk_buffer_release_test.go

120 lines
2.9 KiB
Go
Raw Permalink Normal View History

package notify
import (
"context"
"errors"
"testing"
"time"
)
func TestBulkOwnedChunkReleaseAfterRead(t *testing.T) {
bulk := newBulkHandle(context.Background(), newBulkRuntime("buffer-release-read"), clientFileScope(), BulkOpenRequest{
BulkID: "buffer-release-read",
DataID: 1,
}, 0, nil, nil, 0, nil, nil, nil, nil, nil)
released := 0
if err := bulk.pushOwnedChunkWithReleaseNoReset([]byte("hello"), func() {
released++
}); err != nil {
t.Fatalf("pushOwnedChunkWithReleaseNoReset failed: %v", err)
}
buf := make([]byte, 5)
n, err := bulk.Read(buf)
if err != nil {
t.Fatalf("Read failed: %v", err)
}
if n != 5 || string(buf[:n]) != "hello" {
t.Fatalf("Read = %d %q, want 5 hello", n, string(buf[:n]))
}
if released != 1 {
t.Fatalf("release count = %d, want 1", released)
}
}
func TestBulkOwnedChunkReleaseOnReset(t *testing.T) {
bulk := newBulkHandle(context.Background(), newBulkRuntime("buffer-release-reset"), clientFileScope(), BulkOpenRequest{
BulkID: "buffer-release-reset",
DataID: 1,
}, 0, nil, nil, 0, nil, nil, nil, nil, nil)
released := 0
if err := bulk.pushOwnedChunkWithReleaseNoReset([]byte("hello"), func() {
released++
}); err != nil {
t.Fatalf("pushOwnedChunkWithReleaseNoReset failed: %v", err)
}
bulk.markReset(errors.New("boom"))
if released != 1 {
t.Fatalf("release count = %d, want 1", released)
}
}
// TestBulkReadDoesNotBlockOnAsyncWindowRelease installs a window-release
// callback that stalls until the test unblocks it, pushes one 4-byte chunk,
// and checks that:
//
//   - Read returns the chunk within 200ms even though the callback is stalled,
//   - the callback is started within one second with (bytes, chunks) = (4, 1),
//   - once the callback is unblocked and the context is cancelled, the
//     handle's releaseWorkerDone channel (when set) is signalled within one
//     second.
func TestBulkReadDoesNotBlockOnAsyncWindowRelease(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// releaseStarted is closed by the callback on entry; releaseUnblock is
	// closed by the test to let the callback return.
	releaseStarted := make(chan struct{})
	releaseUnblock := make(chan struct{})

	bulk := newBulkHandle(ctx, newBulkRuntime("buffer-release-async"), clientFileScope(), BulkOpenRequest{
		BulkID:      "buffer-release-async",
		DataID:      1,
		ChunkSize:   4,
		WindowBytes: 4,
		MaxInFlight: 1,
	}, 0, nil, nil, 0, nil, nil, nil, nil, func(_ *bulkHandle, bytes int64, chunks int) error {
		// This callback blocks until the test goroutine closes
		// releaseUnblock, so it cannot be running on the test goroutine.
		// FailNow-style helpers (t.Fatalf) are only valid on the test
		// goroutine; t.Errorf is safe here and still fails the test, while
		// letting the started/unblock handshake proceed so the real mismatch
		// is reported instead of a secondary timeout.
		if bytes != 4 || chunks != 1 {
			t.Errorf("release = (%d,%d), want (4,1)", bytes, chunks)
		}
		close(releaseStarted)
		<-releaseUnblock
		return nil
	})

	if err := bulk.pushOwnedChunk([]byte("ping")); err != nil {
		t.Fatalf("pushOwnedChunk failed: %v", err)
	}

	// Run Read on its own goroutine so a blocked Read shows up as a timeout
	// below rather than hanging the test. The channel is buffered so the
	// goroutine can always deliver its result and exit.
	buf := make([]byte, 4)
	doneCh := make(chan error, 1)
	go func() {
		n, err := bulk.Read(buf)
		if err != nil {
			doneCh <- err
			return
		}
		if got, want := n, 4; got != want {
			doneCh <- errors.New("unexpected read size")
			return
		}
		doneCh <- nil
	}()

	// Read must complete while the release callback is still stalled.
	select {
	case err := <-doneCh:
		if err != nil {
			t.Fatalf("Read failed: %v", err)
		}
	case <-time.After(200 * time.Millisecond):
		t.Fatal("Read should not block on async release sender")
	}

	// The callback must have been entered.
	select {
	case <-releaseStarted:
	case <-time.After(time.Second):
		t.Fatal("window release sender did not start")
	}

	// Let the callback return, then cancel the context and wait for the
	// release worker (if the handle exposes one) to finish.
	close(releaseUnblock)
	cancel()
	if bulk.releaseWorkerDone != nil {
		select {
		case <-bulk.releaseWorkerDone:
		case <-time.After(time.Second):
			t.Fatal("release worker did not exit")
		}
	}
}