- 新增 managed/external/nested 三种传输保护模式 - 新增 peer attach 显式认证、抗重放、channel binding 和可选前向保密协商 - 明确单连接注入与可重拨连接源的语义边界 - 禁止 ConnectByConn 场景下 dedicated bulk 走 sidecar,auto 模式自动回退 shared - 修正 dedicated attach 在 bootstrap/steady profile 切换下的处理逻辑 - 优化 shared bulk super-batch 与批量 framed write 路径 - 降低 stream/bulk fast path 的复制和分发损耗 - 补齐 benchmark、回归测试、运行时快照和 README 文档
197 lines
6.0 KiB
Go
197 lines
6.0 KiB
Go
package notify
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
func TestBulkDedicatedLaneBatchRequestPrepareBorrowedSharesItems(t *testing.T) {
|
|
req := getBulkDedicatedLaneBatchRequest()
|
|
defer req.recycle()
|
|
|
|
items := []bulkDedicatedSendRequest{{
|
|
Type: bulkFastPayloadTypeData,
|
|
Seq: 7,
|
|
Payload: []byte("hello"),
|
|
}}
|
|
req.prepare(context.Background(), 11, items, true, true)
|
|
|
|
if got, want := len(req.Items), 1; got != want {
|
|
t.Fatalf("prepared item count = %d, want %d", got, want)
|
|
}
|
|
if &req.Items[0] != &items[0] {
|
|
t.Fatal("prepare with borrowed items should share request items")
|
|
}
|
|
}
|
|
|
|
func TestBulkDedicatedLaneSenderTryDirectSubmitWriteFlushesWholePayload(t *testing.T) {
|
|
conn := &shortWriteBulkRecordConn{maxPerWrite: 1024}
|
|
encodeCalls := 0
|
|
sender := &bulkDedicatedLaneSender{
|
|
conn: conn,
|
|
stopCh: make(chan struct{}),
|
|
encode: func(batches []bulkDedicatedOutboundBatch) ([]byte, func(), error) {
|
|
encodeCalls++
|
|
payload, err := encodeBulkDedicatedBatchesPlain(batches)
|
|
return payload, nil, err
|
|
},
|
|
}
|
|
|
|
payload := bytes.Repeat([]byte("a"), 3*defaultBulkChunkSize)
|
|
submitted, written, err := sender.tryDirectSubmitWrite(context.Background(), 9, 1, payload, defaultBulkChunkSize)
|
|
if err != nil {
|
|
t.Fatalf("tryDirectSubmitWrite error = %v", err)
|
|
}
|
|
if !submitted {
|
|
t.Fatal("tryDirectSubmitWrite should submit directly")
|
|
}
|
|
if got, want := written, len(payload); got != want {
|
|
t.Fatalf("written = %d, want %d", got, want)
|
|
}
|
|
if encodeCalls == 0 {
|
|
t.Fatal("encode should be called at least once")
|
|
}
|
|
if got := sender.queued.Load(); got != 0 {
|
|
t.Fatalf("queued requests = %d, want 0", got)
|
|
}
|
|
}
|
|
|
|
func TestBulkDedicatedLaneSenderCollectBatchRequestsBatchesAcrossDataIDs(t *testing.T) {
|
|
sender := &bulkDedicatedLaneSender{
|
|
reqCh: make(chan *bulkDedicatedLaneBatchRequest, 3),
|
|
stopCh: make(chan struct{}),
|
|
}
|
|
|
|
first := &bulkDedicatedLaneBatchRequest{
|
|
Ctx: context.Background(),
|
|
DataID: 7,
|
|
Items: []bulkDedicatedSendRequest{{
|
|
Type: bulkFastPayloadTypeData,
|
|
Seq: 1,
|
|
Payload: []byte("a"),
|
|
}},
|
|
Deadline: time.Now().Add(2 * time.Second),
|
|
}
|
|
if !first.tryStart() {
|
|
t.Fatal("first request should start")
|
|
}
|
|
|
|
second := &bulkDedicatedLaneBatchRequest{
|
|
Ctx: context.Background(),
|
|
DataID: 7,
|
|
Items: []bulkDedicatedSendRequest{{
|
|
Type: bulkFastPayloadTypeData,
|
|
Seq: 2,
|
|
Payload: []byte("b"),
|
|
}},
|
|
Deadline: time.Now().Add(time.Second),
|
|
}
|
|
third := &bulkDedicatedLaneBatchRequest{
|
|
Ctx: context.Background(),
|
|
DataID: 8,
|
|
Items: []bulkDedicatedSendRequest{{
|
|
Type: bulkFastPayloadTypeData,
|
|
Seq: 3,
|
|
Payload: []byte("c"),
|
|
}},
|
|
Deadline: time.Now().Add(3 * time.Second),
|
|
}
|
|
|
|
sender.reqCh <- second
|
|
sender.reqCh <- third
|
|
|
|
batchReqs := []*bulkDedicatedLaneBatchRequest{first}
|
|
batches := []bulkDedicatedOutboundBatch{{
|
|
DataID: first.DataID,
|
|
Items: first.Items,
|
|
}}
|
|
batchBytes := bulkDedicatedBatchesPlainLen(batches)
|
|
deadline := first.Deadline
|
|
|
|
carry, err := sender.collectBatchRequests(&batchReqs, &batches, &batchBytes, &deadline)
|
|
if err != nil {
|
|
t.Fatalf("collectBatchRequests error = %v", err)
|
|
}
|
|
if carry != nil {
|
|
t.Fatalf("carry request = %+v, want nil", carry)
|
|
}
|
|
if len(batchReqs) != 3 {
|
|
t.Fatalf("batched request count = %d, want 3", len(batchReqs))
|
|
}
|
|
if len(batches) != 2 {
|
|
t.Fatalf("batched group count = %d, want 2", len(batches))
|
|
}
|
|
if got, want := batches[0].DataID, uint64(7); got != want {
|
|
t.Fatalf("first batch dataID = %d, want %d", got, want)
|
|
}
|
|
if got, want := len(batches[0].Items), 2; got != want {
|
|
t.Fatalf("first batch item count = %d, want %d", got, want)
|
|
}
|
|
if got, want := batches[1].DataID, uint64(8); got != want {
|
|
t.Fatalf("second batch dataID = %d, want %d", got, want)
|
|
}
|
|
if got, want := len(batches[1].Items), 1; got != want {
|
|
t.Fatalf("second batch item count = %d, want %d", got, want)
|
|
}
|
|
if got, want := batches[0].Items[0].Seq, uint64(1); got != want {
|
|
t.Fatalf("first batch seq = %d, want %d", got, want)
|
|
}
|
|
if got, want := batches[0].Items[1].Seq, uint64(2); got != want {
|
|
t.Fatalf("second batch seq = %d, want %d", got, want)
|
|
}
|
|
if got, want := batches[1].Items[0].Seq, uint64(3); got != want {
|
|
t.Fatalf("third batch seq = %d, want %d", got, want)
|
|
}
|
|
if !deadline.Equal(second.Deadline) {
|
|
t.Fatalf("merged deadline = %v, want %v", deadline, second.Deadline)
|
|
}
|
|
}
|
|
|
|
func TestDedicatedBulkSuperBatchRoundTrip(t *testing.T) {
|
|
batches := []bulkDedicatedOutboundBatch{
|
|
{
|
|
DataID: 7,
|
|
Items: []bulkDedicatedSendRequest{
|
|
{Type: bulkFastPayloadTypeData, Seq: 1, Payload: []byte("alpha")},
|
|
{Type: bulkFastPayloadTypeData, Seq: 2, Payload: []byte("beta")},
|
|
},
|
|
},
|
|
{
|
|
DataID: 8,
|
|
Items: []bulkDedicatedSendRequest{
|
|
{Type: bulkFastPayloadTypeClose, Flags: bulkFastPayloadFlagFullClose, Seq: 3},
|
|
{Type: bulkFastPayloadTypeRelease, Seq: 4, Payload: []byte{1, 2, 3, 4}},
|
|
},
|
|
},
|
|
}
|
|
|
|
plain, err := encodeBulkDedicatedBatchesPlain(batches)
|
|
if err != nil {
|
|
t.Fatalf("encodeBulkDedicatedBatchesPlain error = %v", err)
|
|
}
|
|
got, err := decodeDedicatedBulkInboundBatches(plain)
|
|
if err != nil {
|
|
t.Fatalf("decodeDedicatedBulkInboundBatches error = %v", err)
|
|
}
|
|
if len(got) != 2 {
|
|
t.Fatalf("decoded batch count = %d, want 2", len(got))
|
|
}
|
|
if got[0].DataID != 7 || got[1].DataID != 8 {
|
|
t.Fatalf("decoded dataIDs = %d,%d, want 7,8", got[0].DataID, got[1].DataID)
|
|
}
|
|
if len(got[0].Items) != 2 || len(got[1].Items) != 2 {
|
|
t.Fatalf("decoded item counts = %d,%d, want 2,2", len(got[0].Items), len(got[1].Items))
|
|
}
|
|
if !bytes.Equal(got[0].Items[0].Payload, []byte("alpha")) || !bytes.Equal(got[0].Items[1].Payload, []byte("beta")) {
|
|
t.Fatalf("decoded first batch payloads mismatch: %+v", got[0].Items)
|
|
}
|
|
if got[1].Items[0].Type != bulkFastPayloadTypeClose || got[1].Items[0].Flags != bulkFastPayloadFlagFullClose {
|
|
t.Fatalf("decoded close item mismatch: %+v", got[1].Items[0])
|
|
}
|
|
if got[1].Items[1].Type != bulkFastPayloadTypeRelease || !bytes.Equal(got[1].Items[1].Payload, []byte{1, 2, 3, 4}) {
|
|
t.Fatalf("decoded release item mismatch: %+v", got[1].Items[1])
|
|
}
|
|
}
|