113 lines
3.3 KiB
Go
113 lines
3.3 KiB
Go
|
|
package notify
|
||
|
|
|
||
|
|
import (
|
||
|
|
"b612.me/stario"
|
||
|
|
"context"
|
||
|
|
"math"
|
||
|
|
"testing"
|
||
|
|
"time"
|
||
|
|
)
|
||
|
|
|
||
|
|
func TestStreamFastDataFrameRoundTrip(t *testing.T) {
|
||
|
|
frame, err := encodeStreamFastDataFrame(11, 7, []byte("payload"))
|
||
|
|
if err != nil {
|
||
|
|
t.Fatalf("encodeStreamFastDataFrame failed: %v", err)
|
||
|
|
}
|
||
|
|
got, matched, err := decodeStreamFastDataFrame(frame)
|
||
|
|
if err != nil {
|
||
|
|
t.Fatalf("decodeStreamFastDataFrame failed: %v", err)
|
||
|
|
}
|
||
|
|
if !matched {
|
||
|
|
t.Fatal("decodeStreamFastDataFrame should match fast payload")
|
||
|
|
}
|
||
|
|
if got.DataID != 11 {
|
||
|
|
t.Fatalf("data id = %d, want %d", got.DataID, 11)
|
||
|
|
}
|
||
|
|
if got.Seq != 7 {
|
||
|
|
t.Fatalf("seq = %d, want %d", got.Seq, 7)
|
||
|
|
}
|
||
|
|
if string(got.Payload) != "payload" {
|
||
|
|
t.Fatalf("payload = %q, want %q", got.Payload, "payload")
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestClientDispatchInboundTransportPayloadFastStream(t *testing.T) {
|
||
|
|
client := NewClient().(*ClientCommon)
|
||
|
|
if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
||
|
|
t.Fatalf("UseModernPSKClient failed: %v", err)
|
||
|
|
}
|
||
|
|
runtime := client.getStreamRuntime()
|
||
|
|
if runtime == nil {
|
||
|
|
t.Fatal("client stream runtime should not be nil")
|
||
|
|
}
|
||
|
|
stream := newStreamHandle(context.Background(), runtime, clientFileScope(), StreamOpenRequest{
|
||
|
|
StreamID: "fast-client",
|
||
|
|
DataID: 23,
|
||
|
|
Channel: StreamDataChannel,
|
||
|
|
}, 0, nil, nil, 0, nil, nil, nil, runtime.configSnapshot())
|
||
|
|
if err := runtime.register(clientFileScope(), stream); err != nil {
|
||
|
|
t.Fatalf("register stream failed: %v", err)
|
||
|
|
}
|
||
|
|
|
||
|
|
payload, err := client.encodeFastStreamDataPayload(23, 1, []byte("fast-payload"))
|
||
|
|
if err != nil {
|
||
|
|
t.Fatalf("encodeFastStreamDataPayload failed: %v", err)
|
||
|
|
}
|
||
|
|
if err := client.dispatchInboundTransportPayload(payload, time.Now()); err != nil {
|
||
|
|
t.Fatalf("dispatchInboundTransportPayload failed: %v", err)
|
||
|
|
}
|
||
|
|
|
||
|
|
readStreamExactly(t, stream, "fast-payload", 2*time.Second)
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestClientPushMessageFastDispatchesDirectWithRuntimeDispatcher(t *testing.T) {
|
||
|
|
client := NewClient().(*ClientCommon)
|
||
|
|
if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil {
|
||
|
|
t.Fatalf("UseModernPSKClient failed: %v", err)
|
||
|
|
}
|
||
|
|
|
||
|
|
stopCtx, stopFn := context.WithCancel(context.Background())
|
||
|
|
defer stopFn()
|
||
|
|
queue := stario.NewQueueCtx(stopCtx, 4, math.MaxUint32)
|
||
|
|
rt := newClientSessionRuntime(nil, stopCtx, stopFn, queue, 1)
|
||
|
|
client.setClientSessionRuntime(rt)
|
||
|
|
|
||
|
|
gotCh := make(chan Message, 1)
|
||
|
|
client.SetLink("client-fast-dispatch", func(msg *Message) {
|
||
|
|
gotCh <- *msg
|
||
|
|
})
|
||
|
|
|
||
|
|
env, err := wrapTransferMsgEnvelope(TransferMsg{
|
||
|
|
ID: 31,
|
||
|
|
Key: "client-fast-dispatch",
|
||
|
|
Value: MsgVal("payload"),
|
||
|
|
Type: MSG_ASYNC,
|
||
|
|
}, client.sequenceEn)
|
||
|
|
if err != nil {
|
||
|
|
t.Fatalf("wrapTransferMsgEnvelope failed: %v", err)
|
||
|
|
}
|
||
|
|
wire, err := client.encodeEnvelope(env)
|
||
|
|
if err != nil {
|
||
|
|
t.Fatalf("encodeEnvelope failed: %v", err)
|
||
|
|
}
|
||
|
|
|
||
|
|
if !client.pushMessageFast(queue, wire, rt.inboundDispatcher) {
|
||
|
|
t.Fatal("pushMessageFast should use direct dispatch")
|
||
|
|
}
|
||
|
|
|
||
|
|
select {
|
||
|
|
case msg := <-gotCh:
|
||
|
|
if got, want := msg.Key, "client-fast-dispatch"; got != want {
|
||
|
|
t.Fatalf("message key = %q, want %q", got, want)
|
||
|
|
}
|
||
|
|
case <-time.After(time.Second):
|
||
|
|
t.Fatal("timed out waiting for direct client dispatch")
|
||
|
|
}
|
||
|
|
|
||
|
|
select {
|
||
|
|
case msg := <-queue.RestoreChan():
|
||
|
|
t.Fatalf("fast path should not enqueue RestoreChan message, got %+v", msg)
|
||
|
|
default:
|
||
|
|
}
|
||
|
|
}
|