diff --git a/backend/ftp/ftp.go b/backend/ftp/ftp.go index 148e04f0b..f9401ac64 100644 --- a/backend/ftp/ftp.go +++ b/backend/ftp/ftp.go @@ -21,6 +21,7 @@ import ( "github.com/rclone/rclone/fs/config/configmap" "github.com/rclone/rclone/fs/config/configstruct" "github.com/rclone/rclone/fs/config/obscure" + "github.com/rclone/rclone/fs/fserrors" "github.com/rclone/rclone/fs/fshttp" "github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/lib/encoder" @@ -33,6 +34,12 @@ var ( currentUser = env.CurrentUser() ) +const ( + minSleep = 10 * time.Millisecond + maxSleep = 2 * time.Second + decayConstant = 2 // bigger for slower decay, exponential +) + // Register with Fs func init() { fs.Register(&fs.RegInfo{ @@ -157,6 +164,7 @@ type Fs struct { drain *time.Timer // used to drain the pool when we stop using the connections tokens *pacer.TokenDispenser tlsConf *tls.Config + pacer *fs.Pacer // pacer for FTP connections } // Object describes an FTP file @@ -250,8 +258,24 @@ func (d *dialCtx) dial(network, address string) (net.Conn, error) { return conn, err } +// shouldRetry returns a boolean as to whether this err deserve to be +// retried. It returns the err as a convenience +func shouldRetry(ctx context.Context, err error) (bool, error) { + if fserrors.ContextError(ctx, &err) { + return false, err + } + switch errX := err.(type) { + case *textproto.Error: + switch errX.Code { + case ftp.StatusNotAvailable: + return true, err + } + } + return fserrors.ShouldRetry(err), err +} + // Open a new connection to the FTP server. -func (f *Fs) ftpConnection(ctx context.Context) (*ftp.ServerConn, error) { +func (f *Fs) ftpConnection(ctx context.Context) (c *ftp.ServerConn, err error) { fs.Debugf(f, "Connecting to FTP server") dCtx := dialCtx{f, ctx} ftpConfig := []ftp.DialOption{ftp.DialWithDialFunc(dCtx.dial)} @@ -273,18 +297,22 @@ func (f *Fs) ftpConnection(ctx context.Context) (*ftp.ServerConn, error) { if f.ci.Dump&(fs.DumpHeaders|fs.DumpBodies|fs.DumpRequests|fs.DumpResponses) != 0 { ftpConfig = append(ftpConfig, ftp.DialWithDebugOutput(&debugLog{auth: f.ci.Dump&fs.DumpAuth != 0})) } - c, err := ftp.Dial(f.dialAddr, ftpConfig...) + err = f.pacer.Call(func() (bool, error) { + c, err = ftp.Dial(f.dialAddr, ftpConfig...) + if err != nil { + return shouldRetry(ctx, err) + } + err = c.Login(f.user, f.pass) + if err != nil { + _ = c.Quit() + return shouldRetry(ctx, err) + } + return false, nil + }) if err != nil { - fs.Errorf(f, "Error while Dialing %s: %s", f.dialAddr, err) - return nil, errors.Wrap(err, "ftpConnection Dial") + err = errors.Wrapf(err, "failed to make FTP connection to %q", f.dialAddr) } - err = c.Login(f.user, f.pass) - if err != nil { - _ = c.Quit() - fs.Errorf(f, "Error while Logging in into %s: %s", f.dialAddr, err) - return nil, errors.Wrap(err, "ftpConnection Login") - } - return c, nil + return c, err } // Get an FTP connection from the pool, or open a new one @@ -417,6 +445,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (ff fs.Fs dialAddr: dialAddr, tokens: pacer.NewTokenDispenser(opt.Concurrency), tlsConf: tlsConfig, + pacer: fs.NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))), } f.features = (&fs.Features{ CanHaveEmptyDirectories: true,