package notify

import (
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"time"
)

// defaultFileChunkSize is the chunk size, in bytes (64 KiB), used when a
// caller does not supply a positive chunk size.
const defaultFileChunkSize = 64 * 1024

// fileSendHooks bundles the callbacks used by the envelope-based send path
// (sendFileWithHooks and its helpers).
//
// sendReliable is invoked unconditionally by the helpers and therefore must
// be non-nil; startSession, sendAbort and publishEvent are optional and are
// skipped when nil.
type fileSendHooks struct {
	config       fileTransferConfig
	startSession func(*fileSendSession)
	sendReliable func(context.Context, Envelope) error
	sendAbort    func(fileID string, stage string, offset int64, cause error) error
	publishEvent func(FileEvent)
}

// fileSendError carries the stage and offset at which a send failed, together
// with the underlying error. It is used to pass that context out of the
// chunk-streaming callback.
type fileSendError struct {
	stage  string
	offset int64
	err    error
}

// Error returns the message of the wrapped error, or "" when the receiver or
// the wrapped error is nil.
func (e *fileSendError) Error() string {
	if e == nil || e.err == nil {
		return ""
	}
	return e.err.Error()
}

// Unwrap returns the wrapped error so errors.Is / errors.As can see through
// fileSendError. It returns nil for a nil receiver.
func (e *fileSendError) Unwrap() error {
	if e == nil {
		return nil
	}
	return e.err
}

// SendFile sends the file at filePath from the client side.
//
// It builds a transferSendTarget wired to the client's stream/begin/resume/
// commit/abort operations and delegates to sendFileViaTransfer. Every
// published event is tagged with NET_CLIENT and the client itself before
// being forwarded to publishSendFileEventMonitorOnly.
func (c *ClientCommon) SendFile(ctx context.Context, filePath string) error {
	target := transferSendTarget{
		runtime:             c.getTransferRuntime(),
		runtimeScope:        clientFileScope(),
		publicScope:         clientFileScope(),
		transportGeneration: 0,
		sequenceEn:          c.sequenceEn,
		sequenceDe:          c.sequenceDe,
		openStream: func(ctx context.Context, opt StreamOpenOptions) (Stream, error) {
			return c.OpenStream(ctx, opt)
		},
		sendBegin: func(ctx context.Context, req TransferBeginRequest) (TransferBeginResponse, error) {
			return SendTransferBeginClient(ctx, c, req)
		},
		sendResume: func(ctx context.Context, req TransferResumeRequest) (TransferResumeResponse, error) {
			return SendTransferResumeClient(ctx, c, req)
		},
		sendCommit: func(ctx context.Context, req TransferCommitRequest) (TransferCommitResponse, error) {
			return SendTransferCommitClient(ctx, c, req)
		},
		sendAbort: func(ctx context.Context, req TransferAbortRequest) (TransferAbortResponse, error) {
			return SendTransferAbortClient(ctx, c, req)
		},
	}
	return c.sendFileViaTransfer(ctx, filePath, target, func(event FileEvent) {
		event.NetType = NET_CLIENT
		event.ServerConn = c
		c.publishSendFileEventMonitorOnly(event)
	})
}

// SendFile sends the file at filePath to the given client connection. It
// converts client with logicalConnFromClient and delegates to SendFileLogical.
func (s *ServerCommon) SendFile(ctx context.Context, client *ClientConn, filePath string) error {
	return s.SendFileLogical(ctx, logicalConnFromClient(client), filePath)
}

// SendFileLogical sends the file at filePath to a logical connection.
//
// The logical connection is resolved to a transport via
// resolveOutboundTransport. A nil client is forwarded as a nil transport, for
// which SendFileTransport returns its detached-transport error.
func (s *ServerCommon) SendFileLogical(ctx context.Context, client *LogicalConn, filePath string) error {
	if client == nil {
		return s.SendFileTransport(ctx, nil, filePath)
	}
	return s.SendFileTransport(ctx, s.resolveOutboundTransport(client), filePath)
}

// SendFileTransport sends the file at filePath over a specific transport.
//
// It returns transportDetachedErrorForTransport(transport) when the transport
// is nil, has no logical connection snapshot, is not attached, or is not
// current. Otherwise it builds a transferSendTarget bound to that transport
// and delegates to sendFileViaTransfer. Every published event is tagged with
// NET_SERVER, the logical connection and the transport before being forwarded
// to publishSendFileEventMonitorOnly.
func (s *ServerCommon) SendFileTransport(ctx context.Context, transport *TransportConn, filePath string) error {
	if transport == nil {
		return transportDetachedErrorForTransport(transport)
	}
	logical := transport.logicalConnSnapshot()
	if logical == nil || !transport.Attached() || !transport.IsCurrent() {
		return transportDetachedErrorForTransport(transport)
	}
	target := transferSendTarget{
		runtime:             s.getTransferRuntime(),
		runtimeScope:        serverTransportScopeForTransport(transport),
		publicScope:         serverFileScope(logical),
		transportGeneration: transport.TransportGeneration(),
		logical:             logical,
		transport:           transport,
		sequenceEn:          s.sequenceEn,
		sequenceDe:          s.sequenceDe,
		openStream: func(ctx context.Context, opt StreamOpenOptions) (Stream, error) {
			return s.OpenStreamTransport(ctx, transport, opt)
		},
		sendBegin: func(ctx context.Context, req TransferBeginRequest) (TransferBeginResponse, error) {
			return SendTransferBeginTransport(ctx, s, transport, req)
		},
		sendResume: func(ctx context.Context, req TransferResumeRequest) (TransferResumeResponse, error) {
			return SendTransferResumeTransport(ctx, s, transport, req)
		},
		sendCommit: func(ctx context.Context, req TransferCommitRequest) (TransferCommitResponse, error) {
			return SendTransferCommitTransport(ctx, s, transport, req)
		},
		sendAbort: func(ctx context.Context, req TransferAbortRequest) (TransferAbortResponse, error) {
			return SendTransferAbortTransport(ctx, s, transport, req)
		},
	}
	return s.sendFileViaTransfer(ctx, filePath, target, func(event FileEvent) {
		event.NetType = NET_SERVER
		event.LogicalConn = logical
		event.TransportConn = transport
		s.publishSendFileEventMonitorOnly(event)
	})
}

// sendFileViaTransfer is a thin method wrapper around the package-level
// sendFileViaTransfer.
func (c *ClientCommon) sendFileViaTransfer(ctx context.Context, filePath string, target transferSendTarget, publishEvent func(FileEvent)) error {
	return sendFileViaTransfer(ctx, filePath, target, publishEvent)
}

// sendFileViaTransfer is a thin method wrapper around the package-level
// sendFileViaTransfer.
func (s *ServerCommon) sendFileViaTransfer(ctx context.Context, filePath string, target transferSendTarget, publishEvent func(FileEvent)) error {
	return sendFileViaTransfer(ctx, filePath, target, publishEvent)
}

// sendFileViaTransfer sends the file at filePath through the transfer layer
// described by target and waits for the transfer to finish.
//
// A nil ctx is replaced with context.Background(). When publishEvent is
// non-nil the transfer is started with hooks that translate negotiation,
// segment, commit and abort notifications into FileEvents; otherwise the
// transfer is started without hooks. The returned error is the first failure
// from session creation, source creation, transfer start, or handle.Wait.
func sendFileViaTransfer(ctx context.Context, filePath string, target transferSendTarget, publishEvent func(FileEvent)) error {
	if ctx == nil {
		ctx = context.Background()
	}
	session, err := newFileSendSession(filePath, time.Now())
	if err != nil {
		return err
	}
	session.fileID = buildStableFileTransferID(session)
	source, err := newTransferFileSource(filePath, session.size)
	if err != nil {
		return err
	}
	defer source.Close()

	// Both start paths use exactly the same options, so build them once.
	opts := TransferSendOptions{
		Descriptor:     buildFileTransferDescriptor(session),
		Source:         source,
		ChunkSize:      defaultFileChunkSize,
		VerifyChecksum: false,
	}

	if publishEvent != nil {
		hooks := transferSendHooks{
			onNegotiated: func(nextOffset int64, _ bool) {
				// Align the session's progress with the negotiated offset
				// before emitting the meta event.
				session.syncProgress(nextOffset, time.Now())
				publishEvent(session.onMetaSent(time.Now()))
			},
			onSegmentSent: func(offset int64, sentBytes int64) {
				// A segment whose offset does not match the session's running
				// total yields an error from onChunkSent; no event is
				// published for it.
				event, chunkErr := session.onChunkSent(offset, sentBytes, time.Now())
				if chunkErr == nil {
					publishEvent(event)
				}
			},
			onCommitted: func() {
				publishEvent(session.onEndSent(time.Now()))
			},
			onAbort: func(stage string, offset int64, cause error) {
				publishEvent(session.onAbort(stage, offset, cause, time.Now()))
			},
		}
		handle, err := startTransferSendWithHooks(ctx, opts, target, hooks)
		if err != nil {
			return err
		}
		return handle.Wait(ctx)
	}

	handle, err := startTransferSend(ctx, opts, target)
	if err != nil {
		return err
	}
	return handle.Wait(ctx)
}

// sendFileWithHooks sends the file at filePath as a meta envelope, a sequence
// of chunk envelopes, and an end envelope, using the supplied hooks.
//
// A nil ctx is replaced with context.Background(). hooks.sendReliable is
// required: every stage calls it, so a nil value is rejected up front with an
// error instead of panicking part-way through the send. The first stage error
// is returned; on success the result is nil.
func sendFileWithHooks(ctx context.Context, filePath string, hooks fileSendHooks) error {
	if ctx == nil {
		ctx = context.Background()
	}
	if hooks.sendReliable == nil {
		return errors.New("file send: sendReliable hook is not set")
	}
	hooks.config = normalizeFileTransferConfig(hooks.config)
	session, err := newFileSendSession(filePath, time.Now())
	if err != nil {
		return err
	}
	if hooks.startSession != nil {
		hooks.startSession(session)
	}
	if err := sendFileMetaWithHooks(ctx, session, hooks); err != nil {
		return err
	}
	if err := sendFileChunksWithHooks(ctx, session, hooks); err != nil {
		return err
	}
	if err := sendFileEndWithHooks(ctx, session, hooks); err != nil {
		return err
	}
	return nil
}

// newFileSendSession stats filePath, computes its checksum, and returns a
// session describing the file.
//
// It returns an error when the path cannot be stat'ed, when it is a
// directory, or when the checksum cannot be computed. The base name falls
// back to "unnamed.bin" when filepath.Base yields "", "." or the path
// separator. startedAt and updatedAt are both set to the normalized now.
func newFileSendSession(filePath string, now time.Time) (*fileSendSession, error) {
	fi, err := os.Stat(filePath)
	if err != nil {
		return nil, err
	}
	if fi.IsDir() {
		return nil, fmt.Errorf("file path is a directory: %s", filePath)
	}
	checksum, err := computeFileChecksum(filePath)
	if err != nil {
		return nil, err
	}
	now = normalizeFileEventTime(now)
	name := filepath.Base(filePath)
	if name == "" || name == "." || name == string(filepath.Separator) {
		name = "unnamed.bin"
	}
	return &fileSendSession{
		fileID:    buildFileID(filePath),
		path:      filePath,
		name:      name,
		size:      fi.Size(),
		mode:      fi.Mode().Perm(),
		modTime:   fi.ModTime(),
		checksum:  checksum,
		startedAt: now,
		updatedAt: now,
	}, nil
}

// fileSendSession tracks the metadata and progress of one outgoing file.
//
// sent is the number of bytes accounted for so far. previousUpdatedAt and
// previousSent hold the values of updatedAt and sent from before the most
// recent advance/syncProgress call.
type fileSendSession struct {
	fileID            string
	path              string
	name              string
	size              int64
	mode              os.FileMode
	modTime           time.Time
	checksum          string
	sent              int64
	startedAt         time.Time
	updatedAt         time.Time
	previousUpdatedAt time.Time
	previousSent      int64
}

// metaEnvelope builds the file-meta envelope for this session.
func (s *fileSendSession) metaEnvelope() Envelope {
	return newFileMetaEnvelope(s.fileID, s.name, s.size, s.checksum, uint32(s.mode.Perm()), s.modTime.UnixNano())
}

// chunkEnvelope builds a file-chunk envelope carrying chunk at offset.
func (s *fileSendSession) chunkEnvelope(offset int64, chunk []byte) Envelope {
	return newFileChunkEnvelope(s.fileID, offset, chunk)
}

// endEnvelope builds the file-end envelope for this session.
func (s *fileSendSession) endEnvelope() Envelope {
	return newFileEndEnvelope(s.fileID)
}

// filePacket returns a FilePacket populated with the session's file ID, name,
// size, permission bits, modification time (Unix nanoseconds) and checksum.
func (s *fileSendSession) filePacket() FilePacket {
	return FilePacket{
		FileID:   s.fileID,
		Name:     s.name,
		Size:     s.size,
		Mode:     uint32(s.mode.Perm()),
		ModTime:  s.modTime.UnixNano(),
		Checksum: s.checksum,
	}
}

// advance records delta additional bytes as sent at time now.
//
// The previous updatedAt/sent values are saved first. The resulting sent
// value is clamped to be non-negative and, when size is positive, to at most
// size. startedAt is initialized to now if it was still zero.
func (s *fileSendSession) advance(delta int64, now time.Time) {
	now = normalizeFileEventTime(now)
	if s.startedAt.IsZero() {
		s.startedAt = now
	}
	s.previousUpdatedAt = s.updatedAt
	s.previousSent = s.sent
	s.updatedAt = now
	s.sent += delta
	if s.sent < 0 {
		s.sent = 0
	}
	if s.size > 0 && s.sent > s.size {
		s.sent = s.size
	}
}

// syncProgress sets the sent counter to an absolute progress value at time
// now.
//
// progress is clamped to be non-negative and, when size is positive, to at
// most size. The previous updatedAt/sent values are saved first, and
// startedAt is initialized to now if it was still zero.
func (s *fileSendSession) syncProgress(progress int64, now time.Time) {
	now = normalizeFileEventTime(now)
	if progress < 0 {
		progress = 0
	}
	if s.size > 0 && progress > s.size {
		progress = s.size
	}
	if s.startedAt.IsZero() {
		s.startedAt = now
	}
	s.previousUpdatedAt = s.updatedAt
	s.previousSent = s.sent
	s.updatedAt = now
	s.sent = progress
}

// buildEvent assembles a FileEvent of the given kind from packet and the
// session's current state.
//
// When err is non-nil and packet.Error is empty, packet.Error is filled with
// err.Error(). The event's Received field is set from the session's sent
// counter; timing and progress fields are filled by fillFileSendEventTiming
// and fillFileEventProgress.
func (s *fileSendSession) buildEvent(kind EnvelopeKind, packet FilePacket, err error, now time.Time) FileEvent {
	now = normalizeFileEventTime(now)
	if err != nil && packet.Error == "" {
		packet.Error = err.Error()
	}
	event := FileEvent{
		Kind:     kind,
		Packet:   packet,
		Path:     s.path,
		Received: s.sent,
		Err:      err,
		Time:     now,
	}
	fillFileSendEventTiming(&event, s)
	fillFileEventProgress(&event)
	return event
}

// onMetaSent refreshes the session timestamps (without changing sent) and
// returns the EnvelopeFileMeta event.
func (s *fileSendSession) onMetaSent(now time.Time) FileEvent {
	s.advance(0, now)
	return s.buildEvent(EnvelopeFileMeta, s.filePacket(), nil, now)
}

// onChunkSent records that chunkSize bytes starting at offset were sent and
// returns the EnvelopeFileChunk event.
//
// It returns an error, and leaves the session untouched, when offset does not
// equal the number of bytes already recorded as sent.
func (s *fileSendSession) onChunkSent(offset int64, chunkSize int64, now time.Time) (FileEvent, error) {
	if offset != s.sent {
		return FileEvent{}, fmt.Errorf("file chunk offset mismatch: got %d want %d", offset, s.sent)
	}
	packet := s.filePacket()
	packet.Offset = offset
	s.advance(chunkSize, now)
	return s.buildEvent(EnvelopeFileChunk, packet, nil, now), nil
}

// onEndSent refreshes the session timestamps (without changing sent) and
// returns the EnvelopeFileEnd event.
func (s *fileSendSession) onEndSent(now time.Time) FileEvent {
	s.advance(0, now)
	return s.buildEvent(EnvelopeFileEnd, s.filePacket(), nil, now)
}

// onAbort refreshes the session timestamps (without changing sent) and
// returns the EnvelopeFileAbort event carrying stage, offset and cause.
func (s *fileSendSession) onAbort(stage string, offset int64, cause error, now time.Time) FileEvent {
	packet := s.filePacket()
	packet.Stage = stage
	packet.Offset = offset
	s.advance(0, now)
	return s.buildEvent(EnvelopeFileAbort, packet, cause, now)
}

// sendFileMetaWithHooks sends the session's meta envelope via
// hooks.sendReliable (which must be non-nil) and publishes the meta event.
// On failure it routes the error through handleFileSendFailure with stage
// "meta" and offset 0.
func sendFileMetaWithHooks(ctx context.Context, session *fileSendSession, hooks fileSendHooks) error {
	if err := hooks.sendReliable(ctx, session.metaEnvelope()); err != nil {
		return handleFileSendFailure(session, hooks, "meta", 0, err)
	}
	publishFileSendEvent(hooks, session.onMetaSent(time.Now()))
	return nil
}

// sendFileChunksWithHooks opens the session's file and streams it in chunks
// of hooks.config.ChunkSize via hooks.sendReliable (which must be non-nil),
// publishing a chunk event after each successful send.
//
// Any failure (open, read, send, or offset bookkeeping) is routed through
// handleFileSendFailure with stage "chunk"; send and bookkeeping failures
// report the offset of the failing chunk, other failures report the session's
// current sent count.
func sendFileChunksWithHooks(ctx context.Context, session *fileSendSession, hooks fileSendHooks) error {
	fd, err := os.Open(session.path)
	if err != nil {
		return handleFileSendFailure(session, hooks, "chunk", session.sent, err)
	}
	defer fd.Close()

	streamErr := streamFileChunks(ctx, fd, hooks.config.ChunkSize, func(offset int64, chunk []byte) error {
		err := hooks.sendReliable(ctx, session.chunkEnvelope(offset, chunk))
		if err != nil {
			return &fileSendError{stage: "chunk", offset: offset, err: err}
		}
		event, stateErr := session.onChunkSent(offset, int64(len(chunk)), time.Now())
		if stateErr != nil {
			return &fileSendError{stage: "chunk", offset: offset, err: stateErr}
		}
		publishFileSendEvent(hooks, event)
		return nil
	})
	if streamErr == nil {
		return nil
	}

	// Errors raised inside the callback carry their own stage/offset; unwrap
	// them so the caller sees the underlying cause.
	var sendErr *fileSendError
	if errors.As(streamErr, &sendErr) {
		return handleFileSendFailure(session, hooks, sendErr.stage, sendErr.offset, sendErr.err)
	}
	return handleFileSendFailure(session, hooks, "chunk", session.sent, streamErr)
}

// sendFileEndWithHooks sends the session's end envelope via
// hooks.sendReliable (which must be non-nil) and publishes the end event. On
// failure it routes the error through handleFileSendFailure with stage "end"
// and the session's current sent count.
func sendFileEndWithHooks(ctx context.Context, session *fileSendSession, hooks fileSendHooks) error {
	if err := hooks.sendReliable(ctx, session.endEnvelope()); err != nil {
		return handleFileSendFailure(session, hooks, "end", session.sent, err)
	}
	publishFileSendEvent(hooks, session.onEndSent(time.Now()))
	return nil
}

// handleFileSendFailure reports a failed send and returns cause unchanged.
//
// When a session with a non-empty file ID and a sendAbort hook are available,
// the abort hook is invoked. When a session is available, an abort event is
// published.
func handleFileSendFailure(session *fileSendSession, hooks fileSendHooks, stage string, offset int64, cause error) error {
	if session != nil && hooks.sendAbort != nil && session.fileID != "" {
		// Best effort: the abort hook's own error is deliberately ignored so
		// the caller always receives the original cause.
		_ = hooks.sendAbort(session.fileID, stage, offset, cause)
	}
	if session != nil {
		publishFileSendEvent(hooks, session.onAbort(stage, offset, cause, time.Now()))
	}
	return cause
}

// publishFileSendEvent forwards event to hooks.publishEvent when that hook is
// set; otherwise it does nothing.
func publishFileSendEvent(hooks fileSendHooks, event FileEvent) {
	if hooks.publishEvent != nil {
		hooks.publishEvent(event)
	}
}

// streamFileChunks reads reader to EOF in pieces of at most chunkSize bytes
// and passes each piece, with its starting offset, to sendChunk.
//
// A nil ctx is replaced with context.Background(), matching the other entry
// points in this file. A non-positive chunkSize falls back to
// defaultFileChunkSize. Each chunk is copied into a freshly allocated slice
// before being handed to sendChunk, so the callback never sees the reusable
// read buffer. ctx is checked before every read; on cancellation the result
// wraps ctx.Err(). The first error from sendChunk, or the first non-EOF read
// error, is returned as is; reaching io.EOF returns nil.
func streamFileChunks(ctx context.Context, reader io.Reader, chunkSize int, sendChunk func(offset int64, chunk []byte) error) error {
	if ctx == nil {
		ctx = context.Background()
	}
	if chunkSize <= 0 {
		chunkSize = defaultFileChunkSize
	}
	buf := make([]byte, chunkSize)
	var offset int64
	for {
		select {
		case <-ctx.Done():
			return fmt.Errorf("file stream canceled: %w", ctx.Err())
		default:
		}

		n, readErr := reader.Read(buf)
		if n > 0 {
			// Process the bytes that were read before looking at readErr: a
			// Read may return data together with an error.
			chunk := make([]byte, n)
			copy(chunk, buf[:n])
			if err := sendChunk(offset, chunk); err != nil {
				return err
			}
			offset += int64(n)
		}
		if readErr == nil {
			continue
		}
		if errors.Is(readErr, io.EOF) {
			return nil
		}
		return readErr
	}
}