notify/file_receive_checkpoint.go

174 lines
5.7 KiB
Go
Raw Permalink Normal View History

package notify
import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"os"
"path/filepath"
"strings"
"time"
)
type fileReceiveCheckpoint struct {
FileID string `json:"file_id"`
Name string `json:"name"`
Size int64 `json:"size"`
Mode uint32 `json:"mode"`
ModTime int64 `json:"mod_time"`
Checksum string `json:"checksum"`
Received int64 `json:"received"`
TmpPath string `json:"tmp_path"`
FinalPath string `json:"final_path"`
StartedAt int64 `json:"started_at"`
UpdatedAt int64 `json:"updated_at"`
PreviousUpdatedAt int64 `json:"previous_updated_at"`
PreviousReceived int64 `json:"previous_received"`
}
func (p *fileReceivePool) restoreCheckpointLocked(scope string, packet FilePacket, now time.Time) (*fileReceiveSession, bool, error) {
checkpoint, ok, err := p.loadCheckpointLocked(scope, packet.FileID)
if err != nil || !ok {
return nil, ok, err
}
name := filepath.Base(packet.Name)
if name == "." || name == "/" || name == "" {
name = "unnamed.bin"
}
if checkpoint.FileID != packet.FileID || checkpoint.Name != name || checkpoint.Size != packet.Size || !strings.EqualFold(checkpoint.Checksum, packet.Checksum) {
p.removeCheckpointLocked(scope, packet.FileID)
if checkpoint.TmpPath != "" {
_ = os.Remove(checkpoint.TmpPath)
}
return nil, false, nil
}
if checkpoint.TmpPath == "" {
p.removeCheckpointLocked(scope, packet.FileID)
return nil, false, nil
}
info, statErr := os.Stat(checkpoint.TmpPath)
if statErr != nil {
if checkpoint.FinalPath != "" && pathExists(checkpoint.FinalPath) {
session := checkpoint.toSession(now)
session.tmpPath = checkpoint.FinalPath
session.finalPath = checkpoint.FinalPath
session.received = session.size
p.completed[fileReceiveKey(scope, packet.FileID)] = session.copy()
p.removeCheckpointLocked(scope, packet.FileID)
return session.copy(), true, nil
}
p.removeCheckpointLocked(scope, packet.FileID)
return nil, false, nil
}
received := info.Size()
if received < 0 {
received = 0
}
if packet.Size > 0 && received > packet.Size {
received = packet.Size
}
session := checkpoint.toSession(now)
session.name = name
session.mode = os.FileMode(packet.Mode)
session.modTime = filePacketModTime(packet)
session.checksum = packet.Checksum
session.received = received
if session.finalPath == "" || (session.finalPath != session.tmpPath && pathExists(session.finalPath)) {
session.finalPath = p.uniqueFinalPathLocked(p.receiveDirLocked(), name, packet.FileID)
}
p.sessions[fileReceiveKey(scope, packet.FileID)] = session
if session.received != checkpoint.Received || session.finalPath != checkpoint.FinalPath {
if err := p.saveCheckpointLocked(scope, session); err != nil {
return nil, true, err
}
}
return session.copy(), true, nil
}
func (p *fileReceivePool) loadCheckpointLocked(scope string, fileID string) (fileReceiveCheckpoint, bool, error) {
path := p.checkpointPathLocked(scope, fileID)
data, err := os.ReadFile(path)
if err != nil {
if os.IsNotExist(err) {
return fileReceiveCheckpoint{}, false, nil
}
return fileReceiveCheckpoint{}, false, err
}
var checkpoint fileReceiveCheckpoint
if err := json.Unmarshal(data, &checkpoint); err != nil {
_ = os.Remove(path)
return fileReceiveCheckpoint{}, false, nil
}
return checkpoint, true, nil
}
func (p *fileReceivePool) saveCheckpointLocked(scope string, session *fileReceiveSession) error {
if p == nil || session == nil || session.fileID == "" {
return nil
}
path := p.checkpointPathLocked(scope, session.fileID)
checkpoint := fileReceiveCheckpoint{
FileID: session.fileID,
Name: session.name,
Size: session.size,
Mode: uint32(session.mode.Perm()),
ModTime: session.modTime.UnixNano(),
Checksum: session.checksum,
Received: session.received,
TmpPath: session.tmpPath,
FinalPath: session.finalPath,
StartedAt: session.startedAt.UnixNano(),
UpdatedAt: session.updatedAt.UnixNano(),
PreviousUpdatedAt: session.previousUpdatedAt.UnixNano(),
PreviousReceived: session.previousReceived,
}
data, err := json.Marshal(checkpoint)
if err != nil {
return err
}
tmpPath := path + ".tmp"
if err := os.WriteFile(tmpPath, data, 0o600); err != nil {
return err
}
return os.Rename(tmpPath, path)
}
func (p *fileReceivePool) removeCheckpointLocked(scope string, fileID string) {
if p == nil || fileID == "" {
return
}
_ = os.Remove(p.checkpointPathLocked(scope, fileID))
}
func (p *fileReceivePool) checkpointPathLocked(scope string, fileID string) string {
baseDir := p.receiveDirLocked()
sum := sha256.Sum256([]byte(fileReceiveKey(scope, fileID)))
return filepath.Join(baseDir, ".notify_recv_"+hex.EncodeToString(sum[:8])+".json")
}
func (checkpoint fileReceiveCheckpoint) toSession(now time.Time) *fileReceiveSession {
now = normalizeFileEventTime(now)
session := &fileReceiveSession{
fileID: checkpoint.FileID,
name: checkpoint.Name,
size: checkpoint.Size,
mode: os.FileMode(checkpoint.Mode),
modTime: time.Unix(0, checkpoint.ModTime),
checksum: checkpoint.Checksum,
received: checkpoint.Received,
tmpPath: checkpoint.TmpPath,
finalPath: checkpoint.FinalPath,
previousReceived: checkpoint.PreviousReceived,
}
session.startedAt = unixNanoTime(checkpoint.StartedAt)
session.updatedAt = unixNanoTime(checkpoint.UpdatedAt)
session.previousUpdatedAt = unixNanoTime(checkpoint.PreviousUpdatedAt)
if session.startedAt.IsZero() {
session.startedAt = now
}
if session.updatedAt.IsZero() {
session.updatedAt = now
}
return session
}