- 引入 LogicalConn/TransportConn 分层,ClientConn 保留兼容适配层 - 新增 Stream、Bulk、RecordStream 三条数据面能力及对应控制路径 - 完成 transfer/file 传输内核与状态快照、诊断能力 - 补齐 reconnect、inbound dispatcher、modern psk 等基础模块 - 增加大规模回归、并发与基准测试覆盖 - 更新依赖库
303 lines
9.1 KiB
Go
303 lines
9.1 KiB
Go
package notify
|
|
|
|
import itransfer "b612.me/notify/internal/transfer"
|
|
|
|
type fileTransferState struct {
|
|
monitor *fileTransferMonitor
|
|
query fileTransferQuery
|
|
runtime *transferRuntime
|
|
}
|
|
|
|
func newFileTransferState() *fileTransferState {
|
|
return newFileTransferStateWithConfig(defaultFileTransferConfig())
|
|
}
|
|
|
|
func newFileTransferStateWithConfig(cfg fileTransferConfig) *fileTransferState {
|
|
monitor := newFileTransferMonitorWithConfig(cfg)
|
|
return &fileTransferState{
|
|
monitor: monitor,
|
|
query: newFileTransferQuery(monitor),
|
|
runtime: newTransferRuntime(),
|
|
}
|
|
}
|
|
|
|
func (s *fileTransferState) observe(direction fileTransferDirection, event FileEvent) {
|
|
if s == nil || s.monitor == nil {
|
|
return
|
|
}
|
|
s.monitor.observe(direction, event)
|
|
s.observeRuntime(direction, event)
|
|
}
|
|
|
|
func (s *fileTransferState) observeMonitorOnly(direction fileTransferDirection, event FileEvent) {
|
|
if s == nil || s.monitor == nil {
|
|
return
|
|
}
|
|
s.monitor.observe(direction, event)
|
|
}
|
|
|
|
func (s *fileTransferState) applyConfig(cfg fileTransferConfig) {
|
|
if s == nil || s.monitor == nil {
|
|
return
|
|
}
|
|
s.monitor.applyConfig(cfg)
|
|
}
|
|
|
|
func (s *fileTransferState) monitorView() *fileTransferMonitor {
|
|
if s == nil {
|
|
return nil
|
|
}
|
|
return s.monitor
|
|
}
|
|
|
|
func (s *fileTransferState) active() fileTransferSummaryGroup {
|
|
if s == nil {
|
|
return fileTransferSummaryGroup{}
|
|
}
|
|
return s.query.active()
|
|
}
|
|
|
|
func (s *fileTransferState) completed() fileTransferSummaryGroup {
|
|
if s == nil {
|
|
return fileTransferSummaryGroup{}
|
|
}
|
|
return s.query.completed()
|
|
}
|
|
|
|
func (s *fileTransferState) failed() fileTransferSummaryGroup {
|
|
if s == nil {
|
|
return fileTransferSummaryGroup{}
|
|
}
|
|
return s.query.failed()
|
|
}
|
|
|
|
func (s *fileTransferState) latest(direction fileTransferDirection, scope string, fileID string) (fileTransferSummary, bool) {
|
|
if s == nil || s.monitor == nil {
|
|
return fileTransferSummary{}, false
|
|
}
|
|
return s.monitor.latestSummary(direction, scope, fileID)
|
|
}
|
|
|
|
func (s *fileTransferState) latestByFileID(fileID string) fileTransferSummaryGroup {
|
|
if s == nil {
|
|
return fileTransferSummaryGroup{}
|
|
}
|
|
return s.query.latestByFileID(fileID)
|
|
}
|
|
|
|
func (s *fileTransferState) latestSendByFileID(fileID string) []fileTransferSummary {
|
|
if s == nil {
|
|
return nil
|
|
}
|
|
return s.query.latestSendByFileID(fileID)
|
|
}
|
|
|
|
func (s *fileTransferState) latestReceiveByFileID(fileID string) []fileTransferSummary {
|
|
if s == nil {
|
|
return nil
|
|
}
|
|
return s.query.latestReceiveByFileID(fileID)
|
|
}
|
|
|
|
func (s *fileTransferState) latestByFileIDQuery(fileID string, query fileTransferSummaryQuery) fileTransferSummaryGroup {
|
|
if s == nil {
|
|
return fileTransferSummaryGroup{}
|
|
}
|
|
return s.query.latestByFileIDQuery(fileID, query)
|
|
}
|
|
|
|
func (s *fileTransferState) latestSendByFileIDQuery(fileID string, query fileTransferSummaryQuery) []fileTransferSummary {
|
|
if s == nil {
|
|
return nil
|
|
}
|
|
return s.query.latestSendByFileIDQuery(fileID, query)
|
|
}
|
|
|
|
func (s *fileTransferState) latestReceiveByFileIDQuery(fileID string, query fileTransferSummaryQuery) []fileTransferSummary {
|
|
if s == nil {
|
|
return nil
|
|
}
|
|
return s.query.latestReceiveByFileIDQuery(fileID, query)
|
|
}
|
|
|
|
func (s *fileTransferState) observeRuntime(direction fileTransferDirection, event FileEvent) {
|
|
if s == nil || s.runtime == nil || event.Packet.FileID == "" {
|
|
return
|
|
}
|
|
runtimeScope := transferRuntimeScopeForEvent(event)
|
|
publicScope := transferRuntimePublicScopeForEvent(event)
|
|
transportGeneration := transferRuntimeTransportGenerationForEvent(event)
|
|
s.ensureRuntimeTransfer(direction, runtimeScope, publicScope, transportGeneration, event)
|
|
s.recordRuntimeStage(direction, runtimeScope, event.Packet.FileID, runtimeTransferStageForEvent(event))
|
|
switch event.Kind {
|
|
case EnvelopeFileChunk:
|
|
s.runtime.activate(direction, runtimeScope, event.Packet.FileID)
|
|
s.syncRuntimeProgress(direction, runtimeScope, event)
|
|
case EnvelopeFileEnd:
|
|
s.runtime.activate(direction, runtimeScope, event.Packet.FileID)
|
|
s.syncRuntimeProgress(direction, runtimeScope, event)
|
|
switch direction {
|
|
case fileTransferDirectionSend:
|
|
s.runtime.beginCommit(direction, runtimeScope, event.Packet.FileID)
|
|
case fileTransferDirectionReceive:
|
|
s.runtime.beginVerify(direction, runtimeScope, event.Packet.FileID)
|
|
}
|
|
s.runtime.complete(direction, runtimeScope, event.Packet.FileID)
|
|
case EnvelopeFileAbort:
|
|
s.syncRuntimeProgress(direction, runtimeScope, event)
|
|
s.recordRuntimeFailureStage(direction, runtimeScope, event.Packet.FileID, event.Packet.Stage)
|
|
s.runtime.abort(direction, runtimeScope, event.Packet.FileID, event.Err)
|
|
}
|
|
}
|
|
|
|
func (s *fileTransferState) ensureRuntimeTransfer(direction fileTransferDirection, runtimeScope string, publicScope string, transportGeneration uint64, event FileEvent) {
|
|
if s == nil || s.runtime == nil || event.Packet.FileID == "" {
|
|
return
|
|
}
|
|
s.runtime.ensureTransferDescriptor(direction, runtimeScope, publicScope, transportGeneration, itransfer.Descriptor{
|
|
ID: event.Packet.FileID,
|
|
Channel: itransfer.DataChannel,
|
|
Size: event.Packet.Size,
|
|
Checksum: event.Packet.Checksum,
|
|
Metadata: buildKernelTransferMetadata(event),
|
|
})
|
|
}
|
|
|
|
func (s *fileTransferState) startRuntimeSendSession(runtimeScope string, publicScope string, transportGeneration uint64, session *fileSendSession) {
|
|
if s == nil || s.runtime == nil || session == nil || session.fileID == "" {
|
|
return
|
|
}
|
|
s.runtime.ensureTransferDescriptor(fileTransferDirectionSend, runtimeScope, publicScope, transportGeneration, itransfer.Descriptor{
|
|
ID: session.fileID,
|
|
Channel: itransfer.DataChannel,
|
|
Size: session.size,
|
|
Checksum: session.checksum,
|
|
Metadata: itransfer.Metadata{
|
|
"name": session.name,
|
|
"path": session.path,
|
|
},
|
|
})
|
|
}
|
|
|
|
func buildKernelTransferMetadata(event FileEvent) itransfer.Metadata {
|
|
metadata := make(itransfer.Metadata)
|
|
if event.Packet.Name != "" {
|
|
metadata["name"] = event.Packet.Name
|
|
}
|
|
if event.Path != "" {
|
|
metadata["path"] = event.Path
|
|
}
|
|
if len(metadata) == 0 {
|
|
return nil
|
|
}
|
|
return metadata
|
|
}
|
|
|
|
func (s *fileTransferState) syncRuntimeProgress(direction fileTransferDirection, scope string, event FileEvent) {
|
|
if s == nil || s.runtime == nil {
|
|
return
|
|
}
|
|
snapshot, ok := s.runtimeSnapshot(direction, scope, event.Packet.FileID)
|
|
if !ok {
|
|
return
|
|
}
|
|
progress := event.Received
|
|
if progress < 0 {
|
|
progress = 0
|
|
}
|
|
switch direction {
|
|
case fileTransferDirectionReceive:
|
|
if delta := progress - snapshot.ReceivedBytes; delta > 0 {
|
|
s.runtime.recordReceive(direction, scope, event.Packet.FileID, delta)
|
|
}
|
|
default:
|
|
if delta := progress - snapshot.SentBytes; delta > 0 {
|
|
s.runtime.recordSend(direction, scope, event.Packet.FileID, delta)
|
|
}
|
|
s.runtime.setAckedBytes(direction, scope, event.Packet.FileID, progress)
|
|
}
|
|
}
|
|
|
|
func (s *fileTransferState) recordRuntimeRetry(direction fileTransferDirection, scope string, fileID string) {
|
|
if s == nil || s.runtime == nil || fileID == "" {
|
|
return
|
|
}
|
|
s.runtime.recordRetry(direction, scope, fileID)
|
|
}
|
|
|
|
func (s *fileTransferState) recordRuntimeTimeout(direction fileTransferDirection, scope string, fileID string) {
|
|
if s == nil || s.runtime == nil || fileID == "" {
|
|
return
|
|
}
|
|
s.runtime.recordTimeout(direction, scope, fileID)
|
|
}
|
|
|
|
func (s *fileTransferState) recordRuntimeStage(direction fileTransferDirection, scope string, fileID string, stage string) {
|
|
if s == nil || s.runtime == nil || fileID == "" || stage == "" {
|
|
return
|
|
}
|
|
s.runtime.recordStage(direction, scope, fileID, stage)
|
|
}
|
|
|
|
func (s *fileTransferState) recordRuntimeFailureStage(direction fileTransferDirection, scope string, fileID string, stage string) {
|
|
if s == nil || s.runtime == nil || fileID == "" || stage == "" {
|
|
return
|
|
}
|
|
s.runtime.recordFailureStage(direction, scope, fileID, stage)
|
|
}
|
|
|
|
func (s *fileTransferState) runtimeSnapshot(direction fileTransferDirection, scope string, transferID string) (itransfer.Snapshot, bool) {
|
|
if s == nil || s.runtime == nil || transferID == "" {
|
|
return itransfer.Snapshot{}, false
|
|
}
|
|
return s.runtime.snapshot(direction, scope, transferID)
|
|
}
|
|
|
|
func transferRuntimeScopeForEvent(event FileEvent) string {
|
|
if event.TransportConn != nil {
|
|
return serverTransportScopeForTransport(event.TransportConn)
|
|
}
|
|
if logical := fileEventLogicalConnSnapshot(event); logical != nil {
|
|
return serverTransportScope(logical)
|
|
}
|
|
return clientFileScope()
|
|
}
|
|
|
|
func transferRuntimePublicScopeForEvent(event FileEvent) string {
|
|
return fileTransferMonitorScope(event)
|
|
}
|
|
|
|
func transferRuntimeTransportGenerationForEvent(event FileEvent) uint64 {
|
|
if event.TransportConn != nil {
|
|
return event.TransportConn.TransportGeneration()
|
|
}
|
|
logical := fileEventLogicalConnSnapshot(event)
|
|
if logical == nil {
|
|
return 0
|
|
}
|
|
return logical.transportGenerationSnapshot()
|
|
}
|
|
|
|
func runtimeTransferStageForEvent(event FileEvent) string {
|
|
if event.Packet.Stage != "" {
|
|
return event.Packet.Stage
|
|
}
|
|
return fileStageByKind(event.Kind)
|
|
}
|
|
|
|
func (c *ClientCommon) getTransferRuntime() *transferRuntime {
|
|
return c.getFileTransferState().runtime
|
|
}
|
|
|
|
func (s *ServerCommon) getTransferRuntime() *transferRuntime {
|
|
return s.getFileTransferState().runtime
|
|
}
|
|
|
|
func (c *ClientCommon) getFileTransferState() *fileTransferState {
|
|
return c.getLogicalSessionState().fileTransfers
|
|
}
|
|
|
|
func (s *ServerCommon) getFileTransferState() *fileTransferState {
|
|
return s.getLogicalSessionState().fileTransfers
|
|
}
|