package notify

import (
	"context"
	"errors"
	"testing"
	"time"
)

// TestCleanupFailedClientStartClearsSessionResources registers one pending
// wait, one file-ack wait and one signal-ack wait on a fresh client, remembers
// a received signal, and then runs cleanupFailedClientStart. Afterwards a
// non-blocking receive on every wait's reply channel must report the channel
// as closed, and the remembered signal must no longer count as a duplicate.
func TestCleanupFailedClientStartClearsSessionResources(t *testing.T) {
	const (
		scope    = "scope-client"
		signalID = 303
	)

	c := NewClient().(*ClientCommon)
	pendingWait := c.getPendingWaitPool().createAndStore(TransferMsg{ID: 101, Type: MSG_SYNC_ASK})
	fileAck := c.getFileAckPool().prepare(scope, "file-client", "chunk", 0)
	signalAck := c.getSignalAckPool().prepare(scope, 202)
	_ = c.getReceivedSignalCache().seenOrRemember(scope, signalID)

	c.cleanupFailedClientStart()

	// The default branches make each receive non-blocking: a channel that is
	// still open and empty fails the test instead of hanging it.
	select {
	case _, open := <-pendingWait.Reply:
		if open {
			t.Fatal("pending wait reply channel should be closed")
		}
	default:
		t.Fatal("pending wait reply channel should be closed immediately")
	}

	select {
	case _, open := <-fileAck.reply:
		if open {
			t.Fatal("file ack wait channel should be closed")
		}
	default:
		t.Fatal("file ack wait channel should be closed immediately")
	}

	select {
	case _, open := <-signalAck.reply:
		if open {
			t.Fatal("signal ack wait channel should be closed")
		}
	default:
		t.Fatal("signal ack wait channel should be closed immediately")
	}

	// The same signal was remembered before cleanup; seeing it reported as a
	// duplicate now would mean the cache survived.
	if c.getReceivedSignalCache().seenOrRemember(scope, signalID) {
		t.Fatal("received signal cache should be cleared by cleanup")
	}
}

// TestCleanupFailedServerStartClearsSessionResources is the server-side
// counterpart of the client test: after cleanupFailedServerStart every
// registered wait's reply channel must be closed and the received-signal cache
// must have forgotten the signal remembered beforehand.
func TestCleanupFailedServerStartClearsSessionResources(t *testing.T) {
	const (
		scope    = "scope-server"
		signalID = 333
	)

	s := NewServer().(*ServerCommon)
	pendingWait := s.getPendingWaitPool().createAndStore(TransferMsg{ID: 111, Type: MSG_SYNC_ASK})
	fileAck := s.getFileAckPool().prepare(scope, "file-server", "chunk", 0)
	signalAck := s.getSignalAckPool().prepare(scope, 222)
	_ = s.getReceivedSignalCache().seenOrRemember(scope, signalID)

	s.cleanupFailedServerStart()

	// Non-blocking receives: an open, empty channel lands in default and fails.
	select {
	case _, open := <-pendingWait.Reply:
		if open {
			t.Fatal("pending wait reply channel should be closed")
		}
	default:
		t.Fatal("pending wait reply channel should be closed immediately")
	}

	select {
	case _, open := <-fileAck.reply:
		if open {
			t.Fatal("file ack wait channel should be closed")
		}
	default:
		t.Fatal("file ack wait channel should be closed immediately")
	}

	select {
	case _, open := <-signalAck.reply:
		if open {
			t.Fatal("signal ack wait channel should be closed")
		}
	default:
		t.Fatal("signal ack wait channel should be closed immediately")
	}

	if s.getReceivedSignalCache().seenOrRemember(scope, signalID) {
		t.Fatal("received signal cache should be cleared by cleanup")
	}
}

// TestCleanupClientSessionResourcesResetsActiveIOWithServiceShutdown registers
// one stream and one bulk handle with the client's runtimes, runs
// cleanupClientSessionResources, and expects both handles to report
// errServiceShutdown and both runtime entries to be gone.
func TestCleanupClientSessionResourcesResetsActiveIOWithServiceShutdown(t *testing.T) {
	const (
		streamID = "cleanup-client-stream"
		bulkID   = "cleanup-client-bulk"
	)

	c := NewClient().(*ClientCommon)

	streamReq := StreamOpenRequest{
		StreamID: streamID,
		Channel:  StreamDataChannel,
	}
	stream := newStreamHandle(context.Background(), c.getStreamRuntime(), clientFileScope(), streamReq, 0, nil, nil, 0, nil, nil, nil, defaultStreamConfig())
	if err := c.getStreamRuntime().register(clientFileScope(), stream); err != nil {
		t.Fatalf("register client stream failed: %v", err)
	}

	bulkReq := BulkOpenRequest{
		BulkID: bulkID,
		DataID: 1,
		Range:  BulkRange{Length: 1},
	}
	bulk := newBulkHandle(context.Background(), c.getBulkRuntime(), clientFileScope(), bulkReq, 0, nil, nil, 0, nil, nil, nil, nil, nil)
	if err := c.getBulkRuntime().register(clientFileScope(), bulk); err != nil {
		t.Fatalf("register client bulk failed: %v", err)
	}

	c.cleanupClientSessionResources()

	streamErr := readStreamError(t, stream, time.Second)
	if !errors.Is(streamErr, errServiceShutdown) {
		t.Fatalf("client cleanup stream error = %v, want %v", streamErr, errServiceShutdown)
	}
	bulkErr := readBulkError(t, bulk, time.Second)
	if !errors.Is(bulkErr, errServiceShutdown) {
		t.Fatalf("client cleanup bulk error = %v, want %v", bulkErr, errServiceShutdown)
	}

	// The runtimes are fetched again after cleanup, exactly as the handles
	// were registered, so the lookups observe the post-cleanup state.
	_, streamFound := c.getStreamRuntime().lookup(clientFileScope(), streamID)
	if streamFound {
		t.Fatal("client cleanup should remove stream runtime entry")
	}
	_, bulkFound := c.getBulkRuntime().lookup(clientFileScope(), bulkID)
	if bulkFound {
		t.Fatal("client cleanup should remove bulk runtime entry")
	}
}

// TestCleanupServerSessionResourcesResetsActiveIOWithServiceShutdown mirrors
// the client test for the server: a stream and a bulk handle registered under
// the "cleanup-server" scope must report errServiceShutdown after
// cleanupServerSessionResources, and neither may remain in its runtime.
func TestCleanupServerSessionResourcesResetsActiveIOWithServiceShutdown(t *testing.T) {
	const (
		streamID = "cleanup-server-stream"
		bulkID   = "cleanup-server-bulk"
	)

	s := NewServer().(*ServerCommon)
	scope := "cleanup-server"

	streamReq := StreamOpenRequest{
		StreamID: streamID,
		Channel:  StreamDataChannel,
	}
	stream := newStreamHandle(context.Background(), s.getStreamRuntime(), scope, streamReq, 0, nil, nil, 0, nil, nil, nil, defaultStreamConfig())
	if err := s.getStreamRuntime().register(scope, stream); err != nil {
		t.Fatalf("register server stream failed: %v", err)
	}

	bulkReq := BulkOpenRequest{
		BulkID: bulkID,
		DataID: 1,
		Range:  BulkRange{Length: 1},
	}
	bulk := newBulkHandle(context.Background(), s.getBulkRuntime(), scope, bulkReq, 0, nil, nil, 0, nil, nil, nil, nil, nil)
	if err := s.getBulkRuntime().register(scope, bulk); err != nil {
		t.Fatalf("register server bulk failed: %v", err)
	}

	s.cleanupServerSessionResources()

	streamErr := readStreamError(t, stream, time.Second)
	if !errors.Is(streamErr, errServiceShutdown) {
		t.Fatalf("server cleanup stream error = %v, want %v", streamErr, errServiceShutdown)
	}
	bulkErr := readBulkError(t, bulk, time.Second)
	if !errors.Is(bulkErr, errServiceShutdown) {
		t.Fatalf("server cleanup bulk error = %v, want %v", bulkErr, errServiceShutdown)
	}

	_, streamFound := s.getStreamRuntime().lookup(scope, streamID)
	if streamFound {
		t.Fatal("server cleanup should remove stream runtime entry")
	}
	_, bulkFound := s.getBulkRuntime().lookup(scope, bulkID)
	if bulkFound {
		t.Fatal("server cleanup should remove bulk runtime entry")
	}
}