- 引入 LogicalConn/TransportConn 分层,ClientConn 保留兼容适配层
- 新增 Stream、Bulk、RecordStream 三条数据面能力及对应控制路径
- 完成 transfer/file 传输内核与状态快照、诊断能力
- 补齐 reconnect、inbound dispatcher、modern psk 等基础模块
- 增加大规模回归、并发与基准测试覆盖
- 更新依赖库
104 lines
2.2 KiB
Go
104 lines
2.2 KiB
Go
package notify
|
|
|
|
import (
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
func TestInboundDispatcherSerializesPerSource(t *testing.T) {
|
|
dispatcher := newInboundDispatcher()
|
|
defer dispatcher.CloseAndWait()
|
|
|
|
firstStarted := make(chan struct{}, 1)
|
|
secondStarted := make(chan struct{}, 1)
|
|
otherStarted := make(chan struct{}, 1)
|
|
releaseFirst := make(chan struct{})
|
|
|
|
var mu sync.Mutex
|
|
var order []string
|
|
|
|
record := func(step string) {
|
|
mu.Lock()
|
|
order = append(order, step)
|
|
mu.Unlock()
|
|
}
|
|
|
|
if !dispatcher.Dispatch("alpha", func() {
|
|
record("alpha-1-start")
|
|
firstStarted <- struct{}{}
|
|
<-releaseFirst
|
|
record("alpha-1-end")
|
|
}) {
|
|
t.Fatal("dispatch alpha-1 failed")
|
|
}
|
|
if !dispatcher.Dispatch("alpha", func() {
|
|
record("alpha-2-start")
|
|
secondStarted <- struct{}{}
|
|
record("alpha-2-end")
|
|
}) {
|
|
t.Fatal("dispatch alpha-2 failed")
|
|
}
|
|
if !dispatcher.Dispatch("beta", func() {
|
|
record("beta-1-start")
|
|
otherStarted <- struct{}{}
|
|
record("beta-1-end")
|
|
}) {
|
|
t.Fatal("dispatch beta-1 failed")
|
|
}
|
|
|
|
select {
|
|
case <-firstStarted:
|
|
case <-time.After(time.Second):
|
|
t.Fatal("timed out waiting for alpha-1")
|
|
}
|
|
select {
|
|
case <-otherStarted:
|
|
case <-time.After(time.Second):
|
|
t.Fatal("timed out waiting for beta-1")
|
|
}
|
|
select {
|
|
case <-secondStarted:
|
|
t.Fatal("alpha-2 started before alpha-1 finished")
|
|
case <-time.After(100 * time.Millisecond):
|
|
}
|
|
|
|
close(releaseFirst)
|
|
|
|
select {
|
|
case <-secondStarted:
|
|
case <-time.After(time.Second):
|
|
t.Fatal("timed out waiting for alpha-2")
|
|
}
|
|
|
|
dispatcher.CloseAndWait()
|
|
|
|
mu.Lock()
|
|
defer mu.Unlock()
|
|
if len(order) == 0 {
|
|
t.Fatal("dispatch order is empty")
|
|
}
|
|
alpha1Start := indexOfString(order, "alpha-1-start")
|
|
alpha1End := indexOfString(order, "alpha-1-end")
|
|
alpha2Start := indexOfString(order, "alpha-2-start")
|
|
beta1Start := indexOfString(order, "beta-1-start")
|
|
if alpha1Start < 0 || alpha1End < 0 || alpha2Start < 0 || beta1Start < 0 {
|
|
t.Fatalf("unexpected order trace: %v", order)
|
|
}
|
|
if alpha2Start < alpha1End {
|
|
t.Fatalf("alpha source was not serialized: %v", order)
|
|
}
|
|
if beta1Start > alpha1End {
|
|
t.Fatalf("beta source did not run in parallel window: %v", order)
|
|
}
|
|
}
|
|
|
|
// indexOfString returns the position of the first element of list that equals
// target, or -1 when no element matches (including when list is empty or nil).
func indexOfString(list []string, target string) int {
	for pos := 0; pos < len(list); pos++ {
		if list[pos] != target {
			continue
		}
		return pos
	}
	return -1
}
|