notify/file_transfer_snapshot.go
starainrt 09d972c7b7
feat(notify): 重构通信内核并补齐 stream/bulk/record/transfer 能力
- 引入 LogicalConn/TransportConn 分层,ClientConn 保留兼容适配层
  - 新增 Stream、Bulk、RecordStream 三条数据面能力及对应控制路径
  - 完成 transfer/file 传输内核与状态快照、诊断能力
  - 补齐 reconnect、inbound dispatcher、modern psk 等基础模块
  - 增加大规模回归、并发与基准测试覆盖
  - 更新依赖库
2026-04-15 15:24:36 +08:00

191 lines
5.6 KiB
Go

package notify
import (
"sort"
"strconv"
"time"
)
type fileTransferDirection uint8
const (
fileTransferDirectionReceive fileTransferDirection = iota
fileTransferDirectionSend
)
type fileTransferSnapshot struct {
Direction fileTransferDirection
Scope string
RuntimeScope string
TransportGeneration uint64
NetType NetType
Kind EnvelopeKind
FileID string
Path string
Received int64
Total int64
Percent float64
Done bool
Err error
StartedAt time.Time
UpdatedAt time.Time
Duration time.Duration
RateBPS float64
StepDuration time.Duration
InstantRateBPS float64
Time time.Time
Stage string
}
func fileTransferMonitorScope(event FileEvent) string {
if logical := fileEventLogicalConnSnapshot(event); logical != nil {
return serverFileScope(logical)
}
return clientFileScope()
}
func fileTransferRuntimeScope(event FileEvent) string {
if event.TransportConn != nil {
return serverTransportScopeForTransport(event.TransportConn)
}
if logical := fileEventLogicalConnSnapshot(event); logical != nil {
return serverTransportScope(logical)
}
return clientFileScope()
}
func fileTransferTransportGeneration(event FileEvent) uint64 {
if event.TransportConn != nil {
return event.TransportConn.TransportGeneration()
}
logical := fileEventLogicalConnSnapshot(event)
if logical == nil {
return 0
}
return logical.transportGenerationSnapshot()
}
func fileTransferMonitorKey(direction fileTransferDirection, scope string, fileID string) string {
if fileID == "" {
return ""
}
return strconv.Itoa(int(direction)) + "|" + scope + "|" + fileID
}
func fileTransferRuntimeMonitorKey(direction fileTransferDirection, runtimeScope string, fileID string) string {
return fileTransferMonitorKey(direction, normalizeFileScope(runtimeScope), fileID)
}
func fileTransferSnapshotFromEvent(direction fileTransferDirection, event FileEvent) fileTransferSnapshot {
return fileTransferSnapshot{
Direction: direction,
Scope: fileTransferMonitorScope(event),
RuntimeScope: fileTransferRuntimeScope(event),
TransportGeneration: fileTransferTransportGeneration(event),
NetType: event.NetType,
Kind: event.Kind,
FileID: event.Packet.FileID,
Path: event.Path,
Received: event.Received,
Total: event.Total,
Percent: event.Percent,
Done: event.Done,
Err: event.Err,
StartedAt: event.StartedAt,
UpdatedAt: event.UpdatedAt,
Duration: event.Duration,
RateBPS: event.RateBPS,
StepDuration: event.StepDuration,
InstantRateBPS: event.InstantRateBPS,
Time: event.Time,
Stage: event.Packet.Stage,
}
}
func isFileTransferTerminal(kind EnvelopeKind) bool {
return kind == EnvelopeFileEnd || kind == EnvelopeFileAbort
}
func isFileTransferObservable(kind EnvelopeKind) bool {
return kind == EnvelopeFileMeta || kind == EnvelopeFileChunk || kind == EnvelopeFileEnd || kind == EnvelopeFileAbort
}
func sortedFileTransferSnapshots(src map[string]fileTransferSnapshot) []fileTransferSnapshot {
keys := make([]string, 0, len(src))
for key := range src {
keys = append(keys, key)
}
sort.Strings(keys)
out := make([]fileTransferSnapshot, 0, len(keys))
for _, key := range keys {
out = append(out, src[key])
}
return out
}
func latestFileTransferSnapshotsLocked(active map[string]fileTransferSnapshot, completed map[string]fileTransferSnapshot) []fileTransferSnapshot {
merged := make(map[string]fileTransferSnapshot, len(active)+len(completed))
for key, snapshot := range completed {
merged[key] = snapshot
}
for key, snapshot := range active {
merged[key] = snapshot
}
return sortedFileTransferSnapshots(merged)
}
func filteredFileTransferSnapshots(src map[string]fileTransferSnapshot, direction fileTransferDirection) []fileTransferSnapshot {
out := make([]fileTransferSnapshot, 0, len(src))
for _, snapshot := range sortedFileTransferSnapshots(src) {
if snapshot.Direction != direction {
continue
}
out = append(out, snapshot)
}
return out
}
func filterFileTransferSnapshotsByFileID(src []fileTransferSnapshot, fileID string) []fileTransferSnapshot {
out := make([]fileTransferSnapshot, 0, len(src))
for _, snapshot := range src {
if snapshot.FileID != fileID {
continue
}
out = append(out, snapshot)
}
return out
}
func filterFileTransferSnapshotsByDirectionAndFileID(src []fileTransferSnapshot, direction fileTransferDirection, fileID string) []fileTransferSnapshot {
out := make([]fileTransferSnapshot, 0, len(src))
for _, snapshot := range src {
if snapshot.Direction != direction || snapshot.FileID != fileID {
continue
}
out = append(out, snapshot)
}
return out
}
func fileTransferSnapshotOlder(candidate fileTransferSnapshot, current fileTransferSnapshot, candidateKey string, currentKey string) bool {
candidateTime := fileTransferSnapshotCompletedTime(candidate)
currentTime := fileTransferSnapshotCompletedTime(current)
if candidateTime.Before(currentTime) {
return true
}
if currentTime.Before(candidateTime) {
return false
}
return candidateKey < currentKey
}
func fileTransferSnapshotCompletedTime(snapshot fileTransferSnapshot) time.Time {
if !snapshot.Time.IsZero() {
return snapshot.Time
}
if !snapshot.UpdatedAt.IsZero() {
return snapshot.UpdatedAt
}
return snapshot.StartedAt
}