86 lines
2.3 KiB
Go
86 lines
2.3 KiB
Go
|
|
package notify
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"errors"
|
||
|
|
"testing"
|
||
|
|
"time"
|
||
|
|
)
|
||
|
|
|
||
|
|
func TestStreamOwnedChunkReleaseAfterRead(t *testing.T) {
|
||
|
|
stream := newStreamHandle(context.Background(), newStreamRuntime("stream-buffer-release-read"), clientFileScope(), StreamOpenRequest{
|
||
|
|
StreamID: "stream-buffer-release-read",
|
||
|
|
DataID: 1,
|
||
|
|
}, 0, nil, nil, 0, nil, nil, nil, streamConfig{})
|
||
|
|
|
||
|
|
released := 0
|
||
|
|
if err := stream.pushOwnedChunkWithRelease([]byte("hello"), func() {
|
||
|
|
released++
|
||
|
|
}); err != nil {
|
||
|
|
t.Fatalf("pushOwnedChunkWithRelease failed: %v", err)
|
||
|
|
}
|
||
|
|
|
||
|
|
buf := make([]byte, 5)
|
||
|
|
n, err := stream.Read(buf)
|
||
|
|
if err != nil {
|
||
|
|
t.Fatalf("Read failed: %v", err)
|
||
|
|
}
|
||
|
|
if n != 5 || string(buf[:n]) != "hello" {
|
||
|
|
t.Fatalf("Read = %d %q, want 5 hello", n, string(buf[:n]))
|
||
|
|
}
|
||
|
|
if released != 1 {
|
||
|
|
t.Fatalf("release count = %d, want 1", released)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestStreamOwnedChunkReleaseOnReset(t *testing.T) {
|
||
|
|
stream := newStreamHandle(context.Background(), newStreamRuntime("stream-buffer-release-reset"), clientFileScope(), StreamOpenRequest{
|
||
|
|
StreamID: "stream-buffer-release-reset",
|
||
|
|
DataID: 1,
|
||
|
|
}, 0, nil, nil, 0, nil, nil, nil, streamConfig{})
|
||
|
|
|
||
|
|
released := 0
|
||
|
|
if err := stream.pushOwnedChunkWithRelease([]byte("hello"), func() {
|
||
|
|
released++
|
||
|
|
}); err != nil {
|
||
|
|
t.Fatalf("pushOwnedChunkWithRelease failed: %v", err)
|
||
|
|
}
|
||
|
|
|
||
|
|
stream.markReset(errors.New("boom"))
|
||
|
|
if released != 1 {
|
||
|
|
t.Fatalf("release count = %d, want 1", released)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestClientDispatchFastStreamDataWithOwnerReleasesAfterRead(t *testing.T) {
|
||
|
|
client := NewClient().(*ClientCommon)
|
||
|
|
runtime := client.getStreamRuntime()
|
||
|
|
if runtime == nil {
|
||
|
|
t.Fatal("client stream runtime should not be nil")
|
||
|
|
}
|
||
|
|
stream := newStreamHandle(context.Background(), runtime, clientFileScope(), StreamOpenRequest{
|
||
|
|
StreamID: "stream-owner",
|
||
|
|
DataID: 23,
|
||
|
|
Channel: StreamDataChannel,
|
||
|
|
}, 0, nil, nil, 0, nil, nil, nil, runtime.configSnapshot())
|
||
|
|
if err := runtime.register(clientFileScope(), stream); err != nil {
|
||
|
|
t.Fatalf("register stream failed: %v", err)
|
||
|
|
}
|
||
|
|
|
||
|
|
released := 0
|
||
|
|
owner := newStreamReadPayloadOwner(func() {
|
||
|
|
released++
|
||
|
|
})
|
||
|
|
client.dispatchFastStreamDataWithOwner(streamFastDataFrame{
|
||
|
|
DataID: 23,
|
||
|
|
Seq: 1,
|
||
|
|
Payload: []byte("fast-owner"),
|
||
|
|
}, owner)
|
||
|
|
owner.done()
|
||
|
|
|
||
|
|
readStreamExactly(t, stream, "fast-owner", 2*time.Second)
|
||
|
|
if released != 1 {
|
||
|
|
t.Fatalf("release count = %d, want 1", released)
|
||
|
|
}
|
||
|
|
}
|