package notify import ( "context" "errors" "net" "time" ) type fileTransferRetryHooks struct { onRetry func(err error, attempt int) onTimeout func(err error, attempt int) } func fileStageByKind(kind EnvelopeKind) string { switch kind { case EnvelopeFileMeta: return "meta" case EnvelopeFileChunk: return "chunk" case EnvelopeFileEnd: return "end" case EnvelopeFileAbort: return "abort" default: return "" } } func (c *ClientCommon) sendFileAck(src Envelope, processErr error) error { errMsg := "" if processErr != nil { errMsg = processErr.Error() } ack := newFileAckEnvelope(src.File.FileID, fileStageByKind(src.Kind), src.File.Offset, errMsg) return c.sendEnvelope(ack) } func (s *ServerCommon) sendFileAck(logical *LogicalConn, src Envelope, processErr error) error { if logical == nil { return s.sendFileAckTransport(nil, src, processErr) } return s.sendFileAckTransport(s.resolveOutboundTransport(logical), src, processErr) } func (s *ServerCommon) sendFileAckTransport(transport *TransportConn, src Envelope, processErr error) error { errMsg := "" if processErr != nil { errMsg = processErr.Error() } ack := newFileAckEnvelope(src.File.FileID, fileStageByKind(src.Kind), src.File.Offset, errMsg) return s.sendEnvelopeTransport(transport, ack) } func (s *ServerCommon) sendFileAckInbound(logical *LogicalConn, transport *TransportConn, conn net.Conn, src Envelope, processErr error) error { if conn == nil { return s.sendFileAckTransport(transport, src, processErr) } errMsg := "" if processErr != nil { errMsg = processErr.Error() } ack := newFileAckEnvelope(src.File.FileID, fileStageByKind(src.Kind), src.File.Offset, errMsg) return s.sendEnvelopeInboundTransport(logical, transport, conn, ack) } func (c *ClientCommon) sendFileAbort(fileID string, stage string, offset int64, cause error) error { errMsg := "" if cause != nil { errMsg = cause.Error() } return c.sendEnvelope(newFileAbortEnvelope(fileID, stage, offset, errMsg)) } func (s *ServerCommon) sendFileAbort(logical *LogicalConn, fileID string, stage string, offset int64, cause error) error { if logical == nil { return s.sendFileAbortTransport(nil, fileID, stage, offset, cause) } return s.sendFileAbortTransport(s.resolveOutboundTransport(logical), fileID, stage, offset, cause) } func (s *ServerCommon) sendFileAbortTransport(transport *TransportConn, fileID string, stage string, offset int64, cause error) error { errMsg := "" if cause != nil { errMsg = cause.Error() } return s.sendEnvelopeTransport(transport, newFileAbortEnvelope(fileID, stage, offset, errMsg)) } func (c *ClientCommon) sendFileEnvelopeWithAck(env Envelope, timeout time.Duration) error { pool := c.getFileAckPool() wait := pool.prepare(clientFileScope(), env.File.FileID, fileStageByKind(env.Kind), env.File.Offset) if err := c.sendEnvelope(env); err != nil { wait.cancel() return err } return pool.waitPrepared(wait, timeout) } func (s *ServerCommon) sendFileEnvelopeWithAck(logical *LogicalConn, env Envelope, timeout time.Duration) error { if logical == nil { return s.sendFileEnvelopeWithAckTransport(nil, env, timeout) } return s.sendFileEnvelopeWithAckTransport(s.resolveOutboundTransport(logical), env, timeout) } func (s *ServerCommon) sendFileEnvelopeWithAckTransport(transport *TransportConn, env Envelope, timeout time.Duration) error { pool := s.getFileAckPool() wait := pool.prepare(serverTransportScopeForTransport(transport), env.File.FileID, fileStageByKind(env.Kind), env.File.Offset) if err := s.sendEnvelopeTransport(transport, env); err != nil { wait.cancel() return err } return pool.waitPrepared(wait, timeout) } func (c *ClientCommon) sendFileEnvelopeReliable(ctx context.Context, env Envelope, cfg fileTransferConfig) error { state := c.getFileTransferState() scope := clientFileScope() stage := fileStageByKind(env.Kind) state.recordRuntimeStage(fileTransferDirectionSend, scope, env.File.FileID, stage) return retryFileTransferSend(ctx, cfg, func(cfg fileTransferConfig) error { return c.sendFileEnvelopeWithAck(env, cfg.AckTimeout) }, fileTransferRetryHooks{ onRetry: func(err error, _ int) { state.recordRuntimeRetry(fileTransferDirectionSend, scope, env.File.FileID) }, onTimeout: func(err error, _ int) { state.recordRuntimeTimeout(fileTransferDirectionSend, scope, env.File.FileID) state.recordRuntimeFailureStage(fileTransferDirectionSend, scope, env.File.FileID, stage) }, }) } func (s *ServerCommon) sendFileEnvelopeReliable(ctx context.Context, logical *LogicalConn, env Envelope, cfg fileTransferConfig) error { if logical == nil { return s.sendFileEnvelopeReliableTransport(ctx, nil, env, cfg) } return s.sendFileEnvelopeReliableTransport(ctx, s.resolveOutboundTransport(logical), env, cfg) } func (s *ServerCommon) sendFileEnvelopeReliableTransport(ctx context.Context, transport *TransportConn, env Envelope, cfg fileTransferConfig) error { state := s.getFileTransferState() scope := serverTransportScopeForTransport(transport) stage := fileStageByKind(env.Kind) state.recordRuntimeStage(fileTransferDirectionSend, scope, env.File.FileID, stage) return retryFileTransferSend(ctx, cfg, func(cfg fileTransferConfig) error { return s.sendFileEnvelopeWithAckTransport(transport, env, cfg.AckTimeout) }, fileTransferRetryHooks{ onRetry: func(err error, _ int) { state.recordRuntimeRetry(fileTransferDirectionSend, scope, env.File.FileID) }, onTimeout: func(err error, _ int) { state.recordRuntimeTimeout(fileTransferDirectionSend, scope, env.File.FileID) state.recordRuntimeFailureStage(fileTransferDirectionSend, scope, env.File.FileID, stage) }, }) } func retryFileTransferSend(ctx context.Context, cfg fileTransferConfig, send func(fileTransferConfig) error, hooks ...fileTransferRetryHooks) error { cfg = normalizeFileTransferConfig(cfg) var lastErr error hook := mergeFileTransferRetryHooks(hooks...) for attempt := 0; attempt < cfg.SendRetry; attempt++ { if ctx != nil { select { case <-ctx.Done(): return ctx.Err() default: } } lastErr = send(cfg) if lastErr == nil { return nil } if errors.Is(lastErr, errFileAckTimeout) && hook.onTimeout != nil { hook.onTimeout(lastErr, attempt+1) } if attempt+1 < cfg.SendRetry && hook.onRetry != nil { hook.onRetry(lastErr, attempt+1) } } if lastErr == nil { lastErr = errors.New("file send failed") } return lastErr } func mergeFileTransferRetryHooks(hooks ...fileTransferRetryHooks) fileTransferRetryHooks { var merged fileTransferRetryHooks for _, hook := range hooks { if hook.onRetry != nil { merged.onRetry = hook.onRetry } if hook.onTimeout != nil { merged.onTimeout = hook.onTimeout } } return merged }