- 引入 LogicalConn/TransportConn 分层,ClientConn 保留兼容适配层
- 新增 Stream、Bulk、RecordStream 三条数据面能力及对应控制路径
- 完成 transfer/file 传输内核与状态快照、诊断能力
- 补齐 reconnect、inbound dispatcher、modern psk 等基础模块
- 增加大规模回归、并发与基准测试覆盖
- 更新依赖库
211 lines
6.5 KiB
Go
211 lines
6.5 KiB
Go
package notify
|
|
|
|
import (
|
|
"sort"
|
|
"time"
|
|
)
|
|
|
|
type fileTransferSummary struct {
|
|
Direction fileTransferDirection
|
|
Scope string
|
|
RuntimeScope string
|
|
TransportGeneration uint64
|
|
NetType NetType
|
|
Kind EnvelopeKind
|
|
FileID string
|
|
Path string
|
|
Received int64
|
|
Total int64
|
|
Percent float64
|
|
Active bool
|
|
Terminal bool
|
|
Done bool
|
|
Failed bool
|
|
Err error
|
|
StartedAt time.Time
|
|
UpdatedAt time.Time
|
|
Duration time.Duration
|
|
RateBPS float64
|
|
StepDuration time.Duration
|
|
InstantRateBPS float64
|
|
Time time.Time
|
|
Stage string
|
|
}
|
|
|
|
type fileTransferSummaryRecord struct {
|
|
snapshot fileTransferSnapshot
|
|
active bool
|
|
}
|
|
|
|
func fileTransferSummaryFromSnapshot(snapshot fileTransferSnapshot, active bool) fileTransferSummary {
|
|
return fileTransferSummary{
|
|
Direction: snapshot.Direction,
|
|
Scope: snapshot.Scope,
|
|
RuntimeScope: snapshot.RuntimeScope,
|
|
TransportGeneration: snapshot.TransportGeneration,
|
|
NetType: snapshot.NetType,
|
|
Kind: snapshot.Kind,
|
|
FileID: snapshot.FileID,
|
|
Path: snapshot.Path,
|
|
Received: snapshot.Received,
|
|
Total: snapshot.Total,
|
|
Percent: snapshot.Percent,
|
|
Active: active,
|
|
Terminal: !active && isFileTransferTerminal(snapshot.Kind),
|
|
Done: snapshot.Done,
|
|
Failed: snapshot.Kind == EnvelopeFileAbort || snapshot.Err != nil,
|
|
Err: snapshot.Err,
|
|
StartedAt: snapshot.StartedAt,
|
|
UpdatedAt: snapshot.UpdatedAt,
|
|
Duration: snapshot.Duration,
|
|
RateBPS: snapshot.RateBPS,
|
|
StepDuration: snapshot.StepDuration,
|
|
InstantRateBPS: snapshot.InstantRateBPS,
|
|
Time: snapshot.Time,
|
|
Stage: snapshot.Stage,
|
|
}
|
|
}
|
|
|
|
func (m *fileTransferMonitor) activeSummaries() []fileTransferSummary {
|
|
if m == nil {
|
|
return nil
|
|
}
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
return summariesFromSnapshots(sortedFileTransferSnapshots(m.active), true)
|
|
}
|
|
|
|
func (m *fileTransferMonitor) completedSummaries() []fileTransferSummary {
|
|
if m == nil {
|
|
return nil
|
|
}
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
return summariesFromSnapshots(sortedFileTransferSnapshots(m.completed), false)
|
|
}
|
|
|
|
func (m *fileTransferMonitor) latestSummary(direction fileTransferDirection, scope string, fileID string) (fileTransferSummary, bool) {
|
|
if m == nil {
|
|
return fileTransferSummary{}, false
|
|
}
|
|
key := fileTransferMonitorKey(direction, scope, fileID)
|
|
if key == "" {
|
|
return fileTransferSummary{}, false
|
|
}
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
if snapshot, ok := m.active[key]; ok {
|
|
return fileTransferSummaryFromSnapshot(snapshot, true), true
|
|
}
|
|
snapshot, ok := m.completed[key]
|
|
if !ok {
|
|
return fileTransferSummary{}, false
|
|
}
|
|
return fileTransferSummaryFromSnapshot(snapshot, false), true
|
|
}
|
|
|
|
func (m *fileTransferMonitor) summariesByFileID(fileID string) []fileTransferSummary {
|
|
if m == nil || fileID == "" {
|
|
return nil
|
|
}
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
return summariesFromRecords(filterFileTransferSummaryRecordsByFileID(latestFileTransferSummaryRecordsLocked(m.active, m.completed), fileID))
|
|
}
|
|
|
|
func (m *fileTransferMonitor) summariesByDirectionAndFileID(direction fileTransferDirection, fileID string) []fileTransferSummary {
|
|
if m == nil || fileID == "" {
|
|
return nil
|
|
}
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
return summariesFromRecords(filterFileTransferSummaryRecordsByDirectionAndFileID(latestFileTransferSummaryRecordsLocked(m.active, m.completed), direction, fileID))
|
|
}
|
|
|
|
func (m *fileTransferMonitor) runtimeSummariesByFileID(fileID string) []fileTransferSummary {
|
|
if m == nil || fileID == "" {
|
|
return nil
|
|
}
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
return summariesFromRecords(filterFileTransferSummaryRecordsByFileID(latestFileTransferSummaryRecordsLocked(m.runtimeActive, m.runtimeCompleted), fileID))
|
|
}
|
|
|
|
func (m *fileTransferMonitor) runtimeSummariesByDirectionAndFileID(direction fileTransferDirection, fileID string) []fileTransferSummary {
|
|
if m == nil || fileID == "" {
|
|
return nil
|
|
}
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
return summariesFromRecords(filterFileTransferSummaryRecordsByDirectionAndFileID(latestFileTransferSummaryRecordsLocked(m.runtimeActive, m.runtimeCompleted), direction, fileID))
|
|
}
|
|
|
|
func latestFileTransferSummaryRecordsLocked(active map[string]fileTransferSnapshot, completed map[string]fileTransferSnapshot) []fileTransferSummaryRecord {
|
|
keys := make([]string, 0, len(active)+len(completed))
|
|
seen := make(map[string]struct{}, len(active)+len(completed))
|
|
for key := range completed {
|
|
if _, ok := seen[key]; ok {
|
|
continue
|
|
}
|
|
seen[key] = struct{}{}
|
|
keys = append(keys, key)
|
|
}
|
|
for key := range active {
|
|
if _, ok := seen[key]; ok {
|
|
continue
|
|
}
|
|
seen[key] = struct{}{}
|
|
keys = append(keys, key)
|
|
}
|
|
sort.Strings(keys)
|
|
out := make([]fileTransferSummaryRecord, 0, len(keys))
|
|
for _, key := range keys {
|
|
if snapshot, ok := active[key]; ok {
|
|
out = append(out, fileTransferSummaryRecord{snapshot: snapshot, active: true})
|
|
continue
|
|
}
|
|
if snapshot, ok := completed[key]; ok {
|
|
out = append(out, fileTransferSummaryRecord{snapshot: snapshot, active: false})
|
|
}
|
|
}
|
|
return out
|
|
}
|
|
|
|
func summariesFromSnapshots(src []fileTransferSnapshot, active bool) []fileTransferSummary {
|
|
out := make([]fileTransferSummary, 0, len(src))
|
|
for _, snapshot := range src {
|
|
out = append(out, fileTransferSummaryFromSnapshot(snapshot, active))
|
|
}
|
|
return out
|
|
}
|
|
|
|
func summariesFromRecords(src []fileTransferSummaryRecord) []fileTransferSummary {
|
|
out := make([]fileTransferSummary, 0, len(src))
|
|
for _, record := range src {
|
|
out = append(out, fileTransferSummaryFromSnapshot(record.snapshot, record.active))
|
|
}
|
|
return out
|
|
}
|
|
|
|
func filterFileTransferSummaryRecordsByFileID(src []fileTransferSummaryRecord, fileID string) []fileTransferSummaryRecord {
|
|
out := make([]fileTransferSummaryRecord, 0, len(src))
|
|
for _, record := range src {
|
|
if record.snapshot.FileID != fileID {
|
|
continue
|
|
}
|
|
out = append(out, record)
|
|
}
|
|
return out
|
|
}
|
|
|
|
func filterFileTransferSummaryRecordsByDirectionAndFileID(src []fileTransferSummaryRecord, direction fileTransferDirection, fileID string) []fileTransferSummaryRecord {
|
|
out := make([]fileTransferSummaryRecord, 0, len(src))
|
|
for _, record := range src {
|
|
if record.snapshot.Direction != direction || record.snapshot.FileID != fileID {
|
|
continue
|
|
}
|
|
out = append(out, record)
|
|
}
|
|
return out
|
|
}
|