package notify

import (
	"context"
	"math"
	"testing"
	"time"

	"b612.me/stario"
)

// TestStreamFastDataFrameRoundTrip encodes a fast stream data frame and checks
// that decoding it reports a match and yields the same data ID, sequence
// number and payload that were encoded.
func TestStreamFastDataFrameRoundTrip(t *testing.T) {
	// Untyped constants keep the same typing at each use site as the bare
	// literals they replace.
	const (
		wantDataID  = 11
		wantSeq     = 7
		wantPayload = "payload"
	)

	encoded, err := encodeStreamFastDataFrame(wantDataID, wantSeq, []byte(wantPayload))
	if err != nil {
		t.Fatalf("encodeStreamFastDataFrame failed: %v", err)
	}

	decoded, ok, err := decodeStreamFastDataFrame(encoded)
	if err != nil {
		t.Fatalf("decodeStreamFastDataFrame failed: %v", err)
	}
	if !ok {
		t.Fatal("decodeStreamFastDataFrame should match fast payload")
	}

	// Cases are evaluated top to bottom, so the first mismatching field is
	// the one reported.
	switch {
	case decoded.DataID != wantDataID:
		t.Fatalf("data id = %d, want %d", decoded.DataID, wantDataID)
	case decoded.Seq != wantSeq:
		t.Fatalf("seq = %d, want %d", decoded.Seq, wantSeq)
	case string(decoded.Payload) != wantPayload:
		t.Fatalf("payload = %q, want %q", decoded.Payload, wantPayload)
	}
}

// TestClientDispatchInboundTransportPayloadFastStream registers a stream on
// the client's stream runtime, feeds an encoded fast stream payload through
// dispatchInboundTransportPayload, and then hands the stream to
// readStreamExactly with the expected body and a two second limit.
func TestClientDispatchInboundTransportPayloadFastStream(t *testing.T) {
	const (
		streamDataID = 23
		body         = "fast-payload"
	)

	client := NewClient().(*ClientCommon)
	err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions())
	if err != nil {
		t.Fatalf("UseModernPSKClient failed: %v", err)
	}

	streamRT := client.getStreamRuntime()
	if streamRT == nil {
		t.Fatal("client stream runtime should not be nil")
	}

	openReq := StreamOpenRequest{
		StreamID: "fast-client",
		DataID:   streamDataID,
		Channel:  StreamDataChannel,
	}
	handle := newStreamHandle(
		context.Background(),
		streamRT,
		clientFileScope(),
		openReq,
		0, nil, nil, 0, nil, nil, nil,
		streamRT.configSnapshot(),
	)
	if regErr := streamRT.register(clientFileScope(), handle); regErr != nil {
		t.Fatalf("register stream failed: %v", regErr)
	}

	wire, err := client.encodeFastStreamDataPayload(streamDataID, 1, []byte(body))
	if err != nil {
		t.Fatalf("encodeFastStreamDataPayload failed: %v", err)
	}

	err = client.dispatchInboundTransportPayload(wire, time.Now())
	if err != nil {
		t.Fatalf("dispatchInboundTransportPayload failed: %v", err)
	}

	readStreamExactly(t, handle, body, 2*time.Second)
}

// TestClientPushMessageFastDispatchesDirectWithRuntimeDispatcher installs a
// client session runtime, pushes an encoded async message through
// pushMessageFast with the runtime's inbound dispatcher, and expects the
// registered link handler to receive it while nothing is readable from the
// queue's RestoreChan.
func TestClientPushMessageFastDispatchesDirectWithRuntimeDispatcher(t *testing.T) {
	const linkKey = "client-fast-dispatch"

	client := NewClient().(*ClientCommon)
	err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions())
	if err != nil {
		t.Fatalf("UseModernPSKClient failed: %v", err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	q := stario.NewQueueCtx(ctx, 4, math.MaxUint32)
	session := newClientSessionRuntime(nil, ctx, cancel, q, 1)
	client.setClientSessionRuntime(session)

	// Buffered with capacity one so the handler's single send never blocks.
	received := make(chan Message, 1)
	client.SetLink(linkKey, func(m *Message) {
		received <- *m
	})

	transfer := TransferMsg{
		ID:    31,
		Key:   linkKey,
		Value: MsgVal("payload"),
		Type:  MSG_ASYNC,
	}
	envelope, err := wrapTransferMsgEnvelope(transfer, client.sequenceEn)
	if err != nil {
		t.Fatalf("wrapTransferMsgEnvelope failed: %v", err)
	}

	encoded, err := client.encodeEnvelope(envelope)
	if err != nil {
		t.Fatalf("encodeEnvelope failed: %v", err)
	}

	usedFastPath := client.pushMessageFast(q, encoded, session.inboundDispatcher)
	if !usedFastPath {
		t.Fatal("pushMessageFast should use direct dispatch")
	}

	select {
	case delivered := <-received:
		if delivered.Key != linkKey {
			t.Fatalf("message key = %q, want %q", delivered.Key, linkKey)
		}
	case <-time.After(time.Second):
		t.Fatal("timed out waiting for direct client dispatch")
	}

	// Non-blocking probe: any message readable here means the fast path also
	// enqueued it.
	select {
	case queued := <-q.RestoreChan():
		t.Fatalf("fast path should not enqueue RestoreChan message, got %+v", queued)
	default:
	}
}