// Source listing: notify/file_event.go (Go, 244 lines, 6.7 KiB).

package notify
import "time"
// FileEvent is the value passed to the file-event observers and publish
// helpers in this file. It carries the peer handles, the packet the event is
// about, and the progress, timing and rate figures filled in by
// fillFileEventProgress and fillFileEventTimeline.
type FileEvent struct {
	// NOTE(review): NetType is not read or written in this file; presumably
	// the network type the transfer runs over — confirm with callers.
	NetType NetType

	// LogicalConn is the peer's logical connection. On the server side
	// hydrateServerFileEventPeerFields derives it from ClientConn when nil.
	LogicalConn *LogicalConn
	// Deprecated: ClientConn aliases LogicalConn for compatibility.
	ClientConn *ClientConn
	// TransportConn is filled from LogicalConn.CurrentTransportConn() by
	// hydrateServerFileEventPeerFields when it is nil.
	TransportConn *TransportConn
	// NOTE(review): ServerConn is not touched in this file; presumably the
	// client-side handle to the server — confirm with callers.
	ServerConn Client

	// Kind selects the completion handling in fillFileEventProgress
	// (EnvelopeFileEnd and EnvelopeFileAbort are treated specially).
	Kind EnvelopeKind
	// Packet is the file packet the event refers to; its Size becomes Total.
	Packet FilePacket
	// NOTE(review): Path is not used in this file; presumably the local file
	// path of the transfer — confirm with callers.
	Path string

	// Received is the progress so far and Total the expected size (presumably
	// in bytes). fillFileEventProgress copies Total from Packet.Size and
	// clamps Received to [0, Total] when Total is positive.
	Received, Total int64
	// Percent is Received relative to Total, clamped to [0, 100].
	Percent float64
	// Done is set to true by fillFileEventProgress for an EnvelopeFileEnd
	// event whose Err is nil, and to false for EnvelopeFileAbort.
	Done bool

	// StartedAt and UpdatedAt are copied from the session timeline.
	StartedAt, UpdatedAt time.Time
	// Duration is UpdatedAt - StartedAt when both are set and ordered.
	Duration time.Duration
	// RateBPS is Received divided by Duration in seconds.
	RateBPS float64
	// StepDuration is the time elapsed since the previous update.
	StepDuration time.Duration
	// InstantRateBPS is the progress made since the previous update divided
	// by StepDuration in seconds.
	InstantRateBPS float64

	// Err, when non-nil, keeps an EnvelopeFileEnd event from being marked Done.
	Err error
	// NOTE(review): Time is not set in this file; presumably the moment the
	// event was created — confirm with callers.
	Time time.Time
}
// normalizeFileEventTime returns now unchanged unless it is the zero
// time.Time, in which case the current wall-clock time is returned instead.
func normalizeFileEventTime(now time.Time) time.Time {
	if !now.IsZero() {
		return now
	}
	return time.Now()
}
// hydrateServerFileEventPeerFields returns a copy of event whose nil peer
// fields have been derived from the ones that are set:
//
//   - LogicalConn is derived from ClientConn via logicalConnFromClient;
//   - ClientConn is derived from LogicalConn via compatClientConn;
//   - TransportConn is taken from LogicalConn.CurrentTransportConn().
//
// Fields that are already non-nil are never overwritten. The function is
// safe to apply repeatedly to the same event.
func hydrateServerFileEventPeerFields(event FileEvent) FileEvent {
	if event.LogicalConn == nil {
		event.LogicalConn = logicalConnFromClient(event.ClientConn)
	}
	if event.LogicalConn == nil {
		// Without a logical connection there is nothing to derive the other
		// fields from. Returning here avoids invoking methods on a nil
		// *LogicalConn, matching the guard the TransportConn lookup already had.
		return event
	}
	if event.ClientConn == nil {
		event.ClientConn = event.LogicalConn.compatClientConn()
	}
	if event.TransportConn == nil {
		event.TransportConn = event.LogicalConn.CurrentTransportConn()
	}
	return event
}
// fileEventLogicalConnSnapshot returns the event's LogicalConn when it is
// set; otherwise it returns whatever logicalConnFromClient yields for the
// event's ClientConn.
func fileEventLogicalConnSnapshot(event FileEvent) *LogicalConn {
	if conn := event.LogicalConn; conn != nil {
		return conn
	}
	return logicalConnFromClient(event.ClientConn)
}
// fileEventTransportConnSnapshot returns the event's TransportConn when it is
// set. Otherwise it resolves the logical connection through
// fileEventLogicalConnSnapshot and returns its CurrentTransportConn(), or nil
// when no logical connection can be resolved.
func fileEventTransportConnSnapshot(event FileEvent) *TransportConn {
	if transport := event.TransportConn; transport != nil {
		return transport
	}
	if conn := fileEventLogicalConnSnapshot(event); conn != nil {
		return conn.CurrentTransportConn()
	}
	return nil
}
// fileEventTimeline is the timing snapshot that fillFileEventTimeline applies
// to a FileEvent. It is built from either a receive or a send session.
type fileEventTimeline struct {
	// startedAt and updatedAt are copied onto the event; previousUpdatedAt is
	// the timestamp of the update before updatedAt and is used to compute the
	// event's StepDuration.
	startedAt, updatedAt, previousUpdatedAt time.Time

	// previousProgress is the progress value at previousUpdatedAt; the
	// difference to the event's Received feeds InstantRateBPS.
	previousProgress int64
}
// fillFileEventProgress derives the progress fields of event in place.
//
// It sets Total from Packet.Size, clamps Received to be non-negative and (when
// Total is positive) no larger than Total, and updates Done according to Kind:
// an EnvelopeFileEnd event is Done exactly when Err is nil (and then counts as
// fully received when Total is positive), an EnvelopeFileAbort event is never
// Done, and any other kind leaves Done untouched.
//
// Duration is set to UpdatedAt - StartedAt when both timestamps are set and
// UpdatedAt is not before StartedAt. When Total is not positive only Percent
// (100 if Done) and Duration are filled; otherwise Percent is computed from
// Received/Total and RateBPS from Received/Duration.
//
// A nil event is ignored.
func fillFileEventProgress(event *FileEvent) {
	if event == nil {
		return
	}

	event.Total = event.Packet.Size
	switch {
	case event.Received < 0:
		event.Received = 0
	case event.Total > 0 && event.Received > event.Total:
		event.Received = event.Total
	}

	if event.Kind == EnvelopeFileEnd {
		event.Done = event.Err == nil
		if event.Done && event.Total > 0 {
			// A clean end means the whole file arrived.
			event.Received = event.Total
		}
	} else if event.Kind == EnvelopeFileAbort {
		event.Done = false
	}

	// Duration depends only on the two timestamps, so it is computed once
	// here for both the known-size and unknown-size paths.
	timestamped := !event.StartedAt.IsZero() && !event.UpdatedAt.IsZero()
	if timestamped && !event.UpdatedAt.Before(event.StartedAt) {
		event.Duration = event.UpdatedAt.Sub(event.StartedAt)
	}

	if event.Total <= 0 {
		// Size unknown: a percentage can only be reported on completion.
		if event.Done {
			event.Percent = 100
		}
		return
	}

	percent := float64(event.Received) * 100 / float64(event.Total)
	switch {
	case percent < 0:
		percent = 0
	case percent > 100:
		percent = 100
	}
	event.Percent = percent

	if event.Duration > 0 && event.Received > 0 {
		event.RateBPS = float64(event.Received) / event.Duration.Seconds()
	}
}
// fillFileEventTimeline copies the timeline's start and update timestamps onto
// event and derives the per-step figures:
//
//   - StepDuration is updatedAt - previousUpdatedAt, set only when
//     previousUpdatedAt is non-zero and not after updatedAt;
//   - InstantRateBPS is the progress since previousProgress divided by
//     StepDuration in seconds, set only when both are positive.
//
// A nil event is ignored.
func fillFileEventTimeline(event *FileEvent, timeline fileEventTimeline) {
	if event == nil {
		return
	}

	current, previous := timeline.updatedAt, timeline.previousUpdatedAt
	event.StartedAt, event.UpdatedAt = timeline.startedAt, current

	if !(previous.IsZero() || current.Before(previous)) {
		event.StepDuration = current.Sub(previous)
	}

	advanced := event.Received - timeline.previousProgress
	if advanced <= 0 || event.StepDuration <= 0 {
		return
	}
	event.InstantRateBPS = float64(advanced) / event.StepDuration.Seconds()
}
// fillFileEventTiming applies the timing recorded on a receive session to
// event via fillFileEventTimeline, using the session's previousReceived as the
// previous progress value. A nil session leaves event untouched.
func fillFileEventTiming(event *FileEvent, session *fileReceiveSession) {
	if session != nil {
		timeline := fileEventTimeline{
			startedAt:         session.startedAt,
			updatedAt:         session.updatedAt,
			previousUpdatedAt: session.previousUpdatedAt,
			previousProgress:  session.previousReceived,
		}
		fillFileEventTimeline(event, timeline)
	}
}
// fillFileSendEventTiming applies the timing recorded on a send session to
// event via fillFileEventTimeline, using the session's previousSent as the
// previous progress value. A nil session leaves event untouched.
func fillFileSendEventTiming(event *FileEvent, session *fileSendSession) {
	if session != nil {
		timeline := fileEventTimeline{
			startedAt:         session.startedAt,
			updatedAt:         session.updatedAt,
			previousUpdatedAt: session.previousUpdatedAt,
			previousProgress:  session.previousSent,
		}
		fillFileEventTimeline(event, timeline)
	}
}
// normalizeFileEventCallback returns fn when it is non-nil and a do-nothing
// callback otherwise, so the result can always be invoked.
func normalizeFileEventCallback(fn func(FileEvent)) func(FileEvent) {
	if fn != nil {
		return fn
	}
	return func(FileEvent) {}
}
// setFileEventObserver installs fn as the client's file-event observer while
// holding c.mu. A nil fn is stored as a no-op callback.
func (c *ClientCommon) setFileEventObserver(fn func(FileEvent)) {
	observer := normalizeFileEventCallback(fn)

	c.mu.Lock()
	defer c.mu.Unlock()
	c.fileEventObserver = observer
}
// setFileEventObserver installs fn as the server's file-event observer while
// holding s.mu. A nil fn is stored as a no-op callback.
func (s *ServerCommon) setFileEventObserver(fn func(FileEvent)) {
	observer := normalizeFileEventCallback(fn)

	s.mu.Lock()
	defer s.mu.Unlock()
	s.fileEventObserver = observer
}
// observeFileEvent delivers event to the client's registered observer. The
// observer is read under c.mu and invoked after the lock has been released; a
// nil observer is replaced by a no-op.
func (c *ClientCommon) observeFileEvent(event FileEvent) {
	c.mu.Lock()
	registered := c.fileEventObserver
	c.mu.Unlock()

	deliver := normalizeFileEventCallback(registered)
	deliver(event)
}
// observeFileEvent delivers event to the server's registered observer after
// passing it through hydrateServerFileEventPeerFields. The observer is read
// under s.mu's read lock and invoked after the lock has been released; a nil
// observer is replaced by a no-op.
func (s *ServerCommon) observeFileEvent(event FileEvent) {
	s.mu.RLock()
	registered := s.fileEventObserver
	s.mu.RUnlock()

	deliver := normalizeFileEventCallback(registered)
	deliver(hydrateServerFileEventPeerFields(event))
}
// publishReceivedFileEvent fans a receive-direction event out on the client,
// in this order: the file-transfer state (observe), the registered observer,
// the file-event log under the "client" label, and emitFileEvent.
func (c *ClientCommon) publishReceivedFileEvent(event FileEvent) {
	transfers := c.getFileTransferState()
	transfers.observe(fileTransferDirectionReceive, event)

	c.observeFileEvent(event)
	c.logFileEvent("client", event)
	c.emitFileEvent(event)
}
// publishReceivedFileEventMonitorOnly fans a receive-direction event out on
// the client like publishReceivedFileEvent, except that the file-transfer
// state is updated through observeMonitorOnly instead of observe.
func (c *ClientCommon) publishReceivedFileEventMonitorOnly(event FileEvent) {
	transfers := c.getFileTransferState()
	transfers.observeMonitorOnly(fileTransferDirectionReceive, event)

	c.observeFileEvent(event)
	c.logFileEvent("client", event)
	c.emitFileEvent(event)
}
// publishReceivedFileEvent fans a receive-direction event out on the server.
// The event's peer fields are hydrated first; the hydrated event then goes, in
// order, to the file-transfer state (observe), the registered observer, the
// file-event log under the "server" label, and emitFileEvent.
func (s *ServerCommon) publishReceivedFileEvent(event FileEvent) {
	hydrated := hydrateServerFileEventPeerFields(event)

	transfers := s.getFileTransferState()
	transfers.observe(fileTransferDirectionReceive, hydrated)

	s.observeFileEvent(hydrated)
	s.logFileEvent("server", hydrated)
	s.emitFileEvent(hydrated)
}
// publishReceivedFileEventMonitorOnly fans a receive-direction event out on
// the server like publishReceivedFileEvent, except that the file-transfer
// state is updated through observeMonitorOnly instead of observe.
func (s *ServerCommon) publishReceivedFileEventMonitorOnly(event FileEvent) {
	hydrated := hydrateServerFileEventPeerFields(event)

	transfers := s.getFileTransferState()
	transfers.observeMonitorOnly(fileTransferDirectionReceive, hydrated)

	s.observeFileEvent(hydrated)
	s.logFileEvent("server", hydrated)
	s.emitFileEvent(hydrated)
}
// publishSendFileEvent fans a send-direction event out on the client, in this
// order: the file-transfer state (observe), the registered observer, and the
// file-event log under the "client-send" label. Unlike the receive variant it
// does not call emitFileEvent.
func (c *ClientCommon) publishSendFileEvent(event FileEvent) {
	transfers := c.getFileTransferState()
	transfers.observe(fileTransferDirectionSend, event)

	c.observeFileEvent(event)
	c.logFileEvent("client-send", event)
}
// publishSendFileEventMonitorOnly fans a send-direction event out on the
// client like publishSendFileEvent, except that the file-transfer state is
// updated through observeMonitorOnly instead of observe.
func (c *ClientCommon) publishSendFileEventMonitorOnly(event FileEvent) {
	transfers := c.getFileTransferState()
	transfers.observeMonitorOnly(fileTransferDirectionSend, event)

	c.observeFileEvent(event)
	c.logFileEvent("client-send", event)
}
// publishSendFileEvent fans a send-direction event out on the server. The
// event's peer fields are hydrated first; the hydrated event then goes, in
// order, to the file-transfer state (observe), the registered observer, and
// the file-event log under the "server-send" label. Unlike the receive variant
// it does not call emitFileEvent.
func (s *ServerCommon) publishSendFileEvent(event FileEvent) {
	hydrated := hydrateServerFileEventPeerFields(event)

	transfers := s.getFileTransferState()
	transfers.observe(fileTransferDirectionSend, hydrated)

	s.observeFileEvent(hydrated)
	s.logFileEvent("server-send", hydrated)
}
// publishSendFileEventMonitorOnly fans a send-direction event out on the
// server like publishSendFileEvent, except that the file-transfer state is
// updated through observeMonitorOnly instead of observe.
func (s *ServerCommon) publishSendFileEventMonitorOnly(event FileEvent) {
	hydrated := hydrateServerFileEventPeerFields(event)

	transfers := s.getFileTransferState()
	transfers.observeMonitorOnly(fileTransferDirectionSend, hydrated)

	s.observeFileEvent(hydrated)
	s.logFileEvent("server-send", hydrated)
}