notify/file_dispatcher.go

172 lines
4.8 KiB
Go
Raw Permalink Normal View History

package notify
import (
"fmt"
"net"
"time"
)
func (c *ClientCommon) dispatchFileEnvelope(env Envelope, now time.Time) {
event := FileEvent{
NetType: NET_CLIENT,
ServerConn: c,
Kind: env.Kind,
Packet: env.File,
Time: now,
}
pool := c.getFileReceivePool()
switch env.Kind {
case EnvelopeAck:
event.Packet.Stage = env.File.Stage
event.Packet.Error = env.File.Error
event.Received = env.File.Offset
if c.getFileAckPool().deliver(clientFileScope(), event) {
return
}
case EnvelopeFileMeta:
session, err := pool.onMeta(clientFileScope(), env.File, now)
if session != nil {
event.Path = session.tmpPath
event.Received = session.received
fillFileEventTiming(&event, session)
}
event.Err = err
case EnvelopeFileChunk:
session, err := pool.onChunk(clientFileScope(), env.File, now)
if session != nil {
event.Path = session.tmpPath
event.Received = session.received
fillFileEventTiming(&event, session)
}
event.Err = err
case EnvelopeFileEnd:
finalPath, session, err := pool.onEnd(clientFileScope(), env.File, now)
if session != nil {
event.Path = finalPath
event.Received = session.received
fillFileEventTiming(&event, session)
}
event.Err = err
case EnvelopeFileAbort:
session, err := pool.onAbort(clientFileScope(), env.File, now)
event.Received = env.File.Offset
if session != nil {
event.Path = session.tmpPath
fillFileEventTiming(&event, session)
}
event.Err = err
default:
}
if env.Kind == EnvelopeFileMeta || env.Kind == EnvelopeFileChunk || env.Kind == EnvelopeFileEnd || env.Kind == EnvelopeFileAbort {
if ackErr := c.sendFileAck(env, event.Err); ackErr != nil && event.Err == nil {
event.Err = ackErr
}
}
fillFileEventProgress(&event)
c.publishReceivedFileEvent(event)
}
func (s *ServerCommon) dispatchFileEnvelope(logical *LogicalConn, transport *TransportConn, conn net.Conn, env Envelope, now time.Time) {
if transport == nil && logical != nil {
transport = logical.CurrentTransportConn()
}
event := FileEvent{
LogicalConn: logical,
NetType: NET_SERVER,
TransportConn: transport,
Kind: env.Kind,
Packet: env.File,
Time: now,
}
pool := s.getFileReceivePool()
switch env.Kind {
case EnvelopeAck:
event.Packet.Stage = env.File.Stage
event.Packet.Error = env.File.Error
event.Received = env.File.Offset
scopes := serverTransportDeliveryScopes(logical)
if transport := fileEventTransportConnSnapshot(event); transport != nil {
scopes = serverTransportDeliveryScopesForTransport(transport)
}
if s.getFileAckPool().deliverAny(scopes, event) {
return
}
case EnvelopeFileMeta:
session, err := pool.onMeta(serverFileScope(logical), env.File, now)
if session != nil {
event.Path = session.tmpPath
event.Received = session.received
fillFileEventTiming(&event, session)
}
event.Err = err
case EnvelopeFileChunk:
session, err := pool.onChunk(serverFileScope(logical), env.File, now)
if session != nil {
event.Path = session.tmpPath
event.Received = session.received
fillFileEventTiming(&event, session)
}
event.Err = err
case EnvelopeFileEnd:
finalPath, session, err := pool.onEnd(serverFileScope(logical), env.File, now)
if session != nil {
event.Path = finalPath
event.Received = session.received
fillFileEventTiming(&event, session)
}
event.Err = err
case EnvelopeFileAbort:
session, err := pool.onAbort(serverFileScope(logical), env.File, now)
event.Received = env.File.Offset
if session != nil {
event.Path = session.tmpPath
fillFileEventTiming(&event, session)
}
event.Err = err
default:
}
if env.Kind == EnvelopeFileMeta || env.Kind == EnvelopeFileChunk || env.Kind == EnvelopeFileEnd || env.Kind == EnvelopeFileAbort {
if ackErr := s.sendFileAckInbound(logical, transport, conn, env, event.Err); ackErr != nil && event.Err == nil {
event.Err = ackErr
}
}
fillFileEventProgress(&event)
s.publishReceivedFileEvent(event)
}
func (c *ClientCommon) emitFileEvent(event FileEvent) {
c.mu.Lock()
handler := c.onFileEvent
c.mu.Unlock()
if handler == nil {
return
}
handler(event)
}
func (s *ServerCommon) emitFileEvent(event FileEvent) {
s.mu.Lock()
handler := s.onFileEvent
s.mu.Unlock()
if handler == nil {
return
}
handler(event)
}
func (c *ClientCommon) logFileEvent(role string, event FileEvent) {
if !(c.debugMode || event.Err != nil) {
return
}
fmt.Printf("%s file event kind=%d file_id=%s received=%d path=%s err=%v\n",
role, event.Kind, event.Packet.FileID, event.Received, event.Path, event.Err)
}
func (s *ServerCommon) logFileEvent(role string, event FileEvent) {
if !(s.debugMode || event.Err != nil) {
return
}
fmt.Printf("%s file event kind=%d file_id=%s received=%d path=%s err=%v\n",
role, event.Kind, event.Packet.FileID, event.Received, event.Path, event.Err)
}