172 lines
4.8 KiB
Go
172 lines
4.8 KiB
Go
|
|
package notify
|
||
|
|
|
||
|
|
import (
|
||
|
|
"fmt"
|
||
|
|
"net"
|
||
|
|
"time"
|
||
|
|
)
|
||
|
|
|
||
|
|
func (c *ClientCommon) dispatchFileEnvelope(env Envelope, now time.Time) {
|
||
|
|
event := FileEvent{
|
||
|
|
NetType: NET_CLIENT,
|
||
|
|
ServerConn: c,
|
||
|
|
Kind: env.Kind,
|
||
|
|
Packet: env.File,
|
||
|
|
Time: now,
|
||
|
|
}
|
||
|
|
pool := c.getFileReceivePool()
|
||
|
|
switch env.Kind {
|
||
|
|
case EnvelopeAck:
|
||
|
|
event.Packet.Stage = env.File.Stage
|
||
|
|
event.Packet.Error = env.File.Error
|
||
|
|
event.Received = env.File.Offset
|
||
|
|
if c.getFileAckPool().deliver(clientFileScope(), event) {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
case EnvelopeFileMeta:
|
||
|
|
session, err := pool.onMeta(clientFileScope(), env.File, now)
|
||
|
|
if session != nil {
|
||
|
|
event.Path = session.tmpPath
|
||
|
|
event.Received = session.received
|
||
|
|
fillFileEventTiming(&event, session)
|
||
|
|
}
|
||
|
|
event.Err = err
|
||
|
|
case EnvelopeFileChunk:
|
||
|
|
session, err := pool.onChunk(clientFileScope(), env.File, now)
|
||
|
|
if session != nil {
|
||
|
|
event.Path = session.tmpPath
|
||
|
|
event.Received = session.received
|
||
|
|
fillFileEventTiming(&event, session)
|
||
|
|
}
|
||
|
|
event.Err = err
|
||
|
|
case EnvelopeFileEnd:
|
||
|
|
finalPath, session, err := pool.onEnd(clientFileScope(), env.File, now)
|
||
|
|
if session != nil {
|
||
|
|
event.Path = finalPath
|
||
|
|
event.Received = session.received
|
||
|
|
fillFileEventTiming(&event, session)
|
||
|
|
}
|
||
|
|
event.Err = err
|
||
|
|
case EnvelopeFileAbort:
|
||
|
|
session, err := pool.onAbort(clientFileScope(), env.File, now)
|
||
|
|
event.Received = env.File.Offset
|
||
|
|
if session != nil {
|
||
|
|
event.Path = session.tmpPath
|
||
|
|
fillFileEventTiming(&event, session)
|
||
|
|
}
|
||
|
|
event.Err = err
|
||
|
|
default:
|
||
|
|
}
|
||
|
|
if env.Kind == EnvelopeFileMeta || env.Kind == EnvelopeFileChunk || env.Kind == EnvelopeFileEnd || env.Kind == EnvelopeFileAbort {
|
||
|
|
if ackErr := c.sendFileAck(env, event.Err); ackErr != nil && event.Err == nil {
|
||
|
|
event.Err = ackErr
|
||
|
|
}
|
||
|
|
}
|
||
|
|
fillFileEventProgress(&event)
|
||
|
|
c.publishReceivedFileEvent(event)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *ServerCommon) dispatchFileEnvelope(logical *LogicalConn, transport *TransportConn, conn net.Conn, env Envelope, now time.Time) {
|
||
|
|
if transport == nil && logical != nil {
|
||
|
|
transport = logical.CurrentTransportConn()
|
||
|
|
}
|
||
|
|
event := FileEvent{
|
||
|
|
LogicalConn: logical,
|
||
|
|
NetType: NET_SERVER,
|
||
|
|
TransportConn: transport,
|
||
|
|
Kind: env.Kind,
|
||
|
|
Packet: env.File,
|
||
|
|
Time: now,
|
||
|
|
}
|
||
|
|
pool := s.getFileReceivePool()
|
||
|
|
switch env.Kind {
|
||
|
|
case EnvelopeAck:
|
||
|
|
event.Packet.Stage = env.File.Stage
|
||
|
|
event.Packet.Error = env.File.Error
|
||
|
|
event.Received = env.File.Offset
|
||
|
|
scopes := serverTransportDeliveryScopes(logical)
|
||
|
|
if transport := fileEventTransportConnSnapshot(event); transport != nil {
|
||
|
|
scopes = serverTransportDeliveryScopesForTransport(transport)
|
||
|
|
}
|
||
|
|
if s.getFileAckPool().deliverAny(scopes, event) {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
case EnvelopeFileMeta:
|
||
|
|
session, err := pool.onMeta(serverFileScope(logical), env.File, now)
|
||
|
|
if session != nil {
|
||
|
|
event.Path = session.tmpPath
|
||
|
|
event.Received = session.received
|
||
|
|
fillFileEventTiming(&event, session)
|
||
|
|
}
|
||
|
|
event.Err = err
|
||
|
|
case EnvelopeFileChunk:
|
||
|
|
session, err := pool.onChunk(serverFileScope(logical), env.File, now)
|
||
|
|
if session != nil {
|
||
|
|
event.Path = session.tmpPath
|
||
|
|
event.Received = session.received
|
||
|
|
fillFileEventTiming(&event, session)
|
||
|
|
}
|
||
|
|
event.Err = err
|
||
|
|
case EnvelopeFileEnd:
|
||
|
|
finalPath, session, err := pool.onEnd(serverFileScope(logical), env.File, now)
|
||
|
|
if session != nil {
|
||
|
|
event.Path = finalPath
|
||
|
|
event.Received = session.received
|
||
|
|
fillFileEventTiming(&event, session)
|
||
|
|
}
|
||
|
|
event.Err = err
|
||
|
|
case EnvelopeFileAbort:
|
||
|
|
session, err := pool.onAbort(serverFileScope(logical), env.File, now)
|
||
|
|
event.Received = env.File.Offset
|
||
|
|
if session != nil {
|
||
|
|
event.Path = session.tmpPath
|
||
|
|
fillFileEventTiming(&event, session)
|
||
|
|
}
|
||
|
|
event.Err = err
|
||
|
|
default:
|
||
|
|
}
|
||
|
|
if env.Kind == EnvelopeFileMeta || env.Kind == EnvelopeFileChunk || env.Kind == EnvelopeFileEnd || env.Kind == EnvelopeFileAbort {
|
||
|
|
if ackErr := s.sendFileAckInbound(logical, transport, conn, env, event.Err); ackErr != nil && event.Err == nil {
|
||
|
|
event.Err = ackErr
|
||
|
|
}
|
||
|
|
}
|
||
|
|
fillFileEventProgress(&event)
|
||
|
|
s.publishReceivedFileEvent(event)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) emitFileEvent(event FileEvent) {
|
||
|
|
c.mu.Lock()
|
||
|
|
handler := c.onFileEvent
|
||
|
|
c.mu.Unlock()
|
||
|
|
if handler == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
handler(event)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *ServerCommon) emitFileEvent(event FileEvent) {
|
||
|
|
s.mu.Lock()
|
||
|
|
handler := s.onFileEvent
|
||
|
|
s.mu.Unlock()
|
||
|
|
if handler == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
handler(event)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) logFileEvent(role string, event FileEvent) {
|
||
|
|
if !(c.debugMode || event.Err != nil) {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
fmt.Printf("%s file event kind=%d file_id=%s received=%d path=%s err=%v\n",
|
||
|
|
role, event.Kind, event.Packet.FileID, event.Received, event.Path, event.Err)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *ServerCommon) logFileEvent(role string, event FileEvent) {
|
||
|
|
if !(s.debugMode || event.Err != nil) {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
fmt.Printf("%s file event kind=%d file_id=%s received=%d path=%s err=%v\n",
|
||
|
|
role, event.Kind, event.Packet.FileID, event.Received, event.Path, event.Err)
|
||
|
|
}
|