From 7ed3dd5b37551d612fb797e45bbb3194469ffafb Mon Sep 17 00:00:00 2001 From: starainrt Date: Wed, 15 Apr 2026 19:52:45 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=8C=E5=96=84=20RecordStream=20?= =?UTF-8?q?=E7=9A=84=E5=8D=8F=E8=AE=AE=E5=8D=8F=E5=95=86=E3=80=81=E8=BF=90?= =?UTF-8?q?=E8=A1=8C=E8=A7=82=E6=B5=8B=E4=B8=8E=E6=96=87=E6=A1=A3=E8=AF=B4?= =?UTF-8?q?=E6=98=8E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 将 RecordStream 出站路径收敛为单 writer loop - 支持在 batch header 中 piggyback AckSeq,保留独立 ack 作为兼容回退 - 增加 record stream 打开阶段能力协商,支持 mixed-version peer 自动降级 - 补充 RecordSnapshot 与 diagnostics summary 的 record-plane 观测项 - 增加 batch/ack/error frame、piggyback ack、barrier 等待拆分与 apply backlog 指标 - 收紧 TransportConn detach 后的 runtime snapshot 语义 - 补充 README 中的 RecordStream 语义、兼容行为与诊断快照说明 - 补充相关单测与 race 回归验证 --- README.md | 46 +++++ client_record.go | 2 + client_stream.go | 1 + diagnostics_snapshot.go | 103 +++++++++++ diagnostics_snapshot_test.go | 254 +++++++++++++++++++++++++- record_codec.go | 150 ++++++++++++---- record_codec_test.go | 73 ++++++++ record_negotiation.go | 48 +++++ record_negotiation_test.go | 78 ++++++++ record_runtime.go | 127 ++++++++++++- record_snapshot.go | 252 ++++++++++++++++++++++++++ record_stream.go | 340 ++++++++++++++++++++++++----------- server_record.go | 3 + server_stream.go | 2 + stream_control.go | 3 + transport_conn.go | 4 + 16 files changed, 1341 insertions(+), 145 deletions(-) create mode 100644 record_codec_test.go create mode 100644 record_negotiation.go create mode 100644 record_negotiation_test.go create mode 100644 record_snapshot.go diff --git a/README.md b/README.md index 57d4f19..204b70c 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,7 @@ - 记录流数据面:`OpenRecordStream` - 批量数据面:`OpenBulk`(`shared` / `dedicated`) - 文件传输内核:transfer control / progress / resume +- 观测面:runtime snapshot / diagnostics summary - 会话模型:`LogicalConn`(逻辑会话)与 `TransportConn`(物理承载)分离 ## 版本要求 @@ -82,6 +83,51 @@ func main() { } ``` +## RecordStream 说明 + +`RecordStream` 构建在 `Stream` 之上,适合“有边界的顺序记录”场景。 + +- 写入入口:`OpenRecordStream`、`WriteRecord` +- 接收入口:`ReadRecord` +- 确认入口:`AckRecord` +- 检查点:`Barrier`、`BarrierTo` +- 错误回包:`RecordFailure` + +确认语义: + +- `AckRecord` 表示“该序号及其之前的连续记录已完成 apply”,不是“已收到” +- `Barrier` / `BarrierTo` 等待的是对端 `apply-complete` 的最大连续序号 +- `RecordFailure` 会返回 `FailedSeq`、`Code`、`Retryable`、`Message` + +兼容与传输: + +- record stream 在打开阶段协商 batch ack 能力 +- 双端都支持时,累计 `AckSeq` 会随 batch header piggyback 发送 +- 对端不支持时,自动回退到独立 ack frame +- mixed-version peer 可以互通,不要求双方同时升级 + +## 诊断快照 + +顶层诊断入口: + +- `GetClientDiagnosticsSnapshot` +- `GetServerDiagnosticsSnapshot` + +快照内容: + +- 会话运行态:client / server runtime +- 数据面快照:`StreamSnapshot`、`BulkSnapshot`、`RecordSnapshot` +- 文件传输快照:`TransferSnapshot` +- 汇总视图:`DiagnosticsSummary` + +`RecordSnapshot` / `DiagnosticsSummary.RecordTelemetry` 当前覆盖: + +- batch / ack / error frame 收发计数 +- piggyback ack 命中计数 +- barrier 等待时间拆分:`flush` / `apply` +- `outstanding records/bytes` +- `pending apply / pending ack / peak pending apply` + ## 传输与 IPC - `tcp` diff --git a/client_record.go b/client_record.go index de5a708..c3b85ff 100644 --- a/client_record.go +++ b/client_record.go @@ -24,6 +24,7 @@ func (c *ClientCommon) OpenRecordStream(ctx context.Context, opt RecordOpenOptio _ = stream.Reset(err) return nil, err } + bindRecordRuntime(record, c.getRecordRuntime()) return record, nil } @@ -51,6 +52,7 @@ func (c *ClientCommon) claimInboundRecordStream(stream *streamHandle) (bool, err if err != nil { return true, err } + bindRecordRuntime(record, runtime) info := RecordAcceptInfo{ ID: stream.ID(), Metadata: stream.Metadata(), diff --git a/client_stream.go b/client_stream.go index dea3762..567ea34 100644 --- a/client_stream.go +++ b/client_stream.go @@ -35,6 +35,7 @@ func (c *ClientCommon) OpenStream(ctx context.Context, opt StreamOpenOptions) (S if resp.DataID != 0 { req.DataID = resp.DataID } + req.Metadata = mergeStreamMetadata(req.Metadata, resp.Metadata) stream := newStreamHandle(c.clientStopContextSnapshot(), runtime, clientFileScope(), req, c.currentClientSessionEpoch(), nil, nil, resp.TransportGeneration, clientStreamCloseSender(c), clientStreamResetSender(c), clientStreamDataSender(c, c.currentClientSessionEpoch()), runtime.configSnapshot()) stream.setClientSnapshotOwner(c) stream.setAddrSnapshot(c.clientStreamAddrSnapshot()) diff --git a/diagnostics_snapshot.go b/diagnostics_snapshot.go index 98c9601..bc177ea 100644 --- a/diagnostics_snapshot.go +++ b/diagnostics_snapshot.go @@ -34,6 +34,27 @@ type DiagnosticsTransferTelemetrySummary struct { CommitWaitRatio float64 } +type DiagnosticsRecordTelemetrySummary struct { + BatchFramesSent int64 + AckFramesSent int64 + ErrorFramesSent int64 + BatchFramesReceived int64 + AckFramesReceived int64 + ErrorFramesReceived int64 + FrameSendCount int64 + FrameReceiveCount int64 + PiggybackAckSent int64 + PiggybackAckReceived int64 + BarrierCount int64 + BarrierFlushWaitDuration time.Duration + BarrierApplyWaitDuration time.Duration + OutstandingRecords int + OutstandingBytes int + PendingApplyRecords int + PendingAckRecords int + PeakPendingApplyRecords int +} + type DiagnosticsSummary struct { LogicalCount int CurrentTransportCount int @@ -49,6 +70,11 @@ type DiagnosticsSummary struct { StaleBulkCount int ResetBulkCount int + RecordCount int + ActiveRecordCount int + StaleRecordCount int + ResetRecordCount int + TransferCount int ActiveTransferCount int PausedTransferCount int @@ -58,6 +84,8 @@ type DiagnosticsSummary struct { StreamResetCauses DiagnosticsResetCauseSummary BulkResetCauses DiagnosticsResetCauseSummary + RecordResetCauses DiagnosticsResetCauseSummary + RecordTelemetry DiagnosticsRecordTelemetrySummary TransferTelemetry DiagnosticsTransferTelemetrySummary } @@ -65,6 +93,7 @@ type ClientDiagnosticsSnapshot struct { Runtime ClientRuntimeSnapshot Streams []StreamSnapshot Bulks []BulkSnapshot + Records []RecordSnapshot Transfers []TransferSnapshot Summary DiagnosticsSummary } @@ -75,6 +104,7 @@ type ServerDiagnosticsSnapshot struct { CurrentTransports []TransportConnRuntimeSnapshot Streams []StreamSnapshot Bulks []BulkSnapshot + Records []RecordSnapshot Transfers []TransferSnapshot Summary DiagnosticsSummary } @@ -100,6 +130,10 @@ func GetClientDiagnosticsSnapshot(c Client) (ClientDiagnosticsSnapshot, error) { if err != nil { return ClientDiagnosticsSnapshot{}, err } + records, err := GetClientRecordSnapshots(c) + if err != nil { + return ClientDiagnosticsSnapshot{}, err + } transfers, err := GetClientTransferSnapshots(c) if err != nil { return ClientDiagnosticsSnapshot{}, err @@ -108,6 +142,7 @@ func GetClientDiagnosticsSnapshot(c Client) (ClientDiagnosticsSnapshot, error) { Runtime: runtime, Streams: streams, Bulks: bulks, + Records: records, Transfers: transfers, } snapshot.Summary = summarizeClientDiagnosticsSnapshot(snapshot) @@ -138,6 +173,10 @@ func GetServerDiagnosticsSnapshot(s Server) (ServerDiagnosticsSnapshot, error) { if err != nil { return ServerDiagnosticsSnapshot{}, err } + records, err := GetServerRecordSnapshots(s) + if err != nil { + return ServerDiagnosticsSnapshot{}, err + } transfers, err := GetServerTransferSnapshots(s) if err != nil { return ServerDiagnosticsSnapshot{}, err @@ -148,6 +187,7 @@ func GetServerDiagnosticsSnapshot(s Server) (ServerDiagnosticsSnapshot, error) { CurrentTransports: transports, Streams: streams, Bulks: bulks, + Records: records, Transfers: transfers, } snapshot.Summary = summarizeServerDiagnosticsSnapshot(snapshot) @@ -203,6 +243,7 @@ func summarizeClientDiagnosticsSnapshot(snapshot ClientDiagnosticsSnapshot) Diag } summarizeStreamSnapshots(&summary, snapshot.Streams) summarizeBulkSnapshots(&summary, snapshot.Bulks) + summarizeRecordSnapshots(&summary, snapshot.Records) summarizeTransferSnapshots(&summary, snapshot.Transfers) return summary } @@ -214,6 +255,7 @@ func summarizeServerDiagnosticsSnapshot(snapshot ServerDiagnosticsSnapshot) Diag } summarizeStreamSnapshots(&summary, snapshot.Streams) summarizeBulkSnapshots(&summary, snapshot.Bulks) + summarizeRecordSnapshots(&summary, snapshot.Records) summarizeTransferSnapshots(&summary, snapshot.Transfers) return summary } @@ -266,6 +308,27 @@ func summarizeBulkSnapshots(summary *DiagnosticsSummary, snapshots []BulkSnapsho } } +func summarizeRecordSnapshots(summary *DiagnosticsSummary, snapshots []RecordSnapshot) { + if summary == nil { + return + } + summary.RecordCount = len(snapshots) + for _, snapshot := range snapshots { + switch { + case snapshot.ResetError != "": + summary.ResetRecordCount++ + accumulateDiagnosticsResetCause(&summary.RecordResetCauses, snapshot.ResetError, "") + case recordSnapshotFinished(snapshot): + case recordSnapshotBoundActive(snapshot): + summary.ActiveRecordCount++ + default: + summary.StaleRecordCount++ + } + accumulateDiagnosticsRecordTelemetry(&summary.RecordTelemetry, snapshot) + } + finalizeDiagnosticsRecordTelemetry(&summary.RecordTelemetry) +} + func summarizeTransferSnapshots(summary *DiagnosticsSummary, snapshots []TransferSnapshot) { if summary == nil { return @@ -297,6 +360,10 @@ func bulkSnapshotFinished(snapshot BulkSnapshot) bool { return snapshot.ResetError == "" && snapshot.LocalClosed && snapshot.RemoteClosed } +func recordSnapshotFinished(snapshot RecordSnapshot) bool { + return snapshot.ResetError == "" && snapshot.LocalClosed && snapshot.RemoteClosed +} + func streamSnapshotBoundActive(snapshot StreamSnapshot) bool { return snapshot.BindingCurrent && snapshot.TransportAttached && snapshot.TransportCurrent } @@ -305,6 +372,10 @@ func bulkSnapshotBoundActive(snapshot BulkSnapshot) bool { return snapshot.BindingCurrent && snapshot.TransportAttached && snapshot.TransportCurrent } +func recordSnapshotBoundActive(snapshot RecordSnapshot) bool { + return snapshot.BindingCurrent && snapshot.TransportAttached && snapshot.TransportCurrent +} + func accumulateDiagnosticsResetCause(summary *DiagnosticsResetCauseSummary, resetError string, backpressureError string) { if summary == nil || resetError == "" { return @@ -362,6 +433,38 @@ func finalizeDiagnosticsTransferTelemetry(summary *DiagnosticsTransferTelemetryS summary.CommitWaitRatio = durationRatio(summary.CommitWaitDuration, summary.ObservedDuration) } +func accumulateDiagnosticsRecordTelemetry(summary *DiagnosticsRecordTelemetrySummary, snapshot RecordSnapshot) { + if summary == nil { + return + } + summary.BatchFramesSent += snapshot.BatchFramesSent + summary.AckFramesSent += snapshot.AckFramesSent + summary.ErrorFramesSent += snapshot.ErrorFramesSent + summary.BatchFramesReceived += snapshot.BatchFramesReceived + summary.AckFramesReceived += snapshot.AckFramesReceived + summary.ErrorFramesReceived += snapshot.ErrorFramesReceived + summary.PiggybackAckSent += snapshot.PiggybackAckSent + summary.PiggybackAckReceived += snapshot.PiggybackAckReceived + summary.BarrierCount += snapshot.BarrierCount + summary.BarrierFlushWaitDuration += snapshot.BarrierFlushWaitDuration + summary.BarrierApplyWaitDuration += snapshot.BarrierApplyWaitDuration + summary.OutstandingRecords += snapshot.OutstandingRecords + summary.OutstandingBytes += snapshot.OutstandingBytes + summary.PendingApplyRecords += snapshot.PendingApplyRecords + summary.PendingAckRecords += snapshot.PendingAckRecords + if snapshot.PeakPendingApplyRecords > summary.PeakPendingApplyRecords { + summary.PeakPendingApplyRecords = snapshot.PeakPendingApplyRecords + } +} + +func finalizeDiagnosticsRecordTelemetry(summary *DiagnosticsRecordTelemetrySummary) { + if summary == nil { + return + } + summary.FrameSendCount = summary.BatchFramesSent + summary.AckFramesSent + summary.ErrorFramesSent + summary.FrameReceiveCount = summary.BatchFramesReceived + summary.AckFramesReceived + summary.ErrorFramesReceived +} + func sortClientConnRuntimeSnapshots(src []ClientConnRuntimeSnapshot) { sort.Slice(src, func(i, j int) bool { if src[i].ClientID != src[j].ClientID { diff --git a/diagnostics_snapshot_test.go b/diagnostics_snapshot_test.go index ecd388b..2d6ce67 100644 --- a/diagnostics_snapshot_test.go +++ b/diagnostics_snapshot_test.go @@ -20,7 +20,7 @@ func TestGetClientDiagnosticsSnapshotDefaults(t *testing.T) { if got, want := snapshot.Runtime.OwnerState, "idle"; got != want { t.Fatalf("Runtime.OwnerState = %q, want %q", got, want) } - if len(snapshot.Streams) != 0 || len(snapshot.Bulks) != 0 || len(snapshot.Transfers) != 0 { + if len(snapshot.Streams) != 0 || len(snapshot.Bulks) != 0 || len(snapshot.Records) != 0 || len(snapshot.Transfers) != 0 { t.Fatalf("default diagnostics should be empty: %+v", snapshot) } if snapshot.Summary != (DiagnosticsSummary{}) { @@ -130,6 +130,137 @@ func TestGetClientDiagnosticsSnapshotAggregatesActiveState(t *testing.T) { _ = bulk.Close() } +func TestGetDiagnosticsSnapshotAggregatesActiveRecordState(t *testing.T) { + server := NewServer().(*ServerCommon) + if err := UseModernPSKServer(server, integrationSharedSecret, integrationModernPSKOptions()); err != nil { + t.Fatalf("UseModernPSKServer failed: %v", err) + } + + recordAcceptCh := make(chan RecordAcceptInfo, 1) + recordReleaseCh := make(chan struct{}) + recordHandlerDone := make(chan error, 1) + server.SetRecordStreamHandler(func(info RecordAcceptInfo) error { + recordAcceptCh <- info + msg, err := info.RecordStream.ReadRecord(context.Background()) + if err != nil { + recordHandlerDone <- err + return err + } + if string(msg.Payload) != "diag-record" { + err = errors.New("unexpected record payload") + recordHandlerDone <- err + return err + } + if err := info.RecordStream.AckRecord(msg.Seq); err != nil { + recordHandlerDone <- err + return err + } + <-recordReleaseCh + err = info.RecordStream.Close() + recordHandlerDone <- err + return err + }) + + if err := server.Listen("tcp", "127.0.0.1:0"); err != nil { + t.Fatalf("server Listen failed: %v", err) + } + defer func() { + _ = server.Stop() + }() + + client := NewClient().(*ClientCommon) + if err := UseModernPSKClient(client, integrationSharedSecret, integrationModernPSKOptions()); err != nil { + t.Fatalf("UseModernPSKClient failed: %v", err) + } + if err := client.Connect("tcp", server.listener.Addr().String()); err != nil { + t.Fatalf("client Connect failed: %v", err) + } + defer func() { + _ = client.Stop() + }() + + record, err := client.OpenRecordStream(context.Background(), RecordOpenOptions{ + Stream: StreamOpenOptions{ID: "diag-client-record"}, + }) + if err != nil { + t.Fatalf("client OpenRecordStream failed: %v", err) + } + select { + case <-recordAcceptCh: + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting accepted record stream") + } + + if _, err := record.WriteRecord(context.Background(), []byte("diag-record")); err != nil { + t.Fatalf("WriteRecord failed: %v", err) + } + if _, err := record.Barrier(context.Background()); err != nil { + t.Fatalf("Barrier failed: %v", err) + } + + clientSnapshot, err := GetClientDiagnosticsSnapshot(client) + if err != nil { + t.Fatalf("GetClientDiagnosticsSnapshot failed: %v", err) + } + if got, want := len(clientSnapshot.Records), 1; got != want { + t.Fatalf("client record snapshot count = %d, want %d", got, want) + } + if got, want := clientSnapshot.Summary.RecordCount, 1; got != want { + t.Fatalf("client RecordCount = %d, want %d", got, want) + } + if got, want := clientSnapshot.Summary.ActiveRecordCount, 1; got != want { + t.Fatalf("client ActiveRecordCount = %d, want %d", got, want) + } + clientRecord := clientSnapshot.Records[0] + if got := clientRecord.BatchFramesSent; got < 1 { + t.Fatalf("client BatchFramesSent = %d, want >= 1", got) + } + if got := clientRecord.AckFramesReceived; got < 1 { + t.Fatalf("client AckFramesReceived = %d, want >= 1", got) + } + if got := clientRecord.BarrierCount; got < 1 { + t.Fatalf("client BarrierCount = %d, want >= 1", got) + } + if got := clientSnapshot.Summary.RecordTelemetry.FrameSendCount; got < 1 { + t.Fatalf("client RecordTelemetry.FrameSendCount = %d, want >= 1", got) + } + + serverSnapshot, err := GetServerDiagnosticsSnapshot(server) + if err != nil { + t.Fatalf("GetServerDiagnosticsSnapshot failed: %v", err) + } + if got, want := len(serverSnapshot.Records), 1; got != want { + t.Fatalf("server record snapshot count = %d, want %d", got, want) + } + if got, want := serverSnapshot.Summary.RecordCount, 1; got != want { + t.Fatalf("server RecordCount = %d, want %d", got, want) + } + if got, want := serverSnapshot.Summary.ActiveRecordCount, 1; got != want { + t.Fatalf("server ActiveRecordCount = %d, want %d", got, want) + } + serverRecord := serverSnapshot.Records[0] + if got := serverRecord.BatchFramesReceived; got < 1 { + t.Fatalf("server BatchFramesReceived = %d, want >= 1", got) + } + if got := serverRecord.AckFramesSent; got < 1 { + t.Fatalf("server AckFramesSent = %d, want >= 1", got) + } + if got := serverSnapshot.Summary.RecordTelemetry.FrameReceiveCount; got < 1 { + t.Fatalf("server RecordTelemetry.FrameReceiveCount = %d, want >= 1", got) + } + + close(recordReleaseCh) + select { + case err := <-recordHandlerDone: + if err != nil { + t.Fatalf("record handler failed: %v", err) + } + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting record handler completion") + } + _ = record.Close() +} + func TestGetServerDiagnosticsSnapshotAggregatesStaleAndResetState(t *testing.T) { server := NewServer().(*ServerCommon) @@ -323,6 +454,127 @@ func TestDiagnosticsSummaryClassifiesResetCauses(t *testing.T) { } } +func TestDiagnosticsSummaryAggregatesRecordTelemetry(t *testing.T) { + summary := summarizeClientDiagnosticsSnapshot(ClientDiagnosticsSnapshot{ + Records: []RecordSnapshot{ + { + ID: "record-active", + BindingCurrent: true, + TransportAttached: true, + TransportCurrent: true, + OutstandingRecords: 3, + OutstandingBytes: 4096, + PendingApplyRecords: 2, + PendingAckRecords: 1, + PeakPendingApplyRecords: 5, + BatchFramesSent: 10, + AckFramesSent: 4, + ErrorFramesSent: 1, + BatchFramesReceived: 8, + AckFramesReceived: 3, + ErrorFramesReceived: 0, + PiggybackAckSent: 6, + PiggybackAckReceived: 2, + BarrierCount: 4, + BarrierFlushWaitDuration: 10 * time.Millisecond, + BarrierApplyWaitDuration: 30 * time.Millisecond, + }, + { + ID: "record-reset", + ResetError: errTransportDetached.Error(), + OutstandingRecords: 1, + OutstandingBytes: 512, + PendingApplyRecords: 3, + PendingAckRecords: 2, + PeakPendingApplyRecords: 7, + BatchFramesSent: 2, + AckFramesSent: 1, + ErrorFramesSent: 1, + BatchFramesReceived: 1, + AckFramesReceived: 1, + ErrorFramesReceived: 1, + PiggybackAckSent: 1, + PiggybackAckReceived: 1, + BarrierCount: 1, + BarrierFlushWaitDuration: 5 * time.Millisecond, + BarrierApplyWaitDuration: 15 * time.Millisecond, + }, + }, + }) + + if got, want := summary.RecordCount, 2; got != want { + t.Fatalf("RecordCount = %d, want %d", got, want) + } + if got, want := summary.ActiveRecordCount, 1; got != want { + t.Fatalf("ActiveRecordCount = %d, want %d", got, want) + } + if got, want := summary.ResetRecordCount, 1; got != want { + t.Fatalf("ResetRecordCount = %d, want %d", got, want) + } + if got, want := summary.RecordResetCauses.Total, 1; got != want { + t.Fatalf("RecordResetCauses.Total = %d, want %d", got, want) + } + if got, want := summary.RecordResetCauses.TransportDetached, 1; got != want { + t.Fatalf("RecordResetCauses.TransportDetached = %d, want %d", got, want) + } + + telemetry := summary.RecordTelemetry + if got, want := telemetry.BatchFramesSent, int64(12); got != want { + t.Fatalf("BatchFramesSent = %d, want %d", got, want) + } + if got, want := telemetry.AckFramesSent, int64(5); got != want { + t.Fatalf("AckFramesSent = %d, want %d", got, want) + } + if got, want := telemetry.ErrorFramesSent, int64(2); got != want { + t.Fatalf("ErrorFramesSent = %d, want %d", got, want) + } + if got, want := telemetry.BatchFramesReceived, int64(9); got != want { + t.Fatalf("BatchFramesReceived = %d, want %d", got, want) + } + if got, want := telemetry.AckFramesReceived, int64(4); got != want { + t.Fatalf("AckFramesReceived = %d, want %d", got, want) + } + if got, want := telemetry.ErrorFramesReceived, int64(1); got != want { + t.Fatalf("ErrorFramesReceived = %d, want %d", got, want) + } + if got, want := telemetry.FrameSendCount, int64(19); got != want { + t.Fatalf("FrameSendCount = %d, want %d", got, want) + } + if got, want := telemetry.FrameReceiveCount, int64(14); got != want { + t.Fatalf("FrameReceiveCount = %d, want %d", got, want) + } + if got, want := telemetry.PiggybackAckSent, int64(7); got != want { + t.Fatalf("PiggybackAckSent = %d, want %d", got, want) + } + if got, want := telemetry.PiggybackAckReceived, int64(3); got != want { + t.Fatalf("PiggybackAckReceived = %d, want %d", got, want) + } + if got, want := telemetry.BarrierCount, int64(5); got != want { + t.Fatalf("BarrierCount = %d, want %d", got, want) + } + if got, want := telemetry.BarrierFlushWaitDuration, 15*time.Millisecond; got != want { + t.Fatalf("BarrierFlushWaitDuration = %v, want %v", got, want) + } + if got, want := telemetry.BarrierApplyWaitDuration, 45*time.Millisecond; got != want { + t.Fatalf("BarrierApplyWaitDuration = %v, want %v", got, want) + } + if got, want := telemetry.OutstandingRecords, 4; got != want { + t.Fatalf("OutstandingRecords = %d, want %d", got, want) + } + if got, want := telemetry.OutstandingBytes, 4608; got != want { + t.Fatalf("OutstandingBytes = %d, want %d", got, want) + } + if got, want := telemetry.PendingApplyRecords, 5; got != want { + t.Fatalf("PendingApplyRecords = %d, want %d", got, want) + } + if got, want := telemetry.PendingAckRecords, 3; got != want { + t.Fatalf("PendingAckRecords = %d, want %d", got, want) + } + if got, want := telemetry.PeakPendingApplyRecords, 7; got != want { + t.Fatalf("PeakPendingApplyRecords = %d, want %d", got, want) + } +} + func TestDiagnosticsSummaryAggregatesTransferTelemetry(t *testing.T) { summary := summarizeClientDiagnosticsSnapshot(ClientDiagnosticsSnapshot{ Transfers: []TransferSnapshot{ diff --git a/record_codec.go b/record_codec.go index 5452bae..9c6bc19 100644 --- a/record_codec.go +++ b/record_codec.go @@ -6,14 +6,16 @@ import ( ) const ( - recordFrameMagic = "NRS1" - recordFrameVersion = 1 - recordFrameTypeBatch uint8 = 1 - recordFrameTypeAck uint8 = 2 - recordFrameTypeError uint8 = 3 - recordFrameHeaderSize = 8 - recordBatchHeaderSize = 10 - recordErrorHeaderSize = 16 + recordFrameMagic = "NRS1" + recordFrameVersionV1 = 1 + recordFrameVersionV2 = 2 + recordFrameTypeBatch uint8 = 1 + recordFrameTypeAck uint8 = 2 + recordFrameTypeError uint8 = 3 + recordFrameHeaderSize = 8 + recordBatchHeaderV1Size = 10 + recordBatchHeaderV2Size = 18 + recordErrorHeaderSize = 16 ) var ( @@ -27,6 +29,7 @@ type recordOutboundMessage struct { } type recordFrame struct { + Version uint8 Type uint8 Batch []recordOutboundMessage AckSeq uint64 @@ -34,7 +37,7 @@ type recordFrame struct { Retryable bool } -func encodeRecordBatchFrame(batch []recordOutboundMessage) ([]byte, error) { +func encodeRecordBatchFrame(batch []recordOutboundMessage, ackSeq uint64, useV2 bool) ([]byte, error) { if len(batch) == 0 { return nil, nil } @@ -42,7 +45,13 @@ func encodeRecordBatchFrame(batch []recordOutboundMessage) ([]byte, error) { if firstSeq == 0 { return nil, errRecordSeqInvalid } - size := recordFrameHeaderSize + recordBatchHeaderSize + version := uint8(recordFrameVersionV1) + batchHeaderSize := recordBatchHeaderV1Size + if useV2 { + version = recordFrameVersionV2 + batchHeaderSize = recordBatchHeaderV2Size + } + size := recordFrameHeaderSize + batchHeaderSize for index, item := range batch { wantSeq := firstSeq + uint64(index) if item.Seq != wantSeq { @@ -52,11 +61,14 @@ func encodeRecordBatchFrame(batch []recordOutboundMessage) ([]byte, error) { } frame := make([]byte, size) copy(frame[:4], recordFrameMagic) - frame[4] = recordFrameVersion + frame[4] = version frame[5] = recordFrameTypeBatch binary.BigEndian.PutUint16(frame[8:10], uint16(len(batch))) binary.BigEndian.PutUint64(frame[10:18], firstSeq) - offset := recordFrameHeaderSize + recordBatchHeaderSize + offset := recordFrameHeaderSize + batchHeaderSize + if useV2 { + binary.BigEndian.PutUint64(frame[18:26], ackSeq) + } for _, item := range batch { binary.BigEndian.PutUint32(frame[offset:offset+4], uint32(len(item.Payload))) offset += 4 @@ -69,7 +81,7 @@ func encodeRecordBatchFrame(batch []recordOutboundMessage) ([]byte, error) { func encodeRecordAckFrame(ackSeq uint64) ([]byte, error) { frame := make([]byte, recordFrameHeaderSize+8) copy(frame[:4], recordFrameMagic) - frame[4] = recordFrameVersion + frame[4] = recordFrameVersionV1 frame[5] = recordFrameTypeAck binary.BigEndian.PutUint64(frame[8:16], ackSeq) return frame, nil @@ -83,7 +95,7 @@ func encodeRecordErrorFrame(failure RecordFailure) ([]byte, error) { msgBytes := []byte(failure.Message) frame := make([]byte, recordFrameHeaderSize+recordErrorHeaderSize+len(codeBytes)+len(msgBytes)) copy(frame[:4], recordFrameMagic) - frame[4] = recordFrameVersion + frame[4] = recordFrameVersionV1 frame[5] = recordFrameTypeError if failure.Retryable { frame[6] = 1 @@ -102,30 +114,62 @@ func decodeRecordFrame(payload []byte) (recordFrame, error) { if len(payload) < recordFrameHeaderSize || string(payload[:4]) != recordFrameMagic { return recordFrame{}, errRecordFrameInvalid } - if payload[4] != recordFrameVersion { - return recordFrame{}, errRecordFrameInvalid - } + version := payload[4] frameType := payload[5] - switch frameType { - case recordFrameTypeBatch: - return decodeRecordBatchFrame(payload) - case recordFrameTypeAck: - if len(payload) != recordFrameHeaderSize+8 { + switch version { + case recordFrameVersionV1: + switch frameType { + case recordFrameTypeBatch: + return decodeRecordBatchFrameV1(payload) + case recordFrameTypeAck: + if len(payload) != recordFrameHeaderSize+8 { + return recordFrame{}, errRecordFrameInvalid + } + return recordFrame{ + Version: recordFrameVersionV1, + Type: recordFrameTypeAck, + AckSeq: binary.BigEndian.Uint64(payload[8:16]), + }, nil + case recordFrameTypeError: + frame, err := decodeRecordErrorFrame(payload) + if err != nil { + return recordFrame{}, err + } + frame.Version = recordFrameVersionV1 + return frame, nil + default: + return recordFrame{}, errRecordFrameInvalid + } + case recordFrameVersionV2: + switch frameType { + case recordFrameTypeBatch: + return decodeRecordBatchFrameV2(payload) + case recordFrameTypeAck: + if len(payload) != recordFrameHeaderSize+8 { + return recordFrame{}, errRecordFrameInvalid + } + return recordFrame{ + Version: recordFrameVersionV2, + Type: recordFrameTypeAck, + AckSeq: binary.BigEndian.Uint64(payload[8:16]), + }, nil + case recordFrameTypeError: + frame, err := decodeRecordErrorFrame(payload) + if err != nil { + return recordFrame{}, err + } + frame.Version = recordFrameVersionV2 + return frame, nil + default: return recordFrame{}, errRecordFrameInvalid } - return recordFrame{ - Type: recordFrameTypeAck, - AckSeq: binary.BigEndian.Uint64(payload[8:16]), - }, nil - case recordFrameTypeError: - return decodeRecordErrorFrame(payload) default: return recordFrame{}, errRecordFrameInvalid } } -func decodeRecordBatchFrame(payload []byte) (recordFrame, error) { - if len(payload) < recordFrameHeaderSize+recordBatchHeaderSize { +func decodeRecordBatchFrameV1(payload []byte) (recordFrame, error) { + if len(payload) < recordFrameHeaderSize+recordBatchHeaderV1Size { return recordFrame{}, errRecordFrameInvalid } count := int(binary.BigEndian.Uint16(payload[8:10])) @@ -133,7 +177,7 @@ func decodeRecordBatchFrame(payload []byte) (recordFrame, error) { if count <= 0 || firstSeq == 0 { return recordFrame{}, errRecordFrameInvalid } - offset := recordFrameHeaderSize + recordBatchHeaderSize + offset := recordFrameHeaderSize + recordBatchHeaderV1Size batch := make([]recordOutboundMessage, 0, count) for index := 0; index < count; index++ { if offset+4 > len(payload) { @@ -155,8 +199,48 @@ func decodeRecordBatchFrame(payload []byte) (recordFrame, error) { return recordFrame{}, errRecordFrameInvalid } return recordFrame{ - Type: recordFrameTypeBatch, - Batch: batch, + Version: recordFrameVersionV1, + Type: recordFrameTypeBatch, + Batch: batch, + }, nil +} + +func decodeRecordBatchFrameV2(payload []byte) (recordFrame, error) { + if len(payload) < recordFrameHeaderSize+recordBatchHeaderV2Size { + return recordFrame{}, errRecordFrameInvalid + } + count := int(binary.BigEndian.Uint16(payload[8:10])) + firstSeq := binary.BigEndian.Uint64(payload[10:18]) + ackSeq := binary.BigEndian.Uint64(payload[18:26]) + if count <= 0 || firstSeq == 0 { + return recordFrame{}, errRecordFrameInvalid + } + offset := recordFrameHeaderSize + recordBatchHeaderV2Size + batch := make([]recordOutboundMessage, 0, count) + for index := 0; index < count; index++ { + if offset+4 > len(payload) { + return recordFrame{}, errRecordFrameInvalid + } + itemLen := int(binary.BigEndian.Uint32(payload[offset : offset+4])) + offset += 4 + if itemLen < 0 || offset+itemLen > len(payload) { + return recordFrame{}, errRecordFrameInvalid + } + item := recordOutboundMessage{ + Seq: firstSeq + uint64(index), + Payload: append([]byte(nil), payload[offset:offset+itemLen]...), + } + offset += itemLen + batch = append(batch, item) + } + if offset != len(payload) { + return recordFrame{}, errRecordFrameInvalid + } + return recordFrame{ + Version: recordFrameVersionV2, + Type: recordFrameTypeBatch, + Batch: batch, + AckSeq: ackSeq, }, nil } diff --git a/record_codec_test.go b/record_codec_test.go new file mode 100644 index 0000000..510bf89 --- /dev/null +++ b/record_codec_test.go @@ -0,0 +1,73 @@ +package notify + +import "testing" + +func TestEncodeDecodeRecordBatchFrameV1(t *testing.T) { + batch := []recordOutboundMessage{ + {Seq: 7, Payload: []byte("alpha")}, + {Seq: 8, Payload: []byte("beta")}, + } + payload, err := encodeRecordBatchFrame(batch, 0, false) + if err != nil { + t.Fatalf("encodeRecordBatchFrame v1 failed: %v", err) + } + frame, err := decodeRecordFrame(payload) + if err != nil { + t.Fatalf("decodeRecordFrame v1 failed: %v", err) + } + if got, want := frame.Version, uint8(recordFrameVersionV1); got != want { + t.Fatalf("frame version = %d, want %d", got, want) + } + if got, want := frame.Type, recordFrameTypeBatch; got != want { + t.Fatalf("frame type = %d, want %d", got, want) + } + if frame.AckSeq != 0 { + t.Fatalf("frame ack seq = %d, want 0", frame.AckSeq) + } + if got, want := len(frame.Batch), len(batch); got != want { + t.Fatalf("batch len = %d, want %d", got, want) + } + for i := range batch { + if got, want := frame.Batch[i].Seq, batch[i].Seq; got != want { + t.Fatalf("batch[%d].seq = %d, want %d", i, got, want) + } + if got, want := string(frame.Batch[i].Payload), string(batch[i].Payload); got != want { + t.Fatalf("batch[%d].payload = %q, want %q", i, got, want) + } + } +} + +func TestEncodeDecodeRecordBatchFrameV2CarriesAckSeq(t *testing.T) { + batch := []recordOutboundMessage{ + {Seq: 11, Payload: []byte("alpha")}, + {Seq: 12, Payload: []byte("beta")}, + } + payload, err := encodeRecordBatchFrame(batch, 9, true) + if err != nil { + t.Fatalf("encodeRecordBatchFrame v2 failed: %v", err) + } + frame, err := decodeRecordFrame(payload) + if err != nil { + t.Fatalf("decodeRecordFrame v2 failed: %v", err) + } + if got, want := frame.Version, uint8(recordFrameVersionV2); got != want { + t.Fatalf("frame version = %d, want %d", got, want) + } + if got, want := frame.Type, recordFrameTypeBatch; got != want { + t.Fatalf("frame type = %d, want %d", got, want) + } + if got, want := frame.AckSeq, uint64(9); got != want { + t.Fatalf("frame ack seq = %d, want %d", got, want) + } + if got, want := len(frame.Batch), len(batch); got != want { + t.Fatalf("batch len = %d, want %d", got, want) + } + for i := range batch { + if got, want := frame.Batch[i].Seq, batch[i].Seq; got != want { + t.Fatalf("batch[%d].seq = %d, want %d", i, got, want) + } + if got, want := string(frame.Batch[i].Payload), string(batch[i].Payload); got != want { + t.Fatalf("batch[%d].payload = %q, want %q", i, got, want) + } + } +} diff --git a/record_negotiation.go b/record_negotiation.go new file mode 100644 index 0000000..891f234 --- /dev/null +++ b/record_negotiation.go @@ -0,0 +1,48 @@ +package notify + +const ( + recordStreamMetadataCapBatchAckKey = "_notify.record_cap_batch_ack" + recordStreamMetadataUseBatchAckKey = "_notify.record_use_batch_ack" + recordStreamMetadataEnabledValue = "1" +) + +func advertiseRecordStreamOpenMetadata(metadata StreamMetadata) StreamMetadata { + metadata = cloneStreamMetadata(metadata) + if metadata == nil { + metadata = make(StreamMetadata, 1) + } + metadata[recordStreamMetadataCapBatchAckKey] = recordStreamMetadataEnabledValue + return metadata +} + +func negotiateRecordStreamOpenMetadata(channel StreamChannel, metadata StreamMetadata) (StreamMetadata, StreamMetadata) { + metadata = cloneStreamMetadata(metadata) + if normalizeStreamChannel(channel) != StreamRecordChannel { + return metadata, nil + } + if metadata[recordStreamMetadataCapBatchAckKey] != recordStreamMetadataEnabledValue { + return metadata, nil + } + metadata[recordStreamMetadataUseBatchAckKey] = recordStreamMetadataEnabledValue + return metadata, StreamMetadata{ + recordStreamMetadataUseBatchAckKey: recordStreamMetadataEnabledValue, + } +} + +func mergeStreamMetadata(base StreamMetadata, overlay StreamMetadata) StreamMetadata { + if len(base) == 0 && len(overlay) == 0 { + return nil + } + merged := cloneStreamMetadata(base) + if merged == nil { + merged = make(StreamMetadata, len(overlay)) + } + for key, value := range overlay { + merged[key] = value + } + return merged +} + +func recordStreamUseBatchAck(metadata StreamMetadata) bool { + return metadata[recordStreamMetadataUseBatchAckKey] == recordStreamMetadataEnabledValue +} diff --git a/record_negotiation_test.go b/record_negotiation_test.go new file mode 100644 index 0000000..e747e57 --- /dev/null +++ b/record_negotiation_test.go @@ -0,0 +1,78 @@ +package notify + +import ( + "context" + "net" + "testing" + "time" +) + +func TestNegotiateRecordStreamOpenMetadataEnablesBatchAck(t *testing.T) { + reqMetadata, respMetadata := negotiateRecordStreamOpenMetadata(StreamRecordChannel, StreamMetadata{ + recordStreamMetadataCapBatchAckKey: recordStreamMetadataEnabledValue, + }) + if !recordStreamUseBatchAck(reqMetadata) { + t.Fatal("request metadata should enable batch ack") + } + if !recordStreamUseBatchAck(respMetadata) { + t.Fatal("response metadata should enable batch ack") + } +} + +func TestNegotiateRecordStreamOpenMetadataKeepsFallbackWithoutCapability(t *testing.T) { + reqMetadata, respMetadata := negotiateRecordStreamOpenMetadata(StreamRecordChannel, nil) + if recordStreamUseBatchAck(reqMetadata) { + t.Fatalf("request metadata should keep fallback mode: %+v", reqMetadata) + } + if recordStreamUseBatchAck(respMetadata) { + t.Fatalf("response metadata should keep fallback mode: %+v", respMetadata) + } +} + +func TestOpenRecordStreamNegotiatesBatchAck(t *testing.T) { + server := NewServer().(*ServerCommon) + secret := []byte("0123456789abcdef0123456789abcdef") + server = newRunningPeerAttachServerForTest(t, func(server *ServerCommon) { + server.SetSecretKey(secret) + }) + + acceptedCh := make(chan RecordAcceptInfo, 1) + server.SetRecordStreamHandler(func(info RecordAcceptInfo) error { + acceptedCh <- info + return nil + }) + + client := NewClient().(*ClientCommon) + client.SetSecretKey(secret) + left, right := net.Pipe() + defer right.Close() + bootstrapPeerAttachConnForTest(t, server, right) + if err := client.ConnectByConn(left); err != nil { + t.Fatalf("client ConnectByConn failed: %v", err) + } + defer func() { + client.setByeFromServer(true) + _ = client.Stop() + }() + + record, err := client.OpenRecordStream(context.Background(), RecordOpenOptions{}) + if err != nil { + t.Fatalf("OpenRecordStream failed: %v", err) + } + defer func() { + _ = record.Close() + }() + + if !recordStreamUseBatchAck(record.Metadata()) { + t.Fatalf("client record stream metadata should negotiate batch ack: %+v", record.Metadata()) + } + + select { + case accepted := <-acceptedCh: + if !recordStreamUseBatchAck(accepted.Metadata) { + t.Fatalf("accepted record metadata should negotiate batch ack: %+v", accepted.Metadata) + } + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting accepted record stream") + } +} diff --git a/record_runtime.go b/record_runtime.go index 080823f..e4b43d3 100644 --- a/record_runtime.go +++ b/record_runtime.go @@ -1,14 +1,20 @@ package notify -import "sync" +import ( + "strconv" + "sync" +) type recordRuntime struct { mu sync.RWMutex handler func(RecordAcceptInfo) error + records map[string]*recordStream } func newRecordRuntime() *recordRuntime { - return &recordRuntime{} + return &recordRuntime{ + records: make(map[string]*recordStream), + } } func (r *recordRuntime) setHandler(fn func(RecordAcceptInfo) error) { @@ -42,3 +48,120 @@ func (s *ServerCommon) getRecordRuntime() *recordRuntime { } return s.recordRuntime } + +func (r *recordRuntime) register(record *recordStream) { + if r == nil || record == nil { + return + } + key := record.runtimeRegistryKey() + if key == "" { + return + } + r.mu.Lock() + r.records[key] = record + r.mu.Unlock() +} + +func (r *recordRuntime) remove(key string) { + if r == nil || key == "" { + return + } + r.mu.Lock() + delete(r.records, key) + r.mu.Unlock() +} + +func (r *recordRuntime) snapshots() []RecordSnapshot { + if r == nil { + return nil + } + r.mu.RLock() + records := make([]*recordStream, 0, len(r.records)) + for _, record := range r.records { + if record == nil { + continue + } + records = append(records, record) + } + r.mu.RUnlock() + snapshots := make([]RecordSnapshot, 0, len(records)) + for _, record := range records { + snapshots = append(snapshots, record.snapshot()) + } + sortRecordSnapshots(snapshots) + return snapshots +} + +func bindRecordRuntime(record RecordStream, runtime *recordRuntime) { + if runtime == nil || record == nil { + return + } + rs, ok := record.(*recordStream) + if !ok { + return + } + rs.bindRuntime(runtime) +} + +func (r *recordStream) bindRuntime(runtime *recordRuntime) { + if r == nil || runtime == nil { + return + } + key := r.runtimeRegistryKey() + if key == "" { + return + } + r.mu.Lock() + r.runtime = runtime + r.runtimeKey = key + r.mu.Unlock() + runtime.register(r) + r.runtimeWatchOnce.Do(func() { + go func() { + streamCtx := r.stream.Context() + if streamCtx == nil { + <-r.ctx.Done() + } else { + select { + case <-r.ctx.Done(): + case <-streamCtx.Done(): + } + } + r.detachRuntime() + }() + }) +} + +func (r *recordStream) detachRuntime() { + if r == nil { + return + } + r.runtimeDetachOnce.Do(func() { + r.mu.Lock() + runtime := r.runtime + key := r.runtimeKey + r.runtime = nil + r.runtimeKey = "" + r.mu.Unlock() + if runtime != nil { + runtime.remove(key) + } + }) +} + +func (r *recordStream) runtimeRegistryKey() string { + if r == nil || r.stream == nil { + return "" + } + scope := "" + dataID := uint64(0) + if stream, ok := r.stream.(*streamHandle); ok { + scope = normalizeFileScope(stream.runtimeScope) + dataID = stream.dataID + } + key := scope + "\x00" + r.stream.ID() + if dataID != 0 { + key += "\x01" + strconv.FormatUint(dataID, 10) + } + return key +} diff --git a/record_snapshot.go b/record_snapshot.go new file mode 100644 index 0000000..14e67ea --- /dev/null +++ b/record_snapshot.go @@ -0,0 +1,252 @@ +package notify + +import ( + "errors" + "io" + "sort" + "time" +) + +type RecordSnapshot struct { + ID string + DataID uint64 + Scope string + Metadata StreamMetadata + UseBatchAck bool + BindingOwner string + BindingAlive bool + BindingCurrent bool + BindingReason string + BindingError string + SessionEpoch uint64 + LogicalClientID string + LocalAddress string + RemoteAddress string + TransportGeneration uint64 + TransportAttached bool + TransportHasRuntimeConn bool + TransportCurrent bool + TransportDetachReason string + TransportDetachKind string + TransportDetachGeneration uint64 + TransportDetachError string + TransportDetachedAt time.Time + ReattachEligible bool + LocalClosed bool + LocalReadClosed bool + RemoteClosed bool + PeerReadClosed bool + OutboundClosed bool + NextOutboundSeq uint64 + EnqueuedOutboundSeq uint64 + FlushedOutboundSeq uint64 + AckedOutboundSeq uint64 + OutstandingRecords int + OutstandingBytes int + InboundReceivedSeq uint64 + InboundAppliedSeq uint64 + InboundAckSentSeq uint64 + PendingApplyRecords int + PendingAckRecords int + PeakPendingApplyRecords int + BatchFramesSent int64 + AckFramesSent int64 + ErrorFramesSent int64 + BatchFramesReceived int64 + AckFramesReceived int64 + ErrorFramesReceived int64 + PiggybackAckSent int64 + PiggybackAckReceived int64 + BarrierCount int64 + BarrierFlushWaitDuration time.Duration + BarrierApplyWaitDuration time.Duration + OpenedAt time.Time + LastReadAt time.Time + LastWriteAt time.Time + StreamResetError string + ReadError string + TerminalError string + ResetError string +} + +type clientRecordSnapshotReader interface { + clientRecordSnapshots() []RecordSnapshot +} + +type serverRecordSnapshotReader interface { + serverRecordSnapshots() []RecordSnapshot +} + +var ( + errClientRecordSnapshotNil = errors.New("client record snapshot target is nil") + errServerRecordSnapshotNil = errors.New("server record snapshot target is nil") + errClientRecordSnapshotUnsupported = errors.New("client record snapshot target type is unsupported") + errServerRecordSnapshotUnsupported = errors.New("server record snapshot target type is unsupported") +) + +func GetClientRecordSnapshots(c Client) ([]RecordSnapshot, error) { + if c == nil { + return nil, errClientRecordSnapshotNil + } + reader, ok := any(c).(clientRecordSnapshotReader) + if !ok { + return nil, errClientRecordSnapshotUnsupported + } + return reader.clientRecordSnapshots(), nil +} + +func GetServerRecordSnapshots(s Server) ([]RecordSnapshot, error) { + if s == nil { + return nil, errServerRecordSnapshotNil + } + reader, ok := any(s).(serverRecordSnapshotReader) + if !ok { + return nil, errServerRecordSnapshotUnsupported + } + return reader.serverRecordSnapshots(), nil +} + +func (c *ClientCommon) clientRecordSnapshots() []RecordSnapshot { + return recordSnapshotsFromRuntime(c.getRecordRuntime()) +} + +func (s *ServerCommon) serverRecordSnapshots() []RecordSnapshot { + return recordSnapshotsFromRuntime(s.getRecordRuntime()) +} + +func recordSnapshotsFromRuntime(runtime *recordRuntime) []RecordSnapshot { + if runtime == nil { + return nil + } + return runtime.snapshots() +} + +func sortRecordSnapshots(src []RecordSnapshot) { + sort.Slice(src, func(i, j int) bool { + if src[i].Scope != src[j].Scope { + return src[i].Scope < src[j].Scope + } + if src[i].ID != src[j].ID { + return src[i].ID < src[j].ID + } + if src[i].DataID != src[j].DataID { + return src[i].DataID < src[j].DataID + } + return src[i].TransportGeneration < src[j].TransportGeneration + }) +} + +func (r *recordStream) snapshot() RecordSnapshot { + if r == nil { + return RecordSnapshot{} + } + snapshot := RecordSnapshot{} + if stream, ok := r.stream.(*streamHandle); ok { + snapshot = recordSnapshotFromStreamSnapshot(stream.snapshot()) + } else if r.stream != nil { + snapshot.ID = r.stream.ID() + snapshot.Metadata = cloneStreamMetadata(r.stream.Metadata()) + snapshot.TransportGeneration = r.stream.TransportGeneration() + if addr := r.stream.LocalAddr(); addr != nil { + snapshot.LocalAddress = addr.String() + } + if addr := r.stream.RemoteAddr(); addr != nil { + snapshot.RemoteAddress = addr.String() + } + if logical := r.stream.LogicalConn(); logical != nil { + snapshot.LogicalClientID = logical.ID() + } + } + snapshot.UseBatchAck = r.useBatchAck + snapshot.BatchFramesSent = r.obs.batchFramesSent.Load() + snapshot.AckFramesSent = r.obs.ackFramesSent.Load() + snapshot.ErrorFramesSent = r.obs.errorFramesSent.Load() + snapshot.BatchFramesReceived = r.obs.batchFramesReceived.Load() + snapshot.AckFramesReceived = r.obs.ackFramesReceived.Load() + snapshot.ErrorFramesReceived = r.obs.errorFramesReceived.Load() + snapshot.PiggybackAckSent = r.obs.piggybackAckSent.Load() + snapshot.PiggybackAckReceived = r.obs.piggybackAckReceived.Load() + snapshot.BarrierCount = r.obs.barrierCount.Load() + snapshot.BarrierFlushWaitDuration = time.Duration(r.obs.barrierFlushWaitNanos.Load()) + snapshot.BarrierApplyWaitDuration = time.Duration(r.obs.barrierApplyWaitNanos.Load()) + + r.mu.Lock() + snapshot.OutboundClosed = r.outboundClosed + snapshot.NextOutboundSeq = r.nextOutboundSeq + snapshot.EnqueuedOutboundSeq = r.enqueuedOutboundSeq + snapshot.FlushedOutboundSeq = r.flushedOutboundSeq + snapshot.AckedOutboundSeq = r.ackedOutboundSeq + snapshot.OutstandingRecords = r.outstandingRecords + snapshot.OutstandingBytes = r.outstandingBytes + snapshot.InboundReceivedSeq = r.inboundReceivedSeq + snapshot.InboundAppliedSeq = r.inboundAppliedSeq + snapshot.InboundAckSentSeq = r.inboundAckSentSeq + snapshot.PendingApplyRecords = recordPendingCount(r.inboundReceivedSeq, r.inboundAppliedSeq) + snapshot.PendingAckRecords = recordPendingCount(r.inboundAppliedSeq, r.inboundAckSentSeq) + snapshot.PeakPendingApplyRecords = r.maxPendingApply + if r.readErr != nil && !errors.Is(r.readErr, io.EOF) { + snapshot.ReadError = r.readErr.Error() + } + if r.terminalErr != nil { + snapshot.TerminalError = r.terminalErr.Error() + } + r.mu.Unlock() + + switch { + case snapshot.TerminalError != "": + snapshot.ResetError = snapshot.TerminalError + case snapshot.StreamResetError != "": + snapshot.ResetError = snapshot.StreamResetError + case snapshot.ReadError != "": + snapshot.ResetError = snapshot.ReadError + } + return snapshot +} + +func recordSnapshotFromStreamSnapshot(stream StreamSnapshot) RecordSnapshot { + return RecordSnapshot{ + ID: stream.ID, + DataID: stream.DataID, + Scope: stream.Scope, + Metadata: cloneStreamMetadata(stream.Metadata), + BindingOwner: stream.BindingOwner, + BindingAlive: stream.BindingAlive, + BindingCurrent: stream.BindingCurrent, + BindingReason: stream.BindingReason, + BindingError: stream.BindingError, + SessionEpoch: stream.SessionEpoch, + LogicalClientID: stream.LogicalClientID, + LocalAddress: stream.LocalAddress, + RemoteAddress: stream.RemoteAddress, + TransportGeneration: stream.TransportGeneration, + TransportAttached: stream.TransportAttached, + TransportHasRuntimeConn: stream.TransportHasRuntimeConn, + TransportCurrent: stream.TransportCurrent, + TransportDetachReason: stream.TransportDetachReason, + TransportDetachKind: stream.TransportDetachKind, + TransportDetachGeneration: stream.TransportDetachGeneration, + TransportDetachError: stream.TransportDetachError, + TransportDetachedAt: stream.TransportDetachedAt, + ReattachEligible: stream.ReattachEligible, + LocalClosed: stream.LocalClosed, + LocalReadClosed: stream.LocalReadClosed, + RemoteClosed: stream.RemoteClosed, + PeerReadClosed: stream.PeerReadClosed, + OpenedAt: stream.OpenedAt, + LastReadAt: stream.LastReadAt, + LastWriteAt: stream.LastWriteAt, + StreamResetError: stream.ResetError, + } +} + +func recordPendingCount(high uint64, low uint64) int { + if high <= low { + return 0 + } + diff := high - low + maxInt := uint64(^uint(0) >> 1) + if diff > maxInt { + return int(maxInt) + } + return int(diff) +} diff --git a/record_stream.go b/record_stream.go index 5bf32d2..ffa9d04 100644 --- a/record_stream.go +++ b/record_stream.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "sync" + "sync/atomic" "time" ) @@ -103,25 +104,47 @@ type recordConfig struct { type recordFlushRequest struct { targetSeq uint64 + forceAck bool done chan error } +type recordObservability struct { + batchFramesSent atomic.Int64 + ackFramesSent atomic.Int64 + errorFramesSent atomic.Int64 + batchFramesReceived atomic.Int64 + ackFramesReceived atomic.Int64 + errorFramesReceived atomic.Int64 + piggybackAckSent atomic.Int64 + piggybackAckReceived atomic.Int64 + barrierCount atomic.Int64 + barrierFlushWaitNanos atomic.Int64 + barrierApplyWaitNanos atomic.Int64 +} + type recordStream struct { - stream Stream - ctx context.Context - cancel context.CancelFunc - cfg recordConfig - writeMu sync.Mutex - sendCh chan recordOutboundMessage - flushCh chan recordFlushRequest - recvCh chan RecordMessage - ackCh chan struct{} - readerCh chan struct{} + stream Stream + ctx context.Context + cancel context.CancelFunc + cfg recordConfig + writeMu sync.Mutex + sendCh chan recordOutboundMessage + flushCh chan recordFlushRequest + recvCh chan RecordMessage + ackCh chan struct{} + readerCh chan struct{} + useBatchAck bool + obs recordObservability mu sync.Mutex stateNotify chan struct{} + runtime *recordRuntime + runtimeKey string + runtimeWatchOnce sync.Once + runtimeDetachOnce sync.Once + nextOutboundSeq uint64 enqueuedOutboundSeq uint64 flushedOutboundSeq uint64 @@ -135,6 +158,7 @@ type recordStream struct { inboundAppliedSeq uint64 inboundApplied map[uint64]struct{} inboundAckSentSeq uint64 + maxPendingApply int remoteClosed bool readErr error @@ -193,6 +217,7 @@ func recordConfigFromOptions(opt RecordOpenOptions) recordConfig { func normalizeRecordStreamOpenOptions(opt StreamOpenOptions) StreamOpenOptions { opt.Channel = StreamRecordChannel + opt.Metadata = advertiseRecordStreamOpenMetadata(opt.Metadata) return opt } @@ -207,22 +232,22 @@ func WrapStreamAsRecord(stream Stream, opt RecordOpenOptions) (RecordStream, err } ctx, cancel := context.WithCancel(parent) record := &recordStream{ - stream: stream, - ctx: ctx, - cancel: cancel, - cfg: recordConfigFromOptions(opt), - sendCh: make(chan recordOutboundMessage, opt.MaxBatchRecords*2), - flushCh: make(chan recordFlushRequest), - recvCh: make(chan RecordMessage, opt.InboundQueueLimit), - ackCh: make(chan struct{}, 1), - readerCh: make(chan struct{}), + stream: stream, + ctx: ctx, + cancel: cancel, + cfg: recordConfigFromOptions(opt), + sendCh: make(chan recordOutboundMessage, opt.MaxBatchRecords*2), + flushCh: make(chan recordFlushRequest), + recvCh: make(chan RecordMessage, opt.InboundQueueLimit), + ackCh: make(chan struct{}, 1), + readerCh: make(chan struct{}), + useBatchAck: recordStreamUseBatchAck(stream.Metadata()), stateNotify: make(chan struct{}), outstandingSizes: make(map[uint64]int), inboundApplied: make(map[uint64]struct{}), } - go record.sendLoop() - go record.ackLoop() + go record.writerLoop() go record.readLoop() return record, nil } @@ -360,13 +385,20 @@ func (r *recordStream) BarrierTo(ctx context.Context, target uint64) (uint64, er if target > current { return 0, errRecordSeqInvalid } - if err := r.Flush(ctx); err != nil { + r.obs.barrierCount.Add(1) + flushStart := time.Now() + err := r.Flush(ctx) + r.obs.barrierFlushWaitNanos.Add(time.Since(flushStart).Nanoseconds()) + if err != nil { return 0, err } if target == 0 { return 0, nil } - if err := r.waitAckedAtLeast(ctx, target); err != nil { + applyStart := time.Now() + err = r.waitAckedAtLeast(ctx, target) + r.obs.barrierApplyWaitNanos.Add(time.Since(applyStart).Nanoseconds()) + if err != nil { return 0, err } return target, nil @@ -520,54 +552,118 @@ func (r *recordStream) waitAckedAtLeast(ctx context.Context, target uint64) erro } } -func (r *recordStream) sendLoop() { +func (r *recordStream) writerLoop() { var ( - batch []recordOutboundMessage - batches int - bytes int - timer *time.Timer - timerCh <-chan time.Time + batch []recordOutboundMessage + batches int + bytes int + batchTimer *time.Timer + batchTimerCh <-chan time.Time + ackTimer *time.Timer + ackTimerCh <-chan time.Time ) - stopTimer := func() { - if timer == nil { + stopBatchTimer := func() { + if batchTimer == nil { return } - if !timer.Stop() { + if !batchTimer.Stop() { select { - case <-timer.C: + case <-batchTimer.C: default: } } - timerCh = nil + batchTimerCh = nil } - flush := func() error { - if len(batch) == 0 { + stopAckTimer := func() { + if ackTimer == nil { + return + } + if !ackTimer.Stop() { + select { + case <-ackTimer.C: + default: + } + } + ackTimerCh = nil + } + scheduleAck := func(hasPendingBatch bool, force bool) (uint64, bool) { + ackSeq := r.pendingAckSeq() + if ackSeq == 0 { + stopAckTimer() + return 0, false + } + if force { + stopAckTimer() + return ackSeq, true + } + if hasPendingBatch && r.useBatchAck { + stopAckTimer() + return 0, false + } + if r.shouldSendAckNow() || r.cfg.AckDelay <= 0 { + stopAckTimer() + return ackSeq, true + } + if ackTimer == nil { + ackTimer = time.NewTimer(r.cfg.AckDelay) + } else { + ackTimer.Reset(r.cfg.AckDelay) + } + ackTimerCh = ackTimer.C + return 0, false + } + sendStandaloneAck := func(ackSeq uint64) error { + if ackSeq == 0 { return nil } - payload, err := encodeRecordBatchFrame(batch) + payload, err := encodeRecordAckFrame(ackSeq) if err != nil { return err } if err := r.writePayloadFrame(payload); err != nil { return err } + r.obs.ackFramesSent.Add(1) + r.markAckSent(ackSeq) + return nil + } + flushBatch := func() error { + if len(batch) == 0 { + return nil + } + ackSeq := r.pendingAckSeq() + payload, err := encodeRecordBatchFrame(batch, ackSeq, r.useBatchAck) + if err != nil { + return err + } + if err := r.writePayloadFrame(payload); err != nil { + return err + } + r.obs.batchFramesSent.Add(1) + if r.useBatchAck && ackSeq != 0 { + r.obs.piggybackAckSent.Add(1) + r.markAckSent(ackSeq) + } r.markFlushed(batch[len(batch)-1].Seq) batch = nil batches = 0 bytes = 0 - stopTimer() + stopBatchTimer() + if ackSeq, sendNow := scheduleAck(false, false); sendNow { + return sendStandaloneAck(ackSeq) + } return nil } flushUntil := func(target uint64) error { for { if target == 0 { - return flush() + return flushBatch() } if r.flushedAtLeast(target) { return nil } if len(batch) > 0 && batch[len(batch)-1].Seq >= target { - if err := flush(); err != nil { + if err := flushBatch(); err != nil { return err } if r.flushedAtLeast(target) { @@ -583,7 +679,7 @@ func (r *recordStream) sendLoop() { batches++ bytes += len(req.Payload) if batches >= r.cfg.MaxBatchRecords || bytes >= r.cfg.MaxBatchBytes { - if err := flush(); err != nil { + if err := flushBatch(); err != nil { return err } } @@ -598,72 +694,54 @@ func (r *recordStream) sendLoop() { batches++ bytes += len(req.Payload) if len(batch) == 1 && r.cfg.MaxBatchDelay > 0 { - if timer == nil { - timer = time.NewTimer(r.cfg.MaxBatchDelay) + if batchTimer == nil { + batchTimer = time.NewTimer(r.cfg.MaxBatchDelay) } else { - timer.Reset(r.cfg.MaxBatchDelay) + batchTimer.Reset(r.cfg.MaxBatchDelay) } - timerCh = timer.C + batchTimerCh = batchTimer.C } if batches >= r.cfg.MaxBatchRecords || bytes >= r.cfg.MaxBatchBytes { - if err := flush(); err != nil { - r.setTerminalError(err) - return - } - } - case req := <-r.flushCh: - req.done <- flushUntil(req.targetSeq) - case <-timerCh: - if err := flush(); err != nil { - r.setTerminalError(err) - return - } - } - } -} - -func (r *recordStream) ackLoop() { - var ( - timer *time.Timer - timerCh <-chan time.Time - ) - stopTimer := func() { - if timer == nil { - return - } - if !timer.Stop() { - select { - case <-timer.C: - default: - } - } - timerCh = nil - } - for { - select { - case <-r.ctx.Done(): - return - case <-r.ackCh: - if r.shouldSendAckNow() { - stopTimer() - if err := r.flushAckNow(); err != nil { + if err := flushBatch(); err != nil { r.setTerminalError(err) return } continue } - if timer == nil { - timer = time.NewTimer(r.cfg.AckDelay) - } else { - timer.Reset(r.cfg.AckDelay) + if ackSeq, sendNow := scheduleAck(len(batch) > 0, false); sendNow { + if err := sendStandaloneAck(ackSeq); err != nil { + r.setTerminalError(err) + return + } } - timerCh = timer.C - case <-timerCh: - stopTimer() - if err := r.flushAckNow(); err != nil { + case req := <-r.flushCh: + err := flushUntil(req.targetSeq) + if err == nil && req.forceAck { + if ackSeq, sendNow := scheduleAck(len(batch) > 0, true); sendNow { + err = sendStandaloneAck(ackSeq) + } + } + req.done <- err + case <-batchTimerCh: + if err := flushBatch(); err != nil { r.setTerminalError(err) return } + case <-r.ackCh: + if ackSeq, sendNow := scheduleAck(len(batch) > 0, false); sendNow { + if err := sendStandaloneAck(ackSeq); err != nil { + r.setTerminalError(err) + return + } + } + case <-ackTimerCh: + stopAckTimer() + if ackSeq, sendNow := scheduleAck(len(batch) > 0, true); sendNow { + if err := sendStandaloneAck(ackSeq); err != nil { + r.setTerminalError(err) + return + } + } } } } @@ -694,6 +772,15 @@ func (r *recordStream) readLoop() { } switch frame.Type { case recordFrameTypeBatch: + r.obs.batchFramesReceived.Add(1) + if frame.AckSeq != 0 { + r.obs.piggybackAckReceived.Add(1) + if err := r.handleAckFrame(frame.AckSeq); err != nil { + r.setReadError(err) + _ = r.stream.Reset(err) + return + } + } if err := r.handleBatchFrame(frame.Batch); err != nil { _ = r.sendFailureFrame(RecordFailure{ FailedSeq: r.nextInboundFailureSeq(), @@ -705,12 +792,14 @@ func (r *recordStream) readLoop() { return } case recordFrameTypeAck: + r.obs.ackFramesReceived.Add(1) if err := r.handleAckFrame(frame.AckSeq); err != nil { r.setReadError(err) _ = r.stream.Reset(err) return } case recordFrameTypeError: + r.obs.errorFramesReceived.Add(1) r.setReadError(frame.Failure) return default: @@ -732,6 +821,7 @@ func (r *recordStream) handleBatchFrame(batch []recordOutboundMessage) error { } lastSeq := batch[len(batch)-1].Seq r.inboundReceivedSeq = lastSeq + r.updatePendingApplyLocked() r.signalStateLocked() r.mu.Unlock() for _, item := range batch { @@ -889,23 +979,21 @@ func (r *recordStream) shouldSendAckNow() bool { return r.inboundAppliedSeq > r.inboundAckSentSeq && int(r.inboundAppliedSeq-r.inboundAckSentSeq) >= r.cfg.AckEveryRecords } -func (r *recordStream) flushAckNow() error { +func (r *recordStream) pendingAckSeq() uint64 { if r == nil { - return errRecordStreamNil + return 0 } r.mu.Lock() - ackSeq := r.inboundAppliedSeq - if ackSeq <= r.inboundAckSentSeq { - r.mu.Unlock() - return nil + defer r.mu.Unlock() + if r.inboundAppliedSeq <= r.inboundAckSentSeq { + return 0 } - r.mu.Unlock() - payload, err := encodeRecordAckFrame(ackSeq) - if err != nil { - return err - } - if err := r.writePayloadFrame(payload); err != nil { - return err + return r.inboundAppliedSeq +} + +func (r *recordStream) markAckSent(ackSeq uint64) { + if r == nil || ackSeq == 0 { + return } r.mu.Lock() if ackSeq > r.inboundAckSentSeq { @@ -913,7 +1001,27 @@ func (r *recordStream) flushAckNow() error { r.signalStateLocked() } r.mu.Unlock() - return nil +} + +func (r *recordStream) flushAckNow() error { + if r == nil { + return errRecordStreamNil + } + req := recordFlushRequest{ + forceAck: true, + done: make(chan error, 1), + } + select { + case <-r.ctx.Done(): + return r.streamError() + case r.flushCh <- req: + } + select { + case <-r.ctx.Done(): + return r.streamError() + case err := <-req.done: + return err + } } func (r *recordStream) sendFailureFrame(failure RecordFailure) error { @@ -921,7 +1029,11 @@ func (r *recordStream) sendFailureFrame(failure RecordFailure) error { if err != nil { return err } - return r.writePayloadFrame(payload) + if err := r.writePayloadFrame(payload); err != nil { + return err + } + r.obs.errorFramesSent.Add(1) + return nil } func (r *recordStream) writePayloadFrame(payload []byte) error { @@ -972,3 +1084,13 @@ func (r *recordStream) signalStateLocked() { close(r.stateNotify) r.stateNotify = make(chan struct{}) } + +func (r *recordStream) updatePendingApplyLocked() { + if r == nil { + return + } + pending := recordPendingCount(r.inboundReceivedSeq, r.inboundAppliedSeq) + if pending > r.maxPendingApply { + r.maxPendingApply = pending + } +} diff --git a/server_record.go b/server_record.go index 42a8485..677ad77 100644 --- a/server_record.go +++ b/server_record.go @@ -24,6 +24,7 @@ func (s *ServerCommon) OpenRecordStreamLogical(ctx context.Context, logical *Log _ = stream.Reset(err) return nil, err } + bindRecordRuntime(record, s.getRecordRuntime()) return record, nil } @@ -41,6 +42,7 @@ func (s *ServerCommon) OpenRecordStreamTransport(ctx context.Context, transport _ = stream.Reset(err) return nil, err } + bindRecordRuntime(record, s.getRecordRuntime()) return record, nil } @@ -68,6 +70,7 @@ func (s *ServerCommon) claimInboundRecordStream(logical *LogicalConn, transport if err != nil { return true, err } + bindRecordRuntime(record, runtime) info := RecordAcceptInfo{ ID: stream.ID(), Metadata: stream.Metadata(), diff --git a/server_stream.go b/server_stream.go index a9f254f..68ac9c9 100644 --- a/server_stream.go +++ b/server_stream.go @@ -33,6 +33,7 @@ func (s *ServerCommon) OpenStreamLogical(ctx context.Context, logical *LogicalCo if resp.DataID != 0 { req.DataID = resp.DataID } + req.Metadata = mergeStreamMetadata(req.Metadata, resp.Metadata) transport := logical.CurrentTransportConn() stream := newStreamHandle(logical.stopContextSnapshot(), runtime, scope, req, 0, logical, transport, resp.TransportGeneration, serverStreamCloseSender(s, logical, nil), serverStreamResetSender(s, logical, nil), serverStreamDataSender(s, transport), runtime.configSnapshot()) if err := runtime.register(scope, stream); err != nil { @@ -72,6 +73,7 @@ func (s *ServerCommon) OpenStreamTransport(ctx context.Context, transport *Trans if resp.DataID != 0 { req.DataID = resp.DataID } + req.Metadata = mergeStreamMetadata(req.Metadata, resp.Metadata) stream := newStreamHandle(logical.stopContextSnapshot(), runtime, scope, req, 0, logical, transport, resp.TransportGeneration, serverStreamCloseSender(s, logical, transport), serverStreamResetSender(s, logical, transport), serverStreamDataSender(s, transport), runtime.configSnapshot()) if err := runtime.register(scope, stream); err != nil { _, _ = sendStreamResetServerTransport(context.Background(), s, transport, StreamResetRequest{ diff --git a/stream_control.go b/stream_control.go index 31c1fcb..2002bfa 100644 --- a/stream_control.go +++ b/stream_control.go @@ -20,6 +20,7 @@ type StreamOpenResponse struct { DataID uint64 Accepted bool TransportGeneration uint64 + Metadata StreamMetadata Error string } @@ -95,6 +96,7 @@ func (c *ClientCommon) handleInboundStreamOpen(msg *Message) { req.DataID = runtime.nextDataID() resp.DataID = req.DataID } + req.Metadata, resp.Metadata = negotiateRecordStreamOpenMetadata(req.Channel, req.Metadata) stream := newStreamHandle(c.clientStopContextSnapshot(), runtime, scope, req, c.currentClientSessionEpoch(), nil, nil, 0, clientStreamCloseSender(c), clientStreamResetSender(c), clientStreamDataSender(c, c.currentClientSessionEpoch()), runtime.configSnapshot()) stream.setClientSnapshotOwner(c) stream.setAddrSnapshot(c.clientStreamAddrSnapshot()) @@ -180,6 +182,7 @@ func (s *ServerCommon) handleInboundStreamOpen(msg *Message) { req.DataID = runtime.nextDataID() resp.DataID = req.DataID } + req.Metadata, resp.Metadata = negotiateRecordStreamOpenMetadata(req.Channel, req.Metadata) stream := newStreamHandle(logical.stopContextSnapshot(), runtime, scope, req, 0, logical, transport, streamTransportGeneration(logical, transport), serverStreamCloseSender(s, logical, transport), serverStreamResetSender(s, logical, transport), serverStreamDataSender(s, transport), runtime.configSnapshot()) if err := runtime.register(scope, stream); err != nil { resp.Error = err.Error() diff --git a/transport_conn.go b/transport_conn.go index 1b68c86..60c6edc 100644 --- a/transport_conn.go +++ b/transport_conn.go @@ -245,6 +245,10 @@ func (t *TransportConn) runtimeSnapshot() TransportConnRuntimeSnapshot { snapshot.TransportDetachError = diag.TransportDetachError snapshot.TransportDetachedAt = diag.TransportDetachedAt snapshot.ReattachEligible = diag.ReattachEligible + if snapshot.LogicalAlive && snapshot.TransportDetachReason != "" && !snapshot.Current { + snapshot.LogicalReason = "" + snapshot.LogicalError = "" + } } return snapshot }