notify/file_ack_test.go

194 lines
4.8 KiB
Go
Raw Permalink Normal View History

package notify
import "testing"
// TestFileAckPoolPreparedWaitConsumesEarlyAck checks that an ack delivered
// after prepare, but before waitPrepared is called, is still picked up by the
// prepared waiter.
func TestFileAckPoolPreparedWaitConsumesEarlyAck(t *testing.T) {
	const (
		scope  = "client:a"
		fileID = "file-1"
		stage  = "chunk"
		offset = 64
	)

	acks := newFileAckPool()
	waiter := acks.prepare(scope, fileID, stage, offset)

	// The ack arrives before anything has called waitPrepared.
	ack := FileEvent{Packet: FilePacket{FileID: fileID, Stage: stage, Offset: offset}}
	if matched := acks.deliver(scope, ack); !matched {
		t.Fatal("deliver should match prepared waiter")
	}

	waitErr := acks.waitPrepared(waiter, defaultFileAckTimeout)
	if waitErr != nil {
		t.Fatalf("waitPrepared failed: %v", waitErr)
	}
}
// TestFileAckPoolPreparedWaitReturnsAckError checks that when the delivered
// ack packet carries a non-empty Error field, waitPrepared returns an error
// whose message equals that field.
func TestFileAckPoolPreparedWaitReturnsAckError(t *testing.T) {
	pool := newFileAckPool()
	wait := pool.prepare("client:a", "file-2", "meta", 0)

	ok := pool.deliver("client:a", FileEvent{
		Packet: FilePacket{
			FileID: "file-2",
			Stage:  "meta",
			Offset: 0,
			Error:  "checksum mismatch",
		},
	})
	if !ok {
		t.Fatal("deliver should match prepared waiter")
	}

	err := pool.waitPrepared(wait, defaultFileAckTimeout)
	if err == nil {
		t.Fatal("waitPrepared should return ack error")
	}
	// Report both the actual and the expected message so a failure is
	// diagnosable without reading the test source.
	if got, want := err.Error(), "checksum mismatch"; got != want {
		t.Fatalf("unexpected error: got %q want %q", got, want)
	}
}
// TestFileAckPoolCancelRemovesPreparedWaiter checks that once a prepared
// waiter has been canceled, a later ack for the same scope/file/stage/offset
// is not reported as matched by deliver.
func TestFileAckPoolCancelRemovesPreparedWaiter(t *testing.T) {
	const (
		scope  = "client:a"
		fileID = "file-3"
		stage  = "end"
		offset = 0
	)

	acks := newFileAckPool()
	waiter := acks.prepare(scope, fileID, stage, offset)
	waiter.cancel()

	ack := FileEvent{Packet: FilePacket{FileID: fileID, Stage: stage, Offset: offset}}
	if matched := acks.deliver(scope, ack); matched {
		t.Fatal("deliver should not match canceled waiter")
	}
}
// TestFileAckPoolScopeIsolation registers two waiters that differ only in
// scope and checks that an ack delivered under one scope never satisfies the
// waiter registered under the other.
func TestFileAckPoolScopeIsolation(t *testing.T) {
	const (
		scopeA = "server:client-a"
		scopeB = "server:client-b"
		fileID = "file-4"
		stage  = "chunk"
		offset = 128
	)

	acks := newFileAckPool()
	waiterA := acks.prepare(scopeA, fileID, stage, offset)
	waiterB := acks.prepare(scopeB, fileID, stage, offset)

	// Every delivery below carries the same payload; only the scope varies.
	ack := FileEvent{Packet: FilePacket{FileID: fileID, Stage: stage, Offset: offset}}

	// First ack under scope A satisfies waiter A.
	if !acks.deliver(scopeA, ack) {
		t.Fatal("deliver should match scopeA waiter")
	}
	if err := acks.waitPrepared(waiterA, defaultFileAckTimeout); err != nil {
		t.Fatalf("waitPrepared scopeA failed: %v", err)
	}

	// A repeated ack under scope A must not be matched against waiter B.
	if acks.deliver(scopeA, ack) {
		t.Fatal("scopeA ack should not consume scopeB waiter")
	}

	// Waiter B is only satisfied by an ack under its own scope.
	if !acks.deliver(scopeB, ack) {
		t.Fatal("deliver should match scopeB waiter")
	}
	if err := acks.waitPrepared(waiterB, defaultFileAckTimeout); err != nil {
		t.Fatalf("waitPrepared scopeB failed: %v", err)
	}
}
// TestFileAckPoolCloseAllCancelsPreparedWaiters checks that after closeAll, a
// previously prepared waiter returns an error with the message
// "file ack canceled" from waitPrepared.
func TestFileAckPoolCloseAllCancelsPreparedWaiters(t *testing.T) {
	const wantMsg = "file ack canceled"

	acks := newFileAckPool()
	waiter := acks.prepare("client:a", "file-5", "chunk", 256)
	acks.closeAll()

	waitErr := acks.waitPrepared(waiter, defaultFileAckTimeout)
	if waitErr == nil {
		t.Fatal("waitPrepared should return cancel error after closeAll")
	}
	if msg := waitErr.Error(); msg != wantMsg {
		t.Fatalf("unexpected error after closeAll: got %q want %q", msg, wantMsg)
	}
}
// TestFileAckPoolCloseScopeCancelsMatchingWaitersOnly checks that closeScope
// cancels the waiter registered under the closed scope while a waiter under a
// different scope can still receive its ack.
func TestFileAckPoolCloseScopeCancelsMatchingWaitersOnly(t *testing.T) {
	const (
		scopeA  = "server:client-a"
		scopeB  = "server:client-b"
		fileID  = "file-6"
		stage   = "chunk"
		offset  = 256
		wantMsg = "file ack canceled"
	)

	acks := newFileAckPool()
	waiterA := acks.prepare(scopeA, fileID, stage, offset)
	waiterB := acks.prepare(scopeB, fileID, stage, offset)

	acks.closeScope(scopeA)

	// Scope A was closed, so its waiter must report cancellation.
	errA := acks.waitPrepared(waiterA, defaultFileAckTimeout)
	if errA == nil {
		t.Fatal("scopeA waiter should be canceled")
	}
	if msg := errA.Error(); msg != wantMsg {
		t.Fatalf("unexpected scopeA error: got %q want %q", msg, wantMsg)
	}

	// Scope B was not closed, so its waiter still accepts an ack.
	ack := FileEvent{Packet: FilePacket{FileID: fileID, Stage: stage, Offset: offset}}
	if !acks.deliver(scopeB, ack) {
		t.Fatal("scopeB waiter should remain deliverable")
	}
	if errB := acks.waitPrepared(waiterB, defaultFileAckTimeout); errB != nil {
		t.Fatalf("waitPrepared scopeB failed: %v", errB)
	}
}
// TestServerRemoveClientClosesScopedFileAckWaiters checks that removing a
// client from the server cancels the file-ack waiter registered under that
// client's scope, while a waiter under another client's scope can still
// receive its ack.
func TestServerRemoveClientClosesScopedFileAckWaiters(t *testing.T) {
	// Use the comma-ok assertion so an unexpected concrete type fails this
	// test with a clear message instead of panicking the whole test binary.
	created := NewServer()
	server, isCommon := created.(*ServerCommon)
	if !isCommon {
		t.Fatalf("NewServer returned %T, want *ServerCommon", created)
	}

	clientA := &ClientConn{ClientID: "client-a"}
	clientB := &ClientConn{ClientID: "client-b"}

	pool := server.getFileAckPool()
	waitA := pool.prepare(serverFileScope(clientA), "file-7", "end", 0)
	waitB := pool.prepare(serverFileScope(clientB), "file-7", "end", 0)

	server.removeClient(clientA)

	// clientA was removed, so its waiter must report cancellation.
	err := pool.waitPrepared(waitA, defaultFileAckTimeout)
	if err == nil {
		t.Fatal("clientA waiter should be canceled when client is removed")
	}
	if got, want := err.Error(), "file ack canceled"; got != want {
		t.Fatalf("unexpected clientA error: got %q want %q", got, want)
	}

	// clientB was not removed, so its waiter still accepts an ack.
	ok := pool.deliver(serverFileScope(clientB), FileEvent{
		Packet: FilePacket{
			FileID: "file-7",
			Stage:  "end",
			Offset: 0,
		},
	})
	if !ok {
		t.Fatal("clientB waiter should remain deliverable")
	}
	if err := pool.waitPrepared(waitB, defaultFileAckTimeout); err != nil {
		t.Fatalf("waitPrepared clientB failed: %v", err)
	}
}