package notify

import (
	"context"
	"strconv"
	"sync"
	"time"

	itransfer "b612.me/notify/internal/transfer"
)

// Reserved metadata keys that this file writes into a transfer's metadata map
// to carry the public transfer ID, the public and runtime scopes, the
// transport generation and the send configuration.
const (
	transferMetadataIDKey                  = "_notify.transfer_id"
	transferMetadataScopeKey               = "_notify.transfer_scope"
	transferMetadataRuntimeScopeKey        = "_notify.transfer_runtime_scope"
	transferMetadataTransportGenerationKey = "_notify.transfer_transport_generation"
	transferMetadataSendChunkSizeKey       = "_notify.transfer_send_chunk_size"
	transferMetadataSendParallelismKey     = "_notify.transfer_send_parallelism"
	transferMetadataSendMaxInflightKey     = "_notify.transfer_send_max_inflight_bytes"
)

// transferRuntime couples an itransfer.Manager with an optional
// TransferResumeStore. Every mutating method forwards to the manager and then
// hands the snapshot it got back to persistSnapshot.
type transferRuntime struct {
	manager *itransfer.Manager

	// mu guards store.
	mu    sync.RWMutex
	store TransferResumeStore
}

// newTransferRuntime returns a runtime backed by a fresh manager and no
// resume store.
func newTransferRuntime() *transferRuntime {
	rt := new(transferRuntime)
	rt.manager = itransfer.NewManager()
	return rt
}

// tracksTransfer reports whether the runtime has a manager and transferID is
// non-empty. It is safe to call on a nil receiver.
func (r *transferRuntime) tracksTransfer(transferID string) bool {
	return r != nil && r.manager != nil && transferID != ""
}

// snapshots returns the manager's snapshots, or nil when the runtime or its
// manager is nil.
func (r *transferRuntime) snapshots() []itransfer.Snapshot {
	if r != nil && r.manager != nil {
		return r.manager.Snapshots()
	}
	return nil
}

// snapshot looks up the manager snapshot stored under the key derived from
// direction, scope and transferID. It reports false when the runtime is
// unusable or transferID is empty.
func (r *transferRuntime) snapshot(direction fileTransferDirection, scope string, transferID string) (itransfer.Snapshot, bool) {
	if !r.tracksTransfer(transferID) {
		return itransfer.Snapshot{}, false
	}
	key := r.key(direction, scope, transferID)
	return r.manager.Snapshot(key)
}

// ensureTransferDescriptor registers desc with the manager unless a snapshot
// already exists under its runtime key. The descriptor ID is replaced by the
// runtime key and its metadata is extended with the public ID, both scopes and
// the transport generation before the transfer is started as incoming
// (receive direction) or outgoing (any other direction).
func (r *transferRuntime) ensureTransferDescriptor(direction fileTransferDirection, runtimeScope string, publicScope string, transportGeneration uint64, desc itransfer.Descriptor) {
	if !r.tracksTransfer(desc.ID) {
		return
	}
	publicID := desc.ID
	key := r.key(direction, runtimeScope, publicID)
	if _, exists := r.manager.Snapshot(key); exists {
		return
	}
	desc.ID = key
	desc.Metadata = transferRuntimeMetadataWithScope(runtimeScope, publicScope, transportGeneration, desc.Metadata, publicID)

	// The second result of the start call is dropped; persistSnapshot skips
	// snapshots whose ID is empty.
	var started itransfer.Snapshot
	if direction == fileTransferDirectionReceive {
		started, _ = r.manager.StartIncoming(desc)
	} else {
		started, _ = r.manager.StartOutgoing(desc)
	}
	r.persistSnapshot(started)
}

// activate forwards to Manager.Activate for the keyed transfer and persists
// the returned snapshot.
func (r *transferRuntime) activate(direction fileTransferDirection, scope string, transferID string) {
	if !r.tracksTransfer(transferID) {
		return
	}
	key := r.key(direction, scope, transferID)
	snap, _ := r.manager.Activate(key)
	r.persistSnapshot(snap)
}

// beginCommit forwards to Manager.BeginCommit for the keyed transfer and
// persists the returned snapshot.
func (r *transferRuntime) beginCommit(direction fileTransferDirection, scope string, transferID string) {
	if !r.tracksTransfer(transferID) {
		return
	}
	key := r.key(direction, scope, transferID)
	snap, _ := r.manager.BeginCommit(key)
	r.persistSnapshot(snap)
}

// beginVerify forwards to Manager.BeginVerify for the keyed transfer and
// persists the returned snapshot.
func (r *transferRuntime) beginVerify(direction fileTransferDirection, scope string, transferID string) {
	if !r.tracksTransfer(transferID) {
		return
	}
	key := r.key(direction, scope, transferID)
	snap, _ := r.manager.BeginVerify(key)
	r.persistSnapshot(snap)
}

// complete forwards to Manager.Complete for the keyed transfer and persists
// the returned snapshot.
func (r *transferRuntime) complete(direction fileTransferDirection, scope string, transferID string) {
	if !r.tracksTransfer(transferID) {
		return
	}
	key := r.key(direction, scope, transferID)
	snap, _ := r.manager.Complete(key)
	r.persistSnapshot(snap)
}

// abort forwards err to Manager.Abort for the keyed transfer and persists the
// returned snapshot.
func (r *transferRuntime) abort(direction fileTransferDirection, scope string, transferID string, err error) {
	if !r.tracksTransfer(transferID) {
		return
	}
	key := r.key(direction, scope, transferID)
	snap, _ := r.manager.Abort(key, err)
	r.persistSnapshot(snap)
}

// fail forwards err to Manager.Fail for the keyed transfer and persists the
// returned snapshot.
func (r *transferRuntime) fail(direction fileTransferDirection, scope string, transferID string, err error) {
	if !r.tracksTransfer(transferID) {
		return
	}
	key := r.key(direction, scope, transferID)
	snap, _ := r.manager.Fail(key, err)
	r.persistSnapshot(snap)
}

// resume forwards confirmedBytes to Manager.Resume for the keyed transfer and
// persists the returned snapshot.
func (r *transferRuntime) resume(direction fileTransferDirection, scope string, transferID string, confirmedBytes int64) {
	if !r.tracksTransfer(transferID) {
		return
	}
	key := r.key(direction, scope, transferID)
	snap, _ := r.manager.Resume(key, confirmedBytes)
	r.persistSnapshot(snap)
}

// recordSend forwards sentBytes to Manager.RecordSend for the keyed transfer
// and persists the returned snapshot.
func (r *transferRuntime) recordSend(direction fileTransferDirection, scope string, transferID string, sentBytes int64) {
	if !r.tracksTransfer(transferID) {
		return
	}
	key := r.key(direction, scope, transferID)
	snap, _ := r.manager.RecordSend(key, sentBytes)
	r.persistSnapshot(snap)
}

// setAckedBytes forwards ackedBytes to Manager.SetAckedBytes for the keyed
// transfer and persists the returned snapshot.
func (r *transferRuntime) setAckedBytes(direction fileTransferDirection, scope string, transferID string, ackedBytes int64) {
	if !r.tracksTransfer(transferID) {
		return
	}
	key := r.key(direction, scope, transferID)
	snap, _ := r.manager.SetAckedBytes(key, ackedBytes)
	r.persistSnapshot(snap)
}

// recordReceive forwards recvBytes to Manager.RecordReceive for the keyed
// transfer and persists the returned snapshot.
func (r *transferRuntime) recordReceive(direction fileTransferDirection, scope string, transferID string, recvBytes int64) {
	if !r.tracksTransfer(transferID) {
		return
	}
	key := r.key(direction, scope, transferID)
	snap, _ := r.manager.RecordReceive(key, recvBytes)
	r.persistSnapshot(snap)
}

// recordRetry forwards to Manager.RecordRetry for the keyed transfer and
// persists the returned snapshot.
func (r *transferRuntime) recordRetry(direction fileTransferDirection, scope string, transferID string) {
	if !r.tracksTransfer(transferID) {
		return
	}
	key := r.key(direction, scope, transferID)
	snap, _ := r.manager.RecordRetry(key)
	r.persistSnapshot(snap)
}

// recordTimeout forwards to Manager.RecordTimeout for the keyed transfer and
// persists the returned snapshot.
func (r *transferRuntime) recordTimeout(direction fileTransferDirection, scope string, transferID string) {
	if !r.tracksTransfer(transferID) {
		return
	}
	key := r.key(direction, scope, transferID)
	snap, _ := r.manager.RecordTimeout(key)
	r.persistSnapshot(snap)
}

// recordStage forwards stage to Manager.SetStage for the keyed transfer and
// persists the returned snapshot. An empty stage is ignored.
func (r *transferRuntime) recordStage(direction fileTransferDirection, scope string, transferID string, stage string) {
	if !r.tracksTransfer(transferID) || stage == "" {
		return
	}
	key := r.key(direction, scope, transferID)
	snap, _ := r.manager.SetStage(key, stage)
	r.persistSnapshot(snap)
}

// recordFailureStage forwards stage to Manager.SetFailureStage for the keyed
// transfer and persists the returned snapshot. An empty stage is ignored.
func (r *transferRuntime) recordFailureStage(direction fileTransferDirection, scope string, transferID string, stage string) {
	if !r.tracksTransfer(transferID) || stage == "" {
		return
	}
	key := r.key(direction, scope, transferID)
	snap, _ := r.manager.SetFailureStage(key, stage)
	r.persistSnapshot(snap)
}

// recordTelemetry forwards delta to Manager.RecordTelemetry for the keyed
// transfer and persists the returned snapshot.
func (r *transferRuntime) recordTelemetry(direction fileTransferDirection, scope string, transferID string, delta itransfer.TelemetryDelta) {
	if !r.tracksTransfer(transferID) {
		return
	}
	key := r.key(direction, scope, transferID)
	snap, _ := r.manager.RecordTelemetry(key, delta)
	r.persistSnapshot(snap)
}

// recordSourceRead records a telemetry delta of one source read lasting dur.
func (r *transferRuntime) recordSourceRead(direction fileTransferDirection, scope string, transferID string, dur time.Duration) {
	delta := itransfer.TelemetryDelta{SourceReadDuration: dur, SourceReadCount: 1}
	r.recordTelemetry(direction, scope, transferID, delta)
}

// recordStreamWrite records a telemetry delta of one stream write lasting dur.
func (r *transferRuntime) recordStreamWrite(direction fileTransferDirection, scope string, transferID string, dur time.Duration) {
	delta := itransfer.TelemetryDelta{StreamWriteDuration: dur, StreamWriteCount: 1}
	r.recordTelemetry(direction, scope, transferID, delta)
}

// recordSinkWrite records a telemetry delta of one sink write lasting dur.
func (r *transferRuntime) recordSinkWrite(direction fileTransferDirection, scope string, transferID string, dur time.Duration) {
	delta := itransfer.TelemetryDelta{SinkWriteDuration: dur, SinkWriteCount: 1}
	r.recordTelemetry(direction, scope, transferID, delta)
}

// recordSyncDuration records a telemetry delta whose SyncDuration is dur.
func (r *transferRuntime) recordSyncDuration(direction fileTransferDirection, scope string, transferID string, dur time.Duration) {
	delta := itransfer.TelemetryDelta{SyncDuration: dur}
	r.recordTelemetry(direction, scope, transferID, delta)
}

// recordVerifyDuration records a telemetry delta whose VerifyDuration is dur.
func (r *transferRuntime) recordVerifyDuration(direction fileTransferDirection, scope string, transferID string, dur time.Duration) {
	delta := itransfer.TelemetryDelta{VerifyDuration: dur}
	r.recordTelemetry(direction, scope, transferID, delta)
}

// recordCommitDuration records a telemetry delta whose CommitDuration is dur.
func (r *transferRuntime) recordCommitDuration(direction fileTransferDirection, scope string, transferID string, dur time.Duration) {
	delta := itransfer.TelemetryDelta{CommitDuration: dur}
	r.recordTelemetry(direction, scope, transferID, delta)
}

// recordCommitWaitDuration records a telemetry delta whose CommitWaitDuration
// is dur.
func (r *transferRuntime) recordCommitWaitDuration(direction fileTransferDirection, scope string, transferID string, dur time.Duration) {
	delta := itransfer.TelemetryDelta{CommitWaitDuration: dur}
	r.recordTelemetry(direction, scope, transferID, delta)
}
func (r *transferRuntime) recordSendOptions(direction fileTransferDirection, scope string, transferID string, chunkSize int, parallelism int, maxInflightBytes int64) { if r == nil || r.manager == nil || transferID == "" { return } metadata := make(itransfer.Metadata, 3) if chunkSize > 0 { metadata[transferMetadataSendChunkSizeKey] = strconv.Itoa(chunkSize) } if parallelism > 0 { metadata[transferMetadataSendParallelismKey] = strconv.Itoa(parallelism) } if maxInflightBytes > 0 { metadata[transferMetadataSendMaxInflightKey] = strconv.FormatInt(maxInflightBytes, 10) } if len(metadata) == 0 { return } snapshot, _ := r.manager.MergeMetadata(r.key(direction, scope, transferID), metadata) r.persistSnapshot(snapshot) } func (r *transferRuntime) key(direction fileTransferDirection, scope string, transferID string) string { return fileTransferMonitorKey(direction, normalizeFileScope(scope), transferID) } func (r *transferRuntime) setResumeStore(store TransferResumeStore) { if r == nil { return } r.mu.Lock() r.store = store r.mu.Unlock() } func (r *transferRuntime) resumeStoreSnapshot() TransferResumeStore { if r == nil { return nil } r.mu.RLock() defer r.mu.RUnlock() return r.store } func (r *transferRuntime) recover(ctx context.Context) error { store := r.resumeStoreSnapshot() if store == nil { return nil } if ctx == nil { ctx = context.Background() } snapshots, err := store.LoadTransferSnapshots(ctx) if err != nil { return err } for _, snapshot := range snapshots { if transferSnapshotTerminal(snapshot.State) { continue } _, _ = r.manager.Restore(restoreInternalTransferSnapshot(snapshot)) } return nil } func (r *transferRuntime) resumableSnapshot(direction fileTransferDirection, publicScope string, transferID string) (TransferSnapshot, bool) { if r == nil || transferID == "" { return TransferSnapshot{}, false } wantScope := normalizeFileScope(publicScope) var matched TransferSnapshot found := false for _, snapshot := range transferSnapshotsFromRuntime(r) { if snapshot.ID != 
transferID || snapshot.Direction != convertTransferDirectionPublic(direction) { continue } if normalizeFileScope(snapshot.Scope) != wantScope { continue } if transferSnapshotTerminal(snapshot.State) { continue } if !found || snapshot.UpdatedAt.After(matched.UpdatedAt) { matched = snapshot found = true } } return matched, found } func (r *transferRuntime) persistSnapshot(snapshot itransfer.Snapshot) { store := r.resumeStoreSnapshot() if store == nil || snapshot.ID == "" { return } publicSnapshot := convertTransferSnapshot(snapshot) ctx := context.Background() if transferSnapshotTerminal(publicSnapshot.State) { _ = store.DeleteTransferSnapshot(ctx, publicSnapshot) return } _ = store.SaveTransferSnapshot(ctx, publicSnapshot) } func restoreInternalTransferSnapshot(snapshot TransferSnapshot) itransfer.Snapshot { runtimeScope := normalizeFileScope(snapshot.RuntimeScope) publicScope := normalizeFileScope(snapshot.Scope) if runtimeScope == defaultFileScope && publicScope != defaultFileScope { runtimeScope = publicScope } metadata := transferRuntimeMetadataWithScope(runtimeScope, publicScope, snapshot.TransportGeneration, itransfer.Metadata(cloneTransferMetadata(snapshot.Metadata)), snapshot.ID) metadata = transferRuntimeMetadataWithSendConfig(metadata, snapshot.ChunkSize, snapshot.Parallelism, snapshot.MaxInflightBytes) return itransfer.Snapshot{ ID: fileTransferMonitorKey(convertTransferDirectionFile(snapshot.Direction), runtimeScope, snapshot.ID), Direction: convertTransferDirectionInternal(snapshot.Direction), Channel: transferChannelToKernel(snapshot.Channel), State: convertTransferStateInternal(snapshot.State), Stage: snapshot.Stage, LastFailureStage: snapshot.LastFailureStage, Size: snapshot.Size, Checksum: snapshot.Checksum, Metadata: metadata, SentBytes: snapshot.SentBytes, AckedBytes: snapshot.AckedBytes, ReceivedBytes: snapshot.ReceivedBytes, InflightBytes: snapshot.InflightBytes, RetryCount: snapshot.RetryCount, TimeoutCount: snapshot.TimeoutCount, LastError: 
snapshot.LastError, SourceReadDuration: snapshot.SourceReadDuration, StreamWriteDuration: snapshot.StreamWriteDuration, SinkWriteDuration: snapshot.SinkWriteDuration, SyncDuration: snapshot.SyncDuration, VerifyDuration: snapshot.VerifyDuration, CommitDuration: snapshot.CommitDuration, CommitWaitDuration: snapshot.CommitWaitDuration, SourceReadCount: snapshot.SourceReadCount, StreamWriteCount: snapshot.StreamWriteCount, SinkWriteCount: snapshot.SinkWriteCount, StartedAt: snapshot.StartedAt.UnixNano(), UpdatedAt: snapshot.UpdatedAt.UnixNano(), CompletedAt: snapshot.CompletedAt.UnixNano(), } } func convertTransferDirectionInternal(direction TransferDirection) itransfer.Direction { if direction == TransferDirectionReceive { return itransfer.DirectionReceive } return itransfer.DirectionSend } func convertTransferDirectionFile(direction TransferDirection) fileTransferDirection { if direction == TransferDirectionReceive { return fileTransferDirectionReceive } return fileTransferDirectionSend } func convertTransferDirectionPublic(direction fileTransferDirection) TransferDirection { if direction == fileTransferDirectionReceive { return TransferDirectionReceive } return TransferDirectionSend } func convertTransferStateInternal(state TransferState) itransfer.State { switch state { case TransferStateNegotiating: return itransfer.StateNegotiating case TransferStatePrepared: return itransfer.StatePrepared case TransferStateActive: return itransfer.StateActive case TransferStatePaused: return itransfer.StatePaused case TransferStateCommitting: return itransfer.StateCommitting case TransferStateVerifying: return itransfer.StateVerifying case TransferStateDone: return itransfer.StateDone case TransferStateAborted: return itransfer.StateAborted case TransferStateFailed: return itransfer.StateFailed default: return itransfer.StateInit } } func transferSnapshotTerminal(state TransferState) bool { switch state { case TransferStateDone, TransferStateAborted, TransferStateFailed: return 
true default: return false } } func transferRuntimeMetadataWithScope(runtimeScope string, publicScope string, transportGeneration uint64, metadata itransfer.Metadata, transferID string) itransfer.Metadata { cloned := cloneTransferRuntimeMetadata(metadata) if cloned == nil { cloned = make(itransfer.Metadata) } runtimeScope = normalizeFileScope(runtimeScope) publicScope = normalizeFileScope(publicScope) if publicScope == defaultFileScope && runtimeScope != defaultFileScope { publicScope = runtimeScope } cloned[transferMetadataIDKey] = transferID cloned[transferMetadataScopeKey] = publicScope cloned[transferMetadataRuntimeScopeKey] = runtimeScope if transportGeneration > 0 { cloned[transferMetadataTransportGenerationKey] = strconv.FormatUint(transportGeneration, 10) } else { delete(cloned, transferMetadataTransportGenerationKey) } return cloned } func transferRuntimeMetadataWithSendConfig(metadata itransfer.Metadata, chunkSize int, parallelism int, maxInflightBytes int64) itransfer.Metadata { cloned := cloneTransferRuntimeMetadata(metadata) if cloned == nil { cloned = make(itransfer.Metadata) } if chunkSize > 0 { cloned[transferMetadataSendChunkSizeKey] = strconv.Itoa(chunkSize) } else { delete(cloned, transferMetadataSendChunkSizeKey) } if parallelism > 0 { cloned[transferMetadataSendParallelismKey] = strconv.Itoa(parallelism) } else { delete(cloned, transferMetadataSendParallelismKey) } if maxInflightBytes > 0 { cloned[transferMetadataSendMaxInflightKey] = strconv.FormatInt(maxInflightBytes, 10) } else { delete(cloned, transferMetadataSendMaxInflightKey) } return cloned } func cloneTransferRuntimeMetadata(src itransfer.Metadata) itransfer.Metadata { if len(src) == 0 { return nil } dst := make(itransfer.Metadata, len(src)) for key, value := range src { dst[key] = value } return dst }