notify/record_codec_test.go
starainrt 7ed3dd5b37
feat: 完善 RecordStream 的协议协商、运行观测与文档说明
- 将 RecordStream 出站路径收敛为单 writer loop
  - 支持在 batch header 中 piggyback AckSeq,保留独立 ack 作为兼容回退
  - 增加 record stream 打开阶段能力协商,支持 mixed-version peer 自动降级
  - 补充 RecordSnapshot 与 diagnostics summary 的 record-plane 观测项
  - 增加 batch/ack/error frame、piggyback ack、barrier 等待拆分与 apply backlog 指标
  - 收紧 TransportConn detach 后的 runtime snapshot 语义
  - 补充 README 中的 RecordStream 语义、兼容行为与诊断快照说明
  - 补充相关单测与 race 回归验证
2026-04-15 19:52:45 +08:00

74 lines
2.4 KiB
Go

package notify
import "testing"
// TestEncodeDecodeRecordBatchFrameV1 round-trips a two-message batch through
// encodeRecordBatchFrame (called with ack sequence 0 and a false flag) and
// decodeRecordFrame. It expects the decoded frame to report version
// recordFrameVersionV1, type recordFrameTypeBatch, an AckSeq of 0, and the
// same sequence numbers and payloads that were encoded.
//
// NOTE(review): the boolean argument presumably selects the V2 encoding that
// piggybacks an ack — confirm against encodeRecordBatchFrame.
func TestEncodeDecodeRecordBatchFrameV1(t *testing.T) {
	messages := []recordOutboundMessage{
		{Seq: 7, Payload: []byte("alpha")},
		{Seq: 8, Payload: []byte("beta")},
	}

	encoded, encodeErr := encodeRecordBatchFrame(messages, 0, false)
	if encodeErr != nil {
		t.Fatalf("encodeRecordBatchFrame v1 failed: %v", encodeErr)
	}

	decoded, decodeErr := decodeRecordFrame(encoded)
	if decodeErr != nil {
		t.Fatalf("decodeRecordFrame v1 failed: %v", decodeErr)
	}

	// Header checks run in order; the first mismatch aborts the test, so the
	// batch length is guaranteed to match before any element is indexed.
	switch {
	case decoded.Version != uint8(recordFrameVersionV1):
		t.Fatalf("frame version = %d, want %d", decoded.Version, uint8(recordFrameVersionV1))
	case decoded.Type != recordFrameTypeBatch:
		t.Fatalf("frame type = %d, want %d", decoded.Type, recordFrameTypeBatch)
	case decoded.AckSeq != 0:
		t.Fatalf("frame ack seq = %d, want 0", decoded.AckSeq)
	case len(decoded.Batch) != len(messages):
		t.Fatalf("batch len = %d, want %d", len(decoded.Batch), len(messages))
	}

	for idx := range messages {
		gotSeq, wantSeq := decoded.Batch[idx].Seq, messages[idx].Seq
		if gotSeq != wantSeq {
			t.Fatalf("batch[%d].seq = %d, want %d", idx, gotSeq, wantSeq)
		}

		gotPayload := string(decoded.Batch[idx].Payload)
		wantPayload := string(messages[idx].Payload)
		if gotPayload != wantPayload {
			t.Fatalf("batch[%d].payload = %q, want %q", idx, gotPayload, wantPayload)
		}
	}
}
// TestEncodeDecodeRecordBatchFrameV2CarriesAckSeq round-trips a two-message
// batch through encodeRecordBatchFrame (called with ack sequence 9 and a true
// flag) and decodeRecordFrame. It expects the decoded frame to report version
// recordFrameVersionV2, type recordFrameTypeBatch, an AckSeq of 9, and the
// same sequence numbers and payloads that were encoded.
//
// NOTE(review): the boolean argument presumably enables the V2 header that
// piggybacks the ack sequence — confirm against encodeRecordBatchFrame.
func TestEncodeDecodeRecordBatchFrameV2CarriesAckSeq(t *testing.T) {
	messages := []recordOutboundMessage{
		{Seq: 11, Payload: []byte("alpha")},
		{Seq: 12, Payload: []byte("beta")},
	}

	encoded, encodeErr := encodeRecordBatchFrame(messages, 9, true)
	if encodeErr != nil {
		t.Fatalf("encodeRecordBatchFrame v2 failed: %v", encodeErr)
	}

	decoded, decodeErr := decodeRecordFrame(encoded)
	if decodeErr != nil {
		t.Fatalf("decodeRecordFrame v2 failed: %v", decodeErr)
	}

	// Frame header: version, type, and the piggybacked ack sequence.
	if wantVersion := uint8(recordFrameVersionV2); decoded.Version != wantVersion {
		t.Fatalf("frame version = %d, want %d", decoded.Version, wantVersion)
	}
	if decoded.Type != recordFrameTypeBatch {
		t.Fatalf("frame type = %d, want %d", decoded.Type, recordFrameTypeBatch)
	}
	if wantAck := uint64(9); decoded.AckSeq != wantAck {
		t.Fatalf("frame ack seq = %d, want %d", decoded.AckSeq, wantAck)
	}

	// The length must match before elements are compared index by index.
	if gotLen, wantLen := len(decoded.Batch), len(messages); gotLen != wantLen {
		t.Fatalf("batch len = %d, want %d", gotLen, wantLen)
	}

	for idx := range messages {
		if decoded.Batch[idx].Seq != messages[idx].Seq {
			t.Fatalf("batch[%d].seq = %d, want %d", idx, decoded.Batch[idx].Seq, messages[idx].Seq)
		}

		gotPayload := string(decoded.Batch[idx].Payload)
		wantPayload := string(messages[idx].Payload)
		if gotPayload != wantPayload {
			t.Fatalf("batch[%d].payload = %q, want %q", idx, gotPayload, wantPayload)
		}
	}
}