package notify import ( "bytes" "context" "encoding/binary" "errors" "net" "testing" "time" "b612.me/stario" ) type bulkAttachScriptConn struct { readBuf *bytes.Reader writeBuf bytes.Buffer } func newBulkAttachScriptConn(inbound []byte) *bulkAttachScriptConn { return &bulkAttachScriptConn{ readBuf: bytes.NewReader(append([]byte(nil), inbound...)), } } func (c *bulkAttachScriptConn) Read(p []byte) (int, error) { return c.readBuf.Read(p) } func (c *bulkAttachScriptConn) Write(p []byte) (int, error) { return c.writeBuf.Write(p) } func (c *bulkAttachScriptConn) Close() error { return nil } func (c *bulkAttachScriptConn) LocalAddr() net.Addr { return bulkAttachTestAddr("local") } func (c *bulkAttachScriptConn) RemoteAddr() net.Addr { return bulkAttachTestAddr("remote") } func (c *bulkAttachScriptConn) SetDeadline(time.Time) error { return nil } func (c *bulkAttachScriptConn) SetReadDeadline(time.Time) error { return nil } func (c *bulkAttachScriptConn) SetWriteDeadline(time.Time) error { return nil } func (c *bulkAttachScriptConn) writtenBytes() []byte { return append([]byte(nil), c.writeBuf.Bytes()...) } type bulkAttachTestAddr string func (a bulkAttachTestAddr) Network() string { return "tcp" } func (a bulkAttachTestAddr) String() string { return string(a) } func encodeDedicatedRecordForAttachTest(payload []byte) []byte { out := make([]byte, bulkDedicatedRecordHeaderLen+len(payload)) copy(out[:4], bulkDedicatedRecordMagic) binary.BigEndian.PutUint32(out[4:8], uint32(len(payload))) copy(out[bulkDedicatedRecordHeaderLen:], payload) return out } func TestSendDedicatedBulkAttachRequestKeepsCoalescedDedicatedPayloadUnread(t *testing.T) { client := NewClient().(*ClientCommon) UseLegacySecurityClient(client) client.msgID = 100 bulk := newBulkHandle(context.Background(), newBulkRuntime("dedicated-attach-test"), clientFileScope(), BulkOpenRequest{ BulkID: "bulk-attach-test", DataID: 1, Dedicated: true, AttachToken: "attach-token", }, 0, nil, nil, 0, nil, nil, nil, nil, nil) encodedResp, err := client.sequenceEn(bulkAttachResponse{Accepted: true}) if err != nil { t.Fatalf("encode bulkAttachResponse failed: %v", err) } replyFrame, err := encodeDirectSignalFrame(stario.NewQueue(), client.sequenceEn, client.msgEn, client.SecretKey, TransferMsg{ ID: 101, Key: systemBulkAttachKey, Value: encodedResp, Type: MSG_SYS_REPLY, }) if err != nil { t.Fatalf("encode attach reply frame failed: %v", err) } dedicatedPayload := []byte("dedicated-tail-bytes") conn := newBulkAttachScriptConn(append(replyFrame, encodeDedicatedRecordForAttachTest(dedicatedPayload)...)) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() resp, err := client.sendDedicatedBulkAttachRequest(ctx, conn, bulk) if err != nil { t.Fatalf("sendDedicatedBulkAttachRequest failed: %v", err) } if !resp.Accepted { t.Fatalf("bulk attach response = %+v, want accepted", resp) } parsedReq := stario.NewQueue() var reqMsg TransferMsg if err := parsedReq.ParseMessageOwned(conn.writtenBytes(), "attach-request", func(msgq stario.MsgQueue) error { transfer, err := decodeDirectSignalPayload(client.sequenceDe, client.msgDe, client.SecretKey, msgq.Msg) if err != nil { return err } reqMsg = transfer return nil }); err != nil { t.Fatalf("parse written attach request failed: %v", err) } if reqMsg.Key != systemBulkAttachKey || reqMsg.Type != MSG_SYS_WAIT { t.Fatalf("attach request message mismatch: %+v", reqMsg) } readPayload, err := readBulkDedicatedRecord(conn) if err != nil { t.Fatalf("readBulkDedicatedRecord after attach failed: %v", err) } if !bytes.Equal(readPayload, dedicatedPayload) { t.Fatalf("dedicated payload mismatch: got %q want %q", string(readPayload), string(dedicatedPayload)) } } func TestSendDedicatedBulkAttachRequestUsesBootstrapProtectionEvenAfterSteadySwitch(t *testing.T) { client := NewClient().(*ClientCommon) if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil { t.Fatalf("UseModernPSKClient failed: %v", err) } client.msgID = 100 alternate, err := deriveModernPSKProtectionProfile([]byte("notify-dedicated-attach-other-secret"), integrationModernPSKOptions(), ProtectionManaged) if err != nil { t.Fatalf("deriveModernPSKProtectionProfile(alternate) failed: %v", err) } client.setClientTransportProtectionProfile(alternate) bulk := newBulkHandle(context.Background(), newBulkRuntime("dedicated-attach-bootstrap-test"), clientFileScope(), BulkOpenRequest{ BulkID: "bulk-attach-bootstrap-test", DataID: 1, Dedicated: true, AttachToken: "attach-token", }, 0, nil, nil, 0, nil, nil, nil, nil, nil) bootstrap := client.clientDedicatedBulkAttachTransportProtectionProfile() encodedResp, err := client.sequenceEn(bulkAttachResponse{Accepted: true}) if err != nil { t.Fatalf("encode bulkAttachResponse failed: %v", err) } replyFrame, err := encodeDirectSignalFrame(stario.NewQueue(), client.sequenceEn, bootstrap.msgEn, bootstrap.secretKey, TransferMsg{ ID: 101, Key: systemBulkAttachKey, Value: encodedResp, Type: MSG_SYS_REPLY, }) if err != nil { t.Fatalf("encode attach reply frame failed: %v", err) } conn := newBulkAttachScriptConn(replyFrame) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() resp, err := client.sendDedicatedBulkAttachRequest(ctx, conn, bulk) if err != nil { t.Fatalf("sendDedicatedBulkAttachRequest failed: %v", err) } if !resp.Accepted { t.Fatalf("bulk attach response = %+v, want accepted", resp) } parsedReq := stario.NewQueue() var reqMsg TransferMsg if err := parsedReq.ParseMessageOwned(conn.writtenBytes(), "attach-request", func(msgq stario.MsgQueue) error { transfer, err := decodeDirectSignalPayload(client.sequenceDe, bootstrap.msgDe, bootstrap.secretKey, msgq.Msg) if err != nil { return err } reqMsg = transfer return nil }); err != nil { t.Fatalf("parse written attach request with bootstrap profile failed: %v", err) } if reqMsg.Key != systemBulkAttachKey || reqMsg.Type != MSG_SYS_WAIT { t.Fatalf("attach request message mismatch: %+v", reqMsg) } if err := parsedReq.ParseMessageOwned(conn.writtenBytes(), "attach-request-current", func(msgq stario.MsgQueue) error { _, err := decodeDirectSignalPayload(client.sequenceDe, alternate.msgDe, alternate.secretKey, msgq.Msg) return err }); !errors.Is(err, errTransportPayloadDecryptFailed) { t.Fatalf("decode written attach request with current steady profile error = %v, want %v", err, errTransportPayloadDecryptFailed) } } func TestHandleBulkAttachSystemMessageAcceptedWritesDirectReplyBeforeDedicatedHandoff(t *testing.T) { server := NewServer().(*ServerCommon) UseLegacySecurityServer(server) sidecarLeft, sidecarRight := net.Pipe() defer sidecarRight.Close() current := server.bootstrapAcceptedLogical("dedicated-attach-current", nil, sidecarLeft) if current == nil { t.Fatal("bootstrapAcceptedLogical(current) should return logical") } target := server.bootstrapAcceptedLogical("dedicated-attach-target", nil, nil) if target == nil { t.Fatal("bootstrapAcceptedLogical(target) should return logical") } bulk := newBulkHandle(context.Background(), server.getBulkRuntime(), serverFileScope(target), BulkOpenRequest{ BulkID: "server-dedicated-attach-test", DataID: 7, Dedicated: true, AttachToken: "attach-token", }, 0, target, nil, 0, nil, nil, nil, nil, nil) bulk.markAcceptHandled() if err := server.getBulkRuntime().register(serverFileScope(target), bulk); err != nil { t.Fatalf("register bulk runtime failed: %v", err) } reqPayload, err := server.sequenceEn(bulkAttachRequest{ PeerID: target.ID(), BulkID: bulk.ID(), AttachToken: "attach-token", }) if err != nil { t.Fatalf("encode bulkAttachRequest failed: %v", err) } msg := Message{ NetType: NET_SERVER, LogicalConn: current, ClientConn: current.compatClientConn(), TransferMsg: TransferMsg{ ID: 42, Key: systemBulkAttachKey, Value: reqPayload, Type: MSG_SYS_WAIT, }, inboundConn: sidecarLeft, Time: time.Now(), } type attachReplyResult struct { transfer TransferMsg resp bulkAttachResponse err error } replyCh := make(chan attachReplyResult, 1) go func() { _ = sidecarRight.SetReadDeadline(time.Now().Add(time.Second)) replyPayload, err := readDirectSignalFramePayload(sidecarRight) if err != nil { replyCh <- attachReplyResult{err: err} return } transfer, err := decodeDirectSignalPayload(server.sequenceDe, current.msgDeSnapshot(), current.secretKeySnapshot(), replyPayload) if err != nil { replyCh <- attachReplyResult{err: err} return } resp, err := decodeBulkAttachResponse(server.sequenceDe, transfer.Value) replyCh <- attachReplyResult{transfer: transfer, resp: resp, err: err} }() if !server.handleBulkAttachSystemMessage(msg) { t.Fatal("handleBulkAttachSystemMessage should accept dedicated attach message") } var result attachReplyResult select { case result = <-replyCh: case <-time.After(2 * time.Second): t.Fatal("timed out waiting for direct attach reply") } if result.err != nil { t.Fatalf("read direct attach reply failed: %v", result.err) } transfer := result.transfer if transfer.ID != msg.ID || transfer.Key != systemBulkAttachKey || transfer.Type != MSG_SYS_REPLY { t.Fatalf("attach reply mismatch: %+v", transfer) } resp := result.resp if !resp.Accepted || resp.Error != "" { t.Fatalf("bulk attach response = %+v, want accepted", resp) } if got := bulk.dedicatedConnSnapshot(); got != sidecarLeft { t.Fatalf("dedicated conn mismatch: got %v want %v", got, sidecarLeft) } if current.transportAttachedSnapshot() { t.Fatal("attach sidecar logical transport should be detached after handoff") } if got := server.GetLogicalConn(current.ID()); got != nil { t.Fatalf("attach sidecar logical should be removed after handoff, got %+v", got) } } func TestHandleBulkAttachSystemMessageDoesNotExposeSharedSidecarBeforeReplyCompletes(t *testing.T) { server := NewServer().(*ServerCommon) UseLegacySecurityServer(server) sidecarLeft, sidecarRight := net.Pipe() defer sidecarRight.Close() current := server.bootstrapAcceptedLogical("dedicated-attach-current-blocked", nil, sidecarLeft) if current == nil { t.Fatal("bootstrapAcceptedLogical(current) should return logical") } target := server.bootstrapAcceptedLogical("dedicated-attach-target-blocked", nil, nil) if target == nil { t.Fatal("bootstrapAcceptedLogical(target) should return logical") } currentBulk := newBulkHandle(context.Background(), server.getBulkRuntime(), serverFileScope(target), BulkOpenRequest{ BulkID: "server-dedicated-current", DataID: 17, Dedicated: true, AttachToken: "attach-token", }, 0, target, nil, 0, nil, nil, nil, nil, nil) currentBulk.markAcceptHandled() if err := server.getBulkRuntime().register(serverFileScope(target), currentBulk); err != nil { t.Fatalf("register current bulk runtime failed: %v", err) } pendingBulk := newBulkHandle(context.Background(), server.getBulkRuntime(), serverFileScope(target), BulkOpenRequest{ BulkID: "server-dedicated-pending", DataID: 18, Dedicated: true, AttachToken: "attach-token-2", }, 0, target, nil, 0, nil, nil, nil, nil, nil) pendingBulk.markAcceptHandled() if err := server.getBulkRuntime().register(serverFileScope(target), pendingBulk); err != nil { t.Fatalf("register pending bulk runtime failed: %v", err) } reqPayload, err := server.sequenceEn(bulkAttachRequest{ PeerID: target.ID(), BulkID: currentBulk.ID(), AttachToken: "attach-token", }) if err != nil { t.Fatalf("encode bulkAttachRequest failed: %v", err) } msg := Message{ NetType: NET_SERVER, LogicalConn: current, ClientConn: current.compatClientConn(), TransferMsg: TransferMsg{ ID: 77, Key: systemBulkAttachKey, Value: reqPayload, Type: MSG_SYS_WAIT, }, inboundConn: sidecarLeft, Time: time.Now(), } done := make(chan struct{}) go func() { defer close(done) _ = server.handleBulkAttachSystemMessage(msg) }() time.Sleep(50 * time.Millisecond) if got := pendingBulk.dedicatedConnSnapshot(); got != nil { t.Fatal("pending dedicated bulk should not observe shared sidecar before attach reply is fully sent") } if got := server.serverDedicatedSidecarSnapshot(target); got != nil { t.Fatal("server dedicated sidecar should not be published before attach reply is fully sent") } type attachReplyResult struct { transfer TransferMsg resp bulkAttachResponse err error } replyCh := make(chan attachReplyResult, 1) go func() { _ = sidecarRight.SetReadDeadline(time.Now().Add(time.Second)) replyPayload, err := readDirectSignalFramePayload(sidecarRight) if err != nil { replyCh <- attachReplyResult{err: err} return } transfer, err := decodeDirectSignalPayload(server.sequenceDe, current.msgDeSnapshot(), current.secretKeySnapshot(), replyPayload) if err != nil { replyCh <- attachReplyResult{err: err} return } resp, err := decodeBulkAttachResponse(server.sequenceDe, transfer.Value) replyCh <- attachReplyResult{transfer: transfer, resp: resp, err: err} }() select { case result := <-replyCh: if result.err != nil { t.Fatalf("read direct attach reply failed: %v", result.err) } if !result.resp.Accepted { t.Fatalf("bulk attach response = %+v, want accepted", result.resp) } case <-time.After(2 * time.Second): t.Fatal("timed out waiting for direct attach reply") } select { case <-done: case <-time.After(2 * time.Second): t.Fatal("timed out waiting for handleBulkAttachSystemMessage to finish") } if got := pendingBulk.dedicatedConnSnapshot(); got != sidecarLeft { t.Fatalf("pending dedicated bulk conn mismatch after reply: got %v want %v", got, sidecarLeft) } if got := server.serverDedicatedSidecarSnapshot(target); got == nil || got.conn != sidecarLeft { t.Fatal("server dedicated sidecar should be published after attach reply completes") } } func TestBulkAttachResponseErrorCarriesStructuredFields(t *testing.T) { resp := toBulkAttachResponseError(&bulkAttachError{ Code: bulkAttachErrorCodeTokenMismatch, Retryable: false, Message: "bulk attach token mismatch", FailedSeq: 7, FailedBulk: "bulk-1", }, "fallback-bulk") if resp.Accepted { t.Fatalf("Accepted = %v, want false", resp.Accepted) } if got, want := resp.Code, string(bulkAttachErrorCodeTokenMismatch); got != want { t.Fatalf("Code = %q, want %q", got, want) } if got, want := resp.Retryable, false; got != want { t.Fatalf("Retryable = %v, want %v", got, want) } if got, want := resp.FailedSeq, uint64(7); got != want { t.Fatalf("FailedSeq = %d, want %d", got, want) } if got, want := resp.FailedBulk, "bulk-1"; got != want { t.Fatalf("FailedBulk = %q, want %q", got, want) } } func TestResolveInboundDedicatedBulkRejectsAlreadyAttachedAfterDataStarted(t *testing.T) { server := NewServer().(*ServerCommon) UseLegacySecurityServer(server) currentLeft, currentRight := net.Pipe() defer currentRight.Close() current := server.bootstrapAcceptedLogical("dedicated-attach-current", nil, currentLeft) if current == nil { t.Fatal("bootstrapAcceptedLogical(current) should return logical") } target := server.bootstrapAcceptedLogical("dedicated-attach-target", nil, nil) if target == nil { t.Fatal("bootstrapAcceptedLogical(target) should return logical") } bulk := newBulkHandle(context.Background(), server.getBulkRuntime(), serverFileScope(target), BulkOpenRequest{ BulkID: "server-dedicated-attach-test", DataID: 7, Dedicated: true, AttachToken: "attach-token", }, 0, target, nil, 0, nil, nil, nil, nil, nil) bulk.markAcceptHandled() if err := server.getBulkRuntime().register(serverFileScope(target), bulk); err != nil { t.Fatalf("register bulk runtime failed: %v", err) } attachedLeft, attachedRight := net.Pipe() defer attachedRight.Close() if err := bulk.attachDedicatedConn(attachedLeft); err != nil { t.Fatalf("attachDedicatedConn failed: %v", err) } bulk.markDedicatedDataStarted() _, _, err := server.resolveInboundDedicatedBulk(current, bulkAttachRequest{ PeerID: target.ID(), BulkID: bulk.ID(), AttachToken: "attach-token", }) if err == nil { t.Fatal("resolveInboundDedicatedBulk should reject duplicate attach after data started") } var attachErr *bulkAttachError if !errors.As(err, &attachErr) || attachErr == nil { t.Fatalf("resolveInboundDedicatedBulk error type = %T, want *bulkAttachError", err) } if got, want := attachErr.Code, bulkAttachErrorCodeAlreadyAttached; got != want { t.Fatalf("attach error code = %q, want %q", got, want) } if attachErr.Retryable { t.Fatalf("attach error retryable = true, want false") } } func TestResolveInboundDedicatedBulkAllowsReattachBeforeDataStarts(t *testing.T) { server := NewServer().(*ServerCommon) UseLegacySecurityServer(server) currentLeft, currentRight := net.Pipe() defer currentRight.Close() current := server.bootstrapAcceptedLogical("dedicated-attach-current", nil, currentLeft) if current == nil { t.Fatal("bootstrapAcceptedLogical(current) should return logical") } target := server.bootstrapAcceptedLogical("dedicated-attach-target", nil, nil) if target == nil { t.Fatal("bootstrapAcceptedLogical(target) should return logical") } bulk := newBulkHandle(context.Background(), server.getBulkRuntime(), serverFileScope(target), BulkOpenRequest{ BulkID: "server-dedicated-attach-test", DataID: 7, Dedicated: true, AttachToken: "attach-token", }, 0, target, nil, 0, nil, nil, nil, nil, nil) bulk.markAcceptHandled() if err := server.getBulkRuntime().register(serverFileScope(target), bulk); err != nil { t.Fatalf("register bulk runtime failed: %v", err) } attachedLeft, attachedRight := net.Pipe() defer attachedRight.Close() if err := bulk.attachDedicatedConn(attachedLeft); err != nil { t.Fatalf("attachDedicatedConn failed: %v", err) } resolvedLogical, resolvedBulk, err := server.resolveInboundDedicatedBulk(current, bulkAttachRequest{ PeerID: target.ID(), BulkID: bulk.ID(), AttachToken: "attach-token", }) if err != nil { t.Fatalf("resolveInboundDedicatedBulk failed: %v", err) } if resolvedLogical != target { t.Fatalf("resolved logical mismatch: got %v want %v", resolvedLogical, target) } if resolvedBulk != bulk { t.Fatalf("resolved bulk mismatch: got %v want %v", resolvedBulk, bulk) } }